Skip to content

Commit

Permalink
HPCC-29284 Changes following review
Browse files Browse the repository at this point in the history
Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Jul 13, 2023
1 parent b4eba75 commit 1674e74
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 51 deletions.
12 changes: 6 additions & 6 deletions dali/base/dacoven.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -331,10 +331,10 @@ class CCovenBase: implements ICoven, public CInterface
return comm->barrier();
}

virtual bool verifyConnection(rank_t rank, unsigned timeout=1000*60*5)
virtual bool verifyConnection(rank_t rank, unsigned timeout=1000*60*5, bool allowConnect=true)
{
assertex(comm);
return comm->verifyConnection(rank,timeout);
return comm->verifyConnection(rank,timeout, allowConnect);
}


Expand Down Expand Up @@ -768,10 +768,10 @@ class CCovenServer: public CCovenBase
updateDataStore();
}

virtual bool send (CMessageBuffer &mbuf, rank_t dstrank, mptag_t tag, unsigned timeout=MP_WAIT_FOREVER, bool skipDisconnected=false)
virtual bool send (CMessageBuffer &mbuf, rank_t dstrank, mptag_t tag, unsigned timeout=MP_WAIT_FOREVER)
{
assertex(comm);
return comm->send(mbuf,dstrank,tag,timeout,skipDisconnected);
return comm->send(mbuf,dstrank,tag,timeout);
}

bool sendRecv(CMessageBuffer &mbuff, rank_t sendrank, mptag_t sendtag, unsigned timeout=MP_WAIT_FOREVER)
Expand Down Expand Up @@ -912,11 +912,11 @@ class CCovenClient: public CCovenBase
assertex(!"setInitSDSNodes not allowed in client");
}

virtual bool send (CMessageBuffer &mbuf, rank_t dstrank, mptag_t tag, unsigned timeout=MP_WAIT_FOREVER, bool skipDisconnected=false)
virtual bool send (CMessageBuffer &mbuf, rank_t dstrank, mptag_t tag, unsigned timeout=MP_WAIT_FOREVER)
{
assertex(comm);
try {
return comm->send(mbuf,dstrank,tag,timeout,skipDisconnected);
return comm->send(mbuf,dstrank,tag,timeout);
}
CATCH_MPERR_link_closed
}
Expand Down
16 changes: 7 additions & 9 deletions system/mp/mpcomm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2981,7 +2981,7 @@ class CCommunicator: public ICommunicator, public CInterface
IMPLEMENT_IINTERFACE;


bool send (CMessageBuffer &mbuf, rank_t dstrank, mptag_t tag, unsigned timeout, bool skipDisconnected)
bool send (CMessageBuffer &mbuf, rank_t dstrank, mptag_t tag, unsigned timeout)
{
// send does not corrupt mbuf
if (dstrank==RANK_NULL)
Expand All @@ -2996,7 +2996,7 @@ class CCommunicator: public ICommunicator, public CInterface
CTimeMon tm(timeout);
rank_t endrank;
if (dstrank==RANK_ALL) {
send(mbuf,myrank,tag,timeout,false);
send(mbuf,myrank,tag,timeout);
dstrank = RANK_ALL_OTHER;
}
if (dstrank==RANK_ALL_OTHER) {
Expand All @@ -3023,8 +3023,6 @@ class CCommunicator: public ICommunicator, public CInterface
unsigned remaining;
if (tm.timedout(&remaining))
return false;
if (skipDisconnected && !channel->isConnected())
continue;
if (!channel->send(mbuf,tag,mbuf.getReplyTag(),tm,false))
return false;
}
Expand Down Expand Up @@ -3065,7 +3063,7 @@ class CCommunicator: public ICommunicator, public CInterface

mb.clear();
mb.append("MPTAG_BARRIER");
bool oks = send(mb,dst,MPTAG_BARRIER,120000,false);
bool oks = send(mb,dst,MPTAG_BARRIER,120000);
mb.clear();
bool okr = recv(mb,src,MPTAG_BARRIER,&r);

Expand All @@ -3083,7 +3081,7 @@ class CCommunicator: public ICommunicator, public CInterface
#endif
}

bool verifyConnection(rank_t rank, unsigned timeout)
bool verifyConnection(rank_t rank, unsigned timeout, bool allowConnect=true)
{
CriticalBlock block(verifysect);
assertex(rank!=RANK_RANDOM);
Expand All @@ -3093,7 +3091,7 @@ class CCommunicator: public ICommunicator, public CInterface
unsigned remaining;
if (tm.timedout(&remaining))
return false;
return channel->verifyConnection(tm,true);
return channel->verifyConnection(tm,allowConnect);
}

bool verifyAll(bool duplex, unsigned totalTimeout, unsigned perConnectionTimeout)
Expand Down Expand Up @@ -3273,7 +3271,7 @@ class CCommunicator: public ICommunicator, public CInterface
unsigned remaining;
if (tm.timedout(&remaining))
return false;
if (!send(mbuff,sendrank,sendtag,remaining,false)||tm.timedout(&remaining))
if (!send(mbuff,sendrank,sendtag,remaining)||tm.timedout(&remaining))
return false;
mbuff.clear();
return recv(mbuff,sendrank,replytag,NULL,remaining);
Expand All @@ -3284,7 +3282,7 @@ class CCommunicator: public ICommunicator, public CInterface
mptag_t replytag = mbuf.getReplyTag();
rank_t dstrank = group->rank(mbuf.getSender());
if (dstrank!=RANK_NULL) {
if (send (mbuf, dstrank, replytag,timeout,false)) {
if (send (mbuf, dstrank, replytag,timeout)) {
mbuf.setReplyTag(TAG_NULL);
return true;
}
Expand Down
4 changes: 2 additions & 2 deletions system/mp/mpcomm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const unsigned MPVerboseMsgThreshold = 110; // greater than default logging deta

interface ICommunicator: extends IInterface
{
virtual bool send (CMessageBuffer &mbuf, rank_t dstrank, mptag_t tag, unsigned timeout=MP_WAIT_FOREVER, bool skipDisconnected=false) = 0;
virtual bool send (CMessageBuffer &mbuf, rank_t dstrank, mptag_t tag, unsigned timeout=MP_WAIT_FOREVER) = 0;
// blocking send (unless MP_ASYNC_SEND used for timeout), NB, mbuf clear on exit
// returns false if timedout

Expand All @@ -54,7 +54,7 @@ interface ICommunicator: extends IInterface

virtual void flush (mptag_t tag) = 0; // flushes pending buffers

virtual bool verifyConnection(rank_t rank, unsigned timeout=1000*60*5) = 0; // verifies connected to rank
virtual bool verifyConnection(rank_t rank, unsigned timeout=1000*60*5, bool allowConnect=true) = 0; // verifies connected to rank
virtual bool verifyAll(bool duplex=false, unsigned timeout=1000*60*30, unsigned perConnectionTimeout=0) = 0;
virtual void disconnect(INode *node) = 0;
virtual void barrier() = 0;
Expand Down
54 changes: 21 additions & 33 deletions thorlcr/graph/thgraphmaster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

#include <limits.h>
#include <stdlib.h>
#include <future>
#include <vector>
#include <iterator>
#include "jprop.hpp"
#include "jexcept.hpp"
#include "jiter.ipp"
Expand Down Expand Up @@ -1599,7 +1602,7 @@ void CJobMaster::freeMPTag(mptag_t tag)
}
}

void CJobMaster::broadcast(ICommunicator &comm, CMessageBuffer &msg, mptag_t mptag, unsigned timeout, const char *errorMsg, CReplyCancelHandler *msgHandler, bool sendOnly, bool skipDisconnected)
void CJobMaster::broadcast(ICommunicator &comm, CMessageBuffer &msg, mptag_t mptag, unsigned timeout, const char *errorMsg, CReplyCancelHandler *msgHandler, bool sendOnly, bool aborting)
{
unsigned groupSizeExcludingMaster = comm.queryGroup().ordinality() - 1;

Expand All @@ -1609,45 +1612,30 @@ void CJobMaster::broadcast(ICommunicator &comm, CMessageBuffer &msg, mptag_t mpt
replyTag = queryJobChannel(0).queryMPServer().createReplyTag();
msg.setReplyTag(replyTag);
}
if (globals->getPropBool("@broadcastSendAsync", true)) // only here in case of problems/debugging.
{
class CSendAsyncfor : public CAsyncFor
{
CMessageBuffer &msg;
mptag_t mptag;
unsigned timeout;
StringAttr errorMsg;
ICommunicator &comm;
bool skipDisconnected;
public:
CSendAsyncfor(ICommunicator &_comm, CMessageBuffer &_msg, mptag_t _mptag, unsigned _timeout, const char *_errorMsg, bool _skipDisconnected)
: comm(_comm), msg(_msg), mptag(_mptag), timeout(_timeout), errorMsg(_errorMsg), skipDisconnected(_skipDisconnected)
{
}
void Do(unsigned i)
{
if (!comm.send(msg, i+1, mptag, timeout, skipDisconnected))
throw createBCastException(i+1, errorMsg);
}
} afor(comm, msg, mptag, timeout, errorMsg, skipDisconnected);
try
std::vector<std::future<std::pair<unsigned, bool>>> sendResultsFuture;
for (unsigned i=0; i<groupSizeExcludingMaster; i++)
if (comm.verifyConnection(i+1, (unsigned)-1, !aborting))
{
afor.For(groupSizeExcludingMaster, groupSizeExcludingMaster);
sendResultsFuture.push_back(
std::async([&, i]
{
return std::make_pair(i+1, comm.send(msg, i+1, mptag, timeout));
})
);
}
catch (IException *e)
for (auto & sendResultFuture: sendResultsFuture)
{
auto result = sendResultFuture.get();
DBGLOG("Results %u: %s", result.first, result.second?"true":"false");
if (result.second==false)
{
IException * e = createBCastException(result.first, errorMsg);
EXCLOG(e, "broadcastSendAsync");
abort(e);
throw;
throw e;
}
}
else if (!comm.send(msg, RANK_ALL_OTHER, mptag, timeout, skipDisconnected))
{
Owned<IException> e = createBCastException(0, errorMsg);
EXCLOG(e, NULL);
abort(e);
throw e.getClear();
}

if (sendOnly) return;
unsigned respondents = 0;
Owned<IBitSet> bitSet = createThreadSafeBitSet();
Expand Down
2 changes: 1 addition & 1 deletion thorlcr/graph/thgraphmaster.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public:
void registerFile(const char *logicalName, StringArray &clusters, unsigned usageCount=0, WUFileKind fileKind=WUFileStandard, bool temp=false);
void deregisterFile(const char *logicalName, bool kept=false);
const SocketEndpoint &queryAgentEp() const { return agentEp; }
void broadcast(ICommunicator &comm, CMessageBuffer &msg, mptag_t mptag, unsigned timeout, const char *errorMsg, CReplyCancelHandler *msgHandler=NULL, bool sendOnly=false, bool skipDisconnected=false);
void broadcast(ICommunicator &comm, CMessageBuffer &msg, mptag_t mptag, unsigned timeout, const char *errorMsg, CReplyCancelHandler *msgHandler=NULL, bool sendOnly=false, bool aborting=false);
IPropertyTree *prepareWorkUnitInfo();
void sendQuery();
void jobDone();
Expand Down

0 comments on commit 1674e74

Please sign in to comment.