-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix missing implicit on port decoder #19
Changes from all commits
1fe5d81
f6f141f
7d8716e
c0085e4
913a387
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
/* | ||
* Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. | ||
* | ||
* This program is licensed to you under the Snowplow Community License Version 1.0, | ||
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0. | ||
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 | ||
*/ | ||
package com.snowplowanalytics.snowplow.runtime.syntax | ||
|
||
import cats.implicits._ | ||
import cats.{Foldable, Monad} | ||
import java.nio.ByteBuffer | ||
|
||
trait FoldableExtensionSyntax { | ||
implicit final def snowplowFoldableSyntax[M[_]](foldable: Foldable[M]): FoldableExtensionOps[M] = new FoldableExtensionOps(foldable) | ||
} | ||
|
||
final class FoldableExtensionOps[M[_]](private val M: Foldable[M]) extends AnyVal { | ||
|
||
/** | ||
* Traversal over a List with an effect | ||
* | ||
* This is similar to a cats Traverse. But it is more efficient than cats Traverse because it does | ||
* not attempt to keep the order of the original list in the final result. | ||
* | ||
* This is helpful in many snowplow apps, where order of events is not important to us. | ||
* | ||
* Example: | ||
* {{{ | ||
* Foldable[List].tranverseUnordered(events) { event => | ||
* IO.delay { | ||
* transformEvent(event) | ||
* } | ||
* } | ||
* }}} | ||
*/ | ||
def traverseUnordered[F[_]: Monad, A, B](items: M[A])(f: A => F[B]): F[List[B]] = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We're missing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not completely sure what you're asking -- is it about why I return There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yes it was. Thanks for the answer. |
||
M.foldM(items, List.empty[B]) { case (acc, item) => | ||
f(item).map(_ :: acc) | ||
} | ||
|
||
/** | ||
* Traversal over a List with an effect that produces success or failure | ||
* | ||
* This is helpful in many snowplow apps where we process events in batches, and each event might | ||
* produce a bad row. We typically want to handle the resulting bad rows separately from the | ||
* successes. And we don't care about the order of events. | ||
* | ||
* Example: | ||
* {{{ | ||
* Foldable[List].traverseUnordered(strings) { str => | ||
* IO.delay { | ||
* Event.parse(str).toEither | ||
* } | ||
* }.map { case (badRows, events) => | ||
* // handle results | ||
* } | ||
* }}} | ||
*/ | ||
def traverseSeparateUnordered[F[_]: Monad, A, B, C](items: M[A])(f: A => F[Either[B, C]]): F[(List[B], List[C])] = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Very useful! |
||
M.foldM(items, (List.empty[B], List.empty[C])) { case ((lefts, rights), item) => | ||
f(item).map { | ||
case Left(b) => (b :: lefts, rights) | ||
case Right(c) => (lefts, c :: rights) | ||
} | ||
} | ||
|
||
/** | ||
* Sum elements of a List | ||
* | ||
* Helpful in snowplow apps for summing the lengths of byte buffers | ||
* | ||
* Example: | ||
* {{{ | ||
* Foldable[Chunk].sumBy(byteBuffers) { b => | ||
* b.limit() - b.position() | ||
* } | ||
* }}} | ||
*/ | ||
def sumBy[F[_], A](items: M[A])(f: A => Long): Long = | ||
M.foldLeft(items, 0L) { case (acc, item) => | ||
acc + f(item) | ||
} | ||
|
||
/** | ||
* Sum total number of bytes in a list of byte buffers | ||
* | ||
* Example: | ||
* {{{ | ||
* Foldable[Chunk].sumBytes(byteBuffers) | ||
* }}} | ||
*/ | ||
def sumBytes[F[_]](items: M[ByteBuffer]): Long = | ||
sumBy(items) { byteBuffer => | ||
(byteBuffer.limit() - byteBuffer.position()).toLong | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
/* | ||
* Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. | ||
* | ||
* This program is licensed to you under the Snowplow Community License Version 1.0, | ||
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0. | ||
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 | ||
*/ | ||
package com.snowplowanalytics.snowplow.runtime | ||
|
||
package object syntax { | ||
object foldable extends FoldableExtensionSyntax | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Then if we just There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
with underscore at the end. I copied this syntax object by doing what cats does. |
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
/* | ||
* Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. | ||
* | ||
* This program is licensed to you under the Snowplow Community License Version 1.0, | ||
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0. | ||
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 | ||
*/ | ||
package com.snowplowanalytics.snowplow.runtime | ||
|
||
import cats.Foldable | ||
import cats.effect.IO | ||
import org.specs2.Specification | ||
import cats.effect.unsafe.implicits.global | ||
|
||
import com.snowplowanalytics.snowplow.runtime.syntax.foldable._ | ||
|
||
class SyntaxSpec extends Specification { | ||
|
||
def is = s2""" | ||
The foldable syntax should | ||
Traverse a list and perform an effectful transformation $e1 | ||
Traverse a list and perform an effectful transformation to Eithers $e2 | ||
Sum elements of a list with an effect $e3 | ||
""" | ||
|
||
def e1 = { | ||
val inputs = List("a", "b", "c") | ||
val expected = List("c-transformed", "b-transformed", "a-transformed") | ||
|
||
val result = Foldable[List] | ||
.traverseUnordered(inputs) { str => | ||
IO.delay { | ||
s"$str-transformed" | ||
} | ||
} | ||
.unsafeRunSync() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can use this to remove the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I just tried this.... in my opinion after re-writing it to use the |
||
|
||
result must beEqualTo(expected) | ||
|
||
} | ||
|
||
def e2 = { | ||
val inputs = List(1, 2, 3, 4, 5) | ||
val expectedLefts = List(5, 3, 1) | ||
val expectedRights = List(4, 2) | ||
|
||
val (lefts, rights) = Foldable[List] | ||
.traverseSeparateUnordered(inputs) { i => | ||
IO.delay { | ||
if (i % 2 > 0) Left(i) else Right(i) | ||
} | ||
} | ||
.unsafeRunSync() | ||
|
||
(lefts must beEqualTo(expectedLefts)) and | ||
(rights must beEqualTo(expectedRights)) | ||
|
||
} | ||
|
||
def e3 = { | ||
val inputs = List[Long](1, 2, 3, 4, 5) | ||
|
||
val result = Foldable[List] | ||
.sumBy(inputs) { i => | ||
i * 10 | ||
} | ||
|
||
result must beEqualTo(150) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't get why this would be quicker without preserving the order, we just need to iterate over all the elements once and doing it in the same order sounds like the fastest way. And are we not still preserving the order with this function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you have a List and you do
list.traverse { i => ???}
then the first thing cats does underneath is to copy the original List into a buffer. Whereas this new method does a single traversal.See the unit tests! It's because we iterate in a forwards direction, and then do
_ :: acc
which prepends (not appends) to the accumulated list.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh yeah I get it now ! That's a great idea !