diff --git a/pyproject.toml b/pyproject.toml index 7eaa5ab..6ce0654 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,7 +50,8 @@ dependencies = [ 'tenacity', "typing_extensions>=4.6; python_version<'3.11'", 'diskcache', - 'pytz' + 'pytz', + 'tzlocal' ] [tool.hatch.build.hooks.vcs] diff --git a/src/braingeneers/iot/device.py b/src/braingeneers/iot/device.py index 34b488e..1a06e0f 100644 --- a/src/braingeneers/iot/device.py +++ b/src/braingeneers/iot/device.py @@ -2,6 +2,7 @@ import json import os import re +import fnmatch import schedule import signal import sys @@ -170,6 +171,9 @@ def update_state(self, new_state): def generate_job_tag(self): return str(next(self.job_index)) + + def peek_jobs(self, lookahead_seconds = 10): + return any(job.next_run <= datetime.now() + timedelta(seconds=lookahead_seconds) for job in self.scheduler.jobs) def is_my_topic(self, topic): return self.device_name in topic.split('/') @@ -200,6 +204,123 @@ def generate_response_topic(self, response_cmnd_key = None, response_cmnd_value def get_curr_timestamp(self): return (datetime.now(tz=pytz.timezone('US/Pacific')).strftime('%Y-%m-%d-T%H%M%S-')) + def _unpack_string(self, unpack_item, match_items): + """ + Unpacks items from match_items based on a string or regex pattern. + Raises an error if the pattern is invalid or if no matches are found. + + Args: + unpack_item (str): A string or regex pattern to match against match_items. + match_items (list): A list of items to match the unpack_item against. + + Returns: + list: A sorted list of items from match_items that match the unpack_item. + """ + if not isinstance(unpack_item, str): + raise ValueError(f"Item {unpack_item} should be a string. Forfeiting entire command list.") + + try: + if unpack_item.startswith("^") and unpack_item.endswith("$"): + regex = re.compile(unpack_item) + filtered = [item for item in match_items if regex.match(item)] + else: + filtered = fnmatch.filter(match_items, unpack_item) + except re.error: + raise ValueError(f"Invalid regex {unpack_item}") + + if not filtered: + raise ValueError(f"Item {unpack_item} not found in match_items. Forfeiting entire command list.") + + return sorted(filtered) + + def _enqueue_messages_to_schedule(self, topic, message, unpack_field, unpack_items): + """ + Updates the message dictionary's specified field with each item from unpack_items, + printing the updated message each time. + + Args: + message (dict): The message to update. + unpack_field (str): The field within the message to update. + unpack_items (list): A list of new values to sequentially assign to unpack_field. + """ + print("ENQUEUEING!") + for unpack_item in unpack_items: + new_message = message.copy() + new_message[unpack_field] = unpack_item + #self.add_to_scheduler(new_message, param, time_unit, at_time)) + schedule_message = { + "COMMAND": "SCHEDULE-REQUEST", + "TYPE": "ADD", + "EVERY_X_SECONDS": "1", + "AT": ":01", + "FLAG": "ONCE", + "FROM": self.device_name, + "DO": json.dumps(new_message) + } + + print(schedule_message) + job_tag = self.generate_job_tag() + self.scheduler.every(1).second.do(self.run_once, topic, new_message, job_tag).tag=(job_tag) + + self.update_state(self.state) + return + + + def unpack(self, topic, message, unpack_field, match_items, sort_all=False, enqueue_schedule=True): + """ + Unpacks items based on patterns specified in a message field against a list of items. + Filters and optionally sorts matched items from match_items based on patterns found in message[unpack_field]. + Uses _stuff_messages to display how each matched item updates the message. + + Args: + message (dict): The dictionary containing the unpack_field. + unpack_field (str): Key in message whose value is a string or list of strings or regex patterns. + match_items (list): Items to match against the patterns. + sort_all (bool): If True, sorts the entire result list; otherwise, maintains order of first appearance. + + Returns: + boolean: True if anything has been unpacked into individual messages; False otherwise. + + Usage: + try: + unpackable, unpacked_list = self.unpack(message, "WELL_ID", self.wells, sort_all=True, enqueue_schedule=True) + if not unpackable: #It's a single literal value, execute command now + except ValueError as e: + print("Error:", e) + """ + unpack_items = message.get(unpack_field) + + #unpack items should not contain '?' + if isinstance(unpack_items, str) and unpack_items.find("?") == -1 and unpack_items in match_items: + return False, [] + + if not isinstance(unpack_items, (str, list)): + raise ValueError("unpack_items must be a string or a list of strings") + + filtered = [] + seen = set() + + if isinstance(unpack_items, str): + unpack_items = [unpack_items] # Treat single string as a list with one item + + for item in unpack_items: + current_matches = self._unpack_string(item, match_items) + for match in current_matches: + if match not in seen: + seen.add(match) + filtered.append(match) + + if sort_all: + filtered.sort() + + print(filtered) + + if enqueue_schedule: + self._enqueue_messages_to_schedule(topic, message, unpack_field, filtered) + + return True, filtered + + # def set_mqtt_publish_topic(self, topic = None): # # only change class variable, don't need to do change internal message broker settings # if topic is not None: diff --git a/src/braingeneers/iot/example_device.py b/src/braingeneers/iot/example_device.py index 50da1d8..915c322 100644 --- a/src/braingeneers/iot/example_device.py +++ b/src/braingeneers/iot/example_device.py @@ -1,24 +1,26 @@ from device import Device # Or use this instead, if calling this in a repo outside braingeneerspy: -# from braingeneers.iot.device import Device +# from braingeneers.iot import Device class ExampleDevice(Device): """ Example Device Class Demonstrates how to use and inherit the Device class for new application """ - def __init__(self, device_name, eggs = 0, ham = 0): + def __init__(self, device_name, eggs = 0, ham = 0, spam = 1): """Initialize the ExampleDevice class Args: device_name (str): name of the device ham (str): starting quantity of ham eggs (int): starting quantity of eggs + spam (int): starting quantity of spam :param device_specific_handlers: dictionary that maps the device's command keywords to a function call that handles an incomming command. """ self.eggs = eggs self.ham = ham + self.spam = spam super().__init__(device_name=device_name, device_type = "Other", primed_default=True) @@ -32,11 +34,12 @@ def __init__(self, device_name, eggs = 0, ham = 0): def device_state(self): """ Return the device state as a dictionary. This is used by the parent Device class to update the device shadow. - Child can add any additional device specific state to the dictionary i.e., "EGGS" and "HAM" + Child can add any additional device specific state to the dictionary i.e., "EGGS", "HAM", and "SPAM" """ return { **super().device_state, "EGGS": self.eggs, - "HAM": self.ham + "HAM": self.ham, + "SPAM": self.spam } def is_primed(self): @@ -60,16 +63,17 @@ def handle_add(self, topic, message): Args: topic (str): topic of the received message message (dict): message received by the device - ADD assumes that the message contains the keys "EGGS" and "HAM" and adds the values to the device's state. + ADD assumes that the message contains the keys "EGGS", "HAM", "SPAM", and adds the values to the device's state. """ try: self.eggs += message["EGGS"] self.ham += message["HAM"] + self.spam += message["SPAM"] self.update_state(self.state) # to update eggs and ham change ASAP in shadow except: self.mb.publish_message(topic= self.generate_response_topic("ADD", "ERROR"), message= { "COMMAND": "ADD-ERROR", - "ERROR": f"Missing argument for EGGS or HAM"}) + "ERROR": f"Missing argument for EGGS, HAM, or SPAM"}) return def handle_list(self, topic, message): @@ -78,10 +82,11 @@ def handle_list(self, topic, message): Args: topic (str): topic of the received message message (dict): message received by the device - LIST responds with a message containing the current values for "EGGS" and "HAM". + LIST responds with a message containing the current values for "EGGS", "HAM", and "SPAM". """ self.mb.publish_message(topic=self.generate_response_topic("LIST", "RESPONSE"), message= { "COMMAND": "LIST-RESPONSE", "EGGS": self.eggs, - "HAM" : self.ham }) + "HAM" : self.ham, + "SPAM" : self.spam }) return \ No newline at end of file diff --git a/src/braingeneers/iot/example_device_main.py b/src/braingeneers/iot/example_device_main.py index e565930..1b17253 100644 --- a/src/braingeneers/iot/example_device_main.py +++ b/src/braingeneers/iot/example_device_main.py @@ -8,10 +8,12 @@ parser.add_argument('--device_name', type=str, required=False, default="spam", help='Name of device (default: spam)') parser.add_argument('--eggs', type=int, required=False, default=0, help='Starting quantity of eggs (default: 0)') parser.add_argument('--ham', type=int, required=False, default=0, help='Starting quantity of ham (default: 0)') + parser.add_argument('--spam', type=int, required=False, default=1, help='Starting quantity of spam (default: 1)') + args = parser.parse_args() # Create a device object - device = ExampleDevice(device_name=args.device_name, eggs=args.eggs, ham=args.ham) + device = ExampleDevice(device_name=args.device_name, eggs=args.eggs, ham=args.ham, spam=args.spam) # Start the device activities, running in a loop # Control + C should gracefully stop execution