diff --git a/modules/it/src/test/scala/com/snowplowanalytics/snowplow/runtime/MetricsSpec.scala b/modules/it/src/test/scala/com/snowplowanalytics/snowplow/runtime/MetricsSpec.scala
index 17e6e90d..89588b98 100644
--- a/modules/it/src/test/scala/com/snowplowanalytics/snowplow/runtime/MetricsSpec.scala
+++ b/modules/it/src/test/scala/com/snowplowanalytics/snowplow/runtime/MetricsSpec.scala
@@ -76,7 +76,7 @@ object TestMetrics {
     ref: Ref[IO, TestState],
     emptyState: TestState,
     config: Option[Metrics.StatsdConfig]
-  ) extends Metrics[IO, TestState](ref, emptyState, config) {
+  ) extends Metrics[IO, TestState](ref, IO.pure(emptyState), config) {
     def count(c: Int) = ref.update(s => s.copy(counter = s.counter + c))
     def time(t: FiniteDuration) = ref.update(s => s.copy(timer = s.timer + t))
   }
diff --git a/modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceSpec.scala b/modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceSpec.scala
index 00234485..de9fb63a 100644
--- a/modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceSpec.scala
+++ b/modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceSpec.scala
@@ -24,8 +24,6 @@ import com.snowplowanalytics.snowplow.sources.EventProcessingConfig.NoWindowing
 import com.snowplowanalytics.snowplow.it.DockerPull
 import com.snowplowanalytics.snowplow.it.kinesis._
 
-import java.time.Instant
-
 import Utils._
 
 import org.specs2.specification.BeforeAll
@@ -60,23 +58,21 @@ class KinesisSourceSpec
     for {
       refProcessed <- Ref[IO].of[List[ReceivedEvents]](Nil)
-      t1 <- IO.realTimeInstant
       _ <- putDataToKinesis(kinesisClient, testStream1Name, testPayload)
-      t2 <- IO.realTimeInstant
-      processingConfig = new EventProcessingConfig(NoWindowing)
+      refReportedLatencies <- Ref[IO].of(Vector.empty[FiniteDuration])
+      processingConfig = EventProcessingConfig(NoWindowing, tstamp => refReportedLatencies.update(_ :+ tstamp))
       kinesisConfig = getKinesisSourceConfig(testStream1Name)
       sourceAndAck <- KinesisSource.build[IO](kinesisConfig)
       stream = sourceAndAck.stream(processingConfig, testProcessor(refProcessed))
       fiber <- stream.compile.drain.start
       _ <- IO.sleep(2.minutes)
       processed <- refProcessed.get
+      reportedLatencies <- refReportedLatencies.get
       _ <- fiber.cancel
     } yield List(
       processed must haveSize(1),
       processed.head.events must beEqualTo(List(testPayload)),
-      processed.head.tstamp must beSome { tstamp: Instant =>
-        tstamp.toEpochMilli must beBetween(t1.toEpochMilli, t2.toEpochMilli)
-      }
+      reportedLatencies.max must beBetween(1.second, 2.minutes)
     ).reduce(_ and _)
   }
 }
diff --git a/modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala b/modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala
index ae661384..df560b25 100644
--- a/modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala
+++ b/modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala
@@ -24,14 +24,13 @@ import com.snowplowanalytics.snowplow.sinks.kinesis.KinesisSinkConfig
 import java.net.URI
 import java.nio.charset.StandardCharsets
 import java.util.UUID
-import java.time.Instant
 
 import scala.concurrent.duration.DurationLong
 
 import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest
 
 object Utils {
 
-  case class ReceivedEvents(events: List[String], tstamp: Option[Instant])
+  case class ReceivedEvents(events: List[String])
 
   def putDataToKinesis(
     client: KinesisAsyncClient,
@@ -81,7 +80,7 @@ object Utils {
       .build()
 
     val out =
-      ReceivedEvents(client.getRecords(request).get().records().asScala.toList.map(record => new String(record.data.asByteArray())), None)
+      ReceivedEvents(client.getRecords(request).get().records().asScala.toList.map(record => new String(record.data.asByteArray())))
     out
   }
 
@@ -108,10 +107,10 @@ object Utils {
   )
 
   def testProcessor(ref: Ref[IO, List[ReceivedEvents]]): EventProcessor[IO] =
-    _.evalMap { case TokenedEvents(events, token, tstamp) =>
+    _.evalMap { case TokenedEvents(events, token) =>
       val parsed = events.map(byteBuffer => StandardCharsets.UTF_8.decode(byteBuffer).toString)
       for {
-        _ <- ref.update(_ :+ ReceivedEvents(parsed.toList, tstamp))
+        _ <- ref.update(_ :+ ReceivedEvents(parsed.toList))
       } yield token
     }
diff --git a/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sources/kafka/KafkaSource.scala b/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sources/kafka/KafkaSource.scala
index dc9feef6..f80b7a00 100644
--- a/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sources/kafka/KafkaSource.scala
+++ b/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sources/kafka/KafkaSource.scala
@@ -19,8 +19,7 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger
 import scala.reflect._
 
 import java.nio.ByteBuffer
-import java.time.Instant
-import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.duration.DurationLong
 
 // kafka
 import fs2.kafka._
@@ -48,11 +47,8 @@ object KafkaSource {
       new LowLevelSource[F, KafkaCheckpoints[F]] {
         def checkpointer: Checkpointer[F, KafkaCheckpoints[F]] = kafkaCheckpointer
 
-        def stream: Stream[F, Stream[F, LowLevelEvents[KafkaCheckpoints[F]]]] =
+        def stream: Stream[F, Stream[F, Option[LowLevelEvents[KafkaCheckpoints[F]]]]] =
           kafkaStream(config, authHandlerClass)
-
-        def lastLiveness: F[FiniteDuration] =
-          Sync[F].realTime
       }
 
   case class OffsetAndCommit[F[_]](offset: Long, commit: F[Unit])
@@ -75,7 +71,7 @@ object KafkaSource {
   private def kafkaStream[F[_]: Async, T <: AzureAuthenticationCallbackHandler](
     config: KafkaSourceConfig,
     authHandlerClass: ClassTag[T]
-  ): Stream[F, Stream[F, LowLevelEvents[KafkaCheckpoints[F]]]] =
+  ): Stream[F, Stream[F, Option[LowLevelEvents[KafkaCheckpoints[F]]]]] =
     KafkaConsumer
       .stream(consumerSettings[F, T](config, authHandlerClass))
       .evalTap(_.subscribeTo(config.topicName))
@@ -89,7 +85,7 @@ object KafkaSource {
 
   private def joinPartitions[F[_]: Async](
     partitioned: PartitionedStreams[F]
-  ): Stream[F, LowLevelEvents[KafkaCheckpoints[F]]] = {
+  ): Stream[F, Option[LowLevelEvents[KafkaCheckpoints[F]]]] = {
     val streams = partitioned.toSeq.map { case (topicPartition, stream) =>
       stream.chunks
         .flatMap { chunk =>
@@ -103,8 +99,8 @@ object KafkaSource {
                 val ts = ccr.record.timestamp
                 ts.logAppendTime.orElse(ts.createTime).orElse(ts.unknownTime)
               }
-              val earliestTimestamp = if (timestamps.isEmpty) None else Some(Instant.ofEpochMilli(timestamps.min))
-              Stream.emit(LowLevelEvents(events, ack, earliestTimestamp))
+              val earliestTimestamp = if (timestamps.isEmpty) None else Some(timestamps.min.millis)
+              Stream.emit(Some(LowLevelEvents(events, ack, earliestTimestamp)))
             case None =>
               Stream.empty
           }
@@ -117,6 +113,7 @@ object KafkaSource {
     Stream
       .emits(streams)
      .parJoinUnbounded
+      .mergeHaltL(Stream.awakeDelay(10.seconds).map(_ => None).repeat) // keepalives
       .onFinalize {
         Logger[F].info(s"Stopping processing of partitions: $formatted")
       }
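
Aside on the keepalive added above: every source in this PR now emits `Option[LowLevelEvents[C]]`, where `None` is a liveness ping rather than a batch. A minimal sketch of the pattern, assuming only fs2 and cats-effect (`withKeepalives` is a hypothetical helper, not part of the PR):

    import cats.effect.IO
    import fs2.Stream
    import scala.concurrent.duration.DurationInt

    // Wrap a stream of batches so that a `None` is emitted every 10 seconds even
    // when no batches arrive. mergeHaltL halts when the left side (the real
    // events) terminates, so keepalives never outlive the source.
    def withKeepalives[A](events: Stream[IO, A]): Stream[IO, Option[A]] =
      events.map(Some(_)).mergeHaltL(Stream.awakeDelay[IO](10.seconds).as(None))

Without these pings a quiet stream would be indistinguishable from a stuck one, and the `SourceAndAck` health check (see further down) would report `InactiveSource`.
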
diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala
index 1b016148..9f87ddf4 100644
--- a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala
+++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala
@@ -7,7 +7,7 @@
  */
 package com.snowplowanalytics.snowplow.sources.kinesis
 
-import cats.effect.{Async, Ref, Sync}
+import cats.effect.{Async, Sync}
 import cats.data.NonEmptyList
 import cats.implicits._
 import com.snowplowanalytics.snowplow.sources.SourceAndAck
@@ -19,7 +19,7 @@ import software.amazon.kinesis.lifecycle.events.{ProcessRecordsInput, ShardEndedInput}
 import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber
 
 import java.util.concurrent.{CountDownLatch, SynchronousQueue}
-import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.duration.DurationLong
 import scala.jdk.CollectionConverters._
 
 object KinesisSource {
@@ -27,18 +27,13 @@ object KinesisSource {
   private implicit def logger[F[_]: Sync]: SelfAwareStructuredLogger[F] = Slf4jLogger.getLogger[F]
 
   def build[F[_]: Async](config: KinesisSourceConfig): F[SourceAndAck[F]] =
-    Ref.ofEffect(Sync[F].realTime).flatMap { liveness =>
-      LowLevelSource.toSourceAndAck {
-        new LowLevelSource[F, Map[String, Checkpointable]] {
-          def stream: Stream[F, Stream[F, LowLevelEvents[Map[String, Checkpointable]]]] =
-            kinesisStream(config, liveness)
+    LowLevelSource.toSourceAndAck {
+      new LowLevelSource[F, Map[String, Checkpointable]] {
+        def stream: Stream[F, Stream[F, Option[LowLevelEvents[Map[String, Checkpointable]]]]] =
+          kinesisStream(config)
 
-          def checkpointer: KinesisCheckpointer[F] =
-            new KinesisCheckpointer[F](config.checkpointThrottledBackoffPolicy)
-
-          def lastLiveness: F[FiniteDuration] =
-            liveness.get
-        }
+        def checkpointer: KinesisCheckpointer[F] =
+          new KinesisCheckpointer[F](config.checkpointThrottledBackoffPolicy)
       }
     }
 
@@ -46,26 +41,24 @@ object KinesisSource {
   private val synchronousQueueFairness: Boolean = true
 
   private def kinesisStream[F[_]: Async](
-    config: KinesisSourceConfig,
-    liveness: Ref[F, FiniteDuration]
-  ): Stream[F, Stream[F, LowLevelEvents[Map[String, Checkpointable]]]] = {
+    config: KinesisSourceConfig
+  ): Stream[F, Stream[F, Option[LowLevelEvents[Map[String, Checkpointable]]]]] = {
     val actionQueue = new SynchronousQueue[KCLAction](synchronousQueueFairness)
     for {
       _ <- Stream.resource(KCLScheduler.populateQueue[F](config, actionQueue))
-      events <- Stream.emit(pullFromQueueAndEmit(actionQueue, liveness).stream).repeat
+      events <- Stream.emit(pullFromQueueAndEmit(actionQueue).stream).repeat
     } yield events
   }
 
   private def pullFromQueueAndEmit[F[_]: Sync](
-    queue: SynchronousQueue[KCLAction],
-    liveness: Ref[F, FiniteDuration]
-  ): Pull[F, LowLevelEvents[Map[String, Checkpointable]], Unit] =
-    Pull.eval(pullFromQueue(queue, liveness)).flatMap { case PullFromQueueResult(actions, hasShardEnd) =>
+    queue: SynchronousQueue[KCLAction]
+  ): Pull[F, Option[LowLevelEvents[Map[String, Checkpointable]]], Unit] =
+    Pull.eval(pullFromQueue(queue)).flatMap { case PullFromQueueResult(actions, hasShardEnd) =>
       val toEmit = actions.traverse {
         case KCLAction.ProcessRecords(_, processRecordsInput) if processRecordsInput.records.asScala.isEmpty =>
-          Pull.done
+          Pull.output1(None)
         case KCLAction.ProcessRecords(shardId, processRecordsInput) =>
-          Pull.output1(provideNextChunk(shardId, processRecordsInput)).covary[F]
+          Pull.output1(Some(provideNextChunk(shardId, processRecordsInput))).covary[F]
         case KCLAction.ShardEnd(shardId, await, shardEndedInput) =>
           handleShardEnd[F](shardId, await, shardEndedInput)
         case KCLAction.KCLError(t) =>
@@ -81,14 +74,13 @@ object KinesisSource {
       }
       Pull.eval(log).covaryOutput *> toEmit *> Pull.done
     } else
-      toEmit *> pullFromQueueAndEmit(queue, liveness)
+      toEmit *> pullFromQueueAndEmit(queue)
   }
 
   private case class PullFromQueueResult(actions: NonEmptyList[KCLAction], hasShardEnd: Boolean)
 
-  private def pullFromQueue[F[_]: Sync](queue: SynchronousQueue[KCLAction], liveness: Ref[F, FiniteDuration]): F[PullFromQueueResult] =
+  private def pullFromQueue[F[_]: Sync](queue: SynchronousQueue[KCLAction]): F[PullFromQueueResult] =
     resolveNextAction(queue)
-      .productL(updateLiveness(liveness))
       .flatMap {
         case shardEnd: KCLAction.ShardEnd =>
          // If we reached the end of one shard, it is likely we reached the end of other shards too.
@@ -115,9 +107,6 @@ object KinesisSource {
       _ <- Sync[F].delay(queue.drainTo(ret))
     } yield ret.asScala.toList
 
-  private def updateLiveness[F[_]: Sync](liveness: Ref[F, FiniteDuration]): F[Unit] =
-    Sync[F].realTime.flatMap(now => liveness.set(now))
-
   private def provideNextChunk(shardId: String, input: ProcessRecordsInput) = {
     val chunk = Chunk.javaList(input.records()).map(_.data())
     val lastRecord = input.records.asScala.last // last is safe because we handled the empty case above
@@ -126,17 +115,21 @@ object KinesisSource {
       new ExtendedSequenceNumber(lastRecord.sequenceNumber, lastRecord.subSequenceNumber),
       input.checkpointer
     )
-    LowLevelEvents(chunk, Map[String, Checkpointable](shardId -> checkpointable), Some(firstRecord.approximateArrivalTimestamp))
+    LowLevelEvents(
+      chunk,
+      Map[String, Checkpointable](shardId -> checkpointable),
+      Some(firstRecord.approximateArrivalTimestamp.toEpochMilli.millis)
+    )
   }
 
   private def handleShardEnd[F[_]](
     shardId: String,
     await: CountDownLatch,
     shardEndedInput: ShardEndedInput
-  ): Pull[F, LowLevelEvents[Map[String, Checkpointable]], Unit] = {
+  ): Pull[F, Option[LowLevelEvents[Map[String, Checkpointable]]], Unit] = {
     val checkpointable = Checkpointable.ShardEnd(shardEndedInput.checkpointer, await)
     val last = LowLevelEvents(Chunk.empty, Map[String, Checkpointable](shardId -> checkpointable), None)
-    Pull.output1(last)
+    Pull.output1(Some(last))
   }
 }
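
The Kinesis change repurposes empty `ProcessRecords` polls: previously swallowed with `Pull.done`, they now surface as liveness pings. A condensed sketch of that branch logic (the `emitOrPing` helper is illustrative only):

    import cats.effect.IO
    import fs2.Pull

    // An empty poll is no longer dropped: it becomes a `None` ping, letting the
    // health check distinguish "no traffic" from "stuck consumer".
    def emitOrPing[A](batch: List[A]): Pull[IO, Option[List[A]], Unit] =
      if (batch.isEmpty) Pull.output1(None) else Pull.output1(Some(batch))
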
diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala
index 743b891a..5d2101b4 100644
--- a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala
+++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala
@@ -14,8 +14,6 @@ import fs2.{Chunk, Stream}
 import org.typelevel.log4cats.Logger
 import org.typelevel.log4cats.slf4j.Slf4jLogger
 
-import java.time.Instant
-
 // pubsub
 import com.google.api.gax.core.{ExecutorProvider, FixedExecutorProvider}
 import com.google.api.gax.grpc.ChannelPoolSettings
@@ -31,7 +29,7 @@ import com.snowplowanalytics.snowplow.sources.SourceAndAck
 import com.snowplowanalytics.snowplow.sources.internal.{Checkpointer, LowLevelEvents, LowLevelSource}
 import com.snowplowanalytics.snowplow.sources.pubsub.PubsubRetryOps.implicits._
 
-import scala.concurrent.duration.{Duration, FiniteDuration}
+import scala.concurrent.duration.{Duration, DurationLong, FiniteDuration}
 import scala.jdk.CollectionConverters._
 
 import java.util.concurrent.{ExecutorService, Executors}
@@ -64,17 +62,14 @@ object PubsubSource {
       new LowLevelSource[F, Vector[Unique.Token]] {
         def checkpointer: Checkpointer[F, Vector[Unique.Token]] = new PubsubCheckpointer(config.subscription, deferredResources)
 
-        def stream: Stream[F, Stream[F, LowLevelEvents[Vector[Unique.Token]]]] =
+        def stream: Stream[F, Stream[F, Option[LowLevelEvents[Vector[Unique.Token]]]]] =
           pubsubStream(config, deferredResources)
-
-        def lastLiveness: F[FiniteDuration] =
-          Sync[F].realTime
       }
 
   private def pubsubStream[F[_]: Async](
     config: PubsubSourceConfig,
     deferredResources: Deferred[F, PubsubCheckpointer.Resources[F]]
-  ): Stream[F, Stream[F, LowLevelEvents[Vector[Unique.Token]]]] =
+  ): Stream[F, Stream[F, Option[LowLevelEvents[Vector[Unique.Token]]]]] =
     for {
       parallelPullCount <- Stream.eval(Sync[F].delay(chooseNumParallelPulls(config)))
       stub <- Stream.resource(stubResource(config))
@@ -83,7 +78,6 @@ object PubsubSource {
     } yield Stream
       .fixedRateStartImmediately(config.debounceRequests, dampen = true)
       .parEvalMapUnordered(parallelPullCount)(_ => pullAndManageState(config, stub, refStates))
-      .unNone
       .prefetchN(parallelPullCount)
       .concurrently(extendDeadlines(config, stub, refStates))
       .onFinalize(nackRefStatesForShutdown(config, stub, refStates))
@@ -124,10 +118,10 @@ object PubsubSource {
     }
   }
 
-  private def earliestTimestampOfRecords(records: Vector[ReceivedMessage]): Instant = {
+  private def earliestTimestampOfRecords(records: Vector[ReceivedMessage]): FiniteDuration = {
     val (tstampSeconds, tstampNanos) =
       records.map(r => (r.getMessage.getPublishTime.getSeconds, r.getMessage.getPublishTime.getNanos)).min
-    Instant.ofEpochSecond(tstampSeconds, tstampNanos.toLong)
+    tstampSeconds.seconds + tstampNanos.toLong.nanos
   }
 
   /**
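
The `earliestTimestampOfRecords` change shows the representation swap made throughout this PR: wall-clock points are now `FiniteDuration`s since the unix epoch instead of `java.time.Instant`s, so latency becomes plain duration arithmetic. For reference, a sketch of the round-trip between the two representations (not part of the PR):

    import java.time.Instant
    import scala.concurrent.duration.{DurationLong, FiniteDuration}

    // Instant -> duration since epoch, keeping sub-millisecond precision
    def toEpochDuration(i: Instant): FiniteDuration =
      i.getEpochSecond.seconds + i.getNano.toLong.nanos

    // Duration since epoch -> Instant (millisecond precision suffices here)
    def toInstant(d: FiniteDuration): Instant =
      Instant.ofEpochMilli(d.toMillis)
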
diff --git a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/Metrics.scala b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/Metrics.scala
index ea88db9c..90951e62 100644
--- a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/Metrics.scala
+++ b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/Metrics.scala
@@ -24,13 +24,14 @@ import java.nio.charset.StandardCharsets.UTF_8
 
 abstract class Metrics[F[_]: Async, S <: Metrics.State](
   ref: Ref[F, S],
-  emptyState: S,
+  initState: F[S],
   config: Option[Metrics.StatsdConfig]
 ) {
   def report: Stream[F, Nothing] =
     Stream.resource(Metrics.makeReporters[F](config)).flatMap { reporters =>
       def report = for {
-        state <- ref.getAndSet(emptyState)
+        nextState <- initState
+        state <- ref.getAndSet(nextState)
         kv = state.toKVMetrics
         _ <- reporters.traverse(_.report(kv))
       } yield ()
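
Making the initial state an effect (`initState: F[S]` instead of `emptyState: S`) lets a subclass recompute the baseline at the start of every reporting period. A sketch of the intended use, with hypothetical names (`AppState`, `AppMetrics`) and assuming `Metrics.State` exposes `toKVMetrics` as the report loop above suggests:

    import cats.effect.{Async, Ref}
    import cats.implicits._
    import scala.concurrent.duration.{Duration, FiniteDuration}

    case class AppState(good: Int, latency: FiniteDuration) extends Metrics.State {
      def toKVMetrics: List[Metrics.KVMetric] = Nil // elided for the sketch
    }

    class AppMetrics[F[_]: Async](
      ref: Ref[F, AppState],
      sourceAndAck: SourceAndAck[F],
      config: Option[Metrics.StatsdConfig]
    ) extends Metrics[F, AppState](
          ref,
          // Seed each period with the latency of the message currently stuck in
          // processing, so a wedged app still reports non-zero latency
          sourceAndAck.currentStreamLatency.map(l => AppState(0, l.getOrElse(Duration.Zero))),
          config
        )
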
diff --git a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/EventProcessingConfig.scala b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/EventProcessingConfig.scala
index 29f24679..758fd5e2 100644
--- a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/EventProcessingConfig.scala
+++ b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/EventProcessingConfig.scala
@@ -25,8 +25,11 @@ import scala.concurrent.duration.FiniteDuration
  * Whether to open a new [[EventProcessor]] to handle a timed window of events (e.g. for the
  * transformer) or whether to feed events to a single [[EventProcessor]] in a continuous endless
  * stream (e.g. Enrich)
+ *
+ * @param latencyConsumer
+ *   common-streams apps should use the latencyConsumer to set the latency metric.
  */
-case class EventProcessingConfig(windowing: EventProcessingConfig.Windowing)
+case class EventProcessingConfig[F[_]](windowing: EventProcessingConfig.Windowing, latencyConsumer: FiniteDuration => F[Unit])
 
 object EventProcessingConfig {
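
Constructing the new config from an application looks like this (mirroring how the tests in this PR wire the `latencyConsumer` into a `Ref`; a real app would set a statsd gauge instead):

    import cats.effect.{IO, Ref}
    import scala.concurrent.duration.FiniteDuration

    def mkConfig(latencies: Ref[IO, Vector[FiniteDuration]]): EventProcessingConfig[IO] =
      EventProcessingConfig(
        EventProcessingConfig.NoWindowing,
        latency => latencies.update(_ :+ latency) // invoked by the source on every emit/ping
      )

Note the new type parameter: the consumer runs an effect in `F`, so the config becomes `EventProcessingConfig[F]`.
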
diff --git a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/SourceAndAck.scala b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/SourceAndAck.scala
index 384daf76..d1cf5caf 100644
--- a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/SourceAndAck.scala
+++ b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/SourceAndAck.scala
@@ -10,6 +10,7 @@ package com.snowplowanalytics.snowplow.sources
 import cats.Show
 import cats.implicits._
 import fs2.Stream
+
 import scala.concurrent.duration.FiniteDuration
 
 /**
@@ -32,7 +33,7 @@ trait SourceAndAck[F[_]] {
    * @return
    *   A stream which should be compiled and drained
    */
-  def stream(config: EventProcessingConfig, processor: EventProcessor[F]): Stream[F, Nothing]
+  def stream(config: EventProcessingConfig[F], processor: EventProcessor[F]): Stream[F, Nothing]
 
   /**
    * Reports on whether the source of events is healthy
@@ -48,6 +49,22 @@ trait SourceAndAck[F[_]] {
    *   healthy. If any event is "stuck" then latency is high and the probe should report unhealthy.
    */
   def isHealthy(maxAllowedProcessingLatency: FiniteDuration): F[SourceAndAck.HealthStatus]
+
+  /**
+   * Latency of the message that is currently being processed by the downstream application
+   *
+   * The returned value is `None` if this `SourceAndAck` is currently awaiting messages from
+   * upstream, e.g. it is doing a remote fetch.
+   *
+   * The returned value is `Some` if this `SourceAndAck` has emitted a message downstream to the
+   * application, and it is waiting for the downstream app to "pull" the next message from this
+   * `SourceAndAck`.
+   *
+   * This value should be used as the initial latency at the start of a statsd metrics reporting
+   * period. This ensures the app reports non-zero latency even when the app is stuck (e.g. cannot
+   * load events to destination).
+   */
+  def currentStreamLatency: F[Option[FiniteDuration]]
 }
 
 object SourceAndAck {
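
A short sketch of consuming the new method, per the scaladoc above: at the start of each reporting period the app reads `currentStreamLatency` and falls back to zero when the source is idle (`initialLatency` is an illustrative helper, not part of the PR):

    import cats.Functor
    import cats.implicits._
    import scala.concurrent.duration.{Duration, FiniteDuration}

    def initialLatency[F[_]: Functor](source: SourceAndAck[F]): F[FiniteDuration] =
      source.currentStreamLatency.map(_.getOrElse(Duration.Zero))
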
diff --git a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/TokenedEvents.scala b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/TokenedEvents.scala
index da1f0ae4..d4d9aaa6 100644
--- a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/TokenedEvents.scala
+++ b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/TokenedEvents.scala
@@ -11,7 +11,6 @@ import cats.effect.kernel.Unique
 import fs2.Chunk
 
 import java.nio.ByteBuffer
-import java.time.Instant
 
 /**
  * The events as they are fed into a [[EventProcessor]]
@@ -22,12 +21,8 @@ import java.time.Instant
  *   The [[EventProcessor]] must emit this token after it has fully processed the batch of events.
  *   When the [[EventProcessor]] emits the token, it is an instruction to the [[SourceAndAck]] to
  *   ack/checkpoint the events.
- * @param earliestSourceTstamp
- *   The timestamp that an event was originally written to the source stream. Used for calculating
- *   the latency metric.
  */
 case class TokenedEvents(
   events: Chunk[ByteBuffer],
-  ack: Unique.Token,
-  earliestSourceTstamp: Option[Instant]
+  ack: Unique.Token
 )
diff --git a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelEvents.scala b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelEvents.scala
index 6353fd88..f791e745 100644
--- a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelEvents.scala
+++ b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelEvents.scala
@@ -10,16 +10,19 @@ package com.snowplowanalytics.snowplow.sources.internal
 import fs2.Chunk
 
 import java.nio.ByteBuffer
-import java.time.Instant
+import scala.concurrent.duration.FiniteDuration
 
 /**
  * The events and checkpointable item emitted by a LowLevelSource
  *
  * This library uses LowLevelEvents internally, but it is never exposed to the high level event
  * processor
+ *
+ * @param earliestSourceTstamp
+ *   A point in time, represented as a `FiniteDuration` since epoch
  */
 case class LowLevelEvents[C](
   events: Chunk[ByteBuffer],
   ack: C,
-  earliestSourceTstamp: Option[Instant]
+  earliestSourceTstamp: Option[FiniteDuration]
 )
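
With timestamps as durations since the epoch, computing latency needs no `java.time` at all: it is a subtraction of two `FiniteDuration`s. This is exactly what `monitorLatency` does in the next file; condensed (`latencyOf` is an illustrative helper):

    import cats.effect.Sync
    import cats.implicits._
    import scala.concurrent.duration.{Duration, FiniteDuration}

    def latencyOf[F[_]: Sync, C](events: LowLevelEvents[C]): F[FiniteDuration] =
      Sync[F].realTime.map { now =>
        events.earliestSourceTstamp.fold(Duration.Zero)(now - _)
      }
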
diff --git a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala
index 3ac01bb6..7daa059e 100644
--- a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala
+++ b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala
@@ -17,7 +17,7 @@ import fs2.{Pipe, Pull, Stream}
 import org.typelevel.log4cats.{Logger, SelfAwareStructuredLogger}
 import org.typelevel.log4cats.slf4j.Slf4jLogger
 
-import scala.concurrent.duration.{DurationLong, FiniteDuration}
+import scala.concurrent.duration.{Duration, DurationLong, FiniteDuration}
 
 import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, SourceAndAck, TokenedEvents}
 
 /**
@@ -43,16 +43,12 @@ private[sources] trait LowLevelSource[F[_], C] {
    *   rebalancing.
    *
    * A new [[EventProcessor]] will be invoked for each inner stream
-   */
-  def stream: Stream[F, Stream[F, LowLevelEvents[C]]]
-
-  /**
-   * The last time this source was known to be alive and healthy
    *
-   * The returned value is FiniteDuration since the unix epoch, i.e. the value returned by
-   * `Sync[F].realTime`
+   * The inner stream should periodically emit `None` as a signal that it is alive and healthy, even
+   * when there are no events on the stream. Failure to emit frequently will result in the
+   * `SourceAndAck` reporting itself as unhealthy.
    */
-  def lastLiveness: F[FiniteDuration]
+  def stream: Stream[F, Stream[F, Option[LowLevelEvents[C]]]]
 }
 
 private[sources] object LowLevelSource {
@@ -65,16 +61,36 @@ private[sources] object LowLevelSource {
    * Map is keyed by Token, corresponding to a batch of `TokenedEvents`. Map values are `C`s which
    * is how to ack/checkpoint the batch.
    */
-  private type State[C] = Map[Unique.Token, C]
+  private type AcksState[C] = Map[Unique.Token, C]
 
-  /**
-   * Mutable state used for measuring the event-processing latency from this source
-   *
-   * For the batch that was last emitted downstream for processing, the `Option[FiniteDuration]`
-   * represents the real time (since epoch) that the batch was emitted. If it is None then there is
-   * no batch currently being processed.
-   */
-  private type LatencyRef[F[_]] = Ref[F, Option[FiniteDuration]]
+  private sealed trait InternalState
+
+  private object InternalState {
+
+    /**
+     * The Source is awaiting the downstream processor to pull another message from us
+     *
+     * @param since
+     *   Timestamp of when the last message was emitted downstream. A point in time represented as
+     *   FiniteDuration since the epoch.
+     * @param streamTstamp
+     *   For the last batch to be emitted downstream, this is the earliest timestamp according
+     *   to the source, i.e. time the event was written to the source stream. It is None if this
+     *   stream type does not record timestamps, e.g. Kafka under some circumstances.
+     */
+    case class AwaitingDownstream(since: FiniteDuration, streamTstamp: Option[FiniteDuration]) extends InternalState
+
+    /**
+     * The Source is awaiting the upstream remote source to provide more messages
+     *
+     * @param since
+     *   Timestamp of when the last message was received from upstream. A point in time
+     *   represented as FiniteDuration since the epoch.
+     */
+    case class AwaitingUpstream(since: FiniteDuration) extends InternalState
+
+    case object Disconnected extends InternalState
+  }
 
   /**
    * Lifts the internal [[LowLevelSource]] into a [[SourceAndAck]], which is the public API of this
   */
  def toSourceAndAck[F[_]: Async, C](source: LowLevelSource[F, C]): F[SourceAndAck[F]] =
    for {
-      latencyRef <- Ref[F].of(Option.empty[FiniteDuration])
-      isConnectedRef <- Ref[F].of(false)
-    } yield sourceAndAckImpl(source, latencyRef, isConnectedRef)
+      stateRef <- Ref[F].of[InternalState](InternalState.Disconnected)
+    } yield sourceAndAckImpl(source, stateRef)
 
   private def sourceAndAckImpl[F[_]: Async, C](
     source: LowLevelSource[F, C],
-    latencyRef: LatencyRef[F],
-    isConnectedRef: Ref[F, Boolean]
+    stateRef: Ref[F, InternalState]
   ): SourceAndAck[F] = new SourceAndAck[F] {
-    def stream(config: EventProcessingConfig, processor: EventProcessor[F]): Stream[F, Nothing] = {
+    def stream(config: EventProcessingConfig[F], processor: EventProcessor[F]): Stream[F, Nothing] = {
       val str = for {
         s2 <- source.stream
         acksRef <- Stream.bracket(Ref[F].of(Map.empty[Unique.Token, C]))(nackUnhandled(source.checkpointer, _))
-        _ <- Stream.bracket(isConnectedRef.set(true))(_ => isConnectedRef.set(false))
+        now <- Stream.eval(Sync[F].realTime)
+        _ <- Stream.bracket(stateRef.set(InternalState.AwaitingUpstream(now)))(_ => stateRef.set(InternalState.Disconnected))
       } yield {
         val tokenedSources = s2
-          .through(monitorLatency(latencyRef))
+          .through(monitorLatency(config, stateRef))
           .through(tokened(acksRef))
           .through(windowed(config.windowing))
@@ -116,18 +131,27 @@
     }
 
     def isHealthy(maxAllowedProcessingLatency: FiniteDuration): F[SourceAndAck.HealthStatus] =
-      (isConnectedRef.get, latencyRef.get, source.lastLiveness, Sync[F].realTime).mapN {
-        case (false, _, _, _) =>
+      (stateRef.get, Sync[F].realTime).mapN {
+        case (InternalState.Disconnected, _) =>
           SourceAndAck.Disconnected
-        case (_, Some(lastPullTime), _, now) if now - lastPullTime > maxAllowedProcessingLatency =>
-          SourceAndAck.LaggingEventProcessor(now - lastPullTime)
-        case (_, _, lastLiveness, now) if now - lastLiveness > maxAllowedProcessingLatency =>
-          SourceAndAck.InactiveSource(now - lastLiveness)
+        case (InternalState.AwaitingDownstream(since, _), now) if now - since > maxAllowedProcessingLatency =>
+          SourceAndAck.LaggingEventProcessor(now - since)
+        case (InternalState.AwaitingUpstream(since), now) if now - since > maxAllowedProcessingLatency =>
+          SourceAndAck.InactiveSource(now - since)
         case _ =>
           SourceAndAck.Healthy
       }
+
+    def currentStreamLatency: F[Option[FiniteDuration]] =
+      stateRef.get.flatMap {
+        case InternalState.AwaitingDownstream(_, Some(tstamp)) =>
+          Sync[F].realTime.map { now =>
+            Some(now - tstamp)
+          }
+        case _ => none.pure[F]
+      }
   }
 
-  private def nackUnhandled[F[_]: Monad, C](checkpointer: Checkpointer[F, C], ref: Ref[F, State[C]]): F[Unit] =
+  private def nackUnhandled[F[_]: Monad, C](checkpointer: Checkpointer[F, C], ref: Ref[F, AcksState[C]]): F[Unit] =
     ref.get
       .flatMap { map =>
         checkpointer.nack(checkpointer.combineAll(map.values))
@@ -138,29 +162,41 @@
    *
    * The token can later be exchanged for the original checkpointable item
    */
-  private def tokened[F[_]: Sync, C](ref: Ref[F, State[C]]): Pipe[F, LowLevelEvents[C], TokenedEvents] =
-    _.evalMap { case LowLevelEvents(events, ack, earliestSourceTstamp) =>
+  private def tokened[F[_]: Sync, C](ref: Ref[F, AcksState[C]]): Pipe[F, LowLevelEvents[C], TokenedEvents] =
+    _.evalMap { case LowLevelEvents(events, ack, _) =>
       for {
         token <- Unique[F].unique
         _ <- ref.update(_ + (token -> ack))
-      } yield TokenedEvents(events, token, earliestSourceTstamp)
+      } yield TokenedEvents(events, token)
     }
 
   /**
    * An fs2 Pipe which records what time (duration since epoch) we last emitted a batch downstream
    * for processing
    */
-  private def monitorLatency[F[_]: Sync, A](ref: Ref[F, Option[FiniteDuration]]): Pipe[F, A, A] = {
+  private def monitorLatency[F[_]: Sync, C](
+    config: EventProcessingConfig[F],
+    ref: Ref[F, InternalState]
+  ): Pipe[F, Option[LowLevelEvents[C]], LowLevelEvents[C]] = {
 
-    def go(source: Stream[F, A]): Pull[F, A, Unit] =
+    def go(source: Stream[F, Option[LowLevelEvents[C]]]): Pull[F, LowLevelEvents[C], Unit] =
       source.pull.uncons1.flatMap {
         case None =>
           Pull.done
-        case Some((pulled, source)) =>
+        case Some((Some(pulled), source)) =>
           for {
             now <- Pull.eval(Sync[F].realTime)
-            _ <- Pull.eval(ref.set(Some(now)))
+            latency = pulled.earliestSourceTstamp.fold(Duration.Zero)(now - _)
+            _ <- Pull.eval(config.latencyConsumer(latency))
+            _ <- Pull.eval(ref.set(InternalState.AwaitingDownstream(now, pulled.earliestSourceTstamp)))
             _ <- Pull.output1(pulled)
-            _ <- Pull.eval(ref.set(None))
+            _ <- Pull.eval(ref.set(InternalState.AwaitingUpstream(now)))
+            _ <- go(source)
+          } yield ()
+        case Some((None, source)) =>
+          for {
+            now <- Pull.eval(Sync[F].realTime)
+            _ <- Pull.eval(config.latencyConsumer(Duration.Zero))
+            _ <- Pull.eval(ref.set(InternalState.AwaitingUpstream(now)))
             _ <- go(source)
           } yield ()
       }
@@ -190,11 +226,11 @@
    */
   private def messageSink[F[_]: Async, C](
     processor: EventProcessor[F],
-    ref: Ref[F, State[C]],
+    ref: Ref[F, AcksState[C]],
     checkpointer: Checkpointer[F, C],
     control: EagerWindows.Control[F]
   ): Pipe[F, TokenedEvents, Nothing] =
-    _.evalTap { case TokenedEvents(events, _, _) =>
+    _.evalTap { case TokenedEvents(events, _) =>
       Logger[F].debug(s"Batch of ${events.size} events received from the source stream")
     }
       .through(processor)
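
Under the new contract even a source with nothing to say must keep emitting. The smallest well-behaved `LowLevelSource` pings `None` forever, which is exactly what the updated tests below construct:

    import cats.effect.IO
    import fs2.Stream
    import scala.concurrent.duration.DurationInt

    // Emits no events, but pings `None` every second so the SourceAndAck never
    // reports InactiveSource
    val idleSource: LowLevelSource[IO, Unit] = new LowLevelSource[IO, Unit] {
      def checkpointer: Checkpointer[IO, Unit] =
        Checkpointer.acksOnly[IO, Unit](_ => IO.unit)
      def stream: Stream[IO, Stream[IO, Option[LowLevelEvents[Unit]]]] =
        Stream.emit(Stream.awakeDelay[IO](1.second).map(_ => None))
    }
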
diff --git a/modules/streams-core/src/test/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSourceSpec.scala b/modules/streams-core/src/test/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSourceSpec.scala
index a7dfece8..00f3ac89 100644
--- a/modules/streams-core/src/test/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSourceSpec.scala
+++ b/modules/streams-core/src/test/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSourceSpec.scala
@@ -16,9 +16,10 @@ import cats.effect.testing.specs2.CatsEffect
 import fs2.{Chunk, Stream}
 import org.specs2.Specification
 
-import scala.concurrent.duration.{DurationInt, FiniteDuration}
-
+import scala.concurrent.duration.{Duration, DurationLong, FiniteDuration}
 import java.nio.charset.StandardCharsets
+import java.time.Instant
+
 import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, SourceAndAck, TokenedEvents}
 
 import java.nio.ByteBuffer
@@ -42,17 +43,28 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
       use a short first window according to the configuration $windowed5
 
     When reporting healthy status
-      report healthy when there are no events $health1
-      report lagging if there are unprocessed events $health2
-      report healthy if events are processed but not yet acked (e.g. a batch-oriented loader) $health3
-      report healthy after all events have been processed and acked $health4
-      report disconnected while source is in between two active streams of events (e.g. during kafka rebalance) $health5
-      report unhealthy if the underlying low level source is lagging $health6
+      report healthy when there are no events but the source emits periodic liveness pings $health1
+      report unhealthy when there are no events and no liveness pings $health2
+      report lagging if there are unprocessed events $health3
+      report healthy if events are processed but not yet acked (e.g. a batch-oriented loader) $health4
+      report healthy after all events have been processed and acked $health5
+      report disconnected while source is in between two active streams of events (e.g. during kafka rebalance) $health6
+      report unhealthy if the underlying low level source is lagging $health7
+
+    When reporting currentStreamLatency
+      report no timestamp when there are no events $latency1
+      report a timestamp if there are unprocessed events $latency2
+      report no timestamp after events are processed $latency3
+
+    When pushing latency metrics
+      report no metric when there are no events and no liveness pings $latencyMetric1
+      report zero latency when there are no events, but regular liveness pings $latencyMetric2
+      report non-zero latency when the source emits timestamped events $latencyMetric3
  """

  def e1 = {

-    val config = EventProcessingConfig(EventProcessingConfig.NoWindowing)
+    val config = EventProcessingConfig(EventProcessingConfig.NoWindowing, _ => IO.unit)

    val testConfig = TestSourceConfig(
      batchesPerRebalance = 5,
@@ -109,7 +121,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect {

  def e2 = {

-    val config = EventProcessingConfig(EventProcessingConfig.NoWindowing)
+    val config = EventProcessingConfig(EventProcessingConfig.NoWindowing, _ => IO.unit)

    val testConfig = TestSourceConfig(
      batchesPerRebalance = 100,
@@ -144,7 +156,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect {

  def e3 = {

-    val config = EventProcessingConfig(EventProcessingConfig.NoWindowing)
+    val config = EventProcessingConfig(EventProcessingConfig.NoWindowing, _ => IO.unit)

    val testConfig = TestSourceConfig(
      batchesPerRebalance = 5,
@@ -195,7 +207,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect {

  def windowed1 = {

-    val config = EventProcessingConfig(EventProcessingConfig.TimedWindows(45.seconds, 1.0, 2))
+    val config = EventProcessingConfig(EventProcessingConfig.TimedWindows(45.seconds, 1.0, 2), _ => IO.unit)

    val testConfig = TestSourceConfig(
      batchesPerRebalance = 5,
@@ -303,7 +315,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect {

  def windowed2 = {

-    val config = EventProcessingConfig(EventProcessingConfig.TimedWindows(45.seconds, 1.0, 2))
+    val config = EventProcessingConfig(EventProcessingConfig.TimedWindows(45.seconds, 1.0, 2), _ => IO.unit)

    val testConfig = TestSourceConfig(
      batchesPerRebalance = Int.MaxValue,
@@ -336,7 +348,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect {

  def windowed3 = {

-    val config = EventProcessingConfig(EventProcessingConfig.TimedWindows(10.minutes, 1.0, 2))
+    val config = EventProcessingConfig(EventProcessingConfig.TimedWindows(10.minutes, 1.0, 2), _ => IO.unit)

    val testConfig = TestSourceConfig(
      batchesPerRebalance = 5,
@@ -381,7 +393,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect {

  def windowed4 = {

-    val config = EventProcessingConfig(EventProcessingConfig.TimedWindows(10.seconds, 1.0, numEagerWindows = 4))
+    val config = EventProcessingConfig(EventProcessingConfig.TimedWindows(10.seconds, 1.0, numEagerWindows = 4), _ => IO.unit)

    val testConfig = TestSourceConfig(
      batchesPerRebalance = Int.MaxValue,
@@ -448,7 +460,8 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
        duration = 60.seconds,
        firstWindowScaling = 0.25, // so first window is 15 seconds
        numEagerWindows = 2
-      )
+      ),
+      _ => IO.unit
    )

    val testConfig = TestSourceConfig(
@@ -496,13 +509,12 @@
 
   def health1 = {
 
-    val config = EventProcessingConfig(EventProcessingConfig.NoWindowing)
+    val config = EventProcessingConfig(EventProcessingConfig.NoWindowing, _ => IO.unit)
 
-    // A source that emits nothing
+    // A source that emits periodic liveness pings
     val lowLevelSource = new LowLevelSource[IO, Unit] {
-      def checkpointer: Checkpointer[IO, Unit] = Checkpointer.acksOnly[IO, Unit](_ => IO.unit)
-      def stream: Stream[IO, Stream[IO, LowLevelEvents[Unit]]] = Stream.emit(Stream.never[IO])
-      def lastLiveness: IO[FiniteDuration] = IO.realTime
+      def checkpointer: Checkpointer[IO, Unit]                          = Checkpointer.acksOnly[IO, Unit](_ => IO.unit)
+      def stream: Stream[IO, Stream[IO, Option[LowLevelEvents[Unit]]]] = Stream.emit(Stream.awakeDelay[IO](1.second).map(_ => None))
     }
 
     val io = for {
@@ -511,7 +523,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
       processor = testProcessor(refActions, TestSourceConfig(1, 1, 1.second, 1.second))
       fiber <- sourceAndAck.stream(config, processor).compile.drain.start
       _ <- IO.sleep(1.hour)
-      health <- sourceAndAck.isHealthy(1.nanosecond)
+      health <- sourceAndAck.isHealthy(10.seconds)
       _ <- fiber.cancel
     } yield health must beEqualTo(SourceAndAck.Healthy)
@@ -520,7 +532,30 @@
 
   def health2 = {
 
-    val config = EventProcessingConfig(EventProcessingConfig.NoWindowing)
+    val config = EventProcessingConfig(EventProcessingConfig.NoWindowing, _ => IO.unit)
+
+    // A source that emits nothing
+    val lowLevelSource = new LowLevelSource[IO, Unit] {
+      def checkpointer: Checkpointer[IO, Unit] = Checkpointer.acksOnly[IO, Unit](_ => IO.unit)
+      def stream: Stream[IO, Stream[IO, Option[LowLevelEvents[Unit]]]] = Stream.emit(Stream.never[IO])
+    }
+
+    val io = for {
+      refActions <- Ref[IO].of(Vector.empty[Action])
+      sourceAndAck <- LowLevelSource.toSourceAndAck(lowLevelSource)
+      processor = testProcessor(refActions, TestSourceConfig(1, 1, 1.second, 1.second))
+      fiber <- sourceAndAck.stream(config, processor).compile.drain.start
+      _ <- IO.sleep(1.hour)
+      health <- sourceAndAck.isHealthy(1.nanosecond)
+      _ <- fiber.cancel
+    } yield health must beEqualTo(SourceAndAck.InactiveSource(1.hour))
+
+    TestControl.executeEmbed(io)
+  }
+
+  def health3 = {
+
+    val config = EventProcessingConfig(EventProcessingConfig.NoWindowing, _ => IO.unit)
 
     val testConfig = TestSourceConfig(
       batchesPerRebalance = Int.MaxValue,
@@ -542,9 +577,9 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
     TestControl.executeEmbed(io)
   }
 
-  def health3 = {
+  def health4 = {
 
-    val config = EventProcessingConfig(EventProcessingConfig.TimedWindows(1.hour, 1.0, 2))
+    val config = EventProcessingConfig(EventProcessingConfig.TimedWindows(1.hour, 1.0, 2), _ => IO.unit)
 
     val testConfig = TestSourceConfig(
       batchesPerRebalance = Int.MaxValue,
@@ -566,9 +601,9 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
     TestControl.executeEmbed(io)
   }
 
-  def health4 = {
+  def health5 = {
 
-    val config = EventProcessingConfig(EventProcessingConfig.NoWindowing)
+    val config = EventProcessingConfig(EventProcessingConfig.NoWindowing, _ => IO.unit)
 
     val testConfig = TestSourceConfig(
       batchesPerRebalance = 1,
@@ -583,25 +618,24 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
       processor = testProcessor(refActions, testConfig)
       fiber <- sourceAndAck.stream(config, processor).compile.drain.start
       _ <- IO.sleep(5.minutes)
-      health <- sourceAndAck.isHealthy(1.microsecond)
+      health <- sourceAndAck.isHealthy(2.seconds)
       _ <- fiber.cancel
     } yield health must beEqualTo(SourceAndAck.Healthy)
 
     TestControl.executeEmbed(io)
   }
 
-  def health5 = {
+  def health6 = {
 
-    val config = EventProcessingConfig(EventProcessingConfig.NoWindowing)
+    val config = EventProcessingConfig(EventProcessingConfig.NoWindowing, _ => IO.unit)
 
     // A source that emits one batch per inner stream, with a 5 minute "rebalancing" in between
     val lowLevelSource = new LowLevelSource[IO, Unit] {
       def checkpointer: Checkpointer[IO, Unit] = Checkpointer.acksOnly[IO, Unit](_ => IO.unit)
-      def stream: Stream[IO, Stream[IO, LowLevelEvents[Unit]]] =
+      def stream: Stream[IO, Stream[IO, Option[LowLevelEvents[Unit]]]] =
         Stream.fixedDelay[IO](5.minutes).map { _ =>
-          Stream.emit(LowLevelEvents(Chunk.empty, (), None))
+          Stream.emit(Some(LowLevelEvents(Chunk.empty, (), None)))
         }
-      def lastLiveness: IO[FiniteDuration] = IO.realTime
     }
 
     val io = for {
@@ -617,14 +651,13 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
     TestControl.executeEmbed(io)
   }
 
-  def health6 = {
+  def health7 = {
 
-    val config = EventProcessingConfig(EventProcessingConfig.NoWindowing)
+    val config = EventProcessingConfig(EventProcessingConfig.NoWindowing, _ => IO.unit)
 
     val lowLevelSource = new LowLevelSource[IO, Unit] {
-      def checkpointer: Checkpointer[IO, Unit] = Checkpointer.acksOnly[IO, Unit](_ => IO.unit)
-      def stream: Stream[IO, Stream[IO, LowLevelEvents[Unit]]] = Stream.emit(Stream.never[IO])
-      def lastLiveness: IO[FiniteDuration] = IO.realTime.map(_ - 10.seconds)
+      def checkpointer: Checkpointer[IO, Unit]                          = Checkpointer.acksOnly[IO, Unit](_ => IO.unit)
+      def stream: Stream[IO, Stream[IO, Option[LowLevelEvents[Unit]]]] = Stream.emit(Stream.never[IO])
     }
 
     val io = for {
@@ -635,10 +668,162 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
       _ <- IO.sleep(5.minutes)
       health <- sourceAndAck.isHealthy(5.seconds)
       _ <- fiber.cancel
-    } yield health must beEqualTo(SourceAndAck.InactiveSource(10.seconds))
+    } yield health must beEqualTo(SourceAndAck.InactiveSource(5.minutes))
+
+    TestControl.executeEmbed(io)
+  }
+
+  /** Specs for currentStreamLatency */
+
+  def latency1 = {
+
+    val config = EventProcessingConfig(EventProcessingConfig.NoWindowing, _ => IO.unit)
+
+    // A source that emits nothing
+    val lowLevelSource = new LowLevelSource[IO, Unit] {
+      def checkpointer: Checkpointer[IO, Unit] = Checkpointer.acksOnly[IO, Unit](_ => IO.unit)
+      def stream: Stream[IO, Stream[IO, Option[LowLevelEvents[Unit]]]] = Stream.emit(Stream.never[IO])
+    }
+
+    val io = for {
+      refActions <- Ref[IO].of(Vector.empty[Action])
+      sourceAndAck <- LowLevelSource.toSourceAndAck(lowLevelSource)
+      processor = testProcessor(refActions, TestSourceConfig(1, 1, 1.second, 1.second))
+      fiber <- sourceAndAck.stream(config, processor).compile.drain.start
+      _ <- IO.sleep(1.hour)
+      reportedLatency <- sourceAndAck.currentStreamLatency
+      _ <- fiber.cancel
+    } yield reportedLatency must beNone
+
+    TestControl.executeEmbed(io)
+  }
+
+  def latency2 = {
+
+    val config = EventProcessingConfig(EventProcessingConfig.NoWindowing, _ => IO.unit)
+
+    val streamTstamp = Instant.parse("2024-01-02T03:04:05.123Z")
+    val testConfig = TestSourceConfig(
+      batchesPerRebalance = Int.MaxValue,
+      eventsPerBatch = 2,
+      timeBetweenBatches = 0.second,
+      timeToProcessBatch = 1.hour, // Processor is very slow to sink the events
+      streamTstamp = streamTstamp
+    )
+
+    val io = for {
+      _ <- IO.sleep(streamTstamp.toEpochMilli.millis + 2.minutes)
+      refActions <- Ref[IO].of(Vector.empty[Action])
+      sourceAndAck <- LowLevelSource.toSourceAndAck(testLowLevelSource(refActions, testConfig))
+      processor = testProcessor(refActions, testConfig)
+      fiber <- sourceAndAck.stream(config, processor).compile.drain.start
+      _ <- IO.sleep(5.minutes)
+      reportedLatency <- sourceAndAck.currentStreamLatency
+      _ <- fiber.cancel
+    } yield reportedLatency must beSome(7.minutes)
 
     TestControl.executeEmbed(io)
   }
+
+  def latency3 = {
+
+    val config = EventProcessingConfig(EventProcessingConfig.TimedWindows(1.hour, 1.0, 2), _ => IO.unit)
+
+    val testConfig = TestSourceConfig(
+      batchesPerRebalance = Int.MaxValue,
+      eventsPerBatch = 2,
+      timeBetweenBatches = 1.second,
+      timeToProcessBatch = 1.second
+    )
+
+    val io = for {
+      refActions <- Ref[IO].of(Vector.empty[Action])
+      sourceAndAck <- LowLevelSource.toSourceAndAck(testLowLevelSource(refActions, testConfig))
+      processor = windowedProcessor(refActions, testConfig)
+      fiber <- sourceAndAck.stream(config, processor).compile.drain.start
+      _ <- IO.sleep(30.minutes)
+      reportedLatency <- sourceAndAck.currentStreamLatency
+      _ <- fiber.cancel
+    } yield reportedLatency must beNone
+
+    TestControl.executeEmbed(io)
+  }
+
+  /** Specs for latency metric */
+
+  def latencyMetric1 = {
+
+    // A source that emits nothing
+    val lowLevelSource = new LowLevelSource[IO, Unit] {
+      def checkpointer: Checkpointer[IO, Unit] = Checkpointer.acksOnly[IO, Unit](_ => IO.unit)
+      def stream: Stream[IO, Stream[IO, Option[LowLevelEvents[Unit]]]] = Stream.emit(Stream.never[IO])
+    }
+
+    val io = for {
+      refActions <- Ref[IO].of(Vector.empty[Action])
+      sourceAndAck <- LowLevelSource.toSourceAndAck(lowLevelSource)
+      processor = testProcessor(refActions, TestSourceConfig(1, 1, 1.second, 1.second))
+      refLatencies <- Ref[IO].of(Vector.empty[FiniteDuration])
+      config = EventProcessingConfig(EventProcessingConfig.NoWindowing, metric => refLatencies.update(_ :+ metric))
+      fiber <- sourceAndAck.stream(config, processor).compile.drain.start
+      _ <- IO.sleep(1.hour)
+      latencyMetrics <- refLatencies.get
+      _ <- fiber.cancel
+    } yield latencyMetrics must beEmpty
+
+    TestControl.executeEmbed(io)
+  }
+
+  def latencyMetric2 = {
+
+    // A source that emits periodic liveness pings
+    val lowLevelSource = new LowLevelSource[IO, Unit] {
+      def checkpointer: Checkpointer[IO, Unit] = Checkpointer.acksOnly[IO, Unit](_ => IO.unit)
+      def stream: Stream[IO, Stream[IO, Option[LowLevelEvents[Unit]]]] = Stream.emit(Stream.awakeDelay[IO](1.second).map(_ => None))
+    }
+
+    val io = for {
+      refActions <- Ref[IO].of(Vector.empty[Action])
+      sourceAndAck <- LowLevelSource.toSourceAndAck(lowLevelSource)
+      processor = testProcessor(refActions, TestSourceConfig(1, 1, 1.second, 1.second))
+      refLatencies <- Ref[IO].of(Vector.empty[FiniteDuration])
+      config = EventProcessingConfig(EventProcessingConfig.NoWindowing, metric => refLatencies.update(_ :+ metric))
+      fiber <- sourceAndAck.stream(config, processor).compile.drain.start
+      _ <- IO.sleep(1.hour)
+      latencyMetrics <- refLatencies.get
+      _ <- fiber.cancel
+    } yield latencyMetrics.toSet must contain(exactly(Duration.Zero))
+
+    TestControl.executeEmbed(io)
+  }
+
+  def latencyMetric3 = {
+
+    val streamTstamp = Instant.parse("2024-01-02T03:04:05.123Z")
+    val testConfig = TestSourceConfig(
+      batchesPerRebalance = Int.MaxValue,
+      eventsPerBatch = 2,
+      timeBetweenBatches = 1.second,
+      timeToProcessBatch = 1.second,
+      streamTstamp = streamTstamp
+    )
+
+    val io = for {
+      _ <- IO.sleep(streamTstamp.toEpochMilli.millis + 2.minutes)
+      refActions <- Ref[IO].of(Vector.empty[Action])
+      sourceAndAck <- LowLevelSource.toSourceAndAck(testLowLevelSource(refActions, testConfig))
+      processor = windowedProcessor(refActions, testConfig)
+      refLatencies <- Ref[IO].of(Vector.empty[FiniteDuration])
+      config = EventProcessingConfig(EventProcessingConfig.NoWindowing, metric => refLatencies.update(_ :+ metric))
+      fiber <- sourceAndAck.stream(config, processor).compile.drain.start
+      _ <- IO.sleep(10.seconds)
+      latencyMetrics <- refLatencies.get
+      _ <- fiber.cancel
+    } yield latencyMetrics must contain(beBetween(120.seconds, 130.seconds))
+
+    TestControl.executeEmbed(io)
+  }
+
 }
 
 object LowLevelSourceSpec {
@@ -657,7 +842,8 @@ object LowLevelSourceSpec {
     eventsPerBatch: Int,
     timeBetweenBatches: FiniteDuration,
     timeToProcessBatch: FiniteDuration,
-    timeToFinalizeWindow: FiniteDuration = 0.seconds
+    timeToFinalizeWindow: FiniteDuration = 0.seconds,
+    streamTstamp: Instant = Instant.EPOCH
   )
 
   /**
@@ -670,7 +856,7 @@ object LowLevelSourceSpec {
     val start = IO.realTimeInstant.flatMap(t => ref.update(_ :+ Action.ProcessorStartedWindow(t.toString)))
     val end = IO.realTimeInstant.flatMap(t => ref.update(_ :+ Action.ProcessorReachedEndOfWindow(t.toString)))
 
-    val middle = in.evalMap { case TokenedEvents(events, token, _) =>
+    val middle = in.evalMap { case TokenedEvents(events, token) =>
       val deserialized = events.map(byteBuffer => StandardCharsets.UTF_8.decode(byteBuffer).toString).toList
       for {
         now <- IO.realTimeInstant
@@ -699,7 +885,7 @@ object LowLevelSourceSpec {
       tokens <- checkpoints.get
     } yield tokens.reverse
 
-    val middle = in.evalMap { case TokenedEvents(events, token, _) =>
+    val middle = in.evalMap { case TokenedEvents(events, token) =>
       val deserialized = events.map(byteBuffer => StandardCharsets.UTF_8.decode(byteBuffer).toString).toList
       for {
         now <- IO.realTimeInstant
@@ -718,6 +904,7 @@ object LowLevelSourceSpec {
    *
    * - It emits batches of events at regular intervals
    * - It "rebalances" (like Kafka) after every few batches, which means it emits a new stream
+   * - It periodically emits `None`s to report its liveness
    * - It uses a ref to record which events got checkpointed
    */
   def testLowLevelSource(ref: Ref[IO, Vector[Action]], config: TestSourceConfig): LowLevelSource[IO, List[String]] =
@@ -727,7 +914,7 @@ object LowLevelSourceSpec {
       ref.update(_ :+ Action.Checkpointed(toCheckpoint))
     }
 
-    def stream: Stream[IO, Stream[IO, LowLevelEvents[List[String]]]] =
+    def stream: Stream[IO, Stream[IO, Option[LowLevelEvents[List[String]]]]] =
       Stream.eval(Ref[IO].of(0)).flatMap { counter =>
         Stream.unit.repeat.map { _ =>
           Stream
@@ -737,16 +924,22 @@ object LowLevelSourceSpec {
            .map { numbers =>
              val events = numbers.map(_.toString)
              val asBytes = Chunk.from(events).map(e => ByteBuffer.wrap(e.getBytes(StandardCharsets.UTF_8)))
-              LowLevelEvents(events = asBytes, ack = events.toList, earliestSourceTstamp = None)
+              Some(
+                LowLevelEvents(
+                  events = asBytes,
+                  ack = events.toList,
+                  earliestSourceTstamp = Some(config.streamTstamp.toEpochMilli.millis)
+                )
+              )
            }
        }
          .flatMap { e =>
            Stream.emit(e) ++ Stream.sleep[IO](config.timeBetweenBatches).drain
          }
          .repeatN(config.batchesPerRebalance.toLong)
+          .mergeHaltL(Stream.awakeDelay[IO](1.second).map(_ => None).repeat)
      }
    }
 
-    def lastLiveness: IO[FiniteDuration] = IO.realTime
  }
}