Skip to content

Commit

Permalink
ADAPT-1642 Throw an exception if schema wasn’t created
Browse files Browse the repository at this point in the history
  • Loading branch information
Mykola Nikulesko committed Oct 3, 2023
1 parent de07e5c commit 29e6fbd
Showing 1 changed file with 16 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import org.apache.avro.Schema
import retry.syntax.all._
import retry.{RetryDetails, RetryPolicy, _}
import cats.implicits._
import hydra.common.logging.LoggingAdapter
import hydra.kafka.model.TopicMetadataV2Request.Subject

import scala.language.higherKinds
Expand All @@ -25,7 +26,7 @@ final class CreateTopicProgram[F[_]: Bracket[*[_], Throwable]: Sleep: Logger] pr
v2MetadataTopicName: Subject,
metadataAlgebra: MetadataAlgebra[F],
validator: KeyAndValueSchemaV2Validator[F]
) (implicit eff: Sync[F]){
) (implicit eff: Sync[F]) extends LoggingAdapter {

private def onFailure(resourceTried: String): (Throwable, RetryDetails) => F[Unit] = {
(error, retryDetails) =>
Expand All @@ -34,6 +35,10 @@ final class CreateTopicProgram[F[_]: Bracket[*[_], Throwable]: Sleep: Logger] pr
)
}

private def logInfo(message: String) = {
log.info(s"CreateTopicProgram: $message")
}

private def registerSchema(
subject: Subject,
schema: Schema,
Expand All @@ -49,15 +54,22 @@ final class CreateTopicProgram[F[_]: Bracket[*[_], Throwable]: Sleep: Logger] pr
schemaRegistry.registerSchema(suffixedSubject, schema) *>
schemaRegistry.getVersion(suffixedSubject, schema).map {
newSchemaVersion =>
if (previousSchemaVersion.contains(newSchemaVersion)) None
else Some(newSchemaVersion)
if (previousSchemaVersion.contains(newSchemaVersion)) {
logInfo(s"Schema for the topic $subject with the version $newSchemaVersion already exists.")
None
}
else {
logInfo(s"Schema for the topic $subject was created. The version is $newSchemaVersion.")
Some(newSchemaVersion)
}
}
}
}.retryingOnAllErrors(retryPolicy, onFailure("RegisterSchema"))
Resource
.makeCase(registerSchema)((newVersionMaybe, exitCase) =>
(exitCase, newVersionMaybe) match {
case (ExitCase.Error(_), Some(newVersion)) =>
logInfo(s"Delete a schema for the topic $subject.")
schemaRegistry.deleteSchemaOfVersion(suffixedSubject, newVersion)
case _ => Bracket[F, Throwable].unit
}
Expand Down Expand Up @@ -95,6 +107,7 @@ final class CreateTopicProgram[F[_]: Bracket[*[_], Throwable]: Sleep: Logger] pr
Resource
.makeCase(createTopic)({
case (Some(_), ExitCase.Error(_)) =>
logInfo(s"Clean a resource for the topic $subject.")
kafkaAdmin.deleteTopic(subject.value)
case _ => Bracket[F, Throwable].unit
})
Expand Down

0 comments on commit 29e6fbd

Please sign in to comment.