common-streams 0.10.0
Common streams 0.10.0 brings significant changes to the Kinesis and
PubSub sources:

- PubSub source completely re-written to be a wrapper around UnaryPull
  snowplow-incubator/common-streams#101
- Kinesis source is more efficient when the stream is re-sharded
  snowplow-incubator/common-streams#102
- Kinesis source better tuned for larger deployments
  snowplow-incubator/common-streams#99
istreeter committed Dec 31, 2024
1 parent f7b8825 commit d12a519
Showing 4 changed files with 29 additions and 10 deletions.
11 changes: 11 additions & 0 deletions config/config.aws.reference.hocon
@@ -39,6 +39,17 @@

# -- Duration of shard leases. KCL workers must periodically refresh leases in the dynamodb table before this duration expires.
"leaseDuration": "10 seconds"

+ # -- Controls how to pick the max number of leases to steal at one time.
+ # -- E.g. If there are 4 available processors, and maxLeasesToStealAtOneTimeFactor = 2.0, then allow the KCL to steal up to 8 leases.
+ # -- Allows bigger instances to more quickly acquire the shard-leases they need to combat latency
+ "maxLeasesToStealAtOneTimeFactor": 2.0
+
+ # -- Configures how to backoff and retry in case of DynamoDB provisioned throughput limits
+ "checkpointThrottledBackoffPolicy": {
+   "minBackoff": "100 millis"
+   "maxBackoff": "1 second"
+ }
}

"output": {
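The two new Kinesis settings can be read as simple arithmetic. The sketch below is illustrative only and is not taken from common-streams: the object and function names are hypothetical, rounding the lease count up is an assumption, and the doubling schedule for the backoff is an assumption too (the reference config only states the `minBackoff` and `maxBackoff` bounds).

```scala
import scala.concurrent.duration._

// Hypothetical helpers; none of these names come from common-streams or the KCL.
object KinesisTuningSketch {

  // Max leases to steal at one time = available processors * factor.
  // Rounding up is an assumption; the config comment only gives the 4 * 2.0 = 8 case.
  def maxLeasesToSteal(availableProcessors: Int, factor: Double): Int =
    math.ceil(availableProcessors * factor).toInt

  // Assumed schedule: start at minBackoff, double on every retry, never exceed maxBackoff.
  def backoffDelays(minBackoff: FiniteDuration, maxBackoff: FiniteDuration): LazyList[FiniteDuration] =
    LazyList.iterate(minBackoff)(d => (d * 2L).min(maxBackoff))
}
```

With the reference values, `KinesisTuningSketch.maxLeasesToSteal(4, 2.0)` gives 8, matching the example in the config comment. On a running instance the processor count would typically come from `Runtime.getRuntime.availableProcessors`.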
25 changes: 16 additions & 9 deletions config/config.gcp.reference.hocon
@@ -14,15 +14,22 @@
# -- The number of threads is equal to this factor multiplied by the number of available cpu cores
"parallelPullFactor": 0.5

- # -- How many bytes can be buffered by the loader app before blocking the pubsub client library
- # -- from fetching more events.
- # -- This is a balance between memory usage vs how efficiently the app can operate. The default value works well.
- "bufferMaxBytes": 10000000
-
- # -- Sets min/max boundaries on the value by which an ack deadline is extended.
- # -- The actual value used is guided by runtime statistics collected by the pubsub client library.
- "minDurationPerAckExtension": "60 seconds"
- "maxDurationPerAckExtension": "600 seconds"
+ # -- Pubsub ack deadlines are extended for this duration when needed.
+ # -- A sensible value is double the size of the "windowing" config parameter, but no higher than 10 minutes.
+ "durationPerAckExtension": "600 seconds"
+
+ # -- Controls when ack deadlines are re-extended, for a message that is close to exceeding its ack deadline.
+ # -- For example, if `durationPerAckExtension` is `600 seconds` and `minRemainingAckDeadline` is `0.1` then the Source
+ # -- will wait until there is `60 seconds` left of the remaining deadline, before re-extending the message deadline.
+ "minRemainingAckDeadline": 0.1
+
+ # -- How many pubsub messages to pull from the server in a single request.
+ "maxMessagesPerPull": 1000
+
+ # -- Adds an artificial delay between consecutive requests to pubsub for more messages.
+ # -- Under some circumstances, this was found to slightly alleviate a problem in which pubsub might re-deliver
+ # -- the same messages multiple times.
+ "debounceRequests": "100 millis"
}

"output": {
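The ack-deadline settings above interact in a way that is easy to check with a small calculation. The following is a minimal sketch under stated assumptions, not the library's implementation: the object and function names are hypothetical, and it only encodes the two rules written in the config comments (re-extend once the remaining deadline drops to `durationPerAckExtension * minRemainingAckDeadline`, and pick an extension of twice the `windowing` value capped at 10 minutes).

```scala
import scala.concurrent.duration._

// Hypothetical helpers; these names do not appear in common-streams.
object AckExtensionSketch {

  // Remaining deadline at which a message's ack deadline would be re-extended.
  def reExtendThreshold(durationPerAckExtension: FiniteDuration, minRemainingAckDeadline: Double): FiniteDuration =
    math.round(durationPerAckExtension.toMillis * minRemainingAckDeadline).millis

  // Suggested durationPerAckExtension: double the windowing duration, capped at 10 minutes.
  def suggestedAckExtension(windowing: FiniteDuration): FiniteDuration =
    (windowing * 2L).min(10.minutes)
}
```

For the reference values, `AckExtensionSketch.reExtendThreshold(600.seconds, 0.1)` is 60 seconds, which agrees with the example in the config comment. A loader with a 1 minute window would get a suggested extension of 2 minutes, while any window of 5 minutes or more hits the 10 minute cap.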
1 change: 1 addition & 0 deletions modules/gcp/src/main/resources/application.conf
@@ -10,6 +10,7 @@
"input": ${snowplow.defaults.sources.pubsub}
"input": {
"gcpUserAgent": ${gcpUserAgent}
+ "durationPerAckExtension": "600 seconds"
}
"output": {
"bad": ${snowplow.defaults.sinks.pubsub}
2 changes: 1 addition & 1 deletion project/Dependencies.scala
@@ -49,7 +49,7 @@ object Dependencies {
val awsRegistry = "1.1.20"

// Snowplow
- val streams = "0.8.2-M1"
+ val streams = "0.10.0-M1"
val igluClient = "4.0.0"

// Transitive overrides