Skip to content

Commit

Permalink
feat: add timeout and retry settings for schema registry connection (#72
Browse files Browse the repository at this point in the history
)
  • Loading branch information
dmrmlvv authored Dec 26, 2024
1 parent 2bacc35 commit 3297b90
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,16 +134,19 @@ object Schemas {
* Schema configuration that is used to read schema from
* Confluent Schema Registry.
*
* @param id Schema ID
* @param description Schema description
* @param baseUrls List of urls to connect to Schema Registry
* @param schemaId Schema ID to search in schema registry
* @param schemaSubject Schema subject to search in schema registry
* @param version Schema version (by default latest available version is fetched)
* @param validateDefaults Boolean flag enabling or disabling default values validation in Avro schema.
* @param properties List of additional connection properties: sequence of strings in format `key=value`.
* @param headers List of additional HTML headers: sequence of strings in format `key=value`.
* @param metadata List of metadata parameters specific to this schema
* @param id Schema ID
* @param description Schema description
* @param baseUrls List of urls to connect to Schema Registry
* @param schemaId Schema ID to search in schema registry
* @param schemaSubject Schema subject to search in schema registry
* @param version Schema version (by default latest available version is fetched)
* @param validateDefaults Boolean flag enabling or disabling default values validation in Avro schema.
* @param properties List of additional connection properties: sequence of strings in format `key=value`.
* @param headers List of additional HTML headers: sequence of strings in format `key=value`.
* @param metadata List of metadata parameters specific to this schema
* @param connectionTimeoutMs Maximum time in milliseconds to wait for a response from the Schema Registry.
* @param retryAttempts Number of retry attempts in case of a failure.
* @param retryIntervalMs Delay in milliseconds between retry attempts.
*/
final case class RegistrySchemaConfig(
id: ID,
Expand All @@ -155,7 +158,10 @@ object Schemas {
validateDefaults: Boolean = false,
properties: Seq[SparkParam] = Seq.empty,
headers: Seq[SparkParam] = Seq.empty,
metadata: Seq[SparkParam] = Seq.empty
metadata: Seq[SparkParam] = Seq.empty,
connectionTimeoutMs: Int = 60000,
retryAttempts: Int = 3,
retryIntervalMs: Int = 5000
) extends SchemaConfig

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,21 @@ case class SchemaRegistryConnection(config: RegistrySchemaConfig) {
val baseUrls = config.baseUrls.value.map(_.value).asJava
val connProps = paramsSeqToMap(config.properties.map(_.value)).toOptionMap
val connHeaders = paramsSeqToMap(config.headers.map(_.value)).toOptionMap
val nullProps: java.util.Map[String, Any] = null
val defaultProps = Map(
"host.http.connect.timeout.ms" -> config.connectionTimeoutMs.toString,
"http.read.timeout.ms" -> config.connectionTimeoutMs.toString
)

(connProps, connHeaders) match {
case (Some(props), Some(headers)) =>
new CachedSchemaRegistryClient(baseUrls, cacheCapacity, props.asJava, headers.asJava)
case (Some(props), None) => new CachedSchemaRegistryClient(baseUrls, cacheCapacity, props.asJava)
val enhancedProps = props ++ defaultProps
new CachedSchemaRegistryClient(baseUrls, cacheCapacity, enhancedProps.asJava, headers.asJava)
case (Some(props), None) =>
val enhancedProps = props ++ defaultProps
new CachedSchemaRegistryClient(baseUrls, cacheCapacity, enhancedProps.asJava)
case (None, Some(headers)) =>
new CachedSchemaRegistryClient(baseUrls, cacheCapacity, nullProps, headers.asJava)
case _ => new CachedSchemaRegistryClient(baseUrls, cacheCapacity)
new CachedSchemaRegistryClient(baseUrls, cacheCapacity, defaultProps.asJava, headers.asJava)
case _ => new CachedSchemaRegistryClient(baseUrls, cacheCapacity, defaultProps.asJava)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import org.checkita.dqf.connections.schemaregistry.SchemaRegistryConnection
import org.checkita.dqf.utils.ResultUtils._

import java.io.File
import scala.util.Try
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object SchemaReaders {

Expand Down Expand Up @@ -232,6 +233,26 @@ object SchemaReaders {
}

implicit object RegistrySchemaReader extends SchemaReader[RegistrySchemaConfig] {
/**
* Executes a function with retries in case of failure, waiting for a fixed delay between attempts.
*
* @param attempts Maximum number of attempts. Must be greater than 0.
* @param backOffMs Delay in milliseconds between retries.
* @param fn The function to execute.
* @return The result of the successfully executed function, or throws an exception if all attempts fail.
*/
@tailrec
private def retry(attempts: Int, backOffMs: Int)(fn: => String): String = {
Try(fn) match {
case Success(result) => result
case Failure(e) =>
if (attempts > 1) {
Thread.sleep(backOffMs)
retry(attempts - 1, backOffMs)(fn)
} else throw e
}
}

/**
* Tries to read schema given the schema configuration
*
Expand All @@ -242,8 +263,14 @@ object SchemaReaders {
override def tryToRead(config: RegistrySchemaConfig)(implicit spark: SparkSession): SourceSchema = {
val conn = SchemaRegistryConnection(config)
val rawSchema = (config.schemaId, config.schemaSubject.map(_.value)) match {
case (Some(id), None) => conn.getSchemaById(id, config.version)
case (None, Some(subject)) => conn.getSchemaBySubject(subject, config.version)
case (Some(id), None) =>
retry(config.retryAttempts, config.retryIntervalMs){
conn.getSchemaById(id, config.version)
}
case (None, Some(subject)) =>
retry(config.retryAttempts, config.retryIntervalMs){
conn.getSchemaBySubject(subject, config.version)
}
case _ => throw new IllegalArgumentException(
"Schema can be read from registry either by its ID or by its subject but not both."
)
Expand Down
4 changes: 4 additions & 0 deletions docs/03-job-configuration/02-Schemas.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ To retrieve schema from Confluent Schema Registry it is required to set up follo
sequence of strings in format `key=value`.
* `metadata` - *Optional*. List of user-defined metadata parameters specific to this schema where each parameter
is a string in format: `param.name=param.value`.
* `connectionTimeoutMs` - *Optional*, default is 60000. Maximum time in milliseconds to wait
for a response from the Schema Registry.
* `retryAttempts` - *Optional*, default is 3. Number of retry attempts in case of a failure.
* `retryIntervalMs` - *Optional*, default is 5000. Delay in milliseconds between retry attempts.

## Supported Type Literals

Expand Down

0 comments on commit 3297b90

Please sign in to comment.