From 1c16bc6f6281947615f64d81689738e17ff57196 Mon Sep 17 00:00:00 2001
From: Julian P Samaroo
Date: Sat, 14 Oct 2023 10:18:56 -0700
Subject: [PATCH 1/8] Add gitignore

---
 .gitignore | 2 ++
 1 file changed, 2 insertions(+)
 create mode 100644 .gitignore

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..df02284
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+Manifest.toml
+*.swp

From 47c9f3e524c135cc12edd450bf790da908c2fb54 Mon Sep 17 00:00:00 2001
From: Valentin Churavy
Date: Mon, 13 Sep 2021 13:53:41 -0400
Subject: [PATCH 2/8] Make worker state variable threadsafe

---
 src/cluster.jl           | 55 ++++++++++++++++++++++++++--------
 src/managers.jl          |  2 +-
 test/distributed_exec.jl |  5 +++-
 test/threads.jl          | 64 ++++++++++++++++++++++++++++++++++++++++
 4 files changed, 112 insertions(+), 14 deletions(-)
 create mode 100644 test/threads.jl

diff --git a/src/cluster.jl b/src/cluster.jl
index 5712451..b75caa2 100644
--- a/src/cluster.jl
+++ b/src/cluster.jl
@@ -100,9 +100,9 @@ mutable struct Worker
     add_msgs::Array{Any,1}
     @atomic gcflag::Bool
     state::WorkerState
-    c_state::Condition         # wait for state changes
-    ct_time::Float64           # creation time
-    conn_func::Any             # used to setup connections lazily
+    c_state::Threads.Condition # wait for state changes, lock for state
+    ct_time::Float64           # creation time
+    conn_func::Any             # used to setup connections lazily

     r_stream::IO
     w_stream::IO
@@ -134,7 +134,7 @@ mutable struct Worker
         if haskey(map_pid_wrkr, id)
             return map_pid_wrkr[id]
         end
-        w=new(id, Threads.ReentrantLock(), [], [], false, W_CREATED, Condition(), time(), conn_func)
+        w=new(id, Threads.ReentrantLock(), [], [], false, W_CREATED, Threads.Condition(), time(), conn_func)
         w.initialized = Event()
         register_worker(w)
         w
@@ -144,12 +144,16 @@ mutable struct Worker
 end

 function set_worker_state(w, state)
-    w.state = state
-    notify(w.c_state; all=true)
+    lock(w.c_state) do
+        w.state = state
+        notify(w.c_state; all=true)
+    end
 end

 function check_worker_state(w::Worker)
+    lock(w.c_state)
     if w.state === W_CREATED
+        unlock(w.c_state)
         if !isclusterlazy()
             if PGRP.topology === :all_to_all
                 # Since higher pids connect with lower pids, the remote worker
@@ -169,6 +173,8 @@ function check_worker_state(w::Worker)
             errormonitor(t)
             wait_for_conn(w)
         end
+    else
+        unlock(w.c_state)
     end
 end

@@ -187,13 +193,25 @@ function exec_conn_func(w::Worker)
 end

 function wait_for_conn(w)
+    lock(w.c_state)
     if w.state === W_CREATED
+        unlock(w.c_state)
         timeout = worker_timeout() - (time() - w.ct_time)
         timeout <= 0 && error("peer $(w.id) has not connected to $(myid())")

-        @async (sleep(timeout); notify(w.c_state; all=true))
-        wait(w.c_state)
-        w.state === W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
+        T = Threads.@spawn begin
+            sleep($timeout)
+            lock(w.c_state) do
+                notify(w.c_state; all=true)
+            end
+        end
+        errormonitor(T)
+        lock(w.c_state) do
+            wait(w.c_state)
+            w.state === W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
+        end
+    else
+        unlock(w.c_state)
     end
     nothing
 end
@@ -491,7 +509,10 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
         while true
             if isempty(launched)
                 istaskdone(t_launch) && break
-                @async (sleep(1); notify(launch_ntfy))
+                @async begin
+                    sleep(1)
+                    notify(launch_ntfy)
+                end
                 wait(launch_ntfy)
             end

@@ -645,7 +666,12 @@ function create_worker(manager, wconfig)
     # require the value of config.connect_at which is set only upon connection completion
     for jw in PGRP.workers
         if (jw.id != 1) && (jw.id < w.id)
-            (jw.state === W_CREATED) && wait(jw.c_state)
+            # wait for jw to join
+            lock(jw.c_state) do
+                if jw.state === W_CREATED
+                    wait(jw.c_state)
+                end
+            end
             push!(join_list, jw)
         end
     end
@@ -668,7 +694,12 @@ function create_worker(manager, wconfig)
         end

         for wl in wlist
-            (wl.state === W_CREATED) && wait(wl.c_state)
+            lock(wl.c_state) do
+                if wl.state === W_CREATED
+                    # wait for wl to join
+                    wait(wl.c_state)
+                end
+            end
             push!(join_list, wl)
         end
     end

diff --git a/src/managers.jl b/src/managers.jl
index b667675..905322d 100644
--- a/src/managers.jl
+++ b/src/managers.jl
@@ -178,7 +178,7 @@ function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy:
     # Wait for all launches to complete.
     @sync for (i, (machine, cnt)) in enumerate(manager.machines)
         let machine=machine, cnt=cnt
-            @async try 
+            @async try
                 launch_on_machine(manager, $machine, $cnt, params, launched, launch_ntfy)
             catch e
                 print(stderr, "exception launching on machine $(machine) : $(e)\n")

diff --git a/test/distributed_exec.jl b/test/distributed_exec.jl
index d80d642..900a438 100644
--- a/test/distributed_exec.jl
+++ b/test/distributed_exec.jl
@@ -1972,5 +1972,8 @@ end

 # Run topology tests last after removing all workers, since a given
 # cluster at any time only supports a single topology.
-nprocs() > 1 && rmprocs(workers())
+if nprocs() > 1
+    rmprocs(workers())
+end
+include("threads.jl")
 include("topology.jl")

diff --git a/test/threads.jl b/test/threads.jl
new file mode 100644
index 0000000..385c91f
--- /dev/null
+++ b/test/threads.jl
@@ -0,0 +1,64 @@
+using Test
+using Distributed, Base.Threads
+using Base.Iterators: product
+
+exeflags = ("--startup-file=no",
+            "--check-bounds=yes",
+            "--depwarn=error",
+            "--threads=2")
+
+function call_on(f, wid, tid)
+    remotecall(wid) do
+        t = Task(f)
+        ccall(:jl_set_task_tid, Cvoid, (Any, Cint), t, tid - 1)
+        schedule(t)
+        @assert threadid(t) == tid
+        t
+    end
+end
+
+# Run function on process holding the data to only serialize the result of f.
+# This becomes useful for things that cannot be serialized (e.g. running tasks)
+# or that would be unnecessarily big if serialized.
+fetch_from_owner(f, rr) = remotecall_fetch(f ∘ fetch, rr.where, rr)
+
+isdone(rr) = fetch_from_owner(istaskdone, rr)
+isfailed(rr) = fetch_from_owner(istaskfailed, rr)
+
+@testset "RemoteChannel allows put!/take! from thread other than 1" begin
+    ws = ts = product(1:2, 1:2)
+    @testset "from worker $w1 to $w2 via 1" for (w1, w2) in ws
+        @testset "from thread $w1.$t1 to $w2.$t2" for (t1, t2) in ts
+            # We want (the default) laziness, so that we wait for `Worker.c_state`!
+            procs_added = addprocs(2; exeflags, lazy=true)
+            @everywhere procs_added using Base.Threads
+
+            p1 = procs_added[w1]
+            p2 = procs_added[w2]
+            chan_id = first(procs_added)
+            chan = RemoteChannel(chan_id)
+            send = call_on(p1, t1) do
+                put!(chan, nothing)
+            end
+            recv = call_on(p2, t2) do
+                take!(chan)
+            end
+
+            # Wait on the spawned tasks on the owner. Note that we use
+            # timedwait() instead of @sync to avoid deadlocks.
+            t1 = Threads.@spawn fetch_from_owner(wait, recv)
+            t2 = Threads.@spawn fetch_from_owner(wait, send)
+            @test timedwait(() -> istaskdone(t1), 60) == :ok
+            @test timedwait(() -> istaskdone(t2), 60) == :ok
+
+            # Check the tasks
+            @test isdone(send)
+            @test isdone(recv)
+
+            @test !isfailed(send)
+            @test !isfailed(recv)
+
+            rmprocs(procs_added)
+        end
+    end
+end

From 1b532c14db83bc07fc109299ee3e3f0073089cb1 Mon Sep 17 00:00:00 2001
From: Valentin Churavy
Date: Tue, 28 Sep 2021 23:51:17 -0400
Subject: [PATCH 3/8] use atomic field

---
 src/cluster.jl | 17 +++++------------
 1 file changed, 5 insertions(+), 12 deletions(-)

diff --git a/src/cluster.jl b/src/cluster.jl
index b75caa2..56b48ab 100644
--- a/src/cluster.jl
+++ b/src/cluster.jl
@@ -99,7 +99,7 @@ mutable struct Worker
     del_msgs::Array{Any,1} # XXX: Could del_msgs and add_msgs be Channels?
     add_msgs::Array{Any,1}
     @atomic gcflag::Bool
-    state::WorkerState
+    @atomic state::WorkerState
     c_state::Threads.Condition # wait for state changes, lock for state
     ct_time::Float64           # creation time
     conn_func::Any             # used to setup connections lazily
@@ -145,15 +145,13 @@ end

 function set_worker_state(w, state)
     lock(w.c_state) do
-        w.state = state
+        @atomic w.state = state
         notify(w.c_state; all=true)
     end
 end

 function check_worker_state(w::Worker)
-    lock(w.c_state)
     if w.state === W_CREATED
-        unlock(w.c_state)
         if !isclusterlazy()
             if PGRP.topology === :all_to_all
                 # Since higher pids connect with lower pids, the remote worker
@@ -169,9 +171,8 @@ function check_worker_state(w::Worker)
             errormonitor(t)
             wait_for_conn(w)
         end
-    else
-        unlock(w.c_state)
     end
+    return nothing
 end

 exec_conn_func(id::Int) = exec_conn_func(worker_from_id(id)::Worker)
@@ -193,9 +190,7 @@ function exec_conn_func(w::Worker)
 end

 function wait_for_conn(w)
-    lock(w.c_state)
     if w.state === W_CREATED
-        unlock(w.c_state)
         timeout = worker_timeout() - (time() - w.ct_time)
         timeout <= 0 && error("peer $(w.id) has not connected to $(myid())")

@@ -210,8 +205,6 @@ function wait_for_conn(w)
             wait(w.c_state)
             w.state === W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
         end
-    else
-        unlock(w.c_state)
     end
     nothing
 end
@@ -667,8 +660,8 @@ function create_worker(manager, wconfig)
     for jw in PGRP.workers
         if (jw.id != 1) && (jw.id < w.id)
             # wait for jw to join
-            lock(jw.c_state) do
-                if jw.state === W_CREATED
+            if jw.state === W_CREATED
+                lock(jw.c_state) do
                     wait(jw.c_state)
                 end
             end

From d7f6faf805d152dcfb8fd5154b38b8b54f306dff Mon Sep 17 00:00:00 2001
From: Julian P Samaroo
Date: Sat, 14 Oct 2023 10:18:05 -0700
Subject: [PATCH 4/8] init_multi: Be more thread-safe

---
 src/cluster.jl | 20 +++++++++-----------
 1 file changed, 9 insertions(+), 11 deletions(-)

diff --git a/src/cluster.jl b/src/cluster.jl
index 56b48ab..cd240b3 100644
--- a/src/cluster.jl
+++ b/src/cluster.jl
@@ -1312,18 +1312,16 @@ end

 using Random: randstring

-let inited = false
-    # do initialization that's only needed when there is more than 1 processor
-    global function init_multi()
-        if !inited
-            inited = true
-            push!(Base.package_callbacks, _require_callback)
-            atexit(terminate_all_workers)
-            init_bind_addr()
-            cluster_cookie(randstring(HDR_COOKIE_LEN))
-        end
-        return nothing
+# do initialization that's only needed when there is more than 1 processor
+const inited = Threads.Atomic{Bool}(false)
+function init_multi()
+    if !Threads.atomic_cas!(inited, false, true)
+        push!(Base.package_callbacks, _require_callback)
+        atexit(terminate_all_workers)
+        init_bind_addr()
+        cluster_cookie(randstring(HDR_COOKIE_LEN))
     end
+    return nothing
 end

 function init_parallel()

From 96e02943e3144ebe014d119d59d14510fae5dcff Mon Sep 17 00:00:00 2001
From: JamesWrigley
Date: Sun, 2 Jun 2024 19:52:32 +0200
Subject: [PATCH 5/8] Use errormonitor() in a few places

---
 src/cluster.jl | 21 ++++++++++++---------
 1 file changed, 12 insertions(+), 9 deletions(-)

diff --git a/src/cluster.jl b/src/cluster.jl
index cd240b3..f3db0fe 100644
--- a/src/cluster.jl
+++ b/src/cluster.jl
@@ -757,17 +757,20 @@ function check_master_connect()
     # If we do not have at least process 1 connect to us within timeout
     # we log an error and exit, unless we're running on valgrind
     if ccall(:jl_running_on_valgrind,Cint,()) != 0
         return
     end
-    @async begin
-        start = time_ns()
-        while !haskey(map_pid_wrkr, 1) && (time_ns() - start) < timeout
-            sleep(1.0)
-        end
-        if !haskey(map_pid_wrkr, 1)
-            print(stderr, "Master process (id 1) could not connect within $(timeout/1e9) seconds.\nexiting.\n")
-            exit(1)
+
+    errormonitor(
+        @async begin
+            start = time_ns()
+            while !haskey(map_pid_wrkr, 1) && (time_ns() - start) < timeout
+                sleep(1.0)
+            end
+
+            if !haskey(map_pid_wrkr, 1)
+                print(stderr, "Master process (id 1) could not connect within $(timeout/1e9) seconds.\nexiting.\n")
+                exit(1)
+            end
         end
-    end
+    )
 end

From a460f9f21f44982d8fedc64ca93bfa08fb671e5a Mon Sep 17 00:00:00 2001
From: JamesWrigley
Date: Tue, 23 Jul 2024 21:16:54 +0200
Subject: [PATCH 6/8] Replace `@async` with `Threads.@spawn`

---
 src/cluster.jl          | 28 ++++++++++++++--------------
 src/macros.jl           |  4 ++--
 src/managers.jl         |  4 ++--
 src/messages.jl         |  2 +-
 src/process_messages.jl | 14 +++++++-------
 src/remotecall.jl       |  2 +-
 6 files changed, 27 insertions(+), 27 deletions(-)

diff --git a/src/cluster.jl b/src/cluster.jl
index f3db0fe..add1aa5 100644
--- a/src/cluster.jl
+++ b/src/cluster.jl
@@ -163,10 +163,10 @@ function check_worker_state(w::Worker)
     else
         w.ct_time = time()
         if myid() > w.id
-            t = @async exec_conn_func(w)
+            t = Threads.@spawn exec_conn_func(w)
         else
             # route request via node 1
-            t = @async remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
+            t = Threads.@spawn remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
         end
         errormonitor(t)
         wait_for_conn(w)
@@ -258,7 +258,7 @@ function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_std
     else
         sock = listen(interface, LPROC.bind_port)
     end
-    errormonitor(@async while isopen(sock)
+    errormonitor(Threads.@spawn while isopen(sock)
         client = accept(sock)
         process_messages(client, client, true)
     end)
@@ -290,7 +290,7 @@ end


 function redirect_worker_output(ident, stream)
-    t = @async while !eof(stream)
+    t = Threads.@spawn while !eof(stream)
         line = readline(stream)
         if startswith(line, "      From worker ")
             # stdout's of "additional" workers started from an initial worker on a host are not available
@@ -329,7 +329,7 @@ function read_worker_host_port(io::IO)
     leader = String[]
     try
         while ntries > 0
-            readtask = @async readline(io)
+            readtask = Threads.@spawn readline(io)
             yield()
             while !istaskdone(readtask) && ((time_ns() - t0) < timeout)
                 sleep(0.05)
@@ -430,7 +430,7 @@ if launching workers programmatically, execute `addprocs` in its own task.

 ```julia
 # On busy clusters, call `addprocs` asynchronously
-t = @async addprocs(...)
+t = Threads.@spawn addprocs(...)
 ```

 ```julia
@@ -496,13 +496,13 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
     # call manager's `launch` is a separate task. This allows the master
     # process initiate the connection setup process as and when workers come
     # online
-    t_launch = @async launch(manager, params, launched, launch_ntfy)
+    t_launch = Threads.@spawn launch(manager, params, launched, launch_ntfy)

     @sync begin
         while true
             if isempty(launched)
                 istaskdone(t_launch) && break
-                @async begin
+                Threads.@spawn begin
                     sleep(1)
                     notify(launch_ntfy)
                 end
@@ -512,7 +512,7 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
             if !isempty(launched)
                 wconfig = popfirst!(launched)
                 let wconfig=wconfig
-                    @async setup_launched_worker(manager, wconfig, launched_q)
+                    Threads.@spawn setup_launched_worker(manager, wconfig, launched_q)
                 end
             end
         end
@@ -592,7 +592,7 @@ function launch_n_additional_processes(manager, frompid, fromconfig, cnt, launch
         wconfig.port = port

         let wconfig=wconfig
-            @async begin
+            Threads.@spawn begin
                 pid = create_worker(manager, wconfig)
                 remote_do(redirect_output_from_additional_worker, frompid, pid, port)
                 push!(launched_q, pid)
@@ -759,7 +759,7 @@ function check_master_connect()
     end

     errormonitor(
-        @async begin
+        Threads.@spawn begin
             start = time_ns()
             while !haskey(map_pid_wrkr, 1) && (time_ns() - start) < timeout
                 sleep(1.0)
@@ -1055,13 +1055,13 @@ function rmprocs(pids...; waitfor=typemax(Int))
     pids = vcat(pids...)

     if waitfor == 0
-        t = @async _rmprocs(pids, typemax(Int))
+        t = Threads.@spawn _rmprocs(pids, typemax(Int))
         yield()
         return t
     else
         _rmprocs(pids, waitfor)
         # return a dummy task object that user code can wait on.
-        return @async nothing
+        return Threads.@spawn nothing
     end
 end
@@ -1244,7 +1244,7 @@ function interrupt(pids::AbstractVector=workers())
     @assert myid() == 1
     @sync begin
         for pid in pids
-            @async interrupt(pid)
+            Threads.@spawn interrupt(pid)
         end
     end
 end

diff --git a/src/macros.jl b/src/macros.jl
index a4fec31..ac5029e 100644
--- a/src/macros.jl
+++ b/src/macros.jl
@@ -230,7 +230,7 @@ function remotecall_eval(m::Module, procs, ex)
         # execute locally last as we do not want local execution to block serialization
         # of the request to remote nodes.
         for _ in 1:run_locally
-            @async Core.eval(m, ex)
+            Threads.@spawn Core.eval(m, ex)
         end
     end
     nothing
@@ -275,7 +275,7 @@ function preduce(reducer, f, R)
 end

 function pfor(f, R)
-    t = @async @sync for c in splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers())
+    t = Threads.@spawn @sync for c in splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers())
         @spawnat :any f(R, first(c), last(c))
     end
     errormonitor(t)

diff --git a/src/managers.jl b/src/managers.jl
index 905322d..b506c42 100644
--- a/src/managers.jl
+++ b/src/managers.jl
@@ -178,7 +178,7 @@ function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy:
     # Wait for all launches to complete.
     @sync for (i, (machine, cnt)) in enumerate(manager.machines)
         let machine=machine, cnt=cnt
-            @async try
+            Threads.@spawn try
                 launch_on_machine(manager, $machine, $cnt, params, launched, launch_ntfy)
             catch e
                 print(stderr, "exception launching on machine $(machine) : $(e)\n")
@@ -744,7 +744,7 @@ function kill(manager::LocalManager, pid::Int, config::WorkerConfig; exit_timeou
     # First, try sending `exit()` to the remote over the usual control channels
     remote_do(exit, pid)

-    timer_task = @async begin
+    timer_task = Threads.@spawn begin
         sleep(exit_timeout)

         # Check to see if our child exited, and if not, send an actual kill signal

diff --git a/src/messages.jl b/src/messages.jl
index fe3e5ab..70baa25 100644
--- a/src/messages.jl
+++ b/src/messages.jl
@@ -200,7 +200,7 @@ function flush_gc_msgs()
         end
     catch e
         bt = catch_backtrace()
-        @async showerror(stderr, e, bt)
+        Threads.@spawn showerror(stderr, e, bt)
     end
 end

diff --git a/src/process_messages.jl b/src/process_messages.jl
index 3032917..15f5be6 100644
--- a/src/process_messages.jl
+++ b/src/process_messages.jl
@@ -85,7 +85,7 @@ function schedule_call(rid, thunk)
         rv = RemoteValue(def_rv_channel())
         (PGRP::ProcessGroup).refs[rid] = rv
         push!(rv.clientset, rid.whence)
-        errormonitor(@async run_work_thunk(rv, thunk))
+        errormonitor(Threads.@spawn run_work_thunk(rv, thunk))
         return rv
     end
 end
@@ -118,7 +118,7 @@ end

 ## message event handlers ##
 function process_messages(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool=true)
-    errormonitor(@async process_tcp_streams(r_stream, w_stream, incoming))
+    errormonitor(Threads.@spawn process_tcp_streams(r_stream, w_stream, incoming))
 end

 function process_tcp_streams(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool)
@@ -148,7 +148,7 @@ Julia version number to perform the authentication handshake.

 See also [`cluster_cookie`](@ref).
""" function process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true) - errormonitor(@async message_handler_loop(r_stream, w_stream, incoming)) + errormonitor(Threads.@spawn message_handler_loop(r_stream, w_stream, incoming)) end function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool) @@ -283,7 +283,7 @@ function handle_msg(msg::CallMsg{:call}, header, r_stream, w_stream, version) schedule_call(header.response_oid, ()->invokelatest(msg.f, msg.args...; msg.kwargs...)) end function handle_msg(msg::CallMsg{:call_fetch}, header, r_stream, w_stream, version) - errormonitor(@async begin + errormonitor(Threads.@spawn begin v = run_work_thunk(()->invokelatest(msg.f, msg.args...; msg.kwargs...), false) if isa(v, SyncTake) try @@ -299,7 +299,7 @@ function handle_msg(msg::CallMsg{:call_fetch}, header, r_stream, w_stream, versi end function handle_msg(msg::CallWaitMsg, header, r_stream, w_stream, version) - errormonitor(@async begin + errormonitor(Threads.@spawn begin rv = schedule_call(header.response_oid, ()->invokelatest(msg.f, msg.args...; msg.kwargs...)) deliver_result(w_stream, :call_wait, header.notify_oid, fetch(rv.c)) nothing @@ -307,7 +307,7 @@ function handle_msg(msg::CallWaitMsg, header, r_stream, w_stream, version) end function handle_msg(msg::RemoteDoMsg, header, r_stream, w_stream, version) - errormonitor(@async run_work_thunk(()->invokelatest(msg.f, msg.args...; msg.kwargs...), true)) + errormonitor(Threads.@spawn run_work_thunk(()->invokelatest(msg.f, msg.args...; msg.kwargs...), true)) end function handle_msg(msg::ResultMsg, header, r_stream, w_stream, version) @@ -350,7 +350,7 @@ function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version) # The constructor registers the object with a global registry. 
             Worker(rpid, ()->connect_to_peer(cluster_manager, rpid, wconfig))
         else
-            @async connect_to_peer(cluster_manager, rpid, wconfig)
+            Threads.@spawn connect_to_peer(cluster_manager, rpid, wconfig)
         end
     end
 end

diff --git a/src/remotecall.jl b/src/remotecall.jl
index 644ff04..3cd1207 100644
--- a/src/remotecall.jl
+++ b/src/remotecall.jl
@@ -205,7 +205,7 @@ or to use a local [`Channel`](@ref) as a proxy:
 ```julia
 p = 1
 f = Future(p)
-errormonitor(@async put!(f, remotecall_fetch(long_computation, p)))
+errormonitor(Threads.@spawn put!(f, remotecall_fetch(long_computation, p)))
 isready(f)  # will not block
 ```
 """

From 56329d5b48b40552e7da27f1f444633c03dff6cf Mon Sep 17 00:00:00 2001
From: JamesWrigley
Date: Tue, 21 Jan 2025 19:46:46 +0100
Subject: [PATCH 7/8] Use timedwait() in check_master_connect()

---
 src/cluster.jl | 11 +++--------
 1 file changed, 3 insertions(+), 8 deletions(-)

diff --git a/src/cluster.jl b/src/cluster.jl
index add1aa5..0e8df8c 100644
--- a/src/cluster.jl
+++ b/src/cluster.jl
@@ -751,7 +751,6 @@ function redirect_output_from_additional_worker(pid, port)
 end

 function check_master_connect()
-    timeout = worker_timeout() * 1e9
     # If we do not have at least process 1 connect to us within timeout
     # we log an error and exit, unless we're running on valgrind
     if ccall(:jl_running_on_valgrind,Cint,()) != 0
@@ -760,13 +759,9 @@ function check_master_connect()

     errormonitor(
         Threads.@spawn begin
-            start = time_ns()
-            while !haskey(map_pid_wrkr, 1) && (time_ns() - start) < timeout
-                sleep(1.0)
-            end
-
-            if !haskey(map_pid_wrkr, 1)
-                print(stderr, "Master process (id 1) could not connect within $(timeout/1e9) seconds.\nexiting.\n")
+            timeout = worker_timeout()
+            if timedwait(() -> haskey(map_pid_wrkr, 1), timeout) === :timed_out
+                print(stderr, "Master process (id 1) could not connect within $(timeout) seconds.\nexiting.\n")
                 exit(1)
             end
         end

From 90041ca7aa243d3d3e310b53ce411a1b88fbdf06 Mon Sep 17 00:00:00 2001
From: JamesWrigley
Date: Tue, 21 Jan 2025 20:52:34 +0100
Subject: [PATCH 8/8] Keep most 'client oriented' tasks in the same threadpool

This is to avoid them accidentally running in another (potentially busy)
threadpool.
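
As a rough sketch of the pattern (the spawn_in_callers_pool() helper below is
hypothetical and for illustration only; it assumes a Julia version whose
Threads.@spawn accepts a threadpool expression, which these patches already
rely on):

    using Base.Threads

    # Without a pool argument, Threads.@spawn always targets :default, so a
    # task spawned from an :interactive task can end up queued behind busy
    # compute work. Passing Threads.threadpool() keeps the child task in
    # whichever pool the calling task is currently running in.
    function spawn_in_callers_pool(f)
        t = Threads.@spawn Threads.threadpool() f()
        errormonitor(t)
        return t
    end

With this change, helper tasks spawned on behalf of a caller (e.g. in
rmprocs() or process_worker()) inherit the caller's threadpool instead of
always landing on :default.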
---
 src/cluster.jl    | 22 +++++++++++-----------
 src/macros.jl     |  4 ++--
 src/managers.jl   |  4 ++--
 src/remotecall.jl |  2 +-
 4 files changed, 16 insertions(+), 16 deletions(-)

diff --git a/src/cluster.jl b/src/cluster.jl
index 0e8df8c..0b42961 100644
--- a/src/cluster.jl
+++ b/src/cluster.jl
@@ -163,10 +163,10 @@ function check_worker_state(w::Worker)
     else
         w.ct_time = time()
         if myid() > w.id
-            t = Threads.@spawn exec_conn_func(w)
+            t = Threads.@spawn Threads.threadpool() exec_conn_func(w)
         else
             # route request via node 1
-            t = Threads.@spawn remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
+            t = Threads.@spawn Threads.threadpool() remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
         end
         errormonitor(t)
         wait_for_conn(w)
@@ -194,7 +194,7 @@ function wait_for_conn(w)
         timeout = worker_timeout() - (time() - w.ct_time)
         timeout <= 0 && error("peer $(w.id) has not connected to $(myid())")

-        T = Threads.@spawn begin
+        T = Threads.@spawn Threads.threadpool() begin
             sleep($timeout)
             lock(w.c_state) do
                 notify(w.c_state; all=true)
@@ -329,7 +329,7 @@ function read_worker_host_port(io::IO)
     leader = String[]
     try
         while ntries > 0
-            readtask = Threads.@spawn readline(io)
+            readtask = Threads.@spawn Threads.threadpool() readline(io)
             yield()
             while !istaskdone(readtask) && ((time_ns() - t0) < timeout)
                 sleep(0.05)
@@ -496,13 +496,13 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
     # call manager's `launch` is a separate task. This allows the master
     # process initiate the connection setup process as and when workers come
     # online
-    t_launch = Threads.@spawn launch(manager, params, launched, launch_ntfy)
+    t_launch = Threads.@spawn Threads.threadpool() launch(manager, params, launched, launch_ntfy)

     @sync begin
         while true
             if isempty(launched)
                 istaskdone(t_launch) && break
-                Threads.@spawn begin
+                Threads.@spawn Threads.threadpool() begin
                     sleep(1)
                     notify(launch_ntfy)
                 end
@@ -512,7 +512,7 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
             if !isempty(launched)
                 wconfig = popfirst!(launched)
                 let wconfig=wconfig
-                    Threads.@spawn setup_launched_worker(manager, wconfig, launched_q)
+                    Threads.@spawn Threads.threadpool() setup_launched_worker(manager, wconfig, launched_q)
                 end
             end
         end
@@ -592,7 +592,7 @@ function launch_n_additional_processes(manager, frompid, fromconfig, cnt, launch
         wconfig.port = port

         let wconfig=wconfig
-            Threads.@spawn begin
+            Threads.@spawn Threads.threadpool() begin
                 pid = create_worker(manager, wconfig)
                 remote_do(redirect_output_from_additional_worker, frompid, pid, port)
                 push!(launched_q, pid)
@@ -1050,13 +1050,13 @@ function rmprocs(pids...; waitfor=typemax(Int))
     pids = vcat(pids...)

     if waitfor == 0
-        t = Threads.@spawn _rmprocs(pids, typemax(Int))
+        t = Threads.@spawn Threads.threadpool() _rmprocs(pids, typemax(Int))
         yield()
         return t
     else
         _rmprocs(pids, waitfor)
         # return a dummy task object that user code can wait on.
-        return Threads.@spawn nothing
+        return Threads.@spawn Threads.threadpool() nothing
     end
 end
@@ -1239,7 +1239,7 @@ function interrupt(pids::AbstractVector=workers())
     @assert myid() == 1
     @sync begin
         for pid in pids
-            Threads.@spawn interrupt(pid)
+            Threads.@spawn Threads.threadpool() interrupt(pid)
         end
     end
 end

diff --git a/src/macros.jl b/src/macros.jl
index ac5029e..5f3ce1e 100644
--- a/src/macros.jl
+++ b/src/macros.jl
@@ -230,7 +230,7 @@ function remotecall_eval(m::Module, procs, ex)
         # execute locally last as we do not want local execution to block serialization
         # of the request to remote nodes.
         for _ in 1:run_locally
-            Threads.@spawn Core.eval(m, ex)
+            Threads.@spawn Threads.threadpool() Core.eval(m, ex)
         end
     end
     nothing
@@ -275,7 +275,7 @@ function preduce(reducer, f, R)
 end

 function pfor(f, R)
-    t = Threads.@spawn @sync for c in splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers())
+    t = Threads.@spawn Threads.threadpool() @sync for c in splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers())
         @spawnat :any f(R, first(c), last(c))
     end
     errormonitor(t)

diff --git a/src/managers.jl b/src/managers.jl
index b506c42..cf48c6f 100644
--- a/src/managers.jl
+++ b/src/managers.jl
@@ -178,7 +178,7 @@ function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy:
     # Wait for all launches to complete.
     @sync for (i, (machine, cnt)) in enumerate(manager.machines)
         let machine=machine, cnt=cnt
-            Threads.@spawn try
+            Threads.@spawn Threads.threadpool() try
                 launch_on_machine(manager, $machine, $cnt, params, launched, launch_ntfy)
             catch e
                 print(stderr, "exception launching on machine $(machine) : $(e)\n")
@@ -744,7 +744,7 @@ function kill(manager::LocalManager, pid::Int, config::WorkerConfig; exit_timeou
     # First, try sending `exit()` to the remote over the usual control channels
     remote_do(exit, pid)

-    timer_task = Threads.@spawn begin
+    timer_task = Threads.@spawn Threads.threadpool() begin
         sleep(exit_timeout)

         # Check to see if our child exited, and if not, send an actual kill signal

diff --git a/src/remotecall.jl b/src/remotecall.jl
index 3cd1207..eda3899 100644
--- a/src/remotecall.jl
+++ b/src/remotecall.jl
@@ -322,7 +322,7 @@ function process_worker(rr)
     msg = (remoteref_id(rr), myid())

     # Needs to acquire a lock on the del_msg queue
-    T = Threads.@spawn begin
+    T = Threads.@spawn Threads.threadpool() begin
         publish_del_msg!($w, $msg)
     end
     Base.errormonitor(T)
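
A closing illustration of the synchronization pattern that patches 2 and 3
converge on for Worker: an @atomic field gives cheap unlocked reads, while a
Threads.Condition serializes writes and wakeups. The Slot type below is a
minimal, hypothetical sketch of that pattern, not code from the patches:

    using Base.Threads

    mutable struct Slot
        @atomic state::Symbol
        cond::Threads.Condition
        Slot() = new(:created, Threads.Condition())
    end

    function set_state!(s::Slot, new_state::Symbol)
        lock(s.cond) do
            @atomic s.state = new_state   # publish while holding the lock...
            notify(s.cond; all=true)      # ...then wake every waiting task
        end
    end

    function wait_state(s::Slot, wanted::Symbol)
        (@atomic s.state) === wanted && return   # fast path, no lock taken
        lock(s.cond) do
            while (@atomic s.state) !== wanted
                wait(s.cond)   # releases the lock while sleeping
            end
        end
    end

Re-checking the field in a loop after wait() matters because a waiter can be
woken without a state change, as the timeout task in wait_for_conn() does.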