snix_store/pathinfoservice/
grpc.rs

1use super::{PathInfo, PathInfoService};
2use crate::{
3    nar::NarCalculationService,
4    proto::{self, ListPathInfoRequest},
5};
6use async_stream::try_stream;
7use futures::stream::BoxStream;
8use nix_compat::nixbase32;
9use snix_castore::Error;
10use snix_castore::Node;
11use snix_castore::composition::{CompositionContext, ServiceBuilder};
12use std::sync::Arc;
13use tonic::{Code, async_trait};
14use tracing::{Span, instrument, warn};
15use tracing_indicatif::span_ext::IndicatifSpanExt;
16
17/// Connects to a (remote) snix-store PathInfoService over gRPC.
18#[derive(Clone)]
19pub struct GRPCPathInfoService<T> {
20    instance_name: String,
21
22    /// The internal reference to a gRPC client.
23    /// Cloning it is cheap, and it internally handles concurrent requests.
24    grpc_client: proto::path_info_service_client::PathInfoServiceClient<T>,
25}
26
27impl<T> GRPCPathInfoService<T> {
28    /// construct a [GRPCPathInfoService] from a [proto::path_info_service_client::PathInfoServiceClient].
29    /// panics if called outside the context of a tokio runtime.
30    pub fn from_client(
31        instance_name: String,
32        grpc_client: proto::path_info_service_client::PathInfoServiceClient<T>,
33    ) -> Self {
34        Self {
35            instance_name,
36            grpc_client,
37        }
38    }
39}
40
41#[async_trait]
42impl<T> PathInfoService for GRPCPathInfoService<T>
43where
44    T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + Sync + Clone + 'static,
45    T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
46    <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
47    T::Future: Send,
48{
49    #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest), instance_name = %self.instance_name))]
50    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
51        let path_info = self
52            .grpc_client
53            .clone()
54            .get(proto::GetPathInfoRequest {
55                by_what: Some(proto::get_path_info_request::ByWhat::ByOutputHash(
56                    digest.to_vec().into(),
57                )),
58            })
59            .await;
60
61        match path_info {
62            Ok(path_info) => Ok(Some(
63                PathInfo::try_from(path_info.into_inner())
64                    .map_err(|e| Error::StorageError(format!("Invalid path info: {e}")))?,
65            )),
66            Err(e) if e.code() == Code::NotFound => Ok(None),
67            Err(e) => Err(Error::StorageError(e.to_string())),
68        }
69    }
70
71    #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node, instance_name = %self.instance_name))]
72    async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
73        let path_info = self
74            .grpc_client
75            .clone()
76            .put(proto::PathInfo::from(path_info))
77            .await
78            .map_err(|e| Error::StorageError(e.to_string()))?
79            .into_inner();
80        Ok(PathInfo::try_from(path_info)
81            .map_err(|e| Error::StorageError(format!("Invalid path info: {e}")))?)
82    }
83
84    #[instrument(level = "trace", skip_all)]
85    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
86        let mut grpc_client = self.grpc_client.clone();
87
88        let stream = try_stream! {
89            let resp = grpc_client.list(ListPathInfoRequest::default()).await;
90
91            let mut stream = resp.map_err(|e| Error::StorageError(e.to_string()))?.into_inner();
92
93            loop {
94                match stream.message().await {
95                    Ok(Some(path_info)) => yield PathInfo::try_from(path_info).map_err(|e| Error::StorageError(format!("Invalid path info: {e}")))?,
96                    Ok(None) => return,
97                    Err(e) => Err(Error::StorageError(e.to_string()))?,
98                }
99            }
100        };
101
102        Box::pin(stream)
103    }
104
105    #[instrument(level = "trace", skip_all)]
106    fn nar_calculation_service(&self) -> Option<Box<dyn NarCalculationService>> {
107        Some(Box::new(GRPCPathInfoService {
108            instance_name: self.instance_name.clone(),
109            grpc_client: self.grpc_client.clone(),
110        }) as Box<dyn NarCalculationService>)
111    }
112}
113
114#[async_trait]
115impl<T> NarCalculationService for GRPCPathInfoService<T>
116where
117    T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + Sync + Clone + 'static,
118    T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
119    <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
120    T::Future: Send,
121{
122    #[instrument(level = "trace", skip_all, fields(root_node = ?root_node, indicatif.pb_show=tracing::field::Empty))]
123    async fn calculate_nar(&self, root_node: &Node) -> Result<(u64, [u8; 32]), Error> {
124        let span = Span::current();
125        span.pb_set_message("Waiting for NAR calculation");
126        span.pb_start();
127
128        let path_info = self
129            .grpc_client
130            .clone()
131            .calculate_nar(snix_castore::proto::Entry::from_name_and_node(
132                "".into(),
133                root_node.to_owned(),
134            ))
135            .await
136            .map_err(|e| Error::StorageError(e.to_string()))?
137            .into_inner();
138
139        let nar_sha256: [u8; 32] = path_info
140            .nar_sha256
141            .to_vec()
142            .try_into()
143            .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?;
144
145        Ok((path_info.nar_size, nar_sha256))
146    }
147}
148
149#[derive(serde::Deserialize, Debug)]
150#[serde(deny_unknown_fields)]
151pub struct GRPCPathInfoServiceConfig {
152    url: String,
153}
154
155impl TryFrom<url::Url> for GRPCPathInfoServiceConfig {
156    type Error = Box<dyn std::error::Error + Send + Sync>;
157    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
158        //   normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts.
159        // - In the case of unix sockets, there must be a path, but may not be a host.
160        // - In the case of non-unix sockets, there must be a host, but no path.
161        // Constructing the channel is handled by snix_castore::channel::from_url.
162        Ok(GRPCPathInfoServiceConfig {
163            url: url.to_string(),
164        })
165    }
166}
167
168#[async_trait]
169impl ServiceBuilder for GRPCPathInfoServiceConfig {
170    type Output = dyn PathInfoService;
171    async fn build<'a>(
172        &'a self,
173        instance_name: &str,
174        _context: &CompositionContext,
175    ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
176        let client = proto::path_info_service_client::PathInfoServiceClient::new(
177            snix_castore::tonic::channel_from_url(&self.url.parse()?).await?,
178        );
179        Ok(Arc::new(GRPCPathInfoService::from_client(
180            instance_name.to_string(),
181            client,
182        )))
183    }
184}
185
186#[cfg(test)]
187mod tests {
188    use crate::pathinfoservice::PathInfoService;
189    use crate::pathinfoservice::tests::make_grpc_path_info_service_client;
190
191    /// This ensures connecting via gRPC works as expected.
192    #[tokio::test]
193    async fn test_valid_unix_path_ping_pong() {
194        let (_blob_service, _directory_service, path_info_service) =
195            make_grpc_path_info_service_client().await;
196
197        let path_info = path_info_service
198            .get(crate::fixtures::DUMMY_PATH_DIGEST)
199            .await
200            .expect("must not be error");
201
202        assert!(path_info.is_none());
203    }
204}