diff --git a/admin/templates/configuration/etc/log4cxx.index.properties b/admin/templates/configuration/etc/log4cxx.index.properties index 8081a99e68..b34769a14e 100644 --- a/admin/templates/configuration/etc/log4cxx.index.properties +++ b/admin/templates/configuration/etc/log4cxx.index.properties @@ -5,8 +5,9 @@ # export LSST_LOG_CONFIG=$HOME/.lsst/log4cxx.unittest.properties # -#log4j.rootLogger=INFO, CONSOLE -log4j.rootLogger=DEBUG, CONSOLE +log4j.rootLogger=INFO, CONSOLE +#log4j.rootLogger=DEBUG, CONSOLE +#log4j.rootLogger=WARN, CONSOLE log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout diff --git a/admin/templates/configuration/etc/log4cxx.index_master.properties b/admin/templates/configuration/etc/log4cxx.index_master.properties new file mode 100644 index 0000000000..0a3b0c6da5 --- /dev/null +++ b/admin/templates/configuration/etc/log4cxx.index_master.properties @@ -0,0 +1,18 @@ +# +# Configuration file for log4cxx +# can be used for unit test +# by launching next command before unit tests: +# export LSST_LOG_CONFIG=$HOME/.lsst/log4cxx.unittest.properties +# + +#log4j.rootLogger=INFO, CONSOLE +log4j.rootLogger=DEBUG, CONSOLE +#log4j.rootLogger=WARN, CONSOLE + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +#log4j.appender.CONSOLE.layout.ConversionPattern=[%d{yyyy-MM-ddTHH:mm:ss.SSSZ}] [%t] %-5p %c{2} (%F:%L) - %m%n +log4j.appender.CONSOLE.layout.ConversionPattern=[%d{ddTHH:mm:ss.SSSZ}] [%t] %-5p %c{2} (%F:%L) - %m%n + +# Tune log at the module level +#log4j.logger.lsst.qserv.util=DEBUG diff --git a/admin/tools/docker/index/container/dev/README b/admin/tools/docker/index/container/dev/README index 56fe47d4ae..40983c1967 100644 --- a/admin/tools/docker/index/container/dev/README +++ b/admin/tools/docker/index/container/dev/README @@ -6,8 +6,6 @@ # These are the optional building steps (in case if this container needs to # be rebuilt from scratch (which is going to take ~2 hours or longer) -#docker build -t qserv/replica:dev . &&& -#docker push qserv/replica:dev &&& docker build -t qserv/loaderbase:dev . docker push qserv/loaderbase:dev diff --git a/admin/tools/docker/index/container/dev/master/appMaster.bash b/admin/tools/docker/index/container/dev/master/appMaster.bash index 8959dc1a1a..3054ad04a0 100755 --- a/admin/tools/docker/index/container/dev/master/appMaster.bash +++ b/admin/tools/docker/index/container/dev/master/appMaster.bash @@ -13,7 +13,7 @@ source /qserv/stack/loadLSST.bash cd /home/qserv/dev/qserv setup -r . -t qserv-dev -export LSST_LOG_CONFIG=/home/qserv/dev/qserv/admin/templates/configuration/etc/log4cxx.index.properties +export LSST_LOG_CONFIG=/home/qserv/dev/qserv/admin/templates/configuration/etc/log4cxx.index_master.properties /home/qserv/dev/qserv/build/loader/appMaster /home/qserv/dev/qserv/core/modules/loader/config/master.cnf diff --git a/admin/tools/docker/index/index-k8-10m.yaml b/admin/tools/docker/index/index-k8-10m.yaml index 43de392489..603f80bb1f 100644 --- a/admin/tools/docker/index/index-k8-10m.yaml +++ b/admin/tools/docker/index/index-k8-10m.yaml @@ -1,5 +1,3 @@ -apiVersion: v1 -kind: Service metadata: name: imaster-svc labels: @@ -61,7 +59,7 @@ metadata: spec: serviceName: iworker-svc podManagementPolicy: Parallel - replicas: 3 + replicas: 14 selector: matchLabels: app: iworker-pod @@ -116,7 +114,7 @@ spec: - name: iclientnum-ctr image: qserv/indexclientnum:dev imagePullPolicy: Always - args: ["1000000", "1", "client-k8s-a1.cnf"] + args: ["10000000", "1", "client-k8s-a1.cnf"] ports: - containerPort: 10050 protocol: UDP @@ -157,7 +155,7 @@ spec: - name: iclientnum2-ctr image: qserv/indexclientnum:dev imagePullPolicy: Always - args: ["2000001", "3000001", "client-k8s-a2.cnf"] + args: ["20000001", "30000001", "client-k8s-a2.cnf"] ports: - containerPort: 10050 protocol: UDP @@ -198,9 +196,218 @@ spec: - name: iclientnum3-ctr image: qserv/indexclientnum:dev imagePullPolicy: Always - args: ["1000001", "2000000", "client-k8s-a3.cnf"] + args: ["10000001", "20000000", "client-k8s-a3.cnf"] + ports: + - containerPort: 10050 + protocol: UDP + +--- +apiVersion: v1 +kind: Service +metadata: + name: iclientnum4-svc + labels: + app: index +spec: + ports: + - port: 10050 + protocol: UDP + clusterIP: None + selector: + app: iclientnum4-pod +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: iclientnum4-sts + labels: + app: index +spec: + serviceName: iclientnum4-svc + podManagementPolicy: Parallel + replicas: 1 + selector: + matchLabels: + app: iclientnum4-pod + template: + metadata: + labels: + app: iclientnum4-pod + spec: + containers: + - name: iclientnum3-ctr + image: qserv/indexclientnum:dev + imagePullPolicy: Always + args: ["40000001", "50000000", "client-k8s-a1.cnf"] ports: - containerPort: 10050 - protocol: UDP + protocol: UDP +--- +apiVersion: v1 +kind: Service +metadata: + name: iclientnum5-svc + labels: + app: index +spec: + ports: + - port: 10050 + protocol: UDP + clusterIP: None + selector: + app: iclientnum5-pod +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: iclientnum5-sts + labels: + app: index +spec: + serviceName: iclientnum5-svc + podManagementPolicy: Parallel + replicas: 1 + selector: + matchLabels: + app: iclientnum5-pod + template: + metadata: + labels: + app: iclientnum5-pod + spec: + containers: + - name: iclientnum5-ctr + image: qserv/indexclientnum:dev + imagePullPolicy: Always + args: ["50000001", "60000000", "client-k8s-a1.cnf"] + ports: + - containerPort: 10050 + protocol: UDP + +--- +apiVersion: v1 +kind: Service +metadata: + name: iclientnum6-svc + labels: + app: index +spec: + ports: + - port: 10050 + protocol: UDP + clusterIP: None + selector: + app: iclientnum6-pod +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: iclientnum6-sts + labels: + app: index +spec: + serviceName: iclientnum6-svc + podManagementPolicy: Parallel + replicas: 1 + selector: + matchLabels: + app: iclientnum6-pod + template: + metadata: + labels: + app: iclientnum6-pod + spec: + containers: + - name: iclientnum6-ctr + image: qserv/indexclientnum:dev + imagePullPolicy: Always + args: ["60000001", "70000000", "client-k8s-a1.cnf"] + ports: + - containerPort: 10050 + protocol: UDP + +--- +apiVersion: v1 +kind: Service +metadata: + name: iclientnum7-svc + labels: + app: index +spec: + ports: + - port: 10050 + protocol: UDP + clusterIP: None + selector: + app: iclientnum7-pod +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: iclientnum7-sts + labels: + app: index +spec: + serviceName: iclientnum7-svc + podManagementPolicy: Parallel + replicas: 1 + selector: + matchLabels: + app: iclientnum7-pod + template: + metadata: + labels: + app: iclientnum7-pod + spec: + containers: + - name: iclientnum7-ctr + image: qserv/indexclientnum:dev + imagePullPolicy: Always + args: ["70000001", "80000000", "client-k8s-a1.cnf"] + ports: + - containerPort: 10050 + protocol: UDP + +--- +apiVersion: v1 +kind: Service +metadata: + name: iclientnum8-svc + labels: + app: index +spec: + ports: + - port: 10050 + protocol: UDP + clusterIP: None + selector: + app: iclientnum8-pod +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: iclientnum8-sts + labels: + app: index +spec: + serviceName: iclientnum8-svc + podManagementPolicy: Parallel + replicas: 1 + selector: + matchLabels: + app: iclientnum8-pod + template: + metadata: + labels: + app: iclientnum8-pod + spec: + containers: + - name: iclientnum8-ctr + image: qserv/indexclientnum:dev + imagePullPolicy: Always + args: ["80000001", "90000000", "client-k8s-a1.cnf"] + ports: + - containerPort: 10050 + protocol: UDP diff --git a/core/modules/loader/BufferUdp.cc b/core/modules/loader/BufferUdp.cc index c233c5ac8c..29d463fee2 100644 --- a/core/modules/loader/BufferUdp.cc +++ b/core/modules/loader/BufferUdp.cc @@ -48,7 +48,7 @@ MsgElement::Ptr BufferUdp::readFromSocket(boost::asio::ip::tcp::socket& socket, // If there's something in the buffer already, get it and return. // This can happen when the previous read of socket read multiple elements. - MsgElement::Ptr msgElem = _safeRetrieve("1readFromSocket&&&" + note); + MsgElement::Ptr msgElem = _safeRetrieve("1readFromSocket" + note); if (msgElem != nullptr) { return msgElem; } @@ -69,7 +69,7 @@ MsgElement::Ptr BufferUdp::readFromSocket(boost::asio::ip::tcp::socket& socket, /// Try to retrieve an element (there's no guarantee that an entire element got read in a single read. // Store original cursor positions so they can be restored if the read fails. - msgElem = _safeRetrieve("2readFromSocket&&&" + note); + msgElem = _safeRetrieve("2readFromSocket" + note); if (msgElem != nullptr) { return msgElem; } @@ -117,11 +117,11 @@ void BufferUdp::advanceReadCursor(size_t len) { } -std::shared_ptr BufferUdp::_safeRetrieve(std::string const& note) { // &&& delete note, maybe +std::shared_ptr BufferUdp::_safeRetrieve(std::string const& note) { auto wCursorOriginal = _wCursor; auto rCursorOriginal = _rCursor; // throwOnMissing=false since missing data is possible with TCP. - MsgElement::Ptr msgElem = MsgElement::retrieve(*this, note + " _safeRetrieve &&&", false); + MsgElement::Ptr msgElem = MsgElement::retrieve(*this, note + " _safeRetrieve", false); if (msgElem != nullptr) { return msgElem; } else { @@ -134,20 +134,7 @@ std::shared_ptr BufferUdp::_safeRetrieve(std::string const& note) { bool BufferUdp::isRetrieveSafe(size_t len) const { auto newLen = (_rCursor + len); - // &&&return (newLen <= _end && newLen <= _wCursor); - bool res = (newLen <= _end && newLen <= _wCursor); // &&& - if (!res) { // &&& - LOGS(_log, LOG_LVL_WARN, "&&& BufferUdp::isRetrieveSafe not safe len=" << len << - " rCursor=" << (void*)_rCursor << - " newLen=" << (void*)newLen << - " wCursor=" << (void*)_wCursor << - " _end=" << (void*)_end << - " (newLen<=end)=" << (newLen <= _end) << - " (newLen<=_wCursor)=" << (newLen <= _wCursor) << - " res=" << res); - LOGS(_log, LOG_LVL_WARN, "&&& BufferUdp::isRetrieveSafe " << dumpStr(false)); - } - return res; + return (newLen <= _end && newLen <= _wCursor); } @@ -157,7 +144,7 @@ bool BufferUdp::retrieve(void* out, size_t len) { _rCursor += len; return true; } - LOGS(_log, LOG_LVL_WARN, "&&& BufferUdp::retrieve not safe len=" << len); + LOGS(_log, LOG_LVL_DEBUG, "BufferUdp::retrieve not safe len=" << len); return false; } diff --git a/core/modules/loader/BufferUdp.h b/core/modules/loader/BufferUdp.h index 2ef7980991..66e3730750 100644 --- a/core/modules/loader/BufferUdp.h +++ b/core/modules/loader/BufferUdp.h @@ -46,7 +46,7 @@ class MsgElement; /// A buffer for reading and writing. Nothing can be read from the buffer until /// something has been written to it. -/// TODO: rename BufferUdp is not really accurate anymore. &&& +/// TODO: rename BufferUdp is not really accurate anymore. class BufferUdp { public: using Ptr = std::shared_ptr; diff --git a/core/modules/loader/Central.cc b/core/modules/loader/Central.cc index 2192644578..6791c081e3 100644 --- a/core/modules/loader/Central.cc +++ b/core/modules/loader/Central.cc @@ -77,7 +77,6 @@ void Central::_checkDoList() { while(_loop) { // Run and then sleep for a second. TODO A more advanced timer should be used doList->checkList(); - LOGS(_log, LOG_LVL_INFO, "&&& SLEEP"); usleep(_loopSleepTime); } } diff --git a/core/modules/loader/CentralClient.cc b/core/modules/loader/CentralClient.cc index d526d7ad6b..5b8cea4b01 100644 --- a/core/modules/loader/CentralClient.cc +++ b/core/modules/loader/CentralClient.cc @@ -49,24 +49,11 @@ namespace lsst { namespace qserv { namespace loader { -/* &&& -CentralClient::CentralClient(boost::asio::io_service& ioService_, - std::string const& hostName, ClientConfig const& cfg) - : Central(ioService_, cfg.getMasterHost(), cfg.getMasterPortUdp(), cfg.getThreadPoolSize(), cfg.getLoopSleepTime(), cfg.getIOThreads()), - _hostName(hostName), _udpPort(cfg.getClientPortUdp()), - _defWorkerHost(cfg.getDefWorkerHost()), - _defWorkerPortUdp(cfg.getDefWorkerPortUdp()), - _doListMaxLookups(cfg.getMaxLookups()), - _doListMaxInserts(cfg.getMaxInserts()), - _maxRequestSleepTime(cfg.getMaxRequestSleepTime()) { -} -*/ + CentralClient::CentralClient(boost::asio::io_service& ioService_, std::string const& hostName, ClientConfig const& cfg) : CentralFollower(ioService_, hostName, cfg.getMasterHost(), cfg.getMasterPortUdp(), cfg.getThreadPoolSize(),cfg.getLoopSleepTime(), cfg.getIOThreads(), cfg.getClientPortUdp()), - // &&& _hostName(hostName), - // &&& _udpPort(cfg.getClientPortUdp()), _defWorkerHost(cfg.getDefWorkerHost()), _defWorkerPortUdp(cfg.getDefWorkerPortUdp()), _doListMaxLookups(cfg.getMaxLookups()), @@ -74,11 +61,6 @@ CentralClient::CentralClient(boost::asio::io_service& ioService_, _maxRequestSleepTime(cfg.getMaxRequestSleepTime()) { } -/* &&& -void CentralClient::start() { - _server = std::make_shared(ioService, _hostName, _udpPort, this); -} -*/ void CentralClient::startService() { _server = std::make_shared(ioService, _hostName, _udpPort, this); @@ -92,7 +74,7 @@ CentralClient::~CentralClient() { void CentralClient::handleKeyLookup(LoaderMsg const& inMsg, BufferUdp::Ptr const& data) { LOGS(_log, LOG_LVL_DEBUG, "CentralClient::handleKeyLookup"); - auto const sData = std::dynamic_pointer_cast(MsgElement::retrieve(*data, " CentralClient::handleKeyLookup&&& ")); + auto const sData = std::dynamic_pointer_cast(MsgElement::retrieve(*data, " CentralClient::handleKeyLookup ")); if (sData == nullptr) { LOGS(_log, LOG_LVL_WARN, "CentralClient::handleKeyLookup Failed to parse list"); return; @@ -135,7 +117,7 @@ void CentralClient::_handleKeyLookup(LoaderMsg const& inMsg, std::unique_ptr(MsgElement::retrieve(*data, " CentralClient::handleKeyInsertComplete&&& ")); + auto sData = std::dynamic_pointer_cast(MsgElement::retrieve(*data, " CentralClient::handleKeyInsertComplete ")); if (sData == nullptr) { LOGS(_log, LOG_LVL_WARN, "CentralClient::handleKeyInsertComplete Failed to retrieve element"); return; @@ -199,11 +181,10 @@ KeyInfoData::Ptr CentralClient::keyInsertReq(CompositeKey const& key, int chunk, size_t sz = _waitingKeyInsertMap.size(); lck.unlock(); if (loopCount % 100 == 0) { - LOGS(_log, LOG_LVL_INFO, "keyInsertReq waiting key=" << key << + LOGS(_log, LOG_LVL_DEBUG, "keyInsertReq waiting key=" << key << "size=" << sz << " loopCount=" << loopCount); } // Let the CPU do something else while waiting for some requests to finish. - LOGS(_log, LOG_LVL_INFO, "&&& SLEEP"); usleep(_maxRequestSleepTime); ++loopCount; lck.lock(); @@ -294,7 +275,6 @@ KeyInfoData::Ptr CentralClient::keyLookupReq(CompositeKey const& key) { "size=" << sz << " loopCount=" << loopCount); } // Let the CPU do something else while waiting for some requests to finish. - LOGS(_log, LOG_LVL_INFO, "&&& SLEEP"); usleep(_maxRequestSleepTime); sleptForMicroSec += _maxRequestSleepTime; ++loopCount; @@ -355,7 +335,7 @@ void CentralClient::getWorkerForKey(CompositeKey const& key, std::string& ip, in auto nAddr = worker->getUdpAddress(); ip = nAddr.ip; port = nAddr.port; - LOGS(_log, LOG_LVL_DEBUG, "getWorkerForKey " << key << " worker=" << worker); + LOGS(_log, LOG_LVL_DEBUG, "getWorkerForKey " << key << " worker=" << *worker); } else { ip = getDefWorkerHost(); port = getDefWorkerPortUdp(); diff --git a/core/modules/loader/CentralClient.h b/core/modules/loader/CentralClient.h index 779b1d21fd..eed867d310 100644 --- a/core/modules/loader/CentralClient.h +++ b/core/modules/loader/CentralClient.h @@ -67,20 +67,17 @@ class KeyInfoData : public util::Tracker { /// so replies to its request can be sent directly back to it. /// 'Central' provides access to the master and a DoList for handling requests. /// TODO Maybe base this on CentralWorker or have a common base class? -// &&& class CentralClient : public Central { class CentralClient : public CentralFollower { public: /// The client needs to know the master's IP and its own IP. CentralClient(boost::asio::io_service& ioService_, std::string const& hostName, ClientConfig const& cfg); - // &&&void start(); void startService() override; ~CentralClient() override; std::string const& getHostName() const { return _hostName; } - // &&& int getUdpPort() const { return _udpPort; } int getTcpPort() const { return 0; } ///< No tcp port at this time. /// @return the default worker's host name. @@ -161,17 +158,10 @@ class CentralClient : public CentralFollower { CentralClient* central; }; - - /// TODO The worker IP becomes default worker as it should be able to get - /// that information from the master in the future. DM-16555 - // &&& const std::string _hostName; - // &&& const int _udpPort; - // If const is removed, these will need mutex protection. const std::string _defWorkerHost; ///< Default worker host const int _defWorkerPortUdp; ///< Default worker UDP port - size_t const _doListMaxLookups = 1000; ///< Maximum number of concurrent lookups in DoList, set by config size_t const _doListMaxInserts = 1000; ///< Maximum number of concurrent inserts in DoList, set by config /// Time to sleep between checking requests when at max length, set by config diff --git a/core/modules/loader/CentralFollower.cc b/core/modules/loader/CentralFollower.cc index e51c07bc5e..8a38e94f2e 100644 --- a/core/modules/loader/CentralFollower.cc +++ b/core/modules/loader/CentralFollower.cc @@ -65,7 +65,7 @@ void CentralFollower::startMonitoring() { bool CentralFollower::workerInfoReceive(BufferUdp::Ptr const& data) { // Open the data protobuffer and add it to our list. - StringElement::Ptr sData = std::dynamic_pointer_cast(MsgElement::retrieve(*data, " CentralFollower::workerInfoReceive&&& ")); + StringElement::Ptr sData = std::dynamic_pointer_cast(MsgElement::retrieve(*data, "CentralFollower::workerInfoReceive")); if (sData == nullptr) { LOGS(_log, LOG_LVL_WARN, "CentralFollower::workerInfoRecieve Failed to parse list"); return false; diff --git a/core/modules/loader/CentralFollower.h b/core/modules/loader/CentralFollower.h index c30b467c4a..61157f8580 100644 --- a/core/modules/loader/CentralFollower.h +++ b/core/modules/loader/CentralFollower.h @@ -47,9 +47,6 @@ class WorkerKeysInfo; namespace loader { - -// &&& class CentralWorkerDoListItem; - /// This class is used a base central class for servers that need to get /// lists of of worker from the master. /// CentralFollower provides no service on its own. The derived classes must: @@ -61,20 +58,6 @@ class CentralFollower : public Central { public: typedef std::pair CompKeyPair; - /* &&& - enum SocketStatus { - VOID0 = 0, - STARTING1, - ESTABLISHED2 - }; - - enum Direction { - NONE0 = 0, - TORIGHT1, - FROMRIGHT2 - }; - */ - CentralFollower(boost::asio::io_service& ioService, std::string const& hostName_, std::string const& masterHost, int masterPortUdp, int threadPoolSize, int loopSleepTime, int ioThreads, int fPortUdp) @@ -107,34 +90,23 @@ class CentralFollower : public Central { std::string getOurLogId() const override { return "CentralFollower"; } - // &&& friend CentralWorkerDoListItem; - protected: // &&& make some or all private again /// Real workers need to check this for initial ranges. virtual void checkForThisWorkerValues(uint32_t wId, std::string const& ip, int portUdp, int portTcp, KeyRange& strRange) {}; - /// &&& This function is needed to fill the map. On real workers, CentralWorker - /// needs to do additional work to set its id. + /// This function is needed to fill the map. On real workers, CentralWorker + /// needs to do additional work to set its own id. void _workerInfoReceive(std::unique_ptr& protoBuf); - /// See workerWorkerKeysInfoReq(...) - // &&& void _workerWorkerKeysInfoReq(LoaderMsg const& inMsg); - const std::string _hostName; const int _udpPort; - // &&& const int _tcpPort; - WWorkerList::Ptr _wWorkerList{std::make_shared(this)}; ///< Maps of workers. + /// Maps of workers with their key ranges. + WWorkerList::Ptr _wWorkerList{std::make_shared(this)}; - /// The DoListItem that makes sure _monitor() is run. &&& needs to ask master for worker map occasionally - // &&& replace with item to refresh _wWorkerList (see CentralWorker::_startMonitoring)// std::shared_ptr _centralWorkerDoListItem; }; - - - - }}} // namespace lsst::qserv::loader #endif // LSST_QSERV_LOADER_CENTRAL_FOLLOWER_H diff --git a/core/modules/loader/CentralMaster.cc b/core/modules/loader/CentralMaster.cc index 86f385e8c3..a75487247d 100644 --- a/core/modules/loader/CentralMaster.cc +++ b/core/modules/loader/CentralMaster.cc @@ -43,11 +43,6 @@ namespace lsst { namespace qserv { namespace loader { -/* &&& -void CentralMaster::start() { - _server = std::make_shared(ioService, getMasterHostName(), getMasterPort(), this); -} -*/ void CentralMaster::startService() { _server = std::make_shared(ioService, getMasterHostName(), getMasterPort(), this); } diff --git a/core/modules/loader/CentralMaster.h b/core/modules/loader/CentralMaster.h index 84e7e37e5d..36b639bcd3 100644 --- a/core/modules/loader/CentralMaster.h +++ b/core/modules/loader/CentralMaster.h @@ -59,7 +59,6 @@ class CentralMaster : public Central { _maxKeysPerWorker(cfg.getMaxKeysPerWorker()) {} /// Open the UDP port. This can throw boost::system::system_error. - // &&&void start(); void startService() override; ~CentralMaster() override { _mWorkerList.reset(); } diff --git a/core/modules/loader/CentralWorker.cc b/core/modules/loader/CentralWorker.cc index 48b85e902d..ebcb69cc70 100644 --- a/core/modules/loader/CentralWorker.cc +++ b/core/modules/loader/CentralWorker.cc @@ -36,6 +36,7 @@ #include "loader/WorkerConfig.h" #include "proto/loader.pb.h" #include "proto/ProtoImporter.h" +#include "util/Timer.h" // &&& // LSST headers @@ -50,20 +51,6 @@ namespace lsst { namespace qserv { namespace loader { -/* &&& -CentralWorker::CentralWorker(boost::asio::io_service& ioService_, boost::asio::io_context& io_context_, - std::string const& hostName_, WorkerConfig const& cfg) - : Central(ioService_, cfg.getMasterHost(), cfg.getMasterPortUdp(), - cfg.getThreadPoolSize(), cfg.getLoopSleepTime(), cfg.getIOThreads()), - _hostName(hostName_), - _udpPort(cfg.getWPortUdp()), - _tcpPort(cfg.getWPortTcp()), - _ioContext(io_context_), - _recentAddLimit(cfg.getRecentAddLimit()), - _thresholdNeighborShift(cfg.getThresholdNeighborShift()), - _maxKeysToShift(cfg.getMaxKeysToShift()) { -} -*/ CentralWorker::CentralWorker(boost::asio::io_service& ioService_, boost::asio::io_context& io_context_, std::string const& hostName_, WorkerConfig const& cfg) : CentralFollower(ioService_, hostName_, cfg.getMasterHost(), cfg.getMasterPortUdp(), @@ -75,14 +62,7 @@ CentralWorker::CentralWorker(boost::asio::io_service& ioService_, boost::asio::i _maxKeysToShift(cfg.getMaxKeysToShift()) { } -/* &&& -void CentralWorker::start() { - _server = std::make_shared(ioService, _hostName, _udpPort, this); - _tcpServer = std::make_shared(_ioContext, _tcpPort, this); - _tcpServer->runThread(); - _startMonitoring(); -} -*/ + void CentralWorker::startService() { _server = std::make_shared(ioService, _hostName, _udpPort, this); _tcpServer = std::make_shared(_ioContext, _tcpPort, this); @@ -90,11 +70,10 @@ void CentralWorker::startService() { } - CentralWorker::~CentralWorker() { // Members that contain pointers to this. Deleting while this != null. - // &&&_wWorkerList.reset(); - // TODO: wait for reference count to drop to one or less. + // TODO: Wait for reference count to drop to one or less, + // although CentralWorker is never really shutdown. _tcpServer.reset(); } @@ -110,7 +89,6 @@ void CentralWorker::startMonitoring() { CentralFollower::startMonitoring(); // Add _workerList to _doList so it starts checking new entries. _centralWorkerDoListItem = std::make_shared(this); - // &&& doList->addItem(_wWorkerList); doList->addItem(_centralWorkerDoListItem); } @@ -122,7 +100,6 @@ void CentralWorker::_monitor() { if (_isOurIdInvalid()) { _registerWithMaster(); // Give the master a half second to answer. - LOGS(_log, LOG_LVL_INFO, "&&& SLEEP"); usleep(500000); return; } @@ -245,7 +222,7 @@ bool CentralWorker::_determineRange() { // Must send the number of bytes in the message so TCP server knows how many bytes to read. bytesInMsg.appendToData(data); strElem.appendToData(data); - ServerTcpBase::writeData(*_rightSocket, data, "detRange"); + ServerTcpBase::writeData(*_rightSocket, data); } // Get back their basic info { @@ -386,7 +363,7 @@ void CentralWorker::_shift(Direction direction, int keysToShift) { bytesInMsg.appendToData(data); keyShiftReq.appendToData(data); LOGS(_log, LOG_LVL_INFO, fName << " FROMRIGHT " << keysToShift); - ServerTcpBase::writeData(*_rightSocket, data, "_shift FROMRIGHT"); + ServerTcpBase::writeData(*_rightSocket, data); } // Wait for the KeyList response { @@ -417,7 +394,7 @@ void CentralWorker::_shift(Direction direction, int keysToShift) { data.reset(); UInt32Element elem(LoaderMsg::SHIFT_FROM_RIGHT_RECEIVED); elem.appendToData(data); - ServerTcpBase::writeData(*_rightSocket, data, "shift SHIFT_FROM_RIGHT_RECEIVED"); + ServerTcpBase::writeData(*_rightSocket, data); LOGS(_log, LOG_LVL_INFO, fName << " direction=" << direction << " keys=" << keysToShift); } else if (direction == TORIGHT1) { @@ -460,7 +437,7 @@ void CentralWorker::_shift(Direction direction, int keysToShift) { LOGS(_log, LOG_LVL_ERROR, errMsg); // This will keep getting thrown and never work, but at least it will show up // in the logs. - // &&& create new exception, catch it and halve the number of keys to shift ??? + // TODO Maybe create new exception, catch it and halve the number of keys to shift? throw LoaderMsgErr(ERR_LOC, errMsg); } kindShiftRight.appendToData(data); @@ -468,7 +445,7 @@ void CentralWorker::_shift(Direction direction, int keysToShift) { keyList.appendToData(data); LOGS(_log, LOG_LVL_INFO, fName << " TORIGHT sending keys"); - ServerTcpBase::writeData(*_rightSocket, data, "shift TORIGHT sending keys"); + ServerTcpBase::writeData(*_rightSocket, data); // read back LoaderMsg::SHIFT_TO_RIGHT_KEYS_RECEIVED data.reset(); @@ -667,80 +644,6 @@ void CentralWorker::cancelShiftsWithLeftNeighbor() { } } -/* &&& -bool CentralWorker::workerInfoReceive(BufferUdp::Ptr const& data) { - // Open the data protobuffer and add it to our list. - StringElement::Ptr sData = std::dynamic_pointer_cast(MsgElement::retrieve(*data, " CentralWorker::workerInfoReceive&&& ")); - if (sData == nullptr) { - LOGS(_log, LOG_LVL_WARN, "CentralWorker::workerInfoRecieve Failed to parse list"); - return false; - } - std::unique_ptr protoList = sData->protoParse(); - if (protoList == nullptr) { - LOGS(_log, LOG_LVL_WARN, "CentralWorker::workerInfoRecieve Failed to parse list"); - return false; - } - - // TODO: move this call to another thread - _workerInfoReceive(protoList); - return true; -} -*/ - -/* &&& -void CentralWorker::_workerInfoReceive(std::unique_ptr& protoL) { - std::unique_ptr protoList(std::move(protoL)); - - // Check the information, if it is our network address, set or check our id. - // Then compare it with the map, adding new/changed information. - uint32_t wId = protoList->wid(); - std::string ipUdp(""); - int portUdp = 0; - int portTcp = 0; - if (protoList->has_address()) { - proto::LdrNetAddress protoAddr = protoList->address(); - ipUdp = protoAddr.ip(); - portUdp = protoAddr.udpport(); - portTcp = protoAddr.tcpport(); - } - KeyRange strRange; - if (protoList->has_range()) { - proto::WorkerRange protoRange = protoList->range(); - bool valid = protoRange.valid(); - if (valid) { - CompositeKey min(protoRange.minint(), protoRange.minstr()); - CompositeKey max(protoRange.maxint(), protoRange.maxstr()); - bool unlimited = protoRange.maxunlimited(); - strRange.setMinMax(min, max, unlimited); - } - } - - // If the address matches ours, check the name. - if (getHostName() == ipUdp && getUdpPort() == portUdp) { - if (_isOurIdInvalid()) { - LOGS(_log, LOG_LVL_INFO, "Setting our name " << wId); - _setOurId(wId); - } else if (getOurId() != wId) { - LOGS(_log, LOG_LVL_ERROR, "Our wId doesn't match address from master! wId=" << - getOurId() << " from master=" << wId); - } - - // It is this worker. If there is a valid range in the message and our range is not valid, - // take the range given as our own. - if (strRange.getValid()) { - std::lock_guard lckM(_idMapMtx); - if (not _keyRange.getValid()) { - LOGS(_log, LOG_LVL_INFO, "Setting our range " << strRange); - _keyRange.setMinMax(strRange.getMin(), strRange.getMax(), strRange.getUnlimited()); - } - } - } - - // Make/update entry in map. - _wWorkerList->updateEntry(wId, ipUdp, portUdp, portTcp, strRange); -} -*/ - void CentralWorker::checkForThisWorkerValues(uint32_t wId, std::string const& ip, int portUdp, int portTcp, KeyRange& strRange) { @@ -826,6 +729,9 @@ bool CentralWorker::workerKeyInsertReq(LoaderMsg const& inMsg, BufferUdp::Ptr co } +util::Timer lastInsertTimer; // &&& +std::mutex lastInsertTimerMtx; // &&& + void CentralWorker::_workerKeyInsertReq(LoaderMsg const& inMsg, std::unique_ptr& protoBuf) { std::unique_ptr protoData(std::move(protoBuf)); @@ -850,7 +756,17 @@ void CentralWorker::_workerKeyInsertReq(LoaderMsg const& inMsg, std::unique_ptr< // Element already found, check file id and row number. Bad if not the same. // TODO HIGH send back duplicate key mismatch message to the original requester and return } - LOGS(_log, LOG_LVL_INFO, "Key inserted=" << key << "(" << chunkInfo << ")"); + { + std::lock_guard tLg(lastInsertTimerMtx); + lastInsertTimer.stop(); + auto elapsedInsert = lastInsertTimer.getElapsed(); + if (elapsedInsert > 0.5) { + LOGS(_log, LOG_LVL_ERROR, "&&& Longdelay key=" << key << " dlay=" << elapsedInsert); + } + // &&& LOGS(_log, LOG_LVL_INFO, "Key inserted=" << key << "(" << chunkInfo << ")"); + LOGS(_log, LOG_LVL_WARN, "&&&INFO Key inserted=" << key << "(" << chunkInfo << ") dlay=" << elapsedInsert); + lastInsertTimer.start(); + } // TODO Send this item to the keyLogger (which would then send KEY_INSERT_COMPLETE back to the requester), // for now this function will send the message back for proof of concept. LoaderMsg msg(LoaderMsg::KEY_INSERT_COMPLETE, inMsg.msgId->element, getHostName(), getUdpPort()); @@ -898,7 +814,7 @@ void CentralWorker::_forwardKeyInsertRequest(NetworkAddress const& targetAddr, L // The proto buffer should be the same, just need a new message. int hops = protoData->hops() + 1; if (hops > 4) { // TODO replace magic number with variable set via config file. - LOGS(_log, LOG_LVL_INFO, "Too many hops, dropping insert request hops=" << hops << " key=" << key); + LOGS(_log, LOG_LVL_WARN, "Too many hops, dropping insert request hops=" << hops << " key=" << key); return; } LOGS(_log, LOG_LVL_INFO, "Forwarding key insert hops=" << hops << " key=" << key); @@ -1004,7 +920,7 @@ void CentralWorker::_workerKeyInfoReq(LoaderMsg const& inMsg, std::unique_ptr(msgElem); if (neighborName == nullptr) { return false; @@ -1018,7 +934,7 @@ bool CentralWorker::workerWorkerSetRightNeighbor(LoaderMsg const& inMsg, BufferU bool CentralWorker::workerWorkerSetLeftNeighbor(LoaderMsg const& inMsg, BufferUdp::Ptr const& data) { - auto msgElem = MsgElement::retrieve(*data, " CentralWorker::workerWorkerSetLeftNeighbor&&& "); + auto msgElem = MsgElement::retrieve(*data, "CentralWorker::workerWorkerSetLeftNeighbor"); UInt32Element::Ptr neighborName = std::dynamic_pointer_cast(msgElem); if (neighborName == nullptr) { return false; diff --git a/core/modules/loader/CentralWorker.h b/core/modules/loader/CentralWorker.h index d8baaa3a20..14d8360ad8 100644 --- a/core/modules/loader/CentralWorker.h +++ b/core/modules/loader/CentralWorker.h @@ -40,19 +40,11 @@ namespace lsst { namespace qserv { - -namespace proto { -// &&& class WorkerKeysInfo; -// &&& class WorkerListItem; -} - namespace loader { - class CentralWorkerDoListItem; -// &&& class CentralWorker : public Central { class CentralWorker : public CentralFollower { public: typedef std::pair CompKeyPair; @@ -73,16 +65,11 @@ class CentralWorker : public CentralFollower { std::string const& hostName_, WorkerConfig const& cfg); /// Open the UDP and TCP ports and start monitoring. This can throw boost::system::system_error. - // &&&void start(); void startService() override; void startMonitoring() override; ~CentralWorker() override; - // &&& WWorkerList::Ptr getWorkerList() const { return _wWorkerList; } - - // &&& std::string const& getHostName() const { return _hostName; } - // &&& int getUdpPort() const { return _udpPort; } int getTcpPort() const override { return _tcpPort; } uint32_t getOurId() const { @@ -114,9 +101,6 @@ class CentralWorker : public CentralFollower { /// @returns what it thinks the range of the left neighbor should be. KeyRange updateRangeWithLeftData(KeyRange const& strRange); - /// Receive our name from the master. Returns true if successful. - // &&& bool workerInfoReceive(BufferUdp::Ptr const& data) override; - /// Receive a request to insert a key value pair. /// If the key value pair could not be inserted, it tries to forward the request appropriately. /// @Returns true if the request could be parsed. @@ -160,7 +144,6 @@ class CentralWorker : public CentralFollower { friend CentralWorkerDoListItem; protected: - // &&& void _workerInfoReceive(std::unique_ptr& protoBuf) override; ///< see workerInfoReceive() // &&& need this to work properly with new class void checkForThisWorkerValues(uint32_t wId, std::string const& ip, int portUdp, int portTcp, KeyRange& strRange) override; private: @@ -223,23 +206,18 @@ class CentralWorker : public CentralFollower { /// Connect to the right neighbor. Must hold _rightMtx in the lock. void _rightConnect(std::lock_guard const& rightMtxLG); ///< Disconnect from the right neighbor. Must hold _rightMtx in the lock. - void _rightDisconnect(std::lock_guard const& rightMtxLG, std::string const& note); // &&& remove note ?? + void _rightDisconnect(std::lock_guard const& rightMtxLG, std::string const& note); void _cancelShiftsWithRightNeighbor(); ///< Cancel shifts to/from the right neighbor. void _finishShiftToRight(); ///< The shift to the right neighbor is complete, cleanup. - // &&& const std::string _hostName; - // &&& const int _udpPort; const int _tcpPort; boost::asio::io_context& _ioContext; - // &&& WWorkerList::Ptr _wWorkerList{std::make_shared(this)}; ///< Maps of workers. - bool _ourIdInvalid{true}; ///< true until our id has been set by the master. std::atomic _ourId{0}; ///< id given by the master, 0 is invalid id. mutable std::mutex _ourIdMtx; ///< protects _ourIdInvalid, _ourId - KeyRange _keyRange; ///< range for this worker std::atomic _rangeChanged{false}; std::map _keyValueMap; diff --git a/core/modules/loader/ClientServer.cc b/core/modules/loader/ClientServer.cc index 9779381413..704e73b556 100644 --- a/core/modules/loader/ClientServer.cc +++ b/core/modules/loader/ClientServer.cc @@ -82,8 +82,6 @@ BufferUdp::Ptr ClientServer::parseMsg(BufferUdp::Ptr const& data, // following not expected by client case LoaderMsg::KEY_INSERT_REQ: // This is what this client should send out case LoaderMsg::KEY_LOOKUP_REQ: // This is what this client should send out - // &&& case LoaderMsg::MAST_WORKER_INFO: - // &&& case LoaderMsg::MAST_WORKER_LIST: // TODO having the client know would be useful. case LoaderMsg::MAST_INFO: case LoaderMsg::MAST_INFO_REQ: case LoaderMsg::MAST_WORKER_LIST_REQ: @@ -137,7 +135,8 @@ void ClientServer::_msgRecievedHandler(LoaderMsg const& inMsg, BufferUdp::Ptr co bool success = true; // This is only really expected for parsing errors. Most responses to // requests come in as normal messages. - StringElement::Ptr seData = std::dynamic_pointer_cast(MsgElement::retrieve(*data, "ClientServer::_msgRecievedHandler&&& ")); + StringElement::Ptr seData = + std::dynamic_pointer_cast(MsgElement::retrieve(*data, "ClientServer::_msgRecievedHandler")); if (seData == nullptr) { success = false; } diff --git a/core/modules/loader/DoListItem.cc b/core/modules/loader/DoListItem.cc new file mode 100644 index 0000000000..36d79f7e98 --- /dev/null +++ b/core/modules/loader/DoListItem.cc @@ -0,0 +1,74 @@ +// -*- LSST-C++ -*- +/* + * LSST Data Management System + * Copyright 2019 LSST. + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ + + +// Class header +#include "loader/DoListItem.h" + +// System headers +#include + +// Qserv headers +#include "loader/Central.h" +#include "loader/LoaderMsg.h" +#include "proto/loader.pb.h" + +// LSST headers +#include "lsst/log/Log.h" + +namespace { + +LOG_LOGGER _log = LOG_GET("lsst.qserv.loader.DoListItem"); + +} + +namespace lsst { +namespace qserv { +namespace loader { + + +util::CommandTracked::Ptr DoListItem::runIfNeeded(TimeOut::TimePoint now) { + std::lock_guard lock(_mtx); + if (_command == nullptr) { + if (_isOneShotDone()) return nullptr; + if ((_needInfo || _timeOut.due(now)) && _timeRateLimit.due(now)) { + _timeRateLimit.triggered(); + // Randomly vary the next rate limit timeout + int rand = (std::rand()/(RAND_MAX/1000)); // 0 to 1000 + rand += std::min(_commandsCreated * 10000, 120000); + auto rateLimitRandom = now + std::chrono::milliseconds(rand); + _timeRateLimit.triggered(rateLimitRandom); + _command = createCommand(); + if (_oneShot) ++_commandsCreated; + LOGS(_log, LOG_LVL_DEBUG, "cCreated=" << _commandsCreated << " rand=" << rand); + return _command; + } + } else if (_command->isFinished()) { + _command.reset(); // Allow the command to be sent again later. + } + return nullptr; + } + + +}}} // namespace lsst:qserv::loader + diff --git a/core/modules/loader/DoListItem.h b/core/modules/loader/DoListItem.h index 60772c1809..bb192d7dae 100644 --- a/core/modules/loader/DoListItem.h +++ b/core/modules/loader/DoListItem.h @@ -98,25 +98,7 @@ class DoListItem : public std::enable_shared_from_this { virtual ~DoListItem() = default; - util::CommandTracked::Ptr runIfNeeded(TimeOut::TimePoint now) { - std::lock_guard lock(_mtx); - if (_command == nullptr) { - if (_isOneShotDone()) return nullptr; - if ((_needInfo || _timeOut.due(now)) && _timeRateLimit.due(now)) { - _timeRateLimit.triggered(); - // Randomly vary the next rate limit timeout - int rand = (std::rand()/(RAND_MAX/100)); // 0 to 500 - auto rateLimitRandom = now + std::chrono::milliseconds(rand); - _timeRateLimit.triggered(rateLimitRandom); - _command = createCommand(); - if (_oneShot) ++_commandsCreated; - return _command; - } - } else if (_command->isFinished()) { - _command.reset(); // Allow the command to be sent again later. - } - return nullptr; - } + util::CommandTracked::Ptr runIfNeeded(TimeOut::TimePoint now); bool isAlreadyOnList() { return _addedToList; } @@ -177,7 +159,7 @@ class DoListItem : public std::enable_shared_from_this { /// If no info is needed, check for info after this period of time. TimeOut _timeOut{std::chrono::minutes(5)}; /// Rate limiter, no more than 1 message every few seconds - TimeOut _timeRateLimit{std::chrono::milliseconds(1500)}; // TODO: DM-17453 set via config + TimeOut _timeRateLimit{std::chrono::milliseconds(7500)}; // TODO: DM-17453 set via config util::CommandTracked::Ptr _command; std::mutex _mtx; ///< protects _timeOut, _timeRequest, _command, _oneShot, _needInfo /// Number of times the command needed to be created. It's only tracked for oneShots as diff --git a/core/modules/loader/LoaderMsg.cc b/core/modules/loader/LoaderMsg.cc index d81643a984..87f3a8deab 100644 --- a/core/modules/loader/LoaderMsg.cc +++ b/core/modules/loader/LoaderMsg.cc @@ -49,28 +49,28 @@ LoaderMsg::LoaderMsg(uint16_t kind, uint64_t id, std::string const& host, uint32 void LoaderMsg::parseFromData(BufferUdp& data) { - MsgElement::Ptr elem = MsgElement::retrieve(data, " 1parseFromData&&& "); + MsgElement::Ptr elem = MsgElement::retrieve(data, "1parseFromData"); msgKind = std::dynamic_pointer_cast(elem); if (msgKind == nullptr) { throw LoaderMsgErr(ERR_LOC, "LoaderMsg::parseMsg wrong type for msgKind:" + MsgElement::getStringVal(elem)); } - elem = MsgElement::retrieve(data, " 2parseFromData&&& "); + elem = MsgElement::retrieve(data, "2parseFromData"); msgId = std::dynamic_pointer_cast(elem); if (msgId == nullptr) { throw LoaderMsgErr(ERR_LOC, "LoaderMsg::parseMsg wrong type for msgId:" + MsgElement::getStringVal(elem)); } - elem = MsgElement::retrieve(data, " 3parseFromData&&& "); + elem = MsgElement::retrieve(data, "3parseFromData"); senderHost = std::dynamic_pointer_cast(elem); if (senderHost == nullptr) { throw LoaderMsgErr(ERR_LOC, "LoaderMsg::parseMsg wrong type for senderHost:" + MsgElement::getStringVal(elem)); } - elem = MsgElement::retrieve(data, " 4parseFromData&&& "); + elem = MsgElement::retrieve(data, "4parseFromData"); senderPort = std::dynamic_pointer_cast(elem); if (senderPort == nullptr) { throw LoaderMsgErr(ERR_LOC, "LoaderMsg::parseMsg wrong type for senderPort:" + diff --git a/core/modules/loader/MasterServer.cc b/core/modules/loader/MasterServer.cc index eae9bed4f9..a93bca8509 100644 --- a/core/modules/loader/MasterServer.cc +++ b/core/modules/loader/MasterServer.cc @@ -73,7 +73,7 @@ BufferUdp::Ptr MasterServer::parseMsg(BufferUdp::Ptr const& data, case LoaderMsg::MSG_RECEIVED: // TODO: locate msg id in send messages and take appropriate action break; - case LoaderMsg::MAST_INFO_REQ: + case LoaderMsg::MAST_INFO_REQ: // &&& TODO delete this enum ??? // TODO: sendData = masterInfoRequest(inMsg, data, senderEndpoint); &&& break; case LoaderMsg::MAST_WORKER_LIST_REQ: diff --git a/core/modules/loader/MsgElement.cc b/core/modules/loader/MsgElement.cc index ae9c31ccc4..b8be45e8b9 100644 --- a/core/modules/loader/MsgElement.cc +++ b/core/modules/loader/MsgElement.cc @@ -45,7 +45,7 @@ bool MsgElement::retrieveType(BufferUdp &data, char& elemType) { } -MsgElement::Ptr MsgElement::retrieve(BufferUdp& data, std::string const& note, bool throwOnMissing) { // &&& delete note, maybe, the thrown error is pretty useful. +MsgElement::Ptr MsgElement::retrieve(BufferUdp& data, std::string const& note, bool throwOnMissing) { char elemT; if (not retrieveType(data, elemT)) { LOGS(_log, LOG_LVL_INFO, note << "no type retrieved "); diff --git a/core/modules/loader/MsgElement.h b/core/modules/loader/MsgElement.h index 903cfb9a2a..8682b20e20 100644 --- a/core/modules/loader/MsgElement.h +++ b/core/modules/loader/MsgElement.h @@ -300,7 +300,7 @@ class StringElement : public MsgElement { /// This the case with UDP, and boost asio async reads that return after X bytes read. template static std::unique_ptr protoParse(BufferUdp& data) { - StringElement::Ptr itemData = std::dynamic_pointer_cast(MsgElement::retrieve(data, "protoParse &&&")); + StringElement::Ptr itemData = std::dynamic_pointer_cast(MsgElement::retrieve(data, "protoParse")); if (itemData == nullptr) { return nullptr; } return itemData->protoParse(); } diff --git a/core/modules/loader/NetworkAddress.cc b/core/modules/loader/NetworkAddress.cc index cdb8a78144..db31494545 100644 --- a/core/modules/loader/NetworkAddress.cc +++ b/core/modules/loader/NetworkAddress.cc @@ -50,7 +50,7 @@ namespace loader { NetworkAddress::UPtr NetworkAddress::create(BufferUdp::Ptr const& bufData, int& tcpPort, std::string const& note) { - StringElement::Ptr data = std::dynamic_pointer_cast(MsgElement::retrieve(*bufData, " NetworkAddress::create&&& ")); + StringElement::Ptr data = std::dynamic_pointer_cast(MsgElement::retrieve(*bufData, "NetworkAddress::create")); if (data == nullptr) { LOGS(_log, LOG_LVL_WARN, "NetworkAddress::create data==nullptr " + note); diff --git a/core/modules/loader/ServerTcpBase.cc b/core/modules/loader/ServerTcpBase.cc index bc7310cf7d..81b468dcdb 100644 --- a/core/modules/loader/ServerTcpBase.cc +++ b/core/modules/loader/ServerTcpBase.cc @@ -68,9 +68,8 @@ void ServerTcpBase::_startAccept() { } -bool ServerTcpBase::writeData(AsioTcp::socket& socket, BufferUdp& data, std::string note) { +bool ServerTcpBase::writeData(AsioTcp::socket& socket, BufferUdp& data) { while (data.getBytesLeftToRead() > 0) { - LOGS(_log, LOG_LVL_INFO, note << " &&& write data bytesLeft=" << data.getBytesLeftToRead()); // Read cursor advances (manually in this case) as data is read from the buffer. auto res = boost::asio::write(socket, boost::asio::buffer(data.getReadCursor(), data.getBytesLeftToRead())); @@ -115,7 +114,7 @@ bool ServerTcpBase::testConnect() { kind.appendToData(data); UInt32Element bytes(1234); // dummy value bytes.appendToData(data); - writeData(socket, data, "tc"); + writeData(socket, data); // send back our name and left neighbor message. data.reset(); @@ -125,7 +124,7 @@ bool ServerTcpBase::testConnect() { ourName.appendToData(data); UInt64Element valuePairCount(testNewNodeValuePairCount); valuePairCount.appendToData(data); - writeData(socket, data, "tc"); + writeData(socket, data); // Get back left neighbor information auto msgKind = std::dynamic_pointer_cast( @@ -159,7 +158,7 @@ bool ServerTcpBase::testConnect() { data.reset(); UInt32Element verified(LoaderMsg::NEIGHBOR_VERIFIED); verified.appendToData(data); - writeData(socket, data, "tc"); + writeData(socket, data); boost::system::error_code ec; socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); @@ -239,14 +238,14 @@ void TcpBaseConnection::_recvKind(const boost::system::error_code& ec, size_t by } // Fix the buffer with the information given. _buf.advanceWriteCursor(bytesTrans); - auto msgElem = MsgElement::retrieve(_buf, " 1TcpBaseConnection::_recvKind&&& "); // &&& should all tcp stuff be safe retrieve? + auto msgElem = MsgElement::retrieve(_buf, "1TcpBaseConnection::_recvKind"); auto msgKind = std::dynamic_pointer_cast(msgElem); if (msgKind == nullptr) { LOGS(_log, LOG_LVL_ERROR, "_recvKind unexpected type of msg"); _freeConnect(); return; } - msgElem = MsgElement::retrieve(_buf, " 2TcpBaseConnection::_recvKind&&& "); + msgElem = MsgElement::retrieve(_buf, "2TcpBaseConnection::_recvKind"); auto msgBytes = std::dynamic_pointer_cast(msgElem); if (msgBytes == nullptr) { LOGS(_log, LOG_LVL_ERROR, "_recvKind missing bytes"); @@ -310,11 +309,11 @@ void TcpBaseConnection::_handleTest2(const boost::system::error_code& ec, size_t } // Fix the buffer with the information given. _buf.advanceWriteCursor(bytesTrans); - auto msgElem = MsgElement::retrieve(_buf, " _handleTest2&&& "); + auto msgElem = MsgElement::retrieve(_buf, "_handleTest2_a"); auto msgKind = std::dynamic_pointer_cast(msgElem); - msgElem = MsgElement::retrieve(_buf, " _handleTest2&&& "); + msgElem = MsgElement::retrieve(_buf, "_handleTest2_b"); auto msgName = std::dynamic_pointer_cast(msgElem); - msgElem = MsgElement::retrieve(_buf, " _handleTest2&&& "); + msgElem = MsgElement::retrieve(_buf, " _handleTest2_c"); auto msgKeys = std::dynamic_pointer_cast(msgElem); // TODO move most of this to CentralWorker @@ -373,7 +372,7 @@ void TcpBaseConnection::_handleTest2c(const boost::system::error_code& ec, size_ } // Fix the buffer with the information given. _buf.advanceWriteCursor(bytesTrans); - auto msgElem = MsgElement::retrieve(_buf, " _handleTest2c&&& "); + auto msgElem = MsgElement::retrieve(_buf, "_handleTest2c"); if (msgElem == nullptr) { LOGS(_log, LOG_LVL_ERROR, "_handleTest2b Kind nullptr error"); _freeConnect(); @@ -443,7 +442,7 @@ void TcpBaseConnection::_handleImYourLNeighbor1(boost::system::error_code const& CompositeKey maxKey(protoRange.maxint(), protoRange.maxstr()); bool unlimited = protoRange.maxunlimited(); leftRange.setMinMax(minKey, maxKey, unlimited); - LOGS(_log, LOG_LVL_WARN, funcName << " leftRange=" << leftRange); + LOGS(_log, LOG_LVL_INFO, funcName << " leftRange=" << leftRange); newLeftRange = _serverTcpBase->getCentralWorker()->updateRangeWithLeftData(leftRange); } proto::Neighbor protoLeftNeigh = protoItem->left(); @@ -466,7 +465,7 @@ void TcpBaseConnection::_handleImYourLNeighbor1(boost::system::error_code const& // Send the number of bytes in the message so TCP client knows how many bytes to read. bytesInMsg.appendToData(_buf); strWKI.appendToData(_buf); - ServerTcpBase::writeData(_socket, _buf, "shift _handleImYourLNeighbor1"); + ServerTcpBase::writeData(_socket, _buf); LOGS(_log, LOG_LVL_INFO, funcName << " done"); } catch (LoaderMsgErr const& ex) { LOGS(_log, LOG_LVL_ERROR, funcName << " Buffer failed " << ex.what()); @@ -540,7 +539,7 @@ void TcpBaseConnection::_handleShiftToRight1(boost::system::error_code const& ec _buf.reset(); UInt32Element elem(LoaderMsg::SHIFT_TO_RIGHT_RECEIVED); elem.appendToData(_buf); - ServerTcpBase::writeData(_socket, _buf, "_shift SHIFT_TO_RIGHT_RECEIVED"); + ServerTcpBase::writeData(_socket, _buf); LOGS(_log, LOG_LVL_INFO, funcName << " done dumpKeys " << _serverTcpBase->getCentralWorker()->dumpKeysStr(2)); } catch (LoaderMsgErr const& ex) { @@ -595,7 +594,7 @@ void TcpBaseConnection::_handleShiftFromRight1(boost::system::error_code const& } // Extract keysToShift from the protobuffer int keyShiftReq = protoKeyShiftReq->keystoshift(); - LOGS(_log, LOG_LVL_INFO, fName << " &&& keystoshift=" << keyShiftReq); + LOGS(_log, LOG_LVL_INFO, fName << " keystoshift=" << keyShiftReq); if (keyShiftReq < 1) { throw LoaderMsgErr(ERR_LOC, " KeyShiftRequest for < 1 key"); } @@ -610,11 +609,11 @@ void TcpBaseConnection::_handleShiftFromRight1(boost::system::error_code const& LOGS(_log, LOG_LVL_ERROR, errMsg); // This will keep getting thrown and never work, but at least it will show up // in the logs. - // &&& create new exception, catch it and halve the number of keys to shift ??? + // TODO create new exception, catch it and halve the number of keys to shift ??? throw LoaderMsgErr(ERR_LOC, errMsg); } keyList->appendToData(data); - ServerTcpBase::writeData(_socket, data, std::string(" &&& _handleShiftFromRight1 " + data.dumpStr(false))); + ServerTcpBase::writeData(_socket, data); // Wait for the SHIFT_FROM_RIGHT_KEYS_RECEIVED response back. _buf.reset(); diff --git a/core/modules/loader/ServerTcpBase.h b/core/modules/loader/ServerTcpBase.h index 6be3acb9d0..dd0f1b2901 100644 --- a/core/modules/loader/ServerTcpBase.h +++ b/core/modules/loader/ServerTcpBase.h @@ -147,7 +147,7 @@ class ServerTcpBase { CentralWorker* getCentralWorker() const { return _centralWorker; } - static bool writeData(AsioTcp::socket& socket, BufferUdp& data, std::string note); // &&& delete note + static bool writeData(AsioTcp::socket& socket, BufferUdp& data); private: void _startAccept(); diff --git a/core/modules/loader/ServerUdpBase.cc b/core/modules/loader/ServerUdpBase.cc index 1bfa533e1e..0eab1316e8 100644 --- a/core/modules/loader/ServerUdpBase.cc +++ b/core/modules/loader/ServerUdpBase.cc @@ -87,7 +87,16 @@ void ServerUdpBase::sendBufferTo(std::string const& hostName, int port, BufferUd using namespace boost::asio; LOGS(_log, LOG_LVL_DEBUG, "ServerUdpBase::sendBufferTo hostName=" << hostName << " port=" << port); try { - ip::udp::endpoint dest = resolve(hostName, port); + NetworkAddress addr(hostName, port); + ip::udp::endpoint dest; + auto iter = _resolvMap.find(addr); + if (iter == _resolvMap.end()) { + dest = resolve(hostName, port); // may throw boost::system::system_error + _resolvMap[addr] = dest; + } else { + // TODO if the entry is old, call resolv to freshen. + dest = iter->second; + } _socket.send_to(buffer(sendBuf.getReadCursor(), sendBuf.getBytesLeftToRead()), dest); } catch (boost::system::system_error const& e) { LOGS(_log, LOG_LVL_ERROR, "ServerUdpBase::sendBufferTo boost system_error=" << e.what() << @@ -130,6 +139,7 @@ boost::asio::ip::udp::endpoint ServerUdpBase::resolve(std::string const& hostNam using namespace boost::asio; // Resolver returns an iterator. This uses the first item only. // Failure to resolve anything throws a boost::system::error. + // There's a 5 second timeout, which is extremely painful and frequent. ip::udp::endpoint dest = *_resolver.resolve(ip::udp::v4(), hostName, std::to_string(port)).begin(); return dest; diff --git a/core/modules/loader/ServerUdpBase.h b/core/modules/loader/ServerUdpBase.h index e10e29a55d..46c6d65fb7 100644 --- a/core/modules/loader/ServerUdpBase.h +++ b/core/modules/loader/ServerUdpBase.h @@ -32,6 +32,7 @@ // Qserv headers #include "loader/BufferUdp.h" +#include "loader/NetworkAddress.h" namespace lsst { namespace qserv { @@ -87,6 +88,10 @@ class ServerUdpBase { std::string _hostName; int _port; + /// Map and mutex to store ip destinations + // TODO: add occasional checks to see if addresses changed + std::map _resolvMap; + /// Items for resolving UDP addresses /// There appear to be concurrency issues even with /// separate io_contexts, so re-using existing objects. diff --git a/core/modules/loader/Util.cc b/core/modules/loader/Util.cc index dd01b61d87..4a906be992 100644 --- a/core/modules/loader/Util.cc +++ b/core/modules/loader/Util.cc @@ -77,7 +77,7 @@ std::vector split(std::string const& in, std::function } -/// Test to be put in unit tests &&& +/// TODO Test to be put in unit tests bool splitTest() { auto out = split("www.github.com", [](char c) {return c == '.';}); auto test = (out[0] == "www" && out[1] == "github" && out[2] == "com"); diff --git a/core/modules/loader/WWorkerList.cc b/core/modules/loader/WWorkerList.cc index f60e25b7c0..3b82c6b042 100644 --- a/core/modules/loader/WWorkerList.cc +++ b/core/modules/loader/WWorkerList.cc @@ -110,7 +110,7 @@ bool WWorkerList::workerListReceive(BufferUdp::Ptr const& data) { std::string const funcName("WWorkerList::workerListReceive"); LOGS(_log, LOG_LVL_INFO, funcName << " data=" << data->dumpStr()); // Open the data protobuffer and add it to our list. - StringElement::Ptr sData = std::dynamic_pointer_cast(MsgElement::retrieve(*data, " WWorkerList::workerListReceive&&& ")); + StringElement::Ptr sData = std::dynamic_pointer_cast(MsgElement::retrieve(*data, "WWorkerList::workerListReceive")); if (sData == nullptr) { LOGS(_log, LOG_LVL_WARN, funcName << " Failed to parse list"); return false; @@ -142,7 +142,7 @@ bool WWorkerList::workerListReceive(BufferUdp::Ptr const& data) { strNames += std::to_string(wId) + ","; item->addDoListItems(_central); } - // TODO: Should this call updateEntry() to fill in the information for the worker? &&& + // TODO: Should this call updateEntry() to fill in the information for the worker? } sizeChange = _wIdMap.size() - initialSize; if (sizeChange > 0) { diff --git a/core/modules/loader/WWorkerList.h b/core/modules/loader/WWorkerList.h index 6bb14b0f6e..bcd9b2bcf9 100644 --- a/core/modules/loader/WWorkerList.h +++ b/core/modules/loader/WWorkerList.h @@ -41,7 +41,6 @@ namespace qserv { namespace loader { class CentralFollower; -//class CentralWorker; // &&& class LoaderMsg; diff --git a/core/modules/loader/WorkerServer.cc b/core/modules/loader/WorkerServer.cc index 6c8325b9c5..92923fbd67 100644 --- a/core/modules/loader/WorkerServer.cc +++ b/core/modules/loader/WorkerServer.cc @@ -137,7 +137,7 @@ void WorkerServer::_msgRecieved(LoaderMsg const& inMsg, BufferUdp::Ptr const& da bool success = true; // This is only really expected for parsing errors. Most responses to // requests come in as normal messages. - StringElement::Ptr seData = std::dynamic_pointer_cast(MsgElement::retrieve(*data, " WorkerServer::_msgRecieved&&& ")); + StringElement::Ptr seData = std::dynamic_pointer_cast(MsgElement::retrieve(*data, "WorkerServer::_msgRecieved")); if (seData == nullptr) { success = false; } diff --git a/core/modules/loader/appClientNum.cc b/core/modules/loader/appClientNum.cc index 05b88159a8..0afdc23f9a 100644 --- a/core/modules/loader/appClientNum.cc +++ b/core/modules/loader/appClientNum.cc @@ -122,8 +122,39 @@ KeyInfoData::Ptr clientAddLookup(CentralClient& central, uint64_t j) { return central.keyLookupReq(cKey); } +std::string bitsStr(uint64_t in) { + std::string str; + uint64_t const bits = sizeof(in) * 8; + for (uint64_t j=bits; j>0; --j) { + uint64_t const base = 1; + if ((base << (j - 1)) & in) { + str += "1"; + } else { + str += "0"; + } + } + return str; +} + + +uint64_t reverseBits(uint64_t in) { + uint64_t out = 0; + uint64_t const bits = sizeof(in) * 8; + std::cout << "bits=" << bits << std::endl;; + for (uint64_t j=0; j " << bitsStr(out)); + return out; +} + int main(int argc, char* argv[]) { + + bool reverse = true; // When true, reverse bits before inserting or looking up. std::string cCfgFile("core/modules/loader/config/client1.cnf"); if (argc < 3) { LOGS(_log, LOG_LVL_ERROR, "usage: appClientNum "); @@ -141,7 +172,6 @@ int main(int argc, char* argv[]) { } - //std::string const ourHost = boost::asio::ip::host_name(); &&& std::string const ourHost = getOurHostName(0); LOGS(_log, LOG_LVL_INFO, "ourHost=" << ourHost); boost::asio::io_service ioService; @@ -170,14 +200,16 @@ int main(int argc, char* argv[]) { if (numEnd >= numStart) { totalKeyCount = (numEnd - numStart) + 1; for (uint64_t j=numStart; j<=numEnd; ++j) { - kList.push_back(clientAdd(cClient, j)); + uint64_t key = (reverse) ? reverseBits(j) : j; + kList.push_back(clientAdd(cClient, key)); // occasionally trim the list if (j%modInsertCheck == 0) keyInsertListClean(kList, successCount, failedCount); } } else { totalKeyCount = (numStart - numEnd) + 1; for (uint64_t j=numStart; j>=numEnd; --j) { - kList.push_back(clientAdd(cClient, j)); + uint64_t key = (reverse) ? reverseBits(j) : j; + kList.push_back(clientAdd(cClient, key)); // occasionally trim the list if (j%modInsertCheck == 0) keyInsertListClean(kList, successCount, failedCount); } @@ -191,7 +223,6 @@ int main(int argc, char* argv[]) { if (waitForKeysCount > maxWaitCount) maxWaitCount = waitForKeysCount; while (!keyInsertListClean(kList, successCount, failedCount) && count < waitForKeysCount) { LOGS(_log, LOG_LVL_INFO, "waiting for inserts to finish count=" << count); - LOGS(_log, LOG_LVL_INFO, "&&& SLEEP"); sleep(1); ++count; } @@ -229,7 +260,8 @@ int main(int argc, char* argv[]) { int modLookupCheck = cClient.getDoListMaxLookups()/4; if (modLookupCheck < 1) modLookupCheck = 1; for (uint64_t j=nStart; j<=nEnd; ++j) { - kList.push_back(clientAddLookup(cClient, j)); + uint64_t key = (reverse) ? reverseBits(j) : j; + kList.push_back(clientAddLookup(cClient, key)); // occasionally trim the list if (j%modLookupCheck == 0) keyLookupListClean(kList, successCount, failedCount); } @@ -239,7 +271,6 @@ int main(int argc, char* argv[]) { // About 1 second per 1000 keys) while (!keyLookupListClean(kList, successCount, failedCount) && count < waitForKeysCount) { LOGS(_log, LOG_LVL_INFO, "waiting for lookups to finish count=" << count); - LOGS(_log, LOG_LVL_INFO, "&&& SLEEP"); sleep(1); ++count; } @@ -259,18 +290,18 @@ int main(int argc, char* argv[]) { return 1; } - LOGS(_log, LOG_LVL_INFO, "lookup all elements. success=" << successCount << + LOGS(_log, LOG_LVL_WARN, "lookup all elements. success=" << successCount << " failed=" << failedCount << " size=" << kList.size()); TimeOut::TimePoint lookupEnd = TimeOut::Clock::now(); - LOGS(_log, LOG_LVL_INFO, "DONE inserts seconds=" << + LOGS(_log, LOG_LVL_WARN, "DONE inserts seconds=" << std::chrono::duration_cast(insertEnd - insertBegin).count()); - LOGS(_log, LOG_LVL_INFO, "DONE lookups seconds=" << + LOGS(_log, LOG_LVL_WARN, "DONE lookups seconds=" << std::chrono::duration_cast(lookupEnd - insertEnd).count()); ioService.stop(); - LOGS(_log, LOG_LVL_INFO, "client DONE"); - while(true) sleep(100); // &&& keep kubernetes from restarting this + LOGS(_log, LOG_LVL_WARN, "client DONE"); + while(true) sleep(100); // prevent kubernetes from restarting this TODO: make this program run as a job. return 0; } diff --git a/core/modules/loader/appMaster.cc b/core/modules/loader/appMaster.cc index a5242eb760..f141f70b59 100644 --- a/core/modules/loader/appMaster.cc +++ b/core/modules/loader/appMaster.cc @@ -47,7 +47,6 @@ int main(int argc, char* argv[]) { } LOGS(_log, LOG_LVL_INFO, "masterCfg=" << mCfgFile); - //std::string const ourHost = boost::asio::ip::host_name(); &&& std::string const ourHost = getOurHostName(0); LOGS(_log, LOG_LVL_INFO, "ourHost=" << ourHost); boost::asio::io_service ioService; @@ -64,7 +63,6 @@ int main(int argc, char* argv[]) { bool loop = true; while(loop) { - LOGS(_log, LOG_LVL_INFO, "&&& SLEEP"); sleep(10); } ioService.stop(); diff --git a/core/modules/loader/appTest.cc b/core/modules/loader/appTest.cc index 00d96bc2de..29dc27a143 100644 --- a/core/modules/loader/appTest.cc +++ b/core/modules/loader/appTest.cc @@ -220,7 +220,6 @@ int main(int argc, char* argv[]) { server.testConnect(); LOGS(_log, LOG_LVL_INFO, "ServTcpBase e"); - LOGS(_log, LOG_LVL_INFO, "&&& SLEEP"); sleep(5); } catch (std::exception const& e) { @@ -373,7 +372,6 @@ int main(int argc, char* argv[]) { auto originalErrCount = wCentral1.getErrCount(); LOGS(_log, LOG_LVL_INFO, "1TSTAGE testSendBadMessage start"); wCentral1.testSendBadMessage(); - LOGS(_log, LOG_LVL_INFO, "&&& SLEEP"); sleep(2); // TODO handshaking instead of sleep if (originalErrCount == wCentral1.getErrCount()) { @@ -383,7 +381,6 @@ int main(int argc, char* argv[]) { } LOGS(_log, LOG_LVL_INFO, "sleeping"); - LOGS(_log, LOG_LVL_INFO, "&&& SLEEP"); sleep(5); // TODO change to 20 second timeout with a check every 0.1 seconds. // The workers should agree on the worker list, and it should have 2 elements. if (wCentral1.getWorkerList()->getIdMapSize() == 0) { diff --git a/core/modules/loader/appWorker.cc b/core/modules/loader/appWorker.cc index 0528fa0bfd..bd2300d7d2 100644 --- a/core/modules/loader/appWorker.cc +++ b/core/modules/loader/appWorker.cc @@ -24,9 +24,6 @@ // System headers #include #include -//#include // &&& -//#include // &&& -//#include // &&& // qserv headers #include "loader/CentralWorker.h" @@ -53,169 +50,7 @@ int main(int argc, char* argv[]) { boost::asio::io_service ioService; boost::asio::io_context ioContext; - /* &&& - if (!splitTest()) { - LOGS(_log, LOG_LVL_ERROR, "split test failed! &&&"); - exit(1); - } - - - std::string const ourHost = boost::asio::ip::host_name(); - std::string ourHostIp; - LOGS(_log, LOG_LVL_INFO, "ourHost=" << ourHost); - boost::asio::io_service ioService; - boost::asio::io_context ioContext; - - { - char hostbuffer[256]; - char *IPbuffer; - struct hostent *host_entry; - int hostname; - hostname = gethostname(hostbuffer, sizeof(hostbuffer)); - - // To retrieve host information - //host_entry = gethostbyname(hostbuffer); - host_entry = gethostbyname(ourHost.c_str()); - - // To convert an Internet network - // address into ASCII string - IPbuffer = inet_ntoa(*((struct in_addr*) // &&& replace with inet_ntop - host_entry->h_addr_list[0])); - LOGS(_log, LOG_LVL_ERROR, "hostname=" << hostname << " buf=" << hostbuffer); - LOGS(_log, LOG_LVL_ERROR, "host_entry=" << host_entry); - LOGS(_log, LOG_LVL_ERROR, "IPbuffer=" << IPbuffer); - ourHostIp = IPbuffer; - - //gethostbyaddr(); &&& - hostent *he; - in_addr ipv4addr; - //in6_addr ipv6addr; - - inet_pton(AF_INET, ourHostIp.c_str(), &ipv4addr); - he = gethostbyaddr(&ipv4addr, sizeof ipv4addr, AF_INET); - if (he == nullptr) { - printf("he == nullptr\n"); - } else { - printf("Host name: %s\n", he->h_name); - LOGS(_log, LOG_LVL_INFO, " host name=" << he->h_name); // *** this gets the correct full name - - for(int i=0; he->h_aliases[i] != NULL; ++i) { - LOGS(_log, LOG_LVL_INFO, std::to_string(i) << " host=" << he->h_aliases[i]); - } - } - - //inet_pton(AF_INET6, "2001:db8:63b3:1::beef", &ipv6addr); - //he = gethostbyaddr(&ipv6addr, sizeof ipv6addr, AF_INET6); - //printf("Host name: %s\n", he->h_name); - - } - - - { - addrinfo hints, *info, *p; - int gai_result; - - char hostname[1024]; - hostname[1023] = '\0'; - gethostname(hostname, 1023); - //std::string hostname("127.0.0.1"); - printf("hostname0: %s\n", hostname); - - memset(&hints, 0, sizeof hints); - hints.ai_family = AF_UNSPEC; - //hints.ai_socktype = SOCK_STREAM; - hints.ai_flags = AI_CANONNAME; - - //if ((gai_result = getaddrinfo(hostname, "http", &hints, &info)) != 0) { - if ((gai_result = getaddrinfo(hostname, NULL, &hints, &info)) != 0) { - fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(gai_result)); - exit(1); - } - - for(p = info; p != NULL; p = p->ai_next) { - printf("hostname1: %s\n", p->ai_canonname); // ** correct - LOGS(_log, LOG_LVL_INFO, "*a*hostname1: " << p->ai_canonname); - } - - freeaddrinfo(info); - - } - - { - addrinfo hints, *info, *p; - int gai_result; - - char hostname[1024]; - hostname[1023] = '\0'; - gethostname(hostname, 1023); - //std::string hostname("127.0.0.1"); - printf("hostname0: %s\n", hostname); - - memset(&hints, 0, sizeof hints); - hints.ai_family = AF_UNSPEC; - //hints.ai_socktype = SOCK_STREAM; - //hints.ai_flags = AI_CANONNAME; - - //if ((gai_result = getaddrinfo(hostname, "http", &hints, &info)) != 0) { - if ((gai_result = getaddrinfo(hostname, NULL, &hints, &info)) != 0) { - fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(gai_result)); - exit(1); - } - - for(p = info; p != NULL; p = p->ai_next) { - printf("hostname1: %s\n", p->ai_canonname); - LOGS(_log, LOG_LVL_INFO, "*b*hostname1: " << p->ai_canonname); - } - - freeaddrinfo(info); - - } - - { - addrinfo hints; - addrinfo *infoptr; - hints.ai_family = AF_INET; // AF_INET means IPv4 only addresses - - //int result = getaddrinfo("www.bbc.com", NULL, &hints, &infoptr); - //int result = getaddrinfo("127.0.0.1", NULL, &hints, &infoptr); // results in hostname "localhost" - int result = getaddrinfo(ourHostIp.c_str(), NULL, &hints, &infoptr); - if (result) { - fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(result)); - exit(1); - } - - - struct addrinfo *p; - char host[1024]; - - for (p = infoptr; p != NULL; p = p->ai_next) { - //getnameinfo(p->ai_addr, p->ai_addrlen, host, sizeof (host), NULL, 0, NI_NUMERICHOST); - getnameinfo(p->ai_addr, p->ai_addrlen, host, sizeof (host), NULL, 0, 0); - LOGS(_log, LOG_LVL_INFO, "getnameinfo host=" << host << " addr=" << p->ai_addr); // ** correct - } - - freeaddrinfo(infoptr); - - - - } - - - auto hostN2 = getOurHostName(2); - LOGS(_log, LOG_LVL_INFO, "hostN2=" << hostN2); - auto hostN1 = getOurHostName(1); - LOGS(_log, LOG_LVL_INFO, "hostN1=" << hostN1); - auto hostN0 = getOurHostName(0); - LOGS(_log, LOG_LVL_INFO, "hostN0=" << hostN0); - auto hostN10 = getOurHostName(10); - LOGS(_log, LOG_LVL_INFO, "hostN10=" << hostN10); - auto hostN4 = getOurHostName(4); - LOGS(_log, LOG_LVL_INFO, "hostN4=" << hostN4); - - //exit(1); -*/ - - std::string ourHostName = getOurHostName(0); // change to return shortest name that resolves. &&& + std::string ourHostName = getOurHostName(0); LOGS(_log, LOG_LVL_INFO, "ourHostName=" << ourHostName); WorkerConfig wCfg(wCfgFile); @@ -230,7 +65,6 @@ int main(int argc, char* argv[]) { bool loop = true; while(loop) { - LOGS(_log, LOG_LVL_INFO, "&&& SLEEP"); sleep(10); } ioService.stop(); // this doesn't seem to work cleanly diff --git a/core/modules/loader/config/client-k8s-a1.cnf b/core/modules/loader/config/client-k8s-a1.cnf index aa0df4d47d..630403cd79 100644 --- a/core/modules/loader/config/client-k8s-a1.cnf +++ b/core/modules/loader/config/client-k8s-a1.cnf @@ -18,16 +18,16 @@ clientPortUdp = 10050 loopSleepTime = 50000 # Maximum number of lookups that can be on the DoList at a time. -maxLookups = 20 +maxLookups = 5000 # Maximum number of inserts that can be on the DoList at a time. -maxInserts = 20 +maxInserts = 10000 # How long to sleep before checking if any lookups or inserts have completed. maxRequestSleepTime = 10000 # Client thread pool size -threadPoolSize = 10 +threadPoolSize = 20 # IO threads -iOThreads = 5 +iOThreads = 200 diff --git a/core/modules/loader/config/client-k8s-a2.cnf b/core/modules/loader/config/client-k8s-a2.cnf index da9118b180..a61dd36199 100644 --- a/core/modules/loader/config/client-k8s-a2.cnf +++ b/core/modules/loader/config/client-k8s-a2.cnf @@ -18,16 +18,16 @@ clientPortUdp = 10050 loopSleepTime = 50000 # Maximum number of lookups that can be on the DoList at a time. -maxLookups = 20 +maxLookups = 5000 # Maximum number of inserts that can be on the DoList at a time. -maxInserts = 20 +maxInserts = 10000 # How long to sleep before checking if any lookups or inserts have completed. maxRequestSleepTime = 10000 # Client thread pool size -threadPoolSize = 10 +threadPoolSize = 20 # IO threads -iOThreads = 5 +iOThreads = 200 diff --git a/core/modules/loader/config/client-k8s-a3.cnf b/core/modules/loader/config/client-k8s-a3.cnf index a741f47828..4dd5cd091e 100644 --- a/core/modules/loader/config/client-k8s-a3.cnf +++ b/core/modules/loader/config/client-k8s-a3.cnf @@ -18,16 +18,16 @@ clientPortUdp = 10050 loopSleepTime = 50000 # Maximum number of lookups that can be on the DoList at a time. -maxLookups = 20 +maxLookups = 5000 # Maximum number of inserts that can be on the DoList at a time. -maxInserts = 20 +maxInserts = 10000 # How long to sleep before checking if any lookups or inserts have completed. maxRequestSleepTime = 10000 # Client thread pool size -threadPoolSize = 10 +threadPoolSize = 20 # IO threads -iOThreads = 5 +iOThreads = 200 diff --git a/core/modules/loader/config/worker-k8s-a.cnf b/core/modules/loader/config/worker-k8s-a.cnf index c3c6c9edae..dfaf8ad04c 100644 --- a/core/modules/loader/config/worker-k8s-a.cnf +++ b/core/modules/loader/config/worker-k8s-a.cnf @@ -10,7 +10,10 @@ wPortUdp = 10043 wPortTcp = 10143 # Worker thread pool size -threadPoolSize = 10 +threadPoolSize = 30 + +# IO threads +iOThreads = 100 # Period of time where a key insert is considered recent in milliseconds recentAddLimit = 60000 @@ -18,7 +21,7 @@ recentAddLimit = 60000 # Difference in number of keys stored between neighbors # A value of 1.2 would cause a shift if either worker had # 20% more keys than its neighbor. -thresholdNeighborShift = 1.1 +thresholdNeighborShift = 1.05 # Maximum number of keys to shift in a single iteration. # An iteration would be transfer, insert, verify range. @@ -26,5 +29,5 @@ maxKeysToShift = 10000 # Time to sleep between checking every item in the DoList # in microseconds. -loopSleepTime = 100000 +loopSleepTime = 50000 diff --git a/core/modules/loader/config/worker1.cnf b/core/modules/loader/config/worker1.cnf index e0ea3f1fdf..fafa427299 100644 --- a/core/modules/loader/config/worker1.cnf +++ b/core/modules/loader/config/worker1.cnf @@ -10,7 +10,7 @@ wPortUdp = 10043 wPortTcp = 10143 # Worker thread pool size -threadPoolSize = 10 +threadPoolSize = 50 # Period of time where a key insert is considered recent in milliseconds recentAddLimit = 60000 diff --git a/core/modules/loader/config/worker2.cnf b/core/modules/loader/config/worker2.cnf index c3c2f6e45d..73d4b512e3 100644 --- a/core/modules/loader/config/worker2.cnf +++ b/core/modules/loader/config/worker2.cnf @@ -10,7 +10,7 @@ wPortUdp = 10044 wPortTcp = 10144 # Worker thread pool size -threadPoolSize = 10 +threadPoolSize = 50 # Period of time where a key insert is considered recent in milliseconds recentAddLimit = 60000 diff --git a/core/modules/loader/config/worker3.cnf b/core/modules/loader/config/worker3.cnf index 7d83a3ac40..cc45d8d5ec 100644 --- a/core/modules/loader/config/worker3.cnf +++ b/core/modules/loader/config/worker3.cnf @@ -10,7 +10,7 @@ wPortUdp = 10045 wPortTcp = 10145 # Worker thread pool size -threadPoolSize = 10 +threadPoolSize = 50 # Period of time where a key insert is considered recent in milliseconds recentAddLimit = 60000