From 97d19f4058163671dac984782e9f52c0708e0027 Mon Sep 17 00:00:00 2001 From: Gavin Halliday Date: Wed, 5 Jul 2023 18:15:42 +0100 Subject: [PATCH] HPCC-29385 Allow incremental decompression from lz4 files Signed-off-by: Gavin Halliday --- system/jlib/jfcmp.hpp | 46 +------------------------ system/jlib/jflz.cpp | 2 +- system/jlib/jlz4.cpp | 63 +++++++++++++++++++++++++++++++++- system/jlib/jlzw.cpp | 79 ++++++++++++++++++++++++++++++++++++------- system/jlib/jlzw.hpp | 9 +++++ system/jlib/jlzw.ipp | 6 +--- 6 files changed, 141 insertions(+), 64 deletions(-) diff --git a/system/jlib/jfcmp.hpp b/system/jlib/jfcmp.hpp index 3e5b96ce618..e1fc4aad577 100644 --- a/system/jlib/jfcmp.hpp +++ b/system/jlib/jfcmp.hpp @@ -211,7 +211,7 @@ class jlib_decl CFcmpCompressor : public CSimpleInterfaceOf }; -class jlib_decl CFcmpExpander : public CSimpleInterfaceOf +class jlib_decl CFcmpExpander : public CExpanderBase { protected: byte *outbuf; @@ -241,50 +241,6 @@ class jlib_decl CFcmpExpander : public CSimpleInterfaceOf return outlen; } - virtual void expand(void *buf) - { - if (!outlen) - return; - if (buf) - { - if (bufalloc) - free(outbuf); - bufalloc = 0; - outbuf = (unsigned char *)buf; - } - else if (outlen>bufalloc) - { - if (bufalloc) - free(outbuf); - bufalloc = outlen; - outbuf = (unsigned char *)malloc(bufalloc); - if (!outbuf) - throw MakeStringException(MSGAUD_operator,0, "Out of memory in FcmpExpander::expand, requesting %d bytes", bufalloc); - } - size32_t done = 0; - for (;;) - { - const size32_t szchunk = *in; - in++; - if (szchunk+doneoutlen)) - throw MakeStringException(0, "FcmpExpander - corrupt data(1) %d %d",written,szchunk); - } - else - { - if (szchunk+done!=outlen) - throw MakeStringException(0, "FcmpExpander - corrupt data(2) %d %d",szchunk,outlen); - memcpy((byte *)buf+done,in,szchunk); - break; - } - in = (const size32_t *)(((const byte *)in)+szchunk); - } - } - virtual void *bufptr() { return outbuf;} virtual size32_t buflen() { return outlen;} }; diff --git a/system/jlib/jflz.cpp b/system/jlib/jflz.cpp index a809964c540..078229722ba 100644 --- a/system/jlib/jflz.cpp +++ b/system/jlib/jflz.cpp @@ -687,7 +687,7 @@ class CFastLZCompressor final : public CFcmpCompressor class jlib_decl CFastLZExpander : public CFcmpExpander { public: - virtual void expand(void *buf) + virtual void expand(void *buf) override { if (!outlen) return; diff --git a/system/jlib/jlz4.cpp b/system/jlib/jlz4.cpp index 92f3a8134d6..3b684a3bd60 100644 --- a/system/jlib/jlz4.cpp +++ b/system/jlib/jlz4.cpp @@ -177,8 +177,9 @@ class CLZ4Compressor final : public CFcmpCompressor class jlib_decl CLZ4Expander : public CFcmpExpander { + size32_t totalExpanded = 0; public: - virtual void expand(void *buf) + virtual void expand(void *buf) override { if (!outlen) return; @@ -221,6 +222,66 @@ class jlib_decl CLZ4Expander : public CFcmpExpander } } + virtual size32_t expandFirst(MemoryBuffer & target, const void * src) override + { + init(src); + totalExpanded = 0; + return expandNext(target); + } + + virtual size32_t expandNext(MemoryBuffer & target) override + { + if (totalExpanded == outlen) + return 0; + + const size32_t szchunk = *in; + in++; + + target.clear(); + size32_t written; + if (szchunk+totalExpanded maxEstimate) + estimate = maxEstimate; + if (maxOut < estimate) + maxOut = estimate; + + for (;;) + { + //Try and compress into the current target buffer. If too small increase size and repeat + written = LZ4_decompress_safe((const char *)in, (char *)target.reserve(maxOut), szchunk, maxOut); + if ((int)written > 0) + { + target.setLength(written); + break; + } + + //Sanity check to catch corrupt lz4 data that always returns an error. + if (maxOut > outlen) + throwUnexpected(); + + maxOut += szchunk; // Likely to quickly approach the actual expanded size + target.clear(); + } + } + else + { + void * buf = target.reserve(szchunk); + written = szchunk; + memcpy(buf,in,szchunk); + } + + in = (const size32_t *)(((const byte *)in)+szchunk); + totalExpanded += written; + if (totalExpanded > outlen) + throw MakeStringException(0, "LZ4Expander - corrupt data(3) %d %d",written,szchunk); + return written; + } }; void LZ4CompressToBuffer(MemoryBuffer & out, size32_t len, const void * src) diff --git a/system/jlib/jlzw.cpp b/system/jlib/jlzw.cpp index 8e343cecb6d..46604389dcf 100644 --- a/system/jlib/jlzw.cpp +++ b/system/jlib/jlzw.cpp @@ -448,6 +448,20 @@ void CLZWCompressor::close() } } + +size32_t CExpanderBase::expandFirst(MemoryBuffer & target, const void * src) +{ + size32_t size = init(src); + void * buffer = target.reserve(size); + expand(buffer); + return size; +} + +size32_t CExpanderBase::expandNext(MemoryBuffer & target) +{ + return 0; +} + CLZWExpander::CLZWExpander(bool _supportbigendian) { outbuf = NULL; @@ -1467,7 +1481,7 @@ class jlib_decl CRDiffCompressor : public ICompressor, public CInterface }; -class jlib_decl CRDiffExpander : public IExpander, public CInterface +class jlib_decl CRDiffExpander : public CExpanderBase { unsigned char *outbuf; size32_t outlen; @@ -1475,8 +1489,6 @@ class jlib_decl CRDiffExpander : public IExpander, public CInterface unsigned char *in; size32_t recsize; public: - IMPLEMENT_IINTERFACE; - CRDiffExpander() { outbuf = NULL; @@ -1987,8 +1999,12 @@ class CCompressedFile : implements ICompressedFileIO, public CInterface bool writeException; Owned compressor; Owned expander; + MemoryAttr compressedInputBlock; unsigned compMethod; offset_t lastFlushPos = (offset_t)-1; + offset_t nextExpansionPos = (offset_t)-1; + offset_t startBlockPos = (offset_t)-1; + size32_t fullBlockSize = 0; unsigned indexNum() { return indexbuf.length()/sizeof(offset_t); } @@ -2017,6 +2033,43 @@ class CCompressedFile : implements ICompressedFileIO, public CInterface void getblock(offset_t pos) { curblockbuf.clear(); + + //If the blocks are being expanded incrementally check if the position is within the current block + //This test will never be true for row compressed data, or non-incremental decompression + if ((pos >= startBlockPos) && (pos < startBlockPos + fullBlockSize)) + { + if (pos < nextExpansionPos) + { + //Start decompressing again and avoid re-reading the data from disk + const void * rawData; + if (fileio) + rawData = compressedInputBlock.get(); + else + rawData = mmfile->base()+startBlockPos; + + assertex(rawData); + size32_t exp = expander->expandFirst(curblockbuf, rawData); + curblockpos = startBlockPos; + nextExpansionPos = startBlockPos + exp; + if (pos < nextExpansionPos) + return; + + curblockbuf.clear(); + } + + for (;;) + { + size32_t nextSize = expander->expandNext(curblockbuf); + if (nextSize == 0) + throwUnexpected(); // Should have failed the outer block test if nextSize is 0 + + curblockpos = nextExpansionPos; + nextExpansionPos = nextExpansionPos+nextSize; + if (pos < nextExpansionPos) + return; + } + } + size32_t expsize; curblocknum = lookupIndex(pos,curblockpos,expsize); size32_t toread = trailer.blockSize; @@ -2027,8 +2080,9 @@ class CCompressedFile : implements ICompressedFileIO, public CInterface if (!toread) return; if (fileio) { - MemoryAttr comp; - void *b=comp.allocate(toread); + //Allocate on the first call, reuse on subsequent calls. + void * b = compressedInputBlock.allocate(trailer.blockSize); + size32_t r = fileio->read(p,toread,b); assertex(r==toread); expand(b,curblockbuf,expsize); @@ -2070,11 +2124,10 @@ class CCompressedFile : implements ICompressedFileIO, public CInterface } else { // lzw or fastlz or lz4 assertex(expander.get()); - size32_t exp = expander->init(compbuf); - if (exp!=expsize) { - throw MakeStringException(-1,"Compressed file format failure(%d,%d) - Encrypted?",exp,expsize); - } - expander->expand(expbuf.reserve(exp)); + size32_t exp = expander->expandFirst(expbuf, compbuf); + startBlockPos = curblockpos; + nextExpansionPos = startBlockPos + exp; + fullBlockSize = expsize; } } @@ -2224,6 +2277,9 @@ class CCompressedFile : implements ICompressedFileIO, public CInterface compMethod = COMPRESS_METHOD_LZW; expander.setown(createLZWExpander(true)); } + //Preallocate the expansion target to the block size - to ensure it is the right size and + //avoid reallocation when expanding lz4 + curblockbuf.ensureCapacity(trailer.blockSize); } } } @@ -2685,13 +2741,12 @@ class CAESCompressor : implements ICompressor, public CInterface virtual CompressionMethod getCompressionMethod() const override { return (CompressionMethod)(COMPRESS_METHOD_AES | comp->getCompressionMethod()); } }; -class CAESExpander : implements IExpander, public CInterface +class CAESExpander : implements CExpanderBase { Owned exp; // base expander MemoryBuffer compbuf; MemoryAttr key; public: - IMPLEMENT_IINTERFACE; CAESExpander(const void *_key, unsigned _keylen) : key(_keylen,_key) { diff --git a/system/jlib/jlzw.hpp b/system/jlib/jlzw.hpp index 72816bc4370..1d7c7196390 100644 --- a/system/jlib/jlzw.hpp +++ b/system/jlib/jlzw.hpp @@ -66,6 +66,8 @@ interface jlib_decl IExpander : public IInterface virtual void expand(void *target)=0; virtual void * bufptr()=0; virtual size32_t buflen()=0; + virtual size32_t expandFirst(MemoryBuffer & target, const void * src) = 0; + virtual size32_t expandNext(MemoryBuffer & target) = 0; }; @@ -82,6 +84,13 @@ interface jlib_decl IRandRowExpander : public IInterface }; +class jlib_decl CExpanderBase : public CInterfaceOf +{ +public: + //Provide default implementations + virtual size32_t expandFirst(MemoryBuffer & target, const void * src) override; + virtual size32_t expandNext(MemoryBuffer & target) override; +}; extern jlib_decl ICompressor *createLZWCompressor(bool supportbigendian=false); // bigendiansupport required for cross platform with solaris diff --git a/system/jlib/jlzw.ipp b/system/jlib/jlzw.ipp index 0e4ce4fcf7a..1f562b7d6cf 100644 --- a/system/jlib/jlzw.ipp +++ b/system/jlib/jlzw.ipp @@ -38,7 +38,6 @@ public: unsigned char dictchar[LZW_HASH_TABLE_SIZE]; }; - class CLZWCompressor final : public ICompressor, public CInterface { public: @@ -88,12 +87,9 @@ protected: bool supportbigendian; }; - -class jlib_decl CLZWExpander : public IExpander, public CInterface +class CLZWExpander : public CExpanderBase { public: - IMPLEMENT_IINTERFACE; - CLZWExpander(bool _supportbigendian); ~CLZWExpander(); virtual size32_t init(const void *blk); // returns size required