diff --git a/roxie/ccd/ccdqueue.cpp b/roxie/ccd/ccdqueue.cpp index cfcfa9c84c4..fda869ba6c1 100644 --- a/roxie/ccd/ccdqueue.cpp +++ b/roxie/ccd/ccdqueue.cpp @@ -1685,7 +1685,7 @@ class CRoxieWorker : public CInterface, implements IPooledThread } CCycleTimer workerTimer; - hash64_t queryHash = packet->queryHeader().queryHash; + hash64_t queryHash = header.queryHash; Owned queryFactory = getQueryFactory(queryHash, channel); if (!queryFactory && logctx.queryWuid()) { @@ -1698,14 +1698,14 @@ class CRoxieWorker : public CInterface, implements IPooledThread if (!queryFactory) { StringBuffer hdr; - IException *E = MakeStringException(MSGAUD_operator, ROXIE_UNKNOWN_QUERY, "Roxie agent received request for unregistered query: %s", packet->queryHeader().toString(hdr).str()); + IException *E = MakeStringException(MSGAUD_operator, ROXIE_UNKNOWN_QUERY, "Roxie agent received request for unregistered query: %s", header.toString(hdr).str()); EXCLOG(E, "doActivity"); throwRemoteException(E, activity, packet, false); return; } activitiesStarted++; - unsigned activityId = packet->queryHeader().activityId & ~ROXIE_PRIORITY_MASK; + unsigned activityId = header.activityId & ~ROXIE_PRIORITY_MASK; Owned factory = queryFactory->getAgentActivityFactory(activityId); assertex(factory); setActivity(factory->createActivity(logctx, packet)); @@ -3055,6 +3055,45 @@ class RoxieAeronSocketQueueManager : public RoxieSocketQueueManager //================================================================================================== +void * CDummyMessagePacker::getBuffer(unsigned len, bool variable) +{ + if (variable) + { + char *ret = (char *) data.ensureCapacity(len + sizeof(RecordLengthType)); + return ret + sizeof(RecordLengthType); + } + else + { + return data.ensureCapacity(len); + } +} + +void CDummyMessagePacker::putBuffer(const void *buf, unsigned len, bool variable) +{ + if (variable) + { + buf = ((char *) buf) - sizeof(RecordLengthType); + *(RecordLengthType *) buf = len; + len += sizeof(RecordLengthType); + } + data.setWritePos(lastput + len); + lastput += len; +} + +void CDummyMessagePacker::flush() +{ +} + +void CDummyMessagePacker::sendMetaInfo(const void *buf, unsigned len) +{ + throwUnexpected(); +} + +unsigned CDummyMessagePacker::size() const +{ + return lastput; +} + interface ILocalMessageCollator : extends IMessageCollator { virtual void enqueueMessage(bool outOfBand, void *data, unsigned datalen, void *meta, unsigned metalen, void *header, unsigned headerlen) = 0; diff --git a/roxie/ccd/ccdqueue.ipp b/roxie/ccd/ccdqueue.ipp index 9c49453ecfe..a6254fbc4c7 100644 --- a/roxie/ccd/ccdqueue.ipp +++ b/roxie/ccd/ccdqueue.ipp @@ -48,34 +48,11 @@ public: lastput = 0; } - virtual void *getBuffer(unsigned len, bool variable) override - { - if (variable) - { - char *ret = (char *) data.ensureCapacity(len + sizeof(RecordLengthType)); - return ret + sizeof(RecordLengthType); - } - else - { - return data.ensureCapacity(len); - } - } - - virtual void putBuffer(const void *buf, unsigned len, bool variable) override - { - if (variable) - { - buf = ((char *) buf) - sizeof(RecordLengthType); - *(RecordLengthType *) buf = len; - len += sizeof(RecordLengthType); - } - data.setWritePos(lastput + len); - lastput += len; - } - - virtual void flush() override { } - virtual void sendMetaInfo(const void *buf, unsigned len) override { throwUnexpected(); } - virtual unsigned size() const override { return lastput; } + virtual void *getBuffer(unsigned len, bool variable) override; + virtual void putBuffer(const void *buf, unsigned len, bool variable) override; + virtual void flush() override; + virtual void sendMetaInfo(const void *buf, unsigned len) override; + virtual unsigned size() const override; }; interface IPacketDiscarder : public IInterface