使用 C++20 协程实现 RDMA 操作

C++ 20 中的协程非常适合封装异步操作,可以像 JavaScript 或者 Rust 那样按照顺序的方法去编写异步代码。没有协程的时候,异步操作往往是通过回调函数的方式来实现的。这就要求我们手动将程序的状态保存起来,然后在回调操作的时候重新恢复之前的执行状态。对于一些简单的操作而言,手动保存状态也可以接受,但是这不 scalable。当一个函数涉及到很多异步操作的话,手动管理保存状态就会显得很繁琐。此外,由于操作被拆分成多个回调函数,在编写代码的时候逻辑会显得比较零散,既不方便编写,也不方便阅读。

协程实际上就是将状态的打包和回调函数的编写交给编译器来实现,这样程序员就可以用顺序而且紧凑的方法去编写异步代码了。C++ 20 中的协程比起其他语言的协程实现要复杂许多,而且不像其他语言有一个比较广泛使用的运行时库。换句话说,如果你想使用 C++ 20 的协程,需要按照你自己的应用需求去编写一个驱动协程运行的运行时。这个运行时做的事情并不复杂,只需要调用编译器打包好的回调函数就可以了,理论上使用什么作为运行时都无所谓。可以是 epoll 事件循环,可以是一个线程池,也可以是 RDMA。

本文提供了一种基于 epollibverbs 编写的 RDMA 操作库实现思路,将复杂的 RDMA 操作封装成基于协程的函数,大大提升了程序的可读性。其中 epoll 方面的作用类似 rdmacm,是在 Queue Pair(类似 TCP Socket)建立过程中使用 TCP 交换 QP 信息的。

完整代码见:https://github.com/howardlau1999/rdmapp

C++ 20 协程入门

网上关于 C++ 20 的协程资料比较多而且杂,但是都比较专注于某一个点的细节解析,比较少结合实际场合讲解的。正如前文所说,C++ 的协程实际上是编译器对我们的程序做的一种变换。我们使用协程语法,更像是在使用一种非常高级的宏,告诉编译器应该如何变换。例如,使用协程编写 TCP 服务器程序,伪代码可以理解为:

auto handle_connection(tcp_connection conn) {
  for (;;) {
    char buffer[4096];
    int n = co_await conn.recv(buffer, 4096);
    if (n == 0) co_return; // Connection closed
    // Process buffer... 
  }
}

这里的 recv 函数签名是:

tcp_connection::recv_awaitable tcp_connection::recv(char *buffer, size_t length) 

那么对于 co_await 那一行,实际上编译器会将程序转换为以下一系列操作:

  1. 调用 recv 函数,这个函数返回一个类 awaitable。一个 awaitable 可以理解为一个 promise,也就是一个可能完成了,也可能没有完成的异步操作。一个 awaitable 无需继承任何类,只需要实现以下三种方法:
    class recv_awaitable {
    public:
    bool await_ready();
    // 这里的返回值类型是什么后面会讲到
    ??? await_suspend(std::coroutine_handle<> h);
    int await_resume(); 
    };
  2. 然后,调用 awaitableawait_ready 函数,检查这个异步操作完成了没有。如果返回 true,就不用进行一系列复杂的打包操作了,继续执行。
  3. 重点来了,如果 await_ready 返回 false,也就是这个异步操作还没完成(例子里就是 TCP 还没数据可读),那么编译器会将函数还没执行完成的部分以及需要使用的状态(例如 buffer 和一些局部变量)打包成一个 coroutine_handle,我们不需要关心里面具体是怎么实现的,只需要知道,这个 coroutine_handle,有一个 resume 方法,用来执行函数还没执行完的部分,以及 done 方法,检查还有没有需要执行的部分。所以很简单,无论我们使用的库是什么,只需要在事件处理函数去调用 resume 方法就可以了!
  4. 那么这个 coroutine_handle 要怎么获取到?答案就是 await_suspend 方法,它的签名可以是:
    void await_suspend(std::coroutine_handle<> h);
    bool await_suspend(std::coroutine_handle<> h);
    std::coroutine_handle<> await_suspend(std::coroutine_handle<> h);

第一次看到这些签名,一个疑惑就是为什么可以有参数相同返回值不一样的签名?还有一个疑惑就是 coroutine_handle<> 是什么意思?

对于第一个疑问,答案就是,这几个签名只能选择一种实现在 awaitable 里。编译器在编译的过程中,是可以分辨出不同返回值的签名的。根据返回值类型的不同,它们对于后续协程运行过程也有不同的影响。

对于第二个疑问,这其实是 C++ 中的一种编程技巧:类型擦除。这是因为实际上每一个协程的”剩余部分“都是不同的类型,而我们又不想用虚函数这种对性能有影响的方法,那么就可以将具体的类型给”擦除“掉,变成同一种类型,从而也可以使用统一的接口。

回到 await_suspend 的三幅面孔,不同的返回类型的作用是:

  • 对于 void 类型,当前函数(例子是 recv)的执行就到此为止了,直到下次调用 h.resume() 方法才会继续执行
  • 对于 bool 类型,含义和 await_ready 是相反的,如果返回了 false,函数就不会暂停,会继续执行,适合调用 API 失败的时候使用,否则就和 void 一样,暂停执行
  • 对于 std::coroutine_handle<> 类型,会调用这个返回了的协程类型的 resume 方法

那么我们需要在 await_suspend 方法里做的事就很清楚了,也就是调用其他库,把调用 h.resume() 作为回调函数注册到事件处理函数中。

最后,无论这个协程是在 await_ready 返回 true 后继续执行,又或者是 await_suspend 之后被继续执行了,都会在继续执行的一开始调用 await_resume 函数,并且把它的返回值作为整个 co_await 的返回值。

梳理下来,recv_awaitable 的实现思路就很清楚了:在 await_ready 里我们先试着 recv 一下,返回 EAGAIN 或者 EWOULDBLOCK 的话我们就返回 false,让编译器把协程抓手传递给我们,我们在 await_suspend 里把相关的 fd 和这个抓手的地址注册到 epoll 里。最后,在 await_resume 里,检查我们需不需要再 recv 一次,再把 recv 返回值返回即可。

伪代码就是:

extern int epfd;
class recv_awaitable {
  int fd_;
  int n_;
  char *buffer_;
  size_t length_;
public:
  recv_awaitable(int fd, char *buffer, size_t length) : fd_(fd), buffer_(buffer), length_(length) {}
  bool await_ready() {
    n_ = ::read(fd_, buffer_, length_);
    return n_ >= 0;
  }
  void await_suspend(std::coroutine_handle<> h) {
    struct epoll_event event;
    event.events = EPOLLIN;
    event.data.ptr = h.address();
    ::epoll_ctl(epfd, EPOLL_CTL_ADD, fd_, &event);
  }
  int await_resume() {
    if (n_ < 0) {
      ::epoll_ctl(epfd, EPOLL_CTL_DEL, fd_, nullptr);
      n_ = ::read(fd_, buffer_, length_);
    }
    return n_;
  }
};

而在事件循环中,我们像往常一样,不停 epoll_wait,然后调用事件的回调函数即可:

for (;;) {
  struct epoll_event event;
  ::epoll_wait(epfd, &event, 1);
  auto h = std::coroutine_handle<>::from_address(event.data.ptr);
  h.resume();
}

可以看到,我们只需要对代码进行一点点封装,就可以享受到使用协程编程的便利了。封装的方法就是将以前的 epoll_ctl 等系统调用封装成 awaitable 即可,这样经验也能轻松迁移。

当然,还有一个大问题没有解决,就是 handle_connection 这种调用了 co_await 的函数应该返回什么?我们也没有像普通函数一样使用 return,而是使用了 co_return,这又会带来什么不同?

首先,既然它能被 co_await,那么这个函数的返回值一定也需要提供上面说的 awaitable 的三个接口,否则编译器无法完成变换。而且,它也可以被“暂停执行”,也会被编译器打包成 std::coroutine_handle<>,同时,由于函数有返回值,也可能抛出异常,我们还需要更多接口去处理。

对于这种情况,我们需要返回一种 task 类型,它同样不需要继承任何类,需要实现的接口有:

template<class T>
class task {
public:
  struct promise_type {
    task<T> get_return_object() { return std::coroutine_handle::from_promise(*this); }
    awaitable initial_suspend();
    awaitable final_suspend();
    void return_value(T &&value);
    // 或者 
    void return_void();
  };
  task_awaitable operator co_await();
  task(std::coroutine_handle<promise_type> h) : h_(h) {}
  ~task() { h_.destroy(); }
  std::coroutine_handle<promise_type> h_;
};

其中一定要包含一个类型 promise_type,编译器才能做变换,变换后的结果大概是:

{
    promise-type promise promise-constructor-arguments ;
    try {
        co_await promise.initial_suspend() ;
        function-body
    } catch ( ... ) {
        if (!initial-await-resume-called)
            throw ;
        promise.unhandled_exception() ;
    }
final-suspend :
    co_await promise.final_suspend() ;
}

其中 initial_suspend 的作用是允许函数在开始执行之前先用一个 awaitable 暂停一下,比如你想先创建好任务,再一起运行。而 return_valuereturn_void 就如字面意思,在 co_return 的时候会调用。在返回前,还会再调用 final_suspend,也可以用 awaitable 暂停执行。这是为了可以做一些收尾工作,同时也可以嵌套调用协程。

这里的 task_awaitable 实现接口虽然相同,但是具体实现则不太一样。首先 await_ready 直接检查 h_.done(),而 await_suspend 则需要把这个 h_ 保存到某个位置,我们可以给 promise_type 添加一个成员变量 continuation_ 来保存,然后在 final_suspend 的时候返回出去,继续嵌套的协程。await_resume 则不需要实现什么。

template<class T>
class task {
  class task_awaitable {
    std::coroutine_handle<task<T>> h_;
    bool await_ready() { return h_.done(); }
    void await_suspend(std::coroutine_handle<> suspended) {
      h_.promise().continuation_ = suspended;
    }
    void await_resume() {}
  };
};

而这个 continuation_ 可以在 final_suspend 的时候返回出去,让它能够继续执行:

struct promise_type {
// ...
  auto final_suspend() noexcept {
    struct awaiter {
      bool await_ready() noexcept { return false; }
      std::coroutine_handle<>
      await_suspend(CoroutineHandle suspended) noexcept {
        if (suspended.promise().continuation_) {
          return suspended.promise().continuation_;
        } else {
          return std::noop_coroutine();
          }
        }
      void await_resume() noexcept {}
    };
    return awaiter{};
  }
};

这样,完整可用的协程运行时就编写完成了。代码量其实并不大,主要难度在于理解编译器是如何变换我们的程序,一旦理解后就可以轻松自如地编写协程程序了!

RDMA

RDMA 编程也和 epoll 类似,有一个循环不停地获取“完成事件”,这个完成事件中同样可以携带一个指针,利用这个指针,就可以完成我们的回调操作了,需要注意的是一般我们不在 poll 线程直接执行回调,而是放到别的线程去做,避免影响 poll:

for (;;) {
  struct ibv_wc wc;
  if (auto n = ::ibv_poll_cq(cq, &wc, 1); n > 0) {
    auto cb = reinterpret_cast<callback_ptr>(wc.wr_id);
    executor->run(cb);
  }
}

recv_awaitable 则和上面的 epoll 实现思路大同小异,在 await_suspend 函数中,发布一个 recv 操作(send 同理),把回调地址传到 wr_id 里就可以了。

void qp::send_awaitable::await_suspend(std::coroutine_handle<> h) {
  // ...
  auto callback = executor::make_callback([h, this](struct ibv_wc const &wc) {
    wc_ = wc;
    h.resume();
  });
  // ...
  send_wr.wr_id = reinterpret_cast<uint64_t>(callback);
  qp_->post_send(send_wr, bad_send_wr);
}

这里需要注意回调函数的生命周期起码要在回调结束之后才能结束。

扩展

如果想把协程放到后台去运行(类似 tokio::spawn 或者 thread::detach ),我们可以利用 std::promisestd::future,在协程运行结束时通过 promise 来设置值,调用方则使用 future 来等待执行完成或者获取返回值。而为了避免 task 被提前销毁,我们需要将其移动到堆上:

void detach() {
  auto detached_task = new task<void>(std::move(*this));
  h_.promise().set_detached_task(detached_task);
  detached_ = true;
}

对于已经被放到后台的任务,我们在 ~task 里就不销毁 coroutine_handle 了,而是在 final_suspend 后去注意释放 task 的内存即可。由于一旦注册回调之后,协程就会在事件循环的驱动下不断执行,所以我们也不需要轮询 task 的状态,如果想在完成后收到通知,用 std::future 就足够了。