Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release/4.0.0 #259

Merged
merged 2 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
Version 4.0.0 (2024-09-30)
--------------------------
Add caching to lookupSchemasUntil function (#258)

Version 3.2.0 (2024-08-20)
--------------------------
Add a function that discovers and looks up schemas without using the list endpoint of an Iglu server (#256)
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ Iglu Scala Client is used extensively in **[Snowplow][snowplow-repo]** to valida

## Installation

The latest version of Iglu Scala Client is 3.2.0, which works with Scala 2.12, 2.13, and 3.
The latest version of Iglu Scala Client is 4.0.0, which works with Scala 2.12, 2.13, and 3.

If you're using SBT, add the following lines to your build file:

```scala
val igluClient = "com.snowplowanalytics" %% "iglu-scala-client" % "3.2.0"
val igluClient = "com.snowplowanalytics" %% "iglu-scala-client" % "4.0.0"
```

## API
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ trait CreateResolverCache[F[_]] {

def createSchemaListCache(size: Int): F[LruMap[F, ListCacheKey, ListCacheEntry]]

/**
 * Creates the LRU map holding schema content list cache entries, keyed by `SchemaKey`.
 *
 * @param size sizing argument handed to the map constructor
 *             (presumably the maximum number of entries - TODO confirm)
 */
def createSchemaContentListCache(size: Int): F[LruMap[F, SchemaKey, SchemaContentListCacheEntry]]

def createMutex[K]: F[ResolverMutex[F, K]]

}
Expand All @@ -43,6 +45,10 @@ object CreateResolverCache {
override def createSchemaListCache(size: Int): F[LruMap[F, ListCacheKey, ListCacheEntry]] =
createLruMap(size)

// Schema content list cache: built by delegating to `createLruMap` with the requested size
override def createSchemaContentListCache(
size: Int
): F[LruMap[F, SchemaKey, SchemaContentListCacheEntry]] =
createLruMap(size)
}

implicit def idCreateResolverCache: CreateResolverCache[Id] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,19 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCac
)) && custom.values.flatMap(_.errors).forall(_ == RegistryError.NotFound)
}

/**
 * Same lookup as `lookupSchemasUntilResult`, but with the `ResolverResult`
 * wrapper removed: on success only the wrapped value (the schemas) is returned,
 * errors are passed through unchanged
 */
def lookupSchemasUntil(
  maxSchemaKey: SchemaKey
)(implicit
  F: Monad[F],
  L: RegistryLookup[F],
  C: Clock[F]
): F[Either[SchemaResolutionError, NonEmptyList[SelfDescribingSchema[Json]]]] =
  F.map(lookupSchemasUntilResult(maxSchemaKey)) { lookupResult =>
    // Unwrap the successful side only; a Left stays as it is
    lookupResult.map(wrapped => wrapped.value)
  }

/**
* Looks up all the schemas with the same model until `maxSchemaKey`.
* For the schemas of previous revisions, it starts with addition = 0
Expand All @@ -209,49 +222,95 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCac
* @return All the schemas if all went well, [[Resolver.SchemaResolutionError]] with the first error that happened
* while looking up the schemas if something went wrong.
*/
def lookupSchemasUntil(
def lookupSchemasUntilResult(
maxSchemaKey: SchemaKey
)(implicit
F: Monad[F],
L: RegistryLookup[F],
C: Clock[F]
): F[Either[SchemaResolutionError, NonEmptyList[SelfDescribingSchema[Json]]]] = {
def go(
current: SchemaVer.Full,
acc: List[SelfDescribingSchema[Json]]
): F[Either[SchemaResolutionError, NonEmptyList[SelfDescribingSchema[Json]]]] = {
val currentSchemaKey = maxSchemaKey.copy(version = current)
lookupSchema(currentSchemaKey).flatMap {
case Left(e) =>
if (current.addition === 0)
Monad[F].pure(Left(SchemaResolutionError(currentSchemaKey, e)))
else if (current.revision < maxSchemaKey.version.revision && isNotFound(e))
go(current.copy(revision = current.revision + 1, addition = 0), acc)
else
Monad[F].pure(Left(SchemaResolutionError(currentSchemaKey, e)))
case Right(json) =>
if (current.revision < maxSchemaKey.version.revision)
go(
current.copy(addition = current.addition + 1),
SelfDescribingSchema(SchemaMap(currentSchemaKey), json) :: acc
)
else if (current.addition < maxSchemaKey.version.addition)
go(
current.copy(addition = current.addition + 1),
SelfDescribingSchema(SchemaMap(currentSchemaKey), json) :: acc
)
else
Monad[F].pure(
Right(
NonEmptyList(SelfDescribingSchema(SchemaMap(currentSchemaKey), json), acc).reverse
): F[Either[SchemaResolutionError, SchemaContentListLookupResult]] = {
def get(): F[Either[SchemaResolutionError, SchemaContentList]] = {
def go(
current: SchemaVer.Full,
acc: List[SelfDescribingSchema[Json]]
): F[Either[SchemaResolutionError, NonEmptyList[SelfDescribingSchema[Json]]]] = {
val currentSchemaKey = maxSchemaKey.copy(version = current)
lookupSchema(currentSchemaKey).flatMap {
case Left(e) =>
if (current.addition === 0)
Monad[F].pure(Left(SchemaResolutionError(currentSchemaKey, e)))
else if (current.revision < maxSchemaKey.version.revision && isNotFound(e))
go(current.copy(revision = current.revision + 1, addition = 0), acc)
else
Monad[F].pure(Left(SchemaResolutionError(currentSchemaKey, e)))
case Right(json) =>
if (current.revision < maxSchemaKey.version.revision)
go(
current.copy(addition = current.addition + 1),
SelfDescribingSchema(SchemaMap(currentSchemaKey), json) :: acc
)
)
else if (current.addition < maxSchemaKey.version.addition)
go(
current.copy(addition = current.addition + 1),
SelfDescribingSchema(SchemaMap(currentSchemaKey), json) :: acc
)
else
Monad[F].pure(
Right(
NonEmptyList(SelfDescribingSchema(SchemaMap(currentSchemaKey), json), acc).reverse
)
)
}
}

go(SchemaVer.Full(maxSchemaKey.version.model, 0, 0), Nil)
}

go(SchemaVer.Full(maxSchemaKey.version.model, 0, 0), Nil)
// Post-processes a freshly fetched result: stores it in the cache when one is
// configured, otherwise just tags a success as not cached.
def handleAfterFetch(
  result: Either[SchemaResolutionError, SchemaContentList]
): F[Either[SchemaResolutionError, SchemaContentListLookupResult]] =
  cache match {
    case None =>
      // No cache configured: nothing to store
      Monad[F].pure(result.map[SchemaContentListLookupResult](ResolverResult.NotCached(_)))
    case Some(resolverCache) =>
      // The cache stores failures as a per-registry failure map
      val toStore = result.leftMap(resolutionErrorToFailureMap)
      resolverCache.putSchemaContentListResult(maxSchemaKey, toStore).map {
        case Left(failureMap) =>
          // Report the key carried by the original error; fall back to the requested key
          val failedKey = result.fold(_.schemaKey, _ => maxSchemaKey)
          Left(SchemaResolutionError(failedKey, resolutionError(failureMap)))
        case Right(ResolverCache.TimestampedItem(schemas, storedAt)) =>
          Right(ResolverResult.Cached(maxSchemaKey, schemas, storedAt))
      }
  }

// Re-checks the cache while holding the lock for `maxSchemaKey`; only when there is
// still no successful entry are the schemas fetched and passed to `handleAfterFetch`.
def lockAndLookup: F[Either[SchemaResolutionError, SchemaContentListLookupResult]] =
  withLockOnSchemaContentList(maxSchemaKey) {
    getSchemaContentListFromCache(maxSchemaKey).flatMap {
      case Some(TimestampedItem(Right(schemas), storedAt)) =>
        Monad[F].pure(Right(ResolverResult.Cached(maxSchemaKey, schemas, storedAt)))
      case Some(TimestampedItem(Left(_), _)) | None =>
        // A cached failure is treated like a miss: look everything up again
        get().flatMap(fetched => handleAfterFetch(fetched))
    }
  }

getSchemaContentListFromCache(maxSchemaKey).flatMap {
case Some(TimestampedItem(Right(i), t)) =>
Monad[F].pure(Right(ResolverResult.Cached(maxSchemaKey, i, t)))
case Some(TimestampedItem(Left(_), _)) | None =>
lockAndLookup
}
}

/**
 * Converts a resolution error into a failure map keyed by registry.
 * Entries whose name matches no registry in `allRepos` are dropped.
 */
def resolutionErrorToFailureMap(resolutionError: SchemaResolutionError): LookupFailureMap = {
  val historyByRepoName = resolutionError.error.value.toMap
  historyByRepoName.flatMap { case (repoName, history) =>
    allRepos
      .find(registry => registry.config.name == repoName)
      .map(registry => (registry, history))
  }
}

/**
* Get list of available schemas for particular vendor and name part
* Server supposed to return them in proper order
Expand Down Expand Up @@ -428,6 +487,12 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCac
case None => f
}

// Runs `f` under the cache's schema-content-list lock for `schemaKey` when a cache
// is configured; without a cache `f` is returned as is.
private def withLockOnSchemaContentList[A](schemaKey: SchemaKey)(f: => F[A]): F[A] =
  cache.fold(f)(resolverCache => resolverCache.withLockOnSchemaContentList(schemaKey)(f))

private def getSchemaListFromCache(
vendor: Vendor,
name: Name,
Expand All @@ -441,15 +506,26 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCac
case None => Monad[F].pure(None)
}

// Reads the timestamped schema content list for `schemaKey` from the cache;
// yields `None` when no cache is configured.
private def getSchemaContentListFromCache(
  schemaKey: SchemaKey
)(implicit
  F: Monad[F],
  C: Clock[F]
): F[Option[ResolverCache.TimestampedItem[SchemaContentListLookup]]] =
  cache.fold(
    Monad[F].pure(Option.empty[ResolverCache.TimestampedItem[SchemaContentListLookup]])
  )(resolverCache => resolverCache.getTimestampedSchemaContentList(schemaKey))
}

/** Companion object. Lets us create a Resolver from a Json */
object Resolver {

type SchemaListKey = (Vendor, Name, Model)
type SchemaLookupResult = ResolverResult[SchemaKey, SchemaItem]
type SchemaListLookupResult = ResolverResult[SchemaListKey, SchemaList]
type SupersededBy = Option[SchemaVer.Full]
type SchemaListKey = (Vendor, Name, Model)
type SchemaLookupResult = ResolverResult[SchemaKey, SchemaItem]
type SchemaListLookupResult = ResolverResult[SchemaListKey, SchemaList]
type SchemaContentListLookupResult = ResolverResult[SchemaKey, SchemaContentList]
type SupersededBy = Option[SchemaVer.Full]

/**
* The result of doing schema lookup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ import Resolver.SchemaItem
class ResolverCache[F[_]] private (
schemas: LruMap[F, SchemaKey, SchemaCacheEntry],
schemaLists: LruMap[F, ListCacheKey, ListCacheEntry],
schemaContentLists: LruMap[F, SchemaKey, SchemaContentListCacheEntry],
schemaMutex: ResolverMutex[F, SchemaKey],
schemaListMutex: ResolverMutex[F, ListCacheKey],
schemaContentListMutex: ResolverMutex[F, SchemaKey],
val ttl: Option[TTL]
) {

Expand Down Expand Up @@ -144,6 +146,23 @@ class ResolverCache[F[_]] private (
f: => F[A]
): F[A] =
schemaListMutex.withLockOn((vendor, name, model))(f)

/**
 * Looks up the entry stored for `schemaKey` in the schema content list cache,
 * delegating to `getTimestampedItem` with this cache's `ttl`.
 *
 * @param schemaKey key under which the schema content list is stored
 * @return the timestamped lookup result if `getTimestampedItem` yields one
 */
private[resolver] def getTimestampedSchemaContentList(
schemaKey: SchemaKey
)(implicit F: Monad[F], C: Clock[F]): F[Option[TimestampedItem[SchemaContentListLookup]]] =
getTimestampedItem(ttl, schemaContentLists, schemaKey)

/**
 * Stores a schema content list lookup result under `schemaKey` in the
 * schema content list cache by delegating to `putItemResult`.
 *
 * @param schemaKey key to store the result under
 * @param schemas lookup result to store (either a failure map or the schemas)
 * @return whatever `putItemResult` yields: a failure map on the left,
 *         or the timestamped schema content list on the right
 */
private[resolver] def putSchemaContentListResult(
schemaKey: SchemaKey,
schemas: SchemaContentListLookup
)(implicit
F: Monad[F],
C: Clock[F]
): F[Either[LookupFailureMap, TimestampedItem[SchemaContentList]]] =
putItemResult(schemaContentLists, schemaKey, schemas)

/** Runs `f` through `schemaContentListMutex.withLockOn` for the given `key` */
private[resolver] def withLockOnSchemaContentList[A](key: SchemaKey)(f: => F[A]): F[A] =
schemaContentListMutex.withLockOn(key)(f)
}

object ResolverCache {
Expand All @@ -159,11 +178,21 @@ object ResolverCache {
): F[Option[ResolverCache[F]]] = {
if (shouldCreateResolverCache(size, ttl)) {
for {
schemas <- C.createSchemaCache(size)
schemaLists <- C.createSchemaListCache(size)
schemaMutex <- C.createMutex[SchemaKey]
listMutex <- C.createMutex[ListCacheKey]
} yield new ResolverCache[F](schemas, schemaLists, schemaMutex, listMutex, ttl).some
schemas <- C.createSchemaCache(size)
schemaLists <- C.createSchemaListCache(size)
schemaContentLists <- C.createSchemaContentListCache(size)
schemaMutex <- C.createMutex[SchemaKey]
listMutex <- C.createMutex[ListCacheKey]
schemaContentListMutex <- C.createMutex[SchemaKey]
} yield new ResolverCache[F](
schemas,
schemaLists,
schemaContentLists,
schemaMutex,
listMutex,
schemaContentListMutex,
ttl
).some
} else
Applicative[F].pure(none)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@ package com.snowplowanalytics.iglu.client

import scala.concurrent.duration.FiniteDuration

import cats.data.NonEmptyList

import io.circe.Json

// Iglu Core
import com.snowplowanalytics.iglu.core.SchemaList
import com.snowplowanalytics.iglu.core.{SchemaList, SelfDescribingSchema}

// This project
import resolver.registries.Registry
Expand All @@ -32,6 +36,8 @@ package object resolver {
/** Schema's model */
type Model = Int

/** Non-empty list of self-describing schemas with JSON bodies */
type SchemaContentList = NonEmptyList[SelfDescribingSchema[Json]]

/**
* Map of all repositories to its aggregated state of failure
* None as value means repository already responded with `not-found`,
Expand All @@ -53,6 +59,13 @@ package object resolver {
*/
type ListLookup = Either[LookupFailureMap, SchemaList]

/**
 * Schema content list lookup result, as stored in the cache:
 * the list of self-describing schemas in case of success, or
 * a map of all currently failed repositories in case of failure
 */
type SchemaContentListLookup = Either[LookupFailureMap, SchemaContentList]

/** Time to live for cached items */
type TTL = FiniteDuration

Expand All @@ -77,4 +90,7 @@ package object resolver {
/** Cache entry for schema list lookup results */
type ListCacheEntry = CacheEntry[ListLookup]

/** Cache entry for schema content list lookup results */
type SchemaContentListCacheEntry = CacheEntry[SchemaContentListLookup]

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ class ResolverCacheSpec extends Specification {
4.millis,
List((key, (2.millis, Right(SchemaItem(Json.Null, None))))),
5,
List()
List(),
Map.empty
)

val test = for {
Expand Down
Loading
Loading