C++ 20 中的协程非常适合封装异步操作,可以像 JavaScript 或者 Rust 那样按照顺序的方法去编写异步代码。没有协程的时候,异步操作往往是通过回调函数的方式来实现的。这就要求我们手动将程序的状态保存起来,然后在回调操作的时候重新恢复之前的执行状态。对于一些简单的操作而言,手动保存状态也可以接受,但是这不 scalable。当一个函数涉及到很多异步操作的话,手动管理保存状态就会显得很繁琐。此外,由于操作被拆分成多个回调函数,在编写代码的时候逻辑会显得比较零散,既不方便编写,也不方便阅读。
协程实际上就是将状态的打包和回调函数的编写交给编译器来实现,这样程序员就可以用顺序而且紧凑的方法去编写异步代码了。C++ 20 中的协程比起其他语言的协程实现要复杂许多,而且不像其他语言有一个比较广泛使用的运行时库。换句话说,如果你想使用 C++ 20 的协程,需要按照你自己的应用需求去编写一个驱动协程运行的运行时。这个运行时做的事情并不复杂,只需要调用编译器打包好的回调函数就可以了,理论上使用什么作为运行时都无所谓。可以是 epoll
事件循环,可以是一个线程池,也可以是 RDMA。
本文提供了一种基于 epoll
和 ibverbs
编写的 RDMA 操作库实现思路,将复杂的 RDMA 操作封装成基于协程的函数,大大提升了程序的可读性。其中 epoll
方面的作用类似 rdmacm
,是在 Queue Pair(类似 TCP Socket)建立过程中使用 TCP 交换 QP 信息的。
完整代码见:https://github.com/howardlau1999/rdmapp
C++ 20 协程入门
网上关于 C++ 20 的协程资料比较多而且杂,但是都比较专注于某一个点的细节解析,比较少结合实际场合讲解的。正如前文所说,C++ 的协程实际上是编译器对我们的程序做的一种变换。我们使用协程语法,更像是在使用一种非常高级的宏,告诉编译器应该如何变换。例如,使用协程编写 TCP 服务器程序,伪代码可以理解为:
auto handle_connection(tcp_connection conn) {
for (;;) {
char buffer[4096];
int n = co_await conn.recv(buffer, 4096);
if (n == 0) co_return; // Connection closed
// Process buffer...
}
}
这里的 recv
函数签名是:
tcp_connection::recv_awaitable tcp_connection::recv(char *buffer, size_t length)
那么对于 co_await
那一行,实际上编译器会将程序转换为以下一系列操作:
- 调用
recv
函数,这个函数返回一个类awaitable
。一个awaitable
可以理解为一个promise
,也就是一个可能完成了,也可能没有完成的异步操作。一个awaitable
无需继承任何类,只需要实现以下三种方法:class recv_awaitable { public: bool await_ready(); // 这里的返回值类型是什么后面会讲到 ??? await_suspend(std::coroutine_handle<> h); int await_resume(); };
- 然后,调用
awaitable
的await_ready
函数,检查这个异步操作完成了没有。如果返回true
,就不用进行一系列复杂的打包操作了,继续执行。 - 重点来了,如果
await_ready
返回false
,也就是这个异步操作还没完成(例子里就是 TCP 还没数据可读),那么编译器会将函数还没执行完成的部分以及需要使用的状态(例如 buffer 和一些局部变量)打包成一个coroutine_handle
,我们不需要关心里面具体是怎么实现的,只需要知道,这个coroutine_handle
,有一个resume
方法,用来执行函数还没执行完的部分,以及done
方法,检查还有没有需要执行的部分。所以很简单,无论我们使用的库是什么,只需要在事件处理函数去调用resume
方法就可以了! - 那么这个
coroutine_handle
要怎么获取到?答案就是await_suspend
方法,它的签名可以是:void await_suspend(std::coroutine_handle<> h); bool await_suspend(std::coroutine_handle<> h); std::coroutine_handle<> await_suspend(std::coroutine_handle<> h);
第一次看到这些签名,一个疑惑就是为什么可以有参数相同返回值不一样的签名?还有一个疑惑就是 coroutine_handle<>
是什么意思?
对于第一个疑问,答案就是,这几个签名只能选择一种实现在 awaitable
里。编译器在编译的过程中,是可以分辨出不同返回值的签名的。根据返回值类型的不同,它们对于后续协程运行过程也有不同的影响。
对于第二个疑问,这其实是 C++ 中的一种编程技巧:类型擦除。这是因为实际上每一个协程的”剩余部分“都是不同的类型,而我们又不想用虚函数这种对性能有影响的方法,那么就可以将具体的类型给”擦除“掉,变成同一种类型,从而也可以使用统一的接口。
回到 await_suspend
的三幅面孔,不同的返回类型的作用是:
- 对于
void
类型,当前函数(例子是recv
)的执行就到此为止了,直到下次调用h.resume()
方法才会继续执行 - 对于
bool
类型,含义和await_ready
是相反的,如果返回了false
,函数就不会暂停,会继续执行,适合调用 API 失败的时候使用,否则就和void
一样,暂停执行 - 对于
std::coroutine_handle<>
类型,会调用这个返回了的协程类型的resume
方法
那么我们需要在 await_suspend
方法里做的事就很清楚了,也就是调用其他库,把调用 h.resume()
作为回调函数注册到事件处理函数中。
最后,无论这个协程是在 await_ready
返回 true
后继续执行,又或者是 await_suspend
之后被继续执行了,都会在继续执行的一开始调用 await_resume
函数,并且把它的返回值作为整个 co_await
的返回值。
梳理下来,recv_awaitable
的实现思路就很清楚了:在 await_ready
里我们先试着 recv
一下,返回 EAGAIN
或者 EWOULDBLOCK
的话我们就返回 false
,让编译器把协程抓手传递给我们,我们在 await_suspend
里把相关的 fd 和这个抓手的地址注册到 epoll 里。最后,在 await_resume
里,检查我们需不需要再 recv
一次,再把 recv
返回值返回即可。
伪代码就是:
extern int epfd;
class recv_awaitable {
int fd_;
int n_;
char *buffer_;
size_t length_;
public:
recv_awaitable(int fd, char *buffer, size_t length) : fd_(fd), buffer_(buffer), length_(length) {}
bool await_ready() {
n_ = ::read(fd_, buffer_, length_);
return n_ >= 0;
}
void await_suspend(std::coroutine_handle<> h) {
struct epoll_event event;
event.events = EPOLLIN;
event.data.ptr = h.address();
::epoll_ctl(epfd, EPOLL_CTL_ADD, fd_, &event);
}
int await_resume() {
if (n_ < 0) {
::epoll_ctl(epfd, EPOLL_CTL_DEL, fd_, nullptr);
n_ = ::read(fd_, buffer_, length_);
}
return n_;
}
};
而在事件循环中,我们像往常一样,不停 epoll_wait
,然后调用事件的回调函数即可:
for (;;) {
struct epoll_event event;
::epoll_wait(epfd, &event, 1);
auto h = std::coroutine_handle<>::from_address(event.data.ptr);
h.resume();
}
可以看到,我们只需要对代码进行一点点封装,就可以享受到使用协程编程的便利了。封装的方法就是将以前的 epoll_ctl
等系统调用封装成 awaitable
即可,这样经验也能轻松迁移。
当然,还有一个大问题没有解决,就是 handle_connection
这种调用了 co_await
的函数应该返回什么?我们也没有像普通函数一样使用 return
,而是使用了 co_return
,这又会带来什么不同?
首先,既然它能被 co_await
,那么这个函数的返回值一定也需要提供上面说的 awaitable
的三个接口,否则编译器无法完成变换。而且,它也可以被“暂停执行”,也会被编译器打包成 std::coroutine_handle<>
,同时,由于函数有返回值,也可能抛出异常,我们还需要更多接口去处理。
对于这种情况,我们需要返回一种 task
类型,它同样不需要继承任何类,需要实现的接口有:
template<class T>
class task {
public:
struct promise_type {
task<T> get_return_object() { return std::coroutine_handle::from_promise(*this); }
awaitable initial_suspend();
awaitable final_suspend();
void return_value(T &&value);
// 或者
void return_void();
};
task_awaitable operator co_await();
task(std::coroutine_handle<promise_type> h) : h_(h) {}
~task() { h_.destroy(); }
std::coroutine_handle<promise_type> h_;
};
其中一定要包含一个类型 promise_type
,编译器才能做变换,变换后的结果大概是:
{
promise-type promise promise-constructor-arguments ;
try {
co_await promise.initial_suspend() ;
function-body
} catch ( ... ) {
if (!initial-await-resume-called)
throw ;
promise.unhandled_exception() ;
}
final-suspend :
co_await promise.final_suspend() ;
}
其中 initial_suspend
的作用是允许函数在开始执行之前先用一个 awaitable
暂停一下,比如你想先创建好任务,再一起运行。而 return_value
和 return_void
就如字面意思,在 co_return
的时候会调用。在返回前,还会再调用 final_suspend
,也可以用 awaitable
暂停执行。这是为了可以做一些收尾工作,同时也可以嵌套调用协程。
这里的 task_awaitable
实现接口虽然相同,但是具体实现则不太一样。首先 await_ready
直接检查 h_.done()
,而 await_suspend
则需要把这个 h_
保存到某个位置,我们可以给 promise_type
添加一个成员变量 continuation_
来保存,然后在 final_suspend
的时候返回出去,继续嵌套的协程。await_resume
则不需要实现什么。
template<class T>
class task {
class task_awaitable {
std::coroutine_handle<task<T>> h_;
bool await_ready() { return h_.done(); }
void await_suspend(std::coroutine_handle<> suspended) {
h_.promise().continuation_ = suspended;
}
void await_resume() {}
};
};
而这个 continuation_
可以在 final_suspend
的时候返回出去,让它能够继续执行:
struct promise_type {
// ...
auto final_suspend() noexcept {
struct awaiter {
bool await_ready() noexcept { return false; }
std::coroutine_handle<>
await_suspend(CoroutineHandle suspended) noexcept {
if (suspended.promise().continuation_) {
return suspended.promise().continuation_;
} else {
return std::noop_coroutine();
}
}
void await_resume() noexcept {}
};
return awaiter{};
}
};
这样,完整可用的协程运行时就编写完成了。代码量其实并不大,主要难度在于理解编译器是如何变换我们的程序,一旦理解后就可以轻松自如地编写协程程序了!
RDMA
RDMA 编程也和 epoll
类似,有一个循环不停地获取“完成事件”,这个完成事件中同样可以携带一个指针,利用这个指针,就可以完成我们的回调操作了,需要注意的是一般我们不在 poll 线程直接执行回调,而是放到别的线程去做,避免影响 poll:
for (;;) {
struct ibv_wc wc;
if (auto n = ::ibv_poll_cq(cq, &wc, 1); n > 0) {
auto cb = reinterpret_cast<callback_ptr>(wc.wr_id);
executor->run(cb);
}
}
而 recv_awaitable
则和上面的 epoll
实现思路大同小异,在 await_suspend
函数中,发布一个 recv
操作(send
同理),把回调地址传到 wr_id
里就可以了。
void qp::send_awaitable::await_suspend(std::coroutine_handle<> h) {
// ...
auto callback = executor::make_callback([h, this](struct ibv_wc const &wc) {
wc_ = wc;
h.resume();
});
// ...
send_wr.wr_id = reinterpret_cast<uint64_t>(callback);
qp_->post_send(send_wr, bad_send_wr);
}
这里需要注意回调函数的生命周期起码要在回调结束之后才能结束。
扩展
如果想把协程放到后台去运行(类似 tokio::spawn
或者 thread::detach
),我们可以利用 std::promise
和 std::future
,在协程运行结束时通过 promise
来设置值,调用方则使用 future
来等待执行完成或者获取返回值。而为了避免 task
被提前销毁,我们需要将其移动到堆上:
void detach() {
auto detached_task = new task<void>(std::move(*this));
h_.promise().set_detached_task(detached_task);
detached_ = true;
}
对于已经被放到后台的任务,我们在 ~task
里就不销毁 coroutine_handle
了,而是在 final_suspend
后去注意释放 task
的内存即可。由于一旦注册回调之后,协程就会在事件循环的驱动下不断执行,所以我们也不需要轮询 task
的状态,如果想在完成后收到通知,用 std::future
就足够了。