From c49058cf5ba4f3af77e2b40dc613ee57be824bad Mon Sep 17 00:00:00 2001 From: Nathan Date: Wed, 15 Jul 2020 08:00:50 -0400 Subject: [PATCH] Fix Lost Spans on Close (#182) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Make InMemorySender Asynchronous To demonstrate lost span problem (to be shortly reported). When AppendAsync is actually asynchronous, some spans are lost on Tracer Dispose. This change should have no effect, but causes RemoteReporterTests.TestRemoteReporterFlushesOnClose to fail intermittently. Signed-off-by: phxnsharp * Do not use Task.Factory.StartNew For some reason awaiting the task returned always completes immediately. Signed-off-by: phxnsharp * Add Thread Safety to InMemorySender This didn't help reliability, but I believe it is necessary because different threads may be adding and flushing at the same time. Signed-off-by: phxnsharp * Style cleanup Use slightly better style for _blocker. Also, turns out async in Flush causes other issues, removed for now. Signed-off-by: phxnsharp * Fix Intermittent Test Failure Found several problems that were contributing: - RemoteControlledSampler was using a long lived Wait() call on a thread pool thread, which was consuming Thread Pool threads and causing delays as the thread pool was exhausted - RemoteReporter.ProcessQueueLoop does have long lived waits due to the BlockingQueue. Reworked it to work correctly on a LongLived thread without consuming thread pool threads. - Many of the tests were leaving behind the ProcessQueueLoop because the blocker was never released. There is still an intermittent test failure, but that was there before I started changing code. 
Signed-off-by: phxnsharp Co-authored-by: Benjamin Krämer --- src/Jaeger.Core/Reporters/RemoteReporter.cs | 6 +-- .../Samplers/RemoteControlledSampler.cs | 6 +-- .../Senders/InMemorySender.cs | 39 ++++++++++++++----- 3 files changed, 35 insertions(+), 16 deletions(-) diff --git a/src/Jaeger.Core/Reporters/RemoteReporter.cs b/src/Jaeger.Core/Reporters/RemoteReporter.cs index d8341d553..6b555a569 100644 --- a/src/Jaeger.Core/Reporters/RemoteReporter.cs +++ b/src/Jaeger.Core/Reporters/RemoteReporter.cs @@ -38,7 +38,7 @@ internal RemoteReporter(ISender sender, TimeSpan flushInterval, int maxQueueSize _queueProcessorTask = Task.Factory.StartNew(ProcessQueueLoop, TaskCreationOptions.LongRunning); _flushInterval = flushInterval; - _flushTask = Task.Factory.StartNew(FlushLoop, TaskCreationOptions.LongRunning); + _flushTask = Task.Run(FlushLoop); } public void Report(Span span) @@ -123,14 +123,14 @@ private async Task FlushLoop() while (!_commandQueue.IsAddingCompleted); } - private async Task ProcessQueueLoop() + private void ProcessQueueLoop() { // This blocks until a command is available or IsCompleted=true foreach (ICommand command in _commandQueue.GetConsumingEnumerable()) { try { - await command.ExecuteAsync().ConfigureAwait(false); + command.ExecuteAsync().Wait(); } catch (SenderException ex) { diff --git a/src/Jaeger.Core/Samplers/RemoteControlledSampler.cs b/src/Jaeger.Core/Samplers/RemoteControlledSampler.cs index 8745fbbfd..5d77da8e0 100644 --- a/src/Jaeger.Core/Samplers/RemoteControlledSampler.cs +++ b/src/Jaeger.Core/Samplers/RemoteControlledSampler.cs @@ -40,12 +40,12 @@ private RemoteControlledSampler(Builder builder) /// /// Updates to a new sampler when it is different. 
/// - internal void UpdateSampler() + internal async void UpdateSampler() { try { - SamplingStrategyResponse response = _samplingManager.GetSamplingStrategyAsync(_serviceName) - .ConfigureAwait(false).GetAwaiter().GetResult(); + SamplingStrategyResponse response = await _samplingManager.GetSamplingStrategyAsync(_serviceName) + .ConfigureAwait(false); _metrics.SamplerRetrieved.Inc(1); diff --git a/test/Jaeger.Core.Tests/Senders/InMemorySender.cs b/test/Jaeger.Core.Tests/Senders/InMemorySender.cs index 12cc0cd83..0e27679db 100644 --- a/test/Jaeger.Core.Tests/Senders/InMemorySender.cs +++ b/test/Jaeger.Core.Tests/Senders/InMemorySender.cs @@ -40,27 +40,46 @@ public List GetReceived() return new List(_received); } - public Task AppendAsync(Span span, CancellationToken cancellationToken) + public async Task AppendAsync(Span span, CancellationToken cancellationToken) { - _blocker.Wait(cancellationToken); + //This serves to both make this call actually asynchronous and also to prevent the + //blocking call from consuming a Thread Pool thread. + await Task.Factory.StartNew(() => _blocker.Wait(cancellationToken), + TaskCreationOptions.LongRunning); - _appended.Add(span); - _received.Add(span); - return Task.FromResult(0); + lock (_appended) + { + _appended.Add(span); + _received.Add(span); + } + return 0; } public virtual Task FlushAsync(CancellationToken cancellationToken) { - int flushedSpans = _appended.Count; - _flushed.AddRange(_appended); - _appended.Clear(); + //This conflicts with the way TestCloseWhenQueueFull is written. Since + //it blocks the process queue from ever ending, RemoteReporter.CloseAsync + //is guaranteed to timeout, which means cancellationToken here will already + //be set. This prevents the rest of the function from running, causing the + //test to fail. 
+ //await Task.Delay(1, cancellationToken); + + int flushedSpans; + lock (_appended ) + { + flushedSpans = _appended.Count; + _flushed.AddRange(_appended); + _appended.Clear(); + } return Task.FromResult(flushedSpans); } - public Task CloseAsync(CancellationToken cancellationToken) + public async Task CloseAsync(CancellationToken cancellationToken) { - return FlushAsync(cancellationToken); + int result = await FlushAsync(cancellationToken); + AllowAppend(); + return result; } public void BlockAppend()