snix_castore/
tonic.rs

1use hyper_util::rt::TokioIo;
2use tokio::net::UnixStream;
3use tonic::transport::{Channel, Endpoint};
4
5fn url_wants_wait_connect(url: &url::Url) -> bool {
6    url.query_pairs()
7        .filter(|(k, v)| k == "wait-connect" && v == "1")
8        .count()
9        > 0
10}
11
12/// Turn a [url::Url] to a [Channel] if it can be parsed successfully.
13/// It supports the following schemes (and URLs):
14///  - `grpc+http://[::1]:8000`, connecting over unencrypted HTTP/2 (h2c)
15///  - `grpc+https://[::1]:8000`, connecting over encrypted HTTP/2
16///  - `grpc+unix:/path/to/socket`, connecting to a unix domain socket
17///
18/// All URLs support adding `wait-connect=1` as a URL parameter, in which case
19/// the connection is established lazily.
20pub async fn channel_from_url(url: &url::Url) -> Result<Channel, self::Error> {
21    match url.scheme() {
22        "grpc+unix" => {
23            if url.has_authority() {
24                return Err(Error::AuthorityDisallowed());
25            }
26
27            let connector = tower::service_fn({
28                let url = url.clone();
29                move |_: tonic::transport::Uri| {
30                    let unix = UnixStream::connect(url.path().to_string().clone());
31                    async move { Ok::<_, std::io::Error>(TokioIo::new(unix.await?)) }
32                }
33            });
34
35            // the URL doesn't matter
36            let endpoint = Endpoint::from_static("http://[::]:50051");
37            if url_wants_wait_connect(url) {
38                Ok(endpoint.connect_with_connector(connector).await?)
39            } else {
40                Ok(endpoint.connect_with_connector_lazy(connector))
41            }
42        }
43        _ => {
44            // ensure path is empty, not supported with gRPC.
45            if !url.path().is_empty() {
46                return Err(Error::PathMayNotBeSet());
47            }
48
49            // Stringify the URL and remove the grpc+ prefix.
50            // We can't use `url.set_scheme(rest)`, as it disallows
51            // setting something http(s) that previously wasn't.
52            let unprefixed_url_str = match url.to_string().strip_prefix("grpc+") {
53                None => return Err(Error::MissingGRPCPrefix()),
54                Some(url_str) => url_str.to_owned(),
55            };
56
57            // Use the regular tonic transport::Endpoint logic, but unprefixed_url_str,
58            // as tonic doesn't know about grpc+http[s].
59            let endpoint = Endpoint::try_from(unprefixed_url_str)?;
60
61            let endpoint = if url.scheme() == "grpc+https" {
62                let tls_config = tonic::transport::ClientTlsConfig::new().with_enabled_roots();
63                endpoint.tls_config(tls_config)?
64            } else {
65                endpoint
66            };
67
68            if url_wants_wait_connect(url) {
69                Ok(endpoint.connect().await?)
70            } else {
71                Ok(endpoint.connect_lazy())
72            }
73        }
74    }
75}
76
/// Errors occurring when trying to connect to a backend
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// The URL does not start with the `grpc+` prefix.
    #[error("grpc+ prefix is missing from URL")]
    MissingGRPCPrefix(),

    /// A `grpc+unix` URL was given with an authority component.
    #[error("unix domain sockets URLs should not have authority")]
    AuthorityDisallowed(),

    /// A URL with a scheme other than `grpc+unix` was given with a non-empty path.
    #[error("path may not be set")]
    PathMayNotBeSet(),

    /// An error returned by the tonic transport layer.
    #[error("transport error: {0}")]
    TransportError(tonic::transport::Error),
}
92
93impl From<tonic::transport::Error> for Error {
94    fn from(value: tonic::transport::Error) -> Self {
95        Self::TransportError(value)
96    }
97}
98
#[cfg(test)]
mod tests {
    use super::channel_from_url;
    use rstest::rstest;
    use url::Url;

    #[rstest]
    /// A well-formed unix socket URL; no connection is awaited by default.
    #[case::valid_unix_socket("grpc+unix:/path/to/somewhere", true)]
    /// `wait-connect=0` is the same as the default, so this succeeds too.
    #[case::valid_unix_socket_wait_connect_0("grpc+unix:/path/to/somewhere?wait-connect=0", true)]
    /// `wait-connect=1` connects right away, which fails as the socket path doesn't exist.
    #[case::valid_unix_socket_wait_connect_1("grpc+unix:/path/to/somewhere?wait-connect=1", false)]
    /// Unix socket scheme with an authority set, which is rejected.
    #[case::invalid_unix_socket_with_authority("grpc+unix:///path/to/somewhere", false)]
    /// Unix socket scheme with a host set as well, which is rejected.
    #[case::invalid_unix_socket_and_host("grpc+unix://host.example/path/to/somewhere", false)]
    /// Plain HTTP to the IPv6 loopback address on port 12345.
    #[case::valid_ipv6_localhost_port_12345("grpc+http://[::1]:12345", true)]
    /// Plain HTTP to localhost, without specifying a port.
    #[case::valid_http_host_without_port("grpc+http://localhost", true)]
    /// HTTPS to localhost, without specifying a port.
    #[case::valid_https_host_without_port("grpc+https://localhost", true)]
    /// Plain HTTP to localhost with an additional path, which is rejected.
    #[case::invalid_host_and_path("grpc+http://localhost/some-path", false)]
    /// `wait-connect=0` is the same as the default, so this succeeds.
    #[case::valid_host_wait_connect_0("grpc+http://localhost?wait-connect=0", true)]
    /// `wait-connect=1` connects right away, which fails as the host doesn't exist.
    #[case::valid_host_wait_connect_1_fails("grpc+http://nonexist.invalid?wait-connect=1", false)]
    #[tokio::test]
    async fn test_from_addr_tokio(#[case] uri_str: &str, #[case] is_ok: bool) {
        let parsed = Url::parse(uri_str).expect("must parse");
        let outcome = channel_from_url(&parsed).await;
        assert_eq!(outcome.is_ok(), is_ok)
    }
}
133}