Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc: factor out setup func #7909

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 53 additions & 153 deletions grpc/interceptors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestWaitForReadyTrue(t *testing.T) {
clk: clock.NewFake(),
waitForReady: true,
}
conn, err := grpc.Dial("localhost:19876", // random, probably unused port
conn, err := grpc.NewClient("localhost:19876", // random, probably unused port
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, roundrobin.Name)),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(ci.Unary))
Expand Down Expand Up @@ -135,7 +135,7 @@ func TestWaitForReadyFalse(t *testing.T) {
clk: clock.NewFake(),
waitForReady: false,
}
conn, err := grpc.Dial("localhost:19876", // random, probably unused port
conn, err := grpc.NewClient("localhost:19876", // random, probably unused port
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, roundrobin.Name)),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(ci.Unary))
Expand All @@ -155,14 +155,14 @@ func TestWaitForReadyFalse(t *testing.T) {
}
}

// testServer is used to implement TestTimeouts, and will attempt to sleep for
// testTimeoutServer is used to implement TestTimeouts, and will attempt to sleep for
// the given amount of time (unless it hits a timeout or cancel).
type testServer struct {
type testTimeoutServer struct {
test_proto.UnimplementedChillerServer
}

// Chill implements ChillerServer.Chill
func (s *testServer) Chill(ctx context.Context, in *test_proto.Time) (*test_proto.Time, error) {
func (s *testTimeoutServer) Chill(ctx context.Context, in *test_proto.Time) (*test_proto.Time, error) {
start := time.Now()
// Sleep for either the requested amount of time, or the context times out or
// is canceled.
Expand All @@ -176,42 +176,9 @@ func (s *testServer) Chill(ctx context.Context, in *test_proto.Time) (*test_prot
}

func TestTimeouts(t *testing.T) {
// start server
lis, err := net.Listen("tcp", ":0")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
port := lis.Addr().(*net.TCPAddr).Port

serverMetrics, err := newServerMetrics(metrics.NoopRegisterer)
test.AssertNotError(t, err, "creating server metrics")
si := newServerMetadataInterceptor(serverMetrics, clock.NewFake())
s := grpc.NewServer(grpc.UnaryInterceptor(si.Unary))
test_proto.RegisterChillerServer(s, &testServer{})
go func() {
start := time.Now()
err := s.Serve(lis)
if err != nil && !strings.HasSuffix(err.Error(), "use of closed network connection") {
t.Logf("s.Serve: %v after %s", err, time.Since(start))
}
}()
defer s.Stop()

// make client
clientMetrics, err := newClientMetrics(metrics.NoopRegisterer)
test.AssertNotError(t, err, "creating client metrics")
ci := &clientMetadataInterceptor{
timeout: 30 * time.Second,
metrics: clientMetrics,
clk: clock.NewFake(),
}
conn, err := grpc.Dial(net.JoinHostPort("localhost", strconv.Itoa(port)),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(ci.Unary))
if err != nil {
t.Fatalf("did not connect: %v", err)
}
c := test_proto.NewChillerClient(conn)
server := new(testTimeoutServer)
client, _, stop := setup(t, server, clock.NewFake())
defer stop()

testCases := []struct {
timeout time.Duration
Expand All @@ -225,7 +192,7 @@ func TestTimeouts(t *testing.T) {
t.Run(tc.timeout.String(), func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), tc.timeout)
defer cancel()
_, err := c.Chill(ctx, &test_proto.Time{Duration: durationpb.New(time.Second)})
_, err := client.Chill(ctx, &test_proto.Time{Duration: durationpb.New(time.Second)})
if err == nil {
t.Fatal("Got no error, expected a timeout")
}
Expand All @@ -237,58 +204,22 @@ func TestTimeouts(t *testing.T) {
}

func TestRequestTimeTagging(t *testing.T) {
clk := clock.NewFake()
// Listen for TCP requests on a random system assigned port number
lis, err := net.Listen("tcp", ":0")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
// Retrieve the concrete port numberthe system assigned our listener
port := lis.Addr().(*net.TCPAddr).Port

// Create a new ChillerServer
server := new(testTimeoutServer)
serverMetrics, err := newServerMetrics(metrics.NoopRegisterer)
test.AssertNotError(t, err, "creating server metrics")
si := newServerMetadataInterceptor(serverMetrics, clk)
s := grpc.NewServer(grpc.UnaryInterceptor(si.Unary))
test_proto.RegisterChillerServer(s, &testServer{})
// Chill until ill
go func() {
start := time.Now()
err := s.Serve(lis)
if err != nil && !strings.HasSuffix(err.Error(), "use of closed network connection") {
t.Logf("s.Serve: %v after %s", err, time.Since(start))
}
}()
defer s.Stop()

// Dial the ChillerServer
clientMetrics, err := newClientMetrics(metrics.NoopRegisterer)
test.AssertNotError(t, err, "creating client metrics")
ci := &clientMetadataInterceptor{
timeout: 30 * time.Second,
metrics: clientMetrics,
clk: clk,
}
conn, err := grpc.Dial(net.JoinHostPort("localhost", strconv.Itoa(port)),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(ci.Unary))
if err != nil {
t.Fatalf("did not connect: %v", err)
}
// Create a ChillerClient with the connection to the ChillerServer
c := test_proto.NewChillerClient(conn)
client, _, stop := setup(t, server, serverMetrics)
defer stop()

// Make an RPC request with the ChillerClient with a timeout higher than the
// requested ChillerServer delay so that the RPC completes normally
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if _, err := c.Chill(ctx, &test_proto.Time{Duration: durationpb.New(time.Second * 5)}); err != nil {
if _, err := client.Chill(ctx, &test_proto.Time{Duration: durationpb.New(time.Second * 5)}); err != nil {
t.Fatalf("Unexpected error calling Chill RPC: %s", err)
}

// There should be one histogram sample in the serverInterceptor rpcLag stat
test.AssertMetricWithLabelsEquals(t, si.metrics.rpcLag, prometheus.Labels{}, 1)
test.AssertMetricWithLabelsEquals(t, serverMetrics.rpcLag, prometheus.Labels{}, 1)
}

func TestClockSkew(t *testing.T) {
Expand All @@ -298,32 +229,23 @@ func TestClockSkew(t *testing.T) {
clientClk := clock.NewFake()
clientClk.Set(time.Now())

// Listen for TCP requests on a random system assigned port number
lis, err := net.Listen("tcp", ":0")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
port := lis.Addr().(*net.TCPAddr).Port

// Start a gRPC server listening on that port
serverMetrics, err := newServerMetrics(metrics.NoopRegisterer)
test.AssertNotError(t, err, "creating server metrics")
si := newServerMetadataInterceptor(serverMetrics, serverClk)
s := grpc.NewServer(grpc.UnaryInterceptor(si.Unary))
test_proto.RegisterChillerServer(s, &testServer{})
go func() { _ = s.Serve(lis) }()
defer s.Stop()
_, serverPort, stop := setup(t, &testTimeoutServer{}, serverClk)
defer stop()

// Start a gRPC client talking to the server
clientMetrics, err := newClientMetrics(metrics.NoopRegisterer)
test.AssertNotError(t, err, "creating client metrics")
ci := &clientMetadataInterceptor{metrics: clientMetrics, clk: clientClk, timeout: time.Second}
conn, err := grpc.NewClient(
net.JoinHostPort("localhost", strconv.Itoa(port)),
ci := &clientMetadataInterceptor{
timeout: 30 * time.Second,
metrics: clientMetrics,
clk: clientClk,
}
conn, err := grpc.NewClient(net.JoinHostPort("localhost", strconv.Itoa(serverPort)),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(ci.Unary),
)
test.AssertNotError(t, err, "creating test client")
grpc.WithUnaryInterceptor(ci.Unary))
if err != nil {
t.Fatalf("did not connect: %v", err)
}

client := test_proto.NewChillerClient(conn)

// Create a context with plenty of timeout
Expand Down Expand Up @@ -369,18 +291,15 @@ func (s *blockedServer) Chill(_ context.Context, _ *test_proto.Time) (*test_prot
}

func TestInFlightRPCStat(t *testing.T) {
clk := clock.NewFake()
// Listen for TCP requests on a random system assigned port number
lis, err := net.Listen("tcp", ":0")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
// Retrieve the concrete port numberthe system assigned our listener
port := lis.Addr().(*net.TCPAddr).Port

// Create a new blockedServer to act as a ChillerServer
server := &blockedServer{}

metrics, err := newClientMetrics(metrics.NoopRegisterer)
test.AssertNotError(t, err, "creating client metrics")

client, _, stop := setup(t, server, metrics)
defer stop()

// Increment the roadblock waitgroup - this will cause all chill RPCs to
// the server to block until we call Done()!
server.roadblock.Add(1)
Expand All @@ -391,43 +310,11 @@ func TestInFlightRPCStat(t *testing.T) {
numRPCs := 5
server.received.Add(numRPCs)

serverMetrics, err := newServerMetrics(metrics.NoopRegisterer)
test.AssertNotError(t, err, "creating server metrics")
si := newServerMetadataInterceptor(serverMetrics, clk)
s := grpc.NewServer(grpc.UnaryInterceptor(si.Unary))
test_proto.RegisterChillerServer(s, server)
// Chill until ill
go func() {
start := time.Now()
err := s.Serve(lis)
if err != nil && !strings.HasSuffix(err.Error(), "use of closed network connection") {
t.Logf("s.Serve: %v after %s", err, time.Since(start))
}
}()
defer s.Stop()

// Dial the ChillerServer
clientMetrics, err := newClientMetrics(metrics.NoopRegisterer)
test.AssertNotError(t, err, "creating client metrics")
ci := &clientMetadataInterceptor{
timeout: 30 * time.Second,
metrics: clientMetrics,
clk: clk,
}
conn, err := grpc.Dial(net.JoinHostPort("localhost", strconv.Itoa(port)),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(ci.Unary))
if err != nil {
t.Fatalf("did not connect: %v", err)
}
// Create a ChillerClient with the connection to the ChillerServer
c := test_proto.NewChillerClient(conn)

// Fire off a few RPCs. They will block on the blockedServer's roadblock wg
for range numRPCs {
go func() {
// Ignore errors, just chilllll.
_, _ = c.Chill(context.Background(), &test_proto.Time{})
_, _ = client.Chill(context.Background(), &test_proto.Time{})
}()
}

Expand All @@ -442,15 +329,15 @@ func TestInFlightRPCStat(t *testing.T) {
}

// We expect the inFlightRPCs gauge for the Chiller.Chill RPCs to be equal to numRPCs.
test.AssertMetricWithLabelsEquals(t, ci.metrics.inFlightRPCs, labels, float64(numRPCs))
test.AssertMetricWithLabelsEquals(t, metrics.inFlightRPCs, labels, float64(numRPCs))

// Unblock the blockedServer to let all of the Chiller.Chill RPCs complete
server.roadblock.Done()
// Sleep for a little bit to let all the RPCs complete
time.Sleep(1 * time.Second)

// Check the gauge value again
test.AssertMetricWithLabelsEquals(t, ci.metrics.inFlightRPCs, labels, 0)
test.AssertMetricWithLabelsEquals(t, metrics.inFlightRPCs, labels, 0)
}

func TestServiceAuthChecker(t *testing.T) {
Expand Down Expand Up @@ -541,7 +428,7 @@ func (s *testUserAgentServer) Chill(ctx context.Context, in *test_proto.Time) (*

func TestUserAgentMetadata(t *testing.T) {
server := new(testUserAgentServer)
client, stop := setup(t, server)
client, _, stop := setup(t, server)
defer stop()

testUA := "test UA"
Expand All @@ -557,13 +444,26 @@ func TestUserAgentMetadata(t *testing.T) {
}
}

func setup(t *testing.T, server test_proto.ChillerServer) (test_proto.ChillerClient, func()) {
// setup creates a server and client, returning the created client, the running server's port, and a stop function.
func setup(t *testing.T, server test_proto.ChillerServer, opts ...any) (test_proto.ChillerClient, int, func()) {
clk := clock.NewFake()
serverMetricsVal, err := newServerMetrics(metrics.NoopRegisterer)
test.AssertNotError(t, err, "creating server metrics")
clientMetricsVal, err := newClientMetrics(metrics.NoopRegisterer)
test.AssertNotError(t, err, "creating client metrics")

for _, opt := range opts {
switch optTyped := opt.(type) {
case clock.FakeClock:
clk = optTyped
case clientMetrics:
clientMetricsVal = optTyped
case serverMetrics:
serverMetricsVal = optTyped
default:
t.Fatalf("setup called with unrecognize option %#v", t)
}
}
lis, err := net.Listen("tcp", ":0")
if err != nil {
log.Fatalf("failed to listen: %v", err)
Expand All @@ -587,11 +487,11 @@ func setup(t *testing.T, server test_proto.ChillerServer) (test_proto.ChillerCli
metrics: clientMetricsVal,
clk: clock.NewFake(),
}
conn, err := grpc.Dial(net.JoinHostPort("localhost", strconv.Itoa(port)),
conn, err := grpc.NewClient(net.JoinHostPort("localhost", strconv.Itoa(port)),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(ci.Unary))
if err != nil {
t.Fatalf("did not connect: %v", err)
}
return test_proto.NewChillerClient(conn), s.Stop
return test_proto.NewChillerClient(conn), port, s.Stop
}
Loading