diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs new file mode 100644 index 0000000..b4075ea --- /dev/null +++ b/.git-blame-ignore-revs @@ -0,0 +1,2 @@ +# #97 Introduce sbt-http4s-org plugin +b92d1dbfcc29d3afcca0d2c143528f07b949dcd3 diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a0bf079..7bf65a2 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -26,11 +26,25 @@ jobs: build: name: Build and Test strategy: + fail-fast: false matrix: os: [ubuntu-latest] scala: [2.13, 3] - java: [temurin@8] + java: [temurin@8, temurin@11, temurin@17] project: [http4s-grpcJVM, http4s-grpcJS, http4s-grpcNative] + exclude: + - scala: 3 + java: temurin@11 + - scala: 3 + java: temurin@17 + - project: http4s-grpcJS + java: temurin@11 + - project: http4s-grpcJS + java: temurin@17 + - project: http4s-grpcNative + java: temurin@11 + - project: http4s-grpcNative + java: temurin@17 runs-on: ${{ matrix.os }} timeout-minutes: 60 steps: @@ -52,9 +66,39 @@ jobs: if: matrix.java == 'temurin@8' && steps.setup-java-temurin-8.outputs.cache-hit == 'false' run: sbt +update + - name: Setup Java (temurin@11) + id: setup-java-temurin-11 + if: matrix.java == 'temurin@11' + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: 11 + cache: sbt + + - name: sbt update + if: matrix.java == 'temurin@11' && steps.setup-java-temurin-11.outputs.cache-hit == 'false' + run: sbt +update + + - name: Setup Java (temurin@17) + id: setup-java-temurin-17 + if: matrix.java == 'temurin@17' + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: 17 + cache: sbt + + - name: sbt update + if: matrix.java == 'temurin@17' && steps.setup-java-temurin-17.outputs.cache-hit == 'false' + run: sbt +update + - name: Check that workflows are up to date run: sbt githubWorkflowCheck + - name: Check headers and formatting + if: matrix.java == 'temurin@8' && matrix.os == 'ubuntu-latest' + run: sbt 'project ${{ matrix.project }}' '++ ${{ matrix.scala }}' headerCheckAll scalafmtCheckAll 'project /' scalafmtSbtCheck + - name: scalaJSLink if: matrix.project == 'http4s-grpcJS' run: sbt 'project ${{ matrix.project }}' '++ ${{ matrix.scala }}' Test/scalaJSLinkerResult @@ -74,6 +118,14 @@ jobs: if: matrix.java == 'temurin@8' && matrix.os == 'ubuntu-latest' run: sbt 'project ${{ matrix.project }}' '++ ${{ matrix.scala }}' doc + - name: Check scalafix lints + if: matrix.java == 'temurin@8' && !startsWith(matrix.scala, '3') + run: sbt 'project ${{ matrix.project }}' '++ ${{ matrix.scala }}' 'scalafixAll --check' + + - name: Check unused compile dependencies + if: matrix.java == 'temurin@8' + run: sbt 'project ${{ matrix.project }}' '++ ${{ matrix.scala }}' unusedCompileDependenciesTest + - name: Make target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') run: mkdir -p core/.native/target core/.js/target codegen/plugin/target core/.jvm/target codegen/generator/target project/target @@ -117,6 +169,32 @@ jobs: if: matrix.java == 'temurin@8' && steps.setup-java-temurin-8.outputs.cache-hit == 'false' run: sbt +update + - name: Setup Java (temurin@11) + id: setup-java-temurin-11 + if: matrix.java == 'temurin@11' + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: 11 + cache: sbt + + - name: sbt update + if: matrix.java == 'temurin@11' && steps.setup-java-temurin-11.outputs.cache-hit == 'false' + run: sbt +update + + - name: Setup Java (temurin@17) + id: setup-java-temurin-17 + if: matrix.java == 'temurin@17' + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: 17 + cache: sbt + + - name: sbt update + if: matrix.java == 'temurin@17' && steps.setup-java-temurin-17.outputs.cache-hit == 'false' + run: sbt +update + - name: Download target directories (2.13, http4s-grpcJVM) uses: actions/download-artifact@v4 with: @@ -201,39 +279,6 @@ jobs: SONATYPE_CREDENTIAL_HOST: ${{ secrets.SONATYPE_CREDENTIAL_HOST }} run: sbt tlCiRelease - dependency-submission: - name: Submit Dependencies - if: github.event_name != 'pull_request' - strategy: - matrix: - os: [ubuntu-latest] - java: [temurin@8] - runs-on: ${{ matrix.os }} - steps: - - name: Checkout current branch (full) - uses: actions/checkout@v4 - with: - fetch-depth: 0 - - - name: Setup Java (temurin@8) - id: setup-java-temurin-8 - if: matrix.java == 'temurin@8' - uses: actions/setup-java@v4 - with: - distribution: temurin - java-version: 8 - cache: sbt - - - name: sbt update - if: matrix.java == 'temurin@8' && steps.setup-java-temurin-8.outputs.cache-hit == 'false' - run: sbt +update - - - name: Submit Dependencies - uses: scalacenter/sbt-dependency-submission@v2 - with: - modules-ignore: http4s-grpcjvm_2.13 http4s-grpcjvm_3 codegeneratortesting_native0.4_2.13 codegeneratortesting_native0.4_3 codegeneratortesting_sjs1_2.13 codegeneratortesting_sjs1_3 site_2.13 site_3 http4s-grpcjs_2.13 http4s-grpcjs_3 http4s-grpcnative_2.13 http4s-grpcnative_3 codegeneratortesting_2.13 codegeneratortesting_3 - configs-ignore: test scala-tool scala-doc-tool test-internal - site: name: Generate Site strategy: @@ -273,6 +318,19 @@ jobs: if: matrix.java == 'temurin@11' && steps.setup-java-temurin-11.outputs.cache-hit == 'false' run: sbt +update + - name: Setup Java (temurin@17) + id: setup-java-temurin-17 + if: matrix.java == 'temurin@17' + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: 17 + cache: sbt + + - name: sbt update + if: matrix.java == 'temurin@17' && steps.setup-java-temurin-17.outputs.cache-hit == 'false' + run: sbt +update + - name: Generate site run: sbt site/tlSite diff --git a/.scalafix.conf b/.scalafix.conf new file mode 100644 index 0000000..ab1b91a --- /dev/null +++ b/.scalafix.conf @@ -0,0 +1,15 @@ +rules = [ + Http4sFs2Linters + Http4sGeneralLinters + Http4sUseLiteralsSyntax + LeakingImplicitClassVal + ExplicitResultTypes + OrganizeImports +] + +triggered.rules = [ + Http4sFs2Linters + Http4sGeneralLinters + Http4sUseLiteralsSyntax + LeakingImplicitClassVal +] diff --git a/.scalafmt.conf b/.scalafmt.conf new file mode 100644 index 0000000..11865a7 --- /dev/null +++ b/.scalafmt.conf @@ -0,0 +1,38 @@ +version = 3.8.0 + +style = default + +maxColumn = 100 + +// Docstring wrapping breaks doctests +docstrings.wrap = false + +// Vertical alignment is pretty, but leads to bigger diffs +align.preset = none + +danglingParentheses.preset = true + +rewrite.rules = [ + AvoidInfix + RedundantBraces + RedundantParens + PreferCurlyFors + SortModifiers +] + +rewrite.sortModifiers.order = [ + override, implicit, private, protected, final, sealed, abstract, lazy +] + +rewrite.trailingCommas.style = multiple + +runner.dialect = scala212 + +fileOverride { + "glob:**/scala-3/**/*.scala" { + runner.dialect = scala3 + } + "glob:**/scala-2.13/**/*.scala" { + runner.dialect = scala213 + } +} diff --git a/build.sbt b/build.sbt index 0647969..c019d31 100644 --- a/build.sbt +++ b/build.sbt @@ -1,13 +1,13 @@ -ThisBuild / tlBaseVersion := "0.0" // your current series x.y +import explicitdeps.ExplicitDepsPlugin.autoImport.moduleFilterRemoveValue + +ThisBuild / tlBaseVersion := "0.1" -ThisBuild / organization := "io.chrisdavenport" -ThisBuild / organizationName := "Christopher Davenport" ThisBuild / licenses := Seq(License.MIT) ThisBuild / developers := List( tlGitHubDev("christopherdavenport", "Christopher Davenport") ) ThisBuild / tlCiReleaseBranches := Seq("main") -ThisBuild / tlSonatypeUseLegacyHost := true +ThisBuild / tlSonatypeUseLegacyHost := false ThisBuild / tlMimaPreviousVersions := Set() @@ -17,6 +17,12 @@ val Scala213 = "2.13.12" ThisBuild / crossScalaVersions := Seq(Scala213, "3.3.1") ThisBuild / scalaVersion := Scala213 +// disable sbt-header plugin until we are not aligned on the license +ThisBuild / headerCheckAll := Nil + +// temporarily disable dependency submissions in CI +ThisBuild / tlCiDependencyGraphJob := false + val catsV = "2.10.0" val catsEffectV = "3.5.4" val fs2V = "3.9.2" @@ -27,41 +33,47 @@ import scalapb.compiler.Version.scalapbVersion // Projects lazy val `http4s-grpc` = tlCrossRootProject .aggregate(core, codeGenerator, codeGeneratorTesting, codeGeneratorPlugin) + .settings( + unusedCompileDependenciesFilter -= moduleFilter() + ) + .disablePlugins(HeaderPlugin) lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform) .crossType(CrossType.Pure) .in(file("core")) .settings( name := "http4s-grpc", - libraryDependencies ++= Seq( - "org.typelevel" %%% "cats-core" % catsV, - "org.typelevel" %%% "cats-effect" % catsEffectV, - - "co.fs2" %%% "fs2-core" % fs2V, - "co.fs2" %%% "fs2-io" % fs2V, - "co.fs2" %%% "fs2-scodec" % fs2V, - - "org.http4s" %%% "http4s-dsl" % http4sV, - "org.http4s" %%% "http4s-ember-server" % http4sV, - "org.http4s" %%% "http4s-ember-client" % http4sV, - - "org.typelevel" %%% "munit-cats-effect" % munitCatsEffectV % Test, - + "org.typelevel" %%% "cats-core" % catsV, + "org.typelevel" %%% "cats-effect" % catsEffectV, + "co.fs2" %%% "fs2-core" % fs2V, + "co.fs2" %%% "fs2-io" % fs2V, + "co.fs2" %%% "fs2-scodec" % fs2V, + "org.http4s" %%% "http4s-dsl" % http4sV, + "org.http4s" %%% "http4s-ember-server" % http4sV, + "org.http4s" %%% "http4s-ember-client" % http4sV, + "org.typelevel" %%% "munit-cats-effect" % munitCatsEffectV % Test, "com.thesamet.scalapb" %%% "scalapb-runtime" % scalapbVersion, - - ) - ).jsSettings( - scalaJSLinkerConfig ~= { _.withModuleKind(ModuleKind.CommonJSModule)}, + ), + unusedCompileDependenciesFilter -= moduleFilter(), ) - -lazy val codeGenerator = project.in(file("codegen/generator")).settings( - name := "http4s-grpc-generator", - crossScalaVersions := Seq(Scala212), - libraryDependencies ++= Seq( - "com.thesamet.scalapb" %% "compilerplugin" % scalapbVersion + .jsSettings( + scalaJSLinkerConfig ~= { _.withModuleKind(ModuleKind.CommonJSModule) } ) -) + .disablePlugins(HeaderPlugin) + +lazy val codeGenerator = + project + .in(file("codegen/generator")) + .settings( + name := "http4s-grpc-generator", + crossScalaVersions := Seq(Scala212), + libraryDependencies ++= Seq( + "com.thesamet.scalapb" %% "compilerplugin" % scalapbVersion + ), + unusedCompileDependenciesFilter -= moduleFilter(), + ) + .disablePlugins(HeaderPlugin, ScalafixPlugin) lazy val codegenFullName = "org.http4s.grpc.generator.Http4sGrpcCodeGenerator" @@ -86,8 +98,10 @@ lazy val codeGeneratorPlugin = project "com.thesamet.scalapb" %% "compilerplugin" % scalapbVersion ), addSbtPlugin("com.thesamet" % "sbt-protoc" % "1.0.7"), - addSbtPlugin("org.portable-scala" % "sbt-platform-deps" % "1.0.2") + addSbtPlugin("org.portable-scala" % "sbt-platform-deps" % "1.0.2"), + unusedCompileDependenciesFilter -= moduleFilter(), ) + .disablePlugins(HeaderPlugin, ScalafixPlugin) lazy val codeGeneratorTesting = crossProject(JVMPlatform, JSPlatform, NativePlatform) .crossType(CrossType.Pure) @@ -95,26 +109,31 @@ lazy val codeGeneratorTesting = crossProject(JVMPlatform, JSPlatform, NativePlat .enablePlugins(LocalCodeGenPlugin, BuildInfoPlugin, NoPublishPlugin) .dependsOn(core) .settings( + tlFatalWarnings := false, codeGenClasspath := (codeGenerator / Compile / fullClasspath).value, Compile / PB.targets := Seq( scalapb.gen(grpc = false) -> (Compile / sourceManaged).value / "scalapb", - genModule(codegenFullName + "$") -> (Compile / sourceManaged).value / "http4s-grpc" + genModule(codegenFullName + "$") -> (Compile / sourceManaged).value / "http4s-grpc", ), Compile / PB.protoSources += baseDirectory.value.getParentFile / "src" / "main" / "protobuf", libraryDependencies ++= Seq( "com.thesamet.scalapb" %%% "scalapb-runtime" % scalapbVersion % "protobuf", "org.typelevel" %%% "munit-cats-effect" % munitCatsEffectV % Test, - "org.typelevel" %%% "scalacheck-effect-munit" % "2.0.0-M2" % Test + "org.typelevel" %%% "scalacheck-effect-munit" % "2.0.0-M2" % Test, ), buildInfoPackage := "org.http4s.grpc.e2e.buildinfo", - buildInfoKeys := Seq[BuildInfoKey]("sourceManaged" -> (Compile / sourceManaged).value / "http4s-grpc"), - githubWorkflowArtifactUpload := false + buildInfoKeys := Seq[BuildInfoKey]( + "sourceManaged" -> (Compile / sourceManaged).value / "http4s-grpc" + ), + githubWorkflowArtifactUpload := false, + unusedCompileDependenciesFilter -= moduleFilter(), ) + .disablePlugins(HeaderPlugin, ScalafixPlugin) - -lazy val site = project.in(file("site")) +lazy val site = project + .in(file("site")) .enablePlugins(TypelevelSitePlugin) .dependsOn(core.jvm) .settings( - tlSiteIsTypelevelProject := Some(TypelevelProject.Affiliate), + tlSiteIsTypelevelProject := Some(TypelevelProject.Affiliate) ) diff --git a/codegen/generator/src/main/scala/org/http4s/grpc/generator/Http4sGrpcCodeGenerator.scala b/codegen/generator/src/main/scala/org/http4s/grpc/generator/Http4sGrpcCodeGenerator.scala index 2709f9f..4817fc3 100644 --- a/codegen/generator/src/main/scala/org/http4s/grpc/generator/Http4sGrpcCodeGenerator.scala +++ b/codegen/generator/src/main/scala/org/http4s/grpc/generator/Http4sGrpcCodeGenerator.scala @@ -33,8 +33,8 @@ object Http4sGrpcCodeGenerator extends CodeGenApp { def generateServiceFiles( file: FileDescriptor, - di: DescriptorImplicits - ): Seq[PluginProtos.CodeGeneratorResponse.File] = { + di: DescriptorImplicits, + ): Seq[PluginProtos.CodeGeneratorResponse.File] = file.getServices.asScala.map { service => val p = new Http4sGrpcServicePrinter(service, di) @@ -45,30 +45,27 @@ object Http4sGrpcCodeGenerator extends CodeGenApp { b.setContent(code) b.build }.toSeq - } - private def parseParameters(params: String): Either[String, (GeneratorParams)] = + private def parseParameters(params: String): Either[String, GeneratorParams] = for { paramsAndUnparsed <- GeneratorParams.fromStringCollectUnrecognized(params) params = paramsAndUnparsed._1 - unparsed = paramsAndUnparsed._2 + // unparsed = paramsAndUnparsed._2 } yield params - def process(request: CodeGenRequest): CodeGenResponse = { + def process(request: CodeGenRequest): CodeGenResponse = parseParameters(request.parameter) match { case Right(params) => val implicits = DescriptorImplicits.fromCodeGenRequest(params, request) val srvFiles = request.filesToGenerate.flatMap(generateServiceFiles(_, implicits)) CodeGenResponse.succeed( srvFiles, - Set(PluginProtos.CodeGeneratorResponse.Feature.FEATURE_PROTO3_OPTIONAL) + Set(PluginProtos.CodeGeneratorResponse.Feature.FEATURE_PROTO3_OPTIONAL), ) case Left(error) => CodeGenResponse.fail(error) } - } - override def registerExtensions(registry: ExtensionRegistry): Unit = { + override def registerExtensions(registry: ExtensionRegistry): Unit = Scalapb.registerAllExtensions(registry) - } } diff --git a/codegen/generator/src/main/scala/org/http4s/grpc/generator/Http4sGrpcServicePrinter.scala b/codegen/generator/src/main/scala/org/http4s/grpc/generator/Http4sGrpcServicePrinter.scala index 316b87f..5270932 100644 --- a/codegen/generator/src/main/scala/org/http4s/grpc/generator/Http4sGrpcServicePrinter.scala +++ b/codegen/generator/src/main/scala/org/http4s/grpc/generator/Http4sGrpcServicePrinter.scala @@ -40,33 +40,36 @@ class Http4sGrpcServicePrinter(service: ServiceDescriptor, di: DescriptorImplici private[this] def serviceMethodSignature(method: MethodDescriptor) = { - val scalaInType = "_root_."+method.inputType.scalaType - val scalaOutType = "_root_."+method.outputType.scalaType + val scalaInType = "_root_." + method.inputType.scalaType + val scalaOutType = "_root_." + method.outputType.scalaType val ctx = s"ctx: $Ctx" s"def ${method.name}" + (method.streamType match { case StreamType.Unary => s"(request: $scalaInType, $ctx): F[$scalaOutType]" - case StreamType.ClientStreaming => s"(request: $Stream[F, $scalaInType], $ctx): F[$scalaOutType]" + case StreamType.ClientStreaming => + s"(request: $Stream[F, $scalaInType], $ctx): F[$scalaOutType]" case StreamType.ServerStreaming => s"(request: $scalaInType, $ctx): $Stream[F, $scalaOutType]" - case StreamType.Bidirectional => s"(request: $Stream[F, $scalaInType], $ctx): $Stream[F, $scalaOutType]" + case StreamType.Bidirectional => + s"(request: $Stream[F, $scalaInType], $ctx): $Stream[F, $scalaOutType]" }) } - private[this] def handleMethod(method: MethodDescriptor) = { + private[this] def handleMethod(method: MethodDescriptor) = method.streamType match { case StreamType.Unary => "unaryToUnary" case StreamType.ClientStreaming => "streamToUnary" case StreamType.ServerStreaming => "unaryToStream" case StreamType.Bidirectional => "streamToStream" } - } private[this] def createClientCall(method: MethodDescriptor) = { val encode = s"$Codec.codecForGenerated(_root_.${method.inputType.scalaType})" val decode = s"$Codec.codecForGenerated(_root_.${method.outputType.scalaType})" val serviceName = method.getService.getFullName val methodName = method.getName - s"""$ClientGrpc.${handleMethod(method)}($encode, $decode, "$serviceName", "$methodName")(client, baseUri)(request, ctx)""" + s"""$ClientGrpc.${handleMethod( + method + )}($encode, $decode, "$serviceName", "$methodName")(client, baseUri)(request, ctx)""" } private[this] def serviceMethodImplementation(method: MethodDescriptor): PrinterEndo = { p => @@ -78,15 +81,17 @@ class Http4sGrpcServicePrinter(service: ServiceDescriptor, di: DescriptorImplici } private[this] def serviceBindingImplementation(method: MethodDescriptor): PrinterEndo = { p => - val serviceCall = s"serviceImpl.${method.name}" - val eval = if (method.isServerStreaming) s"$Stream.eval(mkCtx(m))" else "mkCtx(m)" + // val serviceCall = s"serviceImpl.${method.name}" + // val eval = if (method.isServerStreaming) s"$Stream.eval(mkCtx(m))" else "mkCtx(m)" val decode = s"$Codec.codecForGenerated(${method.inputType.scalaType})" val encode = s"$Codec.codecForGenerated(${method.outputType.scalaType})" val serviceName = method.getService.getFullName val methodName = method.getName - p.add(s""".combineK($ServerGrpc.${handleMethod(method)}($decode, $encode, "$serviceName", "$methodName")(serviceImpl.${method.name}(_, _)))""") + p.add(s""".combineK($ServerGrpc.${handleMethod( + method + )}($decode, $encode, "$serviceName", "$methodName")(serviceImpl.${method.name}(_, _)))""") } private[this] def serviceMethods: PrinterEndo = _.call(service.methods.map { method => @@ -97,8 +102,7 @@ class Http4sGrpcServicePrinter(service: ServiceDescriptor, di: DescriptorImplici _.call(service.methods.map(serviceMethodImplementation): _*) private[this] def serviceBindingImplementations: PrinterEndo = - _.add(s"$HttpRoutes.empty[F]") - .indent + _.add(s"$HttpRoutes.empty[F]").indent .call(service.methods.map(serviceBindingImplementation): _*) .add(s""".combineK($ServerGrpc.methodNotFoundRoute("${service.getFullName()}"))""") .outdent @@ -126,33 +130,30 @@ class Http4sGrpcServicePrinter(service: ServiceDescriptor, di: DescriptorImplici .newline .add("}") - private[this] def serviceClient: PrinterEndo = { + private[this] def serviceClient: PrinterEndo = _.add( s"def fromClient[F[_]: $Concurrent](client: $Client[F], baseUri: $Uri): $serviceName[F] = new _root_.$servicePkgName.$serviceName[F] {" ).indent .call(serviceMethodImplementations) .outdent .add("}") - } - private[this] def serviceBinding: PrinterEndo = { + private[this] def serviceBinding: PrinterEndo = _.add( s"def toRoutes[F[_]: $Temporal](serviceImpl: _root_.$servicePkgName.$serviceName[F]): $HttpRoutes[F] = {" ).indent .call(serviceBindingImplementations) .outdent .add("}") - } // / - def printService(printer: FunctionalPrinter): FunctionalPrinter = { + def printService(printer: FunctionalPrinter): FunctionalPrinter = printer .add(s"package $servicePkgName", "", "import _root_.cats.syntax.all._", "") .call(serviceTrait) .newline .call(serviceObject) - } } object Http4sGrpcServicePrinter { @@ -183,4 +184,4 @@ object Http4sGrpcServicePrinter { } -} \ No newline at end of file +} diff --git a/codegen/plugin/src/main/scala/org/http4s/grpc/sbt/Http4sGrpcPlugin.scala b/codegen/plugin/src/main/scala/org/http4s/grpc/sbt/Http4sGrpcPlugin.scala index d3c1bcd..c341303 100644 --- a/codegen/plugin/src/main/scala/org/http4s/grpc/sbt/Http4sGrpcPlugin.scala +++ b/codegen/plugin/src/main/scala/org/http4s/grpc/sbt/Http4sGrpcPlugin.scala @@ -35,7 +35,8 @@ object Http4sGrpcPlugin extends AutoPlugin { object autoImport { val http4sGrpcVersion: String = BuildInfo.version val http4sGrpcOutputPath = settingKey[File]("Directory for sources generated by http4s-grpc") - val http4sGrpcScalaPBOptions = settingKey[Seq[String]]("Options forwarded to the ScalaPB generator") + val http4sGrpcScalaPBOptions = + settingKey[Seq[String]]("Options forwarded to the ScalaPB generator") } import autoImport._ @@ -50,17 +51,17 @@ object Http4sGrpcPlugin extends AutoPlugin { Artifact( BuildInfo.organization, s"${BuildInfo.codeGeneratorModule}_${BuildInfo.scalaBinaryVersion}", - BuildInfo.version + BuildInfo.version, ), BuildInfo.codeGeneratorClass + "$", - Nil + Nil, ), (Compile / http4sGrpcOutputPath).value, - (Compile / http4sGrpcScalaPBOptions).value + (Compile / http4sGrpcScalaPBOptions).value, ) }, libraryDependencies ++= Seq( - BuildInfo.organization %%% BuildInfo.coreModule % BuildInfo.version, - ) + BuildInfo.organization %%% BuildInfo.coreModule % BuildInfo.version + ), ) } diff --git a/codegen/testing/src/test/scala/hello/world/TestServiceSuite.scala b/codegen/testing/src/test/scala/hello/world/TestServiceSuite.scala index c26f675..8935582 100644 --- a/codegen/testing/src/test/scala/hello/world/TestServiceSuite.scala +++ b/codegen/testing/src/test/scala/hello/world/TestServiceSuite.scala @@ -1,19 +1,21 @@ package hello.world -import cats.syntax.all._ import cats.effect.IO +import cats.syntax.all._ import fs2.Stream import munit._ -import org.http4s.client.Client import org.http4s.Headers import org.http4s.Uri +import org.http4s.client.Client import org.http4s.syntax.all._ -import org.scalacheck._, Arbitrary.arbitrary +import org.scalacheck._ import org.scalacheck.effect.PropF.forAllF +import Arbitrary.arbitrary + class TestServiceSuite extends CatsEffectSuite with ScalaCheckEffectSuite { - - val impl = new TestService[IO] { + + val impl: TestService[IO] = new TestService[IO] { def noStreaming(request: TestMessage, ctx: Headers): IO[TestMessage] = IO(request) @@ -33,13 +35,15 @@ class TestServiceSuite extends CatsEffectSuite with ScalaCheckEffectSuite { for { a <- arbitrary[String] b <- arbitrary[Int] - c <- Gen.option(Gen.oneOf(Color.RED, Color.GREEN, Color.BLUE).map(TestMessage.NestedMessage(_))) + c <- Gen.option( + Gen.oneOf(Color.RED, Color.GREEN, Color.BLUE).map(TestMessage.NestedMessage(_)) + ) } yield TestMessage(a, b, c) ) - val client = TestService.fromClient[IO]( + val client: TestService[IO] = TestService.fromClient[IO]( Client.fromHttpApp(TestService.toRoutes(impl).orNotFound), - Uri() + Uri(), ) test("no streaming") { @@ -50,7 +54,9 @@ class TestServiceSuite extends CatsEffectSuite with ScalaCheckEffectSuite { test("client streaming") { forAllF { (msg: TestMessage, tail: List[TestMessage]) => - client.clientStreaming(Stream.emits(msg :: tail), Headers.empty).assertEquals(tail.lastOption.getOrElse(msg)) + client + .clientStreaming(Stream.emits(msg :: tail), Headers.empty) + .assertEquals(tail.lastOption.getOrElse(msg)) } } @@ -68,36 +74,43 @@ class TestServiceSuite extends CatsEffectSuite with ScalaCheckEffectSuite { test("Routes returns missing method") { val client = Client.fromHttpApp(TestService.toRoutes(impl).orNotFound) - client.run(org.http4s.Request[IO](org.http4s.Method.POST, uri"/hello.world.TestService/missingMethod")) - .use{ resp => + client + .run( + org.http4s.Request[IO](org.http4s.Method.POST, uri"/hello.world.TestService/missingMethod") + ) + .use { resp => val headers = resp.headers val status = headers.get[org.http4s.grpc.codecs.NamedHeaders.GrpcStatus] status.pure[IO] - }.assertEquals( + } + .assertEquals( Some(org.http4s.grpc.codecs.NamedHeaders.GrpcStatus(12)) ) } - test("Client fails with initial failure"){ + test("Client fails with initial failure") { forAllF { (msg: TestMessage) => - val route = org.http4s.HttpRoutes.of[IO]{ - case _ => org.http4s.Response(org.http4s.Status.Ok) + val route = org.http4s.HttpRoutes.of[IO] { case _ => + org.http4s + .Response(org.http4s.Status.Ok) .putHeaders( org.http4s.grpc.codecs.NamedHeaders.GrpcStatus(12) - ).pure[IO] + ) + .pure[IO] } val client = TestService.fromClient[IO]( Client.fromHttpApp(route.orNotFound), - Uri() + Uri(), ) - client.`export`(msg, Headers.empty) + client + .`export`(msg, Headers.empty) .attemptNarrow[org.http4s.grpc.GrpcExceptions.StatusRuntimeException] .map(_.leftMap(grpcFailed => grpcFailed.status)) .assertEquals(Either.left(12)) } } - test("Server Fails in Trailers"){ + test("Server Fails in Trailers") { forAllF { (msg: TestMessage) => val ts = new TestService[IO] { @@ -117,10 +130,11 @@ class TestServiceSuite extends CatsEffectSuite with ScalaCheckEffectSuite { } val client = TestService.fromClient[IO]( Client.fromHttpApp(TestService.toRoutes[IO](ts).orNotFound), - Uri() + Uri(), ) - client.noStreaming(msg, Headers.empty) + client + .noStreaming(msg, Headers.empty) .attemptNarrow[org.http4s.grpc.GrpcExceptions.StatusRuntimeException] .map(_.leftMap(grpcFailed => grpcFailed.status)) .assertEquals(Either.left(2)) diff --git a/core/src/main/scala/org/http4s/grpc/ClientGrpc.scala b/core/src/main/scala/org/http4s/grpc/ClientGrpc.scala index 57ed71c..e0b83da 100644 --- a/core/src/main/scala/org/http4s/grpc/ClientGrpc.scala +++ b/core/src/main/scala/org/http4s/grpc/ClientGrpc.scala @@ -3,115 +3,173 @@ package org.http4s.grpc import cats._ import cats.effect._ import cats.syntax.all._ +import fs2._ import org.http4s._ import org.http4s.client.Client -import scodec.{Encoder, Decoder} -import fs2._ import org.http4s.ember.core.h2.H2Keys import org.http4s.grpc.codecs.NamedHeaders +import scodec.Decoder +import scodec.Encoder object ClientGrpc { - def unaryToUnary[F[_]: Concurrent, A, B](// Stuff We can provide via codegen - encode: Encoder[A], - decode: Decoder[B], - serviceName: String, - methodName: String, + def unaryToUnary[F[_]: Concurrent, A, B]( // Stuff We can provide via codegen + encode: Encoder[A], + decode: Decoder[B], + serviceName: String, + methodName: String, )( // Stuff We can apply at application scope - client: Client[F], baseUri: Uri + client: Client[F], + baseUri: Uri, )( // Stuff we apply at invocation - message: A, ctx: Headers + message: A, + ctx: Headers, ): F[B] = { val req = Request(Method.POST, baseUri / serviceName / methodName, HttpVersion.`HTTP/2`) - .putHeaders(SharedGrpc.TE, SharedGrpc.GrpcEncoding, SharedGrpc.GrpcAcceptEncoding, SharedGrpc.ContentType) - .putHeaders(ctx.headers.map(Header.ToRaw.rawToRaw):_*) + .putHeaders( + SharedGrpc.TE, + SharedGrpc.GrpcEncoding, + SharedGrpc.GrpcAcceptEncoding, + SharedGrpc.ContentType, + ) + .putHeaders(ctx.headers.map(Header.ToRaw.rawToRaw): _*) .withBodyStream(codecs.Messages.encodeSingle(encode)(message)) .withAttribute(H2Keys.Http2PriorKnowledge, ()) - client.run(req).use( resp => - handleFailure(resp.headers) >> - codecs.Messages.decodeSingle(decode)(resp.body).handleErrorWith( - e => resp.trailerHeaders.flatMap(handleFailure[F]).attempt.flatMap(t => t.as(e).merge.raiseError[F, B]) - ) <* - resp.trailerHeaders.flatMap(handleFailure[F]) - ) + client + .run(req) + .use(resp => + handleFailure(resp.headers) >> + codecs.Messages + .decodeSingle(decode)(resp.body) + .handleErrorWith(e => + resp.trailerHeaders + .flatMap(handleFailure[F]) + .attempt + .flatMap(t => t.as(e).merge.raiseError[F, B]) + ) <* + resp.trailerHeaders.flatMap(handleFailure[F]) + ) } - - def unaryToStream[F[_]: Concurrent, A, B](// Stuff We can provide via codegen - encode: Encoder[A], - decode: Decoder[B], - serviceName: String, - methodName: String, + def unaryToStream[F[_]: Concurrent, A, B]( // Stuff We can provide via codegen + encode: Encoder[A], + decode: Decoder[B], + serviceName: String, + methodName: String, )( // Stuff We can apply at application scope - client: Client[F], baseUri: Uri + client: Client[F], + baseUri: Uri, )( // Stuff we apply at invocation - message: A, ctx: Headers + message: A, + ctx: Headers, ): Stream[F, B] = { val req = Request(Method.POST, baseUri / serviceName / methodName, HttpVersion.`HTTP/2`) - .putHeaders(SharedGrpc.TE, SharedGrpc.GrpcEncoding, SharedGrpc.GrpcAcceptEncoding, SharedGrpc.ContentType) - .putHeaders(ctx.headers.map(Header.ToRaw.rawToRaw):_*) + .putHeaders( + SharedGrpc.TE, + SharedGrpc.GrpcEncoding, + SharedGrpc.GrpcAcceptEncoding, + SharedGrpc.ContentType, + ) + .putHeaders(ctx.headers.map(Header.ToRaw.rawToRaw): _*) .withBodyStream(codecs.Messages.encodeSingle(encode)(message)) .withAttribute(H2Keys.Http2PriorKnowledge, ()) - Stream.resource(client.run(req)).flatMap( resp => - Stream.eval(handleFailure(resp.headers)).drain ++ - codecs.Messages.decode[F, B](decode)(resp.body).handleErrorWith( - e => Stream.eval(resp.trailerHeaders.flatMap(handleFailure[F]).attempt.flatMap(t => t.as(e).merge.raiseError[F, B])) - ) ++ - Stream.eval(resp.trailerHeaders).evalMap(handleFailure[F]).drain - ) + Stream + .resource(client.run(req)) + .flatMap(resp => + Stream.eval(handleFailure(resp.headers)).drain ++ + codecs.Messages + .decode[F, B](decode)(resp.body) + .handleErrorWith(e => + Stream.eval( + resp.trailerHeaders + .flatMap(handleFailure[F]) + .attempt + .flatMap(t => t.as(e).merge.raiseError[F, B]) + ) + ) ++ + Stream.eval(resp.trailerHeaders).evalMap(handleFailure[F]).drain + ) } - def streamToUnary[F[_]: Concurrent, A, B](// Stuff We can provide via codegen - encode: Encoder[A], - decode: Decoder[B], - serviceName: String, - methodName: String, + def streamToUnary[F[_]: Concurrent, A, B]( // Stuff We can provide via codegen + encode: Encoder[A], + decode: Decoder[B], + serviceName: String, + methodName: String, )( // Stuff We can apply at application scope - client: Client[F], baseUri: Uri + client: Client[F], + baseUri: Uri, )( // Stuff we apply at invocation - message: Stream[F, A], ctx: Headers + message: Stream[F, A], + ctx: Headers, ): F[B] = { val req = Request(Method.POST, baseUri / serviceName / methodName, HttpVersion.`HTTP/2`) - .putHeaders(SharedGrpc.TE, SharedGrpc.GrpcEncoding, SharedGrpc.GrpcAcceptEncoding, SharedGrpc.ContentType) - .putHeaders(ctx.headers.map(Header.ToRaw.rawToRaw):_*) + .putHeaders( + SharedGrpc.TE, + SharedGrpc.GrpcEncoding, + SharedGrpc.GrpcAcceptEncoding, + SharedGrpc.ContentType, + ) + .putHeaders(ctx.headers.map(Header.ToRaw.rawToRaw): _*) .withBodyStream(codecs.Messages.encode(encode)(message).mask) .withAttribute(H2Keys.Http2PriorKnowledge, ()) - client.run(req).use( resp => - handleFailure(resp.headers) >> - codecs.Messages.decodeSingle(decode)(resp.body).handleErrorWith( - e => resp.trailerHeaders.flatMap(handleFailure[F]).attempt.flatMap(t => t.as(e).merge.raiseError[F, B]) - ) <* - resp.trailerHeaders.flatMap(handleFailure[F]) - ) + client + .run(req) + .use(resp => + handleFailure(resp.headers) >> + codecs.Messages + .decodeSingle(decode)(resp.body) + .handleErrorWith(e => + resp.trailerHeaders + .flatMap(handleFailure[F]) + .attempt + .flatMap(t => t.as(e).merge.raiseError[F, B]) + ) <* + resp.trailerHeaders.flatMap(handleFailure[F]) + ) } - def streamToStream[F[_]: Concurrent, A, B](// Stuff We can provide via codegen - encode: Encoder[A], - decode: Decoder[B], - serviceName: String, - methodName: String, + def streamToStream[F[_]: Concurrent, A, B]( // Stuff We can provide via codegen + encode: Encoder[A], + decode: Decoder[B], + serviceName: String, + methodName: String, )( // Stuff We can apply at application scope - client: Client[F], baseUri: Uri + client: Client[F], + baseUri: Uri, )( // Stuff we apply at invocation - message: Stream[F, A], ctx: Headers + message: Stream[F, A], + ctx: Headers, ): Stream[F, B] = { val req = Request(Method.POST, baseUri / serviceName / methodName, HttpVersion.`HTTP/2`) - .putHeaders(SharedGrpc.TE, SharedGrpc.GrpcEncoding, SharedGrpc.GrpcAcceptEncoding, SharedGrpc.ContentType) - .putHeaders(ctx.headers.map(Header.ToRaw.rawToRaw):_*) + .putHeaders( + SharedGrpc.TE, + SharedGrpc.GrpcEncoding, + SharedGrpc.GrpcAcceptEncoding, + SharedGrpc.ContentType, + ) + .putHeaders(ctx.headers.map(Header.ToRaw.rawToRaw): _*) .withBodyStream(codecs.Messages.encode(encode)(message).mask) .withAttribute(H2Keys.Http2PriorKnowledge, ()) - - Stream.resource(client.run(req)).flatMap( resp => - - Stream.eval(handleFailure(resp.headers)).drain ++ - codecs.Messages.decode[F, B](decode)(resp.body).handleErrorWith( - e => Stream.eval(resp.trailerHeaders.flatMap(handleFailure[F]).attempt.flatMap(t => t.as(e).merge.raiseError[F, B])) - ) ++ - Stream.eval(resp.trailerHeaders).evalMap(handleFailure[F]).drain - ) + Stream + .resource(client.run(req)) + .flatMap(resp => + Stream.eval(handleFailure(resp.headers)).drain ++ + codecs.Messages + .decode[F, B](decode)(resp.body) + .handleErrorWith(e => + Stream.eval( + resp.trailerHeaders + .flatMap(handleFailure[F]) + .attempt + .flatMap(t => t.as(e).merge.raiseError[F, B]) + ) + ) ++ + Stream.eval(resp.trailerHeaders).evalMap(handleFailure[F]).drain + ) } private def handleFailure[F[_]: MonadThrow](headers: Headers): F[Unit] = { @@ -126,6 +184,4 @@ object ClientGrpc { } } - } - diff --git a/core/src/main/scala/org/http4s/grpc/GrpcExceptions.scala b/core/src/main/scala/org/http4s/grpc/GrpcExceptions.scala index c8fd805..6dbef1c 100644 --- a/core/src/main/scala/org/http4s/grpc/GrpcExceptions.scala +++ b/core/src/main/scala/org/http4s/grpc/GrpcExceptions.scala @@ -1,9 +1,9 @@ package org.http4s.grpc object GrpcExceptions { - case class StatusRuntimeException(status: Int, message: Option[String]) - extends RuntimeException({ - val me = message.fold(""){(m: String) => s", Message-${m}"} - s"Grpc Failed: Status-$status${me}" - }) -} \ No newline at end of file + final case class StatusRuntimeException(status: Int, message: Option[String]) + extends RuntimeException({ + val me = message.fold("")((m: String) => s", Message-${m}") + s"Grpc Failed: Status-$status${me}" + }) +} diff --git a/core/src/main/scala/org/http4s/grpc/ServerGrpc.scala b/core/src/main/scala/org/http4s/grpc/ServerGrpc.scala index 92e6744..4abffbf 100644 --- a/core/src/main/scala/org/http4s/grpc/ServerGrpc.scala +++ b/core/src/main/scala/org/http4s/grpc/ServerGrpc.scala @@ -1,42 +1,45 @@ package org.http4s.grpc -import cats.syntax.all._ import cats.effect._ -import org.http4s._ -import scodec.{Encoder, Decoder} +import cats.syntax.all._ import fs2._ +import org.http4s._ import org.http4s.dsl.request._ +import org.http4s.grpc.codecs.NamedHeaders import org.http4s.headers.Trailer import org.typelevel.ci._ -import org.http4s.grpc.codecs.NamedHeaders -import scala.concurrent.duration._ +import scodec.Decoder +import scodec.Encoder + import java.util.concurrent.TimeoutException +import scala.concurrent.duration._ object ServerGrpc { - def unaryToUnary[F[_]: Temporal, A, B](// Stuff We can provide via codegen\ - decode: Decoder[A], - encode: Encoder[B], - serviceName: String, - methodName: String, + def unaryToUnary[F[_]: Temporal, A, B]( // Stuff We can provide via codegen\ + decode: Decoder[A], + encode: Encoder[B], + serviceName: String, + methodName: String, )( // Stuff we apply at invocation - f: (A,Headers) => F[B] - ): HttpRoutes[F] = HttpRoutes.of[F]{ - case req@POST -> Root / sN / mN if sN === serviceName && mN === methodName => + f: (A, Headers) => F[B] + ): HttpRoutes[F] = HttpRoutes.of[F] { + case req @ POST -> Root / sN / mN if sN === serviceName && mN === methodName => for { status <- Ref.of[F, (Int, Option[String])]((0, Option.empty)) - trailers = status.get.map{case (i, message) => + trailers = status.get.map { case (i, message) => Headers( - NamedHeaders.GrpcStatus(i), + NamedHeaders.GrpcStatus(i) ).put(message.map(NamedHeaders.GrpcMessage(_))) } timeout = req.headers.get[NamedHeaders.GrpcTimeout] } yield { - val body = Stream.eval(codecs.Messages.decodeSingle(decode)(req.body)) + val body = Stream + .eval(codecs.Messages.decodeSingle(decode)(req.body)) .evalMap(f(_, req.headers)) .flatMap(codecs.Messages.encodeSingle(encode)(_)) .through(timeoutStream(_)(timeout.map(_.duration))) - .onFinalizeCase{ + .onFinalizeCase { case Resource.ExitCase.Errored(_: TimeoutException) => status.set((4, None)) case Resource.ExitCase.Errored(e) => status.set((2, e.toString().some)) case Resource.ExitCase.Canceled => status.set((1, None)) @@ -49,36 +52,37 @@ object ServerGrpc { Trailer(cats.data.NonEmptyList.of(CIString("grpc-status"))), SharedGrpc.ContentType, SharedGrpc.GrpcEncoding, - SharedGrpc.TE + SharedGrpc.TE, ) .withBodyStream(body) .withTrailerHeaders(trailers) } } - def unaryToStream[F[_]: Temporal, A, B](// Stuff We can provide via codegen\ - decode: Decoder[A], - encode: Encoder[B], - serviceName: String, - methodName: String, + def unaryToStream[F[_]: Temporal, A, B]( // Stuff We can provide via codegen\ + decode: Decoder[A], + encode: Encoder[B], + serviceName: String, + methodName: String, )( // Stuff we apply at invocation - f: (A,Headers) => Stream[F,B] - ): HttpRoutes[F] = HttpRoutes.of[F]{ - case req@POST -> Root / sN / mN if sN === serviceName && mN === methodName => + f: (A, Headers) => Stream[F, B] + ): HttpRoutes[F] = HttpRoutes.of[F] { + case req @ POST -> Root / sN / mN if sN === serviceName && mN === methodName => for { status <- Ref.of[F, (Int, Option[String])]((0, Option.empty)) - trailers = status.get.map{case (i, message) => + trailers = status.get.map { case (i, message) => Headers( - NamedHeaders.GrpcStatus(i), + NamedHeaders.GrpcStatus(i) ).put(message.map(NamedHeaders.GrpcMessage(_))) } timeout = req.headers.get[NamedHeaders.GrpcTimeout] } yield { - val body = Stream.eval(codecs.Messages.decodeSingle(decode)(req.body)) + val body = Stream + .eval(codecs.Messages.decodeSingle(decode)(req.body)) .flatMap(f(_, req.headers)) .through(codecs.Messages.encode(encode)) .through(timeoutStream(_)(timeout.map(_.duration))) - .onFinalizeCase{ + .onFinalizeCase { case Resource.ExitCase.Errored(_: TimeoutException) => status.set((4, None)) case Resource.ExitCase.Errored(e) => status.set((2, e.toString().some)) case Resource.ExitCase.Canceled => status.set((1, None)) @@ -90,36 +94,37 @@ object ServerGrpc { Trailer(cats.data.NonEmptyList.of(CIString("grpc-status"))), SharedGrpc.ContentType, SharedGrpc.GrpcEncoding, - SharedGrpc.TE + SharedGrpc.TE, ) .withBodyStream(body) .withTrailerHeaders(trailers) } } - def streamToUnary[F[_]: Temporal, A, B](// Stuff We can provide via codegen\ - decode: Decoder[A], - encode: Encoder[B], - serviceName: String, - methodName: String, + def streamToUnary[F[_]: Temporal, A, B]( // Stuff We can provide via codegen\ + decode: Decoder[A], + encode: Encoder[B], + serviceName: String, + methodName: String, )( // Stuff we apply at invocation - f: (Stream[F,A],Headers) => F[B] - ): HttpRoutes[F] = HttpRoutes.of[F]{ - case req@POST -> Root / sN / mN if sN === serviceName && mN === methodName => + f: (Stream[F, A], Headers) => F[B] + ): HttpRoutes[F] = HttpRoutes.of[F] { + case req @ POST -> Root / sN / mN if sN === serviceName && mN === methodName => for { status <- Ref.of[F, (Int, Option[String])]((0, Option.empty)) - trailers = status.get.map{case (i, message) => + trailers = status.get.map { case (i, message) => Headers( - NamedHeaders.GrpcStatus(i), + NamedHeaders.GrpcStatus(i) ).put(message.map(NamedHeaders.GrpcMessage(_))) } timeout = req.headers.get[NamedHeaders.GrpcTimeout] } yield { - val body = Stream.eval(f(codecs.Messages.decode(decode)(req.body), req.headers)) + val body = Stream + .eval(f(codecs.Messages.decode(decode)(req.body), req.headers)) .flatMap(codecs.Messages.encodeSingle(encode)(_)) .through(timeoutStream(_)(timeout.map(_.duration))) - .onFinalizeCase{ + .onFinalizeCase { case Resource.ExitCase.Errored(_: TimeoutException) => status.set((4, None)) case Resource.ExitCase.Errored(e) => status.set((2, e.toString().some)) case Resource.ExitCase.Canceled => status.set((1, None)) @@ -132,27 +137,27 @@ object ServerGrpc { Trailer(cats.data.NonEmptyList.of(CIString("grpc-status"))), SharedGrpc.ContentType, SharedGrpc.GrpcEncoding, - SharedGrpc.TE + SharedGrpc.TE, ) .withBodyStream(body) .withTrailerHeaders(trailers) } } - def streamToStream[F[_]: Temporal, A, B](// Stuff We can provide via codegen\ - decode: Decoder[A], - encode: Encoder[B], - serviceName: String, - methodName: String, + def streamToStream[F[_]: Temporal, A, B]( // Stuff We can provide via codegen\ + decode: Decoder[A], + encode: Encoder[B], + serviceName: String, + methodName: String, )( // Stuff we apply at invocation - f: (Stream[F, A],Headers) => Stream[F,B] - ): HttpRoutes[F] = HttpRoutes.of[F]{ - case req@POST -> Root / sN / mN if sN === serviceName && mN === methodName => + f: (Stream[F, A], Headers) => Stream[F, B] + ): HttpRoutes[F] = HttpRoutes.of[F] { + case req @ POST -> Root / sN / mN if sN === serviceName && mN === methodName => for { status <- Ref.of[F, (Int, Option[String])]((0, Option.empty)) - trailers = status.get.map{case (i, message) => + trailers = status.get.map { case (i, message) => Headers( - NamedHeaders.GrpcStatus(i), + NamedHeaders.GrpcStatus(i) ).put(message.map(NamedHeaders.GrpcMessage(_))) } timeout = req.headers.get[NamedHeaders.GrpcTimeout] @@ -161,7 +166,7 @@ object ServerGrpc { val body = f(codecs.Messages.decode(decode)(req.body), req.headers) .through(codecs.Messages.encode(encode)) .through(timeoutStream(_)(timeout.map(_.duration))) - .onFinalizeCase{ + .onFinalizeCase { case Resource.ExitCase.Errored(_: TimeoutException) => status.set((4, None)) case Resource.ExitCase.Errored(e) => status.set((2, e.toString().some)) case Resource.ExitCase.Canceled => status.set((1, None)) @@ -174,22 +179,23 @@ object ServerGrpc { Trailer(cats.data.NonEmptyList.of(CIString("grpc-status"))), SharedGrpc.ContentType, SharedGrpc.GrpcEncoding, - SharedGrpc.TE + SharedGrpc.TE, ) .withBodyStream(body) .withTrailerHeaders(trailers) } } - def methodNotFoundRoute[F[_]: Concurrent](serviceName: String) = HttpRoutes.of[F]{ + def methodNotFoundRoute[F[_]: Concurrent](serviceName: String) = HttpRoutes.of[F] { case POST -> Root / sN / mN if sN === serviceName => Response[F](Status.Ok, HttpVersion.`HTTP/2`) .putHeaders( SharedGrpc.ContentType, SharedGrpc.TE, NamedHeaders.GrpcStatus(12), - "grpc-message" -> s"unknown method $mN for service $sN" - ).pure[F] + "grpc-message" -> s"unknown method $mN for service $sN", + ) + .pure[F] } def closeGrpcRoutes[F[_]: Concurrent](req: Request[F]): F[Response[F]] = req match { @@ -199,31 +205,35 @@ object ServerGrpc { SharedGrpc.ContentType, SharedGrpc.TE, NamedHeaders.GrpcStatus(12), - "grpc-message" -> s"unknown service $sN" - ).pure[F] + "grpc-message" -> s"unknown service $sN", + ) + .pure[F] case other -> Root / _ => Response[F](Status.Ok, HttpVersion.`HTTP/2`) .putHeaders( SharedGrpc.ContentType, SharedGrpc.TE, NamedHeaders.GrpcStatus(12), - "grpc-message" -> s"unknown method $other" - ).pure[F] + "grpc-message" -> s"unknown method $other", + ) + .pure[F] case _ => Response[F](Status.Ok, HttpVersion.`HTTP/2`) .putHeaders( SharedGrpc.ContentType, SharedGrpc.TE, NamedHeaders.GrpcStatus(12), - "grpc-message" -> s"unknown request" - ).pure[F] + "grpc-message" -> s"unknown request", + ) + .pure[F] } - private def timeoutStream[F[_]: Temporal, A](s: Stream[F, A])(timeout: Option[FiniteDuration]): Stream[F, A] = { + private def timeoutStream[F[_]: Temporal, A]( + s: Stream[F, A] + )(timeout: Option[FiniteDuration]): Stream[F, A] = timeout match { - case None => s + case None => s case Some(value) => s.timeout(value) } - } -} \ No newline at end of file +} diff --git a/core/src/main/scala/org/http4s/grpc/SharedGrpc.scala b/core/src/main/scala/org/http4s/grpc/SharedGrpc.scala index cf259df..9ca49f6 100644 --- a/core/src/main/scala/org/http4s/grpc/SharedGrpc.scala +++ b/core/src/main/scala/org/http4s/grpc/SharedGrpc.scala @@ -1,17 +1,18 @@ package org.http4s.grpc +import org.http4s.Header import org.http4s.headers.`Content-Type` import org.typelevel.ci.CIString private object SharedGrpc { - val ContentType: `Content-Type` = org.http4s.headers.`Content-Type`.parse("application/grpc+proto") + val ContentType: `Content-Type` = org.http4s.headers.`Content-Type` + .parse("application/grpc+proto") .getOrElse(throw new Throwable("Impossible: This protocol is valid")) - // TODO Content-Coding → "identity" / "gzip" / "deflate" / "snappy" / {custom} - val GrpcEncoding = org.http4s.Header.Raw(CIString("grpc-encoding"), "identity") - val GrpcAcceptEncoding = org.http4s.Header.Raw(CIString("grpc-accept-encoding"), "identity") - val TE = org.http4s.Header.Raw(CIString("te"), "trailers") - + val GrpcEncoding: Header.Raw = org.http4s.Header.Raw(CIString("grpc-encoding"), "identity") + val GrpcAcceptEncoding: Header.Raw = + org.http4s.Header.Raw(CIString("grpc-accept-encoding"), "identity") + val TE: Header.Raw = org.http4s.Header.Raw(CIString("te"), "trailers") -} \ No newline at end of file +} diff --git a/core/src/main/scala/org/http4s/grpc/codecs/LengthPrefixedMessage.scala b/core/src/main/scala/org/http4s/grpc/codecs/LengthPrefixedMessage.scala index 051c717..0dff55b 100644 --- a/core/src/main/scala/org/http4s/grpc/codecs/LengthPrefixedMessage.scala +++ b/core/src/main/scala/org/http4s/grpc/codecs/LengthPrefixedMessage.scala @@ -1,17 +1,18 @@ package org.http4s.grpc.codecs +import cats.syntax.all._ import scodec._ import scodec.bits._ import scodec.codecs._ -import cats.syntax.all._ -case class LengthPrefixedMessage(compressed: Boolean, message: ByteVector) +final case class LengthPrefixedMessage(compressed: Boolean, message: ByteVector) + object LengthPrefixedMessage { - val codec: scodec.Codec[LengthPrefixedMessage] = + val codec: scodec.Codec[LengthPrefixedMessage] = ( - uint8.xmap[Boolean](_ === 1, { case true => 1; case false => 0 }) :: - variableSizeBytesLong(uint32, bytes) + uint8.xmap[Boolean](_ === 1, { case true => 1; case false => 0 }) :: + variableSizeBytesLong(uint32, bytes) ).as[LengthPrefixedMessage] -} \ No newline at end of file +} diff --git a/core/src/main/scala/org/http4s/grpc/codecs/Messages.scala b/core/src/main/scala/org/http4s/grpc/codecs/Messages.scala index e97be45..d7df3d6 100644 --- a/core/src/main/scala/org/http4s/grpc/codecs/Messages.scala +++ b/core/src/main/scala/org/http4s/grpc/codecs/Messages.scala @@ -1,56 +1,52 @@ package org.http4s.grpc.codecs import cats._ -import cats.syntax.all._ import cats.effect._ +import cats.syntax.all._ import fs2._ import scodec.Attempt object Messages { - def decode[F[_]: MonadThrow, A](d: scodec.Decoder[A])(s: Stream[F, Byte]): Stream[F, A] = { + def decode[F[_]: MonadThrow, A](d: scodec.Decoder[A])(s: Stream[F, Byte]): Stream[F, A] = decodeLPMStream(s) .through(decodeLPMThroughDecoder(d)) - } - def decodeSingle[F[_]: Concurrent, A](d: scodec.Decoder[A])(s: Stream[F, Byte]): F[A] = { + def decodeSingle[F[_]: Concurrent, A](d: scodec.Decoder[A])(s: Stream[F, Byte]): F[A] = decode(d)(s) .take(1) .compile .lastOrError - } - private def decodeLPMThroughDecoder[F[_]: MonadThrow, A](d: scodec.Decoder[A])(s: Stream[F, LengthPrefixedMessage]): Stream[F, A] = { - s.evalMap(lpm => - liftAttempt(d.decodeValue(lpm.message.bits)) - ) - } + private def decodeLPMThroughDecoder[F[_]: MonadThrow, A](d: scodec.Decoder[A])( + s: Stream[F, LengthPrefixedMessage] + ): Stream[F, A] = + s.evalMap(lpm => liftAttempt(d.decodeValue(lpm.message.bits))) - private def decodeLPMStream[F[_]: RaiseThrowable](s: Stream[F, Byte]): Stream[F, LengthPrefixedMessage] = { + private def decodeLPMStream[F[_]: RaiseThrowable]( + s: Stream[F, Byte] + ): Stream[F, LengthPrefixedMessage] = s.through(fs2.interop.scodec.StreamDecoder.many(LengthPrefixedMessage.codec).toPipeByte) - } - - def encode[F[_]: MonadThrow, A](e: scodec.Encoder[A])(s: Stream[F, A]): Stream[F, Byte] = { + def encode[F[_]: MonadThrow, A](e: scodec.Encoder[A])(s: Stream[F, A]): Stream[F, Byte] = s.through(encodeLPMThroughEncoder[F, A](e)) .through(encodeLPMStream[F]) - } - def encodeSingle[F[_]: MonadThrow, A](e: scodec.Encoder[A])(a: A): Stream[F, Byte] = { + def encodeSingle[F[_]: MonadThrow, A](e: scodec.Encoder[A])(a: A): Stream[F, Byte] = encode(e)(Stream(a).covary[F]) - } - private def encodeLPMThroughEncoder[F[_]: MonadThrow, A](e: scodec.Encoder[A])(s: Stream[F, A]): Stream[F, LengthPrefixedMessage] = { + private def encodeLPMThroughEncoder[F[_]: MonadThrow, A]( + e: scodec.Encoder[A] + )(s: Stream[F, A]): Stream[F, LengthPrefixedMessage] = s .evalMap(a => liftAttempt(e.encode(a))) .map(b => LengthPrefixedMessage(false, b.bytes)) - } - private def encodeLPMStream[F[_]: RaiseThrowable](s: Stream[F, LengthPrefixedMessage]): Stream[F, Byte] = { - s.through(fs2.interop.scodec.StreamEncoder.many(LengthPrefixedMessage.codec).toPipeByte) - } + private def encodeLPMStream[F[_]: RaiseThrowable]( + s: Stream[F, LengthPrefixedMessage] + ): Stream[F, Byte] = + s.through(fs2.interop.scodec.StreamEncoder.many(LengthPrefixedMessage.codec).toPipeByte) - private def liftAttempt[F[_]: MonadThrow, A](att: Attempt[A]): F[A] = { + private def liftAttempt[F[_]: MonadThrow, A](att: Attempt[A]): F[A] = att.toEither.leftMap(err => new RuntimeException(err.messageWithContext)).liftTo[F] - } -} \ No newline at end of file +} diff --git a/core/src/main/scala/org/http4s/grpc/codecs/NamedHeaders.scala b/core/src/main/scala/org/http4s/grpc/codecs/NamedHeaders.scala index d899050..f0be089 100644 --- a/core/src/main/scala/org/http4s/grpc/codecs/NamedHeaders.scala +++ b/core/src/main/scala/org/http4s/grpc/codecs/NamedHeaders.scala @@ -1,23 +1,24 @@ package org.http4s.grpc.codecs -import scala.concurrent.duration._ +import cats.parse.Parser import cats.syntax.all._ import org.http4s.Header +import org.http4s.ParseResult +import org.http4s.internal.parsing.CommonRules.ows +import org.http4s.parser.AdditionalRules import org.typelevel.ci.CIString -import cats.parse.Parser -import org.http4s.parser.AdditionalRules -import org.http4s.internal.parsing.CommonRules.ows -import org.http4s.ParseResult +import scala.concurrent.duration._ object NamedHeaders { - case class GrpcTimeout(duration: FiniteDuration) + final case class GrpcTimeout(duration: FiniteDuration) + object GrpcTimeout { private val parser = ( (AdditionalRules.NonNegativeLong <* ows) ~ - Parser.charIn('H', 'M', 'S', 'm', 'u', 'n').mapFilter(c => decodeTimeUnit(c.toString())) - ).map{ case (value, unit) => + Parser.charIn('H', 'M', 'S', 'm', 'u', 'n').mapFilter(c => decodeTimeUnit(c.toString())) + ).map { case (value, unit) => GrpcTimeout(FiniteDuration.apply(value, unit)) } @@ -29,7 +30,7 @@ object NamedHeaders { val out = value * x s"$out $unit" }, - (s: String) => ParseResult.fromParser(parser, "Invalid GrpcTimeout")(s) + (s: String) => ParseResult.fromParser(parser, "Invalid GrpcTimeout")(s), ) private def encodeTimeUnit(t: TimeUnit): (Int, String) = t match { @@ -42,7 +43,7 @@ object NamedHeaders { case DAYS => (24, "H") } - private def decodeTimeUnit(s: String): Option[TimeUnit] = s match { + private def decodeTimeUnit(s: String): Option[TimeUnit] = s match { case "H" => HOURS.some case "M" => MINUTES.some case "S" => SECONDS.some @@ -54,29 +55,27 @@ object NamedHeaders { } // https://grpc.github.io/grpc/core/md_doc_statuscodes.html - case class GrpcStatus(statusCode: Int) + final case class GrpcStatus(statusCode: Int) + object GrpcStatus { private val parser = cats.parse.Numbers.nonNegativeIntString.map(s => GrpcStatus(s.toInt)) implicit val header: Header[GrpcStatus, Header.Single] = Header.create( CIString("grpc-status"), - (t: GrpcStatus) => { - t.statusCode.toString() - }, - (s: String) => ParseResult.fromParser(parser, "Invalid GrpcStatus")(s) + (t: GrpcStatus) => t.statusCode.toString(), + (s: String) => ParseResult.fromParser(parser, "Invalid GrpcStatus")(s), ) } - case class GrpcMessage(message: String) + final case class GrpcMessage(message: String) + object GrpcMessage { implicit val header: Header[GrpcMessage, Header.Single] = Header.create( CIString("grpc-message"), - (t: GrpcMessage) => { - t.message - }, - (s: String) => ParseResult.success(GrpcMessage(s)) + (t: GrpcMessage) => t.message, + (s: String) => ParseResult.success(GrpcMessage(s)), ) } -} \ No newline at end of file +} diff --git a/core/src/main/scala/org/http4s/grpc/codecs/ScalaPb.scala b/core/src/main/scala/org/http4s/grpc/codecs/ScalaPb.scala index e3dcd8a..c391cf3 100644 --- a/core/src/main/scala/org/http4s/grpc/codecs/ScalaPb.scala +++ b/core/src/main/scala/org/http4s/grpc/codecs/ScalaPb.scala @@ -1,27 +1,36 @@ package org.http4s.grpc.codecs import com.google.protobuf.ByteString -import scodec.{Encoder, Decoder, Attempt, Codec, DecodeResult} -import scalapb.{GeneratedMessage, GeneratedMessageCompanion, TypeMapper} -import scodec.bits.{ByteVector, BitVector} +import scalapb.GeneratedMessage +import scalapb.GeneratedMessageCompanion +import scalapb.TypeMapper +import scodec.Attempt +import scodec.Codec +import scodec.DecodeResult +import scodec.Decoder +import scodec.Encoder +import scodec.bits.BitVector +import scodec.bits.ByteVector // Should this be its own subproject? object ScalaPb { - private def encoderForGenerated[A <: GeneratedMessage](companion: GeneratedMessageCompanion[A]): Encoder[A] = { + private def encoderForGenerated[A <: GeneratedMessage]( + companion: GeneratedMessageCompanion[A] + ): Encoder[A] = Encoder[A]((a: A) => Attempt.successful(ByteVector.view(companion.toByteArray(a)).bits)) - } - private def decoderForGenerated[A <: GeneratedMessage](companion: GeneratedMessageCompanion[A]): Decoder[A] = { + private def decoderForGenerated[A <: GeneratedMessage]( + companion: GeneratedMessageCompanion[A] + ): Decoder[A] = Decoder[A]((b: BitVector) => - Attempt.fromTry(companion.validate(b.bytes.toArrayUnsafe)) + Attempt + .fromTry(companion.validate(b.bytes.toArrayUnsafe)) .map(a => DecodeResult(a, BitVector.empty)) ) - } - def codecForGenerated[A <: GeneratedMessage](companion: GeneratedMessageCompanion[A]): Codec[A] = { + def codecForGenerated[A <: GeneratedMessage](companion: GeneratedMessageCompanion[A]): Codec[A] = Codec[A](encoderForGenerated(companion), decoderForGenerated(companion)) - } implicit def byteVectorTypeMapper: TypeMapper[ByteString, ByteVector] = new TypeMapper[ByteString, ByteVector] { diff --git a/core/src/test/scala/org/http4s/grpc/MainSpec.scala b/core/src/test/scala/org/http4s/grpc/MainSpec.scala index 295a9d0..e198a17 100644 --- a/core/src/test/scala/org/http4s/grpc/MainSpec.scala +++ b/core/src/test/scala/org/http4s/grpc/MainSpec.scala @@ -1,12 +1,12 @@ package org.http4s.grpc -import munit.CatsEffectSuite import cats.effect._ +import munit.CatsEffectSuite class MainSpec extends CatsEffectSuite { test("Main should exit succesfully") { - assertEquals(ExitCode.Success, ExitCode.Success) + assertEquals(ExitCode.Success, ExitCode.Success) } } diff --git a/project/plugins.sbt b/project/plugins.sbt index 069c24a..41c8791 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,12 +1,12 @@ -addSbtPlugin("org.typelevel" % "sbt-typelevel-ci-release" % "0.6.7") -addSbtPlugin("org.typelevel" % "sbt-typelevel-site" % "0.6.7") -addSbtPlugin("org.typelevel" % "sbt-typelevel-settings" % "0.6.7") +addSbtPlugin("org.http4s" % "sbt-http4s-org" % "0.16.3") addSbtPlugin("org.scala-js" % "sbt-scalajs" % "1.14.0") -addSbtPlugin("org.portable-scala" % "sbt-scalajs-crossproject" % "1.3.2") +addSbtPlugin("org.portable-scala" % "sbt-scalajs-crossproject" % "1.3.2") addSbtPlugin("org.scala-native" % "sbt-scala-native" % "0.4.16") addSbtPlugin("org.portable-scala" % "sbt-scala-native-crossproject" % "1.3.2") addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.11.0") -addSbtPlugin("com.thesamet" % "sbt-protoc" % "1.0.7") // Because sbt-protoc-gen-project brings in 1.0.4 +addSbtPlugin( + "com.thesamet" % "sbt-protoc" % "1.0.7" +) // Because sbt-protoc-gen-project brings in 1.0.4 addSbtPlugin("com.thesamet" % "sbt-protoc-gen-project" % "0.1.8") libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.11.14"