From 299216b6c444ee1dcd6987ce9865e892b68bbe2b Mon Sep 17 00:00:00 2001
From: spenes
Date: Mon, 29 Jan 2024 14:10:54 +0300
Subject: [PATCH] Use different auth callback classes with good/bad sinks

---
 kafka/src/main/resources/application.conf          |  1 -
 .../KafkaCollector.scala                           | 10 ++++++++--
 .../sinks/AzureAuthenticationCallbackHandler.scala | 13 ++++++++++++-
 .../sinks/KafkaSink.scala                          |  9 ++++++---
 .../KafkaConfigSpec.scala                          | 14 ++++++--------
 5 files changed, 32 insertions(+), 15 deletions(-)

diff --git a/kafka/src/main/resources/application.conf b/kafka/src/main/resources/application.conf
index 4ed4706ca..b0eb74a64 100644
--- a/kafka/src/main/resources/application.conf
+++ b/kafka/src/main/resources/application.conf
@@ -15,7 +15,6 @@ collector {
       "security.protocol" = "SASL_SSL"
       "sasl.mechanism" = "OAUTHBEARER"
       "sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
-      "sasl.login.callback.handler.class": "com.snowplowanalytics.snowplow.collectors.scalastream.sinks.AzureAuthenticationCallbackHandler"
     }
   }

diff --git a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaCollector.scala b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaCollector.scala
index e162f7a23..77bdba300 100644
--- a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaCollector.scala
+++ b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaCollector.scala
@@ -19,8 +19,14 @@ object KafkaCollector extends App[KafkaSinkConfig](BuildInfo) {
 
   override def mkSinks(config: Config.Streams[KafkaSinkConfig]): Resource[IO, Sinks[IO]] =
     for {
-      good <- KafkaSink.create[IO](config.good)
-      bad <- KafkaSink.create[IO](config.bad)
+      good <- KafkaSink.create[IO](
+        config.good,
+        classOf[GoodAzureAuthenticationCallbackHandler].getName
+      )
+      bad <- KafkaSink.create[IO](
+        config.bad,
+        classOf[BadAzureAuthenticationCallbackHandler].getName
+      )
     } yield Sinks(good, bad)
 
   override def telemetryInfo(config: Config.Streams[KafkaSinkConfig]): IO[Telemetry.TelemetryInfo] =

diff --git a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/AzureAuthenticationCallbackHandler.scala b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/AzureAuthenticationCallbackHandler.scala
index d907b8394..15feaf8e1 100644
--- a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/AzureAuthenticationCallbackHandler.scala
+++ b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/AzureAuthenticationCallbackHandler.scala
@@ -28,6 +28,16 @@
 import com.azure.core.credential.TokenRequestContext
 import com.nimbusds.jwt.JWTParser
 
+// The good and bad sinks each need their own callback handler instance because they
+// authenticate with different tokens. However, we can only pass a class name to Kafka,
+// which instantiates the handler itself; if both sinks were configured with the same
+// class name, Kafka would create and use a single shared instance. To get two separate
+// instances, we define two distinct classes and pass each sink the name of its own
+// class, so each sink ends up with its own callback handler instance.
+class GoodAzureAuthenticationCallbackHandler extends AzureAuthenticationCallbackHandler
+
+class BadAzureAuthenticationCallbackHandler extends AzureAuthenticationCallbackHandler
+
 class AzureAuthenticationCallbackHandler extends AuthenticateCallbackHandler {
 
   val credentials = new DefaultAzureCredentialBuilder().build()
 
@@ -51,7 +61,8 @@ class AzureAuthenticationCallbackHandler extends AuthenticateCallbackHandler {
       case None => throw new Exception("Empty bootstrap servers list")
     }
     val uri = URI.create("https://" + bootstrapServer)
-    this.sbUri = uri.getScheme + "://" + uri.getHost
+    // Workload identity works with '.default' scope
+    this.sbUri = s"${uri.getScheme}://${uri.getHost}/.default"
   }
 
   override def handle(callbacks: Array[Callback]): Unit =

diff --git a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala
index 0280ecf00..a3023fd36 100644
--- a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala
+++ b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala
@@ -62,10 +62,11 @@ class KafkaSink[F[_]: Sync](
 object KafkaSink {
 
   def create[F[_]: Sync](
-    sinkConfig: Config.Sink[KafkaSinkConfig]
+    sinkConfig: Config.Sink[KafkaSinkConfig],
+    authCallbackClass: String
   ): Resource[F, KafkaSink[F]] =
     for {
-      kafkaProducer <- createProducer(sinkConfig.config, sinkConfig.buffer)
+      kafkaProducer <- createProducer(sinkConfig.config, sinkConfig.buffer, authCallbackClass)
       kafkaSink = new KafkaSink(sinkConfig.config.maxBytes, kafkaProducer, sinkConfig.name)
     } yield kafkaSink
 
@@ -77,7 +78,8 @@
    */
   private def createProducer[F[_]: Sync](
     kafkaConfig: KafkaSinkConfig,
-    bufferConfig: Config.Buffer
+    bufferConfig: Config.Buffer,
+    authCallbackClass: String
   ): Resource[F, KafkaProducer[String, Array[Byte]]] = {
     val acquire = Sync[F].delay {
       val props = new Properties()
@@ -88,6 +90,7 @@
       props.setProperty("linger.ms", bufferConfig.timeLimit.toString)
       props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
       props.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
+      props.setProperty("sasl.login.callback.handler.class", authCallbackClass)
 
       // Can't use `putAll` in JDK 11 because of https://github.com/scala/bug/issues/10418
       kafkaConfig.producerConf.getOrElse(Map()).foreach { case (k, v) => props.setProperty(k, v) }

diff --git a/kafka/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaConfigSpec.scala b/kafka/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaConfigSpec.scala
index 431df9968..74d92d742 100644
--- a/kafka/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaConfigSpec.scala
+++ b/kafka/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaConfigSpec.scala
@@ -126,10 +126,9 @@ object KafkaConfigSpec {
       retries = 10,
       producerConf = Some(
         Map(
-          "security.protocol" -> "SASL_SSL",
-          "sasl.mechanism" -> "OAUTHBEARER",
-          "sasl.jaas.config" -> "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;",
-          "sasl.login.callback.handler.class" -> "com.snowplowanalytics.snowplow.collectors.scalastream.sinks.AzureAuthenticationCallbackHandler"
+          "security.protocol" -> "SASL_SSL",
+          "sasl.mechanism"    -> "OAUTHBEARER",
+          "sasl.jaas.config"  -> "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
         )
       )
     )
@@ -147,10 +146,9 @@ object KafkaConfigSpec {
       retries = 10,
       producerConf = Some(
         Map(
-          "security.protocol" -> "SASL_SSL",
-          "sasl.mechanism" -> "OAUTHBEARER",
-          "sasl.jaas.config" -> "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;",
-          "sasl.login.callback.handler.class" -> "com.snowplowanalytics.snowplow.collectors.scalastream.sinks.AzureAuthenticationCallbackHandler"
+          "security.protocol" -> "SASL_SSL",
+          "sasl.mechanism"    -> "OAUTHBEARER",
+          "sasl.jaas.config"  -> "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
         )
       )
     )