Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix tracing injection when event is non-JSON #317

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 94 additions & 82 deletions src/EventStore.Client.Streams/EventStoreClient.Append.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,21 @@ public async Task<IWriteResult> AppendToStreamAsync(
_log.LogDebug("Append to stream - {streamName}@{expectedRevision}.", streamName, expectedRevision);

var task = userCredentials is null && await BatchAppender.IsUsable().ConfigureAwait(false)
? BatchAppender.Append(streamName, expectedRevision, eventData, deadline, cancellationToken)
: AppendToStreamInternal(
await GetChannelInfo(cancellationToken).ConfigureAwait(false),
new AppendReq {
Options = new() {
StreamIdentifier = streamName,
Revision = expectedRevision
}
},
eventData,
options,
deadline,
userCredentials,
cancellationToken
);
? BatchAppender.Append(streamName, expectedRevision, eventData, deadline, cancellationToken)
: AppendToStreamInternal(
await GetChannelInfo(cancellationToken).ConfigureAwait(false),
new AppendReq {
Options = new() {
StreamIdentifier = streamName,
Revision = expectedRevision
}
},
eventData,
options,
deadline,
userCredentials,
cancellationToken
);

return (await task.ConfigureAwait(false)).OptionallyThrowWrongExpectedVersionException(options);
}
Expand Down Expand Up @@ -104,7 +104,7 @@ await GetChannelInfo(cancellationToken).ConfigureAwait(false),
return (await task.ConfigureAwait(false)).OptionallyThrowWrongExpectedVersionException(operationOptions);
}

ValueTask<IWriteResult> AppendToStreamInternal(
ValueTask<IWriteResult> AppendToStreamInternal(
ChannelInfo channelInfo,
AppendReq header,
IEnumerable<EventData> eventData,
Expand All @@ -113,28 +113,32 @@ ValueTask<IWriteResult> AppendToStreamInternal(
UserCredentials? userCredentials,
CancellationToken cancellationToken
) {
var tags = new ActivityTagsCollection()
.WithRequiredTag(TelemetryTags.EventStore.Stream, header.Options.StreamIdentifier.StreamName.ToStringUtf8())
.WithGrpcChannelServerTags(channelInfo)
.WithClientSettingsServerTags(Settings)
.WithOptionalTag(TelemetryTags.Database.User, userCredentials?.Username ?? Settings.DefaultCredentials?.Username);
var tags = new ActivityTagsCollection()
.WithRequiredTag(TelemetryTags.EventStore.Stream, header.Options.StreamIdentifier.StreamName.ToStringUtf8())
.WithGrpcChannelServerTags(channelInfo)
.WithClientSettingsServerTags(Settings)
.WithOptionalTag(TelemetryTags.Database.User, userCredentials?.Username ?? Settings.DefaultCredentials?.Username);

return EventStoreClientDiagnostics.ActivitySource.TraceClientOperation(Operation, TracingConstants.Operations.Append, tags);
return EventStoreClientDiagnostics.ActivitySource.TraceClientOperation(Operation, TracingConstants.Operations.Append, tags);

async ValueTask<IWriteResult> Operation() {
using var call = new StreamsClient(channelInfo.CallInvoker)
.Append(EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken));
.Append(EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken));

await call.RequestStream
.WriteAsync(header)
.ConfigureAwait(false);
.WriteAsync(header)
.ConfigureAwait(false);

foreach (var e in eventData) {
var appendReq = new AppendReq {
ProposedMessage = new() {
Id = e.EventId.ToDto(),
Data = ByteString.CopyFrom(e.Data.Span),
CustomMetadata = ByteString.CopyFrom(e.Metadata.InjectTracingContext(Activity.Current)),
Id = e.EventId.ToDto(),
Data = ByteString.CopyFrom(e.Data.Span),
CustomMetadata = ByteString.CopyFrom(
e.ContentType == Constants.Metadata.ContentTypes.ApplicationJson
? e.Metadata.InjectTracingContext(Activity.Current)
: e.Metadata.Span
),
Metadata = {
{ Constants.Metadata.Type, e.Type },
{ Constants.Metadata.ContentType, e.ContentType }
Expand All @@ -159,7 +163,7 @@ await call.RequestStream
}
}

IWriteResult HandleSuccessAppend(AppendResp response, AppendReq header) {
IWriteResult HandleSuccessAppend(AppendResp response, AppendReq header) {
var currentRevision = response.Success.CurrentRevisionOptionCase == AppendResp.Types.Success.CurrentRevisionOptionOneofCase.NoStream
? StreamRevision.None
: new StreamRevision(response.Success.CurrentRevision);
Expand All @@ -178,13 +182,13 @@ IWriteResult HandleSuccessAppend(AppendResp response, AppendReq header) {
return new SuccessResult(currentRevision, position);
}

IWriteResult HandleWrongExpectedRevision(
IWriteResult HandleWrongExpectedRevision(
AppendResp response, AppendReq header, EventStoreClientOperationOptions operationOptions
) {
var actualStreamRevision = response.WrongExpectedVersion.CurrentRevisionOptionCase == CurrentRevisionOptionOneofCase.CurrentRevision
? new StreamRevision(response.WrongExpectedVersion.CurrentRevision)
: StreamRevision.None;
var actualStreamRevision = response.WrongExpectedVersion.CurrentRevisionOptionCase == CurrentRevisionOptionOneofCase.CurrentRevision
? new StreamRevision(response.WrongExpectedVersion.CurrentRevision)
: StreamRevision.None;

_log.LogDebug(
"Append to stream failed with Wrong Expected Version - {streamName}/{expectedRevision}/{currentRevision}",
header.Options.StreamIdentifier,
Expand All @@ -201,12 +205,12 @@ IWriteResult HandleWrongExpectedRevision(
);
}

var expectedStreamState = response.WrongExpectedVersion.ExpectedRevisionOptionCase switch {
ExpectedRevisionOptionOneofCase.ExpectedAny => StreamState.Any,
ExpectedRevisionOptionOneofCase.ExpectedNoStream => StreamState.NoStream,
ExpectedRevisionOptionOneofCase.ExpectedStreamExists => StreamState.StreamExists,
_ => StreamState.Any
};
var expectedStreamState = response.WrongExpectedVersion.ExpectedRevisionOptionCase switch {
ExpectedRevisionOptionOneofCase.ExpectedAny => StreamState.Any,
ExpectedRevisionOptionOneofCase.ExpectedNoStream => StreamState.NoStream,
ExpectedRevisionOptionOneofCase.ExpectedStreamExists => StreamState.StreamExists,
_ => StreamState.Any
};

throw new WrongExpectedVersionException(
header.Options.StreamIdentifier!,
Expand All @@ -226,16 +230,16 @@ IWriteResult HandleWrongExpectedRevision(
);
}

class StreamAppender : IDisposable {
readonly EventStoreClientSettings _settings;
readonly CancellationToken _cancellationToken;
readonly Action<Exception> _onException;
readonly Channel<BatchAppendReq> _channel;
readonly ConcurrentDictionary<Uuid, TaskCompletionSource<IWriteResult>> _pendingRequests;
readonly TaskCompletionSource<bool> _isUsable;
class StreamAppender : IDisposable {
readonly EventStoreClientSettings _settings;
readonly CancellationToken _cancellationToken;
readonly Action<Exception> _onException;
readonly Channel<BatchAppendReq> _channel;
readonly ConcurrentDictionary<Uuid, TaskCompletionSource<IWriteResult>> _pendingRequests;
readonly TaskCompletionSource<bool> _isUsable;

ChannelInfo? _channelInfo;
AsyncDuplexStreamingCall<BatchAppendReq, BatchAppendResp>? _call;
ChannelInfo? _channelInfo;
AsyncDuplexStreamingCall<BatchAppendReq, BatchAppendResp>? _call;

public StreamAppender(
EventStoreClientSettings settings,
Expand All @@ -255,8 +259,8 @@ Action<Exception> onException

public ValueTask<IWriteResult> Append(
string streamName, StreamRevision expectedStreamPosition,
IEnumerable<EventData> events, TimeSpan? timeoutAfter,
CancellationToken cancellationToken = default
IEnumerable<EventData> events, TimeSpan? timeoutAfter,
CancellationToken cancellationToken = default
) =>
AppendInternal(
BatchAppendReq.Types.Options.Create(streamName, expectedStreamPosition, timeoutAfter),
Expand All @@ -266,8 +270,8 @@ public ValueTask<IWriteResult> Append(

public ValueTask<IWriteResult> Append(
string streamName, StreamState expectedStreamState,
IEnumerable<EventData> events, TimeSpan? timeoutAfter,
CancellationToken cancellationToken = default
IEnumerable<EventData> events, TimeSpan? timeoutAfter,
CancellationToken cancellationToken = default
) =>
AppendInternal(
BatchAppendReq.Types.Options.Create(streamName, expectedStreamState, timeoutAfter),
Expand All @@ -277,21 +281,21 @@ public ValueTask<IWriteResult> Append(

public Task<bool> IsUsable() => _isUsable.Task;

ValueTask<IWriteResult> AppendInternal(
ValueTask<IWriteResult> AppendInternal(
BatchAppendReq.Types.Options options,
IEnumerable<EventData> events,
CancellationToken cancellationToken
IEnumerable<EventData> events,
CancellationToken cancellationToken
) {
var tags = new ActivityTagsCollection()
.WithRequiredTag(TelemetryTags.EventStore.Stream, options.StreamIdentifier.StreamName.ToStringUtf8())
.WithGrpcChannelServerTags(_channelInfo)
.WithClientSettingsServerTags(_settings)
.WithOptionalTag(TelemetryTags.Database.User, _settings.DefaultCredentials?.Username);
var tags = new ActivityTagsCollection()
.WithRequiredTag(TelemetryTags.EventStore.Stream, options.StreamIdentifier.StreamName.ToStringUtf8())
.WithGrpcChannelServerTags(_channelInfo)
.WithClientSettingsServerTags(_settings)
.WithOptionalTag(TelemetryTags.Database.User, _settings.DefaultCredentials?.Username);

return EventStoreClientDiagnostics.ActivitySource.TraceClientOperation(
Operation,
TracingConstants.Operations.Append,
tags
tags
);

async ValueTask<IWriteResult> Operation() {
Expand All @@ -300,9 +304,10 @@ async ValueTask<IWriteResult> Operation() {
var complete = _pendingRequests.GetOrAdd(correlationId, new TaskCompletionSource<IWriteResult>());

try {
foreach (var appendRequest in GetRequests(events, options, correlationId))
await _channel.Writer.WriteAsync(appendRequest, cancellationToken).ConfigureAwait(false);
} catch (ChannelClosedException ex) {
foreach (var appendRequest in GetRequests(events, options, correlationId))
await _channel.Writer.WriteAsync(appendRequest, cancellationToken).ConfigureAwait(false);
}
catch (ChannelClosedException ex) {
// channel is closed, our tcs won't necessarily get completed, don't wait for it.
throw ex.InnerException ?? ex;
}
Expand All @@ -311,7 +316,7 @@ async ValueTask<IWriteResult> Operation() {
}
}

async Task Duplex(ValueTask<ChannelInfo> channelInfoTask) {
async Task Duplex(ValueTask<ChannelInfo> channelInfoTask) {
try {
_channelInfo = await channelInfoTask.ConfigureAwait(false);
if (!_channelInfo.ServerCapabilities.SupportsBatchAppend) {
Expand All @@ -332,7 +337,8 @@ async Task Duplex(ValueTask<ChannelInfo> channelInfoTask) {
_ = Task.Run(Receive, _cancellationToken);

_isUsable.TrySetResult(true);
} catch (Exception ex) {
}
catch (Exception ex) {
_isUsable.TrySetException(ex);
_onException(ex);
}
Expand All @@ -342,8 +348,8 @@ async Task Duplex(ValueTask<ChannelInfo> channelInfoTask) {
async Task Send() {
if (_call is null) return;

await foreach (var appendRequest in _channel.Reader.ReadAllAsync(_cancellationToken).ConfigureAwait(false))
await _call.RequestStream.WriteAsync(appendRequest).ConfigureAwait(false);
await foreach (var appendRequest in _channel.Reader.ReadAllAsync(_cancellationToken).ConfigureAwait(false))
await _call.RequestStream.WriteAsync(appendRequest).ConfigureAwait(false);

await _call.RequestStream.CompleteAsync().ConfigureAwait(false);
}
Expand All @@ -359,11 +365,13 @@ async Task Receive() {

try {
writeResult.TrySetResult(response.ToWriteResult());
} catch (Exception ex) {
}
catch (Exception ex) {
writeResult.TrySetException(ex);
}
}
} catch (Exception ex) {
}
catch (Exception ex) {
// signal that no tcs added to _pendingRequests after this point will necessarily complete
_channel.Writer.TryComplete(ex);

Expand All @@ -376,17 +384,21 @@ async Task Receive() {
}
}

IEnumerable<BatchAppendReq> GetRequests(IEnumerable<EventData> events, BatchAppendReq.Types.Options options, Uuid correlationId) {
var batchSize = 0;
var first = true;
var correlationIdDto = correlationId.ToDto();
var proposedMessages = new List<BatchAppendReq.Types.ProposedMessage>();
IEnumerable<BatchAppendReq> GetRequests(IEnumerable<EventData> events, BatchAppendReq.Types.Options options, Uuid correlationId) {
var batchSize = 0;
var first = true;
var correlationIdDto = correlationId.ToDto();
var proposedMessages = new List<BatchAppendReq.Types.ProposedMessage>();

foreach (var eventData in events) {
var proposedMessage = new BatchAppendReq.Types.ProposedMessage {
Data = ByteString.CopyFrom(eventData.Data.Span),
CustomMetadata = ByteString.CopyFrom(eventData.Metadata.InjectTracingContext(Activity.Current)),
Id = eventData.EventId.ToDto(),
Data = ByteString.CopyFrom(eventData.Data.Span),
CustomMetadata = ByteString.CopyFrom(
eventData.ContentType == Constants.Metadata.ContentTypes.ApplicationJson
? eventData.Metadata.InjectTracingContext(Activity.Current)
: eventData.Metadata.Span
),
Id = eventData.EventId.ToDto(),
Metadata = {
{ Constants.Metadata.Type, eventData.Type },
{ Constants.Metadata.ContentType, eventData.ContentType }
Expand All @@ -396,7 +408,7 @@ IEnumerable<BatchAppendReq> GetRequests(IEnumerable<EventData> events, BatchAppe
proposedMessages.Add(proposedMessage);

if ((batchSize += proposedMessage.CalculateSize()) < _settings.OperationOptions.BatchAppendSize)
continue;
continue;

yield return new BatchAppendReq {
ProposedMessages = { proposedMessages },
Expand All @@ -423,4 +435,4 @@ public void Dispose() {
}
}
}
}
}
Loading
Loading