snix_castore/proto/
grpc_directoryservice_wrapper.rs

1use crate::directoryservice::{DirectoryService, LeavesToRootValidator};
2use crate::{B3Digest, DirectoryError, proto};
3use futures::stream::BoxStream;
4use futures::{StreamExt, TryStreamExt};
5use tonic::{Request, Response, Status, Streaming, async_trait};
6use tracing::{instrument, warn};
7
/// gRPC server adapter around a [`DirectoryService`].
///
/// It implements the generated `proto::directory_service_server::DirectoryService`
/// trait by forwarding every RPC to the wrapped service.
///
/// `Clone` and `Debug` are derived, so they are available whenever `T` itself
/// provides them.
#[derive(Clone, Debug)]
pub struct GRPCDirectoryServiceWrapper<T> {
    /// The backing service that incoming requests are forwarded to.
    directory_service: T,
}
11
12impl<T> GRPCDirectoryServiceWrapper<T> {
13    pub fn new(directory_service: T) -> Self {
14        Self { directory_service }
15    }
16}
17
18#[async_trait]
19impl<T> proto::directory_service_server::DirectoryService for GRPCDirectoryServiceWrapper<T>
20where
21    T: DirectoryService + Clone + Send + Sync + 'static,
22{
23    type GetStream = BoxStream<'static, tonic::Result<proto::Directory, Status>>;
24
25    #[instrument(skip_all)]
26    async fn get(
27        &self,
28        request: Request<proto::GetDirectoryRequest>,
29    ) -> Result<Response<Self::GetStream>, Status> {
30        let req_inner = request.into_inner();
31
32        match &req_inner
33            .by_what
34            .ok_or_else(|| Status::invalid_argument("invalid by_what"))?
35        {
36            proto::get_directory_request::ByWhat::Digest(digest) => {
37                let digest: B3Digest = digest
38                    .clone()
39                    .try_into()
40                    .map_err(|_e| Status::invalid_argument("invalid digest length"))?;
41
42                let directory_service = self.directory_service.clone();
43
44                Ok(tonic::Response::new({
45                    async_stream::try_stream! {
46                        if !req_inner.recursive {
47                            let directory = directory_service
48                                .get(&digest)
49                                .await
50                                .map_err(|e| {
51                                    warn!(err = %e, directory.digest=%digest, "failed to get directory");
52                                    tonic::Status::new(tonic::Code::Internal, e.to_string())
53                                })?
54                                .ok_or_else(|| {
55                                    Status::not_found(format!("directory {digest} not found"))
56                                })?;
57
58                            yield directory.into();
59                        } else {
60                            // If recursive was requested, traverse via get_recursive.
61                            // We need to use some type acrobatics as prost wants streams with 'static lifetimes.
62                            let mut s = get_recursive_owned(std::sync::Arc::new(directory_service),digest)
63                                .map_ok(proto::Directory::from)
64                                .map_err(|e| tonic::Status::new(tonic::Code::Internal, e.to_string()));
65
66                            while let Some(directory) = s.try_next().await? {
67                                yield directory
68                            }
69                        }
70
71                    }.boxed()
72                }))
73            }
74        }
75    }
76
77    #[instrument(skip_all)]
78    async fn put(
79        &self,
80        request: Request<Streaming<proto::Directory>>,
81    ) -> Result<Response<proto::PutDirectoryResponse>, Status> {
82        let mut req_inner = request.into_inner();
83
84        // Validate all received directories.
85        let mut validator = LeavesToRootValidator::new();
86
87        // Insert into the backing DirectoryService as we receive.
88        let mut directory_putter = self.directory_service.put_multiple_start();
89
90        while let Some(directory) = req_inner.message().await? {
91            let directory: crate::Directory =
92                directory.try_into().map_err(|e: DirectoryError| {
93                    tonic::Status::new(tonic::Code::Internal, e.to_string())
94                })?;
95            validator
96                .try_accept(&directory)
97                .map_err(|e| tonic::Status::new(tonic::Code::Internal, e.to_string()))?;
98
99            directory_putter
100                .put(directory)
101                .await
102                .map_err(|e| tonic::Status::new(tonic::Code::Internal, e.to_string()))?;
103        }
104
105        // Finalize validator, checks connectivity.
106        validator
107            .finalize()
108            .map_err(|e| tonic::Status::new(tonic::Code::Internal, e.to_string()))?;
109
110        Ok(Response::new(proto::PutDirectoryResponse {
111            // Properly close the directory putter, returning any potential errors.
112            root_digest: directory_putter
113                .close()
114                .await
115                .map_err(|e| tonic::Status::new(tonic::Code::Internal, e.to_string()))?
116                .into(),
117        }))
118    }
119}
120
121/// The same as [DirectoryService::get_recursive], but returning a stream with a static lifetime.
122/// It's only used for the gRPC server wrapper, which requires static lifetimes.
123fn get_recursive_owned<S>(
124    svc: std::sync::Arc<S>,
125    root_directory_digest: B3Digest,
126) -> BoxStream<'static, Result<crate::Directory, crate::directoryservice::Error>>
127where
128    S: DirectoryService + 'static,
129{
130    async_stream::try_stream! {
131        let mut  directories = svc.get_recursive(&root_directory_digest);
132        while let Some(directory) = directories.try_next().await? {
133            yield directory;
134        }
135    }
136    .boxed()
137}