From d74142941fcd2dd843d367b1f1f650b5aaac2919 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Poniedzia=C5=82ek?= Date: Fri, 3 Nov 2023 14:59:17 +0100 Subject: [PATCH] Upgrade scala? --- .../ConfigParser.scala | 13 ++++------ .../HttpServer.scala | 2 +- .../Run.scala | 2 +- .../Service.scala | 2 +- .../RoutesSpec.scala | 2 ++ .../ServiceSpec.scala | 6 ++--- .../TestUtils.scala | 6 ++--- .../scalastream/it/kafka/KafkaUtils.scala | 2 +- .../scalastream/it/core/CookieSpec.scala | 4 ++-- .../it/core/DoNotTrackCookieSpec.scala | 2 +- .../scalastream/it/kinesis/Kinesis.scala | 2 +- .../sinks/KinesisSink.scala | 8 +++---- .../sinks/NsqSink.scala | 10 +++++--- project/BuildSettings.scala | 24 +++++++++---------- .../scalastream/it/pubsub/PubSub.scala | 2 +- .../sinks/PubSubHealthCheck.scala | 2 +- .../sinks/PubSubSink.scala | 2 +- .../sinks/SqsSink.scala | 7 +++--- 18 files changed, 48 insertions(+), 50 deletions(-) diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/ConfigParser.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/ConfigParser.scala index c2960ba8d..bf3c80c29 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/ConfigParser.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/ConfigParser.scala @@ -1,25 +1,20 @@ package com.snowplowanalytics.snowplow.collector.core import java.nio.file.{Files, Path} - import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger - -import com.typesafe.config.{Config => TypesafeConfig, ConfigFactory} - -import scala.collection.JavaConverters._ - +import com.typesafe.config.{ConfigFactory, Config => TypesafeConfig} import io.circe.Decoder import io.circe.config.syntax.CirceConfigOps - import cats.implicits._ import cats.data.EitherT - import cats.effect.{ExitCode, Sync} +import scala.jdk.CollectionConverters._ + object ConfigParser { - implicit private def logger[F[_]: Sync] = Slf4jLogger.getLogger[F] + implicit private def logger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F] def fromPath[F[_]: Sync, SinkConfig: Decoder]( configPath: Option[Path] diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/HttpServer.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/HttpServer.scala index e792b7d10..a162d2e0a 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/HttpServer.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/HttpServer.scala @@ -13,7 +13,7 @@ import javax.net.ssl.SSLContext object HttpServer { - implicit private def logger[F[_]: Async] = Slf4jLogger.getLogger[F] + implicit private def logger[F[_]: Async]: Logger[F] = Slf4jLogger.getLogger[F] def build[F[_]: Async]( app: HttpApp[F], diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala index 443f40e2a..3968e2a5b 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala @@ -29,7 +29,7 @@ object Run { type TelemetryInfo[F[_], SinkConfig] = Config.Streams[SinkConfig] => F[Telemetry.TelemetryInfo] - implicit private def logger[F[_]: Sync] = Slf4jLogger.getLogger[F] + implicit private def logger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F] def fromCli[F[_]: Async: Tracking, SinkConfig: Decoder]( appInfo: AppInfo, diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Service.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Service.scala index ad3373cea..59d8c4a62 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Service.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Service.scala @@ -5,7 +5,7 @@ import java.util.UUID import org.apache.commons.codec.binary.Base64 import scala.concurrent.duration._ -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ import cats.effect.{Clock, Sync} import cats.implicits._ diff --git a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/RoutesSpec.scala b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/RoutesSpec.scala index 7e5ac7fe5..2cbf82dc2 100644 --- a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/RoutesSpec.scala +++ b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/RoutesSpec.scala @@ -171,6 +171,8 @@ class RoutesSpec extends Specification { case Method.GET => cookieParams.pixelExpected shouldEqual true cookieParams.contentType shouldEqual None + case other => + ko(s"Invalid http method - $other") } cookieParams.doNotTrack shouldEqual false response.status must beEqualTo(Status.Ok) diff --git a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/ServiceSpec.scala b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/ServiceSpec.scala index f44bfba02..c431eb75f 100644 --- a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/ServiceSpec.scala +++ b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/ServiceSpec.scala @@ -1,7 +1,7 @@ package com.snowplowanalytics.snowplow.collector.core import scala.concurrent.duration._ -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ import org.specs2.mutable.Specification @@ -404,7 +404,7 @@ class ServiceSpec extends Specification { `Access-Control-Allow-Headers`(ci"Content-Type", ci"SP-Anonymous"), `Access-Control-Max-Age`.Cache(3600).asInstanceOf[`Access-Control-Max-Age`] ) - service.preflightResponse(Request[IO]()).unsafeRunSync.headers shouldEqual expected + service.preflightResponse(Request[IO]()).unsafeRunSync().headers shouldEqual expected } } @@ -700,7 +700,7 @@ class ServiceSpec extends Specification { ) cookie.secure must beTrue cookie.httpOnly must beTrue - cookie.sameSite must beSome(SameSite.None) + cookie.sameSite must beSome[SameSite](SameSite.None) cookie.extension must beNone service.cookieHeader( headers = Headers.empty, diff --git a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala index 3647ec7d3..2dc0b780e 100644 --- a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala +++ b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala @@ -6,7 +6,7 @@ import cats.Applicative import org.http4s.SameSite -import com.snowplowanalytics.snowplow.collector.core.Config._ +import com.snowplowanalytics.snowplow.collector.core.Config.{Sink => SinkConfig, _} object TestUtils { val appName = "collector-test" @@ -75,7 +75,7 @@ object TestUtils { ), cors = CORS(60.minutes), streams = Streams( - good = Sink( + good = SinkConfig( name = "raw", Buffer( 3145728, @@ -84,7 +84,7 @@ object TestUtils { ), AnyRef ), - bad = Sink( + bad = SinkConfig( name = "bad-1", Buffer( 3145728, diff --git a/kafka/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/kafka/KafkaUtils.scala b/kafka/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/kafka/KafkaUtils.scala index b70d73d07..8cb176647 100644 --- a/kafka/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/kafka/KafkaUtils.scala +++ b/kafka/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/kafka/KafkaUtils.scala @@ -4,7 +4,7 @@ import cats.effect._ import org.apache.kafka.clients.consumer._ import java.util.Properties import java.time.Duration -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ import com.snowplowanalytics.snowplow.collectors.scalastream.it.utils._ import com.snowplowanalytics.snowplow.collectors.scalastream.it.CollectorOutput diff --git a/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/core/CookieSpec.scala b/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/core/CookieSpec.scala index f972243b6..f4c616fc2 100644 --- a/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/core/CookieSpec.scala +++ b/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/core/CookieSpec.scala @@ -46,13 +46,13 @@ class CookieSpec extends Specification with Localstack with CatsEffect { cookie.name must beEqualTo("greatName") cookie.expires match { case Some(expiry) => - expiry.epochSecond should beCloseTo((now + 42.days).toSeconds, 10) + expiry.epochSecond should beCloseTo((now + 42.days).toSeconds, 10L) case None => ko(s"Cookie [$cookie] doesn't contain the expiry date") } cookie.secure should beTrue cookie.httpOnly should beTrue - cookie.sameSite should beSome(SameSite.Strict) + cookie.sameSite should beSome[SameSite](SameSite.Strict) case _ => ko(s"There is not 1 cookie but ${resp.cookies.size}") } } diff --git a/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/core/DoNotTrackCookieSpec.scala b/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/core/DoNotTrackCookieSpec.scala index abcbc4937..fa8142611 100644 --- a/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/core/DoNotTrackCookieSpec.scala +++ b/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/core/DoNotTrackCookieSpec.scala @@ -16,7 +16,7 @@ import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.containe import org.specs2.execute.PendingUntilFixed import org.specs2.mutable.Specification -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ import scala.concurrent.duration._ class DoNotTrackCookieSpec extends Specification with Localstack with CatsEffect with PendingUntilFixed { diff --git a/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/kinesis/Kinesis.scala b/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/kinesis/Kinesis.scala index 449d5da04..e34395e79 100644 --- a/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/kinesis/Kinesis.scala +++ b/kinesis/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/kinesis/Kinesis.scala @@ -8,7 +8,7 @@ */ package com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ import scala.collection.mutable.ArrayBuffer import cats.effect.{IO, Resource} diff --git a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala index cf6519a40..ca84dca84 100644 --- a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala +++ b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala @@ -24,7 +24,7 @@ import org.slf4j.LoggerFactory import java.nio.ByteBuffer import java.util.UUID import java.util.concurrent.ScheduledExecutorService -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ import scala.collection.mutable.ListBuffer import scala.concurrent.duration._ import scala.concurrent.{ExecutionContextExecutorService, Future} @@ -92,7 +92,7 @@ class KinesisSink[F[_]: Sync] private ( def flush(): Unit = { val eventsToSend = synchronized { - val evts = storedEvents.result + val evts = storedEvents.result() storedEvents.clear() byteCount = 0 evts @@ -367,7 +367,7 @@ class KinesisSink[F[_]: Sync] private ( private def checkKinesisHealth(): Unit = { val healthRunnable = new Runnable { - override def run() { + override def run(): Unit = { log.info(s"Starting background check for Kinesis stream $streamName") while (!kinesisHealthy) { Try { @@ -392,7 +392,7 @@ class KinesisSink[F[_]: Sync] private ( private def checkSqsHealth(): Unit = maybeSqs.foreach { sqs => val healthRunnable = new Runnable { - override def run() { + override def run(): Unit = { log.info(s"Starting background check for SQS buffer ${sqs.sqsBufferName}") while (!sqsHealthy) { Try { diff --git a/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala b/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala index c47cc374a..e7885552c 100644 --- a/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala +++ b/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala @@ -10,7 +10,7 @@ package com.snowplowanalytics.snowplow.collectors.scalastream package sinks import java.util.concurrent.TimeoutException -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ import cats.effect.{Resource, Sync} import cats.implicits._ import com.snowplowanalytics.client.nsq.NSQProducer @@ -42,11 +42,15 @@ class NsqSink[F[_]: Sync] private ( override def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] = Sync[F].blocking(producer.produceMulti(topicName, events.asJava)).onError { case _: NSQException | _: TimeoutException => - Sync[F].delay(healthStatus = false) - } *> Sync[F].delay(healthStatus = true) + setHealthStatus(false) + } *> setHealthStatus(true) def shutdown(): Unit = producer.shutdown() + + private def setHealthStatus(status: Boolean): F[Unit] = Sync[F].delay { + healthStatus = status + } } object NsqSink { diff --git a/project/BuildSettings.scala b/project/BuildSettings.scala index 5b0336e8d..aa1684a10 100644 --- a/project/BuildSettings.scala +++ b/project/BuildSettings.scala @@ -21,19 +21,17 @@ import sbtdynver.DynVerPlugin.autoImport._ object BuildSettings { lazy val commonSettings = Seq( - organization := "com.snowplowanalytics", - name := "snowplow-stream-collector", - description := "Scala Stream Collector for Snowplow raw events", - scalaVersion := "2.12.10", - scalacOptions ++= Seq("-Ypartial-unification", "-Ywarn-macros:after"), - javacOptions := Seq("-source", "11", "-target", "11"), - resolvers ++= resolutionRepos - ) - - lazy val resolutionRepos = Seq( - "Snowplow Analytics Maven repo".at("http://maven.snplow.com/releases/").withAllowInsecureProtocol(true), - // For uaParser utils - "user-agent-parser repo".at("https://clojars.org/repo/") + organization := "com.snowplowanalytics", + name := "snowplow-stream-collector", + description := "Scala Stream Collector for Snowplow raw events", + scalaVersion := "2.13.12", + scalacOptions ++= Seq("-Ywarn-macros:after"), + javacOptions := Seq("-source", "11", "-target", "11"), + resolvers ++= Seq( + "Snowplow Analytics Maven repo".at("http://maven.snplow.com/releases/").withAllowInsecureProtocol(true), + // For uaParser utils + "user-agent-parser repo".at("https://clojars.org/repo/") + ) ) lazy val coreHttp4sSettings = commonSettings ++ sbtAssemblySettings ++ Defaults.itSettings diff --git a/pubsub/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/pubsub/PubSub.scala b/pubsub/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/pubsub/PubSub.scala index 390fa3e2a..8fc2b8a7e 100644 --- a/pubsub/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/pubsub/PubSub.scala +++ b/pubsub/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/pubsub/PubSub.scala @@ -8,7 +8,7 @@ */ package com.snowplowanalytics.snowplow.collectors.scalastream.it.pubsub -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ import com.google.api.gax.grpc.GrpcTransportChannel import com.google.api.gax.rpc.{FixedTransportChannelProvider, TransportChannelProvider} diff --git a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PubSubHealthCheck.scala b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PubSubHealthCheck.scala index 07940c3c0..2b40b45f9 100644 --- a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PubSubHealthCheck.scala +++ b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PubSubHealthCheck.scala @@ -9,7 +9,7 @@ import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.BuilderOps._ import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ import scala.util._ object PubSubHealthCheck { diff --git a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PubSubSink.scala b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PubSubSink.scala index 387efaaa0..7f6a4f5bb 100644 --- a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PubSubSink.scala +++ b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PubSubSink.scala @@ -94,7 +94,7 @@ object PubSubSink { sinkConfig.name ) - private def createProducer[F[_]: Async: Parallel]( + private def createProducer[F[_]: Async]( sinkConfig: PubSubSinkConfig, topicName: String, bufferConfig: Config.Buffer diff --git a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala index 7110f148c..8f9bc255d 100644 --- a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala +++ b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala @@ -21,7 +21,7 @@ import scala.collection.mutable.ListBuffer import scala.util.{Failure, Random, Success, Try} import scala.concurrent.{ExecutionContextExecutorService, Future} import scala.concurrent.duration.MILLISECONDS -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ import cats.syntax.either._ @@ -82,7 +82,7 @@ class SqsSink[F[_]: Sync] private ( def flush(): Unit = { val eventsToSend = synchronized { - val evts = storedEvents.result + val evts = storedEvents.result() storedEvents.clear() byteCount = 0 evts @@ -242,7 +242,7 @@ class SqsSink[F[_]: Sync] private ( private def checkSqsHealth(): Unit = { val healthRunnable = new Runnable { - override def run() { + override def run(): Unit = while (!sqsHealthy) { Try { client.getQueueUrl(queueName) @@ -255,7 +255,6 @@ class SqsSink[F[_]: Sync] private ( Thread.sleep(1000L) } } - } } executorService.execute(healthRunnable) }