Skip to content

Commit

Permalink
Kinesis checkpointer should retry on dynamodb provisioned throughput …
Browse files Browse the repository at this point in the history
…exception
  • Loading branch information
istreeter committed Dec 23, 2024
1 parent 86f0316 commit 8c747b5
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ package com.snowplowanalytics.snowplow.it.kinesis

import cats.effect.{IO, Ref}

import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._

import software.amazon.awssdk.core.SdkBytes
Expand All @@ -19,13 +18,13 @@ import software.amazon.awssdk.services.kinesis.model.{GetRecordsRequest, GetShar

import com.snowplowanalytics.snowplow.sources.{EventProcessor, TokenedEvents}
import com.snowplowanalytics.snowplow.sources.kinesis.KinesisSourceConfig
import com.snowplowanalytics.snowplow.sinks.kinesis.{BackoffPolicy, KinesisSinkConfig}
import com.snowplowanalytics.snowplow.kinesis.BackoffPolicy
import com.snowplowanalytics.snowplow.sinks.kinesis.KinesisSinkConfig

import java.net.URI
import java.nio.charset.StandardCharsets
import java.util.UUID
import java.time.Instant
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.DurationLong

import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest
Expand Down Expand Up @@ -96,12 +95,13 @@ object Utils {
Some(endpoint),
Some(endpoint),
10.seconds,
BigDecimal(1.0)
BigDecimal(1.0),
BackoffPolicy(100.millis, 1.second)
)

def getKinesisSinkConfig(endpoint: URI)(streamName: String): KinesisSinkConfig = KinesisSinkConfig(
streamName,
BackoffPolicy(FiniteDuration(1, TimeUnit.SECONDS), FiniteDuration(1, TimeUnit.SECONDS), None),
BackoffPolicy(1.second, 1.second),
1000,
1000000,
Some(endpoint)
Expand Down
4 changes: 4 additions & 0 deletions modules/kinesis/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ snowplow.defaults: {
}
leaseDuration: "10 seconds"
maxLeasesToStealAtOneTimeFactor: 2.0
checkpointThrottledBackoffPolicy: {
minBackoff: "100 millis"
maxBackoff: "1 second"
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@
package com.snowplowanalytics.snowplow.sinks.kinesis

import cats.implicits._
import cats.{Applicative, Parallel}
import cats.Parallel
import cats.effect.{Async, Resource, Sync}
import cats.effect.kernel.Ref

import org.typelevel.log4cats.{Logger, SelfAwareStructuredLogger}
import org.typelevel.log4cats.slf4j.Slf4jLogger
import retry.syntax.all._
import retry.{RetryPolicies, RetryPolicy}

import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.services.kinesis.KinesisClient
Expand All @@ -24,6 +23,8 @@ import software.amazon.awssdk.awscore.defaultsmode.DefaultsMode
import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain
import software.amazon.awssdk.services.kinesis.model.{PutRecordsRequest, PutRecordsRequestEntry, PutRecordsResponse}

import com.snowplowanalytics.snowplow.kinesis.{BackoffPolicy, Retries}

import java.net.URI
import java.util.UUID
import java.nio.charset.StandardCharsets.UTF_8
Expand Down Expand Up @@ -202,7 +203,7 @@ object KinesisSink {
streamName: String,
records: ListOfList[Sinkable]
): F[Unit] = {
val policyForThrottling = Retries.fibonacci[F](throttlingErrorsPolicy)
val policyForThrottling = Retries.forThrottling[F](throttlingErrorsPolicy)

// First, tryWriteToKinesis - the AWS SDK will handle retries. If there are still failures after that, it will:
// - return messages for retries if we only hit throttling
Expand Down Expand Up @@ -233,17 +234,6 @@ object KinesisSink {

private final case class RequestLimits(recordLimit: Int, bytesLimit: Int)

// Local retry-policy helpers for throttling errors when writing to Kinesis.
// NOTE(review): this commit deletes this private object in favour of the shared
// com.snowplowanalytics.snowplow.kinesis.Retries helper (callers now use
// Retries.forThrottling) — presumably with identical fibonacci/cap semantics; confirm.
private object Retries {

// Fibonacci backoff starting at config.minBackoff, then capped and
// (optionally) retry-limited by capBackoffAndRetries below.
def fibonacci[F[_]: Applicative](config: BackoffPolicy): RetryPolicy[F] =
capBackoffAndRetries(config, RetryPolicies.fibonacciBackoff[F](config.minBackoff))

// Caps every delay of the given policy at config.maxBackoff; if
// config.maxRetries is Some(max), also stops after max retries
// (None means retry indefinitely).
private def capBackoffAndRetries[F[_]: Applicative](config: BackoffPolicy, policy: RetryPolicy[F]): RetryPolicy[F] = {
val capped = RetryPolicies.capDelay[F](config.maxBackoff, policy)
config.maxRetries.fold(capped)(max => capped.join(RetryPolicies.limitRetries(max)))
}
}

private def getRecordSize(record: PutRecordsRequestEntry) =
record.data.asByteArrayUnsafe().length + record.partitionKey().getBytes(UTF_8).length

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,10 @@ package com.snowplowanalytics.snowplow.sinks.kinesis

import io.circe._
import io.circe.generic.semiauto._
import io.circe.config.syntax._
import scala.concurrent.duration.FiniteDuration

import java.net.URI

case class BackoffPolicy(
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
maxRetries: Option[Int]
)
import com.snowplowanalytics.snowplow.kinesis.BackoffPolicy

object BackoffPolicy {

implicit def backoffPolicyDecoder: Decoder[BackoffPolicy] =
deriveDecoder[BackoffPolicy]
}
import java.net.URI

case class KinesisSinkConfig(
streamName: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,21 @@ package com.snowplowanalytics.snowplow.sources.kinesis
import cats.effect.{Async, Sync}
import cats.implicits._
import cats.effect.implicits._
import com.snowplowanalytics.snowplow.sources.internal.Checkpointer
import org.typelevel.log4cats.Logger
import software.amazon.kinesis.exceptions.ShutdownException
import retry.syntax.all._
import software.amazon.kinesis.exceptions.{ShutdownException, ThrottlingException}
import software.amazon.kinesis.processor.RecordProcessorCheckpointer
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber

import com.snowplowanalytics.snowplow.sources.internal.Checkpointer
import com.snowplowanalytics.snowplow.kinesis.{BackoffPolicy, Retries}

import java.util.concurrent.CountDownLatch

private class KinesisCheckpointer[F[_]: Async: Logger] extends Checkpointer[F, Map[String, Checkpointable]] {
private class KinesisCheckpointer[F[_]: Async: Logger](throttledBackoffPolicy: BackoffPolicy)
extends Checkpointer[F, Map[String, Checkpointable]] {

private val retryPolicy = Retries.forThrottling[F](throttledBackoffPolicy)

override val empty: Map[String, Checkpointable] = Map.empty

Expand Down Expand Up @@ -56,6 +62,18 @@ private class KinesisCheckpointer[F[_]: Async: Logger] extends Checkpointer[F, M
checkpointer.checkpoint(extendedSequenceNumber.sequenceNumber, extendedSequenceNumber.subSequenceNumber)
)
.recoverWith(ignoreShutdownExceptions(shardId))
.retryingOnSomeErrors(
policy = retryPolicy,
isWorthRetrying = {
case _: ThrottlingException => true.pure[F]
case _ => false.pure[F]
},
onError = { case (_, retryDetails) =>
Logger[F].warn(
s"Exceeded DynamoDB provisioned throughput. Checkpointing will be retried. (${retryDetails.retriesSoFar} retries so far)"
)
}
)

private def ignoreShutdownExceptions(shardId: String): PartialFunction[Throwable, F[Unit]] = { case _: ShutdownException =>
// The ShardRecordProcessor instance has been shutdown. This just means another KCL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ object KinesisSource {
kinesisStream(config, liveness)

def checkpointer: KinesisCheckpointer[F] =
new KinesisCheckpointer[F]()
new KinesisCheckpointer[F](config.checkpointThrottledBackoffPolicy)

def lastLiveness: F[FiniteDuration] =
liveness.get
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import java.net.URI
import java.time.Instant
import scala.concurrent.duration.FiniteDuration

import com.snowplowanalytics.snowplow.kinesis.BackoffPolicy

/**
* Config to be supplied from the app's hocon
*
Expand Down Expand Up @@ -46,7 +48,8 @@ case class KinesisSourceConfig(
dynamodbCustomEndpoint: Option[URI],
cloudwatchCustomEndpoint: Option[URI],
leaseDuration: FiniteDuration,
maxLeasesToStealAtOneTimeFactor: BigDecimal
maxLeasesToStealAtOneTimeFactor: BigDecimal,
checkpointThrottledBackoffPolicy: BackoffPolicy
)

object KinesisSourceConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import org.specs2.Specification

import scala.concurrent.duration.DurationLong

import com.snowplowanalytics.snowplow.kinesis.BackoffPolicy

class KinesisSinkConfigSpec extends Specification {
import KinesisSinkConfigSpec._

Expand All @@ -38,7 +40,7 @@ class KinesisSinkConfigSpec extends Specification {

val expected = KinesisSinkConfig(
streamName = "my-stream",
throttledBackoffPolicy = BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second, maxRetries = None),
throttledBackoffPolicy = BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second),
recordLimit = 500,
byteLimit = 5242880,
customEndpoint = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import io.circe.generic.semiauto._
import org.specs2.Specification
import scala.concurrent.duration.DurationLong

import com.snowplowanalytics.snowplow.kinesis.BackoffPolicy

class KinesisSourceConfigSpec extends Specification {
import KinesisSourceConfigSpec._

Expand All @@ -40,7 +42,11 @@ class KinesisSourceConfigSpec extends Specification {
"type": "TrimHorizon"
},
"leaseDuration": "20 seconds",
"maxLeasesToStealAtOneTimeFactor": 0.42
"maxLeasesToStealAtOneTimeFactor": 0.42,
"checkpointThrottledBackoffPolicy": {
"minBackoff": "100 millis",
"maxBackoff": "1 second"
}
}
"""

Expand All @@ -52,7 +58,8 @@ class KinesisSourceConfigSpec extends Specification {
c.initialPosition must beEqualTo(KinesisSourceConfig.InitialPosition.TrimHorizon),
c.retrievalMode must beEqualTo(KinesisSourceConfig.Retrieval.Polling(42)),
c.leaseDuration must beEqualTo(20.seconds),
c.maxLeasesToStealAtOneTimeFactor must beEqualTo(BigDecimal(0.42))
c.maxLeasesToStealAtOneTimeFactor must beEqualTo(BigDecimal(0.42)),
c.checkpointThrottledBackoffPolicy must beEqualTo(BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second))
).reduce(_ and _)
}
}
Expand All @@ -71,7 +78,11 @@ class KinesisSourceConfigSpec extends Specification {
"type": "TRIM_HORIZON"
},
"leaseDuration": "20 seconds",
"maxLeasesToStealAtOneTimeFactor": 0.42
"maxLeasesToStealAtOneTimeFactor": 0.42,
"checkpointThrottledBackoffPolicy": {
"minBackoff": "100 millis",
"maxBackoff": "1 second"
}
}
"""

Expand All @@ -83,7 +94,8 @@ class KinesisSourceConfigSpec extends Specification {
c.initialPosition must beEqualTo(KinesisSourceConfig.InitialPosition.TrimHorizon),
c.retrievalMode must beEqualTo(KinesisSourceConfig.Retrieval.Polling(42)),
c.leaseDuration must beEqualTo(20.seconds),
c.maxLeasesToStealAtOneTimeFactor must beEqualTo(BigDecimal(0.42))
c.maxLeasesToStealAtOneTimeFactor must beEqualTo(BigDecimal(0.42)),
c.checkpointThrottledBackoffPolicy must beEqualTo(BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second))
).reduce(_ and _)
}
}
Expand All @@ -102,16 +114,17 @@ class KinesisSourceConfigSpec extends Specification {
val result = ConfigFactory.load(ConfigFactory.parseString(input))

val expected = KinesisSourceConfig(
appName = "my-app",
streamName = "my-stream",
workerIdentifier = System.getenv("HOSTNAME"),
initialPosition = KinesisSourceConfig.InitialPosition.Latest,
retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000),
customEndpoint = None,
dynamodbCustomEndpoint = None,
cloudwatchCustomEndpoint = None,
leaseDuration = 10.seconds,
maxLeasesToStealAtOneTimeFactor = BigDecimal(2.0)
appName = "my-app",
streamName = "my-stream",
workerIdentifier = System.getenv("HOSTNAME"),
initialPosition = KinesisSourceConfig.InitialPosition.Latest,
retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000),
customEndpoint = None,
dynamodbCustomEndpoint = None,
cloudwatchCustomEndpoint = None,
leaseDuration = 10.seconds,
maxLeasesToStealAtOneTimeFactor = BigDecimal(2.0),
checkpointThrottledBackoffPolicy = BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second)
)

result.as[Wrapper] must beRight.like { case w: Wrapper =>
Expand Down

0 comments on commit 8c747b5

Please sign in to comment.