From 88faa86e89d57b14933f1ec947b82d075a665ad0 Mon Sep 17 00:00:00 2001 From: JamesWrigley Date: Tue, 7 Jan 2025 13:02:48 +0100 Subject: [PATCH 1/2] Add support for specifying a bind port hint in the machine spec --- docs/src/_changelog.md | 2 ++ src/cluster.jl | 22 +++++++++++++++------- src/managers.jl | 25 ++++++++++++++++--------- test/sshmanager.jl | 6 ++++++ 4 files changed, 39 insertions(+), 16 deletions(-) diff --git a/docs/src/_changelog.md b/docs/src/_changelog.md index 5d9207f..1796692 100644 --- a/docs/src/_changelog.md +++ b/docs/src/_changelog.md @@ -18,6 +18,8 @@ This documents notable changes in DistributedNext.jl. The format is based on incompatibilities from both libraries being used simultaneously ([#10]). - [`other_workers()`](@ref) and [`other_procs()`](@ref) were implemented and exported ([#18]). +- The `SSHManager` now supports specifying a bind port hint in the machine + specification ([#19], see the [`addprocs()`](@ref) docs). ## [v1.0.0] - 2024-12-02 diff --git a/src/cluster.jl b/src/cluster.jl index 10c53ac..eb4805c 100644 --- a/src/cluster.jl +++ b/src/cluster.jl @@ -214,6 +214,7 @@ end mutable struct LocalProcess id::Int bind_addr::String + bind_port_hint::Int bind_port::Int cookie::String LocalProcess() = new(1) @@ -257,8 +258,7 @@ function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_std init_worker(cookie) interface = IPv4(LPROC.bind_addr) if LPROC.bind_port == 0 - port_hint = 9000 + (getpid() % 1000) - (port, sock) = listenany(interface, port_hint) + (port, sock) = listenany(interface, LPROC.bind_port_hint) LPROC.bind_port = Int(port) else sock = listen(interface, LPROC.bind_port) @@ -1318,17 +1318,24 @@ end # initialize the local proc network address / port function init_bind_addr() opts = JLOptions() + bind_port_hint = 9000 + (getpid() % 1000) + bind_port = 0 + if opts.bindto != C_NULL bind_to = split(unsafe_string(opts.bindto), ":") bind_addr = string(parse(IPAddr, bind_to[1])) if length(bind_to) > 1 - 
bind_port = parse(Int,bind_to[2]) - else - bind_port = 0 + port_str = bind_to[2] + if startswith(port_str, '[') + if !endswith(port_str, ']') + error("Malformed bind port string, please see the addprocs documentation for the formatting rules: $(port_str)") + end + bind_port_hint = parse(Int, port_str[2:end - 1]) + else + bind_port = parse(Int, port_str) + end end else - bind_port = 0 - interfaces = _get_interfaces(IPv4) if isempty(interfaces) # Include IPv6 interfaces if there are no IPv4 ones @@ -1355,6 +1362,7 @@ function init_bind_addr() global LPROC LPROC.bind_addr = bind_addr LPROC.bind_port = bind_port + LPROC.bind_port_hint = bind_port_hint end using Random: randstring diff --git a/src/managers.jl b/src/managers.jl index 15e7bd7..dad6037 100644 --- a/src/managers.jl +++ b/src/managers.jl @@ -56,23 +56,30 @@ arguments (see below). In particular, the `exename` keyword can be used to speci the path to the `julia` binary on the remote machine(s). `machines` is a vector of "machine specifications" which are given as strings of -the form `[user@]host[:port] [bind_addr[:port]]`. `user` defaults to current user and `port` -to the standard SSH port. If `[bind_addr[:port]]` is specified, other workers will connect -to this worker at the specified `bind_addr` and `port`. +the form `[user@]host[:ssh_port] [bind_addr[:bind_port]]`. `user` defaults to +current user and `ssh_port` to the standard SSH port. If +`[bind_addr[:bind_port]]` is specified, other workers will connect to this +worker at the specified `bind_addr` and `bind_port`. `bind_port` can be a +specific port like in `addr:9000`, but it can also specify a port hint by +enclosing it in brackets like `addr:[9000]`. It is possible to launch multiple processes on a remote host by using a tuple in the `machines` vector or the form `(machine_spec, count)`, where `count` is the number of workers to be launched on the specified host. 
Passing `:auto` as the worker count will -launch as many workers as the number of CPU threads on the remote host. +launch as many workers as the number of CPU threads on the remote host. If the +`bind_port` is specified then the first worker will bind to `bind_port` and all +other workers on the host will use `bind_port` as a port hint. **Examples**: ```julia addprocs([ - "remote1", # one worker on 'remote1' logging in with the current username - "user@remote2", # one worker on 'remote2' logging in with the 'user' username - "user@remote3:2222", # specifying SSH port to '2222' for 'remote3' - ("user@remote4", 4), # launch 4 workers on 'remote4' - ("user@remote5", :auto), # launch as many workers as CPU threads on 'remote5' + "remote1", # one worker on 'remote1' logging in with the current username + "user@remote2", # one worker on 'remote2' logging in with the 'user' username + "user@remote3:2222", # specifying SSH port to '2222' for 'remote3' + "user@remote4 10.1.1.1:8000", # specify the address for the worker to bind to on 'remote4' + "user@remote5 10.1.1.1:[8000]", # same as above, but with a port hint instead of a specific port + ("user@remote4", 4), # launch 4 workers on 'remote4' + ("user@remote5", :auto), # launch as many workers as CPU threads on 'remote5' ]) ``` diff --git a/test/sshmanager.jl b/test/sshmanager.jl index 956dd87..476844d 100644 --- a/test/sshmanager.jl +++ b/test/sshmanager.jl @@ -58,6 +58,12 @@ end @test length(new_pids) == 5 test_n_remove_pids(new_pids) + print("\nssh addprocs with a port hint\n") + new_pids = addprocs_with_testenv(["localhost 127.0.0.1:[8000]"]; sshflags=sshflags) + worker = DistributedNext.worker_from_id(only(new_pids)) + @test 8000 <= worker.config.port < 9000 + test_n_remove_pids(new_pids) + print("\nssh addprocs with tunnel\n") new_pids = addprocs_with_testenv([("localhost", num_workers)]; tunnel=true, sshflags=sshflags) @test length(new_pids) == num_workers From cdabed6e3e6e682ebc085463f0922e8dbdf81879 Mon Sep 17
00:00:00 2001 From: JamesWrigley Date: Tue, 7 Jan 2025 13:03:28 +0100 Subject: [PATCH 2/2] Use the bind port as a hint for multiple workers on the same host Otherwise all the workers would try to bind to the same port, which would cause an error. --- docs/src/_changelog.md | 4 ++++ src/cluster.jl | 23 ++++++++++++++++++++++- test/sshmanager.jl | 5 +++++ 3 files changed, 31 insertions(+), 1 deletion(-) diff --git a/docs/src/_changelog.md b/docs/src/_changelog.md index 1796692..6af0a8a 100644 --- a/docs/src/_changelog.md +++ b/docs/src/_changelog.md @@ -11,6 +11,10 @@ This documents notable changes in DistributedNext.jl. The format is based on ### Fixed - Fixed a cause of potential hangs when exiting the process ([#16]). +- Fixed cases like `addprocs([("machine 10.1.1.1:9000", 2)])` where the bind + port is specified. Previously this would cause errors when the workers all + tried to bind to the same port, now all additional workers will treat the bind + port as a port hint ([#19]). ### Added - A watcher mechanism has been added to detect when both the Distributed stdlib diff --git a/src/cluster.jl b/src/cluster.jl index eb4805c..3ca82d9 100644 --- a/src/cluster.jl +++ b/src/cluster.jl @@ -733,8 +733,29 @@ function launch_additional(np::Integer, cmd::Cmd) io_objs = Vector{Any}(undef, np) addresses = Vector{Any}(undef, np) + worker_cmd = Cmd(cmd) + bind_idx = findfirst(==("--bind-to"), cmd) + if !isnothing(bind_idx) + # The actual bind spec will be the next argument + bind_idx += 1 + + bind_addr = worker_cmd[bind_idx] + parts = split(bind_addr, ':') + if length(parts) == 2 + port_str = parts[2] + + # If the port is not specified as a port hint then we convert it + # to a hint, otherwise the workers will try to bind to the same + # port and error. 
+ if !startswith(port_str, '[') + new_bind_addr = "$(parts[1]):[$(port_str)]" + worker_cmd.exec[bind_idx] = new_bind_addr + end + end + end + for i in 1:np - io = open(detach(cmd), "r+") + io = open(detach(worker_cmd), "r+") write_cookie(io) io_objs[i] = io.out end diff --git a/test/sshmanager.jl b/test/sshmanager.jl index 476844d..383e5fb 100644 --- a/test/sshmanager.jl +++ b/test/sshmanager.jl @@ -64,6 +64,11 @@ end @test 8000 <= worker.config.port < 9000 test_n_remove_pids(new_pids) + print("\nssh addprocs with multiple workers and port specified\n") + new_pids = addprocs_with_testenv([("localhost 127.0.0.1:8000", 2)]; sshflags=sshflags) + @test length(new_pids) == 2 + test_n_remove_pids(new_pids) + print("\nssh addprocs with tunnel\n") new_pids = addprocs_with_testenv([("localhost", num_workers)]; tunnel=true, sshflags=sshflags) @test length(new_pids) == num_workers