Skip to content
This repository has been archived by the owner on Mar 28, 2022. It is now read-only.

watch for cluster changes for all resources #9

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 37 additions & 6 deletions controllers/rabbitmqbinding_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controllers

import (
"context"
"fmt"

"github.com/go-logr/logr"
rabbithole "github.com/michaelklishin/rabbit-hole/v2"
Expand All @@ -26,7 +27,9 @@ import (
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

rabbitmqv1beta1 "github.com/kokuwaio/rabbitmq-operator/api/v1beta1"
)
Expand All @@ -39,6 +42,9 @@ type RabbitmqBindingReconciler struct {
Service *Service
}

const DestinationTypeQueue = "queue"
const DestinationTypeExchange = "exchange"

// +kubebuilder:rbac:groups=rabbitmq.kokuwa.io,resources=rabbitmqbindings,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=rabbitmq.kokuwa.io,resources=rabbitmqbindings/status,verbs=get;update;patch

Expand Down Expand Up @@ -87,11 +93,23 @@ func (r *RabbitmqBindingReconciler) Reconcile(req ctrl.Request) (ctrl.Result, er
// The object is being deleted
if containsString(instance.ObjectMeta.Finalizers, rabbitmqFinalizer) {
// our finalizer is present, so lets handle our external dependency
//TODO
bindings, err := rabbitClient.ListQueueBindings(instance.Spec.Vhost, instance.Spec.Destination)
if err != nil {
var bindings []rabbithole.BindingInfo
if instance.Spec.DestinationType == DestinationTypeQueue {
bindings, err = rabbitClient.ListQueueBindings(instance.Spec.Vhost, instance.Spec.Destination)
if err != nil {
return reconcile.Result{}, err
}
} else if instance.Spec.DestinationType == DestinationTypeExchange {
bindings, err = rabbitClient.ListExchangeBindings(instance.Spec.Vhost, instance.Spec.Destination, rabbithole.BindingSource)
if err != nil {
return reconcile.Result{}, err
}
} else {
err = fmt.Errorf("unkown destination_type %v", instance.Spec.DestinationType)
r.UpdateErrorState(ctx, instance, err)
return reconcile.Result{}, err
}

if binding, ok := r.checkIfBindingExists(bindings, instance.Spec); ok {
if _, err := rabbitClient.DeleteBinding(instance.Spec.Vhost, *binding); err != nil {
// if fail to delete the external dependency here, return with error
Expand All @@ -114,10 +132,22 @@ func (r *RabbitmqBindingReconciler) Reconcile(req ctrl.Request) (ctrl.Result, er
return reconcile.Result{}, nil
}

bindings, err := rabbitClient.ListQueueBindings(instance.Spec.Vhost, instance.Spec.Destination)
if err != nil {
var bindings []rabbithole.BindingInfo
if instance.Spec.DestinationType == DestinationTypeQueue {
bindings, err = rabbitClient.ListQueueBindings(instance.Spec.Vhost, instance.Spec.Destination)
if err != nil {
r.UpdateErrorState(ctx, instance, err)
return reconcile.Result{}, err
}
} else if instance.Spec.DestinationType == DestinationTypeExchange {
bindings, err = rabbitClient.ListExchangeBindings(instance.Spec.Vhost, instance.Spec.Destination, rabbithole.BindingSource)
if err != nil {
r.UpdateErrorState(ctx, instance, err)
return reconcile.Result{}, err
}
} else {
err = fmt.Errorf("unkown destination_type %v", instance.Spec.DestinationType)
r.UpdateErrorState(ctx, instance, err)
// Error reading the object - requeue the request.
return reconcile.Result{}, err
}
if _, ok := r.checkIfBindingExists(bindings, instance.Spec); !ok {
Expand Down Expand Up @@ -175,6 +205,7 @@ func (r *RabbitmqBindingReconciler) UpdateErrorState(context context.Context, in
func (r *RabbitmqBindingReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&rabbitmqv1beta1.RabbitmqBinding{}).
Watches(&source.Kind{Type: &rabbitmqv1beta1.RabbitmqCluster{}}, &handler.EnqueueRequestForObject{}).
Complete(r)
}

Expand Down
3 changes: 3 additions & 0 deletions controllers/rabbitmqexchange_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

rabbitmqv1beta1 "github.com/kokuwaio/rabbitmq-operator/api/v1beta1"
)
Expand Down Expand Up @@ -138,5 +140,6 @@ func (r *RabbitmqExchangeReconciler) UpdateErrorState(context context.Context, i
func (r *RabbitmqExchangeReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&rabbitmqv1beta1.RabbitmqExchange{}).
Watches(&source.Kind{Type: &rabbitmqv1beta1.RabbitmqCluster{}}, &handler.EnqueueRequestForObject{}).
Complete(r)
}
3 changes: 3 additions & 0 deletions controllers/rabbitmqpermission_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ import (
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

rabbitmqv1beta1 "github.com/kokuwaio/rabbitmq-operator/api/v1beta1"
)
Expand Down Expand Up @@ -146,5 +148,6 @@ func (r *RabbitmqPermissionReconciler) UpdateErrorState(context context.Context,
func (r *RabbitmqPermissionReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&rabbitmqv1beta1.RabbitmqPermisson{}).
Watches(&source.Kind{Type: &rabbitmqv1beta1.RabbitmqCluster{}}, &handler.EnqueueRequestForObject{}).
Complete(r)
}
3 changes: 3 additions & 0 deletions controllers/rabbitmqqueue_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ import (
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

rabbitmqv1beta1 "github.com/kokuwaio/rabbitmq-operator/api/v1beta1"
)
Expand Down Expand Up @@ -146,5 +148,6 @@ func (r *RabbitmqQueueReconciler) UpdateErrorState(context context.Context, inst
func (r *RabbitmqQueueReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&rabbitmqv1beta1.RabbitmqQueue{}).
Watches(&source.Kind{Type: &rabbitmqv1beta1.RabbitmqCluster{}}, &handler.EnqueueRequestForObject{}).
Complete(r)
}
3 changes: 3 additions & 0 deletions controllers/rabbitmqshovel_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ import (
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

rabbitmqv1beta1 "github.com/kokuwaio/rabbitmq-operator/api/v1beta1"
)
Expand Down Expand Up @@ -200,6 +202,7 @@ func (r *RabbitmqShovelReconciler) getPasswordSecret(namespace string, secretRef
func (r *RabbitmqShovelReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&rabbitmqv1beta1.RabbitmqShovel{}).
Watches(&source.Kind{Type: &rabbitmqv1beta1.RabbitmqCluster{}}, &handler.EnqueueRequestForObject{}).
Complete(r)
}

Expand Down
3 changes: 3 additions & 0 deletions controllers/rabbitmquser_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ import (
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

rabbitmqv1beta1 "github.com/kokuwaio/rabbitmq-operator/api/v1beta1"
)
Expand Down Expand Up @@ -168,6 +170,7 @@ func (r *RabbitmqUserReconciler) UpdateErrorState(context context.Context, insta
func (r *RabbitmqUserReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&rabbitmqv1beta1.RabbitmqUser{}).
Watches(&source.Kind{Type: &rabbitmqv1beta1.RabbitmqCluster{}}, &handler.EnqueueRequestForObject{}).
Complete(r)
}

Expand Down