diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala
index 482b031b3..b926d2f4e 100644
--- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala
+++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala
@@ -42,6 +42,7 @@ import enrichments.{EventEnrichments => EE}
 import enrichments.{MiscEnrichments => ME}
 import enrichments.registry._
 import enrichments.registry.apirequest.ApiRequestEnrichment
+import enrichments.registry.extractor.ExtractorEnrichment
 import enrichments.registry.pii.PiiPseudonymizerEnrichment
 import enrichments.registry.sqlquery.SqlQueryEnrichment
 import enrichments.web.{PageEnrichments => WPE}
@@ -92,8 +93,15 @@ object EnrichmentManager {
                enriched.pii = pii.asString
              }
            }
+      _ <- extractor[F](processor, raw, enriched, registry.extractor)
     } yield enriched
 
+  def extractor[F[_]: Monad](processor: Processor, rawEvent: RawEvent, enriched: EnrichedEvent, enrichment: Option[ExtractorEnrichment]): EitherT[F, BadRow, Unit] =
+    enrichment match {
+      case Some(extractor) => extractor.process(processor, rawEvent, enriched)
+      case None => EitherT.rightT[F, BadRow](())
+    }
+
   /**
    * Run all the enrichments and aggregate the errors if any
    * @param enriched /!\ MUTABLE enriched event, mutated IN-PLACE /!\
diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentRegistry.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentRegistry.scala
index 3e9a06bcd..eb9435569 100644
--- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentRegistry.scala
+++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentRegistry.scala
@@ -37,6 +37,7 @@ import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.Enrichm
 import com.snowplowanalytics.snowplow.enrich.common.utils.{BlockerF, CirceUtils}
 import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry._
 import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.apirequest.ApiRequestEnrichment
+import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.extractor.ExtractorEnrichment
 import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.pii.PiiPseudonymizerEnrichment
 import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.sqlquery.SqlQueryEnrichment
 
@@ -109,6 +110,8 @@ object EnrichmentRegistry {
   ): EitherT[F, String, EnrichmentRegistry[F]] =
     confs.foldLeft(EitherT.pure[F, String](EnrichmentRegistry[F]())) { (er, e) =>
       e match {
+        case c: ExtractorConf =>
+          er.map(_.copy(extractor = Some(c.enrichment)))
         case c: ApiRequestConf =>
           for {
             enrichment <- EitherT.right(c.enrichment[F](blocker))
@@ -250,5 +253,6 @@ final case class EnrichmentRegistry[F[_]](
   uaParser: Option[UaParserEnrichment] = None,
   userAgentUtils: Option[UserAgentUtilsEnrichment] = None,
   weather: Option[WeatherEnrichment[F]] = None,
-  yauaa: Option[YauaaEnrichment] = None
+  yauaa: Option[YauaaEnrichment] = None,
+  extractor: Option[ExtractorEnrichment] = None
 )
diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/EnrichmentConf.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/EnrichmentConf.scala
index 20906bd89..0433d8377 100644
--- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/EnrichmentConf.scala
+++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/EnrichmentConf.scala
@@ -33,6 +33,7 @@ import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.apirequ
   HttpApi
 }
 import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.sqlquery.{CreateSqlQueryEnrichment, Rdbms, SqlQueryEnrichment}
+import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.extractor.{Extractable, ExtractorEnrichment}
 import com.snowplowanalytics.snowplow.enrich.common.utils.BlockerF
 
 sealed trait EnrichmentConf {
@@ -209,4 +210,8 @@ object EnrichmentConf {
   ) extends EnrichmentConf {
     def enrichment: YauaaEnrichment = YauaaEnrichment(cacheSize)
   }
+
+  final case class ExtractorConf(schemaKey: SchemaKey, entities: Set[Extractable], erase: Boolean) extends EnrichmentConf {
+    def enrichment: ExtractorEnrichment = ExtractorEnrichment(entities, erase)
+  }
 }
diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/extractor/Extractable.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/extractor/Extractable.scala
new file mode 100644
index 000000000..a796369fd
--- /dev/null
+++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/extractor/Extractable.scala
@@ -0,0 +1,93 @@
+package com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.extractor
+
+import io.circe.{JsonObject, Json, Decoder}
+import cats.implicits._
+
+import com.snowplowanalytics.iglu.core.{SchemaVer, SelfDescribingData, SchemaKey}
+
+import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
+
+/**
+ * A named group of `EnrichedEvent` fields that can be copied into one self-describing entity
+ * and nulled out on the event afterwards. Fields are accessed reflectively by name.
+ */
+sealed trait Extractable extends Product with Serializable {
+  /** Schema attached to the entity built from the extracted fields */
+  def schemaKey: SchemaKey
+  /** `EnrichedEvent` field names paired with the typed property each one is copied into */
+  def keys: List[(String, TypedField)]
+
+  // Resolved once instead of per event; transient because `Method` is not serializable
+  @transient private lazy val getters =
+    keys.map { case (key, v) => Extractable.EventClass.getMethod(key) -> v }
+  @transient private lazy val erasers =
+    keys.map { case (key, f) => Extractable.EventClass.getMethod("set" ++ key.capitalize, f.manifest.runtimeClass) }
+
+  /**
+   * Read every field listed in `keys` from the event without mutating it
+   * @return object with the non-null fields only, or the first reflection or cast failure
+   */
+  def getJson(event: EnrichedEvent): Either[Throwable, JsonObject] =
+    for {
+      // Method lookup and invocation can both throw, so they are captured in the Either
+      accessors <- Either.catchNonFatal(getters)
+      kvs <- accessors.traverse {
+               case (getter, to) =>
+                 Either.catchNonFatal(getter.invoke(event)).flatMap(to.cast).map(value => to.name -> value)
+             }
+    } yield JsonObject.fromIterable(kvs.collect { case (k, Some(v)) => (k, v) })
+
+  /** Build the entity for this event, `None` when no listed field is set */
+  def process(event: EnrichedEvent): Either[Throwable, Option[SelfDescribingData[Json]]] =
+    getJson(event) match {
+      case Right(o) if o.isEmpty => Right(None)
+      case Right(o) => Right(Some(SelfDescribingData(schemaKey, Json.fromJsonObject(o))))
+      case Left(error) => Left(error)
+    }
+
+  /** Null out every field listed in `keys`. /!\ Mutates the event IN-PLACE /!\ */
+  def erase(event: EnrichedEvent): Unit =
+    erasers.foreach { eraser => eraser.invoke(event, null) }
+}
+
+object Extractable {
+
+  type Extractables = List[Extractable]
+
+  def All: Extractables = List(MaxMind)
+
+  implicit def extractableCirceDecoder: Decoder[Extractable] =
+    Decoder[String].map(_.toLowerCase).emap { e =>
+      All
+        .find(_.toString.toLowerCase == e)
+        .toRight(s"$e is an unknown entity to extract. Try: ${All.map(_.toString.toLowerCase).mkString(", ")} or all")
+    }
+
+  // The element decoder is passed explicitly: summoning Decoder[List[Extractable]] here
+  // would resolve to this very implicit and never terminate
+  implicit def extractablesCirceDecoder: Decoder[Extractables] =
+    Decoder
+      .decodeList(extractableCirceDecoder)
+      .handleErrorWith(e => Decoder[String].emap(s => if (s.toLowerCase == "all") All.asRight else e.show.asLeft))
+
+  private val EventClass = classOf[EnrichedEvent]
+
+  case object MaxMind extends Extractable {
+    val schemaKey: SchemaKey = SchemaKey("com.maxmind", "context", "jsonschema", SchemaVer.Full(1, 0, 0))
+
+    def keys: List[(String, TypedField)] = List(
+      "geo_country" -> TypedField.Str("country"),
+      "geo_region" -> TypedField.Str("region"),
+      "geo_city" -> TypedField.Str("city"),
+      "geo_zipcode" -> TypedField.Str("zipcode"),
+      "geo_latitude" -> TypedField.Flo("latitude"),
+      "geo_longitude" -> TypedField.Flo("longitude"),
+      "geo_region_name" -> TypedField.Str("region_name"),
+
+      "ip_isp" -> TypedField.Str("isp"),
+      "ip_organization" -> TypedField.Str("organization"),
+      "ip_domain" -> TypedField.Str("domain"),
+      "ip_netspeed" -> TypedField.Str("netspeed")
+    )
+  }
+}
diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/extractor/ExtractorEnrichment.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/extractor/ExtractorEnrichment.scala
new file mode 100644
index 000000000..a0a3cf56c
--- /dev/null
+++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/extractor/ExtractorEnrichment.scala
@@ -0,0 +1,104 @@
+package com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.extractor
+
+import java.time.Instant
+
+import cats.Applicative
+import cats.implicits._
+import cats.data.{EitherT, ValidatedNel, NonEmptyList}
+
+import io.circe.{Error, Json}
+import io.circe.parser.parse
+import io.circe.syntax._
+
+import com.snowplowanalytics.iglu.core.{SchemaCriterion, SelfDescribingData, SchemaKey}
+import com.snowplowanalytics.iglu.core.circe.implicits._
+
+import com.snowplowanalytics.snowplow.badrows.FailureDetails.{EnrichmentFailureMessage, EnrichmentFailure}
+import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor, Payload, Failure}
+import com.snowplowanalytics.snowplow.enrich.common.adapters.RawEvent
+import com.snowplowanalytics.snowplow.enrich.common.adapters.RawEvent.toRawEvent
+import com.snowplowanalytics.snowplow.enrich.common.enrichments.MiscEnrichments
+import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.extractor.ExtractorEnrichment.failedReflection
+import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.{ParseableEnrichment, EnrichmentConf}
+import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
+import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent.toPartiallyEnrichedEvent
+import com.snowplowanalytics.snowplow.enrich.common.utils.CirceUtils
+
+final case class ExtractorEnrichment(entities: Set[Extractable], erase: Boolean) {
+  def process[F[_]: Applicative](processor: Processor, rawEvent: RawEvent, event: EnrichedEvent): EitherT[F, BadRow, Unit] = {
+    // Create beforehand because later it could be mutated
+    val badRowPayload = Payload.EnrichmentPayload(toPartiallyEnrichedEvent(event), toRawEvent(rawEvent))
+    entities.toList.flatTraverse { entity =>
+      entity.process(event) match {
+        case Left(value) =>
+          Left((entity.toString.toLowerCase, value))
+        case Right(Some(json)) =>
+          if (erase) entity.erase(event) else () // Unsafe mutation
+          Right(List(json))
+        case Right(None) =>
+          Right(Nil)
+      }
+    } match {
+      case Left(error) =>
+        EitherT.leftT(failedReflection(processor, badRowPayload)(error))
+      case Right(entities) if entities.isEmpty =>
+        EitherT.rightT[F, BadRow](())
+      case Right(entities) =>
+        val contexts = if (event.derived_contexts == null)
+          SelfDescribingData(MiscEnrichments.ContextsSchema, List.empty[Json]).asRight
+        else
+          parse(event.derived_contexts)
+            .flatMap(_.as[SelfDescribingData[Json]])
+            .flatMap { contexts => contexts.data.as[List[Json]].map(data => SelfDescribingData(contexts.schema, data)) }
+
+        contexts
+          .map { contexts => SelfDescribingData(contexts.schema, (contexts.data ++ entities.map(_.normalize)).asJson) }
+          .leftMap(ExtractorEnrichment.failedDecoding(processor, badRowPayload))
+          .toEitherT[F]
+          .map(contexts => event.setDerived_contexts(contexts.asString)) // Unsafe mutation
+    }
+  }
+}
+
+object ExtractorEnrichment extends ParseableEnrichment {
+
+  def failedDecoding(processor: Processor, payload: Payload.EnrichmentPayload)(error: Error): BadRow = {
+    val message = EnrichmentFailureMessage.Simple(s"Cannot decode derived_contexts. ${error.show}")
+    val enrichmentFailure = EnrichmentFailure(None, message)
+    val messages = NonEmptyList.one(enrichmentFailure)
+    val failure = Failure.EnrichmentFailures(Instant.now(), messages)
+    BadRow.EnrichmentFailures(processor, failure, payload)
+  }
+
+  def failedReflection(processor: Processor, payload: Payload.EnrichmentPayload)(error: (String, Throwable)): BadRow = {
+    val message = EnrichmentFailureMessage.Simple(s"Failed to extract a property for ${error._1}. ${error._2}")
+    val enrichmentFailure = EnrichmentFailure(None, message)
+    val messages = NonEmptyList.one(enrichmentFailure)
+    val failure = Failure.EnrichmentFailures(Instant.now(), messages)
+    BadRow.EnrichmentFailures(processor, failure, payload)
+  }
+
+
+  val supportedSchema: SchemaCriterion =
+    SchemaCriterion(
+      "com.snowplowanalytics.snowplow.enrichments",
+      "extractor_enrichment_config",
+      "jsonschema",
+      1,
+      0,
+      0
+    )
+
+  def parse(config: Json, schemaKey: SchemaKey, localMode: Boolean): ValidatedNel[String, EnrichmentConf] = {
+    isParseable(config, schemaKey)
+      .toValidatedNel
+      .andThen { _ =>
+        val erase = CirceUtils.extract[Boolean](config, "parameters", "erase")
+        val extract = CirceUtils.extract[Extractable.Extractables](config, "parameters", "extract")
+
+        (erase, extract).mapN { (er, ex) =>
+          EnrichmentConf.ExtractorConf(schemaKey, ex.toSet, er)
+        }.toValidatedNel
+      }
+  }
+}
diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/extractor/TypedField.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/extractor/TypedField.scala
new file mode 100644
index 000000000..dc2a9e6df
--- /dev/null
+++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/extractor/TypedField.scala
@@ -0,0 +1,38 @@
+package com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.extractor
+
+import io.circe.{Encoder, Json}
+
+import cats.implicits._
+
+/** Name and JSON encoding of a single property copied out of an `EnrichedEvent` field */
+sealed trait TypedField {
+  type Target
+  def manifest: Manifest[Target]
+  def name: String
+  def encoder: Encoder[Target]
+
+  /**
+   * Encode a reflectively obtained field value
+   * @param anyRef raw field value, may be null
+   * @return `None` for null, the encoded value otherwise, or the failure if the value
+   *         is not an instance of `Target`
+   */
+  def cast(anyRef: AnyRef): Either[Throwable, Option[Json]] =
+    Either.catchNonFatal {
+      // `asInstanceOf[Target]` alone is erased, so the runtime class performs the actual check
+      Option(anyRef).map(ref => encoder(manifest.runtimeClass.cast(ref).asInstanceOf[Target]))
+    }
+}
+
+object TypedField {
+  final case class Str(name: String) extends TypedField {
+    type Target = String
+    def manifest: Manifest[String] = implicitly[Manifest[String]]
+    val encoder: Encoder[String] = Encoder[String]
+  }
+  final case class Flo(name: String) extends TypedField {
+    type Target = java.lang.Float
+    def manifest: Manifest[java.lang.Float] = implicitly[Manifest[java.lang.Float]]
+    val encoder: Encoder[java.lang.Float] = Encoder[java.lang.Float]
+  }
+}
diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/extractor/ExtractorEnrichmentSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/extractor/ExtractorEnrichmentSpec.scala
new file mode 100644
index 000000000..9c813e6f0
--- /dev/null
+++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/extractor/ExtractorEnrichmentSpec.scala
@@ -0,0 +1,153 @@
+/*
+ * Copyright (c) 2012-2020 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.extractor
+
+import cats.Id
+import cats.implicits._
+
+import io.circe.Json
+import io.circe.parser.parse
+import io.circe.literal._
+
+import com.snowplowanalytics.iglu.core.SelfDescribingData
+import com.snowplowanalytics.iglu.core.circe.implicits._
+
+import com.snowplowanalytics.snowplow.badrows.Processor
+
+import org.joda.time.DateTime
+
+import org.specs2.mutable.Specification
+
+import com.snowplowanalytics.snowplow.enrich.common.adapters.RawEvent
+import com.snowplowanalytics.snowplow.enrich.common.enrichments.MiscEnrichments
+import com.snowplowanalytics.snowplow.enrich.common.loaders.CollectorPayload
+import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
+
+class ExtractorEnrichmentSpec extends Specification {
+  "process" should {
+    "mutate an event adding necessary contexts without erasing" in {
+      val enrichment = ExtractorEnrichment(Set(Extractable.MaxMind), false)
+
+      val city = "Krasnoyarsk"
+      val latitude = 56.0184f
+      val longitude = 92.8672f
+      val event = new EnrichedEvent
+      event.setGeo_city(city)
+      event.setGeo_latitude(latitude)
+      event.setGeo_longitude(longitude)
+
+      val expected = SelfDescribingData(
+        MiscEnrichments.ContextsSchema,
+        json"""[{
+          "schema" : "iglu:com.maxmind/context/jsonschema/1-0-0",
+          "data" : {
+            "city" : "Krasnoyarsk",
+            "latitude" : 56.0184,
+            "longitude" : 92.8672
+          }
+        }]"""
+      )
+
+      val postProcessing = enrichment.process[Id](Processor("test-processor", "1.0.0"), ExtractorEnrichmentSpec.rawEvent, event).value
+      val output = parse(event.derived_contexts).flatMap(_.as[SelfDescribingData[Json]])
+
+      event.geo_city must beEqualTo(city)
+      event.geo_latitude must beEqualTo(latitude)
+      event.geo_longitude must beEqualTo(longitude)
+      postProcessing must beRight
+      output must beRight(expected)
+    }
+
+    "mutate an event adding necessary contexts with erasing" in {
+      val enrichment = ExtractorEnrichment(Set(Extractable.MaxMind), true)
+
+      val city = "Krasnoyarsk"
+      val event = new EnrichedEvent
+      event.setGeo_city(city)
+      event.setGeo_latitude(56.0184f)
+      event.setGeo_longitude(92.8672f)
+
+      val expected = SelfDescribingData(
+        MiscEnrichments.ContextsSchema,
+        json"""[{
+          "schema" : "iglu:com.maxmind/context/jsonschema/1-0-0",
+          "data" : {
+            "city" : "Krasnoyarsk",
+            "latitude" : 56.0184,
+            "longitude" : 92.8672
+          }
+        }]"""
+      )
+
+      val postProcessing = enrichment.process[Id](Processor("test-processor", "1.0.0"), ExtractorEnrichmentSpec.rawEvent, event).value
+      val output = parse(event.derived_contexts).flatMap(_.as[SelfDescribingData[Json]])
+
+      event.geo_city must beNull
+      event.geo_latitude must beNull
+      event.geo_longitude must beNull
+      postProcessing must beRight
+      output must beRight(expected)
+    }
+
+    "append contexts into existing derived_contexts field" in {
+      val enrichment = ExtractorEnrichment(Set(Extractable.MaxMind), false)
+
+      val city = "Krasnoyarsk"
+      val latitude = 56.0184f
+      val longitude = 92.8672f
+      val event = new EnrichedEvent
+      val priorContext = json"""{"schema":"iglu:com.acme/one/jsonschema/1-0-0","data":{"value":1}}"""
+      event.setGeo_city(city)
+      event.setGeo_latitude(latitude)
+      event.setGeo_longitude(longitude)
+      event.setDerived_contexts(s"""{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1","data":[${priorContext.noSpaces}]}""")
+
+      val expected = SelfDescribingData(
+        MiscEnrichments.ContextsSchema,
+        json"""[
+          $priorContext,
+          {
+          "schema" : "iglu:com.maxmind/context/jsonschema/1-0-0",
+          "data" : {
+            "city" : "Krasnoyarsk",
+            "latitude" : 56.0184,
+            "longitude" : 92.8672
+          }
+        }]"""
+      )
+
+      val postProcessing = enrichment.process[Id](Processor("test-processor", "1.0.0"), ExtractorEnrichmentSpec.rawEvent, event).value
+      val output = parse(event.derived_contexts).flatMap(_.as[SelfDescribingData[Json]])
+
+      event.geo_city must beEqualTo(city)
+      event.geo_latitude must beEqualTo(latitude)
+      event.geo_longitude must beEqualTo(longitude)
+      postProcessing must beRight
+      output must beRight(expected)
+    }
+  }
+}
+
+object ExtractorEnrichmentSpec {
+  val api = CollectorPayload.Api("com.snowplowanalytics.snowplow", "tp2")
+  val source = CollectorPayload.Source("clj-tomcat", "UTF-8", None)
+  val context = CollectorPayload.Context(
+    DateTime.parse("2013-08-29T00:18:48.000+00:00").some,
+    "37.157.33.123".some,
+    None,
+    None,
+    Nil,
+    None
+  )
+  val rawEvent = RawEvent(api, Map.empty, None, source, context)
+}