Skip to content

Commit

Permalink
Allow enabling sarama logging and disabling client pool (#4103) (#4108)
Browse files Browse the repository at this point in the history
Add 3 new environment variables:
```
ENABLE_SARAMA_LOGGER       (default: false)
ENABLE_SARAMA_DEBUG_LOGGER (default: false)
ENABLE_SARAMA_CLIENT_POOL  (default: true)
```

Signed-off-by: Pierangelo Di Pilato <[email protected]>
  • Loading branch information
pierDipi authored Sep 20, 2024
1 parent d38623c commit f62e51f
Show file tree
Hide file tree
Showing 10 changed files with 112 additions and 41 deletions.
14 changes: 13 additions & 1 deletion control-plane/cmd/kafka-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ package main
import (
"context"
"log"
"os"
"strings"

"github.com/IBM/sarama"
filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
Expand Down Expand Up @@ -70,7 +73,16 @@ func main() {
auth.OIDCLabelSelector,
eventing.DispatcherLabelSelectorStr,
)
ctx = clientpool.WithKafkaClientPool(ctx)

if v := os.Getenv("ENABLE_SARAMA_LOGGER"); strings.EqualFold(v, "true") {
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags|log.Llongfile)
}
if v := os.Getenv("ENABLE_SARAMA_DEBUG_LOGGER"); strings.EqualFold(v, "true") {
sarama.DebugLogger = log.New(os.Stdout, "[sarama][debug] ", log.LstdFlags|log.Llongfile)
}
if v := os.Getenv("ENABLE_SARAMA_CLIENT_POOL"); v == "" || strings.EqualFold(v, "true") {
ctx = clientpool.WithKafkaClientPool(ctx)
}

sharedmain.MainNamed(ctx, component,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,12 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: ENABLE_SARAMA_LOGGER
value: "false"
- name: ENABLE_SARAMA_DEBUG_LOGGER
value: "false"
- name: ENABLE_SARAMA_CLIENT_POOL
value: "true"

ports:
- containerPort: 9090
Expand Down
28 changes: 25 additions & 3 deletions control-plane/pkg/kafka/clientpool/clientpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ import (
"go.uber.org/zap"

corev1 "k8s.io/api/core/v1"
"knative.dev/pkg/logging"

"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka"
"knative.dev/eventing-kafka-broker/control-plane/pkg/prober"
"knative.dev/eventing-kafka-broker/control-plane/pkg/security"
"knative.dev/pkg/logging"
)

type KafkaClientKey struct{}
Expand Down Expand Up @@ -63,8 +64,21 @@ type ClientPool struct {
}

type GetKafkaClientFunc func(ctx context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.Client, error)

type GetKafkaClusterAdminFunc func(ctx context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.ClusterAdmin, error)

func DisabledGetKafkaClusterAdminFunc(_ context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.ClusterAdmin, error) {
c, err := makeSaramaClient(bootstrapServers, secret, sarama.NewClient)
if err != nil {
return nil, err
}
return sarama.NewClusterAdminFromClient(c)
}

func DisabledGetClient(_ context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.Client, error) {
return makeSaramaClient(bootstrapServers, secret, sarama.NewClient)
}

func (cp *ClientPool) GetClient(ctx context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.Client, error) {
client, err := cp.getClient(ctx, bootstrapServers, secret)
if err != nil {
Expand Down Expand Up @@ -141,7 +155,11 @@ func (cp *ClientPool) GetClusterAdmin(ctx context.Context, bootstrapServers []st
}

func Get(ctx context.Context) *ClientPool {
return ctx.Value(ctxKey).(*ClientPool)
v := ctx.Value(ctxKey)
if v == nil {
return nil
}
return v.(*ClientPool)
}

func makeClusterAdminKey(bootstrapServers []string, secret *corev1.Secret) clientKey {
Expand All @@ -162,6 +180,10 @@ func makeClusterAdminKey(bootstrapServers []string, secret *corev1.Secret) clien
}

func (cp *ClientPool) makeSaramaClient(bootstrapServers []string, secret *corev1.Secret) (sarama.Client, error) {
return makeSaramaClient(bootstrapServers, secret, cp.newSaramaClient)
}

func makeSaramaClient(bootstrapServers []string, secret *corev1.Secret, newSaramaClient kafka.NewClientFunc) (sarama.Client, error) {
secretOpt, err := security.NewSaramaSecurityOptionFromSecret(secret)
if err != nil {
return nil, err
Expand All @@ -172,7 +194,7 @@ func (cp *ClientPool) makeSaramaClient(bootstrapServers []string, secret *corev1
return nil, err
}

saramaClient, err := cp.newSaramaClient(bootstrapServers, config)
saramaClient, err := newSaramaClient(bootstrapServers, config)
if err != nil {
return nil, err
}
Expand Down
18 changes: 11 additions & 7 deletions control-plane/pkg/reconciler/broker/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ func NewController(ctx context.Context, watcher configmap.Watcher, env *config.E
configmapInformer := configmapinformer.Get(ctx)
featureFlags := apisconfig.DefaultFeaturesConfig()

clientPool := clientpool.Get(ctx)

reconciler := &Reconciler{
Reconciler: &base.Reconciler{
KubeClient: kubeclient.Get(ctx),
Expand All @@ -79,11 +77,17 @@ func NewController(ctx context.Context, watcher configmap.Watcher, env *config.E
DispatcherLabel: base.BrokerDispatcherLabel,
ReceiverLabel: base.BrokerReceiverLabel,
},
GetKafkaClusterAdmin: clientPool.GetClusterAdmin,
ConfigMapLister: configmapInformer.Lister(),
Env: env,
Counter: counter.NewExpiringCounter(ctx),
KafkaFeatureFlags: featureFlags,
ConfigMapLister: configmapInformer.Lister(),
Env: env,
Counter: counter.NewExpiringCounter(ctx),
KafkaFeatureFlags: featureFlags,
}

clientPool := clientpool.Get(ctx)
if clientPool == nil {
reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc
} else {
reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin
}

logger := logging.FromContext(ctx)
Expand Down
10 changes: 7 additions & 3 deletions control-plane/pkg/reconciler/broker/namespaced_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,6 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, env
logger.Fatal("unable to create Manifestival client-go client", zap.Error(err))
}

clientPool := clientpool.Get(ctx)

reconciler := &NamespacedReconciler{
Reconciler: &base.Reconciler{
KubeClient: kubeclient.Get(ctx),
Expand All @@ -103,7 +101,6 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, env
DispatcherLabel: base.BrokerDispatcherLabel,
ReceiverLabel: base.BrokerReceiverLabel,
},
GetKafkaClusterAdmin: clientPool.GetClusterAdmin,
NamespaceLister: namespaceinformer.Get(ctx).Lister(),
ConfigMapLister: configmapInformer.Lister(),
ServiceAccountLister: serviceaccountinformer.Get(ctx).Lister(),
Expand All @@ -119,6 +116,13 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, env
KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(),
}

clientPool := clientpool.Get(ctx)
if clientPool == nil {
reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc
} else {
reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin
}

impl := brokerreconciler.NewImpl(ctx, reconciler, kafka.NamespacedBrokerClass, func(impl *controller.Impl) controller.Options {
return controller.Options{PromoteFilterFunc: kafka.NamespacedBrokerClassFilter()}
})
Expand Down
24 changes: 14 additions & 10 deletions control-plane/pkg/reconciler/channel/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf

messagingv1beta.RegisterAlternateKafkaChannelConditionSet(conditionSet)

clientPool := clientpool.Get(ctx)

reconciler := &Reconciler{
Reconciler: &base.Reconciler{
KubeClient: kubeclient.Get(ctx),
Expand All @@ -80,14 +78,20 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
DataPlaneNamespace: configs.SystemNamespace,
ReceiverLabel: base.ChannelReceiverLabel,
},
GetKafkaClusterAdmin: clientPool.GetClusterAdmin,
Env: configs,
ConfigMapLister: configmapInformer.Lister(),
ServiceLister: serviceinformer.Get(ctx).Lister(),
SubscriptionLister: subscriptioninformer.Get(ctx).Lister(),
ConsumerGroupLister: consumerGroupInformer.Lister(),
InternalsClient: consumergroupclient.Get(ctx),
KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(),
ConsumerGroupLister: consumerGroupInformer.Lister(),
InternalsClient: consumergroupclient.Get(ctx),
Env: configs,
ConfigMapLister: configmapInformer.Lister(),
ServiceLister: serviceinformer.Get(ctx).Lister(),
SubscriptionLister: subscriptioninformer.Get(ctx).Lister(),
KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(),
}

clientPool := clientpool.Get(ctx)
if clientPool == nil {
reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc
} else {
reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin
}

logger := logging.FromContext(ctx)
Expand Down
13 changes: 9 additions & 4 deletions control-plane/pkg/reconciler/consumergroup/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,6 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I
KafkaChannelScheduler: createKafkaScheduler(ctx, c, kafkainternals.ChannelStatefulSetName),
}

clientPool := clientpool.Get(ctx)

dispatcherPodInformer := podinformer.Get(ctx, eventing.DispatcherLabelSelectorStr)

r := &Reconciler{
Expand All @@ -132,17 +130,24 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I
PodLister: dispatcherPodInformer.Lister(),
KubeClient: kubeclient.Get(ctx),
NameGenerator: names.SimpleNameGenerator,
GetKafkaClient: clientPool.GetClient,
InitOffsetsFunc: offset.InitOffsets,
SystemNamespace: system.Namespace(),
GetKafkaClusterAdmin: clientPool.GetClusterAdmin,
KafkaFeatureFlags: config.DefaultFeaturesConfig(),
KedaClient: kedaclient.Get(ctx),
AutoscalerConfig: env.AutoscalerConfigMap,
DeleteConsumerGroupMetadataCounter: counter.NewExpiringCounter(ctx),
InitOffsetLatestInitialOffsetCache: prober.NewLocalExpiringCache[string, prober.Status, struct{}](ctx, 20*time.Minute),
}

clientPool := clientpool.Get(ctx)
if clientPool == nil {
r.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc
r.GetKafkaClient = clientpool.DisabledGetClient
} else {
r.GetKafkaClusterAdmin = clientPool.GetClusterAdmin
r.GetKafkaClient = clientPool.GetClient
}

consumerInformer := consumer.Get(ctx)

consumerGroupInformer := consumergroup.Get(ctx)
Expand Down
14 changes: 9 additions & 5 deletions control-plane/pkg/reconciler/sink/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf

configmapInformer := configmapinformer.Get(ctx)

clientPool := clientpool.Get(ctx)

reconciler := &Reconciler{
Reconciler: &base.Reconciler{
KubeClient: kubeclient.Get(ctx),
Expand All @@ -67,9 +65,15 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
DataPlaneNamespace: configs.SystemNamespace,
ReceiverLabel: base.SinkReceiverLabel,
},
ConfigMapLister: configmapInformer.Lister(),
GetKafkaClusterAdmin: clientPool.GetClusterAdmin,
Env: configs,
ConfigMapLister: configmapInformer.Lister(),
Env: configs,
}

clientPool := clientpool.Get(ctx)
if clientPool == nil {
reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc
} else {
reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin
}

_, err := reconciler.GetOrCreateDataPlaneConfigMap(ctx)
Expand Down
13 changes: 9 additions & 4 deletions control-plane/pkg/reconciler/trigger/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
triggerLister := triggerInformer.Lister()
oidcServiceaccountInformer := serviceaccountinformer.Get(ctx, auth.OIDCLabelSelector)

clientPool := clientpool.Get(ctx)

reconciler := &Reconciler{
Reconciler: &base.Reconciler{
KubeClient: kubeclient.Get(ctx),
Expand All @@ -97,12 +95,19 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
BrokerClass: kafka.BrokerClass,
DataPlaneConfigMapLabeler: base.NoopConfigmapOption,
KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(),
GetKafkaClient: clientPool.GetClient,
GetKafkaClusterAdmin: clientPool.GetClusterAdmin,
InitOffsetsFunc: offset.InitOffsets,
ServiceAccountLister: oidcServiceaccountInformer.Lister(),
}

clientPool := clientpool.Get(ctx)
if clientPool == nil {
reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc
reconciler.GetKafkaClient = clientpool.DisabledGetClient
} else {
reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin
reconciler.GetKafkaClient = clientPool.GetClient
}

impl := triggerreconciler.NewImpl(ctx, reconciler, func(impl *controller.Impl) controller.Options {
return controller.Options{
FinalizerName: FinalizerName,
Expand Down
13 changes: 9 additions & 4 deletions control-plane/pkg/reconciler/trigger/namespaced_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, con
triggerLister := triggerInformer.Lister()
oidcServiceaccountInformer := serviceaccountinformer.Get(ctx, auth.OIDCLabelSelector)

clientPool := clientpool.Get(ctx)

reconciler := &NamespacedReconciler{
Reconciler: &base.Reconciler{
KubeClient: kubeclient.Get(ctx),
Expand All @@ -87,12 +85,19 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, con
ServiceAccountLister: oidcServiceaccountInformer.Lister(),
EventingClient: eventingclient.Get(ctx),
Env: configs,
GetKafkaClient: clientPool.GetClient,
GetKafkaClusterAdmin: clientPool.GetClusterAdmin,
InitOffsetsFunc: offset.InitOffsets,
KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(),
}

clientPool := clientpool.Get(ctx)
if clientPool == nil {
reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc
reconciler.GetKafkaClient = clientpool.DisabledGetClient
} else {
reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin
reconciler.GetKafkaClient = clientPool.GetClient
}

impl := triggerreconciler.NewImpl(ctx, reconciler, func(impl *controller.Impl) controller.Options {
return controller.Options{
FinalizerName: NamespacedFinalizerName,
Expand Down

0 comments on commit f62e51f

Please sign in to comment.