From 82ee211c70957d9c0dbeec68b07a4e3eac28fb18 Mon Sep 17 00:00:00 2001 From: Chuck Witt Date: Wed, 19 Jul 2023 11:20:56 +0100 Subject: [PATCH] wip. --- Project.toml | 5 +++-- src/assemble.jl | 22 +++++++++++++++++----- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/Project.toml b/Project.toml index b3a9815..bee131b 100644 --- a/Project.toml +++ b/Project.toml @@ -5,6 +5,7 @@ version = "0.1.1" [deps] Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" +Folds = "41a02a25-b8f0-4f67-bc48-60067656b558" IterativeSolvers = "42fd0dbc-a981-5370-80f2-aaf504508153" LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e" LowRankApprox = "898213cb-b102-5a47-900c-97e73b919f73" @@ -21,7 +22,6 @@ PythonCall = "6099a3de-0909-46bc-b1f4-468b9a2dfc0d" ACEfit_PythonCall_ext = "PythonCall" [compat] -julia = "1.9" IterativeSolvers = "0.9.2" LowRankApprox = "0.5.3" Optim = "1.7" @@ -29,9 +29,10 @@ ParallelDataTransfer = "0.5.0" ProgressMeter = "1.7" PythonCall = "0.9" StaticArrays = "1.5" +julia = "1.9" [extras] Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" [targets] -test = ["Test", ] +test = ["Test"] diff --git a/src/assemble.jl b/src/assemble.jl index 4c60d72..f1a35b8 100644 --- a/src/assemble.jl +++ b/src/assemble.jl @@ -1,4 +1,5 @@ using Distributed +using Folds using ParallelDataTransfer using ProgressMeter using SharedArrays @@ -13,7 +14,7 @@ Base.length(d::DataPacket) = count_observations(d.data) """ Assemble feature matrix, target vector, and weight vector for given data and basis. """ -function assemble(data::AbstractVector{<:AbstractData}, basis) +function assemble(data::AbstractVector{<:AbstractData}, basis; mode=:threaded) @info "Assembling linear problem." rows = Array{UnitRange}(undef, length(data)) # row ranges for each element of data rows[1] = 1:count_observations(data[1]) @@ -22,16 +23,27 @@ function assemble(data::AbstractVector{<:AbstractData}, basis) end packets = DataPacket.(rows, data) sort!(packets, by = length, rev = true) - (nprocs() > 1) && sendto(workers(), basis = basis) @info " - Creating feature matrix with size ($(rows[end][end]), $(length(basis)))." A = SharedArray(zeros(rows[end][end], length(basis))) Y = SharedArray(zeros(size(A, 1))) W = SharedArray(zeros(size(A, 1))) - @info " - Beginning assembly with processor count: $(nprocs())." - @showprogress pmap(packets) do p - A[p.rows, :] .= feature_matrix(p.data, basis) + if mode == :serial + @info " - Beginning serial assembly." + elseif mode == :threaded + @info " - Beginning threaded assembly with $(Threads.nthreads()) threads." + map = Folds.map + elseif mode == :distributed + @info " - Beginning distributed assembly with $(nprocs()) processes." + map = pmap + (nprocs() > 1) && sendto(workers(), basis = basis) + end + progress = Progress(length(data)) + map(packets) do p + A[p.rows,:] .= feature_matrix(p.data, basis) Y[p.rows] .= target_vector(p.data) W[p.rows] .= weight_vector(p.data) + next!(progress) + GC.gc() end @info " - Assembly completed." return Array(A), Array(Y), Array(W)