From fe629488790e7c0f82b12cae941ae1a81d5cbda2 Mon Sep 17 00:00:00 2001 From: John Carter Date: Sun, 30 Jun 2019 21:21:29 +1200 Subject: [PATCH] Reformat code with black --- hass_test.py | 10 +- home_assistant/sure_petflap.py | 73 ++-- setup.py | 24 +- sp_cli.py | 94 ++--- sure_petcare/__init__.py | 623 ++++++++++++++++++--------------- sure_petcare/utils.py | 32 +- 6 files changed, 478 insertions(+), 378 deletions(-) diff --git a/hass_test.py b/hass_test.py index 1b87b6b..7b6af39 100755 --- a/hass_test.py +++ b/hass_test.py @@ -26,11 +26,11 @@ from pprint import pprint import json -dut = Dut.SurePetConnect( user, pw, debug = True, cache_file = cache_file ) +dut = Dut.SurePetConnect(user, pw, debug=True, cache_file=cache_file) -print( '--- state (decoded JSON):' ) -pprint( json.loads( dut.state ) ) +print("--- state (decoded JSON):") +pprint(json.loads(dut.state)) -print( '\n--- state attributes:' ) -pprint( dut.state_attributes ) +print("\n--- state attributes:") +pprint(dut.state_attributes) diff --git a/home_assistant/sure_petflap.py b/home_assistant/sure_petflap.py index 5ae35ec..a9273d2 100644 --- a/home_assistant/sure_petflap.py +++ b/home_assistant/sure_petflap.py @@ -2,13 +2,16 @@ from datetime import timedelta import json + def is_hass_component(): try: import homeasssistant + return True except ImportError: return False + if is_hass_component(): from homeassistant.helpers.entity import Entity from homeassistant.util import Throttle @@ -21,10 +24,9 @@ def is_hass_component(): import voluptuous as vol - PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({ - vol.Required(CONF_USERNAME): cv.string, - vol.Required(CONF_PASSWORD): cv.string, - }) + PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend( + {vol.Required(CONF_USERNAME): cv.string, vol.Required(CONF_PASSWORD): cv.string} + ) else: # Assume not running within home assistant. This *does* mean that you # won't be able to run this test script if you have homeassistant @@ -34,26 +36,28 @@ def is_hass_component(): from sure_petcare.utils import gen_device_id # dummy dependencies - class Entity( object ): + class Entity(object): pass - def Throttle( *args, **kwargs ): - def decorator( f ): + def Throttle(*args, **kwargs): + def decorator(f): return f + return decorator -#REQUIREMENTS = ['sure_petcare'] +# REQUIREMENTS = ['sure_petcare'] _LOGGER = logging.getLogger(__name__) -CONF_device_id = 'device_id' +CONF_device_id = "device_id" SCAN_INTERVAL = timedelta(seconds=300) MIN_TIME_BETWEEN_SCANS = timedelta(seconds=600) MIN_TIME_BETWEEN_FORCED_SCANS = timedelta(seconds=120) + def setup_platform(hass, config, add_devices, discovery_info=None): """Setup the sensor platform.""" username = config.get(CONF_USERNAME) @@ -66,15 +70,20 @@ class SurePetConnect(Entity): def __init__(self, username, password, **kwargs): """Initialize the sensor.""" - _LOGGER.debug('Initializing...') + _LOGGER.debug("Initializing...") # 1.25V is a fairly conservative guess for alkalines. If you use # rechargeables, you may need to change this. - self.FULL_BATTERY_VOLTAGE = 1.6 # volts - self.LOW_BATTERY_VOLTAGE = 1.25 # volts - self.battery = [-1] *60 - self.battery[0] = 1 # Initialize average so we have a mean + self.FULL_BATTERY_VOLTAGE = 1.6 # volts + self.LOW_BATTERY_VOLTAGE = 1.25 # volts + self.battery = [-1] * 60 + self.battery[0] = 1 # Initialize average so we have a mean self.battery_pos = -1 - self.sure = SurePetFlap(email_address=username, password=password, device_id=gen_device_id(), **kwargs) + self.sure = SurePetFlap( + email_address=username, + password=password, + device_id=gen_device_id(), + **kwargs + ) self._state = None self._attributes = [] self.update() @@ -82,7 +91,7 @@ def __init__(self, username, password, **kwargs): @property def name(self): """Return the name of the sensor.""" - return 'SurePet Connect' + return "SurePet Connect" @property def state(self): @@ -92,15 +101,15 @@ def state(self): @property def unit_of_measurement(self): """Return the unit of measurement.""" - return '' - + return "" + @Throttle(MIN_TIME_BETWEEN_SCANS, MIN_TIME_BETWEEN_FORCED_SCANS) def update(self): """Fetch new state data for the sensor. This is the only method that should fetch new data for Home Assistant. """ - _LOGGER.debug('Returning current state...') + _LOGGER.debug("Returning current state...") flap_status = {} with self.sure: # Update only data required @@ -113,22 +122,26 @@ def update(self): self.sure.update_router_status() for pet in self.sure.pets: pet_status = self.sure.get_current_status(pet) - flap_status[str(self.sure.pets[pet]['name'])] = pet_status - self.battery_pos = (self.battery_pos + 1) % len(self.battery) #Loop around + flap_status[str(self.sure.pets[pet]["name"])] = pet_status + self.battery_pos = (self.battery_pos + 1) % len(self.battery) # Loop around # NB: Units have changed. Earlier versions reported the raw voltage # direct from the Sure backend which was the sum of the four # batteries. The current API reports voltage per battery, making # it easier to set thresholds based on battery chemistry. bat_left = self.sure.battery - self.LOW_BATTERY_VOLTAGE bat_full = self.FULL_BATTERY_VOLTAGE - self.LOW_BATTERY_VOLTAGE - self.battery[self.battery_pos] = int(bat_left/bat_full*100) - flap_status['avg_battery'] = int(self.mean([ i for i in self.battery if i > 0])) - flap_status['battery'] = self.battery[self.battery_pos] - flap_status['flap_online'] = self.sure.flap_status[self.sure.default_flap]['online'] - flap_status['hub_online'] = self.sure.router_status[self.sure.default_router]['online'] - flap_status['lock_status'] = self.sure.lock_mode() - flap_status['locked'] = self.sure.locked() - _LOGGER.debug('State: ' + str(flap_status)) + self.battery[self.battery_pos] = int(bat_left / bat_full * 100) + flap_status["avg_battery"] = int(self.mean([i for i in self.battery if i > 0])) + flap_status["battery"] = self.battery[self.battery_pos] + flap_status["flap_online"] = self.sure.flap_status[self.sure.default_flap][ + "online" + ] + flap_status["hub_online"] = self.sure.router_status[self.sure.default_router][ + "online" + ] + flap_status["lock_status"] = self.sure.lock_mode() + flap_status["locked"] = self.sure.locked() + _LOGGER.debug("State: " + str(flap_status)) self._state = json.dumps(flap_status) self._attributes = flap_status @@ -140,6 +153,6 @@ def state_attributes(self): """ return self._attributes - + def mean(self, numbers): return float(sum(numbers)) / max(len(numbers), 1) diff --git a/setup.py b/setup.py index 74d8fcc..0591cab 100644 --- a/setup.py +++ b/setup.py @@ -1,13 +1,15 @@ from setuptools import setup -setup(name='sure_petcare', - version='0.1', - description='Library to access sure connect catflat', - url='http://github.com/rcastberg/sure_petcare', - author='Rene Castberg', - author_email='rene@castberg.org', - license='GPL', - install_requires=['requests'], - packages=['sure_petcare'], - scripts=['sp_cli.py'], - zip_safe=False) +setup( + name="sure_petcare", + version="0.1", + description="Library to access sure connect catflat", + url="http://github.com/rcastberg/sure_petcare", + author="Rene Castberg", + author_email="rene@castberg.org", + license="GPL", + install_requires=["requests"], + packages=["sure_petcare"], + scripts=["sp_cli.py"], + zip_safe=False, +) diff --git a/sp_cli.py b/sp_cli.py index 4cf2f3e..ee8a69c 100755 --- a/sp_cli.py +++ b/sp_cli.py @@ -6,7 +6,8 @@ import sure_petcare -def main( argv ): + +def main(argv): description = """\ Sure Petcare Connect CLI @@ -44,31 +45,36 @@ def main( argv ): your pets, flap or household. Note that --update is mutually exclusive with any other option (other than --email or --pass).""" - parser = argparse.ArgumentParser( description = description, formatter_class = argparse.RawDescriptionHelpFormatter ) - parser.add_argument( '-e', '--email', - help = 'account email address' ) - parser.add_argument( '-p', '--pass', dest = 'pw', - help = 'account password' ) - parser.add_argument( '--update', action = 'store_true', - help = 'update cache from Sure servers. Mutually exclusive with commands/queries.' ) - parser.add_argument( '-c', '--cache-file', - help = 'Cache file to use if not default' ) - parser.add_argument( 'cmd', nargs = '*', - help = 'One of ' + ', '.join( sorted(CMDS.keys()) ) ) + parser = argparse.ArgumentParser( + description=description, formatter_class=argparse.RawDescriptionHelpFormatter + ) + parser.add_argument("-e", "--email", help="account email address") + parser.add_argument("-p", "--pass", dest="pw", help="account password") + parser.add_argument( + "--update", + action="store_true", + help="update cache from Sure servers. Mutually exclusive with commands/queries.", + ) + parser.add_argument("-c", "--cache-file", help="Cache file to use if not default") + parser.add_argument( + "cmd", nargs="*", help="One of " + ", ".join(sorted(CMDS.keys())) + ) args = parser.parse_args() if args.update and args.cmd: - exit( '--update and commands/queries are mutually exclusive' ) + exit("--update and commands/queries are mutually exclusive") if not args.update and not args.cmd: parser.print_help() exit() - debug = os.environ.get( 'SPDEBUG' ) is not None - sp = sure_petcare.SurePetFlap( email_address = args.email, - password = args.pw, - cache_file = args.cache_file, - debug = debug ) + debug = os.environ.get("SPDEBUG") is not None + sp = sure_petcare.SurePetFlap( + email_address=args.email, + password=args.pw, + cache_file=args.cache_file, + debug=debug, + ) if args.update: # Either update and write the cache, or... @@ -77,72 +83,75 @@ def main( argv ): else: # ... execute queries on cached data if args.cmd[0] in CMDS: - CMDS[args.cmd[0]]( sp, args ) + CMDS[args.cmd[0]](sp, args) else: - exit( 'Unknown command: %s' % (args.cmd[0],) ) + exit("Unknown command: %s" % (args.cmd[0],)) + CMDS = {} -def cmd( f ): - if f.__name__.startswith( 'cmd_' ): + + +def cmd(f): + if f.__name__.startswith("cmd_"): fn = f.__name__[4:] CMDS[fn] = f else: - raise ValueError( 'bad use of @cmd decorator: %s' % (f.__name__,) ) + raise ValueError("bad use of @cmd decorator: %s" % (f.__name__,)) return f @cmd -def cmd_ls_house( sp, args ): +def cmd_ls_house(sp, args): """ List households """ for hid, hdata in sp.households.items(): - default_flag = (hid == sp.default_household) and '(active)' or '' - print( '%s\t%s %s' % (hid, hdata['name'], default_flag,) ) + default_flag = (hid == sp.default_household) and "(active)" or "" + print("%s\t%s %s" % (hid, hdata["name"], default_flag)) @cmd -def cmd_ls_pets( sp, args ): +def cmd_ls_pets(sp, args): """ For each pet in household, show location (inside, outside, unknown) """ for pid, pdata in sp.pets.items(): - print( '%s (%s) is %s' % (pdata['name'], pid, sp.get_current_status( pid ),) ) + print("%s (%s) is %s" % (pdata["name"], pid, sp.get_current_status(pid))) @cmd -def cmd_ls_flaps( sp, args ): +def cmd_ls_flaps(sp, args): """ For each pet in household, show location (inside, outside, unknown) """ - for flap_id, name in sp.household['flaps'].items(): - bat = sp.get_battery( flap_id = flap_id ) - lck = sp.lock_mode( flap_id ) - print( '%s (%s) at %05.3fV is %s' % (name, flap_id, bat, lck,) ) + for flap_id, name in sp.household["flaps"].items(): + bat = sp.get_battery(flap_id=flap_id) + lck = sp.lock_mode(flap_id) + print("%s (%s) at %05.3fV is %s" % (name, flap_id, bat, lck)) @cmd -def cmd_pet_tl( sp, args ): +def cmd_pet_tl(sp, args): """ For each pet in household, show location (inside, outside, unknown) """ try: name = args.cmd[1] except IndexError: - exit('need pet name (enclose in quotes if necessary)') + exit("need pet name (enclose in quotes if necessary)") - sp.print_timeline( name = name ) + sp.print_timeline(name=name) @cmd -def cmd_set_hid( sp, args ): +def cmd_set_hid(sp, args): """ Set default household ID """ try: hid = int(args.cmd[1]) - except (ValueError, IndexError,): - exit( 'need valid household ID' ) + except (ValueError, IndexError): + exit("need valid household ID") if hid in sp.households: with sp: @@ -150,7 +159,8 @@ def cmd_set_hid( sp, args ): if sp.update_required: sp.update() else: - exit( 'Household ID %s not known' % (hid,) ) + exit("Household ID %s not known" % (hid,)) + -if __name__ == '__main__': - main( sys.argv ) +if __name__ == "__main__": + main(sys.argv) diff --git a/sure_petcare/__init__.py b/sure_petcare/__init__.py index 63ad736..cfd91cd 100644 --- a/sure_petcare/__init__.py +++ b/sure_petcare/__init__.py @@ -11,56 +11,57 @@ import sure_petcare.utils as utils from .utils import mk_enum -CACHE_FILE = os.path.expanduser( '~/.surepet.cache' ) +CACHE_FILE = os.path.expanduser("~/.surepet.cache") # version of cache structure CACHE_VERSION = 2 -DIRECTION ={0:'Looked through',1:'Entered House',2:'Left House'} -INOUT_STATUS = {1 : 'Inside', 2 : 'Outside'} +DIRECTION = {0: "Looked through", 1: "Entered House", 2: "Left House"} +INOUT_STATUS = {1: "Inside", 2: "Outside"} # The following event types are known, eg EVT.CURFEW. -EVT = mk_enum( 'EVT', - {'MOVE': 0, - 'MOVE_UID': 7, # movement of unknown animal - 'BAT_WARN': 1, - 'LOCK_ST': 6, - 'USR_IFO': 12, - 'USR_NEW': 17, - 'CURFEW': 20, - } ) - -LK_MOD = mk_enum( 'LK_MOD', - {'UNLOCKED': 0, - 'LOCKED_IN': 1, - 'LOCKED_OUT': 2, - 'LOCKED_ALL': 3, - 'CURFEW': 4, - 'CURFEW_LOCKED': -1, - 'CURFEW_UNLOCKED': -2, - 'CURFEW_UNKNOWN': -3, - } ) - -PROD_ID = mk_enum( 'PROD_ID', - {'ROUTER': 1, - 'PET_FLAP': 3, # Pet Door Connect - 'CAT_FLAP': 6, # Cat Door Connect - } ) - -LOC = mk_enum( 'LOC', - {'INSIDE': 1, - 'OUTSIDE': 2, - 'UNKNOWN': -1, - } ) +EVT = mk_enum( + "EVT", + { + "MOVE": 0, + "MOVE_UID": 7, # movement of unknown animal + "BAT_WARN": 1, + "LOCK_ST": 6, + "USR_IFO": 12, + "USR_NEW": 17, + "CURFEW": 20, + }, +) + +LK_MOD = mk_enum( + "LK_MOD", + { + "UNLOCKED": 0, + "LOCKED_IN": 1, + "LOCKED_OUT": 2, + "LOCKED_ALL": 3, + "CURFEW": 4, + "CURFEW_LOCKED": -1, + "CURFEW_UNLOCKED": -2, + "CURFEW_UNKNOWN": -3, + }, +) + +PROD_ID = mk_enum( + "PROD_ID", + {"ROUTER": 1, "PET_FLAP": 3, "CAT_FLAP": 6}, # Pet Door Connect # Cat Door Connect +) + +LOC = mk_enum("LOC", {"INSIDE": 1, "OUTSIDE": 2, "UNKNOWN": -1}) # REST API endpoints (no trailing slash) -_URL_AUTH = 'https://app.api.surehub.io/api/auth/login' -_URL_HOUSEHOLD = 'https://app.api.surehub.io/api/household' -_URL_DEV = 'https://app.api.surehub.io/api/device' -_URL_TIMELINE = 'https://app.api.surehub.io/api/timeline' -_URL_PET = 'https://app.api.surehub.io/api/pet' +_URL_AUTH = "https://app.api.surehub.io/api/auth/login" +_URL_HOUSEHOLD = "https://app.api.surehub.io/api/household" +_URL_DEV = "https://app.api.surehub.io/api/device" +_URL_TIMELINE = "https://app.api.surehub.io/api/timeline" +_URL_PET = "https://app.api.surehub.io/api/pet" -API_USER_AGENT = 'Mozilla/5.0 (Linux; Android 7.0; SM-G930F Build/NRD90M; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/64.0.3282.137 Mobile Safari/537.36' +API_USER_AGENT = "Mozilla/5.0 (Linux; Android 7.0; SM-G930F Build/NRD90M; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/64.0.3282.137 Mobile Safari/537.36" _HARD_RATE_LIMIT = 60 @@ -73,7 +74,15 @@ class SurePetFlapAPI(object): below. """ - def __init__( self, email_address = None, password = None, household_id = None, device_id = None, cache_file = CACHE_FILE, debug = False ): + def __init__( + self, + email_address=None, + password=None, + household_id=None, + device_id=None, + cache_file=CACHE_FILE, + debug=False, + ): """ `email_address` and `password` are self explanatory. They are cached on disc file `cache_file` and are therefore only mandatory on first @@ -120,19 +129,21 @@ def __init__( self, email_address = None, password = None, household_id = None, """ # cache_status is None to indicate that it hasn't been initialised self.cache_file = cache_file or CACHE_FILE - self.cache_lockfile = self.cache_file + '.lock' + self.cache_lockfile = self.cache_file + ".lock" self._init_default_household = household_id self._init_email = email_address self._init_pw = password self._load_cache() self.__read_only = True - if (email_address is None or password is None) and self.cache['AuthToken'] is None: - raise ValueError('No cached credentials and none provided') + if (email_address is None or password is None) and self.cache[ + "AuthToken" + ] is None: + raise ValueError("No cached credentials and none provided") self.debug = debug self.s = requests.session() if debug: self.req_count = self.req_rx_bytes = 0 - self.s.hooks['response'].append( self._log_req ) + self.s.hooks["response"].append(self._log_req) if device_id is None: self.device_id = utils.gen_device_id() else: @@ -143,18 +154,21 @@ def __init__( self, email_address = None, password = None, household_id = None, # @property - def update_required( self ): + def update_required(self): """Indicates whether an `update()` call is **required** for correct function, not whether the cache is up-to-date.""" - return (self.cache['AuthToken'] is None or - self.households is None or - self.household.get('pets') is None) + return ( + self.cache["AuthToken"] is None + or self.households is None + or self.household.get("pets") is None + ) @property - def battery( self ): + def battery(self): "Battery level of default flap at default household" return self.get_battery() - def get_battery( self, household_id = None, flap_id = None ): + + def get_battery(self, household_id=None, flap_id=None): """ Return battery voltage (assuming four batteries). The level at which you should replace them depends on the chemistry (type) of the battery. @@ -162,35 +176,36 @@ def get_battery( self, household_id = None, flap_id = None ): the official app to get better advice. """ household_id = household_id or self.default_household - flap_id = flap_id or self.get_default_flap( household_id ) + flap_id = flap_id or self.get_default_flap(household_id) try: - return self.all_flap_status[household_id][flap_id]['battery'] / 4.0 + return self.all_flap_status[household_id][flap_id]["battery"] / 4.0 except KeyError: raise SPAPIUnitialised() @property - def pets( self ): + def pets(self): return self.get_pets() - def get_pets( self, household_id = None ): + + def get_pets(self, household_id=None): "Return dict of pets. Default household used if not specified." household_id = household_id or self.default_household try: - return self.households[household_id]['pets'] + return self.households[household_id]["pets"] except KeyError: raise SPAPIUnitialised() - def get_pet_id_by_name( self, name, household_id = None ): + def get_pet_id_by_name(self, name, household_id=None): """ Returns the numeric ID (not the tag ID) of the pet by name. Match is case insensitive and the first pet found with that name is returned. Default household used if not specified. """ household_id = household_id or self.default_household - for pet_id, petdata in self.get_pets( household_id ).items(): - if petdata['name'].lower() == name.lower(): + for pet_id, petdata in self.get_pets(household_id).items(): + if petdata["name"].lower() == name.lower(): return pet_id - def get_pet_location( self, pet_id, household_id = None ): + def get_pet_location(self, pet_id, household_id=None): """ Returns one of enum LOC indicating last known movement of the pet. Default household used if not specified. @@ -198,9 +213,9 @@ def get_pet_location( self, pet_id, household_id = None ): household_id = household_id or self.default_household if pet_id not in self.all_pet_status[household_id]: raise SPAPIUnknownPet() - return self.all_pet_status[household_id][pet_id]['where'] + return self.all_pet_status[household_id][pet_id]["where"] - def get_lock_mode( self, flap_id = None, household_id = None ): + def get_lock_mode(self, flap_id=None, household_id=None): """ Returns one of enum LK_MOD indicating flap lock mode. Default household and flap used if not specified. @@ -208,61 +223,67 @@ def get_lock_mode( self, flap_id = None, household_id = None ): household_id = household_id or self.default_household household = self.households[household_id] if flap_id is None: - flap_id = household['default_flap'] - lock_data = self.all_flap_status[household_id][flap_id]['locking'] - if lock_data['mode'] == LK_MOD.CURFEW: - if lock_data['curfew']['locked']: + flap_id = household["default_flap"] + lock_data = self.all_flap_status[household_id][flap_id]["locking"] + if lock_data["mode"] == LK_MOD.CURFEW: + if lock_data["curfew"]["locked"]: return LK_MOD.CURFEW_LOCKED else: return LK_MOD.CURFEW_UNLOCKED - return lock_data['mode'] + return lock_data["mode"] # # Default household and device helpers # @property - def default_household( self ): + def default_household(self): "Get the default house ID." - return self.cache['default_household'] + return self.cache["default_household"] + @default_household.setter - def default_household( self, id ): + def default_household(self, id): "Set the default household ID." if self.__read_only: raise SPAPIReadOnly() - self.cache['default_household'] = id + self.cache["default_household"] = id + @property - def household( self ): + def household(self): "Return default household dict" return self.households[self.default_household] @property - def default_router( self ): + def default_router(self): "Returns the default router ID for the default household" return self.get_default_router() - def get_default_router( self, household_id = None ): + + def get_default_router(self, household_id=None): "Get the default router ID." household_id = household_id or self.default_household - return self.households[household_id]['default_router'] - def set_default_router( self, household_id, rid ): + return self.households[household_id]["default_router"] + + def set_default_router(self, household_id, rid): "Set the default router ID." if self.__read_only: raise SPAPIReadOnly() - self.households[household_id]['default_router'] = rid + self.households[household_id]["default_router"] = rid @property - def default_flap( self ): + def default_flap(self): "Returns the default flap ID for the default household" return self.get_default_flap() - def get_default_flap( self, household_id = None ): + + def get_default_flap(self, household_id=None): "Get the default flap ID." household_id = household_id or self.default_household - return self.households[household_id]['default_flap'] - def set_default_flap( self, household_id, flap_id ): + return self.households[household_id]["default_flap"] + + def set_default_flap(self, household_id, flap_id): "Set the default flap ID." if self.__read_only: raise SPAPIReadOnly() - self.households[household_id]['default_flap'] = flap_id + self.households[household_id]["default_flap"] = flap_id # # These properties return respective data for all households as a dict @@ -270,67 +291,77 @@ def set_default_flap( self, household_id, flap_id ): # @property - def households( self ): + def households(self): """ Return dict of households which include name, timezone information suitable for use with pytz and also pet info. NB: Indexed by household ID. """ - return self.cache['households'] + return self.cache["households"] + @households.setter - def households( self, data ): + def households(self, data): "Set household data." if self.__read_only: raise SPAPIReadOnly() - self.cache['households'] = data + self.cache["households"] = data @property - def router_status( self ): + def router_status(self): "Dict of all routers in default household" return self.all_router_status[self.default_household] + @property - def all_router_status( self, household_id = None ): + def all_router_status(self, household_id=None): "Dict of all routers indexed by household and router IDs" - return self.cache['router_status'] + return self.cache["router_status"] + @property - def flap_status( self ): + def flap_status(self): "Dict of all flaps for default household" return self.all_flap_status[self.default_household] + @property - def all_flap_status( self ): + def all_flap_status(self): "Dict of all flaps indexed by household and router IDs" - return self.cache['flap_status'] + return self.cache["flap_status"] + @property - def pet_status( self ): + def pet_status(self): "Dict of all pets for default household" return self.all_pet_status[self.default_household] + @property - def all_pet_status( self ): + def all_pet_status(self): "Dict of all pets indexed by household and pet IDs" - return self.cache['pet_status'] + return self.cache["pet_status"] + @property - def pet_timeline( self ): + def pet_timeline(self): "Pet events for default household (subset of house timeline)" return self.all_pet_timeline[self.default_household] + @property - def all_pet_timeline( self ): + def all_pet_timeline(self): "Dict of pet events indexed by household ID (subset of house timeline)" - return self.cache['pet_timeline'] + return self.cache["pet_timeline"] + @property - def house_timeline( self ): + def house_timeline(self): "Events for default household" return self.all_house_timeline[self.default_household] + @property - def all_house_timeline( self ): + def all_house_timeline(self): "Dict of household events indexed by household ID" - return self.cache['house_timeline'] + return self.cache["house_timeline"] # # Update methods. USE SPARINGLY! # - def update( self ): + def update(self): """ Update everything. Must be invoked once, but please, only once. Call the individual update methods according to your applications needs. @@ -341,34 +372,35 @@ def update( self ): self.update_pet_info() self.update_pet_status() self.update_flap_status() - #self.update_router_status() + # self.update_router_status() # XXX For now, router status contains little of interest and isn't worth # the API call. Call it explicitly if you need this (which you # should be doing anyway to save on bandwidth). self.update_timelines() - def update_authtoken( self, force = False ): + def update_authtoken(self, force=False): """ Update cache with authentication token if missing. Use `force = True` when the token expires (the API generally does this automatically). """ if self.__read_only: raise SPAPIReadOnly() - if self.cache['AuthToken'] is not None and not force: + if self.cache["AuthToken"] is not None and not force: return # Allow constructor to override email/pw should they change - data = {"email_address": self._init_email or self.cache['email'], - "password": self._init_pw or self.cache['pw'], - "device_id": self.device_id, - } - headers=self._create_header() + data = { + "email_address": self._init_email or self.cache["email"], + "password": self._init_pw or self.cache["pw"], + "device_id": self.device_id, + } + headers = self._create_header() response = self.s.post(_URL_AUTH, headers=headers, json=data) if response.status_code == 401: raise SPAPIAuthError() response_data = response.json() - self.cache['AuthToken'] = response_data['data']['token'] + self.cache["AuthToken"] = response_data["data"]["token"] - def update_households( self, force = False ): + def update_households(self, force=False): """ Update cache with info about the household(s) associated with the account. """ @@ -376,26 +408,30 @@ def update_households( self, force = False ): raise SPAPIReadOnly() if self.households is not None and not force: return - params = ( # XXX Could we merge update_households() with update_pet_info()? - ('with[]', ['household', 'timezone',],), #'pet', + params = ( # XXX Could we merge update_households() with update_pet_info()? + ("with[]", ["household", "timezone"]), #'pet', + ) + headers = self._create_header() + response_household = self._api_get( + _URL_HOUSEHOLD, headers=headers, params=params ) - headers=self._create_header() - response_household = self._api_get(_URL_HOUSEHOLD, headers=headers, params=params) response_household = response_household.json() self.households = { - x['id']: {'name': x['name'], - 'olson_tz': x['timezone']['timezone'], - 'utc_offset': x['timezone']['utc_offset'], - 'default_router': None, - 'default_flap': None, - } for x in response_household['data'] + x["id"]: { + "name": x["name"], + "olson_tz": x["timezone"]["timezone"], + "utc_offset": x["timezone"]["utc_offset"], + "default_router": None, + "default_flap": None, } + for x in response_household["data"] + } # default_household may have been set by the constructor, but override # it anyway if the specified ID wasn't in the data just fetched. if self.default_household not in self.households: - self.default_household = response_household['data'][0]['id'] + self.default_household = response_household["data"][0]["id"] - def update_device_ids( self, household_id = None, force = False ): + def update_device_ids(self, household_id=None, force=False): """ Update cache with list of router and flap IDs for each household. The default router and flap are set to the first ones found. @@ -404,25 +440,26 @@ def update_device_ids( self, household_id = None, force = False ): raise SPAPIReadOnly() household_id = household_id or self.default_household household = self.households[household_id] - if (household['default_router'] is not None and - household['default_flap'] is not None and not force): + if ( + household["default_router"] is not None + and household["default_flap"] is not None + and not force + ): return - params = ( - ('with[]', 'children'), - ) - routers = household['routers'] = {} - flaps = household['flaps'] = {} - url = '%s/%s/device' % (_URL_HOUSEHOLD, household_id,) + params = (("with[]", "children"),) + routers = household["routers"] = {} + flaps = household["flaps"] = {} + url = "%s/%s/device" % (_URL_HOUSEHOLD, household_id) response_children = self._get_data(url, params) - for device in response_children['data']: - if device['product_id'] in (PROD_ID.PET_FLAP, PROD_ID.CAT_FLAP): # Catflap - flaps[device['id']] = device['name'] - elif device['product_id'] == PROD_ID.ROUTER: # Router - routers[device['id']] = device['name'] - household['default_flap'] = list(flaps.keys())[0] - household['default_router'] = list(routers.keys())[0] + for device in response_children["data"]: + if device["product_id"] in (PROD_ID.PET_FLAP, PROD_ID.CAT_FLAP): # Catflap + flaps[device["id"]] = device["name"] + elif device["product_id"] == PROD_ID.ROUTER: # Router + routers[device["id"]] = device["name"] + household["default_flap"] = list(flaps.keys())[0] + household["default_router"] = list(routers.keys())[0] - def update_pet_info( self, household_id = None, force = False ): + def update_pet_info(self, household_id=None, force=False): """ Update cache with pet information. """ @@ -430,21 +467,21 @@ def update_pet_info( self, household_id = None, force = False ): raise SPAPIReadOnly() household_id = household_id or self.default_household household = self.households[household_id] - if household.get('pets') is not None and not force: + if household.get("pets") is not None and not force: return - params = ( - ('with[]', ['photo', 'tag']), - ) - url = '%s/%s/pet' % (_URL_HOUSEHOLD, household_id,) + params = (("with[]", ["photo", "tag"]),) + url = "%s/%s/pet" % (_URL_HOUSEHOLD, household_id) response_pets = self._get_data(url, params) - household['pets'] = { - x['id']: {'name': x['name'], - 'tag_id': x['tag_id'], - 'photo': x.get('photo', {}).get('location') - } for x in response_pets['data'] + household["pets"] = { + x["id"]: { + "name": x["name"], + "tag_id": x["tag_id"], + "photo": x.get("photo", {}).get("location"), } + for x in response_pets["data"] + } - def update_flap_status( self, household_id = None ): + def update_flap_status(self, household_id=None): """ Update flap status. Default household used if not specified. """ @@ -452,12 +489,14 @@ def update_flap_status( self, household_id = None ): raise SPAPIReadOnly() household_id = household_id or self.default_household household = self.households[household_id] - for flap_id in household['flaps']: - url = '%s/%s/status' % (_URL_DEV, flap_id,) + for flap_id in household["flaps"]: + url = "%s/%s/status" % (_URL_DEV, flap_id) response = self._get_data(url) - self.cache['flap_status'].setdefault( household_id, {} )[flap_id] = response['data'] + self.cache["flap_status"].setdefault(household_id, {})[flap_id] = response[ + "data" + ] - def update_router_status( self, household_id = None ): + def update_router_status(self, household_id=None): """ Update router status. Don't call unless you really need to because there's not much of interest here. Default household used if not @@ -467,12 +506,14 @@ def update_router_status( self, household_id = None ): raise SPAPIReadOnly() household_id = household_id or self.default_household household = self.households[household_id] - for rid in household['routers']: - url = '%s/%s/status' % (_URL_DEV, rid,) + for rid in household["routers"]: + url = "%s/%s/status" % (_URL_DEV, rid) response = self._get_data(url) - self.cache['router_status'].setdefault( household_id, {} )[rid] = response['data'] + self.cache["router_status"].setdefault(household_id, {})[rid] = response[ + "data" + ] - def update_timelines( self, household_id = None ): + def update_timelines(self, household_id=None): """ Update household event timeline and curfew lock status. Default household used if not specified. @@ -498,116 +539,127 @@ def update_timelines( self, household_id = None ): if self.__read_only: raise SPAPIReadOnly() household_id = household_id or self.default_household - params = ( - ('type', '0,3,6,7,12,13,14,17,19,20'), - ) - url = '%s/household/%s' % (_URL_TIMELINE, household_id,) + params = (("type", "0,3,6,7,12,13,14,17,19,20"),) + url = "%s/household/%s" % (_URL_TIMELINE, household_id) response = self._get_data(url, params) - self.cache['house_timeline'][household_id] = response['data'] + self.cache["house_timeline"][household_id] = response["data"] # Build per-pet timeline - tag_lut = {v['tag_id']: k for k, v in self.get_pets( household_id ).items()} - self.cache['pet_timeline'][household_id] = { - pet_id: [x for x in self.cache['house_timeline'][household_id] - if x['type'] == EVT.MOVE and tag_lut[x['movements'][0]['tag_id']] == pet_id] - for pet_id in self.get_pets( household_id ).keys() - } - + tag_lut = {v["tag_id"]: k for k, v in self.get_pets(household_id).items()} + self.cache["pet_timeline"][household_id] = { + pet_id: [ + x + for x in self.cache["house_timeline"][household_id] + if x["type"] == EVT.MOVE + and tag_lut[x["movements"][0]["tag_id"]] == pet_id + ] + for pet_id in self.get_pets(household_id).keys() + } - def update_pet_status( self, household_id = None ): + def update_pet_status(self, household_id=None): """ Update pet status. Default household used if not specified. """ if self.__read_only: raise SPAPIReadOnly() household_id = household_id or self.default_household - self.cache['pet_status'][household_id] = {} - for pet_id in self.get_pets( household_id ): - url = '%s/%s/position' % (_URL_PET, pet_id,) + self.cache["pet_status"][household_id] = {} + for pet_id in self.get_pets(household_id): + url = "%s/%s/position" % (_URL_PET, pet_id) headers = self._create_header() response = self._get_data(url) - self.cache['pet_status'][household_id][pet_id] = response['data'] + self.cache["pet_status"][household_id][pet_id] = response["data"] # # Low level remote API wrappers. Do not use. # - def _get_data( self, url, params = None ): + def _get_data(self, url, params=None): if self.__read_only: raise SPAPIReadOnly() headers = None if url in self.cache: - time_since_last = datetime.now(timezone.utc) - self.cache[url]['ts'] + time_since_last = datetime.now(timezone.utc) - self.cache[url]["ts"] # Return cached data unless older than hard rate limit if time_since_last.total_seconds() > _HARD_RATE_LIMIT: - headers = self._create_header(ETag=self.cache[url]['ETag']) + headers = self._create_header(ETag=self.cache[url]["ETag"]) else: headers = self._create_header() if headers is not None: response = self._api_get(url, headers=headers, params=params) - if response.status_code in [304, 404, 500, 502, 503, 504,]: + if response.status_code in [304, 404, 500, 502, 503, 504]: # Used cached data in event of (respectively), not modified, # server error, server overload, server unavailable and gateway # timeout. Doesn't cope with such events absent cached data, # but hopefully that is sufficiently rare not to bother with. if response.status_code == 404: - raise IndexError( url ) + raise IndexError(url) if response.status_code == 304: # Can only get here if there is a cached response - self.cache[url]['ts'] = datetime.now(timezone.utc) - return self.cache[url]['LastData'] + self.cache[url]["ts"] = datetime.now(timezone.utc) + return self.cache[url]["LastData"] self.cache[url] = { - 'LastData': response.json(), - 'ETag': response.headers['ETag'].strip( '"' ), - 'ts': datetime.now(timezone.utc), - } - return self.cache[url]['LastData'] - - def _create_header( self, ETag = None ): - headers={ - 'Connection': 'keep-alive', - 'Accept': 'application/json, text/plain, */*', - 'Origin': 'https://surepetcare.io', - 'User-Agent': API_USER_AGENT, - 'Referer': 'https://surepetcare.io/', - 'Accept-Encoding': 'gzip, deflate', - 'Accept-Language': 'en-US,en-GB;q=0.9', - 'X-Requested-With': 'com.sureflap.surepetcare', + "LastData": response.json(), + "ETag": response.headers["ETag"].strip('"'), + "ts": datetime.now(timezone.utc), + } + return self.cache[url]["LastData"] + + def _create_header(self, ETag=None): + headers = { + "Connection": "keep-alive", + "Accept": "application/json, text/plain, */*", + "Origin": "https://surepetcare.io", + "User-Agent": API_USER_AGENT, + "Referer": "https://surepetcare.io/", + "Accept-Encoding": "gzip, deflate", + "Accept-Language": "en-US,en-GB;q=0.9", + "X-Requested-With": "com.sureflap.surepetcare", } - if self.cache['AuthToken'] is not None: - headers['Authorization']='Bearer ' + self.cache['AuthToken'] + if self.cache["AuthToken"] is not None: + headers["Authorization"] = "Bearer " + self.cache["AuthToken"] if ETag is not None: - headers['If-None-Match'] = ETag + headers["If-None-Match"] = ETag return headers - def _api_get( self, url, *args, **kwargs ): - r = self.s.get( url, *args, **kwargs ) + def _api_get(self, url, *args, **kwargs): + r = self.s.get(url, *args, **kwargs) if r.status_code == 401: # Retry once - self.update_authtoken( force = True ) - if 'headers' in kwargs and 'Authorization' in kwargs['headers']: - kwargs['headers']['Authorization']='Bearer ' + self.cache['AuthToken'] - r = self.s.get( url, *args, **kwargs ) + self.update_authtoken(force=True) + if "headers" in kwargs and "Authorization" in kwargs["headers"]: + kwargs["headers"]["Authorization"] = "Bearer " + self.cache["AuthToken"] + r = self.s.get(url, *args, **kwargs) else: - raise SPAPIException( 'Auth required but not present in header' ) + raise SPAPIException("Auth required but not present in header") return r - def _log_req( self, r, *args, **kwargs ): + def _log_req(self, r, *args, **kwargs): """ Debugging aid: print network requests """ - l = len( '\n'.join( ': '.join(x) for x in r.headers.items() ) ) + l = len("\n".join(": ".join(x) for x in r.headers.items())) l += len(r.content) self.req_count += 1 self.req_rx_bytes += l - print( 'requests: %s %s -> %s (%0.3f kiB, total %0.3f kiB in %s requests)' % (r.request.method, r.request.url, r.status_code, l/1024.0, self.req_rx_bytes/1024.0, self.req_count,) ) + print( + "requests: %s %s -> %s (%0.3f kiB, total %0.3f kiB in %s requests)" + % ( + r.request.method, + r.request.url, + r.status_code, + l / 1024.0, + self.req_rx_bytes / 1024.0, + self.req_count, + ) + ) # # Cache management # - def _load_cache( self ): + def _load_cache(self): """ Read cache file. The cache is written by the context `__exit__()` method. @@ -615,44 +667,44 @@ def _load_cache( self ): # Cache locking is done by the context manager methods. default_cache = { - 'AuthToken': None, - 'households': None, - 'default_household': self._init_default_household, - 'router_status': {}, # indexed by household - 'flap_status': {}, # indexed by household - 'pet_status': {}, # indexed by household - 'pet_timeline': {}, # indexed by household - 'house_timeline': {}, # indexed by household - 'version': CACHE_VERSION # of cache structure. + "AuthToken": None, + "households": None, + "default_household": self._init_default_household, + "router_status": {}, # indexed by household + "flap_status": {}, # indexed by household + "pet_status": {}, # indexed by household + "pet_timeline": {}, # indexed by household + "house_timeline": {}, # indexed by household + "version": CACHE_VERSION, # of cache structure. } try: - with open( self.cache_file, 'rb' ) as f: - self.cache = pickle.load( f ) - except (pickle.PickleError, OSError,): + with open(self.cache_file, "rb") as f: + self.cache = pickle.load(f) + except (pickle.PickleError, OSError): self.cache = default_cache - if self.cache['version'] != CACHE_VERSION: + if self.cache["version"] != CACHE_VERSION: # reset cache, but try to preserve auth credentials - auth_token = self.cache.get('AuthToken') + auth_token = self.cache.get("AuthToken") self.cache = default_cache - self.cache['AuthToken'] = auth_token + self.cache["AuthToken"] = auth_token - def __enter__( self ): + def __enter__(self): """ Entering context unlocks the cache for modification and update. """ - if os.path.exists( self.cache_lockfile ): + if os.path.exists(self.cache_lockfile): raise SPAPICacheLocked() else: # Yeah, there are better ways of doing this, but I don't want to # add to the API's dependencies for the sake of compatibility. - with open( self.cache_lockfile, 'w' ) as lf: + with open(self.cache_lockfile, "w") as lf: # Conveniently, this also tests that the cache file location # is writeable. - lf.write( str(os.getpid()) ) + lf.write(str(os.getpid())) # Check to make sure that we didn't get gazumped - with open( self.cache_lockfile, 'r' ) as lf: + with open(self.cache_lockfile, "r") as lf: if int(lf.read()) != os.getpid(): raise SPAPICacheLocked() # We've got a solid lock. Hopefully. @@ -660,53 +712,63 @@ def __enter__( self ): self.__read_only = False return self - def __exit__( self, exc_type, exc_value, traceback ): + def __exit__(self, exc_type, exc_value, traceback): """ Exiting context locks the cache to prevent further modification and also flushes the cache to disc. """ self.__read_only = True - with open( self.cache_file, 'wb' ) as f: - pickle.dump( self.cache, f ) - os.remove( self.cache_lockfile ) + with open(self.cache_file, "wb") as f: + pickle.dump(self.cache, f) + os.remove(self.cache_lockfile) -class SurePetFlapMixin( object ): +class SurePetFlapMixin(object): """ A mixin that implements introspection of data collected by SurePetFlapAPI. """ - def print_timeline( self, pet_id = None, name = None, entry_type = None, household_id = None ): + def print_timeline( + self, pet_id=None, name=None, entry_type=None, household_id=None + ): """ Print timeline for a particular pet, specify entry_type to only get one direction. Default household is used if not specified. """ household_id = household_id or self.default_household if pet_id is None and name is None: - raise ValueError('Please define pet_id or name') + raise ValueError("Please define pet_id or name") if pet_id is None: pet_id = self.get_pet_id_by_name(name) - pet_id=int(pet_id) + pet_id = int(pet_id) try: - tag_id = self.household['pets'][pet_id]['tag_id'] - pet_name = self.household['pets'][pet_id]['name'] + tag_id = self.household["pets"][pet_id]["tag_id"] + pet_name = self.household["pets"][pet_id]["name"] except KeyError as e: - raise SPAPIUnknownPet( str(e) ) + raise SPAPIUnknownPet(str(e)) petdata = self.all_pet_timeline[household_id][pet_id] for movement in petdata: try: if entry_type is not None: - if movement['movements'][0]['tag_id'] == tag_id: - if movement['movements'][0]['direction'] == entry_type: - print(movement['movements'][0]['created_at'], pet_name, DIRECTION[movement['movements'][0]['direction']]) + if movement["movements"][0]["tag_id"] == tag_id: + if movement["movements"][0]["direction"] == entry_type: + print( + movement["movements"][0]["created_at"], + pet_name, + DIRECTION[movement["movements"][0]["direction"]], + ) else: - if movement['movements'][0]['tag_id'] == tag_id: - print(movement['movements'][0]['created_at'], pet_name, DIRECTION[movement['movements'][0]['direction']]) + if movement["movements"][0]["tag_id"] == tag_id: + print( + movement["movements"][0]["created_at"], + pet_name, + DIRECTION[movement["movements"][0]["direction"]], + ) except Exception as e: print(e) - def locked( self, flap_id = None, household_id = None ): + def locked(self, flap_id=None, household_id=None): """ Return whether door is locked or not. Default household and flap used if not specified. @@ -714,37 +776,41 @@ def locked( self, flap_id = None, household_id = None ): household_id = household_id or self.default_household household = self.households[household_id] if flap_id is None: - flap_id = household['default_flap'] - lock_data = self.all_flap_status[household_id][flap_id]['locking'] - if lock_data['mode'] == LK_MOD.UNLOCKED: + flap_id = household["default_flap"] + lock_data = self.all_flap_status[household_id][flap_id]["locking"] + if lock_data["mode"] == LK_MOD.UNLOCKED: return False - if lock_data['mode'] in [LK_MOD.LOCKED_IN, LK_MOD.LOCKED_OUT, LK_MOD.LOCKED_ALL,]: + if lock_data["mode"] in [ + LK_MOD.LOCKED_IN, + LK_MOD.LOCKED_OUT, + LK_MOD.LOCKED_ALL, + ]: return True - if lock_data['mode'] == LK_MOD.CURFEW: - return lock_data['curfew']['locked'] + if lock_data["mode"] == LK_MOD.CURFEW: + return lock_data["curfew"]["locked"] - def lock_mode( self, flap_id = None, household_id = None ): + def lock_mode(self, flap_id=None, household_id=None): """ Returns a string describing the flap lock mode. Default household and flap used if not specified. """ - lock = self.get_lock_mode( flap_id, household_id ) + lock = self.get_lock_mode(flap_id, household_id) if lock == LK_MOD.UNLOCKED: - return 'Unlocked' + return "Unlocked" elif lock == LK_MOD.LOCKED_IN: - return 'Keep pets in' + return "Keep pets in" elif lock == LK_MOD.LOCKED_OUT: - return 'Keep pets out' + return "Keep pets out" elif lock == LK_MOD.LOCKED_ALL: - return 'Locked' + return "Locked" elif lock == LK_MOD.CURFEW_UNKNOWN: - return 'Curfew enabled but state unknown' + return "Curfew enabled but state unknown" elif lock == LK_MOD.CURFEW_LOCKED: - return 'Locked with curfew' + return "Locked with curfew" elif lock == LK_MOD.CURFEW_UNLOCKED: - return 'Unlocked with curfew' + return "Unlocked with curfew" - def get_current_status( self, pet_id = None, name = None, household_id = None ): + def get_current_status(self, pet_id=None, name=None, household_id=None): """ Returns a string describing the last known movement of the pet. @@ -754,19 +820,19 @@ def get_current_status( self, pet_id = None, name = None, household_id = None ): presumably applies to the official website and app. """ if pet_id is None and name is None: - raise ValueError('Please define pet_id or name') + raise ValueError("Please define pet_id or name") if pet_id is None: pet_id = self.get_pet_id_by_name(name) pet_id = int(pet_id) - loc = self.get_pet_location( pet_id, household_id ) + loc = self.get_pet_location(pet_id, household_id) if loc == LOC.UNKNOWN: - return 'Unknown' + return "Unknown" else: - #Get last update + # Get last update return INOUT_STATUS[loc] -class SurePetFlap( SurePetFlapMixin, SurePetFlapAPI ): +class SurePetFlap(SurePetFlapMixin, SurePetFlapAPI): """Class to take care of network communication with SurePet's products. See docstring for parent classes on how to use. In particular, **please** @@ -777,28 +843,29 @@ class SurePetFlap( SurePetFlapMixin, SurePetFlapAPI ): serialisable as JSON. How you store and retrieve it is up to you. """ + pass -class SPAPIException( Exception ): +class SPAPIException(Exception): pass -class SPAPIReadOnly( SPAPIException ): +class SPAPIReadOnly(SPAPIException): pass -class SPAPICacheLocked( SPAPIException ): +class SPAPICacheLocked(SPAPIException): pass -class SPAPIAuthError( SPAPIException ): +class SPAPIAuthError(SPAPIException): pass -class SPAPIUnitialised( SPAPIException ): +class SPAPIUnitialised(SPAPIException): pass -class SPAPIUnknownPet( SPAPIException ): +class SPAPIUnknownPet(SPAPIException): pass diff --git a/sure_petcare/utils.py b/sure_petcare/utils.py index 6a41db2..7b6254f 100644 --- a/sure_petcare/utils.py +++ b/sure_petcare/utils.py @@ -1,6 +1,7 @@ import os -def mk_enum( name, kv ): + +def mk_enum(name, kv): """ Emulate enum types found in other languages. @@ -10,18 +11,25 @@ def mk_enum( name, kv ): strings. In the latter case, numeric values will automatically be assigned. """ if type(kv) is list: - kv = dict( zip( kv, range(len(kv)) ) ) + kv = dict(zip(kv, range(len(kv)))) class Cls(object): __class__ = name - def __init__( self, d ): + + def __init__(self, d): self._data = d - def __getattr__( self, name ): + + def __getattr__(self, name): return self._data[name] - def find( self, target ): - return ['%s.%s' % (self.__class__,k,) for k, v in self._data.items() if v == target] - return Cls( kv ) + def find(self, target): + return [ + "%s.%s" % (self.__class__, k) + for k, v in self._data.items() + if v == target + ] + + return Cls(kv) def getmac(): @@ -29,12 +37,12 @@ def getmac(): Hackish way to obtain an unspecified MAC address. """ mac = None - folders = os.listdir('/sys/class/net/') + folders = os.listdir("/sys/class/net/") for interface in folders: - if interface == 'lo': + if interface == "lo": continue try: - mac = open('/sys/class/net/'+interface+'/address').readline() + mac = open("/sys/class/net/" + interface + "/address").readline() # XXX What happens when multiple interfaces are found? Might # be better to break here to stop at the first MAC which, # on most/many systems, will be the first wired Ethernet @@ -43,13 +51,13 @@ def getmac(): except Exception as e: return None if mac is not None: - return mac.strip() #trim new line + return mac.strip() # trim new line def gen_device_id(): """ Generates a "unique" client device ID based on MAC address. """ - mac_dec = int( getmac().replace( ':', '').replace( '-', '' ), 16 ) + mac_dec = int(getmac().replace(":", "").replace("-", ""), 16) # Use low order bits because upper two octets are low entropy return str(mac_dec)[-10:]