snix_castore/fs/fuse/
mod.rs

1use std::{io, path::Path, sync::Arc};
2
3use fuse_backend_rs::{api::filesystem::FileSystem, transport::FuseSession};
4use parking_lot::Mutex;
5use threadpool::ThreadPool;
6use tracing::{error, instrument};
7
8#[cfg(test)]
9mod tests;
10
11struct FuseServer<FS>
12where
13    FS: FileSystem + Sync + Send,
14{
15    server: Arc<fuse_backend_rs::api::server::Server<Arc<FS>>>,
16    channel: fuse_backend_rs::transport::FuseChannel,
17}
18
19#[cfg(target_os = "macos")]
20const BADFD: libc::c_int = libc::EBADF;
21#[cfg(target_os = "linux")]
22const BADFD: libc::c_int = libc::EBADFD;
23
24impl<FS> FuseServer<FS>
25where
26    FS: FileSystem + Sync + Send,
27{
28    fn start(&mut self, tokio_handle: tokio::runtime::Handle) -> io::Result<()> {
29        let _guard = tokio_handle.enter();
30
31        while let Some((reader, writer)) = self
32            .channel
33            .get_request()
34            .map_err(|_| io::Error::from_raw_os_error(libc::EINVAL))?
35        {
36            if let Err(e) = self
37                .server
38                .handle_message(reader, writer.into(), None, None)
39            {
40                match e {
41                    // This indicates the session has been shut down.
42                    fuse_backend_rs::Error::EncodeMessage(e) if e.raw_os_error() == Some(BADFD) => {
43                        break;
44                    }
45                    error => {
46                        error!(?error, "failed to handle fuse request");
47                        continue;
48                    }
49                }
50            }
51        }
52        Ok(())
53    }
54}
55
56/// Starts a [FileSystem] with the specified number of threads, and provides
57/// functions to unmount, and wait for it to have completed.
58#[derive(Clone)]
59pub struct FuseDaemon {
60    session: Arc<Mutex<FuseSession>>,
61    threads: Arc<ThreadPool>,
62}
63
64impl FuseDaemon {
65    #[instrument(skip(fs, mountpoint), fields(mountpoint=?mountpoint), err)]
66    pub fn new<FS, P>(
67        fs: FS,
68        mountpoint: P,
69        num_threads: usize,
70        allow_other: bool,
71    ) -> Result<Self, io::Error>
72    where
73        FS: FileSystem + Sync + Send + 'static,
74        P: AsRef<Path> + std::fmt::Debug,
75    {
76        let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs)));
77
78        let mut session = FuseSession::new(mountpoint.as_ref(), "snix-castore", "", true)
79            .map_err(|e| io::Error::other(e.to_string()))?;
80
81        #[cfg(target_os = "linux")]
82        session.set_allow_other(allow_other);
83        session
84            .mount()
85            .map_err(|e| io::Error::other(e.to_string()))?;
86
87        // construct a thread pool
88        let threads = threadpool::Builder::new()
89            .num_threads(num_threads)
90            .thread_name("fuse_server".to_string())
91            .build();
92
93        // get a handle to the current tokio runtime
94        let runtime_handle = tokio::runtime::Handle::current();
95
96        for _ in 0..num_threads {
97            // for each thread requested, create and start a FuseServer accepting requests.
98            let mut server = FuseServer {
99                server: server.clone(),
100                channel: session
101                    .new_channel()
102                    .map_err(|e| io::Error::other(e.to_string()))?,
103            };
104
105            // Start the FuseServer in each thread, and enter the tokio runtime context,
106            // so we can block on tasks.
107            threads.execute({
108                let runtime_handle = runtime_handle.clone();
109                move || {
110                    let _ = server.start(runtime_handle);
111                }
112            });
113        }
114
115        Ok(FuseDaemon {
116            session: Arc::new(Mutex::new(session)),
117            threads: Arc::new(threads),
118        })
119    }
120
121    /// Waits for all threads to finish.
122    #[instrument(skip_all)]
123    pub fn wait(&self) {
124        self.threads.join()
125    }
126
127    /// Send the unmount command, and waits for all threads to finish.
128    #[instrument(skip_all, err)]
129    pub fn unmount(&self) -> Result<(), io::Error> {
130        // Send the unmount command.
131        self.session
132            .lock()
133            .umount()
134            .map_err(|e| io::Error::other(e.to_string()))?;
135
136        self.wait();
137        Ok(())
138    }
139}
140
141impl Drop for FuseDaemon {
142    fn drop(&mut self) {
143        if let Err(error) = self.unmount() {
144            error!(?error, "failed to unmont fuse filesystem")
145        }
146    }
147}