snix_castore/fs/fuse/
mod.rs1use std::{io, path::Path, sync::Arc};
2
3use fuse_backend_rs::{api::filesystem::FileSystem, transport::FuseSession};
4use parking_lot::Mutex;
5use threadpool::ThreadPool;
6use tracing::{error, instrument};
7
8#[cfg(test)]
9mod tests;
10
11struct FuseServer<FS>
12where
13 FS: FileSystem + Sync + Send,
14{
15 server: Arc<fuse_backend_rs::api::server::Server<Arc<FS>>>,
16 channel: fuse_backend_rs::transport::FuseChannel,
17}
18
19#[cfg(target_os = "macos")]
20const BADFD: libc::c_int = libc::EBADF;
21#[cfg(target_os = "linux")]
22const BADFD: libc::c_int = libc::EBADFD;
23
24impl<FS> FuseServer<FS>
25where
26 FS: FileSystem + Sync + Send,
27{
28 fn start(&mut self) -> io::Result<()> {
29 while let Some((reader, writer)) = self
30 .channel
31 .get_request()
32 .map_err(|_| io::Error::from_raw_os_error(libc::EINVAL))?
33 {
34 if let Err(e) = self
35 .server
36 .handle_message(reader, writer.into(), None, None)
37 {
38 match e {
39 fuse_backend_rs::Error::EncodeMessage(e) if e.raw_os_error() == Some(BADFD) => {
41 break;
42 }
43 error => {
44 error!(?error, "failed to handle fuse request");
45 continue;
46 }
47 }
48 }
49 }
50 Ok(())
51 }
52}
53
54#[derive(Clone)]
57pub struct FuseDaemon {
58 session: Arc<Mutex<FuseSession>>,
59 threads: Arc<ThreadPool>,
60}
61
62impl FuseDaemon {
63 #[instrument(skip(fs, mountpoint), fields(mountpoint=?mountpoint), err)]
64 pub fn new<FS, P>(
65 fs: FS,
66 mountpoint: P,
67 num_threads: usize,
68 allow_other: bool,
69 ) -> Result<Self, io::Error>
70 where
71 FS: FileSystem + Sync + Send + 'static,
72 P: AsRef<Path> + std::fmt::Debug,
73 {
74 let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs)));
75
76 let mut session = FuseSession::new(mountpoint.as_ref(), "snix-store", "", true)
77 .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
78
79 #[cfg(target_os = "linux")]
80 session.set_allow_other(allow_other);
81 session
82 .mount()
83 .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
84
85 let threads = threadpool::Builder::new()
87 .num_threads(num_threads)
88 .thread_name("fuse_server".to_string())
89 .build();
90
91 for _ in 0..num_threads {
92 let mut server = FuseServer {
94 server: server.clone(),
95 channel: session
96 .new_channel()
97 .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?,
98 };
99
100 threads.execute(move || {
101 let _ = server.start();
102 });
103 }
104
105 Ok(FuseDaemon {
106 session: Arc::new(Mutex::new(session)),
107 threads: Arc::new(threads),
108 })
109 }
110
111 #[instrument(skip_all)]
113 pub fn wait(&self) {
114 self.threads.join()
115 }
116
117 #[instrument(skip_all, err)]
119 pub fn unmount(&self) -> Result<(), io::Error> {
120 self.session
122 .lock()
123 .umount()
124 .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
125
126 self.wait();
127 Ok(())
128 }
129}
130
131impl Drop for FuseDaemon {
132 fn drop(&mut self) {
133 if let Err(error) = self.unmount() {
134 error!(?error, "failed to unmont fuse filesystem")
135 }
136 }
137}