Skip to content

Commit

Permalink
DEV-125 - Fix partial append on error append to stream (#283)
Browse files Browse the repository at this point in the history
* Fix partial append on error append to stream

* Catch for non rpc errors

* Add trace log back and remove redundant catch for non rpc exceptions

* Do nothing with rpc exception

* Move RequestStream.CompleteAsync in try block

* Log exceptions in catch and refactor

* Update `GetVersion` to account pre-release tag

* Add interceptor on request stream
  • Loading branch information
w1am authored Feb 26, 2024
1 parent a0c283b commit 1173149
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 95 deletions.
183 changes: 103 additions & 80 deletions src/EventStore.Client.Streams/EventStoreClient.Append.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,103 +98,126 @@ private async ValueTask<IWriteResult> AppendToStreamInternal(
EventStoreClientOperationOptions operationOptions,
TimeSpan? deadline,
UserCredentials? userCredentials,
CancellationToken cancellationToken) {
CancellationToken cancellationToken
) {
using var call = new Streams.Streams.StreamsClient(callInvoker).Append(
EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken)
);

using var call = new Streams.Streams.StreamsClient(
callInvoker).Append(EventStoreCallOptions.CreateNonStreaming(
Settings, deadline, userCredentials, cancellationToken));
await call.RequestStream.WriteAsync(header).ConfigureAwait(false);

IWriteResult writeResult;
try {
await call.RequestStream.WriteAsync(header).ConfigureAwait(false);

foreach (var e in eventData) {
_log.LogTrace("Appending event to stream - {streamName}@{eventId} {eventType}.",
header.Options.StreamIdentifier, e.EventId, e.Type);
await call.RequestStream.WriteAsync(new AppendReq {
foreach (var e in eventData) {
await call.RequestStream.WriteAsync(
new AppendReq {
ProposedMessage = new AppendReq.Types.ProposedMessage {
Id = e.EventId.ToDto(),
Data = ByteString.CopyFrom(e.Data.Span),
CustomMetadata = ByteString.CopyFrom(e.Metadata.Span),
Metadata = {
{Constants.Metadata.Type, e.Type},
{Constants.Metadata.ContentType, e.ContentType}
{ Constants.Metadata.Type, e.Type },
{ Constants.Metadata.ContentType, e.ContentType }
}
}
}).ConfigureAwait(false);
}
} finally {
await call.RequestStream.CompleteAsync().ConfigureAwait(false);
},
}
).ConfigureAwait(false);
}

var response = await call.ResponseAsync.ConfigureAwait(false);

if (response.Success != null) {
writeResult = new SuccessResult(response.Success.CurrentRevisionOptionCase ==
AppendResp.Types.Success.CurrentRevisionOptionOneofCase.NoStream
? StreamRevision.None
: new StreamRevision(response.Success.CurrentRevision),
response.Success.PositionOptionCase == AppendResp.Types.Success.PositionOptionOneofCase.Position
? new Position(response.Success.Position.CommitPosition,
response.Success.Position.PreparePosition)
: default);
_log.LogDebug("Append to stream succeeded - {streamName}@{logPosition}/{nextExpectedVersion}.",
header.Options.StreamIdentifier, writeResult.LogPosition, writeResult.NextExpectedStreamRevision);
} else {
if (response.WrongExpectedVersion != null) {
var actualStreamRevision = response.WrongExpectedVersion.CurrentRevisionOptionCase switch {
AppendResp.Types.WrongExpectedVersion.CurrentRevisionOptionOneofCase.CurrentNoStream =>
StreamRevision.None,
_ => new StreamRevision(response.WrongExpectedVersion.CurrentRevision)
};
await call.RequestStream.CompleteAsync().ConfigureAwait(false);

_log.LogDebug(
"Append to stream failed with Wrong Expected Version - {streamName}/{expectedRevision}/{currentRevision}",
header.Options.StreamIdentifier, new StreamRevision(header.Options.Revision),
actualStreamRevision);

if (operationOptions.ThrowOnAppendFailure) {
if (response.WrongExpectedVersion.ExpectedRevisionOptionCase == AppendResp.Types
.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedRevision) {
throw new WrongExpectedVersionException(header.Options.StreamIdentifier!,
new StreamRevision(response.WrongExpectedVersion.ExpectedRevision),
actualStreamRevision);
}
var response = await call.ResponseAsync.ConfigureAwait(false);

var expectedStreamState = response.WrongExpectedVersion.ExpectedRevisionOptionCase switch {
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedAny =>
StreamState.Any,
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedNoStream =>
StreamState.NoStream,
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedStreamExists =>
StreamState.StreamExists,
_ => StreamState.Any
};

throw new WrongExpectedVersionException(header.Options.StreamIdentifier!,
expectedStreamState, actualStreamRevision);
}
if (response.Success != null)
return HandleSuccessAppend(response, header);

if (response.WrongExpectedVersion.ExpectedRevisionOptionCase == AppendResp.Types
.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedRevision) {
writeResult = new WrongExpectedVersionResult(header.Options.StreamIdentifier!,
new StreamRevision(response.WrongExpectedVersion.ExpectedRevision),
actualStreamRevision);
} else {
writeResult = new WrongExpectedVersionResult(header.Options.StreamIdentifier!,
StreamRevision.None,
actualStreamRevision);
}
if (response.WrongExpectedVersion == null)
throw new InvalidOperationException("The operation completed with an unexpected result.");

} else {
throw new InvalidOperationException("The operation completed with an unexpected result.");
}
return HandleWrongExpectedRevision(response, header, operationOptions);
}

private IWriteResult HandleSuccessAppend(AppendResp response, AppendReq header) {
var currentRevision = response.Success.CurrentRevisionOptionCase ==
AppendResp.Types.Success.CurrentRevisionOptionOneofCase.NoStream
? StreamRevision.None
: new StreamRevision(response.Success.CurrentRevision);

var position = response.Success.PositionOptionCase ==
AppendResp.Types.Success.PositionOptionOneofCase.Position
? new Position(response.Success.Position.CommitPosition, response.Success.Position.PreparePosition)
: default;

_log.LogDebug(
"Append to stream succeeded - {streamName}@{logPosition}/{nextExpectedVersion}.",
header.Options.StreamIdentifier,
position,
currentRevision);

return new SuccessResult(currentRevision, position);
}

private IWriteResult HandleWrongExpectedRevision(
AppendResp response, AppendReq header, EventStoreClientOperationOptions operationOptions
) {
var actualStreamRevision =
response.WrongExpectedVersion.CurrentRevisionOptionCase switch {
AppendResp.Types.WrongExpectedVersion.CurrentRevisionOptionOneofCase
.CurrentNoStream =>
StreamRevision.None,
_ => new StreamRevision(response.WrongExpectedVersion.CurrentRevision)
};

_log.LogDebug(
"Append to stream failed with Wrong Expected Version - {streamName}/{expectedRevision}/{currentRevision}",
header.Options.StreamIdentifier,
new StreamRevision(header.Options.Revision),
actualStreamRevision
);

if (operationOptions.ThrowOnAppendFailure) {
if (response.WrongExpectedVersion.ExpectedRevisionOptionCase == AppendResp.Types
.WrongExpectedVersion.ExpectedRevisionOptionOneofCase
.ExpectedRevision) {
throw new WrongExpectedVersionException(
header.Options.StreamIdentifier!,
new StreamRevision(response.WrongExpectedVersion.ExpectedRevision),
actualStreamRevision
);
}

var expectedStreamState =
response.WrongExpectedVersion.ExpectedRevisionOptionCase switch {
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase
.ExpectedAny =>
StreamState.Any,
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase
.ExpectedNoStream =>
StreamState.NoStream,
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase
.ExpectedStreamExists =>
StreamState.StreamExists,
_ => StreamState.Any
};

throw new WrongExpectedVersionException(
header.Options.StreamIdentifier!,
expectedStreamState,
actualStreamRevision
);
}

return writeResult;
var expectedRevision = response.WrongExpectedVersion.ExpectedRevisionOptionCase
== AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase
.ExpectedRevision
? new StreamRevision(response.WrongExpectedVersion.ExpectedRevision)
: StreamRevision.None;

return new WrongExpectedVersionResult(
header.Options.StreamIdentifier!,
expectedRevision,
actualStreamRevision
);
}


private class StreamAppender : IDisposable {
private readonly EventStoreClientSettings _settings;
private readonly CancellationToken _cancellationToken;
Expand Down Expand Up @@ -355,4 +378,4 @@ private static async IAsyncEnumerable<T> ReadAllAsync<T>(ChannelReader<T> reader
#endif
}
}
}
}
27 changes: 24 additions & 3 deletions src/EventStore.Client/Interceptors/TypedExceptionInterceptor.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using System.Diagnostics.CodeAnalysis;
using Grpc.Core;
using Grpc.Core.Interceptors;
using static EventStore.Client.Constants;
Expand Down Expand Up @@ -55,7 +54,7 @@ AsyncClientStreamingCallContinuation<TRequest, TResponse> continuation
var response = continuation(context);

return new AsyncClientStreamingCall<TRequest, TResponse>(
response.RequestStream,
response.RequestStream.Apply(ConvertRpcException),
response.ResponseAsync.Apply(ConvertRpcException),
response.ResponseHeadersAsync,
response.GetStatus,
Expand Down Expand Up @@ -103,7 +102,15 @@ public static IAsyncStreamReader<TRequest> Apply<TRequest>(this IAsyncStreamRead

public static Task<TResponse> Apply<TResponse>(this Task<TResponse> task, Func<RpcException, Exception> convertException) =>
task.ContinueWith(t => t.Exception?.InnerException is RpcException ex ? throw convertException(ex) : t.Result);


public static IClientStreamWriter<TRequest> Apply<TRequest>(
this IClientStreamWriter<TRequest> writer, Func<RpcException, Exception> convertException
) =>
new ExceptionConverterStreamWriter<TRequest>(writer, convertException);

public static Task Apply(this Task task, Func<RpcException, Exception> convertException) =>
task.ContinueWith(t => t.Exception?.InnerException is RpcException ex ? throw convertException(ex) : t);

public static AccessDeniedException ToAccessDeniedException(this RpcException exception) =>
new(exception.Message, exception);

Expand Down Expand Up @@ -142,3 +149,17 @@ public async Task<bool> MoveNext(CancellationToken cancellationToken) {
}
}
}

class ExceptionConverterStreamWriter<TRequest>(
IClientStreamWriter<TRequest> writer,
Func<RpcException, Exception> convertException
)
: IClientStreamWriter<TRequest> {
public WriteOptions? WriteOptions {
get => writer.WriteOptions;
set => writer.WriteOptions = value;
}

public Task WriteAsync(TRequest message) => writer.WriteAsync(message).Apply(convertException);
public Task CompleteAsync() => writer.CompleteAsync().Apply(convertException);
}
28 changes: 16 additions & 12 deletions test/EventStore.Client.Streams.Tests/Append/append_to_stream.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Text;
using Grpc.Core;

namespace EventStore.Client.Streams.Tests.Append;
Expand Down Expand Up @@ -438,38 +439,41 @@ public async Task with_timeout_stream_revision_fails_when_operation_expired() {

ex.StatusCode.ShouldBe(StatusCode.DeadlineExceeded);
}

[Fact]
public async Task when_events_enumerator_throws_the_write_does_not_succeed() {
var streamName = Fixture.GetStreamName();

await Fixture.Streams
.AppendToStreamAsync(streamName, StreamRevision.None, GetEvents())
.AppendToStreamAsync(
streamName,
StreamRevision.None,
GetEvents(),
userCredentials: new UserCredentials(TestCredentials.Root.Username!, TestCredentials.Root.Password!)
)
.ShouldThrowAsync<EnumerationFailedException>();

var result = Fixture.Streams.ReadStreamAsync(Direction.Forwards, streamName, StreamPosition.Start);

var state = await result.ReadState;
var state = await Fixture.Streams.ReadStreamAsync(Direction.Forwards, streamName, StreamPosition.Start)
.ReadState;

state.ShouldBe(ReadState.StreamNotFound);

return;

IEnumerable<EventData> GetEvents() {
var i = 0;
foreach (var evt in Fixture.CreateTestEvents(5)) {
if (i++ % 3 == 0)
for (var i = 0; i < 5; i++) {
if (i % 3 == 0)
throw new EnumerationFailedException();

yield return evt;
yield return Fixture.CreateTestEvents(1).First();
}
}
}

class EnumerationFailedException : Exception { }

public static IEnumerable<object?[]> ArgumentOutOfRangeTestCases() {
yield return new object?[] { StreamState.Any };
yield return new object?[] { ulong.MaxValue - 1UL };
}
}
}

0 comments on commit 1173149

Please sign in to comment.