Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BIO: Add multiple collection operators, port error-accumulating collection operators from IzEither, add IO2#suspendSafe #2024

Merged
merged 4 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -1347,7 +1347,8 @@ lazy val `fundamentals-reflection` = project.in(file("fundamentals/fundamentals-
lazy val `fundamentals-bio` = project.in(file("fundamentals/fundamentals-bio"))
.dependsOn(
`fundamentals-language` % "test->compile;compile->compile",
`fundamentals-orphans` % "test->compile;compile->compile"
`fundamentals-orphans` % "test->compile;compile->compile",
`fundamentals-collections` % "test->compile;compile->compile"
)
.settings(
libraryDependencies ++= Seq(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package izumi.functional.bio

final class ErrorAccumulatingOpsTestZIO extends ErrorAccumulatingOpsTest[zio.IO] {
private val runner: UnsafeRun2[zio.IO] = UnsafeRun2.createZIO()

override implicit def F: Error2[zio.IO] = Root.Convert3To2(Root.BIOZIO)
override def unsafeRun[E, A](f: zio.IO[E, A]): Either[E, A] = runner.unsafeRun(f.attempt)
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ trait Applicative3[F[-_, +_, +_]] extends Functor3[F] {
def traverse_[R, E, A](l: Iterable[A])(f: A => F[R, E, Unit]): F[R, E, Unit] = void(traverse(l)(f))
def sequence[R, E, A](l: Iterable[F[R, E, A]]): F[R, E, List[A]] = traverse(l)(identity)
def sequence_[R, E](l: Iterable[F[R, E, Unit]]): F[R, E, Unit] = void(traverse(l)(identity))
def flatTraverse[R, E, A, B](l: Iterable[A])(f: A => F[R, E, Iterable[B]]): F[R, E, List[B]] = map(traverse(l)(f))(_.flatten)
def flatSequence[R, E, A](l: Iterable[F[R, E, Iterable[A]]]): F[R, E, List[A]] = flatTraverse(l)(identity)
def collect[R, E, A, B](l: Iterable[A])(f: A => F[R, E, Option[B]]): F[R, E, List[B]] = map(traverse(l)(f))(_.flatten)
def filter[R, E, A](l: Iterable[A])(f: A => F[R, E, Boolean]): F[R, E, List[A]] = collect(l)(a => map(f(a))(if (_) Some(a) else None))

def unit: F[Any, Nothing, Unit] = pure(())
@inline final def traverse[R, E, A, B](o: Option[A])(f: A => F[R, E, B]): F[R, E, Option[B]] = o match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package izumi.functional.bio

import izumi.fundamentals.platform.language.SourceFilePositionMaterializer

trait Error3[F[-_, +_, +_]] extends ApplicativeError3[F] with Monad3[F] {
import scala.annotation.nowarn

trait Error3[F[-_, +_, +_]] extends ApplicativeError3[F] with Monad3[F] with ErrorAccumulatingOps3[F] {

def catchAll[R, E, A, E2](r: F[R, E, A])(f: E => F[R, E2, A]): F[R, E2, A]

Expand Down Expand Up @@ -56,6 +58,12 @@ trait Error3[F[-_, +_, +_]] extends ApplicativeError3[F] with Monad3[F] {
catchAll(r: F[R1, E, A])(e => flatMap(f(e))(if (_) fail(e) else retryUntilF(r)(f)))
}

@nowarn("msg=Unused import")
def partition[R, E, A](l: Iterable[F[R, E, A]]): F[R, Nothing, (List[E], List[A])] = {
import scala.collection.compat.*
map(traverse(l)(attempt[R, E, A]))(_.partitionMap(identity))
}

/** for-comprehensions sugar:
*
* {{{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package izumi.functional.bio

import izumi.fundamentals.collections.nonempty.NEList

import scala.collection.compat.*
import scala.collection.compat.immutable.LazyList
import scala.collection.compat.immutable.LazyList.#::
import scala.collection.immutable.Queue

trait ErrorAccumulatingOps3[F[-_, +_, +_]] { this: Error3[F] =>

/** `traverse` with error accumulation */
def traverseAccumErrors[ColR[x] <: IterableOnce[x], ColL[_], R, E, A, B](
col: ColR[A]
)(f: A => F[R, ColL[E], B]
)(implicit
buildR: Factory[B, ColR[B]],
buildL: Factory[E, ColL[E]],
iterL: ColL[E] => IterableOnce[E],
): F[R, ColL[E], ColR[B]] = {
accumulateErrorsImpl(col)(
effect = f,
onLeft = (l: ColL[E]) => iterL(l),
init = Queue.empty[B],
onRight = (acc: Queue[B], v: B) => acc :+ v,
end = (acc: Queue[B]) => acc.to(buildR),
)
}

/** `traverse_` with error accumulation */
def traverseAccumErrors_[ColR[x] <: IterableOnce[x], ColL[_], R, E, A](
col: ColR[A]
)(f: A => F[R, ColL[E], Unit]
)(implicit
buildL: Factory[E, ColL[E]],
iterL: ColL[E] => IterableOnce[E],
): F[R, ColL[E], Unit] = {
accumulateErrorsImpl(col)(
effect = f,
onLeft = (l: ColL[E]) => iterL(l),
init = (),
onRight = (acc: Unit, _: Unit) => acc,
end = (acc: Unit) => acc,
)
}

/** `sequence` with error accumulation */
def sequenceAccumErrors[ColR[x] <: IterableOnce[x], ColL[_], R, E, A](
col: ColR[F[R, ColL[E], A]]
)(implicit
buildR: Factory[A, ColR[A]],
buildL: Factory[E, ColL[E]],
iterL: ColL[E] => IterableOnce[E],
): F[R, ColL[E], ColR[A]] = {
traverseAccumErrors(col)(identity)
}

/** `sequence_` with error accumulation */
def sequenceAccumErrors_[ColR[x] <: IterableOnce[x], ColL[_], R, E, A](
col: ColR[F[R, ColL[E], A]]
)(implicit
buildL: Factory[E, ColL[E]],
iterL: ColL[E] => IterableOnce[E],
): F[R, ColL[E], Unit] = {
traverseAccumErrors_(col)(void(_))
}

/** `sequence` with error accumulation */
def sequenceAccumErrorsNEList[ColR[x] <: IterableOnce[x], R, E, A](
col: ColR[F[R, E, A]]
)(implicit buildR: Factory[A, ColR[A]]
): F[R, NEList[E], ColR[A]] = {
accumulateErrorsImpl(col)(
effect = identity,
onLeft = (e: E) => Seq(e),
init = Queue.empty[A],
onRight = (ac: Queue[A], a: A) => ac :+ a,
end = (ac: Queue[A]) => ac.to(buildR),
)
}

/** `flatTraverse` with error accumulation */
def flatTraverseAccumErrors[ColR[x] <: IterableOnce[x], ColIn[x] <: IterableOnce[x], ColL[_], R, E, A, B](
col: ColR[A]
)(f: A => F[R, ColL[E], ColIn[B]]
)(implicit
buildR: Factory[B, ColR[B]],
buildL: Factory[E, ColL[E]],
iterL: ColL[E] => IterableOnce[E],
): F[R, ColL[E], ColR[B]] = {
accumulateErrorsImpl(col)(
effect = f,
onLeft = (l: ColL[E]) => iterL(l),
init = Queue.empty[B],
onRight = (acc: Queue[B], v: IterableOnce[B]) => acc ++ v,
end = (acc: Queue[B]) => acc.to(buildR),
)
}

/** `flatSequence` with error accumulation */
def flatSequenceAccumErrors[ColR[x] <: IterableOnce[x], ColIn[x] <: IterableOnce[x], ColL[_], R, E, A](
col: ColR[F[R, ColL[E], ColIn[A]]]
)(implicit
buildR: Factory[A, ColR[A]],
buildL: Factory[E, ColL[E]],
iterL: ColL[E] => IterableOnce[E],
): F[R, ColL[E], ColR[A]] = {
flatTraverseAccumErrors(col)(identity)
}

protected[this] def accumulateErrorsImpl[ColL[_], ColR[x] <: IterableOnce[x], R, E, E1, A, B, B1, AC](
col: ColR[A]
)(effect: A => F[R, E, B],
onLeft: E => IterableOnce[E1],
init: AC,
onRight: (AC, B) => AC,
end: AC => B1,
)(implicit buildL: Factory[E1, ColL[E1]]
): F[R, ColL[E1], B1] = {
def go(
bad: Queue[E1],
good: AC,
lazyList: LazyList[A],
allGood: Boolean,
): F[R, ColL[E1], B1] = {
lazyList match {
case h #:: tail =>
redeem(effect(h))(
e => go(bad ++ onLeft(e), good, tail, allGood = false),
v => {
val newGood = onRight(good, v)
go(bad, newGood, tail, allGood)
},
)
case _ =>
if (allGood) {
pure(end(good))
} else {
fail(bad.to(buildL))
}
}
}

go(Queue.empty[E1], init, col.iterator.to(LazyList), allGood = true)
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package izumi.functional.bio

import scala.collection.compat.*
import scala.util.Try

trait IO3[F[-_, +_, +_]] extends Panic3[F] {
Expand All @@ -22,7 +23,7 @@ trait IO3[F[-_, +_, +_]] extends Panic3[F] {
* {{{
* import izumi.functional.bio.F
*
* val referentiallyTransparentArrayAllocation: F[Nothing, Array[Byte]] = {
* val exceptionSafeArrayAllocation: F[Nothing, Array[Byte]] = {
* F.sync(new Array(256))
* }
* }}}
Expand All @@ -34,8 +35,12 @@ trait IO3[F[-_, +_, +_]] extends Panic3[F] {
*/
def sync[A](effect: => A): F[Any, Nothing, A]

/** Capture a side-effectful block of code that can throw exceptions and returns another effect */
def suspend[R, A](effect: => F[R, Throwable, A]): F[R, Throwable, A] = flatten(syncThrowable(effect))

/** Capture an _exception-safe_ side-effect that returns another effect */
def suspendSafe[R, E, A](effect: => F[R, E, A]): F[R, E, A] = flatten(sync(effect))

@inline final def apply[A](effect: => A): F[Any, Throwable, A] = syncThrowable(effect)

// defaults
Expand All @@ -49,4 +54,48 @@ trait IO3[F[-_, +_, +_]] extends Panic3[F] {
override def fromTry[A](effect: => Try[A]): F[Any, Throwable, A] = {
syncThrowable(effect.get)
}

override protected[this] def accumulateErrorsImpl[ColL[_], ColR[x] <: IterableOnce[x], R, E, E1, A, B, B1, AC](
col: ColR[A]
)(effect: A => F[R, E, B],
onLeft: E => IterableOnce[E1],
init: AC,
onRight: (AC, B) => AC,
end: AC => B1,
)(implicit buildL: Factory[E1, ColL[E1]]
): F[R, ColL[E1], B1] = {
suspendSafe {
val bad = buildL.newBuilder

val iterator = col.iterator
var good = init
var allGood = true

def go(): F[R, ColL[E1], B1] = {
suspendSafe(if (iterator.hasNext) {
redeem(effect(iterator.next()))(
e =>
suspendSafe {
allGood = false
bad ++= onLeft(e)
go()
},
v =>
suspendSafe {
good = onRight(good, v)
go()
},
)
} else {
if (allGood) {
pure(end(good))
} else {
fail(bad.result())
}
})
}

go()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,40 @@ trait Monad3[F[-_, +_, +_]] extends Applicative3[F] {
}
}

def foldLeft[R, E, A, AC](l: Iterable[A])(z: AC)(f: (AC, A) => F[R, E, AC]): F[R, E, AC] = {
def go(l: List[A], ac: AC): F[R, E, AC] = {
l match {
case head :: tail => flatMap(f(ac, head))(go(tail, _))
case Nil => pure(ac)
}
}
go(l.toList, z)
}

def find[R, E, A](l: Iterable[A])(f: A => F[R, E, Boolean]): F[R, E, Option[A]] = {
def go(l: List[A]): F[R, E, Option[A]] = {
l match {
case head :: tail => flatMap(f(head))(if (_) pure(Some(head)) else go(tail))
case Nil => pure(None)
}
}
go(l.toList)
}

def collectFirst[R, E, A, B](l: Iterable[A])(f: A => F[R, E, Option[B]]): F[R, E, Option[B]] = {
def go(l: List[A]): F[R, E, Option[B]] = {
l match {
case head :: tail =>
flatMap(f(head)) {
case res @ Some(_) => pure(res)
case None => go(tail)
}
case Nil => pure(None)
}
}
go(l.toList)
}

/**
* Execute an action repeatedly until its result fails to satisfy the given predicate
* and return that result, discarding all others.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ open class AsyncZio extends Async3[ZIO] /*with Local3[ZIO]*/ {

ZIO.suspend(effect)
}
@inline override final def suspendSafe[R, E, A](effect: => ZIO[R, E, A]): ZIO[R, E, A] = {
val byName: () => ZIO[R, E, A] = () => effect
implicit val trace: zio.Trace = InteropTracer.newTrace(byName)

ZIO.suspendSucceed(effect)
}

@inline override final def fail[E](v: => E): ZIO[Any, E, Nothing] = {
val byName: () => E = () => v
Expand Down Expand Up @@ -184,6 +190,21 @@ open class AsyncZio extends Async3[ZIO] /*with Local3[ZIO]*/ {
@inline override final def sequence[R, E, A](l: Iterable[ZIO[R, E, A]]): ZIO[R, E, List[A]] = ZIO.collectAll(l.toList)(implicitly, Tracer.instance.empty)
@inline override final def traverse_[R, E, A](l: Iterable[A])(f: A => ZIO[R, E, Unit]): ZIO[R, E, Unit] = ZIO.foreachDiscard(l)(f)(InteropTracer.newTrace(f))
@inline override final def sequence_[R, E](l: Iterable[ZIO[R, E, Unit]]): ZIO[R, E, Unit] = ZIO.foreachDiscard(l)(identity)(Tracer.instance.empty)
@inline override final def filter[R, E, A](l: Iterable[A])(f: A => ZIO[R, E, Boolean]): ZIO[R, E, List[A]] = ZIO.filter(l.toList)(f)(implicitly, InteropTracer.newTrace(f))

@inline override final def foldLeft[R, E, A, AC](l: Iterable[A])(z: AC)(f: (AC, A) => ZIO[R, E, AC]): ZIO[R, E, AC] = {
ZIO.foldLeft(l)(z)(f)(InteropTracer.newTrace(f))
}

@inline override final def find[R, E, A](l: Iterable[A])(f: A => ZIO[R, E, Boolean]): ZIO[R, E, Option[A]] = {
val trace = InteropTracer.newTrace(f)

ZIO.collectFirst(l)(a => f(a).map(if (_) Some(a) else None)(trace))(trace)
}

@inline override final def collectFirst[R, E, A, B](l: Iterable[A])(f: A => ZIO[R, E, Option[B]]): ZIO[R, E, Option[B]] = {
ZIO.collectFirst(l)(f)(InteropTracer.newTrace(f))
}

@inline override final def sandbox[R, E, A](r: ZIO[R, E, A]): ZIO[R, Exit.Failure[E], A] = {
implicit val trace: zio.Trace = Tracer.instance.empty
Expand Down
Loading