1use nix_compat::{
2 nar::reader::r#async as nar_reader,
3 nixhash::{CAHash, NixHash},
4};
5use sha2::Digest;
6use snix_castore::{
7 Node, PathBuf,
8 blobservice::BlobService,
9 directoryservice::DirectoryService,
10 import::{
11 IngestionEntry, IngestionError,
12 blobs::{self, ConcurrentBlobUploader},
13 ingest_entries,
14 },
15};
16use tokio::{
17 io::{AsyncBufRead, AsyncRead},
18 sync::mpsc,
19 try_join,
20};
21
22use super::hashing_reader::HashingReader;
23
/// Errors that can occur while ingesting a NAR stream and verifying its
/// content-addressed hash.
#[derive(Debug, thiserror::Error)]
pub enum NarIngestionError {
    /// Failure while ingesting the unpacked entries into the castore.
    #[error("{0}")]
    IngestionError(#[from] IngestionError<Error>),

    /// The computed CA hash did not match the expected one.
    #[error("Hash mismatch, expected: {expected}, got: {actual}.")]
    HashMismatch { expected: NixHash, actual: NixHash },

    /// A flat CA hash was requested, but the NAR root was not a single file.
    #[error("Expected the nar to contain a single file.")]
    TypeMismatch,

    /// An I/O error occurred while reading the NAR or re-reading blob data.
    #[error("Ingestion failed: {0}")]
    Io(#[from] std::io::Error),
}
39
/// Ingests a NAR stream into the castore while simultaneously computing the
/// sha256 digest and byte size of the wire-format NAR.
///
/// If `expected_cahash` is given, the content-addressed hash is verified too:
///  - `CAHash::Nar`: the NAR stream itself is hashed (with the expected
///    algorithm) inline while being ingested.
///  - `CAHash::Flat`: the NAR must contain a single file; after ingestion its
///    contents are read back from the blob service and hashed.
///
/// Returns the root [Node], the sha256 NAR hash and the NAR size, or a
/// [NarIngestionError] on failure (including hash mismatches).
pub async fn ingest_nar_and_hash<R, BS, DS>(
    blob_service: BS,
    directory_service: DS,
    r: &mut R,
    expected_cahash: &Option<CAHash>,
) -> Result<(Node, [u8; 32], u64), NarIngestionError>
where
    R: AsyncRead + Unpin + Send,
    BS: BlobService + Clone + 'static,
    DS: DirectoryService,
{
    let mut nar_hash = sha2::Sha256::new();
    let mut nar_size = 0;

    // Tee every byte read from `r` into the NAR sha256 hasher and the size
    // counter, transparently to the downstream consumer.
    let mut r = tokio_util::io::InspectReader::new(r, |b| {
        nar_size += b.len() as u64;
        nar_hash.update(b);
    });

    match expected_cahash {
        Some(CAHash::Nar(expected_hash)) => {
            // The CA hash covers the NAR stream itself, so wrap the reader in
            // a hasher using the expected algorithm and ingest through it.
            let mut ca_reader = HashingReader::new_with_algo(expected_hash.algo(), &mut r);
            let mut r = tokio::io::BufReader::new(&mut ca_reader);
            let root_node = ingest_nar(blob_service, directory_service, &mut r).await?;
            let actual_hash = ca_reader.consume();

            if actual_hash != *expected_hash {
                return Err(NarIngestionError::HashMismatch {
                    expected: expected_hash.clone(),
                    actual: actual_hash,
                });
            }
            Ok((root_node, nar_hash.finalize().into(), nar_size))
        }
        Some(CAHash::Flat(expected_hash)) => {
            // Flat hashing covers the file contents only: ingest first, then
            // re-read the blob we stored and hash that.
            let mut r = tokio::io::BufReader::new(&mut r);
            let root_node = ingest_nar(blob_service.clone(), directory_service, &mut r).await?;
            match &root_node {
                Node::File { digest, .. } => match blob_service.open_read(digest).await? {
                    Some(blob_reader) => {
                        let mut ca_reader =
                            HashingReader::new_with_algo(expected_hash.algo(), blob_reader);
                        // Drain the blob through the hashing reader; only the
                        // digest matters, the bytes are discarded.
                        tokio::io::copy(&mut ca_reader, &mut tokio::io::empty()).await?;
                        let actual_hash = ca_reader.consume();

                        if actual_hash != *expected_hash {
                            return Err(NarIngestionError::HashMismatch {
                                expected: expected_hash.clone(),
                                actual: actual_hash,
                            });
                        }
                        Ok((root_node, nar_hash.finalize().into(), nar_size))
                    }
                    // We just uploaded this blob, so it should be readable.
                    None => Err(NarIngestionError::Io(std::io::Error::other(
                        "Ingested data not found",
                    ))),
                },
                // Flat CA hashes are only defined for single-file NARs.
                _ => Err(NarIngestionError::TypeMismatch),
            }
        }
        // Anything else (no CA hash, or another CAHash variant): nothing
        // extra to verify, just ingest and report the NAR hash/size.
        _ => {
            let mut r = tokio::io::BufReader::new(&mut r);
            let root_node = ingest_nar(blob_service, directory_service, &mut r).await?;
            Ok((root_node, nar_hash.finalize().into(), nar_size))
        }
    }
}
119
120pub async fn ingest_nar<R, BS, DS>(
124 blob_service: BS,
125 directory_service: DS,
126 r: &mut R,
127) -> Result<Node, IngestionError<Error>>
128where
129 R: AsyncBufRead + Unpin + Send,
130 BS: BlobService + Clone + 'static,
131 DS: DirectoryService,
132{
133 let root_node = nar_reader::open(r).await.map_err(Error::IO)?;
136
137 let (tx, rx) = mpsc::channel(1);
138 let rx = tokio_stream::wrappers::ReceiverStream::new(rx);
139
140 let produce = async move {
141 let mut blob_uploader = ConcurrentBlobUploader::new(blob_service);
142
143 let res = produce_nar_inner(
144 &mut blob_uploader,
145 root_node,
146 "root".parse().unwrap(), tx.clone(),
148 )
149 .await;
150
151 if let Err(err) = blob_uploader.join().await {
152 tx.send(Err(err.into()))
153 .await
154 .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?;
155 }
156
157 tx.send(res)
158 .await
159 .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?;
160
161 Ok(())
162 };
163
164 let consume = ingest_entries(directory_service, rx);
165
166 let (_, node) = try_join!(produce, consume)?;
167
168 Ok(node)
169}
170
171async fn produce_nar_inner<BS>(
172 blob_uploader: &mut ConcurrentBlobUploader<BS>,
173 node: nar_reader::Node<'_, '_>,
174 path: PathBuf,
175 tx: mpsc::Sender<Result<IngestionEntry, Error>>,
176) -> Result<IngestionEntry, Error>
177where
178 BS: BlobService + Clone + 'static,
179{
180 Ok(match node {
181 nar_reader::Node::Symlink { target } => IngestionEntry::Symlink { path, target },
182 nar_reader::Node::File {
183 executable,
184 mut reader,
185 } => {
186 let size = reader.len();
187 let digest = blob_uploader.upload(&path, size, &mut reader).await?;
188
189 IngestionEntry::Regular {
190 path,
191 size,
192 executable,
193 digest,
194 }
195 }
196 nar_reader::Node::Directory(mut dir_reader) => {
197 while let Some(entry) = dir_reader.next().await? {
198 let mut path = path.clone();
199
200 path.try_push(entry.name)
202 .expect("Snix bug: failed to join name");
203
204 let entry = Box::pin(produce_nar_inner(
205 blob_uploader,
206 entry.node,
207 path,
208 tx.clone(),
209 ))
210 .await?;
211
212 tx.send(Ok(entry)).await.map_err(|e| {
213 Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))
214 })?;
215 }
216
217 IngestionEntry::Dir { path }
218 }
219 })
220}
221
/// Errors produced while walking the NAR and uploading its blobs.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// I/O failure reading the NAR stream or sending on the entry channel.
    #[error(transparent)]
    IO(#[from] std::io::Error),

    /// Failure from the concurrent blob uploader.
    #[error(transparent)]
    BlobUpload(#[from] blobs::Error),
}
230
#[cfg(test)]
mod test {
    use crate::fixtures::{
        NAR_CONTENTS_COMPLICATED, NAR_CONTENTS_HELLOWORLD, NAR_CONTENTS_SYMLINK,
    };
    use crate::nar::{NarIngestionError, ingest_nar, ingest_nar_and_hash};
    use std::io::Cursor;
    use std::sync::Arc;

    use hex_literal::hex;
    use nix_compat::nixhash::{CAHash, NixHash};
    use rstest::*;
    use snix_castore::blobservice::BlobService;
    use snix_castore::directoryservice::DirectoryService;
    use snix_castore::fixtures::{
        DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP, EMPTY_BLOB_DIGEST, HELLOWORLD_BLOB_CONTENTS,
        HELLOWORLD_BLOB_DIGEST,
    };
    use snix_castore::{Directory, Node};
    use tokio_stream::StreamExt;

    use crate::tests::fixtures::{blob_service, directory_service};

    // A NAR containing a single symlink ingests to a symlink root node.
    #[rstest]
    #[tokio::test]
    async fn single_symlink(
        blob_service: Arc<dyn BlobService>,
        directory_service: Arc<dyn DirectoryService>,
    ) {
        let root_node = ingest_nar(
            blob_service,
            directory_service,
            &mut Cursor::new(&NAR_CONTENTS_SYMLINK),
        )
        .await
        .expect("must parse");

        assert_eq!(
            Node::Symlink {
                target: "/nix/store/somewhereelse".try_into().unwrap()
            },
            root_node
        );
    }

    // A NAR containing a single file ingests to a file root node, and the
    // file contents end up in the blob service.
    #[rstest]
    #[tokio::test]
    async fn single_file(
        blob_service: Arc<dyn BlobService>,
        directory_service: Arc<dyn DirectoryService>,
    ) {
        let root_node = ingest_nar(
            blob_service.clone(),
            directory_service,
            &mut Cursor::new(&NAR_CONTENTS_HELLOWORLD),
        )
        .await
        .expect("must parse");

        assert_eq!(
            Node::File {
                digest: HELLOWORLD_BLOB_DIGEST.clone(),
                size: HELLOWORLD_BLOB_CONTENTS.len() as u64,
                executable: false,
            },
            root_node
        );

        // The file contents must have been uploaded as a blob.
        assert!(blob_service.has(&HELLOWORLD_BLOB_DIGEST).await.unwrap());
    }

    // A nested directory NAR ingests to the expected directory closure, with
    // all referenced blobs and directories persisted.
    #[rstest]
    #[tokio::test]
    async fn complicated(
        blob_service: Arc<dyn BlobService>,
        directory_service: Arc<dyn DirectoryService>,
    ) {
        let root_node = ingest_nar(
            blob_service.clone(),
            directory_service.clone(),
            &mut Cursor::new(&NAR_CONTENTS_COMPLICATED),
        )
        .await
        .expect("must parse");

        assert_eq!(
            Node::Directory {
                digest: DIRECTORY_COMPLICATED.digest(),
                size: DIRECTORY_COMPLICATED.size()
            },
            root_node,
        );

        // The empty ".keep" file blob must be present.
        assert!(blob_service.has(&EMPTY_BLOB_DIGEST).await.unwrap());

        // Both directories of the closure must be retrievable.
        let resp: Result<Vec<Directory>, _> = directory_service
            .get_recursive(&DIRECTORY_COMPLICATED.digest())
            .collect()
            .await;

        let directories = resp.unwrap();

        assert_eq!(2, directories.len());
        assert_eq!(DIRECTORY_COMPLICATED.clone(), directories[0]);
        assert_eq!(DIRECTORY_WITH_KEEP.clone(), directories[1]);
    }

    // Each case pairs a deliberately wrong CA hash with a NAR; ingestion must
    // fail with HashMismatch.
    #[rstest]
    #[case::nar_sha256(Some(CAHash::Nar(NixHash::Sha256(hex!("fbd52279a8df024c9fd5718de4103bf5e760dc7f2cf49044ee7dea87ab16911a")))), NAR_CONTENTS_COMPLICATED.as_slice())]
    #[case::nar_sha512(Some(CAHash::Nar(NixHash::Sha512(Box::new(hex!("ff5d43941411f35f09211f8596b426ee6e4dd3af1639e0ed2273cbe44b818fc4a59e3af02a057c5b18fbfcf435497de5f1994206c137f469b3df674966a922f0"))))), NAR_CONTENTS_COMPLICATED.as_slice())]
    #[case::flat_md5(Some(CAHash::Flat(NixHash::Md5(hex!("fd076287532e86365e841e92bfc50d8c")))), NAR_CONTENTS_HELLOWORLD.as_slice() )]
    #[case::nar_symlink_sha1(Some(CAHash::Nar(NixHash::Sha1(hex!("f24eeaaa9cc016bab030bf007cb1be6483e7ba9e")))), NAR_CONTENTS_SYMLINK.as_slice())]
    #[tokio::test]
    async fn ingest_with_cahash_mismatch(
        blob_service: Arc<dyn BlobService>,
        directory_service: Arc<dyn DirectoryService>,
        #[case] ca_hash: Option<CAHash>,
        #[case] nar_content: &[u8],
    ) {
        let err = ingest_nar_and_hash(
            blob_service.clone(),
            directory_service.clone(),
            &mut Cursor::new(nar_content),
            &ca_hash,
        )
        .await
        .expect_err("Ingestion should have failed");
        assert!(
            matches!(err, NarIngestionError::HashMismatch { .. }),
            "CAHash should have mismatched"
        );
    }

    // Each case pairs the correct CA hash with its NAR; ingestion must succeed.
    #[rstest]
    #[case::nar_sha256(Some(CAHash::Nar(NixHash::Sha256(hex!("ebd52279a8df024c9fd5718de4103bf5e760dc7f2cf49044ee7dea87ab16911a")))), &NAR_CONTENTS_COMPLICATED.clone())]
    #[case::nar_sha512(Some(CAHash::Nar(NixHash::Sha512(Box::new(hex!("1f5d43941411f35f09211f8596b426ee6e4dd3af1639e0ed2273cbe44b818fc4a59e3af02a057c5b18fbfcf435497de5f1994206c137f469b3df674966a922f0"))))), &NAR_CONTENTS_COMPLICATED.clone())]
    #[case::flat_md5(Some(CAHash::Flat(NixHash::Md5(hex!("ed076287532e86365e841e92bfc50d8c")))), &NAR_CONTENTS_HELLOWORLD.clone())]
    #[case::nar_symlink_sha1(Some(CAHash::Nar(NixHash::Sha1(hex!("424eeaaa9cc016bab030bf007cb1be6483e7ba9e")))), &NAR_CONTENTS_SYMLINK.clone())]
    #[tokio::test]
    async fn ingest_with_cahash_correct(
        blob_service: Arc<dyn BlobService>,
        directory_service: Arc<dyn DirectoryService>,
        #[case] ca_hash: Option<CAHash>,
        #[case] nar_content: &[u8],
    ) {
        let _ = ingest_nar_and_hash(
            blob_service.clone(),
            directory_service,
            &mut Cursor::new(nar_content),
            &ca_hash,
        )
        .await
        .expect("CAHash should have matched");
    }

    // Flat CA hashes are only valid for single-file NARs; directory and
    // symlink NARs must fail with TypeMismatch.
    #[rstest]
    #[case::nar_sha256(Some(CAHash::Flat(NixHash::Sha256(hex!("ebd52279a8df024c9fd5718de4103bf5e760dc7f2cf49044ee7dea87ab16911a")))), &NAR_CONTENTS_COMPLICATED.clone())]
    #[case::nar_symlink_sha1(Some(CAHash::Flat(NixHash::Sha1(hex!("424eeaaa9cc016bab030bf007cb1be6483e7ba9e")))), &NAR_CONTENTS_SYMLINK.clone())]
    #[tokio::test]
    async fn ingest_with_flat_non_file(
        blob_service: Arc<dyn BlobService>,
        directory_service: Arc<dyn DirectoryService>,
        #[case] ca_hash: Option<CAHash>,
        #[case] nar_content: &[u8],
    ) {
        let err = ingest_nar_and_hash(
            blob_service,
            directory_service,
            &mut Cursor::new(nar_content),
            &ca_hash,
        )
        .await
        .expect_err("Ingestion should have failed");

        assert!(
            matches!(err, NarIngestionError::TypeMismatch),
            "Flat cahash should only be allowed for single file nars"
        );
    }
}
413}