Bump common-streams to 0.10.0-M2
Common-streams 0.10.0 brings significant changes to the Kinesis and
PubSub sources:

- PubSub source completely re-written to be a wrapper around UnaryPull
  snowplow-incubator/common-streams#101
- Kinesis source is more efficient when the stream is re-sharded
  snowplow-incubator/common-streams#102
- Kinesis source better tuned for larger deployments
  snowplow-incubator/common-streams#99

It also brings improvements to latency metrics:
- Sources should report stream latency of stuck events
  snowplow-incubator/common-streams#104
istreeter committed Jan 3, 2025
1 parent 0928d9a commit 6f59023
Showing 8 changed files with 92 additions and 61 deletions.
17 changes: 17 additions & 0 deletions config/config.kinesis.reference.hocon
@@ -31,6 +31,23 @@
# -- Only used if retrieval mode is type Polling. How many events the client may fetch in a single poll.
"maxRecords": 1000
}

# -- Name of this KCL worker used in the DynamoDB lease table
"workerIdentifier": ${HOSTNAME}

# -- Duration of shard leases. KCL workers must periodically refresh leases in the DynamoDB table before this duration expires.
"leaseDuration": "10 seconds"

# -- Controls how to pick the max number of leases to steal at one time.
# -- E.g. if there are 4 available processors, and maxLeasesToStealAtOneTimeFactor = 2.0, then allow the KCL to steal up to 8 leases.
# -- Allows bigger instances to more quickly acquire the shard-leases they need to combat latency.
"maxLeasesToStealAtOneTimeFactor": 2.0

# -- Configures how to back off and retry in case of DynamoDB provisioned throughput limits
"checkpointThrottledBackoffPolicy": {
"minBackoff": "100 millis"
"maxBackoff": "1 second"
}
}

"output": {
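To make the lease-stealing factor concrete, here is a minimal sketch of the arithmetic described in the comment above. The object and method names are hypothetical, and the exact rounding used inside common-streams may differ:

object LeaseStealMath {
  // Hypothetical illustration of maxLeasesToStealAtOneTimeFactor:
  // with 4 available processors and a factor of 2.0 this returns 8.
  def maxLeasesToStealAtOneTime(factor: BigDecimal): Int = {
    val cores = Runtime.getRuntime.availableProcessors()
    math.max(1, (factor * cores).setScale(0, BigDecimal.RoundingMode.CEILING).toInt)
  }
}

Scaling the steal count with the core count is what lets bigger instances acquire the leases they need quickly after a deploy or a re-shard.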
24 changes: 12 additions & 12 deletions config/config.pubsub.reference.hocon
@@ -12,21 +12,21 @@
# -- The number of threads is equal to this factor multiplied by the number of available CPU cores
"parallelPullFactor": 0.5

# -- How many bytes can be buffered by the loader app before blocking the pubsub client library
# -- from fetching more events.
# -- This is a balance between memory usage vs how efficiently the app can operate. The default value works well.
"bufferMaxBytes": 1000000
# -- Pubsub ack deadlines are extended for this duration when needed.
"durationPerAckExtension": "60 seconds"

# -- For how long the pubsub client library will continue to re-extend the ack deadline of an unprocessed event.
"maxAckExtensionPeriod": "1 hour"
# -- Controls when ack deadlines are re-extended, for a message that is close to exceeding its ack deadline.
# -- For example, if `durationPerAckExtension` is `60 seconds` and `minRemainingAckDeadline` is `0.1` then the Source
# -- will wait until there are `6 seconds` left of the remaining deadline, before re-extending the message deadline.
"minRemainingAckDeadline": 0.1

# -- Sets min/max boundaries on the value by which an ack deadline is extended.
# -- The actual value used is guided by runtime statistics collected by the pubsub client library.
"minDurationPerAckExtension": "60 seconds"
"maxDurationPerAckExtension": "600 seconds"
# -- How many pubsub messages to pull from the server in a single request.
"maxMessagesPerPull": 1000

# -- Max num of streaming pulls to open per transport channel
"maxPullsPerTransportChannel": 16
# -- Adds an artificial delay between consecutive requests to pubsub for more messages.
# -- Under some circumstances, this was found to slightly alleviate a problem in which pubsub might re-deliver
# -- the same messages multiple times.
"debounceRequests": "100 millis"
}

"output": {
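The two new deadline settings interact as the comments above describe: the deadline is re-extended once the remaining time drops below durationPerAckExtension * minRemainingAckDeadline. A minimal sketch of that arithmetic, assuming this simple multiplication is all the source does (the method name is hypothetical; the real logic lives in the common-streams PubSub source):

import scala.concurrent.duration._

object AckExtensionMath {
  // With durationPerAckExtension = 60.seconds and minRemainingAckDeadline = 0.1,
  // this returns 6 seconds: the point at which the deadline is re-extended.
  def reExtendThreshold(
    durationPerAckExtension: FiniteDuration,
    minRemainingAckDeadline: BigDecimal
  ): FiniteDuration =
    (minRemainingAckDeadline * durationPerAckExtension.toMillis).toLong.millis
}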
Environment.scala
@@ -58,7 +58,7 @@ object Environment {
_ <- Webhook.resource(config.main.monitoring.webhook, appInfo, httpClient, appHealth)
badSink <-
toSink(config.main.output.bad.sink).onError(_ => Resource.eval(appHealth.beUnhealthyForRuntimeService(RuntimeService.BadSink)))
metrics <- Resource.eval(Metrics.build(config.main.monitoring.metrics))
metrics <- Resource.eval(Metrics.build(config.main.monitoring.metrics, sourceAndAck))
creds <- Resource.eval(BigQueryUtils.credentials(config.main.output.good))
tableManager <- Resource.eval(TableManager.make(config.main.output.good, creds))
tableManagerWrapped <- Resource.eval(TableManager.withHandledErrors(tableManager, config.main.retries, appHealth))
Metrics.scala
@@ -8,11 +8,13 @@
*/
package com.snowplowanalytics.snowplow.bigquery

import cats.Functor
import cats.effect.Async
import cats.effect.kernel.Ref
import cats.implicits._
import fs2.Stream

import com.snowplowanalytics.snowplow.sources.SourceAndAck
import com.snowplowanalytics.snowplow.runtime.{Metrics => CommonMetrics}

trait Metrics[F[_]] {
@@ -25,8 +27,8 @@

object Metrics {

def build[F[_]: Async](config: Config.Metrics): F[Metrics[F]] =
Ref[F].of(State.empty).map(impl(config, _))
def build[F[_]: Async](config: Config.Metrics, sourceAndAck: SourceAndAck[F]): F[Metrics[F]] =
Ref.ofEffect(State.initialize(sourceAndAck)).map(impl(config, _, sourceAndAck))

private case class State(
good: Long,
@@ -42,11 +44,18 @@
}

private object State {
def empty: State = State(0, 0, 0L)
def initialize[F[_]: Functor](sourceAndAck: SourceAndAck[F]): F[State] =
sourceAndAck.currentStreamLatency.map { latency =>
State(0L, 0L, latency.fold(0L)(_.toMillis))
}
}

private def impl[F[_]: Async](config: Config.Metrics, ref: Ref[F, State]): Metrics[F] =
new CommonMetrics[F, State](ref, State.empty, config.statsd) with Metrics[F] {
private def impl[F[_]: Async](
config: Config.Metrics,
ref: Ref[F, State],
sourceAndAck: SourceAndAck[F]
): Metrics[F] =
new CommonMetrics[F, State](ref, State.initialize(sourceAndAck), config.statsd) with Metrics[F] {
def addGood(count: Long): F[Unit] =
ref.update(s => s.copy(good = s.good + count))
def addBad(count: Long): F[Unit] =
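The switch from Ref[F].of(State.empty) to Ref.ofEffect above is what lets the latency metric start from the stream's current latency instead of zero. A self-contained sketch of the same pattern, with a stubbed latency lookup standing in for the real SourceAndAck (the names here are illustrative only):

import cats.effect.{IO, Ref}
import scala.concurrent.duration._

object RefOfEffectSketch {
  final case class State(latencyMillis: Long)

  // Stub standing in for sourceAndAck.currentStreamLatency
  val currentStreamLatency: IO[Option[FiniteDuration]] =
    IO.pure(Some(2500.millis))

  // The initial Ref value is computed from an effect, not a pure constant,
  // so the first metrics report already reflects the stream's latency.
  val makeState: IO[Ref[IO, State]] =
    Ref.ofEffect(currentStreamLatency.map(l => State(l.fold(0L)(_.toMillis))))
}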
MockEnvironment.scala
@@ -151,6 +151,9 @@ object MockEnvironment {

def isHealthy(maxAllowedProcessingLatency: FiniteDuration): IO[SourceAndAck.HealthStatus] =
IO.pure(SourceAndAck.Healthy)

def currentStreamLatency: IO[Option[FiniteDuration]] =
IO.pure(None)
}

private def testBadSink(mockedResponse: Response[Unit], state: Ref[IO, Vector[Action]]): Sink[IO] =
KinesisConfigSpec.scala
@@ -14,14 +14,16 @@ import cats.Id
import cats.effect.testing.specs2.CatsEffect
import cats.effect.{ExitCode, IO}
import com.comcast.ip4s.Port
import org.http4s.implicits.http4sLiteralsSyntax
import org.specs2.Specification

import com.snowplowanalytics.iglu.core.SchemaCriterion
import com.snowplowanalytics.snowplow.bigquery.Config.GcpUserAgent
import com.snowplowanalytics.snowplow.runtime.Metrics.StatsdConfig
import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, ConfigParser, HttpClient, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.snowplow.sinks.kinesis.{BackoffPolicy, KinesisSinkConfig}
import com.snowplowanalytics.snowplow.sinks.kinesis.KinesisSinkConfig
import com.snowplowanalytics.snowplow.sources.kinesis.KinesisSourceConfig
import org.http4s.implicits.http4sLiteralsSyntax
import org.specs2.Specification
import com.snowplowanalytics.snowplow.kinesis.BackoffPolicy

import java.nio.file.Paths
import scala.concurrent.duration.DurationInt
@@ -61,15 +63,17 @@ class KinesisConfigSpec extends Specification with CatsEffect {
object KinesisConfigSpec {
private val minimalConfig = Config[KinesisSourceConfig, KinesisSinkConfig](
input = KinesisSourceConfig(
appName = "snowplow-bigquery-loader",
streamName = "snowplow-enriched-events",
workerIdentifier = "test-hostname",
initialPosition = KinesisSourceConfig.InitialPosition.Latest,
retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000),
customEndpoint = None,
dynamodbCustomEndpoint = None,
cloudwatchCustomEndpoint = None,
leaseDuration = 10.seconds
appName = "snowplow-bigquery-loader",
streamName = "snowplow-enriched-events",
workerIdentifier = "test-hostname",
initialPosition = KinesisSourceConfig.InitialPosition.Latest,
retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000),
customEndpoint = None,
dynamodbCustomEndpoint = None,
cloudwatchCustomEndpoint = None,
leaseDuration = 10.seconds,
maxLeasesToStealAtOneTimeFactor = BigDecimal("2.0"),
checkpointThrottledBackoffPolicy = BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second)
),
output = Config.Output(
good = Config.BigQuery(
@@ -82,7 +86,7 @@
bad = Config.SinkWithMaxSize(
sink = KinesisSinkConfig(
streamName = "bad",
throttledBackoffPolicy = BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second, maxRetries = None),
throttledBackoffPolicy = BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second),
recordLimit = 500,
byteLimit = 5242880,
customEndpoint = None
@@ -129,15 +133,17 @@
// workerIdentifier coming from "HOSTNAME" env variable set in BuildSettings
private val extendedConfig = Config[KinesisSourceConfig, KinesisSinkConfig](
input = KinesisSourceConfig(
appName = "snowplow-bigquery-loader",
streamName = "snowplow-enriched-events",
workerIdentifier = "test-hostname",
initialPosition = KinesisSourceConfig.InitialPosition.TrimHorizon,
retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000),
customEndpoint = None,
dynamodbCustomEndpoint = None,
cloudwatchCustomEndpoint = None,
leaseDuration = 10.seconds
appName = "snowplow-bigquery-loader",
streamName = "snowplow-enriched-events",
workerIdentifier = "test-hostname",
initialPosition = KinesisSourceConfig.InitialPosition.TrimHorizon,
retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000),
customEndpoint = None,
dynamodbCustomEndpoint = None,
cloudwatchCustomEndpoint = None,
leaseDuration = 10.seconds,
maxLeasesToStealAtOneTimeFactor = BigDecimal("2.0"),
checkpointThrottledBackoffPolicy = BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second)
),
output = Config.Output(
good = Config.BigQuery(
@@ -150,7 +156,7 @@
bad = Config.SinkWithMaxSize(
sink = KinesisSinkConfig(
streamName = "bad",
throttledBackoffPolicy = BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second, maxRetries = None),
throttledBackoffPolicy = BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second),
recordLimit = 500,
byteLimit = 5242880,
customEndpoint = None
PubsubConfigSpec.scala
@@ -62,15 +62,13 @@ class PubsubConfigSpec extends Specification with CatsEffect {
object PubsubConfigSpec {
private val minimalConfig = Config[PubsubSourceConfig, PubsubSinkConfig](
input = PubsubSourceConfig(
subscription = PubsubSourceConfig.Subscription("my-project", "snowplow-enriched"),
parallelPullFactor = BigDecimal(0.5),
bufferMaxBytes = 10000000,
maxAckExtensionPeriod = 1.hour,
minDurationPerAckExtension = 1.minute,
maxDurationPerAckExtension = 10.minutes,
gcpUserAgent = PubsubUserAgent("Snowplow OSS", "bigquery-loader"),
shutdownTimeout = 30.seconds,
maxPullsPerTransportChannel = 16
subscription = PubsubSourceConfig.Subscription("my-project", "snowplow-enriched"),
parallelPullFactor = BigDecimal(0.5),
durationPerAckExtension = 1.minute,
minRemainingAckDeadline = BigDecimal(0.1),
gcpUserAgent = PubsubUserAgent("Snowplow OSS", "bigquery-loader"),
maxMessagesPerPull = 1000,
debounceRequests = 100.millis
),
output = Config.Output(
good = Config.BigQuery(
@@ -128,15 +126,13 @@

private val extendedConfig = Config[PubsubSourceConfig, PubsubSinkConfig](
input = PubsubSourceConfig(
subscription = PubsubSourceConfig.Subscription("my-project", "snowplow-enriched"),
parallelPullFactor = BigDecimal(0.5),
bufferMaxBytes = 1000000,
maxAckExtensionPeriod = 1.hour,
minDurationPerAckExtension = 1.minute,
maxDurationPerAckExtension = 10.minutes,
gcpUserAgent = PubsubUserAgent("Snowplow OSS", "bigquery-loader"),
shutdownTimeout = 30.seconds,
maxPullsPerTransportChannel = 16
subscription = PubsubSourceConfig.Subscription("my-project", "snowplow-enriched"),
parallelPullFactor = BigDecimal(0.5),
durationPerAckExtension = 1.minute,
minRemainingAckDeadline = BigDecimal(0.1),
gcpUserAgent = PubsubUserAgent("Snowplow OSS", "bigquery-loader"),
maxMessagesPerPull = 1000,
debounceRequests = 100.millis
),
output = Config.Output(
good = Config.BigQuery(
4 changes: 2 additions & 2 deletions project/Dependencies.scala
@@ -24,12 +24,12 @@ object Dependencies {
val slf4j = "2.0.12"
val azureSdk = "1.11.4"
val sentry = "6.25.2"
val awsSdk2 = "2.25.16"
val awsSdk2 = "2.29.0"
val bigqueryStorage = "2.47.0"
val bigquery = "2.34.2"

// Snowplow
val streams = "0.8.2-M1"
val streams = "0.10.0-M2"
val igluClient = "4.0.0"

// tests
