diff --git a/src/tutorials/mqtt/docker-compose.yaml b/src/tutorials/mqtt/docker-compose.yaml index ab942f28..26477484 100644 --- a/src/tutorials/mqtt/docker-compose.yaml +++ b/src/tutorials/mqtt/docker-compose.yaml @@ -1,7 +1,8 @@ version: '3' services: kafka: - image: docker.io/bitnami/kafka:3.2 + image: docker.io/bitnami/kafka:latest + container_name: kafka ports: - 9092:9092 - 29092:9092 @@ -32,37 +33,16 @@ services: - kafka init: true - kafka-ui: - container_name: kafka-ui-zq - image: provectuslabs/kafka-ui:latest - ports: - - 80:8080 - depends_on: - kafka-init: - condition: service_completed_successfully - environment: - KAFKA_CLUSTERS_0_NAME: local - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092 - zilla: image: ghcr.io/aklivity/zilla:develop-SNAPSHOT depends_on: - kafka ports: - 1883:1883 - - 8080:8080 - - 9090:9090 volumes: - ./zilla.yaml:/etc/zilla/zilla.yaml command: start -v -e - mqtt-simulator: - build: https://github.com/DamascenoRafael/mqtt-simulator.git - restart: on-failure - depends_on: - kafka-init: - condition: service_completed_successfully - networks: default: name: zilla-network diff --git a/src/tutorials/mqtt/mqtt-intro.md b/src/tutorials/mqtt/mqtt-intro.md index ce396b1a..a2b3faa4 100644 --- a/src/tutorials/mqtt/mqtt-intro.md +++ b/src/tutorials/mqtt/mqtt-intro.md @@ -19,7 +19,7 @@ Create these files, `zilla.yaml` and `docker-compose.yaml`, in the same director @tab zilla.yaml ```yaml {10,23-25,37,38} -name: MQTT-example +name: MQTT-intro bindings: # Gateway ingress config @@ -28,8 +28,11 @@ bindings: kind: server options: host: 0.0.0.0 - port: 1883 + port: + - 1883 exit: mqtt_server + +# MQTT Broker With an exit to Kafka mqtt_server: type: mqtt kind: server @@ -70,7 +73,7 @@ bindings: kind: client options: host: kafka - port: 9092 + port: 29092 routes: - when: - cidr: 0.0.0.0/0 @@ -81,36 +84,50 @@ bindings: ```yaml version: '3' services: + + zilla: + image: ghcr.io/aklivity/zilla:latest + depends_on: + - kafka + ports: + - 1883:1883 + volumes: + - ./zilla.yaml:/etc/zilla/zilla.yaml + command: start -v -e + kafka: image: docker.io/bitnami/kafka:latest container_name: kafka ports: - - "9092:9092" + - 9092:9092 + - 29092:9092 environment: ALLOW_PLAINTEXT_LISTENER: "yes" + KAFKA_CFG_NODE_ID: "1" + KAFKA_CFG_BROKER_ID: "1" + KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "1@127.0.0.1:9093" + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CLIENT:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT" + KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER" + KAFKA_CFG_LOG_DIRS: "/tmp/logs" + KAFKA_CFG_PROCESS_ROLES: "broker,controller" + KAFKA_CFG_LISTENERS: "CLIENT://:9092,INTERNAL://:29092,CONTROLLER://:9093" + KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "INTERNAL" + KAFKA_CFG_ADVERTISED_LISTENERS: "CLIENT://localhost:9092,INTERNAL://kafka:29092" + KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true" kafka-init: - image: docker.io/bitnami/kafka:latest + image: docker.io/bitnami/kafka:3.2 command: - "/bin/bash" - "-c" - - "/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 --create --if-not-exists --topic mqtt-sessions" - - "/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 --create --if-not-exists --topic mqtt-messages" - - "/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 --create --if-not-exists --topic mqtt-retained" + - | + /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 --create --if-not-exists --topic mqtt-messages + /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 --create --if-not-exists --topic mqtt-sessions --config cleanup.policy=compact + /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 --create --if-not-exists --topic mqtt-retained --config cleanup.policy=compact depends_on: - kafka init: true - zilla: - image: ghcr.io/aklivity/zilla:latest - container_name: zilla - depends_on: - - kafka - ports: - - "1883:1883" - volumes: - - ./zilla.yaml:/etc/zilla/zilla.yaml - command: start -v networks: default: @@ -128,12 +145,22 @@ docker-compose up -d ### Use [mosquitto_pub](https://mosquitto.org/download/) to send a greeting +Subscribe to the `zilla` topic + +```bash:no-line-numbers +mosquitto_sub -V 'mqttv5' --topic 'zilla' --debug +``` + +In a separate session publish a message on the `zilla` topic + ```bash:no-line-numbers mosquitto_pub -V 'mqttv5' --topic 'zilla' --message 'Hello, world' --debug --insecure ``` +Your subscribed session should receive the message + ::: note Wait for the services to start -if you get this response `curl: (52) Empty reply from server`, the likely cause is Zilla and Kafka are still starting up. +if you are stuck on `Client null sending CONNECT`, the likely cause is Zilla and Kafka are still starting up. ::: ### Remove the running containers diff --git a/src/tutorials/mqtt/zilla.yaml b/src/tutorials/mqtt/zilla.yaml index 6f134ed7..0a5c97fa 100644 --- a/src/tutorials/mqtt/zilla.yaml +++ b/src/tutorials/mqtt/zilla.yaml @@ -1,4 +1,4 @@ -name: zilla-quickstart +name: MQTT-intro bindings: # Gateway ingress config @@ -7,280 +7,25 @@ bindings: kind: server options: host: 0.0.0.0 - port: - - 8080 + port: - 1883 - routes: - - when: - - port: 8080 - exit: http_server - - when: - - port: 1883 - exit: mqtt_server - exit: http_server - telemetry: - metrics: - - stream.* - http_server: - type: http - kind: server - options: - access-control: - policy: cross-origin - routes: - - when: - - headers: - :scheme: http - :authority: localhost:8080 - :path: /sse/stream - exit: sse_server - - when: - - headers: - :scheme: http - :authority: localhost:8080 - :path: /sse/* - exit: sse_http_kafka_proxy - - when: - - headers: - :scheme: http - :authority: localhost:8080 - :path: /api/* - exit: http_api_kafka_proxy - - when: - - headers: - :scheme: http - :authority: localhost:8080 - :path: /iot/sse - exit: sse_server - - when: - - headers: - :scheme: http - :authority: localhost:8080 - :path: /iot/* - exit: mqtt_http_kafka_proxy - - when: - - headers: - :scheme: http - :authority: localhost:8080 - exit: grpc_server - telemetry: - metrics: - - http.* + exit: mqtt_server -# REST proxy endpoints to Kafka a topic - http_api_kafka_proxy: - type: http-kafka - kind: proxy - routes: - - when: - - method: POST - path: /api/items - exit: kafka_cache_client - with: - capability: produce - topic: items-crud - key: ${idempotencyKey} - - when: - - method: PUT - path: /api/items/{id} - exit: kafka_cache_client - with: - capability: produce - topic: items-crud - key: ${params.id} - - when: - - method: DELETE - path: /api/items/{id} - exit: kafka_cache_client - with: - capability: produce - topic: items-crud - key: ${params.id} - - when: - - method: GET - path: /api/items - exit: kafka_cache_client - with: - capability: fetch - topic: items-crud - merge: - content-type: application/json - - when: - - method: GET - path: /api/items/{id} - exit: kafka_cache_client - with: - capability: fetch - topic: items-crud - filters: - - key: ${params.id} - -# SSE proxy endpoints to Kafka a topic - sse_http_kafka_proxy: - type: http-kafka - kind: proxy - routes: - - when: - - method: POST - path: /sse/events - exit: kafka_cache_client - with: - capability: produce - topic: events-sse - key: ${idempotencyKey} - - when: - - method: GET - path: /sse/events - exit: kafka_cache_client - with: - capability: fetch - topic: events-sse - merge: - content-type: application/json - - when: - - method: GET - path: /sse/events/{id} - exit: kafka_cache_client - with: - capability: fetch - topic: events-sse - filters: - - key: ${params.id} - -# SSE Server With an exit to Kafka - sse_server: - type: sse - kind: server - exit: sse_kafka_proxy - sse_kafka_proxy: - type: sse-kafka - kind: proxy - routes: - - when: - - path: /sse/stream - exit: kafka_cache_client - with: - topic: events-sse - - when: - - path: /iot/sse - exit: kafka_cache_client - with: - topic: iot-messages - -# gRPC service definition - grpc_server: - type: grpc - kind: server - options: - services: - - proto/echo.proto - - proto/route_guide.proto - routes: - - when: - - method: example.EchoService/* - exit: grpc_kafka - - when: - - method: routeguide.RouteGuide/* - exit: grpc_kafka - telemetry: - metrics: - - grpc.* - -# gRPC proxy service to Kafka topics - grpc_kafka: - type: grpc-kafka - kind: proxy - routes: - - when: - - method: example.EchoService/* - exit: kafka_cache_client - with: - capability: produce - topic: echo-service-messages - acks: leader_only - reply-to: echo-service-messages - - when: - - method: routeguide.RouteGuide/* - exit: kafka_cache_client - with: - capability: produce - topic: route-guide-requests - acks: leader_only - reply-to: route-guide-responses - -# gRPC Kafka fanout to a remote server - route_guide_remote_server: - type: kafka-grpc - kind: remote_server - entry: kafka_cache_client - options: - acks: leader_only - routes: - - when: - - topic: route-guide-requests - reply-to: route-guide-responses - method: routeguide.RouteGuide/* - with: - scheme: http - authority: ${{env.ROUTE_GUIDE_SERVER_HOST}}:${{env.ROUTE_GUIDE_SERVER_PORT}} - exit: route_guide_server_grpc_client - -# gRPC RoutGuide server - route_guide_server_grpc_client: - type: grpc - kind: client - exit: route_guide_server_http_client - route_guide_server_http_client: - type: http - kind: client - options: - versions: - - h2 - exit: route_guide_server_tcp_client - route_guide_server_tcp_client: - type: tcp - kind: client - options: - host: ${{env.ROUTE_GUIDE_SERVER_HOST}} - port: ${{env.ROUTE_GUIDE_SERVER_PORT}} - -# MQTT proxy endpoints to Kafka a topics - mqtt_http_kafka_proxy: - type: http-kafka - kind: proxy - routes: - - when: - - method: GET - path: /iot/{topic}/{id} - exit: kafka_cache_client - with: - capability: fetch - topic: ${params.topic} - filters: - - key: ${params.id} - - when: - - method: GET - path: /iot/{topic} - exit: kafka_cache_client - with: - capability: fetch - topic: ${params.topic} - merge: - content-type: application/json - -# MQTT Server With an exit to Kafka +# MQTT Broker With an exit to Kafka mqtt_server: type: mqtt kind: server exit: mqtt_kafka_proxy + +# Proxy MQTT messages to Kafka mqtt_kafka_proxy: type: mqtt-kafka kind: proxy options: topics: - sessions: iot-sessions - messages: iot-messages - retained: iot-retained + sessions: mqtt-sessions + messages: mqtt-messages + retained: mqtt-retained exit: kafka_cache_client # Kafka caching layer @@ -293,9 +38,8 @@ bindings: kind: cache_server options: bootstrap: - - items-crud - - iot-sessions - - iot-retained + - mqtt-sessions + - mqtt-retained exit: kafka_client # Connect to local Kafka @@ -312,26 +56,3 @@ bindings: routes: - when: - cidr: 0.0.0.0/0 - -telemetry: - attributes: - service.namespace: zilla-quickstart - - # Desired metrics to track - metrics: - - http.request.size - - http.response.size - - stream.opens.sent - - stream.closes.sent - - grpc.requests.per.rpc - - grpc.responses.per.rpc - - # Prometheus endpoint definition - exporters: - prometheus_metric_exporter: - type: prometheus - options: - endpoints: - - scheme: http - path: /metrics - port: 9090