Skip to content

Commit

Permalink
#57 Switch per-process frequency checks from ahead-of-time in Config …
Browse files Browse the repository at this point in the history
…to runtime checks in PowerPolicy

The checks here are more benevolent, only reporting an actual collision rather than all potential ones. However, if there's a race condition, the runtime checks may not always catch it.

TODO: if a collision is detected, it throws a runtime exception, which stops the scheduler without a cleanup, leaving behind processes and cgroups
  • Loading branch information
MatejKafka committed Sep 9, 2021
1 parent 577a1ac commit 7e89ae5
Show file tree
Hide file tree
Showing 11 changed files with 72 additions and 248 deletions.
135 changes: 4 additions & 131 deletions src/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ static cpu_set parse_cpu_set(const string &cpulist, T_all all_cpus)
// "all" = use all available CPUs
// MK: imo, it would be nicer to use *, but that (along with most other
// special characters) has special meaning in YAML, and must be quoted
if (cpulist == "all") return cpu_set(all_cpus);
return cpu_set(cpulist);
if (cpulist == "all") return { all_cpus };
return { cpulist };
}
template<typename T_all>
static cpu_set parse_cpu_set(const YAML::Node &node, T_all all_cpus)
Expand Down Expand Up @@ -289,135 +289,10 @@ Node Config::normalize_window(const Node &win, // in: window to normalize
return norm_win;
}

static unsigned int imx8_freq_i_from_freq(bool is_a53, size_t freq_MHz)
{
// clang-format off
if (is_a53) {switch (freq_MHz) { // A53
case 600: return 0;
case 896: return 1;
case 1104: return 2;
case 1200: return 3;
default: break;
}} else {switch (freq_MHz) { // A72
case 600: return 0;
case 1056: return 1;
case 1296: return 2;
case 1596: return 3;
default: break;
}}
// clang-format on
throw runtime_error(
"Frequency not supported by the "s + (is_a53 ? "A53" : "A72") + " cluster: '" +
to_string(freq_MHz) + "' MHz (supported frequencies: " +
(is_a53 ? "600 MHz, 896 MHz, 1104 MHz, 1200 MHz" : "600 MHz, 1056 MHz, 1296 MHz, 1596 MHz") +
")");
}

/**
* Check if per-process frequencies for the i.MX8 are feasible; that is, if each process
* is guaranteed to have the correct frequency set and there won't ever be 2 processes
* with different per-process frequency running on the same CPU cluster at the same time.
*
* MK: This is quite monolithic - if you have any idea how to split it up, feel free to do so.
* Validates that all referenced partitions exist and slices don't have overlapping cpusets.
*
* FIXME: currently, this whole section is i.MX8-specific; if it was moved somewhere where
* the available cpufreq policies are already known, it could be generic for any CPU;
* however, it would be system-dependent and could not be checked ahead-of-time on another machine
*/
void Config::validate_per_process_freq_feasibility()
{
// i.MX8 frequency bitmap (4 possible frequencies)
using Imx8FreqMap = bitset<4>;
const std::array<cpu_set, 2> imx8_cpus{ 0b1111, 0b110000 };
const cpu_set imx8_all_cpus(0b111111);

// for each partition, find out which frequencies the processes inside are requesting
map<string, std::array<Imx8FreqMap, imx8_cpus.size()>> requested_freqs{};
for (const auto &part : config["partitions"]) {
Imx8FreqMap part_freqs_a53(0);
Imx8FreqMap part_freqs_a72(0);
for (const auto &proc : part["processes"]) {
if (proc["_a53_freq"]) {
auto freq = proc["_a53_freq"].as<size_t>();
// convert the frequency back to index, as it's easier to work with here
part_freqs_a53.set(imx8_freq_i_from_freq(true, freq));
}
if (proc["_a72_freq"]) {
auto freq = proc["_a72_freq"].as<size_t>();
part_freqs_a72.set(imx8_freq_i_from_freq(false, freq));
}
}
requested_freqs[part["name"].as<string>()] = { part_freqs_a53, part_freqs_a72 };
}

// TODO: validate that all freq requests are used (e.g. if a process requests
// an A53 freq, check that it ever runs on the A53 cluster)

// find all requested frequencies for each window (separately for SC and BE
// partitions, as they run separately), and check if there are any collisions
// it is OK if there are multiple frequencies, all from the same partition;
// it is also OK if multiple partitions require the same single frequency;
// otherwise, a runtime collision may potentially occur and we throw an error
int window_i = -1;
bool collision_found = false;
for (const auto &win : config["windows"]) {
window_i++;
for (const string &part_key : { "sc_partition", "be_partition" }) {
std::array<Imx8FreqMap, imx8_cpus.size()> win_freqs{ 0, 0 };
for (const auto &slice : win["slices"]) {
if (!slice[part_key]) continue; // no SC/BE partition
auto slice_cpu = parse_cpu_set(slice["cpu"], imx8_all_cpus);

size_t i = 0;
// iterate in parallel over CPU clusters and corresponding requested frequencies
// we already checked that all partition references are valid, this lookup is safe
for (Imx8FreqMap req_freq : requested_freqs[slice[part_key].as<string>()]) {
Imx8FreqMap &win_freq = win_freqs[i];
cpu_set cluster_cpu = imx8_cpus[i];
i++;

if (req_freq.none()) continue; // no freq constraints
// if there's no overlap with the cluster, skip it
if (!(cluster_cpu & slice_cpu)) continue;

if (win_freq.none() || (win_freq | req_freq).count() == 1) {
// no previous constraints or a single compatible frequency, feasible so far
win_freq |= req_freq;
} else {
// there are at least 2 different freqs, and as both sets are non-empty,
// that's a potential runtime collision
// TODO: find the offending process tuple and report it in the error message
collision_found = true;
// don't throw the error, as that would only show the first one;
// instead, log it and then throw error after all checks are done
logger->error("Unfeasible combination of fixed per-process frequencies was "
"specified in the configuration."
"\n\tThe collision occurs between processes in window #{},"
" between {} partitions on CPU cluster '{}'."
"\n\tA process from the partition '{}' collides with a "
"process from one of the previous partitions "
"scheduled inside the window.",
window_i,
part_key == "sc_partition" ? "SC" : "BE",
cluster_cpu.as_list(),
slice[part_key].as<string>());
}
}
}
}
}

if (collision_found) {
throw runtime_error("Unfeasible per-process frequencies in configuration; "
"see above for more detailed information");
}
}

/**
* Validates that all referenced partitions exist, slices don't have overlapping cpusets,
* and that per-process frequencies are feasible, if specified.
*
* It would be simpler, and significantly to do the validation using the created scheduler objects
* It would be simpler to do the validation using created scheduler objects
* rather than the YAML config, but I want the validation to take place even when
* only dumping normalized config, and creating an intermediate struct-based config
* format is probably currently too much work for too little benefit.
Expand Down Expand Up @@ -465,8 +340,6 @@ void Config::validate_config()
acc |= slice_cpu;
}
}

validate_per_process_freq_feasibility();
}

void Config::normalize()
Expand Down
1 change: 0 additions & 1 deletion src/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ class Config
int anonymous_partition_counter = 0;

void validate_config();
void validate_per_process_freq_feasibility();

YAML::Node normalize_window(const YAML::Node &win, YAML::Node &partitions);
YAML::Node normalize_partition(const YAML::Node &part, float total_budget);
Expand Down
7 changes: 5 additions & 2 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ int main(int argc, char *argv[])
// this spawns the underlying system processes
sched.setup();

// configure linux scheduler - set highest possible priority for demos
// should be called after child process creation (in `sched.setup()`),
// configure linux scheduler - set the highest possible priority for demos
// must be called after child process creation (in `sched.setup()`),
// as we don't want children to inherit RT priority
struct sched_param sp = { .sched_priority = 99 };
if (sched_setscheduler(0, SCHED_FIFO, &sp) == -1) {
Expand All @@ -228,6 +228,9 @@ int main(int argc, char *argv[])
// everything is set up now, start the event loop
// the event loop terminates either on timeout (if set),
// SIGTERM, SIGINT (Ctrl-c), or when all scheduled processes exit
// FIXME: when a runtime error occurs, cgroups are not cleaned up, because
// they're not empty; catch the exception here, attempt to cleanup (sched.initiate_shutdown),
// then rethrow (if it fails, throw immediately)
scheduler_timeout ? sched.run(scheduler_timeout.value()) : sched.run();

} catch (const exception &e) {
Expand Down
1 change: 1 addition & 0 deletions src/power_policy/_power_policy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class PowerPolicy

// TODO: check the performance impact, potentially hide it behind a compile-time flag
virtual void on_process_start(Process &) {}
virtual void on_process_end(Process &) {}

// it seems a bit weird to have what's essentially an input parsing function here,
// but imo it is still cleaner than including all defined policies in main.cpp
Expand Down
60 changes: 49 additions & 11 deletions src/power_policy/imx8_per_process.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,29 @@
/**
* Set frequency based on the currently executing process.
*
* NOTE: this is a test power policy, the error messages are not very helpful
* The requested frequencies for processes are validated at runtime, and when a collision
* occurs (multiple processes request different frequencies for the same CPU cluster), an
* error message is shown.
* Originally, an ahead-of-time collision algorithm was used, but it was quite complex,
* and a clean implementation required refactoring a significant part of the codebase
* (Config required access to PowerManager), which was deemed too time-intensive
* to implement for a single experimental feature.
*/
class PowerPolicy_Imx8_PerProcess : public PmPowerPolicy
{
private:
CpufreqPolicy &a53_pol;
CpufreqPolicy &a72_pol;
const cpu_set a53_cpus{0b001111};
const cpu_set a72_cpus{0b110000};
// currently, this is hardcoded for i.MX8, but should be quite simply extensible
// for other CPU cluster layouts
std::array<CpufreqPolicy *, 2> policies{ &pm.get_policy("policy0"), &pm.get_policy("policy4") };
// a list of currently running processes which requested a fixed frequency
// there may be multiple processes which requested the same frequency,
// which is why a list is used instead of a single pointer
std::array<std::vector<Process *>, 2> locking_process_for_policy{};

public:
PowerPolicy_Imx8_PerProcess()
: a53_pol{ pm.get_policy("policy0") }
, a72_pol{ pm.get_policy("policy4") }
{
ASSERT(policies.size() == locking_process_for_policy.size());
for (auto &p : pm.policy_iter()) {
if (!p.available_frequencies) {
throw runtime_error("Cannot list available frequencies for the CPU"
Expand All @@ -32,14 +40,44 @@ class PowerPolicy_Imx8_PerProcess : public PmPowerPolicy

void on_process_start(Process &proc) override
{
ASSERT(proc.requested_frequencies.size() == policies.size());
// check if the process is requesting any specific frequency and
// if its current cpuset intersects with the A53/A72 cluster
cpu_set proc_cpus = proc.part.current_cpus;
if (proc.a53_freq && proc_cpus & a53_cpus) {
a53_pol.write_frequency(proc.a53_freq.value());

for (size_t i = 0; i < policies.size(); i++) {
auto &policy = *policies[i];
auto &locking_processes = locking_process_for_policy[i];
auto requested_freq = proc.requested_frequencies[i];

if (requested_freq && proc_cpus & policy.affected_cores) {
// process is requesting a fixed frequency, check if it is
// compatible with existing locks; all locking processes must have requested the
// same frequency, so it suffices to check the first one
if (!locking_processes.empty() &&
locking_processes[0]->requested_frequencies[i] != requested_freq) {
throw std::runtime_error(
fmt::format("Scheduled processes require different frequencies on the CPU "
"cluster '#{}': '{}', '{}'",
i,
locking_processes[0]->requested_frequencies[i].value(),
requested_freq.value()));
}
locking_processes.push_back(&proc);
policy.write_frequency(requested_freq.value());
}
}
if (proc.a72_freq && proc_cpus & a72_cpus) {
a72_pol.write_frequency(proc.a72_freq.value());
}

void on_process_end(Process &process) override {
// remove the process from the list of locking processes
for (auto &lp : locking_process_for_policy) {
for (auto it = lp.begin(); it != lp.end(); it++) {
if (*it == &process) {
lp.erase(it);
break;
}
}
}
}
};
3 changes: 1 addition & 2 deletions src/process.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ Process::Process(ev::loop_ref loop,
std::optional<CpuFrequencyHz> a72_freq,
bool has_initialization)
: part(partition)
, a53_freq{ a53_freq }
, a72_freq{ a72_freq }
, requested_frequencies{ a53_freq, a72_freq }
// budget +- (jitter / 2)
, jitter_distribution_ms(-budget_jitter.count() / 2,
budget_jitter.count() - budget_jitter.count() / 2)
Expand Down
3 changes: 1 addition & 2 deletions src/process.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ class Process
void mark_uncompleted();

Partition &part;
const std::optional<CpuFrequencyHz> a53_freq;
const std::optional<CpuFrequencyHz> a72_freq;
const std::array<std::optional<CpuFrequencyHz>, 2> requested_frequencies;

private:
std::uniform_int_distribution<long> jitter_distribution_ms;
Expand Down
12 changes: 8 additions & 4 deletions src/slice.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Slice::Slice(ev::loop_ref loop,

bool Slice::load_next_process(time_point current_time)
{
ASSERT(running_process == nullptr);
running_process = running_partition->seek_pending_process();
if (running_process) {
return true;
Expand Down Expand Up @@ -71,11 +72,15 @@ void Slice::start_next_process(time_point current_time)
power_policy.on_process_start(*running_process);
}

void Slice::stop_current_process()
void Slice::stop_current_process(bool mark_completed)
{
ASSERT(running_process != nullptr);
// this way, the process will run a bit longer
// if this call takes a long time to complete
power_policy.on_process_end(*running_process);
timer.stop();
running_process->suspend();
running_process->mark_completed();
if (mark_completed) running_process->mark_completed();
running_process = nullptr;
}

Expand Down Expand Up @@ -141,8 +146,7 @@ void Slice::stop(time_point current_time)
running_process->set_remaining_budget(remaining);
}

running_process->suspend();
running_process = nullptr;
stop_current_process(false);
}

// Called as a response to timeout or process completion.
Expand Down
2 changes: 1 addition & 1 deletion src/slice.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class Slice

void schedule_next(time_point current_time);
void start_partition(Partition *part, time_point current_time, bool move_to_first_proc);
void stop_current_process();
void stop_current_process(bool mark_completed = true);
bool load_next_process(time_point current_time);
void start_next_process(time_point current_time);
};
8 changes: 1 addition & 7 deletions test/0200-config.t
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env bash
. testlib
plan_tests 21
plan_tests 20

# set fixed log level for config tests; otherwise if user would run something
# like `SPDLOG_LEVEL=trace make test`, the output would include the logs and the tests would fail
Expand Down Expand Up @@ -42,12 +42,6 @@ out=$(demos-sched -C "{
}" 2>&1)
is $? 1 "'jitter > 2 * budget' causes an error"

out=$(demos-sched -C "{
windows: [ {length: 500, sc_partition: SC} ],
partitions: [ {name: SC, processes: [{cmd: echo, budget: 50, _a53_freq: 10}]} ]
}" 2>&1)
is $? 1 "CPU frequency out of range <0, 3> causes an error"

test_normalization "missing slice definition" \
"{
windows: [ {length: 500, sc_partition: SC} ],
Expand Down
Loading

0 comments on commit 7e89ae5

Please sign in to comment.