snix_store/pathinfoservice/
grpc.rs

1use super::{PathInfo, PathInfoService};
2use crate::pathinfoservice;
3use crate::{nar::NarCalculationService, proto};
4use async_stream::try_stream;
5use futures::{StreamExt, stream::BoxStream};
6use nix_compat::nixbase32;
7use snix_castore::Node;
8use snix_castore::composition::{CompositionContext, ServiceBuilder};
9use std::sync::Arc;
10use tonic::{Code, async_trait};
11use tracing::{Span, instrument, warn};
12use tracing_indicatif::span_ext::IndicatifSpanExt;
13
14/// Connects to a (remote) snix-store PathInfoService over gRPC.
15#[derive(Clone)]
16pub struct GRPCPathInfoService<T> {
17    instance_name: String,
18
19    /// The internal reference to a gRPC client.
20    /// Cloning it is cheap, and it internally handles concurrent requests.
21    grpc_client: proto::path_info_service_client::PathInfoServiceClient<T>,
22}
23
24impl<T> GRPCPathInfoService<T> {
25    /// construct a [GRPCPathInfoService] from a [proto::path_info_service_client::PathInfoServiceClient].
26    /// panics if called outside the context of a tokio runtime.
27    pub fn from_client(
28        instance_name: String,
29        grpc_client: proto::path_info_service_client::PathInfoServiceClient<T>,
30    ) -> Self {
31        Self {
32            instance_name,
33            grpc_client,
34        }
35    }
36}
37
38#[async_trait]
39impl<T> PathInfoService for GRPCPathInfoService<T>
40where
41    T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + Sync + Clone + 'static,
42    T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
43    <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
44    T::Future: Send,
45{
46    #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest), instance_name = %self.instance_name))]
47    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, pathinfoservice::Error> {
48        match self
49            .grpc_client
50            .clone()
51            .get(proto::GetPathInfoRequest {
52                by_what: Some(proto::get_path_info_request::ByWhat::ByOutputHash(
53                    digest.to_vec().into(),
54                )),
55            })
56            .await
57        {
58            Ok(path_info) => Ok(Some(
59                PathInfo::try_from(path_info.into_inner()).map_err(Error::PathInfoValidation)?,
60            )),
61            Err(e) if e.code() == Code::NotFound => Ok(None),
62            Err(e) => Err(Error::Tonic(e))?,
63        }
64    }
65
66    #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node, instance_name = %self.instance_name))]
67    async fn put(&self, path_info: PathInfo) -> Result<PathInfo, pathinfoservice::Error> {
68        let path_info = self
69            .grpc_client
70            .clone()
71            .put(proto::PathInfo::from(path_info))
72            .await
73            .map_err(Error::Tonic)?
74            .into_inner();
75        Ok(PathInfo::try_from(path_info).map_err(Error::PathInfoValidation)?)
76    }
77
78    #[instrument(level = "trace", skip_all)]
79    fn list(&self) -> BoxStream<'static, Result<PathInfo, pathinfoservice::Error>> {
80        let mut grpc_client = self.grpc_client.clone();
81
82        try_stream! {
83            let resp = grpc_client.list(proto::ListPathInfoRequest::default()).await;
84
85            let mut stream = resp.map_err(Error::Tonic)?.into_inner();
86
87            while let Some(path_info_proto) =  stream.message().await.map_err(Error::Tonic)? {
88                yield PathInfo::try_from(path_info_proto).map_err(Error::PathInfoValidation)?
89            }
90        }
91        .boxed()
92    }
93
94    #[instrument(level = "trace", skip_all)]
95    fn nar_calculation_service(&self) -> Option<Box<dyn NarCalculationService>> {
96        Some(Box::new(self.clone()) as Box<dyn NarCalculationService>)
97    }
98}
99
100#[async_trait]
101impl<T> NarCalculationService for GRPCPathInfoService<T>
102where
103    T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + Sync + Clone + 'static,
104    T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
105    <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
106    T::Future: Send,
107{
108    #[instrument(level = "trace", skip_all, fields(root_node = ?root_node, indicatif.pb_show=tracing::field::Empty))]
109    async fn calculate_nar(
110        &self,
111        root_node: &Node,
112    ) -> Result<(u64, [u8; 32]), pathinfoservice::Error> {
113        let span = Span::current();
114        span.pb_set_message("Waiting for NAR calculation");
115        span.pb_start();
116
117        let path_info = self
118            .grpc_client
119            .clone()
120            .calculate_nar(snix_castore::proto::Entry::from_name_and_node(
121                "".into(),
122                root_node.to_owned(),
123            ))
124            .await
125            .map_err(Error::Tonic)?
126            .into_inner();
127
128        let nar_sha256: [u8; 32] = path_info
129            .nar_sha256
130            .to_vec()
131            .try_into()
132            .map_err(|_| Error::NarCalcInvalidDigestLen)?;
133
134        Ok((path_info.nar_size, nar_sha256))
135    }
136}
137
138#[derive(thiserror::Error, Debug)]
139pub enum Error {
140    #[error("failed to decode protobuf: {0}")]
141    ProtobufDecode(#[from] prost::DecodeError),
142    #[error("failed to validate PathInfo: {0}")]
143    PathInfoValidation(#[from] crate::proto::ValidatePathInfoError),
144
145    #[error("tonic status: {0}")]
146    Tonic(#[from] tonic::Status),
147
148    #[error("invalid digest length returned from nar calculation")]
149    NarCalcInvalidDigestLen,
150
151    #[error("join error: {0}")]
152    TokioJoin(#[from] tokio::task::JoinError),
153    #[error("io error: {0}")]
154    IO(#[from] std::io::Error),
155}
156
157#[derive(serde::Deserialize, Debug)]
158#[serde(deny_unknown_fields)]
159pub struct GRPCPathInfoServiceConfig {
160    url: String,
161}
162
163impl TryFrom<url::Url> for GRPCPathInfoServiceConfig {
164    type Error = Box<dyn std::error::Error + Send + Sync>;
165    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
166        //   normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts.
167        // - In the case of unix sockets, there must be a path, but may not be a host.
168        // - In the case of non-unix sockets, there must be a host, but no path.
169        // Constructing the channel is handled by snix_castore::channel::from_url.
170        Ok(GRPCPathInfoServiceConfig {
171            url: url.to_string(),
172        })
173    }
174}
175
176#[async_trait]
177impl ServiceBuilder for GRPCPathInfoServiceConfig {
178    type Output = dyn PathInfoService;
179    async fn build<'a>(
180        &'a self,
181        instance_name: &str,
182        _context: &CompositionContext,
183    ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
184        let client = proto::path_info_service_client::PathInfoServiceClient::with_interceptor(
185            snix_castore::tonic::channel_from_url(&self.url.parse()?).await?,
186            // tonic::service::Interceptor wants an unboxed Status as return type.
187            // https://github.com/hyperium/tonic/issues/2253
188            |rq| snix_tracing::propagate::tonic::send_trace(rq).map_err(|e| *e),
189        );
190        Ok(Arc::new(GRPCPathInfoService::from_client(
191            instance_name.to_string(),
192            client,
193        )))
194    }
195}
196
197#[cfg(test)]
198mod tests {
199    use crate::pathinfoservice::PathInfoService;
200    use crate::pathinfoservice::tests::make_grpc_path_info_service_client;
201
202    /// This ensures connecting via gRPC works as expected.
203    #[tokio::test]
204    async fn test_valid_unix_path_ping_pong() {
205        let (_blob_service, _directory_service, path_info_service) =
206            make_grpc_path_info_service_client().await;
207
208        let path_info = path_info_service
209            .get(crate::fixtures::DUMMY_PATH_DIGEST)
210            .await
211            .expect("must not be error");
212
213        assert!(path_info.is_none());
214    }
215}