diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala
index d99383b..02dc209 100644
--- a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala
+++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala
@@ -50,7 +50,7 @@ object PubsubSource {
       def checkpointer: PubSubCheckpointer[F] = pubsubCheckpointer
 
       def stream: Stream[F, Stream[F, LowLevelEvents[Chunk[AckReplyConsumer]]]] =
-        Stream.emit(pubsubStream(config))
+        pubsubStream(config)
     }
 
   private def pubsubCheckpointer[F[_]: Async]: PubSubCheckpointer[F] = new PubSubCheckpointer[F] {
@@ -114,12 +114,11 @@ object PubsubSource {
     }
   }
 
-  private def pubsubStream[F[_]: Async](config: PubsubSourceConfig): Stream[F, LowLevelEvents[Chunk[AckReplyConsumer]]] =
+  private def pubsubStream[F[_]: Async](config: PubsubSourceConfig): Stream[F, Stream[F, LowLevelEvents[Chunk[AckReplyConsumer]]]] =
     for {
       control <- Stream.eval(Control.build(config))
       _ <- Stream.resource(runSubscriber(config, control))
-      events <- consumeFromQueue(config, control)
-    } yield events
+    } yield consumeFromQueue(config, control)
 
   private def consumeFromQueue[F[_]: Sync](
     config: PubsubSourceConfig,
@@ -208,7 +207,7 @@ object PubsubSource {
       _ <- Sync[F].delay(apiService.stopAsync())
       fiber <- drainQueue(control).start
       _ <- Logger[F].info("Waiting for the PubSub Subscriber to finish cleanly...")
-      _ <- Sync[F].blocking(apiService.awaitTerminated(config.shutdownTimeout.toMillis, TimeUnit.MILLISECONDS))
+      _ <- Sync[F].blocking(apiService.awaitTerminated())
       _ <- Sync[F].delay(control.phaser.forceTermination())
       _ <- fiber.join
     } yield ()
diff --git a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/SourceAndAck.scala b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/SourceAndAck.scala
index be1efd7..358ae44 100644
--- a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/SourceAndAck.scala
+++ b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/SourceAndAck.scala
@@ -7,6 +7,8 @@
  */
 package com.snowplowanalytics.snowplow.sources
 
+import cats.Show
+import cats.implicits._
 import fs2.Stream
 
 import scala.concurrent.duration.FiniteDuration
@@ -33,14 +35,46 @@ trait SourceAndAck[F[_]] {
   def stream(config: EventProcessingConfig, processor: EventProcessor[F]): Stream[F, Nothing]
 
   /**
-   * Measurement of how long the EventProcessor has spent processing any pending un-acked events.
+   * Reports on whether the source of events is healthy
    *
-   * Note, unlike our statsd metrics, this measurement does not consider min/max values over a
-   * period of time. It is a snapshot measurement for a single point in time.
+   * @param maxAllowedProcessingLatency
+   *   A maximum allowed value for how long the `EventProcessor` may spend processing any pending
+   *   un-acked events. If this cutoff is exceeded then `isHealthy` returns an unhealthy status.
    *
-   * This measurement is designed to be used as a health probe. If events are getting processed
-   * quickly then latency is low and the probe should report healthy. If any event is "stuck" then
-   * latency is high and the probe should report unhealthy.
+   * Note, unlike our statsd metrics, this latency measurement does not consider min/max values over
+   * a period of time. It is a snapshot measurement for a single point in time.
+   *
+   * If events are getting processed quickly then latency is low and the probe should report
+   * healthy. If any event is "stuck" then latency is high and the probe should report unhealthy.
    */
-  def processingLatency: F[FiniteDuration]
+  def isHealthy(maxAllowedProcessingLatency: FiniteDuration): F[SourceAndAck.HealthStatus]
+}
+
+object SourceAndAck {
+
+  sealed trait HealthStatus
+  case object Healthy extends HealthStatus
+  sealed trait Unhealthy extends HealthStatus
+
+  /**
+   * The health status expected if the source is at a stage of its lifecycle where it cannot
+   * provide events
+   *
+   * For Pubsub this could be because the Subscriber is not yet running. For Kafka this could be due
+   * to re-balancing.
+   */
+  case object Disconnected extends Unhealthy
+
+  /**
+   * The health status expected if an event is "stuck" in the EventProcessor
+   *
+   * @param latency
+   *   How long the EventProcessor has spent trying to process the stuck event
+   */
+  case class LaggingEventProcessor(latency: FiniteDuration) extends Unhealthy
+
+  implicit def showUnhealthy: Show[Unhealthy] = Show {
+    case Disconnected => "No connection to a source of events"
+    case LaggingEventProcessor(latency) => show"Processing latency is $latency"
+  }
 }
diff --git a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala
index 463027b..63c5ef3 100644
--- a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala
+++ b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala
@@ -17,7 +17,7 @@ import fs2.{Pipe, Pull, Stream}
 import org.typelevel.log4cats.{Logger, SelfAwareStructuredLogger}
 import org.typelevel.log4cats.slf4j.Slf4jLogger
-import scala.concurrent.duration.{Duration, DurationLong, FiniteDuration}
+import scala.concurrent.duration.{DurationLong, FiniteDuration}
 
 import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, SourceAndAck, TokenedEvents}
 
 /**
@@ -76,16 +76,19 @@ private[sources] object LowLevelSource {
   def toSourceAndAck[F[_]: Async, C](source: LowLevelSource[F, C]): F[SourceAndAck[F]] =
     for {
       latencyRef <- Ref[F].of(Option.empty[FiniteDuration])
-    } yield sourceAndAckImpl(source, latencyRef)
+      isConnectedRef <- Ref[F].of(false)
+    } yield sourceAndAckImpl(source, latencyRef, isConnectedRef)
 
   private def sourceAndAckImpl[F[_]: Async, C](
     source: LowLevelSource[F, C],
-    latencyRef: LatencyRef[F]
+    latencyRef: LatencyRef[F],
+    isConnectedRef: Ref[F, Boolean]
   ): SourceAndAck[F] = new SourceAndAck[F] {
     def stream(config: EventProcessingConfig, processor: EventProcessor[F]): Stream[F, Nothing] = {
       val str = for {
         acksRef <- Stream.bracket(Ref[F].of(Map.empty[Unique.Token, C]))(nackUnhandled(source.checkpointer, _))
         s2 <- source.stream
+        _ <- Stream.bracket(isConnectedRef.set(true))(_ => isConnectedRef.set(false))
       } yield {
         val tokenedSources = s2
           .through(monitorLatency(latencyRef))
@@ -105,13 +108,13 @@ private[sources] object LowLevelSource {
       str.flatten
     }
 
-    def processingLatency: F[FiniteDuration] =
-      latencyRef.get.flatMap {
-        case None => Duration.Zero.pure[F]
-        case Some(lastPullTime) =>
-          Async[F].realTime.map { now =>
-            now - lastPullTime
-          }
+    def isHealthy(maxAllowedProcessingLatency: FiniteDuration): F[SourceAndAck.HealthStatus] =
+      (isConnectedRef.get, latencyRef.get, Sync[F].realTime).mapN {
+        case (false, _, _) =>
+          SourceAndAck.Disconnected
+        case (_, Some(lastPullTime), now) if now - lastPullTime > maxAllowedProcessingLatency =>
+          SourceAndAck.LaggingEventProcessor(now - lastPullTime)
+        case _ => SourceAndAck.Healthy
       }
   }
 
diff --git a/modules/streams-core/src/test/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSourceSpec.scala b/modules/streams-core/src/test/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSourceSpec.scala
index 8680e46..10fbb58 100644
--- a/modules/streams-core/src/test/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSourceSpec.scala
+++ b/modules/streams-core/src/test/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSourceSpec.scala
@@ -18,7 +18,7 @@ import org.specs2.matcher.Matcher
 import scala.concurrent.duration.{Duration, DurationInt}
 import java.nio.charset.StandardCharsets
 
-import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, TokenedEvents}
+import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, SourceAndAck, TokenedEvents}
 
 import java.nio.ByteBuffer
 
@@ -37,11 +37,12 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
 
     cleanly checkpoint pending window when a stream is interrupted $e5
     not checkpoint events if the event processor throws an exception $e6
-  When reporting processing latency
-    report zero latency when there are no events $e7
-    report non-zero latency while there are unprocessed events $e8
-    report zero latency if events are processed but not yet acked (e.g. a batch-oriented loader) $e9
-    report zero latency after all events have been processed and acked $e10
+  When reporting healthy status
+    report healthy when there are no events $e7
+    report lagging if there are unprocessed events $e8
+    report healthy if events are processed but not yet acked (e.g. a batch-oriented loader) $e9
+    report healthy after all events have been processed and acked $e10
+    report disconnected in between two open windows of events $e11
   """
 
   def e1 = {
@@ -226,7 +227,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
     // A source that emits nothing
     val lowLevelSource = new LowLevelSource[IO, Unit] {
       def checkpointer: Checkpointer[IO, Unit] = Checkpointer.acksOnly[IO, Unit](_ => IO.unit)
-      def stream: Stream[IO, Stream[IO, LowLevelEvents[Unit]]] = Stream.never[IO]
+      def stream: Stream[IO, Stream[IO, LowLevelEvents[Unit]]] = Stream.emit(Stream.never[IO])
     }
 
     val io = for {
@@ -235,9 +236,9 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
       processor = testProcessor(refProcessed)
       fiber <- sourceAndAck.stream(config, processor).compile.drain.start
       _ <- IO.sleep(1.hour)
-      latency <- sourceAndAck.processingLatency
+      health <- sourceAndAck.isHealthy(Duration.Zero)
       _ <- fiber.cancel
-    } yield latency must beEqualTo(Duration.Zero)
+    } yield health must beEqualTo(SourceAndAck.Healthy)
 
     TestControl.executeEmbed(io)
   }
@@ -264,9 +265,9 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
       sourceAndAck <- LowLevelSource.toSourceAndAck(lowLevelSource)
       fiber <- sourceAndAck.stream(config, processor).compile.drain.start
       _ <- IO.sleep(5.minutes)
-      latency <- sourceAndAck.processingLatency
+      health <- sourceAndAck.isHealthy(10.seconds)
       _ <- fiber.cancel
-    } yield latency must beEqualTo(5.minutes)
+    } yield health must beEqualTo(SourceAndAck.LaggingEventProcessor(5.minutes))
 
     TestControl.executeEmbed(io)
   }
@@ -291,9 +292,9 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
      sourceAndAck <- LowLevelSource.toSourceAndAck(lowLevelSource)
       fiber <- sourceAndAck.stream(config, processor).compile.drain.start
       _ <- IO.sleep(342.seconds)
-      latency <- sourceAndAck.processingLatency
+      health <- sourceAndAck.isHealthy(90.seconds)
       _ <- fiber.cancel
-    } yield latency must beEqualTo(42.seconds)
+    } yield health must beEqualTo(SourceAndAck.Healthy)
 
     TestControl.executeEmbed(io)
   }
@@ -320,9 +321,39 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
       sourceAndAck <- LowLevelSource.toSourceAndAck(lowLevelSource)
       fiber <- sourceAndAck.stream(config, processor).compile.drain.start
       _ <- IO.sleep(5.minutes)
-      latency <- sourceAndAck.processingLatency
+      health <- sourceAndAck.isHealthy(1.microsecond)
       _ <- fiber.cancel
-    } yield latency must beEqualTo(Duration.Zero)
+    } yield health must beEqualTo(SourceAndAck.Healthy)
+
+    TestControl.executeEmbed(io)
+  }
+
+  def e11 = {
+
+    val config = EventProcessingConfig(EventProcessingConfig.NoWindowing)
+
+    // A source that emits one batch per window, with a 5 minute "rebalancing" in between windows
+    val lowLevelSource = new LowLevelSource[IO, Unit] {
+      def checkpointer: Checkpointer[IO, Unit] = Checkpointer.acksOnly[IO, Unit](_ => IO.unit)
+      def stream: Stream[IO, Stream[IO, LowLevelEvents[Unit]]] =
+        Stream.fixedDelay[IO](5.minutes).map { _ =>
+          Stream.emit(LowLevelEvents(Chunk.empty, (), None))
+        }
+    }
+
+    // A processor which takes 5 seconds to process each batch
+    val processor: EventProcessor[IO] =
+      _.evalMap { case TokenedEvents(_, token, _) =>
+        IO.sleep(5.seconds).as(token)
+      }
+
+    val io = for {
+      sourceAndAck <- LowLevelSource.toSourceAndAck(lowLevelSource)
+      fiber <- sourceAndAck.stream(config, processor).compile.drain.start
+      _ <- IO.sleep(2.minutes)
+      health <- sourceAndAck.isHealthy(1.microsecond)
+      _ <- fiber.cancel
+    } yield health must beEqualTo(SourceAndAck.Disconnected)
 
     TestControl.executeEmbed(io)
   }
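
Not part of the patch above: a minimal sketch of how downstream code might poll the new isHealthy method, for example from a service's HTTP health endpoint. Only SourceAndAck, its HealthStatus hierarchy and showUnhealthy come from this diff; the probe helper, its name and the 10-second cutoff are illustrative assumptions.

import cats.Functor
import cats.implicits._
import scala.concurrent.duration.DurationInt
import com.snowplowanalytics.snowplow.sources.SourceAndAck

// Hypothetical probe helper: maps the health status onto the Either shape a typical
// health endpoint wants, Right(()) for a 200 response, Left(reason) for a 503.
def probe[F[_]: Functor](sourceAndAck: SourceAndAck[F]): F[Either[String, Unit]] =
  sourceAndAck.isHealthy(maxAllowedProcessingLatency = 10.seconds).map {
    case SourceAndAck.Healthy              => Right(())
    case unhealthy: SourceAndAck.Unhealthy => Left(unhealthy.show) // uses showUnhealthy from the diff
  }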