snix_castore/fs/fuse/
mod.rs1use std::{io, path::Path, sync::Arc};
2
3use fuse_backend_rs::{api::filesystem::FileSystem, transport::FuseSession};
4use parking_lot::Mutex;
5use threadpool::ThreadPool;
6use tracing::{error, instrument};
7
8#[cfg(test)]
9mod tests;
10
11struct FuseServer<FS>
12where
13 FS: FileSystem + Sync + Send,
14{
15 server: Arc<fuse_backend_rs::api::server::Server<Arc<FS>>>,
16 channel: fuse_backend_rs::transport::FuseChannel,
17}
18
19#[cfg(target_os = "macos")]
20const BADFD: libc::c_int = libc::EBADF;
21#[cfg(target_os = "linux")]
22const BADFD: libc::c_int = libc::EBADFD;
23
24impl<FS> FuseServer<FS>
25where
26 FS: FileSystem + Sync + Send,
27{
28 fn start(&mut self, tokio_handle: tokio::runtime::Handle) -> io::Result<()> {
29 let _guard = tokio_handle.enter();
30
31 while let Some((reader, writer)) = self
32 .channel
33 .get_request()
34 .map_err(|_| io::Error::from_raw_os_error(libc::EINVAL))?
35 {
36 if let Err(e) = self
37 .server
38 .handle_message(reader, writer.into(), None, None)
39 {
40 match e {
41 fuse_backend_rs::Error::EncodeMessage(e) if e.raw_os_error() == Some(BADFD) => {
43 break;
44 }
45 error => {
46 error!(?error, "failed to handle fuse request");
47 continue;
48 }
49 }
50 }
51 }
52 Ok(())
53 }
54}
55
56#[derive(Clone)]
59pub struct FuseDaemon {
60 session: Arc<Mutex<FuseSession>>,
61 threads: Arc<ThreadPool>,
62}
63
64impl FuseDaemon {
65 #[instrument(skip(fs, mountpoint), fields(mountpoint=?mountpoint), err)]
66 pub fn new<FS, P>(
67 fs: FS,
68 mountpoint: P,
69 num_threads: usize,
70 allow_other: bool,
71 ) -> Result<Self, io::Error>
72 where
73 FS: FileSystem + Sync + Send + 'static,
74 P: AsRef<Path> + std::fmt::Debug,
75 {
76 let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs)));
77
78 let mut session = FuseSession::new(mountpoint.as_ref(), "snix-castore", "", true)
79 .map_err(|e| io::Error::other(e.to_string()))?;
80
81 #[cfg(target_os = "linux")]
82 session.set_allow_other(allow_other);
83 session
84 .mount()
85 .map_err(|e| io::Error::other(e.to_string()))?;
86
87 let threads = threadpool::Builder::new()
89 .num_threads(num_threads)
90 .thread_name("fuse_server".to_string())
91 .build();
92
93 let runtime_handle = tokio::runtime::Handle::current();
95
96 for _ in 0..num_threads {
97 let mut server = FuseServer {
99 server: server.clone(),
100 channel: session
101 .new_channel()
102 .map_err(|e| io::Error::other(e.to_string()))?,
103 };
104
105 threads.execute({
108 let runtime_handle = runtime_handle.clone();
109 move || {
110 let _ = server.start(runtime_handle);
111 }
112 });
113 }
114
115 Ok(FuseDaemon {
116 session: Arc::new(Mutex::new(session)),
117 threads: Arc::new(threads),
118 })
119 }
120
121 #[instrument(skip_all)]
123 pub fn wait(&self) {
124 self.threads.join()
125 }
126
127 #[instrument(skip_all, err)]
129 pub fn unmount(&self) -> Result<(), io::Error> {
130 self.session
132 .lock()
133 .umount()
134 .map_err(|e| io::Error::other(e.to_string()))?;
135
136 self.wait();
137 Ok(())
138 }
139}
140
141impl Drop for FuseDaemon {
142 fn drop(&mut self) {
143 if let Err(error) = self.unmount() {
144 error!(?error, "failed to unmont fuse filesystem")
145 }
146 }
147}