From 31b64e230092a95ac7e76f361c051cd3e3192dd3 Mon Sep 17 00:00:00 2001 From: Hochan Lee Date: Tue, 13 Jun 2023 12:21:03 -0500 Subject: [PATCH] Fix data movement task dependencies with clang-formatting --- src/c/backend/include/parray.hpp | 5 ++++- src/c/backend/include/policy.hpp | 12 ++++++++---- src/c/backend/parray.cpp | 2 ++ src/c/backend/phases.cpp | 14 ++++++++++++-- src/c/backend/policy.cpp | 13 +++++++------ src/python/parla/cython/cyparray.pxd | 2 ++ src/python/parla/cython/cyparray.pyx | 3 +++ src/python/parla/cython/tasks.pyx | 20 +++++++++++++------- 8 files changed, 51 insertions(+), 20 deletions(-) diff --git a/src/c/backend/include/parray.hpp b/src/c/backend/include/parray.hpp index 908300fa..758fadaa 100644 --- a/src/c/backend/include/parray.hpp +++ b/src/c/backend/include/parray.hpp @@ -61,9 +61,12 @@ class InnerPArray { size_t get_num_active_tasks(DevID_t global_dev_id); // TODO(hc): I will replace this list with a concurrent map. - /// Get a reference to a list of tasks who are using this PArray + /// Get a reference to a list of tasks who are using this PArray. TaskList &get_task_list_ref(); + /// Clear a list of tasks who referenced this PArray. + void clear_task_list_ref(); + /// Return the instance of Python PArray. void *get_py_parray(); diff --git a/src/c/backend/include/policy.hpp b/src/c/backend/include/policy.hpp index eca4cc55..2be43c1c 100644 --- a/src/c/backend/include/policy.hpp +++ b/src/c/backend/include/policy.hpp @@ -97,9 +97,11 @@ class MappingPolicy { virtual void run_task_mapping( InnerTask *task, const Mapper &mapper, std::vector> *chosen_devices, - const std::vector>> + const std::vector< + std::vector>> &parray_list, - std::vector> *placement_req_options_vec) = 0; + std::vector> + *placement_req_options_vec) = 0; protected: DeviceManager *device_manager_; @@ -138,9 +140,11 @@ class LocalityLoadBalancingMappingPolicy : public MappingPolicy { void run_task_mapping( InnerTask *task, const Mapper &mapper, std::vector> *chosen_devices, - const std::vector>> + const std::vector< + std::vector>> &parray_list, - std::vector> *placement_req_options_vec); + std::vector> + *placement_req_options_vec); }; #endif diff --git a/src/c/backend/parray.cpp b/src/c/backend/parray.cpp index 6492c820..dc91812c 100644 --- a/src/c/backend/parray.cpp +++ b/src/c/backend/parray.cpp @@ -61,6 +61,8 @@ size_t InnerPArray::get_num_active_tasks(DevID_t global_dev_id) { TaskList &InnerPArray::get_task_list_ref() { return this->_task_lists; } +void InnerPArray::clear_task_list_ref() { this->_task_lists.clear(); } + void *InnerPArray::get_py_parray() { return this->_py_parray; } uint64_t InnerPArray::get_parray_parentid() { return this->parent_id; } diff --git a/src/c/backend/phases.cpp b/src/c/backend/phases.cpp index 9cb741ce..f41b3908 100644 --- a/src/c/backend/phases.cpp +++ b/src/c/backend/phases.cpp @@ -61,7 +61,7 @@ void Mapper::run(SchedulerPhase *next_phase) { std::vector> chosen_devices; policy_->run_task_mapping(task, *this, &chosen_devices, parray_list, - &placement_req_options_vec); + &placement_req_options_vec); if (chosen_devices.empty()) { // It means that none of the devices is available for this task. @@ -196,8 +196,9 @@ void MemoryReserver::create_datamove_tasks(InnerTask *task) { // We will use a concurrent map for parray's // task list as an optimization. - std::vector compute_task_dependencies = task->get_dependencies(); std::vector data_task_dependencies; +#if 0 + std::vector compute_task_dependencies = task->get_dependencies(); for (size_t k = 0; k < compute_task_dependencies.size(); ++k) { InnerTask *parray_dependency = static_cast(compute_task_dependencies[k]); @@ -210,6 +211,15 @@ void MemoryReserver::create_datamove_tasks(InnerTask *task) { } } } +#endif + + for (size_t t = 0; t < parray_task_list.size(); ++t) { + // The current task has already added to the PArray reference list + // on Python. So, ignore itself. + if (task->id != parray_task_list.at(t)->id) { + data_task_dependencies.push_back(parray_task_list.at(t)); + } + } // TODO(hc): pass false to add_dependencies() as optimization. datamove_task->add_dependencies(data_task_dependencies, true); diff --git a/src/c/backend/policy.cpp b/src/c/backend/policy.cpp index 1cf1c762..0a3b6f03 100644 --- a/src/c/backend/policy.cpp +++ b/src/c/backend/policy.cpp @@ -208,7 +208,8 @@ void LocalityLoadBalancingMappingPolicy::run_task_mapping( std::vector> *chosen_devices, const std::vector>> &parray_list, - std::vector> *placement_req_options_vec) { + std::vector> + *placement_req_options_vec) { // A set of chosen devices to a task. Score_t best_score{-1}; // If any device was chosen as a candidate device, @@ -242,8 +243,8 @@ void LocalityLoadBalancingMappingPolicy::run_task_mapping( std::shared_ptr dev_req = std::dynamic_pointer_cast(base_req); Score_t score{0}; - bool is_req_available = calc_score_devplacement( - task, dev_req, mapper, &score, parray_list[0]); + bool is_req_available = calc_score_devplacement(task, dev_req, mapper, + &score, parray_list[0]); if (!is_req_available) { continue; } @@ -263,9 +264,9 @@ void LocalityLoadBalancingMappingPolicy::run_task_mapping( // std::cout << "[Mapper] Task name:" << task->get_name() << ", " << // "Checking arch requirement." // << "\n"; - bool is_req_available = calc_score_archplacement( - task, arch_req, mapper, chosen_dev_req, &chosen_dev_score, - parray_list[0]); + bool is_req_available = + calc_score_archplacement(task, arch_req, mapper, chosen_dev_req, + &chosen_dev_score, parray_list[0]); if (!is_req_available) { continue; } diff --git a/src/python/parla/cython/cyparray.pxd b/src/python/parla/cython/cyparray.pxd index 0ae3230e..9a972c20 100644 --- a/src/python/parla/cython/cyparray.pxd +++ b/src/python/parla/cython/cyparray.pxd @@ -14,6 +14,7 @@ cdef extern from "include/parray.hpp" namespace "parray": void set_size(uint64_t) uint64_t get_num_active_tasks(uint32_t global_dev_id) except + uint64_t get_parray_parentid() except + + void clear_task_list_ref() cdef class CyPArray: # Hold a C++ instance which we're wrapping @@ -21,3 +22,4 @@ cdef class CyPArray: cdef InnerPArray* get_cpp_parray(self) cpdef get_num_active_tasks(self, int global_dev_id) cpdef get_parray_parentid(self) + cpdef clear_task_list_ref(self) diff --git a/src/python/parla/cython/cyparray.pyx b/src/python/parla/cython/cyparray.pyx index 5140d150..2e2adc37 100644 --- a/src/python/parla/cython/cyparray.pyx +++ b/src/python/parla/cython/cyparray.pyx @@ -37,3 +37,6 @@ cdef class CyPArray: cpdef get_parray_parentid(self): return self.cpp_parray.get_parray_parentid() + + cpdef clear_task_list_ref(self): + self.cpp_parray.clear_task_list_ref() diff --git a/src/python/parla/cython/tasks.pyx b/src/python/parla/cython/tasks.pyx index 6f897eb5..30e1c7ee 100644 --- a/src/python/parla/cython/tasks.pyx +++ b/src/python/parla/cython/tasks.pyx @@ -454,6 +454,18 @@ class Task: def add_dataflow(self, dataflow): if dataflow is not None: + # First, clear existing task references to + # this PArray. + for out_parray_tpl in dataflow.output: + out_parray = out_parray_tpl[0] + cy_parray = out_parray.cy_parray + parent_cy_parray = out_parray.parent.cy_parray + parent_cy_parray.clear_task_list_ref() + for inout_parray_tpl in dataflow.inout: + inout_parray = inout_parray_tpl[0] + cy_parray = inout_parray.cy_parray + parent_cy_parray = inout_parray.parent.cy_parray + parent_cy_parray.clear_task_list_ref() for in_parray_tpl in dataflow.input: in_parray = in_parray_tpl[0] in_parray_devid = in_parray_tpl[1] @@ -598,7 +610,7 @@ class DataMovementTask(Task): target_dev = self.assigned_devices[0] global_id = target_dev.get_global_id() parray_id = device_manager.globalid_to_parrayid(global_id) - # print("Attempt to Move: ", self.parray.name, " to a device ", parray_id, flush=True) + #print("Attempt to Move: ", self.parray.name, " to a device ", parray_id, flush=True) self.parray._auto_move(parray_id, write_flag) #print(self, "Move PArray ", self.parray.ID, " to a device ", parray_id, flush=True) #print(self, "STATUS: ", self.parray.print_overview()) @@ -1635,9 +1647,3 @@ class BackendTaskSpace(TaskSpace): def wait(self): self.inner_space.wait() - - - - - -