Bump common-streams to 0.6.0 #374

Merged · 4 commits · May 9, 2024

Changes from all commits
── BigQueryCaster ──

@@ -8,6 +8,7 @@
  */
 package com.snowplowanalytics.snowplow.bigquery.processing
 
+import cats.data.NonEmptyVector
 import io.circe.Json
 
 import java.time.{Instant, LocalDate}
@@ -32,11 +33,11 @@ private[processing] object BigQueryCaster extends Caster[AnyRef] {
     new java.math.BigDecimal(unscaled.bigInteger, details.scale)
   override def timestampValue(v: Instant): java.lang.Long = Long.box(v.toEpochMilli * 1000) // Microseconds
   override def dateValue(v: LocalDate): java.lang.Long = Long.box(v.toEpochDay)
-  override def arrayValue(vs: List[AnyRef]): JSONArray =
+  override def arrayValue(vs: Vector[AnyRef]): JSONArray =
     // BigQuery does not permit nulls in a repeated field
     new JSONArray(vs.filterNot(_ == null).asJava)
-  override def structValue(vs: List[Caster.NamedValue[AnyRef]]): JSONObject = {
-    val map = vs
+  override def structValue(vs: NonEmptyVector[Caster.NamedValue[AnyRef]]): JSONObject = {
+    val map = vs.iterator
       .map { case Caster.NamedValue(k, v) =>
         (k, v)
       }
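Two behaviours in this caster are easy to miss: TIMESTAMP values are encoded as microseconds since the epoch, and null elements are dropped because BigQuery rejects nulls inside a REPEATED field. A minimal standalone sketch of both conversions (illustrative names, not the loader's code):

```scala
import java.time.Instant
import org.json.JSONArray
import scala.jdk.CollectionConverters._

object CasterSketch {
  // BigQuery's Storage Write API represents TIMESTAMP as microseconds since the epoch.
  def timestampMicros(v: Instant): Long =
    v.toEpochMilli * 1000

  // BigQuery does not permit nulls in a repeated field, so drop them before building the array.
  def repeatedField(vs: Vector[AnyRef]): JSONArray =
    new JSONArray(vs.filterNot(_ == null).asJava)

  def main(args: Array[String]): Unit = {
    println(timestampMicros(Instant.parse("2024-05-09T00:00:00Z"))) // 1715212800000000
    println(repeatedField(Vector("a", null, "b")))                  // ["a","b"]
  }
}
```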
── BigQuerySchemaUtils ──

@@ -8,6 +8,9 @@
  */
 package com.snowplowanalytics.snowplow.bigquery.processing
 
+import cats.Eq
+import cats.implicits._
+
 import com.snowplowanalytics.iglu.schemaddl.parquet.{Field, Type}
 import com.google.protobuf.Descriptors
 import com.google.cloud.bigquery.{Field => BQField, FieldList, StandardSQLTypeName}
@@ -16,7 +19,31 @@ import scala.jdk.CollectionConverters._
 
 object BigQuerySchemaUtils {
 
-  def alterTableRequired(tableDescriptor: Descriptors.Descriptor, ddlFields: Seq[Field]): Seq[Field] =
+  def fieldsMissingFromDescriptor(tableDescriptor: Descriptors.Descriptor, bqFields: FieldList): Boolean =
+    bqFields.asScala.exists { field =>
+      Option(tableDescriptor.findFieldByName(field.getName)) match {
+        case Some(fieldDescriptor) =>
+          val nullableMismatch = fieldDescriptor.isRequired && (field.getMode === BQField.Mode.NULLABLE)
+          nullableMismatch || nestedFieldMissingFromDescriptor(fieldDescriptor, field)
+        case None =>
+          true
+      }
+    }
+
+  private def nestedFieldMissingFromDescriptor(tableField: Descriptors.FieldDescriptor, bqField: BQField): Boolean =
+    tableField.getType match {
+      case Descriptors.FieldDescriptor.Type.MESSAGE =>
+        Option(bqField.getSubFields) match {
+          case Some(nestedFields) =>
+            fieldsMissingFromDescriptor(tableField.getMessageType, nestedFields)
+          case _ =>
+            false
+        }
+      case _ =>
+        false
+    }
+
+  def alterTableRequired(tableDescriptor: Descriptors.Descriptor, ddlFields: Vector[Field]): Vector[Field] =
     ddlFields.filter { field =>
       Option(tableDescriptor.findFieldByName(field.name)) match {
         case Some(fieldDescriptor) =>
@@ -32,15 +59,17 @@ object BigQuerySchemaUtils {
       case Descriptors.FieldDescriptor.Type.MESSAGE =>
         ddlField.fieldType match {
           case Type.Struct(nestedFields) =>
-            alterTableRequired(tableField.getMessageType, nestedFields).nonEmpty
+            alterTableRequired(tableField.getMessageType, nestedFields.toVector).nonEmpty
+          case Type.Array(Type.Struct(nestedFields), _) =>
+            alterTableRequired(tableField.getMessageType, nestedFields.toVector).nonEmpty
           case _ =>
             false
         }
       case _ =>
         false
     }
 
-  def mergeInColumns(bqFields: FieldList, ddlFields: Seq[Field]): FieldList = {
+  def mergeInColumns(bqFields: FieldList, ddlFields: Vector[Field]): FieldList = {
     val ddlFieldsByName = ddlFields.map(f => f.name -> f).toMap
     val bqFieldNames = bqFields.asScala.map(f => f.getName).toSet
     val alteredExisting = bqFields.asScala.map { bqField =>
@@ -65,7 +94,7 @@
             Option(bqField.getSubFields) match {
               case Some(bqNestedFields) =>
                 bqField.toBuilder
-                  .setType(StandardSQLTypeName.STRUCT, mergeInColumns(bqNestedFields, ddlNestedFields))
+                  .setType(StandardSQLTypeName.STRUCT, mergeInColumns(bqNestedFields, ddlNestedFields.toVector))
                   .build
               case None =>
                 bqField
@@ -86,7 +115,7 @@
           .setMode(BQField.Mode.REPEATED)
           .build
       case Type.Struct(nestedFields) =>
-        val nested = FieldList.of(nestedFields.map(bqFieldOf).asJava)
+        val nested = FieldList.of(nestedFields.map(bqFieldOf).toVector.asJava)
         BQField
          .newBuilder(ddlField.name, StandardSQLTypeName.STRUCT, nested)
          .setMode(bqModeOf(ddlField.nullability))
@@ -113,6 +142,8 @@
   private def bqModeOf(nullability: Type.Nullability): BQField.Mode =
     if (nullability.nullable) BQField.Mode.NULLABLE else BQField.Mode.REQUIRED
 
+  private implicit val modeEq: Eq[BQField.Mode] = Eq.fromUniversalEquals
+
   def showDescriptor(descriptor: Descriptors.Descriptor): String =
     descriptor.getFields.asScala
       .map(_.getName)
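The `modeEq` instance added at the bottom of this file is what lets `field.getMode === BQField.Mode.NULLABLE` compile: cats ships no `Eq` for Java enums, so one is derived from universal equality. A self-contained sketch of the pattern, using `java.time.DayOfWeek` as a stand-in for `BQField.Mode`:

```scala
import cats.Eq
import cats.implicits._

import java.time.DayOfWeek // stand-in for com.google.cloud.bigquery.Field.Mode

object EqSketch {
  // Derive an Eq instance from universal equality so the type-safe `===` works on a Java enum.
  implicit val dayEq: Eq[DayOfWeek] = Eq.fromUniversalEquals

  def main(args: Array[String]): Unit = {
    println(DayOfWeek.MONDAY === DayOfWeek.MONDAY) // true
    println(DayOfWeek.MONDAY === DayOfWeek.FRIDAY) // false
    // Unlike `==`, `DayOfWeek.MONDAY === "MONDAY"` would not compile.
  }
}
```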
── Processing ──

@@ -17,6 +17,7 @@ import org.typelevel.log4cats.Logger
 import org.typelevel.log4cats.slf4j.Slf4jLogger
 import retry.{PolicyDecision, RetryDetails, RetryPolicy}
 import retry.implicits._
+import com.google.cloud.bigquery.FieldList
 
 import java.nio.charset.StandardCharsets
 import scala.concurrent.duration.Duration
@@ -205,7 +206,7 @@
     def handlingServerSideSchemaMismatches(env: Environment[F]): F[Writer.WriteResult] = {
       def onFailure(wr: Writer.WriteResult, details: RetryDetails): F[Unit] = {
         val extractedDetail = BigQueryRetrying.extractRetryDetails(details)
-        val msg = s"Newly added columns have not yet propagated to the BigQuery Writer. $extractedDetail"
+        val msg = s"Newly added columns have not yet propagated to the BigQuery Writer server-side. $extractedDetail"
         val log = wr match {
           case Writer.WriteResult.ServerSideSchemaMismatch(e) if details.retriesSoFar > errorsAllowedWithShortLogging =>
             Logger[F].warn(e)(msg)
@@ -287,7 +288,7 @@
    * Alters the table to add any columns that were present in the Events but not currently in the
    * table
    */
-  private def handleSchemaEvolution[F[_]: Sync](
+  private def handleSchemaEvolution[F[_]: Async](
     env: Environment[F]
   ): Pipe[F, BatchAfterTransform, BatchAfterTransform] =
     _.evalTap { batch =>
@@ -299,7 +300,9 @@
         }
         val fieldsToAdd = BigQuerySchemaUtils.alterTableRequired(descriptor, fields)
         if (fieldsToAdd.nonEmpty) {
-          env.tableManager.addColumns(fieldsToAdd.toVector) *> env.writer.closed.use_
+          env.tableManager.addColumns(fieldsToAdd.toVector).flatMap { fieldsToExist =>
+            openWriterUntilFieldsExist(env, fieldsToExist)
+          }
         } else {
           Sync[F].unit
         }
@@ -309,6 +312,22 @@
       }
     }
 
+  private def openWriterUntilFieldsExist[F[_]: Async](env: Environment[F], fieldsToExist: FieldList): F[Unit] =
+    env.writer.opened
+      .use(_.descriptor)
+      .retryingOnFailures(
+        policy = env.alterTableWaitPolicy,
+        wasSuccessful = { descriptor =>
+          (!BigQuerySchemaUtils.fieldsMissingFromDescriptor(descriptor, fieldsToExist)).pure[F]
+        },
+        onFailure = { case (_, details) =>
+          val extractedDetail = BigQueryRetrying.extractRetryDetails(details)
+          val msg = s"Newly added columns have not yet propagated to the BigQuery Writer client-side. $extractedDetail"
+          Logger[F].warn(msg) *> env.writer.closed.use_
+        }
+      )
+      .void
+
   private def sendFailedEvents[F[_]: Sync](
     env: Environment[F],
     badRowProcessor: BadRowProcessor
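`openWriterUntilFieldsExist` is the client-side half of schema evolution: it repeatedly opens the writer, reads its descriptor, and retries (closing the writer in `onFailure` so the next attempt fetches a fresh descriptor) until the new fields are visible. A runnable sketch of that cats-retry shape, with an incrementing counter standing in for the descriptor and hypothetical names throughout:

```scala
import cats.effect.{IO, IOApp, Ref}
import retry.{RetryDetails, RetryPolicies}
import retry.implicits._

import scala.concurrent.duration._

object PollUntilReady extends IOApp.Simple {
  def run: IO[Unit] =
    for {
      counter <- Ref[IO].of(0)
      // Stand-in for `env.writer.opened.use(_.descriptor)`: each attempt
      // yields the latest "descriptor", modelled here as an incrementing Int.
      result <- counter
                  .updateAndGet(_ + 1)
                  .retryingOnFailures(
                    policy = RetryPolicies.constantDelay[IO](100.millis),
                    wasSuccessful = (n: Int) => IO.pure(n >= 3), // the fieldsMissingFromDescriptor check goes here
                    onFailure = (n: Int, details: RetryDetails) =>
                      IO.println(s"Fields not visible yet (attempt $n, retries so far: ${details.retriesSoFar})")
                  )
      _ <- IO.println(s"Descriptor ready after $result attempts")
    } yield ()
}
```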
── TableManager ──

@@ -37,7 +37,15 @@ import scala.jdk.CollectionConverters._
 
 trait TableManager[F[_]] {
 
-  def addColumns(columns: Vector[Field]): F[Unit]
+  /**
+   * Attempt to add columns to the table
+   *
+   * @return
+   *   A list of fields which are guaranteed to eventually exist in the table. Fields might not be
+   *   available immediately due to the asynchronous nature of BigQuery. The returned list will be
+   *   empty if adding columns failed because the table already has too many columns.
+   */
+  def addColumns(columns: Vector[Field]): F[FieldList]
 
   def createTable: F[Unit]
 
@@ -66,7 +74,7 @@
       for {
         addingColumnsEnabled <- Ref[F].of[Boolean](true)
       } yield new WithHandledErrors[F] {
-        def addColumns(columns: Vector[Field]): F[Unit] =
+        def addColumns(columns: Vector[Field]): F[FieldList] =
           addingColumnsEnabled.get.flatMap {
             case true =>
               BigQueryRetrying
@@ -78,7 +86,7 @@
                   .onError(logOnRaceCondition)
               }
             case false =>
-              Async[F].unit
+              FieldList.of().pure[F]
           }
 
         def createTable: F[Unit] =
@@ -99,7 +107,7 @@
     client: BigQuery
   ): TableManager[F] = new TableManager[F] {
 
-    def addColumns(columns: Vector[Field]): F[Unit] =
+    def addColumns(columns: Vector[Field]): F[FieldList] =
       for {
         table <- Sync[F].blocking(client.getTable(config.dataset, config.table))
        schema <- Sync[F].pure(table.getDefinition[TableDefinition].getSchema)
@@ -108,7 +116,7 @@
         schema <- Sync[F].pure(Schema.of(fields))
         table <- Sync[F].pure(setTableSchema(table, schema))
         _ <- Sync[F].blocking(table.update())
-      } yield ()
+      } yield fields
 
     def createTable: F[Unit] = {
       val tableInfo = atomicTableInfo(config)
@@ -133,15 +141,17 @@
     monitoring: Monitoring[F],
     addingColumnsEnabled: Ref[F, Boolean],
     columns: Vector[Field]
-  ): PartialFunction[Throwable, F[Unit]] = {
-    case bqe: BigQueryException if bqe.lowerCaseReason === "invalid" && bqe.lowerCaseMessage.startsWith("too many columns") =>
+  ): PartialFunction[Throwable, F[FieldList]] = {
+    case bqe: BigQueryException
+        if bqe.lowerCaseReason === "invalid" && (bqe.lowerCaseMessage
+          .startsWith("too many columns") || bqe.lowerCaseMessage.startsWith("too many total leaf fields")) =>
       val enableAfterDelay = Async[F].sleep(retries.tooManyColumns.delay) *> addingColumnsEnabled.set(true)
       for {
         _ <- Logger[F].error(bqe)(s"Could not alter table schema because of too many columns")
         _ <- monitoring.alert(Alert.FailedToAddColumns(columns.map(_.name), bqe))
         _ <- addingColumnsEnabled.set(false)
         _ <- enableAfterDelay.start
-      } yield ()
+      } yield FieldList.of()
   }
 
   private def showColumns(columns: Vector[Field]): String =
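With this change `addColumns` no longer returns `F[Unit]`: callers receive the `FieldList` that should eventually exist, and an empty list (returned while column adding is temporarily disabled after a too-many-columns error) means there is nothing to wait for. A short sketch, with hypothetical names, of how a caller can consume that contract:

```scala
import cats.effect.IO
import com.google.cloud.bigquery.FieldList
import com.snowplowanalytics.iglu.schemaddl.parquet.Field

object SchemaEvolutionSketch {
  // Narrowed-down view of the new TableManager contract.
  trait AddsColumns {
    def addColumns(columns: Vector[Field]): IO[FieldList]
  }

  // An empty FieldList means nothing was altered (adding disabled or rejected),
  // so there are no fields to wait for before resuming writes.
  def evolveSchema(tm: AddsColumns, toAdd: Vector[Field])(
    waitForFields: FieldList => IO[Unit]
  ): IO[Unit] =
    tm.addColumns(toAdd).flatMap { fieldsToExist =>
      if (fieldsToExist.isEmpty) IO.unit
      else waitForFields(fieldsToExist) // e.g. poll the writer descriptor, as in Processing
    }
}
```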
── test_vendor/test_schema 1-0-0 (new file) ──

@@ -0,0 +1,14 @@
+{
+  "$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
+  "self": {
+    "vendor": "test_vendor",
+    "name": "test_schema",
+    "format": "jsonschema",
+    "version": "1-0-0"
+  },
+  "properties": {
+    "myString": {"type": "string"}
+  },
+  "additionalProperties": false,
+  "type": "object"
+}
── test_vendor/test_schema (new file, adds myInteger) ──

@@ -0,0 +1,15 @@
+{
+  "$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
+  "self": {
+    "vendor": "test_vendor",
+    "name": "test_schema",
+    "format": "jsonschema",
+    "version": "1-0-0"
+  },
+  "properties": {
+    "myString": {"type": "string"},
+    "myInteger": {"type": "integer"}
+  },
+  "additionalProperties": false,
+  "type": "object"
+}