diff --git a/cmd/gardener-custom-metrics/main.go b/cmd/gardener-custom-metrics/main.go index 5ac2e907..6652e596 100644 --- a/cmd/gardener-custom-metrics/main.go +++ b/cmd/gardener-custom-metrics/main.go @@ -11,6 +11,7 @@ import ( "go.uber.org/zap/zapcore" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/component-base/logs" + "k8s.io/component-base/version" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" kmgr "sigs.k8s.io/controller-runtime/pkg/manager" @@ -21,7 +22,6 @@ import ( "github.com/gardener/gardener-custom-metrics/pkg/metrics_provider" gutil "github.com/gardener/gardener-custom-metrics/pkg/util/gardener" k8sclient "github.com/gardener/gardener-custom-metrics/pkg/util/k8s/client" - "github.com/gardener/gardener-custom-metrics/pkg/version" ) func main() { @@ -79,7 +79,7 @@ func completeAppCLIOptions( // Create log log := initLogs(ctx, appOptions.Completed().LogLevel) - log.V(app.VerbosityInfo).Info("Initializing", "version", version.Version) + log.V(app.VerbosityInfo).Info("Initializing", "version", version.Get().GitVersion) // Create manager log.V(app.VerbosityInfo).Info("Creating client set") diff --git a/pkg/ha/ha_service.go b/pkg/ha/ha_service.go index 808d4e97..d5f2dc1c 100644 --- a/pkg/ha/ha_service.go +++ b/pkg/ha/ha_service.go @@ -9,6 +9,7 @@ import ( "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" "sigs.k8s.io/controller-runtime/pkg/client" ctlmgr "sigs.k8s.io/controller-runtime/pkg/manager" @@ -93,6 +94,7 @@ func (ha *HAService) Start(ctx context.Context) error { select { case <-ctx.Done(): + _ = ha.cleanUp() return fmt.Errorf("starting HA service: %w", ctx.Err()) case <-ha.testIsolation.TimeAfter(retryPeriod): } @@ -103,5 +105,70 @@ func (ha *HAService) Start(ctx context.Context) error { } } - return nil + <-ctx.Done() + err := ha.cleanUp() + if err == nil { + err = ctx.Err() + } + return err +} + +// cleanUp is executed upon ending leadership. Its purpose is to remove the Endpoints object created upon acquiring +// leadership. +func (ha *HAService) cleanUp() error { + // Use our own context. This function executes when the main application context is closed. + // Also, try to finish before a potential 15 seconds termination grace timeout. + ctx, cancel := context.WithTimeout(context.Background(), 14*time.Second) + defer cancel() + seedClient := ha.manager.GetClient() + + attempt := 0 + var err error + for { + endpoints := corev1.Endpoints{} + err = seedClient.Get(ctx, client.ObjectKey{Namespace: ha.namespace, Name: app.Name}, &endpoints) + if err != nil { + if apierrors.IsNotFound(err) { + ha.log.V(app.VerbosityVerbose).Info("The endpoints object cleanup succeeded: the object was missing") + return nil + } + + ha.log.V(app.VerbosityInfo).Info("Failed to retrieve the endpoints object", "error", err.Error()) + } else { + // Avoid data race. We don't want to delete the endpoint if it is sending traffic to a replica other than this one. + isEndpointStillPointingToOurReplica := + len(endpoints.Subsets) == 1 && + len(endpoints.Subsets[0].Addresses) == 1 && + endpoints.Subsets[0].Addresses[0].IP == ha.servingIPAddress && + len(endpoints.Subsets[0].Ports) == 1 && + endpoints.Subsets[0].Ports[0].Port == int32(ha.servingPort) && + endpoints.Subsets[0].Ports[0].Protocol == corev1.ProtocolTCP + if !isEndpointStillPointingToOurReplica { + // Someone else is using the endpoint. We can't perform safe cleanup. Abandon the object. + ha.log.V(app.VerbosityWarning).Info( + "Abandoning endpoints object because it was modified by an external actor") + return nil + } + + // Only delete the endpoint if it is the resource version for which we confirmed that it points to us. + deletionPrecondition := client.Preconditions{UID: &endpoints.UID, ResourceVersion: &endpoints.ResourceVersion} + err = seedClient.Delete(ctx, &endpoints, deletionPrecondition) + if client.IgnoreNotFound(err) == nil { + // The endpoint was deleted (even if not by us). We call that successful cleanup. + ha.log.V(app.VerbosityVerbose).Info("The endpoints object cleanup succeeded") + return nil + } + ha.log.V(app.VerbosityInfo).Info("Failed to delete the endpoints object", "error", err.Error()) + } + + // Deletion request failed, possibly because of a midair collision. Wait a bit and retry. + attempt++ + if attempt >= 10 { + break + } + time.Sleep(1 * time.Second) + } + + ha.log.V(app.VerbosityError).Error(err, "All retries to delete the endpoints object failed. Abandoning object.") + return fmt.Errorf("HAService cleanup: deleting endponts object: retrying failed, last error: %w", err) } diff --git a/pkg/ha/ha_service_test.go b/pkg/ha/ha_service_test.go index 6290313c..f63c9772 100644 --- a/pkg/ha/ha_service_test.go +++ b/pkg/ha/ha_service_test.go @@ -2,15 +2,16 @@ package ha import ( "context" - "sync/atomic" - "time" - + "fmt" "github.com/go-logr/logr" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kclient "sigs.k8s.io/controller-runtime/pkg/client" + "sync/atomic" + "time" "github.com/gardener/gardener-custom-metrics/pkg/app" "github.com/gardener/gardener-custom-metrics/pkg/util/testutil" @@ -22,121 +23,154 @@ var _ = Describe("HAService", func() { testIPAddress = "1.2.3.4" testPort = 777 ) - - Describe("Start", func() { - It("should set the respective service endpoints ", func() { - // Arrange - manager := testutil.NewFakeManager() - ha := NewHAService(manager, testNs, testIPAddress, testPort, logr.Discard()) - - endpoints := &corev1.Endpoints{ + // Helper functions + var ( + makeEmptyEndpointsObject = func(namespace string) *corev1.Endpoints { + return &corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: app.Name, - Namespace: ha.namespace, + Namespace: namespace, }, } - Expect(ha.manager.GetClient().Create(context.Background(), endpoints)).To(Succeed()) + } + arrange = func() (*HAService, *testutil.FakeManager, context.Context, context.CancelFunc) { + manager := testutil.NewFakeManager() + ha := NewHAService(manager, testNs, testIPAddress, testPort, logr.Discard()) + ctx, cancel := context.WithCancel(context.Background()) + return ha, manager, ctx, cancel + } + createEndpointsObjectOnServer = func(namespace string, client kclient.Client) { + endpoints := makeEmptyEndpointsObject(namespace) + Expect(client.Create(context.Background(), endpoints)).To(Succeed()) + } + waitGetChangedEndpoints = func(ha *HAService, actualEndpoints *corev1.Endpoints) error { + // This function returns nil if the endpoints object exists and has changed from its initial, unpopulated state + + err := ha.manager.GetClient().Get( + context.Background(), kclient.ObjectKey{Namespace: ha.namespace, Name: app.Name}, actualEndpoints) + if err != nil { + return err + } + if actualEndpoints.Subsets == nil { + return fmt.Errorf("endpoiints object not populated") + } + return nil + } + expectEndpointsPopulated = func(actualEndpoints *corev1.Endpoints) { + Expect(actualEndpoints.Labels).NotTo(BeNil()) + Expect(actualEndpoints.Labels["app"]).To(Equal(app.Name)) + Expect(actualEndpoints.Subsets).To(HaveLen(1)) + Expect(actualEndpoints.Subsets[0].Addresses).To(HaveLen(1)) + Expect(actualEndpoints.Subsets[0].Addresses[0].IP).To(Equal(testIPAddress)) + Expect(actualEndpoints.Subsets[0].Ports).To(HaveLen(1)) + Expect(actualEndpoints.Subsets[0].Ports[0].Port).To(Equal(int32(testPort))) + } + ) + + Describe("Start", func() { + It("should create/update the respective service endpoints object ", func() { + // Arrange + ha, _, ctx, cancel := arrange() + defer cancel() + // Real K8s API HTTP PUT does create/update and works file if the Endpoints object is missing. The update + // operation of the client type in the test fake library we use, fails if the object is missing. + // So, create an empty object in the fake client first. + createEndpointsObjectOnServer(ha.namespace, ha.manager.GetClient()) + var err error // Act - err := ha.Start(context.Background()) + go func() { + err = ha.Start(ctx) + }() // Assert - Expect(err).To(Succeed()) - actual := corev1.Endpoints{} - manager.GetClient().Get(context.Background(), kclient.ObjectKey{Namespace: testNs, Name: app.Name}, &actual) - Expect(actual.Labels).NotTo(BeNil()) - Expect(actual.Labels["app"]).To(Equal(app.Name)) - Expect(actual.Subsets).To(HaveLen(1)) - Expect(actual.Subsets[0].Addresses).To(HaveLen(1)) - Expect(actual.Subsets[0].Addresses[0].IP).To(Equal(testIPAddress)) - Expect(actual.Subsets[0].Ports).To(HaveLen(1)) - Expect(actual.Subsets[0].Ports[0].Port).To(Equal(int32(testPort))) + actualEndpoints := makeEmptyEndpointsObject(ha.namespace) + Eventually(func() error { return waitGetChangedEndpoints(ha, actualEndpoints) }).Should(Succeed()) + Expect(err).NotTo(HaveOccurred()) + expectEndpointsPopulated(actualEndpoints) }) - It("should wait and retry with exponential backoff, if the service endpoints are missing, and succeed "+ - "once they appear", func() { - + It("should immediately abort retrying, if the context gets canceled", func() { // Arrange - manager := testutil.NewFakeManager() - ha := NewHAService(manager, testNs, testIPAddress, testPort, logr.Discard()) + ha, manager, ctx, cancel := arrange() + defer cancel() + var err error + var isComplete atomic.Bool + timeAfterChan := make(chan time.Time) - var timeAfterDuration atomic.Int64 ha.testIsolation.TimeAfter = func(duration time.Duration) <-chan time.Time { - timeAfterDuration.Store(int64(duration)) return timeAfterChan } - var err error - var isComplete atomic.Bool // Act and assert go func() { - err = ha.Start(context.Background()) + err = ha.Start(ctx) isComplete.Store(true) }() - Consistently(isComplete.Load).Should(BeFalse()) - Expect(timeAfterDuration.Load()).To(Equal(int64(1 * time.Second))) - timeAfterChan <- time.Now() + // Real K8s API HTTP PUT does create/update and works file if the Endpoints object is missing. The update + // operation of the client type in the test fake library we use, fails if the object is missing. + // Here we rely on this failure to halt the progress of configuring the endpoints objects. + // In the real world, the cause of the faults would be different, but that should trigger the same retry + // mechanic. Consistently(isComplete.Load).Should(BeFalse()) - Expect(timeAfterDuration.Load()).To(Equal(int64(2 * time.Second))) - - endpoints := &corev1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{ - Name: app.Name, - Namespace: ha.namespace, - }, - } - Expect(ha.manager.GetClient().Create(context.Background(), endpoints)).To(Succeed()) - - timeAfterChan <- time.Now() + cancel() Eventually(isComplete.Load).Should(BeTrue()) - Expect(err).To(Succeed()) + Expect(err).To(MatchError(ContainSubstring("canceled"))) actual := corev1.Endpoints{} - manager.GetClient().Get(context.Background(), kclient.ObjectKey{Namespace: testNs, Name: app.Name}, &actual) - Expect(actual.Subsets).To(HaveLen(1)) - Expect(actual.Subsets[0].Addresses).To(HaveLen(1)) - Expect(actual.Subsets[0].Addresses[0].IP).To(Equal(testIPAddress)) + err = manager.GetClient().Get(context.Background(), kclient.ObjectKey{Namespace: testNs, Name: app.Name}, &actual) + Expect(err).To(HaveOccurred()) }) - It("should immediately abort retrying, if the context gets canceled", func() { + It("should wait and retry with exponential backoff, if the service endpoints are missing, and succeed "+ + "once they appear", func() { + // Arrange - manager := testutil.NewFakeManager() - ha := NewHAService(manager, testNs, testIPAddress, testPort, logr.Discard()) + ha, _, ctx, cancel := arrange() + defer cancel() + var err error + var isComplete atomic.Bool timeAfterChan := make(chan time.Time) + var timeAfterDuration atomic.Int64 ha.testIsolation.TimeAfter = func(duration time.Duration) <-chan time.Time { + timeAfterDuration.Store(int64(duration)) return timeAfterChan } - var err error - var isComplete atomic.Bool - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - // Act and assert go func() { err = ha.Start(ctx) isComplete.Store(true) }() + // Real K8s API HTTP PUT does create/update and works file if the Endpoints object is missing. The update + // operation of the client type in the test fake library we use, fails if the object is missing. + // Here we rely on this failure to halt the progress of configuring the endpoints objects. + // In the real world, the cause of the faults would be different, but that should trigger the same retry + // mechanic. + Consistently(isComplete.Load).Should(BeFalse()) + Expect(timeAfterDuration.Load()).To(Equal(int64(1 * time.Second))) + timeAfterChan <- time.Now() Consistently(isComplete.Load).Should(BeFalse()) + Expect(timeAfterDuration.Load()).To(Equal(int64(2 * time.Second))) - cancel() - Eventually(isComplete.Load).Should(BeTrue()) - Expect(err).To(MatchError(ContainSubstring("canceled"))) - actual := corev1.Endpoints{} - err = manager.GetClient().Get(context.Background(), kclient.ObjectKey{Namespace: testNs, Name: app.Name}, &actual) - Expect(err).To(HaveOccurred()) + createEndpointsObjectOnServer(ha.namespace, ha.manager.GetClient()) + timeAfterChan <- time.Now() + + actualEndpoints := makeEmptyEndpointsObject(ha.namespace) + Eventually(func() error { return waitGetChangedEndpoints(ha, actualEndpoints) }).Should(Succeed()) + Expect(err).To(Succeed()) + expectEndpointsPopulated(actualEndpoints) }) It("should use exponential backoff", func() { - // Arrange - manager := testutil.NewFakeManager() - ha := NewHAService(manager, testNs, testIPAddress, testPort, logr.Discard()) + ha, _, ctx, cancel := arrange() + defer cancel() timeAfterChan := make(chan time.Time) var timeAfterDuration atomic.Int64 ha.testIsolation.TimeAfter = func(duration time.Duration) <-chan time.Time { @@ -146,9 +180,14 @@ var _ = Describe("HAService", func() { // Act and assert go func() { - ha.Start(context.Background()) + _ = ha.Start(ctx) }() + // Real K8s API HTTP PUT does create/update and works file if the Endpoints object is missing. The update + // operation of the client type in the test fake library we use, fails if the object is missing. + // Here we rely on this failure to halt the progress of configuring the endpoints objects. + // In the real world, the cause of the faults would be different, but that should trigger the same retry + // mechanic. expectedPeriod := 1 * time.Second expectedMax := 5 * time.Minute for i := 0; i < 20; i++ { @@ -161,5 +200,100 @@ var _ = Describe("HAService", func() { } Consistently(timeAfterDuration.Load).Should(Equal(int64(expectedMax))) }) + + It("should delete its service endpoint when context closes", func() { + // Arrange + ha, _, ctx, cancel := arrange() + defer cancel() + var err error + // Real K8s API HTTP PUT does create/update and works file if the Endpoints object is missing. The update + // operation of the client type in the test fake library we use, fails if the object is missing. + // So, create an empty object in the fake client first. + createEndpointsObjectOnServer(ha.namespace, ha.manager.GetClient()) + + // Act & assert + go func() { + err = ha.Start(ctx) + }() + + // Wait for HAService to update the Endpoints object + actualEndpoints := makeEmptyEndpointsObject(ha.namespace) + Eventually(func() error { return waitGetChangedEndpoints(ha, actualEndpoints) }).Should(Succeed()) + + cancel() + + // Wait for HAService to delete update the Endpoints object + Eventually(func() bool { + err := ha.manager.GetClient().Get( + context.Background(), kclient.ObjectKey{Namespace: ha.namespace, Name: app.Name}, actualEndpoints) + return apierrors.IsNotFound(err) + }).Should(BeTrue()) + + Expect(err.Error()).To(ContainSubstring("canceled")) + }) + + It("upon exit, cleanup should not delete the service endpoint if it points to a different IP address", func() { + // Arrange + ha, _, ctx, cancel := arrange() + defer cancel() + var err error + var isComplete atomic.Bool + // Real K8s API HTTP PUT does create/update and works file if the Endpoints object is missing. The update + // operation of the client type in the test fake library we use, fails if the object is missing. + // So, create an empty object in the fake client first. + createEndpointsObjectOnServer(ha.namespace, ha.manager.GetClient()) + + // Act & assert + go func() { + err = ha.Start(ctx) + isComplete.Store(true) + }() + + // Wait for HAService to update the Endpoints object + actualEndpoints := makeEmptyEndpointsObject(ha.namespace) + Eventually(func() error { return waitGetChangedEndpoints(ha, actualEndpoints) }).Should(Succeed()) + + // Modify the Endpoints object so it no longer points to our pod + actualEndpoints.Subsets[0].Addresses[0].IP = "1.1.1.1" + Expect(ha.manager.GetClient().Update(ctx, actualEndpoints)).To(Succeed()) + + cancel() + + // Make sure the HAService did not delete the Endpoints object + Eventually(isComplete.Load).Should(BeTrue()) + Expect(err.Error()).To(ContainSubstring("canceled")) + Expect( + ha.manager.GetClient().Get( + context.Background(), kclient.ObjectKey{Namespace: ha.namespace, Name: app.Name}, actualEndpoints)). + To(Succeed()) + }) + + It("upon exit, cleanup should succeed if endpoints object is deleted by an external actor", func() { + // Arrange + ha, _, ctx, cancel := arrange() + defer cancel() + var err error + var isComplete atomic.Bool + // Real K8s API HTTP PUT does create/update and works file if the Endpoints object is missing. The update + // operation of the client type in the test fake library we use, fails if the object is missing. + // So, create an empty object in the fake client first. + createEndpointsObjectOnServer(ha.namespace, ha.manager.GetClient()) + + // Act & assert + go func() { + err = ha.Start(ctx) + isComplete.Store(true) + }() + + // Wait for HAService to update the Endpoints object + actualEndpoints := makeEmptyEndpointsObject(ha.namespace) + Eventually(func() error { return waitGetChangedEndpoints(ha, actualEndpoints) }).Should(Succeed()) + + // Delete the Endpoints object before triggering cleanup. Error should be "context canceled" and not e.g. "not found" + Expect(ha.manager.GetClient().Delete(ctx, actualEndpoints)).To(Succeed()) + cancel() + Eventually(isComplete.Load).Should(BeTrue()) + Expect(err.Error()).To(ContainSubstring("canceled")) + }) }) })