NOTICKET: Replace cutoff-dates with enum based additional validations on new topics (#871)

aman-minz authored Nov 20, 2023
1 parent 74ae346 commit 96c5d49
Showing 24 changed files with 338 additions and 170 deletions.
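In short: the commit drops the two deploy-wide cutoff dates (timestampValidationCutoffDate, defaultLoopHoleCutoffDate) and instead records, when a topic is created, which additional validations apply to it. Below is a minimal sketch of the before/after gating styles, with simplified types and hypothetical names rather than the hydra API; the real mechanism is the AdditionalValidation enum added at the end of this diff.

import java.time.Instant

// Sketch only; hypothetical stand-ins, not the hydra API.
object CutoffGateSketch {

  // Before: gate a validation on a deploy-wide cutoff date read from config,
  // applying it only to schemas created after that date.
  def appliesBefore(schemaCreatedDate: Instant, cutoffDate: Instant): Boolean =
    schemaCreatedDate.isAfter(cutoffDate)

  // After: gate a validation on what was stamped into the topic's own metadata
  // at creation time. No clocks and no per-environment dates involved.
  def appliesAfter(validationsStampedAtCreation: List[String]): Boolean =
    validationsStampedAtCreation.contains("timestampMillis")
}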
12 changes: 2 additions & 10 deletions ingest/src/main/scala/hydra.ingest/app/AppConfig.scala
@@ -74,9 +74,7 @@ object AppConfig {
bootstrapServers: String,
defaultNumPartions: Int,
defaultReplicationFactor: Short,
- defaultMinInsyncReplicas: Short,
- timestampValidationCutoffDate: Instant,
- defaultLoopHoleCutoffDate: Instant
+ defaultMinInsyncReplicas: Short
)

private[app] implicit val dateStringToInstantDecoder: ConfigDecoder[String, Instant] =
@@ -98,13 +96,7 @@ object AppConfig {
env("HYDRA_KAFKA_PRODUCER_BOOTSTRAP_SERVERS").as[String],
env("HYDRA_DEFAULT_PARTIONS").as[Int].default(10),
env("HYDRA_REPLICATION_FACTOR").as[Short].default(3),
env("HYDRA_MIN_INSYNC_REPLICAS").as[Short].default(2),
env("TIMESTAMP_VALIDATION_CUTOFF_DATE_IN_YYYYMMDD")
.as[Instant]
.default(Instant.parse("2023-08-31T00:00:00Z")),
env("DEFAULT_LOOPHOLE_CUTOFF_DATE_IN_YYYYMMDD")
.as[Instant]
.default(Instant.parse("2023-08-31T00:00:00Z"))
env("HYDRA_MIN_INSYNC_REPLICAS").as[Short].default(2)
).parMapN(CreateTopicConfig)

private implicit val subjectConfigDecoder: ConfigDecoder[String, Subject] =
12 changes: 8 additions & 4 deletions ingest/src/main/scala/hydra.ingest/modules/Bootstrap.scala
@@ -65,7 +65,8 @@ final class Bootstrap[F[_]: MonadError[*[_], Throwable]] private (
Some("Data-Platform"),
None,
List.empty,
- None
+ None,
+ additionalValidations = None
),
TopicDetails(cfg.numPartitions, cfg.replicationFactor, cfg.minInsyncReplicas, Map("cleanup.policy" -> "compact"))
)
@@ -93,7 +94,8 @@ final class Bootstrap[F[_]: MonadError[*[_], Throwable]] private (
Some("Data-Platform"),
None,
List.empty,
- None
+ None,
+ additionalValidations = None
),
TopicDetails(
dvsConsumersTopicConfig.numPartitions,
@@ -124,7 +126,8 @@ final class Bootstrap[F[_]: MonadError[*[_], Throwable]] private (
Some("Data-Platform"),
None,
List.empty,
- None
+ None,
+ additionalValidations = None
),
TopicDetails(
cooTopicConfig.numPartitions,
@@ -152,7 +155,8 @@ final class Bootstrap[F[_]: MonadError[*[_], Throwable]] private (
Some("Data-Platform"),
None,
List.empty,
- None
+ None,
+ additionalValidations = None
),
TopicDetails(cfg.numPartitions, cfg.replicationFactor, cfg.minInsyncReplicas, Map("cleanup.policy" -> "compact")))
}
6 changes: 2 additions & 4 deletions ingest/src/main/scala/hydra.ingest/modules/Programs.scala
@@ -29,8 +29,7 @@ final class Programs[F[_]: Logger: Sync: Timer: Mode: Concurrent] private(
algebras.kafkaClient,
retryPolicy,
cfg.metadataTopicsConfig.topicNameV2,
- algebras.metadata,
- cfg.createTopicConfig.defaultLoopHoleCutoffDate
+ algebras.metadata
)

val ingestionFlow: IngestionFlow[F] = new IngestionFlow[F](
@@ -45,8 +44,7 @@ final class Programs[F[_]: Logger: Sync: Timer: Mode: Concurrent] private(
algebras.schemaRegistry,
algebras.kafkaClient,
cfg.createTopicConfig.schemaRegistryConfig.fullUrl,
- algebras.metadata,
- cfg.createTopicConfig.timestampValidationCutoffDate
+ algebras.metadata
)

val topicDeletion: TopicDeletionProgram[F] = new TopicDeletionProgram[F](
@@ -11,23 +11,23 @@ import hydra.core.transport.ValidationStrategy
import hydra.kafka.algebras.{KafkaClientAlgebra, MetadataAlgebra}
import hydra.kafka.algebras.KafkaClientAlgebra.PublishResponse
import hydra.kafka.model.TopicMetadataV2Request.Subject
+ import hydra.kafka.model.{AdditionalValidation, SchemaAdditionalValidation}
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import scalacache._
import scalacache.guava._
import scalacache.memoization._

- import java.time.Instant
import scala.concurrent.duration._
import scala.language.higherKinds
import scala.jdk.CollectionConverters.asScalaBufferConverter
import scala.util.{Failure, Try}

final class IngestionFlowV2[F[_]: MonadError[*[_], Throwable]: Mode](
schemaRegistry: SchemaRegistry[F],
kafkaClient: KafkaClientAlgebra[F],
schemaRegistryBaseUrl: String,
- metadata: MetadataAlgebra[F],
- timestampValidationCutoffDate: Instant)
+ metadata: MetadataAlgebra[F])
(implicit guavaCache: Cache[SchemaWrapper]){

import IngestionFlowV2._
@@ -78,8 +78,7 @@ final class IngestionFlowV2[F[_]: MonadError[*[_], Throwable]: Mode](

for {
metadata <- metadata.getMetadataFor(topic)
- schemaCreationDate = metadata.map(_.value.createdDate).getOrElse(Instant.now())
- useTimestampValidation = schemaCreationDate.isAfter(timestampValidationCutoffDate)
+ useTimestampValidation = AdditionalValidation.isPresent(metadata, SchemaAdditionalValidation.timestampMillis)
kSchema <- getSchemaWrapper(topic, isKey = true)
vSchema <- getSchemaWrapper(topic, isKey = false)
k <- MonadError[F, Throwable].fromTry(
@@ -57,12 +57,12 @@ final class IngestionEndpointSpec
} yield sr).unsafeRunSync
val metadata = (for {
m <- TestMetadataAlgebra()
- _ <- TopicUtils.updateTopicMetadata(List(testSubject.value), m, Instant.now)
+ _ <- TopicUtils.updateTopicMetadata(List(testSubject.value), m)
} yield m).unsafeRunSync
new IngestionEndpoint(
new IngestionFlow[IO](schemaReg, KafkaClientAlgebra.test[IO].unsafeRunSync, "https://schemaregistryUrl.notreal"),
new IngestionFlowV2[IO](SchemaRegistry.test[IO].unsafeRunSync, KafkaClientAlgebra.test[IO].unsafeRunSync, "https://schemaregistryUrl.notreal",
- metadata, timestampValidationCutoffDate), noAuth
+ metadata), noAuth
).route
}

@@ -104,11 +104,11 @@ final class IngestionEndpointSpec
_ <- schemaRegistry.registerSchema("dvs.blah.composit-key", compositeKey)
_ <- schemaRegistry.registerSchema("dvs.blah.composit-value", simpleSchema)
m <- TestMetadataAlgebra()
- _ <- TopicUtils.updateTopicMetadata(List(testSubject.value), m, Instant.now)
+ _ <- TopicUtils.updateTopicMetadata(List(testSubject.value), m)
} yield {
new IngestionEndpoint(
new IngestionFlow[IO](schemaRegistry, KafkaClientAlgebra.test[IO].unsafeRunSync, "https://schemaregistry.notreal"),
- new IngestionFlowV2[IO](schemaRegistry, KafkaClientAlgebra.test[IO].unsafeRunSync, "https://schemaregistry.notreal", m, timestampValidationCutoffDate),
+ new IngestionFlowV2[IO](schemaRegistry, KafkaClientAlgebra.test[IO].unsafeRunSync, "https://schemaregistry.notreal", m),
noAuth
).route
}).unsafeRunSync()
@@ -62,8 +62,7 @@ class BootstrapSpec extends AnyWordSpecLike with Matchers with NotificationsTest
kafkaClient,
retry,
metadataSubjectV2,
- metadata,
- Instant.parse("2023-07-05T00:00:00Z")
+ metadata
)
boot <- Bootstrap.make[IO](c, metadataConfig, consumersTopicConfig, consumerOffsetsOffsetsTopicConfig, kafkaAdmin, tagsTopicConfig)
_ <- boot.bootstrapAll
@@ -145,7 +145,8 @@ class TopicDeletionProgramSpec extends AnyFlatSpec with Matchers {
Some("dvs-teamName"),
None,
List.empty,
Some("notificationUrl")
Some("notificationUrl"),
additionalValidations = None
)

private def buildSchema(topic: String, upgrade: Boolean): Schema = {
@@ -18,7 +18,6 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger
import scalacache.Cache
import scalacache.guava.GuavaCache

- import java.time.Instant
import scala.concurrent.ExecutionContext

final class IngestionFlowV2Spec extends AnyFlatSpec with Matchers {
@@ -31,9 +30,6 @@ final class IngestionFlowV2Spec extends AnyFlatSpec with Matchers {

private val testSubject: Subject = Subject.createValidated("dvs.test.v0.Testing").get
private val testSubjectV1: Subject = Subject.createValidated("dvs.test.v1.Testing").get
- private val timestampValidationCutoffDate: Instant = Instant.parse("2023-07-11T00:00:00Z")
- private val preTimestampValidationCutoffDate: Instant = Instant.parse("2023-05-30T00:00:00Z")
- private val postTimestampValidationCutoffDate: Instant = Instant.parse("2023-07-30T00:00:00Z")

private val testKeyPayload: String =
"""{"id": "testing"}"""
@@ -55,16 +51,14 @@ final class IngestionFlowV2Spec extends AnyFlatSpec with Matchers {
implicit val guavaCache: Cache[SchemaWrapper] = GuavaCache[SchemaWrapper]

private def ingest(request: V2IngestRequest, altValueSchema: Option[Schema] = None,
- altSubject: Option[Subject] = None,
- createdDate: Instant = Instant.now,
- timestampValidationCutoffDate: Instant = timestampValidationCutoffDate): IO[KafkaClientAlgebra[IO]] = for {
+ altSubject: Option[Subject] = None, existingTopic: Boolean = false): IO[KafkaClientAlgebra[IO]] = for {
schemaRegistry <- SchemaRegistry.test[IO]
_ <- schemaRegistry.registerSchema(altSubject.getOrElse(testSubject.value) + "-key", testKeySchema)
_ <- schemaRegistry.registerSchema(altSubject.getOrElse(testSubject.value) + "-value", altValueSchema.getOrElse(testValSchema))
kafkaClient <- KafkaClientAlgebra.test[IO]
m <- TestMetadataAlgebra()
- _ <- TopicUtils.updateTopicMetadata(List(altSubject.getOrElse(testSubject.value).toString), m, createdDate)
- ingestFlow <- IO(new IngestionFlowV2[IO](schemaRegistry, kafkaClient, "https://schemaRegistry.notreal", m, timestampValidationCutoffDate))
+ _ <- if (existingTopic) TopicUtils.updateTopicMetadata(List(altSubject.getOrElse(testSubject.value).toString), m) else IO()
+ ingestFlow <- IO(new IngestionFlowV2[IO](schemaRegistry, kafkaClient, "https://schemaRegistry.notreal", m))
_ <- ingestFlow.ingest(request, altSubject.getOrElse(testSubject))
} yield kafkaClient

@@ -188,55 +182,51 @@ final class IngestionFlowV2Spec extends AnyFlatSpec with Matchers {
IngestionFlowV2.validateKeyAndValueSchemas(key, None) shouldBe a[Right[Throwable,Unit]]
}

it should "accept a logical field type of timestamp-millis having a value 0 before timestamp-millis validation cut-off date" in {
it should "[existing-topic] accept a logical field type of timestamp-millis having a value 0" in {
val testValPayloadV1: String = s"""{"testTimestamp": 0}"""
val testRequest = V2IngestRequest(testKeyPayload, testValPayloadV1.some, ValidationStrategy.Strict.some, useSimpleJsonFormat = false)

- ingest(testRequest, testValSchemaForV1.some, testSubjectV1.some, createdDate = preTimestampValidationCutoffDate).flatMap { kafkaClient =>
+ ingest(testRequest, testValSchemaForV1.some, testSubjectV1.some, existingTopic = true).flatMap { kafkaClient =>
kafkaClient.consumeMessages(testSubjectV1.value, "test-consumer", commitOffsets = false).take(1).compile.toList.map { publishedMessages =>
val firstMessage = publishedMessages.head
(firstMessage._1.toString, firstMessage._2.get.toString) shouldBe(testKeyPayload, testValPayloadV1)
}
}.unsafeRunSync()
}

it should "accept a logical field type of timestamp-millis having a negative value -2 before timestamp-millis validation cut-off date" in {
it should "[exiting-topic] accept a logical field type of timestamp-millis having a negative value -2" in {
val testValPayloadV1: String = s"""{"testTimestamp": -2}"""
val testRequest = V2IngestRequest(testKeyPayload, testValPayloadV1.some, ValidationStrategy.Strict.some, useSimpleJsonFormat = false)

- ingest(testRequest, testValSchemaForV1.some, testSubjectV1.some, createdDate = preTimestampValidationCutoffDate).flatMap { kafkaClient =>
+ ingest(testRequest, testValSchemaForV1.some, testSubjectV1.some, existingTopic = true).flatMap { kafkaClient =>
kafkaClient.consumeMessages(testSubjectV1.value, "test-consumer", commitOffsets = false).take(1).compile.toList.map { publishedMessages =>
val firstMessage = publishedMessages.head
(firstMessage._1.toString, firstMessage._2.get.toString) shouldBe(testKeyPayload, testValPayloadV1)
}
}.unsafeRunSync()
}

it should "throw an AvroConversionAugmentedException if a logical type(timestamp-millis) field is having a value 0 after timestamp-millis validation " +
"cut-off date" in {
it should "[new-topic] throw an AvroConversionAugmentedException if a logical type(timestamp-millis) field is having a value 0" in {
val testValPayloadV1: String = s"""{"testTimestamp": 0}"""
val testRequest = V2IngestRequest(testKeyPayload, testValPayloadV1.some, ValidationStrategy.Strict.some, useSimpleJsonFormat = false)
- the[AvroConversionAugmentedException] thrownBy ingest(testRequest, createdDate = postTimestampValidationCutoffDate).unsafeRunSync()
+ the[AvroConversionAugmentedException] thrownBy ingest(testRequest).unsafeRunSync()
}

it should "throw an AvroConversionAugmentedException if a logical type(timestamp-millis) field is having a negative value -2 after timestamp-millis " +
"validation cut-off date" in {
it should "[new-topic] throw an AvroConversionAugmentedException if a logical type(timestamp-millis) field is having a negative value -2" in {
val testValPayloadV1: String = s"""{"testTimestamp": -2}"""
val testRequest = V2IngestRequest(testKeyPayload, testValPayloadV1.some, ValidationStrategy.Strict.some, useSimpleJsonFormat = false)
- the[AvroConversionAugmentedException] thrownBy ingest(testRequest, createdDate = postTimestampValidationCutoffDate).unsafeRunSync()
+ the[AvroConversionAugmentedException] thrownBy ingest(testRequest).unsafeRunSync()
}

it should "accept a logical field type of timestamp-millis having a valid value 123 after validation cut-off date" in {
it should "[new-topic] accept a logical field type of timestamp-millis having a valid value 123" in {
val testValPayloadV1: String = s"""{"testTimestamp": 123}"""
val testRequest = V2IngestRequest(testKeyPayload, testValPayloadV1.some, ValidationStrategy.Strict.some, useSimpleJsonFormat = false)

- ingest(testRequest, testValSchemaForV1.some, testSubjectV1.some,
-   createdDate = postTimestampValidationCutoffDate).flatMap { kafkaClient =>
+ ingest(testRequest, testValSchemaForV1.some, testSubjectV1.some).flatMap { kafkaClient =>
kafkaClient.consumeMessages(testSubjectV1.value, "test-consumer", commitOffsets = false).take(1).compile.toList.map { publishedMessages =>
val firstMessage = publishedMessages.head
(firstMessage._1.toString, firstMessage._2.get.toString) shouldBe(testKeyPayload, testValPayloadV1)
}
}.unsafeRunSync()
}

}
11 changes: 5 additions & 6 deletions ingest/src/test/scala/hydra/ingest/utils/TopicUtils.scala
@@ -3,20 +3,18 @@ package hydra.ingest.utils
import cats.data.NonEmptyList
import cats.effect.IO
import cats.implicits._
- import hydra.avro.registry.SchemaRegistry
- import hydra.avro.registry.SchemaRegistry.SchemaId
import hydra.kafka.algebras.MetadataAlgebra.TopicMetadataContainer
import hydra.kafka.algebras.TestMetadataAlgebra
import hydra.kafka.model.ContactMethod.Email
import hydra.kafka.model.TopicMetadataV2Request.Subject
import hydra.kafka.model._
- import org.apache.avro.{Schema, SchemaBuilder}
+ import org.apache.avro.SchemaBuilder

import java.time.Instant

object TopicUtils {

- def updateTopicMetadata(topics: List[String], metadataAlgebra: TestMetadataAlgebra[IO], createdDate: Instant): IO[List[Unit]] = {
+ def updateTopicMetadata(topics: List[String], metadataAlgebra: TestMetadataAlgebra[IO]): IO[List[Unit]] = {
topics.traverse(topic => {
val keySchema = SchemaBuilder.record(topic + "Key").fields.requiredInt("test").endRecord()
val valueSchema = SchemaBuilder.record(topic + "Value").fields.requiredInt("test").endRecord()
@@ -28,13 +26,14 @@ object TopicUtils {
deprecatedDate = None,
Public,
NonEmptyList.of(Email.create("[email protected]").get),
- createdDate,
+ Instant.now(),
List.empty,
None,
Some("dvs-teamName"),
None,
List.empty,
Some("notificationUrl")
Some("notificationUrl"),
additionalValidations = None
)
val topicMetadataContainer = TopicMetadataContainer(
topicMetadataKey,
@@ -0,0 +1,46 @@
package hydra.kafka.model

import enumeratum.{Enum, EnumEntry}
import hydra.kafka.algebras.MetadataAlgebra.TopicMetadataContainer

import scala.collection.immutable

sealed trait AdditionalValidation extends EnumEntry

sealed trait SchemaAdditionalValidation extends AdditionalValidation

object SchemaAdditionalValidation extends Enum[SchemaAdditionalValidation] {

case object defaultInRequiredField extends SchemaAdditionalValidation
case object timestampMillis extends SchemaAdditionalValidation

override val values: immutable.IndexedSeq[SchemaAdditionalValidation] = findValues
}

object AdditionalValidation {
lazy val allValidations: Option[List[AdditionalValidation]] =
Some(SchemaAdditionalValidation.values.toList)

/**
* An OLD topic will already have its metadata populated.
* Therefore, additionalValidations = None will be picked up from the metadata,
* and no new additionalValidations will be applied to older topics.
*
* A NEW topic will not have a metadata object yet.
* Therefore, all existing additionalValidations will be assigned,
* and the additionalValidations on the corresponding fields will be applied.
*
* Corner case: after this feature has been on STAGE/PROD for some time and new additionalValidations are required,
* old topics need no special handling, as their additionalValidations value is unchanged since topic creation.
* New additionalValidations should be applied only to new topics,
* so assigning all the values under the AdditionalValidation enum remains reasonable.
*
* @param metadata the metadata object of the current topic
* @return the stored additionalValidations if the topic already exists (OLD topic), otherwise all enum values under AdditionalValidation (NEW topic)
*/
def validations(metadata: Option[TopicMetadataContainer]): Option[List[AdditionalValidation]] =
metadata.map(_.value.additionalValidations).getOrElse(AdditionalValidation.allValidations)

def isPresent(metadata: Option[TopicMetadataContainer], additionalValidation: AdditionalValidation): Boolean =
validations(metadata).exists(_.contains(additionalValidation))
}
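A self-contained sketch of how these helpers behave, assuming a simplified stand-in for TopicMetadataContainer that models only the additionalValidations field (the stub and demo names are hypothetical):

object AdditionalValidationSketch {
  sealed trait AdditionalValidation
  case object defaultInRequiredField extends AdditionalValidation
  case object timestampMillis extends AdditionalValidation

  // Stand-in for MetadataAlgebra.TopicMetadataContainer, reduced to the one field read here.
  final case class MetadataStub(additionalValidations: Option[List[AdditionalValidation]])

  val allValidations: Option[List[AdditionalValidation]] =
    Some(List(defaultInRequiredField, timestampMillis))

  // Mirrors AdditionalValidation.validations: the stored value for existing topics,
  // every known validation for topics with no metadata yet.
  def validations(metadata: Option[MetadataStub]): Option[List[AdditionalValidation]] =
    metadata.map(_.additionalValidations).getOrElse(allValidations)

  def isPresent(metadata: Option[MetadataStub], v: AdditionalValidation): Boolean =
    validations(metadata).exists(_.contains(v))

  def main(args: Array[String]): Unit = {
    // NEW topic: no metadata yet, so every enum value applies.
    assert(isPresent(None, timestampMillis))
    // OLD topic: metadata exists, but additionalValidations was never stamped, so nothing extra applies.
    assert(!isPresent(Some(MetadataStub(None)), timestampMillis))
    // Topic created after the feature shipped: whatever was stamped applies.
    assert(isPresent(Some(MetadataStub(Some(List(timestampMillis)))), timestampMillis))
  }
}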