Skip to content

Commit

Permalink
Test for leaks
Browse files Browse the repository at this point in the history
  • Loading branch information
bryce-anderson committed Jan 3, 2025
1 parent f005576 commit a628a89
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,13 @@ public Completable apply(final int count, final Throwable t) {
sdStatus == null ? onHostsAvailable : onHostsAvailable.ambWith(sdStatus), count, t);
}

final BackOffPolicy backOffPolicy = retryFor.apply(requestMetaData, t);
final BackOffPolicy backOffPolicy;
try {
backOffPolicy = retryFor.apply(requestMetaData, t);
} catch (Throwable tt) {
LOGGER.warn("Unexpected exception when computing backoff policy.", tt);
return failed(ThrowableUtils.addSuppressed(t, tt));
}
if (backOffPolicy != NO_RETRIES) {
final int offsetCount = count - lbNotReadyCount;
Completable retryWhen = backOffPolicy.newStrategy(executor).apply(offsetCount, t);
Expand All @@ -222,9 +228,8 @@ Completable applyRetryCallbacks(final Completable completable, final int retryCo
Completable result = (retryCallbacks == null ? completable :
completable.beforeOnComplete(() -> retryCallbacks.beforeRetry(retryCount, requestMetaData, t)));
if (returnOriginalResponses) {
if (t instanceof HttpResponseException &&
((HttpResponseException) t).metaData() instanceof StreamingHttpResponse) {
StreamingHttpResponse response = (StreamingHttpResponse) ((HttpResponseException) t).metaData();
final StreamingHttpResponse response = extractStreamingResponse(t);
if (response != null) {
// If we succeed, we need to drain the response body before we continue. If we fail we want to
// surface the original exception and don't worry about draining since it will be returned to
// the user.
Expand All @@ -233,15 +238,6 @@ Completable applyRetryCallbacks(final Completable completable, final int retryCo
// we'll ever receive a completion event, error or success.
.beforeCancel(() -> drain(response).subscribe())
.concat(drain(response));
} else {
if (!(t instanceof HttpResponseException)) {
LOGGER.debug("Couldn't unpack response due to unexpected dynamic types. Required " +
"exception of type HttpResponseException, found {}", t.getClass());
} else {
LOGGER.info("Couldn't unpack response due to unexpected dynamic types. Required " +
"meta-data of type StreamingHttpResponse, found {}",
((HttpResponseException) t).metaData().getClass());
}
}
}
return result;
Expand Down Expand Up @@ -290,11 +286,12 @@ protected Single<StreamingHttpResponse> request(final StreamingHttpRequester del

if (responseMapper != null) {
single = single.flatMap(resp -> {
HttpResponseException exception = null;
final HttpResponseException exception;
try {
exception = responseMapper.apply(resp);
} catch (Throwable t) {
LOGGER.warn("Failed to map the response. Proceeding with original response.", t);
LOGGER.warn("Failed to map the response.", t);
return drain(resp).toSingle().flatMap(ignored -> Single.failed(t));
}
Single<StreamingHttpResponse> response;
if (exception == null) {
Expand Down Expand Up @@ -1115,7 +1112,7 @@ public RetryingHttpRequesterFilter build() {

if (retryResponses != null && throwable instanceof HttpResponseException) {
final BackOffPolicy backOffPolicy =
retryResponses.apply(requestMetaData, (HttpResponseException) throwable);
retryResponses.apply(requestMetaData, (HttpResponseException) throwable);
if (backOffPolicy != NO_RETRIES) {
return backOffPolicy;
}
Expand All @@ -1135,4 +1132,19 @@ public RetryingHttpRequesterFilter build() {
private static Completable drain(StreamingHttpResponse response) {
return response.payloadBody().ignoreElements().onErrorComplete();
}

@Nullable
private static StreamingHttpResponse extractStreamingResponse(Throwable t) {
if (t instanceof HttpResponseException) {
HttpResponseException responseException = (HttpResponseException) t;
if (responseException.metaData() instanceof StreamingHttpResponse) {
return (StreamingHttpResponse) responseException.metaData();
} else {
LOGGER.info("Couldn't unpack response due to unexpected dynamic types. Required " +
"meta-data of type StreamingHttpResponse, found {}",
responseException.metaData().getClass());
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,16 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Stream;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.Single.defer;
Expand Down Expand Up @@ -307,6 +310,86 @@ public Single<StreamingHttpResponse> request(final StreamingHttpRequest request)
assertThat("Unexpected number of connections was created", newConnectionCreated.get(), is(1));
}

private enum LambdaException {
RESPONSE_MAPPER,
RETRY_RETRYABLE_EXCEPTIONS,
RETRY_RESPONSES,
ON_REQUEST_RETRY
}

private static Stream<Arguments> lambdaExceptions() {
return Stream.of(true, false).flatMap(returnOriginalResponses ->
Stream.of(LambdaException.values())
.map(lambda -> Arguments.of(returnOriginalResponses, lambda)));
}

@ParameterizedTest(name = "{displayName} [{index}]: returnOriginalResponses={0}, thrower={1}")
@MethodSource("lambdaExceptions")
void lambdaExceptions(final boolean returnOriginalResponses, final LambdaException thrower) throws Exception {

final AtomicInteger newConnectionCreated = new AtomicInteger();
final AtomicInteger responsesReceived = new AtomicInteger();
final AtomicInteger responseDrained = new AtomicInteger();
final AtomicInteger onRequestRetryCounter = new AtomicInteger();
final String retryMessage = "Retryable header";
normalClient = normalClientBuilder
.appendClientFilter(new Builder()
.maxTotalRetries(4)
.responseMapper(metaData -> {
if (thrower == LambdaException.RESPONSE_MAPPER) {
throw new RuntimeException("responseMapper");
}
return metaData.headers().contains(RETRYABLE_HEADER) ?
new HttpResponseException(retryMessage, metaData) : null;
})
// Disable request retrying
.retryRetryableExceptions((requestMetaData, e) -> {
if (thrower == LambdaException.RETRY_RETRYABLE_EXCEPTIONS) {
throw new RuntimeException("retryRetryableExceptions");
}
return ofNoRetries();
})
// Retry only responses marked so
.retryResponses((requestMetaData, throwable) -> {
if (thrower == LambdaException.RETRY_RESPONSES) {
throw new RuntimeException("retryResponses");
}
return ofImmediate(3);
}, returnOriginalResponses)
.onRequestRetry((count, req, t) -> {
if (thrower == LambdaException.ON_REQUEST_RETRY) {
throw new RuntimeException("onRequestRetryThrows");
}
assertThat(onRequestRetryCounter.incrementAndGet(), is(count));
})
.build())
.appendConnectionFilter(c -> {
newConnectionCreated.incrementAndGet();
return new StreamingHttpConnectionFilter(c) {
@Override
public Single<StreamingHttpResponse> request(final StreamingHttpRequest request) {
return delegate().request(request)
.map(response -> {
responsesReceived.incrementAndGet();
return response.transformPayloadBody(payload -> payload
.whenFinally(responseDrained::incrementAndGet));
});
}
};
})
.buildBlocking();
// TODO: we don't really want to accept different behavior but it seems like the retry operator will swallow
// exceptions that we attempt to bubble up through the retry-strategy failure channel.
if (returnOriginalResponses && thrower != LambdaException.RESPONSE_MAPPER) {
normalClient.request(normalClient.get("/"));
} else {
assertThrows(Exception.class, () -> normalClient.request(normalClient.get("/")));
}
assertThat("Response payload body was not drained on every mapping", responseDrained.get(),
is(responsesReceived.get()));
assertThat("Unexpected number of connections was created", newConnectionCreated.get(), is(1));
}

@Test
void singleInstanceFilter() {
Assertions.assertThrows(IllegalStateException.class, () -> forResolvedAddress(localAddress(8888))
Expand Down

0 comments on commit a628a89

Please sign in to comment.