snix_castore/fs/mod.rs

1mod file_attr;
2mod inode_tracker;
3mod inodes;
4pub mod root_nodes;
5
6#[cfg(feature = "fuse")]
7pub mod fuse;
8
9#[cfg(feature = "virtiofs")]
10pub mod virtiofs;
11
12pub use self::root_nodes::RootNodes;
13use self::{
14    file_attr::ROOT_FILE_ATTR,
15    inode_tracker::InodeTracker,
16    inodes::{DirectoryInodeData, InodeData},
17};
18use crate::{
19    B3Digest, Node,
20    blobservice::{BlobReader, BlobService},
21    directoryservice::DirectoryService,
22    path::PathComponent,
23};
24use bstr::ByteVec;
25use fuse_backend_rs::api::filesystem::{
26    Context, FileSystem, FsOptions, GetxattrReply, ListxattrReply, ROOT_ID,
27};
28use fuse_backend_rs::{
29    abi::fuse_abi::{Attr, OpenOptions, stat64},
30    api::filesystem::Entry,
31};
32use futures::{StreamExt, stream::BoxStream};
33use parking_lot::RwLock;
34use std::sync::Mutex;
35use std::{
36    collections::HashMap,
37    io,
38    sync::atomic::AtomicU64,
39    sync::{Arc, atomic::Ordering},
40    time::Duration,
41};
42use std::{ffi::CStr, io::Cursor};
43use tokio::io::{AsyncReadExt, AsyncSeekExt};
44use tracing::{Span, debug, error, instrument, warn};
45
46/// This implements a read-only [FileSystem] for a snix-castore
47/// with the passed [BlobService], [DirectoryService] and [RootNodes].
48///
49/// Linux uses inodes in filesystems. When implementing the trait, most calls
50/// *are for* a given inode.
51///
52/// This means, we need to have a stable mapping of inode numbers to the
53/// corresponding store nodes.
54///
55/// We internally delegate all inode allocation and state keeping to the
56/// inode tracker.
57/// We store a mapping from currently "explored" names in the root to their
58/// inode.
59///
60/// There's some places where inodes are allocated / data inserted into
61/// the inode tracker, if not allocated before already:
62///  - Processing a `lookup` request, either in the mount root, or somewhere
63///    deeper.
64///  - Processing a `readdir` request
65///
66///  Things pointing to the same contents get the same inodes, irrespective of
67///  their own location.
68///  This means:
69///  - Symlinks with the same target will get the same inode.
70///  - Regular/executable files with the same contents will get the same inode
71///  - Directories with the same contents will get the same inode.
72///
73/// Due to the above being valid across the whole store, and considering the
74/// merkle structure is a DAG, not a tree, this also means we can't do "bucketed
75/// allocation", aka reserve Directory.size inodes for each directory node we
76/// explore.
77/// Tests for this live in the snix-store crate.
78pub struct SnixStoreFs<BS, DS, RN> {
79    blob_service: BS,
80    directory_service: DS,
81    root_nodes_provider: RN,
82
83    /// Whether to (try) listing elements in the root.
84    list_root: bool,
85
86    /// If uid/gid should be overridden, their values
87    uid_gid_override: Option<(u32, u32)>,
88
89    /// Whether to expose blob and directory digests as extended attributes.
90    show_xattr: bool,
91
92    /// This maps a given basename in the root to the inode we allocated for the node.
93    root_nodes: RwLock<HashMap<PathComponent, u64>>,
94
95    /// This keeps track of inodes and data alongside them.
96    inode_tracker: RwLock<InodeTracker>,
97
98    // FUTUREWORK: have a generic container type for dir/file handles and handle
99    // allocation.
100    /// This holds all opendir handles (for the root inode), keyed by the handle
101    /// returned from the opendir call.
102    /// For each handle, we store an enumerated Result<(PathComponent, Node), crate::Error>.
103    /// The index is needed as we need to send offset information.
104    #[allow(clippy::type_complexity)]
105    dir_handles: RwLock<
106        HashMap<
107            u64,
108            (
109                Span,
110                Arc<
111                    Mutex<
112                        BoxStream<
113                            'static,
114                            (usize, Result<(PathComponent, Node), root_nodes::Error>),
115                        >,
116                    >,
117                >,
118            ),
119        >,
120    >,
121
122    next_dir_handle: AtomicU64,
123
124    /// This holds all open file handles
125    #[allow(clippy::type_complexity)]
126    file_handles: RwLock<HashMap<u64, (Span, Arc<Mutex<Box<dyn BlobReader>>>)>>,
127
128    next_file_handle: AtomicU64,
129
130    tokio_handle: tokio::runtime::Handle,
131}
132
133impl<BS, DS, RN> SnixStoreFs<BS, DS, RN>
134where
135    BS: BlobService,
136    DS: DirectoryService,
137    RN: RootNodes,
138{
139    pub fn new(
140        blob_service: BS,
141        directory_service: DS,
142        root_nodes_provider: RN,
143        list_root: bool,
144        uid_gid_override: Option<(u32, u32)>,
145        show_xattr: bool,
146    ) -> Self {
147        Self {
148            blob_service,
149            directory_service,
150            root_nodes_provider,
151
152            list_root,
153            uid_gid_override,
154            show_xattr,
155
156            root_nodes: RwLock::new(HashMap::default()),
157            inode_tracker: RwLock::new(Default::default()),
158
159            dir_handles: RwLock::new(Default::default()),
160            next_dir_handle: AtomicU64::new(1),
161
162            file_handles: RwLock::new(Default::default()),
163            next_file_handle: AtomicU64::new(1),
164            tokio_handle: tokio::runtime::Handle::current(),
165        }
166    }
167
168    /// Retrieves the inode for a given root node basename, if present.
169    /// This obtains a read lock on self.root_nodes.
170    fn get_inode_for_root_name(&self, name: &PathComponent) -> Option<u64> {
171        self.root_nodes.read().get(name).cloned()
172    }
173
174    /// For a given inode, look up the given directory behind it (from
175    /// self.inode_tracker), and return its children.
176    /// The inode_tracker MUST know about this inode already, and it MUST point
177    /// to a [InodeData::Directory].
178    /// It is ok if it's a [DirectoryInodeData::Sparse] - in that case, a lookup
179    /// in self.directory_service is performed, and self.inode_tracker is updated with the
180    /// [DirectoryInodeData::Populated].
181    #[allow(clippy::type_complexity)]
182    #[instrument(skip(self), err)]
183    fn get_directory_children(
184        &self,
185        ino: u64,
186    ) -> io::Result<(B3Digest, Vec<(u64, PathComponent, Node)>)> {
187        let data = self.inode_tracker.read().get(ino).unwrap();
188        match *data {
189            // if it's populated already, return children.
190            InodeData::Directory(DirectoryInodeData::Populated(parent_digest, ref children)) => {
191                Ok((parent_digest, children.clone()))
192            }
193            // if it's sparse, fetch data using directory_service, populate child nodes
194            // and update it in [self.inode_tracker].
195            InodeData::Directory(DirectoryInodeData::Sparse(parent_digest, _)) => {
196                let directory = self
197                    .tokio_handle
198                    .block_on(async { self.directory_service.get(&parent_digest).await })
199                    .map_err(|err| {
200                        warn!(%err, "error from directory service");
201                        io::Error::other(err)
202                    })?
203                    .ok_or_else(|| {
204                        warn!(directory.digest=%parent_digest, "directory not found");
205                        // If the Directory can't be found, this is a hole, bail out.
206                        io::Error::from_raw_os_error(libc::EIO)
207                    })?;
208
209                // Turn the retrieved directory into a InodeData::Directory(DirectoryInodeData::Populated(..)),
210                // allocating inodes for the children on the way.
211                // FUTUREWORK: there's a bunch of cloning going on here, which we can probably avoid.
212                let children = {
213                    let mut inode_tracker = self.inode_tracker.write();
214
215                    let children: Vec<(u64, PathComponent, Node)> = directory
216                        .into_nodes()
217                        .map(|(child_name, child_node)| {
218                            let inode_data = InodeData::from_node(&child_node);
219
220                            let child_ino = inode_tracker.put(inode_data);
221                            (child_ino, child_name, child_node)
222                        })
223                        .collect();
224
225                    // replace.
226                    inode_tracker.replace(
227                        ino,
228                        Arc::new(InodeData::Directory(DirectoryInodeData::Populated(
229                            parent_digest,
230                            children.clone(),
231                        ))),
232                    );
233
234                    children
235                };
236
237                Ok((parent_digest, children))
238            }
239            // if the parent inode was not a directory, this doesn't make sense
240            InodeData::Regular(..) | InodeData::Symlink(_) => {
241                Err(io::Error::from_raw_os_error(libc::ENOTDIR))
242            }
243        }
244    }
245
246    /// This will turn a lookup request for a name in the root to a ino and
247    /// [InodeData].
248    /// It will peek in [self.root_nodes], and then either look it up from
249    /// [self.inode_tracker],
250    /// or otherwise fetch from [self.root_nodes], and then insert into
251    /// [self.inode_tracker].
252    /// In the case the name can't be found, a libc::ENOENT is returned.
253    fn name_in_root_to_ino_and_data(
254        &self,
255        name: &PathComponent,
256    ) -> io::Result<(u64, Arc<InodeData>)> {
257        // Look up the inode for that root node.
258        // If there's one, [self.inode_tracker] MUST also contain the data,
259        // which we can then return.
260        if let Some(inode) = self.get_inode_for_root_name(name) {
261            return Ok((
262                inode,
263                self.inode_tracker
264                    .read()
265                    .get(inode)
266                    .expect("must exist")
267                    .to_owned(),
268            ));
269        }
270
271        // We don't have it yet, look it up in [self.root_nodes].
272        match self
273            .tokio_handle
274            .block_on(async { self.root_nodes_provider.get_by_basename(name).await })
275        {
276            // if there was an error looking up the root node, propagate up an IO error.
277            Err(_e) => Err(io::Error::from_raw_os_error(libc::EIO)),
278            // the root node doesn't exist, so the file doesn't exist.
279            Ok(None) => Err(io::Error::from_raw_os_error(libc::ENOENT)),
280            // The root node does exist
281            Ok(Some(root_node)) => {
282                // Let's check if someone else beat us to updating the inode tracker and
283                // root_nodes map. This avoids locking inode_tracker for writing.
284                if let Some(ino) = self.root_nodes.read().get(name) {
285                    return Ok((
286                        *ino,
287                        self.inode_tracker.read().get(*ino).expect("must exist"),
288                    ));
289                }
290
291                // Only in case it doesn't, lock [self.root_nodes] and
292                // [self.inode_tracker] for writing.
293                let mut root_nodes = self.root_nodes.write();
294                let mut inode_tracker = self.inode_tracker.write();
295
296                // insert the (sparse) inode data and register in
297                // self.root_nodes.
298                let inode_data = InodeData::from_node(&root_node);
299                let ino = inode_tracker.put(inode_data.clone());
300                root_nodes.insert(name.to_owned(), ino);
301
302                Ok((ino, Arc::new(inode_data)))
303            }
304        }
305    }
306
307    /// Helper function, converting a [InodeData] to [Attr],
308    /// applying uid/gid override if configured.
309    fn inode_data_to_attr(&self, inode_data: &InodeData, ino: u64) -> Attr {
310        let mode = match inode_data {
311            InodeData::Regular(_, _, false) => libc::S_IFREG | 0o444,
312            // executable
313            InodeData::Regular(_, _, true) => libc::S_IFREG | 0o555,
314            InodeData::Symlink(_) => libc::S_IFLNK | 0o444,
315            InodeData::Directory(_) => libc::S_IFDIR | 0o555,
316        };
317        // libc::S_IFREG, libc::S_IFLNK & libc::S_IFDIR are u32 on Linux and u16 on MacOS
318        #[cfg(target_os = "macos")]
319        let mode = mode as u32;
320        let mut attr = Attr {
321            ino,
322            // FUTUREWORK: play with this numbers, as it affects read sizes for client applications.
323            blocks: 1024,
324            size: match inode_data {
325                InodeData::Regular(_, size, _) => *size,
326                InodeData::Symlink(target) => target.len() as u64,
327                InodeData::Directory(DirectoryInodeData::Sparse(_, size)) => *size,
328                InodeData::Directory(DirectoryInodeData::Populated(_, children)) => {
329                    children.len() as u64
330                }
331            },
332            mode,
333            mtime: 1, // Everything in /nix/store must have timestamp "1".
334            ..Default::default()
335        };
336
337        if let Some((uid, gid)) = self.uid_gid_override {
338            attr.uid = uid;
339            attr.gid = gid;
340        }
341
342        attr
343    }
344}
345fn attr_to_fuse_entry(attr: Attr) -> Entry {
346    Entry {
347        inode: attr.ino,
348        attr: attr.into(),
349        attr_timeout: Duration::MAX,
350        entry_timeout: Duration::MAX,
351        ..Default::default()
352    }
353}
354
355/// Returns the u32 fuse type
356fn node_to_fuse_type(node: &Node) -> u32 {
357    #[allow(clippy::let_and_return)]
358    let ty = match node {
359        Node::Directory { .. } => libc::S_IFDIR,
360        Node::File { .. } => libc::S_IFREG,
361        Node::Symlink { .. } => libc::S_IFLNK,
362    };
363    // libc::S_IFDIR is u32 on Linux and u16 on MacOS
364    #[cfg(target_os = "macos")]
365    let ty = ty as u32;
366
367    ty
368}
369
/// Name of the extended attribute exposing the digest of a directory.
const XATTR_NAME_DIRECTORY_DIGEST: &[u8] = "user.snix.castore.directory.digest".as_bytes();
/// Name of the extended attribute exposing the digest of a file's blob.
const XATTR_NAME_BLOB_DIGEST: &[u8] = "user.snix.castore.blob.digest".as_bytes();
372
/// Lets [SnixStoreFs] act as a fuse-backend-rs `Layer`.
/// Only compiled with the `virtiofs` feature on Linux.
#[cfg(all(feature = "virtiofs", target_os = "linux"))]
impl<BS: BlobService, DS: DirectoryService, RN: RootNodes> fuse_backend_rs::api::filesystem::Layer
    for SnixStoreFs<BS, DS, RN>
{
    /// The root of this layer is the well-known FUSE root inode.
    fn root_inode(&self) -> Self::Inode {
        ROOT_ID
    }
}
384
385impl<BS, DS, RN> FileSystem for SnixStoreFs<BS, DS, RN>
386where
387    BS: BlobService,
388    DS: DirectoryService,
389    RN: RootNodes,
390{
391    type Handle = u64;
392    type Inode = u64;
393
394    fn init(&self, _capable: FsOptions) -> io::Result<FsOptions> {
395        let mut opts = FsOptions::empty();
396
397        // allow more than one pending read request per file-handle at any time
398        opts |= FsOptions::ASYNC_READ;
399
400        #[cfg(target_os = "linux")]
401        {
402            // the filesystem supports readdirplus
403            opts |= FsOptions::DO_READDIRPLUS;
404            // issue both readdir and readdirplus depending on the information expected to be required
405            opts |= FsOptions::READDIRPLUS_AUTO;
406            // allow concurrent lookup() and readdir() requests for the same directory
407            opts |= FsOptions::PARALLEL_DIROPS;
408            // have the kernel cache symlink contents
409            opts |= FsOptions::CACHE_SYMLINKS;
410        }
411        // TODO: figure out what dawrin options make sense.
412
413        Ok(opts)
414    }
415
416    #[tracing::instrument(skip_all, fields(rq.inode = inode))]
417    fn getattr(
418        &self,
419        _ctx: &Context,
420        inode: Self::Inode,
421        _handle: Option<Self::Handle>,
422    ) -> io::Result<(stat64, Duration)> {
423        let attr = if inode == ROOT_ID {
424            ROOT_FILE_ATTR
425        } else {
426            self.inode_data_to_attr(
427                self.inode_tracker
428                    .read()
429                    .get(inode)
430                    .ok_or_else(|| io::Error::from_raw_os_error(libc::ENOENT))?
431                    .as_ref(),
432                inode,
433            )
434        };
435
436        Ok((attr.into(), Duration::MAX))
437    }
438
439    #[tracing::instrument(skip_all, fields(rq.parent_inode = parent, rq.name = ?name))]
440    fn lookup(
441        &self,
442        _ctx: &Context,
443        parent: Self::Inode,
444        name: &std::ffi::CStr,
445    ) -> io::Result<Entry> {
446        debug!("lookup");
447
448        // convert the CStr to a PathComponent
449        // If it can't be converted, we definitely don't have anything here.
450        let name: PathComponent = name.try_into().map_err(|_| std::io::ErrorKind::NotFound)?;
451
452        // This goes from a parent inode to a node.
453        let (ino, inode_data) = if parent == ROOT_ID {
454            // If the parent is [ROOT_ID], we need to check [self.root_nodes] (fetching from a [RootNode] provider if needed)
455            self.name_in_root_to_ino_and_data(&name)?
456        } else {
457            // else the parent must be a directory, otherwise we would never come up with this request.
458            // Lookup the parent in [self.inode_tracker] (which must be a [InodeData::Directory]), and find the child with that name.
459            let (parent_digest, children) = self.get_directory_children(parent)?;
460
461            Span::current().record("directory.digest", parent_digest.to_string());
462            // Search for that name in the list of children and return the FileAttrs.
463            // FUTUREWORK: we know children are sorted.
464            let (child_ino, _, child_node) = children
465                .into_iter()
466                .find(|(_, n, _)| n == &name)
467                .ok_or_else(|| {
468                    // Child not found, return ENOENT.
469                    io::Error::from_raw_os_error(libc::ENOENT)
470                })?;
471
472            // Reply with the file attributes for the child,
473            (child_ino, Arc::new(InodeData::from_node(&child_node)))
474        };
475
476        debug!(inode_data=?&inode_data, ino=ino, "Some");
477
478        let attr = self.inode_data_to_attr(
479            self.inode_tracker
480                .read()
481                .get(ino)
482                .ok_or_else(|| io::Error::from_raw_os_error(libc::ENOENT))?
483                .as_ref(),
484            ino,
485        );
486        Ok(attr_to_fuse_entry(attr))
487    }
488
489    #[tracing::instrument(skip_all, fields(rq.inode = inode))]
490    fn opendir(
491        &self,
492        _ctx: &Context,
493        inode: Self::Inode,
494        _flags: u32,
495    ) -> io::Result<(Option<Self::Handle>, OpenOptions)> {
496        // In case opendir on the root is called, we provide the handle, as re-entering that listing is expensive.
497        // For all other directory inodes we just let readdir take care of it.
498        if inode == ROOT_ID {
499            if !self.list_root {
500                return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo
501            }
502
503            let stream = self.root_nodes_provider.list().enumerate().boxed();
504
505            // Put the stream into [self.dir_handles].
506            // TODO: this will overflow after 2**64 operations,
507            // which is fine for now.
508            // See https://cl.tvl.fyi/c/depot/+/8834/comment/a6684ce0_d72469d1
509            // for the discussion on alternatives.
510            let dh = self.next_dir_handle.fetch_add(1, Ordering::SeqCst);
511
512            self.dir_handles
513                .write()
514                .insert(dh, (Span::current(), Arc::new(Mutex::new(stream))));
515
516            return Ok((Some(dh), OpenOptions::NONSEEKABLE));
517        }
518
519        let mut opts = OpenOptions::empty();
520
521        opts |= OpenOptions::KEEP_CACHE;
522        #[cfg(target_os = "linux")]
523        {
524            opts |= OpenOptions::CACHE_DIR;
525        }
526        // allow caching this directory contents, don't invalidate on open
527        Ok((None, opts))
528    }
529
530    #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.handle = handle, rq.offset = offset), parent = self.dir_handles.read().get(&handle).and_then(|x| x.0.id()))]
531    fn readdir(
532        &self,
533        _ctx: &Context,
534        inode: Self::Inode,
535        handle: Self::Handle,
536        _size: u32,
537        offset: u64,
538        add_entry: &mut dyn FnMut(fuse_backend_rs::api::filesystem::DirEntry) -> io::Result<usize>,
539    ) -> io::Result<()> {
540        debug!("readdir");
541
542        if inode == ROOT_ID {
543            if !self.list_root {
544                return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo
545            }
546
547            // get the stream from [self.dir_handles]
548            let dir_handles = self.dir_handles.read();
549            let (_span, stream) = dir_handles.get(&handle).ok_or_else(|| {
550                warn!("dir handle {} unknown", handle);
551                io::Error::from_raw_os_error(libc::EIO)
552            })?;
553
554            let mut stream = stream
555                .lock()
556                .map_err(|_| io::Error::other("mutex poisoned"))?;
557
558            while let Some((i, n)) = self.tokio_handle.block_on(async { stream.next().await }) {
559                let (name, node) = n.map_err(|e| {
560                    warn!("failed to retrieve root node: {}", e);
561                    io::Error::from_raw_os_error(libc::EIO)
562                })?;
563
564                // obtain the inode, or allocate a new one.
565                let ino = self.get_inode_for_root_name(&name).unwrap_or_else(|| {
566                    // insert the (sparse) inode data and register in
567                    // self.root_nodes.
568                    let ino = self.inode_tracker.write().put(InodeData::from_node(&node));
569                    self.root_nodes.write().insert(name.clone(), ino);
570                    ino
571                });
572
573                let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry {
574                    ino,
575                    offset: offset + (i as u64) + 1,
576                    type_: node_to_fuse_type(&node),
577                    name: name.as_ref(),
578                })?;
579                // If the buffer is full, add_entry will return `Ok(0)`.
580                if written == 0 {
581                    break;
582                }
583            }
584            return Ok(());
585        }
586
587        // Non root-node case: lookup the children, or return an error if it's not a directory.
588        let (parent_digest, children) = self.get_directory_children(inode)?;
589        Span::current().record("directory.digest", parent_digest.to_string());
590
591        for (i, (ino, child_name, child_node)) in
592            children.into_iter().skip(offset as usize).enumerate()
593        {
594            // the second parameter will become the "offset" parameter on the next call.
595            let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry {
596                ino,
597                offset: offset + (i as u64) + 1,
598                type_: node_to_fuse_type(&child_node),
599                name: child_name.as_ref(),
600            })?;
601            // If the buffer is full, add_entry will return `Ok(0)`.
602            if written == 0 {
603                break;
604            }
605        }
606
607        Ok(())
608    }
609
610    #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.handle = handle), parent = self.dir_handles.read().get(&handle).and_then(|x| x.0.id()))]
611    fn readdirplus(
612        &self,
613        _ctx: &Context,
614        inode: Self::Inode,
615        handle: Self::Handle,
616        _size: u32,
617        offset: u64,
618        add_entry: &mut dyn FnMut(
619            fuse_backend_rs::api::filesystem::DirEntry,
620            Entry,
621        ) -> io::Result<usize>,
622    ) -> io::Result<()> {
623        debug!("readdirplus");
624
625        if inode == ROOT_ID {
626            if !self.list_root {
627                return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo
628            }
629
630            // get the stream from [self.dir_handles]
631            let dir_handles = self.dir_handles.read();
632            let (_span, stream) = dir_handles.get(&handle).ok_or_else(|| {
633                warn!("dir handle {} unknown", handle);
634                io::Error::from_raw_os_error(libc::EIO)
635            })?;
636
637            let mut stream = stream
638                .lock()
639                .map_err(|_| io::Error::other("mutex poisoned"))?;
640
641            while let Some((i, n)) = self.tokio_handle.block_on(async { stream.next().await }) {
642                let (name, node) = n.map_err(|e| {
643                    warn!("failed to retrieve root node: {}", e);
644                    io::Error::from_raw_os_error(libc::EPERM)
645                })?;
646
647                let inode_data = InodeData::from_node(&node);
648
649                // obtain the inode, or allocate a new one.
650                let ino = self.get_inode_for_root_name(&name).unwrap_or_else(|| {
651                    // insert the (sparse) inode data and register in
652                    // self.root_nodes.
653                    let ino = self.inode_tracker.write().put(inode_data.clone());
654                    self.root_nodes.write().insert(name.clone(), ino);
655                    ino
656                });
657
658                let written = add_entry(
659                    fuse_backend_rs::api::filesystem::DirEntry {
660                        ino,
661                        offset: offset + (i as u64) + 1,
662                        type_: node_to_fuse_type(&node),
663                        name: name.as_ref(),
664                    },
665                    attr_to_fuse_entry(self.inode_data_to_attr(&inode_data, ino)),
666                )?;
667                // If the buffer is full, add_entry will return `Ok(0)`.
668                if written == 0 {
669                    break;
670                }
671            }
672            return Ok(());
673        }
674
675        // Non root-node case: lookup the children, or return an error if it's not a directory.
676        let (parent_digest, children) = self.get_directory_children(inode)?;
677        Span::current().record("directory.digest", parent_digest.to_string());
678
679        for (i, (ino, name, child_node)) in children.into_iter().skip(offset as usize).enumerate() {
680            let inode_data = InodeData::from_node(&child_node);
681
682            // the second parameter will become the "offset" parameter on the next call.
683            let written = add_entry(
684                fuse_backend_rs::api::filesystem::DirEntry {
685                    ino,
686                    offset: offset + (i as u64) + 1,
687                    type_: node_to_fuse_type(&child_node),
688                    name: name.as_ref(),
689                },
690                attr_to_fuse_entry(self.inode_data_to_attr(&inode_data, ino)),
691            )?;
692            // If the buffer is full, add_entry will return `Ok(0)`.
693            if written == 0 {
694                break;
695            }
696        }
697
698        Ok(())
699    }
700
701    #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.handle = handle), parent = self.dir_handles.read().get(&handle).and_then(|x| x.0.id()))]
702    fn releasedir(
703        &self,
704        _ctx: &Context,
705        inode: Self::Inode,
706        _flags: u32,
707        handle: Self::Handle,
708    ) -> io::Result<()> {
709        if inode == ROOT_ID {
710            // drop the stream.
711            if let Some(stream) = self.dir_handles.write().remove(&handle) {
712                // drop it, which will close it.
713                drop(stream)
714            } else {
715                warn!("dir handle not found");
716            }
717        }
718
719        Ok(())
720    }
721
722    #[tracing::instrument(skip_all, fields(rq.inode = inode))]
723    fn open(
724        &self,
725        _ctx: &Context,
726        inode: Self::Inode,
727        _flags: u32,
728        _fuse_flags: u32,
729    ) -> io::Result<(Option<Self::Handle>, OpenOptions, Option<u32>)> {
730        if inode == ROOT_ID {
731            return Err(io::Error::from_raw_os_error(libc::ENOSYS));
732        }
733
734        // lookup the inode
735        match *self.inode_tracker.read().get(inode).unwrap() {
736            // read is invalid on non-files.
737            InodeData::Directory(..) | InodeData::Symlink(_) => {
738                warn!("is directory");
739                Err(io::Error::from_raw_os_error(libc::EISDIR))
740            }
741            InodeData::Regular(ref blob_digest, _blob_size, _) => {
742                Span::current().record("blob.digest", blob_digest.to_string());
743
744                match self
745                    .tokio_handle
746                    .block_on(async { self.blob_service.open_read(blob_digest).await })
747                {
748                    Ok(None) => {
749                        warn!("blob not found");
750                        Err(io::Error::from_raw_os_error(libc::EIO))
751                    }
752                    Err(e) => {
753                        warn!(e=?e, "error opening blob");
754                        Err(io::Error::from_raw_os_error(libc::EIO))
755                    }
756                    Ok(Some(blob_reader)) => {
757                        // get a new file handle
758                        // TODO: this will overflow after 2**64 operations,
759                        // which is fine for now.
760                        // See https://cl.tvl.fyi/c/depot/+/8834/comment/a6684ce0_d72469d1
761                        // for the discussion on alternatives.
762                        let fh = self.next_file_handle.fetch_add(1, Ordering::SeqCst);
763
764                        self.file_handles
765                            .write()
766                            .insert(fh, (Span::current(), Arc::new(Mutex::new(blob_reader))));
767
768                        Ok((
769                            Some(fh),
770                            // Don't invalidate the data cache on open.
771                            OpenOptions::KEEP_CACHE,
772                            None,
773                        ))
774                    }
775                }
776            }
777        }
778    }
779
780    #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.handle = handle), parent = self.file_handles.read().get(&handle).and_then(|x| x.0.id()))]
781    fn release(
782        &self,
783        _ctx: &Context,
784        inode: Self::Inode,
785        _flags: u32,
786        handle: Self::Handle,
787        _flush: bool,
788        _flock_release: bool,
789        _lock_owner: Option<u64>,
790    ) -> io::Result<()> {
791        match self.file_handles.write().remove(&handle) {
792            // drop the blob reader, which will close it.
793            Some(blob_reader) => drop(blob_reader),
794            None => {
795                // These might already be dropped if a read error occured.
796                warn!("file handle not found");
797            }
798        }
799
800        Ok(())
801    }
802
803    #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.handle = handle, rq.offset = offset, rq.size = size), parent = self.file_handles.read().get(&handle).and_then(|x| x.0.id()))]
804    fn read(
805        &self,
806        _ctx: &Context,
807        inode: Self::Inode,
808        handle: Self::Handle,
809        w: &mut dyn fuse_backend_rs::api::filesystem::ZeroCopyWriter,
810        size: u32,
811        offset: u64,
812        _lock_owner: Option<u64>,
813        _flags: u32,
814    ) -> io::Result<usize> {
815        debug!("read");
816
817        // We need to take out the blob reader from self.file_handles, so we can
818        // interact with it in the separate task.
819        // On success, we pass it back out of the task, so we can put it back in self.file_handles.
820        let (_span, blob_reader) = self
821            .file_handles
822            .read()
823            .get(&handle)
824            .ok_or_else(|| {
825                warn!("file handle {} unknown", handle);
826                io::Error::from_raw_os_error(libc::EIO)
827            })
828            .cloned()?;
829
830        let mut blob_reader = blob_reader
831            .lock()
832            .map_err(|_| io::Error::other("mutex poisoned"))?;
833
834        let buf = self.tokio_handle.block_on(async move {
835            // seek to the offset specified, which is relative to the start of the file.
836            let pos = blob_reader
837                .seek(io::SeekFrom::Start(offset))
838                .await
839                .map_err(|e| {
840                    warn!("failed to seek to offset {}: {}", offset, e);
841                    io::Error::from_raw_os_error(libc::EIO)
842                })?;
843
844            debug_assert_eq!(offset, pos);
845
846            // As written in the fuse docs, read should send exactly the number
847            // of bytes requested except on EOF or error.
848
849            let mut buf: Vec<u8> = Vec::with_capacity(size as usize);
850
851            // copy things from the internal buffer into buf to fill it till up until size
852            tokio::io::copy(&mut blob_reader.as_mut().take(size as u64), &mut buf).await?;
853
854            Ok::<_, std::io::Error>(buf)
855        })?;
856
857        // We cannot use w.write() here, we're required to call write multiple
858        // times until we wrote the entirety of the buffer (which is `size`, except on EOF).
859        let buf_len = buf.len();
860        let bytes_written = io::copy(&mut Cursor::new(buf), w)?;
861        if bytes_written != buf_len as u64 {
862            error!(bytes_written=%bytes_written, "unable to write all of buf to kernel");
863            return Err(io::Error::from_raw_os_error(libc::EIO));
864        }
865
866        Ok(bytes_written as usize)
867    }
868
869    #[tracing::instrument(skip_all, fields(rq.inode = inode))]
870    fn readlink(&self, _ctx: &Context, inode: Self::Inode) -> io::Result<Vec<u8>> {
871        if inode == ROOT_ID {
872            return Err(io::Error::from_raw_os_error(libc::ENOSYS));
873        }
874
875        // lookup the inode
876        match *self.inode_tracker.read().get(inode).unwrap() {
877            InodeData::Directory(..) | InodeData::Regular(..) => {
878                Err(io::Error::from_raw_os_error(libc::EINVAL))
879            }
880            InodeData::Symlink(ref target) => Ok(target.to_vec()),
881        }
882    }
883
884    #[tracing::instrument(skip_all, fields(rq.inode = inode, name=?name))]
885    fn getxattr(
886        &self,
887        _ctx: &Context,
888        inode: Self::Inode,
889        name: &CStr,
890        size: u32,
891    ) -> io::Result<GetxattrReply> {
892        if !self.show_xattr {
893            return Err(io::Error::from_raw_os_error(libc::ENOSYS));
894        }
895
896        // Peek at the inode requested, and construct the response.
897        let digest_str = match *self
898            .inode_tracker
899            .read()
900            .get(inode)
901            .ok_or_else(|| io::Error::from_raw_os_error(libc::ENODATA))?
902        {
903            InodeData::Directory(DirectoryInodeData::Sparse(ref digest, _))
904            | InodeData::Directory(DirectoryInodeData::Populated(ref digest, _))
905                if name.to_bytes() == XATTR_NAME_DIRECTORY_DIGEST =>
906            {
907                digest.to_string()
908            }
909            InodeData::Regular(ref digest, _, _) if name.to_bytes() == XATTR_NAME_BLOB_DIGEST => {
910                digest.to_string()
911            }
912            _ => {
913                return Err(io::Error::from_raw_os_error(libc::ENODATA));
914            }
915        };
916
917        if size == 0 {
918            Ok(GetxattrReply::Count(digest_str.len() as u32))
919        } else if size < digest_str.len() as u32 {
920            Err(io::Error::from_raw_os_error(libc::ERANGE))
921        } else {
922            Ok(GetxattrReply::Value(digest_str.into_bytes()))
923        }
924    }
925
926    #[tracing::instrument(skip_all, fields(rq.inode = inode))]
927    fn listxattr(
928        &self,
929        _ctx: &Context,
930        inode: Self::Inode,
931        size: u32,
932    ) -> io::Result<ListxattrReply> {
933        if !self.show_xattr {
934            return Err(io::Error::from_raw_os_error(libc::ENOSYS));
935        }
936
937        // determine the (\0-terminated list) to of xattr keys present, depending on the type of the inode.
938        let xattrs_names = {
939            let mut out = Vec::new();
940            if let Some(inode_data) = self.inode_tracker.read().get(inode) {
941                match *inode_data {
942                    InodeData::Directory(_) => {
943                        out.extend_from_slice(XATTR_NAME_DIRECTORY_DIGEST);
944                        out.push_byte(b'\x00');
945                    }
946                    InodeData::Regular(..) => {
947                        out.extend_from_slice(XATTR_NAME_BLOB_DIGEST);
948                        out.push_byte(b'\x00');
949                    }
950                    _ => {}
951                }
952            }
953            out
954        };
955
956        if size == 0 {
957            Ok(ListxattrReply::Count(xattrs_names.len() as u32))
958        } else if size < xattrs_names.len() as u32 {
959            Err(io::Error::from_raw_os_error(libc::ERANGE))
960        } else {
961            Ok(ListxattrReply::Names(xattrs_names.to_vec()))
962        }
963    }
964}