From 1fe5d8103cdb01d5b74e77b1422703d79238d9d6 Mon Sep 17 00:00:00 2001
From: Ian Streeter
Date: Wed, 8 Nov 2023 14:36:57 +0000
Subject: [PATCH 1/5] Fix missing implicit on port decoder
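
Without the `implicit` keyword, importing `decoders._` did not bring the
decoder into implicit scope, so circe could not summon a Decoder[Port] when
deriving decoders for configs that embed a port. A minimal sketch of the
intended usage (ProbeConfig is illustrative, not a class from this repo):

    import com.comcast.ip4s.Port
    import io.circe.Decoder
    import com.snowplowanalytics.snowplow.runtime.HealthProbe

    case class ProbeConfig(port: Port)

    object ProbeConfig {
      import HealthProbe.decoders._ // now supplies the implicit Decoder[Port]
      implicit val probeConfigDecoder: Decoder[ProbeConfig] =
        Decoder.forProduct1("port")(ProbeConfig.apply)
    }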
---
 .../com/snowplowanalytics/snowplow/runtime/HealthProbe.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/HealthProbe.scala b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/HealthProbe.scala
index 4cc45d2..d0ab20e 100644
--- a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/HealthProbe.scala
+++ b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/HealthProbe.scala
@@ -39,7 +39,7 @@ object HealthProbe {
       .void
 
   object decoders {
-    def portDecoder: Decoder[Port] = Decoder.decodeInt.emap { port =>
+    implicit def portDecoder: Decoder[Port] = Decoder.decodeInt.emap { port =>
       Port.fromInt(port).toRight("Invalid port")
     }
   }

From f6f141f10281330340ff374eb7c5c939fe5d4cc8 Mon Sep 17 00:00:00 2001
From: Ian Streeter
Date: Thu, 9 Nov 2023 08:59:47 +0000
Subject: [PATCH 2/5] Fix: Pubsub source should nack, not ack, pending messages during shutdown
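
During shutdown the source stops pulling from its internal queue, so any
messages still sitting there will never reach the processor. Acking them
told Pub/Sub they were handled, silently dropping them; nacking hands them
back so Pub/Sub redelivers them promptly after a restart. Simplified
sketch of the drain path this patch changes:

    go(Nil, queue).flatMap { ackers =>
      // ackers: the AckReplyConsumers drained from the queue at shutdown
      pubsubCheckpointer.nack(ackers) // redeliver rather than discard
    }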
---
 .../snowplow/sources/pubsub/PubsubSource.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala
index 16b0a4f..9bfeeff 100644
--- a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala
+++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala
@@ -181,7 +181,7 @@ object PubsubSource {
     }
 
     go(Nil, queue).flatMap { ackers =>
-      pubsubCheckpointer.ack(ackers)
+      pubsubCheckpointer.nack(ackers)
     }
   }
 

From 7d8716e5624bccf6dcffb952bbab16650a196165 Mon Sep 17 00:00:00 2001
From: Ian Streeter
Date: Thu, 9 Nov 2023 00:18:24 +0000
Subject: [PATCH 3/5] Change Source and Sink traits to use Chunk instead of List
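
fs2's Chunk suits these hot paths better than List: sources already receive
events in chunks, so we no longer rebuild a List for every batch, and two
chunks can be combined without copying either one. An illustrative sketch
(not code from this repo) of the operations the diff below relies on:

    import fs2.Chunk

    val a = Chunk.from(Vector(1, 2, 3)) // Chunk.from needs fs2 >= 3.9,
    val b = Chunk.from(Vector(4, 5))    // hence the version bump below

    // cheap concatenation, as used in the pubsub checkpointer's `combine`
    val combined: Chunk[Int] = Chunk.Queue(a, b)

    assert(combined.toList == List(1, 2, 3, 4, 5))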
---
 .../snowplow/sinks/kafka/KafkaSink.scala      |  2 +-
 .../snowplow/sources/kafka/KafkaSource.scala  |  4 +--
 .../snowplow/sources/kinesis/Utils.scala      |  2 +-
 .../sources/kinesis/KinesisSource.scala       |  5 ++-
 .../sources/pubsub/PubsubSource.scala         | 32 ++++++++++---------
 .../sources/TokenedEvents.scala               |  5 +--
 .../sources/internal/LowLevelEvents.scala     |  4 ++-
 .../sources/internal/LowLevelSourceSpec.scala | 22 +++++++------
 project/Dependencies.scala                    |  4 +--
 9 files changed, 43 insertions(+), 37 deletions(-)

diff --git a/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sinks/kafka/KafkaSink.scala b/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sinks/kafka/KafkaSink.scala
index b1dd482..512689b 100644
--- a/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sinks/kafka/KafkaSink.scala
+++ b/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sinks/kafka/KafkaSink.scala
@@ -33,7 +33,7 @@ object KafkaSink {
 
   private def fromFs2Producer[F[_]: Monad](config: KafkaSinkConfig, producer: KafkaProducer[F, String, Array[Byte]]): Sink[F] =
     Sink { batch =>
-      val records = Chunk.seq(batch.map(toProducerRecord(config, _)))
+      val records = Chunk.from(batch.map(toProducerRecord(config, _)))
       producer.produce(records).flatten.void
     }
 
diff --git a/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sources/kafka/KafkaSource.scala b/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sources/kafka/KafkaSource.scala
index 652fe2b..e6a6482 100644
--- a/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sources/kafka/KafkaSource.scala
+++ b/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sources/kafka/KafkaSource.scala
@@ -78,9 +78,9 @@ object KafkaSource {
       .flatMap { chunk =>
         chunk.last match {
           case Some(last) =>
-            val events = chunk.iterator.map {
+            val events = chunk.map {
               _.record.value
-            }.toList
+            }
             val ack = KafkaCheckpoints(Map(topicPartition.partition -> OffsetAndCommit(last.record.offset, last.offset.commit)))
             val timestamps = chunk.iterator.flatMap { ccr =>
               val ts = ccr.record.timestamp
diff --git a/modules/kinesis-it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala b/modules/kinesis-it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala
index d431bf2..6d5dca6 100644
--- a/modules/kinesis-it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala
+++ b/modules/kinesis-it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala
@@ -57,7 +57,7 @@ object Utils {
     _.evalMap { case TokenedEvents(events, token, tstamp) =>
       val parsed = events.map(byteBuffer => StandardCharsets.UTF_8.decode(byteBuffer).toString)
       for {
-        _ <- ref.update(_ :+ ReceivedEvents(parsed, tstamp))
+        _ <- ref.update(_ :+ ReceivedEvents(parsed.toList, tstamp))
       } yield token
     }
 
diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala
index 1d91261..662ce25 100644
--- a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala
+++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala
@@ -139,15 +139,14 @@ object KinesisSource {
       .chunks
       .filter(_.nonEmpty)
       .map { chunk =>
-        val ack = chunk.toList
+        val ack = chunk.asSeq
           .groupBy(_.shardId)
-          .iterator
           .map { case (k, records) =>
             k -> records.maxBy(_.sequenceNumber).toMetadata[F]
           }
           .toMap
         val earliestTstamp = chunk.iterator.map(_.record.approximateArrivalTimestamp).min
-        LowLevelEvents(chunk.toList.map(_.record.data()), ack, Some(earliestTstamp))
+        LowLevelEvents(chunk.map(_.record.data()), ack, Some(earliestTstamp))
       }
     }
 
diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala
index 9bfeeff..e2fe13d 100644
--- a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala
+++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala
@@ -13,7 +13,7 @@ import cats.effect.implicits._
 import cats.effect.kernel.{Deferred, DeferredSink, DeferredSource}
 import cats.effect.std._
 import cats.implicits._
-import fs2.Stream
+import fs2.{Chunk, Stream}
 import org.typelevel.log4cats.{Logger, SelfAwareStructuredLogger}
 import org.typelevel.log4cats.slf4j.Slf4jLogger
 
@@ -44,27 +44,27 @@ object PubsubSource {
   def build[F[_]: Async](config: PubsubSourceConfig): F[SourceAndAck[F]] =
     LowLevelSource.toSourceAndAck(lowLevel(config))
 
-  private type PubSubCheckpointer[F[_]] = Checkpointer[F, List[AckReplyConsumer]]
+  private type PubSubCheckpointer[F[_]] = Checkpointer[F, Chunk[AckReplyConsumer]]
 
-  private def lowLevel[F[_]: Async](config: PubsubSourceConfig): LowLevelSource[F, List[AckReplyConsumer]] =
-    new LowLevelSource[F, List[AckReplyConsumer]] {
+  private def lowLevel[F[_]: Async](config: PubsubSourceConfig): LowLevelSource[F, Chunk[AckReplyConsumer]] =
+    new LowLevelSource[F, Chunk[AckReplyConsumer]] {
       def checkpointer: PubSubCheckpointer[F] = pubsubCheckpointer
 
-      def stream: Stream[F, Stream[F, LowLevelEvents[List[AckReplyConsumer]]]] =
+      def stream: Stream[F, Stream[F, LowLevelEvents[Chunk[AckReplyConsumer]]]] =
         Stream.emit(pubsubStream(config))
     }
 
   private def pubsubCheckpointer[F[_]: Async]: PubSubCheckpointer[F] = new PubSubCheckpointer[F] {
-    def combine(x: List[AckReplyConsumer], y: List[AckReplyConsumer]): List[AckReplyConsumer] =
-      x ::: y
+    def combine(x: Chunk[AckReplyConsumer], y: Chunk[AckReplyConsumer]): Chunk[AckReplyConsumer] =
+      Chunk.Queue(x, y)
 
-    val empty: List[AckReplyConsumer] = Nil
-    def ack(c: List[AckReplyConsumer]): F[Unit] =
+    val empty: Chunk[AckReplyConsumer] = Chunk.empty
+    def ack(c: Chunk[AckReplyConsumer]): F[Unit] =
       Sync[F].delay {
         c.foreach(_.ack())
       }
 
-    def nack(c: List[AckReplyConsumer]): F[Unit] =
+    def nack(c: Chunk[AckReplyConsumer]): F[Unit] =
       Sync[F].delay {
         c.foreach(_.nack())
       }
@@ -76,7 +76,7 @@ object PubsubSource {
     tstamp: Instant
   )
 
-  private def pubsubStream[F[_]: Async](config: PubsubSourceConfig): Stream[F, LowLevelEvents[List[AckReplyConsumer]]] = {
+  private def pubsubStream[F[_]: Async](config: PubsubSourceConfig): Stream[F, LowLevelEvents[Chunk[AckReplyConsumer]]] = {
     val resources = for {
       dispatcher <- Stream.resource(Dispatcher.sequential(await = false))
       queue <- Stream.eval(Queue.unbounded[F, SingleMessage[F]])
@@ -92,13 +92,15 @@ object PubsubSource {
       .chunks
       .filter(_.nonEmpty)
       .map { chunk =>
-        val events = chunk.map(_.message).toList
-        val acks = chunk.map(_.ackReply).toList
+        val events = chunk.map(_.message)
+        val acks = chunk.map(_.ackReply)
         val earliestTstamp = chunk.map(_.tstamp).iterator.min
         LowLevelEvents(events, acks, Some(earliestTstamp))
       }
       .evalTap { case LowLevelEvents(events, _, _) =>
-        val numPermits = events.map(e => permitsFor(config, e.limit())).sum
+        val numPermits = events.foldLeft(0L) { case (numPermits, e) =>
+          numPermits + permitsFor(config, e.limit())
+        }
         semaphore.releaseN(numPermits)
       }
       .interruptWhen(sig)
@@ -181,7 +183,7 @@ object PubsubSource {
     }
 
     go(Nil, queue).flatMap { ackers =>
-      pubsubCheckpointer.nack(ackers)
+      pubsubCheckpointer.nack(Chunk.from(ackers))
     }
   }
 
diff --git a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/TokenedEvents.scala b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/TokenedEvents.scala
index 2e1ceec..da1f0ae 100644
--- a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/TokenedEvents.scala
+++ b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/TokenedEvents.scala
@@ -8,6 +8,7 @@
 package com.snowplowanalytics.snowplow.sources
 
 import cats.effect.kernel.Unique
+import fs2.Chunk
 
 import java.nio.ByteBuffer
 import java.time.Instant
@@ -16,7 +17,7 @@ import java.time.Instant
  * The events as they are fed into a [[EventProcessor]]
  *
  * @param events
- *   Each item in the List an event read from the external stream, before parsing
+ *   Each item in the Chunk is an event read from the external stream, before parsing
  * @param ack
  *   The [[EventProcessor]] must emit this token after it has fully processed the batch of events.
  *   When the [[EventProcessor]] emits the token, it is an instruction to the [[SourceAndAck]] to
@@ -26,7 +27,7 @@ import java.time.Instant
  *   the latency metric.
  */
 case class TokenedEvents(
-  events: List[ByteBuffer],
+  events: Chunk[ByteBuffer],
   ack: Unique.Token,
   earliestSourceTstamp: Option[Instant]
 )
diff --git a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelEvents.scala b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelEvents.scala
index 708fc3c..6353fd8 100644
--- a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelEvents.scala
+++ b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelEvents.scala
@@ -7,6 +7,8 @@
  */
 package com.snowplowanalytics.snowplow.sources.internal
 
+import fs2.Chunk
+
 import java.nio.ByteBuffer
 import java.time.Instant
 
@@ -17,7 +19,7 @@ import java.time.Instant
  * processor
  */
 case class LowLevelEvents[C](
-  events: List[ByteBuffer],
+  events: Chunk[ByteBuffer],
   ack: C,
   earliestSourceTstamp: Option[Instant]
 )
diff --git a/modules/streams-core/src/test/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSourceSpec.scala b/modules/streams-core/src/test/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSourceSpec.scala
index 2edd64b..8680e46 100644
--- a/modules/streams-core/src/test/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSourceSpec.scala
+++ b/modules/streams-core/src/test/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSourceSpec.scala
@@ -11,7 +11,7 @@ import cats.effect.IO
 import cats.effect.kernel.{Ref, Unique}
 import cats.effect.testkit.TestControl
 import cats.effect.testing.specs2.CatsEffect
-import fs2.Stream
+import fs2.{Chunk, Stream}
 import org.specs2.Specification
 import org.specs2.matcher.Matcher
 
@@ -117,7 +117,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
           IO.raiseError(new RuntimeException(s"boom! Exceeded $errorAfterBatch batches"))
         else
           ref
-            .update(_ ::: events.map(byteBuffer => StandardCharsets.UTF_8.decode(byteBuffer).toString))
+            .update(_ ::: events.map(byteBuffer => StandardCharsets.UTF_8.decode(byteBuffer).toString).toList)
            .as(token)
       }
 
@@ -250,7 +250,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
      val lowLevelSource = new LowLevelSource[IO, Unit] {
        def checkpointer: Checkpointer[IO, Unit] = Checkpointer.acksOnly[IO, Unit](_ => IO.unit)
        def stream: Stream[IO, Stream[IO, LowLevelEvents[Unit]]] = Stream.emit {
-          Stream.emit(LowLevelEvents(Nil, (), None)).repeat
+          Stream.emit(LowLevelEvents(Chunk.empty, (), None)).repeat
        }
      }
 
@@ -279,7 +279,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
      val lowLevelSource = new LowLevelSource[IO, Unit] {
        def checkpointer: Checkpointer[IO, Unit] = Checkpointer.acksOnly[IO, Unit](_ => IO.unit)
        def stream: Stream[IO, Stream[IO, LowLevelEvents[Unit]]] = Stream.emit {
-          Stream.emit(LowLevelEvents(Nil, (), None)).repeat
+          Stream.emit(LowLevelEvents(Chunk.empty, (), None)).repeat
        }
      }
 
@@ -306,7 +306,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
      val lowLevelSource = new LowLevelSource[IO, Unit] {
        def checkpointer: Checkpointer[IO, Unit] = Checkpointer.acksOnly[IO, Unit](_ => IO.unit)
        def stream: Stream[IO, Stream[IO, LowLevelEvents[Unit]]] = Stream.emit {
-          Stream.emit(LowLevelEvents(Nil, (), None)) ++ Stream.never[IO]
+          Stream.emit(LowLevelEvents(Chunk.empty, (), None)) ++ Stream.never[IO]
        }
      }
 
@@ -353,7 +353,7 @@ object LowLevelSourceSpec {
    _.evalMap { case TokenedEvents(events, token, _) =>
      for {
        _ <- IO.sleep(TimeToProcessBatch)
-        _ <- ref.update(_ ::: events.map(byteBuffer => StandardCharsets.UTF_8.decode(byteBuffer).toString))
+        _ <- ref.update(_ ::: events.map(byteBuffer => StandardCharsets.UTF_8.decode(byteBuffer).toString).toList)
      } yield token
    }
 
@@ -368,7 +368,7 @@ object LowLevelSourceSpec {
      val out = in.evalMap { case TokenedEvents(events, token, _) =>
        for {
          _ <- IO.sleep(TimeToProcessBatch)
-          _ <- ref.update(_ ::: events.map(byteBuffer => StandardCharsets.UTF_8.decode(byteBuffer).toString))
+          _ <- ref.update(_ ::: events.map(byteBuffer => StandardCharsets.UTF_8.decode(byteBuffer).toString).toList)
          _ <- checkpoints.update(token :: _)
        } yield ()
      }
@@ -394,9 +394,11 @@ object LowLevelSourceSpec {
        Stream.range(1, BatchesPerRebalance + 1).flatMap { batchId =>
          val events = (1 to EventsPerBatch)
            .map(eventId => s"rebalance $rebalanceId - batch $batchId - event $eventId")
-            .toList
-          val asBytes = events.map(_.getBytes(StandardCharsets.UTF_8)).map(ByteBuffer.wrap)
-          Stream.emit(LowLevelEvents(events = asBytes, ack = events, earliestSourceTstamp = None)) ++ Stream
+          val asBytes = Chunk
+            .from(events)
+            .map(_.getBytes(StandardCharsets.UTF_8))
+            .map(ByteBuffer.wrap)
+          Stream.emit(LowLevelEvents(events = asBytes, ack = events.toList, earliestSourceTstamp = None)) ++ Stream
            .sleep[IO](TimeBetweenBatches)
            .drain
        }
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 7151449..ea30710 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -12,8 +12,8 @@ object Dependencies {
   object V {
     // Scala
     val cats = "2.9.0"
-    val catsEffect = "3.5.0"
-    val fs2 = "3.7.0"
+    val catsEffect = "3.5.2"
+    val fs2 = "3.9.3"
     val log4cats = "2.6.0"
     val http4s = "0.23.15"
     val decline = "2.4.1"

From c0085e42c4bd8da65c35ce9e1ec2fa3a7673b6d4 Mon Sep 17 00:00:00 2001
From: Ian Streeter
Date: Thu, 9 Nov 2023 21:28:53 +0000
Subject: [PATCH 4/5] Add syntax for traversing lists
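
The syntax is enabled by an import, following the usual cats pattern of
enriching a Foldable[M] instance. Usage sketch, adapted from the scaladoc
examples below (transformEvent is a stand-in, not a real function):

    import cats.Foldable
    import cats.effect.IO
    import com.snowplowanalytics.snowplow.runtime.syntax.foldable._

    def transformEvent(s: String): String = s.toUpperCase // stand-in

    // effectful traversal that does not preserve input order
    val transformed: IO[List[String]] =
      Foldable[List].traverseUnordered(List("a", "b", "c")) { event =>
        IO.delay(transformEvent(event))
      }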
---
 .../snowplow/runtime/syntax/foldable.scala | 97 +++++++++++++++++++
 .../snowplow/runtime/syntax/package.scala  | 12 +++
 .../snowplow/runtime/SyntaxSpec.scala      | 70 +++++++++++++
 3 files changed, 179 insertions(+)
 create mode 100644 modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/syntax/foldable.scala
 create mode 100644 modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/syntax/package.scala
 create mode 100644 modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/SyntaxSpec.scala

diff --git a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/syntax/foldable.scala b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/syntax/foldable.scala
new file mode 100644
index 0000000..9dc4169
--- /dev/null
+++ b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/syntax/foldable.scala
@@ -0,0 +1,97 @@
+/*
+ * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Snowplow Community License Version 1.0,
+ * and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
+ * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
+ */
+package com.snowplowanalytics.snowplow.runtime.syntax
+
+import cats.implicits._
+import cats.{Foldable, Monad}
+import java.nio.ByteBuffer
+
+trait FoldableExtensionSyntax {
+  implicit final def snowplowFoldableSyntax[M[_]](foldable: Foldable[M]): FoldableExtensionOps[M] = new FoldableExtensionOps(foldable)
+}
+
+final class FoldableExtensionOps[M[_]](private val M: Foldable[M]) extends AnyVal {
+
+  /**
+   * Traversal over a List with an effect
+   *
+   * This is similar to a cats Traverse, but it is more efficient than cats Traverse because it does
+   * not attempt to keep the order of the original list in the final result.
+   *
+   * This is helpful in many snowplow apps, where order of events is not important to us.
+   *
+   * Example:
+   * {{{
+   * Foldable[List].traverseUnordered(events) { event =>
+   *   IO.delay {
+   *     transformEvent(event)
+   *   }
+   * }
+   * }}}
+   */
+  def traverseUnordered[F[_]: Monad, A, B](items: M[A])(f: A => F[B]): F[List[B]] =
+    M.foldM(items, List.empty[B]) { case (acc, item) =>
+      f(item).map(_ :: acc)
+    }
+
+  /**
+   * Traversal over a List with an effect that produces success or failure
+   *
+   * This is helpful in many snowplow apps where we process events in batches, and each event might
+   * produce a bad row. We typically want to handle the resulting bad rows separately from the
+   * successes. And we don't care about the order of events.
+   *
+   * Example:
+   * {{{
+   * Foldable[List].traverseSeparateUnordered(strings) { str =>
+   *   IO.delay {
+   *     Event.parse(str).toEither
+   *   }
+   * }.map { case (badRows, events) =>
+   *   // handle results
+   * }
+   * }}}
+   */
+  def traverseSeparateUnordered[F[_]: Monad, A, B, C](items: M[A])(f: A => F[Either[B, C]]): F[(List[B], List[C])] =
+    M.foldM(items, (List.empty[B], List.empty[C])) { case ((lefts, rights), item) =>
+      f(item).map {
+        case Left(b) => (b :: lefts, rights)
+        case Right(c) => (lefts, c :: rights)
+      }
+    }
+
+  /**
+   * Sum elements of a List
+   *
+   * Helpful in snowplow apps for summing the lengths of byte buffers
+   *
+   * Example:
+   * {{{
+   * Foldable[Chunk].sumBy(byteBuffers) { b =>
+   *   b.limit() - b.position()
+   * }
+   * }}}
+   */
+  def sumBy[F[_], A](items: M[A])(f: A => Long): Long =
+    M.foldLeft(items, 0L) { case (acc, item) =>
+      acc + f(item)
+    }
+
+  /**
+   * Sum total number of bytes in a list of byte buffers
+   *
+   * Example:
+   * {{{
+   * Foldable[Chunk].sumBytes(byteBuffers)
+   * }}}
+   */
+  def sumBytes[F[_]](items: M[ByteBuffer]): Long =
+    sumBy(items) { byteBuffer =>
+      (byteBuffer.limit() - byteBuffer.position()).toLong
+    }
+}
diff --git a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/syntax/package.scala b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/syntax/package.scala
new file mode 100644
index 0000000..eae6e54
--- /dev/null
+++ b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/syntax/package.scala
@@ -0,0 +1,12 @@
+/*
+ * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Snowplow Community License Version 1.0,
+ * and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
+ * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
+ */
+package com.snowplowanalytics.snowplow.runtime
+
+package object syntax {
+  object foldable extends FoldableExtensionSyntax
+}
diff --git a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/SyntaxSpec.scala b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/SyntaxSpec.scala
new file mode 100644
index 0000000..b906472
--- /dev/null
+++ b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/SyntaxSpec.scala
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Snowplow Community License Version 1.0,
+ * and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
+ * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
+ */
+package com.snowplowanalytics.snowplow.runtime
+
+import cats.Foldable
+import cats.effect.IO
+import org.specs2.Specification
+import cats.effect.unsafe.implicits.global
+
+import com.snowplowanalytics.snowplow.runtime.syntax.foldable._
+
+class SyntaxSpec extends Specification {
+
+  def is = s2"""
+  The foldable syntax should
+    Traverse a list and perform an effectful transformation $e1
+    Traverse a list and perform an effectful transformation to Eithers $e2
+    Sum elements of a list $e3
+  """
+
+  def e1 = {
+    val inputs = List("a", "b", "c")
+    val expected = List("c-transformed", "b-transformed", "a-transformed")
+
+    val result = Foldable[List]
+      .traverseUnordered(inputs) { str =>
+        IO.delay {
+          s"$str-transformed"
+        }
+      }
+      .unsafeRunSync()
+
+    result must beEqualTo(expected)
+
+  }
+
+  def e2 = {
+    val inputs = List(1, 2, 3, 4, 5)
+    val expectedLefts = List(5, 3, 1)
+    val expectedRights = List(4, 2)
+
+    val (lefts, rights) = Foldable[List]
+      .traverseSeparateUnordered(inputs) { i =>
+        IO.delay {
+          if (i % 2 > 0) Left(i) else Right(i)
+        }
+      }
+      .unsafeRunSync()
+
+    (lefts must beEqualTo(expectedLefts)) and
+      (rights must beEqualTo(expectedRights))
+
+  }
+
+  def e3 = {
+    val inputs = List[Long](1, 2, 3, 4, 5)
+
+    val result = Foldable[List]
+      .sumBy(inputs) { i =>
+        i * 10
+      }
+
+    result must beEqualTo(150)
+  }
+}

From 913a3874e890a22049cd113a2e18abd6a09cd0d2 Mon Sep 17 00:00:00 2001
From: Ian Streeter
Date: Thu, 9 Nov 2023 21:47:17 +0000
Subject: [PATCH 5/5] Fixes to the pubsub and kafka sinks
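
Two independent fixes. The Kafka checkpointer now commits offsets for all
partitions in parallel, which is why its constraint is raised from
Applicative to Async (parTraverse needs it). The pubsub sink stops using
parTraverse for publishing: Publisher.publish hands the message to an
internal batcher and returns an ApiFuture immediately, so wrapping each
call in a parallel fiber bought little. The shape of the change, with
bodies elided (see the hunk below for the full code):

    // before
    batch.parTraverse { case Sinkable(bytes, _, attributes) => ... }

    // after: submit sequentially, collecting the futures to await later
    Foldable[List].foldM(batch, List.empty[ApiFuture[String]]) {
      case (futures, Sinkable(bytes, _, attributes)) => ...
    }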
---
 .../snowplow/sources/kafka/KafkaSource.scala |  5 +++--
 .../snowplow/sinks/pubsub/PubsubSink.scala   | 10 +++++-----
 2 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sources/kafka/KafkaSource.scala b/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sources/kafka/KafkaSource.scala
index e6a6482..e979392 100644
--- a/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sources/kafka/KafkaSource.scala
+++ b/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sources/kafka/KafkaSource.scala
@@ -10,6 +10,7 @@ package com.snowplowanalytics.snowplow.sources.kafka
 import cats.Applicative
 import cats.effect.{Async, Resource, Sync}
 import cats.implicits._
+import cats.effect.implicits._
 import cats.kernel.Semigroup
 import fs2.Stream
 import org.typelevel.log4cats.{Logger, SelfAwareStructuredLogger}
@@ -49,12 +50,12 @@ object KafkaSource {
     if (x.offset > y.offset) x else y
   }
 
-  private def kafkaCheckpointer[F[_]: Applicative]: Checkpointer[F, KafkaCheckpoints[F]] = new Checkpointer[F, KafkaCheckpoints[F]] {
+  private def kafkaCheckpointer[F[_]: Async]: Checkpointer[F, KafkaCheckpoints[F]] = new Checkpointer[F, KafkaCheckpoints[F]] {
     def combine(x: KafkaCheckpoints[F], y: KafkaCheckpoints[F]): KafkaCheckpoints[F] =
       KafkaCheckpoints(x.byPartition |+| y.byPartition)
 
     val empty: KafkaCheckpoints[F] = KafkaCheckpoints(Map.empty)
-    def ack(c: KafkaCheckpoints[F]): F[Unit] = c.byPartition.values.toList.traverse_(_.commit)
+    def ack(c: KafkaCheckpoints[F]): F[Unit] = c.byPartition.values.toList.parTraverse(_.commit).void
     def nack(c: KafkaCheckpoints[F]): F[Unit] = Applicative[F].unit
   }
 
diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sinks/pubsub/PubsubSink.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sinks/pubsub/PubsubSink.scala
index 28122dc..4ef0807 100644
--- a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sinks/pubsub/PubsubSink.scala
+++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sinks/pubsub/PubsubSink.scala
@@ -8,10 +8,10 @@
 package com.snowplowanalytics.snowplow.sinks.pubsub
 
 import cats.effect.{Async, Sync}
-import cats.effect.implicits._
 import cats.effect.kernel.Resource
 import cats.implicits._
-import com.google.api.core.ApiFutures
+import cats.Foldable
+import com.google.api.core.{ApiFuture, ApiFutures}
 import com.google.api.gax.batching.BatchingSettings
 import com.google.cloud.pubsub.v1.Publisher
 import com.google.protobuf.ByteString
@@ -32,8 +32,8 @@ object PubsubSink {
   }
 
   private def sinkBatch[F[_]: Async](publisher: Publisher, batch: List[Sinkable]): F[Unit] =
-    batch
-      .parTraverse { case Sinkable(bytes, _, attributes) =>
+    Foldable[List]
+      .foldM(batch, List.empty[ApiFuture[String]]) { case (futures, Sinkable(bytes, _, attributes)) =>
         for {
           uuid <- Async[F].delay(UUID.randomUUID)
           message = PubsubMessage.newBuilder
@@ -42,7 +42,7 @@ object PubsubSink {
             .putAllAttributes(attributes.asJava)
             .build
           fut <- Async[F].delay(publisher.publish(message))
-        } yield fut
+        } yield fut :: futures
       }
      .flatMap { futures =>
        for {