From 6973b6fff02b85b7ca98d2d1b7a64f5b7f571b81 Mon Sep 17 00:00:00 2001 From: minwoox Date: Fri, 16 Oct 2020 15:25:14 +0900 Subject: [PATCH] Changed to return `HttpRequest` from `RquestSupplier` --- .../internal/BulkRequestBenchmarks.java | 12 ++------ .../internal/BulkCallBuilder.java | 28 +++++++++++++------ .../internal/client/HttpCall.java | 23 ++++----------- .../internal/client/HttpCallTest.java | 11 +++++--- 4 files changed, 34 insertions(+), 40 deletions(-) diff --git a/benchmarks/src/main/java/zipkin2/elasticsearch/internal/BulkRequestBenchmarks.java b/benchmarks/src/main/java/zipkin2/elasticsearch/internal/BulkRequestBenchmarks.java index 84eec488d12..c848cde1bad 100644 --- a/benchmarks/src/main/java/zipkin2/elasticsearch/internal/BulkRequestBenchmarks.java +++ b/benchmarks/src/main/java/zipkin2/elasticsearch/internal/BulkRequestBenchmarks.java @@ -14,7 +14,6 @@ package zipkin2.elasticsearch.internal; import com.linecorp.armeria.common.HttpRequest; -import com.linecorp.armeria.common.HttpRequestWriter; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import java.util.concurrent.TimeUnit; @@ -36,7 +35,6 @@ import zipkin2.codec.SpanBytesDecoder; import zipkin2.elasticsearch.ElasticsearchStorage; import zipkin2.elasticsearch.internal.BulkCallBuilder.IndexEntry; -import zipkin2.elasticsearch.internal.client.HttpCall; import static java.nio.charset.StandardCharsets.UTF_8; import static zipkin2.storage.cassandra.internal.Resources.resourceToString; @@ -66,10 +64,7 @@ public class BulkRequestBenchmarks { @Benchmark public HttpRequest buildAndWriteRequest_singleSpan() { BulkCallBuilder builder = new BulkCallBuilder(es, 6.7f, "index-span"); builder.index(spanIndex, "span", CLIENT_SPAN, BulkIndexWriter.SPAN); - HttpCall.RequestSupplier supplier = builder.build().request; - HttpRequestWriter request = HttpRequest.streaming(supplier.headers()); - supplier.writeBody(request::tryWrite); - return request; + return builder.build().request.get(); } @Benchmark public HttpRequest buildAndWriteRequest_tenSpans() { @@ -77,10 +72,7 @@ public class BulkRequestBenchmarks { for (int i = 0; i < 10; i++) { builder.index(spanIndex, "span", CLIENT_SPAN, BulkIndexWriter.SPAN); } - HttpCall.RequestSupplier supplier = builder.build().request; - HttpRequestWriter request = HttpRequest.streaming(supplier.headers()); - supplier.writeBody(request::tryWrite); - return request; + return builder.build().request.get(); } // Convenience main entry-point diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkCallBuilder.java b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkCallBuilder.java index 28b35d71740..b7cf406e3cc 100644 --- a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkCallBuilder.java +++ b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkCallBuilder.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2019 The OpenZipkin Authors + * Copyright 2015-2020 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -20,10 +20,13 @@ import com.linecorp.armeria.common.HttpData; import com.linecorp.armeria.common.HttpHeaderNames; import com.linecorp.armeria.common.HttpMethod; +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.HttpRequestWriter; import com.linecorp.armeria.common.MediaType; import com.linecorp.armeria.common.RequestContext; import com.linecorp.armeria.common.RequestHeaders; import com.linecorp.armeria.common.util.Exceptions; +import com.linecorp.armeria.internal.shaded.guava.collect.ImmutableList; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufOutputStream; @@ -138,7 +141,7 @@ static class BulkRequestSupplier implements HttpCall.RequestSupplier { BulkRequestSupplier(List> entries, boolean shouldAddType, RequestHeaders headers, ByteBufAllocator alloc) { - this.entries = entries; + this.entries = ImmutableList.copyOf(entries); this.shouldAddType = shouldAddType; this.headers = headers; this.alloc = alloc; @@ -148,13 +151,22 @@ static class BulkRequestSupplier implements HttpCall.RequestSupplier { return headers; } - @Override public void writeBody(HttpCall.RequestStream requestStream) { - for (IndexEntry entry : entries) { - if (!requestStream.tryWrite(HttpData.wrap(serialize(alloc, entry, shouldAddType)))) { - // Stream aborted, no need to serialize anymore. - return; - } + @Override + public HttpRequest get() { + final HttpRequestWriter writer = HttpRequest.streaming(headers); + writeEntry(writer, 0); + return writer; + } + + private void writeEntry(HttpRequestWriter writer, int index) { + if (index == entries.size()) { + return; + } + if (!writer.tryWrite(HttpData.wrap(serialize(alloc, entries.get(index), shouldAddType)))) { + // Stream aborted, no need to serialize anymore. + return; } + writer.whenConsumed().thenRun(() -> writeEntry(writer, index + 1)); } } diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/HttpCall.java b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/HttpCall.java index 64fe81d5c3b..6046b9d22b0 100644 --- a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/HttpCall.java +++ b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/HttpCall.java @@ -23,7 +23,6 @@ import com.linecorp.armeria.common.HttpData; import com.linecorp.armeria.common.HttpHeaders; import com.linecorp.armeria.common.HttpRequest; -import com.linecorp.armeria.common.HttpRequestWriter; import com.linecorp.armeria.common.HttpResponse; import com.linecorp.armeria.common.HttpStatus; import com.linecorp.armeria.common.HttpStatusClass; @@ -70,19 +69,11 @@ public interface RequestStream { * A supplier of {@linkplain HttpHeaders headers} and {@linkplain HttpData body} of a request to * Elasticsearch. */ - public interface RequestSupplier { + public interface RequestSupplier extends Supplier { /** * Returns the {@linkplain HttpHeaders headers} for this request. */ RequestHeaders headers(); - - /** - * Writes the body of this request into the {@link RequestStream}. {@link - * RequestStream#tryWrite(HttpData)} can be called any number of times to publish any number of - * payload objects. It can be useful to split up a large payload into smaller chunks instead of - * buffering everything as one payload. - */ - void writeBody(RequestStream requestStream); } static class AggregatedRequestSupplier implements RequestSupplier { @@ -106,10 +97,9 @@ static class AggregatedRequestSupplier implements RequestSupplier { return request.headers(); } - @Override public void writeBody(RequestStream requestStream) { - if (!request.content().isEmpty()) { - requestStream.tryWrite(request.content()); - } + @Override + public HttpRequest get() { + return request.toHttpRequest(); } } @@ -207,10 +197,7 @@ CompletableFuture sendRequest() { final HttpResponse response; try (SafeCloseable ignored = Clients.withContextCustomizer(ctx -> ctx.logBuilder().name(name))) { - HttpRequestWriter httpRequest = HttpRequest.streaming(request.headers()); - request.writeBody(httpRequest::tryWrite); - httpRequest.close(); - response = httpClient.execute(httpRequest); + response = httpClient.execute(request.get()); } CompletableFuture responseFuture = RequestContext.mapCurrent( diff --git a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/client/HttpCallTest.java b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/client/HttpCallTest.java index da087f86aee..cccdac37a2a 100644 --- a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/client/HttpCallTest.java +++ b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/client/HttpCallTest.java @@ -20,6 +20,7 @@ import com.linecorp.armeria.common.AggregatedHttpResponse; import com.linecorp.armeria.common.HttpData; import com.linecorp.armeria.common.HttpMethod; +import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.HttpStatus; import com.linecorp.armeria.common.RequestHeaders; import com.linecorp.armeria.common.ResponseHeaders; @@ -252,13 +253,15 @@ class HttpCallTest { server.enqueue(SUCCESS_RESPONSE); HttpCall.RequestSupplier supplier = new HttpCall.RequestSupplier() { + + private final RequestHeaders headers = RequestHeaders.of(HttpMethod.POST, "/"); + @Override public RequestHeaders headers() { - return RequestHeaders.of(HttpMethod.POST, "/"); + return headers; } - @Override public void writeBody(HttpCall.RequestStream requestStream) { - requestStream.tryWrite(HttpData.ofUtf8("hello")); - requestStream.tryWrite(HttpData.ofUtf8(" world")); + @Override public HttpRequest get() { + return HttpRequest.of(headers, HttpData.ofUtf8("hello"), HttpData.ofUtf8(" world")); } };