Skip to content

Commit

Permalink
Add ClockSynced Condition on managed cluster.
Browse files Browse the repository at this point in the history
Signed-off-by: xuezhaojun <[email protected]>
  • Loading branch information
xuezhaojun committed Nov 15, 2023
1 parent 29ffff0 commit 1dd5d7d
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 13 deletions.
112 changes: 103 additions & 9 deletions pkg/registration/hub/lease/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package lease

import (
"context"
"sync"
"time"

"github.com/openshift/library-go/pkg/controller/factory"
Expand Down Expand Up @@ -34,11 +35,42 @@ var (

// leaseController checks the lease of managed clusters on hub cluster to determine whether a managed cluster is available.
type leaseController struct {
	kubeClient    kubernetes.Interface
	patcher       patcher.Patcher[*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus]
	clusterLister clusterv1listers.ManagedClusterLister
	leaseLister   coordlisters.LeaseLister
	eventRecorder events.Recorder
	// clusterHealthMap caches per-cluster lease state across sync calls so the
	// controller can detect lease renewals and evaluate hub/agent clock skew.
	clusterHealthMap *clusterHealthMap
}

// clusterHealth holds per-cluster state that must survive between sync calls.
type clusterHealth struct {
	// savedLease is the most recently observed lease for the cluster; it is
	// compared against newer leases to detect renewals.
	savedLease *coordv1.Lease
}

// deepCopy returns a copy of the clusterHealth, so the caller can mutate the
// result without affecting the shared original. A nil receiver yields nil.
func (h *clusterHealth) deepCopy() *clusterHealth {
	if h == nil {
		return nil
	}
	clone := &clusterHealth{}
	clone.savedLease = h.savedLease.DeepCopy()
	return clone
}

// clusterHealthMap is a concurrency-safe map from managed cluster name to its
// cached clusterHealth entry.
type clusterHealthMap struct {
	sync.RWMutex
	// clusterHealths is guarded by the embedded RWMutex.
	clusterHealths map[string]*clusterHealth
}

// getDeepCopy returns a deep copy of the health entry for clusterName under a
// read lock, or nil when no entry exists.
func (m *clusterHealthMap) getDeepCopy(clusterName string) *clusterHealth {
	m.RLock()
	defer m.RUnlock()
	entry, ok := m.clusterHealths[clusterName]
	if !ok {
		return nil
	}
	return entry.deepCopy()
}

// set replaces the cached health entry for clusterName under the write lock.
// The parameter is named health to avoid shadowing the clusterHealth type.
func (m *clusterHealthMap) set(clusterName string, health *clusterHealth) {
	m.Lock()
	m.clusterHealths[clusterName] = health
	m.Unlock()
}

// NewClusterLeaseController creates a cluster lease controller on hub cluster.
Expand All @@ -56,6 +88,9 @@ func NewClusterLeaseController(
clusterLister: clusterInformer.Lister(),
leaseLister: leaseInformer.Lister(),
eventRecorder: recorder.WithComponentSuffix("managed-cluster-lease-controller"),
clusterHealthMap: &clusterHealthMap{
clusterHealths: make(map[string]*clusterHealth),
},
}
return factory.New().
WithFilteredEventsInformersQueueKeysFunc(
Expand Down Expand Up @@ -90,7 +125,7 @@ func (c *leaseController) sync(ctx context.Context, syncCtx factory.SyncContext)
if errors.IsNotFound(err) {
if !cluster.DeletionTimestamp.IsZero() {
// the lease is not found and the cluster is deleting, update the cluster to unknown immediately
return c.updateClusterStatus(ctx, cluster)
return c.updateClusterStatusUnknown(ctx, cluster)
}

// the lease is not found, try to create it
Expand Down Expand Up @@ -119,9 +154,31 @@ func (c *leaseController) sync(ctx context.Context, syncCtx factory.SyncContext)
}

now := time.Now()
if !now.Before(observedLease.Spec.RenewTime.Add(gracePeriod)) {

ch := c.clusterHealthMap.getDeepCopy(clusterName)
defer c.clusterHealthMap.set(clusterName, ch)

if ch == nil {
ch = &clusterHealth{}
}

// When the pod restarts, we don't want to report the ClockOutOfSync condition.
if ch.savedLease == nil {
ch.savedLease = observedLease
}

// When the agent's lease gets renewed, the "now" on the hub should be close to the RenewTime on the agent.
if ch.savedLease.Spec.RenewTime.Before(observedLease.Spec.RenewTime) {
ch.savedLease = observedLease
if err := c.updateClusterStatusClockSynced(ctx, cluster, now.Sub(observedLease.Spec.RenewTime.Time) < time.Duration(LeaseDurationSeconds)); err != nil {
return err
}
}

if now.After(observedLease.Spec.RenewTime.Time.Add(gracePeriod)) {
// Assume NTP synced between hub and a managed cluster, otherwise the agents may not function correctly.
// the lease is not updated constantly, change the cluster available condition to unknown
if err := c.updateClusterStatus(ctx, cluster); err != nil {
if err := c.updateClusterStatusUnknown(ctx, cluster); err != nil {
return err
}
}
Expand All @@ -131,7 +188,7 @@ func (c *leaseController) sync(ctx context.Context, syncCtx factory.SyncContext)
return nil
}

func (c *leaseController) updateClusterStatus(ctx context.Context, cluster *clusterv1.ManagedCluster) error {
func (c *leaseController) updateClusterStatusUnknown(ctx context.Context, cluster *clusterv1.ManagedCluster) error {
if meta.IsStatusConditionPresentAndEqual(cluster.Status.Conditions, clusterv1.ManagedClusterConditionAvailable, metav1.ConditionUnknown) {
// the managed cluster available condition is already unknown, do nothing
return nil
Expand All @@ -154,3 +211,40 @@ func (c *leaseController) updateClusterStatus(ctx context.Context, cluster *clus

return err
}

// updateClusterStatusClockSynced sets the ManagedClusterConditionClockSynced
// condition on the given cluster according to whether the hub and agent clocks
// are in sync. It patches the cluster status (and emits an event) only when the
// condition status actually changes.
//
// The original kept a separate desiredStatus variable that duplicated
// condition.Status; the condition is now the single source of truth.
func (c *leaseController) updateClusterStatusClockSynced(ctx context.Context, cluster *clusterv1.ManagedCluster, synced bool) error {
	condition := metav1.Condition{
		Type:    clusterv1.ManagedClusterConditionClockSynced,
		Status:  metav1.ConditionFalse,
		Reason:  "ManagedClusterClockOutOfSync",
		Message: "The clock of hub and agent is out of sync. This may cause the Unknown status and affect agent functionalities.",
	}
	if synced {
		condition = metav1.Condition{
			Type:    clusterv1.ManagedClusterConditionClockSynced,
			Status:  metav1.ConditionTrue,
			Reason:  "ManagedClusterClockSynced",
			Message: "The clock of the managed cluster is synced with the hub.",
		}
	}

	if meta.IsStatusConditionPresentAndEqual(cluster.Status.Conditions, clusterv1.ManagedClusterConditionClockSynced, condition.Status) {
		// the clock synced condition already has the desired status, do nothing
		return nil
	}

	newCluster := cluster.DeepCopy()
	meta.SetStatusCondition(&newCluster.Status.Conditions, condition)

	updated, err := c.patcher.PatchStatus(ctx, newCluster, newCluster.Status, cluster.Status)
	if updated {
		c.eventRecorder.Eventf("ManagedClusterClockSyncedConditionUpdated",
			"update managed cluster %q clock synced condition to %v.", cluster.Name, condition.Status)
	}
	return err
}
43 changes: 39 additions & 4 deletions pkg/registration/hub/lease/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ var now = time.Now()

func TestSync(t *testing.T) {
cases := []struct {
name string
clusters []runtime.Object
clusterLeases []runtime.Object
validateActions func(t *testing.T, leaseActions, clusterActions []clienttesting.Action)
name string
clusters []runtime.Object
clusterLeases []runtime.Object
clusterHealthMap *clusterHealthMap
validateActions func(t *testing.T, leaseActions, clusterActions []clienttesting.Action)
}{
{
name: "sync unaccepted managed cluster",
Expand Down Expand Up @@ -110,6 +111,34 @@ func TestSync(t *testing.T) {
testingcommon.AssertNoActions(t, clusterActions)
},
},
{
name: "managed cluster is available and clock is out of sync",
clusters: []runtime.Object{testinghelpers.NewAvailableManagedCluster()},
clusterLeases: []runtime.Object{testinghelpers.NewManagedClusterLease("managed-cluster-lease", now.Add(-5*time.Minute))},
clusterHealthMap: &clusterHealthMap{
clusterHealths: map[string]*clusterHealth{
testinghelpers.TestManagedClusterName: {
savedLease: testinghelpers.NewManagedClusterLease("managed-cluster-lease", now.Add(-6*time.Minute)),
},
},
},
validateActions: func(t *testing.T, leaseActions, clusterActions []clienttesting.Action) {
expected := metav1.Condition{
Type: clusterv1.ManagedClusterConditionClockSynced,
Status: metav1.ConditionFalse,
Reason: "ManagedClusterClockOutOfSync",
Message: "The clock of hub and agent is out of sync. This may cause the Unknown status and affect agent functionalities.",
}
testingcommon.AssertActions(t, clusterActions, "patch", "patch")
patch := clusterActions[0].(clienttesting.PatchAction).GetPatch()
managedCluster := &v1.ManagedCluster{}
err := json.Unmarshal(patch, managedCluster)
if err != nil {
t.Fatal(err)
}
testingcommon.AssertCondition(t, managedCluster.Status.Conditions, expected)
},
},
}

for _, c := range cases {
Expand Down Expand Up @@ -142,6 +171,12 @@ func TestSync(t *testing.T) {
clusterLister: clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(),
leaseLister: leaseInformerFactory.Coordination().V1().Leases().Lister(),
eventRecorder: syncCtx.Recorder(),
clusterHealthMap: &clusterHealthMap{
clusterHealths: make(map[string]*clusterHealth),
},
}
if c.clusterHealthMap != nil {
ctrl.clusterHealthMap = c.clusterHealthMap
}
syncErr := ctrl.sync(context.TODO(), syncCtx)
if syncErr != nil {
Expand Down

0 comments on commit 1dd5d7d

Please sign in to comment.