tokio_listener/
listener.rs

1#[allow(unused_imports)]
2use std::{
3    ffi::c_int,
4    fmt::Display,
5    net::SocketAddr,
6    path::PathBuf,
7    pin::Pin,
8    str::FromStr,
9    sync::Arc,
10    task::{ready, Context, Poll},
11    time::Duration,
12};
13
14#[cfg(unix)]
15use std::os::fd::RawFd;
16
17use futures_core::{Future, Stream};
18#[cfg(feature = "inetd")]
19use futures_util::{future::Fuse, FutureExt};
20use tokio::{
21    net::{TcpListener, TcpStream},
22    sync::oneshot::{channel, Receiver, Sender},
23    time::Sleep,
24};
25use tracing::{debug, info, trace};
26
27#[cfg(unix)]
28use tokio::net::UnixListener;
29
30use crate::{
31    connection::ConnectionImpl, Connection, ListenerAddress, SomeSocketAddr, SystemOptions,
32    UserOptions,
33};
34
/// Configured TCP, `AF_UNIX` or other stream socket acceptor.
///
/// Based on extended hyper 0.14's `AddrIncoming` code.
pub struct Listener {
    /// The concrete acceptor (TCP, UNIX, stdio or a set of several) that is actually polled.
    pub(crate) i: ListenerImpl,
    /// Copied from `SystemOptions::sleep_on_errors` at bind time. When set, `poll_accept`
    /// backs off for one second after a non-per-connection accept error instead of returning it.
    sleep_on_errors: bool,
    /// Back-off timer armed by `poll_accept`; `None` while no back-off is in progress.
    timeout: Option<Pin<Box<Sleep>>>,
}
43
44impl std::fmt::Debug for Listener {
45    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46        match self.i {
47            ListenerImpl::Tcp { .. } => f.write_str("tokio_listener::Listener(tcp)"),
48            #[cfg(all(feature = "unix", unix))]
49            ListenerImpl::Unix { .. } => f.write_str("tokio_listener::Listener(unix)"),
50            #[cfg(feature = "inetd")]
51            ListenerImpl::Stdio(_) => f.write_str("tokio_listener::Listener(stdio)"),
52            #[cfg(feature = "multi-listener")]
53            ListenerImpl::Multi(ref x) => {
54                write!(f, "tokio_listener::Listener(multi, n={})", x.v.len())
55            }
56        }
57    }
58}
59
60async fn listen_tcp(
61    a: &SocketAddr,
62    usr_opts: &UserOptions,
63    sys_opts: &SystemOptions,
64) -> Result<ListenerImpl, std::io::Error> {
65    #[cfg(not(feature = "socket_options"))]
66    let s = TcpListener::bind(a).await?;
67    #[cfg(feature = "socket_options")]
68    let s =
69        if usr_opts.tcp_only_v6 || usr_opts.tcp_reuse_port || usr_opts.tcp_listen_backlog.is_some()
70        {
71            let s = socket2::Socket::new(
72                socket2::Domain::for_address(*a),
73                socket2::Type::STREAM,
74                None,
75            )?;
76            if usr_opts.tcp_only_v6 {
77                s.set_only_v6(true)?;
78            }
79            #[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))]
80            if usr_opts.tcp_reuse_port {
81                s.set_reuse_port(true)?;
82            }
83            s.bind(&socket2::SockAddr::from(*a))?;
84            let backlog = usr_opts.tcp_listen_backlog.unwrap_or(1024);
85            let Ok(backlog): Result<c_int, _> = backlog.try_into() else {
86                return crate::error::BindError::InvalidUserOption {
87                    name: "tcp_listen_backlog",
88                }
89                .ioerr();
90            };
91            s.listen(backlog)?;
92            s.set_nonblocking(true)?;
93            TcpListener::from_std(std::net::TcpListener::from(s))?
94        } else {
95            TcpListener::bind(a).await?
96        };
97    Ok(ListenerImpl::Tcp(ListenerImplTcp {
98        s,
99        nodelay: sys_opts.nodelay,
100        #[cfg(feature = "socket_options")]
101        keepalive: usr_opts
102            .tcp_keepalive
103            .as_ref()
104            .map(crate::TcpKeepaliveParams::to_socket2),
105        #[cfg(feature = "socket_options")]
106        recv_buffer_size: usr_opts.recv_buffer_size,
107        #[cfg(feature = "socket_options")]
108        send_buffer_size: usr_opts.send_buffer_size,
109    }))
110}
111
/// Binds a UNIX stream socket at filesystem path `p` and wraps it into [`ListenerImpl::Unix`].
///
/// With the `unix_path_tools` feature the following user options are honoured:
///
/// * `unix_listen_unlink` - try to remove a pre-existing file at `p` before binding
///   (failure to remove is ignored; a subsequent bind error is reported instead);
/// * `unix_listen_chmod` - set the socket file mode to read-write for the owner,
///   for owner and group, or for everybody;
/// * `unix_listen_uid` / `unix_listen_gid` - change the owner and/or group of the socket file.
///
/// # Errors
///
/// Returns the I/O error of the failed bind, `set_permissions` or `chown` step.
#[cfg(all(unix, feature = "unix"))]
#[allow(clippy::similar_names)]
fn listen_path(usr_opts: &UserOptions, p: &PathBuf) -> Result<ListenerImpl, std::io::Error> {
    #[cfg(feature = "unix_path_tools")]
    if usr_opts.unix_listen_unlink && std::fs::remove_file(p).is_ok() {
        debug!(file=?p, "removed UNIX socket before listening");
    }
    let i = ListenerImpl::Unix(ListenerImplUnix {
        s: UnixListener::bind(p)?,
        #[cfg(feature = "socket_options")]
        recv_buffer_size: usr_opts.recv_buffer_size,
        #[cfg(feature = "socket_options")]
        send_buffer_size: usr_opts.send_buffer_size,
    });
    #[cfg(feature = "unix_path_tools")]
    {
        use crate::UnixChmodVariant;
        use std::os::unix::fs::PermissionsExt;
        if let Some(chmod) = usr_opts.unix_listen_chmod {
            // Octal digits are owner/group/other. The owner must always keep access,
            // so restricting means clearing the *trailing* digits, not the leading ones.
            let mode = match chmod {
                UnixChmodVariant::Owner => 0o600,
                UnixChmodVariant::Group => 0o660,
                UnixChmodVariant::Everybody => 0o666,
            };
            let perms = std::fs::Permissions::from_mode(mode);
            std::fs::set_permissions(p, perms)?;
        }
        if (usr_opts.unix_listen_uid, usr_opts.unix_listen_gid) != (None, None) {
            let uid = usr_opts.unix_listen_uid.map(Into::into);
            let gid = usr_opts.unix_listen_gid.map(Into::into);
            nix::unistd::chown(p, uid, gid)?;
        }
    }
    Ok(i)
}
150
/// Binds a UNIX stream socket in the abstract namespace under name `a`
/// and wraps it into [`ListenerImpl::Unix`].
///
/// The socket is created through the standard library, switched to
/// non-blocking mode and then handed over to Tokio.
#[cfg(all(feature = "unix", any(target_os = "linux", target_os = "android")))]
fn listen_abstract(a: &String, usr_opts: &UserOptions) -> Result<ListenerImpl, std::io::Error> {
    #[cfg(target_os = "android")]
    use std::os::android::net::SocketAddrExt;
    #[cfg(target_os = "linux")]
    use std::os::linux::net::SocketAddrExt;
    use std::os::unix::net::{SocketAddr as StdUnixAddr, UnixListener as StdUnixListener};

    let addr = StdUnixAddr::from_abstract_name(a)?;
    let std_listener = StdUnixListener::bind_addr(&addr)?;
    // Tokio requires the std listener to already be in non-blocking mode.
    std_listener.set_nonblocking(true)?;
    let s = UnixListener::from_std(std_listener)?;

    Ok(ListenerImpl::Unix(ListenerImplUnix {
        s,
        #[cfg(feature = "socket_options")]
        recv_buffer_size: usr_opts.recv_buffer_size,
        #[cfg(feature = "socket_options")]
        send_buffer_size: usr_opts.send_buffer_size,
    }))
}
168
/// Adopts an inherited file descriptor `fdnum` as a listening socket.
///
/// Unless `sd_accept_ignore_environment` is set, the environment is first
/// checked (via `check_env_for_fd`) to confirm the descriptor was actually
/// passed to this process. The descriptor's local address is then queried to
/// find out whether it is a UNIX or a TCP socket, and the matching
/// [`ListenerImpl`] variant is built around it.
///
/// # Errors
///
/// Fails if the environment check does not pass, if the socket's local address
/// cannot be determined, if a UNIX socket is inherited while the `unix` feature
/// is disabled, or if switching to non-blocking mode / registering with Tokio fails.
#[cfg(all(feature = "sd_listen", unix))]
fn listen_from_fd(
    usr_opts: &UserOptions,
    fdnum: i32,
    sys_opts: &SystemOptions,
) -> Result<ListenerImpl, std::io::Error> {
    use std::os::fd::{FromRawFd, IntoRawFd};

    use tracing::error;

    use crate::{listener_address::check_env_for_fd, BindError};

    let must_check_env = !usr_opts.sd_accept_ignore_environment;
    if must_check_env && check_env_for_fd(fdnum).is_none() {
        return BindError::EvnVarError {
            reason: "ensure specified file descriptor is valid to use as a socket",
            var: "LISTEN_PID or LISTEN_FDS",
            fault: "does not contain what we expect",
        }
        .ioerr();
    }
    let fd: RawFd = fdnum.into();

    // SAFETY: assumes the caller-designated `fd` is an open socket owned by nobody else
    // in this process; this is what the environment check above is meant to establish.
    let probe = unsafe { socket2::Socket::from_raw_fd(fd) };
    let local = match probe.local_addr() {
        Ok(addr) => addr,
        Err(e) => {
            error!("Failed to determine socket domain of file descriptor {fd}: {e}");
            return Err(e);
        }
    };
    let is_unix = local.domain() == socket2::Domain::UNIX;
    // Release ownership again, so the descriptor can be re-wrapped as the proper listener type.
    let fd = probe.into_raw_fd();

    if is_unix {
        #[cfg(not(feature = "unix"))]
        {
            return BindError::MissingCompileTimeFeature {
                reason: "use inherited UNIX socket",
                feature: "unix",
            }
            .ioerr();
        }
        #[cfg(feature = "unix")]
        {
            // SAFETY: `fd` was just released by `into_raw_fd`, so it is open and unowned.
            let std_listener = unsafe { std::os::unix::net::UnixListener::from_raw_fd(fd) };
            std_listener.set_nonblocking(true)?;
            Ok(ListenerImpl::Unix(ListenerImplUnix {
                s: UnixListener::from_std(std_listener)?,
                #[cfg(feature = "socket_options")]
                send_buffer_size: usr_opts.send_buffer_size,

                #[cfg(feature = "socket_options")]
                recv_buffer_size: usr_opts.recv_buffer_size,
            }))
        }
    } else {
        // SAFETY: `fd` was just released by `into_raw_fd`, so it is open and unowned.
        let std_listener = unsafe { std::net::TcpListener::from_raw_fd(fd) };
        std_listener.set_nonblocking(true)?;
        Ok(ListenerImpl::Tcp(ListenerImplTcp {
            s: TcpListener::from_std(std_listener)?,
            nodelay: sys_opts.nodelay,
            #[cfg(feature = "socket_options")]
            keepalive: usr_opts
                .tcp_keepalive
                .as_ref()
                .map(crate::TcpKeepaliveParams::to_socket2),
            #[cfg(feature = "socket_options")]
            recv_buffer_size: usr_opts.recv_buffer_size,
            #[cfg(feature = "socket_options")]
            send_buffer_size: usr_opts.send_buffer_size,
        }))
    }
}
240
/// Adopts an inherited file descriptor selected by its name in `LISTEN_FDNAMES`.
///
/// The special name `*` selects all inherited descriptors at once (requires
/// the `multi-listener` feature). Otherwise the colon-separated
/// `LISTEN_FDNAMES` list is scanned and the first entry equal to `fdname`
/// determines the descriptor number, counting from `SD_LISTEN_FDS_START`.
///
/// # Errors
///
/// Fails if `LISTEN_FDNAMES` cannot be obtained, does not contain `fdname`,
/// or if adopting the selected descriptor fails.
#[cfg(all(feature = "sd_listen", unix))]
fn listen_from_fd_named(
    usr_opts: &UserOptions,
    fdname: &str,
    sys_opts: &SystemOptions,
) -> Result<ListenerImpl, std::io::Error> {
    use crate::error::BindError;

    if fdname == "*" {
        #[cfg(not(feature = "multi-listener"))]
        {
            return BindError::MissingCompileTimeFeature {
                reason: "bind to all inherited sockets",
                feature: "multi-listener",
            }
            .ioerr();
        }

        #[cfg(feature = "multi-listener")]
        {
            return listen_from_fd_all(usr_opts, sys_opts);
        }
    }

    let listen_fdnames = crate::error::get_envvar("use named file descriptor", "LISTEN_FDNAMES")?;

    // The n-th name in the list belongs to descriptor number `first_fd + n`.
    let first_fd = crate::listener_address::SD_LISTEN_FDS_START as RawFd;
    for (fd, name) in (first_fd..).zip(listen_fdnames.split(':')) {
        debug!("Considering LISTEN_FDNAMES chunk {name}");
        if name == fdname {
            return listen_from_fd(usr_opts, fd, sys_opts);
        }
    }
    debug!("Not found {fdname}");

    BindError::EvnVarError {
        reason: "use named file descriptor",
        var: "LISTEN_FDNAMES",
        fault: "does not contain the user-requested named file descriptor",
    }
    .ioerr()
}
284
/// Adopts every inherited file descriptor announced by `LISTEN_FDS`.
///
/// `LISTEN_FDS` must parse as an integer in `1..4096`. The descriptors
/// `SD_LISTEN_FDS_START .. SD_LISTEN_FDS_START + n` are then bound together
/// through [`Listener::bind_multiple`].
///
/// # Errors
///
/// Fails if `LISTEN_FDS` cannot be obtained or holds an unacceptable value,
/// or if adopting any of the descriptors fails.
#[cfg(all(feature = "sd_listen", unix, feature = "multi-listener"))]
fn listen_from_fd_all(
    usr_opts: &UserOptions,
    sys_opts: &SystemOptions,
) -> Result<ListenerImpl, std::io::Error> {
    use crate::{listener_address::SD_LISTEN_FDS_START, BindError};
    #[allow(unused_imports)]
    use futures_util::FutureExt;

    let listen_fds = crate::error::get_envvar("use all inherited file descriptors", "LISTEN_FDS")?;
    let n: i32 = match listen_fds.parse::<i32>() {
        Ok(count) if (1..4096).contains(&count) => count,
        _ => {
            return BindError::EvnVarError {
                reason: "use all inherited file descriptors",
                var: "LISTEN_FDS",
                fault: "bad value",
            }
            .ioerr()
        }
    };

    debug!("Parsed LISTEN_FDS");

    let addrs: Vec<ListenerAddress> = (SD_LISTEN_FDS_START..SD_LISTEN_FDS_START + n)
        .map(ListenerAddress::FromFd)
        .collect();

    // Only new TCP sockets actually require real awaiting, everything else can be
    // fast-forwarded - so for `FromFd` addresses the future is expected to be ready at once.
    let listener = Listener::bind_multiple(&addrs, sys_opts, usr_opts)
        .now_or_never()
        .unwrap()?;
    Ok(listener.i)
}
319
impl Listener {
    #[allow(clippy::missing_errors_doc)]
    /// Creates listener corresponding to the specified tokio-listener address and options.
    ///
    /// * For TCP addresses it tries to behave close to hyper 0.14's listener
    /// * For UNIX path addresses, it can unlink or change permissions of the socket based on user options
    /// * For raw fd sockets, it checks `LISTEN_FDS` and `LISTEN_PID` environment variables by default, unless opted out in user options
    /// * For inetd it accepts only one connection. However, reporting of the error of
    ///   inability to accept the second connection is delayed until the first connection finishes, to avoid premature exit from process.
    ///
    /// With `hyper014` crate feature (default), the listener can be directly used as argument for `Server::builder`.
    ///
    /// Binding may fail due to unsupported address type, e.g. if trying to use UNIX addresses on Windows or abstract-namespaces sockets on Mac.
    pub async fn bind(
        addr: &ListenerAddress,
        sys_opts: &SystemOptions,
        usr_opts: &UserOptions,
    ) -> std::io::Result<Self> {
        // Each arm below exists only when the matching platform/feature is compiled in.
        let i: ListenerImpl = match addr {
            ListenerAddress::Tcp(a) => listen_tcp(a, usr_opts, sys_opts).await?,
            #[cfg(all(unix, feature = "unix"))]
            ListenerAddress::Path(p) => listen_path(usr_opts, p)?,
            #[cfg(all(feature = "unix", any(target_os = "linux", target_os = "android")))]
            ListenerAddress::Abstract(a) => listen_abstract(a, usr_opts)?,
            #[cfg(feature = "inetd")]
            ListenerAddress::Inetd => {
                // The sender travels with the single stdio connection; the receiver stays
                // in the listener so it can notice when that connection is finished with.
                let (tx, rx) = channel();
                ListenerImpl::Stdio(StdioListener {
                    rx: rx.fuse(),
                    token: Some(tx),
                })
            }
            #[cfg(all(feature = "sd_listen", unix))]
            ListenerAddress::FromFd(fdnum) => listen_from_fd(usr_opts, *fdnum, sys_opts)?,
            #[cfg(all(feature = "sd_listen", unix))]
            ListenerAddress::FromFdNamed(fdname) => {
                listen_from_fd_named(usr_opts, fdname, sys_opts)?
            }
            // Reached only for address kinds whose arm above was compiled out:
            // work out whether a crate feature or the platform itself is what is missing.
            #[allow(unreachable_patterns)]
            _ => {
                #[allow(unused_imports)]
                use crate::BindError::{MissingCompileTimeFeature, MissingPlatformSupport};
                let err = match addr {
                    ListenerAddress::Tcp(_) => unreachable!(),
                    ListenerAddress::Path(_) => {
                        #[cfg(unix)]
                        {
                            MissingCompileTimeFeature {
                                reason: "bind UNIX path socket",
                                feature: "unix",
                            }
                        }
                        #[cfg(not(unix))]
                        {
                            MissingPlatformSupport {
                                reason: "bind UNIX path socket",
                                feature: "UNIX-like platform",
                            }
                        }
                    }
                    ListenerAddress::Abstract(_) => {
                        #[cfg(any(target_os = "linux", target_os = "android"))]
                        {
                            MissingCompileTimeFeature {
                                reason: "bind abstract-namespaced UNIX socket",
                                feature: "unix",
                            }
                        }
                        #[cfg(not(any(target_os = "linux", target_os = "android")))]
                        {
                            MissingPlatformSupport {
                                reason: "bind abstract-namespaced UNIX socket",
                                feature: "Linux or Android platform",
                            }
                        }
                    }
                    ListenerAddress::Inetd => MissingCompileTimeFeature {
                        reason: "use stdin/stdout as a socket",
                        feature: "inetd",
                    },
                    ListenerAddress::FromFd(_) | ListenerAddress::FromFdNamed(_) => {
                        #[cfg(unix)]
                        {
                            MissingCompileTimeFeature {
                                reason: "use inherited file descriptor",
                                feature: "sd_listen",
                            }
                        }
                        #[cfg(not(unix))]
                        {
                            MissingPlatformSupport {
                                reason: "use inherited file descriptor",
                                feature: "UNIX-like platform",
                            }
                        }
                    }
                };
                return err.ioerr();
            }
        };
        Ok(Listener {
            i,
            sleep_on_errors: sys_opts.sleep_on_errors,
            timeout: None,
        })
    }

    /// Create a listener that accepts connections on multiple sockets simultaneously.
    ///
    /// Fails if `addrs` is empty slice or if any of the parts failed to initialise.
    ///
    /// See documentation of [`Listener::bind`] method for other help.
    #[cfg_attr(docsrs_alt, doc(cfg(feature = "multi-listener")))]
    #[cfg(feature = "multi-listener")]
    pub async fn bind_multiple(
        addrs: &[ListenerAddress],
        sys_opts: &SystemOptions,
        usr_opts: &UserOptions,
    ) -> std::io::Result<Self> {
        if addrs.is_empty() {
            return crate::error::BindError::MultiBindWithoutAddresses.ioerr();
        }
        // A single address needs no multi-listener wrapper.
        if addrs.len() == 1 {
            return Listener::bind(&addrs[0], sys_opts, usr_opts).await;
        }
        let mut v = Vec::with_capacity(addrs.len());
        for addr in addrs {
            debug!("Binding {addr}");
            let l = Listener::bind(addr, sys_opts, usr_opts).await?;
            // Only the inner acceptor is kept; error back-off is handled once, by the outer listener.
            v.push(l.i);
        }
        Ok(Listener {
            i: ListenerImpl::Multi(ListenerImplMulti { v }),
            sleep_on_errors: sys_opts.sleep_on_errors,
            timeout: None,
        })
    }
}
458
/// Pseudo-listener for inetd mode: hands out stdin/stdout as its one and only connection.
#[cfg(feature = "inetd")]
pub(crate) struct StdioListener {
    /// Receiving half of the channel whose sender is given away with the connection;
    /// polled after the connection was handed out, to delay reporting that no more can be accepted.
    rx: Fuse<Receiver<()>>,
    /// Sending half, present until the sole connection has been accepted.
    token: Option<Sender<()>>,
}
464
#[cfg(feature = "inetd")]
impl StdioListener {
    /// Yields the stdin/stdout pseudo-connection on the first call.
    ///
    /// Later calls stay pending until the channel tied to that connection
    /// resolves, and then report `AcceptError::InetdPseudosocketAlreadyTaken`.
    fn poll_accept(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<std::io::Result<(Connection, SomeSocketAddr)>> {
        if let Some(tx) = self.token.take() {
            debug!(r#type = "stdio", "incoming connection");
            let connection = Connection(ConnectionImpl::Stdio(
                tokio::io::stdin(),
                tokio::io::stdout(),
                Some(tx),
            ));
            return Poll::Ready(Ok((connection, SomeSocketAddr::Stdio)));
        }

        // The connection is already given away: wait for its token to resolve.
        // Whether it resolved with a value or with an error does not matter here.
        let _ = ready!(Pin::new(&mut self.rx).poll(cx));
        trace!("finished waiting for liberation of stdout to stop listening loop");
        Poll::Ready(Err(std::io::Error::new(
            std::io::ErrorKind::Other,
            crate::error::AcceptError::InetdPseudosocketAlreadyTaken,
        )))
    }
}
496
497#[allow(clippy::missing_errors_doc)]
498#[allow(missing_docs)]
499impl Listener {
500    pub fn try_borrow_tcp_listener(&self) -> Option<&TcpListener> {
501        if let ListenerImpl::Tcp(ListenerImplTcp { ref s, .. }) = self.i {
502            Some(s)
503        } else {
504            None
505        }
506    }
507    #[cfg(all(feature = "unix", unix))]
508    #[cfg_attr(docsrs_alt, doc(cfg(all(feature = "unix", unix))))]
509    pub fn try_borrow_unix_listener(&self) -> Option<&UnixListener> {
510        if let ListenerImpl::Unix(ListenerImplUnix { s: ref x, .. }) = self.i {
511            Some(x)
512        } else {
513            None
514        }
515    }
516
517    pub fn try_into_tcp_listener(self) -> Result<TcpListener, Self> {
518        if let ListenerImpl::Tcp(ListenerImplTcp { s, .. }) = self.i {
519            Ok(s)
520        } else {
521            Err(self)
522        }
523    }
524    #[cfg(all(feature = "unix", unix))]
525    #[cfg_attr(docsrs_alt, doc(cfg(all(feature = "unix", unix))))]
526    pub fn try_into_unix_listener(self) -> Result<UnixListener, Self> {
527        if let ListenerImpl::Unix(ListenerImplUnix { s, .. }) = self.i {
528            Ok(s)
529        } else {
530            Err(self)
531        }
532    }
533
534    /// This listener is in inetd (stdin/stdout) more and the sole connection is already accepted
535    #[allow(unreachable_code)]
536    pub fn no_more_connections(&self) -> bool {
537        #[cfg(feature = "inetd")]
538        return if let ListenerImpl::Stdio(ref x) = self.i {
539            x.token.is_none()
540        } else {
541            false
542        };
543        false
544    }
545
546    /// See main [`Listener::bind`] documentation for specifics of how it accepts connections
547    pub fn poll_accept(
548        &mut self,
549        cx: &mut Context<'_>,
550    ) -> Poll<std::io::Result<(Connection, SomeSocketAddr)>> {
551        loop {
552            if let Some(ref mut to) = self.timeout {
553                ready!(Pin::new(to).poll(cx));
554            }
555            self.timeout = None;
556
557            let ret = self.i.poll_accept(cx);
558
559            #[cfg(feature = "inetd")]
560            if matches!(self.i, ListenerImpl::Stdio(..)) {
561                return ret;
562            }
563
564            let e: std::io::Error = match ret {
565                Poll::Ready(Err(e)) => e,
566                Poll::Ready(Ok(x)) => return Poll::Ready(Ok(x)),
567                Poll::Pending => return Poll::Pending,
568            };
569            if is_connection_error(&e) {
570                info!(action = "retry", "failed_accept");
571                continue;
572            }
573            if self.sleep_on_errors {
574                info!(action = "sleep_retry", "failed_accept");
575                self.timeout = Some(Box::pin(tokio::time::sleep(Duration::from_secs(1))));
576            } else {
577                info!(action = "error", "failed_accept");
578                return Poll::Ready(Err(e));
579            }
580        }
581    }
582
583    /// See main [`Listener::bind`] documentation for specifics of how it accepts connections
584    pub async fn accept(&mut self) -> std::io::Result<(Connection, SomeSocketAddr)> {
585        std::future::poll_fn(|cx| self.poll_accept(cx)).await
586    }
587}
588
589impl Stream for Listener {
590    type Item = std::io::Result<Connection>;
591
592    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
593        match self.poll_accept(cx) {
594            Poll::Ready(Ok((connection, _))) => Poll::Ready(Some(Ok(connection))),
595            Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
596            Poll::Pending => Poll::Pending,
597        }
598    }
599}
600
/// Tells whether an `accept()` error concerns just one connection attempt.
///
/// Such errors (connection refused, aborted or reset) mean the next
/// connection might be ready to be accepted, so the caller may retry at once.
///
/// All other errors will incur a timeout before next `accept()` is performed.
/// The timeout is useful to handle resource exhaustion errors like ENFILE
/// and EMFILE. Otherwise, could enter into tight loop.
///
/// Based on <https://docs.rs/hyper/latest/src/hyper/server/tcp.rs.html#109-116>
pub(crate) fn is_connection_error(e: &std::io::Error) -> bool {
    use std::io::ErrorKind;

    const PER_CONNECTION_KINDS: [ErrorKind; 3] = [
        ErrorKind::ConnectionRefused,
        ErrorKind::ConnectionAborted,
        ErrorKind::ConnectionReset,
    ];

    PER_CONNECTION_KINDS.contains(&e.kind())
}
618
/// TCP acceptor together with the settings applied to every accepted connection.
pub(crate) struct ListenerImplTcp {
    /// The listening socket.
    pub(crate) s: TcpListener,
    /// Whether to enable `TCP_NODELAY` on accepted connections.
    nodelay: bool,
    /// Keepalive parameters to set on accepted connections, if any.
    #[cfg(feature = "socket_options")]
    keepalive: Option<socket2::TcpKeepalive>,
    /// Receive buffer size to set on accepted connections, if any.
    #[cfg(feature = "socket_options")]
    recv_buffer_size: Option<usize>,
    /// Send buffer size to set on accepted connections, if any.
    #[cfg(feature = "socket_options")]
    send_buffer_size: Option<usize>,
}
629
/// UNIX stream socket acceptor together with the settings applied to every accepted connection.
#[cfg(all(feature = "unix", unix))]
pub(crate) struct ListenerImplUnix {
    /// The listening socket.
    pub(crate) s: UnixListener,
    /// Receive buffer size to set on accepted connections, if any.
    #[cfg(feature = "socket_options")]
    recv_buffer_size: Option<usize>,
    /// Send buffer size to set on accepted connections, if any.
    #[cfg(feature = "socket_options")]
    send_buffer_size: Option<usize>,
}
638
/// Several acceptors combined into one; they are polled in the order stored here.
#[cfg(feature = "multi-listener")]
pub(crate) struct ListenerImplMulti {
    /// The member acceptors.
    pub(crate) v: Vec<ListenerImpl>,
}
643
/// The concrete kinds of acceptor a [`Listener`] can be backed by.
///
/// All variants except `Tcp` exist only when the matching crate feature
/// (and platform, where relevant) is enabled.
pub(crate) enum ListenerImpl {
    /// TCP socket.
    Tcp(ListenerImplTcp),
    /// UNIX stream socket (path-based, abstract or inherited).
    #[cfg(all(feature = "unix", unix))]
    Unix(ListenerImplUnix),
    /// stdin/stdout pseudo-socket for inetd mode.
    #[cfg(feature = "inetd")]
    Stdio(StdioListener),
    /// Several acceptors polled together.
    #[cfg(feature = "multi-listener")]
    Multi(ListenerImplMulti),
}
653
654impl ListenerImpl {
655    fn poll_accept(
656        &mut self,
657        cx: &mut Context<'_>,
658    ) -> Poll<std::io::Result<(Connection, SomeSocketAddr)>> {
659        match self {
660            ListenerImpl::Tcp(ti) => ti.poll_accept(cx),
661            #[cfg(all(feature = "unix", unix))]
662            ListenerImpl::Unix(ui) => ui.poll_accept(cx),
663            #[cfg(feature = "inetd")]
664            ListenerImpl::Stdio(x) => x.poll_accept(cx),
665            #[cfg(feature = "multi-listener")]
666            ListenerImpl::Multi(x) => x.poll_accept(cx),
667        }
668    }
669}
670
671impl ListenerImplTcp {
672    fn poll_accept(
673        &mut self,
674        cx: &mut Context<'_>,
675    ) -> Poll<std::io::Result<(Connection, SomeSocketAddr)>> {
676        let ListenerImplTcp {
677            s,
678            nodelay,
679            #[cfg(feature = "socket_options")]
680            keepalive,
681            #[cfg(feature = "socket_options")]
682            recv_buffer_size,
683            #[cfg(feature = "socket_options")]
684            send_buffer_size,
685        } = self;
686        match s.poll_accept(cx) {
687            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
688            Poll::Ready(Ok((c, a))) => {
689                debug!(fromaddr=%a, r#type="tcp", "incoming connection");
690                if *nodelay {
691                    c.set_nodelay(true)?;
692                }
693
694                #[cfg(feature = "socket_options")]
695                {
696                    apply_tcp_keepalive_opts(&c, keepalive)?;
697                    apply_socket_buf_opts(&c, recv_buffer_size, send_buffer_size)?;
698                }
699
700                Poll::Ready(Ok((
701                    Connection(ConnectionImpl::Tcp(c)),
702                    SomeSocketAddr::Tcp(a),
703                )))
704            }
705            Poll::Pending => Poll::Pending,
706        }
707    }
708}
709
#[cfg(all(feature = "unix", unix))]
impl ListenerImplUnix {
    /// Accepts one UNIX socket connection and, with `socket_options`,
    /// applies the stored buffer sizes to it.
    ///
    /// An error from the accept itself or from applying an option is returned as ready.
    fn poll_accept(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<std::io::Result<(Connection, SomeSocketAddr)>> {
        let (stream, peer) = ready!(self.s.poll_accept(cx))?;
        debug!(r#type = "unix", "incoming connection");

        #[cfg(feature = "socket_options")]
        {
            apply_socket_buf_opts(&stream, &self.recv_buffer_size, &self.send_buffer_size)?;
        }

        let connection = Connection(ConnectionImpl::Unix(stream));
        Poll::Ready(Ok((connection, SomeSocketAddr::Unix(peer))))
    }
}
740
#[cfg(feature = "multi-listener")]
impl ListenerImplMulti {
    /// Polls the member acceptors in stored order and returns the first ready
    /// outcome (connection or error); pending only if every member is pending.
    fn poll_accept(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<std::io::Result<(Connection, SomeSocketAddr)>> {
        for member in &mut self.v {
            if let outcome @ Poll::Ready(_) = member.poll_accept(cx) {
                return outcome;
            }
        }
        Poll::Pending
    }
}
756
/// Sets TCP keepalive parameters on connection `c`, when any are configured.
///
/// Does nothing and succeeds if `keepalive` is `None`.
#[cfg(feature = "socket_options")]
fn apply_tcp_keepalive_opts(
    c: &TcpStream,
    keepalive: &Option<socket2::TcpKeepalive>,
) -> std::io::Result<()> {
    match keepalive {
        Some(params) => socket2::SockRef::from(c).set_tcp_keepalive(params),
        None => Ok(()),
    }
}
768
/// Sets the receive and/or send buffer size on socket `c`, for whichever of
/// the two is configured (UNIX-like platforms: works for any fd-backed socket).
///
/// The receive size is applied first; the first failure is returned.
#[cfg(all(feature = "socket_options", unix))]
fn apply_socket_buf_opts<T: std::os::fd::AsFd>(
    c: &T,
    recv_buffer_size: &Option<usize>,
    send_buffer_size: &Option<usize>,
) -> std::io::Result<()> {
    let socket = socket2::SockRef::from(c);
    if let Some(bytes) = *recv_buffer_size {
        socket.set_recv_buffer_size(bytes)?;
    }
    if let Some(bytes) = *send_buffer_size {
        socket.set_send_buffer_size(bytes)?;
    }
    Ok(())
}
784
/// Sets the receive and/or send buffer size on TCP connection `c`, for
/// whichever of the two is configured (variant for non-UNIX platforms).
///
/// The receive size is applied first; the first failure is returned.
#[cfg(all(feature = "socket_options", not(unix)))]
fn apply_socket_buf_opts(
    c: &TcpStream,
    recv_buffer_size: &Option<usize>,
    send_buffer_size: &Option<usize>,
) -> std::io::Result<()> {
    let socket = socket2::SockRef::from(c);
    if let Some(bytes) = *recv_buffer_size {
        socket.set_recv_buffer_size(bytes)?;
    }
    if let Some(bytes) = *send_buffer_size {
        socket.set_send_buffer_size(bytes)?;
    }
    Ok(())
}
799}