1#[allow(unused_imports)]
2use std::{
3 ffi::c_int,
4 fmt::Display,
5 net::SocketAddr,
6 path::PathBuf,
7 pin::Pin,
8 str::FromStr,
9 sync::Arc,
10 task::{ready, Context, Poll},
11 time::Duration,
12};
13
14#[cfg(unix)]
15use std::os::fd::RawFd;
16
17use futures_core::{Future, Stream};
18#[cfg(feature = "inetd")]
19use futures_util::{future::Fuse, FutureExt};
20use tokio::{
21 net::{TcpListener, TcpStream},
22 sync::oneshot::{channel, Receiver, Sender},
23 time::Sleep,
24};
25use tracing::{debug, info, trace};
26
27#[cfg(unix)]
28use tokio::net::UnixListener;
29
30use crate::{
31 connection::ConnectionImpl, Connection, ListenerAddress, SomeSocketAddr, SystemOptions,
32 UserOptions,
33};
34
35pub struct Listener {
39 pub(crate) i: ListenerImpl,
40 sleep_on_errors: bool,
41 timeout: Option<Pin<Box<Sleep>>>,
42}
43
44impl std::fmt::Debug for Listener {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 match self.i {
47 ListenerImpl::Tcp { .. } => f.write_str("tokio_listener::Listener(tcp)"),
48 #[cfg(all(feature = "unix", unix))]
49 ListenerImpl::Unix { .. } => f.write_str("tokio_listener::Listener(unix)"),
50 #[cfg(feature = "inetd")]
51 ListenerImpl::Stdio(_) => f.write_str("tokio_listener::Listener(stdio)"),
52 #[cfg(feature = "multi-listener")]
53 ListenerImpl::Multi(ref x) => {
54 write!(f, "tokio_listener::Listener(multi, n={})", x.v.len())
55 }
56 }
57 }
58}
59
60async fn listen_tcp(
61 a: &SocketAddr,
62 usr_opts: &UserOptions,
63 sys_opts: &SystemOptions,
64) -> Result<ListenerImpl, std::io::Error> {
65 #[cfg(not(feature = "socket_options"))]
66 let s = TcpListener::bind(a).await?;
67 #[cfg(feature = "socket_options")]
68 let s =
69 if usr_opts.tcp_only_v6 || usr_opts.tcp_reuse_port || usr_opts.tcp_listen_backlog.is_some()
70 {
71 let s = socket2::Socket::new(
72 socket2::Domain::for_address(*a),
73 socket2::Type::STREAM,
74 None,
75 )?;
76 if usr_opts.tcp_only_v6 {
77 s.set_only_v6(true)?;
78 }
79 #[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))]
80 if usr_opts.tcp_reuse_port {
81 s.set_reuse_port(true)?;
82 }
83 s.bind(&socket2::SockAddr::from(*a))?;
84 let backlog = usr_opts.tcp_listen_backlog.unwrap_or(1024);
85 let Ok(backlog): Result<c_int, _> = backlog.try_into() else {
86 return crate::error::BindError::InvalidUserOption {
87 name: "tcp_listen_backlog",
88 }
89 .ioerr();
90 };
91 s.listen(backlog)?;
92 s.set_nonblocking(true)?;
93 TcpListener::from_std(std::net::TcpListener::from(s))?
94 } else {
95 TcpListener::bind(a).await?
96 };
97 Ok(ListenerImpl::Tcp(ListenerImplTcp {
98 s,
99 nodelay: sys_opts.nodelay,
100 #[cfg(feature = "socket_options")]
101 keepalive: usr_opts
102 .tcp_keepalive
103 .as_ref()
104 .map(crate::TcpKeepaliveParams::to_socket2),
105 #[cfg(feature = "socket_options")]
106 recv_buffer_size: usr_opts.recv_buffer_size,
107 #[cfg(feature = "socket_options")]
108 send_buffer_size: usr_opts.send_buffer_size,
109 }))
110}
111
/// Binds a UNIX socket at filesystem path `p`.
///
/// With the `unix_path_tools` feature enabled this additionally:
/// * removes a pre-existing file at `p` first when `unix_listen_unlink` is set
///   (a failed removal is ignored and left for `bind` to report);
/// * changes the mode of the new socket file when `unix_listen_chmod` is set
///   (`Owner` → `0o600`, `Group` → `0o660`, `Everybody` → `0o666`);
/// * changes its owner and/or group when `unix_listen_uid` / `unix_listen_gid`
///   are set.
///
/// Returns an I/O error if binding, `chmod` or `chown` fails.
#[cfg(all(unix, feature = "unix"))]
#[allow(clippy::similar_names)]
fn listen_path(
    usr_opts: &UserOptions,
    p: &std::path::Path,
) -> Result<ListenerImpl, std::io::Error> {
    #[cfg(feature = "unix_path_tools")]
    #[allow(clippy::collapsible_if)]
    if usr_opts.unix_listen_unlink {
        // Best effort: a missing file is the common case and not an error.
        if std::fs::remove_file(p).is_ok() {
            debug!(file=?p, "removed UNIX socket before listening");
        }
    }
    let i = ListenerImpl::Unix(ListenerImplUnix {
        s: UnixListener::bind(p)?,
        #[cfg(feature = "socket_options")]
        recv_buffer_size: usr_opts.recv_buffer_size,
        #[cfg(feature = "socket_options")]
        send_buffer_size: usr_opts.send_buffer_size,
    });
    #[cfg(feature = "unix_path_tools")]
    {
        use crate::UnixChmodVariant;
        use std::os::unix::fs::PermissionsExt;
        if let Some(chmod) = usr_opts.unix_listen_chmod {
            // Octal digits are owner/group/others. Read+write (6) must be
            // granted starting from the owner digit, not the "others" digit.
            let mode = match chmod {
                UnixChmodVariant::Owner => 0o600,
                UnixChmodVariant::Group => 0o660,
                UnixChmodVariant::Everybody => 0o666,
            };
            let perms = std::fs::Permissions::from_mode(mode);
            std::fs::set_permissions(p, perms)?;
        }
        if (usr_opts.unix_listen_uid, usr_opts.unix_listen_gid) != (None, None) {
            let uid = usr_opts.unix_listen_uid.map(Into::into);
            let gid = usr_opts.unix_listen_gid.map(Into::into);
            nix::unistd::chown(p, uid, gid)?;
        }
    }
    Ok(i)
}
150
/// Binds a UNIX socket in the abstract namespace under the name `a`
/// (Linux and Android only).
///
/// The socket is created through the standard library, switched to
/// non-blocking mode and then handed to tokio.
///
/// Returns an I/O error if the name is rejected or any socket call fails.
#[cfg(all(feature = "unix", any(target_os = "linux", target_os = "android")))]
fn listen_abstract(a: &str, usr_opts: &UserOptions) -> Result<ListenerImpl, std::io::Error> {
    #[cfg(target_os = "android")]
    use std::os::android::net::SocketAddrExt;
    #[cfg(target_os = "linux")]
    use std::os::linux::net::SocketAddrExt;

    let addr = std::os::unix::net::SocketAddr::from_abstract_name(a)?;
    let std_listener = std::os::unix::net::UnixListener::bind_addr(&addr)?;
    // tokio requires the std listener to already be non-blocking.
    std_listener.set_nonblocking(true)?;
    Ok(ListenerImpl::Unix(ListenerImplUnix {
        s: UnixListener::from_std(std_listener)?,
        #[cfg(feature = "socket_options")]
        recv_buffer_size: usr_opts.recv_buffer_size,
        #[cfg(feature = "socket_options")]
        send_buffer_size: usr_opts.send_buffer_size,
    }))
}
168
/// Builds a listener from the already-open file descriptor `fdnum`.
///
/// Unless `sd_accept_ignore_environment` is set, `check_env_for_fd` must
/// approve the descriptor first. The socket domain is then read from its
/// local address to decide whether it becomes a UNIX or a TCP listener.
///
/// Returns an I/O error when the environment check fails, the descriptor's
/// address cannot be queried (the temporary socket wrapper is dropped on that
/// path), a UNIX socket is inherited without the `unix` feature, or switching
/// to non-blocking mode / registering with tokio fails.
#[cfg(all(feature = "sd_listen", unix))]
fn listen_from_fd(
    usr_opts: &UserOptions,
    fdnum: i32,
    sys_opts: &SystemOptions,
) -> Result<ListenerImpl, std::io::Error> {
    use std::os::fd::{FromRawFd, IntoRawFd};

    use tracing::error;

    use crate::{listener_address::check_env_for_fd, BindError};

    let env_check_required = !usr_opts.sd_accept_ignore_environment;
    if env_check_required && check_env_for_fd(fdnum).is_none() {
        return BindError::EvnVarError {
            reason: "ensure specified file descriptor is valid to use as a socket",
            var: "LISTEN_PID or LISTEN_FDS",
            fault: "does not contain what we expect",
        }
        .ioerr();
    }
    let fd: RawFd = fdnum.into();

    // Wrap the descriptor only long enough to query its address family.
    // SAFETY (assumed, confirm with callers): `fd` is expected to be an open
    // socket handed over to this process for exclusive use.
    let probe = unsafe { socket2::Socket::from_raw_fd(fd) };
    let local = match probe.local_addr() {
        Ok(addr) => addr,
        Err(e) => {
            error!("Failed to determine socket domain of file descriptor {fd}: {e}");
            return Err(e);
        }
    };
    let is_unix = local.domain() == socket2::Domain::UNIX;
    // Release ownership again so the typed wrapper below can take it.
    let fd = probe.into_raw_fd();

    if is_unix {
        #[cfg(not(feature = "unix"))]
        {
            return BindError::MissingCompileTimeFeature {
                reason: "use inherited UNIX socket",
                feature: "unix",
            }
            .ioerr();
        }
        #[cfg(feature = "unix")]
        {
            // SAFETY (assumed): same descriptor as above, ownership was just
            // released by `into_raw_fd`.
            let std_listener = unsafe { std::os::unix::net::UnixListener::from_raw_fd(fd) };
            std_listener.set_nonblocking(true)?;
            Ok(ListenerImpl::Unix(ListenerImplUnix {
                s: UnixListener::from_std(std_listener)?,
                #[cfg(feature = "socket_options")]
                send_buffer_size: usr_opts.send_buffer_size,

                #[cfg(feature = "socket_options")]
                recv_buffer_size: usr_opts.recv_buffer_size,
            }))
        }
    } else {
        // SAFETY (assumed): same descriptor as above, ownership was just
        // released by `into_raw_fd`.
        let std_listener = unsafe { std::net::TcpListener::from_raw_fd(fd) };
        std_listener.set_nonblocking(true)?;
        Ok(ListenerImpl::Tcp(ListenerImplTcp {
            s: TcpListener::from_std(std_listener)?,
            nodelay: sys_opts.nodelay,
            #[cfg(feature = "socket_options")]
            keepalive: usr_opts
                .tcp_keepalive
                .as_ref()
                .map(crate::TcpKeepaliveParams::to_socket2),
            #[cfg(feature = "socket_options")]
            recv_buffer_size: usr_opts.recv_buffer_size,
            #[cfg(feature = "socket_options")]
            send_buffer_size: usr_opts.send_buffer_size,
        }))
    }
}
240
/// Builds a listener from an inherited file descriptor selected by name.
///
/// The special name `*` means "all inherited sockets" and needs the
/// `multi-listener` feature. Any other name is looked up among the
/// colon-separated entries of `LISTEN_FDNAMES`; the entry's position, counted
/// from `SD_LISTEN_FDS_START`, gives the descriptor number.
///
/// Returns an I/O error if the variable is unavailable, the name is not
/// listed, or the selected descriptor cannot be turned into a listener.
#[cfg(all(feature = "sd_listen", unix))]
fn listen_from_fd_named(
    usr_opts: &UserOptions,
    fdname: &str,
    sys_opts: &SystemOptions,
) -> Result<ListenerImpl, std::io::Error> {
    use crate::error::BindError;

    if fdname == "*" {
        #[cfg(not(feature = "multi-listener"))]
        {
            return BindError::MissingCompileTimeFeature {
                reason: "bind to all inherited sockets",
                feature: "multi-listener",
            }
            .ioerr();
        }

        #[cfg(feature = "multi-listener")]
        {
            return listen_from_fd_all(usr_opts, sys_opts);
        }
    }

    let listen_fdnames = crate::error::get_envvar("use named file descriptor", "LISTEN_FDNAMES")?;

    // Pair every name with the descriptor number it corresponds to and stop
    // at the first match.
    let first_fd = crate::listener_address::SD_LISTEN_FDS_START as RawFd;
    let hit = (first_fd..)
        .zip(listen_fdnames.split(':'))
        .find(|(_, name)| {
            debug!("Considering LISTEN_FDNAMES chunk {name}");
            *name == fdname
        });
    if let Some((fd, _)) = hit {
        return listen_from_fd(usr_opts, fd, sys_opts);
    }
    debug!("Not found {fdname}");

    BindError::EvnVarError {
        reason: "use named file descriptor",
        var: "LISTEN_FDNAMES",
        fault: "does not contain the user-requested named file descriptor",
    }
    .ioerr()
}
284
/// Builds a listener covering every inherited file descriptor announced by
/// `LISTEN_FDS`.
///
/// `LISTEN_FDS` must parse as an integer in `1..4096`; descriptors
/// `SD_LISTEN_FDS_START .. SD_LISTEN_FDS_START + n` are then bound via
/// `Listener::bind_multiple`.
///
/// Returns an I/O error if the variable is unavailable or out of range, or if
/// any descriptor cannot be turned into a listener.
///
/// # Panics
/// Panics if the `bind_multiple` future is not ready on its first poll.
#[cfg(all(feature = "sd_listen", unix, feature = "multi-listener"))]
fn listen_from_fd_all(
    usr_opts: &UserOptions,
    sys_opts: &SystemOptions,
) -> Result<ListenerImpl, std::io::Error> {
    use crate::{listener_address::SD_LISTEN_FDS_START, BindError};
    #[allow(unused_imports)]
    use futures_util::FutureExt;

    let listen_fds = crate::error::get_envvar("use all inherited file descriptors", "LISTEN_FDS")?;
    let accepted = listen_fds
        .parse::<i32>()
        .ok()
        .filter(|count| (1..4096).contains(count));
    let Some(n) = accepted else {
        return BindError::EvnVarError {
            reason: "use all inherited file descriptors",
            var: "LISTEN_FDS",
            fault: "bad value",
        }
        .ioerr();
    };

    debug!("Parsed LISTEN_FDS");

    let last_exclusive = SD_LISTEN_FDS_START + n;
    let addrs: Vec<ListenerAddress> = (SD_LISTEN_FDS_START..last_exclusive)
        .map(ListenerAddress::FromFd)
        .collect();

    // Binding `FromFd` addresses goes through the synchronous `listen_from_fd`,
    // so the future is expected to complete on its first poll.
    let bound = Listener::bind_multiple(&addrs, sys_opts, usr_opts)
        .now_or_never()
        .unwrap()?;
    Ok(bound.i)
}
319
320impl Listener {
321 #[allow(clippy::missing_errors_doc)]
322 pub async fn bind(
334 addr: &ListenerAddress,
335 sys_opts: &SystemOptions,
336 usr_opts: &UserOptions,
337 ) -> std::io::Result<Self> {
338 let i: ListenerImpl = match addr {
339 ListenerAddress::Tcp(a) => listen_tcp(a, usr_opts, sys_opts).await?,
340 #[cfg(all(unix, feature = "unix"))]
341 ListenerAddress::Path(p) => listen_path(usr_opts, p)?,
342 #[cfg(all(feature = "unix", any(target_os = "linux", target_os = "android")))]
343 ListenerAddress::Abstract(a) => listen_abstract(a, usr_opts)?,
344 #[cfg(feature = "inetd")]
345 ListenerAddress::Inetd => {
346 let (tx, rx) = channel();
347 ListenerImpl::Stdio(StdioListener {
348 rx: rx.fuse(),
349 token: Some(tx),
350 })
351 }
352 #[cfg(all(feature = "sd_listen", unix))]
353 ListenerAddress::FromFd(fdnum) => listen_from_fd(usr_opts, *fdnum, sys_opts)?,
354 #[cfg(all(feature = "sd_listen", unix))]
355 ListenerAddress::FromFdNamed(fdname) => {
356 listen_from_fd_named(usr_opts, fdname, sys_opts)?
357 }
358 #[allow(unreachable_patterns)]
359 _ => {
360 #[allow(unused_imports)]
361 use crate::BindError::{MissingCompileTimeFeature, MissingPlatformSupport};
362 let err = match addr {
363 ListenerAddress::Tcp(_) => unreachable!(),
364 ListenerAddress::Path(_) => {
365 #[cfg(unix)]
366 {
367 MissingCompileTimeFeature {
368 reason: "bind UNIX path socket",
369 feature: "unix",
370 }
371 }
372 #[cfg(not(unix))]
373 {
374 MissingPlatformSupport {
375 reason: "bind UNIX path socket",
376 feature: "UNIX-like platform",
377 }
378 }
379 }
380 ListenerAddress::Abstract(_) => {
381 #[cfg(any(target_os = "linux", target_os = "android"))]
382 {
383 MissingCompileTimeFeature {
384 reason: "bind abstract-namespaced UNIX socket",
385 feature: "unix",
386 }
387 }
388 #[cfg(not(any(target_os = "linux", target_os = "android")))]
389 {
390 MissingPlatformSupport {
391 reason: "bind abstract-namespaced UNIX socket",
392 feature: "Linux or Android platform",
393 }
394 }
395 }
396 ListenerAddress::Inetd => MissingCompileTimeFeature {
397 reason: "use stdin/stdout as a socket",
398 feature: "inetd",
399 },
400 ListenerAddress::FromFd(_) | ListenerAddress::FromFdNamed(_) => {
401 #[cfg(unix)]
402 {
403 MissingCompileTimeFeature {
404 reason: "use inherited file descriptor",
405 feature: "sd_listen",
406 }
407 }
408 #[cfg(not(unix))]
409 {
410 MissingPlatformSupport {
411 reason: "use inherited file descriptor",
412 feature: "UNIX-like platform",
413 }
414 }
415 }
416 };
417 return err.ioerr();
418 }
419 };
420 Ok(Listener {
421 i,
422 sleep_on_errors: sys_opts.sleep_on_errors,
423 timeout: None,
424 })
425 }
426
427 #[cfg_attr(docsrs_alt, doc(cfg(feature = "multi-listener")))]
433 #[cfg(feature = "multi-listener")]
434 pub async fn bind_multiple(
435 addrs: &[ListenerAddress],
436 sys_opts: &SystemOptions,
437 usr_opts: &UserOptions,
438 ) -> std::io::Result<Self> {
439 if addrs.is_empty() {
440 return crate::error::BindError::MultiBindWithoutAddresses.ioerr();
441 }
442 if addrs.len() == 1 {
443 return Listener::bind(&addrs[0], sys_opts, usr_opts).await;
444 }
445 let mut v = Vec::with_capacity(addrs.len());
446 for addr in addrs {
447 debug!("Binding {addr}");
448 let l = Listener::bind(addr, sys_opts, usr_opts).await?;
449 v.push(l.i);
450 }
451 Ok(Listener {
452 i: ListenerImpl::Multi(ListenerImplMulti { v }),
453 sleep_on_errors: sys_opts.sleep_on_errors,
454 timeout: None,
455 })
456 }
457}
458
/// Pseudo-listener that hands out the process's stdin/stdout as one single
/// connection.
#[cfg(feature = "inetd")]
pub(crate) struct StdioListener {
    /// Polled once the connection has been handed out; when it resolves,
    /// further accept attempts fail.
    rx: Fuse<Receiver<()>>,
    /// Sender half given away together with the stdio connection; `None`
    /// after that connection has been accepted.
    token: Option<Sender<()>>,
}
464
#[cfg(feature = "inetd")]
impl StdioListener {
    /// Yields the stdin/stdout connection on the first call.
    ///
    /// Later calls stay pending until `rx` resolves and then report
    /// `InetdPseudosocketAlreadyTaken`.
    fn poll_accept(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<std::io::Result<(Connection, SomeSocketAddr)>> {
        if let Some(tx) = self.token.take() {
            debug!(r#type = "stdio", "incoming connection");
            let stdio = ConnectionImpl::Stdio(tokio::io::stdin(), tokio::io::stdout(), Some(tx));
            return Poll::Ready(Ok((Connection(stdio), SomeSocketAddr::Stdio)));
        }

        // The connection was already handed out: wait on the receiver. Whether
        // it resolved with a value or an error makes no difference here.
        let _ = ready!(Pin::new(&mut self.rx).poll(cx));
        trace!("finished waiting for liberation of stdout to stop listening loop");
        Poll::Ready(Err(std::io::Error::new(
            std::io::ErrorKind::Other,
            crate::error::AcceptError::InetdPseudosocketAlreadyTaken,
        )))
    }
}
496
497#[allow(clippy::missing_errors_doc)]
498#[allow(missing_docs)]
499impl Listener {
500 pub fn try_borrow_tcp_listener(&self) -> Option<&TcpListener> {
501 if let ListenerImpl::Tcp(ListenerImplTcp { ref s, .. }) = self.i {
502 Some(s)
503 } else {
504 None
505 }
506 }
507 #[cfg(all(feature = "unix", unix))]
508 #[cfg_attr(docsrs_alt, doc(cfg(all(feature = "unix", unix))))]
509 pub fn try_borrow_unix_listener(&self) -> Option<&UnixListener> {
510 if let ListenerImpl::Unix(ListenerImplUnix { s: ref x, .. }) = self.i {
511 Some(x)
512 } else {
513 None
514 }
515 }
516
517 pub fn try_into_tcp_listener(self) -> Result<TcpListener, Self> {
518 if let ListenerImpl::Tcp(ListenerImplTcp { s, .. }) = self.i {
519 Ok(s)
520 } else {
521 Err(self)
522 }
523 }
524 #[cfg(all(feature = "unix", unix))]
525 #[cfg_attr(docsrs_alt, doc(cfg(all(feature = "unix", unix))))]
526 pub fn try_into_unix_listener(self) -> Result<UnixListener, Self> {
527 if let ListenerImpl::Unix(ListenerImplUnix { s, .. }) = self.i {
528 Ok(s)
529 } else {
530 Err(self)
531 }
532 }
533
534 #[allow(unreachable_code)]
536 pub fn no_more_connections(&self) -> bool {
537 #[cfg(feature = "inetd")]
538 return if let ListenerImpl::Stdio(ref x) = self.i {
539 x.token.is_none()
540 } else {
541 false
542 };
543 false
544 }
545
546 pub fn poll_accept(
548 &mut self,
549 cx: &mut Context<'_>,
550 ) -> Poll<std::io::Result<(Connection, SomeSocketAddr)>> {
551 loop {
552 if let Some(ref mut to) = self.timeout {
553 ready!(Pin::new(to).poll(cx));
554 }
555 self.timeout = None;
556
557 let ret = self.i.poll_accept(cx);
558
559 #[cfg(feature = "inetd")]
560 if matches!(self.i, ListenerImpl::Stdio(..)) {
561 return ret;
562 }
563
564 let e: std::io::Error = match ret {
565 Poll::Ready(Err(e)) => e,
566 Poll::Ready(Ok(x)) => return Poll::Ready(Ok(x)),
567 Poll::Pending => return Poll::Pending,
568 };
569 if is_connection_error(&e) {
570 info!(action = "retry", "failed_accept");
571 continue;
572 }
573 if self.sleep_on_errors {
574 info!(action = "sleep_retry", "failed_accept");
575 self.timeout = Some(Box::pin(tokio::time::sleep(Duration::from_secs(1))));
576 } else {
577 info!(action = "error", "failed_accept");
578 return Poll::Ready(Err(e));
579 }
580 }
581 }
582
583 pub async fn accept(&mut self) -> std::io::Result<(Connection, SomeSocketAddr)> {
585 std::future::poll_fn(|cx| self.poll_accept(cx)).await
586 }
587}
588
589impl Stream for Listener {
590 type Item = std::io::Result<Connection>;
591
592 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
593 match self.poll_accept(cx) {
594 Poll::Ready(Ok((connection, _))) => Poll::Ready(Some(Ok(connection))),
595 Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
596 Poll::Pending => Poll::Pending,
597 }
598 }
599}
600
/// Tells whether an accept error is one of the kinds that `Listener::poll_accept`
/// retries immediately: connection refused, aborted or reset.
pub(crate) fn is_connection_error(e: &std::io::Error) -> bool {
    const PER_CONNECTION_KINDS: [std::io::ErrorKind; 3] = [
        std::io::ErrorKind::ConnectionRefused,
        std::io::ErrorKind::ConnectionAborted,
        std::io::ErrorKind::ConnectionReset,
    ];
    PER_CONNECTION_KINDS.contains(&e.kind())
}
618
619pub(crate) struct ListenerImplTcp {
620 pub(crate) s: TcpListener,
621 nodelay: bool,
622 #[cfg(feature = "socket_options")]
623 keepalive: Option<socket2::TcpKeepalive>,
624 #[cfg(feature = "socket_options")]
625 recv_buffer_size: Option<usize>,
626 #[cfg(feature = "socket_options")]
627 send_buffer_size: Option<usize>,
628}
629
/// UNIX-socket backend: the tokio listener plus the buffer sizes applied to
/// every accepted stream.
#[cfg(all(feature = "unix", unix))]
pub(crate) struct ListenerImplUnix {
    /// The bound tokio listener.
    pub(crate) s: UnixListener,
    /// Receive buffer size applied to each accepted stream, if any.
    #[cfg(feature = "socket_options")]
    recv_buffer_size: Option<usize>,
    /// Send buffer size applied to each accepted stream, if any.
    #[cfg(feature = "socket_options")]
    send_buffer_size: Option<usize>,
}
638
/// Backend that combines several other backends into one listener.
#[cfg(feature = "multi-listener")]
pub(crate) struct ListenerImplMulti {
    /// Sub-listeners, polled in this order on every accept attempt.
    pub(crate) v: Vec<ListenerImpl>,
}
643
644pub(crate) enum ListenerImpl {
645 Tcp(ListenerImplTcp),
646 #[cfg(all(feature = "unix", unix))]
647 Unix(ListenerImplUnix),
648 #[cfg(feature = "inetd")]
649 Stdio(StdioListener),
650 #[cfg(feature = "multi-listener")]
651 Multi(ListenerImplMulti),
652}
653
654impl ListenerImpl {
655 fn poll_accept(
656 &mut self,
657 cx: &mut Context<'_>,
658 ) -> Poll<std::io::Result<(Connection, SomeSocketAddr)>> {
659 match self {
660 ListenerImpl::Tcp(ti) => ti.poll_accept(cx),
661 #[cfg(all(feature = "unix", unix))]
662 ListenerImpl::Unix(ui) => ui.poll_accept(cx),
663 #[cfg(feature = "inetd")]
664 ListenerImpl::Stdio(x) => x.poll_accept(cx),
665 #[cfg(feature = "multi-listener")]
666 ListenerImpl::Multi(x) => x.poll_accept(cx),
667 }
668 }
669}
670
671impl ListenerImplTcp {
672 fn poll_accept(
673 &mut self,
674 cx: &mut Context<'_>,
675 ) -> Poll<std::io::Result<(Connection, SomeSocketAddr)>> {
676 let ListenerImplTcp {
677 s,
678 nodelay,
679 #[cfg(feature = "socket_options")]
680 keepalive,
681 #[cfg(feature = "socket_options")]
682 recv_buffer_size,
683 #[cfg(feature = "socket_options")]
684 send_buffer_size,
685 } = self;
686 match s.poll_accept(cx) {
687 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
688 Poll::Ready(Ok((c, a))) => {
689 debug!(fromaddr=%a, r#type="tcp", "incoming connection");
690 if *nodelay {
691 c.set_nodelay(true)?;
692 }
693
694 #[cfg(feature = "socket_options")]
695 {
696 apply_tcp_keepalive_opts(&c, keepalive)?;
697 apply_socket_buf_opts(&c, recv_buffer_size, send_buffer_size)?;
698 }
699
700 Poll::Ready(Ok((
701 Connection(ConnectionImpl::Tcp(c)),
702 SomeSocketAddr::Tcp(a),
703 )))
704 }
705 Poll::Pending => Poll::Pending,
706 }
707 }
708}
709
#[cfg(all(feature = "unix", unix))]
impl ListenerImplUnix {
    /// Accepts one UNIX-socket connection and, with `socket_options`, applies
    /// the configured buffer sizes before handing it out.
    ///
    /// A failure to apply an option is reported as the accept error.
    fn poll_accept(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<std::io::Result<(Connection, SomeSocketAddr)>> {
        let (stream, peer) = ready!(self.s.poll_accept(cx))?;
        debug!(r#type = "unix", "incoming connection");

        #[cfg(feature = "socket_options")]
        {
            apply_socket_buf_opts(&stream, &self.recv_buffer_size, &self.send_buffer_size)?;
        }

        let connection = Connection(ConnectionImpl::Unix(stream));
        Poll::Ready(Ok((connection, SomeSocketAddr::Unix(peer))))
    }
}
740
#[cfg(feature = "multi-listener")]
impl ListenerImplMulti {
    /// Polls the sub-listeners in order and returns the first ready result
    /// (connection or error); pending only if every sub-listener is pending.
    ///
    /// Sub-listeners after the first ready one are not polled in that call.
    fn poll_accept(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<std::io::Result<(Connection, SomeSocketAddr)>> {
        // `map` is lazy, so polling stops at the first ready sub-listener.
        self.v
            .iter_mut()
            .map(|listener| listener.poll_accept(cx))
            .find(|outcome| outcome.is_ready())
            .unwrap_or(Poll::Pending)
    }
}
756
/// Applies `keepalive` to the accepted stream `c`; does nothing when it is
/// `None`.
#[cfg(feature = "socket_options")]
fn apply_tcp_keepalive_opts(
    c: &TcpStream,
    keepalive: &Option<socket2::TcpKeepalive>,
) -> std::io::Result<()> {
    let sock_ref = socket2::SockRef::from(&c);
    match keepalive {
        Some(params) => sock_ref.set_tcp_keepalive(params),
        None => Ok(()),
    }
}
768
/// Applies the requested receive/send buffer sizes to the socket behind `c`
/// (UNIX flavour, works for any type exposing a file descriptor).
///
/// Sizes that are `None` are left untouched; the receive size is set first.
#[cfg(all(feature = "socket_options", unix))]
fn apply_socket_buf_opts<T: std::os::fd::AsFd>(
    c: &T,
    recv_buffer_size: &Option<usize>,
    send_buffer_size: &Option<usize>,
) -> std::io::Result<()> {
    let sock = socket2::SockRef::from(c);
    if let Some(bytes) = *recv_buffer_size {
        sock.set_recv_buffer_size(bytes)?;
    }
    if let Some(bytes) = *send_buffer_size {
        sock.set_send_buffer_size(bytes)?;
    }
    Ok(())
}
784
/// Applies the requested receive/send buffer sizes to the TCP stream `c`
/// (non-UNIX flavour).
///
/// Sizes that are `None` are left untouched; the receive size is set first.
#[cfg(all(feature = "socket_options", not(unix)))]
fn apply_socket_buf_opts(
    c: &TcpStream,
    recv_buffer_size: &Option<usize>,
    send_buffer_size: &Option<usize>,
) -> std::io::Result<()> {
    let sock = socket2::SockRef::from(&c);
    if let Some(bytes) = *recv_buffer_size {
        sock.set_recv_buffer_size(bytes)?;
    }
    if let Some(bytes) = *send_buffer_size {
        sock.set_send_buffer_size(bytes)?;
    }
    Ok(())
}