Skip to content

Commit

Permalink
use atomic field
Browse files Browse the repository at this point in the history
  • Loading branch information
vchuravy authored and JamesWrigley committed May 25, 2024
1 parent d80ff46 commit ad6dc03
Showing 1 changed file with 5 additions and 12 deletions.
17 changes: 5 additions & 12 deletions src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ mutable struct Worker
del_msgs::Array{Any,1} # XXX: Could del_msgs and add_msgs be Channels?
add_msgs::Array{Any,1}
@atomic gcflag::Bool
state::WorkerState
@atomic state::WorkerState
c_state::Threads.Condition # wait for state changes, lock for state
ct_time::Float64 # creation time
conn_func::Any # used to setup connections lazily
Expand Down Expand Up @@ -145,15 +145,13 @@ end

function set_worker_state(w, state)
lock(w.c_state) do
w.state = state
@atomic w.state = state
notify(w.c_state; all=true)
end
end

function check_worker_state(w::Worker)
lock(w.c_state)
if w.state === W_CREATED
unlock(w.c_state)
if !isclusterlazy()
if PGRP.topology === :all_to_all
# Since higher pids connect with lower pids, the remote worker
Expand All @@ -173,9 +171,8 @@ function check_worker_state(w::Worker)
errormonitor(t)
wait_for_conn(w)
end
else
unlock(w.c_state)
end
return nothing
end

exec_conn_func(id::Int) = exec_conn_func(worker_from_id(id)::Worker)
Expand All @@ -193,9 +190,7 @@ function exec_conn_func(w::Worker)
end

function wait_for_conn(w)
lock(w.c_state)
if w.state === W_CREATED
unlock(w.c_state)
timeout = worker_timeout() - (time() - w.ct_time)
timeout <= 0 && error("peer $(w.id) has not connected to $(myid())")

Expand All @@ -210,8 +205,6 @@ function wait_for_conn(w)
wait(w.c_state)
w.state === W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
end
else
unlock(w.c_state)
end
nothing
end
Expand Down Expand Up @@ -667,8 +660,8 @@ function create_worker(manager, wconfig)
for jw in PGRP.workers
if (jw.id != 1) && (jw.id < w.id)
# wait for wl to join
lock(jw.c_state) do
if jw.state === W_CREATED
if jw.state === W_CREATED
lock(jw.c_state) do
wait(jw.c_state)
end
end
Expand Down

0 comments on commit ad6dc03

Please sign in to comment.