diff --git a/Dockerfile.new b/Dockerfile.new new file mode 100644 index 000000000..6e6db8d22 --- /dev/null +++ b/Dockerfile.new @@ -0,0 +1,15 @@ +FROM jelastic/jetty:9.4.49-openjdk-1.8.0_352 + +USER root + +ENV JAVA_OPTS="-Xmx2G" + +ENV CONTAINER_HTTP_PORT="8088" + +RUN mkdir -p /etc/hydra && mkdir -p /var/log/hydra && mkdir /ps-publish + +EXPOSE 8088 + +COPY ps-publish/hydra-ingest* /ps-publish + +ENTRYPOINT ["/ps-publish/bin/hydra-ingest"] \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 000000000..2977fdaa6 --- /dev/null +++ b/Makefile @@ -0,0 +1,43 @@ +# Makefile for building and running Docker containers + +# Define variables +DOCKER_IMAGE_NAME = hydra-publish-test +DOCKERFILE = Dockerfile.new +PORT_MAPPING = -p 8088:8088 +ENV_FILE = .env + +# Target to build the Docker image +build: + mkdir ps-publish + sbt clean compile + sbt universal:packageBin + unzip ingest/target/universal/*.zip -d ps-publish + docker build -t $(DOCKER_IMAGE_NAME) -f $(DOCKERFILE) . 
+ +# Target to run the Docker container +run: + docker run -d $(PORT_MAPPING) --env-file $(ENV_FILE) --name ${DOCKER_IMAGE_NAME} $(DOCKER_IMAGE_NAME) + +# Target to stop and remove the Docker container +stop: + docker stop $(DOCKER_IMAGE_NAME) + docker rm $(DOCKER_IMAGE_NAME) + +# Target to clean up all containers and images +clean: + docker stop $(DOCKER_IMAGE_NAME) || true + docker rm $(DOCKER_IMAGE_NAME) || true + docker rmi $(DOCKER_IMAGE_NAME) || true + rm -rf ps-publish + +# Target to show available targets +help: + @echo "Available targets:" + @echo " build - Build the Docker image" + @echo " run - Run the Docker container" + @echo " stop - Stop and remove the Docker container" + @echo " clean - Clean up all containers and images" + @echo " help - Show this help message" + +# By default, show the help message +.DEFAULT_GOAL := help diff --git a/README.md b/README.md index ca6e33d26..b51f16724 100644 --- a/README.md +++ b/README.md @@ -31,53 +31,13 @@ sbt clean compile ## Docker -### Services needed to run Hydra -- Kafka 2.0.0 -- Confluent Schema Registry 5.0.0 -- Zookeeper (3.x +) +### Development Environment for Testing +We have a development MSK and Schema Registry Cluster running in the eplur-staging AWS account. Access to this cluster is granted via IAM to the `exp_adapt_dvs_set` role. -This documentation walks through setting up the core basic components of Hydra. - -### Create a VirtualBox instance - -``` -docker-machine create --driver virtualbox --virtualbox-memory 6000 hydra -``` - -### Configure Terminal to attach to the new machine - -``` -docker-machine env hydra -``` - -### Create a Docker network - -``` -docker network create hydra -``` - -### Start Zookeeper - -Hydra uses Zookeeper as a coordination service to automate bootstrapping and joining a cluster. - -It is also used by Kafka and the Schema Registry. - -Since all services depend on Zookeeper being up, so we will start that first. 
It is not always -needed to do this, but doing so avoids race conditions tht may happen across the different containers. - -``` -docker-compose up -d zookeeper -``` - -### Start Hydra - -``` -docker-compose up hydra -``` - -> You can also start each service separately. - -That should do it! +### Steps for building locally +- Create a .env from the example.env template. +- Update the .env file with your AWS Credentials. Those can be gathered in AWS Identity Center. +- Use the Makefile to build and deploy Hydra Publish into a local Docker container. # Checking if Hydra is Running diff --git a/avro/src/main/scala/hydra/avro/registry/RedisSchemaRegistryClient.scala b/avro/src/main/scala/hydra/avro/registry/RedisSchemaRegistryClient.scala index 93b18776f..115db9a4f 100644 --- a/avro/src/main/scala/hydra/avro/registry/RedisSchemaRegistryClient.scala +++ b/avro/src/main/scala/hydra/avro/registry/RedisSchemaRegistryClient.scala @@ -639,5 +639,4 @@ class RedisSchemaRegistryClient(restService: RestService, override def testCompatibility(s: String, schema: Schema): Boolean = { restService.testCompatibility(schema.toString(), s, "latest") } - } diff --git a/example.env b/example.env new file mode 100644 index 000000000..3c4e11b11 --- /dev/null +++ b/example.env @@ -0,0 +1,35 @@ +LOG_DIR=log +LOG_LEVEL=INFO +AKKA_LOG_LEVEL=DEBUG +HYDRA_V2_METADATA_CONTACT=#test-messages-thread +HYDRA_REPLICATION_FACTOR=1 +KAFKA_BROKER_MIN_INSYNC_REPLICAS=1 +HYDRA_MIN_INSYNC_REPLICAS=1 +HYDRA_V2_METADATA_CONSUMER_GROUP=v2MetadataConsumer +HYDRA_V2_CREATE_TOPICS_ENABLED=true +HYDRA_V2_METADATA_CREATE_ON_STARTUP=true +CONTAINER_HTTP_PORT=8088 + +# Below are the environment variables that you will need for each of the respective sources of your Kafka data. Uncomment to use. +## Get these credentials from AWS Identity Center under the exp_adapt_dvs_set role. 
+AWS_ACCESS_KEY_ID= +AWS_SECRET_ACCESS_KEY= +AWS_SESSION_TOKEN= + +HYDRA_SCHEMA_REGISTRY_URL=https://dvs-dev-schema-registry.eplur-staging.vnerd.com:8081 +HYDRA_KAFKA_PRODUCER_BOOTSTRAP_SERVERS=b-1.isdvsdevblueuswest2.9ofx2d.c14.kafka.us-west-2.amazonaws.com:9098,b-2.isdvsdevblueuswest2.9ofx2d.c14.kafka.us-west-2.amazonaws.com:9098,b-3.isdvsdevblueuswest2.9ofx2d.c14.kafka.us-west-2.amazonaws.com:9098 +HYDRA_KAFKA_SECURITY_PROTOCOL=SASL_SSL +HYDRA_KAFKA_SASL_MECHANISM=AWS_MSK_IAM +HYDRA_KAFKA_SASL_JAAS_CONFIG=software.amazon.msk.auth.iam.IAMLoginModule required; +HYDRA_KAFKA_SASL_CLIENT_CALLBACK_HANDLER_CLASS=software.amazon.msk.auth.iam.IAMClientCallbackHandler + +AKKA_HTTP_SERVER_REQUEST_TIMEOUT=35s +INGEST_TIMEOUT=35000 millis + +HYDRA_SCHEMA_REGISTRY_USE_REDIS=false +HYDRA_SCHEMA_REGISTRY_REDIS_HOST=localhost +HYDRA_SCHEMA_REGISTRY_REDIS_PORT=6379 +HYDRA_SCHEMA_REGISTRY_REDIS_SSL=false +HYDRA_SCHEMA_REGISTRY_REDIS_ID_CACHE_TTL=10080 +HYDRA_SCHEMA_REGISTRY_REDIS_SCHEMA_CACHE_TTL=11080 +HYDRA_SCHEMA_REGISTRY_REDIS_VERSION_CACHE_TTL=12080 \ No newline at end of file diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/programs/CreateTopicProgram.scala b/ingestors/kafka/src/main/scala/hydra/kafka/programs/CreateTopicProgram.scala index 9a8bc98ce..b1b66afbe 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/programs/CreateTopicProgram.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/programs/CreateTopicProgram.scala @@ -12,6 +12,7 @@ import org.apache.avro.Schema import retry.syntax.all._ import retry.{RetryDetails, RetryPolicy, _} import cats.implicits._ +import hydra.common.logging.LoggingAdapter import hydra.kafka.model.TopicMetadataV2Request.Subject import scala.language.higherKinds @@ -25,7 +26,7 @@ final class CreateTopicProgram[F[_]: Bracket[*[_], Throwable]: Sleep: Logger] pr v2MetadataTopicName: Subject, metadataAlgebra: MetadataAlgebra[F], validator: KeyAndValueSchemaV2Validator[F] - ) (implicit eff: Sync[F]){ + ) (implicit eff: Sync[F]) 
extends LoggingAdapter { private def onFailure(resourceTried: String): (Throwable, RetryDetails) => F[Unit] = { (error, retryDetails) => @@ -34,6 +35,10 @@ final class CreateTopicProgram[F[_]: Bracket[*[_], Throwable]: Sleep: Logger] pr ) } + private def logInfo(message: String) = { + log.info(s"CreateTopicProgram: $message") + } + private def registerSchema( subject: Subject, schema: Schema, @@ -49,8 +54,14 @@ final class CreateTopicProgram[F[_]: Bracket[*[_], Throwable]: Sleep: Logger] pr schemaRegistry.registerSchema(suffixedSubject, schema) *> schemaRegistry.getVersion(suffixedSubject, schema).map { newSchemaVersion => - if (previousSchemaVersion.contains(newSchemaVersion)) None - else Some(newSchemaVersion) + if (previousSchemaVersion.contains(newSchemaVersion)) { + logInfo(s"Schema for the topic $subject with the version $newSchemaVersion already exists.") + None + } + else { + logInfo(s"Schema for the topic $subject was created. The version is $newSchemaVersion.") + Some(newSchemaVersion) + } } } }.retryingOnAllErrors(retryPolicy, onFailure("RegisterSchema")) @@ -58,6 +69,7 @@ final class CreateTopicProgram[F[_]: Bracket[*[_], Throwable]: Sleep: Logger] pr .makeCase(registerSchema)((newVersionMaybe, exitCase) => (exitCase, newVersionMaybe) match { case (ExitCase.Error(_), Some(newVersion)) => + logInfo(s"Delete a schema for the topic $subject.") schemaRegistry.deleteSchemaOfVersion(suffixedSubject, newVersion) case _ => Bracket[F, Throwable].unit } @@ -95,6 +107,7 @@ final class CreateTopicProgram[F[_]: Bracket[*[_], Throwable]: Sleep: Logger] pr Resource .makeCase(createTopic)({ case (Some(_), ExitCase.Error(_)) => + logInfo(s"Clean a resource for the topic $subject.") kafkaAdmin.deleteTopic(subject.value) case _ => Bracket[F, Throwable].unit })