Merge pull request #108 from jembi/PLAT-346-kafak-topic-partitioning
Plat 346 kafka topic partitioning
tumbledwyer authored Jul 26, 2022
2 parents 27ec7e0 + 3123910 commit 26ce99f
Showing 10 changed files with 137 additions and 14 deletions.
4 changes: 4 additions & 0 deletions .env.dev
@@ -123,6 +123,10 @@ KAFKA_CPU_LIMIT=0.8
KAFKA_CPU_RESERVE=0.05
KAFKA_MEMORY_LIMIT=3G
KAFKA_MEMORY_RESERVE=500M
KAFKA_PARTITIONS=3
KAFKA_INSTANCES=1
# Topics should be comma separated
KAFKA_TOPICS=2xx,reprocess,3xx

# Kafdrop

11 changes: 11 additions & 0 deletions .env.local
@@ -74,3 +74,14 @@ JS_REPORT_LICENSE_KEY=
# MAKE SURE YOU HAVE RUN 'set-permissions.sh' SCRIPT BEFORE AND AFTER RUNNING JS REPORT
JS_REPORT_DEV_MOUNT=false
JS_REPORT_PACKAGE_PATH=

# Message Bus - Kafka

KAFKA_PARTITIONS=1
KAFKA_INSTANCES=1
# Topics should be comma separated
KAFKA_TOPICS=2xx,reprocess

# Kafdrop


7 changes: 7 additions & 0 deletions .env.remote
@@ -41,3 +41,10 @@ POSTGRES_REPLICA_SET=postgres-1:5432,postgres-2:5432,postgres-3:5432
MONGO_SET_COUNT=3
OPENHIM_MONGO_URL=mongodb://mongo-1:27017,mongo-2:27017,mongo-3:27017/openhim?replicaSet=mongo-set
OPENHIM_MONGO_ATNAURL=mongodb://mongo-1:27017,mongo-2:27017,mongo-3:27017/openhim?replicaSet=mongo-set

# Message Bus - Kafka

KAFKA_PARTITIONS=3
KAFKA_INSTANCES=3
# Topics should be comma separated
KAFKA_TOPICS=2xx,reprocess
10 changes: 10 additions & 0 deletions message-bus-kafka/docker-compose.await-helper.yml
@@ -0,0 +1,10 @@
version: '3.9'

services:
  await-helper:
    image: jembi/await-helper:1.0.1
    deploy:
      replicas: 1
      restart_policy:
        condition: none
    command: '-k http://kafdrop:9013'
24 changes: 24 additions & 0 deletions message-bus-kafka/importer/docker-compose.config.yml
@@ -0,0 +1,24 @@
version: '3.9'

services:
  message-bus-kafka-config-importer:
    image: jembi/instantohie-config-importer
    deploy:
      restart_policy:
        condition: none
    environment:
      KAFDROP_HOST: kafdrop
      KAFDROP_PORT: 9013
      KAFKA_PARTITIONS: ${KAFKA_PARTITIONS:-3}
      KAFKA_TOPICS: ${KAFKA_TOPICS}
    command: sh -c "wait-on -t 60000 http-get://kafdrop:9013/topic && node /kafka.js && echo 'success';"
    configs:
      - source: kafka.js
        target: /kafka.js

configs:
  kafka.js:
    file: ./kafka.js
    name: kafka-config.js-${kafka_config_DIGEST:?err}
    labels:
      name: ethiopia
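
Note that this compose file cannot be deployed on its own: the config name uses ${kafka_config_DIGEST:?err}, so the digest variable must be populated first (swarm.sh does this via config::set_config_digests). A rough manual equivalent, assuming a digest value is supplied by hand rather than computed:

export kafka_config_DIGEST=<digest>   # hypothetical value; normally computed by config::set_config_digests
docker stack deploy -c message-bus-kafka/importer/docker-compose.config.yml instant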
41 changes: 41 additions & 0 deletions message-bus-kafka/importer/kafka.js
@@ -0,0 +1,41 @@
'use strict'

const http = require('http')

const KAFDROP_HOST = process.env.KAFDROP_HOST || 'kafdrop'
const KAFDROP_PORT = process.env.KAFDROP_PORT || '9013'
const KAFKA_PARTITIONS = process.env.KAFKA_PARTITIONS || 3
const KAFKA_TOPICS = process.env.KAFKA_TOPICS

const createKafkaTopic = topic => {
  const options = {
    protocol: 'http:',
    hostname: KAFDROP_HOST,
    path: '/topic',
    port: KAFDROP_PORT,
    method: 'POST',
    headers: {
      'Content-Type': 'application/x-www-form-urlencoded'
    }
  }

  const req = http.request(options, res => {
    if (res.statusCode != 200) {
      throw Error(`Failed to create topic - ${topic}`)
    }
    console.log(`Created topic - ${topic}`)
  })
  req.write(`name=${topic}&partitionsNumber=${KAFKA_PARTITIONS}&replicationFactor=1`)
  req.end()
}

console.log('Creating kafka topics......................');

(() => {
  if (KAFKA_TOPICS) {
    KAFKA_TOPICS.split(',').forEach(topic => createKafkaTopic(topic))
  } else {
    console.log('Topics not created: KAFKA_TOPICS variable invalid')
    process.exit(1)
  }
})();
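
For reference, the request this script sends can be reproduced by hand against Kafdrop's topic endpoint; a minimal sketch, assuming Kafdrop is reachable at kafdrop:9013 from within the stack network and using the 2xx topic as an example:

curl -X POST http://kafdrop:9013/topic \
  -H 'Content-Type: application/x-www-form-urlencoded' \
  --data 'name=2xx&partitionsNumber=3&replicationFactor=1'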
2 changes: 2 additions & 0 deletions message-bus-kafka/package-metadata.json
@@ -8,6 +8,8 @@
"KAFKA_CPU_RESERVE": "0.05",
"KAFKA_MEMORY_LIMIT": "3G",
"KAFKA_MEMORY_RESERVE": "500M",
"KAFKA_PARTITIONS": "3",
"KAFKA_TOPICS": "",
"KAFDROP_CPU_LIMIT": "0.8",
"KAFDROP_CPU_RESERVE": "0.05",
"KAFDROP_MEMORY_LIMIT": "3G",
43 changes: 30 additions & 13 deletions message-bus-kafka/swarm.sh
100644 → 100755
@@ -1,6 +1,6 @@
#!/bin/bash

statefulNodes=${STATEFUL_NODES:-"cluster"}
STATEFUL_NODES=${STATEFUL_NODES:-"cluster"}

COMPOSE_FILE_PATH=$(
cd "$(dirname "${BASH_SOURCE[0]}")" || exit
@@ -13,7 +13,10 @@ ROOT_PATH="${COMPOSE_FILE_PATH}/.."
. "${ROOT_PATH}/utils/docker-utils.sh"
. "${ROOT_PATH}/utils/log.sh"

if [[ $statefulNodes == "cluster" ]]; then
readonly KAFKA_INSTANCES=${KAFKA_INSTANCES:-1}
export KAFKA_INSTANCES

if [[ $STATEFUL_NODES == "cluster" ]]; then
log info "Running Message Bus Kafka package in Cluster node mode"
kafkaClusterComposeParam="-c ${COMPOSE_FILE_PATH}/docker-compose.cluster.yml"
else
@@ -30,32 +33,46 @@ else
fi

if [[ $1 == "init" ]] || [[ $1 == "up" ]]; then
config::set_config_digests "${COMPOSE_FILE_PATH}"/importer/docker-compose.config.yml

try "docker stack deploy -c ${COMPOSE_FILE_PATH}/docker-compose.yml $kafkaClusterComposeParam $kafkaDevComposeParam instant" "Failed to deploy Message Bus Kafka"

config::await_service_running "kafka" "${COMPOSE_FILE_PATH}"/docker-compose.await-helper.yml "${KAFKA_INSTANCES}"

try "docker stack deploy -c ${COMPOSE_FILE_PATH}/importer/docker-compose.config.yml instant" "Failed to deploy Message Bus Kafka"

config::remove_stale_service_configs "${COMPOSE_FILE_PATH}"/importer/docker-compose.config.yml "ethiopia"
config::remove_config_importer message-bus-kafka-config-importer
elif [[ $1 == "down" ]]; then
try "docker service scale instant_zookeeper-1=0 instant_kafdrop=0" "Failed to scale down zookeeper and kafdrop"
# You cannot scale a global service so we have to remove it
try "docker service rm instant_kafka" "Failed to remove kafka"
if [[ $statefulNodes == "cluster" ]]; then
if [[ $STATEFUL_NODES == "cluster" ]]; then
try "docker service scale instant_zookeeper-2=0" "Failed to scale down zookeeper cluster"
try "docker service scale instant_zookeeper-3=0" "Failed to scale down zookeeper cluster"
fi
elif [[ $1 == "destroy" ]]; then
try "docker service rm instant_zookeeper-1 instant_kafka instant_kafdrop" "Failed to destroy kafka"

log info "Allow services to shut down before deleting volumes"

config::await_service_removed instant_zookeeper-1
config::await_service_removed instant_kafka
config::await_service_removed instant_kafdrop
docker::service_destroy zookeeper-1
docker::service_destroy kafka
docker::service_destroy kafdrop

docker::try_remove_volume zookeeper-1-volume
docker::try_remove_volume kafka-volume

try "docker volume rm instant_kafka-volume" "Failed to remove kafka volume"
try "docker volume rm instant_zookeeper-1-volume" "Failed to remove zookeeper volume"
if [[ $STATEFUL_NODES == "cluster" ]]; then
docker::service_destroy zookeeper-2
docker::service_destroy zookeeper-3

if [[ $statefulNodes == "cluster" ]]; then
try "docker service rm instant_zookeeper-2" "Failed to remove zookeeper cluster volumes"
try "docker service rm instant_zookeeper-3" "Failed to remove zookeeper cluster volumes"
docker::try_remove_volume zookeeper-2-volume
docker::try_remove_volume zookeeper-3-volume
log notice "Volumes are only deleted on the host on which the command is run. Kafka volumes on other nodes are not deleted"
fi

if ! docker service rm instant_message-bus-kafka-config-importer; then
log warn "message-bus-kafka-config-importer not removed... it's possible the service has already been removed"
fi
else
log error "Valid options are: init, up, down, or destroy"
fi
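
With these changes in place, the package lifecycle stays the same from the operator's point of view; a hedged example, assuming the script is run from the repository root and the variables below are not already supplied by the active .env file:

export KAFKA_INSTANCES=1
export KAFKA_PARTITIONS=3
export KAFKA_TOPICS=2xx,reprocess
./message-bus-kafka/swarm.sh init      # deploy Kafka, wait for it, then run the topic importer
./message-bus-kafka/swarm.sh destroy   # remove the services and delete local volumes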
7 changes: 7 additions & 0 deletions utils/config-utils.sh
@@ -164,6 +164,12 @@ config::remove_config_importer() {
local -r start_time=$(date +%s)

local config_importer_state

if [[ -z $(docker service ps instant_"$config_importer_service_name") ]]; then
log info "instant_$config_importer_service_name service cannot be removed as it does not exist!"
exit 0
fi

config_importer_state=$(docker service ps instant_"$config_importer_service_name" --format "{{.CurrentState}}")
until [[ $config_importer_state == *"Complete"* ]]; do
config::timeout_check "$start_time" "$config_importer_service_name to run" "$exit_time" "$warning_time"
@@ -192,6 +198,7 @@ config::await_service_removed() {
config::timeout_check "$start_time" "${SERVICE_NAME} to be removed"
sleep 1
done
log info "Service $SERVICE_NAME successfully removed"
}

# Waits for the provided service to join the network
2 changes: 1 addition & 1 deletion utils/docker-utils.sh
@@ -107,7 +107,7 @@ docker::try_remove_volume() {
local start_time
start_time=$(date +%s)
until [[ -n "$(docker volume rm instant_"${VOLUME_NAME}" 2>/dev/null)" ]]; do
config::timeout_check "${start_time}" "${VOLUME_NAME} to be removed" "10" "20"
config::timeout_check "${start_time}" "${VOLUME_NAME} to be removed" "20" "10"
sleep 1
done
overwrite "Waiting for volume ${VOLUME_NAME} to be removed... Done"
