Skip to content

Commit

Permalink
Do nothing with rpc exception
Browse files Browse the repository at this point in the history
  • Loading branch information
w1am committed Feb 13, 2024
1 parent ec5b31c commit fcebd76
Showing 1 changed file with 41 additions and 34 deletions.
75 changes: 41 additions & 34 deletions src/EventStore.Client.Streams/EventStoreClient.Append.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,37 +92,40 @@ public async Task<IWriteResult> AppendToStreamAsync(
}

private async ValueTask<IWriteResult> AppendToStreamInternal(
CallInvoker callInvoker,
AppendReq header,
IEnumerable<EventData> eventData,
EventStoreClientOperationOptions operationOptions,
TimeSpan? deadline,
UserCredentials? userCredentials,
CancellationToken cancellationToken
) {
CallInvoker callInvoker,
AppendReq header,
IEnumerable<EventData> eventData,
EventStoreClientOperationOptions operationOptions,
TimeSpan? deadline,
UserCredentials? userCredentials,
CancellationToken cancellationToken
) {
using var call = new Streams.Streams.StreamsClient(callInvoker).Append(
EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken)
);
EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken)
);

IWriteResult writeResult;
await call.RequestStream.WriteAsync(header).ConfigureAwait(false);

foreach (var e in eventData) {
_log.LogTrace("Appending event to stream - {streamName}@{eventId} {eventType}.",
header.Options.StreamIdentifier, e.EventId, e.Type);
await call.RequestStream.WriteAsync(
new AppendReq {
ProposedMessage = new AppendReq.Types.ProposedMessage {
Id = e.EventId.ToDto(),
Data = ByteString.CopyFrom(e.Data.Span),
CustomMetadata = ByteString.CopyFrom(e.Metadata.Span),
Metadata = {
{ Constants.Metadata.Type, e.Type },
{ Constants.Metadata.ContentType, e.ContentType }
}
},
}
).ConfigureAwait(false);

try {
await call.RequestStream.WriteAsync(header).ConfigureAwait(false);

foreach (var e in eventData) {
await call.RequestStream.WriteAsync(
new AppendReq {
ProposedMessage = new AppendReq.Types.ProposedMessage {
Id = e.EventId.ToDto(),
Data = ByteString.CopyFrom(e.Data.Span),
CustomMetadata = ByteString.CopyFrom(e.Metadata.Span),
Metadata = {
{ Constants.Metadata.Type, e.Type },
{ Constants.Metadata.ContentType, e.ContentType }
}
},
}
).ConfigureAwait(false);
}
} catch (RpcException) {
// Do nothing so that RpcExceptions propagate to the call.ResponseAsync and be translated by the TypedInterceptor
}

await call.RequestStream.CompleteAsync().ConfigureAwait(false);
Expand Down Expand Up @@ -152,11 +155,13 @@ await call.RequestStream.WriteAsync(
);
} else {
if (response.WrongExpectedVersion != null) {
var actualStreamRevision = response.WrongExpectedVersion.CurrentRevisionOptionCase switch {
AppendResp.Types.WrongExpectedVersion.CurrentRevisionOptionOneofCase.CurrentNoStream =>
StreamRevision.None,
_ => new StreamRevision(response.WrongExpectedVersion.CurrentRevision)
};
var actualStreamRevision =
response.WrongExpectedVersion.CurrentRevisionOptionCase switch {
AppendResp.Types.WrongExpectedVersion.CurrentRevisionOptionOneofCase
.CurrentNoStream =>
StreamRevision.None,
_ => new StreamRevision(response.WrongExpectedVersion.CurrentRevision)
};

_log.LogDebug(
"Append to stream failed with Wrong Expected Version - {streamName}/{expectedRevision}/{currentRevision}",
Expand All @@ -167,7 +172,8 @@ await call.RequestStream.WriteAsync(

if (operationOptions.ThrowOnAppendFailure) {
if (response.WrongExpectedVersion.ExpectedRevisionOptionCase == AppendResp.Types
.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedRevision) {
.WrongExpectedVersion.ExpectedRevisionOptionOneofCase
.ExpectedRevision) {
throw new WrongExpectedVersionException(
header.Options.StreamIdentifier!,
new StreamRevision(response.WrongExpectedVersion.ExpectedRevision),
Expand Down Expand Up @@ -215,6 +221,7 @@ await call.RequestStream.WriteAsync(
}
}


return writeResult;
}

Expand Down

0 comments on commit fcebd76

Please sign in to comment.