Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't swallow errors in Lineage package #8401

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 117 additions & 56 deletions pkg/graph/constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"fmt"

"go.uber.org/zap"
"k8s.io/apimachinery/pkg/api/errors"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand All @@ -37,6 +36,15 @@ import (
)

type ConstructorConfig struct {
// Lenient will cause the graph building to ignore non-fatal errors that occur while listing resources.
//
// For example, if brokers cannot be listed due to a permission error, the graph will still be built with other
// resources.
//
// However, if the brokers cannot be listed due to a network error, the graph building will stop and return an error
// regardless of the Lenient setting.
Lenient bool

EventingClient *eventingclient.Clientset
DynamicClient *dynamic.DynamicClient
Namespaces []string
Expand Down Expand Up @@ -97,19 +105,26 @@ func (g *Graph) fetchBrokers(ctx context.Context, config ConstructorConfig, even

for _, ns := range config.Namespaces {
brokers, err := eventingClient.EventingV1().Brokers(ns).List(ctx, metav1.ListOptions{})
if err != nil && !apierrs.IsNotFound(err) && !apierrs.IsUnauthorized(err) && !apierrs.IsForbidden(err) {
return err

if apierrs.IsNotFound(err) {
continue
}

if apierrs.IsUnauthorized(err) || apierrs.IsForbidden(err) {
if !config.Lenient {
return fmt.Errorf("failed to list brokers: %w", err)
}
logger.Warn("failed to list brokers while constructing lineage graph", zap.Error(err))
continue
}

if err == nil {
for _, broker := range brokers.Items {
if config.ShouldAddBroker == nil || config.ShouldAddBroker(broker) {
g.AddBroker(broker)
}
if err != nil {
return fmt.Errorf("failed to list brokers: %w", err)
}

for _, broker := range brokers.Items {
if config.ShouldAddBroker == nil || config.ShouldAddBroker(broker) {
g.AddBroker(broker)
}
}
}
Expand All @@ -123,21 +138,27 @@ func (g *Graph) fetchChannels(ctx context.Context, config ConstructorConfig, eve
}

for _, ns := range config.Namespaces {

channels, err := eventingClient.MessagingV1().Channels(ns).List(ctx, metav1.ListOptions{})
if err != nil && !apierrs.IsNotFound(err) && !apierrs.IsUnauthorized(err) && !apierrs.IsForbidden(err) {
return err

if apierrs.IsNotFound(err) {
continue
}

if apierrs.IsUnauthorized(err) || apierrs.IsForbidden(err) {
if !config.Lenient {
return fmt.Errorf("failed to list channels: %w", err)
}
logger.Warn("failed to list channels while constructing lineage graph", zap.Error(err))
continue
}

if err == nil {
for _, channel := range channels.Items {
if config.ShouldAddChannel == nil || config.ShouldAddChannel(channel) {
g.AddChannel(channel)
}
if err != nil {
return fmt.Errorf("failed to list channels: %w", err)
}

for _, channel := range channels.Items {
if config.ShouldAddChannel == nil || config.ShouldAddChannel(channel) {
g.AddChannel(channel)
}
}
}
Expand Down Expand Up @@ -171,21 +192,28 @@ func (g *Graph) fetchTriggers(ctx context.Context, config ConstructorConfig, eve

for _, ns := range config.Namespaces {
triggers, err := eventingClient.EventingV1().Triggers(ns).List(ctx, metav1.ListOptions{})
if err != nil && !apierrs.IsNotFound(err) && !apierrs.IsUnauthorized(err) && !apierrs.IsForbidden(err) {
return err

if apierrs.IsNotFound(err) {
continue
}

if apierrs.IsUnauthorized(err) || apierrs.IsForbidden(err) {
if !config.Lenient {
return fmt.Errorf("failed to list triggers: %w", err)
}
logger.Warn("failed to list triggers while constructing lineage graph", zap.Error(err))
continue
}

if err != nil {
return fmt.Errorf("failed to list triggers: %w", err)
}

if err == nil {
for _, trigger := range triggers.Items {
if config.ShouldAddTrigger == nil || config.ShouldAddTrigger(trigger) {
err := g.AddTrigger(trigger)
if err != nil {
return err
}
for _, trigger := range triggers.Items {
if config.ShouldAddTrigger == nil || config.ShouldAddTrigger(trigger) {
err := g.AddTrigger(trigger)
if err != nil {
return err
}
}
}
Expand All @@ -201,21 +229,28 @@ func (g *Graph) fetchSubscriptions(ctx context.Context, config ConstructorConfig

for _, ns := range config.Namespaces {
subscriptions, err := eventingClient.MessagingV1().Subscriptions(ns).List(ctx, metav1.ListOptions{})
if err != nil && !apierrs.IsNotFound(err) && !apierrs.IsUnauthorized(err) && !apierrs.IsForbidden(err) {
return err

if apierrs.IsNotFound(err) {
continue
}

if apierrs.IsUnauthorized(err) || apierrs.IsForbidden(err) {
if !config.Lenient {
return fmt.Errorf("failed to list subscriptions: %w", err)
}
logger.Warn("failed to list subscriptions while constructing lineage graph", zap.Error(err))
continue
}

if err != nil {
return fmt.Errorf("failed to list subscriptions: %w", err)
}

if err == nil {
for _, subscription := range subscriptions.Items {
if config.ShouldAddSubscription == nil || config.ShouldAddSubscription(subscription) {
err := g.AddSubscription(subscription)
if err != nil {
return err
}
for _, subscription := range subscriptions.Items {
if config.ShouldAddSubscription == nil || config.ShouldAddSubscription(subscription) {
err := g.AddSubscription(subscription)
if err != nil {
return err
}
}
}
Expand All @@ -231,21 +266,28 @@ func (g *Graph) fetchEventTypes(ctx context.Context, config ConstructorConfig, e

for _, ns := range config.Namespaces {
eventTypes, err := eventingClient.EventingV1beta3().EventTypes(ns).List(ctx, metav1.ListOptions{})
if err != nil && !apierrs.IsNotFound(err) && !apierrs.IsUnauthorized(err) && !apierrs.IsForbidden(err) {
return err

if apierrs.IsNotFound(err) {
continue
}

if apierrs.IsUnauthorized(err) || apierrs.IsForbidden(err) {
if !config.Lenient {
return fmt.Errorf("failed to list eventtypes: %w", err)
}
logger.Warn("failed to list eventtypes while constructing lineage graph", zap.Error(err))
continue
}

if err != nil {
return fmt.Errorf("failed to list eventtypes: %w", err)
}

if err == nil {
for _, eventType := range eventTypes.Items {
if config.ShouldAddEventType == nil || config.ShouldAddEventType(eventType) {
err := g.AddEventType(eventType)
if err != nil {
return err
}
for _, eventType := range eventTypes.Items {
if config.ShouldAddEventType == nil || config.ShouldAddEventType(eventType) {
err := g.AddEventType(eventType)
if err != nil {
return err
}
}
}
Expand Down Expand Up @@ -435,15 +477,21 @@ func getSources(ctx context.Context, config ConstructorConfig, logger zap.Logger
Resource: "customresourcedefinitions",
},
).List(ctx, metav1.ListOptions{LabelSelector: labels.Set{"duck.knative.dev/source": "true"}.String()})
if err != nil {
if errors.IsNotFound(err) || errors.IsUnauthorized(err) || errors.IsForbidden(err) {
logger.Warn("failed to list source CRDs", zap.Error(err))
// no need to keep processing here, but also this isn't an error that should stop us from
// continuing to build the graph
return nil, nil
} else {
return nil, fmt.Errorf("unable to list source CRDs: %w", err)

if apierrs.IsNotFound(err) {
return nil, nil
}

if apierrs.IsUnauthorized(err) || apierrs.IsForbidden(err) {
if !config.Lenient {
return nil, fmt.Errorf("failed to list source CRDs: %w", err)
}
logger.Warn("failed to list source CRDs while constructing lineage graph", zap.Error(err))
return nil, nil
}

if err != nil {
return nil, fmt.Errorf("failed to list source CRDs: %w", err)
}

duckSources := []duckv1.Source{}
Expand All @@ -457,18 +505,31 @@ func getSources(ctx context.Context, config ConstructorConfig, logger zap.Logger

for _, ns := range config.Namespaces {
sourcesList, err := config.DynamicClient.Resource(sourceGVR).Namespace(ns).List(ctx, metav1.ListOptions{})
if err != nil {
// just log and continue, we may succeed for other sources
logger.Warn("Failed to list sources", zap.Error(err))

if apierrs.IsNotFound(err) {
continue
}

if apierrs.IsUnauthorized(err) || apierrs.IsForbidden(err) {
if !config.Lenient {
return nil, fmt.Errorf("failed to list sources: %w", err)
}
logger.Warn("failed to list sources while constructing lineage graph", zap.Error(err))
continue
}

if err != nil {
return nil, fmt.Errorf("failed to list sources: %w", err)
}

for i := range sourcesList.Items {
unstructuredSource := sourcesList.Items[i]
duckSource, err := duckSourceFromUnstructured(&unstructuredSource)
if err == nil {
duckSources = append(duckSources, duckSource)
if err != nil {
return nil, fmt.Errorf("failed to convert unstructured source to duck source: %w", err)
}

duckSources = append(duckSources, duckSource)
}
}
Comment on lines 525 to 534
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made a behavior change here.

Before:

  • Ignore the issue that the unstructrued source cannot be converted to a duck source

After:

  • Do not ignore it, even with Lenient=true

}
Expand Down
Loading