snix_store/pathinfoservice/
nix_http.rs

1use super::{PathInfo, PathInfoService};
2use crate::nar::ingest_nar_and_hash;
3use futures::{TryStreamExt, stream::BoxStream};
4use nix_compat::{
5    narinfo::{self, NarInfo, Signature},
6    nixbase32,
7    nixhash::NixHash,
8    store_path::StorePath,
9};
10use reqwest::StatusCode;
11use snix_castore::composition::{CompositionContext, ServiceBuilder};
12use snix_castore::{Error, blobservice::BlobService, directoryservice::DirectoryService};
13use std::sync::Arc;
14use tokio::io::{self, AsyncRead};
15use tonic::async_trait;
16use tracing::{debug, instrument, warn};
17use url::Url;
18
/// NixHTTPPathInfoService acts as a bridge between the Nix HTTP Binary Cache
/// protocol provided by Nix binary caches such as cache.nixos.org, and the Snix
/// Store Model.
/// It implements the [PathInfoService] trait in an interesting way:
/// Every [PathInfoService::get] fetches the .narinfo and the NAR file it refers
/// to, inserting components into a [BlobService] and [DirectoryService], then
/// returning a [PathInfo] struct with the root.
///
/// Due to this being quite a costly operation, clients are expected to layer
/// this service with store composition, so paths are only ingested once.
///
/// The client is expected to be (indirectly) using the same [BlobService] and
/// [DirectoryService], so it is able to fetch referred Directories and Blobs.
/// [PathInfoService::put] and [PathInfoService::list] are not implemented and
/// return an error if called.
/// TODO: what about reading from nix-cache-info?
pub struct NixHTTPPathInfoService<BS, DS> {
    /// Name of this instance; recorded as `instance_name` on tracing spans.
    instance_name: String,
    /// URL that the `.narinfo` file name and the NAR URL from the NARInfo are
    /// joined against.
    base_url: url::Url,
    /// HTTP client used for both the NARInfo and the NAR request.
    http_client: reqwest_middleware::ClientWithMiddleware,

    /// Services handed (cloned) to the NAR ingestion on every
    /// [PathInfoService::get].
    blob_service: BS,
    directory_service: DS,

    /// An optional list of [narinfo::VerifyingKey].
    /// If set, the .narinfo files received need to have a correct signature by at least one of
    /// these.
    public_keys: Option<Vec<narinfo::VerifyingKey>>,
}
46
47impl<BS, DS> NixHTTPPathInfoService<BS, DS> {
48    pub fn new(
49        instance_name: String,
50        base_url: url::Url,
51        blob_service: BS,
52        directory_service: DS,
53    ) -> Self {
54        Self {
55            instance_name,
56            base_url,
57            http_client: reqwest_middleware::ClientBuilder::new(
58                reqwest::Client::builder()
59                    .user_agent(crate::USER_AGENT)
60                    .build()
61                    .expect("Client::new()"),
62            )
63            .with(snix_tracing::propagate::reqwest::tracing_middleware())
64            .build(),
65            blob_service,
66            directory_service,
67
68            public_keys: None,
69        }
70    }
71
72    /// Configures [Self] to validate NARInfo fingerprints with the public keys passed.
73    pub fn set_public_keys(&mut self, public_keys: Vec<narinfo::VerifyingKey>) {
74        self.public_keys = Some(public_keys);
75    }
76}
77
78#[async_trait]
79impl<BS, DS> PathInfoService for NixHTTPPathInfoService<BS, DS>
80where
81    BS: BlobService + Send + Sync + Clone + 'static,
82    DS: DirectoryService + Send + Sync + Clone + 'static,
83{
84    #[instrument(skip_all, err, fields(path.digest=nixbase32::encode(&digest), instance_name=%self.instance_name))]
85    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
86        let narinfo_url = self
87            .base_url
88            .join(&format!("{}.narinfo", nixbase32::encode(&digest)))
89            .map_err(|e| {
90                warn!(e = %e, "unable to join URL");
91                io::Error::new(io::ErrorKind::InvalidInput, "unable to join url")
92            })?;
93
94        debug!(narinfo_url= %narinfo_url, "constructed NARInfo url");
95
96        let resp = self
97            .http_client
98            .get(narinfo_url)
99            .send()
100            .await
101            .map_err(|e| {
102                warn!(e=%e,"unable to send NARInfo request");
103                io::Error::new(
104                    io::ErrorKind::InvalidInput,
105                    "unable to send NARInfo request",
106                )
107            })?;
108
109        // In the case of a 404, return a NotFound.
110        // We also return a NotFound in case of a 403 - this is to match the behaviour as Nix,
111        // when querying nix-cache.s3.amazonaws.com directly, rather than cache.nixos.org.
112        if resp.status() == StatusCode::NOT_FOUND || resp.status() == StatusCode::FORBIDDEN {
113            return Ok(None);
114        }
115
116        let narinfo_str = resp.text().await.map_err(|e| {
117            warn!(e=%e,"unable to decode response as string");
118            io::Error::new(
119                io::ErrorKind::InvalidData,
120                "unable to decode response as string",
121            )
122        })?;
123
124        // parse the received narinfo
125        let narinfo = NarInfo::parse(&narinfo_str).map_err(|e| {
126            warn!(e=%e,"unable to parse response as NarInfo");
127            io::Error::new(
128                io::ErrorKind::InvalidData,
129                "unable to parse response as NarInfo",
130            )
131        })?;
132
133        // if [self.public_keys] is set, ensure there's at least one valid signature.
134        if let Some(public_keys) = &self.public_keys {
135            let fingerprint = narinfo.fingerprint();
136
137            if !public_keys.iter().any(|pubkey| {
138                narinfo
139                    .signatures
140                    .iter()
141                    .any(|sig| pubkey.verify(&fingerprint, sig))
142            }) {
143                warn!("no valid signature found");
144                Err(io::Error::new(
145                    io::ErrorKind::InvalidData,
146                    "no valid signature found",
147                ))?;
148            }
149        }
150
151        // To construct the full PathInfo, we also need to populate the node field,
152        // and for this we need to download the NAR file and ingest it into castore.
153        // FUTUREWORK: Keep some database around mapping from narsha256 to
154        // (unnamed) rootnode, so we can use that (and the name from the
155        // StorePath) and avoid downloading the same NAR a second time.
156
157        // create a request for the NAR file itself.
158        let nar_url = self.base_url.join(narinfo.url).map_err(|e| {
159            warn!(e = %e, "unable to join URL");
160            io::Error::new(io::ErrorKind::InvalidInput, "unable to join url")
161        })?;
162        debug!(nar_url= %nar_url, "constructed NAR url");
163
164        let resp = self
165            .http_client
166            .get(nar_url.clone())
167            .send()
168            .await
169            .map_err(|e| {
170                warn!(e=%e,"unable to send NAR request");
171                io::Error::new(io::ErrorKind::InvalidInput, "unable to send NAR request")
172            })?;
173
174        // if the request is not successful, return an error.
175        if !resp.status().is_success() {
176            return Err(Error::StorageError(format!(
177                "unable to retrieve NAR at {}, status {}",
178                nar_url,
179                resp.status()
180            )));
181        }
182
183        // get a reader of the response body.
184        let r = tokio_util::io::StreamReader::new(resp.bytes_stream().map_err(|e| {
185            let e = e.without_url();
186            warn!(e=%e, "failed to get response body");
187            io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
188        }));
189
190        // handle decompression, depending on the compression field.
191        let mut r: Box<dyn AsyncRead + Send + Unpin> = match narinfo.compression {
192            None => Box::new(r) as Box<dyn AsyncRead + Send + Unpin>,
193            Some("bzip2") => Box::new(async_compression::tokio::bufread::BzDecoder::new(r))
194                as Box<dyn AsyncRead + Send + Unpin>,
195            Some("gzip") => Box::new(async_compression::tokio::bufread::GzipDecoder::new(r))
196                as Box<dyn AsyncRead + Send + Unpin>,
197            Some("xz") => Box::new(async_compression::tokio::bufread::XzDecoder::new(r))
198                as Box<dyn AsyncRead + Send + Unpin>,
199            Some("zstd") => Box::new(async_compression::tokio::bufread::ZstdDecoder::new(r))
200                as Box<dyn AsyncRead + Send + Unpin>,
201            Some(comp_str) => {
202                return Err(Error::StorageError(format!(
203                    "unsupported compression: {comp_str}"
204                )));
205            }
206        };
207
208        let (root_node, nar_hash, nar_size) = ingest_nar_and_hash(
209            self.blob_service.clone(),
210            self.directory_service.clone(),
211            &mut r,
212            &narinfo.ca,
213        )
214        .await
215        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
216
217        // ensure the ingested narhash and narsize do actually match.
218        if narinfo.nar_size != nar_size {
219            warn!(
220                narinfo.nar_size = narinfo.nar_size,
221                http.nar_size = nar_size,
222                "NarSize mismatch"
223            );
224            Err(io::Error::new(
225                io::ErrorKind::InvalidData,
226                "NarSize mismatch".to_string(),
227            ))?;
228        }
229        if narinfo.nar_hash != nar_hash {
230            warn!(
231                narinfo.nar_hash = %NixHash::Sha256(narinfo.nar_hash),
232                http.nar_hash = %NixHash::Sha256(nar_hash),
233                "NarHash mismatch"
234            );
235            Err(io::Error::new(
236                io::ErrorKind::InvalidData,
237                "NarHash mismatch".to_string(),
238            ))?;
239        }
240
241        Ok(Some(PathInfo {
242            store_path: narinfo.store_path.to_owned(),
243            node: root_node,
244            references: narinfo.references.iter().map(StorePath::to_owned).collect(),
245            nar_size: narinfo.nar_size,
246            nar_sha256: narinfo.nar_hash,
247            deriver: narinfo.deriver.as_ref().map(StorePath::to_owned),
248            signatures: narinfo
249                .signatures
250                .into_iter()
251                .map(|s| Signature::<String>::new(s.name().to_string(), s.bytes().to_owned()))
252                .collect(),
253            ca: narinfo.ca,
254        }))
255    }
256
257    #[instrument(skip_all, fields(path_info=?_path_info, instance_name=%self.instance_name))]
258    async fn put(&self, _path_info: PathInfo) -> Result<PathInfo, Error> {
259        Err(Error::InvalidRequest(
260            "put not supported for this backend".to_string(),
261        ))
262    }
263
264    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
265        Box::pin(futures::stream::once(async {
266            Err(Error::InvalidRequest(
267                "list not supported for this backend".to_string(),
268            ))
269        }))
270    }
271}
272
/// Configuration from which a [NixHTTPPathInfoService] is built.
#[derive(serde::Deserialize)]
pub struct NixHTTPPathInfoServiceConfig {
    /// Base URL of the binary cache, as a string; it is parsed into a URL when
    /// the service is built.
    base_url: String,
    /// Name under which the blob service is resolved from the composition
    /// context when the service is built.
    blob_service: String,
    /// Name under which the directory service is resolved from the composition
    /// context when the service is built.
    directory_service: String,
    #[serde(default)]
    /// An optional list of [narinfo::VerifyingKey], in their string form.
    /// If set, the .narinfo files received need to have a correct signature by at least one of
    /// these.
    public_keys: Option<Vec<String>>,
}
283
284impl TryFrom<Url> for NixHTTPPathInfoServiceConfig {
285    type Error = Box<dyn std::error::Error + Send + Sync>;
286    fn try_from(url: Url) -> Result<Self, Self::Error> {
287        // Be careful about the distinction between `None` and `Some(vec![])`!
288        let mut public_keys: Option<Vec<String>> = None;
289        for (_, v) in url
290            .query_pairs()
291            .into_iter()
292            .filter(|(k, _)| k == "trusted-public-keys")
293        {
294            public_keys
295                .get_or_insert(Default::default())
296                .extend(v.split_ascii_whitespace().map(ToString::to_string));
297        }
298
299        // FUTUREWORK: move url deserialization to serde?
300        let blob_service = url
301            .query_pairs()
302            .into_iter()
303            .find(|(k, _)| k == "blob_service")
304            .map(|(_, v)| v.to_string())
305            .unwrap_or("root".to_string());
306        let directory_service = url
307            .query_pairs()
308            .into_iter()
309            .find(|(k, _)| k == "directory_service")
310            .map(|(_, v)| v.to_string())
311            .unwrap_or("root".to_string());
312
313        Ok(NixHTTPPathInfoServiceConfig {
314            // Stringify the URL and remove the nix+ prefix.
315            // We can't use `url.set_scheme(rest)`, as it disallows
316            // setting something http(s) that previously wasn't.
317            base_url: url.to_string().strip_prefix("nix+").unwrap().to_string(),
318            blob_service,
319            directory_service,
320            public_keys,
321        })
322    }
323}
324
325#[async_trait]
326impl ServiceBuilder for NixHTTPPathInfoServiceConfig {
327    type Output = dyn PathInfoService;
328    async fn build<'a>(
329        &'a self,
330        instance_name: &str,
331        context: &CompositionContext,
332    ) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync + 'static>> {
333        let (blob_service, directory_service) = futures::join!(
334            context.resolve::<dyn BlobService>(self.blob_service.clone()),
335            context.resolve::<dyn DirectoryService>(self.directory_service.clone())
336        );
337        let mut svc = NixHTTPPathInfoService::new(
338            instance_name.to_string(),
339            Url::parse(&self.base_url)?,
340            blob_service?,
341            directory_service?,
342        );
343        if let Some(public_keys) = &self.public_keys {
344            svc.set_public_keys(
345                public_keys
346                    .iter()
347                    .map(|pubkey_str| {
348                        narinfo::VerifyingKey::parse(pubkey_str)
349                            .map_err(|e| Error::StorageError(format!("invalid public key: {e}")))
350                    })
351                    .collect::<Result<Vec<_>, Error>>()?,
352            );
353        }
354        Ok(Arc::new(svc))
355    }
356}