Skip to content

Commit

Permalink
Config flow change (#82)
Browse files Browse the repository at this point in the history
* add classes for parsing and validate config, handle setup file with only controller

* add hassio to after dependencies
  • Loading branch information
LoSk-p authored Dec 5, 2024
1 parent 472000e commit 4bcaacb
Show file tree
Hide file tree
Showing 8 changed files with 262 additions and 170 deletions.
204 changes: 43 additions & 161 deletions custom_components/robonomics/config_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,7 @@
ROBONOMICS_WSS_KUSAMA,
DOMAIN,
)
from .exceptions import (
CantConnectToIPFS,
ControllerNotInDevices,
InvalidSubAdminSeed,
InvalidSubOwnerAddress,
NoSubscription,
InvalidConfigPassword,
)
from .config_flow_helpers import ConfigFileParser, ConfigValidator
from .utils import to_thread

_LOGGER = logging.getLogger(__name__)
Expand All @@ -86,118 +79,19 @@
}
)

STEP_OWNER_DATA_SCHEMA = vol.Schema(
{
vol.Required(CONF_SUB_OWNER_ADDRESS): str,
}
)

STEP_WARN_DATA_SCHEMA = vol.Schema(
{
vol.Required(CONF_WARN_DATA_SENDING): bool,
vol.Required(CONF_WARN_ACCOUNT_MANAGMENT): bool,
}
)

def get_network_ws(network_key: str) -> str:
if network_key == CONF_KUSAMA:
return ROBONOMICS_WSS_KUSAMA[0]
elif network_key == CONF_POLKADOT:
return ROBONOMICS_WSS_POLKADOT[0]

@to_thread
def _is_ipfs_local_connected() -> bool:
"""Check if IPFS local node is running and integration can connect
:return: True if integration can connect to the node, false otherwise
"""

try:
ipfshttpclient2.connect()
return True
except ipfshttpclient2.exceptions.ConnectionError:
return False


async def _has_sub_owner_subscription(
hass: HomeAssistant, sub_owner_address: str, network: str
) -> bool:
"""Check if controller account is in subscription devices
:param sub_owner_address: Subscription owner address
:return: True if ledger is not None, false otherwise
"""

rws = RWS(Account(remote_ws = get_network_ws(network)))
res = await hass.async_add_executor_job(rws.get_ledger, sub_owner_address)
if res is None:
return False
else:
return True


async def _is_sub_admin_in_subscription(
hass: HomeAssistant, controller_seed: str, sub_owner_address: str, network: str
) -> bool:
"""Check if controller account is in subscription devices
:param sub_admin_seed: Controller's seed
:param sub_owner_address: Subscription owner address
:return: True if controller account is in subscription devices, false otherwise
"""

rws = RWS(Account(controller_seed, crypto_type=KeypairType.ED25519, remote_ws = get_network_ws(network)))
res = await hass.async_add_executor_job(rws.is_in_sub, sub_owner_address)
return res


def _is_valid_sub_admin_seed(sub_admin_seed: str) -> Optional[ValueError]:
"""Check if provided controller seed is valid
:param sub_admin_seed: Controller's seed
"""

try:
Account(sub_admin_seed)
except Exception as e:
return e


def _is_valid_sub_owner_address(sub_owner_address: str) -> bool:
"""Check if provided subscription owner address is valid
:param sub_owner_address: Subscription owner address
:return: True if address is valid, false otherwise
"""

return is_valid_ss58_address(sub_owner_address, valid_ss58_format=32)


async def _validate_config(hass: HomeAssistant, data: dict[str, Any]) -> dict[str, Any]:
"""Validate the user input allows us to connect.
:param hass: HomeAssistant instance
:param data: dict with the keys from STEP_USER_DATA_SCHEMA and values provided by the user
"""

if data[CONF_ADMIN_SEED] is None:
raise InvalidConfigPassword
if await hass.async_add_executor_job(
_is_valid_sub_admin_seed, data[CONF_ADMIN_SEED]
):
raise InvalidSubAdminSeed
if not _is_valid_sub_owner_address(data[CONF_SUB_OWNER_ADDRESS]):
raise InvalidSubOwnerAddress
if not await _has_sub_owner_subscription(
hass, data[CONF_SUB_OWNER_ADDRESS], data[CONF_NETWORK]
):
raise NoSubscription
if not await _is_sub_admin_in_subscription(
hass, data[CONF_ADMIN_SEED], data[CONF_SUB_OWNER_ADDRESS], data[CONF_NETWORK]
):
raise ControllerNotInDevices
if not await _is_ipfs_local_connected():
raise CantConnectToIPFS

return {"title": "Robonomics"}


class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a config flow for Robonomics Control."""
Expand Down Expand Up @@ -249,72 +143,60 @@ async def async_step_conf(
:return: Service functions from HomeAssistant
"""

self.updated_config = {}
self.config = {}
if user_input is None:
return self.async_show_form(
step_id="conf", data_schema=STEP_USER_DATA_SCHEMA
)
_LOGGER.debug(f"User data: {user_input}")
errors = {}
if CONF_CONFIG_FILE in user_input:
config = self._parse_config_file(
user_input[CONF_CONFIG_FILE], user_input[CONF_PASSWORD]
)
config[CONF_NETWORK] = user_input[CONF_NETWORK]
try:
self.config = await ConfigFileParser(self.hass, user_input[CONF_CONFIG_FILE], user_input[CONF_PASSWORD]).parse()
self.config[CONF_NETWORK] = user_input[CONF_NETWORK]
except Exception as e:
_LOGGER.error(f"Exception in file parse: {e}")
errors["base"] = ConfigValidator.get_error_key(e)
return self.async_show_form(
step_id="conf", data_schema=STEP_USER_DATA_SCHEMA, errors=errors
)
if not CONF_SUB_OWNER_ADDRESS in self.config:
return await self.async_step_owner()

try:
info = await _validate_config(self.hass, config)
except InvalidSubAdminSeed:
errors["base"] = "invalid_sub_admin_seed"
except InvalidSubOwnerAddress:
errors["base"] = "invalid_sub_owner_address"
except NoSubscription:
errors["base"] = "has_no_subscription"
except ControllerNotInDevices:
errors["base"] = "is_not_in_devices"
except CantConnectToIPFS:
errors["base"] = "can_connect_to_ipfs"
except InvalidConfigPassword:
errors["base"] = "wrong_password"
except Exception: # pylint: disable=broad-except
_LOGGER.exception("Unexpected exception")
errors["base"] = "unknown"
await ConfigValidator(self.hass, self.config).validate()
except Exception as e:
_LOGGER.error(f"Exception in validation: {e}")
errors["base"] = ConfigValidator.get_error_key(e)
else:
return self.async_create_entry(title=info["title"], data=config)
return self.async_create_entry(title="Robonomics", data=self.config)
else:
errors["base"] = "file_not_found"

return self.async_show_form(
step_id="conf", data_schema=STEP_USER_DATA_SCHEMA, errors=errors
)

def _parse_config_file(self, config_file_id: str, password: str) -> dict:
with process_uploaded_file(self.hass, config_file_id) as f:
config_file_data = f.read_text(encoding="utf-8")
config_file_data = json.loads(config_file_data)
config = {}
try:
controller_kp = Keypair.create_from_encrypted_json(
json.loads(config_file_data.get("controllerkey")), password
async def async_step_owner(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
if user_input is None:
return self.async_show_form(
step_id="owner", data_schema=STEP_OWNER_DATA_SCHEMA
)
config[CONF_ADMIN_SEED] = f"0x{controller_kp.private_key.hex()}"
config[CONF_CONTROLLER_TYPE] = controller_kp.crypto_type
except CryptoError:
config[CONF_ADMIN_SEED] = None
config[CONF_CONTROLLER_TYPE] = None
config[CONF_SUB_OWNER_ADDRESS] = config_file_data.get("owner")
if config_file_data.get("pinatapublic") and config_file_data.get(
"pinataprivate"
):
config[CONF_PINATA_PUB] = config_file_data.get("pinatapublic")
config[CONF_PINATA_SECRET] = config_file_data.get("pinataprivate")
if config_file_data.get("ipfsurl"):
config[CONF_IPFS_GATEWAY] = config_file_data.get("ipfsurl")
config[CONF_IPFS_GATEWAY_PORT] = config_file_data.get("ipfsport") or 443
config[CONF_IPFS_GATEWAY_AUTH] = True
config[CONF_SENDING_TIMEOUT] = config_file_data.get("datalogtimeout")
_LOGGER.debug(f"Config: {config}")
return config
_LOGGER.debug(f"User data: {user_input}")
self.config[CONF_SUB_OWNER_ADDRESS] = user_input[CONF_SUB_OWNER_ADDRESS]
errors = {}
try:
await ConfigValidator(self.hass, self.config).validate()
except Exception as e:
errors["base"] = ConfigValidator.get_error_key(e)
else:
return self.async_create_entry(title="Robonomics", data=self.config)

return self.async_show_form(
step_id="conf", data_schema=STEP_OWNER_DATA_SCHEMA, errors=errors
)


class OptionsFlowHandler(config_entries.OptionsFlow):
Expand Down
2 changes: 2 additions & 0 deletions custom_components/robonomics/config_flow_helpers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .file_parser import ConfigFileParser
from .validation import ConfigValidator
85 changes: 85 additions & 0 deletions custom_components/robonomics/config_flow_helpers/file_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import json
import logging
from nacl.exceptions import CryptoError

from substrateinterface import Keypair

from homeassistant.core import HomeAssistant
from homeassistant.components.file_upload import process_uploaded_file

from ..const import (
CONF_ADMIN_SEED,
CONF_IPFS_GATEWAY,
CONF_IPFS_GATEWAY_AUTH,
CONF_IPFS_GATEWAY_PORT,
CONF_PINATA_PUB,
CONF_PINATA_SECRET,
CONF_SENDING_TIMEOUT,
CONF_SUB_OWNER_ADDRESS,
CONF_CONTROLLER_TYPE,
)
from ..exceptions import (
InvalidConfigPassword,
InvalidConfigFormat,
)

_LOGGER = logging.getLogger(__name__)

class ConfigFileParser:
def __init__(self, hass: HomeAssistant, config_file_id: str, password: str) -> None:
self.hass: HomeAssistant = hass
self.file_id: str = config_file_id
self.password: str = password
self.config: dict = {}

async def parse(self) -> dict:
file_data = await self.hass.async_add_executor_job(self._load_file_data)
if not file_data:
raise InvalidConfigFormat
if "controllerkey" in file_data and "owner" in file_data:
if not self._decrypt_controller(file_data["controllerkey"]):
raise InvalidConfigPassword
self.config[CONF_SUB_OWNER_ADDRESS] = file_data["owner"]
elif "encoded" in file_data:
if not self._decrypt_controller(file_data):
raise InvalidConfigPassword
self._fill_gateways_fields(file_data)
self.config[CONF_SENDING_TIMEOUT] = file_data.get("datalogtimeout", 10)
_LOGGER.debug(f"Config: {self.config}")
return self.config


def _load_file_data(self) -> dict | None:
with process_uploaded_file(self.hass, self.file_id) as f:
config_file_data = f.read_text(encoding="utf-8")
try:
return json.loads(config_file_data)
except Exception as e:
_LOGGER.error(f"Exception in parsing config file: {e}")

def _decrypt_controller(self, controller_encrypted: dict | str) -> bool:
"""Decrypt controller info from config file and fill
CONF_ADMIN_SEED and CONF_CONTROLLER_TYPE fields in self.config"""

if isinstance(controller_encrypted, str):
controller_encrypted_json = json.loads(controller_encrypted)
else:
controller_encrypted_json = controller_encrypted
try:
controller_kp = Keypair.create_from_encrypted_json(
controller_encrypted_json, self.password
)
except CryptoError:
return False
self.config[CONF_ADMIN_SEED] = f"0x{controller_kp.private_key.hex()}"
self.config[CONF_CONTROLLER_TYPE] = controller_kp.crypto_type
return True

def _fill_gateways_fields(self, file_data: dict) -> None:
if file_data.get("pinatapublic") and file_data.get("pinataprivate"):
self.config[CONF_PINATA_PUB] = file_data.get("pinatapublic")
self.config[CONF_PINATA_SECRET] = file_data.get("pinataprivate")
if file_data.get("ipfsurl"):
self.config[CONF_IPFS_GATEWAY] = file_data.get("ipfsurl")
self.config[CONF_IPFS_GATEWAY_PORT] = file_data.get("ipfsport", 443)
self.config[CONF_IPFS_GATEWAY_AUTH] = True
Loading

0 comments on commit 4bcaacb

Please sign in to comment.