// snix_store/nar/import.rs

1use nix_compat::{
2    nar::reader::r#async as nar_reader,
3    nixhash::{CAHash, NixHash},
4};
5use sha2::Digest;
6use snix_castore::{
7    Node, PathBuf,
8    blobservice::BlobService,
9    directoryservice::DirectoryService,
10    import::{
11        IngestionEntry, IngestionError,
12        blobs::{self, ConcurrentBlobUploader},
13        ingest_entries,
14    },
15};
16use tokio::{
17    io::{AsyncBufRead, AsyncRead},
18    sync::mpsc,
19    try_join,
20};
21
22use super::hashing_reader::HashingReader;
23
/// Represents errors that can happen during nar ingestion.
#[derive(Debug, thiserror::Error)]
pub enum NarIngestionError {
    /// An error bubbled up from the underlying castore ingestion.
    #[error("{0}")]
    IngestionError(#[from] IngestionError<Error>),

    /// The hash computed over the ingested contents did not match the
    /// expected CA hash.
    #[error("Hash mismatch, expected: {expected}, got: {actual}.")]
    HashMismatch { expected: NixHash, actual: NixHash },

    /// A flat CA hash was expected, but the NAR root was not a single file.
    #[error("Expected the nar to contain a single file.")]
    TypeMismatch,

    /// An I/O error occurred while reading the NAR or re-reading the
    /// ingested blob.
    #[error("Ingestion failed: {0}")]
    Io(#[from] std::io::Error),
}
39
40/// Ingests the contents from a [AsyncRead] providing NAR into the snix store,
41/// interacting with a [BlobService] and [DirectoryService].
42/// Returns the castore root node, as well as the sha256 and size of the NAR
43/// contents ingested.
44pub async fn ingest_nar_and_hash<R, BS, DS>(
45    blob_service: BS,
46    directory_service: DS,
47    r: &mut R,
48    expected_cahash: &Option<CAHash>,
49) -> Result<(Node, [u8; 32], u64), NarIngestionError>
50where
51    R: AsyncRead + Unpin + Send,
52    BS: BlobService + Clone + 'static,
53    DS: DirectoryService,
54{
55    let mut nar_hash = sha2::Sha256::new();
56    let mut nar_size = 0;
57
58    // Assemble NarHash and NarSize as we read bytes.
59    let mut r = tokio_util::io::InspectReader::new(r, |b| {
60        nar_size += b.len() as u64;
61        nar_hash.update(b);
62    });
63
64    match expected_cahash {
65        Some(CAHash::Nar(expected_hash)) => {
66            // We technically don't need the Sha256 hasher as we are already computing the nar hash with the reader above,
67            // but it makes the control flow more uniform and easier to understand.
68            let mut ca_reader = HashingReader::new_with_algo(expected_hash.algo(), &mut r);
69            let mut r = tokio::io::BufReader::new(&mut ca_reader);
70            let root_node = ingest_nar(blob_service, directory_service, &mut r).await?;
71            let actual_hash = ca_reader.consume();
72
73            if actual_hash != *expected_hash {
74                return Err(NarIngestionError::HashMismatch {
75                    expected: expected_hash.clone(),
76                    actual: actual_hash,
77                });
78            }
79            Ok((root_node, nar_hash.finalize().into(), nar_size))
80        }
81        Some(CAHash::Flat(expected_hash)) => {
82            let mut r = tokio::io::BufReader::new(&mut r);
83            let root_node = ingest_nar(blob_service.clone(), directory_service, &mut r).await?;
84            match &root_node {
85                Node::File { digest, .. } => match blob_service.open_read(digest).await? {
86                    Some(blob_reader) => {
87                        let mut ca_reader =
88                            HashingReader::new_with_algo(expected_hash.algo(), blob_reader);
89                        tokio::io::copy(&mut ca_reader, &mut tokio::io::empty()).await?;
90                        let actual_hash = ca_reader.consume();
91
92                        if actual_hash != *expected_hash {
93                            return Err(NarIngestionError::HashMismatch {
94                                expected: expected_hash.clone(),
95                                actual: actual_hash,
96                            });
97                        }
98                        Ok((root_node, nar_hash.finalize().into(), nar_size))
99                    }
100                    None => Err(NarIngestionError::Io(std::io::Error::other(
101                        "Ingested data not found",
102                    ))),
103                },
104                _ => Err(NarIngestionError::TypeMismatch),
105            }
106        }
107        // We either got CAHash::Text, or no CAHash at all, so we just don't do any additional
108        // hash calculation/validation.
109        // FUTUREWORK: We should figure out what to do with CAHash::Text, according to nix-cpp
110        // they don't handle it either:
111        // https://github.com/NixOS/nix/blob/3e9cc78eb5e5c4f1e762e201856273809fd92e71/src/libstore/local-store.cc#L1099-L1133
112        _ => {
113            let mut r = tokio::io::BufReader::new(&mut r);
114            let root_node = ingest_nar(blob_service, directory_service, &mut r).await?;
115            Ok((root_node, nar_hash.finalize().into(), nar_size))
116        }
117    }
118}
119
120/// Ingests the contents from a [AsyncRead] providing NAR into the snix store,
121/// interacting with a [BlobService] and [DirectoryService].
122/// It returns the castore root node or an error.
123pub async fn ingest_nar<R, BS, DS>(
124    blob_service: BS,
125    directory_service: DS,
126    r: &mut R,
127) -> Result<Node, IngestionError<Error>>
128where
129    R: AsyncBufRead + Unpin + Send,
130    BS: BlobService + Clone + 'static,
131    DS: DirectoryService,
132{
133    // open the NAR for reading.
134    // The NAR reader emits nodes in DFS preorder.
135    let root_node = nar_reader::open(r).await.map_err(Error::IO)?;
136
137    let (tx, rx) = mpsc::channel(1);
138    let rx = tokio_stream::wrappers::ReceiverStream::new(rx);
139
140    let produce = async move {
141        let mut blob_uploader = ConcurrentBlobUploader::new(blob_service);
142
143        let res = produce_nar_inner(
144            &mut blob_uploader,
145            root_node,
146            "root".parse().unwrap(), // HACK: the root node sent to ingest_entries may not be ROOT.
147            tx.clone(),
148        )
149        .await;
150
151        if let Err(err) = blob_uploader.join().await {
152            tx.send(Err(err.into()))
153                .await
154                .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?;
155        }
156
157        tx.send(res)
158            .await
159            .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?;
160
161        Ok(())
162    };
163
164    let consume = ingest_entries(directory_service, rx);
165
166    let (_, node) = try_join!(produce, consume)?;
167
168    Ok(node)
169}
170
171async fn produce_nar_inner<BS>(
172    blob_uploader: &mut ConcurrentBlobUploader<BS>,
173    node: nar_reader::Node<'_, '_>,
174    path: PathBuf,
175    tx: mpsc::Sender<Result<IngestionEntry, Error>>,
176) -> Result<IngestionEntry, Error>
177where
178    BS: BlobService + Clone + 'static,
179{
180    Ok(match node {
181        nar_reader::Node::Symlink { target } => IngestionEntry::Symlink { path, target },
182        nar_reader::Node::File {
183            executable,
184            mut reader,
185        } => {
186            let size = reader.len();
187            let digest = blob_uploader.upload(&path, size, &mut reader).await?;
188
189            IngestionEntry::Regular {
190                path,
191                size,
192                executable,
193                digest,
194            }
195        }
196        nar_reader::Node::Directory(mut dir_reader) => {
197            while let Some(entry) = dir_reader.next().await? {
198                let mut path = path.clone();
199
200                // valid NAR names are valid castore names
201                path.try_push(entry.name)
202                    .expect("Snix bug: failed to join name");
203
204                let entry = Box::pin(produce_nar_inner(
205                    blob_uploader,
206                    entry.node,
207                    path,
208                    tx.clone(),
209                ))
210                .await?;
211
212                tx.send(Ok(entry)).await.map_err(|e| {
213                    Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))
214                })?;
215            }
216
217            IngestionEntry::Dir { path }
218        }
219    })
220}
221
/// Errors that can occur while producing ingestion entries from a NAR.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// An I/O error while reading the NAR or talking to the channel.
    #[error(transparent)]
    IO(#[from] std::io::Error),

    /// Uploading a blob to the blob service failed.
    #[error(transparent)]
    BlobUpload(#[from] blobs::Error),
}
230
#[cfg(test)]
mod test {
    use crate::fixtures::{
        NAR_CONTENTS_COMPLICATED, NAR_CONTENTS_HELLOWORLD, NAR_CONTENTS_SYMLINK,
    };
    use crate::nar::{NarIngestionError, ingest_nar, ingest_nar_and_hash};
    use std::io::Cursor;
    use std::sync::Arc;

    use hex_literal::hex;
    use nix_compat::nixhash::{CAHash, NixHash};
    use rstest::*;
    use snix_castore::blobservice::BlobService;
    use snix_castore::directoryservice::DirectoryService;
    use snix_castore::fixtures::{
        DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP, EMPTY_BLOB_DIGEST, HELLOWORLD_BLOB_CONTENTS,
        HELLOWORLD_BLOB_DIGEST,
    };
    use snix_castore::{Directory, Node};
    use tokio_stream::StreamExt;

    use crate::tests::fixtures::{blob_service, directory_service};

    // Ingesting a NAR containing a single symlink yields a symlink root node.
    #[rstest]
    #[tokio::test]
    async fn single_symlink(
        blob_service: Arc<dyn BlobService>,
        directory_service: Arc<dyn DirectoryService>,
    ) {
        let root_node = ingest_nar(
            blob_service,
            directory_service,
            &mut Cursor::new(&NAR_CONTENTS_SYMLINK),
        )
        .await
        .expect("must parse");

        assert_eq!(
            Node::Symlink {
                target: "/nix/store/somewhereelse".try_into().unwrap()
            },
            root_node
        );
    }

    // Ingesting a NAR containing a single file yields a file root node, and
    // the file contents end up in the blob service.
    #[rstest]
    #[tokio::test]
    async fn single_file(
        blob_service: Arc<dyn BlobService>,
        directory_service: Arc<dyn DirectoryService>,
    ) {
        let root_node = ingest_nar(
            blob_service.clone(),
            directory_service,
            &mut Cursor::new(&NAR_CONTENTS_HELLOWORLD),
        )
        .await
        .expect("must parse");

        assert_eq!(
            Node::File {
                digest: HELLOWORLD_BLOB_DIGEST.clone(),
                size: HELLOWORLD_BLOB_CONTENTS.len() as u64,
                executable: false,
            },
            root_node
        );

        // blobservice must contain the blob
        assert!(blob_service.has(&HELLOWORLD_BLOB_DIGEST).await.unwrap());
    }

    // Ingesting a nested-directory NAR populates both the blob service and
    // the directory service with the expected fixtures.
    #[rstest]
    #[tokio::test]
    async fn complicated(
        blob_service: Arc<dyn BlobService>,
        directory_service: Arc<dyn DirectoryService>,
    ) {
        let root_node = ingest_nar(
            blob_service.clone(),
            directory_service.clone(),
            &mut Cursor::new(&NAR_CONTENTS_COMPLICATED),
        )
        .await
        .expect("must parse");

        assert_eq!(
            Node::Directory {
                digest: DIRECTORY_COMPLICATED.digest(),
                size: DIRECTORY_COMPLICATED.size()
            },
            root_node,
        );

        // blobservice must contain the blob
        assert!(blob_service.has(&EMPTY_BLOB_DIGEST).await.unwrap());

        // directoryservice must contain the directories, at least with get_recursive.
        let resp: Result<Vec<Directory>, _> = directory_service
            .get_recursive(&DIRECTORY_COMPLICATED.digest())
            .collect()
            .await;

        let directories = resp.unwrap();

        assert_eq!(2, directories.len());
        assert_eq!(DIRECTORY_COMPLICATED.clone(), directories[0]);
        assert_eq!(DIRECTORY_WITH_KEEP.clone(), directories[1]);
    }

    // Each case passes a deliberately-wrong expected digest; ingestion must
    // fail with a HashMismatch error.
    #[rstest]
    #[case::nar_sha256(Some(CAHash::Nar(NixHash::Sha256(hex!("fbd52279a8df024c9fd5718de4103bf5e760dc7f2cf49044ee7dea87ab16911a")))), NAR_CONTENTS_COMPLICATED.as_slice())]
    #[case::nar_sha512(Some(CAHash::Nar(NixHash::Sha512(Box::new(hex!("ff5d43941411f35f09211f8596b426ee6e4dd3af1639e0ed2273cbe44b818fc4a59e3af02a057c5b18fbfcf435497de5f1994206c137f469b3df674966a922f0"))))), NAR_CONTENTS_COMPLICATED.as_slice())]
    #[case::flat_md5(Some(CAHash::Flat(NixHash::Md5(hex!("fd076287532e86365e841e92bfc50d8c")))), NAR_CONTENTS_HELLOWORLD.as_slice() )]
    #[case::nar_symlink_sha1(Some(CAHash::Nar(NixHash::Sha1(hex!("f24eeaaa9cc016bab030bf007cb1be6483e7ba9e")))), NAR_CONTENTS_SYMLINK.as_slice())]
    #[tokio::test]
    async fn ingest_with_cahash_mismatch(
        blob_service: Arc<dyn BlobService>,
        directory_service: Arc<dyn DirectoryService>,
        #[case] ca_hash: Option<CAHash>,
        #[case] nar_content: &[u8],
    ) {
        let err = ingest_nar_and_hash(
            blob_service.clone(),
            directory_service.clone(),
            &mut Cursor::new(nar_content),
            &ca_hash,
        )
        .await
        .expect_err("Ingestion should have failed");
        assert!(
            matches!(err, NarIngestionError::HashMismatch { .. }),
            "CAHash should have mismatched"
        );
    }

    // Each case passes the correct expected digest for its fixture; ingestion
    // must succeed.
    #[rstest]
    #[case::nar_sha256(Some(CAHash::Nar(NixHash::Sha256(hex!("ebd52279a8df024c9fd5718de4103bf5e760dc7f2cf49044ee7dea87ab16911a")))), &NAR_CONTENTS_COMPLICATED.clone())]
    #[case::nar_sha512(Some(CAHash::Nar(NixHash::Sha512(Box::new(hex!("1f5d43941411f35f09211f8596b426ee6e4dd3af1639e0ed2273cbe44b818fc4a59e3af02a057c5b18fbfcf435497de5f1994206c137f469b3df674966a922f0"))))), &NAR_CONTENTS_COMPLICATED.clone())]
    #[case::flat_md5(Some(CAHash::Flat(NixHash::Md5(hex!("ed076287532e86365e841e92bfc50d8c")))), &NAR_CONTENTS_HELLOWORLD.clone())]
    #[case::nar_symlink_sha1(Some(CAHash::Nar(NixHash::Sha1(hex!("424eeaaa9cc016bab030bf007cb1be6483e7ba9e")))), &NAR_CONTENTS_SYMLINK.clone())]
    #[tokio::test]
    async fn ingest_with_cahash_correct(
        blob_service: Arc<dyn BlobService>,
        directory_service: Arc<dyn DirectoryService>,
        #[case] ca_hash: Option<CAHash>,
        #[case] nar_content: &[u8],
    ) {
        let _ = ingest_nar_and_hash(
            blob_service.clone(),
            directory_service,
            &mut Cursor::new(nar_content),
            &ca_hash,
        )
        .await
        .expect("CAHash should have matched");
    }

    // Flat CA hashes are only valid for single-file NARs: directory and
    // symlink roots must be rejected with TypeMismatch.
    #[rstest]
    #[case::nar_sha256(Some(CAHash::Flat(NixHash::Sha256(hex!("ebd52279a8df024c9fd5718de4103bf5e760dc7f2cf49044ee7dea87ab16911a")))), &NAR_CONTENTS_COMPLICATED.clone())]
    #[case::nar_symlink_sha1(Some(CAHash::Flat(NixHash::Sha1(hex!("424eeaaa9cc016bab030bf007cb1be6483e7ba9e")))), &NAR_CONTENTS_SYMLINK.clone())]
    #[tokio::test]
    async fn ingest_with_flat_non_file(
        blob_service: Arc<dyn BlobService>,
        directory_service: Arc<dyn DirectoryService>,
        #[case] ca_hash: Option<CAHash>,
        #[case] nar_content: &[u8],
    ) {
        let err = ingest_nar_and_hash(
            blob_service,
            directory_service,
            &mut Cursor::new(nar_content),
            &ca_hash,
        )
        .await
        .expect_err("Ingestion should have failed");

        assert!(
            matches!(err, NarIngestionError::TypeMismatch),
            "Flat cahash should only be allowed for single file nars"
        );
    }
}