diff --git a/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs b/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs index 351360567..bc07e6b1d 100644 --- a/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs +++ b/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs @@ -18,7 +18,6 @@ namespace DotNetCore.CAP.Processor; public class Dispatcher : IDispatcher { - private CancellationTokenSource? _tasksCts; private readonly CancellationTokenSource _delayCts = new(); private readonly ISubscribeExecutor _executor; private readonly ILogger _logger; @@ -28,16 +27,15 @@ public class Dispatcher : IDispatcher private readonly PriorityQueue _schedulerQueue; private readonly bool _enableParallelExecute; private readonly bool _enableParallelSend; + private readonly int _pChannelSize; + private CancellationTokenSource? _tasksCts; private Channel _publishedChannel = default!; private Channel<(MediumMessage, ConsumerExecutorDescriptor?)> _receivedChannel = default!; private long _nextSendTime = DateTime.MaxValue.Ticks; - public Dispatcher(ILogger logger, - IMessageSender sender, - IOptions options, - ISubscribeExecutor executor, - IDataStorage storage) + public Dispatcher(ILogger logger, IMessageSender sender, IOptions options, + ISubscribeExecutor executor, IDataStorage storage) { _logger = logger; _sender = sender; @@ -47,6 +45,7 @@ public Dispatcher(ILogger logger, _storage = storage; _enableParallelExecute = options.Value.EnableSubscriberParallelExecute; _enableParallelSend = options.Value.EnablePublishParallelSend; + _pChannelSize = Environment.ProcessorCount * 500; } public async Task Start(CancellationToken stoppingToken) @@ -55,14 +54,13 @@ public async Task Start(CancellationToken stoppingToken) _tasksCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, CancellationToken.None); _tasksCts.Token.Register(() => _delayCts.Cancel()); - _publishedChannel = Channel.CreateBounded( - new BoundedChannelOptions(5000) - { - AllowSynchronousContinuations = true, - SingleReader = true, - SingleWriter = true, - FullMode = BoundedChannelFullMode.Wait - }); + _publishedChannel = Channel.CreateBounded(new BoundedChannelOptions(_pChannelSize) + { + AllowSynchronousContinuations = true, + SingleReader = true, + SingleWriter = true, + FullMode = BoundedChannelFullMode.Wait + }); await Task.Run(Sending, _tasksCts.Token).ConfigureAwait(false); //here return valuetask @@ -207,28 +205,45 @@ private async ValueTask Sending() try { while (await _publishedChannel.Reader.WaitToReadAsync(_tasksCts!.Token).ConfigureAwait(false)) - while (_publishedChannel.Reader.TryRead(out var message)) - try + { + if (_enableParallelSend) + { + var tasks = new List(); + var batchSize = _pChannelSize / 50; + for (var i = 0; i < batchSize && _publishedChannel.Reader.TryRead(out var message); i++) { var item = message; - if (_enableParallelSend) + tasks.Add(Task.Run(async () => { - _ = Task.Run(async () => + try { var result = await _sender.SendAsync(item).ConfigureAwait(false); if (!result.Succeeded) _logger.MessagePublishException(item.Origin.GetId(), result.ToString(), result.Exception); - }); + } + catch (Exception ex) + { + _logger.LogError(ex, $"An exception occurred when sending a message to the transport. Id:{message.DbId}"); + } + })); + } + + await Task.WhenAll(tasks); + } + else + { + while (_publishedChannel.Reader.TryRead(out var message)) + try + { + var result = await _sender.SendAsync(message).ConfigureAwait(false); + if (!result.Succeeded) _logger.MessagePublishException(message.Origin.GetId(), result.ToString(), result.Exception); } - else + catch (Exception ex) { - var result = await _sender.SendAsync(item).ConfigureAwait(false); - if (!result.Succeeded) _logger.MessagePublishException(item.Origin.GetId(), result.ToString(), result.Exception); + _logger.LogError(ex, $"An exception occurred when sending a message to the transport. Id:{message.DbId}"); } - } - catch (Exception ex) - { - _logger.LogError(ex, "An exception occurred when sending a message to the transport. Id:{MessageId}", message.DbId); - } + } + } + } catch (OperationCanceledException) {