diff --git a/config/config.azure.reference.hocon b/config/config.azure.reference.hocon
index a0800afe..526e9cda 100644
--- a/config/config.azure.reference.hocon
+++ b/config/config.azure.reference.hocon
@@ -86,6 +86,14 @@
     "alterTableWait": {
       "delay": "1 second"
     }
+
+    # -- Relevant when the BigQuery table is close to exceeding the limit on max allowed columns in a single table.
+    # -- The loader will ignore a failure to alter the table due to too many columns, and it will continue to run.
+    # -- Some events will inevitably go to the failed events output topic until new columns have been added.
+    # -- This param configures how often the loader will retry to alter the table after an earlier failure.
+    "tooManyColumns": {
+      "delay": "300 seconds"
+    }
   }

   # -- Schemas that won't be loaded to BigQuery. Optional, default value []
diff --git a/config/config.kinesis.reference.hocon b/config/config.kinesis.reference.hocon
index 1e9b890a..c47abcf9 100644
--- a/config/config.kinesis.reference.hocon
+++ b/config/config.kinesis.reference.hocon
@@ -108,6 +108,14 @@
     "alterTableWait": {
       "delay": "1 second"
     }
+
+    # -- Relevant when the BigQuery table is close to exceeding the limit on max allowed columns in a single table.
+    # -- The loader will ignore a failure to alter the table due to too many columns, and it will continue to run.
+    # -- Some events will inevitably go to the failed events output topic until new columns have been added.
+    # -- This param configures how often the loader will retry to alter the table after an earlier failure.
+    "tooManyColumns": {
+      "delay": "300 seconds"
+    }
   }

   # -- Schemas that won't be loaded to BigQuery. Optional, default value []
diff --git a/config/config.pubsub.reference.hocon b/config/config.pubsub.reference.hocon
index 1bd57717..993dbcda 100644
--- a/config/config.pubsub.reference.hocon
+++ b/config/config.pubsub.reference.hocon
@@ -88,6 +88,14 @@
     "alterTableWait": {
       "delay": "1 second"
     }
+
+    # -- Relevant when the BigQuery table is close to exceeding the limit on max allowed columns in a single table.
+    # -- The loader will ignore a failure to alter the table due to too many columns, and it will continue to run.
+    # -- Some events will inevitably go to the failed events output topic until new columns have been added.
+    # -- This param configures how often the loader will retry to alter the table after an earlier failure.
+    "tooManyColumns": {
+      "delay": "300 seconds"
+    }
   }

   # -- Schemas that won't be loaded to BigQuery. Optional, default value []
diff --git a/modules/core/src/main/resources/reference.conf b/modules/core/src/main/resources/reference.conf
index 26bba26b..3f1e1475 100644
--- a/modules/core/src/main/resources/reference.conf
+++ b/modules/core/src/main/resources/reference.conf
@@ -35,6 +35,9 @@
       "alterTableWait": {
         "delay": "1 second"
       }
+      "tooManyColumns": {
+        "delay": "300 seconds"
+      }
     }

     "skipSchemas": []
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/Alert.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/Alert.scala
index e36e3f4c..f8aaf034 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/Alert.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/Alert.scala
@@ -52,16 +52,26 @@ object Alert {
   }

   private implicit def throwableShow: Show[Throwable] = {
-    def go(acc: List[String], next: Throwable): String = {
-      val nextMessage = Option(next.getMessage)
-      val msgs = nextMessage.filterNot(msg => acc.headOption.contains(msg)) ++: acc
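+    // Keeps the more detailed of two adjacent messages when one contains the other,
+    // e.g. when a wrapper exception's message repeats the message of its cause.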
+    def removeDuplicateMessages(in: List[String]): List[String] =
+      in match {
+        case h :: t :: rest =>
+          if (h.contains(t)) removeDuplicateMessages(h :: rest)
+          else if (t.contains(h)) removeDuplicateMessages(t :: rest)
+          else h :: removeDuplicateMessages(t :: rest)
+        case fewer => fewer
+      }

-      Option(next.getCause) match {
-        case Some(cause) => go(msgs, cause)
-        case None => msgs.reverse.mkString(": ")
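+    // Walks the getCause chain, collecting each non-null message from the outermost
+    // exception down to the root cause.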
+    def accumulateMessages(t: Throwable): List[String] = {
+      val nextMessage = Option(t.getMessage)
+      Option(t.getCause) match {
+        case Some(cause) => nextMessage.toList ::: accumulateMessages(cause)
+        case None => nextMessage.toList
       }
     }

-    Show.show(go(Nil, _))
+    Show.show { t =>
+      removeDuplicateMessages(accumulateMessages(t)).mkString(": ")
+    }
   }
+
 }
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/Config.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/Config.scala
index 738fb0f1..ebc75b53 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/Config.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/Config.scala
@@ -86,11 +86,13 @@ object Config {
   case class SetupErrorRetries(delay: FiniteDuration)
   case class AlterTableWaitRetries(delay: FiniteDuration)
   case class TransientErrorRetries(delay: FiniteDuration, attempts: Int)
+  case class TooManyColumnsRetries(delay: FiniteDuration)

   case class Retries(
     setupErrors: SetupErrorRetries,
     transientErrors: TransientErrorRetries,
-    alterTableWait: AlterTableWaitRetries
+    alterTableWait: AlterTableWaitRetries,
+    tooManyColumns: TooManyColumnsRetries
   )

   implicit def decoder[Source: Decoder, Sink: Decoder]: Decoder[Config[Source, Sink]] = {
@@ -119,6 +121,7 @@
     implicit val setupRetries = deriveConfiguredDecoder[SetupErrorRetries]
     implicit val alterTableRetries = deriveConfiguredDecoder[AlterTableWaitRetries]
     implicit val transientRetries = deriveConfiguredDecoder[TransientErrorRetries]
+    implicit val tooManyColsRetries = deriveConfiguredDecoder[TooManyColumnsRetries]
     implicit val retriesDecoder = deriveConfiguredDecoder[Retries]

     // TODO add bigquery docs
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQueryRetrying.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQueryRetrying.scala
index ff33a2b7..a3c2b7e7 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQueryRetrying.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQueryRetrying.scala
@@ -12,12 +12,14 @@
 import cats.Applicative
 import cats.effect.Sync
 import cats.implicits._
 import com.google.api.gax.rpc.PermissionDeniedException
+import com.google.cloud.bigquery.BigQueryException
 import org.typelevel.log4cats.Logger
 import org.typelevel.log4cats.slf4j.Slf4jLogger
 import retry._
 import retry.implicits.retrySyntaxError

 import com.snowplowanalytics.snowplow.bigquery.{Alert, AppHealth, Config, Monitoring}
+import com.snowplowanalytics.snowplow.bigquery.processing.BigQueryUtils.BQExceptionSyntax

 object BigQueryRetrying {
@@ -54,7 +56,7 @@
     )

   private def isSetupError[F[_]: Sync](t: Throwable): F[Boolean] = t match {
-    case BigQueryUtils.BQExceptionWithLowerCaseReason("notfound" | "accessdenied") =>
+    case bqe: BigQueryException if Set("notfound", "accessdenied").contains(bqe.lowerCaseReason) =>
       true.pure[F]
     case _: PermissionDeniedException =>
       true.pure[F]
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQuerySchemaUtils.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQuerySchemaUtils.scala
index 8b06148a..4d90d2e9 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQuerySchemaUtils.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQuerySchemaUtils.scala
@@ -16,8 +16,8 @@ import scala.jdk.CollectionConverters._

 object BigQuerySchemaUtils {

-  def alterTableRequired(tableDescriptor: Descriptors.Descriptor, ddlFields: Seq[Field]): Boolean =
-    ddlFields.exists { field =>
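+  // Returns the subset of ddlFields that are missing from the table descriptor, or whose
+  // nullability does not match, so the caller can add exactly the columns that are needed.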
+  def alterTableRequired(tableDescriptor: Descriptors.Descriptor, ddlFields: Seq[Field]): Seq[Field] =
+    ddlFields.filter { field =>
       Option(tableDescriptor.findFieldByName(field.name)) match {
         case Some(fieldDescriptor) =>
           val nullableMismatch = fieldDescriptor.isRequired && field.nullability.nullable
@@ -32,7 +32,7 @@
         case Descriptors.FieldDescriptor.Type.MESSAGE =>
           ddlField.fieldType match {
             case Type.Struct(nestedFields) =>
-              alterTableRequired(tableField.getMessageType, nestedFields)
+              alterTableRequired(tableField.getMessageType, nestedFields).nonEmpty
             case _ =>
               false
           }
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQueryUtils.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQueryUtils.scala
index d37e7f0a..8bbe5897 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQueryUtils.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQueryUtils.scala
@@ -35,13 +35,18 @@ object BigQueryUtils {
   def streamIdOf(config: Config.BigQuery): String =
     tableIdOf(config).getIAMResourceName + "/streams/_default"

-  object BQExceptionWithLowerCaseReason {
-    def unapply(bqe: BigQueryException): Option[(String)] =
-      Option(bqe.getError()) match {
-        case Some(bqError) =>
-          Some(bqError.getReason.toLowerCase)
-        case None =>
-          None
-      }
+  implicit class BQExceptionSyntax(val bqe: BigQueryException) extends AnyVal {
+    def lowerCaseReason: String =
+      Option(bqe.getError())
+        .flatMap(e => Option(e.getReason))
+        .map(_.toLowerCase)
+        .getOrElse("")
+
+    def lowerCaseMessage: String =
+      Option(bqe.getError())
+        .flatMap(e => Option(e.getMessage))
+        .map(_.toLowerCase)
+        .getOrElse("")
   }
 }
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Processing.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Processing.scala
index f2269e88..f66da7e9 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Processing.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Processing.scala
@@ -297,8 +297,9 @@ object Processing {
       val fields = batch.entities.fields.flatMap { tte =>
         tte.mergedField :: tte.recoveries.map(_._2)
       }
-      if (BigQuerySchemaUtils.alterTableRequired(descriptor, fields)) {
-        env.tableManager.addColumns(fields) *> env.writer.closed.use_
+      val fieldsToAdd = BigQuerySchemaUtils.alterTableRequired(descriptor, fields)
+      if (fieldsToAdd.nonEmpty) {
+        env.tableManager.addColumns(fieldsToAdd.toVector) *> env.writer.closed.use_
       } else {
         Sync[F].unit
       }
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManager.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManager.scala
index cc6f65ce..0350f4dc 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManager.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManager.scala
@@ -10,7 +10,8 @@ package com.snowplowanalytics.snowplow.bigquery.processing

 import cats.Show
 import cats.implicits._
-import cats.effect.{Async, Sync}
+import cats.effect.implicits._
+import cats.effect.{Async, Ref, Sync}
 import org.typelevel.log4cats.Logger
 import org.typelevel.log4cats.slf4j.Slf4jLogger
 import com.google.cloud.bigquery.{
@@ -25,10 +26,12 @@ import com.google.cloud.bigquery.{
   TimePartitioning
 }
 import com.google.auth.Credentials
+import com.google.cloud.bigquery.BigQueryException
 import com.snowplowanalytics.iglu.schemaddl.parquet.Field
 import com.snowplowanalytics.snowplow.loaders.transform.AtomicFields
 import com.snowplowanalytics.snowplow.bigquery.{Alert, AppHealth, Config, Monitoring}
+import com.snowplowanalytics.snowplow.bigquery.processing.BigQueryUtils.BQExceptionSyntax

 import scala.jdk.CollectionConverters._

@@ -44,6 +47,12 @@ object TableManager {

   private implicit def logger[F[_]: Sync] = Slf4jLogger.getLogger[F]

+  private sealed trait AddColumnsResult
+  private object AddColumnsResult {
+    case object Success extends AddColumnsResult
+    case class TooManyColumnsInTable(t: Throwable) extends AddColumnsResult
+  }
+
   def make[F[_]: Async](
     config: Config.BigQuery,
     retries: Config.Retries,
@@ -53,20 +62,40 @@
   ): F[TableManager[F]] =
     for {
       client <- Sync[F].delay(BigQueryOptions.newBuilder.setCredentials(credentials).build.getService)
-    } yield impl(config, retries, client, appHealth, monitoring)
+      addingColumnsEnabled <- Ref[F].of[Boolean](true)
+    } yield impl(config, retries, client, appHealth, monitoring, addingColumnsEnabled)

   private def impl[F[_]: Async](
     config: Config.BigQuery,
     retries: Config.Retries,
     client: BigQuery,
     appHealth: AppHealth[F],
-    monitoring: Monitoring[F]
+    monitoring: Monitoring[F],
+    addingColumnsEnabled: Ref[F, Boolean]
   ): TableManager[F] = new TableManager[F] {

     def addColumns(columns: Vector[Field]): F[Unit] =
-      BigQueryRetrying.withRetries(appHealth, retries, monitoring, Alert.FailedToAddColumns(columns.map(_.name), _)) {
-        Logger[F].info(s"Altering table to add columns [${showColumns(columns)}]") *>
-          addColumnsImpl(config, client, columns)
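+      // After a too-many-columns failure the Ref holds false and addColumns becomes a
+      // no-op, so the loader keeps running; a background fiber re-enables column adding
+      // after the configured retries.tooManyColumns.delay.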
+      addingColumnsEnabled.get.flatMap {
+        case true =>
+          BigQueryRetrying
+            .withRetries(appHealth, retries, monitoring, Alert.FailedToAddColumns(columns.map(_.name), _)) {
+              Logger[F].info(s"Altering table to add columns [${showColumns(columns)}]") *>
+                addColumnsImpl(config, client, columns)
+            }
+            .flatMap {
+              case AddColumnsResult.TooManyColumnsInTable(t) =>
+                val enableAfterDelay = Async[F].sleep(retries.tooManyColumns.delay) *> addingColumnsEnabled.set(true)
+                for {
+                  _ <- Logger[F].error(t)(s"Could not alter table schema because of too many columns")
+                  _ <- monitoring.alert(Alert.FailedToAddColumns(columns.map(_.name), t))
+                  _ <- addingColumnsEnabled.set(false)
+                  _ <- enableAfterDelay.start
+                } yield ()
+              case _ =>
+                Async[F].unit
+            }
+        case false =>
+          Async[F].unit
       }

     def createTable: F[Unit] =
@@ -77,10 +106,10 @@
           .blocking(client.create(tableInfo))
           .void
           .recoverWith {
-            case bqe @ BigQueryUtils.BQExceptionWithLowerCaseReason("duplicate") =>
+            case bqe: BigQueryException if bqe.lowerCaseReason === "duplicate" =>
               // Table already exists
               Logger[F].info(s"Ignoring error when creating table: ${bqe.getMessage}")
-            case BigQueryUtils.BQExceptionWithLowerCaseReason("accessdenied") =>
+            case bqe: BigQueryException if bqe.lowerCaseReason === "accessdenied" =>
               Logger[F].info(s"Access denied when trying to create table. Will ignore error and assume table already exists.")
           }
       }
@@ -90,7 +119,7 @@
       config: Config.BigQuery,
       client: BigQuery,
       columns: Vector[Field]
-  ): F[Unit] =
+  ): F[AddColumnsResult] =
     for {
       table <- Sync[F].blocking(client.getTable(config.dataset, config.table))
       schema <- Sync[F].pure(table.getDefinition[TableDefinition].getSchema)
       fields <- Sync[F].pure(schema.getFields)
       fields <- Sync[F].pure(BigQuerySchemaUtils.mergeInColumns(fields, columns))
       schema <- Sync[F].pure(Schema.of(fields))
       table <- Sync[F].pure(setTableSchema(table, schema))
-      _ <- Sync[F]
-             .blocking(table.update())
-             .void
-             .onError(logOnRaceCondition)
-    } yield ()
+      result <- Sync[F]
+                  .blocking(table.update())
+                  .as[AddColumnsResult](AddColumnsResult.Success)
+                  .recover(handleTooManyColumns)
+                  .onError(logOnRaceCondition)
+    } yield result

   private def setTableSchema(table: Table, schema: Schema): Table =
     table.toBuilder().setDefinition(StandardTableDefinition.of(schema)).build()

   private def logOnRaceCondition[F[_]: Sync]: PartialFunction[Throwable, F[Unit]] = {
-    case BigQueryUtils.BQExceptionWithLowerCaseReason("invalid") =>
+    case bqe: BigQueryException if bqe.lowerCaseReason === "invalid" =>
       Logger[F].warn(s"Caught known exception which probably means another loader has already altered the table.")
       // Don't do anything else; the BigQueryRetrying will handle retries and logging the exception.
   }
+
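+  // The column-count limit surfaces with reason "invalid", the same reason as the
+  // alter-table race condition above, so the message prefix is needed to tell them apart.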
+  private def handleTooManyColumns: PartialFunction[Throwable, AddColumnsResult] = {
+    case bqe: BigQueryException if bqe.lowerCaseReason === "invalid" && bqe.lowerCaseMessage.startsWith("too many columns") =>
+      AddColumnsResult.TooManyColumnsInTable(bqe)
+  }
+
   private def showColumns(columns: Vector[Field]): String =
     columns.map(_.name).mkString(", ")
diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/MockEnvironment.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/MockEnvironment.scala
index 48a5ccba..c5b5cac9 100644
--- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/MockEnvironment.scala
+++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/MockEnvironment.scala
@@ -217,6 +217,7 @@ object MockEnvironment {
   def retriesConfig = Config.Retries(
     Config.SetupErrorRetries(30.seconds),
     Config.TransientErrorRetries(1.second, 5),
-    Config.AlterTableWaitRetries(1.second)
+    Config.AlterTableWaitRetries(1.second),
+    Config.TooManyColumnsRetries(300.seconds)
   )
 }
diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/WriterProviderSpec.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/WriterProviderSpec.scala
index 122a1aad..4203dc72 100644
--- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/WriterProviderSpec.scala
+++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/WriterProviderSpec.scala
@@ -318,7 +318,8 @@ object WriterProviderSpec {
   def retriesConfig = Config.Retries(
     Config.SetupErrorRetries(30.seconds),
     Config.TransientErrorRetries(1.second, 5),
-    Config.AlterTableWaitRetries(1.second)
+    Config.AlterTableWaitRetries(1.second),
+    Config.TooManyColumnsRetries(300.seconds)
   )

   def control: IO[Control] =