Sources should report stream latency of stuck events (#105)
Currently in common-streams apps, the _application_ (i.e. not this
library) is responsible for tracking the latency of events consumed from
the stream.  Because of the way this is implemented in the apps, if an
event gets stuck (e.g. it cannot be written to the destination) then the
latency drops to zero for the period when the app is retrying the stuck
event.

This commit aims to fix the problem where the `latency_millis` metric
wrongly drops to zero:

1. The `SourceAndAck` is now responsible for tracking the latency metric
and pushing the metric to the app. This consolidates the metric across
multiple apps into one place (a short usage sketch follows this list).

2. The `Metrics` class now allows the metric's starting values to be
wrapped in an effect, i.e. `F[Metrics.State]`. This means the application
can pass in the latency reported by the Source.

3. `TokenedEvents` no longer contains a stream timestamp.  The downstream
app has no business knowing the stream timestamp, now that latency is
calculated inside the common-streams lib.

4. The `LowLevelSource` no longer provides a `lastLiveness`.  Instead, the
low-level source is expected to emit a `None` periodically whenever it is
healthy. The higher-level `SourceAndAck` then takes responsibility for
monitoring whether the low-level source is healthy.

5. As a result of these changes, the `SourceAndAck` emits a latency metric
of zero whenever the source is genuinely healthy but there are no events
to be pulled. This means the app's latency metric is only zero if there
are no events.  This is better than the previous situation, where the app
might misleadingly emit a zero latency while it is stuck, e.g. when it
cannot connect to the warehouse.
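
For illustration, the application-side wiring could look roughly like the
sketch below. This is a minimal example rather than code from this commit:
`processingConfig` and `latestLatency` are hypothetical app-side names; only
`EventProcessingConfig`, `NoWindowing` and the `FiniteDuration => F[Unit]`
latency consumer come from the library.

```scala
import cats.effect.{IO, Ref}
import scala.concurrent.duration._
import com.snowplowanalytics.snowplow.sources.EventProcessingConfig
import com.snowplowanalytics.snowplow.sources.EventProcessingConfig.NoWindowing

// The SourceAndAck invokes the latency consumer with the latency it has
// measured; the app only has to record the value it is given.
def processingConfig(latestLatency: Ref[IO, FiniteDuration]): EventProcessingConfig[IO] =
  EventProcessingConfig(NoWindowing, latency => latestLatency.set(latency))
```

Because the `Metrics` constructor now takes its starting state as an effect
(`F[Metrics.State]`), the app can seed each reporting period from the same
ref, e.g. `latestLatency.get.map(d => AppState(latencyMillis = d.toMillis))`,
where `AppState` is a hypothetical state class extending `Metrics.State`.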
istreeter committed Jan 9, 2025
1 parent aa35e08 commit 0d42df5
Showing 13 changed files with 394 additions and 167 deletions.
@@ -76,7 +76,7 @@ object TestMetrics {
ref: Ref[IO, TestState],
emptyState: TestState,
config: Option[Metrics.StatsdConfig]
) extends Metrics[IO, TestState](ref, emptyState, config) {
) extends Metrics[IO, TestState](ref, IO.pure(emptyState), config) {
def count(c: Int) = ref.update(s => s.copy(counter = s.counter + c))
def time(t: FiniteDuration) = ref.update(s => s.copy(timer = s.timer + t))
}
@@ -24,8 +24,6 @@ import com.snowplowanalytics.snowplow.sources.EventProcessingConfig.NoWindowing
import com.snowplowanalytics.snowplow.it.DockerPull
import com.snowplowanalytics.snowplow.it.kinesis._

import java.time.Instant

import Utils._

import org.specs2.specification.BeforeAll
@@ -60,23 +58,21 @@ class KinesisSourceSpec

for {
refProcessed <- Ref[IO].of[List[ReceivedEvents]](Nil)
t1 <- IO.realTimeInstant
_ <- putDataToKinesis(kinesisClient, testStream1Name, testPayload)
t2 <- IO.realTimeInstant
processingConfig = new EventProcessingConfig(NoWindowing)
refReportedLatencies <- Ref[IO].of(Vector.empty[FiniteDuration])
processingConfig = EventProcessingConfig(NoWindowing, tstamp => refReportedLatencies.update(_ :+ tstamp))
kinesisConfig = getKinesisSourceConfig(testStream1Name)
sourceAndAck <- KinesisSource.build[IO](kinesisConfig)
stream = sourceAndAck.stream(processingConfig, testProcessor(refProcessed))
fiber <- stream.compile.drain.start
_ <- IO.sleep(2.minutes)
processed <- refProcessed.get
reportedLatencies <- refReportedLatencies.get
_ <- fiber.cancel
} yield List(
processed must haveSize(1),
processed.head.events must beEqualTo(List(testPayload)),
processed.head.tstamp must beSome { tstamp: Instant =>
tstamp.toEpochMilli must beBetween(t1.toEpochMilli, t2.toEpochMilli)
}
reportedLatencies.max must beBetween(1.second, 2.minutes)
).reduce(_ and _)
}
}
@@ -24,14 +24,13 @@ import com.snowplowanalytics.snowplow.sinks.kinesis.KinesisSinkConfig
import java.net.URI
import java.nio.charset.StandardCharsets
import java.util.UUID
import java.time.Instant
import scala.concurrent.duration.DurationLong

import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest

object Utils {

case class ReceivedEvents(events: List[String], tstamp: Option[Instant])
case class ReceivedEvents(events: List[String])

def putDataToKinesis(
client: KinesisAsyncClient,
Expand Down Expand Up @@ -81,7 +80,7 @@ object Utils {
.build()

val out =
ReceivedEvents(client.getRecords(request).get().records().asScala.toList.map(record => new String(record.data.asByteArray())), None)
ReceivedEvents(client.getRecords(request).get().records().asScala.toList.map(record => new String(record.data.asByteArray())))
out
}

@@ -108,10 +107,10 @@ object Utils {
)

def testProcessor(ref: Ref[IO, List[ReceivedEvents]]): EventProcessor[IO] =
_.evalMap { case TokenedEvents(events, token, tstamp) =>
_.evalMap { case TokenedEvents(events, token) =>
val parsed = events.map(byteBuffer => StandardCharsets.UTF_8.decode(byteBuffer).toString)
for {
_ <- ref.update(_ :+ ReceivedEvents(parsed.toList, tstamp))
_ <- ref.update(_ :+ ReceivedEvents(parsed.toList))
} yield token
}

@@ -19,8 +19,7 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger
import scala.reflect._

import java.nio.ByteBuffer
import java.time.Instant
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration.DurationLong

// kafka
import fs2.kafka._
@@ -48,11 +47,8 @@ object KafkaSource {
new LowLevelSource[F, KafkaCheckpoints[F]] {
def checkpointer: Checkpointer[F, KafkaCheckpoints[F]] = kafkaCheckpointer

def stream: Stream[F, Stream[F, LowLevelEvents[KafkaCheckpoints[F]]]] =
def stream: Stream[F, Stream[F, Option[LowLevelEvents[KafkaCheckpoints[F]]]]] =
kafkaStream(config, authHandlerClass)

def lastLiveness: F[FiniteDuration] =
Sync[F].realTime
}

case class OffsetAndCommit[F[_]](offset: Long, commit: F[Unit])
@@ -75,7 +71,7 @@ object KafkaSource {
private def kafkaStream[F[_]: Async, T <: AzureAuthenticationCallbackHandler](
config: KafkaSourceConfig,
authHandlerClass: ClassTag[T]
): Stream[F, Stream[F, LowLevelEvents[KafkaCheckpoints[F]]]] =
): Stream[F, Stream[F, Option[LowLevelEvents[KafkaCheckpoints[F]]]]] =
KafkaConsumer
.stream(consumerSettings[F, T](config, authHandlerClass))
.evalTap(_.subscribeTo(config.topicName))
@@ -89,7 +85,7 @@ object KafkaSource {

private def joinPartitions[F[_]: Async](
partitioned: PartitionedStreams[F]
): Stream[F, LowLevelEvents[KafkaCheckpoints[F]]] = {
): Stream[F, Option[LowLevelEvents[KafkaCheckpoints[F]]]] = {
val streams = partitioned.toSeq.map { case (topicPartition, stream) =>
stream.chunks
.flatMap { chunk =>
@@ -103,8 +99,8 @@ object KafkaSource {
val ts = ccr.record.timestamp
ts.logAppendTime.orElse(ts.createTime).orElse(ts.unknownTime)
}
val earliestTimestamp = if (timestamps.isEmpty) None else Some(Instant.ofEpochMilli(timestamps.min))
Stream.emit(LowLevelEvents(events, ack, earliestTimestamp))
val earliestTimestamp = if (timestamps.isEmpty) None else Some(timestamps.min.millis)
Stream.emit(Some(LowLevelEvents(events, ack, earliestTimestamp)))
case None =>
Stream.empty
}
@@ -117,6 +113,7 @@ object KafkaSource {
Stream
.emits(streams)
.parJoinUnbounded
.mergeHaltL(Stream.awakeDelay(10.seconds).map(_ => None).repeat) // keepalives
.onFinalize {
Logger[F].info(s"Stopping processing of partitions: $formatted")
}
@@ -7,7 +7,7 @@
*/
package com.snowplowanalytics.snowplow.sources.kinesis

import cats.effect.{Async, Ref, Sync}
import cats.effect.{Async, Sync}
import cats.data.NonEmptyList
import cats.implicits._
import com.snowplowanalytics.snowplow.sources.SourceAndAck
@@ -19,53 +19,46 @@ import software.amazon.kinesis.lifecycle.events.{ProcessRecordsInput, ShardEnded
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber

import java.util.concurrent.{CountDownLatch, SynchronousQueue}
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration.DurationLong
import scala.jdk.CollectionConverters._

object KinesisSource {

private implicit def logger[F[_]: Sync]: SelfAwareStructuredLogger[F] = Slf4jLogger.getLogger[F]

def build[F[_]: Async](config: KinesisSourceConfig): F[SourceAndAck[F]] =
Ref.ofEffect(Sync[F].realTime).flatMap { liveness =>
LowLevelSource.toSourceAndAck {
new LowLevelSource[F, Map[String, Checkpointable]] {
def stream: Stream[F, Stream[F, LowLevelEvents[Map[String, Checkpointable]]]] =
kinesisStream(config, liveness)
LowLevelSource.toSourceAndAck {
new LowLevelSource[F, Map[String, Checkpointable]] {
def stream: Stream[F, Stream[F, Option[LowLevelEvents[Map[String, Checkpointable]]]]] =
kinesisStream(config)

def checkpointer: KinesisCheckpointer[F] =
new KinesisCheckpointer[F](config.checkpointThrottledBackoffPolicy)

def lastLiveness: F[FiniteDuration] =
liveness.get
}
def checkpointer: KinesisCheckpointer[F] =
new KinesisCheckpointer[F](config.checkpointThrottledBackoffPolicy)
}
}

// We enable fairness on the `SynchronousQueue` to ensure all Kinesis shards are sourced at an equal rate.
private val synchronousQueueFairness: Boolean = true

private def kinesisStream[F[_]: Async](
config: KinesisSourceConfig,
liveness: Ref[F, FiniteDuration]
): Stream[F, Stream[F, LowLevelEvents[Map[String, Checkpointable]]]] = {
config: KinesisSourceConfig
): Stream[F, Stream[F, Option[LowLevelEvents[Map[String, Checkpointable]]]]] = {
val actionQueue = new SynchronousQueue[KCLAction](synchronousQueueFairness)
for {
_ <- Stream.resource(KCLScheduler.populateQueue[F](config, actionQueue))
events <- Stream.emit(pullFromQueueAndEmit(actionQueue, liveness).stream).repeat
events <- Stream.emit(pullFromQueueAndEmit(actionQueue).stream).repeat
} yield events
}

private def pullFromQueueAndEmit[F[_]: Sync](
queue: SynchronousQueue[KCLAction],
liveness: Ref[F, FiniteDuration]
): Pull[F, LowLevelEvents[Map[String, Checkpointable]], Unit] =
Pull.eval(pullFromQueue(queue, liveness)).flatMap { case PullFromQueueResult(actions, hasShardEnd) =>
queue: SynchronousQueue[KCLAction]
): Pull[F, Option[LowLevelEvents[Map[String, Checkpointable]]], Unit] =
Pull.eval(pullFromQueue(queue)).flatMap { case PullFromQueueResult(actions, hasShardEnd) =>
val toEmit = actions.traverse {
case KCLAction.ProcessRecords(_, processRecordsInput) if processRecordsInput.records.asScala.isEmpty =>
Pull.done
Pull.output1(None)
case KCLAction.ProcessRecords(shardId, processRecordsInput) =>
Pull.output1(provideNextChunk(shardId, processRecordsInput)).covary[F]
Pull.output1(Some(provideNextChunk(shardId, processRecordsInput))).covary[F]
case KCLAction.ShardEnd(shardId, await, shardEndedInput) =>
handleShardEnd[F](shardId, await, shardEndedInput)
case KCLAction.KCLError(t) =>
@@ -81,14 +74,13 @@ object KinesisSource {
}
Pull.eval(log).covaryOutput *> toEmit *> Pull.done
} else
toEmit *> pullFromQueueAndEmit(queue, liveness)
toEmit *> pullFromQueueAndEmit(queue)
}

private case class PullFromQueueResult(actions: NonEmptyList[KCLAction], hasShardEnd: Boolean)

private def pullFromQueue[F[_]: Sync](queue: SynchronousQueue[KCLAction], liveness: Ref[F, FiniteDuration]): F[PullFromQueueResult] =
private def pullFromQueue[F[_]: Sync](queue: SynchronousQueue[KCLAction]): F[PullFromQueueResult] =
resolveNextAction(queue)
.productL(updateLiveness(liveness))
.flatMap {
case shardEnd: KCLAction.ShardEnd =>
// If we reached the end of one shard, it is likely we reached the end of other shards too.
@@ -115,9 +107,6 @@ object KinesisSource {
_ <- Sync[F].delay(queue.drainTo(ret))
} yield ret.asScala.toList

private def updateLiveness[F[_]: Sync](liveness: Ref[F, FiniteDuration]): F[Unit] =
Sync[F].realTime.flatMap(now => liveness.set(now))

private def provideNextChunk(shardId: String, input: ProcessRecordsInput) = {
val chunk = Chunk.javaList(input.records()).map(_.data())
val lastRecord = input.records.asScala.last // last is safe because we handled the empty case above
@@ -126,17 +115,21 @@ object KinesisSource {
new ExtendedSequenceNumber(lastRecord.sequenceNumber, lastRecord.subSequenceNumber),
input.checkpointer
)
LowLevelEvents(chunk, Map[String, Checkpointable](shardId -> checkpointable), Some(firstRecord.approximateArrivalTimestamp))
LowLevelEvents(
chunk,
Map[String, Checkpointable](shardId -> checkpointable),
Some(firstRecord.approximateArrivalTimestamp.toEpochMilli.millis)
)
}

private def handleShardEnd[F[_]](
shardId: String,
await: CountDownLatch,
shardEndedInput: ShardEndedInput
): Pull[F, LowLevelEvents[Map[String, Checkpointable]], Unit] = {
): Pull[F, Option[LowLevelEvents[Map[String, Checkpointable]]], Unit] = {
val checkpointable = Checkpointable.ShardEnd(shardEndedInput.checkpointer, await)
val last = LowLevelEvents(Chunk.empty, Map[String, Checkpointable](shardId -> checkpointable), None)
Pull.output1(last)
Pull.output1(Some(last))
}

}
@@ -14,8 +14,6 @@ import fs2.{Chunk, Stream}
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import java.time.Instant

// pubsub
import com.google.api.gax.core.{ExecutorProvider, FixedExecutorProvider}
import com.google.api.gax.grpc.ChannelPoolSettings
@@ -31,7 +29,7 @@ import com.snowplowanalytics.snowplow.sources.SourceAndAck
import com.snowplowanalytics.snowplow.sources.internal.{Checkpointer, LowLevelEvents, LowLevelSource}
import com.snowplowanalytics.snowplow.sources.pubsub.PubsubRetryOps.implicits._

import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.concurrent.duration.{Duration, DurationLong, FiniteDuration}
import scala.jdk.CollectionConverters._

import java.util.concurrent.{ExecutorService, Executors}
@@ -64,17 +62,14 @@ object PubsubSource {
new LowLevelSource[F, Vector[Unique.Token]] {
def checkpointer: Checkpointer[F, Vector[Unique.Token]] = new PubsubCheckpointer(config.subscription, deferredResources)

def stream: Stream[F, Stream[F, LowLevelEvents[Vector[Unique.Token]]]] =
def stream: Stream[F, Stream[F, Option[LowLevelEvents[Vector[Unique.Token]]]]] =
pubsubStream(config, deferredResources)

def lastLiveness: F[FiniteDuration] =
Sync[F].realTime
}

private def pubsubStream[F[_]: Async](
config: PubsubSourceConfig,
deferredResources: Deferred[F, PubsubCheckpointer.Resources[F]]
): Stream[F, Stream[F, LowLevelEvents[Vector[Unique.Token]]]] =
): Stream[F, Stream[F, Option[LowLevelEvents[Vector[Unique.Token]]]]] =
for {
parallelPullCount <- Stream.eval(Sync[F].delay(chooseNumParallelPulls(config)))
stub <- Stream.resource(stubResource(config))
@@ -83,7 +78,6 @@ object PubsubSource {
} yield Stream
.fixedRateStartImmediately(config.debounceRequests, dampen = true)
.parEvalMapUnordered(parallelPullCount)(_ => pullAndManageState(config, stub, refStates))
.unNone
.prefetchN(parallelPullCount)
.concurrently(extendDeadlines(config, stub, refStates))
.onFinalize(nackRefStatesForShutdown(config, stub, refStates))
@@ -124,10 +118,10 @@ object PubsubSource {
}
}

private def earliestTimestampOfRecords(records: Vector[ReceivedMessage]): Instant = {
private def earliestTimestampOfRecords(records: Vector[ReceivedMessage]): FiniteDuration = {
val (tstampSeconds, tstampNanos) =
records.map(r => (r.getMessage.getPublishTime.getSeconds, r.getMessage.getPublishTime.getNanos)).min
Instant.ofEpochSecond(tstampSeconds, tstampNanos.toLong)
tstampSeconds.seconds + tstampNanos.toLong.nanos
}

/**
@@ -24,13 +24,14 @@ import java.nio.charset.StandardCharsets.UTF_8

abstract class Metrics[F[_]: Async, S <: Metrics.State](
ref: Ref[F, S],
emptyState: S,
initState: F[S],
config: Option[Metrics.StatsdConfig]
) {
def report: Stream[F, Nothing] =
Stream.resource(Metrics.makeReporters[F](config)).flatMap { reporters =>
def report = for {
state <- ref.getAndSet(emptyState)
nextState <- initState
state <- ref.getAndSet(nextState)
kv = state.toKVMetrics
_ <- reporters.traverse(_.report(kv))
} yield ()
@@ -25,8 +25,11 @@ import scala.concurrent.duration.FiniteDuration
* Whether to open a new [[EventProcessor]] to handle a timed window of events (e.g. for the
* transformer) or whether to feed events to a single [[EventProcessor]] in a continuous endless
* stream (e.g. Enrich)
*
* @param latencyConsumer
* common-streams apps should use the latencyConsumer to set the latency metric.
*/
case class EventProcessingConfig(windowing: EventProcessingConfig.Windowing)
case class EventProcessingConfig[F[_]](windowing: EventProcessingConfig.Windowing, latencyConsumer: FiniteDuration => F[Unit])

object EventProcessingConfig {
