Skip to content

Commit

Permalink
E2E tests for sink: TLS key pair rotation (#3395)
Browse files Browse the repository at this point in the history
* Exposing the 8443 port for https

* Add the test implementation

* Fix the custom resource definition

Co-authored-by: Pierangelo Di Pilato <[email protected]>

* Fix the custom resource definition

Co-authored-by: Pierangelo Di Pilato <[email protected]>

* Go formt and imports

* Adding the configmap watcher feature store

* Formatting issue

---------

Co-authored-by: Pierangelo Di Pilato <[email protected]>
  • Loading branch information
Leo6Leo and pierDipi authored Oct 18, 2023
1 parent 794302d commit 660893a
Show file tree
Hide file tree
Showing 7 changed files with 200 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,31 @@ spec:
type: object
properties:
address:
description: 'Kafka Sink is Addressable. It exposes the endpoint as an
URI to get events delivered to a Kafka topic.'
description: Kafka Sink is Addressable. It exposes the endpoint as an URI to get events delivered into the Kafka topic.
type: object
properties:
name:
type: string
url:
type: string
CACerts:
type: string
audience:
type: string
addresses:
description: Kafka Sink is Addressable. It exposes the endpoints as URIs to get events delivered into the Kafka topic.
type: array
items:
type: object
properties:
name:
type: string
url:
type: string
CACerts:
type: string
audience:
type: string
annotations:
description: 'Annotations is additional Status fields for the Resource
to save some additional State as well as convey more information
Expand Down
10 changes: 8 additions & 2 deletions control-plane/pkg/reconciler/sink/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import (
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base"
)

func NewController(ctx context.Context, _ configmap.Watcher, configs *config.Env) *controller.Impl {
func NewController(ctx context.Context, watcher configmap.Watcher, configs *config.Env) *controller.Impl {

eventing.RegisterConditionSet(base.IngressConditionSet)

Expand Down Expand Up @@ -74,12 +74,18 @@ func NewController(ctx context.Context, _ configmap.Watcher, configs *config.Env
)
}

featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"))
featureStore.WatchConfigs(watcher)

features := feature.FromContext(ctx)
caCerts, err := reconciler.getCaCerts()
if err != nil && (features.IsStrictTransportEncryption() || features.IsPermissiveTransportEncryption()) {
logger.Warn("failed to get CA certs when at least one address uses TLS", zap.Error(err))
}
impl := sinkreconciler.NewImpl(ctx, reconciler)
impl := sinkreconciler.NewImpl(ctx, reconciler, func(impl *controller.Impl) controller.Options {
return controller.Options{
ConfigStore: featureStore}
})
IPsLister := prober.IPsListerFromService(types.NamespacedName{Namespace: configs.SystemNamespace, Name: configs.IngressName})
reconciler.IngressHost = network.GetServiceHostname(configs.IngressName, configs.SystemNamespace)
reconciler.Prober, err = prober.NewComposite(ctx, configs.IngressPodPort, configs.IngressPodTlsPort, IPsLister, impl.EnqueueKey, &caCerts)
Expand Down
9 changes: 8 additions & 1 deletion control-plane/pkg/reconciler/sink/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package sink
import (
"testing"

"knative.dev/pkg/configmap"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

Expand Down Expand Up @@ -58,7 +60,12 @@ func TestNewController(t *testing.T) {
Type: corev1.SecretTypeTLS,
})

controller := NewController(ctx, nil, &config.Env{
controller := NewController(ctx, configmap.NewStaticWatcher(&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "config-features",
},
},
), &config.Env{
IngressPodPort: "8080",
})

Expand Down
4 changes: 4 additions & 0 deletions data-plane/config/sink/500-receiver.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,10 @@ spec:
port: 8080
protocol: TCP
targetPort: 8080
- name: https-container
port: 8443
protocol: TCP
targetPort: 8443
- name: https
port: 443
protocol: TCP
Expand Down
48 changes: 48 additions & 0 deletions test/e2e_new/sink_eventing_tls_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
//go:build e2e
// +build e2e

/*
* Copyright 2023 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package e2e_new

import (
"testing"
"time"

"knative.dev/eventing-kafka-broker/test/rekt/features"
"knative.dev/pkg/system"
"knative.dev/reconciler-test/pkg/environment"
"knative.dev/reconciler-test/pkg/eventshub"
"knative.dev/reconciler-test/pkg/k8s"
"knative.dev/reconciler-test/pkg/knative"
)

func TestSinkTLSCARotation(t *testing.T) {
t.Parallel()

ctx, env := global.Environment(
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
environment.Managed(t),
eventshub.WithTLS(t),
environment.WithPollTimings(5*time.Second, 4*time.Minute),
)

env.Test(ctx, t, features.RotateSinkTLSCertificates(ctx))
}
89 changes: 89 additions & 0 deletions test/rekt/features/sink_tls.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright 2023 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package features

import (
"context"
"time"

cetest "github.com/cloudevents/sdk-go/v2/test"
"github.com/google/uuid"
"k8s.io/apimachinery/pkg/types"
testpkg "knative.dev/eventing-kafka-broker/test/pkg"
"knative.dev/eventing-kafka-broker/test/rekt/resources/kafkasink"
"knative.dev/eventing-kafka-broker/test/rekt/resources/kafkatopic"
"knative.dev/eventing/test/rekt/features/featureflags"
"knative.dev/eventing/test/rekt/resources/addressable"
"knative.dev/pkg/system"
"knative.dev/reconciler-test/pkg/eventshub"
"knative.dev/reconciler-test/pkg/eventshub/assert"
"knative.dev/reconciler-test/pkg/feature"
"knative.dev/reconciler-test/resources/certificate"
)

func RotateSinkTLSCertificates(ctx context.Context) *feature.Feature {

sink := feature.MakeRandomK8sName("sink")
source := feature.MakeRandomK8sName("source")
ingressCertificateName := "kafka-sink-ingress-server-tls"

f := feature.NewFeatureNamed("Rotate Kafka Sink TLS certificate")

f.Prerequisite("transport encryption is strict", featureflags.TransportEncryptionStrict())
f.Prerequisite("should not run when Istio is enabled", featureflags.IstioDisabled())

topic := feature.MakeRandomK8sName("topic")

f.Setup("install kafka topic", kafkatopic.Install(topic))
f.Setup("topic is ready", kafkatopic.IsReady(topic))

f.Setup("Install kafkasink", kafkasink.Install(sink, topic, testpkg.BootstrapServersPlaintextArr,
kafkasink.WithNumPartitions(10),
kafkasink.WithReplicationFactor(1)))
f.Setup("KafkaSink is ready", kafkasink.IsReady(sink))

f.Setup("Rotate ingress certificate", certificate.Rotate(certificate.RotateCertificate{
Certificate: types.NamespacedName{
Namespace: system.Namespace(),
Name: ingressCertificateName,
},
}))

f.Setup("Sink has HTTPS address", kafkasink.ValidateAddress(sink, addressable.AssertHTTPSAddress))
event := cetest.FullEvent()
event.SetID(uuid.New().String())

f.Requirement("install source", eventshub.Install(source,
eventshub.StartSenderToResourceTLS(kafkasink.GVR(), sink, nil),
eventshub.InputEvent(event),
// Send multiple events so that we take into account that the certificate rotation might
// be detected by the server after some time.
eventshub.AddSequence,
eventshub.SendMultipleEvents(100, 3*time.Second),
))
f.Assert("Event sent", assert.OnStore(source).
MatchSentEvent(cetest.HasId(event.ID())).
AtLeast(1),
)

f.Assert("Source match updated peer certificate", assert.OnStore(source).
MatchPeerCertificatesReceived(assert.MatchPeerCertificatesFromSecret(system.Namespace(), ingressCertificateName, "tls.crt")).
AtLeast(1),
)

return f
}
22 changes: 22 additions & 0 deletions test/rekt/resources/kafkasink/kafkasink.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"fmt"
"time"

"knative.dev/eventing/test/rekt/resources/addressable"

"k8s.io/apimachinery/pkg/runtime/schema"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/reconciler-test/pkg/feature"
Expand Down Expand Up @@ -94,3 +96,23 @@ func AsKReference(name string, namespace string) *duckv1.KReference {
Name: name,
}
}

// Address returns a sink's address.
func Address(ctx context.Context, name string, timings ...time.Duration) (*duckv1.Addressable, error) {
return addressable.Address(ctx, GVR(), name, timings...)
}

// ValidateAddress validates the address retured by Address
func ValidateAddress(name string, validate addressable.ValidateAddress, timings ...time.Duration) feature.StepFn {
return func(ctx context.Context, t feature.T) {
addr, err := Address(ctx, name, timings...)
if err != nil {
t.Error(err)
return
}
if err := validate(addr); err != nil {
t.Error(err)
return
}
}
}

0 comments on commit 660893a

Please sign in to comment.