Skip to content

Commit

Permalink
Handle too many columns - part 2
Browse files Browse the repository at this point in the history
We already handle one type of exception for the too-many-columns case, with a
message like:

```Too many columns (10067) in schema. Only 10000 columns are allowed```

It turns out BQ can also fail with a different message in this case, like:

```Too many total leaf fields: 10001, max allowed field count: 10000```

so we have to handle it as well.
  • Loading branch information
pondzix committed Apr 30, 2024
1 parent fd9d253 commit c65cee4
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,9 @@ object TableManager {
addingColumnsEnabled: Ref[F, Boolean],
columns: Vector[Field]
): PartialFunction[Throwable, F[Unit]] = {
case bqe: BigQueryException if bqe.lowerCaseReason === "invalid" && bqe.lowerCaseMessage.startsWith("too many columns") =>
case bqe: BigQueryException
if bqe.lowerCaseReason === "invalid" && (bqe.lowerCaseMessage
.startsWith("too many columns") || bqe.lowerCaseMessage.startsWith("too many total leaf fields")) =>
val enableAfterDelay = Async[F].sleep(retries.tooManyColumns.delay) *> addingColumnsEnabled.set(true)
for {
_ <- Logger[F].error(bqe)(s"Could not alter table schema because of too many columns")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ class TableManagerSpec extends Specification with CatsEffect {
def is = s2"""
The table manager
Add columns when told to $e1
Retry adding columns and send alerts when there is a setup exceptionr $e2
Retry adding columns and send alerts when there is a setup exception $e2
Retry adding columns if there is a transient exception, with limited number of attempts and no monitoring alerts $e3
Become healthy after recovering from an earlier setup error $e4
Become healthy after recovering from an earlier transient error $e5
Disable future attempts to add columns if the table has too many columns $e6
Eventually re-enable future attempts to add columns if the table has too many columns $e7
Disable future attempts to add columns if the table has too many columns - exception type 1 $e6_1
Disable future attempts to add columns if the table has too many columns - exception type 2 $e6_2
Eventually re-enable future attempts to add columns if the table has too many columns - exception type 1 $e7_1
Eventually re-enable future attempts to add columns if the table has too many columns - exception type 2 $e7_2
"""

def e1 = control().flatMap { c =>
Expand Down Expand Up @@ -211,7 +213,7 @@ class TableManagerSpec extends Specification with CatsEffect {
}
}

def e6 = {
def e6_1 = {
val error = new BigQueryError("invalid", "global", "Too many columns (10067) in schema. Only 10000 columns are allowed.")
val exception = new BigQueryException(400, error.getMessage, error)
val mocks = List(Response.ExceptionThrown(exception))
Expand Down Expand Up @@ -251,7 +253,47 @@ class TableManagerSpec extends Specification with CatsEffect {
}
}

def e7 = {
/**
 * Too many columns, exception type 2: the mocked `addColumns` call fails with BigQuery's
 * "Too many total leaf fields" message.
 *
 * The scenario adds a first batch of columns, waits 10 seconds, then asks for a second batch.
 * The recorded actions must contain only the first attempt followed by one alert (no second
 * attempt is recorded), and the health probe must still report healthy.
 */
def e6_2 = {
  val bqError = new BigQueryError("invalid", "global", "Too many total leaf fields: 10001, max allowed field count: 10000")
  val leafFieldsException = new BigQueryException(400, bqError.getMessage, bqError)

  control(Mocks(addColumnsResults = List(Response.ExceptionThrown(leafFieldsException)))).flatMap { ctrl =>
    val firstBatch = Vector(
      Field("f1", Type.String, Type.Nullability.Nullable, Set("f1")),
      Field("f2", Type.Integer, Type.Nullability.Required, Set("f2"))
    )

    val secondBatch = Vector(
      Field("f3", Type.String, Type.Nullability.Nullable, Set("f3")),
      Field("f4", Type.Integer, Type.Nullability.Required, Set("f4"))
    )

    // The second batch must not show up here: only the failed first attempt and its alert.
    val expectedActions = Vector(
      Action.AddColumnsAttempted(firstBatch),
      Action.SentAlert(0L)
    )

    // Drive the scenario and collect the observations in a single sequential chain.
    val verification = for {
      manager <- TableManager.withHandledErrors(ctrl.tableManager, retriesConfig, ctrl.appHealth, ctrl.monitoring)
      _ <- manager.addColumns(firstBatch)
      _ <- IO.sleep(10.seconds)
      _ <- manager.addColumns(secondBatch)
      recorded <- ctrl.state.get
      probe <- ctrl.appHealth.status
    } yield List(
      recorded should beEqualTo(expectedActions),
      probe should beHealthy
    ).reduce(_ and _)

    // Run under TestControl so the sleep above is handled by the test runtime.
    TestControl.executeEmbed(verification)
  }
}

def e7_1 = {
val error = new BigQueryError("invalid", "global", "Too many columns (10067) in schema. Only 10000 columns are allowed.")
val exception = new BigQueryException(400, error.getMessage, error)
val mocks = List(Response.ExceptionThrown(exception))
Expand Down Expand Up @@ -292,6 +334,47 @@ class TableManagerSpec extends Specification with CatsEffect {
}
}

/**
 * Too many columns, exception type 2, re-enable case: the mocked `addColumns` call fails with
 * BigQuery's "Too many total leaf fields" message.
 *
 * The scenario adds a first batch of columns, waits 310 seconds, then asks for a second batch.
 * NOTE(review): 310 seconds is presumably longer than the too-many-columns delay in
 * `retriesConfig` -- confirm against that config.
 * The recorded actions must be: first attempt, one alert, then a second attempt; the health
 * probe must still report healthy.
 */
def e7_2 = {
  val bqError = new BigQueryError("invalid", "global", "Too many total leaf fields: 10001, max allowed field count: 10000")
  val leafFieldsException = new BigQueryException(400, bqError.getMessage, bqError)

  control(Mocks(addColumnsResults = List(Response.ExceptionThrown(leafFieldsException)))).flatMap { ctrl =>
    val firstBatch = Vector(
      Field("f1", Type.String, Type.Nullability.Nullable, Set("f1")),
      Field("f2", Type.Integer, Type.Nullability.Required, Set("f2"))
    )

    val secondBatch = Vector(
      Field("f3", Type.String, Type.Nullability.Nullable, Set("f3")),
      Field("f4", Type.Integer, Type.Nullability.Required, Set("f4"))
    )

    // After the long wait the second batch is expected to be attempted again.
    val expectedActions = Vector(
      Action.AddColumnsAttempted(firstBatch),
      Action.SentAlert(0L),
      Action.AddColumnsAttempted(secondBatch)
    )

    // Drive the scenario and collect the observations in a single sequential chain.
    val verification = for {
      manager <- TableManager.withHandledErrors(ctrl.tableManager, retriesConfig, ctrl.appHealth, ctrl.monitoring)
      _ <- manager.addColumns(firstBatch)
      _ <- IO.sleep(310.seconds)
      _ <- manager.addColumns(secondBatch)
      recorded <- ctrl.state.get
      probe <- ctrl.appHealth.status
    } yield List(
      recorded should beEqualTo(expectedActions),
      probe should beHealthy
    ).reduce(_ and _)

    // Run under TestControl so the sleep above is handled by the test runtime.
    TestControl.executeEmbed(verification)
  }
}

/** Convenience matchers for health probe * */

def beHealthy: org.specs2.matcher.Matcher[HealthProbe.Status] = { (status: HealthProbe.Status) =>
Expand Down
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.4.2")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6")
addSbtPlugin("com.snowplowanalytics" % "sbt-snowplow-release" % "0.3.1")
addSbtPlugin("com.snowplowanalytics" % "sbt-snowplow-release" % "0.3.2")
addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.11.0")
addSbtPlugin("com.dwijnand" % "sbt-dynver" % "4.1.1")

0 comments on commit c65cee4

Please sign in to comment.