From 7cfd363e0fc7dff7b1978d222381cf5f999b05ea Mon Sep 17 00:00:00 2001 From: Simone Chemelli Date: Fri, 16 Feb 2024 20:52:27 +0000 Subject: [PATCH 1/2] Introduce common class for all tools --- tools/common.py | 127 +++++++++++++++++++++++++++++++++++++++++ tools/example.py | 145 +++++++---------------------------------------- 2 files changed, 147 insertions(+), 125 deletions(-) create mode 100644 tools/common.py diff --git a/tools/common.py b/tools/common.py new file mode 100644 index 00000000..12e231b4 --- /dev/null +++ b/tools/common.py @@ -0,0 +1,127 @@ +# Common tools class +"""Class for aioshelly cmdline tools.""" +from __future__ import annotations + +from datetime import datetime +from typing import Any, cast + +import aiohttp + +import aioshelly +from aioshelly.block_device import BLOCK_VALUE_UNIT, COAP, BlockDevice, BlockUpdateType +from aioshelly.common import ConnectionOptions +from aioshelly.const import BLOCK_GENERATIONS, MODEL_NAMES, RPC_GENERATIONS +from aioshelly.exceptions import InvalidAuthError, ShellyError +from aioshelly.rpc_device import RpcDevice, RpcUpdateType, WsServer + +coap_context = COAP() +ws_context = WsServer() + + +class CommonTools: + """Common method for building tools.""" + + async def create_device( + self, + aiohttp_session: aiohttp.ClientSession, + options: ConnectionOptions, + init: bool, + gen: int | None, + ) -> Any: + """Create a Gen1/Gen2/Gen3 device.""" + if gen is None: + if info := await aioshelly.common.get_info( + aiohttp_session, options.ip_address + ): + gen = info.get("gen", 1) + else: + raise ShellyError("Unknown Gen") + + if gen in BLOCK_GENERATIONS: + return await BlockDevice.create( + aiohttp_session, coap_context, options, init + ) + + if gen in RPC_GENERATIONS: + return await RpcDevice.create(aiohttp_session, ws_context, options, init) + + raise ShellyError("Unknown Gen") + + async def connect_and_print_device( + self, + aiohttp_session: aiohttp.ClientSession, + options: ConnectionOptions, + init: bool, + gen: int | None, + ) -> None: + """Connect and print device data.""" + device = await self.create_device(aiohttp_session, options, init, gen) + self.print_device(device) + device.subscribe_updates(self.device_updated) + + def device_updated( + self, + cb_device: BlockDevice | RpcDevice, + update_type: BlockUpdateType | RpcUpdateType, + ) -> None: + """Device updated callback.""" + print() + print(f"{datetime.now().strftime('%H:%M:%S')} Device updated! ({update_type})") + try: + self.print_device(cb_device) + except InvalidAuthError: + print("Invalid or missing authorization (from async init)") + + def print_device(self, device: BlockDevice | RpcDevice) -> None: + """Print device data.""" + if not device.initialized: + print() + print(f"** Device @ {device.ip_address} not initialized **") + print() + return + + model_name = MODEL_NAMES.get(device.model) or f"Unknown ({device.model})" + print(f"** {device.name} - {model_name} @ {device.ip_address} **") + print() + + if device.gen in BLOCK_GENERATIONS: + self.print_block_device(cast(BlockDevice, device)) + elif device.gen in RPC_GENERATIONS: + self.print_rpc_device(cast(RpcDevice, device)) + + def print_block_device(self, device: BlockDevice) -> None: + """Print block (GEN1) device data.""" + assert device.blocks + + for block in device.blocks: + print(block) + for attr, value in block.current_values().items(): + info = block.info(attr) + + if value is None: + value = "-" + + if BLOCK_VALUE_UNIT in info: + unit = " " + info[BLOCK_VALUE_UNIT] + else: + unit = "" + + print(f"{attr.ljust(16)}{value}{unit}") + print() + + def print_rpc_device(self, device: RpcDevice) -> None: + """Print RPC (GEN2/3) device data.""" + print(f"Status: {device.status}") + print(f"Event: {device.event}") + print(f"Connected: {device.connected}") + + async def update_outbound_ws( + self, options: ConnectionOptions, init: bool, ws_url: str + ) -> None: + """Update outbound WebSocket URL (Gen2/3).""" + async with aiohttp.ClientSession() as aiohttp_session: + device: RpcDevice = await self.create_device( + aiohttp_session, options, init, 2 + ) + print(f"Updating outbound weboskcet URL to {ws_url}") + print(f"Restart required: {await device.update_outbound_websocket(ws_url)}") diff --git a/tools/example.py b/tools/example.py index 883bdb52..9b1f12f8 100644 --- a/tools/example.py +++ b/tools/example.py @@ -9,57 +9,28 @@ import signal import sys import traceback -from datetime import datetime -from types import FrameType -from typing import Any, cast import aiohttp -import aioshelly -from aioshelly.block_device import BLOCK_VALUE_UNIT, COAP, BlockDevice, BlockUpdateType from aioshelly.common import ConnectionOptions -from aioshelly.const import BLOCK_GENERATIONS, MODEL_NAMES, RPC_GENERATIONS, WS_API_URL +from aioshelly.const import WS_API_URL from aioshelly.exceptions import ( DeviceConnectionError, FirmwareUnsupported, InvalidAuthError, MacAddressMismatchError, - ShellyError, WrongShellyGen, ) -from aioshelly.rpc_device import RpcDevice, RpcUpdateType, WsServer - -coap_context = COAP() -ws_context = WsServer() - - -async def create_device( - aiohttp_session: aiohttp.ClientSession, - options: ConnectionOptions, - init: bool, - gen: int | None, -) -> Any: - """Create a Gen1/Gen2/Gen3 device.""" - if gen is None: - if info := await aioshelly.common.get_info(aiohttp_session, options.ip_address): - gen = info.get("gen", 1) - else: - raise ShellyError("Unknown Gen") - - if gen in BLOCK_GENERATIONS: - return await BlockDevice.create(aiohttp_session, coap_context, options, init) - - if gen in RPC_GENERATIONS: - return await RpcDevice.create(aiohttp_session, ws_context, options, init) +from tools.common import CommonTools, coap_context, ws_context - raise ShellyError("Unknown Gen") - -async def test_single(options: ConnectionOptions, init: bool, gen: int | None) -> None: +async def test_single( + common: CommonTools, options: ConnectionOptions, init: bool, gen: int | None +) -> None: """Test single device.""" async with aiohttp.ClientSession() as aiohttp_session: try: - device = await create_device(aiohttp_session, options, init, gen) + device = await common.create_device(aiohttp_session, options, init, gen) except FirmwareUnsupported as err: print(f"Device firmware not supported, error: {repr(err)}") return @@ -76,15 +47,15 @@ async def test_single(options: ConnectionOptions, init: bool, gen: int | None) - print(f"Wrong Shelly generation {gen}, device gen: {2 if gen==1 else 1}") return - print_device(device) + common.print_device(device) - device.subscribe_updates(device_updated) + device.subscribe_updates(common.device_updated) while True: await asyncio.sleep(0.1) -async def test_devices(init: bool, gen: int | None) -> None: +async def test_devices(common: CommonTools, init: bool, gen: int | None) -> None: """Test multiple devices.""" device_options = [] with open("devices.json", encoding="utf8") as fp: @@ -95,7 +66,9 @@ async def test_devices(init: bool, gen: int | None) -> None: results = await asyncio.gather( *[ asyncio.gather( - connect_and_print_device(aiohttp_session, options, init, gen), + common.connect_and_print_device( + aiohttp_session, options, init, gen + ), ) for options in device_options ], @@ -128,76 +101,6 @@ async def test_devices(init: bool, gen: int | None) -> None: await asyncio.sleep(0.1) -async def connect_and_print_device( - aiohttp_session: aiohttp.ClientSession, - options: ConnectionOptions, - init: bool, - gen: int | None, -) -> None: - """Connect and print device data.""" - device = await create_device(aiohttp_session, options, init, gen) - print_device(device) - device.subscribe_updates(device_updated) - - -def device_updated( - cb_device: BlockDevice | RpcDevice, update_type: BlockUpdateType | RpcUpdateType -) -> None: - """Device updated callback.""" - print() - print(f"{datetime.now().strftime('%H:%M:%S')} Device updated! ({update_type})") - try: - print_device(cb_device) - except InvalidAuthError: - print("Invalid or missing authorization (from async init)") - - -def print_device(device: BlockDevice | RpcDevice) -> None: - """Print device data.""" - if not device.initialized: - print() - print(f"** Device @ {device.ip_address} not initialized **") - print() - return - - model_name = MODEL_NAMES.get(device.model) or f"Unknown ({device.model})" - print(f"** {device.name} - {model_name} @ {device.ip_address} **") - print() - - if device.gen in BLOCK_GENERATIONS: - print_block_device(cast(BlockDevice, device)) - elif device.gen in RPC_GENERATIONS: - print_rpc_device(cast(RpcDevice, device)) - - -def print_block_device(device: BlockDevice) -> None: - """Print block (GEN1) device data.""" - assert device.blocks - - for block in device.blocks: - print(block) - for attr, value in block.current_values().items(): - info = block.info(attr) - - if value is None: - value = "-" - - if BLOCK_VALUE_UNIT in info: - unit = " " + info[BLOCK_VALUE_UNIT] - else: - unit = "" - - print(f"{attr.ljust(16)}{value}{unit}") - print() - - -def print_rpc_device(device: RpcDevice) -> None: - """Print RPC (GEN2/3) device data.""" - print(f"Status: {device.status}") - print(f"Event: {device.event}") - print(f"Connected: {device.connected}") - - def get_arguments() -> tuple[argparse.ArgumentParser, argparse.Namespace]: """Get parsed passed in arguments.""" parser = argparse.ArgumentParser(description="aioshelly example") @@ -265,16 +168,6 @@ def get_arguments() -> tuple[argparse.ArgumentParser, argparse.Namespace]: return parser, arguments -async def update_outbound_ws( - options: ConnectionOptions, init: bool, ws_url: str -) -> None: - """Update outbound WebSocket URL (Gen2/3).""" - async with aiohttp.ClientSession() as aiohttp_session: - device: RpcDevice = await create_device(aiohttp_session, options, init, 2) - print(f"Updating outbound weboskcet URL to {ws_url}") - print(f"Restart required: {await device.update_outbound_websocket(ws_url)}") - - async def main() -> None: """Run main.""" parser, args = get_arguments() @@ -282,6 +175,8 @@ async def main() -> None: await coap_context.initialize(args.coap_port) await ws_context.initialize(args.ws_port, args.ws_api_url) + common = CommonTools() + if not args.init and not (args.gen1 or args.gen2 or args.gen3): parser.error("specify gen if no device init at startup") if args.gen1 and args.gen2: @@ -302,16 +197,16 @@ async def main() -> None: if args.debug: logging.basicConfig(level="DEBUG", force=True) - def handle_sigint(_exit_code: int, _frame: FrameType) -> None: + def handle_sigint(_exit_code: int) -> None: """Handle Keyboard signal interrupt (ctrl-c).""" coap_context.close() ws_context.close() - sys.exit() + sys.exit(_exit_code) - signal.signal(signal.SIGINT, handle_sigint) + signal.signal(signal.SIGINT, handle_sigint) # type: ignore [func-returns-value] if args.devices: - await test_devices(args.init, gen) + await test_devices(common, args.init, gen) elif args.ip_address: if args.username and args.password is None: parser.error("--username and --password must be used together") @@ -319,9 +214,9 @@ def handle_sigint(_exit_code: int, _frame: FrameType) -> None: args.ip_address, args.username, args.password, device_mac=args.mac ) if args.update_ws: - await update_outbound_ws(options, args.init, args.update_ws) + await common.update_outbound_ws(options, args.init, args.update_ws) else: - await test_single(options, args.init, gen) + await test_single(common, options, args.init, gen) else: parser.error("--ip_address or --devices must be specified") From 3becae4ecc51d0e361f056f0cb44b4dcec8392a1 Mon Sep 17 00:00:00 2001 From: Simone Chemelli Date: Sun, 18 Feb 2024 10:58:04 +0000 Subject: [PATCH 2/2] apply review comment --- tools/common.py | 209 +++++++++++++++++++++++------------------------ tools/example.py | 34 ++++---- 2 files changed, 119 insertions(+), 124 deletions(-) diff --git a/tools/common.py b/tools/common.py index 12e231b4..a78dec3c 100644 --- a/tools/common.py +++ b/tools/common.py @@ -1,5 +1,5 @@ -# Common tools class -"""Class for aioshelly cmdline tools.""" +# Common tools methods +"""Methods for aioshelly cmdline tools.""" from __future__ import annotations from datetime import datetime @@ -7,9 +7,8 @@ import aiohttp -import aioshelly from aioshelly.block_device import BLOCK_VALUE_UNIT, COAP, BlockDevice, BlockUpdateType -from aioshelly.common import ConnectionOptions +from aioshelly.common import ConnectionOptions, get_info from aioshelly.const import BLOCK_GENERATIONS, MODEL_NAMES, RPC_GENERATIONS from aioshelly.exceptions import InvalidAuthError, ShellyError from aioshelly.rpc_device import RpcDevice, RpcUpdateType, WsServer @@ -18,110 +17,104 @@ ws_context = WsServer() -class CommonTools: - """Common method for building tools.""" - - async def create_device( - self, - aiohttp_session: aiohttp.ClientSession, - options: ConnectionOptions, - init: bool, - gen: int | None, - ) -> Any: - """Create a Gen1/Gen2/Gen3 device.""" - if gen is None: - if info := await aioshelly.common.get_info( - aiohttp_session, options.ip_address - ): - gen = info.get("gen", 1) - else: - raise ShellyError("Unknown Gen") - - if gen in BLOCK_GENERATIONS: - return await BlockDevice.create( - aiohttp_session, coap_context, options, init - ) - - if gen in RPC_GENERATIONS: - return await RpcDevice.create(aiohttp_session, ws_context, options, init) - - raise ShellyError("Unknown Gen") - - async def connect_and_print_device( - self, - aiohttp_session: aiohttp.ClientSession, - options: ConnectionOptions, - init: bool, - gen: int | None, - ) -> None: - """Connect and print device data.""" - device = await self.create_device(aiohttp_session, options, init, gen) - self.print_device(device) - device.subscribe_updates(self.device_updated) - - def device_updated( - self, - cb_device: BlockDevice | RpcDevice, - update_type: BlockUpdateType | RpcUpdateType, - ) -> None: - """Device updated callback.""" +async def create_device( + aiohttp_session: aiohttp.ClientSession, + options: ConnectionOptions, + init: bool, + gen: int | None, +) -> Any: + """Create a Gen1/Gen2/Gen3 device.""" + if gen is None: + if info := await get_info(aiohttp_session, options.ip_address): + gen = info.get("gen", 1) + else: + raise ShellyError("Unknown Gen") + + if gen in BLOCK_GENERATIONS: + return await BlockDevice.create(aiohttp_session, coap_context, options, init) + + if gen in RPC_GENERATIONS: + return await RpcDevice.create(aiohttp_session, ws_context, options, init) + + raise ShellyError("Unknown Gen") + + +async def connect_and_print_device( + aiohttp_session: aiohttp.ClientSession, + options: ConnectionOptions, + init: bool, + gen: int | None, +) -> None: + """Connect and print device data.""" + device = await create_device(aiohttp_session, options, init, gen) + print_device(device) + device.subscribe_updates(device_updated) + + +def device_updated( + cb_device: BlockDevice | RpcDevice, + update_type: BlockUpdateType | RpcUpdateType, +) -> None: + """Device updated callback.""" + print() + print(f"{datetime.now().strftime('%H:%M:%S')} Device updated! ({update_type})") + try: + print_device(cb_device) + except InvalidAuthError: + print("Invalid or missing authorization (from async init)") + + +def print_device(device: BlockDevice | RpcDevice) -> None: + """Print device data.""" + if not device.initialized: + print() + print(f"** Device @ {device.ip_address} not initialized **") print() - print(f"{datetime.now().strftime('%H:%M:%S')} Device updated! ({update_type})") - try: - self.print_device(cb_device) - except InvalidAuthError: - print("Invalid or missing authorization (from async init)") - - def print_device(self, device: BlockDevice | RpcDevice) -> None: - """Print device data.""" - if not device.initialized: - print() - print(f"** Device @ {device.ip_address} not initialized **") - print() - return - - model_name = MODEL_NAMES.get(device.model) or f"Unknown ({device.model})" - print(f"** {device.name} - {model_name} @ {device.ip_address} **") + return + + model_name = MODEL_NAMES.get(device.model) or f"Unknown ({device.model})" + print(f"** {device.name} - {model_name} @ {device.ip_address} **") + print() + + if device.gen in BLOCK_GENERATIONS: + print_block_device(cast(BlockDevice, device)) + elif device.gen in RPC_GENERATIONS: + print_rpc_device(cast(RpcDevice, device)) + + +def print_block_device(device: BlockDevice) -> None: + """Print block (GEN1) device data.""" + assert device.blocks + + for block in device.blocks: + print(block) + for attr, value in block.current_values().items(): + info = block.info(attr) + + if value is None: + value = "-" + + if BLOCK_VALUE_UNIT in info: + unit = " " + info[BLOCK_VALUE_UNIT] + else: + unit = "" + + print(f"{attr.ljust(16)}{value}{unit}") print() - if device.gen in BLOCK_GENERATIONS: - self.print_block_device(cast(BlockDevice, device)) - elif device.gen in RPC_GENERATIONS: - self.print_rpc_device(cast(RpcDevice, device)) - - def print_block_device(self, device: BlockDevice) -> None: - """Print block (GEN1) device data.""" - assert device.blocks - - for block in device.blocks: - print(block) - for attr, value in block.current_values().items(): - info = block.info(attr) - - if value is None: - value = "-" - - if BLOCK_VALUE_UNIT in info: - unit = " " + info[BLOCK_VALUE_UNIT] - else: - unit = "" - - print(f"{attr.ljust(16)}{value}{unit}") - print() - - def print_rpc_device(self, device: RpcDevice) -> None: - """Print RPC (GEN2/3) device data.""" - print(f"Status: {device.status}") - print(f"Event: {device.event}") - print(f"Connected: {device.connected}") - - async def update_outbound_ws( - self, options: ConnectionOptions, init: bool, ws_url: str - ) -> None: - """Update outbound WebSocket URL (Gen2/3).""" - async with aiohttp.ClientSession() as aiohttp_session: - device: RpcDevice = await self.create_device( - aiohttp_session, options, init, 2 - ) - print(f"Updating outbound weboskcet URL to {ws_url}") - print(f"Restart required: {await device.update_outbound_websocket(ws_url)}") + +def print_rpc_device(device: RpcDevice) -> None: + """Print RPC (GEN2/3) device data.""" + print(f"Status: {device.status}") + print(f"Event: {device.event}") + print(f"Connected: {device.connected}") + + +async def update_outbound_ws( + options: ConnectionOptions, init: bool, ws_url: str +) -> None: + """Update outbound WebSocket URL (Gen2/3).""" + async with aiohttp.ClientSession() as aiohttp_session: + device: RpcDevice = await create_device(aiohttp_session, options, init, 2) + print(f"Updating outbound weboskcet URL to {ws_url}") + print(f"Restart required: {await device.update_outbound_websocket(ws_url)}") diff --git a/tools/example.py b/tools/example.py index 9b1f12f8..832edd31 100644 --- a/tools/example.py +++ b/tools/example.py @@ -21,16 +21,22 @@ MacAddressMismatchError, WrongShellyGen, ) -from tools.common import CommonTools, coap_context, ws_context +from tools.common import ( + coap_context, + connect_and_print_device, + create_device, + device_updated, + print_device, + update_outbound_ws, + ws_context, +) -async def test_single( - common: CommonTools, options: ConnectionOptions, init: bool, gen: int | None -) -> None: +async def test_single(options: ConnectionOptions, init: bool, gen: int | None) -> None: """Test single device.""" async with aiohttp.ClientSession() as aiohttp_session: try: - device = await common.create_device(aiohttp_session, options, init, gen) + device = await create_device(aiohttp_session, options, init, gen) except FirmwareUnsupported as err: print(f"Device firmware not supported, error: {repr(err)}") return @@ -47,15 +53,15 @@ async def test_single( print(f"Wrong Shelly generation {gen}, device gen: {2 if gen==1 else 1}") return - common.print_device(device) + print_device(device) - device.subscribe_updates(common.device_updated) + device.subscribe_updates(device_updated) while True: await asyncio.sleep(0.1) -async def test_devices(common: CommonTools, init: bool, gen: int | None) -> None: +async def test_devices(init: bool, gen: int | None) -> None: """Test multiple devices.""" device_options = [] with open("devices.json", encoding="utf8") as fp: @@ -66,9 +72,7 @@ async def test_devices(common: CommonTools, init: bool, gen: int | None) -> None results = await asyncio.gather( *[ asyncio.gather( - common.connect_and_print_device( - aiohttp_session, options, init, gen - ), + connect_and_print_device(aiohttp_session, options, init, gen), ) for options in device_options ], @@ -175,8 +179,6 @@ async def main() -> None: await coap_context.initialize(args.coap_port) await ws_context.initialize(args.ws_port, args.ws_api_url) - common = CommonTools() - if not args.init and not (args.gen1 or args.gen2 or args.gen3): parser.error("specify gen if no device init at startup") if args.gen1 and args.gen2: @@ -206,7 +208,7 @@ def handle_sigint(_exit_code: int) -> None: signal.signal(signal.SIGINT, handle_sigint) # type: ignore [func-returns-value] if args.devices: - await test_devices(common, args.init, gen) + await test_devices(args.init, gen) elif args.ip_address: if args.username and args.password is None: parser.error("--username and --password must be used together") @@ -214,9 +216,9 @@ def handle_sigint(_exit_code: int) -> None: args.ip_address, args.username, args.password, device_mac=args.mac ) if args.update_ws: - await common.update_outbound_ws(options, args.init, args.update_ws) + await update_outbound_ws(options, args.init, args.update_ws) else: - await test_single(common, options, args.init, gen) + await test_single(options, args.init, gen) else: parser.error("--ip_address or --devices must be specified")