Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better Netty Logging and Error Reporting. #3261

Open
wants to merge 23 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
90bad54
Better Netty Logging and Error Reporting.
scottweaver Dec 16, 2024
78e1667
Formatting and Scala Fix
scottweaver Dec 19, 2024
145118f
Merge branch 'main' into netty-internal-logging-support
987Nabil Dec 23, 2024
e080d81
Merge branch 'main' into netty-internal-logging-support
987Nabil Dec 23, 2024
451499d
PR Change: 'ZIO-succeed' -> 'Exit.succeed' for non-effectful completi…
scottweaver Jan 2, 2025
4835c6a
Merge branch 'zio:main' into netty-internal-logging-support
scottweaver Jan 5, 2025
52c3203
Better Netty Logging and Error Reporting.
scottweaver Dec 16, 2024
ed5d43f
Formatting and Scala Fix
scottweaver Dec 19, 2024
ace288c
PR Change: 'ZIO-succeed' -> 'Exit.succeed' for non-effectful completi…
scottweaver Jan 2, 2025
1f79fb3
Merge remote-tracking branch 'Scott/netty-internal-logging-support' i…
scottweaver Jan 5, 2025
6836d3c
PR Task: Add unroll compiler plugin and modify method signatures to m…
scottweaver Jan 6, 2025
a6f0b04
Remove unused method 'Dependencies#compilerPlugins'.
scottweaver Jan 6, 2025
d64792a
Make error handling safer.
scottweaver Jan 9, 2025
04cb97a
Fixed unroll and bin compat.
scottweaver Jan 9, 2025
0e4b21c
Formatting and Scala Fix
scottweaver Dec 19, 2024
d9dfe9b
PR Task: Add unroll compiler plugin and modify method signatures to m…
scottweaver Jan 6, 2025
49525d0
Remove unused method 'Dependencies#compilerPlugins'.
scottweaver Jan 6, 2025
fe9e187
Make error handling safer.
scottweaver Jan 9, 2025
16e3364
Fixed unroll and bin compat.
scottweaver Jan 9, 2025
bceeee4
'onFailure' no longer needed.
scottweaver Jan 14, 2025
4ea2772
Merge remote-tracking branch 'Scott/netty-internal-logging-support' i…
scottweaver Jan 14, 2025
56c0b5d
ReadTimeoutErrorHandler now forwards all excpetions.
scottweaver Jan 14, 2025
4062aff
'onFailure' removed from ZClient.
scottweaver Jan 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,11 @@ lazy val zioHttp = crossProject(JSPlatform, JVMPlatform)
.settings(meta)
.settings(crossProjectSettings)
.settings(Shading.shadingSettings())
.settings(
autoCompilerPlugins := true,
libraryDependencies ++= unroll,
addCompilerPlugin("com.lihaoyi" %% "unroll-plugin" % "0.1.12")
)
.settings(
libraryDependencies ++= {
CrossVersion.partialVersion(scalaVersion.value) match {
Expand Down
12 changes: 12 additions & 0 deletions docs/reference/client.md
Original file line number Diff line number Diff line change
Expand Up @@ -513,3 +513,15 @@ import utils._

printSource("zio-http-example/src/main/scala/example/WebSocketReconnectingClient.scala")
```

## Enabling Netty Internal Logging

Netty's low-level, internal logging can be enabled by setting `ZClient.Config.enableNettyInternalLogging` to `true`. This can be useful for debugging and troubleshooting issues related to the underlying Netty client.

You will also need to make sure that your preferred logging framework is configured to debug level for `zio.http.netty.InternalLogging`.

Logback example:

```xml
<logger name="zio.http.netty.InternalLogging" level="DEBUG" additivity="false" />
```
2 changes: 2 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ object Dependencies {
val scalafmt = "org.scalameta" %% "scalafmt-dynamic" % "3.8.1"
val scalametaParsers = "org.scalameta" %% "parsers" % "4.9.9"

val unroll = Seq("com.lihaoyi" %% "unroll-annotation" % "0.1.12","com.lihaoyi" %% "unroll-plugin" % "0.1.12" )

val netty =
Seq(
"io.netty" % "netty-codec-http" % NettyVersion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ private[zio] final class WebSocketAppHandler(
zExec: NettyRuntime,
queue: Queue[WebSocketChannelEvent],
handshakeCompleted: Promise[Nothing, Boolean],
onComplete: Option[Promise[Throwable, ChannelState]],
callbacks: Option[(Promise[Throwable, ChannelState], Promise[Nothing, Throwable])],
)(implicit trace: Trace)
extends SimpleChannelInboundHandler[JWebSocketFrame] {

Expand All @@ -58,17 +58,19 @@ private[zio] final class WebSocketAppHandler(

override def channelUnregistered(ctx: ChannelHandlerContext): Unit = {
dispatch(ChannelEvent.unregistered)
onComplete match {
case Some(promise) => promise.unsafe.done(Exit.succeed(ChannelState.Invalid))
case None => ()
callbacks match {
case Some((onComplete, _)) => onComplete.unsafe.done(Exit.succeed(ChannelState.Invalid))
case None => ()
}
}

override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
dispatch(ChannelEvent.exceptionCaught(cause))
onComplete match {
case Some(promise) => promise.unsafe.done(Exit.fail(cause))
case None => ()
callbacks match {
case Some((onComplete, onFailure)) =>
onComplete.unsafe.done(Exit.fail(cause))
onFailure.unsafe.done(Exit.succeed(cause))
case None => ()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@ import zio.http.internal.ChannelState

import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}

/** Handles failures happening in ClientInboundHandler */
/**
* Handles failures happening in ClientInboundHandler
* @deprecated
* exceptionCaught logic now lives inside
* `ClientInboundHandler.exceptionCaught`.
*/
private[netty] final class ClientFailureHandler(
onResponse: Promise[Throwable, Response],
onComplete: Promise[Throwable, ChannelState],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ private[netty] final class ClientInboundHandler(
jReq: HttpRequest,
onResponse: Promise[Throwable, Response],
onComplete: Promise[Throwable, ChannelState],
onFailure: Promise[Nothing, Throwable],
scottweaver marked this conversation as resolved.
Show resolved Hide resolved
enableKeepAlive: Boolean,
)(implicit trace: Trace)
extends SimpleChannelInboundHandler[HttpObject](false) {
Expand Down Expand Up @@ -72,8 +73,10 @@ private[netty] final class ClientInboundHandler(
}

override def exceptionCaught(ctx: ChannelHandlerContext, error: Throwable): Unit = {
ctx.fireExceptionCaught(error)
()
val exit = Exit.fail(error)
onResponse.unsafe.done(exit)
onComplete.unsafe.done(exit)
onFailure.unsafe.done(Exit.succeed(error))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package zio.http.netty.client

import scala.annotation.unroll

import zio._
import zio.stacktracer.TracingImplicits.disableAutoTrace

Expand All @@ -27,9 +29,9 @@ import zio.http.netty.model.Conversions
import zio.http.netty.socket.NettySocketProtocol

import io.netty.channel.{Channel, ChannelFactory, ChannelFuture, EventLoopGroup}
import io.netty.handler.codec.PrematureChannelClosureException
import io.netty.handler.codec.http.websocketx.{WebSocketClientProtocolHandler, WebSocketFrame => JWebSocketFrame}
import io.netty.handler.codec.http.{FullHttpRequest, HttpObjectAggregator}
import io.netty.handler.logging.{LogLevel => NettyLogLevel, LoggingHandler}
import io.netty.util.concurrent.GenericFutureListener

final case class NettyClientDriver private[netty] (
Expand All @@ -49,18 +51,34 @@ final case class NettyClientDriver private[netty] (
enableKeepAlive: Boolean,
createSocketApp: () => WebSocketApp[Any],
webSocketConfig: WebSocketConfig,
@unroll enableInternalLogging: Boolean = false,
// Why this abomination? So we can provide a binary-compatible addition to this method.
onFailure: Trace => UIO[Promise[Nothing, Throwable]] = { (t: Trace) => Promise.make[Nothing, Throwable](t) },
)(implicit trace: Trace): ZIO[Scope, Throwable, ChannelInterface] =
if (location.scheme.isWebSocket)
requestWebsocket(channel, req, onResponse, onComplete, createSocketApp, webSocketConfig)
else
requestHttp(channel, req, onResponse, onComplete, enableKeepAlive)
onFailure(trace).flatMap {
if (location.scheme.isWebSocket) {
requestWebsocket(
channel,
req,
onResponse,
onComplete,
createSocketApp,
webSocketConfig,
enableInternalLogging,
_,
)
} else
requestHttp(channel, req, onResponse, onComplete, enableKeepAlive, enableInternalLogging, _)
}

private def requestHttp(
channel: Channel,
req: Request,
onResponse: Promise[Throwable, Response],
onComplete: Promise[Throwable, ChannelState],
enableKeepAlive: Boolean,
enableInternalLogging: Boolean,
onFailure: Promise[Nothing, Throwable],
)(implicit trace: Trace): RIO[Scope, ChannelInterface] =
ZIO
.succeed(NettyRequestEncoder.encode(req))
Expand All @@ -73,45 +91,31 @@ final case class NettyClientDriver private[netty] (
}
}
.map { jReq =>
val closeListener: GenericFutureListener[ChannelFuture] = { (_: ChannelFuture) =>
// If onComplete was already set, it means another fiber is already in the process of fulfilling the promises
// so we don't need to fulfill `onResponse`
nettyRuntime.unsafeRunSync {
onComplete.interrupt && onResponse.fail(NettyClientDriver.PrematureChannelClosure)
}(Unsafe.unsafe, trace): Unit
}
scottweaver marked this conversation as resolved.
Show resolved Hide resolved

val pipeline = channel.pipeline()

pipeline.addLast(
Names.ClientInboundHandler,
new ClientInboundHandler(nettyRuntime, req, jReq, onResponse, onComplete, enableKeepAlive),
)
if (enableInternalLogging) pipeline.addLast(makeLogHandler)

pipeline.addLast(
Names.ClientFailureHandler,
new ClientFailureHandler(onResponse, onComplete),
Names.ClientInboundHandler,
new ClientInboundHandler(nettyRuntime, req, jReq, onResponse, onComplete, onFailure, enableKeepAlive),
)

pipeline
.fireChannelRegistered()
.fireUserEventTriggered(ClientInboundHandler.SendRequest)

channel.closeFuture().addListener(closeListener)
new ChannelInterface {
override def resetChannel: ZIO[Any, Throwable, ChannelState] = {
ZIO.attempt {
channel.closeFuture().removeListener(closeListener)
pipeline.remove(Names.ClientInboundHandler)
pipeline.remove(Names.ClientFailureHandler)
ChannelState.Reusable // channel can be reused
}
}

override def interrupt: ZIO[Any, Throwable, Unit] =
ZIO.suspendSucceed {
channel.closeFuture().removeListener(closeListener)
NettyFutureExecutor.executed(channel.disconnect())
val error = new InterruptedException("Netty channel operation interrupted by ZIO Http.")
onResponse.fail(error) *> onFailure.succeed(error) *> NettyFutureExecutor.executed(channel.disconnect())
}
}
}
Expand All @@ -123,6 +127,8 @@ final case class NettyClientDriver private[netty] (
onComplete: Promise[Throwable, ChannelState],
createSocketApp: () => WebSocketApp[Any],
webSocketConfig: WebSocketConfig,
enableInternalLogging: Boolean,
onFailure: Promise[Nothing, Throwable],
)(implicit trace: Trace): RIO[Scope, ChannelInterface] = {
for {
queue <- Queue.unbounded[WebSocketChannelEvent]
Expand All @@ -135,7 +141,7 @@ final case class NettyClientDriver private[netty] (
val pipeline = channel.pipeline()

val httpObjectAggregator = new HttpObjectAggregator(Int.MaxValue)
val inboundHandler = new WebSocketClientInboundHandler(onResponse, onComplete)
val inboundHandler = new WebSocketClientInboundHandler(onResponse, onComplete, onFailure)

pipeline.addLast(Names.HttpObjectAggregator, httpObjectAggregator)
pipeline.addLast(Names.ClientInboundHandler, inboundHandler)
Expand All @@ -147,10 +153,11 @@ final case class NettyClientDriver private[netty] (
.webSocketUri(req.url.encode)
.build()

// Handles the heavy lifting required to upgrade the connection to a WebSocket connection
if (enableInternalLogging) pipeline.addLast(makeLogHandler)

// Handles the heavy lifting required to upgrade the connection to a WebSocket connection
val webSocketClientProtocol = new WebSocketClientProtocolHandler(config)
val webSocket = new WebSocketAppHandler(nettyRuntime, queue, handshakeCompleted, Some(onComplete))
val webSocket = new WebSocketAppHandler(nettyRuntime, queue, handshakeCompleted, Some((onComplete, onFailure)))

pipeline.addLast(Names.WebSocketClientProtocolHandler, webSocketClientProtocol)
pipeline.addLast(Names.WebSocketHandler, webSocket)
Expand All @@ -175,6 +182,8 @@ final case class NettyClientDriver private[netty] (
NettyConnectionPool
.fromConfig(config)
.provideSomeEnvironment[Scope](_ ++ ZEnvironment[NettyClientDriver, DnsResolver](this, dnsResolver))

private def makeLogHandler = new LoggingHandler("zio.http.netty.InternalLogging", NettyLogLevel.DEBUG)
}

object NettyClientDriver {
Expand All @@ -190,8 +199,4 @@ object NettyClientDriver {
} yield NettyClientDriver(channelFactory, eventLoopGroup, nettyRuntime)
}

private val PrematureChannelClosure = new PrematureChannelClosureException(
"Channel closed while executing the request. This is likely caused due to a client connection misconfiguration",
)

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@

package zio.http.netty.client

import zio.stacktracer.TracingImplicits.disableAutoTrace
import zio.{Exit, Promise, Unsafe}
import zio._

import zio.http.Response
import zio.http.internal.ChannelState
Expand All @@ -29,6 +28,7 @@ import io.netty.handler.codec.http.FullHttpResponse
private[netty] final class WebSocketClientInboundHandler(
onResponse: Promise[Throwable, Response],
onComplete: Promise[Throwable, ChannelState],
onFailure: Promise[Nothing, Throwable],
) extends SimpleChannelInboundHandler[FullHttpResponse](true) {
implicit private val unsafeClass: Unsafe = Unsafe.unsafe

Expand All @@ -45,5 +45,6 @@ private[netty] final class WebSocketClientInboundHandler(
val exit = Exit.fail(error)
onResponse.unsafe.done(exit)
onComplete.unsafe.done(exit)
onFailure.unsafe.done(Exit.succeed(error))
}
}
1 change: 0 additions & 1 deletion zio-http/jvm/src/main/scala/zio/http/netty/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ package object netty {
val HttpServerExpectContinue = "HTTP_SERVER_EXPECT_CONTINUE"
val HttpServerFlushConsolidation = "HTTP_SERVER_FLUSH_CONSOLIDATION"
val ClientInboundHandler = "CLIENT_INBOUND_HANDLER"
val ClientFailureHandler = "CLIENT_FAILURE_HANDLER"
val ClientReadTimeoutErrorHandler = "CLIENT_READ_TIMEOUT_ERROR_HANDLER"
val ClientStreamingBodyHandler = "CLIENT_STREAMING_BODY_HANDLER"
val WebSocketClientProtocolHandler = "WEB_SOCKET_CLIENT_PROTOCOL_HANDLER"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

package zio.http

import scala.annotation.unroll

import zio._
import zio.stacktracer.TracingImplicits.disableAutoTrace
import zio.{Promise, Scope, Trace, ZIO, ZLayer}

import zio.http.ClientDriver.ChannelInterface
import zio.http.internal.ChannelState
Expand All @@ -34,6 +36,9 @@ trait ClientDriver {
enableKeepAlive: Boolean,
createSocketApp: () => WebSocketApp[Any],
webSocketConfig: WebSocketConfig,
@unroll enableInternalLogging: Boolean = false,
// Why this abomination? So we can provide a binary-compatible addition to this method.
onFailure: Trace => UIO[Promise[Nothing, Throwable]] = { (t: Trace) => Promise.make[Nothing, Throwable](t) },
)(implicit trace: Trace): ZIO[Scope, Throwable, ChannelInterface]

def createConnectionPool(dnsResolver: DnsResolver, config: ConnectionPoolConfig)(implicit
Expand Down
12 changes: 11 additions & 1 deletion zio-http/shared/src/main/scala/zio/http/ZClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package zio.http

import java.net.{InetSocketAddress, URI}

import scala.annotation.unroll

import zio._
import zio.stacktracer.TracingImplicits.disableAutoTrace

Expand Down Expand Up @@ -538,6 +540,7 @@ object ZClient extends ZClientPlatformSpecific {
webSocketConfig: WebSocketConfig,
idleTimeout: Option[Duration],
connectionTimeout: Option[Duration],
@unroll enableInternalLogging: Boolean = false,
) {
self =>

Expand Down Expand Up @@ -593,7 +596,8 @@ object ZClient extends ZClientPlatformSpecific {
Decompression.config.nested("request-decompression").withDefault(Config.default.requestDecompression) ++
zio.Config.boolean("add-user-agent-header").withDefault(Config.default.addUserAgentHeader) ++
zio.Config.duration("idle-timeout").optional.withDefault(Config.default.idleTimeout) ++
zio.Config.duration("connection-timeout").optional.withDefault(Config.default.connectionTimeout)
zio.Config.duration("connection-timeout").optional.withDefault(Config.default.connectionTimeout) ++
zio.Config.boolean("enable-internal-logging").withDefault(Config.default.enableInternalLogging)
).map {
case (
ssl,
Expand All @@ -605,6 +609,7 @@ object ZClient extends ZClientPlatformSpecific {
addUserAgentHeader,
idleTimeout,
connectionTimeout,
enableInternalLogging,
) =>
default.copy(
ssl = ssl,
Expand All @@ -616,6 +621,7 @@ object ZClient extends ZClientPlatformSpecific {
addUserAgentHeader = addUserAgentHeader,
idleTimeout = idleTimeout,
connectionTimeout = connectionTimeout,
enableInternalLogging = enableInternalLogging,
)
}

Expand All @@ -631,6 +637,7 @@ object ZClient extends ZClientPlatformSpecific {
webSocketConfig = WebSocketConfig.default,
idleTimeout = Some(50.seconds),
connectionTimeout = None,
enableInternalLogging = false,
)
}

Expand Down Expand Up @@ -700,6 +707,7 @@ object ZClient extends ZClientPlatformSpecific {
connectionAcquired <- Ref.make(false)
onComplete <- Promise.make[Throwable, ChannelState]
onResponse <- Promise.make[Throwable, Response]
onFailure <- Promise.make[Nothing, Throwable]
inChannelScope = outerScope match {
case Some(scope) => (zio: ZIO[Scope, Throwable, Unit]) => scope.extend(zio)
case None => (zio: ZIO[Scope, Throwable, Unit]) => ZIO.scoped(zio)
Expand Down Expand Up @@ -734,6 +742,8 @@ object ZClient extends ZClientPlatformSpecific {
connectionPool.enableKeepAlive,
createSocketApp,
clientConfig.webSocketConfig,
clientConfig.enableInternalLogging,
_ => ZIO.succeed(onFailure),
)
.tapErrorCause(cause => onResponse.failCause(cause))
_ <-
Expand Down
Loading