diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpApiConversions.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpApiConversions.java index 584944d32f..76ba9c242a 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpApiConversions.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpApiConversions.java @@ -19,6 +19,8 @@ import io.servicetalk.http.api.StreamingHttpClientToBlockingStreamingHttpClient.ReservedStreamingHttpConnectionToBlockingStreaming; import io.servicetalk.http.api.StreamingHttpClientToHttpClient.ReservedStreamingHttpConnectionToReservedHttpConnection; +import static io.servicetalk.http.api.HttpContextKeys.HTTP_EXECUTION_STRATEGY_KEY; + /** * Conversion routines to {@link StreamingHttpService}. */ @@ -486,4 +488,20 @@ public static HttpService toHttpService(StreamingHttpService service) { public static BlockingStreamingHttpService toBlockingStreamingHttpService(StreamingHttpService service) { return new StreamingHttpServiceToBlockingStreamingHttpService(service); } + + /** + * Tries assigning a strategy for a request execution if none is already assigned. + *

+ * Top level user-facing API can optimize execution strategy (offloading) when it's known that users won't interact + * with some parts of request-response processing. + * + * @param metaData request meta-data to assign the strategy to + * @param strategy {@link HttpExecutionStrategy} to assign + * @see + * Programming models + */ + static void assignStrategy(final HttpRequestMetaData metaData, final HttpExecutionStrategy strategy) { + metaData.context().putIfAbsent(HTTP_EXECUTION_STRATEGY_KEY, strategy); + } } diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpClientToBlockingHttpClient.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpClientToBlockingHttpClient.java index 670ad24d7b..34ad25eeeb 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpClientToBlockingHttpClient.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpClientToBlockingHttpClient.java @@ -18,7 +18,7 @@ import io.servicetalk.concurrent.BlockingIterable; import static io.servicetalk.concurrent.api.internal.BlockingUtils.blockingInvocation; -import static io.servicetalk.http.api.HttpContextKeys.HTTP_EXECUTION_STRATEGY_KEY; +import static io.servicetalk.http.api.HttpApiConversions.assignStrategy; import static io.servicetalk.http.api.HttpExecutionStrategies.defaultStrategy; import static io.servicetalk.http.api.RequestResponseFactories.toAggregated; import static io.servicetalk.http.api.StreamingHttpConnectionToBlockingHttpConnection.DEFAULT_BLOCKING_CONNECTION_STRATEGY; @@ -44,7 +44,7 @@ public HttpExecutionStrategy executionStrategy() { @Override public ReservedBlockingHttpConnection reserveConnection(final HttpRequestMetaData metaData) throws Exception { - metaData.context().putIfAbsent(HTTP_EXECUTION_STRATEGY_KEY, strategy); + assignStrategy(metaData, strategy); return blockingInvocation(client.reserveConnection(metaData) .map(c -> new ReservedStreamingHttpConnectionToReservedBlockingHttpConnection(c, this.strategy, reqRespFactory))); @@ -57,7 +57,7 @@ public StreamingHttpClient asStreamingClient() { @Override public HttpResponse request(final HttpRequest request) throws Exception { - request.context().putIfAbsent(HTTP_EXECUTION_STRATEGY_KEY, strategy); + assignStrategy(request, strategy); return BlockingRequestUtils.request(client, request); } @@ -89,6 +89,7 @@ public HttpRequest newRequest(final HttpRequestMethod method, final String reque static final class ReservedStreamingHttpConnectionToReservedBlockingHttpConnection implements ReservedBlockingHttpConnection { private final ReservedStreamingHttpConnection connection; + private final HttpExecutionStrategy strategy; private final HttpConnectionContext context; private final HttpExecutionContext executionContext; private final HttpRequestResponseFactory reqRespFactory; @@ -104,13 +105,13 @@ static final class ReservedStreamingHttpConnectionToReservedBlockingHttpConnecti final HttpExecutionStrategy strategy, final HttpRequestResponseFactory reqRespFactory) { - requireNonNull(strategy); this.connection = requireNonNull(connection); + this.strategy = requireNonNull(strategy); final HttpConnectionContext originalCtx = connection.connectionContext(); executionContext = new DelegatingHttpExecutionContext(connection.executionContext()) { @Override public HttpExecutionStrategy executionStrategy() { - return strategy; + return ReservedStreamingHttpConnectionToReservedBlockingHttpConnection.this.strategy; } }; context = new DelegatingHttpConnectionContext(originalCtx) { @@ -144,6 +145,7 @@ public BlockingIterable transportEventIterable(final HttpEventK @Override public HttpResponse request(final HttpRequest request) throws Exception { + assignStrategy(request, strategy); return BlockingRequestUtils.request(connection, request); } diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpClientToBlockingStreamingHttpClient.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpClientToBlockingStreamingHttpClient.java index 7841b60032..cb7ad617a6 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpClientToBlockingStreamingHttpClient.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpClientToBlockingStreamingHttpClient.java @@ -18,7 +18,7 @@ import io.servicetalk.concurrent.BlockingIterable; import static io.servicetalk.concurrent.api.internal.BlockingUtils.blockingInvocation; -import static io.servicetalk.http.api.HttpContextKeys.HTTP_EXECUTION_STRATEGY_KEY; +import static io.servicetalk.http.api.HttpApiConversions.assignStrategy; import static io.servicetalk.http.api.HttpExecutionStrategies.defaultStrategy; import static io.servicetalk.http.api.RequestResponseFactories.toBlockingStreaming; import static io.servicetalk.http.api.StreamingHttpConnectionToBlockingStreamingHttpConnection.DEFAULT_BLOCKING_STREAMING_CONNECTION_STRATEGY; @@ -46,9 +46,9 @@ public HttpExecutionStrategy executionStrategy() { @Override public ReservedBlockingStreamingHttpConnection reserveConnection(final HttpRequestMetaData metaData) throws Exception { + assignStrategy(metaData, strategy); // It is assumed that users will always apply timeouts at the StreamingHttpService layer (e.g. via filter). // So we don't apply any explicit timeout here and just wait forever. - metaData.context().putIfAbsent(HTTP_EXECUTION_STRATEGY_KEY, strategy); return blockingInvocation(client.reserveConnection(metaData) .map(c -> new ReservedStreamingHttpConnectionToBlockingStreaming(c, this.strategy, reqRespFactory))); } @@ -60,7 +60,7 @@ public StreamingHttpClient asStreamingClient() { @Override public BlockingStreamingHttpResponse request(final BlockingStreamingHttpRequest request) throws Exception { - request.context().putIfAbsent(HTTP_EXECUTION_STRATEGY_KEY, strategy); + assignStrategy(request, strategy); return blockingInvocation(client.request(request.toStreamingRequest())).toBlockingStreamingResponse(); } @@ -148,6 +148,7 @@ public BlockingIterable transportEventIterable(final HttpEventK @Override public BlockingStreamingHttpResponse request(final BlockingStreamingHttpRequest request) throws Exception { + assignStrategy(request, strategy); return blockingInvocation(connection.request(request.toStreamingRequest())) .toBlockingStreamingResponse(); } diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpClientToHttpClient.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpClientToHttpClient.java index 5a9d73fc28..258fcc2335 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpClientToHttpClient.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpClientToHttpClient.java @@ -19,7 +19,7 @@ import io.servicetalk.concurrent.api.Publisher; import io.servicetalk.concurrent.api.Single; -import static io.servicetalk.http.api.HttpContextKeys.HTTP_EXECUTION_STRATEGY_KEY; +import static io.servicetalk.http.api.HttpApiConversions.assignStrategy; import static io.servicetalk.http.api.HttpExecutionStrategies.defaultStrategy; import static io.servicetalk.http.api.RequestResponseFactories.toAggregated; import static io.servicetalk.http.api.StreamingHttpConnectionToHttpConnection.DEFAULT_ASYNC_CONNECTION_STRATEGY; @@ -46,7 +46,7 @@ public HttpExecutionStrategy executionStrategy() { @Override public Single request(final HttpRequest request) { return Single.defer(() -> { - request.context().putIfAbsent(HTTP_EXECUTION_STRATEGY_KEY, strategy); + assignStrategy(request, strategy); return client.request(request.toStreamingRequest()) .flatMap(response -> response.toResponse().shareContextOnSubscribe()) .shareContextOnSubscribe(); @@ -56,7 +56,7 @@ public Single request(final HttpRequest request) { @Override public Single reserveConnection(final HttpRequestMetaData metaData) { return Single.defer(() -> { - metaData.context().putIfAbsent(HTTP_EXECUTION_STRATEGY_KEY, strategy); + assignStrategy(metaData, strategy); return client.reserveConnection(metaData) .map(c -> new ReservedStreamingHttpConnectionToReservedHttpConnection(c, this.strategy, reqRespFactory)) @@ -170,8 +170,12 @@ public Publisher transportEventStream(final HttpEventKey eve @Override public Single request(final HttpRequest request) { - return connection.request(request.toStreamingRequest()) - .flatMap(response -> response.toResponse().shareContextOnSubscribe()); + return Single.defer(() -> { + assignStrategy(request, strategy); + return connection.request(request.toStreamingRequest()) + .flatMap(response -> response.toResponse().shareContextOnSubscribe()) + .shareContextOnSubscribe(); + }); } @Override