Skip to content

Commit

Permalink
[release-1.14] Expose OIDC audience of KafkaSink in its status (#4082)
Browse files Browse the repository at this point in the history
* Expose OIDC audience of KafkaSink in its status (#4067)

* Provision audience of KafkaSink

* Add e2e test

* Run full OIDC e2e test suite

* run gofmt and goimports

* Fix build issue

---------

Co-authored-by: Christoph Stäbler <[email protected]>
  • Loading branch information
knative-prow-robot and creydr authored Aug 26, 2024
1 parent 070b72d commit ff0b991
Show file tree
Hide file tree
Showing 6 changed files with 183 additions and 5 deletions.
5 changes: 3 additions & 2 deletions control-plane/pkg/contract/contract.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 25 additions & 3 deletions control-plane/pkg/reconciler/sink/kafka_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
"fmt"
"time"

"knative.dev/eventing/pkg/auth"
"knative.dev/pkg/logging"

"github.com/IBM/sarama"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/api/equality"
Expand Down Expand Up @@ -246,9 +249,9 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink)

logger.Debug("Updated receiver pod annotation")

transportEncryptionFlags := feature.FromContext(ctx)
features := feature.FromContext(ctx)
var addressableStatus duckv1.AddressStatus
if transportEncryptionFlags.IsPermissiveTransportEncryption() {
if features.IsPermissiveTransportEncryption() {
caCerts, err := r.getCaCerts()
if err != nil {
return err
Expand All @@ -263,7 +266,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink)
// - http address with path-based routing
addressableStatus.Address = &httpAddress
addressableStatus.Addresses = []duckv1.Addressable{httpsAddress, httpAddress}
} else if transportEncryptionFlags.IsStrictTransportEncryption() {
} else if features.IsStrictTransportEncryption() {
// Strict mode: (only https addresses)
// - status.address https address with path-based routing
// - status.addresses:
Expand Down Expand Up @@ -302,6 +305,25 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink)

ks.Status.AddressStatus = addressableStatus

if features.IsOIDCAuthentication() {
audience := auth.GetAudience(eventing.SchemeGroupVersion.WithKind("KafkaSink"), ks.ObjectMeta)
logging.FromContext(ctx).Debugw("Setting the kafkasinks audience", zap.String("audience", audience))
ks.Status.Address.Audience = &audience

for i := range ks.Status.Addresses {
ks.Status.Addresses[i].Audience = &audience
}
} else {
logging.FromContext(ctx).Debug("Clearing the kafkasinks audience as OIDC is not enabled")
if ks.Status.Address != nil {
ks.Status.Address.Audience = nil
}

for i := range ks.Status.Addresses {
ks.Status.Addresses[i].Audience = nil
}
}

ks.GetConditionSet().Manage(ks.GetStatus()).MarkTrue(base.ConditionAddressable)

return nil
Expand Down
75 changes: 75 additions & 0 deletions control-plane/pkg/reconciler/sink/kafka_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"io"
"testing"

"knative.dev/eventing/pkg/auth"

"k8s.io/utils/pointer"

duckv1 "knative.dev/pkg/apis/duck/v1"
Expand Down Expand Up @@ -49,6 +51,7 @@ import (

"knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing"
"knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing/v1alpha1"
kafkaeventing "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing/v1alpha1"
fakeeventingkafkaclient "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/client/fake"
sinkreconciler "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/reconciler/eventing/v1alpha1/kafkasink"
"knative.dev/eventing-kafka-broker/control-plane/pkg/receiver"
Expand Down Expand Up @@ -95,6 +98,11 @@ var (
Path: fmt.Sprintf("/%s/%s", SinkNamespace, SinkName),
}

sinkAudience = auth.GetAudience(kafkaeventing.SchemeGroupVersion.WithKind("KafkaSink"), metav1.ObjectMeta{
Name: SinkName,
Namespace: SinkNamespace,
})

errCreateTopic = fmt.Errorf("failed to create topic")

errDeleteTopic = fmt.Errorf("failed to delete topic")
Expand Down Expand Up @@ -1283,6 +1291,73 @@ func sinkReconciliation(t *testing.T, format string, env config.Env) {
),
},
},
}, {
Name: "Reconciled normal - OIDC enabled - should provision audience",
Ctx: feature.ToContext(context.Background(), feature.Flags{
feature.OIDCAuthentication: feature.Enabled,
}),
Objects: []runtime.Object{
NewSink(
StatusControllerOwnsTopic(sink.ControllerTopicOwner),
),
NewConfigMapWithBinaryData(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, nil),
SinkReceiverPod(env.SystemNamespace, map[string]string{
"annotation_to_preserve": "value_to_preserve",
}),
},
Key: testKey,
WantEvents: []string{
finalizerUpdatedEvent,
},
WantUpdates: []clientgotesting.UpdateActionImpl{
ConfigMapUpdate(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, env.ContractConfigMapFormat, &contract.Contract{
Resources: []*contract.Resource{
{
Uid: SinkUUID,
Topics: []string{SinkTopic()},
Ingress: &contract.Ingress{ContentMode: contract.ContentMode_BINARY, Path: receiver.Path(SinkNamespace, SinkName)},
BootstrapServers: bootstrapServers,
Reference: SinkReference(),
},
},
Generation: 1,
}),
SinkReceiverPodUpdate(env.SystemNamespace, map[string]string{
base.VolumeGenerationAnnotationKey: "1",
"annotation_to_preserve": "value_to_preserve",
}),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{
{
Object: NewSink(
StatusControllerOwnsTopic(sink.ControllerTopicOwner),
InitSinkConditions,
StatusDataPlaneAvailable,
StatusConfigParsed,
BootstrapServers(bootstrapServersArr),
StatusConfigMapUpdatedReady(&env),
StatusTopicReadyWithOwner(SinkTopic(), sink.ControllerTopicOwner),
SinkAddressable(&env),
StatusProbeSucceeded,
WithSinkAddress(duckv1.Addressable{
Name: pointer.String("http"),
URL: sinkAddress,
Audience: &sinkAudience,
}),
WithSinkAddresses([]duckv1.Addressable{
{
Name: pointer.String("http"),
URL: sinkAddress,
Audience: &sinkAudience,
},
}),
WithSinkAddessable(),
),
},
},
},
}

Expand Down
58 changes: 58 additions & 0 deletions test/e2e_new/sink_auth_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
//go:build e2e
// +build e2e

/*
* Copyright 2023 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package e2e_new

import (
"testing"
"time"

"knative.dev/eventing-kafka-broker/test/rekt/resources/kafkasink"
"knative.dev/eventing-kafka-broker/test/rekt/resources/kafkatopic"

testpkg "knative.dev/eventing-kafka-broker/test/pkg"
"knative.dev/eventing/test/rekt/features/oidc"
"knative.dev/pkg/system"
"knative.dev/reconciler-test/pkg/environment"
"knative.dev/reconciler-test/pkg/eventshub"
"knative.dev/reconciler-test/pkg/feature"
"knative.dev/reconciler-test/pkg/k8s"
"knative.dev/reconciler-test/pkg/knative"
)

func TestKafkaSinkSupportsOIDC(t *testing.T) {
t.Parallel()

ctx, env := global.Environment(
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
environment.WithPollTimings(4*time.Second, 12*time.Minute),
environment.Managed(t),
eventshub.WithTLS(t),
)

topic := feature.MakeRandomK8sName("topic")
sink := feature.MakeRandomK8sName("kafkasink")
env.Prerequisite(ctx, t, kafkatopic.GoesReady(topic))
env.Prerequisite(ctx, t, kafkasink.GoesReady(sink, topic, testpkg.BootstrapServersPlaintextArr))

env.TestSet(ctx, t, oidc.AddressableOIDCConformance(kafkasink.GVR(), "KafkaSink", sink, env.Namespace()))
}
11 changes: 11 additions & 0 deletions test/rekt/resources/kafkasink/kafkasink.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,14 @@ func ValidateAddress(name string, validate addressable.ValidateAddressFn, timing
}
}
}

// GoesReady returns a feature that will create a KafkaSink of the given
// name and topic, and confirm it becomes ready.
func GoesReady(name, topic string, bootstrapServers []string, cfg ...manifest.CfgFn) *feature.Feature {
f := new(feature.Feature)

f.Setup(fmt.Sprintf("install KafkaSink %q", name), Install(name, topic, bootstrapServers, cfg...))
f.Setup("KafkaSink is ready", IsReady(name))

return f
}
11 changes: 11 additions & 0 deletions test/rekt/resources/kafkatopic/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,14 @@ func WithClusterNamespace(namespace string) manifest.CfgFn {
cfg["clusterNamespace"] = namespace
}
}

// GoesReady returns a feature that will create a topic of the given
// name and confirm it becomes ready.
func GoesReady(name string, cfg ...manifest.CfgFn) *feature.Feature {
f := new(feature.Feature)

f.Setup(fmt.Sprintf("install Topic %q", name), Install(name, cfg...))
f.Setup("Topic is ready", IsReady(name))

return f
}

0 comments on commit ff0b991

Please sign in to comment.