snix_castore/fs/
mod.rs

1mod file_attr;
2mod inode_tracker;
3mod inodes;
4mod root_nodes;
5
6#[cfg(feature = "fuse")]
7pub mod fuse;
8
9#[cfg(feature = "virtiofs")]
10pub mod virtiofs;
11
12pub use self::root_nodes::RootNodes;
13use self::{
14    file_attr::ROOT_FILE_ATTR,
15    inode_tracker::InodeTracker,
16    inodes::{DirectoryInodeData, InodeData},
17};
18use crate::{
19    B3Digest, Node,
20    blobservice::{BlobReader, BlobService},
21    directoryservice::DirectoryService,
22    path::PathComponent,
23};
24use bstr::ByteVec;
25use fuse_backend_rs::abi::fuse_abi::{OpenOptions, stat64};
26use fuse_backend_rs::api::filesystem::{
27    Context, FileSystem, FsOptions, GetxattrReply, ListxattrReply, ROOT_ID,
28};
29use futures::{StreamExt, stream::BoxStream};
30use parking_lot::RwLock;
31use std::sync::Mutex;
32use std::{
33    collections::HashMap,
34    io,
35    sync::atomic::AtomicU64,
36    sync::{Arc, atomic::Ordering},
37    time::Duration,
38};
39use std::{ffi::CStr, io::Cursor};
40use tokio::io::{AsyncReadExt, AsyncSeekExt};
41use tracing::{Span, debug, error, instrument, warn};
42
43/// This implements a read-only FUSE filesystem for a snix-store
44/// with the passed [BlobService], [DirectoryService] and [RootNodes].
45///
46/// Linux uses inodes in filesystems. When implementing FUSE, most calls are
47/// *for* a given inode.
48///
49/// This means, we need to have a stable mapping of inode numbers to the
50/// corresponding store nodes.
51///
52/// We internally delegate all inode allocation and state keeping to the
53/// inode tracker.
54/// We store a mapping from currently "explored" names in the root to their
55/// inode.
56///
/// There are some places where inodes are allocated / data is inserted into
/// the inode tracker, if not allocated before already:
59///  - Processing a `lookup` request, either in the mount root, or somewhere
60///    deeper.
61///  - Processing a `readdir` request
62///
63///  Things pointing to the same contents get the same inodes, irrespective of
64///  their own location.
65///  This means:
66///  - Symlinks with the same target will get the same inode.
67///  - Regular/executable files with the same contents will get the same inode
68///  - Directories with the same contents will get the same inode.
69///
70/// Due to the above being valid across the whole store, and considering the
71/// merkle structure is a DAG, not a tree, this also means we can't do "bucketed
72/// allocation", aka reserve Directory.size inodes for each directory node we
73/// explore.
74/// Tests for this live in the snix-store crate.
pub struct SnixStoreFs<BS, DS, RN> {
    /// Blob backend. Contents of regular files are opened from here, by
    /// digest, when a file is opened.
    blob_service: BS,
    /// Directory backend. Sparse directory inodes get populated by fetching
    /// the directory from here, by digest.
    directory_service: DS,
    /// Provider for the nodes living in the mount root. It is queried by
    /// basename on lookups in the root, and listed when the root is opened as
    /// a directory.
    root_nodes_provider: RN,

    /// Whether to (try) listing elements in the root.
    list_root: bool,

    /// Whether to expose blob and directory digests as extended attributes.
    show_xattr: bool,

    /// This maps a given basename in the root to the inode we allocated for the node.
    root_nodes: RwLock<HashMap<PathComponent, u64>>,

    /// This keeps track of inodes and data alongside them.
    inode_tracker: RwLock<InodeTracker>,

    // FUTUREWORK: have a generic container type for dir/file handles and handle
    // allocation.
    /// This holds all opendir handles (for the root inode), keyed by the handle
    /// returned from the opendir call.
    /// For each handle, we store the [Span] of the opendir call, and a shared,
    /// mutex-protected stream of enumerated
    /// Result<(PathComponent, Node), crate::Error>.
    /// The index is needed as we need to send offset information.
    #[allow(clippy::type_complexity)]
    dir_handles: RwLock<
        HashMap<
            u64,
            (
                Span,
                Arc<
                    Mutex<BoxStream<'static, (usize, Result<(PathComponent, Node), crate::Error>)>>,
                >,
            ),
        >,
    >,

    /// The handle to hand out on the next opendir call on the root.
    /// Starts at 1 and is incremented for every handle handed out.
    next_dir_handle: AtomicU64,

    /// This holds all open file handles, keyed by the handle returned from the
    /// open call.
    /// For each handle, we store the [Span] of the open call, and a shared,
    /// mutex-protected [BlobReader].
    #[allow(clippy::type_complexity)]
    file_handles: RwLock<HashMap<u64, (Span, Arc<Mutex<Box<dyn BlobReader>>>)>>,

    /// The handle to hand out on the next open call.
    /// Starts at 1 and is incremented for every handle handed out.
    next_file_handle: AtomicU64,

    /// Handle to the tokio runtime, used to drive async service calls to
    /// completion from the (synchronous) filesystem callbacks.
    tokio_handle: tokio::runtime::Handle,
}
121
122impl<BS, DS, RN> SnixStoreFs<BS, DS, RN>
123where
124    BS: BlobService,
125    DS: DirectoryService,
126    RN: RootNodes,
127{
128    pub fn new(
129        blob_service: BS,
130        directory_service: DS,
131        root_nodes_provider: RN,
132        list_root: bool,
133        show_xattr: bool,
134    ) -> Self {
135        Self {
136            blob_service,
137            directory_service,
138            root_nodes_provider,
139
140            list_root,
141            show_xattr,
142
143            root_nodes: RwLock::new(HashMap::default()),
144            inode_tracker: RwLock::new(Default::default()),
145
146            dir_handles: RwLock::new(Default::default()),
147            next_dir_handle: AtomicU64::new(1),
148
149            file_handles: RwLock::new(Default::default()),
150            next_file_handle: AtomicU64::new(1),
151            tokio_handle: tokio::runtime::Handle::current(),
152        }
153    }
154
155    /// Retrieves the inode for a given root node basename, if present.
156    /// This obtains a read lock on self.root_nodes.
157    fn get_inode_for_root_name(&self, name: &PathComponent) -> Option<u64> {
158        self.root_nodes.read().get(name).cloned()
159    }
160
161    /// For a given inode, look up the given directory behind it (from
162    /// self.inode_tracker), and return its children.
163    /// The inode_tracker MUST know about this inode already, and it MUST point
164    /// to a [InodeData::Directory].
165    /// It is ok if it's a [DirectoryInodeData::Sparse] - in that case, a lookup
166    /// in self.directory_service is performed, and self.inode_tracker is updated with the
167    /// [DirectoryInodeData::Populated].
168    #[allow(clippy::type_complexity)]
169    #[instrument(skip(self), err)]
170    fn get_directory_children(
171        &self,
172        ino: u64,
173    ) -> io::Result<(B3Digest, Vec<(u64, PathComponent, Node)>)> {
174        let data = self.inode_tracker.read().get(ino).unwrap();
175        match *data {
176            // if it's populated already, return children.
177            InodeData::Directory(DirectoryInodeData::Populated(
178                ref parent_digest,
179                ref children,
180            )) => Ok((parent_digest.clone(), children.clone())),
181            // if it's sparse, fetch data using directory_service, populate child nodes
182            // and update it in [self.inode_tracker].
183            InodeData::Directory(DirectoryInodeData::Sparse(ref parent_digest, _)) => {
184                let directory = self
185                    .tokio_handle
186                    .block_on(async { self.directory_service.get(parent_digest).await })?
187                    .ok_or_else(|| {
188                        warn!(directory.digest=%parent_digest, "directory not found");
189                        // If the Directory can't be found, this is a hole, bail out.
190                        io::Error::from_raw_os_error(libc::EIO)
191                    })?;
192
193                // Turn the retrieved directory into a InodeData::Directory(DirectoryInodeData::Populated(..)),
194                // allocating inodes for the children on the way.
195                // FUTUREWORK: there's a bunch of cloning going on here, which we can probably avoid.
196                let children = {
197                    let mut inode_tracker = self.inode_tracker.write();
198
199                    let children: Vec<(u64, PathComponent, Node)> = directory
200                        .into_nodes()
201                        .map(|(child_name, child_node)| {
202                            let inode_data = InodeData::from_node(&child_node);
203
204                            let child_ino = inode_tracker.put(inode_data);
205                            (child_ino, child_name, child_node)
206                        })
207                        .collect();
208
209                    // replace.
210                    inode_tracker.replace(
211                        ino,
212                        Arc::new(InodeData::Directory(DirectoryInodeData::Populated(
213                            parent_digest.clone(),
214                            children.clone(),
215                        ))),
216                    );
217
218                    children
219                };
220
221                Ok((parent_digest.clone(), children))
222            }
223            // if the parent inode was not a directory, this doesn't make sense
224            InodeData::Regular(..) | InodeData::Symlink(_) => {
225                Err(io::Error::from_raw_os_error(libc::ENOTDIR))
226            }
227        }
228    }
229
230    /// This will turn a lookup request for a name in the root to a ino and
231    /// [InodeData].
232    /// It will peek in [self.root_nodes], and then either look it up from
233    /// [self.inode_tracker],
234    /// or otherwise fetch from [self.root_nodes], and then insert into
235    /// [self.inode_tracker].
236    /// In the case the name can't be found, a libc::ENOENT is returned.
237    fn name_in_root_to_ino_and_data(
238        &self,
239        name: &PathComponent,
240    ) -> io::Result<(u64, Arc<InodeData>)> {
241        // Look up the inode for that root node.
242        // If there's one, [self.inode_tracker] MUST also contain the data,
243        // which we can then return.
244        if let Some(inode) = self.get_inode_for_root_name(name) {
245            return Ok((
246                inode,
247                self.inode_tracker
248                    .read()
249                    .get(inode)
250                    .expect("must exist")
251                    .to_owned(),
252            ));
253        }
254
255        // We don't have it yet, look it up in [self.root_nodes].
256        match self
257            .tokio_handle
258            .block_on(async { self.root_nodes_provider.get_by_basename(name).await })
259        {
260            // if there was an error looking up the root node, propagate up an IO error.
261            Err(_e) => Err(io::Error::from_raw_os_error(libc::EIO)),
262            // the root node doesn't exist, so the file doesn't exist.
263            Ok(None) => Err(io::Error::from_raw_os_error(libc::ENOENT)),
264            // The root node does exist
265            Ok(Some(root_node)) => {
266                // Let's check if someone else beat us to updating the inode tracker and
267                // root_nodes map. This avoids locking inode_tracker for writing.
268                if let Some(ino) = self.root_nodes.read().get(name) {
269                    return Ok((
270                        *ino,
271                        self.inode_tracker.read().get(*ino).expect("must exist"),
272                    ));
273                }
274
275                // Only in case it doesn't, lock [self.root_nodes] and
276                // [self.inode_tracker] for writing.
277                let mut root_nodes = self.root_nodes.write();
278                let mut inode_tracker = self.inode_tracker.write();
279
280                // insert the (sparse) inode data and register in
281                // self.root_nodes.
282                let inode_data = InodeData::from_node(&root_node);
283                let ino = inode_tracker.put(inode_data.clone());
284                root_nodes.insert(name.to_owned(), ino);
285
286                Ok((ino, Arc::new(inode_data)))
287            }
288        }
289    }
290}
291
/// Name of the extended attribute under which the digest of a directory is
/// exposed (see `getxattr`).
const XATTR_NAME_DIRECTORY_DIGEST: &[u8] = b"user.snix.castore.directory.digest";
/// Name of the extended attribute under which the blob digest of a regular
/// file is exposed (see `getxattr`).
const XATTR_NAME_BLOB_DIGEST: &[u8] = b"user.snix.castore.blob.digest";
294
// `Layer` implementation for [SnixStoreFs]; only compiled when the `virtiofs`
// feature is enabled and the target OS is Linux.
#[cfg(all(feature = "virtiofs", target_os = "linux"))]
impl<BS, DS, RN> fuse_backend_rs::api::filesystem::Layer for SnixStoreFs<BS, DS, RN>
where
    BS: BlobService,
    DS: DirectoryService,
    RN: RootNodes,
{
    /// Returns the inode of the filesystem root, which is always [ROOT_ID].
    fn root_inode(&self) -> Self::Inode {
        ROOT_ID
    }
}
306
307impl<BS, DS, RN> FileSystem for SnixStoreFs<BS, DS, RN>
308where
309    BS: BlobService,
310    DS: DirectoryService,
311    RN: RootNodes,
312{
313    type Handle = u64;
314    type Inode = u64;
315
316    fn init(&self, _capable: FsOptions) -> io::Result<FsOptions> {
317        let mut opts = FsOptions::empty();
318
319        // allow more than one pending read request per file-handle at any time
320        opts |= FsOptions::ASYNC_READ;
321
322        #[cfg(target_os = "linux")]
323        {
324            // the filesystem supports readdirplus
325            opts |= FsOptions::DO_READDIRPLUS;
326            // issue both readdir and readdirplus depending on the information expected to be required
327            opts |= FsOptions::READDIRPLUS_AUTO;
328            // allow concurrent lookup() and readdir() requests for the same directory
329            opts |= FsOptions::PARALLEL_DIROPS;
330            // have the kernel cache symlink contents
331            opts |= FsOptions::CACHE_SYMLINKS;
332        }
333        // TODO: figure out what dawrin options make sense.
334
335        Ok(opts)
336    }
337
338    #[tracing::instrument(skip_all, fields(rq.inode = inode))]
339    fn getattr(
340        &self,
341        _ctx: &Context,
342        inode: Self::Inode,
343        _handle: Option<Self::Handle>,
344    ) -> io::Result<(stat64, Duration)> {
345        if inode == ROOT_ID {
346            return Ok((ROOT_FILE_ATTR.into(), Duration::MAX));
347        }
348
349        match self.inode_tracker.read().get(inode) {
350            None => Err(io::Error::from_raw_os_error(libc::ENOENT)),
351            Some(inode_data) => {
352                debug!(inode_data = ?inode_data, "found node");
353                Ok((inode_data.as_fuse_file_attr(inode).into(), Duration::MAX))
354            }
355        }
356    }
357
358    #[tracing::instrument(skip_all, fields(rq.parent_inode = parent, rq.name = ?name))]
359    fn lookup(
360        &self,
361        _ctx: &Context,
362        parent: Self::Inode,
363        name: &std::ffi::CStr,
364    ) -> io::Result<fuse_backend_rs::api::filesystem::Entry> {
365        debug!("lookup");
366
367        // convert the CStr to a PathComponent
368        // If it can't be converted, we definitely don't have anything here.
369        let name: PathComponent = name.try_into().map_err(|_| std::io::ErrorKind::NotFound)?;
370
371        // This goes from a parent inode to a node.
372        // - If the parent is [ROOT_ID], we need to check
373        //   [self.root_nodes] (fetching from a [RootNode] provider if needed)
374        // - Otherwise, lookup the parent in [self.inode_tracker] (which must be
375        //   a [InodeData::Directory]), and find the child with that name.
376        if parent == ROOT_ID {
377            let (ino, inode_data) = self.name_in_root_to_ino_and_data(&name)?;
378
379            debug!(inode_data=?&inode_data, ino=ino, "Some");
380            return Ok(inode_data.as_fuse_entry(ino));
381        }
382        // This is the "lookup for "a" inside inode 42.
383        // We already know that inode 42 must be a directory.
384        let (parent_digest, children) = self.get_directory_children(parent)?;
385
386        Span::current().record("directory.digest", parent_digest.to_string());
387        // Search for that name in the list of children and return the FileAttrs.
388
389        // in the children, find the one with the desired name.
390        if let Some((child_ino, _, _)) = children.iter().find(|(_, n, _)| n == &name) {
391            // lookup the child [InodeData] in [self.inode_tracker].
392            // We know the inodes for children have already been allocated.
393            let child_inode_data = self.inode_tracker.read().get(*child_ino).unwrap();
394
395            // Reply with the file attributes for the child.
396            // For child directories, we still have all data we need to reply.
397            Ok(child_inode_data.as_fuse_entry(*child_ino))
398        } else {
399            // Child not found, return ENOENT.
400            Err(io::Error::from_raw_os_error(libc::ENOENT))
401        }
402    }
403
404    #[tracing::instrument(skip_all, fields(rq.inode = inode))]
405    fn opendir(
406        &self,
407        _ctx: &Context,
408        inode: Self::Inode,
409        _flags: u32,
410    ) -> io::Result<(Option<Self::Handle>, OpenOptions)> {
411        // In case opendir on the root is called, we provide the handle, as re-entering that listing is expensive.
412        // For all other directory inodes we just let readdir take care of it.
413        if inode == ROOT_ID {
414            if !self.list_root {
415                return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo
416            }
417
418            let stream = self.root_nodes_provider.list().enumerate().boxed();
419
420            // Put the stream into [self.dir_handles].
421            // TODO: this will overflow after 2**64 operations,
422            // which is fine for now.
423            // See https://cl.tvl.fyi/c/depot/+/8834/comment/a6684ce0_d72469d1
424            // for the discussion on alternatives.
425            let dh = self.next_dir_handle.fetch_add(1, Ordering::SeqCst);
426
427            self.dir_handles
428                .write()
429                .insert(dh, (Span::current(), Arc::new(Mutex::new(stream))));
430
431            return Ok((Some(dh), OpenOptions::NONSEEKABLE));
432        }
433
434        let mut opts = OpenOptions::empty();
435
436        opts |= OpenOptions::KEEP_CACHE;
437        #[cfg(target_os = "linux")]
438        {
439            opts |= OpenOptions::CACHE_DIR;
440        }
441        // allow caching this directory contents, don't invalidate on open
442        Ok((None, opts))
443    }
444
445    #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.handle = handle, rq.offset = offset), parent = self.dir_handles.read().get(&handle).and_then(|x| x.0.id()))]
446    fn readdir(
447        &self,
448        _ctx: &Context,
449        inode: Self::Inode,
450        handle: Self::Handle,
451        _size: u32,
452        offset: u64,
453        add_entry: &mut dyn FnMut(fuse_backend_rs::api::filesystem::DirEntry) -> io::Result<usize>,
454    ) -> io::Result<()> {
455        debug!("readdir");
456
457        if inode == ROOT_ID {
458            if !self.list_root {
459                return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo
460            }
461
462            // get the stream from [self.dir_handles]
463            let dir_handles = self.dir_handles.read();
464            let (_span, stream) = dir_handles.get(&handle).ok_or_else(|| {
465                warn!("dir handle {} unknown", handle);
466                io::Error::from_raw_os_error(libc::EIO)
467            })?;
468
469            let mut stream = stream
470                .lock()
471                .map_err(|_| crate::Error::StorageError("mutex poisoned".into()))?;
472
473            while let Some((i, n)) = self.tokio_handle.block_on(async { stream.next().await }) {
474                let (name, node) = n.map_err(|e| {
475                    warn!("failed to retrieve root node: {}", e);
476                    io::Error::from_raw_os_error(libc::EIO)
477                })?;
478
479                let inode_data = InodeData::from_node(&node);
480
481                // obtain the inode, or allocate a new one.
482                let ino = self.get_inode_for_root_name(&name).unwrap_or_else(|| {
483                    // insert the (sparse) inode data and register in
484                    // self.root_nodes.
485                    let ino = self.inode_tracker.write().put(inode_data.clone());
486                    self.root_nodes.write().insert(name.clone(), ino);
487                    ino
488                });
489
490                let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry {
491                    ino,
492                    offset: offset + (i as u64) + 1,
493                    type_: inode_data.as_fuse_type(),
494                    name: name.as_ref(),
495                })?;
496                // If the buffer is full, add_entry will return `Ok(0)`.
497                if written == 0 {
498                    break;
499                }
500            }
501            return Ok(());
502        }
503
504        // Non root-node case: lookup the children, or return an error if it's not a directory.
505        let (parent_digest, children) = self.get_directory_children(inode)?;
506        Span::current().record("directory.digest", parent_digest.to_string());
507
508        for (i, (ino, child_name, child_node)) in
509            children.into_iter().skip(offset as usize).enumerate()
510        {
511            let inode_data = InodeData::from_node(&child_node);
512
513            // the second parameter will become the "offset" parameter on the next call.
514            let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry {
515                ino,
516                offset: offset + (i as u64) + 1,
517                type_: inode_data.as_fuse_type(),
518                name: child_name.as_ref(),
519            })?;
520            // If the buffer is full, add_entry will return `Ok(0)`.
521            if written == 0 {
522                break;
523            }
524        }
525
526        Ok(())
527    }
528
529    #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.handle = handle), parent = self.dir_handles.read().get(&handle).and_then(|x| x.0.id()))]
530    fn readdirplus(
531        &self,
532        _ctx: &Context,
533        inode: Self::Inode,
534        handle: Self::Handle,
535        _size: u32,
536        offset: u64,
537        add_entry: &mut dyn FnMut(
538            fuse_backend_rs::api::filesystem::DirEntry,
539            fuse_backend_rs::api::filesystem::Entry,
540        ) -> io::Result<usize>,
541    ) -> io::Result<()> {
542        debug!("readdirplus");
543
544        if inode == ROOT_ID {
545            if !self.list_root {
546                return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo
547            }
548
549            // get the stream from [self.dir_handles]
550            let dir_handles = self.dir_handles.read();
551            let (_span, stream) = dir_handles.get(&handle).ok_or_else(|| {
552                warn!("dir handle {} unknown", handle);
553                io::Error::from_raw_os_error(libc::EIO)
554            })?;
555
556            let mut stream = stream
557                .lock()
558                .map_err(|_| crate::Error::StorageError("mutex poisoned".into()))?;
559
560            while let Some((i, n)) = self.tokio_handle.block_on(async { stream.next().await }) {
561                let (name, node) = n.map_err(|e| {
562                    warn!("failed to retrieve root node: {}", e);
563                    io::Error::from_raw_os_error(libc::EPERM)
564                })?;
565
566                let inode_data = InodeData::from_node(&node);
567
568                // obtain the inode, or allocate a new one.
569                let ino = self.get_inode_for_root_name(&name).unwrap_or_else(|| {
570                    // insert the (sparse) inode data and register in
571                    // self.root_nodes.
572                    let ino = self.inode_tracker.write().put(inode_data.clone());
573                    self.root_nodes.write().insert(name.clone(), ino);
574                    ino
575                });
576
577                let written = add_entry(
578                    fuse_backend_rs::api::filesystem::DirEntry {
579                        ino,
580                        offset: offset + (i as u64) + 1,
581                        type_: inode_data.as_fuse_type(),
582                        name: name.as_ref(),
583                    },
584                    inode_data.as_fuse_entry(ino),
585                )?;
586                // If the buffer is full, add_entry will return `Ok(0)`.
587                if written == 0 {
588                    break;
589                }
590            }
591            return Ok(());
592        }
593
594        // Non root-node case: lookup the children, or return an error if it's not a directory.
595        let (parent_digest, children) = self.get_directory_children(inode)?;
596        Span::current().record("directory.digest", parent_digest.to_string());
597
598        for (i, (ino, name, child_node)) in children.into_iter().skip(offset as usize).enumerate() {
599            let inode_data = InodeData::from_node(&child_node);
600
601            // the second parameter will become the "offset" parameter on the next call.
602            let written = add_entry(
603                fuse_backend_rs::api::filesystem::DirEntry {
604                    ino,
605                    offset: offset + (i as u64) + 1,
606                    type_: inode_data.as_fuse_type(),
607                    name: name.as_ref(),
608                },
609                inode_data.as_fuse_entry(ino),
610            )?;
611            // If the buffer is full, add_entry will return `Ok(0)`.
612            if written == 0 {
613                break;
614            }
615        }
616
617        Ok(())
618    }
619
620    #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.handle = handle), parent = self.dir_handles.read().get(&handle).and_then(|x| x.0.id()))]
621    fn releasedir(
622        &self,
623        _ctx: &Context,
624        inode: Self::Inode,
625        _flags: u32,
626        handle: Self::Handle,
627    ) -> io::Result<()> {
628        if inode == ROOT_ID {
629            // drop the stream.
630            if let Some(stream) = self.dir_handles.write().remove(&handle) {
631                // drop it, which will close it.
632                drop(stream)
633            } else {
634                warn!("dir handle not found");
635            }
636        }
637
638        Ok(())
639    }
640
641    #[tracing::instrument(skip_all, fields(rq.inode = inode))]
642    fn open(
643        &self,
644        _ctx: &Context,
645        inode: Self::Inode,
646        _flags: u32,
647        _fuse_flags: u32,
648    ) -> io::Result<(Option<Self::Handle>, OpenOptions, Option<u32>)> {
649        if inode == ROOT_ID {
650            return Err(io::Error::from_raw_os_error(libc::ENOSYS));
651        }
652
653        // lookup the inode
654        match *self.inode_tracker.read().get(inode).unwrap() {
655            // read is invalid on non-files.
656            InodeData::Directory(..) | InodeData::Symlink(_) => {
657                warn!("is directory");
658                Err(io::Error::from_raw_os_error(libc::EISDIR))
659            }
660            InodeData::Regular(ref blob_digest, _blob_size, _) => {
661                Span::current().record("blob.digest", blob_digest.to_string());
662
663                match self
664                    .tokio_handle
665                    .block_on(async { self.blob_service.open_read(blob_digest).await })
666                {
667                    Ok(None) => {
668                        warn!("blob not found");
669                        Err(io::Error::from_raw_os_error(libc::EIO))
670                    }
671                    Err(e) => {
672                        warn!(e=?e, "error opening blob");
673                        Err(io::Error::from_raw_os_error(libc::EIO))
674                    }
675                    Ok(Some(blob_reader)) => {
676                        // get a new file handle
677                        // TODO: this will overflow after 2**64 operations,
678                        // which is fine for now.
679                        // See https://cl.tvl.fyi/c/depot/+/8834/comment/a6684ce0_d72469d1
680                        // for the discussion on alternatives.
681                        let fh = self.next_file_handle.fetch_add(1, Ordering::SeqCst);
682
683                        self.file_handles
684                            .write()
685                            .insert(fh, (Span::current(), Arc::new(Mutex::new(blob_reader))));
686
687                        Ok((
688                            Some(fh),
689                            // Don't invalidate the data cache on open.
690                            OpenOptions::KEEP_CACHE,
691                            None,
692                        ))
693                    }
694                }
695            }
696        }
697    }
698
699    #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.handle = handle), parent = self.file_handles.read().get(&handle).and_then(|x| x.0.id()))]
700    fn release(
701        &self,
702        _ctx: &Context,
703        inode: Self::Inode,
704        _flags: u32,
705        handle: Self::Handle,
706        _flush: bool,
707        _flock_release: bool,
708        _lock_owner: Option<u64>,
709    ) -> io::Result<()> {
710        match self.file_handles.write().remove(&handle) {
711            // drop the blob reader, which will close it.
712            Some(blob_reader) => drop(blob_reader),
713            None => {
714                // These might already be dropped if a read error occured.
715                warn!("file handle not found");
716            }
717        }
718
719        Ok(())
720    }
721
    /// Reads up to `size` bytes, starting at `offset`, from the blob reader
    /// stored for `handle`, and writes them to `w`.
    ///
    /// Returns the number of bytes written to `w`, which is less than `size`
    /// only if the reader hit EOF.
    ///
    /// # Errors
    ///
    ///  - `EIO` if the handle is unknown, seeking to `offset` fails, or not
    ///    all bytes read could be written to `w`.
    ///  - Errors from reading the blob or writing to `w` are propagated.
    ///  - A poisoned reader mutex is reported as a storage error.
    #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.handle = handle, rq.offset = offset, rq.size = size), parent = self.file_handles.read().get(&handle).and_then(|x| x.0.id()))]
    fn read(
        &self,
        _ctx: &Context,
        inode: Self::Inode,
        handle: Self::Handle,
        w: &mut dyn fuse_backend_rs::api::filesystem::ZeroCopyWriter,
        size: u32,
        offset: u64,
        _lock_owner: Option<u64>,
        _flags: u32,
    ) -> io::Result<usize> {
        debug!("read");

        // Clone the entry (span and shared blob reader) out of
        // self.file_handles. The read lock on the map is only held for this
        // statement, not while reading from the blob.
        let (_span, blob_reader) = self
            .file_handles
            .read()
            .get(&handle)
            .ok_or_else(|| {
                warn!("file handle {} unknown", handle);
                io::Error::from_raw_os_error(libc::EIO)
            })
            .cloned()?;

        // Lock the reader itself; the guard is moved into the async block
        // below and held until the data has been read.
        let mut blob_reader = blob_reader
            .lock()
            .map_err(|_| crate::Error::StorageError("mutex poisoned".into()))?;

        let buf = self.tokio_handle.block_on(async move {
            // seek to the offset specified, which is relative to the start of the file.
            let pos = blob_reader
                .seek(io::SeekFrom::Start(offset))
                .await
                .map_err(|e| {
                    warn!("failed to seek to offset {}: {}", offset, e);
                    io::Error::from_raw_os_error(libc::EIO)
                })?;

            debug_assert_eq!(offset, pos);

            // As written in the fuse docs, read should send exactly the number
            // of bytes requested except on EOF or error.

            let mut buf: Vec<u8> = Vec::with_capacity(size as usize);

            // copy from the reader into buf, until `size` bytes were read or EOF is hit.
            tokio::io::copy(&mut blob_reader.as_mut().take(size as u64), &mut buf).await?;

            Ok::<_, std::io::Error>(buf)
        })?;

        // We cannot use w.write() here, we're required to call write multiple
        // times until we wrote the entirety of the buffer (which is `size`, except on EOF).
        let buf_len = buf.len();
        let bytes_written = io::copy(&mut Cursor::new(buf), w)?;
        if bytes_written != buf_len as u64 {
            error!(bytes_written=%bytes_written, "unable to write all of buf to kernel");
            return Err(io::Error::from_raw_os_error(libc::EIO));
        }

        Ok(bytes_written as usize)
    }
787
788    #[tracing::instrument(skip_all, fields(rq.inode = inode))]
789    fn readlink(&self, _ctx: &Context, inode: Self::Inode) -> io::Result<Vec<u8>> {
790        if inode == ROOT_ID {
791            return Err(io::Error::from_raw_os_error(libc::ENOSYS));
792        }
793
794        // lookup the inode
795        match *self.inode_tracker.read().get(inode).unwrap() {
796            InodeData::Directory(..) | InodeData::Regular(..) => {
797                Err(io::Error::from_raw_os_error(libc::EINVAL))
798            }
799            InodeData::Symlink(ref target) => Ok(target.to_vec()),
800        }
801    }
802
803    #[tracing::instrument(skip_all, fields(rq.inode = inode, name=?name))]
804    fn getxattr(
805        &self,
806        _ctx: &Context,
807        inode: Self::Inode,
808        name: &CStr,
809        size: u32,
810    ) -> io::Result<GetxattrReply> {
811        if !self.show_xattr {
812            return Err(io::Error::from_raw_os_error(libc::ENOSYS));
813        }
814
815        // Peek at the inode requested, and construct the response.
816        let digest_str = match *self
817            .inode_tracker
818            .read()
819            .get(inode)
820            .ok_or_else(|| io::Error::from_raw_os_error(libc::ENODATA))?
821        {
822            InodeData::Directory(DirectoryInodeData::Sparse(ref digest, _))
823            | InodeData::Directory(DirectoryInodeData::Populated(ref digest, _))
824                if name.to_bytes() == XATTR_NAME_DIRECTORY_DIGEST =>
825            {
826                digest.to_string()
827            }
828            InodeData::Regular(ref digest, _, _) if name.to_bytes() == XATTR_NAME_BLOB_DIGEST => {
829                digest.to_string()
830            }
831            _ => {
832                return Err(io::Error::from_raw_os_error(libc::ENODATA));
833            }
834        };
835
836        if size == 0 {
837            Ok(GetxattrReply::Count(digest_str.len() as u32))
838        } else if size < digest_str.len() as u32 {
839            Err(io::Error::from_raw_os_error(libc::ERANGE))
840        } else {
841            Ok(GetxattrReply::Value(digest_str.into_bytes()))
842        }
843    }
844
845    #[tracing::instrument(skip_all, fields(rq.inode = inode))]
846    fn listxattr(
847        &self,
848        _ctx: &Context,
849        inode: Self::Inode,
850        size: u32,
851    ) -> io::Result<ListxattrReply> {
852        if !self.show_xattr {
853            return Err(io::Error::from_raw_os_error(libc::ENOSYS));
854        }
855
856        // determine the (\0-terminated list) to of xattr keys present, depending on the type of the inode.
857        let xattrs_names = {
858            let mut out = Vec::new();
859            if let Some(inode_data) = self.inode_tracker.read().get(inode) {
860                match *inode_data {
861                    InodeData::Directory(_) => {
862                        out.extend_from_slice(XATTR_NAME_DIRECTORY_DIGEST);
863                        out.push_byte(b'\x00');
864                    }
865                    InodeData::Regular(..) => {
866                        out.extend_from_slice(XATTR_NAME_BLOB_DIGEST);
867                        out.push_byte(b'\x00');
868                    }
869                    _ => {}
870                }
871            }
872            out
873        };
874
875        if size == 0 {
876            Ok(ListxattrReply::Count(xattrs_names.len() as u32))
877        } else if size < xattrs_names.len() as u32 {
878            Err(io::Error::from_raw_os_error(libc::ERANGE))
879        } else {
880            Ok(ListxattrReply::Names(xattrs_names.to_vec()))
881        }
882    }
883}