线程池概要

一个线程池的主要组成部分:

  • 任务队列:存放Provider分配的任务(task),工作线程不断从队列中取任务并执行。通常被设计为FIFO队列。
  • 工作线程:多个线程,它们从任务队列获取任务并执行,通常它们的生命周期与线程池相同。工作线程的数目可以是动态的,由程序负载决定。
  • 管理线程:监测线程池的负载,为了提高线程池运算能力或降低资源开销,适时创建或销毁工作线程。管理线程不是必须的。

线程安全的队列

当Provider向任务队列中提供任务、工作线程从队列中取任务时,对同一个资源进行读写操作将会出现冲突,因此需要对原生队列进行包装,实现一个线程安全的任务队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
template <typename T>
class SafeQueue{
public:
bool empty() {
std::unique_lock<std::mutex> lock(mutex_);
return queue_.empty();
}
size_t size() {
std::unique_lock<std::mutex> lock(mutex_);
return queue_.size();
}
void push(const T & item) {
{
std::unique_lock<std::mutex> lock(mutex_);
queue_.emplace(item);
}
condition_.notify_one();
}
void push(T && item){
{
std::unique_lock<std::mutex> lock(mutex_);
queue_.emplace(std::move(item));
}
}
bool pop(T & item) {
std::unique_lock<std::mutex> lock(mutex_);
if (queue_.empty()){
return false;
}
item = std::move(queue_.front());
queue_.pop();
return true;
}

bool wait_pop(T & item) {
std::unique_lock<std::mutex> lock(mutex_);
condition_.wait(lock, [this](){ return !queue_.empty() || stop; });
if (queue_.empty()){
return false;
}
item = std::move(queue_.front());
queue_.pop();
return true;
}
void destory(){
{
std::unique_lock<std::mutex> lock(mutex_);
stop = true;
}
condition_.notify_all();
}
private:
std::queue<T> queue_;
std::mutex mutex_;
std::condition_variable condition_;
bool stop = false;
};

下面对SafeQueue中的细节作简要说明:

  • stop表示该队列(或线程池)是否停止。当线程池停止时,将SafeQueue的stop设为true,此时工作线程停止从队列中取任务。
  • 在实现入队操作时,重载了两个push函数:
    • push(const T &item) 用于将一个左值对象推入队列,其中参数 item 是一个常量左值引用。这意味着 item 的生命周期应大于 push() 函数。
    • push(T &&item) 用于将一个右值对象推入队列,其中参数 item 是一个右值引用。这意味着 item 的生命周期可以比 push() 函数更短。
    • 在队列中将右值对象与左值对象区分开来的原因是:右值引用允许快速移动资源所有权,从而避免复制数据和额外的开销。
  • 定义两种类型地出队方式:
    • pop()表示非阻塞的从队列中弹出值,当队列为空时线程不会停等,直接返回。
    • wait_pop()通过条件变量实现阻塞地从队列中取值,只有当队列为空且队列已未停止时将停等。

单队列线程池

单队列线程池即有多个工作线程和一个任务队列,所有工作线程都从这个任务队列中取任务,这是线程池最简单的一种实现方式。

初始化线程池

初始化若干个工作线程,这些线程都在“抢占队列(锁)—>取任务—>执行任务”循环。

工作线程的数量需要根据应用场景和实际环境进行设计:

  • 对于IO密集型任务,线程数应该更多,因为这些任务通常需要等待 I/O 操作完成后才能继续执行。如果线程数不足,则会导致线程阻塞,从而降低了程序的整体吞吐量和性能
  • 对于计算密集型任务,线程数不应该过多,因为这些任务涉及到繁重的计算操作,需要占用 CPU 资源。如果线程数太多,将会大量消耗 CPU 的资源,反而造成系统资源的浪费和降低程序的性能。同时还可以通过实验和性能测试来调整线程池的大小。

这里将线程个数默认设置为std::thread::hardware_concurrency(),即机器支持的最大并发数。

1
2
3
4
5
6
7
8
9
10
11
12
13
explicit ThreadPool(size_t threads = std::thread::hardware_concurrency()){
for(size_t i = 0;i<threads;++i){
workers.emplace_back([this]{
while(1){
std::function<void()> task;
if(!queue.wait_pop(task)) {
return ;
}
task();
}
});
}
}

任务入队

利用模板机制,我们可以将任意可以调用的对象(函数、函数指针、Lambda表达式)作为一个任务,并且参数和返回值没有限制。

1
2
3
4
5
6
7
8
9
template<typename F, typename... Args>
auto enqueue(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
auto task = std::make_shared< std::packaged_task<decltype(f(args...))()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<decltype(f(args...))> res = task->get_future();
queue.push([task](){ (*task)(); });
return res;
}
  • auto ->auto为自动推导类型,当它作为函数类型时为占位符,->后为返回值类型。关键字decltype可以获取数据类型。
  • std::make_shared<T>将返回一个shared_ptr<T>智能指针,指针指向堆空间。这里不能直接使用变量,因为其生存周期要保证到task真正被调用。
  • std::bind绑定一个可调用对象和参数,生成一个新的可调用对象,调用的结果等同于原调用对象在该参数下的结果。
  • std::forward会将输入的参数原封不动地传递到下一个函数中,这个“原封不动”指的是,如果输入的参数是左值,那么传递给下一个函数的参数的也是左值;如果输入的参数是右值,那么传递给下一个函数的参数的也是右值。
  • std::future可以关联线程运行的函数和函数的返回值。std::future 通常由某个 Provider(异步任务提供者) 创建,Provider 在某个线程中设置共享状态的值,与该共享状态相关联的 std::future 对象调用 get(通常在另外一个线程中) 获取该值,如果共享状态的标志不为 ready,则调用 std::future::get 会阻塞当前的调用者,直到 Provider 设置了共享状态的值(此时共享状态的标志变为 ready),std::future::get 返回异步任务的值或异常(如果发生了异常)。通过std::future,线程执行函数的返回结果不必保存在全局变量中。

析构队列

调用SafeQueue::destory()

1
2
3
4
5
6
~SimplePool(){
queue.destory();
for(auto & th : workers){
th.join();
}
}

多队列线程池

传统的单队列线程池有一个瓶颈,当多个工作线程争抢任务时,会先进行加锁,频繁的加锁解锁会有一定的性能开销。为了减少线程之间的竞争,我们可以为每一个工作线程设置一个工作队列,Provider每次将任务随机放到一个队列中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class MultiplePool{
public:
explicit MultiplePool(size_t threads = std::thread::hardware_concurrency()): threads_num(threads), queues(threads){
auto worker = [this] (size_t i) {
while(1){
std::function<void()> task;
if(!queues[i].wait_pop(task)) {
return;
}
task();
}
};
for(size_t i = 0; i < threads; ++i) {
workers.emplace_back(worker, i);
}
}
template<typename F, typename... Args>
auto enqueue(F&& f, Args&&... args) -> std::future<decltype(f(args...))>{
auto task = std::make_shared< std::packaged_task<decltype(f(args...))()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<decltype(f(args...))> res = task->get_future();
size_t i = rand() % threads_num;
queues[i].push([task](){ (*task)(); });
return res;
}
~MultiplePool(){
for(auto & que : queues) {
que.destory();
}
for(auto & th : workers) {
th.join();
}
}
private:
std::vector<std::thread> workers;
std::vector<SafeQueue<std::function<void()>>> queues;
size_t threads_num;
};

work stealing

多队列线程池存在负载均衡的问题。例如有两个线程和两个任务队列,其中队列A有10个1ms的任务,队列B有10个100ms的任务,那么当线程A执行完所有任务后,线程B仍有大量任务未解决;如果线程A挂起,将大大降低线程池的效率。work stealing的主要思路是:当一个线程执行完自己队列中的任务后,不会立即挂起等待,而是将其它线程的任务队列中未执行的任务“偷过来”执行。

实现work stealing机制的方法有很多,这这里给出一种简单的方法:

1
2
3
4
5
6
7
8
9
10
auto worker = [this] (size_t i) {
while(! stop) {
std::function<void()> task;
if (queues[i].pop(task) || try_steal(task, i)) {
task();
}else{
std::this_thread::yield();
}
}
};

注意从自己队列取任务使用pop而非wait_popWorkStealPool::try_steal是尝试从其它队列偷取任务执行,实现如下:

1
2
3
4
5
6
7
8
9
bool try_steal(std::function<void()>& task, size_t index) {
for (size_t i = 0; i < threads_num; ++i) {
if (i == index) continue;
if (queues[i].pop(task)) {
return true;
}
}
return false;
}

三种线程池进行比较

编写一个简单的测试程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
const int kTasks = 1000;  
const int kRange = 500;

void add_random(int& val) {
const int kMaxIterations = rand() % 50 * 10000;
for (int i = 0; i < kMaxIterations; ++i) {
val += rand() % (kRange / 2) - kRange;
}
}

template <typename T>
void test_threadPool(T* pool, const std::string & type ) {
auto start_time = std::chrono::system_clock::now();
std::vector<std::future<void>> futures;
for(int i = 0; i < kTasks; ++i) {
int val = i;
futures.emplace_back(pool->submit(add_random, val));
}
for (auto& future : futures) {
future.wait();
}
auto end_time = std::chrono::system_clock::now();
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count();
std::cout << type << " elapsed time: " << ms << "ms\n";
}

测试结果:

1
2
3
MultiplePool elapsed time: 3154ms
WorkStealPool elapsed time: 3109ms
SimplePool elapsed time: 3298ms

这三种线程池性能在该测试案例下并没有很大区别,甚至当单个任务执行时间相似时(kMaxIterations取常量)WorkStealPool不如MultiplePool。个人分析的原因如下:

  • SimplePool中的线程获取任务是主动抢占式的,而MultiplePool中的线程获取任务是被动随机分配的。尽管前者会存在更多的线程竞争,但线程执行任务的效率会更高。
  • WorkStealPool采用的是单头FIFO队列,在获取其它队列的任务时仍然会和原线程存在竞争。而为了减少这种竞争,最好使用双向队列。

参考