diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQueryUtils.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQueryUtils.scala index 8bbe5897..7f8715bb 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQueryUtils.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQueryUtils.scala @@ -44,7 +44,7 @@ object BigQueryUtils { def lowerCaseMessage: String = Option(bqe.getError()) - .flatMap(e => Option(e.getReason)) + .flatMap(e => Option(e.getMessage)) .map(_.toLowerCase) .getOrElse("") diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManager.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManager.scala index 7fdc68de..86eeb520 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManager.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManager.scala @@ -37,7 +37,7 @@ import scala.jdk.CollectionConverters._ trait TableManager[F[_]] { - def addColumns(columns: Vector[Field]): F[TableManager.AddColumnsResult] + def addColumns(columns: Vector[Field]): F[Unit] def createTable: F[Unit] @@ -47,12 +47,6 @@ object TableManager { private implicit def logger[F[_]: Sync] = Slf4jLogger.getLogger[F] - sealed trait AddColumnsResult - object AddColumnsResult { - case object Success extends AddColumnsResult - case class TooManyColumnsInTable(t: Throwable) extends AddColumnsResult - } - trait WithHandledErrors[F[_]] { def addColumns(columns: Vector[Field]): F[Unit] def createTable: F[Unit] @@ -81,19 +75,10 @@ object TableManager { BigQueryRetrying .withRetries(appHealth, retries, monitoring, Alert.FailedToAddColumns(columns.map(_.name), _)) { Logger[F].info(s"Altering table to add columns [${showColumns(columns)}]") *> - underlying.addColumns(columns) - } - .flatMap { - case AddColumnsResult.TooManyColumnsInTable(t) => - val enableAfterDelay = Async[F].sleep(retries.tooManyColumns.delay) *> addingColumnsEnabled.set(true) - for { - _ <- Logger[F].error(t)(s"Could not alter table schema because of too many columns") - _ <- monitoring.alert(Alert.FailedToAddColumns(columns.map(_.name), t)) - _ <- addingColumnsEnabled.set(false) - _ <- enableAfterDelay.start - } yield () - case _ => - Async[F].unit + underlying + .addColumns(columns) + .recoverWith(handleTooManyColumns(retries, monitoring, addingColumnsEnabled, columns)) + .onError(logOnRaceCondition) } case false => Async[F].unit @@ -110,7 +95,7 @@ object TableManager { client: BigQuery ): TableManager[F] = new TableManager[F] { - def addColumns(columns: Vector[Field]): F[TableManager.AddColumnsResult] = + def addColumns(columns: Vector[Field]): F[Unit] = for { table <- Sync[F].blocking(client.getTable(config.dataset, config.table)) schema <- Sync[F].pure(table.getDefinition[TableDefinition].getSchema) @@ -118,12 +103,10 @@ object TableManager { fields <- Sync[F].pure(BigQuerySchemaUtils.mergeInColumns(fields, columns)) schema <- Sync[F].pure(Schema.of(fields)) table <- Sync[F].pure(setTableSchema(table, schema)) - result <- Sync[F] - .blocking(table.update()) - .as[AddColumnsResult](AddColumnsResult.Success) - .recover(handleTooManyColumns) - .onError(logOnRaceCondition) - } yield result + _ <- Sync[F].blocking(table.update()) + // .recover(handleTooManyColumns) + // .onError(logOnRaceCondition) + } yield () def createTable: F[Unit] = { val tableInfo = atomicTableInfo(config) @@ -150,9 +133,20 @@ object TableManager { // Don't do anything else; the BigQueryRetrying will handle retries and logging the exception. } - private def handleTooManyColumns: PartialFunction[Throwable, AddColumnsResult] = { + private def handleTooManyColumns[F[_]: Async]( + retries: Config.Retries, + monitoring: Monitoring[F], + addingColumnsEnabled: Ref[F, Boolean], + columns: Vector[Field] + ): PartialFunction[Throwable, F[Unit]] = { case bqe: BigQueryException if bqe.lowerCaseReason === "invalid" && bqe.lowerCaseMessage.startsWith("too many columns") => - AddColumnsResult.TooManyColumnsInTable(bqe) + val enableAfterDelay = Async[F].sleep(retries.tooManyColumns.delay) *> addingColumnsEnabled.set(true) + for { + _ <- Logger[F].error(bqe)(s"Could not alter table schema because of too many columns") + _ <- monitoring.alert(Alert.FailedToAddColumns(columns.map(_.name), bqe)) + _ <- addingColumnsEnabled.set(false) + _ <- enableAfterDelay.start + } yield () } private def showColumns(columns: Vector[Field]): String = diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/AlertSpec.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/AlertSpec.scala new file mode 100644 index 00000000..885c6939 --- /dev/null +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/AlertSpec.scala @@ -0,0 +1,55 @@ +/** + * Copyright (c) 2013-present Snowplow Analytics Ltd. All rights reserved. + * + * This software is made available by Snowplow Analytics, Ltd., under the terms of the Snowplow + * Limited Use License Agreement, Version 1.0 located at + * https://docs.snowplow.io/limited-use-license-1.0 BY INSTALLING, DOWNLOADING, ACCESSING, USING OR + * DISTRIBUTING ANY PORTION OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. + */ +package com.snowplowanalytics.snowplow.bigquery + +import org.specs2.Specification + +class AlertSpec extends Specification { + + def is = s2""" + An Alert should: + Generate a message describing an exception and any nested cause $e1 + Collapse messages if a cause's message contains the parent exception's message $e2 + Collapse messages if a parent exception's message contains the cause's message $e3 + """ + + def e1 = { + val e1 = new RuntimeException("original cause") + val e2 = new RuntimeException("middle cause", e1) + val e3 = new RuntimeException("final error", e2) + + val alert = Alert.FailedToCreateEventsTable(e3) + + val expected = "Failed to create events table: final error: middle cause: original cause" + + Alert.getMessage(alert) must beEqualTo(expected) + } + + def e2 = { + val e1 = new RuntimeException("This happened: original cause") + val e2 = new RuntimeException("original cause", e1) + + val alert = Alert.FailedToCreateEventsTable(e2) + + val expected = "Failed to create events table: This happened: original cause" + + Alert.getMessage(alert) must beEqualTo(expected) + } + + def e3 = { + val e1 = new RuntimeException("original cause") + val e2 = new RuntimeException("This happened: original cause", e1) + + val alert = Alert.FailedToCreateEventsTable(e2) + + val expected = "Failed to create events table: This happened: original cause" + + Alert.getMessage(alert) must beEqualTo(expected) + } +} diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManagerSpec.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManagerSpec.scala index 96a5c65f..b4772729 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManagerSpec.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManagerSpec.scala @@ -16,6 +16,7 @@ import cats.effect.testkit.TestControl import com.google.api.gax.rpc.PermissionDeniedException import com.google.api.gax.grpc.GrpcStatusCode import io.grpc.Status +import com.google.cloud.bigquery.{BigQueryError, BigQueryException} import scala.concurrent.duration.{DurationLong, FiniteDuration} @@ -211,7 +212,9 @@ class TableManagerSpec extends Specification with CatsEffect { } def e6 = { - val mocks = List(Response.Success(TableManager.AddColumnsResult.TooManyColumnsInTable(new RuntimeException("Too many columns")))) + val error = new BigQueryError("invalid", "global", "Too many columns (10067) in schema. Only 10000 columns are allowed.") + val exception = new BigQueryException(400, error.getMessage, error) + val mocks = List(Response.ExceptionThrown(exception)) control(Mocks(addColumnsResults = mocks)).flatMap { c => val testFields1 = Vector( Field("f1", Type.String, Type.Nullability.Nullable, Set("f1")), @@ -249,7 +252,9 @@ class TableManagerSpec extends Specification with CatsEffect { } def e7 = { - val mocks = List(Response.Success(TableManager.AddColumnsResult.TooManyColumnsInTable(new RuntimeException("Too many columns")))) + val error = new BigQueryError("invalid", "global", "Too many columns (10067) in schema. Only 10000 columns are allowed.") + val exception = new BigQueryException(400, error.getMessage, error) + val mocks = List(Response.ExceptionThrown(exception)) control(Mocks(addColumnsResults = mocks)).flatMap { c => val testFields1 = Vector( Field("f1", Type.String, Type.Nullability.Nullable, Set("f1")), @@ -315,13 +320,13 @@ object TableManagerSpec { case class SentAlert(timeSentSeconds: Long) extends Action } - sealed trait Response[+A] + sealed trait Response object Response { - final case class Success[A](value: A) extends Response[A] - final case class ExceptionThrown(value: Throwable) extends Response[Nothing] + case object Success extends Response + final case class ExceptionThrown(value: Throwable) extends Response } - case class Mocks(addColumnsResults: List[Response[TableManager.AddColumnsResult]]) + case class Mocks(addColumnsResults: List[Response]) case class Control( state: Ref[IO, Vector[Action]], @@ -356,20 +361,20 @@ object TableManagerSpec { AppHealth.init(10.seconds, healthySource, everythingHealthy) } - private def testTableManager(state: Ref[IO, Vector[Action]], mocks: List[Response[TableManager.AddColumnsResult]]): IO[TableManager[IO]] = + private def testTableManager(state: Ref[IO, Vector[Action]], mocks: List[Response]): IO[TableManager[IO]] = for { mocksRef <- Ref[IO].of(mocks) } yield new TableManager[IO] { - def addColumns(columns: Vector[Field]): IO[TableManager.AddColumnsResult] = + def addColumns(columns: Vector[Field]): IO[Unit] = for { response <- mocksRef.modify { case head :: tail => (tail, head) - case Nil => (Nil, Response.Success(TableManager.AddColumnsResult.Success)) + case Nil => (Nil, Response.Success) } _ <- state.update(_ :+ Action.AddColumnsAttempted(columns)) result <- response match { - case success: Response.Success[TableManager.AddColumnsResult] => - IO.pure(success.value) + case Response.Success => + IO.unit case Response.ExceptionThrown(ex) => IO.raiseError(ex).adaptError { t => t.setStackTrace(Array()) // don't clutter our test logs