Skip to content

Commit

Permalink
Added timers to client program.
Browse files Browse the repository at this point in the history
  • Loading branch information
jgates108 committed Jan 22, 2021
1 parent 0bb1406 commit 6f0ba51
Show file tree
Hide file tree
Showing 35 changed files with 364 additions and 428 deletions.
23 changes: 18 additions & 5 deletions core/modules/loader/Central.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ class Central {

virtual ~Central();

void run();

std::string getMasterHostName() const { return _masterAddr.ip; }
int getMasterPort() const { return _masterAddr.port; }
NetworkAddress getMasterAddr() const { return _masterAddr; }
Expand All @@ -80,7 +78,8 @@ class Central {
int getErrCount() const { return _server->getErrCount(); }

/// Send the contents of 'sendBuf' to 'host:port'. This waits for the message to be
/// sent before returning. Throws boost::system::system_error on failure.
/// sent before returning.
/// @throw boost::system::system_error on failure.
void sendBufferTo(std::string const& host, int port, BufferUdp& sendBuf) {
_server->sendBufferTo(host, port, sendBuf);
}
Expand All @@ -103,19 +102,30 @@ class Central {
return doList->addItem(item);
}

/// Run the server.
void runServer() {
for (; _runningIOThreads < _iOThreads; ++_runningIOThreads) {
run();
}
}

/// Provides a method for identifying different Central classes and
/// CentralWorkers in the log file.
virtual std::string getOurLogId() const { return "Central baseclass"; }

protected:
Central(boost::asio::io_service& ioService_,
std::string const& masterHostName, int masterPort,
int threadPoolSize, int loopSleepTime)
int threadPoolSize, int loopSleepTime,
int iOThreads)
: ioService(ioService_), _masterAddr(masterHostName, masterPort),
_threadPoolSize(threadPoolSize), _loopSleepTime(loopSleepTime) {
_threadPoolSize(threadPoolSize), _loopSleepTime(loopSleepTime),
_iOThreads(iOThreads) {
_initialize();
}

void run(); ///< Run a single asio thread.

boost::asio::io_service& ioService;

DoList::Ptr doList; ///< List of items to be checked at regular intervals.
Expand Down Expand Up @@ -143,6 +153,9 @@ class Central {
std::vector<std::thread> _ioServiceThreads; ///< List of asio io threads created by this

std::thread _checkDoListThread; ///< Thread for running doList checks on DoListItems.

int _iOThreads{5}; ///< Number of asio IO threads to run, set by config file.
int _runningIOThreads{0}; ///< Number of asio IO threads started.
};

}}} // namespace lsst::qserv::loader
Expand Down
89 changes: 46 additions & 43 deletions core/modules/loader/CentralClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ namespace loader {

CentralClient::CentralClient(boost::asio::io_service& ioService_,
std::string const& hostName, ClientConfig const& cfg)
: Central(ioService_, cfg.getMasterHost(), cfg.getMasterPortUdp(), cfg.getThreadPoolSize(), cfg.getLoopSleepTime()),
: Central(ioService_, cfg.getMasterHost(), cfg.getMasterPortUdp(), cfg.getThreadPoolSize(), cfg.getLoopSleepTime(), cfg.getIOThreads()),
_hostName(hostName), _udpPort(cfg.getClientPortUdp()),
_defWorkerHost(cfg.getDefWorkerHost()),
_defWorkerPortUdp(cfg.getDefWorkerPortUdp()),
_doListMaxLookups(cfg.getMaxLookups()),
_doListMaxInserts(cfg.getMaxInserts()),
_maxRequestSleepTime(cfg.getMaxRequestSleepTime()){
_maxRequestSleepTime(cfg.getMaxRequestSleepTime()) {
}


Expand All @@ -65,45 +65,45 @@ void CentralClient::start() {
}


void CentralClient::handleKeyInfo(LoaderMsg const& inMsg, BufferUdp::Ptr const& data) {
LOGS(_log, LOG_LVL_DEBUG, "CentralClient::handleKeyInfo");
void CentralClient::handleKeyLookup(LoaderMsg const& inMsg, BufferUdp::Ptr const& data) {
LOGS(_log, LOG_LVL_DEBUG, "CentralClient::handleKeyLookup");

StringElement::Ptr sData = std::dynamic_pointer_cast<StringElement>(MsgElement::retrieve(*data));
auto const sData = std::dynamic_pointer_cast<StringElement>(MsgElement::retrieve(*data));
if (sData == nullptr) {
LOGS(_log, LOG_LVL_WARN, "CentralClient::handleKeyInsertComplete Failed to parse list");
LOGS(_log, LOG_LVL_WARN, "CentralClient::handleKeyLookup Failed to parse list");
return;
}
auto protoData = sData->protoParse<proto::KeyInfo>();
if (protoData == nullptr) {
LOGS(_log, LOG_LVL_WARN, "CentralClient::handleKeyInsertComplete Failed to parse list");
LOGS(_log, LOG_LVL_WARN, "CentralClient::handleKeyLookup Failed to parse list");
return;
}

// TODO put in separate thread
_handleKeyInfo(inMsg, protoData);
_handleKeyLookup(inMsg, protoData);
}


void CentralClient::_handleKeyInfo(LoaderMsg const& inMsg, std::unique_ptr<proto::KeyInfo>& protoBuf) {
void CentralClient::_handleKeyLookup(LoaderMsg const& inMsg, std::unique_ptr<proto::KeyInfo>& protoBuf) {
std::unique_ptr<proto::KeyInfo> protoData(std::move(protoBuf));

CompositeKey key(protoData->keyint(), protoData->keystr());
ChunkSubchunk chunkInfo(protoData->chunk(), protoData->subchunk());

LOGS(_log, LOG_LVL_DEBUG, "trying to remove oneShot for lookup key=" << key << " " << chunkInfo);
/// Locate the original one shot and mark it as done.
CentralClient::KeyInfoReqOneShot::Ptr keyInfoOneShot;
CentralClient::KeyLookupReqOneShot::Ptr keyLookupOneShot;
{
std::lock_guard<std::mutex> lck(_waitingKeyInfoMtx);
auto iter = _waitingKeyInfoMap.find(key);
if (iter == _waitingKeyInfoMap.end()) {
LOGS(_log, LOG_LVL_WARN, "handleKeyInfoComplete could not find key=" << key);
std::lock_guard<std::mutex> lck(_waitingKeyLookupMtx);
auto iter = _waitingKeyLookupMap.find(key);
if (iter == _waitingKeyLookupMap.end()) {
LOGS(_log, LOG_LVL_WARN, "_handleKeyLookup could not find key=" << key);
return;
}
keyInfoOneShot = iter->second;
_waitingKeyInfoMap.erase(iter);
keyLookupOneShot = iter->second;
_waitingKeyLookupMap.erase(iter);
}
keyInfoOneShot->keyInfoComplete(key, chunkInfo.chunk, chunkInfo.subchunk, protoData->success());
keyLookupOneShot->keyInfoComplete(key, chunkInfo.chunk, chunkInfo.subchunk, protoData->success());
LOGS(_log, LOG_LVL_INFO, "Successfully found key=" << key << " " << chunkInfo);
}

Expand Down Expand Up @@ -139,14 +139,14 @@ void CentralClient::_handleKeyInsertComplete(LoaderMsg const& inMsg, std::unique
size_t mapSize;
{
std::lock_guard<std::mutex> lck(_waitingKeyInsertMtx);
mapSize = _waitingKeyInsertMap.size();
auto iter = _waitingKeyInsertMap.find(key);
if (iter == _waitingKeyInsertMap.end()) {
LOGS(_log, LOG_LVL_WARN, "handleKeyInsertComplete could not find key=" << key);
return;
}
keyInsertOneShot = iter->second;
_waitingKeyInsertMap.erase(iter);
mapSize = _waitingKeyInsertMap.size();
}
keyInsertOneShot->keyInsertComplete();
LOGS(_log, LOG_LVL_INFO, "Successfully inserted key=" << key << " " << chunkInfo <<
Expand Down Expand Up @@ -178,6 +178,7 @@ KeyInfoData::Ptr CentralClient::keyInsertReq(CompositeKey const& key, int chunk,
LOGS(_log, LOG_LVL_INFO, "keyInsertReq waiting key=" << key <<
"size=" << sz << " loopCount=" << loopCount);
}
// Let the CPU do something else while waiting for some requests to finish.
usleep(_maxRequestSleepTime);
++loopCount;
lck.lock();
Expand Down Expand Up @@ -230,60 +231,64 @@ void CentralClient::_keyInsertReq(CompositeKey const& key, int chunk, int subchu
strElem.appendToData(msgData);
try {
sendBufferTo(getDefWorkerHost(), getDefWorkerPortUdp(), msgData);
} catch (boost::system::system_error e) {
} catch (boost::system::system_error const& e) {
LOGS(_log, LOG_LVL_ERROR, "CentralClient::_keyInsertReq boost system_error=" << e.what() <<
" key=" << key << " chunk=" << chunk << " sub=" << subchunk);
exit(-1); // TODO:&&& The correct course of action is unclear and requires thought,
// so just blow up so it's unmistakable something bad happened for now.
}
}


KeyInfoData::Ptr CentralClient::keyInfoReq(CompositeKey const& key) {
KeyInfoData::Ptr CentralClient::keyLookupReq(CompositeKey const& key) {
// Returns a pointer to a Tracker object that can be used to track job
// completion and job status. keyInsertOneShot will call _keyInsertReq until
// it knows the task was completed. _handleKeyInfoComplete marks
// the jobs complete as the messages come in from workers.
// Insert a oneShot DoListItem to keep trying to add the key until
// we get word that it has been added successfully.
LOGS(_log, LOG_LVL_INFO, "Trying to lookup key=" << key);
auto keyInfoOneShot = std::make_shared<CentralClient::KeyInfoReqOneShot>(this, key);
auto keyLookupOneShot = std::make_shared<CentralClient::KeyLookupReqOneShot>(this, key);
{
std::unique_lock<std::mutex> lck(_waitingKeyInfoMtx);
std::unique_lock<std::mutex> lck(_waitingKeyLookupMtx);
// Limit the number of concurrent lookups.
// If the key is already in the map, there is no point in blocking.
int loopCount = 0;
auto iter = _waitingKeyInfoMap.find(key);
while (_waitingKeyInfoMap.size() > _doListMaxLookups
&& iter == _waitingKeyInfoMap.end()) {
size_t sz = _waitingKeyInfoMap.size();
uint64_t sleptForMicroSec = 0;
uint64_t const tenSec = 10000000;
auto iter = _waitingKeyLookupMap.find(key);
while (_waitingKeyLookupMap.size() > _doListMaxLookups
&& iter == _waitingKeyLookupMap.end()) {
size_t sz = _waitingKeyLookupMap.size();
lck.unlock();
if (loopCount % 100 == 0) {
// Log a message about this about once every 10 seconds.
if (sleptForMicroSec > tenSec) sleptForMicroSec = 0;
if (sleptForMicroSec == 0) {
LOGS(_log, LOG_LVL_INFO, "keyInfoReq waiting key=" << key <<
"size=" << sz << " loopCount=" << loopCount);
}
// Let the CPU do something else while waiting for some requests to finish.
usleep(_maxRequestSleepTime);
sleptForMicroSec += _maxRequestSleepTime;
++loopCount;
lck.lock();
iter = _waitingKeyInfoMap.find(key);
iter = _waitingKeyLookupMap.find(key);
}

// Use the existing lookup, if there is one.
if (iter != _waitingKeyInfoMap.end()) {
if (iter != _waitingKeyLookupMap.end()) {
auto cData = iter->second->cmdData;
return cData;
}

_waitingKeyInfoMap[key] = keyInfoOneShot;
_waitingKeyLookupMap[key] = keyLookupOneShot;
}
runAndAddDoListItem(keyInfoOneShot);
return keyInfoOneShot->cmdData;
runAndAddDoListItem(keyLookupOneShot);
return keyLookupOneShot->cmdData;
}


void CentralClient::_keyInfoReq(CompositeKey const& key) {
LOGS(_log, LOG_LVL_INFO, "CentralClient::_keyInfoReq trying key=" << key);
LoaderMsg msg(LoaderMsg::KEY_INFO_REQ, getNextMsgId(), getHostName(), getUdpPort());
void CentralClient::_keyLookupReq(CompositeKey const& key) {
LOGS(_log, LOG_LVL_INFO, "CentralClient::_keyLookupReq trying key=" << key);
LoaderMsg msg(LoaderMsg::KEY_LOOKUP_REQ, getNextMsgId(), getHostName(), getUdpPort());
BufferUdp msgData;
msg.appendToData(msgData);
// create the proto buffer
Expand All @@ -305,11 +310,9 @@ void CentralClient::_keyInfoReq(CompositeKey const& key) {

try {
sendBufferTo(getDefWorkerHost(), getDefWorkerPortUdp(), msgData);
} catch (boost::system::system_error e) {
} catch (boost::system::system_error const& e) {
LOGS(_log, LOG_LVL_ERROR, "CentralClient::_keyInfoReq boost system_error=" << e.what() <<
" key=" << key);
exit(-1); // TODO:&&& The correct course of action is unclear and requires thought.
// So just blow up so it's unmistakable something bad happened for now.
}
}

Expand Down Expand Up @@ -341,11 +344,11 @@ void CentralClient::KeyInsertReqOneShot::keyInsertComplete() {
}


util::CommandTracked::Ptr CentralClient::KeyInfoReqOneShot::createCommand() {
util::CommandTracked::Ptr CentralClient::KeyLookupReqOneShot::createCommand() {
struct KeyInfoReqCmd : public util::CommandTracked {
KeyInfoReqCmd(KeyInfoData::Ptr& cd, CentralClient* cent_) : cData(cd), cent(cent_) {}
void action(util::CmdData*) override {
cent->_keyInfoReq(cData->key);
cent->_keyLookupReq(cData->key);
}
KeyInfoData::Ptr cData;
CentralClient* cent;
Expand All @@ -354,7 +357,7 @@ util::CommandTracked::Ptr CentralClient::KeyInfoReqOneShot::createCommand() {
}


void CentralClient::KeyInfoReqOneShot::keyInfoComplete(CompositeKey const& key,
void CentralClient::KeyLookupReqOneShot::keyInfoComplete(CompositeKey const& key,
int chunk, int subchunk, bool success) {
if (key == cmdData->key) {
cmdData->chunk = chunk;
Expand Down
32 changes: 15 additions & 17 deletions core/modules/loader/CentralClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ class CentralClient : public Central {

void start();


~CentralClient() override = default;

std::string const& getHostName() const { return _hostName; }
Expand All @@ -86,8 +85,6 @@ class CentralClient : public Central {
/// @return the default worker's UDP port
int getDefWorkerPortUdp() const { return _defWorkerPortUdp; }



/// Asynchronously request a key value insert to the workers.
/// This can block if too many key insert requests are already in progress.
/// @return - a KeyInfoData object for checking the job's status or
Expand All @@ -101,18 +98,18 @@ class CentralClient : public Central {
/// Asynchronously request a key value lookup from the workers. It returns a
/// KeyInfoData object to be used to track job status and get the value of the key.
/// This can block if too many key lookup requests are already in progress.
KeyInfoData::Ptr keyInfoReq(CompositeKey const& key);
KeyInfoData::Ptr keyLookupReq(CompositeKey const& key);
/// Handle a workers response to the keyInfoReq call.
void handleKeyInfo(LoaderMsg const& inMsg, BufferUdp::Ptr const& data);
void handleKeyLookup(LoaderMsg const& inMsg, BufferUdp::Ptr const& data);

std::string getOurLogId() const override { return "client"; }

private:
void _keyInsertReq(CompositeKey const& key, int chunk, int subchunk); ///< see keyInsertReq()
void _handleKeyInsertComplete(LoaderMsg const& inMsg, std::unique_ptr<proto::KeyInfo>& protoBuf);

void _keyInfoReq(CompositeKey const& key); ///< see keyInfoReq()
void _handleKeyInfo(LoaderMsg const& inMsg, std::unique_ptr<proto::KeyInfo>& protoBuf);
void _keyLookupReq(CompositeKey const& key); ///< see keyLookReq()
void _handleKeyLookup(LoaderMsg const& inMsg, std::unique_ptr<proto::KeyInfo>& protoBuf);



Expand All @@ -136,12 +133,12 @@ class CentralClient : public Central {
CentralClient* central;
};

/// Create commands to find a key in the index and get its value.
/// Create commands to lookup a key in the index and get its value.
/// It should keep trying this until it works and then drop it from _waitingKeyInfoMap.
struct KeyInfoReqOneShot : public DoListItem {
using Ptr = std::shared_ptr<KeyInfoReqOneShot>;
struct KeyLookupReqOneShot : public DoListItem {
using Ptr = std::shared_ptr<KeyLookupReqOneShot>;

KeyInfoReqOneShot(CentralClient* central_, CompositeKey const& key_) :
KeyLookupReqOneShot(CentralClient* central_, CompositeKey const& key_) :
cmdData(std::make_shared<KeyInfoData>(key_, -1, -1)), central(central_) { setOneShot(true); }

util::CommandTracked::Ptr createCommand() override;
Expand All @@ -164,15 +161,16 @@ class CentralClient : public Central {
const int _defWorkerPortUdp; ///< Default worker UDP port


size_t _doListMaxLookups{1000}; ///< Maximum number of concurrent lookups in DoList (set by config)
size_t _doListMaxInserts{1000}; ///< Maximum number of concurrent inserts in DoList (set by config)
int _maxRequestSleepTime{100000}; ///< Time to sleep between checking requests when at max length (set by config)
size_t const _doListMaxLookups = 1000; ///< Maximum number of concurrent lookups in DoList, set by config
size_t const _doListMaxInserts = 1000; ///< Maximum number of concurrent inserts in DoList, set by config
/// Time to sleep between checking requests when at max length, set by config
int const _maxRequestSleepTime = 100000;

std::map<CompositeKey, KeyInsertReqOneShot::Ptr> _waitingKeyInsertMap;
std::map<CompositeKey, KeyInsertReqOneShot::Ptr> _waitingKeyInsertMap; ///< Map of current insert requests.
std::mutex _waitingKeyInsertMtx; ///< protects _waitingKeyInsertMap, _doListMaxInserts

std::map<CompositeKey, KeyInfoReqOneShot::Ptr> _waitingKeyInfoMap; // &&& change all references of keyInfo to keyLookup &&&, including protobuf keyInfo should only apply to worker key count and worker key range.
std::mutex _waitingKeyInfoMtx; ///< protects _waitingKeyInfoMap, _doListMaxLookups
std::map<CompositeKey, KeyLookupReqOneShot::Ptr> _waitingKeyLookupMap; ///< Map of current look up requests.
std::mutex _waitingKeyLookupMtx; ///< protects _waitingKeyLookMap, _doListMaxLookups
};

}}} // namespace lsst::qserv::loader
Expand Down
10 changes: 3 additions & 7 deletions core/modules/loader/CentralMaster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ void CentralMaster::addWorker(std::string const& ip, int udpPort, int tcpPort) {
}


void CentralMaster::updateWorkerInfo(uint32_t workerId, NeighborsInfo const& nInfo, StringRange const& strRange) {
void CentralMaster::updateWorkerInfo(uint32_t workerId, NeighborsInfo const& nInfo, KeyRange const& strRange) {
if (workerId == 0) {
return;
}
Expand Down Expand Up @@ -99,12 +99,10 @@ void CentralMaster::setWorkerNeighbor(MWorkerListItem::WPtr const& target, int m
auto addr = targetWorker->getUdpAddress();
try {
sendBufferTo(addr.ip, addr.port, msgData);
} catch (boost::system::system_error e) {
} catch (boost::system::system_error const& e) {
LOGS(_log, LOG_LVL_ERROR, "CentralMaster::setWorkerNeighbor boost system_error=" << e.what() <<
" targ=" << *targetWorker << " msg=" << message <<
" neighborId=" << neighborId);
exit(-1); // TODO:&&& The correct course of action is unclear and requires thought,
// so just blow up so it's unmistakable something bad happened for now.
}
}

Expand Down Expand Up @@ -206,12 +204,10 @@ void CentralMaster::reqWorkerKeysInfo(uint64_t msgId, std::string const& targetI
reqMsg.appendToData(data);
try {
sendBufferTo(targetIp, targetPort, data);
} catch (boost::system::system_error e) {
} catch (boost::system::system_error const& e) {
LOGS(_log, LOG_LVL_ERROR, "CentralMaster::reqWorkerKeysInfo boost system_error=" << e.what() <<
" msgId=" << msgId << " tIp=" << targetIp << " tPort=" << targetPort <<
" ourHost=" << ourHostName << " ourPort=" << ourPort);
exit(-1); // TODO:&&& The correct course of action is unclear and requires thought,
// so just blow up so it's unmistakable something bad happened for now.
}
}

Expand Down
Loading

0 comments on commit 6f0ba51

Please sign in to comment.