Skip to content

Commit

Permalink
Enable fairness on Kinesis Source's SynchronousQueue
Browse files Browse the repository at this point in the history
This fixes a possible problem in which some Kinesis shards progress more
quickly than others. Each KCL shard processor has its own thread, and
each calls `queue.put()` on the synchronous queue.  It is possible that
by luck some shard/threads were having their `put()` accepted by a
consumer more quickly.
  • Loading branch information
istreeter committed Jan 8, 2025
1 parent d5f509b commit 8fcbbc6
Showing 1 changed file with 1 addition and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ object KinesisSource {
config: KinesisSourceConfig,
liveness: Ref[F, FiniteDuration]
): Stream[F, Stream[F, LowLevelEvents[Map[String, Checkpointable]]]] = {
val actionQueue = new SynchronousQueue[KCLAction]()
val actionQueue = new SynchronousQueue[KCLAction](true)
for {
_ <- Stream.resource(KCLScheduler.populateQueue[F](config, actionQueue))
events <- Stream.emit(pullFromQueueAndEmit(actionQueue, liveness).stream).repeat
Expand Down

0 comments on commit 8fcbbc6

Please sign in to comment.