use data_encoding::HEXLOWER;
use futures::TryStreamExt;
use md5::{Md5, digest::DynDigest};
use nix_compat::{
    nixhash::{CAHash, HashAlgo, NixHash},
    store_path::{BuildStorePathError, StorePathRef, build_ca_path},
};
use sha1::Sha1;
use sha2::{Digest, Sha256, Sha512, digest::Output};
use snix_castore::{Node, blobservice::BlobService, directoryservice::DirectoryService};
use snix_store::{
    nar::{NarCalculationService, NarIngestionError},
    pathinfoservice::{PathInfo, PathInfoService},
};
use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
use tokio_util::io::{InspectReader, InspectWriter};
use tracing::{Span, instrument, warn};
use tracing_indicatif::span_ext::IndicatifSpanExt;
use url::Url;

use crate::builtins::FetcherError;

mod decompression;
use decompression::DecompressedReader;

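/// Describes the kinds of fetches the fetcher supports.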
#[derive(Clone, Eq, PartialEq)]
pub enum Fetch {
    /// Fetch a file from the given URL, optionally verifying an expected
    /// hash over the flat file contents.
    URL {
        url: Url,
        exp_hash: Option<NixHash>,
    },

    /// Fetch an archive from the given URL and unpack it, optionally
    /// verifying an expected SHA-256 digest over the NAR representation of
    /// the unpacked contents.
    Tarball {
        url: Url,
        exp_nar_sha256: Option<[u8; 32]>,
    },

    /// Fetch a NAR file from the given URL and unpack it, verifying the
    /// given hash over its NAR representation.
    NAR {
        url: Url,
        hash: NixHash,
    },

    /// Fetch a single file from the given URL and mark it as executable,
    /// verifying the given hash over the NAR representation of the
    /// resulting store contents.
    Executable {
        url: Url,
        hash: NixHash,
    },

    /// Fetch from a git repository. Not implemented yet.
    Git(),
}

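/// Returns a copy of the given URL with any userinfo replaced by the string
/// "redacted", so credentials don't leak into logs or progress messages.
/// A minimal usage sketch (hypothetical values):
///
/// ```ignore
/// let url = Url::parse("https://user:hunter2@example.com/src.tar.gz").unwrap();
/// assert_eq!(
///     redact_url(&url).as_str(),
///     "https://redacted:redacted@example.com/src.tar.gz"
/// );
/// ```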
fn redact_url(url: &Url) -> Url {
    let mut url = url.to_owned();
    if !url.username().is_empty() {
        let _ = url.set_username("redacted");
    }

    if url.password().is_some() {
        let _ = url.set_password(Some("redacted"));
    }

    url
}

impl std::fmt::Debug for Fetch {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Fetch::URL { url, exp_hash } => {
                let url = redact_url(url);
                if let Some(exp_hash) = exp_hash {
                    write!(f, "URL [url: {}, exp_hash: Some({})]", &url, exp_hash)
                } else {
                    write!(f, "URL [url: {}, exp_hash: None]", &url)
                }
            }
            Fetch::Tarball {
                url,
                exp_nar_sha256,
            } => {
                let url = redact_url(url);
                if let Some(exp_nar_sha256) = exp_nar_sha256 {
                    write!(
                        f,
                        "Tarball [url: {}, exp_nar_sha256: Some({})]",
                        url,
                        NixHash::Sha256(*exp_nar_sha256)
                    )
                } else {
                    write!(f, "Tarball [url: {}, exp_nar_sha256: None]", url)
                }
            }
            Fetch::NAR { url, hash } => {
                let url = redact_url(url);
                write!(f, "NAR [url: {}, hash: {}]", &url, hash)
            }
            Fetch::Executable { url, hash } => {
                let url = redact_url(url);
                write!(f, "Executable [url: {}, hash: {}]", &url, hash)
            }
            Fetch::Git() => todo!(),
        }
    }
}

impl Fetch {
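    /// If the fetch is fully determined by an expected hash (fixed-output),
    /// the store path can be computed without fetching anything: this returns
    /// `Ok(Some(_))` in that case, and `Ok(None)` if no expected hash is
    /// known upfront. A minimal usage sketch (name and URL are hypothetical):
    ///
    /// ```ignore
    /// let fetch = Fetch::URL {
    ///     url: Url::parse("https://example.com/foo.patch").unwrap(),
    ///     exp_hash: Some(NixHash::from_sri("sha256-Xa1Jbl2Eq5+L0ww+Ph1osA3Z/Dxe/RkN1/dITQCdXFk=").unwrap()),
    /// };
    /// let store_path = fetch.store_path("foo.patch").unwrap().unwrap();
    /// ```
    ///
    /// Concrete expected values are covered by the tests at the bottom of
    /// this file.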
    pub fn store_path<'a>(
        &self,
        name: &'a str,
    ) -> Result<Option<StorePathRef<'a>>, BuildStorePathError> {
        let ca_hash = match self {
            Fetch::URL {
                exp_hash: Some(exp_hash),
                ..
            } => CAHash::Flat(exp_hash.clone()),

            Fetch::Tarball {
                exp_nar_sha256: Some(exp_nar_sha256),
                ..
            } => CAHash::Nar(NixHash::Sha256(*exp_nar_sha256)),

            Fetch::NAR { hash, .. } | Fetch::Executable { hash, .. } => {
                CAHash::Nar(hash.to_owned())
            }

            Fetch::Git() => todo!(),

            // everything else is not determined by an expected hash upfront.
            Fetch::URL { exp_hash: None, .. }
            | Fetch::Tarball {
                exp_nar_sha256: None,
                ..
            } => return Ok(None),
        };

        // calculate the store path of this fetch
        build_ca_path(name, &ca_hash, Vec::<String>::new(), false).map(Some)
    }
}

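/// Knows how to fetch a given [Fetch], ingesting the result into the
/// underlying blob, directory and path info services.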
pub struct Fetcher<BS, DS, PS, NS> {
    http_client: reqwest::Client,
    blob_service: BS,
    directory_service: DS,
    path_info_service: PS,
    nar_calculation_service: NS,
    hashed_mirrors: Vec<Url>,
}

impl<BS, DS, PS, NS> Fetcher<BS, DS, PS, NS> {
    pub fn new(
        blob_service: BS,
        directory_service: DS,
        path_info_service: PS,
        nar_calculation_service: NS,
        hashed_mirrors: Vec<Url>,
    ) -> Self {
        Self {
            http_client: reqwest::Client::builder()
                .user_agent(crate::USER_AGENT)
                .build()
                .expect("Client::new()"),
            blob_service,
            directory_service,
            path_info_service,
            nar_calculation_service,
            hashed_mirrors,
        }
    }

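    /// Opens the specified URL for reading and returns an AsyncBufRead over
    /// its contents. file:// URLs are opened via tokio::fs; everything else
    /// is handed to the HTTP client.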
    async fn do_download(
        &self,
        url: Url,
    ) -> Result<Box<dyn AsyncBufRead + Unpin + Send>, FetcherError> {
        let span = Span::current();
        match url.scheme() {
            "file" => {
                let f = tokio::fs::File::open(url.to_file_path().map_err(|_| {
                    // to_file_path() fails if the URL has a non-local host.
                    FetcherError::Io(std::io::Error::new(
                        std::io::ErrorKind::InvalidData,
                        "invalid host for file:// scheme",
                    ))
                })?)
                .await?;

                span.pb_set_length(f.metadata().await?.len());
                span.pb_set_style(&snix_tracing::PB_TRANSFER_STYLE);
                span.pb_start();
                Ok(Box::new(tokio::io::BufReader::new(InspectReader::new(
                    f,
                    move |d| {
                        span.pb_inc(d.len() as u64);
                    },
                ))))
            }
            _ => {
                let resp = self.http_client.get(url.clone()).send().await?;
                if !resp.status().is_success() {
                    // Map HTTP status codes to the closest std::io::ErrorKind.
                    use reqwest::StatusCode;
                    use std::io::ErrorKind;
                    let kind = match resp.status() {
                        StatusCode::BAD_REQUEST
                        | StatusCode::NOT_ACCEPTABLE
                        | StatusCode::URI_TOO_LONG => ErrorKind::InvalidData,
                        StatusCode::FORBIDDEN
                        | StatusCode::UNAUTHORIZED
                        | StatusCode::NETWORK_AUTHENTICATION_REQUIRED => {
                            ErrorKind::PermissionDenied
                        }
                        StatusCode::NOT_FOUND | StatusCode::GONE => ErrorKind::NotFound,
                        StatusCode::METHOD_NOT_ALLOWED => ErrorKind::Unsupported,
                        StatusCode::REQUEST_TIMEOUT | StatusCode::GATEWAY_TIMEOUT => {
                            ErrorKind::TimedOut
                        }
                        StatusCode::TOO_MANY_REQUESTS => ErrorKind::QuotaExceeded,
                        StatusCode::BAD_GATEWAY | StatusCode::SERVICE_UNAVAILABLE => {
                            ErrorKind::ResourceBusy
                        }
                        _ => ErrorKind::Other,
                    };
                    return Err(FetcherError::Io(std::io::Error::new(
                        kind,
                        format!("unable to download '{}': {}", url, resp.status()),
                    )));
                }

                // If the server provided a Content-Length, use it for the
                // progress bar; otherwise the bar has no known length.
                span.pb_set_style(&snix_tracing::PB_TRANSFER_STYLE);
                if let Some(content_length) = resp.content_length() {
                    span.pb_set_length(content_length);
                }
                span.pb_start();

                Ok(Box::new(tokio_util::io::StreamReader::new(
                    resp.bytes_stream()
                        .inspect_ok(move |d| {
                            span.pb_inc(d.len() as u64);
                        })
                        .map_err(|e| {
                            let e = e.without_url();
                            warn!(%e, "failed to get response body");
                            std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)
                        }),
                )))
            }
        }
    }

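    /// Downloads from the specified URL and returns a reader over the data.
    /// If the expected hash is known upfront, the configured hashed mirrors
    /// are tried first, falling back to the original URL if none of them
    /// succeed.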
    #[instrument(skip_all, fields(url, indicatif.pb_show=tracing::field::Empty), err)]
    async fn download(
        &self,
        url: Url,
        exp_hash: Option<&NixHash>,
    ) -> Result<Box<dyn AsyncBufRead + Unpin + Send>, FetcherError> {
        let span = Span::current();
        span.pb_set_message(&format!("📡 Fetching {}", redact_url(&url)));
        if let Some(hash) = exp_hash {
            // Hashed mirrors expose contents at $mirror/$algo/$digest_hex.
            let urls = self.hashed_mirrors.iter().map(|u| {
                u.join(&format!(
                    "{}/{}",
                    hash.algo(),
                    HEXLOWER.encode(hash.digest_as_bytes())
                ))
                .expect("Snix bug!")
            });
            // Try each hashed mirror in order, ignoring individual failures.
            for url in urls {
                if let Ok(result) = self.do_download(url).await {
                    return Ok(result);
                }
            }
        }
        // Fall back to (or start with) the original URL.
        self.do_download(url).await
    }
}

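/// Reads data from the passed reader, writes it to the passed writer, and
/// hashes it with the digest function `D` on the way through. Returns the
/// final digest together with the number of bytes copied.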
async fn hash<D: Digest + std::io::Write>(
    mut r: impl AsyncRead + Unpin,
    mut w: impl AsyncWrite + Unpin,
) -> std::io::Result<(Output<D>, u64)> {
    let mut hasher = D::new();
    let bytes_copied = tokio::io::copy(
        &mut InspectReader::new(&mut r, |d| hasher.write_all(d).unwrap()),
        &mut w,
    )
    .await?;
    Ok((hasher.finalize(), bytes_copied))
}

impl<BS, DS, PS, NS> Fetcher<BS, DS, PS, NS>
where
    BS: BlobService + Clone + 'static,
    DS: DirectoryService + Clone,
    PS: PathInfoService,
    NS: NarCalculationService,
{
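    /// Ingests the data from the specified [Fetch].
    /// On success, returns the root node, a [CAHash] and a size (the blob
    /// size for flat fetches, the NAR size for NAR-based ones). Returns an
    /// error if the fetch or the hash verification failed.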
    pub async fn ingest(&self, fetch: Fetch) -> Result<(Node, CAHash, u64), FetcherError> {
        match fetch {
            Fetch::URL { url, exp_hash } => {
                // Fetch the file from the URL.
                let mut r = self.download(url.clone(), exp_hash.as_ref()).await?;

                // Construct a writer into the blob service.
                let mut blob_writer = self.blob_service.open_write().await;

                // Copy the data into the blob writer, hashing it on the way
                // through with the algo of the expected hash. If no hash is
                // expected, default to SHA-256, which is what the returned
                // CAHash::Flat will then use.
                let (actual_hash, blob_size) = match exp_hash
                    .as_ref()
                    .map(NixHash::algo)
                    .unwrap_or(HashAlgo::Sha256)
                {
                    HashAlgo::Sha256 => hash::<Sha256>(&mut r, &mut blob_writer).await.map(
                        |(digest, bytes_written)| (NixHash::Sha256(digest.into()), bytes_written),
                    )?,
                    HashAlgo::Md5 => hash::<Md5>(&mut r, &mut blob_writer).await.map(
                        |(digest, bytes_written)| (NixHash::Md5(digest.into()), bytes_written),
                    )?,
                    HashAlgo::Sha1 => hash::<Sha1>(&mut r, &mut blob_writer).await.map(
                        |(digest, bytes_written)| (NixHash::Sha1(digest.into()), bytes_written),
                    )?,
                    HashAlgo::Sha512 => hash::<Sha512>(&mut r, &mut blob_writer).await.map(
                        |(digest, bytes_written)| {
                            (NixHash::Sha512(Box::new(digest.into())), bytes_written)
                        },
                    )?,
                };

                // Ensure the actual hash matches the expected one, if any.
                if let Some(exp_hash) = exp_hash {
                    if exp_hash != actual_hash {
                        return Err(FetcherError::HashMismatch {
                            url,
                            wanted: exp_hash,
                            got: actual_hash,
                        });
                    }
                }

                // Construct and return the file node.
                Ok((
                    Node::File {
                        digest: blob_writer.close().await?,
                        size: blob_size,
                        executable: false,
                    },
                    CAHash::Flat(actual_hash),
                    blob_size,
                ))
            }
            Fetch::Tarball {
                url,
                exp_nar_sha256,
            } => {
                // Fetch the archive from the URL.
                let r = self.download(url.clone(), None).await?;

                // Pop compression, if any, and hand the reader to the
                // tarball unpacker.
                let r = DecompressedReader::new(r);
                let archive = tokio_tar::Archive::new(r);

                // Ingest the archive contents into the castore.
                let node = snix_castore::import::archive::ingest_archive(
                    self.blob_service.clone(),
                    self.directory_service.clone(),
                    archive,
                )
                .await?;

                // Calculate the NAR size and SHA-256 of the unpacked contents.
                let (nar_size, actual_nar_sha256) = self
                    .nar_calculation_service
                    .calculate_nar(&node)
                    .await
                    .map_err(|e| FetcherError::Io(e.into()))?;

                // Ensure the NAR hash matches the expected one, if any.
                if let Some(exp_nar_sha256) = exp_nar_sha256 {
                    if exp_nar_sha256 != actual_nar_sha256 {
                        return Err(FetcherError::HashMismatch {
                            url,
                            wanted: NixHash::Sha256(exp_nar_sha256),
                            got: NixHash::Sha256(actual_nar_sha256),
                        });
                    }
                }

                Ok((
                    node,
                    CAHash::Nar(NixHash::Sha256(actual_nar_sha256)),
                    nar_size,
                ))
            }
            Fetch::NAR {
                url,
                hash: exp_hash,
            } => {
                // Fetch the NAR file from the URL.
                let r = self.download(url.clone(), Some(&exp_hash)).await?;

                // Pop compression, if any.
                let mut r = DecompressedReader::new(r);

                // Ingest the NAR, verifying the expected hash on the way through.
                let (root_node, _actual_nar_sha256, actual_nar_size) =
                    snix_store::nar::ingest_nar_and_hash(
                        self.blob_service.clone(),
                        self.directory_service.clone(),
                        &mut r,
                        &Some(CAHash::Nar(exp_hash.clone())),
                    )
                    .await
                    .map_err(|e| match e {
                        NarIngestionError::HashMismatch { expected, actual } => {
                            FetcherError::HashMismatch {
                                url,
                                wanted: expected,
                                got: actual,
                            }
                        }
                        _ => FetcherError::Io(std::io::Error::other(e.to_string())),
                    })?;
                Ok((
                    root_node,
                    // Use the expected hash; ingestion already verified it matches.
                    CAHash::Nar(exp_hash),
                    actual_nar_size,
                ))
            }
            Fetch::Executable {
                url,
                hash: exp_hash,
            } => {
                // Fetch the file from the URL.
                let mut r = self.download(url.clone(), Some(&exp_hash)).await?;

                // Construct a writer into the blob service.
                let mut blob_writer = self.blob_service.open_write().await;

                // Copy the contents into the blob writer, recording the file size.
                let file_size = tokio::io::copy(&mut r, &mut blob_writer).await?;
                let blob_digest = blob_writer.close().await?;

                // Render the NAR representation of the executable file into a
                // sink, hashing it on the way through with the same algo as
                // the expected hash.
                let w = tokio::io::sink();
                // Construct the hash function.
                let mut hasher: Box<dyn DynDigest + Send> = match exp_hash.algo() {
                    HashAlgo::Md5 => Box::new(Md5::new()),
                    HashAlgo::Sha1 => Box::new(Sha1::new()),
                    HashAlgo::Sha256 => Box::new(Sha256::new()),
                    HashAlgo::Sha512 => Box::new(Sha512::new()),
                };

                let mut nar_size: u64 = 0;
                let mut w = InspectWriter::new(w, |d| {
                    hasher.update(d);
                    nar_size += d.len() as u64;
                });

                {
                    let node = nix_compat::nar::writer::r#async::open(&mut w).await?;

                    let blob_reader = self
                        .blob_service
                        .open_read(&blob_digest)
                        .await?
                        .expect("Snix bug: just-uploaded blob not found");

                    node.file(true, file_size, &mut BufReader::new(blob_reader))
                        .await?;

                    w.flush().await?;
                }

                // Finalize the hasher and assemble the actual hash.
                let actual_hash = {
                    match exp_hash.algo() {
                        HashAlgo::Md5 => {
                            NixHash::Md5(hasher.finalize().to_vec().try_into().unwrap())
                        }
                        HashAlgo::Sha1 => {
                            NixHash::Sha1(hasher.finalize().to_vec().try_into().unwrap())
                        }
                        HashAlgo::Sha256 => {
                            NixHash::Sha256(hasher.finalize().to_vec().try_into().unwrap())
                        }
                        HashAlgo::Sha512 => {
                            NixHash::Sha512(hasher.finalize().to_vec().try_into().unwrap())
                        }
                    }
                };

                if exp_hash != actual_hash {
                    return Err(FetcherError::HashMismatch {
                        url,
                        wanted: exp_hash,
                        got: actual_hash,
                    });
                }

                // Construct the file node, this time marked as executable.
                let root_node = Node::File {
                    digest: blob_digest,
                    size: file_size,
                    executable: true,
                };

                // Return the NAR size, matching the CAHash::Nar above (the
                // CAHash::Nar(Sha256) arm of ingest_and_persist reuses it as
                // the nar_size).
                Ok((root_node, CAHash::Nar(actual_hash), nar_size))
            }
            Fetch::Git() => todo!(),
        }
    }

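    /// Ingests the data from the specified [Fetch], persists the returned
    /// [PathInfo] in the path info service, and returns the calculated
    /// store path together with that [PathInfo].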
    pub async fn ingest_and_persist<'a>(
        &self,
        name: &'a str,
        fetch: Fetch,
    ) -> Result<(StorePathRef<'a>, PathInfo), FetcherError> {
        // Fetch and ingest the contents.
        let (node, ca_hash, size) = self.ingest(fetch).await?;

        // Calculate the store path.
        let store_path = build_ca_path(name, &ca_hash, Vec::<String>::new(), false)?;

        // If the fetch was NAR-based with a SHA-256 digest, we already know
        // the NAR size and hash; otherwise ask the NAR calculation service.
        let (nar_size, nar_sha256) = match &ca_hash {
            CAHash::Nar(NixHash::Sha256(nar_sha256)) => (size, *nar_sha256),
            CAHash::Nar(_) | CAHash::Flat(_) => self
                .nar_calculation_service
                .calculate_nar(&node)
                .await
                .map_err(|e| FetcherError::Io(e.into()))?,
            CAHash::Text(_) => unreachable!("Snix bug: fetch returned CAHash::Text"),
        };

        // Assemble the PathInfo and persist it.
        let path_info = PathInfo {
            store_path: store_path.to_owned(),
            node: node.clone(),
            references: vec![],
            nar_size,
            nar_sha256,
            signatures: vec![],
            deriver: None,
            ca: Some(ca_hash),
        };

        self.path_info_service
            .put(path_info.clone())
            .await
            .map_err(|e| FetcherError::Io(e.into()))?;

        Ok((store_path, path_info))
    }
}

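/// Returns the last path component of the given URL's path, treating a
/// single trailing slash as if it weren't there. A couple of expected
/// behaviours (also covered by the tests below):
///
/// ```ignore
/// assert_eq!(url_basename(&Url::parse("http://localhost/dir/foo").unwrap()), "foo");
/// assert_eq!(url_basename(&Url::parse("http://localhost/dir/").unwrap()), "dir");
/// ```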
pub(crate) fn url_basename(url: &Url) -> &str {
    let s = url.path();
    if s.is_empty() {
        return "";
    }

    // Strip a single trailing slash, if any.
    let mut last = s.len() - 1;
    if s.as_bytes()[last] == b'/' && last > 0 {
        last -= 1;
    }

    if last == 0 {
        return "";
    }

    // Find the last slash before `last` and cut everything up to it.
    let pos = match s[..=last].rfind('/') {
        Some(pos) => {
            if pos == last - 1 {
                0
            } else {
                pos
            }
        }
        None => 0,
    };

    &s[(pos + 1)..=last]
}


#[cfg(test)]
mod tests {
    mod fetch {
        use super::super::*;
        use crate::fetchers::Fetch;
        use nix_compat::{nixbase32, nixhash::NixHash};
        use rstest::rstest;

        #[rstest]
        #[case::url_no_hash(
            Fetch::URL{
                url: Url::parse("https://raw.githubusercontent.com/aaptel/notmuch-extract-patch/f732a53e12a7c91a06755ebfab2007adc9b3063b/notmuch-extract-patch").unwrap(),
                exp_hash: None,
            },
            None,
            "notmuch-extract-patch"
        )]
        #[case::url_sha256(
            Fetch::URL{
                url: Url::parse("https://raw.githubusercontent.com/aaptel/notmuch-extract-patch/f732a53e12a7c91a06755ebfab2007adc9b3063b/notmuch-extract-patch").unwrap(),
                exp_hash: Some(NixHash::from_sri("sha256-Xa1Jbl2Eq5+L0ww+Ph1osA3Z/Dxe/RkN1/dITQCdXFk=").unwrap()),
            },
            Some(StorePathRef::from_bytes(b"06qi00hylriyfm0nl827crgjvbax84mz-notmuch-extract-patch").unwrap()),
            "notmuch-extract-patch"
        )]
        #[case::url_custom_name(
            Fetch::URL{
                url: Url::parse("https://test.example/owo").unwrap(),
                exp_hash: Some(NixHash::from_sri("sha256-Xa1Jbl2Eq5+L0ww+Ph1osA3Z/Dxe/RkN1/dITQCdXFk=").unwrap()),
            },
            Some(StorePathRef::from_bytes(b"06qi00hylriyfm0nl827crgjvbax84mz-notmuch-extract-patch").unwrap()),
            "notmuch-extract-patch"
        )]
        #[case::nar_sha256(
            Fetch::NAR{
                url: Url::parse("https://cache.nixos.org/nar/0r8nqa1klm5v17ifc6z96m9wywxkjvgbnqq9pmy0sgqj53wj3n12.nar.xz").unwrap(),
                hash: NixHash::from_sri("sha256-oj6yfWKbcEerK8D9GdPJtIAOveNcsH1ztGeSARGypRA=").unwrap(),
            },
            Some(StorePathRef::from_bytes(b"b40vjphshq4fdgv8s3yrp0bdlafi4920-0r8nqa1klm5v17ifc6z96m9wywxkjvgbnqq9pmy0sgqj53wj3n12.nar.xz").unwrap()),
            "0r8nqa1klm5v17ifc6z96m9wywxkjvgbnqq9pmy0sgqj53wj3n12.nar.xz"
        )]
        #[case::nar_sha1(
            Fetch::NAR{
                url: Url::parse("https://cache.nixos.org/nar/0r8nqa1klm5v17ifc6z96m9wywxkjvgbnqq9pmy0sgqj53wj3n12.nar.xz").unwrap(),
                hash: NixHash::from_sri("sha1-F/fMsgwkXF8fPCg1v9zPZ4yOFIA=").unwrap(),
            },
            Some(StorePathRef::from_bytes(b"8kx7fdkdbzs4fkfb57xq0cbhs20ymq2n-0r8nqa1klm5v17ifc6z96m9wywxkjvgbnqq9pmy0sgqj53wj3n12.nar.xz").unwrap()),
            "0r8nqa1klm5v17ifc6z96m9wywxkjvgbnqq9pmy0sgqj53wj3n12.nar.xz"
        )]
        #[case::executable_sha1(
            Fetch::Executable{
                url: Url::parse("https://cache.nixos.org/nar/0r8nqa1klm5v17ifc6z96m9wywxkjvgbnqq9pmy0sgqj53wj3n12.nar.xz").unwrap(),
                hash: NixHash::from_sri("sha1-NKNeU1csW5YJ4lCeWH3Z/apppNU=").unwrap(),
            },
            Some(StorePathRef::from_bytes(b"y92hm2xfk1009hrq0ix80j4m5k4j4w21-0r8nqa1klm5v17ifc6z96m9wywxkjvgbnqq9pmy0sgqj53wj3n12.nar.xz").unwrap()),
            "0r8nqa1klm5v17ifc6z96m9wywxkjvgbnqq9pmy0sgqj53wj3n12.nar.xz"
        )]
        fn fetch_store_path(
            #[case] fetch: Fetch,
            #[case] exp_path: Option<StorePathRef>,
            #[case] name: &str,
        ) {
            assert_eq!(
                exp_path,
                fetch.store_path(name).expect("invalid name"),
                "unexpected calculated store path"
            );
        }

        #[test]
        fn fetch_tarball_store_path() {
            let url = Url::parse("https://github.com/NixOS/nixpkgs/archive/91050ea1e57e50388fa87a3302ba12d188ef723a.tar.gz").unwrap();
            let exp_sha256 =
                nixbase32::decode_fixed("1hf6cgaci1n186kkkjq106ryf8mmlq9vnwgfwh625wa8hfgdn4dm")
                    .unwrap();
            let fetch = Fetch::Tarball {
                url,
                exp_nar_sha256: Some(exp_sha256),
            };

            assert_eq!(
                "7adgvk5zdfq4pwrhsm3n9lzypb12gw0g-source",
                &fetch.store_path("source").unwrap().unwrap().to_string(),
            )
        }
    }

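    // A small additional check for redact_url, sketching the expected
    // behaviour for URLs with and without userinfo (the URLs themselves are
    // hypothetical).
    mod redact_url {
        use super::super::*;

        #[test]
        fn redacts_userinfo() {
            let url = Url::parse("https://user:hunter2@example.com/src.tar.gz")
                .expect("invalid url");
            assert_eq!(
                redact_url(&url).as_str(),
                "https://redacted:redacted@example.com/src.tar.gz"
            );

            // URLs without userinfo pass through unchanged.
            let url = Url::parse("https://example.com/src.tar.gz").expect("invalid url");
            assert_eq!(redact_url(&url).as_str(), "https://example.com/src.tar.gz");
        }
    }
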
    mod url_basename {
        use super::super::*;
        use rstest::rstest;

        #[rstest]
        #[case::empty_path("", "")]
        #[case::path_on_root("/dir", "dir")]
        #[case::relative_path("dir/foo", "foo")]
        #[case::root_with_trailing_slash("/", "")]
        #[case::trailing_slash("/dir/", "dir")]
        fn test_url_basename(#[case] url_path: &str, #[case] exp_basename: &str) {
            let mut url = Url::parse("http://localhost").expect("invalid url");
            url.set_path(url_path);
            assert_eq!(url_basename(&url), exp_basename);
        }
    }
}