From 0944b6038ad3b78944aa8283f00b4c98cc777012 Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Sat, 20 Apr 2024 21:11:07 +0100 Subject: [PATCH 1/4] Bump common-streams to 0.6.0 --- .../processing/BigQueryCaster.scala | 7 ++++--- .../processing/BigQuerySchemaUtils.scala | 10 +++++----- .../processing/ProcessingSpec.scala | 8 +++++--- project/BuildSettings.scala | 2 +- project/Dependencies.scala | 2 +- 5 files changed, 16 insertions(+), 13 deletions(-) diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQueryCaster.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQueryCaster.scala index 06bb4ce8..e8cd2146 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQueryCaster.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQueryCaster.scala @@ -8,6 +8,7 @@ */ package com.snowplowanalytics.snowplow.bigquery.processing +import cats.data.NonEmptyVector import io.circe.Json import java.time.{Instant, LocalDate} @@ -32,11 +33,11 @@ private[processing] object BigQueryCaster extends Caster[AnyRef] { new java.math.BigDecimal(unscaled.bigInteger, details.scale) override def timestampValue(v: Instant): java.lang.Long = Long.box(v.toEpochMilli * 1000) // Microseconds override def dateValue(v: LocalDate): java.lang.Long = Long.box(v.toEpochDay) - override def arrayValue(vs: List[AnyRef]): JSONArray = + override def arrayValue(vs: Vector[AnyRef]): JSONArray = // BigQuery does not permit nulls in a repeated field new JSONArray(vs.filterNot(_ == null).asJava) - override def structValue(vs: List[Caster.NamedValue[AnyRef]]): JSONObject = { - val map = vs + override def structValue(vs: NonEmptyVector[Caster.NamedValue[AnyRef]]): JSONObject = { + val map = vs.iterator .map { case Caster.NamedValue(k, v) => (k, v) } diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQuerySchemaUtils.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQuerySchemaUtils.scala index 4d90d2e9..b852924c 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQuerySchemaUtils.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQuerySchemaUtils.scala @@ -16,7 +16,7 @@ import scala.jdk.CollectionConverters._ object BigQuerySchemaUtils { - def alterTableRequired(tableDescriptor: Descriptors.Descriptor, ddlFields: Seq[Field]): Seq[Field] = + def alterTableRequired(tableDescriptor: Descriptors.Descriptor, ddlFields: Vector[Field]): Vector[Field] = ddlFields.filter { field => Option(tableDescriptor.findFieldByName(field.name)) match { case Some(fieldDescriptor) => @@ -32,7 +32,7 @@ object BigQuerySchemaUtils { case Descriptors.FieldDescriptor.Type.MESSAGE => ddlField.fieldType match { case Type.Struct(nestedFields) => - alterTableRequired(tableField.getMessageType, nestedFields).nonEmpty + alterTableRequired(tableField.getMessageType, nestedFields.toVector).nonEmpty case _ => false } @@ -40,7 +40,7 @@ object BigQuerySchemaUtils { false } - def mergeInColumns(bqFields: FieldList, ddlFields: Seq[Field]): FieldList = { + def mergeInColumns(bqFields: FieldList, ddlFields: Vector[Field]): FieldList = { val ddlFieldsByName = ddlFields.map(f => f.name -> f).toMap val bqFieldNames = bqFields.asScala.map(f => f.getName).toSet val alteredExisting = bqFields.asScala.map { bqField => @@ -65,7 +65,7 @@ object BigQuerySchemaUtils { 
Option(bqField.getSubFields) match { case Some(bqNestedFields) => bqField.toBuilder - .setType(StandardSQLTypeName.STRUCT, mergeInColumns(bqNestedFields, ddlNestedFields)) + .setType(StandardSQLTypeName.STRUCT, mergeInColumns(bqNestedFields, ddlNestedFields.toVector)) .build case None => bqField @@ -86,7 +86,7 @@ object BigQuerySchemaUtils { .setMode(BQField.Mode.REPEATED) .build case Type.Struct(nestedFields) => - val nested = FieldList.of(nestedFields.map(bqFieldOf).asJava) + val nested = FieldList.of(nestedFields.map(bqFieldOf).toVector.asJava) BQField .newBuilder(ddlField.name, StandardSQLTypeName.STRUCT, nested) .setMode(bqModeOf(ddlField.nullability)) diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/ProcessingSpec.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/ProcessingSpec.scala index 0159a299..9a70d50b 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/ProcessingSpec.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/ProcessingSpec.scala @@ -109,8 +109,10 @@ class ProcessingSpec extends Specification with CatsEffect { { "schema": "iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0", "data": { - "schema": "iglu:com.snowplowanalytics.snowplow.media/ad_start_event/jsonschema/1-0-0", - "data": {} + "schema": "iglu:com.snowplowanalytics.snowplow.media/ad_click_event/jsonschema/1-0-0", + "data": { + "percentProgress": 50 + } } } """.as[UnstructEvent].fold(throw _, identity) @@ -122,7 +124,7 @@ class ProcessingSpec extends Specification with CatsEffect { Vector( Action.CreatedTable, Action.OpenedWriter, - Action.AlterTableAddedColumns(Vector("unstruct_event_com_snowplowanalytics_snowplow_media_ad_start_event_1")), + Action.AlterTableAddedColumns(Vector("unstruct_event_com_snowplowanalytics_snowplow_media_ad_click_event_1")), Action.ClosedWriter, Action.OpenedWriter, Action.WroteRowsToBigQuery(2), diff --git a/project/BuildSettings.scala b/project/BuildSettings.scala index e78a8afd..36ef7cfb 100644 --- a/project/BuildSettings.scala +++ b/project/BuildSettings.scala @@ -44,7 +44,7 @@ object BuildSettings { Test / igluUris := Seq( // Iglu schemas used in unit tests "iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-0", - "iglu:com.snowplowanalytics.snowplow.media/ad_start_event/jsonschema/1-0-0" + "iglu:com.snowplowanalytics.snowplow.media/ad_click_event/jsonschema/1-0-0" ) ) ++ commonSettings diff --git a/project/Dependencies.scala b/project/Dependencies.scala index d8fa8902..d6efa5dd 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -30,7 +30,7 @@ object Dependencies { val bigquery = "2.34.2" // Snowplow - val streams = "0.5.2" + val streams = "0.6.0" val igluClient = "3.1.0" // tests From 8338639c506ebfa536f68b20e6f0cd21dcbe0c69 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Poniedzia=C5=82ek?= Date: Tue, 30 Apr 2024 09:36:09 +0200 Subject: [PATCH 2/4] Handle too many columns - part 2 We already handle one type of exception for too many columns case with message like: ```Too many columns (10067) in schema. Only 10000 columns are allowed``` It turns out BQ can fail with different one in this case, like: ```Too many total leaf fields: 10001, max allowed field count: 10000``` so we have to handle it as well. 
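For reference, both failure modes reduce to the same condition, so the guard only needs to match either message prefix on the lower-cased error. A minimal sketch of that check (hypothetical helper, not the loader's exact code — the real handler in TableManager also checks the error reason via its own helpers, temporarily disables column adding and sends an alert):

```scala
import com.google.cloud.bigquery.BigQueryException

object TooManyColumnsCheck {

  // The two message prefixes BigQuery is known to use for this failure mode:
  //   "Too many columns (10067) in schema. Only 10000 columns are allowed"
  //   "Too many total leaf fields: 10001, max allowed field count: 10000"
  private val knownPrefixes = List("too many columns", "too many total leaf fields")

  def isTooManyColumns(e: BigQueryException): Boolean = {
    val reason  = Option(e.getError).flatMap(err => Option(err.getReason)).getOrElse("").toLowerCase
    val message = Option(e.getMessage).getOrElse("").toLowerCase
    reason == "invalid" && knownPrefixes.exists(message.startsWith)
  }
}
```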
--- .../processing/TableManager.scala | 4 +- .../processing/TableManagerSpec.scala | 93 ++++++++++++++++++- project/plugins.sbt | 2 +- 3 files changed, 92 insertions(+), 7 deletions(-) diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManager.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManager.scala index 1804b2f7..207732f2 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManager.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManager.scala @@ -134,7 +134,9 @@ object TableManager { addingColumnsEnabled: Ref[F, Boolean], columns: Vector[Field] ): PartialFunction[Throwable, F[Unit]] = { - case bqe: BigQueryException if bqe.lowerCaseReason === "invalid" && bqe.lowerCaseMessage.startsWith("too many columns") => + case bqe: BigQueryException + if bqe.lowerCaseReason === "invalid" && (bqe.lowerCaseMessage + .startsWith("too many columns") || bqe.lowerCaseMessage.startsWith("too many total leaf fields")) => val enableAfterDelay = Async[F].sleep(retries.tooManyColumns.delay) *> addingColumnsEnabled.set(true) for { _ <- Logger[F].error(bqe)(s"Could not alter table schema because of too many columns") diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManagerSpec.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManagerSpec.scala index b4772729..b85f3b49 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManagerSpec.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManagerSpec.scala @@ -32,12 +32,14 @@ class TableManagerSpec extends Specification with CatsEffect { def is = s2""" The table manager Add columns when told to $e1 - Retry adding columns and send alerts when there is a setup exceptionr $e2 + Retry adding columns and send alerts when there is a setup exception $e2 Retry adding columns if there is a transient exception, with limited number of attempts and no monitoring alerts $e3 Become healthy after recovering from an earlier setup error $e4 Become healthy after recovering from an earlier transient error $e5 - Disable future attempts to add columns if the table has too many columns $e6 - Eventually re-enable future attempts to add columns if the table has too many columns $e7 + Disable future attempts to add columns if the table has too many columns - exception type 1 $e6_1 + Disable future attempts to add columns if the table has too many columns - exception type 2 $e6_2 + Eventually re-enable future attempts to add columns if the table has too many columns - exception type 1 $e7_1 + Eventually re-enable future attempts to add columns if the table has too many columns - exception type 2 $e7_2 """ def e1 = control().flatMap { c => @@ -211,7 +213,7 @@ class TableManagerSpec extends Specification with CatsEffect { } } - def e6 = { + def e6_1 = { val error = new BigQueryError("invalid", "global", "Too many columns (10067) in schema. 
Only 10000 columns are allowed.") val exception = new BigQueryException(400, error.getMessage, error) val mocks = List(Response.ExceptionThrown(exception)) @@ -251,7 +253,47 @@ class TableManagerSpec extends Specification with CatsEffect { } } - def e7 = { + def e6_2 = { + val error = new BigQueryError("invalid", "global", "Too many total leaf fields: 10001, max allowed field count: 10000") + val exception = new BigQueryException(400, error.getMessage, error) + val mocks = List(Response.ExceptionThrown(exception)) + control(Mocks(addColumnsResults = mocks)).flatMap { c => + val testFields1 = Vector( + Field("f1", Type.String, Type.Nullability.Nullable, Set("f1")), + Field("f2", Type.Integer, Type.Nullability.Required, Set("f2")) + ) + + val testFields2 = Vector( + Field("f3", Type.String, Type.Nullability.Nullable, Set("f3")), + Field("f4", Type.Integer, Type.Nullability.Required, Set("f4")) + ) + + val io = for { + tableManager <- TableManager.withHandledErrors(c.tableManager, retriesConfig, c.appHealth, c.monitoring) + _ <- tableManager.addColumns(testFields1) + _ <- IO.sleep(10.seconds) + _ <- tableManager.addColumns(testFields2) + } yield () + + val expected = Vector( + Action.AddColumnsAttempted(testFields1), + Action.SentAlert(0L) + ) + + val test = for { + _ <- io + state <- c.state.get + health <- c.appHealth.status + } yield List( + state should beEqualTo(expected), + health should beHealthy + ).reduce(_ and _) + + TestControl.executeEmbed(test) + } + } + + def e7_1 = { val error = new BigQueryError("invalid", "global", "Too many columns (10067) in schema. Only 10000 columns are allowed.") val exception = new BigQueryException(400, error.getMessage, error) val mocks = List(Response.ExceptionThrown(exception)) @@ -292,6 +334,47 @@ class TableManagerSpec extends Specification with CatsEffect { } } + def e7_2 = { + val error = new BigQueryError("invalid", "global", "Too many total leaf fields: 10001, max allowed field count: 10000") + val exception = new BigQueryException(400, error.getMessage, error) + val mocks = List(Response.ExceptionThrown(exception)) + control(Mocks(addColumnsResults = mocks)).flatMap { c => + val testFields1 = Vector( + Field("f1", Type.String, Type.Nullability.Nullable, Set("f1")), + Field("f2", Type.Integer, Type.Nullability.Required, Set("f2")) + ) + + val testFields2 = Vector( + Field("f3", Type.String, Type.Nullability.Nullable, Set("f3")), + Field("f4", Type.Integer, Type.Nullability.Required, Set("f4")) + ) + + val io = for { + tableManager <- TableManager.withHandledErrors(c.tableManager, retriesConfig, c.appHealth, c.monitoring) + _ <- tableManager.addColumns(testFields1) + _ <- IO.sleep(310.seconds) + _ <- tableManager.addColumns(testFields2) + } yield () + + val expected = Vector( + Action.AddColumnsAttempted(testFields1), + Action.SentAlert(0L), + Action.AddColumnsAttempted(testFields2) + ) + + val test = for { + _ <- io + state <- c.state.get + health <- c.appHealth.status + } yield List( + state should beEqualTo(expected), + health should beHealthy + ).reduce(_ and _) + + TestControl.executeEmbed(test) + } + } + /** Convenience matchers for health probe * */ def beHealthy: org.specs2.matcher.Matcher[HealthProbe.Status] = { (status: HealthProbe.Status) => diff --git a/project/plugins.sbt b/project/plugins.sbt index 992fe572..f352ec6d 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,5 +1,5 @@ addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.4.2") addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6") 
-addSbtPlugin("com.snowplowanalytics" % "sbt-snowplow-release" % "0.3.1") +addSbtPlugin("com.snowplowanalytics" % "sbt-snowplow-release" % "0.3.2") addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.11.0") addSbtPlugin("com.dwijnand" % "sbt-dynver" % "4.1.1") From 771f821e4410c816bdbd0f1d2826f6313b486fd6 Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Fri, 3 May 2024 00:34:24 +0100 Subject: [PATCH 3/4] Allow for delay in Writer discovering new columns After the loader alters the table to add new columns, it immediately opens a new Writer and expects the Writer to be aware of the new columns. However, we have found the Writer might get opened with no awareness of the newly added columns, presumably because of the async nature of BigQuery's architecture. This fix works by retrying opening the writer until it eventually gets opened with awareness of the new columns. --- .../processing/BigQuerySchemaUtils.scala | 29 +++++++++++++++ .../processing/Processing.scala | 25 +++++++++++-- .../processing/TableManager.scala | 22 ++++++++---- .../AtomicDescriptor.scala | 27 ++++++++++++++ .../MockEnvironment.scala | 36 ++++++++++++++----- .../processing/ProcessingSpec.scala | 21 +++++++++-- .../processing/TableManagerSpec.scala | 6 ++-- 7 files changed, 142 insertions(+), 24 deletions(-) diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQuerySchemaUtils.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQuerySchemaUtils.scala index b852924c..ea63d6a0 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQuerySchemaUtils.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQuerySchemaUtils.scala @@ -8,6 +8,9 @@ */ package com.snowplowanalytics.snowplow.bigquery.processing +import cats.Eq +import cats.implicits._ + import com.snowplowanalytics.iglu.schemaddl.parquet.{Field, Type} import com.google.protobuf.Descriptors import com.google.cloud.bigquery.{Field => BQField, FieldList, StandardSQLTypeName} @@ -16,6 +19,30 @@ import scala.jdk.CollectionConverters._ object BigQuerySchemaUtils { + def fieldsMissingFromDescriptor(tableDescriptor: Descriptors.Descriptor, bqFields: FieldList): Boolean = + bqFields.asScala.exists { field => + Option(tableDescriptor.findFieldByName(field.getName)) match { + case Some(fieldDescriptor) => + val nullableMismatch = fieldDescriptor.isRequired && (field.getMode === BQField.Mode.NULLABLE) + nullableMismatch || nestedFieldMissingFromDescriptor(fieldDescriptor, field) + case None => + true + } + } + + private def nestedFieldMissingFromDescriptor(tableField: Descriptors.FieldDescriptor, bqField: BQField): Boolean = + tableField.getType match { + case Descriptors.FieldDescriptor.Type.MESSAGE => + Option(bqField.getSubFields) match { + case Some(nestedFields) => + fieldsMissingFromDescriptor(tableField.getMessageType, nestedFields) + case _ => + false + } + case _ => + false + } + def alterTableRequired(tableDescriptor: Descriptors.Descriptor, ddlFields: Vector[Field]): Vector[Field] = ddlFields.filter { field => Option(tableDescriptor.findFieldByName(field.name)) match { @@ -113,6 +140,8 @@ object BigQuerySchemaUtils { private def bqModeOf(nullability: Type.Nullability): BQField.Mode = if (nullability.nullable) BQField.Mode.NULLABLE else BQField.Mode.REQUIRED + private implicit val modeEq: Eq[BQField.Mode] = Eq.fromUniversalEquals + def showDescriptor(descriptor: Descriptors.Descriptor): String =
descriptor.getFields.asScala .map(_.getName) diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Processing.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Processing.scala index f66da7e9..c088a36c 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Processing.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Processing.scala @@ -17,6 +17,7 @@ import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger import retry.{PolicyDecision, RetryDetails, RetryPolicy} import retry.implicits._ +import com.google.cloud.bigquery.FieldList import java.nio.charset.StandardCharsets import scala.concurrent.duration.Duration @@ -205,7 +206,7 @@ object Processing { def handlingServerSideSchemaMismatches(env: Environment[F]): F[Writer.WriteResult] = { def onFailure(wr: Writer.WriteResult, details: RetryDetails): F[Unit] = { val extractedDetail = BigQueryRetrying.extractRetryDetails(details) - val msg = s"Newly added columns have not yet propagated to the BigQuery Writer. $extractedDetail" + val msg = s"Newly added columns have not yet propagated to the BigQuery Writer server-side. $extractedDetail" val log = wr match { case Writer.WriteResult.ServerSideSchemaMismatch(e) if details.retriesSoFar > errorsAllowedWithShortLogging => Logger[F].warn(e)(msg) @@ -287,7 +288,7 @@ object Processing { * Alters the table to add any columns that were present in the Events but not currently in the * table */ - private def handleSchemaEvolution[F[_]: Sync]( + private def handleSchemaEvolution[F[_]: Async]( env: Environment[F] ): Pipe[F, BatchAfterTransform, BatchAfterTransform] = _.evalTap { batch => @@ -299,7 +300,9 @@ object Processing { } val fieldsToAdd = BigQuerySchemaUtils.alterTableRequired(descriptor, fields) if (fieldsToAdd.nonEmpty) { - env.tableManager.addColumns(fieldsToAdd.toVector) *> env.writer.closed.use_ + env.tableManager.addColumns(fieldsToAdd.toVector).flatMap { fieldsToExist => + openWriterUntilFieldsExist(env, fieldsToExist) + } } else { Sync[F].unit } @@ -309,6 +312,22 @@ object Processing { } } + private def openWriterUntilFieldsExist[F[_]: Async](env: Environment[F], fieldsToExist: FieldList): F[Unit] = + env.writer.opened + .use(_.descriptor) + .retryingOnFailures( + policy = env.alterTableWaitPolicy, + wasSuccessful = { descriptor => + (!BigQuerySchemaUtils.fieldsMissingFromDescriptor(descriptor, fieldsToExist)).pure[F] + }, + onFailure = { case (_, details) => + val extractedDetail = BigQueryRetrying.extractRetryDetails(details) + val msg = s"Newly added columns have not yet propagated to the BigQuery Writer client-side. 
$extractedDetail" + Logger[F].warn(msg) *> env.writer.closed.use_ + } + ) + .void + private def sendFailedEvents[F[_]: Sync]( env: Environment[F], badRowProcessor: BadRowProcessor diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManager.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManager.scala index 207732f2..ff96156a 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManager.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManager.scala @@ -37,7 +37,15 @@ import scala.jdk.CollectionConverters._ trait TableManager[F[_]] { - def addColumns(columns: Vector[Field]): F[Unit] + /** + * Attempt to add columns to the table + * + * @return + * A list of fields which are guaranteed to eventually exist in the table. Fields might not be + * available immediately due to asynchronous nature of BigQuery. The returned list will be empty + * if adding columns failed due to too many columns in the table. + */ + def addColumns(columns: Vector[Field]): F[FieldList] def createTable: F[Unit] @@ -66,7 +74,7 @@ object TableManager { for { addingColumnsEnabled <- Ref[F].of[Boolean](true) } yield new WithHandledErrors[F] { - def addColumns(columns: Vector[Field]): F[Unit] = + def addColumns(columns: Vector[Field]): F[FieldList] = addingColumnsEnabled.get.flatMap { case true => BigQueryRetrying @@ -78,7 +86,7 @@ object TableManager { .onError(logOnRaceCondition) } case false => - Async[F].unit + FieldList.of().pure[F] } def createTable: F[Unit] = @@ -99,7 +107,7 @@ object TableManager { client: BigQuery ): TableManager[F] = new TableManager[F] { - def addColumns(columns: Vector[Field]): F[Unit] = + def addColumns(columns: Vector[Field]): F[FieldList] = for { table <- Sync[F].blocking(client.getTable(config.dataset, config.table)) schema <- Sync[F].pure(table.getDefinition[TableDefinition].getSchema) @@ -108,7 +116,7 @@ object TableManager { schema <- Sync[F].pure(Schema.of(fields)) table <- Sync[F].pure(setTableSchema(table, schema)) _ <- Sync[F].blocking(table.update()) - } yield () + } yield fields def createTable: F[Unit] = { val tableInfo = atomicTableInfo(config) @@ -133,7 +141,7 @@ object TableManager { monitoring: Monitoring[F], addingColumnsEnabled: Ref[F, Boolean], columns: Vector[Field] - ): PartialFunction[Throwable, F[Unit]] = { + ): PartialFunction[Throwable, F[FieldList]] = { case bqe: BigQueryException if bqe.lowerCaseReason === "invalid" && (bqe.lowerCaseMessage .startsWith("too many columns") || bqe.lowerCaseMessage.startsWith("too many total leaf fields")) => @@ -143,7 +151,7 @@ object TableManager { _ <- monitoring.alert(Alert.FailedToAddColumns(columns.map(_.name), bqe)) _ <- addingColumnsEnabled.set(false) _ <- enableAfterDelay.start - } yield () + } yield FieldList.of() } private def showColumns(columns: Vector[Field]): String = diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/AtomicDescriptor.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/AtomicDescriptor.scala index 88904054..957ab5c6 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/AtomicDescriptor.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/AtomicDescriptor.scala @@ -32,6 +32,17 @@ object AtomicDescriptor { fromDescriptorProtoBuilder(descriptorProto) } + /** A table which has been altered to add the ad_click_event unstruct 
event column */ + def withWebPageAndAdClick: Descriptors.Descriptor = { + val descriptorProto = DescriptorProtos.DescriptorProto.newBuilder + .addField(0, eventId.setNumber(1)) + .addField(1, webPage.setNumber(2)) + .addField(2, adClickEvent.setNumber(3)) + .addNestedType(webPageNestedType) + .addNestedType(adClickEventNestedType) + fromDescriptorProtoBuilder(descriptorProto) + } + private def fromDescriptorProtoBuilder(descriptorProto: DescriptorProtos.DescriptorProto.Builder): Descriptors.Descriptor = { descriptorProto.setName("event") @@ -65,4 +76,20 @@ object AtomicDescriptor { .addField(0, webPageId.setNumber(1)) .setName("web_page_1") + private def adClickEvent: DescriptorProtos.FieldDescriptorProto.Builder = DescriptorProtos.FieldDescriptorProto.newBuilder + .setLabel(DescriptorProtos.FieldDescriptorProto.Label.LABEL_OPTIONAL) + .setTypeName("ad_click_event_1") + .setName("unstruct_event_com_snowplowanalytics_snowplow_media_ad_click_event_1") + + private def adClickEventPercentProgress: DescriptorProtos.FieldDescriptorProto.Builder = + DescriptorProtos.FieldDescriptorProto.newBuilder + .setLabel(DescriptorProtos.FieldDescriptorProto.Label.LABEL_OPTIONAL) + .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT64) + .setName("percent_progress") + + private def adClickEventNestedType: DescriptorProtos.DescriptorProto.Builder = + DescriptorProtos.DescriptorProto.newBuilder + .addField(0, adClickEventPercentProgress.setNumber(1)) + .setName("ad_click_event_1") + } diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/MockEnvironment.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/MockEnvironment.scala index 0a3b33aa..61c64169 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/MockEnvironment.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/MockEnvironment.scala @@ -14,6 +14,7 @@ import cats.effect.kernel.{Ref, Resource, Unique} import org.http4s.client.Client import fs2.Stream import com.google.protobuf.Descriptors +import com.google.cloud.bigquery.FieldList import com.snowplowanalytics.iglu.client.Resolver import com.snowplowanalytics.iglu.schemaddl.parquet.Field @@ -62,7 +63,7 @@ object MockEnvironment { def build(inputs: List[TokenedEvents], mocks: Mocks): Resource[IO, MockEnvironment] = for { state <- Resource.eval(Ref[IO].of(Vector.empty[Action])) - writerResource <- Resource.eval(testWriter(state, mocks.writerResponses)) + writerResource <- Resource.eval(testWriter(state, mocks.writerResponses, mocks.descriptors)) writerColdswap <- Coldswap.make(writerResource) source = testSourceAndAck(inputs, state) appHealth <- Resource.eval(AppHealth.init(10.seconds, source, everythingHealthy)) @@ -73,7 +74,7 @@ object MockEnvironment { badSink = testBadSink(mocks.badSinkResponse, state), resolver = Resolver[IO](Nil, None), httpClient = testHttpClient, - tableManager = testTableManager(state), + tableManager = testTableManager(mocks.addColumnsResponse, state), writer = writerColdswap, metrics = testMetrics(state), appHealth = appHealth, @@ -91,11 +92,18 @@ object MockEnvironment { final case class Mocks( writerResponses: List[Response[Writer.WriteResult]], - badSinkResponse: Response[Unit] + badSinkResponse: Response[Unit], + addColumnsResponse: Response[FieldList], + descriptors: List[Descriptors.Descriptor] ) object Mocks { - val default: Mocks = Mocks(writerResponses = List.empty, badSinkResponse = Response.Success(())) + val default: Mocks = Mocks( + writerResponses = 
List.empty, + badSinkResponse = Response.Success(()), + addColumnsResponse = Response.Success(FieldList.of()), + descriptors = List.empty + ) } sealed trait Response[+A] @@ -111,10 +119,15 @@ object MockEnvironment { def cloud = "OnPrem" } - private def testTableManager(state: Ref[IO, Vector[Action]]): TableManager.WithHandledErrors[IO] = + private def testTableManager(mockedResponse: Response[FieldList], state: Ref[IO, Vector[Action]]): TableManager.WithHandledErrors[IO] = new TableManager.WithHandledErrors[IO] { - def addColumns(columns: Vector[Field]): IO[Unit] = - state.update(_ :+ AlterTableAddedColumns(columns.map(_.name))) + def addColumns(columns: Vector[Field]): IO[FieldList] = + mockedResponse match { + case Response.Success(fieldList) => + state.update(_ :+ AlterTableAddedColumns(columns.map(_.name))).as(fieldList) + case Response.ExceptionThrown(value) => + IO.raiseError(value) + } def createTable: IO[Unit] = state.update(_ :+ CreatedTable) @@ -161,15 +174,20 @@ object MockEnvironment { */ private def testWriter( actionRef: Ref[IO, Vector[Action]], - responses: List[Response[Writer.WriteResult]] + responses: List[Response[Writer.WriteResult]], + descriptors: List[Descriptors.Descriptor] ): IO[Resource[IO, Writer[IO]]] = for { responseRef <- Ref[IO].of(responses) + descriptorRef <- Ref[IO].of(descriptors) } yield { val make = actionRef.update(_ :+ OpenedWriter).as { new Writer[IO] { def descriptor: IO[Descriptors.Descriptor] = - IO(AtomicDescriptor.withWebPage) + descriptorRef.modify { + case head :: tail => (tail, head) + case Nil => (Nil, AtomicDescriptor.withWebPage) + } def write(rows: List[Map[String, AnyRef]]): IO[Writer.WriteResult] = for { diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/ProcessingSpec.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/ProcessingSpec.scala index 9a70d50b..16979859 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/ProcessingSpec.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/ProcessingSpec.scala @@ -14,6 +14,7 @@ import org.specs2.Specification import cats.effect.testing.specs2.CatsEffect import cats.effect.testkit.TestControl import io.circe.literal._ +import com.google.cloud.bigquery.{Field => BQField, FieldList, StandardSQLTypeName} import java.nio.charset.StandardCharsets import java.nio.ByteBuffer @@ -22,7 +23,7 @@ import scala.concurrent.duration.DurationLong import com.snowplowanalytics.snowplow.analytics.scalasdk.Event import com.snowplowanalytics.snowplow.analytics.scalasdk.SnowplowEvent.{UnstructEvent, unstructEventDecoder} -import com.snowplowanalytics.snowplow.bigquery.MockEnvironment +import com.snowplowanalytics.snowplow.bigquery.{AtomicDescriptor, MockEnvironment} import com.snowplowanalytics.snowplow.bigquery.MockEnvironment.{Action, Mocks} import com.snowplowanalytics.snowplow.runtime.HealthProbe import com.snowplowanalytics.snowplow.sources.TokenedEvents @@ -116,7 +117,23 @@ class ProcessingSpec extends Specification with CatsEffect { } } """.as[UnstructEvent].fold(throw _, identity) - runTest(inputEvents(count = 1, good(unstructEvent))) { case (inputs, control) => + val mocks = Mocks.default.copy( + addColumnsResponse = MockEnvironment.Response.Success( + FieldList.of( + BQField.of( + "unstruct_event_com_snowplowanalytics_snowplow_media_ad_click_event_1", + StandardSQLTypeName.STRUCT, + FieldList.of(BQField.of("percent_progress", StandardSQLTypeName.STRING)) 
+ ) + ) + ), + descriptors = List( + AtomicDescriptor.withWebPage, + AtomicDescriptor.withWebPage, + AtomicDescriptor.withWebPageAndAdClick + ) + ) + runTest(inputEvents(count = 1, good(unstructEvent)), mocks) { case (inputs, control) => for { _ <- Processing.stream(control.environment).compile.drain state <- control.state.get diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManagerSpec.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManagerSpec.scala index b85f3b49..f3b07ec9 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManagerSpec.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManagerSpec.scala @@ -16,7 +16,7 @@ import cats.effect.testkit.TestControl import com.google.api.gax.rpc.PermissionDeniedException import com.google.api.gax.grpc.GrpcStatusCode import io.grpc.Status -import com.google.cloud.bigquery.{BigQueryError, BigQueryException} +import com.google.cloud.bigquery.{BigQueryError, BigQueryException, FieldList} import scala.concurrent.duration.{DurationLong, FiniteDuration} @@ -448,7 +448,7 @@ object TableManagerSpec { for { mocksRef <- Ref[IO].of(mocks) } yield new TableManager[IO] { - def addColumns(columns: Vector[Field]): IO[Unit] = + def addColumns(columns: Vector[Field]): IO[FieldList] = for { response <- mocksRef.modify { case head :: tail => (tail, head) @@ -457,7 +457,7 @@ object TableManagerSpec { _ <- state.update(_ :+ Action.AddColumnsAttempted(columns)) result <- response match { case Response.Success => - IO.unit + IO.pure(FieldList.of()) case Response.ExceptionThrown(ex) => IO.raiseError(ex).adaptError { t => t.setStackTrace(Array()) // don't clutter our test logs From c1951fe344ce2fd44e1dab9e48499d9d08224859 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Poniedzia=C5=82ek?= Date: Wed, 8 May 2024 11:41:39 +0200 Subject: [PATCH 4/4] Require alter table when schema is evolved for contexts When a schema evolves, e.g. when a new nested field is added to an entity, the loader should explicitly try to alter the underlying BigQuery schema. It works correctly for self-describing events (unstruct columns), but it turns out that for contexts it doesn't modify the schema: the alter is skipped, which results in bad data. This commit should fix this problem.
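In other words, the check for missing columns has to recurse into nested fields not only for a plain STRUCT column (a self-describing event) but also for a REPEATED STRUCT column (a contexts entity, i.e. an array of structs). A minimal sketch of that idea, using the same schema-ddl types as the change below (hypothetical helper, not the loader's code):

```scala
import com.snowplowanalytics.iglu.schemaddl.parquet.{Field, Type}

object NestedFieldsSketch {

  // Nested fields that must be compared against the table's protobuf descriptor:
  // present both for unstruct_event_* columns (struct) and for contexts_* columns
  // (array of structs). Previously only the plain struct case was handled.
  def nestedFieldsOf(ddlField: Field): Option[Vector[Field]] =
    ddlField.fieldType match {
      case Type.Struct(nested)                => Some(nested.toVector)
      case Type.Array(Type.Struct(nested), _) => Some(nested.toVector)
      case _                                  => None
    }
}
```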
--- .../processing/BigQuerySchemaUtils.scala | 2 + .../test_vendor/test_name/jsonschema/1-0-0 | 14 ++ .../test_vendor/test_name/jsonschema/1-0-1 | 15 ++ .../AtomicDescriptor.scala | 78 +++++++++ .../processing/ProcessingSpec.scala | 155 +++++++++++++++++- 5 files changed, 259 insertions(+), 5 deletions(-) create mode 100644 modules/core/src/test/resources/iglu-client-embedded/schemas/test_vendor/test_name/jsonschema/1-0-0 create mode 100644 modules/core/src/test/resources/iglu-client-embedded/schemas/test_vendor/test_name/jsonschema/1-0-1 diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQuerySchemaUtils.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQuerySchemaUtils.scala index ea63d6a0..43180848 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQuerySchemaUtils.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQuerySchemaUtils.scala @@ -60,6 +60,8 @@ object BigQuerySchemaUtils { ddlField.fieldType match { case Type.Struct(nestedFields) => alterTableRequired(tableField.getMessageType, nestedFields.toVector).nonEmpty + case Type.Array(Type.Struct(nestedFields), _) => + alterTableRequired(tableField.getMessageType, nestedFields.toVector).nonEmpty case _ => false } diff --git a/modules/core/src/test/resources/iglu-client-embedded/schemas/test_vendor/test_name/jsonschema/1-0-0 b/modules/core/src/test/resources/iglu-client-embedded/schemas/test_vendor/test_name/jsonschema/1-0-0 new file mode 100644 index 00000000..2a91f0b7 --- /dev/null +++ b/modules/core/src/test/resources/iglu-client-embedded/schemas/test_vendor/test_name/jsonschema/1-0-0 @@ -0,0 +1,14 @@ +{ + "$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#", + "self": { + "vendor": "test_vendor", + "name": "test_name", + "format": "jsonschema", + "version": "1-0-0" + }, + "properties": { + "myString": {"type": "string"} + }, + "additionalProperties": false, + "type": "object" +} diff --git a/modules/core/src/test/resources/iglu-client-embedded/schemas/test_vendor/test_name/jsonschema/1-0-1 b/modules/core/src/test/resources/iglu-client-embedded/schemas/test_vendor/test_name/jsonschema/1-0-1 new file mode 100644 index 00000000..f21ac304 --- /dev/null +++ b/modules/core/src/test/resources/iglu-client-embedded/schemas/test_vendor/test_name/jsonschema/1-0-1 @@ -0,0 +1,15 @@ +{ + "$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#", + "self": { + "vendor": "test_vendor", + "name": "test_name", + "format": "jsonschema", + "version": "1-0-1" + }, + "properties": { + "myString": {"type": "string"}, + "myInteger": {"type": "integer"} + }, + "additionalProperties": false, + "type": "object" +} diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/AtomicDescriptor.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/AtomicDescriptor.scala index 957ab5c6..f4514679 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/AtomicDescriptor.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/AtomicDescriptor.scala @@ -32,6 +32,46 @@ object AtomicDescriptor { fromDescriptorProtoBuilder(descriptorProto) } + def withTestUnstruct100: Descriptors.Descriptor = { + val descriptorProto = DescriptorProtos.DescriptorProto.newBuilder + .addField(0, eventId.setNumber(1)) + .addField(1,
testUnstruct.setNumber(2)) + .addNestedType(testNestedType100) + fromDescriptorProtoBuilder(descriptorProto) + } + + def withTestUnstruct101: Descriptors.Descriptor = { + val descriptorProto = DescriptorProtos.DescriptorProto.newBuilder + .addField(0, eventId.setNumber(1)) + .addField(1, testUnstruct.setNumber(2)) + .addNestedType(testNestedType101) + fromDescriptorProtoBuilder(descriptorProto) + } + + def withTestContext100: Descriptors.Descriptor = { + val descriptorProto = DescriptorProtos.DescriptorProto.newBuilder + .addField(0, eventId.setNumber(1)) + .addField(1, testContext.setNumber(2)) + .addNestedType(testNestedType100) + fromDescriptorProtoBuilder(descriptorProto) + } + + def withTestContext101: Descriptors.Descriptor = { + val descriptorProto = DescriptorProtos.DescriptorProto.newBuilder + .addField(0, eventId.setNumber(1)) + .addField(1, testContext.setNumber(2)) + .addNestedType(testNestedType101) + fromDescriptorProtoBuilder(descriptorProto) + } + + def withAdClickContext: Descriptors.Descriptor = { + val descriptorProto = DescriptorProtos.DescriptorProto.newBuilder + .addField(0, eventId.setNumber(1)) + .addField(1, adClickContext.setNumber(2)) + .addNestedType(adClickEventNestedType) + fromDescriptorProtoBuilder(descriptorProto) + } + /** A table which has been altered to add the ad_click_event unstruct event column */ def withWebPageAndAdClick: Descriptors.Descriptor = { val descriptorProto = DescriptorProtos.DescriptorProto.newBuilder @@ -65,22 +105,60 @@ object AtomicDescriptor { .setTypeName("web_page_1") .setName("unstruct_event_com_snowplowanalytics_snowplow_web_page_1") + private def testContext: DescriptorProtos.FieldDescriptorProto.Builder = DescriptorProtos.FieldDescriptorProto.newBuilder + .setLabel(DescriptorProtos.FieldDescriptorProto.Label.LABEL_REPEATED) + .setTypeName("test_1") + .setName("contexts_test_vendor_test_name_1") + + private def testUnstruct: DescriptorProtos.FieldDescriptorProto.Builder = DescriptorProtos.FieldDescriptorProto.newBuilder + .setLabel(DescriptorProtos.FieldDescriptorProto.Label.LABEL_OPTIONAL) + .setTypeName("test_1") + .setName("unstruct_event_test_vendor_test_name_1") + private def webPageId: DescriptorProtos.FieldDescriptorProto.Builder = DescriptorProtos.FieldDescriptorProto.newBuilder .setLabel(DescriptorProtos.FieldDescriptorProto.Label.LABEL_OPTIONAL) .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING) .setName("id") + private def myString: DescriptorProtos.FieldDescriptorProto.Builder = + DescriptorProtos.FieldDescriptorProto.newBuilder + .setLabel(DescriptorProtos.FieldDescriptorProto.Label.LABEL_OPTIONAL) + .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING) + .setName("my_string") + + private def myInteger: DescriptorProtos.FieldDescriptorProto.Builder = + DescriptorProtos.FieldDescriptorProto.newBuilder + .setLabel(DescriptorProtos.FieldDescriptorProto.Label.LABEL_OPTIONAL) + .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT64) + .setName("my_integer") + private def webPageNestedType: DescriptorProtos.DescriptorProto.Builder = DescriptorProtos.DescriptorProto.newBuilder .addField(0, webPageId.setNumber(1)) .setName("web_page_1") + private def testNestedType100: DescriptorProtos.DescriptorProto.Builder = + DescriptorProtos.DescriptorProto.newBuilder + .addField(0, myString.setNumber(1)) + .setName("test_1") + + private def testNestedType101: DescriptorProtos.DescriptorProto.Builder = + DescriptorProtos.DescriptorProto.newBuilder + .addField(0, myString.setNumber(1)) + .addField(1, 
myInteger.setNumber(2)) + .setName("test_1") + private def adClickEvent: DescriptorProtos.FieldDescriptorProto.Builder = DescriptorProtos.FieldDescriptorProto.newBuilder .setLabel(DescriptorProtos.FieldDescriptorProto.Label.LABEL_OPTIONAL) .setTypeName("ad_click_event_1") .setName("unstruct_event_com_snowplowanalytics_snowplow_media_ad_click_event_1") + private def adClickContext: DescriptorProtos.FieldDescriptorProto.Builder = DescriptorProtos.FieldDescriptorProto.newBuilder + .setLabel(DescriptorProtos.FieldDescriptorProto.Label.LABEL_REPEATED) + .setTypeName("ad_click_event_1") + .setName("contexts_com_snowplowanalytics_snowplow_media_ad_click_event_1") + private def adClickEventPercentProgress: DescriptorProtos.FieldDescriptorProto.Builder = DescriptorProtos.FieldDescriptorProto.newBuilder .setLabel(DescriptorProtos.FieldDescriptorProto.Label.LABEL_OPTIONAL) diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/ProcessingSpec.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/ProcessingSpec.scala index 16979859..c757d263 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/ProcessingSpec.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/ProcessingSpec.scala @@ -15,6 +15,7 @@ import cats.effect.testing.specs2.CatsEffect import cats.effect.testkit.TestControl import io.circe.literal._ import com.google.cloud.bigquery.{Field => BQField, FieldList, StandardSQLTypeName} +import com.snowplowanalytics.iglu.core.{SchemaKey, SelfDescribingData} import java.nio.charset.StandardCharsets import java.nio.ByteBuffer @@ -22,7 +23,7 @@ import java.time.Instant import scala.concurrent.duration.DurationLong import com.snowplowanalytics.snowplow.analytics.scalasdk.Event -import com.snowplowanalytics.snowplow.analytics.scalasdk.SnowplowEvent.{UnstructEvent, unstructEventDecoder} +import com.snowplowanalytics.snowplow.analytics.scalasdk.SnowplowEvent.{Contexts, UnstructEvent, unstructEventDecoder} import com.snowplowanalytics.snowplow.bigquery.{AtomicDescriptor, MockEnvironment} import com.snowplowanalytics.snowplow.bigquery.MockEnvironment.{Action, Mocks} import com.snowplowanalytics.snowplow.runtime.HealthProbe @@ -36,7 +37,10 @@ class ProcessingSpec extends Specification with CatsEffect { Insert events to Bigquery and ack the events $e1 Emit BadRows when there are badly formatted events $e2 Write good batches and bad events when input contains both $e3 - Alter the Bigquery table when the writer's protobuf Descriptor has missing columns $e4 + Alter the Bigquery table when the writer's protobuf Descriptor has missing columns - unstruct $e4_1 + Alter the Bigquery table when the writer's protobuf Descriptor has missing columns - contexts $e4_2 + Alter the Bigquery table when the writer's protobuf Descriptor has missing nested fields - unstruct $e4_3 + Alter the Bigquery table when the writer's protobuf Descriptor has missing nested fields - contexts $e4_4 Skip altering the table when the writer's protobuf Descriptor has relevant self-describing entitiy columns $e5 Emit BadRows when the WriterProvider reports a problem with the data $e6 Recover when the WriterProvider reports a server-side schema mismatch $e7 @@ -105,7 +109,7 @@ class ProcessingSpec extends Specification with CatsEffect { } } - def e4 = { + def e4_1 = { val unstructEvent: UnstructEvent = json""" { "schema": "iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0", @@ -153,6 
+157,147 @@ class ProcessingSpec extends Specification with CatsEffect { } } + def e4_2 = { + val data = json"""{ "percentProgress": 50 }""" + val contexts = Contexts( + List( + SelfDescribingData( + SchemaKey.fromUri("iglu:com.snowplowanalytics.snowplow.media/ad_click_event/jsonschema/1-0-0").toOption.get, + data + ) + ) + ) + + val mocks = Mocks.default.copy( + addColumnsResponse = MockEnvironment.Response.Success( + FieldList.of( + BQField + .newBuilder( + "contexts_com_snowplowanalytics_snowplow_media_ad_click_event_1", + StandardSQLTypeName.STRUCT, + FieldList.of(BQField.of("percent_progress", StandardSQLTypeName.STRING)) + ) + .setMode(BQField.Mode.REPEATED) + .build() + ) + ), + descriptors = List( + AtomicDescriptor.initial, + AtomicDescriptor.initial, + AtomicDescriptor.withAdClickContext + ) + ) + runTest(inputEvents(count = 1, good(contexts = contexts)), mocks) { case (inputs, control) => + for { + _ <- Processing.stream(control.environment).compile.drain + state <- control.state.get + } yield state should beEqualTo( + Vector( + Action.CreatedTable, + Action.OpenedWriter, + Action.AlterTableAddedColumns(Vector("contexts_com_snowplowanalytics_snowplow_media_ad_click_event_1")), + Action.ClosedWriter, + Action.OpenedWriter, + Action.WroteRowsToBigQuery(2), + Action.AddedGoodCountMetric(2), + Action.AddedBadCountMetric(0), + Action.Checkpointed(List(inputs(0).ack)) + ) + ) + } + } + + def e4_3 = { + val data = json"""{ "myInteger": 100 }""" + val unstruct = UnstructEvent( + Some(SelfDescribingData(SchemaKey.fromUri("iglu:test_vendor/test_name/jsonschema/1-0-1").toOption.get, data)) + ) + + val mocks = Mocks.default.copy( + addColumnsResponse = MockEnvironment.Response.Success( + FieldList.of( + BQField.of( + "unstruct_event_test_vendor_test_name_1", + StandardSQLTypeName.STRUCT, + FieldList.of( + BQField.of("my_string", StandardSQLTypeName.STRING), + BQField.of("my_integer", StandardSQLTypeName.INT64) + ) + ) + ) + ), + descriptors = List( + AtomicDescriptor.withTestUnstruct100, + AtomicDescriptor.withTestUnstruct100, + AtomicDescriptor.withTestUnstruct101 + ) + ) + runTest(inputEvents(count = 1, good(ue = unstruct)), mocks) { case (inputs, control) => + for { + _ <- Processing.stream(control.environment).compile.drain + state <- control.state.get + } yield state should beEqualTo( + Vector( + Action.CreatedTable, + Action.OpenedWriter, + Action.AlterTableAddedColumns(Vector("unstruct_event_test_vendor_test_name_1")), + Action.ClosedWriter, + Action.OpenedWriter, + Action.WroteRowsToBigQuery(2), + Action.AddedGoodCountMetric(2), + Action.AddedBadCountMetric(0), + Action.Checkpointed(List(inputs(0).ack)) + ) + ) + } + } + + def e4_4 = { + val data = json"""{ "myInteger": 100}""" + val contexts = Contexts(List(SelfDescribingData(SchemaKey.fromUri("iglu:test_vendor/test_name/jsonschema/1-0-1").toOption.get, data))) + + val mocks = Mocks.default.copy( + addColumnsResponse = MockEnvironment.Response.Success( + FieldList.of( + BQField + .newBuilder( + "contexts_test_vendor_test_name_1", + StandardSQLTypeName.STRUCT, + FieldList.of( + BQField.of("my_string", StandardSQLTypeName.STRING), + BQField.of("my_integer", StandardSQLTypeName.INT64) + ) + ) + .setMode(BQField.Mode.REPEATED) + .build() + ) + ), + descriptors = List( + AtomicDescriptor.withTestContext100, + AtomicDescriptor.withTestContext100, + AtomicDescriptor.withTestContext101 + ) + ) + runTest(inputEvents(count = 1, good(contexts = contexts)), mocks) { case (inputs, control) => + for { + _ <- 
Processing.stream(control.environment).compile.drain + state <- control.state.get + } yield state should beEqualTo( + Vector( + Action.CreatedTable, + Action.OpenedWriter, + Action.AlterTableAddedColumns(Vector("contexts_test_vendor_test_name_1")), + Action.ClosedWriter, + Action.OpenedWriter, + Action.WroteRowsToBigQuery(2), + Action.AddedGoodCountMetric(2), + Action.AddedBadCountMetric(0), + Action.Checkpointed(List(inputs(0).ack)) + ) + ) + } + } + def e5 = { val unstructEvent: UnstructEvent = json""" { @@ -350,14 +495,14 @@ object ProcessingSpec { .compile .toList - def good(ue: UnstructEvent = UnstructEvent(None)): IO[TokenedEvents] = + def good(ue: UnstructEvent = UnstructEvent(None), contexts: Contexts = Contexts(List.empty)): IO[TokenedEvents] = for { ack <- IO.unique eventId1 <- IO.randomUUID eventId2 <- IO.randomUUID collectorTstamp <- IO.realTimeInstant } yield { - val event1 = Event.minimal(eventId1, collectorTstamp, "0.0.0", "0.0.0").copy(unstruct_event = ue) + val event1 = Event.minimal(eventId1, collectorTstamp, "0.0.0", "0.0.0").copy(unstruct_event = ue).copy(contexts = contexts) val event2 = Event.minimal(eventId2, collectorTstamp, "0.0.0", "0.0.0") val serialized = Chunk(event1, event2).map { e => ByteBuffer.wrap(e.toTsv.getBytes(StandardCharsets.UTF_8))