Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-29385 Allow incremental decompression from lz4 files #17557

Merged
merged 1 commit into from
Jul 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 1 addition & 45 deletions system/jlib/jfcmp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ class jlib_decl CFcmpCompressor : public CSimpleInterfaceOf<ICompressor>
};


class jlib_decl CFcmpExpander : public CSimpleInterfaceOf<IExpander>
class jlib_decl CFcmpExpander : public CExpanderBase
{
protected:
byte *outbuf;
Expand Down Expand Up @@ -241,50 +241,6 @@ class jlib_decl CFcmpExpander : public CSimpleInterfaceOf<IExpander>
return outlen;
}

virtual void expand(void *buf)
{
if (!outlen)
return;
if (buf)
{
if (bufalloc)
free(outbuf);
bufalloc = 0;
outbuf = (unsigned char *)buf;
}
else if (outlen>bufalloc)
{
if (bufalloc)
free(outbuf);
bufalloc = outlen;
outbuf = (unsigned char *)malloc(bufalloc);
if (!outbuf)
throw MakeStringException(MSGAUD_operator,0, "Out of memory in FcmpExpander::expand, requesting %d bytes", bufalloc);
}
size32_t done = 0;
for (;;)
{
const size32_t szchunk = *in;
in++;
if (szchunk+done<outlen)
{
memcpy((byte *)buf+done, in, szchunk);
size32_t written = szchunk;
done += written;
if (!written||(done>outlen))
throw MakeStringException(0, "FcmpExpander - corrupt data(1) %d %d",written,szchunk);
}
else
{
if (szchunk+done!=outlen)
throw MakeStringException(0, "FcmpExpander - corrupt data(2) %d %d",szchunk,outlen);
memcpy((byte *)buf+done,in,szchunk);
break;
}
in = (const size32_t *)(((const byte *)in)+szchunk);
}
}

virtual void *bufptr() { return outbuf;}
virtual size32_t buflen() { return outlen;}
};
Expand Down
2 changes: 1 addition & 1 deletion system/jlib/jflz.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ class CFastLZCompressor final : public CFcmpCompressor
class jlib_decl CFastLZExpander : public CFcmpExpander
{
public:
virtual void expand(void *buf)
virtual void expand(void *buf) override
{
if (!outlen)
return;
Expand Down
63 changes: 62 additions & 1 deletion system/jlib/jlz4.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,9 @@ class CLZ4Compressor final : public CFcmpCompressor

class jlib_decl CLZ4Expander : public CFcmpExpander
{
size32_t totalExpanded = 0;
public:
virtual void expand(void *buf)
virtual void expand(void *buf) override
{
if (!outlen)
return;
Expand Down Expand Up @@ -221,6 +222,66 @@ class jlib_decl CLZ4Expander : public CFcmpExpander
}
}

virtual size32_t expandFirst(MemoryBuffer & target, const void * src) override
{
init(src);
totalExpanded = 0;
return expandNext(target);
}

virtual size32_t expandNext(MemoryBuffer & target) override
{
if (totalExpanded == outlen)
return 0;

const size32_t szchunk = *in;
in++;

target.clear();
size32_t written;
if (szchunk+totalExpanded<outlen)
jakesmith marked this conversation as resolved.
Show resolved Hide resolved
{
//All but the last block are compressed (see expand() function above).
//Slightly concerning there always has to be one trailing byte for this to work!
size32_t maxOut = target.capacity();
size32_t maxEstimate = (outlen - totalExpanded);
size32_t estimate = szchunk; // start conservatively - likely to be preallocated to correct size already.
if (estimate > maxEstimate)
jakesmith marked this conversation as resolved.
Show resolved Hide resolved
estimate = maxEstimate;
if (maxOut < estimate)
maxOut = estimate;

for (;;)
{
//Try and compress into the current target buffer. If too small increase size and repeat
written = LZ4_decompress_safe((const char *)in, (char *)target.reserve(maxOut), szchunk, maxOut);
if ((int)written > 0)
{
target.setLength(written);
break;
}

//Sanity check to catch corrupt lz4 data that always returns an error.
if (maxOut > outlen)
throwUnexpected();

maxOut += szchunk; // Likely to quickly approach the actual expanded size
target.clear();
}
}
else
{
void * buf = target.reserve(szchunk);
written = szchunk;
memcpy(buf,in,szchunk);
jakesmith marked this conversation as resolved.
Show resolved Hide resolved
}

in = (const size32_t *)(((const byte *)in)+szchunk);
totalExpanded += written;
if (totalExpanded > outlen)
throw MakeStringException(0, "LZ4Expander - corrupt data(3) %d %d",written,szchunk);
return written;
}
};

void LZ4CompressToBuffer(MemoryBuffer & out, size32_t len, const void * src)
Expand Down
79 changes: 67 additions & 12 deletions system/jlib/jlzw.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,20 @@ void CLZWCompressor::close()
}
}


size32_t CExpanderBase::expandFirst(MemoryBuffer & target, const void * src)
{
size32_t size = init(src);
void * buffer = target.reserve(size);
expand(buffer);
return size;
}

size32_t CExpanderBase::expandNext(MemoryBuffer & target)
{
return 0;
}

CLZWExpander::CLZWExpander(bool _supportbigendian)
{
outbuf = NULL;
Expand Down Expand Up @@ -1467,16 +1481,14 @@ class jlib_decl CRDiffCompressor : public ICompressor, public CInterface
};


class jlib_decl CRDiffExpander : public IExpander, public CInterface
class jlib_decl CRDiffExpander : public CExpanderBase
{
unsigned char *outbuf;
size32_t outlen;
size32_t bufalloc;
unsigned char *in;
size32_t recsize;
public:
IMPLEMENT_IINTERFACE;

CRDiffExpander()
{
outbuf = NULL;
Expand Down Expand Up @@ -1987,8 +1999,12 @@ class CCompressedFile : implements ICompressedFileIO, public CInterface
bool writeException;
Owned<ICompressor> compressor;
Owned<IExpander> expander;
MemoryAttr compressedInputBlock;
unsigned compMethod;
offset_t lastFlushPos = (offset_t)-1;
offset_t nextExpansionPos = (offset_t)-1;
offset_t startBlockPos = (offset_t)-1;
size32_t fullBlockSize = 0;

unsigned indexNum() { return indexbuf.length()/sizeof(offset_t); }

Expand Down Expand Up @@ -2017,6 +2033,43 @@ class CCompressedFile : implements ICompressedFileIO, public CInterface
void getblock(offset_t pos)
{
curblockbuf.clear();

//If the blocks are being expanded incrementally check if the position is within the current block
//This test will never be true for row compressed data, or non-incremental decompression
if ((pos >= startBlockPos) && (pos < startBlockPos + fullBlockSize))
{
if (pos < nextExpansionPos)
{
//Start decompressing again and avoid re-reading the data from disk
const void * rawData;
if (fileio)
rawData = compressedInputBlock.get();
else
rawData = mmfile->base()+startBlockPos;

assertex(rawData);
size32_t exp = expander->expandFirst(curblockbuf, rawData);
curblockpos = startBlockPos;
nextExpansionPos = startBlockPos + exp;
if (pos < nextExpansionPos)
return;

curblockbuf.clear();
}

for (;;)
{
size32_t nextSize = expander->expandNext(curblockbuf);
if (nextSize == 0)
throwUnexpected(); // Should have failed the outer block test if nextSize is 0

curblockpos = nextExpansionPos;
nextExpansionPos = nextExpansionPos+nextSize;
if (pos < nextExpansionPos)
return;
}
}

size32_t expsize;
curblocknum = lookupIndex(pos,curblockpos,expsize);
size32_t toread = trailer.blockSize;
Expand All @@ -2027,8 +2080,9 @@ class CCompressedFile : implements ICompressedFileIO, public CInterface
if (!toread)
return;
if (fileio) {
MemoryAttr comp;
void *b=comp.allocate(toread);
//Allocate on the first call, reuse on subsequent calls.
void * b = compressedInputBlock.allocate(trailer.blockSize);

size32_t r = fileio->read(p,toread,b);
assertex(r==toread);
expand(b,curblockbuf,expsize);
Expand Down Expand Up @@ -2070,11 +2124,10 @@ class CCompressedFile : implements ICompressedFileIO, public CInterface
}
else { // lzw or fastlz or lz4
assertex(expander.get());
size32_t exp = expander->init(compbuf);
if (exp!=expsize) {
throw MakeStringException(-1,"Compressed file format failure(%d,%d) - Encrypted?",exp,expsize);
}
expander->expand(expbuf.reserve(exp));
size32_t exp = expander->expandFirst(expbuf, compbuf);
startBlockPos = curblockpos;
nextExpansionPos = startBlockPos + exp;
fullBlockSize = expsize;
}
}

Expand Down Expand Up @@ -2224,6 +2277,9 @@ class CCompressedFile : implements ICompressedFileIO, public CInterface
compMethod = COMPRESS_METHOD_LZW;
expander.setown(createLZWExpander(true));
}
//Preallocate the expansion target to the block size - to ensure it is the right size and
//avoid reallocation when expanding lz4
curblockbuf.ensureCapacity(trailer.blockSize);
}
}
}
Expand Down Expand Up @@ -2685,13 +2741,12 @@ class CAESCompressor : implements ICompressor, public CInterface
virtual CompressionMethod getCompressionMethod() const override { return (CompressionMethod)(COMPRESS_METHOD_AES | comp->getCompressionMethod()); }
};

class CAESExpander : implements IExpander, public CInterface
class CAESExpander : implements CExpanderBase
{
Owned<IExpander> exp; // base expander
MemoryBuffer compbuf;
MemoryAttr key;
public:
IMPLEMENT_IINTERFACE;
CAESExpander(const void *_key, unsigned _keylen)
: key(_keylen,_key)
{
Expand Down
9 changes: 9 additions & 0 deletions system/jlib/jlzw.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ interface jlib_decl IExpander : public IInterface
virtual void expand(void *target)=0;
virtual void * bufptr()=0;
virtual size32_t buflen()=0;
virtual size32_t expandFirst(MemoryBuffer & target, const void * src) = 0;
virtual size32_t expandNext(MemoryBuffer & target) = 0;
};


Expand All @@ -82,6 +84,13 @@ interface jlib_decl IRandRowExpander : public IInterface
};


class jlib_decl CExpanderBase : public CInterfaceOf<IExpander>
{
public:
//Provide default implementations
virtual size32_t expandFirst(MemoryBuffer & target, const void * src) override;
virtual size32_t expandNext(MemoryBuffer & target) override;
};


extern jlib_decl ICompressor *createLZWCompressor(bool supportbigendian=false); // bigendiansupport required for cross platform with solaris
Expand Down
6 changes: 1 addition & 5 deletions system/jlib/jlzw.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public:
unsigned char dictchar[LZW_HASH_TABLE_SIZE];
};


class CLZWCompressor final : public ICompressor, public CInterface
{
public:
Expand Down Expand Up @@ -88,12 +87,9 @@ protected:
bool supportbigendian;
};


class jlib_decl CLZWExpander : public IExpander, public CInterface
class CLZWExpander : public CExpanderBase
{
public:
IMPLEMENT_IINTERFACE;

CLZWExpander(bool _supportbigendian);
~CLZWExpander();
virtual size32_t init(const void *blk); // returns size required
Expand Down
Loading