Concurrency In Cpp(2)

在这篇文章中我们介绍Cpp标准库和拓展库中有哪些并发工具。

timelineCpp17andCpp20

基于多核架构的要求,C++11标准定义程序了在多线程中的表现。包括两个方面,其一是标准内存模型,其二则是一些标准多线程API。从此你可以通过std::asyncpackaged_taskpromisefuture来构建异步任务。

总的来说,C++11提供的工具毁誉参半。一方面,这些工具比起直接使用thread或条件变量来说要方便的多;另一方面,这种形式并不完整——它们无法被组合。C++20/23标准尝试解决这些问题,并且提供了更多的并发拓展。

工具分类

按功能来分有这些:

  1. 资源的访问——C++11标准提供了一些基本的同步原语atomicmutexlockcondition_variable
  2. 任务的封装——C++11标准提供了几种任务封装形式futurepromisepackaged_task
  3. 并行版算法——C++17标准提供了常用算法的并行版本;
  4. 任务的结构——在C++20里将补全任务结构控制,包括then、 when_all、 when_any这三个用来关联多个future的函数。
  5. 任务的执行——现有的任务执行者基本都是线程池,每个线程不断的尝试获取一个任务并执行,类似于一个while循环。此外,在C++20/23中有executor的提案;
  6. 任务的调度——这部分负责了任务的投递和分发,他在多线程之间维持了一个任务容器集合,提供的接口主要包括接受新任务、取出一个任务和判断容器是否为空,最常见的是concurrenct_queue。这部分标准库并没有提供,有多种不同的实现方式

C++17

C++17标准为标准模板库中大部分的算法提供了并行的版本,你可以通过所谓的执行策略来调用。执行策略指明了算法是串行(std::seq),并行(std::par)还是并行矢量式(std::par_unseq)运行。

1
2
3
4
5
6
std::vector<int> vec ={3, 2, 1, 4, 5, 6, 10, 8, 9, 4};

std::sort(vec.begin(), vec.end()); // sequential as ever
std::sort(std::execution::seq, vec.begin(), vec.end()); // sequential
std::sort(std::execution::par, vec.begin(), vec.end()); // parallel
std::sort(std::execution::par_unseq, vec.begin(), vec.end()); // parallel and vectorized

C++20

基于新的多线程理念,C++20标准提供了更多组件。

原子智能指针

现有的智能指针std::shared_ptrstd::weak_ptr在多线程序中有一个潜在的问题——共享了一个可变对象的状态。这种方式可能造成数据竞争导致未定义的行为。

现有的智能指针通过原子引用计数来保证资源的正确释放,但是这没有保证资源的访问是原子性的,新的原子智能指针std::atomic_shared_ptrstd::atomic_weak_ptr就是用来解决这些问题的。

std::future拓展

promisefuture提供了一种在多线程程序开发中构建异步任务的概念。尽管简化了问题的复杂性,但是它们仍存在不能组合的问题。在C++20中提供了更多的拓展来解决上述的问题。

  • then

    1
    2
    3
    4
    5
    future<int> f1= async([]() { return 123; });
    future<string> f2 = f1.then([](future<int> f) {
    return f.get().to_string();
    }); // won't block
    f2.get(); // block
  • when_any

    1
    2
    3
    future<int> futures[] = {async([]() { return intResult(125); }),                       
    async([]() { return intResult(456); })};
    future<vector<future<int>>> any_f = when_any(begin(futures),end(futures));
  • when_all

    1
    2
    3
    4
    future<int> futures[] = {
    async([]() { return intResult(125); }),
    async([]() { return intResult(456); })};
    future<vector<future<int>>> all_f = when_all(begin(futures), end(futures));

Latches and barriers

迟来的同步原语。栓和栅都是通过计数器来控制资源访问的信号量。不同的是,std::latch的使用是一次性的, 而std::barrier可以被反复使用。

1
2
3
4
5
6
7
8
9
10
11
12
void doWork(threadpool* pool){
latch completion_latch(NUMBER_TASKS);
for (int i = 0; i < NUMBER_TASKS; ++i){
pool->add_task([&]{
// perform the work
...
completion_latch.count_down();
});
}
// block until all tasks are done
completion_latch.wait();
}

协程Coroutines

协程是函数的一种泛化,它提供了挂起和恢复函数调用上下文的语义。协程是实现操作系统,事件循环,无穷李彪或者管线中合作式多任务的一种常用工具。

1
2
3
4
5
6
7
8
9
10
generator<int> getInts(int first, int last){
for (auto i= first; i <= last; ++i){
co_yield i;
}
}

int main(){
for (auto i: getInts(5, 10)){
std::cout << i << " "; // 5 6 7 8 9 10
}

事务内存Transactional memory

事务内存是基于数据库理论中的事务处理的一种概念。事务表示一个原子的,一致的,独立的,持久的动作。除了持久的特性,其他的的特性在C++事务内存中都得到了保证。

C++事务内存分为同步块和原子块两种形式,他们都保证了内部的代码按照固定的顺序执行,而且持有全局锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int func() { 
static int i = 0;
synchronized{
std::cout << "Not interleaved \n";
++i;
return i;
}
}

int main(){
std::vector<std::thread> v(10);
for(auto& t: v)
t = std::thread([]{ for(int n = 0; n < 10; ++n) func(); });
}

Task blocks

任务区实现了fork-join范式,如图:

ForkJoin

通过在任务区使用关键字run你可以fork新的任务,这些任务在离开作用域时会自动join。

1
2
3
4
5
6
7
8
9
10
11
template <typename Func> 
int traverse(node& n, Func && f){
int left = 0, right = 0;
define_task_block(
[&](task_block& tb){
if (n.left) tb.run([&]{ left = traverse(*n.left, f); });
if (n.right) tb.run([&]{ right = traverse(*n.right, f); });
}
);
return f(n) + left + right;
}

C++23

Executors

Executors是C++中执行代码的基本单元的一种抽象,就像容器(containers)中的allocators一样。

Executors定义了一组关于何时何地如何执行函数对象的一组规则。

  • Where——函数对象可能在内部或外部的处理器上运行,执行的结果从内部或外部处理器中返回;
  • When——函数对象可能立即执行,也可能遵从指定的安排。
  • How——函数对象可能在CPU或GPU上执行,甚至可能以一种矢量化的方式执行。

作为一种基础工具,本文中提到的C++并行和并发特性都非常依赖于Executors,甚至在未来的网络库中也可以见到Executors的身影(有生之年希望见到std::asio)。

Executors提供的抽象提供了如下特性:

  • 批处理有助于平衡小型函数对象的调用代价。
  • 使得函数对象能够在不同的上下文执行并返回结果。
  • 调度序列化。无论是先进先出或者后进先出队列,还是优先级或者时间约束,函数对象的调用顺序的各种调度模式都可以适应。
  • 函数对象的执行与具体的计算机资源绑定,而且能够实现延迟或者取消。

一些例子:

  • std::async
1
2
3
4
5
6
7
// get an executor through some means
my_executor_type my_executor = ...

// launch an async using my executor
auto future = std::async(my_executor, [] {
std::cout << "Hello world, from a new execution agent!" << std::endl;
});
  • std::for_each
1
2
3
4
5
6
// get an executor through some means
my_executor_type my_executor = ...

// execute a parallel for_each "on" my executor
std::for_each(std::execution::par.on(my_executor),
data.begin(), data.end(), func);
  • static_thread_pool
1
2
3
4
5
6
7
8
// create a thread pool with 4 threads
static_thread_pool pool(4);

// get an executor from the thread pool
auto exec = pool.executor();

// use the executor on some long-running task
auto task1 = long_running_task(exec);

可用的拓展库

除开标准库,C++中有许多并行并发三方库可以使用。

附录:[C++ Concurrency In Action.pdf](C++ Concurrency In Action.pdf “下载”)