Skip to content

Commit

Permalink
Fix CI flakes (#3852)
Browse files Browse the repository at this point in the history
* Disable chaos duck

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Use deployment for kafka-broker-receiver

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Remove mappings only after closing the Producer

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Improve LoomKafkaProducer (logging, resource usage)

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Format java

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Use onSuccess

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Use atomics for ReferenceCounting

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Log consumer context when sending events

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Async appender

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Fix namespaced broker unit tests

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Use STS for broker receiver

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Refactor getting features config in triggerv2 using ctx

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Detect audience and identity SA changes and refactor sender and token provider

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Return early if Producer.send returns an exception

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* format Go code

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Increase timeout

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Migrate to async appender even in official configuration

Signed-off-by: Pierangelo Di Pilato <[email protected]>

---------

Signed-off-by: Pierangelo Di Pilato <[email protected]>
  • Loading branch information
pierDipi authored Apr 30, 2024
1 parent 561b4ec commit 79c839c
Show file tree
Hide file tree
Showing 27 changed files with 327 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ type kafkaDeploymentDeleter struct {

func (k *kafkaDeploymentDeleter) DeleteBrokerDeployments(ctx context.Context) error {
deployments := []string{
"kafka-broker-receiver",
"kafka-broker-dispatcher",
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@ data:
<appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
</appender>
<appender name="async" class="ch.qos.logback.classic.AsyncAppender">
<appender-ref ref="jsonConsoleAppender" />
<neverBlock>true</neverBlock>
<maxFlushTime>1000</maxFlushTime>
</appender>
<root level="INFO">
<appender-ref ref="jsonConsoleAppender"/>
<appender-ref ref="async"/>
</root>
</configuration>
10 changes: 10 additions & 0 deletions control-plane/pkg/apis/config/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/pkg/configmap"
"knative.dev/pkg/logging"
"knative.dev/pkg/reconciler"
)

const (
Expand Down Expand Up @@ -217,3 +218,12 @@ func executeTemplateToString(template template.Template, metadata v1.ObjectMeta,

return result.String(), nil
}

type Stores []reconciler.ConfigStore

func (css Stores) ToContext(ctx context.Context) context.Context {
for _, cs := range css {
ctx = cs.ToContext(ctx)
}
return ctx
}
44 changes: 42 additions & 2 deletions control-plane/pkg/reconciler/broker/namespaced_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type NamespacedReconciler struct {
ServiceAccountLister corelisters.ServiceAccountLister
ServiceLister corelisters.ServiceLister
ClusterRoleBindingLister rbaclisters.ClusterRoleBindingLister
DeploymentLister appslisters.DeploymentLister
StatefulSetLister appslisters.StatefulSetLister
BrokerLister eventinglisters.BrokerLister

Expand Down Expand Up @@ -316,12 +317,18 @@ func (r *NamespacedReconciler) getManifestFromSystemNamespace(broker *eventing.B
}
resources = append(resources, additionalConfigMaps...)

additionalDeployments, err := r.statefulSetsFromSystemNamespace(broker)
additionalDeployments, err := r.deploymentsFromSystemNamespace(broker)
if err != nil {
return mf.Manifest{}, err
}
resources = append(resources, additionalDeployments...)

additionalStatefulsets, err := r.statefulSetsFromSystemNamespace(broker)
if err != nil {
return mf.Manifest{}, err
}
resources = append(resources, additionalStatefulsets...)

additionalServiceAccounts, err := r.serviceAccountsFromSystemNamespace(broker)
if err != nil {
return mf.Manifest{}, err
Expand Down Expand Up @@ -368,7 +375,6 @@ func (r *NamespacedReconciler) getManifestFromAdditionalResources(broker *eventi

func (r *NamespacedReconciler) statefulSetsFromSystemNamespace(broker *eventing.Broker) ([]unstructured.Unstructured, error) {
deployments := []string{
"kafka-broker-receiver",
"kafka-broker-dispatcher",
}
resources := make([]unstructured.Unstructured, 0, len(deployments))
Expand All @@ -382,6 +388,21 @@ func (r *NamespacedReconciler) statefulSetsFromSystemNamespace(broker *eventing.
return resources, nil
}

func (r *NamespacedReconciler) deploymentsFromSystemNamespace(broker *eventing.Broker) ([]unstructured.Unstructured, error) {
deployments := []string{
"kafka-broker-receiver",
}
resources := make([]unstructured.Unstructured, 0, len(deployments))
for _, name := range deployments {
resource, err := r.createManifestFromSystemDeployment(broker, name)
if err != nil {
return nil, err
}
resources = append(resources, resource)
}
return resources, nil
}

func (r *NamespacedReconciler) configMapsFromSystemNamespace(broker *eventing.Broker) ([]unstructured.Unstructured, error) {
configMaps := []string{
"config-kafka-broker-data-plane",
Expand Down Expand Up @@ -472,6 +493,25 @@ func (r *NamespacedReconciler) createManifestFromSystemStatefulSet(broker *event
return unstructuredFromObject(cm)
}

func (r *NamespacedReconciler) createManifestFromSystemDeployment(broker *eventing.Broker, name string) (unstructured.Unstructured, error) {
sysDeployment, err := r.DeploymentLister.Deployments(r.SystemNamespace).Get(name)
if err != nil {
return unstructured.Unstructured{}, fmt.Errorf("failed to get Deployment %s/%s: %w", r.SystemNamespace, name, err)
}

cm := &appsv1.Deployment{
TypeMeta: metav1.TypeMeta{Kind: "Deployment", APIVersion: appsv1.SchemeGroupVersion.String()},
ObjectMeta: metav1.ObjectMeta{
Namespace: broker.GetNamespace(),
Name: sysDeployment.Name,
Labels: sysDeployment.Labels,
Annotations: sysDeployment.Annotations,
},
Spec: sysDeployment.Spec,
}
return unstructuredFromObject(cm)
}

func (r *NamespacedReconciler) serviceAccountsFromSystemNamespace(broker *eventing.Broker) ([]unstructured.Unstructured, error) {
serviceAccounts := []string{
"knative-kafka-broker-data-plane",
Expand Down
10 changes: 6 additions & 4 deletions control-plane/pkg/reconciler/broker/namespaced_broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,13 @@ import (
brokerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/broker"
reconcilertesting "knative.dev/eventing/pkg/reconciler/testing/v1"

duckv1 "knative.dev/pkg/apis/duck/v1"

apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config"
"knative.dev/eventing-kafka-broker/control-plane/pkg/receiver"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base"
. "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker"
. "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/testing"
duckv1 "knative.dev/pkg/apis/duck/v1"
)

var (
Expand Down Expand Up @@ -121,7 +122,7 @@ func namespacedBrokerReconciliation(t *testing.T, format string, env config.Env)
base.VolumeGenerationAnnotationKey: "0",
"annotation_to_preserve": "value_to_preserve",
}),
NewStatefulSet("kafka-broker-receiver", SystemNamespace),
NewDeployment("kafka-broker-receiver", SystemNamespace),
NewStatefulSet("kafka-broker-dispatcher", SystemNamespace),
NewServiceAccount(SystemNamespace, "knative-kafka-broker-data-plane"),
reconcilertesting.NewService("kafka-broker-ingress", SystemNamespace),
Expand Down Expand Up @@ -199,7 +200,7 @@ func namespacedBrokerReconciliation(t *testing.T, format string, env config.Env)
WithNamespacedLabel,
),
ToManifestivalResource(t,
NewStatefulSet("kafka-broker-receiver", BrokerNamespace),
NewDeployment("kafka-broker-receiver", BrokerNamespace),
WithNamespacedBrokerOwnerRef,
WithNamespacedLabel,
),
Expand Down Expand Up @@ -371,7 +372,7 @@ func namespacedBrokerFinalization(t *testing.T, format string, env config.Env) {
reconcilertesting.NewConfigMap("config-tracing", SystemNamespace),
reconcilertesting.NewConfigMap("config-features", SystemNamespace),
reconcilertesting.NewConfigMap("kafka-config-logging", SystemNamespace),
NewStatefulSet("kafka-broker-receiver", SystemNamespace),
NewDeployment("kafka-broker-receiver", SystemNamespace),
NewStatefulSet("kafka-broker-dispatcher", SystemNamespace),
NewServiceAccount(SystemNamespace, "knative-kafka-broker-data-plane"),
reconcilertesting.NewService("kafka-broker-ingress", SystemNamespace),
Expand Down Expand Up @@ -503,6 +504,7 @@ func useTableNamespaced(t *testing.T, table TableTest, env *config.Env) {
NamespaceLister: listers.GetNamespaceLister(),
ConfigMapLister: listers.GetConfigMapLister(),
StatefulSetLister: listers.GetStatefulSetLister(),
DeploymentLister: listers.GetDeploymentLister(),
BrokerLister: listers.GetBrokerLister(),
ServiceAccountLister: listers.GetServiceAccountLister(),
ServiceLister: listers.GetServiceLister(),
Expand Down
15 changes: 13 additions & 2 deletions control-plane/pkg/reconciler/broker/namespaced_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ import (
"net/http"
"time"

"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/clientpool"
"knative.dev/eventing/pkg/eventingtls"
"knative.dev/pkg/network"

"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/clientpool"

"knative.dev/eventing-kafka-broker/control-plane/pkg/util"

mfclient "github.com/manifestival/client-go-client"
Expand All @@ -52,6 +53,7 @@ import (

brokerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker"
brokerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/broker"
deploymentinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment"
statefulsetinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset"
configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap"
namespaceinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/namespace"
Expand Down Expand Up @@ -108,6 +110,7 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, env
ServiceLister: serviceinformer.Get(ctx).Lister(),
ClusterRoleBindingLister: clusterrolebindinginformer.Get(ctx).Lister(),
StatefulSetLister: statefulsetinformer.Get(ctx).Lister(),
DeploymentLister: deploymentinformer.Get(ctx).Lister(),
BrokerLister: brokerinformer.Get(ctx).Lister(),
Env: env,
Counter: counter.NewExpiringCounter(ctx),
Expand Down Expand Up @@ -176,13 +179,21 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, env
statefulsetinformer.Get(ctx).Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: kafka.FilterAny(
kafka.FilterWithLabel("app", "kafka-broker-dispatcher"),
kafka.FilterWithLabel("app", "kafka-broker-receiver"),
),
Handler: controller.HandleAll(controller.EnsureTypeMeta(
globalResync,
appsv1.SchemeGroupVersion.WithKind("StatefulSet"),
)),
})
deploymentinformer.Get(ctx).Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: kafka.FilterAny(
kafka.FilterWithLabel("app", "kafka-broker-receiver"),
),
Handler: controller.HandleAll(controller.EnsureTypeMeta(
globalResync,
appsv1.SchemeGroupVersion.WithKind("Deployment"),
)),
})

// we set a label for each resource we create and filter things based on that
filterFunc := pkgreconciler.LabelFilterFunc(
Expand Down
54 changes: 54 additions & 0 deletions control-plane/pkg/reconciler/testing/deployment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2024 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package testing

import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
reconcilertesting "knative.dev/eventing/pkg/reconciler/testing/v1"
)

func NewDeployment(name, namespace string, sso ...reconcilertesting.DeploymentOption) *appsv1.Deployment {
ss := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: name,
},
Spec: appsv1.DeploymentSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{{}},
Volumes: []corev1.Volume{{
Name: "contract-resources",
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: "kafka-broker-brokers-triggers",
},
},
},
}},
},
},
},
}
for _, opt := range sso {
opt(ss)
}
return ss
}
37 changes: 13 additions & 24 deletions control-plane/pkg/reconciler/trigger/v2/controllerv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,16 @@ import (
triggerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger"
triggerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/trigger"

apiseventing "knative.dev/eventing/pkg/apis/eventing"
eventing "knative.dev/eventing/pkg/apis/eventing/v1"
eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1"

apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config"
consumergroupclient "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/client"
consumergroupinformer "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/informers/eventing/v1alpha1/consumergroup"
"knative.dev/eventing-kafka-broker/control-plane/pkg/config"
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/consumergroup"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/trigger"

apiseventing "knative.dev/eventing/pkg/apis/eventing"
eventing "knative.dev/eventing/pkg/apis/eventing/v1"
eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1"

kubeclient "knative.dev/pkg/client/injection/kube/client"
secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret"
Expand All @@ -69,20 +68,19 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf

var globalResync func()

flagsHolder := trigger.FlagsHolder{}

featureStore := feature.NewStore(logger.Sugar().Named("feature-config-store"), func(name string, value interface{}) {
flags, ok := value.(feature.Flags)
if ok {
flagsHolder.FlagsLock.Lock()
defer flagsHolder.FlagsLock.Unlock()
flagsHolder.Flags = flags
coreFeatureStore := feature.NewStore(logger.Sugar().Named("feature-config-store"), func(_ string, _ interface{}) {
if globalResync != nil {
globalResync()
}
})
coreFeatureStore.WatchConfigs(watcher)

kafkaFeatureStore := apisconfig.NewStore(ctx, func(_ string, _ *apisconfig.KafkaFeatureFlags) {
if globalResync != nil {
globalResync()
}
})
featureStore.WatchConfigs(watcher)
kafkaFeatureStore.WatchConfigs(watcher)

reconciler := &Reconciler{
BrokerLister: brokerInformer.Lister(),
Expand All @@ -94,27 +92,18 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
InternalsClient: consumergroupclient.Get(ctx),
SecretLister: secretinformer.Get(ctx).Lister(),
KubeClient: kubeclient.Get(ctx),
KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(),
FlagsHolder: &flagsHolder,
}

impl := triggerreconciler.NewImpl(ctx, reconciler, func(impl *controller.Impl) controller.Options {
return controller.Options{
ConfigStore: apisconfig.Stores{coreFeatureStore, kafkaFeatureStore},
FinalizerName: FinalizerName,
AgentName: ControllerAgentName,
SkipStatusUpdates: false,
PromoteFilterFunc: filterTriggers(reconciler.BrokerLister),
}
})

kafkaFeatureStore := apisconfig.NewStore(ctx, func(_ string, value *apisconfig.KafkaFeatureFlags) {
reconciler.KafkaFeatureFlags.Reset(value)
if globalResync != nil {
globalResync()
}
})
kafkaFeatureStore.WatchConfigs(watcher)

globalResync = func() {
impl.FilteredGlobalResync(filterTriggers(reconciler.BrokerLister), triggerInformer.Informer())
}
Expand Down
Loading

0 comments on commit 79c839c

Please sign in to comment.