From 613247777fad35c5191aaa6a8f5c213d68651cde Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Wed, 8 Jan 2025 09:29:04 +0000 Subject: [PATCH] Enable fairness on Kinesis Source's SynchronousQueue This fixes a possible problem in which some Kinesis shards progress more quickly than others. Each KCL shard processor has its own thread, and each calls `queue.put()` on the synchronous queue. Without fairness, the queue makes no guarantee about the order in which waiting producers are served, so it is possible that by chance some shards' threads had their `put()` accepted by the consumer more quickly than others. With fairness enabled, waiting threads are served in FIFO order. --- .../snowplow/sources/kinesis/KinesisSource.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala index 988b1dd..1b01614 100644 --- a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala @@ -42,11 +42,14 @@ object KinesisSource { } } + // We enable fairness on the `SynchronousQueue` to ensure all Kinesis shards are sourced at an equal rate. + private val synchronousQueueFairness: Boolean = true + private def kinesisStream[F[_]: Async]( config: KinesisSourceConfig, liveness: Ref[F, FiniteDuration] ): Stream[F, Stream[F, LowLevelEvents[Map[String, Checkpointable]]]] = { - val actionQueue = new SynchronousQueue[KCLAction]() + val actionQueue = new SynchronousQueue[KCLAction](synchronousQueueFairness) for { _ <- Stream.resource(KCLScheduler.populateQueue[F](config, actionQueue)) events <- Stream.emit(pullFromQueueAndEmit(actionQueue, liveness).stream).repeat