Skip to content

Commit a6eb9c2

Browse files
committed
Refactor tests.
1 parent 6baa4f8 commit a6eb9c2

20 files changed

Lines changed: 326 additions & 217 deletions

src/OpenDDD.Tests/Integration/Infrastructure/Events/Azure/AzureServiceBusMessagingProviderTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,14 +121,14 @@ public async Task AtLeastOnceGurantee_ShouldDeliverToLateSubscriber_WhenSubscrib
121121
var messageToSend = "Persistent Message Test";
122122
var messageReceivedTcs = new TaskCompletionSource<bool>();
123123

124-
await _messagingProvider.SubscribeAsync(topicName, subscriptionName, async (msg, token) =>
124+
var firstSubscription = await _messagingProvider.SubscribeAsync(topicName, subscriptionName, async (msg, token) =>
125125
{
126126
Assert.Fail("First subscription should not receive the message.");
127127
}, _cts.Token);
128128

129129
await Task.Delay(500);
130130

131-
await _messagingProvider.UnsubscribeAsync(topicName, subscriptionName, _cts.Token);
131+
await _messagingProvider.UnsubscribeAsync(firstSubscription, _cts.Token);
132132

133133
await Task.Delay(500);
134134

src/OpenDDD.Tests/Integration/Infrastructure/Events/InMemory/InMemoryMessagingProviderTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,14 @@ public async Task AtLeastOnceGuarantee_ShouldDeliverToLateSubscriber_WhenSubscri
4040

4141
var lateSubscriberReceived = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
4242

43-
await _messagingProvider.SubscribeAsync(topicName, groupName, async (msg, token) =>
43+
var firstSubscription = await _messagingProvider.SubscribeAsync(topicName, groupName, async (msg, token) =>
4444
{
4545
Assert.Fail("First subscription should not receive the message.");
4646
}, _cts.Token);
4747

4848
await Task.Delay(500);
4949

50-
await _messagingProvider.UnsubscribeAsync(topicName, groupName, _cts.Token);
50+
await _messagingProvider.UnsubscribeAsync(firstSubscription, _cts.Token);
5151

5252
// Act
5353
await _messagingProvider.PublishAsync(topicName, messageToSend, _cts.Token);
@@ -178,7 +178,7 @@ public async Task CompetingConsumers_ShouldDistributeMessages_WhenMultipleConsum
178178
var consumerGroup = "test-consumer-group";
179179
var totalMessages = 100;
180180
var numConsumers = 2;
181-
var variancePercentage = 0.2;
181+
var variancePercentage = 0.3;
182182
var perConsumerMessageCount = new ConcurrentDictionary<Guid, int>();
183183
var allMessagesProcessed = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
184184

src/OpenDDD.Tests/Integration/Infrastructure/Events/Kafka/KafkaMessagingProviderTests.cs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ public KafkaMessagingProviderTests(ITestOutputHelper testOutputHelper)
3737
_consumerFactory = new KafkaConsumerFactory(_bootstrapServers, _consumerLogger);
3838

3939
_messagingProvider = new KafkaMessagingProvider(
40-
_bootstrapServers,
4140
_adminClient,
4241
_producer,
4342
_consumerFactory,
@@ -98,7 +97,6 @@ public async Task AutoCreateTopic_ShouldCreateTopicOnSubscribe_WhenSettingEnable
9897
var topicName = $"test-topic-{Guid.NewGuid()}";
9998
var consumerGroup = "test-consumer-group";
10099

101-
// Ensure topic does not exist
102100
var metadata = _adminClient.GetMetadata(TimeSpan.FromSeconds(5));
103101
metadata.Topics.Any(t => t.Topic == topicName).Should().BeFalse();
104102

@@ -134,7 +132,6 @@ public async Task AutoCreateTopic_ShouldNotCreateTopicOnSubscribe_WhenSettingDis
134132
var consumerGroup = "test-consumer-group";
135133

136134
var messagingProvider = new KafkaMessagingProvider(
137-
_bootstrapServers,
138135
_adminClient,
139136
_producer,
140137
_consumerFactory,
@@ -162,7 +159,7 @@ public async Task AtLeastOnceGurantee_ShouldDeliverToLateSubscriber_WhenSubscrib
162159
var lateSubscriberReceived = new TaskCompletionSource<bool>();
163160
ConcurrentBag<string> _receivedMessages = new();
164161

165-
await _messagingProvider.SubscribeAsync(topicName, consumerGroup, async (msg, token) =>
162+
var firstSubscription = await _messagingProvider.SubscribeAsync(topicName, consumerGroup, async (msg, token) =>
166163
{
167164
firstSubscriberReceived.SetResult(true);
168165
}, _cts.Token);
@@ -171,7 +168,7 @@ await _messagingProvider.SubscribeAsync(topicName, consumerGroup, async (msg, to
171168
await _messagingProvider.PublishAsync(topicName, messageToSend, _cts.Token);
172169
await firstSubscriberReceived.Task.WaitAsync(TimeSpan.FromSeconds(10));
173170

174-
await _messagingProvider.UnsubscribeAsync(topicName, consumerGroup, _cts.Token);
171+
await _messagingProvider.UnsubscribeAsync(firstSubscription, _cts.Token);
175172

176173
await Task.Delay(5000);
177174

@@ -276,7 +273,6 @@ async Task MessageHandler(string msg, CancellationToken token)
276273
messageProcessed.TrySetResult(true);
277274
}
278275

279-
// Subscribe multiple consumers in the same group
280276
await _messagingProvider.SubscribeAsync(topicName, consumerGroup, MessageHandler, _cts.Token);
281277
await _messagingProvider.SubscribeAsync(topicName, consumerGroup, MessageHandler, _cts.Token);
282278

@@ -295,10 +291,9 @@ async Task MessageHandler(string msg, CancellationToken token)
295291
Assert.Fail("No consumer received the message.");
296292
}
297293

298-
// Wait a little longer to ensure no second consumer processes the same message
299294
await Task.Delay(5000);
300295

301-
// Assert: Exactly one consumer should have received the message
296+
// Assert
302297
receivedMessages.Count.Should().Be(1,
303298
$"Expected only one consumer to receive the message, but {receivedMessages.Count} consumers received it.");
304299
}
@@ -311,7 +306,7 @@ public async Task CompetingConsumers_ShouldDistributeMessages_WhenMultipleConsum
311306
var consumerGroup = "test-consumer-group";
312307
var totalMessages = 100;
313308
var numConsumers = 2;
314-
var variancePercentage = 0.1; // Allow 10% variance
309+
var variancePercentage = 0.2;
315310
var perConsumerMessageCount = new ConcurrentDictionary<Guid, int>(); // Track messages per consumer
316311
var allMessagesProcessed = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
317312

@@ -334,7 +329,6 @@ async Task MessageHandler(string msg, CancellationToken token)
334329
await _messagingProvider.SubscribeAsync(topicName, consumerGroup, MessageHandler, _cts.Token);
335330
}
336331

337-
// Subscribe multiple consumers
338332
for (int i = 0; i < numConsumers; i++)
339333
{
340334
await CreateConsumer();

src/OpenDDD.Tests/Integration/Infrastructure/Events/RabbitMq/RabbitMqMessagingProviderTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,13 +160,13 @@ public async Task AtLeastOnceGurantee_ShouldDeliverToLateSubscriber_WhenSubscrib
160160

161161
var lateSubscriberReceived = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
162162

163-
await _messagingProvider.SubscribeAsync(_testTopic, _testConsumerGroup, async (msg, token) =>
163+
var firstSubscription = await _messagingProvider.SubscribeAsync(_testTopic, _testConsumerGroup, async (msg, token) =>
164164
{
165165
Assert.Fail("First subscription should not receive the message.");
166166
}, _cts.Token);
167167
await Task.Delay(500);
168168

169-
await _messagingProvider.UnsubscribeAsync(_testTopic, _testConsumerGroup, _cts.Token);
169+
await _messagingProvider.UnsubscribeAsync(firstSubscription, _cts.Token);
170170
await Task.Delay(500);
171171

172172
// Act

src/OpenDDD.Tests/Unit/Infrastructure/Events/InMemory/InMemoryMessagingProviderTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ await Assert.ThrowsAsync<ArgumentException>(() =>
5252
[Fact]
5353
public async Task SubscribeAsync_ShouldThrowException_WhenHandlerIsNull()
5454
{
55-
await Assert.ThrowsAsync<ArgumentNullException>(() =>
55+
await Assert.ThrowsAsync<ArgumentException>(() =>
5656
_messagingProvider.SubscribeAsync(Topic, ConsumerGroup, null!, CancellationToken.None));
5757
}
5858

src/OpenDDD.Tests/Unit/Infrastructure/Events/Kafka/KafkaMessagingProviderTests.cs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ public KafkaMessagingProviderTests()
3131
_mockConsumerLogger = new Mock<ILogger<KafkaConsumer>>();
3232

3333
_provider = new KafkaMessagingProvider(
34-
BootstrapServers,
3534
_mockAdminClient.Object,
3635
_mockProducer.Object,
3736
_mockConsumerFactory.Object,
@@ -58,22 +57,20 @@ public KafkaMessagingProviderTests()
5857
}
5958

6059
[Theory]
61-
[InlineData(null, "adminClient", "producer", "consumerFactory", "logger")]
62-
[InlineData("bootstrapServers", null, "producer", "consumerFactory", "logger")]
63-
[InlineData("bootstrapServers", "adminClient", null, "consumerFactory", "logger")]
64-
[InlineData("bootstrapServers", "adminClient", "producer", null, "logger")]
65-
[InlineData("bootstrapServers", "adminClient", "producer", "consumerFactory", null)]
60+
[InlineData(null, "producer", "consumerFactory", "logger")]
61+
[InlineData("adminClient", null, "consumerFactory", "logger")]
62+
[InlineData("adminClient", "producer", null, "logger")]
63+
[InlineData("adminClient", "producer", "consumerFactory", null)]
6664
public void Constructor_ShouldThrowException_WhenDependenciesAreNull(
67-
string? bootstrapServers, string? adminClient, string? producer, string? consumerFactory, string? logger)
65+
string? adminClient, string? producer, string? consumerFactory, string? logger)
6866
{
69-
var bs = bootstrapServers is null ? null! : BootstrapServers;
7067
var mockAdmin = adminClient is null ? null! : _mockAdminClient.Object;
7168
var mockProducer = producer is null ? null! : _mockProducer.Object;
7269
var mockConsumerFactory = consumerFactory is null ? null! : _mockConsumerFactory.Object;
7370
var mockLogger = logger is null ? null! : _mockLogger.Object;
7471

7572
Assert.Throws<ArgumentNullException>(() =>
76-
new KafkaMessagingProvider(bs, mockAdmin, mockProducer, mockConsumerFactory, true, mockLogger));
73+
new KafkaMessagingProvider(mockAdmin, mockProducer, mockConsumerFactory, true, mockLogger));
7774
}
7875

7976
[Theory]

src/OpenDDD/API/Extensions/OpenDddServiceCollectionExtensions.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,6 @@ private static void AddKafka(this IServiceCollection services)
340340
var logger = provider.GetRequiredService<ILogger<KafkaMessagingProvider>>();
341341
var consumerLogger = provider.GetRequiredService<ILogger<KafkaConsumer>>();
342342
return new KafkaMessagingProvider(
343-
kafkaOptions.BootstrapServers,
344343
new AdminClientBuilder(new AdminClientConfig { BootstrapServers = kafkaOptions.BootstrapServers, ClientId = "OpenDDD" }).Build(),
345344
new ProducerBuilder<Null, string>(new ProducerConfig { BootstrapServers = kafkaOptions.BootstrapServers, ClientId = "OpenDDD" }).Build(),
346345
new KafkaConsumerFactory(kafkaOptions.BootstrapServers, consumerLogger),

src/OpenDDD/Infrastructure/Events/Azure/AzureServiceBusMessagingProvider.cs

Lines changed: 27 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
using Microsoft.Extensions.Logging;
1+
using System.Collections.Concurrent;
2+
using Microsoft.Extensions.Logging;
3+
using OpenDDD.Infrastructure.Events.Base;
24
using Azure.Messaging.ServiceBus;
35
using Azure.Messaging.ServiceBus.Administration;
46

@@ -10,7 +12,7 @@ public class AzureServiceBusMessagingProvider : IMessagingProvider, IAsyncDispos
1012
private readonly ServiceBusAdministrationClient _adminClient;
1113
private readonly bool _autoCreateTopics;
1214
private readonly ILogger<AzureServiceBusMessagingProvider> _logger;
13-
private readonly List<ServiceBusProcessor> _processors = new();
15+
private readonly ConcurrentDictionary<string, AzureServiceBusSubscription> _subscriptions = new();
1416
private bool _disposed;
1517

1618
public AzureServiceBusMessagingProvider(
@@ -25,7 +27,7 @@ public AzureServiceBusMessagingProvider(
2527
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
2628
}
2729

28-
public async Task SubscribeAsync(string topic, string consumerGroup, Func<string, CancellationToken, Task> messageHandler, CancellationToken cancellationToken = default)
30+
public async Task<ISubscription> SubscribeAsync(string topic, string consumerGroup, Func<string, CancellationToken, Task> messageHandler, CancellationToken cancellationToken = default)
2931
{
3032
if (string.IsNullOrWhiteSpace(topic))
3133
{
@@ -59,7 +61,6 @@ public async Task SubscribeAsync(string topic, string consumerGroup, Func<string
5961
await CreateSubscriptionIfNotExistsAsync(topic, subscriptionName, cancellationToken);
6062

6163
var processor = _client.CreateProcessor(topic, subscriptionName);
62-
_processors.Add(processor);
6364

6465
processor.ProcessMessageAsync += async args =>
6566
{
@@ -73,41 +74,30 @@ public async Task SubscribeAsync(string topic, string consumerGroup, Func<string
7374
return Task.CompletedTask;
7475
};
7576

76-
_logger.LogInformation("Starting message processor for topic '{Topic}' and subscription '{Subscription}'", topic, subscriptionName);
77+
var subscription = new AzureServiceBusSubscription(topic, consumerGroup, processor);
78+
_subscriptions[subscription.Id] = subscription;
79+
80+
_logger.LogInformation("Starting message processor for topic '{Topic}' and subscription '{Subscription}', Subscription ID: {SubscriptionId}", topic, subscriptionName, subscription.Id);
7781
await processor.StartProcessingAsync(cancellationToken);
82+
83+
return subscription;
7884
}
7985

80-
public async Task UnsubscribeAsync(string topic, string consumerGroup, CancellationToken cancellationToken = default)
86+
public async Task UnsubscribeAsync(ISubscription subscription, CancellationToken cancellationToken = default)
8187
{
82-
if (string.IsNullOrWhiteSpace(topic))
83-
{
84-
throw new ArgumentException("Topic cannot be null or empty.", nameof(topic));
85-
}
88+
if (subscription == null)
89+
throw new ArgumentNullException(nameof(subscription));
8690

87-
if (string.IsNullOrWhiteSpace(consumerGroup))
91+
if (subscription is not AzureServiceBusSubscription serviceBusSubscription || !_subscriptions.TryRemove(serviceBusSubscription.Id, out _))
8892
{
89-
throw new ArgumentException("Consumer group cannot be null or empty.", nameof(consumerGroup));
93+
_logger.LogWarning("No active subscription found with ID {SubscriptionId}", subscription.Id);
94+
return;
9095
}
9196

92-
var subscriptionName = consumerGroup;
93-
94-
_logger.LogInformation("Unsubscribing from topic '{Topic}' and subscription '{Subscription}'", topic, subscriptionName);
95-
96-
var processor = _processors.FirstOrDefault(p =>
97-
p.EntityPath.Equals($"{topic}/Subscriptions/{subscriptionName}", StringComparison.OrdinalIgnoreCase));
98-
99-
if (processor != null)
100-
{
101-
_processors.Remove(processor);
102-
_logger.LogInformation("Stopping and disposing message processor for topic '{Topic}' and subscription '{Subscription}'", topic, subscriptionName);
97+
_logger.LogInformation("Unsubscribing from Azure Service Bus topic '{Topic}' and subscription '{Subscription}', Subscription ID: {SubscriptionId}", serviceBusSubscription.Topic, serviceBusSubscription.ConsumerGroup, serviceBusSubscription.Id);
10398

104-
await processor.StopProcessingAsync(cancellationToken);
105-
await processor.DisposeAsync();
106-
}
107-
else
108-
{
109-
_logger.LogWarning("No active subscription found for topic '{Topic}' and subscription '{Subscription}'", topic, subscriptionName);
110-
}
99+
await serviceBusSubscription.Consumer.StopProcessingAsync(cancellationToken);
100+
await serviceBusSubscription.DisposeAsync();
111101
}
112102

113103
public async Task PublishAsync(string topic, string message, CancellationToken cancellationToken = default)
@@ -157,14 +147,16 @@ public async ValueTask DisposeAsync()
157147

158148
_logger.LogDebug("Disposing AzureServiceBusMessagingProvider...");
159149

160-
foreach (var processor in _processors)
150+
foreach (var subscription in _subscriptions.Values)
161151
{
162-
_logger.LogDebug("Stopping message processor...");
163-
await processor.StopProcessingAsync();
164-
await processor.DisposeAsync();
152+
if (subscription.Consumer.IsProcessing)
153+
{
154+
await subscription.Consumer.StopProcessingAsync(CancellationToken.None);
155+
}
156+
await subscription.DisposeAsync();
165157
}
166158

167-
_logger.LogDebug("Disposing ServiceBusClient...");
159+
_subscriptions.Clear();
168160
await _client.DisposeAsync();
169161
}
170162
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
using OpenDDD.Infrastructure.Events.Base;
2+
using Azure.Messaging.ServiceBus;
3+
4+
namespace OpenDDD.Infrastructure.Events.Azure
5+
{
6+
public class AzureServiceBusSubscription : Subscription<ServiceBusProcessor>
7+
{
8+
public AzureServiceBusSubscription(string topic, string consumerGroup, ServiceBusProcessor processor)
9+
: base(topic, consumerGroup, processor)
10+
{
11+
12+
}
13+
14+
public override async ValueTask DisposeAsync()
15+
{
16+
if (Consumer.IsProcessing)
17+
{
18+
await Consumer.StopProcessingAsync();
19+
}
20+
await Consumer.DisposeAsync();
21+
}
22+
}
23+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
namespace OpenDDD.Infrastructure.Events.Base
2+
{
3+
public interface ISubscription : IAsyncDisposable
4+
{
5+
string Id { get; }
6+
string Topic { get; }
7+
string ConsumerGroup { get; }
8+
}
9+
}

0 commit comments

Comments
 (0)