fuse_backend_rs/transport/fusedev/
linux_session.rs

1// Copyright 2020-2022 Ant Group. All rights reserved.
2//
3// SPDX-License-Identifier: Apache-2.0
4
5//! FUSE session management.
6//!
7//! A FUSE channel is a FUSE request handling context that takes care of handling FUSE requests
8//! sequentially. A FUSE session is a connection from a FUSE mountpoint to a FUSE server daemon.
9//! A FUSE session can have multiple FUSE channels so that FUSE requests are handled in parallel.
10
11use std::fs::{File, OpenOptions};
12use std::ops::Deref;
13use std::os::unix::fs::PermissionsExt;
14use std::os::unix::io::AsRawFd;
15use std::os::unix::net::UnixStream;
16use std::path::{Path, PathBuf};
17use std::sync::{Arc, Mutex};
18
19use mio::{Events, Poll, Token, Waker};
20use nix::errno::Errno;
21use nix::fcntl::{fcntl, FcntlArg, FdFlag, OFlag};
22use nix::mount::{mount, umount2, MntFlags, MsFlags};
23use nix::poll::{poll, PollFd, PollFlags};
24use nix::sys::epoll::{epoll_ctl, EpollEvent, EpollFlags, EpollOp};
25use nix::unistd::{getgid, getuid, read};
26
27use super::{
28    super::pagesize,
29    Error::{IoError, SessionFailure},
30    FuseBuf, FuseDevWriter, Reader, Result, FUSE_HEADER_SIZE, FUSE_KERN_BUF_PAGES,
31};
32
// These follow the definitions from libfuse.
// Capacity of the event buffer used for each poll on a channel.
const POLL_EVENTS_CAPACITY: usize = 1024;

// Path of the FUSE character device opened for every session/cloned fd.
const FUSE_DEVICE: &str = "/dev/fuse";
// Base filesystem type; a non-empty subtype is appended as "fuse.<subtype>".
const FUSE_FSTYPE: &str = "fuse";
// Default fusermount helper binary used by new sessions.
const FUSERMOUNT_BIN: &str = "fusermount3";

// Poll token attached to a channel's waker: the channel is asked to exit.
const EXIT_FUSE_EVENT: Token = Token(0);
// Poll token attached to the FUSE device fd: a request may be readable.
const FUSE_DEV_EVENT: Token = Token(1);
42
/// A fuse session manager to manage the connection with the in kernel fuse driver.
///
/// A session owns the FUSE device file once mounted and hands out [`FuseChannel`]s that read
/// requests from duplicates of that file. Dropping the session attempts to unmount it.
pub struct FuseSession {
    // Canonicalized mountpoint directory.
    mountpoint: PathBuf,
    // File system name, passed as the mount source.
    fsname: String,
    // File system subtype; when non-empty the fstype becomes "fuse.<subtype>".
    subtype: String,
    // FUSE device file; `None` until `mount()` succeeds or `set_fuse_file()` is called.
    file: Option<File>,
    // Socket to keep alive / drop for fusermount's auto_unmount.
    keep_alive: Option<UnixStream>,
    // Default buffer size given to every channel created from this session.
    bufsize: usize,
    // Mount with MS_RDONLY when set.
    readonly: bool,
    // Wakers of all channels created through `new_channel()`, used by `wake()`.
    wakers: Mutex<Vec<Arc<Waker>>>,
    // Ask fusermount for the auto_unmount option when set.
    auto_unmount: bool,
    // Add the allow_other mount option when set.
    allow_other: bool,
    // Pid whose mount namespace the file system should be mounted in, if any.
    target_mntns: Option<libc::pid_t>,
    // fusermount binary, default to fusermount3
    fusermount: String,
}
60
61impl FuseSession {
62    /// Create a new fuse session, without mounting/connecting to the in kernel fuse driver.
63    pub fn new(
64        mountpoint: &Path,
65        fsname: &str,
66        subtype: &str,
67        readonly: bool,
68    ) -> Result<FuseSession> {
69        FuseSession::new_with_autounmount(mountpoint, fsname, subtype, readonly, false)
70    }
71
72    /// Create a new fuse session, without mounting/connecting to the in kernel fuse driver.
73    pub fn new_with_autounmount(
74        mountpoint: &Path,
75        fsname: &str,
76        subtype: &str,
77        readonly: bool,
78        auto_unmount: bool,
79    ) -> Result<FuseSession> {
80        let dest = mountpoint
81            .canonicalize()
82            .map_err(|_| SessionFailure(format!("invalid mountpoint {mountpoint:?}")))?;
83        if !dest.is_dir() {
84            return Err(SessionFailure(format!("{dest:?} is not a directory")));
85        }
86
87        Ok(FuseSession {
88            mountpoint: dest,
89            fsname: fsname.to_owned(),
90            subtype: subtype.to_owned(),
91            file: None,
92            keep_alive: None,
93            bufsize: FUSE_KERN_BUF_PAGES * pagesize() + FUSE_HEADER_SIZE,
94            readonly,
95            wakers: Mutex::new(Vec::new()),
96            auto_unmount,
97            target_mntns: None,
98            fusermount: FUSERMOUNT_BIN.to_string(),
99            allow_other: true,
100        })
101    }
102
103    /// Set the target pid of mount namespace of the fuse session mount, the fuse will be mounted
104    /// under the given mnt ns.
105    pub fn set_target_mntns(&mut self, pid: Option<libc::pid_t>) {
106        self.target_mntns = pid;
107    }
108
109    /// Set fusermount binary, default to fusermount3.
110    pub fn set_fusermount(&mut self, bin: &str) {
111        self.fusermount = bin.to_string();
112    }
113
114    /// Set the allow_other mount option. This allows other users than the one mounting the
115    /// filesystem to access the filesystem. However, this option is usually restricted to the root
116    /// user unless configured otherwise.
117    pub fn set_allow_other(&mut self, allow_other: bool) {
118        self.allow_other = allow_other;
119    }
120
121    /// Get current fusermount binary.
122    pub fn get_fusermount(&self) -> &str {
123        self.fusermount.as_str()
124    }
125
126    /// Expose the associated FUSE session file.
127    pub fn get_fuse_file(&self) -> Option<&File> {
128        self.file.as_ref()
129    }
130
131    /// Force setting the associated FUSE session file.
132    pub fn set_fuse_file(&mut self, file: File) {
133        self.file = Some(file);
134    }
135
136    /// Clone fuse file using ioctl FUSE_DEV_IOC_CLONE.
137    pub fn clone_fuse_file(&self) -> Result<File> {
138        let mut old_fd = self
139            .file
140            .as_ref()
141            .ok_or(SessionFailure(
142                "fuse session file doesn't exist".to_string(),
143            ))?
144            .as_raw_fd();
145
146        let cloned_file = OpenOptions::new()
147            .create(false)
148            .read(true)
149            .write(true)
150            .open(FUSE_DEVICE)
151            .map_err(|e| SessionFailure(format!("open {FUSE_DEVICE}: {e}")))?;
152
153        // define the function which invokes "ioctl FUSE_DEV_IOC_CLONE"
154        // refer: https://github.com/torvalds/linux/blob/c42d9eeef8e5ba9292eda36fd8e3c11f35ee065c/include/uapi/linux/fuse.h#L1051-L1052
155        // #define FUSE_DEV_IOC_MAGIC   229
156        // #define FUSE_DEV_IOC_CLONE   _IOR(FUSE_DEV_IOC_MAGIC, 0, uint32_t)
157        nix::ioctl_read!(clone_fuse_fd, 229, 0, i32);
158
159        unsafe { clone_fuse_fd(cloned_file.as_raw_fd(), (&mut old_fd) as *mut i32) }
160            .map_err(|e| SessionFailure(format!("failed to clone fuse file: {:?}", e)))?;
161
162        Ok(cloned_file)
163    }
164
165    /// Get the mountpoint of the session.
166    pub fn mountpoint(&self) -> &Path {
167        &self.mountpoint
168    }
169
170    /// Get the file system name of the session.
171    pub fn fsname(&self) -> &str {
172        &self.fsname
173    }
174
175    /// Get the subtype of the session.
176    pub fn subtype(&self) -> &str {
177        &self.subtype
178    }
179
180    /// Get the default buffer size of the session.
181    pub fn bufsize(&self) -> usize {
182        self.bufsize
183    }
184
185    /// Mount the fuse mountpoint, building connection with the in kernel fuse driver.
186    pub fn mount(&mut self) -> Result<()> {
187        let mut flags = MsFlags::MS_NOSUID | MsFlags::MS_NODEV | MsFlags::MS_NOATIME;
188        if self.readonly {
189            flags |= MsFlags::MS_RDONLY;
190        }
191        let (file, socket) = fuse_kern_mount(
192            &self.mountpoint,
193            &self.fsname,
194            &self.subtype,
195            flags,
196            self.auto_unmount,
197            self.allow_other,
198            self.target_mntns,
199            &self.fusermount,
200        )?;
201
202        fcntl(file.as_raw_fd(), FcntlArg::F_SETFL(OFlag::O_NONBLOCK))
203            .map_err(|e| SessionFailure(format!("set fd nonblocking: {e}")))?;
204        self.file = Some(file);
205        self.keep_alive = socket;
206
207        Ok(())
208    }
209
210    /// Destroy a fuse session.
211    pub fn umount(&mut self) -> Result<()> {
212        // If we have a keep_alive socket, just drop it,
213        // and let fusermount do the unmount.
214        if let (None, Some(file)) = (self.keep_alive.take(), self.file.take()) {
215            if let Some(mountpoint) = self.mountpoint.to_str() {
216                fuse_kern_umount(mountpoint, file, self.fusermount.as_str())
217            } else {
218                Err(SessionFailure("invalid mountpoint".to_string()))
219            }
220        } else {
221            Ok(())
222        }
223    }
224
225    /// Create a new fuse message channel.
226    pub fn new_channel(&self) -> Result<FuseChannel> {
227        if let Some(file) = &self.file {
228            let file = file
229                .try_clone()
230                .map_err(|e| SessionFailure(format!("dup fd: {e}")))?;
231            let channel = FuseChannel::new(file, self.bufsize)?;
232            let waker = channel.get_waker();
233            self.add_waker(waker)?;
234
235            Ok(channel)
236        } else {
237            Err(SessionFailure("invalid fuse session".to_string()))
238        }
239    }
240
241    /// Wake channel loop and exit
242    pub fn wake(&self) -> Result<()> {
243        let wakers = self
244            .wakers
245            .lock()
246            .map_err(|e| SessionFailure(format!("lock wakers: {e}")))?;
247        for waker in wakers.iter() {
248            waker
249                .wake()
250                .map_err(|e| SessionFailure(format!("wake channel: {e}")))?;
251        }
252        Ok(())
253    }
254
255    fn add_waker(&self, waker: Arc<Waker>) -> Result<()> {
256        let mut wakers = self
257            .wakers
258            .lock()
259            .map_err(|e| SessionFailure(format!("lock wakers: {e}")))?;
260        wakers.push(waker);
261        Ok(())
262    }
263}
264
265impl Drop for FuseSession {
266    fn drop(&mut self) {
267        let _ = self.umount();
268    }
269}
270
/// A fuse channel abstraction.
///
/// Each session can hold multiple channels.
pub struct FuseChannel {
    // FUSE device file the channel reads requests from.
    file: File,
    // Poll instance watching both the waker and `file`.
    poll: Poll,
    // Waker registered with `poll` under EXIT_FUSE_EVENT; shared with the session.
    waker: Arc<Waker>,
    // Buffer that requests are read into; also reused as the reply buffer.
    buf: Vec<u8>,
}
280
281impl FuseChannel {
282    fn new(file: File, bufsize: usize) -> Result<Self> {
283        let poll = Poll::new().map_err(|e| SessionFailure(format!("epoll create: {e}")))?;
284        let waker = Waker::new(poll.registry(), EXIT_FUSE_EVENT)
285            .map_err(|e| SessionFailure(format!("epoll register session fd: {e}")))?;
286        let waker = Arc::new(waker);
287
288        // mio default add EPOLLET to event flags, so epoll will use edge-triggered mode.
289        // It may let poll miss some event, so manually register the fd with only EPOLLIN flag
290        // to use level-triggered mode.
291        let epoll = poll.as_raw_fd();
292        let mut event = EpollEvent::new(EpollFlags::EPOLLIN, usize::from(FUSE_DEV_EVENT) as u64);
293        epoll_ctl(
294            epoll,
295            EpollOp::EpollCtlAdd,
296            file.as_raw_fd(),
297            Some(&mut event),
298        )
299        .map_err(|e| SessionFailure(format!("epoll register channel fd: {e}")))?;
300
301        Ok(FuseChannel {
302            file,
303            poll,
304            waker,
305            buf: vec![0x0u8; bufsize],
306        })
307    }
308
309    fn get_waker(&self) -> Arc<Waker> {
310        self.waker.clone()
311    }
312
313    /// Get next available FUSE request from the underlying fuse device file.
314    ///
315    /// Returns:
316    /// - Ok(None): signal has pending on the exiting event channel
317    /// - Ok(Some((reader, writer))): reader to receive request and writer to send reply
318    /// - Err(e): error message
319    pub fn get_request(&mut self) -> Result<Option<(Reader, FuseDevWriter)>> {
320        let mut events = Events::with_capacity(POLL_EVENTS_CAPACITY);
321        let mut need_exit = false;
322        loop {
323            let mut fusereq_available = false;
324            match self.poll.poll(&mut events, None) {
325                Ok(_) => {}
326                Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
327                Err(e) => return Err(SessionFailure(format!("epoll wait: {e}"))),
328            }
329
330            for event in events.iter() {
331                if event.is_readable() {
332                    match event.token() {
333                        EXIT_FUSE_EVENT => need_exit = true,
334                        FUSE_DEV_EVENT => fusereq_available = true,
335                        x => {
336                            error!("unexpected epoll event");
337                            return Err(SessionFailure(format!("unexpected epoll event: {}", x.0)));
338                        }
339                    }
340                } else if event.is_error() {
341                    info!("FUSE channel already closed!");
342                    return Err(SessionFailure("epoll error".to_string()));
343                } else {
344                    // We should not step into this branch as other event is not registered.
345                    panic!("unknown epoll result events");
346                }
347            }
348
349            // Handle wake up event first. We don't read the event fd so that a LEVEL triggered
350            // event can still be delivered to other threads/daemons.
351            if need_exit {
352                info!("Will exit from fuse service");
353                return Ok(None);
354            }
355            if fusereq_available {
356                let fd = self.file.as_raw_fd();
357                match read(fd, &mut self.buf) {
358                    Ok(len) => {
359                        // ###############################################
360                        // Note: it's a heavy hack to reuse the same underlying data
361                        // buffer for both Reader and Writer, in order to reduce memory
362                        // consumption. Here we assume Reader won't be used anymore once
363                        // we start to write to the Writer. To get rid of this hack,
364                        // just allocate a dedicated data buffer for Writer.
365                        let buf = unsafe {
366                            std::slice::from_raw_parts_mut(self.buf.as_mut_ptr(), self.buf.len())
367                        };
368                        // Reader::new() and Writer::new() should always return success.
369                        let reader =
370                            Reader::from_fuse_buffer(FuseBuf::new(&mut self.buf[..len])).unwrap();
371                        let writer = FuseDevWriter::new(fd, buf).unwrap();
372                        return Ok(Some((reader, writer)));
373                    }
374                    Err(e) => match e {
375                        Errno::ENOENT => {
376                            // ENOENT means the operation was interrupted, it's safe to restart
377                            trace!("restart reading due to ENOENT");
378                            continue;
379                        }
380                        Errno::EAGAIN => {
381                            trace!("restart reading due to EAGAIN");
382                            continue;
383                        }
384                        Errno::EINTR => {
385                            trace!("syscall interrupted");
386                            continue;
387                        }
388                        Errno::ENODEV => {
389                            info!("fuse filesystem umounted");
390                            return Ok(None);
391                        }
392                        e => {
393                            warn! {"read fuse dev failed on fd {}: {}", fd, e};
394                            return Err(SessionFailure(format!("read new request: {e:?}")));
395                        }
396                    },
397                }
398            }
399        }
400    }
401}
402
403/// Mount a fuse file system
404#[allow(clippy::too_many_arguments)]
405fn fuse_kern_mount(
406    mountpoint: &Path,
407    fsname: &str,
408    subtype: &str,
409    flags: MsFlags,
410    auto_unmount: bool,
411    allow_other: bool,
412    target_mntns: Option<libc::pid_t>,
413    fusermount: &str,
414) -> Result<(File, Option<UnixStream>)> {
415    let file = OpenOptions::new()
416        .create(false)
417        .read(true)
418        .write(true)
419        .open(FUSE_DEVICE)
420        .map_err(|e| SessionFailure(format!("open {FUSE_DEVICE}: {e}")))?;
421    let meta = mountpoint
422        .metadata()
423        .map_err(|e| SessionFailure(format!("stat {mountpoint:?}: {e}")))?;
424    let mut opts = format!(
425        "default_permissions,fd={},rootmode={:o},user_id={},group_id={}",
426        file.as_raw_fd(),
427        meta.permissions().mode() & libc::S_IFMT,
428        getuid(),
429        getgid(),
430    );
431    if allow_other {
432        opts.push_str(",allow_other");
433    }
434    let mut fstype = String::from(FUSE_FSTYPE);
435    if !subtype.is_empty() {
436        fstype.push('.');
437        fstype.push_str(subtype);
438    }
439
440    if let Some(mountpoint) = mountpoint.to_str() {
441        info!(
442            "mount source {} dest {} with fstype {} opts {} fd {}",
443            fsname,
444            mountpoint,
445            fstype,
446            opts,
447            file.as_raw_fd(),
448        );
449    }
450
451    // mount in another mntns requires mounting with fusermount, which is a new process, as
452    // multithreaded program is not allowed to join to another mntns, and the process running fuse
453    // session might be multithreaded.
454    if auto_unmount || target_mntns.is_some() {
455        fuse_fusermount_mount(
456            mountpoint,
457            fsname,
458            subtype,
459            opts,
460            flags,
461            auto_unmount,
462            target_mntns,
463            fusermount,
464        )
465    } else {
466        match mount(
467            Some(fsname),
468            mountpoint,
469            Some(fstype.deref()),
470            flags,
471            Some(opts.deref()),
472        ) {
473            Ok(()) => Ok((file, None)),
474            Err(Errno::EPERM) => fuse_fusermount_mount(
475                mountpoint,
476                fsname,
477                subtype,
478                opts,
479                flags,
480                auto_unmount,
481                target_mntns,
482                fusermount,
483            ),
484            Err(e) => Err(SessionFailure(format!(
485                "failed to mount {mountpoint:?}: {e}"
486            ))),
487        }
488    }
489}
490
491fn msflags_to_string(flags: MsFlags) -> String {
492    [
493        (MsFlags::MS_RDONLY, ("rw", "ro")),
494        (MsFlags::MS_NOSUID, ("suid", "nosuid")),
495        (MsFlags::MS_NODEV, ("dev", "nodev")),
496        (MsFlags::MS_NOEXEC, ("exec", "noexec")),
497        (MsFlags::MS_SYNCHRONOUS, ("async", "sync")),
498        (MsFlags::MS_NOATIME, ("atime", "noatime")),
499    ]
500    .map(
501        |(flag, (neg, pos))| {
502            if flags.contains(flag) {
503                pos
504            } else {
505                neg
506            }
507        },
508    )
509    .join(",")
510}
511
512/// Mount a fuse file system with fusermount
513#[allow(clippy::too_many_arguments)]
514fn fuse_fusermount_mount(
515    mountpoint: &Path,
516    fsname: &str,
517    subtype: &str,
518    opts: String,
519    flags: MsFlags,
520    auto_unmount: bool,
521    target_mntns: Option<libc::pid_t>,
522    fusermount: &str,
523) -> Result<(File, Option<UnixStream>)> {
524    let mut opts = vec![format!("fsname={fsname}"), opts, msflags_to_string(flags)];
525    if !subtype.is_empty() {
526        opts.push(format!("subtype={subtype}"));
527    }
528    if auto_unmount {
529        opts.push("auto_unmount".to_owned());
530    }
531    let opts = opts.join(",");
532
533    let (send, recv) = UnixStream::pair().unwrap();
534
535    // Keep the sending socket around after exec to pass to fusermount.
536    // When its partner recv closes, fusermount will unmount.
537    // Remove the close-on-exec flag from the socket, so we can pass it to
538    // fusermount.
539    fcntl(send.as_raw_fd(), FcntlArg::F_SETFD(FdFlag::empty()))
540        .map_err(|e| SessionFailure(format!("Failed to remove close-on-exec flag: {e}")))?;
541
542    let mut cmd = match target_mntns {
543        Some(pid) => {
544            let mut c = std::process::Command::new("nsenter");
545            c.arg("-t")
546                .arg(format!("{}", pid))
547                .arg("-m")
548                .arg(fusermount);
549            c
550        }
551        None => std::process::Command::new(fusermount),
552    };
553    // Old version of fusermount doesn't support long --options, yet.
554    let mut proc = cmd
555        .env("_FUSE_COMMFD", format!("{}", send.as_raw_fd()))
556        .arg("-o")
557        .arg(opts)
558        .arg("--")
559        .arg(mountpoint)
560        .spawn()
561        .map_err(IoError)?;
562
563    if auto_unmount {
564        std::thread::spawn(move || {
565            let _ = proc.wait();
566        });
567    } else {
568        match proc.wait().map_err(IoError)?.code() {
569            Some(0) => {}
570            exit_code => {
571                return Err(SessionFailure(format!(
572                    "Unexpected exit code when running fusermount: {exit_code:?}"
573                )))
574            }
575        }
576    }
577    drop(send);
578
579    match vmm_sys_util::sock_ctrl_msg::ScmSocket::recv_with_fd(&recv, &mut [0u8; 8]).map_err(
580        |e| {
581            SessionFailure(format!(
582                "Unexpected error when receiving fuse file descriptor from fusermount: {}",
583                e
584            ))
585        },
586    )? {
587        (_recv_bytes, Some(file)) => Ok((file, if auto_unmount { Some(recv) } else { None })),
588        (recv_bytes, None) => Err(SessionFailure(format!(
589            "fusermount did not send a file descriptor.  We received {recv_bytes} bytes."
590        ))),
591    }
592}
593
594/// Umount a fuse file system
595fn fuse_kern_umount(mountpoint: &str, file: File, fusermount: &str) -> Result<()> {
596    let mut fds = [PollFd::new(file.as_raw_fd(), PollFlags::empty())];
597
598    if poll(&mut fds, 0).is_ok() {
599        // POLLERR means the file system is already umounted,
600        // or the connection has been aborted via /sys/fs/fuse/connections/NNN/abort
601        if let Some(event) = fds[0].revents() {
602            if event == PollFlags::POLLERR {
603                return Ok(());
604            }
605        }
606    }
607
608    // Drop to close fuse session fd, otherwise synchronous umount can recurse into filesystem and
609    // cause deadlock.
610    drop(file);
611    match umount2(mountpoint, MntFlags::MNT_DETACH) {
612        Ok(()) => Ok(()),
613        Err(Errno::EPERM) => fuse_fusermount_umount(mountpoint, fusermount),
614        Err(e) => Err(SessionFailure(format!(
615            "failed to umount {mountpoint}: {e}"
616        ))),
617    }
618}
619
620/// Umount a fuse file system by fusermount helper
621fn fuse_fusermount_umount(mountpoint: &str, fusermount: &str) -> Result<()> {
622    match std::process::Command::new(fusermount)
623        .arg("--unmount")
624        .arg("--quiet")
625        .arg("--lazy")
626        .arg("--")
627        .arg(mountpoint)
628        .status()
629        .map_err(IoError)?
630        .code()
631    {
632        Some(0) => Ok(()),
633        exit_code => Err(SessionFailure(format!(
634            "Unexpected exit code when unmounting via running fusermount: {exit_code:?}"
635        ))),
636    }
637}
638
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs::File;
    use std::os::unix::io::FromRawFd;
    use std::path::Path;
    use vmm_sys_util::tempdir::TempDir;

    /// Session creation validates the mountpoint.
    #[test]
    fn test_new_session() {
        // "haha" can't be canonicalized, so it is rejected.
        let missing = FuseSession::new(Path::new("haha"), "foo", "bar", true);
        assert!(missing.is_err());

        // A freshly created temporary directory is accepted.
        let tmp = TempDir::new().unwrap();
        let existing = FuseSession::new(tmp.as_path(), "foo", "bar", false);
        assert!(existing.is_ok());
    }

    /// A channel can be built on top of an arbitrary open fd.
    #[test]
    fn test_new_channel() {
        let raw = nix::unistd::dup(std::io::stdout().as_raw_fd()).unwrap();
        let file = unsafe { File::from_raw_fd(raw) };
        let channel = FuseChannel::new(file, 3);
        let _ = channel.unwrap();
    }

    /// The fusermount binary defaults to FUSERMOUNT_BIN and can be overridden.
    #[test]
    fn test_fusermount() {
        let tmp = TempDir::new().unwrap();
        let created = FuseSession::new(tmp.as_path(), "foo", "bar", true);
        assert!(created.is_ok());
        let mut session = created.unwrap();
        assert_eq!(session.get_fusermount(), FUSERMOUNT_BIN);

        session.set_fusermount("fusermount");
        assert_eq!(session.get_fusermount(), "fusermount");
    }

    /// Mount, clone the session fd, then umount and mount again.
    #[test]
    fn test_clone_fuse_file() {
        let tmp = TempDir::new().unwrap();
        let mut session = FuseSession::new(tmp.as_path(), "foo", "bar", true).unwrap();
        session.mount().unwrap();

        let cloned = session.clone_fuse_file().unwrap();
        assert!(cloned.as_raw_fd() > 0);

        session.umount().unwrap();
        session.set_fuse_file(cloned);
        session.mount().unwrap();
    }
}
690
691#[cfg(feature = "async_io")]
692pub use asyncio::FuseDevTask;
693
694#[cfg(feature = "async_io")]
695/// Task context to handle fuse request in asynchronous mode.
696mod asyncio {
697    use std::os::unix::io::RawFd;
698    use std::sync::Arc;
699
700    use crate::api::filesystem::AsyncFileSystem;
701    use crate::api::server::Server;
702    use crate::transport::{FuseBuf, Reader, Writer};
703
    /// Task context to handle fuse request in asynchronous mode.
    ///
    /// This structure provides a context to handle fuse request in asynchronous mode, including
    /// the fuse fd, an internal buffer and a `Server` instance to serve requests.
    ///
    /// ## Examples
    /// ```ignore
    /// let buf_size = 0x1_0000;
    /// let state = AsyncExecutorState::new();
    /// let mut task = FuseDevTask::new(buf_size, fuse_dev_fd, fs_server, state.clone());
    ///
    /// // Run the task
    /// executor.spawn(async move { task.poll_handler().await });
    ///
    /// // Stop the task
    /// state.quiesce();
    /// ```
    pub struct FuseDevTask<F: AsyncFileSystem + Sync> {
        // Raw fuse device fd; not owned, the creator keeps it valid.
        fd: RawFd,
        // Buffer requests are read into; also reused for building replies.
        buf: Vec<u8>,
        // Shared state used to stop the task (checked through `quiescing()`).
        state: AsyncExecutorState,
        // Server instance handling the decoded requests.
        server: Arc<Server<F>>,
    }
727
728    impl<F: AsyncFileSystem + Sync> FuseDevTask<F> {
729        /// Create a new fuse task context for asynchronous IO.
730        ///
731        /// # Parameters
732        /// - buf_size: size of buffer to receive requests from/send reply to the fuse fd
733        /// - fd: fuse device file descriptor
734        /// - server: `Server` instance to serve requests from the fuse fd
735        /// - state: shared state object to control the task object
736        ///
737        /// # Safety
738        /// The caller must ensure `fd` is valid during the lifetime of the returned task object.
739        pub fn new(
740            buf_size: usize,
741            fd: RawFd,
742            server: Arc<Server<F>>,
743            state: AsyncExecutorState,
744        ) -> Self {
745            FuseDevTask {
746                fd,
747                server,
748                state,
749                buf: vec![0x0u8; buf_size],
750            }
751        }
752
753        /// Handler to process fuse requests in asynchronous mode.
754        ///
755        /// An async fn to handle requests from the fuse fd. It works in asynchronous IO mode when:
756        /// - receiving request from fuse fd
757        /// - handling requests by calling Server::async_handle_requests()
758        /// - sending reply to fuse fd
759        ///
760        /// The async fn repeatedly return Poll::Pending when polled until the state has been set
761        /// to quiesce mode.
762        pub async fn poll_handler(&mut self) {
763            // TODO: register self.buf as io uring buffers.
764            let drive = AsyncDriver::default();
765
766            while !self.state.quiescing() {
767                let result = AsyncUtil::read(drive.clone(), self.fd, &mut self.buf, 0).await;
768                match result {
769                    Ok(len) => {
770                        // ###############################################
771                        // Note: it's a heavy hack to reuse the same underlying data
772                        // buffer for both Reader and Writer, in order to reduce memory
773                        // consumption. Here we assume Reader won't be used anymore once
774                        // we start to write to the Writer. To get rid of this hack,
775                        // just allocate a dedicated data buffer for Writer.
776                        let buf = unsafe {
777                            std::slice::from_raw_parts_mut(self.buf.as_mut_ptr(), self.buf.len())
778                        };
779                        // Reader::new() and Writer::new() should always return success.
780                        let reader =
781                            Reader::<()>::new(FuseBuf::new(&mut self.buf[0..len])).unwrap();
782                        let writer = Writer::new(self.fd, buf).unwrap();
783                        let result = unsafe {
784                            self.server
785                                .async_handle_message(drive.clone(), reader, writer, None, None)
786                                .await
787                        };
788
789                        if let Err(e) = result {
790                            // TODO: error handling
791                            error!("failed to handle fuse request, {}", e);
792                        }
793                    }
794                    Err(e) => {
795                        // TODO: error handling
796                        error!("failed to read request from fuse device fd, {}", e);
797                    }
798                }
799            }
800
801            // TODO: unregister self.buf as io uring buffers.
802
803            // Report that the task has been quiesced.
804            self.state.report();
805        }
806    }
807
808    impl<F: AsyncFileSystem + Sync> Clone for FuseDevTask<F> {
809        fn clone(&self) -> Self {
810            FuseDevTask {
811                fd: self.fd,
812                server: self.server.clone(),
813                state: self.state.clone(),
814                buf: vec![0x0u8; self.buf.capacity()],
815            }
816        }
817    }
818
    #[cfg(test)]
    mod tests {
        use std::os::unix::io::AsRawFd;

        use super::*;
        use crate::api::{Vfs, VfsOptions};
        use crate::async_util::{AsyncDriver, AsyncExecutor};

        // Drives an executor for a few rounds before and after requesting quiesce.
        // NOTE(review): the part that actually spawns `FuseDevTask`s is commented out below,
        // which is why `_server` and `_fd` are currently unused.
        #[test]
        fn test_fuse_task() {
            let state = AsyncExecutorState::new();
            let fs = Vfs::<AsyncDriver, ()>::new(VfsOptions::default());
            let _server = Arc::new(Server::<Vfs<AsyncDriver, ()>, AsyncDriver, ()>::new(fs));
            // A temporary file stands in for the fuse device.
            let file = vmm_sys_util::tempfile::TempFile::new().unwrap();
            let _fd = file.as_file().as_raw_fd();

            let mut executor = AsyncExecutor::new(32);
            executor.setup().unwrap();

            /*
            // Create three tasks, which could handle three concurrent fuse requests.
            let mut task = FuseDevTask::new(0x1000, fd, server.clone(), state.clone());
            executor
                .spawn(async move { task.poll_handler().await })
                .unwrap();
            let mut task = FuseDevTask::new(0x1000, fd, server.clone(), state.clone());
            executor
                .spawn(async move { task.poll_handler().await })
                .unwrap();
            let mut task = FuseDevTask::new(0x1000, fd, server.clone(), state.clone());
            executor
                .spawn(async move { task.poll_handler().await })
                .unwrap();
             */

            for _i in 0..10 {
                executor.run_once(false).unwrap();
            }

            // Set the exiting flag
            state.quiesce();
            // Close the fusedev fd, so all pending async io requests will be aborted.
            drop(file);

            for _i in 0..10 {
                executor.run_once(false).unwrap();
            }
        }
    }
868}