书接上回,作为一个文件系统,磁盘 IO 也是非常重要的一环。在大致浏览了源码后,个人发现 3FS 中,发起磁盘 IO 的可能是前台服务客户端读写的线程、负责链式复制的线程、也有可能是后台的垃圾回收线程。同时,和网络 IO 充分协程化不太一样,磁盘 IO 基本是由线程池来完成同步 IO 操作,而且读 Chunk 实际数据使用了 AIO 来异步读取,但是写数据则调用了同步 IO。这也就意味着 3FS 没有采用目前高性能存储系统使用的 Run To Completion 模型。因此,我们需要分析一下,磁盘的 IO 是怎样调度,又是怎样将同步 IO 或者 AIO 和网络协程结合起来的,以及协程调度过程中线程切换点。
存储引擎
代码中一个比较吸引眼球的文件夹便是 src/storage/chunk_engine
中用 Rust 编写的 Chunk 存储引擎,简单地搜索后发现其中并没有包含 async
关键词,因此基本可以确定这部分的代码没有使用异步操作。另外,StorageTarget
中包含了这么一段代码:
inline bool useChunkEngine() const { return targetConfig_.only_chunk_engine; }
简单搜索此函数的调用方,可以发现,实际上 3FS 实现了两套 Chunk 存储引擎,一套是 Rust、另一套是 C++ 中的 ChunkStore
,并且 C++ 中的 Chunk 存储引擎,实现同样是基于同步阻塞的 IO 操作。
熟悉异步编程的开发者知道,在异步的协程中,是不可以调用同步阻塞的系统调用的,否则会造成协程所属的线程被阻塞,从而阻塞其他所有该线程上的协程。
那么,问题就变成了:3FS 在处理网络请求的协程中,是怎么做到对磁盘发起同步 IO 操作的?粗略分析后,个人认为 3FS 在写场景完全用了同步 IO;而读场景,Chunk 引擎同步读 Chunk 元数据后(在 aioPrepareRead
函数中),通过 AioReadJob
发起实际的异步读取 IO。
在 Rust 的设计文档中可以看到,在 Chunk 引擎中需要有两个后台线程来完成 Chunk 的一些后台管理:一个是 allocate_thread
负责预分配一些 Group
到内存中加速分配;而 compact_thread
负责整理离散的 Chunk 或者垃圾回收,尽可能将一些未利用满的 Group
中的数据重新整理,释放出 active_group
加速分配。对于这种多线程并发操作,利用 Rust 能很好地在编译期就检查出数据竞争问题,保证 Chunk 分配数据在内存中是正确的。
写路径
继续分析下写路径,可以发现写路径的起点是 StorageOperator::doUpdate
:
CoTask<IOResult> StorageOperator::doUpdate() {
// ...
UpdateJob job;
co_await updateWorker_.enqueue(&job);
co_await job.complete();
// ...
}
class UpdateJob {
void setResult(Result<uint32_t> result) {
result_.lengthInfo = std::move(result);
baton_.post();
}
CoTask<void> complete() const { co_await baton_; }
};
这几行就是打通网络协程和磁盘 IO 的关键代码了,这里和网络 IO 一样,还是用了 Baton
类来挂起和唤醒协程。下面看看 UpdateWorker
的相关实现代码:
CoTask<void> enqueue(UpdateJob *job) {
assert(job->target()->diskIndex() < queueVec_.size());
co_await queueVec_[job->target()->diskIndex()]->co_enqueue(job);
}
可以发现就是将 IO 任务提交到了磁盘对应的处理队列中:
template <typename U = T>
folly::coro::Task<void> co_enqueue(U &&item) {
co_await enqueueSemaphore_.co_wait();
queue_.writeIfNotFull(std::forward<U>(item));
dequeueSemaphore_.signal();
}
一个比较经典的有界阻塞队列实现,这里的队列 queue_
是一个 MPMC
多生产者多消费者有锁队列。因为已经通过信号量控制入队数量了,理论上这里不会发生阻塞。
在 UpdateWorker
中,启动了两个线程池用来处理 IO 任务,前台线程池用来处理任务队列,将其提交每个磁盘对应的后台 IO 线程:
Result<Void> UpdateWorker::start(uint32_t numberOfDisks) {
if (config_.num_threads() < numberOfDisks) {
return makeError(StatusCode::kInvalidConfig,
fmt::format("too few update worker threads, {} < {}", config_.num_threads(), numberOfDisks));
}
queueVec_.reserve(numberOfDisks);
for (auto i = 0u; i < numberOfDisks; ++i) {
queueVec_.emplace_back(std::make_unique<Queue>(config_.queue_size()));
}
for (auto i = 0u; i < config_.num_threads(); ++i) {
executors_.add([this, i] { run(*queueVec_[i % queueVec_.size()]); });
}
return Void{};
}
void UpdateWorker::run(Queue &queue) {
while (true) {
auto job = queue.dequeue();
if (UNLIKELY(job == nullptr)) {
XLOGF(DBG, "Storage worker {} stop...", fmt::ptr(this));
break;
}
job->target()->updateChunk(*job, bgExecutors_);
}
}
在 updateChunk
的实现中可以发现:
void StorageTarget::updateChunk(UpdateJob &job, folly::CPUThreadPoolExecutor &executor) {
if (job.type() == UpdateType::COMMIT) {
if (useChunkEngine()) {
job.setResult(ChunkEngine::commit(*engine_, job, config_.kv_store().sync_when_write()));
} else {
job.setResult(ChunkReplica::commit(chunkStore_, job));
}
} else {
auto result =
useChunkEngine() ? ChunkEngine::update(*engine_, job) : ChunkReplica::update(chunkStore_, job, executor);
if (LIKELY(result.hasValue())) {
// 一些 metrics 统计...
} else {
uint32_t code = result.error().code();
switch (code) {
// 同样是 metrics 统计...
}
}
// 注意这里设置了结果,也就 post 了 Baton,从而唤醒协程
// 特别注意,post 是在 IO 线程中调用的,最终也就是在 IO 线程
// 调用了 coroutine 的 resume 函数。
// 但是 folly 会保证一个 Task 中的 co_await 点都能正确调度回原来的 Executor 执行
/// Within the body of a Task's coroutine, executor binding to the parent
/// executor is maintained by implicitly transforming all 'co_await expr'
/// expressions into `co_await co_viaIfAsync(parentExecutor, expr)' to ensure
/// that the coroutine always resumes on the parent's executor.
job.setResult(std::move(result));
}
}
这里就是调用 Chunk 引擎落盘后,设置协程的结果并唤醒协程的逻辑了。
落盘相关的逻辑:
Rust(src/storage/chunk_engine/src/alloc/chunk.rs
):
impl Chunk {
// 关注这个函数,里面主要是数据的对齐之类的操作,最后调用的是 `pwrite` 调用
pub fn safe_write();
}
pub fn pwrite(&self, pos: Position, mut buf: &[u8], offset: u32) -> Result<()> {
let aligned = is_aligned_io(buf, offset);
let mut offset = pos.offset() + offset;
while !buf.is_empty() {
let fd = if aligned && is_aligned_len(buf.len() as u32) {
&self.direct_fd
} else {
&self.normal_fd
};
match fd.write_at(buf, offset.into()) {
Ok(0) => return Err(Error::IoError(format!("write {:?} return 0", fd))),
Ok(n) => {
buf = &buf[n..];
offset += n as u64;
}
Err(e) => Self::handle_error(e)?,
}
}
Ok(())
}
C++(src/storage/store/ChunkFileView.cc
):
Result<uint32_t> ChunkFileView::write(const uint8_t *buf, size_t size, size_t offset, const ChunkMetadata &meta) {
// 同样是对齐操作等
ExponentialBackoffRetry retry(100_ms, 5_s, 30_s);
while (size > 0) {
int ret = ::pwrite(fd, buf, size, offset);
if (LIKELY(ret > 0)) {
w += ret;
buf += ret;
size -= ret;
offset += ret;
} else {
auto msg = fmt::format("write chunk file failed: fd {}, direct {}, buf {}, offset {}, size {}, ret {}, errno {}",
fd,
fd == direct_,
fmt::ptr(buf),
offset,
size,
ret,
errno);
XLOG(ERR, msg);
auto waitTime = retry.getWaitTime();
if (waitTime.count() == 0) {
return makeError(StorageCode::kChunkWriteFailed, std::move(msg));
}
std::this_thread::sleep_for(waitTime);
}
}
recordGuard.succ();
return w;
}
读路径
读取路径如上文所述,Chunk 引擎通过同步 IO 获取 Chunk 元数据,转换成需要实际发起 IO 操作的 fd
和偏移以及长度,最后提交到 AIO 或者 io_uring 异步读取:
具体来说,看代码在网络线程中构造了 BatchReadJob
,然后将其通过 MPMC 队列提交到线程池中,关键逻辑已添加注释,请留意阅读:
CoTryTask<BatchReadRsp> StorageOperator::batchRead(ServiceRequestContext &requestCtx,
const BatchReadReq &req,
serde::CallContext &ctx) {
// 一些 metrics 统计...
auto snapshot = components_.targetMap.snapshot();
auto batchSize = req.payloads.size();
// 准备发起磁盘 BatchRead 工作
BatchReadRsp rsp;
rsp.results.resize(batchSize);
BatchReadJob batch(req.payloads, rsp.results, req.checksumType);
// 一些 metrics 统计...
size_t totalLength = 0;
size_t totalHeadLength = 0;
size_t totalTailLength = 0;
for (AioReadJobIterator it(&batch); it; it++) {
// get target for batch read, need check public and local state.
// 纯内存操作
}
// 一些 metrics 统计...
auto prepareBufferRecordGuard = storageReadPrepareBuffer.record();
auto buffer = components_.rdmabufPool.get();
for (AioReadJobIterator it(&batch); it; it++) {
// 分配一下 RDMA Buffer
}
prepareBufferRecordGuard.report(true);
if (BITFLAGS_CONTAIN(req.featureFlags, FeatureFlags::BYPASS_DISKIO)) {
// ...
} else {
auto recordGuard = storageAioEnqueueRecoder.record();
auto splitSize = config_.batch_read_job_split_size();
for (uint32_t start = 0; start < batchSize; start += splitSize) {
// 提交 ReadJob
// 同样通过 MPMC 队列提交
co_await components_.aioReadWorker.enqueue(AioReadJobIterator(&batch, start, splitSize));
}
recordGuard.report(true);
}
// 一些 metrics 统计...
// 等待磁盘 BatchRead 完成
co_await batch.complete();
// 执行到此处时,folly 会保证协程切换回 RDMA 线程继续执行
waitAioRecordGuard.report(true);
if (BITFLAGS_CONTAIN(req.featureFlags, FeatureFlags::SEND_DATA_INLINE)) {
batch.copyToRespBuffer(rsp.inlinebuf.data);
} else if (!BITFLAGS_CONTAIN(req.featureFlags, FeatureFlags::BYPASS_RDMAXMIT)) {
auto ibSocket = ctx.transport()->ibSocket();
// 准备发起一批 RDMA Write 操作
auto waitBatchRecordGuard = storageWaitBatchRecoder.record();
auto writeBatch = ctx.writeTransmission();
batch.addBufferToBatch(writeBatch);
waitBatchRecordGuard.report(true);
// RDMA 限流
// ...
// RDMA 限流
if (ctx.packet().controlRDMA() && RDMATransmissionReqTimeout != 0_ms && !applyTransmissionBeforeGettingSemaphore) {
co_await writeBatch.applyTransmission(RDMATransmissionReqTimeout);
}
auto waitPostRecordGuard = storageWaitPostRecoder.record(ibdevTagSet);
// 此时在 RDMA 线程执行 RDMA Write
auto postResult = FAULT_INJECTION_POINT(requestCtx.debugFlags.injectServerError(),
makeError(RPCCode::kRDMAPostFailed),
(co_await writeBatch.post()));
// RDMA 成功后,还在 RDMA 线程中
}
waitAioAndPostRecordGuard.report(true);
recordGuard.succ();
co_return rsp;
}
上述代码片段关键逻辑已添加注释,请留意阅读。继续分析下实际的 AIO 逻辑,这里是从队列出取出任务,根据提交的 ReadJob 构造 AIO 调用(或者 io_uring 调用):
void AioStatus::collect() {
auto recordGuard = ioCollectRecorder.record();
while (availableToSubmit() && iterator_) {
auto &job = *iterator_++;
auto result = job.state().storageTarget->aioPrepareRead(job);
if (UNLIKELY(!result)) {
job.setResult(makeError(std::move(result.error())));
continue;
}
++readyToSubmit_;
++inflight_;
auto iocb = availables_.back();
availables_.pop_back();
auto &state = job.state();
job.resetStartTime();
::io_prep_pread(iocb, state.readFd, state.localbuf.ptr(), state.readLength, state.readOffset);
iocb->data = &job;
}
recordGuard.succ();
}
void IoUringStatus::collect() {
auto recordGuard = ioCollectRecorder.record();
while (availableToSubmit() && iterator_) {
auto &job = *iterator_++;
auto result = job.state().storageTarget->aioPrepareRead(job);
if (UNLIKELY(!result)) {
job.setResult(makeError(std::move(result.error())));
continue;
}
++inflight_;
auto &state = job.state();
job.resetStartTime();
struct io_uring_sqe *sqe = ::io_uring_get_sqe(&ring_);
assert(sqe != nullptr);
::io_uring_prep_read_fixed(sqe,
state.fdIndex.value_or(state.readFd),
state.localbuf.ptr(),
state.readLength,
state.readOffset,
state.bufferIndex);
if (state.fdIndex) {
sqe->flags |= IOSQE_FIXED_FILE;
}
::io_uring_sqe_set_data(sqe, &job);
submittingJobs_.push_back(&job);
}
recordGuard.succ();
}
需要特别注意这里的 aioPrepareRead
是同步调用。然后将构造好的请求塞入到系统调用,工作线程池也是很经典的 submit
和 reap
循环:
Result<Void> AioReadWorker::start(const std::vector<int> &fds, const std::vector<struct iovec> &iovecs) {
uint32_t numThreads = config_.num_threads();
for (auto i = 0u; i < numThreads; ++i) {
executors_.add([&]() {
AioStatus aioStatus;
IoUringStatus ioUringStatus;
// 初始化一下 AIO 或者 uring,开跑
run(aioStatus, ioUringStatus);
});
}
for (int i = 0; initialized_ != numThreads; ++i) {
XLOGF_IF(INFO, i % 5 == 0, "Waiting for AioReadWorker@{}::run start...", fmt::ptr(this));
std::this_thread::sleep_for(100_ms);
}
RETURN_AND_LOG_ON_ERROR(*initResult_.lock());
return Void{};
}
Result<Void> AioReadWorker::run(AioStatus &aioStatus, IoUringStatus &ioUringStatus) {
aioRunningThreadsCount.addSample(1);
auto guard = folly::makeGuard([] { aioRunningThreadsCount.addSample(-1); });
while (true) {
// 1. try to fetch a batch read job.
aioRunningThreadsCount.addSample(-1);
auto it = queue_.dequeue(); // waiting.
aioRunningThreadsCount.addSample(1);
if (it.isNull()) {
XLOGF(DBG, "Stop AioReadWorker {}...", fmt::ptr(this));
return Void{};
}
batchReadInQueueRecorder.addSample(RelativeTime::now() - it.startTime());
it->batch().resetStartTime();
IoStatus &status = config_.useIoUring() ? static_cast<IoStatus &>(ioUringStatus) : aioStatus;
status.setAioReadJobIterator(it);
do {
// 2. collect a batch of read jobs.
status.collect();
// 3. submit a batch of read jobs.
status.submit();
// 4. wait a batch of events.
while (status.inflight()) {
status.reap(config_.min_complete());
};
} while (status.hasUnfinishedBatchReadJob());
}
return Void{};
}
这里贴一下 AIO 的 submit
和 reap
代码,uring 也类似的逻辑,换成对应的系统调用就行:
void AioStatus::submit() {
uint32_t submitStartPoint = availables_.size();
uint32_t loopCnt = 0;
while (readyToSubmit_) {
++loopCnt;
auto recordGuard = ioSubmitRecorder.record();
int ret = ::io_submit(aioContext_, readyToSubmit_, &availables_[submitStartPoint]);
}
// 一些错误处理...
}
void AioStatus::reap(uint32_t minCompleteIn) {
uint32_t minComplete = std::min(inflight(), minCompleteIn);
auto recordGuard = ioGetEventsRecorder.record();
int ret = ::io_getevents(aioContext_, minComplete, inflight(), events_.data(), nullptr);
if (LIKELY(ret >= 0)) {
recordGuard.succ();
ioGetEventsSize.addSample(ret);
inflight_ -= ret;
for (int i = 0; i < ret; ++i) {
auto &event = events_[i];
availables_.push_back(event.obj);
setReadJobResult(event.data, event.res);
}
} else if (ret == -EINTR) {
XLOGF(INFO, "aio is interrupted by a signal handler");
return;
} else {
XLOGF(ERR, "aio io_getevents error: {}", ret);
return;
}
}
setReadJobResult
就是负责设置结果,增加 BatchReadJob
的计数器,做完了就唤醒协程,还是我们的老朋友 Baton
:
void AioReadJob::setResult(Result<uint32_t> lengthInfo) {
// 检查 Checksum,错误处理
// ...
result_.lengthInfo = std::move(lengthInfo);
state_.chunkEngineJob.reset();
batch_.finish(this);
}
void BatchReadJob::finish(AioReadJob *job) {
(void)job;
if (++finishedCount_ == jobs_.size()) {
batchReadLatency.addSample(RelativeTime::now() - startTime());
baton_.post();
}
}
后台 IO 线程
这块设计文档说的挺明白的了,这里简单贴一下线程在哪启动的,Rust 引擎本身不会启动后台线程,反而是由 C++ 线程来驱动,线程中也全部是同步 IO 操作,感兴趣的读者顺藤摸瓜找一找具体的实现逻辑就好。个人理解这些线程是计算为主。
对应的类 AllocateWorker
,代码位于 src/storage/worker/AllocateWorker.cc
:
void AllocateWorker::loop() {
while (!stopping_) {
auto lock = std::unique_lock(mutex_);
if (cond_.wait_for(lock, 100_ms, [&] { return stopping_.load(); })) {
break;
}
auto minRemainGroups = config_.min_remain_groups();
auto maxRemainGroups = config_.max_remain_groups();
auto minRemainUltraGroups = config_.min_remain_ultra_groups();
auto maxRemainUltraGroups = config_.max_remain_ultra_groups();
auto maxReserved = config_.max_reserved_chunks();
for (auto &engine : components_.storageTargets.engines()) {
engine->allocate_groups(minRemainGroups, maxRemainGroups, 128);
engine->allocate_ultra_groups(minRemainUltraGroups, maxRemainUltraGroups, 32);
engine->compact_groups(maxReserved);
}
}
XLOGF(INFO, "AllocateWorker@{}::loop stopped", fmt::ptr(this));
stopped_ = true;
}
将 allocate_thread
和 compact_thread
合二为一了。有个比较奇怪的地方是,就算一个 Target 没有用到 Rust 的 Chunk Engine,也会启动 Rust 的引擎,只是对应的目录下应该啥东西都没有,实际运行中也不会调用到 Rust 引擎,而是调用 C++ 的 Chunk 引擎,对应的 Rust 引擎直接空转:
Result<Void> StorageTargets::init(CPUExecutorGroup &executor) {
targetPaths_ = config_.target_paths();
std::vector<folly::coro::TaskWithExecutor<Result<rust::Box<chunk_engine::Engine>>>> tasks;
for (auto &path : targetPaths_) {
auto engine_path = path / "engine";
bool create = !boost::filesystem::exists(engine_path);
create |= config_.create_engine_path();
tasks.push_back(folly::coro::co_invoke([engine_path, create]() -> CoTryTask<rust::Box<chunk_engine::Engine>> {
std::string error;
auto engine = chunk_engine::create(engine_path.c_str(), create, sizeof(ChainId), error);
if (!error.empty()) {
co_return makeError(StorageCode::kStorageStatFailed, std::move(error));
}
co_return rust::Box<chunk_engine::Engine>::from_raw(engine);
}).scheduleOn(&executor.pickNext()));
}
auto results = folly::coro::blockingWait(folly::coro::collectAllRange(std::move(tasks)));
for (auto &result : results) {
RETURN_AND_LOG_ON_ERROR(result);
engines_.push_back(std::move(result.value()));
}
return Void{};
}
另外还会启动 PunchHoleWorker
,应该就是 compact_thread
,只对使用了 C++ 引擎的 StorageTarget 有作用,Rust 的垃圾回收在 AllocateWorker
里:
class StorageTarget : public enable_shared_from_this<StorageTarget> {
Result<bool> punchHole() {
if (useChunkEngine()) {
return true;
} else {
return chunkStore_.punchHole();
}
}
};
void PunchHoleWorker::loop() {
RelativeTime lastPunchHoleTime = RelativeTime::now();
bool allTargetsRecycled = true;
while (!stopping_) {
if (allTargetsRecycled) {
auto lock = std::unique_lock(mutex_);
if (cond_.wait_for(lock, 1_s, [&] { return stopping_.load(); })) {
break;
}
}
// 1. recycle all targets.
if (RelativeTime::now() - lastPunchHoleTime >= 10_s) {
std::vector<std::weak_ptr<StorageTarget>> targets;
{
auto targetMap = components_.targetMap.snapshot();
for (auto &[targetId, target] : targetMap->getTargets()) {
if (target.unrecoverableOffline()) {
continue;
}
if (target.localState != flat::LocalTargetState::OFFLINE && target.storageTarget != nullptr) {
targets.push_back(target.storageTarget);
}
}
}
allTargetsRecycled = true;
for (auto &weakTarget : targets) {
if (stopping_) {
break;
}
auto target = weakTarget.lock();
if (!target) {
continue;
}
bool targetRecycled = false;
for (auto i = 0u; i < 128u && !stopping_; ++i) {
auto result = target->punchHole();
if (result.hasError()) {
XLOGF(ERR, "recycle target {} failed: {}", target->path(), result.error());
targetRecycled = true;
break;
} else if (*result) {
targetRecycled = true;
break;
}
}
allTargetsRecycled &= targetRecycled;
}
lastPunchHoleTime = RelativeTime::now();
}
}
stopped_ = true;
XLOGF(INFO, "PunchHoleWorker@{}::loop stopped", fmt::ptr(this));
}
个人觉得后台线程这块还是挺混乱的,主要是线程功能职责混乱,还有多了一些空转的逻辑。还有 PunchHoleWorker
这里直接暴力重试 128 次调用 punchHole
,不太符合 DeepSeek 对代码的极致追求啊,可能 C++ 不是主引擎。
总结
3FS 的磁盘 IO 出人意料地采用了同步 IO 的模式,使用线程池去完成实际的 IO 操作。写操作通过 UpdateWorker
线程池完成,读操作使用了 AIO 异步读取实现 AioReadWorker
(实际里面还实现了 uring
),此时 ChunkEngine
或者 ChunkStore
被 Bypass,只起到提供读取的目标 fd
以及偏移和长度的作用,这也符合 3FS 所 claim 的对于读场景优化(对写场景看起来是真的没有做什么优化,甚至还加了锁将写操作串行化)。我们关心的对于网络协程和磁盘 IO 的交互,使用了有锁 MPMC 任务队列的方式将网络上收到的 IO 请求发送到 IO 线程池完成,最后也是通过 Baton
类来负责协程的挂起和唤醒。
这里的线程切换时机需要特别注意:从 RDMA 网络线程切换到 IO 线程,发生在 IO 工作线程取出 IO 任务这一刻;而 IO 任务完成后,虽然是在 IO 线程中执行的 baton.post()
,但 folly 的 Task
通过 await_transform
变换保证 co_await baton
能正确地调度回 RDMA 线程的执行器执行后续流程,如果后面要提交网络 RDMA IO,实际上是切换回了 RDMA 线程去提交。所以 co_await
时如果不做特殊处理,其实是有可能发生线程切换的,如果是自己编写无栈协程库,这是编程中需要特别注意的一个点。关于 Folly 协程实现细节,可以参考:https://www.yinkuiwang.cn/2023/09/24/Folly%20coro%E5%AD%A6%E4%B9%A0/
总的来说,个人看法是 3FS 在性能和开发复杂度之间进行了取舍,磁盘 IO 实现并没有 RDMA 网络那么精巧,实际上并没有在整个 IO 链路都采用纯异步的方式来编程。个人推测可能是因为其 Chunk 引擎使用了 RocksDB 来进行元数据持久化,Chunk 读写的关键路径上必定涉及 RocksDB IO 操作。但是,将 RocksDB 改造成全异步并不是一件简单的事,于是退而求其次将磁盘 IO 用同步阻塞的方式实现了。只要提交队列够深,或者磁盘 IO 的开销大于线程间协程的同步,那么总的来说是可以牺牲少量的性能来换取代码开发的大幅简化。