线程池概要 一个线程池的主要组成部分:
任务队列:存放Provider分配的任务(task),工作线程不断从队列中取任务并执行。通常被设计为FIFO队列。
工作线程:多个线程,它们从任务队列获取任务并执行,通常它们的生命周期与线程池相同。工作线程的数目可以是动态的,由程序负载决定。
管理线程:监测线程池的负载,为了提高线程池运算能力或降低资源开销,适时创建或销毁工作线程。管理线程不是必须的。
线程安全的队列 当Provider向任务队列中提供任务、工作线程从队列中取任务时,对同一个资源进行读写操作将会出现冲突,因此需要对原生队列进行包装,实现一个线程安全 的任务队列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 template <typename T>class SafeQueue {public : bool empty () { std::unique_lock<std::mutex> lock (mutex_) ; return queue_.empty (); } size_t size () { std::unique_lock<std::mutex> lock (mutex_) ; return queue_.size (); } void push (const T & item) { { std::unique_lock<std::mutex> lock (mutex_) ; queue_.emplace (item); } condition_.notify_one (); } void push (T && item) { { std::unique_lock<std::mutex> lock (mutex_) ; queue_.emplace (std::move (item)); } } bool pop (T & item) { std::unique_lock<std::mutex> lock (mutex_) ; if (queue_.empty ()){ return false ; } item = std::move (queue_.front ()); queue_.pop (); return true ; } bool wait_pop (T & item) { std::unique_lock<std::mutex> lock (mutex_) ; condition_.wait (lock, [this ](){ return !queue_.empty () || stop; }); if (queue_.empty ()){ return false ; } item = std::move (queue_.front ()); queue_.pop (); return true ; } void destory () { { std::unique_lock<std::mutex> lock (mutex_) ; stop = true ; } condition_.notify_all (); } private : std::queue<T> queue_; std::mutex mutex_; std::condition_variable condition_; bool stop = false ; };
下面对SafeQueue中的细节作简要说明:
stop
表示该队列(或线程池)是否停止。当线程池停止时,将SafeQueue的stop
设为true
,此时工作线程停止从队列中取任务。
在实现入队操作时,重载了两个push
函数:
push(const T &item)
用于将一个左值对象推入队列,其中参数 item
是一个常量左值引用。这意味着 item 的生命周期应大于 push()
函数。
push(T &&item)
用于将一个右值对象推入队列,其中参数 item
是一个右值引用。这意味着 item
的生命周期可以比 push()
函数更短。
在队列中将右值对象与左值对象区分开来的原因是:右值引用允许快速移动资源所有权,从而避免复制数据和额外的开销。
定义两种类型地出队方式:
pop()
表示非阻塞的从队列中弹出值,当队列为空时线程不会停等,直接返回。
wait_pop()
通过条件变量实现阻塞地从队列中取值,只有当队列为空且队列已未停止时将停等。
单队列线程池 单队列线程池即有多个工作线程和一个任务队列,所有工作线程都从这个任务队列中取任务,这是线程池最简单的一种实现方式。
初始化线程池 初始化若干个工作线程,这些线程都在“抢占队列(锁)—>取任务—>执行任务”循环。
工作线程的数量需要根据应用场景和实际环境进行设计:
对于IO密集型任务,线程数应该更多,因为这些任务通常需要等待 I/O 操作完成后才能继续执行。如果线程数不足,则会导致线程阻塞,从而降低了程序的整体吞吐量和性能
对于计算密集型任务,线程数不应该过多,因为这些任务涉及到繁重的计算操作,需要占用 CPU 资源。如果线程数太多,将会大量消耗 CPU 的资源,反而造成系统资源的浪费和降低程序的性能。同时还可以通过实验和性能测试来调整线程池的大小。
这里将线程个数默认设置为std::thread::hardware_concurrency()
,即机器支持的最大并发数。
1 2 3 4 5 6 7 8 9 10 11 12 13 explicit ThreadPool (size_t threads = std::thread::hardware_concurrency()) { for (size_t i = 0 ;i<threads;++i){ workers.emplace_back ([this ]{ while (1 ){ std::function<void ()> task; if (!queue.wait_pop (task)) { return ; } task (); } }); } }
任务入队 利用模板机制,我们可以将任意可以调用的对象(函数、函数指针、Lambda表达式)作为一个任务,并且参数和返回值没有限制。
1 2 3 4 5 6 7 8 9 template <typename F, typename ... Args>auto enqueue (F&& f, Args&&... args) -> std::future<decltype (f(args...)) > { auto task = std::make_shared< std::packaged_task<decltype (f (args...))()> >( std::bind (std::forward<F>(f), std::forward<Args>(args)...) ); std::future<decltype (f (args...))> res = task->get_future (); queue.push ([task](){ (*task)(); }); return res; }
auto ->
:auto
为自动推导类型,当它作为函数类型时为占位符,->
后为返回值类型。关键字decltype
可以获取数据类型。
std::make_shared<T>
将返回一个shared_ptr<T>
智能指针,指针指向堆空间。这里不能直接使用变量,因为其生存周期要保证到task
真正被调用。
std::bind
绑定一个可调用对象和参数,生成一个新的可调用对象,调用的结果等同于原调用对象在该参数下的结果。
std::forward
会将输入的参数原封不动地传递到下一个函数中,这个“原封不动”指的是,如果输入的参数是左值,那么传递给下一个函数的参数的也是左值;如果输入的参数是右值,那么传递给下一个函数的参数的也是右值。
std::future
可以关联线程运行的函数和函数的返回值。std::future
通常由某个 Provider(异步任务提供者) 创建,Provider 在某个线程中设置共享状态的值,与该共享状态相关联的 std::future
对象调用 get
(通常在另外一个线程中) 获取该值,如果共享状态的标志不为 ready
,则调用 std::future::get
会阻塞当前的调用者,直到 Provider 设置了共享状态的值(此时共享状态的标志变为 ready
),std::future::get
返回异步任务的值或异常(如果发生了异常)。通过std::future
,线程执行函数的返回结果不必保存在全局变量中。
析构队列 调用SafeQueue::destory()
。
1 2 3 4 5 6 ~SimplePool (){ queue.destory (); for (auto & th : workers){ th.join (); } }
多队列线程池 传统的单队列线程池有一个瓶颈,当多个工作线程争抢任务时,会先进行加锁,频繁的加锁解锁会有一定的性能开销。为了减少线程之间的竞争,我们可以为每一个工作线程设置一个工作队列,Provider每次将任务随机放到一个队列中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 class MultiplePool {public : explicit MultiplePool (size_t threads = std::thread::hardware_concurrency()) : threads_num(threads), queues(threads){ auto worker = [this ] (size_t i) { while (1 ){ std::function<void ()> task; if (!queues[i].wait_pop (task)) { return ; } task (); } }; for (size_t i = 0 ; i < threads; ++i) { workers.emplace_back (worker, i); } } template <typename F, typename ... Args> auto enqueue (F&& f, Args&&... args) -> std::future<decltype (f(args...)) > { auto task = std::make_shared< std::packaged_task<decltype (f (args...))()> >( std::bind (std::forward<F>(f), std::forward<Args>(args)...) ); std::future<decltype (f (args...))> res = task->get_future (); size_t i = rand () % threads_num; queues[i].push ([task](){ (*task)(); }); return res; } ~MultiplePool (){ for (auto & que : queues) { que.destory (); } for (auto & th : workers) { th.join (); } } private : std::vector<std::thread> workers; std::vector<SafeQueue<std::function<void ()>>> queues; size_t threads_num; };
work stealing 多队列线程池存在负载均衡 的问题。例如有两个线程和两个任务队列,其中队列A有10个1ms
的任务,队列B有10个100ms
的任务,那么当线程A执行完所有任务后,线程B仍有大量任务未解决;如果线程A挂起,将大大降低线程池的效率。work stealing 的主要思路是:当一个线程执行完自己队列中的任务后,不会立即挂起等待,而是将其它线程的任务队列中未执行的任务“偷过来”执行。
实现work stealing 机制的方法有很多,这这里给出一种简单的方法:
1 2 3 4 5 6 7 8 9 10 auto worker = [this ] (size_t i) { while (! stop) { std::function<void ()> task; if (queues[i].pop (task) || try_steal (task, i)) { task (); }else { std::this_thread::yield (); } } };
注意从自己队列取任务使用pop
而非wait_pop
。WorkStealPool::try_steal
是尝试从其它队列偷取任务执行,实现如下:
1 2 3 4 5 6 7 8 9 bool try_steal (std::function<void ()>& task, size_t index) { for (size_t i = 0 ; i < threads_num; ++i) { if (i == index) continue ; if (queues[i].pop (task)) { return true ; } } return false ; }
三种线程池进行比较 编写一个简单的测试程序:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 const int kTasks = 1000 ; const int kRange = 500 ; void add_random (int & val) { const int kMaxIterations = rand () % 50 * 10000 ; for (int i = 0 ; i < kMaxIterations; ++i) { val += rand () % (kRange / 2 ) - kRange; } } template <typename T>void test_threadPool (T* pool, const std::string & type ) { auto start_time = std::chrono::system_clock::now (); std::vector<std::future<void >> futures; for (int i = 0 ; i < kTasks; ++i) { int val = i; futures.emplace_back (pool->submit (add_random, val)); } for (auto & future : futures) { future.wait (); } auto end_time = std::chrono::system_clock::now (); auto ms = std::chrono::duration_cast <std::chrono::milliseconds>(end_time - start_time).count (); std::cout << type << " elapsed time: " << ms << "ms\n" ; }
测试结果:
1 2 3 MultiplePool elapsed time: 3154ms WorkStealPool elapsed time: 3109ms SimplePool elapsed time: 3298ms
这三种线程池性能在该测试案例下并没有很大区别,甚至当单个任务执行时间相似时(kMaxIterations
取常量)WorkStealPool
不如MultiplePool
。个人分析的原因如下:
SimplePool
中的线程获取任务是主动抢占式的,而MultiplePool
中的线程获取任务是被动随机分配的。尽管前者会存在更多的线程竞争,但线程执行任务的效率会更高。
WorkStealPool
采用的是单头FIFO队列,在获取其它队列的任务时仍然会和原线程存在竞争。而为了减少这种竞争,最好使用双向队列。
参考
作者:
Zhou Yee
License:
Copyright (c) 2022 CC-BY-NC-4.0 LICENSE