From 8642326719ff71189473548041183ca2d53d464b Mon Sep 17 00:00:00 2001
From: Pingan Ren
Date: Thu, 4 Apr 2024 23:22:00 +0200
Subject: [PATCH] make bfs support parallel

---
 .../physical_path_finding_operator.hpp |   3 +-
 .../physical_path_finding_operator.cpp | 226 +++++++++++++++++-
 2 files changed, 223 insertions(+), 6 deletions(-)

diff --git a/duckpgq/include/duckpgq/operators/physical_path_finding_operator.hpp b/duckpgq/include/duckpgq/operators/physical_path_finding_operator.hpp
index c519096e..e5dead10 100644
--- a/duckpgq/include/duckpgq/operators/physical_path_finding_operator.hpp
+++ b/duckpgq/include/duckpgq/operators/physical_path_finding_operator.hpp
@@ -144,7 +144,8 @@ class PhysicalPathFinding : public PhysicalComparisonJoin {
 
   void BuildPipelines(Pipeline &current, MetaPipeline &meta_pipeline) override;
 
-
+  //! Schedules tasks to calculate the next iteration of the path-finding
+  static void ScheduleBFSTasks(Pipeline &pipeline, Event &event, GlobalSinkState &state);
 };
 
 } // namespace duckdb
diff --git a/duckpgq/src/duckpgq/operators/physical_path_finding_operator.cpp b/duckpgq/src/duckpgq/operators/physical_path_finding_operator.cpp
index 74a79b58..49f1ae8b 100644
--- a/duckpgq/src/duckpgq/operators/physical_path_finding_operator.cpp
+++ b/duckpgq/src/duckpgq/operators/physical_path_finding_operator.cpp
@@ -6,6 +6,7 @@
 #include "duckdb/parallel/event.hpp"
 #include "duckdb/parallel/meta_pipeline.hpp"
 #include "duckdb/parallel/thread_context.hpp"
+#include "duckdb/parallel/base_pipeline_event.hpp"
 #include "duckdb/common/types/column/column_data_collection.hpp"
 #include
 
@@ -115,6 +116,49 @@ void PathFindingLocalState::CreateCSR(DataChunk &input,
   // global_csr.Print(); // Debug print
 }
 
+class GlobalBFSState {
+public:
+  GlobalBFSState() = default;
+
+  GlobalBFSState(int64_t v_size_, idx_t pairs_size_, int64_t *src_, int64_t *dst_,
+                 UnifiedVectorFormat &vdata_src_, UnifiedVectorFormat &vdata_dst_)
+      : iter(1), v_size(v_size_), src(src_), dst(dst_), vdata_src(std::move(vdata_src_)),
+        vdata_dst(std::move(vdata_dst_)), started_searches(0),
+        seen(v_size_), visit1(v_size_), visit2(v_size_),
+        change(false), result(LogicalTypeId::BIGINT, true, true, pairs_size_) {
+    for (auto i = 0; i < LANE_LIMIT; i++) {
+      lane_to_num[i] = -1;
+    }
+  }
+
+  void clear() {
+    iter = 1;
+    change = false;
+    for (auto i = 0; i < LANE_LIMIT; i++) {
+      lane_to_num[i] = -1;
+    }
+    // empty visit vectors
+    for (auto i = 0; i < v_size; i++) {
+      seen[i] = 0;
+      visit1[i] = 0;
+    }
+  }
+public:
+  int64_t iter;
+  int64_t v_size;
+  int64_t *src;
+  int64_t *dst;
+  UnifiedVectorFormat vdata_src;
+  UnifiedVectorFormat vdata_dst;
+  idx_t started_searches;
+  idx_t lane_to_num[LANE_LIMIT];
+  vector<std::bitset<LANE_LIMIT>> seen;
+  vector<std::bitset<LANE_LIMIT>> visit1;
+  vector<std::bitset<LANE_LIMIT>> visit2;
+  bool change;
+  Vector result;
+};
+
 class PathFindingGlobalState : public GlobalSinkState {
 public:
   using GlobalCompressedSparseRow = PhysicalPathFinding::GlobalCompressedSparseRow;
@@ -129,7 +173,9 @@ class PathFindingGlobalState : public GlobalSinkState {
 
   PathFindingGlobalState(PathFindingGlobalState &prev)
       : GlobalSinkState(prev), global_tasks(prev.global_tasks),
-        global_csr(std::move(prev.global_csr)), child(prev.child + 1) {}
+        global_csr(std::move(prev.global_csr)), child(prev.child + 1) {
+
+  }
 
   void Sink(DataChunk &input, PathFindingLocalState &lstate) const {
     lstate.Sink(input, *global_csr);
@@ -144,6 +190,9 @@
 
   unique_ptr<GlobalCompressedSparseRow> global_csr;
   size_t child;
+
+  // state for BFS
+  unique_ptr<GlobalBFSState> global_bfs_state;
 };
 
 unique_ptr<GlobalSinkState>
@@ -312,6 +361,114 @@ static void IterativeLengthFunction(const unique_ptr<PhysicalPathFinding::GlobalCompressedSparseRow> &csr,
 }
 
+class PhysicalBFSTask : public ExecutorTask {
+public:
+  PhysicalBFSTask(shared_ptr<Event> event_p, ClientContext &context, PathFindingGlobalState &state, idx_t start, idx_t end)
+      : ExecutorTask(context, std::move(event_p)), context(context), state(state), start(start), end(end) {
+  }
+
+  TaskExecutionResult ExecuteTask(TaskExecutionMode mode) override {
+    auto& change = state.global_bfs_state->change;
+    auto& v_size = state.global_bfs_state->v_size;
+    auto& seen = state.global_bfs_state->seen;
+    auto& visit = state.global_bfs_state->iter & 1 ? state.global_bfs_state->visit1 : state.global_bfs_state->visit2;
+    auto& next = state.global_bfs_state->iter & 1 ? state.global_bfs_state->visit2 : state.global_bfs_state->visit1;
+    int64_t *v = (int64_t *)state.global_csr->v;
+    vector<int64_t> &e = state.global_csr->e;
+
+    for (auto i = 0; i < v_size; i++) {
+      next[i] = 0;
+    }
+    for (auto i = 0; i < v_size; i++) {
+      if (visit[i].any()) {
+        for (auto offset = v[i]; offset < v[i + 1]; offset++) {
+          auto n = e[offset];
+          next[n] = next[n] | visit[i];
+        }
+      }
+    }
+    for (auto i = 0; i < v_size; i++) {
+      next[i] = next[i] & ~seen[i];
+      seen[i] = seen[i] | next[i];
+      change |= next[i].any();
+    }
+
+    event->FinishTask();
+    return TaskExecutionResult::TASK_FINISHED;
+  }
+
+private:
+  ClientContext &context;
+  PathFindingGlobalState &state;
+  idx_t start;
+  idx_t end;
+};
+
+class BFSIterativeEvent : public BasePipelineEvent {
+public:
+  BFSIterativeEvent(PathFindingGlobalState &gstate_p, Pipeline &pipeline_p)
+      : BasePipelineEvent(pipeline_p), gstate(gstate_p) {
+  }
+
+  PathFindingGlobalState &gstate;
+
+public:
+  void Schedule() override {
+    auto &context = pipeline->GetClientContext();
+
+    // Schedule tasks equal to the number of threads, which will each merge multiple partitions
+    auto &ts = TaskScheduler::GetScheduler(context);
+    idx_t num_threads = ts.NumberOfThreads();
+
+    vector<shared_ptr<Task>> bfs_tasks;
+    bfs_tasks.push_back(make_uniq<PhysicalBFSTask>(shared_from_this(), context, gstate, 0, 0));
+    // for (idx_t tnum = 0; tnum < num_threads; tnum++) {
+    //   bfs_tasks.push_back(make_uniq<PhysicalBFSTask>(shared_from_this(), context, gstate));
+    // }
+    SetTasks(std::move(bfs_tasks));
+  }
+
+  void FinishEvent() override {
+    auto& bfs_state = gstate.global_bfs_state;
+
+    auto result_data = FlatVector::GetData<int64_t>(bfs_state->result);
+    ValidityMask &result_validity = FlatVector::Validity(bfs_state->result);
+
+    if (bfs_state->change) {
+      // detect lanes that finished
+      for (int64_t lane = 0; lane < LANE_LIMIT; lane++) {
+        int64_t search_num = bfs_state->lane_to_num[lane];
+        if (search_num >= 0) { // active lane
+          int64_t dst_pos = bfs_state->vdata_dst.sel->get_index(search_num);
+          if (bfs_state->seen[bfs_state->dst[dst_pos]][lane]) {
+            result_data[search_num] =
+                bfs_state->iter; /* found at iter => iter = path length */
+            bfs_state->lane_to_num[lane] = -1; // mark inactive
+          }
+        }
+      }
+      // into the next iteration
+      auto bfs_event = std::make_shared<BFSIterativeEvent>(gstate, *pipeline);
+      this->InsertEvent(std::dynamic_pointer_cast<Event>(bfs_event));
+    } else {
+      // no changes anymore: any still active searches have no path
+      for (int64_t lane = 0; lane < LANE_LIMIT; lane++) {
+        int64_t search_num = bfs_state->lane_to_num[lane];
+        if (search_num >= 0) { // active lane
+          result_validity.SetInvalid(search_num);
+          result_data[search_num] = (int64_t)-1; /* no path */
+          bfs_state->lane_to_num[lane] = -1; // mark inactive
+        }
+      }
+
+      // if remaining pairs, schedule the BFS for the next batch
+      if (bfs_state->started_searches < gstate.global_tasks.Count()) {
+        PhysicalPathFinding::ScheduleBFSTasks(*pipeline, *this,
+                                              gstate);
+      }
+    }
+  }
+};
+
 SinkFinalizeType
 PhysicalPathFinding::Finalize(Pipeline &pipeline, Event &event,
@@ -321,24 +478,83 @@ PhysicalPathFinding::Finalize(Pipeline &pipeline, Event &event,
   auto &csr = gstate.global_csr;
   auto &global_tasks = gstate.global_tasks;
   if (global_tasks.Count() != 0) {
+    DataChunk all_pairs;
     DataChunk pairs;
     global_tasks.InitializeScanChunk(pairs);
     ColumnDataScanState scan_state;
     global_tasks.InitializeScan(scan_state);
     while (global_tasks.Scan(scan_state, pairs)) {
-      Vector result(LogicalType::BIGINT, true, true);
-      IterativeLengthFunction(csr, pairs, result);
-      // debug print
-      Printer::Print(result.ToString(pairs.size()));
+      all_pairs.Append(pairs, true);
+    }
+    // debug print
+    all_pairs.Print();
+
+    auto &src = all_pairs.data[0];
+    auto &dst = all_pairs.data[1];
+    UnifiedVectorFormat vdata_src;
+    UnifiedVectorFormat vdata_dst;
+    src.ToUnifiedFormat(all_pairs.size(), vdata_src);
+    dst.ToUnifiedFormat(all_pairs.size(), vdata_dst);
+    auto src_data = FlatVector::GetData<int64_t>(src);
+    auto dst_data = FlatVector::GetData<int64_t>(dst);
+
+    gstate.global_bfs_state = make_uniq<GlobalBFSState>(csr->v_size,
+        global_tasks.Count(), src_data, dst_data, vdata_src, vdata_dst);
+
+    // Schedule the first round of BFS tasks
+    if (all_pairs.size() > 0) {
+      ScheduleBFSTasks(pipeline, event, gstate);
     }
   }
+  // debug print
+  gstate.global_bfs_state->result.Print(global_tasks.Count());
+
   // Move to the next input child
   ++gstate.child;
 
   return SinkFinalizeType::READY;
 }
 
+void PhysicalPathFinding::ScheduleBFSTasks(Pipeline &pipeline, Event &event, GlobalSinkState &state) {
+  auto &gstate = state.Cast<PathFindingGlobalState>();
+  auto &bfs_state = gstate.global_bfs_state;
+
+  // for every batch of pairs, schedule a BFS task
+  bfs_state->clear();
+
+  // remaining pairs
+  if (bfs_state->started_searches < gstate.global_tasks.Count()) {
+
+    auto result_data = FlatVector::GetData<int64_t>(bfs_state->result);
+    auto& result_validity = FlatVector::Validity(bfs_state->result);
+
+    for (int64_t lane = 0; lane < LANE_LIMIT; lane++) {
+      bfs_state->lane_to_num[lane] = -1;
+      while (bfs_state->started_searches < gstate.global_tasks.Count()) {
+        int64_t search_num = bfs_state->started_searches++;
+        int64_t src_pos = bfs_state->vdata_src.sel->get_index(search_num);
+        int64_t dst_pos = bfs_state->vdata_dst.sel->get_index(search_num);
+        if (!bfs_state->vdata_src.validity.RowIsValid(src_pos)) {
+          result_validity.SetInvalid(search_num);
+          result_data[search_num] = (uint64_t)-1; /* no path */
+        } else if (bfs_state->src[src_pos] == bfs_state->dst[dst_pos]) {
+          result_data[search_num] =
+              (uint64_t)0; // path of length 0 does not require a search
+        } else {
+          bfs_state->visit1[bfs_state->src[src_pos]][lane] = true;
+          bfs_state->lane_to_num[lane] = search_num; // active lane
+          break;
+        }
+      }
+    }
+
+    auto bfs_event = make_shared<BFSIterativeEvent>(gstate, pipeline);
+    event.InsertEvent(std::move(std::dynamic_pointer_cast<Event>(bfs_event)));
+  }
+}
+
+
 //===--------------------------------------------------------------------===//
 // Operator
 //===--------------------------------------------------------------------===//