snix_castore/import/
mod.rs

1//! The main library function here is [ingest_entries], receiving a stream of
2//! [IngestionEntry].
3//!
4//! Specific implementations, such as ingesting from the filesystem, live in
5//! child modules.
6
7use crate::directoryservice::{DirectoryPutter, DirectoryService};
8use crate::path::{Path, PathBuf};
9use crate::{B3Digest, Directory, Node};
10use futures::{Stream, StreamExt};
11use tracing::Level;
12
13use hashbrown::HashMap;
14use tracing::instrument;
15
16mod error;
17pub use error::IngestionError;
18
19pub mod archive;
20pub mod blobs;
21pub mod fs;
22
23/// Ingests [IngestionEntry] from the given stream into a the passed [DirectoryService].
24/// On success, returns the root [Node].
25///
26/// The stream must have the following invariants:
27/// - All children entries must come before their parents.
28/// - The last entry must be the root node which must have a single path component.
29/// - Every entry should have a unique path, and only consist of normal components.
30///   This means, no windows path prefixes, absolute paths, `.` or `..`.
31/// - All referenced directories must have an associated directory entry in the stream.
32///   This means if there is a file entry for `foo/bar`, there must also be a `foo` directory
33///   entry.
34///
35/// Internally we maintain a [HashMap] of [PathBuf] to partially populated [Directory] at that
36/// path. Once we receive an [IngestionEntry] for the directory itself, we remove it from the
37/// map and upload it to the [DirectoryService] through a lazily created [DirectoryPutter].
38///
39/// On success, returns the root node.
40#[instrument(skip_all, ret(level = Level::TRACE), err)]
41pub async fn ingest_entries<DS, S, E>(
42    directory_service: DS,
43    mut entries: S,
44) -> Result<Node, IngestionError<E>>
45where
46    DS: DirectoryService,
47    S: Stream<Item = Result<IngestionEntry, E>> + Send + std::marker::Unpin,
48    E: std::error::Error,
49{
50    // For a given path, this holds the [Directory] structs as they are populated.
51    let mut directories: HashMap<PathBuf, Directory> = HashMap::default();
52    let mut maybe_directory_putter: Option<Box<dyn DirectoryPutter>> = None;
53
54    let root_node = loop {
55        let entry = entries
56            .next()
57            .await
58            // The last entry of the stream must have 1 path component, after which
59            // we break the loop manually.
60            .ok_or(IngestionError::UnexpectedEndOfStream)??;
61
62        let (path, node) = match entry {
63            IngestionEntry::Dir { path } => {
64                // If the entry is a directory, we traversed all its children (and
65                // populated it in `directories`).
66                // If we don't have it in directories, it's a directory without
67                // children.
68                let directory = directories
69                    .remove(&path)
70                    // In that case, it contained no children
71                    .unwrap_or_default();
72
73                let directory_size = directory.size();
74                let directory_digest = directory.digest();
75
76                // Use the directory_putter to upload the directory.
77                // If we don't have one yet (as that's the first one to upload),
78                // initialize the putter.
79                let directory_putter = maybe_directory_putter
80                    .get_or_insert_with(|| directory_service.put_multiple_start());
81
82                match directory_putter.put(directory).await {
83                    Ok(()) => (
84                        path,
85                        Node::Directory {
86                            digest: directory_digest,
87                            size: directory_size,
88                        },
89                    ),
90                    Err(e) => {
91                        return Err(IngestionError::UploadDirectoryError(path, e));
92                    }
93                }
94            }
95            IngestionEntry::Symlink { path, target } => {
96                let target: crate::SymlinkTarget = bytes::Bytes::from(target)
97                    .try_into()
98                    .map_err(|err| IngestionError::InvalidSymlinkTarget(path.clone(), err))?;
99                (path, Node::Symlink { target })
100            }
101            IngestionEntry::Regular {
102                path,
103                size,
104                executable,
105                digest,
106            } => (
107                path,
108                Node::File {
109                    digest,
110                    size,
111                    executable,
112                },
113            ),
114        };
115
116        let parent = path.parent().expect("Snix bug: got entry with root node");
117
118        if parent == crate::Path::ROOT {
119            break node;
120        } else {
121            let name = path
122                .file_name()
123                // If this is the root node, it will have an empty name.
124                .unwrap_or_else(|| "".try_into().unwrap())
125                .to_owned();
126
127            // record node in parent directory, creating a new [Directory] if not there yet.
128            directories
129                .entry_ref(parent)
130                .or_default()
131                .add(name, node)
132                .map_err(|e| IngestionError::UploadDirectoryError(path, Box::new(e)))?;
133        }
134    };
135
136    assert!(
137        entries.count().await == 0,
138        "Snix bug: left over elements in the stream"
139    );
140
141    assert!(
142        directories.is_empty(),
143        "Snix bug: left over directories after processing ingestion stream"
144    );
145
146    // if there were directories uploaded, make sure we flush the putter, so
147    // they're all persisted to the backend.
148    if let Some(mut directory_putter) = maybe_directory_putter {
149        #[cfg_attr(not(debug_assertions), allow(unused))]
150        let root_directory_digest = directory_putter
151            .close()
152            .await
153            .map_err(|e| IngestionError::FinalizeDirectoryUpload(e))?;
154
155        #[cfg(debug_assertions)]
156        {
157            if let Node::Directory { digest, .. } = &root_node {
158                debug_assert_eq!(&root_directory_digest, digest);
159            } else {
160                unreachable!("Snix bug: directory putter initialized but no root directory node");
161            }
162        }
163    };
164
165    Ok(root_node)
166}
167
/// A single element of the stream consumed by [ingest_entries].
///
/// Every variant carries the `path` of the entry. [ingest_entries] converts each entry into a
/// [Node] and records it in the [Directory] of its parent path.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum IngestionEntry {
    /// A regular file. Becomes a [Node::File] carrying the same `size`, `executable` and
    /// `digest` values.
    Regular {
        /// Path of the file.
        path: PathBuf,
        /// Size of the file (presumably in bytes — TODO confirm against the producers).
        size: u64,
        /// Executable flag, passed through unchanged into the resulting [Node::File].
        executable: bool,
        /// Digest of the file, passed through unchanged into the resulting [Node::File]
        /// (presumably the digest of the file contents — TODO confirm).
        digest: B3Digest,
    },
    /// A symbolic link. Becomes a [Node::Symlink] if `target` can be converted into a
    /// [crate::SymlinkTarget].
    Symlink {
        /// Path of the symlink itself.
        path: PathBuf,
        /// Raw bytes of the symlink target.
        target: Vec<u8>,
    },
    /// A directory. Its children are the entries previously received for paths below it;
    /// a directory for which no children were received is treated as empty.
    Dir {
        /// Path of the directory.
        path: PathBuf,
    },
}
184
185impl IngestionEntry {
186    fn path(&self) -> &Path {
187        match self {
188            IngestionEntry::Regular { path, .. } => path,
189            IngestionEntry::Symlink { path, .. } => path,
190            IngestionEntry::Dir { path } => path,
191        }
192    }
193
194    fn is_dir(&self) -> bool {
195        matches!(self, IngestionEntry::Dir { .. })
196    }
197}
198
// Tests for `ingest_entries`, driving it with in-memory streams of `IngestionEntry`
// and a directory service obtained from `gen_test_directory_service`.
#[cfg(test)]
mod test {
    use rstest::rstest;

    use crate::fixtures::DUMMY_DIGEST;
    use crate::fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP, EMPTY_BLOB_DIGEST};
    use crate::utils::gen_test_directory_service;
    use crate::{Directory, Node};

    use super::IngestionEntry;
    use super::ingest_entries;

    // Successful ingestions. Each case is a list of entries plus the root node
    // that `ingest_entries` is expected to return for it.
    #[rstest]
    #[case::single_file(vec![IngestionEntry::Regular {
        path: "foo".parse().unwrap(),
        size: 42,
        executable: true,
        digest: *DUMMY_DIGEST,
    }],
        Node::File{digest: *DUMMY_DIGEST, size: 42, executable: true}
    )]
    #[case::single_symlink(vec![IngestionEntry::Symlink {
        path: "foo".parse().unwrap(),
        target: b"blub".into(),
    }],
        Node::Symlink{target: "blub".try_into().unwrap()}
    )]
    // A directory entry without any preceding children is ingested as an empty directory.
    #[case::single_dir(vec![IngestionEntry::Dir {
        path: "foo".parse().unwrap(),
    }],
        Node::Directory{digest: Directory::default().digest(), size: Directory::default().size()}
    )]
    #[case::dir_with_keep(vec![
        IngestionEntry::Regular {
            path: "foo/.keep".parse().unwrap(),
            size: 0,
            executable: false,
            digest: *EMPTY_BLOB_DIGEST,
        },
        IngestionEntry::Dir {
            path: "foo".parse().unwrap(),
        },
    ],
        Node::Directory{ digest: DIRECTORY_WITH_KEEP.digest(), size: DIRECTORY_WITH_KEEP.size()}
    )]
    /// This is intentionally a bit unsorted, though it still satisfies all
    /// requirements we have on the order of elements in the stream.
    #[case::directory_complicated(vec![
        IngestionEntry::Regular {
            path: "blub/.keep".parse().unwrap(),
            size: 0,
            executable: false,
            digest: *EMPTY_BLOB_DIGEST,
        },
        IngestionEntry::Regular {
            path: "blub/keep/.keep".parse().unwrap(),
            size: 0,
            executable: false,
            digest: *EMPTY_BLOB_DIGEST,
        },
        IngestionEntry::Dir {
            path: "blub/keep".parse().unwrap(),
        },
        IngestionEntry::Symlink {
            path: "blub/aa".parse().unwrap(),
            target: b"/nix/store/somewhereelse".into(),
        },
        IngestionEntry::Dir {
            path: "blub".parse().unwrap(),
        },
    ],
    Node::Directory{ digest: DIRECTORY_COMPLICATED.digest(), size: DIRECTORY_COMPLICATED.size() }
    )]
    #[tokio::test]
    async fn test_ingestion(#[case] entries: Vec<IngestionEntry>, #[case] exp_root_node: Node) {
        let root_node = ingest_entries(
            gen_test_directory_service(),
            futures::stream::iter(entries.into_iter().map(Ok::<_, std::io::Error>)),
        )
        .await
        .expect("must succeed");

        assert_eq!(exp_root_node, root_node, "root node should match");
    }

    // Streams that end before an entry with a single path component was received
    // must be rejected with `UnexpectedEndOfStream` instead of panicking.
    #[rstest]
    #[case::empty_entries(vec![])]
    // `blub/.keep` is recorded in its parent `blub`, but the `blub` entry never arrives.
    #[case::missing_intermediate_dir(vec![
        IngestionEntry::Regular {
            path: "blub/.keep".parse().unwrap(),
            size: 0,
            executable: false,
            digest: *EMPTY_BLOB_DIGEST,
        },
    ])]
    #[tokio::test]
    async fn test_end_of_stream(#[case] entries: Vec<IngestionEntry>) {
        use crate::import::IngestionError;

        let result = ingest_entries(
            gen_test_directory_service(),
            futures::stream::iter(entries.into_iter().map(Ok::<_, std::io::Error>)),
        )
        .await;
        assert!(matches!(result, Err(IngestionError::UnexpectedEndOfStream)));
    }

    // Streams violating the ordering invariants, for which a panic is expected.
    // Each `#[should_panic]` is placed directly before the `#[case]` it belongs to;
    // keep that order when adding cases.
    #[rstest]
    // `blub` has a single path component, so ingestion stops there and `blub/.keep`
    // is left over in the stream.
    #[should_panic]
    #[case::leaf_after_parent(vec![
        IngestionEntry::Dir {
            path: "blub".parse().unwrap(),
        },
        IngestionEntry::Regular {
            path: "blub/.keep".parse().unwrap(),
            size: 0,
            executable: false,
            digest: *EMPTY_BLOB_DIGEST,
        },
    ])]
    // NOTE(review): `.keep` already has a single path component, so ingestion likely
    // stops at the first entry and the panic comes from the left-over-elements
    // assertion rather than from processing the entry with the empty path — confirm.
    #[should_panic]
    #[case::root_in_entry(vec![
        IngestionEntry::Regular {
            path: ".keep".parse().unwrap(),
            size: 0,
            executable: false,
            digest: *EMPTY_BLOB_DIGEST,
        },
        IngestionEntry::Dir {
            path: "".parse().unwrap(),
        },
    ])]
    #[tokio::test]
    async fn test_ingestion_fail(#[case] entries: Vec<IngestionEntry>) {
        // The result is irrelevant, the call itself is expected to panic.
        let _ = ingest_entries(
            gen_test_directory_service(),
            futures::stream::iter(entries.into_iter().map(Ok::<_, std::io::Error>)),
        )
        .await;
    }
}