Skip to content
This repository has been archived by the owner on Apr 28, 2022. It is now read-only.

Commit

Permalink
Fix blocking code (#191)
Browse files Browse the repository at this point in the history
* fixed blocking code in Reporters/RemoteReporter.cs

Signed-off-by: Bojan Pantovic <[email protected]>

* replace blocking collection with BufferBlock to make everything async

Signed-off-by: Bojan Pantovic <[email protected]>

* code review changes

Signed-off-by: Bojan Pantovic <[email protected]>

* code review changes #2

Signed-off-by: Bojan Pantovic <[email protected]>

* Fix NuGET related build warnings

Signed-off-by: Kraemer, Benjamin <[email protected]>
Signed-off-by: Bojan Pantovic <[email protected]>

Co-authored-by: [email protected] <[email protected]>
Co-authored-by: Kraemer, Benjamin <[email protected]>
  • Loading branch information
3 people authored Nov 1, 2020
1 parent 462eef0 commit e3b70cd
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 31 deletions.
1 change: 1 addition & 0 deletions src/Jaeger.Core/Jaeger.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="2.2.0" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.1" />
<PackageReference Include="OpenTracing" Version="0.12.1" />
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.11.1" />
</ItemGroup>

</Project>
49 changes: 18 additions & 31 deletions src/Jaeger.Core/Reporters/RemoteReporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Jaeger.Exceptions;
using Jaeger.Metrics;
using Jaeger.Senders;
Expand All @@ -18,7 +19,7 @@ public class RemoteReporter : IReporter
public const int DefaultMaxQueueSize = 100;
public static readonly TimeSpan DefaultFlushInterval = TimeSpan.FromSeconds(1);

private readonly BlockingCollection<ICommand> _commandQueue;
private readonly BufferBlock<ICommand> _commandQueue;
private readonly Task _queueProcessorTask;
private readonly TimeSpan _flushInterval;
private readonly Task _flushTask;
Expand All @@ -32,28 +33,22 @@ internal RemoteReporter(ISender sender, TimeSpan flushInterval, int maxQueueSize
_sender = sender;
_metrics = metrics;
_logger = loggerFactory.CreateLogger<RemoteReporter>();
_commandQueue = new BlockingCollection<ICommand>(maxQueueSize);
_commandQueue = new BufferBlock<ICommand>( new DataflowBlockOptions() {
BoundedCapacity = maxQueueSize
});

// start a thread to append spans
_queueProcessorTask = Task.Factory.StartNew(ProcessQueueLoop, TaskCreationOptions.LongRunning);
_queueProcessorTask = Task.Run(async () => { await ProcessQueueLoop(); });

_flushInterval = flushInterval;
_flushTask = Task.Run(FlushLoop);
}

public void Report(Span span)
{
bool added = false;
try
{
// It's better to drop spans, than to block here
added = _commandQueue.TryAdd(new AppendCommand(this, span));
}
catch (InvalidOperationException)
{
// The queue has been marked as IsAddingCompleted -> no-op.
}

// It's better to drop spans, than to block here
var added = _commandQueue.Post(new AppendCommand(this, span));

if (!added)
{
_metrics.ReporterDropped.Inc(1);
Expand All @@ -62,9 +57,7 @@ public void Report(Span span)

public async Task CloseAsync(CancellationToken cancellationToken)
{
// Note: Java creates a CloseCommand but we have CompleteAdding() in C# so we don't need the command.
// (This also stops the FlushLoop)
_commandQueue.CompleteAdding();
_commandQueue.Complete();

try
{
Expand Down Expand Up @@ -100,16 +93,9 @@ internal void Flush()
// to reduce the number of updateGauge stats, we only emit queue length on flush
_metrics.ReporterQueueLength.Update(_commandQueue.Count);

try
{
// We can safely drop FlushCommand when the queue is full - sender should take care of flushing
// in such case
_commandQueue.TryAdd(new FlushCommand(this));
}
catch (InvalidOperationException)
{
// The queue has been marked as IsAddingCompleted -> no-op.
}
// We can safely drop FlushCommand when the queue is full - sender should take care of flushing
// in such case
_commandQueue.Post(new FlushCommand(this));
}

private async Task FlushLoop()
Expand All @@ -120,17 +106,18 @@ private async Task FlushLoop()
await Task.Delay(_flushInterval).ConfigureAwait(false);
Flush();
}
while (!_commandQueue.IsAddingCompleted);
while (!_commandQueue.Completion.IsCompleted);
}

private void ProcessQueueLoop()
private async Task ProcessQueueLoop()
{
// This blocks until a command is available or IsCompleted=true
foreach (ICommand command in _commandQueue.GetConsumingEnumerable())
while (await _commandQueue.OutputAvailableAsync())
{
try
{
command.ExecuteAsync().Wait();
var command = await _commandQueue.ReceiveAsync();
await command.ExecuteAsync();
}
catch (SenderException ex)
{
Expand Down

0 comments on commit e3b70cd

Please sign in to comment.