Skip to content

Commit

Permalink
Add ClockSynced Condition on managed cluster.
Browse files Browse the repository at this point in the history
Signed-off-by: xuezhaojun <[email protected]>
  • Loading branch information
xuezhaojun committed Nov 16, 2023
1 parent 29ffff0 commit d6825f2
Show file tree
Hide file tree
Showing 76 changed files with 1,808 additions and 2,793 deletions.
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ require (
k8s.io/kube-aggregator v0.28.1
k8s.io/utils v0.0.0-20230726121419-3b25d923346b
open-cluster-management.io/addon-framework v0.8.1-0.20231009020812-e52774032b4c
open-cluster-management.io/api v0.12.1-0.20231027024433-bab1208e6889
open-cluster-management.io/api v0.12.1-0.20231116014932-8bf94241ddc0
sigs.k8s.io/controller-runtime v0.15.0
sigs.k8s.io/kube-storage-version-migrator v0.0.6-0.20230721195810-5c8923c5ff96
)
Expand Down Expand Up @@ -115,10 +115,10 @@ require (
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.24.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/oauth2 v0.8.0 // indirect
golang.org/x/sync v0.2.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/term v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,8 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
Expand Down Expand Up @@ -487,6 +489,8 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.2.0 h1:PUR+T4wwASmuSTYdKjYHI5TD22Wy5ogLU5qZCOLxBrI=
golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down Expand Up @@ -744,6 +748,8 @@ open-cluster-management.io/addon-framework v0.8.1-0.20231009020812-e52774032b4c
open-cluster-management.io/addon-framework v0.8.1-0.20231009020812-e52774032b4c/go.mod h1:r4sQGR9YgLC4hXC695sfinun2WhuigWrEPk2IeIl800=
open-cluster-management.io/api v0.12.1-0.20231027024433-bab1208e6889 h1:U57ynNMUY6umxZq9F+rLiVqjwky2eXMSHEk5mAtwau0=
open-cluster-management.io/api v0.12.1-0.20231027024433-bab1208e6889/go.mod h1:RaKSNLO1I3xYfvIwIcCxFYgIUp3NOseG0xoGfReBEPw=
open-cluster-management.io/api v0.12.1-0.20231116014932-8bf94241ddc0 h1:5etrmy4PzZ3tV34U02olW5RMICW0GWqgzQItTAp+X0k=
open-cluster-management.io/api v0.12.1-0.20231116014932-8bf94241ddc0/go.mod h1:/I/nFccB0tmF+dZg7pHuzY3SaXOX86MI4vcFtidJ0OM=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
Expand Down
112 changes: 103 additions & 9 deletions pkg/registration/hub/lease/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package lease

import (
"context"
"sync"
"time"

"github.com/openshift/library-go/pkg/controller/factory"
Expand Down Expand Up @@ -34,11 +35,42 @@ var (

// leaseController checks the lease of managed clusters on hub cluster to determine whether a managed cluster is available.
type leaseController struct {
kubeClient kubernetes.Interface
patcher patcher.Patcher[*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus]
clusterLister clusterv1listers.ManagedClusterLister
leaseLister coordlisters.LeaseLister
eventRecorder events.Recorder
kubeClient kubernetes.Interface
patcher patcher.Patcher[*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus]
clusterLister clusterv1listers.ManagedClusterLister
leaseLister coordlisters.LeaseLister
eventRecorder events.Recorder
clusterHealthMap *clusterHealthMap
}

// clusterHealth holds the per-cluster state the lease controller keeps between
// sync loops in order to detect whether a managed cluster's lease is being
// renewed and whether its clock drifts from the hub's.
type clusterHealth struct {
	// savedLease is the last lease renewal observed for this cluster; it is
	// compared against newer observations in sync to detect a fresh renewal.
	savedLease *coordv1.Lease
}

// deepCopy returns an independent copy of the clusterHealth, or nil when the
// receiver itself is nil, so callers can read the result without holding the
// map lock.
func (h *clusterHealth) deepCopy() *clusterHealth {
	if h == nil {
		return nil
	}
	// The generated Lease DeepCopy handles a nil savedLease itself.
	copied := &clusterHealth{}
	copied.savedLease = h.savedLease.DeepCopy()
	return copied
}

// clusterHealthMap is a concurrency-safe map from cluster name to its
// clusterHealth record, guarded by the embedded RWMutex.
type clusterHealthMap struct {
	sync.RWMutex
	// clusterHealths maps a managed cluster's name to its health record;
	// access only while holding the embedded lock.
	clusterHealths map[string]*clusterHealth
}

// getDeepCopy returns a deep copy of the health record stored for clusterName,
// or nil when no record exists. The copy is taken while the read lock is held
// so the caller may use it without further synchronization.
func (m *clusterHealthMap) getDeepCopy(clusterName string) *clusterHealth {
	m.RLock()
	defer m.RUnlock()

	health, ok := m.clusterHealths[clusterName]
	if !ok {
		// deepCopy on a nil record would also return nil; be explicit.
		return nil
	}
	return health.deepCopy()
}

// set stores health as the record for clusterName, replacing any previous
// entry, under the write lock.
// Note: the parameter is named health rather than clusterHealth to avoid
// shadowing the clusterHealth type inside this method.
func (m *clusterHealthMap) set(clusterName string, health *clusterHealth) {
	m.Lock()
	m.clusterHealths[clusterName] = health
	m.Unlock()
}

// NewClusterLeaseController creates a cluster lease controller on hub cluster.
Expand All @@ -56,6 +88,9 @@ func NewClusterLeaseController(
clusterLister: clusterInformer.Lister(),
leaseLister: leaseInformer.Lister(),
eventRecorder: recorder.WithComponentSuffix("managed-cluster-lease-controller"),
clusterHealthMap: &clusterHealthMap{
clusterHealths: make(map[string]*clusterHealth),
},
}
return factory.New().
WithFilteredEventsInformersQueueKeysFunc(
Expand Down Expand Up @@ -90,7 +125,7 @@ func (c *leaseController) sync(ctx context.Context, syncCtx factory.SyncContext)
if errors.IsNotFound(err) {
if !cluster.DeletionTimestamp.IsZero() {
// the lease is not found and the cluster is deleting, update the cluster to unknown immediately
return c.updateClusterStatus(ctx, cluster)
return c.updateClusterStatusUnknown(ctx, cluster)
}

// the lease is not found, try to create it
Expand Down Expand Up @@ -119,9 +154,31 @@ func (c *leaseController) sync(ctx context.Context, syncCtx factory.SyncContext)
}

now := time.Now()
if !now.Before(observedLease.Spec.RenewTime.Add(gracePeriod)) {

ch := c.clusterHealthMap.getDeepCopy(clusterName)
defer c.clusterHealthMap.set(clusterName, ch)

if ch == nil {
ch = &clusterHealth{}
}

// When the pod is restart, we don't want to report the ClockOutOfSync condition.
if ch.savedLease == nil {
ch.savedLease = observedLease
}

// When the agent's lease get renewed, the "now" on hub should close to the RenewTime on agent.
if ch.savedLease.Spec.RenewTime.Before(observedLease.Spec.RenewTime) {
ch.savedLease = observedLease
if err := c.updateClusterStatusClockSynced(ctx, cluster, now.Sub(observedLease.Spec.RenewTime.Time) < time.Duration(LeaseDurationSeconds)); err != nil {
return err
}
}

if now.After(observedLease.Spec.RenewTime.Time.Add(gracePeriod)) {
// Assume NTP synced between hub and a managed cluster, otherwise the agents may not function correctly.
// the lease is not updated constantly, change the cluster available condition to unknown
if err := c.updateClusterStatus(ctx, cluster); err != nil {
if err := c.updateClusterStatusUnknown(ctx, cluster); err != nil {
return err
}
}
Expand All @@ -131,7 +188,7 @@ func (c *leaseController) sync(ctx context.Context, syncCtx factory.SyncContext)
return nil
}

func (c *leaseController) updateClusterStatus(ctx context.Context, cluster *clusterv1.ManagedCluster) error {
func (c *leaseController) updateClusterStatusUnknown(ctx context.Context, cluster *clusterv1.ManagedCluster) error {
if meta.IsStatusConditionPresentAndEqual(cluster.Status.Conditions, clusterv1.ManagedClusterConditionAvailable, metav1.ConditionUnknown) {
// the managed cluster available condition alreay is unknown, do nothing
return nil
Expand All @@ -154,3 +211,40 @@ func (c *leaseController) updateClusterStatus(ctx context.Context, cluster *clus

return err
}

// updateClusterStatusClockSynced sets the ManagedClusterConditionClockSynced
// condition on the given managed cluster according to synced. It patches the
// cluster status only when the condition's status actually changes, and emits
// an event when a patch was applied. Returns any error from the status patch.
func (c *leaseController) updateClusterStatusClockSynced(ctx context.Context, cluster *clusterv1.ManagedCluster, synced bool) error {
	// Build the desired condition once; the two outcomes differ only in
	// status, reason and message, so start from the out-of-sync shape and
	// overwrite for the synced case instead of duplicating the literal.
	condition := metav1.Condition{
		Type:    clusterv1.ManagedClusterConditionClockSynced,
		Status:  metav1.ConditionFalse,
		Reason:  "ManagedClusterClockOutOfSync",
		Message: "The clock of hub and agent is out of sync. This may cause the Unknown status and affect agent functionalities.",
	}
	if synced {
		condition.Status = metav1.ConditionTrue
		condition.Reason = "ManagedClusterClockSynced"
		condition.Message = "The clock of the managed cluster is synced with the hub."
	}

	if meta.IsStatusConditionPresentAndEqual(cluster.Status.Conditions, clusterv1.ManagedClusterConditionClockSynced, condition.Status) {
		// The managed cluster clock synced condition already has the desired
		// status, do nothing.
		return nil
	}

	newCluster := cluster.DeepCopy()
	meta.SetStatusCondition(&newCluster.Status.Conditions, condition)

	updated, err := c.patcher.PatchStatus(ctx, newCluster, newCluster.Status, cluster.Status)
	if updated {
		c.eventRecorder.Eventf("ManagedClusterClockSyncedConditionUpdated",
			"update managed cluster %q clock synced condition to %v.", cluster.Name, condition.Status)
	}
	return err
}
43 changes: 39 additions & 4 deletions pkg/registration/hub/lease/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ var now = time.Now()

func TestSync(t *testing.T) {
cases := []struct {
name string
clusters []runtime.Object
clusterLeases []runtime.Object
validateActions func(t *testing.T, leaseActions, clusterActions []clienttesting.Action)
name string
clusters []runtime.Object
clusterLeases []runtime.Object
clusterHealthMap *clusterHealthMap
validateActions func(t *testing.T, leaseActions, clusterActions []clienttesting.Action)
}{
{
name: "sync unaccepted managed cluster",
Expand Down Expand Up @@ -110,6 +111,34 @@ func TestSync(t *testing.T) {
testingcommon.AssertNoActions(t, clusterActions)
},
},
{
name: "managed cluster's clock is out of sync",
clusters: []runtime.Object{testinghelpers.NewAvailableManagedCluster()},
clusterLeases: []runtime.Object{testinghelpers.NewManagedClusterLease("managed-cluster-lease", now.Add(-5*time.Minute))},
clusterHealthMap: &clusterHealthMap{
clusterHealths: map[string]*clusterHealth{
testinghelpers.TestManagedClusterName: {
savedLease: testinghelpers.NewManagedClusterLease("managed-cluster-lease", now.Add(-6*time.Minute)),
},
},
},
validateActions: func(t *testing.T, leaseActions, clusterActions []clienttesting.Action) {
expected := metav1.Condition{
Type: clusterv1.ManagedClusterConditionClockSynced,
Status: metav1.ConditionFalse,
Reason: "ManagedClusterClockOutOfSync",
Message: "The clock of hub and agent is out of sync. This may cause the Unknown status and affect agent functionalities.",
}
testingcommon.AssertActions(t, clusterActions, "patch", "patch")
patch := clusterActions[0].(clienttesting.PatchAction).GetPatch()
managedCluster := &v1.ManagedCluster{}
err := json.Unmarshal(patch, managedCluster)
if err != nil {
t.Fatal(err)
}
testingcommon.AssertCondition(t, managedCluster.Status.Conditions, expected)
},
},
}

for _, c := range cases {
Expand Down Expand Up @@ -142,6 +171,12 @@ func TestSync(t *testing.T) {
clusterLister: clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(),
leaseLister: leaseInformerFactory.Coordination().V1().Leases().Lister(),
eventRecorder: syncCtx.Recorder(),
clusterHealthMap: &clusterHealthMap{
clusterHealths: make(map[string]*clusterHealth),
},
}
if c.clusterHealthMap != nil {
ctrl.clusterHealthMap = c.clusterHealthMap
}
syncErr := ctrl.sync(context.TODO(), syncCtx)
if syncErr != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,9 @@ func TestDeployWithRolloutStrategyReconcileAsExpected(t *testing.T) {
perGoupeRollOut := clusterv1alpha1.RolloutStrategy{
Type: clusterv1alpha1.ProgressivePerGroup,
ProgressivePerGroup: &clusterv1alpha1.RolloutProgressivePerGroup{
Timeout: clusterv1alpha1.Timeout{Timeout: "None"},
RolloutConfig: clusterv1alpha1.RolloutConfig{
Timeout: "None",
},
},
}
mwrSet := helpertest.CreateTestManifestWorkReplicaSetWithRollOutStrategy("mwrSet-test", "default",
Expand Down Expand Up @@ -358,13 +360,17 @@ func TestDeployWithMultiPlacementsReconcileAsExpected(t *testing.T) {
perGoupeRollOut := clusterv1alpha1.RolloutStrategy{
Type: clusterv1alpha1.ProgressivePerGroup,
ProgressivePerGroup: &clusterv1alpha1.RolloutProgressivePerGroup{
Timeout: clusterv1alpha1.Timeout{Timeout: "None"},
RolloutConfig: clusterv1alpha1.RolloutConfig{
Timeout: "None",
},
},
}
allRollOut := clusterv1alpha1.RolloutStrategy{
Type: clusterv1alpha1.All,
All: &clusterv1alpha1.RolloutAll{
Timeout: clusterv1alpha1.Timeout{Timeout: "None"},
RolloutConfig: clusterv1alpha1.RolloutConfig{
Timeout: "None",
},
},
}

Expand Down Expand Up @@ -474,7 +480,9 @@ func TestDeployMWRSetSpecChangesReconcile(t *testing.T) {
perGoupeRollOut := clusterv1alpha1.RolloutStrategy{
Type: clusterv1alpha1.ProgressivePerGroup,
ProgressivePerGroup: &clusterv1alpha1.RolloutProgressivePerGroup{
Timeout: clusterv1alpha1.Timeout{Timeout: "None"},
RolloutConfig: clusterv1alpha1.RolloutConfig{
Timeout: "None",
},
},
}
mwrSet := helpertest.CreateTestManifestWorkReplicaSetWithRollOutStrategy("mwrSet-test", "default",
Expand Down
4 changes: 3 additions & 1 deletion pkg/work/hub/test/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ func CreateTestManifestWorkReplicaSet(name string, ns string, placementNames ...
allRollOut := clusterv1alpha1.RolloutStrategy{
Type: clusterv1alpha1.All,
All: &clusterv1alpha1.RolloutAll{
Timeout: clusterv1alpha1.Timeout{Timeout: "None"},
RolloutConfig: clusterv1alpha1.RolloutConfig{
Timeout: "None",
},
},
}

Expand Down
4 changes: 3 additions & 1 deletion test/e2e/manifestworkreplicaset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ var _ = ginkgo.Describe("Test ManifestWorkReplicaSet", func() {
RolloutStrategy: clusterv1alpha1.RolloutStrategy{
Type: clusterv1alpha1.All,
All: &clusterv1alpha1.RolloutAll{
Timeout: clusterv1alpha1.Timeout{Timeout: "None"},
RolloutConfig: clusterv1alpha1.RolloutConfig{
Timeout: "None",
},
},
},
}
Expand Down
4 changes: 3 additions & 1 deletion test/integration/work/manifestworkreplicaset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ var _ = ginkgo.Describe("ManifestWorkReplicaSet", func() {
RolloutStrategy: clusterv1alpha1.RolloutStrategy{
Type: clusterv1alpha1.All,
All: &clusterv1alpha1.RolloutAll{
Timeout: clusterv1alpha1.Timeout{Timeout: "None"},
RolloutConfig: clusterv1alpha1.RolloutConfig{
Timeout: "None",
},
},
},
}
Expand Down
19 changes: 0 additions & 19 deletions vendor/go.uber.org/atomic/.codecov.yml

This file was deleted.

15 changes: 0 additions & 15 deletions vendor/go.uber.org/atomic/.gitignore

This file was deleted.

Loading

0 comments on commit d6825f2

Please sign in to comment.