Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce common library for all tools #500

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 120 additions & 0 deletions tools/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
# Common tools methods
"""Methods for aioshelly cmdline tools."""
from __future__ import annotations

from datetime import datetime
from typing import Any, cast

import aiohttp

from aioshelly.block_device import BLOCK_VALUE_UNIT, COAP, BlockDevice, BlockUpdateType
from aioshelly.common import ConnectionOptions, get_info
from aioshelly.const import BLOCK_GENERATIONS, MODEL_NAMES, RPC_GENERATIONS
from aioshelly.exceptions import InvalidAuthError, ShellyError
from aioshelly.rpc_device import RpcDevice, RpcUpdateType, WsServer

coap_context = COAP()
ws_context = WsServer()


async def create_device(
aiohttp_session: aiohttp.ClientSession,
options: ConnectionOptions,
init: bool,
gen: int | None,
) -> Any:
"""Create a Gen1/Gen2/Gen3 device."""
if gen is None:
if info := await get_info(aiohttp_session, options.ip_address):
gen = info.get("gen", 1)
else:
raise ShellyError("Unknown Gen")

if gen in BLOCK_GENERATIONS:
return await BlockDevice.create(aiohttp_session, coap_context, options, init)

if gen in RPC_GENERATIONS:
return await RpcDevice.create(aiohttp_session, ws_context, options, init)

raise ShellyError("Unknown Gen")


async def connect_and_print_device(
aiohttp_session: aiohttp.ClientSession,
options: ConnectionOptions,
init: bool,
gen: int | None,
) -> None:
"""Connect and print device data."""
device = await create_device(aiohttp_session, options, init, gen)
print_device(device)
device.subscribe_updates(device_updated)


def device_updated(
cb_device: BlockDevice | RpcDevice,
update_type: BlockUpdateType | RpcUpdateType,
) -> None:
"""Device updated callback."""
print()
print(f"{datetime.now().strftime('%H:%M:%S')} Device updated! ({update_type})")
try:
print_device(cb_device)
except InvalidAuthError:
print("Invalid or missing authorization (from async init)")


def print_device(device: BlockDevice | RpcDevice) -> None:
"""Print device data."""
if not device.initialized:
print()
print(f"** Device @ {device.ip_address} not initialized **")
print()
return

model_name = MODEL_NAMES.get(device.model) or f"Unknown ({device.model})"
print(f"** {device.name} - {model_name} @ {device.ip_address} **")
print()

if device.gen in BLOCK_GENERATIONS:
print_block_device(cast(BlockDevice, device))
elif device.gen in RPC_GENERATIONS:
print_rpc_device(cast(RpcDevice, device))


def print_block_device(device: BlockDevice) -> None:
"""Print block (GEN1) device data."""
assert device.blocks

for block in device.blocks:
print(block)
for attr, value in block.current_values().items():
info = block.info(attr)

if value is None:
value = "-"

if BLOCK_VALUE_UNIT in info:
unit = " " + info[BLOCK_VALUE_UNIT]
else:
unit = ""

print(f"{attr.ljust(16)}{value}{unit}")
print()


def print_rpc_device(device: RpcDevice) -> None:
"""Print RPC (GEN2/3) device data."""
print(f"Status: {device.status}")
print(f"Event: {device.event}")
print(f"Connected: {device.connected}")


async def update_outbound_ws(
options: ConnectionOptions, init: bool, ws_url: str
) -> None:
"""Update outbound WebSocket URL (Gen2/3)."""
async with aiohttp.ClientSession() as aiohttp_session:
device: RpcDevice = await create_device(aiohttp_session, options, init, 2)
print(f"Updating outbound weboskcet URL to {ws_url}")
print(f"Restart required: {await device.update_outbound_websocket(ws_url)}")
129 changes: 13 additions & 116 deletions tools/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,50 +9,27 @@
import signal
import sys
import traceback
from datetime import datetime
from types import FrameType
from typing import Any, cast

import aiohttp

import aioshelly
from aioshelly.block_device import BLOCK_VALUE_UNIT, COAP, BlockDevice, BlockUpdateType
from aioshelly.common import ConnectionOptions
from aioshelly.const import BLOCK_GENERATIONS, MODEL_NAMES, RPC_GENERATIONS, WS_API_URL
from aioshelly.const import WS_API_URL
from aioshelly.exceptions import (
DeviceConnectionError,
FirmwareUnsupported,
InvalidAuthError,
MacAddressMismatchError,
ShellyError,
WrongShellyGen,
)
from aioshelly.rpc_device import RpcDevice, RpcUpdateType, WsServer

coap_context = COAP()
ws_context = WsServer()


async def create_device(
aiohttp_session: aiohttp.ClientSession,
options: ConnectionOptions,
init: bool,
gen: int | None,
) -> Any:
"""Create a Gen1/Gen2/Gen3 device."""
if gen is None:
if info := await aioshelly.common.get_info(aiohttp_session, options.ip_address):
gen = info.get("gen", 1)
else:
raise ShellyError("Unknown Gen")

if gen in BLOCK_GENERATIONS:
return await BlockDevice.create(aiohttp_session, coap_context, options, init)

if gen in RPC_GENERATIONS:
return await RpcDevice.create(aiohttp_session, ws_context, options, init)

raise ShellyError("Unknown Gen")
from tools.common import (
coap_context,
connect_and_print_device,
create_device,
device_updated,
print_device,
update_outbound_ws,
ws_context,
)


async def test_single(options: ConnectionOptions, init: bool, gen: int | None) -> None:
Expand Down Expand Up @@ -128,76 +105,6 @@ async def test_devices(init: bool, gen: int | None) -> None:
await asyncio.sleep(0.1)


async def connect_and_print_device(
aiohttp_session: aiohttp.ClientSession,
options: ConnectionOptions,
init: bool,
gen: int | None,
) -> None:
"""Connect and print device data."""
device = await create_device(aiohttp_session, options, init, gen)
print_device(device)
device.subscribe_updates(device_updated)


def device_updated(
cb_device: BlockDevice | RpcDevice, update_type: BlockUpdateType | RpcUpdateType
) -> None:
"""Device updated callback."""
print()
print(f"{datetime.now().strftime('%H:%M:%S')} Device updated! ({update_type})")
try:
print_device(cb_device)
except InvalidAuthError:
print("Invalid or missing authorization (from async init)")


def print_device(device: BlockDevice | RpcDevice) -> None:
"""Print device data."""
if not device.initialized:
print()
print(f"** Device @ {device.ip_address} not initialized **")
print()
return

model_name = MODEL_NAMES.get(device.model) or f"Unknown ({device.model})"
print(f"** {device.name} - {model_name} @ {device.ip_address} **")
print()

if device.gen in BLOCK_GENERATIONS:
print_block_device(cast(BlockDevice, device))
elif device.gen in RPC_GENERATIONS:
print_rpc_device(cast(RpcDevice, device))


def print_block_device(device: BlockDevice) -> None:
"""Print block (GEN1) device data."""
assert device.blocks

for block in device.blocks:
print(block)
for attr, value in block.current_values().items():
info = block.info(attr)

if value is None:
value = "-"

if BLOCK_VALUE_UNIT in info:
unit = " " + info[BLOCK_VALUE_UNIT]
else:
unit = ""

print(f"{attr.ljust(16)}{value}{unit}")
print()


def print_rpc_device(device: RpcDevice) -> None:
"""Print RPC (GEN2/3) device data."""
print(f"Status: {device.status}")
print(f"Event: {device.event}")
print(f"Connected: {device.connected}")


def get_arguments() -> tuple[argparse.ArgumentParser, argparse.Namespace]:
"""Get parsed passed in arguments."""
parser = argparse.ArgumentParser(description="aioshelly example")
Expand Down Expand Up @@ -265,16 +172,6 @@ def get_arguments() -> tuple[argparse.ArgumentParser, argparse.Namespace]:
return parser, arguments


async def update_outbound_ws(
options: ConnectionOptions, init: bool, ws_url: str
) -> None:
"""Update outbound WebSocket URL (Gen2/3)."""
async with aiohttp.ClientSession() as aiohttp_session:
device: RpcDevice = await create_device(aiohttp_session, options, init, 2)
print(f"Updating outbound weboskcet URL to {ws_url}")
print(f"Restart required: {await device.update_outbound_websocket(ws_url)}")


async def main() -> None:
"""Run main."""
parser, args = get_arguments()
Expand Down Expand Up @@ -302,13 +199,13 @@ async def main() -> None:
if args.debug:
logging.basicConfig(level="DEBUG", force=True)

def handle_sigint(_exit_code: int, _frame: FrameType) -> None:
def handle_sigint(_exit_code: int) -> None:
"""Handle Keyboard signal interrupt (ctrl-c)."""
coap_context.close()
ws_context.close()
sys.exit()
sys.exit(_exit_code)

signal.signal(signal.SIGINT, handle_sigint)
signal.signal(signal.SIGINT, handle_sigint) # type: ignore [func-returns-value]

if args.devices:
await test_devices(args.init, gen)
Expand Down