DeepSeek 3FS 源码解读——RPC 篇

趁热打铁,这回我们看看 3FS 的 RPC 子系统是如何搭建的。粗略分析后,3FS 没有采用类似 Protocol Buffers 或者 Flatbuffers 等 IDL 语言,而是大量使用了 C++ 模板以及宏实现了一套简易的反射机制,并且使用 C++ 20 Concepts 和编译时 constexpr 使一些模板逻辑更清晰易读,从而使开发者用宏就能在 C++ 源代码中定义出 RPC 的请求回复结构体,以及 RPC 服务接口定义。这一块又是相当能体现 DeepSeek “妖”的代码风格,如果说协程部分是现代 C++,那么 RPC 框架的代码就是现代 C++,不禁让人感叹这是人能写出来的代码???

本文分析序列化工具和网络收发调用关系。还是请先在阅读本文前,阅读《DeepSeek 3FS 源码解读——协程&RDMA篇》大致了解 3FS 的网络处理流程。

RPC 框架主要逻辑位于 src/common/serde 目录下。

结构体序列化与反序列化

src/common/serde/Serde.h 中主要负责的是提供宏 SERDE_STRUCT_FIELD(NAME, DEFAULT) 用于定义 struct 中的字段,并自动通过反射注册字段,使得 RPC 框架能够自动进行序列化/反序列化。这个头文件中杂糅了二进制/JSON/TOML的序列化反序列化的逻辑。

先研究下 SERDE_STRUCT_FIELD,挑一个展开后的类来研究一下,以 UpdateIO 前两个字段为例:

// 展开 SERDE_STRUCT_FIELD(offset, uint32_t{});
// 展开 SERDE_STRUCT_FIELD(length, uint32_t{});

private:  
  friend struct ::hf3fs::refl::Helper;  
  struct Toffset : std::type_identity<decltype(CollectField(::hf3fs::refl::Rank<>{}))> {};  

public:  
  std::decay_t<decltype(uint32_t{})> offset = uint32_t{};  

protected:  
  constexpr auto Toffset() -> ::hf3fs::thief::steal<struct Toffset, std::decay_t<decltype(*this)>>;

  static ::hf3fs::refl::Append_t<  
  typename Toffset::type,  
  decltype((::hf3fs::serde::FieldInfo<"offset", &::hf3fs::thief::retrieve<struct Toffset>::offset>{}))>
CollectField(::hf3fs::refl::Rank<std::tuple_size_v<typename Toffset::type> + 1>);

private:  
  friend struct ::hf3fs::refl::Helper;  
  struct Tlength : std::type_identity<decltype(CollectField(::hf3fs::refl::Rank<>{}))> {};  

public:  
  std::decay_t<decltype(uint32_t{})> length = uint32_t{};  

protected:  
  constexpr auto Tlength() -> ::hf3fs::thief::steal<struct Tlength, std::decay_t<decltype(*this)>>;  
  static ::hf3fs::refl::Append_t<  
  typename Tlength::type,  
  decltype((::hf3fs::serde::FieldInfo<"length", &::hf3fs ::thief::retrieve<struct Tlength>::length>{}))>  
CollectField(::hf3fs::refl::Rank<std::tuple_size_v<typename Tlength::type> + 1>)

其中 stealretrieve 定义如下:

namespace hf3fs::thief {
namespace detail {

template <typename Tag>
struct Bridge {
  friend consteval auto ADL(Bridge<Tag>);
};

template <typename Tag, typename Store>
struct StealType {
  friend consteval auto ADL(Bridge<Tag>) { return std::type_identity<Store>{}; }
};

}  // namespace detail

template <typename Tag, typename Store>
using steal = decltype(detail::StealType<Tag, Store>{});

template <typename Tag>
using retrieve = typename decltype(ADL(detail::Bridge<Tag>{}))::type;

}  // namespace hf3fs::thief

我承认这里我实在看不太懂了,只能大概说说原理。看模板就一个诀窍:人肉展开!steal 在这里会实例化为 decltype(detail::StealType<Toffset, UpdateIO>{}),那么也就实例化出了里面的 ADL 方法。而 retrieve 实例化为 typename decltype(ADL(detail::Bridge<struct Toffset>{}))::type,通过 ADL 方法会匹配到该标签类曾经绑定到的 Store,也就是 UpdateIO

更新:经过请教多位网友,这里是一种名为 Friend Injection 的技巧,实现原理参考 https://zhuanlan.zhihu.com/p/646752343;DeepSeek 研发博客也有记录使用这种技巧是为了在类内部获取当前类的名字方便宏展开,参考 https://sf-zhou.github.io/programming/cpp_static_reflection.html 以及 https://github.com/MitalAshok/self_macro。也可以用来反射对象的私有字段,原理利用了类模版显式实例化时可以访问私有字段。

阿里开源的 yalantinglibs 采用了更先进的反射技巧,ylt 的反射甚至都不需要宏。
https://alibaba.github.io/yalantinglibs/zh/reflection/reflection_introduction.html
直接访问对象的所有字段,包括字段个数、字段名、字段值、字段索引等信息。

Append_t 则负责将当前的 field 注册到参数列表中,形成一个新的重载,经过一系列的模板实例化和别名替换后,最终展开的形式其实是:

private:  
  friend struct ::hf3fs::refl::Helper;
  // Rank<> 就是 Rank<64> 
  // 然后递归地 Rank<64> : Rank<63>,Rank<63> : Rank<62> 一路继承到 Rank<0>,终止递归
  // 由于 offset 是第一个字段,所以直接匹配到了默认的
  // [[maybe_unused]] static std::tuple<> CollectField(::hf3fs::refl::Rank<0>);
  // 此时展开为 
  struct Toffset : std::type_identity<std::tuple<>> {};  
  // 所以也能看出 RPC 框架应该最多支持 64 个字段

public:  
  uint32_t offset = uint32_t{};  

protected:
  // 显式实例化 steal,实现 Friend Injection
  constexpr auto Toffset() -> detail::StealType<Toffset, UpdateIO>;

  // 注意此处将新的 Field 添加到 Tuple 中
  // 并且此时 CollectField 形参中变为 Rank<1>
  // 由于 Rank 是按照从大到小去匹配形参
  // 下一次 decltype(CollectField(::hf3fs::refl::Rank<>{}))
  // 就会匹配到此函数声明,从而 decltype 会获取到添加了 field 后
  // 的新的 tuple 类型
  static ::hf3fs::refl::Append_t<  
  std::tuple<>,  
  ::hf3fs::serde::FieldInfo<"offset", &UpdateIO::offset>
  >
CollectField(::hf3fs::refl::Rank<1>);

private:  
  friend struct ::hf3fs::refl::Helper;  
  struct Tlength : std::type_identity<std::tuple<::hf3fs::serde::FieldInfo<"offset", &UpdateIO::offset>>> {};  

public:  
  uint32_t length = uint32_t{};  

protected:  
  constexpr auto Tlength() -> detail::StealType<Tlength, UpdateIO>;  
  static ::hf3fs::refl::Append_t<  
  std::tuple<::hf3fs::serde::FieldInfo<"offset", &UpdateIO::offset>>,  
  ::hf3fs::serde::FieldInfo<"length", &UpdateIO::length> 
CollectField(::hf3fs::refl::Rank<2>);

到这里注册 field 的逻辑我们就基本搞明白了,其实就是通过一系列的模板实例化,将 FieldInfo 通过声明新的 CollectField 重载不断添加到 tuple 中。

现在看看怎么获取一个类中注册了哪些 field,其实很简单,直接获取 Rank 最大的 CollectField 重载的返回值类型,就能拿到包含全部 FieldInfotuple,剩下的就是通过模板操作,遍历或者索引我们想要的 tuple 元素即可。

struct Helper {
  template <class T>
  static decltype(CollectField(refl::Rank<>{})) getFieldInfo();
  template <class T>
  requires requires {
    { T::CollectField(refl::Rank<>{}) } -> is_specialization<std::tuple>;
  }
  static decltype(T::CollectField(refl::Rank<>{})) getFieldInfo();

  template <class T>
  using FieldInfoList = decltype(getFieldInfo<T>());

  template <typename T>
  static constexpr auto Size = std::tuple_size_v<FieldInfoList<T>>;

  template <class T, size_t I>
  using FieldInfo = std::tuple_element_t<I, FieldInfoList<T>>;
};

template <SerdeType T>
constexpr inline auto count() {
  return refl::Helper::Size<T>;
}

template <SerdeType T, size_t Index>
constexpr inline auto name() {
  return refl::Helper::FieldInfo<T, Index>::name;
}

template <SerdeType T, size_t Index>
constexpr inline auto getter() {
  return refl::Helper::FieldInfo<T, Index>::getter;
}

constexpr inline auto count(auto &&o) { return count<std::decay_t<decltype(o)>>(); }

template <size_t Index>
constexpr inline auto name(auto &&o) {
  return name<std::decay_t<decltype(o)>, Index>();
}

template <size_t Index>
constexpr inline auto &value(auto &&o) {
  return o.*getter<std::decay_t<decltype(o)>, Index>();
}

template <size_t I = 0>
constexpr inline auto iterate(auto &&f, auto &&o, auto &&...args) {
  return refl::Helper::iterate<std::decay_t<decltype(o)>>(
      [&](auto type) { return f(type.name, o.*type.getter, args...); });
}

序列化的函数是:

template <class O>
inline void serialize(auto &&o, Out<O> &out)

里面通过一大堆的 if constexpr (requires { o.serdeToReadable(); } && !isBinaryOut) 类似的语句,去尝试匹配传进来的第一个参数的对象类型应该调用什么方法序列化。如果没有 C++ 17/20 的 if constexpr 那么将需要一大堆的 std::enable_if 通过 SFINAE 来编写相同的逻辑,那将会更加反人类。

对于一整个序列化类来说起点是:

  else if constexpr (SerdeType<T>) {
    auto start = out.tableBegin(false);
    if constexpr (isBinaryOut) {
      refl::Helper::iterate<T, true>([&](auto type) { serialize(o.*type.getter, out); },
                                     [&] { out.tableEnd(start), start = out.tableBegin(false); });
    } else {
      refl::Helper::iterate<T>([&](auto type) { out.key(type.name), serialize(o.*type.getter, out); });
    }
    out.tableEnd(start);
  }

调用了 Helper::iterate 去遍历每个 FieldInfo 然后调用序列化函数,递归地再次去匹配序列化函数。倒是还挺好理解,就是实现上其实也是使用了大量模板技巧,这里不贴代码了。

对于需要序列化/反序列化的类型,需要使用模板特化的方式,定义带有序列化/反序列化的工具类,例如:

template <class T>
struct SerdeMethod {
  // 真正的序列化反序列化
  static auto serialize(const T &o, auto &out) = delete;
  static auto serializeReadable(const T &o, auto &out) = delete;
  static auto deserialize(T &o, auto &in) = delete;
  static Result<T> deserializeReadable(T &o, auto &out) = delete;

  // 把类型序列化到别的中间类型
  static auto serdeTo(const T &t) = delete;
  static auto serdeToReadable(const T &o) = delete;
  static Result<T> serdeFrom(auto) = delete;
  static Result<T> serdeFromReadable(auto) = delete;
};

template <>
struct ::hf3fs::serde::SerdeMethod<::hf3fs::storage::ChunkId> {
  static std::string_view serdeTo(const storage::ChunkId &chunkId) { return chunkId.data(); }
  static Result<storage::ChunkId> serdeFrom(std::string_view str) { return storage::ChunkId(str); }
  static std::string serdeToReadable(const storage::ChunkId &chunkId) { return chunkId.describe(); };
  static Result<storage::ChunkId> serdeFromReadable(std::string_view s) { return storage::ChunkId::fromString(s); }
}

这样在被遍历到的时候就能自动调用特化后的模板类去序列化反序列化了。

总之大概是这么回事,喜欢研究 C++ 模板的读者可以下载代码细细品读后现代 C++ 代码。

最终,在需要将请求发送到网络的时候,会调用下面的代码序列化到二进制 Buffer(src/common/net/WriteItem.h):

class SerdeBuffer {
  template <serde::SerdeType T, class Allocator = net::Allocator<4_KB, 1024, 64 * 1024>>
  static auto create(const T &packet, const CoreRequestOptions &options) {
    serde::Out<DownwardBytes<Allocator>> out;
    // 这里就将类型序列化到 Buffer 里了
    serde::serialize(packet, out);
    MessageHeader header;
    header.size = out.bytes().size();
    out.bytes().append(&header, sizeof(header));
    out.bytes().reserve(header.size + kMessageHeaderSize + sizeof(SerdeBuffer));  // for headroom.

    uint32_t offset;
    uint32_t capacity;
    std::unique_ptr<SerdeBuffer, Deleter<Allocator>> ptr{
        reinterpret_cast<SerdeBuffer *>(out.bytes().release(offset, capacity))};
    ptr->headroom_ = offset;  // size of headroom.
    ptr->capacity_ = capacity;
    // ... 
  }
};

调用的逻辑是:

template <IsBinaryOut T>
class Out<T> {
  // 最终就是将 value 不断拷贝到 buffer 里,并处理嵌套 table
};

反序列化也是差不多的逻辑,调用起点是 deserialize 函数,从 In<T> 读 Buffer。反序列化还稍微复杂一些,不过分析的方法大同小异。

RPC 服务

RPC 服务定义

src/common/serde/Service.h 提供了宏 SERDE_SERVICE 定义一个 RPC 服务,注册其名字和服务 ID(全局唯一),并提供宏 SERDE_SERVICE_METHOD 定义 RPC 服务方法,注册其名字和出参入参,以及方法 ID(服务内唯一)。客户端和服务器都要包含这个宏声明的服务以及其接口,从而能正确地调用 RPC 方法。

SERDE_SERVICE_METHOD 首先是定义了调用的 Stub,然后还是和结构体序列化采用了类似的模板技巧,通过不断重载 CollectField 不停将方法元信息添加到 tuple 里。这里的 FieldInfo 会实例化为 MethodInfo

#define SERDE_SERVICE_METHOD_REFL(NAME, ID, REQ, RSP)                                                 
 private:                                                                                             \
  struct MethodId##ID : std::type_identity<REFL_NOW> {};                                              \
  static auto CollectField(::hf3fs::refl::Rank<std::tuple_size_v<typename MethodId##ID::type> + 1>) { \
    if constexpr (std::is_void_v<T>) {                                                                \
      return ::hf3fs::refl::Append_t<typename MethodId##ID::type,                                     \
                                     ::hf3fs::serde::MethodInfo<#NAME, T, REQ, RSP, ID, nullptr>>{};  \
    } else {                                                                                          \
      return ::hf3fs::refl::Append_t<typename MethodId##ID::type,                                     \
                                     ::hf3fs::serde::MethodInfo<#NAME, T, REQ, RSP, ID, &T::NAME>>{}; \
    }                                                                                                 \
  }                                                                                                   \
  friend struct ::hf3fs::refl::Helper
 private:                                                                                             \
  struct MethodId##ID : std::type_identity<REFL_NOW> {};                                              \
  static auto CollectField(::hf3fs::refl::Rank<std::tuple_size_v<typename MethodId##ID::type> + 1>) { \
    if constexpr (std::is_void_v<T>) {                                                                \
      return ::hf3fs::refl::Append_t<typename MethodId##ID::type,                                     \
                                     ::hf3fs::serde::MethodInfo<#NAME, T, REQ, RSP, ID, nullptr>>{};  \
    } else {                                                                                          \
      return ::hf3fs::refl::Append_t<typename MethodId##ID::type,                                     \
                                     ::hf3fs::serde::MethodInfo<#NAME, T, REQ, RSP, ID, &T::NAME>>{}; \
    }                                                                                                 \
  }                                                                                                   \
  friend struct ::hf3fs::refl::Helper

RPC 网络实现

RPC 请求通过 Transport 接收和发送,ServiceGroup 起到一个注册服务的作用,最终都是由 Processor 处理。CallContext 是服务端保存一次请求上下文的类,客户端侧是 ClientCallContext

RPC 服务端

服务器收请求的逻辑位于 Transport::handleEvents,最终会一路调用 Transport::doReadSocket::recv,读取收到的缓冲区,解析 RPC 请求,然后包装成 MessageWrapper 发送到 ioWorker_.processMsg,然后调用 processor_.processMsg 进入真正的 Dispatch 和处理 MessagePacket 逻辑。MessagePacket 是通用的 RPC 请求包,包含请求 ID、服务 ID、方法 ID、PayLoad 等通用字段。

RPC 服务实现方,需要包含用 SERDE_SERVICE 声明的 RPC 服务接口。继承 serde::ServiceWrapper 类后,实现名字一样的 RPC 处理逻辑即可,不需要开发者自己处理网络请求解析了。

class StorageService : public serde::ServiceWrapper<StorageService, storage::StorageSerde> {
   CoTryTask<BatchReadRsp> batchRead(serde::CallContext &ctx, const BatchReadReq &req);
   CoTryTask<WriteRsp> write(serde::CallContext &ctx, const BatchReadReq &req) 
};

最终在 Server 初始化时,会调用到 addService 注册服务:

template <class Service>
  Result<Void> addService(std::unique_ptr<Service> &&obj, bool isRDMA) {
    std::shared_ptr<Service> shared = std::move(obj);
    for (auto i = 0u; i <= uint32_t(isRDMA); ++i) {
      auto &service = services_[i][Service::kServiceID];
      if (UNLIKELY(service.object != nullptr)) {
        return makeError(StatusCode::kInvalidArg, fmt::format("redundant service id: {}", Service::kServiceID));
      }
      service.getter = &MethodExtractor<Service, CallContext, &CallContext::invalidId>::get;
      service.object = shared.get();
      service.alive = std::shared_ptr<void *>(shared, nullptr);
      if constexpr (requires { Service{}.onError(Status::OK); }) {
        service.onError = &CallContext::customOnError<Service, &Service::onError>;
      }
    }
    return Void{};
  }

其中:

service.getter = &MethodExtractor<Service, CallContext, &CallContext::invalidId>::get

是根据方法 ID 分派 RPC 服务执行逻辑的关键。负责解析并处理请求的类是 src/common/serde/Processor.h 中的类:

class Processor {
  CoTask<void> processSerdeRequest(IOBufPtr buf, serde::MessagePacket<> packet, TransportPtr tr) {
    // decrease the count in any case.
    auto guard = folly::makeGuard([&] { flags_ -= kCountInc; });

    (void)buf;  // keep alive.
    auto &service = serdeServices_.getServiceById(packet.serviceId, tr->isRDMA());
    serde::CallContext ctx(packet, std::move(tr), service);
    if (packet.useCompress()) {
      ctx.responseOptions().compression = {config_.response_compression_level(),
                                           config_.response_compression_threshold()};
    }
    co_await ctx.handle();
  }
};

CallContext 中的 handle 便会调用 MethodExtractor 根据方法 ID 找到对应的方法,然后调用它:

class CallContext {
  CoTask<void> handle() {
    auto method = service_.getter(packet_.methodId);
    co_await (this->*method)();
  }
};
template <class T, class C, auto DEFAULT = nullptr>
class MethodExtractor {
 public:
  static auto get(uint16_t id) {
    constexpr MethodExtractor ins;
    return id <= ins.kMaxThreadId ? ins.table[id] : DEFAULT;
  }

 protected:
  consteval MethodExtractor() {
    for (uint16_t i = 0; i <= kMaxThreadId; ++i) {
      table[i] = calc(i);
    }
  }

  template <size_t I = 0>
  consteval auto calc(uint16_t id) {
    if constexpr (I == std::tuple_size_v<FieldInfoList>) {
      return DEFAULT;
    } else {
      using FieldInfo = std::tuple_element_t<I, FieldInfoList>;
      return FieldInfo::id == id ? &C::template call<FieldInfo> : calc<I + 1>(id);
    }
  }

 private:
  using FieldInfoList = refl::Helper::FieldInfoList<T>;
  using Method = decltype(&C::template call<std::tuple_element_t<0, FieldInfoList>>);
  static constexpr auto kMaxThreadId = MaxMethodId<FieldInfoList>;
  std::array<Method, kMaxThreadId + 1> table;
};

这里 MethodExtractor 找到的其实是对应的 FieldInfo (实际上实例化为 MethodInfo)类型,然后实例化之后展开最终调用的是方法 ID 对应的 &C::template call<MethodInfo> 。需要注意:MethodInfo 是完全在编译时就计算好的,MethodExtractor 也是在编译时就计算好每个方法 ID 对应的 MethodInfo 了,这时候 table 存的已经是函数指针了。

template <NameWrapper NAME, class T, class REQ, class RSP, uint16_t ID, auto METHOD>
struct MethodInfo {
  static constexpr auto nameWrapper = NAME;
  static constexpr std::string_view name = NAME;
  using Object = T;
  using ReqType = REQ;
  using RspType = RSP;
  static constexpr auto id = ID;
  static constexpr auto method = METHOD;
};

// 虽然叫 call
// 其实是被 RPC call 的一方会调用这个函数
template <class F>
  CoTask<void> call() {
    // deserialize payload.
    if (packet_.timestamp) {
      packet_.timestamp->serverWaked = UtcClock::now().time_since_epoch().count();
    }
    typename F::ReqType req;
    // 反序列化
    auto deserializeResult = serde::deserialize(req, packet_.payload);
    if (UNLIKELY(!deserializeResult)) {
      onDeserializeFailed();
      co_return;
    }

    // call method.
    // 调用方法 ID 对应的服务实现
    auto obj = reinterpret_cast<typename F::Object *>(service_.object);
    auto result = co_await folly::coro::co_awaitTry((obj->*F::method)(*this, req));
    if (UNLIKELY(result.hasException())) {
      XLOGF(FATAL,
            "Processor has exception: {}, request {}:{} {}",
            result.exception().what(),
            packet_.serviceId,
            packet_.methodId,
            serde::toJsonString(req));
      co_return;
    }
    if (packet_.timestamp) {
      packet_.timestamp->serverProcessed = UtcClock::now().time_since_epoch().count();
    }
    // 返回结果
    // 调用 Transport::send
    makeResponse(result.value());
    co_return;
  }

服务器的逻辑大致就是这样。

RPC 客户端

客户端的 Stub 定义通过 SERDE_SERVICE_METHOD_SENDER 定义,就是把模板参数和函数名简单填进宏里调用 ClientCallContextcall 方法。

客户端逻辑主要是 ClientCallContext 实现,模板没有服务端那么套娃,直接就能看出调用的是这段代码,模板参数意义也很明确了:

template <NameWrapper kServiceName,
            NameWrapper kMethodName,
            class Req,
            class Rsp,
            uint16_t ServiceID,
            uint16_t MethodID>
  CoTryTask<Rsp> call(const Req &req,
                      const net::UserRequestOptions *customOptions = nullptr,
                      Timestamp *timestamp = nullptr) {
    auto options = *options_.load(std::memory_order_acquire);
    if (customOptions != nullptr) {
      options.merge(*customOptions);
    }

    net::Waiter::Item item;
    uint64_t uuid = net::Waiter::instance().bind(item);

    Timestamp ts;
    if ((options.logLongRunningThreshold != 0_ms || options.reportMetrics) && timestamp == nullptr) {
      timestamp = &ts;
    }

    MessagePacket packet(req);
    packet.uuid = uuid;
    packet.serviceId = ServiceID;
    packet.methodId = MethodID;
    packet.flags = EssentialFlags::IsReq;
    if (options.compression) {
      packet.flags |= EssentialFlags::UseCompress;
    }
    if (options.enableRDMAControl) {
      packet.flags |= EssentialFlags::ControlRDMA;
    }
    if (timestamp != nullptr) {
      packet.timestamp = Timestamp{UtcClock::now().time_since_epoch().count()};
    }

    // 序列化
    auto writeItem = net::WriteItem::createMessage(packet, options);
    writeItem->uuid = uuid;
    auto requestLength = writeItem->buf->length();
    // 发出去
    if (LIKELY(std::holds_alternative<net::IOWorker *>(connectionSource_))) {
      std::get<net::IOWorker *>(connectionSource_)->sendAsync(destAddr_, net::WriteList(std::move(writeItem)));
    } else if (std::holds_alternative<net::Transport *>(connectionSource_)) {
      std::get<net::Transport *>(connectionSource_)->send(net::WriteList(std::move(writeItem)));
    } else {
      co_return MAKE_ERROR_F(StatusCode::kFoundBug,
                             "Sync client call async method: service id {}, method id {}",
                             ServiceID,
                             MethodID);
    }

    // 看到我们的老朋友 Baton 了吗
    net::Waiter::instance().schedule(uuid, options.timeout);
    co_await item.baton;

    if (UNLIKELY(!item.status)) {
      // ...
    }

    Result<Rsp> rsp = makeError(StatusCode::kUnknown);
    auto deserializeResult = serde::deserialize(rsp, item.packet.payload);
    if (UNLIKELY(!deserializeResult)) {
      XLOGF(ERR, "deserialize rsp error: {}", deserializeResult.error());
      if (item.transport) {
        item.transport->invalidate();
      }
      co_return makeError(std::move(deserializeResult.error()));
    }
    // ...
    co_return rsp;
  }

客户端先生成一个请求 UUID(64 位整数,方便好使不占地,也有去重的作用),然后根据要调用的方法,序列化出一个请求,发出去。这时候可不能占着线程不放,得通过 co_await 切出去。收到服务器响应后,Waiter 根据 UUID 再把对应的协程捞出来,baton.post() 一下继续调度执行,解析响应返回结果。

动态协程池

3FS 的 RPC 框架还有一个功能就是可以根据方法分发到不同的协程池去运行逻辑,实现更精确地调度:

  inline DynamicCoroutinesPool &getCoroutinesPool(uint16_t methodId) {
    if (LIKELY(config.use_coroutines_pool_read()) && methodId == StorageSerde<>::batchReadMethodId) {
      return readPool;
    }
    if (LIKELY(config.use_coroutines_pool_update()) &&
        (methodId == StorageSerde<>::writeMethodId || methodId == StorageSerde<>::updateMethodId)) {
      return updatePool;
    }
    if (methodId == StorageSerde<>::syncStartMethodId || methodId == StorageSerde<>::getAllChunkMetadataMethodId) {
      return syncPool;
    }
    return defaultPool;
  }

BatchRead 请求调用分析

以 BatchRead 为例,看 RPC 和 RDMA 单边怎么结合的:

  1. 客户端发送 BatchRead 请求,指定一批要读的 Chunk 数据偏移长度请求列表,以及自己准备好的一堆接收区 Buffer
  2. 服务器收到请求,解包处理
  3. 服务器分配相应数量的 Local RDMA Buffer,调用 AIO 批量读到对应的 Local RDMA Buffer
  4. 如果客户端要求限流,服务器反向发起 RPC 请求客户端的 RDMAControl 服务,客户端根据自身负载在 RPC 处理逻辑中 wait 一段时间
  5. 限流放行返回 RPC 响应后,服务器批量 RDMA Write 到客户端的缓冲区列表对应的 Buffer 中
  6. 服务器 RDMA Write 成功后,返回 RPC 成功
  7. 客户端收到 RPC 响应,知道缓冲区数据已经就绪,美美处理缓冲区中的数据

总结

3FS 的 RPC 框架由于大量使用了模板技巧和宏导致代码极其抽象后现代,可阅读性极差,编译时间和内存爆炸。不过通过人肉展开宏和模板递归后还是能找到一些蛛丝马迹的。总的来说就是用 C++ 宏和模板实现了一套带 RPC 功能的 Protobuf。如果你也想在代码里炫技一下反射或者类似的功能,这个代码还是值得学习一下的,就是后面读的人多少得痛苦一下。

读的我很心累,不想多写什么评价了。不知道 C++ 26 的反射能不能帮助简化下代码。