Skip to content

Commit

Permalink
Bump iglu-scala-client to 3.2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Aug 21, 2024
1 parent d44d7e2 commit 7f576c4
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,53 +7,32 @@
*/
package com.snowplowanalytics.snowplow.loaders.transform

import cats.data.{EitherT, NonEmptyVector}
import cats.data.{EitherT, NonEmptyList}
import cats.effect.Sync
import cats.implicits._
import com.snowplowanalytics.iglu.client.{ClientError, Resolver}
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SchemaMap, SelfDescribingSchema}
import com.snowplowanalytics.iglu.client.resolver.Resolver.SchemaResolutionError
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaMap, SelfDescribingSchema}
import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema
import com.snowplowanalytics.iglu.schemaddl.jsonschema.circe.implicits.toSchema
import com.snowplowanalytics.snowplow.badrows.FailureDetails

import scala.math.Ordered._

private[transform] object SchemaProvider {

/**
 * Fetches a single schema through the Iglu resolver and parses it.
 *
 * A failed resolver lookup is converted with `resolverBadRow(schemaKey)`. When `Schema.parse`
 * yields no schema for the fetched JSON, the left value is `parseSchemaBadRow(schemaKey)`.
 *
 * @param resolver
 *   Iglu resolver used for the lookup
 * @param schemaKey
 *   Key of the schema to fetch
 * @return
 *   The parsed schema paired with its `SchemaMap`, or the loader-level Iglu failure
 */
private def getSchema[F[_]: Sync: RegistryLookup](
  resolver: Resolver[F],
  schemaKey: SchemaKey
): EitherT[F, FailureDetails.LoaderIgluError, SelfDescribingSchema[Schema]] =
  EitherT(resolver.lookupSchema(schemaKey))
    .leftMap(resolverBadRow(schemaKey))
    .flatMap { rawJson =>
      EitherT
        .fromOption[F](Schema.parse(rawJson), parseSchemaBadRow(schemaKey))
        .map(parsed => SelfDescribingSchema(SchemaMap(schemaKey), parsed))
    }

// Note schema order of the returned list is not guaranteed
// Note: the order of the returned list is guaranteed; schemas are kept in the order returned by `resolver.lookupSchemasUntil`
def fetchSchemasWithSameModel[F[_]: Sync: RegistryLookup](
resolver: Resolver[F],
schemaKey: SchemaKey
): EitherT[F, FailureDetails.LoaderIgluError, NonEmptyVector[SelfDescribingSchema[Schema]]] =
): EitherT[F, FailureDetails.LoaderIgluError, NonEmptyList[SelfDescribingSchema[Schema]]] =
for {
schemaKeys <- EitherT(resolver.listSchemasLike(schemaKey))
.leftMap(resolverFetchBadRow(schemaKey.vendor, schemaKey.name, schemaKey.format, schemaKey.version.model))
.map(_.schemas.toVector)
schemaKeys <- EitherT.rightT[F, FailureDetails.LoaderIgluError](schemaKeys.filter(_ < schemaKey))
topSchema <- getSchema(resolver, schemaKey)
lowerSchemas <- schemaKeys.filter(_ < schemaKey).traverse(getSchema(resolver, _))
} yield NonEmptyVector(topSchema, lowerSchemas)

private def resolverFetchBadRow(
vendor: String,
name: String,
format: String,
model: Int
)(
e: ClientError.ResolutionError
): FailureDetails.LoaderIgluError =
FailureDetails.LoaderIgluError.SchemaListNotFound(SchemaCriterion(vendor = vendor, name = name, format = format, model = model), e)
jsons <- EitherT(resolver.lookupSchemasUntil(schemaKey))
.leftMap { case SchemaResolutionError(schemaKey, error) => resolverBadRow(schemaKey)(error) }
schemas <- jsons.traverse { json =>
EitherT
.fromOption[F](Schema.parse(json.schema), parseSchemaBadRow(json.self.schemaKey))
.map(schema => SelfDescribingSchema(SchemaMap(json.self.schemaKey), schema))
}
} yield schemas

/** Builds the `IgluError` loader failure for `schemaKey` from the resolver error `e`. */
private def resolverBadRow(
  schemaKey: SchemaKey
)(
  e: ClientError.ResolutionError
): FailureDetails.LoaderIgluError = {
  val igluFailure: FailureDetails.LoaderIgluError = FailureDetails.LoaderIgluError.IgluError(schemaKey, e)
  igluFailure
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
*/
package com.snowplowanalytics.snowplow.loaders.transform

import cats.data.NonEmptyVector
import cats.data.{NonEmptyList, NonEmptyVector}
import cats.implicits._
import io.circe.syntax._
import com.snowplowanalytics.iglu.core.{SchemaKey, SelfDescribingSchema}
Expand Down Expand Up @@ -51,23 +51,19 @@ object TypedTabledEntity {
* @param subVersions
* Sub-versions (e.g. '*-0-0') that were present in the batch of events.
* @param schemas
* Iglu schemas pre-fetched from Iglu Server
* Iglu schemas pre-fetched from Iglu Server, ordered by schema key
*/
private[transform] def build(
tabledEntity: TabledEntity,
subVersions: Set[SchemaSubVersion],
schemas: NonEmptyVector[SelfDescribingSchema[Schema]]
schemas: NonEmptyList[SelfDescribingSchema[Schema]]
): Option[TypedTabledEntity] =
// Schemas need to be ordered by key to merge in correct order.
schemas.sorted.toVector
.flatMap { case sds =>
fieldFromSchema(tabledEntity, sds.schema).map((_, sds))
}
.toNev
.map { nev =>
val (rootField, rootSchema) = nev.head
schemas
.traverse(sds => fieldFromSchema(tabledEntity, sds.schema).map((_, sds)))
.map { nel =>
val (rootField, rootSchema) = nel.head
val tte = TypedTabledEntity(tabledEntity, rootField, Set(keyToSubVersion(rootSchema.self.schemaKey)), Nil)
nev.tail
nel.tail
.foldLeft(tte) { case (columnGroup, (field, selfDescribingSchema)) =>
val schemaKey = selfDescribingSchema.self.schemaKey
val subversion = keyToSubVersion(schemaKey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {

when handling Iglu failures should
return a IgluError if schema key does not exist in a valid series of schemas $fail1
return a SchemaListNotFound if the series of schemas does not exist $fail2
return an InvalidSchema if the series contains a schema that cannot be parsed $fail3
return an InvalidSchema if the series contains a schema that cannot be parsed $fail2
"""

def ue1 = {
Expand Down Expand Up @@ -534,26 +533,6 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {

def fail2 = {

val tabledEntity = TabledEntity(TabledEntity.UnstructEvent, "myvendor", "doesnotexist", 1)

val input = Map(
tabledEntity -> Set((0, 0))
)

NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(fields must beEmpty) and
(failures must haveSize(1)) and
(failures.head must beLike { case failure: NonAtomicFields.ColumnFailure =>
(failure.tabledEntity must beEqualTo(tabledEntity)) and
(failure.versionsInBatch must beEqualTo(Set((0, 0)))) and
(failure.failure must beLike { case _: FailureDetails.LoaderIgluError.SchemaListNotFound => ok })
})
}

}

def fail3 = {

val tabledEntity = TabledEntity(TabledEntity.UnstructEvent, "myvendor", "invalid_syntax", 1)

val input = Map(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
*/
package com.snowplowanalytics.snowplow.loaders.transform

import cats.data.NonEmptyVector
import cats.data.NonEmptyList
import cats.effect.IO
import io.circe.parser.{parse => parseToJson}
import scala.io.Source

import org.specs2.Specification
import cats.effect.testing.specs2.CatsEffect

import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SchemaMap, SelfDescribingSchema}
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaMap, SelfDescribingSchema}
import com.snowplowanalytics.iglu.client.Resolver
import com.snowplowanalytics.iglu.client.resolver.registries.JavaNetRegistryLookup._
import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema
Expand All @@ -32,18 +32,17 @@ class SchemaProviderSpec extends Specification with CatsEffect {
fetch all schemas from a series when queried with the highest schema key $e2
fetch subset of schemas from a series when queried with an intermediate schema key $e3
return a IgluError if schema key does not exist in a valid series of schemas $e4
return a SchemaListNotFound if the series of schemas does not exist $e5
return an InvalidSchema if the series contains a schema that cannot be parsed $e6
return an InvalidSchema if the series contains a schema that cannot be parsed $e5
"""

def e1 = {

val expected = NonEmptyVector.of(
val expected = NonEmptyList.of(
SelfDescribingSchema(SchemaMap(testSchemaKey700), testSchema700)
)

SchemaProvider.fetchSchemasWithSameModel(embeddedResolver, testSchemaKey700).value.map { result =>
result must beRight { schemas: NonEmptyVector[SelfDescribingSchema[Schema]] =>
result must beRight { schemas: NonEmptyList[SelfDescribingSchema[Schema]] =>
schemas must beEqualTo(expected)
}
}
Expand All @@ -52,30 +51,30 @@ class SchemaProviderSpec extends Specification with CatsEffect {

def e2 = {

val expected = Vector(
val expected = List(
SelfDescribingSchema(SchemaMap(testSchemaKey700), testSchema700),
SelfDescribingSchema(SchemaMap(testSchemaKey701), testSchema701),
SelfDescribingSchema(SchemaMap(testSchemaKey710), testSchema710)
)

SchemaProvider.fetchSchemasWithSameModel(embeddedResolver, testSchemaKey710).value.map { result =>
result must beRight { schemas: NonEmptyVector[SelfDescribingSchema[Schema]] =>
schemas.toVector must containTheSameElementsAs(expected)
result must beRight { schemas: NonEmptyList[SelfDescribingSchema[Schema]] =>
schemas.toList must containTheSameElementsAs(expected)
}
}

}

def e3 = {

val expected = Vector(
val expected = List(
SelfDescribingSchema(SchemaMap(testSchemaKey700), testSchema700),
SelfDescribingSchema(SchemaMap(testSchemaKey701), testSchema701)
)

SchemaProvider.fetchSchemasWithSameModel(embeddedResolver, testSchemaKey701).value.map { result =>
result must beRight { schemas: NonEmptyVector[SelfDescribingSchema[Schema]] =>
schemas.toVector must containTheSameElementsAs(expected)
result must beRight { schemas: NonEmptyList[SelfDescribingSchema[Schema]] =>
schemas.toList must containTheSameElementsAs(expected)
}
}

Expand All @@ -90,19 +89,6 @@ class SchemaProviderSpec extends Specification with CatsEffect {

def e5 = {

val testSchemaKey = SchemaKey.fromUri("iglu:myvendor/doesnotexist/jsonschema/7-0-0").toOption.get
val criterion = SchemaCriterion.parse("iglu:myvendor/doesnotexist/jsonschema/7-*-*").get

SchemaProvider.fetchSchemasWithSameModel(embeddedResolver, testSchemaKey).value.map { result =>
result must beLeft.like { case failure: FailureDetails.LoaderIgluError.SchemaListNotFound =>
failure.schemaCriterion must beEqualTo(criterion)
}
}

}

def e6 = {

val testSchemaKey = SchemaKey.fromUri("iglu:myvendor/invalid_syntax/jsonschema/1-0-0").toOption.get

SchemaProvider.fetchSchemasWithSameModel(embeddedResolver, testSchemaKey).value.map { result =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ object WebhookSpec {
}

def igluCirceClient: IgluCirceClient[Id] =
IgluCirceClient.fromResolver[Id](Resolver[Id](Nil, None), 0)
IgluCirceClient.fromResolver[Id](Resolver[Id](Nil, None), 0, 1000)

// Needed because we use Id effect in tests for iglu-scala-client
implicit val catsClockIdInstance: Clock[Id] = new Clock[Id] {
Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ object Dependencies {
// Snowplow
val schemaDdl = "0.23.0"
val badrows = "2.3.0"
val igluClient = "3.1.0"
val igluClient = "3.2.0"
val tracker = "2.0.0"
val analyticsSdk = "3.2.1"

Expand Down

0 comments on commit 7f576c4

Please sign in to comment.