Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support max-allowed Pods for nodes #3

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/base/resource_desc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,6 @@ message ResourceDescriptor {
uint64 trace_machine_id = 21;
// Resource labels
repeated Label labels = 32;
// Max pods allowed per node
uint64 max_pods = 33;
}
10 changes: 7 additions & 3 deletions src/scheduling/flow/cpu_cost_model.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ ArcDescriptor CpuCostModel::ResourceNodeToResourceNode(
}

ArcDescriptor CpuCostModel::LeafResourceNodeToSink(ResourceID_t resource_id) {
return ArcDescriptor(0LL, FLAGS_max_tasks_per_pu, 0ULL);
ResourceStatus* rs = FindPtrOrNull(*resource_map_, resource_id);
ResourceTopologyNodeDescriptor* rtnd = rs->mutable_topology_node();
return ArcDescriptor(0LL, rtnd->resource_desc().num_slots_below(), 0ULL);
}

ArcDescriptor CpuCostModel::TaskContinuation(TaskID_t task_id) {
Expand Down Expand Up @@ -822,7 +824,7 @@ void CpuCostModel::AddMachine(ResourceTopologyNodeDescriptor* rtnd_ptr) {
CHECK(rd.type() == ResourceDescriptor::RESOURCE_MACHINE);
ResourceID_t res_id = ResourceIDFromString(rd.uuid());
vector<EquivClass_t> machine_ecs;
for (uint64_t index = 0; index < FLAGS_max_multi_arcs_for_cpu; ++index) {
for (uint64_t index = 0; index < rd.max_pods(); ++index) {
EquivClass_t multi_machine_ec = GetMachineEC(rd.friendly_name(), index);
machine_ecs.push_back(multi_machine_ec);
CHECK(InsertIfNotPresent(&ec_to_index_, multi_machine_ec, index));
Expand Down Expand Up @@ -898,7 +900,9 @@ FlowGraphNode* CpuCostModel::GatherStats(FlowGraphNode* accumulator,
}
// Running/idle task count
rd_ptr->set_num_running_tasks_below(rd_ptr->current_running_tasks_size());
rd_ptr->set_num_slots_below(FLAGS_max_tasks_per_pu);
ResourceStatus* m_rs = FindPtrOrNull(*resource_map_, machine_res_id);
ResourceTopologyNodeDescriptor* m_rtnd = m_rs->mutable_topology_node();
rd_ptr->set_num_slots_below(m_rtnd->resource_desc().max_pods());
return accumulator;
}
} else if (accumulator->type_ == FlowNodeType::MACHINE) {
Expand Down