From 50ad0383c635beb49802a2ffda318f59f13432ef Mon Sep 17 00:00:00 2001 From: Joseph Cummings Date: Tue, 9 Apr 2024 18:13:32 +0100 Subject: [PATCH] Add support for authenticating a user with an X.509 cer --- EventStore.Client.sln | 7 + README.md | 18 + gencert.ps1 | 6 +- gencert.sh | 6 +- .../secure-with-tls/docker-compose.certs.yml | 2 +- src/Directory.Build.props | 6 +- src/EventStore.Client.Common/Constants.cs | 10 +- ...ore.Client.Extensions.OpenTelemetry.csproj | 20 + .../TracerProviderBuilderExtensions.cs | 19 + ...StorePersistentSubscriptionsClient.Read.cs | 267 ++++++++----- .../EventStoreClient.Append.cs | 354 +++++++++++------- .../EventStoreClient.Metadata.cs | 2 +- .../EventStoreClient.Subscriptions.cs | 188 ++++++---- .../EventStoreClient.cs | 90 +++-- .../StreamSubscription.cs | 69 ++-- .../Diagnostics/ActivityExtensions.cs | 45 +++ .../Diagnostics/ActivityStatus.cs | 11 + .../ActivityTagsCollectionExtensions.cs | 51 +++ .../Diagnostics/EventMetadataExtensions.cs | 84 +++++ .../EventStoreClientDiagnostics.cs | 77 ++++ .../EventStoreClientInstrumentation.cs | 5 + .../Telemetry/TelemetryAttributes.cs | 29 ++ .../Diagnostics/Tracing/TracingConstants.cs | 13 + .../Diagnostics/Tracing/TracingMetadata.cs | 5 + .../EventStore.Client.csproj | 1 + src/EventStore.Client/EventStoreClientBase.cs | 68 ++-- ...ubscriptionsTracingInstrumentationTests.cs | 74 ++++ .../TracingInstrumentationTests.cs | 10 + .../Append/append_to_stream.cs | 65 ++-- .../StreamsTracingInstrumentationTests.cs | 103 +++++ .../Subscriptions/subscribe_to_all.cs | 127 +++++-- .../Subscriptions/subscribe_to_stream.cs | 37 +- .../Fixtures/Base/EventStoreTestServer.cs | 2 +- .../Fixtures/CertificatesManager.cs | 4 +- .../Fixtures/DiagnosticsFixture.cs | 85 +++++ .../Fixtures/EventStoreFixture.Helpers.cs | 36 +- .../Fixtures/EventStoreFixture.cs | 66 ++-- .../Fixtures/EventStoreTestNode.cs | 2 +- .../docker-compose.certs.yml | 4 +- .../docker-compose.cluster.yml | 2 +- .../docker-compose.yml | 2 +- 41 files changed, 1545 insertions(+), 527 deletions(-) create mode 100644 src/EventStore.Client.Extensions.OpenTelemetry/EventStore.Client.Extensions.OpenTelemetry.csproj create mode 100644 src/EventStore.Client.Extensions.OpenTelemetry/TracerProviderBuilderExtensions.cs create mode 100644 src/EventStore.Client/Diagnostics/ActivityExtensions.cs create mode 100644 src/EventStore.Client/Diagnostics/ActivityStatus.cs create mode 100644 src/EventStore.Client/Diagnostics/ActivityTagsCollectionExtensions.cs create mode 100644 src/EventStore.Client/Diagnostics/EventMetadataExtensions.cs create mode 100644 src/EventStore.Client/Diagnostics/EventStoreClientDiagnostics.cs create mode 100644 src/EventStore.Client/Diagnostics/EventStoreClientInstrumentation.cs create mode 100644 src/EventStore.Client/Diagnostics/Telemetry/TelemetryAttributes.cs create mode 100644 src/EventStore.Client/Diagnostics/Tracing/TracingConstants.cs create mode 100644 src/EventStore.Client/Diagnostics/Tracing/TracingMetadata.cs create mode 100644 test/EventStore.Client.PersistentSubscriptions.Tests/Diagnostics/PersistentSubscriptionsTracingInstrumentationTests.cs create mode 100644 test/EventStore.Client.PersistentSubscriptions.Tests/Diagnostics/TracingInstrumentationTests.cs create mode 100644 test/EventStore.Client.Streams.Tests/Diagnostics/StreamsTracingInstrumentationTests.cs create mode 100644 test/EventStore.Client.Tests.Common/Fixtures/DiagnosticsFixture.cs diff --git a/EventStore.Client.sln b/EventStore.Client.sln index 51229f72c..84919f42b 100644 --- a/EventStore.Client.sln +++ b/EventStore.Client.sln @@ -33,6 +33,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventStore.Client.UserManag EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventStore.Client.Tests.Common", "test\EventStore.Client.Tests.Common\EventStore.Client.Tests.Common.csproj", "{E326832D-DE52-4DE4-9E54-C800908B75F3}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventStore.Client.Extensions.OpenTelemetry", "src\EventStore.Client.Extensions.OpenTelemetry\EventStore.Client.Extensions.OpenTelemetry.csproj", "{3723933C-585A-49BE-98E8-52D3FAD904CE}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|x64 = Debug|x64 @@ -94,6 +96,10 @@ Global {E326832D-DE52-4DE4-9E54-C800908B75F3}.Debug|x64.Build.0 = Debug|Any CPU {E326832D-DE52-4DE4-9E54-C800908B75F3}.Release|x64.ActiveCfg = Release|Any CPU {E326832D-DE52-4DE4-9E54-C800908B75F3}.Release|x64.Build.0 = Release|Any CPU + {3723933C-585A-49BE-98E8-52D3FAD904CE}.Debug|x64.ActiveCfg = Debug|Any CPU + {3723933C-585A-49BE-98E8-52D3FAD904CE}.Debug|x64.Build.0 = Debug|Any CPU + {3723933C-585A-49BE-98E8-52D3FAD904CE}.Release|x64.ActiveCfg = Release|Any CPU + {3723933C-585A-49BE-98E8-52D3FAD904CE}.Release|x64.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(NestedProjects) = preSolution {D3744A86-DD35-4104-AAEE-84B79062C4A2} = {EA59C1CB-16DA-4F68-AF8A-642A969B4CF8} @@ -109,5 +115,6 @@ Global {6CEB731F-72E1-461F-A6B3-54DBF3FD786C} = {C51F2C69-45A9-4D0D-A708-4FC319D5D340} {22634CEE-4F7B-4679-A48D-38A2A8580ECA} = {C51F2C69-45A9-4D0D-A708-4FC319D5D340} {E326832D-DE52-4DE4-9E54-C800908B75F3} = {C51F2C69-45A9-4D0D-A708-4FC319D5D340} + {3723933C-585A-49BE-98E8-52D3FAD904CE} = {EA59C1CB-16DA-4F68-AF8A-642A969B4CF8} EndGlobalSection EndGlobal diff --git a/README.md b/README.md index 37657a3df..27e3feb40 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,24 @@ Reference the nuget package(s) for the API that you would like to call [User Management](https://www.nuget.org/packages/EventStore.Client.Grpc.UserManagement) +## Open Telemetry + +Telemetry instrumentation can be enabled by installing the [Open Telemetry Extensions](https://www.nuget.org/packages/EventStore.Client.Extensions.OpenTelemetry) package. + +Once installed you can configure instrumentation using the `AddEventStoreClientInstrumentation` extension method on a `TracerProviderBuilder`. + +```csharp +using var tracerProvider = Sdk.CreateTracerProviderBuilder() + ... + .AddEventStoreClientInstrumentation() + ... + .Build(); +``` + +Tracing is the only telemetry currently exported, specifically for the `Append` and `Subscribe` (Catchup and Persistent) operations. + +For more information about Open Telemetry, refer to the [official documentation](https://opentelemetry.io/docs/what-is-opentelemetry/). + ## Support Information on support and commercial tools such as LDAP authentication can be found here: [Event Store Support](https://eventstore.com/support/). diff --git a/gencert.ps1 b/gencert.ps1 index 3908f57e8..593e9c386 100644 --- a/gencert.ps1 +++ b/gencert.ps1 @@ -7,13 +7,13 @@ New-Item -ItemType Directory -Path .\certs -Force icacls .\certs /grant:r "$($env:UserName):(OI)(CI)RX" # Pull the Docker image -docker pull eventstore/es-gencert-cli:1.0.2 +docker pull ghcr.io/eventstore/es-gencert-cli:1.3.0 # Create CA certificate -docker run --rm --volume ${PWD}\certs:/tmp --user (Get-Process -Id $PID).SessionId eventstore/es-gencert-cli:1.0.2 create-ca -out /tmp/ca +docker run --rm --volume ${PWD}\certs:/tmp --user (Get-Process -Id $PID).SessionId ghcr.io/eventstore/es-gencert-cli:1.3.0 create-ca -out /tmp/ca # Create node certificate -docker run --rm --volume ${PWD}\certs:/tmp --user (Get-Process -Id $PID).SessionId eventstore/es-gencert-cli:1.0.2 create-node -ca-certificate /tmp/ca/ca.crt -ca-key /tmp/ca/ca.key -out /tmp/node -ip-addresses 127.0.0.1 -dns-names localhost +docker run --rm --volume ${PWD}\certs:/tmp --user (Get-Process -Id $PID).SessionId ghcr.io/eventstore/es-gencert-cli:1.3.0 create-node -ca-certificate /tmp/ca/ca.crt -ca-key /tmp/ca/ca.key -out /tmp/node -ip-addresses 127.0.0.1 -dns-names localhost # Set permissions recursively for the directory icacls .\certs /grant:r "$($env:UserName):(OI)(CI)RX" diff --git a/gencert.sh b/gencert.sh index fa640f624..2d8d5e346 100755 --- a/gencert.sh +++ b/gencert.sh @@ -13,11 +13,11 @@ mkdir -p certs chmod 0755 ./certs -docker pull eventstore/es-gencert-cli:1.0.2 +docker pull ghcr.io/eventstore/es-gencert-cli:1.3.0 -docker run --rm --volume $PWD/certs:/tmp --user $(id -u):$(id -g) eventstore/es-gencert-cli:1.0.2 create-ca -out /tmp/ca +docker run --rm --volume $PWD/certs:/tmp --user $(id -u):$(id -g) ghcr.io/eventstore/es-gencert-cli:1.3.0 create-ca -out /tmp/ca -docker run --rm --volume $PWD/certs:/tmp --user $(id -u):$(id -g) eventstore/es-gencert-cli:1.0.2 create-node -ca-certificate /tmp/ca/ca.crt -ca-key /tmp/ca/ca.key -out /tmp/node -ip-addresses 127.0.0.1 -dns-names localhost +docker run --rm --volume $PWD/certs:/tmp --user $(id -u):$(id -g) ghcr.io/eventstore/es-gencert-cli:1.3.0 create-node -ca-certificate /tmp/ca/ca.crt -ca-key /tmp/ca/ca.key -out /tmp/node -ip-addresses 127.0.0.1 -dns-names localhost chmod -R 0755 ./certs diff --git a/samples/secure-with-tls/docker-compose.certs.yml b/samples/secure-with-tls/docker-compose.certs.yml index 56c6278dc..03d2e225f 100644 --- a/samples/secure-with-tls/docker-compose.certs.yml +++ b/samples/secure-with-tls/docker-compose.certs.yml @@ -16,7 +16,7 @@ services: network_mode: none cert-gen: - image: eventstore/es-gencert-cli:1.0.2 + image: ghcr.io/eventstore/es-gencert-cli:1.3.0 container_name: cert-gen user: "1000:1000" entrypoint: [ "/bin/sh","-c" ] diff --git a/src/Directory.Build.props b/src/Directory.Build.props index c12d9449c..609de534e 100644 --- a/src/Directory.Build.props +++ b/src/Directory.Build.props @@ -4,7 +4,7 @@ EventStore.Client - + $(MSBuildProjectName.Remove(0,18)) $(ESPackageIdSuffix.ToLower()).proto ../EventStore.Client.Common/protos/$(ESProto) @@ -50,6 +50,10 @@ + + + + <_Parameter1>$(ProjectName).Tests diff --git a/src/EventStore.Client.Common/Constants.cs b/src/EventStore.Client.Common/Constants.cs index 3e0279e6b..a088bcab2 100644 --- a/src/EventStore.Client.Common/Constants.cs +++ b/src/EventStore.Client.Common/Constants.cs @@ -39,10 +39,10 @@ public static class Exceptions { } public static class Metadata { - public const string Type = "type"; - public const string Created = "created"; - public const string ContentType = "content-type"; - public static readonly string[] RequiredMetadata = { Type, ContentType }; + public const string Type = "type"; + public const string Created = "created"; + public const string ContentType = "content-type"; + public static readonly string[] RequiredMetadata = { Type, ContentType }; public static class ContentTypes { public const string ApplicationJson = "application/json"; @@ -58,4 +58,4 @@ public static class Headers { public const string ConnectionName = "connection-name"; public const string RequiresLeader = "requires-leader"; } -} \ No newline at end of file +} diff --git a/src/EventStore.Client.Extensions.OpenTelemetry/EventStore.Client.Extensions.OpenTelemetry.csproj b/src/EventStore.Client.Extensions.OpenTelemetry/EventStore.Client.Extensions.OpenTelemetry.csproj new file mode 100644 index 000000000..2783f0adc --- /dev/null +++ b/src/EventStore.Client.Extensions.OpenTelemetry/EventStore.Client.Extensions.OpenTelemetry.csproj @@ -0,0 +1,20 @@ + + + + EventStore.Client.Extensions.OpenTelemetry + + + + EventStore.Client.Extensions.OpenTelemetry + Extensions used to facilitate instrumentation of the EventStore Client. + + + + + + + + + + + diff --git a/src/EventStore.Client.Extensions.OpenTelemetry/TracerProviderBuilderExtensions.cs b/src/EventStore.Client.Extensions.OpenTelemetry/TracerProviderBuilderExtensions.cs new file mode 100644 index 000000000..30e1de843 --- /dev/null +++ b/src/EventStore.Client.Extensions.OpenTelemetry/TracerProviderBuilderExtensions.cs @@ -0,0 +1,19 @@ +using EventStore.Client.Diagnostics; +using JetBrains.Annotations; +using OpenTelemetry.Trace; + +namespace EventStore.Client.Extensions.OpenTelemetry; + +/// +/// Extension methods used to facilitate tracing instrumentation of the EventStore Client. +/// +[PublicAPI] +public static class TracerProviderBuilderExtensions { + /// + /// Adds the EventStore client ActivitySource name to the list of subscribed sources on the + /// + /// being configured. + /// The instance of to chain configuration. + public static TracerProviderBuilder AddEventStoreClientInstrumentation(this TracerProviderBuilder builder) + => builder.AddSource(EventStoreClientInstrumentation.ActivitySourceName); +} diff --git a/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Read.cs b/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Read.cs index 148ebcf8d..7f671dcfa 100644 --- a/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Read.cs +++ b/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Read.cs @@ -1,4 +1,5 @@ using System.Threading.Channels; +using EventStore.Client.Diagnostics; using EventStore.Client.PersistentSubscriptions; using Grpc.Core; using static EventStore.Client.PersistentSubscriptions.ReadResp.ContentOneofCase; @@ -12,18 +13,28 @@ partial class EventStorePersistentSubscriptionsClient { /// /// [Obsolete("SubscribeAsync is no longer supported. Use SubscribeToStream with manual acks instead.", false)] - public async Task SubscribeAsync(string streamName, string groupName, + public async Task SubscribeAsync( + string streamName, string groupName, Func eventAppeared, Action? subscriptionDropped = null, UserCredentials? userCredentials = null, int bufferSize = 10, bool autoAck = true, - CancellationToken cancellationToken = default) { + CancellationToken cancellationToken = default + ) { if (autoAck) { throw new InvalidOperationException( - $"AutoAck is no longer supported. Please use {nameof(SubscribeToStreamAsync)} with manual acks instead."); + $"AutoAck is no longer supported. Please use {nameof(SubscribeToStreamAsync)} with manual acks instead." + ); } - return await SubscribeToStreamAsync(streamName, groupName, eventAppeared, subscriptionDropped, - userCredentials, bufferSize, cancellationToken).ConfigureAwait(false); + return await SubscribeToStreamAsync( + streamName, + groupName, + eventAppeared, + subscriptionDropped, + userCredentials, + bufferSize, + cancellationToken + ).ConfigureAwait(false); } /// @@ -32,15 +43,26 @@ public async Task SubscribeAsync(string streamName, stri /// /// /// - [Obsolete("SubscribeToStreamAsync is no longer supported. Use SubscribeToStream with manual acks instead.", false)] - public async Task SubscribeToStreamAsync(string streamName, string groupName, - Func eventAppeared, - Action? subscriptionDropped = null, - UserCredentials? userCredentials = null, int bufferSize = 10, - CancellationToken cancellationToken = default) { + [Obsolete( + "SubscribeToStreamAsync is no longer supported. Use SubscribeToStream with manual acks instead.", + false + )] + public async Task SubscribeToStreamAsync( + string streamName, string groupName, + Func eventAppeared, + Action? subscriptionDropped = null, + UserCredentials? userCredentials = null, int bufferSize = 10, + CancellationToken cancellationToken = default + ) { return await PersistentSubscription - .Confirm(SubscribeToStream(streamName, groupName, bufferSize, userCredentials, cancellationToken), - eventAppeared, subscriptionDropped ?? delegate { }, _log, userCredentials, cancellationToken) + .Confirm( + SubscribeToStream(streamName, groupName, bufferSize, userCredentials, cancellationToken), + eventAppeared, + subscriptionDropped ?? delegate { }, + _log, + userCredentials, + cancellationToken + ) .ConfigureAwait(false); } @@ -53,8 +75,10 @@ public async Task SubscribeToStreamAsync(string streamNa /// The optional user credentials to perform operation with. /// The optional . /// - public PersistentSubscriptionResult SubscribeToStream(string streamName, string groupName, int bufferSize = 10, - UserCredentials? userCredentials = null, CancellationToken cancellationToken = default) { + public PersistentSubscriptionResult SubscribeToStream( + string streamName, string groupName, int bufferSize = 10, + UserCredentials? userCredentials = null, CancellationToken cancellationToken = default + ) { if (streamName == null) { throw new ArgumentNullException(nameof(streamName)); } @@ -77,7 +101,7 @@ public PersistentSubscriptionResult SubscribeToStream(string streamName, string var readOptions = new ReadReq.Types.Options { BufferSize = bufferSize, - GroupName = groupName, + GroupName = groupName, UuidOption = new ReadReq.Types.Options.Types.UUIDOption { Structured = new Empty() } }; @@ -87,29 +111,48 @@ public PersistentSubscriptionResult SubscribeToStream(string streamName, string readOptions.StreamIdentifier = streamName; } - return new PersistentSubscriptionResult(streamName, groupName, async ct => { - var channelInfo = await GetChannelInfo(ct).ConfigureAwait(false); - - if (streamName == SystemStreams.AllStream && - !channelInfo.ServerCapabilities.SupportsPersistentSubscriptionsToAll) { - throw new NotSupportedException("The server does not support persistent subscriptions to $all."); - } + return new PersistentSubscriptionResult( + streamName, + groupName, + async ct => { + var channelInfo = await GetChannelInfo(ct).ConfigureAwait(false); + + if (streamName == SystemStreams.AllStream && + !channelInfo.ServerCapabilities.SupportsPersistentSubscriptionsToAll) { + throw new NotSupportedException( + "The server does not support persistent subscriptions to $all." + ); + } - return channelInfo.CallInvoker; - }, new() { Options = readOptions }, Settings, userCredentials, cancellationToken); + return channelInfo; + }, + new() { Options = readOptions }, + Settings, + userCredentials, + cancellationToken + ); } /// /// Subscribes to a persistent subscription to $all. Messages must be manually acknowledged /// [Obsolete("SubscribeToAllAsync is no longer supported. Use SubscribeToAll with manual acks instead.", false)] - public async Task SubscribeToAllAsync(string groupName, - Func eventAppeared, - Action? subscriptionDropped = null, - UserCredentials? userCredentials = null, int bufferSize = 10, - CancellationToken cancellationToken = default) => - await SubscribeToStreamAsync(SystemStreams.AllStream, groupName, eventAppeared, subscriptionDropped, - userCredentials, bufferSize, cancellationToken) + public async Task SubscribeToAllAsync( + string groupName, + Func eventAppeared, + Action? subscriptionDropped = null, + UserCredentials? userCredentials = null, int bufferSize = 10, + CancellationToken cancellationToken = default + ) => + await SubscribeToStreamAsync( + SystemStreams.AllStream, + groupName, + eventAppeared, + subscriptionDropped, + userCredentials, + bufferSize, + cancellationToken + ) .ConfigureAwait(false); /// @@ -120,31 +163,33 @@ await SubscribeToStreamAsync(SystemStreams.AllStream, groupName, eventAppeared, /// The optional user credentials to perform operation with. /// The optional . /// - public PersistentSubscriptionResult SubscribeToAll(string groupName, int bufferSize = 10, - UserCredentials? userCredentials = null, CancellationToken cancellationToken = default) => + public PersistentSubscriptionResult SubscribeToAll( + string groupName, int bufferSize = 10, + UserCredentials? userCredentials = null, CancellationToken cancellationToken = default + ) => SubscribeToStream(SystemStreams.AllStream, groupName, bufferSize, userCredentials, cancellationToken); /// public class PersistentSubscriptionResult : IAsyncEnumerable, IAsyncDisposable, IDisposable { - private const int MaxEventIdLength = 2000; - private readonly ReadReq _request; + private const int MaxEventIdLength = 2000; + private readonly ReadReq _request; private readonly Channel _channel; - private readonly CancellationTokenSource _cts; - private readonly CallOptions _callOptions; + private readonly CancellationTokenSource _cts; + private readonly CallOptions _callOptions; private AsyncDuplexStreamingCall? _call; - private int _messagesEnumerated; + private int _messagesEnumerated; /// /// The server-generated unique identifier for the subscription. /// public string? SubscriptionId { get; private set; } - + /// /// The name of the stream to read events from. /// public string StreamName { get; } - + /// /// The name of the persistent subscription group. /// @@ -178,17 +223,22 @@ async IAsyncEnumerable GetMessages() { } } - internal PersistentSubscriptionResult(string streamName, string groupName, - Func> selectCallInvoker, + internal PersistentSubscriptionResult( + string streamName, string groupName, + Func> selectChannelInfo, ReadReq request, EventStoreClientSettings settings, UserCredentials? userCredentials, - CancellationToken cancellationToken) { + CancellationToken cancellationToken + ) { StreamName = streamName; - GroupName = groupName; - + GroupName = groupName; + _request = request; - - _callOptions = EventStoreCallOptions.CreateStreaming(settings, userCredentials: userCredentials, - cancellationToken: cancellationToken); + + _callOptions = EventStoreCallOptions.CreateStreaming( + settings, + userCredentials: userCredentials, + cancellationToken: cancellationToken + ); _channel = Channel.CreateBounded(ReadBoundedChannelOptions); @@ -200,25 +250,42 @@ internal PersistentSubscriptionResult(string streamName, string groupName, async Task PumpMessages() { try { - var callInvoker = await selectCallInvoker(_cts.Token).ConfigureAwait(false); - var client = new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient( - callInvoker); + var channelInfo = await selectChannelInfo(_cts.Token).ConfigureAwait(false); + var client = + new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient( + channelInfo.CallInvoker + ); + _call = client.Read(_callOptions); await _call.RequestStream.WriteAsync(_request).ConfigureAwait(false); await foreach (var response in _call.ResponseStream.ReadAllAsync(_cts.Token) .ConfigureAwait(false)) { - await _channel.Writer.WriteAsync(response.ContentCase switch { + PersistentSubscriptionMessage subscriptionMessage = response.ContentCase switch { SubscriptionConfirmation => new PersistentSubscriptionMessage.SubscriptionConfirmation( - response.SubscriptionConfirmation.SubscriptionId), - Event => new PersistentSubscriptionMessage.Event(ConvertToResolvedEvent(response), + response.SubscriptionConfirmation.SubscriptionId + ), + Event => new PersistentSubscriptionMessage.Event( + ConvertToResolvedEvent(response), response.Event.CountCase switch { ReadResp.Types.ReadEvent.CountOneofCase.RetryCount => response.Event.RetryCount, - _ => null - }), + _ => null + } + ), _ => PersistentSubscriptionMessage.Unknown.Instance - }, _cts.Token).ConfigureAwait(false); + }; + + if (subscriptionMessage is PersistentSubscriptionMessage.Event evnt) + EventStoreClientDiagnostics.TraceSubscriptionEvent( + SubscriptionId, + evnt.ResolvedEvent, + channelInfo, + settings, + userCredentials + ); + + await _channel.Writer.WriteAsync(subscriptionMessage, _cts.Token).ConfigureAwait(false); } _channel.Writer.TryComplete(); @@ -240,11 +307,15 @@ when rex2.Status.Detail.Contains("No grpc-status found on response"): } #endif if (ex is PersistentSubscriptionNotFoundException) { - await _channel.Writer.WriteAsync(PersistentSubscriptionMessage.NotFound.Instance, - cancellationToken).ConfigureAwait(false); + await _channel.Writer.WriteAsync( + PersistentSubscriptionMessage.NotFound.Instance, + cancellationToken + ).ConfigureAwait(false); + _channel.Writer.TryComplete(); return; } + _channel.Writer.TryComplete(ex); } } @@ -280,7 +351,6 @@ public Task Ack(params ResolvedEvent[] resolvedEvents) => public Task Ack(IEnumerable resolvedEvents) => Ack(resolvedEvents.Select(resolvedEvent => resolvedEvent.OriginalEvent.EventId)); - /// /// Acknowledge that a message has failed processing (this will tell the server it has not been processed). /// @@ -298,58 +368,72 @@ public Task Nack(PersistentSubscriptionNakEventAction action, string reason, par /// A reason given. /// The s to nak. There should not be more than 2000 to nak at a time. /// The number of resolvedEvents exceeded the limit of 2000. - public Task Nack(PersistentSubscriptionNakEventAction action, string reason, - params ResolvedEvent[] resolvedEvents) => Nack(action, reason, - Array.ConvertAll(resolvedEvents, resolvedEvent => resolvedEvent.OriginalEvent.EventId)); + public Task Nack( + PersistentSubscriptionNakEventAction action, string reason, + params ResolvedEvent[] resolvedEvents + ) => Nack( + action, + reason, + Array.ConvertAll(resolvedEvents, resolvedEvent => resolvedEvent.OriginalEvent.EventId) + ); private static ResolvedEvent ConvertToResolvedEvent(ReadResp response) => new( ConvertToEventRecord(response.Event.Event)!, ConvertToEventRecord(response.Event.Link), response.Event.PositionCase switch { ReadResp.Types.ReadEvent.PositionOneofCase.CommitPosition => response.Event.CommitPosition, - _ => null - }); + _ => null + } + ); private Task AckInternal(params Uuid[] eventIds) { if (eventIds.Length > MaxEventIdLength) { throw new ArgumentException( - $"The number of eventIds exceeds the maximum length of {MaxEventIdLength}.", nameof(eventIds)); + $"The number of eventIds exceeds the maximum length of {MaxEventIdLength}.", + nameof(eventIds) + ); } return _call is null ? throw new InvalidOperationException() - : _call.RequestStream.WriteAsync(new ReadReq { - Ack = new ReadReq.Types.Ack { - Ids = { - Array.ConvertAll(eventIds, id => id.ToDto()) + : _call.RequestStream.WriteAsync( + new ReadReq { + Ack = new ReadReq.Types.Ack { + Ids = { + Array.ConvertAll(eventIds, id => id.ToDto()) + } } } - }); + ); } private Task NackInternal(Uuid[] eventIds, PersistentSubscriptionNakEventAction action, string reason) { if (eventIds.Length > MaxEventIdLength) { throw new ArgumentException( - $"The number of eventIds exceeds the maximum length of {MaxEventIdLength}.", nameof(eventIds)); + $"The number of eventIds exceeds the maximum length of {MaxEventIdLength}.", + nameof(eventIds) + ); } return _call is null ? throw new InvalidOperationException() - : _call.RequestStream.WriteAsync(new ReadReq { - Nack = new ReadReq.Types.Nack { - Ids = { - Array.ConvertAll(eventIds, id => id.ToDto()) - }, - Action = action switch { - PersistentSubscriptionNakEventAction.Park => ReadReq.Types.Nack.Types.Action.Park, - PersistentSubscriptionNakEventAction.Retry => ReadReq.Types.Nack.Types.Action.Retry, - PersistentSubscriptionNakEventAction.Skip => ReadReq.Types.Nack.Types.Action.Skip, - PersistentSubscriptionNakEventAction.Stop => ReadReq.Types.Nack.Types.Action.Stop, - _ => ReadReq.Types.Nack.Types.Action.Unknown - }, - Reason = reason + : _call.RequestStream.WriteAsync( + new ReadReq { + Nack = new ReadReq.Types.Nack { + Ids = { + Array.ConvertAll(eventIds, id => id.ToDto()) + }, + Action = action switch { + PersistentSubscriptionNakEventAction.Park => ReadReq.Types.Nack.Types.Action.Park, + PersistentSubscriptionNakEventAction.Retry => ReadReq.Types.Nack.Types.Action.Retry, + PersistentSubscriptionNakEventAction.Skip => ReadReq.Types.Nack.Types.Action.Skip, + PersistentSubscriptionNakEventAction.Stop => ReadReq.Types.Nack.Types.Action.Stop, + _ => ReadReq.Types.Nack.Types.Action.Unknown + }, + Reason = reason + } } - }); + ); } private static EventRecord? ConvertToEventRecord(ReadResp.Types.ReadEvent.Types.RecordedEvent? e) => @@ -362,7 +446,8 @@ e is null new Position(e.CommitPosition, e.PreparePosition), e.Metadata, e.Data.ToByteArray(), - e.CustomMetadata.ToByteArray()); + e.CustomMetadata.ToByteArray() + ); /// public async ValueTask DisposeAsync() { @@ -375,9 +460,11 @@ static async Task CastAndDispose(IDisposable? resource) { switch (resource) { case null: return; + case IAsyncDisposable resourceAsyncDisposable: await resourceAsyncDisposable.DisposeAsync().ConfigureAwait(false); break; + default: resource.Dispose(); break; @@ -385,7 +472,6 @@ static async Task CastAndDispose(IDisposable? resource) { } } - /// public void Dispose() { _cts.Dispose(); @@ -394,7 +480,8 @@ public void Dispose() { /// public async IAsyncEnumerator GetAsyncEnumerator( - CancellationToken cancellationToken = default) { + CancellationToken cancellationToken = default + ) { await foreach (var message in Messages.WithCancellation(cancellationToken)) { if (message is not PersistentSubscriptionMessage.Event(var resolvedEvent, _)) { continue; diff --git a/src/EventStore.Client.Streams/EventStoreClient.Append.cs b/src/EventStore.Client.Streams/EventStoreClient.Append.cs index 9d0842855..ae82c98f4 100644 --- a/src/EventStore.Client.Streams/EventStoreClient.Append.cs +++ b/src/EventStore.Client.Streams/EventStoreClient.Append.cs @@ -1,14 +1,13 @@ -using System; using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; +using System.Diagnostics; using System.Threading.Channels; using Google.Protobuf; using EventStore.Client.Streams; using Grpc.Core; using Microsoft.Extensions.Logging; -using System.Runtime.CompilerServices; +using EventStore.Client.Diagnostics; +using EventStore.Client.Diagnostics.Telemetry; +using EventStore.Client.Diagnostics.Tracing; namespace EventStore.Client { public partial class EventStoreClient { @@ -30,24 +29,30 @@ public async Task AppendToStreamAsync( Action? configureOperationOptions = null, TimeSpan? deadline = null, UserCredentials? userCredentials = null, - CancellationToken cancellationToken = default) { + CancellationToken cancellationToken = default + ) { var options = Settings.OperationOptions.Clone(); configureOperationOptions?.Invoke(options); _log.LogDebug("Append to stream - {streamName}@{expectedRevision}.", streamName, expectedRevision); - var batchAppender = _streamAppender; var task = - userCredentials == null && await batchAppender.IsUsable().ConfigureAwait(false) - ? batchAppender.Append(streamName, expectedRevision, eventData, deadline, cancellationToken) + userCredentials == null && await _batchAppender.IsUsable().ConfigureAwait(false) + ? _batchAppender.Append(streamName, expectedRevision, eventData, deadline, cancellationToken) : AppendToStreamInternal( - (await GetChannelInfo(cancellationToken).ConfigureAwait(false)).CallInvoker, + await GetChannelInfo(cancellationToken).ConfigureAwait(false), new AppendReq { Options = new AppendReq.Types.Options { StreamIdentifier = streamName, Revision = expectedRevision } - }, eventData, options, deadline, userCredentials, cancellationToken); + }, + eventData, + options, + deadline, + userCredentials, + cancellationToken + ); return (await task.ConfigureAwait(false)).OptionallyThrowWrongExpectedVersionException(options); } @@ -70,29 +75,35 @@ public async Task AppendToStreamAsync( Action? configureOperationOptions = null, TimeSpan? deadline = null, UserCredentials? userCredentials = null, - CancellationToken cancellationToken = default) { + CancellationToken cancellationToken = default + ) { var operationOptions = Settings.OperationOptions.Clone(); configureOperationOptions?.Invoke(operationOptions); - _log.LogDebug("Append to stream - {streamName}@{expectedRevision}.", streamName, expectedState); + _log.LogDebug("Append to stream - {streamName}@{expectedState}.", streamName, expectedState); - var batchAppender = _streamAppender; var task = - userCredentials == null && await batchAppender.IsUsable().ConfigureAwait(false) - ? batchAppender.Append(streamName, expectedState, eventData, deadline, cancellationToken) + userCredentials == null && await _batchAppender.IsUsable().ConfigureAwait(false) + ? _batchAppender.Append(streamName, expectedState, eventData, deadline, cancellationToken) : AppendToStreamInternal( - (await GetChannelInfo(cancellationToken).ConfigureAwait(false)).CallInvoker, + await GetChannelInfo(cancellationToken).ConfigureAwait(false), new AppendReq { Options = new AppendReq.Types.Options { StreamIdentifier = streamName } - }.WithAnyStreamRevision(expectedState), eventData, operationOptions, deadline, userCredentials, - cancellationToken); + }.WithAnyStreamRevision(expectedState), + eventData, + operationOptions, + deadline, + userCredentials, + cancellationToken + ); + return (await task.ConfigureAwait(false)).OptionallyThrowWrongExpectedVersionException(operationOptions); } - private async ValueTask AppendToStreamInternal( - CallInvoker callInvoker, + private ValueTask AppendToStreamInternal( + ChannelInfo channelInfo, AppendReq header, IEnumerable eventData, EventStoreClientOperationOptions operationOptions, @@ -100,39 +111,54 @@ private async ValueTask AppendToStreamInternal( UserCredentials? userCredentials, CancellationToken cancellationToken ) { - using var call = new Streams.Streams.StreamsClient(callInvoker).Append( - EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken) + return EventStoreClientDiagnostics.TraceOperation( + Operation, + TracingConstants.Operations.Append, + new ActivityTagsCollection { + { + TelemetryAttributes.EventStoreStream, + header.Options.StreamIdentifier.StreamName.ToStringUtf8() + } + } + .WithTagsFrom(channelInfo, Settings) + .WithTagsFrom(userCredentials) ); - await call.RequestStream.WriteAsync(header).ConfigureAwait(false); + async ValueTask Operation() { + using var call = new Streams.Streams.StreamsClient(channelInfo.CallInvoker).Append( + EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken) + ); + + await call.RequestStream.WriteAsync(header).ConfigureAwait(false); - foreach (var e in eventData) { - await call.RequestStream.WriteAsync( - new AppendReq { + foreach (var e in eventData) { + var appendReq = new AppendReq { ProposedMessage = new AppendReq.Types.ProposedMessage { Id = e.EventId.ToDto(), Data = ByteString.CopyFrom(e.Data.Span), - CustomMetadata = ByteString.CopyFrom(e.Metadata.Span), + CustomMetadata = ByteString.CopyFrom(e.Metadata.InjectTracingMetadata()), Metadata = { { Constants.Metadata.Type, e.Type }, { Constants.Metadata.ContentType, e.ContentType } } - }, - } - ).ConfigureAwait(false); - } + } + }; - await call.RequestStream.CompleteAsync().ConfigureAwait(false); + await call.RequestStream.WriteAsync(appendReq).ConfigureAwait(false); + } - var response = await call.ResponseAsync.ConfigureAwait(false); + await call.RequestStream.CompleteAsync().ConfigureAwait(false); + + var response = await call.ResponseAsync.ConfigureAwait(false); - if (response.Success != null) - return HandleSuccessAppend(response, header); + if (response.Success != null) + return HandleSuccessAppend(response, header); - if (response.WrongExpectedVersion == null) - throw new InvalidOperationException("The operation completed with an unexpected result."); + if (response.WrongExpectedVersion == null) + throw new InvalidOperationException("The operation completed with an unexpected result."); - return HandleWrongExpectedRevision(response, header, operationOptions); + return HandleWrongExpectedRevision(response, header, operationOptions); + } } private IWriteResult HandleSuccessAppend(AppendResp response, AppendReq header) { @@ -150,7 +176,8 @@ private IWriteResult HandleSuccessAppend(AppendResp response, AppendReq header) "Append to stream succeeded - {streamName}@{logPosition}/{nextExpectedVersion}.", header.Options.StreamIdentifier, position, - currentRevision); + currentRevision + ); return new SuccessResult(currentRevision, position); } @@ -224,144 +251,203 @@ private class StreamAppender : IDisposable { private readonly Action _onException; private readonly Channel _channel; private readonly ConcurrentDictionary> _pendingRequests; + private readonly TaskCompletionSource _isUsable; - private readonly Task?> _callTask; + private ChannelInfo? _channelInfo; + AsyncDuplexStreamingCall? _call; - public StreamAppender(EventStoreClientSettings settings, - Task?> callTask, CancellationToken cancellationToken, - Action onException) { + public StreamAppender( + EventStoreClientSettings settings, + ValueTask channelInfoTask, + CancellationToken cancellationToken, + Action onException + ) { _settings = settings; - _callTask = callTask; _cancellationToken = cancellationToken; _onException = onException; - _channel = System.Threading.Channels.Channel.CreateBounded(10000); + _channel = Channel.CreateBounded(10000); _pendingRequests = new ConcurrentDictionary>(); - _ = Task.Factory.StartNew(Send); - _ = Task.Factory.StartNew(Receive); + _isUsable = new TaskCompletionSource(); + + _ = Task.Run(() => Duplex(channelInfoTask), cancellationToken); } - public ValueTask Append(string streamName, StreamRevision expectedStreamPosition, - IEnumerable events, TimeSpan? timeoutAfter, CancellationToken cancellationToken = default) => - AppendInternal(BatchAppendReq.Types.Options.Create(streamName, expectedStreamPosition, timeoutAfter), - events, cancellationToken); + public ValueTask Append( + string streamName, StreamRevision expectedStreamPosition, + IEnumerable events, TimeSpan? timeoutAfter, CancellationToken cancellationToken = default + ) => + AppendInternal( + BatchAppendReq.Types.Options.Create(streamName, expectedStreamPosition, timeoutAfter), + events, + cancellationToken + ); - public ValueTask Append(string streamName, StreamState expectedStreamState, - IEnumerable events, TimeSpan? timeoutAfter, CancellationToken cancellationToken = default) => - AppendInternal(BatchAppendReq.Types.Options.Create(streamName, expectedStreamState, timeoutAfter), - events, cancellationToken); + public ValueTask Append( + string streamName, StreamState expectedStreamState, + IEnumerable events, TimeSpan? timeoutAfter, CancellationToken cancellationToken = default + ) => + AppendInternal( + BatchAppendReq.Types.Options.Create(streamName, expectedStreamState, timeoutAfter), + events, + cancellationToken + ); - public async ValueTask IsUsable() { - var call = await _callTask.ConfigureAwait(false); - return call != null; - } + public Task IsUsable() => _isUsable.Task; + + private ValueTask AppendInternal( + BatchAppendReq.Types.Options options, + IEnumerable events, CancellationToken cancellationToken + ) { + return EventStoreClientDiagnostics.TraceOperation( + Operation, + TracingConstants.Operations.Append, + new ActivityTagsCollection { + { + TelemetryAttributes.EventStoreStream, options.StreamIdentifier.StreamName.ToStringUtf8() + } + } + .WithTagsFrom(_channelInfo, _settings) + ); - private async Task Receive() { - try { - var call = await _callTask.ConfigureAwait(false); - if (call is null) { - _channel.Writer.TryComplete( - new NotSupportedException("Server does not support batch append")); - return; - } + async ValueTask Operation() { + var correlationId = Uuid.NewUuid(); - await foreach (var response in call.ResponseStream.ReadAllAsync(_cancellationToken) - .ConfigureAwait(false)) { - if (!_pendingRequests.TryRemove(Uuid.FromDto(response.CorrelationId), out var writeResult)) { - continue; // TODO: Log? - } + var complete = _pendingRequests.GetOrAdd(correlationId, new TaskCompletionSource()); - try { - writeResult.TrySetResult(response.ToWriteResult()); - } catch (Exception ex) { - writeResult.TrySetException(ex); + try { + foreach (var appendRequest in GetRequests(events, options, correlationId)) { + await _channel.Writer.WriteAsync(appendRequest, cancellationToken).ConfigureAwait(false); } + } catch (ChannelClosedException ex) { + // channel is closed, our tcs won't necessarily get completed, don't wait for it. + throw ex.InnerException ?? ex; } - } catch (Exception ex) { - // signal that no tcs added to _pendingRequests after this point will necessarily complete - _channel.Writer.TryComplete(ex); - // complete whatever tcs's we have - _onException(ex); - foreach (var request in _pendingRequests) { - request.Value.TrySetException(ex); - } + return await complete.Task.ConfigureAwait(false); } } - private async Task Send() { - var call = await _callTask.ConfigureAwait(false); - if (call is null) - throw new NotSupportedException("Server does not support batch append"); + private async Task Duplex( + ValueTask channelInfoTask + ) { + try { + _channelInfo = await channelInfoTask.ConfigureAwait(false); + if (!_channelInfo.ServerCapabilities.SupportsBatchAppend) { + _channel.Writer.TryComplete(new NotSupportedException("Server does not support batch append")); + _isUsable.TrySetResult(false); + return; + } - await foreach (var appendRequest in _channel.Reader.ReadAllAsync(_cancellationToken) - .ConfigureAwait(false)) { - await call.RequestStream.WriteAsync(appendRequest).ConfigureAwait(false); - } + _call = new Streams.Streams.StreamsClient(_channelInfo.CallInvoker).BatchAppend( + EventStoreCallOptions.CreateStreaming( + _settings, + userCredentials: _settings.DefaultCredentials, + cancellationToken: _cancellationToken + ) + ); - await call.RequestStream.CompleteAsync().ConfigureAwait(false); - } + _ = Task.Run(Send, _cancellationToken); + _ = Task.Run(Receive, _cancellationToken); - private async ValueTask AppendInternal(BatchAppendReq.Types.Options options, - IEnumerable events, CancellationToken cancellationToken) { - var batchSize = 0; - var correlationId = Uuid.NewUuid(); - var correlationIdDto = correlationId.ToDto(); + _isUsable.TrySetResult(true); + } catch (Exception ex) { + _isUsable.TrySetException(ex); + _onException(ex); + } - var complete = _pendingRequests.GetOrAdd(correlationId, new TaskCompletionSource()); + return; - try { - foreach (var appendRequest in GetRequests()) { - await _channel.Writer.WriteAsync(appendRequest, cancellationToken).ConfigureAwait(false); + async Task Send() { + if (_call == null) return; + + await foreach (var appendRequest in _channel.Reader.ReadAllAsync(_cancellationToken) + .ConfigureAwait(false)) { + await _call.RequestStream.WriteAsync(appendRequest).ConfigureAwait(false); } - } catch (ChannelClosedException ex) { - // channel is closed, our tcs won't necessarily get completed, don't wait for it. - throw ex.InnerException ?? ex; + + await _call.RequestStream.CompleteAsync().ConfigureAwait(false); } - return await complete.Task.ConfigureAwait(false); + async Task Receive() { + if (_call == null) return; + + try { + await foreach (var response in _call.ResponseStream.ReadAllAsync(_cancellationToken) + .ConfigureAwait(false)) { + if (!_pendingRequests.TryRemove( + Uuid.FromDto(response.CorrelationId), + out var writeResult + )) { + continue; // TODO: Log? + } - IEnumerable GetRequests() { - bool first = true; - var proposedMessages = new List(); - foreach (var @event in events) { - var proposedMessage = new BatchAppendReq.Types.ProposedMessage { - Data = ByteString.CopyFrom(@event.Data.Span), - CustomMetadata = ByteString.CopyFrom(@event.Metadata.Span), - Id = @event.EventId.ToDto(), - Metadata = { - {Constants.Metadata.Type, @event.Type}, - {Constants.Metadata.ContentType, @event.ContentType} + try { + writeResult.TrySetResult(response.ToWriteResult()); + } catch (Exception ex) { + writeResult.TrySetException(ex); } - }; + } + } catch (Exception ex) { + // signal that no tcs added to _pendingRequests after this point will necessarily complete + _channel.Writer.TryComplete(ex); - proposedMessages.Add(proposedMessage); + // complete whatever tcs's we have + foreach (var request in _pendingRequests) + request.Value.TrySetException(ex); - if ((batchSize += proposedMessage.CalculateSize()) < - _settings.OperationOptions.BatchAppendSize) { - continue; + _onException(ex); + } + } + } + + private IEnumerable GetRequests( + IEnumerable events, BatchAppendReq.Types.Options options, Uuid correlationId + ) { + var batchSize = 0; + bool first = true; + var correlationIdDto = correlationId.ToDto(); + var proposedMessages = new List(); + + foreach (var @event in events) { + var proposedMessage = new BatchAppendReq.Types.ProposedMessage { + Data = ByteString.CopyFrom(@event.Data.Span), + CustomMetadata = ByteString.CopyFrom(@event.Metadata.InjectTracingMetadata()), + Id = @event.EventId.ToDto(), + Metadata = { + { Constants.Metadata.Type, @event.Type }, + { Constants.Metadata.ContentType, @event.ContentType } } + }; + + proposedMessages.Add(proposedMessage); - yield return new BatchAppendReq { - ProposedMessages = {proposedMessages}, - CorrelationId = correlationIdDto, - Options = first ? options : null - }; - first = false; - proposedMessages.Clear(); - batchSize = 0; + if ((batchSize += proposedMessage.CalculateSize()) < + _settings.OperationOptions.BatchAppendSize) { + continue; } yield return new BatchAppendReq { - ProposedMessages = {proposedMessages}, - IsFinal = true, + ProposedMessages = { proposedMessages }, CorrelationId = correlationIdDto, Options = first ? options : null }; + + first = false; + proposedMessages.Clear(); + batchSize = 0; } + + yield return new BatchAppendReq { + ProposedMessages = { proposedMessages }, + IsFinal = true, + CorrelationId = correlationIdDto, + Options = first ? options : null + }; } public void Dispose() { _channel.Writer.TryComplete(); + _call?.Dispose(); } } } diff --git a/src/EventStore.Client.Streams/EventStoreClient.Metadata.cs b/src/EventStore.Client.Streams/EventStoreClient.Metadata.cs index 6581bd94b..19de629e7 100644 --- a/src/EventStore.Client.Streams/EventStoreClient.Metadata.cs +++ b/src/EventStore.Client.Streams/EventStoreClient.Metadata.cs @@ -97,7 +97,7 @@ private async Task SetStreamMetadataInternal(StreamMetadata metada CancellationToken cancellationToken) { var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false); - return await AppendToStreamInternal(channelInfo.CallInvoker, appendReq, new[] { + return await AppendToStreamInternal(channelInfo, appendReq, new[] { new EventData(Uuid.NewUuid(), SystemEventTypes.StreamMetadata, JsonSerializer.SerializeToUtf8Bytes(metadata, StreamMetadataJsonSerializerOptions)), }, operationOptions, deadline, userCredentials, cancellationToken).ConfigureAwait(false); diff --git a/src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs b/src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs index f82bd5d07..7a2f7fdbc 100644 --- a/src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs +++ b/src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs @@ -1,4 +1,5 @@ using System.Threading.Channels; +using EventStore.Client.Diagnostics; using EventStore.Client.Streams; using Grpc.Core; using static EventStore.Client.Streams.ReadResp.ContentOneofCase; @@ -24,10 +25,15 @@ public Task SubscribeToAllAsync( Action? subscriptionDropped = default, SubscriptionFilterOptions? filterOptions = null, UserCredentials? userCredentials = null, - CancellationToken cancellationToken = default) => StreamSubscription.Confirm( + CancellationToken cancellationToken = default + ) => StreamSubscription.Confirm( SubscribeToAll(start, resolveLinkTos, filterOptions, userCredentials, cancellationToken), - eventAppeared, subscriptionDropped, _log, filterOptions?.CheckpointReached, - cancellationToken: cancellationToken); + eventAppeared, + subscriptionDropped, + _log, + filterOptions?.CheckpointReached, + cancellationToken: cancellationToken + ); /// /// Subscribes to all events. @@ -43,19 +49,23 @@ public StreamSubscriptionResult SubscribeToAll( bool resolveLinkTos = false, SubscriptionFilterOptions? filterOptions = null, UserCredentials? userCredentials = null, - CancellationToken cancellationToken = default) => new(async _ => { - var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false); - return channelInfo.CallInvoker; - }, new ReadReq { - Options = new ReadReq.Types.Options { - ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards, - ResolveLinks = resolveLinkTos, - All = ReadReq.Types.Options.Types.AllOptions.FromSubscriptionPosition(start), - Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions(), - Filter = GetFilterOptions(filterOptions)!, - UuidOption = new() { Structured = new() } - } - }, Settings, userCredentials, cancellationToken); + CancellationToken cancellationToken = default + ) => new( + async _ => await GetChannelInfo(cancellationToken).ConfigureAwait(false), + new ReadReq { + Options = new ReadReq.Types.Options { + ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards, + ResolveLinks = resolveLinkTos, + All = ReadReq.Types.Options.Types.AllOptions.FromSubscriptionPosition(start), + Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions(), + Filter = GetFilterOptions(filterOptions)!, + UuidOption = new() { Structured = new() } + } + }, + Settings, + userCredentials, + cancellationToken + ); /// /// Subscribes to a stream from a checkpoint. @@ -69,15 +79,21 @@ public StreamSubscriptionResult SubscribeToAll( /// The optional . /// [Obsolete("SubscribeToStreamAsync is no longer supported. Use SubscribeToStream instead.", false)] - public Task SubscribeToStreamAsync(string streamName, - FromStream start, - Func eventAppeared, - bool resolveLinkTos = false, - Action? subscriptionDropped = default, - UserCredentials? userCredentials = null, - CancellationToken cancellationToken = default) => StreamSubscription.Confirm( + public Task SubscribeToStreamAsync( + string streamName, + FromStream start, + Func eventAppeared, + bool resolveLinkTos = false, + Action? subscriptionDropped = default, + UserCredentials? userCredentials = null, + CancellationToken cancellationToken = default + ) => StreamSubscription.Confirm( SubscribeToStream(streamName, start, resolveLinkTos, userCredentials, cancellationToken), - eventAppeared, subscriptionDropped, _log, cancellationToken: cancellationToken); + eventAppeared, + subscriptionDropped, + _log, + cancellationToken: cancellationToken + ); /// /// Subscribes to a stream from a checkpoint. @@ -93,28 +109,33 @@ public StreamSubscriptionResult SubscribeToStream( FromStream start, bool resolveLinkTos = false, UserCredentials? userCredentials = null, - CancellationToken cancellationToken = default) => new(async _ => { - var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false); - return channelInfo.CallInvoker; - }, new ReadReq { - Options = new ReadReq.Types.Options { - ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards, - ResolveLinks = resolveLinkTos, - Stream = ReadReq.Types.Options.Types.StreamOptions.FromSubscriptionPosition(streamName, start), - Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions(), - UuidOption = new() { Structured = new() } - } - }, Settings, userCredentials, cancellationToken); + CancellationToken cancellationToken = default + ) => new( + async _ => await GetChannelInfo(cancellationToken).ConfigureAwait(false), + new ReadReq { + Options = new ReadReq.Types.Options { + ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards, + ResolveLinks = resolveLinkTos, + Stream = ReadReq.Types.Options.Types.StreamOptions.FromSubscriptionPosition(streamName, start), + Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions(), + UuidOption = new() { Structured = new() } + } + }, + Settings, + userCredentials, + cancellationToken + ); /// /// A class that represents the result of a subscription operation. You may either enumerate this instance directly or . Do not enumerate more than once. /// public class StreamSubscriptionResult : IAsyncEnumerable, IAsyncDisposable, IDisposable { - private readonly ReadReq _request; - private readonly Channel _channel; - private readonly CancellationTokenSource _cts; - private readonly CallOptions _callOptions; - private AsyncServerStreamingCall? _call; + private readonly ReadReq _request; + private readonly Channel _channel; + private readonly CancellationTokenSource _cts; + private readonly CallOptions _callOptions; + private readonly EventStoreClientSettings _settings; + private AsyncServerStreamingCall? _call; private int _messagesEnumerated; @@ -150,12 +171,18 @@ async IAsyncEnumerable GetMessages() { } } - internal StreamSubscriptionResult(Func> selectCallInvoker, + internal StreamSubscriptionResult( + Func> selectChannelInfo, ReadReq request, EventStoreClientSettings settings, UserCredentials? userCredentials, - CancellationToken cancellationToken) { - _request = request; - _callOptions = EventStoreCallOptions.CreateStreaming(settings, userCredentials: userCredentials, - cancellationToken: cancellationToken); + CancellationToken cancellationToken + ) { + _request = request; + _settings = settings; + _callOptions = EventStoreCallOptions.CreateStreaming( + settings, + userCredentials: userCredentials, + cancellationToken: cancellationToken + ); _channel = Channel.CreateBounded(ReadBoundedChannelOptions); @@ -171,29 +198,53 @@ internal StreamSubscriptionResult(Func> sel async Task PumpMessages() { try { - var callInvoker = await selectCallInvoker(_cts.Token).ConfigureAwait(false); - var client = new Streams.Streams.StreamsClient(callInvoker); + var channelInfo = await selectChannelInfo(_cts.Token).ConfigureAwait(false); + var client = new Streams.Streams.StreamsClient(channelInfo.CallInvoker); _call = client.Read(_request, _callOptions); await foreach (var response in _call.ResponseStream.ReadAllAsync(_cts.Token) .ConfigureAwait(false)) { - await _channel.Writer.WriteAsync(response.ContentCase switch { - Confirmation => new StreamMessage.SubscriptionConfirmation( - response.Confirmation.SubscriptionId), - Event => new StreamMessage.Event(ConvertToResolvedEvent(response.Event)), - FirstStreamPosition => new StreamMessage.FirstStreamPosition( - new StreamPosition(response.FirstStreamPosition)), - LastStreamPosition => new StreamMessage.LastStreamPosition( - new StreamPosition(response.LastStreamPosition)), - LastAllStreamPosition => new StreamMessage.LastAllStreamPosition( - new Position(response.LastAllStreamPosition.CommitPosition, - response.LastAllStreamPosition.PreparePosition)), - Checkpoint => new StreamMessage.AllStreamCheckpointReached( - new Position(response.Checkpoint.CommitPosition, - response.Checkpoint.PreparePosition)), - CaughtUp => StreamMessage.CaughtUp.Instance, - FellBehind => StreamMessage.FellBehind.Instance, - _ => StreamMessage.Unknown.Instance - }, _cts.Token).ConfigureAwait(false); + StreamMessage subscriptionMessage = + response.ContentCase switch { + Confirmation => new StreamMessage.SubscriptionConfirmation( + response.Confirmation.SubscriptionId + ), + Event => new StreamMessage.Event(ConvertToResolvedEvent(response.Event)), + FirstStreamPosition => new StreamMessage.FirstStreamPosition( + new StreamPosition(response.FirstStreamPosition) + ), + LastStreamPosition => new StreamMessage.LastStreamPosition( + new StreamPosition(response.LastStreamPosition) + ), + LastAllStreamPosition => new StreamMessage.LastAllStreamPosition( + new Position( + response.LastAllStreamPosition.CommitPosition, + response.LastAllStreamPosition.PreparePosition + ) + ), + Checkpoint => new StreamMessage.AllStreamCheckpointReached( + new Position( + response.Checkpoint.CommitPosition, + response.Checkpoint.PreparePosition + ) + ), + CaughtUp => StreamMessage.CaughtUp.Instance, + FellBehind => StreamMessage.FellBehind.Instance, + _ => StreamMessage.Unknown.Instance + }; + + if (subscriptionMessage is StreamMessage.Event evnt) + EventStoreClientDiagnostics.TraceSubscriptionEvent( + SubscriptionId, + evnt.ResolvedEvent, + channelInfo, + _settings, + userCredentials + ); + + await _channel.Writer.WriteAsync( + subscriptionMessage, + _cts.Token + ).ConfigureAwait(false); } _channel.Writer.Complete(); @@ -214,9 +265,11 @@ static async ValueTask CastAndDispose(IDisposable? resource) { switch (resource) { case null: return; + case IAsyncDisposable resourceAsyncDisposable: await resourceAsyncDisposable.DisposeAsync().ConfigureAwait(false); break; + default: resource.Dispose(); break; @@ -232,7 +285,8 @@ public void Dispose() { /// public async IAsyncEnumerator GetAsyncEnumerator( - CancellationToken cancellationToken = default) { + CancellationToken cancellationToken = default + ) { try { await foreach (var message in _channel.Reader.ReadAllAsync(cancellationToken) .ConfigureAwait(false)) { diff --git a/src/EventStore.Client.Streams/EventStoreClient.cs b/src/EventStore.Client.Streams/EventStoreClient.cs index 361e6e2d4..700df3c3f 100644 --- a/src/EventStore.Client.Streams/EventStoreClient.cs +++ b/src/EventStore.Client.Streams/EventStoreClient.cs @@ -1,6 +1,5 @@ using System.Text.Json; using System.Threading.Channels; -using EventStore.Client.Streams; using Grpc.Core; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; @@ -12,32 +11,48 @@ namespace EventStore.Client { /// The client used for operations on streams. /// public sealed partial class EventStoreClient : EventStoreClientBase { - private static readonly JsonSerializerOptions StreamMetadataJsonSerializerOptions = new() { Converters = { StreamMetadataJsonConverter.Instance }, }; - private static BoundedChannelOptions ReadBoundedChannelOptions = new (1) { - SingleReader = true, - SingleWriter = true, + private static BoundedChannelOptions ReadBoundedChannelOptions = new(1) { + SingleReader = true, + SingleWriter = true, AllowSynchronousContinuations = true }; - - private readonly ILogger _log; - private Lazy _streamAppenderLazy; - private StreamAppender _streamAppender => _streamAppenderLazy.Value; - private readonly CancellationTokenSource _disposedTokenSource; + private readonly ILogger _log; + private Lazy _batchAppenderLazy; + private StreamAppender _batchAppender => _batchAppenderLazy.Value; + private readonly CancellationTokenSource _disposedTokenSource; private static readonly Dictionary> ExceptionMap = new() { [Constants.Exceptions.InvalidTransaction] = ex => new InvalidTransactionException(ex.Message, ex), - [Constants.Exceptions.StreamDeleted] = ex => new StreamDeletedException(ex.Trailers.FirstOrDefault(x => x.Key == Constants.Exceptions.StreamName)?.Value ?? "", ex), - [Constants.Exceptions.WrongExpectedVersion] = ex => new WrongExpectedVersionException(ex.Trailers.FirstOrDefault(x => x.Key == Constants.Exceptions.StreamName)?.Value!, ex.Trailers.GetStreamRevision(Constants.Exceptions.ExpectedVersion), ex.Trailers.GetStreamRevision(Constants.Exceptions.ActualVersion), ex, ex.Message), - [Constants.Exceptions.MaximumAppendSizeExceeded] = ex => new MaximumAppendSizeExceededException(ex.Trailers.GetIntValueOrDefault(Constants.Exceptions.MaximumAppendSize), ex), - [Constants.Exceptions.StreamNotFound] = ex => new StreamNotFoundException(ex.Trailers.FirstOrDefault(x => x.Key == Constants.Exceptions.StreamName)?.Value!, ex), - [Constants.Exceptions.MissingRequiredMetadataProperty] = ex => new RequiredMetadataPropertyMissingException(ex.Trailers.FirstOrDefault(x => x.Key == Constants.Exceptions.MissingRequiredMetadataProperty)?.Value!, ex), + [Constants.Exceptions.StreamDeleted] = ex => new StreamDeletedException( + ex.Trailers.FirstOrDefault(x => x.Key == Constants.Exceptions.StreamName)?.Value ?? "", + ex + ), + [Constants.Exceptions.WrongExpectedVersion] = ex => new WrongExpectedVersionException( + ex.Trailers.FirstOrDefault(x => x.Key == Constants.Exceptions.StreamName)?.Value!, + ex.Trailers.GetStreamRevision(Constants.Exceptions.ExpectedVersion), + ex.Trailers.GetStreamRevision(Constants.Exceptions.ActualVersion), + ex, + ex.Message + ), + [Constants.Exceptions.MaximumAppendSizeExceeded] = ex => new MaximumAppendSizeExceededException( + ex.Trailers.GetIntValueOrDefault(Constants.Exceptions.MaximumAppendSize), + ex + ), + [Constants.Exceptions.StreamNotFound] = ex => new StreamNotFoundException( + ex.Trailers.FirstOrDefault(x => x.Key == Constants.Exceptions.StreamName)?.Value!, + ex + ), + [Constants.Exceptions.MissingRequiredMetadataProperty] = ex => new RequiredMetadataPropertyMissingException( + ex.Trailers.FirstOrDefault(x => x.Key == Constants.Exceptions.MissingRequiredMetadataProperty)?.Value!, + ex + ), }; /// @@ -53,29 +68,24 @@ public EventStoreClient(IOptions options) : this(optio public EventStoreClient(EventStoreClientSettings? settings = null) : base(settings, ExceptionMap) { _log = Settings.LoggerFactory?.CreateLogger() ?? new NullLogger(); _disposedTokenSource = new CancellationTokenSource(); - _streamAppenderLazy = new Lazy(CreateStreamAppender); + _batchAppenderLazy = new Lazy(CreateStreamAppender); } private void SwapStreamAppender(Exception ex) => - Interlocked.Exchange(ref _streamAppenderLazy, new Lazy(CreateStreamAppender)).Value.Dispose(); + Interlocked.Exchange(ref _batchAppenderLazy, new Lazy(CreateStreamAppender)).Value + .Dispose(); // todo: might be nice to have two different kinds of appenders and we decide which to instantiate according to the server caps. - private StreamAppender CreateStreamAppender() { - return new StreamAppender(Settings, GetCall(), _disposedTokenSource.Token, SwapStreamAppender); - - async Task?> GetCall() { - var channelInfo = await GetChannelInfo(_disposedTokenSource.Token).ConfigureAwait(false); - if (!channelInfo.ServerCapabilities.SupportsBatchAppend) - return null; - - var client = new Streams.Streams.StreamsClient(channelInfo.CallInvoker); - - return client.BatchAppend(EventStoreCallOptions.CreateStreaming(Settings, - userCredentials: Settings.DefaultCredentials, cancellationToken: _disposedTokenSource.Token)); - } - } - - private static ReadReq.Types.Options.Types.FilterOptions? GetFilterOptions(IEventFilter? filter, uint checkpointInterval = 0) { + private StreamAppender CreateStreamAppender() => new StreamAppender( + Settings, + GetChannelInfo(_disposedTokenSource.Token), + _disposedTokenSource.Token, + SwapStreamAppender + ); + + private static ReadReq.Types.Options.Types.FilterOptions? GetFilterOptions( + IEventFilter? filter, uint checkpointInterval = 0 + ) { if (filter == null) { return null; } @@ -131,21 +141,25 @@ private StreamAppender CreateStreamAppender() { return options; } - private static ReadReq.Types.Options.Types.FilterOptions? GetFilterOptions(SubscriptionFilterOptions? filterOptions) + private static ReadReq.Types.Options.Types.FilterOptions? GetFilterOptions( + SubscriptionFilterOptions? filterOptions + ) => filterOptions == null ? null : GetFilterOptions(filterOptions.Filter, filterOptions.CheckpointInterval); /// public override void Dispose() { - if (_streamAppenderLazy.IsValueCreated) - _streamAppenderLazy.Value.Dispose(); + if (_batchAppenderLazy.IsValueCreated) + _batchAppenderLazy.Value.Dispose(); + _disposedTokenSource.Dispose(); base.Dispose(); } /// public override async ValueTask DisposeAsync() { - if (_streamAppenderLazy.IsValueCreated) - _streamAppenderLazy.Value.Dispose(); + if (_batchAppenderLazy.IsValueCreated) + _batchAppenderLazy.Value.Dispose(); + _disposedTokenSource.Dispose(); await base.DisposeAsync().ConfigureAwait(false); } diff --git a/src/EventStore.Client.Streams/StreamSubscription.cs b/src/EventStore.Client.Streams/StreamSubscription.cs index 9019eda64..c6b9e8ae6 100644 --- a/src/EventStore.Client.Streams/StreamSubscription.cs +++ b/src/EventStore.Client.Streams/StreamSubscription.cs @@ -7,26 +7,28 @@ namespace EventStore.Client { /// [Obsolete] public class StreamSubscription : IDisposable { - private readonly EventStoreClient.StreamSubscriptionResult _subscription; - private readonly IAsyncEnumerator _messages; - private readonly Func _eventAppeared; - private readonly Func _checkpointReached; + private readonly EventStoreClient.StreamSubscriptionResult _subscription; + private readonly IAsyncEnumerator _messages; + private readonly Func _eventAppeared; + private readonly Func _checkpointReached; private readonly Action? _subscriptionDropped; - private readonly ILogger _log; - private readonly CancellationTokenSource _cts; - private int _subscriptionDroppedInvoked; + private readonly ILogger _log; + private readonly CancellationTokenSource _cts; + private int _subscriptionDroppedInvoked; /// /// The id of the set by the server. /// public string SubscriptionId { get; } - internal static async Task Confirm(EventStoreClient.StreamSubscriptionResult subscription, + internal static async Task Confirm( + EventStoreClient.StreamSubscriptionResult subscription, Func eventAppeared, Action? subscriptionDropped, ILogger log, Func? checkpointReached = null, - CancellationToken cancellationToken = default) { + CancellationToken cancellationToken = default + ) { var messages = subscription.Messages; var enumerator = messages.GetAsyncEnumerator(cancellationToken); @@ -35,26 +37,36 @@ enumerator.Current is not StreamMessage.SubscriptionConfirmation(var subscriptio throw new InvalidOperationException($"Subscription to {enumerator} could not be confirmed."); } - return new StreamSubscription(subscription, enumerator, subscriptionId, eventAppeared, subscriptionDropped, - log, checkpointReached, cancellationToken); + return new StreamSubscription( + subscription, + enumerator, + subscriptionId, + eventAppeared, + subscriptionDropped, + log, + checkpointReached, + cancellationToken + ); } - private StreamSubscription(EventStoreClient.StreamSubscriptionResult subscription, + private StreamSubscription( + EventStoreClient.StreamSubscriptionResult subscription, IAsyncEnumerator messages, string subscriptionId, Func eventAppeared, Action? subscriptionDropped, ILogger log, Func? checkpointReached, - CancellationToken cancellationToken = default) { - _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - _subscription = subscription; - _messages = messages; - _eventAppeared = eventAppeared; - _checkpointReached = checkpointReached ?? ((_, _, _) => Task.CompletedTask); - _subscriptionDropped = subscriptionDropped; - _log = log; + CancellationToken cancellationToken = default + ) { + _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + _subscription = subscription; + _messages = messages; + _eventAppeared = eventAppeared; + _checkpointReached = checkpointReached ?? ((_, _, _) => Task.CompletedTask); + _subscriptionDropped = subscriptionDropped; + _log = log; _subscriptionDroppedInvoked = 0; - SubscriptionId = subscriptionId; + SubscriptionId = subscriptionId; _log.LogDebug("Subscription {subscriptionId} confirmed.", SubscriptionId); @@ -77,11 +89,14 @@ private async Task Subscribe() { resolvedEvent.OriginalEvent.EventNumber, resolvedEvent.OriginalEvent.Position ); + await _eventAppeared(this, resolvedEvent, _cts.Token).ConfigureAwait(false); break; + case StreamMessage.AllStreamCheckpointReached (var position): await _checkpointReached(this, position, _cts.Token) .ConfigureAwait(false); + break; } } catch (Exception ex) when @@ -105,6 +120,7 @@ await _checkpointReached(this, position, _cts.Token) "Subscription {subscriptionId} was dropped because the subscriber made an error.", SubscriptionId ); + SubscriptionDropped(SubscriptionDroppedReason.SubscriberError, ex); return; @@ -114,13 +130,18 @@ await _checkpointReached(this, position, _cts.Token) ex.Status.Detail.Contains("Call canceled by the client.")) { _log.LogInformation( "Subscription {subscriptionId} was dropped because cancellation was requested by the client.", - SubscriptionId); + SubscriptionId + ); + SubscriptionDropped(SubscriptionDroppedReason.Disposed, ex); } catch (Exception ex) { if (_subscriptionDroppedInvoked == 0) { - _log.LogError(ex, + _log.LogError( + ex, "Subscription {subscriptionId} was dropped because an error occurred on the server.", - SubscriptionId); + SubscriptionId + ); + SubscriptionDropped(SubscriptionDroppedReason.ServerError, ex); } } diff --git a/src/EventStore.Client/Diagnostics/ActivityExtensions.cs b/src/EventStore.Client/Diagnostics/ActivityExtensions.cs new file mode 100644 index 000000000..564aa98fe --- /dev/null +++ b/src/EventStore.Client/Diagnostics/ActivityExtensions.cs @@ -0,0 +1,45 @@ +using System.Diagnostics; +using System.Runtime.CompilerServices; +using EventStore.Client.Diagnostics.Telemetry; +using EventStore.Client.Diagnostics.Tracing; + +namespace EventStore.Client.Diagnostics; + +static class ActivityExtensions { + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static TracingMetadata GetTracingMetadata(this Activity activity) + => new(activity.TraceId.ToString(), activity.SpanId.ToString()); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static void SetActivityStatus(this Activity activity, ActivityStatus activityStatus) { + var (activityStatusCode, description, exception) = activityStatus; + + var statusCode = activityStatusCode switch { + ActivityStatusCode.Error => "ERROR", + ActivityStatusCode.Ok => "OK", + _ => "UNSET" + }; + + activity.SetStatus(activityStatus.StatusCode, description); + activity.SetTag(TelemetryAttributes.OtelStatusCode, statusCode); + activity.SetTag(TelemetryAttributes.OtelStatusDescription, description); + + if (exception != null) + activity.SetException(exception); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + static void SetException(this Activity activity, Exception exception) { + var tags = new ActivityTagsCollection { + { TelemetryAttributes.ExceptionType, exception.GetType().Name }, + { TelemetryAttributes.ExceptionMessage, $"{exception.Message} {exception.InnerException?.Message}" }, + { TelemetryAttributes.ExceptionStacktrace, exception.StackTrace } + }; + + foreach (var tag in tags) { + activity.SetTag(tag.Key, tag.Value); + } + + activity.AddEvent(new ActivityEvent(TelemetryAttributes.ExceptionEventName, DateTimeOffset.Now, tags)); + } +} diff --git a/src/EventStore.Client/Diagnostics/ActivityStatus.cs b/src/EventStore.Client/Diagnostics/ActivityStatus.cs new file mode 100644 index 000000000..9470f7b59 --- /dev/null +++ b/src/EventStore.Client/Diagnostics/ActivityStatus.cs @@ -0,0 +1,11 @@ +using System.Diagnostics; + +namespace EventStore.Client.Diagnostics; + +record ActivityStatus(ActivityStatusCode StatusCode, string? Description, Exception? Exception) { + public static ActivityStatus Ok(string? description = null) + => new(ActivityStatusCode.Ok, description, null); + + public static ActivityStatus Error(Exception ex, string? description = null) + => new(ActivityStatusCode.Error, description, ex); +} diff --git a/src/EventStore.Client/Diagnostics/ActivityTagsCollectionExtensions.cs b/src/EventStore.Client/Diagnostics/ActivityTagsCollectionExtensions.cs new file mode 100644 index 000000000..e7efaf5f7 --- /dev/null +++ b/src/EventStore.Client/Diagnostics/ActivityTagsCollectionExtensions.cs @@ -0,0 +1,51 @@ +using System.Diagnostics; +using System.Runtime.CompilerServices; +using EventStore.Client.Diagnostics.Telemetry; + +namespace EventStore.Client.Diagnostics; + +static class ActivityTagsCollectionExtensions { + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static ActivityTagsCollection WithTagsFrom( + this ActivityTagsCollection tags, ChannelInfo? channelInfo, EventStoreClientSettings settings + ) { + ActivityTagsCollection? serverTags = null; + if (settings.ConnectivitySettings.DnsGossipSeeds?.Length == 1) { + // Ensure consistent server.address attribute when connecting to cluster via dns discovery + var gossipSeed = settings.ConnectivitySettings.DnsGossipSeeds[0]; + serverTags = CreateServerAttributes(gossipSeed.Host, gossipSeed.Port); + } else if (channelInfo != null) { + // Otherwise use the current gRPC channel target + var authorityParts = channelInfo.Channel.Target.Split(':'); + serverTags = CreateServerAttributes(authorityParts[0], int.Parse(authorityParts[1])); + } + + return tags.WithTags(serverTags).WithTagsFrom(settings.DefaultCredentials); + + ActivityTagsCollection CreateServerAttributes(string? host, int? port) => new() { + { TelemetryAttributes.ServerAddress, host }, + { TelemetryAttributes.ServerPort, port } + }; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal static ActivityTagsCollection WithTagsFrom( + this ActivityTagsCollection tags, UserCredentials? userCredentials + ) { + return tags.WithTag(TelemetryAttributes.DatabaseUser, userCredentials?.Username); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal static ActivityTagsCollection WithTags(this ActivityTagsCollection current, ActivityTagsCollection? tags) + => tags == null + ? current + : tags.Aggregate(current, (newTags, tag) => newTags.WithTag(tag.Key, tag.Value)); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + static ActivityTagsCollection WithTag(this ActivityTagsCollection tags, string key, object? value) { + if (value != null) + tags[key] = value; + + return tags; + } +} diff --git a/src/EventStore.Client/Diagnostics/EventMetadataExtensions.cs b/src/EventStore.Client/Diagnostics/EventMetadataExtensions.cs new file mode 100644 index 000000000..99f83c7e2 --- /dev/null +++ b/src/EventStore.Client/Diagnostics/EventMetadataExtensions.cs @@ -0,0 +1,84 @@ +using System.Diagnostics; +using System.Runtime.CompilerServices; +using System.Text.Json; +using EventStore.Client.Diagnostics.Tracing; + +namespace EventStore.Client.Diagnostics; + +static class EventMetadataExtensions { + // TODO JC: TEMPORARY WORKAROUND CODE TO USE CUSTOM METADATA UNTIL DATABASE CHANGES ARE READY + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static ReadOnlySpan InjectTracingMetadata( + this ReadOnlyMemory rawCustomMetadata + ) { + if (Activity.Current == null) return rawCustomMetadata.Span; + + try { + using var customMetadataJson = JsonDocument.Parse(rawCustomMetadata); + var tracingMetadata = Activity.Current.GetTracingMetadata(); + + using var stream = new MemoryStream(); + using var writer = new Utf8JsonWriter(stream); + + writer.WriteStartObject(); + + foreach (var prop in customMetadataJson.RootElement.EnumerateObject()) + prop.WriteTo(writer); + + writer.WriteIfNotNull(TracingConstants.Metadata.TraceId, tracingMetadata.TraceId) + .WriteIfNotNull(TracingConstants.Metadata.SpanId, tracingMetadata.SpanId); + + writer.WriteEndObject(); + writer.Flush(); + + return stream.ToArray(); + } catch (Exception) { + return rawCustomMetadata.Span; + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static ActivityContext? RestoreTracingContext(this ReadOnlyMemory rawCustomMetadata) { + var (traceId, spanId) = rawCustomMetadata.ExtractTracingMetadata(); + + if (traceId == null || spanId == null) + return default; + + try { + return new( + ActivityTraceId.CreateFromString(traceId.ToCharArray()), + ActivitySpanId.CreateFromString(spanId.ToCharArray()), + ActivityTraceFlags.Recorded, + isRemote: true + ); + } catch (Exception) { + return default; + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + static TracingMetadata ExtractTracingMetadata(this ReadOnlyMemory rawCustomMetadata) { + try { + using var customMetadataJson = JsonDocument.Parse(rawCustomMetadata); + + return new TracingMetadata( + customMetadataJson.RootElement.GetProperty(TracingConstants.Metadata.TraceId).GetString(), + customMetadataJson.RootElement.GetProperty(TracingConstants.Metadata.SpanId).GetString() + ); + } catch (Exception) { + return TracingMetadata.None(); + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + static Utf8JsonWriter WriteIfNotNull( + this Utf8JsonWriter jsonWriter, string key, string? value + ) { + if (string.IsNullOrEmpty(value)) return jsonWriter; + + jsonWriter.WritePropertyName(key); + jsonWriter.WriteStringValue(value); + + return jsonWriter; + } +} diff --git a/src/EventStore.Client/Diagnostics/EventStoreClientDiagnostics.cs b/src/EventStore.Client/Diagnostics/EventStoreClientDiagnostics.cs new file mode 100644 index 000000000..98e4b96e2 --- /dev/null +++ b/src/EventStore.Client/Diagnostics/EventStoreClientDiagnostics.cs @@ -0,0 +1,77 @@ +using System.Diagnostics; +using EventStore.Client.Diagnostics.Telemetry; +using EventStore.Client.Diagnostics.Tracing; + +namespace EventStore.Client.Diagnostics; + +static class EventStoreClientDiagnostics { + static readonly ActivitySource _activitySource = + new ActivitySource(EventStoreClientInstrumentation.ActivitySourceName); + + static readonly ActivityTagsCollection _defaultTags = [new(TelemetryAttributes.DatabaseSystem, "eventstoredb")]; + + public static Activity? StartActivity( + string operation, ActivityTagsCollection? tags, ActivityKind activityKind = default, + ActivityContext? activityContext = null + ) { + var activity = _activitySource.CreateActivity( + operation, + activityKind, + parentContext: activityContext ?? default, + new ActivityTagsCollection { + { TelemetryAttributes.DatabaseOperation, operation } + } + .WithTags(_defaultTags) + .WithTags(tags), + idFormat: ActivityIdFormat.W3C + ); + + return activity?.Start(); + } + + public static async ValueTask TraceOperation( + Func> tracedOperation, string operationName, ActivityTagsCollection? tags = null + ) { + using var activity = StartActivity(operationName, tags, ActivityKind.Client); + + try { + var res = await tracedOperation().ConfigureAwait(false); + activity?.SetActivityStatus(ActivityStatus.Ok()); + return res; + } catch (Exception ex) { + activity?.SetActivityStatus(ActivityStatus.Error(ex)); + throw; + } + } + + public static void TraceSubscriptionEvent( + string? subscriptionId, ResolvedEvent evnt, ChannelInfo channelInfo, + EventStoreClientSettings settings, UserCredentials? userCredentials + ) { + var restoredTracingContext = + evnt.OriginalEvent.Metadata.RestoreTracingContext(); + + if (restoredTracingContext != null) + StartActivity( + TracingConstants.Operations.Subscribe, + new ActivityTagsCollection { + { + TelemetryAttributes.EventStoreStream, + evnt.OriginalEvent.EventStreamId + }, { + TelemetryAttributes.EventStoreSubscriptionId, + subscriptionId + }, { + TelemetryAttributes.EventStoreEventId, + evnt.OriginalEvent.EventId.ToString() + }, { + TelemetryAttributes.EventStoreEventType, + evnt.OriginalEvent.EventType + } + }.WithTagsFrom(channelInfo, settings) + .WithTagsFrom(userCredentials), + ActivityKind.Consumer, + restoredTracingContext + )?.Dispose(); + } +} diff --git a/src/EventStore.Client/Diagnostics/EventStoreClientInstrumentation.cs b/src/EventStore.Client/Diagnostics/EventStoreClientInstrumentation.cs new file mode 100644 index 000000000..7ae9b6ba1 --- /dev/null +++ b/src/EventStore.Client/Diagnostics/EventStoreClientInstrumentation.cs @@ -0,0 +1,5 @@ +namespace EventStore.Client.Diagnostics; + +static class EventStoreClientInstrumentation { + public const string ActivitySourceName = "eventstoredb.client"; +} diff --git a/src/EventStore.Client/Diagnostics/Telemetry/TelemetryAttributes.cs b/src/EventStore.Client/Diagnostics/Telemetry/TelemetryAttributes.cs new file mode 100644 index 000000000..5b63d4e36 --- /dev/null +++ b/src/EventStore.Client/Diagnostics/Telemetry/TelemetryAttributes.cs @@ -0,0 +1,29 @@ +namespace EventStore.Client.Diagnostics.Telemetry; + +// The attributes below match the specification of v1.24.0 of the Open Telemetry semantic conventions. +// Some attributes are ignored where not required or relevant. +// https://github.com/open-telemetry/semantic-conventions/blob/v1.24.0/docs/general/trace.md +// https://github.com/open-telemetry/semantic-conventions/blob/v1.24.0/docs/database/database-spans.md +// https://github.com/open-telemetry/semantic-conventions/blob/v1.24.0/docs/exceptions/exceptions-spans.md +static class TelemetryAttributes { + public const string DatabaseUser = "db.user"; + public const string DatabaseSystem = "db.system"; + public const string DatabaseOperation = "db.operation"; + + public const string ServerAddress = "server.address"; + public const string ServerPort = "server.port"; + + public const string ExceptionEventName = "exception"; + public const string ExceptionType = "exception.type"; + public const string ExceptionMessage = "exception.message"; + public const string ExceptionStacktrace = "exception.stacktrace"; + + public const string OtelStatusCode = "otel.status_code"; + public const string OtelStatusDescription = "otel.status_description"; + + // Custom attributes + public const string EventStoreStream = "db.eventstoredb.stream"; + public const string EventStoreSubscriptionId = "db.eventstoredb.subscription.id"; + public const string EventStoreEventId = "db.eventstoredb.event.id"; + public const string EventStoreEventType = "db.eventstoredb.event.type"; +} diff --git a/src/EventStore.Client/Diagnostics/Tracing/TracingConstants.cs b/src/EventStore.Client/Diagnostics/Tracing/TracingConstants.cs new file mode 100644 index 000000000..2c2d22cf3 --- /dev/null +++ b/src/EventStore.Client/Diagnostics/Tracing/TracingConstants.cs @@ -0,0 +1,13 @@ +namespace EventStore.Client.Diagnostics.Tracing; + +static class TracingConstants { + public static class Metadata { + public const string TraceId = "$traceId"; + public const string SpanId = "$spanId"; + } + + public static class Operations { + public const string Append = "streams.append"; + public const string Subscribe = "streams.subscribe"; + } +} diff --git a/src/EventStore.Client/Diagnostics/Tracing/TracingMetadata.cs b/src/EventStore.Client/Diagnostics/Tracing/TracingMetadata.cs new file mode 100644 index 000000000..f907cc49d --- /dev/null +++ b/src/EventStore.Client/Diagnostics/Tracing/TracingMetadata.cs @@ -0,0 +1,5 @@ +namespace EventStore.Client.Diagnostics.Tracing; + +record TracingMetadata(string? TraceId, string? SpanId) { + public static TracingMetadata None() => new(null, null); +} diff --git a/src/EventStore.Client/EventStore.Client.csproj b/src/EventStore.Client/EventStore.Client.csproj index ddf36c6af..8c9ffaaf2 100644 --- a/src/EventStore.Client/EventStore.Client.csproj +++ b/src/EventStore.Client/EventStore.Client.csproj @@ -8,6 +8,7 @@ + diff --git a/src/EventStore.Client/EventStoreClientBase.cs b/src/EventStore.Client/EventStoreClientBase.cs index 39f579fcc..2b4a5b2f6 100644 --- a/src/EventStore.Client/EventStoreClientBase.cs +++ b/src/EventStore.Client/EventStoreClientBase.cs @@ -1,10 +1,7 @@ -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; using EventStore.Client.Interceptors; using Grpc.Core; using Grpc.Core.Interceptors; +using Enum = System.Enum; namespace EventStore.Client { /// @@ -13,28 +10,29 @@ namespace EventStore.Client { public abstract class EventStoreClientBase : IDisposable, // for grpc.net we can dispose synchronously, but not for grpc.core IAsyncDisposable { - - private readonly Dictionary> _exceptionMap; - private readonly CancellationTokenSource _cts; - private readonly ChannelCache _channelCache; + private readonly Dictionary> _exceptionMap; + private readonly CancellationTokenSource _cts; + private readonly ChannelCache _channelCache; private readonly SharingProvider _channelInfoProvider; - private readonly Lazy _httpFallback; - + private readonly Lazy _httpFallback; + /// The name of the connection. public string ConnectionName { get; } - + /// The . protected EventStoreClientSettings Settings { get; } /// Constructs a new . - protected EventStoreClientBase(EventStoreClientSettings? settings, - Dictionary> exceptionMap) { - Settings = settings ?? new EventStoreClientSettings(); + protected EventStoreClientBase( + EventStoreClientSettings? settings, + Dictionary> exceptionMap + ) { + Settings = settings ?? new EventStoreClientSettings(); _exceptionMap = exceptionMap; - _cts = new CancellationTokenSource(); + _cts = new CancellationTokenSource(); _channelCache = new(Settings); _httpFallback = new Lazy(() => new HttpFallback(Settings)); - + ConnectionName = Settings.ConnectionName ?? $"ES-{Guid.NewGuid()}"; var channelSelector = new ChannelSelector(Settings, _channelCache); @@ -43,17 +41,18 @@ protected EventStoreClientBase(EventStoreClientSettings? settings, GetChannelInfoExpensive(endPoint, onBroken, channelSelector, _cts.Token), factoryRetryDelay: Settings.ConnectivitySettings.DiscoveryInterval, initialInput: ReconnectionRequired.Rediscover.Instance, - loggerFactory: Settings.LoggerFactory); + loggerFactory: Settings.LoggerFactory + ); } - + // Select a channel and query its capabilities. This is an expensive call that // we don't want to do often. private async Task GetChannelInfoExpensive( ReconnectionRequired reconnectionRequired, Action onReconnectionRequired, IChannelSelector channelSelector, - CancellationToken cancellationToken) { - + CancellationToken cancellationToken + ) { var channel = reconnectionRequired switch { ReconnectionRequired.Rediscover => await channelSelector.SelectChannelAsync(cancellationToken) .ConfigureAwait(false), @@ -78,11 +77,10 @@ private async Task GetChannelInfoExpensive( return new(channel, caps, invoker); } - + /// Gets the current channel info. protected async ValueTask GetChannelInfo(CancellationToken cancellationToken) => await _channelInfoProvider.CurrentAsync.WithCancellation(cancellationToken).ConfigureAwait(false); - /// /// Only exists so that we can manually trigger rediscovery in the tests @@ -95,20 +93,30 @@ internal Task RediscoverAsync() { } /// Returns the result of an HTTP Get request based on the client settings. - protected async Task HttpGet(string path, Action onNotFound, ChannelInfo channelInfo, - TimeSpan? deadline, UserCredentials? userCredentials, CancellationToken cancellationToken) { - + protected async Task HttpGet( + string path, Action onNotFound, ChannelInfo channelInfo, + TimeSpan? deadline, UserCredentials? userCredentials, CancellationToken cancellationToken + ) { return await _httpFallback.Value .HttpGetAsync(path, channelInfo, deadline, userCredentials, onNotFound, cancellationToken) .ConfigureAwait(false); } /// Executes an HTTP Post request based on the client settings. - protected async Task HttpPost(string path, string query, Action onNotFound, ChannelInfo channelInfo, - TimeSpan? deadline, UserCredentials? userCredentials, CancellationToken cancellationToken) { - + protected async Task HttpPost( + string path, string query, Action onNotFound, ChannelInfo channelInfo, + TimeSpan? deadline, UserCredentials? userCredentials, CancellationToken cancellationToken + ) { await _httpFallback.Value - .HttpPostAsync(path, query, channelInfo, deadline, userCredentials, onNotFound, cancellationToken) + .HttpPostAsync( + path, + query, + channelInfo, + deadline, + userCredentials, + onNotFound, + cancellationToken + ) .ConfigureAwait(false); } @@ -118,7 +126,7 @@ public virtual void Dispose() { _cts.Cancel(); _cts.Dispose(); _channelCache.Dispose(); - + if (_httpFallback.IsValueCreated) { _httpFallback.Value.Dispose(); } diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/Diagnostics/PersistentSubscriptionsTracingInstrumentationTests.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/Diagnostics/PersistentSubscriptionsTracingInstrumentationTests.cs new file mode 100644 index 000000000..597bd7401 --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/Diagnostics/PersistentSubscriptionsTracingInstrumentationTests.cs @@ -0,0 +1,74 @@ +using EventStore.Client.Diagnostics.Tracing; + +namespace EventStore.Client.PersistentSubscriptions.Tests.Diagnostics; + +[Trait("Category", "Diagnostics:Tracing")] +public class PersistentSubscriptionsTracingInstrumentationTests(ITestOutputHelper output, DiagnosticsFixture fixture) + : EventStoreTests(output, fixture) { + [Fact] + public async Task PersistentSubscriptionIsInstrumentedWithTracingAndRestoresRemoteAppendContextAsExpected() { + var stream = Fixture.GetStreamName(); + var events = Fixture.CreateTestEventsWithWorkaroundCustomMetadata(2).ToArray(); + + var groupName = $"{stream}-group"; + await Fixture.Subscriptions.CreateToStreamAsync( + stream, + groupName, + new() + ); + + await Fixture.Streams.AppendToStreamAsync( + stream, + StreamState.NoStream, + events + ); + + string? subscriptionId = null; + await Subscribe().WithTimeout(); + + var appendActivity = Fixture + .GetActivitiesForOperation(TracingConstants.Operations.Append, stream) + .SingleOrDefault() + .ShouldNotBeNull(); + + var subscribeActivities = Fixture + .GetActivitiesForOperation(TracingConstants.Operations.Subscribe, stream) + .ToArray(); + + subscriptionId.ShouldNotBeNull(); + subscribeActivities.Length.ShouldBe(events.Length); + + for (var i = 0; i < subscribeActivities.Length; i++) { + subscribeActivities[i].TraceId.ShouldBe(appendActivity.Context.TraceId); + subscribeActivities[i].ParentSpanId.ShouldBe(appendActivity.Context.SpanId); + subscribeActivities[i].HasRemoteParent.ShouldBeTrue(); + + Fixture.AssertSubscriptionActivityHasExpectedTags( + subscribeActivities[i], + stream, + events[i].EventId.ToString(), + subscriptionId + ); + } + + return; + + async Task Subscribe() { + await using var subscription = Fixture.Subscriptions.SubscribeToStream(stream, groupName); + await using var enumerator = subscription.Messages.GetAsyncEnumerator(); + + int eventsAppeared = 0; + while (await enumerator.MoveNextAsync()) { + if (enumerator.Current is PersistentSubscriptionMessage.SubscriptionConfirmation(var sid)) + subscriptionId = sid; + + if (enumerator.Current is not PersistentSubscriptionMessage.Event(_, _)) + continue; + + eventsAppeared++; + if (eventsAppeared >= events.Length) + return; + } + } + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/Diagnostics/TracingInstrumentationTests.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/Diagnostics/TracingInstrumentationTests.cs new file mode 100644 index 000000000..1868aef48 --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/Diagnostics/TracingInstrumentationTests.cs @@ -0,0 +1,10 @@ +using System.Diagnostics; +using EventStore.Client.Diagnostics.Telemetry; +using EventStore.Client.Diagnostics.Tracing; + +namespace EventStore.Client.PersistentSubscriptions.Tests.Diagnostics; + +[Trait("Category", "Diagnostics:Tracing")] +public class TracingInstrumentationTests(ITestOutputHelper output, DiagnosticsFixture fixture) + : EventStoreTests(output, fixture) { +} diff --git a/test/EventStore.Client.Streams.Tests/Append/append_to_stream.cs b/test/EventStore.Client.Streams.Tests/Append/append_to_stream.cs index 989a6b6bf..6cd5b813d 100644 --- a/test/EventStore.Client.Streams.Tests/Append/append_to_stream.cs +++ b/test/EventStore.Client.Streams.Tests/Append/append_to_stream.cs @@ -5,7 +5,8 @@ namespace EventStore.Client.Streams.Tests.Append; [Trait("Category", "Target:Stream")] [Trait("Category", "Operation:Append")] -public class append_to_stream(ITestOutputHelper output, EventStoreFixture fixture) : EventStoreTests(output, fixture) { +public class append_to_stream(ITestOutputHelper output, EventStoreFixture fixture) + : EventStoreTests(output, fixture) { public static IEnumerable ExpectedVersionCreateStreamTestCases() { yield return new object?[] { StreamState.Any }; yield return new object?[] { StreamState.NoStream }; @@ -18,7 +19,12 @@ public async Task appending_zero_events(StreamState expectedStreamState) { const int iterations = 2; for (var i = 0; i < iterations; i++) { - var writeResult = await Fixture.Streams.AppendToStreamAsync(stream, expectedStreamState, Enumerable.Empty()); + var writeResult = await Fixture.Streams.AppendToStreamAsync( + stream, + expectedStreamState, + Enumerable.Empty() + ); + writeResult.NextExpectedStreamRevision.ShouldBe(StreamRevision.None); } @@ -34,7 +40,12 @@ public async Task appending_zero_events_again(StreamState expectedStreamState) { const int iterations = 2; for (var i = 0; i < iterations; i++) { - var writeResult = await Fixture.Streams.AppendToStreamAsync(stream, expectedStreamState, Enumerable.Empty()); + var writeResult = await Fixture.Streams.AppendToStreamAsync( + stream, + expectedStreamState, + Enumerable.Empty() + ); + Assert.Equal(StreamRevision.None, writeResult.NextExpectedStreamRevision); } @@ -87,7 +98,8 @@ public async Task multiple_idempotent_writes_with_same_id_bug_case() { } [Fact] - public async Task in_case_where_multiple_writes_of_multiple_events_with_the_same_ids_using_expected_version_any_then_next_expected_version_is_unreliable() { + public async Task + in_case_where_multiple_writes_of_multiple_events_with_the_same_ids_using_expected_version_any_then_next_expected_version_is_unreliable() { var stream = Fixture.GetStreamName(); var evnt = Fixture.CreateTestEvents().First(); @@ -103,7 +115,8 @@ public async Task in_case_where_multiple_writes_of_multiple_events_with_the_same } [Fact] - public async Task in_case_where_multiple_writes_of_multiple_events_with_the_same_ids_using_expected_version_nostream_then_next_expected_version_is_correct() { + public async Task + in_case_where_multiple_writes_of_multiple_events_with_the_same_ids_using_expected_version_nostream_then_next_expected_version_is_correct() { var stream = Fixture.GetStreamName(); var evnt = Fixture.CreateTestEvents().First(); @@ -280,18 +293,20 @@ await Fixture.Streams.AppendToStreamAsync( } [Fact] - public async Task appending_with_stream_exists_expected_version_and_stream_does_not_exist_throws_wrong_expected_version() { + public async Task + appending_with_stream_exists_expected_version_and_stream_does_not_exist_throws_wrong_expected_version() { var stream = Fixture.GetStreamName(); var ex = await Fixture.Streams .AppendToStreamAsync(stream, StreamState.StreamExists, Fixture.CreateTestEvents()) .ShouldThrowAsync(); - + ex.ActualStreamRevision.ShouldBe(StreamRevision.None); } [Fact] - public async Task appending_with_stream_exists_expected_version_and_stream_does_not_exist_returns_wrong_expected_version() { + public async Task + appending_with_stream_exists_expected_version_and_stream_does_not_exist_returns_wrong_expected_version() { var stream = Fixture.GetStreamName(); var writeResult = await Fixture.Streams.AppendToStreamAsync( @@ -334,7 +349,11 @@ await Fixture.Streams public async Task can_append_multiple_events_at_once() { var stream = Fixture.GetStreamName(); - var writeResult = await Fixture.Streams.AppendToStreamAsync(stream, StreamState.NoStream, Fixture.CreateTestEvents(100)); + var writeResult = await Fixture.Streams.AppendToStreamAsync( + stream, + StreamState.NoStream, + Fixture.CreateTestEvents(100) + ); Assert.Equal(new(99), writeResult.NextExpectedStreamRevision); } @@ -387,7 +406,7 @@ public async Task returns_failure_status_when_conditionally_appending_to_a_delet Assert.Equal(ConditionalWriteResult.StreamDeleted, result); } - + [Fact] public async Task expected_version_no_stream() { var result = await Fixture.Streams.AppendToStreamAsync( @@ -409,7 +428,7 @@ public async Task expected_version_no_stream_returns_position() { Assert.True(result.LogPosition > Position.Start); } - + [Fact] public async Task with_timeout_any_stream_revision_fails_when_operation_expired() { var stream = Fixture.GetStreamName(); @@ -429,7 +448,7 @@ public async Task with_timeout_stream_revision_fails_when_operation_expired() { var stream = Fixture.GetStreamName(); await Fixture.Streams.AppendToStreamAsync(stream, StreamState.Any, Fixture.CreateTestEvents()); - + var ex = await Fixture.Streams.AppendToStreamAsync( stream, new StreamRevision(0), @@ -448,32 +467,14 @@ await Fixture.Streams .AppendToStreamAsync( streamName, StreamRevision.None, - GetEvents(), + Fixture.CreateTestEventsThatThrowsException(), userCredentials: new UserCredentials(TestCredentials.Root.Username!, TestCredentials.Root.Password!) ) - .ShouldThrowAsync(); + .ShouldThrowAsync(); var state = await Fixture.Streams.ReadStreamAsync(Direction.Forwards, streamName, StreamPosition.Start) .ReadState; state.ShouldBe(ReadState.StreamNotFound); - - return; - - IEnumerable GetEvents() { - for (var i = 0; i < 5; i++) { - if (i % 3 == 0) - throw new EnumerationFailedException(); - - yield return Fixture.CreateTestEvents(1).First(); - } - } - } - - class EnumerationFailedException : Exception { } - - public static IEnumerable ArgumentOutOfRangeTestCases() { - yield return new object?[] { StreamState.Any }; - yield return new object?[] { ulong.MaxValue - 1UL }; } } diff --git a/test/EventStore.Client.Streams.Tests/Diagnostics/StreamsTracingInstrumentationTests.cs b/test/EventStore.Client.Streams.Tests/Diagnostics/StreamsTracingInstrumentationTests.cs new file mode 100644 index 000000000..9cec2fc99 --- /dev/null +++ b/test/EventStore.Client.Streams.Tests/Diagnostics/StreamsTracingInstrumentationTests.cs @@ -0,0 +1,103 @@ +using EventStore.Client.Diagnostics.Tracing; + +namespace EventStore.Client.Streams.Tests.Diagnostics; + +[Trait("Category", "Diagnostics:Tracing")] +public class StreamsTracingInstrumentationTests(ITestOutputHelper output, DiagnosticsFixture fixture) + : EventStoreTests(output, fixture) { + [Fact] + public async Task AppendIsInstrumentedWithTracingAsExpected() { + var stream = Fixture.GetStreamName(); + + await Fixture.Streams.AppendToStreamAsync( + stream, + StreamState.NoStream, + Fixture.CreateTestEvents() + ); + + var activity = Fixture + .GetActivitiesForOperation(TracingConstants.Operations.Append, stream) + .SingleOrDefault() + .ShouldNotBeNull(); + + Fixture.AssertAppendActivityHasExpectedTags(activity, stream); + } + + [Fact] + public async Task AppendTraceIsTaggedWithErrorStatusOnException() { + var stream = Fixture.GetStreamName(); + + var actualException = await Fixture.Streams.AppendToStreamAsync( + stream, + StreamState.NoStream, + Fixture.CreateTestEventsThatThrowsException() + ).ShouldThrowAsync(); + + var activity = Fixture + .GetActivitiesForOperation(TracingConstants.Operations.Append, stream) + .SingleOrDefault() + .ShouldNotBeNull(); + + Fixture.AssertErroneousAppendActivityHasExpectedTags(activity, actualException); + } + + [Fact] + public async Task CatchupSubscriptionIsInstrumentedWithTracingAndRestoresRemoteAppendContextAsExpected() { + var stream = Fixture.GetStreamName(); + var events = Fixture.CreateTestEventsWithWorkaroundCustomMetadata(2).ToArray(); + + await Fixture.Streams.AppendToStreamAsync( + stream, + StreamState.NoStream, + events + ); + + string? subscriptionId = null; + await Subscribe().WithTimeout(); + + var appendActivity = Fixture + .GetActivitiesForOperation(TracingConstants.Operations.Append, stream) + .SingleOrDefault() + .ShouldNotBeNull(); + + var subscribeActivities = Fixture + .GetActivitiesForOperation(TracingConstants.Operations.Subscribe, stream) + .ToArray(); + + subscriptionId.ShouldNotBeNull(); + subscribeActivities.Length.ShouldBe(events.Length); + + for (var i = 0; i < subscribeActivities.Length; i++) { + subscribeActivities[i].TraceId.ShouldBe(appendActivity.Context.TraceId); + subscribeActivities[i].ParentSpanId.ShouldBe(appendActivity.Context.SpanId); + subscribeActivities[i].HasRemoteParent.ShouldBeTrue(); + + Fixture.AssertSubscriptionActivityHasExpectedTags( + subscribeActivities[i], + stream, + events[i].EventId.ToString(), + subscriptionId + ); + } + + return; + + async Task Subscribe() { + await using var subscription = Fixture.Streams.SubscribeToStream(stream, FromStream.Start); + await using var enumerator = subscription.Messages.GetAsyncEnumerator(); + + int eventsAppeared = 0; + while (await enumerator.MoveNextAsync()) { + if (enumerator.Current is StreamMessage.SubscriptionConfirmation(var sid)) + subscriptionId = sid; + + if (enumerator.Current is not StreamMessage.Event(_)) + continue; + + eventsAppeared++; + if (eventsAppeared >= events.Length) + return; + } + } + } +} diff --git a/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_all.cs b/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_all.cs index f0f7cb9dc..2e9879a4a 100644 --- a/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_all.cs +++ b/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_all.cs @@ -7,24 +7,30 @@ public class subscribe_to_all(ITestOutputHelper output, SubscriptionsFixture fix [Fact] public async Task receives_all_events_from_start() { var seedEvents = Fixture.CreateTestEvents(10).ToArray(); - var pageSize = seedEvents.Length / 2; + var pageSize = seedEvents.Length / 2; var availableEvents = new HashSet(seedEvents.Select(x => x.EventId)); foreach (var evt in seedEvents.Take(pageSize)) - await Fixture.Streams.AppendToStreamAsync($"stream-{evt.EventId.ToGuid():N}", StreamState.NoStream, - new[] { evt }); + await Fixture.Streams.AppendToStreamAsync( + $"stream-{evt.EventId.ToGuid():N}", + StreamState.NoStream, + new[] { evt } + ); await using var subscription = Fixture.Streams.SubscribeToAll(FromAll.Start); - await using var enumerator = subscription.Messages.GetAsyncEnumerator(); + await using var enumerator = subscription.Messages.GetAsyncEnumerator(); Assert.True(await enumerator.MoveNextAsync()); Assert.IsType(enumerator.Current); foreach (var evt in seedEvents.Skip(pageSize)) - await Fixture.Streams.AppendToStreamAsync($"stream-{evt.EventId.ToGuid():N}", StreamState.NoStream, - new[] { evt }); + await Fixture.Streams.AppendToStreamAsync( + $"stream-{evt.EventId.ToGuid():N}", + StreamState.NoStream, + new[] { evt } + ); await Subscribe().WithTimeout(); @@ -52,7 +58,7 @@ public async Task receives_all_events_from_end() { var availableEvents = new HashSet(seedEvents.Select(x => x.EventId)); await using var subscription = Fixture.Streams.SubscribeToAll(FromAll.End); - await using var enumerator = subscription.Messages.GetAsyncEnumerator(); + await using var enumerator = subscription.Messages.GetAsyncEnumerator(); Assert.True(await enumerator.MoveNextAsync()); @@ -60,8 +66,11 @@ public async Task receives_all_events_from_end() { // add the events we want to receive after we start the subscription foreach (var evt in seedEvents) - await Fixture.Streams.AppendToStreamAsync($"stream-{evt.EventId.ToGuid():N}", StreamState.NoStream, - new[] { evt }); + await Fixture.Streams.AppendToStreamAsync( + $"stream-{evt.EventId.ToGuid():N}", + StreamState.NoStream, + new[] { evt } + ); await Subscribe().WithTimeout(); @@ -85,28 +94,34 @@ async Task Subscribe() { [Fact] public async Task receives_all_events_from_position() { var seedEvents = Fixture.CreateTestEvents(10).ToArray(); - var pageSize = seedEvents.Length / 2; + var pageSize = seedEvents.Length / 2; // only the second half of the events will be received var availableEvents = new HashSet(seedEvents.Skip(pageSize).Select(x => x.EventId)); IWriteResult writeResult = new SuccessResult(); foreach (var evt in seedEvents.Take(pageSize)) - writeResult = await Fixture.Streams.AppendToStreamAsync($"stream-{evt.EventId.ToGuid():N}", - StreamState.NoStream, new[] { evt }); + writeResult = await Fixture.Streams.AppendToStreamAsync( + $"stream-{evt.EventId.ToGuid():N}", + StreamState.NoStream, + new[] { evt } + ); var position = FromAll.After(writeResult.LogPosition); await using var subscription = Fixture.Streams.SubscribeToAll(position); - await using var enumerator = subscription.Messages.GetAsyncEnumerator(); + await using var enumerator = subscription.Messages.GetAsyncEnumerator(); Assert.True(await enumerator.MoveNextAsync()); Assert.IsType(enumerator.Current); foreach (var evt in seedEvents.Skip(pageSize)) - await Fixture.Streams.AppendToStreamAsync($"stream-{evt.EventId.ToGuid():N}", StreamState.NoStream, - new[] { evt }); + await Fixture.Streams.AppendToStreamAsync( + $"stream-{evt.EventId.ToGuid():N}", + StreamState.NoStream, + new[] { evt } + ); await Subscribe().WithTimeout(); @@ -131,13 +146,13 @@ async Task Subscribe() { public async Task receives_all_events_with_resolved_links() { var streamName = Fixture.GetStreamName(); - var seedEvents = Fixture.CreateTestEvents(3).ToArray(); + var seedEvents = Fixture.CreateTestEvents(3).ToArray(); var availableEvents = new HashSet(seedEvents.Select(x => x.EventId)); await Fixture.Streams.AppendToStreamAsync(streamName, StreamState.NoStream, seedEvents); await using var subscription = Fixture.Streams.SubscribeToAll(FromAll.Start, true); - await using var enumerator = subscription.Messages.GetAsyncEnumerator(); + await using var enumerator = subscription.Messages.GetAsyncEnumerator(); Assert.True(await enumerator.MoveNextAsync()); @@ -178,11 +193,15 @@ public async Task receives_all_filtered_events_from_start(SubscriptionFilter fil var availableEvents = new HashSet(seedEvents.Select(x => x.EventId)); // add noise - await Fixture.Streams.AppendToStreamAsync(Fixture.GetStreamName(), StreamState.NoStream, - Fixture.CreateTestEvents(3)); + await Fixture.Streams.AppendToStreamAsync( + Fixture.GetStreamName(), + StreamState.NoStream, + Fixture.CreateTestEvents(3) + ); var existingEventsCount = await Fixture.Streams.ReadAllAsync(Direction.Forwards, Position.Start) .Messages.CountAsync(); + Fixture.Log.Debug("Existing events count: {ExistingEventsCount}", existingEventsCount); // Debugging: @@ -191,13 +210,16 @@ await Fixture.Streams.AppendToStreamAsync(Fixture.GetStreamName(), StreamState.N // add some of the events we want to see before we start the subscription foreach (var evt in seedEvents.Take(pageSize)) - await Fixture.Streams.AppendToStreamAsync($"{streamPrefix}-{evt.EventId.ToGuid():N}", StreamState.NoStream, - new[] { evt }); + await Fixture.Streams.AppendToStreamAsync( + $"{streamPrefix}-{evt.EventId.ToGuid():N}", + StreamState.NoStream, + new[] { evt } + ); var filterOptions = new SubscriptionFilterOptions(filter.Create(streamPrefix), 1); await using var subscription = Fixture.Streams.SubscribeToAll(FromAll.Start, filterOptions: filterOptions); - await using var enumerator = subscription.Messages.GetAsyncEnumerator(); + await using var enumerator = subscription.Messages.GetAsyncEnumerator(); Assert.True(await enumerator.MoveNextAsync()); @@ -205,8 +227,11 @@ await Fixture.Streams.AppendToStreamAsync($"{streamPrefix}-{evt.EventId.ToGuid() // add some of the events we want to see after we start the subscription foreach (var evt in seedEvents.Skip(pageSize)) - await Fixture.Streams.AppendToStreamAsync($"{streamPrefix}-{evt.EventId.ToGuid():N}", StreamState.NoStream, - new[] { evt }); + await Fixture.Streams.AppendToStreamAsync( + $"{streamPrefix}-{evt.EventId.ToGuid():N}", + StreamState.NoStream, + new[] { evt } + ); bool checkpointReached = false; @@ -223,6 +248,7 @@ async Task Subscribe() { checkpointReached = true; break; + case StreamMessage.Event(var resolvedEvent): { availableEvents.Remove(resolvedEvent.Event.EventId); @@ -254,11 +280,15 @@ public async Task receives_all_filtered_events_from_end(SubscriptionFilter filte var availableEvents = new HashSet(seedEvents.Skip(pageSize).Select(x => x.EventId)); // add noise - await Fixture.Streams.AppendToStreamAsync(Fixture.GetStreamName(), StreamState.NoStream, - Fixture.CreateTestEvents(3)); + await Fixture.Streams.AppendToStreamAsync( + Fixture.GetStreamName(), + StreamState.NoStream, + Fixture.CreateTestEvents(3) + ); var existingEventsCount = await Fixture.Streams.ReadAllAsync(Direction.Forwards, Position.Start) .Messages.CountAsync(); + Fixture.Log.Debug("Existing events count: {ExistingEventsCount}", existingEventsCount); // Debugging: @@ -267,13 +297,16 @@ await Fixture.Streams.AppendToStreamAsync(Fixture.GetStreamName(), StreamState.N // add some of the events we want to see before we start the subscription foreach (var evt in seedEvents.Take(pageSize)) - await Fixture.Streams.AppendToStreamAsync($"{streamPrefix}-{evt.EventId.ToGuid():N}", StreamState.NoStream, - new[] { evt }); + await Fixture.Streams.AppendToStreamAsync( + $"{streamPrefix}-{evt.EventId.ToGuid():N}", + StreamState.NoStream, + new[] { evt } + ); var filterOptions = new SubscriptionFilterOptions(filter.Create(streamPrefix), 1); await using var subscription = Fixture.Streams.SubscribeToAll(FromAll.End, filterOptions: filterOptions); - await using var enumerator = subscription.Messages.GetAsyncEnumerator(); + await using var enumerator = subscription.Messages.GetAsyncEnumerator(); Assert.True(await enumerator.MoveNextAsync()); @@ -281,8 +314,11 @@ await Fixture.Streams.AppendToStreamAsync($"{streamPrefix}-{evt.EventId.ToGuid() // add some of the events we want to see after we start the subscription foreach (var evt in seedEvents.Skip(pageSize)) - await Fixture.Streams.AppendToStreamAsync($"{streamPrefix}-{evt.EventId.ToGuid():N}", StreamState.NoStream, - new[] { evt }); + await Fixture.Streams.AppendToStreamAsync( + $"{streamPrefix}-{evt.EventId.ToGuid():N}", + StreamState.NoStream, + new[] { evt } + ); bool checkpointReached = false; @@ -299,6 +335,7 @@ async Task Subscribe() { checkpointReached = true; break; + case StreamMessage.Event(var resolvedEvent): { availableEvents.Remove(resolvedEvent.Event.EventId); @@ -330,25 +367,32 @@ public async Task receives_all_filtered_events_from_position(SubscriptionFilter var availableEvents = new HashSet(seedEvents.Skip(pageSize).Select(x => x.EventId)); // add noise - await Fixture.Streams.AppendToStreamAsync(Fixture.GetStreamName(), StreamState.NoStream, - Fixture.CreateTestEvents(3)); + await Fixture.Streams.AppendToStreamAsync( + Fixture.GetStreamName(), + StreamState.NoStream, + Fixture.CreateTestEvents(3) + ); var existingEventsCount = await Fixture.Streams.ReadAllAsync(Direction.Forwards, Position.Start) .Messages.CountAsync(); + Fixture.Log.Debug("Existing events count: {ExistingEventsCount}", existingEventsCount); // add some of the events that are a match to the filter but will not be received IWriteResult writeResult = new SuccessResult(); foreach (var evt in seedEvents.Take(pageSize)) - writeResult = await Fixture.Streams.AppendToStreamAsync($"{streamPrefix}-{evt.EventId.ToGuid():N}", - StreamState.NoStream, new[] { evt }); + writeResult = await Fixture.Streams.AppendToStreamAsync( + $"{streamPrefix}-{evt.EventId.ToGuid():N}", + StreamState.NoStream, + new[] { evt } + ); var position = FromAll.After(writeResult.LogPosition); var filterOptions = new SubscriptionFilterOptions(filter.Create(streamPrefix), 1); await using var subscription = Fixture.Streams.SubscribeToAll(position, filterOptions: filterOptions); - await using var enumerator = subscription.Messages.GetAsyncEnumerator(); + await using var enumerator = subscription.Messages.GetAsyncEnumerator(); Assert.True(await enumerator.MoveNextAsync()); @@ -356,8 +400,11 @@ await Fixture.Streams.AppendToStreamAsync(Fixture.GetStreamName(), StreamState.N // add the events we want to receive after we start the subscription foreach (var evt in seedEvents.Skip(pageSize)) - await Fixture.Streams.AppendToStreamAsync($"{streamPrefix}-{evt.EventId.ToGuid():N}", StreamState.NoStream, - new[] { evt }); + await Fixture.Streams.AppendToStreamAsync( + $"{streamPrefix}-{evt.EventId.ToGuid():N}", + StreamState.NoStream, + new[] { evt } + ); bool checkpointReached = false; @@ -374,6 +421,7 @@ async Task Subscribe() { checkpointReached = true; break; + case StreamMessage.Event(var resolvedEvent): { availableEvents.Remove(resolvedEvent.Event.EventId); @@ -392,7 +440,7 @@ async Task Subscribe() { public async Task receives_all_filtered_events_with_resolved_links() { var streamName = Fixture.GetStreamName(); - var seedEvents = Fixture.CreateTestEvents(3).ToArray(); + var seedEvents = Fixture.CreateTestEvents(3).ToArray(); var availableEvents = new HashSet(seedEvents.Select(x => x.EventId)); await Fixture.Streams.AppendToStreamAsync(streamName, StreamState.NoStream, seedEvents); @@ -403,6 +451,7 @@ public async Task receives_all_filtered_events_with_resolved_links() { await using var subscription = Fixture.Streams.SubscribeToAll(FromAll.Start, true, filterOptions: filterOptions); + await using var enumerator = subscription.Messages.GetAsyncEnumerator(); Assert.True(await enumerator.MoveNextAsync()); diff --git a/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_stream.cs b/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_stream.cs index 3cc26c8a1..d38391e8f 100644 --- a/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_stream.cs +++ b/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_stream.cs @@ -9,14 +9,14 @@ public async Task receives_all_events_from_start() { var streamName = Fixture.GetStreamName(); var seedEvents = Fixture.CreateTestEvents(10).ToArray(); - var pageSize = seedEvents.Length / 2; + var pageSize = seedEvents.Length / 2; var availableEvents = new HashSet(seedEvents.Select(x => x.EventId)); await Fixture.Streams.AppendToStreamAsync(streamName, StreamState.NoStream, seedEvents.Take(pageSize)); await using var subscription = Fixture.Streams.SubscribeToStream(streamName, FromStream.Start); - await using var enumerator = subscription.Messages.GetAsyncEnumerator(); + await using var enumerator = subscription.Messages.GetAsyncEnumerator(); Assert.True(await enumerator.MoveNextAsync()); @@ -48,25 +48,29 @@ public async Task receives_all_events_from_position() { var streamName = Fixture.GetStreamName(); var seedEvents = Fixture.CreateTestEvents(10).ToArray(); - var pageSize = seedEvents.Length / 2; + var pageSize = seedEvents.Length / 2; // only the second half of the events will be received var availableEvents = new HashSet(seedEvents.Skip(pageSize).Select(x => x.EventId)); var writeResult = await Fixture.Streams.AppendToStreamAsync(streamName, StreamState.NoStream, seedEvents.Take(pageSize)); + var streamPosition = StreamPosition.FromStreamRevision(writeResult.NextExpectedStreamRevision); - var checkpoint = FromStream.After(streamPosition); + var checkpoint = FromStream.After(streamPosition); await using var subscription = Fixture.Streams.SubscribeToStream(streamName, checkpoint); - await using var enumerator = subscription.Messages.GetAsyncEnumerator(); + await using var enumerator = subscription.Messages.GetAsyncEnumerator(); Assert.True(await enumerator.MoveNextAsync()); Assert.IsType(enumerator.Current); - await Fixture.Streams.AppendToStreamAsync(streamName, writeResult.NextExpectedStreamRevision, - seedEvents.Skip(pageSize)); + await Fixture.Streams.AppendToStreamAsync( + streamName, + writeResult.NextExpectedStreamRevision, + seedEvents.Skip(pageSize) + ); await Subscribe().WithTimeout(); @@ -96,7 +100,7 @@ public async Task receives_all_events_from_non_existing_stream() { var availableEvents = new HashSet(seedEvents.Select(x => x.EventId)); await using var subscription = Fixture.Streams.SubscribeToStream(streamName, FromStream.Start); - await using var enumerator = subscription.Messages.GetAsyncEnumerator(); + await using var enumerator = subscription.Messages.GetAsyncEnumerator(); Assert.True(await enumerator.MoveNextAsync()); @@ -132,14 +136,14 @@ public async Task allow_multiple_subscriptions_to_same_stream() { await Fixture.Streams.AppendToStreamAsync(streamName, StreamState.NoStream, seedEvents); await using var subscription1 = Fixture.Streams.SubscribeToStream(streamName, FromStream.Start); - await using var enumerator1 = subscription1.Messages.GetAsyncEnumerator(); + await using var enumerator1 = subscription1.Messages.GetAsyncEnumerator(); Assert.True(await enumerator1.MoveNextAsync()); Assert.IsType(enumerator1.Current); await using var subscription2 = Fixture.Streams.SubscribeToStream(streamName, FromStream.Start); - await using var enumerator2 = subscription2.Messages.GetAsyncEnumerator(); + await using var enumerator2 = subscription2.Messages.GetAsyncEnumerator(); Assert.True(await enumerator2.MoveNextAsync()); @@ -171,7 +175,7 @@ public async Task drops_when_stream_tombstoned() { var streamName = Fixture.GetStreamName(); await using var subscription = Fixture.Streams.SubscribeToStream(streamName, FromStream.Start); - await using var enumerator = subscription.Messages.GetAsyncEnumerator(); + await using var enumerator = subscription.Messages.GetAsyncEnumerator(); Assert.True(await enumerator.MoveNextAsync()); @@ -180,9 +184,11 @@ public async Task drops_when_stream_tombstoned() { // rest in peace await Fixture.Streams.TombstoneAsync(streamName, StreamState.NoStream); - var ex = await Assert.ThrowsAsync(async () => { - while (await enumerator.MoveNextAsync()) { } - }).WithTimeout(); + var ex = await Assert.ThrowsAsync( + async () => { + while (await enumerator.MoveNextAsync()) { } + } + ).WithTimeout(); ex.ShouldBeOfType().Stream.ShouldBe(streamName); } @@ -191,13 +197,14 @@ public async Task drops_when_stream_tombstoned() { public async Task receives_all_events_with_resolved_links() { var streamName = Fixture.GetStreamName(); - var seedEvents = Fixture.CreateTestEvents(3).ToArray(); + var seedEvents = Fixture.CreateTestEvents(3).ToArray(); var availableEvents = new HashSet(seedEvents.Select(x => x.EventId)); await Fixture.Streams.AppendToStreamAsync(streamName, StreamState.NoStream, seedEvents); await using var subscription = Fixture.Streams.SubscribeToStream($"$et-{EventStoreFixture.TestEventType}", FromStream.Start, true); + await using var enumerator = subscription.Messages.GetAsyncEnumerator(); Assert.True(await enumerator.MoveNextAsync()); diff --git a/test/EventStore.Client.Tests.Common/Fixtures/Base/EventStoreTestServer.cs b/test/EventStore.Client.Tests.Common/Fixtures/Base/EventStoreTestServer.cs index c1edde5a3..20c8c0c93 100644 --- a/test/EventStore.Client.Tests.Common/Fixtures/Base/EventStoreTestServer.cs +++ b/test/EventStore.Client.Tests.Common/Fixtures/Base/EventStoreTestServer.cs @@ -42,7 +42,7 @@ public EventStoreTestServer( var env = new Dictionary { ["EVENTSTORE_DB_LOG_FORMAT"] = "V2", ["EVENTSTORE_MEM_DB"] = "true", - ["EVENTSTORE_CHUNK_SIZE"] = (1024 * 1024).ToString(), + ["EVENTSTORE_CHUNK_SIZE"] = (1024 * 1024 * 1024).ToString(), ["EVENTSTORE_CERTIFICATE_FILE"] = "/etc/eventstore/certs/node/node.crt", ["EVENTSTORE_CERTIFICATE_PRIVATE_KEY_FILE"] = "/etc/eventstore/certs/node/node.key", ["EVENTSTORE_TRUSTED_ROOT_CERTIFICATES_PATH"] = "/etc/eventstore/certs/ca", diff --git a/test/EventStore.Client.Tests.Common/Fixtures/CertificatesManager.cs b/test/EventStore.Client.Tests.Common/Fixtures/CertificatesManager.cs index 0fd15d584..6b57137cc 100644 --- a/test/EventStore.Client.Tests.Common/Fixtures/CertificatesManager.cs +++ b/test/EventStore.Client.Tests.Common/Fixtures/CertificatesManager.cs @@ -55,7 +55,7 @@ await GenerateCertificates( static Task GenerateCertificates(string sourceFolder, string expectedLogMessage, string command, params string[] commandArgs) { using var container = new Builder() .UseContainer() - .UseImage("eventstore/es-gencert-cli:1.0.2") + .UseImage("ghcr.io/eventstore/es-gencert-cli:1.3.0") .MountVolume(sourceFolder, "/tmp", Ductus.FluentDocker.Model.Builders.MountType.ReadWrite) // .MountVolume(Options.CertificateDirectory.FullName, "/etc/eventstore/certs", MountType.ReadOnly) .Command(command, commandArgs) @@ -72,4 +72,4 @@ static Task GenerateCertificates(string sourceFolder, string expectedLogMessage, return certificateDirectory; } -} \ No newline at end of file +} diff --git a/test/EventStore.Client.Tests.Common/Fixtures/DiagnosticsFixture.cs b/test/EventStore.Client.Tests.Common/Fixtures/DiagnosticsFixture.cs new file mode 100644 index 000000000..465e5a4b0 --- /dev/null +++ b/test/EventStore.Client.Tests.Common/Fixtures/DiagnosticsFixture.cs @@ -0,0 +1,85 @@ +using System.Collections.Concurrent; +using System.Diagnostics; +using EventStore.Client.Diagnostics; +using EventStore.Client.Diagnostics.Telemetry; +using EventStore.Client.Diagnostics.Tracing; + +namespace EventStore.Client.Tests; + +public class DiagnosticsFixture : EventStoreFixture { + readonly ConcurrentQueue _activities = []; + + public DiagnosticsFixture() { + var diagnosticActivityListener = new ActivityListener { + ShouldListenTo = source => source.Name == EventStoreClientInstrumentation.ActivitySourceName, + Sample = (ref ActivityCreationOptions _) => + ActivitySamplingResult.AllDataAndRecorded, + ActivityStopped = activity => _activities.Enqueue(activity) + }; + + OnSetup = () => { + ActivitySource.AddActivityListener(diagnosticActivityListener); + + return Task.CompletedTask; + }; + + OnTearDown = () => { + diagnosticActivityListener.Dispose(); + + return Task.CompletedTask; + }; + } + + public IEnumerable GetActivitiesForOperation(string operation, string stream) + => _activities.Where( + activity => (string?)activity.GetTagItem(TelemetryAttributes.DatabaseOperation) == operation + && (string?)activity.GetTagItem(TelemetryAttributes.EventStoreStream) == stream + ); + + public void AssertAppendActivityHasExpectedTags(Activity activity, string stream) { + var expectedTags = new Dictionary { + { TelemetryAttributes.DatabaseSystem, "eventstoredb" }, + { TelemetryAttributes.DatabaseOperation, TracingConstants.Operations.Append }, + { TelemetryAttributes.EventStoreStream, stream }, + { TelemetryAttributes.DatabaseUser, TestCredentials.Root.Username }, + { TelemetryAttributes.OtelStatusCode, "OK" } + }; + + foreach (var tag in expectedTags) + activity.Tags.ShouldContain(tag); + } + + public void AssertErroneousAppendActivityHasExpectedTags( + Activity activity, Exception actualException + ) { + var expectedTags = new Dictionary { + { TelemetryAttributes.OtelStatusCode, "ERROR" }, + { TelemetryAttributes.ExceptionType, actualException.GetType().Name }, { + TelemetryAttributes.ExceptionMessage, + $"{actualException.Message} {actualException.InnerException?.Message}" + } + }; + + foreach (var tag in expectedTags) + activity.Tags.ShouldContain(tag); + + activity.GetTagItem(TelemetryAttributes.ExceptionStacktrace).ShouldNotBeNull(); + } + + public void AssertSubscriptionActivityHasExpectedTags( + Activity activity, string stream, string eventId, string? subscriptionId + ) { + var expectedTags = new Dictionary { + { TelemetryAttributes.DatabaseSystem, "eventstoredb" }, + { TelemetryAttributes.DatabaseOperation, TracingConstants.Operations.Subscribe }, + { TelemetryAttributes.EventStoreStream, stream }, + { TelemetryAttributes.EventStoreEventId, eventId }, + { TelemetryAttributes.EventStoreEventType, EventStoreFixture.TestEventType }, + { TelemetryAttributes.EventStoreSubscriptionId, subscriptionId }, + { TelemetryAttributes.DatabaseUser, TestCredentials.Root.Username } + }; + + foreach (var tag in expectedTags) + activity.Tags.ShouldContain(tag); + } +} diff --git a/test/EventStore.Client.Tests.Common/Fixtures/EventStoreFixture.Helpers.cs b/test/EventStore.Client.Tests.Common/Fixtures/EventStoreFixture.Helpers.cs index d1b6740d2..28c190039 100644 --- a/test/EventStore.Client.Tests.Common/Fixtures/EventStoreFixture.Helpers.cs +++ b/test/EventStore.Client.Tests.Common/Fixtures/EventStoreFixture.Helpers.cs @@ -4,19 +4,38 @@ namespace EventStore.Client.Tests; public partial class EventStoreFixture { - public const string TestEventType = "test-event-type"; + public const string TestEventType = "test-event-type"; public const string AnotherTestEventTypePrefix = "another"; - public const string AnotherTestEventType = $"{AnotherTestEventTypePrefix}-test-event-type"; + public const string AnotherTestEventType = $"{AnotherTestEventTypePrefix}-test-event-type"; public T NewClient(Action configure) where T : EventStoreClientBase, new() => (T)Activator.CreateInstance(typeof(T), new object?[] { ClientSettings.With(configure) })!; - + public string GetStreamName([CallerMemberName] string? testMethod = null) => $"{testMethod}-{Guid.NewGuid():N}"; public IEnumerable CreateTestEvents(int count = 1, string? type = null, int metadataSize = 1) => Enumerable.Range(0, count).Select(index => CreateTestEvent(index, type ?? TestEventType, metadataSize)); + // TODO JC: TEMPORARY WORKAROUND CODE TO USE CUSTOM METADATA UNTIL DATABASE CHANGES ARE READY + public IEnumerable CreateTestEventsWithWorkaroundCustomMetadata(int count = 1) => + Enumerable.Range(0, count).Select( + _ => new EventData( + Uuid.NewUuid(), + TestEventType, + """{"foo":"bar"}"""u8.ToArray(), + """{"foo":"bar"}"""u8.ToArray() + ) + ); + + public IEnumerable CreateTestEventsThatThrowsException() { + // Ensure initial IEnumerator.Current does not throw + yield return CreateTestEvent(1); + + // Throw after enumerator advances + throw new Exception(); + } + protected static EventData CreateTestEvent(int index) => CreateTestEvent(index, TestEventType, 1); protected static EventData CreateTestEvent(int index, string type, int metadataSize) => @@ -32,21 +51,26 @@ public async Task CreateTestUser(bool withoutGroups = true, bool useUs return result.First(); } - public Task CreateTestUsers(int count = 3, bool withoutGroups = true, bool useUserCredentials = false) => + public Task CreateTestUsers( + int count = 3, bool withoutGroups = true, bool useUserCredentials = false + ) => Fakers.Users .RuleFor(x => x.Groups, f => withoutGroups ? Array.Empty() : f.Lorem.Words()) .Generate(count) .Select( async user => { await Users.CreateUserAsync( - user.LoginName, user.FullName, user.Groups, user.Password, + user.LoginName, + user.FullName, + user.Groups, + user.Password, userCredentials: useUserCredentials ? user.Credentials : TestCredentials.Root ); return user; } ).WhenAll(); - + public async Task RestartService(TimeSpan delay) { await Service.Restart(delay); await Streams.WarmUp(); diff --git a/test/EventStore.Client.Tests.Common/Fixtures/EventStoreFixture.cs b/test/EventStore.Client.Tests.Common/Fixtures/EventStoreFixture.cs index 1744ceda6..50faebf7a 100644 --- a/test/EventStore.Client.Tests.Common/Fixtures/EventStoreFixture.cs +++ b/test/EventStore.Client.Tests.Common/Fixtures/EventStoreFixture.cs @@ -8,7 +8,10 @@ namespace EventStore.Client.Tests; -public record EventStoreFixtureOptions(EventStoreClientSettings ClientSettings, IDictionary Environment) { +public record EventStoreFixtureOptions( + EventStoreClientSettings ClientSettings, + IDictionary Environment +) { public EventStoreFixtureOptions RunInMemory(bool runInMemory = true) => this with { Environment = Environment.With(x => x["EVENTSTORE_MEM_DB"] = runInMemory.ToString()) }; @@ -24,7 +27,7 @@ this with { public EventStoreFixtureOptions WithoutDefaultCredentials() => this with { ClientSettings = ClientSettings.With(x => x.DefaultCredentials = null) }; - + public EventStoreFixtureOptions WithMaxAppendSize(uint maxAppendSize) => this with { Environment = Environment.With(x => x["EVENTSTORE_MAX_APPEND_SIZE"] = $"{maxAppendSize}") }; } @@ -54,8 +57,7 @@ protected EventStoreFixture(ConfigureFixture configure) { if (GlobalEnvironment.UseCluster) { Options = configure(EventStoreTestCluster.DefaultOptions()); Service = new EventStoreTestCluster(Options); - } - else { + } else { Options = configure(EventStoreTestNode.DefaultOptions()); Service = new EventStoreTestNode(Options); } @@ -64,7 +66,7 @@ protected EventStoreFixture(ConfigureFixture configure) { List TestRuns { get; } = new(); public ILogger Log => Logger; - + public ITestService Service { get; } public EventStoreFixtureOptions Options { get; } public Faker Faker { get; } = new Faker(); @@ -80,7 +82,7 @@ protected EventStoreFixture(ConfigureFixture configure) { public Func OnSetup { get; init; } = () => Task.CompletedTask; public Func OnTearDown { get; init; } = () => Task.CompletedTask; - + /// /// must test this /// @@ -96,62 +98,62 @@ protected EventStoreFixture(ConfigureFixture configure) { DefaultCredentials = Options.ClientSettings.DefaultCredentials, DefaultDeadline = Options.ClientSettings.DefaultDeadline }; - + InterlockedBoolean WarmUpCompleted { get; } = new InterlockedBoolean(); SemaphoreSlim WarmUpGatekeeper { get; } = new(1, 1); - public void CaptureTestRun(ITestOutputHelper outputHelper) { var testRunId = Logging.CaptureLogs(outputHelper); TestRuns.Add(testRunId); Logger.Information(">>> Test Run {TestRunId} {Operation} <<<", testRunId, "starting"); Service.ReportStatus(); } - + public async Task InitializeAsync() { await Service.Start(); EventStoreVersion = GetEventStoreVersion(); EventStoreHasLastStreamPosition = (EventStoreVersion?.Major ?? int.MaxValue) >= 21; - + await WarmUpGatekeeper.WaitAsync(); - + try { if (!WarmUpCompleted.CurrentValue) { Logger.Warning("*** Warmup started ***"); await Task.WhenAll( InitClient(async x => Users = await x.WarmUp()), - InitClient(async x => Streams = await x.WarmUp()), - InitClient(async x => Projections = await x.WarmUp(), Options.Environment["EVENTSTORE_RUN_PROJECTIONS"] != "None"), + InitClient(async x => Streams = await x.WarmUp()), + InitClient( + async x => Projections = await x.WarmUp(), + Options.Environment["EVENTSTORE_RUN_PROJECTIONS"] != "None" + ), InitClient(async x => Subscriptions = await x.WarmUp()), - InitClient(async x => Operations = await x.WarmUp()) + InitClient(async x => Operations = await x.WarmUp()) ); - + WarmUpCompleted.EnsureCalledOnce(); - + Logger.Warning("*** Warmup completed ***"); - } - else { + } else { Logger.Information("*** Warmup skipped ***"); } - } - finally { + } finally { WarmUpGatekeeper.Release(); } - + await OnSetup(); - + return; async Task InitClient(Func action, bool execute = true) where T : EventStoreClientBase { if (!execute) return default(T)!; + var client = (Activator.CreateInstance(typeof(T), new object?[] { ClientSettings }) as T)!; await action(client); return client; } - - + static Version GetEventStoreVersion() { const string versionPrefix = "EventStoreDB version"; @@ -166,7 +168,10 @@ static Version GetEventStoreVersion() { using var log = eventstore.Logs(true, cancellator.Token); foreach (var line in log.ReadToEnd()) { if (line.StartsWith(versionPrefix) && - Version.TryParse(new string(ReadVersion(line[(versionPrefix.Length + 1)..]).ToArray()), out var version)) { + Version.TryParse( + new string(ReadVersion(line[(versionPrefix.Length + 1)..]).ToArray()), + out var version + )) { return version; } } @@ -184,8 +189,7 @@ IEnumerable ReadVersion(string s) { public async Task DisposeAsync() { try { await OnTearDown(); - } - catch { + } catch { // ignored } @@ -206,11 +210,13 @@ public class EventStoreSharedDatabaseFixture : ICollectionFixture : IClassFixture where TFixture : EventStoreFixture { - protected EventStoreTests(ITestOutputHelper output, TFixture fixture) => Fixture = fixture.With(x => x.CaptureTestRun(output)); - + protected EventStoreTests(ITestOutputHelper output, TFixture fixture) => + Fixture = fixture.With(x => x.CaptureTestRun(output)); + protected TFixture Fixture { get; } } [Collection(nameof(EventStoreSharedDatabaseFixture))] -public abstract class EventStoreSharedDatabaseTests(ITestOutputHelper output, TFixture fixture) : EventStoreTests(output, fixture) +public abstract class EventStoreSharedDatabaseTests(ITestOutputHelper output, TFixture fixture) + : EventStoreTests(output, fixture) where TFixture : EventStoreFixture; diff --git a/test/EventStore.Client.Tests.Common/Fixtures/EventStoreTestNode.cs b/test/EventStore.Client.Tests.Common/Fixtures/EventStoreTestNode.cs index 11f247691..5b03f75b1 100644 --- a/test/EventStore.Client.Tests.Common/Fixtures/EventStoreTestNode.cs +++ b/test/EventStore.Client.Tests.Common/Fixtures/EventStoreTestNode.cs @@ -33,7 +33,7 @@ public static EventStoreFixtureOptions DefaultOptions() { var defaultEnvironment = new Dictionary(GlobalEnvironment.Variables) { ["EVENTSTORE_MEM_DB"] = "true", - ["EVENTSTORE_CHUNK_SIZE"] = (1024 * 1024).ToString(), + ["EVENTSTORE_CHUNK_SIZE"] = (1024 * 1024 * 1024).ToString(), ["EVENTSTORE_CERTIFICATE_FILE"] = "/etc/eventstore/certs/node/node.crt", ["EVENTSTORE_CERTIFICATE_PRIVATE_KEY_FILE"] = "/etc/eventstore/certs/node/node.key", ["EVENTSTORE_STREAM_EXISTENCE_FILTER_SIZE"] = "10000", diff --git a/test/EventStore.Client.Tests.Common/docker-compose.certs.yml b/test/EventStore.Client.Tests.Common/docker-compose.certs.yml index 4ff518f6a..49c16183c 100644 --- a/test/EventStore.Client.Tests.Common/docker-compose.certs.yml +++ b/test/EventStore.Client.Tests.Common/docker-compose.certs.yml @@ -16,7 +16,7 @@ services: network_mode: none cert-gen: - image: eventstore/es-gencert-cli:1.0.2 + image: ghcr.io/eventstore/es-gencert-cli:1.3.0 container_name: cert-gen user: "1000:1000" entrypoint: [ "/bin/sh","-c" ] @@ -32,4 +32,4 @@ services: volumes: - "${ES_CERTS_CLUSTER}:/tmp/certs" depends_on: - - volumes-provisioner \ No newline at end of file + - volumes-provisioner diff --git a/test/EventStore.Client.Tests.Common/docker-compose.cluster.yml b/test/EventStore.Client.Tests.Common/docker-compose.cluster.yml index 2c92d5162..49591b41f 100644 --- a/test/EventStore.Client.Tests.Common/docker-compose.cluster.yml +++ b/test/EventStore.Client.Tests.Common/docker-compose.cluster.yml @@ -11,7 +11,7 @@ services: network_mode: none cert-gen: - image: eventstore/es-gencert-cli:1.0.2 + image: ghcr.io/eventstore/es-gencert-cli:1.3.0 container_name: cert-gen user: "1000:1000" entrypoint: [ "/bin/sh","-c" ] diff --git a/test/EventStore.Client.Tests.Common/docker-compose.yml b/test/EventStore.Client.Tests.Common/docker-compose.yml index 610a27445..86e4536b1 100644 --- a/test/EventStore.Client.Tests.Common/docker-compose.yml +++ b/test/EventStore.Client.Tests.Common/docker-compose.yml @@ -11,7 +11,7 @@ services: network_mode: none cert-gen: - image: eventstore/es-gencert-cli:1.0.2 + image: ghcr.io/eventstore/es-gencert-cli:1.3.0 container_name: cert-gen user: "1000:1000" entrypoint: [ "/bin/sh","-c" ]