diff --git a/ltchiptool/gui/base/__init__.py b/ltchiptool/gui/base/__init__.py new file mode 100644 index 0000000..0c28063 --- /dev/null +++ b/ltchiptool/gui/base/__init__.py @@ -0,0 +1 @@ +# Copyright (c) Kuba Szczodrzyński 2023-11-28. diff --git a/ltchiptool/gui/base/zc.py b/ltchiptool/gui/base/zc.py index d5d1a79..97ee49e 100644 --- a/ltchiptool/gui/base/zc.py +++ b/ltchiptool/gui/base/zc.py @@ -1,60 +1,12 @@ -# Copyright (c) Kuba Szczodrzyński 2023-8-31. +# Copyright (c) Kuba Szczodrzyński 2023-11-28. -from logging import debug, warning +from warnings import warn -from zeroconf import ServiceBrowser, ServiceInfo, ServiceListener, Zeroconf +warn( + "ltchiptool.gui.base.zc is deprecated, " "migrate to ltchiptool.gui.mixin.zc", + stacklevel=2, +) -from ltchiptool.gui.main import MainFrame +from ltchiptool.gui.mixin.zc import ZeroconfBase - -# noinspection PyPep8Naming -class ZeroconfBase(ServiceListener): - Main: MainFrame - _zeroconf_browsers: dict[str, ServiceBrowser] = None - _zeroconf_services: dict[str, ServiceInfo] = None - - def AddZeroconfBrowser(self, type_: str) -> None: - if self._zeroconf_browsers is None: - self._zeroconf_browsers = {} - self._zeroconf_services = {} - if not self.Main or not self.Main.Zeroconf: - return - if type_ in self._zeroconf_browsers: - return - self._zeroconf_browsers[type_] = ServiceBrowser(self.Main.Zeroconf, type_, self) - - def StopZeroconf(self) -> None: - if self._zeroconf_browsers is None: - return - for sb in self._zeroconf_browsers.values(): - sb.cancel() - self._zeroconf_browsers.clear() - self._zeroconf_services.clear() - self.OnZeroconfUpdate(self._zeroconf_services) - - def OnZeroconfUpdate(self, services: dict[str, ServiceInfo]): - pass - - def add_service(self, zc: Zeroconf, type_: str, name: str) -> None: - debug(f"Zeroconf service added: {name}") - info = zc.get_service_info(type_, name) - if info: - self._zeroconf_services[name] = info - else: - warning("Couldn't read service info") - self.OnZeroconfUpdate(self._zeroconf_services) - - def update_service(self, zc: Zeroconf, type_: str, name: str) -> None: - debug(f"Zeroconf service updated: {name}") - info = zc.get_service_info(type_, name) - if info: - self._zeroconf_services[name] = info - else: - warning("Couldn't read service info") - self._zeroconf_services.pop(name, None) - self.OnZeroconfUpdate(self._zeroconf_services) - - def remove_service(self, zc: Zeroconf, type_: str, name: str) -> None: - debug(f"Zeroconf service removed: {name}") - self._zeroconf_services.pop(name, None) - self.OnZeroconfUpdate(self._zeroconf_services) +__deprecated__ = ZeroconfBase diff --git a/ltchiptool/gui/mixin/__init__.py b/ltchiptool/gui/mixin/__init__.py new file mode 100644 index 0000000..0c28063 --- /dev/null +++ b/ltchiptool/gui/mixin/__init__.py @@ -0,0 +1 @@ +# Copyright (c) Kuba Szczodrzyński 2023-11-28. diff --git a/ltchiptool/gui/mixin/devices.py b/ltchiptool/gui/mixin/devices.py new file mode 100644 index 0000000..f5ab067 --- /dev/null +++ b/ltchiptool/gui/mixin/devices.py @@ -0,0 +1,45 @@ +# Copyright (c) Kuba Szczodrzyński 2023-11-28. + +from ltchiptool.gui.base.window import BaseWindow +from ltchiptool.gui.main import MainFrame +from ltchiptool.gui.work.devices import DeviceWatcher +from ltchiptool.util.misc import list_serial_ports + + +# noinspection PyPep8Naming +class DevicesBase(BaseWindow): + Main: MainFrame + WATCHER: DeviceWatcher = None + + def StartDeviceWatcher(self) -> None: + if not DevicesBase.WATCHER: + watcher = DevicesBase.WATCHER = DeviceWatcher() + watcher.on_stop = self.OnWatcherStopped + watcher.start() + else: + watcher = DevicesBase.WATCHER + watcher.handlers.append(self.OnDevicesUpdated) + watcher.call_handlers() + + def StopDeviceWatcher(self) -> None: + if not DevicesBase.WATCHER: + return + watcher = DevicesBase.WATCHER + if self.OnDevicesUpdated in watcher.handlers: + watcher.handlers.remove(self.OnDevicesUpdated) + + def OnClose(self): + super().OnClose() + if watcher := DevicesBase.WATCHER: + watcher.stop() + watcher.join() + + @staticmethod + def OnWatcherStopped(*_) -> None: + DevicesBase.WATCHER = None + + def OnDevicesUpdated(self) -> None: + self.OnPortsUpdated(list_serial_ports()) + + def OnPortsUpdated(self, ports: list[tuple[str, bool, str]]) -> None: + pass diff --git a/ltchiptool/gui/mixin/zc.py b/ltchiptool/gui/mixin/zc.py new file mode 100644 index 0000000..d5d1a79 --- /dev/null +++ b/ltchiptool/gui/mixin/zc.py @@ -0,0 +1,60 @@ +# Copyright (c) Kuba Szczodrzyński 2023-8-31. + +from logging import debug, warning + +from zeroconf import ServiceBrowser, ServiceInfo, ServiceListener, Zeroconf + +from ltchiptool.gui.main import MainFrame + + +# noinspection PyPep8Naming +class ZeroconfBase(ServiceListener): + Main: MainFrame + _zeroconf_browsers: dict[str, ServiceBrowser] = None + _zeroconf_services: dict[str, ServiceInfo] = None + + def AddZeroconfBrowser(self, type_: str) -> None: + if self._zeroconf_browsers is None: + self._zeroconf_browsers = {} + self._zeroconf_services = {} + if not self.Main or not self.Main.Zeroconf: + return + if type_ in self._zeroconf_browsers: + return + self._zeroconf_browsers[type_] = ServiceBrowser(self.Main.Zeroconf, type_, self) + + def StopZeroconf(self) -> None: + if self._zeroconf_browsers is None: + return + for sb in self._zeroconf_browsers.values(): + sb.cancel() + self._zeroconf_browsers.clear() + self._zeroconf_services.clear() + self.OnZeroconfUpdate(self._zeroconf_services) + + def OnZeroconfUpdate(self, services: dict[str, ServiceInfo]): + pass + + def add_service(self, zc: Zeroconf, type_: str, name: str) -> None: + debug(f"Zeroconf service added: {name}") + info = zc.get_service_info(type_, name) + if info: + self._zeroconf_services[name] = info + else: + warning("Couldn't read service info") + self.OnZeroconfUpdate(self._zeroconf_services) + + def update_service(self, zc: Zeroconf, type_: str, name: str) -> None: + debug(f"Zeroconf service updated: {name}") + info = zc.get_service_info(type_, name) + if info: + self._zeroconf_services[name] = info + else: + warning("Couldn't read service info") + self._zeroconf_services.pop(name, None) + self.OnZeroconfUpdate(self._zeroconf_services) + + def remove_service(self, zc: Zeroconf, type_: str, name: str) -> None: + debug(f"Zeroconf service removed: {name}") + self._zeroconf_services.pop(name, None) + self.OnZeroconfUpdate(self._zeroconf_services) diff --git a/ltchiptool/gui/panels/flash.py b/ltchiptool/gui/panels/flash.py index 6ba84ff..5962ebf 100644 --- a/ltchiptool/gui/panels/flash.py +++ b/ltchiptool/gui/panels/flash.py @@ -11,9 +11,9 @@ from prettytable import PrettyTable from ltchiptool import Family, SocInterface +from ltchiptool.gui.mixin.devices import DevicesBase from ltchiptool.gui.utils import int_or_zero, on_event, with_target from ltchiptool.gui.work.flash import FlashThread -from ltchiptool.gui.work.ports import PortWatcher from ltchiptool.util.cli import list_serial_ports from ltchiptool.util.detection import Detection from ltchiptool.util.fileio import chname @@ -23,7 +23,7 @@ from .base import BasePanel -class FlashPanel(BasePanel): +class FlashPanel(BasePanel, DevicesBase): detection: Detection | None = None ports: list[tuple[str, bool, str]] prev_read_full: bool = None @@ -136,7 +136,7 @@ def SetSettings( def OnShow(self): super().OnShow() - self.StartWork(PortWatcher(self.OnPortsUpdated), freeze_ui=False) + self.StartDeviceWatcher() def OnUpdate(self, target: wx.Window = None): if self.chip_info: @@ -341,6 +341,7 @@ def port(self, value: str | None): self.Port.SetValue(description) self.DoUpdate(self.Port) return + self.DoUpdate(self.Port) self.delayed_port = value @property diff --git a/ltchiptool/gui/work/devices.py b/ltchiptool/gui/work/devices.py new file mode 100644 index 0000000..cfad486 --- /dev/null +++ b/ltchiptool/gui/work/devices.py @@ -0,0 +1,80 @@ +# Copyright (c) Kuba Szczodrzyński 2023-1-9. + +from logging import error +from time import sleep +from typing import Callable + +from ltchiptool.util.logging import verbose + +from .base import BaseThread + + +# Win32 part based on https://abdus.dev/posts/python-monitor-usb/ +class DeviceWatcher(BaseThread): + handlers: list[Callable[[], None]] = None + + def __init__(self): + super().__init__() + self.handlers = [] + + def _create_window(self): + import win32api + import win32gui + + wc = win32gui.WNDCLASS() + wc.lpfnWndProc = self._on_message + wc.lpszClassName = self.__class__.__name__ + wc.hInstance = win32api.GetModuleHandle(None) + class_atom = win32gui.RegisterClass(wc) + return win32gui.CreateWindow( + class_atom, self.__class__.__name__, 0, 0, 0, 0, 0, 0, 0, wc.hInstance, None + ) + + def _on_message(self, hwnd: int, msg: int, wparam: int, lparam: int): + from win32con import ( + DBT_DEVICEARRIVAL, + DBT_DEVICEREMOVECOMPLETE, + WM_DEVICECHANGE, + ) + + if msg != WM_DEVICECHANGE: + return 0 + if wparam not in [DBT_DEVICEARRIVAL, DBT_DEVICEREMOVECOMPLETE]: + return 0 + self.call_handlers() + return 0 + + def run_impl_win32(self): + """ + Listens to Win32 `WM_DEVICECHANGE` messages + and trigger a callback when a device has been plugged in or out + + See: https://docs.microsoft.com/en-us/windows/win32/devio/wm-devicechange + """ + import win32gui + + hwnd = self._create_window() + verbose(f"Created listener window with hwnd={hwnd:x}") + self.call_handlers() + verbose("Listening to messages") + while self.should_run(): + win32gui.PumpWaitingMessages() + sleep(0.5) + verbose("Listener stopped") + + def run_impl(self): + import platform + + match platform.system(): + case "Windows": + self.run_impl_win32() + case _: + verbose("Running dummy PortWatcher impl") + self.call_handlers() + + def call_handlers(self) -> None: + for handler in self.handlers: + try: + handler() + except Exception as e: + error("DeviceWatcher handler threw an exception", exc_info=e) diff --git a/ltchiptool/gui/work/ports.py b/ltchiptool/gui/work/ports.py index 65542cd..a71e823 100644 --- a/ltchiptool/gui/work/ports.py +++ b/ltchiptool/gui/work/ports.py @@ -1,79 +1,12 @@ # Copyright (c) Kuba Szczodrzyński 2023-1-9. -from time import sleep -from typing import Callable +from warnings import warn -from ltchiptool.util.cli import list_serial_ports -from ltchiptool.util.logging import verbose +warn( + "PortWatcher has been removed, please use DevicesBase instead", + stacklevel=2, +) -from .base import BaseThread - -# Win32 part based on https://abdus.dev/posts/python-monitor-usb/ -class PortWatcher(BaseThread): - def __init__(self, on_event: Callable[[list[tuple[str, bool, str]]], None]): - super().__init__() - self.on_event = on_event - - def _create_window(self): - """ - Create a window for listening to messages - https://docs.microsoft.com/en-us/windows/win32/learnwin32/creating-a-window#creating-the-window - - See also: https://docs.microsoft.com/en-us/windows/win32/api/winuser/nf-winuser-createwindoww - - :return: window hwnd - """ - import win32api - import win32gui - - wc = win32gui.WNDCLASS() - wc.lpfnWndProc = self._on_message - wc.lpszClassName = self.__class__.__name__ - wc.hInstance = win32api.GetModuleHandle(None) - class_atom = win32gui.RegisterClass(wc) - return win32gui.CreateWindow( - class_atom, self.__class__.__name__, 0, 0, 0, 0, 0, 0, 0, wc.hInstance, None - ) - - def _on_message(self, hwnd: int, msg: int, wparam: int, lparam: int): - from win32con import ( - DBT_DEVICEARRIVAL, - DBT_DEVICEREMOVECOMPLETE, - WM_DEVICECHANGE, - ) - - if msg != WM_DEVICECHANGE: - return 0 - if wparam not in [DBT_DEVICEARRIVAL, DBT_DEVICEREMOVECOMPLETE]: - return 0 - self.on_event(list_serial_ports()) - return 0 - - def run_impl_win32(self): - """ - Listens to Win32 `WM_DEVICECHANGE` messages - and trigger a callback when a device has been plugged in or out - - See: https://docs.microsoft.com/en-us/windows/win32/devio/wm-devicechange - """ - import win32gui - - hwnd = self._create_window() - verbose(f"Created listener window with hwnd={hwnd:x}") - self.on_event(list_serial_ports()) - verbose("Listening to messages") - while self.should_run(): - win32gui.PumpWaitingMessages() - sleep(0.5) - verbose("Listener stopped") - - def run_impl(self): - import platform - - match platform.system(): - case "Windows": - self.run_impl_win32() - case _: - verbose("Running dummy PortWatcher impl") - self.on_event(list_serial_ports()) +class PortWatcher: + pass