Skip to content

Commit 252396e

Browse files
committed
test(fabric): bound port manager queue tests and update warning log
1 parent 6002a28 commit 252396e

File tree

4 files changed

+119
-36
lines changed

4 files changed

+119
-36
lines changed

src/lightning/fabric/plugins/environments/lightning.py

Lines changed: 10 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,12 @@
1717
from typing_extensions import override
1818

1919
from lightning.fabric.plugins.environments.cluster_environment import ClusterEnvironment
20-
from lightning.fabric.utilities.port_manager import get_port_manager
20+
from lightning.fabric.utilities.port_manager import (
21+
find_free_network_port as _pm_find_free_network_port,
22+
)
23+
from lightning.fabric.utilities.port_manager import (
24+
get_port_manager,
25+
)
2126
from lightning.fabric.utilities.rank_zero import rank_zero_only
2227

2328

@@ -64,7 +69,7 @@ def main_address(self) -> str:
6469
def main_port(self) -> int:
6570
if self._main_port == -1:
6671
self._main_port = (
67-
int(os.environ["MASTER_PORT"]) if "MASTER_PORT" in os.environ else find_free_network_port()
72+
int(os.environ["MASTER_PORT"]) if "MASTER_PORT" in os.environ else _pm_find_free_network_port()
6873
)
6974
return self._main_port
7075

@@ -115,27 +120,8 @@ def teardown(self) -> None:
115120
def find_free_network_port() -> int:
116121
"""Finds a free port on localhost.
117122
118-
It is useful in single-node training when we don't want to connect to a real main node but have to set the
119-
`MASTER_PORT` environment variable.
120-
121-
The allocated port is reserved and won't be returned by subsequent calls until it's explicitly released.
122-
123-
Returns:
124-
A port number that is reserved and free at the time of allocation
123+
Deprecated alias. Use :func:`lightning.fabric.utilities.port_manager.find_free_network_port` instead.
125124
126125
"""
127-
# If an external launcher already specified a MASTER_PORT (for example, torch.distributed.spawn or
128-
# multiprocessing helpers), reserve it through the port manager so no other test reuses the same number.
129-
if "MASTER_PORT" in os.environ:
130-
master_port_str = os.environ["MASTER_PORT"]
131-
try:
132-
existing_port = int(master_port_str)
133-
except ValueError:
134-
pass
135-
else:
136-
port_manager = get_port_manager()
137-
if port_manager.reserve_existing_port(existing_port):
138-
return existing_port
139-
140-
port_manager = get_port_manager()
141-
return port_manager.allocate_port()
126+
127+
return _pm_find_free_network_port()

src/lightning/fabric/utilities/port_manager.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,8 +233,9 @@ def allocate_port(self, preferred_port: Optional[int] = None, max_attempts: int
233233

234234
queue_count = len(state.recently_released)
235235
if queue_count > 800: # >78% of typical 1024 capacity
236+
utilization_pct = (queue_count / _RECENTLY_RELEASED_PORTS_MAXLEN) * 100
236237
log.warning(
237-
f"Port queue utilization high: {queue_count} entries. "
238+
f"Port queue utilization high: {queue_count} entries ({utilization_pct:.1f}% of capacity). "
238239
f"Allocated port {port}. Active allocations: {len(state.allocated_ports)}"
239240
)
240241

@@ -550,3 +551,26 @@ def get_port_manager() -> PortManager:
550551
if _port_manager is None:
551552
_port_manager = PortManager()
552553
return _port_manager
554+
555+
556+
def find_free_network_port() -> int:
557+
"""Find and reserve a free network port using the global port manager.
558+
559+
Returns:
560+
A port number that is reserved and free at the time of allocation.
561+
562+
"""
563+
564+
if "MASTER_PORT" in os.environ:
565+
master_port_str = os.environ["MASTER_PORT"]
566+
try:
567+
existing_port = int(master_port_str)
568+
except ValueError:
569+
pass
570+
else:
571+
port_manager = get_port_manager()
572+
if port_manager.reserve_existing_port(existing_port):
573+
return existing_port
574+
575+
port_manager = get_port_manager()
576+
return port_manager.allocate_port()

tests/tests_fabric/utilities/test_port_manager.py

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,23 @@
1313
# limitations under the License.
1414
"""Tests for the PortManager utility and port allocation integration."""
1515

16+
import collections
1617
import os
1718
import socket
1819
import threading
19-
from collections import Counter
2020

2121
import pytest
2222

23-
from lightning.fabric.plugins.environments.lightning import find_free_network_port
24-
from lightning.fabric.utilities.port_manager import PortManager, get_port_manager
23+
import lightning.fabric.utilities.port_manager as port_manager_module
24+
import lightning.fabric.utilities.port_state as port_state_module
25+
from lightning.fabric.plugins.environments.lightning import (
26+
find_free_network_port as env_find_free_network_port,
27+
)
28+
from lightning.fabric.utilities.port_manager import (
29+
PortManager,
30+
find_free_network_port,
31+
get_port_manager,
32+
)
2533

2634
# =============================================================================
2735
# Fixtures
@@ -143,7 +151,7 @@ def allocate_ports():
143151
assert len(set(ports)) == 100, f"Expected 100 unique ports, got {len(set(ports))}"
144152

145153
# Check for any duplicates
146-
counts = Counter(ports)
154+
counts = collections.Counter(ports)
147155
duplicates = {port: count for port, count in counts.items() if count > 1}
148156
assert not duplicates, f"Found duplicate ports: {duplicates}"
149157

@@ -495,6 +503,20 @@ def allocate():
495503
manager.release_port(port)
496504

497505

506+
def test_find_free_network_port_alias(monkeypatch):
507+
"""Legacy environment alias should reuse the port manager backed implementation."""
508+
509+
manager = get_port_manager()
510+
manager.release_all()
511+
512+
port = env_find_free_network_port()
513+
514+
try:
515+
assert port in manager._allocated_ports
516+
finally:
517+
manager.release_port(port)
518+
519+
498520
def test_port_allocation_simulates_distributed_test_lifecycle():
499521
"""Simulate the lifecycle of a distributed test with port allocation and release."""
500522
manager = get_port_manager()
@@ -714,9 +736,14 @@ def test_port_manager_recently_released_prevents_immediate_reuse():
714736
manager.release_port(new_port)
715737

716738

717-
def test_port_manager_recently_released_queue_cycles():
739+
def _set_recently_released_limit(monkeypatch, value: int) -> None:
740+
monkeypatch.setattr(port_manager_module, "_RECENTLY_RELEASED_PORTS_MAXLEN", value, raising=True)
741+
monkeypatch.setattr(port_state_module, "_RECENTLY_RELEASED_MAX_LEN", value, raising=True)
742+
743+
744+
def test_port_manager_recently_released_queue_cycles(monkeypatch):
718745
"""Test that recently_released queue cycles after maxlen allocations."""
719-
from lightning.fabric.utilities.port_manager import _RECENTLY_RELEASED_PORTS_MAXLEN
746+
_set_recently_released_limit(monkeypatch, 64)
720747

721748
manager = PortManager()
722749

@@ -727,8 +754,10 @@ def test_port_manager_recently_released_queue_cycles():
727754
# Port should be in recently_released queue
728755
assert first_port in manager._recently_released
729756

757+
queue_limit = port_manager_module._RECENTLY_RELEASED_PORTS_MAXLEN
758+
730759
# Allocate and release many ports to fill the queue beyond maxlen
731-
for _ in range(_RECENTLY_RELEASED_PORTS_MAXLEN + 10):
760+
for _ in range(queue_limit + 10):
732761
port = manager.allocate_port()
733762
manager.release_port(port)
734763

@@ -755,14 +784,20 @@ def test_port_manager_reserve_clears_recently_released():
755784
manager.release_port(port)
756785

757786

758-
def test_port_manager_high_queue_utilization_warning(caplog):
787+
def test_port_manager_high_queue_utilization_warning(monkeypatch, caplog):
759788
"""Test that warning is logged when queue utilization exceeds 80%."""
760789
import logging
761790

791+
_set_recently_released_limit(monkeypatch, 64)
792+
793+
queue_limit = port_manager_module._RECENTLY_RELEASED_PORTS_MAXLEN
794+
trigger_count = int(queue_limit * 0.8) + 1 # Just over 80%
795+
expected_pct = (trigger_count / queue_limit) * 100
796+
762797
manager = PortManager()
763798

764-
# Fill queue to >80% (821/1024 = 80.2%)
765-
for _ in range(821):
799+
# Fill queue to just over 80%
800+
for _ in range(trigger_count):
766801
port = manager.allocate_port()
767802
manager.release_port(port)
768803

@@ -773,4 +808,4 @@ def test_port_manager_high_queue_utilization_warning(caplog):
773808

774809
# Verify warning was logged
775810
assert any("Port queue utilization high" in record.message for record in caplog.records)
776-
assert any("80." in record.message for record in caplog.records) # Should show 80.x%
811+
assert any(f"{expected_pct:.1f}%" in record.message for record in caplog.records)

tests/tests_fabric/utilities/test_port_manager_process_safe.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import pytest
2424

25+
import lightning.fabric.utilities.port_manager as port_manager_module
2526
from lightning.fabric.utilities.file_lock import UnixFileLock, WindowsFileLock, create_file_lock
2627
from lightning.fabric.utilities.port_manager import PortManager, _get_lock_dir, _get_lock_file
2728
from lightning.fabric.utilities.port_state import PortAllocation, PortState
@@ -106,6 +107,43 @@ def test_file_lock_context_manager_timeout(tmpdir):
106107
lock1.release()
107108

108109

110+
def test_get_lock_dir_handles_permission_error(monkeypatch, tmp_path):
111+
"""_get_lock_dir should tolerate probe unlink permission errors and register cleanup."""
112+
113+
monkeypatch.setenv("LIGHTNING_PORT_LOCK_DIR", str(tmp_path))
114+
115+
registered_calls = []
116+
117+
def fake_register(func, *args, **kwargs):
118+
registered_calls.append((func, args, kwargs))
119+
return func
120+
121+
monkeypatch.setattr(port_manager_module.atexit, "register", fake_register)
122+
123+
original_unlink = Path.unlink
124+
call_state = {"count": 0}
125+
126+
def fake_unlink(self, *args, **kwargs):
127+
if self.name.startswith(".lightning_port_manager_write_test_") and call_state["count"] == 0:
128+
call_state["count"] += 1
129+
raise PermissionError("locked")
130+
return original_unlink(self, *args, **kwargs)
131+
132+
monkeypatch.setattr(Path, "unlink", fake_unlink)
133+
134+
lock_dir = _get_lock_dir()
135+
assert Path(lock_dir) == tmp_path
136+
assert registered_calls, "Cleanup should be registered when unlink fails"
137+
138+
cleanup_func, args, kwargs = registered_calls[0]
139+
probe_path = args[0]
140+
assert isinstance(probe_path, Path)
141+
assert probe_path.exists()
142+
143+
cleanup_func(*args, **kwargs)
144+
assert not probe_path.exists()
145+
146+
109147
# =============================================================================
110148
# Tests for PortState
111149
# =============================================================================

0 commit comments

Comments
 (0)