A Priority Queue with Boost.Asio

In this post we first introduce the Executor pattern, then try to implement a priority queue on top of Asio.

Executors

An executor defines a set of rules about where, when and how a function object is executed. In a multithreaded program, executors can be used to decouple the submission of a task from its execution.

For example:

| Type of executor | Where, when and how |
|---|---|
| System | Any thread in the process |
| Thread pool | Only threads belonging to the pool |
| Strand | Function objects in the queue are executed one at a time, in FIFO order |
| Future / Promise | Any thread; exceptions thrown by the function object are stored in the promise |

Any class that satisfies these requirements can be called an executor, so executors are not limited to the categories above. Just like allocators in the standard library, users can define their own executors with their own rules.

In general, an executor provides two basic ways to submit a task: dispatch and post. They differ in how eagerly the task is scheduled.

dispatch

dispatch is the most eager of the two: it asks the executor to run the function object as soon as its rules allow.

```cpp
void f1()
{
  std::cout << "Hello, world!\n";
}

// ...

dispatch(ex, f1);
```

By calling dispatch on ex, we ask the executor to run the given task immediately. Whether the task actually runs immediately depends on the executor's specific rules:

| Type of executor | Behaviour of dispatch |
|---|---|
| System | Runs the function object before dispatch() returns |
| Thread pool | If called from a thread inside the pool, runs the function object before dispatch() returns; otherwise adds it to the pool's task queue to run later |
| Strand | If called from inside the strand, or if the strand's queue is empty, runs the function object before dispatch() returns; otherwise adds it to the strand's queue to run later |
| Future / Promise | Wraps the function object in a try/catch and runs it before dispatch() returns |

A further benefit of this approach is that, where allowed, the compiler can inline the function call.

post

post, by contrast, adds the task to the processing queue and then returns; the task is processed at some later point.

```cpp
post(ex, f1);
```

How the task is executed depends on the executor's specific rules:

| Type of executor | Behaviour of post |
|---|---|
| System | Adds the function object to the system thread pool's task queue |
| Thread pool | Adds the function object to the pool's task queue |
| Strand | Adds the function object to the strand's task queue |
| Future / Promise | Wraps the function object in a try/catch and adds it to the system's task queue |

Example

Let's write a small example that posts tasks to a thread pool:

```cpp
class bank_account
{
  int balance_ = 0;
  std::experimental::thread_pool pool_{1};

public:
  void deposit(int amount)
  {
    post(pool_.get_executor(), [=]
    {
      balance_ += amount;
    });
  }

  void withdraw(int amount)
  {
    post(pool_.get_executor(), [=]
    {
      if (balance_ >= amount)
        balance_ -= amount;
    });
  }
};
```

Here, the thread pool is a typical execution context: it represents where function objects are executed. boost::asio::io_context plays a similar role.

Priority Queue

Modelled on the implementation of boost::asio::io_context, we can define a priority_handler_queue class that inherits from boost::asio::execution_context and contains a nested executor type satisfying the Executor requirements, through which users submit tasks.

```cpp
class priority_handler_queue : public boost::asio::execution_context
{
public:
  // A class that satisfies the Executor requirements.
  class executor
  {
  public:
    executor(priority_handler_queue& q, int p) : context_(q), priority_(p) {}

    priority_handler_queue& context() const noexcept { return context_; }

    template <typename Function, typename Allocator>
    void dispatch(Function f, const Allocator& a) const
    {
      post(std::move(f), a);
    }

    template <typename Function, typename Allocator>
    void post(Function f, const Allocator& a) const
    {
      auto p(std::allocate_shared<queued_handler<Function>>(
          typename std::allocator_traits<
              Allocator>::template rebind_alloc<char>(a),
          priority_, std::move(f)));

      std::lock_guard<std::mutex> lock(context_.mutex_);
      context_.queue_.push(std::move(p));
      context_.condition_.notify_one();
    }

    template <typename Function, typename Allocator>
    void defer(Function f, const Allocator& a) const
    {
      post(std::move(f), a);
    }

    void on_work_started() const noexcept {}
    void on_work_finished() const noexcept {}

    bool operator==(const executor& other) const noexcept
    {
      return &context_ == &other.context_ && priority_ == other.priority_;
    }

    bool operator!=(const executor& other) const noexcept
    {
      return !operator==(other);
    }

  private:
    priority_handler_queue& context_;
    int priority_;
  };

  executor get_executor(int pri = 0) noexcept
  {
    return executor(*this, pri);
  }

  // ...
};
```

It also has a run() method, which may be called concurrently from multiple threads, and a stop() method:

```cpp
void run()
{
  std::unique_lock<std::mutex> lock(mutex_);
  for (;;)
  {
    condition_.wait(lock, [this]{ return stopped_ || !queue_.empty(); });
    if (stopped_) return;

    auto p(queue_.top());
    queue_.pop();
    lock.unlock();

    // Make sure no mutex is held while the user's code runs.
    p->execute();

    lock.lock();
  }
}

void stop()
{
  std::lock_guard<std::mutex> lock(mutex_);
  stopped_ = true;
  condition_.notify_all();
}
```

Using standard-library facilities, we can implement a queue that stores submitted function objects ordered by priority:

```cpp
class queued_handler_base
{
public:
  explicit queued_handler_base(int p) : priority_(p) {}
  virtual ~queued_handler_base() = default;

  virtual void execute() = 0;

  int priority_;
};

using handler_ptr = std::shared_ptr<queued_handler_base>;

template <typename Function>
class queued_handler : public queued_handler_base
{
public:
  queued_handler(int p, Function f)
    : queued_handler_base(p), function_(std::move(f)) {}

  void execute() override { function_(); }

private:
  Function function_;
};

struct handler_comp
{
  bool operator()(const handler_ptr& a, const handler_ptr& b) const
  {
    return a->priority_ < b->priority_;
  }
};

// ...
std::priority_queue<handler_ptr, std::vector<handler_ptr>, handler_comp> queue_;
```

Test

A typical test scenario looks like this:

```cpp
std::mutex g_io_mutex; // protects std::cout

int main()
{
  priority_handler_queue queue;

  auto low = queue.get_executor(0);
  auto med = queue.get_executor(1);
  auto high = queue.get_executor(2);

  // These tasks will be executed last.
  for (int j = 0; j < 10; ++j) {
    dispatch(low, [j] {
      sleep(1);
      std::lock_guard<std::mutex> lock(g_io_mutex);
      std::cout << std::this_thread::get_id() << " low:" << j << std::endl;
    });
  }

  for (int j = 0; j < 20; ++j) {
    dispatch(med, [j] {
      sleep(2);
      std::lock_guard<std::mutex> lock(g_io_mutex);
      std::cout << std::this_thread::get_id() << " med:" << j << std::endl;
    });
  }

  // These tasks will be executed first.
  for (int j = 0; j < 5; ++j) {
    boost::packaged_task<int> task([=]() {
      sleep(1);
      return 111;
    });
    auto f = task.get_future();
    // We can chain a handler that executes
    // when the future is satisfied.
    f.then([](boost::future<int> future) {
      std::lock_guard<std::mutex> lock(g_io_mutex);
      std::cout << std::this_thread::get_id() << " high:"
                << future.get() << std::endl;
    });

    // We can post a boost::packaged_task to the executor as well.
    post(high, std::move(task));
  }

  // Stop once everything else is done: priority -1 sorts last.
  dispatch(queue.get_executor(-1), [&]{ queue.stop(); });

  boost::thread_group group_;
  for (std::size_t i = 0; i < std::thread::hardware_concurrency() - 1; ++i) {
    group_.create_thread([&](){ queue.run(); });
  }

  // Block the main thread here.
  queue.run();

  // All done.
  group_.join_all();
}
```

Appendix: Source Code

Executors_and_Asynchronous_Operations_Slides.pdf

References

  1. A Short Detour: Executors
  2. A C++14 library for executors