From c65cee4b57e6dcb2b21d9eeef0e77f89b1c002bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Poniedzia=C5=82ek?= Date: Tue, 30 Apr 2024 09:36:09 +0200 Subject: [PATCH] Handle too many columns - part 2 We already handle one type of exception for the too-many-columns case, with a message like: ```Too many columns (10067) in schema. Only 10000 columns are allowed``` It turns out BigQuery can also fail with a different one in this case, like: ```Too many total leaf fields: 10001, max allowed field count: 10000``` so we have to handle it as well. --- .../processing/TableManager.scala | 4 +- .../processing/TableManagerSpec.scala | 93 ++++++++++++++++++- project/plugins.sbt | 2 +- 3 files changed, 92 insertions(+), 7 deletions(-) diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManager.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManager.scala index 1804b2f7..207732f2 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManager.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManager.scala @@ -134,7 +134,9 @@ object TableManager { addingColumnsEnabled: Ref[F, Boolean], columns: Vector[Field] ): PartialFunction[Throwable, F[Unit]] = { - case bqe: BigQueryException if bqe.lowerCaseReason === "invalid" && bqe.lowerCaseMessage.startsWith("too many columns") => + case bqe: BigQueryException + if bqe.lowerCaseReason === "invalid" && (bqe.lowerCaseMessage + .startsWith("too many columns") || bqe.lowerCaseMessage.startsWith("too many total leaf fields")) => val enableAfterDelay = Async[F].sleep(retries.tooManyColumns.delay) *> addingColumnsEnabled.set(true) for { _ <- Logger[F].error(bqe)(s"Could not alter table schema because of too many columns") diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManagerSpec.scala 
b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManagerSpec.scala index b4772729..b85f3b49 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManagerSpec.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManagerSpec.scala @@ -32,12 +32,14 @@ class TableManagerSpec extends Specification with CatsEffect { def is = s2""" The table manager Add columns when told to $e1 - Retry adding columns and send alerts when there is a setup exceptionr $e2 + Retry adding columns and send alerts when there is a setup exception $e2 Retry adding columns if there is a transient exception, with limited number of attempts and no monitoring alerts $e3 Become healthy after recovering from an earlier setup error $e4 Become healthy after recovering from an earlier transient error $e5 - Disable future attempts to add columns if the table has too many columns $e6 - Eventually re-enable future attempts to add columns if the table has too many columns $e7 + Disable future attempts to add columns if the table has too many columns - exception type 1 $e6_1 + Disable future attempts to add columns if the table has too many columns - exception type 2 $e6_2 + Eventually re-enable future attempts to add columns if the table has too many columns - exception type 1 $e7_1 + Eventually re-enable future attempts to add columns if the table has too many columns - exception type 2 $e7_2 """ def e1 = control().flatMap { c => @@ -211,7 +213,7 @@ class TableManagerSpec extends Specification with CatsEffect { } } - def e6 = { + def e6_1 = { val error = new BigQueryError("invalid", "global", "Too many columns (10067) in schema. 
Only 10000 columns are allowed.") val exception = new BigQueryException(400, error.getMessage, error) val mocks = List(Response.ExceptionThrown(exception)) @@ -251,7 +253,47 @@ class TableManagerSpec extends Specification with CatsEffect { } } - def e7 = { + def e6_2 = { + val error = new BigQueryError("invalid", "global", "Too many total leaf fields: 10001, max allowed field count: 10000") + val exception = new BigQueryException(400, error.getMessage, error) + val mocks = List(Response.ExceptionThrown(exception)) + control(Mocks(addColumnsResults = mocks)).flatMap { c => + val testFields1 = Vector( + Field("f1", Type.String, Type.Nullability.Nullable, Set("f1")), + Field("f2", Type.Integer, Type.Nullability.Required, Set("f2")) + ) + + val testFields2 = Vector( + Field("f3", Type.String, Type.Nullability.Nullable, Set("f3")), + Field("f4", Type.Integer, Type.Nullability.Required, Set("f4")) + ) + + val io = for { + tableManager <- TableManager.withHandledErrors(c.tableManager, retriesConfig, c.appHealth, c.monitoring) + _ <- tableManager.addColumns(testFields1) + _ <- IO.sleep(10.seconds) + _ <- tableManager.addColumns(testFields2) + } yield () + + val expected = Vector( + Action.AddColumnsAttempted(testFields1), + Action.SentAlert(0L) + ) + + val test = for { + _ <- io + state <- c.state.get + health <- c.appHealth.status + } yield List( + state should beEqualTo(expected), + health should beHealthy + ).reduce(_ and _) + + TestControl.executeEmbed(test) + } + } + + def e7_1 = { val error = new BigQueryError("invalid", "global", "Too many columns (10067) in schema. 
Only 10000 columns are allowed.") val exception = new BigQueryException(400, error.getMessage, error) val mocks = List(Response.ExceptionThrown(exception)) @@ -292,6 +334,47 @@ class TableManagerSpec extends Specification with CatsEffect { } } + def e7_2 = { + val error = new BigQueryError("invalid", "global", "Too many total leaf fields: 10001, max allowed field count: 10000") + val exception = new BigQueryException(400, error.getMessage, error) + val mocks = List(Response.ExceptionThrown(exception)) + control(Mocks(addColumnsResults = mocks)).flatMap { c => + val testFields1 = Vector( + Field("f1", Type.String, Type.Nullability.Nullable, Set("f1")), + Field("f2", Type.Integer, Type.Nullability.Required, Set("f2")) + ) + + val testFields2 = Vector( + Field("f3", Type.String, Type.Nullability.Nullable, Set("f3")), + Field("f4", Type.Integer, Type.Nullability.Required, Set("f4")) + ) + + val io = for { + tableManager <- TableManager.withHandledErrors(c.tableManager, retriesConfig, c.appHealth, c.monitoring) + _ <- tableManager.addColumns(testFields1) + _ <- IO.sleep(310.seconds) + _ <- tableManager.addColumns(testFields2) + } yield () + + val expected = Vector( + Action.AddColumnsAttempted(testFields1), + Action.SentAlert(0L), + Action.AddColumnsAttempted(testFields2) + ) + + val test = for { + _ <- io + state <- c.state.get + health <- c.appHealth.status + } yield List( + state should beEqualTo(expected), + health should beHealthy + ).reduce(_ and _) + + TestControl.executeEmbed(test) + } + } + /** Convenience matchers for health probe * */ def beHealthy: org.specs2.matcher.Matcher[HealthProbe.Status] = { (status: HealthProbe.Status) => diff --git a/project/plugins.sbt b/project/plugins.sbt index 992fe572..f352ec6d 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,5 +1,5 @@ addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.4.2") addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6") -addSbtPlugin("com.snowplowanalytics" % 
"sbt-snowplow-release" % "0.3.1") +addSbtPlugin("com.snowplowanalytics" % "sbt-snowplow-release" % "0.3.2") addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.11.0") addSbtPlugin("com.dwijnand" % "sbt-dynver" % "4.1.1")