snix_castore/directoryservice/
grpc.rs1use super::{Directory, DirectoryPutter, DirectoryService};
2use crate::B3Digest;
3use crate::composition::{CompositionContext, ServiceBuilder};
4use crate::directoryservice::RootToLeavesValidator;
5use crate::proto::{self, get_directory_request::ByWhat};
6use futures::StreamExt;
7use futures::stream::BoxStream;
8use std::sync::Arc;
9use tokio::spawn;
10use tokio::sync::mpsc::UnboundedSender;
11use tokio::task::JoinHandle;
12use tokio_stream::wrappers::UnboundedReceiverStream;
13use tonic::{Code, Status, async_trait};
14use tracing::{Instrument as _, instrument, warn};
15
16#[derive(Clone)]
18pub struct GRPCDirectoryService<T> {
19 instance_name: String,
20 grpc_client: proto::directory_service_client::DirectoryServiceClient<T>,
23}
24
25impl<T> GRPCDirectoryService<T> {
26 pub fn from_client(
29 instance_name: String,
30 grpc_client: proto::directory_service_client::DirectoryServiceClient<T>,
31 ) -> Self {
32 Self {
33 instance_name,
34 grpc_client,
35 }
36 }
37}
38
39#[async_trait]
40impl<T> DirectoryService for GRPCDirectoryService<T>
41where
42 T: tonic::client::GrpcService<tonic::body::Body> + Send + Sync + Clone + 'static,
43 T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
44 <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
45 T::Future: Send,
46{
47 #[instrument(level = "trace", skip_all, fields(directory.digest = %digest, instance_name = %self.instance_name))]
48 async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, super::Error> {
49 match self
52 .grpc_client
53 .clone()
54 .get(proto::GetDirectoryRequest {
55 recursive: false,
56 by_what: Some(ByWhat::Digest((*digest).into())),
57 })
58 .await
59 .map_err(Error::Tonic)?
60 .into_inner()
61 .message()
62 .await
63 {
64 Ok(Some(proto_directory)) => {
65 let actual_digest = proto_directory.digest();
68 if &actual_digest != digest {
69 Err(Error::WrongDigest {
70 expected: *digest,
71 actual: actual_digest,
72 })?
73 } else {
74 let directory =
75 Directory::try_from(proto_directory).map_err(Error::DirectoryValidation)?;
76 Ok(Some(directory))
77 }
78 }
79 Ok(None) => Ok(None),
80 Err(e) if e.code() == Code::NotFound => Ok(None),
81 Err(e) => Err(Error::Tonic(e))?,
82 }
83 }
84
85 #[instrument(level = "trace", skip_all, fields(directory.digest = %directory.digest(), instance_name = %self.instance_name))]
86 async fn put(&self, directory: Directory) -> Result<B3Digest, super::Error> {
87 let resp = self
88 .grpc_client
89 .clone()
90 .put(tokio_stream::once(proto::Directory::from(directory)))
91 .await
92 .map_err(Error::Tonic)?;
93
94 let digest = resp
95 .into_inner()
96 .root_digest
97 .try_into()
98 .map_err(|_| Error::InvalidDigestLen)?;
99
100 Ok(digest)
101 }
102
103 #[instrument(level = "trace", skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))]
104 fn get_recursive(
105 &self,
106 root_directory_digest: &B3Digest,
107 ) -> BoxStream<'static, Result<Directory, super::Error>> {
108 let mut grpc_client = self.grpc_client.clone();
109 let root_directory_digest = *root_directory_digest;
110
111 let mut order_validator =
112 RootToLeavesValidator::new_with_root_digest(root_directory_digest);
113
114 async_stream::try_stream! {
115 let mut directories = grpc_client
116 .get(proto::GetDirectoryRequest {
117 recursive: true,
118 by_what: Some(ByWhat::Digest((root_directory_digest).into())),
119 })
120 .await
121 .map_err(Error::Tonic)?
122 .into_inner();
123
124 while let Some(proto_directory) = directories.message().await? {
125 let directory = Directory::try_from(proto_directory).map_err(Error::DirectoryValidation)?;
126 order_validator.try_accept(&directory).map_err(Error::DirectoryOrdering)?;
127
128 yield directory;
129 }
130 }.boxed()
131 }
132
133 #[instrument(skip_all)]
134 fn put_multiple_start(&self) -> Box<dyn DirectoryPutter + 'static> {
135 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
136
137 let task = spawn({
138 let mut grpc_client = self.grpc_client.clone();
139
140 async move {
141 Ok::<_, Status>(
142 grpc_client
143 .put(UnboundedReceiverStream::new(rx))
144 .await?
145 .into_inner(),
146 )
147 }
148 .in_current_span()
150 });
151
152 Box::new(GRPCPutter {
153 rq: Some((task, tx)),
154 })
155 }
156}
157
158pub struct GRPCPutter {
160 #[allow(clippy::type_complexity)] rq: Option<(
166 JoinHandle<Result<proto::PutDirectoryResponse, Status>>,
167 UnboundedSender<proto::Directory>,
168 )>,
169}
170
171#[async_trait]
172impl DirectoryPutter for GRPCPutter {
173 #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
174 async fn put(&mut self, directory: Directory) -> Result<(), super::Error> {
175 let (_, directory_sender) = self
176 .rq
177 .as_ref()
178 .ok_or_else(|| Error::DirectoryPutterAlreadyClosed)?;
179 if directory_sender.send(directory.into()).is_err() {
181 self.close().await?;
185 }
186 Ok(())
187 }
188
189 #[instrument(level = "trace", skip_all, ret, err)]
191 async fn close(&mut self) -> Result<B3Digest, super::Error> {
192 let (task, directory_sender) =
195 std::mem::take(&mut self.rq).ok_or_else(|| Error::DirectoryPutterAlreadyClosed)?;
196
197 drop(directory_sender);
199
200 let resp = task.await?.map_err(Error::Tonic)?;
201
202 Ok(B3Digest::try_from(resp.root_digest).map_err(|_| Error::InvalidDigestLen)?)
203 }
204}
205
206#[derive(thiserror::Error, Debug)]
207pub enum Error {
208 #[error("Directory Graph ordering error")]
209 DirectoryOrdering(#[from] crate::directoryservice::OrderingError),
210
211 #[error("DirectoryPutter already closed")]
212 DirectoryPutterAlreadyClosed,
213
214 #[error("requested directory has wrong digest, expected {expected}, actual {actual}")]
215 WrongDigest {
216 expected: B3Digest,
217 actual: B3Digest,
218 },
219
220 #[error("tonic status: {0}")]
221 Tonic(#[from] tonic::Status),
222
223 #[error("invalid digest length returned from put")]
224 InvalidDigestLen,
225
226 #[error("failed to decode protobuf: {0}")]
227 ProtobufDecode(#[from] prost::DecodeError),
228 #[error("failed to validate directory: {0}")]
229 DirectoryValidation(#[from] crate::DirectoryError),
230
231 #[error("join error: {0}")]
232 TokioJoin(#[from] tokio::task::JoinError),
233 #[error("io error: {0}")]
234 IO(#[from] std::io::Error),
235}
236
237#[derive(serde::Deserialize, Debug)]
238#[serde(deny_unknown_fields)]
239pub struct GRPCDirectoryServiceConfig {
240 url: String,
241}
242
243impl TryFrom<url::Url> for GRPCDirectoryServiceConfig {
244 type Error = Box<dyn std::error::Error + Send + Sync>;
245 fn try_from(url: url::Url) -> Result<Self, Self::Error> {
246 Ok(GRPCDirectoryServiceConfig {
251 url: url.to_string(),
252 })
253 }
254}
255
256#[async_trait]
257impl ServiceBuilder for GRPCDirectoryServiceConfig {
258 type Output = dyn DirectoryService;
259 async fn build<'a>(
260 &'a self,
261 instance_name: &str,
262 _context: &CompositionContext,
263 ) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync>> {
264 let client = proto::directory_service_client::DirectoryServiceClient::with_interceptor(
265 crate::tonic::channel_from_url(&self.url.parse()?).await?,
266 snix_tracing::propagate::tonic::send_trace,
267 );
268 Ok(Arc::new(GRPCDirectoryService::from_client(
269 instance_name.to_string(),
270 client,
271 )))
272 }
273}
274#[cfg(test)]
275mod tests {
276 use std::time::Duration;
277 use tempfile::TempDir;
278 use tokio::net::UnixListener;
279 use tokio_retry::{Retry, strategy::ExponentialBackoff};
280 use tokio_stream::wrappers::UnixListenerStream;
281
282 use crate::{
283 directoryservice::{DirectoryService, GRPCDirectoryService},
284 fixtures,
285 proto::{GRPCDirectoryServiceWrapper, directory_service_client::DirectoryServiceClient},
286 utils::gen_test_directory_service,
287 };
288
289 #[tokio::test]
291 async fn test_valid_unix_path_ping_pong() {
292 let tmpdir = TempDir::new().unwrap();
293 let socket_path = tmpdir.path().join("daemon");
294
295 let path_clone = socket_path.clone();
296
297 tokio::spawn(async {
299 let uds = UnixListener::bind(path_clone).unwrap();
300 let uds_stream = UnixListenerStream::new(uds);
301
302 let mut server = tonic::transport::Server::builder();
304 let router = server.add_service(
305 crate::proto::directory_service_server::DirectoryServiceServer::new(
306 GRPCDirectoryServiceWrapper::new(Box::new(gen_test_directory_service())),
307 ),
308 );
309 router.serve_with_incoming(uds_stream).await
310 });
311
312 Retry::spawn(
314 ExponentialBackoff::from_millis(20).max_delay(Duration::from_secs(10)),
315 || async {
316 if socket_path.exists() {
317 Ok(())
318 } else {
319 Err(())
320 }
321 },
322 )
323 .await
324 .expect("failed to wait for socket");
325
326 let grpc_client = {
328 let url = url::Url::parse(&format!(
329 "grpc+unix:{}?wait-connect=1",
330 socket_path.display()
331 ))
332 .expect("must parse");
333 let client = DirectoryServiceClient::new(
334 crate::tonic::channel_from_url(&url)
335 .await
336 .expect("must succeed"),
337 );
338 GRPCDirectoryService::from_client("test-instance".into(), client)
339 };
340
341 assert!(
342 grpc_client
343 .get(&fixtures::DIRECTORY_A.digest())
344 .await
345 .expect("must not fail")
346 .is_none()
347 )
348 }
349}