文件系统客户端作为应用或者用户访问文件系统的入口,其实现方式决定了用户最终实际能体验到的性能上限,客户端面对的情况更为复杂,代码量也不容小觑。为了充分发挥后端性能,3FS 的客户端部分经过精心设计和实现。
在了解 3FS 实现前,先介绍下并行文件系统的客户端实现方式。客户端根据实现方式和访问协议的不同,大致可以分为以下几类:
- 内核模块:将文件系统的访问逻辑实现为 POSIX 标准内核模块,用户只需要在使用时通过
modprobe
命令将客户端内核模块加载到内核中,然后通过mount
指令指定选项或者配置文件挂载到 VFS 系统中,即可像使用本地文件系统一样访问远端的网络并行文件系统,常用的ls
和vim
等工具也可以直接访问文件系统。例如 Ceph、BeeGFS、Lustre 等文件系统使用了这种方式。如果文件系统支持 NFS 或者 SMB 协议,也可以归类为这种。 - SDK:文件系统的访问逻辑以单独的 SDK 提供,用户程序需要完全修改源代码并编译链接 SDK,显式使用 SDK 的 API 来访问文件系统,其他常用的程序例如
ls
无法直接访问,需要通过专用命令行工具或客户端。例如 HDFS、盘古等文件系统使用了这种方式。 - 动态库/系统调用拦截:通过
LD_PRELOAD
或者修改加载器的方式在应用启动加载客户端动态库,抢占open
、read
等函数符号(原始的符号也会保存到另外的变量中),在应用调用read
等函数时,会调用到文件系统的动态库实现的函数中,从而实现访问远端网络文件系统,适合无法修改源代码的应用程序使用。例如 Intel DAOS 或者一些学术研究文件系统使用了这种方式。 - FUSE:将文件系统的访问逻辑实现到用户态守护进程,并通过内核
fuse
模块将文件系统以及mount
指令挂载到内核 VFS 中,和内核模块文件系统客户端有同样的使用效果。例如 OSSFS、BeeGFS FUSE、Ceph FUSE、SeaweedFS 等文件系统使用了这种方式。
以上几种客户端实现方式从使用兼容性、开发难度、访问性能等各有优劣:
实现方式 | 兼容性 | 开发难度 | 性能 |
---|---|---|---|
内核模块 | 好,兼容 POSIX | 高,内核态代码调试维护困难 | 中,需要经过 VFS 和一次内核态用户态内存拷贝 |
SDK | 差,需要修改应用源码 | 低,提供文件系统原生 API 即可,开发复杂度交给用户了 | 高,和用户程序在同一地址空间,可以实现 0 拷贝 |
动态库/系统调用拦截 | 中,不需要修改应用源码,但只适合动态加载 C 库的程序 | 中,需要穷举可能被调用的 C 接口,编译产物为动态库,加载时机难确定;不走挂载点,需要库实现区分目录挂载在什么文件系统下的逻辑 | 高,和用户程序在同一地址空间,可以实现 0 拷贝 |
FUSE | 好,兼容 POSIX | 低,和实现一个服务端程序差不多 | 低,需要经过 VFS 和两次内核态用户态内核拷贝 |
3FS 混合使用了 FUSE 和 SDK 模式:为性能不敏感的应用程序提供了 FUSE 方式访问,对有能力修改源码的应用程序提供了 USRBIO(UserSpace Ring Based IO)的类 SDK 接入访问实现 User Space Kernel Bypass IO,但元数据操作仍然走 FUSE 接口。
FUSE 实现
3FS FUSE 部分大量复用了 RPC 篇中提到的客户端实现,以使用协程操作完成 FUSE 守护进程和 3FS 服务端的通信,如果还不了解 3FS 的 RPC 实现原理,可以先阅读:DeepSeek 3FS 源码解读——RPC 篇
而 FUSE 客户端部分只需要照着 FUSE 规范实现一个符合 POSIX 接口的用户态守护进程即可。简单来说守护进程只需要链接 libfuse,设置好初始化参数,通过 fuse_session_mount
将文件系统挂载到指定挂载点中,然后进入 fuse_session_loop
(多线程是 fuse_session_loop_mt
) FUSE 库便会负责接收来自内核的 IO 请求,然后调用初始化时指定的函数回调指针调用 3FS 的 FUSE 实现。
const fuse_lowlevel_ops hf3fs_oper = {
// 节选部分 POSIX 接口...
.mknod = hf3fs_mknod,
.mkdir = hf3fs_mkdir,
.unlink = hf3fs_unlink,
.rmdir = hf3fs_rmdir,
.symlink = hf3fs_symlink,
.rename = hf3fs_rename,
.link = hf3fs_link,
.open = hf3fs_open,
.read = hf3fs_read,
.write = hf3fs_write,
// ...
};
// 启动守护进程主循环
int fuseMainLoop(const String &programName,
bool allowOther,
const String &mountpoint,
size_t maxbufsize,
const String &clusterId) {
auto &d = getFuseClientsInstance();
// 就是 hf3fs_oper
const auto &ops = getFuseOps();
std::stack<std::function<void()>> onStopHooks;
SCOPE_EXIT {
while (!onStopHooks.empty()) {
onStopHooks.top()();
onStopHooks.pop();
}
};
// 解析命令行参数
struct fuse_args args = FUSE_ARGS_INIT((int)fuseArgsPtr.size(), fuseArgsPtr.data());
// struct fuse_args args = FUSE_ARGS_INIT(argc, argv);
struct fuse_cmdline_opts opts;
struct fuse_loop_config *config = fuse_loop_cfg_create();
SCOPE_EXIT { fuse_loop_cfg_destroy(config); };
if (fuse_parse_cmdline(&args, &opts) != 0) {
return 1;
}
// 初始化 FUSE 会话,设置 POSIX 操作对应的实现
d.se = fuse_session_new(&args, &ops, sizeof(ops), NULL);
if (d.se == nullptr) {
return 1;
}
onStopHooks.push([&] { fuse_session_destroy(d.se); });
if (fuse_set_signal_handlers(d.se) != 0) {
return 1;
}
onStopHooks.push([&] { fuse_remove_signal_handlers(d.se); });
if (fuse_session_mount(d.se, opts.mountpoint) != 0) {
return 1;
}
// 进程结束回调,unmount 文件系统
onStopHooks.push([&] { fuse_session_unmount(d.se); });
int ret = -1;
// 主循环
if (opts.singlethread) {
ret = fuse_session_loop(d.se);
} else {
fuse_loop_cfg_set_clone_fd(config, opts.clone_fd);
fuse_loop_cfg_set_idle_threads(config, d.maxIdleThreads);
fuse_loop_cfg_set_max_threads(config, d.maxThreads);
ret = fuse_session_loop_mt(d.se, config);
}
return ret ? 1 : 0;
}
值得一提的是 3FS 的 FUSE 守护进程同样使用了 C++ 20 协程来完成网络通信。然而 FUSE 本身是 C 库,也不支持异步接口,因此这里存在一个协程到同步操作的转换:
template <typename Awaitable>
auto withRequestInfo(fuse_req_t req, Awaitable &&awaitable) {
auto guard = RequestInfo::set(req);
return folly::coro::blockingWait(std::forward<Awaitable>(awaitable));
}
FUSE 的请求在经过一系列检查和翻译成 3FS 请求后,会调用对应的协程函数,并使用 folly::coro::blockingWait
阻塞等待完成,例如打开文件的操作:
void hf3fs_open(fuse_req_t req, fuse_ino_t fino, struct fuse_file_info *fi) {
auto userInfo = UserInfo(flat::Uid(fuse_req_ctx(req)->uid), flat::Gid(fuse_req_ctx(req)->gid), d.fuseToken);
// 判断是不是在只读挂载上打开可写文件了
if ((fi->flags & O_WRONLY) || (fi->flags & O_RDWR) || (fi->flags & O_CREAT) || (fi->flags & O_EXCL) ||
(fi->flags & O_TRUNC) || (fi->flags & O_APPEND)) {
if (d.userConfig.getConfig(userInfo).readonly()) {
fuse_reply_err(req, EROFS);
return;
}
}
auto ino = real_ino(fino);
auto ptr = inodeOf(*fi, ino);
Uuid session;
if ((fi->flags & O_ACCMODE) == O_WRONLY || (fi->flags & O_ACCMODE) == O_RDWR) {
session = meta::client::SessionId::random();
// 阻塞同步调用 MetaClient 的 RPC 协程函数
// 这里的 ino 是 FUSE 联合 VFS 调用 Lookup 函数查到的
auto res = withRequestInfo(req, d.metaClient->open(userInfo, ino, std::nullopt, session, fi->flags));
if (res.hasError()) {
handle_error(req, res);
return;
}
}
// O_DIRECT open means direc io
// read cached disabled && not O_NONBLOCK open (for mmap) means direct io too
fi->direct_io =
(fi->flags & O_DIRECT) || (!d.userConfig.getConfig(userInfo).enable_read_cache() && !(fi->flags & O_NONBLOCK))
? 1
: 0;
fi->fh = (uintptr_t)(new FileHandle{ptr, (bool)(fi->flags & O_DIRECT), session});
fuse_reply_open(req, fi);
}
这里跟读代码的话会发现其调用了 MetaClient
或者 StorageMessenger
、StorageClient
来和 3FS 服务器通信,不过一层套一层最后还是调用了 serde
类完成 RPC 操作。还不了解 RPC 实现的读者可以参考阅读 DeepSeek 3FS 源码解读——RPC 篇,只需要关注带 Serde
字样的类即可。客户端 RPC 和服务端 RPC 类似都是通过线程池完成的。简单梳理一下这里同步和异步的转换关系:
- 客户端调用 POSIX 系统调用陷入内核态 VFS,此处用户线程阻塞
- VFS 塞 IO 请求到 FUSE 队列中,唤醒阻塞在 fuse 循环中的守护进程的线程
libfuse
从队列中取 IO 请求,执行ops
对应的函数指针- 函数指针执行 RPC 请求协程发起 meta 或 storage RPC call,此时 RPC 被调度到网络 IO 线程池,fuse 循环线程因
blockingWait
同步阻塞在co_await baton
- 收到响应后,fuse 循环线程被唤醒,处理 RPC 响应,并通过
fuse_reply_*
系列函数通知内核 FUSE 模块对应的 IO 请求已完成,写回结果 - 内核 FUSE 模块找到 IO 请求对应的用户线程,唤醒用户线程,用户线程恢复执行,处理 IO 结果
RDMA 的 Buffer 由 FUSE Daemon 通过内存池分配。其他的具体的实现方法接口和逻辑细节都比较多,这里不深入解析了,对实现感兴趣的读者可以自行顺着 FuseOps.cc
中的若干 POSIX 接口开始跟读源码。另由于接入了 VFS 层,其实 FUSE 可以复用例如 Page Cache 等内核文件系统缓存机制或者 aio 等机制,当然也可以禁用。这里也比较复杂,也不再展开了。
另,一些复杂的元数据操作,例如 rm -rf
,通过单独的一个 symlink
文件实现,具体分析可参考:https://mp.weixin.qq.com/s/X60PsEPeFsb-ZPKATMrWrA(文中关于 Ior 的机制不太准确,请见下文)
USRBIO 机制
从上面的分析可以看到 FUSE 操作基本都会同步阻塞线程,且单次 IO 每方向涉及到 2 次内核用户态切换以及 1~2 次内核态和用户态的内存拷贝,性能一般。为了提升读写性能和延迟,3FS 提供了 USRBIO API,可以理解为 3FS SDK,对读写性能有要求,希望实现 Zero Copy 的应用程序可以通过接入 SDK 来提升读写性能。
通过环形 SQ CQ 队列 + Pinned Memory DMA 实现 Zero Context Switch + Zero Copy IO 是目前高性能 IO 软件常用的技术方案,例如 RDMA、DPDK、io_uring、SPDK 都采用了这种设计。USRBIO 就是在这种思想下设计出来的技术方案。
抽象地来说,Ring Based IO 涉及以下要素:
- 工作线程和应用线程,工作线程负责处理从应用线程提交的 IO 请求,应用线程负责处理 IO 完成事件。
- Submission Queue(SQ,提交队列)和 Completion Queue(CQ,完成队列),一对有界环形队列,队列中的元素分别为 SQE 和 CQE,分别用于描述 IO 请求和 IO 完成事件。工作线程轮询 SQ 获取 SQE,执行 IO 请求,读写 SQE 中指定的 Buffer,将完成状态 CQE 写入 CQ;用户线程将 IO 请求写入 SQ,并轮询 CQ 获取 CQE。
- Pinned Memory Buffers,工作线程和应用线程都可读写的共享内存。用于存放 IO 数据,在 IO 操作完成前不可读写、不可释放。IO 请求中包含的读写区域必须是这里面的内存区域。
- 可选:Submission Bell 和 Completion Bell,前者用于通知 IO 工作线程有新的 IO 任务,后者用于通知应用线程有新的 IO 完成事件。采用这种机制可以实现类似事件通知的机制,避免轮询消耗 CPU。
具体到 3FS,它提出了 Ior 和 Iov 的抽象,Ior 就是一个 IO Ring,只包含一些管理用的元数据属性;而 Iov 是一块共享内存的抽象,既提供了 Ior 的 SQ、CQ 共享内存管理,也提供了 Zero Copy IO Buffer 管理,总体的实现框架和关系是:
- 用户程序可以创建多个不同优先级的 Ior,每个 Ior 要么只负责读操作,要么只负责写操作;每一个 Ior 的实际队列存放在一段 Ior-Iov 共享内存,也就是一组 SQ、CQ;以及多个 Non-Ior Iov 文件用作 IO Buffer(实际上也是 IPC 共享内存)
- Non-Ior Iov 文件用来作为读写数据的 pinned memory buffer,通过 UUID 索引
- 用户提前规划好需要使用的总容量
- 文件创建之后 FUSE Daemon 将对应内存注册成 RDMA memory buffer,进而实现整个链路的零拷贝
- Iov 带有 Size、NUMA Node ID 等属性
- Ior Iov 文件用来实现 IoRing
- 用户提前规划好队列上界
- 在整个内存区域上抽象出了提交队列和完成队列,具体布局参考上图,代码定义参考
src/fuse/IoRing.h
中的IoRing
类 - SQE 间接引用了 RingSection 中的项描述 IO 的目的 Buffer,Userdata 即
epoll
中的ev.data.ptr
或者liburing
中的io_uring_sqe_set_data
,用来找回调/协程抓手的 - 内存的尾部是提交完成队列的 IPC 信号量,FUSE Daemon 在处理完 IO 后通过这个信号量通知到用户进程,唤醒
hf3fs_wait_for_ios
线程
- Non-Ior Iov 文件用来作为读写数据的 pinned memory buffer,通过 UUID 索引
- 一个挂载点的所有 Ior 共享 3 个 submit IPC sem
- 这三个 sem 作为 IO 提交事件的信号量(submit sem),每一个 sem 代表一个优先级,数字越小优先级越高
- 一旦某个 USRBIO 实例有 IO 需要提交,会通过这些信号量通知到 FUSE Daemon
- USRBIO 处理完成 CQE 之后也通过这些信号量通知 FUSE Daemon CQ 有空余了
- 所有的共享内存文件在挂载点
3fs-virt/iovs/
目录下均建有 symlink,指向/dev/shm
下的对应文件
篇幅限制,Ior 和 Iov 的具体管理逻辑这里先不贴代码分析了,感兴趣的读者可以参考阅读蚂蚁存储团队对 Ior 和 Iov 管理代码的阅读分析:https://mp.weixin.qq.com/s/sPkqOdVA3qBAUiMQltveoQ
用户侧 USRBIO SDK 分析
用户侧的 USRBIO SDK API 实现位于 src/lib
。先看看官方使用示例:
#include <hf3fs_usrbio.h>
constexpr uint64_t NUM_IOS = 1024;
constexpr uint64_t BLOCK_SIZE = (32 << 20);
int main() {
struct hf3fs_ior ior;
// 对应 io_uring 的 io_uring_queue_init_params
// 注意一个 Ior 要么只读要么只写,可以创建多个 Ior
hf3fs_iorcreate4(&ior, "/hf3fs/mount/point", NUM_IOS, true, 0, 0, -1, 0);
struct hf3fs_iov iov;
// 对应 io_uring 的 io_uring_register_buffers
// 可以创建多个 Iov
hf3fs_iovcreate(&iov, "/hf3fs/mount/point", NUM_IOS * BLOCK_SIZE, 0, -1);
int fd = open("/hf3fs/mount/point/example.bin", O_RDONLY);
// 对应 io_uring 的 io_uring_register_files
hf3fs_reg_fd(fd, 0);
for (int i = 0; i < NUM_IOS; i++) {
// 对应 io_uring 的 io_uring_prep_{read,write}
// 最后一个参数 nullptr 实际上就是用户自定义的上下文指针,会在 CQE 原封不动传回来
hf3fs_prep_io(&ior, &iov, true, iov.base + i * BLOCK_SIZE, fd, i * BLOCK_SIZE, BLOCK_SIZE, nullptr);
}
// 下面三行对应 io_uring 的 io_uring_submit_and_wait
hf3fs_submit_ios(&ior);
hf3fs_cqe cqes[NUM_IOS];
hf3fs_wait_for_ios(&ior, cqes, NUM_IOS, NUM_IOS, nullptr);
// 这里可以根据每个 CQE 中的 userdata 取回上下文,执行一些用户自定义的逻辑
// 对应 io_uring 一系列释放资源操作
hf3fs_dereg_fd(fd);
close(fd);
hf3fs_iovdestroy(&iov);
hf3fs_iordestroy(&ior);
return 0;
}
需要注意的是,提交 IO 和等待 IO 完成可以不在同一个线程,但同时只能有一个线程提交 IO 和一个线程等待 IO 完成。
USRBIO SDK 主要分几个功能:
- Ring + Pinned Memory Buffer 管理
- 非 IO 关键的 Meta 操作卸载为
ioctl
调用 - IO 关键(如
open
、close
)的 Meta 操作走 VFS FUSE - 读写 IO 构造请求往 SQ 塞 SQE 通知 FUSE Daemon,并等 FUSE Daemon 通知从 CQ 取 CQE
3FS 并没有实现 Polling Mode 的 Ring Based IO,而是通过 IPC 信号量来唤醒进程,这里可能是出于节省 CPU 和攒 Batch 考虑。要改成 Polling Mode 也不太困难,轮询队列的 head/tail 就可以了。
FUSE Daemon 侧 USRBIO 分析
FUSE Daemon 承担了 IoRing 的工作线程任务,在 FUSE Daemon 启动时,会启动 IoRing Workers(带取消机制,详见 https://sf-zhou.github.io/coroutine/folly_coro_cancellation.html) 和 IoRing Watchers(用于接收信号量唤醒):
// src/fuse/FuseClients.cc
Result<Void> FuseClients::init(...) {
// ...
// M:N 调度协程到线程池中
auto &tp = client->tpg().bgThreadPool();
auto coros = fuseConfig.batch_io_coros();
for (int i = 0; i < coros; ++i) {
auto exec = &tp.get(i % tp.size());
// 此处可以在 Fuse 被关闭的时候结束执行中的请求
// 取消机制原理详见:https://sf-zhou.github.io/coroutine/folly_coro_cancellation.html
co_withCancellation(cancelIos.getToken(), ioRingWorker(i, coros)).scheduleOn(exec).start();
}
// 每个信号量一个 Watcher 线程
ioWatches.reserve(3);
for (int i = 0; i < 3; ++i) {
ioWatches.emplace_back(folly::partial(&FuseClients::watch, this, i));
}
// ...
}
Watcher 实现比较简单,就是等信号量,然后从 IoRing 共享内存取一批 SQE,封装成一个 Job 塞入工作队列:
void FuseClients::watch(int prio, std::stop_token stop) {
while (!stop.stop_requested()) {
struct timespec ts;
if (clock_gettime(CLOCK_REALTIME, &ts) < 0) {
continue;
}
auto nsec = ts.tv_nsec + jitter.load().count();
ts.tv_nsec = nsec % 1000000000;
ts.tv_sec += nsec / 1000000000;
if (sem_timedwait(iors.sems[prio].get(), &ts) < 0 && errno == ETIMEDOUT) {
continue;
}
auto gotJobs = false;
do {
gotJobs = false;
auto n = iors.ioRings->slots.nextAvail.load();
for (int i = 0; i < n; ++i) {
auto ior = iors.ioRings->table[i].load();
if (ior && ior->priority == prio) {
auto jobs = ior->jobsToProc(config->max_jobs_per_ioring());
for (auto &&job : jobs) {
gotJobs = true;
iojqs[prio]->enqueue(std::move(job));
}
}
}
} while (gotJobs); // loop till we found no more jobs and then block in the next iter
}
}
工作协程从工作队列取 Job 完成一批批的 SQE 操作,取任务的逻辑比较复杂,这里简单总结下:
- 根据自己的协程序号确定自己应该处理哪个优先级的 IO 请求,协程序号小的协程对应处理高优先级的任务,反之亦然
- 从自己优先级任务队列取任务执行;队列为空的话,低优先级的协程也可取高优先级的任务队列中的任务执行,但执行一次后下次就不会 Steal 任务避免自己队列饿死
执行任务的起点是:
CoTask<Void> FuseClients::ioRingWorker(int, int) {
// For Loop, 从队列取任务, 一个 Job 可能处理多个 SQE!
job = co_await iojqs[prio]->co_dequeue();
co_await job.ior->process(job.sqeProcTail,
job.toProc,
*storageClient,
config->storage_io(),
userConfig,
std::move(lookupFiles),
std::move(lookupBufs));
// 如果高优队列满或是 Steal 的任务则唤醒对应 Ring
if (iojqs[0]->full() || job.ior->priority != prio) {
sem_post(iors.sems[job.ior->priority].get()); // wake the watchers
} else {
// 顺便检查当前 SQ 有没有新的任务,减少回到等待信号量的次数
auto jobs = job.ior->jobsToProc(1);
if (!jobs.empty()) {
job = jobs.front();
if (!iojqs[0]->try_enqueue(job)) {
continue;
}
}
}
}
执行任务的逻辑也比较关键和复杂,这里也不贴代码直接总结下:
- 调用
lookupFiles
获取这批 IO 涉及到的 Inode - 调用
lookupBufs
获取这批 IO 涉及到的 Buffer Iov - 根据 Ior 的读/写类型,往 PIoV 中塞入构造好的读写请求
- 调用 PIoV 的
executeRead
或者executeWrite
执行读写请求。对于读和写分别有不同的并行策略,详见后面分析 - 收集执行结果,构造 CQE,写入 Ior 的 CQ 中,Post cqeSem 唤醒用户进程
读写并行策略分析
对于并行读写,它们在一些逻辑上是相同的:
// 并行读
CoTryTask<void> StorageClientImpl::batchReadWithoutRetry(ClientRequestContext &requestCtx,
const std::vector<ReadIO *> &readIOs,
const flat::UserInfo &userInfo,
const ReadOptions &options) {
// collect target/chain infos
TargetSelectionOptions targetSelectionOptions = options.targetSelection();
targetSelectionOptions.set_mode(options.targetSelection().mode() == TargetSelectionMode::Default
? TargetSelectionMode::LoadBalance
: options.targetSelection().mode());
std::shared_ptr<hf3fs::client::RoutingInfo const> routingInfo = getCurrentRoutingInfo();
auto targetedOps = selectRoutingTargetForOps(requestCtx, routingInfo, targetSelectionOptions, readIOs);
// select storage target for each IO and group by node id
auto batches = groupOpsByNodeId(requestCtx,
targetedOps,
config_.traffic_control().read().max_batch_size(),
config_.traffic_control().read().max_batch_bytes(),
config_.traffic_control().read().random_shuffle_requests());
// ...
}
// 并行写
CoTryTask<void> StorageClientImpl::batchWriteWithoutRetry(ClientRequestContext &requestCtx,
const std::vector<WriteIO *> &writeIOs,
const flat::UserInfo &userInfo,
const WriteOptions &options) {
// collect target/chain infos
TargetSelectionOptions targetSelectionOptions = options.targetSelection();
targetSelectionOptions.set_mode(options.targetSelection().mode() == TargetSelectionMode::Default
? TargetSelectionMode::HeadTarget
: options.targetSelection().mode());
std::shared_ptr<hf3fs::client::RoutingInfo const> routingInfo = getCurrentRoutingInfo();
auto targetedIOs = selectRoutingTargetForOps(requestCtx, routingInfo, targetSelectionOptions, writeIOs);
// select storage target for each IO and group by node id
auto batches = groupOpsByNodeId(requestCtx,
targetedIOs,
config_.traffic_control().write().max_batch_size(),
config_.traffic_control().write().max_batch_bytes(),
config_.traffic_control().write().random_shuffle_requests());
// ...
}
- 根据 IO 请求的目的 Chunk 和 Chain 决定 IO 请求应该发往哪个 Storage Server;写操作必须选复制链中的头节点;读操作可以按照各种负载均衡策略任意选择链中的节点,充分聚合利用各 Storage Server 的读带宽
- 根据 1 中的决策,将 IO 请求分别按照目的 Storage Server 聚合起来,并在每个 Server 内再进行一次分批聚合,将请求聚合成请求 Batch
- 根据配置文件串行执行/并发执行 Batches,此时根据单个 Storage Server 并发收到的请求数和单个 Storage Client 总的并发发送请求数进行限流
Batch 间可以根据配置文件串行或者调用 collectAllRange
并发执行:
template <typename Op, typename Ops = std::vector<Op *>>
CoTask<void> processBatches(const std::vector<std::pair<NodeId, Ops>> &batches, auto &&func, bool parallel) {
std::vector<CoTask<bool>> tasks;
if (parallel) tasks.reserve(batches.size());
for (size_t index = 0; index < batches.size(); index++) {
const auto &[nodeId, ops] = batches[index];
if (!ops.empty()) {
if (parallel) {
tasks.push_back(func(index, nodeId, ops));
} else {
co_await func(index, nodeId, ops);
}
}
}
if (parallel) co_await folly::coro::collectAllRange(std::move(tasks));
}
// StorageClientImpl::batchReadWithoutRetry
bool parallelProcessing = config_.traffic_control().read().process_batches_in_parallel();
co_await processBatches<ReadIO>(batches, sendReq, parallelProcessing);
// StorageClientImpl::batchWriteWithoutRetry
bool parallelProcessing = config_.traffic_control().write().process_batches_in_parallel();
co_await processBatches<WriteIO>(batches, sendReq, parallelProcessing);
然而在具体执行读写操作时,它们实际发往 Storage Server 的请求会根据读写不同而有所不同。
并行读策略分析
对于并行读请求,会将每一批的请求,组装成一个单独的 BatchRead
请求:
// 组装 BatchRead 请求
template <>
typename hf3fs::storage::BatchReadReq buildBatchRequest(const ClientRequestContext &requestCtx,
const ClientId &clientId,
std::atomic_uint64_t &nextRequestId,
const StorageClient::Config &config,
const ReadOptions &options,
const flat::UserInfo &userInfo,
const std::vector<ReadIO *> &ops) {
std::vector<hf3fs::storage::ReadIO> payloads;
payloads.reserve(ops.size());
size_t requestedBytes = 0;
auto requestTagSet = monitor::instanceTagSet("batchRead");
auto tagged_bytes_per_operation = bytes_per_operation.getRecoderWithTag(requestTagSet);
hf3fs::storage::RequestId requestId(nextRequestId.fetch_add(1));
hf3fs::storage::MessageTag tag{clientId, requestId};
for (auto &op : ops) {
hf3fs::storage::GlobalKey key{op->routingTarget.getVersionedChainId(), op->chunkId};
size_t offset = op->data - op->buffer->data();
auto iobuf = op->buffer->subrange(offset, op->length);
requestedBytes += op->length;
tagged_bytes_per_operation->addSample(op->length);
op->requestId = requestId;
payloads.push_back({op->offset, op->length, std::move(key), iobuf.toRemoteBuf()});
}
// 设置一些 metrics 和请求 Flag
return hf3fs::storage::BatchReadReq{std::move(payloads),
tag,
requestCtx.retryCount,
userInfo,
featureFlags,
checksumType,
requestCtx.debugFlags};
}
// StorageClientImpl::batchReadWithoutRetry 实际执行的请求逻辑
auto batchReq = buildBatchRequest<ReadIO, BatchReadReq>(requestCtx,
clientId_,
nextRequestId_,
config_,
options,
userInfo,
batchIOs);
auto response =
co_await sendBatchRequest<ReadIO, BatchReadReq, BatchReadRsp, &StorageMessenger::batchRead>(messenger_,
requestCtx,
routingInfo,
nodeId,
batchReq,
batchIOs);
就是简单地将一个 Batch 中的 ReadIO 组装到一个 BatchRead
请求中。
并行写策略分析
写只有 Batch 间的并发,Batch 内串行执行:
// StorageClientImpl::batchWriteWithoutRetry 中实际执行的逻辑
if (!allocateChannelsForOps(chanAllocator_, batchIOs, false /*reallocate*/)) {
XLOGF(WARN,
"Cannot allocate channel ids for {} write IOs, first IO {}",
batchIOs.size(),
fmt::ptr(batchIOs.front()));
setErrorCodeOfOps(batchIOs, StorageClientCode::kResourceBusy);
co_return false;
}
// log the waiting time before communication starts
requestCtx.logWaitingTime();
auto statusCode =
co_await sendWriteRequestsSequentially(requestCtx, batchIOs, routingInfo, nodeId, userInfo, options);
总结
3FS 的客户端实现复杂程度不亚于其服务端,主要复杂在 FUSE 和 USRBIO 的 IPC、FUSE 本身对于 IO 请求的批处理逻辑。尽管从设计思想看客户端的实现思路比较简洁,但其中涉及到大量的跨进程、跨线程同步,以及各种提升性能的技巧,以及多个 Ior 和 Iov 的管理,实现细节非常繁琐。
个人感觉 FUSE Daemon 在此处起到了类似 Agent 的作用,收敛了网络线程避免创建太多 QP 和提供集中式元数据缓存减少 Meta RPC。未来有机会扩展成 P2P 的客户端数据缓存,支持 BatchRead 从另外的客户端内存中读取数据,进一步释放读性能。
另外,篇幅限制本文还有许多设计实现讨论未能涉及,例如还没提及 3FS 还对 USRBIO 封装了 Python 接口,还没有分析选择 Target 的详细策略等,对更多细节感兴趣的读者可以继续阅读蚂蚁存储团队的文章:https://mp.weixin.qq.com/s/sPkqOdVA3qBAUiMQltveoQ ,他们也在其中讨论了客户端设计的考量和权衡