diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..a2e3fc4 --- /dev/null +++ b/.env.example @@ -0,0 +1,9 @@ +# RabbitMQ Configuration +RABBITMQ_USER=admin +RABBITMQ_PASS=admin + +# MQTT Configuration +MQTT_TOPIC=transit/vehicles/# + +# Redis Configuration +REDIS_DB=0 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c0d9001 --- /dev/null +++ b/.gitignore @@ -0,0 +1,40 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +venv/ +env/ +ENV/ +*.egg-info/ +dist/ +build/ + +# Environment variables +.env + +# IDE +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# OS +.DS_Store +Thumbs.db + +# Docker +*.log + +# Redis +dump.rdb + +# Node.js (if needed later) +node_modules/ +package-lock.json + +# Temporary files +tmp/ +*.tmp diff --git a/.python-version b/.python-version new file mode 100644 index 0000000..6324d40 --- /dev/null +++ b/.python-version @@ -0,0 +1 @@ +3.14 diff --git a/QUICKSTART.md b/QUICKSTART.md new file mode 100644 index 0000000..3fcec44 --- /dev/null +++ b/QUICKSTART.md @@ -0,0 +1,132 @@ +# Quick Start Guide + +This guide will help you get the databus-mqtt system up and running in minutes. + +## Prerequisites + +- Docker (20.10 or later) +- Docker Compose (or Docker with Compose plugin) + +## Installation Steps + +1. **Clone the repository** + ```bash + git clone https://github.com/simovilab/databus-mqtt.git + cd databus-mqtt + ``` + +2. **Configure environment (optional)** + ```bash + cp .env.example .env + # Edit .env if you want to change default settings + ``` + +3. **Start all services** + ```bash + docker-compose up -d + ``` + +4. **Verify services are running** + ```bash + docker-compose ps + ``` + + You should see three containers running: + - `databus-rabbitmq` - MQTT broker + - `databus-redis` - In-memory database + - `databus-subscriber` - MQTT to Redis bridge + +## Testing the System + +### 1. Access RabbitMQ Management UI + +Open your browser and navigate to: http://localhost:15672 + +- Username: `admin` (or value from .env) +- Password: `admin` (or value from .env) + +### 2. Publish Test Messages + +Install the paho-mqtt library: +```bash +pip install paho-mqtt +``` + +Run the example publisher: +```bash +python examples/publisher_example.py +``` + +This will start publishing simulated vehicle tracking data. + +### 3. Query Stored Data + +Install the redis library: +```bash +pip install redis +``` + +Run the query example: +```bash +python examples/query_redis.py +``` + +This will display all vehicle data stored in Redis. + +### 4. Monitor Subscriber Logs + +Watch the subscriber service process messages: +```bash +docker-compose logs -f subscriber +``` + +## Common Commands + +### View all logs +```bash +docker-compose logs -f +``` + +### Stop all services +```bash +docker-compose down +``` + +### Restart a service +```bash +docker-compose restart subscriber +``` + +### View Redis data directly +```bash +docker exec -it databus-redis redis-cli +# Then run Redis commands like: +# KEYS * +# GET vehicle:BUS-001 +``` + +## Next Steps + +- Customize the MQTT topic in `.env` file +- Modify `subscriber/subscriber.py` to add custom processing logic +- Integrate with your transit vehicle tracking system +- Add additional services to the Docker Compose file + +## Troubleshooting + +**Services won't start:** +- Check if ports 1883, 5672, 6379, or 15672 are already in use +- Ensure Docker has sufficient resources allocated + +**Can't connect to MQTT broker:** +- Wait 30 seconds for RabbitMQ to fully initialize +- Check logs: `docker-compose logs rabbitmq` + +**No data in Redis:** +- Ensure the subscriber is running: `docker-compose ps subscriber` +- Check subscriber logs: `docker-compose logs subscriber` +- Verify you're publishing to the correct topic + +## Support + +For issues and questions, please open an issue on GitHub. diff --git a/README.md b/README.md index 4cc4165..4d4b191 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,209 @@ # databus-mqtt + MQTT broker for real-time transit vehicles tracking and telemetry data + +## Overview + +This repository provides a complete infrastructure for collecting real-time transit vehicle tracking and telemetry data using MQTT protocol. The system uses RabbitMQ as the MQTT broker and Redis as an in-memory database for quick data retrieval. + +## Architecture + +The system consists of three main components: + +1. **RabbitMQ MQTT Broker**: Receives MQTT messages from transit vehicles +2. **Redis**: In-memory database for fast data storage and retrieval +3. **Subscriber Service**: Python service that subscribes to MQTT topics and stores data in Redis + +``` +Transit Vehicles → MQTT → RabbitMQ → Subscriber Service → Redis + (Port 1883) (Port 6379) +``` + +## Features + +- **MQTT Protocol Support**: Full MQTT broker capabilities via RabbitMQ +- **Real-time Data Processing**: Immediate storage of vehicle tracking data +- **Fast Data Retrieval**: Redis in-memory storage for sub-millisecond access +- **Scalable Architecture**: Docker-based containerized services +- **Management Interface**: RabbitMQ management UI on port 15672 +- **Automatic Reconnection**: Built-in retry logic for service reliability +- **TTL Support**: Automatic data expiration (1 hour default) + +## Prerequisites + +- Docker +- Docker Compose + +## Quick Start + +1. Clone the repository: +```bash +git clone https://github.com/simovilab/databus-mqtt.git +cd databus-mqtt +``` + +2. Copy the environment file and adjust settings if needed: +```bash +cp .env.example .env +``` + +3. Start the services: +```bash +docker-compose up -d +``` + +4. Check service status: +```bash +docker-compose ps +``` + +## Configuration + +### Environment Variables + +Edit the `.env` file to customize the configuration: + +- `RABBITMQ_USER`: RabbitMQ username (default: admin) +- `RABBITMQ_PASS`: RabbitMQ password (default: admin) +- `MQTT_TOPIC`: MQTT topic pattern to subscribe to (default: transit/vehicles/#) +- `REDIS_DB`: Redis database number (default: 0) + +### RabbitMQ Configuration + +RabbitMQ configuration is located in `config/rabbitmq/`: +- `enabled_plugins`: Enables MQTT and management plugins +- `rabbitmq.conf`: Main RabbitMQ configuration file + +## Usage + +### Accessing Services + +- **RabbitMQ Management UI**: http://localhost:15672 (login with credentials from .env) +- **MQTT Port**: localhost:1883 +- **Redis Port**: localhost:6379 + +### Publishing Test Messages + +You can publish test messages using any MQTT client. Example using mosquitto_pub: + +```bash +mosquitto_pub -h localhost -p 1883 -u admin -P admin \ + -t "transit/vehicles/bus/1234" \ + -m '{"vehicle_id": "1234", "lat": 40.7128, "lon": -74.0060, "speed": 25, "heading": 180}' +``` + +### Querying Data from Redis + +Connect to Redis and query stored data: + +```bash +docker exec -it databus-redis redis-cli + +# Get all vehicle keys +KEYS vehicle:* + +# Get data for a specific vehicle +GET vehicle:1234 + +# Get recent vehicles by timeline +ZRANGE vehicles:timeline -10 -1 +``` + +## Data Storage Schema + +The subscriber service stores data in Redis with the following structure: + +- **Key Pattern**: `vehicle:{vehicle_id}` +- **Value**: JSON string containing vehicle data with metadata +- **TTL**: 1 hour (3600 seconds) +- **Timeline Index**: Sorted set `vehicles:timeline` for time-based queries + +Example stored data: +```json +{ + "vehicle_id": "1234", + "lat": 40.7128, + "lon": -74.0060, + "speed": 25, + "heading": 180, + "_timestamp": "2024-01-15T10:30:00.123456", + "_topic": "transit/vehicles/bus/1234" +} +``` + +## Monitoring + +### View Subscriber Logs + +```bash +docker-compose logs -f subscriber +``` + +### View RabbitMQ Logs + +```bash +docker-compose logs -f rabbitmq +``` + +### Check Redis Memory Usage + +```bash +docker exec -it databus-redis redis-cli INFO memory +``` + +## Development + +### Project Structure + +``` +databus-mqtt/ +├── config/ +│ └── rabbitmq/ # RabbitMQ configuration +│ ├── enabled_plugins +│ └── rabbitmq.conf +├── subscriber/ # MQTT subscriber service +│ ├── Dockerfile +│ ├── requirements.txt +│ └── subscriber.py +├── docker-compose.yml # Docker Compose configuration +├── .env.example # Environment variables template +└── README.md # This file +``` + +### Extending the Subscriber + +The subscriber service (`subscriber/subscriber.py`) can be extended to: +- Add custom data validation +- Implement additional storage backends +- Add data transformation logic +- Integrate with other services + +## Stopping the Services + +```bash +docker-compose down +``` + +To remove volumes as well: +```bash +docker-compose down -v +``` + +## Troubleshooting + +### RabbitMQ won't start +- Check if ports 1883, 5672, or 15672 are already in use +- Verify Docker has sufficient resources + +### Subscriber can't connect +- Ensure RabbitMQ is fully started (check `docker-compose logs rabbitmq`) +- Verify credentials in `.env` file +- Check network connectivity between containers + +### Redis connection issues +- Verify Redis container is running: `docker-compose ps redis` +- Check Redis logs: `docker-compose logs redis` + +## License + +Apache License 2.0 - See LICENSE file for details diff --git a/config/rabbitmq/enabled_plugins b/config/rabbitmq/enabled_plugins new file mode 100644 index 0000000..e9ec38b --- /dev/null +++ b/config/rabbitmq/enabled_plugins @@ -0,0 +1,2 @@ +[rabbitmq_management, rabbitmq_mqtt]. + diff --git a/config/rabbitmq/rabbitmq.conf b/config/rabbitmq/rabbitmq.conf new file mode 100644 index 0000000..3e95f5e --- /dev/null +++ b/config/rabbitmq/rabbitmq.conf @@ -0,0 +1,23 @@ +# RabbitMQ Configuration File + +# MQTT Plugin Configuration +mqtt.listeners.tcp.default = 1883 +mqtt.allow_anonymous = false +mqtt.vhost = / +mqtt.exchange = amq.topic + +# mqtt.subscription_ttl = 86400000 +mqtt.max_session_expiry_interval_seconds = 86400 # Configuración actualizada para RabbitMQ 3.13+ + +mqtt.prefetch = 10 + +# Management Plugin +management.tcp.port = 15672 + +# Logging +log.file.level = info +log.console = true +log.console.level = info + +# VM Memory +vm_memory_high_watermark.relative = 0.6 diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..82fb25c --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,69 @@ +services: + rabbitmq: + image: rabbitmq:management + container_name: databus-mqtt-rabbitmq + ports: + - "5672:5672" # AMQP port + - "1883:1883" # MQTT port + - "15672:15672" # Management UI + environment: + RABBITMQ_DEFAULT_USER: ${RABBITMQ_USER:-admin} + RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASS:-admin} + volumes: + - ./config/rabbitmq/enabled_plugins:/etc/rabbitmq/enabled_plugins + - ./config/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf + - rabbitmq_data:/var/lib/rabbitmq + networks: + - databus-mqtt-network + healthcheck: + test: rabbitmq-diagnostics -q ping + interval: 30s + timeout: 10s + retries: 5 + + redis: + image: redis:7-alpine + container_name: databus-mqtt-redis + ports: + - "6379:6379" + command: redis-server --appendonly yes + volumes: + - redis_data:/data + networks: + - databus-mqtt-network + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 30s + timeout: 10s + retries: 5 + + subscriber: + build: + context: ./subscriber + dockerfile: Dockerfile + container_name: databus-mqtt-subscriber + environment: + MQTT_HOST: rabbitmq + MQTT_PORT: 1883 + MQTT_USER: ${RABBITMQ_USER:-admin} + MQTT_PASS: ${RABBITMQ_PASS:-admin} + MQTT_TOPIC: ${MQTT_TOPIC:-transit/vehicles/#} + REDIS_HOST: redis + REDIS_PORT: 6379 + REDIS_DB: ${REDIS_DB:-0} + depends_on: + rabbitmq: + condition: service_healthy + redis: + condition: service_healthy + networks: + - databus-mqtt-network + restart: unless-stopped + +volumes: + rabbitmq_data: + redis_data: + +networks: + databus-mqtt-network: + driver: bridge diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 0000000..3f0621a --- /dev/null +++ b/examples/README.md @@ -0,0 +1,58 @@ +# Examples + +This directory contains example scripts for testing and interacting with the databus-mqtt system. + +## Files + +### publisher_example.py + +A sample MQTT publisher that generates and publishes simulated transit vehicle data. + +**Usage:** +```bash +pip install paho-mqtt +python publisher_example.py +``` + +This script will: +- Connect to the MQTT broker +- Publish simulated vehicle tracking data for 5 buses +- Continue publishing until stopped with Ctrl+C + +### query_redis.py + +A sample script to query and display vehicle data stored in Redis. + +**Usage:** +```bash +pip install redis +python query_redis.py +``` + +This script will: +- Connect to Redis +- Display all stored vehicle data +- Show timeline information +- Print Redis statistics + +## Testing the System + +1. Start the databus-mqtt system: +```bash +docker-compose up -d +``` + +2. Run the publisher example in one terminal: +```bash +python examples/publisher_example.py +``` + +3. Run the query script in another terminal to see the stored data: +```bash +python examples/query_redis.py +``` + +4. You can also check the subscriber logs: +```bash +docker-compose logs -f subscriber +``` diff --git a/examples/publisher_example.py b/examples/publisher_example.py new file mode 100644 index 0000000..60917dd --- /dev/null +++ b/examples/publisher_example.py @@ -0,0 +1,111 @@ +#!/usr/bin/env python3 +""" +Example MQTT publisher for testing the databus-mqtt system. +This script publishes sample transit vehicle data to the MQTT broker. + +Usage: + pip install paho-mqtt + python publisher_example.py +""" + +import json +import time +import random +from datetime import datetime +import paho.mqtt.client as mqtt + +# Configuration +MQTT_HOST = "localhost" +MQTT_PORT = 1883 +MQTT_USER = "admin" +MQTT_PASS = "admin" +MQTT_TOPIC_BASE = "transit/vehicles/bus" + +def generate_vehicle_data(vehicle_id): + """Generate sample vehicle tracking data""" + # Simulate vehicle moving in NYC area + lat = 40.7128 + random.uniform(-0.1, 0.1) + lon = -74.0060 + random.uniform(-0.1, 0.1) + + return { + "vehicle_id": vehicle_id, + "route": f"Route-{random.randint(1, 10)}", + "lat": round(lat, 6), + "lon": round(lon, 6), + "speed": round(random.uniform(0, 50), 1), + "heading": random.randint(0, 359), + "passengers": random.randint(0, 50), + "timestamp": datetime.utcnow().isoformat() + } + +def on_connect(client, userdata, flags, rc): + """Callback for when the client connects to the broker""" + if rc == 0: + print(f"Connected to MQTT broker at {MQTT_HOST}:{MQTT_PORT}") + else: + print(f"Failed to connect, return code: {rc}") + +def on_publish(client, userdata, mid): + """Callback for when a message is published""" + print(f"Message {mid} published successfully") + +def main(): + """Main function to publish sample data""" + print("Starting MQTT publisher example...") + + # Create MQTT client + client = mqtt.Client(client_id="example-publisher") + client.username_pw_set(MQTT_USER, MQTT_PASS) + client.on_connect = on_connect + client.on_publish = on_publish + + try: + # Connect to broker + print(f"Connecting to MQTT broker at {MQTT_HOST}:{MQTT_PORT}...") + client.connect(MQTT_HOST, MQTT_PORT, 60) + client.loop_start() + + # Wait for connection + time.sleep(2) + + # Publish messages for 5 different vehicles + vehicle_ids = ["BUS-001", "BUS-002", "BUS-003", "BUS-004", "BUS-005"] + + print("\nPublishing vehicle data (press Ctrl+C to stop)...") + message_count = 0 + + while True: + for vehicle_id in vehicle_ids: + # Generate data + data = generate_vehicle_data(vehicle_id) + topic = f"{MQTT_TOPIC_BASE}/{vehicle_id}" + + # Publish message + payload = json.dumps(data) + result = client.publish(topic, payload, qos=1) + + if result.rc == mqtt.MQTT_ERR_SUCCESS: + message_count += 1 + print(f"[{message_count}] Published to {topic}: " + f"Vehicle {vehicle_id} at ({data['lat']}, {data['lon']}) " + f"speed {data['speed']} km/h") + else: + print(f"Failed to publish message to {topic}") + + # Wait a bit between messages + time.sleep(1) + + # Wait before next round + time.sleep(2) + + except KeyboardInterrupt: + print("\n\nStopping publisher...") + except Exception as e: + print(f"Error: {e}") + finally: + client.loop_stop() + client.disconnect() + print("Publisher stopped") + +if __name__ == "__main__": + main() diff --git a/examples/query_redis.py b/examples/query_redis.py new file mode 100644 index 0000000..ea2bdd9 --- /dev/null +++ b/examples/query_redis.py @@ -0,0 +1,90 @@ +#!/usr/bin/env python3 +""" +Example Redis query script for retrieving stored vehicle data. + +Usage: + pip install redis + python query_redis.py +""" + +import redis +import json +import sys + +# Configuration +REDIS_HOST = "localhost" +REDIS_PORT = 6379 +REDIS_DB = 0 + +def connect_redis(): + """Connect to Redis""" + try: + client = redis.Redis( + host=REDIS_HOST, + port=REDIS_PORT, + db=REDIS_DB, + decode_responses=True + ) + client.ping() + print(f"Connected to Redis at {REDIS_HOST}:{REDIS_PORT}") + return client + except redis.ConnectionError as e: + print(f"Failed to connect to Redis: {e}") + sys.exit(1) + +def print_vehicle_data(client, key): + """Print vehicle data from a Redis key""" + data = client.get(key) + if data: + vehicle_info = json.loads(data) + print(f"\n{key}:") + print(f" Vehicle ID: {vehicle_info.get('vehicle_id', 'N/A')}") + print(f" Location: ({vehicle_info.get('lat', 'N/A')}, {vehicle_info.get('lon', 'N/A')})") + print(f" Speed: {vehicle_info.get('speed', 'N/A')} km/h") + print(f" Heading: {vehicle_info.get('heading', 'N/A')}°") + print(f" Timestamp: {vehicle_info.get('_timestamp', 'N/A')}") + print(f" Topic: {vehicle_info.get('_topic', 'N/A')}") + else: + print(f"No data found for {key}") + +def main(): + """Main function to query Redis data""" + print("Redis Query Example\n" + "=" * 50) + + client = connect_redis() + + # Get all vehicle keys + print("\nFetching all vehicle keys...") + vehicle_keys = client.keys("vehicle:*") + + if not vehicle_keys: + print("No vehicle data found in Redis.") + print("Make sure the subscriber is running and receiving MQTT messages.") + return + + print(f"Found {len(vehicle_keys)} vehicle(s) in Redis:") + + # Print each vehicle's data + for key in sorted(vehicle_keys): + print_vehicle_data(client, key) + + # Query timeline + print("\n" + "=" * 50) + print("Recent vehicles (from timeline):") + timeline = client.zrange("vehicles:timeline", -10, -1, withscores=True) + + if timeline: + for key, score in timeline: + print(f" {key} (last update: {score})") + else: + print(" No timeline data available") + + # Print statistics + print("\n" + "=" * 50) + print("Redis Statistics:") + info = client.info('memory') + print(f" Memory used: {info['used_memory_human']}") + print(f" Total keys: {client.dbsize()}") + +if __name__ == "__main__": + main() diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..4b87b22 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,10 @@ +[project] +name = "databus-mqtt" +version = "0.1.0" +description = "Add your description here" +readme = "README.md" +requires-python = ">=3.14" +dependencies = [ + "paho-mqtt>=2.1.0", + "redis>=7.0.0", +] diff --git a/subscriber/Dockerfile b/subscriber/Dockerfile new file mode 100644 index 0000000..404b08c --- /dev/null +++ b/subscriber/Dockerfile @@ -0,0 +1,14 @@ +FROM python:3.12-slim + +WORKDIR /app + +# Install uv +COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv + +# Copy requirements and install dependencies +COPY requirements.txt . +RUN uv pip install --system --no-cache -r requirements.txt + +COPY . . + +CMD ["python", "subscriber.py"] diff --git a/subscriber/requirements.txt b/subscriber/requirements.txt new file mode 100644 index 0000000..018929a --- /dev/null +++ b/subscriber/requirements.txt @@ -0,0 +1,2 @@ +paho-mqtt>=2.1.0 +redis>=7.0.0 diff --git a/subscriber/subscriber.py b/subscriber/subscriber.py new file mode 100644 index 0000000..b5f38df --- /dev/null +++ b/subscriber/subscriber.py @@ -0,0 +1,150 @@ +import os +import json +import time +import logging +from datetime import datetime +import paho.mqtt.client as mqtt +import redis + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + +# Configuration from environment variables +MQTT_HOST = os.getenv('MQTT_HOST', 'localhost') +MQTT_PORT = int(os.getenv('MQTT_PORT', 1883)) +MQTT_USER = os.getenv('MQTT_USER', 'admin') +MQTT_PASS = os.getenv('MQTT_PASS', 'admin') +MQTT_TOPIC = os.getenv('MQTT_TOPIC', 'transit/vehicles/#') + +REDIS_HOST = os.getenv('REDIS_HOST', 'localhost') +REDIS_PORT = int(os.getenv('REDIS_PORT', 6379)) +REDIS_DB = int(os.getenv('REDIS_DB', 0)) + +# Redis connection +redis_client = None + +def connect_redis(): + """Connect to Redis with retry logic""" + global redis_client + max_retries = 5 + retry_delay = 5 + + for attempt in range(max_retries): + try: + redis_client = redis.Redis( + host=REDIS_HOST, + port=REDIS_PORT, + db=REDIS_DB, + decode_responses=True + ) + redis_client.ping() + logger.info(f"Connected to Redis at {REDIS_HOST}:{REDIS_PORT}") + return True + except redis.ConnectionError as e: + logger.warning(f"Redis connection attempt {attempt + 1}/{max_retries} failed: {e}") + if attempt < max_retries - 1: + time.sleep(retry_delay) + + logger.error("Failed to connect to Redis after maximum retries") + return False + +def on_connect(client, userdata, flags, rc): + """Callback for when the client connects to the broker""" + if rc == 0: + logger.info(f"Connected to MQTT broker at {MQTT_HOST}:{MQTT_PORT}") + client.subscribe(MQTT_TOPIC) + logger.info(f"Subscribed to topic: {MQTT_TOPIC}") + else: + logger.error(f"Failed to connect to MQTT broker, return code: {rc}") + +def on_message(client, userdata, msg): + """Callback for when a message is received from the broker""" + try: + topic = msg.topic + payload = msg.payload.decode('utf-8') + + logger.info(f"Received message on topic '{topic}': {payload[:100]}...") + + # Parse JSON payload if applicable + try: + data = json.loads(payload) + except json.JSONDecodeError: + data = {"raw": payload} + + # Add metadata + data['_timestamp'] = datetime.utcnow().isoformat() + data['_topic'] = topic + + # Extract vehicle ID or use timestamp for unique key + vehicle_id = data.get('vehicle_id', data.get('id', f"unknown_{int(time.time() * 1000)}")) + stream_key = f"vehicle:{vehicle_id}:stream" + + # Add to Redis Stream + redis_client.xadd(stream_key, data) + + # Set TTL on the stream (1 hour) + redis_client.expire(stream_key, 3600) + + logger.info(f"Added vehicle data to Redis Stream: {stream_key}") + + except Exception as e: + logger.error(f"Error processing message: {e}", exc_info=True) + +def on_disconnect(client, userdata, rc): + """Callback for when the client disconnects from the broker""" + if rc != 0: + logger.warning(f"Unexpected disconnection from MQTT broker, return code: {rc}") + else: + logger.info("Disconnected from MQTT broker") + +def main(): + """Main function to start the subscriber""" + logger.info("Starting MQTT subscriber service...") + + # Connect to Redis + if not connect_redis(): + logger.error("Cannot start without Redis connection") + return + + # Create MQTT client + client = mqtt.Client(client_id="databus-subscriber") + client.username_pw_set(MQTT_USER, MQTT_PASS) + + # Set callbacks + client.on_connect = on_connect + client.on_message = on_message + client.on_disconnect = on_disconnect + + # Connect to MQTT broker with retry logic + max_retries = 5 + retry_delay = 5 + + for attempt in range(max_retries): + try: + logger.info(f"Attempting to connect to MQTT broker (attempt {attempt + 1}/{max_retries})...") + client.connect(MQTT_HOST, MQTT_PORT, 60) + break + except Exception as e: + logger.warning(f"MQTT connection attempt {attempt + 1}/{max_retries} failed: {e}") + if attempt < max_retries - 1: + time.sleep(retry_delay) + else: + logger.error("Failed to connect to MQTT broker after maximum retries") + return + + # Start the loop + try: + logger.info("Starting MQTT client loop...") + client.loop_forever() + except KeyboardInterrupt: + logger.info("Received shutdown signal") + finally: + client.disconnect() + logger.info("Subscriber service stopped") + +if __name__ == "__main__": + main() diff --git a/uv.lock b/uv.lock new file mode 100644 index 0000000..5deef96 --- /dev/null +++ b/uv.lock @@ -0,0 +1,36 @@ +version = 1 +revision = 3 +requires-python = ">=3.14" + +[[package]] +name = "databus-mqtt" +version = "0.1.0" +source = { virtual = "." } +dependencies = [ + { name = "paho-mqtt" }, + { name = "redis" }, +] + +[package.metadata] +requires-dist = [ + { name = "paho-mqtt", specifier = ">=2.1.0" }, + { name = "redis", specifier = ">=7.0.0" }, +] + +[[package]] +name = "paho-mqtt" +version = "2.1.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/39/15/0a6214e76d4d32e7f663b109cf71fb22561c2be0f701d67f93950cd40542/paho_mqtt-2.1.0.tar.gz", hash = "sha256:12d6e7511d4137555a3f6ea167ae846af2c7357b10bc6fa4f7c3968fc1723834", size = 148848, upload-time = "2024-04-29T19:52:55.591Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/c4/cb/00451c3cf31790287768bb12c6bec834f5d292eaf3022afc88e14b8afc94/paho_mqtt-2.1.0-py3-none-any.whl", hash = "sha256:6db9ba9b34ed5bc6b6e3812718c7e06e2fd7444540df2455d2c51bd58808feee", size = 67219, upload-time = "2024-04-29T19:52:48.345Z" }, +] + +[[package]] +name = "redis" +version = "7.0.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/d2/0e/80de0c7d9b04360331906b6b713a967e6523d155a92090983eba2e99302e/redis-7.0.0.tar.gz", hash = "sha256:6546ada54354248a53a47342d36abe6172bb156f23d24f018fda2e3c06b9c97a", size = 4754895, upload-time = "2025-10-22T15:38:36.128Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/aa/de/68c1add9d9a49588e6f75a149e079e44bab973e748a35e0582ccada09002/redis-7.0.0-py3-none-any.whl", hash = "sha256:1e66c8355b3443af78367c4937484cd875fdf9f5f14e1fed14aa95869e64f6d1", size = 339526, upload-time = "2025-10-22T15:38:34.901Z" }, +]