Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
769 changes: 769 additions & 0 deletions docs/source-fabric/advanced/port_manager_design.md

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions docs/source-fabric/levels/advanced.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
<../advanced/distributed_communication>
<../advanced/multiple_setup>
<../advanced/compile>
<../advanced/port_manager_design>
<../advanced/model_parallel/fsdp>
<../guide/checkpoint/distributed_checkpoint>

Expand Down Expand Up @@ -59,6 +60,14 @@ Advanced skills
:height: 170
:tag: advanced

.. displayitem::
:header: Coordinate distributed ports safely
:description: Learn how Lightning Fabric manages process-safe port allocation with file-backed state
:button_link: ../advanced/port_manager_design.html
:col_css: col-md-4
:height: 170
:tag: advanced

.. displayitem::
:header: Save and load very large models
:description: Save and load very large models efficiently with distributed checkpoints
Expand Down
34 changes: 10 additions & 24 deletions src/lightning/fabric/plugins/environments/lightning.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@
from typing_extensions import override

from lightning.fabric.plugins.environments.cluster_environment import ClusterEnvironment
from lightning.fabric.utilities.port_manager import get_port_manager
from lightning.fabric.utilities.port_manager import (
find_free_network_port as _pm_find_free_network_port,
)
from lightning.fabric.utilities.port_manager import (
get_port_manager,
)
from lightning.fabric.utilities.rank_zero import rank_zero_only


Expand Down Expand Up @@ -64,7 +69,7 @@ def main_address(self) -> str:
def main_port(self) -> int:
if self._main_port == -1:
self._main_port = (
int(os.environ["MASTER_PORT"]) if "MASTER_PORT" in os.environ else find_free_network_port()
int(os.environ["MASTER_PORT"]) if "MASTER_PORT" in os.environ else _pm_find_free_network_port()
)
return self._main_port

Expand Down Expand Up @@ -115,27 +120,8 @@ def teardown(self) -> None:
def find_free_network_port() -> int:
"""Finds a free port on localhost.

It is useful in single-node training when we don't want to connect to a real main node but have to set the
`MASTER_PORT` environment variable.

The allocated port is reserved and won't be returned by subsequent calls until it's explicitly released.

Returns:
A port number that is reserved and free at the time of allocation
Deprecated alias. Use :func:`lightning.fabric.utilities.port_manager.find_free_network_port` instead.

"""
# If an external launcher already specified a MASTER_PORT (for example, torch.distributed.spawn or
# multiprocessing helpers), reserve it through the port manager so no other test reuses the same number.
if "MASTER_PORT" in os.environ:
master_port_str = os.environ["MASTER_PORT"]
try:
existing_port = int(master_port_str)
except ValueError:
pass
else:
port_manager = get_port_manager()
if port_manager.reserve_existing_port(existing_port):
return existing_port

port_manager = get_port_manager()
return port_manager.allocate_port()

return _pm_find_free_network_port()
227 changes: 227 additions & 0 deletions src/lightning/fabric/utilities/file_lock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
# Copyright The Lightning AI team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Platform-abstracted file locking for cross-process coordination."""

import logging
import os
import sys
import time
from abc import ABC, abstractmethod
from contextlib import suppress
from pathlib import Path
from types import TracebackType
from typing import Literal, Optional

log = logging.getLogger(__name__)


class FileLock(ABC):
"""Abstract base class for platform-specific file locking.

File locks enable process-safe coordination by providing exclusive access to shared resources across multiple
processes. This abstract interface allows platform-specific implementations while maintaining a consistent API.

"""

def __init__(self, lock_file: Path) -> None:
"""Initialize the file lock.

Args:
lock_file: Path to the lock file

"""
self._lock_file = lock_file
self._fd: Optional[int] = None
self._is_locked = False

@abstractmethod
def acquire(self, timeout: float = 30.0) -> bool:
"""Acquire the lock, blocking up to timeout seconds.

Args:
timeout: Maximum seconds to wait for lock acquisition

Returns:
True if lock was acquired, False if timeout occurred

"""

@abstractmethod
def release(self) -> None:
"""Release the lock if held."""

def is_locked(self) -> bool:
"""Check if this instance currently holds the lock.

Returns:
True if lock is currently held by this instance

"""
return self._is_locked

def __enter__(self) -> "FileLock":
"""Enter context manager - acquire lock."""
if not self.acquire():
raise TimeoutError(f"Failed to acquire lock on {self._lock_file} within timeout")
return self

def __exit__(
self,
exc_type: Optional[type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> Literal[False]:
"""Exit context manager - release lock."""
self.release()
return False # Don't suppress exceptions

def __del__(self) -> None:
"""Cleanup - ensure lock is released and file descriptor closed."""
if self._is_locked:
with suppress(Exception):
self.release()

if self._fd is not None:
with suppress(Exception):
os.close(self._fd)


class UnixFileLock(FileLock):
"""File locking using fcntl.flock for Unix-like systems (Linux, macOS).

Uses fcntl.flock() which provides advisory locking. This implementation uses LOCK_EX (exclusive lock) with LOCK_NB
(non-blocking) for timeout support.

"""

def acquire(self, timeout: float = 30.0) -> bool:
"""Acquire exclusive lock using fcntl.flock.

Args:
timeout: Maximum seconds to wait for lock

Returns:
True if lock acquired, False if timeout occurred

"""
import fcntl

# Ensure lock file exists and open it
self._lock_file.parent.mkdir(parents=True, exist_ok=True)
self._lock_file.touch(exist_ok=True)

if self._fd is None:
self._fd = os.open(str(self._lock_file), os.O_RDWR | os.O_CREAT)

start_time = time.time()
while time.time() - start_time < timeout:
try:
# Try to acquire exclusive lock non-blockingly
fcntl.flock(self._fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
self._is_locked = True
return True
except OSError:
# Lock held by another process, wait and retry
time.sleep(0.1)

# Timeout - log warning
elapsed = time.time() - start_time
log.warning(f"Lock acquisition timeout after {elapsed:.1f}s for {self._lock_file}")
return False

def release(self) -> None:
"""Release the lock using fcntl.flock."""
if not self._is_locked or self._fd is None:
return

import fcntl

try:
fcntl.flock(self._fd, fcntl.LOCK_UN)
self._is_locked = False
except OSError as e:
log.warning(f"Error releasing lock on {self._lock_file}: {e}")


class WindowsFileLock(FileLock):
"""File locking using msvcrt.locking for Windows systems.

Uses msvcrt.locking() which provides mandatory locking on Windows. This implementation uses LK_NBLCK (non-blocking
exclusive lock) for timeout support.

"""

def acquire(self, timeout: float = 30.0) -> bool:
"""Acquire exclusive lock using msvcrt.locking.

Args:
timeout: Maximum seconds to wait for lock

Returns:
True if lock acquired, False if timeout occurred

"""
import msvcrt

# Ensure lock file exists and open it
self._lock_file.parent.mkdir(parents=True, exist_ok=True)
self._lock_file.touch(exist_ok=True)

if self._fd is None:
self._fd = os.open(str(self._lock_file), os.O_RDWR | os.O_CREAT)

start_time = time.time()
while time.time() - start_time < timeout:
try:
# Try to lock 1 byte at file position 0
msvcrt.locking(self._fd, msvcrt.LK_NBLCK, 1)
self._is_locked = True
return True
except OSError:
# Lock held by another process, wait and retry
time.sleep(0.1)

# Timeout - log warning
elapsed = time.time() - start_time
log.warning(f"Lock acquisition timeout after {elapsed:.1f}s for {self._lock_file}")
return False

def release(self) -> None:
"""Release the lock using msvcrt.locking."""
if not self._is_locked or self._fd is None:
return

import msvcrt

try:
# Unlock the byte we locked
msvcrt.locking(self._fd, msvcrt.LK_UNLCK, 1)
self._is_locked = False
except OSError as e:
log.warning(f"Error releasing lock on {self._lock_file}: {e}")


def create_file_lock(lock_file: Path) -> FileLock:
"""Factory function to create platform-appropriate file lock.

Args:
lock_file: Path to the lock file

Returns:
Platform-specific FileLock instance

"""
if sys.platform == "win32":
return WindowsFileLock(lock_file)
return UnixFileLock(lock_file)
Loading
Loading