diff --git a/deployments/liqo/files/liqo-controller-manager-ClusterRole.yaml b/deployments/liqo/files/liqo-controller-manager-ClusterRole.yaml index 5c661ffa02..353f87c7e5 100644 --- a/deployments/liqo/files/liqo-controller-manager-ClusterRole.yaml +++ b/deployments/liqo/files/liqo-controller-manager-ClusterRole.yaml @@ -417,6 +417,22 @@ rules: - patch - update - watch +- apiGroups: + - networking.liqo.io + resources: + - connections + verbs: + - get + - list + - watch +- apiGroups: + - networking.liqo.io + resources: + - connections/status + verbs: + - get + - list + - watch - apiGroups: - networking.liqo.io resources: diff --git a/deployments/liqo/files/liqo-virtual-kubelet-local-ClusterRole.yaml b/deployments/liqo/files/liqo-virtual-kubelet-local-ClusterRole.yaml index 78b5b7249d..37fc256d55 100644 --- a/deployments/liqo/files/liqo-virtual-kubelet-local-ClusterRole.yaml +++ b/deployments/liqo/files/liqo-virtual-kubelet-local-ClusterRole.yaml @@ -157,6 +157,14 @@ rules: - get - list - watch +- apiGroups: + - networking.liqo.io + resources: + - connections + verbs: + - get + - list + - watch - apiGroups: - storage.k8s.io resources: diff --git a/pkg/liqo-controller-manager/foreign-cluster-operator/foreign-cluster-controller.go b/pkg/liqo-controller-manager/foreign-cluster-operator/foreign-cluster-controller.go index a25fe950ea..de3c5454a9 100644 --- a/pkg/liqo-controller-manager/foreign-cluster-operator/foreign-cluster-controller.go +++ b/pkg/liqo-controller-manager/foreign-cluster-operator/foreign-cluster-controller.go @@ -18,12 +18,14 @@ import ( "context" "fmt" "net/http" + "sort" "sync" "time" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -37,14 +39,15 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" discoveryv1alpha1 "github.com/liqotech/liqo/apis/discovery/v1alpha1" - netv1alpha1 "github.com/liqotech/liqo/apis/net/v1alpha1" + networkingv1alpha1 "github.com/liqotech/liqo/apis/networking/v1alpha1" sharingv1alpha1 "github.com/liqotech/liqo/apis/sharing/v1alpha1" - liqoconst "github.com/liqotech/liqo/pkg/consts" + "github.com/liqotech/liqo/pkg/consts" identitymanager "github.com/liqotech/liqo/pkg/identityManager" resourcerequestoperator "github.com/liqotech/liqo/pkg/liqo-controller-manager/resource-request-controller" peeringRoles "github.com/liqotech/liqo/pkg/peering-roles" tenantnamespace "github.com/liqotech/liqo/pkg/tenantNamespace" foreignclusterutils "github.com/liqotech/liqo/pkg/utils/foreignCluster" + "github.com/liqotech/liqo/pkg/utils/getters" liqolabels "github.com/liqotech/liqo/pkg/utils/labels" peeringconditionsutils "github.com/liqotech/liqo/pkg/utils/peeringConditions" traceutils "github.com/liqotech/liqo/pkg/utils/trace" @@ -66,20 +69,21 @@ const ( virtualKubeletPendingReason = "KubeletPending" virtualKubeletPendingMessage = "The remote cluster has not started the VirtualKubelet for the peering yet" - tunnelEndpointNotFoundReason = "TunnelEndpointNotFound" - tunnelEndpointNotFoundMessage = "The TunnelEndpoint has not been found in the Tenant Namespace %v" + connectionNotFoundReason = "ConnectionNotFound" + connectionNotFoundMessage = "The connection has not been found for the remote cluster %v" - tunnelEndpointAvailableReason = "TunnelEndpointAvailable" - tunnelEndpointAvailableMessage = "The TunnelEndpoint has been successfully found in the Tenant Namespace %v and it is connected" + connectionAvailableReason = "ConnectionAvailable" + connectionAvailableMessage = "The connection has been found for the remote cluster %v" - tunnelEndpointConnectingReason = "TunnelEndpointConnecting" - tunnelEndpointConnectingMessage = "The TunnelEndpoint has been successfully found in the Tenant Namespace %v, but it is not connected yet" + connectionConnectingReason = "ConnectionConnecting" + connectionConnectingMessage = "The connection has been found for the remote cluster %v, but it is not connected yet" + + connectionErrorReason = "ConnectionError" + connectionErrorMessage = "The connection has been found for the remote cluster %v, but an error occurred" externalNetworkReason = "ExternalNetwork" externalNetworkMessage = "The remote cluster network connection is not managed by Liqo" - tunnelEndpointErrorReason = "TunnelEndpointError" - apiServerReadyReason = "APIServerReady" apiServerReadyMessage = "The remote cluster API Server is ready" @@ -126,6 +130,8 @@ type ForeignClusterReconciler struct { // +kubebuilder:rbac:groups=net.liqo.io,resources=networkconfigs/status,verbs=* // +kubebuilder:rbac:groups=net.liqo.io,resources=tunnelendpoints,verbs=get;list;watch // +kubebuilder:rbac:groups=net.liqo.io,resources=tunnelendpoints/status,verbs=get;watch;update +// +kubebuilder:rbac:groups=networking.liqo.io,resources=connections,verbs=get;list;watch +// +kubebuilder:rbac:groups=networking.liqo.io,resources=connections/status,verbs=get;list;watch // +kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;watch // +kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=clusterroles,verbs=get;list;watch // +kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=clusterrolebindings,verbs=get;list;watch;create;update;patch @@ -211,12 +217,12 @@ func (r *ForeignClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reque } tracer.Step("Ensured the ForeignCluster is processable") - // check for TunnelEndpoints - if err = r.checkTEP(ctx, &foreignCluster); err != nil { + // check for Connection + if err = r.checkConnection(ctx, &foreignCluster); err != nil { klog.Error(err) return ctrl.Result{}, err } - tracer.Step("Checked the TunnelEndpoint status") + tracer.Step("Checked the Connection status") // ------ (2) ensuring prerequirements ------ @@ -429,7 +435,7 @@ func (r *ForeignClusterReconciler) SetupWithManager(mgr ctrl.Manager, workers in Owns(&discoveryv1alpha1.ResourceRequest{}). Watches(&corev1.Secret{}, handler.EnqueueRequestsFromMapFunc(r.foreignclusterEnqueuer), builder.WithPredicates(getAuthTokenSecretPredicate())). - Watches(&netv1alpha1.TunnelEndpoint{}, handler.EnqueueRequestsFromMapFunc(r.foreignclusterEnqueuer)). + Watches(&networkingv1alpha1.Connection{}, handler.EnqueueRequestsFromMapFunc(r.clusterIDEnqueuer)). Watches(&sharingv1alpha1.ResourceOffer{}, handler.EnqueueRequestsFromMapFunc(r.foreignclusterEnqueuer)). WithOptions(controller.Options{MaxConcurrentReconciles: workers}). Complete(r) @@ -533,7 +539,24 @@ func getPeeringPhase(foreignCluster *discoveryv1alpha1.ForeignCluster, } } -func (r *ForeignClusterReconciler) checkTEP(ctx context.Context, +func ensureStatusForConnection(foreignCluster *discoveryv1alpha1.ForeignCluster, connection *networkingv1alpha1.Connection) { + switch connection.Status.Value { + case networkingv1alpha1.Connected: + peeringconditionsutils.EnsureStatus(foreignCluster, + discoveryv1alpha1.NetworkStatusCondition, discoveryv1alpha1.PeeringConditionStatusEstablished, + connectionAvailableReason, fmt.Sprintf(connectionAvailableMessage, foreignCluster.Spec.ClusterIdentity.ClusterID)) + case networkingv1alpha1.Connecting: + peeringconditionsutils.EnsureStatus(foreignCluster, + discoveryv1alpha1.NetworkStatusCondition, discoveryv1alpha1.PeeringConditionStatusPending, + connectionConnectingReason, fmt.Sprintf(connectionConnectingMessage, foreignCluster.Spec.ClusterIdentity.ClusterID)) + case networkingv1alpha1.ConnectionError: + peeringconditionsutils.EnsureStatus(foreignCluster, + discoveryv1alpha1.NetworkStatusCondition, discoveryv1alpha1.PeeringConditionStatusError, + connectionErrorReason, fmt.Sprintf(connectionErrorMessage, foreignCluster.Spec.ClusterIdentity.ClusterID)) + } +} + +func (r *ForeignClusterReconciler) checkConnection(ctx context.Context, foreignCluster *discoveryv1alpha1.ForeignCluster) error { if r.DisableInternalNetwork { peeringconditionsutils.EnsureStatus(foreignCluster, @@ -542,38 +565,55 @@ func (r *ForeignClusterReconciler) checkTEP(ctx context.Context, return nil } - var tepList netv1alpha1.TunnelEndpointList - if err := r.Client.List(ctx, &tepList, client.MatchingLabels{ - liqoconst.ClusterIDLabelName: foreignCluster.Spec.ClusterIdentity.ClusterID, - }); err != nil { + remoteClusterIDSelector := labels.Set{consts.RemoteClusterID: foreignCluster.Spec.ClusterIdentity.ClusterID}.AsSelector() + connections, err := getters.ListConnectionsByLabel(ctx, r.Client, corev1.NamespaceAll, remoteClusterIDSelector) + if err != nil { klog.Error(err) return err } - if len(tepList.Items) == 0 { + switch len(connections.Items) { + case 0: peeringconditionsutils.EnsureStatus(foreignCluster, discoveryv1alpha1.NetworkStatusCondition, discoveryv1alpha1.PeeringConditionStatusNone, - tunnelEndpointNotFoundReason, fmt.Sprintf(tunnelEndpointNotFoundMessage, foreignCluster.Status.TenantNamespace.Local)) - } else if len(tepList.Items) > 0 { - tep := &tepList.Items[0] - switch tep.Status.Connection.Status { - case netv1alpha1.Connected: - peeringconditionsutils.EnsureStatus(foreignCluster, - discoveryv1alpha1.NetworkStatusCondition, discoveryv1alpha1.PeeringConditionStatusEstablished, - tunnelEndpointAvailableReason, fmt.Sprintf(tunnelEndpointAvailableMessage, foreignCluster.Status.TenantNamespace.Local)) - case netv1alpha1.Connecting: - peeringconditionsutils.EnsureStatus(foreignCluster, - discoveryv1alpha1.NetworkStatusCondition, discoveryv1alpha1.PeeringConditionStatusPending, - tunnelEndpointConnectingReason, fmt.Sprintf(tunnelEndpointConnectingMessage, foreignCluster.Status.TenantNamespace.Local)) - case netv1alpha1.ConnectionError: - peeringconditionsutils.EnsureStatus(foreignCluster, - discoveryv1alpha1.NetworkStatusCondition, discoveryv1alpha1.PeeringConditionStatusError, - tunnelEndpointErrorReason, tep.Status.Connection.StatusMessage) - } + connectionNotFoundReason, fmt.Sprintf(connectionNotFoundMessage, foreignCluster.Spec.ClusterIdentity.ClusterID)) + case 1: + connection := &connections.Items[0] + ensureStatusForConnection(foreignCluster, connection) + default: + // get the oldest connection + sort.Slice(connections.Items, func(i, j int) bool { + return connections.Items[i].CreationTimestamp.Before(&connections.Items[j].CreationTimestamp) + }) + connection := &connections.Items[0] + klog.Errorf("multiple connections found for remote cluster %v, using the oldest one %q", + foreignCluster.Spec.ClusterIdentity.ClusterID, client.ObjectKeyFromObject(connection)) + ensureStatusForConnection(foreignCluster, connection) } + return nil } +func (r *ForeignClusterReconciler) clusterIDEnqueuer(ctx context.Context, obj client.Object) []ctrl.Request { + if obj.GetLabels() == nil { + return []ctrl.Request{} + } + + clusterID, ok := obj.GetLabels()[consts.RemoteClusterID] + if !ok { + return []ctrl.Request{} + } + + fc, err := foreignclusterutils.GetForeignClusterByID(ctx, r.Client, clusterID) + if err != nil { + klog.Error(err) + return []ctrl.Request{} + } + + klog.V(4).Infof("enqueuing foreigncluster %q for object %q", fc.GetName(), klog.KObj(obj)) + return []ctrl.Request{{NamespacedName: types.NamespacedName{Name: fc.GetName()}}} +} + func (r *ForeignClusterReconciler) foreignclusterEnqueuer(_ context.Context, obj client.Object) []ctrl.Request { gvks, _, err := r.Scheme.ObjectKinds(obj) // Should never happen, but if it happens we panic. diff --git a/pkg/virtualKubelet/liqoNodeProvider/reconciler.go b/pkg/virtualKubelet/liqoNodeProvider/reconciler.go index 0c26a35337..6381ce79c2 100644 --- a/pkg/virtualKubelet/liqoNodeProvider/reconciler.go +++ b/pkg/virtualKubelet/liqoNodeProvider/reconciler.go @@ -25,7 +25,7 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/klog/v2" - netv1alpha1 "github.com/liqotech/liqo/apis/net/v1alpha1" + networkingv1alpha1 "github.com/liqotech/liqo/apis/networking/v1alpha1" virtualkubeletv1alpha1 "github.com/liqotech/liqo/apis/virtualkubelet/v1alpha1" "github.com/liqotech/liqo/pkg/consts" "github.com/liqotech/liqo/pkg/utils/maps" @@ -51,20 +51,21 @@ func (p *LiqoNodeProvider) reconcileNodeFromVirtualNode(event watch.Event) error return nil } -func (p *LiqoNodeProvider) reconcileNodeFromTep(event watch.Event) error { - var tep netv1alpha1.TunnelEndpoint +func (p *LiqoNodeProvider) reconcileNodeFromConnection(event watch.Event) error { + var connection networkingv1alpha1.Connection unstruct, ok := event.Object.(*unstructured.Unstructured) if !ok { - return errors.New("error in casting tunnel endpoint: recreate watcher") + return errors.New("error in casting Connection") } - if err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstruct.Object, &tep); err != nil { + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstruct.Object, &connection); err != nil { klog.Error(err) return err } + if event.Type == watch.Deleted { p.updateMutex.Lock() defer p.updateMutex.Unlock() - klog.Infof("tunnelEndpoint %v deleted", tep.Name) + klog.Infof("connection %v deleted", connection.Name) p.networkReady = false err := p.updateNode() if err != nil { @@ -73,11 +74,10 @@ func (p *LiqoNodeProvider) reconcileNodeFromTep(event watch.Event) error { return err } - if err := p.updateFromTep(&tep); err != nil { - klog.Errorf("node update from tunnelEndpoint %v failed for reason %v; retry...", tep.Name, err) + if err := p.updateFromConnection(&connection); err != nil { + klog.Errorf("node update from connection %v failed for reason %v; retry...", connection.Name, err) return err } - klog.Info("correctly set pod CIDR from tunnel endpoint") return nil } @@ -119,16 +119,11 @@ func (p *LiqoNodeProvider) updateFromVirtualNode(ctx context.Context, return p.updateNode() } -func (p *LiqoNodeProvider) updateFromTep(tep *netv1alpha1.TunnelEndpoint) error { +func (p *LiqoNodeProvider) updateFromConnection(connection *networkingv1alpha1.Connection) error { p.updateMutex.Lock() defer p.updateMutex.Unlock() - // if tep is not connected yet, return - if tep.Status.Connection.Status != netv1alpha1.Connected { - p.networkReady = false - return p.updateNode() - } - p.networkReady = true + p.networkReady = connection.Status.Value == networkingv1alpha1.Connected return p.updateNode() } diff --git a/pkg/virtualKubelet/liqoNodeProvider/resourceWatcher.go b/pkg/virtualKubelet/liqoNodeProvider/resourceWatcher.go index dcb83c170c..4e6e03a2fb 100644 --- a/pkg/virtualKubelet/liqoNodeProvider/resourceWatcher.go +++ b/pkg/virtualKubelet/liqoNodeProvider/resourceWatcher.go @@ -26,7 +26,7 @@ import ( "k8s.io/client-go/util/retry" "k8s.io/klog/v2" - netv1alpha1 "github.com/liqotech/liqo/apis/net/v1alpha1" + networkingv1alpha1 "github.com/liqotech/liqo/apis/networking/v1alpha1" virtualkubeletv1alpha1 "github.com/liqotech/liqo/apis/virtualkubelet/v1alpha1" "github.com/liqotech/liqo/pkg/consts" ) @@ -45,13 +45,14 @@ func (p *LiqoNodeProvider) StartProvider(ctx context.Context) (ready chan struct _, err := virtualNodeInformer.AddEventHandler(getEventHandler(p.reconcileNodeFromVirtualNode)) runtime.Must(err) - var tepInformerFactory dynamicinformer.DynamicSharedInformerFactory + var connInformerFactory dynamicinformer.DynamicSharedInformerFactory if p.checkNetworkStatus { - tepInformerFactory = dynamicinformer.NewFilteredDynamicSharedInformerFactory(p.dynClient, p.resyncPeriod, namespace, func(opt *metav1.ListOptions) { - opt.LabelSelector = consts.ClusterIDLabelName + "=" + p.foreignClusterID - }) - tepInformer := tepInformerFactory.ForResource(netv1alpha1.TunnelEndpointGroupVersionResource).Informer() - _, err := tepInformer.AddEventHandler(getEventHandler(p.reconcileNodeFromTep)) + connInformerFactory = dynamicinformer.NewFilteredDynamicSharedInformerFactory(p.dynClient, p.resyncPeriod, namespace, + func(opt *metav1.ListOptions) { + opt.LabelSelector = consts.RemoteClusterID + "=" + p.foreignClusterID + }) + connInformer := connInformerFactory.ForResource(networkingv1alpha1.ConnectionGroupVersionResource).Informer() + _, err := connInformer.AddEventHandler(getEventHandler(p.reconcileNodeFromConnection)) runtime.Must(err) } @@ -60,7 +61,7 @@ func (p *LiqoNodeProvider) StartProvider(ctx context.Context) (ready chan struct <-ready go virtualNodeInformerFactory.Start(ctx.Done()) if p.checkNetworkStatus { - go tepInformerFactory.Start(ctx.Done()) + go connInformerFactory.Start(ctx.Done()) } klog.Info("Liqo informers started") }() diff --git a/pkg/virtualKubelet/roles/local/role.go b/pkg/virtualKubelet/roles/local/role.go index a1cee70a75..e9f2ce532d 100644 --- a/pkg/virtualKubelet/roles/local/role.go +++ b/pkg/virtualKubelet/roles/local/role.go @@ -31,6 +31,7 @@ package local // +kubebuilder:rbac:groups=virtualkubelet.liqo.io,resources=namespacemaps;virtualnodes,verbs=get;list;watch; // +kubebuilder:rbac:groups=net.liqo.io,resources=tunnelendpoints,verbs=get;list;watch +// +kubebuilder:rbac:groups=networking.liqo.io,resources=connections,verbs=get;list;watch // +kubebuilder:rbac:groups=discovery.liqo.io,resources=foreignclusters,verbs=get;list;watch // +kubebuilder:rbac:groups=discovery.liqo.io,resources=foreignclusters/status,verbs=get;list;watch