✨ Add a condition to report when hub and agent clock out of sync. #312

Merged
150 changes: 150 additions & 0 deletions pkg/registration/hub/lease/clocksynccontroller.go
@@ -0,0 +1,150 @@
package lease

import (
"context"
"time"

"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
coordv1 "k8s.io/api/coordination/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
coordinformers "k8s.io/client-go/informers/coordination/v1"
coordlisters "k8s.io/client-go/listers/coordination/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"

clientset "open-cluster-management.io/api/client/cluster/clientset/versioned"
clusterv1informer "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1"
clusterv1listers "open-cluster-management.io/api/client/cluster/listers/cluster/v1"
clusterv1 "open-cluster-management.io/api/cluster/v1"

"open-cluster-management.io/ocm/pkg/common/patcher"
"open-cluster-management.io/ocm/pkg/common/queue"
)

type clockSyncController struct {
patcher patcher.Patcher[*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus]
clusterLister clusterv1listers.ManagedClusterLister
leaseLister coordlisters.LeaseLister
eventRecorder events.Recorder
}

const (
clockSyncControllerName = "ClockSyncController"
)

func NewClockSyncController(
clusterClient clientset.Interface,
clusterInformer clusterv1informer.ManagedClusterInformer,
leaseInformer coordinformers.LeaseInformer,
recorder events.Recorder,
) factory.Controller {
c := &clockSyncController{
patcher: patcher.NewPatcher[
*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus](
clusterClient.ClusterV1().ManagedClusters()),
clusterLister: clusterInformer.Lister(),
leaseLister: leaseInformer.Lister(),
eventRecorder: recorder.WithComponentSuffix("managed-cluster-clock-sync-controller"),
}

syncCtx := factory.NewSyncContext(clockSyncControllerName, recorder)
leaseRenewTimeUpdateInformer := renewUpdateInformer(syncCtx.Queue(), leaseInformer)

return factory.New().WithSyncContext(syncCtx).
WithBareInformers(leaseRenewTimeUpdateInformer).
WithSync(c.sync).
ToController(clockSyncControllerName, recorder)
}

func renewUpdateInformer(q workqueue.RateLimitingInterface, leaseInformer coordinformers.LeaseInformer) factory.Informer {
leaseRenewTimeUpdateInformer := leaseInformer.Informer()
queueKeyByLabel := queue.QueueKeyByLabel(clusterv1.ClusterNameLabelKey)
_, err := leaseRenewTimeUpdateInformer.AddEventHandler(&cache.FilteringResourceEventHandler{
FilterFunc: queue.UnionFilter(queue.FileterByLabel(clusterv1.ClusterNameLabelKey), queue.FilterByNames(leaseName)),
Handler: &cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj interface{}) {
// only renew time update events are added to the queue
oldLease := oldObj.(*coordv1.Lease)
newLease := newObj.(*coordv1.Lease)
if !oldLease.Spec.RenewTime.Equal(newLease.Spec.RenewTime) {
for _, queueKey := range queueKeyByLabel(newLease) {
q.Add(queueKey)
}
}
},
},
})
if err != nil {
runtime.HandleError(err)
}
return leaseRenewTimeUpdateInformer
}

func (c *clockSyncController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
clusterName := syncCtx.QueueKey()

// events caused by resync are filtered out because the cluster is not found
cluster, err := c.clusterLister.Get(clusterName)
if errors.IsNotFound(err) {
// the cluster is not found, do nothing
return nil
}
if err != nil {
return err
}

now := time.Now()
observedLease, err := c.leaseLister.Leases(cluster.Name).Get(leaseName)
if err != nil {
return err
}
// When the agent's lease gets renewed, the hub's "now" should be close to the RenewTime set by the agent.
// If the two times are not close (differ by more than 1 lease duration), we assume the clocks are out of sync.
oneLeaseDuration := time.Duration(LeaseDurationSeconds) * time.Second
if err := c.updateClusterStatusClockSynced(ctx, cluster,
now.Sub(observedLease.Spec.RenewTime.Time) < oneLeaseDuration && observedLease.Spec.RenewTime.Time.Sub(now) < oneLeaseDuration); err != nil {
return err
}
return nil
}

func (c *clockSyncController) updateClusterStatusClockSynced(ctx context.Context, cluster *clusterv1.ManagedCluster, synced bool) error {
var desiredStatus metav1.ConditionStatus
var condition metav1.Condition
if synced {
desiredStatus = metav1.ConditionTrue
condition = metav1.Condition{
Type: clusterv1.ManagedClusterConditionClockSynced,
Status: metav1.ConditionTrue,
Reason: "ManagedClusterClockSynced",
Message: "The clock of the managed cluster is synced with the hub.",
}
} else {
desiredStatus = metav1.ConditionFalse
condition = metav1.Condition{
Type: clusterv1.ManagedClusterConditionClockSynced,
Status: metav1.ConditionFalse,
Reason: "ManagedClusterClockOutOfSync",
Message: "The clock of hub and agent is out of sync. This may cause the Unknown status and affect agent functionalities.",
}
}

if meta.IsStatusConditionPresentAndEqual(cluster.Status.Conditions, clusterv1.ManagedClusterConditionClockSynced, desiredStatus) {
// the managed cluster clock synced condition is already in the desired status, do nothing
return nil
}

newCluster := cluster.DeepCopy()
meta.SetStatusCondition(&newCluster.Status.Conditions, condition)

updated, err := c.patcher.PatchStatus(ctx, newCluster, newCluster.Status, cluster.Status)
if updated {
c.eventRecorder.Eventf("ManagedClusterClockSyncedConditionUpdated",
"update managed cluster %q clock synced condition to %v.", cluster.Name, desiredStatus)
}
return err
}
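
For reference, the skew check in sync above reduces to a symmetric comparison against one lease duration. A minimal, self-contained sketch of that logic (the isClockSynced helper below is illustrative only and is not part of this PR):

package main

import (
	"fmt"
	"time"
)

// isClockSynced reports whether the hub's "now" and the agent's lease renew time
// are within one lease duration of each other, in either direction.
// Illustrative sketch mirroring the comparison in clockSyncController.sync.
func isClockSynced(now, renewTime time.Time, leaseDurationSeconds int32) bool {
	oneLeaseDuration := time.Duration(leaseDurationSeconds) * time.Second
	return now.Sub(renewTime) < oneLeaseDuration && renewTime.Sub(now) < oneLeaseDuration
}

func main() {
	now := time.Now()
	fmt.Println(isClockSynced(now, now.Add(5*time.Second), 60))    // true: well within one lease duration
	fmt.Println(isClockSynced(now, now.Add(-120*time.Second), 60)) // false: skew of two lease durations
}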
125 changes: 125 additions & 0 deletions pkg/registration/hub/lease/clocksynccontroller_test.go
@@ -0,0 +1,125 @@
package lease

import (
"context"
"encoding/json"
"testing"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
kubeinformers "k8s.io/client-go/informers"
kubefake "k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing"

clusterfake "open-cluster-management.io/api/client/cluster/clientset/versioned/fake"
clusterinformers "open-cluster-management.io/api/client/cluster/informers/externalversions"
clusterv1 "open-cluster-management.io/api/cluster/v1"
v1 "open-cluster-management.io/api/cluster/v1"

"open-cluster-management.io/ocm/pkg/common/patcher"
testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
testinghelpers "open-cluster-management.io/ocm/pkg/registration/helpers/testing"
)

func TestClockSyncController(t *testing.T) {
// cases:
// 1. hub and agent clocks are close
// 2. hub and agent clocks are not close
cases := []struct {
name string
clusters []runtime.Object
leases []runtime.Object
validateActions func(t *testing.T, leaseActions, clusterActions []clienttesting.Action)
}{
{
name: "hub and agent clock is close",
clusters: []runtime.Object{
testinghelpers.NewManagedCluster(),
},
leases: []runtime.Object{
testinghelpers.NewManagedClusterLease("managed-cluster-lease", now.Add(5*time.Second)),
},
validateActions: func(t *testing.T, leaseActions, clusterActions []clienttesting.Action) {
expected := metav1.Condition{
Type: clusterv1.ManagedClusterConditionClockSynced,
Status: metav1.ConditionTrue,
Reason: "ManagedClusterClockSynced",
Message: "The clock of the managed cluster is synced with the hub.",
}
testingcommon.AssertActions(t, clusterActions, "patch")
patch := clusterActions[0].(clienttesting.PatchAction).GetPatch()
managedCluster := &v1.ManagedCluster{}
err := json.Unmarshal(patch, managedCluster)
if err != nil {
t.Fatal(err)
}
testingcommon.AssertCondition(t, managedCluster.Status.Conditions, expected)
},
},
{
name: "hub and agent clock is not close",
clusters: []runtime.Object{
testinghelpers.NewManagedCluster(),
},
leases: []runtime.Object{
testinghelpers.NewManagedClusterLease("managed-cluster-lease", now.Add(61*time.Second)),
},
validateActions: func(t *testing.T, leaseActions, clusterActions []clienttesting.Action) {
expected := metav1.Condition{
Type: clusterv1.ManagedClusterConditionClockSynced,
Status: metav1.ConditionFalse,
Reason: "ManagedClusterClockOutOfSync",
Message: "The clock of hub and agent is out of sync. This may cause the Unknown status and affect agent functionalities.",
}
testingcommon.AssertActions(t, clusterActions, "patch")
patch := clusterActions[0].(clienttesting.PatchAction).GetPatch()
managedCluster := &v1.ManagedCluster{}
err := json.Unmarshal(patch, managedCluster)
if err != nil {
t.Fatal(err)
}
testingcommon.AssertCondition(t, managedCluster.Status.Conditions, expected)
},
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
clusterClient := clusterfake.NewSimpleClientset(c.clusters...)
clusterInformerFactory := clusterinformers.NewSharedInformerFactory(clusterClient, time.Minute*10)
clusterStore := clusterInformerFactory.Cluster().V1().ManagedClusters().Informer().GetStore()
for _, cluster := range c.clusters {
if err := clusterStore.Add(cluster); err != nil {
t.Fatal(err)
}
}

leaseClient := kubefake.NewSimpleClientset(c.leases...)
leaseInformerFactory := kubeinformers.NewSharedInformerFactory(leaseClient, time.Minute*10)
leaseStore := leaseInformerFactory.Coordination().V1().Leases().Informer().GetStore()
for _, lease := range c.leases {
if err := leaseStore.Add(lease); err != nil {
t.Fatal(err)
}
}

syncCtx := testingcommon.NewFakeSyncContext(t, testinghelpers.TestManagedClusterName)

controller := &clockSyncController{
patcher: patcher.NewPatcher[
*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus](
clusterClient.ClusterV1().ManagedClusters()),
clusterLister: clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(),
leaseLister: leaseInformerFactory.Coordination().V1().Leases().Lister(),
eventRecorder: syncCtx.Recorder(),
}
syncErr := controller.sync(context.TODO(), syncCtx)
if syncErr != nil {
t.Errorf("unexpected err: %v", syncErr)
}
c.validateActions(t, leaseClient.Actions(), clusterClient.Actions())
})
}

}
8 changes: 8 additions & 0 deletions pkg/registration/hub/manager.go
@@ -196,6 +196,13 @@ func (m *HubManagerOptions) RunControllerManagerWithInformers(
controllerContext.EventRecorder,
)

clockSyncController := lease.NewClockSyncController(
clusterClient,
clusterInformers.Cluster().V1().ManagedClusters(),
kubeInformers.Coordination().V1().Leases(),
controllerContext.EventRecorder,
)

managedClusterSetController := managedclusterset.NewManagedClusterSetController(
clusterClient,
clusterInformers.Cluster().V1().ManagedClusters(),
@@ -268,6 +275,7 @@ func (m *HubManagerOptions) RunControllerManagerWithInformers(
go taintController.Run(ctx, 1)
go csrController.Run(ctx, 1)
go leaseController.Run(ctx, 1)
go clockSyncController.Run(ctx, 1)
go managedClusterSetController.Run(ctx, 1)
go managedClusterSetBindingController.Run(ctx, 1)
go clusterroleController.Run(ctx, 1)
57 changes: 57 additions & 0 deletions test/integration/registration/managedcluster_lease_test.go
@@ -165,6 +165,45 @@ var _ = ginkgo.Describe("Cluster Lease Update", func() {
gracePeriod := 2 * 5 * util.TestLeaseDurationSeconds
assertAvailableCondition(managedClusterName, metav1.ConditionUnknown, gracePeriod)
})

ginkgo.It("clock sync condition should work", func() {
// run registration agent
agentOptions := &spoke.SpokeAgentOptions{
BootstrapKubeconfig: bootstrapKubeConfigFile,
HubKubeconfigSecret: hubKubeconfigSecret,
ClusterHealthCheckPeriod: 1 * time.Minute,
}
commOptions := commonoptions.NewAgentOptions()
commOptions.HubKubeconfigDir = hubKubeconfigDir
commOptions.SpokeClusterName = managedClusterName

stop := runAgent("cluster-leasetest", agentOptions, commOptions, spokeCfg)
bootstrapManagedCluster(managedClusterName, hubKubeconfigSecret, util.TestLeaseDurationSeconds)

gracePeriod := 2 * 5 * util.TestLeaseDurationSeconds
assertClockSyncedCondition(managedClusterName, metav1.ConditionTrue, gracePeriod)

// stop the agent so it does not keep renewing the lease.
stop()

// update the managed cluster lease renew time
now := time.Now()
gomega.Eventually(func() error {
lease, err := util.GetManagedClusterLease(kubeClient, managedClusterName)
if err != nil {
return err
}
// The default lease duration is 60s.
// The renew time is set 2 lease durations before the hub's "now", so the clocks should be reported as out of sync.
// Since renewTime + 5*leaseDuration > now, the available condition should remain true.
lease.Spec.RenewTime = &metav1.MicroTime{Time: now.Add(-120 * time.Second)}
_, err = kubeClient.CoordinationV1().Leases(managedClusterName).Update(context.TODO(), lease, metav1.UpdateOptions{})
return err
}, eventuallyInterval, eventuallyTimeout).ShouldNot(gomega.HaveOccurred())

assertAvailableCondition(managedClusterName, metav1.ConditionTrue, 0)
assertClockSyncedCondition(managedClusterName, metav1.ConditionFalse, 0)
})
})

func bootstrapManagedCluster(managedClusterName, hubKubeconfigSecret string, leaseDuration int32) {
@@ -213,3 +252,21 @@ func updateManagedClusterLeaseDuration(clusterName string, leaseDuration int32)
_, err = clusterClient.ClusterV1().ManagedClusters().Update(context.TODO(), cluster, metav1.UpdateOptions{})
return err
}

func assertClockSyncedCondition(managedClusterName string, status metav1.ConditionStatus, d int) {
<-time.After(time.Duration(d) * time.Second)
gomega.Eventually(func() error {
managedCluster, err := util.GetManagedCluster(clusterClient, managedClusterName)
if err != nil {
return err
}
cond := meta.FindStatusCondition(managedCluster.Status.Conditions, clusterv1.ManagedClusterConditionClockSynced)
if cond == nil {
return fmt.Errorf("available condition is not found")
}
if cond.Status != status {
return fmt.Errorf("expected avaibale condition is %s, but %v", status, cond)
}
return nil
}, eventuallyTimeout, eventuallyInterval).ShouldNot(gomega.HaveOccurred())
}
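
The two tolerances exercised by this integration test are easy to miss in diff form: the clock-synced condition compares against a single lease duration, while cluster availability tolerates up to five. A rough, standalone illustration of the arithmetic used above, assuming the 60-second lease duration and the 5x grace period mentioned in the test comments:

package main

import (
	"fmt"
	"time"
)

func main() {
	leaseDuration := 60 * time.Second
	now := time.Now()
	renewTime := now.Add(-120 * time.Second) // renew time set by the test above

	skew := now.Sub(renewTime)
	fmt.Println("clock synced:", skew < leaseDuration)      // false: skew is two lease durations
	fmt.Println("still available:", skew < 5*leaseDuration) // true: within the 5x availability grace period
}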
9 changes: 9 additions & 0 deletions test/integration/util/managedcluster.go
@@ -3,6 +3,7 @@ package util
import (
"context"

coordinationv1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -21,6 +22,14 @@ func GetManagedCluster(clusterClient clusterclientset.Interface, spokeClusterNam
return spokeCluster, nil
}

func GetManagedClusterLease(kubeClient kubernetes.Interface, spokeClusterName string) (*coordinationv1.Lease, error) {
lease, err := kubeClient.CoordinationV1().Leases(spokeClusterName).Get(context.TODO(), "managed-cluster-lease", metav1.GetOptions{})
if err != nil {
return nil, err
}
return lease, nil
}

func AcceptManagedCluster(clusterClient clusterclientset.Interface, spokeClusterName string) error {
return AcceptManagedClusterWithLeaseDuration(clusterClient, spokeClusterName, 60)
}