-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathgrpcserver.go
71 lines (67 loc) · 2.46 KB
/
grpcserver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package cloudrunner
import (
"context"
"fmt"
"log/slog"
"net"
"time"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)
// NewGRPCServer returns a gRPC server configured with the cloudrunner defaults:
// an OpenTelemetry stats handler, the run's unary and stream interceptor chains
// (context logger, trace, request logger, server middleware), and a keepalive
// enforcement policy.
//
// Options are passed to grpc.NewServer in this order: the defaults above, then the
// options stored on the run context, then the caller-supplied opts.
//
// NewGRPCServer panics if ctx does not carry a run context, i.e. it was not
// derived from the context handed out by cloudrunner.Run.
func NewGRPCServer(ctx context.Context, opts ...grpc.ServerOption) *grpc.Server {
	rc, found := getRunContext(ctx)
	if !found {
		panic("cloudrunner.NewGRPCServer: must be called with a context from cloudrunner.Run")
	}

	statsOption := grpc.StatsHandler(otelgrpc.NewServerHandler())

	// The interceptor order is significant: each entry relies on the ones before it.
	unaryOption := grpc.ChainUnaryInterceptor(
		rc.loggerMiddleware.GRPCUnaryServerInterceptor,        // installs the context logger
		rc.traceMiddleware.GRPCServerUnaryInterceptor,         // depends on the context logger
		rc.requestLoggerMiddleware.GRPCUnaryServerInterceptor, // must come after trace
		rc.serverMiddleware.GRPCUnaryServerInterceptor,        // must come after the request logger
	)

	// Same ordering as the unary chain.
	streamOption := grpc.ChainStreamInterceptor(
		rc.loggerMiddleware.GRPCStreamServerInterceptor,
		rc.traceMiddleware.GRPCStreamServerInterceptor,
		rc.requestLoggerMiddleware.GRPCStreamServerInterceptor,
		rc.serverMiddleware.GRPCStreamServerInterceptor,
	)

	// Keepalive settings are explained in:
	// https://github.com/grpc/grpc-go/blob/master/Documentation/keepalive.md
	keepaliveOption := grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
		// Clients pinging more often than once per 30 seconds get their connection terminated.
		MinTime: 30 * time.Second,
		// Pings are accepted even while the connection has no active streams.
		PermitWithoutStream: true,
	})

	const defaultCount = 4
	all := make([]grpc.ServerOption, 0, defaultCount+len(rc.grpcServerOptions)+len(opts))
	all = append(all, statsOption, unaryOption, streamOption, keepaliveOption)
	all = append(all, rc.grpcServerOptions...)
	all = append(all, opts...)
	return grpc.NewServer(all...)
}
// ListenGRPC binds a TCP listener on the port from the run configuration and serves
// gRPC requests on it with grpcServer until the server stops.
//
// While serving, a helper goroutine watches ctx: once ctx is done it logs the shutdown
// and calls grpcServer.GracefulStop. The goroutine also exits as soon as ListenGRPC
// returns, so it does not outlive the call when Serve ends before ctx is cancelled.
//
// It returns an error if ctx does not carry a run context (it was not derived from the
// context handed out by cloudrunner.Run), if the listener cannot be bound (the
// underlying error is wrapped and remains reachable through errors.Is / errors.As),
// or whatever grpcServer.Serve returns.
func ListenGRPC(ctx context.Context, grpcServer *grpc.Server) error {
	run, ok := getRunContext(ctx)
	if !ok {
		return fmt.Errorf("cloudrunner.ListenGRPC: must be called with a context from cloudrunner.Run")
	}
	address := fmt.Sprintf(":%d", run.config.Runtime.Port)
	listener, err := (&net.ListenConfig{}).Listen(ctx, "tcp", address)
	if err != nil {
		// Add the address so the failure is actionable without extra logging.
		return fmt.Errorf("cloudrunner.ListenGRPC: listen on %s: %w", address, err)
	}

	// done is closed when this function returns; it releases the shutdown watcher
	// below if Serve finishes before ctx is cancelled.
	done := make(chan struct{})
	defer close(done)
	go func() {
		select {
		case <-ctx.Done():
			slog.InfoContext(ctx, "gRPCServer shutting down")
			grpcServer.GracefulStop()
		case <-done:
			// Serve already returned; nothing left to stop.
		}
	}()

	slog.InfoContext(ctx, "gRPCServer listening", slog.String("address", address))
	return grpcServer.Serve(listener)
}