Skip to content

Commit

Permalink
Always bubble up expcetions in lambdas
Browse files Browse the repository at this point in the history
  • Loading branch information
bryce-anderson committed Jan 4, 2025
1 parent a628a89 commit cdb48b8
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,14 @@ public Completable apply(final int count, final Throwable t) {
backOffPolicy = retryFor.apply(requestMetaData, t);
} catch (Throwable tt) {
LOGGER.warn("Unexpected exception when computing backoff policy.", tt);
return failed(ThrowableUtils.addSuppressed(t, tt));
Completable result = failed(ThrowableUtils.addSuppressed(tt, t));
if (returnOriginalResponses) {
StreamingHttpResponse response = extractStreamingResponse(t);
if (response != null) {
result = drain(response).concat(result);
}
}
return result;
}
if (backOffPolicy != NO_RETRIES) {
final int offsetCount = count - lbNotReadyCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,83 +310,63 @@ public Single<StreamingHttpResponse> request(final StreamingHttpRequest request)
assertThat("Unexpected number of connections was created", newConnectionCreated.get(), is(1));
}

private enum LambdaException {
private enum ExceptionSource {
RESPONSE_MAPPER,
RETRY_RETRYABLE_EXCEPTIONS,
RETRY_RESPONSES,
ON_REQUEST_RETRY
RETRY_RESPONSES
}

private static Stream<Arguments> lambdaExceptions() {
return Stream.of(true, false).flatMap(returnOriginalResponses ->
Stream.of(LambdaException.values())
Stream.of(ExceptionSource.values())
.map(lambda -> Arguments.of(returnOriginalResponses, lambda)));
}

@ParameterizedTest(name = "{displayName} [{index}]: returnOriginalResponses={0}, thrower={1}")
@MethodSource("lambdaExceptions")
void lambdaExceptions(final boolean returnOriginalResponses, final LambdaException thrower) throws Exception {

void lambdaExceptions(final boolean returnOriginalResponses, final ExceptionSource thrower) {
final AtomicInteger newConnectionCreated = new AtomicInteger();
final AtomicInteger responsesReceived = new AtomicInteger();
final AtomicInteger requestsInitiated = new AtomicInteger();
final AtomicInteger responseDrained = new AtomicInteger();
final AtomicInteger onRequestRetryCounter = new AtomicInteger();
final String retryMessage = "Retryable header";
normalClient = normalClientBuilder
.appendClientFilter(new Builder()
.maxTotalRetries(4)
.responseMapper(metaData -> {
if (thrower == LambdaException.RESPONSE_MAPPER) {
if (thrower == ExceptionSource.RESPONSE_MAPPER) {
throw new RuntimeException("responseMapper");
}
return metaData.headers().contains(RETRYABLE_HEADER) ?
new HttpResponseException(retryMessage, metaData) : null;
})
// Disable request retrying
.retryRetryableExceptions((requestMetaData, e) -> {
if (thrower == LambdaException.RETRY_RETRYABLE_EXCEPTIONS) {
throw new RuntimeException("retryRetryableExceptions");
}
return ofNoRetries();
})
// Retry only responses marked so
.retryResponses((requestMetaData, throwable) -> {
if (thrower == LambdaException.RETRY_RESPONSES) {
if (thrower == ExceptionSource.RETRY_RESPONSES) {
throw new RuntimeException("retryResponses");
}
return ofImmediate(3);
}, returnOriginalResponses)
.onRequestRetry((count, req, t) -> {
if (thrower == LambdaException.ON_REQUEST_RETRY) {
throw new RuntimeException("onRequestRetryThrows");
}
assertThat(onRequestRetryCounter.incrementAndGet(), is(count));
})
.onRequestRetry((count, req, t) ->
assertThat(onRequestRetryCounter.incrementAndGet(), is(count)))
.build())
.appendConnectionFilter(c -> {
newConnectionCreated.incrementAndGet();
return new StreamingHttpConnectionFilter(c) {
@Override
public Single<StreamingHttpResponse> request(final StreamingHttpRequest request) {
return delegate().request(request)
.map(response -> {
responsesReceived.incrementAndGet();
return response.transformPayloadBody(payload -> payload
.whenFinally(responseDrained::incrementAndGet));
});
return Single.defer(() -> {
requestsInitiated.incrementAndGet();
return delegate().request(request)
.map(response -> response.transformPayloadBody(payload -> payload
.whenFinally(responseDrained::incrementAndGet)));
});
}
};
})
.buildBlocking();
// TODO: we don't really want to accept different behavior but it seems like the retry operator will swallow
// exceptions that we attempt to bubble up through the retry-strategy failure channel.
if (returnOriginalResponses && thrower != LambdaException.RESPONSE_MAPPER) {
normalClient.request(normalClient.get("/"));
} else {
assertThrows(Exception.class, () -> normalClient.request(normalClient.get("/")));
}
assertThat("Response payload body was not drained on every mapping", responseDrained.get(),
is(responsesReceived.get()));
assertThrows(Exception.class, () -> normalClient.request(normalClient.get("/")));
assertThat("Response payload body was not drained on every mapping", responseDrained.get(), is(1));
assertThat("Multiple requests initiated", requestsInitiated.get(), is(1));
assertThat("Unexpected number of connections was created", newConnectionCreated.get(), is(1));
}

Expand Down

0 comments on commit cdb48b8

Please sign in to comment.