snix_store/pathinfoservice/
nix_http.rs

1use super::{PathInfo, PathInfoService};
2use crate::{
3    nar::{NarIngestionError, ingest_nar_and_hash},
4    pathinfoservice,
5};
6use futures::{TryStreamExt, stream::BoxStream};
7use nix_compat::{
8    narinfo::{self, NarInfo, Signature},
9    nixbase32,
10    nixhash::NixHash,
11    store_path::StorePath,
12};
13use reqwest::StatusCode;
14use snix_castore::composition::{CompositionContext, ServiceBuilder};
15use snix_castore::{blobservice::BlobService, directoryservice::DirectoryService};
16use std::sync::Arc;
17use tokio::io::{self, AsyncRead};
18use tonic::async_trait;
19use tracing::{Span, instrument, warn};
20use url::Url;
21
22/// NixHTTPPathInfoService acts as a bridge in between the Nix HTTP Binary cache
23/// protocol provided by Nix binary caches such as cache.nixos.org, and the Snix
24/// Store Model.
25/// It implements the [PathInfoService] trait in an interesting way:
26/// Every [PathInfoService::get] fetches the .narinfo and referred NAR file,
27/// inserting components into a [BlobService] and [DirectoryService], then
28/// returning a [PathInfo] struct with the root.
29///
30/// Due to this being quite a costly operation, clients are expected to layer
31/// this service with store composition, so they're only ingested once.
32///
33/// The client is expected to be (indirectly) using the same [BlobService] and
34/// [DirectoryService], so able to fetch referred Directories and Blobs.
35/// [PathInfoService::put] is not implemented and returns an error if called.
36/// TODO: what about reading from nix-cache-info?
37pub struct NixHTTPPathInfoService<BS, DS> {
38    instance_name: String,
39    base_url: url::Url,
40    http_client: reqwest_middleware::ClientWithMiddleware,
41
42    blob_service: BS,
43    directory_service: DS,
44
45    /// An optional list of [narinfo::VerifyingKey].
46    /// If the list is not empty, the .narinfo files received need to have
47    /// correct signature by at least one of these.
48    trusted_public_keys: Vec<narinfo::VerifyingKey>,
49}
50
51impl<BS, DS> NixHTTPPathInfoService<BS, DS> {
52    pub fn try_build(
53        instance_name: String,
54        config: NixHTTPPathInfoServiceConfig,
55        blob_service: BS,
56        directory_service: DS,
57    ) -> Result<Self, Error> {
58        let mut trusted_public_keys = Vec::new();
59        for s in config.params.trusted_public_keys {
60            trusted_public_keys.push(
61                narinfo::VerifyingKey::parse(&s).map_err(|e| Error::ParseTrustedPublicKey(s, e))?,
62            )
63        }
64
65        Ok(Self {
66            instance_name,
67            base_url: config.base_url,
68            http_client: reqwest_middleware::ClientBuilder::new(
69                reqwest::Client::builder()
70                    .user_agent(crate::USER_AGENT)
71                    .build()
72                    .map_err(reqwest_middleware::Error::Reqwest)?,
73            )
74            .with(snix_tracing::propagate::reqwest::tracing_middleware())
75            .build(),
76            blob_service,
77            directory_service,
78
79            trusted_public_keys,
80        })
81    }
82
83    #[instrument(level=tracing::Level::TRACE, skip_all,fields(path.digest=nixbase32::encode(&digest)),err)]
84    fn derive_narinfo_url(&self, digest: [u8; 20]) -> Result<Url, Error> {
85        let s = format!("{}.narinfo", nixbase32::encode(&digest));
86        self.base_url
87            .join(&s)
88            .map_err(|e| Error::JoinUrl(self.base_url.to_owned(), s.to_owned(), e))
89    }
90}
91
92#[derive(Debug, thiserror::Error)]
93pub enum Error {
94    #[error("wrong arguments: {0}")]
95    WrongConfig(&'static str),
96    #[error("serde-qs error: {0}")]
97    SerdeQS(#[from] serde_qs::Error),
98    #[error("unable to parse pubkey {0}")]
99    ParseTrustedPublicKey(String, nix_compat::narinfo::VerifyingKeyError),
100
101    #[error("unable to join URL {0} with {1}")]
102    JoinUrl(Url, String, url::ParseError),
103    #[error("reqwest error")]
104    Reqwest(#[from] reqwest_middleware::Error),
105    #[error("unable to decode NARInfo response as string")]
106    DecodeBody(reqwest::Error),
107    #[error("unable to parse NARInfo")]
108    ParseNARInfo(nix_compat::narinfo::Error),
109    #[error("no valid signature found")]
110    NoValidSignature,
111    #[error("failed to request NAR, status {0}")]
112    FailedToRequestNAR(reqwest::StatusCode),
113    #[error("unsupported NAR compression: {0}")]
114    UnsupportedNARCompression(String),
115    #[error("failed to ingest NAR")]
116    IngestNAR(NarIngestionError),
117    #[error("NARSize mismatch, narinfo size {narinfo_size}, actual size {actual_size}")]
118    NARSizeMismatch { narinfo_size: u64, actual_size: u64 },
119    #[error("NARHash mismatch, narinfo NARHash {exp}, actual NARHash {act}",
120        exp = NixHash::Sha256(*.narinfo_nar_sha256),
121        act = NixHash::Sha256(*.actual_nar_sha256))]
122    NARHashMismatch {
123        narinfo_nar_sha256: [u8; 32],
124        actual_nar_sha256: [u8; 32],
125    },
126
127    #[error("put not supported")]
128    PutNotSupported,
129    #[error("list not supported")]
130    ListNotSupported,
131}
132
133#[async_trait]
134impl<BS, DS> PathInfoService for NixHTTPPathInfoService<BS, DS>
135where
136    BS: BlobService + Send + Sync + Clone + 'static,
137    DS: DirectoryService + Send + Sync + Clone + 'static,
138{
139    #[instrument(skip_all, err, fields(path.digest=nixbase32::encode(&digest), instance_name=%self.instance_name))]
140    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, pathinfoservice::Error> {
141        let narinfo_url = self.derive_narinfo_url(digest)?;
142
143        let span = Span::current();
144        span.record("narinfo.url", narinfo_url.to_string());
145
146        let resp = self
147            .http_client
148            .get(narinfo_url)
149            .send()
150            .await
151            .map_err(Error::Reqwest)?;
152
153        // In the case of a 404, return a NotFound.
154        // We also return a NotFound in case of a 403 - this is to match the behaviour as Nix,
155        // when querying nix-cache.s3.amazonaws.com directly, rather than cache.nixos.org.
156        if resp.status() == StatusCode::NOT_FOUND || resp.status() == StatusCode::FORBIDDEN {
157            return Ok(None);
158        }
159
160        let narinfo_str = resp.text().await.map_err(Error::DecodeBody)?;
161
162        // parse the received narinfo
163        let narinfo = NarInfo::parse(&narinfo_str).map_err(Error::ParseNARInfo)?;
164
165        // if [self.trusted_public_keys] is set, ensure there's at least one valid signature.
166        if !self.trusted_public_keys.is_empty() {
167            let fingerprint = narinfo.fingerprint();
168
169            if !self.trusted_public_keys.iter().any(|pubkey| {
170                narinfo
171                    .signatures
172                    .iter()
173                    .any(|sig| pubkey.verify(&fingerprint, sig))
174            }) {
175                Err(Error::NoValidSignature)?
176            }
177        }
178
179        // To construct the full PathInfo, we also need to populate the node field,
180        // and for this we need to download the NAR file and ingest it into castore.
181        // FUTUREWORK: Keep some database around mapping from narsha256 to
182        // (unnamed) rootnode, so we can use that and avoid downloading the same
183        // NAR a second time.
184
185        // create a request for the NAR file itself.
186        let nar_url = self
187            .base_url
188            .join(narinfo.url)
189            .map_err(|e| Error::JoinUrl(self.base_url.clone(), narinfo.url.to_owned(), e))?;
190        span.record("nar.url", nar_url.to_string());
191
192        let resp = self
193            .http_client
194            .get(nar_url.clone())
195            .send()
196            .await
197            .map_err(Error::Reqwest)?;
198
199        // if the request is not successful, return an error.
200        if !resp.status().is_success() {
201            Err(Error::FailedToRequestNAR(resp.status()))?;
202        }
203
204        // get a reader of the response body.
205        let r = tokio_util::io::StreamReader::new(resp.bytes_stream().map_err(|e| {
206            let e = e.without_url();
207            warn!(e=%e, "failed to get response body");
208            io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
209        }));
210
211        // handle decompression, depending on the compression field.
212        let mut r: Box<dyn AsyncRead + Send + Unpin> = match narinfo.compression {
213            None => Box::new(r) as Box<dyn AsyncRead + Send + Unpin>,
214            Some("bzip2") => Box::new(async_compression::tokio::bufread::BzDecoder::new(r))
215                as Box<dyn AsyncRead + Send + Unpin>,
216            Some("gzip") => Box::new(async_compression::tokio::bufread::GzipDecoder::new(r))
217                as Box<dyn AsyncRead + Send + Unpin>,
218            Some("xz") => Box::new(async_compression::tokio::bufread::XzDecoder::new(r))
219                as Box<dyn AsyncRead + Send + Unpin>,
220            Some("zstd") => Box::new(async_compression::tokio::bufread::ZstdDecoder::new(r))
221                as Box<dyn AsyncRead + Send + Unpin>,
222            Some(comp_str) => Err(Error::UnsupportedNARCompression(comp_str.to_owned()))?,
223        };
224
225        let (root_node, nar_hash, nar_size) = ingest_nar_and_hash(
226            self.blob_service.clone(),
227            &self.directory_service,
228            &mut r,
229            &narinfo.ca,
230        )
231        .await
232        .map_err(Error::IngestNAR)?;
233
234        // ensure the ingested narhash and narsize do actually match.
235        if narinfo.nar_size != nar_size {
236            Err(Error::NARSizeMismatch {
237                narinfo_size: narinfo.nar_size,
238                actual_size: nar_size,
239            })?
240        }
241        if narinfo.nar_hash != nar_hash {
242            Err(Error::NARHashMismatch {
243                narinfo_nar_sha256: narinfo.nar_hash,
244                actual_nar_sha256: nar_hash,
245            })?
246        }
247
248        Ok(Some(PathInfo {
249            store_path: narinfo.store_path.to_owned(),
250            node: root_node,
251            references: narinfo.references.iter().map(StorePath::to_owned).collect(),
252            nar_size: narinfo.nar_size,
253            nar_sha256: narinfo.nar_hash,
254            deriver: narinfo.deriver.as_ref().map(StorePath::to_owned),
255            signatures: narinfo
256                .signatures
257                .into_iter()
258                .map(|s| Signature::<String>::new(s.name().to_string(), s.bytes().to_owned()))
259                .collect(),
260            ca: narinfo.ca,
261        }))
262    }
263
264    #[instrument(skip_all, err, fields(path.digest=nixbase32::encode(&digest), instance_name=%self.instance_name))]
265    async fn has(&self, digest: [u8; 20]) -> Result<bool, pathinfoservice::Error> {
266        let narinfo_url = self.derive_narinfo_url(digest)?;
267
268        let span = Span::current();
269        span.record("narinfo.url", narinfo_url.to_string());
270
271        let resp = self
272            .http_client
273            .head(narinfo_url)
274            .send()
275            .await
276            .map_err(Error::Reqwest)?;
277
278        // In the case of a 404, return a NotFound.
279        // We also return a NotFound in case of a 403 - this is to match the behaviour as Nix,
280        // when querying nix-cache.s3.amazonaws.com directly, rather than cache.nixos.org.
281        if resp.status() == StatusCode::NOT_FOUND || resp.status() == StatusCode::FORBIDDEN {
282            Ok(false)
283        } else {
284            Ok(true)
285        }
286    }
287
288    #[instrument(skip_all, fields(path_info=?_path_info, instance_name=%self.instance_name))]
289    async fn put(&self, _path_info: PathInfo) -> Result<PathInfo, pathinfoservice::Error> {
290        Err(Box::new(Error::PutNotSupported))
291    }
292
293    fn list(&self) -> BoxStream<'static, Result<PathInfo, pathinfoservice::Error>> {
294        Box::pin(futures::stream::once(async {
295            Err(Error::ListNotSupported)?
296        }))
297    }
298}
299
300#[derive(serde::Deserialize, Clone, Debug, PartialEq, Eq)]
301#[serde(deny_unknown_fields)]
302pub struct NixHTTPPathInfoServiceConfig {
303    base_url: Url,
304
305    #[serde(flatten)]
306    params: NixHTTPPathInfoServiceParams,
307}
308
309#[derive(serde::Deserialize, Clone, Debug, PartialEq, Eq)]
310#[serde(deny_unknown_fields)]
311struct NixHTTPPathInfoServiceParams {
312    #[serde(default = "default_blob_service")]
313    blob_service: String,
314    #[serde(default = "default_directory_service")]
315    directory_service: String,
316    #[serde(default)]
317    /// An optional list of [narinfo::VerifyingKey].
318    /// If not empty, the .narinfo files received need to have correct signature by at least one of these.
319    trusted_public_keys: Vec<String>,
320}
321
/// Blob service reference used if the configuration doesn't name one.
fn default_blob_service() -> String {
    String::from("&root")
}
/// Directory service reference used if the configuration doesn't name one.
fn default_directory_service() -> String {
    String::from("&root")
}
328
329impl TryFrom<Url> for NixHTTPPathInfoServiceConfig {
330    type Error = Box<dyn std::error::Error + Send + Sync>;
331    fn try_from(url: Url) -> Result<Self, Self::Error> {
332        let scheme = url
333            .scheme()
334            .strip_prefix("nix+")
335            .ok_or_else(|| Error::WrongConfig("scheme must start with nix+"))?;
336
337        if !url.has_authority() {
338            Err(Error::WrongConfig("url must have authority component"))?
339        }
340        if !url.has_host() {
341            Err(Error::WrongConfig("url must have host component"))?
342        }
343        if !["http", "https"].contains(&scheme) {
344            Err(Error::WrongConfig("unknown scheme"))?
345        }
346
347        Ok(NixHTTPPathInfoServiceConfig {
348            // Stringify the URL and remove the nix+ prefix.
349            // We can't use `url.set_scheme(rest)`, as it disallows
350            // setting something http(s) that previously wasn't.
351            // Also make sure to drop the query, we don't want to leak our
352            // config to the remote HTTP endpoint we query.
353            base_url: {
354                let mut url: Url = url
355                    .to_string()
356                    .strip_prefix("nix+")
357                    .unwrap()
358                    .parse()
359                    .expect("stripped URL to parse again");
360                url.set_query(None);
361                url
362            },
363            params: serde_qs::from_str(url.query().unwrap_or_default())?,
364        })
365    }
366}
367
368#[async_trait]
369impl ServiceBuilder for NixHTTPPathInfoServiceConfig {
370    type Output = dyn PathInfoService;
371    async fn build<'a>(
372        &'a self,
373        instance_name: &str,
374        context: &CompositionContext,
375    ) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync + 'static>> {
376        let (blob_service, directory_service) = futures::join!(
377            context.resolve::<dyn BlobService>(&self.params.blob_service),
378            context.resolve::<dyn DirectoryService>(&self.params.directory_service)
379        );
380        let svc = NixHTTPPathInfoService::try_build(
381            instance_name.to_string(),
382            self.to_owned(),
383            blob_service?,
384            directory_service?,
385        )?;
386        Ok(Arc::new(svc))
387    }
388}
389
#[cfg(test)]
mod tests {
    use super::{NixHTTPPathInfoServiceConfig, NixHTTPPathInfoServiceParams};
    use rstest::rstest;
    use url::Url;

    /// Returns the config expected for the given base URL and trusted public
    /// keys, with blob and directory service left at their `&root` default.
    fn expected(base_url: &str, trusted_public_keys: &[&str]) -> NixHTTPPathInfoServiceConfig {
        NixHTTPPathInfoServiceConfig {
            base_url: base_url.try_into().unwrap(),
            params: NixHTTPPathInfoServiceParams {
                blob_service: "&root".to_string(),
                directory_service: "&root".to_string(),
                trusted_public_keys: trusted_public_keys
                    .iter()
                    .map(|&key| key.to_owned())
                    .collect(),
            },
        }
    }

    #[rstest]
    /// Correct Scheme for the cache.nixos.org binary cache.
    #[case::correct_nix_https(
        "nix+https://cache.nixos.org",
        Some(expected("https://cache.nixos.org", &[]))
    )]
    /// Correct Scheme for the cache.nixos.org binary cache (HTTP URL).
    #[case::correct_nix_http(
        "nix+http://cache.nixos.org",
        Some(expected("http://cache.nixos.org", &[]))
    )]
    /// Correct Scheme for Nix HTTP Binary cache, with a subpath.
    #[case::correct_nix_http_with_subpath(
        "nix+http://192.0.2.1/foo",
        Some(expected("http://192.0.2.1/foo", &[]))
    )]
    /// Correct Scheme for Nix HTTP Binary cache, with a subpath and port.
    #[case::correct_nix_http_with_subpath_and_port(
        "nix+http://[::1]:8080/foo",
        Some(expected("http://[::1]:8080/foo", &[]))
    )]
    /// Correct Scheme for the cache.nixos.org binary cache, and correct trusted public key set
    #[case::correct_nix_https_with_trusted_public_key(
        "nix+https://cache.nixos.org?trusted_public_keys[0]=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=",
        Some(expected(
            "https://cache.nixos.org",
            &["cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY="]
        ))
    )]
    /// Correct Scheme for the cache.nixos.org binary cache, and two correct trusted public keys set
    #[case::correct_nix_https_with_two_trusted_public_keys(
        "nix+https://cache.nixos.org?trusted_public_keys[0]=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=&trusted_public_keys[1]=foo:jp4fCEx9tBEId/L0ZsVJ26k0wC0fu7vJqLjjIGFkup8=",
        Some(expected(
            "https://cache.nixos.org",
            &[
                "cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=",
                "foo:jp4fCEx9tBEId/L0ZsVJ26k0wC0fu7vJqLjjIGFkup8="
            ]
        ))
    )]
    #[case::wrong_scheme("nix+grpc://example.com", None)]
    #[case::missing_host("nix+http:///", None)]
    #[case::missing_authority("nix+http:", None)]
    /// Correct cache.nixos.org binary cache URL, but wrong `trusted_public_keys` param usage (should be list)
    #[case::trusted_public_keys_no_sequence(
        "nix+https://cache.nixos.org?trusted_public_keys=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=",
        None
    )]
    /// Correct cache.nixos.org binary cache URL, but wrong param name
    #[case::trusted_public_keys_wrong_pubkey(
        "nix+https://cache.nixos.org?trustedpublickeys=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=",
        None
    )]
    fn parse_url(#[case] url_str: &str, #[case] exp_config: Option<NixHTTPPathInfoServiceConfig>) {
        let url: Url = url_str.parse().expect("url to parse");
        let parsed = NixHTTPPathInfoServiceConfig::try_from(url);

        match exp_config {
            // The URL is expected to be accepted and to yield exactly this config.
            Some(exp_config) => {
                let config = parsed
                    .unwrap_or_else(|e| panic!("parsing url unexpectedly failed: {e}"));
                assert_eq!(exp_config, config);
            }
            // The URL is expected to be rejected.
            None => {
                if parsed.is_ok() {
                    panic!("parsing url unexpectedly succeeded")
                }
            }
        }
    }
}