Skip to content

Commit

Permalink
Merge pull request #2 from snovak7/main
Browse files Browse the repository at this point in the history
Improvements in Persistence & Reminders
  • Loading branch information
snovak7 authored Oct 4, 2023
2 parents 316d3bd + 4570e43 commit 4f8f755
Show file tree
Hide file tree
Showing 14 changed files with 258 additions and 186 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@
<PackageReference Include="Escendit.Extensions.Hosting.Cassandra" />
<PackageReference Include="Microsoft.Orleans.Runtime"/>
</ItemGroup>
<ItemGroup>
<Folder Include="Mapping\" />
<Folder Include="Schema\" />
</ItemGroup>
<ItemGroup>
<None Pack="true" PackagePath="" Include="README.md"/>
</ItemGroup>
Expand Down
27 changes: 27 additions & 0 deletions src/Persistence/Cassandra/Mapping/SingleGrainStorageMapping.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) Escendit Ltd. All Rights Reserved.
// Licensed under the MIT. See LICENSE.txt file in the solution root for full license information.

namespace Escendit.Orleans.Persistence.Cassandra.Mapping;

using global::Cassandra.Mapping;
using Schema;

/// <summary>
/// Single Grain Storage Mapping.
/// </summary>
public class SingleGrainStorageMapping : Mappings
{
/// <summary>
/// Initializes a new instance of the <see cref="SingleGrainStorageMapping"/> class.
/// </summary>
public SingleGrainStorageMapping()
{
For<SingleGrainStorageTable>()
.PartitionKey("type", "id", "name")
.Column(p => p.Type, cm => cm.WithName("type"))
.Column(p => p.Id, cm => cm.WithName("id"))
.Column(p => p.Name, cm => cm.WithName("name"))
.Column(p => p.State, cm => cm.WithName("state"))
.Column(p => p.Etag, cm => cm.WithName("etag"));
}
}
40 changes: 40 additions & 0 deletions src/Persistence/Cassandra/Schema/SingleGrainStorageTable.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) Escendit Ltd. All Rights Reserved.
// Licensed under the MIT. See LICENSE.txt file in the solution root for full license information.

namespace Escendit.Orleans.Persistence.Cassandra.Schema;

/// <summary>
/// Single Grain Storage Table.
/// </summary>
public class SingleGrainStorageTable
{
/// <summary>
/// Gets or sets the type.
/// </summary>
/// <value>The type.</value>
public byte[] Type { get; set; }

/// <summary>
/// Gets or sets the id.
/// </summary>
/// <value>The id.</value>
public byte[] Id { get; set; }

/// <summary>
/// Gets or sets the name.
/// </summary>
/// <value>The name.</value>
public string Name { get; set; } = default!;

/// <summary>
/// Gets or sets the state.
/// </summary>
/// <value>The state.</value>
public byte[] State { get; set; }

/// <summary>
/// Gets or sets the etag.
/// </summary>
/// <value>The etag.</value>
public string Etag { get; set; } = default!;
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ public static GrainStorageBase Create(IServiceProvider serviceProvider, string n
{
Strategy.SingleTable => ActivatorUtilities
.CreateInstance<SingleTableGrainStorage>(serviceProvider, name, options, connectionOptions),
Strategy.TablePerGrain => throw new NotImplementedException(),
Strategy.TablePerGrain => throw new NotSupportedException(),
null => throw new NotSupportedException(),
_ => throw new ArgumentOutOfRangeException(name),
};
}
Expand Down
14 changes: 10 additions & 4 deletions src/Persistence/Cassandra/Storage/GrainStorageBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -181,14 +181,20 @@ protected Task<T> Execute<T>(Func<Task<T>> action, [CallerMemberName] string act
{
var stopwatch = Stopwatch.StartNew();
var result = action();
stopwatch.Stop();
LogExecute(_name, returnTypeName, selfTypeName, actionName, stopwatch.ElapsedMilliseconds);
result.ContinueWith(
(_, _) =>
{
stopwatch.Stop();
LogExecute(_name, returnTypeName, selfTypeName, actionName, stopwatch.ElapsedMilliseconds);
},
TaskContinuationOptions.OnlyOnRanToCompletion,
TaskScheduler.Default);
return result;
}
catch (Exception ex)
{
LogException(_name, ex, ex.Message, returnTypeName, selfTypeName, actionName);
throw;
throw new CassandraStorageException(ex.Message, ex);
}
}

Expand Down Expand Up @@ -217,7 +223,7 @@ protected Task<T> Execute<T>(Func<Task<T>> action, [CallerMemberName] string act
EventId = 200,
EventName = "Execution",
Level = LogLevel.Debug,
Message = "Executing with client {name} > {returnType} {selfType}.{action} completed in {elapsed}")]
Message = "Executing with client {name} > {returnType} {selfType}.{action} completed in {elapsed}ms")]
private partial void LogExecute(string name, string returnType, string selfType, string action, long elapsed);

[LoggerMessage(
Expand Down
97 changes: 36 additions & 61 deletions src/Persistence/Cassandra/Storage/SingleTableGrainStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,22 @@ namespace Escendit.Orleans.Persistence.Cassandra.Storage;

using Escendit.Extensions.Hosting.Cassandra;
using global::Cassandra;
using global::Cassandra.Data.Linq;
using global::Cassandra.Mapping;
using global::Orleans;
using global::Orleans.Runtime;
using Mapping;
using Microsoft.Extensions.Logging;
using Options;
using Schema;

/// <summary>
/// Single Table Grain Storage.
/// </summary>
internal partial class SingleTableGrainStorage : GrainStorageBase
internal class SingleTableGrainStorage : GrainStorageBase
{
private readonly ILogger _logger;
private readonly CassandraClientOptions _clientOptions;
private readonly CassandraStorageOptions _storageOptions;
private readonly MappingConfiguration _mappingConfiguration;
private PreparedStatement? _readStatement;
private PreparedStatement? _writeStatement;
private PreparedStatement? _clearStatement;
Expand All @@ -40,9 +43,9 @@ public SingleTableGrainStorage(
CassandraStorageOptions storageOptions)
: base(name, logger, serviceProvider.GetRequiredCassandraClient(name), clientOptions)
{
_logger = logger;
_clientOptions = clientOptions;
_storageOptions = storageOptions;
_mappingConfiguration = new MappingConfiguration()
.Define<SingleGrainStorageMapping>();
}

/// <inheritdoc/>
Expand Down Expand Up @@ -80,59 +83,41 @@ protected override async Task Initialize(CancellationToken cancellationToken)
{
await base.Initialize(cancellationToken);

var results = await Execute(
() => Session!
.ExecuteAsync(
new SimpleStatement(
"SELECT table_name FROM system_schema.tables WHERE keyspace_name = ? AND table_name = ? ALLOW FILTERING",
_clientOptions.DefaultKeyspace!,
_storageOptions.TableNameOrPrefix)));

if (results.GetAvailableWithoutFetching() == 0)
{
var result = await Execute(
() => Session!.ExecuteAsync(
new SimpleStatement(
$"""
create table "{_storageOptions.TableNameOrPrefix}" (
name varchar,
type blob,
id blob,
state blob,
etag varchar
primary key ((name, type, id))
)
""")));

if (result.GetAvailableWithoutFetching() != 0)
{
LogTableCreated(_storageOptions.TableNameOrPrefix!);
}
}
var storage =
new Table<SingleGrainStorageTable>(Session, _mappingConfiguration, _storageOptions.TableNameOrPrefix);
await Execute(() => storage.CreateIfNotExistsAsync());

_readStatement =
await Execute(
() => Session!.PrepareAsync(
$"select name, type, id, state, etag, exists from \"{_storageOptions.TableNameOrPrefix}\" where name = :name and type = :type and id = :id ALLOW FILTERING"));
$"""
SELECT name, type, id, state, etag
FROM "{_storageOptions.TableNameOrPrefix}"
WHERE name = ? AND type = ? AND id = ?
"""));
_writeStatement =
await Execute(
() => Session!.PrepareAsync(
$"insert into \"{_storageOptions.TableNameOrPrefix}\" (name, type, id, state, etag) VALUES (:name, :type, :id, :state, :etag)"));
$"""
INSERT INTO "{_storageOptions.TableNameOrPrefix}" (name, type, id, state, etag)
VALUES (?, ?, ?, ?, ?)
"""));
_clearStatement =
await Execute(
() => Session!.PrepareAsync(
$"delete from \"{_storageOptions.TableNameOrPrefix}\" where name = :name and type = :type and id = :id"));
$"""
DELETE FROM "{_storageOptions.TableNameOrPrefix}"
WHERE name = ? AND type = ? AND id = ?
"""));
}

private async Task ReadStateInternalAsync<T>(string stateName, GrainId grainId, IGrainState<T> grainState)
{
var results = await Execute(
() => Session!.ExecuteAsync(_readStatement!.Bind(new
{
id = GenerateId<T>(stateName, grainId),
type = GenerateTypeName<T>(stateName, grainId),
name = GenerateStateName<T>(stateName, grainId),
})));
var name = GenerateStateName<T>(stateName, grainId);
var type = GenerateTypeName<T>(stateName, grainId);
var id = GenerateId<T>(stateName, grainId);
var results = await Execute(() =>
Session!.ExecuteAsync(_readStatement!.Bind(name, type, id)));

if (results.GetAvailableWithoutFetching() == 0)
{
Expand All @@ -156,20 +141,17 @@ private async Task WriteStateInternalAsync<T>(string stateName, GrainId grainId,
var id = GenerateId<T>(stateName, grainId);
var state = _storageOptions.GrainStorageSerializer!.Serialize(grainState.State).ToArray();
var etag = grainState.ETag;
await Execute(
() => Session!.ExecuteAsync(_writeStatement!.Bind(new { name, type, id, state, etag, })));
await Execute(() =>
Session!.ExecuteAsync(_writeStatement!.Bind(name, type, id, state, etag)));
}

private async Task ClearStateInternalAsync<T>(string stateName, GrainId grainId, IGrainState<T> grainState)
{
var results = await
Execute(() => Session!.ExecuteAsync(_clearStatement!.Bind(
new
{
id = GenerateId<T>(stateName, grainId),
type = GenerateTypeName<T>(stateName, grainId),
name = GenerateStateName<T>(stateName, grainId),
})));
var name = GenerateStateName<T>(stateName, grainId);
var type = GenerateTypeName<T>(stateName, grainId);
var id = GenerateId<T>(stateName, grainId);
var results = await Execute(() =>
Session!.ExecuteAsync(_clearStatement!.Bind(name, type, id)));

if (results.GetAvailableWithoutFetching() != 0)
{
Expand All @@ -178,11 +160,4 @@ private async Task ClearStateInternalAsync<T>(string stateName, GrainId grainId,
grainState.RecordExists = default;
}
}

[LoggerMessage(
EventId = 200,
EventName = "SingleTable Created",
Level = LogLevel.Information,
Message = "Strategy SingleTable '{tableName}' created")]
private partial void LogTableCreated(string tableName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
</ItemGroup>
<ItemGroup>
<None Remove="Resources\SQL\1_Reminders.sql"/>
<EmbeddedResource Include="Resources\SQL\1_Reminders.sql"/>
</ItemGroup>
<ItemGroup>
<None Pack="true" PackagePath="" Include="README.md"/>
Expand Down
5 changes: 2 additions & 3 deletions src/Reminders/Cassandra/Mapping/ReminderMapping.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@ public class ReminderMapping : Mappings
public ReminderMapping()
{
For<Reminder>()
.TableName("reminders")
.ClusteringKey("type", "id", "name")
.PartitionKey("type", "id", "name")
.Column(p => p.Type, cm => cm.WithName("type"))
.Column(p => p.Id, cm => cm.WithName("id"))
.Column(p => p.Hash, cm => cm.WithName("hash"))
.Column(p => p.Hash, cm => cm.WithName("hash").WithSecondaryIndex())
.Column(p => p.Name, cm => cm.WithName("name"))
.Column(p => p.StartOn, cm => cm.WithName("start_on"))
.Column(p => p.Period, cm => cm.WithName("period"))
Expand Down
Loading

0 comments on commit 4f8f755

Please sign in to comment.