Skip to content

Commit

Permalink
handle-exceed-column-limit amendments 1
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Apr 2, 2024
1 parent 8642aaf commit 289e85f
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ object BigQueryUtils {

def lowerCaseMessage: String =
Option(bqe.getError())
.flatMap(e => Option(e.getReason))
.flatMap(e => Option(e.getMessage))
.map(_.toLowerCase)
.getOrElse("")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import scala.jdk.CollectionConverters._

trait TableManager[F[_]] {

def addColumns(columns: Vector[Field]): F[TableManager.AddColumnsResult]
def addColumns(columns: Vector[Field]): F[Unit]

def createTable: F[Unit]

Expand All @@ -47,12 +47,6 @@ object TableManager {

private implicit def logger[F[_]: Sync] = Slf4jLogger.getLogger[F]

sealed trait AddColumnsResult
object AddColumnsResult {
case object Success extends AddColumnsResult
case class TooManyColumnsInTable(t: Throwable) extends AddColumnsResult
}

trait WithHandledErrors[F[_]] {
def addColumns(columns: Vector[Field]): F[Unit]
def createTable: F[Unit]
Expand Down Expand Up @@ -81,19 +75,10 @@ object TableManager {
BigQueryRetrying
.withRetries(appHealth, retries, monitoring, Alert.FailedToAddColumns(columns.map(_.name), _)) {
Logger[F].info(s"Altering table to add columns [${showColumns(columns)}]") *>
underlying.addColumns(columns)
}
.flatMap {
case AddColumnsResult.TooManyColumnsInTable(t) =>
val enableAfterDelay = Async[F].sleep(retries.tooManyColumns.delay) *> addingColumnsEnabled.set(true)
for {
_ <- Logger[F].error(t)(s"Could not alter table schema because of too many columns")
_ <- monitoring.alert(Alert.FailedToAddColumns(columns.map(_.name), t))
_ <- addingColumnsEnabled.set(false)
_ <- enableAfterDelay.start
} yield ()
case _ =>
Async[F].unit
underlying
.addColumns(columns)
.recoverWith(handleTooManyColumns(retries, monitoring, addingColumnsEnabled, columns))
.onError(logOnRaceCondition)
}
case false =>
Async[F].unit
Expand All @@ -110,20 +95,18 @@ object TableManager {
client: BigQuery
): TableManager[F] = new TableManager[F] {

def addColumns(columns: Vector[Field]): F[TableManager.AddColumnsResult] =
def addColumns(columns: Vector[Field]): F[Unit] =
for {
table <- Sync[F].blocking(client.getTable(config.dataset, config.table))
schema <- Sync[F].pure(table.getDefinition[TableDefinition].getSchema)
fields <- Sync[F].pure(schema.getFields)
fields <- Sync[F].pure(BigQuerySchemaUtils.mergeInColumns(fields, columns))
schema <- Sync[F].pure(Schema.of(fields))
table <- Sync[F].pure(setTableSchema(table, schema))
result <- Sync[F]
.blocking(table.update())
.as[AddColumnsResult](AddColumnsResult.Success)
.recover(handleTooManyColumns)
.onError(logOnRaceCondition)
} yield result
_ <- Sync[F].blocking(table.update())
// .recover(handleTooManyColumns)
// .onError(logOnRaceCondition)
} yield ()

def createTable: F[Unit] = {
val tableInfo = atomicTableInfo(config)
Expand All @@ -150,9 +133,20 @@ object TableManager {
// Don't do anything else; the BigQueryRetrying will handle retries and logging the exception.
}

private def handleTooManyColumns: PartialFunction[Throwable, AddColumnsResult] = {
private def handleTooManyColumns[F[_]: Async](
retries: Config.Retries,
monitoring: Monitoring[F],
addingColumnsEnabled: Ref[F, Boolean],
columns: Vector[Field]
): PartialFunction[Throwable, F[Unit]] = {
case bqe: BigQueryException if bqe.lowerCaseReason === "invalid" && bqe.lowerCaseMessage.startsWith("too many columns") =>
AddColumnsResult.TooManyColumnsInTable(bqe)
val enableAfterDelay = Async[F].sleep(retries.tooManyColumns.delay) *> addingColumnsEnabled.set(true)
for {
_ <- Logger[F].error(bqe)(s"Could not alter table schema because of too many columns")
_ <- monitoring.alert(Alert.FailedToAddColumns(columns.map(_.name), bqe))
_ <- addingColumnsEnabled.set(false)
_ <- enableAfterDelay.start
} yield ()
}

private def showColumns(columns: Vector[Field]): String =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
* Copyright (c) 2013-present Snowplow Analytics Ltd. All rights reserved.
*
* This software is made available by Snowplow Analytics, Ltd., under the terms of the Snowplow
* Limited Use License Agreement, Version 1.0 located at
* https://docs.snowplow.io/limited-use-license-1.0 BY INSTALLING, DOWNLOADING, ACCESSING, USING OR
* DISTRIBUTING ANY PORTION OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
*/
package com.snowplowanalytics.snowplow.bigquery

import org.specs2.Specification

class AlertSpec extends Specification {

def is = s2"""
An Alert should:
Generate a message describing an exception and any nested cause $e1
Collapse messages if a cause's message contains the parent exception's message $e2
Collapse messages if a parent exception's message contains the cause's message $e3
"""

def e1 = {
val e1 = new RuntimeException("original cause")
val e2 = new RuntimeException("middle cause", e1)
val e3 = new RuntimeException("final error", e2)

val alert = Alert.FailedToCreateEventsTable(e3)

val expected = "Failed to create events table: final error: middle cause: original cause"

Alert.getMessage(alert) must beEqualTo(expected)
}

def e2 = {
val e1 = new RuntimeException("This happened: original cause")
val e2 = new RuntimeException("original cause", e1)

val alert = Alert.FailedToCreateEventsTable(e2)

val expected = "Failed to create events table: This happened: original cause"

Alert.getMessage(alert) must beEqualTo(expected)
}

def e3 = {
val e1 = new RuntimeException("original cause")
val e2 = new RuntimeException("This happened: original cause", e1)

val alert = Alert.FailedToCreateEventsTable(e2)

val expected = "Failed to create events table: This happened: original cause"

Alert.getMessage(alert) must beEqualTo(expected)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import cats.effect.testkit.TestControl
import com.google.api.gax.rpc.PermissionDeniedException
import com.google.api.gax.grpc.GrpcStatusCode
import io.grpc.Status
import com.google.cloud.bigquery.{BigQueryError, BigQueryException}

import scala.concurrent.duration.{DurationLong, FiniteDuration}

Expand Down Expand Up @@ -211,7 +212,9 @@ class TableManagerSpec extends Specification with CatsEffect {
}

def e6 = {
val mocks = List(Response.Success(TableManager.AddColumnsResult.TooManyColumnsInTable(new RuntimeException("Too many columns"))))
val error = new BigQueryError("invalid", "global", "Too many columns (10067) in schema. Only 10000 columns are allowed.")
val exception = new BigQueryException(400, error.getMessage, error)
val mocks = List(Response.ExceptionThrown(exception))
control(Mocks(addColumnsResults = mocks)).flatMap { c =>
val testFields1 = Vector(
Field("f1", Type.String, Type.Nullability.Nullable, Set("f1")),
Expand Down Expand Up @@ -249,7 +252,9 @@ class TableManagerSpec extends Specification with CatsEffect {
}

def e7 = {
val mocks = List(Response.Success(TableManager.AddColumnsResult.TooManyColumnsInTable(new RuntimeException("Too many columns"))))
val error = new BigQueryError("invalid", "global", "Too many columns (10067) in schema. Only 10000 columns are allowed.")
val exception = new BigQueryException(400, error.getMessage, error)
val mocks = List(Response.ExceptionThrown(exception))
control(Mocks(addColumnsResults = mocks)).flatMap { c =>
val testFields1 = Vector(
Field("f1", Type.String, Type.Nullability.Nullable, Set("f1")),
Expand Down Expand Up @@ -315,13 +320,13 @@ object TableManagerSpec {
case class SentAlert(timeSentSeconds: Long) extends Action
}

sealed trait Response[+A]
sealed trait Response
object Response {
final case class Success[A](value: A) extends Response[A]
final case class ExceptionThrown(value: Throwable) extends Response[Nothing]
case object Success extends Response
final case class ExceptionThrown(value: Throwable) extends Response
}

case class Mocks(addColumnsResults: List[Response[TableManager.AddColumnsResult]])
case class Mocks(addColumnsResults: List[Response])

case class Control(
state: Ref[IO, Vector[Action]],
Expand Down Expand Up @@ -356,20 +361,20 @@ object TableManagerSpec {
AppHealth.init(10.seconds, healthySource, everythingHealthy)
}

private def testTableManager(state: Ref[IO, Vector[Action]], mocks: List[Response[TableManager.AddColumnsResult]]): IO[TableManager[IO]] =
private def testTableManager(state: Ref[IO, Vector[Action]], mocks: List[Response]): IO[TableManager[IO]] =
for {
mocksRef <- Ref[IO].of(mocks)
} yield new TableManager[IO] {
def addColumns(columns: Vector[Field]): IO[TableManager.AddColumnsResult] =
def addColumns(columns: Vector[Field]): IO[Unit] =
for {
response <- mocksRef.modify {
case head :: tail => (tail, head)
case Nil => (Nil, Response.Success(TableManager.AddColumnsResult.Success))
case Nil => (Nil, Response.Success)
}
_ <- state.update(_ :+ Action.AddColumnsAttempted(columns))
result <- response match {
case success: Response.Success[TableManager.AddColumnsResult] =>
IO.pure(success.value)
case Response.Success =>
IO.unit
case Response.ExceptionThrown(ex) =>
IO.raiseError(ex).adaptError { t =>
t.setStackTrace(Array()) // don't clutter our test logs
Expand Down

0 comments on commit 289e85f

Please sign in to comment.