Skip to content

Commit

Permalink
Lineage constructor to accept clients instead of creating them (#8399)
Browse files Browse the repository at this point in the history
Signed-off-by: Ali Ok <[email protected]>
  • Loading branch information
aliok authored Jan 8, 2025
1 parent 52792ea commit 81a37ff
Showing 1 changed file with 11 additions and 22 deletions.
33 changes: 11 additions & 22 deletions pkg/graph/constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
rest "k8s.io/client-go/rest"

eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
eventingv1beta3 "knative.dev/eventing/pkg/apis/eventing/v1beta3"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
Expand All @@ -39,7 +37,8 @@ import (
)

type ConstructorConfig struct {
RestConfig rest.Config
EventingClient *eventingclient.Clientset
DynamicClient *dynamic.DynamicClient
Namespaces []string
ShouldAddBroker func(b eventingv1.Broker) bool
FetchBrokers bool
Expand All @@ -56,39 +55,34 @@ type ConstructorConfig struct {
}

func ConstructGraph(ctx context.Context, config ConstructorConfig, logger zap.Logger) (*Graph, error) {
eventingClient, err := eventingclient.NewForConfig(&config.RestConfig)
if err != nil {
return nil, err
}

g := NewGraph()

err = g.fetchBrokers(ctx, config, eventingClient, logger)
err := g.fetchBrokers(ctx, config, config.EventingClient, logger)
if err != nil {
return nil, err
}

err = g.fetchChannels(ctx, config, eventingClient, logger)
err = g.fetchChannels(ctx, config, config.EventingClient, logger)
if err != nil {
return nil, err
}

err = g.fetchSources(ctx, config, eventingClient, logger)
err = g.fetchSources(ctx, config, config.EventingClient, logger)
if err != nil {
return nil, err
}

err = g.fetchTriggers(ctx, config, eventingClient, logger)
err = g.fetchTriggers(ctx, config, config.EventingClient, logger)
if err != nil {
return nil, err
}

err = g.fetchSubscriptions(ctx, config, eventingClient, logger)
err = g.fetchSubscriptions(ctx, config, config.EventingClient, logger)
if err != nil {
return nil, err
}

err = g.fetchEventTypes(ctx, config, eventingClient, logger)
err = g.fetchEventTypes(ctx, config, config.EventingClient, logger)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -376,7 +370,7 @@ func (g *Graph) AddTrigger(trigger eventingv1.Trigger) error {

to := g.getOrCreateVertex(&trigger.Spec.Subscriber, nil)

//TODO: the transform function should be set according to the trigger filter - there are multiple open issues to address this later
// TODO: the transform function should be set according to the trigger filter - there are multiple open issues to address this later
broker.AddEdge(to, triggerDest, getTransformForTrigger(trigger), false)

if trigger.Spec.Delivery == nil || trigger.Spec.Delivery.DeadLetterSink == nil {
Expand Down Expand Up @@ -434,12 +428,7 @@ func (g *Graph) AddSubscription(subscription messagingv1.Subscription) error {
}

func getSources(ctx context.Context, config ConstructorConfig, logger zap.Logger) ([]duckv1.Source, error) {
client, err := dynamic.NewForConfig(&config.RestConfig)
if err != nil {
return nil, err
}

sourceCRDs, err := client.Resource(
sourceCRDs, err := config.DynamicClient.Resource(
schema.GroupVersionResource{
Group: "apiextentions.k8s.io",
Version: "v1",
Expand Down Expand Up @@ -467,7 +456,7 @@ func getSources(ctx context.Context, config ConstructorConfig, logger zap.Logger
}

for _, ns := range config.Namespaces {
sourcesList, err := client.Resource(sourceGVR).Namespace(ns).List(ctx, metav1.ListOptions{})
sourcesList, err := config.DynamicClient.Resource(sourceGVR).Namespace(ns).List(ctx, metav1.ListOptions{})
if err != nil {
// just log and continue, we may succeed for other sources
logger.Warn("Failed to list sources", zap.Error(err))
Expand Down

0 comments on commit 81a37ff

Please sign in to comment.