Skip to content

Commit

Permalink
Stay healthy if BigQuery table exceeds column limit
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Mar 31, 2024
1 parent b16153f commit 35c3aee
Show file tree
Hide file tree
Showing 13 changed files with 124 additions and 39 deletions.
8 changes: 8 additions & 0 deletions config/config.azure.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@
"alterTableWait": {
"delay": "1 second"
}

# -- Relevant when the BigQuery table is close to exceeding the limit on max allowed columns in a single table.
# -- The loader will ignore a failure to alter the table due to too many columns, and it will continue to run.
# -- Some events will inevitably go to the failed events output topic until new columns have been added.
# -- This param configures how often the loader will retry to alter the table after an earlier failure.
"tooManyColumns": {
"delay": "300 seconds"
}
}

# -- Schemas that won't be loaded to BigQuery. Optional, default value []
Expand Down
8 changes: 8 additions & 0 deletions config/config.kinesis.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,14 @@
"alterTableWait": {
"delay": "1 second"
}

# -- Relevant when the BigQuery table is close to exceeding the limit on max allowed columns in a single table.
# -- The loader will ignore a failure to alter the table due to too many columns, and it will continue to run.
# -- Some events will inevitably go to the failed events output topic until new columns have been added.
# -- This param configures how often the loader will retry to alter the table after an earlier failure.
"tooManyColumns": {
"delay": "300 seconds"
}
}

# -- Schemas that won't be loaded to BigQuery. Optional, default value []
Expand Down
8 changes: 8 additions & 0 deletions config/config.pubsub.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,14 @@
"alterTableWait": {
"delay": "1 second"
}

# -- Relevant when the BigQuery table is close to exceeding the limit on max allowed columns in a single table.
# -- The loader will ignore a failure to alter the table due to too many columns, and it will continue to run.
# -- Some events will inevitably go to the failed events output topic until new columns have been added.
# -- This param configures how often the loader will retry to alter the table after an earlier failure.
"tooManyColumns": {
"delay": "300 seconds"
}
}

# -- Schemas that won't be loaded to BigQuery. Optional, default value []
Expand Down
3 changes: 3 additions & 0 deletions modules/core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
"alterTableWait": {
"delay": "1 second"
}
"tooManyColumns": {
"delay": "300 seconds"
}
}

"skipSchemas": []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,26 @@ object Alert {
}

private implicit def throwableShow: Show[Throwable] = {
def go(acc: List[String], next: Throwable): String = {
val nextMessage = Option(next.getMessage)
val msgs = nextMessage.filterNot(msg => acc.headOption.contains(msg)) ++: acc
// Walks the messages left to right and collapses each neighbouring pair in which one
// message contains the other, keeping the containing message. Messages that are not
// related by containment are kept in their original order.
def removeDuplicateMessages(in: List[String]): List[String] =
  in match {
    case Nil => Nil
    case first :: others =>
      // `candidate` is the message still being compared against what follows;
      // `done` holds the messages already emitted, most recent first.
      val (done, candidate) = others.foldLeft((List.empty[String], first)) { case ((emitted, current), next) =>
        if (current.contains(next)) (emitted, current)
        else if (next.contains(current)) (emitted, next)
        else (current :: emitted, next)
      }
      (candidate :: done).reverse
  }

Option(next.getCause) match {
case Some(cause) => go(msgs, cause)
case None => msgs.reverse.mkString(": ")
// Collects the non-null messages of `t` and of every exception in its cause chain,
// outermost exception first.
def accumulateMessages(t: Throwable): List[String] = {
  @scala.annotation.tailrec
  def walk(current: Throwable, collected: List[String]): List[String] = {
    // Messages are prepended while walking and reversed once at the end.
    val withCurrent = Option(current.getMessage).fold(collected)(_ :: collected)
    Option(current.getCause) match {
      case Some(cause) => walk(cause, withCurrent)
      case None        => withCurrent.reverse
    }
  }

  walk(t, Nil)
}

Show.show(go(Nil, _))
Show.show { t =>
removeDuplicateMessages(accumulateMessages(t)).mkString(": ")
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,13 @@ object Config {
case class SetupErrorRetries(delay: FiniteDuration)
case class AlterTableWaitRetries(delay: FiniteDuration)
case class TransientErrorRetries(delay: FiniteDuration, attempts: Int)
case class TooManyColumnsRetries(delay: FiniteDuration)

case class Retries(
setupErrors: SetupErrorRetries,
transientErrors: TransientErrorRetries,
alterTableWait: AlterTableWaitRetries
alterTableWait: AlterTableWaitRetries,
tooManyColumns: TooManyColumnsRetries
)

implicit def decoder[Source: Decoder, Sink: Decoder]: Decoder[Config[Source, Sink]] = {
Expand Down Expand Up @@ -119,6 +121,7 @@ object Config {
implicit val setupRetries = deriveConfiguredDecoder[SetupErrorRetries]
implicit val alterTableRetries = deriveConfiguredDecoder[AlterTableWaitRetries]
implicit val transientRetries = deriveConfiguredDecoder[TransientErrorRetries]
implicit val tooManyColsRetries = deriveConfiguredDecoder[TooManyColumnsRetries]
implicit val retriesDecoder = deriveConfiguredDecoder[Retries]

// TODO add bigquery docs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ import cats.Applicative
import cats.effect.Sync
import cats.implicits._
import com.google.api.gax.rpc.PermissionDeniedException
import com.google.cloud.bigquery.BigQueryException
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import retry._
import retry.implicits.retrySyntaxError

import com.snowplowanalytics.snowplow.bigquery.{Alert, AppHealth, Config, Monitoring}
import com.snowplowanalytics.snowplow.bigquery.processing.BigQueryUtils.BQExceptionSyntax

object BigQueryRetrying {

Expand Down Expand Up @@ -54,7 +56,7 @@ object BigQueryRetrying {
)

private def isSetupError[F[_]: Sync](t: Throwable): F[Boolean] = t match {
case BigQueryUtils.BQExceptionWithLowerCaseReason("notfound" | "accessdenied") =>
case bqe: BigQueryException if Set("notfound", "accessdenied").contains(bqe.lowerCaseReason) =>
true.pure[F]
case _: PermissionDeniedException =>
true.pure[F]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import scala.jdk.CollectionConverters._

object BigQuerySchemaUtils {

def alterTableRequired(tableDescriptor: Descriptors.Descriptor, ddlFields: Seq[Field]): Boolean =
ddlFields.exists { field =>
def alterTableRequired(tableDescriptor: Descriptors.Descriptor, ddlFields: Seq[Field]): Seq[Field] =
ddlFields.filter { field =>
Option(tableDescriptor.findFieldByName(field.name)) match {
case Some(fieldDescriptor) =>
val nullableMismatch = fieldDescriptor.isRequired && field.nullability.nullable
Expand All @@ -32,7 +32,7 @@ object BigQuerySchemaUtils {
case Descriptors.FieldDescriptor.Type.MESSAGE =>
ddlField.fieldType match {
case Type.Struct(nestedFields) =>
alterTableRequired(tableField.getMessageType, nestedFields)
alterTableRequired(tableField.getMessageType, nestedFields).nonEmpty
case _ =>
false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,18 @@ object BigQueryUtils {
/** Builds the stream id for the configured table by appending `/streams/_default` to the table's IAM resource name. */
def streamIdOf(config: Config.BigQuery): String = {
  val tableResource = tableIdOf(config).getIAMResourceName
  s"$tableResource/streams/_default"
}

object BQExceptionWithLowerCaseReason {
def unapply(bqe: BigQueryException): Option[(String)] =
Option(bqe.getError()) match {
case Some(bqError) =>
Some(bqError.getReason.toLowerCase)
case None =>
None
}
/**
 * Null-safe accessors for the `BigQueryError` attached to a `BigQueryException`.
 *
 * Both accessors return the empty string when the exception carries no error, or when
 * the requested field is null, so callers can compare the result without null checks.
 */
implicit class BQExceptionSyntax(val bqe: BigQueryException) extends AnyVal {

  /** Lower-cased error reason (e.g. "invalid", "notfound"), or "" when absent. */
  def lowerCaseReason: String =
    Option(bqe.getError())
      .flatMap(e => Option(e.getReason))
      .map(_.toLowerCase)
      .getOrElse("")

  /** Lower-cased human-readable error message, or "" when absent. */
  def lowerCaseMessage: String =
    Option(bqe.getError())
      // Must read the error's message, not its reason: callers match on message text
      // such as "too many columns", which never appears in the reason field.
      .flatMap(e => Option(e.getMessage))
      .map(_.toLowerCase)
      .getOrElse("")

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,9 @@ object Processing {
val fields = batch.entities.fields.flatMap { tte =>
tte.mergedField :: tte.recoveries.map(_._2)
}
if (BigQuerySchemaUtils.alterTableRequired(descriptor, fields)) {
env.tableManager.addColumns(fields) *> env.writer.closed.use_
val fieldsToAdd = BigQuerySchemaUtils.alterTableRequired(descriptor, fields)
if (fieldsToAdd.nonEmpty) {
env.tableManager.addColumns(fieldsToAdd.toVector) *> env.writer.closed.use_
} else {
Sync[F].unit
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ package com.snowplowanalytics.snowplow.bigquery.processing

import cats.Show
import cats.implicits._
import cats.effect.{Async, Sync}
import cats.effect.implicits._
import cats.effect.{Async, Ref, Sync}
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import com.google.cloud.bigquery.{
Expand All @@ -25,10 +26,12 @@ import com.google.cloud.bigquery.{
TimePartitioning
}
import com.google.auth.Credentials
import com.google.cloud.bigquery.BigQueryException

import com.snowplowanalytics.iglu.schemaddl.parquet.Field
import com.snowplowanalytics.snowplow.loaders.transform.AtomicFields
import com.snowplowanalytics.snowplow.bigquery.{Alert, AppHealth, Config, Monitoring}
import com.snowplowanalytics.snowplow.bigquery.processing.BigQueryUtils.BQExceptionSyntax

import scala.jdk.CollectionConverters._

Expand All @@ -44,6 +47,12 @@ object TableManager {

private implicit def logger[F[_]: Sync] = Slf4jLogger.getLogger[F]

// Outcome of one attempt to alter the table schema, as returned by `addColumnsImpl`.
private sealed trait AddColumnsResult
private object AddColumnsResult {
// The call to `table.update()` completed without raising.
case object Success extends AddColumnsResult
// The update was rejected by BigQuery with reason "invalid" and a message starting with
// "too many columns". `t` is the original exception, kept for logging and alerting.
case class TooManyColumnsInTable(t: Throwable) extends AddColumnsResult
}

def make[F[_]: Async](
config: Config.BigQuery,
retries: Config.Retries,
Expand All @@ -53,20 +62,40 @@ object TableManager {
): F[TableManager[F]] =
for {
client <- Sync[F].delay(BigQueryOptions.newBuilder.setCredentials(credentials).build.getService)
} yield impl(config, retries, client, appHealth, monitoring)
addingColumnsEnabled <- Ref[F].of[Boolean](true)
} yield impl(config, retries, client, appHealth, monitoring, addingColumnsEnabled)

private def impl[F[_]: Async](
config: Config.BigQuery,
retries: Config.Retries,
client: BigQuery,
appHealth: AppHealth[F],
monitoring: Monitoring[F]
monitoring: Monitoring[F],
addingColumnsEnabled: Ref[F, Boolean]
): TableManager[F] = new TableManager[F] {

def addColumns(columns: Vector[Field]): F[Unit] =
BigQueryRetrying.withRetries(appHealth, retries, monitoring, Alert.FailedToAddColumns(columns.map(_.name), _)) {
Logger[F].info(s"Altering table to add columns [${showColumns(columns)}]") *>
addColumnsImpl(config, client, columns)
addingColumnsEnabled.get.flatMap {
case true =>
BigQueryRetrying
.withRetries(appHealth, retries, monitoring, Alert.FailedToAddColumns(columns.map(_.name), _)) {
Logger[F].info(s"Altering table to add columns [${showColumns(columns)}]") *>
addColumnsImpl(config, client, columns)
}
.flatMap {
case AddColumnsResult.TooManyColumnsInTable(t) =>
val enableAfterDelay = Async[F].sleep(retries.tooManyColumns.delay) *> addingColumnsEnabled.set(true)
for {
_ <- Logger[F].error(t)(s"Could not alter table schema because of too many columns")
_ <- monitoring.alert(Alert.FailedToAddColumns(columns.map(_.name), t))
_ <- addingColumnsEnabled.set(false)
_ <- enableAfterDelay.start
} yield ()
case _ =>
Async[F].unit
}
case false =>
Async[F].unit
}

def createTable: F[Unit] =
Expand All @@ -77,10 +106,10 @@ object TableManager {
.blocking(client.create(tableInfo))
.void
.recoverWith {
case bqe @ BigQueryUtils.BQExceptionWithLowerCaseReason("duplicate") =>
case bqe: BigQueryException if bqe.lowerCaseReason === "duplicate" =>
// Table already exists
Logger[F].info(s"Ignoring error when creating table: ${bqe.getMessage}")
case BigQueryUtils.BQExceptionWithLowerCaseReason("accessdenied") =>
case bqe: BigQueryException if bqe.lowerCaseReason === "accessdenied" =>
Logger[F].info(s"Access denied when trying to create table. Will ignore error and assume table already exists.")
}
}
Expand All @@ -90,29 +119,35 @@ object TableManager {
config: Config.BigQuery,
client: BigQuery,
columns: Vector[Field]
): F[Unit] =
): F[AddColumnsResult] =
for {
table <- Sync[F].blocking(client.getTable(config.dataset, config.table))
schema <- Sync[F].pure(table.getDefinition[TableDefinition].getSchema)
fields <- Sync[F].pure(schema.getFields)
fields <- Sync[F].pure(BigQuerySchemaUtils.mergeInColumns(fields, columns))
schema <- Sync[F].pure(Schema.of(fields))
table <- Sync[F].pure(setTableSchema(table, schema))
_ <- Sync[F]
.blocking(table.update())
.void
.onError(logOnRaceCondition)
} yield ()
result <- Sync[F]
.blocking(table.update())
.as[AddColumnsResult](AddColumnsResult.Success)
.recover(handleTooManyColumns)
.onError(logOnRaceCondition)
} yield result

private def setTableSchema(table: Table, schema: Schema): Table =
table.toBuilder().setDefinition(StandardTableDefinition.of(schema)).build()

private def logOnRaceCondition[F[_]: Sync]: PartialFunction[Throwable, F[Unit]] = {
case BigQueryUtils.BQExceptionWithLowerCaseReason("invalid") =>
case bqe: BigQueryException if bqe.lowerCaseReason === "invalid" =>
Logger[F].warn(s"Caught known exception which probably means another loader has already altered the table.")
// Don't do anything else; the BigQueryRetrying will handle retries and logging the exception.
}

// Converts the specific "too many columns" rejection into a value instead of a raised error.
// The partial function is defined only for a BigQueryException whose reason is "invalid" and
// whose message starts with "too many columns"; every other throwable is left unhandled.
private def handleTooManyColumns: PartialFunction[Throwable, AddColumnsResult] = {
  case rejected: BigQueryException
      if rejected.lowerCaseReason === "invalid" &&
        rejected.lowerCaseMessage.startsWith("too many columns") =>
    AddColumnsResult.TooManyColumnsInTable(rejected)
}

// Renders the column names as a single comma-separated string, for use in log messages.
private def showColumns(columns: Vector[Field]): String = {
  val names = for (column <- columns) yield column.name
  names.mkString(", ")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ object MockEnvironment {
def retriesConfig = Config.Retries(
Config.SetupErrorRetries(30.seconds),
Config.TransientErrorRetries(1.second, 5),
Config.AlterTableWaitRetries(1.second)
Config.AlterTableWaitRetries(1.second),
Config.TooManyColumnsRetries(300.seconds)
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,8 @@ object WriterProviderSpec {
def retriesConfig = Config.Retries(
Config.SetupErrorRetries(30.seconds),
Config.TransientErrorRetries(1.second, 5),
Config.AlterTableWaitRetries(1.second)
Config.AlterTableWaitRetries(1.second),
Config.TooManyColumnsRetries(300.seconds)
)

def control: IO[Control] =
Expand Down

0 comments on commit 35c3aee

Please sign in to comment.