
ADAPT-1642 Throw an exception if schema wasn’t created #878

Open · wants to merge 5 commits into base: master
15 changes: 15 additions & 0 deletions Dockerfile.new
@@ -0,0 +1,15 @@
FROM jelastic/jetty:9.4.49-openjdk-1.8.0_352

USER root

ENV JAVA_OPTS="-Xmx2G"

ENV CONTAINER_HTTP_PORT="8088"

RUN mkdir -p /etc/hydra /var/log/hydra /ps-publish

EXPOSE 8088

COPY ps-publish/hydra-ingest* /ps-publish

ENTRYPOINT ["/ps-publish/bin/hydra-ingest"]
43 changes: 43 additions & 0 deletions Makefile
@@ -0,0 +1,43 @@
# Makefile for building and running Docker containers

# Define variables
DOCKER_IMAGE_NAME = hydra-publish-test
DOCKERFILE = Dockerfile.new
PORT_MAPPING = -p 8088:8088
ENV_FILE = .env

# Target to build the Docker image
build:
mkdir -p ps-publish
sbt clean compile
sbt universal:packageBin
unzip ingest/target/universal/*.zip -d ps-publish
docker build -t $(DOCKER_IMAGE_NAME) -f $(DOCKERFILE) .

# Target to run the Docker container
run:
docker run -d $(PORT_MAPPING) --env-file $(ENV_FILE) --name $(DOCKER_IMAGE_NAME) $(DOCKER_IMAGE_NAME)

# Target to stop and remove the Docker container
stop:
docker stop $(DOCKER_IMAGE_NAME)
docker rm $(DOCKER_IMAGE_NAME)

# Target to clean up all containers and images
clean:
docker stop $(DOCKER_IMAGE_NAME) || true
docker rm $(DOCKER_IMAGE_NAME) || true
docker rmi $(DOCKER_IMAGE_NAME) || true
rm -rf ps-publish

# Target to show available targets
help:
@echo "Available targets:"
@echo " build - Build the Docker image"
@echo " run - Run the Docker container"
@echo " stop - Stop and remove the Docker container"
@echo " clean - Clean up all containers and images"
@echo " help - Show this help message"

# By default, show the help message
.DEFAULT_GOAL := help
52 changes: 6 additions & 46 deletions README.md
@@ -31,53 +31,13 @@ sbt clean compile

## Docker

### Services needed to run Hydra
- Kafka 2.0.0
- Confluent Schema Registry 5.0.0
- Zookeeper (3.x +)
### Development Environment for Testing
We have a development MSK and Schema Registry Cluster running in the eplur-staging AWS account. Access to this cluster is granted via IAM to the `exp_adapt_dvs_set` role.

This documentation walks through setting up the core basic components of Hydra.

### Create a VirtualBox instance

```
docker-machine create --driver virtualbox --virtualbox-memory 6000 hydra
```

### Configure Terminal to attach to the new machine

```
docker-machine env hydra
```

### Create a Docker network

```
docker network create hydra
```

### Start Zookeeper

Hydra uses Zookeeper as a coordination service to automate bootstrapping and joining a cluster.

It is also used by Kafka and the Schema Registry.

Since all services depend on Zookeeper being up, we will start it first. This is not always
necessary, but doing so avoids race conditions that may occur across the different containers.

```
docker-compose up -d zookeeper
```

### Start Hydra

```
docker-compose up hydra
```

> You can also start each service separately.

That should do it!
### Steps for building locally
- Create a `.env` file from the `example.env` template.
- Update the `.env` file with your AWS credentials, which can be obtained from AWS Identity Center.
- Use the Makefile to build and deploy Hydra Publish into a local Docker container.
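A blank credential in `.env` is the most common setup mistake in the steps above. As a minimal sketch (a hypothetical helper, not part of this repo), the following parses a `KEY=VALUE` env file and reports which AWS credential variables are still unset before you run `make build`:

```python
# Hypothetical pre-flight check (not part of this repo): flag AWS
# credential variables that are missing or left blank in a .env-style file.
REQUIRED = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_SESSION_TOKEN")

def missing_env_keys(text: str, required=REQUIRED) -> list:
    """Parse simple KEY=VALUE lines and return the required keys with no value."""
    values = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip()
    return [k for k in required if not values.get(k)]

print(missing_env_keys("AWS_ACCESS_KEY_ID=abc\nAWS_SECRET_ACCESS_KEY="))
```

Running this against your `.env` before `make build` catches credentials that were copied from Identity Center with a value left blank.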

# Checking if Hydra is Running

@@ -639,5 +639,4 @@ class RedisSchemaRegistryClient(restService: RestService,
override def testCompatibility(s: String, schema: Schema): Boolean = {
restService.testCompatibility(schema.toString(), s, "latest")
}

}
35 changes: 35 additions & 0 deletions example.env
@@ -0,0 +1,35 @@
LOG_DIR=log
LOG_LEVEL=INFO
AKKA_LOG_LEVEL=DEBUG
HYDRA_V2_METADATA_CONTACT=#test-messages-thread
HYDRA_REPLICATION_FACTOR=1
KAFKA_BROKER_MIN_INSYNC_REPLICAS=1
HYDRA_MIN_INSYNC_REPLICAS=1
HYDRA_V2_METADATA_CONSUMER_GROUP=v2MetadataConsumer
HYDRA_V2_CREATE_TOPICS_ENABLED=true
HYDRA_V2_METADATA_CREATE_ON_STARTUP=true
CONTAINER_HTTP_PORT=8088

# Below are the environment variables you will need for each respective source of your Kafka data. Uncomment to use.
## Get these credentials from AWS Identity Center under the exp_adapt_dvs_set role.
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_SESSION_TOKEN=

HYDRA_SCHEMA_REGISTRY_URL=https://dvs-dev-schema-registry.eplur-staging.vnerd.com:8081
HYDRA_KAFKA_PRODUCER_BOOTSTRAP_SERVERS=b-1.isdvsdevblueuswest2.9ofx2d.c14.kafka.us-west-2.amazonaws.com:9098,b-2.isdvsdevblueuswest2.9ofx2d.c14.kafka.us-west-2.amazonaws.com:9098,b-3.isdvsdevblueuswest2.9ofx2d.c14.kafka.us-west-2.amazonaws.com:9098
HYDRA_KAFKA_SECURITY_PROTOCOL=SASL_SSL
HYDRA_KAFKA_SASL_MECHANISM=AWS_MSK_IAM
HYDRA_KAFKA_SASL_JAAS_CONFIG=software.amazon.msk.auth.iam.IAMLoginModule required;
HYDRA_KAFKA_SASL_CLIENT_CALLBACK_HANDLER_CLASS=software.amazon.msk.auth.iam.IAMClientCallbackHandler

AKKA_HTTP_SERVER_REQUEST_TIMEOUT=35s
INGEST_TIMEOUT=35000 millis

HYDRA_SCHEMA_REGISTRY_USE_REDIS=false
HYDRA_SCHEMA_REGISTRY_REDIS_HOST=localhost
HYDRA_SCHEMA_REGISTRY_REDIS_PORT=6379
HYDRA_SCHEMA_REGISTRY_REDIS_SSL=false
HYDRA_SCHEMA_REGISTRY_REDIS_ID_CACHE_TTL=10080
HYDRA_SCHEMA_REGISTRY_REDIS_SCHEMA_CACHE_TTL=11080
HYDRA_SCHEMA_REGISTRY_REDIS_VERSION_CACHE_TTL=12080
@@ -12,6 +12,7 @@ import org.apache.avro.Schema
import retry.syntax.all._
import retry.{RetryDetails, RetryPolicy, _}
import cats.implicits._
import hydra.common.logging.LoggingAdapter
import hydra.kafka.model.TopicMetadataV2Request.Subject

import scala.language.higherKinds
@@ -25,7 +26,7 @@ final class CreateTopicProgram[F[_]: Bracket[*[_], Throwable]: Sleep: Logger] pr
v2MetadataTopicName: Subject,
metadataAlgebra: MetadataAlgebra[F],
validator: KeyAndValueSchemaV2Validator[F]
) (implicit eff: Sync[F]){
) (implicit eff: Sync[F]) extends LoggingAdapter {

private def onFailure(resourceTried: String): (Throwable, RetryDetails) => F[Unit] = {
(error, retryDetails) =>
@@ -34,6 +35,10 @@ final class CreateTopicProgram[F[_]: Bracket[*[_], Throwable]: Sleep: Logger] pr
)
}

private def logInfo(message: String): Unit =
log.info(s"CreateTopicProgram: $message")

private def registerSchema(
subject: Subject,
schema: Schema,
@@ -49,15 +54,22 @@ final class CreateTopicProgram[F[_]: Bracket[*[_], Throwable]: Sleep: Logger] pr
schemaRegistry.registerSchema(suffixedSubject, schema) *>
schemaRegistry.getVersion(suffixedSubject, schema).map {
newSchemaVersion =>
if (previousSchemaVersion.contains(newSchemaVersion)) None
else Some(newSchemaVersion)
if (previousSchemaVersion.contains(newSchemaVersion)) {
logInfo(s"Schema for topic $subject already exists at version $newSchemaVersion.")
None
} else {
logInfo(s"Schema for topic $subject was created at version $newSchemaVersion.")
Some(newSchemaVersion)
}
}
}
}.retryingOnAllErrors(retryPolicy, onFailure("RegisterSchema"))
Resource
.makeCase(registerSchema)((newVersionMaybe, exitCase) =>
(exitCase, newVersionMaybe) match {
case (ExitCase.Error(_), Some(newVersion)) =>
logInfo(s"Deleting schema version $newVersion for topic $subject.")
schemaRegistry.deleteSchemaOfVersion(suffixedSubject, newVersion)
case _ => Bracket[F, Throwable].unit
}
@@ -95,6 +107,7 @@ final class CreateTopicProgram[F[_]: Bracket[*[_], Throwable]: Sleep: Logger] pr
Resource
.makeCase(createTopic)({
case (Some(_), ExitCase.Error(_)) =>
logInfo(s"Deleting topic $subject to clean up after a failure.")
kafkaAdmin.deleteTopic(subject.value)
case _ => Bracket[F, Throwable].unit
})