Skip to content

Commit

Permalink
Merge pull request #67 from JD557/async-io
Browse files Browse the repository at this point in the history
Improve RIO async support
  • Loading branch information
JD557 authored Mar 5, 2021
2 parents 300be8b + 59a8f68 commit 9b01393
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ class HtmlCanvas() extends LowLevelCanvas {
"keydown",
(ev: KeyboardEvent) => {
JsKeyMapping.getKey(ev.keyCode).foreach(k => keyboardInput = keyboardInput.press(k))
println("keydown")
}
)
dom.document.addEventListener[KeyboardEvent](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ object CanvasIO {
/** Store an unsafe canvas operation in a [[CanvasIO]]. */
def accessCanvas[A](f: Canvas => A): CanvasIO[A] = RIO.access[Canvas, A](f)

/** Polls a future and optionally returns its result. */
def pollFuture[A](future: Future[A]): CanvasIO[Option[Try[A]]] = RIO.pollFuture(future)
/** Returns a [[Poll]] from a function that receives a callback */
def fromCallback[A](operation: (Try[A] => Unit) => Unit): CanvasIO[Poll[A]] =
RIO.fromCallback(operation)

/** Fetches the canvas settings. */
val getSettings: CanvasIO[Canvas.Settings] = accessCanvas(_.settings)
Expand Down
73 changes: 73 additions & 0 deletions pure/shared/src/main/scala/eu/joaocosta/minart/pure/Poll.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package eu.joaocosta.minart.pure

import scala.concurrent._
import scala.util.{Try, Failure, Success}

import eu.joaocosta.minart.core._

/** Representation of a running asyncronous computation that can be polled.
*
* Note that, unlike Futures, operations on an Poll don't require an execution context and
* might be applied sequentially every time `poll` is called.
* While this might be inneficient, this is by design, to simplify multiplatform development.
*/
case class Poll[+A](poll: RIO[Any, Option[Try[A]]]) extends AnyVal {

/** Transforms the result of this operation. */
def transform[B](f: Try[A] => Try[B]): Poll[B] =
Poll[B](poll.map(_.map(f)))

/** Maps the result of this operation. */
def map[B](f: A => B): Poll[B] =
this.transform(_.map(f))

/** Combines two operations by applying a function to the result of the first operation. */
def flatMap[B](f: A => Poll[B]): Poll[B] = Poll[B](
poll.flatMap {
case Some(Success(x)) => f(x).poll
case Some(Failure(t)) => RIO.pure(Some(Failure(t)))
case None => RIO.pure(None)
}
)

/** Combines two operations by combining their results with the given function. */
def zipWith[B, C](that: Poll[B])(f: (A, B) => C): Poll[C] = this.flatMap(x => that.map(y => f(x, y)))

/** Combines two operations by combining their results into a tuple. */
def zip[B](that: Poll[B]): Poll[(A, B)] = this.zipWith(that)((x, y) => x -> y)

/** Changes the result of this operation to another value. */
def as[B](x: B): Poll[B] = this.map(_ => x)

/** Changes the result of this operation unit. */
def unit: Poll[Unit] = this.as(())
}

object Poll {

/** Lifts a value into a successful [[Poll]]. */
def successful[A](x: A): Poll[A] = Poll(RIO.pure(Some(Success(x))))

/** Lifts a value into a failed [[Poll]]. */
def failed(t: Throwable): Poll[Nothing] = Poll(RIO.pure(Some(Failure(t))))

/** Creates an [[Poll]] that never returns a value. */
val never: Poll[Nothing] = Poll(RIO.pure(None))

/** Builds an [[Poll]] from a running future */
def fromFuture[A](future: Future[A]): Poll[A] =
Poll(RIO.suspend(future.value))

/** Converts an `Iterable[Poll[A]]` into a `Poll[List[A]]`. */
def sequence[A](it: Iterable[Poll[A]]): Poll[List[A]] =
it.foldLeft[Poll[List[A]]](Poll.successful(Nil)) { case (acc, next) =>
acc.zipWith(next) { case (xs, x) => x :: xs }
}.map(_.reverse)

/** Converts an `Iterable[A]` into a `Poll[List[B]]` by applying an operation to each element. */
def traverse[A, B](it: Iterable[A])(f: A => Poll[B]): Poll[List[B]] =
it.foldLeft[Poll[List[B]]](Poll.successful(Nil)) { case (acc, next) =>
acc.zipWith(f(next)) { case (xs, x) => x :: xs }
}.map(_.reverse)

}
7 changes: 5 additions & 2 deletions pure/shared/src/main/scala/eu/joaocosta/minart/pure/RIO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,11 @@ object RIO {
/** Returns a operation that requires some resource. */
def access[R, A](f: R => A): RIO[R, A] = Suspend[R, A](f)

/** Polls a future and optionally returns its result. */
def pollFuture[A](future: Future[A]): RIO[Any, Option[Try[A]]] = Suspend[Any, Option[Try[A]]](_ => future.value)
/** Returns a [[Poll]] from a function that receives a callback */
def fromCallback[A](operation: (Try[A] => Unit) => Unit): RIO[Any, Poll[A]] = {
val promise = scala.concurrent.Promise[A]()
RIO.suspend(operation(promise.complete)).as(Poll.fromFuture(promise.future))
}

/** Converts an `Iterable[RIO[R, A]]` into a `RIO[R, List[A]]`. */
def sequence[R, A](it: Iterable[RIO[R, A]]): RIO[R, List[A]] =
Expand Down
55 changes: 55 additions & 0 deletions pure/shared/src/test/scala/eu/joaocosta/minart/pure/PollSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package eu.joaocosta.minart.pure

import scala.concurrent._
import scala.util.{Success, Failure}

import verify._

import eu.joaocosta.minart.core._

object PollSpec extends BasicTestSuite {
test("store pure results") {
val error = new Exception("error")
assert(Poll.successful(1).poll.run(()) == Some(Success(1)))
assert(Poll.failed(error).poll.run(()) == Some(Failure(error)))
assert(Poll.never.poll.run(()) == None)
}

test("allow polling Futures") {
val promise = Promise[Int]()
val io = Poll.fromFuture(promise.future)
assert(io.poll.run(()) == None)
promise.complete(scala.util.Success(0))
assert(io.poll.run(()) == Some(Success(0)))
}

test("provide a stack-safe transform operation") {
val io = (1 to 1000).foldLeft[Poll[Int]](Poll.failed(new Exception("error"))) { case (acc, _) =>
acc.transform(_.recover { case _ => 0 }.map(_ + 1))
}
assert(io.poll.run(()) == Some(Success(1000)))
}

test("provide a stack-safe map operation") {
val io = (1 to 1000).foldLeft[Poll[Int]](Poll.successful(0)) { case (acc, _) => acc.map(_ + 1) }
assert(io.poll.run(()) == Some(Success(1000)))
}

test("provide a stack-safe flatMap operation") {
val io = (1 to 1000).foldLeft[Poll[Int]](Poll.successful(0)) { case (acc, _) =>
acc.flatMap(x => Poll.successful(x + 1))
}
assert(io.poll.run(()) == Some(Success(1000)))
}

test("provide zip/zipWith operations") {
assert(Poll.successful(1).zip(Poll.successful(2)).poll.run(()) == Some(Success((1, 2))))

assert(Poll.successful(1).zipWith(Poll.successful(2))(_ + _).poll.run(()) == Some(Success(3)))
}

test("correctly sequence operations") {
val io = Poll.sequence(List(Poll.successful(1), Poll.successful(2), Poll.successful(3)))
assert(io.poll.run(()) == Some(Success(List(1, 2, 3))))
}
}
16 changes: 10 additions & 6 deletions pure/shared/src/test/scala/eu/joaocosta/minart/pure/RIOSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ object RIOSpec extends BasicTestSuite {
assert(hasRun == true)
}

test("allow polling Futures") {
val promise = Promise[Int]()
val io = RIO.pollFuture(promise.future)
assert(io.run(()) == None)
promise.complete(scala.util.Success(0))
assert(io.run(()) == Some(scala.util.Success(0)))
test("allow polling async operations") {
var completer: Int => Unit = (_) => ()
val io = RIO
.fromCallback[Int] { cb =>
completer = (x: Int) => cb(scala.util.Success(x))
}
.run(())
assert(io.poll.run(()) == None)
completer(0)
assert(io.poll.run(()) == Some(scala.util.Success(0)))
}

test("provide a stack-safe map operation") {
Expand Down

0 comments on commit 9b01393

Please sign in to comment.