snix_castore/directoryservice/
grpc.rs

1use super::{Directory, DirectoryPutter, DirectoryService};
2use crate::B3Digest;
3use crate::composition::{CompositionContext, ServiceBuilder};
4use crate::directoryservice::RootToLeavesValidator;
5use crate::proto::{self, get_directory_request::ByWhat};
6use futures::StreamExt;
7use futures::stream::BoxStream;
8use std::sync::Arc;
9use tokio::spawn;
10use tokio::sync::mpsc::UnboundedSender;
11use tokio::task::JoinHandle;
12use tokio_stream::wrappers::UnboundedReceiverStream;
13use tonic::{Code, Status, async_trait};
14use tracing::{Instrument as _, instrument, warn};
15
16/// Connects to a (remote) snix-store DirectoryService over gRPC.
17#[derive(Clone)]
18pub struct GRPCDirectoryService<T> {
19    instance_name: String,
20    /// The internal reference to a gRPC client.
21    /// Cloning it is cheap, and it internally handles concurrent requests.
22    grpc_client: proto::directory_service_client::DirectoryServiceClient<T>,
23}
24
25impl<T> GRPCDirectoryService<T> {
26    /// construct a [GRPCDirectoryService] from a [proto::directory_service_client::DirectoryServiceClient].
27    /// panics if called outside the context of a tokio runtime.
28    pub fn from_client(
29        instance_name: String,
30        grpc_client: proto::directory_service_client::DirectoryServiceClient<T>,
31    ) -> Self {
32        Self {
33            instance_name,
34            grpc_client,
35        }
36    }
37}
38
39#[async_trait]
40impl<T> DirectoryService for GRPCDirectoryService<T>
41where
42    T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + Sync + Clone + 'static,
43    T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
44    <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
45    T::Future: Send,
46{
47    #[instrument(level = "trace", skip_all, fields(directory.digest = %digest, instance_name = %self.instance_name))]
48    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, super::Error> {
49        // clone the client, as it takes a &mut.
50        // We retrieve the first message only, then close the stream (we set recursive to false)
51        match self
52            .grpc_client
53            .clone()
54            .get(proto::GetDirectoryRequest {
55                recursive: false,
56                by_what: Some(ByWhat::Digest((*digest).into())),
57            })
58            .await
59            .map_err(Error::Tonic)?
60            .into_inner()
61            .message()
62            .await
63        {
64            Ok(Some(proto_directory)) => {
65                // Validate the retrieved Directory indeed has the
66                // digest we expect it to have, to detect corruptions.
67                let actual_digest = proto_directory.digest();
68                if &actual_digest != digest {
69                    Err(Error::WrongDigest {
70                        expected: *digest,
71                        actual: actual_digest,
72                    })?
73                } else {
74                    let directory =
75                        Directory::try_from(proto_directory).map_err(Error::DirectoryValidation)?;
76                    Ok(Some(directory))
77                }
78            }
79            Ok(None) => Ok(None),
80            Err(e) if e.code() == Code::NotFound => Ok(None),
81            Err(e) => Err(Error::Tonic(e))?,
82        }
83    }
84
85    #[instrument(level = "trace", skip_all, fields(directory.digest = %directory.digest(), instance_name = %self.instance_name))]
86    async fn put(&self, directory: Directory) -> Result<B3Digest, super::Error> {
87        let resp = self
88            .grpc_client
89            .clone()
90            .put(tokio_stream::once(proto::Directory::from(directory)))
91            .await
92            .map_err(Error::Tonic)?;
93
94        let digest = resp
95            .into_inner()
96            .root_digest
97            .try_into()
98            .map_err(|_| Error::InvalidDigestLen)?;
99
100        Ok(digest)
101    }
102
103    #[instrument(level = "trace", skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))]
104    fn get_recursive(
105        &self,
106        root_directory_digest: &B3Digest,
107    ) -> BoxStream<'static, Result<Directory, super::Error>> {
108        let mut grpc_client = self.grpc_client.clone();
109        let root_directory_digest = *root_directory_digest;
110
111        let mut order_validator =
112            RootToLeavesValidator::new_with_root_digest(root_directory_digest);
113
114        async_stream::try_stream! {
115            let mut directories = grpc_client
116                .get(proto::GetDirectoryRequest {
117                    recursive: true,
118                    by_what: Some(ByWhat::Digest((root_directory_digest).into())),
119                })
120                .await
121                .map_err(Error::Tonic)?
122                .into_inner();
123
124            while let Some(proto_directory) = directories.message().await? {
125                let directory = Directory::try_from(proto_directory).map_err(Error::DirectoryValidation)?;
126                order_validator.try_accept(&directory).map_err(Error::DirectoryOrdering)?;
127
128                yield directory;
129            }
130        }.boxed()
131    }
132
133    #[instrument(skip_all)]
134    fn put_multiple_start(&self) -> Box<dyn DirectoryPutter + 'static> {
135        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
136
137        let task = spawn({
138            let mut grpc_client = self.grpc_client.clone();
139
140            async move {
141                Ok::<_, Status>(
142                    grpc_client
143                        .put(UnboundedReceiverStream::new(rx))
144                        .await?
145                        .into_inner(),
146                )
147            }
148            // instrument the task with the current span, this is not done by default
149            .in_current_span()
150        });
151
152        Box::new(GRPCPutter {
153            rq: Some((task, tx)),
154        })
155    }
156}
157
158/// Allows uploading multiple Directory messages in the same gRPC stream.
159pub struct GRPCPutter {
160    /// Data about the current request - a handle to the task, and the tx part
161    /// of the channel.
162    /// The tx part of the pipe is used to send [proto::Directory] to the ongoing request.
163    /// The task will yield a [proto::PutDirectoryResponse] once the stream is closed.
164    #[allow(clippy::type_complexity)] // lol
165    rq: Option<(
166        JoinHandle<Result<proto::PutDirectoryResponse, Status>>,
167        UnboundedSender<proto::Directory>,
168    )>,
169}
170
171#[async_trait]
172impl DirectoryPutter for GRPCPutter {
173    #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
174    async fn put(&mut self, directory: Directory) -> Result<(), super::Error> {
175        let (_, directory_sender) = self
176            .rq
177            .as_ref()
178            .ok_or_else(|| Error::DirectoryPutterAlreadyClosed)?;
179        // If we're not already closed, send the directory to directory_sender.
180        if directory_sender.send(directory.into()).is_err() {
181            // If the channel has been prematurely closed, invoke close (so we can peek at the error code)
182            // That error code is much more helpful, because it
183            // contains the error message from the server.
184            self.close().await?;
185        }
186        Ok(())
187    }
188
189    /// Closes the stream for sending, and returns the value.
190    #[instrument(level = "trace", skip_all, ret, err)]
191    async fn close(&mut self) -> Result<B3Digest, super::Error> {
192        // get self.rq, and replace it with None.
193        // This ensures we can only close it once.
194        let (task, directory_sender) =
195            std::mem::take(&mut self.rq).ok_or_else(|| Error::DirectoryPutterAlreadyClosed)?;
196
197        // close directory_sender, so blocking on task will finish.
198        drop(directory_sender);
199
200        let resp = task.await?.map_err(Error::Tonic)?;
201
202        Ok(B3Digest::try_from(resp.root_digest).map_err(|_| Error::InvalidDigestLen)?)
203    }
204}
205
206#[derive(thiserror::Error, Debug)]
207pub enum Error {
208    #[error("Directory Graph ordering error")]
209    DirectoryOrdering(#[from] crate::directoryservice::OrderingError),
210
211    #[error("DirectoryPutter already closed")]
212    DirectoryPutterAlreadyClosed,
213
214    #[error("requested directory has wrong digest, expected {expected}, actual {actual}")]
215    WrongDigest {
216        expected: B3Digest,
217        actual: B3Digest,
218    },
219
220    #[error("tonic status: {0}")]
221    Tonic(#[from] tonic::Status),
222
223    #[error("invalid digest length returned from put")]
224    InvalidDigestLen,
225
226    #[error("failed to decode protobuf: {0}")]
227    ProtobufDecode(#[from] prost::DecodeError),
228    #[error("failed to validate directory: {0}")]
229    DirectoryValidation(#[from] crate::DirectoryError),
230
231    #[error("join error: {0}")]
232    TokioJoin(#[from] tokio::task::JoinError),
233    #[error("io error: {0}")]
234    IO(#[from] std::io::Error),
235}
236
237#[derive(serde::Deserialize, Debug)]
238#[serde(deny_unknown_fields)]
239pub struct GRPCDirectoryServiceConfig {
240    url: String,
241}
242
243impl TryFrom<url::Url> for GRPCDirectoryServiceConfig {
244    type Error = Box<dyn std::error::Error + Send + Sync>;
245    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
246        //   This is normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts.
247        // - In the case of unix sockets, there must be a path, but may not be a host.
248        // - In the case of non-unix sockets, there must be a host, but no path.
249        // Constructing the channel is handled by snix_castore::channel::from_url.
250        Ok(GRPCDirectoryServiceConfig {
251            url: url.to_string(),
252        })
253    }
254}
255
256#[async_trait]
257impl ServiceBuilder for GRPCDirectoryServiceConfig {
258    type Output = dyn DirectoryService;
259    async fn build<'a>(
260        &'a self,
261        instance_name: &str,
262        _context: &CompositionContext,
263    ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
264        let client = proto::directory_service_client::DirectoryServiceClient::with_interceptor(
265            crate::tonic::channel_from_url(&self.url.parse()?).await?,
266            // tonic::service::Interceptor wants an unboxed Status as return type.
267            // https://github.com/hyperium/tonic/issues/2253
268            |rq| snix_tracing::propagate::tonic::send_trace(rq).map_err(|e| *e),
269        );
270        Ok(Arc::new(GRPCDirectoryService::from_client(
271            instance_name.to_string(),
272            client,
273        )))
274    }
275}
#[cfg(test)]
mod tests {
    use std::time::Duration;
    use tempfile::TempDir;
    use tokio::net::UnixListener;
    use tokio_retry::{Retry, strategy::ExponentialBackoff};
    use tokio_stream::wrappers::UnixListenerStream;

    use crate::{
        directoryservice::{DirectoryService, GRPCDirectoryService},
        fixtures,
        proto::{GRPCDirectoryServiceWrapper, directory_service_client::DirectoryServiceClient},
        utils::gen_test_directory_service,
    };

    /// Serves a test directory service on a unix socket, connects to it
    /// through the gRPC client, and checks that looking up a directory that
    /// was never uploaded yields `None`.
    #[tokio::test]
    async fn test_valid_unix_path_ping_pong() {
        let tmpdir = TempDir::new().unwrap();
        let socket_path = tmpdir.path().join("daemon");

        // The server task takes its own copy of the socket path.
        let server_socket_path = socket_path.clone();

        // Spin up a server in the background.
        tokio::spawn(async {
            let listener = UnixListener::bind(server_socket_path).unwrap();
            let incoming = UnixListenerStream::new(listener);

            let service = crate::proto::directory_service_server::DirectoryServiceServer::new(
                GRPCDirectoryServiceWrapper::new(Box::new(gen_test_directory_service())),
            );

            tonic::transport::Server::builder()
                .add_service(service)
                .serve_with_incoming(incoming)
                .await
        });

        // Poll with exponential backoff until the socket file shows up.
        let backoff = ExponentialBackoff::from_millis(20).max_delay(Duration::from_secs(10));
        Retry::spawn(backoff, || async {
            socket_path.exists().then_some(()).ok_or(())
        })
        .await
        .expect("failed to wait for socket");

        // Prepare a client talking to that socket.
        let url = url::Url::parse(&format!(
            "grpc+unix://{}?wait-connect=1",
            socket_path.display()
        ))
        .expect("must parse");
        let channel = crate::tonic::channel_from_url(&url)
            .await
            .expect("must succeed");
        let directory_service =
            GRPCDirectoryService::from_client("test-instance".into(), DirectoryServiceClient::new(channel));

        // Nothing was uploaded, so the lookup must succeed and find nothing.
        let lookup = directory_service
            .get(&fixtures::DIRECTORY_A.digest())
            .await
            .expect("must not fail");

        assert!(lookup.is_none())
    }
}
349        )
350    }
351}