1use super::{PathInfo, PathInfoService};
2use crate::{
3 nar::{NarIngestionError, ingest_nar_and_hash},
4 pathinfoservice::{self, nix_http::castore_infused::try_infused_nar_path},
5};
6use futures::{TryStreamExt, stream::BoxStream};
7use nix_compat::{
8 narinfo::{self, NarInfo, Signature},
9 nixbase32,
10 nixhash::NixHash,
11 store_path::StorePath,
12};
13use reqwest::StatusCode;
14use snix_castore::{
15 blobservice::{self, BlobService},
16 directoryservice::{self, DirectoryService},
17 proto::{
18 blob_service_client::BlobServiceClient, directory_service_client::DirectoryServiceClient,
19 },
20};
21use snix_castore::{
22 composition::{CompositionContext, ServiceBuilder},
23 directoryservice::GRPCDirectoryService,
24};
25use std::sync::Arc;
26use tokio::io::{self, AsyncRead};
27use tonic::{async_trait, transport::Channel};
28use tracing::{Span, instrument, warn};
29use url::Url;
30
31mod castore_infused;
32
33pub struct NixHTTPPathInfoService<BS: Clone, DS> {
49 instance_name: String,
50 base_url: url::Url,
51 http_client: reqwest_middleware::ClientWithMiddleware,
52
53 blob_service: BS,
54 directory_service: DS,
55
56 layered_blob_service: blobservice::Cache<BS, blobservice::GRPCBlobService<Channel>>,
60 layered_directory_service: directoryservice::Cache<DS, GRPCDirectoryService<Channel>>,
64
65 trusted_public_keys: Vec<narinfo::VerifyingKey>,
69
70 force_download_nar: bool,
72}
73
74impl<BS, DS> NixHTTPPathInfoService<BS, DS>
75where
76 BS: Clone,
77 DS: DirectoryService + Clone,
78{
79 pub fn try_build(
80 instance_name: String,
81 config: NixHTTPPathInfoServiceConfig,
82 blob_service: BS,
83 directory_service: DS,
84 ) -> Result<Self, Error> {
85 let mut trusted_public_keys = Vec::new();
86 for s in config.params.trusted_public_keys {
87 trusted_public_keys.push(
88 narinfo::VerifyingKey::parse(&s).map_err(|e| Error::ParseTrustedPublicKey(s, e))?,
89 )
90 }
91
92 let (layered_blob_service, layered_directory_service) = {
93 let grpc_url = {
94 let url_str = format!("grpc+{}", config.base_url);
95 let mut url: Url = url_str.parse().expect("url to parse");
96 url.set_path("");
97 url
98 };
99
100 let channel =
101 snix_castore::tonic::TonicConnector::from_url(&grpc_url)?.connect_expect_lazy();
102
103 let instance_name_layered = format!("{}-layered", &instance_name);
104 let instance_name_grpc = format!("{}-grpc", &instance_name);
105
106 (
107 blobservice::Cache::new(
108 instance_name_layered.clone(),
109 blob_service.clone(),
110 blobservice::GRPCBlobService::from_client(
111 instance_name_grpc.clone(),
112 BlobServiceClient::new(channel.clone()),
113 ),
114 ),
115 directoryservice::Cache::new(instance_name_layered, directory_service.clone(), {
116 GRPCDirectoryService::from_client(
117 instance_name_grpc,
118 DirectoryServiceClient::new(channel),
119 )
120 }),
121 )
122 };
123
124 Ok(Self {
125 instance_name,
126 base_url: config.base_url,
127 http_client: reqwest_middleware::ClientBuilder::new(
128 reqwest::Client::builder()
129 .user_agent(crate::USER_AGENT)
130 .build()
131 .map_err(reqwest_middleware::Error::Reqwest)?,
132 )
133 .with(snix_tracing::propagate::reqwest::tracing_middleware())
134 .build(),
135 blob_service,
136 directory_service,
137
138 layered_blob_service,
139 layered_directory_service,
140
141 trusted_public_keys,
142 force_download_nar: config.params.force_download_nar,
143 })
144 }
145
146 #[instrument(level=tracing::Level::TRACE, skip_all,fields(path.digest=nixbase32::encode(&digest)),err)]
147 fn derive_narinfo_url(&self, digest: [u8; 20]) -> Result<Url, Error> {
148 let s = format!("{}.narinfo", nixbase32::encode(&digest));
149 self.base_url
150 .join(&s)
151 .map_err(|e| Error::JoinUrl(self.base_url.to_owned(), s.to_owned(), e))
152 }
153}
154
155#[derive(Debug, thiserror::Error)]
156pub enum Error {
157 #[error("wrong arguments: {0}")]
158 WrongConfig(&'static str),
159 #[error("serde-qs error: {0}")]
160 SerdeQS(#[from] serde_qs::Error),
161 #[error("unable to parse pubkey {0}")]
162 ParseTrustedPublicKey(String, nix_compat::narinfo::VerifyingKeyError),
163 #[error("unable to construct tonic channel: {0}")]
164 TonicChannel(#[from] snix_castore::tonic::Error),
165
166 #[error("unable to join URL {0} with {1}")]
167 JoinUrl(Url, String, url::ParseError),
168 #[error("reqwest error")]
169 Reqwest(#[from] reqwest_middleware::Error),
170 #[error("unable to decode NARInfo response as string")]
171 DecodeBody(reqwest::Error),
172 #[error("unable to parse NARInfo")]
173 ParseNARInfo(nix_compat::narinfo::Error),
174 #[error("no valid signature found")]
175 NoValidSignature,
176 #[error("failed to request NAR, status {0}")]
177 FailedToRequestNAR(reqwest::StatusCode),
178 #[error("unsupported NAR compression: {0}")]
179 UnsupportedNARCompression(String),
180 #[error("failed to ingest NAR")]
181 IngestNAR(NarIngestionError),
182 #[error("NARSize mismatch, narinfo size {narinfo_size}, actual size {actual_size}")]
183 NARSizeMismatch { narinfo_size: u64, actual_size: u64 },
184 #[error("NARHash mismatch, narinfo NARHash {exp}, actual NARHash {act}",
185 exp = NixHash::Sha256(*.narinfo_nar_sha256),
186 act = NixHash::Sha256(*.actual_nar_sha256))]
187 NARHashMismatch {
188 narinfo_nar_sha256: [u8; 32],
189 actual_nar_sha256: [u8; 32],
190 },
191
192 #[error("put not supported")]
193 PutNotSupported,
194 #[error("list not supported")]
195 ListNotSupported,
196}
197
198#[async_trait]
199impl<BS, DS> PathInfoService for NixHTTPPathInfoService<BS, DS>
200where
201 BS: BlobService + Send + Sync + Clone + 'static,
202 DS: DirectoryService + Send + Sync + Clone + 'static,
203{
204 #[instrument(skip_all, err, fields(
205 path.digest=nixbase32::encode(&digest),
206 instance_name=%self.instance_name,
207 narinfo.url=tracing::field::Empty,
208 nar.url=tracing::field::Empty,
209 ))]
210 async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, pathinfoservice::Error> {
211 let narinfo_url = self.derive_narinfo_url(digest)?;
212
213 let span = Span::current();
214 span.record("narinfo.url", narinfo_url.to_string());
215
216 let resp = self
217 .http_client
218 .get(narinfo_url)
219 .send()
220 .await
221 .map_err(Error::Reqwest)?;
222
223 if resp.status() == StatusCode::NOT_FOUND || resp.status() == StatusCode::FORBIDDEN {
227 return Ok(None);
228 }
229
230 let narinfo_str = resp.text().await.map_err(Error::DecodeBody)?;
231
232 let narinfo = NarInfo::parse(&narinfo_str).map_err(Error::ParseNARInfo)?;
234
235 if narinfo.store_path.digest() != &digest {
237 return Err("Store path digest in NARInfo doesn't match".into());
238 }
239
240 if !self.trusted_public_keys.is_empty() {
242 let fingerprint = narinfo.fingerprint();
243
244 if !self.trusted_public_keys.iter().any(|pubkey| {
245 narinfo
246 .signatures
247 .iter()
248 .any(|sig| pubkey.verify(&fingerprint, sig))
249 }) {
250 Err(Error::NoValidSignature)?
251 }
252 }
253
254 let root_node = if !self.force_download_nar
264 && let Some(root_node) = try_infused_nar_path(
265 &narinfo,
266 self.layered_blob_service.clone(),
267 &self.layered_directory_service,
268 )
269 .await
270 .unwrap_or_else(|err| {
271 warn!(%err, "unable to use infused store path");
272 None
273 }) {
274 root_node
275 } else {
276 let nar_url = self
278 .base_url
279 .join(narinfo.url)
280 .map_err(|e| Error::JoinUrl(self.base_url.clone(), narinfo.url.to_owned(), e))?;
281 span.record("nar.url", nar_url.to_string());
282
283 let resp = self
284 .http_client
285 .get(nar_url.clone())
286 .send()
287 .await
288 .map_err(Error::Reqwest)?;
289
290 if !resp.status().is_success() {
292 Err(Error::FailedToRequestNAR(resp.status()))?;
293 }
294
295 let r = tokio_util::io::StreamReader::new(resp.bytes_stream().map_err(|e| {
297 let e = e.without_url();
298 warn!(e=%e, "failed to get response body");
299 io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
300 }));
301
302 let mut r: Box<dyn AsyncRead + Send + Unpin> = match narinfo.compression {
304 None => Box::new(r) as Box<dyn AsyncRead + Send + Unpin>,
305 Some("bzip2") => Box::new(async_compression::tokio::bufread::BzDecoder::new(r))
306 as Box<dyn AsyncRead + Send + Unpin>,
307 Some("gzip") => Box::new(async_compression::tokio::bufread::GzipDecoder::new(r))
308 as Box<dyn AsyncRead + Send + Unpin>,
309 Some("xz") => Box::new(async_compression::tokio::bufread::XzDecoder::new(r))
310 as Box<dyn AsyncRead + Send + Unpin>,
311 Some("zstd") => Box::new(async_compression::tokio::bufread::ZstdDecoder::new(r))
312 as Box<dyn AsyncRead + Send + Unpin>,
313 Some(comp_str) => Err(Error::UnsupportedNARCompression(comp_str.to_owned()))?,
314 };
315
316 let (root_node, nar_hash, nar_size) = ingest_nar_and_hash(
317 self.blob_service.clone(),
318 &self.directory_service,
319 &mut r,
320 &narinfo.ca,
321 )
322 .await
323 .map_err(Error::IngestNAR)?;
324
325 if narinfo.nar_size != nar_size {
327 Err(Error::NARSizeMismatch {
328 narinfo_size: narinfo.nar_size,
329 actual_size: nar_size,
330 })?
331 }
332 if narinfo.nar_hash != nar_hash {
333 Err(Error::NARHashMismatch {
334 narinfo_nar_sha256: narinfo.nar_hash,
335 actual_nar_sha256: nar_hash,
336 })?
337 }
338 root_node
339 };
340
341 Ok(Some(PathInfo {
342 store_path: narinfo.store_path.to_owned(),
343 node: root_node,
344 references: narinfo.references.iter().map(StorePath::to_owned).collect(),
345 nar_size: narinfo.nar_size,
346 nar_sha256: narinfo.nar_hash,
347 deriver: narinfo.deriver.as_ref().map(StorePath::to_owned),
348 signatures: narinfo
349 .signatures
350 .into_iter()
351 .map(|s| Signature::<String>::new(s.name().to_string(), s.bytes().to_owned()))
352 .collect(),
353 ca: narinfo.ca,
354 }))
355 }
356
357 #[instrument(skip_all, err, fields(
358 path.digest=nixbase32::encode(&digest),
359 instance_name=%self.instance_name,
360 narinfo.url=tracing::field::Empty,
361 ))]
362 async fn has(&self, digest: [u8; 20]) -> Result<bool, pathinfoservice::Error> {
363 let narinfo_url = self.derive_narinfo_url(digest)?;
364
365 let span = Span::current();
366 span.record("narinfo.url", narinfo_url.to_string());
367
368 let resp = self
369 .http_client
370 .head(narinfo_url)
371 .send()
372 .await
373 .map_err(Error::Reqwest)?;
374
375 if resp.status() == StatusCode::NOT_FOUND || resp.status() == StatusCode::FORBIDDEN {
379 Ok(false)
380 } else {
381 Ok(true)
382 }
383 }
384
385 #[instrument(skip_all, fields(path_info=?_path_info, instance_name=%self.instance_name))]
386 async fn put(&self, _path_info: PathInfo) -> Result<PathInfo, pathinfoservice::Error> {
387 Err(Box::new(Error::PutNotSupported))
388 }
389
390 fn list(&self) -> BoxStream<'static, Result<PathInfo, pathinfoservice::Error>> {
391 Box::pin(futures::stream::once(async {
392 Err(Error::ListNotSupported)?
393 }))
394 }
395}
396
397#[derive(serde::Deserialize, Clone, Debug, PartialEq, Eq)]
398#[serde(deny_unknown_fields)]
399pub struct NixHTTPPathInfoServiceConfig {
400 base_url: Url,
401
402 #[serde(flatten)]
403 params: NixHTTPPathInfoServiceParams,
404}
405
406#[derive(serde::Deserialize, Clone, Debug, PartialEq, Eq)]
407#[serde(deny_unknown_fields)]
408struct NixHTTPPathInfoServiceParams {
409 #[serde(default = "default_blob_service")]
410 blob_service: String,
411 #[serde(default = "default_directory_service")]
412 directory_service: String,
413 #[serde(default)]
414 trusted_public_keys: Vec<String>,
417
418 #[serde(default)]
419 force_download_nar: bool,
421}
422
423fn default_blob_service() -> String {
424 "&root".to_string()
425}
426fn default_directory_service() -> String {
427 "&root".to_string()
428}
429
430impl TryFrom<Url> for NixHTTPPathInfoServiceConfig {
431 type Error = Box<dyn std::error::Error + Send + Sync>;
432 fn try_from(url: Url) -> Result<Self, Self::Error> {
433 let scheme = url
434 .scheme()
435 .strip_prefix("nix+")
436 .ok_or_else(|| Error::WrongConfig("scheme must start with nix+"))?;
437
438 if !url.has_authority() {
439 Err(Error::WrongConfig("url must have authority component"))?
440 }
441 if !url.has_host() {
442 Err(Error::WrongConfig("url must have host component"))?
443 }
444 if !["http", "https"].contains(&scheme) {
445 Err(Error::WrongConfig("unknown scheme"))?
446 }
447
448 Ok(NixHTTPPathInfoServiceConfig {
449 base_url: {
455 let mut url: Url = url
456 .to_string()
457 .strip_prefix("nix+")
458 .unwrap()
459 .parse()
460 .expect("stripped URL to parse again");
461 url.set_query(None);
462 url
463 },
464 params: serde_qs::from_str(url.query().unwrap_or_default())?,
465 })
466 }
467}
468
469#[async_trait]
470impl ServiceBuilder for NixHTTPPathInfoServiceConfig {
471 type Output = dyn PathInfoService;
472 async fn build<'a>(
473 &'a self,
474 instance_name: &str,
475 context: &CompositionContext,
476 ) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync + 'static>> {
477 let (blob_service, directory_service) = futures::join!(
478 context.resolve::<dyn BlobService>(&self.params.blob_service),
479 context.resolve::<dyn DirectoryService>(&self.params.directory_service)
480 );
481 let svc = NixHTTPPathInfoService::try_build(
482 instance_name.to_string(),
483 self.to_owned(),
484 blob_service?,
485 directory_service?,
486 )?;
487 Ok(Arc::new(svc))
488 }
489}
490
491#[cfg(test)]
492mod tests {
493 use super::{NixHTTPPathInfoServiceConfig, NixHTTPPathInfoServiceParams};
494 use rstest::rstest;
495 use url::Url;
496
497 #[rstest]
498 #[case::correct_nix_https("nix+https://cache.nixos.org", Some(
500 NixHTTPPathInfoServiceConfig {
501 base_url: "https://cache.nixos.org".try_into().unwrap(),
502 params: NixHTTPPathInfoServiceParams {
503 blob_service: "&root".to_string(),
504 directory_service: "&root".to_string(),
505 trusted_public_keys: vec![],
506 force_download_nar: false,
507 }
508 }
509 ))]
510 #[case::correct_nix_http("nix+http://cache.nixos.org", Some(
512 NixHTTPPathInfoServiceConfig {
513 base_url: "http://cache.nixos.org".try_into().unwrap(),
514 params: NixHTTPPathInfoServiceParams {
515 blob_service: "&root".to_string(),
516 directory_service: "&root".to_string(),
517 trusted_public_keys: vec![],
518 force_download_nar: false,
519 }
520 }
521 ))]
522 #[case::correct_nix_http_with_subpath("nix+http://192.0.2.1/foo", Some(
524 NixHTTPPathInfoServiceConfig {
525 base_url: "http://192.0.2.1/foo".try_into().unwrap(),
526 params: NixHTTPPathInfoServiceParams {
527 blob_service: "&root".to_string(),
528 directory_service: "&root".to_string(),
529 trusted_public_keys: vec![],
530 force_download_nar: false,
531 }
532 }
533 ))]
534 #[case::correct_nix_http_with_subpath_and_port("nix+http://[::1]:8080/foo", Some(
536 NixHTTPPathInfoServiceConfig {
537 base_url: "http://[::1]:8080/foo".try_into().unwrap(),
538 params: NixHTTPPathInfoServiceParams {
539 blob_service: "&root".to_string(),
540 directory_service: "&root".to_string(),
541 trusted_public_keys: vec![],
542 force_download_nar: false,
543 }
544 }
545
546 ))]
547 #[case::correct_nix_https_with_trusted_public_key(
549 "nix+https://cache.nixos.org?trusted_public_keys[0]=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=", Some(
550 NixHTTPPathInfoServiceConfig {
551 base_url: "https://cache.nixos.org".try_into().unwrap(),
552 params: NixHTTPPathInfoServiceParams {
553 blob_service: "&root".to_string(),
554 directory_service: "&root".to_string(),
555 trusted_public_keys: vec![
556 "cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=".to_string()
557 ],
558 force_download_nar: false,
559 }
560 }
561 ))]
562 #[case::correct_nix_https_with_two_trusted_public_keys(
564 "nix+https://cache.nixos.org?trusted_public_keys[0]=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=&trusted_public_keys[1]=foo:jp4fCEx9tBEId/L0ZsVJ26k0wC0fu7vJqLjjIGFkup8=", Some(
565 NixHTTPPathInfoServiceConfig {
566 base_url: "https://cache.nixos.org".try_into().unwrap(),
567 params: NixHTTPPathInfoServiceParams {
568 blob_service: "&root".to_string(),
569 directory_service: "&root".to_string(),
570 trusted_public_keys: vec![
571 "cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=".to_string(),
572 "foo:jp4fCEx9tBEId/L0ZsVJ26k0wC0fu7vJqLjjIGFkup8=".to_string()
573 ],
574 force_download_nar: false,
575 }
576 }
577 ))]
578 #[case::wrong_scheme("nix+grpc://example.com", None)]
579 #[case::missing_host("nix+http:///", None)]
580 #[case::missing_authority("nix+http:", None)]
581 #[case::trusted_public_keys_no_sequence(
583 "nix+https://cache.nixos.org?trusted_public_keys=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=",
584 None
585 )]
586 #[case::trusted_public_keys_wrong_pubkey(
588 "nix+https://cache.nixos.org?trustedpublickeys=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=",
589 None
590 )]
591 fn parse_url(#[case] url_str: &str, #[case] exp_config: Option<NixHTTPPathInfoServiceConfig>) {
592 let url: Url = url_str.parse().expect("url to parse");
593
594 match (NixHTTPPathInfoServiceConfig::try_from(url), exp_config) {
595 (Ok(_), None) => panic!("parsing url unexpectedly succeeded"),
596 (Ok(config), Some(exp_config)) => assert_eq!(exp_config, config),
597 (Err(_), None) => {}
598 (Err(e), Some(_)) => panic!("parsing url unexpectedly failed: {e}"),
599 }
600 }
601}