diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/programs/CreateTopicProgram.scala b/ingestors/kafka/src/main/scala/hydra/kafka/programs/CreateTopicProgram.scala index 9a8bc98ce..b1b66afbe 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/programs/CreateTopicProgram.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/programs/CreateTopicProgram.scala @@ -12,6 +12,7 @@ import org.apache.avro.Schema import retry.syntax.all._ import retry.{RetryDetails, RetryPolicy, _} import cats.implicits._ +import hydra.common.logging.LoggingAdapter import hydra.kafka.model.TopicMetadataV2Request.Subject import scala.language.higherKinds @@ -25,7 +26,7 @@ final class CreateTopicProgram[F[_]: Bracket[*[_], Throwable]: Sleep: Logger] pr v2MetadataTopicName: Subject, metadataAlgebra: MetadataAlgebra[F], validator: KeyAndValueSchemaV2Validator[F] - ) (implicit eff: Sync[F]){ + ) (implicit eff: Sync[F]) extends LoggingAdapter { private def onFailure(resourceTried: String): (Throwable, RetryDetails) => F[Unit] = { (error, retryDetails) => @@ -34,6 +35,10 @@ final class CreateTopicProgram[F[_]: Bracket[*[_], Throwable]: Sleep: Logger] pr ) } + private def logInfo(message: String) = { + log.info(s"CreateTopicProgram: $message") + } + private def registerSchema( subject: Subject, schema: Schema, @@ -49,8 +54,14 @@ final class CreateTopicProgram[F[_]: Bracket[*[_], Throwable]: Sleep: Logger] pr schemaRegistry.registerSchema(suffixedSubject, schema) *> schemaRegistry.getVersion(suffixedSubject, schema).map { newSchemaVersion => - if (previousSchemaVersion.contains(newSchemaVersion)) None - else Some(newSchemaVersion) + if (previousSchemaVersion.contains(newSchemaVersion)) { + logInfo(s"Schema for the topic $subject with the version $newSchemaVersion already exists.") + None + } + else { + logInfo(s"Schema for the topic $subject was created. The version is $newSchemaVersion.") + Some(newSchemaVersion) + } } } }.retryingOnAllErrors(retryPolicy, onFailure("RegisterSchema")) @@ -58,6 +69,7 @@ final class CreateTopicProgram[F[_]: Bracket[*[_], Throwable]: Sleep: Logger] pr .makeCase(registerSchema)((newVersionMaybe, exitCase) => (exitCase, newVersionMaybe) match { case (ExitCase.Error(_), Some(newVersion)) => + logInfo(s"Delete a schema for the topic $subject.") schemaRegistry.deleteSchemaOfVersion(suffixedSubject, newVersion) case _ => Bracket[F, Throwable].unit } @@ -95,6 +107,7 @@ final class CreateTopicProgram[F[_]: Bracket[*[_], Throwable]: Sleep: Logger] pr Resource .makeCase(createTopic)({ case (Some(_), ExitCase.Error(_)) => + logInfo(s"Clean a resource for the topic $subject.") kafkaAdmin.deleteTopic(subject.value) case _ => Bracket[F, Throwable].unit })