From 9f0cb31109ea8c968e76e936608b13fce00c64da Mon Sep 17 00:00:00 2001 From: Dave Date: Thu, 19 Dec 2024 12:40:43 -0700 Subject: [PATCH 1/2] Add a new context key and filter to optimize the grpc blocking error case --- .../io/servicetalk/grpc/api/GrpcRouter.java | 2 + .../grpc/netty/DefaultGrpcServerBuilder.java | 2 + ...GrpcTrailersOptimizationServiceFilter.java | 62 +++++++++++++++++++ .../grpc/netty/ProtocolCompatibilityTest.java | 4 -- .../servicetalk/http/api/HttpContextKeys.java | 3 + 5 files changed, 69 insertions(+), 4 deletions(-) create mode 100644 servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/GrpcTrailersOptimizationServiceFilter.java diff --git a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcRouter.java b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcRouter.java index 6392f7f1dc..816423edf5 100644 --- a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcRouter.java +++ b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcRouter.java @@ -91,6 +91,7 @@ import static io.servicetalk.grpc.api.GrpcUtils.setStatusOk; import static io.servicetalk.grpc.api.GrpcUtils.validateContentType; import static io.servicetalk.http.api.HttpApiConversions.toStreamingHttpService; +import static io.servicetalk.http.api.HttpContextKeys.HTTP_OPTIMIZE_ERROR_STREAM; import static io.servicetalk.http.api.HttpExecutionStrategies.customStrategyBuilder; import static io.servicetalk.http.api.HttpExecutionStrategies.defaultStrategy; import static io.servicetalk.http.api.HttpExecutionStrategies.offloadAll; @@ -708,6 +709,7 @@ public void handle(final HttpServiceContext ctx, final BlockingStreamingHttpRequ methodDescriptor.httpPath(), t); HttpHeaders trailers; if (grpcResponse == null || (trailers = grpcResponse.trailers()) == null) { + response.context().put(HTTP_OPTIMIZE_ERROR_STREAM, Boolean.TRUE); setStatus(response.headers(), t, allocator); // Use HTTP response to avoid setting "OK" in trailers and allocating a serializer response.sendMetaData().close(); diff --git a/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/DefaultGrpcServerBuilder.java b/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/DefaultGrpcServerBuilder.java index 6dccf4ad55..d7761616ce 100644 --- a/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/DefaultGrpcServerBuilder.java +++ b/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/DefaultGrpcServerBuilder.java @@ -146,6 +146,8 @@ private ExecutionContextInterceptorHttpServerBuilder preBuild() { final ExecutionContextInterceptorHttpServerBuilder interceptor = new ExecutionContextInterceptorHttpServerBuilder(httpServerBuilderSupplier.get()); + interceptor.appendServiceFilter(GrpcTrailersOptimizationServiceFilter.INSTANCE); + interceptor.appendNonOffloadingServiceFilter(GrpcExceptionMapperServiceFilter.INSTANCE); directCallInitializer.initialize(interceptor); diff --git a/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/GrpcTrailersOptimizationServiceFilter.java b/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/GrpcTrailersOptimizationServiceFilter.java new file mode 100644 index 0000000000..e1cfb4706c --- /dev/null +++ b/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/GrpcTrailersOptimizationServiceFilter.java @@ -0,0 +1,62 @@ +/* + * Copyright © 2019, 2021 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.grpc.netty; + +import io.servicetalk.concurrent.api.Single; +import io.servicetalk.http.api.HttpExecutionStrategies; +import io.servicetalk.http.api.HttpExecutionStrategy; +import io.servicetalk.http.api.HttpResponse; +import io.servicetalk.http.api.HttpServiceContext; +import io.servicetalk.http.api.StreamingHttpRequest; +import io.servicetalk.http.api.StreamingHttpResponse; +import io.servicetalk.http.api.StreamingHttpResponseFactory; +import io.servicetalk.http.api.StreamingHttpService; +import io.servicetalk.http.api.StreamingHttpServiceFilter; +import io.servicetalk.http.api.StreamingHttpServiceFilterFactory; + +import static io.servicetalk.http.api.HttpContextKeys.HTTP_OPTIMIZE_ERROR_STREAM; + +final class GrpcTrailersOptimizationServiceFilter implements StreamingHttpServiceFilterFactory { + static final GrpcTrailersOptimizationServiceFilter INSTANCE = new GrpcTrailersOptimizationServiceFilter(); + + private GrpcTrailersOptimizationServiceFilter() { + } + + @Override + public StreamingHttpServiceFilter create(StreamingHttpService service) { + return new StreamingHttpServiceFilter(service) { + @Override + public Single handle(final HttpServiceContext ctx, + final StreamingHttpRequest request, + final StreamingHttpResponseFactory responseFactory) { + return super.handle(ctx, request, responseFactory).flatMap(response -> { + Single mappedResponse; + if (Boolean.TRUE.equals(response.context().get(HTTP_OPTIMIZE_ERROR_STREAM))) { + mappedResponse = response.toResponse().map(HttpResponse::toStreamingResponse); + } else { + mappedResponse = Single.succeeded(response); + } + return mappedResponse.shareContextOnSubscribe(); + }); + } + }; + } + + @Override + public HttpExecutionStrategy requiredOffloads() { + return HttpExecutionStrategies.offloadNone(); + } +} diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ProtocolCompatibilityTest.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ProtocolCompatibilityTest.java index ea95d17eed..4ee0cbf107 100644 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ProtocolCompatibilityTest.java +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ProtocolCompatibilityTest.java @@ -268,10 +268,6 @@ private static Collection clientServerParams() { for (boolean isClientServiceTalk : TRUE_FALSE) { for (boolean isServerServiceTalk : TRUE_FALSE) { for (boolean isServerBlocking : TRUE_FALSE) { - if (!isClientServiceTalk && isServerServiceTalk && isServerBlocking) { - // TODO there appears to be a potential bug in this combination. Separate bug filed. - continue; - } if (isServerServiceTalk || !isServerBlocking) { args.add(Arguments.of(isClientServiceTalk, isServerServiceTalk, isServerBlocking)); } diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpContextKeys.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpContextKeys.java index a0a74cd89e..60833bd16d 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpContextKeys.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpContextKeys.java @@ -68,6 +68,9 @@ public final class HttpContextKeys { public static final Key HTTP_FORCE_NEW_CONNECTION = newKey("HTTP_FORCE_NEW_CONNECTION", Boolean.class); + public static final Key HTTP_OPTIMIZE_ERROR_STREAM = + newKey("HTTP_OPTIMIZE_ERROR_STREAM", Boolean.class); + private HttpContextKeys() { // No instances } From 2445b1ea87e6ce328938d91ee5fc171f9c954c90 Mon Sep 17 00:00:00 2001 From: Dave Date: Fri, 20 Dec 2024 14:24:06 -0700 Subject: [PATCH 2/2] Address feedback --- .../io/servicetalk/grpc/api/GrpcRouter.java | 4 +-- .../grpc/internal/GrpcContextKeys.java | 36 +++++++++++++++++++ .../grpc/netty/DefaultGrpcServerBuilder.java | 5 ++- ...rceTrailersOnlyResponseServiceFilter.java} | 13 +++---- .../servicetalk/http/api/HttpContextKeys.java | 3 -- 5 files changed, 47 insertions(+), 14 deletions(-) create mode 100644 servicetalk-grpc-internal/src/main/java/io/servicetalk/grpc/internal/GrpcContextKeys.java rename servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/{GrpcTrailersOptimizationServiceFilter.java => GrpcEnforceTrailersOnlyResponseServiceFilter.java} (82%) diff --git a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcRouter.java b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcRouter.java index 816423edf5..06c580fb88 100644 --- a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcRouter.java +++ b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcRouter.java @@ -90,8 +90,8 @@ import static io.servicetalk.grpc.api.GrpcUtils.setStatus; import static io.servicetalk.grpc.api.GrpcUtils.setStatusOk; import static io.servicetalk.grpc.api.GrpcUtils.validateContentType; +import static io.servicetalk.grpc.internal.GrpcContextKeys.TRAILERS_ONLY_RESPONSE; import static io.servicetalk.http.api.HttpApiConversions.toStreamingHttpService; -import static io.servicetalk.http.api.HttpContextKeys.HTTP_OPTIMIZE_ERROR_STREAM; import static io.servicetalk.http.api.HttpExecutionStrategies.customStrategyBuilder; import static io.servicetalk.http.api.HttpExecutionStrategies.defaultStrategy; import static io.servicetalk.http.api.HttpExecutionStrategies.offloadAll; @@ -709,7 +709,7 @@ public void handle(final HttpServiceContext ctx, final BlockingStreamingHttpRequ methodDescriptor.httpPath(), t); HttpHeaders trailers; if (grpcResponse == null || (trailers = grpcResponse.trailers()) == null) { - response.context().put(HTTP_OPTIMIZE_ERROR_STREAM, Boolean.TRUE); + response.context().put(TRAILERS_ONLY_RESPONSE, Boolean.TRUE); setStatus(response.headers(), t, allocator); // Use HTTP response to avoid setting "OK" in trailers and allocating a serializer response.sendMetaData().close(); diff --git a/servicetalk-grpc-internal/src/main/java/io/servicetalk/grpc/internal/GrpcContextKeys.java b/servicetalk-grpc-internal/src/main/java/io/servicetalk/grpc/internal/GrpcContextKeys.java new file mode 100644 index 0000000000..aa0fe07628 --- /dev/null +++ b/servicetalk-grpc-internal/src/main/java/io/servicetalk/grpc/internal/GrpcContextKeys.java @@ -0,0 +1,36 @@ +/* + * Copyright © 2019, 2021 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.grpc.internal; + +import io.servicetalk.context.api.ContextMap; + +import static io.servicetalk.context.api.ContextMap.Key.newKey; + +/** + * All {@link ContextMap.Key}(s) defined for gRPC. + */ +public final class GrpcContextKeys { + /** + * For the blocking server this key allows the router to notify an upstream filter that it is safe to consolidate + * tailing empty data frames when set to true. + * + */ + public static final ContextMap.Key TRAILERS_ONLY_RESPONSE = + newKey("TRAILERS_ONLY_RESPONSE", Boolean.class); + + private GrpcContextKeys() { + } +} diff --git a/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/DefaultGrpcServerBuilder.java b/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/DefaultGrpcServerBuilder.java index d7761616ce..502a87d9be 100644 --- a/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/DefaultGrpcServerBuilder.java +++ b/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/DefaultGrpcServerBuilder.java @@ -145,9 +145,6 @@ private Single doListen(final GrpcServiceFactory serviceFa private ExecutionContextInterceptorHttpServerBuilder preBuild() { final ExecutionContextInterceptorHttpServerBuilder interceptor = new ExecutionContextInterceptorHttpServerBuilder(httpServerBuilderSupplier.get()); - - interceptor.appendServiceFilter(GrpcTrailersOptimizationServiceFilter.INSTANCE); - interceptor.appendNonOffloadingServiceFilter(GrpcExceptionMapperServiceFilter.INSTANCE); directCallInitializer.initialize(interceptor); @@ -156,6 +153,8 @@ private ExecutionContextInterceptorHttpServerBuilder preBuild() { } initializer.initialize(interceptor); + interceptor.appendServiceFilter(GrpcEnforceTrailersOnlyResponseServiceFilter.INSTANCE); + return interceptor; } diff --git a/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/GrpcTrailersOptimizationServiceFilter.java b/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/GrpcEnforceTrailersOnlyResponseServiceFilter.java similarity index 82% rename from servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/GrpcTrailersOptimizationServiceFilter.java rename to servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/GrpcEnforceTrailersOnlyResponseServiceFilter.java index e1cfb4706c..35f951964f 100644 --- a/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/GrpcTrailersOptimizationServiceFilter.java +++ b/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/GrpcEnforceTrailersOnlyResponseServiceFilter.java @@ -27,12 +27,13 @@ import io.servicetalk.http.api.StreamingHttpServiceFilter; import io.servicetalk.http.api.StreamingHttpServiceFilterFactory; -import static io.servicetalk.http.api.HttpContextKeys.HTTP_OPTIMIZE_ERROR_STREAM; +import static io.servicetalk.grpc.internal.GrpcContextKeys.TRAILERS_ONLY_RESPONSE; -final class GrpcTrailersOptimizationServiceFilter implements StreamingHttpServiceFilterFactory { - static final GrpcTrailersOptimizationServiceFilter INSTANCE = new GrpcTrailersOptimizationServiceFilter(); +final class GrpcEnforceTrailersOnlyResponseServiceFilter implements StreamingHttpServiceFilterFactory { + static final GrpcEnforceTrailersOnlyResponseServiceFilter INSTANCE = + new GrpcEnforceTrailersOnlyResponseServiceFilter(); - private GrpcTrailersOptimizationServiceFilter() { + private GrpcEnforceTrailersOnlyResponseServiceFilter() { } @Override @@ -42,9 +43,9 @@ public StreamingHttpServiceFilter create(StreamingHttpService service) { public Single handle(final HttpServiceContext ctx, final StreamingHttpRequest request, final StreamingHttpResponseFactory responseFactory) { - return super.handle(ctx, request, responseFactory).flatMap(response -> { + return delegate().handle(ctx, request, responseFactory).flatMap(response -> { Single mappedResponse; - if (Boolean.TRUE.equals(response.context().get(HTTP_OPTIMIZE_ERROR_STREAM))) { + if (Boolean.TRUE.equals(response.context().get(TRAILERS_ONLY_RESPONSE))) { mappedResponse = response.toResponse().map(HttpResponse::toStreamingResponse); } else { mappedResponse = Single.succeeded(response); diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpContextKeys.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpContextKeys.java index 60833bd16d..a0a74cd89e 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpContextKeys.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpContextKeys.java @@ -68,9 +68,6 @@ public final class HttpContextKeys { public static final Key HTTP_FORCE_NEW_CONNECTION = newKey("HTTP_FORCE_NEW_CONNECTION", Boolean.class); - public static final Key HTTP_OPTIMIZE_ERROR_STREAM = - newKey("HTTP_OPTIMIZE_ERROR_STREAM", Boolean.class); - private HttpContextKeys() { // No instances }