Skip to content

Commit

Permalink
Bump common-streams to 0.7.0 (#40)
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed May 29, 2024
1 parent 5c37862 commit 294ddba
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import io.circe.Json
import java.time.{Instant, LocalDate, OffsetDateTime, ZoneOffset}
import java.util.{List => JList, Map => JMap}

import cats.data.NonEmptyVector

import com.snowplowanalytics.iglu.schemaddl.parquet.Type
import com.snowplowanalytics.iglu.schemaddl.parquet.Caster

Expand All @@ -32,13 +34,14 @@ private[processing] object SnowflakeCaster extends Caster[AnyRef] {
override def doubleValue(v: Double): java.lang.Double = Double.box(v)
override def decimalValue(unscaled: BigInt, details: Type.Decimal): java.math.BigDecimal =
new java.math.BigDecimal(unscaled.bigInteger, details.scale)
override def timestampValue(v: Instant): OffsetDateTime = OffsetDateTime.ofInstant(v, ZoneOffset.UTC)
override def dateValue(v: LocalDate): LocalDate = v
override def arrayValue(vs: List[AnyRef]): JList[AnyRef] = vs.asJava
override def structValue(vs: List[Caster.NamedValue[AnyRef]]): JMap[String, AnyRef] =
override def timestampValue(v: Instant): OffsetDateTime = OffsetDateTime.ofInstant(v, ZoneOffset.UTC)
override def dateValue(v: LocalDate): LocalDate = v
override def arrayValue(vs: Vector[AnyRef]): JList[AnyRef] = vs.asJava
override def structValue(vs: NonEmptyVector[Caster.NamedValue[AnyRef]]): JMap[String, AnyRef] =
vs.map { case Caster.NamedValue(k, v) =>
(k, v)
}.toMap
}.toVector
.toMap
.asJava

}
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,14 @@ object KinesisConfigSpec {
input = KinesisSourceConfig(
appName = "snowplow-snowflake-loader",
streamName = "snowplow-enriched-events",
workerIdentifier = "testWorkerId",
initialPosition = KinesisSourceConfig.InitialPosition.Latest,
retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000),
bufferSize = PosInt.unsafeFrom(1),
customEndpoint = None,
dynamodbCustomEndpoint = None,
cloudwatchCustomEndpoint = None
cloudwatchCustomEndpoint = None,
leaseDuration = 10.seconds
),
output = Config.Output(
good = Config.Snowflake(
Expand Down Expand Up @@ -137,12 +139,14 @@ object KinesisConfigSpec {
input = KinesisSourceConfig(
appName = "snowplow-snowflake-loader",
streamName = "snowplow-enriched-events",
workerIdentifier = "testWorkerId",
initialPosition = KinesisSourceConfig.InitialPosition.TrimHorizon,
retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000),
bufferSize = PosInt.unsafeFrom(1),
customEndpoint = None,
dynamodbCustomEndpoint = None,
cloudwatchCustomEndpoint = None
cloudwatchCustomEndpoint = None,
leaseDuration = 10.seconds
),
output = Config.Output(
good = Config.Snowflake(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ object PubsubConfigSpec {
private val minimalConfig = Config[PubsubSourceConfig, PubsubSinkConfig](
input = PubsubSourceConfig(
subscription = PubsubSourceConfig.Subscription("myproject", "snowplow-enriched"),
parallelPullCount = 1,
parallelPullFactor = 0.5,
bufferMaxBytes = 10000000,
maxAckExtensionPeriod = 1.hour,
minDurationPerAckExtension = 1.minute,
Expand Down Expand Up @@ -135,7 +135,7 @@ object PubsubConfigSpec {
private val extendedConfig = Config[PubsubSourceConfig, PubsubSinkConfig](
input = PubsubSourceConfig(
subscription = PubsubSourceConfig.Subscription("myproject", "snowplow-enriched"),
parallelPullCount = 3,
parallelPullFactor = 0.5,
bufferMaxBytes = 1000000,
maxAckExtensionPeriod = 1.hour,
minDurationPerAckExtension = 1.minute,
Expand Down
3 changes: 2 additions & 1 deletion project/BuildSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ object BuildSettings {
// used in extended configuration parsing unit tests
Test / envVars := Map(
"SNOWFLAKE_PRIVATE_KEY" -> "secretPrivateKey",
"SNOWFLAKE_PRIVATE_KEY_PASSPHRASE" -> "secretKeyPassphrase"
"SNOWFLAKE_PRIVATE_KEY_PASSPHRASE" -> "secretKeyPassphrase",
"HOSTNAME" -> "testWorkerId"
)
)

Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ object Dependencies {
val nimbusJwt = "9.37.2" // Version override

// Snowplow
val streams = "0.4.0"
val streams = "0.7.0"

// tests
val specs2 = "4.20.0"
Expand Down

0 comments on commit 294ddba

Please sign in to comment.