Skip to content

Commit

Permalink
release 2.0.1
Browse files Browse the repository at this point in the history
release 2.0.1
  • Loading branch information
yang-xiaodong authored Sep 16, 2017
2 parents 4fb1e08 + e7d9998 commit d0d8b40
Show file tree
Hide file tree
Showing 19 changed files with 121 additions and 102 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,7 @@ bin/
/.idea
Properties
/pack.bat
/src/DotNetCore.CAP/project.json
/src/DotNetCore.CAP/packages.config
/src/DotNetCore.CAP/DotNetCore.CAP.Net47.csproj
/NuGet.config
2 changes: 1 addition & 1 deletion build/version.props
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<PropertyGroup>
<VersionMajor>2</VersionMajor>
<VersionMinor>0</VersionMinor>
<VersionPatch>0</VersionPatch>
<VersionPatch>1</VersionPatch>
<VersionQuality></VersionQuality>
<VersionPrefix>$(VersionMajor).$(VersionMinor).$(VersionPatch)</VersionPrefix>
</PropertyGroup>
Expand Down
31 changes: 20 additions & 11 deletions samples/Sample.RabbitMQ.SqlServer/Program.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.IO;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
Expand All @@ -7,22 +8,30 @@ namespace Sample.RabbitMQ.SqlServer
{
public class Program
{

//var config = new ConfigurationBuilder()
// .AddCommandLine(args)
// .AddEnvironmentVariables("ASPNETCORE_")
// .Build();

//var host = new WebHostBuilder()
// .UseConfiguration(config)
// .UseKestrel()
// .UseContentRoot(Directory.GetCurrentDirectory())
// .UseIISIntegration()
// .UseStartup<Startup>()
// .Build();

//host.Run();
public static void Main(string[] args)
{
var config = new ConfigurationBuilder()
.AddCommandLine(args)
.AddEnvironmentVariables("ASPNETCORE_")
.Build();
BuildWebHost(args).Run();
}

var host = new WebHostBuilder()
.UseConfiguration(config)
.UseKestrel()
.UseContentRoot(Directory.GetCurrentDirectory())
.UseIISIntegration()
public static IWebHost BuildWebHost(string[] args) =>
WebHost.CreateDefaultBuilder(args)
.UseStartup<Startup>()
.Build();

host.Run();
}
}
}
2 changes: 1 addition & 1 deletion src/DotNetCore.CAP.Kafka/CAP.KafkaCapOptionsExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public void AddServices(IServiceCollection services)
services.AddSingleton(kafkaOptions);

services.AddSingleton<IConsumerClientFactory, KafkaConsumerClientFactory>();
services.AddTransient<IQueueExecutor, PublishQueueExecutor>();
services.AddSingleton<IQueueExecutor, PublishQueueExecutor>();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ public void AddServices(IServiceCollection services)
services.AddSingleton<IConsumerClientFactory, RabbitMQConsumerClientFactory>();

services.AddSingleton<ConnectionPool>();
services.AddScoped(x => x.GetService<ConnectionPool>().Rent());

services.AddTransient<IQueueExecutor, PublishQueueExecutor>();
services.AddSingleton<IQueueExecutor, PublishQueueExecutor>();
}
}
}
2 changes: 1 addition & 1 deletion src/DotNetCore.CAP.RabbitMQ/ConnectionPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace DotNetCore.CAP.RabbitMQ
{
public class ConnectionPool : IConnectionPool, IDisposable
{
private const int DefaultPoolSize = 32;
private const int DefaultPoolSize = 15;

private readonly ConcurrentQueue<IConnection> _pool = new ConcurrentQueue<IConnection>();

Expand Down
14 changes: 10 additions & 4 deletions src/DotNetCore.CAP.RabbitMQ/PublishQueueExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,29 @@ namespace DotNetCore.CAP.RabbitMQ
internal sealed class PublishQueueExecutor : BasePublishQueueExecutor
{
private readonly ILogger _logger;
private readonly IConnection _connection;
private readonly ConnectionPool _connectionPool;
private readonly RabbitMQOptions _rabbitMQOptions;

public PublishQueueExecutor(
CapOptions options,
IStateChanger stateChanger,
IConnection connection,
ConnectionPool connectionPool,
RabbitMQOptions rabbitMQOptions,
ILogger<PublishQueueExecutor> logger)
: base(options, stateChanger, logger)
{
_logger = logger;
_connection = connection;
_connectionPool = connectionPool;
_rabbitMQOptions = rabbitMQOptions;
}

public override Task<OperateResult> PublishAsync(string keyName, string content)
{
var connection = _connectionPool.Rent();

try
{
using (var channel = _connection.CreateModel())
using (var channel = connection.CreateModel())
{
var body = Encoding.UTF8.GetBytes(content);

Expand All @@ -55,6 +57,10 @@ public override Task<OperateResult> PublishAsync(string keyName, string content)
Description = ex.Message
}));
}
finally
{
_connectionPool.Return(connection);
}
}
}
}
13 changes: 8 additions & 5 deletions src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ internal sealed class RabbitMQConsumerClient : IConsumerClient
private readonly string _queueName;
private readonly RabbitMQOptions _rabbitMQOptions;

private IConnection _connection;
private ConnectionPool _connectionPool;
private IModel _channel;
private ulong _deliveryTag;

Expand All @@ -23,11 +23,11 @@ internal sealed class RabbitMQConsumerClient : IConsumerClient
public event EventHandler<string> OnError;

public RabbitMQConsumerClient(string queueName,
IConnection connection,
ConnectionPool connectionPool,
RabbitMQOptions options)
{
_queueName = queueName;
_connection = connection;
_connectionPool = connectionPool;
_rabbitMQOptions = options;
_exchageName = options.TopicExchangeName;

Expand All @@ -36,7 +36,9 @@ public RabbitMQConsumerClient(string queueName,

private void InitClient()
{
_channel = _connection.CreateModel();
var connection = _connectionPool.Rent();

_channel = connection.CreateModel();

_channel.ExchangeDeclare(
exchange: _exchageName,
Expand All @@ -49,6 +51,8 @@ private void InitClient()
exclusive: false,
autoDelete: false,
arguments: arguments);

_connectionPool.Return(connection);
}

public void Subscribe(IEnumerable<string> topics)
Expand Down Expand Up @@ -81,7 +85,6 @@ public void Commit()
public void Dispose()
{
_channel.Dispose();
_connection.Dispose();
}

private void OnConsumerReceived(object sender, BasicDeliverEventArgs e)
Expand Down
8 changes: 4 additions & 4 deletions src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClientFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@ namespace DotNetCore.CAP.RabbitMQ
internal sealed class RabbitMQConsumerClientFactory : IConsumerClientFactory
{
private readonly RabbitMQOptions _rabbitMQOptions;
private readonly IConnection _connection;
private readonly ConnectionPool _connectionPool;


public RabbitMQConsumerClientFactory(RabbitMQOptions rabbitMQOptions, IConnection connection)
public RabbitMQConsumerClientFactory(RabbitMQOptions rabbitMQOptions, ConnectionPool pool)
{
_rabbitMQOptions = rabbitMQOptions;
_connection = connection;
_connectionPool = pool;
}

public IConsumerClient Create(string groupId)
{
return new RabbitMQConsumerClient(groupId, _connection, _rabbitMQOptions);
return new RabbitMQConsumerClient(groupId, _connectionPool, _rabbitMQOptions);
}
}
}
18 changes: 13 additions & 5 deletions src/DotNetCore.CAP.SqlServer/CAP.SqlServerCapOptionsExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@ public SqlServerCapOptionsExtension(Action<SqlServerOptions> configure)
public void AddServices(IServiceCollection services)
{
services.AddSingleton<IStorage, SqlServerStorage>();
services.AddScoped<IStorageConnection, SqlServerStorageConnection>();
services.AddScoped<ICapPublisher, CapPublisher>();
services.AddSingleton<IStorageConnection, SqlServerStorageConnection>();
services.AddTransient<ICapPublisher, CapPublisher>();
services.AddTransient<ICallbackPublisher, CapPublisher>();
services.AddTransient<IAdditionalProcessor, DefaultAdditionalProcessor>();
AddSqlServerOptions(services);
}

private void AddSqlServerOptions(IServiceCollection services)
{
var sqlServerOptions = new SqlServerOptions();

_configure(sqlServerOptions);
Expand All @@ -32,9 +36,13 @@ public void AddServices(IServiceCollection services)
{
services.AddSingleton(x =>
{
var dbContext = (DbContext)x.GetService(sqlServerOptions.DbContextType);
sqlServerOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
return sqlServerOptions;
using (var scope = x.CreateScope())
{
var provider = scope.ServiceProvider;
var dbContext = (DbContext)provider.GetService(sqlServerOptions.DbContextType);
sqlServerOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
return sqlServerOptions;
}
});
}
else
Expand Down
5 changes: 2 additions & 3 deletions src/DotNetCore.CAP/Abstractions/IConsumerServiceSelector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@ public interface IConsumerServiceSelector
/// <summary>
/// Selects a set of <see cref="ConsumerExecutorDescriptor"/> candidates for the current message associated with
/// <paramref name="provider"/>.
/// </summary>
/// <param name="provider"> <see cref="IServiceProvider"/>.</param>
/// </summary>
/// <returns>A set of <see cref="ConsumerExecutorDescriptor"/> candidates or <c>null</c>.</returns>
IReadOnlyList<ConsumerExecutorDescriptor> SelectCandidates(IServiceProvider provider);
IReadOnlyList<ConsumerExecutorDescriptor> SelectCandidates();

/// <summary>
/// Selects the best <see cref="ConsumerExecutorDescriptor"/> candidate from <paramref name="candidates"/> for the
Expand Down
9 changes: 0 additions & 9 deletions src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,6 @@ private static void AddSubscribeServices(IServiceCollection services)
{
services.AddTransient(service.Key, service.Value);
}

var types = Assembly.GetEntryAssembly().ExportedTypes;
foreach (var type in types)
{
if (Helper.IsController(type.GetTypeInfo()))
{
services.AddTransient(typeof(object), type);
}
}
}
}
}
7 changes: 2 additions & 5 deletions src/DotNetCore.CAP/IBootstrapper.Default.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,13 @@ public DefaultBootstrapper(
IOptions<CapOptions> options,
IStorage storage,
IApplicationLifetime appLifetime,
IServiceProvider provider)
IEnumerable<IProcessingServer> servers)
{
_logger = logger;
_appLifetime = appLifetime;
Options = options.Value;
Storage = storage;
Provider = provider;
Servers = Provider.GetServices<IProcessingServer>();
Servers = servers;

_cts = new CancellationTokenSource();
_ctsRegistration = appLifetime.ApplicationStopping.Register(() =>
Expand All @@ -55,8 +54,6 @@ public DefaultBootstrapper(

protected IEnumerable<IProcessingServer> Servers { get; }

public IServiceProvider Provider { get; private set; }

public Task BootstrapAsync()
{
return (_bootstrappingTask = BootstrapTaskAsync());
Expand Down
2 changes: 1 addition & 1 deletion src/DotNetCore.CAP/IConsumerHandler.Default.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public ConsumerHandler(

public void Start()
{
var groupingMatchs = _selector.GetCandidatesMethodsOfGroupNameGrouped(_serviceProvider);
var groupingMatchs = _selector.GetCandidatesMethodsOfGroupNameGrouped();

foreach (var matchGroup in groupingMatchs)
{
Expand Down
13 changes: 5 additions & 8 deletions src/DotNetCore.CAP/Internal/ConsumerInvokerFactory.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System;
using DotNetCore.CAP.Abstractions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace DotNetCore.CAP.Internal
Expand All @@ -23,15 +22,13 @@ public ConsumerInvokerFactory(

public IConsumerInvoker CreateInvoker(ConsumerContext consumerContext)
{
using (var scope = _serviceProvider.CreateScope())
var context = new ConsumerInvokerContext(consumerContext)
{
var context = new ConsumerInvokerContext(consumerContext)
{
Result = new DefaultConsumerInvoker(_logger, scope.ServiceProvider, _modelBinderFactory, consumerContext)
};
Result = new DefaultConsumerInvoker(_logger, _serviceProvider,
_modelBinderFactory, consumerContext)
};

return context.Result;
}
return context.Result;
}
}
}
36 changes: 20 additions & 16 deletions src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,29 @@ public async Task InvokeAsync()
{
_logger.LogDebug("Executing consumer Topic: {0}", _consumerContext.ConsumerDescriptor.MethodInfo.Name);

var obj = ActivatorUtilities.GetServiceOrCreateInstance(_serviceProvider,
_consumerContext.ConsumerDescriptor.ImplTypeInfo.AsType());
using (var scope = _serviceProvider.CreateScope())
{
var provider = scope.ServiceProvider;
var serviceType = _consumerContext.ConsumerDescriptor.ImplTypeInfo.AsType();
var obj = ActivatorUtilities.GetServiceOrCreateInstance(provider, serviceType);

var jsonConent = _consumerContext.DeliverMessage.Content;
var message = Helper.FromJson<Message>(jsonConent);
var jsonConent = _consumerContext.DeliverMessage.Content;
var message = Helper.FromJson<Message>(jsonConent);

object result = null;
if (_executor.MethodParameters.Length > 0)
{
result = await ExecuteWithParameterAsync(obj, message.Content.ToString());
}
else
{
result = await ExecuteAsync(obj);
}
object result = null;
if (_executor.MethodParameters.Length > 0)
{
result = await ExecuteWithParameterAsync(obj, message.Content.ToString());
}
else
{
result = await ExecuteAsync(obj);
}

if (!string.IsNullOrEmpty(message.CallbackName))
{
await SentCallbackMessage(message.Id, message.CallbackName, result);
if (!string.IsNullOrEmpty(message.CallbackName))
{
await SentCallbackMessage(message.Id, message.CallbackName, result);
}
}
}

Expand Down
Loading

0 comments on commit d0d8b40

Please sign in to comment.