Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply optimized execution strategy for converted connections #2634

Merged
merged 3 commits into from
Jul 4, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public HttpRequest newRequest(final HttpRequestMethod method, final String reque
static final class ReservedStreamingHttpConnectionToReservedBlockingHttpConnection implements
ReservedBlockingHttpConnection {
private final ReservedStreamingHttpConnection connection;
private final HttpExecutionStrategy strategy;
private final HttpConnectionContext context;
private final HttpExecutionContext executionContext;
private final HttpRequestResponseFactory reqRespFactory;
Expand All @@ -104,13 +105,13 @@ static final class ReservedStreamingHttpConnectionToReservedBlockingHttpConnecti
final HttpExecutionStrategy strategy,
final HttpRequestResponseFactory reqRespFactory) {

requireNonNull(strategy);
this.connection = requireNonNull(connection);
this.strategy = requireNonNull(strategy);
final HttpConnectionContext originalCtx = connection.connectionContext();
executionContext = new DelegatingHttpExecutionContext(connection.executionContext()) {
@Override
public HttpExecutionStrategy executionStrategy() {
return strategy;
return ReservedStreamingHttpConnectionToReservedBlockingHttpConnection.this.strategy;
}
};
context = new DelegatingHttpConnectionContext(originalCtx) {
Expand Down Expand Up @@ -144,6 +145,7 @@ public <T> BlockingIterable<? extends T> transportEventIterable(final HttpEventK

@Override
public HttpResponse request(final HttpRequest request) throws Exception {
request.context().putIfAbsent(HTTP_EXECUTION_STRATEGY_KEY, strategy);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I don't have cycles right now to deal with ClientEffectiveStrategyTest.
I've verified locally that it works as expected after my fix and filed #2635.
The idea is that the request method for every ReservedStreamingHttpConnectionToReserved*HttpConnection should behave the same way as its top-level StreamingHttpClientTo*HttpClient.

Copy link
Member

@Scottmitch Scottmitch Jun 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add some comments clarifying why and maybe an internal static utility function that makes it easier to track?

return BlockingRequestUtils.request(connection, request);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ public <T> BlockingIterable<? extends T> transportEventIterable(final HttpEventK

@Override
public BlockingStreamingHttpResponse request(final BlockingStreamingHttpRequest request) throws Exception {
request.context().putIfAbsent(HTTP_EXECUTION_STRATEGY_KEY, strategy);
return blockingInvocation(connection.request(request.toStreamingRequest()))
.toBlockingStreamingResponse();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,12 @@ public <T> Publisher<? extends T> transportEventStream(final HttpEventKey<T> eve

@Override
public Single<HttpResponse> request(final HttpRequest request) {
return connection.request(request.toStreamingRequest())
.flatMap(response -> response.toResponse().shareContextOnSubscribe());
return Single.defer(() -> {
request.context().putIfAbsent(HTTP_EXECUTION_STRATEGY_KEY, strategy);
return connection.request(request.toStreamingRequest())
.flatMap(response -> response.toResponse().shareContextOnSubscribe())
.shareContextOnSubscribe();
});
}

@Override
Expand Down