Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up deprecated requester/client/filter API #1960

Merged
merged 1 commit into from
Nov 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import io.servicetalk.concurrent.api.Single;
import io.servicetalk.http.api.FilterableStreamingHttpClient;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.StreamingHttpClientFilter;
import io.servicetalk.http.api.StreamingHttpClientFilterFactory;
import io.servicetalk.http.api.StreamingHttpRequest;
Expand All @@ -44,9 +43,8 @@ public StreamingHttpClientFilter create(final FilterableStreamingHttpClient clie
return new StreamingHttpClientFilter(client) {
@Override
protected Single<StreamingHttpResponse> request(final StreamingHttpRequester delegate,
final HttpExecutionStrategy strategy,
final StreamingHttpRequest request) {
return delegate.request(strategy, request).flatMap(response -> {
return delegate.request(request).flatMap(response -> {
if (!OK.equals(response.status())) {
return failed(new BadResponseStatusException("Bad response status from " + backendName + ": " +
response.status()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.servicetalk.http.api.BlockingStreamingHttpResponse;
import io.servicetalk.http.api.HttpClient;
import io.servicetalk.http.api.HttpRequest;
import io.servicetalk.http.api.HttpRequestMetaData;
import io.servicetalk.http.api.HttpResponse;
import io.servicetalk.http.api.StreamingHttpClient;
import io.servicetalk.http.api.StreamingHttpRequest;
Expand All @@ -51,6 +52,7 @@
import static io.servicetalk.grpc.api.GrpcUtils.toGrpcException;
import static io.servicetalk.grpc.api.GrpcUtils.validateResponseAndGetPayload;
import static io.servicetalk.grpc.internal.DeadlineUtils.GRPC_DEADLINE_KEY;
import static io.servicetalk.http.api.HttpContextKeys.HTTP_EXECUTION_STRATEGY_KEY;
import static java.util.Objects.requireNonNull;

final class DefaultGrpcClientCallFactory implements GrpcClientCallFactory {
Expand Down Expand Up @@ -102,9 +104,8 @@ public <Req, Resp> ClientCall<Req, Resp> newCall(final MethodDescriptor<Req, Res
HttpRequest httpRequest = client.post(UNKNOWN_PATH.equals(mdPath) ? metadata.path() : mdPath);
initRequest(httpRequest, requestContentType, serializer.messageEncoding(), acceptedEncoding, timeout);
httpRequest.payloadBody(serializer.serialize(request, client.executionContext().bufferAllocator()));
@Nullable
final GrpcExecutionStrategy strategy = metadata.strategy();
return (strategy == null ? client.request(httpRequest) : client.request(strategy, httpRequest))
assignStrategy(httpRequest, metadata);
return client.request(httpRequest)
.map(response -> validateResponseAndGetPayload(response, responseContentType,
client.executionContext().bufferAllocator(), readGrpcMessageEncodingRaw(response.headers(),
deserializerIdentity, deserializers, GrpcDeserializer::messageEncoding)))
Expand Down Expand Up @@ -147,10 +148,8 @@ public <Req, Resp> StreamingClientCall<Req, Resp> newStreamingCall(
initRequest(httpRequest, requestContentType, serializer.messageEncoding(), acceptedEncoding, timeout);
httpRequest.payloadBody(serializer.serialize(request,
streamingHttpClient.executionContext().bufferAllocator()));
@Nullable
final GrpcExecutionStrategy strategy = metadata.strategy();
return (strategy == null ? streamingHttpClient.request(httpRequest) :
streamingHttpClient.request(strategy, httpRequest))
assignStrategy(httpRequest, metadata);
return streamingHttpClient.request(httpRequest)
.flatMapPublisher(response -> validateResponseAndGetPayload(response, responseContentType,
streamingHttpClient.executionContext().bufferAllocator(), readGrpcMessageEncodingRaw(
response.headers(), deserializerIdentity, deserializers,
Expand Down Expand Up @@ -235,10 +234,8 @@ public <Req, Resp> BlockingClientCall<Req, Resp> newBlockingCall(
initRequest(httpRequest, requestContentType, serializer.messageEncoding(), acceptedEncoding, timeout);
httpRequest.payloadBody(serializer.serialize(request, client.executionContext().bufferAllocator()));
try {
@Nullable
final GrpcExecutionStrategy strategy = metadata.strategy();
final HttpResponse response = strategy == null ? client.request(httpRequest) :
client.request(strategy, httpRequest);
assignStrategy(httpRequest, metadata);
final HttpResponse response = client.request(httpRequest);
return validateResponseAndGetPayload(response, responseContentType,
client.executionContext().bufferAllocator(), readGrpcMessageEncodingRaw(response.headers(),
deserializerIdentity, deserializers, GrpcDeserializer::messageEncoding));
Expand Down Expand Up @@ -285,10 +282,8 @@ public <Req, Resp> BlockingStreamingClientCall<Req, Resp> newBlockingStreamingCa
httpRequest.payloadBody(serializer.serialize(request,
streamingHttpClient.executionContext().bufferAllocator()));
try {
@Nullable
final GrpcExecutionStrategy strategy = metadata.strategy();
final BlockingStreamingHttpResponse response = strategy == null ? client.request(httpRequest) :
client.request(strategy, httpRequest);
assignStrategy(httpRequest, metadata);
final BlockingStreamingHttpResponse response = client.request(httpRequest);
return validateResponseAndGetPayload(response.toStreamingResponse(), responseContentType,
client.executionContext().bufferAllocator(), readGrpcMessageEncodingRaw(
response.headers(), deserializerIdentity, deserializers,
Expand Down Expand Up @@ -379,7 +374,8 @@ public Completable onClose() {
* @param metaDataTimeout the timeout specified in client metadata or null for no timeout
* @return The timeout {@link Duration}, potentially negative or null if no timeout.
*/
private @Nullable Duration timeoutForRequest(@Nullable Duration metaDataTimeout) {
@Nullable
private Duration timeoutForRequest(@Nullable Duration metaDataTimeout) {
Long deadline = AsyncContext.get(GRPC_DEADLINE_KEY);
@Nullable
Duration contextTimeout = null != deadline ? Duration.ofNanos(deadline - System.nanoTime()) : null;
Expand Down Expand Up @@ -447,4 +443,12 @@ private static <Req> GrpcStreamingSerializer<Req> streamingSerializer(
private static <Resp> GrpcDeserializer<Resp> deserializer(MethodDescriptor<?, Resp> methodDescriptor) {
return new GrpcDeserializer<>(methodDescriptor.responseDescriptor().serializerDescriptor().serializer());
}

private static void assignStrategy(HttpRequestMetaData requestMetaData, GrpcClientMetadata grpcMetadata) {
@Nullable
final GrpcExecutionStrategy strategy = grpcMetadata.strategy();
idelpivnitskiy marked this conversation as resolved.
Show resolved Hide resolved
if (strategy != null) {
requestMetaData.context().put(HTTP_EXECUTION_STRATEGY_KEY, strategy);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,34 +117,30 @@ public StreamingHttpClientFilter create(final FilterableStreamingHttpClient clie

@Override
protected Single<StreamingHttpResponse> request(final StreamingHttpRequester delegate,
final HttpExecutionStrategy strategy,
final StreamingHttpRequest request) {
return CatchAllHttpClientFilter.request(delegate, strategy, request);
return CatchAllHttpClientFilter.request(delegate, request);
}

@Override
public Single<? extends FilterableReservedStreamingHttpConnection> reserveConnection(
final HttpExecutionStrategy strategy, final HttpRequestMetaData metaData) {
final HttpRequestMetaData metaData) {

return delegate().reserveConnection(strategy, metaData)
return delegate().reserveConnection(metaData)
.map(r -> new ReservedStreamingHttpConnectionFilter(r) {
@Override
protected Single<StreamingHttpResponse> request(final StreamingHttpRequester delegate,
final HttpExecutionStrategy strategy,
final StreamingHttpRequest request) {
return CatchAllHttpClientFilter.request(delegate, strategy, request);
public Single<StreamingHttpResponse> request(final StreamingHttpRequest request) {
return CatchAllHttpClientFilter.request(delegate(), request);
}
});
}
};
}

private static Single<StreamingHttpResponse> request(final StreamingHttpRequester delegate,
final HttpExecutionStrategy strategy,
final StreamingHttpRequest request) {
final Single<StreamingHttpResponse> resp;
try {
resp = delegate.request(strategy, request);
resp = delegate.request(request);
} catch (Throwable t) {
return failed(toGrpcException(t));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import io.servicetalk.grpc.netty.TesterProto.Tester.TesterClient;
import io.servicetalk.grpc.netty.TesterProto.Tester.TesterService;
import io.servicetalk.http.api.FilterableStreamingHttpClient;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.HttpServiceContext;
import io.servicetalk.http.api.StreamingHttpClientFilter;
import io.servicetalk.http.api.StreamingHttpClientFilterFactory;
Expand Down Expand Up @@ -686,7 +685,6 @@ public StreamingHttpClientFilter create(final FilterableStreamingHttpClient clie
return new StreamingHttpClientFilter(client) {
@Override
protected Single<StreamingHttpResponse> request(final StreamingHttpRequester delegate,
final HttpExecutionStrategy strategy,
final StreamingHttpRequest request) {
if (throwEx) {
return throwException(cause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import io.servicetalk.grpc.netty.TesterProto.Tester.ServiceFactory;
import io.servicetalk.grpc.netty.TesterProto.Tester.TesterClient;
import io.servicetalk.grpc.netty.TesterProto.Tester.TesterService;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.StreamingHttpClientFilter;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpRequester;
Expand Down Expand Up @@ -90,13 +89,12 @@ private void setUp(boolean streamingService, boolean streamingClient) throws Exc
.initializeHttp(builder -> builder.appendClientFilter(origin -> new StreamingHttpClientFilter(origin) {
@Override
protected Single<StreamingHttpResponse> request(StreamingHttpRequester delegate,
HttpExecutionStrategy strategy,
StreamingHttpRequest request) {
// Change path to send the request to the route API that expects only a single request item
// and generates requested number of response items:
return defer(() -> {
request.requestTarget(BlockingTestResponseStreamRpc.PATH);
return delegate.request(strategy, request).subscribeShareContext();
return delegate.request(request).subscribeShareContext();
});
}
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import io.servicetalk.grpc.netty.TesterProto.Tester.TesterClient;
import io.servicetalk.grpc.netty.TesterProto.Tester.TesterService;
import io.servicetalk.http.api.FilterableStreamingHttpClient;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.HttpResponseMetaData;
import io.servicetalk.http.api.HttpServiceContext;
import io.servicetalk.http.api.StreamingHttpClientFilter;
Expand Down Expand Up @@ -223,9 +222,8 @@ public StreamingHttpClientFilter create(final FilterableStreamingHttpClient clie
return new StreamingHttpClientFilter(client) {
@Override
protected Single<StreamingHttpResponse> request(final StreamingHttpRequester delegate,
final HttpExecutionStrategy strategy,
final StreamingHttpRequest request) {
return super.request(delegate, strategy, request)
return super.request(delegate, request)
.map(response -> {
assertGrpcStatusInHeaders(response, errors);
return response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,10 @@
*/
package io.servicetalk.http.api;

import static io.servicetalk.http.api.HttpContextKeys.HTTP_EXECUTION_STRATEGY_KEY;

/**
* The equivalent of {@link HttpClient} but with synchronous/blocking APIs instead of asynchronous APIs.
*/
public interface BlockingHttpClient extends BlockingHttpRequester {
/**
* Send a {@code request}.
*
* @param request the request to send.
* @return The response.
* @throws Exception if an exception occurs during the request processing.
*/
@Override // FIXME: 0.42 - remove, this method is defined in BlockingHttpRequester
HttpResponse request(HttpRequest request) throws Exception;

/**
* Reserve a {@link BlockingHttpConnection} based on provided {@link HttpRequestMetaData}.
*
Expand All @@ -42,26 +30,6 @@ public interface BlockingHttpClient extends BlockingHttpRequester {
*/
ReservedBlockingHttpConnection reserveConnection(HttpRequestMetaData metaData) throws Exception;

/**
* Reserve a {@link BlockingHttpConnection} based on provided {@link HttpRequestMetaData}.
*
* @param strategy {@link HttpExecutionStrategy} to use.
* @param metaData Allows the underlying layers to know what {@link BlockingHttpConnection}s are valid to
* reserve for future {@link HttpRequest requests} with the same {@link HttpRequestMetaData}.
* For example this may provide some insight into shard or other info.
* @return a {@link ReservedBlockingHttpConnection}.
* @throws Exception if an exception occurs during the reservation process.
* @deprecated Use {@link #reserveConnection(HttpRequestMetaData)}. If an {@link HttpExecutionStrategy} needs to be
* altered, provide a value for {@link HttpContextKeys#HTTP_EXECUTION_STRATEGY_KEY} in the
* {@link HttpRequestMetaData#context() request context}.
*/
@Deprecated
default ReservedBlockingHttpConnection reserveConnection(HttpExecutionStrategy strategy,
HttpRequestMetaData metaData) throws Exception {
metaData.context().put(HTTP_EXECUTION_STRATEGY_KEY, strategy);
return reserveConnection(metaData);
}

/**
* Convert this {@link BlockingHttpClient} to the {@link StreamingHttpClient} API.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,6 @@
* The equivalent of {@link HttpConnection} but with synchronous/blocking APIs instead of asynchronous APIs.
*/
public interface BlockingHttpConnection extends BlockingHttpRequester {
/**
* Send a {@code request}.
*
* @param request the request to send.
* @return The response.
* @throws Exception if an exception occurs during the request processing.
*/
@Override // FIXME: 0.42 - remove, this method is defined in BlockingHttpRequester
HttpResponse request(HttpRequest request) throws Exception;

/**
* Get the {@link HttpConnectionContext}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

import io.servicetalk.concurrent.GracefulAutoCloseable;

import static io.servicetalk.http.api.HttpContextKeys.HTTP_EXECUTION_STRATEGY_KEY;

/**
* The equivalent of {@link HttpRequester} with synchronous/blocking APIs instead of asynchronous APIs.
*/
Expand All @@ -29,28 +27,7 @@ public interface BlockingHttpRequester extends HttpRequestFactory, GracefulAutoC
* @param request the request to send.
* @return The response.
*/
default HttpResponse request(HttpRequest request) throws Exception {
// FIXME: 0.42 - remove default impl
throw new UnsupportedOperationException("Method request(HttpRequest) is not supported by " +
getClass().getName());
}

/**
* Send a {@code request} using the passed {@link HttpExecutionStrategy strategy}.
*
* @param strategy {@link HttpExecutionStrategy} to use.
* @param request the request to send.
* @return The response.
* @throws Exception if an exception occurs during the request processing.
* @deprecated Use {@link #request(HttpRequest)}. If an {@link HttpExecutionStrategy} needs to be altered, provide a
* value for {@link HttpContextKeys#HTTP_EXECUTION_STRATEGY_KEY} in the
* {@link HttpRequestMetaData#context() request context}.
*/
@Deprecated
default HttpResponse request(HttpExecutionStrategy strategy, HttpRequest request) throws Exception {
request.context().put(HTTP_EXECUTION_STRATEGY_KEY, strategy);
return request(request);
}
HttpResponse request(HttpRequest request) throws Exception;

/**
* Get the {@link HttpExecutionContext} used during construction of this object.
Expand Down
Loading