The Design of Task System

在前面的文章中,我们实现了一个可用的WorkQueue。但是它在效率上是有问题的。主要的原因就是所有的工作线程都在争夺任务队列的控制权,产生了竞争(contention)。

本文将介绍如何设计一种高效的任务执行系统。

竞争问题

如果我们在同一进程的多个线程中同时对某一份资源进行访问,就可能出现竞争的问题。这一现象可以用下图表示。

contention

要正确地解决这样的问题,我们可以利用诸如互斥量等同步手段进行对资源的访问进行限制。但是这种做法也带来了一些问题:

  • 像锁这样的同步原语非常容易被错误地使用,为程序开发带来潜在的破坏性影响;
  • 竞争问题仍然存在,而这可能降低程序运行的效率。

朴素的线程池设计

首先我们考虑无返回值、无参数的异步过程的执行。在这种情况下,最直接的处理方案就是利用一个队列来存储提交的异步任务,同时建立一个线程池来消费这个任务队列。为此,我们需要实现两个部分:多线程的任务队列,以及任务的提交和请求。

多线程的任务队列的基本设计如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
using lock_t = unique_lock<mutex>;
class notification_queue {
deque<function<void()>> _q;
bool _done{false};
mutex _mutex;
condition_variable _ready;
public:
void done() {
{
unique_lock<mutex> lock{_mutex};
_done = true;
}
_ready.notify_all();
}
bool pop(function<void()>& x) {
lock_t lock{_mutex};
while (_q.empty() && !_done) _ready.wait(lock);
if (_q.empty()) return false;
x = move(_q.front());
_q.pop_front();
}
template<typename F>
void push(F&& f) {
{
lock_t lock{_mutex};
_q.emplace_back(forward<F>(f));
}
_ready.notify_one();
}
};

在此多线程队列的支持下,初步的线程池系统可以有如下实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class task_system {
const unsigned _count{thread::hardware_concurrency()};
vector<thread> _threads;
notification_queue _q;

void run() {
while (true) {
function<void()> f;
if (!_q.pop(f)) break;
f();
}
}
public:
task_system() {
for (unsigned n = 0; n != _count; ++n) {
_threads.emplace_back([&]{ run(); });
}
}
~task_system() {
_q.done();
for (auto& e : _threads) e.join();
}
template <typename F>
void async_(F&& f) {
_q.push(forward<F>(f));
}
};

上述代码虽然实现了一个简单的线程池,但是在效率上是有问题的。主要的原因就是所有的工作线程都在争夺任务队列的控制权,产生了contention。为了缓解contention现象,可以从以下两个方面来入手:

  • 显式的以待头节点的链表来实现队列,从而使得任务的提交和请求所需要的锁分开;
  • 为每一个线程分配一个专有的任务队列,同时允许线程向其他任务队列请求任务,即 work_steal

为每个线程分配独立的任务队列

修改task_system的成员变量:

1
2
3
4
5
6
7
class task_system {
const unsigned _count{thread::hardware_concurrency()};
vector<thread> _threads;
vector<notification_queue> _q{_count}; // queue for each thread
atomic<unsigned> _index{0}; //
......
}

修改run函数,使之支持多个任务队列的运行:

1
2
3
4
5
6
7
void run(unsigned i) {
while (true) {
function<void()> f;
if (!_q[i].pop(f)) break;
f();
}
}

接下来是析构函数:

1
2
3
4
~task_system() {
for (auto& e : _q) e.done();
for (auto& e : _threads) e.join();
}

和轮转式的任务提交:

1
2
3
4
5
template <typename F>
void async_(F&& f) {
auto i = _index++;
_q[i % _count].push(forward<F>(f));
}

这样我们就为每一个线程都分配了一个独立的任务队列,并提供了任务的提交功能。

支持work_steal的线程池设计

为了支持work_steal,我们首先为notification_queue添加新的成员函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
bool try_pop(function<void()>& x) {
lock_t lock{_mutex, try_to_lock};
if (!lock || _q.empty()) return false;

x = move(_q.front());
_q.pop_front();
return true;
}
template<typename F>
bool try_push(F&& f) {
{
lock_t lock{_mutex, try_to_lock};
if (!lock) return false;
_q.emplace_back(forward<F>(f));
}
_ready.notify_one();
return true;
}

接下来我们就可以在run函数中实现work_steal了:

1
2
3
4
5
6
7
8
9
10
11
void run(unsigned i) {
while (true) {
function<void()> f;
for (unsigned n = 0; n != _count; ++n) {
if (_q[(i + n) % _count].try_pop(f)) break;
}
if (!f && !_q[i].pop(f)) break;

f();
}
}

附:完整的源码

总结

一图胜千言。

evolution