Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix missing implicit on port decoder #19

Merged
merged 5 commits into from
Nov 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ object KafkaSink {

private def fromFs2Producer[F[_]: Monad](config: KafkaSinkConfig, producer: KafkaProducer[F, String, Array[Byte]]): Sink[F] =
Sink { batch =>
val records = Chunk.seq(batch.map(toProducerRecord(config, _)))
val records = Chunk.from(batch.map(toProducerRecord(config, _)))
producer.produce(records).flatten.void
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package com.snowplowanalytics.snowplow.sources.kafka
import cats.Applicative
import cats.effect.{Async, Resource, Sync}
import cats.implicits._
import cats.effect.implicits._
import cats.kernel.Semigroup
import fs2.Stream
import org.typelevel.log4cats.{Logger, SelfAwareStructuredLogger}
Expand Down Expand Up @@ -49,12 +50,12 @@ object KafkaSource {
if (x.offset > y.offset) x else y
}

private def kafkaCheckpointer[F[_]: Applicative]: Checkpointer[F, KafkaCheckpoints[F]] = new Checkpointer[F, KafkaCheckpoints[F]] {
private def kafkaCheckpointer[F[_]: Async]: Checkpointer[F, KafkaCheckpoints[F]] = new Checkpointer[F, KafkaCheckpoints[F]] {
def combine(x: KafkaCheckpoints[F], y: KafkaCheckpoints[F]): KafkaCheckpoints[F] =
KafkaCheckpoints(x.byPartition |+| y.byPartition)

val empty: KafkaCheckpoints[F] = KafkaCheckpoints(Map.empty)
def ack(c: KafkaCheckpoints[F]): F[Unit] = c.byPartition.values.toList.traverse_(_.commit)
def ack(c: KafkaCheckpoints[F]): F[Unit] = c.byPartition.values.toList.parTraverse(_.commit).void
def nack(c: KafkaCheckpoints[F]): F[Unit] = Applicative[F].unit
}

Expand All @@ -78,9 +79,9 @@ object KafkaSource {
.flatMap { chunk =>
chunk.last match {
case Some(last) =>
val events = chunk.iterator.map {
val events = chunk.map {
_.record.value
}.toList
}
val ack = KafkaCheckpoints(Map(topicPartition.partition -> OffsetAndCommit(last.record.offset, last.offset.commit)))
val timestamps = chunk.iterator.flatMap { ccr =>
val ts = ccr.record.timestamp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ object Utils {
_.evalMap { case TokenedEvents(events, token, tstamp) =>
val parsed = events.map(byteBuffer => StandardCharsets.UTF_8.decode(byteBuffer).toString)
for {
_ <- ref.update(_ :+ ReceivedEvents(parsed, tstamp))
_ <- ref.update(_ :+ ReceivedEvents(parsed.toList, tstamp))
} yield token
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,15 +139,14 @@ object KinesisSource {
.chunks
.filter(_.nonEmpty)
.map { chunk =>
val ack = chunk.toList
val ack = chunk.asSeq
.groupBy(_.shardId)
.iterator
.map { case (k, records) =>
k -> records.maxBy(_.sequenceNumber).toMetadata[F]
}
.toMap
val earliestTstamp = chunk.iterator.map(_.record.approximateArrivalTimestamp).min
LowLevelEvents(chunk.toList.map(_.record.data()), ack, Some(earliestTstamp))
LowLevelEvents(chunk.map(_.record.data()), ack, Some(earliestTstamp))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
package com.snowplowanalytics.snowplow.sinks.pubsub

import cats.effect.{Async, Sync}
import cats.effect.implicits._
import cats.effect.kernel.Resource
import cats.implicits._
import com.google.api.core.ApiFutures
import cats.Foldable
import com.google.api.core.{ApiFuture, ApiFutures}
import com.google.api.gax.batching.BatchingSettings
import com.google.cloud.pubsub.v1.Publisher
import com.google.protobuf.ByteString
Expand All @@ -32,8 +32,8 @@ object PubsubSink {
}

private def sinkBatch[F[_]: Async](publisher: Publisher, batch: List[Sinkable]): F[Unit] =
batch
.parTraverse { case Sinkable(bytes, _, attributes) =>
Foldable[List]
.foldM(batch, List.empty[ApiFuture[String]]) { case (futures, Sinkable(bytes, _, attributes)) =>
for {
uuid <- Async[F].delay(UUID.randomUUID)
message = PubsubMessage.newBuilder
Expand All @@ -42,7 +42,7 @@ object PubsubSink {
.putAllAttributes(attributes.asJava)
.build
fut <- Async[F].delay(publisher.publish(message))
} yield fut
} yield fut :: futures
}
.flatMap { futures =>
for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import cats.effect.implicits._
import cats.effect.kernel.{Deferred, DeferredSink, DeferredSource}
import cats.effect.std._
import cats.implicits._
import fs2.Stream
import fs2.{Chunk, Stream}
import org.typelevel.log4cats.{Logger, SelfAwareStructuredLogger}
import org.typelevel.log4cats.slf4j.Slf4jLogger

Expand Down Expand Up @@ -44,27 +44,27 @@ object PubsubSource {
def build[F[_]: Async](config: PubsubSourceConfig): F[SourceAndAck[F]] =
LowLevelSource.toSourceAndAck(lowLevel(config))

private type PubSubCheckpointer[F[_]] = Checkpointer[F, List[AckReplyConsumer]]
private type PubSubCheckpointer[F[_]] = Checkpointer[F, Chunk[AckReplyConsumer]]

private def lowLevel[F[_]: Async](config: PubsubSourceConfig): LowLevelSource[F, List[AckReplyConsumer]] =
new LowLevelSource[F, List[AckReplyConsumer]] {
private def lowLevel[F[_]: Async](config: PubsubSourceConfig): LowLevelSource[F, Chunk[AckReplyConsumer]] =
new LowLevelSource[F, Chunk[AckReplyConsumer]] {
def checkpointer: PubSubCheckpointer[F] = pubsubCheckpointer

def stream: Stream[F, Stream[F, LowLevelEvents[List[AckReplyConsumer]]]] =
def stream: Stream[F, Stream[F, LowLevelEvents[Chunk[AckReplyConsumer]]]] =
Stream.emit(pubsubStream(config))
}

private def pubsubCheckpointer[F[_]: Async]: PubSubCheckpointer[F] = new PubSubCheckpointer[F] {
def combine(x: List[AckReplyConsumer], y: List[AckReplyConsumer]): List[AckReplyConsumer] =
x ::: y
def combine(x: Chunk[AckReplyConsumer], y: Chunk[AckReplyConsumer]): Chunk[AckReplyConsumer] =
Chunk.Queue(x, y)

val empty: List[AckReplyConsumer] = Nil
def ack(c: List[AckReplyConsumer]): F[Unit] =
val empty: Chunk[AckReplyConsumer] = Chunk.empty
def ack(c: Chunk[AckReplyConsumer]): F[Unit] =
Sync[F].delay {
c.foreach(_.ack())
}

def nack(c: List[AckReplyConsumer]): F[Unit] =
def nack(c: Chunk[AckReplyConsumer]): F[Unit] =
Sync[F].delay {
c.foreach(_.nack())
}
Expand All @@ -76,7 +76,7 @@ object PubsubSource {
tstamp: Instant
)

private def pubsubStream[F[_]: Async](config: PubsubSourceConfig): Stream[F, LowLevelEvents[List[AckReplyConsumer]]] = {
private def pubsubStream[F[_]: Async](config: PubsubSourceConfig): Stream[F, LowLevelEvents[Chunk[AckReplyConsumer]]] = {
val resources = for {
dispatcher <- Stream.resource(Dispatcher.sequential(await = false))
queue <- Stream.eval(Queue.unbounded[F, SingleMessage[F]])
Expand All @@ -92,13 +92,15 @@ object PubsubSource {
.chunks
.filter(_.nonEmpty)
.map { chunk =>
val events = chunk.map(_.message).toList
val acks = chunk.map(_.ackReply).toList
val events = chunk.map(_.message)
val acks = chunk.map(_.ackReply)
val earliestTstamp = chunk.map(_.tstamp).iterator.min
LowLevelEvents(events, acks, Some(earliestTstamp))
}
.evalTap { case LowLevelEvents(events, _, _) =>
val numPermits = events.map(e => permitsFor(config, e.limit())).sum
val numPermits = events.foldLeft(0L) { case (numPermits, e) =>
numPermits + permitsFor(config, e.limit())
}
semaphore.releaseN(numPermits)
}
.interruptWhen(sig)
Expand Down Expand Up @@ -181,7 +183,7 @@ object PubsubSource {
}

go(Nil, queue).flatMap { ackers =>
pubsubCheckpointer.ack(ackers)
pubsubCheckpointer.nack(Chunk.from(ackers))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ object HealthProbe {
.void

object decoders {
def portDecoder: Decoder[Port] = Decoder.decodeInt.emap { port =>
implicit def portDecoder: Decoder[Port] = Decoder.decodeInt.emap { port =>
Port.fromInt(port).toRight("Invalid port")
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Snowplow Community License Version 1.0,
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
*/
package com.snowplowanalytics.snowplow.runtime.syntax

import cats.implicits._
import cats.{Foldable, Monad}
import java.nio.ByteBuffer

trait FoldableExtensionSyntax {
implicit final def snowplowFoldableSyntax[M[_]](foldable: Foldable[M]): FoldableExtensionOps[M] = new FoldableExtensionOps(foldable)
}

final class FoldableExtensionOps[M[_]](private val M: Foldable[M]) extends AnyVal {

/**
* Traversal over a List with an effect
*
* This is similar to a cats Traverse. But it is more efficient than cats Traverse because it does
* not attempt to keep the order of the original list in the final result.
Comment on lines +23 to +24
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get why this would be quicker without preserving the order, we just need to iterate over all the elements once and doing it in the same order sounds like the fastest way. And are we not still preserving the order with this function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you have a List and you do list.traverse { i => ???} then the first thing cats does underneath is to copy the original List into a buffer. Whereas this new method does a single traversal.

And are we not still preserving the order with this function?

See the unit tests! It's because we iterate in a forwards direction, and then do _ :: acc which prepends (not appends) to the accumulated list.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah I get it now ! That's a great idea !

*
* This is helpful in many snowplow apps, where order of events is not important to us.
*
* Example:
* {{{
* Foldable[List].tranverseUnordered(events) { event =>
* IO.delay {
* transformEvent(event)
* }
* }
* }}}
*/
def traverseUnordered[F[_]: Monad, A, B](items: M[A])(f: A => F[B]): F[List[B]] =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're missing M.empty to be able to return F[M[B]] ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not completely sure what you're asking -- is it about why I return F[List[B]] instead of F[M[B]]? I did it this way because constructing a List like this is very fast. And because most of the time in our applications it is convenient to receive a List as the return type.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it about why I return F[List[B]] instead of F[M[B]]?

Yes it was. Thanks for the answer.

M.foldM(items, List.empty[B]) { case (acc, item) =>
f(item).map(_ :: acc)
}

/**
* Traversal over a List with an effect that produces success or failure
*
* This is helpful in many snowplow apps where we process events in batches, and each event might
* produce a bad row. We typically want to handle the resulting bad rows separately from the
* successes. And we don't care about the order of events.
*
* Example:
* {{{
* Foldable[List].traverseUnordered(strings) { str =>
* IO.delay {
* Event.parse(str).toEither
* }
* }.map { case (badRows, events) =>
* // handle results
* }
* }}}
*/
def traverseSeparateUnordered[F[_]: Monad, A, B, C](items: M[A])(f: A => F[Either[B, C]]): F[(List[B], List[C])] =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very useful!

M.foldM(items, (List.empty[B], List.empty[C])) { case ((lefts, rights), item) =>
f(item).map {
case Left(b) => (b :: lefts, rights)
case Right(c) => (lefts, c :: rights)
}
}

/**
* Sum elements of a List
*
* Helpful in snowplow apps for summing the lengths of byte buffers
*
* Example:
* {{{
* Foldable[Chunk].sumBy(byteBuffers) { b =>
* b.limit() - b.position()
* }
* }}}
*/
def sumBy[F[_], A](items: M[A])(f: A => Long): Long =
M.foldLeft(items, 0L) { case (acc, item) =>
acc + f(item)
}

/**
* Sum total number of bytes in a list of byte buffers
*
* Example:
* {{{
* Foldable[Chunk].sumBytes(byteBuffers)
* }}}
*/
def sumBytes[F[_]](items: M[ByteBuffer]): Long =
sumBy(items) { byteBuffer =>
(byteBuffer.limit() - byteBuffer.position()).toLong
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Snowplow Community License Version 1.0,
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
*/
package com.snowplowanalytics.snowplow.runtime

package object syntax {
object foldable extends FoldableExtensionSyntax
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then if we just import com.snowplowanalytics.snowplow.runtime.syntax.foldable we can use the additional functions right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import com.snowplowanalytics.snowplow.runtime.syntax.foldable._

with underscore at the end. I copied this syntax object by doing what cats does.

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Snowplow Community License Version 1.0,
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
*/
package com.snowplowanalytics.snowplow.runtime

import cats.Foldable
import cats.effect.IO
import org.specs2.Specification
import cats.effect.unsafe.implicits.global

import com.snowplowanalytics.snowplow.runtime.syntax.foldable._

class SyntaxSpec extends Specification {

def is = s2"""
The foldable syntax should
Traverse a list and perform an effectful transformation $e1
Traverse a list and perform an effectful transformation to Eithers $e2
Sum elements of a list with an effect $e3
"""

def e1 = {
val inputs = List("a", "b", "c")
val expected = List("c-transformed", "b-transformed", "a-transformed")

val result = Foldable[List]
.traverseUnordered(inputs) { str =>
IO.delay {
s"$str-transformed"
}
}
.unsafeRunSync()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use this to remove the .unsafeRunSync()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just tried this.... in my opinion after re-writing it to use the CatsEffect mixin, this spec lost some clarity. I like the mixin, and I use it several places already in these common libraries. But I don't feel we need to use it everywhere.


result must beEqualTo(expected)

}

def e2 = {
val inputs = List(1, 2, 3, 4, 5)
val expectedLefts = List(5, 3, 1)
val expectedRights = List(4, 2)

val (lefts, rights) = Foldable[List]
.traverseSeparateUnordered(inputs) { i =>
IO.delay {
if (i % 2 > 0) Left(i) else Right(i)
}
}
.unsafeRunSync()

(lefts must beEqualTo(expectedLefts)) and
(rights must beEqualTo(expectedRights))

}

def e3 = {
val inputs = List[Long](1, 2, 3, 4, 5)

val result = Foldable[List]
.sumBy(inputs) { i =>
i * 10
}

result must beEqualTo(150)
}
}
Loading
Loading