Commit 31b64e2

Fix data movement task dependencies with clang-formatting

nicelhc13 committed Jun 13, 2023
1 parent 70a8f31 · commit 31b64e2

Showing 8 changed files with 51 additions and 20 deletions.
5 changes: 4 additions & 1 deletion src/c/backend/include/parray.hpp
@@ -61,9 +61,12 @@ class InnerPArray {
   size_t get_num_active_tasks(DevID_t global_dev_id);
 
   // TODO(hc): I will replace this list with a concurrent map.
-  /// Get a reference to a list of tasks who are using this PArray
+  /// Get a reference to the list of tasks that are using this PArray.
   TaskList &get_task_list_ref();
 
+  /// Clear the list of tasks that referenced this PArray.
+  void clear_task_list_ref();
+
   /// Return the instance of Python PArray.
   void *get_py_parray();
12 changes: 8 additions & 4 deletions src/c/backend/include/policy.hpp
@@ -97,9 +97,11 @@ class MappingPolicy {
   virtual void run_task_mapping(
       InnerTask *task, const Mapper &mapper,
       std::vector<std::shared_ptr<DeviceRequirement>> *chosen_devices,
-      const std::vector<std::vector<std::pair<parray::InnerPArray *, AccessMode>>>
+      const std::vector<
+          std::vector<std::pair<parray::InnerPArray *, AccessMode>>>
           &parray_list,
-      std::vector<std::shared_ptr<PlacementRequirementBase>> *placement_req_options_vec) = 0;
+      std::vector<std::shared_ptr<PlacementRequirementBase>>
+          *placement_req_options_vec) = 0;
 
 protected:
   DeviceManager *device_manager_;
@@ -138,9 +140,11 @@ class LocalityLoadBalancingMappingPolicy : public MappingPolicy {
   void run_task_mapping(
       InnerTask *task, const Mapper &mapper,
       std::vector<std::shared_ptr<DeviceRequirement>> *chosen_devices,
-      const std::vector<std::vector<std::pair<parray::InnerPArray *, AccessMode>>>
+      const std::vector<
+          std::vector<std::pair<parray::InnerPArray *, AccessMode>>>
          &parray_list,
-      std::vector<std::shared_ptr<PlacementRequirementBase>> *placement_req_options_vec);
+      std::vector<std::shared_ptr<PlacementRequirementBase>>
+          *placement_req_options_vec);
 };
 
 #endif
2 changes: 2 additions & 0 deletions src/c/backend/parray.cpp
@@ -61,6 +61,8 @@ size_t InnerPArray::get_num_active_tasks(DevID_t global_dev_id) {
 
 TaskList &InnerPArray::get_task_list_ref() { return this->_task_lists; }
 
+void InnerPArray::clear_task_list_ref() { this->_task_lists.clear(); }
+
 void *InnerPArray::get_py_parray() { return this->_py_parray; }
 
 uint64_t InnerPArray::get_parray_parentid() { return this->parent_id; }
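For intuition, here is a minimal Python model of the bookkeeping this new method manages: a PArray keeps a list of the tasks referencing it, and clear_task_list_ref() empties that list so references recorded in earlier rounds do not leak into later dependency computation. All names below are illustrative stand-ins, not the actual Parla API.

# Illustrative model of InnerPArray's task-list bookkeeping; the real
# implementation is the C++ shown above.
class FakeTask:
    def __init__(self, task_id):
        self.id = task_id

class FakePArray:
    def __init__(self):
        self._task_list = []  # tasks currently referencing this PArray

    def add_task(self, task):
        self._task_list.append(task)

    def get_task_list_ref(self):
        return self._task_list

    def clear_task_list_ref(self):
        # Mirrors InnerPArray::clear_task_list_ref(): drop every stale
        # task reference so the next round starts from a clean list.
        self._task_list.clear()

parray = FakePArray()
parray.add_task(FakeTask(1))
parray.clear_task_list_ref()
assert parray.get_task_list_ref() == []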
14 changes: 12 additions & 2 deletions src/c/backend/phases.cpp
@@ -61,7 +61,7 @@ void Mapper::run(SchedulerPhase *next_phase) {
   std::vector<std::shared_ptr<DeviceRequirement>> chosen_devices;
 
   policy_->run_task_mapping(task, *this, &chosen_devices, parray_list,
-                             &placement_req_options_vec);
+                            &placement_req_options_vec);
 
   if (chosen_devices.empty()) {
     // It means that none of the devices is available for this task.
@@ -196,8 +196,9 @@ void MemoryReserver::create_datamove_tasks(InnerTask *task) {
   // We will use a concurrent map for parray's
   // task list as an optimization.
 
-  std::vector<void *> compute_task_dependencies = task->get_dependencies();
   std::vector<InnerTask *> data_task_dependencies;
+#if 0
+  std::vector<void *> compute_task_dependencies = task->get_dependencies();
   for (size_t k = 0; k < compute_task_dependencies.size(); ++k) {
     InnerTask *parray_dependency =
         static_cast<InnerTask *>(compute_task_dependencies[k]);
@@ -210,6 +211,15 @@ void MemoryReserver::create_datamove_tasks(InnerTask *task) {
       }
     }
   }
+#endif
+
+  for (size_t t = 0; t < parray_task_list.size(); ++t) {
+    // The current task has already been added to the PArray reference list
+    // on the Python side, so skip it here.
+    if (task->id != parray_task_list.at(t)->id) {
+      data_task_dependencies.push_back(parray_task_list.at(t));
+    }
+  }
 
   // TODO(hc): pass false to add_dependencies() as optimization.
   datamove_task->add_dependencies(data_task_dependencies, true);
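This is the substantive fix in the commit: instead of deriving a data movement task's dependencies by filtering the compute task's dependency list (the block now disabled with #if 0), the scheduler appears to take them directly from the PArray's task list, i.e. every task registered as using that PArray except the task being mapped. A hedged Python sketch of the new rule, where parray_task_list stands in for the list obtained earlier in the function (not shown in this hunk):

def collect_datamove_dependencies(current_task_id, parray_task_list):
    # Every task already registered on the PArray becomes a dependency
    # of the data movement task, except the current task: it was
    # already added to the PArray's reference list on the Python side.
    deps = []
    for t in parray_task_list:
        if t.id != current_task_id:
            deps.append(t)
    return deps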
13 changes: 7 additions & 6 deletions src/c/backend/policy.cpp
@@ -208,7 +208,8 @@ void LocalityLoadBalancingMappingPolicy::run_task_mapping(
     std::vector<std::shared_ptr<DeviceRequirement>> *chosen_devices,
     const std::vector<std::vector<std::pair<parray::InnerPArray *, AccessMode>>>
         &parray_list,
-    std::vector<std::shared_ptr<PlacementRequirementBase>> *placement_req_options_vec) {
+    std::vector<std::shared_ptr<PlacementRequirementBase>>
+        *placement_req_options_vec) {
   // A set of chosen devices to a task.
   Score_t best_score{-1};
   // If any device was chosen as a candidate device,
@@ -242,8 +243,8 @@ void LocalityLoadBalancingMappingPolicy::run_task_mapping(
       std::shared_ptr<DeviceRequirement> dev_req =
           std::dynamic_pointer_cast<DeviceRequirement>(base_req);
       Score_t score{0};
-      bool is_req_available = calc_score_devplacement(
-          task, dev_req, mapper, &score, parray_list[0]);
+      bool is_req_available = calc_score_devplacement(task, dev_req, mapper,
+                                                      &score, parray_list[0]);
       if (!is_req_available) {
         continue;
       }
@@ -263,9 +264,9 @@ void LocalityLoadBalancingMappingPolicy::run_task_mapping(
       // std::cout << "[Mapper] Task name:" << task->get_name() << ", " <<
       //              "Checking arch requirement."
       //           << "\n";
-      bool is_req_available = calc_score_archplacement(
-          task, arch_req, mapper, chosen_dev_req, &chosen_dev_score,
-          parray_list[0]);
+      bool is_req_available =
+          calc_score_archplacement(task, arch_req, mapper, chosen_dev_req,
+                                   &chosen_dev_score, parray_list[0]);
       if (!is_req_available) {
         continue;
       }
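The rewrapped calls above sit inside run_task_mapping's scoring loop: each candidate placement requirement is scored, unavailable ones are skipped, and the best-scoring one wins (best_score starts at -1, as the first hunk shows). A loose Python sketch of that selection pattern, with hypothetical names standing in for calc_score_devplacement/calc_score_archplacement:

def choose_placement(candidates, score_fn):
    # score_fn returns (is_available, score) for a candidate requirement.
    best_score, best = -1, None
    for req in candidates:
        available, score = score_fn(req)
        if not available:
            continue  # mirrors `if (!is_req_available) { continue; }`
        if score > best_score:
            best_score, best = score, req
    return best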
2 changes: 2 additions & 0 deletions src/python/parla/cython/cyparray.pxd
@@ -14,10 +14,12 @@ cdef extern from "include/parray.hpp" namespace "parray":
         void set_size(uint64_t)
         uint64_t get_num_active_tasks(uint32_t global_dev_id) except +
         uint64_t get_parray_parentid() except +
+        void clear_task_list_ref()
 
 cdef class CyPArray:
     # Hold a C++ instance which we're wrapping
     cdef InnerPArray* cpp_parray
     cdef InnerPArray* get_cpp_parray(self)
     cpdef get_num_active_tasks(self, int global_dev_id)
     cpdef get_parray_parentid(self)
+    cpdef clear_task_list_ref(self)
3 changes: 3 additions & 0 deletions src/python/parla/cython/cyparray.pyx
@@ -37,3 +37,6 @@ cdef class CyPArray:
 
     cpdef get_parray_parentid(self):
         return self.cpp_parray.get_parray_parentid()
+
+    cpdef clear_task_list_ref(self):
+        self.cpp_parray.clear_task_list_ref()
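The .pxd/.pyx pair above is the standard Cython bridge: the Python-visible method does nothing but delegate to the wrapped C++ object. A pure-Python stand-in for the same pattern (stub classes, not the real Parla types):

class InnerPArrayStub:
    # Stands in for the C++ InnerPArray held by the wrapper.
    def __init__(self):
        self._task_lists = ["stale-task-ref"]

    def clear_task_list_ref(self):
        self._task_lists.clear()

class CyPArrayStub:
    # Stands in for cyparray.pyx's CyPArray wrapper class.
    def __init__(self, cpp_parray):
        self.cpp_parray = cpp_parray

    def clear_task_list_ref(self):
        # One-line delegation, the same shape as the Cython method.
        self.cpp_parray.clear_task_list_ref()

cy = CyPArrayStub(InnerPArrayStub())
cy.clear_task_list_ref()
assert cy.cpp_parray._task_lists == []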
20 changes: 13 additions & 7 deletions src/python/parla/cython/tasks.pyx
@@ -454,6 +454,18 @@ class Task:
 
     def add_dataflow(self, dataflow):
         if dataflow is not None:
+            # First, clear the existing task references to
+            # these PArrays.
+            for out_parray_tpl in dataflow.output:
+                out_parray = out_parray_tpl[0]
+                cy_parray = out_parray.cy_parray
+                parent_cy_parray = out_parray.parent.cy_parray
+                parent_cy_parray.clear_task_list_ref()
+            for inout_parray_tpl in dataflow.inout:
+                inout_parray = inout_parray_tpl[0]
+                cy_parray = inout_parray.cy_parray
+                parent_cy_parray = inout_parray.parent.cy_parray
+                parent_cy_parray.clear_task_list_ref()
             for in_parray_tpl in dataflow.input:
                 in_parray = in_parray_tpl[0]
                 in_parray_devid = in_parray_tpl[1]
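In effect, add_dataflow now clears the parent PArray's task list for every output and inout PArray before the task registers itself, so the C++ side later sees only live references when it builds data movement dependencies. A runnable Python sketch of just the clearing step, using illustrative stub types:

class StubCyPArray:
    def __init__(self):
        self.task_list = ["task-from-previous-round"]

    def clear_task_list_ref(self):
        self.task_list.clear()

class StubPArray:
    def __init__(self, parent):
        self.parent = parent

class StubParent:
    def __init__(self):
        self.cy_parray = StubCyPArray()

def clear_stale_refs(dataflow_output, dataflow_inout):
    # Mirrors the two new loops: only the *parent* PArray's task list
    # is cleared, once per output/inout PArray tuple.
    for parray_tpl in list(dataflow_output) + list(dataflow_inout):
        parray_tpl[0].parent.cy_parray.clear_task_list_ref()

parent = StubParent()
out = StubPArray(parent)
clear_stale_refs([(out, 0)], [])
assert parent.cy_parray.task_list == []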
@@ -598,7 +610,7 @@ class DataMovementTask(Task):
         target_dev = self.assigned_devices[0]
         global_id = target_dev.get_global_id()
         parray_id = device_manager.globalid_to_parrayid(global_id)
-        # print("Attempt to Move: ", self.parray.name, " to a device ", parray_id, flush=True)
+        #print("Attempt to Move: ", self.parray.name, " to a device ", parray_id, flush=True)
         self.parray._auto_move(parray_id, write_flag)
         #print(self, "Move PArray ", self.parray.ID, " to a device ", parray_id, flush=True)
         #print(self, "STATUS: ", self.parray.print_overview())
@@ -1635,9 +1647,3 @@ class BackendTaskSpace(TaskSpace):
 
     def wait(self):
         self.inner_space.wait()
-
-
-
-
-
-