Skip to content

Commit

Permalink
try to dispose the call after catching an error
Browse files Browse the repository at this point in the history
  • Loading branch information
w1am committed Feb 8, 2024
1 parent d7e2882 commit 7517dae
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 85 deletions.
164 changes: 80 additions & 84 deletions src/EventStore.Client.Streams/EventStoreClient.Append.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,9 @@ private async ValueTask<IWriteResult> AppendToStreamInternal(
UserCredentials? userCredentials,
CancellationToken cancellationToken
) {
using var lct = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

// using var lct = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
using var call = new Streams.Streams.StreamsClient(callInvoker).Append(
EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, lct.Token)
EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken)
);

IWriteResult writeResult;
Expand All @@ -122,107 +121,104 @@ await call.RequestStream.WriteAsync(
{ Constants.Metadata.Type, e.Type },
{ Constants.Metadata.ContentType, e.ContentType }
}
}
},
}
).ConfigureAwait(false);
}
} catch (Exception ex) {
_log.LogError(
ex,
"Failed to append to stream - {StreamName}.",
header.Options.StreamIdentifier
);

lct.Cancel();
} finally {
await call.RequestStream.CompleteAsync().ConfigureAwait(false);
if (!lct.IsCancellationRequested) {
var response = await call.ResponseAsync.ConfigureAwait(false);

if (response.Success != null) {
writeResult = new SuccessResult(
response.Success.CurrentRevisionOptionCase ==
AppendResp.Types.Success.CurrentRevisionOptionOneofCase.NoStream
? StreamRevision.None
: new StreamRevision(response.Success.CurrentRevision),
response.Success.PositionOptionCase == AppendResp.Types.Success.PositionOptionOneofCase.Position
? new Position(
response.Success.Position.CommitPosition,
response.Success.Position.PreparePosition
)
: default
);
var response = await call.ResponseAsync.ConfigureAwait(false);

if (response.Success != null) {
writeResult = new SuccessResult(
response.Success.CurrentRevisionOptionCase ==
AppendResp.Types.Success.CurrentRevisionOptionOneofCase.NoStream
? StreamRevision.None
: new StreamRevision(response.Success.CurrentRevision),
response.Success.PositionOptionCase == AppendResp.Types.Success.PositionOptionOneofCase.Position
? new Position(
response.Success.Position.CommitPosition,
response.Success.Position.PreparePosition
)
: default
);

_log.LogDebug(
"Append to stream succeeded - {streamName}@{logPosition}/{nextExpectedVersion}.",
header.Options.StreamIdentifier,
writeResult.LogPosition,
writeResult.NextExpectedStreamRevision
);
} else {
if (response.WrongExpectedVersion != null) {
var actualStreamRevision = response.WrongExpectedVersion.CurrentRevisionOptionCase switch {
AppendResp.Types.WrongExpectedVersion.CurrentRevisionOptionOneofCase.CurrentNoStream =>
StreamRevision.None,
_ => new StreamRevision(response.WrongExpectedVersion.CurrentRevision)
};

_log.LogDebug(
"Append to stream succeeded - {streamName}@{logPosition}/{nextExpectedVersion}.",
"Append to stream failed with Wrong Expected Version - {streamName}/{expectedRevision}/{currentRevision}",
header.Options.StreamIdentifier,
writeResult.LogPosition,
writeResult.NextExpectedStreamRevision
new StreamRevision(header.Options.Revision),
actualStreamRevision
);
} else {
if (response.WrongExpectedVersion != null) {
var actualStreamRevision = response.WrongExpectedVersion.CurrentRevisionOptionCase switch {
AppendResp.Types.WrongExpectedVersion.CurrentRevisionOptionOneofCase.CurrentNoStream =>
StreamRevision.None,
_ => new StreamRevision(response.WrongExpectedVersion.CurrentRevision)
};

_log.LogDebug(
"Append to stream failed with Wrong Expected Version - {streamName}/{expectedRevision}/{currentRevision}",
header.Options.StreamIdentifier,
new StreamRevision(header.Options.Revision),
actualStreamRevision
);

if (operationOptions.ThrowOnAppendFailure) {
if (response.WrongExpectedVersion.ExpectedRevisionOptionCase == AppendResp.Types
.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedRevision) {
throw new WrongExpectedVersionException(
header.Options.StreamIdentifier!,
new StreamRevision(response.WrongExpectedVersion.ExpectedRevision),
actualStreamRevision
);
}

var expectedStreamState = response.WrongExpectedVersion.ExpectedRevisionOptionCase switch {
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedAny =>
StreamState.Any,
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedNoStream =>
StreamState.NoStream,
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase
.ExpectedStreamExists =>
StreamState.StreamExists,
_ => StreamState.Any
};

throw new WrongExpectedVersionException(
header.Options.StreamIdentifier!,
expectedStreamState,
actualStreamRevision
);
}

if (operationOptions.ThrowOnAppendFailure) {
if (response.WrongExpectedVersion.ExpectedRevisionOptionCase == AppendResp.Types
.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedRevision) {
writeResult = new WrongExpectedVersionResult(
throw new WrongExpectedVersionException(
header.Options.StreamIdentifier!,
new StreamRevision(response.WrongExpectedVersion.ExpectedRevision),
actualStreamRevision
);
} else {
writeResult = new WrongExpectedVersionResult(
header.Options.StreamIdentifier!,
StreamRevision.None,
actualStreamRevision
);
}

var expectedStreamState = response.WrongExpectedVersion.ExpectedRevisionOptionCase switch {
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedAny =>
StreamState.Any,
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedNoStream =>
StreamState.NoStream,
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase
.ExpectedStreamExists =>
StreamState.StreamExists,
_ => StreamState.Any
};

throw new WrongExpectedVersionException(
header.Options.StreamIdentifier!,
expectedStreamState,
actualStreamRevision
);
}

if (response.WrongExpectedVersion.ExpectedRevisionOptionCase == AppendResp.Types
.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedRevision) {
writeResult = new WrongExpectedVersionResult(
header.Options.StreamIdentifier!,
new StreamRevision(response.WrongExpectedVersion.ExpectedRevision),
actualStreamRevision
);
} else {
throw new InvalidOperationException("The operation completed with an unexpected result.");
writeResult = new WrongExpectedVersionResult(
header.Options.StreamIdentifier!,
StreamRevision.None,
actualStreamRevision
);
}
} else {
throw new InvalidOperationException("The operation completed with an unexpected result.");
}
} else {
throw new InvalidOperationException("The operation completed with an unexpected result.");
}
} catch (Exception ex) {
_log.LogError(
ex,
"Failed to append to stream - {StreamName}.",
header.Options.StreamIdentifier
);

call.Dispose();
throw;

}

return writeResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ await Fixture.Streams.AppendToStreamAsync(
);

// Force regular append by passing credentials
await Assert.ThrowsAsync<InvalidOperationException>(
await Assert.ThrowsAsync<EnumerationFailedException>(
async () =>{
await Fixture.Streams.AppendToStreamAsync(
streamName,
Expand Down

0 comments on commit 7517dae

Please sign in to comment.