1use futures::TryStreamExt;
2use md5::{Md5, digest::DynDigest};
3use nix_compat::{
4 nixhash::{CAHash, HashAlgo, NixHash},
5 store_path::{BuildStorePathError, StorePathRef, build_ca_path},
6};
7use sha1::Sha1;
8use sha2::{Digest, Sha256, Sha512, digest::Output};
9use snix_castore::{Node, blobservice::BlobService, directoryservice::DirectoryService};
10use snix_store::{
11 nar::{NarCalculationService, NarIngestionError},
12 pathinfoservice::{PathInfo, PathInfoService},
13};
14use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
15use tokio_util::io::{InspectReader, InspectWriter};
16use tracing::{Span, instrument, warn};
17use tracing_indicatif::span_ext::IndicatifSpanExt;
18use url::Url;
19
20use crate::builtins::FetcherError;
21
22mod decompression;
23use decompression::DecompressedReader;
24
/// Describes a single fetch request: where to fetch from, and (optionally)
/// which hash the result is expected to have.
#[derive(Clone, Eq, PartialEq)]
pub enum Fetch {
    /// Fetch a single file from the given URL.
    URL {
        /// The URL to fetch from.
        url: Url,
        /// The expected hash over the raw file contents ("flat" hash), if
        /// known upfront. Any supported hash algorithm may be used.
        exp_hash: Option<NixHash>,
    },

    /// Fetch a (possibly compressed) tarball from the given URL and unpack it.
    Tarball {
        /// The URL to fetch from.
        url: Url,
        /// The expected SHA-256 digest of the NAR representation of the
        /// unpacked contents, if known upfront.
        exp_nar_sha256: Option<[u8; 32]>,
    },

    /// Fetch a (possibly compressed) NAR from the given URL and import it.
    NAR {
        /// The URL to fetch from.
        url: Url,
        /// The expected hash of the NAR representation.
        hash: NixHash,
    },

    /// Fetch a single file from the given URL and mark it as executable.
    Executable {
        /// The URL to fetch from.
        url: Url,
        /// The expected hash of the NAR representation of the resulting
        /// single-file structure.
        hash: NixHash,
    },

    /// Git fetches — not implemented yet.
    Git(),
}
79
80fn redact_url(url: &Url) -> Url {
82 let mut url = url.to_owned();
83 if !url.username().is_empty() {
84 let _ = url.set_username("redacted");
85 }
86
87 if url.password().is_some() {
88 let _ = url.set_password(Some("redacted"));
89 }
90
91 url
92}
93
94impl std::fmt::Debug for Fetch {
95 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 match self {
97 Fetch::URL { url, exp_hash } => {
98 let url = redact_url(url);
99 if let Some(exp_hash) = exp_hash {
100 write!(f, "URL [url: {}, exp_hash: Some({})]", &url, exp_hash)
101 } else {
102 write!(f, "URL [url: {}, exp_hash: None]", &url)
103 }
104 }
105 Fetch::Tarball {
106 url,
107 exp_nar_sha256,
108 } => {
109 let url = redact_url(url);
110 if let Some(exp_nar_sha256) = exp_nar_sha256 {
111 write!(
112 f,
113 "Tarball [url: {}, exp_nar_sha256: Some({})]",
114 url,
115 NixHash::Sha256(*exp_nar_sha256)
116 )
117 } else {
118 write!(f, "Tarball [url: {}, exp_hash: None]", url)
119 }
120 }
121 Fetch::NAR { url, hash } => {
122 let url = redact_url(url);
123 write!(f, "NAR [url: {}, hash: {}]", &url, hash)
124 }
125 Fetch::Executable { url, hash } => {
126 let url = redact_url(url);
127 write!(f, "Executable [url: {}, hash: {}]", &url, hash)
128 }
129 Fetch::Git() => todo!(),
130 }
131 }
132}
133
134impl Fetch {
135 pub fn store_path<'a>(
139 &self,
140 name: &'a str,
141 ) -> Result<Option<StorePathRef<'a>>, BuildStorePathError> {
142 let ca_hash = match self {
143 Fetch::URL {
144 exp_hash: Some(exp_hash),
145 ..
146 } => CAHash::Flat(exp_hash.clone()),
147
148 Fetch::Tarball {
149 exp_nar_sha256: Some(exp_nar_sha256),
150 ..
151 } => CAHash::Nar(NixHash::Sha256(*exp_nar_sha256)),
152
153 Fetch::NAR { hash, .. } | Fetch::Executable { hash, .. } => {
154 CAHash::Nar(hash.to_owned())
155 }
156
157 Fetch::Git() => todo!(),
158
159 Fetch::URL { exp_hash: None, .. }
161 | Fetch::Tarball {
162 exp_nar_sha256: None,
163 ..
164 } => return Ok(None),
165 };
166
167 build_ca_path(name, &ca_hash, Vec::<String>::new(), false).map(Some)
169 }
170}
171
/// Performs fetches described by a [Fetch], ingesting the results into the
/// configured castore/store services.
///
/// Generic over the blob service (`BS`), directory service (`DS`), path info
/// service (`PS`) and NAR calculation service (`NS`) it talks to.
pub struct Fetcher<BS, DS, PS, NS> {
    // HTTP client used for all non-file:// downloads.
    http_client: reqwest::Client,
    blob_service: BS,
    directory_service: DS,
    path_info_service: PS,
    nar_calculation_service: NS,
}
180
181impl<BS, DS, PS, NS> Fetcher<BS, DS, PS, NS> {
182 pub fn new(
183 blob_service: BS,
184 directory_service: DS,
185 path_info_service: PS,
186 nar_calculation_service: NS,
187 ) -> Self {
188 Self {
189 http_client: reqwest::Client::builder()
190 .user_agent(crate::USER_AGENT)
191 .build()
192 .expect("Client::new()"),
193 blob_service,
194 directory_service,
195 path_info_service,
196 nar_calculation_service,
197 }
198 }
199
200 #[instrument(skip_all, fields(url, indicatif.pb_show=tracing::field::Empty), err)]
203 async fn download(
204 &self,
205 url: Url,
206 ) -> Result<Box<dyn AsyncBufRead + Unpin + Send>, FetcherError> {
207 let span = Span::current();
208 span.pb_set_message(&format!(
209 "📡Fetching {}",
210 redact_url(&url)
212 ));
213
214 match url.scheme() {
215 "file" => {
216 let f = tokio::fs::File::open(url.to_file_path().map_err(|_| {
217 FetcherError::Io(std::io::Error::new(
220 std::io::ErrorKind::Other,
221 "invalid host for file:// scheme",
222 ))
223 })?)
224 .await?;
225
226 span.pb_set_length(f.metadata().await?.len());
227 span.pb_set_style(&snix_tracing::PB_TRANSFER_STYLE);
228 span.pb_start();
229 Ok(Box::new(tokio::io::BufReader::new(InspectReader::new(
230 f,
231 move |d| {
232 span.pb_inc(d.len() as u64);
233 },
234 ))))
235 }
236 _ => {
237 let resp = self.http_client.get(url).send().await?;
238
239 if let Some(content_length) = resp.content_length() {
240 span.pb_set_length(content_length);
241 span.pb_set_style(&snix_tracing::PB_TRANSFER_STYLE);
242 } else {
243 span.pb_set_style(&snix_tracing::PB_TRANSFER_STYLE);
244 }
245 span.pb_start();
246
247 Ok(Box::new(tokio_util::io::StreamReader::new(
248 resp.bytes_stream()
249 .inspect_ok(move |d| {
250 span.pb_inc(d.len() as u64);
251 })
252 .map_err(|e| {
253 let e = e.without_url();
254 warn!(%e, "failed to get response body");
255 std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)
256 }),
257 )))
258 }
259 }
260 }
261}
262
263async fn hash<D: Digest + std::io::Write>(
268 mut r: impl AsyncRead + Unpin,
269 mut w: impl AsyncWrite + Unpin,
270) -> std::io::Result<(Output<D>, u64)> {
271 let mut hasher = D::new();
272 let bytes_copied = tokio::io::copy(
273 &mut InspectReader::new(&mut r, |d| hasher.write_all(d).unwrap()),
274 &mut w,
275 )
276 .await?;
277 Ok((hasher.finalize(), bytes_copied))
278}
279
280impl<BS, DS, PS, NS> Fetcher<BS, DS, PS, NS>
281where
282 BS: BlobService + Clone + 'static,
283 DS: DirectoryService + Clone,
284 PS: PathInfoService,
285 NS: NarCalculationService,
286{
    /// Ingests the data from the passed [Fetch].
    /// On success, returns the root node of the ingested contents, the
    /// [CAHash] describing them, and a size (see per-variant notes below).
    /// Fails if fetching fails or the contents don't match the expected hash.
    pub async fn ingest(&self, fetch: Fetch) -> Result<(Node, CAHash, u64), FetcherError> {
        match fetch {
            Fetch::URL { url, exp_hash } => {
                // Download the file…
                let mut r = self.download(url.clone()).await?;

                // …and stream it into the blob service.
                let mut blob_writer = self.blob_service.open_write().await;

                // Hash the stream with the algo of the expected hash,
                // defaulting to sha256 when no expected hash was given.
                let (actual_hash, blob_size) = match exp_hash
                    .as_ref()
                    .map(NixHash::algo)
                    .unwrap_or_else(|| HashAlgo::Sha256)
                {
                    HashAlgo::Sha256 => hash::<Sha256>(&mut r, &mut blob_writer).await.map(
                        |(digest, bytes_written)| (NixHash::Sha256(digest.into()), bytes_written),
                    )?,
                    HashAlgo::Md5 => hash::<Md5>(&mut r, &mut blob_writer).await.map(
                        |(digest, bytes_written)| (NixHash::Md5(digest.into()), bytes_written),
                    )?,
                    HashAlgo::Sha1 => hash::<Sha1>(&mut r, &mut blob_writer).await.map(
                        |(digest, bytes_written)| (NixHash::Sha1(digest.into()), bytes_written),
                    )?,
                    HashAlgo::Sha512 => hash::<Sha512>(&mut r, &mut blob_writer).await.map(
                        |(digest, bytes_written)| {
                            (NixHash::Sha512(Box::new(digest.into())), bytes_written)
                        },
                    )?,
                };

                // Bail out if an expected hash was provided and doesn't match.
                if let Some(exp_hash) = exp_hash {
                    if exp_hash != actual_hash {
                        return Err(FetcherError::HashMismatch {
                            url,
                            wanted: exp_hash,
                            got: actual_hash,
                        });
                    }
                }

                // The returned size is the raw blob size (flat hash case).
                Ok((
                    Node::File {
                        digest: blob_writer.close().await?,
                        size: blob_size,
                        executable: false,
                    },
                    CAHash::Flat(actual_hash),
                    blob_size,
                ))
            }
            Fetch::Tarball {
                url,
                exp_nar_sha256,
            } => {
                // Download the tarball…
                let r = self.download(url.clone()).await?;

                // …decompress it transparently and unpack the archive into
                // the castore.
                let r = DecompressedReader::new(r);
                let archive = tokio_tar::Archive::new(r);

                let node = snix_castore::import::archive::ingest_archive(
                    self.blob_service.clone(),
                    self.directory_service.clone(),
                    archive,
                )
                .await?;

                // Compute NAR size and sha256 of the unpacked contents.
                let (nar_size, actual_nar_sha256) = self
                    .nar_calculation_service
                    .calculate_nar(&node)
                    .await
                    .map_err(|e| {
                        FetcherError::Io(e.into())
                    })?;

                // Bail out if an expected NAR sha256 was given and mismatches.
                if let Some(exp_nar_sha256) = exp_nar_sha256 {
                    if exp_nar_sha256 != actual_nar_sha256 {
                        return Err(FetcherError::HashMismatch {
                            url,
                            wanted: NixHash::Sha256(exp_nar_sha256),
                            got: NixHash::Sha256(actual_nar_sha256),
                        });
                    }
                }

                Ok((
                    node,
                    CAHash::Nar(NixHash::Sha256(actual_nar_sha256)),
                    nar_size,
                ))
            }
            Fetch::NAR {
                url,
                hash: exp_hash,
            } => {
                // Download the NAR…
                let r = self.download(url.clone()).await?;

                // …decompressing transparently if needed.
                let mut r = DecompressedReader::new(r);

                // Ingest the NAR; hash verification against exp_hash happens
                // inside ingest_nar_and_hash.
                let (root_node, _actual_nar_sha256, actual_nar_size) =
                    snix_store::nar::ingest_nar_and_hash(
                        self.blob_service.clone(),
                        self.directory_service.clone(),
                        &mut r,
                        &Some(CAHash::Nar(exp_hash.clone())),
                    )
                    .await
                    .map_err(|e| match e {
                        NarIngestionError::HashMismatch { expected, actual } => {
                            FetcherError::HashMismatch {
                                url,
                                wanted: expected,
                                got: actual,
                            }
                        }
                        _ => FetcherError::Io(std::io::Error::other(e.to_string())),
                    })?;
                Ok((
                    root_node,
                    // exp_hash was already verified during ingestion above.
                    CAHash::Nar(exp_hash),
                    actual_nar_size,
                ))
            }
            Fetch::Executable {
                url,
                hash: exp_hash,
            } => {
                // Download the file…
                let mut r = self.download(url.clone()).await?;

                // …and stream it into the blob service unverified first.
                let mut blob_writer = self.blob_service.open_write().await;

                let file_size = tokio::io::copy(&mut r, &mut blob_writer).await?;
                let blob_digest = blob_writer.close().await?;

                // Render the NAR of a single executable file to a sink while
                // feeding it through a hasher of the expected algorithm.
                let w = tokio::io::sink();
                let mut hasher: Box<dyn DynDigest + Send> = match exp_hash.algo() {
                    HashAlgo::Md5 => Box::new(Md5::new()),
                    HashAlgo::Sha1 => Box::new(Sha1::new()),
                    HashAlgo::Sha256 => Box::new(Sha256::new()),
                    HashAlgo::Sha512 => Box::new(Sha512::new()),
                };

                // NOTE(review): nar_size is accumulated here but never read
                // afterwards — the tuple returned below carries file_size as
                // its size. Verify downstream users (ingest_and_persist) are
                // okay treating that as the NAR size for this variant.
                let mut nar_size: u64 = 0;
                let mut w = InspectWriter::new(w, |d| {
                    hasher.update(d);
                    nar_size += d.len() as u64;
                });

                {
                    let node = nix_compat::nar::writer::r#async::open(&mut w).await?;

                    // Read the blob we just uploaded back out of the store.
                    let blob_reader = self
                        .blob_service
                        .open_read(&blob_digest)
                        .await?
                        .expect("Snix bug: just-uploaded blob not found");

                    // Write it into the NAR as a single executable file.
                    node.file(true, file_size, &mut BufReader::new(blob_reader))
                        .await?;

                    w.flush().await?;
                }

                // Finalize the dynamic hasher into a NixHash of the same algo.
                let actual_hash = {
                    match exp_hash.algo() {
                        HashAlgo::Md5 => {
                            NixHash::Md5(hasher.finalize().to_vec().try_into().unwrap())
                        }
                        HashAlgo::Sha1 => {
                            NixHash::Sha1(hasher.finalize().to_vec().try_into().unwrap())
                        }
                        HashAlgo::Sha256 => {
                            NixHash::Sha256(hasher.finalize().to_vec().try_into().unwrap())
                        }
                        HashAlgo::Sha512 => {
                            NixHash::Sha512(hasher.finalize().to_vec().try_into().unwrap())
                        }
                    }
                };

                if exp_hash != actual_hash {
                    return Err(FetcherError::HashMismatch {
                        url,
                        wanted: exp_hash,
                        got: actual_hash,
                    });
                }

                // The root node is the single (executable) file.
                let root_node = Node::File {
                    digest: blob_digest,
                    size: file_size,
                    executable: true,
                };

                Ok((root_node, CAHash::Nar(actual_hash), file_size))
            }
            Fetch::Git() => todo!(),
        }
    }
518
519 pub async fn ingest_and_persist<'a>(
525 &self,
526 name: &'a str,
527 fetch: Fetch,
528 ) -> Result<(StorePathRef<'a>, PathInfo), FetcherError> {
529 let (node, ca_hash, size) = self.ingest(fetch).await?;
531
532 let store_path = build_ca_path(name, &ca_hash, Vec::<String>::new(), false)?;
534
535 let (nar_size, nar_sha256) = match &ca_hash {
541 CAHash::Nar(NixHash::Sha256(nar_sha256)) => (size, *nar_sha256),
542 CAHash::Nar(_) | CAHash::Flat(_) => self
543 .nar_calculation_service
544 .calculate_nar(&node)
545 .await
546 .map_err(|e| FetcherError::Io(e.into()))?,
547 CAHash::Text(_) => unreachable!("Snix bug: fetch returned CAHash::Text"),
548 };
549
550 let path_info = PathInfo {
552 store_path: store_path.to_owned(),
553 node: node.clone(),
554 references: vec![],
555 nar_size,
556 nar_sha256,
557 signatures: vec![],
558 deriver: None,
559 ca: Some(ca_hash),
560 };
561
562 self.path_info_service
563 .put(path_info.clone())
564 .await
565 .map_err(|e| FetcherError::Io(e.into()))?;
566
567 Ok((store_path, path_info))
568 }
569}
570
571pub(crate) fn url_basename(url: &Url) -> &str {
573 let s = url.path();
574 if s.is_empty() {
575 return "";
576 }
577
578 let mut last = s.len() - 1;
579 if s.chars().nth(last).unwrap() == '/' && last > 0 {
580 last -= 1;
581 }
582
583 if last == 0 {
584 return "";
585 }
586
587 let pos = match s[..=last].rfind('/') {
588 Some(pos) => {
589 if pos == last - 1 {
590 0
591 } else {
592 pos
593 }
594 }
595 None => 0,
596 };
597
598 &s[(pos + 1)..=last]
599}
600
#[cfg(test)]
mod tests {
    /// Tests for [Fetch::store_path] — store path calculation from the
    /// expected hashes, without performing any network access.
    mod fetch {
        use super::super::*;
        use crate::fetchers::Fetch;
        use nix_compat::{nixbase32, nixhash::NixHash};
        use rstest::rstest;

        #[rstest]
        // No expected hash -> no store path can be calculated upfront.
        #[case::url_no_hash(
            Fetch::URL{
                url: Url::parse("https://raw.githubusercontent.com/aaptel/notmuch-extract-patch/f732a53e12a7c91a06755ebfab2007adc9b3063b/notmuch-extract-patch").unwrap(),
                exp_hash: None,
            },
            None,
            "notmuch-extract-patch"
        )]
        #[case::url_sha256(
            Fetch::URL{
                url: Url::parse("https://raw.githubusercontent.com/aaptel/notmuch-extract-patch/f732a53e12a7c91a06755ebfab2007adc9b3063b/notmuch-extract-patch").unwrap(),
                exp_hash: Some(NixHash::from_sri("sha256-Xa1Jbl2Eq5+L0ww+Ph1osA3Z/Dxe/RkN1/dITQCdXFk=").unwrap()),
            },
            Some(StorePathRef::from_bytes(b"06qi00hylriyfm0nl827crgjvbax84mz-notmuch-extract-patch").unwrap()),
            "notmuch-extract-patch"
        )]
        // Same hash as above but a different URL: the store path only
        // depends on name + hash, so it's identical.
        #[case::url_custom_name(
            Fetch::URL{
                url: Url::parse("https://test.example/owo").unwrap(),
                exp_hash: Some(NixHash::from_sri("sha256-Xa1Jbl2Eq5+L0ww+Ph1osA3Z/Dxe/RkN1/dITQCdXFk=").unwrap()),
            },
            Some(StorePathRef::from_bytes(b"06qi00hylriyfm0nl827crgjvbax84mz-notmuch-extract-patch").unwrap()),
            "notmuch-extract-patch"
        )]
        #[case::nar_sha256(
            Fetch::NAR{
                url: Url::parse("https://cache.nixos.org/nar/0r8nqa1klm5v17ifc6z96m9wywxkjvgbnqq9pmy0sgqj53wj3n12.nar.xz").unwrap(),
                hash: NixHash::from_sri("sha256-oj6yfWKbcEerK8D9GdPJtIAOveNcsH1ztGeSARGypRA=").unwrap(),
            },
            Some(StorePathRef::from_bytes(b"b40vjphshq4fdgv8s3yrp0bdlafi4920-0r8nqa1klm5v17ifc6z96m9wywxkjvgbnqq9pmy0sgqj53wj3n12.nar.xz").unwrap()),
            "0r8nqa1klm5v17ifc6z96m9wywxkjvgbnqq9pmy0sgqj53wj3n12.nar.xz"
        )]
        #[case::nar_sha1(
            Fetch::NAR{
                url: Url::parse("https://cache.nixos.org/nar/0r8nqa1klm5v17ifc6z96m9wywxkjvgbnqq9pmy0sgqj53wj3n12.nar.xz").unwrap(),
                hash: NixHash::from_sri("sha1-F/fMsgwkXF8fPCg1v9zPZ4yOFIA=").unwrap(),
            },
            Some(StorePathRef::from_bytes(b"8kx7fdkdbzs4fkfb57xq0cbhs20ymq2n-0r8nqa1klm5v17ifc6z96m9wywxkjvgbnqq9pmy0sgqj53wj3n12.nar.xz").unwrap()),
            "0r8nqa1klm5v17ifc6z96m9wywxkjvgbnqq9pmy0sgqj53wj3n12.nar.xz"
        )]
        // NOTE(review): this case name duplicates `nar_sha1` above although
        // it exercises the Executable variant — consider renaming.
        #[case::nar_sha1(
            Fetch::Executable{
                url: Url::parse("https://cache.nixos.org/nar/0r8nqa1klm5v17ifc6z96m9wywxkjvgbnqq9pmy0sgqj53wj3n12.nar.xz").unwrap(),
                hash: NixHash::from_sri("sha1-NKNeU1csW5YJ4lCeWH3Z/apppNU=").unwrap(),
            },
            Some(StorePathRef::from_bytes(b"y92hm2xfk1009hrq0ix80j4m5k4j4w21-0r8nqa1klm5v17ifc6z96m9wywxkjvgbnqq9pmy0sgqj53wj3n12.nar.xz").unwrap()),
            "0r8nqa1klm5v17ifc6z96m9wywxkjvgbnqq9pmy0sgqj53wj3n12.nar.xz"
        )]
        fn fetch_store_path(
            #[case] fetch: Fetch,
            #[case] exp_path: Option<StorePathRef>,
            #[case] name: &str,
        ) {
            assert_eq!(
                exp_path,
                fetch.store_path(name).expect("invalid name"),
                "unexpected calculated store path"
            );
        }

        // Tarball fetches address by the NAR sha256 of the unpacked contents.
        #[test]
        fn fetch_tarball_store_path() {
            let url = Url::parse("https://github.com/NixOS/nixpkgs/archive/91050ea1e57e50388fa87a3302ba12d188ef723a.tar.gz").unwrap();
            let exp_sha256 =
                nixbase32::decode_fixed("1hf6cgaci1n186kkkjq106ryf8mmlq9vnwgfwh625wa8hfgdn4dm")
                    .unwrap();
            let fetch = Fetch::Tarball {
                url,
                exp_nar_sha256: Some(exp_sha256),
            };

            assert_eq!(
                "7adgvk5zdfq4pwrhsm3n9lzypb12gw0g-source",
                &fetch.store_path("source").unwrap().unwrap().to_string(),
            )
        }
    }

    /// Tests for [url_basename] path-component extraction.
    mod url_basename {
        use super::super::*;
        use rstest::rstest;

        #[rstest]
        #[case::empty_path("", "")]
        #[case::path_on_root("/dir", "dir")]
        #[case::relative_path("dir/foo", "foo")]
        #[case::root_with_trailing_slash("/", "")]
        #[case::trailing_slash("/dir/", "dir")]
        fn test_url_basename(#[case] url_path: &str, #[case] exp_basename: &str) {
            let mut url = Url::parse("http://localhost").expect("invalid url");
            url.set_path(url_path);
            assert_eq!(url_basename(&url), exp_basename);
        }
    }
}