Skip to content

Commit

Permalink
Tweak that when the EnablePublishParallelSend option is true, it wi…
Browse files Browse the repository at this point in the history
…ll be put task into the .NET thread pool in batches rather than all at once. (#1540)
  • Loading branch information
yang-xiaodong committed Jun 21, 2024
1 parent 51453d5 commit 387f4fb
Showing 1 changed file with 42 additions and 27 deletions.
69 changes: 42 additions & 27 deletions src/DotNetCore.CAP/Processor/IDispatcher.Default.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ namespace DotNetCore.CAP.Processor;

public class Dispatcher : IDispatcher
{
private CancellationTokenSource? _tasksCts;
private readonly CancellationTokenSource _delayCts = new();
private readonly ISubscribeExecutor _executor;
private readonly ILogger<Dispatcher> _logger;
Expand All @@ -28,16 +27,15 @@ public class Dispatcher : IDispatcher
private readonly PriorityQueue<MediumMessage, long> _schedulerQueue;
private readonly bool _enableParallelExecute;
private readonly bool _enableParallelSend;
private readonly int _pChannelSize;

private CancellationTokenSource? _tasksCts;
private Channel<MediumMessage> _publishedChannel = default!;
private Channel<(MediumMessage, ConsumerExecutorDescriptor?)> _receivedChannel = default!;
private long _nextSendTime = DateTime.MaxValue.Ticks;

public Dispatcher(ILogger<Dispatcher> logger,
IMessageSender sender,
IOptions<CapOptions> options,
ISubscribeExecutor executor,
IDataStorage storage)
public Dispatcher(ILogger<Dispatcher> logger, IMessageSender sender, IOptions<CapOptions> options,
ISubscribeExecutor executor, IDataStorage storage)
{
_logger = logger;
_sender = sender;
Expand All @@ -47,6 +45,7 @@ public Dispatcher(ILogger<Dispatcher> logger,
_storage = storage;
_enableParallelExecute = options.Value.EnableSubscriberParallelExecute;
_enableParallelSend = options.Value.EnablePublishParallelSend;
_pChannelSize = Environment.ProcessorCount * 500;
}

public async Task Start(CancellationToken stoppingToken)
Expand All @@ -55,14 +54,13 @@ public async Task Start(CancellationToken stoppingToken)
_tasksCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, CancellationToken.None);
_tasksCts.Token.Register(() => _delayCts.Cancel());

_publishedChannel = Channel.CreateBounded<MediumMessage>(
new BoundedChannelOptions(5000)
{
AllowSynchronousContinuations = true,
SingleReader = true,
SingleWriter = true,
FullMode = BoundedChannelFullMode.Wait
});
_publishedChannel = Channel.CreateBounded<MediumMessage>(new BoundedChannelOptions(_pChannelSize)
{
AllowSynchronousContinuations = true,
SingleReader = true,
SingleWriter = true,
FullMode = BoundedChannelFullMode.Wait
});

await Task.Run(Sending, _tasksCts.Token).ConfigureAwait(false); //here return valuetask

Expand Down Expand Up @@ -207,28 +205,45 @@ private async ValueTask Sending()
try
{
while (await _publishedChannel.Reader.WaitToReadAsync(_tasksCts!.Token).ConfigureAwait(false))
while (_publishedChannel.Reader.TryRead(out var message))
try
{
if (_enableParallelSend)
{
var tasks = new List<Task>();
var batchSize = _pChannelSize / 50;
for (var i = 0; i < batchSize && _publishedChannel.Reader.TryRead(out var message); i++)
{
var item = message;
if (_enableParallelSend)
tasks.Add(Task.Run(async () =>
{
_ = Task.Run(async () =>
try
{
var result = await _sender.SendAsync(item).ConfigureAwait(false);
if (!result.Succeeded) _logger.MessagePublishException(item.Origin.GetId(), result.ToString(), result.Exception);
});
}
catch (Exception ex)
{
_logger.LogError(ex, $"An exception occurred when sending a message to the transport. Id:{message.DbId}");
}
}));
}

await Task.WhenAll(tasks);
}
else
{
while (_publishedChannel.Reader.TryRead(out var message))
try
{
var result = await _sender.SendAsync(message).ConfigureAwait(false);
if (!result.Succeeded) _logger.MessagePublishException(message.Origin.GetId(), result.ToString(), result.Exception);
}
else
catch (Exception ex)
{
var result = await _sender.SendAsync(item).ConfigureAwait(false);
if (!result.Succeeded) _logger.MessagePublishException(item.Origin.GetId(), result.ToString(), result.Exception);
_logger.LogError(ex, $"An exception occurred when sending a message to the transport. Id:{message.DbId}");
}
}
catch (Exception ex)
{
_logger.LogError(ex, "An exception occurred when sending a message to the transport. Id:{MessageId}", message.DbId);
}
}
}

}
catch (OperationCanceledException)
{
Expand Down

0 comments on commit 387f4fb

Please sign in to comment.