Skip to content

Commit

Permalink
Document ReconcileResult type and remove systematic requeue (fixes #1419
Browse files Browse the repository at this point in the history
) (#1463)

* Document ReconcileResult type (fixes #1419)

* Don't requeue successful writes in ReconcileObject

* Add ReconcileObject variant that returns the object
  • Loading branch information
olim7t authored Jan 14, 2025
1 parent 3b13a2e commit a6352fc
Show file tree
Hide file tree
Showing 12 changed files with 105 additions and 68 deletions.
19 changes: 3 additions & 16 deletions controllers/k8ssandra/medusa_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,7 @@ func (r *K8ssandraClusterReconciler) reconcileMedusa(
return result.Error(err)
}
purgeCronJob.SetLabels(labels.CleanedUpByLabels(kcKey))
recRes := reconciliation.ReconcileObject(ctx, remoteClient, r.DefaultDelay, *purgeCronJob)
switch {
case recRes.IsError():
return recRes
case recRes.IsRequeue():
if recRes := reconciliation.ReconcileObject(ctx, remoteClient, r.DefaultDelay, *purgeCronJob); recRes.Completed() {
return recRes
}
} else {
Expand Down Expand Up @@ -173,12 +169,7 @@ func (r *K8ssandraClusterReconciler) reconcileMedusaSecrets(
return result.Error(err)
}

res := r.reconcileRemoteBucketSecretsDeprecated(ctx, r.ClientCache.GetLocalClient(), kc, logger)
switch {
case res.IsError():
logger.Error(res.GetError(), "Failed to reconcile Medusa bucket secrets")
return res
case res.IsRequeue():
if res := r.reconcileRemoteBucketSecretsDeprecated(ctx, r.ClientCache.GetLocalClient(), kc, logger); res.Completed() {
return res
}
}
Expand All @@ -202,11 +193,7 @@ func (r *K8ssandraClusterReconciler) reconcileMedusaConfigMap(
desiredConfigMap := medusa.CreateMedusaConfigMap(namespace, kc.SanitizedName(), medusaIni)
kcKey := utils.GetKey(kc)
desiredConfigMap.SetLabels(labels.CleanedUpByLabels(kcKey))
recRes := reconciliation.ReconcileObject(ctx, remoteClient, r.DefaultDelay, *desiredConfigMap)
switch {
case recRes.IsError():
return recRes
case recRes.IsRequeue():
if recRes := reconciliation.ReconcileObject(ctx, remoteClient, r.DefaultDelay, *desiredConfigMap); recRes.Completed() {
return recRes
}
}
Expand Down
6 changes: 1 addition & 5 deletions controllers/k8ssandra/vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,7 @@ func (r *K8ssandraClusterReconciler) reconcileVector(

// Check if the vector config map already exists
desiredVectorConfigMap.SetLabels(labels.CleanedUpByLabels(kcKey))
recRes := reconciliation.ReconcileObject(ctx, remoteClient, r.DefaultDelay, *desiredVectorConfigMap)
switch {
case recRes.IsError():
return recRes
case recRes.IsRequeue():
if recRes := reconciliation.ReconcileObject(ctx, remoteClient, r.DefaultDelay, *desiredVectorConfigMap); recRes.Completed() {
return recRes
}
} else {
Expand Down
9 changes: 2 additions & 7 deletions controllers/reaper/vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,9 @@ func (r *ReaperReconciler) reconcileVectorConfigMap(
dcLogger.Error(err, "Failed to set controller reference on new Reaper Vector ConfigMap", "ConfigMap", configMapKey)
return ctrl.Result{}, err
}
recRes := reconciliation.ReconcileObject(ctx, remoteClient, r.DefaultDelay, *desiredVectorConfigMap)
switch {
case recRes.IsError():
return ctrl.Result{}, recRes.GetError()
case recRes.IsRequeue():
return ctrl.Result{RequeueAfter: r.DefaultDelay}, nil
if recRes := reconciliation.ReconcileObject(ctx, remoteClient, r.DefaultDelay, *desiredVectorConfigMap); recRes.Completed() {
return recRes.Output()
}

} else {
if err := shared.DeleteConfigMapIfExists(ctx, remoteClient, configMapKey, dcLogger); err != nil {
return ctrl.Result{}, err
Expand Down
12 changes: 4 additions & 8 deletions controllers/stargate/stargate_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (r *StargateReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
if stargateConfigResult, err := r.reconcileStargateConfigMap(ctx, stargate, dcConfig, userStargateCassandraYaml, userStargateCqlYaml, req.Namespace, *actualDc, logger); err != nil {
return ctrl.Result{}, err
} else {
if stargateConfigResult.Requeue {
if stargateConfigResult.Requeue || stargateConfigResult.RequeueAfter > 0 {
return ctrl.Result{RequeueAfter: r.DefaultDelay}, nil
}
}
Expand Down Expand Up @@ -198,7 +198,7 @@ func (r *StargateReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
// reconcile Vector configmap
if vectorReconcileResult, err := r.reconcileVectorConfigMap(ctx, *stargate, actualDc, r.Client, logger); err != nil {
return vectorReconcileResult, err
} else if vectorReconcileResult.Requeue {
} else if vectorReconcileResult.Requeue || vectorReconcileResult.RequeueAfter > 0 {
return vectorReconcileResult, nil
}

Expand Down Expand Up @@ -446,12 +446,8 @@ func (r *StargateReconciler) reconcileStargateConfigMap(
return ctrl.Result{}, err
}

recRes := reconciliation.ReconcileObject(ctx, r.Client, r.DefaultDelay, *desiredConfigMap)
switch {
case recRes.IsError():
return ctrl.Result{}, recRes.GetError()
case recRes.IsRequeue():
return ctrl.Result{RequeueAfter: r.DefaultDelay}, nil
if recRes := reconciliation.ReconcileObject(ctx, r.Client, r.DefaultDelay, *desiredConfigMap); recRes.Completed() {
return recRes.Output()
}
logger.Info("Stargate ConfigMap successfully reconciled")
return ctrl.Result{}, nil
Expand Down
5 changes: 5 additions & 0 deletions controllers/stargate/stargate_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package stargate
import (
"context"
"encoding/json"
"os"
"testing"
"time"

Expand Down Expand Up @@ -39,6 +40,10 @@ const (
var managementApiFactory = &testutils.FakeManagementApiFactory{}

func TestStargate(t *testing.T) {

os.Setenv("REQUEUE_DEFAULT_DELAY", "10ms")
os.Setenv("REQUEUE_LONG_DELAY", "10ms")

ctx := testutils.TestSetup(t)
ctx, cancel := context.WithCancel(ctx)
testEnv := &testutils.TestEnv{}
Expand Down
8 changes: 2 additions & 6 deletions controllers/stargate/vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,8 @@ func (r *StargateReconciler) reconcileVectorConfigMap(
dcLogger.Error(err, "Failed to set controller reference on new Stargate Vector ConfigMap", "ConfigMap", configMapKey)
return ctrl.Result{}, err
}
recRes := reconciliation.ReconcileObject(ctx, remoteClient, r.DefaultDelay, *desiredVectorConfigMap)
switch {
case recRes.IsError():
return ctrl.Result{}, recRes.GetError()
case recRes.IsRequeue():
return ctrl.Result{RequeueAfter: r.DefaultDelay}, nil
if recRes := reconciliation.ReconcileObject(ctx, remoteClient, r.DefaultDelay, *desiredVectorConfigMap); recRes.Completed() {
return recRes.Output()
}
} else {
if err := deleteConfigMapIfExists(ctx, remoteClient, configMapKey, dcLogger); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/medusa/refresh_secrets.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func RefreshSecrets(dc *cassdcapi.CassandraDatacenter, ctx context.Context, clie
case recRes.IsRequeue():
requeues++
continue
case recRes.IsDone():
case !recRes.Completed():
continue
}
if requeues > 0 {
Expand Down
27 changes: 18 additions & 9 deletions pkg/reconciliation/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,18 @@ type Reconcileable[T any] interface {
*T
}

// Try with U, a type of any whose POINTER still fulfils Reoncilable...
// ReconcileObject ensures that desiredObject exists in the given state, either by creating it, or updating it if it
// already exists.
func ReconcileObject[U any, T Reconcileable[U]](ctx context.Context, kClient client.Client, requeueDelay time.Duration, desiredObject U) result.ReconcileResult {
recResult, _ := ReconcileAndGetObject[U, T](ctx, kClient, requeueDelay, desiredObject)
return recResult
}

// ReconcileAndGetObject ensures that desiredObject exists in the given state, either by creating it, or updating it if
// it already exists. It returns the current state of the object on the server after the reconciliation.
func ReconcileAndGetObject[U any, T Reconcileable[U]](
ctx context.Context, kClient client.Client, requeueDelay time.Duration, desiredObject U,
) (result.ReconcileResult, *U) {
objectKey := types.NamespacedName{
Name: T(&desiredObject).GetName(),
Namespace: T(&desiredObject).GetNamespace(),
Expand All @@ -34,24 +44,23 @@ func ReconcileObject[U any, T Reconcileable[U]](ctx context.Context, kClient cli
if errors.IsNotFound(err) {
if err := kClient.Create(ctx, T(&desiredObject)); err != nil {
if errors.IsAlreadyExists(err) {
return result.RequeueSoon(requeueDelay)
return result.RequeueSoon(requeueDelay), nil
}
return result.Error(err)
return result.Error(err), nil
}
return result.RequeueSoon(requeueDelay)
} else {
return result.Error(err)
return result.Continue(), &desiredObject
}
return result.Error(err), nil
}

if !annotations.CompareHashAnnotations(T(currentCm), T(&desiredObject)) {
resourceVersion := T(currentCm).GetResourceVersion()
T(&desiredObject).DeepCopyInto(currentCm)
T(currentCm).SetResourceVersion(resourceVersion)
if err := kClient.Update(ctx, T(currentCm)); err != nil {
return result.Error(err)
return result.Error(err), nil
}
return result.RequeueSoon(requeueDelay)
return result.Continue(), currentCm
}
return result.Done()
return result.Continue(), currentCm
}
11 changes: 5 additions & 6 deletions pkg/reconciliation/generic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ func Test_ReconcileObject_UpdateDone(t *testing.T) {
kClient := testutils.NewFakeClientWRestMapper() // Reset the Client
// Launch reconciliation.
recRes := ReconcileObject(ctx, kClient, requeueDelay, desiredObject)
assert.True(t, recRes.IsRequeue())
// Should update immediately and signal we can continue
assert.False(t, recRes.Completed())
// After the update we should see the expected ConfigMap
afterUpdateCM := &corev1.ConfigMap{}
err := kClient.Get(ctx,
Expand All @@ -42,15 +43,13 @@ func Test_ReconcileObject_UpdateDone(t *testing.T) {
},
afterUpdateCM)
assert.NoError(t, err)
// If we reconcile again, we should move into the Done state.
recRes = ReconcileObject(ctx, kClient, requeueDelay, desiredObject)
assert.True(t, recRes.IsDone())
}

func Test_ReconcileObject_CreateSuccess(t *testing.T) {
kClient := testutils.NewFakeClientWRestMapper() // Reset the Client
recRes := ReconcileObject(ctx, kClient, requeueDelay, desiredObject)
assert.True(t, recRes.IsRequeue())
// Should create immediately and signal we can continue
assert.False(t, recRes.Completed())
actualCm := &corev1.ConfigMap{}
err := kClient.Get(ctx, types.NamespacedName{Name: desiredObject.Name, Namespace: desiredObject.Namespace}, actualCm)
assert.NoError(t, err)
Expand All @@ -75,7 +74,7 @@ func Test_ReconcileObject_UpdateSuccess(t *testing.T) {
}
// Launch reconciliation.
recRes := ReconcileObject(ctx, kClient, requeueDelay, desiredObject)
assert.True(t, recRes.IsRequeue())
assert.False(t, recRes.Completed())
annotations.AddHashAnnotation(&desiredObject)
// After the update we should see the expected ConfigMap
afterUpdateCM := &corev1.ConfigMap{}
Expand Down
55 changes: 52 additions & 3 deletions pkg/result/result_helper.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,56 @@
package result

import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"time"

ctrl "sigs.k8s.io/controller-runtime"
)

// Copyright DataStax, Inc.
// Please see the included license file for details.

// This is just so that we can reference TerminalError in the Godoc of [Error]
var _ error = reconcile.TerminalError(nil)

// ReconcileResult represents the result of a step in the reconciliation process.
//
// We typically split the top-level Reconcile() method of a controller into multiple sub-functions. Each of these
// functions uses ReconcileResult to communicate to its caller how the current iteration should proceed. There are 4
// possible implementations: [Continue], [Done], [RequeueSoon], and [Error].
//
// The idiomatic way to handle a ReconcileResult in an intermediary sub-function is:
//
// if recResult := callStep1(); recResult.Completed() {
// // Global success, requeue or error: stop what we're doing and propagate up
// return recResult
// }
// // Otherwise, proceed with the next step(s)
// if recResult := callStep2(); recResult.Completed() {
// // etc...
//
// The idiomatic way to handle a ReconcileResult in the top-level Reconcile() method is:
//
// recResult := callSomeSubmethod()
// // Possibly inspect the result (e.g. to set an error field in the status)
// return recResult.Output()
type ReconcileResult interface {
// Completed indicates that the current iteration of the reconciliation loop is complete, and the top-level
// Reconcile() method should return [ReconcileResult.Output] to the controller runtime.
//
// This returns true for a [Done] or terminal [Error] (where the output will stop the entire reconciliation loop);
// and for a [RequeueSoon] or regular [Error] (where the output will trigger a retry).
//
// This returns false for a [Continue].
Completed() bool
// Output converts this result into a format that the main Reconcile() method can return to the controller runtime.
//
// Calling this method on a [Continue] will panic.
Output() (ctrl.Result, error)
// IsError indicates whether this result is an [Error].
IsError() bool
// IsRequeue indicates whether this result is a [RequeueSoon].
IsRequeue() bool
// IsDone indicates whether this result is a [Done].
IsDone() bool
// GetError returns the wrapped error if the result is an [Error], otherwise it returns nil.
GetError() error
}

Expand Down Expand Up @@ -121,18 +157,31 @@ func (r errorOut) GetError() error {
return r.err
}

// Continue indicates that the current step in the reconciliation is done. The caller should proceed with the next step.
func Continue() ReconcileResult {
return continueReconcile{}
}

// Done indicates that the entire reconciliation loop was successful.
// The caller should skip the next steps (if any), and propagate the result up the stack. This will eventually reach the
// top-level Reconcile() method, which should stop the reconciliation.
func Done() ReconcileResult {
return done{}
}

// RequeueSoon indicates that the current step in the reconciliation requires a requeue after a certain amount of time.
// The caller should skip the next steps (if any), and propagate the result up the stack. This will eventually reach the
// top-level Reconcile() method, which should schedule a requeue with the given delay.
func RequeueSoon(after time.Duration) ReconcileResult {
return callBackSoon{after: after}
}

// Error indicates that the current step in the reconciliation has failed.
// The caller should skip the next steps (if any), and propagate the result up the stack. This will eventually reach the
// top-level Reconcile() method, which should return the error to the controller runtime.
//
// If the argument is wrapped with [reconcile.TerminalError], the reconciliation loop will stop; otherwise, it will be
// retried with exponential backoff.
func Error(e error) ReconcileResult {
return errorOut{err: e}
}
6 changes: 1 addition & 5 deletions pkg/telemetry/cassandra_agent/cassandra_agent_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,7 @@ func (c Configurator) ReconcileTelemetryAgentConfig(dc *cassdcapi.CassandraDatac
}
desiredCm.SetLabels(labels.CleanedUpByLabels(KlKey))

recRes := reconciliation.ReconcileObject(c.Ctx, c.RemoteClient, c.RequeueDelay, *desiredCm)
switch {
case recRes.IsError():
return recRes
case recRes.IsRequeue():
if recRes := reconciliation.ReconcileObject(c.Ctx, c.RemoteClient, c.RequeueDelay, *desiredCm); recRes.Completed() {
return recRes
}

Expand Down
13 changes: 11 additions & 2 deletions test/e2e/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,7 @@ func createSingleDatacenterCluster(t *testing.T, ctx context.Context, namespace
require.NoError(err, "failed to patch K8ssandraCluster in namespace %s", namespace)
checkStargateReady(t, f, ctx, stargateKey)
checkStargateK8cStatusReady(t, f, ctx, kcKey, dcKey)
checkContainerPresence(t, ctx, f, stargateDeploymentKey, k8ssandra, getPodTemplateSpec, stargate.VectorContainerName)
checkContainerPresenceEventually(t, ctx, f, stargateDeploymentKey, k8ssandra, getPodTemplateSpec, stargate.VectorContainerName)
checkVectorAgentConfigMapPresence(t, ctx, f, dcKey, stargate.VectorAgentConfigMapName)

t.Log("check that if Stargate is deleted directly it gets re-created")
Expand All @@ -853,7 +853,7 @@ func createSingleDatacenterCluster(t *testing.T, ctx context.Context, namespace
require.NoError(err, "failed to delete Stargate in namespace %s", namespace)
checkStargateReady(t, f, ctx, stargateKey)

checkContainerPresence(t, ctx, f, stargateDeploymentKey, k8ssandra, getPodTemplateSpec, stargate.VectorContainerName)
checkContainerPresenceEventually(t, ctx, f, stargateDeploymentKey, k8ssandra, getPodTemplateSpec, stargate.VectorContainerName)
checkVectorAgentConfigMapPresence(t, ctx, f, dcKey, stargate.VectorAgentConfigMapName)

t.Log("delete Stargate in k8ssandracluster resource")
Expand Down Expand Up @@ -2336,6 +2336,15 @@ func checkContainerPresence(t *testing.T, ctx context.Context, f *framework.E2eF
require.True(t, containerFound, "cannot find Container in pod template spec")
}

func checkContainerPresenceEventually(t *testing.T, ctx context.Context, f *framework.E2eFramework, podKey framework.ClusterKey, kc *api.K8ssandraCluster, specFunction func(t *testing.T, ctx context.Context, f *framework.E2eFramework, dcKey framework.ClusterKey, kc *api.K8ssandraCluster) *corev1.PodTemplateSpec, containerName string) {
t.Logf("check that %s contains Container named %s", podKey.Name, containerName)
require.Eventually(t, func() bool {
podTempSpec := specFunction(t, ctx, f, podKey, kc)
_, containerFound := cassandra.FindContainer(podTempSpec, containerName)
return containerFound
}, polling.stargateReady.timeout, polling.stargateReady.interval, "cannot find Container in pod template spec")
}

func checkContainerDeleted(t *testing.T, ctx context.Context, f *framework.E2eFramework, podKey framework.ClusterKey, kc *api.K8ssandraCluster, specFunction func(t *testing.T, ctx context.Context, f *framework.E2eFramework, dcKey framework.ClusterKey, kc *api.K8ssandraCluster) *corev1.PodTemplateSpec, containerName string) {
t.Logf("check that %s does not have a Container named %s", podKey.Name, containerName)
podTempSpec := specFunction(t, ctx, f, podKey, kc)
Expand Down

0 comments on commit a6352fc

Please sign in to comment.