From 1cd841a8cb22c5eca10b979dc302b9d22b5a98de Mon Sep 17 00:00:00 2001 From: Joseph Cummings Date: Fri, 5 Apr 2024 14:48:23 +0100 Subject: [PATCH] Introduce support for distributed tracing and Open Telemetry --- EventStore.Client.sln | 7 + README.md | 8 + src/Directory.Build.props | 6 +- src/EventStore.Client.Common/Constants.cs | 10 +- ...ore.Client.Extensions.OpenTelemetry.csproj | 20 + .../TracerProviderBuilderExtensions.cs | 19 + .../EventStoreClient.Append.cs | 346 +++++++++++------- .../EventStoreClient.Metadata.cs | 2 +- .../EventStoreClient.Subscriptions.cs | 210 +++++++---- .../EventStoreClient.cs | 79 ++-- .../StreamSubscription.cs | 69 ++-- .../Diagnostics/ActivityExtensions.cs | 45 +++ .../Diagnostics/ActivityStatus.cs | 11 + .../ActivityTagsCollectionExtensions.cs | 66 ++++ .../Diagnostics/EventMetadataExtensions.cs | 83 +++++ .../EventStoreClientDiagnostics.cs | 45 +++ .../EventStoreClientInstrumentation.cs | 5 + .../Telemetry/TelemetryAttributes.cs | 29 ++ .../Diagnostics/Tracing/TracingConstants.cs | 13 + .../Diagnostics/Tracing/TracingMetadata.cs | 5 + .../EventStore.Client.csproj | 1 + src/EventStore.Client/EventStoreClientBase.cs | 68 ++-- .../Append/append_to_stream.cs | 65 ++-- .../Subscriptions/subscribe_to_all.cs | 127 +++++-- .../Subscriptions/subscribe_to_stream.cs | 37 +- .../Fixtures/EventStoreFixture.Helpers.cs | 8 + .../Fixtures/EventStoreFixture.cs | 66 ++-- .../Fixtures/EventStoreTestNode.cs | 2 +- .../GlobalEnvironment.cs | 4 +- .../Diagnostics/DiagnosticsFixture.cs | 25 ++ .../TracingInstrumentationTests.cs | 113 ++++++ 31 files changed, 1179 insertions(+), 415 deletions(-) create mode 100644 src/EventStore.Client.Extensions.OpenTelemetry/EventStore.Client.Extensions.OpenTelemetry.csproj create mode 100644 src/EventStore.Client.Extensions.OpenTelemetry/TracerProviderBuilderExtensions.cs create mode 100644 src/EventStore.Client/Diagnostics/ActivityExtensions.cs create mode 100644 src/EventStore.Client/Diagnostics/ActivityStatus.cs create mode 100644 src/EventStore.Client/Diagnostics/ActivityTagsCollectionExtensions.cs create mode 100644 src/EventStore.Client/Diagnostics/EventMetadataExtensions.cs create mode 100644 src/EventStore.Client/Diagnostics/EventStoreClientDiagnostics.cs create mode 100644 src/EventStore.Client/Diagnostics/EventStoreClientInstrumentation.cs create mode 100644 src/EventStore.Client/Diagnostics/Telemetry/TelemetryAttributes.cs create mode 100644 src/EventStore.Client/Diagnostics/Tracing/TracingConstants.cs create mode 100644 src/EventStore.Client/Diagnostics/Tracing/TracingMetadata.cs create mode 100644 test/EventStore.Client.Tests/Diagnostics/DiagnosticsFixture.cs create mode 100644 test/EventStore.Client.Tests/Diagnostics/TracingInstrumentationTests.cs diff --git a/EventStore.Client.sln b/EventStore.Client.sln index 51229f72c..84919f42b 100644 --- a/EventStore.Client.sln +++ b/EventStore.Client.sln @@ -33,6 +33,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventStore.Client.UserManag EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventStore.Client.Tests.Common", "test\EventStore.Client.Tests.Common\EventStore.Client.Tests.Common.csproj", "{E326832D-DE52-4DE4-9E54-C800908B75F3}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventStore.Client.Extensions.OpenTelemetry", "src\EventStore.Client.Extensions.OpenTelemetry\EventStore.Client.Extensions.OpenTelemetry.csproj", "{3723933C-585A-49BE-98E8-52D3FAD904CE}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|x64 = Debug|x64 @@ -94,6 +96,10 @@ Global {E326832D-DE52-4DE4-9E54-C800908B75F3}.Debug|x64.Build.0 = Debug|Any CPU {E326832D-DE52-4DE4-9E54-C800908B75F3}.Release|x64.ActiveCfg = Release|Any CPU {E326832D-DE52-4DE4-9E54-C800908B75F3}.Release|x64.Build.0 = Release|Any CPU + {3723933C-585A-49BE-98E8-52D3FAD904CE}.Debug|x64.ActiveCfg = Debug|Any CPU + {3723933C-585A-49BE-98E8-52D3FAD904CE}.Debug|x64.Build.0 = Debug|Any CPU + {3723933C-585A-49BE-98E8-52D3FAD904CE}.Release|x64.ActiveCfg = Release|Any CPU + {3723933C-585A-49BE-98E8-52D3FAD904CE}.Release|x64.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(NestedProjects) = preSolution {D3744A86-DD35-4104-AAEE-84B79062C4A2} = {EA59C1CB-16DA-4F68-AF8A-642A969B4CF8} @@ -109,5 +115,6 @@ Global {6CEB731F-72E1-461F-A6B3-54DBF3FD786C} = {C51F2C69-45A9-4D0D-A708-4FC319D5D340} {22634CEE-4F7B-4679-A48D-38A2A8580ECA} = {C51F2C69-45A9-4D0D-A708-4FC319D5D340} {E326832D-DE52-4DE4-9E54-C800908B75F3} = {C51F2C69-45A9-4D0D-A708-4FC319D5D340} + {3723933C-585A-49BE-98E8-52D3FAD904CE} = {EA59C1CB-16DA-4F68-AF8A-642A969B4CF8} EndGlobalSection EndGlobal diff --git a/README.md b/README.md index 37657a3df..0e4c39125 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,14 @@ Reference the nuget package(s) for the API that you would like to call [User Management](https://www.nuget.org/packages/EventStore.Client.Grpc.UserManagement) +## Open Telemetry + +Telemetry instrumentation can be enabled by installing the [Open Telemetry Extensions](https://www.nuget.org/packages/EventStore.Client.Extensions.OpenTelemetry) package. + +Once installed you can configure instrumentation using the `AddEventStoreClientInstrumentation` extension method on a `TracerProviderBuilder`. + +Tracing is the only telemetry currently exported, specifically for the `Append` and `Subscribe` (Catchup and Persistent) operations. + ## Support Information on support and commercial tools such as LDAP authentication can be found here: [Event Store Support](https://eventstore.com/support/). diff --git a/src/Directory.Build.props b/src/Directory.Build.props index c12d9449c..609de534e 100644 --- a/src/Directory.Build.props +++ b/src/Directory.Build.props @@ -4,7 +4,7 @@ EventStore.Client - + $(MSBuildProjectName.Remove(0,18)) $(ESPackageIdSuffix.ToLower()).proto ../EventStore.Client.Common/protos/$(ESProto) @@ -50,6 +50,10 @@ + + + + <_Parameter1>$(ProjectName).Tests diff --git a/src/EventStore.Client.Common/Constants.cs b/src/EventStore.Client.Common/Constants.cs index 3e0279e6b..a088bcab2 100644 --- a/src/EventStore.Client.Common/Constants.cs +++ b/src/EventStore.Client.Common/Constants.cs @@ -39,10 +39,10 @@ public static class Exceptions { } public static class Metadata { - public const string Type = "type"; - public const string Created = "created"; - public const string ContentType = "content-type"; - public static readonly string[] RequiredMetadata = { Type, ContentType }; + public const string Type = "type"; + public const string Created = "created"; + public const string ContentType = "content-type"; + public static readonly string[] RequiredMetadata = { Type, ContentType }; public static class ContentTypes { public const string ApplicationJson = "application/json"; @@ -58,4 +58,4 @@ public static class Headers { public const string ConnectionName = "connection-name"; public const string RequiresLeader = "requires-leader"; } -} \ No newline at end of file +} diff --git a/src/EventStore.Client.Extensions.OpenTelemetry/EventStore.Client.Extensions.OpenTelemetry.csproj b/src/EventStore.Client.Extensions.OpenTelemetry/EventStore.Client.Extensions.OpenTelemetry.csproj new file mode 100644 index 000000000..2783f0adc --- /dev/null +++ b/src/EventStore.Client.Extensions.OpenTelemetry/EventStore.Client.Extensions.OpenTelemetry.csproj @@ -0,0 +1,20 @@ + + + + EventStore.Client.Extensions.OpenTelemetry + + + + EventStore.Client.Extensions.OpenTelemetry + Extensions used to facilitate instrumentation of the EventStore Client. + + + + + + + + + + + diff --git a/src/EventStore.Client.Extensions.OpenTelemetry/TracerProviderBuilderExtensions.cs b/src/EventStore.Client.Extensions.OpenTelemetry/TracerProviderBuilderExtensions.cs new file mode 100644 index 000000000..6ee1c9edf --- /dev/null +++ b/src/EventStore.Client.Extensions.OpenTelemetry/TracerProviderBuilderExtensions.cs @@ -0,0 +1,19 @@ +using EventStore.Client.Diagnostics; +using JetBrains.Annotations; +using OpenTelemetry.Trace; + +namespace EventStore.Client.Extensions.OpenTelemetry; + +/// +/// Extension methods used to facilitate instrumentation of the EventStore Client. +/// +[PublicAPI] +public static class TracerProviderBuilderExtensions { + /// + /// Enables instrumentation of the EventStore .NET Client on the OpenTelemetry TracerProvider. + /// + /// being configured. + /// The instance of to chain configuration. + public static TracerProviderBuilder AddEventStoreClientInstrumentation(this TracerProviderBuilder builder) + => builder.AddSource(EventStoreClientInstrumentation.ActivitySourceName); +} diff --git a/src/EventStore.Client.Streams/EventStoreClient.Append.cs b/src/EventStore.Client.Streams/EventStoreClient.Append.cs index 9d0842855..b7341769c 100644 --- a/src/EventStore.Client.Streams/EventStoreClient.Append.cs +++ b/src/EventStore.Client.Streams/EventStoreClient.Append.cs @@ -1,14 +1,13 @@ -using System; using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; +using System.Diagnostics; using System.Threading.Channels; using Google.Protobuf; using EventStore.Client.Streams; using Grpc.Core; using Microsoft.Extensions.Logging; -using System.Runtime.CompilerServices; +using EventStore.Client.Diagnostics; +using EventStore.Client.Diagnostics.Telemetry; +using EventStore.Client.Diagnostics.Tracing; namespace EventStore.Client { public partial class EventStoreClient { @@ -30,24 +29,30 @@ public async Task AppendToStreamAsync( Action? configureOperationOptions = null, TimeSpan? deadline = null, UserCredentials? userCredentials = null, - CancellationToken cancellationToken = default) { + CancellationToken cancellationToken = default + ) { var options = Settings.OperationOptions.Clone(); configureOperationOptions?.Invoke(options); _log.LogDebug("Append to stream - {streamName}@{expectedRevision}.", streamName, expectedRevision); - var batchAppender = _streamAppender; var task = - userCredentials == null && await batchAppender.IsUsable().ConfigureAwait(false) - ? batchAppender.Append(streamName, expectedRevision, eventData, deadline, cancellationToken) + userCredentials == null && await _batchAppender.IsUsable().ConfigureAwait(false) + ? _batchAppender.Append(streamName, expectedRevision, eventData, deadline, cancellationToken) : AppendToStreamInternal( - (await GetChannelInfo(cancellationToken).ConfigureAwait(false)).CallInvoker, + await GetChannelInfo(cancellationToken).ConfigureAwait(false), new AppendReq { Options = new AppendReq.Types.Options { StreamIdentifier = streamName, Revision = expectedRevision } - }, eventData, options, deadline, userCredentials, cancellationToken); + }, + eventData, + options, + deadline, + userCredentials, + cancellationToken + ); return (await task.ConfigureAwait(false)).OptionallyThrowWrongExpectedVersionException(options); } @@ -70,29 +75,35 @@ public async Task AppendToStreamAsync( Action? configureOperationOptions = null, TimeSpan? deadline = null, UserCredentials? userCredentials = null, - CancellationToken cancellationToken = default) { + CancellationToken cancellationToken = default + ) { var operationOptions = Settings.OperationOptions.Clone(); configureOperationOptions?.Invoke(operationOptions); - _log.LogDebug("Append to stream - {streamName}@{expectedRevision}.", streamName, expectedState); + _log.LogDebug("Append to stream - {streamName}@{expectedState}.", streamName, expectedState); - var batchAppender = _streamAppender; var task = - userCredentials == null && await batchAppender.IsUsable().ConfigureAwait(false) - ? batchAppender.Append(streamName, expectedState, eventData, deadline, cancellationToken) + userCredentials == null && await _batchAppender.IsUsable().ConfigureAwait(false) + ? _batchAppender.Append(streamName, expectedState, eventData, deadline, cancellationToken) : AppendToStreamInternal( - (await GetChannelInfo(cancellationToken).ConfigureAwait(false)).CallInvoker, + await GetChannelInfo(cancellationToken).ConfigureAwait(false), new AppendReq { Options = new AppendReq.Types.Options { StreamIdentifier = streamName } - }.WithAnyStreamRevision(expectedState), eventData, operationOptions, deadline, userCredentials, - cancellationToken); + }.WithAnyStreamRevision(expectedState), + eventData, + operationOptions, + deadline, + userCredentials, + cancellationToken + ); + return (await task.ConfigureAwait(false)).OptionallyThrowWrongExpectedVersionException(operationOptions); } - private async ValueTask AppendToStreamInternal( - CallInvoker callInvoker, + private ValueTask AppendToStreamInternal( + ChannelInfo channelInfo, AppendReq header, IEnumerable eventData, EventStoreClientOperationOptions operationOptions, @@ -100,39 +111,54 @@ private async ValueTask AppendToStreamInternal( UserCredentials? userCredentials, CancellationToken cancellationToken ) { - using var call = new Streams.Streams.StreamsClient(callInvoker).Append( - EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken) + return EventStoreClientDiagnostics.Trace( + Operation, + TracingConstants.Operations.Append, + new ActivityTagsCollection { + { + TelemetryAttributes.EventStoreStream, + header.Options.StreamIdentifier.StreamName.ToStringUtf8() + } + } + .WithTagsFrom(channelInfo, Settings) + .WithTagsFrom(userCredentials) ); - await call.RequestStream.WriteAsync(header).ConfigureAwait(false); + async ValueTask Operation() { + using var call = new Streams.Streams.StreamsClient(channelInfo.CallInvoker).Append( + EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken) + ); + + await call.RequestStream.WriteAsync(header).ConfigureAwait(false); - foreach (var e in eventData) { - await call.RequestStream.WriteAsync( - new AppendReq { + foreach (var e in eventData) { + var appendReq = new AppendReq { ProposedMessage = new AppendReq.Types.ProposedMessage { Id = e.EventId.ToDto(), Data = ByteString.CopyFrom(e.Data.Span), - CustomMetadata = ByteString.CopyFrom(e.Metadata.Span), + CustomMetadata = ByteString.CopyFrom(e.Metadata.InjectTracingMetadata()), Metadata = { { Constants.Metadata.Type, e.Type }, { Constants.Metadata.ContentType, e.ContentType } } - }, - } - ).ConfigureAwait(false); - } + } + }; - await call.RequestStream.CompleteAsync().ConfigureAwait(false); + await call.RequestStream.WriteAsync(appendReq).ConfigureAwait(false); + } - var response = await call.ResponseAsync.ConfigureAwait(false); + await call.RequestStream.CompleteAsync().ConfigureAwait(false); - if (response.Success != null) - return HandleSuccessAppend(response, header); + var response = await call.ResponseAsync.ConfigureAwait(false); - if (response.WrongExpectedVersion == null) - throw new InvalidOperationException("The operation completed with an unexpected result."); + if (response.Success != null) + return HandleSuccessAppend(response, header); - return HandleWrongExpectedRevision(response, header, operationOptions); + if (response.WrongExpectedVersion == null) + throw new InvalidOperationException("The operation completed with an unexpected result."); + + return HandleWrongExpectedRevision(response, header, operationOptions); + } } private IWriteResult HandleSuccessAppend(AppendResp response, AppendReq header) { @@ -150,7 +176,8 @@ private IWriteResult HandleSuccessAppend(AppendResp response, AppendReq header) "Append to stream succeeded - {streamName}@{logPosition}/{nextExpectedVersion}.", header.Options.StreamIdentifier, position, - currentRevision); + currentRevision + ); return new SuccessResult(currentRevision, position); } @@ -224,140 +251,187 @@ private class StreamAppender : IDisposable { private readonly Action _onException; private readonly Channel _channel; private readonly ConcurrentDictionary> _pendingRequests; + private readonly TaskCompletionSource _isUsable; - private readonly Task?> _callTask; + private ChannelInfo? _channelInfo; - public StreamAppender(EventStoreClientSettings settings, - Task?> callTask, CancellationToken cancellationToken, - Action onException) { + public StreamAppender( + EventStoreClientSettings settings, + ValueTask channelInfoTask, + CancellationToken cancellationToken, + Action onException + ) { _settings = settings; - _callTask = callTask; _cancellationToken = cancellationToken; _onException = onException; - _channel = System.Threading.Channels.Channel.CreateBounded(10000); + _channel = Channel.CreateBounded(10000); _pendingRequests = new ConcurrentDictionary>(); - _ = Task.Factory.StartNew(Send); - _ = Task.Factory.StartNew(Receive); + _isUsable = new TaskCompletionSource(); + + _ = Task.Run(() => Duplex(channelInfoTask)); } - public ValueTask Append(string streamName, StreamRevision expectedStreamPosition, - IEnumerable events, TimeSpan? timeoutAfter, CancellationToken cancellationToken = default) => - AppendInternal(BatchAppendReq.Types.Options.Create(streamName, expectedStreamPosition, timeoutAfter), - events, cancellationToken); + public ValueTask Append( + string streamName, StreamRevision expectedStreamPosition, + IEnumerable events, TimeSpan? timeoutAfter, CancellationToken cancellationToken = default + ) => + AppendInternal( + BatchAppendReq.Types.Options.Create(streamName, expectedStreamPosition, timeoutAfter), + events, + cancellationToken + ); - public ValueTask Append(string streamName, StreamState expectedStreamState, - IEnumerable events, TimeSpan? timeoutAfter, CancellationToken cancellationToken = default) => - AppendInternal(BatchAppendReq.Types.Options.Create(streamName, expectedStreamState, timeoutAfter), - events, cancellationToken); + public ValueTask Append( + string streamName, StreamState expectedStreamState, + IEnumerable events, TimeSpan? timeoutAfter, CancellationToken cancellationToken = default + ) => + AppendInternal( + BatchAppendReq.Types.Options.Create(streamName, expectedStreamState, timeoutAfter), + events, + cancellationToken + ); - public async ValueTask IsUsable() { - var call = await _callTask.ConfigureAwait(false); - return call != null; - } + public Task IsUsable() => _isUsable.Task; + + private ValueTask AppendInternal( + BatchAppendReq.Types.Options options, + IEnumerable events, CancellationToken cancellationToken + ) { + return EventStoreClientDiagnostics.Trace( + Operation, + TracingConstants.Operations.Append, + new ActivityTagsCollection { + { TelemetryAttributes.EventStoreStream, options.StreamIdentifier.StreamName.ToStringUtf8() }, + { TelemetryAttributes.ServerAddress, _channelInfo?.Channel.Target } + } + .WithTagsFrom(_channelInfo, _settings) + ); - private async Task Receive() { - try { - var call = await _callTask.ConfigureAwait(false); - if (call is null) { - _channel.Writer.TryComplete( - new NotSupportedException("Server does not support batch append")); - return; - } + async ValueTask Operation() { + var correlationId = Uuid.NewUuid(); - await foreach (var response in call.ResponseStream.ReadAllAsync(_cancellationToken) - .ConfigureAwait(false)) { - if (!_pendingRequests.TryRemove(Uuid.FromDto(response.CorrelationId), out var writeResult)) { - continue; // TODO: Log? - } + var complete = _pendingRequests.GetOrAdd(correlationId, new TaskCompletionSource()); - try { - writeResult.TrySetResult(response.ToWriteResult()); - } catch (Exception ex) { - writeResult.TrySetException(ex); + try { + foreach (var appendRequest in GetRequests(events, options, correlationId)) { + await _channel.Writer.WriteAsync(appendRequest, cancellationToken).ConfigureAwait(false); } + } catch (ChannelClosedException ex) { + // channel is closed, our tcs won't necessarily get completed, don't wait for it. + throw ex.InnerException ?? ex; } - } catch (Exception ex) { - // signal that no tcs added to _pendingRequests after this point will necessarily complete - _channel.Writer.TryComplete(ex); - - // complete whatever tcs's we have - _onException(ex); - foreach (var request in _pendingRequests) { - request.Value.TrySetException(ex); - } + + return await complete.Task.ConfigureAwait(false); } } - private async Task Send() { - var call = await _callTask.ConfigureAwait(false); - if (call is null) - throw new NotSupportedException("Server does not support batch append"); - - await foreach (var appendRequest in _channel.Reader.ReadAllAsync(_cancellationToken) - .ConfigureAwait(false)) { - await call.RequestStream.WriteAsync(appendRequest).ConfigureAwait(false); + private async Task Duplex( + ValueTask channelInfoTask + ) { + _channelInfo = await channelInfoTask.ConfigureAwait(false); + if (!_channelInfo.ServerCapabilities.SupportsBatchAppend) { + _channel.Writer.TryComplete(new NotSupportedException("Server does not support batch append")); + _isUsable.TrySetResult(false); + return; } - await call.RequestStream.CompleteAsync().ConfigureAwait(false); - } + var call = new Streams.Streams.StreamsClient(_channelInfo.CallInvoker).BatchAppend( + EventStoreCallOptions.CreateStreaming( + _settings, + userCredentials: _settings.DefaultCredentials, + cancellationToken: _cancellationToken + ) + ); + + _ = Task.Run(() => Send(call)); + _ = Task.Run(() => Receive(call)); - private async ValueTask AppendInternal(BatchAppendReq.Types.Options options, - IEnumerable events, CancellationToken cancellationToken) { - var batchSize = 0; - var correlationId = Uuid.NewUuid(); - var correlationIdDto = correlationId.ToDto(); + _isUsable.TrySetResult(true); - var complete = _pendingRequests.GetOrAdd(correlationId, new TaskCompletionSource()); + return; - try { - foreach (var appendRequest in GetRequests()) { - await _channel.Writer.WriteAsync(appendRequest, cancellationToken).ConfigureAwait(false); + async Task Send(AsyncDuplexStreamingCall call) { + await foreach (var appendRequest in _channel.Reader.ReadAllAsync(_cancellationToken) + .ConfigureAwait(false)) { + await call.RequestStream.WriteAsync(appendRequest).ConfigureAwait(false); } - } catch (ChannelClosedException ex) { - // channel is closed, our tcs won't necessarily get completed, don't wait for it. - throw ex.InnerException ?? ex; - } - return await complete.Task.ConfigureAwait(false); + await call.RequestStream.CompleteAsync().ConfigureAwait(false); + } - IEnumerable GetRequests() { - bool first = true; - var proposedMessages = new List(); - foreach (var @event in events) { - var proposedMessage = new BatchAppendReq.Types.ProposedMessage { - Data = ByteString.CopyFrom(@event.Data.Span), - CustomMetadata = ByteString.CopyFrom(@event.Metadata.Span), - Id = @event.EventId.ToDto(), - Metadata = { - {Constants.Metadata.Type, @event.Type}, - {Constants.Metadata.ContentType, @event.ContentType} + async Task Receive(AsyncDuplexStreamingCall call) { + try { + await foreach (var response in call.ResponseStream.ReadAllAsync(_cancellationToken) + .ConfigureAwait(false)) { + if (!_pendingRequests.TryRemove( + Uuid.FromDto(response.CorrelationId), + out var writeResult + )) { + continue; // TODO: Log? } - }; - proposedMessages.Add(proposedMessage); + try { + writeResult.TrySetResult(response.ToWriteResult()); + } catch (Exception ex) { + writeResult.TrySetException(ex); + } + } + } catch (Exception ex) { + // signal that no tcs added to _pendingRequests after this point will necessarily complete + _channel.Writer.TryComplete(ex); + + // complete whatever tcs's we have + _onException(ex); + foreach (var request in _pendingRequests) { + request.Value.TrySetException(ex); + } + } + } + } - if ((batchSize += proposedMessage.CalculateSize()) < - _settings.OperationOptions.BatchAppendSize) { - continue; + private IEnumerable GetRequests( + IEnumerable events, BatchAppendReq.Types.Options options, Uuid correlationId + ) { + var batchSize = 0; + bool first = true; + var correlationIdDto = correlationId.ToDto(); + var proposedMessages = new List(); + + foreach (var @event in events) { + var proposedMessage = new BatchAppendReq.Types.ProposedMessage { + Data = ByteString.CopyFrom(@event.Data.Span), + CustomMetadata = ByteString.CopyFrom(@event.Metadata.InjectTracingMetadata()), + Id = @event.EventId.ToDto(), + Metadata = { + { Constants.Metadata.Type, @event.Type }, + { Constants.Metadata.ContentType, @event.ContentType } } + }; + + proposedMessages.Add(proposedMessage); - yield return new BatchAppendReq { - ProposedMessages = {proposedMessages}, - CorrelationId = correlationIdDto, - Options = first ? options : null - }; - first = false; - proposedMessages.Clear(); - batchSize = 0; + if ((batchSize += proposedMessage.CalculateSize()) < + _settings.OperationOptions.BatchAppendSize) { + continue; } yield return new BatchAppendReq { - ProposedMessages = {proposedMessages}, - IsFinal = true, + ProposedMessages = { proposedMessages }, CorrelationId = correlationIdDto, Options = first ? options : null }; + + first = false; + proposedMessages.Clear(); + batchSize = 0; } + + yield return new BatchAppendReq { + ProposedMessages = { proposedMessages }, + IsFinal = true, + CorrelationId = correlationIdDto, + Options = first ? options : null + }; } public void Dispose() { diff --git a/src/EventStore.Client.Streams/EventStoreClient.Metadata.cs b/src/EventStore.Client.Streams/EventStoreClient.Metadata.cs index 6581bd94b..19de629e7 100644 --- a/src/EventStore.Client.Streams/EventStoreClient.Metadata.cs +++ b/src/EventStore.Client.Streams/EventStoreClient.Metadata.cs @@ -97,7 +97,7 @@ private async Task SetStreamMetadataInternal(StreamMetadata metada CancellationToken cancellationToken) { var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false); - return await AppendToStreamInternal(channelInfo.CallInvoker, appendReq, new[] { + return await AppendToStreamInternal(channelInfo, appendReq, new[] { new EventData(Uuid.NewUuid(), SystemEventTypes.StreamMetadata, JsonSerializer.SerializeToUtf8Bytes(metadata, StreamMetadataJsonSerializerOptions)), }, operationOptions, deadline, userCredentials, cancellationToken).ConfigureAwait(false); diff --git a/src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs b/src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs index f82bd5d07..4b47b57b4 100644 --- a/src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs +++ b/src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs @@ -1,4 +1,8 @@ +using System.Diagnostics; using System.Threading.Channels; +using EventStore.Client.Diagnostics; +using EventStore.Client.Diagnostics.Telemetry; +using EventStore.Client.Diagnostics.Tracing; using EventStore.Client.Streams; using Grpc.Core; using static EventStore.Client.Streams.ReadResp.ContentOneofCase; @@ -24,10 +28,15 @@ public Task SubscribeToAllAsync( Action? subscriptionDropped = default, SubscriptionFilterOptions? filterOptions = null, UserCredentials? userCredentials = null, - CancellationToken cancellationToken = default) => StreamSubscription.Confirm( + CancellationToken cancellationToken = default + ) => StreamSubscription.Confirm( SubscribeToAll(start, resolveLinkTos, filterOptions, userCredentials, cancellationToken), - eventAppeared, subscriptionDropped, _log, filterOptions?.CheckpointReached, - cancellationToken: cancellationToken); + eventAppeared, + subscriptionDropped, + _log, + filterOptions?.CheckpointReached, + cancellationToken: cancellationToken + ); /// /// Subscribes to all events. @@ -43,19 +52,23 @@ public StreamSubscriptionResult SubscribeToAll( bool resolveLinkTos = false, SubscriptionFilterOptions? filterOptions = null, UserCredentials? userCredentials = null, - CancellationToken cancellationToken = default) => new(async _ => { - var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false); - return channelInfo.CallInvoker; - }, new ReadReq { - Options = new ReadReq.Types.Options { - ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards, - ResolveLinks = resolveLinkTos, - All = ReadReq.Types.Options.Types.AllOptions.FromSubscriptionPosition(start), - Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions(), - Filter = GetFilterOptions(filterOptions)!, - UuidOption = new() { Structured = new() } - } - }, Settings, userCredentials, cancellationToken); + CancellationToken cancellationToken = default + ) => new( + async _ => await GetChannelInfo(cancellationToken).ConfigureAwait(false), + new ReadReq { + Options = new ReadReq.Types.Options { + ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards, + ResolveLinks = resolveLinkTos, + All = ReadReq.Types.Options.Types.AllOptions.FromSubscriptionPosition(start), + Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions(), + Filter = GetFilterOptions(filterOptions)!, + UuidOption = new() { Structured = new() } + } + }, + Settings, + userCredentials, + cancellationToken + ); /// /// Subscribes to a stream from a checkpoint. @@ -69,15 +82,21 @@ public StreamSubscriptionResult SubscribeToAll( /// The optional . /// [Obsolete("SubscribeToStreamAsync is no longer supported. Use SubscribeToStream instead.", false)] - public Task SubscribeToStreamAsync(string streamName, - FromStream start, - Func eventAppeared, - bool resolveLinkTos = false, - Action? subscriptionDropped = default, - UserCredentials? userCredentials = null, - CancellationToken cancellationToken = default) => StreamSubscription.Confirm( + public Task SubscribeToStreamAsync( + string streamName, + FromStream start, + Func eventAppeared, + bool resolveLinkTos = false, + Action? subscriptionDropped = default, + UserCredentials? userCredentials = null, + CancellationToken cancellationToken = default + ) => StreamSubscription.Confirm( SubscribeToStream(streamName, start, resolveLinkTos, userCredentials, cancellationToken), - eventAppeared, subscriptionDropped, _log, cancellationToken: cancellationToken); + eventAppeared, + subscriptionDropped, + _log, + cancellationToken: cancellationToken + ); /// /// Subscribes to a stream from a checkpoint. @@ -93,28 +112,33 @@ public StreamSubscriptionResult SubscribeToStream( FromStream start, bool resolveLinkTos = false, UserCredentials? userCredentials = null, - CancellationToken cancellationToken = default) => new(async _ => { - var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false); - return channelInfo.CallInvoker; - }, new ReadReq { - Options = new ReadReq.Types.Options { - ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards, - ResolveLinks = resolveLinkTos, - Stream = ReadReq.Types.Options.Types.StreamOptions.FromSubscriptionPosition(streamName, start), - Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions(), - UuidOption = new() { Structured = new() } - } - }, Settings, userCredentials, cancellationToken); + CancellationToken cancellationToken = default + ) => new( + async _ => await GetChannelInfo(cancellationToken).ConfigureAwait(false), + new ReadReq { + Options = new ReadReq.Types.Options { + ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards, + ResolveLinks = resolveLinkTos, + Stream = ReadReq.Types.Options.Types.StreamOptions.FromSubscriptionPosition(streamName, start), + Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions(), + UuidOption = new() { Structured = new() } + } + }, + Settings, + userCredentials, + cancellationToken + ); /// /// A class that represents the result of a subscription operation. You may either enumerate this instance directly or . Do not enumerate more than once. /// public class StreamSubscriptionResult : IAsyncEnumerable, IAsyncDisposable, IDisposable { - private readonly ReadReq _request; - private readonly Channel _channel; - private readonly CancellationTokenSource _cts; - private readonly CallOptions _callOptions; - private AsyncServerStreamingCall? _call; + private readonly ReadReq _request; + private readonly Channel _channel; + private readonly CancellationTokenSource _cts; + private readonly CallOptions _callOptions; + private readonly EventStoreClientSettings _settings; + private AsyncServerStreamingCall? _call; private int _messagesEnumerated; @@ -150,12 +174,18 @@ async IAsyncEnumerable GetMessages() { } } - internal StreamSubscriptionResult(Func> selectCallInvoker, + internal StreamSubscriptionResult( + Func> selectChannelInfo, ReadReq request, EventStoreClientSettings settings, UserCredentials? userCredentials, - CancellationToken cancellationToken) { - _request = request; - _callOptions = EventStoreCallOptions.CreateStreaming(settings, userCredentials: userCredentials, - cancellationToken: cancellationToken); + CancellationToken cancellationToken + ) { + _request = request; + _settings = settings; + _callOptions = EventStoreCallOptions.CreateStreaming( + settings, + userCredentials: userCredentials, + cancellationToken: cancellationToken + ); _channel = Channel.CreateBounded(ReadBoundedChannelOptions); @@ -171,29 +201,72 @@ internal StreamSubscriptionResult(Func> sel async Task PumpMessages() { try { - var callInvoker = await selectCallInvoker(_cts.Token).ConfigureAwait(false); - var client = new Streams.Streams.StreamsClient(callInvoker); + var channelInfo = await selectChannelInfo(_cts.Token).ConfigureAwait(false); + var client = new Streams.Streams.StreamsClient(channelInfo.CallInvoker); _call = client.Read(_request, _callOptions); await foreach (var response in _call.ResponseStream.ReadAllAsync(_cts.Token) .ConfigureAwait(false)) { - await _channel.Writer.WriteAsync(response.ContentCase switch { - Confirmation => new StreamMessage.SubscriptionConfirmation( - response.Confirmation.SubscriptionId), - Event => new StreamMessage.Event(ConvertToResolvedEvent(response.Event)), - FirstStreamPosition => new StreamMessage.FirstStreamPosition( - new StreamPosition(response.FirstStreamPosition)), - LastStreamPosition => new StreamMessage.LastStreamPosition( - new StreamPosition(response.LastStreamPosition)), - LastAllStreamPosition => new StreamMessage.LastAllStreamPosition( - new Position(response.LastAllStreamPosition.CommitPosition, - response.LastAllStreamPosition.PreparePosition)), - Checkpoint => new StreamMessage.AllStreamCheckpointReached( - new Position(response.Checkpoint.CommitPosition, - response.Checkpoint.PreparePosition)), - CaughtUp => StreamMessage.CaughtUp.Instance, - FellBehind => StreamMessage.FellBehind.Instance, - _ => StreamMessage.Unknown.Instance - }, _cts.Token).ConfigureAwait(false); + StreamMessage streamMessage = + response.ContentCase switch { + Confirmation => new StreamMessage.SubscriptionConfirmation( + response.Confirmation.SubscriptionId + ), + Event => new StreamMessage.Event(ConvertToResolvedEvent(response.Event)), + FirstStreamPosition => new StreamMessage.FirstStreamPosition( + new StreamPosition(response.FirstStreamPosition) + ), + LastStreamPosition => new StreamMessage.LastStreamPosition( + new StreamPosition(response.LastStreamPosition) + ), + LastAllStreamPosition => new StreamMessage.LastAllStreamPosition( + new Position( + response.LastAllStreamPosition.CommitPosition, + response.LastAllStreamPosition.PreparePosition + ) + ), + Checkpoint => new StreamMessage.AllStreamCheckpointReached( + new Position( + response.Checkpoint.CommitPosition, + response.Checkpoint.PreparePosition + ) + ), + CaughtUp => StreamMessage.CaughtUp.Instance, + FellBehind => StreamMessage.FellBehind.Instance, + _ => StreamMessage.Unknown.Instance + }; + + if (streamMessage is StreamMessage.Event evnt) { + var restoredTracingContext = + evnt.ResolvedEvent.OriginalEvent.Metadata.RestoreTracingContext(); + + if (restoredTracingContext != null) + EventStoreClientDiagnostics.StartActivity( + TracingConstants.Operations.Subscribe, + new ActivityTagsCollection { + { + TelemetryAttributes.EventStoreStream, + evnt.ResolvedEvent.OriginalEvent.EventStreamId + }, { + TelemetryAttributes.EventStoreSubscriptionId, + SubscriptionId + }, { + TelemetryAttributes.EventStoreEventId, + evnt.ResolvedEvent.OriginalEvent.EventId + }, { + TelemetryAttributes.EventStoreEventType, + evnt.ResolvedEvent.OriginalEvent.EventType + } + }.WithTagsFrom(channelInfo, _settings) + .WithTagsFrom(userCredentials), + ActivityKind.Consumer, + restoredTracingContext + )?.Dispose(); + } + + await _channel.Writer.WriteAsync( + streamMessage, + _cts.Token + ).ConfigureAwait(false); } _channel.Writer.Complete(); @@ -214,9 +287,11 @@ static async ValueTask CastAndDispose(IDisposable? resource) { switch (resource) { case null: return; + case IAsyncDisposable resourceAsyncDisposable: await resourceAsyncDisposable.DisposeAsync().ConfigureAwait(false); break; + default: resource.Dispose(); break; @@ -232,7 +307,8 @@ public void Dispose() { /// public async IAsyncEnumerator GetAsyncEnumerator( - CancellationToken cancellationToken = default) { + CancellationToken cancellationToken = default + ) { try { await foreach (var message in _channel.Reader.ReadAllAsync(cancellationToken) .ConfigureAwait(false)) { diff --git a/src/EventStore.Client.Streams/EventStoreClient.cs b/src/EventStore.Client.Streams/EventStoreClient.cs index 361e6e2d4..63928055e 100644 --- a/src/EventStore.Client.Streams/EventStoreClient.cs +++ b/src/EventStore.Client.Streams/EventStoreClient.cs @@ -12,32 +12,48 @@ namespace EventStore.Client { /// The client used for operations on streams. /// public sealed partial class EventStoreClient : EventStoreClientBase { - private static readonly JsonSerializerOptions StreamMetadataJsonSerializerOptions = new() { Converters = { StreamMetadataJsonConverter.Instance }, }; - private static BoundedChannelOptions ReadBoundedChannelOptions = new (1) { - SingleReader = true, - SingleWriter = true, + private static BoundedChannelOptions ReadBoundedChannelOptions = new(1) { + SingleReader = true, + SingleWriter = true, AllowSynchronousContinuations = true }; - - private readonly ILogger _log; - private Lazy _streamAppenderLazy; - private StreamAppender _streamAppender => _streamAppenderLazy.Value; - private readonly CancellationTokenSource _disposedTokenSource; + private readonly ILogger _log; + private Lazy _streamAppenderLazy; + private StreamAppender _batchAppender => _streamAppenderLazy.Value; + private readonly CancellationTokenSource _disposedTokenSource; private static readonly Dictionary> ExceptionMap = new() { [Constants.Exceptions.InvalidTransaction] = ex => new InvalidTransactionException(ex.Message, ex), - [Constants.Exceptions.StreamDeleted] = ex => new StreamDeletedException(ex.Trailers.FirstOrDefault(x => x.Key == Constants.Exceptions.StreamName)?.Value ?? "", ex), - [Constants.Exceptions.WrongExpectedVersion] = ex => new WrongExpectedVersionException(ex.Trailers.FirstOrDefault(x => x.Key == Constants.Exceptions.StreamName)?.Value!, ex.Trailers.GetStreamRevision(Constants.Exceptions.ExpectedVersion), ex.Trailers.GetStreamRevision(Constants.Exceptions.ActualVersion), ex, ex.Message), - [Constants.Exceptions.MaximumAppendSizeExceeded] = ex => new MaximumAppendSizeExceededException(ex.Trailers.GetIntValueOrDefault(Constants.Exceptions.MaximumAppendSize), ex), - [Constants.Exceptions.StreamNotFound] = ex => new StreamNotFoundException(ex.Trailers.FirstOrDefault(x => x.Key == Constants.Exceptions.StreamName)?.Value!, ex), - [Constants.Exceptions.MissingRequiredMetadataProperty] = ex => new RequiredMetadataPropertyMissingException(ex.Trailers.FirstOrDefault(x => x.Key == Constants.Exceptions.MissingRequiredMetadataProperty)?.Value!, ex), + [Constants.Exceptions.StreamDeleted] = ex => new StreamDeletedException( + ex.Trailers.FirstOrDefault(x => x.Key == Constants.Exceptions.StreamName)?.Value ?? "", + ex + ), + [Constants.Exceptions.WrongExpectedVersion] = ex => new WrongExpectedVersionException( + ex.Trailers.FirstOrDefault(x => x.Key == Constants.Exceptions.StreamName)?.Value!, + ex.Trailers.GetStreamRevision(Constants.Exceptions.ExpectedVersion), + ex.Trailers.GetStreamRevision(Constants.Exceptions.ActualVersion), + ex, + ex.Message + ), + [Constants.Exceptions.MaximumAppendSizeExceeded] = ex => new MaximumAppendSizeExceededException( + ex.Trailers.GetIntValueOrDefault(Constants.Exceptions.MaximumAppendSize), + ex + ), + [Constants.Exceptions.StreamNotFound] = ex => new StreamNotFoundException( + ex.Trailers.FirstOrDefault(x => x.Key == Constants.Exceptions.StreamName)?.Value!, + ex + ), + [Constants.Exceptions.MissingRequiredMetadataProperty] = ex => new RequiredMetadataPropertyMissingException( + ex.Trailers.FirstOrDefault(x => x.Key == Constants.Exceptions.MissingRequiredMetadataProperty)?.Value!, + ex + ), }; /// @@ -57,25 +73,20 @@ public EventStoreClient(EventStoreClientSettings? settings = null) : base(settin } private void SwapStreamAppender(Exception ex) => - Interlocked.Exchange(ref _streamAppenderLazy, new Lazy(CreateStreamAppender)).Value.Dispose(); + Interlocked.Exchange(ref _streamAppenderLazy, new Lazy(CreateStreamAppender)).Value + .Dispose(); // todo: might be nice to have two different kinds of appenders and we decide which to instantiate according to the server caps. - private StreamAppender CreateStreamAppender() { - return new StreamAppender(Settings, GetCall(), _disposedTokenSource.Token, SwapStreamAppender); - - async Task?> GetCall() { - var channelInfo = await GetChannelInfo(_disposedTokenSource.Token).ConfigureAwait(false); - if (!channelInfo.ServerCapabilities.SupportsBatchAppend) - return null; - - var client = new Streams.Streams.StreamsClient(channelInfo.CallInvoker); - - return client.BatchAppend(EventStoreCallOptions.CreateStreaming(Settings, - userCredentials: Settings.DefaultCredentials, cancellationToken: _disposedTokenSource.Token)); - } - } - - private static ReadReq.Types.Options.Types.FilterOptions? GetFilterOptions(IEventFilter? filter, uint checkpointInterval = 0) { + private StreamAppender CreateStreamAppender() => new StreamAppender( + Settings, + GetChannelInfo(_disposedTokenSource.Token), + _disposedTokenSource.Token, + SwapStreamAppender + ); + + private static ReadReq.Types.Options.Types.FilterOptions? GetFilterOptions( + IEventFilter? filter, uint checkpointInterval = 0 + ) { if (filter == null) { return null; } @@ -131,13 +142,16 @@ private StreamAppender CreateStreamAppender() { return options; } - private static ReadReq.Types.Options.Types.FilterOptions? GetFilterOptions(SubscriptionFilterOptions? filterOptions) + private static ReadReq.Types.Options.Types.FilterOptions? GetFilterOptions( + SubscriptionFilterOptions? filterOptions + ) => filterOptions == null ? null : GetFilterOptions(filterOptions.Filter, filterOptions.CheckpointInterval); /// public override void Dispose() { if (_streamAppenderLazy.IsValueCreated) _streamAppenderLazy.Value.Dispose(); + _disposedTokenSource.Dispose(); base.Dispose(); } @@ -146,6 +160,7 @@ public override void Dispose() { public override async ValueTask DisposeAsync() { if (_streamAppenderLazy.IsValueCreated) _streamAppenderLazy.Value.Dispose(); + _disposedTokenSource.Dispose(); await base.DisposeAsync().ConfigureAwait(false); } diff --git a/src/EventStore.Client.Streams/StreamSubscription.cs b/src/EventStore.Client.Streams/StreamSubscription.cs index 9019eda64..c6b9e8ae6 100644 --- a/src/EventStore.Client.Streams/StreamSubscription.cs +++ b/src/EventStore.Client.Streams/StreamSubscription.cs @@ -7,26 +7,28 @@ namespace EventStore.Client { /// [Obsolete] public class StreamSubscription : IDisposable { - private readonly EventStoreClient.StreamSubscriptionResult _subscription; - private readonly IAsyncEnumerator _messages; - private readonly Func _eventAppeared; - private readonly Func _checkpointReached; + private readonly EventStoreClient.StreamSubscriptionResult _subscription; + private readonly IAsyncEnumerator _messages; + private readonly Func _eventAppeared; + private readonly Func _checkpointReached; private readonly Action? _subscriptionDropped; - private readonly ILogger _log; - private readonly CancellationTokenSource _cts; - private int _subscriptionDroppedInvoked; + private readonly ILogger _log; + private readonly CancellationTokenSource _cts; + private int _subscriptionDroppedInvoked; /// /// The id of the set by the server. /// public string SubscriptionId { get; } - internal static async Task Confirm(EventStoreClient.StreamSubscriptionResult subscription, + internal static async Task Confirm( + EventStoreClient.StreamSubscriptionResult subscription, Func eventAppeared, Action? subscriptionDropped, ILogger log, Func? checkpointReached = null, - CancellationToken cancellationToken = default) { + CancellationToken cancellationToken = default + ) { var messages = subscription.Messages; var enumerator = messages.GetAsyncEnumerator(cancellationToken); @@ -35,26 +37,36 @@ enumerator.Current is not StreamMessage.SubscriptionConfirmation(var subscriptio throw new InvalidOperationException($"Subscription to {enumerator} could not be confirmed."); } - return new StreamSubscription(subscription, enumerator, subscriptionId, eventAppeared, subscriptionDropped, - log, checkpointReached, cancellationToken); + return new StreamSubscription( + subscription, + enumerator, + subscriptionId, + eventAppeared, + subscriptionDropped, + log, + checkpointReached, + cancellationToken + ); } - private StreamSubscription(EventStoreClient.StreamSubscriptionResult subscription, + private StreamSubscription( + EventStoreClient.StreamSubscriptionResult subscription, IAsyncEnumerator messages, string subscriptionId, Func eventAppeared, Action? subscriptionDropped, ILogger log, Func? checkpointReached, - CancellationToken cancellationToken = default) { - _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - _subscription = subscription; - _messages = messages; - _eventAppeared = eventAppeared; - _checkpointReached = checkpointReached ?? ((_, _, _) => Task.CompletedTask); - _subscriptionDropped = subscriptionDropped; - _log = log; + CancellationToken cancellationToken = default + ) { + _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + _subscription = subscription; + _messages = messages; + _eventAppeared = eventAppeared; + _checkpointReached = checkpointReached ?? ((_, _, _) => Task.CompletedTask); + _subscriptionDropped = subscriptionDropped; + _log = log; _subscriptionDroppedInvoked = 0; - SubscriptionId = subscriptionId; + SubscriptionId = subscriptionId; _log.LogDebug("Subscription {subscriptionId} confirmed.", SubscriptionId); @@ -77,11 +89,14 @@ private async Task Subscribe() { resolvedEvent.OriginalEvent.EventNumber, resolvedEvent.OriginalEvent.Position ); + await _eventAppeared(this, resolvedEvent, _cts.Token).ConfigureAwait(false); break; + case StreamMessage.AllStreamCheckpointReached (var position): await _checkpointReached(this, position, _cts.Token) .ConfigureAwait(false); + break; } } catch (Exception ex) when @@ -105,6 +120,7 @@ await _checkpointReached(this, position, _cts.Token) "Subscription {subscriptionId} was dropped because the subscriber made an error.", SubscriptionId ); + SubscriptionDropped(SubscriptionDroppedReason.SubscriberError, ex); return; @@ -114,13 +130,18 @@ await _checkpointReached(this, position, _cts.Token) ex.Status.Detail.Contains("Call canceled by the client.")) { _log.LogInformation( "Subscription {subscriptionId} was dropped because cancellation was requested by the client.", - SubscriptionId); + SubscriptionId + ); + SubscriptionDropped(SubscriptionDroppedReason.Disposed, ex); } catch (Exception ex) { if (_subscriptionDroppedInvoked == 0) { - _log.LogError(ex, + _log.LogError( + ex, "Subscription {subscriptionId} was dropped because an error occurred on the server.", - SubscriptionId); + SubscriptionId + ); + SubscriptionDropped(SubscriptionDroppedReason.ServerError, ex); } } diff --git a/src/EventStore.Client/Diagnostics/ActivityExtensions.cs b/src/EventStore.Client/Diagnostics/ActivityExtensions.cs new file mode 100644 index 000000000..564aa98fe --- /dev/null +++ b/src/EventStore.Client/Diagnostics/ActivityExtensions.cs @@ -0,0 +1,45 @@ +using System.Diagnostics; +using System.Runtime.CompilerServices; +using EventStore.Client.Diagnostics.Telemetry; +using EventStore.Client.Diagnostics.Tracing; + +namespace EventStore.Client.Diagnostics; + +static class ActivityExtensions { + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static TracingMetadata GetTracingMetadata(this Activity activity) + => new(activity.TraceId.ToString(), activity.SpanId.ToString()); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static void SetActivityStatus(this Activity activity, ActivityStatus activityStatus) { + var (activityStatusCode, description, exception) = activityStatus; + + var statusCode = activityStatusCode switch { + ActivityStatusCode.Error => "ERROR", + ActivityStatusCode.Ok => "OK", + _ => "UNSET" + }; + + activity.SetStatus(activityStatus.StatusCode, description); + activity.SetTag(TelemetryAttributes.OtelStatusCode, statusCode); + activity.SetTag(TelemetryAttributes.OtelStatusDescription, description); + + if (exception != null) + activity.SetException(exception); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + static void SetException(this Activity activity, Exception exception) { + var tags = new ActivityTagsCollection { + { TelemetryAttributes.ExceptionType, exception.GetType().Name }, + { TelemetryAttributes.ExceptionMessage, $"{exception.Message} {exception.InnerException?.Message}" }, + { TelemetryAttributes.ExceptionStacktrace, exception.StackTrace } + }; + + foreach (var tag in tags) { + activity.SetTag(tag.Key, tag.Value); + } + + activity.AddEvent(new ActivityEvent(TelemetryAttributes.ExceptionEventName, DateTimeOffset.Now, tags)); + } +} diff --git a/src/EventStore.Client/Diagnostics/ActivityStatus.cs b/src/EventStore.Client/Diagnostics/ActivityStatus.cs new file mode 100644 index 000000000..9470f7b59 --- /dev/null +++ b/src/EventStore.Client/Diagnostics/ActivityStatus.cs @@ -0,0 +1,11 @@ +using System.Diagnostics; + +namespace EventStore.Client.Diagnostics; + +record ActivityStatus(ActivityStatusCode StatusCode, string? Description, Exception? Exception) { + public static ActivityStatus Ok(string? description = null) + => new(ActivityStatusCode.Ok, description, null); + + public static ActivityStatus Error(Exception ex, string? description = null) + => new(ActivityStatusCode.Error, description, ex); +} diff --git a/src/EventStore.Client/Diagnostics/ActivityTagsCollectionExtensions.cs b/src/EventStore.Client/Diagnostics/ActivityTagsCollectionExtensions.cs new file mode 100644 index 000000000..72bb05a47 --- /dev/null +++ b/src/EventStore.Client/Diagnostics/ActivityTagsCollectionExtensions.cs @@ -0,0 +1,66 @@ +using System.Diagnostics; +using System.Net; +using System.Runtime.CompilerServices; +using EventStore.Client.Diagnostics.Telemetry; + +namespace EventStore.Client.Diagnostics; + +static class ActivityTagsCollectionExtensions { + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static ActivityTagsCollection WithTagsFrom( + this ActivityTagsCollection tags, ChannelInfo? channelInfo, EventStoreClientSettings settings + ) { + ActivityTagsCollection? serverTags = null; + + // Ensure consistent server.address attribute when connecting to multiple nodes via dns discovery + if (settings.ConnectivitySettings.GossipSeeds.Length == 1) { + var gossipSeed = settings.ConnectivitySettings.GossipSeeds[0]; + serverTags = CreateServerAttributes(gossipSeed.GetHost(), gossipSeed.GetPort()); + } else if (channelInfo != null) { + var authorityParts = channelInfo.Channel.Target.Split(':'); + serverTags = CreateServerAttributes(authorityParts[0], int.Parse(authorityParts[1])); + } + + return tags.WithTags(serverTags).WithTagsFrom(settings.DefaultCredentials); + + ActivityTagsCollection CreateServerAttributes(string? host, int? port) => new() { + { TelemetryAttributes.ServerAddress, host }, + { TelemetryAttributes.ServerPort, port } + }; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static ActivityTagsCollection WithTagsFrom( + this ActivityTagsCollection tags, UserCredentials? userCredentials + ) { + return tags.WithTag(TelemetryAttributes.DatabaseUser, userCredentials?.Username); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal static ActivityTagsCollection WithTags(this ActivityTagsCollection current, ActivityTagsCollection? tags) + => tags == null + ? current + : tags.Aggregate(current, (newTags, tag) => newTags.WithTag(tag.Key, tag.Value)); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + static ActivityTagsCollection WithTag(this ActivityTagsCollection tags, string key, object? value) { + tags[key] = value; + return tags; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + static string? GetHost(this EndPoint endpoint) => + endpoint switch { + IPEndPoint ip => ip.Address.ToString(), + DnsEndPoint dns => dns.Host, + _ => null + }; + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + static int? GetPort(this EndPoint endpoint) => + endpoint switch { + IPEndPoint ip => ip.Port, + DnsEndPoint dns => dns.Port, + _ => null + }; +} diff --git a/src/EventStore.Client/Diagnostics/EventMetadataExtensions.cs b/src/EventStore.Client/Diagnostics/EventMetadataExtensions.cs new file mode 100644 index 000000000..b6a7eb280 --- /dev/null +++ b/src/EventStore.Client/Diagnostics/EventMetadataExtensions.cs @@ -0,0 +1,83 @@ +using System.Diagnostics; +using System.Runtime.CompilerServices; +using System.Text.Json; +using EventStore.Client.Diagnostics.Tracing; + +namespace EventStore.Client.Diagnostics; + +static class EventMetadataExtensions { + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static ReadOnlySpan InjectTracingMetadata( + this ReadOnlyMemory rawCustomMetadata + ) { + if (Activity.Current == null) return rawCustomMetadata.Span; + + try { + using var customMetadataJson = JsonDocument.Parse(rawCustomMetadata); + var tracingMetadata = Activity.Current.GetTracingMetadata(); + + using var stream = new MemoryStream(); + using var writer = new Utf8JsonWriter(stream); + + writer.WriteStartObject(); + + foreach (var prop in customMetadataJson.RootElement.EnumerateObject()) + prop.WriteTo(writer); + + writer.WriteIfNotNull(TracingConstants.Metadata.TraceId, tracingMetadata.TraceId) + .WriteIfNotNull(TracingConstants.Metadata.SpanId, tracingMetadata.SpanId); + + writer.WriteEndObject(); + writer.Flush(); + + return stream.ToArray(); + } catch (Exception) { + return rawCustomMetadata.Span; + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static ActivityContext? RestoreTracingContext(this ReadOnlyMemory rawCustomMetadata) { + var (traceId, spanId) = rawCustomMetadata.ExtractTracingMetadata(); + + if (traceId == null || spanId == null) + return default; + + try { + return new( + ActivityTraceId.CreateFromString(traceId.ToCharArray()), + ActivitySpanId.CreateFromString(spanId.ToCharArray()), + ActivityTraceFlags.Recorded, + isRemote: true + ); + } catch (Exception) { + return default; + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + static TracingMetadata ExtractTracingMetadata(this ReadOnlyMemory rawCustomMetadata) { + try { + using var customMetadataJson = JsonDocument.Parse(rawCustomMetadata); + + return new TracingMetadata( + customMetadataJson.RootElement.GetProperty(TracingConstants.Metadata.TraceId).GetString(), + customMetadataJson.RootElement.GetProperty(TracingConstants.Metadata.SpanId).GetString() + ); + } catch (Exception) { + return TracingMetadata.None(); + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + static Utf8JsonWriter WriteIfNotNull( + this Utf8JsonWriter jsonWriter, string key, string? value + ) { + if (string.IsNullOrEmpty(value)) return jsonWriter; + + jsonWriter.WritePropertyName(key); + jsonWriter.WriteStringValue(value); + + return jsonWriter; + } +} diff --git a/src/EventStore.Client/Diagnostics/EventStoreClientDiagnostics.cs b/src/EventStore.Client/Diagnostics/EventStoreClientDiagnostics.cs new file mode 100644 index 000000000..b3f13e8f0 --- /dev/null +++ b/src/EventStore.Client/Diagnostics/EventStoreClientDiagnostics.cs @@ -0,0 +1,45 @@ +using System.Diagnostics; +using EventStore.Client.Diagnostics.Telemetry; + +namespace EventStore.Client.Diagnostics; + +static class EventStoreClientDiagnostics { + static readonly ActivitySource _activitySource = + new ActivitySource(EventStoreClientInstrumentation.ActivitySourceName); + + static readonly ActivityTagsCollection _defaultTags = [new(TelemetryAttributes.DatabaseSystem, "eventstoredb")]; + + public static Activity? StartActivity( + string operation, ActivityTagsCollection? tags, ActivityKind activityKind = default, + ActivityContext? activityContext = null + ) { + var activity = _activitySource.CreateActivity( + operation, + activityKind, + parentContext: activityContext ?? default, + new ActivityTagsCollection { + { TelemetryAttributes.DatabaseOperation, operation } + } + .WithTags(_defaultTags) + .WithTags(tags), + idFormat: ActivityIdFormat.W3C + ); + + return activity?.Start(); + } + + public static async ValueTask Trace( + Func> tracedOperation, string operationName, ActivityTagsCollection? tags = null + ) { + using var activity = StartActivity(operationName, tags, ActivityKind.Client); + + try { + var res = await tracedOperation().ConfigureAwait(false); + activity?.SetActivityStatus(ActivityStatus.Ok()); + return res; + } catch (Exception ex) { + activity?.SetActivityStatus(ActivityStatus.Error(ex)); + throw; + } + } +} diff --git a/src/EventStore.Client/Diagnostics/EventStoreClientInstrumentation.cs b/src/EventStore.Client/Diagnostics/EventStoreClientInstrumentation.cs new file mode 100644 index 000000000..7ae9b6ba1 --- /dev/null +++ b/src/EventStore.Client/Diagnostics/EventStoreClientInstrumentation.cs @@ -0,0 +1,5 @@ +namespace EventStore.Client.Diagnostics; + +static class EventStoreClientInstrumentation { + public const string ActivitySourceName = "eventstoredb.client"; +} diff --git a/src/EventStore.Client/Diagnostics/Telemetry/TelemetryAttributes.cs b/src/EventStore.Client/Diagnostics/Telemetry/TelemetryAttributes.cs new file mode 100644 index 000000000..5b63d4e36 --- /dev/null +++ b/src/EventStore.Client/Diagnostics/Telemetry/TelemetryAttributes.cs @@ -0,0 +1,29 @@ +namespace EventStore.Client.Diagnostics.Telemetry; + +// The attributes below match the specification of v1.24.0 of the Open Telemetry semantic conventions. +// Some attributes are ignored where not required or relevant. +// https://github.com/open-telemetry/semantic-conventions/blob/v1.24.0/docs/general/trace.md +// https://github.com/open-telemetry/semantic-conventions/blob/v1.24.0/docs/database/database-spans.md +// https://github.com/open-telemetry/semantic-conventions/blob/v1.24.0/docs/exceptions/exceptions-spans.md +static class TelemetryAttributes { + public const string DatabaseUser = "db.user"; + public const string DatabaseSystem = "db.system"; + public const string DatabaseOperation = "db.operation"; + + public const string ServerAddress = "server.address"; + public const string ServerPort = "server.port"; + + public const string ExceptionEventName = "exception"; + public const string ExceptionType = "exception.type"; + public const string ExceptionMessage = "exception.message"; + public const string ExceptionStacktrace = "exception.stacktrace"; + + public const string OtelStatusCode = "otel.status_code"; + public const string OtelStatusDescription = "otel.status_description"; + + // Custom attributes + public const string EventStoreStream = "db.eventstoredb.stream"; + public const string EventStoreSubscriptionId = "db.eventstoredb.subscription.id"; + public const string EventStoreEventId = "db.eventstoredb.event.id"; + public const string EventStoreEventType = "db.eventstoredb.event.type"; +} diff --git a/src/EventStore.Client/Diagnostics/Tracing/TracingConstants.cs b/src/EventStore.Client/Diagnostics/Tracing/TracingConstants.cs new file mode 100644 index 000000000..2c2d22cf3 --- /dev/null +++ b/src/EventStore.Client/Diagnostics/Tracing/TracingConstants.cs @@ -0,0 +1,13 @@ +namespace EventStore.Client.Diagnostics.Tracing; + +static class TracingConstants { + public static class Metadata { + public const string TraceId = "$traceId"; + public const string SpanId = "$spanId"; + } + + public static class Operations { + public const string Append = "streams.append"; + public const string Subscribe = "streams.subscribe"; + } +} diff --git a/src/EventStore.Client/Diagnostics/Tracing/TracingMetadata.cs b/src/EventStore.Client/Diagnostics/Tracing/TracingMetadata.cs new file mode 100644 index 000000000..f907cc49d --- /dev/null +++ b/src/EventStore.Client/Diagnostics/Tracing/TracingMetadata.cs @@ -0,0 +1,5 @@ +namespace EventStore.Client.Diagnostics.Tracing; + +record TracingMetadata(string? TraceId, string? SpanId) { + public static TracingMetadata None() => new(null, null); +} diff --git a/src/EventStore.Client/EventStore.Client.csproj b/src/EventStore.Client/EventStore.Client.csproj index ddf36c6af..8c9ffaaf2 100644 --- a/src/EventStore.Client/EventStore.Client.csproj +++ b/src/EventStore.Client/EventStore.Client.csproj @@ -8,6 +8,7 @@ + diff --git a/src/EventStore.Client/EventStoreClientBase.cs b/src/EventStore.Client/EventStoreClientBase.cs index 39f579fcc..2b4a5b2f6 100644 --- a/src/EventStore.Client/EventStoreClientBase.cs +++ b/src/EventStore.Client/EventStoreClientBase.cs @@ -1,10 +1,7 @@ -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; using EventStore.Client.Interceptors; using Grpc.Core; using Grpc.Core.Interceptors; +using Enum = System.Enum; namespace EventStore.Client { /// @@ -13,28 +10,29 @@ namespace EventStore.Client { public abstract class EventStoreClientBase : IDisposable, // for grpc.net we can dispose synchronously, but not for grpc.core IAsyncDisposable { - - private readonly Dictionary> _exceptionMap; - private readonly CancellationTokenSource _cts; - private readonly ChannelCache _channelCache; + private readonly Dictionary> _exceptionMap; + private readonly CancellationTokenSource _cts; + private readonly ChannelCache _channelCache; private readonly SharingProvider _channelInfoProvider; - private readonly Lazy _httpFallback; - + private readonly Lazy _httpFallback; + /// The name of the connection. public string ConnectionName { get; } - + /// The . protected EventStoreClientSettings Settings { get; } /// Constructs a new . - protected EventStoreClientBase(EventStoreClientSettings? settings, - Dictionary> exceptionMap) { - Settings = settings ?? new EventStoreClientSettings(); + protected EventStoreClientBase( + EventStoreClientSettings? settings, + Dictionary> exceptionMap + ) { + Settings = settings ?? new EventStoreClientSettings(); _exceptionMap = exceptionMap; - _cts = new CancellationTokenSource(); + _cts = new CancellationTokenSource(); _channelCache = new(Settings); _httpFallback = new Lazy(() => new HttpFallback(Settings)); - + ConnectionName = Settings.ConnectionName ?? $"ES-{Guid.NewGuid()}"; var channelSelector = new ChannelSelector(Settings, _channelCache); @@ -43,17 +41,18 @@ protected EventStoreClientBase(EventStoreClientSettings? settings, GetChannelInfoExpensive(endPoint, onBroken, channelSelector, _cts.Token), factoryRetryDelay: Settings.ConnectivitySettings.DiscoveryInterval, initialInput: ReconnectionRequired.Rediscover.Instance, - loggerFactory: Settings.LoggerFactory); + loggerFactory: Settings.LoggerFactory + ); } - + // Select a channel and query its capabilities. This is an expensive call that // we don't want to do often. private async Task GetChannelInfoExpensive( ReconnectionRequired reconnectionRequired, Action onReconnectionRequired, IChannelSelector channelSelector, - CancellationToken cancellationToken) { - + CancellationToken cancellationToken + ) { var channel = reconnectionRequired switch { ReconnectionRequired.Rediscover => await channelSelector.SelectChannelAsync(cancellationToken) .ConfigureAwait(false), @@ -78,11 +77,10 @@ private async Task GetChannelInfoExpensive( return new(channel, caps, invoker); } - + /// Gets the current channel info. protected async ValueTask GetChannelInfo(CancellationToken cancellationToken) => await _channelInfoProvider.CurrentAsync.WithCancellation(cancellationToken).ConfigureAwait(false); - /// /// Only exists so that we can manually trigger rediscovery in the tests @@ -95,20 +93,30 @@ internal Task RediscoverAsync() { } /// Returns the result of an HTTP Get request based on the client settings. - protected async Task HttpGet(string path, Action onNotFound, ChannelInfo channelInfo, - TimeSpan? deadline, UserCredentials? userCredentials, CancellationToken cancellationToken) { - + protected async Task HttpGet( + string path, Action onNotFound, ChannelInfo channelInfo, + TimeSpan? deadline, UserCredentials? userCredentials, CancellationToken cancellationToken + ) { return await _httpFallback.Value .HttpGetAsync(path, channelInfo, deadline, userCredentials, onNotFound, cancellationToken) .ConfigureAwait(false); } /// Executes an HTTP Post request based on the client settings. - protected async Task HttpPost(string path, string query, Action onNotFound, ChannelInfo channelInfo, - TimeSpan? deadline, UserCredentials? userCredentials, CancellationToken cancellationToken) { - + protected async Task HttpPost( + string path, string query, Action onNotFound, ChannelInfo channelInfo, + TimeSpan? deadline, UserCredentials? userCredentials, CancellationToken cancellationToken + ) { await _httpFallback.Value - .HttpPostAsync(path, query, channelInfo, deadline, userCredentials, onNotFound, cancellationToken) + .HttpPostAsync( + path, + query, + channelInfo, + deadline, + userCredentials, + onNotFound, + cancellationToken + ) .ConfigureAwait(false); } @@ -118,7 +126,7 @@ public virtual void Dispose() { _cts.Cancel(); _cts.Dispose(); _channelCache.Dispose(); - + if (_httpFallback.IsValueCreated) { _httpFallback.Value.Dispose(); } diff --git a/test/EventStore.Client.Streams.Tests/Append/append_to_stream.cs b/test/EventStore.Client.Streams.Tests/Append/append_to_stream.cs index 989a6b6bf..6cd5b813d 100644 --- a/test/EventStore.Client.Streams.Tests/Append/append_to_stream.cs +++ b/test/EventStore.Client.Streams.Tests/Append/append_to_stream.cs @@ -5,7 +5,8 @@ namespace EventStore.Client.Streams.Tests.Append; [Trait("Category", "Target:Stream")] [Trait("Category", "Operation:Append")] -public class append_to_stream(ITestOutputHelper output, EventStoreFixture fixture) : EventStoreTests(output, fixture) { +public class append_to_stream(ITestOutputHelper output, EventStoreFixture fixture) + : EventStoreTests(output, fixture) { public static IEnumerable ExpectedVersionCreateStreamTestCases() { yield return new object?[] { StreamState.Any }; yield return new object?[] { StreamState.NoStream }; @@ -18,7 +19,12 @@ public async Task appending_zero_events(StreamState expectedStreamState) { const int iterations = 2; for (var i = 0; i < iterations; i++) { - var writeResult = await Fixture.Streams.AppendToStreamAsync(stream, expectedStreamState, Enumerable.Empty()); + var writeResult = await Fixture.Streams.AppendToStreamAsync( + stream, + expectedStreamState, + Enumerable.Empty() + ); + writeResult.NextExpectedStreamRevision.ShouldBe(StreamRevision.None); } @@ -34,7 +40,12 @@ public async Task appending_zero_events_again(StreamState expectedStreamState) { const int iterations = 2; for (var i = 0; i < iterations; i++) { - var writeResult = await Fixture.Streams.AppendToStreamAsync(stream, expectedStreamState, Enumerable.Empty()); + var writeResult = await Fixture.Streams.AppendToStreamAsync( + stream, + expectedStreamState, + Enumerable.Empty() + ); + Assert.Equal(StreamRevision.None, writeResult.NextExpectedStreamRevision); } @@ -87,7 +98,8 @@ public async Task multiple_idempotent_writes_with_same_id_bug_case() { } [Fact] - public async Task in_case_where_multiple_writes_of_multiple_events_with_the_same_ids_using_expected_version_any_then_next_expected_version_is_unreliable() { + public async Task + in_case_where_multiple_writes_of_multiple_events_with_the_same_ids_using_expected_version_any_then_next_expected_version_is_unreliable() { var stream = Fixture.GetStreamName(); var evnt = Fixture.CreateTestEvents().First(); @@ -103,7 +115,8 @@ public async Task in_case_where_multiple_writes_of_multiple_events_with_the_same } [Fact] - public async Task in_case_where_multiple_writes_of_multiple_events_with_the_same_ids_using_expected_version_nostream_then_next_expected_version_is_correct() { + public async Task + in_case_where_multiple_writes_of_multiple_events_with_the_same_ids_using_expected_version_nostream_then_next_expected_version_is_correct() { var stream = Fixture.GetStreamName(); var evnt = Fixture.CreateTestEvents().First(); @@ -280,18 +293,20 @@ await Fixture.Streams.AppendToStreamAsync( } [Fact] - public async Task appending_with_stream_exists_expected_version_and_stream_does_not_exist_throws_wrong_expected_version() { + public async Task + appending_with_stream_exists_expected_version_and_stream_does_not_exist_throws_wrong_expected_version() { var stream = Fixture.GetStreamName(); var ex = await Fixture.Streams .AppendToStreamAsync(stream, StreamState.StreamExists, Fixture.CreateTestEvents()) .ShouldThrowAsync(); - + ex.ActualStreamRevision.ShouldBe(StreamRevision.None); } [Fact] - public async Task appending_with_stream_exists_expected_version_and_stream_does_not_exist_returns_wrong_expected_version() { + public async Task + appending_with_stream_exists_expected_version_and_stream_does_not_exist_returns_wrong_expected_version() { var stream = Fixture.GetStreamName(); var writeResult = await Fixture.Streams.AppendToStreamAsync( @@ -334,7 +349,11 @@ await Fixture.Streams public async Task can_append_multiple_events_at_once() { var stream = Fixture.GetStreamName(); - var writeResult = await Fixture.Streams.AppendToStreamAsync(stream, StreamState.NoStream, Fixture.CreateTestEvents(100)); + var writeResult = await Fixture.Streams.AppendToStreamAsync( + stream, + StreamState.NoStream, + Fixture.CreateTestEvents(100) + ); Assert.Equal(new(99), writeResult.NextExpectedStreamRevision); } @@ -387,7 +406,7 @@ public async Task returns_failure_status_when_conditionally_appending_to_a_delet Assert.Equal(ConditionalWriteResult.StreamDeleted, result); } - + [Fact] public async Task expected_version_no_stream() { var result = await Fixture.Streams.AppendToStreamAsync( @@ -409,7 +428,7 @@ public async Task expected_version_no_stream_returns_position() { Assert.True(result.LogPosition > Position.Start); } - + [Fact] public async Task with_timeout_any_stream_revision_fails_when_operation_expired() { var stream = Fixture.GetStreamName(); @@ -429,7 +448,7 @@ public async Task with_timeout_stream_revision_fails_when_operation_expired() { var stream = Fixture.GetStreamName(); await Fixture.Streams.AppendToStreamAsync(stream, StreamState.Any, Fixture.CreateTestEvents()); - + var ex = await Fixture.Streams.AppendToStreamAsync( stream, new StreamRevision(0), @@ -448,32 +467,14 @@ await Fixture.Streams .AppendToStreamAsync( streamName, StreamRevision.None, - GetEvents(), + Fixture.CreateTestEventsThatThrowsException(), userCredentials: new UserCredentials(TestCredentials.Root.Username!, TestCredentials.Root.Password!) ) - .ShouldThrowAsync(); + .ShouldThrowAsync(); var state = await Fixture.Streams.ReadStreamAsync(Direction.Forwards, streamName, StreamPosition.Start) .ReadState; state.ShouldBe(ReadState.StreamNotFound); - - return; - - IEnumerable GetEvents() { - for (var i = 0; i < 5; i++) { - if (i % 3 == 0) - throw new EnumerationFailedException(); - - yield return Fixture.CreateTestEvents(1).First(); - } - } - } - - class EnumerationFailedException : Exception { } - - public static IEnumerable ArgumentOutOfRangeTestCases() { - yield return new object?[] { StreamState.Any }; - yield return new object?[] { ulong.MaxValue - 1UL }; } } diff --git a/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_all.cs b/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_all.cs index f0f7cb9dc..2e9879a4a 100644 --- a/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_all.cs +++ b/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_all.cs @@ -7,24 +7,30 @@ public class subscribe_to_all(ITestOutputHelper output, SubscriptionsFixture fix [Fact] public async Task receives_all_events_from_start() { var seedEvents = Fixture.CreateTestEvents(10).ToArray(); - var pageSize = seedEvents.Length / 2; + var pageSize = seedEvents.Length / 2; var availableEvents = new HashSet(seedEvents.Select(x => x.EventId)); foreach (var evt in seedEvents.Take(pageSize)) - await Fixture.Streams.AppendToStreamAsync($"stream-{evt.EventId.ToGuid():N}", StreamState.NoStream, - new[] { evt }); + await Fixture.Streams.AppendToStreamAsync( + $"stream-{evt.EventId.ToGuid():N}", + StreamState.NoStream, + new[] { evt } + ); await using var subscription = Fixture.Streams.SubscribeToAll(FromAll.Start); - await using var enumerator = subscription.Messages.GetAsyncEnumerator(); + await using var enumerator = subscription.Messages.GetAsyncEnumerator(); Assert.True(await enumerator.MoveNextAsync()); Assert.IsType(enumerator.Current); foreach (var evt in seedEvents.Skip(pageSize)) - await Fixture.Streams.AppendToStreamAsync($"stream-{evt.EventId.ToGuid():N}", StreamState.NoStream, - new[] { evt }); + await Fixture.Streams.AppendToStreamAsync( + $"stream-{evt.EventId.ToGuid():N}", + StreamState.NoStream, + new[] { evt } + ); await Subscribe().WithTimeout(); @@ -52,7 +58,7 @@ public async Task receives_all_events_from_end() { var availableEvents = new HashSet(seedEvents.Select(x => x.EventId)); await using var subscription = Fixture.Streams.SubscribeToAll(FromAll.End); - await using var enumerator = subscription.Messages.GetAsyncEnumerator(); + await using var enumerator = subscription.Messages.GetAsyncEnumerator(); Assert.True(await enumerator.MoveNextAsync()); @@ -60,8 +66,11 @@ public async Task receives_all_events_from_end() { // add the events we want to receive after we start the subscription foreach (var evt in seedEvents) - await Fixture.Streams.AppendToStreamAsync($"stream-{evt.EventId.ToGuid():N}", StreamState.NoStream, - new[] { evt }); + await Fixture.Streams.AppendToStreamAsync( + $"stream-{evt.EventId.ToGuid():N}", + StreamState.NoStream, + new[] { evt } + ); await Subscribe().WithTimeout(); @@ -85,28 +94,34 @@ async Task Subscribe() { [Fact] public async Task receives_all_events_from_position() { var seedEvents = Fixture.CreateTestEvents(10).ToArray(); - var pageSize = seedEvents.Length / 2; + var pageSize = seedEvents.Length / 2; // only the second half of the events will be received var availableEvents = new HashSet(seedEvents.Skip(pageSize).Select(x => x.EventId)); IWriteResult writeResult = new SuccessResult(); foreach (var evt in seedEvents.Take(pageSize)) - writeResult = await Fixture.Streams.AppendToStreamAsync($"stream-{evt.EventId.ToGuid():N}", - StreamState.NoStream, new[] { evt }); + writeResult = await Fixture.Streams.AppendToStreamAsync( + $"stream-{evt.EventId.ToGuid():N}", + StreamState.NoStream, + new[] { evt } + ); var position = FromAll.After(writeResult.LogPosition); await using var subscription = Fixture.Streams.SubscribeToAll(position); - await using var enumerator = subscription.Messages.GetAsyncEnumerator(); + await using var enumerator = subscription.Messages.GetAsyncEnumerator(); Assert.True(await enumerator.MoveNextAsync()); Assert.IsType(enumerator.Current); foreach (var evt in seedEvents.Skip(pageSize)) - await Fixture.Streams.AppendToStreamAsync($"stream-{evt.EventId.ToGuid():N}", StreamState.NoStream, - new[] { evt }); + await Fixture.Streams.AppendToStreamAsync( + $"stream-{evt.EventId.ToGuid():N}", + StreamState.NoStream, + new[] { evt } + ); await Subscribe().WithTimeout(); @@ -131,13 +146,13 @@ async Task Subscribe() { public async Task receives_all_events_with_resolved_links() { var streamName = Fixture.GetStreamName(); - var seedEvents = Fixture.CreateTestEvents(3).ToArray(); + var seedEvents = Fixture.CreateTestEvents(3).ToArray(); var availableEvents = new HashSet(seedEvents.Select(x => x.EventId)); await Fixture.Streams.AppendToStreamAsync(streamName, StreamState.NoStream, seedEvents); await using var subscription = Fixture.Streams.SubscribeToAll(FromAll.Start, true); - await using var enumerator = subscription.Messages.GetAsyncEnumerator(); + await using var enumerator = subscription.Messages.GetAsyncEnumerator(); Assert.True(await enumerator.MoveNextAsync()); @@ -178,11 +193,15 @@ public async Task receives_all_filtered_events_from_start(SubscriptionFilter fil var availableEvents = new HashSet(seedEvents.Select(x => x.EventId)); // add noise - await Fixture.Streams.AppendToStreamAsync(Fixture.GetStreamName(), StreamState.NoStream, - Fixture.CreateTestEvents(3)); + await Fixture.Streams.AppendToStreamAsync( + Fixture.GetStreamName(), + StreamState.NoStream, + Fixture.CreateTestEvents(3) + ); var existingEventsCount = await Fixture.Streams.ReadAllAsync(Direction.Forwards, Position.Start) .Messages.CountAsync(); + Fixture.Log.Debug("Existing events count: {ExistingEventsCount}", existingEventsCount); // Debugging: @@ -191,13 +210,16 @@ await Fixture.Streams.AppendToStreamAsync(Fixture.GetStreamName(), StreamState.N // add some of the events we want to see before we start the subscription foreach (var evt in seedEvents.Take(pageSize)) - await Fixture.Streams.AppendToStreamAsync($"{streamPrefix}-{evt.EventId.ToGuid():N}", StreamState.NoStream, - new[] { evt }); + await Fixture.Streams.AppendToStreamAsync( + $"{streamPrefix}-{evt.EventId.ToGuid():N}", + StreamState.NoStream, + new[] { evt } + ); var filterOptions = new SubscriptionFilterOptions(filter.Create(streamPrefix), 1); await using var subscription = Fixture.Streams.SubscribeToAll(FromAll.Start, filterOptions: filterOptions); - await using var enumerator = subscription.Messages.GetAsyncEnumerator(); + await using var enumerator = subscription.Messages.GetAsyncEnumerator(); Assert.True(await enumerator.MoveNextAsync()); @@ -205,8 +227,11 @@ await Fixture.Streams.AppendToStreamAsync($"{streamPrefix}-{evt.EventId.ToGuid() // add some of the events we want to see after we start the subscription foreach (var evt in seedEvents.Skip(pageSize)) - await Fixture.Streams.AppendToStreamAsync($"{streamPrefix}-{evt.EventId.ToGuid():N}", StreamState.NoStream, - new[] { evt }); + await Fixture.Streams.AppendToStreamAsync( + $"{streamPrefix}-{evt.EventId.ToGuid():N}", + StreamState.NoStream, + new[] { evt } + ); bool checkpointReached = false; @@ -223,6 +248,7 @@ async Task Subscribe() { checkpointReached = true; break; + case StreamMessage.Event(var resolvedEvent): { availableEvents.Remove(resolvedEvent.Event.EventId); @@ -254,11 +280,15 @@ public async Task receives_all_filtered_events_from_end(SubscriptionFilter filte var availableEvents = new HashSet(seedEvents.Skip(pageSize).Select(x => x.EventId)); // add noise - await Fixture.Streams.AppendToStreamAsync(Fixture.GetStreamName(), StreamState.NoStream, - Fixture.CreateTestEvents(3)); + await Fixture.Streams.AppendToStreamAsync( + Fixture.GetStreamName(), + StreamState.NoStream, + Fixture.CreateTestEvents(3) + ); var existingEventsCount = await Fixture.Streams.ReadAllAsync(Direction.Forwards, Position.Start) .Messages.CountAsync(); + Fixture.Log.Debug("Existing events count: {ExistingEventsCount}", existingEventsCount); // Debugging: @@ -267,13 +297,16 @@ await Fixture.Streams.AppendToStreamAsync(Fixture.GetStreamName(), StreamState.N // add some of the events we want to see before we start the subscription foreach (var evt in seedEvents.Take(pageSize)) - await Fixture.Streams.AppendToStreamAsync($"{streamPrefix}-{evt.EventId.ToGuid():N}", StreamState.NoStream, - new[] { evt }); + await Fixture.Streams.AppendToStreamAsync( + $"{streamPrefix}-{evt.EventId.ToGuid():N}", + StreamState.NoStream, + new[] { evt } + ); var filterOptions = new SubscriptionFilterOptions(filter.Create(streamPrefix), 1); await using var subscription = Fixture.Streams.SubscribeToAll(FromAll.End, filterOptions: filterOptions); - await using var enumerator = subscription.Messages.GetAsyncEnumerator(); + await using var enumerator = subscription.Messages.GetAsyncEnumerator(); Assert.True(await enumerator.MoveNextAsync()); @@ -281,8 +314,11 @@ await Fixture.Streams.AppendToStreamAsync($"{streamPrefix}-{evt.EventId.ToGuid() // add some of the events we want to see after we start the subscription foreach (var evt in seedEvents.Skip(pageSize)) - await Fixture.Streams.AppendToStreamAsync($"{streamPrefix}-{evt.EventId.ToGuid():N}", StreamState.NoStream, - new[] { evt }); + await Fixture.Streams.AppendToStreamAsync( + $"{streamPrefix}-{evt.EventId.ToGuid():N}", + StreamState.NoStream, + new[] { evt } + ); bool checkpointReached = false; @@ -299,6 +335,7 @@ async Task Subscribe() { checkpointReached = true; break; + case StreamMessage.Event(var resolvedEvent): { availableEvents.Remove(resolvedEvent.Event.EventId); @@ -330,25 +367,32 @@ public async Task receives_all_filtered_events_from_position(SubscriptionFilter var availableEvents = new HashSet(seedEvents.Skip(pageSize).Select(x => x.EventId)); // add noise - await Fixture.Streams.AppendToStreamAsync(Fixture.GetStreamName(), StreamState.NoStream, - Fixture.CreateTestEvents(3)); + await Fixture.Streams.AppendToStreamAsync( + Fixture.GetStreamName(), + StreamState.NoStream, + Fixture.CreateTestEvents(3) + ); var existingEventsCount = await Fixture.Streams.ReadAllAsync(Direction.Forwards, Position.Start) .Messages.CountAsync(); + Fixture.Log.Debug("Existing events count: {ExistingEventsCount}", existingEventsCount); // add some of the events that are a match to the filter but will not be received IWriteResult writeResult = new SuccessResult(); foreach (var evt in seedEvents.Take(pageSize)) - writeResult = await Fixture.Streams.AppendToStreamAsync($"{streamPrefix}-{evt.EventId.ToGuid():N}", - StreamState.NoStream, new[] { evt }); + writeResult = await Fixture.Streams.AppendToStreamAsync( + $"{streamPrefix}-{evt.EventId.ToGuid():N}", + StreamState.NoStream, + new[] { evt } + ); var position = FromAll.After(writeResult.LogPosition); var filterOptions = new SubscriptionFilterOptions(filter.Create(streamPrefix), 1); await using var subscription = Fixture.Streams.SubscribeToAll(position, filterOptions: filterOptions); - await using var enumerator = subscription.Messages.GetAsyncEnumerator(); + await using var enumerator = subscription.Messages.GetAsyncEnumerator(); Assert.True(await enumerator.MoveNextAsync()); @@ -356,8 +400,11 @@ await Fixture.Streams.AppendToStreamAsync(Fixture.GetStreamName(), StreamState.N // add the events we want to receive after we start the subscription foreach (var evt in seedEvents.Skip(pageSize)) - await Fixture.Streams.AppendToStreamAsync($"{streamPrefix}-{evt.EventId.ToGuid():N}", StreamState.NoStream, - new[] { evt }); + await Fixture.Streams.AppendToStreamAsync( + $"{streamPrefix}-{evt.EventId.ToGuid():N}", + StreamState.NoStream, + new[] { evt } + ); bool checkpointReached = false; @@ -374,6 +421,7 @@ async Task Subscribe() { checkpointReached = true; break; + case StreamMessage.Event(var resolvedEvent): { availableEvents.Remove(resolvedEvent.Event.EventId); @@ -392,7 +440,7 @@ async Task Subscribe() { public async Task receives_all_filtered_events_with_resolved_links() { var streamName = Fixture.GetStreamName(); - var seedEvents = Fixture.CreateTestEvents(3).ToArray(); + var seedEvents = Fixture.CreateTestEvents(3).ToArray(); var availableEvents = new HashSet(seedEvents.Select(x => x.EventId)); await Fixture.Streams.AppendToStreamAsync(streamName, StreamState.NoStream, seedEvents); @@ -403,6 +451,7 @@ public async Task receives_all_filtered_events_with_resolved_links() { await using var subscription = Fixture.Streams.SubscribeToAll(FromAll.Start, true, filterOptions: filterOptions); + await using var enumerator = subscription.Messages.GetAsyncEnumerator(); Assert.True(await enumerator.MoveNextAsync()); diff --git a/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_stream.cs b/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_stream.cs index 3cc26c8a1..d38391e8f 100644 --- a/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_stream.cs +++ b/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_stream.cs @@ -9,14 +9,14 @@ public async Task receives_all_events_from_start() { var streamName = Fixture.GetStreamName(); var seedEvents = Fixture.CreateTestEvents(10).ToArray(); - var pageSize = seedEvents.Length / 2; + var pageSize = seedEvents.Length / 2; var availableEvents = new HashSet(seedEvents.Select(x => x.EventId)); await Fixture.Streams.AppendToStreamAsync(streamName, StreamState.NoStream, seedEvents.Take(pageSize)); await using var subscription = Fixture.Streams.SubscribeToStream(streamName, FromStream.Start); - await using var enumerator = subscription.Messages.GetAsyncEnumerator(); + await using var enumerator = subscription.Messages.GetAsyncEnumerator(); Assert.True(await enumerator.MoveNextAsync()); @@ -48,25 +48,29 @@ public async Task receives_all_events_from_position() { var streamName = Fixture.GetStreamName(); var seedEvents = Fixture.CreateTestEvents(10).ToArray(); - var pageSize = seedEvents.Length / 2; + var pageSize = seedEvents.Length / 2; // only the second half of the events will be received var availableEvents = new HashSet(seedEvents.Skip(pageSize).Select(x => x.EventId)); var writeResult = await Fixture.Streams.AppendToStreamAsync(streamName, StreamState.NoStream, seedEvents.Take(pageSize)); + var streamPosition = StreamPosition.FromStreamRevision(writeResult.NextExpectedStreamRevision); - var checkpoint = FromStream.After(streamPosition); + var checkpoint = FromStream.After(streamPosition); await using var subscription = Fixture.Streams.SubscribeToStream(streamName, checkpoint); - await using var enumerator = subscription.Messages.GetAsyncEnumerator(); + await using var enumerator = subscription.Messages.GetAsyncEnumerator(); Assert.True(await enumerator.MoveNextAsync()); Assert.IsType(enumerator.Current); - await Fixture.Streams.AppendToStreamAsync(streamName, writeResult.NextExpectedStreamRevision, - seedEvents.Skip(pageSize)); + await Fixture.Streams.AppendToStreamAsync( + streamName, + writeResult.NextExpectedStreamRevision, + seedEvents.Skip(pageSize) + ); await Subscribe().WithTimeout(); @@ -96,7 +100,7 @@ public async Task receives_all_events_from_non_existing_stream() { var availableEvents = new HashSet(seedEvents.Select(x => x.EventId)); await using var subscription = Fixture.Streams.SubscribeToStream(streamName, FromStream.Start); - await using var enumerator = subscription.Messages.GetAsyncEnumerator(); + await using var enumerator = subscription.Messages.GetAsyncEnumerator(); Assert.True(await enumerator.MoveNextAsync()); @@ -132,14 +136,14 @@ public async Task allow_multiple_subscriptions_to_same_stream() { await Fixture.Streams.AppendToStreamAsync(streamName, StreamState.NoStream, seedEvents); await using var subscription1 = Fixture.Streams.SubscribeToStream(streamName, FromStream.Start); - await using var enumerator1 = subscription1.Messages.GetAsyncEnumerator(); + await using var enumerator1 = subscription1.Messages.GetAsyncEnumerator(); Assert.True(await enumerator1.MoveNextAsync()); Assert.IsType(enumerator1.Current); await using var subscription2 = Fixture.Streams.SubscribeToStream(streamName, FromStream.Start); - await using var enumerator2 = subscription2.Messages.GetAsyncEnumerator(); + await using var enumerator2 = subscription2.Messages.GetAsyncEnumerator(); Assert.True(await enumerator2.MoveNextAsync()); @@ -171,7 +175,7 @@ public async Task drops_when_stream_tombstoned() { var streamName = Fixture.GetStreamName(); await using var subscription = Fixture.Streams.SubscribeToStream(streamName, FromStream.Start); - await using var enumerator = subscription.Messages.GetAsyncEnumerator(); + await using var enumerator = subscription.Messages.GetAsyncEnumerator(); Assert.True(await enumerator.MoveNextAsync()); @@ -180,9 +184,11 @@ public async Task drops_when_stream_tombstoned() { // rest in peace await Fixture.Streams.TombstoneAsync(streamName, StreamState.NoStream); - var ex = await Assert.ThrowsAsync(async () => { - while (await enumerator.MoveNextAsync()) { } - }).WithTimeout(); + var ex = await Assert.ThrowsAsync( + async () => { + while (await enumerator.MoveNextAsync()) { } + } + ).WithTimeout(); ex.ShouldBeOfType().Stream.ShouldBe(streamName); } @@ -191,13 +197,14 @@ public async Task drops_when_stream_tombstoned() { public async Task receives_all_events_with_resolved_links() { var streamName = Fixture.GetStreamName(); - var seedEvents = Fixture.CreateTestEvents(3).ToArray(); + var seedEvents = Fixture.CreateTestEvents(3).ToArray(); var availableEvents = new HashSet(seedEvents.Select(x => x.EventId)); await Fixture.Streams.AppendToStreamAsync(streamName, StreamState.NoStream, seedEvents); await using var subscription = Fixture.Streams.SubscribeToStream($"$et-{EventStoreFixture.TestEventType}", FromStream.Start, true); + await using var enumerator = subscription.Messages.GetAsyncEnumerator(); Assert.True(await enumerator.MoveNextAsync()); diff --git a/test/EventStore.Client.Tests.Common/Fixtures/EventStoreFixture.Helpers.cs b/test/EventStore.Client.Tests.Common/Fixtures/EventStoreFixture.Helpers.cs index d1b6740d2..dc85c7446 100644 --- a/test/EventStore.Client.Tests.Common/Fixtures/EventStoreFixture.Helpers.cs +++ b/test/EventStore.Client.Tests.Common/Fixtures/EventStoreFixture.Helpers.cs @@ -17,6 +17,14 @@ public string GetStreamName([CallerMemberName] string? testMethod = null) => public IEnumerable CreateTestEvents(int count = 1, string? type = null, int metadataSize = 1) => Enumerable.Range(0, count).Select(index => CreateTestEvent(index, type ?? TestEventType, metadataSize)); + public IEnumerable CreateTestEventsThatThrowsException() { + // Ensure initial IEnumerator.Current does not throw + yield return CreateTestEvent(1); + + // Throw after enumerator advances + throw new Exception(); + } + protected static EventData CreateTestEvent(int index) => CreateTestEvent(index, TestEventType, 1); protected static EventData CreateTestEvent(int index, string type, int metadataSize) => diff --git a/test/EventStore.Client.Tests.Common/Fixtures/EventStoreFixture.cs b/test/EventStore.Client.Tests.Common/Fixtures/EventStoreFixture.cs index 1744ceda6..50faebf7a 100644 --- a/test/EventStore.Client.Tests.Common/Fixtures/EventStoreFixture.cs +++ b/test/EventStore.Client.Tests.Common/Fixtures/EventStoreFixture.cs @@ -8,7 +8,10 @@ namespace EventStore.Client.Tests; -public record EventStoreFixtureOptions(EventStoreClientSettings ClientSettings, IDictionary Environment) { +public record EventStoreFixtureOptions( + EventStoreClientSettings ClientSettings, + IDictionary Environment +) { public EventStoreFixtureOptions RunInMemory(bool runInMemory = true) => this with { Environment = Environment.With(x => x["EVENTSTORE_MEM_DB"] = runInMemory.ToString()) }; @@ -24,7 +27,7 @@ this with { public EventStoreFixtureOptions WithoutDefaultCredentials() => this with { ClientSettings = ClientSettings.With(x => x.DefaultCredentials = null) }; - + public EventStoreFixtureOptions WithMaxAppendSize(uint maxAppendSize) => this with { Environment = Environment.With(x => x["EVENTSTORE_MAX_APPEND_SIZE"] = $"{maxAppendSize}") }; } @@ -54,8 +57,7 @@ protected EventStoreFixture(ConfigureFixture configure) { if (GlobalEnvironment.UseCluster) { Options = configure(EventStoreTestCluster.DefaultOptions()); Service = new EventStoreTestCluster(Options); - } - else { + } else { Options = configure(EventStoreTestNode.DefaultOptions()); Service = new EventStoreTestNode(Options); } @@ -64,7 +66,7 @@ protected EventStoreFixture(ConfigureFixture configure) { List TestRuns { get; } = new(); public ILogger Log => Logger; - + public ITestService Service { get; } public EventStoreFixtureOptions Options { get; } public Faker Faker { get; } = new Faker(); @@ -80,7 +82,7 @@ protected EventStoreFixture(ConfigureFixture configure) { public Func OnSetup { get; init; } = () => Task.CompletedTask; public Func OnTearDown { get; init; } = () => Task.CompletedTask; - + /// /// must test this /// @@ -96,62 +98,62 @@ protected EventStoreFixture(ConfigureFixture configure) { DefaultCredentials = Options.ClientSettings.DefaultCredentials, DefaultDeadline = Options.ClientSettings.DefaultDeadline }; - + InterlockedBoolean WarmUpCompleted { get; } = new InterlockedBoolean(); SemaphoreSlim WarmUpGatekeeper { get; } = new(1, 1); - public void CaptureTestRun(ITestOutputHelper outputHelper) { var testRunId = Logging.CaptureLogs(outputHelper); TestRuns.Add(testRunId); Logger.Information(">>> Test Run {TestRunId} {Operation} <<<", testRunId, "starting"); Service.ReportStatus(); } - + public async Task InitializeAsync() { await Service.Start(); EventStoreVersion = GetEventStoreVersion(); EventStoreHasLastStreamPosition = (EventStoreVersion?.Major ?? int.MaxValue) >= 21; - + await WarmUpGatekeeper.WaitAsync(); - + try { if (!WarmUpCompleted.CurrentValue) { Logger.Warning("*** Warmup started ***"); await Task.WhenAll( InitClient(async x => Users = await x.WarmUp()), - InitClient(async x => Streams = await x.WarmUp()), - InitClient(async x => Projections = await x.WarmUp(), Options.Environment["EVENTSTORE_RUN_PROJECTIONS"] != "None"), + InitClient(async x => Streams = await x.WarmUp()), + InitClient( + async x => Projections = await x.WarmUp(), + Options.Environment["EVENTSTORE_RUN_PROJECTIONS"] != "None" + ), InitClient(async x => Subscriptions = await x.WarmUp()), - InitClient(async x => Operations = await x.WarmUp()) + InitClient(async x => Operations = await x.WarmUp()) ); - + WarmUpCompleted.EnsureCalledOnce(); - + Logger.Warning("*** Warmup completed ***"); - } - else { + } else { Logger.Information("*** Warmup skipped ***"); } - } - finally { + } finally { WarmUpGatekeeper.Release(); } - + await OnSetup(); - + return; async Task InitClient(Func action, bool execute = true) where T : EventStoreClientBase { if (!execute) return default(T)!; + var client = (Activator.CreateInstance(typeof(T), new object?[] { ClientSettings }) as T)!; await action(client); return client; } - - + static Version GetEventStoreVersion() { const string versionPrefix = "EventStoreDB version"; @@ -166,7 +168,10 @@ static Version GetEventStoreVersion() { using var log = eventstore.Logs(true, cancellator.Token); foreach (var line in log.ReadToEnd()) { if (line.StartsWith(versionPrefix) && - Version.TryParse(new string(ReadVersion(line[(versionPrefix.Length + 1)..]).ToArray()), out var version)) { + Version.TryParse( + new string(ReadVersion(line[(versionPrefix.Length + 1)..]).ToArray()), + out var version + )) { return version; } } @@ -184,8 +189,7 @@ IEnumerable ReadVersion(string s) { public async Task DisposeAsync() { try { await OnTearDown(); - } - catch { + } catch { // ignored } @@ -206,11 +210,13 @@ public class EventStoreSharedDatabaseFixture : ICollectionFixture : IClassFixture where TFixture : EventStoreFixture { - protected EventStoreTests(ITestOutputHelper output, TFixture fixture) => Fixture = fixture.With(x => x.CaptureTestRun(output)); - + protected EventStoreTests(ITestOutputHelper output, TFixture fixture) => + Fixture = fixture.With(x => x.CaptureTestRun(output)); + protected TFixture Fixture { get; } } [Collection(nameof(EventStoreSharedDatabaseFixture))] -public abstract class EventStoreSharedDatabaseTests(ITestOutputHelper output, TFixture fixture) : EventStoreTests(output, fixture) +public abstract class EventStoreSharedDatabaseTests(ITestOutputHelper output, TFixture fixture) + : EventStoreTests(output, fixture) where TFixture : EventStoreFixture; diff --git a/test/EventStore.Client.Tests.Common/Fixtures/EventStoreTestNode.cs b/test/EventStore.Client.Tests.Common/Fixtures/EventStoreTestNode.cs index 11f247691..c566e9263 100644 --- a/test/EventStore.Client.Tests.Common/Fixtures/EventStoreTestNode.cs +++ b/test/EventStore.Client.Tests.Common/Fixtures/EventStoreTestNode.cs @@ -45,7 +45,7 @@ public static EventStoreFixtureOptions DefaultOptions() { // TODO SS: must find a way to enable parallel tests on CI. It works locally. if (port != NetworkPortProvider.DefaultEsdbPort) { - if (GlobalEnvironment.Variables.TryGetValue("ES_DOCKER_TAG", out var tag) && tag == "ci") + if (GlobalEnvironment.Variables.TryGetValue("ES_DOCKER_TAG", out var tag) && tag == "24.2.0-alpha-arm64v8") defaultEnvironment["EVENTSTORE_ADVERTISE_NODE_PORT_TO_CLIENT_AS"] = $"{port}"; else defaultEnvironment["EVENTSTORE_ADVERTISE_HTTP_PORT_TO_CLIENT_AS"] = $"{port}"; diff --git a/test/EventStore.Client.Tests.Common/GlobalEnvironment.cs b/test/EventStore.Client.Tests.Common/GlobalEnvironment.cs index 57c632154..856fcb5ad 100644 --- a/test/EventStore.Client.Tests.Common/GlobalEnvironment.cs +++ b/test/EventStore.Client.Tests.Common/GlobalEnvironment.cs @@ -23,7 +23,7 @@ static void EnsureDefaults(IConfiguration configuration) { configuration.EnsureValue("ES_USE_EXTERNAL_SERVER", "false"); configuration.EnsureValue("ES_DOCKER_REGISTRY", "ghcr.io/eventstore/eventstore"); - configuration.EnsureValue("ES_DOCKER_TAG", "ci"); + configuration.EnsureValue("ES_DOCKER_TAG", "24.2.0-alpha-arm64v8"); configuration.EnsureValue("ES_DOCKER_IMAGE", $"{configuration["ES_DOCKER_REGISTRY"]}:{configuration["ES_DOCKER_TAG"]}"); configuration.EnsureValue("EVENTSTORE_MEM_DB", "false"); @@ -48,7 +48,7 @@ static void EnsureDefaults(IConfiguration configuration) { //[Obsolete("Use the EventStoreFixture instead so you don't have to use this method.", false)] public static IDictionary GetEnvironmentVariables(IDictionary? overrides = null) { var env = new Dictionary { - ["ES_DOCKER_TAG"] = "ci", + ["ES_DOCKER_TAG"] = "24.2.0-alpha-arm64v8", ["EVENTSTORE_DB_LOG_FORMAT"] = "V2", }; diff --git a/test/EventStore.Client.Tests/Diagnostics/DiagnosticsFixture.cs b/test/EventStore.Client.Tests/Diagnostics/DiagnosticsFixture.cs new file mode 100644 index 000000000..dddef888d --- /dev/null +++ b/test/EventStore.Client.Tests/Diagnostics/DiagnosticsFixture.cs @@ -0,0 +1,25 @@ +using System.Collections.Concurrent; +using System.Diagnostics; +using EventStore.Client.Diagnostics; +using EventStore.Client.Diagnostics.Telemetry; + +namespace EventStore.Client.Tests.Diagnostics; + +public class DiagnosticsFixture : EventStoreFixture { + readonly ConcurrentBag _activities = []; + + public DiagnosticsFixture() => ActivitySource.AddActivityListener( + new ActivityListener { + ShouldListenTo = source => source.Name == EventStoreClientInstrumentation.ActivitySourceName, + Sample = (ref ActivityCreationOptions _) => + ActivitySamplingResult.AllDataAndRecorded, + ActivityStopped = activity => _activities.Add(activity) + } + ); + + public IEnumerable GetActivitiesForOperation(string operation, string stream) + => _activities.Where( + activity => (string?)activity.GetTagItem(TelemetryAttributes.DatabaseOperation) == operation + && (string?)activity.GetTagItem(TelemetryAttributes.EventStoreStream) == stream + ); +} diff --git a/test/EventStore.Client.Tests/Diagnostics/TracingInstrumentationTests.cs b/test/EventStore.Client.Tests/Diagnostics/TracingInstrumentationTests.cs new file mode 100644 index 000000000..3c8037c4d --- /dev/null +++ b/test/EventStore.Client.Tests/Diagnostics/TracingInstrumentationTests.cs @@ -0,0 +1,113 @@ +using System.Diagnostics; +using EventStore.Client.Diagnostics.Telemetry; +using EventStore.Client.Diagnostics.Tracing; + +namespace EventStore.Client.Tests.Diagnostics; + +[Trait("Category", "Diagnostics:Tracing")] +public class TracingInstrumentationTests(ITestOutputHelper output, DiagnosticsFixture fixture) + : EventStoreTests(output, fixture) { + [Fact] + public async Task NonBatchAppendIsInstrumentedWithTracingAsExpected() { + var stream = Fixture.GetStreamName(); + + await Fixture.Streams.AppendToStreamAsync( + stream, + StreamState.NoStream, + Fixture.CreateTestEvents(), + userCredentials: TestCredentials.Root + ); + + var activity = Fixture + .GetActivitiesForOperation(TracingConstants.Operations.Append, stream) + .SingleOrDefault() + .ShouldNotBeNull(); + + AssertAppendActivityHasExpectedTags(activity, stream); + } + + [Fact] + public async Task BatchAppendIsInstrumentedWithTracingAsExpected() { + var stream = Fixture.GetStreamName(); + + await Fixture.Streams.AppendToStreamAsync( + stream, + StreamState.NoStream, + Fixture.CreateTestEvents() + ); + + var activity = Fixture + .GetActivitiesForOperation(TracingConstants.Operations.Append, stream) + .SingleOrDefault() + .ShouldNotBeNull(); + + AssertAppendActivityHasExpectedTags(activity, stream); + } + + [Fact] + public async Task NonBatchAppendTraceIsTaggedWithErrorStatusOnException() { + var stream = Fixture.GetStreamName(); + + var actualException = await Fixture.Streams.AppendToStreamAsync( + stream, + StreamState.NoStream, + Fixture.CreateTestEventsThatThrowsException() + ).ShouldThrowAsync(); + + var activity = Fixture + .GetActivitiesForOperation(TracingConstants.Operations.Append, stream) + .SingleOrDefault() + .ShouldNotBeNull(); + + AssertErroneousAppendActivityHasExpectedTags(activity, actualException); + } + + [Fact] + public async Task BatchAppendTraceIsTaggedWithErrorStatusOnException() { + var stream = Fixture.GetStreamName(); + + var actualException = await Fixture.Streams.AppendToStreamAsync( + stream, + StreamState.NoStream, + Fixture.CreateTestEventsThatThrowsException(), + userCredentials: TestCredentials.Root + ).ShouldThrowAsync(); + + var activity = Fixture + .GetActivitiesForOperation(TracingConstants.Operations.Append, stream) + .SingleOrDefault() + .ShouldNotBeNull(); + + AssertErroneousAppendActivityHasExpectedTags(activity, actualException); + } + + static void AssertErroneousAppendActivityHasExpectedTags( + Activity activity, Exception actualException + ) { + var expectedTags = new Dictionary { + { TelemetryAttributes.OtelStatusCode, "ERROR" }, + { TelemetryAttributes.ExceptionType, actualException.GetType().Name }, { + TelemetryAttributes.ExceptionMessage, + $"{actualException.Message} {actualException.InnerException?.Message}" + } + }; + + foreach (var tag in expectedTags) + activity.Tags.ShouldContain(tag); + + activity.GetTagItem(TelemetryAttributes.ExceptionStacktrace).ShouldNotBeNull(); + } + + static void AssertAppendActivityHasExpectedTags(Activity activity, string stream) { + var expectedTags = new Dictionary { + { TelemetryAttributes.DatabaseSystem, "eventstoredb" }, + { TelemetryAttributes.DatabaseOperation, TracingConstants.Operations.Append }, + { TelemetryAttributes.EventStoreStream, stream }, + { TelemetryAttributes.DatabaseUser, TestCredentials.Root.Username }, + { TelemetryAttributes.OtelStatusCode, "OK" } + }; + + foreach (var tag in expectedTags) + activity.Tags.ShouldContain(tag); + } +}