Skip to content

Commit

Permalink
better scheduling of data packets
Browse files Browse the repository at this point in the history
  • Loading branch information
Christoph Ortner committed Jun 23, 2023
1 parent 91939ef commit 4c0cda2
Showing 1 changed file with 28 additions and 14 deletions.
42 changes: 28 additions & 14 deletions src/assemble.jl
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,34 @@ function mt_assemble(data::AbstractVector{<:AbstractData}, basis)
_lock = ReentrantLock()
_prog = Progress(sum(length, rows))
_prog_ctr = 0
Threads.@threads for _i = 1:length(packets)
p = packets[_i]
Ap = feature_matrix(p.data, basis)
Yp = target_vector(p.data)
Wp = weight_vector(p.data)
# ----- syncronized communication block
lock(_lock)
A[p.rows, :] .= Ap
Y[p.rows] .= Yp
W[p.rows] .= Wp
_prog_ctr += length(p.rows)
ProgressMeter.update!(_prog, _prog_ctr)
unlock(_lock)
# -----
next = 1

Threads.@threads for _i = 1:nthreads()

while next <= length(packets)
# retrieve the next packet
lock(_lock)
if next > length(packets)
break
end
p = packets[next]
next += 1
unlock(_lock)

# assemble the corresponding data
Ap = feature_matrix(p.data, basis)
Yp = target_vector(p.data)
Wp = weight_vector(p.data)

# write into global design matrix
lock(_lock)
A[p.rows, :] .= Ap
Y[p.rows] .= Yp
W[p.rows] .= Wp
_prog_ctr += length(p.rows)
ProgressMeter.update!(_prog, _prog_ctr)
unlock(_lock)
end
end
@info " - Assembly completed."
return Array(A), Array(Y), Array(W)
Expand Down

0 comments on commit 4c0cda2

Please sign in to comment.