diff --git a/EventStore.Client.sln b/EventStore.Client.sln
index 51229f72c..84919f42b 100644
--- a/EventStore.Client.sln
+++ b/EventStore.Client.sln
@@ -33,6 +33,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventStore.Client.UserManag
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventStore.Client.Tests.Common", "test\EventStore.Client.Tests.Common\EventStore.Client.Tests.Common.csproj", "{E326832D-DE52-4DE4-9E54-C800908B75F3}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventStore.Client.Extensions.OpenTelemetry", "src\EventStore.Client.Extensions.OpenTelemetry\EventStore.Client.Extensions.OpenTelemetry.csproj", "{3723933C-585A-49BE-98E8-52D3FAD904CE}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|x64 = Debug|x64
@@ -94,6 +96,10 @@ Global
{E326832D-DE52-4DE4-9E54-C800908B75F3}.Debug|x64.Build.0 = Debug|Any CPU
{E326832D-DE52-4DE4-9E54-C800908B75F3}.Release|x64.ActiveCfg = Release|Any CPU
{E326832D-DE52-4DE4-9E54-C800908B75F3}.Release|x64.Build.0 = Release|Any CPU
+ {3723933C-585A-49BE-98E8-52D3FAD904CE}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {3723933C-585A-49BE-98E8-52D3FAD904CE}.Debug|x64.Build.0 = Debug|Any CPU
+ {3723933C-585A-49BE-98E8-52D3FAD904CE}.Release|x64.ActiveCfg = Release|Any CPU
+ {3723933C-585A-49BE-98E8-52D3FAD904CE}.Release|x64.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{D3744A86-DD35-4104-AAEE-84B79062C4A2} = {EA59C1CB-16DA-4F68-AF8A-642A969B4CF8}
@@ -109,5 +115,6 @@ Global
{6CEB731F-72E1-461F-A6B3-54DBF3FD786C} = {C51F2C69-45A9-4D0D-A708-4FC319D5D340}
{22634CEE-4F7B-4679-A48D-38A2A8580ECA} = {C51F2C69-45A9-4D0D-A708-4FC319D5D340}
{E326832D-DE52-4DE4-9E54-C800908B75F3} = {C51F2C69-45A9-4D0D-A708-4FC319D5D340}
+ {3723933C-585A-49BE-98E8-52D3FAD904CE} = {EA59C1CB-16DA-4F68-AF8A-642A969B4CF8}
EndGlobalSection
EndGlobal
diff --git a/README.md b/README.md
index 37657a3df..0e4c39125 100644
--- a/README.md
+++ b/README.md
@@ -19,6 +19,14 @@ Reference the nuget package(s) for the API that you would like to call
[User Management](https://www.nuget.org/packages/EventStore.Client.Grpc.UserManagement)
+## Open Telemetry
+
+Telemetry instrumentation can be enabled by installing the [Open Telemetry Extensions](https://www.nuget.org/packages/EventStore.Client.Extensions.OpenTelemetry) package.
+
+Once installed you can configure instrumentation using the `AddEventStoreClientInstrumentation` extension method on a `TracerProviderBuilder`.
+
+Tracing is the only telemetry currently exported, specifically for the `Append` and `Subscribe` (Catchup and Persistent) operations.
+
## Support
Information on support and commercial tools such as LDAP authentication can be found here: [Event Store Support](https://eventstore.com/support/).
diff --git a/src/Directory.Build.props b/src/Directory.Build.props
index c12d9449c..609de534e 100644
--- a/src/Directory.Build.props
+++ b/src/Directory.Build.props
@@ -4,7 +4,7 @@
EventStore.Client
-
+
$(MSBuildProjectName.Remove(0,18))
$(ESPackageIdSuffix.ToLower()).proto
../EventStore.Client.Common/protos/$(ESProto)
@@ -50,6 +50,10 @@
+
+
+
+
<_Parameter1>$(ProjectName).Tests
diff --git a/src/EventStore.Client.Common/Constants.cs b/src/EventStore.Client.Common/Constants.cs
index 3e0279e6b..a088bcab2 100644
--- a/src/EventStore.Client.Common/Constants.cs
+++ b/src/EventStore.Client.Common/Constants.cs
@@ -39,10 +39,10 @@ public static class Exceptions {
}
public static class Metadata {
- public const string Type = "type";
- public const string Created = "created";
- public const string ContentType = "content-type";
- public static readonly string[] RequiredMetadata = { Type, ContentType };
+ public const string Type = "type";
+ public const string Created = "created";
+ public const string ContentType = "content-type";
+ public static readonly string[] RequiredMetadata = { Type, ContentType };
public static class ContentTypes {
public const string ApplicationJson = "application/json";
@@ -58,4 +58,4 @@ public static class Headers {
public const string ConnectionName = "connection-name";
public const string RequiresLeader = "requires-leader";
}
-}
\ No newline at end of file
+}
diff --git a/src/EventStore.Client.Extensions.OpenTelemetry/EventStore.Client.Extensions.OpenTelemetry.csproj b/src/EventStore.Client.Extensions.OpenTelemetry/EventStore.Client.Extensions.OpenTelemetry.csproj
new file mode 100644
index 000000000..2783f0adc
--- /dev/null
+++ b/src/EventStore.Client.Extensions.OpenTelemetry/EventStore.Client.Extensions.OpenTelemetry.csproj
@@ -0,0 +1,20 @@
+
+
+
+ EventStore.Client.Extensions.OpenTelemetry
+
+
+
+ EventStore.Client.Extensions.OpenTelemetry
+ Extensions used to facilitate instrumentation of the EventStore Client.
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/EventStore.Client.Extensions.OpenTelemetry/TracerProviderBuilderExtensions.cs b/src/EventStore.Client.Extensions.OpenTelemetry/TracerProviderBuilderExtensions.cs
new file mode 100644
index 000000000..6ee1c9edf
--- /dev/null
+++ b/src/EventStore.Client.Extensions.OpenTelemetry/TracerProviderBuilderExtensions.cs
@@ -0,0 +1,19 @@
+using EventStore.Client.Diagnostics;
+using JetBrains.Annotations;
+using OpenTelemetry.Trace;
+
+namespace EventStore.Client.Extensions.OpenTelemetry;
+
+///
+/// Extension methods used to facilitate instrumentation of the EventStore Client.
+///
+[PublicAPI]
+public static class TracerProviderBuilderExtensions {
+ ///
+ /// Enables instrumentation of the EventStore .NET Client on the OpenTelemetry TracerProvider.
+ ///
+ /// being configured.
+ /// The instance of to chain configuration.
+ public static TracerProviderBuilder AddEventStoreClientInstrumentation(this TracerProviderBuilder builder)
+ => builder.AddSource(EventStoreClientInstrumentation.ActivitySourceName);
+}
diff --git a/src/EventStore.Client.Streams/EventStoreClient.Append.cs b/src/EventStore.Client.Streams/EventStoreClient.Append.cs
index 9d0842855..b7341769c 100644
--- a/src/EventStore.Client.Streams/EventStoreClient.Append.cs
+++ b/src/EventStore.Client.Streams/EventStoreClient.Append.cs
@@ -1,14 +1,13 @@
-using System;
using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.Threading;
-using System.Threading.Tasks;
+using System.Diagnostics;
using System.Threading.Channels;
using Google.Protobuf;
using EventStore.Client.Streams;
using Grpc.Core;
using Microsoft.Extensions.Logging;
-using System.Runtime.CompilerServices;
+using EventStore.Client.Diagnostics;
+using EventStore.Client.Diagnostics.Telemetry;
+using EventStore.Client.Diagnostics.Tracing;
namespace EventStore.Client {
public partial class EventStoreClient {
@@ -30,24 +29,30 @@ public async Task AppendToStreamAsync(
Action? configureOperationOptions = null,
TimeSpan? deadline = null,
UserCredentials? userCredentials = null,
- CancellationToken cancellationToken = default) {
+ CancellationToken cancellationToken = default
+ ) {
var options = Settings.OperationOptions.Clone();
configureOperationOptions?.Invoke(options);
_log.LogDebug("Append to stream - {streamName}@{expectedRevision}.", streamName, expectedRevision);
- var batchAppender = _streamAppender;
var task =
- userCredentials == null && await batchAppender.IsUsable().ConfigureAwait(false)
- ? batchAppender.Append(streamName, expectedRevision, eventData, deadline, cancellationToken)
+ userCredentials == null && await _batchAppender.IsUsable().ConfigureAwait(false)
+ ? _batchAppender.Append(streamName, expectedRevision, eventData, deadline, cancellationToken)
: AppendToStreamInternal(
- (await GetChannelInfo(cancellationToken).ConfigureAwait(false)).CallInvoker,
+ await GetChannelInfo(cancellationToken).ConfigureAwait(false),
new AppendReq {
Options = new AppendReq.Types.Options {
StreamIdentifier = streamName,
Revision = expectedRevision
}
- }, eventData, options, deadline, userCredentials, cancellationToken);
+ },
+ eventData,
+ options,
+ deadline,
+ userCredentials,
+ cancellationToken
+ );
return (await task.ConfigureAwait(false)).OptionallyThrowWrongExpectedVersionException(options);
}
@@ -70,29 +75,35 @@ public async Task AppendToStreamAsync(
Action? configureOperationOptions = null,
TimeSpan? deadline = null,
UserCredentials? userCredentials = null,
- CancellationToken cancellationToken = default) {
+ CancellationToken cancellationToken = default
+ ) {
var operationOptions = Settings.OperationOptions.Clone();
configureOperationOptions?.Invoke(operationOptions);
- _log.LogDebug("Append to stream - {streamName}@{expectedRevision}.", streamName, expectedState);
+ _log.LogDebug("Append to stream - {streamName}@{expectedState}.", streamName, expectedState);
- var batchAppender = _streamAppender;
var task =
- userCredentials == null && await batchAppender.IsUsable().ConfigureAwait(false)
- ? batchAppender.Append(streamName, expectedState, eventData, deadline, cancellationToken)
+ userCredentials == null && await _batchAppender.IsUsable().ConfigureAwait(false)
+ ? _batchAppender.Append(streamName, expectedState, eventData, deadline, cancellationToken)
: AppendToStreamInternal(
- (await GetChannelInfo(cancellationToken).ConfigureAwait(false)).CallInvoker,
+ await GetChannelInfo(cancellationToken).ConfigureAwait(false),
new AppendReq {
Options = new AppendReq.Types.Options {
StreamIdentifier = streamName
}
- }.WithAnyStreamRevision(expectedState), eventData, operationOptions, deadline, userCredentials,
- cancellationToken);
+ }.WithAnyStreamRevision(expectedState),
+ eventData,
+ operationOptions,
+ deadline,
+ userCredentials,
+ cancellationToken
+ );
+
return (await task.ConfigureAwait(false)).OptionallyThrowWrongExpectedVersionException(operationOptions);
}
- private async ValueTask AppendToStreamInternal(
- CallInvoker callInvoker,
+ private ValueTask AppendToStreamInternal(
+ ChannelInfo channelInfo,
AppendReq header,
IEnumerable eventData,
EventStoreClientOperationOptions operationOptions,
@@ -100,39 +111,54 @@ private async ValueTask AppendToStreamInternal(
UserCredentials? userCredentials,
CancellationToken cancellationToken
) {
- using var call = new Streams.Streams.StreamsClient(callInvoker).Append(
- EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken)
+ return EventStoreClientDiagnostics.Trace(
+ Operation,
+ TracingConstants.Operations.Append,
+ new ActivityTagsCollection {
+ {
+ TelemetryAttributes.EventStoreStream,
+ header.Options.StreamIdentifier.StreamName.ToStringUtf8()
+ }
+ }
+ .WithTagsFrom(channelInfo, Settings)
+ .WithTagsFrom(userCredentials)
);
- await call.RequestStream.WriteAsync(header).ConfigureAwait(false);
+ async ValueTask Operation() {
+ using var call = new Streams.Streams.StreamsClient(channelInfo.CallInvoker).Append(
+ EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken)
+ );
+
+ await call.RequestStream.WriteAsync(header).ConfigureAwait(false);
- foreach (var e in eventData) {
- await call.RequestStream.WriteAsync(
- new AppendReq {
+ foreach (var e in eventData) {
+ var appendReq = new AppendReq {
ProposedMessage = new AppendReq.Types.ProposedMessage {
Id = e.EventId.ToDto(),
Data = ByteString.CopyFrom(e.Data.Span),
- CustomMetadata = ByteString.CopyFrom(e.Metadata.Span),
+ CustomMetadata = ByteString.CopyFrom(e.Metadata.InjectTracingMetadata()),
Metadata = {
{ Constants.Metadata.Type, e.Type },
{ Constants.Metadata.ContentType, e.ContentType }
}
- },
- }
- ).ConfigureAwait(false);
- }
+ }
+ };
- await call.RequestStream.CompleteAsync().ConfigureAwait(false);
+ await call.RequestStream.WriteAsync(appendReq).ConfigureAwait(false);
+ }
- var response = await call.ResponseAsync.ConfigureAwait(false);
+ await call.RequestStream.CompleteAsync().ConfigureAwait(false);
- if (response.Success != null)
- return HandleSuccessAppend(response, header);
+ var response = await call.ResponseAsync.ConfigureAwait(false);
- if (response.WrongExpectedVersion == null)
- throw new InvalidOperationException("The operation completed with an unexpected result.");
+ if (response.Success != null)
+ return HandleSuccessAppend(response, header);
- return HandleWrongExpectedRevision(response, header, operationOptions);
+ if (response.WrongExpectedVersion == null)
+ throw new InvalidOperationException("The operation completed with an unexpected result.");
+
+ return HandleWrongExpectedRevision(response, header, operationOptions);
+ }
}
private IWriteResult HandleSuccessAppend(AppendResp response, AppendReq header) {
@@ -150,7 +176,8 @@ private IWriteResult HandleSuccessAppend(AppendResp response, AppendReq header)
"Append to stream succeeded - {streamName}@{logPosition}/{nextExpectedVersion}.",
header.Options.StreamIdentifier,
position,
- currentRevision);
+ currentRevision
+ );
return new SuccessResult(currentRevision, position);
}
@@ -224,140 +251,187 @@ private class StreamAppender : IDisposable {
private readonly Action _onException;
private readonly Channel _channel;
private readonly ConcurrentDictionary> _pendingRequests;
+ private readonly TaskCompletionSource _isUsable;
- private readonly Task?> _callTask;
+ private ChannelInfo? _channelInfo;
- public StreamAppender(EventStoreClientSettings settings,
- Task?> callTask, CancellationToken cancellationToken,
- Action onException) {
+ public StreamAppender(
+ EventStoreClientSettings settings,
+ ValueTask channelInfoTask,
+ CancellationToken cancellationToken,
+ Action onException
+ ) {
_settings = settings;
- _callTask = callTask;
_cancellationToken = cancellationToken;
_onException = onException;
- _channel = System.Threading.Channels.Channel.CreateBounded(10000);
+ _channel = Channel.CreateBounded(10000);
_pendingRequests = new ConcurrentDictionary>();
- _ = Task.Factory.StartNew(Send);
- _ = Task.Factory.StartNew(Receive);
+ _isUsable = new TaskCompletionSource();
+
+ _ = Task.Run(() => Duplex(channelInfoTask));
}
- public ValueTask Append(string streamName, StreamRevision expectedStreamPosition,
- IEnumerable events, TimeSpan? timeoutAfter, CancellationToken cancellationToken = default) =>
- AppendInternal(BatchAppendReq.Types.Options.Create(streamName, expectedStreamPosition, timeoutAfter),
- events, cancellationToken);
+ public ValueTask Append(
+ string streamName, StreamRevision expectedStreamPosition,
+ IEnumerable events, TimeSpan? timeoutAfter, CancellationToken cancellationToken = default
+ ) =>
+ AppendInternal(
+ BatchAppendReq.Types.Options.Create(streamName, expectedStreamPosition, timeoutAfter),
+ events,
+ cancellationToken
+ );
- public ValueTask Append(string streamName, StreamState expectedStreamState,
- IEnumerable events, TimeSpan? timeoutAfter, CancellationToken cancellationToken = default) =>
- AppendInternal(BatchAppendReq.Types.Options.Create(streamName, expectedStreamState, timeoutAfter),
- events, cancellationToken);
+ public ValueTask Append(
+ string streamName, StreamState expectedStreamState,
+ IEnumerable events, TimeSpan? timeoutAfter, CancellationToken cancellationToken = default
+ ) =>
+ AppendInternal(
+ BatchAppendReq.Types.Options.Create(streamName, expectedStreamState, timeoutAfter),
+ events,
+ cancellationToken
+ );
- public async ValueTask IsUsable() {
- var call = await _callTask.ConfigureAwait(false);
- return call != null;
- }
+ public Task IsUsable() => _isUsable.Task;
+
+ private ValueTask AppendInternal(
+ BatchAppendReq.Types.Options options,
+ IEnumerable events, CancellationToken cancellationToken
+ ) {
+ return EventStoreClientDiagnostics.Trace(
+ Operation,
+ TracingConstants.Operations.Append,
+ new ActivityTagsCollection {
+ { TelemetryAttributes.EventStoreStream, options.StreamIdentifier.StreamName.ToStringUtf8() },
+ { TelemetryAttributes.ServerAddress, _channelInfo?.Channel.Target }
+ }
+ .WithTagsFrom(_channelInfo, _settings)
+ );
- private async Task Receive() {
- try {
- var call = await _callTask.ConfigureAwait(false);
- if (call is null) {
- _channel.Writer.TryComplete(
- new NotSupportedException("Server does not support batch append"));
- return;
- }
+ async ValueTask Operation() {
+ var correlationId = Uuid.NewUuid();
- await foreach (var response in call.ResponseStream.ReadAllAsync(_cancellationToken)
- .ConfigureAwait(false)) {
- if (!_pendingRequests.TryRemove(Uuid.FromDto(response.CorrelationId), out var writeResult)) {
- continue; // TODO: Log?
- }
+ var complete = _pendingRequests.GetOrAdd(correlationId, new TaskCompletionSource());
- try {
- writeResult.TrySetResult(response.ToWriteResult());
- } catch (Exception ex) {
- writeResult.TrySetException(ex);
+ try {
+ foreach (var appendRequest in GetRequests(events, options, correlationId)) {
+ await _channel.Writer.WriteAsync(appendRequest, cancellationToken).ConfigureAwait(false);
}
+ } catch (ChannelClosedException ex) {
+ // channel is closed, our tcs won't necessarily get completed, don't wait for it.
+ throw ex.InnerException ?? ex;
}
- } catch (Exception ex) {
- // signal that no tcs added to _pendingRequests after this point will necessarily complete
- _channel.Writer.TryComplete(ex);
-
- // complete whatever tcs's we have
- _onException(ex);
- foreach (var request in _pendingRequests) {
- request.Value.TrySetException(ex);
- }
+
+ return await complete.Task.ConfigureAwait(false);
}
}
- private async Task Send() {
- var call = await _callTask.ConfigureAwait(false);
- if (call is null)
- throw new NotSupportedException("Server does not support batch append");
-
- await foreach (var appendRequest in _channel.Reader.ReadAllAsync(_cancellationToken)
- .ConfigureAwait(false)) {
- await call.RequestStream.WriteAsync(appendRequest).ConfigureAwait(false);
+ private async Task Duplex(
+ ValueTask channelInfoTask
+ ) {
+ _channelInfo = await channelInfoTask.ConfigureAwait(false);
+ if (!_channelInfo.ServerCapabilities.SupportsBatchAppend) {
+ _channel.Writer.TryComplete(new NotSupportedException("Server does not support batch append"));
+ _isUsable.TrySetResult(false);
+ return;
}
- await call.RequestStream.CompleteAsync().ConfigureAwait(false);
- }
+ var call = new Streams.Streams.StreamsClient(_channelInfo.CallInvoker).BatchAppend(
+ EventStoreCallOptions.CreateStreaming(
+ _settings,
+ userCredentials: _settings.DefaultCredentials,
+ cancellationToken: _cancellationToken
+ )
+ );
+
+ _ = Task.Run(() => Send(call));
+ _ = Task.Run(() => Receive(call));
- private async ValueTask AppendInternal(BatchAppendReq.Types.Options options,
- IEnumerable events, CancellationToken cancellationToken) {
- var batchSize = 0;
- var correlationId = Uuid.NewUuid();
- var correlationIdDto = correlationId.ToDto();
+ _isUsable.TrySetResult(true);
- var complete = _pendingRequests.GetOrAdd(correlationId, new TaskCompletionSource());
+ return;
- try {
- foreach (var appendRequest in GetRequests()) {
- await _channel.Writer.WriteAsync(appendRequest, cancellationToken).ConfigureAwait(false);
+ async Task Send(AsyncDuplexStreamingCall call) {
+ await foreach (var appendRequest in _channel.Reader.ReadAllAsync(_cancellationToken)
+ .ConfigureAwait(false)) {
+ await call.RequestStream.WriteAsync(appendRequest).ConfigureAwait(false);
}
- } catch (ChannelClosedException ex) {
- // channel is closed, our tcs won't necessarily get completed, don't wait for it.
- throw ex.InnerException ?? ex;
- }
- return await complete.Task.ConfigureAwait(false);
+ await call.RequestStream.CompleteAsync().ConfigureAwait(false);
+ }
- IEnumerable GetRequests() {
- bool first = true;
- var proposedMessages = new List();
- foreach (var @event in events) {
- var proposedMessage = new BatchAppendReq.Types.ProposedMessage {
- Data = ByteString.CopyFrom(@event.Data.Span),
- CustomMetadata = ByteString.CopyFrom(@event.Metadata.Span),
- Id = @event.EventId.ToDto(),
- Metadata = {
- {Constants.Metadata.Type, @event.Type},
- {Constants.Metadata.ContentType, @event.ContentType}
+ async Task Receive(AsyncDuplexStreamingCall call) {
+ try {
+ await foreach (var response in call.ResponseStream.ReadAllAsync(_cancellationToken)
+ .ConfigureAwait(false)) {
+ if (!_pendingRequests.TryRemove(
+ Uuid.FromDto(response.CorrelationId),
+ out var writeResult
+ )) {
+ continue; // TODO: Log?
}
- };
- proposedMessages.Add(proposedMessage);
+ try {
+ writeResult.TrySetResult(response.ToWriteResult());
+ } catch (Exception ex) {
+ writeResult.TrySetException(ex);
+ }
+ }
+ } catch (Exception ex) {
+ // signal that no tcs added to _pendingRequests after this point will necessarily complete
+ _channel.Writer.TryComplete(ex);
+
+ // complete whatever tcs's we have
+ _onException(ex);
+ foreach (var request in _pendingRequests) {
+ request.Value.TrySetException(ex);
+ }
+ }
+ }
+ }
- if ((batchSize += proposedMessage.CalculateSize()) <
- _settings.OperationOptions.BatchAppendSize) {
- continue;
+ private IEnumerable GetRequests(
+ IEnumerable events, BatchAppendReq.Types.Options options, Uuid correlationId
+ ) {
+ var batchSize = 0;
+ bool first = true;
+ var correlationIdDto = correlationId.ToDto();
+ var proposedMessages = new List();
+
+ foreach (var @event in events) {
+ var proposedMessage = new BatchAppendReq.Types.ProposedMessage {
+ Data = ByteString.CopyFrom(@event.Data.Span),
+ CustomMetadata = ByteString.CopyFrom(@event.Metadata.InjectTracingMetadata()),
+ Id = @event.EventId.ToDto(),
+ Metadata = {
+ { Constants.Metadata.Type, @event.Type },
+ { Constants.Metadata.ContentType, @event.ContentType }
}
+ };
+
+ proposedMessages.Add(proposedMessage);
- yield return new BatchAppendReq {
- ProposedMessages = {proposedMessages},
- CorrelationId = correlationIdDto,
- Options = first ? options : null
- };
- first = false;
- proposedMessages.Clear();
- batchSize = 0;
+ if ((batchSize += proposedMessage.CalculateSize()) <
+ _settings.OperationOptions.BatchAppendSize) {
+ continue;
}
yield return new BatchAppendReq {
- ProposedMessages = {proposedMessages},
- IsFinal = true,
+ ProposedMessages = { proposedMessages },
CorrelationId = correlationIdDto,
Options = first ? options : null
};
+
+ first = false;
+ proposedMessages.Clear();
+ batchSize = 0;
}
+
+ yield return new BatchAppendReq {
+ ProposedMessages = { proposedMessages },
+ IsFinal = true,
+ CorrelationId = correlationIdDto,
+ Options = first ? options : null
+ };
}
public void Dispose() {
diff --git a/src/EventStore.Client.Streams/EventStoreClient.Metadata.cs b/src/EventStore.Client.Streams/EventStoreClient.Metadata.cs
index 6581bd94b..19de629e7 100644
--- a/src/EventStore.Client.Streams/EventStoreClient.Metadata.cs
+++ b/src/EventStore.Client.Streams/EventStoreClient.Metadata.cs
@@ -97,7 +97,7 @@ private async Task SetStreamMetadataInternal(StreamMetadata metada
CancellationToken cancellationToken) {
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
- return await AppendToStreamInternal(channelInfo.CallInvoker, appendReq, new[] {
+ return await AppendToStreamInternal(channelInfo, appendReq, new[] {
new EventData(Uuid.NewUuid(), SystemEventTypes.StreamMetadata,
JsonSerializer.SerializeToUtf8Bytes(metadata, StreamMetadataJsonSerializerOptions)),
}, operationOptions, deadline, userCredentials, cancellationToken).ConfigureAwait(false);
diff --git a/src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs b/src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs
index f82bd5d07..4b47b57b4 100644
--- a/src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs
+++ b/src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs
@@ -1,4 +1,8 @@
+using System.Diagnostics;
using System.Threading.Channels;
+using EventStore.Client.Diagnostics;
+using EventStore.Client.Diagnostics.Telemetry;
+using EventStore.Client.Diagnostics.Tracing;
using EventStore.Client.Streams;
using Grpc.Core;
using static EventStore.Client.Streams.ReadResp.ContentOneofCase;
@@ -24,10 +28,15 @@ public Task SubscribeToAllAsync(
Action? subscriptionDropped = default,
SubscriptionFilterOptions? filterOptions = null,
UserCredentials? userCredentials = null,
- CancellationToken cancellationToken = default) => StreamSubscription.Confirm(
+ CancellationToken cancellationToken = default
+ ) => StreamSubscription.Confirm(
SubscribeToAll(start, resolveLinkTos, filterOptions, userCredentials, cancellationToken),
- eventAppeared, subscriptionDropped, _log, filterOptions?.CheckpointReached,
- cancellationToken: cancellationToken);
+ eventAppeared,
+ subscriptionDropped,
+ _log,
+ filterOptions?.CheckpointReached,
+ cancellationToken: cancellationToken
+ );
///
/// Subscribes to all events.
@@ -43,19 +52,23 @@ public StreamSubscriptionResult SubscribeToAll(
bool resolveLinkTos = false,
SubscriptionFilterOptions? filterOptions = null,
UserCredentials? userCredentials = null,
- CancellationToken cancellationToken = default) => new(async _ => {
- var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
- return channelInfo.CallInvoker;
- }, new ReadReq {
- Options = new ReadReq.Types.Options {
- ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards,
- ResolveLinks = resolveLinkTos,
- All = ReadReq.Types.Options.Types.AllOptions.FromSubscriptionPosition(start),
- Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions(),
- Filter = GetFilterOptions(filterOptions)!,
- UuidOption = new() { Structured = new() }
- }
- }, Settings, userCredentials, cancellationToken);
+ CancellationToken cancellationToken = default
+ ) => new(
+ async _ => await GetChannelInfo(cancellationToken).ConfigureAwait(false),
+ new ReadReq {
+ Options = new ReadReq.Types.Options {
+ ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards,
+ ResolveLinks = resolveLinkTos,
+ All = ReadReq.Types.Options.Types.AllOptions.FromSubscriptionPosition(start),
+ Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions(),
+ Filter = GetFilterOptions(filterOptions)!,
+ UuidOption = new() { Structured = new() }
+ }
+ },
+ Settings,
+ userCredentials,
+ cancellationToken
+ );
///
/// Subscribes to a stream from a checkpoint.
@@ -69,15 +82,21 @@ public StreamSubscriptionResult SubscribeToAll(
/// The optional .
///
[Obsolete("SubscribeToStreamAsync is no longer supported. Use SubscribeToStream instead.", false)]
- public Task SubscribeToStreamAsync(string streamName,
- FromStream start,
- Func eventAppeared,
- bool resolveLinkTos = false,
- Action? subscriptionDropped = default,
- UserCredentials? userCredentials = null,
- CancellationToken cancellationToken = default) => StreamSubscription.Confirm(
+ public Task SubscribeToStreamAsync(
+ string streamName,
+ FromStream start,
+ Func eventAppeared,
+ bool resolveLinkTos = false,
+ Action? subscriptionDropped = default,
+ UserCredentials? userCredentials = null,
+ CancellationToken cancellationToken = default
+ ) => StreamSubscription.Confirm(
SubscribeToStream(streamName, start, resolveLinkTos, userCredentials, cancellationToken),
- eventAppeared, subscriptionDropped, _log, cancellationToken: cancellationToken);
+ eventAppeared,
+ subscriptionDropped,
+ _log,
+ cancellationToken: cancellationToken
+ );
///
/// Subscribes to a stream from a checkpoint.
@@ -93,28 +112,33 @@ public StreamSubscriptionResult SubscribeToStream(
FromStream start,
bool resolveLinkTos = false,
UserCredentials? userCredentials = null,
- CancellationToken cancellationToken = default) => new(async _ => {
- var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
- return channelInfo.CallInvoker;
- }, new ReadReq {
- Options = new ReadReq.Types.Options {
- ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards,
- ResolveLinks = resolveLinkTos,
- Stream = ReadReq.Types.Options.Types.StreamOptions.FromSubscriptionPosition(streamName, start),
- Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions(),
- UuidOption = new() { Structured = new() }
- }
- }, Settings, userCredentials, cancellationToken);
+ CancellationToken cancellationToken = default
+ ) => new(
+ async _ => await GetChannelInfo(cancellationToken).ConfigureAwait(false),
+ new ReadReq {
+ Options = new ReadReq.Types.Options {
+ ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards,
+ ResolveLinks = resolveLinkTos,
+ Stream = ReadReq.Types.Options.Types.StreamOptions.FromSubscriptionPosition(streamName, start),
+ Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions(),
+ UuidOption = new() { Structured = new() }
+ }
+ },
+ Settings,
+ userCredentials,
+ cancellationToken
+ );
///
/// A class that represents the result of a subscription operation. You may either enumerate this instance directly or . Do not enumerate more than once.
///
public class StreamSubscriptionResult : IAsyncEnumerable, IAsyncDisposable, IDisposable {
- private readonly ReadReq _request;
- private readonly Channel _channel;
- private readonly CancellationTokenSource _cts;
- private readonly CallOptions _callOptions;
- private AsyncServerStreamingCall? _call;
+ private readonly ReadReq _request;
+ private readonly Channel _channel;
+ private readonly CancellationTokenSource _cts;
+ private readonly CallOptions _callOptions;
+ private readonly EventStoreClientSettings _settings;
+ private AsyncServerStreamingCall? _call;
private int _messagesEnumerated;
@@ -150,12 +174,18 @@ async IAsyncEnumerable GetMessages() {
}
}
- internal StreamSubscriptionResult(Func> selectCallInvoker,
+ internal StreamSubscriptionResult(
+ Func> selectChannelInfo,
ReadReq request, EventStoreClientSettings settings, UserCredentials? userCredentials,
- CancellationToken cancellationToken) {
- _request = request;
- _callOptions = EventStoreCallOptions.CreateStreaming(settings, userCredentials: userCredentials,
- cancellationToken: cancellationToken);
+ CancellationToken cancellationToken
+ ) {
+ _request = request;
+ _settings = settings;
+ _callOptions = EventStoreCallOptions.CreateStreaming(
+ settings,
+ userCredentials: userCredentials,
+ cancellationToken: cancellationToken
+ );
_channel = Channel.CreateBounded(ReadBoundedChannelOptions);
@@ -171,29 +201,72 @@ internal StreamSubscriptionResult(Func> sel
async Task PumpMessages() {
try {
- var callInvoker = await selectCallInvoker(_cts.Token).ConfigureAwait(false);
- var client = new Streams.Streams.StreamsClient(callInvoker);
+ var channelInfo = await selectChannelInfo(_cts.Token).ConfigureAwait(false);
+ var client = new Streams.Streams.StreamsClient(channelInfo.CallInvoker);
_call = client.Read(_request, _callOptions);
await foreach (var response in _call.ResponseStream.ReadAllAsync(_cts.Token)
.ConfigureAwait(false)) {
- await _channel.Writer.WriteAsync(response.ContentCase switch {
- Confirmation => new StreamMessage.SubscriptionConfirmation(
- response.Confirmation.SubscriptionId),
- Event => new StreamMessage.Event(ConvertToResolvedEvent(response.Event)),
- FirstStreamPosition => new StreamMessage.FirstStreamPosition(
- new StreamPosition(response.FirstStreamPosition)),
- LastStreamPosition => new StreamMessage.LastStreamPosition(
- new StreamPosition(response.LastStreamPosition)),
- LastAllStreamPosition => new StreamMessage.LastAllStreamPosition(
- new Position(response.LastAllStreamPosition.CommitPosition,
- response.LastAllStreamPosition.PreparePosition)),
- Checkpoint => new StreamMessage.AllStreamCheckpointReached(
- new Position(response.Checkpoint.CommitPosition,
- response.Checkpoint.PreparePosition)),
- CaughtUp => StreamMessage.CaughtUp.Instance,
- FellBehind => StreamMessage.FellBehind.Instance,
- _ => StreamMessage.Unknown.Instance
- }, _cts.Token).ConfigureAwait(false);
+ StreamMessage streamMessage =
+ response.ContentCase switch {
+ Confirmation => new StreamMessage.SubscriptionConfirmation(
+ response.Confirmation.SubscriptionId
+ ),
+ Event => new StreamMessage.Event(ConvertToResolvedEvent(response.Event)),
+ FirstStreamPosition => new StreamMessage.FirstStreamPosition(
+ new StreamPosition(response.FirstStreamPosition)
+ ),
+ LastStreamPosition => new StreamMessage.LastStreamPosition(
+ new StreamPosition(response.LastStreamPosition)
+ ),
+ LastAllStreamPosition => new StreamMessage.LastAllStreamPosition(
+ new Position(
+ response.LastAllStreamPosition.CommitPosition,
+ response.LastAllStreamPosition.PreparePosition
+ )
+ ),
+ Checkpoint => new StreamMessage.AllStreamCheckpointReached(
+ new Position(
+ response.Checkpoint.CommitPosition,
+ response.Checkpoint.PreparePosition
+ )
+ ),
+ CaughtUp => StreamMessage.CaughtUp.Instance,
+ FellBehind => StreamMessage.FellBehind.Instance,
+ _ => StreamMessage.Unknown.Instance
+ };
+
+ if (streamMessage is StreamMessage.Event evnt) {
+ var restoredTracingContext =
+ evnt.ResolvedEvent.OriginalEvent.Metadata.RestoreTracingContext();
+
+ if (restoredTracingContext != null)
+ EventStoreClientDiagnostics.StartActivity(
+ TracingConstants.Operations.Subscribe,
+ new ActivityTagsCollection {
+ {
+ TelemetryAttributes.EventStoreStream,
+ evnt.ResolvedEvent.OriginalEvent.EventStreamId
+ }, {
+ TelemetryAttributes.EventStoreSubscriptionId,
+ SubscriptionId
+ }, {
+ TelemetryAttributes.EventStoreEventId,
+ evnt.ResolvedEvent.OriginalEvent.EventId
+ }, {
+ TelemetryAttributes.EventStoreEventType,
+ evnt.ResolvedEvent.OriginalEvent.EventType
+ }
+ }.WithTagsFrom(channelInfo, _settings)
+ .WithTagsFrom(userCredentials),
+ ActivityKind.Consumer,
+ restoredTracingContext
+ )?.Dispose();
+ }
+
+ await _channel.Writer.WriteAsync(
+ streamMessage,
+ _cts.Token
+ ).ConfigureAwait(false);
}
_channel.Writer.Complete();
@@ -214,9 +287,11 @@ static async ValueTask CastAndDispose(IDisposable? resource) {
switch (resource) {
case null:
return;
+
case IAsyncDisposable resourceAsyncDisposable:
await resourceAsyncDisposable.DisposeAsync().ConfigureAwait(false);
break;
+
default:
resource.Dispose();
break;
@@ -232,7 +307,8 @@ public void Dispose() {
///
public async IAsyncEnumerator GetAsyncEnumerator(
- CancellationToken cancellationToken = default) {
+ CancellationToken cancellationToken = default
+ ) {
try {
await foreach (var message in _channel.Reader.ReadAllAsync(cancellationToken)
.ConfigureAwait(false)) {
diff --git a/src/EventStore.Client.Streams/EventStoreClient.cs b/src/EventStore.Client.Streams/EventStoreClient.cs
index 361e6e2d4..63928055e 100644
--- a/src/EventStore.Client.Streams/EventStoreClient.cs
+++ b/src/EventStore.Client.Streams/EventStoreClient.cs
@@ -12,32 +12,48 @@ namespace EventStore.Client {
/// The client used for operations on streams.
///
public sealed partial class EventStoreClient : EventStoreClientBase {
-
private static readonly JsonSerializerOptions StreamMetadataJsonSerializerOptions = new() {
Converters = {
StreamMetadataJsonConverter.Instance
},
};
- private static BoundedChannelOptions ReadBoundedChannelOptions = new (1) {
- SingleReader = true,
- SingleWriter = true,
+ private static BoundedChannelOptions ReadBoundedChannelOptions = new(1) {
+ SingleReader = true,
+ SingleWriter = true,
AllowSynchronousContinuations = true
};
-
- private readonly ILogger _log;
- private Lazy _streamAppenderLazy;
- private StreamAppender _streamAppender => _streamAppenderLazy.Value;
- private readonly CancellationTokenSource _disposedTokenSource;
+ private readonly ILogger _log;
+ private Lazy _streamAppenderLazy;
+ private StreamAppender _batchAppender => _streamAppenderLazy.Value;
+ private readonly CancellationTokenSource _disposedTokenSource;
private static readonly Dictionary> ExceptionMap = new() {
[Constants.Exceptions.InvalidTransaction] = ex => new InvalidTransactionException(ex.Message, ex),
- [Constants.Exceptions.StreamDeleted] = ex => new StreamDeletedException(ex.Trailers.FirstOrDefault(x => x.Key == Constants.Exceptions.StreamName)?.Value ?? "", ex),
- [Constants.Exceptions.WrongExpectedVersion] = ex => new WrongExpectedVersionException(ex.Trailers.FirstOrDefault(x => x.Key == Constants.Exceptions.StreamName)?.Value!, ex.Trailers.GetStreamRevision(Constants.Exceptions.ExpectedVersion), ex.Trailers.GetStreamRevision(Constants.Exceptions.ActualVersion), ex, ex.Message),
- [Constants.Exceptions.MaximumAppendSizeExceeded] = ex => new MaximumAppendSizeExceededException(ex.Trailers.GetIntValueOrDefault(Constants.Exceptions.MaximumAppendSize), ex),
- [Constants.Exceptions.StreamNotFound] = ex => new StreamNotFoundException(ex.Trailers.FirstOrDefault(x => x.Key == Constants.Exceptions.StreamName)?.Value!, ex),
- [Constants.Exceptions.MissingRequiredMetadataProperty] = ex => new RequiredMetadataPropertyMissingException(ex.Trailers.FirstOrDefault(x => x.Key == Constants.Exceptions.MissingRequiredMetadataProperty)?.Value!, ex),
+ [Constants.Exceptions.StreamDeleted] = ex => new StreamDeletedException(
+ ex.Trailers.FirstOrDefault(x => x.Key == Constants.Exceptions.StreamName)?.Value ?? "",
+ ex
+ ),
+ [Constants.Exceptions.WrongExpectedVersion] = ex => new WrongExpectedVersionException(
+ ex.Trailers.FirstOrDefault(x => x.Key == Constants.Exceptions.StreamName)?.Value!,
+ ex.Trailers.GetStreamRevision(Constants.Exceptions.ExpectedVersion),
+ ex.Trailers.GetStreamRevision(Constants.Exceptions.ActualVersion),
+ ex,
+ ex.Message
+ ),
+ [Constants.Exceptions.MaximumAppendSizeExceeded] = ex => new MaximumAppendSizeExceededException(
+ ex.Trailers.GetIntValueOrDefault(Constants.Exceptions.MaximumAppendSize),
+ ex
+ ),
+ [Constants.Exceptions.StreamNotFound] = ex => new StreamNotFoundException(
+ ex.Trailers.FirstOrDefault(x => x.Key == Constants.Exceptions.StreamName)?.Value!,
+ ex
+ ),
+ [Constants.Exceptions.MissingRequiredMetadataProperty] = ex => new RequiredMetadataPropertyMissingException(
+ ex.Trailers.FirstOrDefault(x => x.Key == Constants.Exceptions.MissingRequiredMetadataProperty)?.Value!,
+ ex
+ ),
};
///
@@ -57,25 +73,20 @@ public EventStoreClient(EventStoreClientSettings? settings = null) : base(settin
}
private void SwapStreamAppender(Exception ex) =>
- Interlocked.Exchange(ref _streamAppenderLazy, new Lazy(CreateStreamAppender)).Value.Dispose();
+ Interlocked.Exchange(ref _streamAppenderLazy, new Lazy(CreateStreamAppender)).Value
+ .Dispose();
// todo: might be nice to have two different kinds of appenders and we decide which to instantiate according to the server caps.
- private StreamAppender CreateStreamAppender() {
- return new StreamAppender(Settings, GetCall(), _disposedTokenSource.Token, SwapStreamAppender);
-
- async Task?> GetCall() {
- var channelInfo = await GetChannelInfo(_disposedTokenSource.Token).ConfigureAwait(false);
- if (!channelInfo.ServerCapabilities.SupportsBatchAppend)
- return null;
-
- var client = new Streams.Streams.StreamsClient(channelInfo.CallInvoker);
-
- return client.BatchAppend(EventStoreCallOptions.CreateStreaming(Settings,
- userCredentials: Settings.DefaultCredentials, cancellationToken: _disposedTokenSource.Token));
- }
- }
-
- private static ReadReq.Types.Options.Types.FilterOptions? GetFilterOptions(IEventFilter? filter, uint checkpointInterval = 0) {
+ private StreamAppender CreateStreamAppender() => new StreamAppender(
+ Settings,
+ GetChannelInfo(_disposedTokenSource.Token),
+ _disposedTokenSource.Token,
+ SwapStreamAppender
+ );
+
+ private static ReadReq.Types.Options.Types.FilterOptions? GetFilterOptions(
+ IEventFilter? filter, uint checkpointInterval = 0
+ ) {
if (filter == null) {
return null;
}
@@ -131,13 +142,16 @@ private StreamAppender CreateStreamAppender() {
return options;
}
- private static ReadReq.Types.Options.Types.FilterOptions? GetFilterOptions(SubscriptionFilterOptions? filterOptions)
+ private static ReadReq.Types.Options.Types.FilterOptions? GetFilterOptions(
+ SubscriptionFilterOptions? filterOptions
+ )
=> filterOptions == null ? null : GetFilterOptions(filterOptions.Filter, filterOptions.CheckpointInterval);
///
public override void Dispose() {
if (_streamAppenderLazy.IsValueCreated)
_streamAppenderLazy.Value.Dispose();
+
_disposedTokenSource.Dispose();
base.Dispose();
}
@@ -146,6 +160,7 @@ public override void Dispose() {
public override async ValueTask DisposeAsync() {
if (_streamAppenderLazy.IsValueCreated)
_streamAppenderLazy.Value.Dispose();
+
_disposedTokenSource.Dispose();
await base.DisposeAsync().ConfigureAwait(false);
}
diff --git a/src/EventStore.Client.Streams/StreamSubscription.cs b/src/EventStore.Client.Streams/StreamSubscription.cs
index 9019eda64..c6b9e8ae6 100644
--- a/src/EventStore.Client.Streams/StreamSubscription.cs
+++ b/src/EventStore.Client.Streams/StreamSubscription.cs
@@ -7,26 +7,28 @@ namespace EventStore.Client {
///
[Obsolete]
public class StreamSubscription : IDisposable {
- private readonly EventStoreClient.StreamSubscriptionResult _subscription;
- private readonly IAsyncEnumerator _messages;
- private readonly Func _eventAppeared;
- private readonly Func _checkpointReached;
+ private readonly EventStoreClient.StreamSubscriptionResult _subscription;
+ private readonly IAsyncEnumerator _messages;
+ private readonly Func _eventAppeared;
+ private readonly Func _checkpointReached;
private readonly Action? _subscriptionDropped;
- private readonly ILogger _log;
- private readonly CancellationTokenSource _cts;
- private int _subscriptionDroppedInvoked;
+ private readonly ILogger _log;
+ private readonly CancellationTokenSource _cts;
+ private int _subscriptionDroppedInvoked;
///
/// The id of the set by the server.
///
public string SubscriptionId { get; }
- internal static async Task Confirm(EventStoreClient.StreamSubscriptionResult subscription,
+ internal static async Task Confirm(
+ EventStoreClient.StreamSubscriptionResult subscription,
Func eventAppeared,
Action? subscriptionDropped,
ILogger log,
Func? checkpointReached = null,
- CancellationToken cancellationToken = default) {
+ CancellationToken cancellationToken = default
+ ) {
var messages = subscription.Messages;
var enumerator = messages.GetAsyncEnumerator(cancellationToken);
@@ -35,26 +37,36 @@ enumerator.Current is not StreamMessage.SubscriptionConfirmation(var subscriptio
throw new InvalidOperationException($"Subscription to {enumerator} could not be confirmed.");
}
- return new StreamSubscription(subscription, enumerator, subscriptionId, eventAppeared, subscriptionDropped,
- log, checkpointReached, cancellationToken);
+ return new StreamSubscription(
+ subscription,
+ enumerator,
+ subscriptionId,
+ eventAppeared,
+ subscriptionDropped,
+ log,
+ checkpointReached,
+ cancellationToken
+ );
}
- private StreamSubscription(EventStoreClient.StreamSubscriptionResult subscription,
+ private StreamSubscription(
+ EventStoreClient.StreamSubscriptionResult subscription,
IAsyncEnumerator messages, string subscriptionId,
Func eventAppeared,
Action? subscriptionDropped,
ILogger log,
Func? checkpointReached,
- CancellationToken cancellationToken = default) {
- _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
- _subscription = subscription;
- _messages = messages;
- _eventAppeared = eventAppeared;
- _checkpointReached = checkpointReached ?? ((_, _, _) => Task.CompletedTask);
- _subscriptionDropped = subscriptionDropped;
- _log = log;
+ CancellationToken cancellationToken = default
+ ) {
+ _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+ _subscription = subscription;
+ _messages = messages;
+ _eventAppeared = eventAppeared;
+ _checkpointReached = checkpointReached ?? ((_, _, _) => Task.CompletedTask);
+ _subscriptionDropped = subscriptionDropped;
+ _log = log;
_subscriptionDroppedInvoked = 0;
- SubscriptionId = subscriptionId;
+ SubscriptionId = subscriptionId;
_log.LogDebug("Subscription {subscriptionId} confirmed.", SubscriptionId);
@@ -77,11 +89,14 @@ private async Task Subscribe() {
resolvedEvent.OriginalEvent.EventNumber,
resolvedEvent.OriginalEvent.Position
);
+
await _eventAppeared(this, resolvedEvent, _cts.Token).ConfigureAwait(false);
break;
+
case StreamMessage.AllStreamCheckpointReached (var position):
await _checkpointReached(this, position, _cts.Token)
.ConfigureAwait(false);
+
break;
}
} catch (Exception ex) when
@@ -105,6 +120,7 @@ await _checkpointReached(this, position, _cts.Token)
"Subscription {subscriptionId} was dropped because the subscriber made an error.",
SubscriptionId
);
+
SubscriptionDropped(SubscriptionDroppedReason.SubscriberError, ex);
return;
@@ -114,13 +130,18 @@ await _checkpointReached(this, position, _cts.Token)
ex.Status.Detail.Contains("Call canceled by the client.")) {
_log.LogInformation(
"Subscription {subscriptionId} was dropped because cancellation was requested by the client.",
- SubscriptionId);
+ SubscriptionId
+ );
+
SubscriptionDropped(SubscriptionDroppedReason.Disposed, ex);
} catch (Exception ex) {
if (_subscriptionDroppedInvoked == 0) {
- _log.LogError(ex,
+ _log.LogError(
+ ex,
"Subscription {subscriptionId} was dropped because an error occurred on the server.",
- SubscriptionId);
+ SubscriptionId
+ );
+
SubscriptionDropped(SubscriptionDroppedReason.ServerError, ex);
}
}
diff --git a/src/EventStore.Client/Diagnostics/ActivityExtensions.cs b/src/EventStore.Client/Diagnostics/ActivityExtensions.cs
new file mode 100644
index 000000000..564aa98fe
--- /dev/null
+++ b/src/EventStore.Client/Diagnostics/ActivityExtensions.cs
@@ -0,0 +1,45 @@
+using System.Diagnostics;
+using System.Runtime.CompilerServices;
+using EventStore.Client.Diagnostics.Telemetry;
+using EventStore.Client.Diagnostics.Tracing;
+
+namespace EventStore.Client.Diagnostics;
+
+static class ActivityExtensions {
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static TracingMetadata GetTracingMetadata(this Activity activity)
+ => new(activity.TraceId.ToString(), activity.SpanId.ToString());
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static void SetActivityStatus(this Activity activity, ActivityStatus activityStatus) {
+ var (activityStatusCode, description, exception) = activityStatus;
+
+ var statusCode = activityStatusCode switch {
+ ActivityStatusCode.Error => "ERROR",
+ ActivityStatusCode.Ok => "OK",
+ _ => "UNSET"
+ };
+
+ activity.SetStatus(activityStatus.StatusCode, description);
+ activity.SetTag(TelemetryAttributes.OtelStatusCode, statusCode);
+ activity.SetTag(TelemetryAttributes.OtelStatusDescription, description);
+
+ if (exception != null)
+ activity.SetException(exception);
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ static void SetException(this Activity activity, Exception exception) {
+ var tags = new ActivityTagsCollection {
+ { TelemetryAttributes.ExceptionType, exception.GetType().Name },
+ { TelemetryAttributes.ExceptionMessage, $"{exception.Message} {exception.InnerException?.Message}" },
+ { TelemetryAttributes.ExceptionStacktrace, exception.StackTrace }
+ };
+
+ foreach (var tag in tags) {
+ activity.SetTag(tag.Key, tag.Value);
+ }
+
+ activity.AddEvent(new ActivityEvent(TelemetryAttributes.ExceptionEventName, DateTimeOffset.Now, tags));
+ }
+}
diff --git a/src/EventStore.Client/Diagnostics/ActivityStatus.cs b/src/EventStore.Client/Diagnostics/ActivityStatus.cs
new file mode 100644
index 000000000..9470f7b59
--- /dev/null
+++ b/src/EventStore.Client/Diagnostics/ActivityStatus.cs
@@ -0,0 +1,11 @@
+using System.Diagnostics;
+
+namespace EventStore.Client.Diagnostics;
+
+record ActivityStatus(ActivityStatusCode StatusCode, string? Description, Exception? Exception) {
+ public static ActivityStatus Ok(string? description = null)
+ => new(ActivityStatusCode.Ok, description, null);
+
+ public static ActivityStatus Error(Exception ex, string? description = null)
+ => new(ActivityStatusCode.Error, description, ex);
+}
diff --git a/src/EventStore.Client/Diagnostics/ActivityTagsCollectionExtensions.cs b/src/EventStore.Client/Diagnostics/ActivityTagsCollectionExtensions.cs
new file mode 100644
index 000000000..72bb05a47
--- /dev/null
+++ b/src/EventStore.Client/Diagnostics/ActivityTagsCollectionExtensions.cs
@@ -0,0 +1,66 @@
+using System.Diagnostics;
+using System.Net;
+using System.Runtime.CompilerServices;
+using EventStore.Client.Diagnostics.Telemetry;
+
+namespace EventStore.Client.Diagnostics;
+
+static class ActivityTagsCollectionExtensions {
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static ActivityTagsCollection WithTagsFrom(
+ this ActivityTagsCollection tags, ChannelInfo? channelInfo, EventStoreClientSettings settings
+ ) {
+ ActivityTagsCollection? serverTags = null;
+
+ // Ensure consistent server.address attribute when connecting to multiple nodes via dns discovery
+ if (settings.ConnectivitySettings.GossipSeeds.Length == 1) {
+ var gossipSeed = settings.ConnectivitySettings.GossipSeeds[0];
+ serverTags = CreateServerAttributes(gossipSeed.GetHost(), gossipSeed.GetPort());
+ } else if (channelInfo != null) {
+ var authorityParts = channelInfo.Channel.Target.Split(':');
+ serverTags = CreateServerAttributes(authorityParts[0], int.Parse(authorityParts[1]));
+ }
+
+ return tags.WithTags(serverTags).WithTagsFrom(settings.DefaultCredentials);
+
+ ActivityTagsCollection CreateServerAttributes(string? host, int? port) => new() {
+ { TelemetryAttributes.ServerAddress, host },
+ { TelemetryAttributes.ServerPort, port }
+ };
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static ActivityTagsCollection WithTagsFrom(
+ this ActivityTagsCollection tags, UserCredentials? userCredentials
+ ) {
+ return tags.WithTag(TelemetryAttributes.DatabaseUser, userCredentials?.Username);
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ internal static ActivityTagsCollection WithTags(this ActivityTagsCollection current, ActivityTagsCollection? tags)
+ => tags == null
+ ? current
+ : tags.Aggregate(current, (newTags, tag) => newTags.WithTag(tag.Key, tag.Value));
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ static ActivityTagsCollection WithTag(this ActivityTagsCollection tags, string key, object? value) {
+ tags[key] = value;
+ return tags;
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ static string? GetHost(this EndPoint endpoint) =>
+ endpoint switch {
+ IPEndPoint ip => ip.Address.ToString(),
+ DnsEndPoint dns => dns.Host,
+ _ => null
+ };
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ static int? GetPort(this EndPoint endpoint) =>
+ endpoint switch {
+ IPEndPoint ip => ip.Port,
+ DnsEndPoint dns => dns.Port,
+ _ => null
+ };
+}
diff --git a/src/EventStore.Client/Diagnostics/EventMetadataExtensions.cs b/src/EventStore.Client/Diagnostics/EventMetadataExtensions.cs
new file mode 100644
index 000000000..b6a7eb280
--- /dev/null
+++ b/src/EventStore.Client/Diagnostics/EventMetadataExtensions.cs
@@ -0,0 +1,83 @@
+using System.Diagnostics;
+using System.Runtime.CompilerServices;
+using System.Text.Json;
+using EventStore.Client.Diagnostics.Tracing;
+
+namespace EventStore.Client.Diagnostics;
+
+static class EventMetadataExtensions {
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static ReadOnlySpan InjectTracingMetadata(
+ this ReadOnlyMemory rawCustomMetadata
+ ) {
+ if (Activity.Current == null) return rawCustomMetadata.Span;
+
+ try {
+ using var customMetadataJson = JsonDocument.Parse(rawCustomMetadata);
+ var tracingMetadata = Activity.Current.GetTracingMetadata();
+
+ using var stream = new MemoryStream();
+ using var writer = new Utf8JsonWriter(stream);
+
+ writer.WriteStartObject();
+
+ foreach (var prop in customMetadataJson.RootElement.EnumerateObject())
+ prop.WriteTo(writer);
+
+ writer.WriteIfNotNull(TracingConstants.Metadata.TraceId, tracingMetadata.TraceId)
+ .WriteIfNotNull(TracingConstants.Metadata.SpanId, tracingMetadata.SpanId);
+
+ writer.WriteEndObject();
+ writer.Flush();
+
+ return stream.ToArray();
+ } catch (Exception) {
+ return rawCustomMetadata.Span;
+ }
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static ActivityContext? RestoreTracingContext(this ReadOnlyMemory rawCustomMetadata) {
+ var (traceId, spanId) = rawCustomMetadata.ExtractTracingMetadata();
+
+ if (traceId == null || spanId == null)
+ return default;
+
+ try {
+ return new(
+ ActivityTraceId.CreateFromString(traceId.ToCharArray()),
+ ActivitySpanId.CreateFromString(spanId.ToCharArray()),
+ ActivityTraceFlags.Recorded,
+ isRemote: true
+ );
+ } catch (Exception) {
+ return default;
+ }
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ static TracingMetadata ExtractTracingMetadata(this ReadOnlyMemory rawCustomMetadata) {
+ try {
+ using var customMetadataJson = JsonDocument.Parse(rawCustomMetadata);
+
+ return new TracingMetadata(
+ customMetadataJson.RootElement.GetProperty(TracingConstants.Metadata.TraceId).GetString(),
+ customMetadataJson.RootElement.GetProperty(TracingConstants.Metadata.SpanId).GetString()
+ );
+ } catch (Exception) {
+ return TracingMetadata.None();
+ }
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ static Utf8JsonWriter WriteIfNotNull(
+ this Utf8JsonWriter jsonWriter, string key, string? value
+ ) {
+ if (string.IsNullOrEmpty(value)) return jsonWriter;
+
+ jsonWriter.WritePropertyName(key);
+ jsonWriter.WriteStringValue(value);
+
+ return jsonWriter;
+ }
+}
diff --git a/src/EventStore.Client/Diagnostics/EventStoreClientDiagnostics.cs b/src/EventStore.Client/Diagnostics/EventStoreClientDiagnostics.cs
new file mode 100644
index 000000000..b3f13e8f0
--- /dev/null
+++ b/src/EventStore.Client/Diagnostics/EventStoreClientDiagnostics.cs
@@ -0,0 +1,45 @@
+using System.Diagnostics;
+using EventStore.Client.Diagnostics.Telemetry;
+
+namespace EventStore.Client.Diagnostics;
+
+static class EventStoreClientDiagnostics {
+ static readonly ActivitySource _activitySource =
+ new ActivitySource(EventStoreClientInstrumentation.ActivitySourceName);
+
+ static readonly ActivityTagsCollection _defaultTags = [new(TelemetryAttributes.DatabaseSystem, "eventstoredb")];
+
+ public static Activity? StartActivity(
+ string operation, ActivityTagsCollection? tags, ActivityKind activityKind = default,
+ ActivityContext? activityContext = null
+ ) {
+ var activity = _activitySource.CreateActivity(
+ operation,
+ activityKind,
+ parentContext: activityContext ?? default,
+ new ActivityTagsCollection {
+ { TelemetryAttributes.DatabaseOperation, operation }
+ }
+ .WithTags(_defaultTags)
+ .WithTags(tags),
+ idFormat: ActivityIdFormat.W3C
+ );
+
+ return activity?.Start();
+ }
+
+ public static async ValueTask Trace(
+ Func> tracedOperation, string operationName, ActivityTagsCollection? tags = null
+ ) {
+ using var activity = StartActivity(operationName, tags, ActivityKind.Client);
+
+ try {
+ var res = await tracedOperation().ConfigureAwait(false);
+ activity?.SetActivityStatus(ActivityStatus.Ok());
+ return res;
+ } catch (Exception ex) {
+ activity?.SetActivityStatus(ActivityStatus.Error(ex));
+ throw;
+ }
+ }
+}
diff --git a/src/EventStore.Client/Diagnostics/EventStoreClientInstrumentation.cs b/src/EventStore.Client/Diagnostics/EventStoreClientInstrumentation.cs
new file mode 100644
index 000000000..7ae9b6ba1
--- /dev/null
+++ b/src/EventStore.Client/Diagnostics/EventStoreClientInstrumentation.cs
@@ -0,0 +1,5 @@
+namespace EventStore.Client.Diagnostics;
+
+static class EventStoreClientInstrumentation {
+ public const string ActivitySourceName = "eventstoredb.client";
+}
diff --git a/src/EventStore.Client/Diagnostics/Telemetry/TelemetryAttributes.cs b/src/EventStore.Client/Diagnostics/Telemetry/TelemetryAttributes.cs
new file mode 100644
index 000000000..5b63d4e36
--- /dev/null
+++ b/src/EventStore.Client/Diagnostics/Telemetry/TelemetryAttributes.cs
@@ -0,0 +1,29 @@
+namespace EventStore.Client.Diagnostics.Telemetry;
+
+// The attributes below match the specification of v1.24.0 of the Open Telemetry semantic conventions.
+// Some attributes are ignored where not required or relevant.
+// https://github.com/open-telemetry/semantic-conventions/blob/v1.24.0/docs/general/trace.md
+// https://github.com/open-telemetry/semantic-conventions/blob/v1.24.0/docs/database/database-spans.md
+// https://github.com/open-telemetry/semantic-conventions/blob/v1.24.0/docs/exceptions/exceptions-spans.md
+static class TelemetryAttributes {
+ public const string DatabaseUser = "db.user";
+ public const string DatabaseSystem = "db.system";
+ public const string DatabaseOperation = "db.operation";
+
+ public const string ServerAddress = "server.address";
+ public const string ServerPort = "server.port";
+
+ public const string ExceptionEventName = "exception";
+ public const string ExceptionType = "exception.type";
+ public const string ExceptionMessage = "exception.message";
+ public const string ExceptionStacktrace = "exception.stacktrace";
+
+ public const string OtelStatusCode = "otel.status_code";
+ public const string OtelStatusDescription = "otel.status_description";
+
+ // Custom attributes
+ public const string EventStoreStream = "db.eventstoredb.stream";
+ public const string EventStoreSubscriptionId = "db.eventstoredb.subscription.id";
+ public const string EventStoreEventId = "db.eventstoredb.event.id";
+ public const string EventStoreEventType = "db.eventstoredb.event.type";
+}
diff --git a/src/EventStore.Client/Diagnostics/Tracing/TracingConstants.cs b/src/EventStore.Client/Diagnostics/Tracing/TracingConstants.cs
new file mode 100644
index 000000000..2c2d22cf3
--- /dev/null
+++ b/src/EventStore.Client/Diagnostics/Tracing/TracingConstants.cs
@@ -0,0 +1,13 @@
+namespace EventStore.Client.Diagnostics.Tracing;
+
+static class TracingConstants {
+ public static class Metadata {
+ public const string TraceId = "$traceId";
+ public const string SpanId = "$spanId";
+ }
+
+ public static class Operations {
+ public const string Append = "streams.append";
+ public const string Subscribe = "streams.subscribe";
+ }
+}
diff --git a/src/EventStore.Client/Diagnostics/Tracing/TracingMetadata.cs b/src/EventStore.Client/Diagnostics/Tracing/TracingMetadata.cs
new file mode 100644
index 000000000..f907cc49d
--- /dev/null
+++ b/src/EventStore.Client/Diagnostics/Tracing/TracingMetadata.cs
@@ -0,0 +1,5 @@
+namespace EventStore.Client.Diagnostics.Tracing;
+
+record TracingMetadata(string? TraceId, string? SpanId) {
+ public static TracingMetadata None() => new(null, null);
+}
diff --git a/src/EventStore.Client/EventStore.Client.csproj b/src/EventStore.Client/EventStore.Client.csproj
index ddf36c6af..8c9ffaaf2 100644
--- a/src/EventStore.Client/EventStore.Client.csproj
+++ b/src/EventStore.Client/EventStore.Client.csproj
@@ -8,6 +8,7 @@
+
diff --git a/src/EventStore.Client/EventStoreClientBase.cs b/src/EventStore.Client/EventStoreClientBase.cs
index 39f579fcc..2b4a5b2f6 100644
--- a/src/EventStore.Client/EventStoreClientBase.cs
+++ b/src/EventStore.Client/EventStoreClientBase.cs
@@ -1,10 +1,7 @@
-using System;
-using System.Collections.Generic;
-using System.Threading;
-using System.Threading.Tasks;
using EventStore.Client.Interceptors;
using Grpc.Core;
using Grpc.Core.Interceptors;
+using Enum = System.Enum;
namespace EventStore.Client {
///
@@ -13,28 +10,29 @@ namespace EventStore.Client {
public abstract class EventStoreClientBase :
IDisposable, // for grpc.net we can dispose synchronously, but not for grpc.core
IAsyncDisposable {
-
- private readonly Dictionary> _exceptionMap;
- private readonly CancellationTokenSource _cts;
- private readonly ChannelCache _channelCache;
+ private readonly Dictionary> _exceptionMap;
+ private readonly CancellationTokenSource _cts;
+ private readonly ChannelCache _channelCache;
private readonly SharingProvider _channelInfoProvider;
- private readonly Lazy _httpFallback;
-
+ private readonly Lazy _httpFallback;
+
/// The name of the connection.
public string ConnectionName { get; }
-
+
/// The .
protected EventStoreClientSettings Settings { get; }
/// Constructs a new .
- protected EventStoreClientBase(EventStoreClientSettings? settings,
- Dictionary> exceptionMap) {
- Settings = settings ?? new EventStoreClientSettings();
+ protected EventStoreClientBase(
+ EventStoreClientSettings? settings,
+ Dictionary> exceptionMap
+ ) {
+ Settings = settings ?? new EventStoreClientSettings();
_exceptionMap = exceptionMap;
- _cts = new CancellationTokenSource();
+ _cts = new CancellationTokenSource();
_channelCache = new(Settings);
_httpFallback = new Lazy(() => new HttpFallback(Settings));
-
+
ConnectionName = Settings.ConnectionName ?? $"ES-{Guid.NewGuid()}";
var channelSelector = new ChannelSelector(Settings, _channelCache);
@@ -43,17 +41,18 @@ protected EventStoreClientBase(EventStoreClientSettings? settings,
GetChannelInfoExpensive(endPoint, onBroken, channelSelector, _cts.Token),
factoryRetryDelay: Settings.ConnectivitySettings.DiscoveryInterval,
initialInput: ReconnectionRequired.Rediscover.Instance,
- loggerFactory: Settings.LoggerFactory);
+ loggerFactory: Settings.LoggerFactory
+ );
}
-
+
// Select a channel and query its capabilities. This is an expensive call that
// we don't want to do often.
private async Task GetChannelInfoExpensive(
ReconnectionRequired reconnectionRequired,
Action onReconnectionRequired,
IChannelSelector channelSelector,
- CancellationToken cancellationToken) {
-
+ CancellationToken cancellationToken
+ ) {
var channel = reconnectionRequired switch {
ReconnectionRequired.Rediscover => await channelSelector.SelectChannelAsync(cancellationToken)
.ConfigureAwait(false),
@@ -78,11 +77,10 @@ private async Task GetChannelInfoExpensive(
return new(channel, caps, invoker);
}
-
+
/// Gets the current channel info.
protected async ValueTask GetChannelInfo(CancellationToken cancellationToken) =>
await _channelInfoProvider.CurrentAsync.WithCancellation(cancellationToken).ConfigureAwait(false);
-
///
/// Only exists so that we can manually trigger rediscovery in the tests
@@ -95,20 +93,30 @@ internal Task RediscoverAsync() {
}
/// Returns the result of an HTTP Get request based on the client settings.
- protected async Task HttpGet(string path, Action onNotFound, ChannelInfo channelInfo,
- TimeSpan? deadline, UserCredentials? userCredentials, CancellationToken cancellationToken) {
-
+ protected async Task HttpGet(
+ string path, Action onNotFound, ChannelInfo channelInfo,
+ TimeSpan? deadline, UserCredentials? userCredentials, CancellationToken cancellationToken
+ ) {
return await _httpFallback.Value
.HttpGetAsync(path, channelInfo, deadline, userCredentials, onNotFound, cancellationToken)
.ConfigureAwait(false);
}
/// Executes an HTTP Post request based on the client settings.
- protected async Task HttpPost(string path, string query, Action onNotFound, ChannelInfo channelInfo,
- TimeSpan? deadline, UserCredentials? userCredentials, CancellationToken cancellationToken) {
-
+ protected async Task HttpPost(
+ string path, string query, Action onNotFound, ChannelInfo channelInfo,
+ TimeSpan? deadline, UserCredentials? userCredentials, CancellationToken cancellationToken
+ ) {
await _httpFallback.Value
- .HttpPostAsync(path, query, channelInfo, deadline, userCredentials, onNotFound, cancellationToken)
+ .HttpPostAsync(
+ path,
+ query,
+ channelInfo,
+ deadline,
+ userCredentials,
+ onNotFound,
+ cancellationToken
+ )
.ConfigureAwait(false);
}
@@ -118,7 +126,7 @@ public virtual void Dispose() {
_cts.Cancel();
_cts.Dispose();
_channelCache.Dispose();
-
+
if (_httpFallback.IsValueCreated) {
_httpFallback.Value.Dispose();
}
diff --git a/test/EventStore.Client.Streams.Tests/Append/append_to_stream.cs b/test/EventStore.Client.Streams.Tests/Append/append_to_stream.cs
index 989a6b6bf..6cd5b813d 100644
--- a/test/EventStore.Client.Streams.Tests/Append/append_to_stream.cs
+++ b/test/EventStore.Client.Streams.Tests/Append/append_to_stream.cs
@@ -5,7 +5,8 @@ namespace EventStore.Client.Streams.Tests.Append;
[Trait("Category", "Target:Stream")]
[Trait("Category", "Operation:Append")]
-public class append_to_stream(ITestOutputHelper output, EventStoreFixture fixture) : EventStoreTests(output, fixture) {
+public class append_to_stream(ITestOutputHelper output, EventStoreFixture fixture)
+ : EventStoreTests(output, fixture) {
public static IEnumerable