1use data_encoding::HEXLOWER;
2use futures::TryStreamExt;
3use md5::{Md5, digest::DynDigest};
4use nix_compat::{
5 hashing::hash,
6 nixhash::{CAHash, HashAlgo, NixHash},
7 store_path::{BuildStorePathError, StorePathRef, build_ca_path},
8};
9use sha1::Sha1;
10use sha2::{Digest, Sha256, Sha512};
11use snix_castore::{Node, blobservice::BlobService, directoryservice::DirectoryService};
12use snix_store::{
13 decompression::DecompressedReader,
14 nar::{NarCalculationService, NarIngestionError},
15 pathinfoservice::{PathInfo, PathInfoService},
16};
17use tokio::io::{AsyncBufRead, AsyncWriteExt, BufReader};
18use tokio_util::io::{InspectReader, InspectWriter};
19use tracing::{Span, instrument, warn};
20use tracing_indicatif::span_ext::IndicatifSpanExt;
21use url::Url;
22
23use crate::builtins::FetcherError;
24
25#[derive(Clone, Eq, PartialEq)]
27pub enum Fetch {
28 URL {
31 url: Url,
33 exp_hash: Option<NixHash>,
35 },
36
37 Tarball {
45 url: Url,
47 exp_nar_sha256: Option<[u8; 32]>,
49 },
50
51 NAR {
54 url: Url,
56 hash: NixHash,
59 },
60
61 Executable {
69 url: Url,
71 hash: NixHash,
74 },
75
76 Git(),
78}
79
80fn redact_url(url: &Url) -> Url {
82 let mut url = url.to_owned();
83 if !url.username().is_empty() {
84 let _ = url.set_username("redacted");
85 }
86
87 if url.password().is_some() {
88 let _ = url.set_password(Some("redacted"));
89 }
90
91 url
92}
93
94impl std::fmt::Debug for Fetch {
95 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 match self {
97 Fetch::URL { url, exp_hash } => {
98 let url = redact_url(url);
99 if let Some(exp_hash) = exp_hash {
100 write!(f, "URL [url: {}, exp_hash: Some({})]", &url, exp_hash)
101 } else {
102 write!(f, "URL [url: {}, exp_hash: None]", &url)
103 }
104 }
105 Fetch::Tarball {
106 url,
107 exp_nar_sha256,
108 } => {
109 let url = redact_url(url);
110 if let Some(exp_nar_sha256) = exp_nar_sha256 {
111 write!(
112 f,
113 "Tarball [url: {}, exp_nar_sha256: Some({})]",
114 url,
115 NixHash::Sha256(*exp_nar_sha256)
116 )
117 } else {
118 write!(f, "Tarball [url: {url}, exp_hash: None]")
119 }
120 }
121 Fetch::NAR { url, hash } => {
122 let url = redact_url(url);
123 write!(f, "NAR [url: {}, hash: {}]", &url, hash)
124 }
125 Fetch::Executable { url, hash } => {
126 let url = redact_url(url);
127 write!(f, "Executable [url: {}, hash: {}]", &url, hash)
128 }
129 Fetch::Git() => todo!(),
130 }
131 }
132}
133
134impl Fetch {
135 pub fn store_path<'a>(
139 &self,
140 name: &'a str,
141 ) -> Result<Option<StorePathRef<'a>>, BuildStorePathError> {
142 let ca_hash = match self {
143 Fetch::URL {
144 exp_hash: Some(exp_hash),
145 ..
146 } => CAHash::Flat(exp_hash.clone()),
147
148 Fetch::Tarball {
149 exp_nar_sha256: Some(exp_nar_sha256),
150 ..
151 } => CAHash::Nar(NixHash::Sha256(*exp_nar_sha256)),
152
153 Fetch::NAR { hash, .. } | Fetch::Executable { hash, .. } => {
154 CAHash::Nar(hash.to_owned())
155 }
156
157 Fetch::Git() => todo!(),
158
159 Fetch::URL { exp_hash: None, .. }
161 | Fetch::Tarball {
162 exp_nar_sha256: None,
163 ..
164 } => return Ok(None),
165 };
166
167 build_ca_path(name, &ca_hash, [], false).map(Some)
169 }
170}
171
172pub struct Fetcher<BS, DS, PS, NS> {
174 http_client: reqwest::Client,
175 blob_service: BS,
176 directory_service: DS,
177 path_info_service: PS,
178 nar_calculation_service: NS,
179 hashed_mirrors: Vec<Url>,
180}
181
182impl<BS, DS, PS, NS> Fetcher<BS, DS, PS, NS> {
183 pub fn new(
184 blob_service: BS,
185 directory_service: DS,
186 path_info_service: PS,
187 nar_calculation_service: NS,
188 hashed_mirrors: Vec<Url>,
189 ) -> Self {
190 Self {
191 http_client: reqwest::Client::builder()
192 .user_agent(crate::USER_AGENT)
193 .build()
194 .expect("Client::new()"),
195 blob_service,
196 directory_service,
197 path_info_service,
198 nar_calculation_service,
199 hashed_mirrors,
200 }
201 }
202
203 async fn do_download(
207 &self,
208 url: Url,
209 ) -> Result<Box<dyn AsyncBufRead + Unpin + Send>, FetcherError> {
210 let span = Span::current();
211 match url.scheme() {
212 "file" => {
213 let f = tokio::fs::File::open(url.to_file_path().map_err(|_| {
214 FetcherError::Io(std::io::Error::new(
217 std::io::ErrorKind::InvalidData,
218 "invalid host for file:// scheme",
219 ))
220 })?)
221 .await?;
222
223 span.pb_set_length(f.metadata().await?.len());
224 span.pb_set_style(&snix_tracing::PB_TRANSFER_STYLE);
225 span.pb_start();
226 Ok(Box::new(tokio::io::BufReader::new(InspectReader::new(
227 f,
228 move |d| {
229 span.pb_inc(d.len() as u64);
230 },
231 ))))
232 }
233 _ => {
234 let resp = self.http_client.get(url.clone()).send().await?;
235 if !resp.status().is_success() {
236 use reqwest::StatusCode;
237 use std::io::ErrorKind;
238 let kind = match resp.status() {
239 StatusCode::BAD_REQUEST
240 | StatusCode::NOT_ACCEPTABLE
241 | StatusCode::URI_TOO_LONG => ErrorKind::InvalidData,
242 StatusCode::FORBIDDEN
243 | StatusCode::UNAUTHORIZED
244 | StatusCode::NETWORK_AUTHENTICATION_REQUIRED => {
245 ErrorKind::PermissionDenied
246 }
247 StatusCode::NOT_FOUND | StatusCode::GONE => ErrorKind::NotFound,
248 StatusCode::METHOD_NOT_ALLOWED => ErrorKind::Unsupported,
249 StatusCode::REQUEST_TIMEOUT | StatusCode::GATEWAY_TIMEOUT => {
250 ErrorKind::TimedOut
251 }
252 StatusCode::TOO_MANY_REQUESTS => ErrorKind::QuotaExceeded,
253 StatusCode::BAD_GATEWAY | StatusCode::SERVICE_UNAVAILABLE => {
254 ErrorKind::ResourceBusy
255 }
256 _ => ErrorKind::Other,
257 };
258 return Err(FetcherError::Io(std::io::Error::new(
259 kind,
260 format!("unable to download '{}': {}", url, resp.status()),
261 )));
262 }
263
264 if let Some(content_length) = resp.content_length() {
265 span.pb_set_length(content_length);
266 span.pb_set_style(&snix_tracing::PB_TRANSFER_STYLE);
267 } else {
268 span.pb_set_style(&snix_tracing::PB_TRANSFER_STYLE);
269 }
270 span.pb_start();
271
272 Ok(Box::new(tokio_util::io::StreamReader::new(
273 resp.bytes_stream()
274 .inspect_ok(move |d| {
275 span.pb_inc(d.len() as u64);
276 })
277 .map_err(|e| {
278 let e = e.without_url();
279 warn!(%e, "failed to get response body");
280 std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)
281 }),
282 )))
283 }
284 }
285 }
286
287 #[instrument(skip_all, fields(url, indicatif.pb_show=tracing::field::Empty), err)]
294 async fn download(
295 &self,
296 url: Url,
297 exp_hash: Option<&NixHash>,
298 ) -> Result<Box<dyn AsyncBufRead + Unpin + Send>, FetcherError> {
299 let span = Span::current();
300 span.pb_set_message(&format!(
301 "📡Fetching {}",
302 redact_url(&url)
304 ));
305 if let Some(hash) = exp_hash {
306 let urls = self.hashed_mirrors.iter().map(|u| {
307 u.join(&format!(
308 "{}/{}",
309 hash.algo(),
310 HEXLOWER.encode(hash.digest_as_bytes())
311 ))
312 .expect("Snix bug!")
315 });
316 for url in urls {
317 if let Ok(result) = self.do_download(url).await {
318 return Ok(result);
319 }
320 }
321 }
322 self.do_download(url).await
323 }
324}
325
326impl<BS, DS, PS, NS> Fetcher<BS, DS, PS, NS>
327where
328 BS: BlobService + Clone + 'static,
329 DS: DirectoryService + Clone,
330 PS: PathInfoService,
331 NS: NarCalculationService,
332{
333 pub async fn ingest(&self, fetch: Fetch) -> Result<(Node, CAHash, u64), FetcherError> {
338 match fetch {
339 Fetch::URL { url, exp_hash } => {
340 let mut r = self.download(url.clone(), exp_hash.as_ref()).await?;
342
343 let mut blob_writer = self.blob_service.open_write().await;
345
346 let (actual_hash, blob_size) = match exp_hash
350 .as_ref()
351 .map(NixHash::algo)
352 .unwrap_or_else(|| HashAlgo::Sha256)
353 {
354 HashAlgo::Sha256 => hash::<Sha256>(&mut r, &mut blob_writer).await.map(
355 |(digest, bytes_written)| (NixHash::Sha256(digest.into()), bytes_written),
356 )?,
357 HashAlgo::Md5 => hash::<Md5>(&mut r, &mut blob_writer).await.map(
358 |(digest, bytes_written)| (NixHash::Md5(digest.into()), bytes_written),
359 )?,
360 HashAlgo::Sha1 => hash::<Sha1>(&mut r, &mut blob_writer).await.map(
361 |(digest, bytes_written)| (NixHash::Sha1(digest.into()), bytes_written),
362 )?,
363 HashAlgo::Sha512 => hash::<Sha512>(&mut r, &mut blob_writer).await.map(
364 |(digest, bytes_written)| {
365 (NixHash::Sha512(Box::new(digest.into())), bytes_written)
366 },
367 )?,
368 };
369
370 if let Some(exp_hash) = exp_hash
371 && exp_hash != actual_hash
372 {
373 return Err(FetcherError::HashMismatch {
374 url,
375 wanted: exp_hash,
376 got: actual_hash,
377 });
378 }
379
380 Ok((
382 Node::File {
383 digest: blob_writer.close().await?,
384 size: blob_size,
385 executable: false,
386 },
387 CAHash::Flat(actual_hash),
388 blob_size,
389 ))
390 }
391 Fetch::Tarball {
392 url,
393 exp_nar_sha256,
394 } => {
395 let r = self.download(url.clone(), None).await?;
398
399 let r = DecompressedReader::new(r).await?;
401
402 let node = snix_castore::import::archive::ingest_archive(
404 self.blob_service.clone(),
405 self.directory_service.clone(),
406 r,
407 )
408 .await?;
409
410 let (nar_size, actual_nar_sha256) = self
415 .nar_calculation_service
416 .calculate_nar(&node)
417 .await
418 .map_err(|e| {
419 FetcherError::Io(std::io::Error::other(e))
421 })?;
422
423 if let Some(exp_nar_sha256) = exp_nar_sha256
424 && exp_nar_sha256 != actual_nar_sha256
425 {
426 return Err(FetcherError::HashMismatch {
427 url,
428 wanted: NixHash::Sha256(exp_nar_sha256),
429 got: NixHash::Sha256(actual_nar_sha256),
430 });
431 }
432
433 Ok((
434 node,
435 CAHash::Nar(NixHash::Sha256(actual_nar_sha256)),
436 nar_size,
437 ))
438 }
439 Fetch::NAR {
440 url,
441 hash: exp_hash,
442 } => {
443 let r = self.download(url.clone(), Some(&exp_hash)).await?;
445
446 let mut r = DecompressedReader::new(r).await?;
448
449 let (root_node, _actual_nar_sha256, actual_nar_size) =
451 snix_store::nar::ingest_nar_and_hash(
452 self.blob_service.clone(),
453 self.directory_service.clone(),
454 &mut r,
455 &Some(CAHash::Nar(exp_hash.clone())),
456 )
457 .await
458 .map_err(|e| match e {
459 NarIngestionError::HashMismatch { expected, actual } => {
460 FetcherError::HashMismatch {
461 url,
462 wanted: expected,
463 got: actual,
464 }
465 }
466 _ => FetcherError::Io(std::io::Error::other(e.to_string())),
467 })?;
468 Ok((
469 root_node,
470 CAHash::Nar(exp_hash),
472 actual_nar_size,
473 ))
474 }
475 Fetch::Executable {
476 url,
477 hash: exp_hash,
478 } => {
479 let mut r = self.download(url.clone(), Some(&exp_hash)).await?;
481
482 let mut blob_writer = self.blob_service.open_write().await;
484
485 let file_size = tokio::io::copy(&mut r, &mut blob_writer).await?;
487 let blob_digest = blob_writer.close().await?;
488
489 let w = tokio::io::sink();
495 let mut hasher: Box<dyn DynDigest + Send> = match exp_hash.algo() {
497 HashAlgo::Md5 => Box::new(Md5::new()),
498 HashAlgo::Sha1 => Box::new(Sha1::new()),
499 HashAlgo::Sha256 => Box::new(Sha256::new()),
500 HashAlgo::Sha512 => Box::new(Sha512::new()),
501 };
502
503 let mut nar_size: u64 = 0;
504 let mut w = InspectWriter::new(w, |d| {
505 hasher.update(d);
506 nar_size += d.len() as u64;
507 });
508
509 {
510 let node = nix_compat::nar::writer::r#async::open(&mut w).await?;
511
512 let blob_reader = self
513 .blob_service
514 .open_read(&blob_digest)
515 .await?
516 .expect("Snix bug: just-uploaded blob not found");
517
518 node.file(true, file_size, &mut BufReader::new(blob_reader))
519 .await?;
520
521 w.flush().await?;
522 }
523
524 let actual_hash = {
526 match exp_hash.algo() {
527 HashAlgo::Md5 => {
528 NixHash::Md5(hasher.finalize().to_vec().try_into().unwrap())
529 }
530 HashAlgo::Sha1 => {
531 NixHash::Sha1(hasher.finalize().to_vec().try_into().unwrap())
532 }
533 HashAlgo::Sha256 => {
534 NixHash::Sha256(hasher.finalize().to_vec().try_into().unwrap())
535 }
536 HashAlgo::Sha512 => {
537 NixHash::Sha512(hasher.finalize().to_vec().try_into().unwrap())
538 }
539 }
540 };
541
542 if exp_hash != actual_hash {
543 return Err(FetcherError::HashMismatch {
544 url,
545 wanted: exp_hash,
546 got: actual_hash,
547 });
548 }
549
550 let root_node = Node::File {
553 digest: blob_digest,
554 size: file_size,
555 executable: true,
556 };
557
558 Ok((root_node, CAHash::Nar(actual_hash), file_size))
559 }
560 Fetch::Git() => todo!(),
561 }
562 }
563
564 pub async fn ingest_and_persist<'a>(
570 &self,
571 name: &'a str,
572 fetch: Fetch,
573 ) -> Result<(StorePathRef<'a>, PathInfo), FetcherError> {
574 let (node, ca_hash, size) = self.ingest(fetch).await?;
576
577 let store_path = build_ca_path(name, &ca_hash, [], false)?;
579
580 let (nar_size, nar_sha256) = match &ca_hash {
586 CAHash::Nar(NixHash::Sha256(nar_sha256)) => (size, *nar_sha256),
587 CAHash::Nar(_) | CAHash::Flat(_) => self
588 .nar_calculation_service
589 .calculate_nar(&node)
590 .await
591 .map_err(|e| FetcherError::Io(std::io::Error::other(e)))?,
592 CAHash::Text(_) => unreachable!("Snix bug: fetch returned CAHash::Text"),
593 };
594
595 let path_info = PathInfo {
597 store_path: store_path.to_owned(),
598 node: node.clone(),
599 references: vec![],
600 nar_size,
601 nar_sha256,
602 signatures: vec![],
603 deriver: None,
604 ca: Some(ca_hash),
605 };
606
607 self.path_info_service
608 .put(path_info.clone())
609 .await
610 .map_err(|e| FetcherError::Io(std::io::Error::other(e)))?;
611
612 Ok((store_path, path_info))
613 }
614}
615
616pub(crate) fn url_basename(url: &Url) -> &str {
618 let s = url.path().trim_end_matches('/');
619
620 match s.rsplit_once('/') {
621 None => url.host_str().unwrap_or_default(),
622 Some((_, basename)) => basename,
623 }
624}
625
#[cfg(test)]
mod tests {
    mod fetch {
        use super::super::*;
        use crate::fetchers::Fetch;
        use nix_compat::{nixbase32, nixhash::NixHash};
        use rstest::rstest;

        #[rstest]
        #[case::url_no_hash(
            Fetch::URL{
                url: Url::parse("https://raw.githubusercontent.com/aaptel/notmuch-extract-patch/f732a53e12a7c91a06755ebfab2007adc9b3063b/notmuch-extract-patch").unwrap(),
                exp_hash: None,
            },
            None,
            "notmuch-extract-patch"
        )]
        #[case::url_sha256(
            Fetch::URL{
                url: Url::parse("https://raw.githubusercontent.com/aaptel/notmuch-extract-patch/f732a53e12a7c91a06755ebfab2007adc9b3063b/notmuch-extract-patch").unwrap(),
                exp_hash: Some(NixHash::from_sri("sha256-Xa1Jbl2Eq5+L0ww+Ph1osA3Z/Dxe/RkN1/dITQCdXFk=").unwrap()),
            },
            Some(StorePathRef::from_bytes(b"06qi00hylriyfm0nl827crgjvbax84mz-notmuch-extract-patch").unwrap()),
            "notmuch-extract-patch"
        )]
        // The store path depends only on the name and hash, not on the URL.
        #[case::url_custom_name(
            Fetch::URL{
                url: Url::parse("https://test.example/owo").unwrap(),
                exp_hash: Some(NixHash::from_sri("sha256-Xa1Jbl2Eq5+L0ww+Ph1osA3Z/Dxe/RkN1/dITQCdXFk=").unwrap()),
            },
            Some(StorePathRef::from_bytes(b"06qi00hylriyfm0nl827crgjvbax84mz-notmuch-extract-patch").unwrap()),
            "notmuch-extract-patch"
        )]
        #[case::nar_sha256(
            Fetch::NAR{
                url: Url::parse("https://cache.nixos.org/nar/0r8nqa1klm5v17ifc6z96m9wywxkjvgbnqq9pmy0sgqj53wj3n12.nar.xz").unwrap(),
                hash: NixHash::from_sri("sha256-oj6yfWKbcEerK8D9GdPJtIAOveNcsH1ztGeSARGypRA=").unwrap(),
            },
            Some(StorePathRef::from_bytes(b"b40vjphshq4fdgv8s3yrp0bdlafi4920-0r8nqa1klm5v17ifc6z96m9wywxkjvgbnqq9pmy0sgqj53wj3n12.nar.xz").unwrap()),
            "0r8nqa1klm5v17ifc6z96m9wywxkjvgbnqq9pmy0sgqj53wj3n12.nar.xz"
        )]
        #[case::nar_sha1(
            Fetch::NAR{
                url: Url::parse("https://cache.nixos.org/nar/0r8nqa1klm5v17ifc6z96m9wywxkjvgbnqq9pmy0sgqj53wj3n12.nar.xz").unwrap(),
                hash: NixHash::from_sri("sha1-F/fMsgwkXF8fPCg1v9zPZ4yOFIA=").unwrap(),
            },
            Some(StorePathRef::from_bytes(b"8kx7fdkdbzs4fkfb57xq0cbhs20ymq2n-0r8nqa1klm5v17ifc6z96m9wywxkjvgbnqq9pmy0sgqj53wj3n12.nar.xz").unwrap()),
            "0r8nqa1klm5v17ifc6z96m9wywxkjvgbnqq9pmy0sgqj53wj3n12.nar.xz"
        )]
        #[case::executable_sha1(
            Fetch::Executable{
                url: Url::parse("https://cache.nixos.org/nar/0r8nqa1klm5v17ifc6z96m9wywxkjvgbnqq9pmy0sgqj53wj3n12.nar.xz").unwrap(),
                hash: NixHash::from_sri("sha1-NKNeU1csW5YJ4lCeWH3Z/apppNU=").unwrap(),
            },
            Some(StorePathRef::from_bytes(b"y92hm2xfk1009hrq0ix80j4m5k4j4w21-0r8nqa1klm5v17ifc6z96m9wywxkjvgbnqq9pmy0sgqj53wj3n12.nar.xz").unwrap()),
            "0r8nqa1klm5v17ifc6z96m9wywxkjvgbnqq9pmy0sgqj53wj3n12.nar.xz"
        )]
        fn fetch_store_path(
            #[case] fetch: Fetch,
            #[case] exp_path: Option<StorePathRef>,
            #[case] name: &str,
        ) {
            assert_eq!(
                exp_path,
                fetch.store_path(name).expect("invalid name"),
                "unexpected calculated store path"
            );
        }

        #[test]
        fn fetch_tarball_store_path() {
            let url = Url::parse("https://github.com/NixOS/nixpkgs/archive/91050ea1e57e50388fa87a3302ba12d188ef723a.tar.gz").unwrap();
            let exp_sha256 =
                nixbase32::decode_fixed("1hf6cgaci1n186kkkjq106ryf8mmlq9vnwgfwh625wa8hfgdn4dm")
                    .unwrap();
            let fetch = Fetch::Tarball {
                url,
                exp_nar_sha256: Some(exp_sha256),
            };

            assert_eq!(
                "7adgvk5zdfq4pwrhsm3n9lzypb12gw0g-source",
                &fetch.store_path("source").unwrap().unwrap().to_string(),
            )
        }

        /// Credentials embedded in the URL must not show up in Debug output.
        #[test]
        fn fetch_debug_redacts_credentials() {
            let fetch = Fetch::URL {
                url: Url::parse("https://user:secret@example.com/file").unwrap(),
                exp_hash: None,
            };

            assert_eq!(
                "URL [url: https://redacted:redacted@example.com/file, exp_hash: None]",
                format!("{fetch:?}"),
            );
        }

        /// URLs without credentials are returned unchanged.
        #[test]
        fn redact_url_without_credentials_is_identity() {
            let url = Url::parse("https://example.com/file").unwrap();
            assert_eq!(url, redact_url(&url));
        }
    }

    mod url_basename {
        use super::super::*;
        use rstest::rstest;

        #[rstest]
        #[case::empty_path("", "localhost")]
        #[case::path_on_root("/dir", "dir")]
        #[case::relative_path("dir/foo", "foo")]
        #[case::root_with_trailing_slash("/", "localhost")]
        #[case::root_with_many_trailing_slashes("///", "localhost")]
        #[case::trailing_slash("/dir/", "dir")]
        #[case::many_trailing_slashes("/dir//", "dir")]
        fn test_url_basename(#[case] url_path: &str, #[case] exp_basename: &str) {
            let mut url = Url::parse("http://localhost").expect("invalid url");
            url.set_path(url_path);
            assert_eq!(url_basename(&url), exp_basename);
        }
    }
}