diff --git a/.github/workflows/code-quality.yaml b/.github/workflows/code-quality.yaml index a74184f..cada250 100644 --- a/.github/workflows/code-quality.yaml +++ b/.github/workflows/code-quality.yaml @@ -19,15 +19,17 @@ jobs: - name: Install dependencies run: | python -m pip install -q -U pip - pip install -q -U flake8 pytest - if [ -f requirements.txt ]; then pip install -q -U -r requirements.txt; fi - - name: Lint with flake8 - continue-on-error: true - run: | - # stop the build if there are Python syntax errors or undefined names - flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics - # exit-zero treats all errors as warnings - flake8 . --count --exit-zero --max-complexity=10 --max-line-length=120 --statistics - - name: Test with pytest + pip install -q -U -r requirements.txt + pip install -q -U pytest pytest-asyncio coverage prospector[with_everything] + + - name: Run tests with coverage run: | - pytest \ No newline at end of file + coverage run -m pytest + coverage report -m + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v4 + with: + fail_ci_if_error: false + + - name: Run Prospector + run: prospector diff --git a/.gitignore b/.gitignore index a191a23..70d9861 100644 --- a/.gitignore +++ b/.gitignore @@ -20,6 +20,8 @@ wheels/ .installed.cfg *.egg +.coverage + # dotenv .envrc diff --git a/src/config_manager.py b/src/config_manager.py index 04a349c..d509057 100644 --- a/src/config_manager.py +++ b/src/config_manager.py @@ -23,7 +23,7 @@ def get_authorized_users(self) -> List[int]: def setup_logging(self) -> None: log_level = self._parse_log_level(self.get('logging.level', 'INFO')) - formatter = SensitiveFormatter('%(asctime)s %(levelname)s [%(name)s] %(message)s',) + formatter = SensitiveFormatter('%(asctime)s %(levelname)s [%(name)s] %(message)s') logging.basicConfig( level=log_level, @@ -54,9 +54,10 @@ def _parse_log_level(self, level: Any) -> int: def _setup_syslog_handler(self) -> None: try: - syslog_handler = logging.handlers.SysLogHandler( + from logging.handlers import SysLogHandler + syslog_handler = SysLogHandler( address=(self.get('logging.syslog_host'), self.get('logging.syslog_port', 514)), - socktype=logging.handlers.socket.SOCK_DGRAM if self.get('logging.syslog_protocol', 'udp') == 'udp' else logging.handlers.socket.SOCK_STREAM + socktype=SysLogHandler.UDP_SOCKET if self.get('logging.syslog_protocol', 'udp') == 'udp' else SysLogHandler.TCP_SOCKET ) syslog_handler.setFormatter(SensitiveFormatter('%(name)s - %(levelname)s - %(message)s')) logging.getLogger().addHandler(syslog_handler) @@ -70,9 +71,9 @@ def validate_config(self) -> None: 'meshtastic.connection_type', 'meshtastic.device', ] - for key in required_keys: - if not self.get(key): - raise ValueError(f"Missing required configuration: {key}") + missing_keys = [key for key in required_keys if not self.get(key)] + if missing_keys: + raise ValueError(f"Missing required configuration: {', '.join(missing_keys)}") class SensitiveFormatter(logging.Formatter): def __init__(self, fmt: Optional[str] = None, datefmt: Optional[str] = None): diff --git a/src/meshgram.py b/src/meshgram.py index 4631c9d..d8d96bb 100644 --- a/src/meshgram.py +++ b/src/meshgram.py @@ -1,6 +1,6 @@ import argparse import asyncio -from typing import Optional, List +from typing import Optional, Sequence from meshtastic_interface import MeshtasticInterface from telegram_interface import TelegramInterface from message_processor import MessageProcessor @@ -14,7 +14,7 @@ def __init__(self, config: ConfigManager) -> None: self.meshtastic: Optional[MeshtasticInterface] = None self.telegram: Optional[TelegramInterface] = None self.message_processor: Optional[MessageProcessor] = None - self.tasks: List[Task] = [] + self.tasks: Sequence[Task] = () async def setup(self) -> None: self.logger.info("Setting up meshgram...") @@ -36,47 +36,29 @@ async def run(self) -> None: try: await self.setup() except Exception as e: - self.logger.error(f"Failed to set up Meshgram: {e}") + self.logger.error(f"Failed to set up Meshgram: {e}", exc_info=True) return - self.logger.info("Meshgram is running ใƒฝ(ยดโ–ฝ`)/") - self.tasks = [ + self.logger.info("Meshgram is running.") + self.tasks = ( asyncio.create_task(self.message_processor.process_messages()), asyncio.create_task(self.meshtastic.process_thread_safe_queue()), asyncio.create_task(self.meshtastic.process_pending_messages()), asyncio.create_task(self.telegram.start_polling()), - asyncio.create_task(self.message_processor.check_heartbeats()) - ] + ) try: await asyncio.gather(*self.tasks) except asyncio.CancelledError: self.logger.info("Received cancellation signal.") except Exception as e: - self.logger.error(f"An error occurred: {e}", exc_info=True) + self.handle_task_exceptions() finally: await self.shutdown() - async def shutdown(self) -> None: - self.logger.info("Shutting down meshgram...") - for task in self.tasks: - if not task.done(): - task.cancel() - await asyncio.gather(*self.tasks, return_exceptions=True) - if self.meshtastic: - await self.meshtastic.close() - if self.telegram: - await self.telegram.close() - if self.message_processor: - if hasattr(self.message_processor, 'close'): - await self.message_processor.close() - else: - self.logger.warning("MessageProcessor does not have a close method.") - self.logger.info("Meshgram shutdown complete.") - async def main() -> None: parser = argparse.ArgumentParser(description='Meshgram: Meshtastic-Telegram Bridge') parser.add_argument('-c', '--config', default='config/config.yaml', help='Path to configuration file') - parser.add_argument('--version', action='version', version='%(prog)s 1.0.0') + parser.add_argument('--version', action='version', version='%(prog)s 1.0.0') # FIXME, get git head revision somehow args = parser.parse_args() config = ConfigManager(args.config) @@ -86,10 +68,10 @@ async def main() -> None: app = Meshgram(config) try: await app.run() - except KeyboardInterrupt: - logger.info("Received keyboard interrupt. Shutting down gracefully...") - except Exception as e: - logger.error(f"Unhandled exception: {e}", exc_info=True) + except* ExceptionGroup as eg: + logger.error("Exception(s) occurred:") + for i, e in enumerate(eg.exceptions, 1): + logger.error(f"Exception {i}: {e}", exc_info=e) finally: await app.shutdown() @@ -97,4 +79,4 @@ async def main() -> None: try: asyncio.run(main()) except KeyboardInterrupt: - print("\nShutdown complete.") + print("\nShutdown complete.") \ No newline at end of file diff --git a/src/meshtastic_interface.py b/src/meshtastic_interface.py index cb87bfd..f6d5dab 100644 --- a/src/meshtastic_interface.py +++ b/src/meshtastic_interface.py @@ -22,14 +22,15 @@ def __init__(self, config: ConfigManager) -> None: self.config = config self.logger = get_logger(__name__) self.interface: Optional[Union[SerialInterface, TCPInterface]] = None - self.message_queue = asyncio.Queue() - self.thread_safe_queue = queue.Queue() + self.message_queue: asyncio.Queue = asyncio.Queue() + self.thread_safe_queue: queue.Queue = queue.Queue() self.loop = asyncio.get_event_loop() self.pending_messages: List[PendingMessage] = [] self.last_telemetry: Dict[str, Any] = {} - self.max_retries, self.retry_interval = 3, 60 + self.max_retries: int = 3 + self.retry_interval: int = 60 self.node_manager = NodeManager() - self.is_setup = False + self.is_setup: bool = False async def setup(self) -> None: self.logger.info("Setting up meshtastic interface...") @@ -65,12 +66,24 @@ async def _fetch_node_info(self) -> None: except Exception as e: self.logger.error(f"Failed to get node info: {e}") - def on_meshtastic_message(self, packet, interface): + def on_meshtastic_message(self, packet: Dict[str, Any], interface: Any) -> None: self.logger.info(f"Received message from Meshtastic: {packet}") self.logger.debug(f"Message details - fromId: {packet.get('fromId')}, toId: {packet.get('toId')}, portnum: {packet.get('decoded', {}).get('portnum')}") - self.thread_safe_queue.put(packet) - - def on_connection(self, interface, topic=pub.AUTO_TOPIC): + if packet.get('decoded', {}).get('portnum') == 'ROUTING_APP': + self.handle_ack(packet) + else: + self.thread_safe_queue.put(packet) + + def handle_ack(self, packet: Dict[str, Any]) -> None: + ack_data = { + 'type': 'ack', + 'from': packet.get('fromId'), + 'to': packet.get('toId'), + 'message_id': packet.get('id') + } + self.loop.call_soon_threadsafe(self.message_queue.put_nowait, ack_data) + + def on_connection(self, interface: Any, topic: Optional[str] = pub.AUTO_TOPIC) -> None: self.logger.info(f"Connected to Meshtastic interface: {interface}") async def send_message(self, text: str, recipient: str) -> None: @@ -84,46 +97,31 @@ async def send_message(self, text: str, recipient: str) -> None: self.logger.error(f"Error sending message to Meshtastic: {e}", exc_info=True) self.pending_messages.append(PendingMessage(text, recipient)) - async def send_node_info(self, node_info: Dict[str, Any]) -> None: - node_id = node_info.get('user', {}).get('id', 'unknown') - self.node_manager.update_node(node_id, { - 'name': node_info.get('user', {}).get('longName', 'unknown'), - 'shortName': node_info.get('user', {}).get('shortName', 'unknown'), - 'hwModel': node_info.get('user', {}).get('hwModel', 'unknown') - }) - await self.message_queue.put({'type': 'node_info', 'text': self.node_manager.format_node_info(node_id)}) - - async def send_bell(self, dest_id: str) -> None: - try: - await asyncio.to_thread(self.interface.sendText, "๐Ÿ””", destinationId=dest_id) - self.logger.info(f"Bell (text message) sent to node {dest_id}") - except Exception as e: - self.logger.error(f"Error sending bell to node {dest_id}: {e}") - raise + async def send_message(self, text: str, recipient: str) -> None: + if not text or not recipient: + raise ValueError("Text and recipient must not be empty") + if len(text) > 230: # Meshtastic message size limit + raise ValueError("Message too long") - async def request_location(self, dest_id: str) -> None: + self.logger.info(f"Attempting to send message to Meshtastic: {text}") try: - await asyncio.to_thread(self.interface.sendText, "Please share your location", destinationId=dest_id) - self.logger.info(f"Location request (text message) sent to node {dest_id}") + self.logger.debug(f"Sending message to Meshtastic with recipient: {recipient}") + result = await asyncio.to_thread(self.interface.sendText, text, destinationId=recipient) + self.logger.info(f"Message sent to Meshtastic: {text}") + self.logger.debug(f"Send result: {result}") except Exception as e: - self.logger.error(f"Error requesting location from node {dest_id}: {e}") - raise + self.logger.error(f"Error sending message to Meshtastic: {e}", exc_info=True) + self.pending_messages.append(PendingMessage(text, recipient)) - async def request_telemetry(self, dest_id: str) -> None: - try: - await asyncio.to_thread(self.interface.sendTelemetry) - self.logger.info(f"Telemetry request sent to node {dest_id}") - except Exception as e: - self.logger.error(f"Error requesting telemetry from node {dest_id}: {e}") - raise + async def send_bell(self, dest_id: str) -> None: + if not dest_id: + raise ValueError("Destination ID must not be empty") - async def traceroute(self, dest_id: str) -> None: try: - self.logger.info(f"Initiating traceroute to {dest_id}") - await asyncio.to_thread(self.interface.sendText, f"!traceroute {dest_id}", destinationId=dest_id) - self.logger.info(f"Traceroute request sent to {dest_id}") + await asyncio.to_thread(self.interface.sendText, "๐Ÿ””", destinationId=dest_id) + self.logger.info(f"Bell (text message) sent to node {dest_id}") except Exception as e: - self.logger.error(f"Error performing traceroute to node {dest_id}: {e}") + self.logger.error(f"Error sending bell to node {dest_id}: {e}") raise async def process_pending_messages(self) -> None: @@ -157,9 +155,11 @@ async def get_status(self) -> str: return "Meshtastic interface not connected" try: node_info = await asyncio.to_thread(self.interface.getMyNodeInfo) - return f"Connected to node: {node_info.get('user', {}).get('longName', 'Unknown')}\n" \ - f"Battery level: {node_info.get('deviceMetrics', {}).get('batteryLevel', 'Unknown')}\n" \ - f"Channel utilization: {node_info.get('deviceMetrics', {}).get('channelUtilization', 'Unknown')}" + return ( + f"Node: {node_info.get('user', {}).get('longName', 'N/A')}\n" + f"Battery: {node_info.get('deviceMetrics', {}).get('batteryLevel', 'N/A')}%\n" + f"Air Utilization TX: {node_info.get('deviceMetrics', {}).get('airUtilTx', 'N/A')}%" + ) except Exception as e: return f"Error getting meshtastic status: {e}" diff --git a/src/message_processor.py b/src/message_processor.py index 0397b36..21c57fd 100644 --- a/src/message_processor.py +++ b/src/message_processor.py @@ -1,5 +1,5 @@ import asyncio -from typing import Dict, Any +from typing import Dict, Any, List from datetime import datetime, timezone from meshtastic_interface import MeshtasticInterface from telegram_interface import TelegramInterface @@ -13,97 +13,13 @@ def __init__(self, meshtastic: MeshtasticInterface, telegram: TelegramInterface, self.telegram = telegram self.node_manager = meshtastic.node_manager self.start_time = datetime.now(timezone.utc) - self.last_heartbeat = {} - self.heartbeat_timeout = config.get('meshtastic.heartbeat_timeout', 300) self.local_nodes = config.get('meshtastic.local_nodes', []) - self.pending_requests = {} # For tracking location, telemetry, and traceroute requests - - async def handle_meshtastic_message(self, packet: Dict[str, Any]) -> None: - self.logger.debug(f"Received Meshtastic message: {packet}") - if packet.get('fromId') not in self.local_nodes: - self.logger.info(f"Message from non-local node: {packet.get('fromId')}") - - try: - portnum = packet.get('decoded', {}).get('portnum') - handler = getattr(self, f"handle_{portnum.lower()}", None) - if handler: - self.logger.info(f"Handling Meshtastic message type: {portnum}") - await handler(packet) - else: - self.logger.warning(f"Unhandled Meshtastic message type: {portnum}") - except Exception as e: - self.logger.error(f'Error handling Meshtastic message: {e}', exc_info=True) - - async def handle_text_message_app(self, packet: Dict[str, Any]) -> None: - text = packet['decoded']['payload'].decode('utf-8') - sender, recipient = packet.get('fromId', 'unknown'), packet.get('toId', 'unknown') - - message = f"[Meshtastic:{sender}->{recipient}] {text}" - self.logger.info(f"Sending Meshtastic message to Telegram: {message}") - await self.telegram.send_message(message, disable_notification=False) - - async def handle_telegram_text(self, message: Dict[str, Any]) -> None: - self.logger.info(f"Handling Telegram text message: {message}") - sender = message['sender'][:10] - recipient = self.config.get('meshtastic.default_node_id', '^all') - text = message['text'] - - meshtastic_message = f"[TG:{sender}] {text}" - self.logger.info(f"Preparing to send Telegram message to Meshtastic: {meshtastic_message}") - try: - await self.meshtastic.send_message(meshtastic_message, recipient) - self.logger.info(f"Successfully sent message to Meshtastic: {meshtastic_message}") - except Exception as e: - self.logger.error(f"Failed to send message to Meshtastic: {e}", exc_info=True) - await self.telegram.send_message("Failed to send message to Meshtastic. Please try again.") - - # Add this line to check if the message is being processed - self.logger.info("Finished handling Telegram text message") - - async def handle_position_app(self, packet: Dict[str, Any]) -> None: - position = packet['decoded'].get('position', {}) - sender = packet.get('fromId', 'unknown') - self.node_manager.update_node_position(sender, position) - position_info = self.node_manager.get_node_position(sender) - await self.update_or_send_message('location', sender, position_info) - - async def handle_telemetry_app(self, packet: Dict[str, Any]) -> None: - node_id = packet.get('fromId', 'unknown') - telemetry = packet.get('decoded', {}).get('telemetry', {}) - device_metrics = telemetry.get('deviceMetrics', {}) - self.node_manager.update_node_telemetry(node_id, device_metrics) - self.last_heartbeat[node_id] = datetime.now(timezone.utc) - telemetry_info = self.node_manager.get_node_telemetry(node_id) - await self.update_or_send_message('telemetry', node_id, telemetry_info) - - async def handle_admin_app(self, packet: Dict[str, Any]) -> None: - admin_message = packet.get('decoded', {}).get('admin', {}) - self.logger.info(f"Received admin message: {admin_message}") - if 'getRouteReply' in admin_message: - route = admin_message['getRouteReply'].get('route', []) - dest_id = packet.get('toId', 'unknown') - route_str = " -> ".join(map(str, route)) if route else "No route found" - traceroute_result = f"๐Ÿ” Traceroute to {dest_id}:\n{route_str}" - await self.update_or_send_message('traceroute', dest_id, traceroute_result) - elif 'getChannelResponse' in admin_message: - self.logger.info(f"Received channel response: {admin_message['getChannelResponse']}") - else: - self.logger.warning(f"Unhandled admin message: {admin_message}") - - async def update_or_send_message(self, request_type: str, node_id: str, content: str) -> None: - request_key = f"{request_type}:{node_id}" - if request_key in self.pending_requests: - await self.telegram.send_or_update_message(content, message_id=self.pending_requests[request_key]) - del self.pending_requests[request_key] - else: - await self.telegram.send_message(content, disable_notification=True) async def process_messages(self) -> None: tasks = [ self.process_meshtastic_messages(), self.process_telegram_messages(), - self.periodic_status_update(), - self.check_heartbeats() + self.periodic_status_update() ] await asyncio.gather(*tasks) @@ -125,55 +41,148 @@ async def process_meshtastic_messages(self) -> None: async def process_telegram_messages(self) -> None: while True: try: - self.logger.debug("Waiting for Telegram message...") message = await self.telegram.message_queue.get() self.logger.info(f"Processing Telegram message: {message}") - if message['type'] == 'command': - await self.handle_telegram_command(message) - elif message['type'] == 'telegram': - await self.handle_telegram_text(message) - elif message['type'] == 'location': - await self.handle_telegram_location(message) - else: - self.logger.warning(f"Received unknown message type: {message['type']}") - self.logger.debug("Finished processing Telegram message") + await self.handle_telegram_message(message) except asyncio.CancelledError: break except Exception as e: self.logger.error(f"Error processing Telegram message: {e}", exc_info=True) await asyncio.sleep(0.1) - async def periodic_status_update(self) -> None: - while True: - try: - await asyncio.sleep(3600) - status = await self.get_status() - await self.telegram.send_message(status) - except asyncio.CancelledError: - break - except Exception as e: - self.logger.error(f"Error in periodic status update: {e}", exc_info=True) + async def handle_telegram_message(self, message: Dict[str, Any]) -> None: + # Refactored to use a dictionary for message type handling + handlers = { + 'command': self.handle_telegram_command, + 'telegram': self.handle_telegram_text, + 'location': self.handle_telegram_location + } + handler = handlers.get(message['type']) + if handler: + await handler(message) + else: + self.logger.warning(f"Received unknown message type: {message['type']}") - async def get_status(self) -> str: - uptime = datetime.now(timezone.utc) - self.start_time - meshtastic_status = await self.meshtastic.get_status() - num_nodes = len(self.node_manager.get_all_nodes()) - return f"Meshgram Status:\nUptime: {uptime}\nConnected Nodes: {num_nodes}\nMeshtastic Status:\n{meshtastic_status}" + async def handle_meshtastic_message(self, packet: Dict[str, Any]) -> None: + self.logger.debug(f"Received Meshtastic message: {packet}") + if packet.get('fromId') not in self.local_nodes: + self.logger.info(f"Message from non-local node: {packet.get('fromId')}") + + try: + portnum = packet.get('decoded', {}).get('portnum') + handler = getattr(self, f"handle_{portnum.lower()}", None) + if handler: + self.logger.info(f"Handling Meshtastic message type: {portnum}") + await handler(packet) + else: + self.logger.warning(f"Unhandled Meshtastic message type: {portnum}") + except Exception as e: + self.logger.error(f'Error handling Meshtastic message: {e}', exc_info=True) + + async def handle_text_message_app(self, packet: Dict[str, Any]) -> None: + text = packet['decoded']['payload'].decode('utf-8') + sender, recipient = packet.get('fromId', 'unknown'), packet.get('toId', 'unknown') + + message = f"๐Ÿ“ก Meshtastic: {sender} โ†’ {recipient}\n๐Ÿ’ฌ {text}" + self.logger.info(f"Sending Meshtastic message to Telegram: {message}") + await self.telegram.send_message(message, disable_notification=False) async def handle_nodeinfo_app(self, packet: Dict[str, Any]) -> None: - node_id = packet.get('from', 'unknown') + node_id = packet.get('fromId', 'unknown') node_info = packet['decoded'] self.node_manager.update_node(node_id, { 'shortName': node_info.get('user', {}).get('shortName', 'unknown'), 'longName': node_info.get('user', {}).get('longName', 'unknown'), 'hwModel': node_info.get('user', {}).get('hwModel', 'unknown') }) - await self.telegram.send_message(self.node_manager.format_node_info(node_id), disable_notification=True) + info_text = self.node_manager.format_node_info(node_id) + await self.telegram.send_or_edit_message('nodeinfo', node_id, info_text) + + async def handle_admin_app(self, packet: Dict[str, Any]) -> None: + admin_message = packet.get('decoded', {}).get('admin', {}) + if 'getRouteReply' in admin_message: + route = admin_message['getRouteReply'].get('route', []) + dest_id = packet.get('toId', 'unknown') + if route: + route_str = " โ†’ ".join(f"!{node:08x}" for node in route) + traceroute_result = f"๐Ÿ” Traceroute to {dest_id}:\n{route_str}" + else: + traceroute_result = f"๐Ÿ” Traceroute to {dest_id}: No route found" + await self.telegram.send_message(traceroute_result) + elif 'deviceMetrics' in admin_message: + node_id = packet.get('fromId', 'unknown') + self.node_manager.update_node_telemetry(node_id, admin_message['deviceMetrics']) + telemetry_info = self.node_manager.get_node_telemetry(node_id) + await self.telegram.send_or_edit_message('telemetry', node_id, telemetry_info) + elif 'position' in admin_message: + node_id = packet.get('fromId', 'unknown') + self.node_manager.update_node_position(node_id, admin_message['position']) + position_info = self.node_manager.get_node_position(node_id) + await self.telegram.send_or_edit_message('location', node_id, position_info) + else: + self.logger.warning(f"Received unexpected admin message: {admin_message}") + + async def handle_routing_app(self, packet: Dict[str, Any]) -> None: + routing_info = packet.get('decoded', {}).get('routing', {}) + node_id = packet.get('fromId', 'unknown') + self.node_manager.update_node_routing(node_id, routing_info) + routing_text = self.node_manager.format_node_routing(node_id) + await self.telegram.send_or_edit_message('routing', node_id, routing_text) + + async def handle_neighborinfo_app(self, packet: Dict[str, Any]) -> None: + neighbor_info = packet.get('decoded', {}).get('neighbors', {}) + node_id = packet.get('fromId', 'unknown') + self.node_manager.update_node_neighbors(node_id, neighbor_info) + neighbor_text = self.node_manager.format_node_neighbors(node_id) + await self.telegram.send_or_edit_message('neighbors', node_id, neighbor_text) + + async def handle_telemetry_app(self, packet: Dict[str, Any]) -> None: + node_id = packet.get('fromId', 'unknown') + telemetry = packet.get('decoded', {}).get('telemetry', {}) + device_metrics = telemetry.get('deviceMetrics', {}) + self.node_manager.update_node_telemetry(node_id, device_metrics) + telemetry_info = self.node_manager.get_node_telemetry(node_id) + await self.telegram.send_or_edit_message('telemetry', node_id, telemetry_info) + + async def handle_position_app(self, packet: Dict[str, Any]) -> None: + position = packet['decoded'].get('position', {}) + node_id = packet.get('fromId', 'unknown') + self.node_manager.update_node_position(node_id, position) + position_info = self.node_manager.get_node_position(node_id) + await self.telegram.send_or_edit_message('location', node_id, position_info) + + # Send Telegram map + latitude = position.get('latitudeI', 0) / 1e7 + longitude = position.get('longitudeI', 0) / 1e7 + if latitude != 0 and longitude != 0: + await self.telegram.bot.send_location(chat_id=self.telegram.chat_id, latitude=latitude, longitude=longitude) + + async def handle_detection_sensor_app(self, packet: Dict[str, Any]) -> None: + sensor_data = packet.get('decoded', {}).get('detectionSensor', {}) + node_id = packet.get('fromId', 'unknown') + self.node_manager.update_node_sensor(node_id, sensor_data) + sensor_info = self.node_manager.get_node_sensor_info(node_id) + await self.telegram.send_or_edit_message('sensor', node_id, sensor_info) + + async def handle_telegram_text(self, message: Dict[str, Any]) -> None: + self.logger.info(f"Handling Telegram text message: {message}") + sender = message['sender'][:10] + recipient = self.config.get('meshtastic.default_node_id') + text = message['text'] + + meshtastic_message = f"[TG:{sender}] {text}" + self.logger.info(f"Preparing to send Telegram message to Meshtastic: {meshtastic_message}") + try: + await self.meshtastic.send_message(meshtastic_message, recipient) + self.logger.info(f"Successfully sent message to Meshtastic: {meshtastic_message}") + except Exception as e: + self.logger.error(f"Failed to send message to Meshtastic: {e}", exc_info=True) + await self.telegram.send_message("Failed to send message to Meshtastic. Please try again.") async def handle_telegram_command(self, message: Dict[str, Any]) -> None: try: command = message.get('command', '').partition('@')[0] - args, user_id = message.get('args', {}), message.get('user_id') + args, user_id = message.get('args', []), message.get('user_id') if not self.telegram.is_user_authorized(user_id) and command not in ['start', 'help', 'user']: await self.telegram.send_message("You are not authorized to use this command.") @@ -188,81 +197,72 @@ async def handle_telegram_command(self, message: Dict[str, Any]) -> None: self.logger.error(f'Error handling Telegram command: {e}', exc_info=True) await self.telegram.send_message(f"Error executing command: {e}") - def validate_node_id(self, node_id: str) -> bool: - return len(node_id) == 8 and all(c in '0123456789abcdefABCDEF' for c in node_id) - async def handle_telegram_location(self, message: Dict[str, Any]) -> None: lat, lon = message['location']['latitude'], message['location']['longitude'] + alt = message['location'].get('altitude', 0) sender = message['sender'] - await self.meshtastic.send_location(lat, lon, f"Location from telegram user {sender}") + try: + # Validate input + if not self.is_valid_coordinate(lat, lon, alt): + raise ValueError("Invalid coordinates") - async def cmd_status(self, args: Dict[str, Any], user_id: int) -> None: - nodes = self.node_manager.get_all_nodes() - total_nodes = len(nodes) - active_nodes = sum(1 for node in nodes.values() if 'last_updated' in node) - - status_text = f"๐Ÿ“Š Meshgram Status\n๐Ÿ”ข Total nodes: {total_nodes}\nโœ… Active nodes: {active_nodes}\n\n" - for node_id, node_info in nodes.items(): - status_text += (f"๐Ÿ”ท Node {node_id}:\n" - f"๐Ÿ“› Name: {node_info.get('name', 'Unknown')}\n" - f"๐Ÿ”‹ Battery: {node_info.get('batteryLevel', 'Unknown')}\n" - f"โฑ๏ธ Uptime: {node_info.get('uptimeSeconds', 'Unknown')} seconds\n" - f"๐Ÿ•’ Last updated: {node_info.get('last_updated', 'Unknown')}\n\n") - - await self.telegram.send_message(status_text) + recipient = self.config.get('meshtastic.default_node_id') + await self.meshtastic.send_message(f"[TG:{sender}]: location lat={lat:.6f}, lon={lon:.6f}, alt={alt:.1f}m", recipient) + await self.telegram.send_message(f"๐Ÿ“ Location sent to Meshtastic network: lat={lat:.6f}, lon={lon:.6f}, alt={alt:.1f}m") + except ValueError as e: + self.logger.error(f"Invalid location data: {e}") + await self.telegram.send_message(f"Failed to send location to Meshtastic. Invalid data: {e}") + except Exception as e: + self.logger.error(f"Failed to send location to Meshtastic: {e}", exc_info=True) + await self.telegram.send_message("Failed to send location to Meshtastic. Please try again.") - async def cmd_node(self, args: Dict[str, Any], user_id: int) -> None: - node_id = args.get('node_id') or self.config.get('meshtastic.default_node_id') - if not node_id: + def is_valid_coordinate(self, lat: float, lon: float, alt: float) -> bool: + return -90 <= lat <= 90 and -180 <= lon <= 180 and -1000 <= alt <= 50000 + + async def periodic_status_update(self) -> None: + while True: + try: + await asyncio.sleep(3600) # Update status every hour + status = await self.get_status() + await self.telegram.send_message(status) + except asyncio.CancelledError: + break + except Exception as e: + self.logger.error(f"Error in periodic status update: {e}", exc_info=True) + + async def get_status(self) -> str: + uptime = datetime.now(timezone.utc) - self.start_time + meshtastic_status = await self.meshtastic.get_status() + num_nodes = len(self.node_manager.get_all_nodes()) + return (f"๐Ÿ“Š Meshgram Status:\n" + f"โฑ๏ธ Uptime: {uptime}\n" + f"๐Ÿ”ข Connected Nodes: {num_nodes}\n" + f"๐Ÿ“ก Meshtastic Status:\n{meshtastic_status}") + + async def cmd_status(self, args: List[str], user_id: int) -> None: + status = await self.get_status() + await self.telegram.send_message(status) + + async def cmd_bell(self, args: List[str], user_id: int) -> None: + dest_id = args[0] if args else self.config.get('meshtastic.default_node_id') + if not dest_id: await self.telegram.send_message("No node ID provided and no default node ID set.") return - node_info = self.node_manager.format_node_info(node_id) - await self.telegram.send_message(node_info) - - async def cmd_bell(self, args: Dict[str, Any], user_id: int) -> None: - dest_id = args.get('node_id') or self.config.get('meshtastic.default_node_id') self.logger.info(f"Sending bell to node {dest_id}") try: await self.meshtastic.send_bell(dest_id) - self.logger.info(f"Bell sent successfully to node {dest_id}") await self.telegram.send_message(f"๐Ÿ”” Bell sent to node {dest_id}.", disable_notification=True) except Exception as e: self.logger.error(f"Failed to send bell to node {dest_id}: {e}", exc_info=True) await self.telegram.send_message(f"Failed to send bell to node {dest_id}. Error: {str(e)}") - async def cmd_location(self, args: Dict[str, Any], user_id: int) -> None: - dest_id = args.get('node_id') or self.config.get('meshtastic.default_node_id') - await self.meshtastic.request_location(dest_id) - message = await self.telegram.send_message(f"๐Ÿ“ Location request sent to node {dest_id}. Waiting for response...") - if message: - self.pending_requests[f"location:{dest_id}"] = message.message_id - - async def cmd_telemetry(self, args: Dict[str, Any], user_id: int) -> None: - dest_id = args.get('node_id') or self.config.get('meshtastic.default_node_id') - await self.meshtastic.request_telemetry(dest_id) - message = await self.telegram.send_message(f"๐Ÿ“Š Telemetry request sent to node {dest_id}. Waiting for response...") - if message: - self.pending_requests[f"telemetry:{dest_id}"] = message.message_id - - async def cmd_traceroute(self, args: Dict[str, Any], user_id: int) -> None: - dest_id = args.get('node_id') or self.config.get('meshtastic.default_node_id') - message = await self.telegram.send_message(f"๐Ÿ” Initiating traceroute to {dest_id}...") - if message: - self.pending_requests[f"traceroute:{dest_id}"] = message.message_id - await self.meshtastic.traceroute(dest_id) - - async def check_heartbeats(self) -> None: - while True: - try: - now = datetime.now(timezone.utc) - for node_id, last_heartbeat in list(self.last_heartbeat.items()): - if (now - last_heartbeat).total_seconds() > self.heartbeat_timeout: - del self.last_heartbeat[node_id] - await self.telegram.send_message(f"โš ๏ธ Node {node_id} is no longer active.") - await asyncio.sleep(60) - except Exception as e: - self.logger.error(f"Error in check_heartbeats: {e}", exc_info=True) - await asyncio.sleep(60) + async def cmd_node(self, args: List[str], user_id: int) -> None: + node_id = args[0] if args else self.config.get('meshtastic.default_node_id') + if not node_id: + await self.telegram.send_message("No node ID provided and no default node ID set.") + return + node_info = self.node_manager.format_node_info(node_id) + await self.telegram.send_message(node_info) async def close(self) -> None: self.logger.info("Closing MessageProcessor...") diff --git a/src/node_manager.py b/src/node_manager.py index 1ddcbe3..5a76baa 100644 --- a/src/node_manager.py +++ b/src/node_manager.py @@ -5,7 +5,7 @@ class NodeManager: def __init__(self): self.nodes: Dict[str, Dict[str, Any]] = {} self.node_history: Dict[str, List[Dict[str, Any]]] = {} - self.history_limit = 100 + self.history_limit: int = 100 def update_node(self, node_id: str, data: Dict[str, Any]) -> None: if node_id not in self.nodes: @@ -25,46 +25,40 @@ def get_node(self, node_id: str) -> Optional[Dict[str, Any]]: def get_all_nodes(self) -> Dict[str, Dict[str, Any]]: return self.nodes - def get_node_history(self, node_id: str) -> List[Dict[str, Any]]: - return self.node_history.get(node_id, []) - - def format_node_info(self, node_id: str) -> str: - node = self.get_node(node_id) - if not node: - return f"โ„น๏ธ No information available for node {node_id}" - info = [f"๐Ÿ”ท Discovered Node ID: {node_id}"] - for key, value in node.items(): - if key != 'last_updated': - emoji = { - 'name': '๐Ÿ“›', 'shortName': '๐Ÿท๏ธ', 'hwModel': '๐Ÿ–ฅ๏ธ', - 'batteryLevel': '๐Ÿ”‹', 'voltage': 'โšก', 'channelUtilization': '๐Ÿ“Š', - 'airUtilTx': '๐Ÿ“ก', 'temperature': '๐ŸŒก๏ธ', 'relativeHumidity': '๐Ÿ’ง', - 'barometricPressure': '๐ŸŒช๏ธ', 'gasResistance': '๐Ÿ’จ', 'current': 'โšก' - }.get(key, '๐Ÿ”น') - info.append(f"{emoji} {key.capitalize()}: {value}") - info.append(f"๐Ÿ•’ Last updated: {node.get('last_updated', 'Unknown')}") - return "\n".join(info) - def get_node_telemetry(self, node_id: str) -> str: node = self.get_node(node_id) if not node: - return f"No telemetry available for node {node_id}" - air_util_tx = node.get('airUtilTx', 'Unknown') + return f"๐Ÿ“Š No telemetry available for node {node_id}" + + battery_level = node.get('batteryLevel', 'N/A') + air_util_tx = node.get('airUtilTx', 'N/A') air_util_tx_str = f"{air_util_tx:.2f}%" if isinstance(air_util_tx, (int, float)) else air_util_tx - return (f"๐Ÿ“Š Telemetry for node {node_id}:\n" - f"โ€ข Battery: {node.get('batteryLevel', 'Unknown')}%\n" - f"โ€ข Air Utilization TX: {air_util_tx_str}\n" - f"โ€ข Uptime: {node.get('uptimeSeconds', 'Unknown')} seconds\n" - f"โ€ข Last updated: {node.get('last_updated', 'Unknown')}") + uptime = node.get('uptimeSeconds', 'N/A') + last_updated = node.get('last_updated', 'N/A') + + return ( + f"๐Ÿ“Š Telemetry for node {node_id}:\n" + f"๐Ÿ”‹ Battery: {battery_level}\n" + f"๐Ÿ“ก Air Utilization TX: {air_util_tx_str}\n" + f"โฑ๏ธ Uptime: {uptime} seconds\n" + f"๐Ÿ•’ Last updated: {self.format_date(last_updated) if last_updated != 'N/A' else 'N/A'}" + ) def get_node_position(self, node_id: str) -> str: node = self.get_node(node_id) if not node: - return f"No position available for node {node_id}" - return (f"๐Ÿ“ Position for node {node_id}:\n" - f"โ€ข Latitude: {node.get('latitude', 'Unknown')}\n" - f"โ€ข Longitude: {node.get('longitude', 'Unknown')}\n" - f"โ€ข Last updated: {node.get('last_position_update', 'Unknown')}") + return f"๐Ÿ“ No position available for node {node_id}" + + latitude = node.get('latitude', 'N/A') + longitude = node.get('longitude', 'N/A') + last_position_update = node.get('last_position_update', 'N/A') + + return ( + f"๐Ÿ“ Position for node {node_id}:\n" + f"๐ŸŒŽ Latitude: {latitude}\n" + f"๐ŸŒ Longitude: {longitude}\n" + f"๐Ÿ•’ Last updated: {self.format_date(last_position_update) if last_position_update != 'N/A' else 'N/A'}" + ) def validate_node_id(self, node_id: str) -> bool: return len(node_id) == 8 and all(c in '0123456789abcdefABCDEF' for c in node_id) @@ -90,6 +84,44 @@ def get_inactive_nodes(self, timeout: int = 300) -> List[str]: (now - datetime.fromisoformat(node['last_updated'])) > timedelta(seconds=timeout) ] + def format_node_info(self, node_id: str) -> str: + node = self.get_node(node_id) + if not node: + return f"โ„น๏ธ No information available for node {node_id}" + + info = [f"๐Ÿ”ท Node {node_id}:"] + emoji_map = { + 'name': '๐Ÿ“›', 'shortName': '๐Ÿท๏ธ', 'hwModel': '๐Ÿ–ฅ๏ธ', + 'batteryLevel': '๐Ÿ”‹', 'voltage': 'โšก', 'channelUtilization': '๐Ÿ“Š', + 'airUtilTx': '๐Ÿ“ก', 'temperature': '๐ŸŒก๏ธ', 'relativeHumidity': '๐Ÿ’ง', + 'barometricPressure': '๐ŸŒช๏ธ', 'gasResistance': '๐Ÿ’จ', 'current': 'โšก', + 'last_updated': '๐Ÿ•’' + } + + for key, value in node.items(): + if key == 'last_updated': + value = self.format_date(value) + emoji = emoji_map.get(key, '๐Ÿ”น') + info.append(f"{emoji} {key.capitalize()}: {value}") + + return "\n".join(info) + + def format_date(self, date_str: str) -> str: + try: + date = datetime.fromisoformat(date_str) + now = datetime.now(date.tzinfo) + delta = now - date + if delta < timedelta(minutes=1): + return "just now" + elif delta < timedelta(hours=1): + return f"{delta.seconds // 60} minutes ago" + elif delta < timedelta(days=1): + return f"{delta.seconds // 3600} hours ago" + else: + return f"{delta.days} days ago" + except ValueError: + return "Unknown" + def remove_node(self, node_id: str) -> None: self.nodes.pop(node_id, None) self.node_history.pop(node_id, None) \ No newline at end of file diff --git a/src/telegram_interface.py b/src/telegram_interface.py index 169796a..d581097 100644 --- a/src/telegram_interface.py +++ b/src/telegram_interface.py @@ -1,39 +1,27 @@ import asyncio -from typing import Dict, Any, Optional, List -from telegram import Bot, Update, BotCommand, Message, InlineKeyboardButton, InlineKeyboardMarkup +from typing import Dict, Any, Optional, Callable +from telegram import Bot, Update, InlineKeyboardButton, InlineKeyboardMarkup from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters from config_manager import ConfigManager, get_logger -import time - -class RateLimiter: - def __init__(self, max_calls: int, period: float): - self.max_calls, self.period, self.calls = max_calls, period, [] - - async def wait(self): - now = time.time() - self.calls = [call for call in self.calls if now - call < self.period] - if len(self.calls) >= self.max_calls: - await asyncio.sleep(self.period - (now - self.calls[0])) - self.calls.append(time.time()) class TelegramInterface: def __init__(self, config: ConfigManager) -> None: - self.config, self.logger = config, get_logger(__name__) - self.bot, self.application = None, None - self.message_queue, self._stop_event = asyncio.Queue(), asyncio.Event() - self.chat_id, self.last_node_messages = None, {} - self.commands = { + self.config = config + self.logger = get_logger(__name__) + self.bot: Optional[Bot] = None + self.application: Optional[Application] = None + self.message_queue: asyncio.Queue[Dict[str, Any]] = asyncio.Queue() + self._stop_event: asyncio.Event = asyncio.Event() + self.chat_id: Optional[int] = None + self.last_messages: Dict[str, int] = {} + self.commands: Dict[str, Dict[str, str | Callable]] = { 'start': {'description': 'Start the bot and see available commands', 'handler': self.start_command}, 'help': {'description': 'Show help message', 'handler': self.help_command}, 'status': {'description': 'Check the current status', 'handler': self.handle_command}, 'bell': {'description': 'Send a bell to the meshtastic chat group', 'handler': self.handle_command}, - 'location': {'description': 'Request location from meshtastic side', 'handler': self.handle_command}, - 'telemetry': {'description': 'Request telemetry or display last received value', 'handler': self.handle_command}, - 'traceroute': {'description': 'Trace route to a specific node', 'handler': self.handle_command}, 'node': {'description': 'Get information about a specific node', 'handler': self.handle_command}, 'user': {'description': 'Get information about your Telegram user', 'handler': self.user_command}, } - self.rate_limiter = RateLimiter(max_calls=30, period=60) async def setup(self) -> None: self.logger.info("Setting up telegram interface...") @@ -42,12 +30,12 @@ async def setup(self) -> None: if not token: raise ValueError("Telegram bot token not found in configuration") self.bot = Bot(token=token) - self.application = Application.builder().token(token=token).build() + self.application = Application.builder().token(token).build() self.application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.on_telegram_message)) self.application.add_handler(MessageHandler(filters.LOCATION, self.on_telegram_location)) for command, data in self.commands.items(): self.application.add_handler(CommandHandler(command, data['handler'])) - await self.bot.set_my_commands([BotCommand(command, data['description']) for command, data in self.commands.items()]) + await self.bot.set_my_commands([(cmd, data['description']) for cmd, data in self.commands.items()]) self.chat_id = self.config.get('telegram.chat_id') if not self.chat_id: raise ValueError("Telegram chat id not found in configuration") @@ -62,172 +50,134 @@ async def start_polling(self) -> None: return self.logger.info("Starting telegram polling...") - await self.application.initialize() - await self.application.start() - - retry_delay = 1 - while not self._stop_event.is_set(): - try: - await self.application.updater.start_polling(drop_pending_updates=True) - self.logger.info("Telegram polling started") - await self._stop_event.wait() - except NetworkError as e: - self.logger.error(f"Network error occurred: {e}. Retrying in {retry_delay} seconds...") - await asyncio.sleep(retry_delay) - retry_delay = min(retry_delay * 2, 60) # Exponential backoff, max 60 seconds - except Exception as e: - self.logger.error(f"Unexpected error in Telegram polling: {e}") - break - - await self._shutdown_polling() + try: + await self.application.initialize() + await self.application.start() + await self.application.updater.start_polling(drop_pending_updates=True) + await self._stop_event.wait() + except Exception as e: + self.logger.error(f"Error in Telegram polling: {e}", exc_info=True) + finally: + await self._shutdown_polling() async def _shutdown_polling(self) -> None: self.logger.info("Stopping telegram polling...") - try: - if self.application.updater.running: - await self.application.updater.stop() - await self.application.stop() - await self.application.shutdown() - except RuntimeError as e: - self.logger.warning(f"RuntimeError during shutdown: {e}") - except Exception as e: - self.logger.error(f"Error during shutdown: {e}") + if self.application: + try: + await self.application.stop() + await self.application.shutdown() + except Exception as e: + self.logger.error(f"Error during Telegram shutdown: {e}", exc_info=True) self.logger.info("Telegram polling stopped") async def on_telegram_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - self.logger.debug(f"Received message from Telegram: {update.message.text}") - try: - await self.message_queue.put({ - 'text': update.message.text, - 'sender': update.effective_user.username or update.effective_user.first_name, - 'type': 'telegram', - 'message_id': update.message.message_id, - 'user_id': update.effective_user.id - }) - self.logger.info(f"Received message from Telegram: {update.message.text}") - await update.message.reply_text("Message received and will be sent to Meshtastic.") - await self.bot.send_chat_action(chat_id=update.effective_chat.id, action="typing") - except Exception as e: - self.logger.error(f'Error handling Telegram message: {e}') - await update.message.reply_text("An error occurred while processing your message. Please try again.") + await self.message_queue.put({ + 'text': update.message.text, + 'sender': update.effective_user.username or update.effective_user.first_name, + 'type': 'telegram', + 'message_id': update.message.message_id, + 'user_id': update.effective_user.id + }) + + async def on_telegram_location(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + await self.message_queue.put({ + 'location': { + 'latitude': update.message.location.latitude, + 'longitude': update.message.location.longitude + }, + 'sender': update.effective_user.username or update.effective_user.first_name, + 'type': 'location', + 'message_id': update.message.message_id + }) + + async def send_or_edit_message(self, message_type: str, node_id: str, content: str) -> None: + message_key = f"{message_type}:{node_id}" + if message_key in self.last_messages: + await self.edit_message(self.last_messages[message_key], content) + else: + message_id = await self.send_message(content) + if message_id: + self.last_messages[message_key] = message_id - async def send_message(self, text: str, disable_notification: bool = False, pin_message: bool = False) -> Optional[Message]: - self.logger.debug(f"Attempting to send message to Telegram: {text}") + async def send_message(self, text: str, disable_notification: bool = False) -> Optional[int]: try: - message = await self.bot.send_message(chat_id=self.chat_id, text=text, disable_notification=disable_notification, disable_web_page_preview=True) - self.logger.info(f"Sent message to Telegram: {text}") - if pin_message: - await self.bot.pin_chat_message(chat_id=self.chat_id, message_id=message.message_id) - self.logger.info("Pinned message to Telegram") - return message + message = await self.bot.send_message( + chat_id=self.chat_id, + text=text, + disable_notification=disable_notification, + disable_web_page_preview=True + ) + return message.message_id except Exception as e: self.logger.error(f"Failed to send Telegram message: {e}") return None - async def send_or_update_message(self, text: str, message_id: Optional[int] = None, disable_notification: bool = False) -> None: - await self.rate_limiter.wait() - try: - if message_id: - await self.bot.edit_message_text(chat_id=self.chat_id, message_id=message_id, text=text) - else: - message = await self.bot.send_message(chat_id=self.chat_id, text=text, disable_notification=disable_notification) - return message.message_id - except Exception as e: - self.logger.error(f"Failed to send or update telegram message: {e}") - - async def on_telegram_location(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + async def edit_message(self, message_id: int, text: str) -> bool: try: - await self.message_queue.put({ - 'location': { - 'latitude': update.message.location.latitude, - 'longitude': update.message.location.longitude - }, - 'sender': update.effective_user.username or update.effective_user.first_name, - 'type': 'telegram', - 'message_id': update.message.message_id - }) - await update.message.reply_text("Location received and will be sent to meshtastic.") + await self.bot.edit_message_text( + chat_id=self.chat_id, + message_id=message_id, + text=text + ) + return True except Exception as e: - self.logger.error(f'Error handling telegram location: {e}') - await update.message.reply_text("An error occurred while processing your location. Please try again.") + self.logger.error(f"Failed to edit Telegram message: {e}") + return False - async def send_or_update_node_info(self, node_id: str, info_text: str) -> None: + async def add_reaction(self, message_id: int, emoji: str) -> None: try: - if node_id in self.last_node_messages: - await self.send_or_update_message(info_text, message_id=self.last_node_messages[node_id]) - else: - message_id = await self.send_or_update_message(info_text, disable_notification=True) - self.last_node_messages[node_id] = message_id - self.logger.info(f"Updated node info for {node_id}") + keyboard = [[InlineKeyboardButton(emoji, callback_data=f"reaction:{emoji}")]] + reply_markup = InlineKeyboardMarkup(keyboard) + await self.bot.edit_message_reply_markup( + chat_id=self.chat_id, + message_id=message_id, + reply_markup=reply_markup + ) except Exception as e: - self.logger.error(f"Failed to update node info for {node_id}: {e}") + self.logger.error(f"Failed to add reaction to Telegram message: {e}") - def generate_help_text(self) -> str: - help_text = "๐Ÿš€ Welcome to Meshgram! Here are the available commands:\n\n" - for command, data in self.commands.items(): - help_text += f"/{command} - {data['description']}\n" - - help_text += "\n๐Ÿ” Advanced Usage:\n" - help_text += "Some commands can target specific nodes by adding a node ID:\n" - help_text += "โ€ข /location [node_id] - ๐Ÿ“ Request location (e.g., /location !abc123)\n" - help_text += "โ€ข /telemetry [node_id] - ๐Ÿ“Š Request telemetry data\n" - help_text += "โ€ข /node [node_id] - โ„น๏ธ Get node information\n" - help_text += "\nIf no node ID is provided, the default node will be used." - - return help_text + def is_user_authorized(self, user_id: int) -> bool: + authorized_users = self.config.get_authorized_users() + return not authorized_users or user_id in authorized_users async def start_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - try: - await update.message.reply_text(self.generate_help_text()) - except Exception as e: - self.logger.error(f"Error in start command: {e}") - await update.message.reply_text("An error occurred. Please try again.") + await self.help_command(update, context) async def help_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - await self.start_command(update, context) - - def is_user_authorized(self, user_id: int) -> bool: - authorized_users = self.config.get_authorized_users() - return not authorized_users or user_id in authorized_users + help_text = "๐Ÿ“š Available commands:\n\n" + help_text += "\n".join(f"/{command} - {data['description']}" for command, data in self.commands.items()) + await update.message.reply_text(help_text) async def user_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: user = update.effective_user - user_info = (f"๐Ÿ‘ค User Information:\n" - f"๐Ÿ†” ID: {user.id}\n" - f"๐Ÿ“› Name: {user.full_name}\n" - f"๐Ÿท๏ธ Username: @{user.username}\n" - f"๐Ÿค– Is Bot: {'Yes' if user.is_bot else 'No'}") + user_info = ( + f"๐Ÿ‘ค User Information:\n" + f"๐Ÿ†” ID: {user.id}\n" + f"๐Ÿ‘ค Username: @{user.username}\n" + f"๐Ÿ“› Name: {user.full_name}\n" + f"๐Ÿค– Is Bot: {'Yes' if user.is_bot else 'No'}" + ) await update.message.reply_text(user_info) async def handle_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: command = update.message.text.split()[0][1:].partition('@')[0] + args = context.args user_id = update.effective_user.id - if command not in ['start', 'help', 'user', 'status'] and not self.is_user_authorized(user_id): + if not self.is_user_authorized(user_id) and command not in ['start', 'help', 'user']: await update.message.reply_text("You are not authorized to use this command.") return - try: - args = {'node_id': context.args[0]} if context.args else {} - await self.message_queue.put({ - 'type': 'command', - 'command': command, - 'args': args, - 'user_id': user_id - }) - - if command in ['location', 'telemetry']: - node_id = args.get('node_id', 'default node') - await update.message.reply_text(f"Sending {command} request to {node_id}...", disable_notification=True) - except Exception as e: - self.logger.error(f"Error in {command} command: {e}") - await update.message.reply_text(f"An error occurred while processing the {command} command. Please try again.") + await self.message_queue.put({ + 'type': 'command', + 'command': command, + 'args': args, + 'user_id': user_id + }) async def close(self) -> None: self.logger.info("Stopping telegram interface...") self._stop_event.set() - if self.application and self.application.updater.running: - await self.application.updater.stop() if self.application: await self.application.stop() await self.application.shutdown() diff --git a/tests/test_meshtastic_interface.py b/tests/test_meshtastic_interface.py index 6ee4f7b..ccc8294 100644 --- a/tests/test_meshtastic_interface.py +++ b/tests/test_meshtastic_interface.py @@ -1,2 +1,34 @@ -# tests/test_meshtastic_interface.py import pytest +from unittest.mock import AsyncMock, MagicMock +from src.meshtastic_interface import MeshtasticInterface +from src.config_manager import ConfigManager + +@pytest.fixture +def mock_config(): + config = MagicMock(spec=ConfigManager) + config.get.return_value = 'serial' + return config + +@pytest.mark.asyncio +async def test_meshtastic_interface_setup(mock_config): + interface = MeshtasticInterface(mock_config) + interface._create_interface = AsyncMock() + interface._fetch_node_info = AsyncMock() + + await interface.setup() + + assert interface.is_setup == True + interface._create_interface.assert_called_once() + interface._fetch_node_info.assert_called_once() + +@pytest.mark.asyncio +async def test_meshtastic_interface_send_message(mock_config): + interface = MeshtasticInterface(mock_config) + interface.interface = MagicMock() + interface.interface.sendText = AsyncMock() + + await interface.send_message("Test message", "recipient") + + interface.interface.sendText.assert_called_once_with("Test message", destinationId="recipient") + +# Add more tests for other methods... \ No newline at end of file