snix_castore/
tonic.rs

1use hyper_util::rt::TokioIo;
2use tokio::net::UnixStream;
3use tonic::transport::{Channel, Endpoint};
4
5fn url_wants_wait_connect(url: &url::Url) -> bool {
6    url.query_pairs()
7        .filter(|(k, v)| k == "wait-connect" && v == "1")
8        .count()
9        > 0
10}
11
12/// Turn a [url::Url] to a [Channel] if it can be parsed successfully.
13/// It supports the following schemes (and URLs):
14///  - `grpc+http://[::1]:8000`, connecting over unencrypted HTTP/2 (h2c)
15///  - `grpc+https://[::1]:8000`, connecting over encrypted HTTP/2
16///  - `grpc+unix:/path/to/socket`, connecting to a unix domain socket
17///
18/// All URLs support adding `wait-connect=1` as a URL parameter, in which case
19/// the connection is established lazily.
20pub async fn channel_from_url(url: &url::Url) -> Result<Channel, self::Error> {
21    match url.scheme() {
22        "grpc+unix" => {
23            if url.host_str().is_some() {
24                return Err(Error::HostSetForUnixSocket());
25            }
26
27            let connector = tower::service_fn({
28                let url = url.clone();
29                move |_: tonic::transport::Uri| {
30                    let unix = UnixStream::connect(url.path().to_string().clone());
31                    async move { Ok::<_, std::io::Error>(TokioIo::new(unix.await?)) }
32                }
33            });
34
35            // the URL doesn't matter
36            let endpoint = Endpoint::from_static("http://[::]:50051");
37            if url_wants_wait_connect(url) {
38                Ok(endpoint.connect_with_connector(connector).await?)
39            } else {
40                Ok(endpoint.connect_with_connector_lazy(connector))
41            }
42        }
43        _ => {
44            // ensure path is empty, not supported with gRPC.
45            if !url.path().is_empty() {
46                return Err(Error::PathMayNotBeSet());
47            }
48
49            // Stringify the URL and remove the grpc+ prefix.
50            // We can't use `url.set_scheme(rest)`, as it disallows
51            // setting something http(s) that previously wasn't.
52            let unprefixed_url_str = match url.to_string().strip_prefix("grpc+") {
53                None => return Err(Error::MissingGRPCPrefix()),
54                Some(url_str) => url_str.to_owned(),
55            };
56
57            // Use the regular tonic transport::Endpoint logic, but unprefixed_url_str,
58            // as tonic doesn't know about grpc+http[s].
59            let endpoint = Endpoint::try_from(unprefixed_url_str)?;
60            if url_wants_wait_connect(url) {
61                Ok(endpoint.connect().await?)
62            } else {
63                Ok(endpoint.connect_lazy())
64            }
65        }
66    }
67}
68
/// Errors occurring when trying to connect to a backend
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// The URL does not start with the `grpc+` prefix.
    #[error("grpc+ prefix is missing from URL")]
    MissingGRPCPrefix(),

    /// A `grpc+unix` URL specified a host, but only a socket path is allowed.
    #[error("host may not be set for unix domain sockets")]
    HostSetForUnixSocket(),

    /// A non-unix URL carried a path, which is not supported with gRPC.
    #[error("path may not be set")]
    PathMayNotBeSet(),

    /// An error reported by the tonic transport layer.
    #[error("transport error: {0}")]
    TransportError(tonic::transport::Error),
}
84
85impl From<tonic::transport::Error> for Error {
86    fn from(value: tonic::transport::Error) -> Self {
87        Self::TransportError(value)
88    }
89}
90
// Table-driven tests for `channel_from_url`: each case is a URL string plus
// whether constructing a channel from it is expected to succeed.
#[cfg(test)]
mod tests {
    use super::channel_from_url;
    use rstest::rstest;
    use url::Url;

    #[rstest]
    /// Correct scheme to connect to a unix socket.
    #[case::valid_unix_socket("grpc+unix:///path/to/somewhere", true)]
    /// Connecting with wait-connect set to 0 succeeds, as that's the default.
    #[case::valid_unix_socket_wait_connect_0("grpc+unix:///path/to/somewhere?wait-connect=0", true)]
    /// Connecting with wait-connect set to 1 fails, as the path doesn't exist.
    #[case::valid_unix_socket_wait_connect_1(
        "grpc+unix:///path/to/somewhere?wait-connect=1",
        false
    )]
    /// Correct scheme for unix socket, but setting a host too, which is invalid.
    #[case::invalid_unix_socket_and_host("grpc+unix://host.example/path/to/somewhere", false)]
    /// Correct scheme to connect to localhost, with port 12345
    #[case::valid_ipv6_localhost_port_12345("grpc+http://[::1]:12345", true)]
    /// Correct scheme to connect to localhost over http, without specifying a port.
    #[case::valid_http_host_without_port("grpc+http://localhost", true)]
    /// Correct scheme to connect to localhost over https, without specifying a port.
    #[case::valid_https_host_without_port("grpc+https://localhost", true)]
    /// Correct scheme to connect to localhost over http, but with additional path, which is invalid.
    #[case::invalid_host_and_path("grpc+http://localhost/some-path", false)]
    /// Connecting with wait-connect set to 0 succeeds, as that's the default.
    #[case::valid_host_wait_connect_0("grpc+http://localhost?wait-connect=0", true)]
    /// Connecting with wait-connect set to 1 fails, as the host doesn't exist.
    #[case::valid_host_wait_connect_1_fails("grpc+http://nonexist.invalid?wait-connect=1", false)]
    #[tokio::test]
    async fn test_from_addr_tokio(#[case] uri_str: &str, #[case] is_ok: bool) {
        // Every case string must be a syntactically valid URL; only the
        // channel construction itself is under test.
        let url = Url::parse(uri_str).expect("must parse");
        // Lazy (default) cases succeed without any backend listening; the
        // wait-connect=1 cases actually attempt to connect.
        assert_eq!(channel_from_url(&url).await.is_ok(), is_ok)
    }
}