snix_store/pathinfoservice/
grpc.rs1use super::{PathInfo, PathInfoService};
2use crate::pathinfoservice;
3use crate::{nar::NarCalculationService, proto};
4use async_stream::try_stream;
5use futures::{StreamExt, stream::BoxStream};
6use nix_compat::nixbase32;
7use snix_castore::Node;
8use snix_castore::composition::{CompositionContext, ServiceBuilder};
9use std::sync::Arc;
10use tonic::{Code, async_trait};
11use tracing::{Span, instrument, warn};
12use tracing_indicatif::span_ext::IndicatifSpanExt;
13
14#[derive(Clone)]
16pub struct GRPCPathInfoService<T> {
17 instance_name: String,
18
19 grpc_client: proto::path_info_service_client::PathInfoServiceClient<T>,
22}
23
24impl<T> GRPCPathInfoService<T> {
25 pub fn from_client(
28 instance_name: String,
29 grpc_client: proto::path_info_service_client::PathInfoServiceClient<T>,
30 ) -> Self {
31 Self {
32 instance_name,
33 grpc_client,
34 }
35 }
36}
37
38#[async_trait]
39impl<T> PathInfoService for GRPCPathInfoService<T>
40where
41 T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + Sync + Clone + 'static,
42 T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
43 <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
44 T::Future: Send,
45{
46 #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest), instance_name = %self.instance_name))]
47 async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, pathinfoservice::Error> {
48 match self
49 .grpc_client
50 .clone()
51 .get(proto::GetPathInfoRequest {
52 by_what: Some(proto::get_path_info_request::ByWhat::ByOutputHash(
53 digest.to_vec().into(),
54 )),
55 })
56 .await
57 {
58 Ok(path_info) => Ok(Some(
59 PathInfo::try_from(path_info.into_inner()).map_err(Error::PathInfoValidation)?,
60 )),
61 Err(e) if e.code() == Code::NotFound => Ok(None),
62 Err(e) => Err(Error::Tonic(e))?,
63 }
64 }
65
66 #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node, instance_name = %self.instance_name))]
67 async fn put(&self, path_info: PathInfo) -> Result<PathInfo, pathinfoservice::Error> {
68 let path_info = self
69 .grpc_client
70 .clone()
71 .put(proto::PathInfo::from(path_info))
72 .await
73 .map_err(Error::Tonic)?
74 .into_inner();
75 Ok(PathInfo::try_from(path_info).map_err(Error::PathInfoValidation)?)
76 }
77
78 #[instrument(level = "trace", skip_all)]
79 fn list(&self) -> BoxStream<'static, Result<PathInfo, pathinfoservice::Error>> {
80 let mut grpc_client = self.grpc_client.clone();
81
82 try_stream! {
83 let resp = grpc_client.list(proto::ListPathInfoRequest::default()).await;
84
85 let mut stream = resp.map_err(Error::Tonic)?.into_inner();
86
87 while let Some(path_info_proto) = stream.message().await.map_err(Error::Tonic)? {
88 yield PathInfo::try_from(path_info_proto).map_err(Error::PathInfoValidation)?
89 }
90 }
91 .boxed()
92 }
93
94 #[instrument(level = "trace", skip_all)]
95 fn nar_calculation_service(&self) -> Option<Box<dyn NarCalculationService>> {
96 Some(Box::new(self.clone()) as Box<dyn NarCalculationService>)
97 }
98}
99
100#[async_trait]
101impl<T> NarCalculationService for GRPCPathInfoService<T>
102where
103 T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + Sync + Clone + 'static,
104 T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
105 <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
106 T::Future: Send,
107{
108 #[instrument(level = "trace", skip_all, fields(root_node = ?root_node, indicatif.pb_show=tracing::field::Empty))]
109 async fn calculate_nar(
110 &self,
111 root_node: &Node,
112 ) -> Result<(u64, [u8; 32]), pathinfoservice::Error> {
113 let span = Span::current();
114 span.pb_set_message("Waiting for NAR calculation");
115 span.pb_start();
116
117 let path_info = self
118 .grpc_client
119 .clone()
120 .calculate_nar(snix_castore::proto::Entry::from_name_and_node(
121 "".into(),
122 root_node.to_owned(),
123 ))
124 .await
125 .map_err(Error::Tonic)?
126 .into_inner();
127
128 let nar_sha256: [u8; 32] = path_info
129 .nar_sha256
130 .to_vec()
131 .try_into()
132 .map_err(|_| Error::NarCalcInvalidDigestLen)?;
133
134 Ok((path_info.nar_size, nar_sha256))
135 }
136}
137
138#[derive(thiserror::Error, Debug)]
139pub enum Error {
140 #[error("failed to decode protobuf: {0}")]
141 ProtobufDecode(#[from] prost::DecodeError),
142 #[error("failed to validate PathInfo: {0}")]
143 PathInfoValidation(#[from] crate::proto::ValidatePathInfoError),
144
145 #[error("tonic status: {0}")]
146 Tonic(#[from] tonic::Status),
147
148 #[error("invalid digest length returned from nar calculation")]
149 NarCalcInvalidDigestLen,
150
151 #[error("join error: {0}")]
152 TokioJoin(#[from] tokio::task::JoinError),
153 #[error("io error: {0}")]
154 IO(#[from] std::io::Error),
155}
156
157#[derive(serde::Deserialize, Debug)]
158#[serde(deny_unknown_fields)]
159pub struct GRPCPathInfoServiceConfig {
160 url: String,
161}
162
163impl TryFrom<url::Url> for GRPCPathInfoServiceConfig {
164 type Error = Box<dyn std::error::Error + Send + Sync>;
165 fn try_from(url: url::Url) -> Result<Self, Self::Error> {
166 Ok(GRPCPathInfoServiceConfig {
171 url: url.to_string(),
172 })
173 }
174}
175
176#[async_trait]
177impl ServiceBuilder for GRPCPathInfoServiceConfig {
178 type Output = dyn PathInfoService;
179 async fn build<'a>(
180 &'a self,
181 instance_name: &str,
182 _context: &CompositionContext,
183 ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
184 let client = proto::path_info_service_client::PathInfoServiceClient::with_interceptor(
185 snix_castore::tonic::channel_from_url(&self.url.parse()?).await?,
186 |rq| snix_tracing::propagate::tonic::send_trace(rq).map_err(|e| *e),
189 );
190 Ok(Arc::new(GRPCPathInfoService::from_client(
191 instance_name.to_string(),
192 client,
193 )))
194 }
195}
196
#[cfg(test)]
mod tests {
    use crate::pathinfoservice::PathInfoService;
    use crate::pathinfoservice::tests::make_grpc_path_info_service_client;

    /// Looks up a digest through the gRPC client fixture and expects the call
    /// to succeed with no entry found.
    #[tokio::test]
    async fn test_valid_unix_path_ping_pong() {
        // The first two services are kept bound (not `_`) so they live until
        // the end of the test.
        let (_blobs, _directories, client) = make_grpc_path_info_service_client().await;

        let lookup = client
            .get(crate::fixtures::DUMMY_PATH_DIGEST)
            .await
            .expect("must not be error");

        assert!(lookup.is_none());
    }
}