Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alternative throughput calculation (should hopefully give better results) #7

Open
wants to merge 2 commits into
base: zero-order-grad
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions src/db/mysql/OptimizerDataSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,8 @@ class MySqlOptimizerDataSource: public OptimizerDataSource {
return pid;
}

void getTcnPipeResource(const Pair &pair, std::vector<std::string> &usedResources) {
usedResources.clear();

std::vector<std::string> getTcnPipeResource(const Pair &pair) {
std::vector<std::string> retval;
soci::rowset<soci::row> resources = (sql.prepare <<
"SELECT resc_id FROM t_tcn_resource_use "
" WHERE source_se = :source_se AND dest_se = :dest_se",
Expand All @@ -225,12 +224,13 @@ class MySqlOptimizerDataSource: public OptimizerDataSource {
for (auto j = resources.begin(); j != resources.end(); ++j) {
auto rescId = j->get<std::string>("resc_id");

usedResources.push_back(rescId);
retval.push_back(rescId);
}
return retval;
}

void getTcnResourceSpec(const std::string &project, std::map<std::string, double> &resourceConstraints) {
resourceConstraints.clear();
std::map<std::string, double> getTcnResourceSpec(const std::string &project, &resourceConstraints) {
std::map<std::string, double> retval;

soci::rowset<soci::row> specs = (sql.prepare <<
"SELECT resc_id, max_usage from t_tcn_resource_ctrlspec "
Expand All @@ -241,8 +241,9 @@ class MySqlOptimizerDataSource: public OptimizerDataSource {
auto rescId = i->get<std::string>("resc_id");
auto capacity = i->get<double>("max_usage");

resourceConstraints[rescId] = capacity;
retval[rescId] = capacity;
}
return retval;
}

int64_t getTransferredInfo(const Pair &pair, time_t windowStart) {
Expand Down
89 changes: 25 additions & 64 deletions src/server/services/optimizer/Optimizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,15 @@ void Optimizer::run(void)
// See FTS-1094
pairs.sort();

std::map<string, std::list<Pair>> pairsOfProject;

// Retrieve pair state
std::map<Pair, PairState> aggregatedPairState;

// amount transferred per project per link
std::map<std::pair<std::string, std::string>, double> transferredMap;

// resource limits per project
std::map<std::string, std::map<std::string, double>> resourceLimits;

for (auto i = pairs.begin(); i != pairs.end(); ++i) {
FTS3_COMMON_LOGGER_NEWLOG(DEBUG) << "Test run " << *i << " using traditional optimizer" << commit;
auto optMode = runOptimizerForPair(*i);
Expand All @@ -168,68 +173,29 @@ void Optimizer::run(void)
FTS3_COMMON_LOGGER_NEWLOG(DEBUG) << "Put " << *i << " to TCN aggregated optimizer" << commit;

aggregatedPairState[*i] = getPairState(*i, timeMultiplexing, newInterval);

std::string project = dataSource->getTcnProject(*i);

FTS3_COMMON_LOGGER_NEWLOG(DEBUG) << "TCN Control: put pipe " << *i << " to project "
<< project << commit;
pairsOfProject[project].push_back(*i);
}
}

if (!newInterval) {
for (auto it = pairsOfProject.begin(); it != pairsOfProject.end(); ++it) {
auto project = it->first;
auto projectPairs = it->second;

std::map<std::string, double> resourceLimits;
dataSource->getTcnResourceSpec(project, resourceLimits);

FTS3_COMMON_LOGGER_NEWLOG(DEBUG)
<< "TCN Control: getting TCN resource control spec for project "
<< project << commit;

std::map<std::string, std::list<Pair>> resourcePairs;

for (auto p = projectPairs.begin(); p != projectPairs.end(); ++p) {
std::vector<std::string> usedResources;
dataSource->getTcnPipeResource(*p, usedResources);

for (auto resc = usedResources.begin(); resc != usedResources.end(); ++resc) {
FTS3_COMMON_LOGGER_NEWLOG(DEBUG) << "TCN Control: getting TCN resource usage: "
<< project << " | " << *p << " (" << p->vo << ") uses resource "
<< *resc << commit;
resourcePairs[*resc].push_back(*p);
}
auto limitsFound = resourceLimits.find(project);
if(limitsFound == resourceLimits.end()) {
resourceLimits[project] = dataSource->getTcnResourceSpec();
}

bool sleeping = false;

for (auto rl = resourceLimits.begin(); rl != resourceLimits.end(); ++rl) {
auto resc = rl->first;
double bwLimit = rl->second / 1024;
auto pairsUsingResource = resourcePairs[resc];

double actualMBps = 0;
for (auto p = pairsUsingResource.begin(); p != pairsUsingResource.end(); ++p) {
actualMBps += aggregatedPairState[*p].throughput / 1024 / 1024;
}

if (actualMBps > bwLimit) {
sleeping = true;
FTS3_COMMON_LOGGER_NEWLOG(DEBUG) << "Time multiplexing: project " << project
<< " exceeds limit on resource " << resc
<< " (target: " << bwLimit << " MB/s, actual: " << actualMBps << " MB/s),"
<< " all pipes of the project will sleep." << commit;
break;
std::vector<std::string> links = dataSource->getTcnPipeResource(*p);
if (newInterval) {
for(auto link = links.begin(); link != links.end(); link++){
initialTransferred[std::pair<project,*link>] = dataSource->getTransferredInfo(*i, qosIntervalStart);
}
}

if (sleeping) {
for (auto p = projectPairs.begin(); p != projectPairs.end(); ++p) {
sleepingPipes.insert(*p);
FTS3_COMMON_LOGGER_NEWLOG(DEBUG) << "Time multiplexing: pipe " << *p
<< " in project " << project << " is sleeping." << commit;
else {
for(auto link = links.begin(); link != links.end(); link++){
int64_t curTransferred = dataSource->getTransferredInfo(*i, qosIntervalStart) - initialTransferred[std::pair<project,*link>];
uint64_t limit = resourceLimits[project][*link];
// multiply limit by 1024 to get MBps
if (curTransferred > qosInterval * limit * 1024 && limit != 0) {
// we've gone over our bandwidth limit
// add to the set of sleeping pipes
sleepingPipes.insert(*i);
FTS3_COMMON_LOGGER_NEWLOG(DEBUG) << "Time multiplexing: pipe " << *i << " exceeds resource limit, sleep." << commit;
}
}
}
}
Expand All @@ -248,11 +214,6 @@ void Optimizer::run(void)
decisionVector = runTCNOptimizer(aggregatedPairState);
}

// for(auto sleepingPipe = sleepingPipes.begin(); sleepingPipe != sleepingPipes.end(); sleepingPipe++){
// set decision for sleeping pipes to zero
// decisionVector[*sleepingPipe] = 0;
// }

boost::timer::cpu_times const elapsed(timer.elapsed());
FTS3_COMMON_LOGGER_NEWLOG(DEBUG) << "Global time elapsed: " << elapsed.system << ", " << elapsed.user << commit;

Expand Down
4 changes: 3 additions & 1 deletion src/server/services/optimizer/Optimizer.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ class OptimizerDataSource {
virtual void getTcnPipeResource(const Pair &pair, std::vector<std::string> &usedResources) = 0;

virtual void getTcnResourceSpec(const std::string &project,
std::map<std::string, double> &resourceConstraints) = 0;
std::map<std::string, double> &resourceConstraints) = 0;

// Get the weighted throughput for the pair
virtual void getThroughputInfo(const Pair &, const boost::posix_time::time_duration &,
Expand Down Expand Up @@ -216,6 +216,8 @@ class Optimizer: public boost::noncopyable {
// throughput (time multiplexing)
std::set<Pair> sleepingPipes;
time_t qosIntervalStart; // beginning of the current resource interval
// amount transferred per project per link at beginning of qosInterval
std::map<std::pair<std::string, std::string>, int64_t> initialTransferred;

// Run the optimization algorithm for the number of connections.
// Returns true if a decision is stored
Expand Down
8 changes: 5 additions & 3 deletions test/unit/server/services/optimizer/Optimizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,14 +201,16 @@ class BaseOptimizerFixture: public OptimizerDataSource, public Optimizer {
return "project0";
}

void getTcnPipeResource(const Pair &pair, std::vector<std::string> &usedResources) {
std::vector<std::string> getTcnPipeResource(const Pair &pair) {
// TODO: mock function for unit test
std::vector<std::string> retval;
return;
}

void getTcnResourceSpec(const std::string &project, std::map<std::string, double> &resourceConstraints) {
std::map<std::string, double> getTcnResourceSpec(const std::string &project) {
// TODO: mock function for unit test
return;
std::map<std::string, double> retval;
return retval;
}

void getPairLimitOnPLinks(const Pair &pair, time_t windowStart,
Expand Down