-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[gui] Add base device listener mixin, move Zeroconf to mixins
- Loading branch information
Showing
8 changed files
with
206 additions
and
133 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
# Copyright (c) Kuba Szczodrzyński 2023-11-28. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,60 +1,12 @@ | ||
# Copyright (c) Kuba Szczodrzyński 2023-8-31. | ||
# Copyright (c) Kuba Szczodrzyński 2023-11-28. | ||
|
||
from logging import debug, warning | ||
from warnings import warn | ||
|
||
from zeroconf import ServiceBrowser, ServiceInfo, ServiceListener, Zeroconf | ||
warn( | ||
"ltchiptool.gui.base.zc is deprecated, " "migrate to ltchiptool.gui.mixin.zc", | ||
stacklevel=2, | ||
) | ||
|
||
from ltchiptool.gui.main import MainFrame | ||
from ltchiptool.gui.mixin.zc import ZeroconfBase | ||
|
||
|
||
# noinspection PyPep8Naming | ||
class ZeroconfBase(ServiceListener): | ||
Main: MainFrame | ||
_zeroconf_browsers: dict[str, ServiceBrowser] = None | ||
_zeroconf_services: dict[str, ServiceInfo] = None | ||
|
||
def AddZeroconfBrowser(self, type_: str) -> None: | ||
if self._zeroconf_browsers is None: | ||
self._zeroconf_browsers = {} | ||
self._zeroconf_services = {} | ||
if not self.Main or not self.Main.Zeroconf: | ||
return | ||
if type_ in self._zeroconf_browsers: | ||
return | ||
self._zeroconf_browsers[type_] = ServiceBrowser(self.Main.Zeroconf, type_, self) | ||
|
||
def StopZeroconf(self) -> None: | ||
if self._zeroconf_browsers is None: | ||
return | ||
for sb in self._zeroconf_browsers.values(): | ||
sb.cancel() | ||
self._zeroconf_browsers.clear() | ||
self._zeroconf_services.clear() | ||
self.OnZeroconfUpdate(self._zeroconf_services) | ||
|
||
def OnZeroconfUpdate(self, services: dict[str, ServiceInfo]): | ||
pass | ||
|
||
def add_service(self, zc: Zeroconf, type_: str, name: str) -> None: | ||
debug(f"Zeroconf service added: {name}") | ||
info = zc.get_service_info(type_, name) | ||
if info: | ||
self._zeroconf_services[name] = info | ||
else: | ||
warning("Couldn't read service info") | ||
self.OnZeroconfUpdate(self._zeroconf_services) | ||
|
||
def update_service(self, zc: Zeroconf, type_: str, name: str) -> None: | ||
debug(f"Zeroconf service updated: {name}") | ||
info = zc.get_service_info(type_, name) | ||
if info: | ||
self._zeroconf_services[name] = info | ||
else: | ||
warning("Couldn't read service info") | ||
self._zeroconf_services.pop(name, None) | ||
self.OnZeroconfUpdate(self._zeroconf_services) | ||
|
||
def remove_service(self, zc: Zeroconf, type_: str, name: str) -> None: | ||
debug(f"Zeroconf service removed: {name}") | ||
self._zeroconf_services.pop(name, None) | ||
self.OnZeroconfUpdate(self._zeroconf_services) | ||
__deprecated__ = ZeroconfBase |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
# Copyright (c) Kuba Szczodrzyński 2023-11-28. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
# Copyright (c) Kuba Szczodrzyński 2023-11-28. | ||
|
||
from ltchiptool.gui.base.window import BaseWindow | ||
from ltchiptool.gui.main import MainFrame | ||
from ltchiptool.gui.work.devices import DeviceWatcher | ||
from ltchiptool.util.misc import list_serial_ports | ||
|
||
|
||
# noinspection PyPep8Naming | ||
class DevicesBase(BaseWindow): | ||
Main: MainFrame | ||
WATCHER: DeviceWatcher = None | ||
|
||
def StartDeviceWatcher(self) -> None: | ||
if not DevicesBase.WATCHER: | ||
watcher = DevicesBase.WATCHER = DeviceWatcher() | ||
watcher.on_stop = self.OnWatcherStopped | ||
watcher.start() | ||
else: | ||
watcher = DevicesBase.WATCHER | ||
watcher.handlers.append(self.OnDevicesUpdated) | ||
watcher.call_handlers() | ||
|
||
def StopDeviceWatcher(self) -> None: | ||
if not DevicesBase.WATCHER: | ||
return | ||
watcher = DevicesBase.WATCHER | ||
if self.OnDevicesUpdated in watcher.handlers: | ||
watcher.handlers.remove(self.OnDevicesUpdated) | ||
|
||
def OnClose(self): | ||
super().OnClose() | ||
if watcher := DevicesBase.WATCHER: | ||
watcher.stop() | ||
watcher.join() | ||
|
||
@staticmethod | ||
def OnWatcherStopped(*_) -> None: | ||
DevicesBase.WATCHER = None | ||
|
||
def OnDevicesUpdated(self) -> None: | ||
self.OnPortsUpdated(list_serial_ports()) | ||
|
||
def OnPortsUpdated(self, ports: list[tuple[str, bool, str]]) -> None: | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
# Copyright (c) Kuba Szczodrzyński 2023-8-31. | ||
|
||
from logging import debug, warning | ||
|
||
from zeroconf import ServiceBrowser, ServiceInfo, ServiceListener, Zeroconf | ||
|
||
from ltchiptool.gui.main import MainFrame | ||
|
||
|
||
# noinspection PyPep8Naming | ||
class ZeroconfBase(ServiceListener): | ||
Main: MainFrame | ||
_zeroconf_browsers: dict[str, ServiceBrowser] = None | ||
_zeroconf_services: dict[str, ServiceInfo] = None | ||
|
||
def AddZeroconfBrowser(self, type_: str) -> None: | ||
if self._zeroconf_browsers is None: | ||
self._zeroconf_browsers = {} | ||
self._zeroconf_services = {} | ||
if not self.Main or not self.Main.Zeroconf: | ||
return | ||
if type_ in self._zeroconf_browsers: | ||
return | ||
self._zeroconf_browsers[type_] = ServiceBrowser(self.Main.Zeroconf, type_, self) | ||
|
||
def StopZeroconf(self) -> None: | ||
if self._zeroconf_browsers is None: | ||
return | ||
for sb in self._zeroconf_browsers.values(): | ||
sb.cancel() | ||
self._zeroconf_browsers.clear() | ||
self._zeroconf_services.clear() | ||
self.OnZeroconfUpdate(self._zeroconf_services) | ||
|
||
def OnZeroconfUpdate(self, services: dict[str, ServiceInfo]): | ||
pass | ||
|
||
def add_service(self, zc: Zeroconf, type_: str, name: str) -> None: | ||
debug(f"Zeroconf service added: {name}") | ||
info = zc.get_service_info(type_, name) | ||
if info: | ||
self._zeroconf_services[name] = info | ||
else: | ||
warning("Couldn't read service info") | ||
self.OnZeroconfUpdate(self._zeroconf_services) | ||
|
||
def update_service(self, zc: Zeroconf, type_: str, name: str) -> None: | ||
debug(f"Zeroconf service updated: {name}") | ||
info = zc.get_service_info(type_, name) | ||
if info: | ||
self._zeroconf_services[name] = info | ||
else: | ||
warning("Couldn't read service info") | ||
self._zeroconf_services.pop(name, None) | ||
self.OnZeroconfUpdate(self._zeroconf_services) | ||
|
||
def remove_service(self, zc: Zeroconf, type_: str, name: str) -> None: | ||
debug(f"Zeroconf service removed: {name}") | ||
self._zeroconf_services.pop(name, None) | ||
self.OnZeroconfUpdate(self._zeroconf_services) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
# Copyright (c) Kuba Szczodrzyński 2023-1-9. | ||
|
||
from logging import error | ||
from time import sleep | ||
from typing import Callable | ||
|
||
from ltchiptool.util.logging import verbose | ||
|
||
from .base import BaseThread | ||
|
||
|
||
# Win32 part based on https://abdus.dev/posts/python-monitor-usb/ | ||
class DeviceWatcher(BaseThread): | ||
handlers: list[Callable[[], None]] = None | ||
|
||
def __init__(self): | ||
super().__init__() | ||
self.handlers = [] | ||
|
||
def _create_window(self): | ||
import win32api | ||
import win32gui | ||
|
||
wc = win32gui.WNDCLASS() | ||
wc.lpfnWndProc = self._on_message | ||
wc.lpszClassName = self.__class__.__name__ | ||
wc.hInstance = win32api.GetModuleHandle(None) | ||
class_atom = win32gui.RegisterClass(wc) | ||
return win32gui.CreateWindow( | ||
class_atom, self.__class__.__name__, 0, 0, 0, 0, 0, 0, 0, wc.hInstance, None | ||
) | ||
|
||
def _on_message(self, hwnd: int, msg: int, wparam: int, lparam: int): | ||
from win32con import ( | ||
DBT_DEVICEARRIVAL, | ||
DBT_DEVICEREMOVECOMPLETE, | ||
WM_DEVICECHANGE, | ||
) | ||
|
||
if msg != WM_DEVICECHANGE: | ||
return 0 | ||
if wparam not in [DBT_DEVICEARRIVAL, DBT_DEVICEREMOVECOMPLETE]: | ||
return 0 | ||
self.call_handlers() | ||
return 0 | ||
|
||
def run_impl_win32(self): | ||
""" | ||
Listens to Win32 `WM_DEVICECHANGE` messages | ||
and trigger a callback when a device has been plugged in or out | ||
See: https://docs.microsoft.com/en-us/windows/win32/devio/wm-devicechange | ||
""" | ||
import win32gui | ||
|
||
hwnd = self._create_window() | ||
verbose(f"Created listener window with hwnd={hwnd:x}") | ||
self.call_handlers() | ||
verbose("Listening to messages") | ||
while self.should_run(): | ||
win32gui.PumpWaitingMessages() | ||
sleep(0.5) | ||
verbose("Listener stopped") | ||
|
||
def run_impl(self): | ||
import platform | ||
|
||
match platform.system(): | ||
case "Windows": | ||
self.run_impl_win32() | ||
case _: | ||
verbose("Running dummy PortWatcher impl") | ||
self.call_handlers() | ||
|
||
def call_handlers(self) -> None: | ||
for handler in self.handlers: | ||
try: | ||
handler() | ||
except Exception as e: | ||
error("DeviceWatcher handler threw an exception", exc_info=e) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,79 +1,12 @@ | ||
# Copyright (c) Kuba Szczodrzyński 2023-1-9. | ||
|
||
from time import sleep | ||
from typing import Callable | ||
from warnings import warn | ||
|
||
from ltchiptool.util.cli import list_serial_ports | ||
from ltchiptool.util.logging import verbose | ||
warn( | ||
"PortWatcher has been removed, please use DevicesBase instead", | ||
stacklevel=2, | ||
) | ||
|
||
from .base import BaseThread | ||
|
||
|
||
# Win32 part based on https://abdus.dev/posts/python-monitor-usb/ | ||
class PortWatcher(BaseThread): | ||
def __init__(self, on_event: Callable[[list[tuple[str, bool, str]]], None]): | ||
super().__init__() | ||
self.on_event = on_event | ||
|
||
def _create_window(self): | ||
""" | ||
Create a window for listening to messages | ||
https://docs.microsoft.com/en-us/windows/win32/learnwin32/creating-a-window#creating-the-window | ||
See also: https://docs.microsoft.com/en-us/windows/win32/api/winuser/nf-winuser-createwindoww | ||
:return: window hwnd | ||
""" | ||
import win32api | ||
import win32gui | ||
|
||
wc = win32gui.WNDCLASS() | ||
wc.lpfnWndProc = self._on_message | ||
wc.lpszClassName = self.__class__.__name__ | ||
wc.hInstance = win32api.GetModuleHandle(None) | ||
class_atom = win32gui.RegisterClass(wc) | ||
return win32gui.CreateWindow( | ||
class_atom, self.__class__.__name__, 0, 0, 0, 0, 0, 0, 0, wc.hInstance, None | ||
) | ||
|
||
def _on_message(self, hwnd: int, msg: int, wparam: int, lparam: int): | ||
from win32con import ( | ||
DBT_DEVICEARRIVAL, | ||
DBT_DEVICEREMOVECOMPLETE, | ||
WM_DEVICECHANGE, | ||
) | ||
|
||
if msg != WM_DEVICECHANGE: | ||
return 0 | ||
if wparam not in [DBT_DEVICEARRIVAL, DBT_DEVICEREMOVECOMPLETE]: | ||
return 0 | ||
self.on_event(list_serial_ports()) | ||
return 0 | ||
|
||
def run_impl_win32(self): | ||
""" | ||
Listens to Win32 `WM_DEVICECHANGE` messages | ||
and trigger a callback when a device has been plugged in or out | ||
See: https://docs.microsoft.com/en-us/windows/win32/devio/wm-devicechange | ||
""" | ||
import win32gui | ||
|
||
hwnd = self._create_window() | ||
verbose(f"Created listener window with hwnd={hwnd:x}") | ||
self.on_event(list_serial_ports()) | ||
verbose("Listening to messages") | ||
while self.should_run(): | ||
win32gui.PumpWaitingMessages() | ||
sleep(0.5) | ||
verbose("Listener stopped") | ||
|
||
def run_impl(self): | ||
import platform | ||
|
||
match platform.system(): | ||
case "Windows": | ||
self.run_impl_win32() | ||
case _: | ||
verbose("Running dummy PortWatcher impl") | ||
self.on_event(list_serial_ports()) | ||
class PortWatcher: | ||
pass |