coplus is a parallel programming library written in C++11. It started as an attempt to emulate Go-like concurrency in C++. It is now growing into a general parallel programming library aimed at high performance and an elegant interface. To accomplish that, coplus is built on highly optimized data structures and provides various semantics for different application contexts.
- Wide use of wait-free data structures
- No hacky code and zero dependency on third-party libraries such as boost::context
- Various built-in concurrency models
  - Message Channel
  - Many-to-one thread pool
  - Many-to-many thread pool with lock-synchronized queue
  - Thread pool with wait-free queue
  - Thread pool with coroutine
- Advanced features, including scheduling by priority
- Concurrent Tools
Targeted at lightweight task scheduling, ThreadPool provides a simple interface for complex concurrency control:
- Machine: Runs the scheduler routine on a user thread; easy to close and restart.
- Task: Provides the base interface for a schedulable job; easy to implement with bool call(), void raw_call().
- ThreadPool: Includes the submit, resize, close, and report functions.
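To make the roles of Task and ThreadPool more concrete, here is a minimal sketch of a pool whose workers pull jobs from a mutex-guarded queue. It is not coplus's code: the names SketchTask, SketchPool, AddOne and run_pool_demo, the constructor and method signatures, and the choice to ignore the bool returned by call() are all assumptions made for illustration.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for a schedulable job.
struct SketchTask {
    virtual ~SketchTask() {}
    virtual bool call() = 0;
};

// Hypothetical pool: N workers pull tasks from one mutex-guarded queue.
class SketchPool {
public:
    explicit SketchPool(std::size_t workers) : closed_(false) {
        for (std::size_t i = 0; i < workers; ++i)
            threads_.emplace_back([this] { run(); });
    }
    ~SketchPool() { close(); }

    void submit(std::shared_ptr<SketchTask> task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(task));
        }
        ready_.notify_one();
    }

    // Lets the workers drain the queue, then joins them. Safe to call twice.
    void close() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
        }
        ready_.notify_all();
        for (auto& t : threads_)
            if (t.joinable()) t.join();
    }

private:
    void run() {
        for (;;) {
            std::shared_ptr<SketchTask> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                ready_.wait(lock, [this] { return closed_ || !queue_.empty(); });
                if (queue_.empty()) return;  // closed and nothing left to do
                task = std::move(queue_.front());
                queue_.pop();
            }
            task->call();  // return value is ignored in this sketch
        }
    }

    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<std::shared_ptr<SketchTask> > queue_;
    bool closed_;  // guarded by mutex_
    std::vector<std::thread> threads_;
};

// Example job: increments a shared counter once.
struct AddOne : SketchTask {
    std::atomic<int>& counter;
    explicit AddOne(std::atomic<int>& c) : counter(c) {}
    bool call() override { ++counter; return true; }
};

int run_pool_demo() {
    std::atomic<int> counter(0);
    SketchPool pool(3);
    for (int i = 0; i < 100; ++i)
        pool.submit(std::make_shared<AddOne>(counter));
    pool.close();  // returns after every queued task has run
    return counter.load();
}
```

Calling run_pool_demo() from a main function returns 100, since close() only lets workers exit once the queue is empty.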
One limitation of coroutines is worth noting: coroutine methods cannot be called from a regular thread, only from a routine managed by the thread pool. To run the program's main entry inside the thread pool, use co_main instead.
A traditional coroutine implementation includes the following interfaces:
- create a coroutine

  Each scheduler in ThreadPool is itself a coroutine. When the task being scheduled is a coroutine, the inner call method executes a Switch operation.
- exit a coroutine

  To exit a coroutine, we need to know which coroutine to go back to. To accomplish that, every task that may be called as a coroutine must take a return coroutine as a parameter.
- yield

  Simply store the current state and switch back to the return coroutine. A global area stores all unfinished coroutines, and a waiting coroutine must be re-inserted into the active task queue.
    // Use case: poll an external event
    auto trace = pool.go(std::bind([&](int input) -> void* {
        colog << "Start Polling";
        startEvent(input);
        int retryCount = 100;
        while (retryCount--) {  // retry at most 100 times
            if (eventStatus == kReady) {
                return getEventResultPointer();
            }
            yield();  // not ready yet: yield control
        }
        colog << "End Polling";  // reached only if the event never became ready
        return nullptr;
    }, 42));
    colog << "Task finished? = " << (trace.get() != nullptr);
- await

  In my implementation, await takes several forms:
a) await for function
    // Use case
    auto trace = go([]() -> int {
        int count = 100;
        while (count--) {
            auto result = await([] { return 7; });
            colog << result.get();  // no blocking here
        }
        return 42;
    });
    colog << trace.get();
b) await for unconditional trigger
    // Use case: synchronization
    Trigger lock = NewTrigger();
    int data = -1;
    auto trace = go([&]() -> int {
        while (true) {
            await(lock);
            // extra synchronization is needed if more than one task notifies lock
            if (data > 0) break;
        }
        return data;
    });
    go([&] {
        data = 7;
        notify(lock);  // assumes the waiter is already waiting
        return;
    });
    colog << trace.get();  // should be 7 here
    // Use case: asynchronous IO
    Trigger IOReady = NewTrigger();
    Data IOStore = NewStore();
    void IOCallback(void* p) {
        copy(IOStore, p);
        IOReady.notify();
    }
    auto NewIOWorker = go([&] {
        startIO(IOCallback);
        await(IOReady);
        displayToTerminal(IOStore);
        return true;
    });
    // Do non-IO work here
    return NewIOWorker.get();
c) await for timer: two semantics are possible (wait for grant or wait for execution); the latter is for preemptive coroutines.
    // Use case
    int coplus::main(int argc, char** argv) {
        colog << "this is main()";
        colog << "wait for 10 seconds";
        await(10);  // 10 seconds
        colog << "exit main()...";
        return 0;
    }
All of these forms of await are in essence built on a register-notify mechanism. Register bookkeeps the caller's running context and address; Notify activates the kept context and resubmits it to the ready queue.
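The register-notify idea can be sketched on a single thread without any real context switching. In the sketch below a saved context is modelled as a std::function holding "the rest of the task". ReadyQueue, SketchTrigger, register_waiter and run_trigger_demo are hypothetical names chosen for illustration, not coplus's API, and a real implementation would store a coroutine context rather than a callable.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Stand-in for a saved coroutine context: the rest of the task as a callable.
typedef std::function<void()> Context;

// Single-threaded ready queue that runs contexts until none are left.
struct ReadyQueue {
    std::deque<Context> pending;
    void submit(Context c) { pending.push_back(std::move(c)); }
    void run() {
        while (!pending.empty()) {
            Context c = std::move(pending.front());
            pending.pop_front();
            c();
        }
    }
};

struct SketchTrigger {
    std::vector<Context> waiters;

    // Register: bookkeep the caller's context until someone notifies.
    void register_waiter(Context c) { waiters.push_back(std::move(c)); }

    // Notify: move every kept context back into the ready queue.
    void notify(ReadyQueue& ready) {
        for (auto& c : waiters) ready.submit(std::move(c));
        waiters.clear();
    }
};

std::string run_trigger_demo() {
    ReadyQueue ready;
    SketchTrigger trigger;
    std::string log;
    int data = -1;

    // Waiter: "await(trigger)" is modelled by registering the rest of the task.
    ready.submit([&] {
        log += "wait;";
        trigger.register_waiter([&] {
            log += "got " + std::to_string(data) + ";";
        });
    });
    // Notifier: publishes data, then wakes the waiter.
    ready.submit([&] {
        data = 7;
        log += "notify;";
        trigger.notify(ready);
    });

    ready.run();
    return log;
}
```

In this model run_trigger_demo() returns "wait;notify;got 7;". A yield would be the degenerate case in which a task submits its own continuation straight back to the ready queue instead of parking it on a trigger.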
Based on these async tools, IO operations can be easily managed with triggers, allowing user-level IO scheduling.
- PushOnlyFastStack: Provides an interface for concurrent data collection with push and retrieve-by-id. This structure also contains a partial specialization for std::unique_ptr that returns a raw pointer as the retrieved result.
- SlowQueue: FIFO queue synchronized with a std mutex, used under low concurrency, such as the freelist in FastQueue.
- FastLocalQueue: Provides a multi-consumer, single-producer queue structure that is wait-free and highly optimized.
  The basic idea is to keep two sets of container metadata so that accessors can perform a double-checked entry.
- FastQueue: A composite structure built from the structures above. Provides a fully concurrent FIFO queue.
  Each producer applies for a reusable thread-local buffer. Only the race between consumers then needs to be considered, which is resolved by distributing consumers and relying on FastLocalQueue.
- LeveledHashTable: Multi-layered hash table optimized for concurrent access while mutating.
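As an illustration of the push and retrieve-by-id interface described for PushOnlyFastStack, the following sketch claims a slot with a single atomic fetch_add, so a push never loops and never takes a lock. It is a simplified model under stated assumptions, not the library's implementation: the fixed capacity, the class name PushOnlySketch and its method signatures are invented for this example.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical fixed-capacity, push-only container.
template <typename T>
class PushOnlySketch {
public:
    explicit PushOnlySketch(std::size_t capacity) : slots_(capacity), next_(0) {}

    // Claims the next free slot and stores value there.
    // Returns the slot id, or capacity() if the container is full.
    std::size_t push(const T& value) {
        std::size_t id = next_.fetch_add(1, std::memory_order_relaxed);
        if (id >= slots_.size()) return slots_.size();
        slots_[id] = value;
        return id;
    }

    // The caller must have obtained id from the pushing thread through a
    // synchronizing operation (a thread join in the demo) before reading.
    const T& get(std::size_t id) const { return slots_[id]; }
    std::size_t capacity() const { return slots_.size(); }

private:
    std::vector<T> slots_;
    std::atomic<std::size_t> next_;
};

// Four threads push 100 values each; afterwards every id must be unique
// and must map back to the value that was pushed under it.
bool run_push_demo() {
    const int kThreads = 4, kPerThread = 100;
    PushOnlySketch<int> stack(kThreads * kPerThread);
    std::vector<std::vector<std::size_t> > ids(kThreads);
    std::vector<std::thread> workers;
    for (int t = 0; t < kThreads; ++t)
        workers.emplace_back([&, t] {
            for (int i = 0; i < kPerThread; ++i)
                ids[t].push_back(stack.push(t * kPerThread + i));
        });
    for (auto& w : workers) w.join();

    std::vector<bool> seen(kThreads * kPerThread, false);
    for (int t = 0; t < kThreads; ++t)
        for (int i = 0; i < kPerThread; ++i) {
            std::size_t id = ids[t][i];
            if (id >= stack.capacity() || seen[id]) return false;
            seen[id] = true;
            if (stack.get(id) != t * kPerThread + i) return false;
        }
    return true;
}
```

The design choice worth noting is that ids are handed out by the counter rather than by position in a linked structure, which is what makes retrieval by id a plain array access.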
Based on the semantics of await(Task), coplus allows dynamic scheduling of complex inter-thread dependencies.
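The kind of dependency this refers to can be illustrated with standard C++ futures. In the sketch below std::async and std::shared_future stand in for go and await(Task); this is an analogy under that assumption, not coplus code, and run_dependency_demo is a hypothetical name.

```cpp
#include <cassert>
#include <future>

// Diamond dependency: b and c both wait on a; d waits on b and c.
int run_dependency_demo() {
    std::shared_future<int> a =
        std::async(std::launch::async, [] { return 2; }).share();
    std::future<int> b =
        std::async(std::launch::async, [a] { return a.get() + 1; });
    std::future<int> c =
        std::async(std::launch::async, [a] { return a.get() * 10; });
    std::future<int> d =
        std::async(std::launch::async, [&b, &c] { return b.get() + c.get(); });
    return d.get();  // (2 + 1) + (2 * 10)
}
```

Each get() here blocks an OS thread until the dependency is done; the point of awaiting inside a coroutine is that the waiting task is parked and its worker can run something else.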
- Channel: Message transport tool modeled on Go's chan. The current implementation uses std mutex and condition_variable to provide basic functionality; further optimization will introduce lock-free shared memory.
    Channel<bool,0> syncChan;
    bool isThreadBReady;
    // thread A
    syncChan >> isThreadBReady;  // block here waiting for B
    // thread B
    if (!work()) syncChan << false;
    else syncChan << true;
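Since the text says the current Channel is built on a mutex and condition_variable, a rough sketch of that approach follows. SketchChannel is a hypothetical class, not the library's Channel: it only models a buffered channel with capacity of at least 1, and the rendezvous behaviour of a zero-capacity channel such as Channel<bool,0> is deliberately left out.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical bounded channel; Capacity must be >= 1 in this sketch.
template <typename T, std::size_t Capacity>
class SketchChannel {
    static_assert(Capacity >= 1, "capacity 0 (rendezvous) is not modelled here");

public:
    // Send: blocks while the buffer is full.
    SketchChannel& operator<<(const T& value) {
        std::unique_lock<std::mutex> lock(mutex_);
        not_full_.wait(lock, [this] { return buffer_.size() < Capacity; });
        buffer_.push(value);
        not_empty_.notify_one();
        return *this;
    }

    // Receive: blocks while the buffer is empty.
    SketchChannel& operator>>(T& out) {
        std::unique_lock<std::mutex> lock(mutex_);
        not_empty_.wait(lock, [this] { return !buffer_.empty(); });
        out = buffer_.front();
        buffer_.pop();
        not_full_.notify_one();
        return *this;
    }

private:
    std::mutex mutex_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
    std::queue<T> buffer_;
};

// One producer sends 1..5; the calling thread receives and sums them.
int run_channel_demo() {
    SketchChannel<int, 1> chan;
    std::thread producer([&chan] {
        for (int i = 1; i <= 5; ++i) chan << i;
    });
    int sum = 0;
    for (int i = 0; i < 5; ++i) {
        int value = 0;
        chan >> value;  // blocks until the producer has sent
        sum += value;
    }
    producer.join();
    return sum;
}
```

With a capacity of 1 the producer can run at most one message ahead of the receiver, and run_channel_demo() returns 15.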
- colog: Message output tool intended only for debugging or for output that needs strong ordering, synchronized with a plain lock. An enhanced implementation could use thread-local buffers to increase throughput, or FastQueue to avoid blocking.
- cotimer: Thread-local timer.
- Socket: Asynchronous encapsulation of the system socket.
- rpc
On a 4-core machine, running enqueue operations with 3 worker threads on FastQueue and on NaiveQueue (std::queue with a mutex), FastQueue outperforms NaiveQueue by a factor of 8.
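For readers who want to reproduce the baseline side of this comparison, a harness along the following lines could be used. The NaiveQueue below is written from the description in the text (std::queue with a mutex). The EnqueueResult and time_enqueue names, the int payload and the item counts are assumptions, and no timing figures are implied by the sketch.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Baseline described in the text: std::queue guarded by one mutex.
class NaiveQueue {
public:
    void push(int v) {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push(v);
    }
    std::size_t size() {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.size();
    }

private:
    std::mutex mutex_;
    std::queue<int> queue_;
};

struct EnqueueResult {
    std::size_t items;  // queue size after all workers finished
    double seconds;     // elapsed wall-clock time
};

// Runs `threads` workers that each enqueue `per_thread` items.
// Works with any queue type that offers push(int) and size().
template <typename Queue>
EnqueueResult time_enqueue(Queue& queue, int threads, int per_thread) {
    auto start = std::chrono::steady_clock::now();
    std::vector<std::thread> workers;
    for (int t = 0; t < threads; ++t)
        workers.emplace_back([&queue, per_thread] {
            for (int i = 0; i < per_thread; ++i) queue.push(i);
        });
    for (auto& w : workers) w.join();
    std::chrono::duration<double> elapsed =
        std::chrono::steady_clock::now() - start;
    EnqueueResult r = { queue.size(), elapsed.count() };
    return r;
}
```

Passing a NaiveQueue with 3 threads mirrors the setup described above; any other queue exposing push(int) and size() can be timed with the same function.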