diff --git a/docs/kafka.md b/docs/kafka.md index 8329d509..0f59f9c0 100644 --- a/docs/kafka.md +++ b/docs/kafka.md @@ -1,9 +1,60 @@ -# Kafka plugin +# Kafka producer functionality in AliECS + +## Kafka producer functionality in AliECS core + +As of 2024 the AliECS core integrates Kafka producer functionality independent of the plugin, with the goal of all consumers eventually migrating to this new interface. + +### Making sure that AliECS sends messages + +To enable the plugin, one should make sure that the following points are fullfiled. +* The consul instance includes coordinates to the list of kafka brokers. + Navigate to `o2/components/aliecs/ANY/any/settings` and make sure the following key value pairs are there: + ``` + kafkaEndpoints: + - "my-kafka-broker-1:9092" + - "my-kafka-broker-2:9092" + - "my-kafka-broker-3:9092" + ``` + Please restart the AliECS core if you modify this file. + +No further AliECS configuration is necessary. + +AliECS will create the necessary topics if they don't exist yet, in this case the very first message will be lost. +Once the topics exist, no further messages can be lost and no action is necessary. + +### Currently available topics + +See [events.proto](../common/protos/events.proto) for the protobuf definitions of the messages. + +* `aliecs.core` - core events that don't concern a specific environment or task +* `aliecs.environment` - events that concern an environment, e.g. environment state changes +* `aliecs.task` - events emitted by a task, e.g. task state changes +* `aliecs.call` - events emitted before and after the execution of a call +* `aliecs.integrated_service.dcs` - events emitted by the DCS integrated service +* `aliecs.integrated_service.ddsched` - events emitted by the DDSched integrated service +* `aliecs.integrated_service.odc` - events emitted by the ODC integrated service +* `aliecs.integrated_service.trg` - events emitted by the TRG integrated service + +### Decoding the messages + +Messages are encoded with protobuf, with the aforementioned [events.proto](../common/protos/events.proto) file defining the schema. +Integraed service messages include a payload portion that is usually JSON-encoded, and has no predefined schema. + +To generate the precompiled protobuf interface, run `make fdset`. +You can then consume the messages from a given topic using [https://github.com/sevagh/pq](https://github.com/sevagh/pq): +``` +$ FDSET_PATH=./fdset pq kafka aliecs.environment --brokers kafka-broker-hostname:9092 --msgtype events.Event +``` + +Adjust the topic name, fdset path, and broker endpoint as necessary, and append `--beginning` to consume past messages from the beginning of the topic. + + +## Legacy events: Kafka plugin The Kafka plugin in AliECS publishes updates messages about new states of environments and lists of environments in the RUNNING state. The messages are encoded with protobuf. -## Making sure that AliECS sends messages +### Making sure that AliECS sends messages To enable the plugin, one should make sure that the following points are fullfiled. * The consul instance includes coordinates to your kafka broker and enables the plugin. @@ -17,7 +68,7 @@ To enable the plugin, one should make sure that the following points are fullfil * Plugin is enabled for the new environments. Make sure that there is a `true` value set in the consul instance at the path `o2/runtime/aliecs/vars/kafka_enabled`. Alternatively, one can put `kafka_enabled : true` in the Advanced configuration panel in the AliECS GUI. -## Currently available topics +### Currently available topics As for today, AliECS publishes on the following types of topics: @@ -25,11 +76,11 @@ As for today, AliECS publishes on the following types of topics: * `aliecs.env_leave_state.` where `state` can be `STANDBY`, `DEPLOYED`, `CONFIGURED`, `RUNNING`, `ERROR`. For each topic, AliECS publishes a `NewStateNotification` message when any environment is about to leave the corresponding state. * `aliecs.env_list.` where `state` is only `RUNNING`. Each time there is an environment state change, AliECS publishes an `ActiveRunsList` message which contains a list of all environments which are currently in `RUNNING` state. -## Decoding the messages +### Decoding the messages Messages are encoded with protobuf. Please use [this](../core/integration/kafka/protos/kafka.proto) proto file to generate code which deserializes the messages. -## Getting Start of Run and End of Run notifications +### Getting Start of Run and End of Run notifications To get SOR and EOR notifications, please subscribe to the two corresponding topics: * `aliecs.env_state.RUNNING` for Start of Run @@ -37,7 +88,7 @@ To get SOR and EOR notifications, please subscribe to the two corresponding topi Both will provide `NewStateNotification` messages encoded with protobuf. Please note that the EOR message will still contain the RUNNING state, because it is sent just before the transition starts. -## Using Kafka debug tools +### Using Kafka debug tools One can use some Kafka command line tools to verify that a given setup works correctly. One should make sure to have Kafka installed on the machine used to run the tools.