Skip to content

Commit

Permalink
Device branch (#83)
Browse files Browse the repository at this point in the history
* added device to iot

* adding device class and its package dependencies

* renamed file

* updated device code and added dependencies to configure.py

* added device unpack() to the device class to unzip shorthand list within a command into list of commands, supports literals, wildcards, and regular expressions

* Added peek_jobs() function to look ahead if there are any upcoming jobs in the next x seconds

* cleaned up comment

* updates required packages for device

---------

Co-authored-by: David Parks <[email protected]>
  • Loading branch information
kvoitiuk and davidparks21 authored Apr 25, 2024
1 parent 4e9173c commit d9b1e3a
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 10 deletions.
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ dependencies = [
'tenacity',
"typing_extensions>=4.6; python_version<'3.11'",
'diskcache',
'pytz'
'pytz',
'tzlocal'
]

[tool.hatch.build.hooks.vcs]
Expand Down
121 changes: 121 additions & 0 deletions src/braingeneers/iot/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
import os
import re
import fnmatch
import schedule
import signal
import sys
Expand Down Expand Up @@ -170,6 +171,9 @@ def update_state(self, new_state):

def generate_job_tag(self):
return str(next(self.job_index))

def peek_jobs(self, lookahead_seconds = 10):
return any(job.next_run <= datetime.now() + timedelta(seconds=lookahead_seconds) for job in self.scheduler.jobs)

def is_my_topic(self, topic):
return self.device_name in topic.split('/')
Expand Down Expand Up @@ -200,6 +204,123 @@ def generate_response_topic(self, response_cmnd_key = None, response_cmnd_value
def get_curr_timestamp(self):
return (datetime.now(tz=pytz.timezone('US/Pacific')).strftime('%Y-%m-%d-T%H%M%S-'))

def _unpack_string(self, unpack_item, match_items):
"""
Unpacks items from match_items based on a string or regex pattern.
Raises an error if the pattern is invalid or if no matches are found.
Args:
unpack_item (str): A string or regex pattern to match against match_items.
match_items (list): A list of items to match the unpack_item against.
Returns:
list: A sorted list of items from match_items that match the unpack_item.
"""
if not isinstance(unpack_item, str):
raise ValueError(f"Item {unpack_item} should be a string. Forfeiting entire command list.")

try:
if unpack_item.startswith("^") and unpack_item.endswith("$"):
regex = re.compile(unpack_item)
filtered = [item for item in match_items if regex.match(item)]
else:
filtered = fnmatch.filter(match_items, unpack_item)
except re.error:
raise ValueError(f"Invalid regex {unpack_item}")

if not filtered:
raise ValueError(f"Item {unpack_item} not found in match_items. Forfeiting entire command list.")

return sorted(filtered)

def _enqueue_messages_to_schedule(self, topic, message, unpack_field, unpack_items):
"""
Updates the message dictionary's specified field with each item from unpack_items,
printing the updated message each time.
Args:
message (dict): The message to update.
unpack_field (str): The field within the message to update.
unpack_items (list): A list of new values to sequentially assign to unpack_field.
"""
print("ENQUEUEING!")
for unpack_item in unpack_items:
new_message = message.copy()
new_message[unpack_field] = unpack_item
#self.add_to_scheduler(new_message, param, time_unit, at_time))
schedule_message = {
"COMMAND": "SCHEDULE-REQUEST",
"TYPE": "ADD",
"EVERY_X_SECONDS": "1",
"AT": ":01",
"FLAG": "ONCE",
"FROM": self.device_name,
"DO": json.dumps(new_message)
}

print(schedule_message)
job_tag = self.generate_job_tag()
self.scheduler.every(1).second.do(self.run_once, topic, new_message, job_tag).tag=(job_tag)

self.update_state(self.state)
return


def unpack(self, topic, message, unpack_field, match_items, sort_all=False, enqueue_schedule=True):
"""
Unpacks items based on patterns specified in a message field against a list of items.
Filters and optionally sorts matched items from match_items based on patterns found in message[unpack_field].
Uses _stuff_messages to display how each matched item updates the message.
Args:
message (dict): The dictionary containing the unpack_field.
unpack_field (str): Key in message whose value is a string or list of strings or regex patterns.
match_items (list): Items to match against the patterns.
sort_all (bool): If True, sorts the entire result list; otherwise, maintains order of first appearance.
Returns:
boolean: True if anything has been unpacked into individual messages; False otherwise.
Usage:
try:
unpackable, unpacked_list = self.unpack(message, "WELL_ID", self.wells, sort_all=True, enqueue_schedule=True)
if not unpackable: #It's a single literal value, execute command now
except ValueError as e:
print("Error:", e)
"""
unpack_items = message.get(unpack_field)

#unpack items should not contain '?'
if isinstance(unpack_items, str) and unpack_items.find("?") == -1 and unpack_items in match_items:
return False, []

if not isinstance(unpack_items, (str, list)):
raise ValueError("unpack_items must be a string or a list of strings")

filtered = []
seen = set()

if isinstance(unpack_items, str):
unpack_items = [unpack_items] # Treat single string as a list with one item

for item in unpack_items:
current_matches = self._unpack_string(item, match_items)
for match in current_matches:
if match not in seen:
seen.add(match)
filtered.append(match)

if sort_all:
filtered.sort()

print(filtered)

if enqueue_schedule:
self._enqueue_messages_to_schedule(topic, message, unpack_field, filtered)

return True, filtered


# def set_mqtt_publish_topic(self, topic = None):
# # only change class variable, don't need to do change internal message broker settings
# if topic is not None:
Expand Down
21 changes: 13 additions & 8 deletions src/braingeneers/iot/example_device.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
from device import Device

# Or use this instead, if calling this in a repo outside braingeneerspy:
# from braingeneers.iot.device import Device
# from braingeneers.iot import Device

class ExampleDevice(Device):
""" Example Device Class
Demonstrates how to use and inherit the Device class for new application
"""
def __init__(self, device_name, eggs = 0, ham = 0):
def __init__(self, device_name, eggs = 0, ham = 0, spam = 1):
"""Initialize the ExampleDevice class
Args:
device_name (str): name of the device
ham (str): starting quantity of ham
eggs (int): starting quantity of eggs
spam (int): starting quantity of spam
:param device_specific_handlers: dictionary that maps the device's command keywords
to a function call that handles an incomming command.
"""
self.eggs = eggs
self.ham = ham
self.spam = spam

super().__init__(device_name=device_name, device_type = "Other", primed_default=True)

Expand All @@ -32,11 +34,12 @@ def __init__(self, device_name, eggs = 0, ham = 0):
def device_state(self):
"""
Return the device state as a dictionary. This is used by the parent Device class to update the device shadow.
Child can add any additional device specific state to the dictionary i.e., "EGGS" and "HAM"
Child can add any additional device specific state to the dictionary i.e., "EGGS", "HAM", and "SPAM"
"""
return { **super().device_state,
"EGGS": self.eggs,
"HAM": self.ham
"HAM": self.ham,
"SPAM": self.spam
}

def is_primed(self):
Expand All @@ -60,16 +63,17 @@ def handle_add(self, topic, message):
Args:
topic (str): topic of the received message
message (dict): message received by the device
ADD assumes that the message contains the keys "EGGS" and "HAM" and adds the values to the device's state.
ADD assumes that the message contains the keys "EGGS", "HAM", "SPAM", and adds the values to the device's state.
"""
try:
self.eggs += message["EGGS"]
self.ham += message["HAM"]
self.spam += message["SPAM"]
self.update_state(self.state) # to update eggs and ham change ASAP in shadow
except:
self.mb.publish_message(topic= self.generate_response_topic("ADD", "ERROR"),
message= { "COMMAND": "ADD-ERROR",
"ERROR": f"Missing argument for EGGS or HAM"})
"ERROR": f"Missing argument for EGGS, HAM, or SPAM"})
return

def handle_list(self, topic, message):
Expand All @@ -78,10 +82,11 @@ def handle_list(self, topic, message):
Args:
topic (str): topic of the received message
message (dict): message received by the device
LIST responds with a message containing the current values for "EGGS" and "HAM".
LIST responds with a message containing the current values for "EGGS", "HAM", and "SPAM".
"""
self.mb.publish_message(topic=self.generate_response_topic("LIST", "RESPONSE"),
message= { "COMMAND": "LIST-RESPONSE",
"EGGS": self.eggs,
"HAM" : self.ham })
"HAM" : self.ham,
"SPAM" : self.spam })
return
4 changes: 3 additions & 1 deletion src/braingeneers/iot/example_device_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
parser.add_argument('--device_name', type=str, required=False, default="spam", help='Name of device (default: spam)')
parser.add_argument('--eggs', type=int, required=False, default=0, help='Starting quantity of eggs (default: 0)')
parser.add_argument('--ham', type=int, required=False, default=0, help='Starting quantity of ham (default: 0)')
parser.add_argument('--spam', type=int, required=False, default=1, help='Starting quantity of spam (default: 1)')

args = parser.parse_args()

# Create a device object
device = ExampleDevice(device_name=args.device_name, eggs=args.eggs, ham=args.ham)
device = ExampleDevice(device_name=args.device_name, eggs=args.eggs, ham=args.ham, spam=args.spam)

# Start the device activities, running in a loop
# Control + C should gracefully stop execution
Expand Down

0 comments on commit d9b1e3a

Please sign in to comment.