diff --git a/RabbitMQ.Stream.Client/RawConsumer.cs b/RabbitMQ.Stream.Client/RawConsumer.cs index 35e74a17..6faad3cb 100644 --- a/RabbitMQ.Stream.Client/RawConsumer.cs +++ b/RabbitMQ.Stream.Client/RawConsumer.cs @@ -583,7 +583,7 @@ public void Dispose() } catch (Exception e) { - _logger.LogError(e, "Error during disposing of consumer: {SubscriberId}.", _subscriberId); + _logger.LogError(e, "Error during disposing of consumer: {SubscriberId}", _subscriberId); } finally { diff --git a/RabbitMQ.Stream.Client/RawProducer.cs b/RabbitMQ.Stream.Client/RawProducer.cs index f8768a24..2a145210 100644 --- a/RabbitMQ.Stream.Client/RawProducer.cs +++ b/RabbitMQ.Stream.Client/RawProducer.cs @@ -397,7 +397,7 @@ public void Dispose() } catch (Exception e) { - _logger.LogError(e, "Error during disposing Consumer: {PublisherId}.", _publisherId); + _logger.LogError(e, "Error during disposing Consumer: {PublisherId}", _publisherId); } GC.SuppressFinalize(this); diff --git a/Tests/RawConsumerSystemTests.cs b/Tests/RawConsumerSystemTests.cs index f15550cf..32c93bce 100644 --- a/Tests/RawConsumerSystemTests.cs +++ b/Tests/RawConsumerSystemTests.cs @@ -90,23 +90,23 @@ public async void ConsumerStoreOffset() var config = new StreamSystemConfig(); var system = await StreamSystem.Create(config); await system.CreateStream(new StreamSpec(stream)); - const int numberOfMessages = 10; - await SystemUtils.PublishMessages(system, stream, numberOfMessages, testOutputHelper); + const int NumberOfMessages = 10; + await SystemUtils.PublishMessages(system, stream, NumberOfMessages, testOutputHelper); var count = 0; var consumer = await system.CreateRawConsumer( new RawConsumerConfig(stream) { Reference = "consumer_offset", OffsetSpec = new OffsetTypeFirst(), - MessageHandler = async (consumer, ctx, message) => + MessageHandler = async (consumer, ctx, _) => { testOutputHelper.WriteLine($"ConsumerStoreOffset receiving.. {count}"); count++; - if (count == numberOfMessages) + if (count == NumberOfMessages) { await consumer.StoreOffset(ctx.Offset); testOutputHelper.WriteLine($"ConsumerStoreOffset done: {count}"); - testPassed.SetResult(numberOfMessages); + testPassed.SetResult(NumberOfMessages); } await Task.CompletedTask; @@ -122,7 +122,7 @@ public async void ConsumerStoreOffset() var client = await Client.Create(clientParameters); var offset = await client.QueryOffset("consumer_offset", stream); // The offset must be numberOfMessages less one - Assert.Equal(offset.Offset, Convert.ToUInt64(numberOfMessages - 1)); + Assert.Equal(offset.Offset, Convert.ToUInt64(NumberOfMessages - 1)); await consumer.Close(); await system.DeleteStream(stream); await system.Close(); @@ -456,33 +456,33 @@ public async void ConsumerQueryOffset() var config = new StreamSystemConfig(); var system = await StreamSystem.Create(config); await system.CreateStream(new StreamSpec(stream)); - const int numberOfMessages = 10; - const int numberOfMessagesToStore = 4; - await SystemUtils.PublishMessages(system, stream, numberOfMessages, testOutputHelper); + const int NumberOfMessages = 10; + const int NumberOfMessagesToStore = 4; + await SystemUtils.PublishMessages(system, stream, NumberOfMessages, testOutputHelper); var count = 0; - const string reference = "consumer_offset"; + const string Reference = "consumer_offset"; var rawConsumer = await system.CreateRawConsumer( new RawConsumerConfig(stream) { Crc32 = _crc32, - Reference = reference, + Reference = Reference, OffsetSpec = new OffsetTypeOffset(), MessageHandler = async (consumer, ctx, message) => { testOutputHelper.WriteLine($"ConsumerStoreOffset receiving.. {count}"); count++; - if (count == numberOfMessagesToStore) - { - // store the the offset after numberOfMessagesToStore messages - // so when we query the offset we should (must) have the same - // values - await consumer.StoreOffset(ctx.Offset); - testOutputHelper.WriteLine($"ConsumerStoreOffset done: {count}"); - } - - if (count == numberOfMessages) + switch (count) { - testPassed.SetResult(numberOfMessages); + case NumberOfMessagesToStore: + // store the the offset after numberOfMessagesToStore messages + // so when we query the offset we should (must) have the same + // values + await consumer.StoreOffset(ctx.Offset); + testOutputHelper.WriteLine($"ConsumerStoreOffset done: {count}"); + break; + case NumberOfMessages: + testPassed.SetResult(NumberOfMessages); + break; } await Task.CompletedTask; @@ -494,8 +494,8 @@ public async void ConsumerQueryOffset() // it may need some time to store the offset SystemUtils.Wait(); // numberOfMessagesToStore index 0 - Assert.Equal((ulong)(numberOfMessagesToStore - 1), - await system.QueryOffset(reference, stream)); + Assert.Equal((ulong)(NumberOfMessagesToStore - 1), + await system.QueryOffset(Reference, stream)); // this has to raise OffsetNotFoundException in case the offset // does not exist like in this case. @@ -696,5 +696,41 @@ public async void ProducerConsumerMixingDifferentSendTypesCompressAndStandard() await system.DeleteStream(stream); await system.Close(); } + + [Fact] + public async void ShouldConsumeFromDateTimeOffset() + { + // validate the consumer can start from a specific time + // this test is not deterministic because it depends on the + // time the test is executed. + // but at least we can validate the consumer can start from a specific time less 100 ms + // and it has to receive all the messages + // not 100% perfect but it is better than nothing + + SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); + var before = DateTimeOffset.Now.AddMilliseconds(-100); + await SystemUtils.PublishMessages(system, stream, 100, testOutputHelper); + var testPassed = new TaskCompletionSource(); + + var consumer = await system.CreateRawConsumer( + new RawConsumerConfig(stream) + { + Reference = "consumer", + OffsetSpec = new OffsetTypeTimestamp(before), + MessageHandler = async (_, ctx, _) => + { + Assert.True(ctx.Timestamp >= before.Offset); + if (ctx.Offset == 99) + { + testPassed.SetResult(true); + } + + await Task.CompletedTask; + } + }); + new Utils(testOutputHelper).WaitUntilTaskCompletes(testPassed); + await consumer.Close().ConfigureAwait(false); + await SystemUtils.CleanUpStreamSystem(system, stream); + } } } diff --git a/Tests/ReliableTests.cs b/Tests/ReliableTests.cs index e63e2904..a2cc1d1c 100644 --- a/Tests/ReliableTests.cs +++ b/Tests/ReliableTests.cs @@ -80,8 +80,8 @@ public void MessageConfirmationShouldHaveTheSameMessages() var message = new Message(Encoding.UTF8.GetBytes($"hello")); confirmationPipe.AddUnConfirmedMessage(1, message); confirmationPipe.AddUnConfirmedMessage(2, new List() { message }); - confirmationPipe.RemoveUnConfirmedMessage(ConfirmationStatus.Confirmed, 1, null); - confirmationPipe.RemoveUnConfirmedMessage(ConfirmationStatus.Confirmed, 2, null); + confirmationPipe.RemoveUnConfirmedMessage(ConfirmationStatus.Confirmed, 1, null).ConfigureAwait(false); + confirmationPipe.RemoveUnConfirmedMessage(ConfirmationStatus.Confirmed, 2, null).ConfigureAwait(false); new Utils>(_testOutputHelper).WaitUntilTaskCompletes(confirmationTask); Assert.Equal(ConfirmationStatus.Confirmed, confirmationTask.Task.Result[0].Status); Assert.Equal(ConfirmationStatus.Confirmed, confirmationTask.Task.Result[1].Status);