Skip to content

Commit

Permalink
Logs and organization cleanup (#208)
Browse files Browse the repository at this point in the history
* logs and organization cleanup

* getting log from context

* reused log var
  • Loading branch information
enrichman authored Jan 29, 2025
1 parent 72b5a98 commit 54be0ba
Show file tree
Hide file tree
Showing 9 changed files with 209 additions and 145 deletions.
9 changes: 5 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,24 +109,25 @@ func run(clx *cli.Context) error {
}

ctrlruntimelog.SetLogger(zapr.NewLogger(logger.Desugar().WithOptions(zap.AddCallerSkip(1))))

logger.Info("adding cluster controller")
if err := cluster.Add(ctx, mgr, sharedAgentImage, sharedAgentImagePullPolicy, logger); err != nil {
if err := cluster.Add(ctx, mgr, sharedAgentImage, sharedAgentImagePullPolicy); err != nil {
return fmt.Errorf("failed to add the new cluster controller: %v", err)
}

logger.Info("adding etcd pod controller")
if err := cluster.AddPodController(ctx, mgr, logger); err != nil {
if err := cluster.AddPodController(ctx, mgr); err != nil {
return fmt.Errorf("failed to add the new cluster controller: %v", err)
}

logger.Info("adding clusterset controller")
if err := clusterset.Add(ctx, mgr, clusterCIDR, logger); err != nil {
if err := clusterset.Add(ctx, mgr, clusterCIDR); err != nil {
return fmt.Errorf("failed to add the clusterset controller: %v", err)
}

if clusterCIDR == "" {
logger.Info("adding networkpolicy node controller")
if err := clusterset.AddNodeController(ctx, mgr, logger); err != nil {
if err := clusterset.AddNodeController(ctx, mgr); err != nil {
return fmt.Errorf("failed to add the clusterset node controller: %v", err)
}
}
Expand Down
124 changes: 41 additions & 83 deletions pkg/controller/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (
"github.com/rancher/k3k/pkg/controller/cluster/agent"
"github.com/rancher/k3k/pkg/controller/cluster/server"
"github.com/rancher/k3k/pkg/controller/cluster/server/bootstrap"
"github.com/rancher/k3k/pkg/log"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -51,12 +49,10 @@ type ClusterReconciler struct {
Scheme *runtime.Scheme
SharedAgentImage string
SharedAgentImagePullPolicy string
logger *log.Logger
}

// Add adds a new controller to the manager
func Add(ctx context.Context, mgr manager.Manager, sharedAgentImage, sharedAgentImagePullPolicy string, logger *log.Logger) error {

func Add(ctx context.Context, mgr manager.Manager, sharedAgentImage, sharedAgentImagePullPolicy string) error {
discoveryClient, err := discovery.NewDiscoveryClientForConfig(mgr.GetConfig())
if err != nil {
return err
Expand All @@ -69,7 +65,6 @@ func Add(ctx context.Context, mgr manager.Manager, sharedAgentImage, sharedAgent
Scheme: mgr.GetScheme(),
SharedAgentImage: sharedAgentImage,
SharedAgentImagePullPolicy: sharedAgentImagePullPolicy,
logger: logger.Named(clusterController),
}

return ctrl.NewControllerManagedBy(mgr).
Expand All @@ -81,78 +76,65 @@ func Add(ctx context.Context, mgr manager.Manager, sharedAgentImage, sharedAgent
}

func (c *ClusterReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
var (
cluster v1alpha1.Cluster
podList v1.PodList
)
log := c.logger.With("Cluster", req.NamespacedName)
log := ctrl.LoggerFrom(ctx).WithValues("cluster", req.NamespacedName)
ctx = ctrl.LoggerInto(ctx, log) // enrich the current logger

log.Info("reconciling cluster")

var cluster v1alpha1.Cluster
if err := c.Client.Get(ctx, req.NamespacedName, &cluster); err != nil {
return reconcile.Result{}, ctrlruntimeclient.IgnoreNotFound(err)
return reconcile.Result{}, err
}

// if the Version is not specified we will try to use the same Kubernetes version of the host.
// This version is stored in the Status object, and it will not be updated if already set.
if cluster.Spec.Version == "" && cluster.Status.HostVersion == "" {
hostVersion, err := c.DiscoveryClient.ServerVersion()
if err != nil {
// if DeletionTimestamp is not Zero -> finalize the object
if !cluster.DeletionTimestamp.IsZero() {
return c.finalizeCluster(ctx, cluster)
}

// add finalizers
if !controllerutil.AddFinalizer(&cluster, clusterFinalizerName) {
if err := c.Client.Update(ctx, &cluster); err != nil {
return reconcile.Result{}, err
}
}

// update Status HostVersion
k8sVersion := strings.Split(hostVersion.GitVersion, "+")[0]
cluster.Status.HostVersion = k8sVersion + "-k3s1"
orig := cluster.DeepCopy()

reconcilerErr := c.reconcileCluster(ctx, &cluster)

// update Status if needed
if !reflect.DeepEqual(orig.Status, cluster.Status) {
if err := c.Client.Status().Update(ctx, &cluster); err != nil {
return reconcile.Result{}, err
}
}

if cluster.DeletionTimestamp.IsZero() {
if !controllerutil.ContainsFinalizer(&cluster, clusterFinalizerName) {
controllerutil.AddFinalizer(&cluster, clusterFinalizerName)
if err := c.Client.Update(ctx, &cluster); err != nil {
return reconcile.Result{}, err
}
}
log.Info("enqueue cluster")
return reconcile.Result{}, c.createCluster(ctx, &cluster, log)
}

// remove finalizer from the server pods and update them.
matchingLabels := ctrlruntimeclient.MatchingLabels(map[string]string{"role": "server"})
listOpts := &ctrlruntimeclient.ListOptions{Namespace: cluster.Namespace}
matchingLabels.ApplyToList(listOpts)
if err := c.Client.List(ctx, &podList, listOpts); err != nil {
return reconcile.Result{}, ctrlruntimeclient.IgnoreNotFound(err)
}
for _, pod := range podList.Items {
if controllerutil.ContainsFinalizer(&pod, etcdPodFinalizerName) {
controllerutil.RemoveFinalizer(&pod, etcdPodFinalizerName)
if err := c.Client.Update(ctx, &pod); err != nil {
return reconcile.Result{}, err
}
}
}
return reconcile.Result{}, reconcilerErr
}

if err := c.unbindNodeProxyClusterRole(ctx, &cluster); err != nil {
return reconcile.Result{}, err
}
func (c *ClusterReconciler) reconcileCluster(ctx context.Context, cluster *v1alpha1.Cluster) error {
log := ctrl.LoggerFrom(ctx)

if controllerutil.ContainsFinalizer(&cluster, clusterFinalizerName) {
// remove finalizer from the cluster and update it.
controllerutil.RemoveFinalizer(&cluster, clusterFinalizerName)
if err := c.Client.Update(ctx, &cluster); err != nil {
return reconcile.Result{}, err
// if the Version is not specified we will try to use the same Kubernetes version of the host.
// This version is stored in the Status object, and it will not be updated if already set.
if cluster.Spec.Version == "" && cluster.Status.HostVersion == "" {
log.V(1).Info("cluster version not set")

hostVersion, err := c.DiscoveryClient.ServerVersion()
if err != nil {
return err
}

// update Status HostVersion
k8sVersion := strings.Split(hostVersion.GitVersion, "+")[0]
cluster.Status.HostVersion = k8sVersion + "-k3s1"
}
log.Info("deleting cluster")
return reconcile.Result{}, nil
}

func (c *ClusterReconciler) createCluster(ctx context.Context, cluster *v1alpha1.Cluster, log *zap.SugaredLogger) error {
if err := c.validate(cluster); err != nil {
log.Errorw("invalid change", zap.Error(err))
log.Error(err, "invalid change")
return nil
}

token, err := c.token(ctx, cluster)
if err != nil {
return err
Expand Down Expand Up @@ -343,30 +325,6 @@ func (c *ClusterReconciler) bindNodeProxyClusterRole(ctx context.Context, cluste
return c.Client.Update(ctx, clusterRoleBinding)
}

func (c *ClusterReconciler) unbindNodeProxyClusterRole(ctx context.Context, cluster *v1alpha1.Cluster) error {
clusterRoleBinding := &rbacv1.ClusterRoleBinding{}
if err := c.Client.Get(ctx, types.NamespacedName{Name: "k3k-node-proxy"}, clusterRoleBinding); err != nil {
return fmt.Errorf("failed to get or find k3k-node-proxy ClusterRoleBinding: %w", err)
}

subjectName := controller.SafeConcatNameWithPrefix(cluster.Name, agent.SharedNodeAgentName)

var cleanedSubjects []rbacv1.Subject
for _, subject := range clusterRoleBinding.Subjects {
if subject.Name != subjectName || subject.Namespace != cluster.Namespace {
cleanedSubjects = append(cleanedSubjects, subject)
}
}

// if no subject was removed, all good
if reflect.DeepEqual(clusterRoleBinding.Subjects, cleanedSubjects) {
return nil
}

clusterRoleBinding.Subjects = cleanedSubjects
return c.Client.Update(ctx, clusterRoleBinding)
}

func (c *ClusterReconciler) agent(ctx context.Context, cluster *v1alpha1.Cluster, serviceIP, token string) error {
agent := agent.New(cluster, serviceIP, c.SharedAgentImage, c.SharedAgentImagePullPolicy, token)
agentsConfig := agent.Config()
Expand Down
79 changes: 79 additions & 0 deletions pkg/controller/cluster/cluster_finalize.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package cluster

import (
"context"
"fmt"
"reflect"

"github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1"
"github.com/rancher/k3k/pkg/controller"
"github.com/rancher/k3k/pkg/controller/cluster/agent"
v1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func (c *ClusterReconciler) finalizeCluster(ctx context.Context, cluster v1alpha1.Cluster) (reconcile.Result, error) {
log := ctrl.LoggerFrom(ctx)
log.Info("finalizing Cluster")

// remove finalizer from the server pods and update them.
matchingLabels := ctrlruntimeclient.MatchingLabels(map[string]string{"role": "server"})
listOpts := &ctrlruntimeclient.ListOptions{Namespace: cluster.Namespace}
matchingLabels.ApplyToList(listOpts)

var podList v1.PodList
if err := c.Client.List(ctx, &podList, listOpts); err != nil {
return reconcile.Result{}, ctrlruntimeclient.IgnoreNotFound(err)
}

for _, pod := range podList.Items {
if controllerutil.ContainsFinalizer(&pod, etcdPodFinalizerName) {
controllerutil.RemoveFinalizer(&pod, etcdPodFinalizerName)
if err := c.Client.Update(ctx, &pod); err != nil {
return reconcile.Result{}, err
}
}
}

if err := c.unbindNodeProxyClusterRole(ctx, &cluster); err != nil {
return reconcile.Result{}, err
}

if controllerutil.ContainsFinalizer(&cluster, clusterFinalizerName) {
// remove finalizer from the cluster and update it.
controllerutil.RemoveFinalizer(&cluster, clusterFinalizerName)
if err := c.Client.Update(ctx, &cluster); err != nil {
return reconcile.Result{}, err
}
}
return reconcile.Result{}, nil
}

func (c *ClusterReconciler) unbindNodeProxyClusterRole(ctx context.Context, cluster *v1alpha1.Cluster) error {
clusterRoleBinding := &rbacv1.ClusterRoleBinding{}
if err := c.Client.Get(ctx, types.NamespacedName{Name: "k3k-node-proxy"}, clusterRoleBinding); err != nil {
return fmt.Errorf("failed to get or find k3k-node-proxy ClusterRoleBinding: %w", err)
}

subjectName := controller.SafeConcatNameWithPrefix(cluster.Name, agent.SharedNodeAgentName)

var cleanedSubjects []rbacv1.Subject
for _, subject := range clusterRoleBinding.Subjects {
if subject.Name != subjectName || subject.Namespace != cluster.Namespace {
cleanedSubjects = append(cleanedSubjects, subject)
}
}

// if no subject was removed, all good
if reflect.DeepEqual(clusterRoleBinding.Subjects, cleanedSubjects) {
return nil
}

clusterRoleBinding.Subjects = cleanedSubjects
return c.Client.Update(ctx, clusterRoleBinding)
}
6 changes: 4 additions & 2 deletions pkg/controller/cluster/cluster_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"path/filepath"
"testing"

"github.com/go-logr/zapr"
"github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1"
"github.com/rancher/k3k/pkg/controller/cluster"
"github.com/rancher/k3k/pkg/log"

"go.uber.org/zap"
appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -53,11 +53,13 @@ var _ = BeforeSuite(func() {
k8sClient, err = client.New(cfg, client.Options{Scheme: scheme})
Expect(err).NotTo(HaveOccurred())

ctrl.SetLogger(zapr.NewLogger(zap.NewNop()))

mgr, err := ctrl.NewManager(cfg, ctrl.Options{Scheme: scheme})
Expect(err).NotTo(HaveOccurred())

ctx, cancel = context.WithCancel(context.Background())
err = cluster.Add(ctx, mgr, "", "", &log.Logger{SugaredLogger: zap.NewNop().Sugar()})
err = cluster.Add(ctx, mgr, "", "")
Expect(err).NotTo(HaveOccurred())

go func() {
Expand Down
Loading

0 comments on commit 54be0ba

Please sign in to comment.