snix_castore/fs/
mod.rs

1mod file_attr;
2mod inode_tracker;
3mod inodes;
4pub mod root_nodes;
5
6#[cfg(feature = "fuse")]
7pub mod fuse;
8
9#[cfg(feature = "virtiofs")]
10pub mod virtiofs;
11
12pub use self::root_nodes::RootNodes;
13use self::{
14    file_attr::ROOT_FILE_ATTR,
15    inode_tracker::InodeTracker,
16    inodes::{DirectoryInodeData, InodeData},
17};
18use crate::{
19    B3Digest, Node,
20    blobservice::{BlobReader, BlobService},
21    directoryservice::DirectoryService,
22    path::PathComponent,
23};
24use bstr::ByteVec;
25use fuse_backend_rs::api::filesystem::{
26    Context, FileSystem, FsOptions, GetxattrReply, ListxattrReply, ROOT_ID,
27};
28use fuse_backend_rs::{
29    abi::fuse_abi::{Attr, OpenOptions, stat64},
30    api::filesystem::Entry,
31};
32use futures::{StreamExt, stream::BoxStream};
33use parking_lot::RwLock;
34use std::sync::Mutex;
35use std::{
36    collections::HashMap,
37    io,
38    sync::atomic::AtomicU64,
39    sync::{Arc, atomic::Ordering},
40    time::Duration,
41};
42use std::{ffi::CStr, io::Cursor};
43use tokio::io::{AsyncReadExt, AsyncSeekExt};
44use tracing::{Span, debug, error, instrument, warn};
45
/// This implements a read-only [FileSystem] for a snix-castore
/// with the passed [BlobService], [DirectoryService] and [RootNodes].
///
/// Linux uses inodes in filesystems. When implementing the trait, most calls
/// *are for* a given inode.
///
/// This means we need to have a stable mapping of inode numbers to the
/// corresponding store nodes.
///
/// We internally delegate all inode allocation and state keeping to the
/// inode tracker.
/// We store a mapping from currently "explored" names in the root to their
/// inode.
///
/// There are some places where inodes are allocated / data is inserted into
/// the inode tracker, if not allocated before already:
///  - Processing a `lookup` request, either in the mount root, or somewhere
///    deeper.
///  - Processing a `readdir` (or `readdirplus`) request.
///
/// Things pointing to the same contents get the same inodes, irrespective of
/// their own location.
/// This means:
///  - Symlinks with the same target will get the same inode.
///  - Regular/executable files with the same contents will get the same inode.
///  - Directories with the same contents will get the same inode.
///
/// Due to the above being valid across the whole store, and considering the
/// merkle structure is a DAG, not a tree, this also means we can't do "bucketed
/// allocation", aka reserve Directory.size inodes for each directory node we
/// explore.
/// Tests for this live in the snix-store crate.
pub struct SnixStoreFs<BS, DS, RN: RootNodes> {
    /// Used to open the contents of regular files for reading.
    blob_service: BS,
    /// Used to fetch a directory listing by digest the first time a
    /// directory inode is explored.
    directory_service: DS,
    /// Provides the nodes living directly below the mount root: looked up by
    /// basename, and listed when the root is opened as a directory.
    root_nodes_provider: RN,
    /// Behaviour toggles, see [FSSettings].
    settings: FSSettings,

    /// This maps a given basename in the root to the inode we allocated for the node.
    root_nodes: RwLock<HashMap<PathComponent, u64>>,

    /// This keeps track of inodes and data alongside them.
    inode_tracker: RwLock<InodeTracker>,

    // FUTUREWORK: have a generic container type for dir/file handles and handle
    // allocation.
    /// This holds all opendir handles (for the root inode), keyed by the handle
    /// returned from the opendir call.
    /// For each handle, we store the [Span] that was current during the
    /// opendir call, and a stream of enumerated
    /// `Result<(PathComponent, Node), RN::Error>`.
    /// The index is needed as we need to send offset information.
    #[allow(clippy::type_complexity)]
    dir_handles: RwLock<
        HashMap<
            u64,
            (
                Span,
                Arc<Mutex<BoxStream<'static, (usize, Result<(PathComponent, Node), RN::Error>)>>>,
            ),
        >,
    >,

    /// The handle number that the next opendir call on the root will hand out.
    next_dir_handle: AtomicU64,

    /// This holds all open file handles, keyed by the handle returned from the
    /// open call. For each handle, we store the [Span] that was current
    /// during the open call, and the reader for the blob contents.
    #[allow(clippy::type_complexity)]
    file_handles: RwLock<HashMap<u64, (Span, Arc<Mutex<Box<dyn BlobReader>>>)>>,

    /// The handle number that the next open call will hand out.
    next_file_handle: AtomicU64,

    /// Runtime handle used to block on the async service calls from within
    /// the synchronous filesystem callbacks.
    tokio_handle: tokio::runtime::Handle,
}
117
/// Configures some filesystem settings.
///
/// The derived [Default] disables root listing and xattrs, and applies no
/// uid/gid override.
#[derive(Debug, Default)]
pub struct FSSettings {
    /// Whether to (try) listing elements in the root.
    /// If unset, `opendir`, `readdir` and `readdirplus` on the mount root
    /// fail with `EPERM`.
    pub list_root: bool,

    /// If uid/gid should be overridden, their values, as `(uid, gid)`.
    /// `None` leaves the default ownership in place.
    pub uid_gid_override: Option<(u32, u32)>,

    /// Whether to expose blob and directory digests as extended attributes.
    pub show_xattr: bool,
}
130
131impl<BS, DS, RN> SnixStoreFs<BS, DS, RN>
132where
133    BS: BlobService,
134    DS: DirectoryService,
135    RN: RootNodes,
136{
137    pub fn new(
138        blob_service: BS,
139        directory_service: DS,
140        root_nodes_provider: RN,
141
142        settings: FSSettings,
143        tokio_handle: tokio::runtime::Handle,
144    ) -> Self {
145        Self {
146            blob_service,
147            directory_service,
148            root_nodes_provider,
149
150            settings,
151
152            root_nodes: RwLock::new(HashMap::default()),
153            inode_tracker: RwLock::new(Default::default()),
154
155            dir_handles: RwLock::new(Default::default()),
156            next_dir_handle: AtomicU64::new(1),
157
158            file_handles: RwLock::new(Default::default()),
159            next_file_handle: AtomicU64::new(1),
160            tokio_handle,
161        }
162    }
163
164    /// Retrieves the inode for a given root node basename, if present.
165    /// This obtains a read lock on self.root_nodes.
166    fn get_inode_for_root_name(&self, name: &PathComponent) -> Option<u64> {
167        self.root_nodes.read().get(name).cloned()
168    }
169
170    /// For a given inode, look up the given directory behind it (from
171    /// self.inode_tracker), and return its children.
172    /// The inode_tracker MUST know about this inode already, and it MUST point
173    /// to a [InodeData::Directory].
174    /// It is ok if it's a [DirectoryInodeData::Sparse] - in that case, a lookup
175    /// in self.directory_service is performed, and self.inode_tracker is updated with the
176    /// [DirectoryInodeData::Populated].
177    #[allow(clippy::type_complexity)]
178    #[instrument(skip(self), err)]
179    fn get_directory_children(
180        &self,
181        ino: u64,
182    ) -> io::Result<(B3Digest, Vec<(u64, PathComponent, Node)>)> {
183        let data = self.inode_tracker.read().get(ino).unwrap();
184        match *data {
185            // if it's populated already, return children.
186            InodeData::Directory(DirectoryInodeData::Populated(parent_digest, ref children)) => {
187                Ok((parent_digest, children.clone()))
188            }
189            // if it's sparse, fetch data using directory_service, populate child nodes
190            // and update it in [self.inode_tracker].
191            InodeData::Directory(DirectoryInodeData::Sparse(parent_digest, _)) => {
192                let directory = self
193                    .tokio_handle
194                    .block_on(async { self.directory_service.get(&parent_digest).await })
195                    .map_err(|err| {
196                        warn!(%err, "error from directory service");
197                        io::Error::other(err)
198                    })?
199                    .ok_or_else(|| {
200                        warn!(directory.digest=%parent_digest, "directory not found");
201                        // If the Directory can't be found, this is a hole, bail out.
202                        io::Error::from_raw_os_error(libc::EIO)
203                    })?;
204
205                // Turn the retrieved directory into a InodeData::Directory(DirectoryInodeData::Populated(..)),
206                // allocating inodes for the children on the way.
207                // FUTUREWORK: there's a bunch of cloning going on here, which we can probably avoid.
208                let children = {
209                    let mut inode_tracker = self.inode_tracker.write();
210
211                    let children: Vec<(u64, PathComponent, Node)> = directory
212                        .into_nodes()
213                        .map(|(child_name, child_node)| {
214                            let inode_data = InodeData::from_node(&child_node);
215
216                            let child_ino = inode_tracker.put(inode_data);
217                            (child_ino, child_name, child_node)
218                        })
219                        .collect();
220
221                    // replace.
222                    inode_tracker.replace(
223                        ino,
224                        Arc::new(InodeData::Directory(DirectoryInodeData::Populated(
225                            parent_digest,
226                            children.clone(),
227                        ))),
228                    );
229
230                    children
231                };
232
233                Ok((parent_digest, children))
234            }
235            // if the parent inode was not a directory, this doesn't make sense
236            InodeData::Regular(..) | InodeData::Symlink(_) => {
237                Err(io::Error::from_raw_os_error(libc::ENOTDIR))
238            }
239        }
240    }
241
242    /// This will turn a lookup request for a name in the root to a ino and
243    /// [InodeData].
244    /// It will peek in [self.root_nodes], and then either look it up from
245    /// [self.inode_tracker],
246    /// or otherwise fetch from [self.root_nodes], and then insert into
247    /// [self.inode_tracker].
248    /// In the case the name can't be found, a libc::ENOENT is returned.
249    fn name_in_root_to_ino_and_data(
250        &self,
251        name: &PathComponent,
252    ) -> io::Result<(u64, Arc<InodeData>)> {
253        // Look up the inode for that root node.
254        // If there's one, [self.inode_tracker] MUST also contain the data,
255        // which we can then return.
256        if let Some(inode) = self.get_inode_for_root_name(name) {
257            return Ok((
258                inode,
259                self.inode_tracker
260                    .read()
261                    .get(inode)
262                    .expect("must exist")
263                    .to_owned(),
264            ));
265        }
266
267        // We don't have it yet, look it up in [self.root_nodes].
268        match self
269            .tokio_handle
270            .block_on(async { self.root_nodes_provider.get_by_basename(name).await })
271        {
272            // if there was an error looking up the root node, propagate up an IO error.
273            Err(_e) => Err(io::Error::from_raw_os_error(libc::EIO)),
274            // the root node doesn't exist, so the file doesn't exist.
275            Ok(None) => Err(io::Error::from_raw_os_error(libc::ENOENT)),
276            // The root node does exist
277            Ok(Some(root_node)) => {
278                // Let's check if someone else beat us to updating the inode tracker and
279                // root_nodes map. This avoids locking inode_tracker for writing.
280                if let Some(ino) = self.root_nodes.read().get(name) {
281                    return Ok((
282                        *ino,
283                        self.inode_tracker.read().get(*ino).expect("must exist"),
284                    ));
285                }
286
287                // Only in case it doesn't, lock [self.root_nodes] and
288                // [self.inode_tracker] for writing.
289                let mut root_nodes = self.root_nodes.write();
290                let mut inode_tracker = self.inode_tracker.write();
291
292                // insert the (sparse) inode data and register in
293                // self.root_nodes.
294                let inode_data = InodeData::from_node(&root_node);
295                let ino = inode_tracker.put(inode_data.clone());
296                root_nodes.insert(name.to_owned(), ino);
297
298                Ok((ino, Arc::new(inode_data)))
299            }
300        }
301    }
302
303    /// Helper function, converting a [InodeData] to [Attr],
304    /// applying uid/gid override if configured.
305    fn inode_data_to_attr(&self, inode_data: &InodeData, ino: u64) -> Attr {
306        let mode = match inode_data {
307            InodeData::Regular(_, _, false) => libc::S_IFREG | 0o444,
308            // executable
309            InodeData::Regular(_, _, true) => libc::S_IFREG | 0o555,
310            InodeData::Symlink(_) => libc::S_IFLNK | 0o444,
311            InodeData::Directory(_) => libc::S_IFDIR | 0o555,
312        };
313        // libc::S_IFREG, libc::S_IFLNK & libc::S_IFDIR are u32 on Linux and u16 on MacOS
314        #[cfg(target_os = "macos")]
315        let mode = mode as u32;
316        let mut attr = Attr {
317            ino,
318            // FUTUREWORK: play with this numbers, as it affects read sizes for client applications.
319            blocks: 1024,
320            size: match inode_data {
321                InodeData::Regular(_, size, _) => *size,
322                InodeData::Symlink(target) => target.len() as u64,
323                InodeData::Directory(DirectoryInodeData::Sparse(_, size)) => *size,
324                InodeData::Directory(DirectoryInodeData::Populated(_, children)) => {
325                    children.len() as u64
326                }
327            },
328            mode,
329            mtime: 1, // Everything in /nix/store must have timestamp "1".
330            ..Default::default()
331        };
332
333        if let Some((uid, gid)) = self.settings.uid_gid_override {
334            attr.uid = uid;
335            attr.gid = gid;
336        }
337
338        attr
339    }
340}
341fn attr_to_fuse_entry(attr: Attr) -> Entry {
342    Entry {
343        inode: attr.ino,
344        attr: attr.into(),
345        attr_timeout: Duration::MAX,
346        entry_timeout: Duration::MAX,
347        ..Default::default()
348    }
349}
350
351/// Returns the u32 fuse type
352fn node_to_fuse_type(node: &Node) -> u32 {
353    #[allow(clippy::let_and_return)]
354    let ty = match node {
355        Node::Directory { .. } => libc::S_IFDIR,
356        Node::File { .. } => libc::S_IFREG,
357        Node::Symlink { .. } => libc::S_IFLNK,
358    };
359    // libc::S_IFDIR is u32 on Linux and u16 on MacOS
360    #[cfg(target_os = "macos")]
361    let ty = ty as u32;
362
363    ty
364}
365
// Names of the extended attributes carrying castore digests.
// NOTE(review): the code using these is outside the visible part of this
// file; presumably they are only served when `FSSettings::show_xattr` is set
// - confirm.

/// Extended attribute name for the digest of a directory.
const XATTR_NAME_DIRECTORY_DIGEST: &[u8] = b"user.snix.castore.directory.digest";
/// Extended attribute name for the digest of a blob.
const XATTR_NAME_BLOB_DIGEST: &[u8] = b"user.snix.castore.blob.digest";
368
/// Lets [SnixStoreFs] be used as a `fuse_backend_rs` `Layer`.
/// Only compiled on Linux with the `virtiofs` feature enabled.
#[cfg(all(feature = "virtiofs", target_os = "linux"))]
impl<BS, DS, RN> fuse_backend_rs::api::filesystem::Layer for SnixStoreFs<BS, DS, RN>
where
    BS: BlobService,
    DS: DirectoryService,
    RN: RootNodes,
{
    /// The root of this layer is always the fixed root inode, [ROOT_ID].
    fn root_inode(&self) -> Self::Inode {
        ROOT_ID
    }
}
380
381impl<BS, DS, RN> FileSystem for SnixStoreFs<BS, DS, RN>
382where
383    BS: BlobService,
384    DS: DirectoryService,
385    RN: RootNodes,
386{
387    type Handle = u64;
388    type Inode = u64;
389
390    fn init(&self, _capable: FsOptions) -> io::Result<FsOptions> {
391        let mut opts = FsOptions::empty();
392
393        // allow more than one pending read request per file-handle at any time
394        opts |= FsOptions::ASYNC_READ;
395
396        #[cfg(target_os = "linux")]
397        {
398            // the filesystem supports readdirplus
399            opts |= FsOptions::DO_READDIRPLUS;
400            // issue both readdir and readdirplus depending on the information expected to be required
401            opts |= FsOptions::READDIRPLUS_AUTO;
402            // allow concurrent lookup() and readdir() requests for the same directory
403            opts |= FsOptions::PARALLEL_DIROPS;
404            // have the kernel cache symlink contents
405            opts |= FsOptions::CACHE_SYMLINKS;
406        }
407        // TODO: figure out what dawrin options make sense.
408
409        Ok(opts)
410    }
411
412    #[tracing::instrument(skip_all, fields(rq.inode = inode))]
413    fn getattr(
414        &self,
415        _ctx: &Context,
416        inode: Self::Inode,
417        _handle: Option<Self::Handle>,
418    ) -> io::Result<(stat64, Duration)> {
419        let attr = if inode == ROOT_ID {
420            ROOT_FILE_ATTR
421        } else {
422            self.inode_data_to_attr(
423                self.inode_tracker
424                    .read()
425                    .get(inode)
426                    .ok_or_else(|| io::Error::from_raw_os_error(libc::ENOENT))?
427                    .as_ref(),
428                inode,
429            )
430        };
431
432        Ok((attr.into(), Duration::MAX))
433    }
434
435    #[tracing::instrument(skip_all, fields(rq.parent_inode = parent, rq.name = ?name))]
436    fn lookup(
437        &self,
438        _ctx: &Context,
439        parent: Self::Inode,
440        name: &std::ffi::CStr,
441    ) -> io::Result<Entry> {
442        debug!("lookup");
443
444        // convert the CStr to a PathComponent
445        // If it can't be converted, we definitely don't have anything here.
446        let name: PathComponent = name.try_into().map_err(|_| std::io::ErrorKind::NotFound)?;
447
448        // This goes from a parent inode to a node.
449        let (ino, inode_data) = if parent == ROOT_ID {
450            // If the parent is [ROOT_ID], we need to check [self.root_nodes] (fetching from a [RootNode] provider if needed)
451            self.name_in_root_to_ino_and_data(&name)?
452        } else {
453            // else the parent must be a directory, otherwise we would never come up with this request.
454            // Lookup the parent in [self.inode_tracker] (which must be a [InodeData::Directory]), and find the child with that name.
455            let (parent_digest, children) = self.get_directory_children(parent)?;
456
457            Span::current().record("directory.digest", parent_digest.to_string());
458            // Search for that name in the list of children (which we know are sorted)
459            // and return the FileAttrs.
460            let idx = children
461                .binary_search_by_key(&&name, |(_, n, _)| n)
462                .map_err(|_| {
463                    // Child not found, return ENOENT.
464                    io::Error::from_raw_os_error(libc::ENOENT)
465                })?;
466
467            let (child_ino, _, child_node) = &children[idx];
468
469            // Reply with the file attributes for the child,
470            (*child_ino, Arc::new(InodeData::from_node(child_node)))
471        };
472
473        debug!(inode_data=?&inode_data, ino=ino, "Some");
474
475        let attr = self.inode_data_to_attr(
476            self.inode_tracker
477                .read()
478                .get(ino)
479                .ok_or_else(|| io::Error::from_raw_os_error(libc::ENOENT))?
480                .as_ref(),
481            ino,
482        );
483        Ok(attr_to_fuse_entry(attr))
484    }
485
486    #[tracing::instrument(skip_all, fields(rq.inode = inode))]
487    fn opendir(
488        &self,
489        _ctx: &Context,
490        inode: Self::Inode,
491        _flags: u32,
492    ) -> io::Result<(Option<Self::Handle>, OpenOptions)> {
493        // In case opendir on the root is called, we provide the handle, as re-entering that listing is expensive.
494        // For all other directory inodes we just let readdir take care of it.
495        if inode == ROOT_ID {
496            if !self.settings.list_root {
497                return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo
498            }
499
500            let stream = self.root_nodes_provider.list().enumerate().boxed();
501
502            // Put the stream into [self.dir_handles].
503            // TODO: this will overflow after 2**64 operations,
504            // which is fine for now.
505            // See https://cl.tvl.fyi/c/depot/+/8834/comment/a6684ce0_d72469d1
506            // for the discussion on alternatives.
507            let dh = self.next_dir_handle.fetch_add(1, Ordering::SeqCst);
508
509            self.dir_handles
510                .write()
511                .insert(dh, (Span::current(), Arc::new(Mutex::new(stream))));
512
513            return Ok((Some(dh), OpenOptions::NONSEEKABLE));
514        }
515
516        let mut opts = OpenOptions::empty();
517
518        opts |= OpenOptions::KEEP_CACHE;
519        #[cfg(target_os = "linux")]
520        {
521            opts |= OpenOptions::CACHE_DIR;
522        }
523        // allow caching this directory contents, don't invalidate on open
524        Ok((None, opts))
525    }
526
527    #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.handle = handle, rq.offset = offset), parent = self.dir_handles.read().get(&handle).and_then(|x| x.0.id()))]
528    fn readdir(
529        &self,
530        _ctx: &Context,
531        inode: Self::Inode,
532        handle: Self::Handle,
533        _size: u32,
534        offset: u64,
535        add_entry: &mut dyn FnMut(fuse_backend_rs::api::filesystem::DirEntry) -> io::Result<usize>,
536    ) -> io::Result<()> {
537        debug!("readdir");
538
539        if inode == ROOT_ID {
540            if !self.settings.list_root {
541                return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo
542            }
543
544            // get the stream from [self.dir_handles]
545            let dir_handles = self.dir_handles.read();
546            let (_span, stream) = dir_handles.get(&handle).ok_or_else(|| {
547                warn!("dir handle {} unknown", handle);
548                io::Error::from_raw_os_error(libc::EIO)
549            })?;
550
551            let mut stream = stream
552                .lock()
553                .map_err(|_| io::Error::other("mutex poisoned"))?;
554
555            while let Some((i, n)) = self.tokio_handle.block_on(async { stream.next().await }) {
556                let (name, node) = n.map_err(|e| {
557                    warn!("failed to retrieve root node: {}", e);
558                    io::Error::from_raw_os_error(libc::EIO)
559                })?;
560
561                // obtain the inode, or allocate a new one.
562                let ino = self.get_inode_for_root_name(&name).unwrap_or_else(|| {
563                    // insert the (sparse) inode data and register in
564                    // self.root_nodes.
565                    let ino = self.inode_tracker.write().put(InodeData::from_node(&node));
566                    self.root_nodes.write().insert(name.clone(), ino);
567                    ino
568                });
569
570                let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry {
571                    ino,
572                    offset: offset + (i as u64) + 1,
573                    type_: node_to_fuse_type(&node),
574                    name: name.as_ref(),
575                })?;
576                // If the buffer is full, add_entry will return `Ok(0)`.
577                if written == 0 {
578                    break;
579                }
580            }
581            return Ok(());
582        }
583
584        // Non root-node case: lookup the children, or return an error if it's not a directory.
585        let (parent_digest, children) = self.get_directory_children(inode)?;
586        Span::current().record("directory.digest", parent_digest.to_string());
587
588        for (i, (ino, child_name, child_node)) in
589            children.into_iter().skip(offset as usize).enumerate()
590        {
591            // the second parameter will become the "offset" parameter on the next call.
592            let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry {
593                ino,
594                offset: offset + (i as u64) + 1,
595                type_: node_to_fuse_type(&child_node),
596                name: child_name.as_ref(),
597            })?;
598            // If the buffer is full, add_entry will return `Ok(0)`.
599            if written == 0 {
600                break;
601            }
602        }
603
604        Ok(())
605    }
606
607    #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.handle = handle), parent = self.dir_handles.read().get(&handle).and_then(|x| x.0.id()))]
608    fn readdirplus(
609        &self,
610        _ctx: &Context,
611        inode: Self::Inode,
612        handle: Self::Handle,
613        _size: u32,
614        offset: u64,
615        add_entry: &mut dyn FnMut(
616            fuse_backend_rs::api::filesystem::DirEntry,
617            Entry,
618        ) -> io::Result<usize>,
619    ) -> io::Result<()> {
620        debug!("readdirplus");
621
622        if inode == ROOT_ID {
623            if !self.settings.list_root {
624                return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo
625            }
626
627            // get the stream from [self.dir_handles]
628            let dir_handles = self.dir_handles.read();
629            let (_span, stream) = dir_handles.get(&handle).ok_or_else(|| {
630                warn!("dir handle {} unknown", handle);
631                io::Error::from_raw_os_error(libc::EIO)
632            })?;
633
634            let mut stream = stream
635                .lock()
636                .map_err(|_| io::Error::other("mutex poisoned"))?;
637
638            while let Some((i, n)) = self.tokio_handle.block_on(async { stream.next().await }) {
639                let (name, node) = n.map_err(|e| {
640                    warn!("failed to retrieve root node: {}", e);
641                    io::Error::from_raw_os_error(libc::EPERM)
642                })?;
643
644                let inode_data = InodeData::from_node(&node);
645
646                // obtain the inode, or allocate a new one.
647                let ino = self.get_inode_for_root_name(&name).unwrap_or_else(|| {
648                    // insert the (sparse) inode data and register in
649                    // self.root_nodes.
650                    let ino = self.inode_tracker.write().put(inode_data.clone());
651                    self.root_nodes.write().insert(name.clone(), ino);
652                    ino
653                });
654
655                let written = add_entry(
656                    fuse_backend_rs::api::filesystem::DirEntry {
657                        ino,
658                        offset: offset + (i as u64) + 1,
659                        type_: node_to_fuse_type(&node),
660                        name: name.as_ref(),
661                    },
662                    attr_to_fuse_entry(self.inode_data_to_attr(&inode_data, ino)),
663                )?;
664                // If the buffer is full, add_entry will return `Ok(0)`.
665                if written == 0 {
666                    break;
667                }
668            }
669            return Ok(());
670        }
671
672        // Non root-node case: lookup the children, or return an error if it's not a directory.
673        let (parent_digest, children) = self.get_directory_children(inode)?;
674        Span::current().record("directory.digest", parent_digest.to_string());
675
676        for (i, (ino, name, child_node)) in children.into_iter().skip(offset as usize).enumerate() {
677            let inode_data = InodeData::from_node(&child_node);
678
679            // the second parameter will become the "offset" parameter on the next call.
680            let written = add_entry(
681                fuse_backend_rs::api::filesystem::DirEntry {
682                    ino,
683                    offset: offset + (i as u64) + 1,
684                    type_: node_to_fuse_type(&child_node),
685                    name: name.as_ref(),
686                },
687                attr_to_fuse_entry(self.inode_data_to_attr(&inode_data, ino)),
688            )?;
689            // If the buffer is full, add_entry will return `Ok(0)`.
690            if written == 0 {
691                break;
692            }
693        }
694
695        Ok(())
696    }
697
698    #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.handle = handle), parent = self.dir_handles.read().get(&handle).and_then(|x| x.0.id()))]
699    fn releasedir(
700        &self,
701        _ctx: &Context,
702        inode: Self::Inode,
703        _flags: u32,
704        handle: Self::Handle,
705    ) -> io::Result<()> {
706        if inode == ROOT_ID {
707            // drop the stream.
708            if let Some(stream) = self.dir_handles.write().remove(&handle) {
709                // drop it, which will close it.
710                drop(stream)
711            } else {
712                warn!("dir handle not found");
713            }
714        }
715
716        Ok(())
717    }
718
719    #[tracing::instrument(skip_all, fields(rq.inode = inode))]
720    fn open(
721        &self,
722        _ctx: &Context,
723        inode: Self::Inode,
724        _flags: u32,
725        _fuse_flags: u32,
726    ) -> io::Result<(Option<Self::Handle>, OpenOptions, Option<u32>)> {
727        if inode == ROOT_ID {
728            return Err(io::Error::from_raw_os_error(libc::ENOSYS));
729        }
730
731        // lookup the inode
732        match *self.inode_tracker.read().get(inode).unwrap() {
733            // read is invalid on non-files.
734            InodeData::Directory(..) | InodeData::Symlink(_) => {
735                warn!("is directory");
736                Err(io::Error::from_raw_os_error(libc::EISDIR))
737            }
738            InodeData::Regular(ref blob_digest, _blob_size, _) => {
739                Span::current().record("blob.digest", blob_digest.to_string());
740
741                match self
742                    .tokio_handle
743                    .block_on(async { self.blob_service.open_read(blob_digest).await })
744                {
745                    Ok(None) => {
746                        warn!("blob not found");
747                        Err(io::Error::from_raw_os_error(libc::EIO))
748                    }
749                    Err(e) => {
750                        warn!(e=?e, "error opening blob");
751                        Err(io::Error::from_raw_os_error(libc::EIO))
752                    }
753                    Ok(Some(blob_reader)) => {
754                        // get a new file handle
755                        // TODO: this will overflow after 2**64 operations,
756                        // which is fine for now.
757                        // See https://cl.tvl.fyi/c/depot/+/8834/comment/a6684ce0_d72469d1
758                        // for the discussion on alternatives.
759                        let fh = self.next_file_handle.fetch_add(1, Ordering::SeqCst);
760
761                        self.file_handles
762                            .write()
763                            .insert(fh, (Span::current(), Arc::new(Mutex::new(blob_reader))));
764
765                        Ok((
766                            Some(fh),
767                            // Don't invalidate the data cache on open.
768                            OpenOptions::KEEP_CACHE,
769                            None,
770                        ))
771                    }
772                }
773            }
774        }
775    }
776
777    #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.handle = handle), parent = self.file_handles.read().get(&handle).and_then(|x| x.0.id()))]
778    fn release(
779        &self,
780        _ctx: &Context,
781        inode: Self::Inode,
782        _flags: u32,
783        handle: Self::Handle,
784        _flush: bool,
785        _flock_release: bool,
786        _lock_owner: Option<u64>,
787    ) -> io::Result<()> {
788        match self.file_handles.write().remove(&handle) {
789            // drop the blob reader, which will close it.
790            Some(blob_reader) => drop(blob_reader),
791            None => {
792                // These might already be dropped if a read error occured.
793                warn!("file handle not found");
794            }
795        }
796
797        Ok(())
798    }
799
800    #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.handle = handle, rq.offset = offset, rq.size = size), parent = self.file_handles.read().get(&handle).and_then(|x| x.0.id()))]
801    fn read(
802        &self,
803        _ctx: &Context,
804        inode: Self::Inode,
805        handle: Self::Handle,
806        w: &mut dyn fuse_backend_rs::api::filesystem::ZeroCopyWriter,
807        size: u32,
808        offset: u64,
809        _lock_owner: Option<u64>,
810        _flags: u32,
811    ) -> io::Result<usize> {
812        debug!("read");
813
814        // We need to take out the blob reader from self.file_handles, so we can
815        // interact with it in the separate task.
816        // On success, we pass it back out of the task, so we can put it back in self.file_handles.
817        let (_span, blob_reader) = self
818            .file_handles
819            .read()
820            .get(&handle)
821            .ok_or_else(|| {
822                warn!("file handle {} unknown", handle);
823                io::Error::from_raw_os_error(libc::EIO)
824            })
825            .cloned()?;
826
827        let mut blob_reader = blob_reader
828            .lock()
829            .map_err(|_| io::Error::other("mutex poisoned"))?;
830
831        let buf = self.tokio_handle.block_on(async move {
832            // seek to the offset specified, which is relative to the start of the file.
833            let pos = blob_reader
834                .seek(io::SeekFrom::Start(offset))
835                .await
836                .map_err(|e| {
837                    warn!("failed to seek to offset {}: {}", offset, e);
838                    io::Error::from_raw_os_error(libc::EIO)
839                })?;
840
841            debug_assert_eq!(offset, pos);
842
843            // As written in the fuse docs, read should send exactly the number
844            // of bytes requested except on EOF or error.
845
846            let mut buf: Vec<u8> = Vec::with_capacity(size as usize);
847
848            // copy things from the internal buffer into buf to fill it till up until size
849            tokio::io::copy(&mut blob_reader.as_mut().take(size as u64), &mut buf).await?;
850
851            Ok::<_, std::io::Error>(buf)
852        })?;
853
854        // We cannot use w.write() here, we're required to call write multiple
855        // times until we wrote the entirety of the buffer (which is `size`, except on EOF).
856        let buf_len = buf.len();
857        let bytes_written = io::copy(&mut Cursor::new(buf), w)?;
858        if bytes_written != buf_len as u64 {
859            error!(bytes_written=%bytes_written, "unable to write all of buf to kernel");
860            return Err(io::Error::from_raw_os_error(libc::EIO));
861        }
862
863        Ok(bytes_written as usize)
864    }
865
866    #[tracing::instrument(skip_all, fields(rq.inode = inode))]
867    fn readlink(&self, _ctx: &Context, inode: Self::Inode) -> io::Result<Vec<u8>> {
868        if inode == ROOT_ID {
869            return Err(io::Error::from_raw_os_error(libc::ENOSYS));
870        }
871
872        // lookup the inode
873        match *self.inode_tracker.read().get(inode).unwrap() {
874            InodeData::Directory(..) | InodeData::Regular(..) => {
875                Err(io::Error::from_raw_os_error(libc::EINVAL))
876            }
877            InodeData::Symlink(ref target) => Ok(target.to_vec()),
878        }
879    }
880
881    #[tracing::instrument(skip_all, fields(rq.inode = inode, name=?name))]
882    fn getxattr(
883        &self,
884        _ctx: &Context,
885        inode: Self::Inode,
886        name: &CStr,
887        size: u32,
888    ) -> io::Result<GetxattrReply> {
889        if !self.settings.show_xattr {
890            return Err(io::Error::from_raw_os_error(libc::ENOSYS));
891        }
892
893        // Peek at the inode requested, and construct the response.
894        let digest_str = match *self
895            .inode_tracker
896            .read()
897            .get(inode)
898            .ok_or_else(|| io::Error::from_raw_os_error(libc::ENODATA))?
899        {
900            InodeData::Directory(DirectoryInodeData::Sparse(ref digest, _))
901            | InodeData::Directory(DirectoryInodeData::Populated(ref digest, _))
902                if name.to_bytes() == XATTR_NAME_DIRECTORY_DIGEST =>
903            {
904                digest.to_string()
905            }
906            InodeData::Regular(ref digest, _, _) if name.to_bytes() == XATTR_NAME_BLOB_DIGEST => {
907                digest.to_string()
908            }
909            _ => {
910                return Err(io::Error::from_raw_os_error(libc::ENODATA));
911            }
912        };
913
914        if size == 0 {
915            Ok(GetxattrReply::Count(digest_str.len() as u32))
916        } else if size < digest_str.len() as u32 {
917            Err(io::Error::from_raw_os_error(libc::ERANGE))
918        } else {
919            Ok(GetxattrReply::Value(digest_str.into_bytes()))
920        }
921    }
922
923    #[tracing::instrument(skip_all, fields(rq.inode = inode))]
924    fn listxattr(
925        &self,
926        _ctx: &Context,
927        inode: Self::Inode,
928        size: u32,
929    ) -> io::Result<ListxattrReply> {
930        if !self.settings.show_xattr {
931            return Err(io::Error::from_raw_os_error(libc::ENOSYS));
932        }
933
934        // determine the (\0-terminated list) to of xattr keys present, depending on the type of the inode.
935        let xattrs_names = {
936            let mut out = Vec::new();
937            if let Some(inode_data) = self.inode_tracker.read().get(inode) {
938                match *inode_data {
939                    InodeData::Directory(_) => {
940                        out.extend_from_slice(XATTR_NAME_DIRECTORY_DIGEST);
941                        out.push_byte(b'\x00');
942                    }
943                    InodeData::Regular(..) => {
944                        out.extend_from_slice(XATTR_NAME_BLOB_DIGEST);
945                        out.push_byte(b'\x00');
946                    }
947                    _ => {}
948                }
949            }
950            out
951        };
952
953        if size == 0 {
954            Ok(ListxattrReply::Count(xattrs_names.len() as u32))
955        } else if size < xattrs_names.len() as u32 {
956            Err(io::Error::from_raw_os_error(libc::ERANGE))
957        } else {
958            Ok(ListxattrReply::Names(xattrs_names.to_vec()))
959        }
960    }
961}