From 29e6fbdbaeefb7847fe4a6a6295f255d048d0b52 Mon Sep 17 00:00:00 2001 From: Mykola Nikulesko Date: Tue, 3 Oct 2023 17:17:16 +0300 Subject: [PATCH] =?UTF-8?q?ADAPT-1642=20Throw=20an=20exception=20if=20sche?= =?UTF-8?q?ma=20wasn=E2=80=99t=20created?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../kafka/programs/CreateTopicProgram.scala | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/programs/CreateTopicProgram.scala b/ingestors/kafka/src/main/scala/hydra/kafka/programs/CreateTopicProgram.scala index 9a8bc98ce..b1b66afbe 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/programs/CreateTopicProgram.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/programs/CreateTopicProgram.scala @@ -12,6 +12,7 @@ import org.apache.avro.Schema import retry.syntax.all._ import retry.{RetryDetails, RetryPolicy, _} import cats.implicits._ +import hydra.common.logging.LoggingAdapter import hydra.kafka.model.TopicMetadataV2Request.Subject import scala.language.higherKinds @@ -25,7 +26,7 @@ final class CreateTopicProgram[F[_]: Bracket[*[_], Throwable]: Sleep: Logger] pr v2MetadataTopicName: Subject, metadataAlgebra: MetadataAlgebra[F], validator: KeyAndValueSchemaV2Validator[F] - ) (implicit eff: Sync[F]){ + ) (implicit eff: Sync[F]) extends LoggingAdapter { private def onFailure(resourceTried: String): (Throwable, RetryDetails) => F[Unit] = { (error, retryDetails) => @@ -34,6 +35,10 @@ final class CreateTopicProgram[F[_]: Bracket[*[_], Throwable]: Sleep: Logger] pr ) } + private def logInfo(message: String) = { + log.info(s"CreateTopicProgram: $message") + } + private def registerSchema( subject: Subject, schema: Schema, @@ -49,8 +54,14 @@ final class CreateTopicProgram[F[_]: Bracket[*[_], Throwable]: Sleep: Logger] pr schemaRegistry.registerSchema(suffixedSubject, schema) *> schemaRegistry.getVersion(suffixedSubject, schema).map { newSchemaVersion => - if (previousSchemaVersion.contains(newSchemaVersion)) None - else Some(newSchemaVersion) + if (previousSchemaVersion.contains(newSchemaVersion)) { + logInfo(s"Schema for the topic $subject with the version $newSchemaVersion already exists.") + None + } + else { + logInfo(s"Schema for the topic $subject was created. The version is $newSchemaVersion.") + Some(newSchemaVersion) + } } } }.retryingOnAllErrors(retryPolicy, onFailure("RegisterSchema")) @@ -58,6 +69,7 @@ final class CreateTopicProgram[F[_]: Bracket[*[_], Throwable]: Sleep: Logger] pr .makeCase(registerSchema)((newVersionMaybe, exitCase) => (exitCase, newVersionMaybe) match { case (ExitCase.Error(_), Some(newVersion)) => + logInfo(s"Delete a schema for the topic $subject.") schemaRegistry.deleteSchemaOfVersion(suffixedSubject, newVersion) case _ => Bracket[F, Throwable].unit } @@ -95,6 +107,7 @@ final class CreateTopicProgram[F[_]: Bracket[*[_], Throwable]: Sleep: Logger] pr Resource .makeCase(createTopic)({ case (Some(_), ExitCase.Error(_)) => + logInfo(s"Clean a resource for the topic $subject.") kafkaAdmin.deleteTopic(subject.value) case _ => Bracket[F, Throwable].unit })