A Kafka-Dispatcher instance is configured with a file in YAML format.
The file is composed of three main sections:
- kafka
- server
- dispatcher
The kafka section configures the interaction with the Kafka server.
Syntax:
kafka:
  bootstrap-servers: host:port
  properties:
    sasl.mechanism: PLAIN
    sasl.jaas.config: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="..." password="...";'
    security.protocol: SASL_SSL
  consumer:
    auto-offset-reset: earliest
    enable-auto-commit: true/false
    group-id: arbitrary name for this dispatcher
    missing-topics-fatal: true/false
    topics: list of comma-separated topics
Where:
- bootstrap-servers is the list of Kafka brokers to connect to:
  - for a Kafka broker, the hostname and port of the Kafka server instance(s)
  - for an Event Hubs namespace, the namespace FQDN followed by port 9093 (i.e. <namespace>.servicebus.windows.net:9093)
- sasl.mechanism:
  - for a Kafka broker, use GSSAPI
  - for an Event Hubs namespace, use PLAIN
- sasl.jaas.config is the SASL configuration for the brokers:
  - for a Kafka broker, specify the username and password if authentication is required, otherwise leave them empty
  - for an Event Hubs namespace, use $ConnectionString as the username and the connection string as the password (see the sketch after this list)
- security.protocol:
  - for a Kafka broker, use PLAINTEXT (or SASL_SSL if the broker is configured for SASL over TLS)
  - for an Event Hubs namespace, use SASL_SSL
- enable-auto-commit: if true, the consumer automatically commits offsets, acknowledging to the Kafka server that messages have been received
- group-id is the consumer group identifier the dispatcher uses to register with the Kafka server
- missing-topics-fatal: if true, the service exits if any of the consumer topics does not exist
- topics is the comma-separated list of topics the dispatcher subscribes to for notifications
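For comparison, here is a minimal sketch of the kafka section for an Event Hubs namespace; the namespace name my-namespace, the access policy name, and the key are placeholders, not values taken from this project:

kafka:
  bootstrap-servers: my-namespace.servicebus.windows.net:9093
  properties:
    sasl.mechanism: PLAIN
    sasl.jaas.config: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://my-namespace.servicebus.windows.net/;SharedAccessKeyName=<policy-name>;SharedAccessKey=<key>";'
    security.protocol: SASL_SSL
  consumer:
    # unchanged with respect to the Kafka broker case

The consumer sub-section, including the topics list, does not depend on the type of broker and is configured exactly as described above.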
The remaining two sections (server and dispatcher) are common across all the dispatcher services and are documented here.
The following example configures a dispatcher instance as follows:
- it registers the instance to be notified of messages published on two topics of interest
- for each of these topics, one action is defined:
  - when a message from oncorseq.sequencing.in_progress is received, a Nextflow process is triggered; if the payload includes a sampleID key, its value replaces the ${sampleID} placeholder in the trigger before it is executed (a walk-through follows the example below)
  - when a message from oncorseq.sequencing.pipeline_initialized is received, a command echoing the value of the pipeline parameter is executed
kafka:
  bootstrap-servers: hostname.med.cornell.edu:29092
  properties:
    sasl.mechanism: GSSAPI
    sasl.jaas.config: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="" password="";'
    security.protocol: PLAINTEXT
  consumer:
    auto-offset-reset: earliest
    group-id: kafka-dispatcher
    enable-auto-commit: true
    missing-topics-fatal: false
    topics: oncorseq_sequencing_pipeline_initialized,oncorseq_sequencing_in_progress,oncorseq_sequencing_analysis_started
server:
  port: 8080
  servlet:
    context-path: /dispatcher
dispatcher:
  topics:
    - name: oncorseq.sequencing.in_progress
      actions:
        - trigger: nextflow /path/main.nf -w /workingDir -c /path/nextflow-manuele.config --sampleID ${sampleID} --dispatcherURL http://localhost:8080/dispatcher/ --resourceDir /path
          reply:
            topic: oncorseq.sequencing.pipeline_initialized
            payload: sampleID=${sampleID}&status=initialized
    - name: oncorseq.sequencing.pipeline_initialized
      actions:
        - trigger: echo "Good job with ${pipeline}"
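As an illustrative walk-through (the payload format and the value ABC123 are assumptions made for this example, not values from the project): if a message whose payload carries a sampleID key with value ABC123 arrives on oncorseq.sequencing.in_progress, the dispatcher substitutes the value into the trigger and runs:

  nextflow /path/main.nf -w /workingDir -c /path/nextflow-manuele.config --sampleID ABC123 --dispatcherURL http://localhost:8080/dispatcher/ --resourceDir /path

Per the reply block, the payload sampleID=ABC123&status=initialized would then be published to oncorseq.sequencing.pipeline_initialized, which matches the second topic entry and triggers its echo command.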
See further configuration examples here.