diff --git a/README.md b/README.md index ba5d61a..604f807 100644 --- a/README.md +++ b/README.md @@ -7,14 +7,43 @@ Installation ------------ You can get the code from `https://github.com/thecynic/pylutron` -Example -------- +API Example +----------- import pylutron rra2 = pylutron.Lutron("192.168.0.x", "lutron", "integration") rra2.load_xml_db() rra2.connect() +CLI Example +----------- + +Interactive CLI: + + ./lutron_cli.py -c mycontroller -u myuser -p mypassword + pylutron> help + pylutron> list + pylutron> light my_light on 75 + pylutron> list keypads -f + pylutron> press my_k m + +Non-interactive Scripting: + + ./lutron_cli.py -c mycontroller -u myuser -p mypassword light my_light on 75 + +All commands take a filter, which is a case insensitive regex filter. Without +wildcards, the filter matches at the start of the string. For example, + + pylutron> list areas a + +Will match all areas that start with 'a'. To match all areas with an 'a', +the regex will look like: + + pylutron> list areas .*a.* + +Most commands have a -h to get per command help. Most of the list commands +have a -f to see more details. This is especially helpful with Keypads +to see which Buttons they contain. License ------- diff --git a/lutron_cli.py b/lutron_cli.py new file mode 100755 index 0000000..2a75409 --- /dev/null +++ b/lutron_cli.py @@ -0,0 +1,228 @@ +#!/usr/bin/env python3 +import pylutron +import cmd2 +import sys +import time +import argparse +import re +import logging +import time + +# Change these to match the site +default_controller = "192.168.111.123" +default_user = "ipad" +default_password = "ipad" + +# create the top-level parser for the alternate command +# The alternate command doesn't provide its own help flag +list_parser = argparse.ArgumentParser() +list_subparsers = list_parser.add_subparsers(title='subcommands') +light_parser = argparse.ArgumentParser() +button_parser = argparse.ArgumentParser() + +class lutron(cmd2.Cmd): + def __init__(self, controller, user, password): + cmd2.Cmd.__init__(self) + self.prompt = "pylutron> " + self.self_in_py = True + self.default_category = "Python cmd2 built-ins" + +# self._l = pylutron.Lutron(controller, user, password, pylutron.Processor.QS) +# self._db = self._l.load_xml_db("/tmp/cached_xml_db") + self._l = pylutron.Lutron(controller, user, password) + self._db = self._l.load_xml_db() + + self._l.connect() + + # table for list commands. allows listing of areas, keypads, switches, lights, and fans. + # default lists only the name, optional --full argument lists all attributes + # keypads can take optional --button argument to list the buttons associated with the keypad + # all devices can take an optional regex filter + devices = [{"cmd":"areas", "func":self._list_areas}, + {"cmd":"keypads", "func":self._list_keypads, "args":('-b', '--button')}, + {"cmd":"switches", "func":self._list_switches}, + {"cmd":"lights", "func":self._list_lights}, + {"cmd":"fans", "func":self._list_fans} ] + for d in devices: + subparser = list_subparsers.add_parser(d["cmd"]) + subparser.set_defaults(func=d["func"]) + subparser.add_argument('filter', nargs='?') + subparser.add_argument('-f', '--full', action='store_true') + + if "args" in d: + subparser.add_argument(d['args'][0], d['args'][1], nargs='?', const='.*', default=None) + + light_parser.add_argument('filter', nargs='?', const='.*', default=None) + lsp = light_parser.add_subparsers() + sp = lsp.add_parser("on") + sp.set_defaults(func=self._lights_on) + sp.add_argument('level', nargs='?', default="100") + + sp = lsp.add_parser("off") + sp.set_defaults(func=self._lights_off) + + button_parser.add_argument('keypad') + button_parser.add_argument('button') + + + @cmd2.with_category("PyLutron") + @cmd2.with_argparser(list_parser) + def do_list(self, args): + """List all the devices in the controller""" + + if filter in args and args.filter: + try: + re.compile(args.filter) + except: + self.poutput("Bad regular expression for match filter \"%s\". Try again." % args.filter) + return + + if 'func' in args: + self.poutput("\n".join([s for (d, s) in args.func(args)])) + return + + self.poutput("\n".join([str(a) for a in self._l.areas])) + + @cmd2.with_category("PyLutron") + @cmd2.with_argparser(light_parser) + def do_lights(self, args): + """Turn a light on/off""" + + # find the lights requested by the filter + args.full=False + lights = self._list_lights(args) + + # make sure there's at least one + if not lights: + self.poutput("No lights matching \"%s\" found" % args.filter) + return + + # turn them on/off + if 'func' in args: + for (light, name) in lights: + args.func(light, args) + return + + # if there's on on/off requested, print the light with it's level + for (light, name) in lights: + self.poutput("name: {}, level: {}".format(name, light.level)) + + def _lights_on(self, light, args): + """Turn a light on to a specific level""" + light.level = float(args.level) + + def _lights_off(self, light, args): + """Turn a light off, i.e. level = 0""" + light.level = 0.0 + + @cmd2.with_category("PyLutron") + @cmd2.with_argparser(button_parser) + def do_press(self, args): + """Press a button""" + + # find the buttons requested by the filter + args.full=False + args.filter=args.keypad + keypads = self._list_keypads(args) + + # make sure there's at least one + if not keypads: + self.poutput("No keypad matching \"%s\" found" % args.filter) + return + + for (kp, name) in keypads: + buttons = self.list_buttons(args, kp, True) + for b in buttons: + b.press() + + def _list_areas(self, args): + """List the areas in the controller""" + format_str = "{0!s}" if args.full else "{0.name}" + + filter_regex = r'(?i:%s)' % args.filter if args.filter else r'.*' + + return [(a,format_str.format(a)) for a in self._l.areas if re.match(filter_regex, a.name)] + + def default_more_cb(self, *args, **kwargs): + return True + + def _list_helper(self, args, attr, kind, format_str_pre="", format_str_post="", more_cb=default_more_cb): + """Returns a list of (obj, str) tuples filtered by args.filter regex. The format of the str is determined + by args.full.""" + format_str = "{0!s}" if args.full else "{1[0]}" + format_str = format_str_pre + format_str + format_str_post + + filter_regex = r'(?i:%s)' % args.filter if args.filter else r'.*' + + out = "" + devices = [] + for a in self._l.areas: + devices.extend([(a,d) for d in getattr(a,attr) if re.match(kind, d.type) and re.match(filter_regex, d.name)]) + + return [(d,format_str.format(d, str(d).split(","), more_cb(args,d))) for (a,d) in devices if more_cb(args,d)] + + def list_buttons(self, args, keypad, obj=False): + if args.button: + try: + regex_str = r"(?i:%s)" % args.button + regex = re.compile(regex_str) + if obj: + buttons = [b for b in keypad.buttons if regex.match(b.name)] + else: + buttons = "\n\t".join([ str(b) for b in keypad.buttons if regex.match(b.name)]) + if buttons == '\n\t': + buttons = [] + except: + buttons = "Bad button match filter \"%s\". Try again." % args.button + + return buttons + + def _list_keypads(self, args): + """List the areas in the controller""" + + if args.button: + return self._list_helper(args, "keypads", r'KEYPAD', "", "\n\t{2}", self.list_buttons) + else: + return self._list_helper(args, "keypads", r'KEYPAD') + + def _list_switches(self, args): + """List the areas in the controller""" + return self._list_helper(args, "keypads", r'DIMMER/SWITCH') + + def _list_lights(self, args): + """List the lights in the controller""" + return self._list_helper(args, "outputs", r'DIMMER') + + def _list_fans(self, args): + """List the areas in the controller""" + return self._list_helper(args, "outputs", r'FAN') + +if __name__ == '__main__': + cmdline = argparse.ArgumentParser(description="pylutron CLI") + cmdline.add_argument('-c', '--controller', default=default_controller) + cmdline.add_argument('-u', '--user', default=default_user) + cmdline.add_argument('-p', '--password', default=default_password) + cmdline.add_argument('-d', '--debug', action='store_true') + + # only consume the connection args, if there are other cmd line args assume + # those are commands + args, left = cmdline.parse_known_args() + sys.argv = sys.argv[:1]+left + + if args.debug: + logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) + + # create the lutron class + app = lutron(args.controller, args.user, args.password) + + # Note, the connection to the processor will retry forever, so this check + # is a bit superflous + if not app._l.connected: + sys.exit() + + # either run a command or enter the interactive loop + if len(sys.argv) > 1: + app.onecmd_plus_hooks(" ".join(sys.argv[1:])) + else: + sys.exit(app.cmdloop()) + diff --git a/pylutron/__init__.py b/pylutron/__init__.py index e4bb085..e69d999 100644 --- a/pylutron/__init__.py +++ b/pylutron/__init__.py @@ -13,6 +13,9 @@ import telnetlib import threading import time +import string +import io +import parse from typing import Any, Callable, Dict, Type @@ -29,6 +32,290 @@ socket.timeout, ) +class Processor(object): + """ Encapsulates the specific communication protocols associated with a Lutron Processor. + This is the base class that contains the interfaces and datastructures. Subclasses + provide for implementation specific connections to the processor and protocol translations. + + The Processor class keeps track of the objects associated with that processes and a + mapping from id to obj. Different Processors may have different notions what an id is. + + Each subclass should define a static variable in this class with its name as a variable + and the value as its stringified name. It should also regiter a factory to create the + processor. + + Example: + For processor subclass ProcessorFoo(Processor). + 1) Create a variable: Processor.Foo = "Foo" + 2) Register in the factory: Processor.Factory[Processor.Foo] = ProcessorFoo + """ + + # static factory for creating the processors. + # example: + # processor = Processor.Factory[Processor.HWI]() + Factory = {} + + USER_PROMPT = b'login: ' + PW_PROMPT = b'password: ' + + def __init__(self): + """ Initialize """ + self._ids = {} # dict of ids to hold id -> obj mappings + + def obj(self, obj_id, cmd_type=None): + """ return the obj from an id. If the cmd_type isn't passed in, it'll look through all + cmd types for the id""" + obj_id = obj_id.strip() + if cmd_type: + return self._ids[cmd_type][obj_id] + else: + for cmd_dict in self._ids: + if obj_id in cmd_dict: + return cmd_dict[obj_id] + + @property + def prompt(self): + return self._prompt + + @property + def cmd_types(self): + """ return all the cmd_types available""" + return self._ids.keys() + + def register_id(self, cmd_type, obj): + """Handles the management of ids. HWI processors use addresses whereas the newer + processors use ids. This abstracts out the differences.""" + self._register_id(cmd_type, obj.id, obj) + + def _register_id(self, cmd_type, obj_id, obj): + """ Subclasses that have a different notion of an id will override this. + Store a map of id's to objects. """ + ids = self._ids.setdefault(cmd_type, {}) + + if obj_id in ids: + raise IntegrationIdExistsError + self._ids[cmd_type][obj_id] = obj + _LOGGER.debug("Registered %s of type %s" % (obj_id, cmd_type)) + + def cmd(self, command_str): + """ Take in a command and translate if needed. The native protocol is QS, + so if the command is in HWI format, convert to QS. All commands are returned as + a list of one or more commands.""" + + return [command_str] + + def connect(self, telnet, user, password): + """Connect to the processor. """ + + # Wait for the login prompt, send the username. Wait for the password. + telnet.read_until(self.USER_PROMPT, timeout=3) + telnet.write(user + b'\r\n') + telnet.read_until(self.PW_PROMPT, timeout=3) + telnet.write(password + b'\r\n') + + def parser(self, lutron, xml_db_str): + """ Returns a Parser object to parse the database for this processor. + Subclasses must implement this. """ + pass + + def initialize(self, connection): + """ Called after a successful connection to setup any additional configuration. """ + pass + +class QSProcessor(Processor): + """ Processor for QS systems """ + Processor.QS = "QS" + + def __init__(self): + """ Initialize QS processor. """ + super().__init__() + + self._prompt = b'QNET> ' # prompt to look for when logged in + + def parser(self, lutron, xml_db_str): + """ Parse QS/RA2 database. """ + return QSXmlDbParser(lutron, xml_db_str) + + def initialize(self): + """ Turn on feedback """ + connection._send_locked("#MONITORING,12,2") + connection._send_locked("#MONITORING,255,2") + connection._send_locked("#MONITORING,3,1") + connection._send_locked("#MONITORING,4,1") + connection._send_locked("#MONITORING,5,1") + connection._send_locked("#MONITORING,6,1") + connection._send_locked("#MONITORING,8,1") + + +class RA2Processor(QSProcessor): + """ Processor for RA2 systems """ + Processor.RA2 = "RA2" + + def __init__(self): + """ Initialize RA2 processor. """ + super().__init__() + self._prompt = b'GNET> ' # prompt to look for when logged in + +class HWIProcessor(Processor): + """ Encapsulates the specific communication protocols associated with a Lutron Processor. The base comms protocol is RA2/QS, + when talking to an HWI processor, there is a protocol conversion.""" + Processor.HWI = "HWI" + + class CommandFormatter(string.Formatter): + """ Helper class to format strings for conversions. Adds to the Formatter spec: + - Added a psuedo arg_name 'all'. An arg_name such as {all} will return all the arguments + - Added two new conversion specifications: !o and !F. + o !o print the object associated with an object id. + o !F convert a string to a float. """ + + def __init__(self, processor): + """ Init, requires an initialized processor""" + self._processor = processor + + def get_field(self, field_name, *args, **kwargs): + """ A pseudo arg_name {all} that will return all input args. Example: + input: "This is the input: {all}".format("a", "b", "c", "d") + output: This is the input: a b c d""" + + if field_name == "all": + return ((" ".join(args[0]), "all")) + else: + return super().get_field(field_name, *args, **kwargs) + + def convert_field(self, value, conversion): + """ Additional field conversions. !o for object id, and !F for string to float. Example: + "The obj: {0!o} from obj_id: {0}".format(obj_id) + + "A real float: {0!F:.2f}.format("123.4567")""" + + if conversion == 'o': + return self._processor.obj(value) + elif conversion == 'F': + return float(value) + else: + return super().convert_field(value, conversion) + + def __init__(self): + """ Initialize HWI processor. """ + super().__init__() + self._prompt = b'LNET> ' # prompt to look for when logged in + self._format = self.CommandFormatter(self) # internal string formatter + + # rewrite rules. matches on the first argument. + # + # Input Output + # ----- ---- + # literal literal + # QS Command String HWI command string + # QS Device Action HWI Button Press + # QS Output Request HWI Request Intensity + # QS Output Action HWI Fade to Intensity with 0 time delay + # HWI level change QS Output Response + # + # + # Note. LED processing not considered + # + self._cmds = { 'PROMPT' : ["PROMPTOFF"], + 'MON_OFF' : ["DLMOFF", "KLMOFF", "KBMOFF", "GSMOFF"], + 'BTN_MON' : ["KBMON"], + 'LED_MON' : ["KLMON"], + 'ZONE_MON' : ["DLMON"], + '#DEVICE' : ["KBP, {1}, {2}"], + '?OUTPUT' : ["RDL, {1}"], + '#OUTPUT' : ["FADEDIM, {3!F:.0f}, 0, 0, {1}"], + 'DL' : ["%sOUTPUT, {1}, %s, {2}" % (Lutron.OP_RESPONSE, Output._ACTION_ZONE_LEVEL)], + } + + def canonicalize_addr(self, addr): + """ Turns a HWI address into the canonical format. i.e., square brackets, colon separated, + and two digits. + 1:2:34 -> [01:02:34]""" + + # if it already has brackets or whitespace, strip it out so that it can be reformatted + addr = addr.strip("[]%s" % string.whitespace) + + # this mess turns a HWI address into the canonical format. i.e., 1:2:3 -> [ 01:02:03 ] + return "[{}]".format(":".join(["{:02}".format(int(a)) for a in addr.split(':')])) + + def register_id(self, cmd_type, obj): + """Handles the management of ids. HWI processors use addresses whereas the newer + processors use ids. This abstracts out the differences.""" + + self._register_id(cmd_type, self.canonicalize_addr(obj.address), obj) + + def cmd(self, command_str): + """ Take in a command and translate if needed. The native protocol is QS, + so if the command is in HWI format, convert to QS. All commands are returned as + a list of one or more commands.""" + + # empty strings are often returned from the main loop + if command_str == '': + return [command_str] + + # ignore the return of the prompt + if command_str == self._prompt: + return [] + + # + # Find the command name for lookup. If there's nothing that looks like a command, deem it + # a passthrough and return the original string. All commands need not be implemented in the + # _cmds table. + # + + # OP_EXECUTE or OP_QUERY commands are translated for HWI + if command_str[0] == Lutron.OP_EXECUTE or command_str[0] == Lutron.OP_QUERY: + cmd_name = command_str.split(',')[0] + + # Some string literal commands w/ arguments don't start with OP_EXECUTE or OP_QUERY + elif command_str.split(",")[0] in self._cmds: + cmd_name = command_str.split(",")[0] + + # String literal commands without arguments + elif command_str in self._cmds: + cmd_name = command_str + + # There's no translation, pass it back as is + else: + return([command_str]) + + # Each native command can turn into one or more translated commands. + # The commands returned from the table are format strings to determine how to + # handle the args. So, get the command and then format the final string with args. + cmd_list = self._cmds[cmd_name] + cooked_cmds = [self._format.vformat((n), command_str.split(','), {}) for n in cmd_list] + try: + _LOGGER.debug("Converting cmd %s to %s" % (command_str, cooked_cmds)) + return cooked_cmds + except: + return [command_str] + + def connect(self, telnet, user, password): + """Connect to the processor. HWI requires login,password whereas QS/RA2 is a normal + login followed by password""" + + # Wait for the login prompt, send the username,password and then turn on the prompt. + telnet.read_until(self.USER_PROMPT, timeout=3) + login_string="%s,%s".encode('ascii') % (user, password) + telnet.write(login_string + b'\r\n') + + # turn on prompting, this is used to find the end of the returned line + telnet.write("PROMPTON".encode('ascii') + b'\r\n') + + def parser(self, lutron, xml_db_str): + return HWIXmlDbParser(lutron, xml_db_str) + + def initialize(self, connection): + """ Setup monitoring """ + connection._send_locked("MON_OFF") + connection._send_locked("BTN_MON") + connection._send_locked("LED_MON") + connection._send_locked("ZONE_MON") + +# register all the processors into the Processor factory. +Processor.Factory[Processor.QS] = QSProcessor +Processor.Factory[Processor.RA2] = RA2Processor +Processor.Factory[Processor.HWI] = HWIProcessor + class LutronException(Exception): """Top level module exception.""" pass @@ -52,9 +339,6 @@ class InvalidSubscription(LutronException): class LutronConnection(threading.Thread): """Encapsulates the connection to the Lutron controller.""" - USER_PROMPT = b'login: ' - PW_PROMPT = b'password: ' - PROMPT = b'GNET> ' def __init__(self, host, user, password, recv_callback): """Initializes the lutron connection, doesn't actually connect.""" @@ -69,9 +353,17 @@ def __init__(self, host, user, password, recv_callback): self._connect_cond = threading.Condition(lock=self._lock) self._recv_cb = recv_callback self._done = False + self._processor = None self.setDaemon(True) + @property + def processor(self): + return self._processor + + def setProcessor(self, processor): + self._processor = processor + def connect(self): """Connects to the lutron controller.""" if self._connected or self.is_alive(): @@ -90,7 +382,10 @@ def _send_locked(self, cmd): """ _LOGGER.debug("Sending: %s" % cmd) try: - self._telnet.write(cmd.encode('ascii') + b'\r\n') + cooked_cmds = self._processor.cmd(cmd) + for cooked_cmd in cooked_cmds: + _LOGGER.debug("Sending Command: %s -> %s" % (cmd, cooked_cmd)) + self._telnet.write(cooked_cmd.encode('ascii') + b'\r\n') except _EXPECTED_NETWORK_EXCEPTIONS: _LOGGER.exception("Error sending {}".format(cmd)) self._disconnect_locked() @@ -109,6 +404,7 @@ def send(self, cmd): def _do_login_locked(self): """Executes the login procedure (telnet) as well as setting up some connection defaults like turning off the prompt, etc.""" + _LOGGER.info("Logging in over telnet") self._telnet = telnetlib.Telnet(self._host, timeout=2) # 2 second timeout # Ensure we know that connection goes away somewhat quickly @@ -126,19 +422,29 @@ def _do_login_locked(self): except OSError: _LOGGER.exception('error configuring socket') - self._telnet.read_until(LutronConnection.USER_PROMPT, timeout=3) - self._telnet.write(self._user + b'\r\n') - self._telnet.read_until(LutronConnection.PW_PROMPT, timeout=3) - self._telnet.write(self._password + b'\r\n') - self._telnet.read_until(LutronConnection.PROMPT, timeout=3) + self._processor.connect(self._telnet, self._user, self._password) - self._send_locked("#MONITORING,12,2") - self._send_locked("#MONITORING,255,2") - self._send_locked("#MONITORING,3,1") - self._send_locked("#MONITORING,4,1") - self._send_locked("#MONITORING,5,1") - self._send_locked("#MONITORING,6,1") - self._send_locked("#MONITORING,8,1") + _LOGGER.debug("Logged in, waiting for prompt") + try: + prompt = self._telnet.read_until(self._processor.prompt, timeout=3) + _LOGGER.debug("Received prompt: %s", prompt) + if not self._processor.prompt in prompt: + _LOGGER.warning("Bad Password (%s). Disconnecting." % prompt) + self._telnet = None + return + except EOFError: + _LOGGER.exception("Logged out while waiting for prompt") + self._telnet = None + return + + # send commands to initialize the processor to the state that's needed + self._processor.initialize(self) + + # flush any commands coming back from the initialization. The extra commands + # can confuse the rest of the code + while True: + if self._telnet.read_until(b"\n", timeout=1) == b'': + break def _disconnect_locked(self): """Closes the current connection. Assume self._lock is held.""" @@ -154,11 +460,13 @@ def _maybe_reconnect(self): with self._lock: if not self._connected: _LOGGER.info("Connecting") - # This can throw an exception, but we'll catch it in run() + # Make sure that it was able to log in. If the telnet connection got torn + # down during login, it's not a successful connection self._do_login_locked() - self._connected = True + if self._telnet: + self._connected = True + _LOGGER.info("Connected") self._connect_cond.notify_all() - _LOGGER.info("Connected") def _main_loop(self): """Main body of the the thread function. @@ -175,12 +483,18 @@ def _main_loop(self): # code runs synchronously in this loop). t = self._telnet if t is not None: - line = t.read_until(b"\n", timeout=3) + try: + line = t.read_until(b"\n", timeout=3) + except EOFError: + self._connected = False + _LOGGER.exception('Connection closed while reading next line.') else: raise EOFError('Telnet object already torn down') except _EXPECTED_NETWORK_EXCEPTIONS: _LOGGER.exception("Uncaught exception") try: + # if it didn't connect, wait a bit to see if the transient error cleared up. + time.sleep(1) self._lock.acquire() self._disconnect_locked() # don't spam reconnect @@ -188,7 +502,7 @@ def _main_loop(self): continue finally: self._lock.release() - self._recv_cb(line.decode('ascii').rstrip()) + self._recv_cb(self._processor.cmd(line.decode('ascii').rstrip())[0]) def run(self): """Main entry point into our receive thread. @@ -212,11 +526,114 @@ class LutronXmlDbParser(object): def __init__(self, lutron, xml_db_str): """Initializes the XML parser, takes the raw XML data as string input.""" + self._lutron = lutron self._xml_db_str = xml_db_str self.areas = [] self.project_name = None + def __str__(self): + return "%s:\n%s" % (self.project_name, + "\n".join([str(a) for a in self._areas])) + +class HWIXmlDbParser(LutronXmlDbParser): + """ Parse HWI XML DB's """ + + def __init__(self, lutron, xml_db_str): + super().__init__(lutron, xml_db_str) + + def parse(self): + """Main entrypoint into the parser. It interprets and creates all the + relevant Lutron objects and stuffs them into the appropriate hierarchy.""" + import xml.etree.ElementTree as ET + root = ET.fromstring(self._xml_db_str) + # The structure is something like this: + # + # + # + # + # + # + # + # + + self.project_name = root.find('ProjectName').text + + for area_xml in root.getiterator('Area'): + self._parse_area(area_xml) + return True + + def _parse_area(self, area_xml): + """Parses an Area tag, which is effectively a room, depending on how the + Lutron controller programming was done.""" + + area_name = area_xml.find('Name').text + for room_xml in area_xml.getiterator('Room'): + area = Area(self._lutron, + name="%s-%s" % (area_name, room_xml.find('Name').text), + integration_id=int(room_xml.find('Id').text), + occupancy_group_id=None) + + outputs = room_xml.find('Outputs') + for output_xml in outputs.getiterator('Output'): + output = self._parse_output(output_xml) + area.add_output(output) + self._parse_devices(area, room_xml) + self.areas.append(area) + + def _parse_devices(self, area, room_xml): + control_stations = room_xml.find('Inputs') + for cs_xml in control_stations.getiterator('ControlStation'): + devices = cs_xml.find('Devices') + for device_xml in devices.getiterator('Device'): + keypad = self._parse_keypad(device_xml, cs_xml) + area.add_keypad(keypad) + + def _parse_output(self, output_xml): + """Parses an output, which is generally a switch controlling a set of + lights/outlets, etc.""" + + output = Output(self._lutron, + name=output_xml.find('Name').text, + integration_id=output_xml.find('Address').text, + address=output_xml.find('Address').text, + output_type=output_xml.find('Type').text, + watts=int(output_xml.find('FixtureWattage').text)) + return output + def _parse_keypad(self, keypad_xml, cs_xml): + """Parses a keypad or dimmer.""" + keypad = Keypad(self._lutron, + name=cs_xml.find('Name').text, + keypad_type=keypad_xml.find('Type').text, + location=keypad_xml.find('GangPosition').text, + integration_id=keypad_xml.find('Address').text, + address=keypad_xml.find('Address').text) + + buttons = keypad_xml.find('Buttons') + for buttons_xml in buttons.getiterator('Button'): + button = self._parse_button(keypad, buttons_xml) + if button: keypad.add_button(button) + + return keypad + + def _parse_button(self, keypad, button_xml): + button_type = button_xml.find('Type').text + + if button_type != 'Not Programmed': + button = Button(self._lutron, keypad, + name=button_xml.find('Name').text, + num=int(button_xml.find('Number').text), + button_type=button_xml.find('Type').text.replace("Default ", ""), + direction='direction') + + return button + +class QSXmlDbParser(LutronXmlDbParser): + """ Parse QS/RA2 XML DB's """ + + def __init__(self, lutron, xml_db_str): + super().__init__(lutron, xml_db_str) + def parse(self): """Main entrypoint into the parser. It interprets and creates all the relevant Lutron objects and stuffs them into the appropriate hierarchy.""" @@ -357,7 +774,6 @@ def _parse_motion_sensor(self, sensor_xml): name=sensor_xml.get('Name'), integration_id=int(sensor_xml.get('IntegrationID'))) - class Lutron(object): """Main Lutron Controller class. @@ -371,7 +787,7 @@ class Lutron(object): OP_QUERY = '?' OP_RESPONSE = '~' - def __init__(self, host, user, password): + def __init__(self, host, user, password, processor_type=None): """Initializes the Lutron object. No connection is made to the remote device.""" self._host = host @@ -382,12 +798,29 @@ def __init__(self, host, user, password): self._ids = {} self._legacy_subscribers = {} self._areas = [] + self._processor_type = processor_type @property def areas(self): """Return the areas that were discovered for this Lutron controller.""" return self._areas + @property + def name(self): + """Return the name of the project running on this Lutron controller.""" + return self._name + + def id_to_obj(self, obj_id, cmd_type=None): + return self._conn.processor.obj(obj_id, cmd_type) + + @property + def processor_type(self): + return self._processor_type + + @property + def connected(self): + return self._conn._connected + def subscribe(self, obj, handler): """Subscribes to status updates of the requested object. @@ -408,10 +841,7 @@ def register_id(self, cmd_type, obj): """Registers an object (through its integration id) to receive update notifications. This is the core mechanism how Output and Keypad objects get notified when the controller sends status updates.""" - ids = self._ids.setdefault(cmd_type, {}) - if obj.id in ids: - raise IntegrationIdExistsError - self._ids[cmd_type][obj.id] = obj + self._conn.processor.register_id(cmd_type, obj) def _dispatch_legacy_subscriber(self, obj, *args, **kwargs): """This dispatches the registered callback for 'obj'. This is only used @@ -422,8 +852,9 @@ def _dispatch_legacy_subscriber(self, obj, *args, **kwargs): def _recv(self, line): """Invoked by the connection manager to process incoming data.""" - if line == '': + if line == '' or self._conn._processor.prompt.decode('utf-8') in line: return + _LOGGER.debug("Received: %s" % line) # Only handle query response messages, which are also sent on remote status # updates (e.g. user manually pressed a keypad button) if line[0] != Lutron.OP_RESPONSE: @@ -431,16 +862,17 @@ def _recv(self, line): return parts = line[1:].split(',') cmd_type = parts[0] - integration_id = int(parts[1]) + integration_id = parts[1] args = parts[2:] - if cmd_type not in self._ids: + + if cmd_type not in self._conn.processor.cmd_types: _LOGGER.info("Unknown cmd %s (%s)" % (cmd_type, line)) return - ids = self._ids[cmd_type] - if integration_id not in ids: + + obj = self.id_to_obj(integration_id, cmd_type) + if not obj: _LOGGER.warning("Unknown id %d (%s)" % (integration_id, line)) return - obj = ids[integration_id] handled = obj.handle_update(args) def connect(self): @@ -450,7 +882,7 @@ def connect(self): def send(self, op, cmd, integration_id, *args): """Formats and sends the requested command to the Lutron controller.""" out_cmd = ",".join( - (cmd, str(integration_id)) + tuple((str(x) for x in args))) + (cmd, integration_id) + tuple((str(x) for x in args))) self._conn.send(op + out_cmd) def load_xml_db(self, cache_path=None): @@ -461,23 +893,83 @@ def load_xml_db(self, cache_path=None): xml_db = None loaded_from = None + processor_type = None + processor = None + # why don't format and parse use the same syntax!?? + # the cached file has a header that denotes which type of processor to instantiate + header_write = "pylutron processor: {}\n" + header_read = "pylutron processor: {:w}" + + # read from cached file if it exists if cache_path: try: with open(cache_path, 'rb') as f: xml_db = f.read() - loaded_from = 'cache' - except Exception: + # look for the header in the file + p = parse.search(header_read, str(xml_db)) + + # if it's there and there's a processor associated with it, use that processor + # and fixup the xml_db to remove the header since the XML parser doesn't + # like it there. + if p: + if p.fixed[0] in Processor.Factory: + loaded_from = 'cache' + processor = p.fixed[0] + xml_db = xml_db[p.spans[0][1]-1:] + except Exception as e: + print(e) pass - if not loaded_from: - import urllib.request - url = 'http://' + self._host + '/DbXmlInfo.xml' - with urllib.request.urlopen(url) as xmlfile: - xml_db = xmlfile.read() - loaded_from = 'repeater' + # if there's no cache, try loading from the HWI database via FTP. + # if there's nothing there, try loading from the RA2/QS URL + # + # this determines the Processor type to create. + if not loaded_from: + try: + _LOGGER.debug("Trying FTP for XML DB") + import ftplib + with ftplib.FTP(self._host, "lutron", "lutron") as ftp: + if logging.getLogger().level == logging.DEBUG: + ftp.set_debuglevel(2) + ftp.set_pasv(0) + ftp.login() + + # login successful, retrieve the XML database. + cached_file = io.BytesIO(b'0') + ftp.cwd('proc0') + ftp.retrbinary("RETR fullxml.dat", cached_file.write) + loaded_from = 'FTP' + processor = Processor.HWI + + # the xml db is in zip format, unzip it to get at the real db + import zipfile + with zipfile.ZipFile(cached_file) as myzip: + with myzip.open("fulldata.dat") as myfile: + xml_db = myfile.read() + + except Exception as e: + _LOGGER.debug("FTP failed,trying HTTP for XML DB: %s" % e) + import urllib.request + url = 'http://' + self._host + '/DbXmlInfo.xml' + with urllib.request.urlopen(url) as xmlfile: + xml_db = xmlfile.read() + loaded_from = 'repeater' + processor = Processor.RA2 + + # if the user asked for a specific processor, use that instead. + # otherwise the processor type is decided by how the database was + # found + if self.processor_type: + processor = self.processor_type + + # create the processor _LOGGER.info("Loaded xml db from %s" % loaded_from) + self._conn.setProcessor(Processor.Factory[processor]()) - parser = LutronXmlDbParser(lutron=self, xml_db_str=xml_db) + # setup the parser based on the processor type + parser = self._conn.processor.parser(lutron=self, xml_db_str=xml_db) + + # parse assert(parser.parse()) # throw our own exception self._areas = parser.areas self._name = parser.project_name @@ -485,8 +977,11 @@ def load_xml_db(self, cache_path=None): _LOGGER.info('Found Lutron project: %s, %d areas' % ( self._name, len(self.areas))) - if cache_path and loaded_from == 'repeater': + # save this for next time if a cache_path was provided + if cache_path and loaded_from != None: + _LOGGER.info('Saving cache file: %s' % cache_path) with open(cache_path, 'wb') as f: + f.write(header_write.format(processor).encode()) f.write(xml_db) return True @@ -551,17 +1046,23 @@ class LutronEntity(object): """Base class for all the Lutron objects we'd like to manage. Just holds basic common info we'd rather not manage repeatedly.""" - def __init__(self, lutron, name): + def __init__(self, lutron, name, address=None): """Initializes the base class with common, basic data.""" self._lutron = lutron self._name = name self._subscribers = [] + self._address=address @property def name(self): """Returns the entity name (e.g. Pendant).""" return self._name + @property + def address(self): + """Returns the address of the object. Addresses exist in legacy HWI""" + return self._address + def _dispatch_event(self, event: LutronEvent, params: Dict): """Dispatches the specified event to all the subscribers.""" for handler, context in self._subscribers: @@ -606,12 +1107,16 @@ class Event(LutronEvent): """ LEVEL_CHANGED = 1 - def __init__(self, lutron, name, watts, output_type, integration_id): + def __init__(self, lutron, name, watts, output_type, integration_id, address=None): """Initializes the Output.""" - super(Output, self).__init__(lutron, name) + super(Output, self).__init__(lutron, name, address), self._watts = watts self._output_type = output_type - self._level = 0.0 + # set the level to something invalid to allow a just started system to send a command which + # can set the level to 0. Otherwise the default value of level=0 will cause the short-circuit + # check of the new level being the same as the old to trigger when hit with a request for + # new level of 0 + self._level = -1.0 self._query_waiters = _RequestHelper() self._integration_id = integration_id @@ -619,13 +1124,13 @@ def __init__(self, lutron, name, watts, output_type, integration_id): def __str__(self): """Returns a pretty-printed string for this object.""" - return 'Output name: "%s" watts: %d type: "%s" id: %d' % ( - self._name, self._watts, self._output_type, self._integration_id) + return 'Output name: "%s," watts: %d, type: "%s", id: %s, address: %s, level: %f' % ( + self._name, self._watts, self._output_type, self._integration_id, self._address, self._level) def __repr__(self): """Returns a stringified representation of this object.""" return str({'name': self._name, 'watts': self._watts, - 'type': self._output_type, 'id': self._integration_id}) + 'type': self._output_type, 'id': self._integration_id, 'address': self._address, 'level': self.level}) @property def id(self): @@ -634,12 +1139,12 @@ def id(self): def handle_update(self, args): """Handles an event update for this object, e.g. dimmer level change.""" - _LOGGER.debug("handle_update %d -- %s" % (self._integration_id, args)) + _LOGGER.debug("handle_update %s -- %s" % (self._integration_id, args)) state = int(args[0]) if state != Output._ACTION_ZONE_LEVEL: return False - level = float(args[1]) - _LOGGER.debug("Updating %d(%s): s=%d l=%f" % ( + level = float(args[1].strip()) + _LOGGER.debug("Updating %s(%s): s=%d l=%f" % ( self._integration_id, self._name, state, level)) self._level = level self._query_waiters.notify() @@ -751,8 +1256,8 @@ def __init__(self, lutron, keypad, name, num, button_type, direction): def __str__(self): """Pretty printed string value of the Button object.""" - return 'Button name: "%s" num: %d type: "%s" direction: "%s"' % ( - self.name, self.number, self._button_type, self._direction) + return 'Button name: "%s", num: %d, type: "%s"' % ( + self.name, self.number, self._button_type) def __repr__(self): """String representation of the Button object.""" @@ -817,7 +1322,7 @@ def __init__(self, lutron, keypad, name, led_num, component_num): def __str__(self): """Pretty printed string value of the Led object.""" - return 'LED keypad: "%s" name: "%s" num: %d component_num: %d"' % ( + return 'LED keypad: "%s", name: "%s", num: %d, component_num: %d"' % ( self._keypad.name, self.name, self.number, self.component_number) def __repr__(self): @@ -879,9 +1384,9 @@ class Keypad(LutronEntity): """ _CMD_TYPE = 'DEVICE' - def __init__(self, lutron, name, keypad_type, location, integration_id): + def __init__(self, lutron, name, keypad_type, location, integration_id, address=None): """Initializes the Keypad object.""" - super(Keypad, self).__init__(lutron, name) + super(Keypad, self).__init__(lutron, name, address) self._buttons = [] self._leds = [] self._components = {} @@ -943,6 +1448,13 @@ def handle_update(self, args): return self._components[component].handle_update(action, params) return False + def __str__(self): + """Returns a pretty-printed string for this object.""" + return 'Keypad name: "%s", location: %s, id: %s, type: %s, address: %s\n\t\t%s\n\t\t%s' % ( + self._name, self._location, self._integration_id, self._type, self._address, + "\n\t\t".join([str(o) for o in self._buttons]), + "\n\t\t".join([str(o) for o in self._leds]) + ) class PowerSource(Enum): """Enum values representing power source, reported by queries to @@ -1005,7 +1517,7 @@ def id(self): def __str__(self): """Returns a pretty-printed string for this object.""" - return 'MotionSensor {} Id: {} Battery: {} Power: {}'.format( + return 'MotionSensor {}, Id: {}, Battery: {}, Power: {}'.format( self.name, self.id, self.battery_status, self.power_source) def __repr__(self): @@ -1125,7 +1637,6 @@ def _do_query_state(self): return self._lutron.send(Lutron.OP_QUERY, OccupancyGroup._CMD_TYPE, self._integration_id, OccupancyGroup._ACTION_STATE) - def handle_update(self, args): """Handles an event update for this object, e.g. occupancy state change.""" action = int(args[0]) @@ -1169,6 +1680,20 @@ def add_sensor(self, sensor): if not self._occupancy_group: self._occupancy_group = OccupancyGroup(self._lutron, self) + def __str__(self): + """Returns a pretty-printed string for this object.""" + return 'Area name: "%s", occupancy_group_id: %s, id: %d\n\t%s\n\t%s' % ( + self._name, self._occupancy_group_id, self._integration_id, + "\n\t".join([str(o) for o in self._outputs]), + "\n\t".join([str(o) for o in self._keypads]) + ) + + def __repr__(self): + """Returns a stringified representation of this object.""" + return str({'name': self._name, + 'occupancy_group_id': self._occupancy_group_id, 'id': self._integration_id, + 'outputs': self._outputs, 'keypads': self._keypads}) + @property def name(self): """Returns the name of this area.""" diff --git a/setup.py b/setup.py index e8be3dc..835fc52 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ setup( name = 'pylutron', - version = '0.2.5', + version = '0.2.6', license = 'MIT', description = 'Python library for Lutron RadioRA 2', author = 'Dima Zavin',