Skip to content

Commit

Permalink
decompressor support multi concatenated gzip in one file;
Browse files Browse the repository at this point in the history
  • Loading branch information
housisong committed Aug 12, 2024
1 parent 808a67f commit 81c3c99
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 103 deletions.
224 changes: 126 additions & 98 deletions programs/gzip_decompress_by_stream_mt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,127 +43,138 @@ static inline size_t _limitMaxDefBSize(size_t maxDeflateBlockSize){
}

static int _gzip_decompress_by_stream(struct libdeflate_decompressor *d,
struct file_stream *in, u64 in_size,struct file_stream *out,
struct file_stream *in, u64 in_size,bool isMultiGz,struct file_stream *out,
xread_t xread_proc,full_write_t full_write_proc,
u64* _actual_in_nbytes_ret,u64* _actual_out_nbytes_ret){
if (isMultiGz) assert(_actual_in_nbytes_ret==0);
int err_code=0;
u8* data_buf=0;
u8* code_buf=0;
u8* _dict_buf_back=0;
u64 in_cur=0;
u64 out_cur=0;
u64 last_out_cur=0;
size_t curDeflateBlockSize=kMaxDeflateBlockSize;
size_t curBlockSize=_limitMaxDefBSize(curDeflateBlockSize);
size_t data_buf_size=curBlockSize+kDictSize;
size_t code_buf_size=(curBlockSize<in_size)?curBlockSize:in_size;
size_t data_cur=kDictSize; //empty
size_t code_cur=code_buf_size; //empty
size_t actual_in_nbytes_ret;
uint32_t data_crc=0;
int is_final_block_ret=0;
uint32_t data_crc;
int is_final_block_ret;
int ret;

data_buf=(u8*)malloc(data_buf_size);
_check(data_buf!=0, LIBDEFLATE_DESTREAM_MEM_ALLOC_ERROR);
code_buf=(u8*)malloc(code_buf_size);
_check(code_buf!=0, LIBDEFLATE_DESTREAM_MEM_ALLOC_ERROR);

_read_code_from_file();
{//gzip head
ret=libdeflate_gzip_decompress_head(code_buf,code_buf_size-code_cur,&actual_in_nbytes_ret);
_check_d(ret);
code_cur+=actual_in_nbytes_ret;
}

while(1){
// [ ( dict ) | dataBuf ] [ codeBuf ]
// ^ ^ ^ ^ ^ ^ ^
// data_buf out_cur data_cur data_buf_size code_buf code_cur code_buf_size
size_t kLimitDataSize=curBlockSize/2+kDictSize;
size_t kLimitCodeSize=code_buf_size/2;
size_t actual_out_nbytes_ret;
uint16_t dec_state;
__datas_prepare:
if (is_final_block_ret||(data_cur>kLimitDataSize)){//save data to out file
if (out)
_check(0==full_write_proc(out,data_buf+kDictSize,data_cur-kDictSize), LIBDEFLATE_DESTREAM_WRITE_FILE_ERROR);
data_crc=libdeflate_crc32(data_crc,data_buf+kDictSize,data_cur-kDictSize);
out_cur+=data_cur-kDictSize;
if (is_final_block_ret)
break;
memmove(data_buf,data_buf+data_cur-kDictSize,kDictSize);//dict data for next block
data_cur=kDictSize;
do{//loop for multi concatenated gzip
data_crc=0;
is_final_block_ret=0;
{//gzip head
if (code_cur+GZIP_MIN_HEADER_SIZE>code_buf_size)
_read_code_from_file();
ret=libdeflate_gzip_decompress_head(code_buf,code_buf_size-code_cur,&actual_in_nbytes_ret);
_check_d(ret);
code_cur+=actual_in_nbytes_ret;
}
if (code_cur>kLimitCodeSize)
_read_code_from_file();
dec_state=libdeflate_deflate_decompress_get_state(d);
ret=libdeflate_deflate_decompress_block(d,code_buf+code_cur,code_buf_size-code_cur,
data_buf+data_cur-kDictSize,kDictSize,data_buf_size-data_cur,
&actual_in_nbytes_ret,&actual_out_nbytes_ret,
LIBDEFLATE_STOP_BY_ANY_BLOCK,&is_final_block_ret);
if (ret!=LIBDEFLATE_SUCCESS){
if (((in_cur==in_size)||((size_t)((code_buf_size-code_cur)-actual_in_nbytes_ret)>kInputSufficientSize))
&&(ret!=LIBDEFLATE_INSUFFICIENT_SPACE))
_check_d(ret);
kLimitDataSize=kDictSize;
kLimitCodeSize=0;
if ((data_cur>kDictSize)||((code_cur>0)&&(in_cur<in_size))) { //need more datas & retry
//ok
}else if (curDeflateBlockSize<kMaxDeflateBlockSize_max){//need increase buf size & retry
curDeflateBlockSize=(curDeflateBlockSize*2<kMaxDeflateBlockSize_max)?curDeflateBlockSize*2:kMaxDeflateBlockSize_max;
size_t _curBlockSize=_limitMaxDefBSize(curDeflateBlockSize);
size_t _data_buf_size=_curBlockSize+kDictSize;
const size_t loaded_in_size=(code_buf_size-code_cur);
const u64 rem_in_size=loaded_in_size+(in_size-in_cur);
size_t _code_buf_size=(_curBlockSize<rem_in_size)?_curBlockSize:rem_in_size;
curBlockSize=_curBlockSize;
{
assert(data_cur==kDictSize);
if (_dict_buf_back==0){
_dict_buf_back=(u8*)malloc(kDictSize);
_check(_dict_buf_back!=0, LIBDEFLATE_DESTREAM_MEM_ALLOC_ERROR);
}
memcpy(_dict_buf_back,data_buf,kDictSize);
free(data_buf); data_buf=0;
}
if (_code_buf_size>code_buf_size){
u8* _code_buf=(u8*)realloc(code_buf,_code_buf_size);
_check(_code_buf!=0, LIBDEFLATE_DESTREAM_MEM_ALLOC_ERROR);
code_buf=_code_buf; _code_buf=0;
memcpy(code_buf+_code_buf_size-loaded_in_size,code_buf+code_cur,loaded_in_size);
code_cur+=_code_buf_size-code_buf_size;
code_buf_size=_code_buf_size;

while(1){
// [ ( dict ) | dataBuf ] [ codeBuf ]
// ^ ^ ^ ^ ^ ^ ^
// data_buf out_cur data_cur data_buf_size code_buf code_cur code_buf_size
size_t kLimitDataSize=curBlockSize/2+kDictSize;
size_t kLimitCodeSize=code_buf_size/2;
size_t actual_out_nbytes_ret;
uint16_t dec_state;
__datas_prepare:
if (is_final_block_ret||(data_cur>kLimitDataSize)){//save data to out file
if (out)
_check(0==full_write_proc(out,data_buf+kDictSize,data_cur-kDictSize), LIBDEFLATE_DESTREAM_WRITE_FILE_ERROR);
data_crc=libdeflate_crc32(data_crc,data_buf+kDictSize,data_cur-kDictSize);
out_cur+=data_cur-kDictSize;
if (isMultiGz){//dict data for next block
memmove(data_buf,data_buf+data_cur-kDictSize,kDictSize);
data_cur=kDictSize;
}
{
data_buf=(u8*)malloc(_data_buf_size);
_check(data_buf!=0, LIBDEFLATE_DESTREAM_MEM_ALLOC_ERROR);
memcpy(data_buf,_dict_buf_back,kDictSize);
data_buf_size=_data_buf_size;
if (is_final_block_ret)
break;
}
if (code_cur>kLimitCodeSize)
_read_code_from_file();
dec_state=libdeflate_deflate_decompress_get_state(d);
ret=libdeflate_deflate_decompress_block(d,code_buf+code_cur,code_buf_size-code_cur,
data_buf+data_cur-kDictSize,kDictSize,data_buf_size-data_cur,
&actual_in_nbytes_ret,&actual_out_nbytes_ret,
LIBDEFLATE_STOP_BY_ANY_BLOCK,&is_final_block_ret);
if (ret!=LIBDEFLATE_SUCCESS){
if (((in_cur==in_size)||((size_t)((code_buf_size-code_cur)-actual_in_nbytes_ret)>kInputSufficientSize))
&&(ret!=LIBDEFLATE_INSUFFICIENT_SPACE))
_check_d(ret);
kLimitDataSize=kDictSize;
kLimitCodeSize=0;
if ((data_cur>kDictSize)||((code_cur>0)&&(in_cur<in_size))) { //need more datas & retry
//ok
}else if (curDeflateBlockSize<kMaxDeflateBlockSize_max){//need increase buf size & retry
curDeflateBlockSize=(curDeflateBlockSize*2<kMaxDeflateBlockSize_max)?curDeflateBlockSize*2:kMaxDeflateBlockSize_max;
size_t _curBlockSize=_limitMaxDefBSize(curDeflateBlockSize);
size_t _data_buf_size=_curBlockSize+kDictSize;
const size_t loaded_in_size=(code_buf_size-code_cur);
const u64 rem_in_size=loaded_in_size+(in_size-in_cur);
size_t _code_buf_size=(_curBlockSize<rem_in_size)?_curBlockSize:rem_in_size;
curBlockSize=_curBlockSize;
{
assert(data_cur==kDictSize);
if (_dict_buf_back==0){
_dict_buf_back=(u8*)malloc(kDictSize);
_check(_dict_buf_back!=0, LIBDEFLATE_DESTREAM_MEM_ALLOC_ERROR);
}
memcpy(_dict_buf_back,data_buf,kDictSize);
free(data_buf); data_buf=0;
}
if (_code_buf_size>code_buf_size){
u8* _code_buf=(u8*)realloc(code_buf,_code_buf_size);
_check(_code_buf!=0, LIBDEFLATE_DESTREAM_MEM_ALLOC_ERROR);
code_buf=_code_buf; _code_buf=0;
memcpy(code_buf+_code_buf_size-loaded_in_size,code_buf+code_cur,loaded_in_size);
code_cur+=_code_buf_size-code_buf_size;
code_buf_size=_code_buf_size;
}
{
data_buf=(u8*)malloc(_data_buf_size);
_check(data_buf!=0, LIBDEFLATE_DESTREAM_MEM_ALLOC_ERROR);
memcpy(data_buf,_dict_buf_back,kDictSize);
data_buf_size=_data_buf_size;
}
}else{ //decompress fail, can't increase buf
_check_d(ret);
}
}else{ //decompress fail, can't increase buf
_check_d(ret);
libdeflate_deflate_decompress_set_state(d,dec_state);
goto __datas_prepare; //retry by more datas
}
libdeflate_deflate_decompress_set_state(d,dec_state);
goto __datas_prepare; //retry by more datas
//decompress ok
code_cur+=actual_in_nbytes_ret;
data_cur+=actual_out_nbytes_ret;
}
//decompress ok
code_cur+=actual_in_nbytes_ret;
data_cur+=actual_out_nbytes_ret;
}

{//gzip foot
uint32_t saved_crc;
uint32_t saved_uncompress_nbytes;
if (code_cur+GZIP_FOOTER_SIZE>code_buf_size)
_read_code_from_file();
ret=libdeflate_gzip_decompress_foot(code_buf+code_cur,code_buf_size-code_cur,
&saved_crc,&saved_uncompress_nbytes,&actual_in_nbytes_ret);
_check_d(ret);
code_cur+=actual_in_nbytes_ret;

{//gzip foot
uint32_t saved_crc;
uint32_t saved_uncompress_nbytes;
if (code_cur+GZIP_FOOTER_SIZE>code_buf_size)
_read_code_from_file();
ret=libdeflate_gzip_decompress_foot(code_buf+code_cur,code_buf_size-code_cur,
&saved_crc,&saved_uncompress_nbytes,&actual_in_nbytes_ret);
_check_d(ret);
code_cur+=actual_in_nbytes_ret;

_check(saved_crc==data_crc, LIBDEFLATE_DESTREAM_DATA_CRC_ERROR);
_check(saved_uncompress_nbytes==(u32)out_cur, LIBDEFLATE_DESTREAM_DATA_SIZE_ERROR);
}
_check(saved_crc==data_crc, LIBDEFLATE_DESTREAM_DATA_CRC_ERROR);
_check(saved_uncompress_nbytes==(u32)(out_cur-last_out_cur), LIBDEFLATE_DESTREAM_DATA_SIZE_ERROR);
last_out_cur=out_cur;
}
} while (isMultiGz && (0<(u64)((in_size-in_cur)+(code_buf_size-code_cur))));

if (_actual_in_nbytes_ret)
*_actual_in_nbytes_ret=(in_cur-(size_t)(code_buf_size-code_cur));
Expand All @@ -182,10 +193,16 @@ static int _gzip_decompress_by_stream(struct libdeflate_decompressor *d,
int gzip_decompress_by_stream(struct libdeflate_decompressor *d,
struct file_stream *in, u64 in_size, struct file_stream *out,
u64* actual_in_nbytes_ret,u64* actual_out_nbytes_ret){
return _gzip_decompress_by_stream(d,in,in_size,out,xread,full_write,
return _gzip_decompress_by_stream(d,in,in_size,false,out,xread,full_write,
actual_in_nbytes_ret,actual_out_nbytes_ret);
}

int gzips_decompress_by_stream(struct libdeflate_decompressor *d,
struct file_stream *in, u64 in_size, struct file_stream *out,
u64* actual_out_nbytes_ret){
return _gzip_decompress_by_stream(d,in,in_size,true,out,xread,full_write,
0,actual_out_nbytes_ret);
}

//------------------------------------------------------------------------------------------------
// multi-thread
Expand Down Expand Up @@ -392,11 +409,11 @@ static void _write_out_thread(out_stream_mt* self){
self->work_end();
}

int gzip_decompress_by_stream_mt(struct libdeflate_decompressor *d,
struct file_stream *in, u64 in_size,struct file_stream *out,
int thread_num,u64* actual_in_nbytes_ret,u64* actual_out_nbytes_ret){
int _gzip_decompress_by_stream_mt(struct libdeflate_decompressor *d,
struct file_stream *in, u64 in_size,bool isMultiGz,struct file_stream *out,
int thread_num,u64* actual_in_nbytes_ret,u64* actual_out_nbytes_ret){
if (thread_num<=1){
return _gzip_decompress_by_stream(d,in,in_size,out,xread,full_write,
return _gzip_decompress_by_stream(d,in,in_size,isMultiGz,out,xread,full_write,
actual_in_nbytes_ret,actual_out_nbytes_ret);
}
xread_t xread_proc=xread;
Expand All @@ -422,7 +439,7 @@ int gzip_decompress_by_stream_mt(struct libdeflate_decompressor *d,
if (in_mt.base) threads[--thread_size]=std::thread(_read_in_thread,&in_mt);
if (out_mt.base) threads[--thread_size]=std::thread(_write_out_thread,&out_mt);
u64 work_write_ret=0;
int ret=_gzip_decompress_by_stream(d,in,in_size,out,xread_proc,full_write_proc,
int ret=_gzip_decompress_by_stream(d,in,in_size,isMultiGz,out,xread_proc,full_write_proc,
actual_in_nbytes_ret,&work_write_ret);
if (in_mt.base)
in_mt.work_end();
Expand All @@ -446,3 +463,14 @@ int gzip_decompress_by_stream_mt(struct libdeflate_decompressor *d,
return LIBDEFLATE_DESTREAM_MT_EXCEPTION_ERROR;
}
}

// Decompress a single gzip member by stream, optionally using helper threads.
// Forwards to _gzip_decompress_by_stream_mt() with multi-gzip mode switched off;
// that worker takes the single-threaded path when thread_num<=1.
//  d, in, in_size, out, thread_num, actual_in_nbytes_ret, actual_out_nbytes_ret:
//      handed to the worker unchanged.
// Returns whatever _gzip_decompress_by_stream_mt() returns.
int gzip_decompress_by_stream_mt(struct libdeflate_decompressor *d,
                    struct file_stream *in, u64 in_size,struct file_stream *out,
                    int thread_num,u64* actual_in_nbytes_ret,u64* actual_out_nbytes_ret){
    const bool multi_gz=false; // exactly one gzip member is expected
    const int result=_gzip_decompress_by_stream_mt(d,in,in_size,multi_gz,out,thread_num,
                                                   actual_in_nbytes_ret,actual_out_nbytes_ret);
    return result;
}
int gzips_decompress_by_stream_mt(struct libdeflate_decompressor *d,
struct file_stream *in, u64 in_size,struct file_stream *out,
int thread_num,u64* actual_out_nbytes_ret){
return _gzip_decompress_by_stream_mt(d,in,in_size,true,out,thread_num,0,actual_out_nbytes_ret);
}
15 changes: 13 additions & 2 deletions programs/gzip_decompress_by_stream_mt.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,29 @@ enum libdeflate_destream_result{
LIBDEFLATE_DESTREAM_MT_WRITE_THREAD_EXCEPTION_ERROR,//30
};

//decompress gzip by stream
//decompress one gzip by stream
// 'out','actual_in_nbytes_ret','actual_out_nbytes_ret' can be NULL;
// return value is libdeflate_result or libdeflate_destream_result.
int gzip_decompress_by_stream(struct libdeflate_decompressor *decompressor,
struct file_stream *in, u64 in_size, struct file_stream *out,
u64* actual_in_nbytes_ret,u64* actual_out_nbytes_ret);

//decompress gzip by stream & multi-thread
//decompress one gzip by stream & multi-thread
int gzip_decompress_by_stream_mt(struct libdeflate_decompressor *decompressor,
struct file_stream *in, u64 in_size, struct file_stream *out,
int thread_num,u64* actual_in_nbytes_ret,u64* actual_out_nbytes_ret);


//decompress multiple concatenated gzip members, stored back to back in one input, by stream
// (the non-threaded counterpart of gzips_decompress_by_stream_mt).
// unlike gzip_decompress_by_stream() there is no 'actual_in_nbytes_ret' parameter.
// 'out' can be NULL, in which case nothing is written to an output stream.
// NOTE(review): 'actual_out_nbytes_ret' is presumably optional (NULL allowed) as it is
//   for gzip_decompress_by_stream() - confirm against the implementation.
// return value is of the same kind as gzip_decompress_by_stream():
//   libdeflate_result or libdeflate_destream_result.
int gzips_decompress_by_stream(struct libdeflate_decompressor *decompressor,
struct file_stream *in, u64 in_size, struct file_stream *out,
u64* actual_out_nbytes_ret);

//decompress multiple concatenated gzip members, stored back to back in one input,
// by stream & multi-thread.
// 'thread_num' <= 1 selects the single-threaded code path.
// unlike gzip_decompress_by_stream_mt() there is no 'actual_in_nbytes_ret' parameter.
// pgzip calls this with 'out' == NULL in test mode and with 'actual_out_nbytes_ret' == NULL.
// NOTE(review): return value is presumably libdeflate_result or libdeflate_destream_result,
//   as for gzip_decompress_by_stream() - confirm against the implementation.
int gzips_decompress_by_stream_mt(struct libdeflate_decompressor *decompressor,
struct file_stream *in, u64 in_size, struct file_stream *out,
int thread_num,u64* actual_out_nbytes_ret);

#ifdef __cplusplus
}
#endif
Expand Down
6 changes: 3 additions & 3 deletions programs/pgzip.c
Original file line number Diff line number Diff line change
Expand Up @@ -292,10 +292,10 @@ decompress_file(struct libdeflate_decompressor *decompressor, const tchar *path,
if (ret != 0)
goto out_close_in;

ret = gzip_decompress_by_stream_mt(decompressor, &in, stbuf.st_size, (options->test?0:&out),
options->thread_num, NULL, NULL);
ret = gzips_decompress_by_stream_mt(decompressor, &in, stbuf.st_size, (options->test?0:&out),
options->thread_num, NULL);
if (ret != 0){
msg("\nERROR: gzip_decompress_by_stream_mt() error code %d\n\n",ret);
msg("\nERROR: gzips_decompress_by_stream_mt() error code %d\n\n",ret);
goto out_close_out;
}

Expand Down

1 comment on commit 81c3c99

@sisong
Copy link
Owner

@sisong sisong commented on 81c3c99 Aug 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

close #1

Please sign in to comment.