Skip to content

Commit

Permalink
Merge pull request #18890 from jakesmith/HPCC-32266-compress-loop-res…
Browse files Browse the repository at this point in the history
…ult-streams

HPCC-32266 Ensure LOOP result streams are compressed.

Reviewed-by: Gavin Halliday <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Jul 18, 2024
2 parents 6ad2ed3 + afefda8 commit 4cf0d7e
Showing 1 changed file with 14 additions and 6 deletions.
20 changes: 14 additions & 6 deletions thorlcr/thorutil/thmem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,11 @@ class CSharedSpillableRowSet : public CSpillableStreamBase
block.clearCB = true;
assertex(((offset_t)-1) != outputOffset);
unsigned rwFlags = DEFAULT_RWFLAGS | mapESRToRWFlags(owner->emptyRowSemantics);
if (owner->spillCompInfo)
{
rwFlags |= rw_compress;
rwFlags |= owner->spillCompInfo;
}
spillStream.setown(::createRowStreamEx(&(owner->spillFile->queryIFile()), owner->rowIf, outputOffset, (offset_t)-1, (unsigned __int64)-1, rwFlags));
owner->rows.unregisterWriteCallback(*this); // no longer needed
ret = spillStream->nextRow();
Expand Down Expand Up @@ -376,9 +381,10 @@ class CSharedSpillableRowSet : public CSpillableStreamBase
};

public:
CSharedSpillableRowSet(CActivityBase &_activity, CThorSpillableRowArray &inRows, IThorRowInterfaces *_rowIf, EmptyRowSemantics _emptyRowSemantics, unsigned _spillPriority)
CSharedSpillableRowSet(CActivityBase &_activity, CThorSpillableRowArray &inRows, IThorRowInterfaces *_rowIf, EmptyRowSemantics _emptyRowSemantics, unsigned _spillCompInfo, unsigned _spillPriority)
: CSpillableStreamBase(_activity, inRows, _rowIf, _emptyRowSemantics, _spillPriority)
{
spillCompInfo = _spillCompInfo;
}
IRowStream *createRowStream()
{
Expand All @@ -387,6 +393,11 @@ class CSharedSpillableRowSet : public CSpillableStreamBase
{
block.clearCB = true;
unsigned rwFlags = DEFAULT_RWFLAGS | mapESRToRWFlags(emptyRowSemantics);
if (spillCompInfo)
{
rwFlags |= rw_compress;
rwFlags |= spillCompInfo;
}
return ::createRowStream(&spillFile->queryIFile(), rowIf, rwFlags);
}
rowidx_t toRead = rows.numCommitted();
Expand Down Expand Up @@ -1380,10 +1391,7 @@ rowidx_t CThorSpillableRowArray::save(CFileOwner &iFileOwner, unsigned _spillCom
rowidx_t n = numCommitted();
if (0 == n)
return 0;
ActPrintLog(&activity, "%s: CThorSpillableRowArray::save (skipNulls=%s, emptyRowSemantics=%u) max rows = %" RIPF "u", _tracingPrefix, boolToStr(skipNulls), emptyRowSemantics, n);

if (_spillCompInfo)
assertex(0 == writeCallbacks.ordinality()); // incompatible
ActPrintLog(&activity, "%s: CThorSpillableRowArray::save (skipNulls=%s, emptyRowSemantics=%u) max rows = %" RIPF "u", _tracingPrefix, boolToStr(skipNulls), emptyRowSemantics, n);

unsigned rwFlags = DEFAULT_RWFLAGS;
if (_spillCompInfo)
Expand Down Expand Up @@ -1773,7 +1781,7 @@ class CThorRowCollectorBase : public CSpillable
* because roxiemem's background thread may be blocked on that lock, and calling roxiemem::addRowBuffer here would cause deadlock
*/
activateSharedCallback = true;
spillableRowSet.setown(new CSharedSpillableRowSet(activity, spillableRows, rowIf, emptyRowSemantics, spillPriority));
spillableRowSet.setown(new CSharedSpillableRowSet(activity, spillableRows, rowIf, emptyRowSemantics, spillCompInfo, spillPriority));
spillableRowSet->setTracingPrefix(tracingPrefix);
}
}
Expand Down

0 comments on commit 4cf0d7e

Please sign in to comment.