Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

elasticsearch: use backpressure to resolve streaming request flakes #3236

Merged
merged 1 commit into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2020 The OpenZipkin Authors
* Copyright 2015-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -14,7 +14,6 @@
package zipkin2.elasticsearch.internal;

import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpRequestWriter;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import java.util.concurrent.TimeUnit;
Expand All @@ -36,7 +35,6 @@
import zipkin2.codec.SpanBytesDecoder;
import zipkin2.elasticsearch.ElasticsearchStorage;
import zipkin2.elasticsearch.internal.BulkCallBuilder.IndexEntry;
import zipkin2.elasticsearch.internal.client.HttpCall;

import static java.nio.charset.StandardCharsets.UTF_8;
import static zipkin2.elasticsearch.ElasticsearchVersion.V6_0;
Expand Down Expand Up @@ -67,21 +65,15 @@ public class BulkRequestBenchmarks {
@Benchmark public HttpRequest buildAndWriteRequest_singleSpan() {
BulkCallBuilder builder = new BulkCallBuilder(es, V6_0, "index-span");
builder.index(spanIndex, "span", CLIENT_SPAN, BulkIndexWriter.SPAN);
HttpCall.RequestSupplier supplier = builder.build().request;
HttpRequestWriter request = HttpRequest.streaming(supplier.headers());
supplier.writeBody(request::tryWrite);
return request;
return builder.build().request.get();
}

@Benchmark public HttpRequest buildAndWriteRequest_tenSpans() {
BulkCallBuilder builder = new BulkCallBuilder(es, V6_0, "index-span");
for (int i = 0; i < 10; i++) {
builder.index(spanIndex, "span", CLIENT_SPAN, BulkIndexWriter.SPAN);
}
HttpCall.RequestSupplier supplier = builder.build().request;
HttpRequestWriter request = HttpRequest.streaming(supplier.headers());
supplier.writeBody(request::tryWrite);
return request;
return builder.build().request.get();
}

// Convenience main entry-point
Expand Down
2 changes: 1 addition & 1 deletion zipkin-server/src/main/resources/zipkin-server-shared.yml
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ logging:
com.linecorp.armeria: 'WARN'
# # But allow to say it's ready to serve requests
com.linecorp.armeria.server.Server: 'INFO'
# kafka is quite chatty so we switch everything off by default
# kafka is quite chatty, so we switch everything off by default
org.apache.kafka: 'OFF'
# # investigate /api/v2/dependencies
# zipkin2.internal.DependencyLinker: 'DEBUG'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2020 The OpenZipkin Authors
* Copyright 2015-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -13,12 +13,15 @@
*/
package zipkin2.server.internal.elasticsearch;

import com.linecorp.armeria.client.endpoint.EmptyEndpointGroupException;
import com.linecorp.armeria.client.endpoint.EndpointSelectionTimeoutException;
import com.linecorp.armeria.common.AggregatedHttpResponse;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.healthcheck.HealthCheckService;
import com.linecorp.armeria.server.healthcheck.SettableHealthChecker;
import com.linecorp.armeria.testing.junit4.server.ServerRule;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLException;
import org.awaitility.core.ConditionFactory;
Expand Down Expand Up @@ -47,26 +50,36 @@ public class ITElasticsearchHealthCheck {

static final SettableHealthChecker server1Health = new SettableHealthChecker(true);

static {
// Gives better context when there's an exception such as AbortedStreamException
System.setProperty("com.linecorp.armeria.verboseExceptions", "always");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

already in log config, but also we weren't getting exceptions ;)

}

@ClassRule public static ServerRule server1 = new ServerRule() {
@Override protected void configure(ServerBuilder sb) {
sb.service("/", (ctx, req) -> VERSION_RESPONSE.toHttpResponse());
sb.service("/", (ctx, req) -> sendResponseAfterAggregate(req, VERSION_RESPONSE));
sb.service("/_cluster/health", HealthCheckService.of(server1Health));
sb.serviceUnder("/_cluster/health/", (ctx, req) -> GREEN_RESPONSE.toHttpResponse());
}
};

/** This ensures the response is sent after the request is fully read. */
private static HttpResponse sendResponseAfterAggregate(HttpRequest req,
AggregatedHttpResponse response) {
final CompletableFuture<HttpResponse> future = new CompletableFuture<>();
req.aggregate().whenComplete((aggregatedReq, cause) -> {
if (cause != null) {
future.completeExceptionally(cause);
} else {
future.complete(response.toHttpResponse());
}
});
return HttpResponse.from(future);
}

static final SettableHealthChecker server2Health = new SettableHealthChecker(true);

@ClassRule public static ServerRule server2 = new ServerRule() {
@Override protected void configure(ServerBuilder sb) {
sb.service("/", (ctx, req) -> VERSION_RESPONSE.toHttpResponse());
sb.service("/", (ctx, req) -> sendResponseAfterAggregate(req, VERSION_RESPONSE));
sb.service("/_cluster/health", HealthCheckService.of(server2Health));
sb.serviceUnder("/_cluster/health/", (ctx, req) -> GREEN_RESPONSE.toHttpResponse());
sb.serviceUnder("/_cluster/health/",
(ctx, req) -> sendResponseAfterAggregate(req, GREEN_RESPONSE));
}
};

Expand Down Expand Up @@ -99,7 +112,9 @@ private void initWithHosts(String hosts) {

@Test public void allHealthy() {
try (ElasticsearchStorage storage = context.getBean(ElasticsearchStorage.class)) {
assertOk(storage.check());

// There's an initialization delay, so await instead of expect everything up now.
awaitTimeout.untilAsserted(() -> assertThat(storage.check().ok()).isTrue());
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2020 The OpenZipkin Authors
* Copyright 2015-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -20,6 +20,8 @@
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpHeaderNames;
import com.linecorp.armeria.common.HttpMethod;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpRequestWriter;
import com.linecorp.armeria.common.MediaType;
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.RequestHeaders;
Expand All @@ -31,6 +33,7 @@
import io.netty.handler.codec.http.QueryStringEncoder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Supplier;
Expand Down Expand Up @@ -112,7 +115,9 @@ public <T> void index(String index, String typeName, T input, BulkIndexWriter<T>
entries.add(newIndexEntry(index, typeName, input, writer));
}

/** Creates a bulk request when there is more than one object to store */
/**
* Creates a bulk request when there is more than one object to store
*/
public HttpCall<Void> build() {
QueryStringEncoder urlBuilder = new QueryStringEncoder("/_bulk");
if (pipeline != null) urlBuilder.addParam("pipeline", pipeline);
Expand All @@ -139,7 +144,7 @@ static class BulkRequestSupplier implements HttpCall.RequestSupplier {

BulkRequestSupplier(List<IndexEntry<?>> entries, boolean shouldAddType,
RequestHeaders headers, ByteBufAllocator alloc) {
this.entries = entries;
this.entries = Collections.unmodifiableList(entries);
this.shouldAddType = shouldAddType;
this.headers = headers;
this.alloc = alloc;
Expand All @@ -149,13 +154,29 @@ static class BulkRequestSupplier implements HttpCall.RequestSupplier {
return headers;
}

@Override public void writeBody(HttpCall.RequestStream requestStream) {
for (IndexEntry<?> entry : entries) {
if (!requestStream.tryWrite(HttpData.wrap(serialize(alloc, entry, shouldAddType)))) {
// Stream aborted, no need to serialize anymore.
return;
}
@Override public HttpRequest get() {
HttpRequestWriter writer = HttpRequest.streaming(headers);
writeEntry(writer, 0);
return writer;
}

// There's a high chance that the response is received before the request
// is complete. This can be a problem for BulkCallBuilder when it's sending
// streaming requests. Hence, we use backpressure, instead of buffering.
//
// Follow https://github.com/line/armeria/issues/3119 for doc updates.
private void writeEntry(HttpRequestWriter writer, int index) {
if (index == entries.size()) { // out of entries.
writer.close();
return;
}
// Write the current entry directly to the current request.
if (!writer.tryWrite(HttpData.wrap(serialize(alloc, entries.get(index), shouldAddType)))) {
codefromthecrypt marked this conversation as resolved.
Show resolved Hide resolved
// Stream aborted, no need to serialize anymore.
return;
}
// Recurse to proceed to the next entry, if any.
writer.whenConsumed().thenRun(() -> writeEntry(writer, index + 1));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2020 The OpenZipkin Authors
* Copyright 2015-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -23,7 +23,6 @@
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpHeaders;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpRequestWriter;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.HttpStatusClass;
Expand Down Expand Up @@ -55,34 +54,15 @@ public interface BodyConverter<V> {
V convert(JsonParser parser, Supplier<String> contentString) throws IOException;
}

/**
* A request stream which can have {@link HttpData} of the request body written to it.
*/
public interface RequestStream {
/**
* Writes the {@link HttpData} to the stream. Returns {@code false} if the stream has been
* aborted (e.g., the request timed out while writing), or {@code true} otherwise.
*/
boolean tryWrite(HttpData obj);
}

/**
* A supplier of {@linkplain HttpHeaders headers} and {@linkplain HttpData body} of a request to
* Elasticsearch.
*/
public interface RequestSupplier {
public interface RequestSupplier extends Supplier<HttpRequest> {
/**
* Returns the {@linkplain HttpHeaders headers} for this request.
*/
RequestHeaders headers();

/**
* Writes the body of this request into the {@link RequestStream}. {@link
* RequestStream#tryWrite(HttpData)} can be called any number of times to publish any number of
* payload objects. It can be useful to split up a large payload into smaller chunks instead of
* buffering everything as one payload.
*/
void writeBody(RequestStream requestStream);
}

static class AggregatedRequestSupplier implements RequestSupplier {
Expand All @@ -106,8 +86,8 @@ static class AggregatedRequestSupplier implements RequestSupplier {
return request.headers();
}

@Override public void writeBody(RequestStream requestStream) {
requestStream.tryWrite(request.content());
@Override public HttpRequest get() {
return request.toHttpRequest();
}
}

Expand Down Expand Up @@ -205,10 +185,7 @@ CompletableFuture<AggregatedHttpResponse> sendRequest() {
final HttpResponse response;
try (SafeCloseable ignored =
Clients.withContextCustomizer(ctx -> ctx.logBuilder().name(name))) {
HttpRequestWriter httpRequest = HttpRequest.streaming(request.headers());
response = httpClient.execute(httpRequest);
request.writeBody(httpRequest::tryWrite);
httpRequest.close();
response = httpClient.execute(request.get());
}
CompletableFuture<AggregatedHttpResponse> responseFuture =
RequestContext.mapCurrent(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2019 The OpenZipkin Authors
* Copyright 2015-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2020 The OpenZipkin Authors
* Copyright 2015-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -20,6 +20,7 @@
import com.linecorp.armeria.common.AggregatedHttpResponse;
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpMethod;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.RequestHeaders;
import com.linecorp.armeria.common.ResponseHeaders;
Expand Down Expand Up @@ -253,13 +254,15 @@ class HttpCallTest {
server.enqueue(SUCCESS_RESPONSE);

HttpCall.RequestSupplier supplier = new HttpCall.RequestSupplier() {

final RequestHeaders headers = RequestHeaders.of(HttpMethod.POST, "/");

@Override public RequestHeaders headers() {
return RequestHeaders.of(HttpMethod.POST, "/");
return headers;
}

@Override public void writeBody(HttpCall.RequestStream requestStream) {
requestStream.tryWrite(HttpData.ofUtf8("hello"));
requestStream.tryWrite(HttpData.ofUtf8(" world"));
@Override public HttpRequest get() {
return HttpRequest.of(headers, HttpData.ofUtf8("hello"), HttpData.ofUtf8(" world"));
}
};

Expand Down
Loading