From 00375738e28c2b225f2e7a061b2fed2f05bffa85 Mon Sep 17 00:00:00 2001 From: spenes Date: Tue, 24 Sep 2024 17:46:53 +0300 Subject: [PATCH 1/2] Add caching to lookupSchemasUntil function Also, lookupSchemasUntilResult function is implemented as well. It is the variant of lookupSchemasUntil that returns ResolverResult which contains information about cahe item. Also, mimaPreviousVersions is cleared out because this commit breaks binary compatibility with previous versions since it adds a new function to a trait. Therefore, we needed to clear-out the mimaPreviousVersions. --- .../client/resolver/CreateResolverCache.scala | 6 + .../client/resolver/Resolver.scala | 148 +++++++--- .../client/resolver/ResolverCache.scala | 39 ++- .../client/resolver/package.scala | 18 +- .../resolver/ResolverCacheSpec.scala | 3 +- .../resolver/ResolverResultSpec.scala | 272 +++++++++++++++++- .../resolver/ResolverSpecHelpers.scala | 47 ++- project/BuildSettings.scala | 2 +- 8 files changed, 486 insertions(+), 49 deletions(-) diff --git a/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/CreateResolverCache.scala b/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/CreateResolverCache.scala index caf7543b..3ba15e3c 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/CreateResolverCache.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/CreateResolverCache.scala @@ -25,6 +25,8 @@ trait CreateResolverCache[F[_]] { def createSchemaListCache(size: Int): F[LruMap[F, ListCacheKey, ListCacheEntry]] + def createSchemaContentListCache(size: Int): F[LruMap[F, SchemaKey, SchemaContentListCacheEntry]] + def createMutex[K]: F[ResolverMutex[F, K]] } @@ -43,6 +45,10 @@ object CreateResolverCache { override def createSchemaListCache(size: Int): F[LruMap[F, ListCacheKey, ListCacheEntry]] = createLruMap(size) + override def createSchemaContentListCache( + size: Int + ): F[LruMap[F, SchemaKey, SchemaContentListCacheEntry]] = + createLruMap(size) } implicit def idCreateResolverCache: CreateResolverCache[Id] = diff --git a/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/Resolver.scala b/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/Resolver.scala index f80bd9d4..092e2783 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/Resolver.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/Resolver.scala @@ -200,6 +200,19 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCac )) && custom.values.flatMap(_.errors).forall(_ == RegistryError.NotFound) } + /** + * The variant of lookupSchemasUntilResult that returns the result + * that isn't wrapped with ResolverResult + */ + def lookupSchemasUntil( + maxSchemaKey: SchemaKey + )(implicit + F: Monad[F], + L: RegistryLookup[F], + C: Clock[F] + ): F[Either[SchemaResolutionError, NonEmptyList[SelfDescribingSchema[Json]]]] = + lookupSchemasUntilResult(maxSchemaKey).map(_.map(_.value)) + /** * Looks up all the schemas with the same model until `maxSchemaKey`. * For the schemas of previous revisions, it starts with addition = 0 @@ -209,49 +222,95 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCac * @return All the schemas if all went well, [[Resolver.SchemaResolutionError]] with the first error that happened * while looking up the schemas if something went wrong. */ - def lookupSchemasUntil( + def lookupSchemasUntilResult( maxSchemaKey: SchemaKey )(implicit F: Monad[F], L: RegistryLookup[F], C: Clock[F] - ): F[Either[SchemaResolutionError, NonEmptyList[SelfDescribingSchema[Json]]]] = { - def go( - current: SchemaVer.Full, - acc: List[SelfDescribingSchema[Json]] - ): F[Either[SchemaResolutionError, NonEmptyList[SelfDescribingSchema[Json]]]] = { - val currentSchemaKey = maxSchemaKey.copy(version = current) - lookupSchema(currentSchemaKey).flatMap { - case Left(e) => - if (current.addition === 0) - Monad[F].pure(Left(SchemaResolutionError(currentSchemaKey, e))) - else if (current.revision < maxSchemaKey.version.revision && isNotFound(e)) - go(current.copy(revision = current.revision + 1, addition = 0), acc) - else - Monad[F].pure(Left(SchemaResolutionError(currentSchemaKey, e))) - case Right(json) => - if (current.revision < maxSchemaKey.version.revision) - go( - current.copy(addition = current.addition + 1), - SelfDescribingSchema(SchemaMap(currentSchemaKey), json) :: acc - ) - else if (current.addition < maxSchemaKey.version.addition) - go( - current.copy(addition = current.addition + 1), - SelfDescribingSchema(SchemaMap(currentSchemaKey), json) :: acc - ) - else - Monad[F].pure( - Right( - NonEmptyList(SelfDescribingSchema(SchemaMap(currentSchemaKey), json), acc).reverse + ): F[Either[SchemaResolutionError, SchemaContentListLookupResult]] = { + def get(): F[Either[SchemaResolutionError, SchemaContentList]] = { + def go( + current: SchemaVer.Full, + acc: List[SelfDescribingSchema[Json]] + ): F[Either[SchemaResolutionError, NonEmptyList[SelfDescribingSchema[Json]]]] = { + val currentSchemaKey = maxSchemaKey.copy(version = current) + lookupSchema(currentSchemaKey).flatMap { + case Left(e) => + if (current.addition === 0) + Monad[F].pure(Left(SchemaResolutionError(currentSchemaKey, e))) + else if (current.revision < maxSchemaKey.version.revision && isNotFound(e)) + go(current.copy(revision = current.revision + 1, addition = 0), acc) + else + Monad[F].pure(Left(SchemaResolutionError(currentSchemaKey, e))) + case Right(json) => + if (current.revision < maxSchemaKey.version.revision) + go( + current.copy(addition = current.addition + 1), + SelfDescribingSchema(SchemaMap(currentSchemaKey), json) :: acc ) - ) + else if (current.addition < maxSchemaKey.version.addition) + go( + current.copy(addition = current.addition + 1), + SelfDescribingSchema(SchemaMap(currentSchemaKey), json) :: acc + ) + else + Monad[F].pure( + Right( + NonEmptyList(SelfDescribingSchema(SchemaMap(currentSchemaKey), json), acc).reverse + ) + ) + } } + + go(SchemaVer.Full(maxSchemaKey.version.model, 0, 0), Nil) } - go(SchemaVer.Full(maxSchemaKey.version.model, 0, 0), Nil) + def handleAfterFetch( + result: Either[SchemaResolutionError, SchemaContentList] + ): F[Either[SchemaResolutionError, SchemaContentListLookupResult]] = + cache match { + case Some(c) => + val updated = result.leftMap(e => resolutionErrorToFailureMap(e)) + c.putSchemaContentListResult(maxSchemaKey, updated).map { + case Right(ResolverCache.TimestampedItem(i, t)) => + Right(ResolverResult.Cached(maxSchemaKey, i, t)) + case Left(failure) => + val schemaKey = result.leftMap(_.schemaKey).left.getOrElse(maxSchemaKey) + Left(SchemaResolutionError(schemaKey, resolutionError(failure))) + } + case None => + result + .map[SchemaContentListLookupResult](ResolverResult.NotCached(_)) + .pure[F] + } + + def lockAndLookup: F[Either[SchemaResolutionError, SchemaContentListLookupResult]] = + withLockOnSchemaContentList(maxSchemaKey) { + getSchemaContentListFromCache(maxSchemaKey).flatMap { + case Some(TimestampedItem(Right(i), t)) => + Monad[F].pure(Right(ResolverResult.Cached(maxSchemaKey, i, t))) + case Some(TimestampedItem(Left(_), _)) | None => + for { + result <- get() + fixed <- handleAfterFetch(result) + } yield fixed + } + } + + getSchemaContentListFromCache(maxSchemaKey).flatMap { + case Some(TimestampedItem(Right(i), t)) => + Monad[F].pure(Right(ResolverResult.Cached(maxSchemaKey, i, t))) + case Some(TimestampedItem(Left(_), _)) | None => + lockAndLookup + } } + def resolutionErrorToFailureMap(resolutionError: SchemaResolutionError): LookupFailureMap = + resolutionError.error.value.toMap.flatMap { case (key, value) => + allRepos.find(_.config.name == key).map((_, value)) + } + /** * Get list of available schemas for particular vendor and name part * Server supposed to return them in proper order @@ -428,6 +487,12 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCac case None => f } + private def withLockOnSchemaContentList[A](schemaKey: SchemaKey)(f: => F[A]): F[A] = + cache match { + case Some(c) => c.withLockOnSchemaContentList(schemaKey)(f) + case None => f + } + private def getSchemaListFromCache( vendor: Vendor, name: Name, @@ -441,15 +506,26 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCac case None => Monad[F].pure(None) } + private def getSchemaContentListFromCache( + schemaKey: SchemaKey + )(implicit + F: Monad[F], + C: Clock[F] + ): F[Option[ResolverCache.TimestampedItem[SchemaContentListLookup]]] = + cache match { + case Some(c) => c.getTimestampedSchemaContentList(schemaKey) + case None => Monad[F].pure(None) + } } /** Companion object. Lets us create a Resolver from a Json */ object Resolver { - type SchemaListKey = (Vendor, Name, Model) - type SchemaLookupResult = ResolverResult[SchemaKey, SchemaItem] - type SchemaListLookupResult = ResolverResult[SchemaListKey, SchemaList] - type SupersededBy = Option[SchemaVer.Full] + type SchemaListKey = (Vendor, Name, Model) + type SchemaLookupResult = ResolverResult[SchemaKey, SchemaItem] + type SchemaListLookupResult = ResolverResult[SchemaListKey, SchemaList] + type SchemaContentListLookupResult = ResolverResult[SchemaKey, SchemaContentList] + type SupersededBy = Option[SchemaVer.Full] /** * The result of doing schema lookup diff --git a/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/ResolverCache.scala b/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/ResolverCache.scala index c94e7931..9324b794 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/ResolverCache.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/ResolverCache.scala @@ -40,8 +40,10 @@ import Resolver.SchemaItem class ResolverCache[F[_]] private ( schemas: LruMap[F, SchemaKey, SchemaCacheEntry], schemaLists: LruMap[F, ListCacheKey, ListCacheEntry], + schemaContentLists: LruMap[F, SchemaKey, SchemaContentListCacheEntry], schemaMutex: ResolverMutex[F, SchemaKey], schemaListMutex: ResolverMutex[F, ListCacheKey], + schemaContentListMutex: ResolverMutex[F, SchemaKey], val ttl: Option[TTL] ) { @@ -144,6 +146,23 @@ class ResolverCache[F[_]] private ( f: => F[A] ): F[A] = schemaListMutex.withLockOn((vendor, name, model))(f) + + private[resolver] def getTimestampedSchemaContentList( + schemaKey: SchemaKey + )(implicit F: Monad[F], C: Clock[F]): F[Option[TimestampedItem[SchemaContentListLookup]]] = + getTimestampedItem(ttl, schemaContentLists, schemaKey) + + private[resolver] def putSchemaContentListResult( + schemaKey: SchemaKey, + schemas: SchemaContentListLookup + )(implicit + F: Monad[F], + C: Clock[F] + ): F[Either[LookupFailureMap, TimestampedItem[SchemaContentList]]] = + putItemResult(schemaContentLists, schemaKey, schemas) + + private[resolver] def withLockOnSchemaContentList[A](key: SchemaKey)(f: => F[A]): F[A] = + schemaContentListMutex.withLockOn(key)(f) } object ResolverCache { @@ -159,11 +178,21 @@ object ResolverCache { ): F[Option[ResolverCache[F]]] = { if (shouldCreateResolverCache(size, ttl)) { for { - schemas <- C.createSchemaCache(size) - schemaLists <- C.createSchemaListCache(size) - schemaMutex <- C.createMutex[SchemaKey] - listMutex <- C.createMutex[ListCacheKey] - } yield new ResolverCache[F](schemas, schemaLists, schemaMutex, listMutex, ttl).some + schemas <- C.createSchemaCache(size) + schemaLists <- C.createSchemaListCache(size) + schemaContentLists <- C.createSchemaContentListCache(size) + schemaMutex <- C.createMutex[SchemaKey] + listMutex <- C.createMutex[ListCacheKey] + schemaContentListMutex <- C.createMutex[SchemaKey] + } yield new ResolverCache[F]( + schemas, + schemaLists, + schemaContentLists, + schemaMutex, + listMutex, + schemaContentListMutex, + ttl + ).some } else Applicative[F].pure(none) } diff --git a/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/package.scala b/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/package.scala index b0127c60..a564d3d6 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/package.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/package.scala @@ -14,8 +14,12 @@ package com.snowplowanalytics.iglu.client import scala.concurrent.duration.FiniteDuration +import cats.data.NonEmptyList + +import io.circe.Json + // Iglu Core -import com.snowplowanalytics.iglu.core.SchemaList +import com.snowplowanalytics.iglu.core.{SchemaList, SelfDescribingSchema} // This project import resolver.registries.Registry @@ -32,6 +36,8 @@ package object resolver { /** Schema's model */ type Model = Int + type SchemaContentList = NonEmptyList[SelfDescribingSchema[Json]] + /** * Map of all repositories to its aggregated state of failure * None as value means repository already responded with `not-found`, @@ -53,6 +59,13 @@ package object resolver { */ type ListLookup = Either[LookupFailureMap, SchemaList] + /** + * Validated schema content list lookup result containing, cache result + * which is list of self describing schemas in case of success or + * Map of all currently failed repositories in case of failure + */ + type SchemaContentListLookup = Either[LookupFailureMap, SchemaContentList] + /** Time to live for cached items */ type TTL = FiniteDuration @@ -77,4 +90,7 @@ package object resolver { /** Cache entry for schema list lookup results */ type ListCacheEntry = CacheEntry[ListLookup] + /** Cache entry for schema content list lookup results */ + type SchemaContentListCacheEntry = CacheEntry[SchemaContentListLookup] + } diff --git a/modules/core/src/test/scala/com.snowplowanalytics.iglu.client/resolver/ResolverCacheSpec.scala b/modules/core/src/test/scala/com.snowplowanalytics.iglu.client/resolver/ResolverCacheSpec.scala index c7d062fe..340e5649 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.iglu.client/resolver/ResolverCacheSpec.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.iglu.client/resolver/ResolverCacheSpec.scala @@ -46,7 +46,8 @@ class ResolverCacheSpec extends Specification { 4.millis, List((key, (2.millis, Right(SchemaItem(Json.Null, None))))), 5, - List() + List(), + Map.empty ) val test = for { diff --git a/modules/core/src/test/scala/com.snowplowanalytics.iglu.client/resolver/ResolverResultSpec.scala b/modules/core/src/test/scala/com.snowplowanalytics.iglu.client/resolver/ResolverResultSpec.scala index 3f051a7c..e57d33e2 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.iglu.client/resolver/ResolverResultSpec.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.iglu.client/resolver/ResolverResultSpec.scala @@ -13,7 +13,11 @@ package com.snowplowanalytics.iglu.client.resolver import cats.effect.testing.specs2.CatsEffect -import com.snowplowanalytics.iglu.client.resolver.Resolver.{ResolverResult, SchemaLookupResult} +import com.snowplowanalytics.iglu.client.resolver.Resolver.{ + ResolverResult, + SchemaContentListLookupResult, + SchemaLookupResult +} import io.circe.parser.parse import java.net.URI @@ -91,6 +95,11 @@ class ResolverResultSpec extends Specification with ValidatedMatchers with CatsE return an error if the second schema of previous revision is invalid $e30 return 3-0-0 (no 1-*-* and 2-*-* schemas) $e31 return 3-0-0 from a registry and 3-1-0 from another one $e32 + return cached schema when ttl not exceeded $e33 + refetch the schema from registry when ttl exceeded $e34 + not cache schema if cache is disabled $e35 + cache errors $e36 + return expected results when called multiple times $e37 """ import ResolverSpec._ @@ -821,4 +830,265 @@ class ResolverResultSpec extends Specification with ValidatedMatchers with CatsE getUntilSchemaKey(3, 1, 0), NonEmptyList.of(until300, until310) ) + + def e33 = { + val schemaKey = + SchemaKey( + "com.snowplowanalytics.iglu-test", + "mock_schema", + "jsonschema", + SchemaVer.Full(1, 0, 0) + ) + val schema1 = Json.fromInt(1) + val schema2 = Json.fromInt(2) + val responses = List(schema1, schema2).map(_.asRight[RegistryError]) + + implicit val cache = ResolverSpecHelpers.staticResolverCache + implicit val clock = ResolverSpecHelpers.staticClock + implicit val registryLookup: RegistryLookup[StaticLookup] = + ResolverSpecHelpers.getLookup(responses, Nil) + + val result = for { + resolver <- Resolver.init[StaticLookup](10, Some(200.seconds), Repos.httpRep) + response1 <- resolver.lookupSchemasUntilResult(schemaKey) + _ <- StaticLookup.addTime(150.seconds) // ttl 200, delay 150 + response2 <- resolver.lookupSchemasUntilResult(schemaKey) + } yield (response1, response2) + + val (_, (response1, response2)) = + result.run(ResolverSpecHelpers.RegistryState.init).value + + response1 must beRight[SchemaContentListLookupResult].like { + case ResolverResult.Cached(_, value1, _) => + (response1 mustEqual response2) and // same cached (including timestamps) item because it didn't expire + (value1.head.schema mustEqual schema1) + } + } + + def e34 = { + val schemaKey = + SchemaKey( + "com.snowplowanalytics.iglu-test", + "mock_schema", + "jsonschema", + SchemaVer.Full(1, 0, 0) + ) + val schema1 = Json.fromInt(1) + val schema2 = Json.fromInt(2) + val responses = List(schema1, schema2).map(_.asRight[RegistryError]) + + implicit val cache = ResolverSpecHelpers.staticResolverCache + implicit val clock = ResolverSpecHelpers.staticClock + implicit val registryLookup: RegistryLookup[StaticLookup] = + ResolverSpecHelpers.getLookup(responses, Nil) + + val result = for { + resolver <- Resolver.init[StaticLookup](10, Some(200.seconds), Repos.httpRep) + response1 <- resolver.lookupSchemasUntilResult(schemaKey) + _ <- StaticLookup.addTime(250.seconds) // ttl 200, delay 250 + response2 <- resolver.lookupSchemasUntilResult(schemaKey) + } yield (response1, response2) + + val (_, (response1, response2)) = + result.run(ResolverSpecHelpers.RegistryState.init).value + + response1 must beRight[SchemaContentListLookupResult].like { + case ResolverResult.Cached(_, value1, timestamp1) => + response2 must beRight[SchemaContentListLookupResult].like { + case ResolverResult.Cached(_, value2, timestamp2) => + (value1.head.self.schemaKey mustEqual schemaKey) and + (value1.head.schema mustEqual schema1) and + (value2.head.self.schemaKey mustEqual schemaKey) and + (value2.head.schema mustEqual schema2) and + (timestamp1 mustNotEqual timestamp2) + } + } + } + + def e35 = { + val schemaKey = + SchemaKey( + "com.snowplowanalytics.iglu-test", + "mock_schema", + "jsonschema", + SchemaVer.Full(1, 0, 0) + ) + val schema1 = Json.fromInt(1) + val schema2 = Json.fromInt(2) + val responses = List(schema1, schema2).map(_.asRight[RegistryError]) + + implicit val cache = ResolverSpecHelpers.staticResolverCache + implicit val clock = ResolverSpecHelpers.staticClock + implicit val registryLookup: RegistryLookup[StaticLookup] = + ResolverSpecHelpers.getLookup(responses, Nil) + + val result = for { + resolver <- Resolver.init[StaticLookup](0, None, Repos.httpRep) + response1 <- resolver.lookupSchemasUntilResult(schemaKey) + response2 <- resolver.lookupSchemasUntilResult(schemaKey) + } yield (response1, response2) + + val (_, (response1, response2)) = + result.run(ResolverSpecHelpers.RegistryState.init).value + + response1 must beRight[SchemaContentListLookupResult].like { + case ResolverResult.NotCached(value1) => + response2 must beRight[SchemaContentListLookupResult].like { + case ResolverResult.NotCached(value2) => + (value1.head.self.schemaKey mustEqual schemaKey) and + (value1.head.schema mustEqual schema1) and + (value2.head.self.schemaKey mustEqual schemaKey) and + (value2.head.schema mustEqual schema2) + } + } + } + + def e36 = { + val schemaKey = + SchemaKey( + "com.snowplowanalytics.iglu-test", + "mock_schema", + "jsonschema", + SchemaVer.Full(1, 0, 0) + ) + val schema1 = Json.fromInt(1).asRight[RegistryError] + val error1 = RegistryError.NotFound.asLeft[Json] + val responses = List(error1, schema1) + + implicit val cache = ResolverSpecHelpers.staticResolverCache + implicit val clock = ResolverSpecHelpers.staticClock + implicit val registryLookup: RegistryLookup[StaticLookup] = + ResolverSpecHelpers.getLookup(responses, Nil) + + val result = for { + resolver <- Resolver.init[StaticLookup](10, Some(200.seconds), Repos.httpRep) + response1 <- resolver.lookupSchemasUntilResult(schemaKey) + _ <- StaticLookup.addTime(150.seconds) // ttl 200, delay 150 + response2 <- resolver.lookupSchemasUntilResult(schemaKey) + } yield (response1, response2) + + val (_, (response1, response2)) = + result.run(ResolverSpecHelpers.RegistryState.init).value + + response1 must beLeft[SchemaResolutionError].like { case value1 => + response2 must beLeft[SchemaResolutionError].like { case value2 => + value1 mustEqual value2 + } + } + } + + def e37 = { + val schemaKey100 = + SchemaKey( + "com.snowplowanalytics.iglu-test", + "mock_schema", + "jsonschema", + SchemaVer.Full(1, 0, 0) + ) + val schemaKey102 = schemaKey100.copy(version = SchemaVer.Full(1, 0, 2)) + val schemaKey103 = schemaKey100.copy(version = SchemaVer.Full(1, 0, 3)) + val schemaKey200 = schemaKey100.copy(version = SchemaVer.Full(2, 0, 0)) + val schemaKey214 = schemaKey100.copy(version = SchemaVer.Full(2, 1, 4)) + + val response100_1 = List( + (schemaKey100, Json.fromString("1-0-0_1").asRight) + ) + + val response102 = List( + (schemaKey100.copy(version = SchemaVer.Full(1, 0, 1)), Json.fromString("1-0-1").asRight), + (schemaKey100.copy(version = SchemaVer.Full(1, 0, 2)), Json.fromString("1-0-2").asRight) + ) + + val response100_2 = List( + (schemaKey100, Json.fromString("1-0-0_2").asRight) + ) + + val response200 = List( + (schemaKey200, Json.fromString("2-0-0").asRight) + ) + + val response214 = List( + (schemaKey200, Json.fromString("2-0-0").asRight), + (schemaKey200.copy(version = SchemaVer.Full(2, 0, 1)), Json.fromString("2-0-1").asRight), + (schemaKey200.copy(version = SchemaVer.Full(2, 0, 2)), Json.fromString("2-0-2").asRight), + (schemaKey200, RegistryError.NotFound.asLeft), + (schemaKey200.copy(version = SchemaVer.Full(2, 1, 0)), Json.fromString("2-1-0").asRight), + (schemaKey200.copy(version = SchemaVer.Full(2, 1, 1)), Json.fromString("2-1-1").asRight), + (schemaKey200.copy(version = SchemaVer.Full(2, 1, 2)), Json.fromString("2-1-2").asRight), + (schemaKey200.copy(version = SchemaVer.Full(2, 1, 3)), Json.fromString("2-1-3").asRight), + (schemaKey200.copy(version = SchemaVer.Full(2, 1, 4)), Json.fromString("2-1-4").asRight) + ) + + val response103_1 = List( + (schemaKey100, Json.fromString("1-0-0_1").asRight), + (schemaKey100.copy(version = SchemaVer.Full(1, 0, 1)), Json.fromString("1-0-1_1").asRight), + (schemaKey100.copy(version = SchemaVer.Full(1, 0, 2)), Json.fromString("1-0-2_1").asRight), + (schemaKey100.copy(version = SchemaVer.Full(1, 0, 3)), Json.fromString("1-0-3_1").asRight) + ) + + val response103_2 = List( + (schemaKey100, Json.fromString("1-0-0_1").asRight), + (schemaKey100.copy(version = SchemaVer.Full(1, 0, 1)), Json.fromString("1-0-1_2").asRight), + (schemaKey100.copy(version = SchemaVer.Full(1, 0, 2)), Json.fromString("1-0-2_2").asRight), + (schemaKey100.copy(version = SchemaVer.Full(1, 0, 3)), Json.fromString("1-0-3_2").asRight) + ) + + val responses = + response100_1 ++ response102 ++ response100_2 ++ response200 ++ response214 ++ response103_1 ++ response103_2 + + implicit val cache = ResolverSpecHelpers.staticResolverCache + implicit val clock = ResolverSpecHelpers.staticClock + implicit val registryLookup: RegistryLookup[StaticLookup] = + ResolverSpecHelpers.getLookup(responses.map(_._2), Nil) + + def checkResult( + result: Either[SchemaResolutionError, SchemaContentListLookupResult], + expected: List[(SchemaKey, Either[RegistryError, Json])], + expectedCacheKey: SchemaKey + ) = { + val extractedResult = result.toOption.collect { case ResolverResult.Cached(k, v, _) => + (k, v.toList.map(s => (s.self.schemaKey, s.schema))) + } + val extractedExpected = expected.collect { case (k, Right(j)) => (k, j) } + extractedResult must beSome((expectedCacheKey, extractedExpected)) + } + + val result = for { + resolver <- Resolver.init[StaticLookup](10, Some(200.seconds), Repos.httpRep) + notCached100_1 <- resolver.lookupSchemasUntilResult(schemaKey100) + notCached102 <- resolver.lookupSchemasUntilResult(schemaKey102) + _ <- StaticLookup.addTime(100.seconds) + cached100 <- resolver.lookupSchemasUntilResult(schemaKey100) + cached102 <- resolver.lookupSchemasUntilResult(schemaKey102) + _ <- StaticLookup.addTime(200.seconds) + notCached100_2 <- resolver.lookupSchemasUntilResult(schemaKey100) + _ <- StaticLookup.addTime(300.seconds) + notCached200 <- resolver.lookupSchemasUntilResult(schemaKey200) + _ <- StaticLookup.addTime(100.seconds) + cached200 <- resolver.lookupSchemasUntilResult(schemaKey200) + _ <- StaticLookup.addTime(300.seconds) + notCached214 <- resolver.lookupSchemasUntilResult(schemaKey214) + _ <- StaticLookup.addTime(100.seconds) + cached214 <- resolver.lookupSchemasUntilResult(schemaKey214) + _ <- StaticLookup.addTime(300.seconds) + notCached103_1 <- resolver.lookupSchemasUntilResult(schemaKey103) + _ <- StaticLookup.addTime(100.seconds) + cached103 <- resolver.lookupSchemasUntilResult(schemaKey103) + _ <- StaticLookup.addTime(200.seconds) + notCached103_2 <- resolver.lookupSchemasUntilResult(schemaKey103) + } yield checkResult(notCached100_1, response100_1, schemaKey100) and + checkResult(notCached100_2, response100_2, schemaKey100) and + checkResult(notCached102, response100_1 ++ response102, schemaKey102) and + checkResult(notCached200, response200, schemaKey200) and + checkResult(notCached214, response214, schemaKey214) and + checkResult(notCached103_1, response103_1, schemaKey103) and + checkResult(notCached103_2, response103_2, schemaKey103) and + (cached100 mustEqual notCached100_1) and + (cached102 mustEqual notCached102) and + (cached200 mustEqual notCached200) and + (cached214 mustEqual notCached214) and + (cached103 mustEqual notCached103_1) + + result.run(ResolverSpecHelpers.RegistryState.init).value._2 + } } diff --git a/modules/core/src/test/scala/com.snowplowanalytics.iglu.client/resolver/ResolverSpecHelpers.scala b/modules/core/src/test/scala/com.snowplowanalytics.iglu.client/resolver/ResolverSpecHelpers.scala index 7edeb4de..ebbdfec4 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.iglu.client/resolver/ResolverSpecHelpers.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.iglu.client/resolver/ResolverSpecHelpers.scala @@ -64,16 +64,31 @@ object ResolverSpecHelpers { time: FiniteDuration, cache: List[(SchemaKey, SchemaCacheEntry)], cacheSize: Int, - schemaLists: List[ListCacheEntry] + schemaLists: List[ListCacheEntry], + schemaContentLists: Map[SchemaKey, SchemaContentListCacheEntry] ) { /** Perform a request and write it to [[requestsCounter]] */ def request(name: String): RegistryState = { val i = requestsCounter.getOrElse(name, 0) + 1 - RegistryState(requestsCounter.updated(name, i), time, cache, cacheSize, schemaLists) + RegistryState( + requestsCounter.updated(name, i), + time, + cache, + cacheSize, + schemaLists, + schemaContentLists + ) } - def tick = RegistryState(requestsCounter, time + 1.milli, cache, cacheSize, schemaLists) + def tick = RegistryState( + requestsCounter, + time + 1.milli, + cache, + cacheSize, + schemaLists, + schemaContentLists + ) def req: Int = requestsCounter.values.sum @@ -87,7 +102,7 @@ object ResolverSpecHelpers { } object RegistryState { - val init = RegistryState(Map.empty, 0.seconds, Nil, 0, Nil) + val init = RegistryState(Map.empty, 0.seconds, Nil, 0, Nil, Map.empty) } type StaticLookup[A] = State[RegistryState, A] @@ -115,6 +130,16 @@ object ResolverSpecHelpers { (state, cache) } + def createSchemaContentListCache( + size: Int + ): StaticLookup[LruMap[StaticLookup, SchemaKey, SchemaContentListCacheEntry]] = + State { s => + val cache: LruMap[StaticLookup, SchemaKey, SchemaContentListCacheEntry] = + StataCacheSchemaContentList + val state = s.copy(cacheSize = size) + (state, cache) + } + def createMutex[K]: StaticLookup[ResolverMutex[StaticLookup, K]] = State.pure(stateMutex[K]) } @@ -228,6 +253,20 @@ object ResolverSpecHelpers { } } + private object StataCacheSchemaContentList + extends LruMap[StaticLookup, SchemaKey, SchemaContentListCacheEntry] { + def get(key: SchemaKey): StaticLookup[Option[SchemaContentListCacheEntry]] = + State.apply[RegistryState, Option[SchemaContentListCacheEntry]] { state => + val result = state.schemaContentLists.get(key) + (state.tick, result) + } + + def put(key: SchemaKey, value: SchemaContentListCacheEntry): StaticLookup[Unit] = + State { state => + (state.copy(schemaContentLists = state.schemaContentLists + (key -> value)).tick, ()) + } + } + private def stateMutex[K]: ResolverMutex[StaticLookup, K] = new ResolverMutex[StaticLookup, K] { def withLockOn[A](key: K)(f: => StaticLookup[A]): StaticLookup[A] = diff --git a/project/BuildSettings.scala b/project/BuildSettings.scala index 91fae162..60d5ffba 100644 --- a/project/BuildSettings.scala +++ b/project/BuildSettings.scala @@ -68,7 +68,7 @@ object BuildSettings { // clear-out mimaBinaryIssueFilters and mimaPreviousVersions. // Otherwise, add previous version to set without // removing other versions. - val mimaPreviousVersions = Set("3.0.0") + val mimaPreviousVersions = Set() lazy val mimaSettings = Seq( mimaPreviousArtifacts := { From 7c36c6a3d5333fe629ecc601b935d52ad665ad74 Mon Sep 17 00:00:00 2001 From: spenes Date: Mon, 30 Sep 2024 16:26:36 +0300 Subject: [PATCH 2/2] Prepare for 4.0.0 release --- CHANGELOG | 4 ++++ README.md | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index e0048668..a2c7ce9a 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,7 @@ +Version 4.0.0 (2024-09-30) +-------------------------- +Add caching to lookupSchemasUntil function (#258) + Version 3.2.0 (2024-08-20) -------------------------- Add a function that discovers and looks up schemas without using the list endpoint of an Iglu server (#256) diff --git a/README.md b/README.md index b240c3fb..c83307bf 100644 --- a/README.md +++ b/README.md @@ -11,12 +11,12 @@ Iglu Scala Client is used extensively in **[Snowplow][snowplow-repo]** to valida ## Installation -The latest version of Iglu Scala Client is 3.2.0, which works with Scala 2.12, 2.13, and 3. +The latest version of Iglu Scala Client is 4.0.0, which works with Scala 2.12, 2.13, and 3. If you're using SBT, add the following lines to your build file: ```scala -val igluClient = "com.snowplowanalytics" %% "iglu-scala-client" % "3.2.0" +val igluClient = "com.snowplowanalytics" %% "iglu-scala-client" % "4.0.0" ``` ## API