diff --git a/pkg/activator/net/helpers.go b/pkg/activator/net/helpers.go index f42b1479444a..b45ed8844e80 100644 --- a/pkg/activator/net/helpers.go +++ b/pkg/activator/net/helpers.go @@ -22,7 +22,6 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" - "knative.dev/networking/pkg/apis/networking" ) @@ -92,13 +91,13 @@ func endpointsToDests(endpoints *corev1.Endpoints, portName string) (ready, notR return ready, notReady } -// getServicePort takes a service and a protocol and returns the port number of +// getTargetPort takes a service and a protocol and returns the port number of // the port named for that protocol. If the port is not found then ok is false. -func getServicePort(protocol networking.ProtocolType, svc *corev1.Service) (int, bool) { +func getTargetPort(protocol networking.ProtocolType, svc *corev1.Service) (int, bool) { wantName := networking.ServicePortName(protocol) for _, p := range svc.Spec.Ports { if p.Name == wantName { - return int(p.Port), true + return p.TargetPort.IntValue(), true } } diff --git a/pkg/activator/net/helpers_test.go b/pkg/activator/net/helpers_test.go index 7e0cb0702f16..c2366cbd93ea 100644 --- a/pkg/activator/net/helpers_test.go +++ b/pkg/activator/net/helpers_test.go @@ -22,6 +22,7 @@ import ( "github.com/google/go-cmp/cmp" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" "knative.dev/networking/pkg/apis/networking" ) @@ -153,7 +154,7 @@ func TestEndpointsToDests(t *testing.T) { } } -func TestGetServicePort(t *testing.T) { +func TestGetTargetPort(t *testing.T) { for _, tc := range []struct { name string protocol networking.ProtocolType @@ -164,8 +165,8 @@ func TestGetServicePort(t *testing.T) { name: "Single port", protocol: networking.ProtocolHTTP1, ports: []corev1.ServicePort{{ - Name: "http", - Port: 100, + Name: "http", + TargetPort: intstr.FromInt(100), }}, expect: 100, expectOK: true, @@ -173,8 +174,8 @@ func TestGetServicePort(t *testing.T) { name: "Missing port", protocol: networking.ProtocolHTTP1, ports: []corev1.ServicePort{{ - Name: "invalid", - Port: 100, + Name: "invalid", + TargetPort: intstr.FromInt(100), }}, expect: 0, expectOK: false, @@ -186,7 +187,7 @@ func TestGetServicePort(t *testing.T) { }, } - port, ok := getServicePort(tc.protocol, &svc) + port, ok := getTargetPort(tc.protocol, &svc) if ok != tc.expectOK { t.Errorf("Wanted ok %v, got %v", tc.expectOK, ok) } diff --git a/pkg/activator/net/revision_backends.go b/pkg/activator/net/revision_backends.go index 83f60b1328aa..af8321c7b19f 100644 --- a/pkg/activator/net/revision_backends.go +++ b/pkg/activator/net/revision_backends.go @@ -48,6 +48,7 @@ import ( "knative.dev/pkg/controller" "knative.dev/pkg/logging" "knative.dev/pkg/logging/logkey" + "knative.dev/pkg/network" "knative.dev/pkg/reconciler" "knative.dev/serving/pkg/apis/serving" revisioninformer "knative.dev/serving/pkg/client/injection/informers/serving/v1/revision" @@ -62,9 +63,9 @@ import ( // ClusterIPDest will be set to non empty string and Dests will be nil. Otherwise Dests will be set // to a slice of healthy l4 dests for reaching the revision. type revisionDestsUpdate struct { - Rev types.NamespacedName - ClusterIPDest string - Dests sets.Set[string] + Rev types.NamespacedName + PrivateService string + Dests sets.Set[string] } type dests struct { @@ -91,7 +92,7 @@ const ( defaultProbeFrequency time.Duration = 200 * time.Millisecond ) -// revisionWatcher watches the podIPs and ClusterIP of the service for a revision. It implements the logic +// revisionWatcher watches the podIPs/service of a revision. It implements the logic // to supply revisionDestsUpdate events on updateCh type revisionWatcher struct { stopCh <-chan struct{} @@ -103,8 +104,9 @@ type revisionWatcher struct { // Stores the list of pods that have been successfully probed. healthyPods sets.Set[string] - // Stores whether the service ClusterIP has been seen as healthy. - clusterIPHealthy bool + + // Stores whether the private k8s service has been seen as healthy. + privateServiceHealthy bool transport http.RoundTripper destsCh chan dests @@ -200,23 +202,22 @@ func (rw *revisionWatcher) probe(ctx context.Context, dest string) (pass bool, n return match, notMesh, err } -func (rw *revisionWatcher) getDest() (string, error) { - svc, err := rw.serviceLister.Services(rw.rev.Namespace).Get(names.PrivateService(rw.rev.Name)) +func (rw *revisionWatcher) getPrivateServiceDest() (string, error) { + svcName := names.PrivateService(rw.rev.Name) + svc, err := rw.serviceLister.Services(rw.rev.Namespace).Get(svcName) if err != nil { return "", err } - if svc.Spec.ClusterIP == "" { - return "", fmt.Errorf("private service %s/%s clusterIP is nil, this should never happen", svc.ObjectMeta.Namespace, svc.ObjectMeta.Name) - } - svcPort, ok := getServicePort(rw.protocol, svc) + svcHostname := network.GetServiceHostname(svcName, rw.rev.Namespace) + svcPort, ok := getTargetPort(rw.protocol, svc) if !ok { return "", fmt.Errorf("unable to find port in service %s/%s", svc.Namespace, svc.Name) } - return net.JoinHostPort(svc.Spec.ClusterIP, strconv.Itoa(svcPort)), nil + return net.JoinHostPort(svcHostname, strconv.Itoa(svcPort)), nil } -func (rw *revisionWatcher) probeClusterIP(dest string) (bool, error) { +func (rw *revisionWatcher) probePrivateService(dest string) (bool, error) { ctx, cancel := context.WithTimeout(context.Background(), probeTimeout) defer cancel() match, _, err := rw.probe(ctx, dest) @@ -296,12 +297,12 @@ func (rw *revisionWatcher) probePodIPs(ready, notReady sets.Set[string]) (succee return healthy, unchanged, sawNotMesh.Load(), err } -func (rw *revisionWatcher) sendUpdate(clusterIP string, dests sets.Set[string]) { +func (rw *revisionWatcher) sendUpdate(privateService string, dests sets.Set[string]) { select { case <-rw.stopCh: return default: - rw.updateCh <- revisionDestsUpdate{Rev: rw.rev, ClusterIPDest: clusterIP, Dests: dests} + rw.updateCh <- revisionDestsUpdate{Rev: rw.rev, PrivateService: privateService, Dests: dests} } } @@ -310,9 +311,9 @@ func (rw *revisionWatcher) sendUpdate(clusterIP string, dests sets.Set[string]) func (rw *revisionWatcher) checkDests(curDests, prevDests dests) { if len(curDests.ready) == 0 && len(curDests.notReady) == 0 { // We must have scaled down. - rw.clusterIPHealthy = false + rw.privateServiceHealthy = false rw.healthyPods = nil - rw.logger.Debug("ClusterIP is no longer healthy.") + rw.logger.Debug("Private service is no longer healthy.") // Send update that we are now inactive (both params invalid). rw.sendUpdate("", nil) return @@ -351,7 +352,7 @@ func (rw *revisionWatcher) checkDests(curDests, prevDests dests) { // Note: it's important that this copies (via hs.Union) the healthy pods // set before sending the update to avoid concurrent modifications // affecting the throttler, which iterates over the set. - rw.sendUpdate("" /*clusterIP*/, hs.Union(nil)) + rw.sendUpdate("", hs.Union(nil)) return } // no-op, and we have successfully probed at least one pod. @@ -380,28 +381,28 @@ func (rw *revisionWatcher) checkDests(curDests, prevDests dests) { // If we failed to probe even a single pod, check the clusterIP. // NB: We can't cache the IP address, since user might go rogue // and delete the K8s service. We'll fix it, but the cluster IP will be different. - dest, err := rw.getDest() + dest, err := rw.getPrivateServiceDest() if err != nil { rw.logger.Errorw("Failed to determine service destination", zap.Error(err)) return } - // If cluster IP is healthy and we haven't scaled down, short circuit. - if rw.clusterIPHealthy { - rw.logger.Debugf("ClusterIP %s already probed (ready backends: %d)", dest, len(curDests.ready)) + // If service hostname is healthy and we haven't scaled down, short circuit. + if rw.privateServiceHealthy { + rw.logger.Debugf("service hostname %s already probed (ready backends: %d)", dest, len(curDests.ready)) rw.sendUpdate(dest, curDests.ready) return } - // If clusterIP is healthy send this update and we are done. - if ok, err := rw.probeClusterIP(dest); err != nil { - rw.logger.Errorw("Failed to probe clusterIP "+dest, zap.Error(err)) + // If service via hostname is healthy send this update and we are done. + if ok, err := rw.probePrivateService(dest); err != nil { + rw.logger.Errorw("Failed to probe private service: "+dest, zap.Error(err)) } else if ok { // We can reach here only iff pods are not successfully individually probed - // but ClusterIP conversely has been successfully probed. + // but PrivateService conversely has been successfully probed. rw.podsAddressable = false - rw.logger.Debugf("ClusterIP is successfully probed: %s (ready backends: %d)", dest, len(curDests.ready)) - rw.clusterIPHealthy = true + rw.logger.Debugf("Private service is successfully probed: %s (ready backends: %d)", dest, len(curDests.ready)) + rw.privateServiceHealthy = true rw.healthyPods = nil rw.sendUpdate(dest, curDests.ready) } @@ -421,8 +422,8 @@ func (rw *revisionWatcher) run(probeFrequency time.Duration) { // then we want to probe on timer. rw.logger.Debugw("Revision state", zap.Object("dests", curDests), zap.Object("healthy", logging.StringSet(rw.healthyPods)), - zap.Bool("clusterIPHealthy", rw.clusterIPHealthy)) - if len(curDests.ready)+len(curDests.notReady) > 0 && !(rw.clusterIPHealthy || + zap.Bool("clusterHealthy", rw.privateServiceHealthy)) + if len(curDests.ready)+len(curDests.notReady) > 0 && !(rw.privateServiceHealthy || curDests.ready.Union(curDests.notReady).Equal(rw.healthyPods)) { rw.logger.Debug("Probing on timer") tickCh = timer.C diff --git a/pkg/activator/net/revision_backends_test.go b/pkg/activator/net/revision_backends_test.go index 1f9e59b10ae2..ace87832be13 100644 --- a/pkg/activator/net/revision_backends_test.go +++ b/pkg/activator/net/revision_backends_test.go @@ -28,6 +28,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" pkgnet "knative.dev/networking/pkg/apis/networking" @@ -88,14 +89,14 @@ func revision(revID types.NamespacedName, protocol pkgnet.ProtocolType, cc int64 return r } -func privateSKSService(revID types.NamespacedName, clusterIP string, ports []corev1.ServicePort) *corev1.Service { +func privateSKSService(revID types.NamespacedName, ports []corev1.ServicePort) *corev1.Service { return &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Namespace: revID.Namespace, Name: names.PrivateService(revID.Name), }, Spec: corev1.ServiceSpec{ - ClusterIP: clusterIP, + ClusterIP: corev1.ClusterIPNone, Ports: ports, }, } @@ -124,7 +125,7 @@ func TestRevisionWatcher(t *testing.T) { dests dests protocol pkgnet.ProtocolType clusterPort corev1.ServicePort - clusterIP string + privateService string expectUpdates []revisionDestsUpdate probeHostResponses map[string][]activatortest.FakeResponse initialClusterIPState bool @@ -138,8 +139,8 @@ func TestRevisionWatcher(t *testing.T) { Name: "http", Port: 1234, }, - clusterIP: "129.0.0.1", - expectUpdates: []revisionDestsUpdate{{Dests: sets.New("128.0.0.1:1234")}}, + privateService: "test-revision-private.test-namespace.svc.cluster.local:1234", + expectUpdates: []revisionDestsUpdate{{Dests: sets.New("128.0.0.1:1234")}}, probeHostResponses: map[string][]activatortest.FakeResponse{ "128.0.0.1:1234": {{ Code: http.StatusOK, @@ -153,8 +154,8 @@ func TestRevisionWatcher(t *testing.T) { Name: "http", Port: 1234, }, - clusterIP: "129.0.0.1", - expectUpdates: []revisionDestsUpdate{{Dests: sets.New("128.0.0.1:1234")}}, + privateService: "test-revision-private.test-namespace.svc.cluster.local:1234", + expectUpdates: []revisionDestsUpdate{{Dests: sets.New("128.0.0.1:1234")}}, probeHostResponses: map[string][]activatortest.FakeResponse{ "128.0.0.1:1234": {{ Code: http.StatusOK, @@ -169,10 +170,10 @@ func TestRevisionWatcher(t *testing.T) { Name: "http2", Port: 1234, }, - clusterIP: "129.0.0.1", - expectUpdates: []revisionDestsUpdate{{Dests: sets.New("128.0.0.1:1234")}}, + privateService: "test-revision-private.test-namespace.svc.cluster.local:1234", + expectUpdates: []revisionDestsUpdate{{Dests: sets.New("128.0.0.1:1234")}}, probeHostResponses: map[string][]activatortest.FakeResponse{ - "129.0.0.1:1234": {{ + "test-revision-private.test-namespace.svc.cluster.local:1234": {{ Err: errors.New("clusterIP transport error"), }}, "128.0.0.1:1234": {{ @@ -181,18 +182,20 @@ func TestRevisionWatcher(t *testing.T) { }}, }, }, { - name: "single http2 clusterIP", + name: "single http2 hitting private service", dests: dests{ready: sets.New("128.0.0.1:1234"), notReady: sets.New("128.0.0.2:1234")}, protocol: pkgnet.ProtocolH2C, clusterPort: corev1.ServicePort{ - Name: "http2", - Port: 1234, + Name: "http2", + TargetPort: intstr.FromInt(1234), }, - clusterIP: "129.0.0.1", + privateService: "test-revision-private.test-namespace.svc.cluster.local:1234", noPodAddressability: true, - expectUpdates: []revisionDestsUpdate{{ClusterIPDest: "129.0.0.1:1234", Dests: sets.New("128.0.0.1:1234")}}, + expectUpdates: []revisionDestsUpdate{ + {PrivateService: "test-revision-private.test-namespace.svc.cluster.local:1234", Dests: sets.New("128.0.0.1:1234")}, + }, probeHostResponses: map[string][]activatortest.FakeResponse{ - "129.0.0.1:1234": {{ + "test-revision-private.test-namespace.svc.cluster.local:1234": {{ Code: http.StatusOK, Body: queue.Name, }}, @@ -204,13 +207,13 @@ func TestRevisionWatcher(t *testing.T) { }}, }, }, { - name: "no pods", - dests: dests{}, - clusterIP: "129.0.0.1", + name: "no pods", + dests: dests{}, + privateService: "test-revision-private.test-namespace.svc.cluster.local:1234", }, { name: "no pods, was happy", dests: dests{}, - clusterIP: "129.0.0.1", + privateService: "test-revision-private.test-namespace.svc.cluster.local:1234", initialClusterIPState: true, }, { name: "single unavailable podIP", @@ -219,9 +222,9 @@ func TestRevisionWatcher(t *testing.T) { Name: "http", Port: 1234, }, - clusterIP: "129.0.0.1", + privateService: "test-revision-private.test-namespace.svc.cluster.local:1234", probeHostResponses: map[string][]activatortest.FakeResponse{ - "129.0.0.1:1234": {{ + "test-revision-private.test-namespace.svc.cluster.local:1234": {{ Code: http.StatusServiceUnavailable, }}, "128.0.0.1:1234": {{ @@ -235,9 +238,9 @@ func TestRevisionWatcher(t *testing.T) { Name: "http", Port: 1234, }, - clusterIP: "129.0.0.1", + privateService: "test-revision-private.test-namespace.svc.cluster.local:1234", probeHostResponses: map[string][]activatortest.FakeResponse{ - "129.0.0.1:1234": {{ + "test-revision-private.test-namespace.svc.cluster.local:1234": {{ Code: http.StatusServiceUnavailable, }}, "128.0.0.1:1234": {{ @@ -248,13 +251,13 @@ func TestRevisionWatcher(t *testing.T) { name: "podIP slow ready", dests: dests{ready: sets.New("128.0.0.1:1234")}, clusterPort: corev1.ServicePort{ - Name: "http", - Port: 1234, + Name: "http", + TargetPort: intstr.FromInt(1234), }, - clusterIP: "129.0.0.1", - expectUpdates: []revisionDestsUpdate{{Dests: sets.New("128.0.0.1:1234")}}, + privateService: "test-revision-private.test-namespace.svc.cluster.local:1234", + expectUpdates: []revisionDestsUpdate{{Dests: sets.New("128.0.0.1:1234")}}, probeHostResponses: map[string][]activatortest.FakeResponse{ - "129.0.0.1:1234": {{ + "test-revision-private.test-namespace.svc.cluster.local:1234": {{ Err: errors.New("clusterIP transport error"), }}, "128.0.0.1:1234": {{ @@ -271,8 +274,8 @@ func TestRevisionWatcher(t *testing.T) { Name: "http", Port: 1234, }, - clusterIP: "129.0.0.1", - expectUpdates: []revisionDestsUpdate{{Dests: sets.New("128.0.0.1:1234", "128.0.0.2:1234", "128.0.0.3:1234")}}, + privateService: "test-revision-private.test-namespace.svc.cluster.local:1234", + expectUpdates: []revisionDestsUpdate{{Dests: sets.New("128.0.0.1:1234", "128.0.0.2:1234", "128.0.0.3:1234")}}, probeHostResponses: map[string][]activatortest.FakeResponse{ "128.0.0.1:1234": {{ Code: http.StatusOK, @@ -294,8 +297,8 @@ func TestRevisionWatcher(t *testing.T) { Name: "http", Port: 1234, }, - clusterIP: "129.0.0.1", - expectUpdates: []revisionDestsUpdate{{Dests: sets.New("128.0.0.2:1234")}}, + privateService: "test-revision-private.test-namespace.svc.cluster.local:1234", + expectUpdates: []revisionDestsUpdate{{Dests: sets.New("128.0.0.2:1234")}}, probeHostResponses: map[string][]activatortest.FakeResponse{ "128.0.0.1:1234": {{ Err: errors.New("clusterIP transport error"), @@ -312,7 +315,7 @@ func TestRevisionWatcher(t *testing.T) { Name: "http", Port: 4321, }, - clusterIP: "129.0.0.1", + privateService: "test-revision-private.test-namespace.svc.cluster.local:1234", expectUpdates: []revisionDestsUpdate{ {Dests: sets.New("128.0.0.2:1234")}, {Dests: sets.New("128.0.0.2:1234", "128.0.0.1:1234")}, @@ -333,20 +336,20 @@ func TestRevisionWatcher(t *testing.T) { }}, }, }, { - name: "clusterIP slow ready, no pod addressability", + name: "private service slow to ready, no pod addressability", dests: dests{ready: sets.New("128.0.0.1:1234")}, clusterPort: corev1.ServicePort{ - Name: "http", - Port: 1234, + Name: "http", + TargetPort: intstr.FromInt(1234), }, - clusterIP: "129.0.0.1", + privateService: "test-revision-private.test-namespace.svc.cluster.local:1234", expectUpdates: []revisionDestsUpdate{{ - ClusterIPDest: "129.0.0.1:1234", - Dests: sets.New("128.0.0.1:1234"), + PrivateService: "test-revision-private.test-namespace.svc.cluster.local:1234", + Dests: sets.New("128.0.0.1:1234"), }}, noPodAddressability: true, probeHostResponses: map[string][]activatortest.FakeResponse{ - "129.0.0.1:1234": {{ + "test-revision-private.test-namespace.svc.cluster.local:1234": {{ Err: errors.New("clusterIP transport error"), }, { Code: http.StatusOK, @@ -359,20 +362,20 @@ func TestRevisionWatcher(t *testing.T) { }}, }, }, { - name: "clusterIP ready, no pod addressability", + name: "private service ready, no pod addressability", dests: dests{ready: sets.New("128.0.0.1:1234")}, clusterPort: corev1.ServicePort{ - Name: "http", - Port: 1235, + Name: "http", + TargetPort: intstr.FromInt(1235), }, noPodAddressability: true, - clusterIP: "129.0.0.1", + privateService: "test-revision-private.test-namespace.svc.cluster.local:1235", expectUpdates: []revisionDestsUpdate{{ - ClusterIPDest: "129.0.0.1:1235", - Dests: sets.New("128.0.0.1:1234"), + PrivateService: "test-revision-private.test-namespace.svc.cluster.local:1235", + Dests: sets.New("128.0.0.1:1234"), }}, probeHostResponses: map[string][]activatortest.FakeResponse{ - "129.0.0.1:1234": {{ + "test-revision-private.test-namespace.svc.cluster.local:1235": {{ Code: http.StatusOK, Body: queue.Name, }}, @@ -381,19 +384,19 @@ func TestRevisionWatcher(t *testing.T) { }}, }, }, { - name: "clusterIP ready, pod fails with non-mesh error then succeeds", + name: "private service ready, pod fails with non-mesh error then succeeds", dests: dests{ready: sets.New("128.0.0.1:1234")}, clusterPort: corev1.ServicePort{ - Name: "http", - Port: 1235, + Name: "http", + TargetPort: intstr.FromInt(1235), }, noPodAddressability: false, - clusterIP: "129.0.0.1", + privateService: "test-revision-private.test-namespace.svc.cluster.local:1234", expectUpdates: []revisionDestsUpdate{ {Dests: sets.New("128.0.0.1:1234")}, }, probeHostResponses: map[string][]activatortest.FakeResponse{ - "129.0.0.1:1234": {{ + "test-revision-private.test-namespace.svc.cluster.local:1234": {{ // ClusterIP is healthy, but should not be used. Code: http.StatusOK, Body: queue.Name, @@ -406,16 +409,16 @@ func TestRevisionWatcher(t *testing.T) { }}, }, }, { - name: "passthrough lb, clusterIP ready but no fallback", + name: "passthrough lb, private service ready but no fallback", dests: dests{ready: sets.New("128.0.0.1:1234")}, clusterPort: corev1.ServicePort{ - Name: "http", - Port: 1235, + Name: "http", + TargetPort: intstr.FromInt(1235), }, noPodAddressability: true, - clusterIP: "129.0.0.1", + privateService: "test-revision-private.test-namespace.svc.cluster.local:1234", probeHostResponses: map[string][]activatortest.FakeResponse{ - "129.0.0.1:1234": {{ + "test-revision-private.test-namespace.svc.cluster.local:1234": {{ Code: http.StatusOK, Body: queue.Name, }}, @@ -425,16 +428,16 @@ func TestRevisionWatcher(t *testing.T) { }, usePassthroughLb: true, }, { - name: "mesh mode enabled: pod ready but should still use cluster IP", + name: "mesh mode enabled: pod ready but should still use private service", dests: dests{ready: sets.New("128.0.0.1:1234")}, clusterPort: corev1.ServicePort{ - Name: "http", - Port: 1235, + Name: "http", + TargetPort: intstr.FromInt(1235), }, noPodAddressability: true, - clusterIP: "129.0.0.1", + privateService: "test-revision-private.test-namespace.svc.cluster.local:1235", expectUpdates: []revisionDestsUpdate{ - {ClusterIPDest: "129.0.0.1:1235", Dests: sets.New("128.0.0.1:1234")}, + {PrivateService: "test-revision-private.test-namespace.svc.cluster.local:1235", Dests: sets.New("128.0.0.1:1234")}, }, meshMode: netcfg.MeshCompatibilityModeEnabled, probeHostResponses: map[string][]activatortest.FakeResponse{ @@ -443,7 +446,7 @@ func TestRevisionWatcher(t *testing.T) { Code: http.StatusOK, Body: queue.Name, }}, - "129.0.0.1:1234": {{ + "test-revision-private.test-namespace.svc.cluster.local:1235": {{ Code: http.StatusOK, Body: queue.Name, }}, @@ -452,17 +455,17 @@ func TestRevisionWatcher(t *testing.T) { name: "mesh mode disabled: pod initially returns mesh-compatible error, but don't fallback", dests: dests{ready: sets.New("128.0.0.1:1234")}, clusterPort: corev1.ServicePort{ - Name: "http", - Port: 1235, + Name: "http", + TargetPort: intstr.FromInt(1235), }, noPodAddressability: false, - clusterIP: "129.0.0.1", + privateService: "test-revision-private.test-namespace.svc.cluster.local:1235", expectUpdates: []revisionDestsUpdate{ {Dests: sets.New("128.0.0.1:1234")}, }, meshMode: netcfg.MeshCompatibilityModeDisabled, probeHostResponses: map[string][]activatortest.FakeResponse{ - "129.0.0.1:1234": {{ + "test-revision-private.test-namespace.svc.cluster.local:1235": {{ // Cluster IP healthy, but should not be used. Code: http.StatusOK, Body: queue.Name, @@ -480,11 +483,11 @@ func TestRevisionWatcher(t *testing.T) { name: "ready pod in k8s api when mesh-compat disabled", dests: dests{ready: sets.New("128.0.0.1:1234")}, clusterPort: corev1.ServicePort{ - Name: "http", - Port: 1235, + Name: "http", + TargetPort: intstr.FromInt(1235), }, noPodAddressability: false, - clusterIP: "129.0.0.1", + privateService: "test-revision-private.test-namespace.svc.cluster.local:1235", expectUpdates: []revisionDestsUpdate{ {Dests: sets.New("128.0.0.1:1234")}, }, @@ -526,11 +529,9 @@ func TestRevisionWatcher(t *testing.T) { informer := fakeserviceinformer.Get(ctx) revID := types.NamespacedName{Namespace: testNamespace, Name: testRevision} - if tc.clusterIP != "" { - svc := privateSKSService(revID, tc.clusterIP, []corev1.ServicePort{tc.clusterPort}) - fake.CoreV1().Services(svc.Namespace).Create(ctx, svc, metav1.CreateOptions{}) - informer.Informer().GetIndexer().Add(svc) - } + svc := privateSKSService(revID, []corev1.ServicePort{tc.clusterPort}) + fake.CoreV1().Services(svc.Namespace).Create(ctx, svc, metav1.CreateOptions{}) + informer.Informer().GetIndexer().Add(svc) waitInformers, err := rtesting.RunAndSyncInformers(ctx, informer.Informer()) if err != nil { @@ -554,7 +555,7 @@ func TestRevisionWatcher(t *testing.T) { tc.meshMode, true, logger) - rw.clusterIPHealthy = tc.initialClusterIPState + rw.privateServiceHealthy = tc.initialClusterIPState var wg sync.WaitGroup wg.Add(1) @@ -661,11 +662,11 @@ func TestRevisionBackendManagerAddEndpoint(t *testing.T) { revisionCC1(types.NamespacedName{Namespace: testNamespace, Name: testRevision}, pkgnet.ProtocolHTTP1), }, services: []*corev1.Service{ - privateSKSService(types.NamespacedName{Namespace: testNamespace, Name: testRevision}, "129.0.0.1", - []corev1.ServicePort{{Name: "http", Port: 1234}}), + privateSKSService(types.NamespacedName{Namespace: testNamespace, Name: testRevision}, + []corev1.ServicePort{{Name: "http", TargetPort: intstr.FromInt(1234)}}), }, probeHostResponses: map[string][]activatortest.FakeResponse{ - "129.0.0.1:1234": {{ + "test-revision-private.test-namespace.svc.cluster.local:1234": {{ Err: errors.New("clusterIP transport error"), }}, "128.0.0.1:1234": {{ @@ -689,11 +690,11 @@ func TestRevisionBackendManagerAddEndpoint(t *testing.T) { revisionCC1(types.NamespacedName{Namespace: testNamespace, Name: testRevision}, pkgnet.ProtocolH2C), }, services: []*corev1.Service{ - privateSKSService(types.NamespacedName{Namespace: testNamespace, Name: testRevision}, "129.0.0.1", - []corev1.ServicePort{{Name: "http2", Port: 1234}}), + privateSKSService(types.NamespacedName{Namespace: testNamespace, Name: testRevision}, + []corev1.ServicePort{{Name: "http2", TargetPort: intstr.FromInt(1234)}}), }, probeHostResponses: map[string][]activatortest.FakeResponse{ - "129.0.0.1:1234": {{ + "test-revision-private.test-namespace.svc.cluster.local:1234": {{ Err: errors.New("clusterIP transport error"), }}, "128.0.0.1:1234": {{ @@ -721,9 +722,9 @@ func TestRevisionBackendManagerAddEndpoint(t *testing.T) { revisionCC1(types.NamespacedName{Namespace: testNamespace, Name: "test-revision2"}, pkgnet.ProtocolHTTP1), }, services: []*corev1.Service{ - privateSKSService(types.NamespacedName{Namespace: testNamespace, Name: "test-revision1"}, "129.0.0.1", + privateSKSService(types.NamespacedName{Namespace: testNamespace, Name: "test-revision1"}, []corev1.ServicePort{{Name: "http", Port: 2345}}), - privateSKSService(types.NamespacedName{Namespace: testNamespace, Name: "test-revision2"}, "129.0.0.2", + privateSKSService(types.NamespacedName{Namespace: testNamespace, Name: "test-revision2"}, []corev1.ServicePort{{Name: "http", Port: 2345}}), }, probeHostResponses: map[string][]activatortest.FakeResponse{ @@ -746,11 +747,11 @@ func TestRevisionBackendManagerAddEndpoint(t *testing.T) { revisionCC1(types.NamespacedName{Namespace: testNamespace, Name: testRevision}, pkgnet.ProtocolHTTP1), }, services: []*corev1.Service{ - privateSKSService(types.NamespacedName{Namespace: testNamespace, Name: testRevision}, "129.0.0.1", + privateSKSService(types.NamespacedName{Namespace: testNamespace, Name: testRevision}, []corev1.ServicePort{{Name: "http", Port: 1234}}), }, probeHostResponses: map[string][]activatortest.FakeResponse{ - "129.0.0.1:1234": {{ // Should not succeed by hitting this cluster IP. + "test-revision-private.test-namespace.svc.cluster.local": {{ // Should not succeed by hitting this cluster IP. Code: http.StatusOK, Body: queue.Name, }}, @@ -767,11 +768,11 @@ func TestRevisionBackendManagerAddEndpoint(t *testing.T) { revisionCC1(types.NamespacedName{Namespace: testNamespace, Name: testRevision}, pkgnet.ProtocolHTTP1), }, services: []*corev1.Service{ - privateSKSService(types.NamespacedName{Namespace: testNamespace, Name: testRevision}, "129.0.0.1", - []corev1.ServicePort{{Name: "http", Port: 1234}}), + privateSKSService(types.NamespacedName{Namespace: testNamespace, Name: testRevision}, + []corev1.ServicePort{{Name: "http", TargetPort: intstr.FromInt(1234)}}), }, probeHostResponses: map[string][]activatortest.FakeResponse{ - "129.0.0.1:1234": {{ + "test-revision-private.test-namespace.svc.cluster.local:1234": {{ Code: http.StatusOK, Body: queue.Name, }}, @@ -781,8 +782,8 @@ func TestRevisionBackendManagerAddEndpoint(t *testing.T) { }, expectDests: map[types.NamespacedName]revisionDestsUpdate{ {Namespace: testNamespace, Name: testRevision}: { - ClusterIPDest: "129.0.0.1:1234", - Dests: sets.New("128.0.0.1:1234"), + PrivateService: "test-revision-private.test-namespace.svc.cluster.local:1234", + Dests: sets.New("128.0.0.1:1234"), }, }, updateCnt: 1, @@ -793,11 +794,11 @@ func TestRevisionBackendManagerAddEndpoint(t *testing.T) { revisionCC1(types.NamespacedName{Namespace: testNamespace, Name: testRevision}, pkgnet.ProtocolHTTP1), }, services: []*corev1.Service{ - privateSKSService(types.NamespacedName{Namespace: testNamespace, Name: testRevision}, "129.0.0.1", + privateSKSService(types.NamespacedName{Namespace: testNamespace, Name: testRevision}, []corev1.ServicePort{{Name: "http", Port: 1234}}), }, probeHostResponses: map[string][]activatortest.FakeResponse{ - "129.0.0.1:1234": {{ + "test-revision-private.test-namespace.svc.cluster.local": {{ Err: errors.New("clusterIP transport error"), }}, "128.0.0.1:1234": {{ @@ -813,11 +814,11 @@ func TestRevisionBackendManagerAddEndpoint(t *testing.T) { revisionCC1(types.NamespacedName{Namespace: testNamespace, Name: testRevision}, pkgnet.ProtocolHTTP1), }, services: []*corev1.Service{ - privateSKSService(types.NamespacedName{Namespace: testNamespace, Name: testRevision}, "129.0.0.1", + privateSKSService(types.NamespacedName{Namespace: testNamespace, Name: testRevision}, []corev1.ServicePort{{Name: "http", Port: 1234}}), }, probeHostResponses: map[string][]activatortest.FakeResponse{ - "129.0.0.1:1234": {{ + "test-revision-private.test-namespace.svc.cluster.local": {{ Err: errors.New("clusterIP transport error"), }}, "128.0.0.1:1234": {{ @@ -844,11 +845,11 @@ func TestRevisionBackendManagerAddEndpoint(t *testing.T) { }), }, services: []*corev1.Service{ - privateSKSService(types.NamespacedName{Namespace: testNamespace, Name: testRevision}, "129.0.0.1", + privateSKSService(types.NamespacedName{Namespace: testNamespace, Name: testRevision}, []corev1.ServicePort{{Name: "http", Port: 1234}}), }, probeHostResponses: map[string][]activatortest.FakeResponse{ - "129.0.0.1:1234": {{ + "test-revision-private.test-namespace.svc.cluster.local": {{ Err: errors.New("clusterIP transport error"), }}, "128.0.0.1:1234": {{ @@ -871,11 +872,11 @@ func TestRevisionBackendManagerAddEndpoint(t *testing.T) { }), }, services: []*corev1.Service{ - privateSKSService(types.NamespacedName{Namespace: testNamespace, Name: testRevision}, "129.0.0.1", + privateSKSService(types.NamespacedName{Namespace: testNamespace, Name: testRevision}, []corev1.ServicePort{{Name: "http", Port: 1234}}), }, probeHostResponses: map[string][]activatortest.FakeResponse{ - "129.0.0.1:1234": {{ + "test-revision-private.test-namespace.svc.cluster.local": {{ Err: errors.New("clusterIP transport error"), }}, "128.0.0.1:1234": {{ @@ -970,7 +971,6 @@ func TestCheckDestsReadyToNotReady(t *testing.T) { svc := privateSKSService( types.NamespacedName{Namespace: testNamespace, Name: testRevision}, - "129.0.0.1", []corev1.ServicePort{{Name: "http", Port: 1234}}, ) fakekubeclient.Get(ctx).CoreV1().Services(testNamespace).Create(ctx, svc, metav1.CreateOptions{}) @@ -1028,7 +1028,7 @@ func TestCheckDestsReadyToNotReady(t *testing.T) { uCh := make(chan revisionDestsUpdate, 1) dCh := make(chan struct{}) rw := &revisionWatcher{ - clusterIPHealthy: true, + privateServiceHealthy: true, podsAddressable: true, rev: types.NamespacedName{Namespace: testNamespace, Name: testRevision}, updateCh: uCh, @@ -1113,7 +1113,6 @@ func TestCheckDests(t *testing.T) { svc := privateSKSService( types.NamespacedName{Namespace: testNamespace, Name: testRevision}, - "129.0.0.1", []corev1.ServicePort{{Name: "http", Port: 1234}}, ) fakekubeclient.Get(ctx).CoreV1().Services(testNamespace).Create(ctx, svc, metav1.CreateOptions{}) @@ -1133,7 +1132,7 @@ func TestCheckDests(t *testing.T) { uCh := make(chan revisionDestsUpdate, 1) dCh := make(chan struct{}) rw := &revisionWatcher{ - clusterIPHealthy: true, + privateServiceHealthy: true, podsAddressable: false, rev: types.NamespacedName{Namespace: testNamespace, Name: testRevision}, updateCh: uCh, @@ -1173,7 +1172,6 @@ func TestCheckDestsSwinging(t *testing.T) { svc := privateSKSService( types.NamespacedName{Namespace: testNamespace, Name: testRevision}, - "10.5.0.1", []corev1.ServicePort{{Name: "http", Port: 1234}}, ) @@ -1248,9 +1246,9 @@ func TestCheckDestsSwinging(t *testing.T) { // First not ready, second good, clusterIP: not ready. rw.checkDests(dests{ready: sets.New("10.0.0.1:1234", "10.0.0.2:1234")}, emptyDests()) want := revisionDestsUpdate{ - Rev: types.NamespacedName{Namespace: testNamespace, Name: testRevision}, - ClusterIPDest: "", - Dests: sets.New("10.0.0.2:1234"), + Rev: types.NamespacedName{Namespace: testNamespace, Name: testRevision}, + PrivateService: "", + Dests: sets.New("10.0.0.2:1234"), } select { @@ -1361,7 +1359,6 @@ func TestRevisionDeleted(t *testing.T) { svc := privateSKSService( types.NamespacedName{Namespace: testNamespace, Name: testRevision}, - "129.0.0.1", []corev1.ServicePort{{Name: "http", Port: 1234}}, ) fakekubeclient.Get(ctx).CoreV1().Services(testNamespace).Create(ctx, svc, metav1.CreateOptions{}) @@ -1477,7 +1474,6 @@ func TestServiceMoreThanOne(t *testing.T) { for _, num := range []string{"11", "12"} { svc := privateSKSService( types.NamespacedName{Namespace: testNamespace, Name: testRevision}, - "129.0.0."+num, []corev1.ServicePort{{Name: "http", Port: 1234}}, ) // Modify the name so both can be created. diff --git a/pkg/activator/net/throttler.go b/pkg/activator/net/throttler.go index df6793f3e02a..211ecd71f4c8 100644 --- a/pkg/activator/net/throttler.go +++ b/pkg/activator/net/throttler.go @@ -407,12 +407,12 @@ func assignSlice(trackers []*podTracker, selfIndex, numActivators, cc int) []*po // to lock on updating concurrency / trackers func (rt *revisionThrottler) handleUpdate(update revisionDestsUpdate) { rt.logger.Debugw("Handling update", - zap.String("ClusterIP", update.ClusterIPDest), zap.Object("dests", logging.StringSet(update.Dests))) + zap.String("private service", update.PrivateService), zap.Object("dests", logging.StringSet(update.Dests))) - // ClusterIP is not yet ready, so we want to send requests directly to the pods. + // PrivateService is not yet ready, so we want to send requests directly to the pods. // NB: this will not be called in parallel, thus we can build a new podTrackers // array before taking out a lock. - if update.ClusterIPDest == "" { + if update.PrivateService == "" { // Create a map for fast lookup of existing trackers. trackersMap := make(map[string]*podTracker, len(rt.podTrackers)) for _, tracker := range rt.podTrackers { @@ -443,7 +443,7 @@ func (rt *revisionThrottler) handleUpdate(update revisionDestsUpdate) { return } - rt.updateThrottlerState(len(update.Dests), nil /*trackers*/, newPodTracker(update.ClusterIPDest, nil)) + rt.updateThrottlerState(len(update.Dests), nil /*trackers*/, newPodTracker(update.PrivateService, nil)) } // Throttler load balances requests to revisions based on capacity. When `Run` is called it listens for diff --git a/pkg/activator/net/throttler_test.go b/pkg/activator/net/throttler_test.go index ded1a9363d56..0d74c282ef2a 100644 --- a/pkg/activator/net/throttler_test.go +++ b/pkg/activator/net/throttler_test.go @@ -19,6 +19,7 @@ package net import ( "context" "errors" + "fmt" "strconv" "sync" "testing" @@ -45,6 +46,7 @@ import ( fakerevisioninformer "knative.dev/serving/pkg/client/injection/informers/serving/v1/revision/fake" "knative.dev/serving/pkg/networking" "knative.dev/serving/pkg/queue" + "knative.dev/serving/pkg/reconciler/serverlessservice/resources/names" ) var testBreakerParams = queue.BreakerParams{ @@ -359,9 +361,9 @@ func TestThrottlerErrorOneTimesOut(t *testing.T) { throttler := newTestThrottler(ctx) throttler.handleUpdate(revisionDestsUpdate{ - Rev: revID, - ClusterIPDest: "129.0.0.1:1234", - Dests: sets.New("128.0.0.1:1234"), + Rev: revID, + PrivateService: fmt.Sprintf("%s.%s:1234", names.PrivateService(testRevision), testNamespace), + Dests: sets.New("128.0.0.1:1234"), }) // Send 2 requests, one should time out. @@ -429,18 +431,18 @@ func TestThrottlerSuccesses(t *testing.T) { requests: 1, wantDests: sets.New("128.0.0.1:1234"), }, { - name: "single healthy clusterIP", + name: "single healthy private service", revision: revisionCC1(types.NamespacedName{Namespace: testNamespace, Name: testRevision}, pkgnet.ProtocolHTTP1), initUpdates: []revisionDestsUpdate{{ Rev: types.NamespacedName{Namespace: testNamespace, Name: testRevision}, Dests: sets.New("128.0.0.1:1234", "128.0.0.2:1234"), }, { - Rev: types.NamespacedName{Namespace: testNamespace, Name: testRevision}, - ClusterIPDest: "129.0.0.1:1234", - Dests: sets.New("128.0.0.1:1234"), + Rev: types.NamespacedName{Namespace: testNamespace, Name: testRevision}, + PrivateService: "test-revision-private.test-namespace.svc.cluster.local:1234", + Dests: sets.New("128.0.0.1:1234"), }}, requests: 1, - wantDests: sets.New("129.0.0.1:1234"), + wantDests: sets.New("test-revision-private.test-namespace.svc.cluster.local:1234"), }, { name: "spread podIP load", revision: revisionCC1(types.NamespacedName{Namespace: testNamespace, Name: testRevision}, pkgnet.ProtocolHTTP1), @@ -475,15 +477,15 @@ func TestThrottlerSuccesses(t *testing.T) { // All three IP addresses should be used if cc>3. wantDests: sets.New("128.0.0.1:1234", "128.0.0.2:1234", "211.212.213.214"), }, { - name: "multiple ClusterIP requests", + name: "multiple private service requests", revision: revisionCC1(types.NamespacedName{Namespace: testNamespace, Name: testRevision}, pkgnet.ProtocolHTTP1), initUpdates: []revisionDestsUpdate{{ - Rev: types.NamespacedName{Namespace: testNamespace, Name: testRevision}, - ClusterIPDest: "129.0.0.1:1234", - Dests: sets.New("128.0.0.1:1234", "128.0.0.2:1234"), + Rev: types.NamespacedName{Namespace: testNamespace, Name: testRevision}, + PrivateService: "test-revision-private.test-namespace.svc.cluster.local:1234", + Dests: sets.New("128.0.0.1:1234", "128.0.0.2:1234"), }}, requests: 2, - wantDests: sets.New("129.0.0.1:1234"), + wantDests: sets.New("test-revision-private.test-namespace.svc.cluster.local:1234"), }} { t.Run(tc.name, func(t *testing.T) { ctx, cancel, _ := rtesting.SetupFakeContextWithCancel(t) @@ -623,9 +625,9 @@ func TestPodAssignmentFinite(t *testing.T) { throttler.revisionThrottlers[revName] = rt update := revisionDestsUpdate{ - Rev: revName, - ClusterIPDest: "", - Dests: sets.New("ip4", "ip3", "ip5", "ip2", "ip1", "ip0"), + Rev: revName, + PrivateService: "", + Dests: sets.New("ip4", "ip3", "ip5", "ip2", "ip1", "ip0"), } // This should synchronously update throughout the system. // And now we can inspect `rt`. @@ -673,9 +675,9 @@ func TestPodAssignmentInfinite(t *testing.T) { throttler.revisionThrottlers[revName] = rt update := revisionDestsUpdate{ - Rev: revName, - ClusterIPDest: "", - Dests: sets.New("ip3", "ip2", "ip1"), + Rev: revName, + PrivateService: "", + Dests: sets.New("ip3", "ip2", "ip1"), } // This should synchronously update throughout the system. // And now we can inspect `rt`.