Commit 299216b
Use different auth callback classes with good/bad sinks
spenes committed Jan 29, 2024
1 parent a66e721 commit 299216b
Showing 5 changed files with 32 additions and 15 deletions.
1 change: 0 additions & 1 deletion kafka/src/main/resources/application.conf
@@ -15,7 +15,6 @@ collector {
"security.protocol" = "SASL_SSL"
"sasl.mechanism" = "OAUTHBEARER"
"sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
"sasl.login.callback.handler.class": "com.snowplowanalytics.snowplow.collectors.scalastream.sinks.AzureAuthenticationCallbackHandler"
}
}

@@ -19,8 +19,14 @@ object KafkaCollector extends App[KafkaSinkConfig](BuildInfo) {

override def mkSinks(config: Config.Streams[KafkaSinkConfig]): Resource[IO, Sinks[IO]] =
for {
- good <- KafkaSink.create[IO](config.good)
- bad <- KafkaSink.create[IO](config.bad)
+ good <- KafkaSink.create[IO](
+   config.good,
+   classOf[GoodAzureAuthenticationCallbackHandler].getName
+ )
+ bad <- KafkaSink.create[IO](
+   config.bad,
+   classOf[BadAzureAuthenticationCallbackHandler].getName
+ )
} yield Sinks(good, bad)

override def telemetryInfo(config: Config.Streams[KafkaSinkConfig]): IO[Telemetry.TelemetryInfo] =
@@ -28,6 +28,16 @@ import com.azure.core.credential.TokenRequestContext

import com.nimbusds.jwt.JWTParser

+ // We need a separate callback handler instance for the good and bad sinks because
+ // each sink must authenticate with its own token. However, we only hand Kafka a
+ // class name, and Kafka instantiates the class itself; if both sinks were given the
+ // same class name, Kafka would create and reuse a single handler instance. To get
+ // two separate instances, we define two distinct subclasses and pass their names in
+ // the respective sink's properties, so each sink gets its own callback handler.
+ class GoodAzureAuthenticationCallbackHandler extends AzureAuthenticationCallbackHandler
+
+ class BadAzureAuthenticationCallbackHandler extends AzureAuthenticationCallbackHandler

class AzureAuthenticationCallbackHandler extends AuthenticateCallbackHandler {

val credentials = new DefaultAzureCredentialBuilder().build()
@@ -51,7 +61,8 @@ class AzureAuthenticationCallbackHandler extends AuthenticateCallbackHandler {
case None => throw new Exception("Empty bootstrap servers list")
}
val uri = URI.create("https://" + bootstrapServer)
- this.sbUri = uri.getScheme + "://" + uri.getHost
+ // Workload identity works with '.default' scope
+ this.sbUri = s"${uri.getScheme}://${uri.getHost}/.default"
}

override def handle(callbacks: Array[Callback]): Unit =
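For illustration (not part of this commit): Kafka only receives the handler's class name through the "sasl.login.callback.handler.class" property and constructs the handler itself, roughly as in the reflective sketch below. Because the good and bad sinks need different tokens, the commit registers two distinct subclasses so that each sink's producer ends up with its own handler instance. The package name below is assumed from the handler's fully qualified name used elsewhere in this repository.

// Sketch only: how a class name configured per sink turns into a per-sink handler instance.
object CallbackHandlerSketch {
  private val Pkg = "com.snowplowanalytics.snowplow.collectors.scalastream.sinks"

  private def instantiate(className: String): AnyRef =
    // Kafka-style reflective construction from a configured class name
    Class.forName(className).getDeclaredConstructor().newInstance().asInstanceOf[AnyRef]

  def main(args: Array[String]): Unit = {
    val goodHandler = instantiate(s"$Pkg.GoodAzureAuthenticationCallbackHandler")
    val badHandler  = instantiate(s"$Pkg.BadAzureAuthenticationCallbackHandler")
    // Distinct classes give each sink its own handler (and therefore its own token state).
    println(goodHandler.getClass != badHandler.getClass) // true
  }
}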
@@ -62,10 +62,11 @@ class KafkaSink[F[_]: Sync](
object KafkaSink {

def create[F[_]: Sync](
- sinkConfig: Config.Sink[KafkaSinkConfig]
+ sinkConfig: Config.Sink[KafkaSinkConfig],
+ authCallbackClass: String
): Resource[F, KafkaSink[F]] =
for {
- kafkaProducer <- createProducer(sinkConfig.config, sinkConfig.buffer)
+ kafkaProducer <- createProducer(sinkConfig.config, sinkConfig.buffer, authCallbackClass)
kafkaSink = new KafkaSink(sinkConfig.config.maxBytes, kafkaProducer, sinkConfig.name)
} yield kafkaSink

@@ -77,7 +78,8 @@ object KafkaSink {
*/
private def createProducer[F[_]: Sync](
kafkaConfig: KafkaSinkConfig,
- bufferConfig: Config.Buffer
+ bufferConfig: Config.Buffer,
+ authCallbackClass: String
): Resource[F, KafkaProducer[String, Array[Byte]]] = {
val acquire = Sync[F].delay {
val props = new Properties()
@@ -88,6 +90,7 @@
props.setProperty("linger.ms", bufferConfig.timeLimit.toString)
props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
props.setProperty("sasl.login.callback.handler.class", authCallbackClass)

// Can't use `putAll` in JDK 11 because of https://github.com/scala/bug/issues/10418
kafkaConfig.producerConf.getOrElse(Map()).foreach { case (k, v) => props.setProperty(k, v) }
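For orientation, a minimal sketch (not part of the commit) of the SASL-related properties the good sink's producer ends up with, assuming the application.conf shown above: the static SASL settings still come from producerConf, while the callback handler class is now injected per sink by createProducer. The fully qualified handler name is assumed to live in the same package as the original AzureAuthenticationCallbackHandler.

// Sketch only: effective SASL-related producer properties for the good sink after this change.
import java.util.Properties

object EffectiveSaslPropsSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Still supplied via producerConf in application.conf:
    props.setProperty("security.protocol", "SASL_SSL")
    props.setProperty("sasl.mechanism", "OAUTHBEARER")
    props.setProperty(
      "sasl.jaas.config",
      "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
    )
    // Now set in code by createProducer, per sink, instead of in application.conf:
    props.setProperty(
      "sasl.login.callback.handler.class",
      "com.snowplowanalytics.snowplow.collectors.scalastream.sinks.GoodAzureAuthenticationCallbackHandler"
    )
    props.forEach((k, v) => println(s"$k = $v"))
  }
}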
@@ -126,10 +126,9 @@ object KafkaConfigSpec {
retries = 10,
producerConf = Some(
Map(
"security.protocol" -> "SASL_SSL",
"sasl.mechanism" -> "OAUTHBEARER",
"sasl.jaas.config" -> "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;",
"sasl.login.callback.handler.class" -> "com.snowplowanalytics.snowplow.collectors.scalastream.sinks.AzureAuthenticationCallbackHandler"
"security.protocol" -> "SASL_SSL",
"sasl.mechanism" -> "OAUTHBEARER",
"sasl.jaas.config" -> "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
)
)
)
@@ -147,10 +146,9 @@ object KafkaConfigSpec {
retries = 10,
producerConf = Some(
Map(
"security.protocol" -> "SASL_SSL",
"sasl.mechanism" -> "OAUTHBEARER",
"sasl.jaas.config" -> "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;",
"sasl.login.callback.handler.class" -> "com.snowplowanalytics.snowplow.collectors.scalastream.sinks.AzureAuthenticationCallbackHandler"
"security.protocol" -> "SASL_SSL",
"sasl.mechanism" -> "OAUTHBEARER",
"sasl.jaas.config" -> "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
)
)
)

0 comments on commit 299216b
