diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java index 182b65fa48..1de85c89d3 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java @@ -208,7 +208,14 @@ public Completable apply(final int count, final Throwable t) { backOffPolicy = retryFor.apply(requestMetaData, t); } catch (Throwable tt) { LOGGER.warn("Unexpected exception when computing backoff policy.", tt); - return failed(ThrowableUtils.addSuppressed(t, tt)); + Completable result = failed(ThrowableUtils.addSuppressed(tt, t)); + if (returnOriginalResponses) { + StreamingHttpResponse response = extractStreamingResponse(t); + if (response != null) { + result = drain(response).concat(result); + } + } + return result; } if (backOffPolicy != NO_RETRIES) { final int offsetCount = count - lbNotReadyCount; diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java index 8caf1b41a3..4e270d0391 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java @@ -310,25 +310,22 @@ public Single request(final StreamingHttpRequest request) assertThat("Unexpected number of connections was created", newConnectionCreated.get(), is(1)); } - private enum LambdaException { + private enum ExceptionSource { RESPONSE_MAPPER, - RETRY_RETRYABLE_EXCEPTIONS, - RETRY_RESPONSES, - ON_REQUEST_RETRY + RETRY_RESPONSES } private static Stream lambdaExceptions() { return Stream.of(true, false).flatMap(returnOriginalResponses -> - Stream.of(LambdaException.values()) + Stream.of(ExceptionSource.values()) .map(lambda -> Arguments.of(returnOriginalResponses, lambda))); } @ParameterizedTest(name = "{displayName} [{index}]: returnOriginalResponses={0}, thrower={1}") @MethodSource("lambdaExceptions") - void lambdaExceptions(final boolean returnOriginalResponses, final LambdaException thrower) throws Exception { - + void lambdaExceptions(final boolean returnOriginalResponses, final ExceptionSource thrower) { final AtomicInteger newConnectionCreated = new AtomicInteger(); - final AtomicInteger responsesReceived = new AtomicInteger(); + final AtomicInteger requestsInitiated = new AtomicInteger(); final AtomicInteger responseDrained = new AtomicInteger(); final AtomicInteger onRequestRetryCounter = new AtomicInteger(); final String retryMessage = "Retryable header"; @@ -336,57 +333,40 @@ void lambdaExceptions(final boolean returnOriginalResponses, final LambdaExcepti .appendClientFilter(new Builder() .maxTotalRetries(4) .responseMapper(metaData -> { - if (thrower == LambdaException.RESPONSE_MAPPER) { + if (thrower == ExceptionSource.RESPONSE_MAPPER) { throw new RuntimeException("responseMapper"); } return metaData.headers().contains(RETRYABLE_HEADER) ? new HttpResponseException(retryMessage, metaData) : null; }) - // Disable request retrying - .retryRetryableExceptions((requestMetaData, e) -> { - if (thrower == LambdaException.RETRY_RETRYABLE_EXCEPTIONS) { - throw new RuntimeException("retryRetryableExceptions"); - } - return ofNoRetries(); - }) // Retry only responses marked so .retryResponses((requestMetaData, throwable) -> { - if (thrower == LambdaException.RETRY_RESPONSES) { + if (thrower == ExceptionSource.RETRY_RESPONSES) { throw new RuntimeException("retryResponses"); } return ofImmediate(3); }, returnOriginalResponses) - .onRequestRetry((count, req, t) -> { - if (thrower == LambdaException.ON_REQUEST_RETRY) { - throw new RuntimeException("onRequestRetryThrows"); - } - assertThat(onRequestRetryCounter.incrementAndGet(), is(count)); - }) + .onRequestRetry((count, req, t) -> + assertThat(onRequestRetryCounter.incrementAndGet(), is(count))) .build()) .appendConnectionFilter(c -> { newConnectionCreated.incrementAndGet(); return new StreamingHttpConnectionFilter(c) { @Override public Single request(final StreamingHttpRequest request) { - return delegate().request(request) - .map(response -> { - responsesReceived.incrementAndGet(); - return response.transformPayloadBody(payload -> payload - .whenFinally(responseDrained::incrementAndGet)); - }); + return Single.defer(() -> { + requestsInitiated.incrementAndGet(); + return delegate().request(request) + .map(response -> response.transformPayloadBody(payload -> payload + .whenFinally(responseDrained::incrementAndGet))); + }); } }; }) .buildBlocking(); - // TODO: we don't really want to accept different behavior but it seems like the retry operator will swallow - // exceptions that we attempt to bubble up through the retry-strategy failure channel. - if (returnOriginalResponses && thrower != LambdaException.RESPONSE_MAPPER) { - normalClient.request(normalClient.get("/")); - } else { - assertThrows(Exception.class, () -> normalClient.request(normalClient.get("/"))); - } - assertThat("Response payload body was not drained on every mapping", responseDrained.get(), - is(responsesReceived.get())); + assertThrows(Exception.class, () -> normalClient.request(normalClient.get("/"))); + assertThat("Response payload body was not drained on every mapping", responseDrained.get(), is(1)); + assertThat("Multiple requests initiated", requestsInitiated.get(), is(1)); assertThat("Unexpected number of connections was created", newConnectionCreated.get(), is(1)); }