diff --git a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcStatusCode.java b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcStatusCode.java
index cf50d65465..99beca234f 100644
--- a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcStatusCode.java
+++ b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcStatusCode.java
@@ -40,17 +40,34 @@ public enum GrpcStatusCode {
NOT_FOUND(5),
/** Some entity that we attempted to create already exists. */
ALREADY_EXISTS(6),
- /** Permission denied for a particular client. Different from {@link #UNAUTHENTICATED}. */
+ /**
+ * Permission denied for a particular client. Must not be used for the following cases:
+ *
+ * - rejections caused by exhausting some resource (use {@link #RESOURCE_EXHAUSTED} instead)
+ * - the caller cannot be identified (use {@link #UNAUTHENTICATED} instead)
+ *
+ */
PERMISSION_DENIED(7),
/** Resource exhausted. */
RESOURCE_EXHAUSTED(8),
- /** The action cannot be executed on the current system state. Client should not retry.. */
+ /** The action cannot be executed on the current system state. Client should not retry. */
FAILED_PRECONDITION(9),
- /** Aborted, typically due to a concurrency issue (think CAS). Client may retry the whole sequence.. */
+ /** Aborted, typically due to a concurrency issue (think CAS). Client may retry the whole sequence. */
ABORTED(10),
- /** Used for range errors. */
+ /**
+ * Used for range errors (e.g. seeking or reading past end of file.)
+ *
+ * Unlike {@link #INVALID_ARGUMENT}, this error indicates a problem that may be fixed if the system state changes.
+ * For example, a 32-bit file system will generate {@link #INVALID_ARGUMENT} if asked to read at an offset that is
+ * not in the range [0,2^32-1], but it will generate OUT_OF_RANGE if asked to read from an offset past the current
+ * file size.
+ *
+ * There is a fair bit of overlap with {@link #FAILED_PRECONDITION}. This error is more specific and recommended
+ * in scenarios when callers who are iterating through a space can easily look for an OUT_OF_RANGE error to detect
+ * when they are done.
+ */
OUT_OF_RANGE(11),
- /** Unimplemented action. */
+ /** The method/operation is not implemented/supported. */
UNIMPLEMENTED(12),
/** Internal invariant violated. */
INTERNAL(13),
diff --git a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcUtils.java b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcUtils.java
index f5a2c150f0..8644e1e19e 100644
--- a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcUtils.java
+++ b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcUtils.java
@@ -35,6 +35,7 @@
import io.servicetalk.http.api.HttpResponse;
import io.servicetalk.http.api.HttpResponseFactory;
import io.servicetalk.http.api.HttpResponseMetaData;
+import io.servicetalk.http.api.HttpResponseStatus;
import io.servicetalk.http.api.HttpSerializer;
import io.servicetalk.http.api.StatelessTrailersTransformer;
import io.servicetalk.http.api.StreamingHttpResponse;
@@ -77,7 +78,11 @@
import static io.servicetalk.grpc.api.GrpcHeaderValues.SERVICETALK_USER_AGENT;
import static io.servicetalk.grpc.api.GrpcStatusCode.CANCELLED;
import static io.servicetalk.grpc.api.GrpcStatusCode.DEADLINE_EXCEEDED;
-import static io.servicetalk.grpc.api.GrpcStatusCode.INTERNAL;
+import static io.servicetalk.grpc.api.GrpcStatusCode.FAILED_PRECONDITION;
+import static io.servicetalk.grpc.api.GrpcStatusCode.INVALID_ARGUMENT;
+import static io.servicetalk.grpc.api.GrpcStatusCode.PERMISSION_DENIED;
+import static io.servicetalk.grpc.api.GrpcStatusCode.UNAUTHENTICATED;
+import static io.servicetalk.grpc.api.GrpcStatusCode.UNAVAILABLE;
import static io.servicetalk.grpc.api.GrpcStatusCode.UNIMPLEMENTED;
import static io.servicetalk.grpc.api.GrpcStatusCode.UNKNOWN;
import static io.servicetalk.grpc.api.GrpcStatusCode.fromHttp2ErrorCode;
@@ -89,6 +94,19 @@
import static io.servicetalk.http.api.HttpHeaderNames.USER_AGENT;
import static io.servicetalk.http.api.HttpHeaderValues.TRAILERS;
import static io.servicetalk.http.api.HttpRequestMethod.POST;
+import static io.servicetalk.http.api.HttpResponseStatus.BAD_GATEWAY;
+import static io.servicetalk.http.api.HttpResponseStatus.EXPECTATION_FAILED;
+import static io.servicetalk.http.api.HttpResponseStatus.FORBIDDEN;
+import static io.servicetalk.http.api.HttpResponseStatus.GATEWAY_TIMEOUT;
+import static io.servicetalk.http.api.HttpResponseStatus.NOT_FOUND;
+import static io.servicetalk.http.api.HttpResponseStatus.NOT_IMPLEMENTED;
+import static io.servicetalk.http.api.HttpResponseStatus.OK;
+import static io.servicetalk.http.api.HttpResponseStatus.PRECONDITION_FAILED;
+import static io.servicetalk.http.api.HttpResponseStatus.REQUEST_TIMEOUT;
+import static io.servicetalk.http.api.HttpResponseStatus.SERVICE_UNAVAILABLE;
+import static io.servicetalk.http.api.HttpResponseStatus.StatusClass.CLIENT_ERROR_4XX;
+import static io.servicetalk.http.api.HttpResponseStatus.TOO_MANY_REQUESTS;
+import static io.servicetalk.http.api.HttpResponseStatus.UNAUTHORIZED;
import static java.lang.String.valueOf;
import static java.util.Collections.emptyList;
import static java.util.Objects.requireNonNull;
@@ -263,10 +281,39 @@ static GrpcStatusException toGrpcException(Throwable cause) {
: new GrpcStatusException(toGrpcStatus(cause), () -> null);
}
+ private static void validateStatusCode(HttpResponseStatus status) {
+ final int statusCode = status.code();
+ if (statusCode == OK.code()) {
+ return;
+ }
+ final GrpcStatusCode grpcStatusCode;
+ if (statusCode == BAD_GATEWAY.code() || statusCode == SERVICE_UNAVAILABLE.code() ||
+ statusCode == GATEWAY_TIMEOUT.code() || statusCode == TOO_MANY_REQUESTS.code()) {
+ grpcStatusCode = UNAVAILABLE;
+ } else if (statusCode == UNAUTHORIZED.code()) {
+ grpcStatusCode = UNAUTHENTICATED;
+ } else if (statusCode == FORBIDDEN.code()) {
+ grpcStatusCode = PERMISSION_DENIED;
+ } else if (statusCode == NOT_FOUND.code() || statusCode == NOT_IMPLEMENTED.code()) {
+ grpcStatusCode = UNIMPLEMENTED;
+ } else if (statusCode == REQUEST_TIMEOUT.code()) {
+ grpcStatusCode = DEADLINE_EXCEEDED;
+ } else if (statusCode == PRECONDITION_FAILED.code() || statusCode == EXPECTATION_FAILED.code()) {
+ grpcStatusCode = FAILED_PRECONDITION;
+ } else if (CLIENT_ERROR_4XX.contains(statusCode)) {
+ grpcStatusCode = INVALID_ARGUMENT;
+ } else {
+ grpcStatusCode = UNKNOWN;
+ }
+ throw GrpcStatusException.of(Status.newBuilder().setCode(grpcStatusCode.value())
+ .setMessage("HTTP status code: " + status).build());
+ }
+
static Publisher validateResponseAndGetPayload(final StreamingHttpResponse response,
final CharSequence expectedContentType,
final BufferAllocator allocator,
final GrpcStreamingDeserializer deserializer) {
+ validateStatusCode(response.status()); // gRPC protocol requires 200, don't look further if this check fails.
// In case of an empty response, gRPC-server may return only one HEADER frame with endStream=true. Our
// HTTP1-based implementation translates them into response headers so we need to look for a grpc-status in both
// headers and trailers. Since this is streaming response and we have the headers now, we check for the
@@ -293,6 +340,7 @@ static Resp validateResponseAndGetPayload(final HttpResponse response,
final CharSequence expectedContentType,
final BufferAllocator allocator,
final GrpcDeserializer deserializer) {
+ validateStatusCode(response.status()); // gRPC protocol requires 200, don't look further if this check fails.
// In case of an empty response, gRPC-server may return only one HEADER frame with endStream=true. Our
// HTTP1-based implementation translates them into response headers so we need to look for a grpc-status in both
// headers and trailers.
@@ -320,7 +368,7 @@ static void validateContentType(HttpHeaders headers, CharSequence expectedConten
if (!contentEqualsIgnoreCase(requestContentType, expectedContentType) &&
(requestContentType == null ||
!regionMatches(requestContentType, true, 0, APPLICATION_GRPC, 0, APPLICATION_GRPC.length()))) {
- throw GrpcStatusException.of(Status.newBuilder().setCode(INTERNAL.value())
+ throw GrpcStatusException.of(Status.newBuilder().setCode(UNKNOWN.value())
.setMessage("invalid content-type: " + requestContentType).build());
}
}
diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/HttpResponseUponGrpcRequestTest.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/HttpResponseUponGrpcRequestTest.java
index 6ec8aeee2d..b717b88ec2 100644
--- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/HttpResponseUponGrpcRequestTest.java
+++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/HttpResponseUponGrpcRequestTest.java
@@ -49,7 +49,7 @@ final class HttpResponseUponGrpcRequestTest {
ServerContext serverContext = HttpServers.forAddress(localAddress(0))
.protocols(h2Default())
.listenAndAwait((ctx, request, responseFactory) ->
- succeeded(responseFactory.badRequest().payloadBody(responsePayload, textSerializerUtf8())));
+ succeeded(responseFactory.ok().payloadBody(responsePayload, textSerializerUtf8())));
client = GrpcClients.forAddress(serverHostAndPort(serverContext))
.buildBlocking(new TesterProto.Tester.ClientFactory());
@@ -111,7 +111,7 @@ private static void assertThrowsGrpcStatusException(Executable executable) {
}
private static void assertGrpcStatusException(GrpcStatusException grpcStatusException) {
- assertThat(grpcStatusException.status().code(), is(GrpcStatusCode.INTERNAL));
+ assertThat(grpcStatusException.status().code(), is(GrpcStatusCode.UNKNOWN));
String description = grpcStatusException.status().description();
assertThat(description, notNullValue());
assertThat(description, containsString("invalid content-type: text/plain;"));
diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ProtocolCompatibilityTest.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ProtocolCompatibilityTest.java
index 66cdfe088f..804b25f45a 100644
--- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ProtocolCompatibilityTest.java
+++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ProtocolCompatibilityTest.java
@@ -15,6 +15,7 @@
*/
package io.servicetalk.grpc.netty;
+import io.servicetalk.buffer.api.Buffer;
import io.servicetalk.concurrent.BlockingIterable;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.SingleSource.Processor;
@@ -51,7 +52,9 @@
import io.servicetalk.grpc.netty.CompatProto.Compat.ServiceFactory;
import io.servicetalk.grpc.netty.CompatProto.RequestContainer.CompatRequest;
import io.servicetalk.grpc.netty.CompatProto.ResponseContainer.CompatResponse;
+import io.servicetalk.http.api.HttpResponseStatus;
import io.servicetalk.http.api.HttpServerBuilder;
+import io.servicetalk.http.api.HttpServerContext;
import io.servicetalk.http.api.HttpServiceContext;
import io.servicetalk.http.api.SingleAddressHttpClientBuilder;
import io.servicetalk.http.api.StreamingHttpClient;
@@ -69,6 +72,7 @@
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.rpc.Code;
import io.grpc.Codec;
import io.grpc.Compressor;
import io.grpc.CompressorRegistry;
@@ -85,6 +89,7 @@
import io.grpc.stub.StreamObserver;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
@@ -116,11 +121,22 @@
import static io.servicetalk.concurrent.api.SourceAdapters.fromSource;
import static io.servicetalk.concurrent.internal.TestTimeoutConstants.DEFAULT_TIMEOUT_SECONDS;
import static io.servicetalk.grpc.api.GrpcExecutionStrategies.defaultStrategy;
-import static io.servicetalk.grpc.api.GrpcExecutionStrategies.offloadNever;
+import static io.servicetalk.grpc.api.GrpcExecutionStrategy.from;
import static io.servicetalk.grpc.api.GrpcStatusCode.CANCELLED;
import static io.servicetalk.grpc.api.GrpcStatusCode.DEADLINE_EXCEEDED;
+import static io.servicetalk.grpc.api.GrpcStatusCode.FAILED_PRECONDITION;
+import static io.servicetalk.grpc.api.GrpcStatusCode.INTERNAL;
+import static io.servicetalk.grpc.api.GrpcStatusCode.INVALID_ARGUMENT;
+import static io.servicetalk.grpc.api.GrpcStatusCode.UNKNOWN;
import static io.servicetalk.grpc.internal.DeadlineUtils.GRPC_TIMEOUT_HEADER_KEY;
-import static io.servicetalk.http.netty.HttpProtocolConfigs.h2;
+import static io.servicetalk.http.api.HttpExecutionStrategies.offloadNone;
+import static io.servicetalk.http.api.HttpResponseStatus.BAD_REQUEST;
+import static io.servicetalk.http.api.HttpResponseStatus.EXPECTATION_FAILED;
+import static io.servicetalk.http.api.HttpResponseStatus.PRECONDITION_FAILED;
+import static io.servicetalk.http.api.HttpResponseStatus.REQUEST_HEADER_FIELDS_TOO_LARGE;
+import static io.servicetalk.http.api.HttpResponseStatus.REQUEST_TIMEOUT;
+import static io.servicetalk.http.api.HttpResponseStatus.StatusClass.CLIENT_ERROR_4XX;
+import static io.servicetalk.http.netty.HttpProtocolConfigs.h2Default;
import static io.servicetalk.test.resources.DefaultTestCerts.loadServerKey;
import static io.servicetalk.test.resources.DefaultTestCerts.loadServerPem;
import static io.servicetalk.test.resources.DefaultTestCerts.serverPemHostname;
@@ -132,6 +148,7 @@
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -404,7 +421,7 @@ void grpcJavaToServiceTalkErrorInResponseNoOffload(final boolean ssl,
final String compression)
throws Exception {
final TestServerContext server = serviceTalkServer(ErrorMode.SIMPLE_IN_RESPONSE, ssl,
- offloadNever(), compression, null);
+ from(offloadNone()), compression, null);
final CompatClient client = grpcJavaClient(server.listenAddress(), compression, ssl, null);
testGrpcError(client, server, false, streaming, compression);
}
@@ -458,7 +475,7 @@ void grpcJavaToServiceTalkErrorWithStatusInResponseNoOffloads(
final boolean streaming,
final String compression) throws Exception {
final TestServerContext server = serviceTalkServer(ErrorMode.STATUS_IN_RESPONSE, ssl,
- offloadNever(), compression, null);
+ from(offloadNone()), compression, null);
final CompatClient client = grpcJavaClient(server.listenAddress(), compression, ssl, null);
testGrpcError(client, server, true, streaming, compression);
}
@@ -485,6 +502,62 @@ void grpcJavaToServiceTalkBlocking(
testRequestResponse(client, server, streaming, compression);
}
+ @Test
+ void clientH2ReturnStatus() throws Exception {
+ try (HttpServerContext server = HttpServers.forAddress(localAddress(0))
+ .protocols(h2Default())
+ .listenBlockingAndAwait((ctx, request, responseFactory) -> {
+ byte meta = request.payloadBody().readByte();
+ if (meta != 0) {
+ throw new IllegalArgumentException("compression not supported");
+ }
+ int length = request.payloadBody().readInt();
+ if (request.payloadBody().readableBytes() != length) {
+ throw new IllegalArgumentException("payload body length incomplete: " + length);
+ }
+ CompatRequest compatRequest = CompatRequest.parser().parseFrom(
+ Buffer.asInputStream(request.payloadBody()));
+ return responseFactory.newResponse(HttpResponseStatus.of(compatRequest.getId(), "foo"))
+ .payloadBody(ctx.executionContext().bufferAllocator().fromAscii("error"));
+ });
+ CompatClient grpcJavaClient = grpcJavaClient(server.listenAddress(), null, false, null);
+ CompatClient stClient = serviceTalkClient(server.listenAddress(), false, null, null)) {
+ for (int httpCode = 100; httpCode < 999; ++httpCode) {
+ CompatRequest request = CompatRequest.newBuilder().setId(httpCode).build();
+ int grpcJavaCode = Code.OK.getNumber();
+ int stCode = GrpcStatusCode.OK.value();
+ try {
+ grpcJavaClient.scalarCall(request).toFuture().get();
+ } catch (ExecutionException e) {
+ grpcJavaCode = ((StatusRuntimeException) e.getCause()).getStatus().getCode().value();
+ }
+ try {
+ stClient.scalarCall(request).toFuture().get();
+ } catch (ExecutionException e) {
+ stCode = ((GrpcStatusException) e.getCause()).status().code().value();
+ }
+ if (httpCode < 200) {
+ // grpc-java maps 1xx responses to error code INTERNAL, we currently map to UNKNOWN. The test server
+ // isn't following the http protocol by returning only a 1xx response and each framework catches
+ // this exception differently internally.
+ assertThat("h2 response code: " + httpCode, stCode, equalTo(UNKNOWN.value()));
+ assertThat("h2 response code: " + httpCode, grpcJavaCode, equalTo(INTERNAL.value()));
+ } else if (httpCode == REQUEST_TIMEOUT.code()) {
+ assertThat("h2 response code: " + httpCode, stCode, equalTo(DEADLINE_EXCEEDED.value()));
+ assertThat("h2 response code: " + httpCode, grpcJavaCode, equalTo(UNKNOWN.value()));
+ } else if (httpCode == PRECONDITION_FAILED.code() || httpCode == EXPECTATION_FAILED.code()) {
+ assertThat("h2 response code: " + httpCode, stCode, equalTo(FAILED_PRECONDITION.value()));
+ assertThat("h2 response code: " + httpCode, grpcJavaCode, equalTo(UNKNOWN.value()));
+ } else if (stCode != grpcJavaCode && CLIENT_ERROR_4XX.contains(httpCode)) {
+ assertThat("h2 response code: " + httpCode, stCode, equalTo(INVALID_ARGUMENT.value()));
+ assertThat("h2 response code: " + httpCode, grpcJavaCode, equalTo(
+ httpCode == BAD_REQUEST.code() || httpCode == REQUEST_HEADER_FIELDS_TOO_LARGE.code() ?
+ INTERNAL.value() : UNKNOWN.value()));
+ }
+ }
+ }
+ }
+
@ParameterizedTest
@MethodSource("sslStreamingAndCompressionParams")
void grpcJavaToServiceTalkBlockingError(final boolean ssl,
@@ -603,7 +676,7 @@ void timeoutMidRequest(boolean stClient, boolean stServer, boolean clientInitiat
Duration serverTimeout = clientInitiatedTimeout ? null : DEFAULT_DEADLINE;
BlockingQueue serverErrorQueue = new ArrayBlockingQueue<>(16);
final TestServerContext server = stServer ?
- serviceTalkServer(ErrorMode.NONE, false, offloadNever(), null, null, serverErrorQueue) :
+ serviceTalkServer(ErrorMode.NONE, false, from(offloadNone()), null, null, serverErrorQueue) :
grpcJavaServer(ErrorMode.NONE, false, null);
try (ServerContext proxyCtx = buildTimeoutProxy(server.listenAddress(), serverTimeout, false)) {
final CompatClient client = stClient ?
@@ -633,8 +706,8 @@ void timeoutMidRequest(boolean stClient, boolean stServer, boolean clientInitiat
private static ServerContext buildTimeoutProxy(SocketAddress serverAddress, @Nullable Duration forcedTimeout,
boolean ssl) throws Exception {
HttpServerBuilder proxyBuilder = HttpServers.forAddress(localAddress(0))
- .executionStrategy(offloadNever())
- .protocols(h2().build());
+ .executionStrategy(from(offloadNone()))
+ .protocols(h2Default());
if (ssl) {
proxyBuilder.sslConfig(new ServerSslConfigBuilder(DefaultTestCerts::loadServerPem,
DefaultTestCerts::loadServerKey).build());
@@ -651,8 +724,8 @@ private static final class RemoveTimeoutHeaderProxy implements StreamingHttpServ
boolean ssl) {
SingleAddressHttpClientBuilder builder =
HttpClients.forResolvedAddress((InetSocketAddress) serverAddress)
- .executionStrategy(offloadNever())
- .protocols(h2().build());
+ .executionStrategy(from(offloadNone()))
+ .protocols(h2Default());
if (ssl) {
builder.sslConfig(new ClientSslConfigBuilder(DefaultTestCerts::loadServerCAPem)
.peerHost(serverPemHostname()).build());