watch connection instead of tunnelendpoints
aleoli committed Nov 15, 2023
1 parent 1d42ff8 commit 81d5493
Showing 6 changed files with 122 additions and 61 deletions.
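For context: this commit replaces watches on the net.liqo.io TunnelEndpoint resources with watches on the networking.liqo.io Connection resource. From the way the new code uses it, the relevant API surface looks roughly like the sketch below; type, field, and constant names are inferred from this diff, and the string values are assumptions, not copied from the Liqo sources.

// Sketch of the networking.liqo.io/v1alpha1 Connection API surface assumed
// by this commit; inferred from its usage in the diff below.
package v1alpha1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// ConnectionStatusValue expresses the state of the tunnel towards a peer.
type ConnectionStatusValue string

const (
	// Values switched on by ensureStatusForConnection and updateFromConnection
	// below; the exact strings are assumptions.
	Connected       ConnectionStatusValue = "Connected"
	Connecting      ConnectionStatusValue = "Connecting"
	ConnectionError ConnectionStatusValue = "Error"
)

// Connection reports the status of the network fabric towards a remote
// cluster; it carries the consts.RemoteClusterID label used for filtering.
type Connection struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Status ConnectionStatus `json:"status,omitempty"`
}

// ConnectionStatus holds the observed state; the controller reads .Value.
type ConnectionStatus struct {
	Value ConnectionStatusValue `json:"value,omitempty"`
}

// ConnectionList is the list type returned by getters.ListConnectionsByLabel.
type ConnectionList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`
	Items           []Connection `json:"items"`
}

// ConnectionGroupVersionResource is used by the dynamic informer in
// resourceWatcher.go below.
var ConnectionGroupVersionResource = schema.GroupVersionResource{
	Group: "networking.liqo.io", Version: "v1alpha1", Resource: "connections",
}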
16 changes: 16 additions & 0 deletions deployments/liqo/files/liqo-controller-manager-ClusterRole.yaml
@@ -417,6 +417,22 @@ rules:
- patch
- update
- watch
- apiGroups:
- networking.liqo.io
resources:
- connections
verbs:
- get
- list
- watch
- apiGroups:
- networking.liqo.io
resources:
- connections/status
verbs:
- get
- list
- watch
- apiGroups:
- networking.liqo.io
resources:
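These generated rules mirror the two +kubebuilder:rbac markers on connections and connections/status added to the ForeignCluster controller further down; regenerating the manifests keeps this ClusterRole in sync with the markers.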
@@ -157,6 +157,14 @@ rules:
- get
- list
- watch
- apiGroups:
- networking.liqo.io
resources:
- connections
verbs:
- get
- list
- watch
- apiGroups:
- storage.k8s.io
resources:
@@ -18,12 +18,14 @@ import (
"context"
"fmt"
"net/http"
"sort"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -37,14 +39,15 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"

discoveryv1alpha1 "github.com/liqotech/liqo/apis/discovery/v1alpha1"
netv1alpha1 "github.com/liqotech/liqo/apis/net/v1alpha1"
networkingv1alpha1 "github.com/liqotech/liqo/apis/networking/v1alpha1"
sharingv1alpha1 "github.com/liqotech/liqo/apis/sharing/v1alpha1"
liqoconst "github.com/liqotech/liqo/pkg/consts"
"github.com/liqotech/liqo/pkg/consts"
identitymanager "github.com/liqotech/liqo/pkg/identityManager"
resourcerequestoperator "github.com/liqotech/liqo/pkg/liqo-controller-manager/resource-request-controller"
peeringRoles "github.com/liqotech/liqo/pkg/peering-roles"
tenantnamespace "github.com/liqotech/liqo/pkg/tenantNamespace"
foreignclusterutils "github.com/liqotech/liqo/pkg/utils/foreignCluster"
"github.com/liqotech/liqo/pkg/utils/getters"
liqolabels "github.com/liqotech/liqo/pkg/utils/labels"
peeringconditionsutils "github.com/liqotech/liqo/pkg/utils/peeringConditions"
traceutils "github.com/liqotech/liqo/pkg/utils/trace"
Expand All @@ -66,20 +69,21 @@ const (
virtualKubeletPendingReason = "KubeletPending"
virtualKubeletPendingMessage = "The remote cluster has not started the VirtualKubelet for the peering yet"

tunnelEndpointNotFoundReason = "TunnelEndpointNotFound"
tunnelEndpointNotFoundMessage = "The TunnelEndpoint has not been found in the Tenant Namespace %v"
connectionNotFoundReason = "ConnectionNotFound"
connectionNotFoundMessage = "The connection has not been found for the remote cluster %v"

tunnelEndpointAvailableReason = "TunnelEndpointAvailable"
tunnelEndpointAvailableMessage = "The TunnelEndpoint has been successfully found in the Tenant Namespace %v and it is connected"
connectionAvailableReason = "ConnectionAvailable"
connectionAvailableMessage = "The connection has been found for the remote cluster %v"

tunnelEndpointConnectingReason = "TunnelEndpointConnecting"
tunnelEndpointConnectingMessage = "The TunnelEndpoint has been successfully found in the Tenant Namespace %v, but it is not connected yet"
connectionConnectingReason = "ConnectionConnecting"
connectionConnectingMessage = "The connection has been found for the remote cluster %v, but it is not connected yet"

connectionErrorReason = "ConnectionError"
connectionErrorMessage = "The connection has been found for the remote cluster %v, but an error occurred"

externalNetworkReason = "ExternalNetwork"
externalNetworkMessage = "The remote cluster network connection is not managed by Liqo"

tunnelEndpointErrorReason = "TunnelEndpointError"

apiServerReadyReason = "APIServerReady"
apiServerReadyMessage = "The remote cluster API Server is ready"

@@ -126,6 +130,8 @@ type ForeignClusterReconciler struct {
// +kubebuilder:rbac:groups=net.liqo.io,resources=networkconfigs/status,verbs=*
// +kubebuilder:rbac:groups=net.liqo.io,resources=tunnelendpoints,verbs=get;list;watch
// +kubebuilder:rbac:groups=net.liqo.io,resources=tunnelendpoints/status,verbs=get;watch;update
// +kubebuilder:rbac:groups=networking.liqo.io,resources=connections,verbs=get;list;watch
// +kubebuilder:rbac:groups=networking.liqo.io,resources=connections/status,verbs=get;list;watch
// +kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;watch
// +kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=clusterroles,verbs=get;list;watch
// +kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=clusterrolebindings,verbs=get;list;watch;create;update;patch
@@ -211,12 +217,12 @@ func (r *ForeignClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reque
}
tracer.Step("Ensured the ForeignCluster is processable")

// check for TunnelEndpoints
if err = r.checkTEP(ctx, &foreignCluster); err != nil {
// check for Connection
if err = r.checkConnection(ctx, &foreignCluster); err != nil {
klog.Error(err)
return ctrl.Result{}, err
}
tracer.Step("Checked the TunnelEndpoint status")
tracer.Step("Checked the Connection status")

// ------ (2) ensuring prerequirements ------

@@ -429,7 +435,7 @@ func (r *ForeignClusterReconciler) SetupWithManager(mgr ctrl.Manager, workers in
Owns(&discoveryv1alpha1.ResourceRequest{}).
Watches(&corev1.Secret{}, handler.EnqueueRequestsFromMapFunc(r.foreignclusterEnqueuer),
builder.WithPredicates(getAuthTokenSecretPredicate())).
Watches(&netv1alpha1.TunnelEndpoint{}, handler.EnqueueRequestsFromMapFunc(r.foreignclusterEnqueuer)).
Watches(&networkingv1alpha1.Connection{}, handler.EnqueueRequestsFromMapFunc(r.clusterIDEnqueuer)).
Watches(&sharingv1alpha1.ResourceOffer{}, handler.EnqueueRequestsFromMapFunc(r.foreignclusterEnqueuer)).
WithOptions(controller.Options{MaxConcurrentReconciles: workers}).
Complete(r)
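Note the enqueuer swap in the Watches registration above: Connection events are now mapped to reconcile requests through the new clusterIDEnqueuer (defined below), which resolves the target ForeignCluster from the object's consts.RemoteClusterID label, rather than through foreignclusterEnqueuer's generic mapping used for the other watched resources.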
@@ -533,7 +539,24 @@ func getPeeringPhase(foreignCluster *discoveryv1alpha1.ForeignCluster,
}
}

func (r *ForeignClusterReconciler) checkTEP(ctx context.Context,
func ensureStatusForConnection(foreignCluster *discoveryv1alpha1.ForeignCluster, connection *networkingv1alpha1.Connection) {
switch connection.Status.Value {
case networkingv1alpha1.Connected:
peeringconditionsutils.EnsureStatus(foreignCluster,
discoveryv1alpha1.NetworkStatusCondition, discoveryv1alpha1.PeeringConditionStatusEstablished,
connectionAvailableReason, fmt.Sprintf(connectionAvailableMessage, foreignCluster.Spec.ClusterIdentity.ClusterID))
case networkingv1alpha1.Connecting:
peeringconditionsutils.EnsureStatus(foreignCluster,
discoveryv1alpha1.NetworkStatusCondition, discoveryv1alpha1.PeeringConditionStatusPending,
connectionConnectingReason, fmt.Sprintf(connectionConnectingMessage, foreignCluster.Spec.ClusterIdentity.ClusterID))
case networkingv1alpha1.ConnectionError:
peeringconditionsutils.EnsureStatus(foreignCluster,
discoveryv1alpha1.NetworkStatusCondition, discoveryv1alpha1.PeeringConditionStatusError,
connectionErrorReason, fmt.Sprintf(connectionErrorMessage, foreignCluster.Spec.ClusterIdentity.ClusterID))
}
}

func (r *ForeignClusterReconciler) checkConnection(ctx context.Context,
foreignCluster *discoveryv1alpha1.ForeignCluster) error {
if r.DisableInternalNetwork {
peeringconditionsutils.EnsureStatus(foreignCluster,
@@ -542,38 +565,55 @@ func (r *ForeignClusterReconciler) checkTEP(ctx context.Context,
return nil
}

var tepList netv1alpha1.TunnelEndpointList
if err := r.Client.List(ctx, &tepList, client.MatchingLabels{
liqoconst.ClusterIDLabelName: foreignCluster.Spec.ClusterIdentity.ClusterID,
}); err != nil {
remoteClusterIDSelector := labels.Set{consts.RemoteClusterID: foreignCluster.Spec.ClusterIdentity.ClusterID}.AsSelector()
connections, err := getters.ListConnectionsByLabel(ctx, r.Client, corev1.NamespaceAll, remoteClusterIDSelector)
if err != nil {
klog.Error(err)
return err
}

if len(tepList.Items) == 0 {
switch len(connections.Items) {
case 0:
peeringconditionsutils.EnsureStatus(foreignCluster,
discoveryv1alpha1.NetworkStatusCondition, discoveryv1alpha1.PeeringConditionStatusNone,
tunnelEndpointNotFoundReason, fmt.Sprintf(tunnelEndpointNotFoundMessage, foreignCluster.Status.TenantNamespace.Local))
} else if len(tepList.Items) > 0 {
tep := &tepList.Items[0]
switch tep.Status.Connection.Status {
case netv1alpha1.Connected:
peeringconditionsutils.EnsureStatus(foreignCluster,
discoveryv1alpha1.NetworkStatusCondition, discoveryv1alpha1.PeeringConditionStatusEstablished,
tunnelEndpointAvailableReason, fmt.Sprintf(tunnelEndpointAvailableMessage, foreignCluster.Status.TenantNamespace.Local))
case netv1alpha1.Connecting:
peeringconditionsutils.EnsureStatus(foreignCluster,
discoveryv1alpha1.NetworkStatusCondition, discoveryv1alpha1.PeeringConditionStatusPending,
tunnelEndpointConnectingReason, fmt.Sprintf(tunnelEndpointConnectingMessage, foreignCluster.Status.TenantNamespace.Local))
case netv1alpha1.ConnectionError:
peeringconditionsutils.EnsureStatus(foreignCluster,
discoveryv1alpha1.NetworkStatusCondition, discoveryv1alpha1.PeeringConditionStatusError,
tunnelEndpointErrorReason, tep.Status.Connection.StatusMessage)
}
connectionNotFoundReason, fmt.Sprintf(connectionNotFoundMessage, foreignCluster.Spec.ClusterIdentity.ClusterID))
case 1:
connection := &connections.Items[0]
ensureStatusForConnection(foreignCluster, connection)
default:
// get the oldest connection
sort.Slice(connections.Items, func(i, j int) bool {
return connections.Items[i].CreationTimestamp.Before(&connections.Items[j].CreationTimestamp)
})
connection := &connections.Items[0]
klog.Errorf("multiple connections found for remote cluster %v, using the oldest one %q",
foreignCluster.Spec.ClusterIdentity.ClusterID, client.ObjectKeyFromObject(connection))
ensureStatusForConnection(foreignCluster, connection)
}

return nil
}

func (r *ForeignClusterReconciler) clusterIDEnqueuer(ctx context.Context, obj client.Object) []ctrl.Request {
if obj.GetLabels() == nil {
return []ctrl.Request{}
}

clusterID, ok := obj.GetLabels()[consts.RemoteClusterID]
if !ok {
return []ctrl.Request{}
}

fc, err := foreignclusterutils.GetForeignClusterByID(ctx, r.Client, clusterID)
if err != nil {
klog.Error(err)
return []ctrl.Request{}
}

klog.V(4).Infof("enqueuing foreigncluster %q for object %q", fc.GetName(), klog.KObj(obj))
return []ctrl.Request{{NamespacedName: types.NamespacedName{Name: fc.GetName()}}}
}

func (r *ForeignClusterReconciler) foreignclusterEnqueuer(_ context.Context, obj client.Object) []ctrl.Request {
gvks, _, err := r.Scheme.ObjectKinds(obj)
// Should never happen, but if it happens we panic.
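checkConnection above relies on getters.ListConnectionsByLabel, which is not part of this diff. A minimal equivalent with the controller-runtime client might look like the following sketch, under the assumption that the helper is a thin typed List wrapper; the real Liqo helper may differ.

// Sketch of a ListConnectionsByLabel-style helper: a thin wrapper around the
// controller-runtime client that lists Connections matching a label selector.
// This is an illustration, not the actual Liqo implementation.
package getters

import (
	"context"

	"k8s.io/apimachinery/pkg/labels"
	"sigs.k8s.io/controller-runtime/pkg/client"

	networkingv1alpha1 "github.com/liqotech/liqo/apis/networking/v1alpha1"
)

// ListConnectionsByLabel returns the Connections in the given namespace
// (corev1.NamespaceAll, i.e. "", for all namespaces) matching the selector.
func ListConnectionsByLabel(ctx context.Context, cl client.Client,
	namespace string, selector labels.Selector) (*networkingv1alpha1.ConnectionList, error) {
	var connections networkingv1alpha1.ConnectionList
	if err := cl.List(ctx, &connections,
		client.InNamespace(namespace),
		client.MatchingLabelsSelector{Selector: selector}); err != nil {
		return nil, err
	}
	return &connections, nil
}

When more than one Connection matches the same remote cluster, the default branch of checkConnection sorts the items by CreationTimestamp and uses the oldest, so the reported status stays deterministic.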
27 changes: 11 additions & 16 deletions pkg/virtualKubelet/liqoNodeProvider/reconciler.go
@@ -25,7 +25,7 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/klog/v2"

netv1alpha1 "github.com/liqotech/liqo/apis/net/v1alpha1"
networkingv1alpha1 "github.com/liqotech/liqo/apis/networking/v1alpha1"
virtualkubeletv1alpha1 "github.com/liqotech/liqo/apis/virtualkubelet/v1alpha1"
"github.com/liqotech/liqo/pkg/consts"
"github.com/liqotech/liqo/pkg/utils/maps"
@@ -51,20 +51,21 @@ func (p *LiqoNodeProvider) reconcileNodeFromVirtualNode(event watch.Event) error
return nil
}

func (p *LiqoNodeProvider) reconcileNodeFromTep(event watch.Event) error {
var tep netv1alpha1.TunnelEndpoint
func (p *LiqoNodeProvider) reconcileNodeFromConnection(event watch.Event) error {
var connection networkingv1alpha1.Connection
unstruct, ok := event.Object.(*unstructured.Unstructured)
if !ok {
return errors.New("error in casting tunnel endpoint: recreate watcher")
return errors.New("error in casting Connection")
}
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstruct.Object, &tep); err != nil {
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstruct.Object, &connection); err != nil {
klog.Error(err)
return err
}

if event.Type == watch.Deleted {
p.updateMutex.Lock()
defer p.updateMutex.Unlock()
klog.Infof("tunnelEndpoint %v deleted", tep.Name)
klog.Infof("connection %v deleted", connection.Name)
p.networkReady = false
err := p.updateNode()
if err != nil {
@@ -73,11 +74,10 @@ func (p *LiqoNodeProvider) reconcileNodeFromTep(event watch.Event) error {
return err
}

if err := p.updateFromTep(&tep); err != nil {
klog.Errorf("node update from tunnelEndpoint %v failed for reason %v; retry...", tep.Name, err)
if err := p.updateFromConnection(&connection); err != nil {
klog.Errorf("node update from connection %v failed for reason %v; retry...", connection.Name, err)
return err
}
klog.Info("correctly set pod CIDR from tunnel endpoint")
return nil
}

@@ -119,16 +119,11 @@ func (p *LiqoNodeProvider) updateFromVirtualNode(ctx context.Context,
return p.updateNode()
}

func (p *LiqoNodeProvider) updateFromTep(tep *netv1alpha1.TunnelEndpoint) error {
func (p *LiqoNodeProvider) updateFromConnection(connection *networkingv1alpha1.Connection) error {
p.updateMutex.Lock()
defer p.updateMutex.Unlock()

// if tep is not connected yet, return
if tep.Status.Connection.Status != netv1alpha1.Connected {
p.networkReady = false
return p.updateNode()
}
p.networkReady = true
p.networkReady = connection.Status.Value == networkingv1alpha1.Connected
return p.updateNode()
}

17 changes: 9 additions & 8 deletions pkg/virtualKubelet/liqoNodeProvider/resourceWatcher.go
@@ -26,7 +26,7 @@ import (
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"

netv1alpha1 "github.com/liqotech/liqo/apis/net/v1alpha1"
networkingv1alpha1 "github.com/liqotech/liqo/apis/networking/v1alpha1"
virtualkubeletv1alpha1 "github.com/liqotech/liqo/apis/virtualkubelet/v1alpha1"
"github.com/liqotech/liqo/pkg/consts"
)
@@ -45,13 +45,14 @@ func (p *LiqoNodeProvider) StartProvider(ctx context.Context) (ready chan struct
_, err := virtualNodeInformer.AddEventHandler(getEventHandler(p.reconcileNodeFromVirtualNode))
runtime.Must(err)

var tepInformerFactory dynamicinformer.DynamicSharedInformerFactory
var connInformerFactory dynamicinformer.DynamicSharedInformerFactory
if p.checkNetworkStatus {
tepInformerFactory = dynamicinformer.NewFilteredDynamicSharedInformerFactory(p.dynClient, p.resyncPeriod, namespace, func(opt *metav1.ListOptions) {
opt.LabelSelector = consts.ClusterIDLabelName + "=" + p.foreignClusterID
})
tepInformer := tepInformerFactory.ForResource(netv1alpha1.TunnelEndpointGroupVersionResource).Informer()
_, err := tepInformer.AddEventHandler(getEventHandler(p.reconcileNodeFromTep))
connInformerFactory = dynamicinformer.NewFilteredDynamicSharedInformerFactory(p.dynClient, p.resyncPeriod, namespace,
func(opt *metav1.ListOptions) {
opt.LabelSelector = consts.RemoteClusterID + "=" + p.foreignClusterID
})
connInformer := connInformerFactory.ForResource(networkingv1alpha1.ConnectionGroupVersionResource).Informer()
_, err := connInformer.AddEventHandler(getEventHandler(p.reconcileNodeFromConnection))
runtime.Must(err)
}

Expand All @@ -60,7 +61,7 @@ func (p *LiqoNodeProvider) StartProvider(ctx context.Context) (ready chan struct
<-ready
go virtualNodeInformerFactory.Start(ctx.Done())
if p.checkNetworkStatus {
go tepInformerFactory.Start(ctx.Done())
go connInformerFactory.Start(ctx.Done())
}
klog.Info("Liqo informers started")
}()
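StartProvider wires both informers through getEventHandler, which this diff does not touch. A handler consistent with its usage here — bridging informer callbacks into the watch.Event-based reconcile functions above — could be sketched as follows; this is an assumption about the helper's shape, not the actual Liqo code.

// Sketch of a getEventHandler-style adapter: it wraps a func(watch.Event)
// error reconciler into the cache.ResourceEventHandler expected by
// AddEventHandler. Illustrative only; the real helper may differ.
package liqonodeprovider

import (
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/tools/cache"
	"k8s.io/klog/v2"
)

func getEventHandler(reconcile func(event watch.Event) error) cache.ResourceEventHandlerFuncs {
	handle := func(eventType watch.EventType, obj interface{}) {
		// Dynamic informers deliver *unstructured.Unstructured objects,
		// which satisfy runtime.Object.
		runtimeObj, ok := obj.(runtime.Object)
		if !ok {
			klog.Errorf("unexpected object type %T", obj)
			return
		}
		if err := reconcile(watch.Event{Type: eventType, Object: runtimeObj}); err != nil {
			klog.Error(err)
		}
	}
	return cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { handle(watch.Added, obj) },
		UpdateFunc: func(_, newObj interface{}) { handle(watch.Modified, newObj) },
		DeleteFunc: func(obj interface{}) { handle(watch.Deleted, obj) },
	}
}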
1 change: 1 addition & 0 deletions pkg/virtualKubelet/roles/local/role.go
@@ -31,6 +31,7 @@ package local

// +kubebuilder:rbac:groups=virtualkubelet.liqo.io,resources=namespacemaps;virtualnodes,verbs=get;list;watch;
// +kubebuilder:rbac:groups=net.liqo.io,resources=tunnelendpoints,verbs=get;list;watch
// +kubebuilder:rbac:groups=networking.liqo.io,resources=connections,verbs=get;list;watch
// +kubebuilder:rbac:groups=discovery.liqo.io,resources=foreignclusters,verbs=get;list;watch
// +kubebuilder:rbac:groups=discovery.liqo.io,resources=foreignclusters/status,verbs=get;list;watch
