Skip to content

Commit 126a26d

Browse files
committed
Cache topic checks for ten minutes in messaging providers.
1 parent a80a69b commit 126a26d

4 files changed

Lines changed: 53 additions & 17 deletions

File tree

src/OpenDDD.Tests/Integration/Infrastructure/Events/Kafka/KafkaMessagingProviderTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,7 @@ public async Task CompetingConsumers_ShouldDistributeMessages_WhenMultipleConsum
365365
var consumerGroup = "test-consumer-group";
366366
var totalMessages = 100;
367367
var numConsumers = 2;
368-
var variancePercentage = 0.2;
368+
var variancePercentage = 0.3;
369369
var perConsumerMessageCount = new ConcurrentDictionary<Guid, int>(); // Track messages per consumer
370370
var allMessagesProcessed = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
371371

src/OpenDDD/Infrastructure/Events/Azure/AzureServiceBusMessagingProvider.cs

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ public class AzureServiceBusMessagingProvider : IMessagingProvider, IAsyncDispos
1313
private readonly bool _autoCreateTopics;
1414
private readonly ILogger<AzureServiceBusMessagingProvider> _logger;
1515
private readonly ConcurrentDictionary<string, AzureServiceBusSubscription> _subscriptions = new();
16+
private readonly ConcurrentDictionary<string, DateTime> _topicCache = new();
17+
private readonly TimeSpan _cacheExpiration = TimeSpan.FromSeconds(600);
1618
private bool _disposed;
1719

1820
public AzureServiceBusMessagingProvider(
@@ -105,18 +107,29 @@ public async Task PublishAsync(string topic, string message, CancellationToken c
105107

106108
private async Task EnsureTopicExistsAsync(string topic, CancellationToken cancellationToken)
107109
{
108-
if (!await _adminClient.TopicExistsAsync(topic, cancellationToken))
110+
if (_topicCache.TryGetValue(topic, out var lastChecked) && DateTime.UtcNow - lastChecked < _cacheExpiration)
109111
{
110-
if (_autoCreateTopics)
111-
{
112-
await _adminClient.CreateTopicAsync(topic, cancellationToken);
113-
_logger.LogInformation("Created topic: {Topic}", topic);
114-
}
115-
else
116-
{
117-
throw new InvalidOperationException($"Topic '{topic}' does not exist. Enable 'autoCreateTopics' to create topics automatically.");
118-
}
112+
_logger.LogDebug("Skipping topic check for '{Topic}' (cached result).", topic);
113+
return;
114+
}
115+
116+
var topicExists = await _adminClient.TopicExistsAsync(topic, cancellationToken);
117+
118+
if (topicExists)
119+
{
120+
_topicCache[topic] = DateTime.UtcNow;
121+
return;
122+
}
123+
124+
if (_autoCreateTopics)
125+
{
126+
await _adminClient.CreateTopicAsync(topic, cancellationToken);
127+
_logger.LogInformation("Created topic: {Topic}", topic);
128+
_topicCache[topic] = DateTime.UtcNow;
129+
return;
119130
}
131+
132+
throw new InvalidOperationException($"Topic '{topic}' does not exist. Enable 'autoCreateTopics' to create topics automatically.");
120133
}
121134

122135
private async Task CreateSubscriptionIfNotExistsAsync(string topic, string subscriptionName, CancellationToken cancellationToken)

src/OpenDDD/Infrastructure/Events/Kafka/KafkaMessagingProvider.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ public class KafkaMessagingProvider : IMessagingProvider, IAsyncDisposable
1515
private readonly IKafkaConsumerFactory _consumerFactory;
1616
private readonly ILogger<KafkaMessagingProvider> _logger;
1717
private readonly ConcurrentDictionary<string, KafkaSubscription> _subscriptions = new();
18+
private readonly ConcurrentDictionary<string, DateTime> _topicCache = new();
19+
private readonly TimeSpan _cacheExpiration = TimeSpan.FromSeconds(600);
1820
private readonly CancellationTokenSource _cts = new();
1921
private bool _disposed;
2022

@@ -93,11 +95,18 @@ public async Task PublishAsync(string topic, string message, CancellationToken c
9395

9496
private async Task EnsureTopicExistsAsync(string topic, CancellationToken cancellationToken)
9597
{
98+
if (_topicCache.TryGetValue(topic, out var lastChecked) && DateTime.UtcNow - lastChecked < _cacheExpiration)
99+
{
100+
_logger.LogDebug("Skipping topic check for '{Topic}' (cached result).", topic);
101+
return;
102+
}
103+
96104
var metadata = _adminClient.GetMetadata(TimeSpan.FromSeconds(5));
97105

98106
if (metadata.Topics.Any(t => t.Topic == topic))
99107
{
100108
_logger.LogDebug("Topic '{Topic}' already exists.", topic);
109+
_topicCache[topic] = DateTime.UtcNow;
101110
return;
102111
}
103112

@@ -109,6 +118,8 @@ await _adminClient.CreateTopicsAsync(new[]
109118
new TopicSpecification { Name = topic, NumPartitions = 2, ReplicationFactor = 1 }
110119
}, null);
111120

121+
_topicCache[topic] = DateTime.UtcNow;
122+
112123
// Wait for the topic to be available
113124
for (int i = 0; i < 30; i++)
114125
{

src/OpenDDD/Infrastructure/Events/RabbitMq/RabbitMqMessagingProvider.cs

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ public class RabbitMqMessagingProvider : IMessagingProvider, IAsyncDisposable
1717
private IConnection? _connection;
1818
private IChannel? _channel;
1919
private readonly ConcurrentDictionary<string, RabbitMqSubscription> _subscriptions = new();
20+
private readonly ConcurrentDictionary<string, DateTime> _topicCache = new();
21+
private readonly TimeSpan _cacheExpiration = TimeSpan.FromSeconds(600);
22+
private bool _disposed;
2023

2124
public RabbitMqMessagingProvider(
2225
IConnectionFactory factory,
@@ -104,23 +107,29 @@ private async Task EnsureConnectedAsync(CancellationToken cancellationToken)
104107

105108
private async Task EnsureTopicExistsAsync(string topic, CancellationToken cancellationToken)
106109
{
107-
bool exchangeExists = await ExchangeExistsAsync(topic, cancellationToken);
110+
if (_topicCache.TryGetValue(topic, out var lastChecked) && DateTime.UtcNow - lastChecked < _cacheExpiration)
111+
{
112+
_logger.LogDebug("Skipping exchange check for '{Topic}' (cached result).", topic);
113+
return;
114+
}
108115

116+
bool exchangeExists = await ExchangeExistsAsync(topic, cancellationToken);
117+
109118
if (exchangeExists)
110119
{
111-
_logger.LogDebug("Exchange '{Topic}' already exists.", topic);
120+
_topicCache[topic] = DateTime.UtcNow;
112121
return;
113122
}
114123

115124
if (_autoCreateTopics)
116125
{
117126
await _channel.ExchangeDeclareAsync(topic, ExchangeType.Fanout, durable: true, autoDelete: false, cancellationToken: cancellationToken);
118127
_logger.LogInformation("Auto-created exchange (topic): {Topic}", topic);
128+
_topicCache[topic] = DateTime.UtcNow;
129+
return;
119130
}
120-
else
121-
{
122-
throw new InvalidOperationException($"Topic '{topic}' does not exist. Enable 'autoCreateTopics' to create topics automatically.");
123-
}
131+
132+
throw new InvalidOperationException($"Topic '{topic}' does not exist. Enable 'autoCreateTopics' to create topics automatically.");
124133
}
125134

126135
private async Task<bool> ExchangeExistsAsync(string exchange, CancellationToken cancellationToken)
@@ -147,6 +156,9 @@ private async Task<bool> ExchangeExistsAsync(string exchange, CancellationToken
147156

148157
public async ValueTask DisposeAsync()
149158
{
159+
if (_disposed) return;
160+
_disposed = true;
161+
150162
_logger.LogDebug("Disposing RabbitMqMessagingProvider...");
151163

152164
foreach (var subscription in _subscriptions.Values)

0 commit comments

Comments
 (0)