
Skip schemas for unstructured transformation
pondzix committed Nov 28, 2023
1 parent b326dd4 commit 972e1bb
Showing 3 changed files with 256 additions and 11 deletions.
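The change adds an entitiesToSkip: List[SchemaCriterion] parameter to Transform.transformEventUnstructured; any unstruct event or context entity whose schema matches one of the criteria is dropped before the entities are shredded into columns. A minimal caller-side sketch follows: the configuration strings and the skipCriteria name are illustrative only, not taken from this commit, while criteria are parsed with SchemaCriterion.parse exactly as the new spec does.

    import com.snowplowanalytics.iglu.core.SchemaCriterion

    // Illustrative only: parse skip-list entries (e.g. from loader config) into criteria.
    // SchemaCriterion.parse returns an Option, so invalid entries are simply dropped here.
    val skipCriteria: List[SchemaCriterion] =
      List(
        "iglu:com.example/mySchema/jsonschema/1-*-*",
        "iglu:com.example/legacy/jsonschema/*-*-*"
      ).flatMap(s => SchemaCriterion.parse(s).toList)

    // skipCriteria is then passed as the new final argument:
    // Transform.transformEventUnstructured(processor, caster, jsonCaster, event, skipCriteria)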
@@ -7,11 +7,13 @@
  */
 package com.snowplowanalytics.snowplow.loaders.transform
 
+import cats.FunctorFilter
 import cats.data.{NonEmptyList, Validated, ValidatedNel}
 import cats.implicits._
-import com.snowplowanalytics.iglu.core.{SchemaKey, SelfDescribingData}
+import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SelfDescribingData}
 import com.snowplowanalytics.iglu.schemaddl.parquet.{CastError, Caster, Field, Type}
 import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
+import com.snowplowanalytics.snowplow.analytics.scalasdk.SnowplowEvent.{Contexts, UnstructEvent}
 import com.snowplowanalytics.snowplow.badrows.{
   BadRow,
   Failure => BadRowFailure,
@@ -76,19 +78,42 @@ object Transform {
     processor: BadRowProcessor,
     caster: Caster[A],
     jsonCaster: Json.Folder[A],
-    event: Event
+    event: Event,
+    entitiesToSkip: List[SchemaCriterion]
   ): Either[BadRow, List[Caster.NamedValue[A]]] =
     forAtomic(caster, event).toEither
       .map { atomic =>
-        val entities = event.contexts.toShreddedJson ++ event.derived_contexts.toShreddedJson ++ event.unstruct_event.toShreddedJson.toMap
-        atomic ::: entities.toList.map { case (key, json) =>
+        atomic ::: extractEntities(event, entitiesToSkip).iterator.map { case (key, json) =>
           Caster.NamedValue(key, json.foldWith(jsonCaster))
-        }
+        }.toList
       }
       .leftMap { nel =>
         BadRow.LoaderIgluError(processor, BadRowFailure.LoaderIgluErrors(nel), BadPayload.LoaderPayload(event))
       }
 
+  private def extractEntities(event: Event, entitiesToSkip: List[SchemaCriterion]): Map[String, Json] =
+    extractFromContexts(event.contexts, entitiesToSkip) ++
+      extractFromContexts(event.derived_contexts, entitiesToSkip) ++
+      extractFromUnstruct(event.unstruct_event, entitiesToSkip)
+
+  private def extractFromContexts(contexts: Contexts, entitiesToSkip: List[SchemaCriterion]): Map[String, Json] =
+    Contexts(filterOutEntities(contexts.data, entitiesToSkip)).toShreddedJson
+
+  private def extractFromUnstruct(unstruct: UnstructEvent, entitiesToSkip: List[SchemaCriterion]): Map[String, Json] =
+    UnstructEvent(filterOutEntities(unstruct.data, entitiesToSkip)).toShreddedJson.toMap
+
+  private def filterOutEntities[Entities[_]: FunctorFilter](
+    entities: Entities[SelfDescribingData[Json]],
+    entitiesToSkip: List[SchemaCriterion]
+  ): Entities[SelfDescribingData[Json]] =
+    if (entitiesToSkip.nonEmpty) {
+      entities.filterNot { entity =>
+        entitiesToSkip.exists(_.matches(entity.schema))
+      }
+    } else {
+      entities
+    }
+
   private def failForResolverErrors(
     processor: BadRowProcessor,
     event: Event,
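The new filterOutEntities helper is generic in its container via cats' FunctorFilter, which is what lets one implementation serve both Contexts.data (a List of entities) and UnstructEvent.data (an Option). A small standalone sketch of the same idea; the dropMatching name and the sample values are hypothetical, while the filterNot call mirrors the one in the diff above.

    import cats.FunctorFilter
    import cats.implicits._
    import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SelfDescribingData}
    import io.circe.Json

    // Hypothetical standalone version of the filtering step: one generic function,
    // reused for both List (contexts) and Option (unstruct event).
    def dropMatching[F[_]: FunctorFilter](
      entities: F[SelfDescribingData[Json]],
      criteria: List[SchemaCriterion]
    ): F[SelfDescribingData[Json]] =
      entities.filterNot(e => criteria.exists(_.matches(e.schema)))

    val criteria = List(SchemaCriterion.parse("iglu:com.example/mySchema/jsonschema/1-*-*").get)
    val contexts = List(SelfDescribingData(SchemaKey.fromUri("iglu:com.example/mySchema/jsonschema/1-0-0").toOption.get, Json.obj()))
    val unstruct = Option(SelfDescribingData(SchemaKey.fromUri("iglu:com.example/other/jsonschema/1-0-0").toOption.get, Json.obj()))

    dropMatching(contexts, criteria) // List() - the 1-0-0 entity matches 1-*-* and is dropped
    dropMatching(unstruct, criteria) // Some(...) - a different schema, kept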
@@ -0,0 +1,219 @@
/*
* Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Snowplow Community License Version 1.0,
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
*/
package com.snowplowanalytics.snowplow.loaders.transform

import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SelfDescribingData}
import com.snowplowanalytics.iglu.schemaddl.parquet.Caster.NamedValue
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.analytics.scalasdk.SnowplowEvent.{Contexts, UnstructEvent}
import com.snowplowanalytics.snowplow.badrows.{Processor => BadRowProcessor}
import io.circe.Json
import io.circe.literal._
import org.specs2.matcher.MatchResult
import org.specs2.Specification

import java.time.Instant
import java.util.UUID

class SkippingSchemasSpec extends Specification {

def is = s2"""
Skip none if there is nothing to skip $e1
Skip none if no schema matches the criteria $e2
Skip all for *-*-* criteria $e3
Skip all matching partial A-*-* criteria $e4
Skip all matching partial A-B-* criteria $e5
Skip all matching full A-B-C criteria $e6
Skip all for mixed criteria: A-*-* + A-B-* + A-B-C $e7
"""

def e1 = {
val schemasToSkip = List.empty

val input = inputEvent(
withUnstruct = Some(sdj(key = "iglu:com.example/mySchema/jsonschema/1-0-0", data = json"""{"field": "ue1"}""")),
withContexts = List(sdj(key = "iglu:com.example/mySchema/jsonschema/2-0-0", data = json"""{"field": "c1"}""")),
withDerivedContexts = List(sdj(key = "iglu:com.example/mySchema/jsonschema/3-0-0", data = json"""{"field": "dc1"}"""))
)

assert(input, schemasToSkip)(
shouldNotExist = List.empty,
shouldExist = List(
NamedValue(name = "unstruct_event_com_example_my_schema_1", value = json"""{"field": "ue1"}"""),
NamedValue(name = "contexts_com_example_my_schema_2", value = json"""[{"field": "c1"}]"""),
NamedValue(name = "contexts_com_example_my_schema_3", value = json"""[{"field": "dc1"}]""")
)
)
}

def e2 = {
val schemasToSkip = List("iglu:com.example/mySchema/jsonschema/1-2-3")

val input = inputEvent(
withUnstruct = Some(sdj(key = "iglu:com.example/mySchema/jsonschema/1-0-0", data = json"""{"field": "ue1"}""")),
withContexts = List(sdj(key = "iglu:com.example/mySchema/jsonschema/2-0-0", data = json"""{"field": "c1"}""")),
withDerivedContexts = List(sdj(key = "iglu:com.example/mySchema/jsonschema/3-0-0", data = json"""{"field": "dc1"}"""))
)

assert(input, schemasToSkip)(
shouldNotExist = List.empty,
shouldExist = List(
NamedValue(name = "unstruct_event_com_example_my_schema_1", value = json"""{"field": "ue1"}"""),
NamedValue(name = "contexts_com_example_my_schema_2", value = json"""[{"field": "c1"}]"""),
NamedValue(name = "contexts_com_example_my_schema_3", value = json"""[{"field": "dc1"}]""")
)
)
}

def e3 = {
val schemasToSkip = List("iglu:com.example/mySchema/jsonschema/*-*-*")

val input = inputEvent(
withUnstruct = Some(sdj(key = "iglu:com.example/mySchema/jsonschema/1-0-0", data = json"""{"field": "ue1"}""")),
withContexts = List(sdj(key = "iglu:com.example/mySchema/jsonschema/2-0-0", data = json"""{"field": "c1"}""")),
withDerivedContexts = List(sdj(key = "iglu:com.example/mySchema/jsonschema/3-0-0", data = json"""{"field": "dc1"}"""))
)

assert(input, schemasToSkip)(
shouldNotExist = List(
"unstruct_event_com_example_my_schema_1",
"contexts_com_example_my_schema_2",
"contexts_com_example_my_schema_3"
),
shouldExist = List.empty
)
}

def e4 = {
val schemasToSkip = List("iglu:com.example/mySchema/jsonschema/1-*-*")

val input = inputEvent(
withUnstruct = Some(sdj(key = "iglu:com.example/mySchema/jsonschema/1-0-0", data = json"""{"field": "ue1"}""")),
withContexts = List(sdj(key = "iglu:com.example/mySchema/jsonschema/2-0-0", data = json"""{"field": "c1"}""")),
withDerivedContexts = List(sdj(key = "iglu:com.example/mySchema/jsonschema/3-0-0", data = json"""{"field": "dc1"}"""))
)

assert(input, schemasToSkip)(
shouldNotExist = List(
"unstruct_event_com_example_my_schema_1"
),
shouldExist = List(
NamedValue(name = "contexts_com_example_my_schema_2", value = json"""[{"field": "c1"}]"""),
NamedValue(name = "contexts_com_example_my_schema_3", value = json"""[{"field": "dc1"}]""")
)
)
}

def e5 = {
val schemasToSkip = List("iglu:com.example/mySchema/jsonschema/1-0-*", "iglu:com.example/mySchema/jsonschema/2-0-*")

val input = inputEvent(
withUnstruct = Some(sdj(key = "iglu:com.example/mySchema/jsonschema/1-0-0", data = json"""{"field": "ue1"}""")),
withContexts = List(
sdj(key = "iglu:com.example/mySchema/jsonschema/2-0-0", data = json"""{"field": "c1"}"""),
sdj(key = "iglu:com.example/mySchema/jsonschema/2-1-0", data = json"""{"field": "c2"}""")
),
withDerivedContexts = List(sdj(key = "iglu:com.example/mySchema/jsonschema/3-0-0", data = json"""{"field": "dc1"}"""))
)

assert(input, schemasToSkip)(
shouldNotExist = List(
"unstruct_event_com_example_my_schema_1"
),
shouldExist = List(
// The '_2' column is still present because 2-0-0 is skipped, but 2-1-0 (with the c2 value) is not
NamedValue(name = "contexts_com_example_my_schema_2", value = json"""[{"field": "c2"}]"""),
NamedValue(name = "contexts_com_example_my_schema_3", value = json"""[{"field": "dc1"}]""")
)
)
}

def e6 = {
val schemasToSkip = List("iglu:com.example/mySchema/jsonschema/3-0-0")

val input = inputEvent(
withUnstruct = Some(sdj(key = "iglu:com.example/mySchema/jsonschema/1-0-0", data = json"""{"field": "ue1"}""")),
withContexts = List(
sdj(key = "iglu:com.example/mySchema/jsonschema/2-0-0", data = json"""{"field": "c1"}"""),
sdj(key = "iglu:com.example/mySchema/jsonschema/2-1-0", data = json"""{"field": "c2"}""")
),
withDerivedContexts = List(sdj(key = "iglu:com.example/mySchema/jsonschema/3-0-0", data = json"""{"field": "dc1"}"""))
)

assert(input, schemasToSkip)(
shouldNotExist = List(
"contexts_com_example_my_schema_3"
),
shouldExist = List(
NamedValue(name = "unstruct_event_com_example_my_schema_1", value = json"""{"field": "ue1"}"""),
NamedValue(name = "contexts_com_example_my_schema_2", value = json"""[{"field": "c1"}, {"field": "c2"}]""")
)
)
}

def e7 = {
val input = inputEvent(
withUnstruct = Some(sdj(key = "iglu:com.example/mySchema/jsonschema/1-0-0", data = json"""{"field": "ue1"}""")),
withContexts = List(sdj(key = "iglu:com.example/mySchema/jsonschema/2-0-0", data = json"""{"field": "c1"}""")),
withDerivedContexts = List(sdj(key = "iglu:com.example/mySchema/jsonschema/3-0-0", data = json"""{"field": "dc1"}"""))
)

val schemasToSkip = List(
"iglu:com.example/mySchema/jsonschema/1-*-*",
"iglu:com.example/mySchema/jsonschema/2-0-*",
"iglu:com.example/mySchema/jsonschema/3-0-0"
)

assert(input, schemasToSkip)(
shouldNotExist = List(
"unstruct_event_com_example_my_schema_1",
"contexts_com_example_my_schema_2",
"contexts_com_example_my_schema_3"
),
shouldExist = List.empty
)
}

private def inputEvent(
withUnstruct: Option[SelfDescribingData[Json]],
withContexts: List[SelfDescribingData[Json]],
withDerivedContexts: List[SelfDescribingData[Json]]
): Event =
Event
.minimal(UUID.randomUUID, Instant.now, "0.0.0", "0.0.0")
.copy(unstruct_event = UnstructEvent(withUnstruct))
.copy(contexts = Contexts(withContexts))
.copy(derived_contexts = Contexts(withDerivedContexts))

private def sdj(key: String, data: Json): SelfDescribingData[Json] =
SelfDescribingData[Json](SchemaKey.fromUri(key).toOption.get, data)

private def assert(
input: Event,
schemasToSkip: List[String]
)(
shouldNotExist: List[String],
shouldExist: List[NamedValue[Json]]
): MatchResult[Any] = {
val badProcessor = BadRowProcessor("snowflake-loader", "0.0.0")

val output = Transform
.transformEventUnstructured(
badProcessor,
TestCaster,
TestCirceFolder,
input,
schemasToSkip.map(schemas => SchemaCriterion.parse(schemas).get)
)
.toOption
.get

output.map(_.name) must not(containAnyOf(shouldNotExist)) and
(output must containAllOf(shouldExist))
}
}
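The e5 case above hinges on SchemaCriterion's wildcard matching: a 2-0-* criterion matches 2-0-0 but not 2-1-0, which is why contexts_com_example_my_schema_2 survives with only the c2 entity, while *-*-* (e3) matches every version. A quick self-contained illustration; the values mirror the spec but are re-created here.

    import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey}

    val v200 = SchemaKey.fromUri("iglu:com.example/mySchema/jsonschema/2-0-0").toOption.get
    val v210 = SchemaKey.fromUri("iglu:com.example/mySchema/jsonschema/2-1-0").toOption.get

    val partial  = SchemaCriterion.parse("iglu:com.example/mySchema/jsonschema/2-0-*").get
    val wildcard = SchemaCriterion.parse("iglu:com.example/mySchema/jsonschema/*-*-*").get

    partial.matches(v200)  // true  -> the 2-0-0 context is skipped
    partial.matches(v210)  // false -> the 2-1-0 context is kept, so the _2 column still appears
    wildcard.matches(v200) // true  -> *-*-* skips every version, as in e3
    wildcard.matches(v210) // true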
@@ -35,7 +35,7 @@ class TransformSpec extends Specification {
   def e1 = {
     val event = Event.minimal(testEventId, testTimestamp, "0.0.0", "0.0.0")
 
-    val result = Transform.transformEventUnstructured(badProcessor, TestCaster, TestCirceFolder, event)
+    val result = Transform.transformEventUnstructured(badProcessor, TestCaster, TestCirceFolder, event, schemasToSkip)
 
     val expected = List(
       NamedValue("event_id", Json.fromString(testEventId.toString)),
@@ -64,7 +64,7 @@ class TransformSpec extends Specification {
         tr_total = Some(12.34)
       )
 
-    val result = Transform.transformEventUnstructured(badProcessor, TestCaster, TestCirceFolder, event)
+    val result = Transform.transformEventUnstructured(badProcessor, TestCaster, TestCirceFolder, event, schemasToSkip)
 
     val expected = List(
       NamedValue("app_id", Json.fromString("myapp")),
@@ -91,7 +91,7 @@ class TransformSpec extends Specification {
         tr_total = Some(12.3456) // Too many decimal points
       )
 
-    val result = Transform.transformEventUnstructured(badProcessor, TestCaster, TestCirceFolder, event)
+    val result = Transform.transformEventUnstructured(badProcessor, TestCaster, TestCirceFolder, event, schemasToSkip)
 
     result must beLeft
   }
@@ -114,7 +114,7 @@ class TransformSpec extends Specification {
       .minimal(testEventId, testTimestamp, "0.0.0", "0.0.0")
       .copy(unstruct_event = SnowplowEvent.UnstructEvent(Some(sdj)))
 
-    val result = Transform.transformEventUnstructured(badProcessor, TestCaster, TestCirceFolder, event)
+    val result = Transform.transformEventUnstructured(badProcessor, TestCaster, TestCirceFolder, event, schemasToSkip)
 
     val expected = NamedValue("unstruct_event_com_example_my_schema_7", data)
 
@@ -149,7 +149,7 @@ class TransformSpec extends Specification {
       .minimal(testEventId, testTimestamp, "0.0.0", "0.0.0")
       .copy(contexts = contexts)
 
-    val result = Transform.transformEventUnstructured(badProcessor, TestCaster, TestCirceFolder, event)
+    val result = Transform.transformEventUnstructured(badProcessor, TestCaster, TestCirceFolder, event, schemasToSkip)
 
     val expected = List(
       NamedValue("contexts_com_example_my_schema_7", Json.fromValues(List(data1))),
@@ -191,7 +191,7 @@ class TransformSpec extends Specification {
       .minimal(testEventId, testTimestamp, "0.0.0", "0.0.0")
       .copy(contexts = contexts)
 
-    val result = Transform.transformEventUnstructured(badProcessor, TestCaster, TestCirceFolder, event)
+    val result = Transform.transformEventUnstructured(badProcessor, TestCaster, TestCirceFolder, event, schemasToSkip)
 
     val expected = NamedValue("contexts_com_example_my_schema_7", Json.fromValues(List(data1, data2)))
 
@@ -206,6 +206,7 @@ object TransformSpec {
 
   val testEventId = UUID.randomUUID
   val testTimestamp = Instant.now
+  val schemasToSkip = List.empty
 
   val badProcessor = BadRowProcessor("snowflake-loader", "0.0.0")
 