diff --git a/src/EventStore.Client.Streams/EventStoreClient.Append.cs b/src/EventStore.Client.Streams/EventStoreClient.Append.cs index 4aed95126..754e472aa 100644 --- a/src/EventStore.Client.Streams/EventStoreClient.Append.cs +++ b/src/EventStore.Client.Streams/EventStoreClient.Append.cs @@ -92,37 +92,40 @@ public async Task AppendToStreamAsync( } private async ValueTask AppendToStreamInternal( - CallInvoker callInvoker, - AppendReq header, - IEnumerable eventData, - EventStoreClientOperationOptions operationOptions, - TimeSpan? deadline, - UserCredentials? userCredentials, - CancellationToken cancellationToken - ) { + CallInvoker callInvoker, + AppendReq header, + IEnumerable eventData, + EventStoreClientOperationOptions operationOptions, + TimeSpan? deadline, + UserCredentials? userCredentials, + CancellationToken cancellationToken + ) { using var call = new Streams.Streams.StreamsClient(callInvoker).Append( - EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken) - ); + EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken) + ); IWriteResult writeResult; - await call.RequestStream.WriteAsync(header).ConfigureAwait(false); - - foreach (var e in eventData) { - _log.LogTrace("Appending event to stream - {streamName}@{eventId} {eventType}.", - header.Options.StreamIdentifier, e.EventId, e.Type); - await call.RequestStream.WriteAsync( - new AppendReq { - ProposedMessage = new AppendReq.Types.ProposedMessage { - Id = e.EventId.ToDto(), - Data = ByteString.CopyFrom(e.Data.Span), - CustomMetadata = ByteString.CopyFrom(e.Metadata.Span), - Metadata = { - { Constants.Metadata.Type, e.Type }, - { Constants.Metadata.ContentType, e.ContentType } - } - }, - } - ).ConfigureAwait(false); + + try { + await call.RequestStream.WriteAsync(header).ConfigureAwait(false); + + foreach (var e in eventData) { + await call.RequestStream.WriteAsync( + new AppendReq { + ProposedMessage = new AppendReq.Types.ProposedMessage { + Id = e.EventId.ToDto(), + Data = ByteString.CopyFrom(e.Data.Span), + CustomMetadata = ByteString.CopyFrom(e.Metadata.Span), + Metadata = { + { Constants.Metadata.Type, e.Type }, + { Constants.Metadata.ContentType, e.ContentType } + } + }, + } + ).ConfigureAwait(false); + } + } catch (RpcException) { + // Do nothing so that RpcExceptions propagate to the call.ResponseAsync and be translated by the TypedInterceptor } await call.RequestStream.CompleteAsync().ConfigureAwait(false); @@ -152,11 +155,13 @@ await call.RequestStream.WriteAsync( ); } else { if (response.WrongExpectedVersion != null) { - var actualStreamRevision = response.WrongExpectedVersion.CurrentRevisionOptionCase switch { - AppendResp.Types.WrongExpectedVersion.CurrentRevisionOptionOneofCase.CurrentNoStream => - StreamRevision.None, - _ => new StreamRevision(response.WrongExpectedVersion.CurrentRevision) - }; + var actualStreamRevision = + response.WrongExpectedVersion.CurrentRevisionOptionCase switch { + AppendResp.Types.WrongExpectedVersion.CurrentRevisionOptionOneofCase + .CurrentNoStream => + StreamRevision.None, + _ => new StreamRevision(response.WrongExpectedVersion.CurrentRevision) + }; _log.LogDebug( "Append to stream failed with Wrong Expected Version - {streamName}/{expectedRevision}/{currentRevision}", @@ -167,7 +172,8 @@ await call.RequestStream.WriteAsync( if (operationOptions.ThrowOnAppendFailure) { if (response.WrongExpectedVersion.ExpectedRevisionOptionCase == AppendResp.Types - .WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedRevision) { + .WrongExpectedVersion.ExpectedRevisionOptionOneofCase + .ExpectedRevision) { throw new WrongExpectedVersionException( header.Options.StreamIdentifier!, new StreamRevision(response.WrongExpectedVersion.ExpectedRevision), @@ -215,6 +221,7 @@ await call.RequestStream.WriteAsync( } } + return writeResult; }