diff --git a/dist/PyMonCtl-0.0.10-py3-none-any.whl b/dist/PyMonCtl-0.0.10-py3-none-any.whl new file mode 100644 index 0000000..11ee42d Binary files /dev/null and b/dist/PyMonCtl-0.0.10-py3-none-any.whl differ diff --git a/dist/PyMonCtl-0.0.9-py3-none-any.whl b/dist/PyMonCtl-0.0.9-py3-none-any.whl deleted file mode 100644 index 0098754..0000000 Binary files a/dist/PyMonCtl-0.0.9-py3-none-any.whl and /dev/null differ diff --git a/src/ewmhlib/Props.py b/src/ewmhlib/Props.py index 801efde..3019b59 100644 --- a/src/ewmhlib/Props.py +++ b/src/ewmhlib/Props.py @@ -116,6 +116,7 @@ class MoveResize(IntEnum): class DataFormat(IntEnum): + # I guess 16 is not used in Python (no difference between short and long int) STR = 8 INT = 32 diff --git a/src/ewmhlib/__init__.py b/src/ewmhlib/__init__.py index efa8df7..6e83ec3 100644 --- a/src/ewmhlib/__init__.py +++ b/src/ewmhlib/__init__.py @@ -3,7 +3,7 @@ import sys assert sys.platform == "linux" -from ._ewmhlib import (getAllDisplaysInfo, getDisplayFromRoot, getDisplayFromWindow, +from ._ewmhlib import (displaysCount, getDisplaysNames, getDisplaysInfo, getDisplayFromRoot, getDisplayFromWindow, getProperty, getPropertyValue, changeProperty, sendMessage, _xlibGetAllWindows, defaultDisplay, defaultScreen, defaultRoot, RootWindow, defaultRootWindow, EwmhWindow ) @@ -11,7 +11,7 @@ import ewmhlib.Structs as Structs __all__ = [ - "version", "getAllDisplaysInfo", "getDisplayFromRoot", "getDisplayFromWindow", + "version", "displaysCount", "getDisplaysNames", "getDisplaysInfo", "getDisplayFromRoot", "getDisplayFromWindow", "getProperty", "getPropertyValue", "changeProperty", "sendMessage", "defaultDisplay", "defaultScreen", "defaultRoot", "defaultRootWindow", "RootWindow", "EwmhWindow" ] diff --git a/src/ewmhlib/_ewmhlib.py b/src/ewmhlib/_ewmhlib.py index f033c7d..782bf36 100644 --- a/src/ewmhlib/_ewmhlib.py +++ b/src/ewmhlib/_ewmhlib.py @@ -1,6 +1,7 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- from __future__ import annotations -import ctypes import sys assert sys.platform == "linux" @@ -23,9 +24,8 @@ import Xlib.xobject from Xlib.xobject.drawable import Window as XWindow -from .Props import * -from .Structs import * -from .Structs import _XWindowAttributes +from ewmhlib.Props import Root, DesktopLayout, Window, WindowType, State, StateAction, MoveResize, DataFormat, Mode, HintAction +from ewmhlib.Structs import DisplaysInfo, ScreensInfo, WmHints, Aspect, WmNormalHints, _XWindowAttributes defaultDisplay = Xlib.display.Display() @@ -33,7 +33,43 @@ defaultRoot = defaultScreen.root -def getAllDisplaysInfo() -> dict[str, DisplaysInfo]: +def _getDisplaysCount() -> int: + count = 0 + files: List[str] = os.listdir("/tmp/.X11-unix") + for d in files: + if d.startswith("X"): + name: str = ":" + d[1:] + try: + display: Xlib.display.Display = Xlib.display.Display(name) + display.close() + count += 1 + except: + pass + return count + + +displaysCount = _getDisplaysCount() + + +def getDisplaysNames() -> List[str]: + displays: List[str] = [] + if displaysCount > 1: + files: List[str] = os.listdir("/tmp/.X11-unix") + for d in files: + if d.startswith("X"): + name: str = ":" + d[1:] + try: + display: Xlib.display.Display = Xlib.display.Display(name) + display.close() + displays.append(name) + except: + pass + else: + displays.append(defaultDisplay.get_display_name()) + return displays + + +def getDisplaysInfo() -> dict[str, DisplaysInfo]: """ Gets relevant information on all present displays, including its screens and roots @@ -49,32 +85,29 @@ def getAllDisplaysInfo() -> dict[str, DisplaysInfo]: :return: dict with all displays, screens and roots info """ - displays: List[str] = os.listdir("/tmp/.X11-unix") dspInfo: dict[str, DisplaysInfo] = {} - for d in displays: - if d.startswith("X"): - name: str = ":" + d[1:] - display: Xlib.display.Display = Xlib.display.Display(name) - screens: List[ScreensInfo] = [] - for s in range(display.screen_count()): - try: - screen: Struct = display.screen(s) - screenInfo: ScreensInfo = { - "screen_number": str(s), - "is_default": (screen.root.id == defaultRoot.id), - "screen": screen, - "root": screen.root - } - screens.append(screenInfo) - except: - pass - displayInfo: DisplaysInfo = { - "name": name, - "is_default": (display.get_display_name() == defaultDisplay.get_display_name()), - "screens": screens - } - display.close() - dspInfo[name] = displayInfo + for name in getDisplaysNames(): + display = Xlib.display.Display(name) + screens: List[ScreensInfo] = [] + for s in range(display.screen_count()): + try: + screen: Struct = display.screen(s) + screenInfo: ScreensInfo = { + "screen_number": str(s), + "is_default": (screen.root.id == defaultRoot.id), + "screen": screen, + "root": screen.root + } + screens.append(screenInfo) + except: + pass + displayInfo: DisplaysInfo = { + "name": name, + "is_default": (display.get_display_name() == defaultDisplay.get_display_name()), + "screens": screens + } + display.close() + dspInfo[name] = displayInfo return dspInfo @@ -85,41 +118,28 @@ def getDisplayFromWindow(winId: int) -> Tuple[Xlib.display.Display, Struct, XWin :param winId: id of the window :return: tuple containing display connection, screen struct and root window """ - # res, attr = _XlibAttributes(winId) + # res, attr = _XGetAttributes(winId) # if res and hasattr(attr, "root"): # return getDisplayFromRoot(attr.root) # else: - displays: List[str] = os.listdir("/tmp/.X11-unix") - check = False - if len(displays) > 1: - check = True - elif len(displays) == 1: - name: str = ":" + displays[0][1:] - display: Xlib.display.Display = Xlib.display.Display(name) - if display.screen_count() > 1: + if displaysCount > 1 or defaultDisplay.screen_count() > 1: + for name in getDisplaysNames(): + display = Xlib.display.Display(name) + atom: int = display.get_atom(Root.CLIENT_LIST) + for s in range(display.screen_count()): + try: + scr: Struct = display.screen(s) + r: XWindow = scr.root + ret: Optional[Xlib.protocol.request.GetProperty] = r.get_full_property(atom, Xlib.X.AnyPropertyType) + if ret and hasattr(ret, "value") and winId in ret.value: + return display, scr, r + except: + pass display.close() - check = True - if check: - for i, d in enumerate(displays): - if d.startswith("X"): - name = ":" + d[1:] - display = Xlib.display.Display(name) - atom: int = display.get_atom(Root.CLIENT_LIST) - for s in range(display.screen_count()): - try: - scr: Struct = display.screen(s) - r: XWindow = scr.root - ret: Optional[Xlib.protocol.request.GetProperty] = r.get_full_property(atom, Xlib.X.AnyPropertyType) - if ret and hasattr(ret, "value"): - if winId in ret: - return display, scr, r - except: - pass - display.close() return defaultDisplay, defaultScreen, defaultRoot -def getDisplayFromRoot(rootId: int) -> Tuple[Xlib.display.Display, Struct, XWindow]: +def getDisplayFromRoot(rootId: Optional[int]) -> Tuple[Xlib.display.Display, Struct, XWindow]: """ Gets display connection, screen and root window from a given root id to which it belongs. For default root, this is not needed. Use defaultDisplay, defaultScreen and defaultRoot instead. @@ -127,34 +147,22 @@ def getDisplayFromRoot(rootId: int) -> Tuple[Xlib.display.Display, Struct, XWind :param rootId: id of the target root :return: tuple containing display connection, screen struct and root window """ - displays: List[str] = os.listdir("/tmp/.X11-unix") - check = False - if len(displays) > 1: - check = True - elif len(displays) == 1: - name: str = ":" + displays[0][1:] - display: Xlib.display.Display = Xlib.display.Display(name) - if display.screen_count() > 1: + if rootId and rootId != defaultRoot.id and (displaysCount > 1 or defaultDisplay.screen_count() > 1): + for name in getDisplaysNames(): + display = Xlib.display.Display(name) + for s in range(display.screen_count()): + try: + scr: Struct = display.screen(s) + r: XWindow = scr.root + if rootId == r.id: + return display, scr, r + except: + pass display.close() - check = True - if check: - for i, d in enumerate(displays): - if d.startswith("X"): - name = ":" + d[1:] - display = Xlib.display.Display(name) - for s in range(display.screen_count()): - try: - scr: Struct = display.screen(s) - r: XWindow = scr.root - if rootId == r.id: - return display, scr, r - except: - pass - display.close() return defaultDisplay, defaultScreen, defaultRoot -def getProperty(window: XWindow, prop: Union[str, int, Root, Window], +def getProperty(window: XWindow, prop: Union[str, int], prop_type: int = Xlib.X.AnyPropertyType, display: Xlib.display.Display = defaultDisplay) -> Optional[Xlib.protocol.request.GetProperty]: """ @@ -168,13 +176,12 @@ def getProperty(window: XWindow, prop: Union[str, int, Root, Window], """ if isinstance(prop, str): prop = display.get_atom(prop) - if isinstance(prop, int) and prop != 0: - return window.get_full_property(prop, prop_type, 10) + return window.get_full_property(prop, prop_type) return None -def changeProperty(window: XWindow, prop: Union[str, int, Root, Window], data: Union[List[int], str], +def changeProperty(window: XWindow, prop: Union[str, int], data: Union[List[int], str], prop_type: int = Xlib.Xatom.ATOM, propMode: int = Xlib.X.PropModeReplace, display: Xlib.display.Display = defaultDisplay): """ @@ -191,19 +198,18 @@ def changeProperty(window: XWindow, prop: Union[str, int, Root, Window], data: U prop = display.get_atom(prop) if isinstance(prop, int) and prop != 0: - # I think (to be confirmed) that 16 is not used in Python (no difference between short and long int) if isinstance(data, str): dataFormat: int = DataFormat.STR data = data.encode(encoding="utf-8") else: - data = (data + [0] * (5 - len(data)))[:5] dataFormat = DataFormat.INT + data = (data + [0] * (5 - len(data)))[:5] window.change_property(prop, prop_type, dataFormat, data, propMode) display.flush() -def sendMessage(winId: int, prop: Union[str, int, Root, Window], data: Union[List[int], str], +def sendMessage(winId: int, prop: Union[str, int], data: Union[List[int], str], display: Xlib.display.Display = defaultDisplay, root: XWindow = defaultRoot): """ Send Client Message to given window/root @@ -218,7 +224,6 @@ def sendMessage(winId: int, prop: Union[str, int, Root, Window], data: Union[Lis prop = display.get_atom(prop) if isinstance(prop, int) and prop != 0: - # I think (to be confirmed) that 16 is not used in Python (no difference between short and long int) if isinstance(data, str): dataFormat: int = DataFormat.STR else: @@ -242,7 +247,7 @@ def getPropertyValue(prop: Optional[Xlib.protocol.request.GetProperty], text: bo :param display: display to which window belongs to (defaults to default display) :return: extracted property data (as a list of integers or strings) or None """ - if prop is not None: + if prop and hasattr(prop, "value"): # Value is either bytes (separated by '\x00' when multiple values) or array.array of integers. # The type of array values is stored in array.typecode ('I' in this case). valueData: Union[array.array[int], bytes] = prop.value @@ -254,7 +259,7 @@ def getPropertyValue(prop: Optional[Xlib.protocol.request.GetProperty], text: bo resultStr = [display.get_atom_name(a) for a in valueData if isinstance(a, int) and a != 0] return resultStr else: - resultInt: List[int] = [a for a in valueData] + resultInt: List[int] = [a for a in valueData if isinstance(a, int)] return resultInt # Leaving this to detect if data has an unexpected type return [a for a in valueData] if isinstance(valueData, Iterable) else [valueData] @@ -290,20 +295,15 @@ class RootWindow: WM_PROTOCOLS messages (PING/SYNC) are accessible using wmProtocols subclass (RootWindow.wmProtocols.Ping/Sync) """ - def __init__(self, root: Optional[XWindow] = None): + def __init__(self, root: Optional[Union[XWindow, int]] = None): - if root and root.id != defaultRoot.id: - self.display, self.screen, self.root = getDisplayFromRoot(root.id) - else: - self.display = defaultDisplay - self.screen = defaultScreen - self.root = defaultRoot + if root and isinstance(root, XWindow): + root = root.id + self.display, self.screen, self.root = getDisplayFromRoot(root) self.id: int = self.root.id self.wmProtocols = self._WmProtocols(self.display, self.root) - def getProperty(self, prop: Union[str, int, Root, Window], - prop_type: int = Xlib.X.AnyPropertyType) \ - -> Optional[Xlib.protocol.request.GetProperty]: + def getProperty(self, prop: Union[str, int], prop_type: int = Xlib.X.AnyPropertyType) -> Optional[Xlib.protocol.request.GetProperty]: """ Retrieves given property from root @@ -315,7 +315,7 @@ def getProperty(self, prop: Union[str, int, Root, Window], prop = self.display.get_atom(prop) return getProperty(self.root, prop, prop_type, self.display) - def setProperty(self, prop: Union[str, int, Root, Window], data: Union[List[int], str]): + def setProperty(self, prop: Union[str, int], data: Union[List[int], str]): """ Sets the given property for root @@ -326,7 +326,7 @@ def setProperty(self, prop: Union[str, int, Root, Window], data: Union[List[int] """ sendMessage(self.root.id, prop, data, self.display, self.root) - def sendMessage(self, winId: int, prop: Union[str, int, Root, Window], data: Union[List[int], str]): + def sendMessage(self, winId: int, prop: Union[str, int], data: Union[List[int], str]): """ Sends a ClientMessage event to given window @@ -422,9 +422,7 @@ def getDesktopGeometry(self) -> Optional[List[int]]: :return: tuple of integers (width, height) or None if it couldn't be retrieved """ ret: Optional[Xlib.protocol.request.GetProperty] = self.getProperty(Root.DESKTOP_GEOMETRY) - print("RET", ret) res: Optional[Union[List[int], List[str]]] = getPropertyValue(ret, display=self.display) - print("RES", res) if res is not None: res = cast(List[int], res) return res @@ -483,7 +481,7 @@ def getCurrentDesktop(self) -> Optional[int]: This MUST be set and updated by the Window Manager. If a Pager wants to switch to another virtual desktop, it MUST send a _NET_CURRENT_DESKTOP client message to the root window - :return: index of current desktop in int format or None if couldn't be retrieved + :return: index of current desktop in int format or None if it couldn't be retrieved """ ret: Optional[Xlib.protocol.request.GetProperty] = self.getProperty(Root.CURRENT_DESKTOP) res: Optional[Union[List[int], List[str]]] = getPropertyValue(ret, display=self.display) @@ -1003,22 +1001,22 @@ class EwmhWindow: available using extensions subclass (EwmhWindow.extensions.*) """ - def __init__(self, winId: int, root: XWindow = defaultRoot): + def __init__(self, window: Union[int, XWindow]): - self.root = root - if root.id != defaultRoot.id: - self.display, self.screen, _ = getDisplayFromRoot(root.id) + if isinstance(window, XWindow): + self.id: int = window.id + self.display, self.screen, self.root = getDisplayFromWindow(self.id) + self.xWindow: XWindow = window else: - self.display = defaultDisplay - self.screen = defaultScreen + self.id = window + self.display, self.screen, self.root = getDisplayFromWindow(self.id) + self.xWindow = self.display.create_resource_object('window', self.id) self.rootWindow: RootWindow = defaultRootWindow if self.root.id == defaultRoot.id else RootWindow(self.root) - self.xWindow: XWindow = self.display.create_resource_object('window', winId) - self.id: int = winId - self.extensions = _Extensions(winId, self.display, self.root) + self.extensions = _Extensions(window, self.display, self.root) self._currDesktop = os.environ['XDG_CURRENT_DESKTOP'].lower() - def getProperty(self, prop: Union[str, int, Root, Window], prop_type: int = Xlib.X.AnyPropertyType) \ + def getProperty(self, prop: Union[str, int], prop_type: int = Xlib.X.AnyPropertyType) \ -> Optional[Xlib.protocol.request.GetProperty]: """ Retrieves given property data from given window @@ -1031,7 +1029,7 @@ def getProperty(self, prop: Union[str, int, Root, Window], prop_type: int = Xlib prop = self.display.get_atom(prop) return getProperty(self.xWindow, prop, prop_type, self.display) - def sendMessage(self, prop: Union[str, int, Root, Window], data: Union[List[int], str]): + def sendMessage(self, prop: Union[str, int], data: Union[List[int], str]): """ Sends a ClientMessage event to current window @@ -1040,7 +1038,7 @@ def sendMessage(self, prop: Union[str, int, Root, Window], data: Union[List[int] """ return sendMessage(self.id, prop, data) - def changeProperty(self, prop: Union[str, int, Root, Window], data: Union[List[int], str], + def changeProperty(self, prop: Union[str, int], data: Union[List[int], str], prop_type: int = Xlib.Xatom.ATOM, propMode: Mode = Mode.REPLACE): """ Sets given property for the current window. The property might be ignored by the Window Manager, but returned @@ -1839,7 +1837,7 @@ def setMoveResize(self, gravity: int = 0, x: Optional[int] = None, y: Optional[i gravity_flags = gravity_flags | (1 << 12) else: gravity_flags = gravity_flags | (1 << 13) - self.sendMessage(Root.MOVERESIZE, [gravity_flags, x, y, width, height]) + self.sendMessage(Window.MOVERESIZE, [gravity_flags, x, y, width, height]) def setWmMoveResize(self, x_root: int, y_root: int, orientation: Union[int, MoveResize], button: int, userAction: bool = True): """ @@ -2556,30 +2554,11 @@ def stop(self): Start a new watchdog using start() again. """ if self._threadStarted and self._checkThread is not None: - timer = threading.Timer(self._interval * 2, self._forceStop) - timer.start() self._threadStarted = False self._stopRequested = True self._keep.set() - self._checkThread.join() + self._checkThread.join(1) self._checkThread = None - timer.cancel() - - def _getTid(self): - if self._checkThread and self._checkThread.is_alive(): - if hasattr(self._checkThread, '_thread_id'): - return self._checkThread._thread_id - for id, thread in threading._active.items(): - if thread is self._checkThread: - return id - return None - - def _forceStop(self): - thread_id = self._getTid() - if thread_id is not None: - res = ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, ctypes.py_object(SystemExit)) - if res > 1: - ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, 0) def _getWindowParent(win: XWindow, rootId: int) -> int: @@ -2750,14 +2729,14 @@ def _closeTransient(transientWindow: EwmhWindow): transientWindow.setClosed() -_xlib: Optional[Union[CDLL, int]] = None -_xcomp: Optional[Union[CDLL, int]] = None +_xlib: Optional[Union[CDLL, int]] = -1 +_xcomp: Optional[Union[CDLL, int]] = -1 -def _loadX11Library() -> Optional[Union[CDLL, int]]: +def _loadX11Library() -> Optional[CDLL]: global _xlib - if _xlib is None: - lib: Union[CDLL, int] = -1 + if isinstance(_xlib, int): + lib: Optional[CDLL] = None try: libPath: Optional[str] = find_library('X11') if libPath: @@ -2768,10 +2747,10 @@ def _loadX11Library() -> Optional[Union[CDLL, int]]: return _xlib -def _loadXcompLibrary() -> Optional[Union[CDLL, int]]: +def _loadXcompLibrary() -> Optional[CDLL]: global _xcomp - if _xcomp is None: - lib: Union[CDLL, int] = -1 + if isinstance(_xcomp, int): + lib: Optional[CDLL] = None try: libPath: Optional[str] = find_library('Xcomposite') if libPath: @@ -2783,22 +2762,45 @@ def _loadXcompLibrary() -> Optional[Union[CDLL, int]]: def _XGetAttributes(winId: int, dpyName: str = "") -> Tuple[bool, _XWindowAttributes]: - resOK: bool = False + """ + int x, y; /* location of window */ + int width, height; /* width and height of window */ + int border_width; /* border width of window */ + int depth; /* depth of window */ + Visual *visual; /* the associated visual structure */ + Window root; /* root of screen containing window */ + int class; /* InputOutput, InputOnly*/ + int bit_gravity; /* one of the bit gravity values */ + int win_gravity; /* one of the window gravity values */ + int backing_store; /* NotUseful, WhenMapped, Always */ + unsigned long backing_planes; /* planes to be preserved if possible */ + unsigned long backing_pixel; /* value to be used when restoring planes */ + Bool save_under; /* boolean, should bits under be saved? */ + Colormap colormap; /* color map to be associated with window */ + Bool map_installed; /* boolean, is color map currently installed*/ + int map_state; /* IsUnmapped, IsUnviewable, IsViewable */ + long all_event_masks; /* set of events all people have interest in*/ + long your_event_mask; /* my event mask */ + long do_not_propagate_mask; /* set of events that should not propagate */ + Bool override_redirect; /* boolean value for override-redirect */ + Screen *screen; /* back pointer to correct screen */ + """ + res: bool = False attr: _XWindowAttributes = _XWindowAttributes() - xlib: Optional[Union[CDLL, int]] = _loadX11Library() + xlib: Optional[CDLL] = _loadX11Library() - if isinstance(xlib, CDLL): + if xlib: try: if not dpyName: dpyName = defaultDisplay.get_display_name() dpy: int = xlib.XOpenDisplay(dpyName.encode()) xlib.XGetWindowAttributes(dpy, winId, byref(attr)) xlib.XCloseDisplay(dpy) - resOK = True + res = True except: pass - return resOK, attr + return res, attr # Leaving this as reference of using X11 library # https://github.com/evocount/display-management/blob/c4f58f6653f3457396e44b8c6dc97636b18e8d8a/displaymanagement/rotation.py diff --git a/src/pymonctl/__init__.py b/src/pymonctl/__init__.py index b044303..5c8d383 100644 --- a/src/pymonctl/__init__.py +++ b/src/pymonctl/__init__.py @@ -2,23 +2,24 @@ # -*- coding: utf-8 -*- from __future__ import annotations -import ctypes import sys import threading from abc import abstractmethod, ABC from collections.abc import Callable from typing import List, Optional, Union, Tuple -from .structs import DisplayMode, ScreenValue, Size, Point, Box, Rect, Position, Orientation +from pymonctl.structs import DisplayMode, ScreenValue, Size, Point, Box, Rect, Position, Orientation __all__ = [ "getAllMonitors", "getPrimary", "findMonitor", "findMonitorInfo", "arrangeMonitors", - "enableUpdate", "disableUpdate", "isUpdateEnabled", "updateInterval", + "enableUpdateInfo", "disableUpdateInfo", "isUpdateInfoEnabled", "isWatchdogEnabled", "updateWatchdogInterval", + "plugListenerRegister", "plugListenerUnregister", "isPlugListenerRegistered", + "changeListenerRegister", "changeListenerUnregister", "isChangeListenerRegistered", "DisplayMode", "ScreenValue", "Size", "Point", "Box", "Rect", "Position", "Orientation", "getMousePos", "version", "Monitor" ] -__version__ = "0.0.9" +__version__ = "0.0.10" def version(numberOnly: bool = True) -> str: @@ -503,18 +504,20 @@ def attach(self): """ Attach a previously detached monitor to system - WARNING: not working in Linux nor macOS (... yet?) + WARNING: not working in macOS (... yet?) """ raise NotImplementedError @abstractmethod def detach(self, permanent: bool = False): """ - Detach monitor from system + Detach monitor from system. + + Be aware that if you detach a monitor and the script ends, you will have to physically re-attach the monitor. It will not likely work if system has just one monitor plugged. - WARNING: not working in Linux nor macOS (... yet?) + WARNING: not working in macOS (... yet?) """ raise NotImplementedError @@ -527,48 +530,54 @@ def isAttached(self) -> Optional[bool]: raise NotImplementedError +_updateRequested = False +_plugListeners: List[Callable[[List[str], dict[str, ScreenValue]], None]] = [] +_lockPlug = threading.RLock() +_changeListeners: List[Callable[[List[str], dict[str, ScreenValue]], None]] = [] +_lockChange = threading.RLock() +_kill = threading.Event() + + class _UpdateScreens(threading.Thread): - def __init__(self, interval: float = 0.3, - monitorCountChanged: Optional[Callable[[List[str], dict[str, ScreenValue]], None]] = None, - monitorPropsChanged: Optional[Callable[[List[str], dict[str, ScreenValue]], None]] = None): + def __init__(self, kill: threading.Event()): threading.Thread.__init__(self) - self._kill = threading.Event() - self._interval = interval - self._monitorCountChanged = monitorCountChanged - self._monitorPropsChanged = monitorPropsChanged + self._kill = kill + self._interval = 0.5 self._screens: dict[str, ScreenValue] = _getAllMonitorsDict() self._monitors: list[Monitor] = [] - self._count = _getMonitorsCount() def run(self): # _eventLoop(self._kill, self._interval) + global _updateRequested + global _plugListeners + global _changeListeners + while not self._kill.is_set(): - screens = _getAllMonitorsDict() - currentScreens = list(self._screens.keys()) + if _updateRequested or _plugListeners or _changeListeners: - if self._monitorCountChanged is not None: + screens = _getAllMonitorsDict() newScreens = list(screens.keys()) - countNewScreens = len(newScreens) - if self._count != countNewScreens: - self._count = countNewScreens - names = [s for s in newScreens if s not in currentScreens] + [s for s in currentScreens if s not in newScreens] - self._monitorCountChanged(names, screens) + currentScreens = list(self._screens.keys()) + + if currentScreens != newScreens: + names = [s for s in newScreens if s not in currentScreens] + \ + [s for s in currentScreens if s not in newScreens] + for listener in _plugListeners: + listener(names, screens) - if self._monitorPropsChanged is not None: if self._screens != screens: - names = [] - for s in screens.keys(): - if s in currentScreens: - if screens[s] != self._screens[s]: - names.append(s) - self._monitorPropsChanged(names, screens) - self._screens = screens - self._monitors = _getAllMonitors() + names = [s for s in newScreens if s in currentScreens and screens[s] != self._screens[s]] + self._screens = screens + if names: + for listener in _changeListeners: + listener(names, screens) + + self._monitors = _getAllMonitors() self._kill.wait(self._interval) @@ -581,32 +590,12 @@ def getScreens(self) -> dict[str, ScreenValue]: def getMonitors(self) -> list[Monitor]: return self._monitors - def kill(self): - self._kill.set() - - def forceKill(self): - # https://code.activestate.com/recipes/496960-thread2-killable-threads/ - try: - tid = self.native_id - if not tid: - for objid, tobj in threading._active.items(): - if tobj is self: - tid = objid - except: - tid = None - if tid: - res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), ctypes.py_object(SystemExit)) - if res > 1: - # ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), 0) - ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), None) - _updateScreens: Optional[_UpdateScreens] = None +_lockUpdate = threading.RLock() -def enableUpdate(interval: float = 1.0, - monitorCountChanged: Optional[Callable[[List[str], dict[str, ScreenValue]], None]] = None, - monitorPropsChanged: Optional[Callable[[List[str], dict[str, ScreenValue]], None]] = None): +def enableUpdateInfo(): """ Enable this only if you need to keep track of monitor-related events like changing its resolution, position, or if monitors can be dynamically plugged or unplugged in a multi-monitor setup. This function can also be @@ -618,119 +607,242 @@ def enableUpdate(interval: float = 1.0, If disabled, the information on the monitors connected to the system will be updated right at the moment, but this might be slow and CPU-consuming, especially if quickly and repeatedly invoked. + """ + global _updateRequested + _updateRequested = True + if _updateScreens is None: + _startUpdateScreens() + + +def disableUpdateInfo(): + """ + The monitors information will be immediately queried after disabling this feature, not taking advantage of + keeping information updated on a separate thread. - It is also possible to define callbacks to be notified in case the number of connected monitors or their - properties change. The information passed to the callbacks is: + Enable this process again, or invoke getMonitors() function if you need updated info. + """ + global _updateRequested + _updateRequested = False + if not _plugListeners and not _changeListeners and not _updateRequested: + _killUpdateScreens() + + +def plugListenerRegister(monitorCountChanged: Callable[[List[str], dict[str, ScreenValue]], None]): + """ + Use this only if you need to keep track of monitor that can be dynamically plugged or unplugged in a + multi-monitor setup. + + The registered callbacks will be invoked in case the number of connected monitors change. + The information passed to the callbacks is: - Names of the screens which have changed (as a list of strings). - All screens info, as returned by getAllMonitorsDict() function. - It is possible to access all their properties by using screen name as dictionary key + It is possible to access all monitors information by using screen name as dictionary key - :param interval: Wait interval for the thread loop in seconds (or fractions). Adapt to your needs. Defaults to 1.0. - Higher values will take longer to detect and notify changes. - Lower values will consume more CPU, and will produce fake, non-final notifications (triggered by intermediate states). :param monitorCountChanged: callback to be invoked in case the number of monitor connected changes - :param monitorPropsChanged: callback to be invoked in case the properties of connected monitors change """ - global _updateScreens + global _plugListeners + global _lockPlug + with _lockPlug: + if monitorCountChanged not in _plugListeners: + _plugListeners.append(monitorCountChanged) if _updateScreens is None: - _updateScreens = _UpdateScreens(interval, monitorCountChanged, monitorPropsChanged) - _updateScreens.daemon = True - _updateScreens.start() + _startUpdateScreens() -def disableUpdate(): +def plugListenerUnregister(monitorCountChanged: Callable[[List[str], dict[str, ScreenValue]], None]): + """ + Use this function to un-register your custom callback. The callback will not be invoked anymore in case + the number of monitor changes. + + :param monitorCountChanged: callback previously registered """ - Stop and kill thread. The monitors information will be immediately queried after disabling this process, - not taking advantage of keeping information updated on a separate thread. + global _plugListeners + global _lockPlug + with _lockPlug: + try: + objIndex = _plugListeners.index(monitorCountChanged) + _plugListeners.pop(objIndex) + except: + pass + if not _plugListeners and not _changeListeners and not _updateRequested: + _killUpdateScreens() - Besides, the callbacks provided (if any) will not be invoked anymore, even though monitors change. - Enable this process again, or invoke getMonitors() function if you need updated info. +def changeListenerRegister(monitorPropsChanged: Callable[[List[str], dict[str, ScreenValue]], None]): """ - global _updateScreens - if _updateScreens is not None: - timer = threading.Timer(_updateScreens._interval * 2, _forceStop) - timer.start() - _updateScreens.kill() - _updateScreens.join() - _updateScreens = None - timer.cancel() + Use this only if you need to keep track of monitor properties changes (position, size, refresh-rate, etc.) in a + multi-monitor setup. + + The registered callbacks will be invoked in case these properties change. + The information passed to the callbacks is: + + - Names of the screens which have changed (as a list of strings). + - All screens info, as returned by getAllMonitorsDict() function. + It is possible to access all monitor information by using screen name as dictionary key -def _forceStop(): - if _updateScreens is not None: + :param monitorPropsChanged: callback to be invoked in case the number of monitor properties change + """ + global _changeListeners + global _lockChange + with _lockChange: + if monitorPropsChanged not in _changeListeners: + _changeListeners.append(monitorPropsChanged) + if _updateScreens is None: + _startUpdateScreens() + + +def changeListenerUnregister(monitorPropsChanged: Callable[[List[str], dict[str, ScreenValue]], None]): + """ + Use this function to un-register your custom callback. The callback will not be invoked anymore in case + the monitor properties change. + + :param monitorPropsChanged: callback previously registered + """ + global _changeListeners + global _lockChange + with _lockChange: try: - _updateScreens.forceKill() + objIndex = _plugListeners.index(monitorPropsChanged) + _plugListeners.pop(objIndex) except: pass + if not _plugListeners and not _changeListeners and not _updateRequested: + _killUpdateScreens() + + +def _startUpdateScreens(): + global _updateScreens + global _lockUpdate + with _lockUpdate: + if _updateScreens is None: + _kill.clear() + _updateScreens = _UpdateScreens(_kill) + _updateScreens.daemon = True + _updateScreens.start() + + +def _killUpdateScreens(): + global _updateScreens + global _lockUpdate + global _kill + with _lockUpdate: + if _updateScreens is not None: + timer = threading.Timer(_updateScreens._interval * 2, _timerHandler) + timer.start() + try: + _kill.set() + _updateScreens.join(_updateScreens._interval * 3) + except: + pass + _updateScreens = None + timer.cancel() + + +class _TimeOutException(Exception): + pass + + +def _timerHandler(): + global _updateScreens + raise _TimeOutException() -def isUpdateEnabled() -> bool: +def isWatchdogEnabled() -> bool: """ - Get monitors watch process status (enabled / disabled) + Check if the daemon updating screens information and (if applies) invoking callbacks when needed is alive. - :return: Returns ''True'' if enabled. + If it is not, just enable update process, or register the callbacks you need. It will be automatically started. + + :return: Return ''True'' is process (thread) is alive """ global _updateScreens return bool(_updateScreens is not None) -def updateInterval(interval: float): +def isUpdateInfoEnabled() -> bool: """ - Change the wait interval for the thread loop in seconds (or fractions) + Get monitors watch process status (enabled / disabled). + + :return: Returns ''True'' if enabled. + """ + global _updateRequested + return _updateRequested + + +def isPlugListenerRegistered(monitorCountChanged: Callable[[List[str], dict[str, ScreenValue]], None]): + """ + Check if callback is already registered to be invoked when monitor plugged count change + + :return: Returns ''True'' if registered + """ + global _plugListeners + return monitorCountChanged in _plugListeners + + +def isChangeListenerRegistered(monitorPropsChanged: Callable[[List[str], dict[str, ScreenValue]], None]): + """ + Check if callback is already registered to be invoked when monitor properties change + + :return: Returns ''True'' if registered + """ + global _changeListeners + return monitorPropsChanged in _changeListeners + + +def updateWatchdogInterval(interval: float): + """ + Change the wait interval for the thread loop in seconds (or fractions), Default is 0.50 seconds. + Higher values will take longer to detect and notify changes. - Lower values will consume more CPU - :param interval: new interval value as float + Lower values will make it faster, but will consume more CPU. + + Also bear in mind that the OS will take some time to refresh changes, so lowering the update interval + may not necessarily produce better (faster) results. + + :param interval: new interval value in seconds (or fractions), as float. """ global _updateScreens if interval > 0 and _updateScreens is not None: _updateScreens.updateInterval(interval) -def _getRelativePosition(monitor, relativeTo) -> Tuple[int, int, str]: +def _getRelativePosition(monitor, relativeTo) -> Tuple[int, int]: + # TODO: Won't accept negative values!!!! MUST modify the other monitors coords... + # https://superuser.com/questions/485120/how-do-i-align-the-bottom-edges-of-two-monitors-with-xrandr relPos = monitor["relativePos"] if relPos == Position.PRIMARY: x = y = 0 - cmd = "" elif relPos == Position.LEFT_TOP: x = relativeTo["position"].x - monitor["size"].width y = relativeTo["position"].y - cmd = " --left-of %s" elif relPos == Position.LEFT_BOTTOM: x = relativeTo["position"].x - monitor["size"].width y = relativeTo["position"].y + relativeTo["size"].height - monitor["size"].height - cmd = " --left-of %s" elif relPos == Position.ABOVE_LEFT: x = relativeTo["position"].x y = relativeTo["position"].y - monitor["size"].height - cmd = " --above %s" elif relPos == Position.ABOVE_RIGHT: x = relativeTo["position"].x + relativeTo["size"].width - monitor["size"].width y = relativeTo["position"].y - monitor["size"].height - cmd = " --above %s" elif relPos == Position.RIGHT_TOP: x = relativeTo["position"].x + relativeTo["size"].width y = relativeTo["position"].y - cmd = " --right-of %s" elif relPos == Position.RIGHT_BOTTOM: x = relativeTo["position"].x + relativeTo["size"].width y = relativeTo["position"].y + relativeTo["size"].height - monitor["size"].height - cmd = " --right-of %s" elif relPos == Position.BELOW_LEFT: x = relativeTo["position"].x y = relativeTo["position"].y + relativeTo["size"].height - cmd = " --below %s" elif relPos == Position.BELOW_RIGHT: x = relativeTo["position"].x + relativeTo["size"].width - monitor["size"].width y = relativeTo["position"].y + relativeTo["size"].height - cmd = " --below %s" else: x = y = monitor["position"] - cmd = "" - return x, y, cmd + return x, y if sys.platform == "darwin": diff --git a/src/pymonctl/_pymonctl_linux.py b/src/pymonctl/_pymonctl_linux.py index 7a28ae8..0dde269 100644 --- a/src/pymonctl/_pymonctl_linux.py +++ b/src/pymonctl/_pymonctl_linux.py @@ -2,27 +2,30 @@ # -*- coding: utf-8 -*- from __future__ import annotations -import subprocess import sys -import threading -import time assert sys.platform == "linux" +import subprocess +import threading +import time + import math -import os from typing import Optional, List, Union, cast, Tuple import Xlib.display import Xlib.X +import Xlib.protocol from Xlib.ext import randr from pymonctl import BaseMonitor, _pointInBox, _getRelativePosition, \ DisplayMode, ScreenValue, Box, Rect, Point, Size, Position, Orientation -from ewmhlib import defaultRootWindow, getProperty, getPropertyValue -from ewmhlib.Props import * +from ewmhlib import displaysCount, getDisplaysNames, defaultDisplay, defaultRoot, defaultScreen, defaultRootWindow, \ + getProperty, getPropertyValue +from ewmhlib.Props import Root +# Check if randr extension is available if not defaultRootWindow.display.has_extension('RANDR'): sys.stderr.write('{}: server does not have the RANDR extension\n'.format(sys.argv[0])) ext = defaultRootWindow.display.query_extension('RANDR') @@ -31,39 +34,52 @@ if ext is None: sys.exit(1) +# Check if Xorg is running (this all will not work on Wayland or alike) +p = subprocess.Popen(["xset", "-q"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) +p.communicate() +if p.returncode != 0: + sys.stderr.write('{}: Xorg is not available\n'.format(sys.argv[0])) + sys.exit(1) + def _XgetDisplays() -> List[Xlib.display.Display]: displays: List[Xlib.display.Display] = [] - try: - files = os.listdir("/tmp/.X11-unix") - except: - files = [] - for f in files: - if f.startswith("X"): - displays.append(Xlib.display.Display(":"+f[1:])) + if displaysCount > 1 or defaultDisplay.screen_count() > 1: + for name in getDisplaysNames(): + try: + displays.append(Xlib.display.Display(name)) + except: + pass if not displays: - displays = [Xlib.display.Display()] + displays = [defaultDisplay] return displays _displays = _XgetDisplays() def _XgetRoots(): roots = [] - global _displays - for display in _displays: - for i in range(display.screen_count()): - try: - screen = display.screen(i) - roots.append([display, screen, screen.root]) - except: - pass + if displaysCount > 1 or defaultDisplay.screen_count() > 1: + global _displays + for display in _displays: + for i in range(display.screen_count()): + try: + screen = display.screen(i) + res = screen.root.xrandr_get_screen_resources() + roots.append([display, screen, screen.root, res]) + except: + pass + if not roots: + res = defaultRoot.xrandr_get_screen_resources() + roots.append([defaultDisplay, defaultScreen, defaultRoot, res]) return roots _roots = _XgetRoots() -def _getAllMonitors() -> list[LinuxMonitor]: +def _getAllMonitors(outputs=None) -> list[LinuxMonitor]: monitors = [] - for outputData in _XgetAllOutputs(): + if not outputs: + outputs = _XgetAllOutputs() + for outputData in outputs: display, screen, root, res, output, outputInfo = outputData if outputInfo.crtc: monitors.append(LinuxMonitor(output)) @@ -85,13 +101,13 @@ def _getAllMonitorsDict() -> dict[str, ScreenValue]: if outputInfo.name == monitorName and outputInfo.crtc: crtcInfo = randr.get_crtc_info(display, outputInfo.crtc, res.config_timestamp) - is_primary = monitor.primary == 1 or (monitor.x == 0 and monitor.y == 0) + is_primary = monitor.primary == 1 x, y, w, h = monitor.x, monitor.y, monitor.width_in_pixels, monitor.height_in_pixels # https://askubuntu.com/questions/1124149/how-to-get-taskbar-size-and-position-with-python wa: List[int] = getPropertyValue(getProperty(window=root, prop=Root.WORKAREA, display=display), display=display) wx, wy, wr, wb = wa[0], wa[1], wa[2], wa[3] - dpiX, dpiY = round((w * 25.4) / monitor.width_in_millimeters), round((h * 25.4) / monitor.height_in_millimeters) - scaleX, scaleY = round((dpiX / 96) * 100), round((dpiY / 96) * 100) + dpiX, dpiY = round((w * 25.4) / (monitor.width_in_millimeters or 1)), round((h * 25.4) / (monitor.height_in_millimeters or 1)) + scaleX, scaleY = _scale(monitorName) rot = int(math.log(crtcInfo.rotation, 2)) freq = 0.0 for mode in res.modes: @@ -102,7 +118,7 @@ def _getAllMonitorsDict() -> dict[str, ScreenValue]: result[outputInfo.name] = { "system_name": outputInfo.name, - 'handle': output, + 'id': output, 'is_primary': is_primary, 'position': Point(x, y), 'size': Size(w, h), @@ -121,7 +137,7 @@ def _getMonitorsCount() -> int: count = 0 global _roots for rootData in _roots: - display, screen, root = rootData + display, screen, root, res = rootData count += len(randr.get_monitors(root).monitors) return count @@ -156,8 +172,51 @@ def _arrangeMonitors(arrangement: dict[str, dict[str, Union[str, int, Position, if not primaryPresent: return + newArrangement: dict[str, dict[str, Union[int, bool]]] = {} + newPos: dict[str, dict[str, int]] = {} + xOffset = yOffset = 0 for monName in arrangement.keys(): - _setPosition(cast(Position, arrangement[monName]["relativePos"]), str(arrangement[monName]["relativeTo"]), monName) + arrInfo = arrangement[monName] + + targetMonInfo = monitors[monName]["monitor"] + relativePos = arrInfo["relativePos"] + relativeTo = arrInfo["relativeTo"] + + targetMon = {"relativePos": relativePos, "relativeTo": relativeTo, + "position": Point(targetMonInfo.x, targetMonInfo.y), + "size": Size(targetMonInfo.width_in_pixels, targetMonInfo.height_in_pixels)} + + if relativePos == Position.PRIMARY: + x, y = 0, 0 + + else: + relMonInfo = monitors[relativeTo]["monitor"] + if relativeTo in newPos.keys(): + relX, relY = newPos[relativeTo]["x"], newPos[relativeTo]["y"] + else: + relX, relY = relMonInfo.x, relMonInfo.y + relMon = {"position": Point(relX, relY), + "size": Size(relMonInfo.width_in_pixels, relMonInfo.height_in_pixels)} + + x, y = _getRelativePosition(targetMon, relMon) + if x < 0: + xOffset += abs(x) + if y < 0: + yOffset += abs(y) + newPos[monName] = {"x": x, "y": y} + + newArrangement[monName] = { + "setPrimary": relativePos == Position.PRIMARY, + "x": x, + "y": y + } + + if newArrangement: + cmd = _buildCommand(newArrangement, xOffset, yOffset) + try: + subprocess.run(cmd, shell=True, stdout=subprocess.DEVNULL) + except: + pass def _getMousePos() -> Point: @@ -186,6 +245,7 @@ def __init__(self, handle: Optional[int] = None): self.display, self.screen, self.root, self.resources, self.handle, self.name = monitorData else: raise ValueError + self._crtc: dict[str, Union[int, Xlib.ext.randr.GetCrtcInfo]] = {} @property def size(self) -> Optional[Size]: @@ -237,27 +297,28 @@ def rect(self) -> Optional[Rect]: return rect @property - def scale(self) -> Optional[Tuple[float, float]]: - for monitorData in _XgetAllMonitors(self.name): - display, root, monitor, monName = monitorData - x, y, w, h = monitor.x, monitor.y, monitor.width_in_pixels, monitor.height_in_pixels - dpiX, dpiY = round((w * 25.4) / monitor.width_in_millimeters), round((h * 25.4) / monitor.height_in_millimeters) - scaleX, scaleY = round((dpiX / 96) * 100), round((dpiY / 96) * 100) - return scaleX, scaleY - return None + def scale(self) -> Tuple[Optional[float], Optional[float]]: + return _scale(self.name) def setScale(self, scale: Tuple[float, float]): - if scale is not None: + if scale is not None and self.name and self.name in _XgetAllMonitorsNames(): # https://askubuntu.com/questions/1193940/setting-monitor-scaling-to-200-with-xrandr - scaleX, scaleY = scale - cmd = " --filter nearest --scale %sx%s" % (scaleX, scaleY) - if self.name and self.name in _XgetAllMonitorsNames(): - cmd = (" --output %s" % self.name) + cmd - cmd = "xrandr" + cmd + scaleX, scaleY = round(100/ scale[0], 1), round(100 / scale[1], 1) + # cmd = "xrandr --output %s --scale %sx%s --filter nearest" % (self.name, scaleX, scaleY) + cmd = "xrandr --output %s --scale %sx%s" % (self.name, scaleX, scaleY) + retry = False try: - subprocess.run(cmd, shell=True, stdout=subprocess.DEVNULL) + ret = subprocess.run(cmd, shell=True, stdout=subprocess.DEVNULL) + if ret and hasattr(ret, "returncode"): + retry = ret.returncode != 0 except: pass + # if retry: + # try: + # cmd = "xrandr --output %s --scale %sx%s" % (self.name, scaleX, scaleY) + # subprocess.run(cmd, shell=True, stdout=subprocess.DEVNULL) + # except: + # pass @property def dpi(self) -> Optional[Tuple[float, float]]: @@ -283,7 +344,7 @@ def setOrientation(self, orientation: Optional[Union[int, Orientation]]): # outputs = _XgetAllOutputs(self.name) # for outputData in outputs: # display, screen, root, res, output, outputInfo = outputData - # crtcInfo = randr.get_crtc_info(display, outputInfo.crtc, Xlib.X.CurrentTime) + # crtcInfo = randr.get_crtc_info(display, outputInfo.crtc, res.config_timestamp) # if crtcInfo and crtcInfo.mode: # randr.set_crtc_config(display, outputInfo.crtc, Xlib.X.CurrentTime, crtcInfo.x, crtcInfo.y, # crtcInfo.mode, (orientation or 1) ** 2, crtcInfo.outputs) @@ -296,10 +357,7 @@ def setOrientation(self, orientation: Optional[Union[int, Orientation]]): direction = "left" else: direction = "normal" - cmd = " -o %s" % direction - if self.name in _XgetAllMonitorsNames(): - cmd = (" --output %s" % self.name) + cmd - cmd = "xrandr" + cmd + cmd = "xrandr --output %s --rotate %s" % (self.name, direction) try: subprocess.run(cmd, shell=True, stdout=subprocess.DEVNULL) except: @@ -310,7 +368,7 @@ def frequency(self) -> Optional[float]: outputs = _XgetAllOutputs(self.name) for outputData in outputs: display, screen, root, res, output, outputInfo = outputData - crtcInfo = randr.get_crtc_info(display, outputInfo.crtc, Xlib.X.CurrentTime) + crtcInfo = randr.get_crtc_info(display, outputInfo.crtc, res.config_timestamp) for mode in res.modes: if crtcInfo.mode == mode.id: return float(round(mode.dot_clock / ((mode.h_total * mode.v_total) or 1), 2)) @@ -383,7 +441,7 @@ def setContrast(self, contrast: Optional[int]): if contrast is not None: value = contrast / 100 if 0 <= value <= 1: - rgb = str(round(contrast, 1)) + rgb = str(round(value, 1)) gamma = rgb + ":" + rgb + ":" + rgb cmd = "xrandr --output %s --gamma %s" % (self.name, gamma) try: @@ -413,16 +471,17 @@ def setMode(self, mode: Optional[DisplayMode]): # Xlib.ext.randr.set_screen_config(defaultRootWindow.root, size_id, 0, 0, round(mode.frequency), 0) # Xlib.ext.randr.change_output_property() if mode is not None: - allModes = self.allModes - if mode in allModes: - cmd = " --mode %sx%s -r %s" % (mode.width, mode.height, round(mode.frequency, 2)) - if self.name and self.name in _XgetAllMonitorsNames(): - cmd = (" --output %s" % self.name) + cmd - cmd = "xrandr" + cmd - try: - subprocess.run(cmd, shell=True, stdout=subprocess.DEVNULL) - except: - pass + # allModes = self.allModes + # if mode in allModes: + cmd = " --mode %sx%s -r %s" % (mode.width, mode.height, round(mode.frequency, 2)) + if self.name: + cmd = (" --output %s" % self.name) + cmd + cmd = "xrandr" + cmd + i = 0 + while mode != self.mode and i <= 3: + subprocess.run(cmd, shell=True, stdout=subprocess.DEVNULL) + i += 1 + time.sleep(0.3) def _modeB(self) -> Optional[DisplayMode]: @@ -479,7 +538,7 @@ def allModes(self) -> list[DisplayMode]: break for mode in allModes: modes.append(DisplayMode(mode.width, mode.height, - round(mode.dot_clock / ((mode.h_total * mode.v_total) or 1), 2))) + round(mode.dot_clock / ((mode.h_total * mode.v_total) or 1), 2))) return modes @property @@ -500,8 +559,21 @@ def turnOn(self): cmdPart = " --right-of %s" % monName break cmd = ("xrandr --output %s" % self.name) + cmdPart + " --auto" + i = 0 + while i <= 3 and not self.isOn: + try: + subprocess.run(cmd, shell=True, stdout=subprocess.DEVNULL) + except: + pass + i += 1 + time.sleep(0.3) + else: + cmd = "xset -q | grep ' Monitor is ' | awk '{ print$4 }'" try: - subprocess.run(cmd, shell=True, stdout=subprocess.DEVNULL) + err, ret = subprocess.getstatusoutput(cmd) + if err == 0 and ret == "Standby": + cmd = "xset dpms force on" + subprocess.run(cmd, shell=True, stdout=subprocess.DEVNULL) except: pass @@ -526,61 +598,60 @@ def isOn(self) -> bool: return self.name in _XgetAllMonitorsNames() def attach(self): - self.turnOn() - # raise NotImplementedError # This produces the same effect, but requires to keep track of last mode used - # outputs = __getAllOutputs(name) - # for outputData in outputs: - # display, screen, root, res, output, outputInfo = outputData - # if output in _crtcs.keys(): - # crtcCode, crtcInfo = _crtcs[output] - # randr.set_crtc_config(display, crtcCode, Xlib.X.CurrentTime, crtcInfo.x, crtcInfo.y, crtcInfo.mode, crtcInfo.rotation, crtcInfo.outputs) - # _crtcs.pop(output) + if self._crtc: + crtcCode = self._crtc["crtc"] + crtcInfo = self._crtc["crtc_info"] + randr.set_crtc_config(self.display, crtcCode, Xlib.X.CurrentTime, crtcInfo.x, crtcInfo.y, crtcInfo.mode, crtcInfo.rotation, crtcInfo.outputs) + self._crtc = {} def detach(self, permanent: bool = False): - self.turnOff() - # raise NotImplementedError # This produces the same effect, but requires to keep track of last mode used - # outputs = __getAllOutputs(name) - # for outputData in outputs: - # display, screen, root, res, output, outputInfo = outputData - # if outputInfo.crtc: - # crtcInfo = randr.get_crtc_info(display, outputInfo.crtc, Xlib.X.CurrentTime) - # randr.set_crtc_config(display, outputInfo.crtc, Xlib.X.CurrentTime, crtcInfo.x, crtcInfo.y, 0, crtcInfo.rotation, []) - # _crtcs[output] = (outputInfo.crtc, crtcInfo) + outputs = _XgetAllOutputs(self.name) + for outputData in outputs: + display, screen, root, res, output, outputInfo = outputData + if outputInfo.crtc: + crtcInfo = randr.get_crtc_info(display, outputInfo.crtc, Xlib.X.CurrentTime) + randr.set_crtc_config(display, outputInfo.crtc, Xlib.X.CurrentTime, crtcInfo.x, crtcInfo.y, 0, crtcInfo.rotation, []) + self._crtc = {"crtc": outputInfo.crtc, "crtc_info": crtcInfo} + @property def isAttached(self) -> bool: - cmd = "xrandr | grep ' connected ' | awk '{ print$1 }'" - err, ret = subprocess.getstatusoutput(cmd) - if err != 0: - ret = [] - return self.name in ret + outputs = _XgetAllOutputs(self.name) + for outputData in outputs: + display, screen, root, res, output, outputInfo = outputData + if outputInfo.crtc: + crtcInfo = randr.get_crtc_info(display, outputInfo.crtc, Xlib.X.CurrentTime) + if crtcInfo.mode != 0: + return True + break + return False def _getPrimaryName(): for monitorData in _XgetAllMonitors(): display, root, monitor, monName = monitorData - if monitor.primary == 1 or (monitor.x == 0 and monitor.y == 0): + if monitor.primary == 1: return monName return None def _setPrimary(name: str): - mainMon = _getPrimaryName() - if name != mainMon: - cmd = "xrandr --output %s --pos 0x0 --primary --left-of %s" % (name, mainMon) + if name and name != _getPrimaryName(): + cmd = "xrandr --output %s --primary" % name try: subprocess.run(cmd, shell=True, stdout=subprocess.DEVNULL) except: pass -def _setPositionTwice(relativePos: Position, relativeTo: Optional[str], name: str): - # Why it has to be invoked twice? Perhaps there is an option to commit changes? - _setPositionTwice(relativePos, relativeTo, name) - time.sleep(0.5) - _setPositionTwice(relativePos, relativeTo, name) +def _getPosition(name): + pos: Optional[Point] = None + for monitorData in _XgetAllMonitors(name): + display, root, monitor, monName = monitorData + pos = Point(monitor.x, monitor.y) + return pos def _setPosition(relativePos: Position, relativeTo: Optional[str], name: str): @@ -589,54 +660,152 @@ def _setPosition(relativePos: Position, relativeTo: Optional[str], name: str): else: monitors = _XgetAllMonitorsDict() + arrangement: dict[str, dict[str, Union[int, bool]]] = {} + xOffset = yOffset = 0 if name in monitors.keys() and relativeTo in monitors.keys(): + newPos: dict[str, dict[str, int]] = {} + for monitor in monitors.keys(): + if name == monitor: + targetMonInfo = monitors[monitor]["monitor"] + targetMon = {"relativePos": relativePos, "relativeTo": relativeTo, + "position": Point(targetMonInfo.x, targetMonInfo.y), + "size": Size(targetMonInfo.width_in_pixels, targetMonInfo.height_in_pixels)} + + relMonInfo = monitors[relativeTo]["monitor"] + if relativeTo in newPos.keys(): + x, y = newPos[relativeTo]["x"], newPos[relativeTo]["y"] + else: + x, y = relMonInfo.x, relMonInfo.y + relMon = {"position": Point(x, y), + "size": Size(relMonInfo.width_in_pixels, relMonInfo.height_in_pixels)} + + x, y = _getRelativePosition(targetMon, relMon) + w, h = targetMonInfo.width_in_pixels, targetMonInfo.height_in_pixels + setPrimary = targetMonInfo.primary == 1 + + newPos[monitor] = {"x": x, "y": y} - targetMonInfo = monitors[name]["monitor"] - targetMon = {"relativePos": relativePos, "relativeTo": relativeTo, - "position": Point(targetMonInfo.x, targetMonInfo.y), - "size": Size(targetMonInfo.width_in_pixels, targetMonInfo.height_in_pixels)} - - relMonInfo = monitors[relativeTo]["monitor"] - relMon = {"position": Point(relMonInfo.x, relMonInfo.y), - "size": Size(relMonInfo.width_in_pixels, relMonInfo.height_in_pixels)} + else: + monInfo = monitors[monitor]["monitor"] + x, y = monInfo.x, monInfo.y + setPrimary = monInfo.primary == 1 + if x < 0: + xOffset += abs(x) + if y < 0: + yOffset += abs(y) + + arrangement[monitor] = { + "setPrimary": setPrimary, + "x": x, + "y": y + } - primaryName = _getPrimaryName() - # WARNING: There will be no primary monitor when moving it to another position! - # cmd2 = " --noprimary" if name == primaryName else "" - cmd2 = "" - x, y, relCmd = _getRelativePosition(targetMon, relMon) - cmd3 = relCmd % relativeTo if relativePos in (Position.LEFT_TOP, Position.RIGHT_TOP, Position.ABOVE_LEFT, Position.BELOW_LEFT) else "" - cmd = ("xrandr --output %s --pos %sx%s" % (name, x, y)) + cmd2 + cmd3 + if arrangement: + cmd = _buildCommand(arrangement, xOffset, yOffset) try: subprocess.run(cmd, shell=True, stdout=subprocess.DEVNULL) except: pass +def _buildCommand(arrangement: dict[str, dict[str, Union[int, bool]]], xOffset: int, yOffset: int): + cmd = "xrandr" + for monName in arrangement.keys(): + arrInfo = arrangement[monName] + cmd += " --output %s" % monName + # xrandr won't accept negative values!!!! + # https://superuser.com/questions/485120/how-do-i-align-the-bottom-edges-of-two-monitors-with-xrandr + cmd += " --pos %sx%s" % (str(int(arrInfo["x"]) + xOffset), str(int(arrInfo["y"]) + yOffset)) + if arrInfo["setPrimary"]: + cmd += " --primary" + print(cmd) + return cmd + + +def _scale(name: str = "") -> Tuple[Optional[float], Optional[float]]: + value = None + cmd = "xrandr -q | grep %s -A 5 | grep ' +\\|*+'" % name + err, ret = subprocess.getstatusoutput(cmd) + if err == 0 and ret: + try: + res = ret.split(" ") + lines = list(filter(None, res)) + a, b = lines[0].split("x") + w = int(a) + h = int(b) + r = float(lines[1].replace("+", "").replace("*", "")) + value = DisplayMode(w, h, r) + except: + pass + scaleX, scaleY = None, None + if value: + for monitorData in _XgetAllMonitors(name): + display, root, monitor, monName = monitorData + w, h = monitor.width_in_pixels, monitor.height_in_pixels + wm, hm = monitor.width_in_millimeters, monitor.height_in_millimeters + if wm and hm: + wDef, hDef = value.width, value.height + dpiXDef, dpiYDef = round((wDef * 25.4) / wm), round((hDef * 25.4) / hm) + dpiX, dpiY = round((w * 25.4) / wm), round((h * 25.4) / hm) + if dpiX and dpiY and dpiXDef and dpiYDef: + scaleX, scaleY = round(100 / (dpiX / dpiXDef)), round(100 / (dpiY / dpiYDef)) + return scaleX, scaleY + + +_outputs = [] +_lockOutputs = threading.RLock() +_monitors = [] +_lockMonitors = threading.RLock() + + def _XgetAllOutputs(name: str = ""): - outputs = [] - global _roots - for rootData in _roots: - display, screen, root = rootData - res = randr.get_screen_resources(root) - for output in res.outputs: - outputInfo = randr.get_output_info(display, output, res.config_timestamp) - if not name or (name and name == outputInfo.name): - outputs.append([display, screen, root, res, output, outputInfo]) - return outputs + global _outputs + global _lockOutputs + global _monitors + global _lockMonitors + newMonitors = _XgetAllMonitors() + if _monitors != newMonitors: + with _lockMonitors: + _monitors = newMonitors + outputs = [] + global _roots + for rootData in _roots: + display, screen, root, res = rootData + if res: + for output in res.outputs: + try: + outputInfo = randr.get_output_info(display, output, res.config_timestamp) + outputs.append([display, screen, root, res, output, outputInfo]) + except: + pass + with _lockOutputs: + _outputs = outputs + if name: + ret = [] + for outputData in _outputs: + display, screen, root, res, output, outputInfo = outputData + if name == outputInfo.name: + ret.append(outputData) + break + return ret + else: + return _outputs def _XgetAllCrtcs(name: str = ""): crtcs = [] - for outputData in _XgetAllOutputs(): + outputs = _XgetAllOutputs(name) + for outputData in outputs: display, screen, root, res, output, outputInfo = outputData - res = randr.get_screen_resources(root) - for output in res.outputs: - outputInfo = randr.get_output_info(display, output, res.config_timestamp) - if not name or (name and name == outputInfo.name): - for crtc in outputInfo.crtcs: + if not name or (name and name == outputInfo.name): + for crtc in outputInfo.crtcs: + try: crtcInfo = randr.get_crtc_info(display, crtc, res.config_timestamp) crtcs.append([display, screen, root, res, output, outputInfo, crtc, crtcInfo]) + except: + pass + if name: + return crtcs return crtcs @@ -644,7 +813,7 @@ def _XgetAllMonitors(name: str = ""): monitors = [] global _roots for rootData in _roots: - display, screen, root = rootData + display, screen, root, res = rootData for monitor in randr.get_monitors(root).monitors: monName = display.get_atom_name(monitor.name) if not name or (name and name == monName): @@ -658,7 +827,7 @@ def _XgetAllMonitorsDict(): monitors = {} global _roots for rootData in _roots: - display, screen, root = rootData + display, screen, root, res = rootData for monitor in randr.get_monitors(root).monitors: monitors[display.get_atom_name(monitor.name)] = {"display": display, "root": root, "monitor": monitor} return monitors @@ -668,8 +837,8 @@ def _XgetAllMonitorsNames(): monNames = [] global _roots for rootData in _roots: - display, screen, root = rootData - for monitor in randr.get_monitors(root, is_active=True).monitors: + display, screen, root, res = rootData + for monitor in randr.get_monitors(root).monitors: monNames.append(display.get_atom_name(monitor.name)) return monNames @@ -678,7 +847,7 @@ def _XgetPrimary(): outputs = _XgetAllOutputs() for monitorData in _XgetAllMonitors(): display, root, monitor, monName = monitorData - if monitor.primary == 1 or (monitor.x == 0 and monitor.y == 0): + if monitor.primary == 1: for outputData in outputs: display, screen, root, res, output, outputInfo = outputData if outputInfo.name == monName: @@ -696,13 +865,13 @@ def _XgetMonitorData(handle: Optional[int] = None): return display, screen, root, res, output, outputInfo.name else: outputs = _XgetAllOutputs() - monitors = _XgetAllMonitors() - for monitorData in monitors: + global _monitors + for monitorData in _monitors: display, root, monitor, monName = monitorData - if monitor.primary == 1 or len(monitors) == 1: + if monitor.primary == 1 or len(_monitors) == 1: for outputData in outputs: display, screen, root, res, output, outputInfo = outputData - if outputInfo.crtc: + if monName == outputInfo.name and outputInfo.crtc: return display, screen, root, res, output, outputInfo.name return None @@ -748,6 +917,7 @@ def _eventLoop(kill: threading.Event, interval: float): print('Output property change') # e = randr.OutputPropertyNotify(display=display.display, binarydata = e._binary) print(e._data) + else: print("Unrecognised subcode", e.sub_code) diff --git a/src/pymonctl/_pymonctl_macos.py b/src/pymonctl/_pymonctl_macos.py index 9a9dd9d..4ce958e 100644 --- a/src/pymonctl/_pymonctl_macos.py +++ b/src/pymonctl/_pymonctl_macos.py @@ -4,13 +4,14 @@ # mypy: disable_error_code = no-any-return from __future__ import annotations -import subprocess import sys -import threading -import time assert sys.platform == "darwin" +import subprocess +import threading +import time + from typing import Optional, List, Union, cast, Tuple import AppKit @@ -87,7 +88,7 @@ def _getAllMonitorsDict(forceUpdate: bool = True) -> dict[str, ScreenValue]: result[scrName] = { 'system_name': name, - 'handle': displayId, + 'id': displayId, 'is_primary': is_primary, 'position': Point(x, y), 'size': Size(w, h), @@ -492,7 +493,7 @@ def _setPosition(relativePos: Union[int, Position], relativeTo: Optional[str], n relMon = {"pos": Point(frame.origin.x, frame.origin.y), "size": Point(frame.size.width, frame.size.height)} - x, y, relCmd = _getRelativePosition(targetMon, relMon) + x, y = _getRelativePosition(targetMon, relMon) ret, configRef = Quartz.CGBeginDisplayConfiguration(None) # If this display becomes primary (0, 0). MUST reposition primary monitor first! if not ret: diff --git a/src/pymonctl/_pymonctl_win.py b/src/pymonctl/_pymonctl_win.py index 23a0bd6..b523508 100644 --- a/src/pymonctl/_pymonctl_win.py +++ b/src/pymonctl/_pymonctl_win.py @@ -8,6 +8,7 @@ assert sys.platform == "win32" +import gc import threading import ctypes.wintypes @@ -20,6 +21,9 @@ from pymonctl import BaseMonitor, _getRelativePosition, \ DisplayMode, ScreenValue, Box, Rect, Point, Size, Position, Orientation +from pymonctl.structs import _QDC_ONLY_ACTIVE_PATHS, _DISPLAYCONFIG_PATH_ACTIVE, _DISPLAYCONFIG_PATH_INFO, \ + _DISPLAYCONFIG_MODE_INFO, _DISPLAYCONFIG_SOURCE_DPI_SCALE_GET, \ + _DISPLAYCONFIG_DEVICE_INFO_GET_DPI_SCALE dpiAware = ctypes.windll.user32.GetAwarenessFromDpiAwarenessContext(ctypes.windll.user32.GetThreadDpiAwarenessContext()) @@ -52,8 +56,8 @@ def _getAllMonitorsDict() -> dict[str, ScreenValue]: pass if dev and dev.StateFlags & win32con.DISPLAY_DEVICE_ATTACHED_TO_DESKTOP: name = monName - x, y, r, b = monitorInfo.get("Monitor", (-1, -1, 0, 0)) - wx, wy, wr, wb = monitorInfo.get("Work", (-1, -1, 0, 0)) + x, y, r, b = monitorInfo.get("Monitor", (0, 0, -1, -1)) + wx, wy, wr, wb = monitorInfo.get("Work", (0, 0, -1, -1)) is_primary = monitorInfo.get("Flags", 0) == win32con.MONITORINFOF_PRIMARY pScale = ctypes.c_uint() ctypes.windll.shcore.GetScaleFactorForMonitor(hMon, ctypes.byref(pScale)) @@ -66,10 +70,14 @@ def _getAllMonitorsDict() -> dict[str, ScreenValue]: rot = settings.DisplayOrientation freq = settings.DisplayFrequency depth = settings.BitsPerPel + handle = None + hMon = win32api.MonitorFromPoint((x, y)) + if hMon and hasattr(hMon, "handle"): + handle = hMon.handle result[name] = { "system_name": name, - "handle": win32api.MonitorFromPoint((x, y)), + "id": handle, "is_primary": is_primary, "position": Point(x, y), "size": Size(abs(r - x), abs(b - y)), @@ -90,8 +98,8 @@ def _getMonitorsCount() -> int: def _findMonitor(x: int, y: int) -> Optional[Win32Monitor]: # Watch this: started to fail when repeatedly and quickly invoking it in Python 3.10 (it was ok in 3.9) hMon = win32api.MonitorFromPoint((x, y), win32con.MONITOR_DEFAULTTONEAREST) - if hMon and hMon.handle > 0: - return Win32Monitor(hMon) + if hMon and hasattr(hMon, "handle"): + return Win32Monitor(hMon.handle) return None @@ -108,6 +116,7 @@ def _arrangeMonitors(arrangement: dict[str, dict[str, Union[str, int, Position, if monName not in arrangement.keys(): return primaryPresent = False + setAsPrimary = "" for monName in arrangement.keys(): relPos = arrangement[monName]["relativePos"] relMon = arrangement[monName]["relativeTo"] @@ -115,12 +124,16 @@ def _arrangeMonitors(arrangement: dict[str, dict[str, Union[str, int, Position, (not relMon and relPos != Position.PRIMARY): return elif relPos == Position.PRIMARY: + setAsPrimary = monName primaryPresent = True if not primaryPresent: return + _setPrimary(setAsPrimary, True) + for monName in arrangement.keys(): - _setPosition(cast(Position, arrangement[monName]["relativePos"]), str(arrangement[monName]["relativeTo"]), monName, False) + if monName != setAsPrimary: + _setPosition(cast(Position, arrangement[monName]["relativePos"]), str(arrangement[monName]["relativeTo"]), monName, False) # First request all changes, then execute this with NULL params win32api.ChangeDisplaySettingsEx() @@ -144,7 +157,7 @@ def __init__(self, handle: Optional[int] = None): """ if not handle: hMon = win32api.MonitorFromPoint((0, 0), win32con.MONITOR_DEFAULTTOPRIMARY) - if hMon and hMon.handle > 0: + if hMon and hasattr(hMon, "handle"): handle = hMon.handle if handle: self.handle = handle @@ -152,29 +165,34 @@ def __init__(self, handle: Optional[int] = None): self.name = monitorInfo.get("Device", "") else: raise ValueError + self._hasVCPSupport = _win32hasVCPSupport(self.handle) + self._hasVCPPowerSupport = _win32hasVCPPowerSupport(self.handle) @property def size(self) -> Optional[Size]: - monitorInfo = win32api.GetMonitorInfo(self.handle) - if monitorInfo: - x, y, r, b = monitorInfo.get("Monitor", (-1, -1, 0, 0)) - return Size(abs(r - x), abs(b - y)) + if self.handle is not None: + monitorInfo = win32api.GetMonitorInfo(self.handle) + if monitorInfo: + x, y, r, b = monitorInfo.get("Monitor", (0, 0, -1, -1)) + return Size(abs(r - x), abs(b - y)) return None @property def workarea(self) -> Optional[Rect]: - monitorInfo = win32api.GetMonitorInfo(self.handle) - if monitorInfo: - wx, wy, wr, wb = monitorInfo.get("Work", (-1, -1, 0, 0)) - return Rect(wx, wy, wr, wb) + if self.handle is not None: + monitorInfo = win32api.GetMonitorInfo(self.handle) + if monitorInfo: + wx, wy, wr, wb = monitorInfo.get("Work", (0, 0, -1, -1)) + return Rect(wx, wy, wr, wb) return None @property def position(self) -> Optional[Point]: - monitorInfo = win32api.GetMonitorInfo(self.handle) - if monitorInfo: - x, y, r, b = monitorInfo.get("Monitor", (-1, -1, 0, 0)) - return Point(x, y) + if self.handle is not None: + monitorInfo = win32api.GetMonitorInfo(self.handle) + if monitorInfo: + x, y, r, b = monitorInfo.get("Monitor", (0, 0, -1, -1)) + return Point(x, y) return None def setPosition(self, relativePos: Union[int, Position], relativeTo: Optional[str]): @@ -182,34 +200,38 @@ def setPosition(self, relativePos: Union[int, Position], relativeTo: Optional[st @property def box(self) -> Optional[Box]: - monitorInfo = win32api.GetMonitorInfo(self.handle) - if monitorInfo: - x, y, r, b = monitorInfo.get("Monitor", (-1, -1, 0, 0)) - return Box(x, y, abs(r - x), abs(b - y)) + if self.handle is not None: + monitorInfo = win32api.GetMonitorInfo(self.handle) + if monitorInfo: + x, y, r, b = monitorInfo.get("Monitor", (0, 0, -1, -1)) + return Box(x, y, abs(r - x), abs(b - y)) return None @property def rect(self) -> Optional[Rect]: - monitorInfo = win32api.GetMonitorInfo(self.handle) - if monitorInfo: - x, y, r, b = monitorInfo.get("Monitor", (-1, -1, 0, 0)) - return Rect(x, y, r, b) + if self.handle is not None: + monitorInfo = win32api.GetMonitorInfo(self.handle) + if monitorInfo: + x, y, r, b = monitorInfo.get("Monitor", (0, 0, -1, -1)) + return Rect(x, y, r, b) return None @property def scale(self) -> Optional[Tuple[float, float]]: - pScale = ctypes.c_uint() - ctypes.windll.shcore.GetScaleFactorForMonitor(self.handle, ctypes.byref(pScale)) - # import wmi - # obj = wmi.WMI().Win32_PnPEntity(ConfigManagerErrorCode=0) - # displays = [x for x in obj if 'DISPLAY' in str(x)] - # for item in displays: - # print(item) - return float(pScale.value), float(pScale.value) + if self.handle is not None: + pScale = ctypes.c_uint() + ctypes.windll.shcore.GetScaleFactorForMonitor(self.handle, ctypes.byref(pScale)) + # import wmi + # obj = wmi.WMI().Win32_PnPEntity(ConfigManagerErrorCode=0) + # displays = [x for x in obj if 'DISPLAY' in str(x)] + # for item in displays: + # print(item) + return float(pScale.value), float(pScale.value) + return None def _getPaths(self): - flags = pymonctl.structs._QDC_ONLY_ACTIVE_PATHS + flags = _QDC_ONLY_ACTIVE_PATHS numPathArrayElements = ctypes.c_uint32() numModeInfoArrayElements = ctypes.c_uint32() ctypes.windll.user32.GetDisplayConfigBufferSizes(flags, @@ -217,11 +239,11 @@ def _getPaths(self): ctypes.byref(numModeInfoArrayElements)) print("PATHS", numPathArrayElements.value, "MODES", numModeInfoArrayElements.value) - flags = pymonctl.structs._DISPLAYCONFIG_PATH_ACTIVE - paths = (pymonctl.structs._DISPLAYCONFIG_PATH_INFO * numPathArrayElements.value)() + flags = _DISPLAYCONFIG_PATH_ACTIVE + paths = (_DISPLAYCONFIG_PATH_INFO * numPathArrayElements.value)() print(ctypes.sizeof(pymonctl.structs._DISPLAYCONFIG_PATH_INFO()), ctypes.sizeof(paths)) - modes = (pymonctl.structs._DISPLAYCONFIG_MODE_INFO * numModeInfoArrayElements.value)() - print(ctypes.sizeof(pymonctl.structs._DISPLAYCONFIG_MODE_INFO()), ctypes.sizeof(modes)) + modes = (_DISPLAYCONFIG_MODE_INFO * numModeInfoArrayElements.value)() + print(ctypes.sizeof(_DISPLAYCONFIG_MODE_INFO()), ctypes.sizeof(modes)) nullptr = ctypes.c_void_p() # or None? ret = ctypes.windll.user32.QueryDisplayConfig(flags, ctypes.byref(numPathArrayElements), @@ -233,19 +255,19 @@ def _getPaths(self): print("RET", ret, "PATHS", numPathArrayElements.value, "MODES", numModeInfoArrayElements.value) if ret == 0: for i in range(numPathArrayElements.value): - pathInfo: pymonctl.structs._DISPLAYCONFIG_PATH_INFO = paths[i].value + pathInfo: _DISPLAYCONFIG_PATH_INFO = paths[i].value print(pathInfo) else: print("FAILED!!! (I guess you are gonna see this a ridiculously huge number of times...)") def setScale(self, scale: Tuple[float, float]): - if scale is not None: + if scale is not None and self.handle is not None: # https://github.com/lihas/windows-DPI-scaling-sample/blob/master/DPIHelper/DpiHelper.cpp # self._getPaths() - scaleData = pymonctl.structs._DISPLAYCONFIG_SOURCE_DPI_SCALE_GET() - scaleData.header.type = pymonctl.structs._DISPLAYCONFIG_DEVICE_INFO_GET_DPI_SCALE + scaleData = _DISPLAYCONFIG_SOURCE_DPI_SCALE_GET() + scaleData.header.type = _DISPLAYCONFIG_DEVICE_INFO_GET_DPI_SCALE scaleData.header.size = ctypes.sizeof(scaleData) # HOW to GET adapterId and sourceId values???? -> QueryDisplayConfig # https://stackoverflow.com/questions/67332814/how-to-properly-clone-and-extend-two-specific-monitors-on-windows @@ -263,10 +285,12 @@ def setScale(self, scale: Tuple[float, float]): @property def dpi(self) -> Optional[Tuple[float, float]]: - dpiX = ctypes.c_uint() - dpiY = ctypes.c_uint() - ctypes.windll.shcore.GetDpiForMonitor(self.handle, 0, ctypes.byref(dpiX), ctypes.byref(dpiY)) - return dpiX.value, dpiY.value + if self.handle is not None: + dpiX = ctypes.c_uint() + dpiY = ctypes.c_uint() + ctypes.windll.shcore.GetDpiForMonitor(self.handle, 0, ctypes.byref(dpiX), ctypes.byref(dpiY)) + return dpiX.value, dpiY.value + return None @property def orientation(self) -> Optional[Union[int, Orientation]]: @@ -302,20 +326,20 @@ def colordepth(self) -> Optional[int]: @property def brightness(self) -> Optional[int]: - minBright = ctypes.c_uint() - currBright = ctypes.c_uint() - maxBright = ctypes.c_uint() - hDevices = _win32getPhysicalMonitorsHandles(self.handle) - for hDevice in hDevices: - ctypes.windll.dxva2.GetMonitorBrightness(hDevice, ctypes.byref(minBright), ctypes.byref(currBright), - ctypes.byref(maxBright)) - _win32destroyPhysicalMonitors(hDevices) - normBrightness = int((currBright.value / ((maxBright.value + minBright.value) or 1))) * 100 - return normBrightness + if self.handle is not None: + minBright = ctypes.c_uint() + currBright = ctypes.c_uint() + maxBright = ctypes.c_uint() + hDevices = _win32getPhysicalMonitorsHandles(self.handle) + for hDevice in hDevices: + ctypes.windll.dxva2.GetMonitorBrightness(hDevice, ctypes.byref(minBright), ctypes.byref(currBright), + ctypes.byref(maxBright)) + _win32destroyPhysicalMonitors(hDevices) + return currBright.value return None def setBrightness(self, brightness: Optional[int]): - if brightness is not None: + if brightness is not None and self.handle is not None: minBright = ctypes.c_uint() currBright = ctypes.c_uint() maxBright = ctypes.c_uint() @@ -323,9 +347,11 @@ def setBrightness(self, brightness: Optional[int]): for hDevice in hDevices: ctypes.windll.dxva2.GetMonitorBrightness(hDevice, ctypes.byref(minBright), ctypes.byref(currBright), ctypes.byref(maxBright)) - normBrightness = brightness * ((maxBright.value + minBright.value) / 100) - if minBright.value <= brightness <= maxBright.value and currBright.value != brightness: - ctypes.windll.dxva2.SetMonitorBrightness(hDevice, normBrightness) + if brightness < minBright.value: + brightness = minBright.value + elif brightness > maxBright.value: + brightness = maxBright.value + ctypes.windll.dxva2.SetMonitorBrightness(hDevice, brightness) ctypes.windll.dxva2.DestroyPhysicalMonitor(hDevice) # This fails with "wmi.x_wmi: " # import wmi @@ -333,20 +359,20 @@ def setBrightness(self, brightness: Optional[int]): @property def contrast(self) -> Optional[int]: - minCont = ctypes.c_uint() - currCont = ctypes.c_uint() - maxCont = ctypes.c_uint() - hDevices = _win32getPhysicalMonitorsHandles(self.handle) - for hDevice in hDevices: - ctypes.windll.dxva2.GetMonitorContrast(hDevice, ctypes.byref(minCont), ctypes.byref(currCont), - ctypes.byref(maxCont)) - _win32destroyPhysicalMonitors(hDevices) - normContrast = int((currCont.value / ((maxCont.value + minCont.value) or 1))) * 100 - return normContrast + if self.handle is not None: + minCont = ctypes.c_uint() + currCont = ctypes.c_uint() + maxCont = ctypes.c_uint() + hDevices = _win32getPhysicalMonitorsHandles(self.handle) + for hDevice in hDevices: + ctypes.windll.dxva2.GetMonitorContrast(hDevice, ctypes.byref(minCont), ctypes.byref(currCont), + ctypes.byref(maxCont)) + _win32destroyPhysicalMonitors(hDevices) + return currCont.value return None def setContrast(self, contrast: Optional[int]): - if contrast is not None: + if contrast is not None and self.handle is not None: minCont = ctypes.c_uint() currCont = ctypes.c_uint() maxCont = ctypes.c_uint() @@ -354,9 +380,11 @@ def setContrast(self, contrast: Optional[int]): for hDevice in hDevices: ctypes.windll.dxva2.GetMonitorContrast(hDevice, ctypes.byref(minCont), ctypes.byref(currCont), ctypes.byref(maxCont)) - normContrast = contrast * ((maxCont.value + minCont.value) / 100) - if minCont.value <= contrast <= maxCont.value and currCont.value != contrast: - ctypes.windll.dxva2.SetMonitorContrast(hDevice, normContrast) + if contrast < minCont.value: + contrast = minCont.value + elif contrast > maxCont.value: + contrast = maxCont.value + ctypes.windll.dxva2.SetMonitorContrast(hDevice, contrast) ctypes.windll.dxva2.DestroyPhysicalMonitor(hDevice) @property @@ -366,11 +394,12 @@ def mode(self) -> Optional[DisplayMode]: def setMode(self, mode: Optional[DisplayMode]): if mode is not None: - devmode = win32api.EnumDisplaySettings(self.name, win32con.ENUM_CURRENT_SETTINGS) + # devmode = win32api.EnumDisplaySettings(self.name, win32con.ENUM_CURRENT_SETTINGS) + devmode = pywintypes.DEVMODEType() # type: ignore[attr-defined] devmode.PelsWidth = mode.width # type: ignore[misc] devmode.PelsHeight = mode.height # type: ignore[misc] devmode.DisplayFrequency = mode.frequency # type: ignore[misc] - devmode.Fields = devmode.Fields | win32con.DM_PELSWIDTH | win32con.DM_PELSHEIGHT | win32con.DM_DISPLAYFREQUENCY # type: ignore[misc] + devmode.Fields = win32con.DM_PELSWIDTH | win32con.DM_PELSHEIGHT # | win32con.DM_DISPLAYFREQUENCY # type: ignore[misc] win32api.ChangeDisplaySettingsEx(self.name, devmode, 0) # type: ignore[arg-type] @property @@ -408,7 +437,7 @@ def setPrimary(self): def turnOn(self): # https://stackoverflow.com/questions/16402672/control-screen-with-python - if _win32hasVCPSupport(self.handle) and _win32hasVCPPowerSupport(self.handle): + if self.handle is not None and self._hasVCPSupport and self._hasVCPPowerSupport: if not self.isOn: hDevices = _win32getPhysicalMonitorsHandles(self.handle) for hDevice in hDevices: @@ -426,7 +455,7 @@ def turnOn(self): def turnOff(self): # https://stackoverflow.com/questions/16402672/control-screen-with-python - if _win32hasVCPSupport(self.handle) and _win32hasVCPPowerSupport(self.handle): + if self.handle is not None and self._hasVCPSupport and self._hasVCPPowerSupport: if self.isOn: hDevices = _win32getPhysicalMonitorsHandles(self.handle) for hDevice in hDevices: @@ -439,7 +468,7 @@ def turnOff(self): win32con.SMTO_ABORTIFHUNG, 100) def suspend(self): - if _win32hasVCPSupport(self.handle) and _win32hasVCPPowerSupport(self.handle): + if self.handle is not None and self._hasVCPSupport and self._hasVCPPowerSupport: hDevices = _win32getPhysicalMonitorsHandles(self.handle) for hDevice in hDevices: # code and value according to: VESA Monitor Control Command Set (MCCS) standard, version 1.0 and 2.0. @@ -453,25 +482,26 @@ def suspend(self): @property def isOn(self) -> Optional[bool]: ret = None - if _win32hasVCPSupport(self.handle) and _win32hasVCPPowerSupport(self.handle): - hDevices = _win32getPhysicalMonitorsHandles(self.handle) - for hDevice in hDevices: - # code and value according to: VESA Monitor Control Command Set (MCCS) standard, version 1.0 and 2.0. - pvct = ctypes.c_uint() - currValue = ctypes.c_uint() - maxValue = ctypes.c_uint() - ctypes.windll.dxva2.GetVCPFeatureAndVCPFeatureReply(hDevice, 0xD6, ctypes.byref(pvct), - ctypes.byref(currValue), ctypes.byref(maxValue)) - ret = currValue.value == 1 - _win32destroyPhysicalMonitors(hDevices) - else: - # Not working by now (tried with hDevice as well) - # https://stackoverflow.com/questions/203355/is-there-any-way-to-detect-the-monitor-state-in-windows-on-or-off - # https://learn.microsoft.com/en-us/windows/win32/power/power-management-functions - is_working = ctypes.c_uint() - res = ctypes.windll.kernel32.GetDevicePowerState(self.handle, ctypes.byref(is_working)) - if res: - ret = bool(is_working.value == 1) + if self.handle is not None: + if self._hasVCPSupport and self._hasVCPPowerSupport: + hDevices = _win32getPhysicalMonitorsHandles(self.handle) + for hDevice in hDevices: + # code and value according to: VESA Monitor Control Command Set (MCCS) standard, version 1.0 and 2.0. + pvct = ctypes.c_uint() + currValue = ctypes.c_uint() + maxValue = ctypes.c_uint() + ctypes.windll.dxva2.GetVCPFeatureAndVCPFeatureReply(hDevice, 0xD6, ctypes.byref(pvct), + ctypes.byref(currValue), ctypes.byref(maxValue)) + ret = currValue.value == 1 + _win32destroyPhysicalMonitors(hDevices) + else: + # Not working by now (tried with hDevice as well) + # https://stackoverflow.com/questions/203355/is-there-any-way-to-detect-the-monitor-state-in-windows-on-or-off + # https://learn.microsoft.com/en-us/windows/win32/power/power-management-functions + is_working = ctypes.c_uint() + res = ctypes.windll.kernel32.GetDevicePowerState(self.handle, ctypes.byref(is_working)) + if res: + ret = bool(is_working.value == 1) return ret def attach(self): @@ -484,11 +514,7 @@ def attach(self): # devmode.PelsHeight = settings.PelsHeight devmode.Fields = devmode.Fields | win32con.DM_PELSWIDTH | win32con.DM_PELSHEIGHT # type: ignore[misc] win32api.ChangeDisplaySettingsEx(self.name, devmode, win32con.CDS_UPDATEREGISTRY) # type: ignore[arg-type] - for mon in win32api.EnumDisplayMonitors(): - hMon = mon[0].handle - monInfo = win32api.GetMonitorInfo(hMon) - if monInfo.get("Device", "") == self.name: - self.handle = hMon + self._findNewHandles() def detach(self, permanent: bool = False): dev = win32api.EnumDisplayDevices(self.name, 0, 0) @@ -500,6 +526,21 @@ def detach(self, permanent: bool = False): devmode.Position_y = 0 devmode.Fields = devmode.Fields | win32con.DM_PELSWIDTH | win32con.DM_PELSHEIGHT | win32con.DM_POSITION win32api.ChangeDisplaySettingsEx(self.name, devmode, win32con.CDS_UPDATEREGISTRY if permanent else 0) # type: ignore[arg-type] + self._findNewHandles() + + def _findNewHandles(self): + # https://stackoverflow.com/questions/328851/printing-all-instances-of-a-class + # All monitor IDs will change after detaching or attaching a monitor + monitors = win32api.EnumDisplayMonitors() + otherMonitorInstances = [cast(Win32Monitor, obj) for obj in gc.get_objects() if isinstance(obj, Win32Monitor)] + for instance in otherMonitorInstances: + for monitor in monitors: + hMon = monitor[0].handle + monitorInfo = win32api.GetMonitorInfo(hMon) + monName = monitorInfo.get("Device", "") + if instance.name == monName: + instance.handle = hMon + break @property def isAttached(self) -> Optional[bool]: @@ -525,7 +566,7 @@ def _setPrimary(name: str, commit: bool = True): # devmode = win32api.EnumDisplaySettings(name, win32con.ENUM_CURRENT_SETTINGS) devmode.Position_x = 0 devmode.Position_y = 0 - devmode.Fields = devmode.Fields | win32con.DM_POSITION + devmode.Fields = win32con.DM_POSITION flags = win32con.CDS_SET_PRIMARY | win32con.CDS_UPDATEREGISTRY | win32con.CDS_NORESET win32api.ChangeDisplaySettingsEx(name, devmode, flags) # type: ignore[arg-type] break @@ -540,7 +581,7 @@ def _setPrimary(name: str, commit: bool = True): y = monInfo["Monitor"][1] - yOffset devmode.Position_x = x devmode.Position_y = y - devmode.Fields = devmode.Fields | win32con.DM_POSITION + devmode.Fields = win32con.DM_POSITION win32api.ChangeDisplaySettingsEx(monName, devmode, flags) # type: ignore[arg-type] if commit: win32api.ChangeDisplaySettingsEx() @@ -556,18 +597,18 @@ def _setPosition(relativePos: Union[int, Position], relativeTo: Optional[str], n monitors = _win32getAllMonitorsDict() if name in monitors.keys() and relativeTo in monitors.keys(): targetMonInfo = monitors[name]["monitor"] - x, y, r, b = targetMonInfo.get("Monitor", (-1, -1, 0, 0)) + x, y, r, b = targetMonInfo.get("Monitor", (0, 0, -1, -1)) w = abs(r - x) h = abs(b - y) targetMon = {"relativePos": relativePos, "relativeTo": relativeTo, "position": Point(x, y), "size": Size(w, h)} relMonInfo = monitors[relativeTo]["monitor"] - x, y, r, b = relMonInfo.get("Monitor", (-1, -1, 0, 0)) + x, y, r, b = relMonInfo.get("Monitor", (0, 0, -1, -1)) w = abs(r - x) h = abs(b - y) relMon = {"position": Point(x, y), "size": Size(w, h)} - x, y, _ = _getRelativePosition(targetMon, relMon) + x, y = _getRelativePosition(targetMon, relMon) devmode = win32api.EnumDisplaySettings(name, win32con.ENUM_CURRENT_SETTINGS) devmode.Position_x = x # type: ignore[misc] @@ -739,3 +780,6 @@ def _eventLogLoop(kill: threading.Event, interval: float): kill.wait(interval) win32evtlog.CloseEventLog(handle) + +# import win32ui +# print(win32ui.GetDeviceCaps(win32gui.GetDC(None), win32con.HORZSIZE), win32ui.GetDeviceCaps(win32gui.GetDC(None), win32con.VERTSIZE)) \ No newline at end of file diff --git a/tests/test_pymonctl.py b/tests/test_pymonctl.py index d6e9cd4..70175ee 100644 --- a/tests/test_pymonctl.py +++ b/tests/test_pymonctl.py @@ -61,23 +61,35 @@ def changedCB(names, info): print() - pmc.enableUpdate(monitorCountChanged=pluggedCB, monitorPropsChanged=changedCB) - print("CHANGE MODE") + pmc.enableUpdateInfo() + pmc.plugListenerRegister(pluggedCB) + pmc.changeListenerRegister(changedCB) currMode = monitor.mode targetMode = monitor.mode - modes = monitor.allModes - for mode in modes: - if monitor.mode and mode.width != monitor.mode.width: - targetMode = mode - break - targetMode = DisplayMode(3840, 1080, 120) + if monitor.size.width == 5120: + targetMode = DisplayMode(3840, 1080, monitor.defaultMode.frequency) + elif monitor.size.width == 1920: + targetMode = DisplayMode(1360, 768, monitor.defaultMode.frequency) + elif monitor.size.width == 1680: + targetMode = DisplayMode(1440, 900, monitor.defaultMode.frequency) + else: + modes = monitor.allModes + for mode in modes: + if monitor.mode and mode.width != monitor.mode.width: + targetMode = mode + break + print("CHANGE MODE", targetMode) monitor.setMode(targetMode) time.sleep(3) - print("SET DEFAULT MODE") + print("MODE CHANGED?:", monitor.mode) + print("SET DEFAULT MODE", monitor.defaultMode) monitor.setDefaultMode() time.sleep(3) - print("RESTORE MODE") + print("DEFAULT MODE SET?:", monitor.mode) + print("RESTORE MODE", currMode) monitor.setMode(currMode) + time.sleep(3) + print("MODE RESTORED?:", monitor.mode) print() print("CHANGE BRIGHTNESS") @@ -86,6 +98,7 @@ def changedCB(names, info): time.sleep(2) print("RESTORE BRIGHTNESS") monitor.setBrightness(currBright) + time.sleep(2) print() print("CHANGE CONTRAST") @@ -94,6 +107,7 @@ def changedCB(names, info): time.sleep(2) print("RESTORE CONTRAST") monitor.setContrast(currContrast) + time.sleep(2) print() print("CHANGE ORIENTATION") @@ -115,7 +129,7 @@ def changedCB(names, info): print("IS ON?:", monitor.isOn) print("TURN OFF") monitor.turnOff() - time.sleep(5) + time.sleep(3) print("IS ON?:", monitor.isOn) print("TURN ON") monitor.turnOn() @@ -128,25 +142,28 @@ def changedCB(names, info): print("IS ON?:", monitor.isOn) print("WAKEUP") monitor.turnOn() - time.sleep(2) + time.sleep(5) print("IS ON?:", monitor.isOn) print() print("IS ATTACHED?:", monitor.isAttached) print("DETACH") monitor.detach() - time.sleep(5) + time.sleep(3) print("IS ATTACHED?:", monitor.isAttached) print("ATTACH") monitor.attach() - time.sleep(2) + time.sleep(5) print("IS ATTACHED?:", monitor.isAttached) print() - pmc.disableUpdate() + pmc.disableUpdateInfo() + pmc.plugListenerUnregister(pluggedCB) + pmc.changeListenerUnregister(changedCB) if len(monitorsPlugged) > 1: mon1 = monitorsPlugged[0] mon2 = monitorsPlugged[1] + print("MANAGING MONITORS") print("MONITOR 1:", mon1.name) print("MONITOR 2:", mon2.name) @@ -154,24 +171,23 @@ def changedCB(names, info): print("MONITOR 2 AS PRIMARY") print("MONITOR 2 PRIMARY:", mon2.isPrimary) mon2.setPrimary() + time.sleep(3) print("MONITOR 2 PRIMARY:", mon2.isPrimary) - time.sleep(5) print("MONITOR 1 AS PRIMARY") print("MONITOR 1 PRIMARY:", mon1.isPrimary) mon1.setPrimary() + time.sleep(3) print("MONITOR 1 PRIMARY:", mon1.isPrimary) print() print("CHANGE POSITION OF MONITOR 2 TO BELOW_LEFT") mon2.setPosition(Position.BELOW_LEFT, mon1.name) + time.sleep(0.3) print("MONITOR 2 POSITION:", mon2.position) - while True: - try: - time.sleep(0.2) - except KeyboardInterrupt: - break print() + print("=========== size & pos", "MON1", mon1.size, mon1.position, "MON2", mon2.size, mon2.position) + print("CHANGE ARRANGEMENT: MONITOR 2 AS PRIMARY, MONITOR 1 AT LEFT_BOTTOM") arrangement: dict[str, dict[str, Union[str, int, Position, Point, Size]]] = { str(mon2.name): {"relativePos": Position.PRIMARY, "relativeTo": ""}, @@ -179,14 +195,16 @@ def changedCB(names, info): } print(arrangement) pmc.arrangeMonitors(arrangement) - print("MONITOR 1 POSITION:", mon1.position) - print("MONITOR 2 POSITION:", mon2.position) - while True: - try: - time.sleep(0.2) - except KeyboardInterrupt: - break + time.sleep(3) + + print("=========== size & pos", "MON1", mon1.size, mon1.position, "MON2", mon2.size, mon2.position) + + print("MONITOR 1 POSITION:", mon1.position, "LEFT_BOTTOM:", mon2.position == Point(mon1.size.width, mon1.size.height - mon2.size.height)) + print("MONITOR 2 POSITION:", mon2.position, "PRIMARY:", mon2.isPrimary) print() + time.sleep(5) + + print("=========== size & pos", "MON1", mon1.size, mon1.position, "MON2", mon2.size, mon2.position) print("CHANGE ARRANGEMENT: MONITOR 1 AS PRIMARY, MONITOR 2 AT RIGHT_TOP") arrangement = { @@ -195,5 +213,13 @@ def changedCB(names, info): } print(arrangement) pmc.arrangeMonitors(arrangement) - print("MONITOR 1 POSITION:", mon1.position) - print("MONITOR 2 POSITION:", mon2.position) + time.sleep(3) + + print("=========== size & pos", "MON1", mon1.size, mon1.position, "MON2", mon2.size, mon2.position) + + print("MONITOR 1 POSITION:", mon1.position, "PRIMARY:", mon1.isPrimary) + print("MONITOR 2 POSITION:", mon2.position, "RIGHT_TOP:", mon2.position == Point(mon1.size.width, 0)) + + print("=========== size & pos", "MON1", mon1.size, mon1.position, "MON2", mon2.size, mon2.position) + +