Skip to content

Commit

Permalink
mt-broker-filter: reject request for wrong audience (#7427)
Browse files Browse the repository at this point in the history
* mt-broker-filter: reject request for wrong audience

Signed-off-by: pingjiang <[email protected]>

* add subscriptions rules

Signed-off-by: pingjiang <[email protected]>

* remove subscriptions lister

Signed-off-by: pingjiang <[email protected]>

* fix e2e

Signed-off-by: pingjiang <[email protected]>

* move FilterAudience const to pkg/broker/filter

Signed-off-by: pingjiang <[email protected]>

* revert config/brokers/mt-channel-broker/roles/filter-clusterrole.yaml

Signed-off-by: pingjiang <[email protected]>

* revert

Signed-off-by: pingjiang <[email protected]>

---------

Signed-off-by: pingjiang <[email protected]>
  • Loading branch information
xiangpingjiang authored Dec 4, 2023
1 parent 3fcc78a commit 79bb385
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 7 deletions.
4 changes: 3 additions & 1 deletion cmd/broker/filter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func main() {
log.Printf("Registering %d informers", len(injection.Default.GetInformers()))

ctx, informers := injection.Default.SetupInformers(ctx, cfg)
ctx = injection.WithConfig(ctx, cfg)
kubeClient := kubeclient.Get(ctx)

loggingConfig, err := broker.GetLoggingConfig(ctx, system.Namespace(), logging.ConfigMapName())
Expand Down Expand Up @@ -123,7 +124,8 @@ func main() {
oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)
// We are running both the receiver (takes messages in from the Broker) and the dispatcher (send
// the messages to the triggers' subscribers) in this binary.
handler, err := filter.NewHandler(logger, oidcTokenProvider, triggerinformer.Get(ctx), reporter, ctxFunc)
oidcTokenVerifier := auth.NewOIDCTokenVerifier(ctx)
handler, err := filter.NewHandler(logger, oidcTokenVerifier, oidcTokenProvider, triggerinformer.Get(ctx), reporter, ctxFunc)
if err != nil {
logger.Fatal("Error creating Handler", zap.Error(err))
}
Expand Down
25 changes: 24 additions & 1 deletion pkg/broker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ const (
// based on what serving is doing. See https://github.com/knative/serving/blob/main/pkg/network/transports.go.
defaultMaxIdleConnections = 1000
defaultMaxIdleConnectionsPerHost = 100

FilterAudience = "mt-broker-filter"
)

// Handler parses Cloud Events, determines if they pass a filter, and sends them to a subscriber.
Expand All @@ -77,10 +79,11 @@ type Handler struct {
logger *zap.Logger
withContext func(ctx context.Context) context.Context
filtersMap *subscriptionsapi.FiltersMap
tokenVerifier *auth.OIDCTokenVerifier
}

// NewHandler creates a new Handler and its associated EventReceiver.
func NewHandler(logger *zap.Logger, oidcTokenProvider *auth.OIDCTokenProvider, triggerInformer v1.TriggerInformer, reporter StatsReporter, wc func(ctx context.Context) context.Context) (*Handler, error) {
func NewHandler(logger *zap.Logger, tokenVerifier *auth.OIDCTokenVerifier, oidcTokenProvider *auth.OIDCTokenProvider, triggerInformer v1.TriggerInformer, reporter StatsReporter, wc func(ctx context.Context) context.Context) (*Handler, error) {
kncloudevents.ConfigureConnectionArgs(&kncloudevents.ConnectionArgs{
MaxIdleConns: defaultMaxIdleConnections,
MaxIdleConnsPerHost: defaultMaxIdleConnectionsPerHost,
Expand Down Expand Up @@ -132,6 +135,7 @@ func NewHandler(logger *zap.Logger, oidcTokenProvider *auth.OIDCTokenProvider, t
eventDispatcher: kncloudevents.NewDispatcher(oidcTokenProvider),
triggerLister: triggerInformer.Lister(),
logger: logger,
tokenVerifier: tokenVerifier,
withContext: wc,
filtersMap: fm,
}, nil
Expand Down Expand Up @@ -204,6 +208,25 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
return
}

features := feature.FromContext(ctx)
if features.IsOIDCAuthentication() {
h.logger.Debug("OIDC authentication is enabled")
token := auth.GetJWTFromHeader(request.Header)
if token == "" {
h.logger.Warn(fmt.Sprintf("No JWT in %s header provided while feature %s is enabled", auth.AuthHeaderKey, feature.OIDCAuthentication))
writer.WriteHeader(http.StatusUnauthorized)
return
}

if _, err := h.tokenVerifier.VerifyJWT(ctx, token, FilterAudience); err != nil {
h.logger.Warn("no valid JWT provided", zap.Error(err))
writer.WriteHeader(http.StatusUnauthorized)
return
}

h.logger.Debug("Request contained a valid JWT. Continuing...")
}

reportArgs := &ReportArgs{
ns: t.Namespace,
trigger: t.Name,
Expand Down
4 changes: 4 additions & 0 deletions pkg/broker/filter/filter_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ func TestReceiver(t *testing.T) {

logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))
oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)
oidcTokenVerifier := auth.NewOIDCTokenVerifier(ctx)

// Replace the SubscriberURI to point at our fake server.
for _, trig := range tc.triggers {
Expand All @@ -447,6 +448,7 @@ func TestReceiver(t *testing.T) {
reporter := &mockReporter{}
r, err := NewHandler(
logger,
oidcTokenVerifier,
oidcTokenProvider,
triggerinformerfake.Get(ctx),
reporter,
Expand Down Expand Up @@ -616,6 +618,7 @@ func TestReceiver_WithSubscriptionsAPI(t *testing.T) {

logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))
oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)
oidcTokenVerifier := auth.NewOIDCTokenVerifier(ctx)

// Replace the SubscriberURI to point at our fake server.
for _, trig := range tc.triggers {
Expand All @@ -633,6 +636,7 @@ func TestReceiver_WithSubscriptionsAPI(t *testing.T) {
reporter := &mockReporter{}
r, err := NewHandler(
logger,
oidcTokenVerifier,
oidcTokenProvider,
triggerinformerfake.Get(ctx),
reporter,
Expand Down
5 changes: 2 additions & 3 deletions pkg/reconciler/broker/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"knative.dev/eventing/pkg/apis/feature"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/broker/filter"
clientset "knative.dev/eventing/pkg/client/clientset/versioned"
eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1"
messaginglisters "knative.dev/eventing/pkg/client/listers/messaging/v1"
Expand All @@ -62,8 +63,6 @@ const (
subscriptionDeleteFailed = "SubscriptionDeleteFailed"
subscriptionCreateFailed = "SubscriptionCreateFailed"
subscriptionGetFailed = "SubscriptionGetFailed"

FilterAudience = "mt-broker-filter"
)

type Reconciler struct {
Expand Down Expand Up @@ -227,7 +226,7 @@ func (r *Reconciler) subscribeToBrokerChannel(ctx context.Context, b *eventingv1
}

if featureFlags.IsOIDCAuthentication() {
dest.Audience = pointer.String(FilterAudience)
dest.Audience = pointer.String(filter.FilterAudience)
}

// Note that we have to hard code the brokerGKV stuff as sometimes typemeta is not
Expand Down
5 changes: 3 additions & 2 deletions pkg/reconciler/broker/trigger/trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import (
_ "knative.dev/pkg/client/injection/ducks/duck/v1/addressable/fake"
. "knative.dev/pkg/reconciler/testing"

"knative.dev/eventing/pkg/broker/filter"
_ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger/fake"
. "knative.dev/eventing/pkg/reconciler/testing/v1"
rtv1beta2 "knative.dev/eventing/pkg/reconciler/testing/v1beta2"
Expand Down Expand Up @@ -1741,7 +1742,7 @@ func makeServiceURI() *duckv1.Destination {

func makeServiceURIWithAudience() *duckv1.Destination {
dst := makeServiceURI()
dst.Audience = ptr.String(FilterAudience)
dst.Audience = ptr.String(filter.FilterAudience)

return dst
}
Expand Down Expand Up @@ -1859,7 +1860,7 @@ func makeReadySubscription(subscriberNamespace string) *messagingv1.Subscription

func makeReadySubscriptionWithAudience(subscriberNamespace string) *messagingv1.Subscription {
s := makeReadySubscription(subscriberNamespace)
s.Spec.Subscriber.Audience = ptr.String(FilterAudience)
s.Spec.Subscriber.Audience = ptr.String(filter.FilterAudience)
return s
}

Expand Down

0 comments on commit 79bb385

Please sign in to comment.