From 02b4ef98f369456d2e6ccd6435400e52481b17fb Mon Sep 17 00:00:00 2001 From: Joseph Cummings Date: Mon, 5 Aug 2024 12:36:44 +0100 Subject: [PATCH] Fix tracing injection when event is non-JSON --- .../EventStoreClient.Append.cs | 176 ++++++++++-------- .../Diagnostics/ActivitySourceExtensions.cs | 132 ++++++------- .../Diagnostics/EventMetadataExtensions.cs | 19 +- .../EventStore.Client.csproj | 2 +- ...ubscriptionsTracingInstrumentationTests.cs | 81 ++++++++ .../StreamsTracingInstrumentationTests.cs | 28 ++- .../Fixtures/EventStoreFixture.Helpers.cs | 9 +- 7 files changed, 287 insertions(+), 160 deletions(-) diff --git a/src/EventStore.Client.Streams/EventStoreClient.Append.cs b/src/EventStore.Client.Streams/EventStoreClient.Append.cs index cc82e57a0..ec719c4fc 100644 --- a/src/EventStore.Client.Streams/EventStoreClient.Append.cs +++ b/src/EventStore.Client.Streams/EventStoreClient.Append.cs @@ -40,21 +40,21 @@ public async Task AppendToStreamAsync( _log.LogDebug("Append to stream - {streamName}@{expectedRevision}.", streamName, expectedRevision); var task = userCredentials is null && await BatchAppender.IsUsable().ConfigureAwait(false) - ? BatchAppender.Append(streamName, expectedRevision, eventData, deadline, cancellationToken) - : AppendToStreamInternal( - await GetChannelInfo(cancellationToken).ConfigureAwait(false), - new AppendReq { - Options = new() { - StreamIdentifier = streamName, - Revision = expectedRevision - } - }, - eventData, - options, - deadline, - userCredentials, - cancellationToken - ); + ? BatchAppender.Append(streamName, expectedRevision, eventData, deadline, cancellationToken) + : AppendToStreamInternal( + await GetChannelInfo(cancellationToken).ConfigureAwait(false), + new AppendReq { + Options = new() { + StreamIdentifier = streamName, + Revision = expectedRevision + } + }, + eventData, + options, + deadline, + userCredentials, + cancellationToken + ); return (await task.ConfigureAwait(false)).OptionallyThrowWrongExpectedVersionException(options); } @@ -104,7 +104,7 @@ await GetChannelInfo(cancellationToken).ConfigureAwait(false), return (await task.ConfigureAwait(false)).OptionallyThrowWrongExpectedVersionException(operationOptions); } - ValueTask AppendToStreamInternal( + ValueTask AppendToStreamInternal( ChannelInfo channelInfo, AppendReq header, IEnumerable eventData, @@ -113,28 +113,32 @@ ValueTask AppendToStreamInternal( UserCredentials? userCredentials, CancellationToken cancellationToken ) { - var tags = new ActivityTagsCollection() - .WithRequiredTag(TelemetryTags.EventStore.Stream, header.Options.StreamIdentifier.StreamName.ToStringUtf8()) - .WithGrpcChannelServerTags(channelInfo) - .WithClientSettingsServerTags(Settings) - .WithOptionalTag(TelemetryTags.Database.User, userCredentials?.Username ?? Settings.DefaultCredentials?.Username); + var tags = new ActivityTagsCollection() + .WithRequiredTag(TelemetryTags.EventStore.Stream, header.Options.StreamIdentifier.StreamName.ToStringUtf8()) + .WithGrpcChannelServerTags(channelInfo) + .WithClientSettingsServerTags(Settings) + .WithOptionalTag(TelemetryTags.Database.User, userCredentials?.Username ?? Settings.DefaultCredentials?.Username); - return EventStoreClientDiagnostics.ActivitySource.TraceClientOperation(Operation, TracingConstants.Operations.Append, tags); + return EventStoreClientDiagnostics.ActivitySource.TraceClientOperation(Operation, TracingConstants.Operations.Append, tags); async ValueTask Operation() { using var call = new StreamsClient(channelInfo.CallInvoker) - .Append(EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken)); + .Append(EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken)); await call.RequestStream - .WriteAsync(header) - .ConfigureAwait(false); + .WriteAsync(header) + .ConfigureAwait(false); foreach (var e in eventData) { var appendReq = new AppendReq { ProposedMessage = new() { - Id = e.EventId.ToDto(), - Data = ByteString.CopyFrom(e.Data.Span), - CustomMetadata = ByteString.CopyFrom(e.Metadata.InjectTracingContext(Activity.Current)), + Id = e.EventId.ToDto(), + Data = ByteString.CopyFrom(e.Data.Span), + CustomMetadata = ByteString.CopyFrom( + e.ContentType == Constants.Metadata.ContentTypes.ApplicationJson + ? e.Metadata.InjectTracingContext(Activity.Current) + : e.Metadata.Span + ), Metadata = { { Constants.Metadata.Type, e.Type }, { Constants.Metadata.ContentType, e.ContentType } @@ -159,7 +163,7 @@ await call.RequestStream } } - IWriteResult HandleSuccessAppend(AppendResp response, AppendReq header) { + IWriteResult HandleSuccessAppend(AppendResp response, AppendReq header) { var currentRevision = response.Success.CurrentRevisionOptionCase == AppendResp.Types.Success.CurrentRevisionOptionOneofCase.NoStream ? StreamRevision.None : new StreamRevision(response.Success.CurrentRevision); @@ -178,13 +182,13 @@ IWriteResult HandleSuccessAppend(AppendResp response, AppendReq header) { return new SuccessResult(currentRevision, position); } - IWriteResult HandleWrongExpectedRevision( + IWriteResult HandleWrongExpectedRevision( AppendResp response, AppendReq header, EventStoreClientOperationOptions operationOptions ) { - var actualStreamRevision = response.WrongExpectedVersion.CurrentRevisionOptionCase == CurrentRevisionOptionOneofCase.CurrentRevision - ? new StreamRevision(response.WrongExpectedVersion.CurrentRevision) - : StreamRevision.None; - + var actualStreamRevision = response.WrongExpectedVersion.CurrentRevisionOptionCase == CurrentRevisionOptionOneofCase.CurrentRevision + ? new StreamRevision(response.WrongExpectedVersion.CurrentRevision) + : StreamRevision.None; + _log.LogDebug( "Append to stream failed with Wrong Expected Version - {streamName}/{expectedRevision}/{currentRevision}", header.Options.StreamIdentifier, @@ -201,12 +205,12 @@ IWriteResult HandleWrongExpectedRevision( ); } - var expectedStreamState = response.WrongExpectedVersion.ExpectedRevisionOptionCase switch { - ExpectedRevisionOptionOneofCase.ExpectedAny => StreamState.Any, - ExpectedRevisionOptionOneofCase.ExpectedNoStream => StreamState.NoStream, - ExpectedRevisionOptionOneofCase.ExpectedStreamExists => StreamState.StreamExists, - _ => StreamState.Any - }; + var expectedStreamState = response.WrongExpectedVersion.ExpectedRevisionOptionCase switch { + ExpectedRevisionOptionOneofCase.ExpectedAny => StreamState.Any, + ExpectedRevisionOptionOneofCase.ExpectedNoStream => StreamState.NoStream, + ExpectedRevisionOptionOneofCase.ExpectedStreamExists => StreamState.StreamExists, + _ => StreamState.Any + }; throw new WrongExpectedVersionException( header.Options.StreamIdentifier!, @@ -226,16 +230,16 @@ IWriteResult HandleWrongExpectedRevision( ); } - class StreamAppender : IDisposable { - readonly EventStoreClientSettings _settings; - readonly CancellationToken _cancellationToken; - readonly Action _onException; - readonly Channel _channel; - readonly ConcurrentDictionary> _pendingRequests; - readonly TaskCompletionSource _isUsable; + class StreamAppender : IDisposable { + readonly EventStoreClientSettings _settings; + readonly CancellationToken _cancellationToken; + readonly Action _onException; + readonly Channel _channel; + readonly ConcurrentDictionary> _pendingRequests; + readonly TaskCompletionSource _isUsable; - ChannelInfo? _channelInfo; - AsyncDuplexStreamingCall? _call; + ChannelInfo? _channelInfo; + AsyncDuplexStreamingCall? _call; public StreamAppender( EventStoreClientSettings settings, @@ -255,8 +259,8 @@ Action onException public ValueTask Append( string streamName, StreamRevision expectedStreamPosition, - IEnumerable events, TimeSpan? timeoutAfter, - CancellationToken cancellationToken = default + IEnumerable events, TimeSpan? timeoutAfter, + CancellationToken cancellationToken = default ) => AppendInternal( BatchAppendReq.Types.Options.Create(streamName, expectedStreamPosition, timeoutAfter), @@ -266,8 +270,8 @@ public ValueTask Append( public ValueTask Append( string streamName, StreamState expectedStreamState, - IEnumerable events, TimeSpan? timeoutAfter, - CancellationToken cancellationToken = default + IEnumerable events, TimeSpan? timeoutAfter, + CancellationToken cancellationToken = default ) => AppendInternal( BatchAppendReq.Types.Options.Create(streamName, expectedStreamState, timeoutAfter), @@ -277,21 +281,21 @@ public ValueTask Append( public Task IsUsable() => _isUsable.Task; - ValueTask AppendInternal( + ValueTask AppendInternal( BatchAppendReq.Types.Options options, - IEnumerable events, - CancellationToken cancellationToken + IEnumerable events, + CancellationToken cancellationToken ) { - var tags = new ActivityTagsCollection() - .WithRequiredTag(TelemetryTags.EventStore.Stream, options.StreamIdentifier.StreamName.ToStringUtf8()) - .WithGrpcChannelServerTags(_channelInfo) - .WithClientSettingsServerTags(_settings) - .WithOptionalTag(TelemetryTags.Database.User, _settings.DefaultCredentials?.Username); - + var tags = new ActivityTagsCollection() + .WithRequiredTag(TelemetryTags.EventStore.Stream, options.StreamIdentifier.StreamName.ToStringUtf8()) + .WithGrpcChannelServerTags(_channelInfo) + .WithClientSettingsServerTags(_settings) + .WithOptionalTag(TelemetryTags.Database.User, _settings.DefaultCredentials?.Username); + return EventStoreClientDiagnostics.ActivitySource.TraceClientOperation( Operation, TracingConstants.Operations.Append, - tags + tags ); async ValueTask Operation() { @@ -300,9 +304,10 @@ async ValueTask Operation() { var complete = _pendingRequests.GetOrAdd(correlationId, new TaskCompletionSource()); try { - foreach (var appendRequest in GetRequests(events, options, correlationId)) - await _channel.Writer.WriteAsync(appendRequest, cancellationToken).ConfigureAwait(false); - } catch (ChannelClosedException ex) { + foreach (var appendRequest in GetRequests(events, options, correlationId)) + await _channel.Writer.WriteAsync(appendRequest, cancellationToken).ConfigureAwait(false); + } + catch (ChannelClosedException ex) { // channel is closed, our tcs won't necessarily get completed, don't wait for it. throw ex.InnerException ?? ex; } @@ -311,7 +316,7 @@ async ValueTask Operation() { } } - async Task Duplex(ValueTask channelInfoTask) { + async Task Duplex(ValueTask channelInfoTask) { try { _channelInfo = await channelInfoTask.ConfigureAwait(false); if (!_channelInfo.ServerCapabilities.SupportsBatchAppend) { @@ -332,7 +337,8 @@ async Task Duplex(ValueTask channelInfoTask) { _ = Task.Run(Receive, _cancellationToken); _isUsable.TrySetResult(true); - } catch (Exception ex) { + } + catch (Exception ex) { _isUsable.TrySetException(ex); _onException(ex); } @@ -342,8 +348,8 @@ async Task Duplex(ValueTask channelInfoTask) { async Task Send() { if (_call is null) return; - await foreach (var appendRequest in _channel.Reader.ReadAllAsync(_cancellationToken).ConfigureAwait(false)) - await _call.RequestStream.WriteAsync(appendRequest).ConfigureAwait(false); + await foreach (var appendRequest in _channel.Reader.ReadAllAsync(_cancellationToken).ConfigureAwait(false)) + await _call.RequestStream.WriteAsync(appendRequest).ConfigureAwait(false); await _call.RequestStream.CompleteAsync().ConfigureAwait(false); } @@ -359,11 +365,13 @@ async Task Receive() { try { writeResult.TrySetResult(response.ToWriteResult()); - } catch (Exception ex) { + } + catch (Exception ex) { writeResult.TrySetException(ex); } } - } catch (Exception ex) { + } + catch (Exception ex) { // signal that no tcs added to _pendingRequests after this point will necessarily complete _channel.Writer.TryComplete(ex); @@ -376,17 +384,21 @@ async Task Receive() { } } - IEnumerable GetRequests(IEnumerable events, BatchAppendReq.Types.Options options, Uuid correlationId) { - var batchSize = 0; - var first = true; - var correlationIdDto = correlationId.ToDto(); - var proposedMessages = new List(); + IEnumerable GetRequests(IEnumerable events, BatchAppendReq.Types.Options options, Uuid correlationId) { + var batchSize = 0; + var first = true; + var correlationIdDto = correlationId.ToDto(); + var proposedMessages = new List(); foreach (var eventData in events) { var proposedMessage = new BatchAppendReq.Types.ProposedMessage { - Data = ByteString.CopyFrom(eventData.Data.Span), - CustomMetadata = ByteString.CopyFrom(eventData.Metadata.InjectTracingContext(Activity.Current)), - Id = eventData.EventId.ToDto(), + Data = ByteString.CopyFrom(eventData.Data.Span), + CustomMetadata = ByteString.CopyFrom( + eventData.ContentType == Constants.Metadata.ContentTypes.ApplicationJson + ? eventData.Metadata.InjectTracingContext(Activity.Current) + : eventData.Metadata.Span + ), + Id = eventData.EventId.ToDto(), Metadata = { { Constants.Metadata.Type, eventData.Type }, { Constants.Metadata.ContentType, eventData.ContentType } @@ -396,7 +408,7 @@ IEnumerable GetRequests(IEnumerable events, BatchAppe proposedMessages.Add(proposedMessage); if ((batchSize += proposedMessage.CalculateSize()) < _settings.OperationOptions.BatchAppendSize) - continue; + continue; yield return new BatchAppendReq { ProposedMessages = { proposedMessages }, @@ -423,4 +435,4 @@ public void Dispose() { } } } -} +} \ No newline at end of file diff --git a/src/EventStore.Client/Common/Diagnostics/ActivitySourceExtensions.cs b/src/EventStore.Client/Common/Diagnostics/ActivitySourceExtensions.cs index 02af67eaa..ef0be085f 100644 --- a/src/EventStore.Client/Common/Diagnostics/ActivitySourceExtensions.cs +++ b/src/EventStore.Client/Common/Diagnostics/ActivitySourceExtensions.cs @@ -6,73 +6,77 @@ namespace EventStore.Client.Diagnostics; static class ActivitySourceExtensions { - public static async ValueTask TraceClientOperation(this ActivitySource source, - Func> tracedOperation, - string operationName, - ActivityTagsCollection? tags = null - ) { - using var activity = StartActivity(source, operationName, ActivityKind.Client, tags, Activity.Current?.Context); + public static async ValueTask TraceClientOperation( + this ActivitySource source, + Func> tracedOperation, + string operationName, + ActivityTagsCollection? tags = null + ) { + using var activity = StartActivity(source, operationName, ActivityKind.Client, tags, Activity.Current?.Context); - try { - var res = await tracedOperation().ConfigureAwait(false); - activity?.StatusOk(); - return res; - } - catch (Exception ex) { - activity?.StatusError(ex); - throw; - } - } + try { + var res = await tracedOperation().ConfigureAwait(false); + activity?.StatusOk(); + return res; + } + catch (Exception ex) { + activity?.StatusError(ex); + throw; + } + } - public static void TraceSubscriptionEvent( - this ActivitySource source, - string? subscriptionId, - ResolvedEvent resolvedEvent, - ChannelInfo channelInfo, - EventStoreClientSettings settings, - UserCredentials? userCredentials - ) { - if (source.HasNoActiveListeners()) - return; + public static void TraceSubscriptionEvent( + this ActivitySource source, + string? subscriptionId, + ResolvedEvent resolvedEvent, + ChannelInfo channelInfo, + EventStoreClientSettings settings, + UserCredentials? userCredentials + ) { + if (resolvedEvent.OriginalEvent.ContentType != Constants.Metadata.ContentTypes.ApplicationJson) + return; - var parentContext = resolvedEvent.OriginalEvent.Metadata.ExtractPropagationContext(); + if (source.HasNoActiveListeners()) + return; - if (parentContext is null) return; + var parentContext = resolvedEvent.OriginalEvent.Metadata.ExtractPropagationContext(); - var tags = new ActivityTagsCollection() - .WithRequiredTag(TelemetryTags.EventStore.Stream, resolvedEvent.OriginalEvent.EventStreamId) - .WithOptionalTag(TelemetryTags.EventStore.SubscriptionId, subscriptionId) - .WithRequiredTag(TelemetryTags.EventStore.EventId, resolvedEvent.OriginalEvent.EventId.ToString()) - .WithRequiredTag(TelemetryTags.EventStore.EventType, resolvedEvent.OriginalEvent.EventType) - // Ensure consistent server.address attribute when connecting to cluster via dns discovery - .WithGrpcChannelServerTags(channelInfo) - .WithClientSettingsServerTags(settings) - .WithOptionalTag(TelemetryTags.Database.User, userCredentials?.Username ?? settings.DefaultCredentials?.Username); - - StartActivity(source, TracingConstants.Operations.Subscribe, ActivityKind.Consumer, tags, parentContext)?.Dispose(); - } - - static Activity? StartActivity( - this ActivitySource source, - string operationName, ActivityKind activityKind, ActivityTagsCollection? tags = null, ActivityContext? parentContext = null - ) { - if (source.HasNoActiveListeners()) - return null; - - (tags ??= new ActivityTagsCollection()) - .WithRequiredTag(TelemetryTags.Database.System, "eventstoredb") - .WithRequiredTag(TelemetryTags.Database.Operation, operationName); - - return source - .CreateActivity( - operationName, - activityKind, - parentContext ?? default, - tags, - idFormat: ActivityIdFormat.W3C - ) - ?.Start(); - } - - static bool HasNoActiveListeners(this ActivitySource source) => !source.HasListeners(); + if (parentContext is null) return; + + var tags = new ActivityTagsCollection() + .WithRequiredTag(TelemetryTags.EventStore.Stream, resolvedEvent.OriginalEvent.EventStreamId) + .WithOptionalTag(TelemetryTags.EventStore.SubscriptionId, subscriptionId) + .WithRequiredTag(TelemetryTags.EventStore.EventId, resolvedEvent.OriginalEvent.EventId.ToString()) + .WithRequiredTag(TelemetryTags.EventStore.EventType, resolvedEvent.OriginalEvent.EventType) + // Ensure consistent server.address attribute when connecting to cluster via dns discovery + .WithGrpcChannelServerTags(channelInfo) + .WithClientSettingsServerTags(settings) + .WithOptionalTag(TelemetryTags.Database.User, userCredentials?.Username ?? settings.DefaultCredentials?.Username); + + StartActivity(source, TracingConstants.Operations.Subscribe, ActivityKind.Consumer, tags, parentContext)?.Dispose(); + } + + static Activity? StartActivity( + this ActivitySource source, + string operationName, ActivityKind activityKind, ActivityTagsCollection? tags = null, ActivityContext? parentContext = null + ) { + if (source.HasNoActiveListeners()) + return null; + + (tags ??= new ActivityTagsCollection()) + .WithRequiredTag(TelemetryTags.Database.System, "eventstoredb") + .WithRequiredTag(TelemetryTags.Database.Operation, operationName); + + return source + .CreateActivity( + operationName, + activityKind, + parentContext ?? default, + tags, + idFormat: ActivityIdFormat.W3C + ) + ?.Start(); + } + + static bool HasNoActiveListeners(this ActivitySource source) => !source.HasListeners(); } \ No newline at end of file diff --git a/src/EventStore.Client/Common/Diagnostics/EventMetadataExtensions.cs b/src/EventStore.Client/Common/Diagnostics/EventMetadataExtensions.cs index f5af53005..d97861c5a 100644 --- a/src/EventStore.Client/Common/Diagnostics/EventMetadataExtensions.cs +++ b/src/EventStore.Client/Common/Diagnostics/EventMetadataExtensions.cs @@ -21,15 +21,20 @@ public static TracingMetadata ExtractTracingMetadata(this ReadOnlyMemory e return TracingMetadata.None; var reader = new Utf8JsonReader(eventMetadata.Span); - if (!JsonDocument.TryParseValue(ref reader, out var doc)) - return TracingMetadata.None; - - using (doc) { - if (!doc.RootElement.TryGetProperty(TracingConstants.Metadata.TraceId, out var traceId) - || !doc.RootElement.TryGetProperty(TracingConstants.Metadata.SpanId, out var spanId)) + try { + if (!JsonDocument.TryParseValue(ref reader, out var doc)) return TracingMetadata.None; - return new TracingMetadata(traceId.GetString(), spanId.GetString()); + using (doc) { + if (!doc.RootElement.TryGetProperty(TracingConstants.Metadata.TraceId, out var traceId) + || !doc.RootElement.TryGetProperty(TracingConstants.Metadata.SpanId, out var spanId)) + return TracingMetadata.None; + + return new TracingMetadata(traceId.GetString(), spanId.GetString()); + } + } + catch (Exception) { + return TracingMetadata.None; } } diff --git a/src/EventStore.Client/EventStore.Client.csproj b/src/EventStore.Client/EventStore.Client.csproj index e455b3dce..61fc75243 100644 --- a/src/EventStore.Client/EventStore.Client.csproj +++ b/src/EventStore.Client/EventStore.Client.csproj @@ -29,7 +29,7 @@ - + diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/Diagnostics/PersistentSubscriptionsTracingInstrumentationTests.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/Diagnostics/PersistentSubscriptionsTracingInstrumentationTests.cs index 9a2e4ec10..904f74f70 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/Diagnostics/PersistentSubscriptionsTracingInstrumentationTests.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/Diagnostics/PersistentSubscriptionsTracingInstrumentationTests.cs @@ -71,4 +71,85 @@ async Task Subscribe() { } } } + + [Fact] + public async Task PersistentSubscriptionDoesNotThrowWhenInstrumentedWithTracingAndReceivesNonJsonEvents() { + var stream = Fixture.GetStreamName(); + var events = Fixture.CreateTestEvents( + 2, + metadata: Fixture.CreateTestJsonMetadata(), + contentType: Constants.Metadata.ContentTypes.ApplicationOctetStream + ).ToArray(); + + var groupName = $"{stream}-group"; + await Fixture.Subscriptions.CreateToStreamAsync( + stream, + groupName, + new() + ); + + await Fixture.Streams.AppendToStreamAsync( + stream, + StreamState.NoStream, + events + ); + + await Subscribe().WithTimeout(); + + return; + + async Task Subscribe() { + await using var subscription = Fixture.Subscriptions.SubscribeToStream(stream, groupName); + await using var enumerator = subscription.Messages.GetAsyncEnumerator(); + + var eventsAppeared = 0; + while (await enumerator.MoveNextAsync()) { + if (enumerator.Current is PersistentSubscriptionMessage.Event(_, _)) + eventsAppeared++; + + if (eventsAppeared >= events.Length) + return; + } + } + } + + [Fact] + public async Task PersistentSubscriptionDoesNotThrowWhenInstrumentedWithTracingAndReceivesEventsWithInvalidJsonMetadata() { + var stream = Fixture.GetStreamName(); + var events = Fixture.CreateTestEvents( + 2, + metadata: "clearlynotavalidjsonobject"u8.ToArray() + ).ToArray(); + + var groupName = $"{stream}-group"; + await Fixture.Subscriptions.CreateToStreamAsync( + stream, + groupName, + new() + ); + + await Fixture.Streams.AppendToStreamAsync( + stream, + StreamState.NoStream, + events + ); + + await Subscribe().WithTimeout(); + + return; + + async Task Subscribe() { + await using var subscription = Fixture.Subscriptions.SubscribeToStream(stream, groupName); + await using var enumerator = subscription.Messages.GetAsyncEnumerator(); + + var eventsAppeared = 0; + while (await enumerator.MoveNextAsync()) { + if (enumerator.Current is PersistentSubscriptionMessage.Event(_, _)) + eventsAppeared++; + + if (eventsAppeared >= events.Length) + return; + } + } + } } \ No newline at end of file diff --git a/test/EventStore.Client.Streams.Tests/Diagnostics/StreamsTracingInstrumentationTests.cs b/test/EventStore.Client.Streams.Tests/Diagnostics/StreamsTracingInstrumentationTests.cs index bd60c279c..b208e4abe 100644 --- a/test/EventStore.Client.Streams.Tests/Diagnostics/StreamsTracingInstrumentationTests.cs +++ b/test/EventStore.Client.Streams.Tests/Diagnostics/StreamsTracingInstrumentationTests.cs @@ -1,3 +1,4 @@ +using System.Text.Json; using EventStore.Client.Diagnostics; using EventStore.Diagnostics.Tracing; @@ -129,8 +130,8 @@ await Fixture.Streams.AppendToStreamAsync( [Fact] public async Task TracingContextIsNotInjectedWhenUserMetadataIsNotValidJsonObject() { - var stream = Fixture.GetStreamName(); - + var stream = Fixture.GetStreamName(); + var inputMetadata = "clearlynotavalidjsonobject"u8.ToArray(); await Fixture.Streams.AppendToStreamAsync( stream, @@ -145,4 +146,27 @@ await Fixture.Streams.AppendToStreamAsync( var outputMetadata = readResult[0].OriginalEvent.Metadata.ToArray(); outputMetadata.ShouldBe(inputMetadata); } + + [Fact] + public async Task TracingContextIsNotInjectedWhenEventIsNotJsonButHasJsonMetadata() { + var stream = Fixture.GetStreamName(); + + var inputMetadata = Fixture.CreateTestJsonMetadata().ToArray(); + await Fixture.Streams.AppendToStreamAsync( + stream, + StreamState.NoStream, + Fixture.CreateTestEvents( + metadata: inputMetadata, + contentType: Constants.Metadata.ContentTypes.ApplicationOctetStream + ) + ); + + var readResult = await Fixture.Streams + .ReadStreamAsync(Direction.Forwards, stream, StreamPosition.Start) + .ToListAsync(); + + var outputMetadata = readResult[0].OriginalEvent.Metadata.ToArray(); + var test = JsonSerializer.Deserialize(outputMetadata); + outputMetadata.ShouldBe(inputMetadata); + } } \ No newline at end of file diff --git a/test/EventStore.Client.Tests.Common/Fixtures/EventStoreFixture.Helpers.cs b/test/EventStore.Client.Tests.Common/Fixtures/EventStoreFixture.Helpers.cs index a75383055..9fdf72d4a 100644 --- a/test/EventStore.Client.Tests.Common/Fixtures/EventStoreFixture.Helpers.cs +++ b/test/EventStore.Client.Tests.Common/Fixtures/EventStoreFixture.Helpers.cs @@ -19,8 +19,8 @@ public ReadOnlyMemory CreateMetadataOfSize(int metadataSize) => public ReadOnlyMemory CreateTestJsonMetadata() => "{\"Foo\": \"Bar\"}"u8.ToArray(); - public IEnumerable CreateTestEvents(int count = 1, string? type = null, ReadOnlyMemory? metadata = null) => - Enumerable.Range(0, count).Select(index => CreateTestEvent(index, type ?? TestEventType, metadata)); + public IEnumerable CreateTestEvents(int count = 1, string? type = null, ReadOnlyMemory? metadata = null, string? contentType = null) => + Enumerable.Range(0, count).Select(index => CreateTestEvent(index, type ?? TestEventType, metadata, contentType)); public IEnumerable CreateTestEventsThatThrowsException() { // Ensure initial IEnumerator.Current does not throw @@ -32,12 +32,13 @@ public IEnumerable CreateTestEventsThatThrowsException() { protected static EventData CreateTestEvent(int index) => CreateTestEvent(index, TestEventType); - protected static EventData CreateTestEvent(int index, string type, ReadOnlyMemory? metadata = null) => + protected static EventData CreateTestEvent(int index, string type, ReadOnlyMemory? metadata = null, string? contentType = null) => new( Uuid.NewUuid(), type, Encoding.UTF8.GetBytes($$"""{"x":{{index}}}"""), - metadata + metadata, + contentType ?? "application/json" ); public async Task CreateTestUser(bool withoutGroups = true, bool useUserCredentials = false) {