From 1bd2db0493c2c9e8388c68b02c44c4743c0137a2 Mon Sep 17 00:00:00 2001 From: Steve Gordon Date: Thu, 21 Nov 2024 11:35:27 +0000 Subject: [PATCH] Null check bug fixes and tweak accessibility of properties on `TransportResponse` (#144) `DefaultResponseFactory` was throwing if the response stream was null. This can occur when an exception is thrown when sending the request (e.g., `HttpRequestException`), for example, when the `HttpClient` cannot connect to the endpoint. Rather than throwing a null exception here, we still want to return a response with the original exception attached. In `StreamResponse`, we must safety-check that any linked disposables are not null before attempting to dispose of them. The final change in `TransportResponse` is a tweak for the ingest work. The `BulkStreamingResponse` was initially derived from the `StreamResponse` to share behaviour. However, this causes the `Body` property (stream) to be present on the derived type. As we are handling stream reading internally, this is unnecessary and could produce weird behaviour if the consumer tries to access the stream directly. Instead, `BulkStreamingResponse` derives directly from `TransportResponse`, overriding `LeaveOpen` and handling `LinkedDisposables` in its own `Dispose` method. This means we could potentially seal `StreamResponse` again. However, it might still be helpful for consumers to derive responses from this for advanced scenarios, with the base class doing the right thing during disposal. I am open to thoughts on whether that's likely to happen. @flobernd, were you deriving from this in the client? --- .../OpenTelemetry/OpenTelemetry.cs | 8 +++ src/Elastic.Transport/DistributedTransport.cs | 7 +- .../Responses/DefaultResponseFactory.cs | 4 +- .../Responses/Special/StreamResponse.cs | 56 +++------------ .../Responses/Special/StreamResponseBase.cs | 70 +++++++++++++++++++ .../Responses/TransportResponse.cs | 8 +-- 6 files changed, 98 insertions(+), 55 deletions(-) create mode 100644 src/Elastic.Transport/Responses/Special/StreamResponseBase.cs diff --git a/src/Elastic.Transport/Diagnostics/OpenTelemetry/OpenTelemetry.cs b/src/Elastic.Transport/Diagnostics/OpenTelemetry/OpenTelemetry.cs index ba70e04..aec90dd 100644 --- a/src/Elastic.Transport/Diagnostics/OpenTelemetry/OpenTelemetry.cs +++ b/src/Elastic.Transport/Diagnostics/OpenTelemetry/OpenTelemetry.cs @@ -53,6 +53,14 @@ internal static void SetCommonAttributes(Activity? activity, ITransportConfigura } var productSchemaVersion = string.Empty; + foreach (var attribute in activity.TagObjects) + { + if (attribute.Key.Equals(OpenTelemetryAttributes.DbElasticsearchSchemaUrl, StringComparison.Ordinal)) + { + if (attribute.Value is string schemaVersion) + productSchemaVersion = schemaVersion; + } + } // We add the client schema version only when it differs from the product schema version if (!productSchemaVersion.Equals(OpenTelemetrySchemaVersion, StringComparison.Ordinal)) diff --git a/src/Elastic.Transport/DistributedTransport.cs b/src/Elastic.Transport/DistributedTransport.cs index dbe50ee..8364fd4 100644 --- a/src/Elastic.Transport/DistributedTransport.cs +++ b/src/Elastic.Transport/DistributedTransport.cs @@ -124,9 +124,6 @@ private async ValueTask RequestCoreAsync( if (activity is { IsAllDataRequested: true }) { - if (activity.IsAllDataRequested) - OpenTelemetry.SetCommonAttributes(activity, Configuration); - if (Configuration.Authentication is BasicAuthentication basicAuthentication) activity.SetTag(SemanticConventions.DbUser, basicAuthentication.Username); @@ -261,9 +258,13 @@ private async ValueTask RequestCoreAsync( activity?.SetTag(SemanticConventions.HttpResponseStatusCode, response.ApiCallDetails.HttpStatusCode); activity?.SetTag(OpenTelemetryAttributes.ElasticTransportAttemptedNodes, attemptedNodes); + // We don't check IsAllDataRequested here as that's left to the consumer. if (configureActivity is not null && activity is not null) configureActivity.Invoke(activity); + if (activity is { IsAllDataRequested: true }) + OpenTelemetry.SetCommonAttributes(activity, Configuration); + return FinalizeResponse(endpoint, boundConfiguration, data, pipeline, startedOn, auditor, seenExceptions, response); } finally diff --git a/src/Elastic.Transport/Responses/DefaultResponseFactory.cs b/src/Elastic.Transport/Responses/DefaultResponseFactory.cs index 46ae787..afaf378 100644 --- a/src/Elastic.Transport/Responses/DefaultResponseFactory.cs +++ b/src/Elastic.Transport/Responses/DefaultResponseFactory.cs @@ -79,13 +79,11 @@ private async ValueTask CreateCoreAsync( IReadOnlyDictionary? tcpStats, CancellationToken cancellationToken = default) where TResponse : TransportResponse, new() { - responseStream.ThrowIfNull(nameof(responseStream)); - var details = InitializeApiCallDetails(endpoint, boundConfiguration, postData, ex, statusCode, headers, contentType, threadPoolStats, tcpStats, contentLength); TResponse? response = null; - if (MayHaveBody(statusCode, endpoint.Method, contentLength) + if (responseStream is not null && MayHaveBody(statusCode, endpoint.Method, contentLength) && TryResolveBuilder(boundConfiguration.ResponseBuilders, boundConfiguration.ProductResponseBuilders, out var builder)) { var ownsStream = false; diff --git a/src/Elastic.Transport/Responses/Special/StreamResponse.cs b/src/Elastic.Transport/Responses/Special/StreamResponse.cs index d097ac3..fed6df0 100644 --- a/src/Elastic.Transport/Responses/Special/StreamResponse.cs +++ b/src/Elastic.Transport/Responses/Special/StreamResponse.cs @@ -8,65 +8,31 @@ namespace Elastic.Transport; /// -/// A response that exposes the response as . +/// A response that exposes the response as a . /// /// MUST be disposed after use to ensure the HTTP connection is freed for reuse. /// /// -public class StreamResponse : TransportResponse, IDisposable +public sealed class StreamResponse : StreamResponseBase, IDisposable { - private bool _disposed; - - /// - /// The MIME type of the response, if present. - /// - public string ContentType { get; } - /// - public StreamResponse() - { - Body = Stream.Null; + public StreamResponse() : base(Stream.Null) => ContentType = string.Empty; - } /// - public StreamResponse(Stream body, string? contentType) - { - Body = body; + public StreamResponse(Stream body, string? contentType) : base(body) => ContentType = contentType ?? string.Empty; - } - - internal override bool LeaveOpen => true; /// - /// Disposes the underlying stream. + /// The MIME type of the response, if present. /// - /// - protected virtual void Dispose(bool disposing) - { - if (!_disposed) - { - if (disposing) - { - Body.Dispose(); - - if (LinkedDisposables is not null) - { - foreach (var disposable in LinkedDisposables) - disposable.Dispose(); - } - } - - _disposed = true; - } - } + public string ContentType { get; } /// - /// Disposes the underlying stream. + /// The raw response stream. /// - public void Dispose() - { - Dispose(disposing: true); - GC.SuppressFinalize(this); - } + public Stream Body => Stream; + + /// + protected internal override bool LeaveOpen => true; } diff --git a/src/Elastic.Transport/Responses/Special/StreamResponseBase.cs b/src/Elastic.Transport/Responses/Special/StreamResponseBase.cs new file mode 100644 index 0000000..06ebb67 --- /dev/null +++ b/src/Elastic.Transport/Responses/Special/StreamResponseBase.cs @@ -0,0 +1,70 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.IO; +using Elastic.Transport.Extensions; + +namespace Elastic.Transport; + +/// +/// A base class for implementing responses that access the raw response stream. +/// +public abstract class StreamResponseBase : TransportResponse, IDisposable +{ + /// + protected internal override bool LeaveOpen => true; + + /// + /// The raw response stream from the HTTP layer. + /// + /// + /// MUST be disposed to release the underlying HTTP connection for reuse. + /// + protected Stream Stream { get; } + + /// + /// Indicates that the response has been disposed and it is not longer safe to access the stream. + /// + protected bool Disposed { get; private set; } + + /// + public StreamResponseBase(Stream responseStream) + { + responseStream.ThrowIfNull(nameof(responseStream)); + Stream = responseStream; + } + + /// + /// Disposes the underlying stream. + /// + /// + protected virtual void Dispose(bool disposing) + { + if (!Disposed) + { + if (disposing) + { + Stream?.Dispose(); + + if (LinkedDisposables is not null) + { + foreach (var disposable in LinkedDisposables) + disposable?.Dispose(); + } + } + + Disposed = true; + } + } + + /// + /// Disposes the underlying stream. + /// + public void Dispose() + { + Dispose(disposing: true); + GC.SuppressFinalize(this); + } +} diff --git a/src/Elastic.Transport/Responses/TransportResponse.cs b/src/Elastic.Transport/Responses/TransportResponse.cs index 4e7f4a8..ca72888 100644 --- a/src/Elastic.Transport/Responses/TransportResponse.cs +++ b/src/Elastic.Transport/Responses/TransportResponse.cs @@ -10,12 +10,12 @@ namespace Elastic.Transport; /// /// A response from an Elastic product including details about the request/response life cycle. Base class for the built in low level response -/// types, , , , and +/// types, , , , and /// public abstract class TransportResponse : TransportResponse { /// - /// The deserialized body returned by the product. + /// The (potentially deserialized) response returned by the product. /// public T Body { get; protected internal set; } } @@ -46,7 +46,7 @@ public override string ToString() => ApiCallDetails?.DebugInformation /// StreamResponse and kept internal. If we later make this public, we might need to refine this. /// [JsonIgnore] - internal IEnumerable? LinkedDisposables { get; set; } + protected internal IEnumerable? LinkedDisposables { get; internal set; } /// /// Allows the response to identify that the response stream should NOT be automatically disposed. @@ -55,6 +55,6 @@ public override string ToString() => ApiCallDetails?.DebugInformation /// Currently only used by StreamResponse and therefore internal. /// [JsonIgnore] - internal virtual bool LeaveOpen { get; } = false; + protected internal virtual bool LeaveOpen { get; } = false; }