3FS - A source code perspective

Understanding 3FS from its call stacks / spans.

References


Meta

Open

Open files and retrieve their Inodes.

- fuse                                                              ######## client issue open request ########
    - FuseOps::hf3fs_open                                           # open file with inode id
- client/meta
    - MetaClient::open
    - stubs/MetaService/IMetaServiceStub::open                      # rpc call

- meta/service                                                      ######## server impl ########
    - MetaOperator::open
    - MetaOperator::runOp(MetaStore::open, req)
- meta/store/ops
    - MetaStore::open
    - OpenOp<Open>::run
        + entry = resolve(...).path(req_.path, ...).dirEntry
        |                                                           # resolve if already created
        | if !entry { return kNotFound }                            # return if not already created
        | inode = entry.snapshotLoadInode(...)                      # load inode of already created
        |   - meta/store
        |       - DirEntry::loadInodeFromDirEntry<Inode::snapshotLoad>
        |       - Inode::snapshotLoad
        | return openExists(..., entry, ...)                        # open already created
        |   + if O_EXCL { return kExists }                          # return already exists if O_EXCL is specified
        |   | openExistsFile
        |   |   + ... check inode type ...
        |   |   | ... check permission ...
        |   |   | if O_TRUNC { replaced = replaceExistsFile(...); if replaced { return replaced_inode } }
        |   |   |                                                   # return a new inode if truncate, orphans the existing
        |   |   |   - replaceExistsFile(...)
        |   |   |       + if !otrunc_replace_file || nlink != 1 || inode.len < otrunc_replace_file_threshold { return false }
        |   |   |       | gcManager.removeEntry(..., old, ...)
        |   |   |       | allocateInodeId(...)
        |   |   |       | createInodeAndEntry(...)
        |   |   |       | createSession(...)
        |   |   | createSession(...)                                # store the read / write session in kv
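
A condensed, hypothetical sketch of the O_TRUNC fast path above: instead of truncating chunks in place, a sufficiently large single-link file is swapped for a fresh inode and the old one is handed to GC. Names are illustrative, not the real meta store types.

    // hedged sketch of the decision made in replaceExistsFile
    #include <cstdint>

    struct TruncConfig { bool otrunc_replace_file; uint64_t otrunc_replace_file_threshold; };

    bool shouldReplaceOnTruncate(const TruncConfig &cfg, uint32_t nlink, uint64_t fileLength) {
      // replace (allocate a fresh inode, orphan the old chunks to GC) only when the feature
      // is enabled, no other hard links exist, and the file is large enough that truncating
      // its chunks in place would cost more than garbage-collecting the whole inode
      return cfg.otrunc_replace_file && nlink == 1 && fileLength >= cfg.otrunc_replace_file_threshold;
    }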

Create

TLDR

  • The meta server that performs the create op is effectively chosen at random: each active meta node's NodeId is hashed together with the parent directory's InodeId, and the node with the biggest hash wins (a rendezvous / highest-random-weight selection, deterministic for a given parent).
  • By default, a file is created with the ChainRange layout: the chains the file resides on are a permutation of chains with consecutive chain IDs, and the start of the range is chosen randomly at inode creation.
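
To make the selection concrete, here is a minimal sketch of highest-random-weight (rendezvous) selection under the assumptions above; the hash is a 64-bit stand-in for the 128-bit Weight, and all names are illustrative.

    #include <cstdint>
    #include <vector>

    struct NodeId { uint32_t v; };

    // hypothetical 64-bit stand-in for the 128-bit hash used by fbs/meta/Utils/Weight
    static uint64_t mixHash(uint64_t a, uint64_t b) {
      uint64_t x = (a * 0x9E3779B97F4A7C15ULL) ^ (b + 0xD6E8FEB86659FD93ULL);
      x ^= x >> 32;
      x *= 0xFF51AFD7ED558CCDULL;
      return x ^ (x >> 29);
    }

    // every node computes the same winner for a given parent inode, no coordination needed
    NodeId selectServer(const std::vector<NodeId> &activeNodes, uint64_t parentInodeId) {
      NodeId best = activeNodes.front();
      uint64_t bestWeight = 0;
      for (auto node : activeNodes) {
        uint64_t w = mixHash(node.v, parentInodeId);           // "straw" for (node, parent inode)
        if (w >= bestWeight) { bestWeight = w; best = node; }
      }
      return best;
    }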

References

- fuse/FuseOps                                                      ######## fuse client ########
    - hf3fs_create
- client/meta
    - MetaClient::create
    - openCreate(IMetaServiceStub::create, req)
        + result = IMetaServiceStub::create
        | if result->needTruncate { truncateImpl(..., targetLength = 0) }

- meta/service                                                      ######## meta service impl ########
    - MetaOperator::create
        + return if runOp(MetaStore::tryOpen, req)                  # try open first, see [#open]
        | node = distributor_->getServer(req.path.parent: InodeId)  # randomly choose a meta server to perform the create op
        |   - meta/components/Distributor::getServer
        |   - fbs/meta/Utils/Weight::select                         # `Weight` is 128-bit hash value of (NodeId, InodeId)
        |   - fbs/meta/Utils/Weight::selectImpl                     # straw from all active nodes, whose NodeId hashed
        |                                                           # with InodeId is biggest wins
        | if node == distributor_->nodeId()                         # should run create on this node
        |   - runInBatch<Create>(req.path.parent: InodeId, req)
        | else                                                      # forward create op to the selected node
        |   - forward_->forward(req)
- meta/service                                                      ######## meta service perform create op ########
    - MetaOperator::runInBatch<Create>(parentId: InodeId, req)
    - MetaOperator::runBatch(parentId, op{Create}, deadline=now+5s)
        + driver = OperationDriver(op, (), deadline)
        | result = driver.run(&txn, ...)

- meta/store                                                        ######## modifying meta kv store ########
    - OperationDriver::run
    - OperationDriver::runAndCommit
- meta/store/ops
    - BatchedOp::run
        + ...
        | syncAndClose(&txn, parentId)
        | setAttr(&txn, parentId)
        | create(&txn, parentId)
        | ... store meta kv ...

- meta/store/ops/BatchOperation/BatchedOp::create(&txn, parentId)   # atomically create files and open them
    + create(&txn, parentId, &chainAllocCounter, creates_[..])
    |   + ...
    |   | for each file to create { create(&txn, parentId, &chainAllocCounter, req) }
    |   | openExists(...)                                           # open just created file
    | SetAttr::update(parentId.asDirectory().chainAllocCounter, chainAllocCounter.rlock())

- meta/store/ops/BatchOperation/BatchedOp::create(&txn, parentId, &chainAllocCounter, req)
                                                                    ######## innermost create file impl ########
    + ... sanity checks ...
    | if layout.empty { chainAlloc().allocateChainsForLayout(&mut layout, ...) }
    |                                                               # if chain layout is not specified, populate the range of
    |                                                               # chains it is supposed to reside on
    |   - meta/components/ChainAllocator::allocateChainsForLayout(...)
    |                                                               # for a file created with no specified layout, a range of
    |                                                               # chains are chosen from the working chain table,
    |                                                               # the start of the range is randomly chosen
    | inodeid = allocateInodeId(...)                                # allocate a new inode id in meta kv store (may mark the
    |                                                               # inode to use new chunk engine when write)
    |   - meta/components/InodeIdAllocator::allocateInodeId(...)
    | entry = DirEntry::new(parentId, name, inodeid)                # construct the directory entry
    | inode = Inode::newFile(...)                                   # construct the file's meta
    | ... persist entry and inode into meta kv ...
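
A rough sketch of what allocateChainsForLayout does for an unspecified layout, per the notes above: pick a random start in the working chain table and take `stripeSize` consecutive chain indexes, wrapping around. The real layout stores the base index plus a seed and derives the per-file permutation deterministically; the shuffle and RNG below are only illustrative.

    #include <algorithm>
    #include <cstdint>
    #include <random>
    #include <vector>

    std::vector<uint32_t> allocateChainRange(uint32_t chainsInTable, uint32_t stripeSize,
                                             std::mt19937_64 &rng) {
      std::uniform_int_distribution<uint32_t> dist(0, chainsInTable - 1);
      uint32_t base = dist(rng);                        // random start of the range
      std::vector<uint32_t> chains(stripeSize);
      for (uint32_t i = 0; i < stripeSize; i++)
        chains[i] = (base + i) % chainsInTable;         // consecutive chain indexes, wrapping
      std::shuffle(chains.begin(), chains.end(), rng);  // per-file permutation
      return chains;
    }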

I/O

Read

Client interface

FUSE

TODO

Native client

  • Putting IO jobs on the ring

      TODO
    
  • Polling jobs from the ring and process them

      - fuse
          - FuseClients::ioRingWorker
          - IoRing::process
              + (ioExec: PioV).addRead(...)
          - PioV::executeRead
      - client/storage
          - StorageClientImpl::batchRead
    

Metadata plane

For locating the corresponding server, a.k.a. storage target, for the blobs being read.

  • Query the opened Inodes during read

      - fuse
          - IoRing::process(..., lookupFiles)
          - FuseClients::ioRingWorker::lambda lookupFiles [&FuseClients::inodes : HashMap<InodeId, RcInode>]
                                                                      # query from opened inode map
    
  • Extract chain and destination chunk

      - fuse
          - PioV::addRead
          - PioV::chunkIo                                             # convert inode and offset to chain and chunk
              + f: File = (inode: Inode).asFile()                     # extract file meta from inode, where `Layout` is stored
              | chain: ChainId = f.getChainId(...)
              | fchunk: ChunkId = f.getChunkId(...)
    
      - fbs/meta/Schema
          - File::getChainId(inode, offset, routingInfo_, track=0)
              + ref: ChainRef = layout.getChainOfChunk(inode, offset / layout.chunkSize + track * (TRACK_OFFSET_FOR_CHAIN=7))
              |   + chains = getChainIndexList()                      # get the permutation of chains
              |   |   * Layout::ChainRange                            # calculate based on `baseIndex` and a `seed` for permutation
              |   |   * Layout::ChainList                             # user-specified layout
              |   |   * Layout::Empty                                 # no consisting chains, e.g. directory
              |   | stripe = chunkIdx % stripeSize                    # chunks reside on the chain vector in round-robin
              |   | return ChainRef{tableId, tableVer, chains[stripe]}
              | cid: [ChainId] = routingInfo.getChainId(ref)          # will wrap chain index to fit within the specified chain table
              | return cid
    

    `track` is always 0 in the entire repo; it is unclear what it is for. (A condensed sketch of this offset-to-chain/chunk mapping follows this list.)

      - fbs/meta/Schema
          - File::getChunkId(inodeId, offset)
              + chunk: ulong = offset / layout.chunkSize
              | return chunk
    
  • Determine the storage target to perform this read request

      - fuse
          - IoRing::process
          - PioV::executeRead
      - client/storage
          - StorageClientImpl::batchRead
          - StorageClientImpl::batchReadWithRetry
          - StorageClientImpl::batchReadWithoutRetry
          - StorageClientImpl::selectRoutingTargetForOps              # default target selection mode for read is `LoadBalance`
              + slimChain = ...                                       # get the chain of this op
              | selectedTarget = targetSelectionStrategy->selectTarget(slimChain)
              | ...
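
A back-of-the-envelope sketch of the offset → (chain, chunk) mapping walked through above, assuming a ChainRange layout; the struct names are illustrative, not the real fbs/meta/Schema types.

    #include <cstdint>
    #include <vector>

    struct Layout {
      uint64_t chunkSize;                  // e.g. 512 KiB
      uint32_t stripeSize;                 // number of chains the file is striped over
      std::vector<uint32_t> chainIndexes;  // permutation of consecutive chain indexes (ChainRange)
    };

    struct ChunkLocation { uint32_t chainIndex; uint64_t chunkId; };

    ChunkLocation locate(const Layout &layout, uint64_t offset) {
      uint64_t chunkIdx = offset / layout.chunkSize;   // File::getChunkId: chunk index within the file
      uint32_t stripe = chunkIdx % layout.stripeSize;  // chunks round-robin over the stripe
      return {layout.chainIndexes[stripe], chunkIdx};  // ChainRef is then resolved via routing info
    }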
    

Data plane

For reading blobs from the storage target.

Networking

TLDR:

  1. The client sends the read request to the target via RDMA Send / TCP.
  2. If transmission control is enabled, the target asks the client (via RDMA Send / TCP) for approval to transmit the blob.
  3. The target RDMA Writes the blob back to the client.
  4. The target sends the response back to the client via RDMA Send / TCP, marking the completion of the batched read.

- client/storage                                                    ######## client (behind USRBIO) ########
    + StorageClient::createReadIO                                   # create read request
    | StorageClientImpl::{read,batchRead}                           # send read request
    |   - batchReadWithRetry
    |   - batchReadWithoutRetry
    |   - sendBatchRequest
    - StorageMessenger::batchRead                                   # invoke rpc

- fbs/storage/Service                                               ######## rpc & serde framework ########
    - SERDE_SERVICE StorageSerde::SERDE_SERVICE_METHOD batchRead
- common/serde/ClientContext::call <batchRead>                      # send rpc request
- common/net                                                        ######## client network layer ########
    - IOWorker::sendAsync
    - Transport::send
    - IOWorker::startWriteTask
    - Transport::doWrite
        - Transport::writeAll
    - Socket::send
        * ib/IBSocket
            - postSend
                - ::ibv_post_send IBV_WR_SEND, IBV_SEND_SIGNALED
        * tcp/TcpSocket
            - ::sendmsg

...... client ---> target ......

- common/net                                                        ######## server network layer ########
    - EventLoop::loop
    - ::epoll_wait                                                  # wake on transport signal
      EventLoop::EventHandler::handleEvents
    - Transport::handleEvents
        - Socket::poll                                              # poll for recv event
            * ib/IBSocket
                - cqPoll
                    - ::ibv_poll_cq
                - wcSuccess
                - onRecved                                          # push recv body to recvBufs_
            * tcp/TcpSocket
    - IOWorker::startReadTask
    - Transport::doRead
        - Socket::recv                                              # retrieve rpc request body of the recv event
            * ib/IBSocket
                - recvBufs_.front() && recvBufs_.pop()
                  postRecv                                          # keep ib recv buffer watermark
            * tcp/TcpSocket
                - ::read
    - IOWorker::processMsg                                          # handle rpc request
      Processor::processMsg
        - unpackMsg
        - unpackSerdeMsg
        - tryToProcessSerdeRequest
        - processSerdeRequest
- common/serde                                                      # reflect rpc handler and invoke
    - CallContext::handle
    - CallContext::call <batchRead>
        - folly::coro::co_awaitTry(...)
          storage/service                                           ######## server, i.e. storage target ########
            - StorageService::batchRead                             # rpc handler
            - StorageOperator::batchRead
                + ... locate target of chain ...
                | ... allocate rdma buffer ...
                | components_.aioReadWorker.enqueue(batch)          # start async disk read (see [#retrieving-chunk-from-disk])
                | await batch.complete()                            # await for disk read done
                | ... copy result to response buffer ...
                |     * SEND_DATA_INLINE
                |         - batch.copyToRespBuffer(rsp.inlineBuf.data)
                |     * !BYPASS_RDMAXMIT  (debugging flag, stands for "bypass rdma transmit")
                |         + writeBatch: CallContext::RDMATransmission (IBV_WR_RDMA_WRITE) = (ctx: common/serde/CallContext).writeTransmission()
                |         |                                         # NOTE rdma is the only transmission implementation
                |         | batch.addBufferToBatch(writeBatch)
                |         | writeBatch.applyTransmission(RDMATransmissionReqTimeout)
                |         |                                         # apply rdma control (see [#client-centric-concurrency-control])
                |         |   - common/net/SERDE_SERVICE RDMAControl::apply
                |         | ... acquire rdma device semaphore for writing out ...  (default 256 concurrency)
                |         | writeBatch.post()                       # write blob back to client and notify for each request in batch
                |         |   - common/net/ib/IBSocket
                |         |       - RDMAReqBatch::post
                |         |       - rdmaBatch
                |         |       - rdmaPost
                |         |       - rdmaPostWR
                |         |           - ::ibv_post_send IBV_WR_RDMA_WRITE
                |         |               WRType::RDMA, RDMA, ..., RDMA_LAST | IBV_SEND_SIGNALED
                |         |                                         # NOTE the RDMA_LAST flag is used to mark the last CQ of the batched ibv_post_send
                |         |                                         # NOTE signal issued by RDMA Write does not bump Socket::Events
          makeResponse                                              # sends the response of the batchRead
            - Transport::send
            - ... WRType::SEND
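
A minimal ibverbs sketch (not 3FS code) of the posting pattern described above: several RDMA WRITE work requests are chained and only the last one requests a completion signal, so a single CQE covers the whole batch. Setup of the QP, memory regions and remote keys is assumed.

    #include <infiniband/verbs.h>
    #include <vector>

    int postWriteBatch(ibv_qp *qp, std::vector<ibv_sge> &sges,
                       const std::vector<uint64_t> &remoteAddrs, uint32_t rkey) {
      std::vector<ibv_send_wr> wrs(sges.size());
      for (size_t i = 0; i < sges.size(); i++) {
        wrs[i] = {};
        wrs[i].wr_id = i;
        wrs[i].opcode = IBV_WR_RDMA_WRITE;
        wrs[i].sg_list = &sges[i];
        wrs[i].num_sge = 1;
        wrs[i].wr.rdma.remote_addr = remoteAddrs[i];
        wrs[i].wr.rdma.rkey = rkey;
        wrs[i].send_flags = (i + 1 == wrs.size()) ? IBV_SEND_SIGNALED : 0;  // signal only the last WR
        wrs[i].next = (i + 1 == wrs.size()) ? nullptr : &wrs[i + 1];
      }
      ibv_send_wr *bad = nullptr;
      return ibv_post_send(qp, wrs.data(), &bad);  // returns 0 on success
    }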

...... client <--- target ......                                    ######## target rdma send response back to client ########

- common/net
    - ...
    - Transport::handleEvents
        - Socket::poll
            * ib/IBSocket
                - ...
                - onRecved
            * tcp/TCPSocket
    - IOWorker::startReadTask
    - Transport::doRead
        - Socket::recv
    - IOWorker::processMsg
      Processor::processMsg
        - unpackMsg
        - unpackSerdeMsg
        - Waiter::instance().post(packet, buf)                      # wake the coro awaiting on the return of rpc
- common/serde/ClientContext::call <batchRead>
    + await item.baton                                              # waked
    | ...
    | co_return rsp
- client/storage                                                    # return all the way up
    - ...
    - StorageClient::batchRead

Client-centric concurrency control

At peak load, incast congestion is observed on the client side. To mitigate this congestion, a request-to-send control mechanism is implemented in storage service and client [1]. After receiving a read request from a client, the service reads data from SSD and asks the client’s permission to transfer the data. The client limits the number of concurrent senders. When a storage service is granted the permission to transfer, it sends the data with an RDMA WRITE followed by an RDMA SEND to notify the client [2]. The request-to-send control increases end-to-end IO latency but it’s required to achieve sustainable high throughput.

-- Fire-Flyer AI-HPC: A Cost-Effective Software-Hardware Co-Design for Deep Learning #VI-B3 Key Technical Points of 3FS


  1. E. B. Nightingale, J. Elson, J. Fan, O. Hofmann, J. Howell, and Y. Suzue, “Flat datacenter storage,” in Proceedings of the 10th USENIX Conference on Operating Systems Design and Implementation, ser. OSDI’12. USA: USENIX Association, 2012, p. 1–15.
  2. The current source code flags the last RDMA Write of a response as signaled, mitigating the trailing RDMA Send.
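
A minimal sketch of the client-side limiter described in the quote, assuming folly coroutines as used by the client: a storage server must win one of a fixed number of slots before it is allowed to RDMA-write data back, which caps the number of concurrent inbound transfers. Names are illustrative.

    #include <folly/experimental/coro/Task.h>
    #include <folly/fibers/Semaphore.h>

    folly::fibers::Semaphore inboundSlots{64};  // default: 64 concurrent transmissions

    // hypothetical handler for the RDMAControl apply RPC issued by the storage server
    folly::coro::Task<void> approveTransmission() {
      // block the server's request until a slot frees up; the slot is released later,
      // when the batched-read response arrives (Waiter::post -> limiter->signal below)
      co_await inboundSlots.co_wait();
      co_return;
    }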

Takeaways

  • Why not implement the control fully on the client side, before issuing the requests?

    Wrapping the QoS point closer to the data transmission step keeps the RDMA Sends used for RPCs out of the controlled scope, as those are not bandwidth-intensive.

- common/serde                                                      ######## server network layer ########
    - CallContext::RDMATransmission::applyTransmission              # request permission to write to client
- common/net/RDMAControl
    - SERDE_SERVICE RDMAControl::SERDE_SERVICE_METHOD apply

...... target ---> client ......

- ...                                                               ######## client network layer ########
- common/net::RDMAControl                                           # client service impl
    - RDMAControlImpl::apply
        + RDMATransmissionLimiter::co_await                         # limiting concurrency (default 64 concurrent transmissions)
        | prepareLatency: Option<Duration> = Waiter::instance().setTransmissionLimiterPtr(req.uuid, limiter, startTs)
        |                                                           # NOTE `req.uuid` is generated by the client for each request, see ClientContext::call Waiter::instance().bind(item)
        |                                                           # prepareLatency is time between client constructing request and client approving data transmit
        | transmissionPrepareLatency.addSample(prepareLatency)      # update metrics

...... target <--- client ......

- ...                                                               # target rdma write data to client and reply with response

...... target ---> client ......

- common/net                                                        ######## client network layer ########
    - ...
    - Processor
        - unpackSerdeMsg
        - Waiter::instance().post(packet, buf)
        - item->limiter_->signal(...)                               # release semaphore on receiving batched read rpc response

Retrieving chunk from disk

Takeaways

  • Why not SPDK?

    As cited in Client-centric concurrency control, HF prioritizes “sustainable high throughput” over lower latency. If they can get away with injecting an extra RDMA Send into every read request for concurrency control, the choice of coroutines + io_uring/aio is well justified.

    Besides, using a plain file system is just easy on us all.

    3FS minimizes context switching by batching requests; its io_urings are set up with no additional flags, i.e. no kernel-side polling.

  • Why XFS for disks?

    • Journaled, for metadata consistency after crashes
    • Optimized for large files (3FS uses append-only files for physical data placement)
    • Stable and commonly available (?)
  • Why prefer AIO over io_uring as default?

    The README of 3FS states that it supports Ubuntu 22.04, whose kernel version is 5.15, while a 6.x kernel is recommended for io_uring. In my not-so-rigorous testing (25 Gbps x4 eRDMA on Ubuntu 22.04), io_uring brought no noticeable performance gain over libaio in sequential 4 MiB reads (9597 MiB/s vs 9520 MiB/s; the conclusion may change when the workload is not network-bound).

    Or maybe SF-Zhou was just tired of replying to issues related to io_uring.

- storage/service                                                   ######## target service impl ########
    - StorageOperator::batchRead
        + ...
        | components_.aioReadWorker.enqueue(batch)                  # start async disk read batch
        | await batch.complete()                                    # await for disk read done

- storage/aio                                                       ######## target disk io ########
    - AioReadWorker::run                                            # async io event loop, on folly::CPUThreadPoolExecutor, default 32 threads
                                                                    # NOTE 3FS modified the impl of tpool so it essentially becomes a fixed tpool, never shrinking
        + it: AioReadJobIterator = queue_.dequeue()                 # wait on common/utils/BoundedQueue on folly::MPMCQueue<Dynamic = false> for aio job batch
        |                                                           # NOTE folly::MPMCQueue<Dynamic = false> uses adaptive futex for synchronizing
        | &status = use io_uring ? IoUringStatus : AioStatus        # choose which async io engine to use, libaio or liburing
        |                                                           # NOTE the `IoStatus`es live within the closure provided to executor, they are exclusive to each event loop
        |                                                           # NOTE `IoUringStatus` is associated with each storage target, initialized with the target's files and rdma buffers registered on start
        | status.setAioReadJobIterator(it)                          # set job batch description to the executor-local engine

- storage/aio
    * AioStatus                                                     # NOTE default engine, possibly due to compatibility concerns
        + collect
        |   - storageTarget->aioPrepareRead(job: BatchReadJob/AioReadJob)
        |       * ChunkEngine::aioPrepareRead                       # get chunk metadata if not already present in job
        |           - storage/chunk_engine
        |               - cxx/Engine::get_raw_chunk                 # rust/cxx bridge
        |               - core/Engine::get                          # lock the meta cache entry
        |               - core/Engine::get_with_entry               # unwrap cached or create reference from allocator
        |               - ... the chunk meta `Arc<Chunk>` is leaked to cxx ...
        |       * ChunkReplica::aioPrepareRead
        |           - ... directly get from kv, uncached, not ref-counted (?) ...
        |     ::io_prep_pread                                       # prepare for each job
        | submit
        |   - ::io_submit                                           # submit prepared jobs in batch
        | while status.inflight()  IoStatus::reap                   # reap until no inflight jobs
        |   - ::io_getevents
        |     setReadJobResult(::io_uring_cqe_get_data(cqe), cqe->res)
        |       - job.setResult
        |           - state_.chunkEngineJob.reset()                 # explicitly return `Arc<Chunk>` to rust
        |               (batch_: BatchReadJob).finish(this: AioReadJob)
        |               - baton_.post()                             # wake on all jobs in batch are finished
    * IoUringStatus
        + collect
        |   - storageTarget->aioPrepareRead(job: BatchReadJob/AioReadJob)
        |       - ...
        |     sqe = ::io_uring_get_sqe
        |     ::io_uring_prep_read_fixed(sqe)                       # prepare read with pre-allocated buffer
        |     ::io_uring_sqe_set_data(sqe, &job)                    # accompany job context
        | submit
        |   - ::io_uring_submit
        | while status.inflight()  IoStatus::reap                   # reap until no inflight jobs
        |   - ::io_uring_wait_cqes
        |     ::io_uring_for_each_cqe
        |       - setReadJobResult(::io_uring_cqe_get_data(cqe), cqe->res)
        |           - ...
        |     ::io_uring_cq_advance
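
For reference, a self-contained libaio sketch of the collect / submit / reap loop that AioStatus implements above; a single fd and pre-allocated buffers are assumed, and error handling is omitted.

    #include <libaio.h>
    #include <vector>

    struct ReadJob { void *buf; size_t len; off_t offset; ssize_t result; };

    void runBatch(int fd, std::vector<ReadJob> &jobs) {
      io_context_t ctx{};
      io_setup(jobs.size(), &ctx);                 // create the kernel AIO context

      std::vector<iocb> cbs(jobs.size());
      std::vector<iocb *> ptrs(jobs.size());
      for (size_t i = 0; i < jobs.size(); i++) {   // collect: prepare one pread per job
        io_prep_pread(&cbs[i], fd, jobs[i].buf, jobs[i].len, jobs[i].offset);
        cbs[i].data = &jobs[i];                    // carry the job context into the completion
        ptrs[i] = &cbs[i];
      }
      io_submit(ctx, ptrs.size(), ptrs.data());    // submit: one syscall for the whole batch

      size_t inflight = jobs.size();
      std::vector<io_event> events(jobs.size());
      while (inflight > 0) {                       // reap: wait until no job is inflight
        int n = io_getevents(ctx, 1, events.size(), events.data(), nullptr);
        if (n <= 0) break;
        for (int i = 0; i < n; i++)
          static_cast<ReadJob *>(events[i].data)->result = events[i].res;
        inflight -= n;
      }
      io_destroy(ctx);
    }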

- storage/service                                                   ######## target service impl ########
    - StorageOperator::batchRead
        + ...
        | await batch.complete()                                    # waked

Write

- client/storage                                                    ######## client (behind USRBIO) ########
    + StorageClient::createWriteIO                                  # create write request
    | StorageClientImpl::{write,batchWrite}                         # send write request
    |   - batchWriteWithRetry
    |   - batchWriteWithoutRetry
    |   - sendBatchRequest
    - StorageMessenger::batchWrite                                  # invoke rpc

TODO

TODO


Recovery

TODO

Data placement

Scaling on-the-fly

Takeaways

  • In case the disk / node backing a storage target fails, its traffic is redirected to the secondary (or tertiary) targets on its chains. Allocating multiple storage targets on the same disk lets that disk be part of multiple chains, so more targets share the redirected traffic, alleviating congestion.

  • There are rules for targets and chains:

    • A target can only be referenced by at most one chain at any given time.
    • A chain must have at least one target at any given time.
    • For any chain, its consisting targets must come from different nodes.
    • The last target of a chain cannot be removed, i.e. once a chain is created it cannot be removed.

    The rules above imply that the number of targets must be at least the number of chains, and the number of nodes must be at least the number of targets in the longest chain.
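
A tiny sketch checking these placement rules for a candidate chain; the `Target` struct and the bookkeeping are illustrative, not the admin tooling's actual data model.

    #include <cstdint>
    #include <set>
    #include <vector>

    struct Target { uint32_t targetId; uint32_t nodeId; };

    bool isValidChain(const std::vector<Target> &chain, std::set<uint32_t> &targetsInUse) {
      if (chain.empty()) return false;  // a chain must keep at least one target
      std::set<uint32_t> nodes;
      for (const auto &t : chain) {
        if (!targetsInUse.insert(t.targetId).second) return false;  // a target serves at most one chain
        if (!nodes.insert(t.nodeId).second) return false;           // targets must sit on distinct nodes
      }
      return true;
    }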

Getting the capacity of the file system

TLDR: the capacity of a 3FS cluster is the sum of the capacities of all storage nodes.

The capacity has nothing to do with chains / targets / replication; it is the naïve summation of all disks in the cluster. That is to say, on a 2-replicated 3FS the used space reported by df will be slightly more than twice that reported by du, since it accounts for all chunk replicas plus the metadata databases.

  • Querying (cached) capacity from meta service

      - meta/store/ops                                        ######## meta service impl ########
          - StatFs::run
      - meta/components
          - FileHelper::statFs
            ... return cachedFsStatus_ ...
    
  • Refreshing FileHelper::cachedFsStatus_

      - meta/components
          - FileHelper::updateStatFs
            nodes = mgmtdClient_->getRoutingInfo()->getNodeBy(NodeType::STORAGE && active)
            ... reduce cap, free from nodes ...
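
A sketch of that reduction, assuming illustrative field names: capacity and free space are simply summed over all active storage nodes, with no replication factor applied.

    #include <cstdint>
    #include <vector>

    struct NodeSpaceInfo { uint64_t capacity; uint64_t free; };
    struct FsStatus { uint64_t capacity = 0; uint64_t free = 0; };

    FsStatus computeFsStatus(const std::vector<NodeSpaceInfo> &activeStorageNodes) {
      FsStatus st;
      for (const auto &n : activeStorageNodes) {
        st.capacity += n.capacity;  // raw disk space: chunk replicas and metadata
        st.free += n.free;          // databases all count as used space for df
      }
      return st;
    }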
    

Managing storage target’s state

offline-target

Marks a storage target as offline so that it can then be safely removed from its chain.

TODO

remove-target

Removes a storage target from the cluster.

TODO

create-target

Creates a new storage target belonging to a specific chain. The target will be online and start syncing after creation.

TODO

Updating placement of a chain

update-chain

Adding or removing targets from a chain.

TODO

rotate-lastsrv

The LASTSRV target in a chain cannot be offlined. However, we can choose which target is LASTSRV by rotating the order of targets in the chain.

TODO

upload-chains

Adding chains to the cluster.

TODO

upload-chain-table

Adding new chains to the chain table, so they can start serving.

TODO

Physical placement of chunks

Space allocation

DeepSeek 3FS解读与源码分析(3):Storage模块解读 - 空间分配 (DeepSeek 3FS Interpretation and Source Code Analysis (3): Storage Module - Space Allocation)

Storage hierarchy

deepseek-ai/3fs chunk-engine - storage hierarchy

Node
`-- 1:10~20 Disks (??t)
    `-- 1:256 Files (???g)
        `-- 1:~1k Groups (16m/128m/1g)
            ( `-- 1:256 Chunks, spread over Files (64k/512k/4m) )

When chunks are 512k, expect roughly (20 * 30t disks / 512k chunk =) ~1.2b chunks, or (20 * 30t disks / 512k chunk / 256 chunks per group =) ~5m groups per node.
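
A quick arithmetic check of those figures, using 512 KiB chunks and 20 x 30 TB disks (the exact result shifts slightly depending on whether "512k" means KB or KiB):

    #include <cstdint>
    #include <cstdio>

    int main() {
      constexpr uint64_t kDiskBytes = 30ull * 1000 * 1000 * 1000 * 1000;  // 30 TB per disk
      constexpr uint64_t kDisks = 20;                                     // disks per node
      constexpr uint64_t kChunkBytes = 512ull * 1024;                     // 512 KiB chunks
      constexpr uint64_t chunks = kDisks * kDiskBytes / kChunkBytes;      // ~1.1e9 chunks
      constexpr uint64_t groups = chunks / 256;                           // ~4.5e6 groups
      std::printf("~%.1fb chunks, ~%.1fm groups per node\n", chunks / 1e9, groups / 1e6);
    }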

Allocate

TODO

Deallocate

TODO

Defragmentation

TODO