snix_castore/proto/
grpc_directoryservice_wrapper.rs1use crate::directoryservice::{DirectoryService, LeavesToRootValidator};
2use crate::{B3Digest, DirectoryError, proto};
3use futures::stream::BoxStream;
4use futures::{StreamExt, TryStreamExt};
5use tonic::{Request, Response, Status, Streaming, async_trait};
6use tracing::{instrument, warn};
7
/// Adapter that exposes an inner directory service of type `T` through the
/// tonic-generated gRPC `DirectoryService` server trait.
///
/// The wrapper itself places no bounds on `T`; the bounds live on the trait
/// implementation that needs them.
pub struct GRPCDirectoryServiceWrapper<T> {
    /// The wrapped service every request is forwarded to.
    directory_service: T,
}

impl<T> GRPCDirectoryServiceWrapper<T> {
    /// Wraps `directory_service`, taking ownership of it.
    pub fn new(directory_service: T) -> Self {
        GRPCDirectoryServiceWrapper { directory_service }
    }
}
17
18#[async_trait]
19impl<T> proto::directory_service_server::DirectoryService for GRPCDirectoryServiceWrapper<T>
20where
21 T: DirectoryService + Clone + Send + Sync + 'static,
22{
23 type GetStream = BoxStream<'static, tonic::Result<proto::Directory, Status>>;
24
25 #[instrument(skip_all)]
26 async fn get(
27 &self,
28 request: Request<proto::GetDirectoryRequest>,
29 ) -> Result<Response<Self::GetStream>, Status> {
30 let req_inner = request.into_inner();
31
32 match &req_inner
33 .by_what
34 .ok_or_else(|| Status::invalid_argument("invalid by_what"))?
35 {
36 proto::get_directory_request::ByWhat::Digest(digest) => {
37 let digest: B3Digest = digest
38 .clone()
39 .try_into()
40 .map_err(|_e| Status::invalid_argument("invalid digest length"))?;
41
42 let directory_service = self.directory_service.clone();
43
44 Ok(tonic::Response::new({
45 async_stream::try_stream! {
46 if !req_inner.recursive {
47 let directory = directory_service
48 .get(&digest)
49 .await
50 .map_err(|e| {
51 warn!(err = %e, directory.digest=%digest, "failed to get directory");
52 tonic::Status::new(tonic::Code::Internal, e.to_string())
53 })?
54 .ok_or_else(|| {
55 Status::not_found(format!("directory {digest} not found"))
56 })?;
57
58 yield directory.into();
59 } else {
60 let mut s = get_recursive_owned(std::sync::Arc::new(directory_service),digest)
63 .map_ok(proto::Directory::from)
64 .map_err(|e| tonic::Status::new(tonic::Code::Internal, e.to_string()));
65
66 while let Some(directory) = s.try_next().await? {
67 yield directory
68 }
69 }
70
71 }.boxed()
72 }))
73 }
74 }
75 }
76
77 #[instrument(skip_all)]
78 async fn put(
79 &self,
80 request: Request<Streaming<proto::Directory>>,
81 ) -> Result<Response<proto::PutDirectoryResponse>, Status> {
82 let mut req_inner = request.into_inner();
83
84 let mut validator = LeavesToRootValidator::new();
86
87 let mut directory_putter = self.directory_service.put_multiple_start();
89
90 while let Some(directory) = req_inner.message().await? {
91 let directory: crate::Directory =
92 directory.try_into().map_err(|e: DirectoryError| {
93 tonic::Status::new(tonic::Code::Internal, e.to_string())
94 })?;
95 validator
96 .try_accept(&directory)
97 .map_err(|e| tonic::Status::new(tonic::Code::Internal, e.to_string()))?;
98
99 directory_putter
100 .put(directory)
101 .await
102 .map_err(|e| tonic::Status::new(tonic::Code::Internal, e.to_string()))?;
103 }
104
105 validator
107 .finalize()
108 .map_err(|e| tonic::Status::new(tonic::Code::Internal, e.to_string()))?;
109
110 Ok(Response::new(proto::PutDirectoryResponse {
111 root_digest: directory_putter
113 .close()
114 .await
115 .map_err(|e| tonic::Status::new(tonic::Code::Internal, e.to_string()))?
116 .into(),
117 }))
118 }
119}
120
121fn get_recursive_owned<S>(
124 svc: std::sync::Arc<S>,
125 root_directory_digest: B3Digest,
126) -> BoxStream<'static, Result<crate::Directory, crate::directoryservice::Error>>
127where
128 S: DirectoryService + 'static,
129{
130 async_stream::try_stream! {
131 let mut directories = svc.get_recursive(&root_directory_digest);
132 while let Some(directory) = directories.try_next().await? {
133 yield directory;
134 }
135 }
136 .boxed()
137}