Skip to content

Commit

Permalink
Changed to return HttpRequest from RquestSupplier
Browse files Browse the repository at this point in the history
  • Loading branch information
minwoox committed Oct 16, 2020
1 parent cc74bf7 commit 6973b6f
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package zipkin2.elasticsearch.internal;

import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpRequestWriter;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import java.util.concurrent.TimeUnit;
Expand All @@ -36,7 +35,6 @@
import zipkin2.codec.SpanBytesDecoder;
import zipkin2.elasticsearch.ElasticsearchStorage;
import zipkin2.elasticsearch.internal.BulkCallBuilder.IndexEntry;
import zipkin2.elasticsearch.internal.client.HttpCall;

import static java.nio.charset.StandardCharsets.UTF_8;
import static zipkin2.storage.cassandra.internal.Resources.resourceToString;
Expand Down Expand Up @@ -66,21 +64,15 @@ public class BulkRequestBenchmarks {
@Benchmark public HttpRequest buildAndWriteRequest_singleSpan() {
BulkCallBuilder builder = new BulkCallBuilder(es, 6.7f, "index-span");
builder.index(spanIndex, "span", CLIENT_SPAN, BulkIndexWriter.SPAN);
HttpCall.RequestSupplier supplier = builder.build().request;
HttpRequestWriter request = HttpRequest.streaming(supplier.headers());
supplier.writeBody(request::tryWrite);
return request;
return builder.build().request.get();
}

@Benchmark public HttpRequest buildAndWriteRequest_tenSpans() {
BulkCallBuilder builder = new BulkCallBuilder(es, 6.7f, "index-span");
for (int i = 0; i < 10; i++) {
builder.index(spanIndex, "span", CLIENT_SPAN, BulkIndexWriter.SPAN);
}
HttpCall.RequestSupplier supplier = builder.build().request;
HttpRequestWriter request = HttpRequest.streaming(supplier.headers());
supplier.writeBody(request::tryWrite);
return request;
return builder.build().request.get();
}

// Convenience main entry-point
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2019 The OpenZipkin Authors
* Copyright 2015-2020 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -20,10 +20,13 @@
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpHeaderNames;
import com.linecorp.armeria.common.HttpMethod;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpRequestWriter;
import com.linecorp.armeria.common.MediaType;
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.RequestHeaders;
import com.linecorp.armeria.common.util.Exceptions;
import com.linecorp.armeria.internal.shaded.guava.collect.ImmutableList;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufOutputStream;
Expand Down Expand Up @@ -138,7 +141,7 @@ static class BulkRequestSupplier implements HttpCall.RequestSupplier {

BulkRequestSupplier(List<IndexEntry<?>> entries, boolean shouldAddType,
RequestHeaders headers, ByteBufAllocator alloc) {
this.entries = entries;
this.entries = ImmutableList.copyOf(entries);
this.shouldAddType = shouldAddType;
this.headers = headers;
this.alloc = alloc;
Expand All @@ -148,13 +151,22 @@ static class BulkRequestSupplier implements HttpCall.RequestSupplier {
return headers;
}

@Override public void writeBody(HttpCall.RequestStream requestStream) {
for (IndexEntry<?> entry : entries) {
if (!requestStream.tryWrite(HttpData.wrap(serialize(alloc, entry, shouldAddType)))) {
// Stream aborted, no need to serialize anymore.
return;
}
@Override
public HttpRequest get() {
final HttpRequestWriter writer = HttpRequest.streaming(headers);
writeEntry(writer, 0);
return writer;
}

private void writeEntry(HttpRequestWriter writer, int index) {
if (index == entries.size()) {
return;
}
if (!writer.tryWrite(HttpData.wrap(serialize(alloc, entries.get(index), shouldAddType)))) {
// Stream aborted, no need to serialize anymore.
return;
}
writer.whenConsumed().thenRun(() -> writeEntry(writer, index + 1));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpHeaders;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpRequestWriter;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.HttpStatusClass;
Expand Down Expand Up @@ -70,19 +69,11 @@ public interface RequestStream {
* A supplier of {@linkplain HttpHeaders headers} and {@linkplain HttpData body} of a request to
* Elasticsearch.
*/
public interface RequestSupplier {
public interface RequestSupplier extends Supplier<HttpRequest> {
/**
* Returns the {@linkplain HttpHeaders headers} for this request.
*/
RequestHeaders headers();

/**
* Writes the body of this request into the {@link RequestStream}. {@link
* RequestStream#tryWrite(HttpData)} can be called any number of times to publish any number of
* payload objects. It can be useful to split up a large payload into smaller chunks instead of
* buffering everything as one payload.
*/
void writeBody(RequestStream requestStream);
}

static class AggregatedRequestSupplier implements RequestSupplier {
Expand All @@ -106,10 +97,9 @@ static class AggregatedRequestSupplier implements RequestSupplier {
return request.headers();
}

@Override public void writeBody(RequestStream requestStream) {
if (!request.content().isEmpty()) {
requestStream.tryWrite(request.content());
}
@Override
public HttpRequest get() {
return request.toHttpRequest();
}
}

Expand Down Expand Up @@ -207,10 +197,7 @@ CompletableFuture<AggregatedHttpResponse> sendRequest() {
final HttpResponse response;
try (SafeCloseable ignored =
Clients.withContextCustomizer(ctx -> ctx.logBuilder().name(name))) {
HttpRequestWriter httpRequest = HttpRequest.streaming(request.headers());
request.writeBody(httpRequest::tryWrite);
httpRequest.close();
response = httpClient.execute(httpRequest);
response = httpClient.execute(request.get());
}
CompletableFuture<AggregatedHttpResponse> responseFuture =
RequestContext.mapCurrent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.linecorp.armeria.common.AggregatedHttpResponse;
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpMethod;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.RequestHeaders;
import com.linecorp.armeria.common.ResponseHeaders;
Expand Down Expand Up @@ -252,13 +253,15 @@ class HttpCallTest {
server.enqueue(SUCCESS_RESPONSE);

HttpCall.RequestSupplier supplier = new HttpCall.RequestSupplier() {

private final RequestHeaders headers = RequestHeaders.of(HttpMethod.POST, "/");

@Override public RequestHeaders headers() {
return RequestHeaders.of(HttpMethod.POST, "/");
return headers;
}

@Override public void writeBody(HttpCall.RequestStream requestStream) {
requestStream.tryWrite(HttpData.ofUtf8("hello"));
requestStream.tryWrite(HttpData.ofUtf8(" world"));
@Override public HttpRequest get() {
return HttpRequest.of(headers, HttpData.ofUtf8("hello"), HttpData.ofUtf8(" world"));
}
};

Expand Down

0 comments on commit 6973b6f

Please sign in to comment.