From fe105249e8a52764a1424c62577f65527d557a05 Mon Sep 17 00:00:00 2001 From: Ali Ok Date: Wed, 8 Jan 2025 12:21:20 +0300 Subject: [PATCH 1/2] Don't swallow errors in Lineage package Signed-off-by: Ali Ok --- pkg/graph/constructor.go | 35 ++++++----------------------------- 1 file changed, 6 insertions(+), 29 deletions(-) diff --git a/pkg/graph/constructor.go b/pkg/graph/constructor.go index 488c4e74bcc..cf0ba748bf7 100644 --- a/pkg/graph/constructor.go +++ b/pkg/graph/constructor.go @@ -97,14 +97,10 @@ func (g *Graph) fetchBrokers(ctx context.Context, config ConstructorConfig, even for _, ns := range config.Namespaces { brokers, err := eventingClient.EventingV1().Brokers(ns).List(ctx, metav1.ListOptions{}) - if err != nil && !apierrs.IsNotFound(err) && !apierrs.IsUnauthorized(err) && !apierrs.IsForbidden(err) { + if err != nil && !apierrs.IsNotFound(err) { return err } - if apierrs.IsUnauthorized(err) || apierrs.IsForbidden(err) { - logger.Warn("failed to list brokers while constructing lineage graph", zap.Error(err)) - } - if err == nil { for _, broker := range brokers.Items { if config.ShouldAddBroker == nil || config.ShouldAddBroker(broker) { @@ -125,14 +121,10 @@ func (g *Graph) fetchChannels(ctx context.Context, config ConstructorConfig, eve for _, ns := range config.Namespaces { channels, err := eventingClient.MessagingV1().Channels(ns).List(ctx, metav1.ListOptions{}) - if err != nil && !apierrs.IsNotFound(err) && !apierrs.IsUnauthorized(err) && !apierrs.IsForbidden(err) { + if err != nil && !apierrs.IsNotFound(err) { return err } - if apierrs.IsUnauthorized(err) || apierrs.IsForbidden(err) { - logger.Warn("failed to list channels while constructing lineage graph", zap.Error(err)) - } - if err == nil { for _, channel := range channels.Items { if config.ShouldAddChannel == nil || config.ShouldAddChannel(channel) { @@ -171,14 +163,10 @@ func (g *Graph) fetchTriggers(ctx context.Context, config ConstructorConfig, eve for _, ns := range config.Namespaces { triggers, err := eventingClient.EventingV1().Triggers(ns).List(ctx, metav1.ListOptions{}) - if err != nil && !apierrs.IsNotFound(err) && !apierrs.IsUnauthorized(err) && !apierrs.IsForbidden(err) { + if err != nil && !apierrs.IsNotFound(err) { return err } - if apierrs.IsUnauthorized(err) || apierrs.IsForbidden(err) { - logger.Warn("failed to list triggers while constructing lineage graph", zap.Error(err)) - } - if err == nil { for _, trigger := range triggers.Items { if config.ShouldAddTrigger == nil || config.ShouldAddTrigger(trigger) { @@ -201,14 +189,10 @@ func (g *Graph) fetchSubscriptions(ctx context.Context, config ConstructorConfig for _, ns := range config.Namespaces { subscriptions, err := eventingClient.MessagingV1().Subscriptions(ns).List(ctx, metav1.ListOptions{}) - if err != nil && !apierrs.IsNotFound(err) && !apierrs.IsUnauthorized(err) && !apierrs.IsForbidden(err) { + if err != nil && !apierrs.IsNotFound(err) { return err } - if apierrs.IsUnauthorized(err) || apierrs.IsForbidden(err) { - logger.Warn("failed to list subscriptions while constructing lineage graph", zap.Error(err)) - } - if err == nil { for _, subscription := range subscriptions.Items { if config.ShouldAddSubscription == nil || config.ShouldAddSubscription(subscription) { @@ -231,14 +215,10 @@ func (g *Graph) fetchEventTypes(ctx context.Context, config ConstructorConfig, e for _, ns := range config.Namespaces { eventTypes, err := eventingClient.EventingV1beta3().EventTypes(ns).List(ctx, metav1.ListOptions{}) - if err != nil && !apierrs.IsNotFound(err) && !apierrs.IsUnauthorized(err) && !apierrs.IsForbidden(err) { + if err != nil && !apierrs.IsNotFound(err) { return err } - if apierrs.IsUnauthorized(err) || apierrs.IsForbidden(err) { - logger.Warn("failed to list eventtypes while constructing lineage graph", zap.Error(err)) - } - if err == nil { for _, eventType := range eventTypes.Items { if config.ShouldAddEventType == nil || config.ShouldAddEventType(eventType) { @@ -436,10 +416,7 @@ func getSources(ctx context.Context, config ConstructorConfig, logger zap.Logger }, ).List(ctx, metav1.ListOptions{LabelSelector: labels.Set{"duck.knative.dev/source": "true"}.String()}) if err != nil { - if errors.IsNotFound(err) || errors.IsUnauthorized(err) || errors.IsForbidden(err) { - logger.Warn("failed to list source CRDs", zap.Error(err)) - // no need to keep processing here, but also this isn't an error that should stop us from - // continuing to build the graph + if errors.IsNotFound(err) { return nil, nil } else { return nil, fmt.Errorf("unable to list source CRDs: %w", err) From 32e36bf8186993d568284ffb4781c7d96aec3abd Mon Sep 17 00:00:00 2001 From: Ali Ok Date: Wed, 8 Jan 2025 12:55:56 +0300 Subject: [PATCH 2/2] Better error handling and make the fail fast configurable Signed-off-by: Ali Ok --- pkg/graph/constructor.go | 190 ++++++++++++++++++++++++++++----------- 1 file changed, 137 insertions(+), 53 deletions(-) diff --git a/pkg/graph/constructor.go b/pkg/graph/constructor.go index cf0ba748bf7..cdb370612f0 100644 --- a/pkg/graph/constructor.go +++ b/pkg/graph/constructor.go @@ -22,7 +22,6 @@ import ( "fmt" "go.uber.org/zap" - "k8s.io/apimachinery/pkg/api/errors" apierrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -37,6 +36,15 @@ import ( ) type ConstructorConfig struct { + // Lenient will cause the graph building to ignore non-fatal errors that occur while listing resources. + // + // For example, if brokers cannot be listed due to a permission error, the graph will still be built with other + // resources. + // + // However, if the brokers cannot be listed due to a network error, the graph building will stop and return an error + // regardless of the Lenient setting. + Lenient bool + EventingClient *eventingclient.Clientset DynamicClient *dynamic.DynamicClient Namespaces []string @@ -97,15 +105,26 @@ func (g *Graph) fetchBrokers(ctx context.Context, config ConstructorConfig, even for _, ns := range config.Namespaces { brokers, err := eventingClient.EventingV1().Brokers(ns).List(ctx, metav1.ListOptions{}) - if err != nil && !apierrs.IsNotFound(err) { - return err + + if apierrs.IsNotFound(err) { + continue } - if err == nil { - for _, broker := range brokers.Items { - if config.ShouldAddBroker == nil || config.ShouldAddBroker(broker) { - g.AddBroker(broker) - } + if apierrs.IsUnauthorized(err) || apierrs.IsForbidden(err) { + if !config.Lenient { + return fmt.Errorf("failed to list brokers: %w", err) + } + logger.Warn("failed to list brokers while constructing lineage graph", zap.Error(err)) + continue + } + + if err != nil { + return fmt.Errorf("failed to list brokers: %w", err) + } + + for _, broker := range brokers.Items { + if config.ShouldAddBroker == nil || config.ShouldAddBroker(broker) { + g.AddBroker(broker) } } } @@ -119,17 +138,27 @@ func (g *Graph) fetchChannels(ctx context.Context, config ConstructorConfig, eve } for _, ns := range config.Namespaces { - channels, err := eventingClient.MessagingV1().Channels(ns).List(ctx, metav1.ListOptions{}) - if err != nil && !apierrs.IsNotFound(err) { - return err + + if apierrs.IsNotFound(err) { + continue } - if err == nil { - for _, channel := range channels.Items { - if config.ShouldAddChannel == nil || config.ShouldAddChannel(channel) { - g.AddChannel(channel) - } + if apierrs.IsUnauthorized(err) || apierrs.IsForbidden(err) { + if !config.Lenient { + return fmt.Errorf("failed to list channels: %w", err) + } + logger.Warn("failed to list channels while constructing lineage graph", zap.Error(err)) + continue + } + + if err != nil { + return fmt.Errorf("failed to list channels: %w", err) + } + + for _, channel := range channels.Items { + if config.ShouldAddChannel == nil || config.ShouldAddChannel(channel) { + g.AddChannel(channel) } } } @@ -163,17 +192,28 @@ func (g *Graph) fetchTriggers(ctx context.Context, config ConstructorConfig, eve for _, ns := range config.Namespaces { triggers, err := eventingClient.EventingV1().Triggers(ns).List(ctx, metav1.ListOptions{}) - if err != nil && !apierrs.IsNotFound(err) { - return err + + if apierrs.IsNotFound(err) { + continue + } + + if apierrs.IsUnauthorized(err) || apierrs.IsForbidden(err) { + if !config.Lenient { + return fmt.Errorf("failed to list triggers: %w", err) + } + logger.Warn("failed to list triggers while constructing lineage graph", zap.Error(err)) + continue + } + + if err != nil { + return fmt.Errorf("failed to list triggers: %w", err) } - if err == nil { - for _, trigger := range triggers.Items { - if config.ShouldAddTrigger == nil || config.ShouldAddTrigger(trigger) { - err := g.AddTrigger(trigger) - if err != nil { - return err - } + for _, trigger := range triggers.Items { + if config.ShouldAddTrigger == nil || config.ShouldAddTrigger(trigger) { + err := g.AddTrigger(trigger) + if err != nil { + return err } } } @@ -189,17 +229,28 @@ func (g *Graph) fetchSubscriptions(ctx context.Context, config ConstructorConfig for _, ns := range config.Namespaces { subscriptions, err := eventingClient.MessagingV1().Subscriptions(ns).List(ctx, metav1.ListOptions{}) - if err != nil && !apierrs.IsNotFound(err) { - return err + + if apierrs.IsNotFound(err) { + continue } - if err == nil { - for _, subscription := range subscriptions.Items { - if config.ShouldAddSubscription == nil || config.ShouldAddSubscription(subscription) { - err := g.AddSubscription(subscription) - if err != nil { - return err - } + if apierrs.IsUnauthorized(err) || apierrs.IsForbidden(err) { + if !config.Lenient { + return fmt.Errorf("failed to list subscriptions: %w", err) + } + logger.Warn("failed to list subscriptions while constructing lineage graph", zap.Error(err)) + continue + } + + if err != nil { + return fmt.Errorf("failed to list subscriptions: %w", err) + } + + for _, subscription := range subscriptions.Items { + if config.ShouldAddSubscription == nil || config.ShouldAddSubscription(subscription) { + err := g.AddSubscription(subscription) + if err != nil { + return err } } } @@ -215,17 +266,28 @@ func (g *Graph) fetchEventTypes(ctx context.Context, config ConstructorConfig, e for _, ns := range config.Namespaces { eventTypes, err := eventingClient.EventingV1beta3().EventTypes(ns).List(ctx, metav1.ListOptions{}) - if err != nil && !apierrs.IsNotFound(err) { - return err + + if apierrs.IsNotFound(err) { + continue } - if err == nil { - for _, eventType := range eventTypes.Items { - if config.ShouldAddEventType == nil || config.ShouldAddEventType(eventType) { - err := g.AddEventType(eventType) - if err != nil { - return err - } + if apierrs.IsUnauthorized(err) || apierrs.IsForbidden(err) { + if !config.Lenient { + return fmt.Errorf("failed to list eventtypes: %w", err) + } + logger.Warn("failed to list eventtypes while constructing lineage graph", zap.Error(err)) + continue + } + + if err != nil { + return fmt.Errorf("failed to list eventtypes: %w", err) + } + + for _, eventType := range eventTypes.Items { + if config.ShouldAddEventType == nil || config.ShouldAddEventType(eventType) { + err := g.AddEventType(eventType) + if err != nil { + return err } } } @@ -415,12 +477,21 @@ func getSources(ctx context.Context, config ConstructorConfig, logger zap.Logger Resource: "customresourcedefinitions", }, ).List(ctx, metav1.ListOptions{LabelSelector: labels.Set{"duck.knative.dev/source": "true"}.String()}) - if err != nil { - if errors.IsNotFound(err) { - return nil, nil - } else { - return nil, fmt.Errorf("unable to list source CRDs: %w", err) + + if apierrs.IsNotFound(err) { + return nil, nil + } + + if apierrs.IsUnauthorized(err) || apierrs.IsForbidden(err) { + if !config.Lenient { + return nil, fmt.Errorf("failed to list source CRDs: %w", err) } + logger.Warn("failed to list source CRDs while constructing lineage graph", zap.Error(err)) + return nil, nil + } + + if err != nil { + return nil, fmt.Errorf("failed to list source CRDs: %w", err) } duckSources := []duckv1.Source{} @@ -434,18 +505,31 @@ func getSources(ctx context.Context, config ConstructorConfig, logger zap.Logger for _, ns := range config.Namespaces { sourcesList, err := config.DynamicClient.Resource(sourceGVR).Namespace(ns).List(ctx, metav1.ListOptions{}) - if err != nil { - // just log and continue, we may succeed for other sources - logger.Warn("Failed to list sources", zap.Error(err)) + + if apierrs.IsNotFound(err) { continue } + if apierrs.IsUnauthorized(err) || apierrs.IsForbidden(err) { + if !config.Lenient { + return nil, fmt.Errorf("failed to list sources: %w", err) + } + logger.Warn("failed to list sources while constructing lineage graph", zap.Error(err)) + continue + } + + if err != nil { + return nil, fmt.Errorf("failed to list sources: %w", err) + } + for i := range sourcesList.Items { unstructuredSource := sourcesList.Items[i] duckSource, err := duckSourceFromUnstructured(&unstructuredSource) - if err == nil { - duckSources = append(duckSources, duckSource) + if err != nil { + return nil, fmt.Errorf("failed to convert unstructured source to duck source: %w", err) } + + duckSources = append(duckSources, duckSource) } } }