Skip to content

Commit

Permalink
Fix partial append on error append to stream
Browse files Browse the repository at this point in the history
  • Loading branch information
w1am committed Feb 8, 2024
1 parent 9ae7e8b commit 1159799
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 37 deletions.
105 changes: 69 additions & 36 deletions src/EventStore.Client.Streams/EventStoreClient.Append.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,47 +98,69 @@ private async ValueTask<IWriteResult> AppendToStreamInternal(
EventStoreClientOperationOptions operationOptions,
TimeSpan? deadline,
UserCredentials? userCredentials,
CancellationToken cancellationToken) {
CancellationToken cancellationToken
) {
try {
eventData.ToList();
} catch (Exception) {
_log.LogError("Error while appending to stream - {streamName}.", header.Options.StreamIdentifier);
throw;
}

using var call = new Streams.Streams.StreamsClient(
callInvoker).Append(EventStoreCallOptions.CreateNonStreaming(
Settings, deadline, userCredentials, cancellationToken));
using var call = new Streams.Streams.StreamsClient(callInvoker).Append(
EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken)
);

IWriteResult writeResult;
try {
await call.RequestStream.WriteAsync(header).ConfigureAwait(false);

foreach (var e in eventData) {
_log.LogTrace("Appending event to stream - {streamName}@{eventId} {eventType}.",
header.Options.StreamIdentifier, e.EventId, e.Type);
await call.RequestStream.WriteAsync(new AppendReq {
ProposedMessage = new AppendReq.Types.ProposedMessage {
Id = e.EventId.ToDto(),
Data = ByteString.CopyFrom(e.Data.Span),
CustomMetadata = ByteString.CopyFrom(e.Metadata.Span),
Metadata = {
{Constants.Metadata.Type, e.Type},
{Constants.Metadata.ContentType, e.ContentType}
_log.LogTrace(
"Appending event to stream - {streamName}@{eventId} {eventType}.",
header.Options.StreamIdentifier,
e.EventId,
e.Type
);

await call.RequestStream.WriteAsync(
new AppendReq {
ProposedMessage = new AppendReq.Types.ProposedMessage {
Id = e.EventId.ToDto(),
Data = ByteString.CopyFrom(e.Data.Span),
CustomMetadata = ByteString.CopyFrom(e.Metadata.Span),
Metadata = {
{ Constants.Metadata.Type, e.Type },
{ Constants.Metadata.ContentType, e.ContentType }
}
}
}
}).ConfigureAwait(false);
).ConfigureAwait(false);
}
} finally {
await call.RequestStream.CompleteAsync().ConfigureAwait(false);

var response = await call.ResponseAsync.ConfigureAwait(false);

if (response.Success != null) {
writeResult = new SuccessResult(response.Success.CurrentRevisionOptionCase ==
AppendResp.Types.Success.CurrentRevisionOptionOneofCase.NoStream
writeResult = new SuccessResult(
response.Success.CurrentRevisionOptionCase ==
AppendResp.Types.Success.CurrentRevisionOptionOneofCase.NoStream
? StreamRevision.None
: new StreamRevision(response.Success.CurrentRevision),
response.Success.PositionOptionCase == AppendResp.Types.Success.PositionOptionOneofCase.Position
? new Position(response.Success.Position.CommitPosition,
response.Success.Position.PreparePosition)
: default);
_log.LogDebug("Append to stream succeeded - {streamName}@{logPosition}/{nextExpectedVersion}.",
header.Options.StreamIdentifier, writeResult.LogPosition, writeResult.NextExpectedStreamRevision);
? new Position(
response.Success.Position.CommitPosition,
response.Success.Position.PreparePosition
)
: default
);

_log.LogDebug(
"Append to stream succeeded - {streamName}@{logPosition}/{nextExpectedVersion}.",
header.Options.StreamIdentifier,
writeResult.LogPosition,
writeResult.NextExpectedStreamRevision
);
} else {
if (response.WrongExpectedVersion != null) {
var actualStreamRevision = response.WrongExpectedVersion.CurrentRevisionOptionCase switch {
Expand All @@ -149,42 +171,53 @@ await call.RequestStream.WriteAsync(new AppendReq {

_log.LogDebug(
"Append to stream failed with Wrong Expected Version - {streamName}/{expectedRevision}/{currentRevision}",
header.Options.StreamIdentifier, new StreamRevision(header.Options.Revision),
actualStreamRevision);
header.Options.StreamIdentifier,
new StreamRevision(header.Options.Revision),
actualStreamRevision
);

if (operationOptions.ThrowOnAppendFailure) {
if (response.WrongExpectedVersion.ExpectedRevisionOptionCase == AppendResp.Types
.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedRevision) {
throw new WrongExpectedVersionException(header.Options.StreamIdentifier!,
throw new WrongExpectedVersionException(
header.Options.StreamIdentifier!,
new StreamRevision(response.WrongExpectedVersion.ExpectedRevision),
actualStreamRevision);
actualStreamRevision
);
}

var expectedStreamState = response.WrongExpectedVersion.ExpectedRevisionOptionCase switch {
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedAny =>
StreamState.Any,
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedNoStream =>
StreamState.NoStream,
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedStreamExists =>
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase
.ExpectedStreamExists =>
StreamState.StreamExists,
_ => StreamState.Any
};

throw new WrongExpectedVersionException(header.Options.StreamIdentifier!,
expectedStreamState, actualStreamRevision);
throw new WrongExpectedVersionException(
header.Options.StreamIdentifier!,
expectedStreamState,
actualStreamRevision
);
}

if (response.WrongExpectedVersion.ExpectedRevisionOptionCase == AppendResp.Types
.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedRevision) {
writeResult = new WrongExpectedVersionResult(header.Options.StreamIdentifier!,
writeResult = new WrongExpectedVersionResult(
header.Options.StreamIdentifier!,
new StreamRevision(response.WrongExpectedVersion.ExpectedRevision),
actualStreamRevision);
actualStreamRevision
);
} else {
writeResult = new WrongExpectedVersionResult(header.Options.StreamIdentifier!,
writeResult = new WrongExpectedVersionResult(
header.Options.StreamIdentifier!,
StreamRevision.None,
actualStreamRevision);
actualStreamRevision
);
}

} else {
throw new InvalidOperationException("The operation completed with an unexpected result.");
}
Expand Down Expand Up @@ -355,4 +388,4 @@ private static async IAsyncEnumerable<T> ReadAllAsync<T>(ChannelReader<T> reader
#endif
}
}
}
}
44 changes: 43 additions & 1 deletion test/EventStore.Client.Streams.Tests/Append/append_to_stream.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Text;
using Grpc.Core;

namespace EventStore.Client.Streams.Tests.Append;
Expand Down Expand Up @@ -423,6 +424,47 @@ public async Task with_timeout_any_stream_revision_fails_when_operation_expired(
ex.StatusCode.ShouldBe(StatusCode.DeadlineExceeded);
}

[Fact]
public async Task should_not_append_to_stream_when_error_thrown_midway() {
var streamName = Fixture.GetStreamName();
const int initialNumberOfEvents = 5;

// Append some events before
await Fixture.Streams.AppendToStreamAsync(
streamName,
StreamState.Any,
Fixture.CreateTestEvents(initialNumberOfEvents)
);

// Force regular append by passing credentials
await Assert.ThrowsAsync<EnumerationFailedException>(
async () =>{
await Fixture.Streams.AppendToStreamAsync(
streamName,
StreamState.StreamExists,
GetEvents(),
userCredentials: new UserCredentials("admin", "changeit")
);
}
);

// No more events should be appended to the stream
var result = Fixture.Streams.ReadStreamAsync(Direction.Forwards, streamName, StreamPosition.Start);
var eventsCount = await result.CountAsync();
eventsCount.ShouldBe(initialNumberOfEvents, "No more events should be appended to the stream");

return;

// Throw an exception after 5 events
IEnumerable<EventData> GetEvents() {
for (var i = 0; i < 5; i++) {
yield return Fixture.CreateTestEvents(1).First();
}

throw new EnumerationFailedException();
}
}

[Fact]
public async Task with_timeout_stream_revision_fails_when_operation_expired() {
var stream = Fixture.GetStreamName();
Expand Down Expand Up @@ -472,4 +514,4 @@ class EnumerationFailedException : Exception { }
yield return new object?[] { StreamState.Any };
yield return new object?[] { ulong.MaxValue - 1UL };
}
}
}

0 comments on commit 1159799

Please sign in to comment.