Skip to content

Commit

Permalink
prelude: Batch effect (#633)
Browse files Browse the repository at this point in the history
An effect that provides functionality similar to ZIO Query.
  • Loading branch information
fwbrasil authored Sep 13, 2024
1 parent 9450eee commit f359d2d
Show file tree
Hide file tree
Showing 7 changed files with 966 additions and 13 deletions.
85 changes: 84 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,89 @@ val d: Int < Async =
Resource.run(c)
```

### Batch: Efficient Data Processing

The `Batch` effect provides a mechanism for efficient processing of data in batches, allowing for optimized handling of datasets. It includes a type parameter `S` that represents the possible effects that can occur in the data sources.

```scala
import kyo.*

// Using 'Batch.sourceSeq' for processing the entire sequence at once, returning a 'Seq'
val source1 = Batch.sourceSeq[Int, String, Any] { seq =>
seq.map(i => i.toString)
}

// Using 'Batch.sourceMap' for processing the entire sequence at once, returning a 'Map'
val source2 = Batch.sourceMap[Int, String, IO] { seq =>
// Source functions can perform arbitrary effects like 'IO' before returning the results
IO {
seq.map(i => i -> i.toString).toMap
}
}

// Using 'Batch.source' for individual effect suspensions
// This is a more generic method that allows effects for each of the inputs
val source3 = Batch.source[Int, String, IO] { seq =>
val map = seq.map { i =>
i -> IO((i * 2).toString)
}.toMap
(i: Int) => map(i)
}

// Example usage
val result =
for
a <- Batch.eval(Seq(1, 2, 3))
b1 <- source1(a)
b2 <- source2(a)
b3 <- source3(a)
yield (a, b1, b2, b3)

// Handle the effect
val finalResult: Seq[(Int, String, String, String)] < IO =
Batch.run(result)
```

When creating a source, it's important to note that the returned sequence must have the same number of elements as the input sequence. This restriction ensures consistent behavior and allows for proper batching of operations.

```scala
import kyo.*

// This is valid
val validSource = Batch.sourceSeq[Int, String, Any] { seq =>
seq.map(_.toString)
}

// This would cause a runtime error
val invalidSource = Batch.sourceSeq[Int, Int, Any] { seq =>
seq.filter(_ % 2 == 0)
}
```

It's crucial to understand that the batching is done based on the identity of the provided source function. To ensure proper batching, it's necessary to reuse the function returned by `Batch.source`. Creating a new source for each operation will prevent effective batching. For example:

```scala
import kyo.*

// Correct usage: reusing the source
val source = Batch.sourceSeq[Int, Int, IO] { seq =>
IO(seq.map(_ * 2))
}

val goodBatch = for
a <- Batch.eval(1 to 1000)
b <- source(a) // This will be batched
c <- source(b) // This will also be batched
yield c

// Incorrect usage: creating new sources inline
val badBatch = for
a <- Batch.eval(1 to 1000)
b <- Batch.sourceSeq[Int, Int, IO](seq => IO(seq.map(_ * 2)))(a) // This won't be batched
c <- Batch.sourceSeq[Int, Int, IO](seq => IO(seq.map(_ * 2)))(b) // This also won't be batched
yield c
```

### Choice: Exploratory Branching

The `Choice` effect is designed to aid in handling and exploring multiple options, pathways, or outcomes in a computation. This effect is particularly useful in scenarios where you're dealing with decision trees, backtracking algorithms, or any situation that involves dynamically exploring multiple options.
Expand Down Expand Up @@ -1059,7 +1142,7 @@ val n: Array[Int] = a.toArray

// Flatten a nested chunk
val o: Chunk[Int] =
Chunk(a, b).flatten
Chunk(a, b).flattenChunk

// Obtain sequentially distinct elements.
// Outputs: Chunk(1, 2, 3, 1)
Expand Down
4 changes: 2 additions & 2 deletions kyo-data/shared/src/main/scala/kyo/Chunk.scala
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ sealed abstract class Chunk[A] extends Seq[A] derives CanEqual:
* @return
* a flattened Chunk
*/
final def flatten[B](using ev: A =:= Chunk[B]): Chunk[B] =
final def flattenChunk[B](using ev: A =:= Chunk[B]): Chunk[B] =
if isEmpty then Chunk.empty
else
val nested = this.toArrayInternal
Expand All @@ -280,7 +280,7 @@ sealed abstract class Chunk[A] extends Seq[A] derives CanEqual:
copy()

Compact(unnested)
end flatten
end flattenChunk

/** Copies the elements of this Chunk to an array.
*
Expand Down
16 changes: 8 additions & 8 deletions kyo-data/shared/src/test/scala/kyo/ChunkTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -470,41 +470,41 @@ class ChunkTest extends Test:
}
}

"flatten" - {
"flattenChunk" - {
"flattens a chunk of chunks in original order" in {
val chunk: Chunk[Chunk[Int]] = Chunk(Chunk(1, 2), Chunk(3, 4, 5), Chunk(6, 7, 8, 9))
assert(chunk.flatten == Chunk(1, 2, 3, 4, 5, 6, 7, 8, 9))
assert(chunk.flattenChunk == Chunk(1, 2, 3, 4, 5, 6, 7, 8, 9))
}

"returns an empty chunk when flattening an empty chunk of chunks" in {
val chunk = Chunk.empty[Chunk[Int]]
assert(chunk.flatten.isEmpty)
assert(chunk.flattenChunk.isEmpty)
}

"returns an empty chunk when flattening a chunk of empty chunks" in {
val chunk = Chunk(Chunk.empty[Int], Chunk.empty[Int])
assert(chunk.flatten.isEmpty)
assert(chunk.flattenChunk.isEmpty)
}

"preserves the order of elements within each chunk" in {
val chunk = Chunk(Chunk(1, 2, 3), Chunk(4, 5, 6))
assert(chunk.flatten == Chunk(1, 2, 3, 4, 5, 6))
assert(chunk.flattenChunk == Chunk(1, 2, 3, 4, 5, 6))
}

"handles a chunk containing a single chunk" in {
val chunk = Chunk(Chunk(1, 2, 3))
assert(chunk.flatten == Chunk(1, 2, 3))
assert(chunk.flattenChunk == Chunk(1, 2, 3))
}

"handles chunks of different sizes" in {
val chunk = Chunk(Chunk(1), Chunk(2, 3), Chunk(4, 5, 6, 7))
assert(chunk.flatten == Chunk(1, 2, 3, 4, 5, 6, 7))
assert(chunk.flattenChunk == Chunk(1, 2, 3, 4, 5, 6, 7))
}

"appends" in {
val chunk: Chunk[Chunk[Int]] =
Chunk(Chunk.empty[Int].append(1).append(2), Chunk(3, 4).append(5), Chunk(6, 7).append(8).append(9))
assert(chunk.flatten == Chunk(1, 2, 3, 4, 5, 6, 7, 8, 9))
assert(chunk.flattenChunk == Chunk(1, 2, 3, 4, 5, 6, 7, 8, 9))
}
}

Expand Down
171 changes: 171 additions & 0 deletions kyo-prelude/shared/src/main/scala/kyo/Batch.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package kyo

import Batch.internal.*
import kyo.Tag
import kyo.kernel.*

/** The Batch effect allows for efficient batching and processing of operations.
  *
  * Batch is used to group multiple operations together and execute them in a single batch, which can lead to performance improvements,
  * especially when dealing with external systems or databases.
  *
  * Computations are built with `Batch.eval`, `Batch.foreach` and the functions returned by `Batch.source`, `Batch.sourceMap` and
  * `Batch.sourceSeq`, and are executed with `Batch.run`.
  *
  * @tparam S
  *   Effects from batch sources
  */
sealed trait Batch[+S] extends ArrowEffect[Op[*, S], Id]

object Batch:

import internal.*

// Every `Batch[S]` is suspended and handled under the single tag of `Batch[Any]`; the cast only re-labels that tag's static type.
private inline def erasedTag[S]: Tag[Batch[S]] = Tag[Batch[Any]].asInstanceOf[Tag[Batch[S]]]

/** Builds a batched lookup function out of a generic source.
  *
  * The source `f` receives the inputs collected for one batch and returns, possibly after performing effects `S`, a function that
  * resolves each individual input to its result. Because that inner function returns `B < S`, every element may carry its own effects.
  *
  * `Batch.run` groups pending calls by the source function itself, so the function returned here should be created once and reused
  * for its calls to be batched together.
  *
  * @param f
  *   The source function, of shape `Seq[A] => (A => B < S) < S`:
  *   - the `Seq[A]` argument holds the input values to be processed in one batch
  *   - the resulting `A => B < S` maps a single input to its result
  *   - the outer `< S` means that producing this function may itself involve effects `S`
  * @return
  *   A function that takes a single input `A` and suspends a call to the source, yielding `B < Batch[S]`
  */
inline def source[A, B, S](f: Seq[A] => (A => B < S) < S)(using inline frame: Frame): A => B < Batch[S] =
    (input: A) => ArrowEffect.suspend[B](erasedTag[S], Op.Call(input, f))

/** Builds a batched lookup function from a source that answers a whole batch with a `Map`.
  *
  * The map returned by `f` must contain as many entries as the batch has inputs; otherwise `require` fails with a message that
  * includes the position where the source was created. Individual results are then looked up in the map by input.
  *
  * @param f
  *   The source function that takes a sequence of inputs and returns a Map from input to result
  * @return
  *   A function that takes a single input and returns a batched computation
  */
inline def sourceMap[A, B, S](f: Seq[A] => Map[A, B] < S)(using inline frame: Frame): A => B < Batch[S] =
    source[A, B, S] { keys =>
        f(keys).map { results =>
            require(
                keys.size == results.size,
                s"Source created at ${frame.parse.position} returned a different number of elements than input: ${keys.size} != ${results.size}"
            )
            (key: A) => (results(key): B < S)
        }
    }

/** Builds a batched lookup function from a source that answers a whole batch with a `Seq`.
  *
  * The sequence returned by `f` must have the same number of elements as the input; otherwise `require` fails with a message that
  * includes the position where the source was created. Inputs and outputs are paired by position and handed to `sourceMap`.
  *
  * @param f
  *   The source function that takes a sequence of inputs and returns a sequence of results
  * @return
  *   A function that takes a single input and returns a batched computation
  */
inline def sourceSeq[A, B, S](f: Seq[A] => Seq[B] < S)(using inline frame: Frame): A => B < Batch[S] =
    sourceMap[A, B, S] { keys =>
        f(keys).map { values =>
            require(
                keys.size == values.size,
                s"Source created at ${frame.parse.position} returned a different number of elements than input: ${keys.size} != ${values.size}"
            )
            // Pair each input with the output found at the same position.
            val pairs = keys.zip(values)
            pairs.toMap
        }
    }

/** Evaluates a sequence of values in a batch.
  *
  * The rest of the computation is continued once for each element of `seq`; `Batch.run` collects the outcomes into a `Seq`.
  *
  * @param seq
  *   The sequence of values to evaluate
  * @return
  *   A batched computation that yields a single element of the sequence to the code that follows it
  */
inline def eval[A](seq: Seq[A])(using inline frame: Frame): A < Batch[Any] =
ArrowEffect.suspend[A](erasedTag[Any], Op.Eval(seq))

/** Applies a function to each element of a sequence in a batched context.
  *
  * This method is similar to `Kyo.foreach`, but instead of returning a `Seq[B]`, it returns a single value of type `B`. It suspends
  * the same `Op.Eval` operation as `Batch.eval`, handing `f` to `ArrowEffect.suspendMap`.
  *
  * @param seq
  *   The sequence of values to process
  * @param f
  *   The function to apply to each element
  * @return
  *   A batched computation that produces a single value of type B
  */
inline def foreach[A, B, S](seq: Seq[A])(inline f: A => B < S): B < (Batch[Any] & S) =
ArrowEffect.suspendMap[A](erasedTag[Any], Op.Eval(seq))(f)

/** Runs a computation with Batch effect, executing all batched operations.
  *
  * The computation is advanced in rounds. In each round every branch is run until it either completes or suspends on a source call.
  * The suspended calls are grouped by source, each source is invoked once with the distinct inputs of its group, and the branches are
  * resumed with the looked-up results. Resumed branches are put back at the positions they were suspended at, so the results keep the
  * order of the branches that produced them even when only some branches call a source.
  *
  * @param v
  *   The computation to run
  * @return
  *   A sequence of results from executing the batched operations
  */
def run[A: Flat, S, S2](v: A < (Batch[S] & S2))(using Frame): Seq[A] < (S & S2) =

    // A computation suspended on a Batch operation, together with the continuation that resumes it.
    case class Cont(op: Op[?, S], cont: Any => (Cont | A) < (Batch[S] & S & S2))

    // Runs a computation up to its next Batch operation (captured as a `Cont`) or to its final value.
    def runCont(v: (Cont | A) < (Batch[S] & S & S2)): (Cont | A) < (S & S2) =
        // TODO workaround, Flat macro isn't inferring correctly with nested classes
        import Flat.unsafe.bypass
        ArrowEffect.handle(erasedTag[S], v) {
            [C] => (input, cont) => Cont(input, v => cont(v.asInstanceOf[C]))
        }
    end runCont

    // A branch waiting for `source` to resolve the input `v`.
    case class Expanded(v: Any, source: Source[Any, Any, S], cont: Any => (Cont | A) < (Batch[S] & S & S2))

    // Unfolds every `Op.Eval` (recursively) into one branch per element, leaving only source calls and final values.
    def expand(state: Seq[Cont | A]): Seq[Expanded | A] < (S & S2) =
        Kyo.foreach(state) {
            case Cont(Op.Eval(seq), cont) =>
                Kyo.foreach(seq)(v => runCont(cont(v))).map(expand)
            case Cont(Op.Call(v, source), cont) =>
                Seq(Expanded(v, source.asInstanceOf, cont))
            case a: A @unchecked =>
                Seq(a)
        }.map(_.flatten)

    def loop(state: Seq[Cont | A]): Seq[A] < (S & S2) =
        expand(state).map { expanded =>
            // Remember the position of every branch so the output order can be restored after batching.
            val (completed, pending) =
                expanded.zipWithIndex.partitionMap {
                    case (e: Expanded @unchecked, idx) => Right((e, idx))
                    case (a: A @unchecked, idx)        => Left((a, idx))
                }

            if pending.isEmpty then
                completed.sortBy(_._2).map(_._1)
            else
                // Calls are batched per source function instance.
                val grouped = pending.groupBy(_._1.source)

                Kyo.foreach(grouped.toSeq) { case (source, items) =>
                    val (expandedItems, indices) = items.unzip
                    val values                   = expandedItems.map(_.v)
                    // The source only sees each distinct input once.
                    val uniqueValues = values.distinct

                    source(uniqueValues).map { map =>
                        // Resume every waiting branch (duplicates included) with the result for its input.
                        Kyo.foreach(expandedItems.zip(indices)) { case (e, idx) =>
                            runCont(map(e.v).map(e.cont)).map((_, idx))
                        }
                    }
                }.map { results =>
                    // Put resumed branches back at the positions they were suspended at. Appending them after the
                    // completed values would reorder the output whenever only some branches call a source, and the
                    // iteration order of `grouped` would leak into the result.
                    val merged: Seq[(Cont | A, Int)] = completed ++ results.flatten
                    loop(merged.sortBy(_._2).map(_._1))
                }
            end if
        }
    end loop
    runCont(v).map(c => loop(Seq(c)))
end run

// Internal encoding of the operations suspended by the Batch effect.
object internal:
// A batch source: receives the inputs gathered for one batch and produces a per-input lookup function.
type Source[A, B, S] = Seq[A] => (A => B < S) < S

// Operations interpreted by `Batch.run`.
enum Op[V, -S]:
// Fan out: `Batch.run` continues the computation once per element of `seq`.
case Eval[A](seq: Seq[A]) extends Op[A, Any]
// Request the result of `source` for the single input `v`; `Batch.run` groups pending calls by `source`.
case Call[A, B, S](v: A, source: Source[A, B, S]) extends Op[B, S]
end internal

end Batch
2 changes: 1 addition & 1 deletion kyo-prelude/shared/src/main/scala/kyo/Choice.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ object Choice:
ArrowEffect.handle(Tag[Choice], v.map(Chunk[A](_))) {
[C] =>
(input, cont) =>
Kyo.foreach(input)(v => Choice.run(cont(v))).map(_.flatten.flatten)
Kyo.foreach(input)(v => Choice.run(cont(v))).map(_.flattenChunk.flattenChunk)
}

end Choice
2 changes: 1 addition & 1 deletion kyo-prelude/shared/src/main/scala/kyo/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)):
handle = [C] =>
(input, state, cont) =>
(state.append(input), cont(Continue())),
done = (state, _) => state.flatten
done = (state, _) => state.flattenChunk
)

end Stream
Expand Down
Loading

0 comments on commit f359d2d

Please sign in to comment.