Skip to content

Commit

Permalink
tweak(core): use Deferred instad of Ref
Browse files Browse the repository at this point in the history
  • Loading branch information
i10416 committed Nov 2, 2024
1 parent 71cf68f commit fae648e
Showing 1 changed file with 20 additions and 20 deletions.
40 changes: 20 additions & 20 deletions core/src/main/scala/org/http4s/grpc/ServerGrpc.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ object ServerGrpc {
): HttpRoutes[F] = HttpRoutes.of[F] {
case req @ POST -> Root / sN / mN if sN === serviceName && mN === methodName =>
for {
status <- Ref.of[F, (Int, Option[String])]((0, Option.empty))
status <- Deferred[F, (Int, Option[String])]
trailers = status.get.map { case (i, message) =>
Headers(
NamedHeaders.GrpcStatus(i)
Expand All @@ -40,10 +40,10 @@ object ServerGrpc {
.flatMap(codecs.Messages.encodeSingle(encode)(_))
.through(timeoutStream(_)(timeout.map(_.duration)))
.onFinalizeCase {
case Resource.ExitCase.Errored(_: TimeoutException) => status.set((4, None))
case Resource.ExitCase.Errored(e) => status.set((2, e.toString().some))
case Resource.ExitCase.Canceled => status.set((1, None))
case _ => ().pure[F]
case Resource.ExitCase.Errored(_: TimeoutException) => status.complete((4, None)).void
case Resource.ExitCase.Errored(e) => status.complete((2, e.toString().some)).void
case Resource.ExitCase.Canceled => status.complete((1, None)).void
case Resource.ExitCase.Succeeded => status.complete((0, None)).void
}
.mask // ensures body closure without rst-stream

Expand All @@ -69,7 +69,7 @@ object ServerGrpc {
): HttpRoutes[F] = HttpRoutes.of[F] {
case req @ POST -> Root / sN / mN if sN === serviceName && mN === methodName =>
for {
status <- Ref.of[F, (Int, Option[String])]((0, Option.empty))
status <- Deferred[F, (Int, Option[String])]
trailers = status.get.map { case (i, message) =>
Headers(
NamedHeaders.GrpcStatus(i)
Expand All @@ -83,10 +83,10 @@ object ServerGrpc {
.through(codecs.Messages.encode(encode))
.through(timeoutStream(_)(timeout.map(_.duration)))
.onFinalizeCase {
case Resource.ExitCase.Errored(_: TimeoutException) => status.set((4, None))
case Resource.ExitCase.Errored(e) => status.set((2, e.toString().some))
case Resource.ExitCase.Canceled => status.set((1, None))
case _ => ().pure[F]
case Resource.ExitCase.Errored(_: TimeoutException) => status.complete((4, None)).void
case Resource.ExitCase.Errored(e) => status.complete((2, e.toString().some)).void
case Resource.ExitCase.Canceled => status.complete((1, None)).void
case Resource.ExitCase.Succeeded => status.complete((0, None)).void
}
.mask // ensures body closure without rst-stream
Response[F](Status.Ok, HttpVersion.`HTTP/2`)
Expand All @@ -111,7 +111,7 @@ object ServerGrpc {
): HttpRoutes[F] = HttpRoutes.of[F] {
case req @ POST -> Root / sN / mN if sN === serviceName && mN === methodName =>
for {
status <- Ref.of[F, (Int, Option[String])]((0, Option.empty))
status <- Deferred[F, (Int, Option[String])]
trailers = status.get.map { case (i, message) =>
Headers(
NamedHeaders.GrpcStatus(i)
Expand All @@ -125,10 +125,10 @@ object ServerGrpc {
.flatMap(codecs.Messages.encodeSingle(encode)(_))
.through(timeoutStream(_)(timeout.map(_.duration)))
.onFinalizeCase {
case Resource.ExitCase.Errored(_: TimeoutException) => status.set((4, None))
case Resource.ExitCase.Errored(e) => status.set((2, e.toString().some))
case Resource.ExitCase.Canceled => status.set((1, None))
case _ => ().pure[F]
case Resource.ExitCase.Errored(_: TimeoutException) => status.complete((4, None)).void
case Resource.ExitCase.Errored(e) => status.complete((2, e.toString().some)).void
case Resource.ExitCase.Canceled => status.complete((1, None)).void
case Resource.ExitCase.Succeeded => status.complete((0, None)).void
}
.mask // ensures body closure without rst-stream

Expand All @@ -154,7 +154,7 @@ object ServerGrpc {
): HttpRoutes[F] = HttpRoutes.of[F] {
case req @ POST -> Root / sN / mN if sN === serviceName && mN === methodName =>
for {
status <- Ref.of[F, (Int, Option[String])]((0, Option.empty))
status <- Deferred[F, (Int, Option[String])]
trailers = status.get.map { case (i, message) =>
Headers(
NamedHeaders.GrpcStatus(i)
Expand All @@ -167,10 +167,10 @@ object ServerGrpc {
.through(codecs.Messages.encode(encode))
.through(timeoutStream(_)(timeout.map(_.duration)))
.onFinalizeCase {
case Resource.ExitCase.Errored(_: TimeoutException) => status.set((4, None))
case Resource.ExitCase.Errored(e) => status.set((2, e.toString().some))
case Resource.ExitCase.Canceled => status.set((1, None))
case _ => ().pure[F]
case Resource.ExitCase.Errored(_: TimeoutException) => status.complete((4, None)).void
case Resource.ExitCase.Errored(e) => status.complete((2, e.toString().some)).void
case Resource.ExitCase.Canceled => status.complete((1, None)).void
case Resource.ExitCase.Succeeded => status.complete((0, None)).void
}
.mask // ensures body closure without rst-stream

Expand Down

0 comments on commit fae648e

Please sign in to comment.