diff --git a/dali/base/dacoven.cpp b/dali/base/dacoven.cpp index 9f2e08e7bc8..1ba5324da8c 100644 --- a/dali/base/dacoven.cpp +++ b/dali/base/dacoven.cpp @@ -331,10 +331,10 @@ class CCovenBase: implements ICoven, public CInterface return comm->barrier(); } - virtual bool verifyConnection(rank_t rank, unsigned timeout=1000*60*5) + virtual bool verifyConnection(rank_t rank, unsigned timeout=1000*60*5, bool allowConnect=true) { assertex(comm); - return comm->verifyConnection(rank,timeout); + return comm->verifyConnection(rank,timeout, allowConnect); } @@ -768,10 +768,10 @@ class CCovenServer: public CCovenBase updateDataStore(); } - virtual bool send (CMessageBuffer &mbuf, rank_t dstrank, mptag_t tag, unsigned timeout=MP_WAIT_FOREVER, bool skipDisconnected=false) + virtual bool send (CMessageBuffer &mbuf, rank_t dstrank, mptag_t tag, unsigned timeout=MP_WAIT_FOREVER) { assertex(comm); - return comm->send(mbuf,dstrank,tag,timeout,skipDisconnected); + return comm->send(mbuf,dstrank,tag,timeout); } bool sendRecv(CMessageBuffer &mbuff, rank_t sendrank, mptag_t sendtag, unsigned timeout=MP_WAIT_FOREVER) @@ -912,11 +912,11 @@ class CCovenClient: public CCovenBase assertex(!"setInitSDSNodes not allowed in client"); } - virtual bool send (CMessageBuffer &mbuf, rank_t dstrank, mptag_t tag, unsigned timeout=MP_WAIT_FOREVER, bool skipDisconnected=false) + virtual bool send (CMessageBuffer &mbuf, rank_t dstrank, mptag_t tag, unsigned timeout=MP_WAIT_FOREVER) { assertex(comm); try { - return comm->send(mbuf,dstrank,tag,timeout,skipDisconnected); + return comm->send(mbuf,dstrank,tag,timeout); } CATCH_MPERR_link_closed } diff --git a/system/mp/mpcomm.cpp b/system/mp/mpcomm.cpp index d8fa817ad27..cf38e87afe9 100644 --- a/system/mp/mpcomm.cpp +++ b/system/mp/mpcomm.cpp @@ -2981,7 +2981,7 @@ class CCommunicator: public ICommunicator, public CInterface IMPLEMENT_IINTERFACE; - bool send (CMessageBuffer &mbuf, rank_t dstrank, mptag_t tag, unsigned timeout, bool skipDisconnected) + bool send (CMessageBuffer &mbuf, rank_t dstrank, mptag_t tag, unsigned timeout) { // send does not corrupt mbuf if (dstrank==RANK_NULL) @@ -2996,7 +2996,7 @@ class CCommunicator: public ICommunicator, public CInterface CTimeMon tm(timeout); rank_t endrank; if (dstrank==RANK_ALL) { - send(mbuf,myrank,tag,timeout,false); + send(mbuf,myrank,tag,timeout); dstrank = RANK_ALL_OTHER; } if (dstrank==RANK_ALL_OTHER) { @@ -3023,8 +3023,6 @@ class CCommunicator: public ICommunicator, public CInterface unsigned remaining; if (tm.timedout(&remaining)) return false; - if (skipDisconnected && !channel->isConnected()) - continue; if (!channel->send(mbuf,tag,mbuf.getReplyTag(),tm,false)) return false; } @@ -3065,7 +3063,7 @@ class CCommunicator: public ICommunicator, public CInterface mb.clear(); mb.append("MPTAG_BARRIER"); - bool oks = send(mb,dst,MPTAG_BARRIER,120000,false); + bool oks = send(mb,dst,MPTAG_BARRIER,120000); mb.clear(); bool okr = recv(mb,src,MPTAG_BARRIER,&r); @@ -3083,7 +3081,7 @@ class CCommunicator: public ICommunicator, public CInterface #endif } - bool verifyConnection(rank_t rank, unsigned timeout) + bool verifyConnection(rank_t rank, unsigned timeout, bool allowConnect=true) { CriticalBlock block(verifysect); assertex(rank!=RANK_RANDOM); @@ -3093,7 +3091,7 @@ class CCommunicator: public ICommunicator, public CInterface unsigned remaining; if (tm.timedout(&remaining)) return false; - return channel->verifyConnection(tm,true); + return channel->verifyConnection(tm,allowConnect); } bool verifyAll(bool duplex, unsigned totalTimeout, unsigned perConnectionTimeout) @@ -3273,7 +3271,7 @@ class CCommunicator: public ICommunicator, public CInterface unsigned remaining; if (tm.timedout(&remaining)) return false; - if (!send(mbuff,sendrank,sendtag,remaining,false)||tm.timedout(&remaining)) + if (!send(mbuff,sendrank,sendtag,remaining)||tm.timedout(&remaining)) return false; mbuff.clear(); return recv(mbuff,sendrank,replytag,NULL,remaining); @@ -3284,7 +3282,7 @@ class CCommunicator: public ICommunicator, public CInterface mptag_t replytag = mbuf.getReplyTag(); rank_t dstrank = group->rank(mbuf.getSender()); if (dstrank!=RANK_NULL) { - if (send (mbuf, dstrank, replytag,timeout,false)) { + if (send (mbuf, dstrank, replytag,timeout)) { mbuf.setReplyTag(TAG_NULL); return true; } diff --git a/system/mp/mpcomm.hpp b/system/mp/mpcomm.hpp index f242758a4be..4825640e071 100644 --- a/system/mp/mpcomm.hpp +++ b/system/mp/mpcomm.hpp @@ -34,7 +34,7 @@ const unsigned MPVerboseMsgThreshold = 110; // greater than default logging deta interface ICommunicator: extends IInterface { - virtual bool send (CMessageBuffer &mbuf, rank_t dstrank, mptag_t tag, unsigned timeout=MP_WAIT_FOREVER, bool skipDisconnected=false) = 0; + virtual bool send (CMessageBuffer &mbuf, rank_t dstrank, mptag_t tag, unsigned timeout=MP_WAIT_FOREVER) = 0; // blocking send (unless MP_ASYNC_SEND used for timeout), NB, mbuf clear on exit // returns false if timedout @@ -54,7 +54,7 @@ interface ICommunicator: extends IInterface virtual void flush (mptag_t tag) = 0; // flushes pending buffers - virtual bool verifyConnection(rank_t rank, unsigned timeout=1000*60*5) = 0; // verifies connected to rank + virtual bool verifyConnection(rank_t rank, unsigned timeout=1000*60*5, bool allowConnect=true) = 0; // verifies connected to rank virtual bool verifyAll(bool duplex=false, unsigned timeout=1000*60*30, unsigned perConnectionTimeout=0) = 0; virtual void disconnect(INode *node) = 0; virtual void barrier() = 0; diff --git a/thorlcr/graph/thgraphmaster.cpp b/thorlcr/graph/thgraphmaster.cpp index 71a8e64e7c3..bc87c51f4d9 100644 --- a/thorlcr/graph/thgraphmaster.cpp +++ b/thorlcr/graph/thgraphmaster.cpp @@ -17,6 +17,9 @@ #include #include +#include +#include +#include #include "jprop.hpp" #include "jexcept.hpp" #include "jiter.ipp" @@ -1599,7 +1602,7 @@ void CJobMaster::freeMPTag(mptag_t tag) } } -void CJobMaster::broadcast(ICommunicator &comm, CMessageBuffer &msg, mptag_t mptag, unsigned timeout, const char *errorMsg, CReplyCancelHandler *msgHandler, bool sendOnly, bool skipDisconnected) +void CJobMaster::broadcast(ICommunicator &comm, CMessageBuffer &msg, mptag_t mptag, unsigned timeout, const char *errorMsg, CReplyCancelHandler *msgHandler, bool sendOnly, bool aborting) { unsigned groupSizeExcludingMaster = comm.queryGroup().ordinality() - 1; @@ -1609,45 +1612,29 @@ void CJobMaster::broadcast(ICommunicator &comm, CMessageBuffer &msg, mptag_t mpt replyTag = queryJobChannel(0).queryMPServer().createReplyTag(); msg.setReplyTag(replyTag); } - if (globals->getPropBool("@broadcastSendAsync", true)) // only here in case of problems/debugging. - { - class CSendAsyncfor : public CAsyncFor - { - CMessageBuffer &msg; - mptag_t mptag; - unsigned timeout; - StringAttr errorMsg; - ICommunicator &comm; - bool skipDisconnected; - public: - CSendAsyncfor(ICommunicator &_comm, CMessageBuffer &_msg, mptag_t _mptag, unsigned _timeout, const char *_errorMsg, bool _skipDisconnected) - : comm(_comm), msg(_msg), mptag(_mptag), timeout(_timeout), errorMsg(_errorMsg), skipDisconnected(_skipDisconnected) - { - } - void Do(unsigned i) - { - if (!comm.send(msg, i+1, mptag, timeout, skipDisconnected)) - throw createBCastException(i+1, errorMsg); - } - } afor(comm, msg, mptag, timeout, errorMsg, skipDisconnected); - try + std::vector>> sendResultsFuture; + for (unsigned i=0; i e = createBCastException(0, errorMsg); - EXCLOG(e, NULL); - abort(e); - throw e.getClear(); - } + if (sendOnly) return; unsigned respondents = 0; Owned bitSet = createThreadSafeBitSet(); diff --git a/thorlcr/graph/thgraphmaster.ipp b/thorlcr/graph/thgraphmaster.ipp index 307ab39b3b8..9b7b7d7a21d 100644 --- a/thorlcr/graph/thgraphmaster.ipp +++ b/thorlcr/graph/thgraphmaster.ipp @@ -173,7 +173,7 @@ public: void registerFile(const char *logicalName, StringArray &clusters, unsigned usageCount=0, WUFileKind fileKind=WUFileStandard, bool temp=false); void deregisterFile(const char *logicalName, bool kept=false); const SocketEndpoint &queryAgentEp() const { return agentEp; } - void broadcast(ICommunicator &comm, CMessageBuffer &msg, mptag_t mptag, unsigned timeout, const char *errorMsg, CReplyCancelHandler *msgHandler=NULL, bool sendOnly=false, bool skipDisconnected=false); + void broadcast(ICommunicator &comm, CMessageBuffer &msg, mptag_t mptag, unsigned timeout, const char *errorMsg, CReplyCancelHandler *msgHandler=NULL, bool sendOnly=false, bool aborting=false); IPropertyTree *prepareWorkUnitInfo(); void sendQuery(); void jobDone();