From ab1bbac6d1de2fae9ec01058be835b06aaafb2e7 Mon Sep 17 00:00:00 2001 From: Timofei Larkin Date: Tue, 5 Nov 2024 23:17:13 +0300 Subject: [PATCH] store state in reconciler struct, move high level logic to reconciler methods --- internal/controller/etcdcluster_controller.go | 176 ++++++++++-------- internal/controller/observables.go | 10 + test/utils/utils.go | 2 +- 3 files changed, 108 insertions(+), 80 deletions(-) diff --git a/internal/controller/etcdcluster_controller.go b/internal/controller/etcdcluster_controller.go index f6287fd3..e3a13b3a 100644 --- a/internal/controller/etcdcluster_controller.go +++ b/internal/controller/etcdcluster_controller.go @@ -58,6 +58,7 @@ const ( type EtcdClusterReconciler struct { client.Client Scheme *runtime.Scheme + state *observables } // +kubebuilder:rbac:groups=etcd.aenix.io,resources=etcdclusters,verbs=get;list;watch;create;update;patch;delete @@ -75,9 +76,9 @@ type EtcdClusterReconciler struct { // Reconcile checks CR and current cluster state and performs actions to transform current state to desired. func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { log.Debug(ctx, "reconciling object") - state := observables{} - state.instance = &etcdaenixiov1alpha1.EtcdCluster{} - err := r.Get(ctx, req.NamespacedName, state.instance) + r.state = &observables{} + r.state.instance = &etcdaenixiov1alpha1.EtcdCluster{} + err := r.Get(ctx, req.NamespacedName, r.state.instance) if err != nil { if errors.IsNotFound(err) { log.Debug(ctx, "object not found") @@ -87,70 +88,70 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) return reconcile.Result{}, err } // If object is being deleted, skipping reconciliation - if !state.instance.DeletionTimestamp.IsZero() { + if !r.state.instance.DeletionTimestamp.IsZero() { return reconcile.Result{}, nil } // create two services and the pdb - err = r.ensureUnconditionalObjects(ctx, state.instance) + err = r.ensureUnconditionalObjects(ctx) if err != nil { return ctrl.Result{}, err } // fetch STS if exists - err = r.Get(ctx, req.NamespacedName, &state.statefulSet) + err = r.Get(ctx, req.NamespacedName, &r.state.statefulSet) if client.IgnoreNotFound(err) != nil { return ctrl.Result{}, fmt.Errorf("couldn't get statefulset: %w", err) } - state.stsExists = state.statefulSet.UID != "" + r.state.stsExists = r.state.statefulSet.UID != "" // fetch endpoints - clusterClient, singleClients, err := factory.NewEtcdClientSet(ctx, state.instance, r.Client) + clusterClient, singleClients, err := factory.NewEtcdClientSet(ctx, r.state.instance, r.Client) if err != nil { return ctrl.Result{}, err } - state.endpointsFound = clusterClient != nil && singleClients != nil + r.state.endpointsFound = clusterClient != nil && singleClients != nil if clusterClient != nil { - state.endpoints = clusterClient.Endpoints() + r.state.endpoints = clusterClient.Endpoints() } // fetch PVCs - state.pvcs, err = factory.PVCs(ctx, state.instance, r.Client) + r.state.pvcs, err = factory.PVCs(ctx, r.state.instance, r.Client) if err != nil { return ctrl.Result{}, err } - if !state.endpointsFound { - if !state.stsExists { - return r.createClusterFromScratch(ctx, &state) // TODO: needs implementing + if !r.state.endpointsFound { + if !r.state.stsExists { + return r.createClusterFromScratch(ctx) // TODO: needs implementing } // update sts pod template (and only pod template) if it doesn't match desired state - if !state.statefulSetPodSpecCorrect() { // TODO: needs implementing + if !r.state.statefulSetPodSpecCorrect() { // TODO: needs implementing desiredSts := factory.TemplateStatefulSet() // TODO: needs implementing - state.statefulSet.Spec.Template.Spec = desiredSts.Spec.Template.Spec - return ctrl.Result{}, r.patchOrCreateObject(ctx, &state.statefulSet) + r.state.statefulSet.Spec.Template.Spec = desiredSts.Spec.Template.Spec + return ctrl.Result{}, r.patchOrCreateObject(ctx, &r.state.statefulSet) } - if !state.statefulSetReady() { // TODO: needs improved implementation? + if !r.state.statefulSetReady() { // TODO: needs improved implementation? return ctrl.Result{}, fmt.Errorf("waiting for statefulset to become ready") } - if *state.statefulSet.Spec.Replicas > 0 { + if *r.state.statefulSet.Spec.Replicas > 0 { return ctrl.Result{}, fmt.Errorf("reached an impossible state (no endpoints, but active pods)") } - if *state.instance.Spec.Replicas == 0 { + if *r.state.instance.Spec.Replicas == 0 { // cluster successfully scaled down to zero return ctrl.Result{}, nil } - return r.scaleUpFromZero(ctx, &state) // TODO: needs implementing + return ctrl.Result{}, r.scaleUpFromZero(ctx) // TODO: needs implementing } // get status of every endpoint and member list from every endpoint - state.etcdStatuses = make([]etcdStatus, len(singleClients)) + r.state.etcdStatuses = make([]etcdStatus, len(singleClients)) { var wg sync.WaitGroup ctx, cancel := context.WithTimeout(ctx, etcdDefaultTimeout) @@ -158,7 +159,7 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) wg.Add(1) go func(i int) { defer wg.Done() - state.etcdStatuses[i].fill(ctx, singleClients[i]) + r.state.etcdStatuses[i].fill(ctx, singleClients[i]) }(i) } wg.Wait() @@ -166,22 +167,22 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) } memberReached := false - for i := range state.etcdStatuses { - if state.etcdStatuses[i].endpointStatus != nil { + for i := range r.state.etcdStatuses { + if r.state.etcdStatuses[i].endpointStatus != nil { memberReached = true break } } if !memberReached { - return r.createOrUpdateStatefulSet(ctx, &state, state.instance) + return ctrl.Result{}, r.createOrUpdateStatefulSet(ctx) } - state.setClusterID() - if state.inSplitbrain() { + r.state.setClusterID() + if r.state.inSplitbrain() { log.Error(ctx, fmt.Errorf("etcd cluster in splitbrain"), "etcd cluster in splitbrain, dropping from reconciliation queue") meta.SetStatusCondition( - &state.instance.Status.Conditions, + &r.state.instance.Status.Conditions, metav1.Condition{ Type: etcdaenixiov1alpha1.EtcdConditionError, Status: metav1.ConditionTrue, @@ -189,35 +190,35 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) Message: string(etcdaenixiov1alpha1.EtcdErrorCondSplitbrainMessage), }, ) - return r.updateStatus(ctx, state.instance) + return r.updateStatus(ctx) } - if !state.clusterHasQuorum() { + if !r.state.clusterHasQuorum() { // we can't do anything about this but we still return an error to check on the cluster from time to time return ctrl.Result{}, fmt.Errorf("cluster has lost quorum") } - if state.hasLearners() { - return ctrl.Result{}, r.promoteLearners(ctx, &state) + if r.state.hasLearners() { + return ctrl.Result{}, r.promoteLearners(ctx) } - if err := r.createOrUpdateClusterStateConfigMap(ctx, &state); err != nil { + if err := r.createOrUpdateClusterStateConfigMap(ctx); err != nil { return ctrl.Result{}, err } - if !state.statefulSetPodSpecCorrect() { - return ctrl.Result{}, r.createOrUpdateStatefulSet(ctx, &state) + if !r.state.statefulSetPodSpecCorrect() { + return ctrl.Result{}, r.createOrUpdateStatefulSet(ctx) } // if size is different we have to remove statefulset it will be recreated in the next step - if err := r.checkAndDeleteStatefulSetIfNecessary(ctx, &state, state.instance); err != nil { + if err := r.checkAndDeleteStatefulSetIfNecessary(ctx); err != nil { return ctrl.Result{}, err } /* Saved as an example // set cluster initialization condition meta.SetStatusCondition( - &state.instance.Status.Conditions, + &r.state.instance.Status.Conditions, metav1.Condition{ Type: etcdaenixiov1alpha1.EtcdConditionInitialized, Status: metav1.ConditionTrue, @@ -226,21 +227,21 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) }, ) */ - return r.updateStatus(ctx, state.instance) + return r.updateStatus(ctx) } // checkAndDeleteStatefulSetIfNecessary deletes the StatefulSet if the specified storage size has changed. -func (r *EtcdClusterReconciler) checkAndDeleteStatefulSetIfNecessary(ctx context.Context, state *observables, instance *etcdaenixiov1alpha1.EtcdCluster) error { - for _, volumeClaimTemplate := range state.statefulSet.Spec.VolumeClaimTemplates { +func (r *EtcdClusterReconciler) checkAndDeleteStatefulSetIfNecessary(ctx context.Context) error { + for _, volumeClaimTemplate := range r.state.statefulSet.Spec.VolumeClaimTemplates { if volumeClaimTemplate.Name != "data" { continue } currentStorage := volumeClaimTemplate.Spec.Resources.Requests[corev1.ResourceStorage] - desiredStorage := instance.Spec.Storage.VolumeClaimTemplate.Spec.Resources.Requests[corev1.ResourceStorage] + desiredStorage := r.state.instance.Spec.Storage.VolumeClaimTemplate.Spec.Resources.Requests[corev1.ResourceStorage] if desiredStorage.Cmp(currentStorage) != 0 { deletePolicy := metav1.DeletePropagationOrphan - log.Info(ctx, "Deleting StatefulSet due to storage change", "statefulSet", state.statefulSet.Name) - err := r.Delete(ctx, &state.statefulSet, &client.DeleteOptions{PropagationPolicy: &deletePolicy}) + log.Info(ctx, "Deleting StatefulSet due to storage change", "statefulSet", r.state.statefulSet.Name) + err := r.Delete(ctx, &r.state.statefulSet, &client.DeleteOptions{PropagationPolicy: &deletePolicy}) if err != nil { log.Error(ctx, err, "Failed to delete StatefulSet") return err @@ -252,21 +253,20 @@ func (r *EtcdClusterReconciler) checkAndDeleteStatefulSetIfNecessary(ctx context } // ensureConditionalClusterObjects creates or updates all objects owned by cluster CR -func (r *EtcdClusterReconciler) ensureConditionalClusterObjects( - ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) error { +func (r *EtcdClusterReconciler) ensureConditionalClusterObjects(ctx context.Context) error { - if err := factory.CreateOrUpdateClusterStateConfigMap(ctx, cluster, r.Client); err != nil { + if err := factory.CreateOrUpdateClusterStateConfigMap(ctx, r.state.instance, r.Client); err != nil { log.Error(ctx, err, "reconcile cluster state configmap failed") return err } log.Debug(ctx, "cluster state configmap reconciled") - if err := factory.CreateOrUpdateStatefulSet(ctx, cluster, r.Client); err != nil { + if err := factory.CreateOrUpdateStatefulSet(ctx, r.state.instance, r.Client); err != nil { log.Error(ctx, err, "reconcile statefulset failed") return err } - if err := factory.UpdatePersistentVolumeClaims(ctx, cluster, r.Client); err != nil { + if err := factory.UpdatePersistentVolumeClaims(ctx, r.state.instance, r.Client); err != nil { log.Error(ctx, err, "reconcile persistentVolumeClaims failed") return err } @@ -276,11 +276,11 @@ func (r *EtcdClusterReconciler) ensureConditionalClusterObjects( } // updateStatusOnErr wraps error and updates EtcdCluster status -func (r *EtcdClusterReconciler) updateStatusOnErr(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster, err error) (ctrl.Result, error) { +func (r *EtcdClusterReconciler) updateStatusOnErr(ctx context.Context, err error) (ctrl.Result, error) { // The function 'updateStatusOnErr' will always return non-nil error. Hence, the ctrl.Result will always be ignored. // Therefore, the ctrl.Result returned by 'updateStatus' function can be discarded. // REF: https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/reconcile@v0.17.3#Reconciler - _, statusErr := r.updateStatus(ctx, cluster) + _, statusErr := r.updateStatus(ctx) if statusErr != nil { return ctrl.Result{}, goerrors.Join(statusErr, err) } @@ -288,8 +288,8 @@ func (r *EtcdClusterReconciler) updateStatusOnErr(ctx context.Context, cluster * } // updateStatus updates EtcdCluster status and returns error and requeue in case status could not be updated due to conflict -func (r *EtcdClusterReconciler) updateStatus(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) (ctrl.Result, error) { - err := r.Status().Update(ctx, cluster) +func (r *EtcdClusterReconciler) updateStatus(ctx context.Context) (ctrl.Result, error) { + err := r.Status().Update(ctx, r.state.instance) if err == nil { return ctrl.Result{}, nil } @@ -302,9 +302,9 @@ func (r *EtcdClusterReconciler) updateStatus(ctx context.Context, cluster *etcda } // isStatefulSetReady gets managed StatefulSet and checks its readiness. -func (r *EtcdClusterReconciler) isStatefulSetReady(ctx context.Context, c *etcdaenixiov1alpha1.EtcdCluster) (bool, error) { +func (r *EtcdClusterReconciler) isStatefulSetReady(ctx context.Context) (bool, error) { sts := &appsv1.StatefulSet{} - err := r.Get(ctx, client.ObjectKeyFromObject(c), sts) + err := r.Get(ctx, client.ObjectKeyFromObject(r.state.instance), sts) if err == nil { return sts.Status.ReadyReplicas == *sts.Spec.Replicas, nil } @@ -322,11 +322,11 @@ func (r *EtcdClusterReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } -func (r *EtcdClusterReconciler) configureAuth(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) error { +func (r *EtcdClusterReconciler) configureAuth(ctx context.Context) error { var err error - cli, err := r.GetEtcdClient(ctx, cluster) + cli, err := r.GetEtcdClient(ctx) if err != nil { return err } @@ -342,7 +342,7 @@ func (r *EtcdClusterReconciler) configureAuth(ctx context.Context, cluster *etcd auth := clientv3.NewAuth(cli) - if cluster.Spec.Security != nil && cluster.Spec.Security.EnableAuth { + if r.state.instance.Spec.Security != nil && r.state.instance.Spec.Security.EnableAuth { if err := r.createRoleIfNotExists(ctx, auth, "root"); err != nil { return err @@ -393,12 +393,12 @@ func testMemberList(ctx context.Context, cli *clientv3.Client) error { return err } -func (r *EtcdClusterReconciler) GetEtcdClient(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) (*clientv3.Client, error) { +func (r *EtcdClusterReconciler) GetEtcdClient(ctx context.Context) (*clientv3.Client, error) { - endpoints := getEndpointsSlice(cluster) + endpoints := getEndpointsSlice(r.state.instance) log.Debug(ctx, "endpoints built", "endpoints", endpoints) - tlsConfig, err := r.getTLSConfig(ctx, cluster) + tlsConfig, err := r.getTLSConfig(ctx) if err != nil { log.Error(ctx, err, "failed to build tls config") return nil, err @@ -421,17 +421,17 @@ func (r *EtcdClusterReconciler) GetEtcdClient(ctx context.Context, cluster *etcd } -func (r *EtcdClusterReconciler) getTLSConfig(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) (*tls.Config, error) { +func (r *EtcdClusterReconciler) getTLSConfig(ctx context.Context) (*tls.Config, error) { var err error caCertPool := &x509.CertPool{} - if cluster.IsServerTrustedCADefined() { + if r.state.instance.IsServerTrustedCADefined() { serverCASecret := &corev1.Secret{} - if err = r.Get(ctx, client.ObjectKey{Namespace: cluster.Namespace, Name: cluster.Spec.Security.TLS.ServerTrustedCASecret}, serverCASecret); err != nil { + if err = r.Get(ctx, client.ObjectKey{Namespace: r.state.instance.Namespace, Name: r.state.instance.Spec.Security.TLS.ServerTrustedCASecret}, serverCASecret); err != nil { log.Error(ctx, err, "failed to get server trusted CA secret") return nil, err } @@ -448,10 +448,10 @@ func (r *EtcdClusterReconciler) getTLSConfig(ctx context.Context, cluster *etcda cert := tls.Certificate{} - if cluster.IsClientSecurityEnabled() { + if r.state.instance.IsClientSecurityEnabled() { rootSecret := &corev1.Secret{} - if err = r.Get(ctx, client.ObjectKey{Namespace: cluster.Namespace, Name: cluster.Spec.Security.TLS.ClientSecret}, rootSecret); err != nil { + if err = r.Get(ctx, client.ObjectKey{Namespace: r.state.instance.Namespace, Name: r.state.instance.Spec.Security.TLS.ClientSecret}, rootSecret); err != nil { log.Error(ctx, err, "failed to get root client secret") return nil, err } @@ -465,7 +465,7 @@ func (r *EtcdClusterReconciler) getTLSConfig(ctx context.Context, cluster *etcda } tlsConfig := &tls.Config{ - InsecureSkipVerify: !cluster.IsServerTrustedCADefined(), + InsecureSkipVerify: !r.state.instance.IsServerTrustedCADefined(), RootCAs: caCertPool, Certificates: []tls.Certificate{ cert, @@ -602,7 +602,7 @@ func (r *EtcdClusterReconciler) disableAuth(ctx context.Context, authClient clie // ensureUnconditionalObjects creates the two services and the PDB // which can be created at the start of the reconciliation loop // without any risk of disrupting the etcd cluster -func (r *EtcdClusterReconciler) ensureUnconditionalObjects(ctx context.Context, instance *etcdaenixiov1alpha1.EtcdCluster) error { +func (r *EtcdClusterReconciler) ensureUnconditionalObjects(ctx context.Context) error { const concurrentOperations = 3 c := make(chan error) defer close(c) @@ -620,7 +620,7 @@ func (r *EtcdClusterReconciler) ensureUnconditionalObjects(ctx context.Context, defer wg.Done() select { case <-ctx.Done(): - case c <- wrapWithMsg(factory.CreateOrUpdateClientService(ctx, instance, r.Client), + case c <- wrapWithMsg(factory.CreateOrUpdateClientService(ctx, r.state.instance, r.Client), "couldn't ensure client service"): } }(c) @@ -628,7 +628,7 @@ func (r *EtcdClusterReconciler) ensureUnconditionalObjects(ctx context.Context, defer wg.Done() select { case <-ctx.Done(): - case c <- wrapWithMsg(factory.CreateOrUpdateHeadlessService(ctx, instance, r.Client), + case c <- wrapWithMsg(factory.CreateOrUpdateHeadlessService(ctx, r.state.instance, r.Client), "couldn't ensure headless service"): } }(c) @@ -636,7 +636,7 @@ func (r *EtcdClusterReconciler) ensureUnconditionalObjects(ctx context.Context, defer wg.Done() select { case <-ctx.Done(): - case c <- wrapWithMsg(factory.CreateOrUpdatePdb(ctx, instance, r.Client), + case c <- wrapWithMsg(factory.CreateOrUpdatePdb(ctx, r.state.instance, r.Client), "couldn't ensure pod disruption budget"): } }(c) @@ -668,9 +668,9 @@ func (r *EtcdClusterReconciler) patchOrCreateObject(ctx context.Context, obj cli // TODO! // nolint:unparam,unused -func (r *EtcdClusterReconciler) createClusterFromScratch(ctx context.Context, state *observables) (ctrl.Result, error) { - cm := factory.TemplateClusterStateConfigMap(state.instance, "new", state.desiredReplicas()) - err := ctrl.SetControllerReference(state.instance, cm, r.Scheme) +func (r *EtcdClusterReconciler) createClusterFromScratch(ctx context.Context) (ctrl.Result, error) { + cm := factory.TemplateClusterStateConfigMap(r.state.instance, "new", r.state.desiredReplicas()) + err := ctrl.SetControllerReference(r.state.instance, cm, r.Scheme) if err != nil { return ctrl.Result{}, err } @@ -679,7 +679,7 @@ func (r *EtcdClusterReconciler) createClusterFromScratch(ctx context.Context, st return ctrl.Result{}, err } meta.SetStatusCondition( - &state.instance.Status.Conditions, + &r.state.instance.Status.Conditions, metav1.Condition{ Type: etcdaenixiov1alpha1.EtcdConditionInitialized, Status: metav1.ConditionFalse, @@ -688,7 +688,7 @@ func (r *EtcdClusterReconciler) createClusterFromScratch(ctx context.Context, st }, ) meta.SetStatusCondition( - &state.instance.Status.Conditions, + &r.state.instance.Status.Conditions, metav1.Condition{ Type: etcdaenixiov1alpha1.EtcdConditionReady, Status: metav1.ConditionFalse, @@ -698,14 +698,32 @@ func (r *EtcdClusterReconciler) createClusterFromScratch(ctx context.Context, st ) // ensure managed resources - if err = r.ensureConditionalClusterObjects(ctx, state.instance); err != nil { - return r.updateStatusOnErr(ctx, state.instance, fmt.Errorf("cannot create Cluster auxiliary objects: %w", err)) + if err = r.ensureConditionalClusterObjects(ctx); err != nil { + return r.updateStatusOnErr(ctx, fmt.Errorf("cannot create Cluster auxiliary objects: %w", err)) } panic("not yet implemented") } // TODO! // nolint:unused -func (r *EtcdClusterReconciler) scaleUpFromZero(ctx context.Context, state *observables) (ctrl.Result, error) { +func (r *EtcdClusterReconciler) scaleUpFromZero(ctx context.Context) error { + panic("not yet implemented") +} + +// TODO! +// nolint:unused +func (r *EtcdClusterReconciler) createOrUpdateClusterStateConfigMap(ctx context.Context) error { + panic("not yet implemented") +} + +// TODO! +// nolint:unused +func (r *EtcdClusterReconciler) createOrUpdateStatefulSet(ctx context.Context) error { + panic("not yet implemented") +} + +// TODO! +// nolint:unused +func (r *EtcdClusterReconciler) promoteLearners(ctx context.Context) error { panic("not yet implemented") } diff --git a/internal/controller/observables.go b/internal/controller/observables.go index 77ea461e..34102a57 100644 --- a/internal/controller/observables.go +++ b/internal/controller/observables.go @@ -183,3 +183,13 @@ func (o *observables) statefulSetPodSpecCorrect() bool { func (o *observables) statefulSetReady() bool { return o.statefulSet.Status.ReadyReplicas == *o.statefulSet.Spec.Replicas } + +// TODO: +func (o *observables) clusterHasQuorum() bool { + return false +} + +// TODO: +func (o *observables) hasLearners() bool { + return false +} diff --git a/test/utils/utils.go b/test/utils/utils.go index 02d77295..f2c534ed 100644 --- a/test/utils/utils.go +++ b/test/utils/utils.go @@ -179,7 +179,7 @@ func GetEtcdClient(ctx context.Context, namespacedName types.NamespacedName) (*c return nil, err } - client, err := r.GetEtcdClient(ctx, cluster) + client, err := r.GetEtcdClient(ctx) if err != nil { return nil, err }