Skip to content

Commit

Permalink
run event grid scripts to communicate via MQTT broker of both mosquit…
Browse files Browse the repository at this point in the history
…to and Event Grid
  • Loading branch information
ks6088ts committed Oct 23, 2024
1 parent 3a34b20 commit a27844b
Show file tree
Hide file tree
Showing 8 changed files with 264 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -162,3 +162,6 @@ cython_debug/
# Project
*.env
requirements.txt
*.pem
*.crt
*.key
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,7 @@ server: ## run server
.PHONY: env
env: ## create env files
@sh scripts/create_env_files.sh

.PHONY: mosquitto
mosquitto: ## run mosquitto
cd configs/mosquitto && mosquitto -c tls.conf
12 changes: 12 additions & 0 deletions configs/mosquitto/tls.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
per_listener_settings true

listener 1883
allow_anonymous true

listener 8883
allow_anonymous true
require_certificate true
cafile chain.pem
certfile localhost.crt
keyfile localhost.key
tls_version tlsv1.2
Binary file added docs/assets/2_architecture.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
89 changes: 89 additions & 0 deletions docs/scenarios/2_azure_event_grid_messaging.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Azure Event Grid Messaging

This scenario demonstrates how to handle messages from Azure Event Grid.

## Architecture

[![architecture](../assets/2_architecture.png)](../assets/2_architecture.png)

## Setup

Refer to [Quickstart: Publish and subscribe to MQTT messages on Event Grid Namespace with Azure portal](https://learn.microsoft.com/en-us/azure/event-grid/mqtt-publish-and-subscribe-portal) to create an Event Grid Namespace and a topic.

[Azure-Samples/MqttApplicationSamples](https://github.com/Azure-Samples/MqttApplicationSamples) provides a sample application to publish and subscribe messages to the Event Grid.

### Create CA certificate and key

```shell
step ca init \
--deployment-type standalone \
--name MqttAppSamplesCA \
--dns localhost \
--address 127.0.0.1:443 \
--provisioner MqttAppSamplesCAProvisioner
```

### Create client certificate and key

```shell
CLIENT_DIR=configs/clients
mkdir -p $CLIENT_DIR

# Create client certificate and key
CLIENT_NAME=client1
step certificate create $CLIENT_NAME $CLIENT_DIR/$CLIENT_NAME.pem $CLIENT_DIR/$CLIENT_NAME.key \
--ca ~/.step/certs/intermediate_ca.crt \
--ca-key ~/.step/secrets/intermediate_ca_key \
--no-password \
--insecure \
--not-after 2400h

# Display certificate fingerprint to register the client on Azure Event Grid Namespace
step certificate fingerprint $CLIENT_DIR/$CLIENT_NAME.pem
```

### Create mosquitto configuration

```shell
MOSQUITTO_DIR=configs/mosquitto
mkdir -p $MOSQUITTO_DIR

# Set up mosquitto
cat ~/.step/certs/root_ca.crt ~/.step/certs/intermediate_ca.crt > $MOSQUITTO_DIR/chain.pem

step certificate create localhost $MOSQUITTO_DIR/localhost.crt $MOSQUITTO_DIR/localhost.key \
--ca ~/.step/certs/intermediate_ca.crt \
--ca-key ~/.step/secrets/intermediate_ca_key \
--no-password \
--insecure \
--not-after 2400h

# Run mosquitto
make mosquitto
```

## Demo

```shell
# If you use localhost, run mosquitto first
make mosquitto

# Set the host name of the Event Grid Namespace. If you use localhost, set it to localhost.
HOST_NAME=localhost
# HOST_NAME="EVENT_GRID_NAME.japaneast-1.ts.eventgrid.azure.net"

# Subscribe the topic
poetry run python scripts/event_grid.py subscribe \
--topic "sample/topic1" \
--client-name client1 \
--host-name $HOST_NAME \
--verbose

# Publish messages to the topic
poetry run python scripts/event_grid.py publish \
--topic "sample/topic1" \
--payload "helloworld" \
--client-name client2 \
--host-name $HOST_NAME \
--verbose
```
12 changes: 12 additions & 0 deletions eventgrid.env.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
MQTT_HOST_NAME="localhost" or "EVENT_GRID_NAMESPACE_NAME.japaneast-1.ts.eventgrid.azure.net"
MQTT_TCP_PORT="8883"
MQTT_USE_TLS="true"
MQTT_CLEAN_SESSION="true"
MQTT_KEEP_ALIVE_IN_SECONDS="30"
MQTT_CLIENT_ID="sample_client"
MQTT_USERNAME="sample_client"
MQTT_PASSWORD=""
MQTT_CA_FILE="chain.pem"
MQTT_CERT_FILE="sample_client.pem"
MQTT_KEY_FILE="sample_client.key"
MQTT_KEY_FILE_PASSWORD=""
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ nav:
- Home: index.md
- Scenarios:
- 1. Azure IoT Hub Messaging: scenarios/1_azure_iot_hub_messaging.md
- 2. Azure Event Grid Messaging: scenarios/2_azure_event_grid_messaging.md
theme:
name: material
# https://squidfunk.github.io/mkdocs-material/setup/changing-the-colors/#automatic-light-dark-mode
Expand Down
143 changes: 143 additions & 0 deletions scripts/event_grid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import logging
import ssl
import time

import paho.mqtt.client as mqtt
import typer
from dotenv import load_dotenv

app = typer.Typer()
logger = logging.getLogger(__name__)


def get_connection_settings(
host_name: str,
client_name: str,
) -> dict:
return {
"MQTT_HOST_NAME": host_name,
"MQTT_USERNAME": client_name,
"MQTT_CLIENT_ID": client_name,
"MQTT_CERT_FILE": f"configs/clients/{client_name}.pem",
"MQTT_KEY_FILE": f"configs/clients/{client_name}.key",
"MQTT_CA_FILE": "configs/mosquitto/chain.pem",
"MQTT_TCP_PORT": 8883,
"MQTT_USE_TLS": True,
"MQTT_CLEAN_SESSION": True,
"MQTT_KEEP_ALIVE_IN_SECONDS": 60,
"MQTT_KEY_FILE_PASSWORD": None,
}


def get_mqtt_client(
connection_settings: dict,
):
client = mqtt.Client(
client_id=connection_settings["MQTT_CLIENT_ID"],
clean_session=connection_settings["MQTT_CLEAN_SESSION"],
protocol=mqtt.MQTTv311,
transport="tcp",
)
if "MQTT_USERNAME" in connection_settings:
client.username_pw_set(
username=connection_settings["MQTT_USERNAME"],
password=connection_settings["MQTT_PASSWORD"] if "MQTT_PASSWORD" in connection_settings else None,
)
if connection_settings["MQTT_USE_TLS"]:
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.minimum_version = ssl.TLSVersion.TLSv1_2
context.maximum_version = ssl.TLSVersion.TLSv1_3

if connection_settings["MQTT_CERT_FILE"]:
context.load_cert_chain(
certfile=connection_settings["MQTT_CERT_FILE"],
keyfile=connection_settings["MQTT_KEY_FILE"],
password=connection_settings["MQTT_KEY_FILE_PASSWORD"],
)
if connection_settings["MQTT_HOST_NAME"] == "localhost":
context.load_verify_locations(
cafile=connection_settings["MQTT_CA_FILE"],
)
else:
context.load_default_certs()

client.tls_set_context(context)
return client


def attach_functions(client: mqtt.Client) -> mqtt.Client:
client.on_connect = lambda client, userdata, flags, rc: logger.info(
f"on_connect: client={client}, userdata={userdata}, flags={flags}, rc={rc}"
)
client.on_disconnect = lambda client, userdata, rc: logger.info(
f"on_disconnect: client={client}, userdata={userdata}, rc={rc}"
)
client.on_message = lambda client, userdata, message: logger.info(
f"on_message: client={client}, userdata={userdata}, message={message.payload.decode()}"
)
return client


@app.command()
def publish(
topic: str = "sample/topic1",
payload: str = "Hello, World!",
client_name: str = "client1",
host_name: str = "localhost",
verbose: bool = False,
):
if verbose:
logging.basicConfig(level=logging.DEBUG)

connection_settings = get_connection_settings(
client_name=client_name,
host_name=host_name,
)
mqtt_client = get_mqtt_client(connection_settings)
mqtt_client = attach_functions(mqtt_client)

mqtt_client.connect(
host=connection_settings["MQTT_HOST_NAME"],
port=connection_settings["MQTT_TCP_PORT"],
)

mqtt_client.loop_start()
result = mqtt_client.publish(
topic=topic,
payload=payload,
)
logger.info(result)
# fixme: wait for the message to be sent
time.sleep(1)

mqtt_client.loop_stop()


@app.command()
def subscribe(
topic: str = "sample/topic1",
client_name: str = "client1",
host_name: str = "localhost",
verbose: bool = False,
):
if verbose:
logging.basicConfig(level=logging.DEBUG)

connection_settings = get_connection_settings(
client_name=client_name,
host_name=host_name,
)
mqtt_client = get_mqtt_client(connection_settings)
mqtt_client = attach_functions(mqtt_client)

mqtt_client.connect(
host=connection_settings["MQTT_HOST_NAME"],
port=connection_settings["MQTT_TCP_PORT"],
)
mqtt_client.subscribe(topic)
mqtt_client.loop_forever()


if __name__ == "__main__":
load_dotenv("event_grid.env")
app()

0 comments on commit a27844b

Please sign in to comment.