Skip to content

Commit

Permalink
refactor; unify emoji style; use recent python features
Browse files Browse the repository at this point in the history
  • Loading branch information
Tom Hensel committed Jul 7, 2024
1 parent b22cb74 commit f2bba9b
Show file tree
Hide file tree
Showing 9 changed files with 454 additions and 453 deletions.
24 changes: 13 additions & 11 deletions .github/workflows/code-quality.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@ jobs:
- name: Install dependencies
run: |
python -m pip install -q -U pip
pip install -q -U flake8 pytest
if [ -f requirements.txt ]; then pip install -q -U -r requirements.txt; fi
- name: Lint with flake8
continue-on-error: true
run: |
# stop the build if there are Python syntax errors or undefined names
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
# exit-zero treats all errors as warnings
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=120 --statistics
- name: Test with pytest
pip install -q -U -r requirements.txt
pip install -q -U pytest pytest-asyncio coverage prospector[with_everything]
- name: Run tests with coverage
run: |
pytest
coverage run -m pytest
coverage report -m
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4
with:
fail_ci_if_error: false

- name: Run Prospector
run: prospector
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ wheels/
.installed.cfg
*.egg

.coverage

# dotenv
.envrc

Expand Down
13 changes: 7 additions & 6 deletions src/config_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def get_authorized_users(self) -> List[int]:

def setup_logging(self) -> None:
log_level = self._parse_log_level(self.get('logging.level', 'INFO'))
formatter = SensitiveFormatter('%(asctime)s %(levelname)s [%(name)s] %(message)s',)
formatter = SensitiveFormatter('%(asctime)s %(levelname)s [%(name)s] %(message)s')

logging.basicConfig(
level=log_level,
Expand Down Expand Up @@ -54,9 +54,10 @@ def _parse_log_level(self, level: Any) -> int:

def _setup_syslog_handler(self) -> None:
try:
syslog_handler = logging.handlers.SysLogHandler(
from logging.handlers import SysLogHandler
syslog_handler = SysLogHandler(
address=(self.get('logging.syslog_host'), self.get('logging.syslog_port', 514)),
socktype=logging.handlers.socket.SOCK_DGRAM if self.get('logging.syslog_protocol', 'udp') == 'udp' else logging.handlers.socket.SOCK_STREAM
socktype=SysLogHandler.UDP_SOCKET if self.get('logging.syslog_protocol', 'udp') == 'udp' else SysLogHandler.TCP_SOCKET
)
syslog_handler.setFormatter(SensitiveFormatter('%(name)s - %(levelname)s - %(message)s'))
logging.getLogger().addHandler(syslog_handler)
Expand All @@ -70,9 +71,9 @@ def validate_config(self) -> None:
'meshtastic.connection_type',
'meshtastic.device',
]
for key in required_keys:
if not self.get(key):
raise ValueError(f"Missing required configuration: {key}")
missing_keys = [key for key in required_keys if not self.get(key)]
if missing_keys:
raise ValueError(f"Missing required configuration: {', '.join(missing_keys)}")

class SensitiveFormatter(logging.Formatter):
def __init__(self, fmt: Optional[str] = None, datefmt: Optional[str] = None):
Expand Down
44 changes: 13 additions & 31 deletions src/meshgram.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import argparse
import asyncio
from typing import Optional, List
from typing import Optional, Sequence
from meshtastic_interface import MeshtasticInterface
from telegram_interface import TelegramInterface
from message_processor import MessageProcessor
Expand All @@ -14,7 +14,7 @@ def __init__(self, config: ConfigManager) -> None:
self.meshtastic: Optional[MeshtasticInterface] = None
self.telegram: Optional[TelegramInterface] = None
self.message_processor: Optional[MessageProcessor] = None
self.tasks: List[Task] = []
self.tasks: Sequence[Task] = ()

async def setup(self) -> None:
self.logger.info("Setting up meshgram...")
Expand All @@ -36,47 +36,29 @@ async def run(self) -> None:
try:
await self.setup()
except Exception as e:
self.logger.error(f"Failed to set up Meshgram: {e}")
self.logger.error(f"Failed to set up Meshgram: {e}", exc_info=True)
return

self.logger.info("Meshgram is running ヽ(´▽`)/")
self.tasks = [
self.logger.info("Meshgram is running.")
self.tasks = (
asyncio.create_task(self.message_processor.process_messages()),
asyncio.create_task(self.meshtastic.process_thread_safe_queue()),
asyncio.create_task(self.meshtastic.process_pending_messages()),
asyncio.create_task(self.telegram.start_polling()),
asyncio.create_task(self.message_processor.check_heartbeats())
]
)
try:
await asyncio.gather(*self.tasks)
except asyncio.CancelledError:
self.logger.info("Received cancellation signal.")
except Exception as e:
self.logger.error(f"An error occurred: {e}", exc_info=True)
self.handle_task_exceptions()
finally:
await self.shutdown()

async def shutdown(self) -> None:
self.logger.info("Shutting down meshgram...")
for task in self.tasks:
if not task.done():
task.cancel()
await asyncio.gather(*self.tasks, return_exceptions=True)
if self.meshtastic:
await self.meshtastic.close()
if self.telegram:
await self.telegram.close()
if self.message_processor:
if hasattr(self.message_processor, 'close'):
await self.message_processor.close()
else:
self.logger.warning("MessageProcessor does not have a close method.")
self.logger.info("Meshgram shutdown complete.")

async def main() -> None:
parser = argparse.ArgumentParser(description='Meshgram: Meshtastic-Telegram Bridge')
parser.add_argument('-c', '--config', default='config/config.yaml', help='Path to configuration file')
parser.add_argument('--version', action='version', version='%(prog)s 1.0.0')
parser.add_argument('--version', action='version', version='%(prog)s 1.0.0') # FIXME, get git head revision somehow
args = parser.parse_args()

config = ConfigManager(args.config)
Expand All @@ -86,15 +68,15 @@ async def main() -> None:
app = Meshgram(config)
try:
await app.run()
except KeyboardInterrupt:
logger.info("Received keyboard interrupt. Shutting down gracefully...")
except Exception as e:
logger.error(f"Unhandled exception: {e}", exc_info=True)
except* ExceptionGroup as eg:
logger.error("Exception(s) occurred:")
for i, e in enumerate(eg.exceptions, 1):
logger.error(f"Exception {i}: {e}", exc_info=e)
finally:
await app.shutdown()

if __name__ == '__main__':
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\nShutdown complete.")
print("\nShutdown complete.")
88 changes: 44 additions & 44 deletions src/meshtastic_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ def __init__(self, config: ConfigManager) -> None:
self.config = config
self.logger = get_logger(__name__)
self.interface: Optional[Union[SerialInterface, TCPInterface]] = None
self.message_queue = asyncio.Queue()
self.thread_safe_queue = queue.Queue()
self.message_queue: asyncio.Queue = asyncio.Queue()
self.thread_safe_queue: queue.Queue = queue.Queue()
self.loop = asyncio.get_event_loop()
self.pending_messages: List[PendingMessage] = []
self.last_telemetry: Dict[str, Any] = {}
self.max_retries, self.retry_interval = 3, 60
self.max_retries: int = 3
self.retry_interval: int = 60
self.node_manager = NodeManager()
self.is_setup = False
self.is_setup: bool = False

async def setup(self) -> None:
self.logger.info("Setting up meshtastic interface...")
Expand Down Expand Up @@ -65,12 +66,24 @@ async def _fetch_node_info(self) -> None:
except Exception as e:
self.logger.error(f"Failed to get node info: {e}")

def on_meshtastic_message(self, packet, interface):
def on_meshtastic_message(self, packet: Dict[str, Any], interface: Any) -> None:
self.logger.info(f"Received message from Meshtastic: {packet}")
self.logger.debug(f"Message details - fromId: {packet.get('fromId')}, toId: {packet.get('toId')}, portnum: {packet.get('decoded', {}).get('portnum')}")
self.thread_safe_queue.put(packet)

def on_connection(self, interface, topic=pub.AUTO_TOPIC):
if packet.get('decoded', {}).get('portnum') == 'ROUTING_APP':
self.handle_ack(packet)
else:
self.thread_safe_queue.put(packet)

def handle_ack(self, packet: Dict[str, Any]) -> None:
ack_data = {
'type': 'ack',
'from': packet.get('fromId'),
'to': packet.get('toId'),
'message_id': packet.get('id')
}
self.loop.call_soon_threadsafe(self.message_queue.put_nowait, ack_data)

def on_connection(self, interface: Any, topic: Optional[str] = pub.AUTO_TOPIC) -> None:
self.logger.info(f"Connected to Meshtastic interface: {interface}")

async def send_message(self, text: str, recipient: str) -> None:
Expand All @@ -84,46 +97,31 @@ async def send_message(self, text: str, recipient: str) -> None:
self.logger.error(f"Error sending message to Meshtastic: {e}", exc_info=True)
self.pending_messages.append(PendingMessage(text, recipient))

async def send_node_info(self, node_info: Dict[str, Any]) -> None:
node_id = node_info.get('user', {}).get('id', 'unknown')
self.node_manager.update_node(node_id, {
'name': node_info.get('user', {}).get('longName', 'unknown'),
'shortName': node_info.get('user', {}).get('shortName', 'unknown'),
'hwModel': node_info.get('user', {}).get('hwModel', 'unknown')
})
await self.message_queue.put({'type': 'node_info', 'text': self.node_manager.format_node_info(node_id)})

async def send_bell(self, dest_id: str) -> None:
try:
await asyncio.to_thread(self.interface.sendText, "🔔", destinationId=dest_id)
self.logger.info(f"Bell (text message) sent to node {dest_id}")
except Exception as e:
self.logger.error(f"Error sending bell to node {dest_id}: {e}")
raise
async def send_message(self, text: str, recipient: str) -> None:
if not text or not recipient:
raise ValueError("Text and recipient must not be empty")
if len(text) > 230: # Meshtastic message size limit
raise ValueError("Message too long")

async def request_location(self, dest_id: str) -> None:
self.logger.info(f"Attempting to send message to Meshtastic: {text}")
try:
await asyncio.to_thread(self.interface.sendText, "Please share your location", destinationId=dest_id)
self.logger.info(f"Location request (text message) sent to node {dest_id}")
self.logger.debug(f"Sending message to Meshtastic with recipient: {recipient}")
result = await asyncio.to_thread(self.interface.sendText, text, destinationId=recipient)
self.logger.info(f"Message sent to Meshtastic: {text}")
self.logger.debug(f"Send result: {result}")
except Exception as e:
self.logger.error(f"Error requesting location from node {dest_id}: {e}")
raise
self.logger.error(f"Error sending message to Meshtastic: {e}", exc_info=True)
self.pending_messages.append(PendingMessage(text, recipient))

async def request_telemetry(self, dest_id: str) -> None:
try:
await asyncio.to_thread(self.interface.sendTelemetry)
self.logger.info(f"Telemetry request sent to node {dest_id}")
except Exception as e:
self.logger.error(f"Error requesting telemetry from node {dest_id}: {e}")
raise
async def send_bell(self, dest_id: str) -> None:
if not dest_id:
raise ValueError("Destination ID must not be empty")

async def traceroute(self, dest_id: str) -> None:
try:
self.logger.info(f"Initiating traceroute to {dest_id}")
await asyncio.to_thread(self.interface.sendText, f"!traceroute {dest_id}", destinationId=dest_id)
self.logger.info(f"Traceroute request sent to {dest_id}")
await asyncio.to_thread(self.interface.sendText, "🔔", destinationId=dest_id)
self.logger.info(f"Bell (text message) sent to node {dest_id}")
except Exception as e:
self.logger.error(f"Error performing traceroute to node {dest_id}: {e}")
self.logger.error(f"Error sending bell to node {dest_id}: {e}")
raise

async def process_pending_messages(self) -> None:
Expand Down Expand Up @@ -157,9 +155,11 @@ async def get_status(self) -> str:
return "Meshtastic interface not connected"
try:
node_info = await asyncio.to_thread(self.interface.getMyNodeInfo)
return f"Connected to node: {node_info.get('user', {}).get('longName', 'Unknown')}\n" \
f"Battery level: {node_info.get('deviceMetrics', {}).get('batteryLevel', 'Unknown')}\n" \
f"Channel utilization: {node_info.get('deviceMetrics', {}).get('channelUtilization', 'Unknown')}"
return (
f"Node: {node_info.get('user', {}).get('longName', 'N/A')}\n"
f"Battery: {node_info.get('deviceMetrics', {}).get('batteryLevel', 'N/A')}%\n"
f"Air Utilization TX: {node_info.get('deviceMetrics', {}).get('airUtilTx', 'N/A')}%"
)
except Exception as e:
return f"Error getting meshtastic status: {e}"

Expand Down
Loading

0 comments on commit f2bba9b

Please sign in to comment.