Understanding 3FS from its call stacks / spans.
Reference:
- deepseek-ai/3FS
- Fire-Flyer AI-HPC: A Cost-Effective Software-Hardware Co-Design for Deep Learning (html, pdf)
- High-FLYER - AI BLOG
- DeepSeek 3FS Source Code Walkthrough (DeepSeek 3FS 源码解读) by howardlau1999
- DeepSeek 3FS Interpretation and Source Code Analysis (DeepSeek 3FS解读与源码分析) by StorageScale
I/O
Read
Client interface
FUSE
TODO
Native client
TODO
Metadata plane
Locates the server (a.k.a. storage target) that holds the blobs being read.
Locating chain
TODO
Locating target
TODO
Locating chunk
TODO
Data plane
Reads the blobs from the storage target.
Networking
TLDR:
- The client sends the read request to the target (RDMA Send / TCP send).
- If transmission control is enabled, the target asks the client for approval to transmit the blob (RDMA Send / TCP send).
- The target writes the blob back to the client (RDMA Write).
- The target sends the response back to the client (RDMA Send / TCP send), marking the completion of the batched read.
- client/storage ######## client (behind USRBIO) ########
- StorageClient::createReadIO # create read request
StorageClientImpl::{read,batchRead} # send read request
- batchReadWithRetry
- batchReadWithoutRetry
- sendBatchRequest
- StorageMessenger::batchRead # invoke rpc
- fbs/storage/Service ######## rpc & serde framework ########
- SERDE_SERVICE StorageSerde::SERDE_SERVICE_METHOD batchRead
- common/serde/ClientContext::call <batchRead> # send rpc request
- common/net ######## client network layer ########
- IOWorker::sendAsync
- Transport::send
- IOWorker::startWriteTask
- Transport::doWrite
- Transport::writeAll
- Socket::send
* ib/IBSocket
- postSend
- ::ibv_post_send IBV_WR_SEND, IBV_SEND_SIGNALED
* tcp/TcpSocket
- ::sendmsg
...... client ---> target ......
- common/net ######## server network layer ########
- EventLoop::loop
- ::epoll_wait # wake on transport signal
EventLoop::EventHandler::handleEvents
- Transport::handleEvents
- Socket::poll # poll for recv event
* ib/IBSocket
- cqPoll
- ::ibv_poll_cq
- wcSuccess
- onRecved # push recv body to recvBufs_
* tcp/TcpSocket
- IOWorker::startReadTask
- Transport::doRead
- Socket::recv # retrieve rpc request body of the recv event
* ib/IBSocket
- recvBufs_.front() && recvBufs_.pop()
postRecv # keep ib recv buffer watermark
* tcp/TcpSocket
- ::read
- IOWorker::processMsg # handle rpc request
Processor::processMsg
- unpackMsg
- unpackSerdeMsg
- tryToProcessSerdeRequest
- processSerdeRequest
- common/serde # reflect rpc handler and invoke
- CallContext::handle
- CallContext::call <batchRead>
- folly::coro::co_awaitTry(...)
storage/service ######## server, i.e. storage target ########
- StorageService::batchRead # rpc handler
- StorageOperator::batchRead
- ... locate target of chain ...
... allocate rdma buffer ...
components_.aioReadWorker.enqueue(batch) # start async disk read (see [#retrieving-chunk-from-disk])
await batch.complete() # await for disk read done
... copy result to response buffer ...
* SEND_DATA_INLINE
- batch.copyToRespBuffer(rsp.inlineBuf.data)
* !BYPASS_RDMAXMIT (debugging flag, stands for "bypass rdma transmit")
- writeBatch: CallContext::RDMATransmission (IBV_WR_RDMA_WRITE) = (ctx: common/serde/CallContext).writeTransmission()
# NOTE rdma is the only transmission implementation
batch.addBufferToBatch(writeBatch)
writeBatch.applyTransmission(RDMATransmissionReqTimeout)
# apply rdma control (see [#client-centric-concurrency-control])
- common/net/SERDE_SERVICE RDMAControl::apply
... acquire rdma device semaphore for writing out ... (default 256 concurrency)
writeBatch.post() # write blob back to client and notify for each request in batch
- common/net/ib/IBSocket
- RDMAReqBatch::post
- rdmaBatch
- rdmaPost
- rdmaPostWR
- ::ibv_post_send IBV_WR_RDMA_WRITE
WRType::RDMA, RDMA, ..., RDMA_LAST | IBV_SEND_SIGNALED
# NOTE the RDMA_LAST flag marks the last WR of the batched ibv_post_send, the one posted with IBV_SEND_SIGNALED
# NOTE signal issued by RDMA Write does not bump Socket::Events
makeResponse # sends the response of the batchRead
- Transport::send
- ... WRType::SEND
...... client <--- target ...... ######## target rdma send response back to client ########
- common/net
- ...
- Transport::handleEvents
- Socket::poll
* ib/IBSocket
- ...
- onRecved
* tcp/TcpSocket
- IOWorker::startReadTask
- Transport::doRead
- Socket::recv
- IOWorker::processMsg
Processor::processMsg
- unpackMsg
- unpackSerdeMsg
- Waiter::instance().post(packet, buf) # wake the coro awaiting on the return of rpc
- common/serde/ClientContext::call <batchRead>
await item.baton # waked
...
co_return rsp
- client/storage
- ...
- StorageClient::batchRead
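The tail of the trace above relies on matching a response packet back to the coroutine that issued the request. Below is a minimal Python sketch of that pattern, assuming an asyncio future plays the role of the baton. `MiniWaiter`, `call`, and `echo_transport` are hypothetical names for illustration, not 3FS APIs, and timeouts are left out.

```python
import asyncio
import itertools


class MiniWaiter:
    """Hypothetical request-id -> waiting-coroutine table."""

    def __init__(self):
        self._ids = itertools.count(1)
        self._pending = {}  # request id -> future awaited by the caller

    def bind(self):
        """Register a new in-flight request and return (id, future)."""
        req_id = next(self._ids)
        fut = asyncio.get_running_loop().create_future()
        self._pending[req_id] = fut
        return req_id, fut

    def post(self, req_id, payload):
        """Called by the receive path: wake the caller waiting on req_id."""
        fut = self._pending.pop(req_id, None)
        if fut is None or fut.done():
            return False  # unknown or already completed request
        fut.set_result(payload)
        return True


async def call(waiter, send, request):
    """Send a request, then suspend until the matching response is posted."""
    req_id, fut = waiter.bind()
    await send(req_id, request)
    return await fut


async def demo():
    waiter = MiniWaiter()

    async def echo_transport(req_id, request):
        # Pretend the peer answers a little later, outside the caller's task.
        asyncio.get_running_loop().call_later(
            0.01, waiter.post, req_id, f"rsp:{request}"
        )

    return await asyncio.gather(
        call(waiter, echo_transport, "a"),
        call(waiter, echo_transport, "b"),
    )


results = asyncio.run(demo())
```

The caller never polls: it suspends on its own future, and the receive path only needs the request id carried in the packet to find and wake it.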
Client-centric concurrency control
At peak load, incast congestion is observed on the client side. To mitigate this congestion, a request-to-send control mechanism is implemented in storage service and client [1]. After receiving a read request from a client, the service reads data from SSD and asks the client’s permission to transfer the data. The client limits the number of concurrent senders. When a storage service is granted the permission to transfer, it sends the data with an RDMA WRITE followed by an RDMA SEND to notify the client [2]. The request-to-send control increases end-to-end IO latency but it’s required to achieve sustainable high throughput.
-- Fire-Flyer AI-HPC: A Cost-Effective Software-Hardware Co-Design for Deep Learning #VI-B3 Key Technical Points of 3FS
- [1] E. B. Nightingale, J. Elson, J. Fan, O. Hofmann, J. Howell, and Y. Suzue, “Flat datacenter storage,” in Proceedings of the 10th USENIX Conference on Operating Systems Design and Implementation, ser. OSDI’12. USA: USENIX Association, 2012, p. 1–15.
- [2] The current source code flags the last RDMA Write of a response as signaled (IBV_SEND_SIGNALED), mitigating the need for a separate trailing RDMA Send.
Takeaways
- Why not fully implement the control on the client side, before issuing the requests?
  Placing the QoS point right next to the data transmission step keeps the RDMA Sends used for RPCs out of the controlled scope, since those are not bandwidth-intensive.
- common/serde ######## server network layer ########
- CallContext::RDMATransmission::applyTransmission # request permission to write to client
- common/net/RDMAControl
- SERDE_SERVICE RDMAControl::SERDE_SERVICE_METHOD apply
...... target ---> client ......
- ... ######## client network layer ########
- common/net::RDMAControl # client service impl
- RDMAControlImpl::apply
- RDMATransmissionLimiter::co_await # limiting concurrency (default 64 concurrent transmissions)
prepareLatency: Option<Duration> = Waiter::instance().setTransmissionLimiterPtr(req.uuid, limiter, startTs)
# NOTE `req.uuid` is generated by the client for each request, see ClientContext::call Waiter::instance().bind(item)
# prepareLatency is time between client constructing request and client approving data transmit
transmissionPrepareLatency.addSample(prepareLatency) # update metrics
...... target <--- client ......
- ... # target rdma write data to client and reply with response
...... target ---> client ......
- common/net ######## client network layer ########
- ...
- Processor
- unpackSerdeMsg
- Waiter::instance().post(packet, buf)
- item->limiter_->signal(...) # release semaphore on receiving batched read rpc response
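The request-to-send flow traced above boils down to a semaphore owned by the client: a target must acquire a slot before writing data, and the slot is released when the client receives the read response. Below is a minimal asyncio sketch under that reading. `Client`, `target_serve`, and the sleep durations are hypothetical, and the limit of 2 is only for illustration (the trace notes a default of 64).

```python
import asyncio

MAX_CONCURRENT_SENDERS = 2  # illustrative only; the trace notes a default of 64


class Client:
    """Hypothetical client that limits how many targets may send at once."""

    def __init__(self, limit):
        self._limiter = asyncio.Semaphore(limit)
        self.active = 0
        self.peak = 0

    async def apply(self):
        """A target asks for permission; returns once a slot is free."""
        await self._limiter.acquire()
        self.active += 1
        self.peak = max(self.peak, self.active)

    def on_response(self):
        """Receiving the read response releases the slot."""
        self.active -= 1
        self._limiter.release()


async def target_serve(client, target_id, log):
    await asyncio.sleep(0.001)   # stands in for reading from SSD
    await client.apply()         # request-to-send
    await asyncio.sleep(0.005)   # stands in for the RDMA Write
    log.append(target_id)
    client.on_response()         # the response reaches the client


async def demo(n_targets=6):
    client = Client(MAX_CONCURRENT_SENDERS)
    log = []
    await asyncio.gather(
        *(target_serve(client, i, log) for i in range(n_targets))
    )
    return client.peak, sorted(log)


peak, served = asyncio.run(demo())
```

All six simulated targets finish their disk reads at about the same time, yet at most two of them transmit concurrently, which is the incast protection the paper describes, paid for with extra latency on the waiting targets.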
Retrieving chunk from disk
Takeaways
- Why not SPDK?
  As cited in Client-centric concurrency control, HF prioritizes “sustainable high throughput” over lower latency. If they can afford to inject an extra RDMA Send into every read request for concurrency control, the choice of coroutine + io_uring is well justified.
  Besides, using a file system is simply easier for everyone.
  3FS minimizes context switching by batching requests; its io_urings are set up with no additional flags, i.e. without polling.
- Why XFS for disks?
  - Logged for strong consistency
  - Optimized for large files (3FS uses append-only files for physical data placement)
  - Stable and commonly available (?)
- Why prefer AIO over io_uring as default?
  The README of 3FS states that it supports Ubuntu 22.04, whose kernel version is 5.15. However, a 6.x kernel is recommended for io_uring. In my not-so-rigorous testing (on 25 Gbps x4 eRDMA on Ubuntu 22.04), io_uring did not bring a noticeable performance gain over libaio in sequential 4 MiB reads (9597 MiB/s vs 9520 MiB/s; the conclusion may change if the test is not bounded by the network).
Or maybe SF-Zhou was just tired of replying to issues related to io_uring.
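A quick calculation puts the two throughput figures above in perspective. It rests on two assumptions that the text does not spell out: that 9597 MiB/s is the io_uring result and 9520 MiB/s the libaio one, and that "25 Gbps x4" means four aggregated links counted in decimal gigabits, with protocol overhead ignored.

```python
MIB = 1 << 20

io_uring_mibps = 9597  # assumed to be the io_uring figure
libaio_mibps = 9520    # assumed to be the libaio figure

# Relative gain of io_uring over libaio.
relative_gain = (io_uring_mibps - libaio_mibps) / libaio_mibps

# Assumption: four aggregated 25 Gbps links, decimal gigabits, no overhead.
line_rate_mibps = 4 * 25e9 / 8 / MIB
utilization = io_uring_mibps / line_rate_mibps

print(
    f"gain: {relative_gain:.2%}, "
    f"raw line rate: {line_rate_mibps:.0f} MiB/s, "
    f"utilization: {utilization:.1%}"
)
```

Under these assumptions the gain is below one percent while both runs already sit at roughly four fifths of the raw line rate, which is consistent with the caveat that the network may be the bottleneck.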
- storage/service ######## target service impl ########
- StorageOperator::batchRead
- ...
components_.aioReadWorker.enqueue(batch) # start async disk read batch
await batch.complete() # await for disk read done
- storage/aio ######## target disk io ########
- AioReadWorker::run # async io event loop, on folly::CPUThreadPoolExecutor, default 32 threads
# NOTE 3FS modified the thread pool impl so that it essentially becomes a fixed-size pool that never shrinks
- it: AioReadJobIterator = queue_.dequeue() # wait on common/utils/BoundedQueue on folly::MPMCQueue<Dynamic = false> for aio job batch
# NOTE folly::MPMCQueue<Dynamic = false> uses adaptive futex for synchronizing
&status = use io_uring ? IoUringStatus : AioStatus # choose which async io engine to use, libaio or liburing
# NOTE the `IoStatus`es live within the closure provided to executor, they are exclusive to each event loop
# NOTE `IoUringStatus` is associated with each storage target, initialized with the target's files and rdma buffers registered on start
status.setAioReadJobIterator(it) # set job batch description to the executor-local engine
- storage/aio
* AioStatus # NOTE default engine, possibly due to compatibility concerns
- collect
- storageTarget->aioPrepareRead(job: BatchReadJob/AioReadJob)
* ChunkEngine::aioPrepareRead # get chunk metadata if not already present in job
- storage/chunk_engine
- cxx/Engine::get_raw_chunk # rust/cxx bridge
- core/Engine::get # lock the meta cache entry
- core/Engine::get_with_entry # unwrap cached or create reference from allocator
- ... the chunk meta `Arc<Chunk>` is leaked to cxx ...
* ChunkReplica::aioPrepareRead
- ... directly get from kv, uncached, not ref-counted (?) ...
::io_prep_pread # prepare for each job
submit
- ::io_submit # submit prepared jobs in batch
while status.inflight() IoStatus::reap # reap until no inflight jobs
- ::io_getevents
setReadJobResult(event.data, event.res) # NOTE libaio reports completions as io_event, not as io_uring cqe
- job.setResult
- state_.chunkEngineJob.reset() # explicitly return `Arc<Chunk>` to rust
(batch_: BatchReadJob).finish(this: AioReadJob)
- baton_.post() # wake the waiter once all jobs in the batch have finished
* IoUringStatus
- collect
- storageTarget->aioPrepareRead(job: BatchReadJob/AioReadJob)
- ...
sqe = ::io_uring_get_sqe
::io_uring_prep_read_fixed(sqe) # prepare read with pre-allocated buffer
::io_uring_sqe_set_data(sqe, &job) # accompany job context
submit
- ::io_uring_submit
while status.inflight() IoStatus::reap # reap until no inflight jobs
- ::io_uring_wait_cqes
::io_uring_for_each_cqe
- setReadJobResult(::io_uring_cqe_get_data(cqe), cqe->res)
- ...
::io_uring_cq_advance
- storage/service ######## target service impl ########
- StorageOperator::batchRead
- ...
await batch.complete() # waked
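The trace above follows a collect / submit / reap loop in which a batch is complete only after every job in it has a result. Below is a minimal Python sketch of that control flow, with a thread and an in-memory fake engine standing in for the executor and for libaio / io_uring. `FakeEngine`, the job tuples, and the method bodies are hypothetical; only the overall shape mirrors the trace.

```python
import queue
import threading


class BatchReadJob:
    """Hypothetical batch: completes when every job in it has a result."""

    def __init__(self, offsets):
        self.results = {off: None for off in offsets}
        self._remaining = len(offsets)
        self._lock = threading.Lock()
        self._baton = threading.Event()
        if not offsets:
            self._baton.set()

    def finish(self, offset, result):
        with self._lock:
            self.results[offset] = result
            self._remaining -= 1
            done = self._remaining == 0
        if done:
            self._baton.set()  # wake whoever waits on the batch

    def complete(self, timeout=None):
        return self._baton.wait(timeout)


class FakeEngine:
    """Stand-in for an async I/O engine: submit in bulk, reap completions."""

    def __init__(self, data):
        self._data = data
        self._completions = []

    def submit(self, jobs):
        # Complete in reverse order to show that ordering does not matter.
        for batch, offset, length in reversed(jobs):
            chunk = self._data[offset:offset + length]
            self._completions.append((batch, offset, chunk))
        return len(jobs)

    def reap(self):
        done, self._completions = self._completions, []
        return done


def worker(jobs_queue, engine):
    while True:
        batch_jobs = jobs_queue.get()
        if batch_jobs is None:  # shutdown sentinel
            return
        inflight = engine.submit(batch_jobs)
        while inflight:  # reap until nothing is in flight
            for batch, offset, result in engine.reap():
                batch.finish(offset, result)
                inflight -= 1


data = bytes(range(64))
batch = BatchReadJob([0, 16, 32])
q = queue.Queue()
t = threading.Thread(target=worker, args=(q, FakeEngine(data)))
t.start()
q.put([(batch, off, 4) for off in batch.results])
finished = batch.complete(timeout=5)
q.put(None)
t.join()
```

The waiter is woken exactly once per batch regardless of the order in which individual reads complete, so the service side pays for one wake-up per batch instead of one per read.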
Write
TODO
Recovery
TODO
Data placement
Scaling on-the-fly
Takeaways
- If the disk / node backing a storage target fails, its traffic is redirected to the secondary (or tertiary) target on its chains. Allocating multiple storage targets on the same disk lets the disk be part of multiple chains, so more targets can share its redirected traffic, alleviating congestion.
- There are rules for targets and chains:
  - A target can be referenced by at most one chain at any given time.
  - A chain must have at least one target at any given time.
  - The targets of a chain must all come from different nodes.
  - The last target of a chain cannot be removed, i.e. once a chain is created it cannot be removed.
  These rules imply that the # of targets must be larger than or equal to the # of chains, and the # of nodes must be larger than or equal to the # of targets in the longest chain.
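The first three rules are easy to check mechanically. Below is a small Python sketch of such a check. `check_placement` and its input layout (chain id to list of target ids, target id to node id) are hypothetical and unrelated to the actual 3FS admin tooling.

```python
from collections import Counter


def check_placement(chains, target_node):
    """Return a list of rule violations (empty if the placement is valid).

    chains:      chain id -> list of target ids
    target_node: target id -> node id
    """
    problems = []

    # Rule: a target is referenced by at most one chain.
    refs = Counter(t for targets in chains.values() for t in targets)
    for target, count in sorted(refs.items()):
        if count > 1:
            problems.append(f"target {target} is referenced {count} times")

    for chain, targets in sorted(chains.items()):
        # Rule: a chain has at least one target.
        if not targets:
            problems.append(f"chain {chain} has no target")
        # Rule: targets of a chain come from different nodes.
        nodes = [target_node[t] for t in targets]
        if len(set(nodes)) != len(nodes):
            problems.append(f"chain {chain} has targets on the same node")

    return problems


target_node = {101: "A", 102: "B", 201: "A", 202: "B"}

good = {1: [101, 202], 2: [201, 102]}
bad = {1: [101, 201], 2: [201], 3: []}

good_problems = check_placement(good, target_node)
bad_problems = check_placement(bad, target_node)
```

In the valid layout each disk-sharing node contributes one target to each chain, so there are four targets for two chains and two nodes for chains of length two, matching the bounds stated above.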
Getting the capacity of the file system
TLDR: the capacity of a 3FS cluster is the sum of the capacities of all storage nodes.
The capacity has nothing to do with chains, targets, or replication; it is the naïve sum of all disks in the cluster. That is to say, if a 3FS cluster is 2-replicated, the used space reported by df will be slightly larger than twice that reported by du, because it accounts for all the chunk replicas and the metadata databases.
- Querying (cached) capacity from meta service
  - meta/store/ops ######## meta service impl ########
  - StatFs::run
  - meta/components
  - FileHelper::statFs
  ... return cachedFsStatus_ ...
- Refreshing FileHelper::cachedFsStatus_
  - meta/components
  - FileHelper::updateStatFs
  nodes = mgmtdClient_->getRoutingInfo()->getNodeBy(NodeType::STORAGE && active)
  ... reduce cap, free from nodes ...
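The refresh step amounts to a reduction over the active storage nodes. Below is a minimal Python sketch of that reduction and of the df / du relationship described earlier. `StorageNode`, its fields, and `stat_fs` are hypothetical, and metadata overhead is ignored so that the factor of two comes out exact.

```python
from dataclasses import dataclass


@dataclass
class StorageNode:
    """Hypothetical per-node report; field names are illustrative."""
    node_id: int
    active: bool
    capacity: int  # bytes
    free: int      # bytes


def stat_fs(nodes):
    """Sum capacity and free space over active storage nodes only."""
    capacity = sum(n.capacity for n in nodes if n.active)
    free = sum(n.free for n in nodes if n.active)
    return {"capacity": capacity, "free": free, "used": capacity - free}


TIB = 1 << 40
nodes = [
    StorageNode(1, True, 10 * TIB, 4 * TIB),
    StorageNode(2, True, 10 * TIB, 4 * TIB),
    StorageNode(3, False, 10 * TIB, 10 * TIB),  # inactive, so not counted
]
status = stat_fs(nodes)

# With 2 replicas and no metadata overhead, 6 TiB of file data (roughly
# what du would report) occupies 12 TiB of raw space (what df reports).
logical_bytes = 6 * TIB
replication_factor = status["used"] // logical_bytes
```

Because the sum knows nothing about chains or replication, the reported capacity is raw space; usable space for file data is smaller by roughly the replication factor.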
Managing storage target’s state
offline-target
Marks a storage target as offline so that it can then be safely removed from its chain.
TODO
remove-target
Removes a storage target from the cluster.
TODO
create-target
Creates a new storage target belonging to a specific chain. The target will be online and start syncing after creation.
TODO
Updating placement of a chain
update-chain
Adds targets to or removes targets from a chain.
TODO
rotate-lastsrv
The LASTSRV target in a chain cannot be offlined. However, we can choose which target is LASTSRV by rotating the order of targets in the chain.
TODO
upload-chains
Adds chains to the cluster.
TODO
upload-chain-table
Adds new chains to the chain table so they can start serving.
TODO
Physical placement of chunks
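DeepSeek 3FS Interpretation and Source Code Analysis (3): Storage Module Walkthrough - Space Allocation (DeepSeek 3FS解读与源码分析(3):Storage模块解读 - 空间分配)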
Storage hierarchy
deepseek-ai/3fs chunk-engine - storage hierarchy
Node
`-- 1:10~20 Disks (??t)
    `-- 1:256 Files (???g)
        `-- 1:~1k Groups (16m/128m/1g)
          ( `-- 1:256 Chunks, spread over Files (64k/512k/4m) )
With 512k chunks, expect ~1.2b chunks (20 disks * 30t / 512k per chunk) or ~5m groups (20 disks * 30t / 512k per chunk / 256 chunks per group) per node.
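The per-node estimate can be reproduced with a few lines of Python. The sketch assumes binary units (30t read as 30 TiB, 512k as 512 KiB) and that every byte of every disk is used for chunks. `per_node_counts` is a hypothetical helper.

```python
KIB = 1 << 10
TIB = 1 << 40


def per_node_counts(disks, disk_bytes, chunk_bytes, chunks_per_group=256):
    """Return (chunks, groups) for a node whose disks are full of chunks."""
    chunks = disks * disk_bytes // chunk_bytes
    groups = chunks // chunks_per_group
    return chunks, groups


chunks, groups = per_node_counts(
    disks=20, disk_bytes=30 * TIB, chunk_bytes=512 * KIB
)
print(f"{chunks / 1e9:.2f}b chunks, {groups / 1e6:.2f}m groups per node")
```

With decimal units instead (30 TB per disk) the chunk count comes out closer to 1.14b, so the ~1.2b and ~5m figures hold as order-of-magnitude estimates either way.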
Allocate
TODO
Deallocate
TODO
Defragmentation
TODO