Debounce how often we checkpoint progress #109

Merged · 1 commit · Jan 20, 2025
@@ -95,7 +95,8 @@ object Utils {
       Some(endpoint),
       10.seconds,
       BigDecimal(1.0),
-      BackoffPolicy(100.millis, 1.second)
+      BackoffPolicy(100.millis, 1.second),
+      10.seconds
     )

   def getKinesisSinkConfig(endpoint: URI)(streamName: String): KinesisSinkConfig = KinesisSinkConfig(
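(The bare 10.seconds added above is the new debounceCheckpoints argument, passed positionally into the KinesisSourceConfig that this test helper builds.)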
modules/kafka/src/main/resources/reference.conf (1 addition, 0 deletions)

@@ -9,6 +9,7 @@ snowplow.defaults {
       "sasl.mechanism": "OAUTHBEARER"
       "sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
     }
+    debounceCommitOffsets: "10 seconds"
   }
 }
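Because this default lives under snowplow.defaults, every app built on the Kafka module picks up the 10-second debounce automatically; a deployment only needs to set debounceCommitOffsets in its own hocon to commit more or less often.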
@@ -19,7 +19,7 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger
 import scala.reflect._

 import java.nio.ByteBuffer
-import scala.concurrent.duration.DurationLong
+import scala.concurrent.duration.{DurationLong, FiniteDuration}

 // kafka
 import fs2.kafka._
@@ -49,6 +49,8 @@ object KafkaSource {

       def stream: Stream[F, Stream[F, Option[LowLevelEvents[KafkaCheckpoints[F]]]]] =
         kafkaStream(config, authHandlerClass)
+
+      def debounceCheckpoints: FiniteDuration = config.debounceCommitOffsets
     }

   case class OffsetAndCommit[F[_]](offset: Long, commit: F[Unit])
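Naming note: the generic LowLevelSource contract (later in this diff) calls the knob debounceCheckpoints, while the Kafka config exposes it as debounceCommitOffsets, since for Kafka "checkpointing" means committing consumer offsets; the one-line override above bridges the two.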
@@ -9,11 +9,22 @@ package com.snowplowanalytics.snowplow.sources.kafka

 import io.circe.Decoder
 import io.circe.generic.semiauto._
+import io.circe.config.syntax._
+
+import scala.concurrent.duration.FiniteDuration

+/**
+ * Config to be supplied from the app's hocon
+ *
+ * @param debounceCommitOffsets
+ *   How frequently to commit our progress back to Kafka. By increasing this value, we decrease
+ *   the number of requests made to the Kafka broker.
+ */
 case class KafkaSourceConfig(
   topicName: String,
   bootstrapServers: String,
-  consumerConf: Map[String, String]
+  consumerConf: Map[String, String],
+  debounceCommitOffsets: FiniteDuration
 )

 object KafkaSourceConfig {
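The new io.circe.config.syntax._ import is what lets the derived decoder read debounceCommitOffsets from a hocon duration string. A minimal sketch of the round trip, assuming the companion object exposes the derived Decoder[KafkaSourceConfig] as an implicit (the topic and broker names are made up, and the fallback wiring to the snowplow.defaults block is outside this diff):

    import com.typesafe.config.ConfigFactory
    import io.circe.config.syntax._

    // Hypothetical app hocon overriding the 10-second default from reference.conf
    val hocon = ConfigFactory.parseString(
      """
      topicName: "enriched"
      bootstrapServers: "broker1:9092,broker2:9092"
      consumerConf: { "group.id": "snowplow" }
      debounceCommitOffsets: "30 seconds"
      """
    )

    // circe-config turns the duration string into a FiniteDuration
    val decoded = hocon.as[KafkaSourceConfig]
    // Right(KafkaSourceConfig("enriched", "broker1:9092,broker2:9092", Map("group.id" -> "snowplow"), 30.seconds))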
@@ -15,6 +15,8 @@ import io.circe.Decoder
 import io.circe.generic.semiauto._
 import org.specs2.Specification

+import scala.concurrent.duration.DurationLong
+
 class KafkaSourceConfigSpec extends Specification {
   import KafkaSourceConfigSpec._

@@ -50,7 +52,8 @@ class KafkaSourceConfigSpec extends Specification {
         "security.protocol" -> "SASL_SSL",
         "sasl.mechanism" -> "OAUTHBEARER",
         "sasl.jaas.config" -> "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
-      )
+      ),
+      debounceCommitOffsets = 10.seconds
     )

     result.as[Wrapper] must beRight.like { case w: Wrapper =>
modules/kinesis/src/main/resources/reference.conf (1 addition, 0 deletions)

@@ -15,6 +15,7 @@ snowplow.defaults: {
       minBackoff: "100 millis"
       maxBackoff: "1 second"
     }
+    debounceCheckpoints: "10 seconds"
   }
 }
@@ -19,7 +19,7 @@ import software.amazon.kinesis.lifecycle.events.{ProcessRecordsInput, ShardEnded
 import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber

 import java.util.concurrent.{CountDownLatch, SynchronousQueue}
-import scala.concurrent.duration.DurationLong
+import scala.concurrent.duration.{DurationLong, FiniteDuration}
 import scala.jdk.CollectionConverters._

 object KinesisSource {
@@ -34,6 +34,8 @@ object KinesisSource {

       def checkpointer: KinesisCheckpointer[F] =
         new KinesisCheckpointer[F](config.checkpointThrottledBackoffPolicy)
+
+      def debounceCheckpoints: FiniteDuration = config.debounceCheckpoints
     }
 }
@@ -35,6 +35,9 @@ import com.snowplowanalytics.snowplow.kinesis.BackoffPolicy
  *   up/down and pod-rotation, we want the app to be quick to acquire shard-leases to process. With
  *   bigger instances (more cores/processors) we tend to have more shard-leases per instance, so we
  *   increase how aggressively it acquires leases.
+ * @param debounceCheckpoints
+ *   How frequently to checkpoint our progress to the DynamoDB table. By increasing this value we
+ *   can decrease the write-throughput requirements of the DynamoDB table.
  *
  * Other params are self-explanatory
  */
@@ -49,7 +52,8 @@ case class KinesisSourceConfig(
   cloudwatchCustomEndpoint: Option[URI],
   leaseDuration: FiniteDuration,
   maxLeasesToStealAtOneTimeFactor: BigDecimal,
-  checkpointThrottledBackoffPolicy: BackoffPolicy
+  checkpointThrottledBackoffPolicy: BackoffPolicy,
+  debounceCheckpoints: FiniteDuration
 )

 object KinesisSourceConfig {
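To put rough numbers on that scaladoc claim (a hypothetical workload, not figures from the PR): every KCL checkpoint is a write to the DynamoDB lease table, so a consumer owning 8 shard-leases that checkpoints after every processed batch, at 10 batches per shard per second, drives roughly 8 × 10 = 80 table writes per second. Debounced to one checkpoint per shard every 10 seconds, the ceiling becomes 8 / 10 = 0.8 writes per second. The trade-off is a wider duplicate window on crash-recovery, since the last durable checkpoint may lag true progress by up to debounceCheckpoints.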
@@ -46,7 +46,8 @@ class KinesisSourceConfigSpec extends Specification {
           "checkpointThrottledBackoffPolicy": {
             "minBackoff": "100 millis",
             "maxBackoff": "1second"
-          }
+          },
+          "debounceCheckpoints": "42 seconds"
         }
       """
@@ -59,7+60,8 @@
         c.retrievalMode must beEqualTo(KinesisSourceConfig.Retrieval.Polling(42)),
         c.leaseDuration must beEqualTo(20.seconds),
         c.maxLeasesToStealAtOneTimeFactor must beEqualTo(BigDecimal(0.42)),
-        c.checkpointThrottledBackoffPolicy must beEqualTo(BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second))
+        c.checkpointThrottledBackoffPolicy must beEqualTo(BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second)),
+        c.debounceCheckpoints must beEqualTo(42.seconds)
       ).reduce(_ and _)
     }
   }
@@ -82,7 +84,8 @@
           "checkpointThrottledBackoffPolicy": {
             "minBackoff": "100 millis",
             "maxBackoff": "1second"
-          }
+          },
+          "debounceCheckpoints": "42 seconds"
         }
       """
@@ -95,7 +98,8 @@
         c.retrievalMode must beEqualTo(KinesisSourceConfig.Retrieval.Polling(42)),
         c.leaseDuration must beEqualTo(20.seconds),
         c.maxLeasesToStealAtOneTimeFactor must beEqualTo(BigDecimal(0.42)),
-        c.checkpointThrottledBackoffPolicy must beEqualTo(BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second))
+        c.checkpointThrottledBackoffPolicy must beEqualTo(BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second)),
+        c.debounceCheckpoints must beEqualTo(42.seconds)
       ).reduce(_ and _)
     }
   }
@@ -124,7 +128,8 @@ class KinesisSourceConfigSpec extends Specification {
       cloudwatchCustomEndpoint = None,
       leaseDuration = 10.seconds,
       maxLeasesToStealAtOneTimeFactor = BigDecimal(2.0),
-      checkpointThrottledBackoffPolicy = BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second)
+      checkpointThrottledBackoffPolicy = BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second),
+      debounceCheckpoints = 10.seconds
     )

     result.as[Wrapper] must beRight.like { case w: Wrapper =>
@@ -64,6 +64,8 @@ object PubsubSource {

       def stream: Stream[F, Stream[F, Option[LowLevelEvents[Vector[Unique.Token]]]]] =
         pubsubStream(config, deferredResources)
+
+      def debounceCheckpoints: FiniteDuration = Duration.Zero
     }

   private def pubsubStream[F[_]: Async](
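Pub/Sub opts out by hard-coding Duration.Zero: the batchUpCheckpoints pipe added below passes the stream through untouched when the timeout is zero, so acks are sent as soon as they are ready. That fits Pub/Sub, which has no lease table to protect, and where holding acks back would only risk brushing up against the subscription's ack deadline.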
@@ -7,7 +7,7 @@
  */
 package com.snowplowanalytics.snowplow.sources.internal

-import cats.Monad
+import cats.{Monad, Semigroup}
 import cats.implicits._
 import cats.effect.std.Queue
 import cats.effect.kernel.{Ref, Unique}
@@ -49,6 +49,14 @@ private[sources] trait LowLevelSource[F[_], C] {
    * `SourceAndAck` reporting itself as unhealthy.
    */
   def stream: Stream[F, Stream[F, Option[LowLevelEvents[C]]]]
+
+  /**
+   * How frequently we should checkpoint progress to this source
+   *
+   * E.g. for Kinesis we can increase this value to reduce how often we need to write to the
+   * DynamoDB table
+   */
+  def debounceCheckpoints: FiniteDuration
 }

 private[sources] object LowLevelSource {
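Since the new trait method is abstract, every implementation has to make an explicit choice, which is why the Kafka, Kinesis, and Pub/Sub hunks earlier in this diff each add their own debounceCheckpoints override.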
@@ -118,7 +126,7 @@
         .through(windowed(config.windowing))

       val sinks = EagerWindows.pipes { control: EagerWindows.Control[F] =>
-        CleanCancellation(messageSink(processor, acksRef, source.checkpointer, control))
+        CleanCancellation(messageSink(processor, acksRef, source.checkpointer, control, source.debounceCheckpoints))
       }

       tokenedSources
@@ -223,20 +231,21 @@
    * @param control
    *   Controls the processing of eager windows. Prevents the next eager window from checkpointing
    *   any events before the previous window is fully finalized.
+   * @param debounceCheckpoints
+   *   Debounces how often we call the checkpointer.
    */
   private def messageSink[F[_]: Async, C](
     processor: EventProcessor[F],
     ref: Ref[F, AcksState[C]],
     checkpointer: Checkpointer[F, C],
-    control: EagerWindows.Control[F]
+    control: EagerWindows.Control[F],
+    debounceCheckpoints: FiniteDuration
   ): Pipe[F, TokenedEvents, Nothing] =
     _.evalTap { case TokenedEvents(events, _) =>
       Logger[F].debug(s"Batch of ${events.size} events received from the source stream")
     }
       .through(processor)
       .chunks
-      .prefetch // This prefetch means we can ack messages concurrently with processing the next batch
-      .evalTap(_ => control.waitForPreviousWindow)
       .evalMap { chunk =>
         chunk
           .traverse { token =>
@@ -249,10 +258,14 @@
             case None => Async[F].raiseError[C](new IllegalStateException("Missing checkpoint for token"))
           }
         }
-          .flatMap { cs =>
-            checkpointer.ack(checkpointer.combineAll(cs.toIterable))
+          .map { cs =>
+            checkpointer.combineAll(cs.toIterable)
           }
       }
+      .prefetch // This prefetch means we can ack messages concurrently with processing the next batch
+      .through(batchUpCheckpoints(debounceCheckpoints, checkpointer))
+      .evalTap(_ => control.waitForPreviousWindow)
+      .evalMap(c => checkpointer.ack(c))
       .drain
       .onFinalizeCase {
         case ExitCase.Succeeded =>
@@ -344,4 +357,38 @@ private[sources] object LowLevelSource {
    */
   private def timeoutForFirstWindow(config: EventProcessingConfig.TimedWindows): FiniteDuration =
     (config.duration.toMillis * config.firstWindowScaling).toLong.milliseconds
+
+  private def batchUpCheckpoints[F[_]: Async, C](timeout: FiniteDuration, semigroup: Semigroup[C]): Pipe[F, C, C] = {
+
+    def go(timedPull: Pull.Timed[F, C], output: Option[C]): Pull[F, C, Unit] =
+      timedPull.uncons.flatMap {
+        case None =>
+          // Upstream finished cleanly. Emit whatever is pending and we're done.
+          Pull.outputOption1(output)
+        case Some((Left(_), next)) =>
+          // Timer timed-out. Emit whatever is pending.
+          Pull.outputOption1(output) >> go(next, None)
+        case Some((Right(chunk), next)) =>
+          // Upstream emitted tokens to us. We might already have pending tokens
+          output match {
+            case Some(c) =>
+              go(next, Some(chunk.foldLeft(c)(semigroup.combine(_, _))))
+            case None =>
+              semigroup.combineAllOption(chunk.iterator) match {
+                case Some(c) =>
+                  next.timeout(timeout) >> go(next, Some(c))
+                case None =>
+                  go(next, None)
+              }
+          }
+      }
+
+    in =>
+      if (timeout > Duration.Zero)
+        in.pull.timed { timedPull =>
+          go(timedPull, None)
+        }.stream
+      else
+        in
+  }
 }
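The Pull.Timed recursion above is the heart of the change, so a standalone sketch may help (not part of the PR; DebounceDemo, the Int payload, and the timings are all illustrative). Int addition stands in for the checkpointer's Semigroup, each element represents one would-be ack, and the pipe emits a single combined value per timeout window:

    import cats.Semigroup
    import cats.implicits._
    import cats.effect.{IO, IOApp}
    import fs2.{Pipe, Pull, Stream}
    import scala.concurrent.duration._

    object DebounceDemo extends IOApp.Simple {

      // Same shape as batchUpCheckpoints, specialised to IO and any Semigroup[A]
      def debounce[A](timeout: FiniteDuration)(implicit S: Semigroup[A]): Pipe[IO, A, A] = {
        def go(timedPull: Pull.Timed[IO, A], pending: Option[A]): Pull[IO, A, Unit] =
          timedPull.uncons.flatMap {
            case None =>
              Pull.outputOption1(pending) // upstream finished: flush whatever is pending
            case Some((Left(_), next)) =>
              Pull.outputOption1(pending) >> go(next, None) // timer fired: flush
            case Some((Right(chunk), next)) =>
              pending match {
                case Some(a) => go(next, Some(chunk.foldLeft(a)(S.combine)))
                case None =>
                  S.combineAllOption(chunk.iterator) match {
                    case Some(a) => next.timeout(timeout) >> go(next, Some(a)) // first element arms the timer
                    case None    => go(next, None)
                  }
              }
          }
        in => in.pull.timed(go(_, None)).stream
      }

      def run: IO[Unit] =
        Stream
          .awakeEvery[IO](100.millis) // one "checkpoint" every 100ms...
          .as(1)
          .take(20)
          .through(debounce(1.second)) // ...but at most one emission per second
          .evalMap(n => IO.println(s"acking once for $n combined checkpoints"))
          .compile
          .drain
    }

Run as written this prints two or three lines instead of twenty, which is the saving the PR is after: the combined C for a real source still carries the furthest progress seen, so the skipped intermediate acks cost nothing beyond a slightly staler checkpoint.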