diff --git a/pkg/registration/hub/lease/controller.go b/pkg/registration/hub/lease/controller.go
index 30e368985..1b3b03e12 100644
--- a/pkg/registration/hub/lease/controller.go
+++ b/pkg/registration/hub/lease/controller.go
@@ -2,6 +2,7 @@ package lease
 
 import (
 	"context"
+	"sync"
 	"time"
 
 	"github.com/openshift/library-go/pkg/controller/factory"
@@ -34,11 +35,42 @@ var (
 
 // leaseController checks the lease of managed clusters on hub cluster to determine whether a managed cluster is available.
 type leaseController struct {
-	kubeClient    kubernetes.Interface
-	patcher       patcher.Patcher[*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus]
-	clusterLister clusterv1listers.ManagedClusterLister
-	leaseLister   coordlisters.LeaseLister
-	eventRecorder events.Recorder
+	kubeClient       kubernetes.Interface
+	patcher          patcher.Patcher[*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus]
+	clusterLister    clusterv1listers.ManagedClusterLister
+	leaseLister      coordlisters.LeaseLister
+	eventRecorder    events.Recorder
+	clusterHealthMap *clusterHealthMap
+}
+
+type clusterHealth struct {
+	savedLease *coordv1.Lease
+}
+
+func (h *clusterHealth) deepCopy() *clusterHealth {
+	if h == nil {
+		return nil
+	}
+	return &clusterHealth{
+		savedLease: h.savedLease.DeepCopy(),
+	}
+}
+
+type clusterHealthMap struct {
+	sync.RWMutex
+	clusterHealths map[string]*clusterHealth
+}
+
+func (m *clusterHealthMap) getDeepCopy(clusterName string) *clusterHealth {
+	m.RLock()
+	defer m.RUnlock()
+	return m.clusterHealths[clusterName].deepCopy()
+}
+
+func (m *clusterHealthMap) set(clusterName string, clusterHealth *clusterHealth) {
+	m.Lock()
+	defer m.Unlock()
+	m.clusterHealths[clusterName] = clusterHealth
 }
 
 // NewClusterLeaseController creates a cluster lease controller on hub cluster.
@@ -56,6 +88,9 @@ func NewClusterLeaseController(
 		clusterLister: clusterInformer.Lister(),
 		leaseLister:   leaseInformer.Lister(),
 		eventRecorder: recorder.WithComponentSuffix("managed-cluster-lease-controller"),
+		clusterHealthMap: &clusterHealthMap{
+			clusterHealths: make(map[string]*clusterHealth),
+		},
 	}
 	return factory.New().
 		WithFilteredEventsInformersQueueKeysFunc(
@@ -90,7 +125,7 @@ func (c *leaseController) sync(ctx context.Context, syncCtx factory.SyncContext)
 	if errors.IsNotFound(err) {
 		if !cluster.DeletionTimestamp.IsZero() {
 			// the lease is not found and the cluster is deleting, update the cluster to unknown immediately
-			return c.updateClusterStatus(ctx, cluster)
+			return c.updateClusterStatusUnknown(ctx, cluster)
 		}
 
 		// the lease is not found, try to create it
@@ -119,9 +154,32 @@ func (c *leaseController) sync(ctx context.Context, syncCtx factory.SyncContext)
 	}
 
 	now := time.Now()
-	if !now.Before(observedLease.Spec.RenewTime.Add(gracePeriod)) {
+
+	ch := c.clusterHealthMap.getDeepCopy(clusterName)
+	if ch == nil {
+		ch = &clusterHealth{}
+	}
+	// deferred after the nil check on purpose: defer evaluates its arguments immediately, so deferring earlier would store a nil entry
+	defer c.clusterHealthMap.set(clusterName, ch)
+
+	// When the pod restarts, we don't want to report the ClockOutOfSync condition.
+	if ch.savedLease == nil {
+		ch.savedLease = observedLease
+	}
+
+	// When the agent's lease gets renewed, the "now" on the hub should be close to the RenewTime set by the agent.
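+	// If the hub observes the renewal more than one lease duration after the RenewTime, the clocks are treated as out of sync.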
+	if ch.savedLease.Spec.RenewTime.Before(observedLease.Spec.RenewTime) {
+		ch.savedLease = observedLease
+		if err := c.updateClusterStatusClockSynced(ctx, cluster, now.Sub(observedLease.Spec.RenewTime.Time) < time.Duration(LeaseDurationSeconds)*time.Second); err != nil {
+			return err
+		}
+	}
+
+	if now.After(observedLease.Spec.RenewTime.Time.Add(gracePeriod)) {
+		// Assume the clocks of the hub and the managed cluster are NTP-synced; otherwise the agents may not function correctly.
 		// the lease is not updated constantly, change the cluster available condition to unknown
-		if err := c.updateClusterStatus(ctx, cluster); err != nil {
+		if err := c.updateClusterStatusUnknown(ctx, cluster); err != nil {
 			return err
 		}
 	}
@@ -131,7 +188,7 @@ func (c *leaseController) sync(ctx context.Context, syncCtx factory.SyncContext)
 	return nil
 }
-func (c *leaseController) updateClusterStatus(ctx context.Context, cluster *clusterv1.ManagedCluster) error {
+func (c *leaseController) updateClusterStatusUnknown(ctx context.Context, cluster *clusterv1.ManagedCluster) error {
 	if meta.IsStatusConditionPresentAndEqual(cluster.Status.Conditions, clusterv1.ManagedClusterConditionAvailable, metav1.ConditionUnknown) {
 		// the managed cluster available condition alreay is unknown, do nothing
 		return nil
 	}
@@ -154,3 +211,40 @@
 
 	return err
 }
+
+func (c *leaseController) updateClusterStatusClockSynced(ctx context.Context, cluster *clusterv1.ManagedCluster, synced bool) error {
+	var desiredStatus metav1.ConditionStatus
+	var condition metav1.Condition
+	if synced {
+		desiredStatus = metav1.ConditionTrue
+		condition = metav1.Condition{
+			Type:    clusterv1.ManagedClusterConditionClockSynced,
+			Status:  metav1.ConditionTrue,
+			Reason:  "ManagedClusterClockSynced",
+			Message: "The clock of the managed cluster is synced with the hub.",
+		}
+	} else {
+		desiredStatus = metav1.ConditionFalse
+		condition = metav1.Condition{
+			Type:    clusterv1.ManagedClusterConditionClockSynced,
+			Status:  metav1.ConditionFalse,
+			Reason:  "ManagedClusterClockOutOfSync",
+			Message: "The clock of hub and agent is out of sync. This may cause the Unknown status and affect agent functionalities.",
+		}
+	}
+
+	if meta.IsStatusConditionPresentAndEqual(cluster.Status.Conditions, clusterv1.ManagedClusterConditionClockSynced, desiredStatus) {
+		// the managed cluster clock synced condition is already the desired status, do nothing
+		return nil
+	}
+
+	newCluster := cluster.DeepCopy()
+	meta.SetStatusCondition(&newCluster.Status.Conditions, condition)
+
+	updated, err := c.patcher.PatchStatus(ctx, newCluster, newCluster.Status, cluster.Status)
+	if updated {
+		c.eventRecorder.Eventf("ManagedClusterClockSyncedConditionUpdated",
+			"update managed cluster %q clock synced condition to %v.", cluster.Name, desiredStatus)
+	}
+	return err
+}
diff --git a/pkg/registration/hub/lease/controller_test.go b/pkg/registration/hub/lease/controller_test.go
index 98e4531e2..df135a042 100644
--- a/pkg/registration/hub/lease/controller_test.go
+++ b/pkg/registration/hub/lease/controller_test.go
@@ -27,10 +27,11 @@ var now = time.Now()
 
 func TestSync(t *testing.T) {
 	cases := []struct {
-		name            string
-		clusters        []runtime.Object
-		clusterLeases   []runtime.Object
-		validateActions func(t *testing.T, leaseActions, clusterActions []clienttesting.Action)
+		name             string
+		clusters         []runtime.Object
+		clusterLeases    []runtime.Object
+		clusterHealthMap *clusterHealthMap
+		validateActions  func(t *testing.T, leaseActions, clusterActions []clienttesting.Action)
 	}{
 		{
 			name:     "sync unaccepted managed cluster",
@@ -110,6 +111,34 @@
 				testingcommon.AssertNoActions(t, clusterActions)
 			},
 		},
+		{
+			name:          "managed cluster is available and clock is out of sync",
+			clusters:      []runtime.Object{testinghelpers.NewAvailableManagedCluster()},
+			clusterLeases: []runtime.Object{testinghelpers.NewManagedClusterLease("managed-cluster-lease", now.Add(-5*time.Minute))},
+			clusterHealthMap: &clusterHealthMap{
+				clusterHealths: map[string]*clusterHealth{
+					testinghelpers.TestManagedClusterName: {
+						savedLease: testinghelpers.NewManagedClusterLease("managed-cluster-lease", now.Add(-6*time.Minute)),
+					},
+				},
+			},
+			validateActions: func(t *testing.T, leaseActions, clusterActions []clienttesting.Action) {
+				expected := metav1.Condition{
+					Type:    v1.ManagedClusterConditionClockSynced,
+					Status:  metav1.ConditionFalse,
+					Reason:  "ManagedClusterClockOutOfSync",
+					Message: "The clock of hub and agent is out of sync. This may cause the Unknown status and affect agent functionalities.",
+				}
+				testingcommon.AssertActions(t, clusterActions, "patch", "patch")
+				patch := clusterActions[0].(clienttesting.PatchAction).GetPatch()
+				managedCluster := &v1.ManagedCluster{}
+				err := json.Unmarshal(patch, managedCluster)
+				if err != nil {
+					t.Fatal(err)
+				}
+				testingcommon.AssertCondition(t, managedCluster.Status.Conditions, expected)
+			},
+		},
 	}
 
 	for _, c := range cases {
@@ -142,6 +171,12 @@
 			clusterLister: clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(),
 			leaseLister:   leaseInformerFactory.Coordination().V1().Leases().Lister(),
 			eventRecorder: syncCtx.Recorder(),
+			clusterHealthMap: &clusterHealthMap{
+				clusterHealths: make(map[string]*clusterHealth),
+			},
+		}
+		if c.clusterHealthMap != nil {
+			ctrl.clusterHealthMap = c.clusterHealthMap
 		}
 		syncErr := ctrl.sync(context.TODO(), syncCtx)
 		if syncErr != nil {
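
A minimal, runnable sketch of the skew heuristic the patch applies. The `clockSynced` helper and `leaseDurationSeconds` constant below are hypothetical stand-ins for illustration only; the controller derives its bound from the registration package's `LeaseDurationSeconds` (assumed to be 60 here). The idea: when the hub observes a lease renewal, its own clock should read at most one lease duration past the lease's RenewTime.

```go
package main

import (
	"fmt"
	"time"
)

// leaseDurationSeconds stands in for the controller's LeaseDurationSeconds (assumed 60).
const leaseDurationSeconds = 60

// clockSynced mirrors the check in sync(): at renewal time, the hub's "now"
// should be within one lease duration after the lease's RenewTime; a larger
// gap suggests the hub and agent clocks have drifted apart.
func clockSynced(hubNow, renewTime time.Time) bool {
	return hubNow.Sub(renewTime) < time.Duration(leaseDurationSeconds)*time.Second
}

func main() {
	now := time.Now()
	fmt.Println(clockSynced(now, now.Add(-2*time.Second))) // true: normal propagation delay
	fmt.Println(clockSynced(now, now.Add(-5*time.Minute))) // false: agent ~5m behind, as in the new test case
}
```

Note that the comparison is one-sided, matching the patch: an agent clock running ahead of the hub yields a negative difference and is still reported as synced.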