3FS - A source code perspective

Understanding 3FS from its call stacks / spans.

Reference: deepseek-ai/3FS

I/O

Read

Client interface

FUSE

TODO

Native client

TODO

Metadata plane

For locating the server, a.k.a. storage target, that holds the blobs being read.

Locating chain

TODO

Locating target

TODO

Locating chunk

TODO

Data plane

For reading blobs from the storage target.

Networking

TLDR:

  1. Client RDMA Send / TCP send the read request to the target.
  2. If concurrency control is enabled, target RDMA Send / TCP send to the client, asking for approval to transmit the blob.
  3. Target RDMA Write the blob back to the client.
  4. Target RDMA Send / TCP send the response back to the client, marking the completion of the batched read.
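
Steps 3 and 4 boil down to two ::ibv_post_send calls with different opcodes. Below is a minimal sketch of the target side; the identifiers (qp, blob_mr, resp_mr, client_addr, client_rkey) are illustrative, not 3FS symbols — in 3FS this machinery is wrapped by IBSocket / RDMAReqBatch, as the trace below shows.

    // Hedged sketch of steps 3-4 from the target side, raw ibverbs.
    // Identifiers are illustrative; error handling omitted for brevity.
    #include <infiniband/verbs.h>

    void replyWithBlob(ibv_qp* qp, ibv_mr* blob_mr, uint64_t client_addr,
                       uint32_t client_rkey, ibv_mr* resp_mr) {
      ibv_send_wr* bad = nullptr;

      // Step 3: RDMA Write the blob straight into the client's registered buffer.
      ibv_sge data_sge{reinterpret_cast<uintptr_t>(blob_mr->addr),
                       static_cast<uint32_t>(blob_mr->length), blob_mr->lkey};
      ibv_send_wr write_wr{};
      write_wr.opcode = IBV_WR_RDMA_WRITE;
      write_wr.sg_list = &data_sge;
      write_wr.num_sge = 1;
      write_wr.send_flags = IBV_SEND_SIGNALED;  // 3FS signals only the last write of a batch
      write_wr.wr.rdma.remote_addr = client_addr;
      write_wr.wr.rdma.rkey = client_rkey;
      ::ibv_post_send(qp, &write_wr, &bad);

      // Step 4: RDMA Send the rpc response, marking completion of the batched read.
      ibv_sge resp_sge{reinterpret_cast<uintptr_t>(resp_mr->addr),
                       static_cast<uint32_t>(resp_mr->length), resp_mr->lkey};
      ibv_send_wr send_wr{};
      send_wr.opcode = IBV_WR_SEND;
      send_wr.sg_list = &resp_sge;
      send_wr.num_sge = 1;
      send_wr.send_flags = IBV_SEND_SIGNALED;
      ::ibv_post_send(qp, &send_wr, &bad);
    }
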
- client/storage                                                    ######## client (behind USRBIO) ########
    - StorageClient::createReadIO                                   # create read request
      StorageClientImpl::{read,batchRead}                           # send read request
        - batchReadWithRetry
        - batchReadWithoutRetry
        - sendBatchRequest
    - StorageMessenger::batchRead                                   # invoke rpc

- fbs/storage/Service                                               ######## rpc & serde framework ########
    - SERDE_SERVICE StorageSerde::SERDE_SERVICE_METHOD batchRead
- common/serde/ClientContext::call <batchRead>                      # send rpc request
- common/net                                                        ######## client network layer ########
    - IOWorker::sendAsync
    - Transport::send
    - IOWorker::startWriteTask
    - Transport::doWrite
        - Transport::writeAll
    - Socket::send
        * ib/IBSocket
            - postSend
                - ::ibv_post_send IBV_WR_SEND, IBV_SEND_SIGNALED
        * tcp/TcpSocket
            - ::sendmsg

...... client ---> target ......

- common/net                                                        ######## server network layer ########
    - EventLoop::loop
    - ::epoll_wait                                                  # wake on transport signal
      EventLoop::EventHandler::handleEvents
    - Transport::handleEvents
        - Socket::poll                                              # poll for recv event
            * ib/IBSocket
                - cqPoll
                    - ::ibv_poll_cq
                - wcSuccess
                - onRecved                                          # push recv body to recvBufs_
            * tcp/TcpSocket
    - IOWorker::startReadTask
    - Transport::doRead
        - Socket::recv                                              # retrieve rpc request body of the recv event
            * ib/IBSocket
                - recvBufs_.front() && recvBufs_.pop()
                  postRecv                                          # keep ib recv buffer watermark
            * tcp/TcpSocket
                - ::read
    - IOWorker::processMsg                                          # handle rpc request
      Processor::processMsg
        - unpackMsg
        - unpackSerdeMsg
        - tryToProcessSerdeRequest
        - processSerdeRequest
- common/serde                                                      # reflect rpc handler and invoke
    - CallContext::handle
    - CallContext::call <batchRead>
        - folly::coro::co_awaitTry(...)
          storage/service                                           ######## server, i.e. storage target ########
            - StorageService::batchRead                             # rpc handler
            - StorageOperator::batchRead
                - ... locate target of chain ...
                  ... allocate rdma buffer ...
                  components_.aioReadWorker.enqueue(batch)          # start async disk read (see [#retrieving-chunk-from-disk])
                  await batch.complete()                            # wait for disk read to finish
                  ... copy result to response buffer ...
                      * SEND_DATA_INLINE
                          - batch.copyToRespBuffer(rsp.inlineBuf.data)
                      * !BYPASS_RDMAXMIT  (debugging flag, stands for "bypass rdma transmit")
                          - writeBatch: CallContext::RDMATransmission (IBV_WR_RDMA_WRITE) = (ctx: common/serde/CallContext).writeTransmission()
                                                                    # NOTE rdma is the only transmission implementation
                          batch.addBufferToBatch(writeBatch)
                          writeBatch.applyTransmission(RDMATransmissionReqTimeout)
                                                                    # apply rdma control (see [#client-centric-concurrency-control])
                              - common/net/SERDE_SERVICE RDMAControl::apply
                          ... acquire rdma device semaphore for writing out ...  (default 256 concurrency)
                          writeBatch.post()                         # write blob back to client and notify for each request in batch
                              - common/net/ib/IBSocket
                                  - RDMAReqBatch::post
                                  - rdmaBatch
                                  - rdmaPost
                                  - rdmaPostWR
                                      - ::ibv_post_send IBV_WR_RDMA_WRITE
                                          WRType::RDMA, RDMA, ..., RDMA_LAST | IBV_SEND_SIGNALED
                                                                    # NOTE the RDMA_LAST flag marks the last WR of the batched ibv_post_send (the only signaled one)
                                                                    # NOTE signal issued by RDMA Write does not bump Socket::Events
          makeResponse                                              # sends the response of the batchRead
            - Transport::send
            - ... WRType::SEND

...... client <--- target ......                                    ######## target rdma send response back to client ########

- common/net
    - ...
    - Transport::handleEvents
        - Socket::poll
            * ib/IBSocket
                - ...
                - onRecved
            * tcp/TcpSocket
    - IOWorker::startReadTask
    - Transport::doRead
        - Socket::recv
    - IOWorker::processMsg
      Processor::processMsg
        - unpackMsg
        - unpackSerdeMsg
        - Waiter::instance().post(packet, buf)                      # wake the coro awaiting on the return of rpc
- common/serde/ClientContext::call <batchRead>
  await item.baton                                                  # woken
  ...
  co_return rsp
- client/storage
    - ...
    - StorageClient::batchRead

Client-centric concurrency control

At peak load, incast congestion is observed on the client side. To mitigate this congestion, a request-to-send control mechanism is implemented in storage service and client [1]. After receiving a read request from a client, the service reads data from SSD and asks the client’s permission to transfer the data. The client limits the number of concurrent senders. When a storage service is granted the permission to transfer, it sends the data with an RDMA WRITE followed by an RDMA SEND to notify the client [2]. The request-to-send control increases end-to-end IO latency but it’s required to achieve sustainable high throughput.

-- Fire-Flyer AI-HPC: A Cost-Effective Software-Hardware Co-Design for Deep Learning #VI-B3 Key Technical Points of 3FS


  1. E. B. Nightingale, J. Elson, J. Fan, O. Hofmann, J. Howell, and Y. Suzue, “Flat datacenter storage,” in Proceedings of the 10th USENIX Conference on Operating Systems Design and Implementation, ser. OSDI’12. USA: USENIX Association, 2012, pp. 1–15.
  2. The current source code flags the last RDMA Write of a response as signaled, mitigating the trailing RDMA Send.
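
For a sense of the mechanism, here is a minimal sketch of the client-side limiter, assuming a plain std::counting_semaphore; the actual RDMATransmissionLimiter is coroutine-aware and also records latency metrics.

    // Hedged sketch of request-to-send control on the client side.
    // std::counting_semaphore stands in for 3FS's coroutine-aware limiter.
    #include <cstddef>
    #include <semaphore>

    class TransmissionLimiter {
     public:
      explicit TransmissionLimiter(std::ptrdiff_t maxConcurrent)
          : sem_(maxConcurrent) {}

      // On an incoming RDMAControl apply request from a target: wait until a
      // transmission slot frees up, then grant permission to RDMA Write.
      void acquire() { sem_.acquire(); }

      // On receiving the batched-read rpc response, i.e. the target's RDMA
      // Write has landed: return the slot.
      void release() { sem_.release(); }

     private:
      std::counting_semaphore<> sem_;  // 3FS defaults to 64 concurrent transmissions
    };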

Takeaways

  • Why not fully implement the control on the client side, before issuing the requests?

    Placing the QoS point closer to the data transmission step keeps the RDMA Sends used for RPCs out of the controlled scope, as those are not bandwidth-intensive.

- common/serde                                                      ######## server network layer ########
    - CallContext::RDMATransmission::applyTransmission              # request permission to write to client
- common/net/RDMAControl
    - SERDE_SERVICE RDMAControl::SERDE_SERVICE_METHOD apply

...... target ---> client ......

- ...                                                               ######## client network layer ########
- common/net::RDMAControl                                           # client service impl
    - RDMAControlImpl::apply
        - RDMATransmissionLimiter::co_await                         # limiting concurrency (default 64 concurrent transmissions)
          prepareLatency: Option<Duration> = Waiter::instance().setTransmissionLimiterPtr(req.uuid, limiter, startTs)
                                                                    # NOTE `req.uuid` is generated by the client for each request, see ClientContext::call Waiter::instance().bind(item)
                                                                    # prepareLatency is the time between the client constructing the request and the client approving the data transmission
          transmissionPrepareLatency.addSample(prepareLatency)      # update metrics

...... target <--- client ......

- ...                                                               # target rdma write data to client and reply with response

...... target ---> client ......

- common/net                                                        ######## client network layer ########
    - ...
    - Processor
        - unpackSerdeMsg
        - Waiter::instance().post(packet, buf)
        - item->limiter_->signal(...)                               # release semaphore on receiving batched read rpc response

Retrieving chunk from disk

Takeaways

  • Why not SPDK?

    As cited in Client-centric concurrency control, HF prioritizes “sustainable high throughput” over lower latency. If they can get away with injecting an RDMA Send into every read request for concurrency control, the choice of coroutines + io_uring is well justified.

    Besides, using a file system is just easier on everyone.

    3FS minimizes context switching by batching requests; its io_urings are set up with no additional flags, i.e. no polling mode (see the sketch after this list).

  • Why XFS for disks?

    • Journaled, for strong consistency
    • Optimized for large files (3FS uses append-only files for physical data placement)
    • Stable and commonly available (?)
  • Why prefer AIO over io_uring as default?

    The README of 3FS states that it supports Ubuntu 22.04, whose kernel version is 5.15; however, a 6.x kernel is recommended for io_uring. In my not-so-rigorous testing (on 25 Gbps x4 eRDMA on Ubuntu 22.04), io_uring did not bring a noticeable performance gain over libaio in sequential 4 MiB reads (9597 MiB/s vs. 9520 MiB/s; the conclusion may change if not bounded by the network).

    Or maybe SF-Zhou was just tired of replying to issues related to io_uring.
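
The non-polling io_uring setup mentioned above amounts to initializing the ring with flags == 0 and registering buffers for fixed reads. A hedged sketch, with illustrative queue depth and helper names; per the trace below, 3FS registers each target's files and rdma buffers at startup.

    // Hedged sketch: io_uring with no flags (no SQPOLL / IOPOLL), fixed-buffer
    // reads as in IoUringStatus. Helper names are illustrative.
    #include <liburing.h>

    int setupRing(io_uring* ring, iovec* bufs, unsigned nbufs) {
      // flags == 0: no kernel polling thread; the aio worker reaps completions
      // itself after a batched submit.
      if (io_uring_queue_init(/*entries=*/256, ring, /*flags=*/0) < 0) return -1;
      // Register buffers once so reads can go through io_uring_prep_read_fixed.
      return io_uring_register_buffers(ring, bufs, nbufs);
    }

    int readChunk(io_uring* ring, int fd, void* buf, unsigned len,
                  off_t offset, int bufIndex, void* jobCtx) {
      io_uring_sqe* sqe = io_uring_get_sqe(ring);
      if (!sqe) return -1;                  // submission queue full
      io_uring_prep_read_fixed(sqe, fd, buf, len, offset, bufIndex);
      io_uring_sqe_set_data(sqe, jobCtx);   // carried back in the cqe
      return io_uring_submit(ring);         // 3FS submits whole batches at once
    }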

- storage/service                                                   ######## target service impl ########
    - StorageOperator::batchRead
        - ...
          components_.aioReadWorker.enqueue(batch)                  # start async disk read batch
          await batch.complete()                                    # wait for disk read to finish

- storage/aio                                                       ######## target disk io ########
    - AioReadWorker::run                                            # async io event loop, on folly::CPUThreadPoolExecutor, default 32 threads
                                                                    # NOTE 3FS modified the thread pool impl so it essentially becomes a fixed-size pool, never shrinking
        - it: AioReadJobIterator = queue_.dequeue()                 # wait on common/utils/BoundedQueue on folly::MPMCQueue<Dynamic = false> for aio job batch
                                                                    # NOTE folly::MPMCQueue<Dynamic = false> uses adaptive futex for synchronizing
          &status = use io_uring ? IoUringStatus : AioStatus        # choose which async io engine to use, libaio or liburing
                                                                    # NOTE the `IoStatus`es live within the closure provided to the executor; they are exclusive to each event loop
                                                                    # NOTE `IoUringStatus` is associated with each storage target, initialized with the target's files and rdma buffers registered on start
          status.setAioReadJobIterator(it)                          # set job batch description to the executor-local engine

- storage/aio
    * AioStatus                                                     # NOTE default engine, possibly due to compatibility concerns
        - collect
            - storageTarget->aioPrepareRead(job: BatchReadJob/AioReadJob)
                * ChunkEngine::aioPrepareRead                       # get chunk metadata if not already present in job
                    - storage/chunk_engine
                        - cxx/Engine::get_raw_chunk                 # rust/cxx bridge
                        - core/Engine::get                          # lock the meta cache entry
                        - core/Engine::get_with_entry               # unwrap cached or create reference from allocator
                        - ... the chunk meta `Arc<Chunk>` is leaked to cxx ...
                * ChunkReplica::aioPrepareRead
                    - ... directly get from kv, uncached, not ref-counted (?) ...
              ::io_prep_pread                                       # prepare for each job
          submit
            - ::io_submit                                           # submit prepared jobs in batch
          while status.inflight()  IoStatus::reap                   # reap until no inflight jobs
            - ::io_getevents
              setReadJobResult(event.data, event.res)               # libaio analogue: event.data carries the job, event.res the result
                - job.setResult
                    - state_.chunkEngineJob.reset()                 # explicitly return `Arc<Chunk>` to rust
                        (batch_: BatchReadJob).finish(this: AioReadJob)
                        - baton_.post()                             # wake when all jobs in the batch have finished
    * IoUringStatus
        - collect
            - storageTarget->aioPrepareRead(job: BatchReadJob/AioReadJob)
                - ...
              sqe = ::io_uring_get_sqe
              ::io_uring_prep_read_fixed(sqe)                       # prepare read with pre-allocated buffer
              ::io_uring_sqe_set_data(sqe, &job)                    # accompany job context
          submit
            - ::io_uring_submit
          while status.inflight()  IoStatus::reap                   # reap until no inflight jobs
            - ::io_uring_wait_cqes
              ::io_uring_for_each_cqe
                - setReadJobResult(::io_uring_cqe_get_data(cqe), cqe->res)
                    - ...
              ::io_uring_cq_advance

- storage/service                                                   ######## target service impl ########
    - StorageOperator::batchRead
        - ...
          await batch.complete()                                    # woken
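
For reference, the AioStatus collect / submit / reap cycle above maps onto libaio roughly as follows; a hedged sketch where everything but the libaio calls is illustrative.

    // Hedged sketch of a collect / submit / reap cycle with libaio.
    // The iocbs are assumed to be prepared beforehand via ::io_prep_pread.
    #include <libaio.h>
    #include <vector>

    void readBatch(io_context_t ctx, std::vector<iocb>& jobs) {
      std::vector<iocb*> cbs;
      cbs.reserve(jobs.size());
      for (auto& cb : jobs) cbs.push_back(&cb);  // collect

      int inflight = ::io_submit(ctx, cbs.size(), cbs.data());  // submit in batch
      if (inflight < 0) return;                  // submission failed

      io_event events[64];
      while (inflight > 0) {                     // reap until no inflight jobs
        int n = ::io_getevents(ctx, 1, 64, events, /*timeout=*/nullptr);
        if (n < 0) break;
        for (int i = 0; i < n; ++i) {
          // events[i].data carries the job context, events[i].res the result,
          // mirroring setReadJobResult(event.data, event.res) in the trace.
        }
        inflight -= n;
      }
    }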

Write

TODO


Recovery

TODO

Data placement

Scaling on-the-fly

Takeaways

  • In case the disk / node backing a storage target fails, its traffic is redirected to the secondary (or tertiary) targets on its chains. Allocating multiple storage targets on the same disk lets the disk be part of multiple chains, so more targets can share its redirected traffic, alleviating congestion.

  • There are rules for targets and chains:

    • A target can only be referenced by at most one chain at any given time.
    • A chain must have at least one target at any given time.
    • For any chain, its constituent targets must come from different nodes.
    • The last target of a chain cannot be removed, i.e. once a chain is created it cannot be removed.

    The rules above imply that the # of targets must be greater than or equal to the # of chains, and the # of nodes must be at least the # of targets in the longest chain.
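
An illustrative check of these invariants; all types and names below are hypothetical, not 3FS symbols.

    // Hedged sketch: validating the chain / target rules above.
    // Hypothetical types, for illustration only.
    #include <set>
    #include <vector>

    struct Target { int nodeId; int targetId; };
    using Chain = std::vector<Target>;

    bool validPlacement(const std::vector<Chain>& chains) {
      std::set<int> seenTargets;
      for (const auto& chain : chains) {
        if (chain.empty()) return false;          // a chain has >= 1 target
        std::set<int> nodesInChain;
        for (const auto& t : chain) {
          if (!seenTargets.insert(t.targetId).second)
            return false;                         // a target joins <= 1 chain
          if (!nodesInChain.insert(t.nodeId).second)
            return false;                         // targets come from different nodes
        }
      }
      // Hence #targets >= #chains, and #nodes >= length of the longest chain.
      return true;
    }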

Getting the capacity of the file system

TLDR: the capacity of a 3FS cluster is the sum of the capacities of all its storage nodes.

The capacity has nothing to do with chains / targets / replication; it is the naïve summation of all disks in the cluster. That is to say, if a 3FS is 2-replicated, the used space reported by df will be slightly more than twice that reported by du, accounting for all the chunk replicas and the metadata databases.

  • Querying (cached) capacity from meta service

      - meta/store/ops                                        ######## meta service impl ########
          - StatFs::run
      - meta/components
          - FileHelper::statFs
            ... return cachedFsStatus_ ...
    
  • Refreshing FileHelper::cachedFsStatus_

      - meta/components
          - FileHelper::updateStatFs
            nodes = mgmtdClient_->getRoutingInfo()->getNodeBy(NodeType::STORAGE && active)
            ... reduce cap, free from nodes ...
    

Managing storage target’s state

offline-target

Marks a storage target as offline; it can then be safely removed from its chain.

TODO

remove-target

Removes a storage target from the cluster.

TODO

create-target

Creates a new storage target belonging to a specific chain. The target will be online and start syncing after creation.

TODO

Updating placement of a chain

update-chain

Adds or removes targets from a chain.

TODO

rotate-lastsrv

The LASTSRV target in a chain cannot be offlined. However, we can choose which target in a chain is LASTSRV by rotating the order of targets in the chain.

TODO

upload-chains

Adds chains to the cluster.

TODO

upload-chain-table

Adds new chains to the chain table, so they can start serving.

TODO

Physical placement of chunks

Space allocation
Reference: DeepSeek 3FS Interpretation and Source Code Analysis (3): Storage Module – Space Allocation (article in Chinese: DeepSeek 3FS解读与源码分析(3):Storage模块解读 - 空间分配)

Storage hierarchy

deepseek-ai/3fs chunk-engine - storage hierarchy

Node
`-- 1:10~20 Disks (??t)
    `-- 1:256 Files (???g)
        `-- 1:~1k Groups (16m/128m/1g)
            ( `-- 1:256 Chunks, spread over Files (64k/512k/4m) )

When chunks are 512k, expect (20 * 30t disks / 512k chunk =) ~1.2b chunks, or (20 * 30t disks / 512k chunk / 256 chunks per group =) ~5m groups per node.
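
The same back-of-envelope arithmetic, spelled out (the 20 disks x 30 TB per-node figures are carried over from above):

    // Back-of-envelope: chunks and groups per node, per the figures above.
    #include <cstdio>

    int main() {
      const double disks = 20, diskBytes = 30e12;             // 20 x 30 TB disks per node
      const double chunkBytes = 512e3;                        // 512 KB chunks
      const double chunksPerGroup = 256;
      const double chunks = disks * diskBytes / chunkBytes;   // ~1.17e9, i.e. ~1.2b
      const double groups = chunks / chunksPerGroup;          // ~4.6e6, i.e. ~5m
      std::printf("~%.2g chunks, ~%.2g groups per node\n", chunks, groups);
    }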

Allocate

TODO

Deallocate

TODO

Defragmentation

TODO