snix_castore/fs/fuse/
mod.rs

1use std::{io, path::Path, sync::Arc};
2
3use fuse_backend_rs::{api::filesystem::FileSystem, transport::FuseSession};
4use parking_lot::Mutex;
5use threadpool::ThreadPool;
6use tracing::{error, instrument};
7
8#[cfg(test)]
9mod tests;
10
11struct FuseServer<FS>
12where
13    FS: FileSystem + Sync + Send,
14{
15    server: Arc<fuse_backend_rs::api::server::Server<Arc<FS>>>,
16    channel: fuse_backend_rs::transport::FuseChannel,
17}
18
19#[cfg(target_os = "macos")]
20const BADFD: libc::c_int = libc::EBADF;
21#[cfg(target_os = "linux")]
22const BADFD: libc::c_int = libc::EBADFD;
23
24impl<FS> FuseServer<FS>
25where
26    FS: FileSystem + Sync + Send,
27{
28    fn start(&mut self) -> io::Result<()> {
29        while let Some((reader, writer)) = self
30            .channel
31            .get_request()
32            .map_err(|_| io::Error::from_raw_os_error(libc::EINVAL))?
33        {
34            if let Err(e) = self
35                .server
36                .handle_message(reader, writer.into(), None, None)
37            {
38                match e {
39                    // This indicates the session has been shut down.
40                    fuse_backend_rs::Error::EncodeMessage(e) if e.raw_os_error() == Some(BADFD) => {
41                        break;
42                    }
43                    error => {
44                        error!(?error, "failed to handle fuse request");
45                        continue;
46                    }
47                }
48            }
49        }
50        Ok(())
51    }
52}
53
54/// Starts a [FileSystem] with the specified number of threads, and provides
55/// functions to unmount, and wait for it to have completed.
56#[derive(Clone)]
57pub struct FuseDaemon {
58    session: Arc<Mutex<FuseSession>>,
59    threads: Arc<ThreadPool>,
60}
61
62impl FuseDaemon {
63    #[instrument(skip(fs, mountpoint), fields(mountpoint=?mountpoint), err)]
64    pub fn new<FS, P>(
65        fs: FS,
66        mountpoint: P,
67        num_threads: usize,
68        allow_other: bool,
69    ) -> Result<Self, io::Error>
70    where
71        FS: FileSystem + Sync + Send + 'static,
72        P: AsRef<Path> + std::fmt::Debug,
73    {
74        let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs)));
75
76        let mut session = FuseSession::new(mountpoint.as_ref(), "snix-store", "", true)
77            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
78
79        #[cfg(target_os = "linux")]
80        session.set_allow_other(allow_other);
81        session
82            .mount()
83            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
84
85        // construct a thread pool
86        let threads = threadpool::Builder::new()
87            .num_threads(num_threads)
88            .thread_name("fuse_server".to_string())
89            .build();
90
91        for _ in 0..num_threads {
92            // for each thread requested, create and start a FuseServer accepting requests.
93            let mut server = FuseServer {
94                server: server.clone(),
95                channel: session
96                    .new_channel()
97                    .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?,
98            };
99
100            threads.execute(move || {
101                let _ = server.start();
102            });
103        }
104
105        Ok(FuseDaemon {
106            session: Arc::new(Mutex::new(session)),
107            threads: Arc::new(threads),
108        })
109    }
110
111    /// Waits for all threads to finish.
112    #[instrument(skip_all)]
113    pub fn wait(&self) {
114        self.threads.join()
115    }
116
117    /// Send the unmount command, and waits for all threads to finish.
118    #[instrument(skip_all, err)]
119    pub fn unmount(&self) -> Result<(), io::Error> {
120        // Send the unmount command.
121        self.session
122            .lock()
123            .umount()
124            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
125
126        self.wait();
127        Ok(())
128    }
129}
130
131impl Drop for FuseDaemon {
132    fn drop(&mut self) {
133        if let Err(error) = self.unmount() {
134            error!(?error, "failed to unmont fuse filesystem")
135        }
136    }
137}