-
Notifications
You must be signed in to change notification settings - Fork 0
/
t_pools.hpp
160 lines (134 loc) · 3.65 KB
/
t_pools.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
/**
* @file: t_pools.hpp
* @brief: 线程池容器实现文件. 内部提供收集所有线程就绪的接口。
* @author: wusheng Hu
* @version: v0x0001
* @date: 2018-04-22
*/
#ifndef __T_THREAD_POOLS_HPP__
#define __T_THREAD_POOLS_HPP__
#include <pthread.h>
#include <vector>
#include <iostream>
namespace T_TCP
{
template <typename T>
class PthreadPools
{
public:
PthreadPools(const int iPoolSize);
virtual ~PthreadPools();
/**
* @brief: StartAllThreads
* 启动线程池中所有线程.线程池 使用方调用.
* @return
*/
bool StartAllThreads();
T* GetIndexThread(const int iIndex);
/**
* @brief: AllocateThread
* 从线程池中分配一个可用,已经被启动的线程.
* @return 返回线程的地址.
*/
T* AllocateThread();
private:
/**
* @brief: WaitThreadRegiste
* 等待线程池所有线程都创建完成.
*/
void WaitThreadRegiste();
/**
* @brief: DeleteThreads
* 等待线程池中所有线程都退出。
* 并删除线程对象资源
*/
void DeleteThreads();
pthread_mutex_t m_InitLock;
pthread_cond_t m_InitCond;
int m_iReadyThreadNums;
std::vector<T*> m_vPoolList;
int m_iPoolSize;
pthread_mutex_t m_AllocLock;
int m_iCurThreadIndex;
};
////////////////////////////////////////////
////////////////////////////////////////////
///implement for pthreadpools
template<typename T>
PthreadPools<T>::PthreadPools(const int iPoolSize): m_iReadyThreadNums(0), m_iPoolSize (iPoolSize),
m_iCurThreadIndex(0)
{
pthread_mutex_init(&m_InitLock, NULL);
pthread_cond_init(&m_InitCond, NULL);
pthread_mutex_init(&m_AllocLock, NULL);
}
template<typename T>
PthreadPools<T>::~PthreadPools ()
{
DeleteThreads();
}
template<typename T>
bool PthreadPools<T>::StartAllThreads()
{
if (m_iPoolSize <= 0)
{
return false;
}
for (int i = 0; i < m_iPoolSize; ++i)
{
T* pThread = new T(&m_InitLock, &m_InitCond, &m_iReadyThreadNums);
m_vPoolList.push_back(pThread);
}
for (int i = 0; i < m_iPoolSize; ++i)
{
m_vPoolList.at(i)->Start();
}
WaitThreadRegiste();
std::cout << "all threads start done ..." << std::endl;
return true;
}
template<typename T>
void PthreadPools<T>::WaitThreadRegiste()
{
pthread_mutex_lock(&m_InitLock);
while( m_iReadyThreadNums < m_iPoolSize)
{
pthread_cond_wait(&m_InitCond, &m_InitLock);
}
pthread_mutex_unlock(&m_InitLock);
}
template<typename T>
void PthreadPools<T>::DeleteThreads()
{
if (m_iPoolSize <=0)
{
return ;
}
for (int i = 0; i < m_iPoolSize; ++i)
{
m_vPoolList.at(i)->JoinWork();
delete m_vPoolList[i];
}
m_vPoolList.clear();
}
template<typename T>
T* PthreadPools<T>::GetIndexThread(const int iIndex)
{
if (iIndex + 1 > m_iPoolSize)
{
return NULL;
}
if (iIndex < 0)
{
return NULL;
}
return m_vPoolList.at(iIndex);;
}
template<typename T>
T* PthreadPools<T>::AllocateThread()
{
int iIndex = (m_iCurThreadIndex++) % m_iPoolSize;
return m_vPoolList.at(iIndex);
}
}
#endif