Skip to content

Commit

Permalink
Add ClockSynced Condidtion.
Browse files Browse the repository at this point in the history
Signed-off-by: xuezhaojun <[email protected]>
  • Loading branch information
xuezhaojun committed Nov 16, 2023
1 parent 615f5a4 commit a1678bc
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 17 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ require (
k8s.io/kube-aggregator v0.28.1
k8s.io/utils v0.0.0-20230726121419-3b25d923346b
open-cluster-management.io/addon-framework v0.8.1-0.20231009020812-e52774032b4c
open-cluster-management.io/api v0.12.1-0.20231114013807-95119cf22df6
open-cluster-management.io/api v0.12.1-0.20231116014932-8bf94241ddc0
sigs.k8s.io/controller-runtime v0.15.0
sigs.k8s.io/kube-storage-version-migrator v0.0.6-0.20230721195810-5c8923c5ff96
)
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -740,8 +740,8 @@ k8s.io/utils v0.0.0-20230726121419-3b25d923346b h1:sgn3ZU783SCgtaSJjpcVVlRqd6GSn
k8s.io/utils v0.0.0-20230726121419-3b25d923346b/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
open-cluster-management.io/addon-framework v0.8.1-0.20231009020812-e52774032b4c h1:9Rvj3UTjVwJWOItlIYx6shFF72f8L3t91T9IwZ8sx6Q=
open-cluster-management.io/addon-framework v0.8.1-0.20231009020812-e52774032b4c/go.mod h1:r4sQGR9YgLC4hXC695sfinun2WhuigWrEPk2IeIl800=
open-cluster-management.io/api v0.12.1-0.20231114013807-95119cf22df6 h1:QwQ3dTtbCGuZ7iGqINuawGh4c3NmGe2qhcdHvEXRsOs=
open-cluster-management.io/api v0.12.1-0.20231114013807-95119cf22df6/go.mod h1:/I/nFccB0tmF+dZg7pHuzY3SaXOX86MI4vcFtidJ0OM=
open-cluster-management.io/api v0.12.1-0.20231116014932-8bf94241ddc0 h1:5etrmy4PzZ3tV34U02olW5RMICW0GWqgzQItTAp+X0k=
open-cluster-management.io/api v0.12.1-0.20231116014932-8bf94241ddc0/go.mod h1:/I/nFccB0tmF+dZg7pHuzY3SaXOX86MI4vcFtidJ0OM=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
Expand Down
112 changes: 103 additions & 9 deletions pkg/registration/hub/lease/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package lease

import (
"context"
"sync"
"time"

"github.com/openshift/library-go/pkg/controller/factory"
Expand Down Expand Up @@ -34,11 +35,42 @@ var (

// leaseController checks the lease of managed clusters on hub cluster to determine whether a managed cluster is available.
type leaseController struct {
kubeClient kubernetes.Interface
patcher patcher.Patcher[*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus]
clusterLister clusterv1listers.ManagedClusterLister
leaseLister coordlisters.LeaseLister
eventRecorder events.Recorder
kubeClient kubernetes.Interface
patcher patcher.Patcher[*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus]
clusterLister clusterv1listers.ManagedClusterLister
leaseLister coordlisters.LeaseLister
eventRecorder events.Recorder
clusterHealthMap *clusterHealthMap
}

type clusterHealth struct {
savedLease *coordv1.Lease
}

func (h *clusterHealth) deepCopy() *clusterHealth {
if h == nil {
return nil
}
return &clusterHealth{
savedLease: h.savedLease.DeepCopy(),
}
}

type clusterHealthMap struct {
sync.RWMutex
clusterHealths map[string]*clusterHealth
}

func (m *clusterHealthMap) getDeepCopy(clusterName string) *clusterHealth {
m.RLock()
defer m.RUnlock()
return m.clusterHealths[clusterName].deepCopy()
}

func (m *clusterHealthMap) set(clusterName string, clusterHealth *clusterHealth) {
m.Lock()
defer m.Unlock()
m.clusterHealths[clusterName] = clusterHealth
}

// NewClusterLeaseController creates a cluster lease controller on hub cluster.
Expand All @@ -56,6 +88,9 @@ func NewClusterLeaseController(
clusterLister: clusterInformer.Lister(),
leaseLister: leaseInformer.Lister(),
eventRecorder: recorder.WithComponentSuffix("managed-cluster-lease-controller"),
clusterHealthMap: &clusterHealthMap{
clusterHealths: make(map[string]*clusterHealth),
},
}
return factory.New().
WithFilteredEventsInformersQueueKeysFunc(
Expand Down Expand Up @@ -90,7 +125,7 @@ func (c *leaseController) sync(ctx context.Context, syncCtx factory.SyncContext)
if errors.IsNotFound(err) {
if !cluster.DeletionTimestamp.IsZero() {
// the lease is not found and the cluster is deleting, update the cluster to unknown immediately
return c.updateClusterStatus(ctx, cluster)
return c.updateClusterStatusUnknown(ctx, cluster)
}

// the lease is not found, try to create it
Expand Down Expand Up @@ -119,9 +154,31 @@ func (c *leaseController) sync(ctx context.Context, syncCtx factory.SyncContext)
}

now := time.Now()
if !now.Before(observedLease.Spec.RenewTime.Add(gracePeriod)) {

ch := c.clusterHealthMap.getDeepCopy(clusterName)
defer c.clusterHealthMap.set(clusterName, ch)

if ch == nil {
ch = &clusterHealth{}
}

// When the pod is restart, we don't want to report the ClockOutOfSync condition.
if ch.savedLease == nil {
ch.savedLease = observedLease
}

// When the agent's lease get renewed, the "now" on hub should close to the RenewTime on agent.
if ch.savedLease.Spec.RenewTime.Before(observedLease.Spec.RenewTime) {
ch.savedLease = observedLease
if err := c.updateClusterStatusClockSynced(ctx, cluster, now.Sub(observedLease.Spec.RenewTime.Time) < time.Duration(LeaseDurationSeconds)); err != nil {
return err
}
}

if now.After(observedLease.Spec.RenewTime.Time.Add(gracePeriod)) {
// Assume NTP synced between hub and a managed cluster, otherwise the agents may not function correctly.
// the lease is not updated constantly, change the cluster available condition to unknown
if err := c.updateClusterStatus(ctx, cluster); err != nil {
if err := c.updateClusterStatusUnknown(ctx, cluster); err != nil {
return err
}
}
Expand All @@ -131,7 +188,7 @@ func (c *leaseController) sync(ctx context.Context, syncCtx factory.SyncContext)
return nil
}

func (c *leaseController) updateClusterStatus(ctx context.Context, cluster *clusterv1.ManagedCluster) error {
func (c *leaseController) updateClusterStatusUnknown(ctx context.Context, cluster *clusterv1.ManagedCluster) error {
if meta.IsStatusConditionPresentAndEqual(cluster.Status.Conditions, clusterv1.ManagedClusterConditionAvailable, metav1.ConditionUnknown) {
// the managed cluster available condition alreay is unknown, do nothing
return nil
Expand All @@ -154,3 +211,40 @@ func (c *leaseController) updateClusterStatus(ctx context.Context, cluster *clus

return err
}

func (c *leaseController) updateClusterStatusClockSynced(ctx context.Context, cluster *clusterv1.ManagedCluster, synced bool) error {
var desiredStatus metav1.ConditionStatus
var condition metav1.Condition
if synced {
desiredStatus = metav1.ConditionTrue
condition = metav1.Condition{
Type: clusterv1.ManagedClusterConditionClockSynced,
Status: metav1.ConditionTrue,
Reason: "ManagedClusterClockSynced",
Message: "The clock of the managed cluster is synced with the hub.",
}
} else {
desiredStatus = metav1.ConditionFalse
condition = metav1.Condition{
Type: clusterv1.ManagedClusterConditionClockSynced,
Status: metav1.ConditionFalse,
Reason: "ManagedClusterClockOutOfSync",
Message: "The clock of hub and agent is out of sync. This may cause the Unknown status and affect agent functionalities.",
}
}

if meta.IsStatusConditionPresentAndEqual(cluster.Status.Conditions, clusterv1.ManagedClusterConditionClockSynced, desiredStatus) {
// the managed cluster clock synced condition alreay is desired status, do nothing
return nil
}

newCluster := cluster.DeepCopy()
meta.SetStatusCondition(&newCluster.Status.Conditions, condition)

updated, err := c.patcher.PatchStatus(ctx, newCluster, newCluster.Status, cluster.Status)
if updated {
c.eventRecorder.Eventf("ManagedClusterClockSyncedConditionUpdated",
"update managed cluster %q clock synced condition to %v.", cluster.Name, desiredStatus)
}
return err
}
43 changes: 39 additions & 4 deletions pkg/registration/hub/lease/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ var now = time.Now()

func TestSync(t *testing.T) {
cases := []struct {
name string
clusters []runtime.Object
clusterLeases []runtime.Object
validateActions func(t *testing.T, leaseActions, clusterActions []clienttesting.Action)
name string
clusters []runtime.Object
clusterLeases []runtime.Object
clusterHealthMap *clusterHealthMap
validateActions func(t *testing.T, leaseActions, clusterActions []clienttesting.Action)
}{
{
name: "sync unaccepted managed cluster",
Expand Down Expand Up @@ -110,6 +111,34 @@ func TestSync(t *testing.T) {
testingcommon.AssertNoActions(t, clusterActions)
},
},
{
name: "managed cluster's clock is out of sync",
clusters: []runtime.Object{testinghelpers.NewAvailableManagedCluster()},
clusterLeases: []runtime.Object{testinghelpers.NewManagedClusterLease("managed-cluster-lease", now.Add(-5*time.Minute))},
clusterHealthMap: &clusterHealthMap{
clusterHealths: map[string]*clusterHealth{
testinghelpers.TestManagedClusterName: {
savedLease: testinghelpers.NewManagedClusterLease("managed-cluster-lease", now.Add(-6*time.Minute)),
},
},
},
validateActions: func(t *testing.T, leaseActions, clusterActions []clienttesting.Action) {
expected := metav1.Condition{
Type: clusterv1.ManagedClusterConditionClockSynced,
Status: metav1.ConditionFalse,
Reason: "ManagedClusterClockOutOfSync",
Message: "The clock of hub and agent is out of sync. This may cause the Unknown status and affect agent functionalities.",
}
testingcommon.AssertActions(t, clusterActions, "patch", "patch")
patch := clusterActions[0].(clienttesting.PatchAction).GetPatch()
managedCluster := &v1.ManagedCluster{}
err := json.Unmarshal(patch, managedCluster)
if err != nil {
t.Fatal(err)
}
testingcommon.AssertCondition(t, managedCluster.Status.Conditions, expected)
},
},
}

for _, c := range cases {
Expand Down Expand Up @@ -142,6 +171,12 @@ func TestSync(t *testing.T) {
clusterLister: clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(),
leaseLister: leaseInformerFactory.Coordination().V1().Leases().Lister(),
eventRecorder: syncCtx.Recorder(),
clusterHealthMap: &clusterHealthMap{
clusterHealths: make(map[string]*clusterHealth),
},
}
if c.clusterHealthMap != nil {
ctrl.clusterHealthMap = c.clusterHealthMap
}
syncErr := ctrl.sync(context.TODO(), syncCtx)
if syncErr != nil {
Expand Down
2 changes: 1 addition & 1 deletion vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1443,7 +1443,7 @@ open-cluster-management.io/addon-framework/pkg/index
open-cluster-management.io/addon-framework/pkg/manager/controllers/addonconfiguration
open-cluster-management.io/addon-framework/pkg/manager/controllers/addonowner
open-cluster-management.io/addon-framework/pkg/utils
# open-cluster-management.io/api v0.12.1-0.20231114013807-95119cf22df6
# open-cluster-management.io/api v0.12.1-0.20231116014932-8bf94241ddc0
## explicit; go 1.19
open-cluster-management.io/api/addon/v1alpha1
open-cluster-management.io/api/client/addon/clientset/versioned
Expand Down
2 changes: 2 additions & 0 deletions vendor/open-cluster-management.io/api/cluster/v1/types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit a1678bc

Please sign in to comment.