Skip to content

Commit

Permalink
Amended for common-streams 0.10.0-M3
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Jan 9, 2025
1 parent 5ca2239 commit c5e04c3
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor => BadRowProces
import com.snowplowanalytics.snowplow.badrows.Payload.{RawPayload => BadRowRawPayload}
import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, TokenedEvents}
import com.snowplowanalytics.snowplow.sinks.ListOfList
import com.snowplowanalytics.snowplow.lakes.{Environment, Metrics, RuntimeService}
import com.snowplowanalytics.snowplow.lakes.{Environment, RuntimeService}
import com.snowplowanalytics.snowplow.runtime.processing.BatchUp
import com.snowplowanalytics.snowplow.runtime.syntax.foldable._
import com.snowplowanalytics.snowplow.loaders.transform.{
Expand All @@ -57,7 +57,7 @@ object Processing {
Stream.eval(env.lakeWriter.createTable *> deferredTableExists.complete(()))

implicit val lookup: RegistryLookup[F] = Http4sRegistryLookup(env.httpClient)
val eventProcessingConfig: EventProcessingConfig = EventProcessingConfig(env.windowing)
val eventProcessingConfig = EventProcessingConfig(env.windowing, env.metrics.setLatency)

env.source
.stream(eventProcessingConfig, eventProcessor(env, deferredTableExists.get))
Expand Down Expand Up @@ -120,8 +120,7 @@ object Processing {
badProcessor: BadRowProcessor,
ref: Ref[F, WindowState]
): Pipe[F, TokenedEvents, Nothing] =
_.through(setLatency(env.metrics))
.through(rememberTokens(ref))
_.through(rememberTokens(ref))
.through(incrementReceivedCount(env))
.through(parseBytes(env, badProcessor))
.through(handleParseFailures(env, badProcessor))
Expand Down Expand Up @@ -167,22 +166,8 @@ object Processing {

}.drain

private def setLatency[F[_]: Sync](metrics: Metrics[F]): Pipe[F, TokenedEvents, TokenedEvents] =
_.evalTap {
_.earliestSourceTstamp match {
case Some(t) =>
for {
now <- Sync[F].realTime
latency = now - t.toEpochMilli.millis
_ <- metrics.setLatency(latency)
} yield ()
case None =>
Applicative[F].unit
}
}

private def rememberTokens[F[_]: Functor](ref: Ref[F, WindowState]): Pipe[F, TokenedEvents, Chunk[ByteBuffer]] =
_.evalMap { case TokenedEvents(events, token, _) =>
_.evalMap { case TokenedEvents(events, token) =>
ref.update(state => state.copy(tokens = token :: state.tokens)).as(events)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ object MockEnvironment {

private def testSourceAndAck(windows: List[List[TokenedEvents]], state: Ref[IO, Vector[Action]]): SourceAndAck[IO] =
new SourceAndAck[IO] {
def stream(config: EventProcessingConfig, processor: EventProcessor[IO]): Stream[IO, Nothing] =
def stream(config: EventProcessingConfig[IO], processor: EventProcessor[IO]): Stream[IO, Nothing] =
Stream.eval(state.update(_ :+ SubscribedToStream)).drain ++
Stream.emits(windows).flatMap { batches =>
Stream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ object TestSparkEnvironment {

private def testSourceAndAck(windows: List[List[TokenedEvents]]): SourceAndAck[IO] =
new SourceAndAck[IO] {
def stream(config: EventProcessingConfig, processor: EventProcessor[IO]): Stream[IO, Nothing] =
def stream(config: EventProcessingConfig[IO], processor: EventProcessor[IO]): Stream[IO, Nothing] =
Stream.emits(windows).flatMap { batches =>
Stream
.emits(batches)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ object EventUtils {
StandardCharsets.UTF_8.encode(e.toTsv)
}
IO.unique.map { ack =>
TokenedEvents(serialized, ack, None)
TokenedEvents(serialized, ack)
}
}
}
Expand Down Expand Up @@ -60,7 +60,7 @@ object EventUtils {
def badlyFormatted: IO[TokenedEvents] =
IO.unique.map { token =>
val serialized = Chunk("nonsense1", "nonsense2").map(s => ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
TokenedEvents(serialized, token, None)
TokenedEvents(serialized, token)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,11 @@
package com.snowplowanalytics.snowplow.lakes.processing

import cats.implicits._
import cats.effect.IO
import org.specs2.Specification
import cats.effect.testing.specs2.CatsEffect
import io.circe.Json
import cats.effect.testkit.TestControl

import java.time.Instant
import scala.concurrent.duration.DurationLong

import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}
import com.snowplowanalytics.snowplow.analytics.scalasdk.SnowplowEvent
import com.snowplowanalytics.snowplow.lakes.{MockEnvironment, RuntimeService}
Expand All @@ -34,10 +30,9 @@ class ProcessingSpec extends Specification with CatsEffect {
Write multiple windows of events in order $e3
Write multiple batches in a single window when batch exceeds cutoff $e4
Write good batches and bad events when a window contains both $e5
Set the latency metric based off the message timestamp $e6
Load events with a known schema $e7
Send failed events for an unrecognized schema $e8
Crash and exit for an unrecognized schema, if exitOnMissingIgluSchema is true $e9
Load events with a known schema $e6
Send failed events for an unrecognized schema $e7
Crash and exit for an unrecognized schema, if exitOnMissingIgluSchema is true $e8
"""

def e1 = {
Expand Down Expand Up @@ -227,44 +222,6 @@ class ProcessingSpec extends Specification with CatsEffect {
}

def e6 = {
val messageTime = Instant.parse("2023-10-24T10:00:00.000Z")
val processTime = Instant.parse("2023-10-24T10:00:42.123Z").minusMillis(MockEnvironment.TimeTakenToCreateTable.toMillis)

val io = for {
inputs <- EventUtils.inputEvents(2, EventUtils.good())
tokened <- inputs.traverse(_.tokened).map {
_.map {
_.copy(earliestSourceTstamp = Some(messageTime))
}
}
control <- MockEnvironment.build(List(tokened))
_ <- IO.sleep(processTime.toEpochMilli.millis)
_ <- Processing.stream(control.environment).compile.drain
state <- control.state.get
} yield state should beEqualTo(
Vector(
Action.SubscribedToStream,
Action.CreatedTable,
Action.InitializedLocalDataFrame("v20231024100032"),
Action.SetLatencyMetric(42123.millis),
Action.AddedReceivedCountMetric(2),
Action.SetLatencyMetric(42123.millis),
Action.AddedReceivedCountMetric(2),
Action.AppendedRowsToDataFrame("v20231024100032", 4),
Action.CommittedToTheLake("v20231024100032"),
Action.AddedCommittedCountMetric(4),
Action.SetProcessingLatencyMetric(MockEnvironment.WindowDuration + MockEnvironment.TimeTakenToCreateTable),
Action
.SetE2ELatencyMetric(MockEnvironment.WindowDuration + MockEnvironment.TimeTakenToCreateTable + processTime.toEpochMilli.millis),
Action.Checkpointed(tokened.map(_.ack)),
Action.RemovedDataFrameFromDisk("v20231024100032")
)
)

TestControl.executeEmbed(io)
}

def e7 = {

val ueGood700 = SnowplowEvent.UnstructEvent(
Some(
Expand Down Expand Up @@ -302,7 +259,7 @@ class ProcessingSpec extends Specification with CatsEffect {
TestControl.executeEmbed(io)
}

def e8 = {
def e7 = {

val ueDoesNotExist = SnowplowEvent.UnstructEvent(
Some(
Expand Down Expand Up @@ -342,7 +299,7 @@ class ProcessingSpec extends Specification with CatsEffect {
TestControl.executeEmbed(io)
}

def e9 = {
def e8 = {

val ueDoesNotExist = SnowplowEvent.UnstructEvent(
Some(
Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ object Dependencies {
val awsRegistry = "1.1.20"

// Snowplow
val streams = "0.10.0-M2"
val streams = "0.10.0-M3"
val igluClient = "4.0.0"

// Transitive overrides
Expand Down

0 comments on commit c5e04c3

Please sign in to comment.