Skip to content

Commit

Permalink
Apply optimized execution strategy for converted connections (#2634)
Browse files Browse the repository at this point in the history
Motivation:

When a client is converted to a different API, the API optimizes the default strategy. We should do the same when we convert a reserved connection to a different API.

Modifications:

- Apply an execution strategy inside the request method of every `ReservedStreamingHttpConnectionToReserved*HttpConnection`;

Result:

Converted connections apply the same strategy as converted clients.
  • Loading branch information
idelpivnitskiy authored Jul 4, 2023
1 parent 195e409 commit e257614
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import io.servicetalk.http.api.StreamingHttpClientToBlockingStreamingHttpClient.ReservedStreamingHttpConnectionToBlockingStreaming;
import io.servicetalk.http.api.StreamingHttpClientToHttpClient.ReservedStreamingHttpConnectionToReservedHttpConnection;

import static io.servicetalk.http.api.HttpContextKeys.HTTP_EXECUTION_STRATEGY_KEY;

/**
* Conversion routines to {@link StreamingHttpService}.
*/
Expand Down Expand Up @@ -486,4 +488,20 @@ public static HttpService toHttpService(StreamingHttpService service) {
public static BlockingStreamingHttpService toBlockingStreamingHttpService(StreamingHttpService service) {
return new StreamingHttpServiceToBlockingStreamingHttpService(service);
}

/**
* Tries assigning a strategy for a request execution if none is already assigned.
* <p>
* Top level user-facing API can optimize execution strategy (offloading) when it's known that users won't interact
* with some parts of request-response processing.
*
* @param metaData request meta-data to assign the strategy to
* @param strategy {@link HttpExecutionStrategy} to assign
* @see <a href=
* "https://docs.servicetalk.io/servicetalk-http-api/SNAPSHOT/blocking-safe-by-default.html#programming-models">
* Programming models</a>
*/
static void assignStrategy(final HttpRequestMetaData metaData, final HttpExecutionStrategy strategy) {
metaData.context().putIfAbsent(HTTP_EXECUTION_STRATEGY_KEY, strategy);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import io.servicetalk.concurrent.BlockingIterable;

import static io.servicetalk.concurrent.api.internal.BlockingUtils.blockingInvocation;
import static io.servicetalk.http.api.HttpContextKeys.HTTP_EXECUTION_STRATEGY_KEY;
import static io.servicetalk.http.api.HttpApiConversions.assignStrategy;
import static io.servicetalk.http.api.HttpExecutionStrategies.defaultStrategy;
import static io.servicetalk.http.api.RequestResponseFactories.toAggregated;
import static io.servicetalk.http.api.StreamingHttpConnectionToBlockingHttpConnection.DEFAULT_BLOCKING_CONNECTION_STRATEGY;
Expand All @@ -44,7 +44,7 @@ public HttpExecutionStrategy executionStrategy() {

@Override
public ReservedBlockingHttpConnection reserveConnection(final HttpRequestMetaData metaData) throws Exception {
metaData.context().putIfAbsent(HTTP_EXECUTION_STRATEGY_KEY, strategy);
assignStrategy(metaData, strategy);
return blockingInvocation(client.reserveConnection(metaData)
.map(c -> new ReservedStreamingHttpConnectionToReservedBlockingHttpConnection(c, this.strategy,
reqRespFactory)));
Expand All @@ -57,7 +57,7 @@ public StreamingHttpClient asStreamingClient() {

@Override
public HttpResponse request(final HttpRequest request) throws Exception {
request.context().putIfAbsent(HTTP_EXECUTION_STRATEGY_KEY, strategy);
assignStrategy(request, strategy);
return BlockingRequestUtils.request(client, request);
}

Expand Down Expand Up @@ -89,6 +89,7 @@ public HttpRequest newRequest(final HttpRequestMethod method, final String reque
static final class ReservedStreamingHttpConnectionToReservedBlockingHttpConnection implements
ReservedBlockingHttpConnection {
private final ReservedStreamingHttpConnection connection;
private final HttpExecutionStrategy strategy;
private final HttpConnectionContext context;
private final HttpExecutionContext executionContext;
private final HttpRequestResponseFactory reqRespFactory;
Expand All @@ -104,13 +105,13 @@ static final class ReservedStreamingHttpConnectionToReservedBlockingHttpConnecti
final HttpExecutionStrategy strategy,
final HttpRequestResponseFactory reqRespFactory) {

requireNonNull(strategy);
this.connection = requireNonNull(connection);
this.strategy = requireNonNull(strategy);
final HttpConnectionContext originalCtx = connection.connectionContext();
executionContext = new DelegatingHttpExecutionContext(connection.executionContext()) {
@Override
public HttpExecutionStrategy executionStrategy() {
return strategy;
return ReservedStreamingHttpConnectionToReservedBlockingHttpConnection.this.strategy;
}
};
context = new DelegatingHttpConnectionContext(originalCtx) {
Expand Down Expand Up @@ -144,6 +145,7 @@ public <T> BlockingIterable<? extends T> transportEventIterable(final HttpEventK

@Override
public HttpResponse request(final HttpRequest request) throws Exception {
assignStrategy(request, strategy);
return BlockingRequestUtils.request(connection, request);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import io.servicetalk.concurrent.BlockingIterable;

import static io.servicetalk.concurrent.api.internal.BlockingUtils.blockingInvocation;
import static io.servicetalk.http.api.HttpContextKeys.HTTP_EXECUTION_STRATEGY_KEY;
import static io.servicetalk.http.api.HttpApiConversions.assignStrategy;
import static io.servicetalk.http.api.HttpExecutionStrategies.defaultStrategy;
import static io.servicetalk.http.api.RequestResponseFactories.toBlockingStreaming;
import static io.servicetalk.http.api.StreamingHttpConnectionToBlockingStreamingHttpConnection.DEFAULT_BLOCKING_STREAMING_CONNECTION_STRATEGY;
Expand Down Expand Up @@ -46,9 +46,9 @@ public HttpExecutionStrategy executionStrategy() {
@Override
public ReservedBlockingStreamingHttpConnection reserveConnection(final HttpRequestMetaData metaData)
throws Exception {
assignStrategy(metaData, strategy);
// It is assumed that users will always apply timeouts at the StreamingHttpService layer (e.g. via filter).
// So we don't apply any explicit timeout here and just wait forever.
metaData.context().putIfAbsent(HTTP_EXECUTION_STRATEGY_KEY, strategy);
return blockingInvocation(client.reserveConnection(metaData)
.map(c -> new ReservedStreamingHttpConnectionToBlockingStreaming(c, this.strategy, reqRespFactory)));
}
Expand All @@ -60,7 +60,7 @@ public StreamingHttpClient asStreamingClient() {

@Override
public BlockingStreamingHttpResponse request(final BlockingStreamingHttpRequest request) throws Exception {
request.context().putIfAbsent(HTTP_EXECUTION_STRATEGY_KEY, strategy);
assignStrategy(request, strategy);
return blockingInvocation(client.request(request.toStreamingRequest())).toBlockingStreamingResponse();
}

Expand Down Expand Up @@ -148,6 +148,7 @@ public <T> BlockingIterable<? extends T> transportEventIterable(final HttpEventK

@Override
public BlockingStreamingHttpResponse request(final BlockingStreamingHttpRequest request) throws Exception {
assignStrategy(request, strategy);
return blockingInvocation(connection.request(request.toStreamingRequest()))
.toBlockingStreamingResponse();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;

import static io.servicetalk.http.api.HttpContextKeys.HTTP_EXECUTION_STRATEGY_KEY;
import static io.servicetalk.http.api.HttpApiConversions.assignStrategy;
import static io.servicetalk.http.api.HttpExecutionStrategies.defaultStrategy;
import static io.servicetalk.http.api.RequestResponseFactories.toAggregated;
import static io.servicetalk.http.api.StreamingHttpConnectionToHttpConnection.DEFAULT_ASYNC_CONNECTION_STRATEGY;
Expand All @@ -46,7 +46,7 @@ public HttpExecutionStrategy executionStrategy() {
@Override
public Single<HttpResponse> request(final HttpRequest request) {
return Single.defer(() -> {
request.context().putIfAbsent(HTTP_EXECUTION_STRATEGY_KEY, strategy);
assignStrategy(request, strategy);
return client.request(request.toStreamingRequest())
.flatMap(response -> response.toResponse().shareContextOnSubscribe())
.shareContextOnSubscribe();
Expand All @@ -56,7 +56,7 @@ public Single<HttpResponse> request(final HttpRequest request) {
@Override
public Single<ReservedHttpConnection> reserveConnection(final HttpRequestMetaData metaData) {
return Single.defer(() -> {
metaData.context().putIfAbsent(HTTP_EXECUTION_STRATEGY_KEY, strategy);
assignStrategy(metaData, strategy);
return client.reserveConnection(metaData)
.map(c -> new ReservedStreamingHttpConnectionToReservedHttpConnection(c, this.strategy,
reqRespFactory))
Expand Down Expand Up @@ -170,8 +170,12 @@ public <T> Publisher<? extends T> transportEventStream(final HttpEventKey<T> eve

@Override
public Single<HttpResponse> request(final HttpRequest request) {
return connection.request(request.toStreamingRequest())
.flatMap(response -> response.toResponse().shareContextOnSubscribe());
return Single.defer(() -> {
assignStrategy(request, strategy);
return connection.request(request.toStreamingRequest())
.flatMap(response -> response.toResponse().shareContextOnSubscribe())
.shareContextOnSubscribe();
});
}

@Override
Expand Down

0 comments on commit e257614

Please sign in to comment.