From 7feb05059f4f10b0af6ed08096fdb8ac7f581f46 Mon Sep 17 00:00:00 2001 From: Reto Lehmann Date: Thu, 11 Jan 2024 17:29:56 +0100 Subject: [PATCH] Add defaulting for all GRPC probe types (#14773) * Add defaulting for all GRPC probe types * Add an E2E readiness test with GRPC probes * Rename function --- pkg/apis/serving/v1/revision_defaults.go | 21 ++- pkg/queue/health/probe_test.go | 10 +- .../revision/resources/deploy_test.go | 3 +- test/e2e/readiness_test.go | 43 +++++ test/test_images/grpc-ping/main.go | 3 + .../google.golang.org/grpc/health/client.go | 117 +++++++++++++ .../google.golang.org/grpc/health/logging.go | 23 +++ .../google.golang.org/grpc/health/server.go | 163 ++++++++++++++++++ vendor/modules.txt | 1 + 9 files changed, 370 insertions(+), 14 deletions(-) create mode 100644 vendor/google.golang.org/grpc/health/client.go create mode 100644 vendor/google.golang.org/grpc/health/logging.go create mode 100644 vendor/google.golang.org/grpc/health/server.go diff --git a/pkg/apis/serving/v1/revision_defaults.go b/pkg/apis/serving/v1/revision_defaults.go index 1599952c6d0d..5e5fecec0d61 100644 --- a/pkg/apis/serving/v1/revision_defaults.go +++ b/pkg/apis/serving/v1/revision_defaults.go @@ -132,7 +132,8 @@ func (rs *RevisionSpec) applyDefault(ctx context.Context, container *corev1.Cont // If there are multiple containers then default probes will be applied to the container where user specified PORT // default probes will not be applied for non serving containers if len(rs.PodSpec.Containers) == 1 || len(container.Ports) != 0 { - rs.applyProbes(container) + rs.applyProbesWithDefaults(container) + rs.applyGRPCProbeDefaults(container) } if rs.PodSpec.EnableServiceLinks == nil && apis.IsInCreate(ctx) { @@ -153,7 +154,7 @@ func (rs *RevisionSpec) applyDefault(ctx context.Context, container *corev1.Cont } } -func (*RevisionSpec) applyProbes(container *corev1.Container) { +func (*RevisionSpec) applyProbesWithDefaults(container *corev1.Container) { if container.ReadinessProbe == nil { container.ReadinessProbe = &corev1.Probe{} } @@ -164,10 +165,6 @@ func (*RevisionSpec) applyProbes(container *corev1.Container) { container.ReadinessProbe.TCPSocket = &corev1.TCPSocketAction{} } - if container.ReadinessProbe.GRPC != nil && container.ReadinessProbe.GRPC.Service == nil { - container.ReadinessProbe.GRPC.Service = ptr.String("") - } - if container.ReadinessProbe.SuccessThreshold == 0 { container.ReadinessProbe.SuccessThreshold = 1 } @@ -183,6 +180,18 @@ func (*RevisionSpec) applyProbes(container *corev1.Container) { } } +func (*RevisionSpec) applyGRPCProbeDefaults(container *corev1.Container) { + if container.ReadinessProbe != nil && container.ReadinessProbe.GRPC != nil && container.ReadinessProbe.GRPC.Service == nil { + container.ReadinessProbe.GRPC.Service = ptr.String("") + } + if container.LivenessProbe != nil && container.LivenessProbe.GRPC != nil && container.LivenessProbe.GRPC.Service == nil { + container.LivenessProbe.GRPC.Service = ptr.String("") + } + if container.StartupProbe != nil && container.StartupProbe.GRPC != nil && container.StartupProbe.GRPC.Service == nil { + container.StartupProbe.GRPC.Service = ptr.String("") + } +} + // Upgrade SecurityContext for this container and the Pod definition to use settings // for the `restricted` profile when the feature flag is enabled. // This does not currently set `runAsNonRoot` for the restricted profile, because diff --git a/pkg/queue/health/probe_test.go b/pkg/queue/health/probe_test.go index d4e5eb298e63..3afbbc73eb04 100644 --- a/pkg/queue/health/probe_test.go +++ b/pkg/queue/health/probe_test.go @@ -35,7 +35,6 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/intstr" netheader "knative.dev/networking/pkg/http/header" - "knative.dev/pkg/ptr" ) func TestTCPProbe(t *testing.T) { @@ -270,7 +269,7 @@ func TestHTTPProbeResponseErrorFailure(t *testing.T) { } } -func TestGRPCProbeSuccessWithDefaultServiceName(t *testing.T) { +func TestGRPCProbeSuccess(t *testing.T) { // use ephemeral port to prevent port conflict lis, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { @@ -286,7 +285,7 @@ func TestGRPCProbeSuccessWithDefaultServiceName(t *testing.T) { }() assignedPort := lis.Addr().(*net.TCPAddr).Port - gRPCAction := newGRPCAction(t, assignedPort, "") + gRPCAction := newGRPCAction(t, assignedPort) config := GRPCProbeConfigOptions{ Timeout: time.Second, GRPCAction: gRPCAction, @@ -342,12 +341,11 @@ func newHTTPGetAction(t *testing.T, serverURL string) *corev1.HTTPGetAction { } } -func newGRPCAction(t *testing.T, port int, service string) *corev1.GRPCAction { +func newGRPCAction(t *testing.T, port int) *corev1.GRPCAction { t.Helper() return &corev1.GRPCAction{ - Port: int32(port), - Service: ptr.String(service), + Port: int32(port), } } diff --git a/pkg/reconciler/revision/resources/deploy_test.go b/pkg/reconciler/revision/resources/deploy_test.go index 008750bb7272..50564041fa55 100644 --- a/pkg/reconciler/revision/resources/deploy_test.go +++ b/pkg/reconciler/revision/resources/deploy_test.go @@ -350,8 +350,7 @@ func withGRPCReadinessProbe(port int) *corev1.Probe { return &corev1.Probe{ ProbeHandler: corev1.ProbeHandler{ GRPC: &corev1.GRPCAction{ - Port: int32(port), - Service: nil, + Port: int32(port), }, }} } diff --git a/test/e2e/readiness_test.go b/test/e2e/readiness_test.go index 11a829830c9c..175ed58b1a18 100644 --- a/test/e2e/readiness_test.go +++ b/test/e2e/readiness_test.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" pkgTest "knative.dev/pkg/test" "knative.dev/pkg/test/spoof" + v1 "knative.dev/serving/pkg/apis/serving/v1" v1opts "knative.dev/serving/pkg/testing/v1" "knative.dev/serving/test" v1test "knative.dev/serving/test/v1" @@ -121,5 +122,47 @@ func TestReadinessPathAndQuery(t *testing.T) { ); err != nil { t.Fatalf("The endpoint %s for Route %s didn't serve the expected text %q: %v", url, names.Route, test.HelloWorldText, err) } +} + +func TestReadinessGRPCProbe(t *testing.T) { + t.Parallel() + + clients := Setup(t) + + names := test.ResourceNames{ + Service: test.ObjectNameForTest(t), + Image: test.GRPCPing, + } + + test.EnsureTearDown(t, clients, &names) + + t.Log("Creating a new Service") + + resources, err := v1test.CreateServiceReady(t, clients, &names, + v1opts.WithNamedPort("h2c"), + v1opts.WithReadinessProbe( + &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + GRPC: &corev1.GRPCAction{ + Port: v1.DefaultUserPort, + }, + }, + })) + if err != nil { + t.Fatalf("Failed to create initial Service: %v: %v", names.Service, err) + } + url := resources.Route.Status.URL.URL() + if _, err := pkgTest.CheckEndpointState( + context.Background(), + clients.KubeClient, + t.Logf, + url, + spoof.IsStatusOK, + "gRPCProbeReady", + test.ServingFlags.ResolvableDomain, + test.AddRootCAtoTransport(context.Background(), t.Logf, clients, test.ServingFlags.HTTPS), + ); err != nil { + t.Fatalf("The endpoint %s for Route %s didn't return success: %v", url, names.Route, err) + } } diff --git a/test/test_images/grpc-ping/main.go b/test/test_images/grpc-ping/main.go index ecfba249e9d0..bc5a455d7da6 100644 --- a/test/test_images/grpc-ping/main.go +++ b/test/test_images/grpc-ping/main.go @@ -27,6 +27,8 @@ import ( "time" "google.golang.org/grpc" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" "knative.dev/pkg/network" ping "knative.dev/serving/test/test_images/grpc-ping/proto" @@ -92,6 +94,7 @@ func main() { } g := grpc.NewServer() + grpc_health_v1.RegisterHealthServer(g, health.NewServer()) ping.RegisterPingServiceServer(g, &server{}) handler := func(w http.ResponseWriter, r *http.Request) { diff --git a/vendor/google.golang.org/grpc/health/client.go b/vendor/google.golang.org/grpc/health/client.go new file mode 100644 index 000000000000..740745c45f63 --- /dev/null +++ b/vendor/google.golang.org/grpc/health/client.go @@ -0,0 +1,117 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package health + +import ( + "context" + "fmt" + "io" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/connectivity" + healthpb "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/backoff" + "google.golang.org/grpc/status" +) + +var ( + backoffStrategy = backoff.DefaultExponential + backoffFunc = func(ctx context.Context, retries int) bool { + d := backoffStrategy.Backoff(retries) + timer := time.NewTimer(d) + select { + case <-timer.C: + return true + case <-ctx.Done(): + timer.Stop() + return false + } + } +) + +func init() { + internal.HealthCheckFunc = clientHealthCheck +} + +const healthCheckMethod = "/grpc.health.v1.Health/Watch" + +// This function implements the protocol defined at: +// https://github.com/grpc/grpc/blob/master/doc/health-checking.md +func clientHealthCheck(ctx context.Context, newStream func(string) (any, error), setConnectivityState func(connectivity.State, error), service string) error { + tryCnt := 0 + +retryConnection: + for { + // Backs off if the connection has failed in some way without receiving a message in the previous retry. + if tryCnt > 0 && !backoffFunc(ctx, tryCnt-1) { + return nil + } + tryCnt++ + + if ctx.Err() != nil { + return nil + } + setConnectivityState(connectivity.Connecting, nil) + rawS, err := newStream(healthCheckMethod) + if err != nil { + continue retryConnection + } + + s, ok := rawS.(grpc.ClientStream) + // Ideally, this should never happen. But if it happens, the server is marked as healthy for LBing purposes. + if !ok { + setConnectivityState(connectivity.Ready, nil) + return fmt.Errorf("newStream returned %v (type %T); want grpc.ClientStream", rawS, rawS) + } + + if err = s.SendMsg(&healthpb.HealthCheckRequest{Service: service}); err != nil && err != io.EOF { + // Stream should have been closed, so we can safely continue to create a new stream. + continue retryConnection + } + s.CloseSend() + + resp := new(healthpb.HealthCheckResponse) + for { + err = s.RecvMsg(resp) + + // Reports healthy for the LBing purposes if health check is not implemented in the server. + if status.Code(err) == codes.Unimplemented { + setConnectivityState(connectivity.Ready, nil) + return err + } + + // Reports unhealthy if server's Watch method gives an error other than UNIMPLEMENTED. + if err != nil { + setConnectivityState(connectivity.TransientFailure, fmt.Errorf("connection active but received health check RPC error: %v", err)) + continue retryConnection + } + + // As a message has been received, removes the need for backoff for the next retry by resetting the try count. + tryCnt = 0 + if resp.Status == healthpb.HealthCheckResponse_SERVING { + setConnectivityState(connectivity.Ready, nil) + } else { + setConnectivityState(connectivity.TransientFailure, fmt.Errorf("connection active but health check failed. status=%s", resp.Status)) + } + } + } +} diff --git a/vendor/google.golang.org/grpc/health/logging.go b/vendor/google.golang.org/grpc/health/logging.go new file mode 100644 index 000000000000..83c6acf55ef6 --- /dev/null +++ b/vendor/google.golang.org/grpc/health/logging.go @@ -0,0 +1,23 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package health + +import "google.golang.org/grpc/grpclog" + +var logger = grpclog.Component("health_service") diff --git a/vendor/google.golang.org/grpc/health/server.go b/vendor/google.golang.org/grpc/health/server.go new file mode 100644 index 000000000000..cce6312d77f9 --- /dev/null +++ b/vendor/google.golang.org/grpc/health/server.go @@ -0,0 +1,163 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package health provides a service that exposes server's health and it must be +// imported to enable support for client-side health checks. +package health + +import ( + "context" + "sync" + + "google.golang.org/grpc/codes" + healthgrpc "google.golang.org/grpc/health/grpc_health_v1" + healthpb "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/status" +) + +// Server implements `service Health`. +type Server struct { + healthgrpc.UnimplementedHealthServer + mu sync.RWMutex + // If shutdown is true, it's expected all serving status is NOT_SERVING, and + // will stay in NOT_SERVING. + shutdown bool + // statusMap stores the serving status of the services this Server monitors. + statusMap map[string]healthpb.HealthCheckResponse_ServingStatus + updates map[string]map[healthgrpc.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus +} + +// NewServer returns a new Server. +func NewServer() *Server { + return &Server{ + statusMap: map[string]healthpb.HealthCheckResponse_ServingStatus{"": healthpb.HealthCheckResponse_SERVING}, + updates: make(map[string]map[healthgrpc.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus), + } +} + +// Check implements `service Health`. +func (s *Server) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) { + s.mu.RLock() + defer s.mu.RUnlock() + if servingStatus, ok := s.statusMap[in.Service]; ok { + return &healthpb.HealthCheckResponse{ + Status: servingStatus, + }, nil + } + return nil, status.Error(codes.NotFound, "unknown service") +} + +// Watch implements `service Health`. +func (s *Server) Watch(in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error { + service := in.Service + // update channel is used for getting service status updates. + update := make(chan healthpb.HealthCheckResponse_ServingStatus, 1) + s.mu.Lock() + // Puts the initial status to the channel. + if servingStatus, ok := s.statusMap[service]; ok { + update <- servingStatus + } else { + update <- healthpb.HealthCheckResponse_SERVICE_UNKNOWN + } + + // Registers the update channel to the correct place in the updates map. + if _, ok := s.updates[service]; !ok { + s.updates[service] = make(map[healthgrpc.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus) + } + s.updates[service][stream] = update + defer func() { + s.mu.Lock() + delete(s.updates[service], stream) + s.mu.Unlock() + }() + s.mu.Unlock() + + var lastSentStatus healthpb.HealthCheckResponse_ServingStatus = -1 + for { + select { + // Status updated. Sends the up-to-date status to the client. + case servingStatus := <-update: + if lastSentStatus == servingStatus { + continue + } + lastSentStatus = servingStatus + err := stream.Send(&healthpb.HealthCheckResponse{Status: servingStatus}) + if err != nil { + return status.Error(codes.Canceled, "Stream has ended.") + } + // Context done. Removes the update channel from the updates map. + case <-stream.Context().Done(): + return status.Error(codes.Canceled, "Stream has ended.") + } + } +} + +// SetServingStatus is called when need to reset the serving status of a service +// or insert a new service entry into the statusMap. +func (s *Server) SetServingStatus(service string, servingStatus healthpb.HealthCheckResponse_ServingStatus) { + s.mu.Lock() + defer s.mu.Unlock() + if s.shutdown { + logger.Infof("health: status changing for %s to %v is ignored because health service is shutdown", service, servingStatus) + return + } + + s.setServingStatusLocked(service, servingStatus) +} + +func (s *Server) setServingStatusLocked(service string, servingStatus healthpb.HealthCheckResponse_ServingStatus) { + s.statusMap[service] = servingStatus + for _, update := range s.updates[service] { + // Clears previous updates, that are not sent to the client, from the channel. + // This can happen if the client is not reading and the server gets flow control limited. + select { + case <-update: + default: + } + // Puts the most recent update to the channel. + update <- servingStatus + } +} + +// Shutdown sets all serving status to NOT_SERVING, and configures the server to +// ignore all future status changes. +// +// This changes serving status for all services. To set status for a particular +// services, call SetServingStatus(). +func (s *Server) Shutdown() { + s.mu.Lock() + defer s.mu.Unlock() + s.shutdown = true + for service := range s.statusMap { + s.setServingStatusLocked(service, healthpb.HealthCheckResponse_NOT_SERVING) + } +} + +// Resume sets all serving status to SERVING, and configures the server to +// accept all future status changes. +// +// This changes serving status for all services. To set status for a particular +// services, call SetServingStatus(). +func (s *Server) Resume() { + s.mu.Lock() + defer s.mu.Unlock() + s.shutdown = false + for service := range s.statusMap { + s.setServingStatusLocked(service, healthpb.HealthCheckResponse_SERVING) + } +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 1870ea71aa68..e2a5e241b3c8 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -708,6 +708,7 @@ google.golang.org/grpc/credentials/insecure google.golang.org/grpc/encoding google.golang.org/grpc/encoding/proto google.golang.org/grpc/grpclog +google.golang.org/grpc/health google.golang.org/grpc/health/grpc_health_v1 google.golang.org/grpc/internal google.golang.org/grpc/internal/backoff