snix_castore/directoryservice/
grpc.rs1use super::{Directory, DirectoryPutter, DirectoryService};
2use crate::B3Digest;
3use crate::composition::{CompositionContext, ServiceBuilder};
4use crate::directoryservice::RootToLeavesValidator;
5use crate::proto::{self, get_directory_request::ByWhat};
6use futures::StreamExt;
7use futures::stream::BoxStream;
8use std::sync::Arc;
9use tokio::spawn;
10use tokio::sync::mpsc::UnboundedSender;
11use tokio::task::JoinHandle;
12use tokio_stream::wrappers::UnboundedReceiverStream;
13use tonic::{Code, Status, async_trait};
14use tracing::{Instrument as _, instrument, warn};
15
/// A [`DirectoryService`] implementation that talks to a directory service
/// through a gRPC client.
#[derive(Clone)]
pub struct GRPCDirectoryService<T> {
    /// Name of this service instance; recorded as the `instance_name` field
    /// on the tracing spans of the trait methods.
    instance_name: String,
    /// The gRPC client used for all requests. The trait methods clone it
    /// before issuing a call.
    grpc_client: proto::directory_service_client::DirectoryServiceClient<T>,
}
24
25impl<T> GRPCDirectoryService<T> {
26 pub fn from_client(
29 instance_name: String,
30 grpc_client: proto::directory_service_client::DirectoryServiceClient<T>,
31 ) -> Self {
32 Self {
33 instance_name,
34 grpc_client,
35 }
36 }
37}
38
39#[async_trait]
40impl<T> DirectoryService for GRPCDirectoryService<T>
41where
42 T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + Sync + Clone + 'static,
43 T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
44 <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
45 T::Future: Send,
46{
47 #[instrument(level = "trace", skip_all, fields(directory.digest = %digest, instance_name = %self.instance_name))]
48 async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, super::Error> {
49 match self
52 .grpc_client
53 .clone()
54 .get(proto::GetDirectoryRequest {
55 recursive: false,
56 by_what: Some(ByWhat::Digest((*digest).into())),
57 })
58 .await
59 .map_err(Error::Tonic)?
60 .into_inner()
61 .message()
62 .await
63 {
64 Ok(Some(proto_directory)) => {
65 let actual_digest = proto_directory.digest();
68 if &actual_digest != digest {
69 Err(Error::WrongDigest {
70 expected: *digest,
71 actual: actual_digest,
72 })?
73 } else {
74 let directory =
75 Directory::try_from(proto_directory).map_err(Error::DirectoryValidation)?;
76 Ok(Some(directory))
77 }
78 }
79 Ok(None) => Ok(None),
80 Err(e) if e.code() == Code::NotFound => Ok(None),
81 Err(e) => Err(Error::Tonic(e))?,
82 }
83 }
84
85 #[instrument(level = "trace", skip_all, fields(directory.digest = %directory.digest(), instance_name = %self.instance_name))]
86 async fn put(&self, directory: Directory) -> Result<B3Digest, super::Error> {
87 let resp = self
88 .grpc_client
89 .clone()
90 .put(tokio_stream::once(proto::Directory::from(directory)))
91 .await
92 .map_err(Error::Tonic)?;
93
94 let digest = resp
95 .into_inner()
96 .root_digest
97 .try_into()
98 .map_err(|_| Error::InvalidDigestLen)?;
99
100 Ok(digest)
101 }
102
103 #[instrument(level = "trace", skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))]
104 fn get_recursive(
105 &self,
106 root_directory_digest: &B3Digest,
107 ) -> BoxStream<'static, Result<Directory, super::Error>> {
108 let mut grpc_client = self.grpc_client.clone();
109 let root_directory_digest = *root_directory_digest;
110
111 let mut order_validator =
112 RootToLeavesValidator::new_with_root_digest(root_directory_digest);
113
114 async_stream::try_stream! {
115 let mut directories = grpc_client
116 .get(proto::GetDirectoryRequest {
117 recursive: true,
118 by_what: Some(ByWhat::Digest((root_directory_digest).into())),
119 })
120 .await
121 .map_err(Error::Tonic)?
122 .into_inner();
123
124 while let Some(proto_directory) = directories.message().await? {
125 let directory = Directory::try_from(proto_directory).map_err(Error::DirectoryValidation)?;
126 order_validator.try_accept(&directory).map_err(Error::DirectoryOrdering)?;
127
128 yield directory;
129 }
130 }.boxed()
131 }
132
133 #[instrument(skip_all)]
134 fn put_multiple_start(&self) -> Box<dyn DirectoryPutter + 'static> {
135 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
136
137 let task = spawn({
138 let mut grpc_client = self.grpc_client.clone();
139
140 async move {
141 Ok::<_, Status>(
142 grpc_client
143 .put(UnboundedReceiverStream::new(rx))
144 .await?
145 .into_inner(),
146 )
147 }
148 .in_current_span()
150 });
151
152 Box::new(GRPCPutter {
153 rq: Some((task, tx)),
154 })
155 }
156}
157
/// A [`DirectoryPutter`] that sends directories over a single streaming gRPC
/// `put` call.
pub struct GRPCPutter {
    /// Join handle of the spawned task driving the `put` RPC, together with
    /// the sender feeding directories into its request stream.
    /// Becomes `None` once `close` has taken it.
    #[allow(clippy::type_complexity)]
    rq: Option<(
        JoinHandle<Result<proto::PutDirectoryResponse, Status>>,
        UnboundedSender<proto::Directory>,
    )>,
}
170
171#[async_trait]
172impl DirectoryPutter for GRPCPutter {
173 #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
174 async fn put(&mut self, directory: Directory) -> Result<(), super::Error> {
175 let (_, directory_sender) = self
176 .rq
177 .as_ref()
178 .ok_or_else(|| Error::DirectoryPutterAlreadyClosed)?;
179 if directory_sender.send(directory.into()).is_err() {
181 self.close().await?;
185 }
186 Ok(())
187 }
188
189 #[instrument(level = "trace", skip_all, ret, err)]
191 async fn close(&mut self) -> Result<B3Digest, super::Error> {
192 let (task, directory_sender) =
195 std::mem::take(&mut self.rq).ok_or_else(|| Error::DirectoryPutterAlreadyClosed)?;
196
197 drop(directory_sender);
199
200 let resp = task.await?.map_err(Error::Tonic)?;
201
202 Ok(B3Digest::try_from(resp.root_digest).map_err(|_| Error::InvalidDigestLen)?)
203 }
204}
205
/// Errors produced by the gRPC directory service client and its putter.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// A directory received during a recursive get was rejected by the
    /// order validator.
    #[error("Directory Graph ordering error")]
    DirectoryOrdering(#[from] crate::directoryservice::OrderingError),

    /// The putter was used after its request stream had been closed.
    #[error("DirectoryPutter already closed")]
    DirectoryPutterAlreadyClosed,

    /// The directory returned for a get does not hash to the requested digest.
    #[error("requested directory has wrong digest, expected {expected}, actual {actual}")]
    WrongDigest {
        expected: B3Digest,
        actual: B3Digest,
    },

    /// A gRPC call returned a status.
    #[error("tonic status: {0}")]
    Tonic(#[from] tonic::Status),

    /// The root digest in a put response could not be converted into a
    /// `B3Digest`.
    #[error("invalid digest length returned from put")]
    InvalidDigestLen,

    /// A protobuf message could not be decoded.
    // NOTE(review): not constructed in this file's visible code; presumably
    // reached via `From` conversions elsewhere. Confirm.
    #[error("failed to decode protobuf: {0}")]
    ProtobufDecode(#[from] prost::DecodeError),
    /// A received protobuf directory failed conversion into a `Directory`.
    #[error("failed to validate directory: {0}")]
    DirectoryValidation(#[from] crate::DirectoryError),

    /// A spawned tokio task could not be joined.
    #[error("join error: {0}")]
    TokioJoin(#[from] tokio::task::JoinError),
    /// An I/O error.
    #[error("io error: {0}")]
    IO(#[from] std::io::Error),
}
236
/// Deserializable configuration for a [`GRPCDirectoryService`].
/// Unknown fields are rejected during deserialization.
#[derive(serde::Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct GRPCDirectoryServiceConfig {
    /// URL of the remote service, kept as a string and parsed when the
    /// service is built.
    url: String,
}
242
243impl TryFrom<url::Url> for GRPCDirectoryServiceConfig {
244 type Error = Box<dyn std::error::Error + Send + Sync>;
245 fn try_from(url: url::Url) -> Result<Self, Self::Error> {
246 Ok(GRPCDirectoryServiceConfig {
251 url: url.to_string(),
252 })
253 }
254}
255
256#[async_trait]
257impl ServiceBuilder for GRPCDirectoryServiceConfig {
258 type Output = dyn DirectoryService;
259 async fn build<'a>(
260 &'a self,
261 instance_name: &str,
262 _context: &CompositionContext,
263 ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
264 let client = proto::directory_service_client::DirectoryServiceClient::with_interceptor(
265 crate::tonic::channel_from_url(&self.url.parse()?).await?,
266 |rq| snix_tracing::propagate::tonic::send_trace(rq).map_err(|e| *e),
269 );
270 Ok(Arc::new(GRPCDirectoryService::from_client(
271 instance_name.to_string(),
272 client,
273 )))
274 }
275}
#[cfg(test)]
mod tests {
    use std::time::Duration;
    use tempfile::TempDir;
    use tokio::net::UnixListener;
    use tokio_retry::{Retry, strategy::ExponentialBackoff};
    use tokio_stream::wrappers::UnixListenerStream;

    use crate::{
        directoryservice::{DirectoryService, GRPCDirectoryService},
        fixtures,
        proto::{GRPCDirectoryServiceWrapper, directory_service_client::DirectoryServiceClient},
        utils::gen_test_directory_service,
    };

    /// Serves a test directory service on a unix socket, connects a
    /// [`GRPCDirectoryService`] to it, and checks that asking for a directory
    /// that was never uploaded yields `None` rather than an error.
    #[tokio::test]
    async fn test_valid_unix_path_ping_pong() {
        let tmpdir = TempDir::new().unwrap();
        let socket_path = tmpdir.path().join("daemon");

        // Server side: bind the socket and serve in a background task.
        let server_socket_path = socket_path.clone();
        tokio::spawn(async move {
            let listener = UnixListener::bind(server_socket_path).unwrap();
            let incoming = UnixListenerStream::new(listener);

            let directory_server =
                crate::proto::directory_service_server::DirectoryServiceServer::new(
                    GRPCDirectoryServiceWrapper::new(Box::new(gen_test_directory_service())),
                );

            tonic::transport::Server::builder()
                .add_service(directory_server)
                .serve_with_incoming(incoming)
                .await
        });

        // Poll with backoff until the socket file shows up.
        Retry::spawn(
            ExponentialBackoff::from_millis(20).max_delay(Duration::from_secs(10)),
            || async { socket_path.exists().then_some(()).ok_or(()) },
        )
        .await
        .expect("failed to wait for socket");

        // Client side: connect to the socket via a grpc+unix URL.
        let endpoint = url::Url::parse(&format!(
            "grpc+unix://{}?wait-connect=1",
            socket_path.display()
        ))
        .expect("must parse");
        let channel = crate::tonic::channel_from_url(&endpoint)
            .await
            .expect("must succeed");
        let grpc_client =
            GRPCDirectoryService::from_client("test-instance".into(), DirectoryServiceClient::new(channel));

        let looked_up = grpc_client
            .get(&fixtures::DIRECTORY_A.digest())
            .await
            .expect("must not fail");
        assert!(looked_up.is_none())
    }
}