DeepSeek 3FS 源码解读——客户端篇

文件系统客户端作为应用或者用户访问文件系统的入口,其实现方式决定了用户最终实际能体验到的性能上限,客户端面对的情况更为复杂,代码量也不容小觑。为了充分发挥后端性能,3FS 的客户端部分经过精心设计和实现。

在了解 3FS 实现前,先介绍下并行文件系统的客户端实现方式。客户端根据实现方式和访问协议的不同,大致可以分为以下几类:

  • 内核模块:将文件系统的访问逻辑实现为 POSIX 标准内核模块,用户只需要在使用时通过 modprobe 命令将客户端内核模块加载到内核中,然后通过 mount 指令指定选项或者配置文件挂载到 VFS 系统中,即可像使用本地文件系统一样访问远端的网络并行文件系统,常用的 lsvim 等工具也可以直接访问文件系统。例如 Ceph、BeeGFS、Lustre 等文件系统使用了这种方式。如果文件系统支持 NFS 或者 SMB 协议,也可以归类为这种。
  • SDK:文件系统的访问逻辑以单独的 SDK 提供,用户程序需要完全修改源代码并编译链接 SDK,显式使用 SDK 的 API 来访问文件系统,其他常用的程序例如 ls 无法直接访问,需要通过专用命令行工具或客户端。例如 HDFS、盘古等文件系统使用了这种方式。
  • 动态库/系统调用拦截:通过 LD_PRELOAD 或者修改加载器的方式在应用启动加载客户端动态库,抢占 openread 等函数符号(原始的符号也会保存到另外的变量中),在应用调用 read 等函数时,会调用到文件系统的动态库实现的函数中,从而实现访问远端网络文件系统,适合无法修改源代码的应用程序使用。例如 Intel DAOS 或者一些学术研究文件系统使用了这种方式。
  • FUSE:将文件系统的访问逻辑实现到用户态守护进程,并通过内核 fuse 模块将文件系统以及 mount 指令挂载到内核 VFS 中,和内核模块文件系统客户端有同样的使用效果。例如 OSSFS、BeeGFS FUSE、Ceph FUSE、SeaweedFS 等文件系统使用了这种方式。

以上几种客户端实现方式从使用兼容性、开发难度、访问性能等各有优劣:

实现方式 兼容性 开发难度 性能
内核模块 好,兼容 POSIX 高,内核态代码调试维护困难 中,需要经过 VFS 和一次内核态用户态内存拷贝
SDK 差,需要修改应用源码 低,提供文件系统原生 API 即可,开发复杂度交给用户了 高,和用户程序在同一地址空间,可以实现 0 拷贝
动态库/系统调用拦截 中,不需要修改应用源码,但只适合动态加载 C 库的程序 中,需要穷举可能被调用的 C 接口,编译产物为动态库,加载时机难确定;不走挂载点,需要库实现区分目录挂载在什么文件系统下的逻辑 高,和用户程序在同一地址空间,可以实现 0 拷贝
FUSE 好,兼容 POSIX 低,和实现一个服务端程序差不多 低,需要经过 VFS 和两次内核态用户态内核拷贝

3FS 混合使用了 FUSE 和 SDK 模式:为性能不敏感的应用程序提供了 FUSE 方式访问,对有能力修改源码的应用程序提供了 USRBIO(UserSpace Ring Based IO)的类 SDK 接入访问实现 User Space Kernel Bypass IO,但元数据操作仍然走 FUSE 接口。

FUSE 实现

3FS FUSE 部分大量复用了 RPC 篇中提到的客户端实现,以使用协程操作完成 FUSE 守护进程和 3FS 服务端的通信,如果还不了解 3FS 的 RPC 实现原理,可以先阅读:DeepSeek 3FS 源码解读——RPC 篇

而 FUSE 客户端部分只需要照着 FUSE 规范实现一个符合 POSIX 接口的用户态守护进程即可。简单来说守护进程只需要链接 libfuse,设置好初始化参数,通过 fuse_session_mount 将文件系统挂载到指定挂载点中,然后进入 fuse_session_loop(多线程是 fuse_session_loop_mt) FUSE 库便会负责接收来自内核的 IO 请求,然后调用初始化时指定的函数回调指针调用 3FS 的 FUSE 实现。

const fuse_lowlevel_ops hf3fs_oper = {
    // 节选部分 POSIX 接口...
    .mknod = hf3fs_mknod,
    .mkdir = hf3fs_mkdir,
    .unlink = hf3fs_unlink,
    .rmdir = hf3fs_rmdir,
    .symlink = hf3fs_symlink,
    .rename = hf3fs_rename,
    .link = hf3fs_link,
    .open = hf3fs_open,
    .read = hf3fs_read,
    .write = hf3fs_write,
    // ...
};

// 启动守护进程主循环
int fuseMainLoop(const String &programName,
                 bool allowOther,
                 const String &mountpoint,
                 size_t maxbufsize,
                 const String &clusterId) {
  auto &d = getFuseClientsInstance();
  // 就是 hf3fs_oper
  const auto &ops = getFuseOps();

  std::stack<std::function<void()>> onStopHooks;
  SCOPE_EXIT {
    while (!onStopHooks.empty()) {
      onStopHooks.top()();
      onStopHooks.pop();
    }
  };

  // 解析命令行参数
  struct fuse_args args = FUSE_ARGS_INIT((int)fuseArgsPtr.size(), fuseArgsPtr.data());
  // struct fuse_args args = FUSE_ARGS_INIT(argc, argv);
  struct fuse_cmdline_opts opts;
  struct fuse_loop_config *config = fuse_loop_cfg_create();
  SCOPE_EXIT { fuse_loop_cfg_destroy(config); };

  if (fuse_parse_cmdline(&args, &opts) != 0) {
    return 1;
  }

  // 初始化 FUSE 会话,设置 POSIX 操作对应的实现
  d.se = fuse_session_new(&args, &ops, sizeof(ops), NULL);
  if (d.se == nullptr) {
    return 1;
  }
  onStopHooks.push([&] { fuse_session_destroy(d.se); });

  if (fuse_set_signal_handlers(d.se) != 0) {
    return 1;
  }
  onStopHooks.push([&] { fuse_remove_signal_handlers(d.se); });

  if (fuse_session_mount(d.se, opts.mountpoint) != 0) {
    return 1;
  }

  // 进程结束回调,unmount 文件系统
  onStopHooks.push([&] { fuse_session_unmount(d.se); });

  int ret = -1;
  // 主循环
  if (opts.singlethread) {
    ret = fuse_session_loop(d.se);
  } else {
    fuse_loop_cfg_set_clone_fd(config, opts.clone_fd);
    fuse_loop_cfg_set_idle_threads(config, d.maxIdleThreads);
    fuse_loop_cfg_set_max_threads(config, d.maxThreads);
    ret = fuse_session_loop_mt(d.se, config);
  }

  return ret ? 1 : 0;
}

值得一提的是 3FS 的 FUSE 守护进程同样使用了 C++ 20 协程来完成网络通信。然而 FUSE 本身是 C 库,也不支持异步接口,因此这里存在一个协程到同步操作的转换:

template <typename Awaitable>
auto withRequestInfo(fuse_req_t req, Awaitable &&awaitable) {
  auto guard = RequestInfo::set(req);
  return folly::coro::blockingWait(std::forward<Awaitable>(awaitable));
}

FUSE 的请求在经过一系列检查和翻译成 3FS 请求后,会调用对应的协程函数,并使用 folly::coro::blockingWait 阻塞等待完成,例如打开文件的操作:

void hf3fs_open(fuse_req_t req, fuse_ino_t fino, struct fuse_file_info *fi) {
  auto userInfo = UserInfo(flat::Uid(fuse_req_ctx(req)->uid), flat::Gid(fuse_req_ctx(req)->gid), d.fuseToken);

  // 判断是不是在只读挂载上打开可写文件了
  if ((fi->flags & O_WRONLY) || (fi->flags & O_RDWR) || (fi->flags & O_CREAT) || (fi->flags & O_EXCL) ||
      (fi->flags & O_TRUNC) || (fi->flags & O_APPEND)) {
    if (d.userConfig.getConfig(userInfo).readonly()) {
      fuse_reply_err(req, EROFS);
      return;
    }
  }

  auto ino = real_ino(fino);
  auto ptr = inodeOf(*fi, ino);

  Uuid session;
  if ((fi->flags & O_ACCMODE) == O_WRONLY || (fi->flags & O_ACCMODE) == O_RDWR) {
    session = meta::client::SessionId::random();
    // 阻塞同步调用 MetaClient 的 RPC 协程函数
    // 这里的 ino 是 FUSE 联合 VFS 调用 Lookup 函数查到的
    auto res = withRequestInfo(req, d.metaClient->open(userInfo, ino, std::nullopt, session, fi->flags));
    if (res.hasError()) {
      handle_error(req, res);
      return;
    }
  }

  // O_DIRECT open means direc io
  // read cached disabled && not O_NONBLOCK open (for mmap) means direct io too
  fi->direct_io =
      (fi->flags & O_DIRECT) || (!d.userConfig.getConfig(userInfo).enable_read_cache() && !(fi->flags & O_NONBLOCK))
          ? 1
          : 0;

  fi->fh = (uintptr_t)(new FileHandle{ptr, (bool)(fi->flags & O_DIRECT), session});
  fuse_reply_open(req, fi);
}

这里跟读代码的话会发现其调用了 MetaClient 或者 StorageMessengerStorageClient 来和 3FS 服务器通信,不过一层套一层最后还是调用了 serde 类完成 RPC 操作。还不了解 RPC 实现的读者可以参考阅读 DeepSeek 3FS 源码解读——RPC 篇,只需要关注带 Serde 字样的类即可。客户端 RPC 和服务端 RPC 类似都是通过线程池完成的。简单梳理一下这里同步和异步的转换关系:

  1. 客户端调用 POSIX 系统调用陷入内核态 VFS,此处用户线程阻塞
  2. VFS 塞 IO 请求到 FUSE 队列中,唤醒阻塞在 fuse 循环中的守护进程的线程
  3. libfuse 从队列中取 IO 请求,执行 ops 对应的函数指针
  4. 函数指针执行 RPC 请求协程发起 meta 或 storage RPC call,此时 RPC 被调度到网络 IO 线程池,fuse 循环线程因 blockingWait 同步阻塞在 co_await baton
  5. 收到响应后,fuse 循环线程被唤醒,处理 RPC 响应,并通过 fuse_reply_* 系列函数通知内核 FUSE 模块对应的 IO 请求已完成,写回结果
  6. 内核 FUSE 模块找到 IO 请求对应的用户线程,唤醒用户线程,用户线程恢复执行,处理 IO 结果

RDMA 的 Buffer 由 FUSE Daemon 通过内存池分配。其他的具体的实现方法接口和逻辑细节都比较多,这里不深入解析了,对实现感兴趣的读者可以自行顺着 FuseOps.cc 中的若干 POSIX 接口开始跟读源码。另由于接入了 VFS 层,其实 FUSE 可以复用例如 Page Cache 等内核文件系统缓存机制或者 aio 等机制,当然也可以禁用。这里也比较复杂,也不再展开了。

另,一些复杂的元数据操作,例如 rm -rf,通过单独的一个 symlink 文件实现,具体分析可参考:https://mp.weixin.qq.com/s/X60PsEPeFsb-ZPKATMrWrA(文中关于 Ior 的机制不太准确,请见下文)

USRBIO 机制

从上面的分析可以看到 FUSE 操作基本都会同步阻塞线程,且单次 IO 每方向涉及到 2 次内核用户态切换以及 1~2 次内核态和用户态的内存拷贝,性能一般。为了提升读写性能和延迟,3FS 提供了 USRBIO API,可以理解为 3FS SDK,对读写性能有要求,希望实现 Zero Copy 的应用程序可以通过接入 SDK 来提升读写性能。

通过环形 SQ CQ 队列 + Pinned Memory DMA 实现 Zero Context Switch + Zero Copy IO 是目前高性能 IO 软件常用的技术方案,例如 RDMA、DPDK、io_uring、SPDK 都采用了这种设计。USRBIO 就是在这种思想下设计出来的技术方案。

抽象地来说,Ring Based IO 涉及以下要素:

  1. 工作线程和应用线程,工作线程负责处理从应用线程提交的 IO 请求,应用线程负责处理 IO 完成事件。
  2. Submission Queue(SQ,提交队列)和 Completion Queue(CQ,完成队列),一对有界环形队列,队列中的元素分别为 SQE 和 CQE,分别用于描述 IO 请求和 IO 完成事件。工作线程轮询 SQ 获取 SQE,执行 IO 请求,读写 SQE 中指定的 Buffer,将完成状态 CQE 写入 CQ;用户线程将 IO 请求写入 SQ,并轮询 CQ 获取 CQE。
  3. Pinned Memory Buffers,工作线程和应用线程都可读写的共享内存。用于存放 IO 数据,在 IO 操作完成前不可读写、不可释放。IO 请求中包含的读写区域必须是这里面的内存区域。
  4. 可选:Submission Bell 和 Completion Bell,前者用于通知 IO 工作线程有新的 IO 任务,后者用于通知应用线程有新的 IO 完成事件。采用这种机制可以实现类似事件通知的机制,避免轮询消耗 CPU。

具体到 3FS,它提出了 Ior 和 Iov 的抽象,Ior 就是一个 IO Ring,只包含一些管理用的元数据属性;而 Iov 是一块共享内存的抽象,既提供了 Ior 的 SQ、CQ 共享内存管理,也提供了 Zero Copy IO Buffer 管理,总体的实现框架和关系是:

  1. 用户程序可以创建多个不同优先级的 Ior,每个 Ior 要么只负责读操作,要么只负责写操作;每一个 Ior 的实际队列存放在一段 Ior-Iov 共享内存,也就是一组 SQ、CQ;以及多个 Non-Ior Iov 文件用作 IO Buffer(实际上也是 IPC 共享内存)
    • Non-Ior Iov 文件用来作为读写数据的 pinned memory buffer,通过 UUID 索引
      • 用户提前规划好需要使用的总容量
      • 文件创建之后 FUSE Daemon 将对应内存注册成 RDMA memory buffer,进而实现整个链路的零拷贝
      • Iov 带有 Size、NUMA Node ID 等属性
    • Ior Iov 文件用来实现 IoRing
      • 用户提前规划好队列上界
      • 在整个内存区域上抽象出了提交队列和完成队列,具体布局参考上图,代码定义参考 src/fuse/IoRing.h 中的 IoRing
      • SQE 间接引用了 RingSection 中的项描述 IO 的目的 Buffer,Userdata 即 epoll 中的 ev.data.ptr 或者 liburing 中的 io_uring_sqe_set_data,用来找回调/协程抓手的
      • 内存的尾部是提交完成队列的 IPC 信号量,FUSE Daemon 在处理完 IO 后通过这个信号量通知到用户进程,唤醒 hf3fs_wait_for_ios 线程
  2. 一个挂载点的所有 Ior 共享 3 个 submit IPC sem
    • 这三个 sem 作为 IO 提交事件的信号量(submit sem),每一个 sem 代表一个优先级,数字越小优先级越高
    • 一旦某个 USRBIO 实例有 IO 需要提交,会通过这些信号量通知到 FUSE Daemon
    • USRBIO 处理完成 CQE 之后也通过这些信号量通知 FUSE Daemon CQ 有空余了
  3. 所有的共享内存文件在挂载点 3fs-virt/iovs/ 目录下均建有 symlink,指向 /dev/shm 下的对应文件

篇幅限制,Ior 和 Iov 的具体管理逻辑这里先不贴代码分析了,感兴趣的读者可以参考阅读蚂蚁存储团队对 Ior 和 Iov 管理代码的阅读分析:https://mp.weixin.qq.com/s/sPkqOdVA3qBAUiMQltveoQ

用户侧 USRBIO SDK 分析

用户侧的 USRBIO SDK API 实现位于 src/lib。先看看官方使用示例:

#include <hf3fs_usrbio.h>

constexpr uint64_t NUM_IOS = 1024;
constexpr uint64_t BLOCK_SIZE = (32 << 20);

int main() {
    struct hf3fs_ior ior;
    // 对应 io_uring 的 io_uring_queue_init_params
    // 注意一个 Ior 要么只读要么只写,可以创建多个 Ior
    hf3fs_iorcreate4(&ior, "/hf3fs/mount/point", NUM_IOS, true, 0, 0, -1, 0);

    struct hf3fs_iov iov;
    // 对应 io_uring 的 io_uring_register_buffers
    // 可以创建多个 Iov
    hf3fs_iovcreate(&iov, "/hf3fs/mount/point", NUM_IOS * BLOCK_SIZE, 0, -1);

    int fd = open("/hf3fs/mount/point/example.bin", O_RDONLY);
    // 对应 io_uring 的 io_uring_register_files
    hf3fs_reg_fd(fd, 0);

    for (int i = 0; i < NUM_IOS; i++) {
        // 对应 io_uring 的 io_uring_prep_{read,write}
        // 最后一个参数 nullptr 实际上就是用户自定义的上下文指针,会在 CQE 原封不动传回来
        hf3fs_prep_io(&ior, &iov, true, iov.base + i * BLOCK_SIZE, fd, i * BLOCK_SIZE, BLOCK_SIZE, nullptr);
    }

    // 下面三行对应 io_uring 的 io_uring_submit_and_wait
    hf3fs_submit_ios(&ior);

    hf3fs_cqe cqes[NUM_IOS];
    hf3fs_wait_for_ios(&ior, cqes, NUM_IOS, NUM_IOS, nullptr);
    // 这里可以根据每个 CQE 中的 userdata 取回上下文,执行一些用户自定义的逻辑

    // 对应 io_uring 一系列释放资源操作
    hf3fs_dereg_fd(fd);
    close(fd);
    hf3fs_iovdestroy(&iov);
    hf3fs_iordestroy(&ior);

    return 0;
}

需要注意的是,提交 IO 和等待 IO 完成可以不在同一个线程,但同时只能有一个线程提交 IO 和一个线程等待 IO 完成。

USRBIO SDK 主要分几个功能:

  1. Ring + Pinned Memory Buffer 管理
  2. 非 IO 关键的 Meta 操作卸载为 ioctl 调用
  3. IO 关键(如 openclose)的 Meta 操作走 VFS FUSE
  4. 读写 IO 构造请求往 SQ 塞 SQE 通知 FUSE Daemon,并等 FUSE Daemon 通知从 CQ 取 CQE

3FS 并没有实现 Polling Mode 的 Ring Based IO,而是通过 IPC 信号量来唤醒进程,这里可能是出于节省 CPU 和攒 Batch 考虑。要改成 Polling Mode 也不太困难,轮询队列的 head/tail 就可以了。

FUSE Daemon 侧 USRBIO 分析

FUSE Daemon 承担了 IoRing 的工作线程任务,在 FUSE Daemon 启动时,会启动 IoRing Workers(带取消机制,详见 https://sf-zhou.github.io/coroutine/folly_coro_cancellation.html) 和 IoRing Watchers(用于接收信号量唤醒):

// src/fuse/FuseClients.cc
Result<Void> FuseClients::init(...) {
  // ...
  // M:N 调度协程到线程池中
  auto &tp = client->tpg().bgThreadPool();
  auto coros = fuseConfig.batch_io_coros();
  for (int i = 0; i < coros; ++i) {
    auto exec = &tp.get(i % tp.size());
    // 此处可以在 Fuse 被关闭的时候结束执行中的请求
    // 取消机制原理详见:https://sf-zhou.github.io/coroutine/folly_coro_cancellation.html
    co_withCancellation(cancelIos.getToken(), ioRingWorker(i, coros)).scheduleOn(exec).start();
  }

  // 每个信号量一个 Watcher 线程
  ioWatches.reserve(3);
  for (int i = 0; i < 3; ++i) {
    ioWatches.emplace_back(folly::partial(&FuseClients::watch, this, i));
  }

  // ...
}

Watcher 实现比较简单,就是等信号量,然后从 IoRing 共享内存取一批 SQE,封装成一个 Job 塞入工作队列:

void FuseClients::watch(int prio, std::stop_token stop) {
  while (!stop.stop_requested()) {
    struct timespec ts;
    if (clock_gettime(CLOCK_REALTIME, &ts) < 0) {
      continue;
    }

    auto nsec = ts.tv_nsec + jitter.load().count();
    ts.tv_nsec = nsec % 1000000000;
    ts.tv_sec += nsec / 1000000000;
    if (sem_timedwait(iors.sems[prio].get(), &ts) < 0 && errno == ETIMEDOUT) {
      continue;
    }

    auto gotJobs = false;
    do {
      gotJobs = false;

      auto n = iors.ioRings->slots.nextAvail.load();
      for (int i = 0; i < n; ++i) {
        auto ior = iors.ioRings->table[i].load();

        if (ior && ior->priority == prio) {
          auto jobs = ior->jobsToProc(config->max_jobs_per_ioring());
          for (auto &&job : jobs) {
            gotJobs = true;
            iojqs[prio]->enqueue(std::move(job));
          }
        }
      }
    } while (gotJobs);  // loop till we found no more jobs and then block in the next iter
  }
}

工作协程从工作队列取 Job 完成一批批的 SQE 操作,取任务的逻辑比较复杂,这里简单总结下:

  1. 根据自己的协程序号确定自己应该处理哪个优先级的 IO 请求,协程序号小的协程对应处理高优先级的任务,反之亦然
  2. 从自己优先级任务队列取任务执行;队列为空的话,低优先级的协程也可取高优先级的任务队列中的任务执行,但执行一次后下次就不会 Steal 任务避免自己队列饿死

执行任务的起点是:

CoTask<Void> FuseClients::ioRingWorker(int, int) {
  // For Loop, 从队列取任务, 一个 Job 可能处理多个 SQE!
  job = co_await iojqs[prio]->co_dequeue();
  co_await job.ior->process(job.sqeProcTail,
                                  job.toProc,
                                  *storageClient,
                                  config->storage_io(),
                                  userConfig,
                                  std::move(lookupFiles),
                                  std::move(lookupBufs));
  // 如果高优队列满或是 Steal 的任务则唤醒对应 Ring
  if (iojqs[0]->full() || job.ior->priority != prio) {
          sem_post(iors.sems[job.ior->priority].get());  // wake the watchers
        } else {
          // 顺便检查当前 SQ 有没有新的任务,减少回到等待信号量的次数
          auto jobs = job.ior->jobsToProc(1);
          if (!jobs.empty()) {
            job = jobs.front();
            if (!iojqs[0]->try_enqueue(job)) {
              continue;
            }
          }
        }
}

执行任务的逻辑也比较关键和复杂,这里也不贴代码直接总结下:

  1. 调用 lookupFiles 获取这批 IO 涉及到的 Inode
  2. 调用 lookupBufs 获取这批 IO 涉及到的 Buffer Iov
  3. 根据 Ior 的读/写类型,往 PIoV 中塞入构造好的读写请求
  4. 调用 PIoV 的 executeRead 或者 executeWrite 执行读写请求。对于读和写分别有不同的并行策略,详见后面分析
  5. 收集执行结果,构造 CQE,写入 Ior 的 CQ 中,Post cqeSem 唤醒用户进程

读写并行策略分析

对于并行读写,它们在一些逻辑上是相同的:

// 并行读
CoTryTask<void> StorageClientImpl::batchReadWithoutRetry(ClientRequestContext &requestCtx,
                                                         const std::vector<ReadIO *> &readIOs,
                                                         const flat::UserInfo &userInfo,
                                                         const ReadOptions &options) {
  // collect target/chain infos
  TargetSelectionOptions targetSelectionOptions = options.targetSelection();
  targetSelectionOptions.set_mode(options.targetSelection().mode() == TargetSelectionMode::Default
                                      ? TargetSelectionMode::LoadBalance
                                      : options.targetSelection().mode());

  std::shared_ptr<hf3fs::client::RoutingInfo const> routingInfo = getCurrentRoutingInfo();
  auto targetedOps = selectRoutingTargetForOps(requestCtx, routingInfo, targetSelectionOptions, readIOs);

  // select storage target for each IO and group by node id

  auto batches = groupOpsByNodeId(requestCtx,
                                  targetedOps,
                                  config_.traffic_control().read().max_batch_size(),
                                  config_.traffic_control().read().max_batch_bytes(),
                                  config_.traffic_control().read().random_shuffle_requests());
// ...
}

// 并行写
CoTryTask<void> StorageClientImpl::batchWriteWithoutRetry(ClientRequestContext &requestCtx,
                                                          const std::vector<WriteIO *> &writeIOs,
                                                          const flat::UserInfo &userInfo,
                                                          const WriteOptions &options) {
  // collect target/chain infos

  TargetSelectionOptions targetSelectionOptions = options.targetSelection();
  targetSelectionOptions.set_mode(options.targetSelection().mode() == TargetSelectionMode::Default
                                      ? TargetSelectionMode::HeadTarget
                                      : options.targetSelection().mode());

  std::shared_ptr<hf3fs::client::RoutingInfo const> routingInfo = getCurrentRoutingInfo();
  auto targetedIOs = selectRoutingTargetForOps(requestCtx, routingInfo, targetSelectionOptions, writeIOs);

  // select storage target for each IO and group by node id

  auto batches = groupOpsByNodeId(requestCtx,
                                  targetedIOs,
                                  config_.traffic_control().write().max_batch_size(),
                                  config_.traffic_control().write().max_batch_bytes(),
                                  config_.traffic_control().write().random_shuffle_requests());
// ...
}
  1. 根据 IO 请求的目的 Chunk 和 Chain 决定 IO 请求应该发往哪个 Storage Server;写操作必须选复制链中的头节点;读操作可以按照各种负载均衡策略任意选择链中的节点,充分聚合利用各 Storage Server 的读带宽
  2. 根据 1 中的决策,将 IO 请求分别按照目的 Storage Server 聚合起来,并在每个 Server 内再进行一次分批聚合,将请求聚合成请求 Batch
  3. 根据配置文件串行执行/并发执行 Batches,此时根据单个 Storage Server 并发收到的请求数和单个 Storage Client 总的并发发送请求数进行限流

Batch 间可以根据配置文件串行或者调用 collectAllRange 并发执行:

template <typename Op, typename Ops = std::vector<Op *>>
CoTask<void> processBatches(const std::vector<std::pair<NodeId, Ops>> &batches, auto &&func, bool parallel) {
  std::vector<CoTask<bool>> tasks;
  if (parallel) tasks.reserve(batches.size());

  for (size_t index = 0; index < batches.size(); index++) {
    const auto &[nodeId, ops] = batches[index];
    if (!ops.empty()) {
      if (parallel) {
        tasks.push_back(func(index, nodeId, ops));
      } else {
        co_await func(index, nodeId, ops);
      }
    }
  }

  if (parallel) co_await folly::coro::collectAllRange(std::move(tasks));
}

// StorageClientImpl::batchReadWithoutRetry
  bool parallelProcessing = config_.traffic_control().read().process_batches_in_parallel();
  co_await processBatches<ReadIO>(batches, sendReq, parallelProcessing);

// StorageClientImpl::batchWriteWithoutRetry
  bool parallelProcessing = config_.traffic_control().write().process_batches_in_parallel();
  co_await processBatches<WriteIO>(batches, sendReq, parallelProcessing);

然而在具体执行读写操作时,它们实际发往 Storage Server 的请求会根据读写不同而有所不同。

并行读策略分析

对于并行读请求,会将每一批的请求,组装成一个单独的 BatchRead 请求:


// 组装 BatchRead 请求
template <>
typename hf3fs::storage::BatchReadReq buildBatchRequest(const ClientRequestContext &requestCtx,
                                                        const ClientId &clientId,
                                                        std::atomic_uint64_t &nextRequestId,
                                                        const StorageClient::Config &config,
                                                        const ReadOptions &options,
                                                        const flat::UserInfo &userInfo,
                                                        const std::vector<ReadIO *> &ops) {
  std::vector<hf3fs::storage::ReadIO> payloads;
  payloads.reserve(ops.size());
  size_t requestedBytes = 0;
  auto requestTagSet = monitor::instanceTagSet("batchRead");
  auto tagged_bytes_per_operation = bytes_per_operation.getRecoderWithTag(requestTagSet);

  hf3fs::storage::RequestId requestId(nextRequestId.fetch_add(1));
  hf3fs::storage::MessageTag tag{clientId, requestId};

  for (auto &op : ops) {
    hf3fs::storage::GlobalKey key{op->routingTarget.getVersionedChainId(), op->chunkId};

    size_t offset = op->data - op->buffer->data();
    auto iobuf = op->buffer->subrange(offset, op->length);

    requestedBytes += op->length;
    tagged_bytes_per_operation->addSample(op->length);

    op->requestId = requestId;
    payloads.push_back({op->offset, op->length, std::move(key), iobuf.toRemoteBuf()});
  }

  // 设置一些 metrics 和请求 Flag

  return hf3fs::storage::BatchReadReq{std::move(payloads),
                                      tag,
                                      requestCtx.retryCount,
                                      userInfo,
                                      featureFlags,
                                      checksumType,
                                      requestCtx.debugFlags};
}

// StorageClientImpl::batchReadWithoutRetry 实际执行的请求逻辑 
auto batchReq = buildBatchRequest<ReadIO, BatchReadReq>(requestCtx,
                                                            clientId_,
                                                            nextRequestId_,
                                                            config_,
                                                            options,
                                                            userInfo,
                                                            batchIOs);

auto response =
        co_await sendBatchRequest<ReadIO, BatchReadReq, BatchReadRsp, &StorageMessenger::batchRead>(messenger_,
                                                                                                    requestCtx,
                                                                                                    routingInfo,
                                                                                                    nodeId,
                                                                                                    batchReq,
                                                                                                    batchIOs);

就是简单地将一个 Batch 中的 ReadIO 组装到一个 BatchRead 请求中。

并行写策略分析

写只有 Batch 间的并发,Batch 内串行执行:

// StorageClientImpl::batchWriteWithoutRetry 中实际执行的逻辑
if (!allocateChannelsForOps(chanAllocator_, batchIOs, false /*reallocate*/)) {
      XLOGF(WARN,
            "Cannot allocate channel ids for {} write IOs, first IO {}",
            batchIOs.size(),
            fmt::ptr(batchIOs.front()));
      setErrorCodeOfOps(batchIOs, StorageClientCode::kResourceBusy);
      co_return false;
    }

    // log the waiting time before communication starts
    requestCtx.logWaitingTime();

    auto statusCode =
        co_await sendWriteRequestsSequentially(requestCtx, batchIOs, routingInfo, nodeId, userInfo, options);

总结

3FS 的客户端实现复杂程度不亚于其服务端,主要复杂在 FUSE 和 USRBIO 的 IPC、FUSE 本身对于 IO 请求的批处理逻辑。尽管从设计思想看客户端的实现思路比较简洁,但其中涉及到大量的跨进程、跨线程同步,以及各种提升性能的技巧,以及多个 Ior 和 Iov 的管理,实现细节非常繁琐。

个人感觉 FUSE Daemon 在此处起到了类似 Agent 的作用,收敛了网络线程避免创建太多 QP 和提供集中式元数据缓存减少 Meta RPC。未来有机会扩展成 P2P 的客户端数据缓存,支持 BatchRead 从另外的客户端内存中读取数据,进一步释放读性能。

另外,篇幅限制本文还有许多设计实现讨论未能涉及,例如还没提及 3FS 还对 USRBIO 封装了 Python 接口,还没有分析选择 Target 的详细策略等,对更多细节感兴趣的读者可以继续阅读蚂蚁存储团队的文章:https://mp.weixin.qq.com/s/sPkqOdVA3qBAUiMQltveoQ ,他们也在其中讨论了客户端设计的考量和权衡