snix_castore/proto/
grpc_directoryservice_wrapper.rs

1use crate::directoryservice::{DirectoryGraph, DirectoryService, LeavesToRootValidator};
2use crate::{B3Digest, DirectoryError, proto};
3use futures::TryStreamExt;
4use futures::stream::BoxStream;
5use std::ops::Deref;
6use tokio_stream::once;
7use tonic::{Request, Response, Status, Streaming, async_trait};
8use tracing::{instrument, warn};
9
/// Thin adapter that owns a directory service handle of type `T`.
///
/// The gRPC server trait implementation in this file forwards its `get` and
/// `put` calls to the value stored here.
pub struct GRPCDirectoryServiceWrapper<T> {
    /// The wrapped service that incoming requests are delegated to.
    directory_service: T,
}

impl<T> GRPCDirectoryServiceWrapper<T> {
    /// Builds a wrapper that takes ownership of `directory_service`.
    pub fn new(directory_service: T) -> Self {
        GRPCDirectoryServiceWrapper { directory_service }
    }
}
19
20#[async_trait]
21impl<T> proto::directory_service_server::DirectoryService for GRPCDirectoryServiceWrapper<T>
22where
23    T: Deref<Target = dyn DirectoryService> + Send + Sync + 'static,
24{
25    type GetStream = BoxStream<'static, tonic::Result<proto::Directory, Status>>;
26
27    #[instrument(skip_all)]
28    async fn get<'a>(
29        &'a self,
30        request: Request<proto::GetDirectoryRequest>,
31    ) -> Result<Response<Self::GetStream>, Status> {
32        let req_inner = request.into_inner();
33
34        let by_what = &req_inner
35            .by_what
36            .ok_or_else(|| Status::invalid_argument("invalid by_what"))?;
37
38        match by_what {
39            proto::get_directory_request::ByWhat::Digest(digest) => {
40                let digest: B3Digest = digest
41                    .clone()
42                    .try_into()
43                    .map_err(|_e| Status::invalid_argument("invalid digest length"))?;
44
45                Ok(tonic::Response::new({
46                    if !req_inner.recursive {
47                        let directory = self
48                            .directory_service
49                            .get(&digest)
50                            .await
51                            .map_err(|e| {
52                                warn!(err = %e, directory.digest=%digest, "failed to get directory");
53                                tonic::Status::new(tonic::Code::Internal, e.to_string())
54                            })?
55                            .ok_or_else(|| {
56                                Status::not_found(format!("directory {} not found", digest))
57                            })?;
58
59                        Box::pin(once(Ok(directory.into())))
60                    } else {
61                        // If recursive was requested, traverse via get_recursive.
62                        Box::pin(
63                            self.directory_service
64                                .get_recursive(&digest)
65                                .map_ok(proto::Directory::from)
66                                .map_err(|e| {
67                                    tonic::Status::new(tonic::Code::Internal, e.to_string())
68                                }),
69                        )
70                    }
71                }))
72            }
73        }
74    }
75
76    #[instrument(skip_all)]
77    async fn put(
78        &self,
79        request: Request<Streaming<proto::Directory>>,
80    ) -> Result<Response<proto::PutDirectoryResponse>, Status> {
81        let mut req_inner = request.into_inner();
82
83        // We put all Directory messages we receive into DirectoryGraph.
84        let mut validator = DirectoryGraph::<LeavesToRootValidator>::default();
85        while let Some(directory) = req_inner.message().await? {
86            validator
87                .add(directory.try_into().map_err(|e: DirectoryError| {
88                    tonic::Status::new(tonic::Code::Internal, e.to_string())
89                })?)
90                .map_err(|e| tonic::Status::new(tonic::Code::Internal, e.to_string()))?;
91        }
92
93        // drain, which validates connectivity too.
94        let directories = validator
95            .validate()
96            .map_err(|e| tonic::Status::new(tonic::Code::Internal, e.to_string()))?
97            .drain_leaves_to_root()
98            .collect::<Vec<_>>();
99
100        let mut directory_putter = self.directory_service.put_multiple_start();
101        for directory in directories {
102            directory_putter.put(directory).await?;
103        }
104
105        // Properly close the directory putter. Peek at last_directory_digest
106        // and return it, or propagate errors.
107        let last_directory_dgst = directory_putter.close().await?;
108
109        Ok(Response::new(proto::PutDirectoryResponse {
110            root_digest: last_directory_dgst.into(),
111        }))
112    }
113}