diff --git a/.env.dev b/.env.dev index d7a130c2..dfe82647 100644 --- a/.env.dev +++ b/.env.dev @@ -123,6 +123,10 @@ KAFKA_CPU_LIMIT=0.8 KAFKA_CPU_RESERVE=0.05 KAFKA_MEMORY_LIMIT=3G KAFKA_MEMORY_RESERVE=500M +KAFKA_PARTITIONS=3 +KAFKA_INSTANCES=1 +# Topics should be comma separated +KAFKA_TOPICS=2xx,reprocess,3xx # Kafdrop diff --git a/.env.local b/.env.local index 336a7f86..d5614f23 100644 --- a/.env.local +++ b/.env.local @@ -74,3 +74,14 @@ JS_REPORT_LICENSE_KEY= # MAKE SURE YOU HAVE RUN 'set-permissions.sh' SCRIPT BEFORE AND AFTER RUNNING JS REPORT JS_REPORT_DEV_MOUNT=false JS_REPORT_PACKAGE_PATH= + +# Message Bus - Kafka + +KAFKA_PARTITIONS=1 +KAFKA_INSTANCES=1 +# Topics should be comma separated +KAFKA_TOPICS=2xx,reprocess + +# Kafdrop + + diff --git a/.env.remote b/.env.remote index a8491d16..1b920be9 100644 --- a/.env.remote +++ b/.env.remote @@ -41,3 +41,10 @@ POSTGRES_REPLICA_SET=postgres-1:5432,postgres-2:5432,postgres-3:5432 MONGO_SET_COUNT=3 OPENHIM_MONGO_URL=mongodb://mongo-1:27017,mongo-2:27017,mongo-3:27017/openhim?replicaSet=mongo-set OPENHIM_MONGO_ATNAURL=mongodb://mongo-1:27017,mongo-2:27017,mongo-3:27017/openhim?replicaSet=mongo-set + +# Message Bus - Kafka + +KAFKA_PARTITIONS=3 +KAFKA_INSTANCES=3 +# Topics should be comma separated +KAFKA_TOPICS=2xx,reprocess diff --git a/message-bus-kafka/docker-compose.await-helper.yml b/message-bus-kafka/docker-compose.await-helper.yml new file mode 100644 index 00000000..674b4871 --- /dev/null +++ b/message-bus-kafka/docker-compose.await-helper.yml @@ -0,0 +1,10 @@ +version: '3.9' + +services: + await-helper: + image: jembi/await-helper:1.0.1 + deploy: + replicas: 1 + restart_policy: + condition: none + command: '-k http://kafdrop:9013' diff --git a/message-bus-kafka/importer/docker-compose.config.yml b/message-bus-kafka/importer/docker-compose.config.yml new file mode 100644 index 00000000..c3e2fed4 --- /dev/null +++ b/message-bus-kafka/importer/docker-compose.config.yml @@ -0,0 +1,24 @@ +version: '3.9' + +services: 
+ message-bus-kafka-config-importer: + image: jembi/instantohie-config-importer + deploy: + restart_policy: + condition: none + environment: + KAFDROP_HOST: kafdrop + KAFDROP_PORT: 9013 + KAFKA_PARTITIONS: ${KAFKA_PARTITIONS:-3} + KAFKA_TOPICS: ${KAFKA_TOPICS} + command: sh -c "wait-on -t 60000 http-get://kafdrop:9013/topic && node /kafka.js && echo 'success';" + configs: + - source: kafka.js + target: /kafka.js + +configs: + kafka.js: + file: ./kafka.js + name: kafka-config.js-${kafka_config_DIGEST:?err} + labels: + name: ethiopia diff --git a/message-bus-kafka/importer/kafka.js b/message-bus-kafka/importer/kafka.js new file mode 100644 index 00000000..6dfea0a9 --- /dev/null +++ b/message-bus-kafka/importer/kafka.js @@ -0,0 +1,41 @@ +'use strict' + +const http = require('http') + +const KAFDROP_HOST = process.env.KAFDROP_HOST || 'kafdrop' +const KAFDROP_PORT = process.env.KAFDROP_PORT || '9013' +const KAFKA_PARTITIONS = process.env.KAFKA_PARTITIONS || 3 +const KAFKA_TOPICS = process.env.KAFKA_TOPICS + +const createKafkaTopic = topic => { + const options = { + protocol: 'http:', + hostname: KAFDROP_HOST, + path: '/topic', + port: KAFDROP_PORT, + method: 'POST', + headers: { + 'Content-Type': 'application/x-www-form-urlencoded' + } + } + + const req = http.request(options, res => { + if (res.statusCode !== 200) { + throw Error(`Failed to create topic - ${topic}`) + } + console.log(`Created topic - ${topic}`) + }) + req.write(`name=${topic}&partitionsNumber=${KAFKA_PARTITIONS}&replicationFactor=1`) + req.end() +} + +console.log('Creating kafka topics......................'); + +(() => { + if (KAFKA_TOPICS) { + KAFKA_TOPICS.split(',').forEach(topic => createKafkaTopic(topic)) + } else { + console.log('Topics not created: KAFKA_TOPICS variable invalid') + process.exit(1) + } +})(); diff --git a/message-bus-kafka/package-metadata.json b/message-bus-kafka/package-metadata.json index 07fb2298..20d1283b 100644 --- a/message-bus-kafka/package-metadata.json +++ 
b/message-bus-kafka/package-metadata.json @@ -8,6 +8,8 @@ "KAFKA_CPU_RESERVE": "0.05", "KAFKA_MEMORY_LIMIT": "3G", "KAFKA_MEMORY_RESERVE": "500M", + "KAFKA_PARTITIONS": "3", + "KAFKA_TOPICS": "", "KAFDROP_CPU_LIMIT": "0.8", "KAFDROP_CPU_RESERVE": "0.05", "KAFDROP_MEMORY_LIMIT": "3G", diff --git a/message-bus-kafka/swarm.sh b/message-bus-kafka/swarm.sh old mode 100644 new mode 100755 index 71db3192..4635e997 --- a/message-bus-kafka/swarm.sh +++ b/message-bus-kafka/swarm.sh @@ -1,6 +1,6 @@ #!/bin/bash -statefulNodes=${STATEFUL_NODES:-"cluster"} +STATEFUL_NODES=${STATEFUL_NODES:-"cluster"} COMPOSE_FILE_PATH=$( cd "$(dirname "${BASH_SOURCE[0]}")" || exit @@ -13,7 +13,10 @@ ROOT_PATH="${COMPOSE_FILE_PATH}/.." . "${ROOT_PATH}/utils/docker-utils.sh" . "${ROOT_PATH}/utils/log.sh" -if [[ $statefulNodes == "cluster" ]]; then +readonly KAFKA_INSTANCES=${KAFKA_INSTANCES:-1} +export KAFKA_INSTANCES + +if [[ $STATEFUL_NODES == "cluster" ]]; then log info "Running Message Bus Kafka package in Cluster node mode" kafkaClusterComposeParam="-c ${COMPOSE_FILE_PATH}/docker-compose.cluster.yml" else @@ -30,32 +33,46 @@ else fi if [[ $1 == "init" ]] || [[ $1 == "up" ]]; then + config::set_config_digests "${COMPOSE_FILE_PATH}"/importer/docker-compose.config.yml + try "docker stack deploy -c ${COMPOSE_FILE_PATH}/docker-compose.yml $kafkaClusterComposeParam $kafkaDevComposeParam instant" "Failed to deploy Message Bus Kafka" + + config::await_service_running "kafka" "${COMPOSE_FILE_PATH}"/docker-compose.await-helper.yml "${KAFKA_INSTANCES}" + + try "docker stack deploy -c ${COMPOSE_FILE_PATH}/importer/docker-compose.config.yml instant" "Failed to deploy Message Bus Kafka" + + config::remove_stale_service_configs "${COMPOSE_FILE_PATH}"/importer/docker-compose.config.yml "ethiopia" + config::remove_config_importer message-bus-kafka-config-importer elif [[ $1 == "down" ]]; then try "docker service scale instant_zookeeper-1=0 instant_kafdrop=0" "Failed to scale down zookeeper and kafdrop" # You 
cannot scale a global service so we have to remove it try "docker service rm instant_kafka" "Failed to remove kafka" - if [[ $statefulNodes == "cluster" ]]; then + if [[ $STATEFUL_NODES == "cluster" ]]; then try "docker service scale instant_zookeeper-2=0" "Failed to scale down zookeeper cluster" try "docker service scale instant_zookeeper-3=0" "Failed to scale down zookeeper cluster" fi elif [[ $1 == "destroy" ]]; then - try "docker service rm instant_zookeeper-1 instant_kafka instant_kafdrop" "Failed to destroy kafka" - log info "Allow services to shut down before deleting volumes" - config::await_service_removed instant_zookeeper-1 - config::await_service_removed instant_kafka - config::await_service_removed instant_kafdrop + docker::service_destroy zookeeper-1 + docker::service_destroy kafka + docker::service_destroy kafdrop + + docker::try_remove_volume zookeeper-1-volume + docker::try_remove_volume kafka-volume - try "docker volume rm instant_kafka-volume" "Failed to remove kafka volume" - try "docker volume rm instant_zookeeper-1-volume" "Failed to remove zookeeper volume" + if [[ $STATEFUL_NODES == "cluster" ]]; then + docker::service_destroy zookeeper-2 + docker::service_destroy zookeeper-3 - if [[ $statefulNodes == "cluster" ]]; then - try "docker service rm instant_zookeeper-2" "Failed to remove zookeeper cluster volumes" - try "docker service rm instant_zookeeper-3" "Failed to remove zookeeper cluster volumes" + docker::try_remove_volume zookeeper-2-volume + docker::try_remove_volume zookeeper-3-volume log notice "Volumes are only deleted on the host on which the command is run. Kafka volumes on other nodes are not deleted" fi + + if ! docker service rm instant_message-bus-kafka-config-importer; then + log warn "message-bus-kafka-config-importer not removed... 
it's possible the service has already been removed" + fi else log error "Valid options are: init, up, down, or destroy" fi diff --git a/utils/config-utils.sh b/utils/config-utils.sh index a327cffd..193139a7 100755 --- a/utils/config-utils.sh +++ b/utils/config-utils.sh @@ -164,6 +164,12 @@ config::remove_config_importer() { local -r start_time=$(date +%s) local config_importer_state + + if [[ -z $(docker service ps instant_"$config_importer_service_name") ]]; then + log info "instant_$config_importer_service_name service cannot be removed as it does not exist!" + exit 0 + fi + config_importer_state=$(docker service ps instant_"$config_importer_service_name" --format "{{.CurrentState}}") until [[ $config_importer_state == *"Complete"* ]]; do config::timeout_check "$start_time" "$config_importer_service_name to run" "$exit_time" "$warning_time" @@ -192,6 +198,7 @@ config::await_service_removed() { config::timeout_check "$start_time" "${SERVICE_NAME} to be removed" sleep 1 done + log info "Service $SERVICE_NAME successfully removed" } # Waits for the provided service to join the network diff --git a/utils/docker-utils.sh b/utils/docker-utils.sh index 9979a76c..b86f6990 100644 --- a/utils/docker-utils.sh +++ b/utils/docker-utils.sh @@ -107,7 +107,7 @@ docker::try_remove_volume() { local start_time start_time=$(date +%s) until [[ -n "$(docker volume rm instant_"${VOLUME_NAME}" 2>/dev/null)" ]]; do - config::timeout_check "${start_time}" "${VOLUME_NAME} to be removed" "10" "20" + config::timeout_check "${start_time}" "${VOLUME_NAME} to be removed" "20" "10" sleep 1 done overwrite "Waiting for volume ${VOLUME_NAME} to be removed... Done"