DeepSeek 3FS 源码解读——协程&RDMA篇

重磅文件系统项目 3FS 在 DeepSeek 轰轰烈烈的开源周压轴登场,补齐了计算、网络以外的最后一块拼图——存储。和之前精巧的极致压榨性能或者提供巧妙算法的开源库不同,3FS 是完整的涉及多种节点、结合多种外部节点的高速并行文件系统,其代码结构清晰、模块间解耦程度高,充分利用了现代 C++ 的语言特性优化性能和代码可读性,还结合了 Rust 语言编写核心存储引擎,展现了 DeepSeek 工程师对复杂工程极强的驾驭能力。尽管已经提供了详尽的设计文档,其复杂程度对于想要阅读学习 3FS 项目的爱好者提出了不小的挑战。本文尝试一步步抽丝剥茧,在官方提供的设计文档之外,提供一些阅读源码的技巧和思路,希望能起到抛砖引玉的作用。

阅读完本文后,可以继续阅读DeepSeek 3FS 源码解读——磁盘 IO 篇

协程基本思想

现代网络和存储硬件基本已经全面拥抱了事件驱动或者完成事件驱动的异步编程范式。3FS 源码中最突出的一个特点便是大量使用了 C++ 20 提供的 coroutines 语言特性,使得程序能充分发挥异步性能的同时避免了大量的回调函数导致代码结构松散的问题。

在介绍 C++ 20 协程之前,我们先看看经典的异步编程是如何实现的。首先,异步编程最原始的 API 通常以回调的方式提供,总结起来为以下的伪代码:

class device {
    // 1. 提交 IO 任务
    error_code device::submit_io(io_request *req, function<(io_request *, io_result)> callback) {
        device_raw_io *raw_io;
        raw_io->ctx = new io_context(req, callback);
        raw_io->buf = req->buf;
        raw_io->opcode = from_req_op(req->op);
        driver_submit_io(device_ptr, raw_io);
    }

    // 2. 等待事件发生
    error_code device::wait_event();

    // 3. 读取事件/完成队列
    error_code poll_events(vector<event> &events);

    void loop() {
        while (wait_event());
        vector<event> events;
        poll_events(events);
        for (auto &event : events) {
            // 处理事件,调用回调
            // 回调中可能再次发起 IO 操作
            io_context *ctx = (io_context*)event.data;
            io_result res = io_result(event.status);
            ctx->callback(ctx->req, res);
        }
    }
};

// 事件循环
jthread io_thread([]() {
    io_device->loop();
});

jthread net_thread([]() {
    net_device->loop();
});

// 例如,读取磁盘上的数据,发送到网络上
void demo() {
    io_request *io_req, *net_req;
    io_req->buf = new buffer[1024];
    io_req->len = 1024;
    io_req->fd = 123;
    io_req->op = READ;

    io_device->submit_io(req, [](io_request *io_req, io_result) {
       memcpy(net_req->buf, io_req->buf, 1024);
       net_req->peer = 456;
       net_req->op = SEND;
       net_device->submit_io(req, [](io_request *net_req, io_result res) {
           if (!res.ok()) handle_error();
           print("ok");
       })
    });
} 

可以看到,如果一次请求处理涉及到多次异步操作,那么将会出现回调套回调的情况,并且使得生命周期变得非常难以管理。更不用说在 lambda 函数实现之前,需要手动分配上下文内存保存变量状态了。

为了解决异步编程回调地狱的问题,工程师提出了协程的解决方案,协程分为有栈协程和无栈协程两种。在 C++ 20 之前,工业界早已通过有栈协程实现了异步操作代码的同步化编写。有栈协程的基本实现原理是通过汇编指令或 ucontext 库保存恢复寄存器切换上下文的方式,将函数的栈从栈内存切换到堆内存中的空间,并将寄存器状态也保存到堆上使得协程发生切换时状态得以保留恢复。开发者只需要像写同步代码一样调用 readwrite 等函数,由协程库通过动态库或者静态库 Hook 的方式劫持这些系统调用,并在其中实现有栈协程的调度。这里有一些细节问题,例如切换上下文的开销,以及协程栈空间分配浪费的问题;同时有时候开发者会意识不到协程的切换点,或者在写异步代码导致调用了线程级别的同步锁或者阻塞的系统调用,导致线程 Hang 死。

为了解决有栈协程的问题,C++ 20 实现了无栈协程方案,引入了 co_await 以及 co_return 等关键词,由开发者自己显式地指定异步操作的切换点。具体的 coroutine 细节网上资料已经有很多,这里提供一些简化版的代码示例和讲解来帮助快速入门。

使用无栈协程,上面的代码可以改造成:

struct io_awaitable {
    coroutine_handle handle;
    io_request *req;
    io_result res;
    bool await_resume() { return false; }
    void await_suspend(coroutine_handle<> h) {
        // 挂起协程,发起 IO 操作
        this->handle = h;
        device_raw_io *raw_io;
        raw_io->ctx = this;
        raw_io->buf = req->buf;
        raw_io->opcode = from_req_op(req->op);
        driver_submit_io(device_ptr, raw_io);
    }
    io_result await_resume() {
        // 返回结果
        return res;
    }
};

class device {
    // 1. 提交 IO 任务
    io_awaitable device::submit_io(io_request *req) {
        return io_awaitable(req);
    }

    // 2. 等待事件发生
    error_code device::wait_event();

    // 3. 读取事件/完成队列
    error_code poll_events(vector<event> &events);

    void loop() {
        while (wait_event());
        vector<event> events;
        poll_events(events);
        for (auto &event : events) {
            // 处理事件,设置协程返回值
            io_awaitable *awaitable = (io_awaitble*) event.ctx;
            awaitable->res = io_result(event.status);
            // 继续协程
            awaitable->handle.resume();
        }
    }
};

// 事件循环
jthread io_thread([]() {
    io_device->loop();
});

jthread net_thread([]() {
    net_device->loop();
});

// 例如,读取磁盘上的数据,发送到网络上
CoTask<> demo() {
    io_request *io_req, *net_req;
    io_req->buf = new buffer[1024];
    io_req->len = 1024;
    io_req->fd = 123;
    io_req->op = READ;

    auto res = co_await io_device->submit_io(req);
    memcpy(net_req->buf, io_req->buf, 1024);
    net_req->peer = 456;
    net_req->op = SEND;
    res = co_await net_device->submit_io(req);
    if (!res.ok()) handle_error();
    co_return;
}
syncAwait(demo());

可以看到,我们通过将异步操作封装成 awaitable,就可以享受到 C++ 20 带来的协程语言特性的便利。其中最为关键的部分,一个是 await_suspend 函数,这个函数是协程被挂起后,编译器将协程的一些状态和后续的代码执行逻辑封装到了 coroutine_handle 中,此变量指向了堆内存中的一片空间,保存了驱动后续协程运行的状态信息和代码地址,IO 操作的开发者只需要将此变量妥善的保管起来,便能在 IO 操作完成后调用此类上的方法继续协程的运行;另一个是 co_await 关键词,开发者通过此关键词,告诉编译器此处需要发生一次协程调度,编译器便会在此处插入一个暂停点,并将后续可能用到的变量等状态,打包为 coroutine_handle,传入到 await_suspend 函数中,以便后续调度执行。

无栈协程的好处是,开发者不再需要通过栈指针或者 ucontext 切换上下文的方式来保存和恢复自己的函数状态了,因为这些工作都由编译器自行分析函数中的变量生命周期后打包到 coroutine_handle 中了。而且实际上,这里就是一次简单的函数调用,展开后可以理解为如下伪代码:

void continuation2(ctx_type2 *ctx2) {
    io_result res = ctx2->ret;
    if (!res.ok()) handle_error();
}

void continuation1(ctx_type1 *ctx1) {
    io_result res = ctx1->ret;
    ctx_type2 *ctx2 = new ctx2();
    store_vars(ctx2, var2, ...);
    io_awaitable a2 = io_device->submit_io(net_req);
    coroutine_handle h = from_ctx_and_cont(ctx2, continuation2);
    a2.await_suspend(h);
}

void demo() {
    ctx_type1 *ctx1 = new ctx1();
    store_vars(ctx1, var1, ...);
    io_awaitable a1 = io_device->submit_io(io_req);
    coroutine_handle h = from_ctx_and_cont(ctx1, continuation_1);
    a1.await_suspend(h);
}

所以,C++ 20 的协程,我更倾向于认为它是一个语法糖。事实上,C++ 20 的协程也仅仅提供了语言层面的一些语法支持,其标准库中并没有包含协程运行必须的调度器等关键组件。另外,await_suspend 仅仅提供了挂起和继续协程的功能,在实际编程中,我们通常会将多个异步操作也逐层封装成一个函数,对于这种嵌套的协程执行,还需要开发者自行封装一个类来保存嵌套关系,从而能正确地执行,这也是为什么示例中的函数签名,使用了 co_await 关键词的函数返回值是一个 CoTask 类。这也导致了无栈协程具有极强的侵入性,一旦代码中的一处使用了无栈协程,那么调用此代码上路径所涉及到的所有函数,都必须修改为无栈协程的调用方式,插入 co_await 点。

协程在 3FS 中的实现和应用

回到 3FS 的话题上,3FS 并没有选择自行编写协程库,甚至也没有使用 await_suspend 的方式来封装 IO(在代码库里是搜索不到这个关键词的),而是使用了 Facebook 开源的 folly 库,调用了其中实现的 Executor 等执行调度器驱动。由于 folly 本身已经将传统的网络 Socket 和 IO 等操作封装成为了协程函数,所以这里着重分析下 3FS 如何将 RDMA 操作封装成协程函数的。这里先建议读者自行下载 3FS 和其三方依赖库的代码后到本地后再继续参考阅读。

这里也提供下我的分析思路:对于封装底层的 IO 操作,一个常见的做法就是通过上述提到的 await_suspend 方法,来接收编译器挂起好的协程,但是没搜索到这个函数;所以换个思路,了解 RDMA 编程的开发者知道,发起 RDMA 操作调用 ibv_post_send,轮询并处理完成事件则是调用 ibv_poll_cq,搜索这两个函数顺藤摸瓜便能找到 3FS 是如何封装 RDMA 到协程中的了。

主要涉及到的代码文件是 src/net/IBSocket.{h,cc},在 IBSocket.cc 中,我们找到了两处关键地方:

int IBSocket::rdmaPostWR(RDMAPostCtx &ctx) {
  // ...
  wrs.rbegin()->next = nullptr;
  wrs.rbegin()->wr_id = WRId::rdma(&ctx, true);
  wrs.rbegin()->send_flags |= IBV_SEND_SIGNALED;

  // ...

  ibv_send_wr *bad = nullptr;
  const int ret = ibv_post_send(qp_.get(), &wrs[0], &bad);

  // ...
}

这里的 wr_id,实际上就是一次 RDMA 操作对应的 Context,设备在完成事件中,将程序传入的值原封不动的返回,这样程序就能找到完成事件对应的 Context 是什么,从而找到 Context 中包含的协程,将其重新调度运行。这里的 WRId 使用了 StampedPtr 技巧,利用了 64 位机器中的虚拟地址指针只有低 48 位用于寻址的特点,在未使用的高 16 位中保存了一些元信息:

  struct WRId : private folly::StampedPtr<void> {
    using Base = folly::StampedPtr<void>;

    static uint64_t send(uint32_t signalCount) { return WRId::pack(signalCount, WRType::SEND); }
    static uint64_t recv(uint32_t bufIndex) { return WRId::pack(bufIndex, WRType::RECV); }
    static uint64_t ack() { return WRId::pack(nullptr, WRType::ACK); }
    static uint64_t rdma(RDMAPostCtx *ptr, bool last) {
      return WRId::pack(ptr, last ? WRType::RDMA_LAST : WRType::RDMA);
    }
    static uint64_t close() { return WRId::pack(nullptr, WRType::CLOSE); }
    static uint64_t check() { return WRId::pack(nullptr, WRType::CHECK); }

    static uint64_t pack(uint32_t val, WRType type) { return Base::pack((void *)(uint64_t)val, (uint16_t)type); }
    static uint64_t pack(void *ptr, WRType type) { return Base::pack(ptr, (uint16_t)type); }

    WRId(uint64_t raw)
        : Base{raw} {}

    WRType type() const { return static_cast<WRType>(stamp()); }

    uint32_t sendSignalCount() const {
      assert(type() == WRType::SEND);
      return (uint32_t)(uint64_t)ptr();
    }
    uint32_t recvBufIndex() const {
      assert(type() == WRType::RECV);
      return (uint32_t)(uint64_t)ptr();
    }
    RDMAPostCtx *rdmaPostCtx() const {
      assert(type() == WRType::RDMA || type() == WRType::RDMA_LAST);
      return (RDMAPostCtx *)ptr();
    }
}

继续顺藤摸瓜,分析一下 RDMAPostCtx 这个类,位于 src/net/IBSocket.h

struct RDMAPostCtx {
    std::optional<std::reference_wrapper<folly::fibers::BatchSemaphore>> sem;
    std::optional<folly::fibers::BatchSemaphore::Waiter> waiter;

    ibv_wr_opcode opcode;
    std::span<const RDMAReq> reqs;
    std::span<RDMABuf> localBufs;
    size_t bytes = 0;

    folly::coro::Baton baton;
    ibv_wc_status status = IBV_WC_SUCCESS;
    std::chrono::steady_clock::time_point postBegin;
    std::chrono::steady_clock::time_point postEnd;

    CoTask<void> waitSem() {
      if (waiter.has_value()) {
        co_await waiter->baton;
      }
    }

    bool setError(ibv_wc_status error) {
      if (status == IBV_WC_SUCCESS) {
        status = error;
        return true;
      }
      return false;
    }

    __attribute__((no_sanitize("thread"))) void finish() {
      postEnd = std::chrono::steady_clock::now();
      baton.post();
    }
  };
}

这个类和之前示例中的 io_awaitable 类似,保存了一些 IO 请求的信息,并且预留了 status 字段保存返回结果,而其中的 finish() 函数,感觉便是调度协程运行的关键函数,尝试分析下调用方,发现位于 IBSocket::cqPoll 函数中。

void IBSocket::cqPoll(Events &events) {
  static constexpr int kPollCQBatch = 16;
  std::array<ibv_wc, kPollCQBatch> wcArr;

  int ret = 0;
  while ((ret = ibv_poll_cq(cq_.get(), kPollCQBatch, &wcArr[0])) > 0) {
    IBDBG("IBSocket {} get {} WCs", describe(), ret);
    for (int i = 0; i < ret; i++) {
      const ibv_wc &wc = wcArr[i];
      if (UNLIKELY(wc.status != IBV_WC_SUCCESS)) {
        wcError(wc);
        state_ = State::ERROR;
      } else if (UNLIKELY(wcSuccess(wc, events) != 0)) {
        state_ = State::ERROR;
      }
    }

    // break out of the loop if the CQ is empty
    if (ret < kPollCQBatch) break;
  }
}

wcSuccess 最终会调用的是此代码:

int IBSocket::onRDMAFinished(const ibv_wc &wc, Events &) {
  WRId wr(wc.wr_id);
  wr.rdmaPostCtx()->finish();
  return 0;
}

那么这里就能确定,finish 是在处理完成事件时被调用,从而唤醒协程了。我们继续分析下 Baton 类的实现原理(folly/experimental/coro/Baton.h),代码不多,直接贴过来了:

/// A baton is a synchronisation primitive for coroutines that allows a
/// coroutine to co_await the baton and suspend until the baton is posted by
/// some thread via a call to .post().
///
/// This primitive is typically used in the construction of larger library types
/// rather than directly in user code.
///
/// As a primitive, this is not cancellation-aware.
///
/// The Baton supports being awaited by multiple coroutines at a time. If the
/// baton is not ready at the time it is awaited then an awaiting coroutine
/// suspends. All suspended coroutines waiting for the baton to be posted will
/// be resumed when some thread next calls .post().
///
/// Example usage:
///
///   folly::coro::Baton baton;
///   std::string sharedValue;
///
///   folly::coro::Task<void> consumer()
///   {
///     // Wait until the baton is posted.
///     co_await baton;
///
///     // Now safe to read shared state.
///     std::cout << sharedValue << std::cout;
///   }
///
///   void producer()
///   {
///     // Write to shared state
///     sharedValue = "some result";
///
///     // Publish the value by 'posting' the baton.
///     // This will resume the consumer if it was currently suspended.
///     baton.post();
///   }
class Baton {
 public:
  class WaitOperation;

  /// Initialise the Baton to either the signalled or non-signalled state.
  explicit Baton(bool initiallySignalled = false) noexcept;

  ~Baton();

  /// Query whether the Baton is currently in the signalled state.
  bool ready() const noexcept;

  /// Asynchronously wait for the Baton to enter the signalled state.
  ///
  /// The returned object must be co_awaited from a coroutine. If the Baton
  /// is already signalled then the awaiting coroutine will continue without
  /// suspending. Otherwise, if the Baton is not yet signalled then the
  /// awaiting coroutine will suspend execution and will be resumed when some
  /// thread later calls post().
  [[nodiscard]] WaitOperation operator co_await() const noexcept;

  /// Set the Baton to the signalled state if it is not already signalled.
  ///
  /// This will resume any coroutines that are currently suspended waiting
  /// for the Baton inside 'co_await baton'.
  void post() noexcept;

  /// Atomically reset the baton back to the non-signalled state.
  ///
  /// This is a no-op if the baton was already in the non-signalled state.
  void reset() noexcept;

  class WaitOperation {
   public:
    explicit WaitOperation(const Baton& baton) noexcept : baton_(baton) {}

    bool await_ready() const noexcept { return baton_.ready(); }

    bool await_suspend(coroutine_handle<> awaitingCoroutine) noexcept {
      awaitingCoroutine_ = awaitingCoroutine;
      return baton_.waitImpl(this);
    }

    void await_resume() noexcept {}

   protected:
    friend class Baton;

    const Baton& baton_;
    coroutine_handle<> awaitingCoroutine_;
    WaitOperation* next_;
  };

 private:
  // Try to register the awaiter as
  bool waitImpl(WaitOperation* awaiter) const noexcept;

  // this  - Baton is in the signalled/posted state.
  // other - Baton is not signalled/posted and this is a pointer to the head
  //         of a potentially empty linked-list of Awaiter nodes that were
  //         waiting for the baton to become signalled.
  mutable std::atomic<void*> state_;
};

inline Baton::Baton(bool initiallySignalled) noexcept
    : state_(initiallySignalled ? static_cast<void*>(this) : nullptr) {}

inline bool Baton::ready() const noexcept {
  return state_.load(std::memory_order_acquire) ==
      static_cast<const void*>(this);
}

inline Baton::WaitOperation Baton::operator co_await() const noexcept {
  return Baton::WaitOperation{*this};
}

inline void Baton::reset() noexcept {
  // Transition from 'signalled' (ie. 'this') to not-signalled (ie. nullptr).
  void* oldState = this;
  (void)state_.compare_exchange_strong(
      oldState, nullptr, std::memory_order_acq_rel, std::memory_order_relaxed);
}

void Baton::post() noexcept {
  void* const signalledState = static_cast<void*>(this);
  void* oldValue = state_.exchange(signalledState, std::memory_order_acq_rel);
  if (oldValue != signalledState) {
    // We are the first thread to set the state to signalled and there is
    // a waiting coroutine. We are responsible for resuming it.
    WaitOperation* awaiter = static_cast<WaitOperation*>(oldValue);
    while (awaiter != nullptr) {
      std::exchange(awaiter, awaiter->next_)->awaitingCoroutine_.resume();
    }
  }
}

bool Baton::waitImpl(WaitOperation* awaiter) const noexcept {
  // Try to push the awaiter onto the front of the queue of waiters.
  const auto signalledState = static_cast<const void*>(this);
  void* oldValue = state_.load(std::memory_order_acquire);
  do {
    if (oldValue == signalledState) {
      // Already in the signalled state, don't enqueue it.
      return false;
    }
    awaiter->next_ = static_cast<WaitOperation*>(oldValue);
  } while (!folly::atomic_compare_exchange_weak_explicit(
      &state_,
      &oldValue,
      awaiter,
      std::memory_order_release,
      std::memory_order_acquire));
  return true;
}

查看 Baton 的实现,可以发现它确实使用了 await_suspendcoroutine_handle 保存起来了,并且在有人调用 post 的时候,通过调用 coroutine_handleresume 方法将其唤醒执行。

那么,什么时候发生的挂起呢?我们搜索 rdmaPostWR 的调用方,找到了:

CoTryTask<void> IBSocket::rdmaPost(RDMAPostCtx &ctx) {
  IBDBG("IBSocket {} postRdma: opcode {}, {} reqs", describe(), magic_enum::enum_name(ctx.opcode), ctx.reqs.size());

  if (ctx.waiter.has_value()) {
    co_await ctx.waiter->baton;
  }
  auto guard = folly::makeGuard([&]() { rdmaSem_.signal(ctx.reqs.size()); });
  CO_RETURN_ON_ERROR(checkState());

  if (auto ret = rdmaPostWR(ctx); UNLIKELY(ret != 0)) {
    co_return makeError(RPCCode::kRDMAPostFailed);
  }
  co_await ctx.baton;
  if (ctx.status != IBV_WC_SUCCESS) {
    XLOGF(DBG, "IBSocket {} RDMA failed, error {}", describe(), ibv_wc_status_str(ctx.status));
    co_return makeError(RPCCode::kRDMAError);
  }

  co_return Void{};
}

继续顺藤摸瓜,可以找到一些 RDMA 操作原语了:

class IBSocket {
  CoTryTask<void> rdmaRead(const RDMARemoteBuf &remoteBuf, RDMABuf &localBuf) {
    co_return co_await rdmaRead(remoteBuf, std::span(&localBuf, 1));
  }
  CoTryTask<void> rdmaRead(const RDMARemoteBuf &remoteBuf, std::span<RDMABuf> localBufs) {
    co_return co_await rdma(IBV_WR_RDMA_READ, remoteBuf, localBufs);
  }

  CoTryTask<void> rdmaWrite(const RDMARemoteBuf &remoteBuf, RDMABuf &localBuf) {
    co_return co_await rdmaWrite(remoteBuf, std::span(&localBuf, 1));
  }
  CoTryTask<void> rdmaWrite(const RDMARemoteBuf &remoteBuf, std::span<RDMABuf> localBufs) {
    co_return co_await rdma(IBV_WR_RDMA_WRITE, remoteBuf, localBufs);
  }
};

class RDMAReqBatch {
  CoTryTask<void> post() {
    co_return co_await socket_->rdmaBatch(opcode_, reqs_, localBufs_, waitLatency_, transferLatency_);
  }
}

这里已经到达了上层应用调用 RDMA 操作的边界处,上层应用通过 co_await rdmaWrite() 等方式去执行异步的 RDMA 操作,在 RDMA 操作结束前协程会被挂起,在完成后被调度执行。

这里还有一个 CoTryTask 还没有介绍,实际上它是 folly::coro::Task 一个简单的类型别名封装,它就是示例代码后面提到的,用于嵌套协程执行的类。它的关键逻辑是:

class Task {
  class Awaiter {
    bool await_ready() noexcept { return false; }

    template <typename Promise>
    FOLLY_NOINLINE auto await_suspend(
        coroutine_handle<Promise> continuation) noexcept {
      DCHECK(coro_);
      auto& promise = coro_.promise();

      promise.continuation_ = continuation;

      auto& calleeFrame = promise.getAsyncFrame();
      calleeFrame.setReturnAddress();

      if constexpr (detail::promiseHasAsyncFrame_v<Promise>) {
        auto& callerFrame = continuation.promise().getAsyncFrame();
        folly::pushAsyncStackFrameCallerCallee(callerFrame, calleeFrame);
        return coro_;
      } else {
        folly::resumeCoroutineWithNewAsyncStackRoot(coro_);
        return;
      }
    }

    T await_resume() {
      DCHECK(coro_);
      SCOPE_EXIT { std::exchange(coro_, {}).destroy(); };
      return std::move(coro_.promise().result()).value();
    }
  };
};

最关键的地方在于保存好 continuation 点,用于正确执行被嵌套的协程。这里还值得一提的是,folly 中通过 AsyncFrame 来保存协程的调用关系,从而能在调试时展开协程调用链帮助恢复现场。当然,这会引入额外开销,如果是自己编写协程库,可以去掉这一步。其他关于协程嵌套封装的逻辑,网络上介绍 C++ 20 协程的资料也都会详细讲解,这里不再展开了。

至此,我们已经梳理清楚了示例中的 IO 异步操作 Demo 逻辑,是如何在 3FS 中的 RDMA 操作实战的了。事实上,协程调度不仅仅局限于 IO 异步操作,在一些需要发生自定义的调度逻辑时,我们同样可以通过 co_await 灵活地挂起或调度协程的执行。

留意到,在 IBSocket::rdmaPost 中,有一段这样的逻辑:

CoTryTask<void> IBSocket::rdmaPost(RDMAPostCtx &ctx) {
  if (ctx.waiter.has_value()) {
    co_await ctx.waiter->baton;
  }
}

这个 waiter 变量,是一个 folly::fibers::BatchSemaphore::Waiter 类,可以理解为是一个信号量等待者的实现,用于限制并发数,在令牌不足的时候,负责调用信号量的 wait 函数挂起协程:

bool SemaphoreBase::waitSlow(Waiter& waiter, int64_t tokens) {
  // Slow path, create a baton and acquire a mutex to update the wait list
  {
    auto waitListLock = waitList_.wlock();
    auto& waitList = *waitListLock;

    auto testVal = tokens_.load(std::memory_order_acquire);
    if (testVal >= tokens) {
      return false;
    }
    // prepare baton and add to queue
    waitList.push_back(waiter);
    assert(!waitList.empty());
  }
  // Signal to caller that we managed to push a waiter
  return true;
}

并在令牌重新充足后,通过 baton.post() 调度执行被挂起的协程。

信号量是 rdmaSem_,相关的 PV 操作位于:


CoTryTask<void> IBSocket::rdmaBatch(...) {
  // ...
  /* setup post info and wait on semaphore */
  for (size_t i = 0; i < numPosts; i++) {
    auto &post = posts[i];
    post.opcode = opcode;
    post.reqs = reqs.subspan(i * wrsPerPost, std::min(reqs.size() - i * wrsPerPost, wrsPerPost));
    post.localBufs = localBufs;
    post.waiter.emplace(post.reqs.size());
    // 尝试获取信号量
    if (rdmaSem_.try_wait(post.waiter.value(), post.reqs.size())) {
      // 获取成功了就不必等待
      post.waiter = std::nullopt;
    }
  }
  // ...
}

CoTryTask<void> IBSocket::rdmaPost(RDMAPostCtx &ctx) {
  IBDBG("IBSocket {} postRdma: opcode {}, {} reqs", describe(), magic_enum::enum_name(ctx.opcode), ctx.reqs.size());

  if (ctx.waiter.has_value()) {
    co_await ctx.waiter->baton;
  }
  // 释放信号量,通过 RAII 实现在此函数执行完后调用
  auto guard = folly::makeGuard([&]() { rdmaSem_.signal(ctx.reqs.size()); });
  CO_RETURN_ON_ERROR(checkState());

  if (auto ret = rdmaPostWR(ctx); UNLIKELY(ret != 0)) {
    co_return makeError(RPCCode::kRDMAPostFailed);
  }
  co_await ctx.baton;
  if (ctx.status != IBV_WC_SUCCESS) {
    XLOGF(DBG, "IBSocket {} RDMA failed, error {}", describe(), ibv_wc_status_str(ctx.status));
    co_return makeError(RPCCode::kRDMAError);
  }

  co_return Void{};
}

此外,在 RDMATransmissionLimiter 类中也能看到类似的思想。

事件驱动

上面介绍的是一些离散的协程操作,我们现在尝试分析一下 3FS 是如何驱动事件循环让这些协程顺利地调度起来的。既然是异步操作,那么是免不了事件循环的,我们还是从 ibv_poll_cq 这个函数出发,找到调用它的事件循环逻辑。

可以从代码看到,有个函数 IBSocket::poll 是很明显的轮询事件的函数,调用它的是 Transport::handleEvents,再往上寻找,可以发现有一个 EventLoop::loop 函数调用了它,这里就是非常经典的基于 epoll 的事件循环了。

那么只需要分析,什么事件触发了 RDMA 的 handleEvents 即可。了解 RDMA 编程的开发者可能知道,RDMA 只规定了 Busy Loop 轮询完成事件的方式。这种方式延迟最低,但是无法很好地和其他事件循环方式(如 epoll)融合起来,要么将其他事件循环也改成 Busy Loop,要么单开一个线程专门轮询 RDMA,这两种方式各有弊端。为此,ibverbs 提供了一个完成事件通知通道,即 struct ibv_comp_channel,里面含有一个 fd,可以被加入到 epoll 等事件循环中,然后在完成队列有事件时,这个 fd 会被触发可读事件。所以,开发者只需要在这个 fd 被触发读事件时,轮询 RDMA 完成队列即可。具体到 3FS,就是 IOWorker::addIBSocket 这个函数将 RDMA 的 fd 添加到事件循环中的。代码的逻辑是: Transport 作为一个 EventHandler 的派生类,里面包含了 IBSocket,在 EventLoop::add 时,调用了 EventHandler::fd()(此时实际调用 Transport::fd() { return socket_->fd(); }),从而将 RDMA 的 fd 添加到事件循环中的。

EventLoop::start 就是负责拉起一个线程,循环执行 EventLoop::loop 函数。

Result<Void> EventLoop::start(const std::string &threadName) {
  // ...
  thread_ = std::jthread(&EventLoop::loop, this);
  folly::setThreadName(thread_.get_id(), threadName);
  return Void{};
}

至此,网络协程部分链路打通。

总结

本文简单地介绍了 C++ 20 协程引入的历史和基本思想,为读者提供了分析 C++ 20 协程代码的一些思路,并应用这些思路分析 3FS 中将 RDMA 封装为协程操作的逻辑,以及分析了一些 3FS 中对非 IO 操作的协程操作使用,最后再介绍了事件循环如何驱动所有逻辑的执行,希望能给大家阅读源码提供一些思路和技巧。

继续阅读:DeepSeek 3FS 源码解读——磁盘 IO 篇