Strands

如果你用过Boost Asio,那你肯定用过或者见到过strands

在编写异步代码时,函数时常会被多个线程并发调用。为了避免函数对同一份资源的竞争,我们可能需要使用诸如互斥量的办法来显式地限制对资源的访问,这无疑为程序的设计和代码的编写带来了不必要的压力。

Strands正是解决此类问题的一种设计,它可以对一组函数的执行进行调度,保证这一组函数不会并发执行。通过Strand执行的函数不需要显式地同步,这简化了异步代码的编写。

在程序中,如果你只有一个IO线程(比如在Boost::Asio中,只有一个线程调用了 io_service::run​),那么你并不需要同步。这种情况下该线程中所有的函数会依次执行。但是当你希望提高处理速度分配了多个IO线程时,那资源的竞争问题要求你显式地同步函数的执行,或者使用Strands之类的设计。

显式地同步函数执行是可能的,但是这会引入不必要的复杂性到代码中,也可能因此带来bugs。

在一般的任务处理队列中,工作线程会拿到队列中函数并执行。Strands通过在工作线程和函数执行间引入中间层,保证了函数的执行顺序,避免了对资源的竞争。

示意图如下:middle-layer

可能的场景

Remotery可以帮助我们可视化线程执行和函数调用的情况。

测试代码模拟了4个工作线程和8个连接。每个连接会给出一组计算任务到任务队列中,每个任务耗时5 ms到15 ms。尽管这与实际情况下的线程/连接比和任务耗时有所出入,但是这个例子可以很好的解释这个话题。

阻塞问题

现在看一下每个线程随时间的执行情况:

workthread-connection

每一个连接Conn N都用不同的颜色标记出来,连接提交的任务在不同线程中处理。

如果我们观察每个时间片上线程的运行情况:

time-slice

工作线程会从任务队列中取出每个连接提交的任务并执行,当一个线程正在处理一个连接提交的任务时,如果另外一个线程也在处理这个连接提交的任务,那么后者会被阻塞(避免资源竞争)。

时间记录如下:

time-static

在上述场景中,工作线程大约19%的时间都被阻塞浪费掉了,换句话说,只有81%的时间是在真正处理任务。

如果我们使用Strand帮助我们同步每一个连接的任务:

time-slice-strand

time-statistic-strand

只有非常少的时间被内部工作和同步问题浪费了。

缓存局部性

另一个可能的好处是更好的CPU缓存利用(cache utilization)。工作线程会倾向于处理同一个连接的多个任务后再处理另一个连接。

没有使用Strands时的执行情况

cache-without-strands

使用Strands时的执行情况

cache-with-strands

尽管实际工作中函数处理的情况并不像测试假设的这样,但是测试中的这种结果也表明了Strand可能的好处。

Strands实现

作为一个练习,这个实现并不能达到工业强度,但是可以很好地用于我们的实验。

首先让我们定义Stands需要完成的功能:

  1. 没有函数可以并发执行

    • 这要求我们检测是Stands是否在某个线程中运行

    • 为了避免阻塞,Stands需要一个内部的任务队列用于确保函数能正确的执行

  2. 函数只在对应的工作线程中被执行

    • 这要求其他线程中提交的任务正确地被加入到Stands的任务队列中
  3. 函数的执行顺序没有保证

    • 我们可能从多个线程中提交任务到Stands的任务队列中,所以任务的执行顺序并没有保证

Boost::Asio::Strands相似,我们为Stands定义了3个主要的接口:

  • post——将任务加入到队列中,稍后执行;
  • dispatch——允许时立刻执行任务,否则将任务加入到队列中;
  • run——处理所有的任务。

先不考虑同步的问题,我们可以绘制出这3个方法的行为:

post

dispatch

run

为了推导全部的代码,我们需要用到一些在前面的文章中介绍的辅助类:

  • Callstack——允许我们在当前的调用栈中放置标记,用于检测我们是否正在当前线程中执行某个函数;

  • WorkQueue——简单的多消费者/多生产者任务队列。当没有任务时消费者会阻塞;

  • Monitor——对T类型的对象的并发访问提供同步控制。

具体的推导如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#pragma once
#include "Callstack.h"
#include "Monitor.h"
#include <assert.h>
#include <queue>
#include <functional>

//
// A strand serializes handler execution.
// It guarantees the following:
// - No handlers executes concurrently
// - Handlers are only executed from the specified Processor
// - Handler execution order is not guaranteed
//
// Specified Processor must implement the following interface:
//
// template <typename F> void Processor::push(F w);
// Add a new work item to the processor. F is a callable convertible
// to std::function<void()>
//
// bool Processor::canDispatch();
// Should return true if we are in the Processor's dispatching function in
// the current thread.
//
template <typename Processor>
class Strand {
public:
Strand(Processor& proc) : m_proc(proc) {}

Strand(const Strand&) = delete;
Strand& operator=(const Strand&) = delete;

// Executes the handler immediately if all the strand guarantees are met,
// or posts the handler for later execution if the guarantees are not met
// from inside this call
template <typename F>
void dispatch(F handler) {
// If we are not currently in the processor dispatching function (in
// this thread), then we cannot possibly execute the handler here, so
// enqueue it and bail out
if (!m_proc.canDispatch()) {
post(std::move(handler));
return;
}

// NOTE: At this point we know we are in a worker thread (because of the
// check above)

// If we are running the strand in this thread, then we can execute the
// handler immediately without any other checks, since by design no
// other threads can be running the strand
if (runningInThisThread()) {
handler();
return;
}

// At this point we know we are in a worker thread, but not running the
// strand in this thread.
// The strand can still be running in another worker thread, so we need
// to atomically enqueue the handler for the other thread to execute OR
// mark the strand as running in this thread
auto trigger = m_data([&](Data& data) {
if (data.running) {
data.q.push(std::move(handler));
return false;
} else {
data.running = true;
return true;
}
});

if (trigger) {
// Add a marker to the callstack, so the handler knows the strand is
// running in the current thread
Callstack<Strand>::Context ctx(this);
handler();

// Run any remaining handlers.
// At this point we own the strand (It's marked as running in
// this thread), and we don't release it until the queue is empty.
// This means any other threads adding handlers to the strand will
// enqueue them, and they will be executed here.
run();
}
}

// Post an handler for execution and returns immediately.
// The handler is never executed as part of this call.
template <typename F>
void post(F handler) {
// We atomically enqueue the handler AND check if we need to start the
// running process.
bool trigger = m_data([&](Data& data) {
data.q.push(std::move(handler));
if (data.running) {
return false;
} else {
data.running = true;
return true;
}
});

// The strand was not running, so trigger a run
if (trigger) {
m_proc.push([this] { run(); });
}
}

// Checks if we are currently running the strand in this thread
bool runningInThisThread() {
return Callstack<Strand>::contains(this) != nullptr;
}

private:
// Processes any enqueued handlers.
// This assumes the strand is marked as running.
// When there are no more handlers, it marks the strand as not running.
void run() {
Callstack<Strand>::Context ctx(this);
while (true) {
std::function<void()> handler;
m_data([&](Data& data) {
assert(data.running);
if (data.q.size()) {
handler = std::move(data.q.front());
data.q.pop();
} else {
data.running = false;
}
});

if (handler)
handler();
else
return;
}
}

struct Data {
bool running = false;
std::queue<std::function<void()>> q;
};
Monitor<Data> m_data;
Processor& m_proc;
};

为了减少代码依赖,Strands被设计为模板类,你必须指定一个合适的Processor类。

值得再次强调的是,Strands并不是用Processor来执行函数,而是用它来执行自己的Run方法。

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#include "Strand.h"
#include "WorkQueue.h"
#include <random>
#include <stdlib.h>
#include <string>
#include <atomic>

// http://stackoverflow.com/questions/7560114/random-number-c-in-some-range
int randInRange(int min, int max) {
std::random_device rd; // obtain a random number from hardware
std::mt19937 eng(rd()); // seed the generator
std::uniform_int_distribution<> distr(min, max); // define the range
return distr(eng);
}

struct Obj {
explicit Obj(int n, WorkQueue& wp) : strand(wp) {
name = "Obj " + std::to_string(n);
}

void doSomething(int val) {
printf("%s : doing %dn", name.c_str(), val);
}
std::string name;
Strand<WorkQueue> strand;
};

void strandSample() {
WorkQueue workQueue;
// Start a couple of worker threads
std::vector<std::thread> workerThreads;
for (int i = 0; i < 4; i++) {
workerThreads.push_back(std::thread([&workQueue] { workQueue.run(); }));
}

// Create a couple of objects that need strands
std::vector<std::unique_ptr<Obj>> objs;
for (int i = 0; i < 8; i++) {
objs.push_back(std::make_unique<Obj>(i, workQueue));
}

// Counter used by all strands, so we can check if all work was done
std::atomic<int> doneCount(0);

// Add work to random objects
const int todo = 20;
for (int i = 0; i < todo; i++) {
auto&& obj = objs[randInRange(0, objs.size() - 1)];
obj->strand.post([&obj, i, &doneCount] {
obj->doSomething(i);
++doneCount;
});
}

workQueue.stop();
for (auto&& t : workerThreads) {
t.join();
}

assert(doneCount == todo);
}

测试结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Obj 2 : doing 0
Obj 1 : doing 1
Obj 1 : doing 3
Obj 1 : doing 4
Obj 3 : doing 6
Obj 5 : doing 2
Obj 4 : doing 5
Obj 6 : doing 11
Obj 3 : doing 8
Obj 5 : doing 10
Obj 5 : doing 12
Obj 6 : doing 17
Obj 3 : doing 9
Obj 3 : doing 13
Obj 5 : doing 18
Obj 0 : doing 14
Obj 2 : doing 15
Obj 3 : doing 16
Obj 5 : doing 19
Obj 1 : doing 7

总结

使用Strand有明显的优势:

  • 不在需要显示同步

    • 对于绑定了同一个Strands的一组函数,我们不必再担心并发执行的问题
  • 更少的阻塞

    • 降低了对同一个连接的资源竞争,如在TCP连接中Buffer的读写顺序控制
  • 缓存局部性

    • 取决于具体的场景,但好过没有

附: 测试项目

参考

  1. How strands work and why you should use them
  2. Boost::Asio::Strands