A TCP echo server with Boost.Asio(1)

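In the previous article, we showed how to build a thread pool with Boost.Asio.

This article continues with how Boost.Asio resolves race conditions using lock-free synchronization, and how to build a TCP echo server.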

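Lock-free synchronization

Thread pool

A simple thread pool implementation is shown below. Each worker thread calls the run() method of a shared io_context (named io_service before Boost 1.66):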

#include <thread>
#include <vector>

#include <boost/asio.hpp>

class AsioThreadPool
{
public:
    // The constructor launches threadNum worker threads.
    AsioThreadPool(int threadNum = std::thread::hardware_concurrency())
        : work_guard_(boost::asio::make_work_guard(service_))
    {
        // One io_context shared by multiple threads.
        for (int i = 0; i < threadNum; ++i)
        {
            threads_.emplace_back([this] () { service_.run(); });
        }
    }

    AsioThreadPool(const AsioThreadPool &) = delete;
    AsioThreadPool &operator=(const AsioThreadPool &) = delete;

    boost::asio::io_context &getIOService()
    {
        return service_;
    }

    void stop()
    {
        // Once the work guard is released, run() returns in each thread
        // as soon as there is no outstanding work left.
        work_guard_.reset();
        for (auto &t: threads_) {
            t.join();
        }
    }

private:
    boost::asio::io_context service_;

    typedef boost::asio::io_context::executor_type ExecutorType;
    boost::asio::executor_work_guard<ExecutorType> work_guard_;
    std::vector<std::thread> threads_;
};

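Because several threads run the event loop at the same time, a single socket descriptor may end up shared among multiple threads, which invites race conditions. For example, if the readable event of a socket fires twice in quick succession, two threads may try to read the same socket at the same time.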

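Synchronizing without locks

How can the race condition described above be resolved? Boost.Asio provides io_context::strand (io_service::strand in older versions): if several event handlers are dispatched through the same strand object, they are guaranteed to run one at a time, never concurrently.
For example, the code below uses a strand, so no mutex is needed for synchronization: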

AsioThreadPool pool(4);    // start 4 threads

boost::asio::steady_timer timer1{ pool.getIOService(), std::chrono::seconds{1} };
boost::asio::steady_timer timer2{ pool.getIOService(), std::chrono::seconds{1} };

int value = 0;
boost::asio::io_context::strand strand{ pool.getIOService() };

// Both handlers are bound to the same strand, so they never run
// concurrently and value needs no mutex.
timer1.async_wait(
    boost::asio::bind_executor(strand, [&value] (const boost::system::error_code &ec)
    {
        std::cout << "Hello, World! " << value++ << std::endl;
    }));
timer2.async_wait(
    boost::asio::bind_executor(strand, [&value] (const boost::system::error_code &ec)
    {
        std::cout << "Hello, World! " << value++ << std::endl;
    }));

// Waits for both timers to fire before joining the worker threads.
pool.stop();

Multithreaded echo server

The EchoServer below can be used with multiple threads. Each connection owns a strand, which avoids the race condition described above:

#include <array>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <utility>

#include <boost/asio.hpp>
#include <boost/asio/bind_executor.hpp>

#include "asio_thread_pool.hpp"

using boost::asio::ip::tcp;

// We use shared_ptr and enable_shared_from_this because we want to keep the
// TCPConnection object alive as long as there is an operation that refers to it.
class TCPConnection : public std::enable_shared_from_this<TCPConnection>
{
public:
    // Must be std::shared_ptr to match std::enable_shared_from_this.
    typedef std::shared_ptr<TCPConnection> pointer;

    TCPConnection(boost::asio::io_context &io_context)
        : socket_(io_context),
          strand_(io_context) {}

    tcp::socket &socket() { return socket_; }
    void start() { doRead(); }

private:
    void doRead()
    {
        auto self = shared_from_this();
        socket_.async_read_some(
            boost::asio::buffer(buffer_, buffer_.size()),
            boost::asio::bind_executor(strand_,
                [this, self](boost::system::error_code ec, std::size_t bytes_transferred)
                {
                    // On error (including EOF) no further operation is started,
                    // so the connection is released.
                    if (!ec) {
                        doWrite(bytes_transferred);
                    }
                }));

        // Note:
        // The lambdas that process connection events capture a shared pointer
        // to the connection. As long as there is an event the connection is
        // waiting for, the object is not deleted, because the lambda handling
        // that event holds an instance of the shared pointer. When there are
        // no more events to process, the object is deleted.
    }

    void doWrite(std::size_t length)
    {
        auto self = shared_from_this();
        boost::asio::async_write(
            socket_, boost::asio::buffer(buffer_, length),
            boost::asio::bind_executor(strand_,
                [this, self](boost::system::error_code ec,
                             std::size_t /* bytes_transferred */)
                {
                    if (!ec) {
                        doRead();
                    }
                }));
    }

private:
    tcp::socket socket_;
    boost::asio::io_context::strand strand_;
    std::array<char, 8192> buffer_;
};

class EchoServer
{
public:
    EchoServer(boost::asio::io_context &io_context, unsigned short port)
        : io_context_(io_context),
          acceptor_(io_context, tcp::endpoint(tcp::v4(), port))
    {
        // The constructor initialises an acceptor to listen on the TCP port.
        doAccept();
    }

    // The function doAccept() creates a connection object and initiates
    // an asynchronous accept operation to wait for a new connection.
    void doAccept()
    {
        auto conn = std::make_shared<TCPConnection>(io_context_);
        acceptor_.async_accept(conn->socket(),
            [this, conn](boost::system::error_code ec)
            {
                // Service the client request.
                if (!ec) { conn->start(); }

                // Then call doAccept() again to initiate
                // the next accept operation.
                this->doAccept();
            });
    }

private:
    boost::asio::io_context &io_context_;
    tcp::acceptor acceptor_;
};

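int main(int argc, char *argv[])
{
    AsioThreadPool pool(4);

    unsigned short port = 5800;
    EchoServer server(pool.getIOService(), port);

    // Join the worker threads. Without this call, main() would return while
    // the threads are still joinable and the program would terminate.
    // The pending accept keeps the io_context busy, so the server keeps running.
    pool.stop();

    return 0;
}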
