From 6fbfaea3de9b09f2857a066eaca69f1ab1c66679 Mon Sep 17 00:00:00 2001 From: Savorboard Date: Thu, 8 Jul 2021 17:39:24 +0800 Subject: [PATCH] Improve flow control for message of in memory. #935 --- .../RabbitMQConsumerClient.cs | 11 +++- .../Internal/IConsumerRegister.Default.cs | 5 +- .../Processor/IDispatcher.Default.cs | 61 ++++++++++++++++--- 3 files changed, 66 insertions(+), 11 deletions(-) diff --git a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs index 711519855..4236e0965 100644 --- a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs +++ b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs @@ -81,16 +81,23 @@ public void Listening(TimeSpan timeout, CancellationToken cancellationToken) public void Commit(object sender) { - _channel.BasicAck((ulong)sender, false); + if (_channel.IsOpen) + { + _channel.BasicAck((ulong)sender, false); + } } public void Reject(object sender) { - _channel.BasicReject((ulong)sender, true); + if (_channel.IsOpen) + { + _channel.BasicReject((ulong)sender, true); + } } public void Dispose() { + _channel?.Dispose(); //The connection should not be closed here, because the connection is still in use elsewhere. //_connection?.Dispose(); diff --git a/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs index 6bfacfbc4..4d40380d2 100644 --- a/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs +++ b/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs @@ -165,7 +165,8 @@ public void Pulse() private void RegisterMessageProcessor(IConsumerClient client) { - client.OnMessageReceived += async (sender, transportMessage) => + // Cannot set subscription to asynchronous + client.OnMessageReceived += (sender, transportMessage) => { _logger.MessageReceived(transportMessage.GetId(), transportMessage.GetName()); @@ -193,7 +194,7 @@ private void RegisterMessageProcessor(IConsumerClient client) } var type = executor.Parameters.FirstOrDefault(x => x.IsFromCap == false)?.ParameterType; - message = await _serializer.DeserializeAsync(transportMessage, type); + message = _serializer.DeserializeAsync(transportMessage, type).GetAwaiter().GetResult(); message.RemoveException(); } catch (Exception e) diff --git a/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs b/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs index 500e42368..0b1536002 100644 --- a/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs +++ b/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs @@ -21,12 +21,11 @@ public class Dispatcher : IDispatcher private readonly CapOptions _options; private readonly ISubscribeDispatcher _executor; private readonly ILogger _logger; + private readonly CancellationTokenSource _cts = new CancellationTokenSource(); private Channel _publishedChannel; private Channel<(MediumMessage, ConsumerExecutorDescriptor)> _receivedChannel; - private readonly CancellationTokenSource _cts = new CancellationTokenSource(); - public Dispatcher(ILogger logger, IMessageSender sender, IOptions options, @@ -41,10 +40,26 @@ public Dispatcher(ILogger logger, public void Start(CancellationToken stoppingToken) { stoppingToken.ThrowIfCancellationRequested(); - stoppingToken.Register(() => _cts.Cancel()); + stoppingToken.Register(() => _cts.Cancel()); + + var capacity = _options.ProducerThreadCount * 500; + _publishedChannel = Channel.CreateBounded(new BoundedChannelOptions(capacity > 5000 ? 5000 : capacity) + { + AllowSynchronousContinuations = true, + SingleReader = _options.ProducerThreadCount == 1, + SingleWriter = true, + FullMode = BoundedChannelFullMode.Wait + }); + + capacity = _options.ConsumerThreadCount * 300; + _receivedChannel = Channel.CreateBounded<(MediumMessage, ConsumerExecutorDescriptor)>(new BoundedChannelOptions(capacity > 3000 ? 3000 : capacity) + { + AllowSynchronousContinuations = true, + SingleReader = _options.ConsumerThreadCount == 1, + SingleWriter = true, + FullMode = BoundedChannelFullMode.Wait + }); - _publishedChannel = Channel.CreateUnbounded(new UnboundedChannelOptions() { SingleReader = false, SingleWriter = true }); - _receivedChannel = Channel.CreateUnbounded<(MediumMessage, ConsumerExecutorDescriptor)>(); Task.WhenAll(Enumerable.Range(0, _options.ProducerThreadCount) .Select(_ => Task.Factory.StartNew(() => Sending(stoppingToken), stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default)).ToArray()); @@ -55,12 +70,44 @@ public void Start(CancellationToken stoppingToken) public void EnqueueToPublish(MediumMessage message) { - _publishedChannel.Writer.TryWrite(message); + try + { + if (!_publishedChannel.Writer.TryWrite(message)) + { + while (_publishedChannel.Writer.WaitToWriteAsync(_cts.Token).AsTask().ConfigureAwait(false).GetAwaiter().GetResult()) + { + if (_publishedChannel.Writer.TryWrite(message)) + { + return; + } + } + } + } + catch (OperationCanceledException) + { + //Ignore + } } public void EnqueueToExecute(MediumMessage message, ConsumerExecutorDescriptor descriptor) { - _receivedChannel.Writer.TryWrite((message, descriptor)); + try + { + if (!_receivedChannel.Writer.TryWrite((message, descriptor))) + { + while (_receivedChannel.Writer.WaitToWriteAsync(_cts.Token).AsTask().ConfigureAwait(false).GetAwaiter().GetResult()) + { + if (_receivedChannel.Writer.TryWrite((message, descriptor))) + { + return; + } + } + } + } + catch (OperationCanceledException) + { + //Ignore + } } private async Task Sending(CancellationToken cancellationToken)