From 294ddbae0576c487ce169f68ba81f1cfd00ee5d2 Mon Sep 17 00:00:00 2001 From: spenes Date: Mon, 27 May 2024 17:33:53 +0300 Subject: [PATCH] Bump common-streams to 0.7.0 (#40) --- .../processing/SnowplowCaster.scala | 13 ++++++++----- .../snowplow/snowflake/KinesisConfigSpec.scala | 8 ++++++-- .../snowplow/snowflake/PubsubConfigSpec.scala | 4 ++-- project/BuildSettings.scala | 3 ++- project/Dependencies.scala | 2 +- 5 files changed, 19 insertions(+), 11 deletions(-) diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowplowCaster.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowplowCaster.scala index 926eb93..3d58c6d 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowplowCaster.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowplowCaster.scala @@ -15,6 +15,8 @@ import io.circe.Json import java.time.{Instant, LocalDate, OffsetDateTime, ZoneOffset} import java.util.{List => JList, Map => JMap} +import cats.data.NonEmptyVector + import com.snowplowanalytics.iglu.schemaddl.parquet.Type import com.snowplowanalytics.iglu.schemaddl.parquet.Caster @@ -32,13 +34,14 @@ private[processing] object SnowflakeCaster extends Caster[AnyRef] { override def doubleValue(v: Double): java.lang.Double = Double.box(v) override def decimalValue(unscaled: BigInt, details: Type.Decimal): java.math.BigDecimal = new java.math.BigDecimal(unscaled.bigInteger, details.scale) - override def timestampValue(v: Instant): OffsetDateTime = OffsetDateTime.ofInstant(v, ZoneOffset.UTC) - override def dateValue(v: LocalDate): LocalDate = v - override def arrayValue(vs: List[AnyRef]): JList[AnyRef] = vs.asJava - override def structValue(vs: List[Caster.NamedValue[AnyRef]]): JMap[String, AnyRef] = + override def timestampValue(v: Instant): OffsetDateTime = OffsetDateTime.ofInstant(v, ZoneOffset.UTC) + override def dateValue(v: LocalDate): LocalDate = v + override def arrayValue(vs: Vector[AnyRef]): JList[AnyRef] = vs.asJava + override def structValue(vs: NonEmptyVector[Caster.NamedValue[AnyRef]]): JMap[String, AnyRef] = vs.map { case Caster.NamedValue(k, v) => (k, v) - }.toMap + }.toVector + .toMap .asJava } diff --git a/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/snowflake/KinesisConfigSpec.scala b/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/snowflake/KinesisConfigSpec.scala index 82e5213..c87596c 100644 --- a/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/snowflake/KinesisConfigSpec.scala +++ b/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/snowflake/KinesisConfigSpec.scala @@ -64,12 +64,14 @@ object KinesisConfigSpec { input = KinesisSourceConfig( appName = "snowplow-snowflake-loader", streamName = "snowplow-enriched-events", + workerIdentifier = "testWorkerId", initialPosition = KinesisSourceConfig.InitialPosition.Latest, retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000), bufferSize = PosInt.unsafeFrom(1), customEndpoint = None, dynamodbCustomEndpoint = None, - cloudwatchCustomEndpoint = None + cloudwatchCustomEndpoint = None, + leaseDuration = 10.seconds ), output = Config.Output( good = Config.Snowflake( @@ -137,12 +139,14 @@ object KinesisConfigSpec { input = KinesisSourceConfig( appName = "snowplow-snowflake-loader", streamName = "snowplow-enriched-events", + workerIdentifier = "testWorkerId", initialPosition = KinesisSourceConfig.InitialPosition.TrimHorizon, retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000), bufferSize = PosInt.unsafeFrom(1), customEndpoint = None, dynamodbCustomEndpoint = None, - cloudwatchCustomEndpoint = None + cloudwatchCustomEndpoint = None, + leaseDuration = 10.seconds ), output = Config.Output( good = Config.Snowflake( diff --git a/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/snowflake/PubsubConfigSpec.scala b/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/snowflake/PubsubConfigSpec.scala index 671a9e1..745d96a 100644 --- a/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/snowflake/PubsubConfigSpec.scala +++ b/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/snowflake/PubsubConfigSpec.scala @@ -63,7 +63,7 @@ object PubsubConfigSpec { private val minimalConfig = Config[PubsubSourceConfig, PubsubSinkConfig]( input = PubsubSourceConfig( subscription = PubsubSourceConfig.Subscription("myproject", "snowplow-enriched"), - parallelPullCount = 1, + parallelPullFactor = 0.5, bufferMaxBytes = 10000000, maxAckExtensionPeriod = 1.hour, minDurationPerAckExtension = 1.minute, @@ -135,7 +135,7 @@ object PubsubConfigSpec { private val extendedConfig = Config[PubsubSourceConfig, PubsubSinkConfig]( input = PubsubSourceConfig( subscription = PubsubSourceConfig.Subscription("myproject", "snowplow-enriched"), - parallelPullCount = 3, + parallelPullFactor = 0.5, bufferMaxBytes = 1000000, maxAckExtensionPeriod = 1.hour, minDurationPerAckExtension = 1.minute, diff --git a/project/BuildSettings.scala b/project/BuildSettings.scala index b1ae030..d75f0be 100644 --- a/project/BuildSettings.scala +++ b/project/BuildSettings.scala @@ -57,7 +57,8 @@ object BuildSettings { // used in extended configuration parsing unit tests Test / envVars := Map( "SNOWFLAKE_PRIVATE_KEY" -> "secretPrivateKey", - "SNOWFLAKE_PRIVATE_KEY_PASSPHRASE" -> "secretKeyPassphrase" + "SNOWFLAKE_PRIVATE_KEY_PASSPHRASE" -> "secretKeyPassphrase", + "HOSTNAME" -> "testWorkerId" ) ) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 73a1953..2cb7c42 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -34,7 +34,7 @@ object Dependencies { val nimbusJwt = "9.37.2" // Version override // Snowplow - val streams = "0.4.0" + val streams = "0.7.0" // tests val specs2 = "4.20.0"