Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add KAFKA_CONFIGS variable to do additional configuration #333

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ If you wish to use multi-line YAML or some other delimiter between your topic de

For example, `KAFKA_CREATE_TOPICS_SEPARATOR: "$$'\n"'` would use a newline to split the topic definitions. Syntax has to follow docker-compose escaping rules, and [ANSI-C](https://www.gnu.org/software/bash/manual/html_node/ANSI_002dC-Quoting.html) quoting.

Another environment variable can be added to perform further configuration changes: ```KAFKA_CONFIGS```. For example:

environment:
- KAFKA_CONFIGS=topics:Topic1:add:retention.ms=604800000 cleanup.policy=delete segment.bytes=1024

The format for this variable mirrors the ```kafka-configs.sh``` script. For multi-line YAML a separate delimiter variable ```KAFKA_CREATE_TOPICS_SEPARATOR``` is available.

## Advertised hostname

You can configure the advertised hostname in different ways
Expand Down
63 changes: 40 additions & 23 deletions create-topics.sh
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
#!/bin/bash

if [[ -z "$KAFKA_CREATE_TOPICS" ]]; then
exit 0
fi

if [[ -z "$START_TIMEOUT" ]]; then
START_TIMEOUT=600
fi
Expand All @@ -26,24 +22,45 @@ if $start_timeout_exceeded; then
exit 1
fi

# Expected format:
# name:partitions:replicas:cleanup.policy
IFS="${KAFKA_CREATE_TOPICS_SEPARATOR-,}"; for topicToCreate in $KAFKA_CREATE_TOPICS; do
echo "creating topics: $topicToCreate"
IFS=':' read -r -a topicConfig <<< "$topicToCreate"
config=
if [ -n "${topicConfig[3]}" ]; then
config="--config=cleanup.policy=${topicConfig[3]}"
fi
COMMAND="JMX_PORT='' ${KAFKA_HOME}/bin/kafka-topics.sh \\
--create \\
--zookeeper ${KAFKA_ZOOKEEPER_CONNECT} \\
--topic ${topicConfig[0]} \\
--partitions ${topicConfig[1]} \\
--replication-factor ${topicConfig[2]} \\
${config} \\
--if-not-exists &"
eval "${COMMAND}"
done
if [[ -n "$KAFKA_CREATE_TOPICS" ]]; then
# Expected format:
# name:partitions:replicas:cleanup.policy
IFS="${KAFKA_CREATE_TOPICS_SEPARATOR-,}"; for topicToCreate in $KAFKA_CREATE_TOPICS; do
echo "creating topics: $topicToCreate"
IFS=':' read -r -a topicConfig <<< "$topicToCreate"
config=
if [ -n "${topicConfig[3]}" ]; then
config="--config=cleanup.policy=${topicConfig[3]}"
fi
COMMAND="JMX_PORT='' ${KAFKA_HOME}/bin/kafka-topics.sh \\
--create \\
--zookeeper ${KAFKA_ZOOKEEPER_CONNECT} \\
--topic ${topicConfig[0]} \\
--partitions ${topicConfig[1]} \\
--replication-factor ${topicConfig[2]} \\
${config} \\
--if-not-exists &"
eval "${COMMAND}"
done
fi

wait

if [[ -n "$KAFKA_CONFIGS" ]]; then
# Expected format:
# type:name:add|delete:a=b c=d
IFS="${KAFKA_CONFIG_SEPARATOR-,}"; for configToChange in $KAFKA_CONFIGS; do
echo "changing config: $configToChange"
IFS=':' read -r -a entityConfig <<< "$configToChange"
config="${entityConfig[3]}"
COMMAND="JMX_PORT='' ${KAFKA_HOME}/bin/kafka-configs.sh \\
--zookeeper ${KAFKA_ZOOKEEPER_CONNECT} \\
--entity-type ${entityConfig[0]} \\
--entity-name ${entityConfig[1]} \\
--alter \\
--${entityConfig[2]}-config ${config// /,} &"
eval "${COMMAND}"
done
fi

wait
1 change: 1 addition & 0 deletions start-kafka.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ fi

create-topics.sh &
unset KAFKA_CREATE_TOPICS
unset KAFKA_CONFIGS

# DEPRECATED: but maintained for compatibility with older brokers pre 0.9.0 (https://issues.apache.org/jira/browse/KAFKA-1809)
if [[ -z "$KAFKA_ADVERTISED_PORT" && \
Expand Down