Skip to content

Commit

Permalink
Don't swallow errors in Lineage package (#8401)
Browse files Browse the repository at this point in the history
* Don't swallow errors in Lineage package

Signed-off-by: Ali Ok <[email protected]>

* Better error handling and make the fail fast configurable

Signed-off-by: Ali Ok <[email protected]>

---------

Signed-off-by: Ali Ok <[email protected]>
  • Loading branch information
aliok authored Jan 8, 2025
1 parent 81a37ff commit bb6c53c
Showing 1 changed file with 117 additions and 56 deletions.
173 changes: 117 additions & 56 deletions pkg/graph/constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"fmt"

"go.uber.org/zap"
"k8s.io/apimachinery/pkg/api/errors"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand All @@ -37,6 +36,15 @@ import (
)

type ConstructorConfig struct {
// Lenient will cause the graph building to ignore non-fatal errors that occur while listing resources.
//
// For example, if brokers cannot be listed due to a permission error, the graph will still be built with other
// resources.
//
// However, if the brokers cannot be listed due to a network error, the graph building will stop and return an error
// regardless of the Lenient setting.
Lenient bool

EventingClient *eventingclient.Clientset
DynamicClient *dynamic.DynamicClient
Namespaces []string
Expand Down Expand Up @@ -97,19 +105,26 @@ func (g *Graph) fetchBrokers(ctx context.Context, config ConstructorConfig, even

for _, ns := range config.Namespaces {
brokers, err := eventingClient.EventingV1().Brokers(ns).List(ctx, metav1.ListOptions{})
if err != nil && !apierrs.IsNotFound(err) && !apierrs.IsUnauthorized(err) && !apierrs.IsForbidden(err) {
return err

if apierrs.IsNotFound(err) {
continue
}

if apierrs.IsUnauthorized(err) || apierrs.IsForbidden(err) {
if !config.Lenient {
return fmt.Errorf("failed to list brokers: %w", err)
}
logger.Warn("failed to list brokers while constructing lineage graph", zap.Error(err))
continue
}

if err == nil {
for _, broker := range brokers.Items {
if config.ShouldAddBroker == nil || config.ShouldAddBroker(broker) {
g.AddBroker(broker)
}
if err != nil {
return fmt.Errorf("failed to list brokers: %w", err)
}

for _, broker := range brokers.Items {
if config.ShouldAddBroker == nil || config.ShouldAddBroker(broker) {
g.AddBroker(broker)
}
}
}
Expand All @@ -123,21 +138,27 @@ func (g *Graph) fetchChannels(ctx context.Context, config ConstructorConfig, eve
}

for _, ns := range config.Namespaces {

channels, err := eventingClient.MessagingV1().Channels(ns).List(ctx, metav1.ListOptions{})
if err != nil && !apierrs.IsNotFound(err) && !apierrs.IsUnauthorized(err) && !apierrs.IsForbidden(err) {
return err

if apierrs.IsNotFound(err) {
continue
}

if apierrs.IsUnauthorized(err) || apierrs.IsForbidden(err) {
if !config.Lenient {
return fmt.Errorf("failed to list channels: %w", err)
}
logger.Warn("failed to list channels while constructing lineage graph", zap.Error(err))
continue
}

if err == nil {
for _, channel := range channels.Items {
if config.ShouldAddChannel == nil || config.ShouldAddChannel(channel) {
g.AddChannel(channel)
}
if err != nil {
return fmt.Errorf("failed to list channels: %w", err)
}

for _, channel := range channels.Items {
if config.ShouldAddChannel == nil || config.ShouldAddChannel(channel) {
g.AddChannel(channel)
}
}
}
Expand Down Expand Up @@ -171,21 +192,28 @@ func (g *Graph) fetchTriggers(ctx context.Context, config ConstructorConfig, eve

for _, ns := range config.Namespaces {
triggers, err := eventingClient.EventingV1().Triggers(ns).List(ctx, metav1.ListOptions{})
if err != nil && !apierrs.IsNotFound(err) && !apierrs.IsUnauthorized(err) && !apierrs.IsForbidden(err) {
return err

if apierrs.IsNotFound(err) {
continue
}

if apierrs.IsUnauthorized(err) || apierrs.IsForbidden(err) {
if !config.Lenient {
return fmt.Errorf("failed to list triggers: %w", err)
}
logger.Warn("failed to list triggers while constructing lineage graph", zap.Error(err))
continue
}

if err != nil {
return fmt.Errorf("failed to list triggers: %w", err)
}

if err == nil {
for _, trigger := range triggers.Items {
if config.ShouldAddTrigger == nil || config.ShouldAddTrigger(trigger) {
err := g.AddTrigger(trigger)
if err != nil {
return err
}
for _, trigger := range triggers.Items {
if config.ShouldAddTrigger == nil || config.ShouldAddTrigger(trigger) {
err := g.AddTrigger(trigger)
if err != nil {
return err
}
}
}
Expand All @@ -201,21 +229,28 @@ func (g *Graph) fetchSubscriptions(ctx context.Context, config ConstructorConfig

for _, ns := range config.Namespaces {
subscriptions, err := eventingClient.MessagingV1().Subscriptions(ns).List(ctx, metav1.ListOptions{})
if err != nil && !apierrs.IsNotFound(err) && !apierrs.IsUnauthorized(err) && !apierrs.IsForbidden(err) {
return err

if apierrs.IsNotFound(err) {
continue
}

if apierrs.IsUnauthorized(err) || apierrs.IsForbidden(err) {
if !config.Lenient {
return fmt.Errorf("failed to list subscriptions: %w", err)
}
logger.Warn("failed to list subscriptions while constructing lineage graph", zap.Error(err))
continue
}

if err != nil {
return fmt.Errorf("failed to list subscriptions: %w", err)
}

if err == nil {
for _, subscription := range subscriptions.Items {
if config.ShouldAddSubscription == nil || config.ShouldAddSubscription(subscription) {
err := g.AddSubscription(subscription)
if err != nil {
return err
}
for _, subscription := range subscriptions.Items {
if config.ShouldAddSubscription == nil || config.ShouldAddSubscription(subscription) {
err := g.AddSubscription(subscription)
if err != nil {
return err
}
}
}
Expand All @@ -231,21 +266,28 @@ func (g *Graph) fetchEventTypes(ctx context.Context, config ConstructorConfig, e

for _, ns := range config.Namespaces {
eventTypes, err := eventingClient.EventingV1beta3().EventTypes(ns).List(ctx, metav1.ListOptions{})
if err != nil && !apierrs.IsNotFound(err) && !apierrs.IsUnauthorized(err) && !apierrs.IsForbidden(err) {
return err

if apierrs.IsNotFound(err) {
continue
}

if apierrs.IsUnauthorized(err) || apierrs.IsForbidden(err) {
if !config.Lenient {
return fmt.Errorf("failed to list eventtypes: %w", err)
}
logger.Warn("failed to list eventtypes while constructing lineage graph", zap.Error(err))
continue
}

if err != nil {
return fmt.Errorf("failed to list eventtypes: %w", err)
}

if err == nil {
for _, eventType := range eventTypes.Items {
if config.ShouldAddEventType == nil || config.ShouldAddEventType(eventType) {
err := g.AddEventType(eventType)
if err != nil {
return err
}
for _, eventType := range eventTypes.Items {
if config.ShouldAddEventType == nil || config.ShouldAddEventType(eventType) {
err := g.AddEventType(eventType)
if err != nil {
return err
}
}
}
Expand Down Expand Up @@ -435,15 +477,21 @@ func getSources(ctx context.Context, config ConstructorConfig, logger zap.Logger
Resource: "customresourcedefinitions",
},
).List(ctx, metav1.ListOptions{LabelSelector: labels.Set{"duck.knative.dev/source": "true"}.String()})
if err != nil {
if errors.IsNotFound(err) || errors.IsUnauthorized(err) || errors.IsForbidden(err) {
logger.Warn("failed to list source CRDs", zap.Error(err))
// no need to keep processing here, but also this isn't an error that should stop us from
// continuing to build the graph
return nil, nil
} else {
return nil, fmt.Errorf("unable to list source CRDs: %w", err)

if apierrs.IsNotFound(err) {
return nil, nil
}

if apierrs.IsUnauthorized(err) || apierrs.IsForbidden(err) {
if !config.Lenient {
return nil, fmt.Errorf("failed to list source CRDs: %w", err)
}
logger.Warn("failed to list source CRDs while constructing lineage graph", zap.Error(err))
return nil, nil
}

if err != nil {
return nil, fmt.Errorf("failed to list source CRDs: %w", err)
}

duckSources := []duckv1.Source{}
Expand All @@ -457,18 +505,31 @@ func getSources(ctx context.Context, config ConstructorConfig, logger zap.Logger

for _, ns := range config.Namespaces {
sourcesList, err := config.DynamicClient.Resource(sourceGVR).Namespace(ns).List(ctx, metav1.ListOptions{})
if err != nil {
// just log and continue, we may succeed for other sources
logger.Warn("Failed to list sources", zap.Error(err))

if apierrs.IsNotFound(err) {
continue
}

if apierrs.IsUnauthorized(err) || apierrs.IsForbidden(err) {
if !config.Lenient {
return nil, fmt.Errorf("failed to list sources: %w", err)
}
logger.Warn("failed to list sources while constructing lineage graph", zap.Error(err))
continue
}

if err != nil {
return nil, fmt.Errorf("failed to list sources: %w", err)
}

for i := range sourcesList.Items {
unstructuredSource := sourcesList.Items[i]
duckSource, err := duckSourceFromUnstructured(&unstructuredSource)
if err == nil {
duckSources = append(duckSources, duckSource)
if err != nil {
return nil, fmt.Errorf("failed to convert unstructured source to duck source: %w", err)
}

duckSources = append(duckSources, duckSource)
}
}
}
Expand Down

0 comments on commit bb6c53c

Please sign in to comment.