Skip to content

Commit

Permalink
Merge pull request #2 from berdon/berdon/memory-leak-and-tests
Browse files Browse the repository at this point in the history
Tests, caching multiplexers, fixing memory leak
  • Loading branch information
berdon authored Sep 21, 2018
2 parents c9fb7b5 + 9f9e025 commit 950003f
Show file tree
Hide file tree
Showing 38 changed files with 1,556 additions and 147 deletions.
15 changes: 11 additions & 4 deletions Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<!-- Versioning properties -->
<PropertyGroup>
<AssemblyVersion>1.0.0.0</AssemblyVersion>
<VersionPrefix Condition=" '$(VersionPrefix)'=='' ">2.0.4</VersionPrefix>
<VersionPrefix Condition=" '$(VersionPrefix)'=='' ">2.0.6</VersionPrefix>
</PropertyGroup>

<!-- For Debug builds generated a date/time dependent version suffix -->
Expand All @@ -23,6 +23,7 @@

<PropertyGroup>
<AutoBogus_Moq>1.0.0</AutoBogus_Moq>
<Microsoft_Extensions_DependencyInjection>2.1.1</Microsoft_Extensions_DependencyInjection>
<dotnet_xunit>2.3.1</dotnet_xunit>
<FluentAssertions>4.19.4</FluentAssertions>
<Microsoft_NET_Test_Sdk>15.6.0</Microsoft_NET_Test_Sdk>
Expand All @@ -35,12 +36,18 @@
<Microsoft_Orleans_Server>2.0.4</Microsoft_Orleans_Server>
<Microsoft_Orleans_Runtime_Legacy>2.0.4</Microsoft_Orleans_Runtime_Legacy>
<Microsoft_Orleans_TestingHost>2.0.4</Microsoft_Orleans_TestingHost>
<Moq>4.7.10</Moq>
<Moq>4.10.0</Moq>
<Newtonsoft_Json>10.0.3</Newtonsoft_Json>
<Nito_AsyncEx_Tasks>5.0.0-pre-02</Nito_AsyncEx_Tasks>
<Serilog>2.5.0</Serilog>
<StackExchange_Redis_StrongName>1.2.6</StackExchange_Redis_StrongName>
<Serilog>2.7.1</Serilog>
<!--
Can't upgrade until the latest Orleans Code Generator is finished.
See https://github.com/dotnet/orleans/issues/4956
https://github.com/dotnet/orleans/pull/4934
-->
<StackExchange_Redis>1.2.6</StackExchange_Redis>
<xunit>2.3.1</xunit>
<xunit_categories>2.0.3</xunit_categories>
<xunit_runner_visualstudio>2.3.1</xunit_runner_visualstudio>
<Zuercher_Orleans_Testing_Utils>1.0.2-dev</Zuercher_Orleans_Testing_Utils>
</PropertyGroup>
Expand Down
36 changes: 36 additions & 0 deletions Orleans.Providers.Redis.sln
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.Persistence.Redis",
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.Redis.Common", "src\Orleans.Redis.Common\Orleans.Redis.Common.csproj", "{2E425139-CBEF-431D-84AC-708762032BE4}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "StorageTests", "test\StorageTests\StorageTests.csproj", "{39A25FE8-4978-4966-8D55-6752485F16CD}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "StreamingTests", "test\StreamProviderTests\StreamingTests.csproj", "{5C397A12-6CAB-46CB-8D67-5BA475376419}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Shared", "test\Shared\Shared.csproj", "{507EB8BD-6341-46C9-A4DF-E64C414DCBBC}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "StorageTests.GrainInterfaces", "test\StorageTests.GrainInterfaces\StorageTests.GrainInterfaces.csproj", "{EE1D1C02-6809-47FF-A903-AB34BEA6344B}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -46,6 +54,30 @@ Global
{2E425139-CBEF-431D-84AC-708762032BE4}.Release|Any CPU.Build.0 = Release|Any CPU
{2E425139-CBEF-431D-84AC-708762032BE4}.Test|Any CPU.ActiveCfg = Test|Any CPU
{2E425139-CBEF-431D-84AC-708762032BE4}.Test|Any CPU.Build.0 = Test|Any CPU
{39A25FE8-4978-4966-8D55-6752485F16CD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{39A25FE8-4978-4966-8D55-6752485F16CD}.Debug|Any CPU.Build.0 = Debug|Any CPU
{39A25FE8-4978-4966-8D55-6752485F16CD}.Release|Any CPU.ActiveCfg = Release|Any CPU
{39A25FE8-4978-4966-8D55-6752485F16CD}.Release|Any CPU.Build.0 = Release|Any CPU
{39A25FE8-4978-4966-8D55-6752485F16CD}.Test|Any CPU.ActiveCfg = Debug|Any CPU
{39A25FE8-4978-4966-8D55-6752485F16CD}.Test|Any CPU.Build.0 = Debug|Any CPU
{5C397A12-6CAB-46CB-8D67-5BA475376419}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{5C397A12-6CAB-46CB-8D67-5BA475376419}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5C397A12-6CAB-46CB-8D67-5BA475376419}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5C397A12-6CAB-46CB-8D67-5BA475376419}.Release|Any CPU.Build.0 = Release|Any CPU
{5C397A12-6CAB-46CB-8D67-5BA475376419}.Test|Any CPU.ActiveCfg = Debug|Any CPU
{5C397A12-6CAB-46CB-8D67-5BA475376419}.Test|Any CPU.Build.0 = Debug|Any CPU
{507EB8BD-6341-46C9-A4DF-E64C414DCBBC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{507EB8BD-6341-46C9-A4DF-E64C414DCBBC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{507EB8BD-6341-46C9-A4DF-E64C414DCBBC}.Release|Any CPU.ActiveCfg = Release|Any CPU
{507EB8BD-6341-46C9-A4DF-E64C414DCBBC}.Release|Any CPU.Build.0 = Release|Any CPU
{507EB8BD-6341-46C9-A4DF-E64C414DCBBC}.Test|Any CPU.ActiveCfg = Debug|Any CPU
{507EB8BD-6341-46C9-A4DF-E64C414DCBBC}.Test|Any CPU.Build.0 = Debug|Any CPU
{EE1D1C02-6809-47FF-A903-AB34BEA6344B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{EE1D1C02-6809-47FF-A903-AB34BEA6344B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{EE1D1C02-6809-47FF-A903-AB34BEA6344B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{EE1D1C02-6809-47FF-A903-AB34BEA6344B}.Release|Any CPU.Build.0 = Release|Any CPU
{EE1D1C02-6809-47FF-A903-AB34BEA6344B}.Test|Any CPU.ActiveCfg = Debug|Any CPU
{EE1D1C02-6809-47FF-A903-AB34BEA6344B}.Test|Any CPU.Build.0 = Debug|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -55,6 +87,10 @@ Global
{22109A4D-8E78-4E67-9921-5C61DC42851F} = {217DCEEF-7C5B-47F1-A148-E01B7061DB07}
{0E16DE16-EFEC-4780-AAC2-B90D83CBC3DC} = {CF37789C-0A7A-4949-9D98-1B3FFC547415}
{2E425139-CBEF-431D-84AC-708762032BE4} = {CF37789C-0A7A-4949-9D98-1B3FFC547415}
{39A25FE8-4978-4966-8D55-6752485F16CD} = {217DCEEF-7C5B-47F1-A148-E01B7061DB07}
{5C397A12-6CAB-46CB-8D67-5BA475376419} = {217DCEEF-7C5B-47F1-A148-E01B7061DB07}
{507EB8BD-6341-46C9-A4DF-E64C414DCBBC} = {217DCEEF-7C5B-47F1-A148-E01B7061DB07}
{EE1D1C02-6809-47FF-A903-AB34BEA6344B} = {217DCEEF-7C5B-47F1-A148-E01B7061DB07}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {AD5DBB80-3BA1-4CF0-8B91-71F183EEA3B3}
Expand Down
46 changes: 46 additions & 0 deletions src/Orleans.Redis.Common/CachedConnectionMultiplexerFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using StackExchange.Redis;

namespace Orleans.Redis.Common
{
/// <summary>
/// Caches connections based on the configuration string and reuses them
/// if able.
/// </summary>
public class CachedConnectionMultiplexerFactory : IConnectionMultiplexerFactory
{
public static IConnectionMultiplexerFactory Default = new CachedConnectionMultiplexerFactory();

private readonly SemaphoreSlim _lock = new SemaphoreSlim(1, 1);
private readonly Dictionary<string, IConnectionMultiplexer> _connectionMultiplexers = new Dictionary<string, IConnectionMultiplexer>();
internal Dictionary<string, IConnectionMultiplexer> TestHook_ConnectionMultiplexers => _connectionMultiplexers;

private CachedConnectionMultiplexerFactory() { }

public async Task<IConnectionMultiplexer> CreateAsync(string configuration)
{
if (!_connectionMultiplexers.TryGetValue(configuration, out var connectionMultiplexer))
{
try
{
await _lock.WaitAsync();

if (!_connectionMultiplexers.TryGetValue(configuration, out connectionMultiplexer))
{
connectionMultiplexer = await ConnectionMultiplexer.ConnectAsync(configuration);
_connectionMultiplexers.Add(configuration, connectionMultiplexer);
}
}
finally
{
_lock.Release();
}
}

return connectionMultiplexer;
}
}
}
24 changes: 24 additions & 0 deletions src/Orleans.Redis.Common/ConnectionMultiplexerFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace Orleans.Redis.Common
{
/// <summary>
/// You probably don't want this factory implementation. This does not pool
/// or attempt to reuse connection multiplexers.
/// </summary>
internal class ConnectionMultiplexerFactory : IConnectionMultiplexerFactory
{
public static ConnectionMultiplexerFactory Default = new ConnectionMultiplexerFactory();

private ConnectionMultiplexerFactory() { }

public async Task<IConnectionMultiplexer> CreateAsync(string configuration)
{
return await ConnectionMultiplexer.ConnectAsync(configuration);
}
}
}
10 changes: 10 additions & 0 deletions src/Orleans.Redis.Common/IConnectionMultiplexerFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using StackExchange.Redis;
using System.Threading.Tasks;

namespace Orleans.Redis.Common
{
public interface IConnectionMultiplexerFactory
{
Task<IConnectionMultiplexer> CreateAsync(string configuration);
}
}
1 change: 1 addition & 0 deletions src/Orleans.Redis.Common/Orleans.Redis.Common.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@
<ItemGroup>
<PackageReference Include="Microsoft.Orleans.Core" Version="$(Microsoft_Orleans_Core)" />
<PackageReference Include="Serilog" Version="$(Serilog)" />
<PackageReference Include="StackExchange.Redis.StrongName" Version="$(StackExchange_Redis)" />
</ItemGroup>
</Project>
4 changes: 4 additions & 0 deletions src/Orleans.Redis.Common/Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("StreamingTests")]
[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")]
12 changes: 12 additions & 0 deletions src/Orleans.Redis.Common/SilentLogger.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using Serilog;
using System;
using System.Collections.Generic;
using System.Text;

namespace Orleans.Redis.Common
{
public class SilentLogger
{
public static readonly ILogger Logger = new LoggerConfiguration().CreateLogger();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ internal static IServiceCollection AddRedisGrainStorage(this IServiceCollection
{
configureOptions?.Invoke(services.AddOptions<RedisGrainStorageOptions>(name));

services.TryAddSingleton(CachedConnectionMultiplexerFactory.Default);
services.TryAddSingleton<ISerializationManager, OrleansSerializationManager>();
services.AddTransient<IConfigurationValidator>(sp => new RedisGrainStorageOptionsValidator(sp.GetService<IOptionsSnapshot<RedisGrainStorageOptions>>().Get(name), name));
services.ConfigureNamedOptionForLogging<RedisGrainStorageOptions>(name);
Expand Down
2 changes: 1 addition & 1 deletion src/Orleans.Storage.Redis/Orleans.Persistence.Redis.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<PackageReference Include="Microsoft.Orleans.Core" Version="$(Microsoft_Orleans_Core)" />
<PackageReference Include="Microsoft.Orleans.Server" Version="$(Microsoft_Orleans_Server)" />
<PackageReference Include="Serilog" Version="$(Serilog)" />
<PackageReference Include="StackExchange.Redis.StrongName" Version="$(StackExchange_Redis_StrongName)" />
<PackageReference Include="StackExchange.Redis.StrongName" Version="$(StackExchange_Redis)" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public RedisGrainStorage(string name, RedisGrainStorageOptions options, ILogger
{
_name = name;
_options = options;
_logger = logger;
_logger = logger ?? SilentLogger.Logger;
_serializationManager = serializationManager;
_clusterOptions = clusterOptions.Value;
}
Expand Down
9 changes: 6 additions & 3 deletions src/Orleans.Streaming.Redis/Orleans.Streaming.Redis.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
<DefineConstants>TRACE;TEST</DefineConstants>
<DebugType>full</DebugType>
<DebugSymbols>true</DebugSymbols>
<NoWarn>1701;1702;IDE0016</NoWarn>
</PropertyGroup>

<ItemGroup>
Expand All @@ -28,12 +29,14 @@
<ItemGroup>
<PackageReference Include="Microsoft.Orleans.Client" Version="$(Microsoft_Orleans_Client)" />
<PackageReference Include="Microsoft.Orleans.Server" Version="$(Microsoft_Orleans_Server)" />
<PackageReference Include="Microsoft.Orleans.OrleansCodeGenerator.Build" Version="$(Microsoft_Orleans_OrleansCodeGenerator_Build)" />
<PackageReference Include="Microsoft.Orleans.Core.Legacy" Version="$(Microsoft_Orleans_Core_Legacy)" />
<PackageReference Include="Microsoft.Orleans.Runtime.Legacy" Version="$(Microsoft_Orleans_Runtime_Legacy)" />
<PackageReference Include="Nito.AsyncEx.Tasks" Version="$(Nito_AsyncEx_Tasks)" />
<PackageReference Include="Serilog" Version="$(Serilog)" />
<PackageReference Include="StackExchange.Redis.StrongName" Version="$(StackExchange_Redis_StrongName)" />
<PackageReference Include="Zuercher.Orleans.Redis.Common" Version="2.0.4-dev7" />
<PackageReference Include="StackExchange.Redis.StrongName" Version="$(StackExchange_Redis)" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Orleans.Redis.Common\Orleans.Redis.Common.csproj" />
</ItemGroup>
</Project>
4 changes: 4 additions & 0 deletions src/Orleans.Streaming.Redis/Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("StreamingTests")]
[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")]
23 changes: 16 additions & 7 deletions src/Orleans.Streaming.Redis/Providers/Streams/RedisQueueAdapter.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Orleans.Configuration;
using Orleans.Redis.Common;
using Orleans.Streaming.Redis.Storage;
using Orleans.Streams;
using Serilog;
Expand All @@ -13,11 +14,12 @@ namespace Orleans.Providers.Streams.Redis
{
internal class RedisQueueAdapter : IQueueAdapter
{
protected readonly string ServiceId;
protected readonly ConcurrentDictionary<QueueId, RedisDataManager> Queues = new ConcurrentDictionary<QueueId, RedisDataManager>();
private readonly string ServiceId;
private readonly ConcurrentDictionary<QueueId, RedisDataManager> Queues = new ConcurrentDictionary<QueueId, RedisDataManager>();

private readonly RedisStreamOptions _redisStreamOptions;
private readonly HashRingBasedStreamQueueMapper _streamQueueMapper;
private readonly IConnectionMultiplexerFactory _connectionMultiplexerFactory;
private readonly IStreamQueueMapper _streamQueueMapper;
private readonly ILogger _logger;
private readonly IRedisDataAdapter _dataAdapter;

Expand All @@ -28,30 +30,37 @@ internal class RedisQueueAdapter : IQueueAdapter

public RedisQueueAdapter(
RedisStreamOptions options,
IConnectionMultiplexerFactory connectionMultiplexerFactory,
IRedisDataAdapter dataAdapter,
HashRingBasedStreamQueueMapper streamQueueMapper,
IStreamQueueMapper streamQueueMapper,
ILogger logger,
string serviceId,
string providerName)
{
if (options == null) throw new ArgumentNullException(nameof(options));
if (connectionMultiplexerFactory == null) throw new ArgumentNullException(nameof(connectionMultiplexerFactory));
if (dataAdapter == null) throw new ArgumentNullException(nameof(dataAdapter));
if (streamQueueMapper == null) throw new ArgumentNullException(nameof(streamQueueMapper));
if (string.IsNullOrEmpty(serviceId)) throw new ArgumentNullException(nameof(serviceId));
if (string.IsNullOrEmpty(providerName)) throw new ArgumentNullException(nameof(providerName));

_redisStreamOptions = options;
_connectionMultiplexerFactory = connectionMultiplexerFactory;
ServiceId = serviceId;
Name = providerName;
_streamQueueMapper = streamQueueMapper;
_dataAdapter = dataAdapter;
_logger = logger;
_logger = (logger ?? SilentLogger.Logger).ForContext<RedisQueueAdapter>();
}

public IQueueAdapterReceiver CreateReceiver(QueueId queueId)
{
return RedisAdapterReceiver.Create(
return RedisQueueAdapterReceiver.Create(
_logger,
queueId,
ServiceId,
_redisStreamOptions,
_connectionMultiplexerFactory,
_dataAdapter);
}

Expand All @@ -66,7 +75,7 @@ public async Task QueueMessageBatchAsync<T>(Guid streamGuid, string streamNamesp

if (!Queues.TryGetValue(queueId, out var queue))
{
var tmpQueue = new RedisDataManager(_redisStreamOptions, _logger, queueId.ToString(), ServiceId);
var tmpQueue = new RedisDataManager(_redisStreamOptions, _connectionMultiplexerFactory, _logger, queueId.ToString(), ServiceId);
await tmpQueue.InitAsync();
queue = Queues.GetOrAdd(queueId, tmpQueue);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ public class RedisQueueAdapterFactory : IQueueAdapterFactory
{
private readonly string _providerName;
private readonly RedisStreamOptions _options;
private readonly IConnectionMultiplexerFactory _connectionMultiplexerFactory;
private readonly ClusterOptions _clusterOptions;
private readonly ILogger _logger;
private readonly IRedisDataAdapter _dataAdapter;
private readonly HashRingBasedStreamQueueMapper _streamQueueMapper;
private readonly IStreamQueueMapper _streamQueueMapper;
private readonly IQueueAdapterCache _adapterCache;

/// <summary>
Expand All @@ -30,24 +31,35 @@ public class RedisQueueAdapterFactory : IQueueAdapterFactory
public RedisQueueAdapterFactory(
string name,
RedisStreamOptions options,
IConnectionMultiplexerFactory connectionMultiplexerFactory,
HashRingStreamQueueMapperOptions queueMapperOptions,
SimpleQueueCacheOptions cacheOptions,
IServiceProvider serviceProvider,
IOptions<ClusterOptions> clusterOptions,
IRedisDataAdapter dataAdapter,
ILogger logger,
ISerializationManager serializationManager,
IServiceProvider provider)
ISerializationManager serializationManager)
{
if (string.IsNullOrEmpty(name)) throw new ArgumentNullException(nameof(name));
if (options == null) throw new ArgumentNullException(nameof(options));
if (connectionMultiplexerFactory == null) throw new ArgumentNullException(nameof(connectionMultiplexerFactory));
if (queueMapperOptions == null) throw new ArgumentNullException(nameof(queueMapperOptions));
if (cacheOptions == null) throw new ArgumentNullException(nameof(cacheOptions));
if (serviceProvider == null) throw new ArgumentNullException(nameof(serviceProvider));
if (clusterOptions == null) throw new ArgumentNullException(nameof(clusterOptions));
if (dataAdapter == null) throw new ArgumentNullException(nameof(dataAdapter));
if (serializationManager == null) throw new ArgumentNullException(nameof(serializationManager));

_providerName = name;
_options = options ?? throw new ArgumentNullException(nameof(options));
_options = options;
_connectionMultiplexerFactory = connectionMultiplexerFactory;
_clusterOptions = clusterOptions.Value;
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_logger = logger != null ? logger.ForContext<RedisQueueAdapterFactory>() : SilentLogger.Logger;
_dataAdapter = dataAdapter;

_streamQueueMapper = new HashRingBasedStreamQueueMapper(queueMapperOptions, _providerName);

var microsoftLoggerFactory = provider.GetService<Microsoft.Extensions.Logging.ILoggerFactory>();
var microsoftLoggerFactory = serviceProvider.GetService<Microsoft.Extensions.Logging.ILoggerFactory>();
_adapterCache = new SimpleQueueAdapterCache(cacheOptions, _providerName, microsoftLoggerFactory);
}

Expand All @@ -61,6 +73,7 @@ public Task<IQueueAdapter> CreateAdapter()
{
var adapter = new RedisQueueAdapter(
_options,
_connectionMultiplexerFactory,
_dataAdapter,
_streamQueueMapper,
_logger,
Expand Down
Loading

0 comments on commit 950003f

Please sign in to comment.