Skip to content

Commit

Permalink
Fix data dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
nicelhc13 committed Jan 14, 2024
1 parent 05a4dff commit e17395a
Show file tree
Hide file tree
Showing 5 changed files with 2,590 additions and 7 deletions.
1 change: 1 addition & 0 deletions src/c/backend/include/phases.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ class MemoryReserver : virtual public SchedulerPhase {
* @param task The task to create data movement tasks for.
*/
void create_datamove_tasks(InnerTask *task);
void create_datamove_tasks2(InnerTask *task);
};

/**
Expand Down
6 changes: 6 additions & 0 deletions src/c/backend/include/runtime.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,8 @@ class InnerTask {
std::vector<std::vector<std::pair<parray::InnerPArray *, AccessMode>>>
parray_list;

std::unordered_map<uint64_t, std::vector<InnerTask*>> parray_dependencies_map;

InnerTask();
InnerTask(long long int id, void *py_task);
InnerTask(std::string name, long long int id, void *py_task);
Expand Down Expand Up @@ -623,6 +625,10 @@ class InnerTask {
void begin_multidev_req_addition();
void end_multidev_req_addition();

std::vector<InnerTask*>& get_parray_dependencies(uint64_t parray_parent_id) {
return this->parray_dependencies_map[parray_parent_id];
}

PlacementRequirementCollections &get_placement_req_options() {
return placement_req_options_;
}
Expand Down
85 changes: 84 additions & 1 deletion src/c/backend/phases.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,89 @@ void MemoryReserver::create_datamove_tasks(InnerTask *task) {
task->add_dependencies(data_tasks, true);
}




void MemoryReserver::create_datamove_tasks2(InnerTask *task) {
// Get a list of the parrays the current task holds.
const std::vector<std::vector<std::pair<parray::InnerPArray *, AccessMode>>>
&parray_list = task->parray_list;
std::string task_base_name = task->get_name();
std::vector<InnerTask *> data_tasks;
data_tasks.reserve(parray_list.size());

for (size_t i = 0; i < parray_list.size(); ++i) {
for (size_t j = 0; j < parray_list[i].size(); ++j) {
// Create a data movement task for each PArray.
parray::InnerPArray *parray = parray_list[i][j].first;
AccessMode access_mode = parray_list[i][j].second;
InnerDataTask *datamove_task = new InnerDataTask(
// TODO(hc): id should be updated!
task_base_name + ".dm." + std::to_string(i), 0, parray, access_mode,
i);
uint64_t parray_parent_id = parray->get_parent_parray()->id;
// Find dependency intersection between compute and data movement tasks.

// TODO(hc): This is not the complete implementation.
// We will use a concurrent map for parray's
// task list as an optimization.

std::vector<void *> compute_task_dependencies = task->get_dependencies();
std::vector<InnerTask *> data_task_dependencies;
for (size_t k = 0; k < compute_task_dependencies.size(); ++k) {
InnerTask *parray_dependency =
static_cast<InnerTask *>(compute_task_dependencies[k]);
std::vector<InnerTask*>& dep_parray_dependencies =
parray_dependency->get_parray_dependencies(parray_parent_id);

std::cout << parray_dependency->name << " is being traversed\n";
for (size_t t = 0; t < dep_parray_dependencies.size(); ++t) {
data_task_dependencies.push_back(parray_dependency);
// If the current processing parray's access mode is READ ONLY,
// add this dependency as a dependency for this parray.
std::cout << "access mode:" << int(access_mode) << "\n";
if (access_mode == AccessMode::IN) {
std::cout << "IN parray is added:" << parray_parent_id << "\n";
task->get_parray_dependencies(parray_parent_id).push_back(parray_dependency);
}
}
}

// If the current processing parray's access mode is not READ ONLY,
// add itself as a dependency for this parray.
std::cout << task->name << " is being traversed access id :" << int(access_mode) << "\n";
if (access_mode != AccessMode::IN) {
std::cout << "IN/OUT OUT parray is added:" << parray_parent_id << "\n";
task->get_parray_dependencies(parray_parent_id).push_back(task);
}

// TODO(hc): pass false to add_dependencies() as optimization.
datamove_task->add_dependencies(data_task_dependencies, true);
// Copy assigned devices to a compute task to a data movement task.
// TODO(hc): When we support xpy, it should be devices corresponding
// to placements of the local partition.
auto device = task->get_assigned_devices()[i];
datamove_task->add_assigned_device(device);

datamove_task->device_constraints.emplace(
std::piecewise_construct,
std::forward_as_tuple(device->get_global_id()),
std::forward_as_tuple(0, 0, 1));

data_tasks.push_back(datamove_task);
// Add the created data movement task to a reserved task queue.
this->scheduler->increase_num_active_tasks();
this->reserved_tasks_buffer.push_back(datamove_task);
}
}

// Create dependencies between data move task and compute tasks.
task->add_dependencies(data_tasks, true);
}




void MemoryReserver::run(SchedulerPhase *next_phase) {
NVTX_RANGE("MemoryReserver::run", NVTX_COLOR_LIGHT_GREEN)

Expand All @@ -263,7 +346,7 @@ void MemoryReserver::run(SchedulerPhase *next_phase) {
if (can_reserve) {
this->reserve_resources(task);
this->reservable_tasks->pop();
this->create_datamove_tasks(task);
this->create_datamove_tasks2(task);
this->reserved_tasks_buffer.push_back(task);
} else {
// TODO:(wlr) we need some break condition to allow the scheduler to
Expand Down
Loading

0 comments on commit e17395a

Please sign in to comment.