Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
josephcummings committed Apr 2, 2024
1 parent b4667b6 commit b04de3b
Show file tree
Hide file tree
Showing 17 changed files with 742 additions and 265 deletions.
10 changes: 5 additions & 5 deletions src/EventStore.Client.Common/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ public static class Exceptions {
}

public static class Metadata {
public const string Type = "type";
public const string Created = "created";
public const string ContentType = "content-type";
public static readonly string[] RequiredMetadata = { Type, ContentType };
public const string Type = "type";
public const string Created = "created";
public const string ContentType = "content-type";
public static readonly string[] RequiredMetadata = { Type, ContentType };

public static class ContentTypes {
public const string ApplicationJson = "application/json";
Expand All @@ -58,4 +58,4 @@ public static class Headers {
public const string ConnectionName = "connection-name";
public const string RequiresLeader = "requires-leader";
}
}
}
346 changes: 210 additions & 136 deletions src/EventStore.Client.Streams/EventStoreClient.Append.cs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/EventStore.Client.Streams/EventStoreClient.Metadata.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ private async Task<IWriteResult> SetStreamMetadataInternal(StreamMetadata metada
CancellationToken cancellationToken) {

var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
return await AppendToStreamInternal(channelInfo.CallInvoker, appendReq, new[] {
return await AppendToStreamInternal(channelInfo, appendReq, new[] {
new EventData(Uuid.NewUuid(), SystemEventTypes.StreamMetadata,
JsonSerializer.SerializeToUtf8Bytes(metadata, StreamMetadataJsonSerializerOptions)),
}, operationOptions, deadline, userCredentials, cancellationToken).ConfigureAwait(false);
Expand Down
175 changes: 110 additions & 65 deletions src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,15 @@ public Task<StreamSubscription> SubscribeToAllAsync(
Action<StreamSubscription, SubscriptionDroppedReason, Exception?>? subscriptionDropped = default,
SubscriptionFilterOptions? filterOptions = null,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) => StreamSubscription.Confirm(
CancellationToken cancellationToken = default
) => StreamSubscription.Confirm(
SubscribeToAll(start, resolveLinkTos, filterOptions, userCredentials, cancellationToken),
eventAppeared, subscriptionDropped, _log, filterOptions?.CheckpointReached,
cancellationToken: cancellationToken);
eventAppeared,
subscriptionDropped,
_log,
filterOptions?.CheckpointReached,
cancellationToken: cancellationToken
);

/// <summary>
/// Subscribes to all events.
Expand All @@ -43,19 +48,26 @@ public StreamSubscriptionResult SubscribeToAll(
bool resolveLinkTos = false,
SubscriptionFilterOptions? filterOptions = null,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) => new(async _ => {
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
return channelInfo.CallInvoker;
}, new ReadReq {
Options = new ReadReq.Types.Options {
ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards,
ResolveLinks = resolveLinkTos,
All = ReadReq.Types.Options.Types.AllOptions.FromSubscriptionPosition(start),
Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions(),
Filter = GetFilterOptions(filterOptions)!,
UuidOption = new() { Structured = new() }
}
}, Settings, userCredentials, cancellationToken);
CancellationToken cancellationToken = default
) => new(
async _ => {
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
return channelInfo.CallInvoker;
},
new ReadReq {
Options = new ReadReq.Types.Options {
ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards,
ResolveLinks = resolveLinkTos,
All = ReadReq.Types.Options.Types.AllOptions.FromSubscriptionPosition(start),
Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions(),
Filter = GetFilterOptions(filterOptions)!,
UuidOption = new() { Structured = new() }
}
},
Settings,
userCredentials,
cancellationToken
);

/// <summary>
/// Subscribes to a stream from a <see cref="StreamPosition">checkpoint</see>.
Expand All @@ -69,15 +81,21 @@ public StreamSubscriptionResult SubscribeToAll(
/// <param name="cancellationToken">The optional <see cref="System.Threading.CancellationToken"/>.</param>
/// <returns></returns>
[Obsolete("SubscribeToStreamAsync is no longer supported. Use SubscribeToStream instead.", false)]
public Task<StreamSubscription> SubscribeToStreamAsync(string streamName,
FromStream start,
Func<StreamSubscription, ResolvedEvent, CancellationToken, Task> eventAppeared,
bool resolveLinkTos = false,
Action<StreamSubscription, SubscriptionDroppedReason, Exception?>? subscriptionDropped = default,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) => StreamSubscription.Confirm(
public Task<StreamSubscription> SubscribeToStreamAsync(
string streamName,
FromStream start,
Func<StreamSubscription, ResolvedEvent, CancellationToken, Task> eventAppeared,
bool resolveLinkTos = false,
Action<StreamSubscription, SubscriptionDroppedReason, Exception?>? subscriptionDropped = default,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default
) => StreamSubscription.Confirm(
SubscribeToStream(streamName, start, resolveLinkTos, userCredentials, cancellationToken),
eventAppeared, subscriptionDropped, _log, cancellationToken: cancellationToken);
eventAppeared,
subscriptionDropped,
_log,
cancellationToken: cancellationToken
);

/// <summary>
/// Subscribes to a stream from a <see cref="StreamPosition">checkpoint</see>.
Expand All @@ -93,28 +111,35 @@ public StreamSubscriptionResult SubscribeToStream(
FromStream start,
bool resolveLinkTos = false,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) => new(async _ => {
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
return channelInfo.CallInvoker;
}, new ReadReq {
Options = new ReadReq.Types.Options {
ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards,
ResolveLinks = resolveLinkTos,
Stream = ReadReq.Types.Options.Types.StreamOptions.FromSubscriptionPosition(streamName, start),
Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions(),
UuidOption = new() { Structured = new() }
}
}, Settings, userCredentials, cancellationToken);
CancellationToken cancellationToken = default
) => new(
async _ => {
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
return channelInfo.CallInvoker;
},
new ReadReq {
Options = new ReadReq.Types.Options {
ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards,
ResolveLinks = resolveLinkTos,
Stream = ReadReq.Types.Options.Types.StreamOptions.FromSubscriptionPosition(streamName, start),
Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions(),
UuidOption = new() { Structured = new() }
}
},
Settings,
userCredentials,
cancellationToken
);

/// <summary>
/// A class that represents the result of a subscription operation. You may either enumerate this instance directly or <see cref="Messages"/>. Do not enumerate more than once.
/// </summary>
public class StreamSubscriptionResult : IAsyncEnumerable<ResolvedEvent>, IAsyncDisposable, IDisposable {
private readonly ReadReq _request;
private readonly Channel<StreamMessage> _channel;
private readonly CancellationTokenSource _cts;
private readonly CallOptions _callOptions;
private AsyncServerStreamingCall<ReadResp>? _call;
private readonly ReadReq _request;
private readonly Channel<StreamMessage> _channel;
private readonly CancellationTokenSource _cts;
private readonly CallOptions _callOptions;
private AsyncServerStreamingCall<ReadResp>? _call;

private int _messagesEnumerated;

Expand Down Expand Up @@ -150,12 +175,17 @@ async IAsyncEnumerable<StreamMessage> GetMessages() {
}
}

internal StreamSubscriptionResult(Func<CancellationToken, Task<CallInvoker>> selectCallInvoker,
internal StreamSubscriptionResult(
Func<CancellationToken, Task<CallInvoker>> selectCallInvoker,
ReadReq request, EventStoreClientSettings settings, UserCredentials? userCredentials,
CancellationToken cancellationToken) {
CancellationToken cancellationToken
) {
_request = request;
_callOptions = EventStoreCallOptions.CreateStreaming(settings, userCredentials: userCredentials,
cancellationToken: cancellationToken);
_callOptions = EventStoreCallOptions.CreateStreaming(
settings,
userCredentials: userCredentials,
cancellationToken: cancellationToken
);

_channel = Channel.CreateBounded<StreamMessage>(ReadBoundedChannelOptions);

Expand All @@ -172,28 +202,40 @@ internal StreamSubscriptionResult(Func<CancellationToken, Task<CallInvoker>> sel
async Task PumpMessages() {
try {
var callInvoker = await selectCallInvoker(_cts.Token).ConfigureAwait(false);
var client = new Streams.Streams.StreamsClient(callInvoker);
var client = new Streams.Streams.StreamsClient(callInvoker);
_call = client.Read(_request, _callOptions);
await foreach (var response in _call.ResponseStream.ReadAllAsync(_cts.Token)
.ConfigureAwait(false)) {
await _channel.Writer.WriteAsync(response.ContentCase switch {
Confirmation => new StreamMessage.SubscriptionConfirmation(
response.Confirmation.SubscriptionId),
Event => new StreamMessage.Event(ConvertToResolvedEvent(response.Event)),
FirstStreamPosition => new StreamMessage.FirstStreamPosition(
new StreamPosition(response.FirstStreamPosition)),
LastStreamPosition => new StreamMessage.LastStreamPosition(
new StreamPosition(response.LastStreamPosition)),
LastAllStreamPosition => new StreamMessage.LastAllStreamPosition(
new Position(response.LastAllStreamPosition.CommitPosition,
response.LastAllStreamPosition.PreparePosition)),
Checkpoint => new StreamMessage.AllStreamCheckpointReached(
new Position(response.Checkpoint.CommitPosition,
response.Checkpoint.PreparePosition)),
CaughtUp => StreamMessage.CaughtUp.Instance,
FellBehind => StreamMessage.FellBehind.Instance,
_ => StreamMessage.Unknown.Instance
}, _cts.Token).ConfigureAwait(false);
await _channel.Writer.WriteAsync(
response.ContentCase switch {
Confirmation => new StreamMessage.SubscriptionConfirmation(
response.Confirmation.SubscriptionId
),
Event => new StreamMessage.Event(ConvertToResolvedEvent(response.Event)),
FirstStreamPosition => new StreamMessage.FirstStreamPosition(
new StreamPosition(response.FirstStreamPosition)
),
LastStreamPosition => new StreamMessage.LastStreamPosition(
new StreamPosition(response.LastStreamPosition)
),
LastAllStreamPosition => new StreamMessage.LastAllStreamPosition(
new Position(
response.LastAllStreamPosition.CommitPosition,
response.LastAllStreamPosition.PreparePosition
)
),
Checkpoint => new StreamMessage.AllStreamCheckpointReached(
new Position(
response.Checkpoint.CommitPosition,
response.Checkpoint.PreparePosition
)
),
CaughtUp => StreamMessage.CaughtUp.Instance,
FellBehind => StreamMessage.FellBehind.Instance,
_ => StreamMessage.Unknown.Instance
},
_cts.Token
).ConfigureAwait(false);
}

_channel.Writer.Complete();
Expand All @@ -214,9 +256,11 @@ static async ValueTask CastAndDispose(IDisposable? resource) {
switch (resource) {
case null:
return;

case IAsyncDisposable resourceAsyncDisposable:
await resourceAsyncDisposable.DisposeAsync().ConfigureAwait(false);
break;

default:
resource.Dispose();
break;
Expand All @@ -232,7 +276,8 @@ public void Dispose() {

/// <inheritdoc />
public async IAsyncEnumerator<ResolvedEvent> GetAsyncEnumerator(
CancellationToken cancellationToken = default) {
CancellationToken cancellationToken = default
) {
try {
await foreach (var message in _channel.Reader.ReadAllAsync(cancellationToken)
.ConfigureAwait(false)) {
Expand Down
Loading

0 comments on commit b04de3b

Please sign in to comment.