Skip to content

Commit 1504dd1

Browse files
committed
test(fabric): bound port manager queue tests and update warning log
1 parent 6002a28 commit 1504dd1

File tree

3 files changed

+69
-12
lines changed

3 files changed

+69
-12
lines changed

src/lightning/fabric/utilities/port_manager.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,8 +233,9 @@ def allocate_port(self, preferred_port: Optional[int] = None, max_attempts: int
233233

234234
queue_count = len(state.recently_released)
235235
if queue_count > 800: # >78% of typical 1024 capacity
236+
utilization_pct = (queue_count / _RECENTLY_RELEASED_PORTS_MAXLEN) * 100
236237
log.warning(
237-
f"Port queue utilization high: {queue_count} entries. "
238+
f"Port queue utilization high: {queue_count} entries ({utilization_pct:.1f}% of capacity). "
238239
f"Allocated port {port}. Active allocations: {len(state.allocated_ports)}"
239240
)
240241

tests/tests_fabric/utilities/test_port_manager.py

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,20 @@
1313
# limitations under the License.
1414
"""Tests for the PortManager utility and port allocation integration."""
1515

16+
import collections
1617
import os
1718
import socket
1819
import threading
19-
from collections import Counter
2020

2121
import pytest
2222

23-
from lightning.fabric.plugins.environments.lightning import find_free_network_port
24-
from lightning.fabric.utilities.port_manager import PortManager, get_port_manager
23+
import lightning.fabric.utilities.port_manager as port_manager_module
24+
import lightning.fabric.utilities.port_state as port_state_module
25+
from lightning.fabric.utilities.port_manager import (
26+
PortManager,
27+
find_free_network_port,
28+
get_port_manager,
29+
)
2530

2631
# =============================================================================
2732
# Fixtures
@@ -143,7 +148,7 @@ def allocate_ports():
143148
assert len(set(ports)) == 100, f"Expected 100 unique ports, got {len(set(ports))}"
144149

145150
# Check for any duplicates
146-
counts = Counter(ports)
151+
counts = collections.Counter(ports)
147152
duplicates = {port: count for port, count in counts.items() if count > 1}
148153
assert not duplicates, f"Found duplicate ports: {duplicates}"
149154

@@ -714,9 +719,14 @@ def test_port_manager_recently_released_prevents_immediate_reuse():
714719
manager.release_port(new_port)
715720

716721

717-
def test_port_manager_recently_released_queue_cycles():
722+
def _set_recently_released_limit(monkeypatch, value: int) -> None:
723+
monkeypatch.setattr(port_manager_module, "_RECENTLY_RELEASED_PORTS_MAXLEN", value, raising=True)
724+
monkeypatch.setattr(port_state_module, "_RECENTLY_RELEASED_MAX_LEN", value, raising=True)
725+
726+
727+
def test_port_manager_recently_released_queue_cycles(monkeypatch):
718728
"""Test that recently_released queue cycles after maxlen allocations."""
719-
from lightning.fabric.utilities.port_manager import _RECENTLY_RELEASED_PORTS_MAXLEN
729+
_set_recently_released_limit(monkeypatch, 64)
720730

721731
manager = PortManager()
722732

@@ -727,8 +737,10 @@ def test_port_manager_recently_released_queue_cycles():
727737
# Port should be in recently_released queue
728738
assert first_port in manager._recently_released
729739

740+
queue_limit = port_manager_module._RECENTLY_RELEASED_PORTS_MAXLEN
741+
730742
# Allocate and release many ports to fill the queue beyond maxlen
731-
for _ in range(_RECENTLY_RELEASED_PORTS_MAXLEN + 10):
743+
for _ in range(queue_limit + 10):
732744
port = manager.allocate_port()
733745
manager.release_port(port)
734746

@@ -755,14 +767,20 @@ def test_port_manager_reserve_clears_recently_released():
755767
manager.release_port(port)
756768

757769

758-
def test_port_manager_high_queue_utilization_warning(caplog):
770+
def test_port_manager_high_queue_utilization_warning(monkeypatch, caplog):
759771
"""Test that warning is logged when queue utilization exceeds 80%."""
760772
import logging
761773

774+
_set_recently_released_limit(monkeypatch, 64)
775+
776+
queue_limit = port_manager_module._RECENTLY_RELEASED_PORTS_MAXLEN
777+
trigger_count = int(queue_limit * 0.8) + 1 # Just over 80%
778+
expected_pct = (trigger_count / queue_limit) * 100
779+
762780
manager = PortManager()
763781

764-
# Fill queue to >80% (821/1024 = 80.2%)
765-
for _ in range(821):
782+
# Fill queue to just over 80%
783+
for _ in range(trigger_count):
766784
port = manager.allocate_port()
767785
manager.release_port(port)
768786

@@ -773,4 +791,4 @@ def test_port_manager_high_queue_utilization_warning(caplog):
773791

774792
# Verify warning was logged
775793
assert any("Port queue utilization high" in record.message for record in caplog.records)
776-
assert any("80." in record.message for record in caplog.records) # Should show 80.x%
794+
assert any(f"{expected_pct:.1f}%" in record.message for record in caplog.records)

tests/tests_fabric/utilities/test_port_manager_process_safe.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import pytest
2424

25+
import lightning.fabric.utilities.port_manager as port_manager_module
2526
from lightning.fabric.utilities.file_lock import UnixFileLock, WindowsFileLock, create_file_lock
2627
from lightning.fabric.utilities.port_manager import PortManager, _get_lock_dir, _get_lock_file
2728
from lightning.fabric.utilities.port_state import PortAllocation, PortState
@@ -106,6 +107,43 @@ def test_file_lock_context_manager_timeout(tmpdir):
106107
lock1.release()
107108

108109

110+
def test_get_lock_dir_handles_permission_error(monkeypatch, tmp_path):
111+
"""_get_lock_dir should tolerate probe unlink permission errors and register cleanup."""
112+
113+
monkeypatch.setenv("LIGHTNING_PORT_LOCK_DIR", str(tmp_path))
114+
115+
registered_calls = []
116+
117+
def fake_register(func, *args, **kwargs):
118+
registered_calls.append((func, args, kwargs))
119+
return func
120+
121+
monkeypatch.setattr(port_manager_module.atexit, "register", fake_register)
122+
123+
original_unlink = Path.unlink
124+
call_state = {"count": 0}
125+
126+
def fake_unlink(self, *args, **kwargs):
127+
if self.name.startswith(".lightning_port_manager_write_test_") and call_state["count"] == 0:
128+
call_state["count"] += 1
129+
raise PermissionError("locked")
130+
return original_unlink(self, *args, **kwargs)
131+
132+
monkeypatch.setattr(Path, "unlink", fake_unlink)
133+
134+
lock_dir = _get_lock_dir()
135+
assert Path(lock_dir) == tmp_path
136+
assert registered_calls, "Cleanup should be registered when unlink fails"
137+
138+
cleanup_func, args, kwargs = registered_calls[0]
139+
probe_path = args[0]
140+
assert isinstance(probe_path, Path)
141+
assert probe_path.exists()
142+
143+
cleanup_func(*args, **kwargs)
144+
assert not probe_path.exists()
145+
146+
109147
# =============================================================================
110148
# Tests for PortState
111149
# =============================================================================

0 commit comments

Comments
 (0)