Skip to content

Commit

Permalink
Merge pull request #17518 from ghalliday/issue29843
Browse files Browse the repository at this point in the history
HPCC-29843 Minor optimizations to roxie code

Reviewed-By: Richard Chapman <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Jul 11, 2023
2 parents 02a13e2 + 4f3a3f9 commit 88cd80d
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 31 deletions.
45 changes: 42 additions & 3 deletions roxie/ccd/ccdqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1685,7 +1685,7 @@ class CRoxieWorker : public CInterface, implements IPooledThread
}

CCycleTimer workerTimer;
hash64_t queryHash = packet->queryHeader().queryHash;
hash64_t queryHash = header.queryHash;
Owned<IQueryFactory> queryFactory = getQueryFactory(queryHash, channel);
if (!queryFactory && logctx.queryWuid())
{
Expand All @@ -1698,14 +1698,14 @@ class CRoxieWorker : public CInterface, implements IPooledThread
if (!queryFactory)
{
StringBuffer hdr;
IException *E = MakeStringException(MSGAUD_operator, ROXIE_UNKNOWN_QUERY, "Roxie agent received request for unregistered query: %s", packet->queryHeader().toString(hdr).str());
IException *E = MakeStringException(MSGAUD_operator, ROXIE_UNKNOWN_QUERY, "Roxie agent received request for unregistered query: %s", header.toString(hdr).str());
EXCLOG(E, "doActivity");
throwRemoteException(E, activity, packet, false);
return;
}

activitiesStarted++;
unsigned activityId = packet->queryHeader().activityId & ~ROXIE_PRIORITY_MASK;
unsigned activityId = header.activityId & ~ROXIE_PRIORITY_MASK;
Owned <IAgentActivityFactory> factory = queryFactory->getAgentActivityFactory(activityId);
assertex(factory);
setActivity(factory->createActivity(logctx, packet));
Expand Down Expand Up @@ -3055,6 +3055,45 @@ class RoxieAeronSocketQueueManager : public RoxieSocketQueueManager

//==================================================================================================

void * CDummyMessagePacker::getBuffer(unsigned len, bool variable)
{
if (variable)
{
char *ret = (char *) data.ensureCapacity(len + sizeof(RecordLengthType));
return ret + sizeof(RecordLengthType);
}
else
{
return data.ensureCapacity(len);
}
}

void CDummyMessagePacker::putBuffer(const void *buf, unsigned len, bool variable)
{
if (variable)
{
buf = ((char *) buf) - sizeof(RecordLengthType);
*(RecordLengthType *) buf = len;
len += sizeof(RecordLengthType);
}
data.setWritePos(lastput + len);
lastput += len;
}

void CDummyMessagePacker::flush()
{
}

void CDummyMessagePacker::sendMetaInfo(const void *buf, unsigned len)
{
throwUnexpected();
}

unsigned CDummyMessagePacker::size() const
{
return lastput;
}

interface ILocalMessageCollator : extends IMessageCollator
{
virtual void enqueueMessage(bool outOfBand, void *data, unsigned datalen, void *meta, unsigned metalen, void *header, unsigned headerlen) = 0;
Expand Down
33 changes: 5 additions & 28 deletions roxie/ccd/ccdqueue.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -48,34 +48,11 @@ public:
lastput = 0;
}

virtual void *getBuffer(unsigned len, bool variable) override
{
if (variable)
{
char *ret = (char *) data.ensureCapacity(len + sizeof(RecordLengthType));
return ret + sizeof(RecordLengthType);
}
else
{
return data.ensureCapacity(len);
}
}

virtual void putBuffer(const void *buf, unsigned len, bool variable) override
{
if (variable)
{
buf = ((char *) buf) - sizeof(RecordLengthType);
*(RecordLengthType *) buf = len;
len += sizeof(RecordLengthType);
}
data.setWritePos(lastput + len);
lastput += len;
}

virtual void flush() override { }
virtual void sendMetaInfo(const void *buf, unsigned len) override { throwUnexpected(); }
virtual unsigned size() const override { return lastput; }
virtual void *getBuffer(unsigned len, bool variable) override;
virtual void putBuffer(const void *buf, unsigned len, bool variable) override;
virtual void flush() override;
virtual void sendMetaInfo(const void *buf, unsigned len) override;
virtual unsigned size() const override;
};

interface IPacketDiscarder : public IInterface
Expand Down

0 comments on commit 88cd80d

Please sign in to comment.