Skip to content

Commit

Permalink
Explicitly return 408 when timeout is hit
Browse files Browse the repository at this point in the history
Previously, we would return 503 Service Unavailable, suggesting that failures
should not be retried and leading to confusion with early timeout being hit.
Now, we return 408 Request Timeout which is more explicit and easier to monitor.
  • Loading branch information
peel committed Aug 21, 2024
1 parent f2566b4 commit 22f2204
Show file tree
Hide file tree
Showing 10 changed files with 106 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ object Config {
responseHeaderTimeout: FiniteDuration,
bodyReadTimeout: FiniteDuration,
maxRequestLineLength: Int,
maxHeadersLength: Int
maxHeadersLength: Int,
maxPayloadSize: Long
)

case class License(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger

import java.net.InetSocketAddress
import javax.net.ssl.SSLContext
import org.http4s.Response
import org.http4s.Status

object HttpServer {

Expand All @@ -40,12 +42,46 @@ object HttpServer {
networking: Config.Networking,
metricsConfig: Config.Metrics,
debugHttp: Config.Debug.Http
)(
mkServer: ((HttpApp[F], Int, Boolean, Config.Networking) => Resource[F, Server])
): Resource[F, Server] =
for {
withMetricsMiddleware <- createMetricsMiddleware(routes, metricsConfig)
server <- buildBlazeServer[F](withMetricsMiddleware, healthRoutes, port, secure, hsts, networking, debugHttp)
httpApp <- Resource.pure(httpApp(withMetricsMiddleware, healthRoutes, hsts, networking, debugHttp))
server <- mkServer(httpApp, port, secure, networking)
} yield server

def buildBlazeServer[F[_]: Async](
httpApp: HttpApp[F],
port: Int,
secure: Boolean,
networking: Config.Networking
): Resource[F, Server] =
Resource.eval(Logger[F].info("Building blaze server")) >>
BlazeServerBuilder[F]
.bindSocketAddress(new InetSocketAddress(port))
.withHttpApp(httpApp)
.withIdleTimeout(networking.idleTimeout)
.withMaxConnections(networking.maxConnections)
.withResponseHeaderTimeout(networking.responseHeaderTimeout)
.withLengthLimits(
maxRequestLineLen = networking.maxRequestLineLength,
maxHeadersLen = networking.maxHeadersLength
)
.cond(secure, _.withSslContext(SSLContext.getDefault))
.resource

def httpApp[F[_]: Async](
routes: HttpRoutes[F],
healthRoutes: HttpRoutes[F],
hsts: Config.HSTS,
networking: Config.Networking,
debugHttp: Config.Debug.Http
): HttpApp[F] = hstsApp(
hsts,
loggerMiddleware(timeoutMiddleware(routes, networking), debugHttp) <+> loggerMiddleware(healthRoutes, debugHttp)
)

private def createMetricsMiddleware[F[_]: Async](
routes: HttpRoutes[F],
metricsConfig: Config.Metrics
Expand Down Expand Up @@ -81,36 +117,10 @@ object HttpServer {
} else routes

private def timeoutMiddleware[F[_]: Async](routes: HttpRoutes[F], networking: Config.Networking): HttpRoutes[F] =
Timeout.httpRoutes[F](timeout = networking.responseHeaderTimeout)(routes)

private def buildBlazeServer[F[_]: Async](
routes: HttpRoutes[F],
healthRoutes: HttpRoutes[F],
port: Int,
secure: Boolean,
hsts: Config.HSTS,
networking: Config.Networking,
debugHttp: Config.Debug.Http
): Resource[F, Server] =
Resource.eval(Logger[F].info("Building blaze server")) >>
BlazeServerBuilder[F]
.bindSocketAddress(new InetSocketAddress(port))
.withHttpApp(
hstsApp(
hsts,
loggerMiddleware(timeoutMiddleware(routes, networking), debugHttp)
<+> loggerMiddleware(healthRoutes, debugHttp)
)
)
.withIdleTimeout(networking.idleTimeout)
.withMaxConnections(networking.maxConnections)
.withResponseHeaderTimeout(networking.responseHeaderTimeout)
.withLengthLimits(
maxRequestLineLen = networking.maxRequestLineLength,
maxHeadersLen = networking.maxHeadersLength
)
.cond(secure, _.withSslContext(SSLContext.getDefault))
.resource
Timeout.httpRoutes[F](timeout = networking.responseHeaderTimeout)(routes).collect {
case Response(Status.ServiceUnavailable, httpVersion, headers, body, attributes) =>
Response[F](Status.RequestTimeout, httpVersion, headers, body, attributes)
}

implicit class ConditionalAction[A](item: A) {
def cond(cond: Boolean, action: A => A): A =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ object Run {
config.networking,
config.monitoring.metrics,
config.debug.http
)
)(HttpServer.buildBlazeServer)
_ <- withGracefulShutdown(config.preTerminationPeriod)(httpServer)
httpClient <- BlazeClientBuilder[F].resource
} yield httpClient
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.snowplowanalytics.snowplow.collector.core

import org.specs2.mutable.Specification
import cats.effect.IO

import org.http4s.client.Client
import org.http4s._
import org.http4s.dsl.io._
import org.http4s.implicits._
import scala.concurrent.duration._
import cats.effect.testing.specs2._

class HttpServerSpecification extends Specification with CatsEffect {

"HttpServer" should {
"manage request timeout" should {
"timeout threshold is configured" in {
val config =
TestUtils.testConfig.copy(networking = TestUtils.testConfig.networking.copy(responseHeaderTimeout = 100.millis))
val routes = HttpRoutes.of[IO] {
case _ -> Root / "fast" =>
Ok("Fast")
case _ -> Root / "never" =>
IO.never[Response[IO]]
}
val healthRoutes = HttpRoutes.of[IO] {
case _ -> Root / "health" =>
Ok("ok")
}
val httpApp = HttpServer.httpApp(
routes,
healthRoutes,
config.hsts,
config.networking,
config.debug.http
)
val client: Client[IO] = Client.fromHttpApp(httpApp)
val request: Request[IO] = Request(method = Method.GET, uri = uri"/never")
val res: IO[String] = client.expect[String](request)

res
.attempt
.map(_ must beLeft[Throwable].which {
case org.http4s.client.UnexpectedStatus(Status.RequestTimeout, _, _) => true
case _ => false
})
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ object TestUtils {
5.seconds,
1.second,
20480,
40960
40960,
100000
),
debug = Debug.Debug(
http = Debug.Http(enable = false, logHeaders = true, logBody = false, redactHeaders = List.empty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ object KafkaConfigSpec {
responseHeaderTimeout = 30.seconds,
bodyReadTimeout = 28.seconds,
maxRequestLineLength = 20480,
maxHeadersLength = 40960
maxHeadersLength = 40960,
maxPayloadSize = 1048576
),
license = Config.License(accept = true),
debug = Config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ object KinesisConfigSpec {
responseHeaderTimeout = 30.seconds,
bodyReadTimeout = 28.seconds,
maxRequestLineLength = 20480,
maxHeadersLength = 40960
maxHeadersLength = 40960,
maxPayloadSize = 1048576
),
streams = Config.Streams(
useIpAddressAsPartitionKey = false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ object NsqConfigSpec {
responseHeaderTimeout = 30.seconds,
bodyReadTimeout = 28.seconds,
maxRequestLineLength = 20480,
maxHeadersLength = 40960
maxHeadersLength = 40960,
maxPayloadSize = 1048576
),
license = Config.License(accept = true),
debug = Config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ object ConfigSpec {
responseHeaderTimeout = 30.seconds,
bodyReadTimeout = 28.seconds,
maxRequestLineLength = 20480,
maxHeadersLength = 40960
maxHeadersLength = 40960,
maxPayloadSize = 1048576
),
streams = Config.Streams(
useIpAddressAsPartitionKey = false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ object SqsConfigSpec {
responseHeaderTimeout = 30.seconds,
bodyReadTimeout = 28.seconds,
maxRequestLineLength = 20480,
maxHeadersLength = 40960
maxHeadersLength = 40960,
maxPayloadSize = 1048576
),
streams = Config.Streams(
useIpAddressAsPartitionKey = false,
Expand Down

0 comments on commit 22f2204

Please sign in to comment.