Skip to content

Commit 6002a28

Browse files
committed
fix(fabric): harden port lock probes and state queue
1 parent c48259e commit 6002a28

File tree

3 files changed

+65
-33
lines changed

3 files changed

+65
-33
lines changed

src/lightning/fabric/utilities/file_lock.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@
2020
from abc import ABC, abstractmethod
2121
from contextlib import suppress
2222
from pathlib import Path
23-
from typing import Optional
23+
from types import TracebackType
24+
from typing import Literal, Optional
2425

2526
log = logging.getLogger(__name__)
2627

@@ -75,7 +76,12 @@ def __enter__(self) -> "FileLock":
7576
raise TimeoutError(f"Failed to acquire lock on {self._lock_file} within timeout")
7677
return self
7778

78-
def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
79+
def __exit__(
80+
self,
81+
exc_type: Optional[type[BaseException]],
82+
exc_val: Optional[BaseException],
83+
exc_tb: Optional[TracebackType],
84+
) -> Literal[False]:
7985
"""Exit context manager - release lock."""
8086
self.release()
8187
return False # Don't suppress exceptions

src/lightning/fabric/utilities/port_manager.py

Lines changed: 39 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@
2424
from collections.abc import Iterator
2525
from contextlib import contextmanager, suppress
2626
from pathlib import Path
27-
from typing import Optional
27+
from types import TracebackType
28+
from typing import Literal, Optional
2829

2930
from lightning.fabric.utilities.file_lock import create_file_lock
3031
from lightning.fabric.utilities.port_state import PortState
@@ -76,11 +77,16 @@ def _get_lock_dir() -> Path:
7677
except OSError as e:
7778
log.debug(f"Port manager probe file removal failed with {e}; scheduling cleanup")
7879

79-
atexit.register(lambda p=test_file: p.unlink(missing_ok=True))
80+
atexit.register(_cleanup_probe_file, test_file)
8081

8182
return lock_path
8283

8384

85+
def _cleanup_probe_file(path: Path) -> None:
86+
"""Best-effort removal of a temporary probe file at exit."""
87+
path.unlink(missing_ok=True)
88+
89+
8490
def _get_lock_file() -> Path:
8591
"""Get path to the port manager lock file.
8692
@@ -218,38 +224,13 @@ def allocate_port(self, preferred_port: Optional[int] = None, max_attempts: int
218224
with self._lock: # Thread-safety
219225
try:
220226
with self._file_lock: # Process-safety
221-
# Read current state from file
222227
state = self._read_state()
223-
224-
# Try preferred port if specified
225-
if preferred_port is not None and self._is_port_available(preferred_port, state):
226-
port = preferred_port
227-
else:
228-
# Find a free port
229-
port = None
230-
for _ in range(max_attempts):
231-
candidate = self._find_free_port()
232-
if self._is_port_available(candidate, state):
233-
port = candidate
234-
break
235-
236-
if port is None:
237-
# Provide detailed diagnostics
238-
allocated_count = len(state.allocated_ports)
239-
queue_count = len(state.recently_released)
240-
raise RuntimeError(
241-
f"Failed to allocate a free port after {max_attempts} attempts. "
242-
f"Diagnostics: allocated={allocated_count}, recently_released={queue_count}"
243-
)
244-
245-
# Allocate in shared state
228+
port = self._select_port(state, preferred_port, max_attempts)
246229
state.allocate_port(port, pid=os.getpid())
247230
self._write_state(state)
248231

249-
# Update in-memory cache
250232
self._allocated_ports.add(port)
251233

252-
# Log diagnostics if queue utilization is high
253234
queue_count = len(state.recently_released)
254235
if queue_count > 800: # >78% of typical 1024 capacity
255236
log.warning(
@@ -261,7 +242,6 @@ def allocate_port(self, preferred_port: Optional[int] = None, max_attempts: int
261242
return port
262243

263244
except TimeoutError as e:
264-
# File lock timeout - fail fast to prevent state divergence
265245
log.error(
266246
"Failed to acquire file lock for port allocation. "
267247
"Remediation: (1) Retry the operation after a short delay, "
@@ -274,6 +254,30 @@ def allocate_port(self, preferred_port: Optional[int] = None, max_attempts: int
274254
"Check if another process is holding the lock or if the lock file is inaccessible."
275255
) from e
276256

257+
raise RuntimeError("Unexpected error allocating port")
258+
259+
def _select_port(
260+
self,
261+
state: PortState,
262+
preferred_port: Optional[int],
263+
max_attempts: int,
264+
) -> int:
265+
"""Choose an available port based on preference and state."""
266+
if preferred_port is not None and self._is_port_available(preferred_port, state):
267+
return preferred_port
268+
269+
for _ in range(max_attempts):
270+
candidate = self._find_free_port()
271+
if self._is_port_available(candidate, state):
272+
return candidate
273+
274+
allocated_count = len(state.allocated_ports)
275+
queue_count = len(state.recently_released)
276+
raise RuntimeError(
277+
f"Failed to allocate a free port after {max_attempts} attempts. "
278+
f"Diagnostics: allocated={allocated_count}, recently_released={queue_count}"
279+
)
280+
277281
def _is_port_available(self, port: int, state: PortState) -> bool:
278282
"""Check if a port is available for allocation.
279283
@@ -480,7 +484,12 @@ def __enter__(self) -> "PortManager":
480484
"""
481485
return self
482486

483-
def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
487+
def __exit__(
488+
self,
489+
exc_type: Optional[type[BaseException]],
490+
exc_val: Optional[BaseException],
491+
exc_tb: Optional[TracebackType],
492+
) -> Literal[False]:
484493
"""Exit context manager - cleanup ports from this process."""
485494
self.release_all()
486495
return False # Don't suppress exceptions

src/lightning/fabric/utilities/port_state.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727
# Maximum age for recently released entries (2 hours)
2828
_RECENTLY_RELEASED_MAX_AGE_SECONDS = 7200
2929

30+
# Maximum number of recently released entries to retain
31+
_RECENTLY_RELEASED_MAX_LEN = 1024
32+
3033

3134
@dataclass
3235
class PortAllocation:
@@ -179,6 +182,7 @@ def release_port(self, port: int) -> None:
179182
pid=allocation.pid,
180183
)
181184
self.recently_released.append(entry)
185+
self._trim_recently_released()
182186
# Remove from allocated
183187
del self.allocated_ports[port_str]
184188

@@ -239,6 +243,9 @@ def cleanup_stale_entries(self) -> int:
239243
# Clean up stale recently released entries
240244
original_count = len(self.recently_released)
241245
self.recently_released = [entry for entry in self.recently_released if not entry.is_stale(current_time)]
246+
if len(self.recently_released) > _RECENTLY_RELEASED_MAX_LEN:
247+
# Keep only the most recent entries if stale cleanup still exceeds max length
248+
self.recently_released = self.recently_released[-_RECENTLY_RELEASED_MAX_LEN:]
242249
stale_count += original_count - len(self.recently_released)
243250

244251
return stale_count
@@ -288,12 +295,22 @@ def from_dict(cls, data: dict[str, Any]) -> "PortState":
288295
RecentlyReleasedEntry.from_dict(entry_data) for entry_data in data.get("recently_released", [])
289296
]
290297

291-
return cls(
298+
state = cls(
292299
version=data.get("version", "1.0"),
293300
allocated_ports=allocated_ports,
294301
recently_released=recently_released,
295302
)
296303

304+
state._trim_recently_released()
305+
return state
306+
307+
def _trim_recently_released(self) -> None:
308+
"""Ensure recently released queue stays within configured bound."""
309+
if len(self.recently_released) > _RECENTLY_RELEASED_MAX_LEN:
310+
excess = len(self.recently_released) - _RECENTLY_RELEASED_MAX_LEN
311+
# Remove the oldest entries (front of the list)
312+
self.recently_released = self.recently_released[excess:]
313+
297314

298315
def _is_pid_alive(pid: int) -> bool:
299316
"""Check if a process with given PID is still running.

0 commit comments

Comments
 (0)