Skip to content

Commit

Permalink
Move senpai to public directory
Browse files Browse the repository at this point in the history
Summary: Open sourcing senpai

Reviewed By: hnaz

Differential Revision: D19870251

fbshipit-source-id: a1f399e2bf34728ab235b04d9bd1b9a08dc1a42c
  • Loading branch information
danobi authored and facebook-github-bot committed Feb 13, 2020
1 parent 56c03e2 commit 79d90b1
Show file tree
Hide file tree
Showing 3 changed files with 368 additions and 0 deletions.
1 change: 1 addition & 0 deletions meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ srcs = files('''
src/oomd/plugins/PressureAbove.cpp
src/oomd/plugins/PressureRisingBeyond.cpp
src/oomd/plugins/SwapFree.cpp
src/oomd/plugins/Senpai.cpp
src/oomd/plugins/Exists.cpp
src/oomd/plugins/KillIOCost.cpp
src/oomd/plugins/KillMemoryGrowth.cpp
Expand Down
271 changes: 271 additions & 0 deletions src/oomd/plugins/Senpai.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
/*
* Copyright (C) 2018-present, Facebook, Inc.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; version 2 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/

#include "oomd/plugins/Senpai.h"

#include <sys/stat.h>

#include <iomanip>
#include <sstream>

#include "oomd/Log.h"
#include "oomd/PluginRegistry.h"
#include "oomd/Stats.h"
#include "oomd/util/Fs.h"
#include "oomd/util/Util.h"

namespace {
auto constexpr kCgroupFs = "/sys/fs/cgroup";
} // namespace

namespace Oomd {

REGISTER_PLUGIN(senpai, Senpai::create);

int Senpai::init(
Engine::MonitoredResources& resources,
const Engine::PluginArgs& args) {
{
struct stat s;
has_memory_high_tmp_ =
!stat("/sys/fs/cgroup/system.slice/memory.high.tmp", &s);
}

if (args.find("cgroup") != args.end()) {
auto cgroup_fs =
(args.find("cgroup_fs") != args.end() ? args.at("cgroup_fs")
: kCgroupFs);

auto cgroups = Util::split(args.at("cgroup"), ',');
for (const auto& c : cgroups) {
cgroups_.emplace(cgroup_fs, c);
resources.emplace(cgroup_fs, c);
}
} else {
OLOG << "Argument=cgroup not present";
return 1;
}

if (args.find("limit_min_bytes") != args.end()) {
limit_min_bytes_ = std::stoull(args.at("limit_min_bytes"));
}

if (args.find("limit_max_bytes") != args.end()) {
limit_max_bytes_ = std::stoull(args.at("limit_max_bytes"));
}

if (args.find("interval") != args.end()) {
interval_ = std::stoull(args.at("interval"));
}

if (args.find("pressure_ms") != args.end()) {
pressure_ms_ =
std::chrono::milliseconds(std::stoull(args.at("pressure_ms")));
}

if (args.find("max_probe") != args.end()) {
max_probe_ = std::stod(args.at("max_probe"));
}

if (args.find("max_backoff") != args.end()) {
max_backoff_ = std::stod(args.at("max_backoff"));
}

if (args.find("coeff_probe") != args.end()) {
coeff_probe_ = std::stod(args.at("coeff_probe"));
}

if (args.find("coeff_backoff") != args.end()) {
coeff_backoff_ = std::stod(args.at("coeff_backoff"));
}

return 0;
}

Engine::PluginRet Senpai::run(OomdContext& ctx) {
std::set<std::string> resolved_cgroups;
for (const auto& cgroup : cgroups_) {
auto resolved = Fs::resolveWildcardPath(cgroup);
for (auto&& cg : std::move(resolved)) {
if (Fs::isDir(cg)) {
resolved_cgroups.emplace(cg);
}
}
}

auto new_cgroups = addRemoveTrackedCgroups(resolved_cgroups);
for (auto& cg : tracked_cgroups_) {
tick(cg.first, cg.second);
}

// new cgroups will be polled after a "tick" has elapsed, so add
// them to the tracked group at the end here
tracked_cgroups_.merge(std::move(new_cgroups));
return Engine::PluginRet::CONTINUE;
}

Senpai::CgroupState::CgroupState(
uint64_t start_limit,
std::chrono::microseconds total,
uint64_t start_ticks,
const std::string& path)
: limit{start_limit}, last_total{total}, ticks{start_ticks} {}

namespace {
std::chrono::microseconds getTotal(const std::string& name) {
// Senpai reads pressure.some to get early notice that a workload
// may be under resource pressure
const auto pressure =
Oomd::Fs::readMempressure(name, Oomd::Fs::PressureType::SOME);

if (!pressure.total) {
throw std::runtime_error("Senpai enabled but no total pressure info");
}

return pressure.total.value();
}

uint64_t getCurrent(const std::string& name) {
return static_cast<uint64_t>(Oomd::Fs::readMemcurrent(name));
}
} // namespace
std::map<std::string, Senpai::CgroupState> Senpai::addRemoveTrackedCgroups(
const std::set<std::string>& resolved_cgroups) {
std::map<std::string, CgroupState> new_cgroups;
auto resolvedIt = resolved_cgroups.cbegin();
auto trackedIt = tracked_cgroups_.begin();
while (resolvedIt != resolved_cgroups.cend()) {
if (trackedIt == tracked_cgroups_.end()) {
// The rest of the resolved cgroups are not tracked, track them
for (auto it = resolvedIt; it != resolved_cgroups.cend(); ++it) {
auto state = initializeCgroup(*it);
new_cgroups.emplace(std::move(*it), std::move(state));
}
return new_cgroups;
}

if (*resolvedIt < trackedIt->first) {
// Resolved cgroup not in tracked map, track it
auto state = initializeCgroup(*resolvedIt);
new_cgroups.emplace(std::move(*resolvedIt), std::move(state));
++resolvedIt;
} else {
if (trackedIt->first < *resolvedIt) {
// tracked cgroup not found, erase it
trackedIt = tracked_cgroups_.erase(trackedIt);
} else {
++resolvedIt;
++trackedIt;
}
}
}
tracked_cgroups_.erase(trackedIt, tracked_cgroups_.end());
return new_cgroups;
}

void Senpai::tick(const std::string& name, CgroupState& state) {
auto limit = static_cast<uint64_t>(readMemhigh(name));
auto total = getTotal(name);
auto factor = 0.0;

if (limit != state.limit) {
// Something else changed limits on this cgroup or it was
// recreated in-between ticks - reset the state and return,
// unfortuantely, the rest of this logic is still racy after this
// point
std::ostringstream oss;
oss << "cgroup " << name << " memory.high " << limit
<< " does not match recorded state " << state.limit
<< ". Resetting cgroup";
OLOG << oss.str();
state = initializeCgroup(name);
return;
}

// Adjust cgroup limit by factor
auto adjust = [&](double factor) {
state.limit += state.limit * factor;
state.limit =
std::max(limit_min_bytes_, std::min(limit_max_bytes_, state.limit));
// Memory high is always a multiple of 4K
state.limit &= ~0xFFF;
writeMemhigh(name, state.limit);
state.ticks = interval_;
state.cumulative = std::chrono::microseconds{0};
};
auto delta = total - state.last_total;
state.last_total = total;
state.cumulative += delta;
auto cumulative = state.cumulative.count();

if (state.cumulative >= pressure_ms_) {
// Excessive pressure, back off. The rate scales exponentially
// with pressure deviation. The coefficient defines how sensitive
// we are to fluctuations around the target pressure: when the
// coefficient is 10, the adjustment curve reaches the backoff
// limit when observed pressure is ten times the target pressure.
double error = state.cumulative / pressure_ms_;
factor = error / coeff_backoff_;
factor *= factor;
factor = std::min(factor * max_backoff_, max_backoff_);
adjust(factor);
} else if (state.ticks) {
--state.ticks;
} else {
// Pressure too low, tighten the limit. Like when backing off, the
// adjustment becomes exponentially more aggressive as observed
// pressure falls below the target pressure. The adjustment limit
// is reached when stall time falls through pressure/coeff_probe_.
auto one = std::chrono::microseconds{1};
double error = pressure_ms_ / std::max(state.cumulative, one);
factor = error / coeff_probe_;
factor *= factor;
factor = std::min(factor * max_probe_, max_probe_);
factor = -factor;
adjust(factor);
}

std::ostringstream oss;
oss << "cgroup " << name << std::setprecision(3) << std::fixed << " limitgb "
<< limit / (double)(1 << 30UL) << " totalus " << total.count()
<< " deltaus " << delta.count() << " cumus " << cumulative << " ticks "
<< state.ticks << std::defaultfloat << " adjust " << factor;
OLOG << oss.str();
}

Senpai::CgroupState Senpai::initializeCgroup(const std::string& path) {
auto start_limit = getCurrent(path);
writeMemhigh(path, start_limit);
return CgroupState(start_limit, getTotal(path), interval_, path);
}

int64_t Senpai::readMemhigh(const std::string& path) {
if (has_memory_high_tmp_) {
return Oomd::Fs::readMemhightmp(path);
} else {
return Oomd::Fs::readMemhigh(path);
}
}

void Senpai::writeMemhigh(const std::string& path, int64_t value) {
if (has_memory_high_tmp_) {
Oomd::Fs::writeMemhightmp(path, value, std::chrono::seconds(20));
} else {
Oomd::Fs::writeMemhigh(path, value);
}
}
} // namespace Oomd
96 changes: 96 additions & 0 deletions src/oomd/plugins/Senpai.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright (C) 2019-present, Facebook, Inc.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; version 2 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/

#pragma once

#include "oomd/engine/BasePlugin.h"

#include <chrono>
#include <map>
#include <set>
#include <string>
#include <unordered_set>

namespace Oomd {

/*
* A plugin which adjusts memory.high on a cgroup in order to create a
* light amount of memory pressure. This allows memory.current to more
* accurately represent the amount of memory required by the cgroup.
*/
class Senpai : public Engine::BasePlugin {
public:
int init(
Engine::MonitoredResources& resources,
const Engine::PluginArgs& args) override;

Engine::PluginRet run(OomdContext& ctx) override;

static Senpai* create() {
return new Senpai();
}

~Senpai() = default;

private:
struct CgroupState {
CgroupState(
uint64_t start_limit,
std::chrono::microseconds total,
uint64_t start_ticks,
const std::string& path);

// Current memory limit
uint64_t limit;
// Last recorded total memory pressure
std::chrono::microseconds last_total;
// Cumulative memory pressure since last adjustment
std::chrono::microseconds cumulative{0};
// Count-down to decision to probe/backoff
uint64_t ticks;
};

// Removes any untracked cgroups and returns new cgroups to be watched
std::map<std::string, CgroupState> addRemoveTrackedCgroups(
const std::set<std::string>& resolved_cgroups);
void tick(const std::string& name, CgroupState& state);
CgroupState initializeCgroup(const std::string& path);
// Uses memory.high.tmp if available
int64_t readMemhigh(const std::string& path);
void writeMemhigh(const std::string& path, int64_t value);

std::unordered_set<CgroupPath> cgroups_;
std::map<std::string, CgroupState> tracked_cgroups_;

bool has_memory_high_tmp_{false};

// cgroup size limits
uint64_t limit_min_bytes_{1ull << 30};
uint64_t limit_max_bytes_{500ull << 30};
// pressure target - stall time over sampling period
uint64_t interval_{6};
std::chrono::microseconds pressure_ms_{std::chrono::milliseconds{10}};
// translate observed target deviation to cgroup adjustment rate
// - max_probe is reached when stalling falls below pressure / coeff_probe
// - max_backoff is reached when stalling exceeds pressure * coeff_backoff
double max_probe_{0.01};
double max_backoff_{1.0};
double coeff_probe_{10};
double coeff_backoff_{20};
};

} // namespace Oomd

0 comments on commit 79d90b1

Please sign in to comment.