Skip to content

Commit

Permalink
Merge pull request #4 from snovak7/main
Browse files Browse the repository at this point in the history
Task scheduler fixes
  • Loading branch information
snovak7 authored Oct 5, 2023
2 parents af8e58d + 62e8968 commit a71b412
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 28 deletions.
18 changes: 13 additions & 5 deletions src/Clustering/Cassandra/Provider/SessionContextProviderBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,8 @@ protected async Task<bool> Insert(MembershipEntry entry, TableVersion tableVersi
{
ArgumentNullException.ThrowIfNull(entry);
ArgumentNullException.ThrowIfNull(tableVersion);
var silos = await Execute(ReadMany);
var silos = await Execute(ReadMany)
.ConfigureAwait(false);

var currentEntry = silos.Members.FirstOrDefault(w => w.Item1.SiloAddress.ToParsableString() == entry.SiloAddress.ToParsableString());

Expand Down Expand Up @@ -333,7 +334,8 @@ await Execute(() => _mapper!
/// <returns>A <see cref="Task{TResult}"/> representing the result of the asynchronous operation.</returns>
protected async Task<IList<Uri>> GetGatewayList()
{
var membershipTableData = await Execute(ReadMany);
var membershipTableData = await Execute(ReadMany)
.ConfigureAwait(false);
return membershipTableData
.Members
.Where(w => w.Item1.Status == SiloStatus.Active)
Expand Down Expand Up @@ -501,9 +503,15 @@ private async Task InitializeDatabaseAsync()
var sqlSiloType = await siloTypeReader.ReadToEndAsync().ConfigureAwait(false);
var sqlMembershipTable = await membershipTableReader.ReadToEndAsync().ConfigureAwait(false);

await _session!.ExecuteAsync(new SimpleStatement(sqlSuspectTimesType));
await _session!.ExecuteAsync(new SimpleStatement(sqlSiloType));
await _session!.ExecuteAsync(new SimpleStatement(sqlMembershipTable));
await Execute(() =>
_session!.ExecuteAsync(new SimpleStatement(sqlSuspectTimesType)))
.ConfigureAwait(false);
await Execute(() =>
_session!.ExecuteAsync(new SimpleStatement(sqlSiloType)))
.ConfigureAwait(false);
await Execute(() =>
_session!.ExecuteAsync(new SimpleStatement(sqlMembershipTable)))
.ConfigureAwait(false);
}

[LoggerMessage(
Expand Down
4 changes: 3 additions & 1 deletion src/Persistence/Cassandra/Storage/GrainStorageBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ public void Dispose()
protected virtual async Task Initialize(CancellationToken cancellationToken)
{
LogInitialize(_name, _clientOptions.DefaultKeyspace!);
Session = await _cluster.ConnectAsync(string.Empty);
Session = await _cluster
.ConnectAsync(string.Empty)
.ConfigureAwait(false);
Session.CreateKeyspaceIfNotExists(_clientOptions.DefaultKeyspace);
Session.ChangeKeyspace(_clientOptions.DefaultKeyspace);
LogConnect(_name, _clientOptions.DefaultKeyspace!);
Expand Down
22 changes: 15 additions & 7 deletions src/Persistence/Cassandra/Storage/SingleTableGrainStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ protected override async Task Initialize(CancellationToken cancellationToken)

var storage =
new Table<SingleGrainStorageTable>(Session, _mappingConfiguration, _storageOptions.TableNameOrPrefix);
await Execute(() => storage.CreateIfNotExistsAsync());
await Execute(() =>
storage.CreateIfNotExistsAsync())
.ConfigureAwait(false);

_readStatement =
await Execute(
Expand All @@ -94,21 +96,24 @@ await Execute(
SELECT name, type, id, state, etag
FROM "{_storageOptions.TableNameOrPrefix}"
WHERE name = ? AND type = ? AND id = ?
"""));
"""))
.ConfigureAwait(false);
_writeStatement =
await Execute(
() => Session!.PrepareAsync(
$"""
INSERT INTO "{_storageOptions.TableNameOrPrefix}" (name, type, id, state, etag)
VALUES (?, ?, ?, ?, ?)
"""));
"""))
.ConfigureAwait(false);
_clearStatement =
await Execute(
() => Session!.PrepareAsync(
$"""
DELETE FROM "{_storageOptions.TableNameOrPrefix}"
WHERE name = ? AND type = ? AND id = ?
"""));
"""))
.ConfigureAwait(false);
}

private async Task ReadStateInternalAsync<T>(string stateName, GrainId grainId, IGrainState<T> grainState)
Expand All @@ -117,7 +122,8 @@ private async Task ReadStateInternalAsync<T>(string stateName, GrainId grainId,
var type = GenerateTypeName<T>(stateName, grainId);
var id = GenerateId<T>(stateName, grainId);
var results = await Execute(() =>
Session!.ExecuteAsync(_readStatement!.Bind(name, type, id)));
Session!.ExecuteAsync(_readStatement!.Bind(name, type, id)))
.ConfigureAwait(false);

if (results.GetAvailableWithoutFetching() == 0)
{
Expand All @@ -142,7 +148,8 @@ private async Task WriteStateInternalAsync<T>(string stateName, GrainId grainId,
var state = _storageOptions.GrainStorageSerializer!.Serialize(grainState.State).ToArray();
var etag = grainState.ETag;
await Execute(() =>
Session!.ExecuteAsync(_writeStatement!.Bind(name, type, id, state, etag)));
Session!.ExecuteAsync(_writeStatement!.Bind(name, type, id, state, etag)))
.ConfigureAwait(false);
}

private async Task ClearStateInternalAsync<T>(string stateName, GrainId grainId, IGrainState<T> grainState)
Expand All @@ -156,7 +163,8 @@ private async Task ClearStateInternalAsync<T>(string stateName, GrainId grainId,
var type = GenerateTypeName<T>(stateName, grainId);
var id = GenerateId<T>(stateName, grainId);
var results = await Execute(() =>
Session!.ExecuteAsync(_clearStatement!.Bind(name, type, id)));
Session!.ExecuteAsync(_clearStatement!.Bind(name, type, id)))
.ConfigureAwait(false);

if (results.GetAvailableWithoutFetching() != 0)
{
Expand Down
46 changes: 31 additions & 15 deletions src/Reminders/Cassandra/Provider/CassandraRemindersTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,24 @@ public CassandraRemindersTable(
/// <inheritdoc/>
public async Task Init()
{
_session = await _cluster.ConnectAsync(string.Empty);
_session = await _cluster
.ConnectAsync(string.Empty)
.ConfigureAwait(false);
_session.CreateKeyspaceIfNotExists(_clientOptions.DefaultKeyspace);
_session.ChangeKeyspace(_clientOptions.DefaultKeyspace);
await InitializeDatabase();
await InitializeStatements();
await InitializeDatabase()
.ConfigureAwait(false);
await InitializeStatements()
.ConfigureAwait(false);
}

/// <inheritdoc/>
public async Task<ReminderTableData> ReadRows(GrainId grainId)
{
var (type, id) = GenerateDatabaseIds(grainId);
var resultSet = await Execute(() =>
_session!.ExecuteAsync(_readRowsStatement!.Bind(type, id)));
_session!.ExecuteAsync(_readRowsStatement!.Bind(type, id)))
.ConfigureAwait(false);
if (resultSet.GetAvailableWithoutFetching() == 0)
{
return new ReminderTableData();
Expand All @@ -91,7 +96,8 @@ public async Task<ReminderTableData> ReadRows(uint begin, uint end)
var bigEnd = Convert.ToInt64(end);

var resultSet = await Execute(() =>
_session!.ExecuteAsync(_readRowsHashStatement!.Bind(bigBegin, bigEnd)));
_session!.ExecuteAsync(_readRowsHashStatement!.Bind(bigBegin, bigEnd)))
.ConfigureAwait(false);
if (resultSet.GetAvailableWithoutFetching() == 0)
{
return new ReminderTableData();
Expand All @@ -109,7 +115,8 @@ public async Task<ReminderEntry> ReadRow(GrainId grainId, string reminderName)
ArgumentNullException.ThrowIfNull(reminderName);
var (type, id) = GenerateDatabaseIds(grainId);
var resultSet = await Execute(() =>
_session!.ExecuteAsync(_readRowStatement!.Bind(type, id, reminderName)));
_session!.ExecuteAsync(_readRowStatement!.Bind(type, id, reminderName)))
.ConfigureAwait(false);
if (resultSet.GetAvailableWithoutFetching() == 0)
{
return new ReminderEntry();
Expand Down Expand Up @@ -137,7 +144,8 @@ await Execute(() =>
Convert.ToInt64(entry.GrainId.GetUniformHashCode()),
new DateTimeOffset(entry.StartAt),
entry.Period.Ticks,
etag)));
etag)))
.ConfigureAwait(false);
return etag;
}

Expand All @@ -157,14 +165,17 @@ DELETE FROM "{RemindersTableName}"
type,
id,
reminderName,
eTag)));
eTag)))
.ConfigureAwait(false);
return resultSet.GetAvailableWithoutFetching() > 0;
}

/// <inheritdoc/>
public Task TestOnlyClearTable()
public async Task TestOnlyClearTable()
{
return Execute(() => _session!.ExecuteAsync(new SimpleStatement($"""DELETE FROM "{RemindersTableName}";""")));
await Execute(() =>
_session!.ExecuteAsync(new SimpleStatement($"""DELETE FROM "{RemindersTableName}";""")))
.ConfigureAwait(false);
}

private static (byte[] Type, byte[] Id) GenerateDatabaseIds(GrainId grainId)
Expand All @@ -180,7 +191,8 @@ private static GrainId BuildGrainId(byte[] type, byte[] id)
private async Task InitializeDatabase()
{
await Execute(() => new Table<Reminder>(_session, _mapping, RemindersTableName)
.CreateIfNotExistsAsync());
.CreateIfNotExistsAsync())
.ConfigureAwait(false);
}

private async Task InitializeStatements()
Expand All @@ -192,28 +204,32 @@ private async Task InitializeStatements()
FROM "{RemindersTableName}"
WHERE type = ? AND id = ?
ALLOW FILTERING
"""));
"""))
.ConfigureAwait(false);
_readRowsHashStatement = await Execute(() =>
_session!.PrepareAsync(
$"""
SELECT type, id, name, start_on, period, etag
FROM "{RemindersTableName}"
WHERE hash > ? AND hash <= ?
ALLOW FILTERING
"""));
"""))
.ConfigureAwait(false);
_readRowStatement = await Execute(() =>
_session!.PrepareAsync(
$"""
SELECT type, id, name, start_on, period, etag
FROM "{RemindersTableName}"
WHERE type = ? AND id = ? AND name = ?
"""));
"""))
.ConfigureAwait(false);
_upsertRowStatement = await Execute(() =>
_session!.PrepareAsync(
$"""
INSERT INTO "{RemindersTableName}" (type, id, name, hash, start_on, period, etag)
VALUES (?, ?, ?, ?, ?, ?, ?)
"""));
"""))
.ConfigureAwait(false);
}

/// <summary>
Expand Down

0 comments on commit a71b412

Please sign in to comment.