From 0d42df5120a21614492bbac677a5540de6e45412 Mon Sep 17 00:00:00 2001
From: Ian Streeter
Date: Fri, 3 Jan 2025 09:25:38 +0000
Subject: [PATCH] Sources should report stream latency of stuck events (#105)

Currently in common-streams apps, the _application_ (i.e. not this library) is responsible for tracking the latency of events consumed from the stream. Because of the way this is implemented in the apps, if an event gets stuck (e.g. it cannot be written to the destination) then the latency drops to zero for as long as the app is retrying the stuck event.

This commit fixes the problem of the `latency_millis` metric wrongly dropping to zero:

1. The `SourceAndAck` is now responsible for tracking the latency metric and pushing the metric to the app. This consolidates the metric, previously implemented separately in each app, into one place.
2. The `Metrics` class now allows metric starting values to be wrapped in an effect, i.e. `F[Metrics.State]`. This means the application can pass in the latency reported by the Source (see the wiring sketch below).
3. `TokenedEvents` no longer contains a stream timestamp. The downstream app has no business knowing the stream timestamp, now that latency is calculated inside the common-streams lib.
4. The `LowLevelSource` no longer provides a `lastLiveness`. Instead, the low-level source is expected to periodically emit a `None` whenever it is healthy. The higher-level `SourceAndAck` then takes responsibility for monitoring whether the low-level source is healthy.
5. As a result of these changes, the `SourceAndAck` emits a latency metric of zero whenever the source is genuinely healthy but there are no events to be pulled. This means the app's latency metric is only zero if there are no events. This is better than the previous situation, where the app might misleadingly emit a zero latency while it is stuck, e.g. unable to connect to the warehouse.
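For illustration only, not part of the patch: a minimal sketch of how an app might wire these pieces together. `setLatencyGauge` and `AppState` are hypothetical stand-ins for app-side metrics code; the `latencyConsumer` parameter on `EventProcessingConfig` and `SourceAndAck.currentStreamLatency` are the additions described above.

```scala
import cats.effect.IO
import fs2.Stream
import scala.concurrent.duration.{Duration, FiniteDuration}

import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, SourceAndAck}

object WiringSketch {

  // (1) The app hands the source a callback, and the source pushes the latency of
  // every pulled batch into it. The callback receives zero whenever the source is
  // healthy but has no events to pull.
  def stream(
    sourceAndAck: SourceAndAck[IO],
    processor: EventProcessor[IO],
    setLatencyGauge: FiniteDuration => IO[Unit] // hypothetical app-side metrics setter
  ): Stream[IO, Nothing] = {
    val config = EventProcessingConfig[IO](EventProcessingConfig.NoWindowing, setLatencyGauge)
    sourceAndAck.stream(config, processor)
  }

  // (2) Each statsd reporting period starts from the latency of the message that is
  // currently in flight, so a stuck event keeps reporting its true, growing latency
  // instead of dropping to zero. AppState is a hypothetical Metrics.State.
  case class AppState(latencyMillis: Long)

  def initState(sourceAndAck: SourceAndAck[IO]): IO[AppState] =
    sourceAndAck.currentStreamLatency.map { latency =>
      AppState(latencyMillis = latency.getOrElse(Duration.Zero).toMillis)
    }
}
```

The `F[Metrics.State]` passed to `Metrics` would be `initState(sourceAndAck)`, so the `ref.getAndSet(nextState)` at the end of each reporting period re-seeds the gauge from the in-flight message rather than from zero.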
--- .../snowplow/runtime/MetricsSpec.scala | 2 +- .../sources/kinesis/KinesisSourceSpec.scala | 12 +- .../snowplow/sources/kinesis/Utils.scala | 9 +- .../snowplow/sources/kafka/KafkaSource.scala | 17 +- .../sources/kinesis/KinesisSource.scala | 57 ++-- .../sources/pubsub/PubsubSource.scala | 16 +- .../snowplow/runtime/Metrics.scala | 5 +- .../sources/EventProcessingConfig.scala | 5 +- .../sources/SourceAndAck.scala | 19 +- .../sources/TokenedEvents.scala | 7 +- .../sources/internal/LowLevelEvents.scala | 7 +- .../sources/internal/LowLevelSource.scala | 122 +++++--- .../sources/internal/LowLevelSourceSpec.scala | 283 +++++++++++++++--- 13 files changed, 394 insertions(+), 167 deletions(-) diff --git a/modules/it/src/test/scala/com/snowplowanalytics/snowplow/runtime/MetricsSpec.scala b/modules/it/src/test/scala/com/snowplowanalytics/snowplow/runtime/MetricsSpec.scala index 17e6e90d..89588b98 100644 --- a/modules/it/src/test/scala/com/snowplowanalytics/snowplow/runtime/MetricsSpec.scala +++ b/modules/it/src/test/scala/com/snowplowanalytics/snowplow/runtime/MetricsSpec.scala @@ -76,7 +76,7 @@ object TestMetrics { ref: Ref[IO, TestState], emptyState: TestState, config: Option[Metrics.StatsdConfig] - ) extends Metrics[IO, TestState](ref, emptyState, config) { + ) extends Metrics[IO, TestState](ref, IO.pure(emptyState), config) { def count(c: Int) = ref.update(s => s.copy(counter = s.counter + c)) def time(t: FiniteDuration) = ref.update(s => s.copy(timer = s.timer + t)) } diff --git a/modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceSpec.scala b/modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceSpec.scala index 00234485..de9fb63a 100644 --- a/modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceSpec.scala +++ b/modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceSpec.scala @@ -24,8 +24,6 @@ import com.snowplowanalytics.snowplow.sources.EventProcessingConfig.NoWindowing import com.snowplowanalytics.snowplow.it.DockerPull import com.snowplowanalytics.snowplow.it.kinesis._ -import java.time.Instant - import Utils._ import org.specs2.specification.BeforeAll @@ -60,23 +58,21 @@ class KinesisSourceSpec for { refProcessed <- Ref[IO].of[List[ReceivedEvents]](Nil) - t1 <- IO.realTimeInstant _ <- putDataToKinesis(kinesisClient, testStream1Name, testPayload) - t2 <- IO.realTimeInstant - processingConfig = new EventProcessingConfig(NoWindowing) + refReportedLatencies <- Ref[IO].of(Vector.empty[FiniteDuration]) + processingConfig = EventProcessingConfig(NoWindowing, latency => refReportedLatencies.update(_ :+ latency)) kinesisConfig = getKinesisSourceConfig(testStream1Name) sourceAndAck <- KinesisSource.build[IO](kinesisConfig) stream = sourceAndAck.stream(processingConfig, testProcessor(refProcessed)) fiber <- stream.compile.drain.start _ <- IO.sleep(2.minutes) processed <- refProcessed.get + reportedLatencies <- refReportedLatencies.get _ <- fiber.cancel } yield List( processed must haveSize(1), processed.head.events must beEqualTo(List(testPayload)), - processed.head.tstamp must beSome { tstamp: Instant => - tstamp.toEpochMilli must beBetween(t1.toEpochMilli, t2.toEpochMilli) - } + reportedLatencies.max must beBetween(1.second, 2.minutes) ).reduce(_ and _) } } diff --git a/modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala b/modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala index ae661384..df560b25
100644 --- a/modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala +++ b/modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala @@ -24,14 +24,13 @@ import com.snowplowanalytics.snowplow.sinks.kinesis.KinesisSinkConfig import java.net.URI import java.nio.charset.StandardCharsets import java.util.UUID -import java.time.Instant import scala.concurrent.duration.DurationLong import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest object Utils { - case class ReceivedEvents(events: List[String], tstamp: Option[Instant]) + case class ReceivedEvents(events: List[String]) def putDataToKinesis( client: KinesisAsyncClient, @@ -81,7 +80,7 @@ object Utils { .build() val out = - ReceivedEvents(client.getRecords(request).get().records().asScala.toList.map(record => new String(record.data.asByteArray())), None) + ReceivedEvents(client.getRecords(request).get().records().asScala.toList.map(record => new String(record.data.asByteArray()))) out } @@ -108,10 +107,10 @@ object Utils { ) def testProcessor(ref: Ref[IO, List[ReceivedEvents]]): EventProcessor[IO] = - _.evalMap { case TokenedEvents(events, token, tstamp) => + _.evalMap { case TokenedEvents(events, token) => val parsed = events.map(byteBuffer => StandardCharsets.UTF_8.decode(byteBuffer).toString) for { - _ <- ref.update(_ :+ ReceivedEvents(parsed.toList, tstamp)) + _ <- ref.update(_ :+ ReceivedEvents(parsed.toList)) } yield token } diff --git a/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sources/kafka/KafkaSource.scala b/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sources/kafka/KafkaSource.scala index dc9feef6..f80b7a00 100644 --- a/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sources/kafka/KafkaSource.scala +++ b/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sources/kafka/KafkaSource.scala @@ -19,8 +19,7 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger import scala.reflect._ import java.nio.ByteBuffer -import java.time.Instant -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration.DurationLong // kafka import fs2.kafka._ @@ -48,11 +47,8 @@ object KafkaSource { new LowLevelSource[F, KafkaCheckpoints[F]] { def checkpointer: Checkpointer[F, KafkaCheckpoints[F]] = kafkaCheckpointer - def stream: Stream[F, Stream[F, LowLevelEvents[KafkaCheckpoints[F]]]] = + def stream: Stream[F, Stream[F, Option[LowLevelEvents[KafkaCheckpoints[F]]]]] = kafkaStream(config, authHandlerClass) - - def lastLiveness: F[FiniteDuration] = - Sync[F].realTime } case class OffsetAndCommit[F[_]](offset: Long, commit: F[Unit]) @@ -75,7 +71,7 @@ object KafkaSource { private def kafkaStream[F[_]: Async, T <: AzureAuthenticationCallbackHandler]( config: KafkaSourceConfig, authHandlerClass: ClassTag[T] - ): Stream[F, Stream[F, LowLevelEvents[KafkaCheckpoints[F]]]] = + ): Stream[F, Stream[F, Option[LowLevelEvents[KafkaCheckpoints[F]]]]] = KafkaConsumer .stream(consumerSettings[F, T](config, authHandlerClass)) .evalTap(_.subscribeTo(config.topicName)) @@ -89,7 +85,7 @@ object KafkaSource { private def joinPartitions[F[_]: Async]( partitioned: PartitionedStreams[F] - ): Stream[F, LowLevelEvents[KafkaCheckpoints[F]]] = { + ): Stream[F, Option[LowLevelEvents[KafkaCheckpoints[F]]]] = { val streams = partitioned.toSeq.map { case (topicPartition, stream) => stream.chunks .flatMap { chunk => @@ -103,8 +99,8 @@ object KafkaSource { val ts = ccr.record.timestamp ts.logAppendTime.orElse(ts.createTime).orElse(ts.unknownTime) } - val 
earliestTimestamp = if (timestamps.isEmpty) None else Some(Instant.ofEpochMilli(timestamps.min)) - Stream.emit(LowLevelEvents(events, ack, earliestTimestamp)) + val earliestTimestamp = if (timestamps.isEmpty) None else Some(timestamps.min.millis) + Stream.emit(Some(LowLevelEvents(events, ack, earliestTimestamp))) case None => Stream.empty } @@ -117,6 +113,7 @@ object KafkaSource { Stream .emits(streams) .parJoinUnbounded + .mergeHaltL(Stream.awakeDelay(10.seconds).map(_ => None).repeat) // keepalives .onFinalize { Logger[F].info(s"Stopping processing of partitions: $formatted") } diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala index 1b016148..9f87ddf4 100644 --- a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala @@ -7,7 +7,7 @@ */ package com.snowplowanalytics.snowplow.sources.kinesis -import cats.effect.{Async, Ref, Sync} +import cats.effect.{Async, Sync} import cats.data.NonEmptyList import cats.implicits._ import com.snowplowanalytics.snowplow.sources.SourceAndAck @@ -19,7 +19,7 @@ import software.amazon.kinesis.lifecycle.events.{ProcessRecordsInput, ShardEnded import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber import java.util.concurrent.{CountDownLatch, SynchronousQueue} -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration.DurationLong import scala.jdk.CollectionConverters._ object KinesisSource { @@ -27,18 +27,13 @@ object KinesisSource { private implicit def logger[F[_]: Sync]: SelfAwareStructuredLogger[F] = Slf4jLogger.getLogger[F] def build[F[_]: Async](config: KinesisSourceConfig): F[SourceAndAck[F]] = - Ref.ofEffect(Sync[F].realTime).flatMap { liveness => - LowLevelSource.toSourceAndAck { - new LowLevelSource[F, Map[String, Checkpointable]] { - def stream: Stream[F, Stream[F, LowLevelEvents[Map[String, Checkpointable]]]] = - kinesisStream(config, liveness) + LowLevelSource.toSourceAndAck { + new LowLevelSource[F, Map[String, Checkpointable]] { + def stream: Stream[F, Stream[F, Option[LowLevelEvents[Map[String, Checkpointable]]]]] = + kinesisStream(config) - def checkpointer: KinesisCheckpointer[F] = - new KinesisCheckpointer[F](config.checkpointThrottledBackoffPolicy) - - def lastLiveness: F[FiniteDuration] = - liveness.get - } + def checkpointer: KinesisCheckpointer[F] = + new KinesisCheckpointer[F](config.checkpointThrottledBackoffPolicy) } } @@ -46,26 +41,24 @@ object KinesisSource { private val synchronousQueueFairness: Boolean = true private def kinesisStream[F[_]: Async]( - config: KinesisSourceConfig, - liveness: Ref[F, FiniteDuration] - ): Stream[F, Stream[F, LowLevelEvents[Map[String, Checkpointable]]]] = { + config: KinesisSourceConfig + ): Stream[F, Stream[F, Option[LowLevelEvents[Map[String, Checkpointable]]]]] = { val actionQueue = new SynchronousQueue[KCLAction](synchronousQueueFairness) for { _ <- Stream.resource(KCLScheduler.populateQueue[F](config, actionQueue)) - events <- Stream.emit(pullFromQueueAndEmit(actionQueue, liveness).stream).repeat + events <- Stream.emit(pullFromQueueAndEmit(actionQueue).stream).repeat } yield events } private def pullFromQueueAndEmit[F[_]: Sync]( - queue: SynchronousQueue[KCLAction], - liveness: Ref[F, FiniteDuration] - ): Pull[F, LowLevelEvents[Map[String, Checkpointable]], 
Unit] = - Pull.eval(pullFromQueue(queue, liveness)).flatMap { case PullFromQueueResult(actions, hasShardEnd) => + queue: SynchronousQueue[KCLAction] + ): Pull[F, Option[LowLevelEvents[Map[String, Checkpointable]]], Unit] = + Pull.eval(pullFromQueue(queue)).flatMap { case PullFromQueueResult(actions, hasShardEnd) => val toEmit = actions.traverse { case KCLAction.ProcessRecords(_, processRecordsInput) if processRecordsInput.records.asScala.isEmpty => - Pull.done + Pull.output1(None) case KCLAction.ProcessRecords(shardId, processRecordsInput) => - Pull.output1(provideNextChunk(shardId, processRecordsInput)).covary[F] + Pull.output1(Some(provideNextChunk(shardId, processRecordsInput))).covary[F] case KCLAction.ShardEnd(shardId, await, shardEndedInput) => handleShardEnd[F](shardId, await, shardEndedInput) case KCLAction.KCLError(t) => @@ -81,14 +74,13 @@ object KinesisSource { } Pull.eval(log).covaryOutput *> toEmit *> Pull.done } else - toEmit *> pullFromQueueAndEmit(queue, liveness) + toEmit *> pullFromQueueAndEmit(queue) } private case class PullFromQueueResult(actions: NonEmptyList[KCLAction], hasShardEnd: Boolean) - private def pullFromQueue[F[_]: Sync](queue: SynchronousQueue[KCLAction], liveness: Ref[F, FiniteDuration]): F[PullFromQueueResult] = + private def pullFromQueue[F[_]: Sync](queue: SynchronousQueue[KCLAction]): F[PullFromQueueResult] = resolveNextAction(queue) - .productL(updateLiveness(liveness)) .flatMap { case shardEnd: KCLAction.ShardEnd => // If we reached the end of one shard, it is likely we reached the end of other shards too. @@ -115,9 +107,6 @@ object KinesisSource { _ <- Sync[F].delay(queue.drainTo(ret)) } yield ret.asScala.toList - private def updateLiveness[F[_]: Sync](liveness: Ref[F, FiniteDuration]): F[Unit] = - Sync[F].realTime.flatMap(now => liveness.set(now)) - private def provideNextChunk(shardId: String, input: ProcessRecordsInput) = { val chunk = Chunk.javaList(input.records()).map(_.data()) val lastRecord = input.records.asScala.last // last is safe because we handled the empty case above @@ -126,17 +115,21 @@ object KinesisSource { new ExtendedSequenceNumber(lastRecord.sequenceNumber, lastRecord.subSequenceNumber), input.checkpointer ) - LowLevelEvents(chunk, Map[String, Checkpointable](shardId -> checkpointable), Some(firstRecord.approximateArrivalTimestamp)) + LowLevelEvents( + chunk, + Map[String, Checkpointable](shardId -> checkpointable), + Some(firstRecord.approximateArrivalTimestamp.toEpochMilli.millis) + ) } private def handleShardEnd[F[_]]( shardId: String, await: CountDownLatch, shardEndedInput: ShardEndedInput - ): Pull[F, LowLevelEvents[Map[String, Checkpointable]], Unit] = { + ): Pull[F, Option[LowLevelEvents[Map[String, Checkpointable]]], Unit] = { val checkpointable = Checkpointable.ShardEnd(shardEndedInput.checkpointer, await) val last = LowLevelEvents(Chunk.empty, Map[String, Checkpointable](shardId -> checkpointable), None) - Pull.output1(last) + Pull.output1(Some(last)) } } diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala index 743b891a..5d2101b4 100644 --- a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala +++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala @@ -14,8 +14,6 @@ import fs2.{Chunk, Stream} import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger 
-import java.time.Instant - // pubsub import com.google.api.gax.core.{ExecutorProvider, FixedExecutorProvider} import com.google.api.gax.grpc.ChannelPoolSettings @@ -31,7 +29,7 @@ import com.snowplowanalytics.snowplow.sources.SourceAndAck import com.snowplowanalytics.snowplow.sources.internal.{Checkpointer, LowLevelEvents, LowLevelSource} import com.snowplowanalytics.snowplow.sources.pubsub.PubsubRetryOps.implicits._ -import scala.concurrent.duration.{Duration, FiniteDuration} +import scala.concurrent.duration.{Duration, DurationLong, FiniteDuration} import scala.jdk.CollectionConverters._ import java.util.concurrent.{ExecutorService, Executors} @@ -64,17 +62,14 @@ object PubsubSource { new LowLevelSource[F, Vector[Unique.Token]] { def checkpointer: Checkpointer[F, Vector[Unique.Token]] = new PubsubCheckpointer(config.subscription, deferredResources) - def stream: Stream[F, Stream[F, LowLevelEvents[Vector[Unique.Token]]]] = + def stream: Stream[F, Stream[F, Option[LowLevelEvents[Vector[Unique.Token]]]]] = pubsubStream(config, deferredResources) - - def lastLiveness: F[FiniteDuration] = - Sync[F].realTime } private def pubsubStream[F[_]: Async]( config: PubsubSourceConfig, deferredResources: Deferred[F, PubsubCheckpointer.Resources[F]] - ): Stream[F, Stream[F, LowLevelEvents[Vector[Unique.Token]]]] = + ): Stream[F, Stream[F, Option[LowLevelEvents[Vector[Unique.Token]]]]] = for { parallelPullCount <- Stream.eval(Sync[F].delay(chooseNumParallelPulls(config))) stub <- Stream.resource(stubResource(config)) @@ -83,7 +78,6 @@ object PubsubSource { } yield Stream .fixedRateStartImmediately(config.debounceRequests, dampen = true) .parEvalMapUnordered(parallelPullCount)(_ => pullAndManageState(config, stub, refStates)) - .unNone .prefetchN(parallelPullCount) .concurrently(extendDeadlines(config, stub, refStates)) .onFinalize(nackRefStatesForShutdown(config, stub, refStates)) @@ -124,10 +118,10 @@ object PubsubSource { } } - private def earliestTimestampOfRecords(records: Vector[ReceivedMessage]): Instant = { + private def earliestTimestampOfRecords(records: Vector[ReceivedMessage]): FiniteDuration = { val (tstampSeconds, tstampNanos) = records.map(r => (r.getMessage.getPublishTime.getSeconds, r.getMessage.getPublishTime.getNanos)).min - Instant.ofEpochSecond(tstampSeconds, tstampNanos.toLong) + tstampSeconds.seconds + tstampNanos.toLong.nanos } /** diff --git a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/Metrics.scala b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/Metrics.scala index ea88db9c..90951e62 100644 --- a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/Metrics.scala +++ b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/Metrics.scala @@ -24,13 +24,14 @@ import java.nio.charset.StandardCharsets.UTF_8 abstract class Metrics[F[_]: Async, S <: Metrics.State]( ref: Ref[F, S], - emptyState: S, + initState: F[S], config: Option[Metrics.StatsdConfig] ) { def report: Stream[F, Nothing] = Stream.resource(Metrics.makeReporters[F](config)).flatMap { reporters => def report = for { - state <- ref.getAndSet(emptyState) + nextState <- initState + state <- ref.getAndSet(nextState) kv = state.toKVMetrics _ <- reporters.traverse(_.report(kv)) } yield () diff --git a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/EventProcessingConfig.scala b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/EventProcessingConfig.scala index 
29f24679..758fd5e2 100644 --- a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/EventProcessingConfig.scala +++ b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/EventProcessingConfig.scala @@ -25,8 +25,11 @@ import scala.concurrent.duration.FiniteDuration * Whether to open a new [[EventProcessor]] to handle a timed window of events (e.g. for the * transformer) or whether to feed events to a single [[EventProcessor]] in a continuous endless * stream (e.g. Enrich) + * + * @param latencyConsumer + * common-streams apps should use the latencyConsumer to set the latency metric. */ -case class EventProcessingConfig(windowing: EventProcessingConfig.Windowing) +case class EventProcessingConfig[F[_]](windowing: EventProcessingConfig.Windowing, latencyConsumer: FiniteDuration => F[Unit]) object EventProcessingConfig { diff --git a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/SourceAndAck.scala b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/SourceAndAck.scala index 384daf76..d1cf5caf 100644 --- a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/SourceAndAck.scala +++ b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/SourceAndAck.scala @@ -10,6 +10,7 @@ package com.snowplowanalytics.snowplow.sources import cats.Show import cats.implicits._ import fs2.Stream + import scala.concurrent.duration.FiniteDuration /** @@ -32,7 +33,7 @@ trait SourceAndAck[F[_]] { * @return * A stream which should be compiled and drained */ - def stream(config: EventProcessingConfig, processor: EventProcessor[F]): Stream[F, Nothing] + def stream(config: EventProcessingConfig[F], processor: EventProcessor[F]): Stream[F, Nothing] /** * Reports on whether the source of events is healthy @@ -48,6 +49,22 @@ trait SourceAndAck[F[_]] { * healthy. If any event is "stuck" then latency is high and the probe should report unhealthy. */ def isHealthy(maxAllowedProcessingLatency: FiniteDuration): F[SourceAndAck.HealthStatus] + + /** + * Latency of the message that is currently being processed by the downstream application + * + * The returned value is `None` if this `SourceAndAck` is currently awaiting messages from + * upstream, e.g. it is doing a remote fetch. + * + * The returned value is `Some` if this `SourceAndAck` has emitted a message downstream to the + * application, and it is waiting for the downstream app to "pull" the next message from this + * `SourceAndAck`. + * + * This value should be used as the initial latency at the start of a statsd metrics reporting + * period. This ensures the app reports non-zero latency even when the app is stuck (e.g. cannot + * load events to destination). 
+ */ + def currentStreamLatency: F[Option[FiniteDuration]] } object SourceAndAck { diff --git a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/TokenedEvents.scala b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/TokenedEvents.scala index da1f0ae4..d4d9aaa6 100644 --- a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/TokenedEvents.scala +++ b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/TokenedEvents.scala @@ -11,7 +11,6 @@ import cats.effect.kernel.Unique import fs2.Chunk import java.nio.ByteBuffer -import java.time.Instant /** * The events as they are fed into a [[EventProcessor]] @@ -22,12 +21,8 @@ import java.time.Instant * The [[EventProcessor]] must emit this token after it has fully processed the batch of events. * When the [[EventProcessor]] emits the token, it is an instruction to the [[SourceAndAck]] to * ack/checkpoint the events. - * @param earliestSourceTstamp - * The timestamp that an event was originally written to the source stream. Used for calculating - * the latency metric. */ case class TokenedEvents( events: Chunk[ByteBuffer], - ack: Unique.Token, - earliestSourceTstamp: Option[Instant] + ack: Unique.Token ) diff --git a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelEvents.scala b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelEvents.scala index 6353fd88..f791e745 100644 --- a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelEvents.scala +++ b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelEvents.scala @@ -10,16 +10,19 @@ package com.snowplowanalytics.snowplow.sources.internal import fs2.Chunk import java.nio.ByteBuffer -import java.time.Instant +import scala.concurrent.duration.FiniteDuration /** * The events and checkpointable item emitted by a LowLevelSource * * This library uses LowLevelEvents internally, but it is never exposed to the high level event * processor + * + * @param earliestSourceTstamp + * A point in time, represented as a `FiniteDuration` since epoch */ case class LowLevelEvents[C]( events: Chunk[ByteBuffer], ack: C, - earliestSourceTstamp: Option[Instant] + earliestSourceTstamp: Option[FiniteDuration] ) diff --git a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala index 3ac01bb6..7daa059e 100644 --- a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala +++ b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala @@ -17,7 +17,7 @@ import fs2.{Pipe, Pull, Stream} import org.typelevel.log4cats.{Logger, SelfAwareStructuredLogger} import org.typelevel.log4cats.slf4j.Slf4jLogger -import scala.concurrent.duration.{DurationLong, FiniteDuration} +import scala.concurrent.duration.{Duration, DurationLong, FiniteDuration} import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, SourceAndAck, TokenedEvents} /** @@ -43,16 +43,12 @@ private[sources] trait LowLevelSource[F[_], C] { * rebalancing. 
* * A new [[EventProcessor]] will be invoked for each inner stream - */ - def stream: Stream[F, Stream[F, LowLevelEvents[C]]] - - /** - * The last time this source was known to be alive and healthy * - * The returned value is FiniteDuration since the unix epoch, i.e. the value returned by - * `Sync[F].realTime` + * The inner stream should periodically emit `None` as a signal that it is alive and healthy, even + * when there are no events on the stream. If it fails to emit frequently enough, the + * `SourceAndAck` will report itself as unhealthy. */ - def lastLiveness: F[FiniteDuration] + def stream: Stream[F, Stream[F, Option[LowLevelEvents[C]]]] } private[sources] object LowLevelSource { @@ -65,16 +61,36 @@ private[sources] object LowLevelSource { * Map is keyed by Token, corresponding to a batch of `TokenedEvents`. Map values are `C`s which * is how to ack/checkpoint the batch. */ - private type State[C] = Map[Unique.Token, C] + private type AcksState[C] = Map[Unique.Token, C] - /** - * Mutable state used for measuring the event-processing latency from this source - * - * For the batch that was last emitted downstream for processing, the `Option[FiniteDuration]` - * represents the real time (since epoch) that the batch was emitted. If it is None then there is - * no batch currently being processed. - */ - private type LatencyRef[F[_]] = Ref[F, Option[FiniteDuration]] + private sealed trait InternalState + + private object InternalState { + + /** + * The Source is awaiting the downstream processor to pull another message from us + * + * @param since + * Timestamp of when the last message was emitted downstream. A point in time represented as + * FiniteDuration since the epoch. + * @param streamTstamp + * For the last batch to be emitted downstream, this is the earliest timestamp according + * to the source, i.e. the time the event was written to the source stream. It is None if this + * stream type does not record timestamps, e.g. Kafka under some circumstances. + */ + case class AwaitingDownstream(since: FiniteDuration, streamTstamp: Option[FiniteDuration]) extends InternalState + + /** + * The Source is awaiting the upstream remote source to provide more messages + * + * @param since + * Timestamp of when the last message was received from upstream. A point in time represented as + * FiniteDuration since the epoch.
+ */ + case class AwaitingUpstream(since: FiniteDuration) extends InternalState + + case object Disconnected extends InternalState + } /** * Lifts the internal [[LowLevelSource]] into a [[SourceAndAck]], which is the public API of this @@ -82,23 +98,22 @@ private[sources] object LowLevelSource { */ def toSourceAndAck[F[_]: Async, C](source: LowLevelSource[F, C]): F[SourceAndAck[F]] = for { - latencyRef <- Ref[F].of(Option.empty[FiniteDuration]) - isConnectedRef <- Ref[F].of(false) - } yield sourceAndAckImpl(source, latencyRef, isConnectedRef) + stateRef <- Ref[F].of[InternalState](InternalState.Disconnected) + } yield sourceAndAckImpl(source, stateRef) private def sourceAndAckImpl[F[_]: Async, C]( source: LowLevelSource[F, C], - latencyRef: LatencyRef[F], - isConnectedRef: Ref[F, Boolean] + stateRef: Ref[F, InternalState] ): SourceAndAck[F] = new SourceAndAck[F] { - def stream(config: EventProcessingConfig, processor: EventProcessor[F]): Stream[F, Nothing] = { + def stream(config: EventProcessingConfig[F], processor: EventProcessor[F]): Stream[F, Nothing] = { val str = for { s2 <- source.stream acksRef <- Stream.bracket(Ref[F].of(Map.empty[Unique.Token, C]))(nackUnhandled(source.checkpointer, _)) - _ <- Stream.bracket(isConnectedRef.set(true))(_ => isConnectedRef.set(false)) + now <- Stream.eval(Sync[F].realTime) + _ <- Stream.bracket(stateRef.set(InternalState.AwaitingUpstream(now)))(_ => stateRef.set(InternalState.Disconnected)) } yield { val tokenedSources = s2 - .through(monitorLatency(latencyRef)) + .through(monitorLatency(config, stateRef)) .through(tokened(acksRef)) .through(windowed(config.windowing)) @@ -116,18 +131,27 @@ private[sources] object LowLevelSource { } def isHealthy(maxAllowedProcessingLatency: FiniteDuration): F[SourceAndAck.HealthStatus] = - (isConnectedRef.get, latencyRef.get, source.lastLiveness, Sync[F].realTime).mapN { - case (false, _, _, _) => + (stateRef.get, Sync[F].realTime).mapN { + case (InternalState.Disconnected, _) => SourceAndAck.Disconnected - case (_, Some(lastPullTime), _, now) if now - lastPullTime > maxAllowedProcessingLatency => - SourceAndAck.LaggingEventProcessor(now - lastPullTime) - case (_, _, lastLiveness, now) if now - lastLiveness > maxAllowedProcessingLatency => - SourceAndAck.InactiveSource(now - lastLiveness) + case (InternalState.AwaitingDownstream(since, _), now) if now - since > maxAllowedProcessingLatency => + SourceAndAck.LaggingEventProcessor(now - since) + case (InternalState.AwaitingUpstream(since), now) if now - since > maxAllowedProcessingLatency => + SourceAndAck.InactiveSource(now - since) case _ => SourceAndAck.Healthy } + + def currentStreamLatency: F[Option[FiniteDuration]] = + stateRef.get.flatMap { + case InternalState.AwaitingDownstream(_, Some(tstamp)) => + Sync[F].realTime.map { now => + Some(now - tstamp) + } + case _ => none.pure[F] + } } - private def nackUnhandled[F[_]: Monad, C](checkpointer: Checkpointer[F, C], ref: Ref[F, State[C]]): F[Unit] = + private def nackUnhandled[F[_]: Monad, C](checkpointer: Checkpointer[F, C], ref: Ref[F, AcksState[C]]): F[Unit] = ref.get .flatMap { map => checkpointer.nack(checkpointer.combineAll(map.values)) @@ -138,29 +162,41 @@ private[sources] object LowLevelSource { * * The token can later be exchanged for the original checkpointable item */ - private def tokened[F[_]: Sync, C](ref: Ref[F, State[C]]): Pipe[F, LowLevelEvents[C], TokenedEvents] = - _.evalMap { case LowLevelEvents(events, ack, earliestSourceTstamp) => + private def tokened[F[_]: Sync, C](ref: Ref[F, 
AcksState[C]]): Pipe[F, LowLevelEvents[C], TokenedEvents] = + _.evalMap { case LowLevelEvents(events, ack, _) => for { token <- Unique[F].unique _ <- ref.update(_ + (token -> ack)) - } yield TokenedEvents(events, token, earliestSourceTstamp) + } yield TokenedEvents(events, token) } /** * An fs2 Pipe which records what time (duration since epoch) we last emitted a batch downstream * for processing */ - private def monitorLatency[F[_]: Sync, A](ref: Ref[F, Option[FiniteDuration]]): Pipe[F, A, A] = { + private def monitorLatency[F[_]: Sync, C]( + config: EventProcessingConfig[F], + ref: Ref[F, InternalState] + ): Pipe[F, Option[LowLevelEvents[C]], LowLevelEvents[C]] = { - def go(source: Stream[F, A]): Pull[F, A, Unit] = + def go(source: Stream[F, Option[LowLevelEvents[C]]]): Pull[F, LowLevelEvents[C], Unit] = source.pull.uncons1.flatMap { case None => Pull.done - case Some((pulled, source)) => + case Some((Some(pulled), source)) => for { now <- Pull.eval(Sync[F].realTime) - _ <- Pull.eval(ref.set(Some(now))) + latency = pulled.earliestSourceTstamp.fold(Duration.Zero)(now - _) + _ <- Pull.eval(config.latencyConsumer(latency)) + _ <- Pull.eval(ref.set(InternalState.AwaitingDownstream(now, pulled.earliestSourceTstamp))) _ <- Pull.output1(pulled) - _ <- Pull.eval(ref.set(None)) + _ <- Pull.eval(ref.set(InternalState.AwaitingUpstream(now))) + _ <- go(source) + } yield () + case Some((None, source)) => + for { + now <- Pull.eval(Sync[F].realTime) + _ <- Pull.eval(config.latencyConsumer(Duration.Zero)) + _ <- Pull.eval(ref.set(InternalState.AwaitingUpstream(now))) _ <- go(source) } yield () } @@ -190,11 +226,11 @@ private[sources] object LowLevelSource { */ private def messageSink[F[_]: Async, C]( processor: EventProcessor[F], - ref: Ref[F, State[C]], + ref: Ref[F, AcksState[C]], checkpointer: Checkpointer[F, C], control: EagerWindows.Control[F] ): Pipe[F, TokenedEvents, Nothing] = - _.evalTap { case TokenedEvents(events, _, _) => + _.evalTap { case TokenedEvents(events, _) => Logger[F].debug(s"Batch of ${events.size} events received from the source stream") } .through(processor) diff --git a/modules/streams-core/src/test/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSourceSpec.scala b/modules/streams-core/src/test/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSourceSpec.scala index a7dfece8..00f3ac89 100644 --- a/modules/streams-core/src/test/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSourceSpec.scala +++ b/modules/streams-core/src/test/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSourceSpec.scala @@ -16,9 +16,10 @@ import cats.effect.testing.specs2.CatsEffect import fs2.{Chunk, Stream} import org.specs2.Specification -import scala.concurrent.duration.{DurationInt, FiniteDuration} - +import scala.concurrent.duration.{Duration, DurationLong, FiniteDuration} import java.nio.charset.StandardCharsets +import java.time.Instant + import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, SourceAndAck, TokenedEvents} import java.nio.ByteBuffer @@ -42,17 +43,28 @@ class LowLevelSourceSpec extends Specification with CatsEffect { use a short first window according to the configuration $windowed5 When reporting healthy status - report healthy when there are no events $health1 - report lagging if there are unprocessed events $health2 - report healthy if events are processed but not yet acked (e.g. 
a batch-oriented loader) $health3 - report healthy after all events have been processed and acked $health4 - report disconnected while source is in between two active streams of events (e.g. during kafka rebalance) $health5 - report unhealthy if the underlying low level source is lagging $health6 + report healthy when there are no events but the source emits periodic liveness pings $health1 + report unhealthy when there are no events and no liveness pings $health2 + report lagging if there are unprocessed events $health3 + report healthy if events are processed but not yet acked (e.g. a batch-oriented loader) $health4 + report healthy after all events have been processed and acked $health5 + report disconnected while source is in between two active streams of events (e.g. during kafka rebalance) $health6 + report unhealthy if the underlying low level source is lagging $health7 + + When reporting currentStreamLatency + report no latency when there are no events $latency1 + report a latency if there are unprocessed events $latency2 + report no latency after events are processed $latency3 + + When pushing latency metrics + report no metric when there are no events and no liveness pings $latencyMetric1 + report zero latency when there are no events, but regular liveness pings $latencyMetric2 + report non-zero latency when the source emits timestamped events $latencyMetric3 """ def e1 = { - val config = EventProcessingConfig(EventProcessingConfig.NoWindowing) + val config = EventProcessingConfig(EventProcessingConfig.NoWindowing, _ => IO.unit) val testConfig = TestSourceConfig( batchesPerRebalance = 5, @@ -109,7 +121,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect { def e2 = { - val config = EventProcessingConfig(EventProcessingConfig.NoWindowing) + val config = EventProcessingConfig(EventProcessingConfig.NoWindowing, _ => IO.unit) val testConfig = TestSourceConfig( batchesPerRebalance = 100, @@ -144,7 +156,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect { def e3 = { - val config = EventProcessingConfig(EventProcessingConfig.NoWindowing) + val config = EventProcessingConfig(EventProcessingConfig.NoWindowing, _ => IO.unit) val testConfig = TestSourceConfig( batchesPerRebalance = 5, @@ -195,7 +207,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect { def windowed1 = { - val config = EventProcessingConfig(EventProcessingConfig.TimedWindows(45.seconds, 1.0, 2)) + val config = EventProcessingConfig(EventProcessingConfig.TimedWindows(45.seconds, 1.0, 2), _ => IO.unit) val testConfig = TestSourceConfig( batchesPerRebalance = 5, @@ -303,7 +315,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect { def windowed2 = { - val config = EventProcessingConfig(EventProcessingConfig.TimedWindows(45.seconds, 1.0, 2)) + val config = EventProcessingConfig(EventProcessingConfig.TimedWindows(45.seconds, 1.0, 2), _ => IO.unit) val testConfig = TestSourceConfig( batchesPerRebalance = Int.MaxValue, @@ -336,7 +348,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect { def windowed3 = { - val config = EventProcessingConfig(EventProcessingConfig.TimedWindows(10.minutes, 1.0, 2)) + val config = EventProcessingConfig(EventProcessingConfig.TimedWindows(10.minutes, 1.0, 2), _ => IO.unit) val testConfig = TestSourceConfig( batchesPerRebalance = 5, @@ -381,7 +393,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect { def windowed4 = { - val config = EventProcessingConfig(EventProcessingConfig.TimedWindows(10.seconds, 1.0, 
numEagerWindows = 4)) + val config = EventProcessingConfig(EventProcessingConfig.TimedWindows(10.seconds, 1.0, numEagerWindows = 4), _ => IO.unit) val testConfig = TestSourceConfig( batchesPerRebalance = Int.MaxValue, @@ -448,7 +460,8 @@ class LowLevelSourceSpec extends Specification with CatsEffect { duration = 60.seconds, firstWindowScaling = 0.25, // so first window is 15 seconds numEagerWindows = 2 - ) + ), + _ => IO.unit ) val testConfig = TestSourceConfig( @@ -496,13 +509,12 @@ class LowLevelSourceSpec extends Specification with CatsEffect { def health1 = { - val config = EventProcessingConfig(EventProcessingConfig.NoWindowing) + val config = EventProcessingConfig(EventProcessingConfig.NoWindowing, _ => IO.unit) - // A source that emits nothing + // A source that emits periodic liveness pings val lowLevelSource = new LowLevelSource[IO, Unit] { - def checkpointer: Checkpointer[IO, Unit] = Checkpointer.acksOnly[IO, Unit](_ => IO.unit) - def stream: Stream[IO, Stream[IO, LowLevelEvents[Unit]]] = Stream.emit(Stream.never[IO]) - def lastLiveness: IO[FiniteDuration] = IO.realTime + def checkpointer: Checkpointer[IO, Unit] = Checkpointer.acksOnly[IO, Unit](_ => IO.unit) + def stream: Stream[IO, Stream[IO, Option[LowLevelEvents[Unit]]]] = Stream.emit(Stream.awakeDelay[IO](1.second).map(_ => None)) } val io = for { @@ -511,7 +523,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect { processor = testProcessor(refActions, TestSourceConfig(1, 1, 1.second, 1.second)) fiber <- sourceAndAck.stream(config, processor).compile.drain.start _ <- IO.sleep(1.hour) - health <- sourceAndAck.isHealthy(1.nanosecond) + health <- sourceAndAck.isHealthy(10.seconds) _ <- fiber.cancel } yield health must beEqualTo(SourceAndAck.Healthy) @@ -520,7 +532,30 @@ class LowLevelSourceSpec extends Specification with CatsEffect { def health2 = { - val config = EventProcessingConfig(EventProcessingConfig.NoWindowing) + val config = EventProcessingConfig(EventProcessingConfig.NoWindowing, _ => IO.unit) + + // A source that emits nothing + val lowLevelSource = new LowLevelSource[IO, Unit] { + def checkpointer: Checkpointer[IO, Unit] = Checkpointer.acksOnly[IO, Unit](_ => IO.unit) + def stream: Stream[IO, Stream[IO, Option[LowLevelEvents[Unit]]]] = Stream.emit(Stream.never[IO]) + } + + val io = for { + refActions <- Ref[IO].of(Vector.empty[Action]) + sourceAndAck <- LowLevelSource.toSourceAndAck(lowLevelSource) + processor = testProcessor(refActions, TestSourceConfig(1, 1, 1.second, 1.second)) + fiber <- sourceAndAck.stream(config, processor).compile.drain.start + _ <- IO.sleep(1.hour) + health <- sourceAndAck.isHealthy(1.nanosecond) + _ <- fiber.cancel + } yield health must beEqualTo(SourceAndAck.InactiveSource(1.hour)) + + TestControl.executeEmbed(io) + } + + def health3 = { + + val config = EventProcessingConfig(EventProcessingConfig.NoWindowing, _ => IO.unit) val testConfig = TestSourceConfig( batchesPerRebalance = Int.MaxValue, @@ -542,9 +577,9 @@ class LowLevelSourceSpec extends Specification with CatsEffect { TestControl.executeEmbed(io) } - def health3 = { + def health4 = { - val config = EventProcessingConfig(EventProcessingConfig.TimedWindows(1.hour, 1.0, 2)) + val config = EventProcessingConfig(EventProcessingConfig.TimedWindows(1.hour, 1.0, 2), _ => IO.unit) val testConfig = TestSourceConfig( batchesPerRebalance = Int.MaxValue, @@ -566,9 +601,9 @@ class LowLevelSourceSpec extends Specification with CatsEffect { TestControl.executeEmbed(io) } - def health4 = { + def health5 = { - val config = 
EventProcessingConfig(EventProcessingConfig.NoWindowing) + val config = EventProcessingConfig(EventProcessingConfig.NoWindowing, _ => IO.unit) val testConfig = TestSourceConfig( batchesPerRebalance = 1, @@ -583,25 +618,24 @@ class LowLevelSourceSpec extends Specification with CatsEffect { processor = testProcessor(refActions, testConfig) fiber <- sourceAndAck.stream(config, processor).compile.drain.start _ <- IO.sleep(5.minutes) - health <- sourceAndAck.isHealthy(1.microsecond) + health <- sourceAndAck.isHealthy(2.seconds) _ <- fiber.cancel } yield health must beEqualTo(SourceAndAck.Healthy) TestControl.executeEmbed(io) } - def health5 = { + def health6 = { - val config = EventProcessingConfig(EventProcessingConfig.NoWindowing) + val config = EventProcessingConfig(EventProcessingConfig.NoWindowing, _ => IO.unit) // A source that emits one batch per inner stream, with a 5 minute "rebalancing" in between val lowLevelSource = new LowLevelSource[IO, Unit] { def checkpointer: Checkpointer[IO, Unit] = Checkpointer.acksOnly[IO, Unit](_ => IO.unit) - def stream: Stream[IO, Stream[IO, LowLevelEvents[Unit]]] = + def stream: Stream[IO, Stream[IO, Option[LowLevelEvents[Unit]]]] = Stream.fixedDelay[IO](5.minutes).map { _ => - Stream.emit(LowLevelEvents(Chunk.empty, (), None)) + Stream.emit(Some(LowLevelEvents(Chunk.empty, (), None))) } - def lastLiveness: IO[FiniteDuration] = IO.realTime } val io = for { @@ -617,14 +651,13 @@ class LowLevelSourceSpec extends Specification with CatsEffect { TestControl.executeEmbed(io) } - def health6 = { + def health7 = { - val config = EventProcessingConfig(EventProcessingConfig.NoWindowing) + val config = EventProcessingConfig(EventProcessingConfig.NoWindowing, _ => IO.unit) val lowLevelSource = new LowLevelSource[IO, Unit] { - def checkpointer: Checkpointer[IO, Unit] = Checkpointer.acksOnly[IO, Unit](_ => IO.unit) - def stream: Stream[IO, Stream[IO, LowLevelEvents[Unit]]] = Stream.emit(Stream.never[IO]) - def lastLiveness: IO[FiniteDuration] = IO.realTime.map(_ - 10.seconds) + def checkpointer: Checkpointer[IO, Unit] = Checkpointer.acksOnly[IO, Unit](_ => IO.unit) + def stream: Stream[IO, Stream[IO, Option[LowLevelEvents[Unit]]]] = Stream.emit(Stream.never[IO]) } val io = for { @@ -635,10 +668,162 @@ class LowLevelSourceSpec extends Specification with CatsEffect { _ <- IO.sleep(5.minutes) health <- sourceAndAck.isHealthy(5.seconds) _ <- fiber.cancel - } yield health must beEqualTo(SourceAndAck.InactiveSource(10.seconds)) + } yield health must beEqualTo(SourceAndAck.InactiveSource(5.minutes)) + + TestControl.executeEmbed(io) + } + + /** Specs for currentStreamLatency */ + + def latency1 = { + + val config = EventProcessingConfig(EventProcessingConfig.NoWindowing, _ => IO.unit) + + // A source that emits nothing + val lowLevelSource = new LowLevelSource[IO, Unit] { + def checkpointer: Checkpointer[IO, Unit] = Checkpointer.acksOnly[IO, Unit](_ => IO.unit) + def stream: Stream[IO, Stream[IO, Option[LowLevelEvents[Unit]]]] = Stream.emit(Stream.never[IO]) + } + + val io = for { + refActions <- Ref[IO].of(Vector.empty[Action]) + sourceAndAck <- LowLevelSource.toSourceAndAck(lowLevelSource) + processor = testProcessor(refActions, TestSourceConfig(1, 1, 1.second, 1.second)) + fiber <- sourceAndAck.stream(config, processor).compile.drain.start + _ <- IO.sleep(1.hour) + reportedLatency <- sourceAndAck.currentStreamLatency + _ <- fiber.cancel + } yield reportedLatency must beNone + + TestControl.executeEmbed(io) + } + + def latency2 = { + + val config = 
EventProcessingConfig(EventProcessingConfig.NoWindowing, _ => IO.unit) + + val streamTstamp = Instant.parse("2024-01-02T03:04:05.123Z") + val testConfig = TestSourceConfig( + batchesPerRebalance = Int.MaxValue, + eventsPerBatch = 2, + timeBetweenBatches = 0.second, + timeToProcessBatch = 1.hour, // Processor is very slow to sink the events + streamTstamp = streamTstamp + ) + + val io = for { + _ <- IO.sleep(streamTstamp.toEpochMilli.millis + 2.minutes) + refActions <- Ref[IO].of(Vector.empty[Action]) + sourceAndAck <- LowLevelSource.toSourceAndAck(testLowLevelSource(refActions, testConfig)) + processor = testProcessor(refActions, testConfig) + fiber <- sourceAndAck.stream(config, processor).compile.drain.start + _ <- IO.sleep(5.minutes) + reportedLatency <- sourceAndAck.currentStreamLatency + _ <- fiber.cancel + } yield reportedLatency must beSome(7.minutes) TestControl.executeEmbed(io) } + + def latency3 = { + + val config = EventProcessingConfig(EventProcessingConfig.TimedWindows(1.hour, 1.0, 2), _ => IO.unit) + + val testConfig = TestSourceConfig( + batchesPerRebalance = Int.MaxValue, + eventsPerBatch = 2, + timeBetweenBatches = 1.second, + timeToProcessBatch = 1.second + ) + + val io = for { + refActions <- Ref[IO].of(Vector.empty[Action]) + sourceAndAck <- LowLevelSource.toSourceAndAck(testLowLevelSource(refActions, testConfig)) + processor = windowedProcessor(refActions, testConfig) + fiber <- sourceAndAck.stream(config, processor).compile.drain.start + _ <- IO.sleep(30.minutes) + reportedLatency <- sourceAndAck.currentStreamLatency + _ <- fiber.cancel + } yield reportedLatency must beNone + + TestControl.executeEmbed(io) + } + + /** Specs for latency metric */ + + def latencyMetric1 = { + + // A source that emits nothing + val lowLevelSource = new LowLevelSource[IO, Unit] { + def checkpointer: Checkpointer[IO, Unit] = Checkpointer.acksOnly[IO, Unit](_ => IO.unit) + def stream: Stream[IO, Stream[IO, Option[LowLevelEvents[Unit]]]] = Stream.emit(Stream.never[IO]) + } + + val io = for { + refActions <- Ref[IO].of(Vector.empty[Action]) + sourceAndAck <- LowLevelSource.toSourceAndAck(lowLevelSource) + processor = testProcessor(refActions, TestSourceConfig(1, 1, 1.second, 1.second)) + refLatencies <- Ref[IO].of(Vector.empty[FiniteDuration]) + config = EventProcessingConfig(EventProcessingConfig.NoWindowing, metric => refLatencies.update(_ :+ metric)) + fiber <- sourceAndAck.stream(config, processor).compile.drain.start + _ <- IO.sleep(1.hour) + latencyMetrics <- refLatencies.get + _ <- fiber.cancel + } yield latencyMetrics must beEmpty + + TestControl.executeEmbed(io) + } + + def latencyMetric2 = { + + // A source that emits periodic liveness pings + val lowLevelSource = new LowLevelSource[IO, Unit] { + def checkpointer: Checkpointer[IO, Unit] = Checkpointer.acksOnly[IO, Unit](_ => IO.unit) + def stream: Stream[IO, Stream[IO, Option[LowLevelEvents[Unit]]]] = Stream.emit(Stream.awakeDelay[IO](1.second).map(_ => None)) + } + + val io = for { + refActions <- Ref[IO].of(Vector.empty[Action]) + sourceAndAck <- LowLevelSource.toSourceAndAck(lowLevelSource) + processor = testProcessor(refActions, TestSourceConfig(1, 1, 1.second, 1.second)) + refLatencies <- Ref[IO].of(Vector.empty[FiniteDuration]) + config = EventProcessingConfig(EventProcessingConfig.NoWindowing, metric => refLatencies.update(_ :+ metric)) + fiber <- sourceAndAck.stream(config, processor).compile.drain.start + _ <- IO.sleep(1.hour) + latencyMetrics <- refLatencies.get + _ <- fiber.cancel + } yield latencyMetrics.toSet must 
contain(exactly(Duration.Zero)) + + TestControl.executeEmbed(io) + } + + def latencyMetric3 = { + + val streamTstamp = Instant.parse("2024-01-02T03:04:05.123Z") + val testConfig = TestSourceConfig( + batchesPerRebalance = Int.MaxValue, + eventsPerBatch = 2, + timeBetweenBatches = 1.second, + timeToProcessBatch = 1.second, + streamTstamp = streamTstamp + ) + + val io = for { + _ <- IO.sleep(streamTstamp.toEpochMilli.millis + 2.minutes) + refActions <- Ref[IO].of(Vector.empty[Action]) + sourceAndAck <- LowLevelSource.toSourceAndAck(testLowLevelSource(refActions, testConfig)) + processor = windowedProcessor(refActions, testConfig) + refLatencies <- Ref[IO].of(Vector.empty[FiniteDuration]) + config = EventProcessingConfig(EventProcessingConfig.NoWindowing, metric => refLatencies.update(_ :+ metric)) + fiber <- sourceAndAck.stream(config, processor).compile.drain.start + _ <- IO.sleep(10.seconds) + latencyMetrics <- refLatencies.get + _ <- fiber.cancel + } yield latencyMetrics must contain(beBetween(120.seconds, 130.seconds)) + + TestControl.executeEmbed(io) + } + } object LowLevelSourceSpec { @@ -657,7 +842,8 @@ object LowLevelSourceSpec { eventsPerBatch: Int, timeBetweenBatches: FiniteDuration, timeToProcessBatch: FiniteDuration, - timeToFinalizeWindow: FiniteDuration = 0.seconds + timeToFinalizeWindow: FiniteDuration = 0.seconds, + streamTstamp: Instant = Instant.EPOCH ) /** @@ -670,7 +856,7 @@ object LowLevelSourceSpec { val start = IO.realTimeInstant.flatMap(t => ref.update(_ :+ Action.ProcessorStartedWindow(t.toString))) val end = IO.realTimeInstant.flatMap(t => ref.update(_ :+ Action.ProcessorReachedEndOfWindow(t.toString))) - val middle = in.evalMap { case TokenedEvents(events, token, _) => + val middle = in.evalMap { case TokenedEvents(events, token) => val deserialized = events.map(byteBuffer => StandardCharsets.UTF_8.decode(byteBuffer).toString).toList for { now <- IO.realTimeInstant @@ -699,7 +885,7 @@ object LowLevelSourceSpec { tokens <- checkpoints.get } yield tokens.reverse - val middle = in.evalMap { case TokenedEvents(events, token, _) => + val middle = in.evalMap { case TokenedEvents(events, token) => val deserialized = events.map(byteBuffer => StandardCharsets.UTF_8.decode(byteBuffer).toString).toList for { now <- IO.realTimeInstant @@ -718,6 +904,7 @@ object LowLevelSourceSpec { * * - It emits batches of events at regular intervals * - It "rebalances" (like Kafka) after every few batches, which means it emits a new stream + * - It periodically emits `None`s to report its liveness * - It uses a ref to record which events got checkpointed */ def testLowLevelSource(ref: Ref[IO, Vector[Action]], config: TestSourceConfig): LowLevelSource[IO, List[String]] = @@ -727,7 +914,7 @@ object LowLevelSourceSpec { ref.update(_ :+ Action.Checkpointed(toCheckpoint)) } - def stream: Stream[IO, Stream[IO, LowLevelEvents[List[String]]]] = + def stream: Stream[IO, Stream[IO, Option[LowLevelEvents[List[String]]]]] = Stream.eval(Ref[IO].of(0)).flatMap { counter => Stream.unit.repeat.map { _ => Stream @@ -737,16 +924,22 @@ object LowLevelSourceSpec { .map { numbers => val events = numbers.map(_.toString) val asBytes = Chunk.from(events).map(e => ByteBuffer.wrap(e.getBytes(StandardCharsets.UTF_8))) - LowLevelEvents(events = asBytes, ack = events.toList, earliestSourceTstamp = None) + Some( + LowLevelEvents( + events = asBytes, + ack = events.toList, + earliestSourceTstamp = Some(config.streamTstamp.toEpochMilli.millis) + ) + ) } } .flatMap { e => Stream.emit(e) ++ 
Stream.sleep[IO](config.timeBetweenBatches).drain } .repeatN(config.batchesPerRebalance.toLong) + .mergeHaltL(Stream.awakeDelay[IO](1.second).map(_ => None).repeat) } } - def lastLiveness: IO[FiniteDuration] = IO.realTime } }
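For illustration, a minimal sketch of the new `LowLevelSource` contract from a source implementer's point of view, following the same `mergeHaltL` keepalive pattern the patch applies in `KafkaSource` and in the test source above. `pullBatches` is a hypothetical stand-in for a real source's pull loop; everything else uses the types from this patch.

```scala
import cats.effect.IO
import fs2.Stream
import scala.concurrent.duration.DurationInt

import com.snowplowanalytics.snowplow.sources.internal.{Checkpointer, LowLevelEvents, LowLevelSource}

// pullBatches: hypothetical stream of batches pulled from the remote source
def keepaliveSource(pullBatches: Stream[IO, LowLevelEvents[Unit]]): LowLevelSource[IO, Unit] =
  new LowLevelSource[IO, Unit] {
    def checkpointer: Checkpointer[IO, Unit] = Checkpointer.acksOnly[IO, Unit](_ => IO.unit)

    // A single inner stream, i.e. a source that never "rebalances"
    def stream: Stream[IO, Stream[IO, Option[LowLevelEvents[Unit]]]] =
      Stream.emit {
        pullBatches
          .map(Some(_))
          // Liveness pings: a None every 10 seconds tells the SourceAndAck that the
          // source is healthy even when idle; without them, isHealthy would
          // eventually report InactiveSource.
          .mergeHaltL(Stream.awakeDelay[IO](10.seconds).map(_ => None))
      }
  }
```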