This repository has been archived by the owner on May 28, 2019. It is now read-only.

Merge pull request #304 from imotai/master
add write_bytes_ps limit
imotai committed Feb 15, 2016
2 parents 9079fdf + c2a0afa commit f320131
Showing 11 changed files with 224 additions and 181 deletions.
36 changes: 17 additions & 19 deletions build.sh
@@ -22,7 +22,7 @@ then
echo "boost exist"
else
echo "start install boost...."
wget http://superb-dca2.dl.sourceforge.net/project/boost/boost/1.57.0/boost_1_57_0.tar.gz >/dev/null
wget -O boost_1_57_0.tar.gz https://github.com/imotai/common_deps/releases/download/boost/boost_1_57.tar.gz >/dev/null
tar zxf boost_1_57_0.tar.gz >/dev/null
rm -rf ${DEPS_PREFIX}/boost_1_57_0
mv boost_1_57_0 ${DEPS_PREFIX}
@@ -161,21 +161,6 @@ else
cd -
fi

if [ -d "leveldb" ]
then
echo "leveldb exist"
else

# leveldb
git clone https://github.com/imotai/leveldb.git
cd leveldb
make -j8 >/dev/null
cp -rf include/* ${DEPS_PREFIX}/include
cp libleveldb.a ${DEPS_PREFIX}/lib
cd -
fi


if [ -d "ins" ]
then
echo "ins exist"
@@ -187,11 +172,9 @@ else
sed -i 's/^PROTOBUF_PATH=.*/PROTOBUF_PATH=..\/..\/thirdparty/' Makefile
sed -i 's/^PROTOC_PATH=.*/PROTOC_PATH=..\/..\/thirdparty\/bin/' Makefile
sed -i 's/^PROTOC=.*/PROTOC=..\/..\/thirdparty\/bin\/protoc/' Makefile
sed -i 's|^PREFIX=.*|PREFIX=..\/..\/thirdparty|' Makefile
sed -i 's|^PROTOC=.*|PROTOC=${PREFIX}/bin/protoc|' Makefile
sed -i 's/^GFLAGS_PATH=.*/GFLAGS_PATH=..\/..\/thirdparty/' Makefile
sed -i 's/^LEVELDB_PATH=.*/LEVELDB_PATH=..\/..\/thirdparty/' Makefile
sed -i 's/^GTEST_PATH=.*/GTEST_PATH=..\/..\/thirdparty/' Makefile
sed -i 's/^PREFIX=.*/PREFIX=..\/..\/thirdparty/' Makefile
export PATH=${DEPS_PREFIX}/bin:$PATH
export BOOST_PATH=${DEPS_PREFIX}/boost_1_57_0
export PBRPC_PATH=${DEPS_PREFIX}/
@@ -201,6 +184,21 @@ else
cd -
fi

if [ -d "leveldb" ]
then
echo "leveldb exist"
else

# leveldb
git clone https://github.com/imotai/leveldb.git
cd leveldb
make -j8 >/dev/null
cp -rf include/* ${DEPS_PREFIX}/include
cp libleveldb.a ${DEPS_PREFIX}/lib
cd -
fi



if [ -d "mdt" ]
then
1 change: 1 addition & 0 deletions optools/galaxy
@@ -103,6 +103,7 @@ init() {
mkdir -p $CGROUP_ROOT/memory && mount -t cgroup -omemory none $CGROUP_ROOT/memory >/dev/null 2>&1
mkdir -p $CGROUP_ROOT/cpuacct && mount -t cgroup -ocpuacct none $CGROUP_ROOT/cpuacct >/dev/null 2>&1
mkdir -p $CGROUP_ROOT/freezer && mount -t cgroup -ofreezer none $CGROUP_ROOT/freezer >/dev/null 2>&1
mkdir -p $CGROUP_ROOT/blkio && mount -t cgroup -oblkio none $CGROUP_ROOT/blkio >/dev/null 2>&1
# mount fstab
mount -a
/usr/sbin/adduser galaxy >/dev/null 2>&1
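The blkio mount added above is what the agent's task manager later depends on when it creates per-task throttle cgroups (see src/agent/task_manager.cc below); if the hierarchy is missing, HandleInitTaskBlkioCgroup logs "blkio subsystem is disabled". A minimal standalone check for such a mount, written against /proc/mounts and making no assumption about the agent's own hierarchy discovery:

// Sketch: detect whether a blkio cgroup hierarchy is mounted by scanning
// /proc/mounts. Illustration only; the agent's real hierarchy discovery is
// not part of this diff.
#include <fstream>
#include <string>

bool BlkioHierarchyMounted() {
    std::ifstream mounts("/proc/mounts");
    std::string line;
    while (std::getline(mounts, line)) {
        // A cgroup mount line names its subsystems in the options field,
        // e.g. "none /cgroups/blkio cgroup rw,relatime,blkio 0 0".
        if (line.find(" cgroup ") != std::string::npos &&
            line.find("blkio") != std::string::npos) {
            return true;
        }
    }
    return false;
}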
4 changes: 3 additions & 1 deletion src/agent/agent_impl.cc
@@ -136,6 +136,9 @@ void AgentImpl::CreatePodInfo(
task_info.stage = kTaskStagePENDING;
task_info.fail_retry_times = 0;
task_info.max_retry_times = 10;
if (req->has_job_name()) {
task_info.job_name = req->job_name();
}
pod_info->tasks[task_info.task_id] = task_info;
}
if (req->has_job_name()) {
@@ -334,7 +337,6 @@ bool AgentImpl::PingMaster() {
HeartBeatRequest request;
HeartBeatResponse response;
request.set_endpoint(endpoint_);
LOG(WARNING, "agent %s ping master : %s", endpoint_.c_str(), master_endpoint_.c_str());
return rpc_client_->SendRequest(master_,
&Master_Stub::HeartBeat,
&request,
2 changes: 2 additions & 0 deletions src/agent/agent_internal_infos.h
@@ -22,6 +22,7 @@ struct TaskInfo {
std::string task_id;
std::string pod_id; // which pod belong to
std::string job_id;
std::string job_name;
TaskDescriptor desc;
TaskStatus status;
std::string initd_endpoint;
@@ -110,6 +111,7 @@ struct TaskInfo {
void CopyFrom(const TaskInfo& task) {
task_id = task.task_id;
pod_id = task.pod_id;
job_name = task.job_name;
job_id = task.job_id;
desc.CopyFrom(task.desc);
status.CopyFrom(task.status);
43 changes: 33 additions & 10 deletions src/agent/pod_manager.cc
@@ -243,7 +243,8 @@ int PodManager::CleanPodEnv(const PodInfo& pod_info) {
return 0;
}

LOG(WARNING, "pod gc path %s", pod_info.pod_status.pod_gc_path().c_str());
LOG(WARNING, "pod of job %s gc path %s", pod_info.job_name.c_str(),
pod_info.pod_status.pod_gc_path().c_str());
std::string new_workspace_dir = pod_info.pod_status.pod_gc_path();
if (::access(new_workspace_dir.c_str(), F_OK) == 0) {
LOG(WARNING, "path %s is already exists", new_workspace_dir.c_str());
@@ -285,7 +286,9 @@ int PodManager::CheckPod(const std::string& pod_id) {
int status = 0;
pid_t pid = ::waitpid(pod_info.initd_pid, &status, WNOHANG);
if (pid == 0) {
LOG(WARNING, "fail to kill %s initd", pod_info.pod_id.c_str());
LOG(WARNING, "fail to kill %s of job %s initd",
pod_info.pod_id.c_str(),
pod_info.job_name.c_str());
return 0;
}
}
@@ -294,7 +297,8 @@ int PodManager::CheckPod(const std::string& pod_id) {
LOG(WARNING, "fail to clean %s env", pod_info.pod_id.c_str());
return 0;
}
LOG(INFO, "remove pod %s", pod_info.pod_id.c_str());
LOG(INFO, "remove pod %s of job %s", pod_info.pod_id.c_str(),
pod_info.job_name.c_str());
pods_.erase(pod_it);
return -1;
}
@@ -403,7 +407,9 @@ int PodManager::DeletePod(const std::string& pod_id) {
return -1;
}
}
LOG(INFO, "pod %s to delete", pods_it->first.c_str());
LOG(INFO, "pod %s of job %s to delete",
pods_it->first.c_str(),
pod_info.job_name.c_str());
return 0;
}

@@ -431,19 +437,33 @@ int PodManager::AddPod(const PodInfo& info) {
+ internal_info.pod_id + "_" + time_str;
internal_info.pod_status.set_pod_gc_path(gc_dir);
internal_info.pod_status.set_start_time(::baidu::common::timer::get_micros());
LOG(WARNING, "pod gc path %s", pods_[info.pod_id].pod_status.pod_gc_path().c_str());
LOG(WARNING, "pod of job %s gc path %s", internal_info.job_name.c_str(),
pods_[info.pod_id].pod_status.pod_gc_path().c_str());

if (AllocPortForInitd(internal_info.initd_port) != 0){
LOG(WARNING, "pod %s alloc port for initd failed",
internal_info.pod_id.c_str());
return -1;
}
LOG(INFO, "run pod with namespace_isolation: [%d]", internal_info.pod_desc.namespace_isolation());
if (internal_info.pod_desc.requirement().has_read_bytes_ps()) {
LOG(INFO, "run pod with read_bytes_ps: [%d]", internal_info.pod_desc.requirement().read_bytes_ps());
if (internal_info.pod_desc.requirement().read_bytes_ps() > 0) {
LOG(INFO, "run pod %s of job %s with read_bytes_ps: [%ld]",
info.pod_id.c_str(), info.job_name.c_str(),
internal_info.pod_desc.requirement().read_bytes_ps());
} else {
LOG(INFO, "run pod without read_bytes_ps limit");
LOG(INFO, "run pod %s of job %s without read_bytes_ps limit",
info.pod_id.c_str(), info.job_name.c_str());
}

if (internal_info.pod_desc.requirement().write_bytes_ps() > 0) {
LOG(INFO, "run pod %s of job %s with write bytes ps: %ld",
info.pod_id.c_str(), info.job_name.c_str(),
info.pod_desc.requirement().write_bytes_ps());
} else {
LOG(INFO, "run pod %s of job %s without write limit",
info.pod_id.c_str(), info.job_name.c_str());
}

int lanuch_initd_ret = -1;
if (FLAGS_agent_namespace_isolation_switch && internal_info.pod_desc.namespace_isolation()) {
lanuch_initd_ret = LanuchInitd(&internal_info);
@@ -493,7 +513,9 @@ int PodManager::ReloadPod(const PodInfo& info) {
std::map<std::string, PodInfo>::iterator pods_it =
pods_.find(info.pod_id);
if (pods_it != pods_.end()) {
LOG(WARNING, "pod %s already loaded", info.pod_id.c_str());
LOG(WARNING, "pod %s of job %s already loaded",
info.pod_id.c_str(),
info.job_name.c_str());
return 0;
}
pods_[info.pod_id] = info;
@@ -513,6 +535,7 @@
task_it->second.pod_id = info.pod_id;
task_it->second.initd_endpoint = "127.0.0.1:";
task_it->second.job_id = internal_info.job_id;
task_it->second.job_name = info.job_name;
task_it->second.gc_dir = gc_dir;
task_it->second.initd_endpoint.append(
boost::lexical_cast<std::string>(
@@ -528,7 +551,7 @@
internal_info.pod_path = FLAGS_agent_work_dir + "/" + internal_info.pod_id;
LOG(INFO, "reload pod %s job %s initd pid %d at %s success ",
internal_info.pod_id.c_str(),
internal_info.job_id.c_str(),
internal_info.job_name.c_str(),
internal_info.initd_pid,
internal_info.pod_path.c_str());
return 0;
52 changes: 38 additions & 14 deletions src/agent/task_manager.cc
@@ -26,6 +26,7 @@
#include "agent/resource_collector.h"
#include "logging.h"
#include "timer.h"
#include "string_util.h"
#include "utils/trace.h"

DECLARE_string(gce_cgroup_root);
@@ -371,7 +372,8 @@ void TaskManager::CollectIO(const std::string& task_id) {
CGroupIOStatistics current;
bool ok = CGroupIOCollector::Collect(freezer_path, &current);
if (!ok) {
LOG(WARNING, "fail to collect io stat for task %s", task_id.c_str());
LOG(WARNING, "fail to collect io stat for task %s",
task_id.c_str());
}else {
MutexLock scope_lock(&tasks_mutex_);
std::map<std::string, TaskInfo*>::iterator it = tasks_.find(task_id);
@@ -400,6 +402,11 @@
}
task->status.mutable_resource_used()->set_read_bytes_ps(read_bytes_ps);
task->status.mutable_resource_used()->set_write_bytes_ps(write_bytes_ps);
LOG(INFO, "pod %s of job %s read_bytes_ps %s/s write_bytes_ps %s/s",
task->pod_id.c_str(),
task->job_name.c_str(),
::baidu::common::HumanReadableString(read_bytes_ps).c_str(),
::baidu::common::HumanReadableString(write_bytes_ps).c_str());
task->status.mutable_resource_used()->set_syscr_ps(syscr_ps);
task->status.mutable_resource_used()->set_syscw_ps(syscw_ps);
task->old_io_stat = current;
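The hunk above stores the computed per-second rates and keeps the raw sample (old_io_stat = current); the delta arithmetic itself is outside this diff. As an illustration of the idea only, with a stand-in struct rather than the project's CGroupIOStatistics:

// Sketch: derive a bytes-per-second rate from two cumulative blkio samples.
// IOSample is a stand-in type, not the project's CGroupIOStatistics.
#include <stdint.h>

struct IOSample {
    int64_t write_bytes;   // cumulative bytes written so far
    int64_t timestamp_us;  // sample time in microseconds
};

int64_t WriteBytesPerSecond(const IOSample& prev, const IOSample& curr) {
    int64_t delta_us = curr.timestamp_us - prev.timestamp_us;
    if (delta_us <= 0) {
        return 0;  // clock skew or coincident samples
    }
    return (curr.write_bytes - prev.write_bytes) * 1000000 / delta_us;
}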
@@ -1198,10 +1205,6 @@ bool TaskManager::HandleInitTaskBlkioCgroup(std::string& subsystem, TaskInfo* ta
if (task == NULL) {
return false;
}
if (!task->desc.requirement().has_read_bytes_ps()) {
return true;
}

LOG(INFO, "create cgroup %s for task %s", subsystem.c_str(), task->task_id.c_str());
if (hierarchies_.find("blkio") == hierarchies_.end()) {
LOG(WARNING, "blkio subsystem is disabled");
@@ -1214,21 +1217,42 @@
return false;
}
task->cgroups["blkio"] = blkio_path;
int64_t read_bytes_ps = task->desc.requirement().read_bytes_ps();
int32_t major_number;
bool ok = file::GetDeviceMajorNumberByPath(FLAGS_agent_work_dir, major_number);
if (!ok) {
LOG(WARNING, "get device major for task %s fail", task->task_id.c_str());
return false;
}
std::string limit_string = boost::lexical_cast<std::string>(major_number) + ":0 "
+ boost::lexical_cast<std::string>(read_bytes_ps);
if (cgroups::Write(blkio_path,
"blkio.throttle.read_bps_device",
limit_string) != 0) {
LOG(WARNING, "set read_bps fail for %s", blkio_path.c_str());
return false;
};
int64_t read_bytes_ps = task->desc.requirement().read_bytes_ps();
if (read_bytes_ps > 0) {
std::string read_limit_string = boost::lexical_cast<std::string>(major_number) + ":0 "
+ boost::lexical_cast<std::string>(read_bytes_ps);
if (cgroups::Write(blkio_path,
"blkio.throttle.read_bps_device",
read_limit_string) != 0) {
LOG(WARNING, "set read_bps fail for %s", blkio_path.c_str());
return false;
};
} else {
LOG(WARNING, "ignore read bytes ps of task podid %s of job %s",
task->pod_id.c_str(),
task->job_name.c_str());
}
int64_t write_bytes_ps = task->desc.requirement().write_bytes_ps();
if (write_bytes_ps > 0) {
std::string write_limit_string = boost::lexical_cast<std::string>(major_number) + ":0 "
+ boost::lexical_cast<std::string>(write_bytes_ps);
if (cgroups::Write(blkio_path,
"blkio.throttle.write_bps_device",
write_limit_string) != 0) {
LOG(WARNING, "set write_bps fail for %s", blkio_path.c_str());
return false;
};
} else {
LOG(WARNING, "ignore write bytes ps of task podid %s of job %s",
task->pod_id.c_str(),
task->job_name.c_str());
}
return true;
}

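For reference, the kernel's blkio.throttle.read_bps_device and blkio.throttle.write_bps_device files each take one line per device in the form "<major>:<minor> <bytes_per_second>", which is exactly the limit strings built above (always with minor number 0 for the device backing FLAGS_agent_work_dir). A standalone sketch of applying such a write limit, using a hypothetical cgroup directory instead of the project's cgroups::Write helper:

// Sketch: apply a write-bandwidth limit to one device in a blkio cgroup.
// The cgroup path and the major/minor numbers passed in are illustrative only.
#include <stdint.h>
#include <fstream>
#include <sstream>
#include <string>

bool SetWriteBps(const std::string& cgroup_dir,   // e.g. "/cgroups/blkio/agent/<task_id>"
                 int major, int minor, int64_t bytes_per_second) {
    std::ostringstream limit;
    limit << major << ":" << minor << " " << bytes_per_second;  // "<major>:<minor> <bytes/s>"
    std::ofstream f((cgroup_dir + "/blkio.throttle.write_bps_device").c_str());
    if (!f.is_open()) {
        return false;
    }
    f << limit.str() << std::endl;
    return f.good();
}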
Binary file removed src/client/.galaxy_client.cc.swp
14 changes: 14 additions & 0 deletions src/client/galaxy_client.cc
@@ -332,6 +332,13 @@ int BuildJobFromConfig(const std::string& config, ::baidu::galaxy::JobDescriptio
return -1;
}
}
if (pod_require.HasMember("write_bytes_ps")) {
ok = ReadableStringToInt(pod_require["write_bytes_ps"].GetString(), &res->write_bytes_ps);
if (ok != 0) {
fprintf(stderr, "fail to parse pod write_bytes_ps %s\n", pod_require["write_bytes_ps"].GetString());
return -1;
}
}
std::vector< ::baidu::galaxy::TaskDescription>& tasks = pod.tasks;
if (pod_json.HasMember("tasks")) {
const rapidjson::Value& tasks_json = pod_json["tasks"];
@@ -403,6 +410,13 @@ int BuildJobFromConfig(const std::string& config, ::baidu::galaxy::JobDescriptio
return -1;
}
}
if (tasks_json[i]["requirement"].HasMember("write_bytes_ps")) {
ok = ReadableStringToInt(tasks_json[i]["requirement"]["write_bytes_ps"].GetString(), &res->write_bytes_ps);
if (ok != 0) {
fprintf(stderr, "fail to parse task write_bytes_ps %s\n", tasks_json[i]["requirement"]["write_bytes_ps"].GetString());
return -1;
}
}
tasks.push_back(task);
}
}
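Judging from BuildJobFromConfig above, write_bytes_ps may appear both in the pod-level "requirement" object and in each task's "requirement", and is read as a string that ReadableStringToInt converts to a byte count (the helper itself is not part of this diff, so the accepted formats are not shown here). A minimal rapidjson sketch of reading such a field from an assumed sample document:

// Sketch: read an optional "write_bytes_ps" string from a requirement object.
// The sample JSON below is an assumption inferred from BuildJobFromConfig,
// not a documented config format.
#include <cstdio>
#include <rapidjson/document.h>

int main() {
    const char* json = "{\"requirement\": {\"write_bytes_ps\": \"10485760\"}}";
    rapidjson::Document doc;
    doc.Parse(json);
    if (doc.HasParseError() || !doc.HasMember("requirement")) {
        return 1;
    }
    const rapidjson::Value& req = doc["requirement"];
    if (req.HasMember("write_bytes_ps")) {
        std::printf("write_bytes_ps = %s\n", req["write_bytes_ps"].GetString());
    }
    return 0;
}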
1 change: 1 addition & 0 deletions src/rpc/rpc_client.h
@@ -28,6 +28,7 @@ class RpcClient {
_rpc_client = new sofa::pbrpc::RpcClient(options);
}
~RpcClient() {
_rpc_client->Shutdown();
delete _rpc_client;
}
template <class T>