Skip to content

Commit

Permalink
Update for grpc response
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzhiguo committed Jan 21, 2025
1 parent 4c8a2dc commit af6e3dc
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.jd.live.agent.core.util.network.Ipv4;
import com.jd.live.agent.demo.grpc.service.api.*;
import com.jd.live.agent.demo.response.LiveTransmission;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import net.devh.boot.grpc.server.service.GrpcService;

Expand All @@ -29,7 +30,7 @@ public class UserServiceGrpcImpl extends UserServiceGrpc.UserServiceImplBase {
@Override
public void get(UserGetRequest request, StreamObserver<UserGetResponse> responseObserver) {
if (request.getId() < 0 && ThreadLocalRandom.current().nextInt(3) == 0) {
responseObserver.onError(new RuntimeException("error"));
responseObserver.onError(Status.INTERNAL.withDescription("Server error!").asException());
return;
}
if (request.getId() >= 100) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,22 @@
package com.jd.live.agent.plugin.router.gprc.request;

import com.jd.live.agent.bootstrap.exception.RejectException.RejectNoProviderException;
import com.jd.live.agent.governance.exception.ErrorName;
import com.jd.live.agent.governance.instance.Endpoint;
import com.jd.live.agent.governance.request.AbstractRpcRequest.AbstractRpcInboundRequest;
import com.jd.live.agent.governance.request.AbstractRpcRequest.AbstractRpcOutboundRequest;
import com.jd.live.agent.governance.request.RoutedRequest;
import com.jd.live.agent.plugin.router.gprc.exception.GrpcException;
import com.jd.live.agent.plugin.router.gprc.exception.GrpcException.GrpcClientException;
import com.jd.live.agent.plugin.router.gprc.loadbalance.LiveDiscovery;
import com.jd.live.agent.plugin.router.gprc.loadbalance.LiveRequest;
import com.jd.live.agent.plugin.router.gprc.loadbalance.LiveRouteResult;
import io.grpc.Grpc;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
import io.grpc.*;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -88,6 +88,17 @@ public List<String> getHeaders(String key) {
*/
class GrpcOutboundRequest extends AbstractRpcOutboundRequest<LiveRequest<?, ?>> implements GrpcRequest, RoutedRequest {

private static final Function<Throwable, ErrorName> GRPC_ERROR_FUNCTION = throwable -> {
if (throwable instanceof StatusException) {
return new ErrorName(null, String.valueOf(((StatusException) throwable).getStatus().getCode().value()));
} else if (throwable instanceof StatusRuntimeException) {
return new ErrorName(null, String.valueOf(((StatusRuntimeException) throwable).getStatus().getCode().value()));
} else if (throwable instanceof GrpcException.GrpcServerException) {
return new ErrorName(null, String.valueOf(((GrpcException.GrpcServerException) throwable).getStatus().getCode().value()));
}
return DEFAULT_ERROR_FUNCTION.apply(throwable);
};

public GrpcOutboundRequest(LiveRequest<?, ?> request) {
super(request);
this.service = LiveDiscovery.getService(request.getPath());
Expand Down Expand Up @@ -134,5 +145,15 @@ public boolean hasEndpoint() {
LiveRouteResult result = request.getRouteResult();
return result != null && result.isSuccess();
}

/**
* Returns the default error name function.
*
* @return The default error name function.
*/
@Override
public Function<Throwable, ErrorName> getErrorFunction() {
return GRPC_ERROR_FUNCTION;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,16 @@ class GrpcOutboundResponse extends AbstractRpcOutboundResponse<Object> implement

private final Status status;

private final boolean server;

public GrpcOutboundResponse(Object response) {
this(response, false);
}

public GrpcOutboundResponse(Object response, boolean isServer) {
super(response, null);
status = Status.OK;
server = isServer;
}

public GrpcOutboundResponse(ServiceError error, ErrorPredicate retryPredicate) {
Expand All @@ -43,6 +50,7 @@ public GrpcOutboundResponse(ServiceError error, ErrorPredicate retryPredicate) {
public GrpcOutboundResponse(ServiceError error, ErrorPredicate retryPredicate, Status status) {
super(null, error, retryPredicate);
this.status = status != null ? status : getStatus(error.getThrowable());
this.server = error != null && error.isServerError();
}

@Override
Expand All @@ -53,5 +61,9 @@ public String getCode() {
private Status getStatus(Throwable throwable) {
return throwable == null ? Status.INTERNAL : Status.fromThrowable(throwable);
}

public boolean isServer() {
return server;
}
}
}

0 comments on commit af6e3dc

Please sign in to comment.