Monitor and Work Queue

利用诸如互斥量和信号量的同步原语,我们可以构建出更加实用的多线程工具。

Monitor

在多线程程序中,常常有会一些闭包需要跨线程调用,当这些闭包对同一资源产生竞争时,我们需要对该资源加锁;利用互斥量我们可以构建一个非常有用的小工具——监视器(Monitor)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#pragma once
#include <mutex>

template <class T>
class Monitor {
private:
mutable T m_t;
mutable std::mutex m_mtx;

public:
using Type = T;
Monitor() {}
Monitor(T t_) : m_t(std::move(t_)) {}
template <typename F>
auto operator()(F f) const -> decltype(f(m_t)) {
std::lock_guard<std::mutex> hold{m_mtx};
return f(m_t);
}
};

简单的使用如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include <thread>
#include <iostream>

#include "Monitor.h"

int main()
{
int count = 0;
Monitor<int> monitor(count);

auto printInChildThread = [](int& c){
for(int i = 0; i < 5; ++i)
std::cout << "Child: " << c++ << std::endl;
};

auto printInMainThread = [](int& c){
for(int i = 0; i < 3; ++i)
std::cout << " Main: " << c++ << std::endl;
};

std::thread t1([&](){
monitor(printInChildThread);
});
monitor(printInMainThread);

t1.join();
// monitor.m_t -> 8
}

Work Queue

在多线程模型中,最常见的便是生产者/消费者模型。

队列可以帮助我们简化多线程程序设计,一个仿照Boost::Asio:: io_service的任务队列推导如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#pragma once
#include <mutex>
#include <queue>
#include <thread>
#include <functional>
#include <condition_variable>

// Really simple Multiple producer / Multiple consumer work queue
class WorkQueue {
public:
// Add a new work item
template <typename F>
void push(F w) {
std::lock_guard<std::mutex> lock(m_mtx);
m_q.push(std::move(w));
m_cond.notify_all();
}

// Continuously waits for and executes any work items, until "stop" is
// called
void run() {
while (true) {
std::function<void()> w;
{
std::unique_lock<std::mutex> lock(m_mtx);
m_cond.wait(lock, [this] { return !m_q.empty(); });
w = std::move(m_q.front());
m_q.pop();
}

if (w) {
w();
} else {
// An empty work item means we are shutting down, so enqueue
// another empty work item. This will in turn shut down another
// thread that is executing "run"
push(nullptr);
return;
}
};
}

// Causes any calls to "run" to exit.
void stop() {
push(nullptr);
}

private:
std::condition_variable m_cond;
std::mutex m_mtx;
std::queue<std::function<void()>> m_q;
};

这种实现的特点如下:

  • 对Stop()的调用会将结束标志加入队列
  • 当消费者取出结束标志,它会在退出前再推入一个结束标志到队列
  • 多次对Stop()的调用——队列中的多余结束标志,不会有副作用

简单的使用如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include "WorkQueue.h"

int main() {
WorkQueue work;

// Start a couple of consumer threads
std::vector<std::thread> ths;
for (int i = 0; i < 4; i++) {
ths.push_back(std::thread([&work] { work.run(); }));
}

std::vector<int> res;
res.resize(10000);

// Enqueue work.
for (int i = 0; i < res.size(); i++) {
// These work items simply increment the element at index i.
work.push([i, &res] { res[i]++; });
}

// Stop consumers, and wait for the threads to finish
work.stop();
for (auto&& t : ths) t.join();

// Test if all work items were executed
for (int i = 0; i < res.size(); i++) {
if (res[i] != 1)
printf("ERROR: Index %d set to %d\n", i, res[i]);
}
}