From c5e04c320a0268b8bca02297bd3ce9ce0c9b05fc Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Thu, 9 Jan 2025 12:45:16 +0000 Subject: [PATCH] Amended for common-streams 0.10.0-M3 --- .../processing/Processing.scala | 23 ++------ .../MockEnvironment.scala | 2 +- .../TestSparkEnvironment.scala | 2 +- .../processing/EventUtils.scala | 4 +- .../processing/ProcessingSpec.scala | 53 ++----------------- project/Dependencies.scala | 2 +- 6 files changed, 14 insertions(+), 72 deletions(-) diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala index 6a6b5226..b9febd01 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala @@ -33,7 +33,7 @@ import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor => BadRowProces import com.snowplowanalytics.snowplow.badrows.Payload.{RawPayload => BadRowRawPayload} import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, TokenedEvents} import com.snowplowanalytics.snowplow.sinks.ListOfList -import com.snowplowanalytics.snowplow.lakes.{Environment, Metrics, RuntimeService} +import com.snowplowanalytics.snowplow.lakes.{Environment, RuntimeService} import com.snowplowanalytics.snowplow.runtime.processing.BatchUp import com.snowplowanalytics.snowplow.runtime.syntax.foldable._ import com.snowplowanalytics.snowplow.loaders.transform.{ @@ -57,7 +57,7 @@ object Processing { Stream.eval(env.lakeWriter.createTable *> deferredTableExists.complete(())) implicit val lookup: RegistryLookup[F] = Http4sRegistryLookup(env.httpClient) - val eventProcessingConfig: EventProcessingConfig = EventProcessingConfig(env.windowing) + val eventProcessingConfig = EventProcessingConfig(env.windowing, env.metrics.setLatency) env.source .stream(eventProcessingConfig, eventProcessor(env, deferredTableExists.get)) @@ -120,8 +120,7 @@ object Processing { badProcessor: BadRowProcessor, ref: Ref[F, WindowState] ): Pipe[F, TokenedEvents, Nothing] = - _.through(setLatency(env.metrics)) - .through(rememberTokens(ref)) + _.through(rememberTokens(ref)) .through(incrementReceivedCount(env)) .through(parseBytes(env, badProcessor)) .through(handleParseFailures(env, badProcessor)) @@ -167,22 +166,8 @@ object Processing { }.drain - private def setLatency[F[_]: Sync](metrics: Metrics[F]): Pipe[F, TokenedEvents, TokenedEvents] = - _.evalTap { - _.earliestSourceTstamp match { - case Some(t) => - for { - now <- Sync[F].realTime - latency = now - t.toEpochMilli.millis - _ <- metrics.setLatency(latency) - } yield () - case None => - Applicative[F].unit - } - } - private def rememberTokens[F[_]: Functor](ref: Ref[F, WindowState]): Pipe[F, TokenedEvents, Chunk[ByteBuffer]] = - _.evalMap { case TokenedEvents(events, token, _) => + _.evalMap { case TokenedEvents(events, token) => ref.update(state => state.copy(tokens = token :: state.tokens)).as(events) } diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/MockEnvironment.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/MockEnvironment.scala index f29a6c98..efaa1c49 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/MockEnvironment.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/MockEnvironment.scala @@ -119,7 +119,7 @@ object 
MockEnvironment { private def testSourceAndAck(windows: List[List[TokenedEvents]], state: Ref[IO, Vector[Action]]): SourceAndAck[IO] = new SourceAndAck[IO] { - def stream(config: EventProcessingConfig, processor: EventProcessor[IO]): Stream[IO, Nothing] = + def stream(config: EventProcessingConfig[IO], processor: EventProcessor[IO]): Stream[IO, Nothing] = Stream.eval(state.update(_ :+ SubscribedToStream)).drain ++ Stream.emits(windows).flatMap { batches => Stream diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/TestSparkEnvironment.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/TestSparkEnvironment.scala index 7d1101d9..6b411a2d 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/TestSparkEnvironment.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/TestSparkEnvironment.scala @@ -60,7 +60,7 @@ object TestSparkEnvironment { private def testSourceAndAck(windows: List[List[TokenedEvents]]): SourceAndAck[IO] = new SourceAndAck[IO] { - def stream(config: EventProcessingConfig, processor: EventProcessor[IO]): Stream[IO, Nothing] = + def stream(config: EventProcessingConfig[IO], processor: EventProcessor[IO]): Stream[IO, Nothing] = Stream.emits(windows).flatMap { batches => Stream .emits(batches) diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/processing/EventUtils.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/processing/EventUtils.scala index 7db08ef0..44e7789c 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/processing/EventUtils.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/processing/EventUtils.scala @@ -28,7 +28,7 @@ object EventUtils { StandardCharsets.UTF_8.encode(e.toTsv) } IO.unique.map { ack => - TokenedEvents(serialized, ack, None) + TokenedEvents(serialized, ack) } } } @@ -60,7 +60,7 @@ object EventUtils { def badlyFormatted: IO[TokenedEvents] = IO.unique.map { token => val serialized = Chunk("nonsense1", "nonsense2").map(s => ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8))) - TokenedEvents(serialized, token, None) + TokenedEvents(serialized, token) } } diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/processing/ProcessingSpec.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/processing/ProcessingSpec.scala index afea13e0..2a933ee4 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/processing/ProcessingSpec.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/processing/ProcessingSpec.scala @@ -11,15 +11,11 @@ package com.snowplowanalytics.snowplow.lakes.processing import cats.implicits._ -import cats.effect.IO import org.specs2.Specification import cats.effect.testing.specs2.CatsEffect import io.circe.Json import cats.effect.testkit.TestControl -import java.time.Instant -import scala.concurrent.duration.DurationLong - import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData} import com.snowplowanalytics.snowplow.analytics.scalasdk.SnowplowEvent import com.snowplowanalytics.snowplow.lakes.{MockEnvironment, RuntimeService} @@ -34,10 +30,9 @@ class ProcessingSpec extends Specification with CatsEffect { Write multiple windows of events in order $e3 Write multiple batches in a single window when batch exceeds cutoff $e4 Write good batches and bad events when a window contains both $e5 - Set the latency metric based off the message 
timestamp $e6 - Load events with a known schema $e7 - Send failed events for an unrecognized schema $e8 - Crash and exit for an unrecognized schema, if exitOnMissingIgluSchema is true $e9 + Load events with a known schema $e6 + Send failed events for an unrecognized schema $e7 + Crash and exit for an unrecognized schema, if exitOnMissingIgluSchema is true $e8 """ def e1 = { @@ -227,44 +222,6 @@ class ProcessingSpec extends Specification with CatsEffect { } def e6 = { - val messageTime = Instant.parse("2023-10-24T10:00:00.000Z") - val processTime = Instant.parse("2023-10-24T10:00:42.123Z").minusMillis(MockEnvironment.TimeTakenToCreateTable.toMillis) - - val io = for { - inputs <- EventUtils.inputEvents(2, EventUtils.good()) - tokened <- inputs.traverse(_.tokened).map { - _.map { - _.copy(earliestSourceTstamp = Some(messageTime)) - } - } - control <- MockEnvironment.build(List(tokened)) - _ <- IO.sleep(processTime.toEpochMilli.millis) - _ <- Processing.stream(control.environment).compile.drain - state <- control.state.get - } yield state should beEqualTo( - Vector( - Action.SubscribedToStream, - Action.CreatedTable, - Action.InitializedLocalDataFrame("v20231024100032"), - Action.SetLatencyMetric(42123.millis), - Action.AddedReceivedCountMetric(2), - Action.SetLatencyMetric(42123.millis), - Action.AddedReceivedCountMetric(2), - Action.AppendedRowsToDataFrame("v20231024100032", 4), - Action.CommittedToTheLake("v20231024100032"), - Action.AddedCommittedCountMetric(4), - Action.SetProcessingLatencyMetric(MockEnvironment.WindowDuration + MockEnvironment.TimeTakenToCreateTable), - Action - .SetE2ELatencyMetric(MockEnvironment.WindowDuration + MockEnvironment.TimeTakenToCreateTable + processTime.toEpochMilli.millis), - Action.Checkpointed(tokened.map(_.ack)), - Action.RemovedDataFrameFromDisk("v20231024100032") - ) - ) - - TestControl.executeEmbed(io) - } - - def e7 = { val ueGood700 = SnowplowEvent.UnstructEvent( Some( @@ -302,7 +259,7 @@ class ProcessingSpec extends Specification with CatsEffect { TestControl.executeEmbed(io) } - def e8 = { + def e7 = { val ueDoesNotExist = SnowplowEvent.UnstructEvent( Some( @@ -342,7 +299,7 @@ class ProcessingSpec extends Specification with CatsEffect { TestControl.executeEmbed(io) } - def e9 = { + def e8 = { val ueDoesNotExist = SnowplowEvent.UnstructEvent( Some( diff --git a/project/Dependencies.scala b/project/Dependencies.scala index f22a9119..a25ec2cf 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -49,7 +49,7 @@ object Dependencies { val awsRegistry = "1.1.20" // Snowplow - val streams = "0.10.0-M2" + val streams = "0.10.0-M3" val igluClient = "4.0.0" // Transitive overrides
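
Note on the 0.10.0-M3 API change (explanatory sketch, not part of the patch):
common-streams now owns source-latency tracking. TokenedEvents loses its
earliestSourceTstamp field (hence the two-argument TokenedEvents(events, token)
constructor above), and EventProcessingConfig gains an effect-type parameter
and accepts the latency callback directly — EventProcessingConfig(env.windowing,
env.metrics.setLatency) — so the hand-rolled setLatency pipe in
Processing.scala can be deleted. A minimal sketch of the pattern, using
hypothetical stand-in names rather than the real common-streams definitions:

    import cats.effect.IO
    import scala.concurrent.duration._

    // Stand-in for EventProcessingConfig[F]: the source is handed the latency
    // callback up front, instead of a downstream processing stage measuring it.
    final case class ProcessingConfig(setLatency: FiniteDuration => IO[Unit])

    // What a source can now do internally per batch: compare the batch's
    // source timestamp against the clock and report the gap to the callback.
    def reportLatency(config: ProcessingConfig, sourceTstamp: FiniteDuration): IO[Unit] =
      IO.realTime.flatMap(now => config.setLatency(now - sourceTstamp))

This is also why the "Set the latency metric based off the message timestamp"
test (old e6) is removed and the later tests renumbered: with the callback
wired through EventProcessingConfig, latency is presumably measured inside the
source itself rather than in Processing.stream, leaving nothing for a
loader-level test to assert.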