From 70eea185e5d3a76cfcd89fb7e6e3a84159bd1bb5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20S=2E=20O=2E=20Bueno?= Date: Sun, 29 Sep 2024 14:25:55 -0300 Subject: [PATCH 01/14] [WIP] Refactoring to get publicly usable Locks --- src/extrainterpreters/memoryboard.py | 128 +++++++++++++-------------- src/extrainterpreters/utils.py | 7 ++ 2 files changed, 70 insertions(+), 65 deletions(-) diff --git a/src/extrainterpreters/memoryboard.py b/src/extrainterpreters/memoryboard.py index 3490dd8..f387190 100644 --- a/src/extrainterpreters/memoryboard.py +++ b/src/extrainterpreters/memoryboard.py @@ -10,20 +10,18 @@ from . import interpreters, running_interpreters, get_current, raw_list_all from . import _memoryboard from .utils import ( - guard_internal_use, - Field, + _InstMode, + _remote_memory, + _address_and_size, + _atomic_byte_lock, DoubleField, + Field, StructBase, - _InstMode, ResourceBusyError, + guard_internal_use ) -from ._memoryboard import _remote_memory, _address_and_size, _atomic_byte_lock - -_remote_memory = guard_internal_use(_remote_memory) -_address_and_size = guard_internal_use(_address_and_size) -_atomic_byte_lock = guard_internal_use(_atomic_byte_lock) - +from .lock import _CrossInterpreterStructLock class RemoteState: building = 0 @@ -52,61 +50,61 @@ class RemoteDataState: REMOTE_HEADER_SIZE = RemoteHeader._size -class _CrossInterpreterStructLock: - def __init__(self, struct, timeout=DEFAULT_TIMEOUT): - buffer_ptr, size = _address_and_size(struct._data) # , struct._offset) - # struct_ptr = buffer_ptr + struct._offset - lock_offset = struct._offset + struct._get_offset_for_field("lock") - if lock_offset >= size: - raise ValueError("Lock address out of bounds for struct buffer") - self._lock_address = buffer_ptr + lock_offset - self._original_timeout = self._timeout = timeout - self._entered = 0 - - def timeout(self, timeout: None | float): - """One use only timeout, for the same lock - - with lock.timeout(0.5): - ... - """ - self._timeout = timeout - return self - - def __enter__(self): - if self._entered: - self._entered += 1 - return self - if self._timeout is None: - if not _atomic_byte_lock(self._lock_address): - self._timeout = self._original_timeout - raise ResourceBusyError("Couldn't acquire lock") - else: - threshold = time.time() + self._timeout - while time.time() <= threshold: - if _atomic_byte_lock(self._lock_address): - break - else: - self._timeout = self._original_timeout - raise TimeoutError("Timeout trying to acquire lock") - self._entered += 1 - return self - - def __exit__(self, *args): - if not self._entered: - return - self._entered -= 1 - if self._entered: - return - buffer = _remote_memory(self._lock_address, 1) - buffer[0] = 0 - del buffer - self._entered = False - self._timeout = self._original_timeout - - def __getstate__(self): - state = self.__dict__.copy() - state["_entered"] = False - return state +#class _CrossInterpreterStructLock: + #def __init__(self, struct, timeout=DEFAULT_TIMEOUT): + #buffer_ptr, size = _address_and_size(struct._data) # , struct._offset) + ## struct_ptr = buffer_ptr + struct._offset + #lock_offset = struct._offset + struct._get_offset_for_field("lock") + #if lock_offset >= size: + #raise ValueError("Lock address out of bounds for struct buffer") + #self._lock_address = buffer_ptr + lock_offset + #self._original_timeout = self._timeout = timeout + #self._entered = 0 + + #def timeout(self, timeout: None | float): + #"""One use only timeout, for the same lock + + #with lock.timeout(0.5): + #... 
+ #""" + #self._timeout = timeout + #return self + + #def __enter__(self): + #if self._entered: + #self._entered += 1 + #return self + #if self._timeout is None: + #if not _atomic_byte_lock(self._lock_address): + #self._timeout = self._original_timeout + #raise ResourceBusyError("Couldn't acquire lock") + #else: + #threshold = time.time() + self._timeout + #while time.time() <= threshold: + #if _atomic_byte_lock(self._lock_address): + #break + #else: + #self._timeout = self._original_timeout + #raise TimeoutError("Timeout trying to acquire lock") + #self._entered += 1 + #return self + + #def __exit__(self, *args): + #if not self._entered: + #return + #self._entered -= 1 + #if self._entered: + #return + #buffer = _remote_memory(self._lock_address, 1) + #buffer[0] = 0 + #del buffer + #self._entered = False + #self._timeout = self._original_timeout + + #def __getstate__(self): + #state = self.__dict__.copy() + #state["_entered"] = False + #return state # when a RemoteArray can't be destroyed in parent, @@ -468,7 +466,7 @@ def __del__(self): if getattr(self, "_data", None) is not None: try: self.close() - except TypeError: + except (TypeError, AttributeError): # at interpreter shutdown, some of the names needed in "close" # may have been deleted pass diff --git a/src/extrainterpreters/utils.py b/src/extrainterpreters/utils.py index 0806570..166d0e7 100644 --- a/src/extrainterpreters/utils.py +++ b/src/extrainterpreters/utils.py @@ -31,6 +31,13 @@ def wrapper(*args, **kwargs): return wrapper +from ._memoryboard import _remote_memory, _address_and_size, _atomic_byte_lock + +_remote_memory = guard_internal_use(_remote_memory) +_address_and_size = guard_internal_use(_address_and_size) +_atomic_byte_lock = guard_internal_use(_atomic_byte_lock) + + class clsproperty: def __init__(self, method): self.method = method From e47fa72931edc18d57d4bf8ee5b36f15ebfe77f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20S=2E=20O=2E=20Bueno?= Date: Sun, 29 Sep 2024 15:17:09 -0300 Subject: [PATCH 02/14] Structs: remove '_from_values', create '_push_to' method --- src/extrainterpreters/lock.py | 110 +++++++++++++++++++++++++++++++++ src/extrainterpreters/utils.py | 16 ++--- tests/test_lock.py | 0 tests/test_struct.py | 12 ++++ 4 files changed, 130 insertions(+), 8 deletions(-) create mode 100644 src/extrainterpreters/lock.py create mode 100644 tests/test_lock.py diff --git a/src/extrainterpreters/lock.py b/src/extrainterpreters/lock.py new file mode 100644 index 0000000..231a301 --- /dev/null +++ b/src/extrainterpreters/lock.py @@ -0,0 +1,110 @@ +import os +import pickle +import threading +import time +import sys +from functools import wraps + +from collections.abc import MutableSequence + +from . import interpreters, running_interpreters, get_current, raw_list_all +from . import _memoryboard +from .utils import ( + guard_internal_use, + Field, + DoubleField, + StructBase, + _InstMode, + ResourceBusyError, +) + +from ._memoryboard import _remote_memory, _address_and_size, _atomic_byte_lock + +_remote_memory = guard_internal_use(_remote_memory) +_address_and_size = guard_internal_use(_address_and_size) +_atomic_byte_lock = guard_internal_use(_atomic_byte_lock) + + +class RemoteState: + building = 0 + ready = 1 + serialized = 2 + received = 2 + garbage = 3 + + +class RemoteHeader(StructBase): + lock = Field(1) + state = Field(1) + enter_count = Field(3) + exit_count = Field(3) + + +class RemoteDataState: + not_ready = 0 + read_only = 1 # not used for now. 
+ read_write = 2 + + +TIME_RESOLUTION = sys.getswitchinterval() +DEFAULT_TIMEOUT = 50 * TIME_RESOLUTION +DEFAULT_TTL = 3600 +REMOTE_HEADER_SIZE = RemoteHeader._size + + +class _CrossInterpreterStructLock: + def __init__(self, struct, timeout=DEFAULT_TIMEOUT): + buffer_ptr, size = _address_and_size(struct._data) # , struct._offset) + # struct_ptr = buffer_ptr + struct._offset + lock_offset = struct._offset + struct._get_offset_for_field("lock") + if lock_offset >= size: + raise ValueError("Lock address out of bounds for struct buffer") + self._lock_address = buffer_ptr + lock_offset + self._original_timeout = self._timeout = timeout + self._entered = 0 + + def timeout(self, timeout: None | float): + """One use only timeout, for the same lock + + with lock.timeout(0.5): + ... + """ + self._timeout = timeout + return self + + def __enter__(self): + if self._entered: + self._entered += 1 + return self + if self._timeout is None: + if not _atomic_byte_lock(self._lock_address): + self._timeout = self._original_timeout + raise ResourceBusyError("Couldn't acquire lock") + else: + threshold = time.time() + self._timeout + while time.time() <= threshold: + if _atomic_byte_lock(self._lock_address): + break + else: + self._timeout = self._original_timeout + raise TimeoutError("Timeout trying to acquire lock") + self._entered += 1 + return self + + def __exit__(self, *args): + if not self._entered: + return + self._entered -= 1 + if self._entered: + return + buffer = _remote_memory(self._lock_address, 1) + buffer[0] = 0 + del buffer + self._entered = False + self._timeout = self._original_timeout + + def __getstate__(self): + state = self.__dict__.copy() + state["_entered"] = False + return state + diff --git a/src/extrainterpreters/utils.py b/src/extrainterpreters/utils.py index 166d0e7..56fb00d 100644 --- a/src/extrainterpreters/utils.py +++ b/src/extrainterpreters/utils.py @@ -144,14 +144,6 @@ def _fields(cls): if isinstance(v, RawField): yield k - @classmethod - def _from_values(cls, *args): - data = bytearray(b"\x00" * cls._size) - self = cls(data, 0) - for arg, field_name in zip(args, self._fields): - setattr(self, field_name, arg) - return self - @property def _bytes(self): return bytes(self._data[self._offset : self._offset + self._size]) @@ -173,6 +165,14 @@ def _get_offset_for_field(cls, field_name): field = getattr(cls, field_name) return field._calc_offset(cls) + def _push_to(self, data, offset=0): + """Paste struct data into a new buffer given by data, offset + + Returns a new instance pointing to the data in the new copy. 
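+
+        A sketch of intended use ("Point" standing in for any StructBase
+        subclass with a compatible buffer):
+
+            shared = bytearray(Point._size + 16)
+            moved = point._push_to(shared, offset=16)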
+ """ + data[offset: offset + self._size] = self._bytes + return self._from_data(data, offset) + def __repr__(self): field_data = [] for field_name in self._fields: diff --git a/tests/test_lock.py b/tests/test_lock.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_struct.py b/tests/test_struct.py index 6271e01..e491113 100644 --- a/tests/test_struct.py +++ b/tests/test_struct.py @@ -65,6 +65,18 @@ class Struct(StructBase): assert s.data_offset == 1000 assert s.length == 2_000_000 +def test_struct_push_to(): + class Struct(StructBase): + data_offset = Field(2) + length = Field(4) + + s = Struct(data_offset=1000, length=2_000_000) + + data = bytearray(Struct._size) + n = s._push_to(data, 0) + assert n.data_offset == 1000 + assert n.length == 2_000_000 + def test_struct_bytes(): class Struct(StructBase): From ea500fb6e3666239cb365b8d1091f91e8ca675a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20S=2E=20O=2E=20Bueno?= Date: Mon, 30 Sep 2024 01:44:45 -0300 Subject: [PATCH 03/14] [WIP] tests --- src/extrainterpreters/__init__.py | 1 + src/extrainterpreters/lock.py | 85 ++++++++++++++++------------ src/extrainterpreters/memoryboard.py | 57 ------------------- tests/test_lock.py | 84 +++++++++++++++++++++++++++ 4 files changed, 134 insertions(+), 93 deletions(-) diff --git a/src/extrainterpreters/__init__.py b/src/extrainterpreters/__init__.py index c6a483f..71f7000 100644 --- a/src/extrainterpreters/__init__.py +++ b/src/extrainterpreters/__init__.py @@ -69,6 +69,7 @@ def raw_list_all(): from .base_interpreter import BaseInterpreter from .queue import SingleQueue, Queue from .simple_interpreter import SimpleInterpreter as Interpreter +from .lock import Lock, RLock def list_all(): diff --git a/src/extrainterpreters/lock.py b/src/extrainterpreters/lock.py index 231a301..83c9d53 100644 --- a/src/extrainterpreters/lock.py +++ b/src/extrainterpreters/lock.py @@ -1,55 +1,26 @@ -import os -import pickle -import threading import time import sys -from functools import wraps -from collections.abc import MutableSequence -from . import interpreters, running_interpreters, get_current, raw_list_all -from . import _memoryboard +from . import running_interpreters + from .utils import ( + _atomic_byte_lock, + _remote_memory, + _address_and_size, guard_internal_use, Field, - DoubleField, StructBase, - _InstMode, ResourceBusyError, ) -from ._memoryboard import _remote_memory, _address_and_size, _atomic_byte_lock - -_remote_memory = guard_internal_use(_remote_memory) -_address_and_size = guard_internal_use(_address_and_size) -_atomic_byte_lock = guard_internal_use(_atomic_byte_lock) - - -class RemoteState: - building = 0 - ready = 1 - serialized = 2 - received = 2 - garbage = 3 - - -class RemoteHeader(StructBase): +class _LockBuffer(StructBase): lock = Field(1) - state = Field(1) - enter_count = Field(3) - exit_count = Field(3) - - -class RemoteDataState: - not_ready = 0 - read_only = 1 # not used for now. - read_write = 2 - TIME_RESOLUTION = sys.getswitchinterval() DEFAULT_TIMEOUT = 50 * TIME_RESOLUTION DEFAULT_TTL = 3600 -REMOTE_HEADER_SIZE = RemoteHeader._size +LOCK_BUFFER_SIZE = _LockBuffer._size class _CrossInterpreterStructLock: @@ -73,6 +44,8 @@ def timeout(self, timeout: None | float): return self def __enter__(self): + # Remember: all attributes are "interpreter-local" + # just the bytes in the passed in struct are shared. 
if self._entered: self._entered += 1 return self @@ -108,3 +81,43 @@ def __getstate__(self): state["_entered"] = False return state + +class IntRLock: + """Cross Interpreter re-entrant lock + + This will allow re-entrant acquires in the same + interpreter, _even_ if it is being acquired + in another thread in the same interpreter. + + It should not be very useful - but + the implementation path code leads here. Prefer the public + "RLock" and "Lock" classes to avoid surprises. + """ + + def __init__(self): + self._lock = _LockBuffer(lock=0) + + def acquire(self, blocking=True, timeout=-1): + pass + + def release(self): + pass + + def locked(self): + return False + + +class RLock(IntRLock): + """Cross interpreter re-entrant lock, analogous to + threading.RLock + https://docs.python.org/3/library/threading.html#rlock-objects + + More specifically: it will allow re-entrancy in + _the same thread_ and _same interpreter_ - + a different thread in the same interpreter will still + be blocked out. + """ + + +class Lock(IntRLock): + pass diff --git a/src/extrainterpreters/memoryboard.py b/src/extrainterpreters/memoryboard.py index f387190..a7854ae 100644 --- a/src/extrainterpreters/memoryboard.py +++ b/src/extrainterpreters/memoryboard.py @@ -50,63 +50,6 @@ class RemoteDataState: REMOTE_HEADER_SIZE = RemoteHeader._size -#class _CrossInterpreterStructLock: - #def __init__(self, struct, timeout=DEFAULT_TIMEOUT): - #buffer_ptr, size = _address_and_size(struct._data) # , struct._offset) - ## struct_ptr = buffer_ptr + struct._offset - #lock_offset = struct._offset + struct._get_offset_for_field("lock") - #if lock_offset >= size: - #raise ValueError("Lock address out of bounds for struct buffer") - #self._lock_address = buffer_ptr + lock_offset - #self._original_timeout = self._timeout = timeout - #self._entered = 0 - - #def timeout(self, timeout: None | float): - #"""One use only timeout, for the same lock - - #with lock.timeout(0.5): - #... 
- #""" - #self._timeout = timeout - #return self - - #def __enter__(self): - #if self._entered: - #self._entered += 1 - #return self - #if self._timeout is None: - #if not _atomic_byte_lock(self._lock_address): - #self._timeout = self._original_timeout - #raise ResourceBusyError("Couldn't acquire lock") - #else: - #threshold = time.time() + self._timeout - #while time.time() <= threshold: - #if _atomic_byte_lock(self._lock_address): - #break - #else: - #self._timeout = self._original_timeout - #raise TimeoutError("Timeout trying to acquire lock") - #self._entered += 1 - #return self - - #def __exit__(self, *args): - #if not self._entered: - #return - #self._entered -= 1 - #if self._entered: - #return - #buffer = _remote_memory(self._lock_address, 1) - #buffer[0] = 0 - #del buffer - #self._entered = False - #self._timeout = self._original_timeout - - #def __getstate__(self): - #state = self.__dict__.copy() - #state["_entered"] = False - #return state - - # when a RemoteArray can't be destroyed in parent, # it comes to "sleep" here, where a callback in the # GC will periodically try to remove it: diff --git a/tests/test_lock.py b/tests/test_lock.py index e69de29..1629754 100644 --- a/tests/test_lock.py +++ b/tests/test_lock.py @@ -0,0 +1,84 @@ +import pickle +from textwrap import dedent as D + + +import pytest + +import extrainterpreters as ei + + +from extrainterpreters import Lock, RLock +from extrainterpreters.lock import IntRLock + +@pytest.mark.parametrize("LockCls", [Lock, RLock, IntRLock]) +def test_locks_are_acquireable(LockCls): + lock = LockCls() + assert not lock.locked() + lock.acquire() + assert lock.locked() + lock.release() + assert not lock.locked() + + +@pytest.mark.parametrize("LockCls", [Lock, RLock, IntRLock]) +def test_locks_work_as_context_manager(LockCls): + lock = LockCls() + assert not lock.locked() + with lock: + assert lock.locked() + assert not lock.locked() + + + +def test_lock_cant_be_reacquired(): + lock = Lock() + + lock.acquire() + + with pytest.raises(TimeoutError): + lock.acquire(timeout=0) + + +@pytest.mark.parametrize("LockCls", [Lock, RLock, IntRLock]) +def test_locks_cant_be_passed_to_other_interpreter(LockCls): + lock = LockCls() + interp = ei.Interpreter().start() + interp.run_string( + D( + f""" + import extrainterpreters; extrainterpreters.DEBUG=True + lock = pickle.loads({pickle.dumps(lock)}) + assert lock._lock._data == 0 + """ + ) + ) + lock._lock._data[0] = 2 + interp.run_string( + D( + """ + assert lock._lock._data[0] == 2 + lock._lock._data[0] = 5 + """ + ) + ) + assert lock._lock._data[0] == 5 + interp.close() + + +#@pytest.mark.parametrize("LockCls", [Lock, RLock, IntRLock]) +#def test_locks_cant_be_acquired_in_other_interpreter(LockCls): + #lock = LockCls() + #interp = ei.Interpreter().start() + #board.new_item((1, 2)) + #interp.run_string( + #D( + #f""" + #import extrainterpreters; extrainterpreters.DEBUG=True + #lock = pickle.loads({pickle.dumps(lock)}) + + #index, item = board.fetch_item() + #assert item == (1,2) + #board.new_item((3,4)) + #""" + #) + #) From aebfcbef1eaf4e46d731ead97301d38a49b26dc1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20S=2E=20O=2E=20Bueno?= Date: Mon, 30 Sep 2024 02:18:41 -0300 Subject: [PATCH 04/14] Factor-out RemoteArray --- src/extrainterpreters/memoryboard.py | 389 +----------------------- src/extrainterpreters/queue.py | 3 +- src/extrainterpreters/remote_array.py | 414 ++++++++++++++++++++++++++ tests/test_boards.py | 106 +++---- 4 files changed, 473 insertions(+), 439 deletions(-) create mode 100644 
src/extrainterpreters/remote_array.py diff --git a/src/extrainterpreters/memoryboard.py b/src/extrainterpreters/memoryboard.py index a7854ae..a8bafb6 100644 --- a/src/extrainterpreters/memoryboard.py +++ b/src/extrainterpreters/memoryboard.py @@ -9,6 +9,7 @@ from . import interpreters, running_interpreters, get_current, raw_list_all from . import _memoryboard +from .remote_array import RemoteArray from .utils import ( _InstMode, _remote_memory, @@ -23,396 +24,8 @@ from .lock import _CrossInterpreterStructLock -class RemoteState: - building = 0 - ready = 1 - serialized = 2 - received = 2 - garbage = 3 - -class RemoteHeader(StructBase): - lock = Field(1) - state = Field(1) - enter_count = Field(3) - exit_count = Field(3) - - -class RemoteDataState: - not_ready = 0 - read_only = 1 # not used for now. - read_write = 2 - - -TIME_RESOLUTION = sys.getswitchinterval() -DEFAULT_TIMEOUT = 50 * TIME_RESOLUTION DEFAULT_TTL = 3600 -REMOTE_HEADER_SIZE = RemoteHeader._size - - -# when a RemoteArray can't be destroyed in parent, -# it comes to "sleep" here, where a callback in the -# GC will periodically try to remove it: -_array_registry = [] - -_collecting_generation = 1 - - -def _collector(action, data): - """Garbage Collector "plug-in": - - when a RemoteBuffer is closed in parent interpreter - earlier than it is exist in sub-interpreters, - it is decomissioned and put in "_array_registry". - - This function will be called when the garbage collector - is run, and check if any pending buffers can be fully - dealocated. - """ - if action != "start" or data.get("generation", 0) < _collecting_generation: - return - if not _array_registry: - return - new_registry = [] - for buffer in _array_registry: - buffer.close() - if buffer._data is not None: - new_registry.append(buffer) - _array_registry[:] = new_registry - - -import gc - -gc.callbacks.append(_collector) - - -@MutableSequence.register -class RemoteArray: - """[WIP] - Single class which can hold shared buffers across interpreters. - - It is used in the internal mechanisms of extrainterpreters, but offers - enough safeguards to be used in user code - upon being sending to a - remote interpreter, data can be shared through this structure in a safe way. - - (It can be sent to the sub-interpreter through a Queue, or by unpckling it - in a "run_string" call) - - It offers both byte-access with item-setting (Use sliced notation to - write a lot of data at once) and a file-like interface, mainly for providing - pickle compatibility. - - """ - - """ - Life cycle semantics: - - creation: set header state to "building" - - on first serialize (__getstate__), set a "serialized" state: - - can no longer be deleted, unless further criteria are met - - mark timestamp on buffer - this is checked against TTL - - on de-serialize: do nothing - - on client-side "__del__" without enter: - - increase cancel in buffer canceled counter (?) - - on client-side "__enter__": - - check TTL against serialization timestamp - on fail, raise - - increment "entered" counter on buffer - - on client-side "__exit__": - - increment "exited" counter on buffer - - on parent side "exit": - - check serialization: - if no serialization ocurred, just destroy buffer. 
- - check enter and exit on buffer counters: - if failed, (more enters than exits) save to "pending deletion" - - check TTL against timestamp of serialization: - if TTL not reached, save to "pending deletion" - - on parent side "__del__": - - call __exit__ - - - suggested default TTL: 1 seconds - - - check for the possibility of a GC hook (gc.callbacks list) - - if possible, iterate all "pending deletion" and check conditions in "__exit__" - - if no GC hook possible,think of another reasonable mechanism to periodically try to delete pending deletion buffers. (dedicate thread with one check per second? Less frequent? - - """ - __slots__ = ( - "_cursor", - "_lock", - "_data", - "_data_state", - "_size", - "_anchor", - "_mode", - "_timestamp", - "_ttl", - "_internal", - ) - - def __init__(self, *, size=None, payload=None, ttl=DEFAULT_TTL): - if size is None and payload is not None: - size = len(payload) - self._size = size - self._data = bytearray(b"\x00" * (size + REMOTE_HEADER_SIZE)) - if payload: - # TBD: Temporary thing - we are allowing zero-copy buffers soon - self._data[REMOTE_HEADER_SIZE:] = payload - # Keeping reference to a "normal" memoryview, so that ._data - # can't be resized (and worse: repositioned) by the interpreter. - # trying to do so will raise a BufferError - self._anchor = memoryview(self._data) - self._cursor = 0 - self._data_state = RemoteDataState.read_write - self._lock = _CrossInterpreterStructLock(self.header) - self._mode = _InstMode.parent - self._ttl = ttl - self.header.state = RemoteState.building - - @property - def header(self): - if self._data_state == RemoteDataState.not_ready: - raise RuntimeError("Trying to use buffer metadata not ready for use.") - return RemoteHeader._from_data(self._data, 0) - - def _convert_index(self, index): - if isinstance(index, slice): - start, stop, step = index.indices(self._size) - index = slice(start + REMOTE_HEADER_SIZE, stop + REMOTE_HEADER_SIZE, step) - else: - index += REMOTE_HEADER_SIZE - return index - - def __getitem__(self, index): - if not self._data_state in ( - RemoteDataState.read_only, - RemoteDataState.read_write, - ): - raise RuntimeError( - "Trying to read data from buffer that is not ready for use" - ) - return self._data.__getitem__(self._convert_index(index)) - - def __setitem__(self, index, value): - # TBD: Maybe require lock? - # An option is to fail if unable to get the lock, and - # provide a timeouted method that will wait for it. 
- # (going for that): - if self._data_state != RemoteDataState.read_write: - raise RuntimeError( - "Trying to write data to buffer that is not ready for use" - ) - with self._lock: - return self._data.__setitem__(self._convert_index(index), value) - raise RuntimeError("Remote Array busy in other thread") - - def _enter_child(self): - ttl = self._check_ttl() - if not ttl: - raise RuntimeError( - f"TTL Exceeded trying to use buffer in sub-interpreter {get_current()}" - ) - self._data = _remote_memory(*self._internal[:2]) - self._lock = self._internal[2] - self._cursor = 0 - with self._lock: - # Avoid race conditions: better re-test the TTL - ttl = self._check_ttl() - if not ttl: - self._data = None - raise RuntimeError( - f"TTL Exceeded trying to use buffer in sub-interpreter {get_current()}, (stage 2)" - ) - self._data_state = RemoteDataState.read_write - if (state := self.header.state) not in ( - RemoteState.serialized, - RemoteState.received, - ): - self._data = None - raise RuntimeError(f"Invalid state in buffer: {state}") - self.header.enter_count += 1 - return self - - def _enter_parent(self): - if self.header.state != RemoteState.building: - raise RuntimeError("Cannot enter buffer: invalid state") - self.header.state = RemoteState.ready - self._data_state = RemoteDataState.read_write - return self - - def start(self): - if self._mode == _InstMode.zombie: - raise RuntimeError( - "This buffer is decomissioned and no longer can be used for data exchange" - ) - return ( - self._enter_child() - if self._mode == _InstMode.child - else self._enter_parent() - ) - - def __delitem__(self, index): - raise NotImplementedError() - - def __len__(self): - return self._size - - def iter(self): - return iter(self.data) - - def read(self, n=None): - with self._lock: - if n is None: - n = len(self) - self._cursor - prev = self._cursor - self._cursor += n - return self[prev : self._cursor] - - def write(self, content): - with self._lock: - if isinstance(content, str): - content = content.encode("utf-8") - self[self._cursor : self._cursor + len(content)] = content - self._cursor += len(content) - - def tell(self): - return self._cursor - - def readline(self): - # needed by pickle.load - result = [] - read = 0 - with self._lock: - cursor = self._cursor - while read != 0x0A: - if cursor >= len(self): - break - result.append(read := self[cursor]) - cursor += 1 - self._cursor = self.cursor - return bytes(result) - - def seek(self, pos): - self._cursor = pos - - def _data_for_remote(self): - # TBD: adjust when spliting payload buffer from header buffer - # return _address_and_size(self.data) - address, length = _address_and_size(self._data) - address += RemoteHeader._size - length -= RemoteHeader._size - return address, length - - def __getstate__(self): - with self._lock: - if self.header.state not in ( - RemoteState.ready, - RemoteState.serialized, - RemoteState.received, - ): - raise RuntimeError( - f"Can not pickle remote buffer in current state {self.header.state=}" - ) - with self._lock: - if self.header.state == RemoteState.ready: - self.header.state = RemoteState.serialized - state = {"buffer_data": _address_and_size(self._data)} - state["ttl"] = self._ttl - # if not hasattr(self, "_timestamp"): - # self._timestamp = time.monotonic() - self._timestamp = time.monotonic() - state["timestamp"] = self._timestamp - state["_lock"] = self._lock - return state - - def __setstate__(self, state): - self._internal = state["buffer_data"] + (state["_lock"],) - self._ttl = state["ttl"] - self._timestamp = 
state["timestamp"] - self._size = state["buffer_data"][1] - RemoteHeader._size - # atention: the Lock will use a byte in the buffer, with an independent allocation mechanism. - # It is unpickled and ready to use at this point - but we will - # just add it to the instance in __enter__ , after other checks - # take place. - self._lock = None # state["_lock"] - self._data = None - self._cursor = 0 - self._mode = _InstMode.child - self._data_state = RemoteDataState.not_ready - - def __repr__(self): - return f"<{self.__class__.__name__} with {len(self)} bytes>" - - def _copy_to_limbo(self): - inst = type(self).__new__(type(self)) - inst._anchor = self._anchor - inst._data = self._data - inst._mode = _InstMode.zombie - inst._size = self._size - inst._lock = self._lock - inst._data_state = self._data_state - inst._timestamp = self._timestamp - inst._ttl = self._ttl - _array_registry.append(inst) - - def _check_ttl(self): - """Returns True if time-to-live has not expired""" - if not (timestamp := getattr(self, "_timestamp", None)): - return True - return time.monotonic() - timestamp <= self._ttl - - def close(self): - # when called at interpreter shutdown, "_InstMode" may have been deleted - target_mode = _InstMode.child if globals()["_InstMode"] else "child" - if self._mode == target_mode: - if self._data is None: - return - with self._lock: - self.header.exit_count += 1 - self._data_state = RemoteDataState.not_ready - self._data = None - return - with self._lock: - early_stages = self.header.state in ( - RemoteState.building, - RemoteState.ready, - ) - if early_stages: - self.header.state = RemoteState.garbage - - ttl_cleared = not self._check_ttl() - - if ttl_cleared and self.header.exit_count >= self.header.enter_count: - self.header.state = RemoteState.garbage - - if self.header.state == RemoteState.garbage: - self._data_state = RemoteDataState.not_ready - del self._anchor - self._data = None - return - if self._mode == _InstMode.zombie: - # do nothing on fail - return - self._copy_to_limbo() - del self._anchor - self._data = None - del self._cursor - self._data_state = RemoteDataState.not_ready - # This instance is now a floating "casc" which can no longer access - # data. GC "plugin" will keep trying to delete it. - - def __exit__(self, *args): - return self.close() - - def __enter__(self): - return self.start() - - def __del__(self): - if getattr(self, "_data", None) is not None: - try: - self.close() - except (TypeError, AttributeError): - # at interpreter shutdown, some of the names needed in "close" - # may have been deleted - pass class BufferBase: diff --git a/src/extrainterpreters/queue.py b/src/extrainterpreters/queue.py index bcd0fa3..6b4c1bf 100644 --- a/src/extrainterpreters/queue.py +++ b/src/extrainterpreters/queue.py @@ -18,7 +18,8 @@ non_reentrant, ResourceBusyError, ) -from .memoryboard import LockableBoard, RemoteArray, RemoteState +from .remote_array import RemoteArray, RemoteState +from .memoryboard import LockableBoard from . import interpreters, get_current from .resources import EISelector, register_pipe, PIPE_REGISTRY diff --git a/src/extrainterpreters/remote_array.py b/src/extrainterpreters/remote_array.py new file mode 100644 index 0000000..2d2b0bb --- /dev/null +++ b/src/extrainterpreters/remote_array.py @@ -0,0 +1,414 @@ +import gc +import os +import pickle +import threading +import time +import sys +from functools import wraps + +from collections.abc import MutableSequence + +from . import interpreters, running_interpreters, get_current, raw_list_all +from . 
import _memoryboard +from .utils import ( + _InstMode, + _remote_memory, + _address_and_size, + _atomic_byte_lock, + DoubleField, + Field, + StructBase, + ResourceBusyError, + guard_internal_use +) + +from .lock import _CrossInterpreterStructLock + +class RemoteState: + building = 0 + ready = 1 + serialized = 2 + received = 2 + garbage = 3 + + +class RemoteHeader(StructBase): + lock = Field(1) + state = Field(1) + enter_count = Field(3) + exit_count = Field(3) + + +class RemoteDataState: + not_ready = 0 + read_only = 1 # not used for now. + read_write = 2 + + +DEFAULT_TTL = 3600 +REMOTE_HEADER_SIZE = RemoteHeader._size + + +# when a RemoteArray can't be destroyed in parent, +# it comes to "sleep" here, where a callback in the +# GC will periodically try to remove it: +_array_registry = [] + +_collecting_generation = 1 + + +def _collector(action, data): + """Garbage Collector "plug-in": + + when a RemoteBuffer is closed in parent interpreter + earlier than it is exist in sub-interpreters, + it is decomissioned and put in "_array_registry". + + This function will be called when the garbage collector + is run, and check if any pending buffers can be fully + dealocated. + """ + if action != "start" or data.get("generation", 0) < _collecting_generation: + return + if not _array_registry: + return + new_registry = [] + for buffer in _array_registry: + buffer.close() + if buffer._data is not None: + new_registry.append(buffer) + _array_registry[:] = new_registry + + + +gc.callbacks.append(_collector) + + +@MutableSequence.register +class RemoteArray: + """[WIP] + Single class which can hold shared buffers across interpreters. + + It is used in the internal mechanisms of extrainterpreters, but offers + enough safeguards to be used in user code - upon being sending to a + remote interpreter, data can be shared through this structure in a safe way. + + (It can be sent to the sub-interpreter through a Queue, or by unpckling it + in a "run_string" call) + + It offers both byte-access with item-setting (Use sliced notation to + write a lot of data at once) and a file-like interface, mainly for providing + pickle compatibility. + + """ + + """ + Life cycle semantics: + - creation: set header state to "building" + - on first serialize (__getstate__), set a "serialized" state: + - can no longer be deleted, unless further criteria are met + - mark timestamp on buffer - this is checked against TTL + - on de-serialize: do nothing + - on client-side "__del__" without enter: + - increase cancel in buffer canceled counter (?) + - on client-side "__enter__": + - check TTL against serialization timestamp - on fail, raise + - increment "entered" counter on buffer + - on client-side "__exit__": + - increment "exited" counter on buffer + - on parent side "exit": + - check serialization: + if no serialization ocurred, just destroy buffer. + - check enter and exit on buffer counters: + if failed, (more enters than exits) save to "pending deletion" + - check TTL against timestamp of serialization: + if TTL not reached, save to "pending deletion" + - on parent side "__del__": + - call __exit__ + + - suggested default TTL: 1 seconds + + - check for the possibility of a GC hook (gc.callbacks list) + - if possible, iterate all "pending deletion" and check conditions in "__exit__" + - if no GC hook possible,think of another reasonable mechanism to periodically try to delete pending deletion buffers. (dedicate thread with one check per second? Less frequent? 
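+
+    (in practice this module takes the gc.callbacks route: "_collector" above
+    is registered in gc.callbacks, and retries freeing buffers parked in
+    "_array_registry" when the older GC generations are collected)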
+ + """ + __slots__ = ( + "_cursor", + "_lock", + "_data", + "_data_state", + "_size", + "_anchor", + "_mode", + "_timestamp", + "_ttl", + "_internal", + ) + + def __init__(self, *, size=None, payload=None, ttl=DEFAULT_TTL): + if size is None and payload is not None: + size = len(payload) + self._size = size + self._data = bytearray(b"\x00" * (size + REMOTE_HEADER_SIZE)) + if payload: + # TBD: Temporary thing - we are allowing zero-copy buffers soon + self._data[REMOTE_HEADER_SIZE:] = payload + # Keeping reference to a "normal" memoryview, so that ._data + # can't be resized (and worse: repositioned) by the interpreter. + # trying to do so will raise a BufferError + self._anchor = memoryview(self._data) + self._cursor = 0 + self._data_state = RemoteDataState.read_write + self._lock = _CrossInterpreterStructLock(self.header) + self._mode = _InstMode.parent + self._ttl = ttl + self.header.state = RemoteState.building + + @property + def header(self): + if self._data_state == RemoteDataState.not_ready: + raise RuntimeError("Trying to use buffer metadata not ready for use.") + return RemoteHeader._from_data(self._data, 0) + + def _convert_index(self, index): + if isinstance(index, slice): + start, stop, step = index.indices(self._size) + index = slice(start + REMOTE_HEADER_SIZE, stop + REMOTE_HEADER_SIZE, step) + else: + index += REMOTE_HEADER_SIZE + return index + + def __getitem__(self, index): + if not self._data_state in ( + RemoteDataState.read_only, + RemoteDataState.read_write, + ): + raise RuntimeError( + "Trying to read data from buffer that is not ready for use" + ) + return self._data.__getitem__(self._convert_index(index)) + + def __setitem__(self, index, value): + # TBD: Maybe require lock? + # An option is to fail if unable to get the lock, and + # provide a timeouted method that will wait for it. 
+ # (going for that): + if self._data_state != RemoteDataState.read_write: + raise RuntimeError( + "Trying to write data to buffer that is not ready for use" + ) + with self._lock: + return self._data.__setitem__(self._convert_index(index), value) + raise RuntimeError("Remote Array busy in other thread") + + def _enter_child(self): + ttl = self._check_ttl() + if not ttl: + raise RuntimeError( + f"TTL Exceeded trying to use buffer in sub-interpreter {get_current()}" + ) + self._data = _remote_memory(*self._internal[:2]) + self._lock = self._internal[2] + self._cursor = 0 + with self._lock: + # Avoid race conditions: better re-test the TTL + ttl = self._check_ttl() + if not ttl: + self._data = None + raise RuntimeError( + f"TTL Exceeded trying to use buffer in sub-interpreter {get_current()}, (stage 2)" + ) + self._data_state = RemoteDataState.read_write + if (state := self.header.state) not in ( + RemoteState.serialized, + RemoteState.received, + ): + self._data = None + raise RuntimeError(f"Invalid state in buffer: {state}") + self.header.enter_count += 1 + return self + + def _enter_parent(self): + if self.header.state != RemoteState.building: + raise RuntimeError("Cannot enter buffer: invalid state") + self.header.state = RemoteState.ready + self._data_state = RemoteDataState.read_write + return self + + def start(self): + if self._mode == _InstMode.zombie: + raise RuntimeError( + "This buffer is decomissioned and no longer can be used for data exchange" + ) + return ( + self._enter_child() + if self._mode == _InstMode.child + else self._enter_parent() + ) + + def __delitem__(self, index): + raise NotImplementedError() + + def __len__(self): + return self._size + + def iter(self): + return iter(self.data) + + def read(self, n=None): + with self._lock: + if n is None: + n = len(self) - self._cursor + prev = self._cursor + self._cursor += n + return self[prev : self._cursor] + + def write(self, content): + with self._lock: + if isinstance(content, str): + content = content.encode("utf-8") + self[self._cursor : self._cursor + len(content)] = content + self._cursor += len(content) + + def tell(self): + return self._cursor + + def readline(self): + # needed by pickle.load + result = [] + read = 0 + with self._lock: + cursor = self._cursor + while read != 0x0A: + if cursor >= len(self): + break + result.append(read := self[cursor]) + cursor += 1 + self._cursor = self.cursor + return bytes(result) + + def seek(self, pos): + self._cursor = pos + + def _data_for_remote(self): + # TBD: adjust when spliting payload buffer from header buffer + # return _address_and_size(self.data) + address, length = _address_and_size(self._data) + address += RemoteHeader._size + length -= RemoteHeader._size + return address, length + + def __getstate__(self): + with self._lock: + if self.header.state not in ( + RemoteState.ready, + RemoteState.serialized, + RemoteState.received, + ): + raise RuntimeError( + f"Can not pickle remote buffer in current state {self.header.state=}" + ) + with self._lock: + if self.header.state == RemoteState.ready: + self.header.state = RemoteState.serialized + state = {"buffer_data": _address_and_size(self._data)} + state["ttl"] = self._ttl + # if not hasattr(self, "_timestamp"): + # self._timestamp = time.monotonic() + self._timestamp = time.monotonic() + state["timestamp"] = self._timestamp + state["_lock"] = self._lock + return state + + def __setstate__(self, state): + self._internal = state["buffer_data"] + (state["_lock"],) + self._ttl = state["ttl"] + self._timestamp = 
state["timestamp"] + self._size = state["buffer_data"][1] - RemoteHeader._size + # atention: the Lock will use a byte in the buffer, with an independent allocation mechanism. + # It is unpickled and ready to use at this point - but we will + # just add it to the instance in __enter__ , after other checks + # take place. + self._lock = None # state["_lock"] + self._data = None + self._cursor = 0 + self._mode = _InstMode.child + self._data_state = RemoteDataState.not_ready + + def __repr__(self): + return f"<{self.__class__.__name__} with {len(self)} bytes>" + + def _copy_to_limbo(self): + inst = type(self).__new__(type(self)) + inst._anchor = self._anchor + inst._data = self._data + inst._mode = _InstMode.zombie + inst._size = self._size + inst._lock = self._lock + inst._data_state = self._data_state + inst._timestamp = self._timestamp + inst._ttl = self._ttl + _array_registry.append(inst) + + def _check_ttl(self): + """Returns True if time-to-live has not expired""" + if not (timestamp := getattr(self, "_timestamp", None)): + return True + return time.monotonic() - timestamp <= self._ttl + + def close(self): + # when called at interpreter shutdown, "_InstMode" may have been deleted + target_mode = _InstMode.child if globals()["_InstMode"] else "child" + if self._mode == target_mode: + if self._data is None: + return + with self._lock: + self.header.exit_count += 1 + self._data_state = RemoteDataState.not_ready + self._data = None + return + with self._lock: + early_stages = self.header.state in ( + RemoteState.building, + RemoteState.ready, + ) + if early_stages: + self.header.state = RemoteState.garbage + + ttl_cleared = not self._check_ttl() + + if ttl_cleared and self.header.exit_count >= self.header.enter_count: + self.header.state = RemoteState.garbage + + if self.header.state == RemoteState.garbage: + self._data_state = RemoteDataState.not_ready + del self._anchor + self._data = None + return + if self._mode == _InstMode.zombie: + # do nothing on fail + return + self._copy_to_limbo() + del self._anchor + self._data = None + del self._cursor + self._data_state = RemoteDataState.not_ready + # This instance is now a floating "casc" which can no longer access + # data. GC "plugin" will keep trying to delete it. + + def __exit__(self, *args): + return self.close() + + def __enter__(self): + return self.start() + + def __del__(self): + if getattr(self, "_data", None) is not None: + try: + self.close() + except (TypeError, AttributeError): + # at interpreter shutdown, some of the names needed in "close" + # may have been deleted + pass + diff --git a/tests/test_boards.py b/tests/test_boards.py index b6753a3..a130c24 100644 --- a/tests/test_boards.py +++ b/tests/test_boards.py @@ -8,9 +8,11 @@ _CrossInterpreterStructLock, RemoteArray, ) +from extrainterpreters.remote_array import RemoteArray, RemoteDataState, RemoteState +from extrainterpreters import Interpreter from extrainterpreters import memoryboard +from extrainterpreters import remote_array from extrainterpreters.utils import StructBase, Field -from extrainterpreters import Interpreter import extrainterpreters as ei @@ -128,60 +130,64 @@ def test_structlock_timeout_is_restored_on_acquire_fail(lockable, set_timeout): def test_remotearray_base(): # temp work around: other tests are leaking interpreters and boards! 
- xx = memoryboard._array_registry - memoryboard._array_registry = [] - buffer = RemoteArray(size=2048, ttl=0.05) - assert buffer.header.state == memoryboard.RemoteState.building - with buffer: - assert buffer.header.state == memoryboard.RemoteState.ready - serialized = pickle.dumps(buffer) - assert buffer.header.state == memoryboard.RemoteState.serialized - new_buffer = pickle.loads(serialized) - assert buffer.header.state == memoryboard.RemoteState.serialized - with new_buffer: - assert buffer.header.state == memoryboard.RemoteState.received - - new_buffer[0] = 255 - - assert buffer.header.enter_count == 1 - assert buffer.header.exit_count == 0 - with pytest.raises(RuntimeError): - new_buffer[0] = 128 - assert buffer.header.exit_count == 1 - assert buffer[0] == 255 - time.sleep(buffer._ttl * 1.1) - assert buffer._data is None - assert len(memoryboard._array_registry) == 0 - memoryboard._array_registry = xx + xx = remote_array._array_registry + try: + remote_array._array_registry = [] + buffer = RemoteArray(size=2048, ttl=0.05) + assert buffer.header.state == RemoteState.building + with buffer: + assert buffer.header.state == RemoteState.ready + serialized = pickle.dumps(buffer) + assert buffer.header.state == RemoteState.serialized + new_buffer = pickle.loads(serialized) + assert buffer.header.state == RemoteState.serialized + with new_buffer: + assert buffer.header.state == RemoteState.received + + new_buffer[0] = 255 + + assert buffer.header.enter_count == 1 + assert buffer.header.exit_count == 0 + with pytest.raises(RuntimeError): + new_buffer[0] = 128 + assert buffer.header.exit_count == 1 + assert buffer[0] == 255 + time.sleep(buffer._ttl * 1.1) + assert buffer._data is None + assert len(remote_array._array_registry) == 0 + finally: + remote_array._array_registry = xx def test_remotearray_not_deleted_before_ttl_expires(): - xx = memoryboard._array_registry - memoryboard._array_registry = [] import gc + xx = remote_array._array_registry + try: + remote_array._array_registry = [] + + buffer = RemoteArray(size=2048, ttl=0.1) + assert buffer.header.state == RemoteState.building + with buffer: + serialized = pickle.dumps(buffer) + new_buffer = pickle.loads(serialized) + with new_buffer: + pass + assert buffer.header.exit_count == 1 + original_data = buffer._data + gc.disable() + assert buffer._data is None + assert buffer._data_state == RemoteDataState.not_ready + assert len(remote_array._array_registry) == 1 + dead = remote_array._array_registry[0] + assert dead._data is original_data + + gc.enable() - buffer = RemoteArray(size=2048, ttl=0.1) - assert buffer.header.state == memoryboard.RemoteState.building - with buffer: - serialized = pickle.dumps(buffer) - new_buffer = pickle.loads(serialized) - with new_buffer: - pass - assert buffer.header.exit_count == 1 - original_data = buffer._data - gc.disable() - assert buffer._data is None - assert buffer._data_state == memoryboard.RemoteDataState.not_ready - assert len(memoryboard._array_registry) == 1 - dead = memoryboard._array_registry[0] - assert dead._data is original_data - - gc.enable() - - time.sleep(buffer._ttl * 1.1) - gc.collect() - assert len(memoryboard._array_registry) == 0 - memoryboard._array_registry = xx + time.sleep(buffer._ttl * 1.1) + gc.collect() + assert len(remote_array._array_registry) == 0 + finally: + remote_array._array_registry = xx def test_memoryboard_fetch_from_gone_interpreter_doesnot_crash(lowlevel): From e528f8c855b405f1244dad19522ca87605b66037 Mon Sep 17 00:00:00 2001 From: 
=?UTF-8?q?Jo=C3=A3o=20S=2E=20O=2E=20Bueno?=
Date: Mon, 30 Sep 2024 03:22:50 -0300
Subject: [PATCH 05/14] [WIP] Locks are passed across interpreters

---
 src/extrainterpreters/base_interpreter.py |  9 +++++++--
 src/extrainterpreters/lock.py             | 12 +++++++++++-
 src/extrainterpreters/remote_array.py     | 11 +++--------
 src/extrainterpreters/utils.py            |  2 +-
 tests/test_lock.py                        | 22 +++++++++++++---------
 5 files changed, 35 insertions(+), 21 deletions(-)

diff --git a/src/extrainterpreters/base_interpreter.py b/src/extrainterpreters/base_interpreter.py
index 06a478f..4f6d793 100644
--- a/src/extrainterpreters/base_interpreter.py
+++ b/src/extrainterpreters/base_interpreter.py
@@ -162,12 +162,17 @@ def execute(self, func, args=(), kwargs=None):
             )
         )
 
-    def run_string(self, code):
+    def run_string(self, code, raise_=False):
         """Execs a string of code in associated interpreter
 
         Mostly to mirror interpreters.run_string as a convenient method.
         """
-        return interpreters.run_string(self.intno, code)
+        result = interpreters.run_string(self.intno, code)
+        if result and raise_:
+            # In Python 3.13+, a truthy result indicates an exception occurred.
+            # (in Python 3.12, an exception is raised immediately)
+            raise RuntimeError(result)
+        return result
 
     # currently not working. will raise when the interpreter is destroyed:
     # def is_running(self):
diff --git a/src/extrainterpreters/lock.py b/src/extrainterpreters/lock.py
index 83c9d53..23661bd 100644
--- a/src/extrainterpreters/lock.py
+++ b/src/extrainterpreters/lock.py
@@ -4,6 +4,7 @@
 
 from . import running_interpreters
 
+from .remote_array import RemoteArray
 from .utils import (
     _atomic_byte_lock,
     _remote_memory,
@@ -95,7 +96,12 @@ class IntRLock:
     """
 
     def __init__(self):
-        self._lock = _LockBuffer(lock=0)
+        self._buffer = bytearray(1)
+        # prevents buffer from being moved around by Python allocators
+        self._anchor = memoryview(self._buffer)
+
+        lock_str = _LockBuffer._from_data(self._buffer)
+        self._lock = _CrossInterpreterStructLock(lock_str)
 
     def acquire(self, blocking=True, timeout=-1):
         pass
@@ -106,6 +112,10 @@ def release(self):
     def locked(self):
         return False
 
+    def __getstate__(self):
+        return {"_lock": self._lock}
+
+
 
 class RLock(IntRLock):
     """Cross interpreter re-entrant lock, analogous to
diff --git a/src/extrainterpreters/remote_array.py b/src/extrainterpreters/remote_array.py
index 2d2b0bb..787ad96 100644
--- a/src/extrainterpreters/remote_array.py
+++ b/src/extrainterpreters/remote_array.py
@@ -22,7 +22,6 @@
     guard_internal_use
 )
 
-from .lock import _CrossInterpreterStructLock
 
 class RemoteState:
     building = 0
@@ -61,7 +60,7 @@ def _collector(action, data):
     """Garbage Collector "plug-in":
 
     when a RemoteBuffer is closed in parent interpreter
-    earlier than it is exist in sub-interpreters,
+    earlier than it is exited in sub-interpreters,
    it is decomissioned and put in "_array_registry".
 
     This function will be called when the garbage collector
@@ -127,11 +126,6 @@ class RemoteArray:
         - call __exit__
 
     - suggested default TTL: 1 seconds
-
-    - check for the possibility of a GC hook (gc.callbacks list)
-    - if possible, iterate all "pending deletion" and check conditions in "__exit__"
-    - if no GC hook possible,think of another reasonable mechanism to periodically try to delete pending deletion buffers. (dedicate thread with one check per second? Less frequent?
-
     """
     __slots__ = (
         "_cursor",
@@ -147,10 +141,11 @@ class RemoteArray:
     )
 
     def __init__(self, *, size=None, payload=None, ttl=DEFAULT_TTL):
+        from .lock import _CrossInterpreterStructLock  # avoid circular imports, the easy way.
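+        # note: "bytearray(n)" used a few lines below is already zero-filled -
+        # same bytes as the former b"\x00" * n, without the temporary object.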
         if size is None and payload is not None:
             size = len(payload)
         self._size = size
-        self._data = bytearray(b"\x00" * (size + REMOTE_HEADER_SIZE))
+        self._data = bytearray(size + REMOTE_HEADER_SIZE)
         if payload:
             # TBD: Temporary thing - we are allowing zero-copy buffers soon
             self._data[REMOTE_HEADER_SIZE:] = payload
diff --git a/src/extrainterpreters/utils.py b/src/extrainterpreters/utils.py
index 56fb00d..908f598 100644
--- a/src/extrainterpreters/utils.py
+++ b/src/extrainterpreters/utils.py
@@ -123,7 +123,7 @@ class StructBase:
 
     def __init__(self, **kwargs):
         self._offset = 0
-        self._data = bytearray(b"\x00" * self._size)
+        self._data = bytearray(self._size)
         for field_name in self._fields:
             setattr(self, field_name, kwargs.pop(field_name))
         if kwargs:
diff --git a/tests/test_lock.py b/tests/test_lock.py
index 1629754..5be5ea2 100644
--- a/tests/test_lock.py
+++ b/tests/test_lock.py
@@ -40,28 +40,32 @@ def test_lock_cant_be_reacquired():
 
 
 @pytest.mark.parametrize("LockCls", [Lock, RLock, IntRLock])
-def test_locks_cant_be_passed_to_other_interpreter(LockCls):
+def test_locks_can_be_passed_to_other_interpreter(LockCls, lowlevel):
     lock = LockCls()
+    lock_data = ei.utils._remote_memory(lock._lock._lock_address, 1)
     interp = ei.Interpreter().start()
     interp.run_string(
         D(
             f"""
-            import extrainterpreters; extrainterpreters.DEBUG=True
+            import extrainterpreters as ei; ei.DEBUG=True
             lock = pickle.loads({pickle.dumps(lock)})
-            assert lock._lock._data == 0
+            lock_data = ei.utils._remote_memory(lock._lock._lock_address, 1)
+            assert lock_data[0] == 0
             """
-        )
+        ),
+        raise_=True
    )
-    lock._lock._data[0] = 2
+    lock_data[0] = 2
     interp.run_string(
         D(
             """
-            assert lock._lock._data[0] == 2
-            lock._lock._data[0] = 5
+            assert lock_data[0] == 2
+            lock_data[0] = 5
             """
-        )
+        ),
+        raise_=True
     )
-    assert lock._lock._data[0] == 5
+    assert lock_data[0] == 5
     interp.close()
 

From b5d0e9ee9d1664533122d90d789312aa417f5ff1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jo=C3=A3o=20S=2E=20O=2E=20Bueno?=
Date: Mon, 30 Sep 2024 03:49:51 -0300
Subject: [PATCH 06/14] Context manager protocol for interpreter-wise
 re-entrant locks implemented

---
 src/extrainterpreters/lock.py | 19 ++++++++++++++++---
 tests/test_lock.py            | 11 ++++++++++-
 2 files changed, 26 insertions(+), 4 deletions(-)

diff --git a/src/extrainterpreters/lock.py b/src/extrainterpreters/lock.py
index 23661bd..3d384bd 100644
--- a/src/extrainterpreters/lock.py
+++ b/src/extrainterpreters/lock.py
@@ -59,6 +59,7 @@ def __enter__(self):
             while time.time() <= threshold:
                 if _atomic_byte_lock(self._lock_address):
                     break
+                time.sleep(TIME_RESOLUTION * 4)
             else:
                 self._timeout = self._original_timeout
                 raise TimeoutError("Timeout trying to acquire lock")
@@ -104,13 +105,25 @@ def __init__(self):
         self._lock = _CrossInterpreterStructLock(lock_str)
 
     def acquire(self, blocking=True, timeout=-1):
-        pass
+        timeout = None if timeout == -1 or not blocking else timeout
+        self._lock.timeout(timeout)
+        self._lock.__enter__()
+        return
 
     def release(self):
-        pass
+        self._lock.__exit__()
+
+    def __enter__(self):
+        self.acquire()
+        #self._lock.__enter__()
+        return self
+
+    def __exit__(self, *args):
+        self.release()
+        #self._lock.__exit__()
 
     def locked(self):
-        return False
+        return bool(self._lock._entered)
 
     def __getstate__(self):
         return {"_lock": self._lock}
diff --git a/tests/test_lock.py b/tests/test_lock.py
index 5be5ea2..4cfd12d 100644
--- a/tests/test_lock.py
+++ b/tests/test_lock.py
@@ -30,7 +30,16 @@ def test_locks_work_as_context_manager(LockCls):
 
 
 
-def test_lock_cant_be_reacquired():
+def test_lock_cant_be_reacquired_same_interpreter():
+    lock = Lock()
+
+    lock.acquire()
+
+    with pytest.raises(TimeoutError):
+        lock.acquire(timeout=0)
+
+@pytest.mark.skip("to be implemented")
+def test_lock_cant_be_reacquired_other_interpreter():
     lock = Lock()
 
     lock.acquire()
 
     with pytest.raises(TimeoutError):
         lock.acquire(timeout=0)

From 7c98d9f6a035f5485bee203bf3936bd2143bce60 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jo=C3=A3o=20S=2E=20O=2E=20Bueno?=
Date: Mon, 30 Sep 2024 20:04:35 -0300
Subject: [PATCH 07/14] Fixes timeout/block semantics

---
 src/extrainterpreters/lock.py | 23 ++++++++++++++++++-----
 1 file changed, 18 insertions(+), 5 deletions(-)

diff --git a/src/extrainterpreters/lock.py b/src/extrainterpreters/lock.py
index 3d384bd..c1eb074 100644
--- a/src/extrainterpreters/lock.py
+++ b/src/extrainterpreters/lock.py
@@ -1,3 +1,4 @@
+from threading import TIMEOUT_MAX
 import time
 import sys
 
@@ -105,10 +106,18 @@ def __init__(self):
         self._lock = _CrossInterpreterStructLock(lock_str)
 
     def acquire(self, blocking=True, timeout=-1):
-        timeout = None if timeout == -1 or not blocking else timeout
-        self._lock.timeout(timeout)
-        self._lock.__enter__()
-        return
+        if blocking:
+            timeout = TIMEOUT_MAX if timeout == -1 or not blocking else timeout
+            self._lock.timeout(timeout)
+            self._lock.__enter__()
+            return
+        else:
+            self._lock.timeout(None)
+            try:
+                self._lock.__enter__()
+            except ResourceBusyError:
+                return False
+            return True
 
     def release(self):
         self._lock.__exit__()

From b2a1d0114229186ebe2b911428ffbdd05d6ccd89 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jo=C3=A3o=20S=2E=20O=2E=20Bueno?=
Date: Mon, 30 Sep 2024 20:35:11 -0300
Subject: [PATCH 08/14] Non-reentrant lock working in single interpreter; +
 more semantics fixes

---
 src/extrainterpreters/lock.py | 72 +++++++++++++++++++++++++++++------
 tests/test_lock.py            |  5 +--
 2 files changed, 62 insertions(+), 15 deletions(-)

diff --git a/src/extrainterpreters/lock.py b/src/extrainterpreters/lock.py
index c1eb074..dd1c256 100644
--- a/src/extrainterpreters/lock.py
+++ b/src/extrainterpreters/lock.py
@@ -26,6 +26,24 @@ class _LockBuffer(StructBase):
 
 
 class _CrossInterpreterStructLock:
+    """Foundations for cross-interpreter lock.
+
+    Used internally, coupled with larger memory structs
+    from which it will consume a single byte - locking
+    that byte effectively locks the whole struct.
+
+    A "struct" should be a StructBase class containing a single-byte
+    "lock" field, with a proper buffer.
+
+    (Keep in mind that when the struct is passed to other interpreters,
+    if deallocated in the interpreter of origin, the "byte" buffer used
+    will point to unallocated memory, with certain disaster ahead)
+
+    It is also used as base for the public Lock classes below:
+    those can be used in user code.
+
+    """
+
     def __init__(self, struct, timeout=DEFAULT_TIMEOUT):
         buffer_ptr, size = _address_and_size(struct._data)  # , struct._offset)
         # struct_ptr = buffer_ptr + struct._offset
@@ -44,30 +62,36 @@ def timeout(self, timeout: None | float):
         """
         self._timeout = timeout
         return self
-
     def __enter__(self):
+        try:
+            return self.acquire(self._timeout)
+        finally:
+            self._timeout = self._original_timeout
+
+    def acquire(self, timeout):
         # Remember: all attributes are "interpreter-local"
         # just the bytes in the passed in struct are shared.
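+        # timeout semantics: None or 0 means a single try, raising
+        # ResourceBusyError on failure; a positive value retries until
+        # the monotonic deadline, then raises TimeoutError.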
From b2a1d0114229186ebe2b911428ffbdd05d6ccd89 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jo=C3=A3o=20S=2E=20O=2E=20Bueno?=
Date: Mon, 30 Sep 2024 20:35:11 -0300
Subject: [PATCH 08/14] Non-reentrant lock working in single interpreter; more
 semantics fixes

---
 src/extrainterpreters/lock.py | 72 +++++++++++++++++++++++++++++------
 tests/test_lock.py            |  5 +--
 2 files changed, 62 insertions(+), 15 deletions(-)

diff --git a/src/extrainterpreters/lock.py b/src/extrainterpreters/lock.py
index c1eb074..dd1c256 100644
--- a/src/extrainterpreters/lock.py
+++ b/src/extrainterpreters/lock.py
@@ -26,6 +26,24 @@ class _LockBuffer(StructBase):
 
 
 class _CrossInterpreterStructLock:
+    """Foundations for a cross-interpreter lock.
+
+    Used internally, coupled with larger memory structs
+    from which it will consume a single byte - it will
+    ideally lock those structs.
+
+    A "struct" should be a StructBase class containing a single-byte
+    "lock" field, with a proper buffer.
+
+    (Keep in mind that when the struct is passed to other interpreters,
+    if deallocated in the interpreter of origin, the "byte" buffer used
+    will point to unallocated memory, with certain disaster ahead)
+
+    It is also used as the base for the public Lock classes below:
+    those can be used in user code.
+
+    """
+
     def __init__(self, struct, timeout=DEFAULT_TIMEOUT):
         buffer_ptr, size = _address_and_size(struct._data)  # , struct._offset)
         # struct_ptr = buffer_ptr + struct._offset
@@ -44,30 +62,36 @@ def timeout(self, timeout: None | float):
         """
         self._timeout = timeout
         return self
-
     def __enter__(self):
+        try:
+            return self.acquire(self._timeout)
+        finally:
+            self._timeout = self._original_timeout
+
+    def acquire(self, timeout):
+        # Remember: all attributes are "interpreter-local" -
+        # just the bytes in the passed-in struct are shared.
         if self._entered:
             self._entered += 1
             return self
-        if self._timeout is None:
+        if timeout is None or timeout == 0:
             if not _atomic_byte_lock(self._lock_address):
-                self._timeout = self._original_timeout
                 raise ResourceBusyError("Couldn't acquire lock")
         else:
-            threshold = time.time() + self._timeout
-            while time.time() <= threshold:
+            threshold = time.monotonic() + timeout
+            while time.monotonic() <= threshold:
                 if _atomic_byte_lock(self._lock_address):
                     break
                 time.sleep(TIME_RESOLUTION * 4)
             else:
-                self._timeout = self._original_timeout
                 raise TimeoutError("Timeout trying to acquire lock")
         self._entered += 1
         return self
 
     def __exit__(self, *args):
+        self.release()
+
+    def release(self):
         if not self._entered:
             return
         self._entered -= 1
@@ -76,7 +100,7 @@ def __exit__(self, *args):
         buffer = _remote_memory(self._lock_address, 1)
         buffer[0] = 0
         del buffer
-        self._entered = False
+        self._entered = 0
         self._timeout = self._original_timeout
 
     def __getstate__(self):
@@ -110,7 +134,7 @@ def acquire(self, blocking=True, timeout=-1):
             timeout = TIMEOUT_MAX if timeout == -1 or not blocking else timeout
             self._lock.timeout(timeout)
             self._lock.__enter__()
-            return
+            return True
         else:
             self._lock.timeout(None)
             try:
@@ -152,8 +176,32 @@ class RLock(IntRLock):
 
 
 class Lock(IntRLock):
-    ...
-    #def acquire(self, blocking=True, timeout=-1):
-        #if self.locked():
-            #if not blocking or timeout == -1:
+    _delay = 4 * TIME_RESOLUTION
+
+    def acquire(self, blocking=True, timeout=-1):
+        locked = self.locked()
+        if locked and not blocking:
+            return False
+
+        timeout = TIMEOUT_MAX if timeout == -1 else timeout
+        start = time.monotonic()
+        retry = False
+        while time.monotonic() - start < timeout:
+            if self.locked():
+                retry = True
+            else:
+                try:
+                    self._lock.acquire(0)
+                except ResourceBusyError:
+                    retry = True
+
+            if self._lock._entered > 1:
+                retry = True
+                self._lock.release()
+
+            if not retry:
+                return True
+
+            time.sleep(self._delay)
+            retry = False
+
+        raise ResourceBusyError("Could not acquire lock")
diff --git a/tests/test_lock.py b/tests/test_lock.py
index 4cfd12d..421fc4a 100644
--- a/tests/test_lock.py
+++ b/tests/test_lock.py
@@ -33,10 +33,9 @@ def test_lock_cant_be_reacquired_same_interpreter():
     lock = Lock()
 
-    lock.acquire()
+    assert lock.acquire()
 
-    with pytest.raises(TimeoutError):
-        lock.acquire(timeout=0)
+    assert not lock.acquire(blocking=False)
 
 @pytest.mark.skip("to be implemented")
 def test_lock_cant_be_reacquired_other_interpreter():

From 8d9504017e10419e4f901fbcd5fdf95c93fd46ec Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jo=C3=A3o=20S=2E=20O=2E=20Bueno?=
Date: Mon, 30 Sep 2024 21:53:26 -0300
Subject: [PATCH 09/14] Make Lock classes safe to pass around

---
 src/extrainterpreters/lock.py | 36 +++++++++++++++++++++------
 tests/test_lock.py            | 47 +++++++++++++++++++++++++----------
 2 files changed, 63 insertions(+), 20 deletions(-)

diff --git a/src/extrainterpreters/lock.py b/src/extrainterpreters/lock.py
index dd1c256..ae46f35 100644
--- a/src/extrainterpreters/lock.py
+++ b/src/extrainterpreters/lock.py
@@ -45,7 +45,10 @@ class _CrossInterpreterStructLock:
 
     def __init__(self, struct, timeout=DEFAULT_TIMEOUT):
-        buffer_ptr, size = _address_and_size(struct._data)  # , struct._offset)
+        if isinstance(struct._data, RemoteArray):
+            buffer_ptr, size = struct._data._data_for_remote()
+        else:  # bytes, bytearray
+            buffer_ptr, size = _address_and_size(struct._data)  # , struct._offset)
         # struct_ptr = buffer_ptr + struct._offset
         lock_offset = struct._offset + struct._get_offset_for_field("lock")
         if lock_offset >= size:
@@ -105,7 +108,7 @@ def release(self):
 
     def __getstate__(self):
         state = self.__dict__.copy()
-        state["_entered"] = False
+        state["_entered"] = 0
         return state
 
@@ -122,9 +125,21 @@ class IntRLock:
     """
 
     def __init__(self):
+
+        # RemoteArray is a somewhat high-level data structure,
+        # which includes another byte for a lock - just
+        # to take account of the buffer life-cycle
+        # across interpreters.
+
+        # Unfortunately, I found no simpler mechanism than this
+        # to solve the problem of the Lock object, along with
+        # its buffer, being deleted in its owner interpreter
+        # while still alive in a secondary one.
+        # (RemoteArrays will go to a parking area, waiting until they
+        # are dereferenced remotely before freeing the memory)
+
+        self._buffer = RemoteArray(size=1)
+        self._buffer._enter_parent()
         lock_str = _LockBuffer._from_data(self._buffer)
         self._lock = _CrossInterpreterStructLock(lock_str)
@@ -156,10 +171,17 @@ def __exit__(self, *args):
         #self._lock.__exit__()
 
     def locked(self):
-        return bool(self._lock._entered)
+        if self._lock._entered:
+            return True
+        try:
+            self._lock.acquire(0)
+        except ResourceBusyError:
+            return True
+        self._lock.release()
+        return False
 
     def __getstate__(self):
-        return {"_lock": self._lock}
+        return {"_lock": self._lock, "_buffer": self._buffer}
diff --git a/tests/test_lock.py b/tests/test_lock.py
index 421fc4a..157b114 100644
--- a/tests/test_lock.py
+++ b/tests/test_lock.py
@@ -1,4 +1,5 @@
 import pickle
+from functools import partial
 from textwrap import dedent as D
 
@@ -10,6 +11,21 @@
 from extrainterpreters import Lock, RLock
 from extrainterpreters.lock import IntRLock
 
+
+@pytest.fixture
+def interpreter(lowlevel):
+    interp = ei.Interpreter().start()
+    interp.run_string(
+        D(
+            f"""
+            import extrainterpreters as ei; ei.DEBUG=True
+            """
+        ),
+        raise_=True
+    )
+    yield interp
+    interp.close()
+
 @pytest.mark.parametrize("LockCls", [Lock, RLock, IntRLock])
 def test_locks_are_acquireable(LockCls):
     lock = LockCls()
@@ -37,25 +53,31 @@ def test_lock_cant_be_reacquired_same_interpreter():
 
     assert not lock.acquire(blocking=False)
 
-@pytest.mark.skip("to be implemented")
-def test_lock_cant_be_reacquired_other_interpreter():
-    lock = Lock()
-
-    lock.acquire()
-    with pytest.raises(TimeoutError):
-        lock.acquire(timeout=0)
+def test_lock_cant_be_reacquired_other_interpreter(interpreter):
+    lock = Lock()
+    # some assertion lasagna -
+    # just checks basic toggling - no race conditions tested here:
+    run = partial(interpreter.run_string, raise_=True)
+    run(f"lock = pickle.loads({pickle.dumps(lock)})")
+    run(f"assert lock.acquire(blocking=False)")
+    assert not lock.acquire(blocking=False)
+    run(f"assert not lock.acquire(blocking=False)")
+    run(f"lock.release()")
+    assert lock.acquire(blocking=False)
+    run(f"assert not lock.acquire(blocking=False)")
+    lock.release()
+    run(f"assert lock.acquire(blocking=False)")
+    run(f"lock.release()")
 
 @pytest.mark.parametrize("LockCls", [Lock, RLock, IntRLock])
-def test_locks_can_be_passed_to_other_interpreter(LockCls, lowlevel):
+def test_locks_can_be_passed_to_other_interpreter(LockCls, interpreter):
     lock = LockCls()
     lock_data = ei.utils._remote_memory(lock._lock._lock_address, 1)
-    interp = ei.Interpreter().start()
-    interp.run_string(
+    interpreter.run_string(
         D(
             f"""
-            import extrainterpreters as ei; ei.DEBUG=True
             lock = pickle.loads({pickle.dumps(lock)})
             lock_data = ei.utils._remote_memory(lock._lock._lock_address, 1)
             assert lock_data[0] == 0
             """
         ),
         raise_=True
     )
     lock_data[0] = 2
-    interp.run_string(
+    interpreter.run_string(
         D(
             """
             assert lock_data[0] == 2
             lock_data[0] = 5
             """
         ),
         raise_=True
     )
     assert lock_data[0] == 5
-    interp.close()
 
 
 #@pytest.mark.parametrize("LockCls", [Lock, RLock, IntRLock])
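
A minimal sketch of what the patch above makes safe (illustrative, not part
of the series): a lock can be pickled into a sub-interpreter and both sides
then contend for the same underlying byte. It assumes the
Interpreter.run_string(..., raise_=True) API exercised by the tests above:

    import pickle
    from textwrap import dedent as D

    import extrainterpreters as ei

    lock = ei.Lock()
    interp = ei.Interpreter().start()
    interp.run_string(
        D(
            f"""
            import pickle
            lock = pickle.loads({pickle.dumps(lock)})
            assert lock.acquire(blocking=False)
            """
        ),
        raise_=True,
    )
    assert not lock.acquire(blocking=False)   # held by the sub-interpreter
    interp.run_string("lock.release()", raise_=True)
    assert lock.acquire(blocking=False)       # free again
    lock.release()
    interp.close()
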
From a48360e6b0901ac6e155c108fd75785454441200 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jo=C3=A3o=20S=2E=20O=2E=20Bueno?=
Date: Tue, 1 Oct 2024 00:33:23 -0300
Subject: [PATCH 10/14] Adds test of interpreters-lock as thread-lock

---
 src/extrainterpreters/lock.py |  6 ++----
 tests/test_lock.py            | 28 ++++++++++++++++++++++++++++
 2 files changed, 30 insertions(+), 4 deletions(-)

diff --git a/src/extrainterpreters/lock.py b/src/extrainterpreters/lock.py
index ae46f35..92971d0 100644
--- a/src/extrainterpreters/lock.py
+++ b/src/extrainterpreters/lock.py
@@ -147,13 +147,11 @@ def __init__(self):
     def acquire(self, blocking=True, timeout=-1):
         if blocking:
             timeout = TIMEOUT_MAX if timeout == -1 or not blocking else timeout
-            self._lock.timeout(timeout)
-            self._lock.__enter__()
+            self._lock.acquire(timeout)
             return True
         else:
-            self._lock.timeout(None)
             try:
-                self._lock.__enter__()
+                self._lock.acquire(None)
             except ResourceBusyError:
                 return False
             return True
diff --git a/tests/test_lock.py b/tests/test_lock.py
index 157b114..90e86f2 100644
--- a/tests/test_lock.py
+++ b/tests/test_lock.py
@@ -1,4 +1,6 @@
 import pickle
+import threading
+import time
 from functools import partial
 from textwrap import dedent as D
 
@@ -71,6 +73,32 @@ def test_lock_cant_be_reacquired_other_interpreter(interpreter):
     run(f"lock.release()")
 
 
+
+def test_lock_works_across_threads_in_same_interpreter():
+    lock = Lock()
+    results = []
+    def aux1():
+        # assert code doesn't work in nested functions
+        results.append((lock.acquire(blocking=False), "aux1 - first lock, should work"))
+        time.sleep(0.1)
+        lock.release()
+    def aux2():
+        time.sleep(0.025)
+        results.append((not lock.acquire(blocking=False), "aux2 - first lock, should fail"))
+        time.sleep(0.085)
+        results.append((lock.acquire(blocking=False), "aux2 - second lock, should work"))
+        if lock.locked():
+            lock.release()
+    t1 = threading.Thread(target=aux1)
+    t2 = threading.Thread(target=aux2)
+    t1.start(); t2.start()
+
+    t1.join(); t2.join()
+
+    for status, message in results:
+        assert status, message
+
+
 @pytest.mark.parametrize("LockCls", [Lock, RLock, IntRLock])
 def test_locks_can_be_passed_to_other_interpreter(LockCls, interpreter):
     lock = LockCls()

From 9048de3e49293415880a94115e9ff11ca5a93b38 Mon Sep 17 00:00:00 2001
From: Joao S O Bueno
Date: Sat, 16 Nov 2024 14:25:34 -0300
Subject: [PATCH 11/14] Bump version and prepare for release

---
 .github/workflows/pypi.yml        | 2 +-
 src/extrainterpreters/__init__.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/pypi.yml b/.github/workflows/pypi.yml
index 4867b85..1d01a20 100644
--- a/.github/workflows/pypi.yml
+++ b/.github/workflows/pypi.yml
@@ -37,7 +37,7 @@ jobs:
     - build
     runs-on: ubuntu-latest
     environment:
-      name: release
+      name: pypi
       url: https://pypi.org/p/extrainterpreters
     permissions:
       id-token: write  # IMPORTANT: mandatory for trusted publishing
diff --git a/src/extrainterpreters/__init__.py b/src/extrainterpreters/__init__.py
index 71f7000..0813f4a 100644
--- 
a/src/extrainterpreters/__init__.py +++ b/src/extrainterpreters/__init__.py @@ -31,7 +31,7 @@ # Early declarations to avoid circular imports: -__version__ = "0.2-beta3" +__version__ = "0.2.0" BFSZ = 10_000_000 From 6b5d156613251b7e6cfa8ebc6d4da9917c1c9662 Mon Sep 17 00:00:00 2001 From: Joao S O Bueno Date: Sat, 16 Nov 2024 15:13:18 -0300 Subject: [PATCH 12/14] Second adjustment to workflow to use manylinux (A.I. generated yaml) --- .github/workflows/pypi.yml | 46 ++++++++++++++++++++------------------ 1 file changed, 24 insertions(+), 22 deletions(-) diff --git a/.github/workflows/pypi.yml b/.github/workflows/pypi.yml index 1d01a20..c3fa3cd 100644 --- a/.github/workflows/pypi.yml +++ b/.github/workflows/pypi.yml @@ -4,30 +4,32 @@ on: push jobs: build: - name: Build distribution 📦 runs-on: ubuntu-latest - environment: - name: release - steps: - - uses: actions/checkout@v4 - - name: Set up Python - uses: actions/setup-python@v5 - with: - python-version: "3.12" - - name: Install pypa/build - run: >- - python3 -m - pip install - build - --user - - name: Build a binary wheel and a source tarball - run: python3 -m build - - name: Store the distribution packages - uses: actions/upload-artifact@v4 - with: - name: python-package-distributions - path: dist/ + - name: Checkout code + uses: actions/checkout@v3 + + - name: Set up Python versions + uses: actions/setup-python@v4 + with: + python-version: "3.12" + + - name: Install cibuildwheel + run: python -m pip install cibuildwheel==2.14.1 + + - name: Build wheels + env: + CIBW_BUILD: "cp312-* cp313-*" + CIBW_SKIP: "pp*" + CIBW_PLATFORM: "manylinux2014_x86_64" # Adjust if needed + run: | + python -m cibuildwheel --output-dir dist + + - name: Upload built wheels as artifact + uses: actions/upload-artifact@v4 + with: + name: built-wheels + path: dist publish-to-pypi: name: >- From 2fbd1d4f0b5e3942b45697c9cfe7726aa689260d Mon Sep 17 00:00:00 2001 From: Joao S O Bueno Date: Sat, 16 Nov 2024 15:41:57 -0300 Subject: [PATCH 13/14] Build manylinux wheels, take 3 --- .github/workflows/pypi.yml | 46 +++++++++++++++++++------------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/.github/workflows/pypi.yml b/.github/workflows/pypi.yml index c3fa3cd..0b3ea78 100644 --- a/.github/workflows/pypi.yml +++ b/.github/workflows/pypi.yml @@ -1,35 +1,35 @@ name: Publish Python 🐍 distribution 📦 to PyPI and TestPyPI on: push +name: Build -jobs: - build: - runs-on: ubuntu-latest - steps: - - name: Checkout code - uses: actions/checkout@v3 - - name: Set up Python versions - uses: actions/setup-python@v4 - with: - python-version: "3.12" +jobs: + build_wheels: + name: Build wheels on ${{ matrix.os }} + runs-on: ${{ matrix.os }} + strategy: + matrix: + # macos-13 is an intel runner, macos-14 is apple silicon + os: [ubuntu-latest, ] # windows-latest, macos-13, macos-14] - - name: Install cibuildwheel - run: python -m pip install cibuildwheel==2.14.1 + steps: + - uses: actions/checkout@v4 - name: Build wheels - env: - CIBW_BUILD: "cp312-* cp313-*" - CIBW_SKIP: "pp*" - CIBW_PLATFORM: "manylinux2014_x86_64" # Adjust if needed - run: | - python -m cibuildwheel --output-dir dist + uses: pypa/cibuildwheel@v2.21.3 + # env: + # CIBW_SOME_OPTION: value + # ... + # with: + # package-dir: . 
+ # output-dir: wheelhouse + # config-file: "{package}/pyproject.toml" - - name: Upload built wheels as artifact - uses: actions/upload-artifact@v4 + - uses: actions/upload-artifact@v4 with: - name: built-wheels - path: dist + name: cibw-wheels-${{ matrix.os }}-${{ strategy.job-index }} + path: ./wheelhouse/*.whl publish-to-pypi: name: >- @@ -49,7 +49,7 @@ jobs: uses: actions/download-artifact@v4.1.7 with: name: python-package-distributions - path: dist/ + path: wheelhouse/ - name: Publish distribution 📦 to PyPI uses: pypa/gh-action-pypi-publish@release/v1 From 9a4fe4e581b96b4b9605243d88e3df76a51d5eb2 Mon Sep 17 00:00:00 2001 From: Joao S O Bueno Date: Sat, 16 Nov 2024 15:47:10 -0300 Subject: [PATCH 14/14] Adjustments --- .github/workflows/pypi.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/pypi.yml b/.github/workflows/pypi.yml index 0b3ea78..fa20b38 100644 --- a/.github/workflows/pypi.yml +++ b/.github/workflows/pypi.yml @@ -1,11 +1,11 @@ name: Publish Python 🐍 distribution 📦 to PyPI and TestPyPI on: push -name: Build +name: publish jobs: - build_wheels: + build: name: Build wheels on ${{ matrix.os }} runs-on: ${{ matrix.os }} strategy:
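
To verify the wheel build locally, this is roughly what the workflow above
runs - assuming a Linux host with Docker available, so cibuildwheel can pull
the manylinux images (the CIBW_BUILD selector mirrors the one tried in the
earlier iteration of this workflow):

    python -m pip install cibuildwheel
    CIBW_BUILD="cp312-* cp313-*" python -m cibuildwheel --output-dir wheelhouse
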