diff --git a/controllers/topic_controller.go b/controllers/topic_controller.go index 1511f59..bf929fc 100644 --- a/controllers/topic_controller.go +++ b/controllers/topic_controller.go @@ -25,6 +25,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/log" googlecloudpubsuboperatorv1 "github.com/quipper/google-cloud-pubsub-operator/api/v1" @@ -36,6 +37,8 @@ type TopicReconciler struct { Scheme *runtime.Scheme } +const topicFinalizerName = "topic.googlecloudpubsuboperator.quipper.github.io/finalizer" + //+kubebuilder:rbac:groups=googlecloudpubsuboperator.quipper.github.io,resources=topics,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=googlecloudpubsuboperator.quipper.github.io,resources=topics/status,verbs=get;update;patch //+kubebuilder:rbac:groups=googlecloudpubsuboperator.quipper.github.io,resources=topics/finalizers,verbs=update @@ -51,9 +54,40 @@ func (r *TopicReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl // on deleted requests. return ctrl.Result{}, client.IgnoreNotFound(err) } - logger.Info("Found the topic", "topic", topic) + // examine DeletionTimestamp to determine if object is under deletion + if topic.ObjectMeta.DeletionTimestamp.IsZero() { + // The object is not being deleted, so if it does not have our finalizer, + // then lets add the finalizer and update the object. This is equivalent + // registering our finalizer. + if !controllerutil.ContainsFinalizer(&topic, topicFinalizerName) { + controllerutil.AddFinalizer(&topic, topicFinalizerName) + if err := r.Update(ctx, &topic); err != nil { + return ctrl.Result{}, err + } + } + } else { + // The object is being deleted + if controllerutil.ContainsFinalizer(&topic, topicFinalizerName) { + // our finalizer is present, so lets handle any external dependency + if err := deleteTopic(ctx, topic.Spec.ProjectID, topic.Spec.TopicID); err != nil { + // if fail to delete the external dependency here, return with error + // so that it can be retried + return ctrl.Result{}, err + } + + // remove our finalizer from the list and update it. + controllerutil.RemoveFinalizer(&topic, topicFinalizerName) + if err := r.Update(ctx, &topic); err != nil { + return ctrl.Result{}, err + } + } + + // Stop reconciliation as the item is being deleted + return ctrl.Result{}, nil + } + t, err := createTopic(ctx, topic.Spec.ProjectID, topic.Spec.TopicID) if err != nil { if gs, ok := gRPCStatusFromError(err); ok && gs.Code() == codes.AlreadyExists { @@ -91,3 +125,20 @@ func createTopic(ctx context.Context, projectID, topicID string) (*pubsub.Topic, return t, nil } + +func deleteTopic(ctx context.Context, projectID, topicID string) error { + c, err := pubsub.NewClient(ctx, projectID) + if err != nil { + return fmt.Errorf("pubsub.NewClient: %w", err) + } + defer c.Close() + + if err := c.Topic(topicID).Delete(ctx); err != nil { + if gs, ok := gRPCStatusFromError(err); ok && gs.Code() == codes.NotFound { + // for idempotent + return nil + } + return fmt.Errorf("unable to delete topic %s: %w", topicID, err) + } + return nil +}