Skip to content

Commit

Permalink
Merge pull request #36 from ks6088ts-labs/feature/issue-35_eventgrid-…
Browse files Browse the repository at this point in the history
…scenario

add event grid messaging scenario
  • Loading branch information
ks6088ts authored Oct 23, 2024
2 parents 3a34b20 + a27844b commit b98475c
Show file tree
Hide file tree
Showing 8 changed files with 264 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -162,3 +162,6 @@ cython_debug/
# Project
*.env
requirements.txt
*.pem
*.crt
*.key
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,7 @@ server: ## run server
.PHONY: env
env: ## create env files
@sh scripts/create_env_files.sh

.PHONY: mosquitto
mosquitto: ## run mosquitto
cd configs/mosquitto && mosquitto -c tls.conf
12 changes: 12 additions & 0 deletions configs/mosquitto/tls.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
per_listener_settings true

listener 1883
allow_anonymous true

listener 8883
allow_anonymous true
require_certificate true
cafile chain.pem
certfile localhost.crt
keyfile localhost.key
tls_version tlsv1.2
Binary file added docs/assets/2_architecture.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
89 changes: 89 additions & 0 deletions docs/scenarios/2_azure_event_grid_messaging.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Azure Event Grid Messaging

This scenario demonstrates how to handle messages from Azure Event Grid.

## Architecture

[![architecture](../assets/2_architecture.png)](../assets/2_architecture.png)

## Setup

Refer to [Quickstart: Publish and subscribe to MQTT messages on Event Grid Namespace with Azure portal](https://learn.microsoft.com/en-us/azure/event-grid/mqtt-publish-and-subscribe-portal) to create an Event Grid Namespace and a topic.

[Azure-Samples/MqttApplicationSamples](https://github.com/Azure-Samples/MqttApplicationSamples) provides a sample application to publish and subscribe messages to the Event Grid.

### Create CA certificate and key

```shell
step ca init \
--deployment-type standalone \
--name MqttAppSamplesCA \
--dns localhost \
--address 127.0.0.1:443 \
--provisioner MqttAppSamplesCAProvisioner
```

### Create client certificate and key

```shell
CLIENT_DIR=configs/clients
mkdir -p $CLIENT_DIR

# Create client certificate and key
CLIENT_NAME=client1
step certificate create $CLIENT_NAME $CLIENT_DIR/$CLIENT_NAME.pem $CLIENT_DIR/$CLIENT_NAME.key \
--ca ~/.step/certs/intermediate_ca.crt \
--ca-key ~/.step/secrets/intermediate_ca_key \
--no-password \
--insecure \
--not-after 2400h

# Display certificate fingerprint to register the client on Azure Event Grid Namespace
step certificate fingerprint $CLIENT_DIR/$CLIENT_NAME.pem
```

### Create mosquitto configuration

```shell
MOSQUITTO_DIR=configs/mosquitto
mkdir -p $MOSQUITTO_DIR

# Set up mosquitto
cat ~/.step/certs/root_ca.crt ~/.step/certs/intermediate_ca.crt > $MOSQUITTO_DIR/chain.pem

step certificate create localhost $MOSQUITTO_DIR/localhost.crt $MOSQUITTO_DIR/localhost.key \
--ca ~/.step/certs/intermediate_ca.crt \
--ca-key ~/.step/secrets/intermediate_ca_key \
--no-password \
--insecure \
--not-after 2400h

# Run mosquitto
make mosquitto
```

## Demo

```shell
# If you use localhost, run mosquitto first
make mosquitto

# Set the host name of the Event Grid Namespace. If you use localhost, set it to localhost.
HOST_NAME=localhost
# HOST_NAME="EVENT_GRID_NAME.japaneast-1.ts.eventgrid.azure.net"

# Subscribe the topic
poetry run python scripts/event_grid.py subscribe \
--topic "sample/topic1" \
--client-name client1 \
--host-name $HOST_NAME \
--verbose

# Publish messages to the topic
poetry run python scripts/event_grid.py publish \
--topic "sample/topic1" \
--payload "helloworld" \
--client-name client2 \
--host-name $HOST_NAME \
--verbose
```
12 changes: 12 additions & 0 deletions eventgrid.env.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
MQTT_HOST_NAME="localhost" or "EVENT_GRID_NAMESPACE_NAME.japaneast-1.ts.eventgrid.azure.net"
MQTT_TCP_PORT="8883"
MQTT_USE_TLS="true"
MQTT_CLEAN_SESSION="true"
MQTT_KEEP_ALIVE_IN_SECONDS="30"
MQTT_CLIENT_ID="sample_client"
MQTT_USERNAME="sample_client"
MQTT_PASSWORD=""
MQTT_CA_FILE="chain.pem"
MQTT_CERT_FILE="sample_client.pem"
MQTT_KEY_FILE="sample_client.key"
MQTT_KEY_FILE_PASSWORD=""
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ nav:
- Home: index.md
- Scenarios:
- 1. Azure IoT Hub Messaging: scenarios/1_azure_iot_hub_messaging.md
- 2. Azure Event Grid Messaging: scenarios/2_azure_event_grid_messaging.md
theme:
name: material
# https://squidfunk.github.io/mkdocs-material/setup/changing-the-colors/#automatic-light-dark-mode
Expand Down
143 changes: 143 additions & 0 deletions scripts/event_grid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import logging
import ssl
import time

import paho.mqtt.client as mqtt
import typer
from dotenv import load_dotenv

app = typer.Typer()
logger = logging.getLogger(__name__)


def get_connection_settings(
host_name: str,
client_name: str,
) -> dict:
return {
"MQTT_HOST_NAME": host_name,
"MQTT_USERNAME": client_name,
"MQTT_CLIENT_ID": client_name,
"MQTT_CERT_FILE": f"configs/clients/{client_name}.pem",
"MQTT_KEY_FILE": f"configs/clients/{client_name}.key",
"MQTT_CA_FILE": "configs/mosquitto/chain.pem",
"MQTT_TCP_PORT": 8883,
"MQTT_USE_TLS": True,
"MQTT_CLEAN_SESSION": True,
"MQTT_KEEP_ALIVE_IN_SECONDS": 60,
"MQTT_KEY_FILE_PASSWORD": None,
}


def get_mqtt_client(
connection_settings: dict,
):
client = mqtt.Client(
client_id=connection_settings["MQTT_CLIENT_ID"],
clean_session=connection_settings["MQTT_CLEAN_SESSION"],
protocol=mqtt.MQTTv311,
transport="tcp",
)
if "MQTT_USERNAME" in connection_settings:
client.username_pw_set(
username=connection_settings["MQTT_USERNAME"],
password=connection_settings["MQTT_PASSWORD"] if "MQTT_PASSWORD" in connection_settings else None,
)
if connection_settings["MQTT_USE_TLS"]:
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.minimum_version = ssl.TLSVersion.TLSv1_2
context.maximum_version = ssl.TLSVersion.TLSv1_3

if connection_settings["MQTT_CERT_FILE"]:
context.load_cert_chain(
certfile=connection_settings["MQTT_CERT_FILE"],
keyfile=connection_settings["MQTT_KEY_FILE"],
password=connection_settings["MQTT_KEY_FILE_PASSWORD"],
)
if connection_settings["MQTT_HOST_NAME"] == "localhost":
context.load_verify_locations(
cafile=connection_settings["MQTT_CA_FILE"],
)
else:
context.load_default_certs()

client.tls_set_context(context)
return client


def attach_functions(client: mqtt.Client) -> mqtt.Client:
client.on_connect = lambda client, userdata, flags, rc: logger.info(
f"on_connect: client={client}, userdata={userdata}, flags={flags}, rc={rc}"
)
client.on_disconnect = lambda client, userdata, rc: logger.info(
f"on_disconnect: client={client}, userdata={userdata}, rc={rc}"
)
client.on_message = lambda client, userdata, message: logger.info(
f"on_message: client={client}, userdata={userdata}, message={message.payload.decode()}"
)
return client


@app.command()
def publish(
topic: str = "sample/topic1",
payload: str = "Hello, World!",
client_name: str = "client1",
host_name: str = "localhost",
verbose: bool = False,
):
if verbose:
logging.basicConfig(level=logging.DEBUG)

connection_settings = get_connection_settings(
client_name=client_name,
host_name=host_name,
)
mqtt_client = get_mqtt_client(connection_settings)
mqtt_client = attach_functions(mqtt_client)

mqtt_client.connect(
host=connection_settings["MQTT_HOST_NAME"],
port=connection_settings["MQTT_TCP_PORT"],
)

mqtt_client.loop_start()
result = mqtt_client.publish(
topic=topic,
payload=payload,
)
logger.info(result)
# fixme: wait for the message to be sent
time.sleep(1)

mqtt_client.loop_stop()


@app.command()
def subscribe(
topic: str = "sample/topic1",
client_name: str = "client1",
host_name: str = "localhost",
verbose: bool = False,
):
if verbose:
logging.basicConfig(level=logging.DEBUG)

connection_settings = get_connection_settings(
client_name=client_name,
host_name=host_name,
)
mqtt_client = get_mqtt_client(connection_settings)
mqtt_client = attach_functions(mqtt_client)

mqtt_client.connect(
host=connection_settings["MQTT_HOST_NAME"],
port=connection_settings["MQTT_TCP_PORT"],
)
mqtt_client.subscribe(topic)
mqtt_client.loop_forever()


if __name__ == "__main__":
load_dotenv("event_grid.env")
app()

0 comments on commit b98475c

Please sign in to comment.