From 79bb385f8a3d9a03e87942cc5980ce403c5d6be6 Mon Sep 17 00:00:00 2001 From: cola <45722758+xiangpingjiang@users.noreply.github.com> Date: Mon, 4 Dec 2023 21:31:17 +0800 Subject: [PATCH] mt-broker-filter: reject request for wrong audience (#7427) * mt-broker-filter: reject request for wrong audience Signed-off-by: pingjiang * add subscriptions rules Signed-off-by: pingjiang * remove subscriptions lister Signed-off-by: pingjiang * fix e2e Signed-off-by: pingjiang * move FilterAudience const to pkg/broker/filter Signed-off-by: pingjiang * revert config/brokers/mt-channel-broker/roles/filter-clusterrole.yaml Signed-off-by: pingjiang * revert Signed-off-by: pingjiang --------- Signed-off-by: pingjiang --- cmd/broker/filter/main.go | 4 ++- pkg/broker/filter/filter_handler.go | 25 ++++++++++++++++++- pkg/broker/filter/filter_handler_test.go | 4 +++ pkg/reconciler/broker/trigger/trigger.go | 5 ++-- pkg/reconciler/broker/trigger/trigger_test.go | 5 ++-- 5 files changed, 36 insertions(+), 7 deletions(-) diff --git a/cmd/broker/filter/main.go b/cmd/broker/filter/main.go index 94709479205..ef527bcace3 100644 --- a/cmd/broker/filter/main.go +++ b/cmd/broker/filter/main.go @@ -77,6 +77,7 @@ func main() { log.Printf("Registering %d informers", len(injection.Default.GetInformers())) ctx, informers := injection.Default.SetupInformers(ctx, cfg) + ctx = injection.WithConfig(ctx, cfg) kubeClient := kubeclient.Get(ctx) loggingConfig, err := broker.GetLoggingConfig(ctx, system.Namespace(), logging.ConfigMapName()) @@ -123,7 +124,8 @@ func main() { oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) // We are running both the receiver (takes messages in from the Broker) and the dispatcher (send // the messages to the triggers' subscribers) in this binary. - handler, err := filter.NewHandler(logger, oidcTokenProvider, triggerinformer.Get(ctx), reporter, ctxFunc) + oidcTokenVerifier := auth.NewOIDCTokenVerifier(ctx) + handler, err := filter.NewHandler(logger, oidcTokenVerifier, oidcTokenProvider, triggerinformer.Get(ctx), reporter, ctxFunc) if err != nil { logger.Fatal("Error creating Handler", zap.Error(err)) } diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index 60ce851f09e..4e5fe49e093 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -64,6 +64,8 @@ const ( // based on what serving is doing. See https://github.com/knative/serving/blob/main/pkg/network/transports.go. defaultMaxIdleConnections = 1000 defaultMaxIdleConnectionsPerHost = 100 + + FilterAudience = "mt-broker-filter" ) // Handler parses Cloud Events, determines if they pass a filter, and sends them to a subscriber. @@ -77,10 +79,11 @@ type Handler struct { logger *zap.Logger withContext func(ctx context.Context) context.Context filtersMap *subscriptionsapi.FiltersMap + tokenVerifier *auth.OIDCTokenVerifier } // NewHandler creates a new Handler and its associated EventReceiver. -func NewHandler(logger *zap.Logger, oidcTokenProvider *auth.OIDCTokenProvider, triggerInformer v1.TriggerInformer, reporter StatsReporter, wc func(ctx context.Context) context.Context) (*Handler, error) { +func NewHandler(logger *zap.Logger, tokenVerifier *auth.OIDCTokenVerifier, oidcTokenProvider *auth.OIDCTokenProvider, triggerInformer v1.TriggerInformer, reporter StatsReporter, wc func(ctx context.Context) context.Context) (*Handler, error) { kncloudevents.ConfigureConnectionArgs(&kncloudevents.ConnectionArgs{ MaxIdleConns: defaultMaxIdleConnections, MaxIdleConnsPerHost: defaultMaxIdleConnectionsPerHost, @@ -132,6 +135,7 @@ func NewHandler(logger *zap.Logger, oidcTokenProvider *auth.OIDCTokenProvider, t eventDispatcher: kncloudevents.NewDispatcher(oidcTokenProvider), triggerLister: triggerInformer.Lister(), logger: logger, + tokenVerifier: tokenVerifier, withContext: wc, filtersMap: fm, }, nil @@ -204,6 +208,25 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { return } + features := feature.FromContext(ctx) + if features.IsOIDCAuthentication() { + h.logger.Debug("OIDC authentication is enabled") + token := auth.GetJWTFromHeader(request.Header) + if token == "" { + h.logger.Warn(fmt.Sprintf("No JWT in %s header provided while feature %s is enabled", auth.AuthHeaderKey, feature.OIDCAuthentication)) + writer.WriteHeader(http.StatusUnauthorized) + return + } + + if _, err := h.tokenVerifier.VerifyJWT(ctx, token, FilterAudience); err != nil { + h.logger.Warn("no valid JWT provided", zap.Error(err)) + writer.WriteHeader(http.StatusUnauthorized) + return + } + + h.logger.Debug("Request contained a valid JWT. Continuing...") + } + reportArgs := &ReportArgs{ ns: t.Namespace, trigger: t.Name, diff --git a/pkg/broker/filter/filter_handler_test.go b/pkg/broker/filter/filter_handler_test.go index d7a4c58c14b..4fec820722e 100644 --- a/pkg/broker/filter/filter_handler_test.go +++ b/pkg/broker/filter/filter_handler_test.go @@ -431,6 +431,7 @@ func TestReceiver(t *testing.T) { logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) + oidcTokenVerifier := auth.NewOIDCTokenVerifier(ctx) // Replace the SubscriberURI to point at our fake server. for _, trig := range tc.triggers { @@ -447,6 +448,7 @@ func TestReceiver(t *testing.T) { reporter := &mockReporter{} r, err := NewHandler( logger, + oidcTokenVerifier, oidcTokenProvider, triggerinformerfake.Get(ctx), reporter, @@ -616,6 +618,7 @@ func TestReceiver_WithSubscriptionsAPI(t *testing.T) { logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) + oidcTokenVerifier := auth.NewOIDCTokenVerifier(ctx) // Replace the SubscriberURI to point at our fake server. for _, trig := range tc.triggers { @@ -633,6 +636,7 @@ func TestReceiver_WithSubscriptionsAPI(t *testing.T) { reporter := &mockReporter{} r, err := NewHandler( logger, + oidcTokenVerifier, oidcTokenProvider, triggerinformerfake.Get(ctx), reporter, diff --git a/pkg/reconciler/broker/trigger/trigger.go b/pkg/reconciler/broker/trigger/trigger.go index edbdd057e23..c9071e3e2ee 100644 --- a/pkg/reconciler/broker/trigger/trigger.go +++ b/pkg/reconciler/broker/trigger/trigger.go @@ -46,6 +46,7 @@ import ( "knative.dev/eventing/pkg/apis/feature" messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" "knative.dev/eventing/pkg/auth" + "knative.dev/eventing/pkg/broker/filter" clientset "knative.dev/eventing/pkg/client/clientset/versioned" eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1" messaginglisters "knative.dev/eventing/pkg/client/listers/messaging/v1" @@ -62,8 +63,6 @@ const ( subscriptionDeleteFailed = "SubscriptionDeleteFailed" subscriptionCreateFailed = "SubscriptionCreateFailed" subscriptionGetFailed = "SubscriptionGetFailed" - - FilterAudience = "mt-broker-filter" ) type Reconciler struct { @@ -227,7 +226,7 @@ func (r *Reconciler) subscribeToBrokerChannel(ctx context.Context, b *eventingv1 } if featureFlags.IsOIDCAuthentication() { - dest.Audience = pointer.String(FilterAudience) + dest.Audience = pointer.String(filter.FilterAudience) } // Note that we have to hard code the brokerGKV stuff as sometimes typemeta is not diff --git a/pkg/reconciler/broker/trigger/trigger_test.go b/pkg/reconciler/broker/trigger/trigger_test.go index 86654d618df..71906abf4f6 100644 --- a/pkg/reconciler/broker/trigger/trigger_test.go +++ b/pkg/reconciler/broker/trigger/trigger_test.go @@ -64,6 +64,7 @@ import ( _ "knative.dev/pkg/client/injection/ducks/duck/v1/addressable/fake" . "knative.dev/pkg/reconciler/testing" + "knative.dev/eventing/pkg/broker/filter" _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger/fake" . "knative.dev/eventing/pkg/reconciler/testing/v1" rtv1beta2 "knative.dev/eventing/pkg/reconciler/testing/v1beta2" @@ -1741,7 +1742,7 @@ func makeServiceURI() *duckv1.Destination { func makeServiceURIWithAudience() *duckv1.Destination { dst := makeServiceURI() - dst.Audience = ptr.String(FilterAudience) + dst.Audience = ptr.String(filter.FilterAudience) return dst } @@ -1859,7 +1860,7 @@ func makeReadySubscription(subscriberNamespace string) *messagingv1.Subscription func makeReadySubscriptionWithAudience(subscriberNamespace string) *messagingv1.Subscription { s := makeReadySubscription(subscriberNamespace) - s.Spec.Subscriber.Audience = ptr.String(FilterAudience) + s.Spec.Subscriber.Audience = ptr.String(filter.FilterAudience) return s }