snix_castore/proto/
grpc_directoryservice_wrapper.rs1use crate::directoryservice::{DirectoryGraph, DirectoryService, LeavesToRootValidator};
2use crate::{B3Digest, DirectoryError, proto};
3use futures::TryStreamExt;
4use futures::stream::BoxStream;
5use std::ops::Deref;
6use tokio_stream::once;
7use tonic::{Request, Response, Status, Streaming, async_trait};
8use tracing::{instrument, warn};
9
/// Adapter holding a directory service of type `T`.
///
/// The wrapper itself places no bounds on `T`; the gRPC trait implementation
/// in this file adds the bounds it needs on its own `impl` block.
pub struct GRPCDirectoryServiceWrapper<T> {
    /// The wrapped service that all calls are forwarded to.
    directory_service: T,
}

impl<T> GRPCDirectoryServiceWrapper<T> {
    /// Creates a wrapper that takes ownership of `directory_service`.
    pub fn new(directory_service: T) -> Self {
        GRPCDirectoryServiceWrapper { directory_service }
    }
}
19
20#[async_trait]
21impl<T> proto::directory_service_server::DirectoryService for GRPCDirectoryServiceWrapper<T>
22where
23 T: Deref<Target = dyn DirectoryService> + Send + Sync + 'static,
24{
25 type GetStream = BoxStream<'static, tonic::Result<proto::Directory, Status>>;
26
27 #[instrument(skip_all)]
28 async fn get<'a>(
29 &'a self,
30 request: Request<proto::GetDirectoryRequest>,
31 ) -> Result<Response<Self::GetStream>, Status> {
32 let req_inner = request.into_inner();
33
34 let by_what = &req_inner
35 .by_what
36 .ok_or_else(|| Status::invalid_argument("invalid by_what"))?;
37
38 match by_what {
39 proto::get_directory_request::ByWhat::Digest(digest) => {
40 let digest: B3Digest = digest
41 .clone()
42 .try_into()
43 .map_err(|_e| Status::invalid_argument("invalid digest length"))?;
44
45 Ok(tonic::Response::new({
46 if !req_inner.recursive {
47 let directory = self
48 .directory_service
49 .get(&digest)
50 .await
51 .map_err(|e| {
52 warn!(err = %e, directory.digest=%digest, "failed to get directory");
53 tonic::Status::new(tonic::Code::Internal, e.to_string())
54 })?
55 .ok_or_else(|| {
56 Status::not_found(format!("directory {} not found", digest))
57 })?;
58
59 Box::pin(once(Ok(directory.into())))
60 } else {
61 Box::pin(
63 self.directory_service
64 .get_recursive(&digest)
65 .map_ok(proto::Directory::from)
66 .map_err(|e| {
67 tonic::Status::new(tonic::Code::Internal, e.to_string())
68 }),
69 )
70 }
71 }))
72 }
73 }
74 }
75
76 #[instrument(skip_all)]
77 async fn put(
78 &self,
79 request: Request<Streaming<proto::Directory>>,
80 ) -> Result<Response<proto::PutDirectoryResponse>, Status> {
81 let mut req_inner = request.into_inner();
82
83 let mut validator = DirectoryGraph::<LeavesToRootValidator>::default();
85 while let Some(directory) = req_inner.message().await? {
86 validator
87 .add(directory.try_into().map_err(|e: DirectoryError| {
88 tonic::Status::new(tonic::Code::Internal, e.to_string())
89 })?)
90 .map_err(|e| tonic::Status::new(tonic::Code::Internal, e.to_string()))?;
91 }
92
93 let directories = validator
95 .validate()
96 .map_err(|e| tonic::Status::new(tonic::Code::Internal, e.to_string()))?
97 .drain_leaves_to_root()
98 .collect::<Vec<_>>();
99
100 let mut directory_putter = self.directory_service.put_multiple_start();
101 for directory in directories {
102 directory_putter.put(directory).await?;
103 }
104
105 let last_directory_dgst = directory_putter.close().await?;
108
109 Ok(Response::new(proto::PutDirectoryResponse {
110 root_digest: last_directory_dgst.into(),
111 }))
112 }
113}