Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure first window is always small #106

Merged
merged 2 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ object EventProcessingConfig {
* Controls how many windows are allowed to start eagerly ahead of an earlier window that is
* still being finalized. For example, if numEagerWindows=2 then window 42 is allowed to start
* while windows 40 and 41 are still finalizing.
*
* The `firstWindowScaling` lies in the range 0.25 to 0.5. This range comes from experience with
* the lake loader: 1. Helps the app quickly reach a stable cpu usage; 2. Avoids a problem in
* which the loader pulls in more events in the first window than what it can possibly sink within
* the second window.
*/
case class TimedWindows(
duration: FiniteDuration,
Expand All @@ -62,7 +67,7 @@ object EventProcessingConfig {
for {
random <- Random.scalaUtilRandom
factor <- random.nextDouble
} yield TimedWindows(duration, factor, numEagerWindows)
} yield TimedWindows(duration, 0.25 * (1.0 + factor), numEagerWindows)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is betweenDouble in Random. Would that work here and maybe make the range a bit more clear?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well spotted. Nice idea.

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,11 @@ private[sources] object LowLevelSource {
* by flushing all pending events to S3 and then sending the SQS message.
*/
private def timedWindows[F[_]: Async, A](config: EventProcessingConfig.TimedWindows): Pipe[F, A, Stream[F, A]] = {
def go(timedPull: Pull.Timed[F, A], current: Option[Queue[F, Option[A]]]): Pull[F, Stream[F, A], Unit] =
def go(
timedPull: Pull.Timed[F, A],
current: Option[Queue[F, Option[A]]],
nextDuration: FiniteDuration
): Pull[F, Stream[F, A], Unit] =
timedPull.uncons.attempt.flatMap {
case Right(None) =>
current match {
Expand All @@ -292,8 +296,8 @@ private[sources] object LowLevelSource {
}
case Right(Some((Left(_), next))) =>
current match {
case None => go(next, None)
case Some(q) => Pull.eval(q.offer(None)) >> go(next, None)
case None => go(next, None, nextDuration)
case Some(q) => Pull.eval(q.offer(None)) >> go(next, None, nextDuration)
}
case Right(Some((Right(chunk), next))) =>
current match {
Expand All @@ -302,11 +306,11 @@ private[sources] object LowLevelSource {
q <- Pull.eval(Queue.synchronous[F, Option[A]])
_ <- Pull.output1(Stream.fromQueueNoneTerminated(q))
_ <- Pull.eval(chunk.traverse(a => q.offer(Some(a))))
_ <- Pull.eval(Logger[F].info(s"Opening new window with duration ${config.duration}")) >> next.timeout(config.duration)
} yield go(next, Some(q))
_ <- Pull.eval(Logger[F].info(s"Opening new window with duration $nextDuration")) >> next.timeout(nextDuration)
} yield go(next, Some(q), (nextDuration * 2).min(config.duration))
pull.flatten
case Some(q) =>
Pull.eval(chunk.traverse(a => q.offer(Some(a)))) >> go(next, Some(q))
Pull.eval(chunk.traverse(a => q.offer(Some(a)))) >> go(next, Some(q), nextDuration)
}
case Left(throwable) =>
current match {
Expand All @@ -324,7 +328,7 @@ private[sources] object LowLevelSource {
_ <- timedPull.timeout(timeout)
q <- Pull.eval(Queue.synchronous[F, Option[A]])
_ <- Pull.output1(Stream.fromQueueNoneTerminated(q))
} yield go(timedPull, Some(q))
} yield go(timedPull, Some(q), (2 * timeout).min(config.duration))
pull.flatten
}
.stream
Expand All @@ -338,6 +342,6 @@ private[sources] object LowLevelSource {
* time. All instances in the group should end windows at slightly different times, so that
* downstream gets a more steady flow of completed batches.
*/
private def timeoutForFirstWindow(config: EventProcessingConfig.TimedWindows) =
private def timeoutForFirstWindow(config: EventProcessingConfig.TimedWindows): FiniteDuration =
(config.duration.toMillis * config.firstWindowScaling).toLong.milliseconds
}
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
sourceAndAck <- LowLevelSource.toSourceAndAck(testLowLevelSource(refActions, testConfig))
processor = windowedProcessor(refActions, testConfig)
fiber <- sourceAndAck.stream(config, processor).compile.drain.start
_ <- IO.sleep(91.seconds)
_ <- IO.sleep(131.seconds)
_ <- fiber.cancel
result <- refActions.get
} yield result must beEqualTo(
Expand All @@ -490,15 +490,21 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
Action.ProcessorReceivedEvents("1970-01-01T00:00:22Z", List("5", "6")),
Action.ProcessorReceivedEvents("1970-01-01T00:00:33Z", List("7", "8")),
Action.ProcessorReceivedEvents("1970-01-01T00:00:44Z", List("9", "10")),
Action.ProcessorReachedEndOfWindow("1970-01-01T00:00:52Z"),
Action.Checkpointed(List("5", "6", "7", "8", "9", "10")),
Action.ProcessorStartedWindow("1970-01-01T00:00:55Z"),
Action.ProcessorReceivedEvents("1970-01-01T00:00:55Z", List("11", "12")),
Action.ProcessorReceivedEvents("1970-01-01T00:01:06Z", List("13", "14")),
Action.ProcessorReceivedEvents("1970-01-01T00:01:17Z", List("15", "16")),
Action.ProcessorReachedEndOfWindow("1970-01-01T00:01:22Z"),
Action.Checkpointed(List("5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16")),
Action.ProcessorStartedWindow("1970-01-01T00:01:28Z"),
Action.ProcessorReceivedEvents("1970-01-01T00:01:28Z", List("17", "18")),
Action.ProcessorReachedEndOfWindow("1970-01-01T00:01:31Z"),
Action.Checkpointed(List("17", "18"))
Action.ProcessorReceivedEvents("1970-01-01T00:01:39Z", List("19", "20")),
Action.ProcessorReceivedEvents("1970-01-01T00:01:50Z", List("21", "22")),
Action.ProcessorReachedEndOfWindow("1970-01-01T00:01:55Z"),
Action.Checkpointed(List("11", "12", "13", "14", "15", "16", "17", "18", "19", "20", "21", "22")),
Action.ProcessorStartedWindow("1970-01-01T00:02:01Z"),
Action.ProcessorReceivedEvents("1970-01-01T00:02:01Z", List("23", "24")),
Action.ProcessorReachedEndOfWindow("1970-01-01T00:02:11Z"),
Action.Checkpointed(List("23", "24"))
)
)

Expand Down
Loading