Skip to content

Commit

Permalink
GH-84 refactor DNSRecord conflict detection
Browse files Browse the repository at this point in the history
  • Loading branch information
maksymvavilov committed May 13, 2024
1 parent 7e3b88f commit 235ea03
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 145 deletions.
211 changes: 119 additions & 92 deletions internal/controller/dnsrecord_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package controller

import (
"context"
"errors"
"fmt"
"strings"
"time"
Expand All @@ -28,7 +27,6 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/utils/clock"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
Expand All @@ -45,18 +43,15 @@ import (

const (
DNSRecordFinalizer = "kuadrant.io/dns-record"
WriteCounterLimit = 20
validationRequeueVariance = 0.5
DefaultValidationDuration = time.Second * 5
)

var (
defaultRequeueTime time.Duration
validationRequeueTime time.Duration
noRequeueDuration = time.Duration(0)
validFor time.Duration
reconcileStart = metav1.Time{}
Clock clock.Clock = clock.RealClock{}
reconcileStart = metav1.Time{}
)

// DNSRecordReconciler reconciles a DNSRecord object
Expand Down Expand Up @@ -90,19 +85,19 @@ func (r *DNSRecordReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
dnsRecord := previous.DeepCopy()

if dnsRecord.DeletionTimestamp != nil && !dnsRecord.DeletionTimestamp.IsZero() {
if err := r.ReconcileHealthChecks(ctx, dnsRecord); client.IgnoreNotFound(err) != nil {
if err = r.ReconcileHealthChecks(ctx, dnsRecord); client.IgnoreNotFound(err) != nil {
return ctrl.Result{}, err
}
requeueTime, err := r.deleteRecord(ctx, dnsRecord)
hadChanges, err := r.deleteRecord(ctx, dnsRecord)
if err != nil {
logger.Error(err, "Failed to delete DNSRecord")
return ctrl.Result{RequeueAfter: requeueTime}, err
return ctrl.Result{}, err
}
// if requeueTime returned is the same as validationRequeueTime - the deleteRecord has successfully applied changes
// if hadChanges - the deleteRecord has successfully applied changes
// in this case we need to queue for validation to ensure DNS Provider retained changes
// before removing finalizer and deleting the DNS Record CR
if requeueTime == validationRequeueTime {
return ctrl.Result{RequeueAfter: requeueTime}, nil
if hadChanges {
return ctrl.Result{RequeueAfter: validationRequeueTime}, nil
}

logger.Info("Removing Finalizer", "name", DNSRecordFinalizer)
Expand Down Expand Up @@ -133,36 +128,83 @@ func (r *DNSRecordReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
reason = "ValidationError"
message = fmt.Sprintf("validation of DNSRecord failed: %v", err)
setDNSRecordCondition(dnsRecord, string(v1alpha1.ConditionTypeReady), metav1.ConditionFalse, reason, message)
return r.updateStatus(ctx, previous, dnsRecord, noRequeueDuration, err)
return r.updateStatus(ctx, previous, dnsRecord, false, err)
}

// Publish the record
requeueAfter, err := r.publishRecord(ctx, dnsRecord)
hadChanges, err := r.publishRecord(ctx, dnsRecord)
if err != nil {
reason = "ProviderError"
message = fmt.Sprintf("The DNS provider failed to ensure the record: %v", provider.SanitizeError(err))
setDNSRecordCondition(dnsRecord, string(v1alpha1.ConditionTypeReady), metav1.ConditionFalse, reason, message)
return r.updateStatus(ctx, previous, dnsRecord, noRequeueDuration, err)
return r.updateStatus(ctx, previous, dnsRecord, hadChanges, err)
}
// success
dnsRecord.Status.ObservedGeneration = dnsRecord.Generation
dnsRecord.Status.Endpoints = dnsRecord.Spec.Endpoints

if err := r.ReconcileHealthChecks(ctx, dnsRecord); err != nil {
if err = r.ReconcileHealthChecks(ctx, dnsRecord); err != nil {
return ctrl.Result{}, err
}

return r.updateStatus(ctx, previous, dnsRecord, requeueAfter, nil)
return r.updateStatus(ctx, previous, dnsRecord, hadChanges, nil)
}

func (r *DNSRecordReconciler) updateStatus(ctx context.Context, previous, current *v1alpha1.DNSRecord, requeueAfter time.Duration, specErr error) (reconcile.Result, error) {
func (r *DNSRecordReconciler) updateStatus(ctx context.Context, previous, current *v1alpha1.DNSRecord, hadChanges bool, specErr error) (reconcile.Result, error) {
var requeueTime time.Duration
logger := log.FromContext(ctx)

// short loop. We don't publish anything so not changing status
if prematurely, requeueIn := recordReceivedPrematurely(current); prematurely {
return reconcile.Result{RequeueAfter: requeueIn}, nil
}

// failure
if specErr != nil {
current.Status.WriteCounter = previous.Status.WriteCounter
current.Status.ValidFor = previous.Status.ValidFor
current.Status.QueuedAt = previous.Status.QueuedAt
current.Status.QueuedFor = previous.Status.QueuedFor
var updateError error
if !equality.Semantic.DeepEqual(previous.Status, current.Status) {
if updateError = r.Status().Update(ctx, current); updateError != nil && apierrors.IsConflict(updateError) {
return ctrl.Result{Requeue: true}, nil
}
}
return ctrl.Result{Requeue: true}, updateError
}

// success
if hadChanges {
// generation has not changed but there are changes.
// implies that they were overridden - bump write counter
if !generationChanged(current) {
current.Status.WriteCounter++
wrtiteCounter.WithLabelValues(current.Name, current.Namespace).Inc()
logger.V(1).Info("Changes needed on the same generation of record")
}
requeueTime = validationRequeueTime
setDNSRecordCondition(current, string(v1alpha1.ConditionTypeReady), metav1.ConditionFalse, "AwaitingValidation", "Awaiting validation")
} else {
logger.Info("All records are already up to date")
// reset the valid for from randomized value to a fixed value once validation succeeds
if !meta.IsStatusConditionTrue(current.Status.Conditions, string(v1alpha1.ConditionTypeReady)) {
requeueTime = exponentialRequeueTime(validationRequeueTime.String())
} else {
// uses current.Status.ValidFor as the last requeue duration. Double it.
requeueTime = exponentialRequeueTime(current.Status.ValidFor)
}
setDNSRecordCondition(current, string(v1alpha1.ConditionTypeReady), metav1.ConditionTrue, "ProviderSuccess", "Provider ensured the dns record")
}

// valid for is always a requeue time
current.Status.ValidFor = requeueTime.String()

// reset the counter on the gen change regardless of having changes in the plan
if generationChanged(current) {
current.Status.WriteCounter = 0
wrtiteCounter.WithLabelValues(current.Name, current.Namespace).Set(0)
logger.V(1).Info("Resetting write counter on the generation change")
}

current.Status.ObservedGeneration = current.Generation
current.Status.Endpoints = current.Spec.Endpoints
current.Status.QueuedAt = reconcileStart

// update the record after setting the status
if !equality.Semantic.DeepEqual(previous.Status, current.Status) {
if updateError := r.Status().Update(ctx, current); updateError != nil {
if apierrors.IsConflict(updateError) {
Expand All @@ -172,11 +214,7 @@ func (r *DNSRecordReconciler) updateStatus(ctx context.Context, previous, curren
}
}

if specErr != nil {
return ctrl.Result{Requeue: true}, nil
}

return ctrl.Result{RequeueAfter: requeueAfter}, nil
return ctrl.Result{RequeueAfter: requeueTime}, nil
}

// SetupWithManager sets up the controller with the Manager.
Expand All @@ -191,7 +229,7 @@ func (r *DNSRecordReconciler) SetupWithManager(mgr ctrl.Manager, requeueIn, vali

// deleteRecord deletes record(s) in the DNSProvider (i.e. route53) configured by the ManagedZone assigned to this
// DNSRecord (dnsRecord.Status.ParentManagedZone).
func (r *DNSRecordReconciler) deleteRecord(ctx context.Context, dnsRecord *v1alpha1.DNSRecord) (time.Duration, error) {
func (r *DNSRecordReconciler) deleteRecord(ctx context.Context, dnsRecord *v1alpha1.DNSRecord) (bool, error) {
logger := log.FromContext(ctx)

managedZone := &v1alpha1.ManagedZone{
Expand All @@ -203,33 +241,33 @@ func (r *DNSRecordReconciler) deleteRecord(ctx context.Context, dnsRecord *v1alp
err := r.Get(ctx, client.ObjectKeyFromObject(managedZone), managedZone, &client.GetOptions{})
if err != nil {
// If the Managed Zone isn't found, just continue
return noRequeueDuration, client.IgnoreNotFound(err)
return false, client.IgnoreNotFound(err)
}
managedZoneReady := meta.IsStatusConditionTrue(managedZone.Status.Conditions, "Ready")

if !managedZoneReady {
return validationRequeueTime, fmt.Errorf("the managed zone is not in a ready state : %s", managedZone.Name)
return false, fmt.Errorf("the managed zone is not in a ready state : %s", managedZone.Name)
}

requeueTime, err := r.applyChanges(ctx, dnsRecord, managedZone, true)
hadChanges, err := r.applyChanges(ctx, dnsRecord, managedZone, true)
if err != nil {
if strings.Contains(err.Error(), "was not found") || strings.Contains(err.Error(), "notFound") {
logger.Info("Record not found in managed zone, continuing", "dnsRecord", dnsRecord.Name, "managedZone", managedZone.Name)
return noRequeueDuration, nil
return false, nil
} else if strings.Contains(err.Error(), "no endpoints") {
logger.Info("DNS record had no endpoint, continuing", "dnsRecord", dnsRecord.Name, "managedZone", managedZone.Name)
return noRequeueDuration, nil
return false, nil
}
return noRequeueDuration, err
return false, err
}
logger.Info("Deleted DNSRecord in manage zone", "dnsRecord", dnsRecord.Name, "managedZone", managedZone.Name)

return requeueTime, nil
return hadChanges, nil
}

// publishRecord publishes record(s) to the DNSProvider (i.e. route53) configured by the ManagedZone assigned to this
// DNSRecord (dnsRecord.Status.ParentManagedZone).
func (r *DNSRecordReconciler) publishRecord(ctx context.Context, dnsRecord *v1alpha1.DNSRecord) (time.Duration, error) {
func (r *DNSRecordReconciler) publishRecord(ctx context.Context, dnsRecord *v1alpha1.DNSRecord) (bool, error) {
logger := log.FromContext(ctx)
managedZone := &v1alpha1.ManagedZone{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -239,42 +277,57 @@ func (r *DNSRecordReconciler) publishRecord(ctx context.Context, dnsRecord *v1al
}
err := r.Get(ctx, client.ObjectKeyFromObject(managedZone), managedZone, &client.GetOptions{})
if err != nil {
return noRequeueDuration, err
return false, err
}
managedZoneReady := meta.IsStatusConditionTrue(managedZone.Status.Conditions, "Ready")

if !managedZoneReady {
return noRequeueDuration, fmt.Errorf("the managed zone is not in a ready state : %s", managedZone.Name)
return false, fmt.Errorf("the managed zone is not in a ready state : %s", managedZone.Name)
}

// cut off here for the short reconcile loop
requeueIn := validFor
if dnsRecord.Status.ValidFor != "" {
requeueIn, _ = time.ParseDuration(dnsRecord.Status.ValidFor)
}
expiryTime := metav1.NewTime(dnsRecord.Status.QueuedAt.Add(requeueIn))
if !generationChanged(dnsRecord) && reconcileStart.Before(&expiryTime) {
if prematurely, _ := recordReceivedPrematurely(dnsRecord); prematurely {
logger.V(1).Info("Skipping managed zone to which the DNS dnsRecord is already published and is still valid", "dnsRecord", dnsRecord.Name, "managedZone", managedZone.Name)
return requeueIn, nil
}
if generationChanged(dnsRecord) {
dnsRecord.Status.WriteCounter = 0
wrtiteCounter.WithLabelValues(dnsRecord.Name, dnsRecord.Namespace).Set(0)
return false, nil
}

requeueAfter, err := r.applyChanges(ctx, dnsRecord, managedZone, false)
hadChanges, err := r.applyChanges(ctx, dnsRecord, managedZone, false)
if err != nil {
return noRequeueDuration, err
return hadChanges, err
}
logger.Info("Published DNSRecord to manage zone", "dnsRecord", dnsRecord.Name, "managedZone", managedZone.Name)

return requeueAfter, nil
return hadChanges, nil
}

// recordReceivedPrematurely reports whether the record reached the reconciler
// before its current validity window has elapsed, together with the length of
// that window.
//
// The window is the package-level validFor unless the record carries its own
// Status.ValidFor string, in which case that string is parsed instead. The parse
// error is deliberately discarded: time.ParseDuration yields a zero duration on
// failure, so an unparsable value results in a zero-length window.
//
// A record whose generation differs from its observed generation is never
// considered premature.
func recordReceivedPrematurely(record *v1alpha1.DNSRecord) (bool, time.Duration) {
	window := validFor
	if stored := record.Status.ValidFor; stored != "" {
		window, _ = time.ParseDuration(stored)
	}

	// A spec change must always be processed, regardless of the window.
	if generationChanged(record) {
		return false, window
	}

	// The window is counted from the moment the record was last queued.
	expiresAt := metav1.NewTime(record.Status.QueuedAt.Add(window))
	return reconcileStart.Before(&expiresAt), window
}

// generationChanged reports whether the record's Generation differs from the
// generation last written to Status.ObservedGeneration.
func generationChanged(record *v1alpha1.DNSRecord) bool {
	observed := record.Status.ObservedGeneration
	return observed != record.Generation
}

// exponentialRequeueTime returns the next requeue interval: double the previous
// one, capped at defaultRequeueTime.
//
// lastRequeueTime is the previous interval in the textual form produced by
// time.Duration.String(). validationRequeueTime (the least-confidence timeout)
// is returned when the value cannot be parsed, or when it parses to a zero or
// negative duration, since doubling such a value would never grow the interval.
func exponentialRequeueTime(lastRequeueTime string) time.Duration {
	lastRequeue, err := time.ParseDuration(lastRequeueTime)
	// Corrupted DNSRecord: this value is normally only ever written via a
	// time.Duration.String() call on a positive requeue time.
	if err != nil || lastRequeue <= 0 {
		return validationRequeueTime
	}

	// Compare against half of the cap instead of doubling first, so that a very
	// large stored interval cannot overflow int64 and wrap to a negative value.
	// For integers, lastRequeue > defaultRequeueTime/2 is equivalent to
	// lastRequeue*2 > defaultRequeueTime.
	if lastRequeue > defaultRequeueTime/2 {
		return defaultRequeueTime
	}
	return lastRequeue * 2
}

// setDNSRecordCondition adds or updates a given condition in the DNSRecord status.
func setDNSRecordCondition(dnsRecord *v1alpha1.DNSRecord, conditionType string, status metav1.ConditionStatus, reason, message string) {
cond := metav1.Condition{
Expand Down Expand Up @@ -308,7 +361,7 @@ func (r *DNSRecordReconciler) getDNSProvider(ctx context.Context, dnsRecord *v1a
return r.ProviderFactory.ProviderFor(ctx, managedZone, providerConfig)
}

func (r *DNSRecordReconciler) applyChanges(ctx context.Context, dnsRecord *v1alpha1.DNSRecord, managedZone *v1alpha1.ManagedZone, isDelete bool) (time.Duration, error) {
func (r *DNSRecordReconciler) applyChanges(ctx context.Context, dnsRecord *v1alpha1.DNSRecord, managedZone *v1alpha1.ManagedZone, isDelete bool) (bool, error) {
logger := log.FromContext(ctx)
zoneDomainName, _ := strings.CutPrefix(managedZone.Spec.DomainName, v1alpha1.WildcardPrefix)
rootDomainName, _ := strings.CutPrefix(dnsRecord.Spec.RootHost, v1alpha1.WildcardPrefix)
Expand All @@ -318,18 +371,18 @@ func (r *DNSRecordReconciler) applyChanges(ctx context.Context, dnsRecord *v1alp

dnsProvider, err := r.getDNSProvider(ctx, dnsRecord)
if err != nil {
return noRequeueDuration, err
return false, err
}

registry, err := dnsRecord.GetRegistry(dnsProvider, managedDNSRecordTypes, excludeDNSRecordTypes)
if err != nil {
return noRequeueDuration, err
return false, err
}

policyID := "sync"
policy, exists := externaldnsplan.Policies[policyID]
if !exists {
return noRequeueDuration, fmt.Errorf("unknown policy: %s", policyID)
return false, fmt.Errorf("unknown policy: %s", policyID)
}

//If we are deleting set the expected endpoints to an empty array
Expand All @@ -340,19 +393,19 @@ func (r *DNSRecordReconciler) applyChanges(ctx context.Context, dnsRecord *v1alp
//zoneEndpoints = Records in the current dns provider zone
zoneEndpoints, err := registry.Records(ctx)
if err != nil {
return noRequeueDuration, err
return false, err
}

//specEndpoints = Records that this DNSRecord expects to exist
specEndpoints, err := registry.AdjustEndpoints(dnsRecord.Spec.Endpoints)
if err != nil {
return noRequeueDuration, fmt.Errorf("adjusting specEndpoints: %w", err)
return false, fmt.Errorf("adjusting specEndpoints: %w", err)
}

//statusEndpoints = Records that were created/updated by this DNSRecord last
statusEndpoints, err := registry.AdjustEndpoints(dnsRecord.Status.Endpoints)
if err != nil {
return noRequeueDuration, fmt.Errorf("adjusting statusEndpoints: %w", err)
return false, fmt.Errorf("adjusting statusEndpoints: %w", err)
}

//Note: All endpoint lists should be in the same provider specific format at this point
Expand All @@ -375,38 +428,12 @@ func (r *DNSRecordReconciler) applyChanges(ctx context.Context, dnsRecord *v1alp
plan = plan.Calculate()

if err = plan.Error(); err != nil {
return noRequeueDuration, err
return false, err
}

dnsRecord.Status.ValidFor = defaultRequeueTime.String()
dnsRecord.Status.QueuedAt = reconcileStart
if plan.Changes.HasChanges() {
// generation has not changed but there are changes.
// implies that they were overridden - bump write counter
if !generationChanged(dnsRecord) {
if dnsRecord.Status.WriteCounter < WriteCounterLimit {
dnsRecord.Status.WriteCounter++
wrtiteCounter.WithLabelValues(dnsRecord.Name, dnsRecord.Namespace).Inc()
logger.V(1).Info("Changes needed on the same generation of record")
} else {
err = errors.New("reached write limit to the DNS provider for the same generation of record")
logger.Error(err, "Giving up on trying to maintain desired state of the DNS record - changes are being overridden")
return noRequeueDuration, err
}
}
dnsRecord.Status.ValidFor = validationRequeueTime.String()
setDNSRecordCondition(dnsRecord, string(v1alpha1.ConditionTypeReady), metav1.ConditionFalse, "AwaitingValidation", "Awaiting validation")
logger.Info("Applying changes")
err = registry.ApplyChanges(ctx, plan.Changes)
if err != nil {
return validationRequeueTime, err
}
} else {
logger.Info("All records are already up to date")
dnsRecord.Status.WriteCounter = 0
wrtiteCounter.WithLabelValues(dnsRecord.Name, dnsRecord.Namespace).Set(0)
setDNSRecordCondition(dnsRecord, string(v1alpha1.ConditionTypeReady), metav1.ConditionTrue, "ProviderSuccess", "Provider ensured the dns record")
return true, err
}

return defaultRequeueTime, nil
return false, nil
}
Loading

0 comments on commit 235ea03

Please sign in to comment.