Skip to content

Commit

Permalink
elasticsearch: use backpressure to resolve streaming request flakes
Browse files Browse the repository at this point in the history
The `HttpResponse` was sent before the request are fully sent.
So the request was aborted after getting the response.

We should change to send the response after the request is fully received.
- close #3197

Signed-off-by: Adrian Cole <[email protected]>
  • Loading branch information
minwoox authored and Adrian Cole committed Dec 4, 2023
1 parent f75bf0f commit 561de8a
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 66 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2020 The OpenZipkin Authors
* Copyright 2015-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -14,7 +14,6 @@
package zipkin2.elasticsearch.internal;

import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpRequestWriter;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import java.util.concurrent.TimeUnit;
Expand All @@ -36,7 +35,6 @@
import zipkin2.codec.SpanBytesDecoder;
import zipkin2.elasticsearch.ElasticsearchStorage;
import zipkin2.elasticsearch.internal.BulkCallBuilder.IndexEntry;
import zipkin2.elasticsearch.internal.client.HttpCall;

import static java.nio.charset.StandardCharsets.UTF_8;
import static zipkin2.elasticsearch.ElasticsearchVersion.V6_0;
Expand Down Expand Up @@ -67,21 +65,15 @@ public class BulkRequestBenchmarks {
@Benchmark public HttpRequest buildAndWriteRequest_singleSpan() {
BulkCallBuilder builder = new BulkCallBuilder(es, V6_0, "index-span");
builder.index(spanIndex, "span", CLIENT_SPAN, BulkIndexWriter.SPAN);
HttpCall.RequestSupplier supplier = builder.build().request;
HttpRequestWriter request = HttpRequest.streaming(supplier.headers());
supplier.writeBody(request::tryWrite);
return request;
return builder.build().request.get();
}

@Benchmark public HttpRequest buildAndWriteRequest_tenSpans() {
BulkCallBuilder builder = new BulkCallBuilder(es, V6_0, "index-span");
for (int i = 0; i < 10; i++) {
builder.index(spanIndex, "span", CLIENT_SPAN, BulkIndexWriter.SPAN);
}
HttpCall.RequestSupplier supplier = builder.build().request;
HttpRequestWriter request = HttpRequest.streaming(supplier.headers());
supplier.writeBody(request::tryWrite);
return request;
return builder.build().request.get();
}

// Convenience main entry-point
Expand Down
2 changes: 1 addition & 1 deletion zipkin-server/src/main/resources/zipkin-server-shared.yml
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ logging:
com.linecorp.armeria: 'WARN'
# # But allow to say it's ready to serve requests
com.linecorp.armeria.server.Server: 'INFO'
# kafka is quite chatty so we switch everything off by default
# kafka is quite chatty, so we switch everything off by default
org.apache.kafka: 'OFF'
# # investigate /api/v2/dependencies
# zipkin2.internal.DependencyLinker: 'DEBUG'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2020 The OpenZipkin Authors
* Copyright 2015-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -13,12 +13,15 @@
*/
package zipkin2.server.internal.elasticsearch;

import com.linecorp.armeria.client.endpoint.EmptyEndpointGroupException;
import com.linecorp.armeria.client.endpoint.EndpointSelectionTimeoutException;
import com.linecorp.armeria.common.AggregatedHttpResponse;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.healthcheck.HealthCheckService;
import com.linecorp.armeria.server.healthcheck.SettableHealthChecker;
import com.linecorp.armeria.testing.junit4.server.ServerRule;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLException;
import org.awaitility.core.ConditionFactory;
Expand Down Expand Up @@ -47,26 +50,36 @@ public class ITElasticsearchHealthCheck {

static final SettableHealthChecker server1Health = new SettableHealthChecker(true);

static {
// Gives better context when there's an exception such as AbortedStreamException
System.setProperty("com.linecorp.armeria.verboseExceptions", "always");
}

@ClassRule public static ServerRule server1 = new ServerRule() {
@Override protected void configure(ServerBuilder sb) {
sb.service("/", (ctx, req) -> VERSION_RESPONSE.toHttpResponse());
sb.service("/", (ctx, req) -> sendResponseAfterAggregate(req, VERSION_RESPONSE));
sb.service("/_cluster/health", HealthCheckService.of(server1Health));
sb.serviceUnder("/_cluster/health/", (ctx, req) -> GREEN_RESPONSE.toHttpResponse());
}
};

/** This ensures the response is sent after the request is fully read. */
private static HttpResponse sendResponseAfterAggregate(HttpRequest req,
AggregatedHttpResponse response) {
final CompletableFuture<HttpResponse> future = new CompletableFuture<>();
req.aggregate().whenComplete((aggregatedReq, cause) -> {
if (cause != null) {
future.completeExceptionally(cause);
} else {
future.complete(response.toHttpResponse());
}
});
return HttpResponse.from(future);
}

static final SettableHealthChecker server2Health = new SettableHealthChecker(true);

@ClassRule public static ServerRule server2 = new ServerRule() {
@Override protected void configure(ServerBuilder sb) {
sb.service("/", (ctx, req) -> VERSION_RESPONSE.toHttpResponse());
sb.service("/", (ctx, req) -> sendResponseAfterAggregate(req, VERSION_RESPONSE));
sb.service("/_cluster/health", HealthCheckService.of(server2Health));
sb.serviceUnder("/_cluster/health/", (ctx, req) -> GREEN_RESPONSE.toHttpResponse());
sb.serviceUnder("/_cluster/health/",
(ctx, req) -> sendResponseAfterAggregate(req, GREEN_RESPONSE));
}
};

Expand Down Expand Up @@ -99,7 +112,9 @@ private void initWithHosts(String hosts) {

@Test public void allHealthy() {
try (ElasticsearchStorage storage = context.getBean(ElasticsearchStorage.class)) {
assertOk(storage.check());

// There's an initialization delay, so await instead of expect everything up now.
awaitTimeout.untilAsserted(() -> assertThat(storage.check().ok()).isTrue());
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2020 The OpenZipkin Authors
* Copyright 2015-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -20,6 +20,8 @@
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpHeaderNames;
import com.linecorp.armeria.common.HttpMethod;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpRequestWriter;
import com.linecorp.armeria.common.MediaType;
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.RequestHeaders;
Expand All @@ -31,6 +33,7 @@
import io.netty.handler.codec.http.QueryStringEncoder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Supplier;
Expand Down Expand Up @@ -112,7 +115,9 @@ public <T> void index(String index, String typeName, T input, BulkIndexWriter<T>
entries.add(newIndexEntry(index, typeName, input, writer));
}

/** Creates a bulk request when there is more than one object to store */
/**
* Creates a bulk request when there is more than one object to store
*/
public HttpCall<Void> build() {
QueryStringEncoder urlBuilder = new QueryStringEncoder("/_bulk");
if (pipeline != null) urlBuilder.addParam("pipeline", pipeline);
Expand All @@ -139,7 +144,7 @@ static class BulkRequestSupplier implements HttpCall.RequestSupplier {

BulkRequestSupplier(List<IndexEntry<?>> entries, boolean shouldAddType,
RequestHeaders headers, ByteBufAllocator alloc) {
this.entries = entries;
this.entries = Collections.unmodifiableList(entries);
this.shouldAddType = shouldAddType;
this.headers = headers;
this.alloc = alloc;
Expand All @@ -149,13 +154,29 @@ static class BulkRequestSupplier implements HttpCall.RequestSupplier {
return headers;
}

@Override public void writeBody(HttpCall.RequestStream requestStream) {
for (IndexEntry<?> entry : entries) {
if (!requestStream.tryWrite(HttpData.wrap(serialize(alloc, entry, shouldAddType)))) {
// Stream aborted, no need to serialize anymore.
return;
}
@Override public HttpRequest get() {
HttpRequestWriter writer = HttpRequest.streaming(headers);
writeEntry(writer, 0);
return writer;
}

// There's a high chance that the response is received before the request
// is complete. This can be a problem for BulkCallBuilder when it's sending
// streaming requests. Hence, we use backpressure, instead of buffering.
//
// Follow https://github.com/line/armeria/issues/3119 for doc updates.
private void writeEntry(HttpRequestWriter writer, int index) {
if (index == entries.size()) { // out of entries.
writer.close();
return;
}
// Write the current entry directly to the current request.
if (!writer.tryWrite(HttpData.wrap(serialize(alloc, entries.get(index), shouldAddType)))) {
// Stream aborted, no need to serialize anymore.
return;
}
// Recurse to proceed to the next entry, if any.
writer.whenConsumed().thenRun(() -> writeEntry(writer, index + 1));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2020 The OpenZipkin Authors
* Copyright 2015-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -23,7 +23,6 @@
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpHeaders;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpRequestWriter;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.HttpStatusClass;
Expand Down Expand Up @@ -55,34 +54,15 @@ public interface BodyConverter<V> {
V convert(JsonParser parser, Supplier<String> contentString) throws IOException;
}

/**
* A request stream which can have {@link HttpData} of the request body written to it.
*/
public interface RequestStream {
/**
* Writes the {@link HttpData} to the stream. Returns {@code false} if the stream has been
* aborted (e.g., the request timed out while writing), or {@code true} otherwise.
*/
boolean tryWrite(HttpData obj);
}

/**
* A supplier of {@linkplain HttpHeaders headers} and {@linkplain HttpData body} of a request to
* Elasticsearch.
*/
public interface RequestSupplier {
public interface RequestSupplier extends Supplier<HttpRequest> {
/**
* Returns the {@linkplain HttpHeaders headers} for this request.
*/
RequestHeaders headers();

/**
* Writes the body of this request into the {@link RequestStream}. {@link
* RequestStream#tryWrite(HttpData)} can be called any number of times to publish any number of
* payload objects. It can be useful to split up a large payload into smaller chunks instead of
* buffering everything as one payload.
*/
void writeBody(RequestStream requestStream);
}

static class AggregatedRequestSupplier implements RequestSupplier {
Expand All @@ -106,8 +86,8 @@ static class AggregatedRequestSupplier implements RequestSupplier {
return request.headers();
}

@Override public void writeBody(RequestStream requestStream) {
requestStream.tryWrite(request.content());
@Override public HttpRequest get() {
return request.toHttpRequest();
}
}

Expand Down Expand Up @@ -205,10 +185,7 @@ CompletableFuture<AggregatedHttpResponse> sendRequest() {
final HttpResponse response;
try (SafeCloseable ignored =
Clients.withContextCustomizer(ctx -> ctx.logBuilder().name(name))) {
HttpRequestWriter httpRequest = HttpRequest.streaming(request.headers());
response = httpClient.execute(httpRequest);
request.writeBody(httpRequest::tryWrite);
httpRequest.close();
response = httpClient.execute(request.get());
}
CompletableFuture<AggregatedHttpResponse> responseFuture =
RequestContext.mapCurrent(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2019 The OpenZipkin Authors
* Copyright 2015-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2020 The OpenZipkin Authors
* Copyright 2015-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -20,6 +20,7 @@
import com.linecorp.armeria.common.AggregatedHttpResponse;
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpMethod;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.RequestHeaders;
import com.linecorp.armeria.common.ResponseHeaders;
Expand Down Expand Up @@ -253,13 +254,15 @@ class HttpCallTest {
server.enqueue(SUCCESS_RESPONSE);

HttpCall.RequestSupplier supplier = new HttpCall.RequestSupplier() {

final RequestHeaders headers = RequestHeaders.of(HttpMethod.POST, "/");

@Override public RequestHeaders headers() {
return RequestHeaders.of(HttpMethod.POST, "/");
return headers;
}

@Override public void writeBody(HttpCall.RequestStream requestStream) {
requestStream.tryWrite(HttpData.ofUtf8("hello"));
requestStream.tryWrite(HttpData.ofUtf8(" world"));
@Override public HttpRequest get() {
return HttpRequest.of(headers, HttpData.ofUtf8("hello"), HttpData.ofUtf8(" world"));
}
};

Expand Down

0 comments on commit 561de8a

Please sign in to comment.