diff --git a/src/assemble.jl b/src/assemble.jl index b60a8b6..a2bfd55 100644 --- a/src/assemble.jl +++ b/src/assemble.jl @@ -61,6 +61,8 @@ function mt_assemble(data::AbstractVector{<:AbstractData}, basis) _prog_ctr = 0 next = 1 + failed = Int[] + Threads.@threads for _i = 1:nthreads() while next <= length(packets) @@ -69,25 +71,36 @@ function mt_assemble(data::AbstractVector{<:AbstractData}, basis) break end lock(_lock) - p = packets[next] + cur = next next += 1 unlock(_lock) + if cur > length(packets) + break + end + p = packets[cur] # assemble the corresponding data - Ap = feature_matrix(p.data, basis) - Yp = target_vector(p.data) - Wp = weight_vector(p.data) + try + Ap = feature_matrix(p.data, basis) + Yp = target_vector(p.data) + Wp = weight_vector(p.data) - # write into global design matrix - lock(_lock) - A[p.rows, :] .= Ap - Y[p.rows] .= Yp - W[p.rows] .= Wp - _prog_ctr += length(p.rows) - ProgressMeter.update!(_prog, _prog_ctr) - unlock(_lock) + # write into global design matrix + lock(_lock) + A[p.rows, :] .= Ap + Y[p.rows] .= Yp + W[p.rows] .= Wp + _prog_ctr += length(p.rows) + ProgressMeter.update!(_prog, _prog_ctr) + unlock(_lock) + catch + @info("failed assembly: cur = $cur") + push!(failed, cur) + end end + @info("thread $_i done") end @info " - Assembly completed." + @show failed return Array(A), Array(Y), Array(W) end