diff --git a/pkg/graph/constructor.go b/pkg/graph/constructor.go index 2cd4e6d43f4..488c4e74bcc 100644 --- a/pkg/graph/constructor.go +++ b/pkg/graph/constructor.go @@ -29,8 +29,6 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" - rest "k8s.io/client-go/rest" - eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" eventingv1beta3 "knative.dev/eventing/pkg/apis/eventing/v1beta3" messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" @@ -39,7 +37,8 @@ import ( ) type ConstructorConfig struct { - RestConfig rest.Config + EventingClient *eventingclient.Clientset + DynamicClient *dynamic.DynamicClient Namespaces []string ShouldAddBroker func(b eventingv1.Broker) bool FetchBrokers bool @@ -56,39 +55,34 @@ type ConstructorConfig struct { } func ConstructGraph(ctx context.Context, config ConstructorConfig, logger zap.Logger) (*Graph, error) { - eventingClient, err := eventingclient.NewForConfig(&config.RestConfig) - if err != nil { - return nil, err - } - g := NewGraph() - err = g.fetchBrokers(ctx, config, eventingClient, logger) + err := g.fetchBrokers(ctx, config, config.EventingClient, logger) if err != nil { return nil, err } - err = g.fetchChannels(ctx, config, eventingClient, logger) + err = g.fetchChannels(ctx, config, config.EventingClient, logger) if err != nil { return nil, err } - err = g.fetchSources(ctx, config, eventingClient, logger) + err = g.fetchSources(ctx, config, config.EventingClient, logger) if err != nil { return nil, err } - err = g.fetchTriggers(ctx, config, eventingClient, logger) + err = g.fetchTriggers(ctx, config, config.EventingClient, logger) if err != nil { return nil, err } - err = g.fetchSubscriptions(ctx, config, eventingClient, logger) + err = g.fetchSubscriptions(ctx, config, config.EventingClient, logger) if err != nil { return nil, err } - err = g.fetchEventTypes(ctx, config, eventingClient, logger) + err = g.fetchEventTypes(ctx, config, config.EventingClient, logger) if err != nil { return nil, err } @@ -376,7 +370,7 @@ func (g *Graph) AddTrigger(trigger eventingv1.Trigger) error { to := g.getOrCreateVertex(&trigger.Spec.Subscriber, nil) - //TODO: the transform function should be set according to the trigger filter - there are multiple open issues to address this later + // TODO: the transform function should be set according to the trigger filter - there are multiple open issues to address this later broker.AddEdge(to, triggerDest, getTransformForTrigger(trigger), false) if trigger.Spec.Delivery == nil || trigger.Spec.Delivery.DeadLetterSink == nil { @@ -434,12 +428,7 @@ func (g *Graph) AddSubscription(subscription messagingv1.Subscription) error { } func getSources(ctx context.Context, config ConstructorConfig, logger zap.Logger) ([]duckv1.Source, error) { - client, err := dynamic.NewForConfig(&config.RestConfig) - if err != nil { - return nil, err - } - - sourceCRDs, err := client.Resource( + sourceCRDs, err := config.DynamicClient.Resource( schema.GroupVersionResource{ Group: "apiextentions.k8s.io", Version: "v1", @@ -467,7 +456,7 @@ func getSources(ctx context.Context, config ConstructorConfig, logger zap.Logger } for _, ns := range config.Namespaces { - sourcesList, err := client.Resource(sourceGVR).Namespace(ns).List(ctx, metav1.ListOptions{}) + sourcesList, err := config.DynamicClient.Resource(sourceGVR).Namespace(ns).List(ctx, metav1.ListOptions{}) if err != nil { // just log and continue, we may succeed for other sources logger.Warn("Failed to list sources", zap.Error(err))