Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support bind port hints #19

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/src/_changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,19 @@ This documents notable changes in DistributedNext.jl. The format is based on

### Fixed
- Fixed a cause of potential hangs when exiting the process ([#16]).
- Fixed cases like `addprocs([("machine 10.1.1.1:9000", 2)])` where the bind
port is specified. Previously this would cause errors when the workers all
tried to bind to the same port, now all additional workers will treat the bind
port as a port hint ([#19]).

### Added
- A watcher mechanism has been added to detect when both the Distributed stdlib
and DistributedNext may be active and adding workers. This should help prevent
incompatibilities from both libraries being used simultaneously ([#10]).
- [`other_workers()`](@ref) and [`other_procs()`](@ref) were implemented and
exported ([#18]).
- The `SSHManager` now supports specifying a bind port hint in the machine
specification ([#19], see the [`addprocs()`](@ref) docs).

## [v1.0.0] - 2024-12-02

Expand Down
45 changes: 37 additions & 8 deletions src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ end
mutable struct LocalProcess
id::Int
bind_addr::String
bind_port_hint::Int
bind_port::Int
cookie::String
LocalProcess() = new(1)
Expand Down Expand Up @@ -257,8 +258,7 @@ function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_std
init_worker(cookie)
interface = IPv4(LPROC.bind_addr)
if LPROC.bind_port == 0
port_hint = 9000 + (getpid() % 1000)
(port, sock) = listenany(interface, port_hint)
(port, sock) = listenany(interface, LPROC.bind_port_hint)
LPROC.bind_port = Int(port)
else
sock = listen(interface, LPROC.bind_port)
Expand Down Expand Up @@ -733,8 +733,29 @@ function launch_additional(np::Integer, cmd::Cmd)
io_objs = Vector{Any}(undef, np)
addresses = Vector{Any}(undef, np)

worker_cmd = Cmd(cmd)
bind_idx = findfirst(==("--bind-to"), cmd)
if !isnothing(bind_idx)
# The actual bind spec will be the next argument
bind_idx += 1

bind_addr = worker_cmd[bind_idx]
parts = split(bind_addr, ':')
if length(parts) == 2
port_str = parts[2]

# If the port is not specified as a port hint then we convert it
# to a hint, otherwise the workers will try to bind to the same
# port and error.
if !startswith(port_str, '[')
new_bind_addr = "$(parts[1]):[$(port_str)]"
worker_cmd.exec[bind_idx] = new_bind_addr
end
end
end

for i in 1:np
io = open(detach(cmd), "r+")
io = open(detach(worker_cmd), "r+")
write_cookie(io)
io_objs[i] = io.out
end
Expand Down Expand Up @@ -1318,17 +1339,24 @@ end
# initialize the local proc network address / port
function init_bind_addr()
opts = JLOptions()
bind_port_hint = 9000 + (getpid() % 1000)
bind_port = 0

if opts.bindto != C_NULL
bind_to = split(unsafe_string(opts.bindto), ":")
bind_addr = string(parse(IPAddr, bind_to[1]))
if length(bind_to) > 1
bind_port = parse(Int,bind_to[2])
else
bind_port = 0
port_str = bind_to[2]
if startswith(port_str, '[')
if !endswith(port_str, ']')
error("Malformed bind port string, please see the addprocs documentation for the formatting rules: $(port_str)")
end
bind_port_hint = parse(Int, port_str[2:end - 1])
else
bind_port = parse(Int, port_str)
end
end
else
bind_port = 0

interfaces = _get_interfaces(IPv4)
if isempty(interfaces)
# Include IPv6 interfaces if there are no IPv4 ones
Expand All @@ -1355,6 +1383,7 @@ function init_bind_addr()
global LPROC
LPROC.bind_addr = bind_addr
LPROC.bind_port = bind_port
LPROC.bind_port_hint = bind_port_hint
end

using Random: randstring
Expand Down
25 changes: 16 additions & 9 deletions src/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -56,23 +56,30 @@ arguments (see below). In particular, the `exename` keyword can be used to speci
the path to the `julia` binary on the remote machine(s).

`machines` is a vector of "machine specifications" which are given as strings of
the form `[user@]host[:port] [bind_addr[:port]]`. `user` defaults to current user and `port`
to the standard SSH port. If `[bind_addr[:port]]` is specified, other workers will connect
to this worker at the specified `bind_addr` and `port`.
the form `[user@]host[:ssh_port] [bind_addr[:bind_port]]`. `user` defaults to
current user and `ssh_port` to the standard SSH port. If
`[bind_addr[:bind_port]]` is specified, other workers will connect to this
worker at the specified `bind_addr` and `bind_port`. `bind_port` can be a
specific port like in `addr:9000`, but it can also specify a port hint by
enclosing it in brackets like `addr:[9000]`.

It is possible to launch multiple processes on a remote host by using a tuple in the
`machines` vector or the form `(machine_spec, count)`, where `count` is the number of
workers to be launched on the specified host. Passing `:auto` as the worker count will
launch as many workers as the number of CPU threads on the remote host.
launch as many workers as the number of CPU threads on the remote host. If the
`bind_port` is specified then the first worker will bind to `bind_port` and all
other workers on the host will use `bind_port` as a port hint.

**Examples**:
```julia
addprocs([
"remote1", # one worker on 'remote1' logging in with the current username
"user@remote2", # one worker on 'remote2' logging in with the 'user' username
"user@remote3:2222", # specifying SSH port to '2222' for 'remote3'
("user@remote4", 4), # launch 4 workers on 'remote4'
("user@remote5", :auto), # launch as many workers as CPU threads on 'remote5'
"remote1", # one worker on 'remote1' logging in with the current username
"user@remote2", # one worker on 'remote2' logging in with the 'user' username
"user@remote3:2222", # specifying SSH port to '2222' for 'remote3'
"user@remote4 10.1.1.1:8000" # specify the address for the worker to bind to on 'remote4'
"user@remote5 10.1.1.1:[8000]" # same as above, but with a port hint instead of a specific port
("user@remote4", 4), # launch 4 workers on 'remote4'
("user@remote5", :auto), # launch as many workers as CPU threads on 'remote5'
])
```

Expand Down
11 changes: 11 additions & 0 deletions test/sshmanager.jl
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,17 @@ end
@test length(new_pids) == 5
test_n_remove_pids(new_pids)

print("\nssh addprocs with a port hint\n")
new_pids = addprocs_with_testenv(["localhost 127.0.0.1:[8000]"]; sshflags=sshflags)
worker = DistributedNext.worker_from_id(only(new_pids))
@test 8000 >= worker.config.port < 9000
test_n_remove_pids(new_pids)

print("\nssh addprocs with multiple workers and port specified\n")
new_pids = addprocs_with_testenv([("localhost 127.0.0.1:8000", 2)]; sshflags=sshflags)
@test length(new_pids) == 2
test_n_remove_pids(new_pids)

print("\nssh addprocs with tunnel\n")
new_pids = addprocs_with_testenv([("localhost", num_workers)]; tunnel=true, sshflags=sshflags)
@test length(new_pids) == num_workers
Expand Down
Loading