Skip to content

Commit

Permalink
Merge pull request #143 from i10416/tweak/use-deferred-instead-of-ref
Browse files Browse the repository at this point in the history
tweak(core): use Deferred instad of Ref
  • Loading branch information
danicheg authored Nov 26, 2024
2 parents 0c90c3f + 271a327 commit 211b47c
Showing 1 changed file with 24 additions and 24 deletions.
48 changes: 24 additions & 24 deletions core/src/main/scala/org/http4s/grpc/ServerGrpc.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ object ServerGrpc {
): HttpRoutes[F] = HttpRoutes.of[F] {
case req @ POST -> Root / sN / mN if sN === serviceName && mN === methodName =>
for {
status <- Ref.of[F, (Int, Option[String])]((0, Option.empty))
status <- Deferred[F, (Int, Option[String])]
trailers = status.get.map { case (i, message) =>
Headers(
NamedHeaders.GrpcStatus(i)
Expand All @@ -51,11 +51,11 @@ object ServerGrpc {
.evalMap(f(_, req.headers))
.flatMap(codecs.Messages.encodeSingle(encode)(_))
.through(timeoutStream(_)(timeout.map(_.duration)))
.onFinalizeCase {
case Resource.ExitCase.Errored(_: TimeoutException) => status.set((4, None))
case Resource.ExitCase.Errored(e) => status.set((2, e.toString().some))
case Resource.ExitCase.Canceled => status.set((1, None))
case _ => ().pure[F]
.onFinalizeCaseWeak {
case Resource.ExitCase.Errored(_: TimeoutException) => status.complete((4, None)).void
case Resource.ExitCase.Errored(e) => status.complete((2, e.toString().some)).void
case Resource.ExitCase.Canceled => status.complete((1, None)).void
case Resource.ExitCase.Succeeded => status.complete((0, None)).void
}
.mask // ensures body closure without rst-stream

Expand All @@ -81,7 +81,7 @@ object ServerGrpc {
): HttpRoutes[F] = HttpRoutes.of[F] {
case req @ POST -> Root / sN / mN if sN === serviceName && mN === methodName =>
for {
status <- Ref.of[F, (Int, Option[String])]((0, Option.empty))
status <- Deferred[F, (Int, Option[String])]
trailers = status.get.map { case (i, message) =>
Headers(
NamedHeaders.GrpcStatus(i)
Expand All @@ -94,11 +94,11 @@ object ServerGrpc {
.flatMap(f(_, req.headers))
.through(codecs.Messages.encode(encode))
.through(timeoutStream(_)(timeout.map(_.duration)))
.onFinalizeCase {
case Resource.ExitCase.Errored(_: TimeoutException) => status.set((4, None))
case Resource.ExitCase.Errored(e) => status.set((2, e.toString().some))
case Resource.ExitCase.Canceled => status.set((1, None))
case _ => ().pure[F]
.onFinalizeCaseWeak {
case Resource.ExitCase.Errored(_: TimeoutException) => status.complete((4, None)).void
case Resource.ExitCase.Errored(e) => status.complete((2, e.toString().some)).void
case Resource.ExitCase.Canceled => status.complete((1, None)).void
case Resource.ExitCase.Succeeded => status.complete((0, None)).void
}
.mask // ensures body closure without rst-stream
Response[F](Status.Ok, HttpVersion.`HTTP/2`)
Expand All @@ -123,7 +123,7 @@ object ServerGrpc {
): HttpRoutes[F] = HttpRoutes.of[F] {
case req @ POST -> Root / sN / mN if sN === serviceName && mN === methodName =>
for {
status <- Ref.of[F, (Int, Option[String])]((0, Option.empty))
status <- Deferred[F, (Int, Option[String])]
trailers = status.get.map { case (i, message) =>
Headers(
NamedHeaders.GrpcStatus(i)
Expand All @@ -136,11 +136,11 @@ object ServerGrpc {
.eval(f(codecs.Messages.decode(decode)(req.body), req.headers))
.flatMap(codecs.Messages.encodeSingle(encode)(_))
.through(timeoutStream(_)(timeout.map(_.duration)))
.onFinalizeCase {
case Resource.ExitCase.Errored(_: TimeoutException) => status.set((4, None))
case Resource.ExitCase.Errored(e) => status.set((2, e.toString().some))
case Resource.ExitCase.Canceled => status.set((1, None))
case _ => ().pure[F]
.onFinalizeCaseWeak {
case Resource.ExitCase.Errored(_: TimeoutException) => status.complete((4, None)).void
case Resource.ExitCase.Errored(e) => status.complete((2, e.toString().some)).void
case Resource.ExitCase.Canceled => status.complete((1, None)).void
case Resource.ExitCase.Succeeded => status.complete((0, None)).void
}
.mask // ensures body closure without rst-stream

Expand All @@ -166,7 +166,7 @@ object ServerGrpc {
): HttpRoutes[F] = HttpRoutes.of[F] {
case req @ POST -> Root / sN / mN if sN === serviceName && mN === methodName =>
for {
status <- Ref.of[F, (Int, Option[String])]((0, Option.empty))
status <- Deferred[F, (Int, Option[String])]
trailers = status.get.map { case (i, message) =>
Headers(
NamedHeaders.GrpcStatus(i)
Expand All @@ -178,11 +178,11 @@ object ServerGrpc {
val body = f(codecs.Messages.decode(decode)(req.body), req.headers)
.through(codecs.Messages.encode(encode))
.through(timeoutStream(_)(timeout.map(_.duration)))
.onFinalizeCase {
case Resource.ExitCase.Errored(_: TimeoutException) => status.set((4, None))
case Resource.ExitCase.Errored(e) => status.set((2, e.toString().some))
case Resource.ExitCase.Canceled => status.set((1, None))
case _ => ().pure[F]
.onFinalizeCaseWeak {
case Resource.ExitCase.Errored(_: TimeoutException) => status.complete((4, None)).void
case Resource.ExitCase.Errored(e) => status.complete((2, e.toString().some)).void
case Resource.ExitCase.Canceled => status.complete((1, None)).void
case Resource.ExitCase.Succeeded => status.complete((0, None)).void
}
.mask // ensures body closure without rst-stream

Expand Down

0 comments on commit 211b47c

Please sign in to comment.