From b08141fae094479b313a3fdb30ed4c35b4b4bb64 Mon Sep 17 00:00:00 2001 From: Tyill Date: Thu, 17 Oct 2019 07:18:56 +0500 Subject: [PATCH] snEngine -reduced number of workflows, relevant for deep networks --- src/snEngine/snEngine.h | 3 +- src/snEngine/src/snEngine.cpp | 196 +++++++++++++--------------------- src/snEngine/src/threadPool.h | 91 ++++++++++++---- 3 files changed, 147 insertions(+), 143 deletions(-) diff --git a/src/snEngine/snEngine.h b/src/snEngine/snEngine.h index 926842c..eba3d4d 100644 --- a/src/snEngine/snEngine.h +++ b/src/snEngine/snEngine.h @@ -59,7 +59,8 @@ namespace SN_Eng{ std::function stsCBack_ = nullptr; - ThreadPool* thrPoolForward_ = nullptr, *thrPoolBackward_ = nullptr; + ThreadPool* thrPoolForward_ = nullptr, + * thrPoolBackward_ = nullptr; bool fWorkEnd_ = false; ///< закрытие всех потоков SN_Base::operationParam operParam_; ///< параметры тек итерации diff --git a/src/snEngine/src/snEngine.cpp b/src/snEngine/src/snEngine.cpp index f097deb..121dcd3 100644 --- a/src/snEngine/src/snEngine.cpp +++ b/src/snEngine/src/snEngine.cpp @@ -48,12 +48,14 @@ namespace SN_Eng{ /// создание потоков для forward void SNEngine::createThreadsFwd(std::map& nodes, ThreadPool* thrPool){ - set thrExist; + set nodeExist; for (auto& n : nodes){ if (n.second.oprName == "Input"){ + thrPool->addNode(n.first); - thrExist.insert(n.first); + thrPool->addThread(n.first); + nodeExist.insert(n.first); ndStates_[n.first].parentFW = n.first; } @@ -61,25 +63,25 @@ namespace SN_Eng{ for (auto& n : nodes){ - if (thrExist.find(n.first) != thrExist.end()) continue; + if (nodeExist.find(n.first) != nodeExist.end()) continue; if (n.second.nextNodes.size() > 1){ for (auto& nn : n.second.nextNodes){ - if (thrExist.find(nn) != thrExist.end()) continue; + if (nodeExist.find(nn) != nodeExist.end()) continue; thrPool->addNode(nn); - thrExist.insert(nn); + nodeExist.insert(nn); } } if (n.second.prevNodes.size() > 1){ thrPool->addNode(n.first); - thrExist.insert(n.first); + nodeExist.insert(n.first); } else{ if (nodes[n.second.prevNodes[0]].nextNodes.size() > 1){ thrPool->addNode(n.first); - thrExist.insert(n.first); + nodeExist.insert(n.first); } } } @@ -109,22 +111,26 @@ namespace SN_Eng{ } /// подождем пока все запустятся - for (auto& thr : thrExist){ - thrPool->waitExist(thr); - } + for (auto& n : nodes){ + if (n.second.oprName == "Input"){ + thrPool->waitExist(n.first); + } + } } /// создание потоков для backward void SNEngine::createThreadsBwd(std::map& nodes, ThreadPool* thrPool){ - set thrExist; + set nodeExist; operParam_.action = SN_Base::snAction::backward; for (auto& n : nodes){ if (n.second.oprName == "Output"){ + thrPool->addNode(n.first); - thrExist.insert(n.first); + thrPool->addThread(n.first); + nodeExist.insert(n.first); ndStates_[n.first].parentBW = n.first; } @@ -132,25 +138,25 @@ namespace SN_Eng{ for (auto& n : nodes){ - if (thrExist.find(n.first) != thrExist.end()) continue; + if (nodeExist.find(n.first) != nodeExist.end()) continue; if (n.second.prevNodes.size() > 1){ for (auto& nn : n.second.prevNodes){ - if (thrExist.find(nn) != thrExist.end()) continue; + if (nodeExist.find(nn) != nodeExist.end()) continue; thrPool->addNode(nn); - thrExist.insert(nn); + nodeExist.insert(nn); } } if (n.second.nextNodes.size() > 1){ thrPool->addNode(n.first); - thrExist.insert(n.first); + nodeExist.insert(n.first); } else{ if (nodes[n.second.nextNodes[0]].prevNodes.size() > 1){ thrPool->addNode(n.first); - thrExist.insert(n.first); + nodeExist.insert(n.first); } } } @@ -180,8 +186,11 @@ namespace SN_Eng{ } /// подождем пока все запустятся - for (auto& thr : thrExist){ - thrPool->waitExist(thr); + for (auto& n : nodes){ + if (n.second.oprName == "Output"){ + + thrPool->waitExist(n.first); + } } } @@ -192,11 +201,6 @@ namespace SN_Eng{ nodes_ = brNet.nodes; for (auto& n : brNet.nodes) ndStates_[n.first] = ndState(); - - // только для fwd, поскольку назад может и не пойдем - thrPoolForward_ = new ThreadPool(bind(&SNEngine::operatorThreadForward, this, std::placeholders::_1)); - - createThreadsFwd(brNet.nodes, thrPoolForward_); } SNEngine::~SNEngine(){ @@ -211,6 +215,11 @@ namespace SN_Eng{ operParam_ = operPrm; + if (!thrPoolForward_){ + thrPoolForward_ = new ThreadPool(bind(&SNEngine::operatorThreadForward, this, std::placeholders::_1)); + createThreadsFwd(nodes_, thrPoolForward_); + } + /// предварительно установим готовность thrPoolForward_->preStartAll(); for (auto& n : nodes_) @@ -265,7 +274,7 @@ namespace SN_Eng{ if (prevNodes.size() == 1){ /// выполнение оператора - auto& pn = prevNodes[0]; SN_ENG_DMESS("node " + nname + " actionForward single prevNode " + pn) + auto& pn = prevNodes[0]; SN_ENG_DMESS("node " + nname + " actionForward single prevNode " + pn) ndStates_[nname].selectNextNodes = operats_[nname]->Do(operParam_, vector{operats_[pn]}); } else{ @@ -275,9 +284,8 @@ namespace SN_Eng{ for (auto& n : prevNodes){ string& firts = ndStates_[n].parentFW; - - SN_ENG_DMESS("node " + nname + " waitFinish thread " + firts) - thrPoolForward_->waitFinish(firts); + + thrPoolForward_->waitFinish(firts); SN_ENG_DMESS("node " + nname + " waitFinish thread " + firts) } /// проверим, что предыд оператор выбирал этот узел @@ -346,8 +354,7 @@ namespace SN_Eng{ /// сброс готовности старта для след-х узлов void SNEngine::resetPreStartNode(std::map& nodes, const std::string& nname){ - - + if (nodes[nname].prevNodes.size() == 1) thrPoolForward_->resetPrestart(ndStates_[nname].parentFW); else{ @@ -363,42 +370,6 @@ namespace SN_Eng{ if (allPrevNoPrest) thrPoolForward_->resetPrestart(nname); - } - - if (thrPoolForward_->isPrestart(ndStates_[nname].parentFW)) return; - - /// для всех след узлов тоже сбрас готовность - vector nextNodes{ nname }; - while (!nextNodes.empty()){ - - string snd = nextNodes.back(); - nextNodes.pop_back(); - - if (nodes[snd].nextNodes.empty()) continue; - - for (auto& nn : nodes[snd].nextNodes){ - - /// у след узла только один предыд? добавляем в пул для сброса готовности - if (nodes[nn].prevNodes.size() == 1){ - thrPoolForward_->resetPrestart(nn); - nextNodes.push_back(nn); - } - else{ - - /// все пред узлы не готовы к запуску? - bool allPrevNoPrest = true; - for (auto& n : nodes[nn].prevNodes){ - if (thrPoolForward_->isPrestart(ndStates_[n].parentFW)) - allPrevNoPrest = false; - } - - /// добавляем в пул для сброса готовности - if (allPrevNoPrest){ - thrPoolForward_->resetPrestart(nn); - nextNodes.push_back(nn); - } - } - } } } @@ -419,10 +390,7 @@ namespace SN_Eng{ if (!ndStates_[nname].selectNextNodes.empty() && (ndStates_[nname].selectNextNodes[0] == "noWay")){ - resetPreStartNode(nodes, nn); - - /// откат в начало - selWay = nnameMem; + resetPreStartNode(nodes, nn); /// тек поток тормозим thrPoolForward_->finish(nnameMem); SN_ENG_DMESS("node " + nname + " finish thread " + nnameMem) @@ -434,24 +402,13 @@ namespace SN_Eng{ selWay = nn; SN_ENG_DMESS("node " + nname + " selWay " + nn) } else{ - /// новый поток для след узла - if (!thrPoolForward_->isRun(nn)){ - thrPoolForward_->startTask(nn); SN_ENG_DMESS("node " + nname + " start thread " + nn) - } - - /// откат в начало - selWay = nnameMem; - - /// тек поток тормозим - thrPoolForward_->finish(nnameMem); SN_ENG_DMESS("node " + nname + " finish thread " + nnameMem) + /// рестарт в том же потоке + thrPoolForward_->restartTask(nnameMem, nn); SN_ENG_DMESS("node " + nname + " restart thread " + nn) } } /// конец пути? else if (nextNodes.empty()){ - - /// откат в начало - selWay = nnameMem; - + /// тек поток тормозим thrPoolForward_->finish(nnameMem); SN_ENG_DMESS("node " + nname + " finish thread " + nnameMem) } @@ -474,16 +431,20 @@ namespace SN_Eng{ /// ответвления расталкиваем по другим потокам for (auto& nw : nextWays){ - if (!thrPoolForward_->isRun(nw)){ - thrPoolForward_->startTask(nw); SN_ENG_DMESS("node " + nname + " start thread " + nw) + + /// продолжение пути оставляем в этом же потоке + if (selWay.empty()){ + selWay = nw; SN_ENG_DMESS("node " + nname + " selWay " + nw) + continue; } - } - /// откат в начало - selWay = nnameMem; + thrPoolForward_->startTask(nw); SN_ENG_DMESS("node " + nname + " start thread " + nw) + } - /// тек поток тормозим - thrPoolForward_->finish(nnameMem); SN_ENG_DMESS("node " + nname + " finish thread " + nnameMem) + if (nextWays.empty()){ + /// тек поток тормозим + thrPoolForward_->finish(nnameMem); SN_ENG_DMESS("node " + nname + " finish thread " + nnameMem) + } } return selWay; @@ -505,10 +466,7 @@ namespace SN_Eng{ /// дальше идти? if (!ndStates_[pn].selectNextNodes.empty() && (ndStates_[pn].selectNextNodes[0] == "noWay")){ - - /// откат в начало - selWay = nnameMem; - + /// тек поток тормозим thrPoolBackward_->finish(nnameMem); SN_ENG_DMESS("node " + nname + " finish thread " + nnameMem) } @@ -519,23 +477,13 @@ namespace SN_Eng{ selWay = pn; SN_ENG_DMESS("node " + nname + " selWay " + pn) } else{ - /// новый поток для след узла - if (!thrPoolBackward_->isRun(pn)){ - thrPoolBackward_->startTask(pn); SN_ENG_DMESS("node " + nname + " start thread " + pn) - } - /// откат в начало - selWay = nnameMem; - - /// тек поток тормозим - thrPoolBackward_->finish(nnameMem); SN_ENG_DMESS("node " + nname + " finish thread " + nnameMem) + /// рестарт в том же потоке + thrPoolBackward_->restartTask(nnameMem, pn); SN_ENG_DMESS("node " + nname + " restart thread " + pn) } } /// конец пути? else if (prevNodes.empty()){ - - /// откат в начало - selWay = nnameMem; - + /// тек поток тормозим thrPoolBackward_->finish(nnameMem); SN_ENG_DMESS("node " + nname + " finish thread " + nnameMem) } @@ -553,16 +501,20 @@ namespace SN_Eng{ /// ответвления расталкиваем по другим потокам for (auto& nw : nextWays){ - if (!thrPoolBackward_->isRun(nw)){ - thrPoolBackward_->startTask(nw); SN_ENG_DMESS("node " + nname + " start thread " + nw) + + /// продолжение пути оставляем в этом же потоке + if (selWay.empty()){ + selWay = nw; SN_ENG_DMESS("node " + nname + " selWay " + pn) + continue; } - } - /// откат в начало - selWay = nnameMem; + thrPoolBackward_->startTask(nw); SN_ENG_DMESS("node " + nname + " start thread " + nw) + } - /// тек поток тормозим - thrPoolBackward_->finish(nnameMem); SN_ENG_DMESS("node " + nname + " finish thread " + nnameMem) + if (nextWays.empty()){ + /// тек поток тормозим + thrPoolBackward_->finish(nnameMem); SN_ENG_DMESS("node " + nname + " finish thread " + nnameMem) + } } return selWay; @@ -572,13 +524,16 @@ namespace SN_Eng{ void SNEngine::operatorThreadForward(std::string nname){ std::string nnameMem = nname; - + + nname = ""; + auto& nodes = nodes_; while (!fWorkEnd_){ - + /// ждем след итерацию - thrPoolForward_->waitStart(nnameMem); + if (nname.empty()) + nname = thrPoolForward_->waitStart(nnameMem); /// обработка текущего узла actionForward(nodes, nname); @@ -593,12 +548,15 @@ namespace SN_Eng{ std::string nnameMem = nname; + nname = ""; + auto& nodes = nodes_; while (!fWorkEnd_){ /// ждем след итерацию - thrPoolBackward_->waitStart(nnameMem); + if (nname.empty()) + nname = thrPoolBackward_->waitStart(nnameMem); /// обработка текущего узла actionBackward(nodes, nname); diff --git a/src/snEngine/src/threadPool.h b/src/snEngine/src/threadPool.h index 9af3d14..b5bead9 100644 --- a/src/snEngine/src/threadPool.h +++ b/src/snEngine/src/threadPool.h @@ -51,26 +51,65 @@ class ThreadPool { delete r.second; } - void addNode(const std::string& node){ + void addThread(const std::string& node){ std::lock_guard lk(mtx_); + if (fWorkEnd_) return; + + threads_[node] = new std::thread(func_, node); + } + + void addNode(const std::string& node){ + std::lock_guard lk(mtx_); + if (fWorkEnd_) return; ready_[node] = new SReady(); - threads_[node] = new std::thread(func_, node); } void startTask(const std::string& node){ std::lock_guard lk(mtx_); - if (fWorkEnd_) return; + if (fWorkEnd_ || ready_[node]->isRun()) return; + + for (auto& thr : threads_){ + if (!ready_[thr.first]->isRun()){ + + ready_[thr.first]->start(node); + ready_[node]->start(node); + + return; + } + } - ready_[node]->start(); - } + threads_[node] = new std::thread(func_, node); + + ready_[node]->exist(); + ready_[node]->start(node); + } + + void restartTask(const std::string& thr, const std::string& node){ + std::lock_guard lk(mtx_); + + if (fWorkEnd_) return; + + auto workNode = ready_[thr]->getWorkNode(); + if (!workNode.empty()) + ready_[workNode]->finish(); + + if (!ready_[node]->isRun()){ + ready_[thr]->start(node); + ready_[node]->start(node); + } + } void finish(const std::string& node){ std::lock_guard lk(mtx_); + auto workNode = ready_[node]->getWorkNode(); + if (!workNode.empty()) + ready_[workNode]->finish(); + ready_[node]->finish(); } @@ -82,17 +121,18 @@ class ThreadPool { } } - void waitStart(const std::string& node){ - if (fWorkEnd_) return; + std::string waitStart(const std::string& node){ + if (fWorkEnd_) return node; - ready_[node]->exist(); + if (!ready_[node]->isExist()) + ready_[node]->exist(); - ready_[node]->waitStart(); + return ready_[node]->waitStart(); } void waitFinish(const std::string& node){ if (fWorkEnd_) return; - + ready_[node]->waitFinish(); } @@ -101,15 +141,7 @@ class ThreadPool { ready_[node]->waitExist(); } - - bool isRun(const std::string& node){ - std::lock_guard lk(mtx_); - - bool isRn = ready_[node]->isRun() || !ready_[node]->isPrestart(); - - return isRn; - } - + bool isPrestart(const std::string& node){ std::lock_guard lk(mtx_); @@ -142,16 +174,17 @@ class ThreadPool { end(); } - void waitStart() { + std::string waitStart() { std::unique_lock lk(lkStart_); if (!end_ && !run_){ cvrStart_.wait(lk); } + return workNode_; } void waitFinish() { std::unique_lock lk(lkFinish_); - if (!end_ && (run_ || preStart_)){ + if (!end_ && preStart_){ cvrFinish_.wait(lk); } } @@ -163,9 +196,10 @@ class ThreadPool { } } - void start(){ + void start(const std::string& workNode){ std::lock_guard lk(lkStart_); - if (!run_){ + workNode_ = workNode; + if (!run_){ run_ = true; cvrStart_.notify_all(); } @@ -206,6 +240,11 @@ class ThreadPool { cvrFinish_.notify_all(); } + bool isExist(){ + + return isExist_; + } + bool isRun(){ return run_; @@ -216,7 +255,13 @@ class ThreadPool { return preStart_; } + std::string getWorkNode(){ + + return workNode_; + } + private: + std::string workNode_; std::mutex lkStart_, lkFinish_, lkExist_; std::condition_variable cvrStart_, cvrFinish_, cvrExist_; bool run_ = false, preStart_ = false, isExist_ = false, end_ = false;