Skip to content

Commit

Permalink
Test warmup
Browse files Browse the repository at this point in the history
  • Loading branch information
w1am committed Jan 9, 2025
1 parent d62b558 commit 55d1547
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,14 @@ public async Task InitializeAsync() {
Logger.Warning("*** Warmup started ***");

await Task.WhenAll(
InitClient<KurrentUserManagementClient>(async x => Users = await x.WarmUp()),
InitClient<KurrentClient>(async x => Streams = await x.WarmUp()),
InitClient<KurrentUserManagementClient>(async x => Users = await Task.FromResult(x)),
InitClient<KurrentClient>(async x => Streams = await Task.FromResult(x)),
InitClient<KurrentProjectionManagementClient>(
async x => Projections = await x.WarmUp(),
async x => Projections = await Task.FromResult(x),
Options.Environment["EVENTSTORE_RUN_PROJECTIONS"] != "None"
),
InitClient<KurrentPersistentSubscriptionsClient>(async x => Subscriptions = SkipPsWarmUp ? x : await x.WarmUp()),
InitClient<KurrentOperationsClient>(async x => Operations = await x.WarmUp())
InitClient<KurrentPersistentSubscriptionsClient>(async x => Subscriptions = SkipPsWarmUp ? x : await Task.FromResult(x)),
InitClient<KurrentOperationsClient>(async x => Operations = await Task.FromResult(x))
);

WarmUpCompleted.EnsureCalledOnce();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,14 @@ public async Task InitializeAsync() {
Logger.Warning("*** Warmup started ***");

await Task.WhenAll(
InitClient<KurrentUserManagementClient>(async x => Users = await x.WarmUp()),
InitClient<KurrentClient>(async x => Streams = await x.WarmUp()),
InitClient<KurrentUserManagementClient>(async x => Users = await Task.FromResult(x)),
InitClient<KurrentClient>(async x => Streams = await Task.FromResult(x)),
InitClient<KurrentProjectionManagementClient>(
async x => Projections = await x.WarmUp(),
async x => Projections = await Task.FromResult(x),
Options.Environment["EVENTSTORE_RUN_PROJECTIONS"] != "None"
),
InitClient<KurrentPersistentSubscriptionsClient>(async x => Subscriptions = SkipPsWarmUp ? x : await x.WarmUp()),
InitClient<KurrentOperationsClient>(async x => Operations = await x.WarmUp())
InitClient<KurrentPersistentSubscriptionsClient>(async x => Subscriptions = SkipPsWarmUp ? x : await Task.FromResult(x)),
InitClient<KurrentOperationsClient>(async x => Operations = await Task.FromResult(x))
);

WarmUpCompleted.EnsureCalledOnce();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,95 +303,95 @@ public async Task deleting_existing_with_subscriber() {
Assert.Equal(group, ex.GroupName);
}

[RetryFact]
public async Task happy_case_catching_up_to_link_to_events_manual_ack() {
var group = Fixture.GetGroupName();
var bufferCount = 10;
var eventWriteCount = bufferCount * 2;
TaskCompletionSource<bool> eventsReceived = new();
int eventReceivedCount = 0;

var events = Fixture.CreateTestEvents(eventWriteCount)
.Select(
(e, i) => new EventData(
e.EventId,
SystemEventTypes.LinkTo,
Encoding.UTF8.GetBytes($"{i}@test"),
contentType: Constants.Metadata.ContentTypes.ApplicationOctetStream
)
)
.ToArray();

foreach (var e in events) {
await Fixture.Streams.AppendToStreamAsync("test-" + Guid.NewGuid(), StreamState.Any, new[] { e });
}

await Fixture.Subscriptions.CreateToAllAsync(
group,
new(startFrom: Position.Start, resolveLinkTos: true),
userCredentials: TestCredentials.Root
);

using var subscription = await Fixture.Subscriptions.SubscribeToAllAsync(
group,
async (subscription, e, retryCount, ct) => {
await subscription.Ack(e);

if (e.OriginalStreamId.StartsWith("test-")
&& Interlocked.Increment(ref eventReceivedCount) == events.Length)
eventsReceived.TrySetResult(true);
},
(s, r, e) => {
if (e != null)
eventsReceived.TrySetException(e);
},
bufferSize: bufferCount,
userCredentials: TestCredentials.Root
);

await eventsReceived.Task.WithTimeout();
}

[RetryFact]
public async Task happy_case_catching_up_to_normal_events_manual_ack() {
var group = Fixture.GetGroupName();
var stream = Fixture.GetStreamName();
var bufferCount = 10;
var eventWriteCount = bufferCount * 2;
int eventReceivedCount = 0;

TaskCompletionSource<bool> eventsReceived = new();

var events = Fixture.CreateTestEvents(eventWriteCount).ToArray();

foreach (var e in events)
await Fixture.Streams.AppendToStreamAsync(stream, StreamState.Any, [e]);

await Fixture.Subscriptions.CreateToAllAsync(
group,
new(startFrom: Position.Start, resolveLinkTos: true),
userCredentials: TestCredentials.Root
);

using var subscription = await Fixture.Subscriptions.SubscribeToAllAsync(
group,
async (subscription, e, retryCount, ct) => {
await subscription.Ack(e);

if (e.OriginalStreamId.StartsWith("test-")
&& Interlocked.Increment(ref eventReceivedCount) == events.Length)
eventsReceived.TrySetResult(true);
},
(s, r, e) => {
if (e != null)
eventsReceived.TrySetException(e);
},
bufferSize: bufferCount,
userCredentials: TestCredentials.Root
);

await eventsReceived.Task.WithTimeout();
}
// [RetryFact]
// public async Task happy_case_catching_up_to_link_to_events_manual_ack() {
// var group = Fixture.GetGroupName();
// var bufferCount = 10;
// var eventWriteCount = bufferCount * 2;
// TaskCompletionSource<bool> eventsReceived = new();
// int eventReceivedCount = 0;
//
// var events = Fixture.CreateTestEvents(eventWriteCount)
// .Select(
// (e, i) => new EventData(
// e.EventId,
// SystemEventTypes.LinkTo,
// Encoding.UTF8.GetBytes($"{i}@test"),
// contentType: Constants.Metadata.ContentTypes.ApplicationOctetStream
// )
// )
// .ToArray();
//
// foreach (var e in events) {
// await Fixture.Streams.AppendToStreamAsync("test-" + Guid.NewGuid(), StreamState.Any, new[] { e });
// }
//
// await Fixture.Subscriptions.CreateToAllAsync(
// group,
// new(startFrom: Position.Start, resolveLinkTos: true),
// userCredentials: TestCredentials.Root
// );
//
// using var subscription = await Fixture.Subscriptions.SubscribeToAllAsync(
// group,
// async (subscription, e, retryCount, ct) => {
// await subscription.Ack(e);
//
// if (e.OriginalStreamId.StartsWith("test-")
// && Interlocked.Increment(ref eventReceivedCount) == events.Length)
// eventsReceived.TrySetResult(true);
// },
// (s, r, e) => {
// if (e != null)
// eventsReceived.TrySetException(e);
// },
// bufferSize: bufferCount,
// userCredentials: TestCredentials.Root
// );
//
// await eventsReceived.Task.WithTimeout();
// }
//
// [RetryFact]
// public async Task happy_case_catching_up_to_normal_events_manual_ack() {
// var group = Fixture.GetGroupName();
// var stream = Fixture.GetStreamName();
// var bufferCount = 10;
// var eventWriteCount = bufferCount * 2;
// int eventReceivedCount = 0;
//
// TaskCompletionSource<bool> eventsReceived = new();
//
// var events = Fixture.CreateTestEvents(eventWriteCount).ToArray();
//
// foreach (var e in events)
// await Fixture.Streams.AppendToStreamAsync(stream, StreamState.Any, [e]);
//
// await Fixture.Subscriptions.CreateToAllAsync(
// group,
// new(startFrom: Position.Start, resolveLinkTos: true),
// userCredentials: TestCredentials.Root
// );
//
// using var subscription = await Fixture.Subscriptions.SubscribeToAllAsync(
// group,
// async (subscription, e, retryCount, ct) => {
// await subscription.Ack(e);
//
// if (e.OriginalStreamId.StartsWith("test-")
// && Interlocked.Increment(ref eventReceivedCount) == events.Length)
// eventsReceived.TrySetResult(true);
// },
// (s, r, e) => {
// if (e != null)
// eventsReceived.TrySetException(e);
// },
// bufferSize: bufferCount,
// userCredentials: TestCredentials.Root
// );
//
// await eventsReceived.Task.WithTimeout();
// }

[RetryFact]
public async Task happy_case_writing_and_subscribing_to_normal_events_manual_ack() {
Expand Down

0 comments on commit 55d1547

Please sign in to comment.