diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile new file mode 100644 index 0000000..3dcd49f --- /dev/null +++ b/.devcontainer/Dockerfile @@ -0,0 +1,27 @@ +FROM python:3.12.2-bookworm + +ENV DEBIAN_FRONTEND=noninteractive \ + DISPLAY=:99 \ + NICEGUI_STORAGE_PATH=data + +# Install packages +RUN apt-get update && apt-get install --no-install-recommends -y \ + sudo git build-essential chromium chromium-driver python3-pip\ + && rm -rf /var/lib/apt/lists/* + +# Create remote user +ARG USERNAME=vscode +ARG USER_UID=1000 +ARG USER_GID=$USER_UID + +RUN groupadd --gid $USER_GID $USERNAME \ + && useradd --uid $USER_UID --gid $USER_GID -m $USERNAME \ + && echo $USERNAME ALL=\(root\) NOPASSWD:ALL > /etc/sudoers.d/$USERNAME \ + && chmod 0440 /etc/sudoers.d/$USERNAME + +ENV PATH="/home/${USERNAME}/.local/bin:${PATH}" +ENV CHROME_BINARY_LOCATION=/usr/bin/chromium + +USER $USERNAME + +CMD ["python", "wait.py"] \ No newline at end of file diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json new file mode 100644 index 0000000..123e1f0 --- /dev/null +++ b/.devcontainer/devcontainer.json @@ -0,0 +1,38 @@ +// For format details, see https://aka.ms/devcontainer.json. +{ + "name": "autopve-dev", + "build": { + "context": "..", + "dockerfile": "Dockerfile" + }, + "customizations": { + "vscode": { + "extensions": [ + "cschleiden.vscode-github-actions", + "esbenp.prettier-vscode", + "littlefoxteam.vscode-python-test-adapter", + "ms-python.python", + "samuelcolvin.jinjahtml", + "Vue.volar" + ], + "settings": { + "python.testing.cwd": "/workspaces/autopve/", + "python.testing.pytestArgs": ["tests"], + "python.testing.unittestEnabled": false, + "python.testing.pytestEnabled": true, + "python.defaultInterpreterPath": "/usr/local/bin/python3", + "terminal.integrated.defaultProfile.linux": "bash", + "terminal.integrated.shell.linux": "bash", + "terminal.integrated.profiles.linux": { + "bash (container default)": { + "path": "/usr/bin/bash", + "overrideName": true + } + } + } + } + }, + // More info: https://aka.ms/dev-containers-non-root. + "remoteUser": "vscode", + "postCreateCommand": "/usr/local/bin/python3 -m pip install -r requirements-test.txt" +} diff --git a/.dockerignore b/.dockerignore index ec17670..ff58a2c 100644 --- a/.dockerignore +++ b/.dockerignore @@ -7,5 +7,6 @@ __pycache__/ logs/ notes/ mpl/ +screenshots/ mysecret.py test.py \ No newline at end of file diff --git a/.gitignore b/.gitignore index c2cde77..73a1265 100644 --- a/.gitignore +++ b/.gitignore @@ -168,4 +168,5 @@ notes/ mpl/ mysecret.py test.py -mount/ \ No newline at end of file +mount/ +screenshots/ \ No newline at end of file diff --git a/__init__.py b/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/autopve/content.py b/autopve/content.py index 5e78148..ae44f37 100644 --- a/autopve/content.py +++ b/autopve/content.py @@ -1,11 +1,10 @@ import asyncio from nicegui import ui # type: ignore from autopve import elements as el -import autopve.logo as logo -from autopve.tabs import Tab +from autopve import logo as logo from autopve.tabs.settings import Global, Network, Disk from autopve.tabs.history import History -from autopve.tabs.system import System +from autopve.tabs.system import MustContain, MustNotContain import logging logger = logging.getLogger(__name__) @@ -42,7 +41,8 @@ def _build(self): self._tab["disk"] = ui.tab(name="Disk").classes("text-secondary") self._tab["history"] = ui.tab(name="History").classes("text-secondary") if self._answer != "Default": - self._tab["system"] = ui.tab(name="System").classes("text-secondary") + self._tab["must_contain"] = ui.tab(name="Must Contain").classes("text-secondary") + self._tab["must_not_contain"] = ui.tab(name="Must Not Contain").classes("text-secondary") with ui.row().classes("items-center"): self._answer_display = ui.label(self._answer).classes("text-secondary text-h4") logo.show() @@ -52,7 +52,11 @@ def _build(self): async def _tab_changed(self, e): if e.value == "History": - self._history.update_history() + self._history.update() + elif e.value == "Must Contain": + self._must_contain.update() + elif e.value == "Must Not Contain": + self._must_not_contain.update() def _build_tab_panels(self): self._tab_panels.clear() @@ -62,7 +66,8 @@ def _build_tab_panels(self): self._disk_content = el.ContentTabPanel(self._tab["disk"]) self._history_content = el.ContentTabPanel(self._tab["history"]) if self._answer != "Default": - self._system_content = el.ContentTabPanel(self._tab["system"]) + self._must_contain_content = el.ContentTabPanel(self._tab["must_contain"]) + self._must_not_contain_content = el.ContentTabPanel(self._tab["must_not_contain"]) with self._global_content: self._global = Global(answer=self._answer) with self._network_content: @@ -72,8 +77,10 @@ def _build_tab_panels(self): with self._history_content: self._history = History(answer=self._answer) if self._answer != "Default": - with self._system_content: - self._system = System(answer=self._answer) + with self._must_contain_content: + self._must_contain = MustContain(answer=self._answer) + with self._must_not_contain_content: + self._must_not_contain = MustNotContain(answer=self._answer) async def answer_selected(self, name): self._answer = name diff --git a/autopve/drawer.py b/autopve/drawer.py index 71cb038..4b8de2b 100644 --- a/autopve/drawer.py +++ b/autopve/drawer.py @@ -1,9 +1,8 @@ from typing import Optional +from nicegui.events import KeyEventArguments from nicegui import ui # type: ignore from autopve import elements as el from autopve import storage -from autopve.tabs import Tab - import logging logger = logging.getLogger(__name__) @@ -88,7 +87,7 @@ async def _display_answer_dialog(self, name=""): save = None with ui.dialog() as answer_dialog, el.Card(): - with el.DBody(height="[95vh]", width="[360px]"): + with el.DBody(height="fit", width="[320px]"): with el.WColumn(): all_answers = list(storage.answers.keys()) for answer in list(storage.answers.keys()): @@ -106,16 +105,23 @@ def answer_check(value: str) -> Optional[bool]: return False return None + def enter_submit(e: KeyEventArguments) -> None: + if e.key == "Enter" and save_ea.no_errors is True: + answer_dialog.submit("save") + answer_input = el.VInput(label="answer", value=" ", invalid_characters="""'`"$\\;&<>|(){}""", invalid_values=all_answers, check=answer_check, max_length=20) save_ea = el.ErrorAggregator(answer_input) el.DButton("SAVE", on_click=lambda: answer_dialog.submit("save")).bind_enabled_from(save_ea, "no_errors") + ui.keyboard(on_key=enter_submit, ignore=[]) answer_input.value = name result = await answer_dialog if result == "save": answer = answer_input.value.strip() if len(answer) > 0 and name != "Default": + storage.answer(answer) if name in storage.answers: + storage.answers[answer] = storage.answer(name, copy=True) del storage.answers[name] for row in self._table.rows: if name == row["name"]: diff --git a/autopve/page.py b/autopve/page.py deleted file mode 100644 index 937095f..0000000 --- a/autopve/page.py +++ /dev/null @@ -1,74 +0,0 @@ -from typing import Any, Dict, List, Optional, Union -import asyncio -import copy -import json -import tomlkit -from fastapi import Request -from fastapi.responses import PlainTextResponse -from nicegui import app, Client, ui # type: ignore -from autopve import elements as el -from autopve.drawer import Drawer -from autopve.content import Content -from autopve import storage -import autopve.tabs.history as history -import logging - -logger = logging.getLogger(__name__) - - -def build(): - @app.post("/answer") - async def post_answer(request: Request) -> PlainTextResponse: - def response(answer: str, system_info: Dict[str, Any], data: Dict[str, Any]): - toml = tomlkit.dumps(data) - toml_fixed = "" - for line in toml.splitlines(): - if len(line) > 0 and line[0] == '"': - line = line.replace('"', "", 2) - toml_fixed = toml_fixed + line + "\n" - r = history.Request(answer=answer, response=toml_fixed, system_info=copy.deepcopy(system_info)) - history.History.add_history(r) - for client in Client.instances.values(): - if not client.has_socket_connection: - continue - with client: - el.Notification(f"New answer request from {r.name} served by {r.answer}!", type="positive", timeout=30) - return PlainTextResponse(toml_fixed) - - system_info = await request.json() - system_info_raw = json.dumps(system_info) - default_data = copy.deepcopy(storage.answer("Default")) - answers = list(storage.answers.keys()) - if "Default" in answers: - answers.remove("Default") - for answer in answers: - answer_data = copy.deepcopy(storage.answer(answer)) - if "match" in answer_data: - if len(answer_data["match"]) > 0 and answer_data["match"] in system_info_raw: - if "global" in default_data and "global" in answer_data: - default_data["global"].update(answer_data["global"]) - if "network" in default_data and "network" in answer_data: - default_data["network"].update(answer_data["network"]) - if "disk-setup" in default_data and "disk-setup" in answer_data: - default_data["disk-setup"].update(answer_data["disk-setup"]) - return response(answer, system_info, default_data) - return response("Default", system_info, default_data) - - @ui.page("/", response_timeout=30) - async def index(client: Client) -> None: - app.add_static_files("/static", "static") - el.load_element_css() - ui.colors( - primary=el.orange, - secondary=el.orange, - accent=el.orange, - dark=el.dark, - positive="#21BA45", - negative="#C10015", - info="#5C8984", - warning="#F2C037", - ) - column = ui.column() - content = Content() - drawer = Drawer(column, content.answer_selected, content.hide) - drawer.build() diff --git a/autopve/storage.py b/autopve/storage.py index 37f37fc..00d4d1a 100644 --- a/autopve/storage.py +++ b/autopve/storage.py @@ -1,11 +1,10 @@ -from typing import Any, Dict, Literal +from typing import Any, Dict +import json from nicegui import app import logging logger = logging.getLogger(__name__) - - -configs_version = int(102) +configs_version = int(100) configs_version_string = f"config_{configs_version}" root = app.storage.general.get(configs_version_string, None) if root is None: @@ -35,73 +34,10 @@ } -def answer(name: str) -> dict: +def answer(name: str, copy: bool = False) -> dict: if name not in answers: answers[name] = {} - return answers[name] - - -# def algo(answer_name: str) -> dict: -# h = answer(answer_name) -# if "algo" not in h: -# h["algo"] = {} -# return h["algo"] - - -# def algo_sensor(answer_name: str, sensor: str) -> dict: -# a = algo(answer_name) -# if sensor not in a: -# a[sensor] = {} -# if "type" not in a[sensor]: -# a[sensor]["type"] = "curve" -# return a[sensor] - - -# def curve(answer_name: str, sensor: str) -> dict: -# s = algo_sensor(answer_name, sensor) -# if "curve" not in s: -# s["curve"] = {} -# return s["curve"] - - -# def curve_speed(answer_name: str, sensor: str, default=None) -> dict: -# c = curve(answer_name, sensor) -# if "speed" not in c: -# if default is None: -# c["speed"] = { -# "Min": None, -# "Low": None, -# "Medium": None, -# "High": None, -# "Max": None, -# } -# else: -# c["speed"] = default -# return c["speed"] - - -# def curve_temp(answer_name: str, sensor: str, default=None) -> dict: -# c = curve(answer_name, sensor) -# if "temp" not in c: -# if default is None: -# c["temp"] = { -# "Min": 30, -# "Low": 40, -# "Medium": 50, -# "High": 60, -# "Max": 70, -# } -# else: -# c["temp"] = default -# return c["temp"] - - -# def pid(answer_name: str, sensor: str) -> Dict[str, float]: -# s = algo_sensor(answer_name, sensor) -# if "pid" not in s: -# s["pid"] = {"Kp": 5, "Ki": 0.01, "Kd": 0.1, "Target": 40} -# return s["pid"] - - -# def pid_coefficient(answer_name: str, sensor: str, coefficient: Literal["Kp", "Ki", "Kd", "Target"]) -> float: -# return pid(answer_name, sensor)[coefficient] + if copy is False: + return answers[name] + else: + return json.loads(json.dumps(answers[name])) diff --git a/autopve/tabs/__init__.py b/autopve/tabs/__init__.py index 531afa3..4056b1a 100644 --- a/autopve/tabs/__init__.py +++ b/autopve/tabs/__init__.py @@ -1,104 +1,25 @@ from typing import Any, Dict, List, Optional, Union from dataclasses import dataclass, field -from nicegui import app, ui # type: ignore -import autopve.elements as el -from autopve import storage import logging logger = logging.getLogger(__name__) +@dataclass(kw_only=True) +class Share: + history: List[Dict[str, Any]] = field(default_factory=list) + last_timestamp: float = 0 + unique_system_information: List[str] = field(default_factory=list) + + class Tab: - def __init__(self, answer: Optional[str] = None, table: Optional[str] = None) -> None: - self.answer: Optional[str] = answer - self.table: Optional[str] = table - self.picked_keys: Dict["str", Any] = {} + _share: Share = Share() + + def __init__(self, answer: str, type: Optional[str] = None) -> None: + self.answer: str = answer + self.type: Optional[str] = type + self._elements: Dict[str, Any] = {} self._build() def _build(self): pass - - def key_picker(self, keys: Dict[str, Any]): - def keys_controls(): - with ui.column() as col: - col.tailwind.width("[560px]").align_items("center") - with ui.card() as card: - card.tailwind.width("full") - key_select = ui.select(list(keys.keys()), label="key", new_value_mode="add-unique", with_input=True) - key_select.tailwind.width("full") - with ui.row() as row: - row.tailwind.width("full").align_items("center").justify_content("between") - with ui.row() as row: - row.tailwind.align_items("center") - self.current_help = None - self.current_key = el.FInput(label="key", on_change=lambda e: key_changed(e), read_only=True) - self.current_key.bind_value_from(key_select) - with ui.button(icon="help"): - self.current_help = ui.tooltip("NA") - ui.button(icon="add", on_click=lambda: add_key(self.current_key.value)) - ui.separator() - self.keys_scroll = ui.scroll_area() - self.keys_scroll.tailwind.width("full").height("[480px]") - self.key_controls = {} - items = storage.answer(self.answer) - if self.table is not None and self.table in items: - for key, value in items[self.table].items(): - if isinstance(value, list): - add_key(key, "[" + ",".join(str(v) for v in value) + "]") - else: - add_key(key, str(value)) - - def add_key(key, value=""): - if key is not None and key != "" and key not in self.picked_keys: - with self.keys_scroll: - with ui.row() as key_row: - key_row.tailwind.width("full").align_items("center").justify_content("between") - with ui.row() as row: - row.tailwind.align_items("center") - self.picked_keys[key] = value - self.key_controls[key] = { - "control": el.FInput( - key, - password=True if key == "root_password" else False, - autocomplete=keys[key]["options"] if key in keys and "options" in keys[key] else None, - on_change=lambda e, key=key: set_key(key, e.value), - ), - "row": key_row, - } - self.key_controls[key]["control"].value = value - if key in keys: - with ui.button(icon="help"): - ui.tooltip(keys[key]["description"]) - ui.button(icon="remove", on_click=lambda _, key=key: remove_key(key)) - - def remove_key(key): - self.keys_scroll.remove(self.key_controls[key]["row"]) - del self.picked_keys[key] - del self.key_controls[key] - - def set_key(key, value: str): - if len(value) > 0: - if key in keys and "type" in keys[key]: - if keys[key]["type"] == "list": - self.picked_keys[key] = value[1:-1].split(",") - elif keys[key]["type"] == "int": - self.picked_keys[key] = int(value) - else: - self.picked_keys[key] = value - else: - if len(value) > 2 and value.strip()[0] == "[" and value.strip()[-1] == "]": - self.picked_keys[key] = value[1:-1].split(",") - elif value.isnumeric(): - self.picked_keys[key] = int(value) - else: - self.picked_keys[key] = value - storage.answer(self.answer)[self.table] = self.picked_keys - - def key_changed(e): - if self.current_help is not None: - if e.value in keys: - self.current_help.text = keys[e.value]["description"] - else: - self.current_help.text = "NA" - - keys_controls() diff --git a/autopve/tabs/history.py b/autopve/tabs/history.py index 4daf3a5..d33270b 100644 --- a/autopve/tabs/history.py +++ b/autopve/tabs/history.py @@ -2,9 +2,11 @@ from typing import Any, Dict, List, Optional, Union from dataclasses import dataclass, field import time +import json +import re from nicegui import app, ui # type: ignore from . import Tab -import autopve.elements as el +from autopve import elements as el import logging logger = logging.getLogger(__name__) @@ -32,7 +34,7 @@ def __init__(self, container, label) -> None: self._label = label self._visible = None self._request = None - self._submitted = None + self._submitted: Optional[asyncio.Event] = None with self._container: self._label = ui.label(self._label).tailwind().text_color("primary") self._done = el.IButton(icon="done", on_click=lambda: self.submit("confirm")) @@ -66,8 +68,6 @@ def submit(self, request) -> None: class History(Tab): - _history: List[Dict[str, Any]] = [] - def _build(self): async def display_request(e): if e.args["data"]["system_info"] is not None and e.args["data"]["response"] is not None: @@ -137,7 +137,7 @@ async def display_request(e): "maxWidth": 200, }, ], - "rowData": self._history, + "rowData": self._share.history, }, theme="balham-dark", ) @@ -160,14 +160,27 @@ def _set_selection(self, mode=None): self._grid.options["rowSelection"] = row_selection self._grid.update() - def update_history(self): + def update(self): self._grid.update() @classmethod def add_history(cls, request: Request) -> None: - if len(cls._history) > 1000: - cls._history.pop(0) - cls._history.append({"timestamp": request.timestamp, "name": request.name, "answer": request.answer, "response": request.response, "system_info": request.system_info}) + if len(cls._share.history) > 1000: + cls._share.history.pop(0) + cls._share.history.append( + { + "timestamp": request.timestamp, + "name": request.name, + "answer": request.answer, + "response": request.response, + "system_info": request.system_info, + } + ) + cls._share.last_timestamp = request.timestamp + matches = re.findall(r"(\"[^\"]+\"\s*:\s*(\"[^\"]+\"|\d+|true|false))", json.dumps(request.system_info)) + for match in matches: + if str(match[0]) not in cls._share.unique_system_information: + cls._share.unique_system_information.append(str(match[0])) async def _remove_history(self): self._set_selection(mode="multiple") @@ -175,6 +188,6 @@ async def _remove_history(self): if request == "confirm": rows = await self._grid.get_selected_rows() for row in rows: - self._history.remove(row) + self._share.history.remove(row) self._grid.update() self._set_selection() diff --git a/autopve/tabs/settings.py b/autopve/tabs/settings.py index aeccb5a..ee7a576 100644 --- a/autopve/tabs/settings.py +++ b/autopve/tabs/settings.py @@ -1,16 +1,110 @@ -from typing import Any, Dict, List, Optional, Union -from dataclasses import dataclass, field -from nicegui import app, ui # type: ignore +from typing import Any, Dict, Optional +from nicegui import ui from . import Tab -import autopve.elements as el +from autopve import elements as el +from autopve import storage import logging logger = logging.getLogger(__name__) -class Global(Tab): - def __init__(self, answer: Optional[str] = None) -> None: - self.keys = { +class Setting(Tab): + def __init__(self, answer: str, type: Optional[str] = None, keys: Dict[str, Dict[str, Any]] = {}) -> None: + self.keys: Dict[str, Dict[str, Any]] = keys + super().__init__(answer, type=type) + + def _build(self): + self.key_picker() + + def key_picker(self): + def keys_controls(): + with ui.column() as col: + col.tailwind.width("[560px]").align_items("center") + with ui.card() as card: + card.tailwind.width("full") + key_select = ui.select(list(self.keys.keys()), label="key", new_value_mode="add", with_input=True) + key_select.tailwind.width("full") + with ui.row() as row: + row.tailwind.width("full").align_items("center").justify_content("between") + with ui.row() as row: + row.tailwind.align_items("center") + self.help = None + key = el.FInput(label="key", on_change=lambda e: key_changed(e), read_only=True) + key.bind_value_from(key_select) + with ui.button(icon="help"): + self.help = ui.tooltip("NA") + ui.button(icon="add", on_click=lambda key=key: add_key(key.value)) + ui.separator() + self._scroll = ui.scroll_area() + self._scroll.tailwind.width("full").height("[480px]") + items = storage.answer(self.answer) + if self.type is not None and self.type in items: + for key, value in items[self.type].items(): + if isinstance(value, list): + add_key(key, "[" + ",".join(str(v) for v in value) + "]") + else: + add_key(key, str(value)) + + def add_key(key: str, value: str = ""): + if key is not None and key != "" and key not in self._elements.keys(): + with self._scroll: + with ui.row() as key_row: + key_row.tailwind.width("full").align_items("center").justify_content("between") + with ui.row() as row: + row.tailwind.align_items("center") + self._elements[key] = { + "control": el.FInput( + key, + password=True if key == "root_password" else False, + autocomplete=self.keys[key]["options"] if key in self.keys and "options" in self.keys[key] else None, + on_change=lambda e, key=key: set_key(key, e.value), + ), + "row": key_row, + } + self._elements[key]["control"].value = value + if key in self.keys: + with ui.button(icon="help"): + ui.tooltip(self.keys[key]["description"]) + ui.button(icon="remove", on_click=lambda _, key=key: remove_key(key)) + + def remove_key(key): + self._scroll.remove(self._elements[key]["row"]) + del self._elements[key] + + def set_key(key, value: str): + v: Any = None + if len(value) > 0: + if key in self.keys and "type" in self.keys[key]: + if self.keys[key]["type"] == "list": + v = value[1:-1].split(",") + elif self.keys[key]["type"] == "int": + v = int(value) + else: + v = value + else: + if len(value) > 2 and value.strip()[0] == "[" and value.strip()[-1] == "]": + v = value[1:-1].split(",") + elif value.isnumeric(): + v = int(value) + else: + v = value + if self.type not in storage.answer(self.answer): + storage.answer(self.answer)[self.type] = {} + storage.answer(self.answer)[self.type][key] = v + + def key_changed(e): + if self.help is not None: + if e.value in self.keys: + self.help.text = self.keys[e.value]["description"] + else: + self.help.text = "NA" + + keys_controls() + + +class Global(Setting): + def __init__(self, answer: str) -> None: + keys = { "keyboard": {"description": "The keyboard layout with the following possible options"}, "country": {"description": "The country code in the two letter variant. For example, at, us or fr."}, "fqdn": {"description": "The fully qualified domain name of the host. The domain part will be used as the search domain."}, @@ -22,30 +116,24 @@ def __init__(self, answer: Optional[str] = None) -> None: "description": "If set to true, the installer will reboot automatically when an error is encountered. The default behavior is to wait to give the administrator a chance to investigate why the installation failed." }, } - super().__init__(answer=answer, table="global") - - def _build(self): - self.key_picker(keys=self.keys) + super().__init__(answer, type="global", keys=keys) -class Network(Tab): - def __init__(self, answer: Optional[str] = None) -> None: - self.keys = { +class Network(Setting): + def __init__(self, answer: str) -> None: + keys = { "source": {"description": "Where to source the static network configuration from. This can be from-dhcp or from-answer."}, "cidr": {"description": "The IP address in CIDR notation. For example, 192.168.1.10/24."}, "dns": {"description": "The IP address of the DNS server."}, "gateway": {"description": "The IP address of the default gateway."}, "filter": {"description": "Filter against the UDEV properties to select the network card. See filters."}, } - super().__init__(answer=answer, table="network") - - def _build(self): - self.key_picker(keys=self.keys) + super().__init__(answer, type="network", keys=keys) -class Disk(Tab): - def __init__(self, answer: Optional[str] = None) -> None: - self.keys = { +class Disk(Setting): + def __init__(self, answer: str) -> None: + keys = { "filesystem": {"description": "One of the following options: ext4, xfs, zfs, or btrfs.", "options": ["ext4", "xfs", "zfs", "btrfs"]}, "disk_list": {"description": 'List of disks to use. Useful if you are sure about the disk names. For example: disk_list = ["sda", "sdb"].'}, "filter": {"description": "Filter against UDEV properties to select the disks for the installation. See filters."}, @@ -74,7 +162,4 @@ def __init__(self, answer: Optional[str] = None) -> None: }, "btrfs.hdsize": {"description": ""}, } - super().__init__(answer=answer, table="disk-setup") - - def _build(self): - self.key_picker(keys=self.keys) + super().__init__(answer, type="disk-setup", keys=keys) diff --git a/autopve/tabs/system.py b/autopve/tabs/system.py index f716b3c..20728ae 100644 --- a/autopve/tabs/system.py +++ b/autopve/tabs/system.py @@ -1,8 +1,7 @@ -from typing import Any, Dict, List, Optional, Union -from dataclasses import dataclass, field -from nicegui import app, ui # type: ignore +from typing import Optional +from nicegui import ui from . import Tab -import autopve.elements as el +from autopve import elements as el from autopve import storage import logging @@ -10,12 +9,77 @@ class System(Tab): - def __init__(self, answer=None) -> None: - super().__init__(answer) + def __init__(self, answer: str, type: Optional[str] = None, note: str = "") -> None: + self.note: str = note + self.select: Optional[ui.select] = None + self.last_update_timestamp: float = 0 + super().__init__(answer, type=type) def _build(self): - def set_match(match: str): - storage.answer(self.answer)["match"] = match + self.restriction_picker() - answer = storage.answer(self.answer) - el.FInput("Match String", value=answer["match"] if "match" in answer else "", on_change=lambda e: set_match(e.value)) + def restriction_picker(self): + def restriction_controls(): + with ui.column() as col: + col.tailwind.width("[560px]").align_items("center") + with ui.card() as card: + card.tailwind.width("full") + self.select = ui.select(self._share.unique_system_information, new_value_mode="add", with_input=True) + self.select.tailwind.width("full") + card.on("mousemove", handler=self.update, throttle=3) + with ui.row() as row: + row.tailwind.width("full").align_items("center").justify_content("between") + restriction = el.FInput(read_only=True) + restriction.tailwind.width("[420px]") + restriction.bind_value_from(self.select) + ui.button(icon="add", on_click=lambda restriction=restriction: add_restriction(restriction.value)) + ui.label(self.note).tailwind.align_self("center") + ui.separator() + self.scroll = ui.scroll_area() + self.scroll.tailwind.width("full").height("[480px]") + restrictions = [] + if self.type in storage.answer(self.answer): + restrictions = storage.answer(self.answer)[self.type] + for restriction in restrictions: + add_restriction(restriction) + + def add_restriction(restriction: str): + if restriction is not None and restriction.strip() != "" and restriction not in self._elements.keys(): + with self.scroll: + with ui.row() as row: + row.tailwind.width("full").align_items("center").justify_content("between") + with ui.row() as row: + row.tailwind.align_items("center") + self._elements[restriction] = { + "control": el.FInput(value=restriction, read_only=True), + "row": row, + } + self._elements[restriction]["control"].tailwind.width("[420px]") + ui.button(icon="remove", on_click=lambda _, r=restriction: remove_restriction(r)) + if self.type not in storage.answer(self.answer): + storage.answer(self.answer)[self.type] = [] + if restriction not in storage.answer(self.answer)[self.type]: + storage.answer(self.answer)[self.type].append(restriction) + + def remove_restriction(restriction): + self.scroll.remove(self._elements[restriction]["row"]) + del self._elements[restriction] + if restriction in storage.answer(self.answer)[self.type]: + storage.answer(self.answer)[self.type].remove(restriction) + + restriction_controls() + + def update(self): + if self.select is not None and self._share.last_timestamp > self.last_update_timestamp: + self.last_update_timestamp = self._share.last_timestamp + self.select.update() + + +class MustContain(System): + def __init__(self, answer: str) -> None: + super().__init__(answer, type="must_contain", note="The system information must contain at least one of these strings.") + + +class MustNotContain(System): + def __init__(self, answer: str) -> None: + super().__init__(answer, type="must_not_contain", note="The system information must not contain any of these strings.") diff --git a/main.py b/main.py index aadd6d8..5ba2555 100644 --- a/main.py +++ b/main.py @@ -2,8 +2,12 @@ import logging logger = logging.getLogger(__name__) +from typing import Any, Dict +import json +import tomlkit import os +os.environ.setdefault("NICEGUI_STORAGE_PATH", "data") if not os.path.exists("data"): logger.warning("Could not find 'data' directory, verify bind mounts.") if os.path.exists(".nicegui"): @@ -14,12 +18,14 @@ os.makedirs("data") else: logger.warning("Found 'data' directory.") -os.environ.setdefault("NICEGUI_STORAGE_PATH", "data") - +from fastapi import Request +from fastapi.responses import PlainTextResponse +from nicegui import app, Client, ui # type: ignore -if __name__ in {"__main__", "__mp_main__"}: - from nicegui import app, ui # type: ignore +@ui.page("/", response_timeout=30) +# async def page(client: Client) -> None: +def page() -> None: ui.card.default_style("max-width: none") ui.card.default_props("flat bordered") ui.input.default_props("outlined dense hide-bottom-space") @@ -29,9 +35,80 @@ ui.stepper.default_props("flat") ui.stepper.default_classes("full-size-stepper") - from autopve import page + import autopve.elements as el + from autopve.drawer import Drawer + from autopve.content import Content + + app.add_static_files("/static", "static") + el.load_element_css() + ui.colors( + primary=el.orange, + secondary=el.orange, + accent=el.orange, + dark=el.dark, + positive="#21BA45", + negative="#C10015", + info="#5C8984", + warning="#F2C037", + ) + column = ui.column() + content = Content() + drawer = Drawer(column, content.answer_selected, content.hide) + drawer.build() + + +@app.post("/answer") +async def post_answer(request: Request) -> PlainTextResponse: + import autopve.elements as el + from autopve import storage + from autopve.tabs import history + + def response(answer: str, system_info: Dict[str, Any], data: Dict[str, Any]): + toml = tomlkit.dumps(data) + toml_fixed = "" + for line in toml.splitlines(): + if len(line) > 0 and line[0] == '"': + line = line.replace('"', "", 2) + toml_fixed = toml_fixed + line + "\n" + r = history.Request(answer=answer, response=toml_fixed, system_info=system_info) + history.History.add_history(r) + for client in Client.instances.values(): + if not client.has_socket_connection: + continue + with client: + el.Notification(f"New answer request from {r.name} served by {r.answer}!", type="positive", timeout=15) + return PlainTextResponse(toml_fixed) + + system_info = await request.json() + system_info_raw = json.dumps(system_info) + default_data = storage.answer("Default", copy=True) + answers = list(storage.answers.keys()) + if "Default" in answers: + answers.remove("Default") + for answer in answers: + answer_data = storage.answer(answer, copy=True) + match = False + if "must_contain" in answer_data: + for entry in answer_data["must_contain"]: + if len(entry) > 0 and entry in system_info_raw: + match = True + if "must_not_contain" in answer_data: + for entry in answer_data["must_not_contain"]: + if len(entry) > 0 and entry in system_info_raw: + match = False + if match is True: + if "global" in default_data and "global" in answer_data: + default_data["global"].update(answer_data["global"]) + if "network" in default_data and "network" in answer_data: + default_data["network"].update(answer_data["network"]) + if "disk-setup" in default_data and "disk-setup" in answer_data: + default_data["disk-setup"].update(answer_data["disk-setup"]) + return response(answer, system_info, default_data) + return response("Default", system_info, default_data) + + +if __name__ in {"__main__", "__mp_main__"}: from autopve import logo app.on_startup(lambda: print(f"Starting autopve, bound to the following addresses {', '.join(app.urls)}.", flush=True)) - page.build() ui.run(title="autopve", favicon=logo.logo, dark=True, reload=False, show=False, show_welcome_message=False) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..6e47fd5 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,3 @@ +[tool.pytest.ini_options] +addopts = "--driver Chrome" +asyncio_mode = "auto" \ No newline at end of file diff --git a/requirements-test.txt b/requirements-test.txt new file mode 100644 index 0000000..f98391e --- /dev/null +++ b/requirements-test.txt @@ -0,0 +1,55 @@ +aiofiles==23.2.1 +aiohttp==3.9.5 +aiosignal==1.3.1 +annotated-types==0.6.0 +anyio==4.3.0 +attrs==23.2.0 +bidict==0.23.1 +certifi==2024.2.2 +charset-normalizer==3.3.2 +click==8.1.7 +docutils==0.19 +fastapi==0.109.2 +frozenlist==1.4.1 +h11==0.14.0 +httpcore==1.0.5 +httptools==0.6.1 +httpx==0.27.0 +idna==3.7 +ifaddr==0.2.0 +itsdangerous==2.2.0 +Jinja2==3.1.3 +markdown2==2.4.13 +MarkupSafe==2.1.5 +multidict==6.0.5 +nicegui==1.4.23 +orjson==3.10.3 +pscript==0.7.7 +pydantic==2.7.1 +pydantic_core==2.18.2 +Pygments==2.17.2 +python-dotenv==1.0.1 +python-engineio==4.9.0 +python-multipart==0.0.9 +python-socketio==5.11.2 +PyYAML==6.0.1 +requests==2.31.0 +simple-websocket==1.0.0 +sniffio==1.3.1 +starlette==0.36.3 +tomlkit==0.12.4 +typing_extensions==4.11.0 +urllib3==2.2.1 +uvicorn==0.29.0 +uvloop==0.19.0 +vbuild==0.8.2 +watchfiles==0.21.0 +websockets==12.0 +wsproto==1.2.0 +yarl==1.9.4 +pytest +pytest-selenium +pytest-asyncio +selenium +icecream +beautifulsoup4 \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 74c2384..ea97c4e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,54 +1,46 @@ aiofiles==23.2.1 -aiohttp==3.9.3 +aiohttp==3.9.5 aiosignal==1.3.1 annotated-types==0.6.0 -anyio==4.2.0 +anyio==4.3.0 attrs==23.2.0 -bidict==0.22.1 -black==24.2.0 +bidict==0.23.1 certifi==2024.2.2 charset-normalizer==3.3.2 click==8.1.7 +docutils==0.19 fastapi==0.109.2 frozenlist==1.4.1 h11==0.14.0 -httpcore==1.0.2 +httpcore==1.0.5 httptools==0.6.1 -httpx==0.26.0 -idna==3.6 +httpx==0.27.0 +idna==3.7 ifaddr==0.2.0 -itsdangerous==2.1.2 +itsdangerous==2.2.0 Jinja2==3.1.3 -markdown2==2.4.12 +markdown2==2.4.13 MarkupSafe==2.1.5 multidict==6.0.5 -mypy-extensions==1.0.0 -nicegui==1.4.15 -nicegui-highcharts==1.0.1 -numpy==1.26.4 -orjson==3.9.13 -packaging==23.2 -pathspec==0.12.1 -platformdirs==4.2.0 +nicegui==1.4.23 +orjson==3.10.3 pscript==0.7.7 -pydantic==2.6.1 -pydantic_core==2.16.2 +pydantic==2.7.1 +pydantic_core==2.18.2 Pygments==2.17.2 python-dotenv==1.0.1 python-engineio==4.9.0 python-multipart==0.0.9 -python-socketio==5.11.1 +python-socketio==5.11.2 PyYAML==6.0.1 requests==2.31.0 -scipy==1.12.0 simple-websocket==1.0.0 -sniffio==1.3.0 +sniffio==1.3.1 starlette==0.36.3 tomlkit==0.12.4 -types-requests==2.31.0.20240125 -typing_extensions==4.9.0 -urllib3==2.2.0 -uvicorn==0.27.1 +typing_extensions==4.11.0 +urllib3==2.2.1 +uvicorn==0.29.0 uvloop==0.19.0 vbuild==0.8.2 watchfiles==0.21.0 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..a16f1dd --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,52 @@ +# from nicegui.testing.conftest import * +from collections.abc import Generator +import os +import pytest +from nicegui.testing import Screen +from nicegui.testing.conftest import ( + capabilities, # noqa: F401 + driver, # noqa: F401 + remove_all_screenshots, # noqa: F401 + reset_globals, # noqa: F401 + DOWNLOAD_DIR, +) +from selenium import webdriver + + +@pytest.fixture +def chrome_options(chrome_options: webdriver.ChromeOptions) -> webdriver.ChromeOptions: + """Configure the Chrome driver options.""" + chrome_options.add_argument("disable-dev-shm-using") + chrome_options.add_argument("no-sandbox") + chrome_options.add_argument("headless") + # check if we are running on GitHub Actions + if "GITHUB_ACTIONS" in os.environ: + chrome_options.add_argument("disable-gpu") + else: + chrome_options.add_argument("--use-gl=angle") + chrome_options.add_argument("window-size=1920x1080") + chrome_options.add_experimental_option( + "prefs", + { + "download.default_directory": str(DOWNLOAD_DIR), + "download.prompt_for_download": False, # To auto download the file + "download.directory_upgrade": True, + }, + ) + if "CHROME_BINARY_LOCATION" in os.environ: + chrome_options.binary_location = os.environ["CHROME_BINARY_LOCATION"] + return chrome_options + + +@pytest.fixture +def screen( + driver: webdriver.Chrome, # noqa: F811 + request: pytest.FixtureRequest, + caplog: pytest.LogCaptureFixture, +) -> Generator[Screen, None, None]: + """Create a new Screen instance.""" + screen_ = Screen(driver, caplog) + yield screen_ + if screen_.is_open: + screen_.shot(request.node.name) + screen_.stop_server() diff --git a/wait.py b/wait.py new file mode 100644 index 0000000..28d6c6c --- /dev/null +++ b/wait.py @@ -0,0 +1,4 @@ +import time + +while True: + time.sleep(10)