From 824f6cd1e0e27b12b9159405eb47ee0c899b5db7 Mon Sep 17 00:00:00 2001 From: Simon Novak Date: Thu, 5 Oct 2023 18:12:47 +0200 Subject: [PATCH 1/4] fix: Updated CassandraRemindersTable.cs - Default Scheduler --- .../Provider/CassandraRemindersTable.cs | 46 +++++++++++++------ 1 file changed, 31 insertions(+), 15 deletions(-) diff --git a/src/Reminders/Cassandra/Provider/CassandraRemindersTable.cs b/src/Reminders/Cassandra/Provider/CassandraRemindersTable.cs index 3413ded..2ddb56c 100644 --- a/src/Reminders/Cassandra/Provider/CassandraRemindersTable.cs +++ b/src/Reminders/Cassandra/Provider/CassandraRemindersTable.cs @@ -60,11 +60,15 @@ public CassandraRemindersTable( /// public async Task Init() { - _session = await _cluster.ConnectAsync(string.Empty); + _session = await _cluster + .ConnectAsync(string.Empty) + .ConfigureAwait(false); _session.CreateKeyspaceIfNotExists(_clientOptions.DefaultKeyspace); _session.ChangeKeyspace(_clientOptions.DefaultKeyspace); - await InitializeDatabase(); - await InitializeStatements(); + await InitializeDatabase() + .ConfigureAwait(false); + await InitializeStatements() + .ConfigureAwait(false); } /// @@ -72,7 +76,8 @@ public async Task ReadRows(GrainId grainId) { var (type, id) = GenerateDatabaseIds(grainId); var resultSet = await Execute(() => - _session!.ExecuteAsync(_readRowsStatement!.Bind(type, id))); + _session!.ExecuteAsync(_readRowsStatement!.Bind(type, id))) + .ConfigureAwait(false); if (resultSet.GetAvailableWithoutFetching() == 0) { return new ReminderTableData(); @@ -91,7 +96,8 @@ public async Task ReadRows(uint begin, uint end) var bigEnd = Convert.ToInt64(end); var resultSet = await Execute(() => - _session!.ExecuteAsync(_readRowsHashStatement!.Bind(bigBegin, bigEnd))); + _session!.ExecuteAsync(_readRowsHashStatement!.Bind(bigBegin, bigEnd))) + .ConfigureAwait(false); if (resultSet.GetAvailableWithoutFetching() == 0) { return new ReminderTableData(); @@ -109,7 +115,8 @@ public async Task ReadRow(GrainId grainId, string reminderName) ArgumentNullException.ThrowIfNull(reminderName); var (type, id) = GenerateDatabaseIds(grainId); var resultSet = await Execute(() => - _session!.ExecuteAsync(_readRowStatement!.Bind(type, id, reminderName))); + _session!.ExecuteAsync(_readRowStatement!.Bind(type, id, reminderName))) + .ConfigureAwait(false); if (resultSet.GetAvailableWithoutFetching() == 0) { return new ReminderEntry(); @@ -137,7 +144,8 @@ await Execute(() => Convert.ToInt64(entry.GrainId.GetUniformHashCode()), new DateTimeOffset(entry.StartAt), entry.Period.Ticks, - etag))); + etag))) + .ConfigureAwait(false); return etag; } @@ -157,14 +165,17 @@ DELETE FROM "{RemindersTableName}" type, id, reminderName, - eTag))); + eTag))) + .ConfigureAwait(false); return resultSet.GetAvailableWithoutFetching() > 0; } /// - public Task TestOnlyClearTable() + public async Task TestOnlyClearTable() { - return Execute(() => _session!.ExecuteAsync(new SimpleStatement($"""DELETE FROM "{RemindersTableName}";"""))); + await Execute(() => + _session!.ExecuteAsync(new SimpleStatement($"""DELETE FROM "{RemindersTableName}";"""))) + .ConfigureAwait(false); } private static (byte[] Type, byte[] Id) GenerateDatabaseIds(GrainId grainId) @@ -180,7 +191,8 @@ private static GrainId BuildGrainId(byte[] type, byte[] id) private async Task InitializeDatabase() { await Execute(() => new Table(_session, _mapping, RemindersTableName) - .CreateIfNotExistsAsync()); + .CreateIfNotExistsAsync()) + .ConfigureAwait(false); } private async Task InitializeStatements() @@ -192,7 +204,8 @@ private async Task InitializeStatements() FROM "{RemindersTableName}" WHERE type = ? AND id = ? ALLOW FILTERING - """)); + """)) + .ConfigureAwait(false); _readRowsHashStatement = await Execute(() => _session!.PrepareAsync( $""" @@ -200,20 +213,23 @@ ALLOW FILTERING FROM "{RemindersTableName}" WHERE hash > ? AND hash <= ? ALLOW FILTERING - """)); + """)) + .ConfigureAwait(false); _readRowStatement = await Execute(() => _session!.PrepareAsync( $""" SELECT type, id, name, start_on, period, etag FROM "{RemindersTableName}" WHERE type = ? AND id = ? AND name = ? - """)); + """)) + .ConfigureAwait(false); _upsertRowStatement = await Execute(() => _session!.PrepareAsync( $""" INSERT INTO "{RemindersTableName}" (type, id, name, hash, start_on, period, etag) VALUES (?, ?, ?, ?, ?, ?, ?) - """)); + """)) + .ConfigureAwait(false); } /// From f01fb41259d2cceeac8964df492b206a465a94c7 Mon Sep 17 00:00:00 2001 From: Simon Novak Date: Thu, 5 Oct 2023 18:13:18 +0200 Subject: [PATCH 2/4] fix: Updated GrainStorageBase.cs - Default Scheduler --- src/Persistence/Cassandra/Storage/GrainStorageBase.cs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/Persistence/Cassandra/Storage/GrainStorageBase.cs b/src/Persistence/Cassandra/Storage/GrainStorageBase.cs index 58a98ef..77faa6e 100644 --- a/src/Persistence/Cassandra/Storage/GrainStorageBase.cs +++ b/src/Persistence/Cassandra/Storage/GrainStorageBase.cs @@ -88,7 +88,9 @@ public void Dispose() protected virtual async Task Initialize(CancellationToken cancellationToken) { LogInitialize(_name, _clientOptions.DefaultKeyspace!); - Session = await _cluster.ConnectAsync(string.Empty); + Session = await _cluster + .ConnectAsync(string.Empty) + .ConfigureAwait(false); Session.CreateKeyspaceIfNotExists(_clientOptions.DefaultKeyspace); Session.ChangeKeyspace(_clientOptions.DefaultKeyspace); LogConnect(_name, _clientOptions.DefaultKeyspace!); From b44e996a47eb46e1e2b560c30ba101e9c34b0449 Mon Sep 17 00:00:00 2001 From: Simon Novak Date: Thu, 5 Oct 2023 18:16:23 +0200 Subject: [PATCH 3/4] fix: Updated SessionContextProviderBase.cs - Default Scheduler on missing tasks --- .../Provider/SessionContextProviderBase.cs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/Clustering/Cassandra/Provider/SessionContextProviderBase.cs b/src/Clustering/Cassandra/Provider/SessionContextProviderBase.cs index 1265a4d..44d02c9 100644 --- a/src/Clustering/Cassandra/Provider/SessionContextProviderBase.cs +++ b/src/Clustering/Cassandra/Provider/SessionContextProviderBase.cs @@ -226,7 +226,8 @@ protected async Task Insert(MembershipEntry entry, TableVersion tableVersi { ArgumentNullException.ThrowIfNull(entry); ArgumentNullException.ThrowIfNull(tableVersion); - var silos = await Execute(ReadMany); + var silos = await Execute(ReadMany) + .ConfigureAwait(false); var currentEntry = silos.Members.FirstOrDefault(w => w.Item1.SiloAddress.ToParsableString() == entry.SiloAddress.ToParsableString()); @@ -333,7 +334,8 @@ await Execute(() => _mapper! /// A representing the result of the asynchronous operation. protected async Task> GetGatewayList() { - var membershipTableData = await Execute(ReadMany); + var membershipTableData = await Execute(ReadMany) + .ConfigureAwait(false); return membershipTableData .Members .Where(w => w.Item1.Status == SiloStatus.Active) @@ -501,9 +503,15 @@ private async Task InitializeDatabaseAsync() var sqlSiloType = await siloTypeReader.ReadToEndAsync().ConfigureAwait(false); var sqlMembershipTable = await membershipTableReader.ReadToEndAsync().ConfigureAwait(false); - await _session!.ExecuteAsync(new SimpleStatement(sqlSuspectTimesType)); - await _session!.ExecuteAsync(new SimpleStatement(sqlSiloType)); - await _session!.ExecuteAsync(new SimpleStatement(sqlMembershipTable)); + await Execute(() => + _session!.ExecuteAsync(new SimpleStatement(sqlSuspectTimesType))) + .ConfigureAwait(false); + await Execute(() => + _session!.ExecuteAsync(new SimpleStatement(sqlSiloType))) + .ConfigureAwait(false); + await Execute(() => + _session!.ExecuteAsync(new SimpleStatement(sqlMembershipTable))) + .ConfigureAwait(false); } [LoggerMessage( From 1c8dac02285e4f4f0368022c79573398a98cb035 Mon Sep 17 00:00:00 2001 From: Simon Novak Date: Thu, 5 Oct 2023 18:19:03 +0200 Subject: [PATCH 4/4] fix: Updated SingleTableGrainStorage.cs - Default Scheduler --- .../Storage/SingleTableGrainStorage.cs | 22 +++++++++++++------ 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/src/Persistence/Cassandra/Storage/SingleTableGrainStorage.cs b/src/Persistence/Cassandra/Storage/SingleTableGrainStorage.cs index b637d3a..77b45d8 100644 --- a/src/Persistence/Cassandra/Storage/SingleTableGrainStorage.cs +++ b/src/Persistence/Cassandra/Storage/SingleTableGrainStorage.cs @@ -85,7 +85,9 @@ protected override async Task Initialize(CancellationToken cancellationToken) var storage = new Table(Session, _mappingConfiguration, _storageOptions.TableNameOrPrefix); - await Execute(() => storage.CreateIfNotExistsAsync()); + await Execute(() => + storage.CreateIfNotExistsAsync()) + .ConfigureAwait(false); _readStatement = await Execute( @@ -94,21 +96,24 @@ await Execute( SELECT name, type, id, state, etag FROM "{_storageOptions.TableNameOrPrefix}" WHERE name = ? AND type = ? AND id = ? - """)); + """)) + .ConfigureAwait(false); _writeStatement = await Execute( () => Session!.PrepareAsync( $""" INSERT INTO "{_storageOptions.TableNameOrPrefix}" (name, type, id, state, etag) VALUES (?, ?, ?, ?, ?) - """)); + """)) + .ConfigureAwait(false); _clearStatement = await Execute( () => Session!.PrepareAsync( $""" DELETE FROM "{_storageOptions.TableNameOrPrefix}" WHERE name = ? AND type = ? AND id = ? - """)); + """)) + .ConfigureAwait(false); } private async Task ReadStateInternalAsync(string stateName, GrainId grainId, IGrainState grainState) @@ -117,7 +122,8 @@ private async Task ReadStateInternalAsync(string stateName, GrainId grainId, var type = GenerateTypeName(stateName, grainId); var id = GenerateId(stateName, grainId); var results = await Execute(() => - Session!.ExecuteAsync(_readStatement!.Bind(name, type, id))); + Session!.ExecuteAsync(_readStatement!.Bind(name, type, id))) + .ConfigureAwait(false); if (results.GetAvailableWithoutFetching() == 0) { @@ -142,7 +148,8 @@ private async Task WriteStateInternalAsync(string stateName, GrainId grainId, var state = _storageOptions.GrainStorageSerializer!.Serialize(grainState.State).ToArray(); var etag = grainState.ETag; await Execute(() => - Session!.ExecuteAsync(_writeStatement!.Bind(name, type, id, state, etag))); + Session!.ExecuteAsync(_writeStatement!.Bind(name, type, id, state, etag))) + .ConfigureAwait(false); } private async Task ClearStateInternalAsync(string stateName, GrainId grainId, IGrainState grainState) @@ -156,7 +163,8 @@ private async Task ClearStateInternalAsync(string stateName, GrainId grainId, var type = GenerateTypeName(stateName, grainId); var id = GenerateId(stateName, grainId); var results = await Execute(() => - Session!.ExecuteAsync(_clearStatement!.Bind(name, type, id))); + Session!.ExecuteAsync(_clearStatement!.Bind(name, type, id))) + .ConfigureAwait(false); if (results.GetAvailableWithoutFetching() != 0) {