diff --git a/build.sbt b/build.sbt index 79099fff0..0a84e4506 100644 --- a/build.sbt +++ b/build.sbt @@ -184,6 +184,11 @@ lazy val zioHttp = crossProject(JSPlatform, JVMPlatform) .settings(meta) .settings(crossProjectSettings) .settings(Shading.shadingSettings()) + .settings( + autoCompilerPlugins := true, + libraryDependencies ++= unroll, + addCompilerPlugin("com.lihaoyi" %% "unroll-plugin" % "0.1.12") + ) .settings( libraryDependencies ++= { CrossVersion.partialVersion(scalaVersion.value) match { diff --git a/docs/reference/client.md b/docs/reference/client.md index e334de922..92b12f032 100644 --- a/docs/reference/client.md +++ b/docs/reference/client.md @@ -513,3 +513,15 @@ import utils._ printSource("zio-http-example/src/main/scala/example/WebSocketReconnectingClient.scala") ``` + +## Enabling Netty Internal Logging + +Netty's low-level, internal logging can be enabled by setting `ZClient.Config.enableNettyInternalLogging` to `true`. This can be useful for debugging and troubleshooting issues related to the underlying Netty client. + +You will also need to make sure that your preferred logging framework is configured to debug level for `zio.http.netty.InternalLogging`. + +Logback example: + +```xml + +``` \ No newline at end of file diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 267b6f5c2..d84c1ec9e 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -19,6 +19,8 @@ object Dependencies { val scalafmt = "org.scalameta" %% "scalafmt-dynamic" % "3.8.1" val scalametaParsers = "org.scalameta" %% "parsers" % "4.9.9" + val unroll = Seq("com.lihaoyi" %% "unroll-annotation" % "0.1.12","com.lihaoyi" %% "unroll-plugin" % "0.1.12" ) + val netty = Seq( "io.netty" % "netty-codec-http" % NettyVersion, diff --git a/zio-http/jvm/src/main/scala/zio/http/netty/WebSocketAppHandler.scala b/zio-http/jvm/src/main/scala/zio/http/netty/WebSocketAppHandler.scala index 61d8a5dc8..aa9f9b473 100644 --- a/zio-http/jvm/src/main/scala/zio/http/netty/WebSocketAppHandler.scala +++ b/zio-http/jvm/src/main/scala/zio/http/netty/WebSocketAppHandler.scala @@ -37,7 +37,7 @@ private[zio] final class WebSocketAppHandler( zExec: NettyRuntime, queue: Queue[WebSocketChannelEvent], handshakeCompleted: Promise[Nothing, Boolean], - onComplete: Option[Promise[Throwable, ChannelState]], + callbacks: Option[Promise[Throwable, ChannelState]], )(implicit trace: Trace) extends SimpleChannelInboundHandler[JWebSocketFrame] { @@ -58,18 +58,13 @@ private[zio] final class WebSocketAppHandler( override def channelUnregistered(ctx: ChannelHandlerContext): Unit = { dispatch(ChannelEvent.unregistered) - onComplete match { - case Some(promise) => promise.unsafe.done(Exit.succeed(ChannelState.Invalid)) - case None => () - } + callbacks.foreach(_.unsafe.done(Exit.succeed(ChannelState.Invalid))) } override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = { dispatch(ChannelEvent.exceptionCaught(cause)) - onComplete match { - case Some(promise) => promise.unsafe.done(Exit.fail(cause)) - case None => () - } + callbacks.foreach(_.unsafe.done(Exit.fail(cause))) + () } override def userEventTriggered(ctx: ChannelHandlerContext, msg: AnyRef): Unit = { diff --git a/zio-http/jvm/src/main/scala/zio/http/netty/client/ClientFailureHandler.scala b/zio-http/jvm/src/main/scala/zio/http/netty/client/ClientFailureHandler.scala index 14cdeb848..72cdea922 100644 --- a/zio-http/jvm/src/main/scala/zio/http/netty/client/ClientFailureHandler.scala +++ b/zio-http/jvm/src/main/scala/zio/http/netty/client/ClientFailureHandler.scala @@ -7,7 +7,12 @@ import zio.http.internal.ChannelState import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter} -/** Handles failures happening in ClientInboundHandler */ +/** + * Handles failures happening in ClientInboundHandler + * @deprecated + * exceptionCaught logic now lives inside + * `ClientInboundHandler.exceptionCaught`. + */ private[netty] final class ClientFailureHandler( onResponse: Promise[Throwable, Response], onComplete: Promise[Throwable, ChannelState], diff --git a/zio-http/jvm/src/main/scala/zio/http/netty/client/ClientInboundHandler.scala b/zio-http/jvm/src/main/scala/zio/http/netty/client/ClientInboundHandler.scala index 1ca7994d4..1c6ec74b1 100644 --- a/zio-http/jvm/src/main/scala/zio/http/netty/client/ClientInboundHandler.scala +++ b/zio-http/jvm/src/main/scala/zio/http/netty/client/ClientInboundHandler.scala @@ -72,7 +72,9 @@ private[netty] final class ClientInboundHandler( } override def exceptionCaught(ctx: ChannelHandlerContext, error: Throwable): Unit = { - ctx.fireExceptionCaught(error) + val exit = Exit.fail(error) + onResponse.unsafe.done(exit) + onComplete.unsafe.done(exit) () } } diff --git a/zio-http/jvm/src/main/scala/zio/http/netty/client/NettyClientDriver.scala b/zio-http/jvm/src/main/scala/zio/http/netty/client/NettyClientDriver.scala index e12606b16..52503c537 100644 --- a/zio-http/jvm/src/main/scala/zio/http/netty/client/NettyClientDriver.scala +++ b/zio-http/jvm/src/main/scala/zio/http/netty/client/NettyClientDriver.scala @@ -16,20 +16,19 @@ package zio.http.netty.client +import scala.annotation.unroll import zio._ import zio.stacktracer.TracingImplicits.disableAutoTrace - import zio.http.ClientDriver.ChannelInterface import zio.http._ import zio.http.internal.ChannelState import zio.http.netty._ import zio.http.netty.model.Conversions import zio.http.netty.socket.NettySocketProtocol - -import io.netty.channel.{Channel, ChannelFactory, ChannelFuture, EventLoopGroup} -import io.netty.handler.codec.PrematureChannelClosureException +import io.netty.channel.{Channel, ChannelFactory, ChannelFuture, ChannelHandlerContext, ChannelInboundHandlerAdapter, EventLoopGroup} import io.netty.handler.codec.http.websocketx.{WebSocketClientProtocolHandler, WebSocketFrame => JWebSocketFrame} import io.netty.handler.codec.http.{FullHttpRequest, HttpObjectAggregator} +import io.netty.handler.logging.{LoggingHandler, LogLevel => NettyLogLevel} import io.netty.util.concurrent.GenericFutureListener final case class NettyClientDriver private[netty] ( @@ -49,11 +48,20 @@ final case class NettyClientDriver private[netty] ( enableKeepAlive: Boolean, createSocketApp: () => WebSocketApp[Any], webSocketConfig: WebSocketConfig, + @unroll enableInternalLogging: Boolean = false, )(implicit trace: Trace): ZIO[Scope, Throwable, ChannelInterface] = - if (location.scheme.isWebSocket) - requestWebsocket(channel, req, onResponse, onComplete, createSocketApp, webSocketConfig) - else - requestHttp(channel, req, onResponse, onComplete, enableKeepAlive) + if (location.scheme.isWebSocket) { + requestWebsocket( + channel, + req, + onResponse, + onComplete, + createSocketApp, + webSocketConfig, + enableInternalLogging, + ) + } else + requestHttp(channel, req, onResponse, onComplete, enableKeepAlive, enableInternalLogging) private def requestHttp( channel: Channel, @@ -61,6 +69,7 @@ final case class NettyClientDriver private[netty] ( onResponse: Promise[Throwable, Response], onComplete: Promise[Throwable, ChannelState], enableKeepAlive: Boolean, + enableInternalLogging: Boolean, )(implicit trace: Trace): RIO[Scope, ChannelInterface] = ZIO .succeed(NettyRequestEncoder.encode(req)) @@ -73,45 +82,32 @@ final case class NettyClientDriver private[netty] ( } } .map { jReq => - val closeListener: GenericFutureListener[ChannelFuture] = { (_: ChannelFuture) => - // If onComplete was already set, it means another fiber is already in the process of fulfilling the promises - // so we don't need to fulfill `onResponse` - nettyRuntime.unsafeRunSync { - onComplete.interrupt && onResponse.fail(NettyClientDriver.PrematureChannelClosure) - }(Unsafe.unsafe, trace): Unit - } - val pipeline = channel.pipeline() + if (enableInternalLogging) pipeline.addLast(makeLogHandler) + pipeline.addLast( Names.ClientInboundHandler, new ClientInboundHandler(nettyRuntime, req, jReq, onResponse, onComplete, enableKeepAlive), ) - pipeline.addLast( - Names.ClientFailureHandler, - new ClientFailureHandler(onResponse, onComplete), - ) - pipeline .fireChannelRegistered() .fireUserEventTriggered(ClientInboundHandler.SendRequest) - channel.closeFuture().addListener(closeListener) new ChannelInterface { override def resetChannel: ZIO[Any, Throwable, ChannelState] = { ZIO.attempt { - channel.closeFuture().removeListener(closeListener) pipeline.remove(Names.ClientInboundHandler) - pipeline.remove(Names.ClientFailureHandler) ChannelState.Reusable // channel can be reused } } override def interrupt: ZIO[Any, Throwable, Unit] = ZIO.suspendSucceed { - channel.closeFuture().removeListener(closeListener) - NettyFutureExecutor.executed(channel.disconnect()) + val error = new InterruptedException("Netty channel operation interrupted by ZIO Http.") + pipeline.remove(Names.ClientInboundHandler) + onResponse.fail(error) *> NettyFutureExecutor.executed(channel.disconnect()) } } } @@ -123,6 +119,7 @@ final case class NettyClientDriver private[netty] ( onComplete: Promise[Throwable, ChannelState], createSocketApp: () => WebSocketApp[Any], webSocketConfig: WebSocketConfig, + enableInternalLogging: Boolean, )(implicit trace: Trace): RIO[Scope, ChannelInterface] = { for { queue <- Queue.unbounded[WebSocketChannelEvent] @@ -147,10 +144,11 @@ final case class NettyClientDriver private[netty] ( .webSocketUri(req.url.encode) .build() - // Handles the heavy lifting required to upgrade the connection to a WebSocket connection + if (enableInternalLogging) pipeline.addLast(makeLogHandler) + // Handles the heavy lifting required to upgrade the connection to a WebSocket connection val webSocketClientProtocol = new WebSocketClientProtocolHandler(config) - val webSocket = new WebSocketAppHandler(nettyRuntime, queue, handshakeCompleted, Some(onComplete)) + val webSocket = new WebSocketAppHandler(nettyRuntime, queue, handshakeCompleted, Some(onComplete)) pipeline.addLast(Names.WebSocketClientProtocolHandler, webSocketClientProtocol) pipeline.addLast(Names.WebSocketHandler, webSocket) @@ -175,6 +173,8 @@ final case class NettyClientDriver private[netty] ( NettyConnectionPool .fromConfig(config) .provideSomeEnvironment[Scope](_ ++ ZEnvironment[NettyClientDriver, DnsResolver](this, dnsResolver)) + + private def makeLogHandler = new LoggingHandler("zio.http.netty.InternalLogging", NettyLogLevel.DEBUG) } object NettyClientDriver { @@ -190,8 +190,4 @@ object NettyClientDriver { } yield NettyClientDriver(channelFactory, eventLoopGroup, nettyRuntime) } - private val PrematureChannelClosure = new PrematureChannelClosureException( - "Channel closed while executing the request. This is likely caused due to a client connection misconfiguration", - ) - } diff --git a/zio-http/jvm/src/main/scala/zio/http/netty/client/NettyConnectionPool.scala b/zio-http/jvm/src/main/scala/zio/http/netty/client/NettyConnectionPool.scala index 147f38bac..aa7c0df7a 100644 --- a/zio-http/jvm/src/main/scala/zio/http/netty/client/NettyConnectionPool.scala +++ b/zio-http/jvm/src/main/scala/zio/http/netty/client/NettyConnectionPool.scala @@ -168,7 +168,11 @@ private[netty] object NettyConnectionPool { override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = { cause match { - case _: ReadTimeoutException => nettyRuntime.unsafeRunSync(ZIO.logDebug("ReadTimeoutException caught")) + case _: ReadTimeoutException => + nettyRuntime.unsafeRunSync(ZIO.logDebug("ReadTimeoutException caught")) + // We need to propagate the exception to the next handler in the pipeline as subsequent error + // handlers will need to clean up resources. + super.exceptionCaught(ctx, cause) case _ => super.exceptionCaught(ctx, cause) } } diff --git a/zio-http/jvm/src/main/scala/zio/http/netty/client/WebSocketClientInboundHandler.scala b/zio-http/jvm/src/main/scala/zio/http/netty/client/WebSocketClientInboundHandler.scala index e58e21397..c22164268 100644 --- a/zio-http/jvm/src/main/scala/zio/http/netty/client/WebSocketClientInboundHandler.scala +++ b/zio-http/jvm/src/main/scala/zio/http/netty/client/WebSocketClientInboundHandler.scala @@ -16,13 +16,10 @@ package zio.http.netty.client -import zio.stacktracer.TracingImplicits.disableAutoTrace -import zio.{Exit, Promise, Unsafe} - +import zio._ import zio.http.Response import zio.http.internal.ChannelState import zio.http.netty.NettyResponse - import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler} import io.netty.handler.codec.http.FullHttpResponse diff --git a/zio-http/jvm/src/main/scala/zio/http/netty/package.scala b/zio-http/jvm/src/main/scala/zio/http/netty/package.scala index c56d8b1b9..c04374a35 100644 --- a/zio-http/jvm/src/main/scala/zio/http/netty/package.scala +++ b/zio-http/jvm/src/main/scala/zio/http/netty/package.scala @@ -33,7 +33,6 @@ package object netty { val HttpServerExpectContinue = "HTTP_SERVER_EXPECT_CONTINUE" val HttpServerFlushConsolidation = "HTTP_SERVER_FLUSH_CONSOLIDATION" val ClientInboundHandler = "CLIENT_INBOUND_HANDLER" - val ClientFailureHandler = "CLIENT_FAILURE_HANDLER" val ClientReadTimeoutErrorHandler = "CLIENT_READ_TIMEOUT_ERROR_HANDLER" val ClientStreamingBodyHandler = "CLIENT_STREAMING_BODY_HANDLER" val WebSocketClientProtocolHandler = "WEB_SOCKET_CLIENT_PROTOCOL_HANDLER" diff --git a/zio-http/jvm/src/test/scala/zio/http/SSLSpec.scala b/zio-http/jvm/src/test/scala/zio/http/SSLSpec.scala index 713215ecd..e889a0c09 100644 --- a/zio-http/jvm/src/test/scala/zio/http/SSLSpec.scala +++ b/zio-http/jvm/src/test/scala/zio/http/SSLSpec.scala @@ -16,11 +16,11 @@ package zio.http -import zio.test.Assertion.equalTo +import io.netty.handler.codec.DecoderException +import zio.test.Assertion.{anything, equalTo} import zio.test.TestAspect.withLiveClock -import zio.test.{Gen, assertCompletes, assertNever, assertZIO} +import zio.test._ import zio.{Scope, ZLayer} - import zio.http.netty.NettyConfig import zio.http.netty.client.NettyClientDriver @@ -63,18 +63,14 @@ object SSLSpec extends ZIOHttpSpec { DnsResolver.default, ZLayer.succeed(NettyConfig.defaultWithFastShutdown), ), - // Unfortunately if the channel closes before we create the request, we can't extract the DecoderException test( - "fail with DecoderException or PrematureChannelClosureException when client doesn't have the server certificate", + "fail with DecoderException when client doesn't have the server certificate", ) { Client .batched(Request.get(httpsUrl)) .fold( { e => - val expectedErrors = List("DecoderException", "PrematureChannelClosureException") - val errorType = e.getClass.getSimpleName - if (expectedErrors.contains(errorType)) assertCompletes - else assertNever(s"request failed with unexpected error type: $errorType") + assert(e)(Assertion.isSubtype[DecoderException](anything)) }, _ => assertNever("expected request to fail"), ) diff --git a/zio-http/shared/src/main/scala/zio/http/ClientDriver.scala b/zio-http/shared/src/main/scala/zio/http/ClientDriver.scala index e60bdb736..70515e25e 100644 --- a/zio-http/shared/src/main/scala/zio/http/ClientDriver.scala +++ b/zio-http/shared/src/main/scala/zio/http/ClientDriver.scala @@ -16,8 +16,10 @@ package zio.http +import scala.annotation.unroll + +import zio._ import zio.stacktracer.TracingImplicits.disableAutoTrace -import zio.{Promise, Scope, Trace, ZIO, ZLayer} import zio.http.ClientDriver.ChannelInterface import zio.http.internal.ChannelState @@ -34,6 +36,7 @@ trait ClientDriver { enableKeepAlive: Boolean, createSocketApp: () => WebSocketApp[Any], webSocketConfig: WebSocketConfig, + @unroll enableInternalLogging: Boolean = false, )(implicit trace: Trace): ZIO[Scope, Throwable, ChannelInterface] def createConnectionPool(dnsResolver: DnsResolver, config: ConnectionPoolConfig)(implicit diff --git a/zio-http/shared/src/main/scala/zio/http/ZClient.scala b/zio-http/shared/src/main/scala/zio/http/ZClient.scala index ce6d47b33..456aac7c2 100644 --- a/zio-http/shared/src/main/scala/zio/http/ZClient.scala +++ b/zio-http/shared/src/main/scala/zio/http/ZClient.scala @@ -18,6 +18,8 @@ package zio.http import java.net.{InetSocketAddress, URI} +import scala.annotation.unroll + import zio._ import zio.stacktracer.TracingImplicits.disableAutoTrace @@ -538,6 +540,7 @@ object ZClient extends ZClientPlatformSpecific { webSocketConfig: WebSocketConfig, idleTimeout: Option[Duration], connectionTimeout: Option[Duration], + @unroll enableInternalLogging: Boolean = false, ) { self => @@ -593,7 +596,8 @@ object ZClient extends ZClientPlatformSpecific { Decompression.config.nested("request-decompression").withDefault(Config.default.requestDecompression) ++ zio.Config.boolean("add-user-agent-header").withDefault(Config.default.addUserAgentHeader) ++ zio.Config.duration("idle-timeout").optional.withDefault(Config.default.idleTimeout) ++ - zio.Config.duration("connection-timeout").optional.withDefault(Config.default.connectionTimeout) + zio.Config.duration("connection-timeout").optional.withDefault(Config.default.connectionTimeout) ++ + zio.Config.boolean("enable-internal-logging").withDefault(Config.default.enableInternalLogging) ).map { case ( ssl, @@ -605,6 +609,7 @@ object ZClient extends ZClientPlatformSpecific { addUserAgentHeader, idleTimeout, connectionTimeout, + enableInternalLogging, ) => default.copy( ssl = ssl, @@ -616,6 +621,7 @@ object ZClient extends ZClientPlatformSpecific { addUserAgentHeader = addUserAgentHeader, idleTimeout = idleTimeout, connectionTimeout = connectionTimeout, + enableInternalLogging = enableInternalLogging, ) } @@ -631,6 +637,7 @@ object ZClient extends ZClientPlatformSpecific { webSocketConfig = WebSocketConfig.default, idleTimeout = Some(50.seconds), connectionTimeout = None, + enableInternalLogging = false, ) } @@ -734,6 +741,7 @@ object ZClient extends ZClientPlatformSpecific { connectionPool.enableKeepAlive, createSocketApp, clientConfig.webSocketConfig, + clientConfig.enableInternalLogging, ) .tapErrorCause(cause => onResponse.failCause(cause)) _ <-