1use data_encoding::HEXLOWER;
2use futures::TryStreamExt;
3use md5::{Md5, digest::DynDigest};
4use nix_compat::{
5 hashing::hash,
6 nixhash::{CAHash, CAHashMode, HashAlgo, NixHash},
7 store_path::{ParseStorePathError, StorePathRef, build_ca_path},
8};
9use sha1::Sha1;
10use sha2::{Digest, Sha256, Sha512};
11use snix_castore::{Node, blobservice::BlobService, directoryservice::DirectoryService};
12use snix_store::{
13 decompression::DecompressedReader,
14 nar::{NarCalculationService, NarIngestionError},
15 pathinfoservice::{PathInfo, PathInfoService},
16};
17use tokio::io::{AsyncBufRead, AsyncWriteExt, BufReader};
18use tokio_util::io::{InspectReader, InspectWriter};
19use tracing::{Span, instrument, warn};
20use tracing_indicatif::span_ext::IndicatifSpanExt;
21use url::Url;
22
23mod error;
24pub use error::FetcherError;
25
26#[derive(Clone, Eq, PartialEq)]
28pub enum Fetch {
29 URL {
32 url: Url,
34 exp_hash: Option<NixHash>,
36 },
37
38 Tarball {
46 url: Url,
48 exp_nar_sha256: Option<[u8; 32]>,
50 },
51
52 NAR {
55 url: Url,
57 hash: NixHash,
60 },
61
62 Executable {
70 url: Url,
72 hash: NixHash,
75 },
76
77 Git(),
79}
80
81fn redact_url(url: &Url) -> Url {
83 let mut url = url.to_owned();
84 if !url.username().is_empty() {
85 let _ = url.set_username("redacted");
86 }
87
88 if url.password().is_some() {
89 let _ = url.set_password(Some("redacted"));
90 }
91
92 url
93}
94
95impl std::fmt::Debug for Fetch {
96 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97 match self {
98 Fetch::URL { url, exp_hash } => {
99 let url = redact_url(url);
100 if let Some(exp_hash) = exp_hash {
101 write!(f, "URL [url: {}, exp_hash: Some({})]", &url, exp_hash)
102 } else {
103 write!(f, "URL [url: {}, exp_hash: None]", &url)
104 }
105 }
106 Fetch::Tarball {
107 url,
108 exp_nar_sha256,
109 } => {
110 let url = redact_url(url);
111 if let Some(exp_nar_sha256) = exp_nar_sha256 {
112 write!(
113 f,
114 "Tarball [url: {}, exp_nar_sha256: Some({})]",
115 url,
116 NixHash::Sha256(*exp_nar_sha256)
117 )
118 } else {
119 write!(f, "Tarball [url: {url}, exp_hash: None]")
120 }
121 }
122 Fetch::NAR { url, hash } => {
123 let url = redact_url(url);
124 write!(f, "NAR [url: {}, hash: {}]", &url, hash)
125 }
126 Fetch::Executable { url, hash } => {
127 let url = redact_url(url);
128 write!(f, "Executable [url: {}, hash: {}]", &url, hash)
129 }
130 Fetch::Git() => todo!(),
131 }
132 }
133}
134
135impl Fetch {
136 pub fn store_path<'a>(
140 &self,
141 name: &'a str,
142 ) -> Result<Option<StorePathRef<'a>>, ParseStorePathError> {
143 let ca_hash = match self {
144 Fetch::URL {
145 exp_hash: Some(exp_hash),
146 ..
147 } => CAHash::Flat(exp_hash.clone()),
148
149 Fetch::Tarball {
150 exp_nar_sha256: Some(exp_nar_sha256),
151 ..
152 } => CAHash::Nar(NixHash::Sha256(*exp_nar_sha256)),
153
154 Fetch::NAR { hash, .. } | Fetch::Executable { hash, .. } => {
155 CAHash::Nar(hash.to_owned())
156 }
157
158 Fetch::Git() => unimplemented!(),
159
160 Fetch::URL { exp_hash: None, .. }
162 | Fetch::Tarball {
163 exp_nar_sha256: None,
164 ..
165 } => return Ok(None),
166 };
167
168 build_ca_path(
170 name,
171 ca_hash.mode() == CAHashMode::Nar,
172 &ca_hash.hash(),
173 [],
174 false,
175 )
176 .map(Some)
177 }
178}
179
180pub struct Fetcher<BS, DS, PS, NS> {
182 http_client: reqwest::Client,
183 blob_service: BS,
184 directory_service: DS,
185 path_info_service: PS,
186 nar_calculation_service: NS,
187 hashed_mirrors: Vec<Url>,
188}
189
190impl<BS, DS, PS, NS> Fetcher<BS, DS, PS, NS> {
191 pub fn new(
192 blob_service: BS,
193 directory_service: DS,
194 path_info_service: PS,
195 nar_calculation_service: NS,
196 hashed_mirrors: Vec<Url>,
197 ) -> Self {
198 Self {
199 http_client: reqwest::Client::builder()
200 .user_agent(crate::USER_AGENT)
201 .build()
202 .expect("Client::new()"),
203 blob_service,
204 directory_service,
205 path_info_service,
206 nar_calculation_service,
207 hashed_mirrors,
208 }
209 }
210
211 async fn do_download(
215 &self,
216 url: Url,
217 ) -> Result<Box<dyn AsyncBufRead + Unpin + Send>, FetcherError> {
218 let span = Span::current();
219 match url.scheme() {
220 "file" => {
221 let f = tokio::fs::File::open(url.to_file_path().map_err(|_| {
222 FetcherError::Io(std::io::Error::new(
225 std::io::ErrorKind::InvalidData,
226 "invalid host for file:// scheme",
227 ))
228 })?)
229 .await?;
230
231 span.pb_set_length(f.metadata().await?.len());
232 span.pb_set_style(&snix_tracing::PB_TRANSFER_STYLE);
233 span.pb_start();
234 Ok(Box::new(tokio::io::BufReader::new(InspectReader::new(
235 f,
236 move |d| {
237 span.pb_inc(d.len() as u64);
238 },
239 ))))
240 }
241 _ => {
242 let resp = self.http_client.get(url.clone()).send().await?;
243 if !resp.status().is_success() {
244 use reqwest::StatusCode;
245 use std::io::ErrorKind;
246 let kind = match resp.status() {
247 StatusCode::BAD_REQUEST
248 | StatusCode::NOT_ACCEPTABLE
249 | StatusCode::URI_TOO_LONG => ErrorKind::InvalidData,
250 StatusCode::FORBIDDEN
251 | StatusCode::UNAUTHORIZED
252 | StatusCode::NETWORK_AUTHENTICATION_REQUIRED => {
253 ErrorKind::PermissionDenied
254 }
255 StatusCode::NOT_FOUND | StatusCode::GONE => ErrorKind::NotFound,
256 StatusCode::METHOD_NOT_ALLOWED => ErrorKind::Unsupported,
257 StatusCode::REQUEST_TIMEOUT | StatusCode::GATEWAY_TIMEOUT => {
258 ErrorKind::TimedOut
259 }
260 StatusCode::TOO_MANY_REQUESTS => ErrorKind::QuotaExceeded,
261 StatusCode::BAD_GATEWAY | StatusCode::SERVICE_UNAVAILABLE => {
262 ErrorKind::ResourceBusy
263 }
264 _ => ErrorKind::Other,
265 };
266 return Err(FetcherError::Io(std::io::Error::new(
267 kind,
268 format!("unable to download '{}': {}", url, resp.status()),
269 )));
270 }
271
272 if let Some(content_length) = resp.content_length() {
273 span.pb_set_length(content_length);
274 span.pb_set_style(&snix_tracing::PB_TRANSFER_STYLE);
275 } else {
276 span.pb_set_style(&snix_tracing::PB_TRANSFER_STYLE);
277 }
278 span.pb_start();
279
280 Ok(Box::new(tokio_util::io::StreamReader::new(
281 resp.bytes_stream()
282 .inspect_ok(move |d| {
283 span.pb_inc(d.len() as u64);
284 })
285 .map_err(|e| {
286 let e = e.without_url();
287 warn!(%e, "failed to get response body");
288 std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)
289 }),
290 )))
291 }
292 }
293 }
294
295 #[instrument(skip_all, fields(url, indicatif.pb_show=tracing::field::Empty), err)]
302 async fn download(
303 &self,
304 url: Url,
305 exp_hash: Option<&NixHash>,
306 ) -> Result<Box<dyn AsyncBufRead + Unpin + Send>, FetcherError> {
307 let span = Span::current();
308 span.pb_set_message(&format!(
309 "📡Fetching {}",
310 redact_url(&url)
312 ));
313 if let Some(hash) = exp_hash {
314 let urls = self.hashed_mirrors.iter().map(|u| {
315 u.join(&format!(
316 "{}/{}",
317 hash.algo(),
318 HEXLOWER.encode(hash.digest_as_bytes())
319 ))
320 .expect("Snix bug!")
323 });
324 for url in urls {
325 if let Ok(result) = self.do_download(url).await {
326 return Ok(result);
327 }
328 }
329 }
330 self.do_download(url).await
331 }
332}
333
334impl<BS, DS, PS, NS> Fetcher<BS, DS, PS, NS>
335where
336 BS: BlobService + Clone + 'static,
337 DS: DirectoryService + Clone,
338 PS: PathInfoService,
339 NS: NarCalculationService,
340{
341 pub async fn ingest(&self, fetch: Fetch) -> Result<(Node, CAHash, u64), FetcherError> {
346 match fetch {
347 Fetch::URL { url, exp_hash } => {
348 let mut r = self.download(url.clone(), exp_hash.as_ref()).await?;
350
351 let mut blob_writer = self.blob_service.open_write().await;
353
354 let (actual_hash, blob_size) = match exp_hash
358 .as_ref()
359 .map(NixHash::algo)
360 .unwrap_or_else(|| HashAlgo::Sha256)
361 {
362 HashAlgo::Sha256 => hash::<Sha256>(&mut r, &mut blob_writer).await.map(
363 |(digest, bytes_written)| (NixHash::Sha256(digest.into()), bytes_written),
364 )?,
365 HashAlgo::Md5 => hash::<Md5>(&mut r, &mut blob_writer).await.map(
366 |(digest, bytes_written)| (NixHash::Md5(digest.into()), bytes_written),
367 )?,
368 HashAlgo::Sha1 => hash::<Sha1>(&mut r, &mut blob_writer).await.map(
369 |(digest, bytes_written)| (NixHash::Sha1(digest.into()), bytes_written),
370 )?,
371 HashAlgo::Sha512 => hash::<Sha512>(&mut r, &mut blob_writer).await.map(
372 |(digest, bytes_written)| {
373 (NixHash::Sha512(Box::new(digest.into())), bytes_written)
374 },
375 )?,
376 };
377
378 if let Some(exp_hash) = exp_hash
379 && exp_hash != actual_hash
380 {
381 return Err(FetcherError::HashMismatch {
382 url,
383 wanted: exp_hash,
384 got: actual_hash,
385 });
386 }
387
388 Ok((
390 Node::File {
391 digest: blob_writer.close().await?,
392 size: blob_size,
393 executable: false,
394 },
395 CAHash::Flat(actual_hash),
396 blob_size,
397 ))
398 }
399 Fetch::Tarball {
400 url,
401 exp_nar_sha256,
402 } => {
403 let r = self.download(url.clone(), None).await?;
406
407 let r = DecompressedReader::new(r).await?;
409
410 let node = snix_castore::import::archive::ingest_archive(
412 self.blob_service.clone(),
413 self.directory_service.clone(),
414 r,
415 )
416 .await?;
417
418 let (nar_size, actual_nar_sha256) = self
423 .nar_calculation_service
424 .calculate_nar(&node)
425 .await
426 .map_err(|e| {
427 FetcherError::Io(std::io::Error::other(e))
429 })?;
430
431 if let Some(exp_nar_sha256) = exp_nar_sha256
432 && exp_nar_sha256 != actual_nar_sha256
433 {
434 return Err(FetcherError::HashMismatch {
435 url,
436 wanted: NixHash::Sha256(exp_nar_sha256),
437 got: NixHash::Sha256(actual_nar_sha256),
438 });
439 }
440
441 Ok((
442 node,
443 CAHash::Nar(NixHash::Sha256(actual_nar_sha256)),
444 nar_size,
445 ))
446 }
447 Fetch::NAR {
448 url,
449 hash: exp_hash,
450 } => {
451 let r = self.download(url.clone(), Some(&exp_hash)).await?;
453
454 let mut r = DecompressedReader::new(r).await?;
456
457 let (root_node, _actual_nar_sha256, actual_nar_size) =
459 snix_store::nar::ingest_nar_and_hash(
460 self.blob_service.clone(),
461 self.directory_service.clone(),
462 &mut r,
463 &Some(CAHash::Nar(exp_hash.clone())),
464 )
465 .await
466 .map_err(|e| match e {
467 NarIngestionError::HashMismatch { expected, actual } => {
468 FetcherError::HashMismatch {
469 url,
470 wanted: expected,
471 got: actual,
472 }
473 }
474 _ => FetcherError::Io(std::io::Error::other(e.to_string())),
475 })?;
476 Ok((
477 root_node,
478 CAHash::Nar(exp_hash),
480 actual_nar_size,
481 ))
482 }
483 Fetch::Executable {
484 url,
485 hash: exp_hash,
486 } => {
487 let mut r = self.download(url.clone(), Some(&exp_hash)).await?;
489
490 let mut blob_writer = self.blob_service.open_write().await;
492
493 let file_size = tokio::io::copy(&mut r, &mut blob_writer).await?;
495 let blob_digest = blob_writer.close().await?;
496
497 let w = tokio::io::sink();
503 let mut hasher: Box<dyn DynDigest + Send> = match exp_hash.algo() {
505 HashAlgo::Md5 => Box::new(Md5::new()),
506 HashAlgo::Sha1 => Box::new(Sha1::new()),
507 HashAlgo::Sha256 => Box::new(Sha256::new()),
508 HashAlgo::Sha512 => Box::new(Sha512::new()),
509 };
510
511 let mut nar_size: u64 = 0;
512 let mut w = InspectWriter::new(w, |d| {
513 hasher.update(d);
514 nar_size += d.len() as u64;
515 });
516
517 {
518 let node = nix_compat::nar::writer::r#async::open(&mut w).await?;
519
520 let blob_reader = self
521 .blob_service
522 .open_read(&blob_digest)
523 .await?
524 .expect("Snix bug: just-uploaded blob not found");
525
526 node.file(true, file_size, &mut BufReader::new(blob_reader))
527 .await?;
528
529 w.flush().await?;
530 }
531
532 let actual_hash = {
534 match exp_hash.algo() {
535 HashAlgo::Md5 => {
536 NixHash::Md5(hasher.finalize().to_vec().try_into().unwrap())
537 }
538 HashAlgo::Sha1 => {
539 NixHash::Sha1(hasher.finalize().to_vec().try_into().unwrap())
540 }
541 HashAlgo::Sha256 => {
542 NixHash::Sha256(hasher.finalize().to_vec().try_into().unwrap())
543 }
544 HashAlgo::Sha512 => {
545 NixHash::Sha512(hasher.finalize().to_vec().try_into().unwrap())
546 }
547 }
548 };
549
550 if exp_hash != actual_hash {
551 return Err(FetcherError::HashMismatch {
552 url,
553 wanted: exp_hash,
554 got: actual_hash,
555 });
556 }
557
558 let root_node = Node::File {
561 digest: blob_digest,
562 size: file_size,
563 executable: true,
564 };
565
566 Ok((root_node, CAHash::Nar(actual_hash), file_size))
567 }
568 Fetch::Git() => todo!(),
569 }
570 }
571
572 pub async fn ingest_and_persist<'a>(
578 &self,
579 name: &'a str,
580 fetch: Fetch,
581 ) -> Result<(StorePathRef<'a>, PathInfo), FetcherError> {
582 let (node, ca_hash, size) = self.ingest(fetch).await?;
584
585 let store_path = build_ca_path(
587 name,
588 ca_hash.mode() == CAHashMode::Nar,
589 &ca_hash.hash(),
590 [],
591 false,
592 )?;
593
594 let (nar_size, nar_sha256) = match &ca_hash {
600 CAHash::Nar(NixHash::Sha256(nar_sha256)) => (size, *nar_sha256),
601 CAHash::Nar(_) | CAHash::Flat(_) => self
602 .nar_calculation_service
603 .calculate_nar(&node)
604 .await
605 .map_err(|e| FetcherError::Io(std::io::Error::other(e)))?,
606 CAHash::Text(_) => unreachable!("Snix bug: fetch returned CAHash::Text"),
607 };
608
609 let path_info = PathInfo {
611 store_path: store_path.to_owned(),
612 node: node.clone(),
613 references: vec![],
614 nar_size,
615 nar_sha256,
616 signatures: vec![],
617 deriver: None,
618 ca: Some(ca_hash),
619 };
620
621 self.path_info_service
622 .put(path_info.clone())
623 .await
624 .map_err(|e| FetcherError::Io(std::io::Error::other(e)))?;
625
626 Ok((store_path, path_info))
627 }
628}
629
#[cfg(test)]
mod tests {
    mod fetch {
        use super::super::*;
        use crate::fetchers::Fetch;
        use nix_compat::{nixbase32, nixhash::NixHash};
        use rstest::rstest;

        /// Checks the store path calculated upfront for different kinds of
        /// fetches against known-good values.
        #[rstest]
        #[case::url_no_hash(
            Fetch::URL{
                url: Url::parse("https://raw.githubusercontent.com/aaptel/notmuch-extract-patch/f732a53e12a7c91a06755ebfab2007adc9b3063b/notmuch-extract-patch").unwrap(),
                exp_hash: None,
            },
            None,
            "notmuch-extract-patch"
        )]
        #[case::url_sha256(
            Fetch::URL{
                url: Url::parse("https://raw.githubusercontent.com/aaptel/notmuch-extract-patch/f732a53e12a7c91a06755ebfab2007adc9b3063b/notmuch-extract-patch").unwrap(),
                exp_hash: Some(NixHash::from_sri("sha256-Xa1Jbl2Eq5+L0ww+Ph1osA3Z/Dxe/RkN1/dITQCdXFk=").unwrap()),
            },
            Some(StorePathRef::from_bytes(b"06qi00hylriyfm0nl827crgjvbax84mz-notmuch-extract-patch").unwrap()),
            "notmuch-extract-patch"
        )]
        #[case::url_custom_name(
            Fetch::URL{
                url: Url::parse("https://test.example/owo").unwrap(),
                exp_hash: Some(NixHash::from_sri("sha256-Xa1Jbl2Eq5+L0ww+Ph1osA3Z/Dxe/RkN1/dITQCdXFk=").unwrap()),
            },
            Some(StorePathRef::from_bytes(b"06qi00hylriyfm0nl827crgjvbax84mz-notmuch-extract-patch").unwrap()),
            "notmuch-extract-patch"
        )]
        #[case::nar_sha256(
            Fetch::NAR{
                url: Url::parse("https://cache.nixos.org/nar/0r8nqa1klm5v17ifc6z96m9wywxkjvgbnqq9pmy0sgqj53wj3n12.nar.xz").unwrap(),
                hash: NixHash::from_sri("sha256-oj6yfWKbcEerK8D9GdPJtIAOveNcsH1ztGeSARGypRA=").unwrap(),
            },
            Some(StorePathRef::from_bytes(b"b40vjphshq4fdgv8s3yrp0bdlafi4920-0r8nqa1klm5v17ifc6z96m9wywxkjvgbnqq9pmy0sgqj53wj3n12.nar.xz").unwrap()),
            "0r8nqa1klm5v17ifc6z96m9wywxkjvgbnqq9pmy0sgqj53wj3n12.nar.xz"
        )]
        #[case::nar_sha1(
            Fetch::NAR{
                url: Url::parse("https://cache.nixos.org/nar/0r8nqa1klm5v17ifc6z96m9wywxkjvgbnqq9pmy0sgqj53wj3n12.nar.xz").unwrap(),
                hash: NixHash::from_sri("sha1-F/fMsgwkXF8fPCg1v9zPZ4yOFIA=").unwrap(),
            },
            Some(StorePathRef::from_bytes(b"8kx7fdkdbzs4fkfb57xq0cbhs20ymq2n-0r8nqa1klm5v17ifc6z96m9wywxkjvgbnqq9pmy0sgqj53wj3n12.nar.xz").unwrap()),
            "0r8nqa1klm5v17ifc6z96m9wywxkjvgbnqq9pmy0sgqj53wj3n12.nar.xz"
        )]
        // This case exercises `Fetch::Executable`, so it gets its own name
        // instead of reusing `nar_sha1`.
        #[case::executable_sha1(
            Fetch::Executable{
                url: Url::parse("https://cache.nixos.org/nar/0r8nqa1klm5v17ifc6z96m9wywxkjvgbnqq9pmy0sgqj53wj3n12.nar.xz").unwrap(),
                hash: NixHash::from_sri("sha1-NKNeU1csW5YJ4lCeWH3Z/apppNU=").unwrap(),
            },
            Some(StorePathRef::from_bytes(b"y92hm2xfk1009hrq0ix80j4m5k4j4w21-0r8nqa1klm5v17ifc6z96m9wywxkjvgbnqq9pmy0sgqj53wj3n12.nar.xz").unwrap()),
            "0r8nqa1klm5v17ifc6z96m9wywxkjvgbnqq9pmy0sgqj53wj3n12.nar.xz"
        )]
        fn fetch_store_path(
            #[case] fetch: Fetch,
            #[case] exp_path: Option<StorePathRef>,
            #[case] name: &str,
        ) {
            assert_eq!(
                exp_path,
                fetch.store_path(name).expect("invalid name"),
                "unexpected calculated store path"
            );
        }

        /// A tarball fetch with a known NAR SHA-256 must yield the expected
        /// store path for the name `source`.
        #[test]
        fn fetch_tarball_store_path() {
            let url = Url::parse("https://github.com/NixOS/nixpkgs/archive/91050ea1e57e50388fa87a3302ba12d188ef723a.tar.gz").unwrap();
            let exp_sha256 =
                nixbase32::decode_fixed("1hf6cgaci1n186kkkjq106ryf8mmlq9vnwgfwh625wa8hfgdn4dm")
                    .unwrap();
            let fetch = Fetch::Tarball {
                url,
                exp_nar_sha256: Some(exp_sha256),
            };

            assert_eq!(
                "7adgvk5zdfq4pwrhsm3n9lzypb12gw0g-source",
                &fetch.store_path("source").unwrap().unwrap().to_string(),
            )
        }
    }
}
716}