diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Routes.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Routes.scala index e80f239ba..31a1c13a9 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Routes.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Routes.scala @@ -28,9 +28,10 @@ class Routes[F[_]: Async]( implicit val dns: Dns[F] = Dns.forSync[F] - private val healthRoutes = HttpRoutes.of[F] { + private val healthRoutes = HttpRoutes.strict[F] { case GET -> Root / "health" => Ok("ok") + } <+> HttpRoutes.of[F] { case GET -> Root / "sink-health" => service .sinksHealthy diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Service.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Service.scala index 9acff5b66..343dd58a1 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Service.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Service.scala @@ -32,6 +32,7 @@ import org.typelevel.ci._ import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload import com.snowplowanalytics.snowplow.collector.core.model._ +import cats.effect.kernel.Async trait IService[F[_]] { def preflightResponse(req: Request[F]): F[Response[F]] @@ -54,7 +55,7 @@ object Service { val spAnonymousNuid = "00000000-0000-0000-0000-000000000000" } -class Service[F[_]: Sync]( +class Service[F[_]: Async]( config: Config[Any], sinks: Sinks[F], appInfo: AppInfo @@ -121,15 +122,14 @@ class Service[F[_]: Sync]( `Access-Control-Allow-Credentials`().toRaw1.some ).flatten responseHeaders = Headers(headerList ++ bounceLocationHeaders(config.cookieBounce, shouldBounce, request)) - _ <- if (!doNotTrack && !shouldBounce) sinkEvent(event, partitionKey) else Sync[F].unit - resp = buildHttpResponse( - queryParams = request.uri.query.params, - headers = responseHeaders, - redirect = redirect, - pixelExpected = pixelExpected, - shouldBounce = shouldBounce - ) - } yield resp + _ <- if (!doNotTrack && !shouldBounce) Async[F].start(sinkEvent(event, partitionKey)) else Async[F].unit + } yield buildHttpResponse( + queryParams = request.uri.query.params, + headers = responseHeaders, + redirect = redirect, + pixelExpected = pixelExpected, + shouldBounce = shouldBounce + ) override def sinksHealthy: F[Boolean] = (sinks.good.isHealthy, sinks.bad.isHealthy).mapN(_ && _) @@ -323,7 +323,7 @@ class Service[F[_]: Sync]( ): F[Unit] = for { // Split events into Good and Bad - eventSplit <- Sync[F].delay(splitBatch.splitAndSerializePayload(event, sinks.good.maxBytes)) + eventSplit <- Async[F].delay(splitBatch.splitAndSerializePayload(event, sinks.good.maxBytes)) // Send events to respective sinks _ <- sinks.good.storeRawEvents(eventSplit.good, partitionKey) _ <- sinks.bad.storeRawEvents(eventSplit.bad, partitionKey)