Skip to content

Commit

Permalink
Resolve schemas in parallel (#85)
Browse files Browse the repository at this point in the history
I have seen examples from our apps where cpu usage and event throughput
periodically drops.  This appears to coincide with the Iglu cache
expiration time.

I believe this happens because all schemas tend to expire at the same
time and need to be re-fetched by iglu-scala-client. Currently, we
traverse over schemas sequentially, so we need to wait for each success
before fetching the next schema.  For a pipeline using many schemas,
this can be a long period of downtime (several seconds) as we pause for
schema resolution.

This commit changes to resolving schemas in parallel, so the downtime
pauses should be shorter.
  • Loading branch information
istreeter authored Sep 16, 2024
1 parent ae0e57e commit e0f8592
Showing 1 changed file with 4 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
*/
package com.snowplowanalytics.snowplow.loaders.transform

import cats.effect.Sync
import cats.effect.Async
import cats.effect.implicits._
import cats.implicits._
import com.snowplowanalytics.iglu.client.resolver.Resolver
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
Expand Down Expand Up @@ -46,7 +47,7 @@ object NonAtomicFields {
failure: FailureDetails.LoaderIgluError
)

def resolveTypes[F[_]: Sync: RegistryLookup](
def resolveTypes[F[_]: Async: RegistryLookup](
resolver: Resolver[F],
entities: Map[TabledEntity, Set[SchemaSubVersion]],
filterCriteria: List[SchemaCriterion]
Expand All @@ -61,7 +62,7 @@ object NonAtomicFields {
// Remove whole schema family if there is no subversion left after filtering
subVersions.nonEmpty
}
.traverse { case (tabledEntity, subVersions) =>
.parTraverse { case (tabledEntity, subVersions) =>
SchemaProvider
.fetchSchemasWithSameModel(resolver, TabledEntity.toSchemaKey(tabledEntity, subVersions.max))
.map(TypedTabledEntity.build(tabledEntity, subVersions, _))
Expand Down

0 comments on commit e0f8592

Please sign in to comment.