Skip to content

Commit

Permalink
Merge pull request #5 from berdon/berdon/disposing-and-sharing
Browse files Browse the repository at this point in the history
Addressing connection disposing (#3) and sharing (#4)
  • Loading branch information
berdon authored Sep 25, 2018
2 parents 3ceca97 + 8aa90ad commit b21ea3e
Show file tree
Hide file tree
Showing 9 changed files with 56 additions and 62 deletions.
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<!-- Versioning properties -->
<PropertyGroup>
<AssemblyVersion>1.0.0.0</AssemblyVersion>
<VersionPrefix Condition=" '$(VersionPrefix)'=='' ">2.0.6</VersionPrefix>
<VersionPrefix Condition=" '$(VersionPrefix)'=='' ">2.0.7</VersionPrefix>
</PropertyGroup>

<!-- For Debug builds generated a date/time dependent version suffix -->
Expand Down
24 changes: 0 additions & 24 deletions src/Orleans.Redis.Common/ConnectionMultiplexerFactory.cs

This file was deleted.

6 changes: 6 additions & 0 deletions src/Orleans.Redis.Common/IConnectionMultiplexerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ namespace Orleans.Redis.Common
{
public interface IConnectionMultiplexerFactory
{
/// <summary>
/// Call to create a new (or yield a previously created) <see cref="IConnectionMultiplexer"/>.
/// Callers should not manually dispose the connections and instead call <see cref="ReleaseAsync(IConnectionMultiplexer)"/>.
/// </summary>
/// <param name="configuration"></param>
/// <returns></returns>
Task<IConnectionMultiplexer> CreateAsync(string configuration);
}
}
11 changes: 6 additions & 5 deletions src/Orleans.Storage.Redis/Providers/Storage/RedisGrainStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,26 +35,27 @@ public class RedisGrainStorage : IGrainStorage, ILifecycleParticipant<ISiloLifec
NullValueHandling = NullValueHandling.Ignore
};

private ConnectionMultiplexer _redisConnection;
private IDatabase _redisClient => _redisConnection.GetDatabase();
private IConnectionMultiplexerFactory _connectionMultiplexerFactory;
private IConnectionMultiplexer _connectionMultiplexer;
private IDatabase _redisClient => _connectionMultiplexer.GetDatabase();

public RedisGrainStorage(string name, RedisGrainStorageOptions options, ILogger logger, IOptions<ClusterOptions> clusterOptions, ISerializationManager serializationManager)
public RedisGrainStorage(string name, RedisGrainStorageOptions options, ILogger logger, IOptions<ClusterOptions> clusterOptions, ISerializationManager serializationManager, IConnectionMultiplexerFactory connectionMultiplexerFactory)
{
_name = name;
_options = options;
_logger = logger ?? SilentLogger.Logger;
_serializationManager = serializationManager;
_clusterOptions = clusterOptions.Value;
_connectionMultiplexerFactory = connectionMultiplexerFactory;
}

public async Task Init(CancellationToken ct)
{
_redisConnection = await ConnectionMultiplexer.ConnectAsync(_options.ConnectionString);
_connectionMultiplexer = await _connectionMultiplexerFactory.CreateAsync(_options.ConnectionString);
}

public Task Close(CancellationToken ct)
{
_redisConnection.Dispose();
return Task.CompletedTask;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public async Task Shutdown(TimeSpan timeout)

if (_queue != null)
{
await _queue.StopAsync(cts.Token);
await _queue.UnsubscribeAsync(cts.Token);
}

// Remember that we shut down so we never try to read from the queue again.
Expand Down
2 changes: 1 addition & 1 deletion src/Orleans.Streaming.Redis/Storage/IRedisDataManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ internal interface IRedisDataManager
Task DeleteQueueMessage(RedisValue value);
Task<IEnumerable<RedisValue>> GetQueueMessagesAsync(int count);
Task InitAsync(CancellationToken ct = default);
Task StopAsync(CancellationToken ct = default);
Task SubscribeAsync(CancellationToken ct = default);
Task UnsubscribeAsync(CancellationToken ct = default);
}
}
23 changes: 11 additions & 12 deletions src/Orleans.Streaming.Redis/Storage/RedisDataManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ internal class RedisDataManager : IRedisDataManager

private static readonly IEnumerable<RedisValue> EmptyRedisValueEnumerable = Array.Empty<RedisValue>();

private bool _initialized = false;
private IConnectionMultiplexer _connectionMultiplexer;

internal ConcurrentQueue<RedisValue> TestHook_Queue => _queue;
Expand All @@ -59,11 +58,11 @@ public async Task InitAsync(CancellationToken ct = default)

try
{
if (ct.IsCancellationRequested) throw new TaskCanceledException();

_connectionMultiplexer = await _connectionMultiplexerFactory.CreateAsync(_options.ConnectionString);

if (ct.IsCancellationRequested) throw new TaskCanceledException();

_initialized = true;
}
catch (Exception exc)
{
Expand All @@ -81,6 +80,8 @@ public async Task SubscribeAsync(CancellationToken ct = default)

try
{
if (ct.IsCancellationRequested) throw new TaskCanceledException();

var subscription = _connectionMultiplexer.GetSubscriber();
await subscription.SubscribeAsync(_redisChannel, OnChannelReceivedData);

Expand All @@ -96,24 +97,18 @@ public async Task SubscribeAsync(CancellationToken ct = default)
}
}

public async Task StopAsync(CancellationToken ct = default)
public async Task UnsubscribeAsync(CancellationToken ct = default)
{
var startTime = DateTimeOffset.UtcNow;

if (!_initialized) throw new InvalidOperationException("Cannot call StopAsync before InitAsync");

try
{
if (ct.IsCancellationRequested) throw new TaskCanceledException();

var subscription = _connectionMultiplexer.GetSubscriber();
await subscription.UnsubscribeAllAsync();
await subscription.UnsubscribeAsync(_redisChannel, OnChannelReceivedData);

if (ct.IsCancellationRequested) throw new TaskCanceledException();

_connectionMultiplexer.Dispose();

_initialized = false;
}
catch (Exception exc)
{
Expand Down Expand Up @@ -199,7 +194,11 @@ private void CheckAlertSlowAccess(DateTimeOffset startOperation, [CallerMemberNa
private void OnChannelReceivedData(RedisChannel channel, RedisValue value)
{
_queue.Enqueue(value);
_logger.Debug("{Count}", _queue.Count);

if (_logger.IsEnabled(Serilog.Events.LogEventLevel.Verbose))
{
_logger.Verbose("{Queue} has {Count} messages", QueueName, _queue.Count);
}

// Dequeue until we're below our queue cache limit
while (_queue.Count > _options.QueueCacheSize && _queue.TryDequeue(out var _)) { }
Expand Down
46 changes: 29 additions & 17 deletions test/StreamProviderTests/RedisDataManagerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,38 +83,38 @@ public async Task SubscribeAsyncRespectsCancellationTokenTimeout()
}

[Fact]
public async Task StopAsyncRespectsCancellationTokenTimeout()
public async Task UnsubscribeAsyncRespectsCancellationTokenTimeout()
{
var (connectionMultiplexer, subscriber, rdm) = MockRedisDataManager();

await rdm.InitAsync();

var ct = new CancellationToken(true);
await AssertEx.ThrowsAnyAsync<AggregateException>(() => rdm.StopAsync(ct), e => e.InnerException is TaskCanceledException);
await AssertEx.ThrowsAnyAsync<AggregateException>(() => rdm.UnsubscribeAsync(ct), e => e.InnerException is TaskCanceledException);
}

[Fact]
public async Task StopAsyncWithoutInitAsyncThrowsInvalidOperationException()
{
var (connectionMultiplexer, subscriber, rdm) = MockRedisDataManager();

await AssertEx.ThrowsAnyAsync<InvalidOperationException>(() => rdm.StopAsync());
}

[Fact]
public async Task StopAsyncUnsubscribesFromRedisChannel()
public async Task UnsubscribeAsyncUnsubscribesFromRedisChannel()
{
var (connectionMultiplexer, subscriber, rdm) = MockRedisDataManager();

Action<RedisChannel, RedisValue> subscribedHandler = null;
subscriber
.Setup(x => x.UnsubscribeAllAsync(It.IsAny<CommandFlags>()))
.Setup(x => x.SubscribeAsync(ExpectedChannelName, It.IsAny<Action<RedisChannel, RedisValue>>(), It.IsAny<CommandFlags>()))
.Callback((RedisChannel channel, Action<RedisChannel, RedisValue> handler, CommandFlags commandFlags) =>
{
subscribedHandler = handler;
})
.Returns(Task.CompletedTask);
subscriber
.Setup(x => x.UnsubscribeAsync(ExpectedChannelName, subscribedHandler, It.IsAny<CommandFlags>()))
.Returns(Task.CompletedTask);

await rdm.InitAsync();
await rdm.SubscribeAsync();
await rdm.StopAsync();
await rdm.UnsubscribeAsync();

subscriber.Verify(x => x.UnsubscribeAllAsync(It.IsAny<CommandFlags>()), Times.Once);
subscriber.Verify(x => x.UnsubscribeAsync(ExpectedChannelName, subscribedHandler, It.IsAny<CommandFlags>()), Times.Once);
}

[Fact]
Expand Down Expand Up @@ -151,7 +151,7 @@ public async Task MessagesFromRedisReturnedByGetQueueMessagesAsync()

AssertEx.Equal(expectedMessages, await rdm.GetQueueMessagesAsync(100));

await rdm.StopAsync();
await rdm.UnsubscribeAsync();
}

[Fact]
Expand All @@ -178,7 +178,7 @@ public async Task PublishedMessagesCallToRedisPublishAsync()

AssertEx.Equal(expectedPublishedMessages, actualPublishedMessages);

await rdm.StopAsync();
await rdm.UnsubscribeAsync();
}

/// <summary>
Expand Down Expand Up @@ -232,11 +232,23 @@ public async Task UnsubscribedInstanceDoesntCallRedisSubscribeAsync()
.Returns(Task.CompletedTask);

await rdm.InitAsync();
await rdm.StopAsync();

subscriber.Verify(x => x.SubscribeAsync(ExpectedChannelName, It.IsAny<Action<RedisChannel, RedisValue>>(), It.IsAny<CommandFlags>()), Times.Never());
}

[Fact]
public async Task UnsubscribeAsyncDoesntDisposeConnectionMultiplexer()
{
var (connectionMultiplexer, subscriber, rdm) = MockRedisDataManager();
connectionMultiplexer.Setup(x => x.Dispose());

await rdm.InitAsync();
await rdm.SubscribeAsync();
await rdm.UnsubscribeAsync();

connectionMultiplexer.Verify(x => x.Dispose(), Times.Never());
}

private (Mock<IConnectionMultiplexer> MockConnectionMultiplexer, Mock<ISubscriber> MockSubscriber, RedisDataManager RedisDataManager) MockRedisDataManager(Mock<IConnectionMultiplexerFactory> connectionMultiplexerFactory = null, RedisStreamOptions redisStreamOptions = null)
{
var mockConnectionMultiplexer = new Mock <IConnectionMultiplexer> { DefaultValue = DefaultValue.Mock };
Expand Down
2 changes: 1 addition & 1 deletion test/StreamProviderTests/RedisQueueAdapterReceiverTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public async Task ShutdownRespectsTimeout()
rqar.TestHook_Queue = rdm.Object;

rdm
.Setup(x => x.StopAsync(It.IsAny<CancellationToken>()))
.Setup(x => x.UnsubscribeAsync(It.IsAny<CancellationToken>()))
.Callback((CancellationToken ct) => Thread.Sleep(1))
.Returns(Task.CompletedTask);

Expand Down

0 comments on commit b21ea3e

Please sign in to comment.