Skip to content

Commit

Permalink
unaryToStream also with content-length
Browse files Browse the repository at this point in the history
  • Loading branch information
hamnis committed Dec 5, 2024
1 parent c1c5a59 commit 1d935af
Showing 1 changed file with 31 additions and 29 deletions.
60 changes: 31 additions & 29 deletions core/src/main/scala/org/http4s/grpc/ClientGrpc.scala
Original file line number Diff line number Diff line change
Expand Up @@ -65,35 +65,37 @@ object ClientGrpc {
)( // Stuff we apply at invocation
message: A,
ctx: Headers,
): Stream[F, B] = {
val req = Request(Method.POST, baseUri / serviceName / methodName, HttpVersion.`HTTP/2`)
.putHeaders(
SharedGrpc.TE,
SharedGrpc.GrpcEncoding,
SharedGrpc.GrpcAcceptEncoding,
SharedGrpc.ContentType,
)
.putHeaders(ctx.headers.map(Header.ToRaw.rawToRaw): _*)
.withBodyStream(codecs.Messages.encodeSingle(encode)(message))
.withAttribute(H2Keys.Http2PriorKnowledge, ())
): Stream[F, B] =
Stream.eval(codecs.Messages.encodeToChunk(encode)(message)).flatMap { chunk =>
val req = Request[F](Method.POST, baseUri / serviceName / methodName, HttpVersion.`HTTP/2`)
.putHeaders(
SharedGrpc.TE,
SharedGrpc.GrpcEncoding,
SharedGrpc.GrpcAcceptEncoding,
SharedGrpc.ContentType,
`Content-Length`(chunk.size.toLong),
)
.putHeaders(ctx.headers.map(Header.ToRaw.rawToRaw): _*)
.withBodyStream(Stream.chunk(chunk))
.withAttribute(H2Keys.Http2PriorKnowledge, ())

Stream
.resource(client.run(req))
.flatMap(resp =>
Stream.eval(handleFailure(resp.headers)).drain ++
codecs.Messages
.decode[F, B](decode)(resp.body)
.handleErrorWith(e =>
Stream.eval(
resp.trailerHeaders
.flatMap(handleFailure[F])
.attempt
.flatMap(t => t.as(e).merge.raiseError[F, B])
)
) ++
Stream.eval(resp.trailerHeaders).evalMap(handleFailure[F]).drain
)
}
Stream
.resource(client.run(req))
.flatMap(resp =>
Stream.eval(handleFailure(resp.headers)).drain ++
codecs.Messages
.decode[F, B](decode)(resp.body)
.handleErrorWith(e =>
Stream.eval(
resp.trailerHeaders
.flatMap(handleFailure[F])
.attempt
.flatMap(t => t.as(e).merge.raiseError[F, B])
)
) ++
Stream.eval(resp.trailerHeaders).evalMap(handleFailure[F]).drain
)
}

def streamToUnary[F[_]: Concurrent, A, B]( // Stuff We can provide via codegen
encode: Encoder[A],
Expand All @@ -107,7 +109,7 @@ object ClientGrpc {
message: Stream[F, A],
ctx: Headers,
): F[B] = {
val req = Request(Method.POST, baseUri / serviceName / methodName, HttpVersion.`HTTP/2`)
val req = Request[F](Method.POST, baseUri / serviceName / methodName, HttpVersion.`HTTP/2`)
.putHeaders(
SharedGrpc.TE,
SharedGrpc.GrpcEncoding,
Expand Down

0 comments on commit 1d935af

Please sign in to comment.