Skip to content

Commit

Permalink
Treat empty trailers as null
Browse files Browse the repository at this point in the history
  • Loading branch information
mgodave committed Dec 19, 2024
1 parent 100ddc1 commit 21fbd8b
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ private ExecutionContextInterceptorHttpServerBuilder preBuild() {
final ExecutionContextInterceptorHttpServerBuilder interceptor =
new ExecutionContextInterceptorHttpServerBuilder(httpServerBuilderSupplier.get());

interceptor.appendServiceFilter(TrailersOptimizationFilter::new);
interceptor.appendServiceFilter(GrpcTrailersOptimizationServiceFilter.INSTANCE);

interceptor.appendNonOffloadingServiceFilter(GrpcExceptionMapperServiceFilter.INSTANCE);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright © 2019, 2021 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.grpc.netty;

import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.http.api.HttpExecutionStrategies;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.HttpResponse;
import io.servicetalk.http.api.HttpServiceContext;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.api.StreamingHttpResponseFactory;
import io.servicetalk.http.api.StreamingHttpService;
import io.servicetalk.http.api.StreamingHttpServiceFilter;
import io.servicetalk.http.api.StreamingHttpServiceFilterFactory;

import static io.servicetalk.http.api.HttpContextKeys.HTTP_OPTIMIZE_ERROR_STREAM;

final class GrpcTrailersOptimizationServiceFilter implements StreamingHttpServiceFilterFactory {
static final GrpcTrailersOptimizationServiceFilter INSTANCE = new GrpcTrailersOptimizationServiceFilter();

private GrpcTrailersOptimizationServiceFilter() {
}

@Override
public StreamingHttpServiceFilter create(StreamingHttpService service) {
return new StreamingHttpServiceFilter(service) {
@Override
public Single<StreamingHttpResponse> handle(final HttpServiceContext ctx,
final StreamingHttpRequest request,
final StreamingHttpResponseFactory responseFactory) {
return super.handle(ctx, request, responseFactory).flatMap(response -> {
Single<StreamingHttpResponse> mappedResponse;
if (Boolean.TRUE.equals(response.context().get(HTTP_OPTIMIZE_ERROR_STREAM))) {
mappedResponse = response.payloadBody(Publisher.empty()).toResponse()
.map(HttpResponse::toStreamingResponse);
} else {
mappedResponse = Single.succeeded(response);
}
return mappedResponse.shareContextOnSubscribe();
});
}
};
}

@Override
public HttpExecutionStrategy requiredOffloads() {
return HttpExecutionStrategies.offloadNone();
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -269,10 +269,6 @@ private static Collection<Arguments> clientServerParams() {
for (boolean isClientServiceTalk : TRUE_FALSE) {
for (boolean isServerServiceTalk : TRUE_FALSE) {
for (boolean isServerBlocking : TRUE_FALSE) {
// if (!isClientServiceTalk && isServerServiceTalk && isServerBlocking) {
// // TODO there appears to be a potential bug in this combination. Separate bug filed.
// continue;
// }
if (isServerServiceTalk || !isServerBlocking) {
args.add(Arguments.of(isClientServiceTalk, isServerServiceTalk, isServerBlocking));
}
Expand Down Expand Up @@ -656,22 +652,22 @@ void unimplementedServiceError(final boolean isServiceTalkClient,

try (TestServerContext server = serverSupplier.get();
CompatClient client = clientSupplier.apply(server.listenAddress())) {
// final Single<CompatResponse> scalarResponse =
// client.scalarCall(CompatRequest.newBuilder().setId(1).build());
// validateGrpcErrorInResponse(scalarResponse.toFuture(), false, UNIMPLEMENTED,
// "Method grpc.netty.Compat/ScalarCall is unimplemented");
final Single<CompatResponse> scalarResponse =
client.scalarCall(CompatRequest.newBuilder().setId(1).build());
validateGrpcErrorInResponse(scalarResponse.toFuture(), false, UNIMPLEMENTED,
"Method grpc.netty.Compat/ScalarCall is unimplemented");
final Single<CompatResponse> clientStreamingResponse =
client.clientStreamingCall(Publisher.from(CompatRequest.newBuilder().setId(1).build()));
validateGrpcErrorInResponse(clientStreamingResponse.toFuture(), false, UNIMPLEMENTED,
"Method grpc.netty.Compat/clientStreamingCall is unimplemented");
// final Publisher<CompatResponse> serverStreamingResponse =
// client.serverStreamingCall(CompatRequest.newBuilder().setId(1).build());
// validateGrpcErrorInResponse(serverStreamingResponse.toFuture(), false, UNIMPLEMENTED,
// "Method grpc.netty.Compat/serverStreamingCall is unimplemented");
// final Publisher<CompatResponse> bidirectionalStreamingResponse =
// client.bidirectionalStreamingCall(Publisher.from(CompatRequest.newBuilder().setId(1).build()));
// validateGrpcErrorInResponse(bidirectionalStreamingResponse.toFuture(), false, UNIMPLEMENTED,
// "Method grpc.netty.Compat/bidirectionalStreamingCall is unimplemented");
final Publisher<CompatResponse> serverStreamingResponse =
client.serverStreamingCall(CompatRequest.newBuilder().setId(1).build());
validateGrpcErrorInResponse(serverStreamingResponse.toFuture(), false, UNIMPLEMENTED,
"Method grpc.netty.Compat/serverStreamingCall is unimplemented");
final Publisher<CompatResponse> bidirectionalStreamingResponse =
client.bidirectionalStreamingCall(Publisher.from(CompatRequest.newBuilder().setId(1).build()));
validateGrpcErrorInResponse(bidirectionalStreamingResponse.toFuture(), false, UNIMPLEMENTED,
"Method grpc.netty.Compat/bidirectionalStreamingCall is unimplemented");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ final class DefaultHttpResponse extends AbstractDelegatingHttpResponse
@Nullable final HttpHeaders trailers) {
super(original);
this.payloadBody = payloadBody;
this.trailers = trailers;
this.trailers = (trailers != null && trailers.isEmpty()) ? null : trailers;
}

@Override
Expand Down

0 comments on commit 21fbd8b

Please sign in to comment.