Mutex and Semaphore

C++11标准在标准库中为多线程提供了组件,这意味着使用C++编写与平台无关的多线程程序成为可能,而且C++程序的可移植性也得到了有力的保证。

互斥量(Mutex)

互斥量提供了独占所有权的概念,它可以帮助程序控制对资源的访问。

一个简单的例子如下,它展示了 std::mutex 能如何用于保护共享于二个线程间的 std::map

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#include <map>
#include <string>
#include <chrono>
#include <thread>
#include <mutex>
#include <iostream>

std::map<std::string, std::string> g_pages;
std::mutex g_pages_mutex;

void save_page(const std::string &url)
{
// 模拟长页面读取
std::this_thread::sleep_for(std::chrono::seconds(2));
std::string result = "fake content";

std::lock_guard<std::mutex> guard(g_pages_mutex);
g_pages[url] = result;
}

int main()
{
std::thread t1(save_page, "http://foo");
std::thread t2(save_page, "http://bar");
t1.join();
t2.join();

// 现在访问g_pages是安全的,因为线程t1/t2生命周期已结束
for (const auto &pair : g_pages) {
std::cout << pair.first << " => " << pair.second << '\n';
}
}

输出为:

1
2
http://bar => fake content
http://foo => fake content

信号量(Semaphore)

信号量是一个计数器,它限制了并发访问同一资源的线程数量。

在创建信号量时计数器的值总是在0和最大值之间。当计数器的值严格大于0时,对Wait()的调用会立刻返回,并且计数器的值减一;当计数器的值为0时,对Wait()的调用会阻塞。对于阻塞的信号量,只有Signal()调用后,计数器的值重新大于0时才返回。

简单来说,信号量适用场景非常典型——同一时刻只有固定数量消费者访问共享资源。举个栗子,信号量可以表示一个酒店可预订房间数,房间被预定代表一次对信号量Wait()的调用,而退房的操作即是对Signal()的调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#pragma once

#include <mutex>
#include <condition_variable>

// Simplest implementation
class Semaphore {
public:
explicit Semaphore(int count = 0) : count_(count) {
}

void Signal() {
std::unique_lock<std::mutex> lock(mutex_);
++count_;
cv_.notify_one();
}

void Wait() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [=] { return count_ > 0; });
--count_;
}

private:
std::mutex mutex_;
std::condition_variable cv_;
int count_;
};

下面是一些测试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
//io操作锁
std::mutex g_io_mutex;
//信号量本身是一个全局对象,count 为 1,一次只允许一个线程访问:
Semaphore g_semaphore(1);

void Worker() {
g_semaphore.Wait();

std::thread::id thread_id = std::this_thread::get_id();

std::string now = FormatTimeNow("%H:%M:%S");
{
std::lock_guard<std::mutex> lock(g_io_mutex);
std::cout << "Thread " << thread_id << ": wait succeeded"
<< " (" << now << ")" << std::endl;
}

//模拟数据处理
std::this_thread::sleep_for(std::chrono::seconds(1));

g_semaphore.Signal();
}

std::string FormatTimeNow(const char* format) {
auto now = std::chrono::system_clock::now();
std::time_t now_c = std::chrono::system_clock::to_time_t(now);
std::tm* now_tm = std::localtime(&now_c);

char buf[20];
std::strftime(buf, sizeof(buf), format, now_tm);
return std::string(buf);
}

int main() {
const std::size_t SIZE = 3;

std::vector<std::thread> v;
v.reserve(SIZE);

for (std::size_t i = 0; i < SIZE; ++i) {
v.emplace_back(&Worker);
}

for (std::thread& t : v) {
t.join();
}

return 0;
}

输出为:

1
2
3
4
//每个线程相隔一秒,即一次只允许一个线程访问。
Thread 1d38: wait succeeded (13:10:10)
Thread 20f4: wait succeeded (13:10:11)
Thread 2348: wait succeeded (13:10:12)