Skip to content

Commit

Permalink
this_thread::yield() changed to condition_variable::wait & this_threa…
Browse files Browse the repository at this point in the history
…d::sleep_for(x); for reduce CPU usage;

try reduce mem when meet large block;
  • Loading branch information
housisong committed Aug 14, 2024
1 parent 7f88823 commit 342f0cd
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 15 deletions.
2 changes: 1 addition & 1 deletion pgzip/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ else
CXXFLAGS += -fvisibility-inlines-hidden
endif
CFLAGS += $(DEF_FLAGS)
CXXFLAGS += $(DEF_FLAGS) -std=c++11
CXXFLAGS += $(DEF_FLAGS) -std=c++11 -D_GLIBCXX_USE_NANOSLEEP

DEF_LINK := -lpthread -lstdc++
ifeq ($(M32),0)
Expand Down
35 changes: 21 additions & 14 deletions programs/gzip_decompress_by_stream_mt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <assert.h>
#include <string.h> //memcpy
#include "../lib/gzip_overhead.h"
Expand Down Expand Up @@ -111,7 +112,8 @@ static int _gzip_decompress_by_stream(struct libdeflate_decompressor *d,
&actual_in_nbytes_ret,&actual_out_nbytes_ret,
LIBDEFLATE_STOP_BY_ANY_BLOCK,&is_final_block_ret);
if (ret!=LIBDEFLATE_SUCCESS){
if (((in_cur==in_size)||((size_t)((code_buf_size-code_cur)-actual_in_nbytes_ret)>kInputSufficientSize))
const size_t inputRemSize=(size_t)((code_buf_size-code_cur)-actual_in_nbytes_ret);
if (((in_cur==in_size)||(inputRemSize>kInputSufficientSize))
&&(ret!=LIBDEFLATE_INSUFFICIENT_SPACE))
_check_d(ret);
kLimitDataSize=kDictSize;
Expand All @@ -135,7 +137,7 @@ static int _gzip_decompress_by_stream(struct libdeflate_decompressor *d,
memcpy(_dict_buf_back,data_buf,kDictSize);
free(data_buf); data_buf=0;
}
if (_code_buf_size>code_buf_size){
if ((_code_buf_size>code_buf_size)&&(inputRemSize<=kInputSufficientSize)){
u8* _code_buf=(u8*)realloc(code_buf,_code_buf_size);
_check(_code_buf!=0, LIBDEFLATE_DESTREAM_MEM_ALLOC_ERROR);
code_buf=_code_buf; _code_buf=0;
Expand Down Expand Up @@ -208,7 +210,7 @@ int gzips_decompress_by_stream(struct libdeflate_decompressor *d,
// multi-thread
static const size_t kBestWBufCount=3;
static const size_t kWBufSize=1024*256;
static const size_t kRBufSize=1024*64;
static const size_t kRBufSize=1024*128;

struct work_buf_t{
work_buf_t* next;
Expand Down Expand Up @@ -244,13 +246,16 @@ struct stream_mt_t{
}
inline void work_end(){
_is_work_exit.store(1);
_wait_variable.notify_all();
}inline void work_finished(){
_is_work_finished.store(1);
_wait_variable.notify_all();
}
void push_free_buf(work_buf_t* node_buf){
std::lock_guard<std::mutex> _auto_locker(_lock);
node_buf->next=free_list;
free_list=node_buf;
_wait_variable.notify_one();
}
inline work_buf_t* pop_free_buf(){//wait a free node
return _pop_buf(&free_list);
Expand All @@ -261,6 +266,7 @@ struct stream_mt_t{
work_buf_t** plist=&work_list;
while (*plist) plist=&((*plist)->next);//to last node
*plist=node_buf;
_wait_variable.notify_one();
}
inline work_buf_t* pop_work_buf(){//wait a work node
return _pop_buf(&work_list);
Expand All @@ -272,19 +278,20 @@ struct stream_mt_t{
std::mutex _lock;
std::atomic<int> _is_work_exit;
std::atomic<int> _is_work_finished;
std::condition_variable _wait_variable;

work_buf_t* _pop_buf(work_buf_t** plist){//wait a free node
while (!_is_work_exit.load()){
{
std::lock_guard<std::mutex> _auto_locker(_lock);
work_buf_t* result=*plist;
if (result){
*plist=(*plist)->next;
return result;
}else if (_is_work_finished.load())
break;
while (!_is_work_exit.load()){
std::unique_lock<std::mutex> _wait_locker(_lock);
work_buf_t* result=*plist;
if (result){
*plist=(*plist)->next;
return result;
}else if (_is_work_finished.load()){
break;
}else{//in wait
_wait_variable.wait(_wait_locker);
}
std::this_thread::yield(); //todo: wait by signal, not use thread yield
}
return 0; //exit work loop
}
Expand Down Expand Up @@ -355,7 +362,7 @@ struct out_stream_mt:public stream_mt_t{
}
work_finished();
while (!_is_work_exit.load()){
std::this_thread::yield(); //todo: wait by signal, not use thread yield
std::this_thread::sleep_for(std::chrono::microseconds(100)); //todo: wait by signal
}
}
};
Expand Down

1 comment on commit 342f0cd

@sisong
Copy link
Owner

@sisong sisong commented on 342f0cd Aug 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

close #2

Please sign in to comment.