Skip to content

Commit

Permalink
Merge pull request #936 from dotnetcore/supports/flow-control
Browse files Browse the repository at this point in the history
Improve flow control for message of in memory
  • Loading branch information
xiangxiren authored Jul 9, 2021
2 parents 2e8256c + 6fbfaea commit e636e94
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 11 deletions.
11 changes: 9 additions & 2 deletions src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,23 @@ public void Listening(TimeSpan timeout, CancellationToken cancellationToken)

public void Commit(object sender)
{
_channel.BasicAck((ulong)sender, false);
if (_channel.IsOpen)
{
_channel.BasicAck((ulong)sender, false);
}
}

public void Reject(object sender)
{
_channel.BasicReject((ulong)sender, true);
if (_channel.IsOpen)
{
_channel.BasicReject((ulong)sender, true);
}
}

public void Dispose()
{

_channel?.Dispose();
//The connection should not be closed here, because the connection is still in use elsewhere.
//_connection?.Dispose();
Expand Down
5 changes: 3 additions & 2 deletions src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ public void Pulse()

private void RegisterMessageProcessor(IConsumerClient client)
{
client.OnMessageReceived += async (sender, transportMessage) =>
// Cannot set subscription to asynchronous
client.OnMessageReceived += (sender, transportMessage) =>
{
_logger.MessageReceived(transportMessage.GetId(), transportMessage.GetName());

Expand Down Expand Up @@ -193,7 +194,7 @@ private void RegisterMessageProcessor(IConsumerClient client)
}

var type = executor.Parameters.FirstOrDefault(x => x.IsFromCap == false)?.ParameterType;
message = await _serializer.DeserializeAsync(transportMessage, type);
message = _serializer.DeserializeAsync(transportMessage, type).GetAwaiter().GetResult();
message.RemoveException();
}
catch (Exception e)
Expand Down
61 changes: 54 additions & 7 deletions src/DotNetCore.CAP/Processor/IDispatcher.Default.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@ public class Dispatcher : IDispatcher
private readonly CapOptions _options;
private readonly ISubscribeDispatcher _executor;
private readonly ILogger<Dispatcher> _logger;
private readonly CancellationTokenSource _cts = new CancellationTokenSource();

private Channel<MediumMessage> _publishedChannel;
private Channel<(MediumMessage, ConsumerExecutorDescriptor)> _receivedChannel;

private readonly CancellationTokenSource _cts = new CancellationTokenSource();

public Dispatcher(ILogger<Dispatcher> logger,
IMessageSender sender,
IOptions<CapOptions> options,
Expand All @@ -41,10 +40,26 @@ public Dispatcher(ILogger<Dispatcher> logger,
public void Start(CancellationToken stoppingToken)
{
stoppingToken.ThrowIfCancellationRequested();
stoppingToken.Register(() => _cts.Cancel());
stoppingToken.Register(() => _cts.Cancel());

var capacity = _options.ProducerThreadCount * 500;
_publishedChannel = Channel.CreateBounded<MediumMessage>(new BoundedChannelOptions(capacity > 5000 ? 5000 : capacity)
{
AllowSynchronousContinuations = true,
SingleReader = _options.ProducerThreadCount == 1,
SingleWriter = true,
FullMode = BoundedChannelFullMode.Wait
});

capacity = _options.ConsumerThreadCount * 300;
_receivedChannel = Channel.CreateBounded<(MediumMessage, ConsumerExecutorDescriptor)>(new BoundedChannelOptions(capacity > 3000 ? 3000 : capacity)
{
AllowSynchronousContinuations = true,
SingleReader = _options.ConsumerThreadCount == 1,
SingleWriter = true,
FullMode = BoundedChannelFullMode.Wait
});

_publishedChannel = Channel.CreateUnbounded<MediumMessage>(new UnboundedChannelOptions() { SingleReader = false, SingleWriter = true });
_receivedChannel = Channel.CreateUnbounded<(MediumMessage, ConsumerExecutorDescriptor)>();

Task.WhenAll(Enumerable.Range(0, _options.ProducerThreadCount)
.Select(_ => Task.Factory.StartNew(() => Sending(stoppingToken), stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default)).ToArray());
Expand All @@ -55,12 +70,44 @@ public void Start(CancellationToken stoppingToken)

public void EnqueueToPublish(MediumMessage message)
{
_publishedChannel.Writer.TryWrite(message);
try
{
if (!_publishedChannel.Writer.TryWrite(message))
{
while (_publishedChannel.Writer.WaitToWriteAsync(_cts.Token).AsTask().ConfigureAwait(false).GetAwaiter().GetResult())
{
if (_publishedChannel.Writer.TryWrite(message))
{
return;
}
}
}
}
catch (OperationCanceledException)
{
//Ignore
}
}

public void EnqueueToExecute(MediumMessage message, ConsumerExecutorDescriptor descriptor)
{
_receivedChannel.Writer.TryWrite((message, descriptor));
try
{
if (!_receivedChannel.Writer.TryWrite((message, descriptor)))
{
while (_receivedChannel.Writer.WaitToWriteAsync(_cts.Token).AsTask().ConfigureAwait(false).GetAwaiter().GetResult())
{
if (_receivedChannel.Writer.TryWrite((message, descriptor)))
{
return;
}
}
}
}
catch (OperationCanceledException)
{
//Ignore
}
}

private async Task Sending(CancellationToken cancellationToken)
Expand Down

0 comments on commit e636e94

Please sign in to comment.