snix_castore/import/
mod.rs

1//! The main library function here is [ingest_entries], receiving a stream of
2//! [IngestionEntry].
3//!
4//! Specific implementations, such as ingesting from the filesystem, live in
5//! child modules.
6
7use crate::directoryservice::{DirectoryPutter, DirectoryService};
8use crate::path::{Path, PathBuf};
9use crate::{B3Digest, Directory, Node};
10use futures::{Stream, StreamExt};
11use tracing::Level;
12
13use hashbrown::{HashMap, HashSet, hash_set};
14use tracing::instrument;
15
16mod error;
17pub use error::IngestionError;
18
19pub mod archive;
20pub mod blobs;
21pub mod fs;
22
23/// Ingests [IngestionEntry] from the given stream into a the passed [DirectoryService].
24/// On success, returns the root [Node].
25///
26/// The stream must have the following invariants:
27/// - All children entries must come before their parents.
28/// - The last entry must be the root node which must have a single path component.
29/// - Every entry should have a unique path, and only consist of normal components.
30///   This means, no windows path prefixes, absolute paths, `.` or `..`.
31/// - All referenced directories must have an associated directory entry in the stream.
32///   This means if there is a file entry for `foo/bar`, there must also be a `foo` directory
33///   entry.
34///
35/// Internally we maintain a [HashMap] of [PathBuf] to partially populated [Directory] at that
36/// path. Once we receive an [IngestionEntry] for the directory itself, we remove it from the
37/// map and upload it to the [DirectoryService] through a lazily created [DirectoryPutter].
38///
39/// On success, returns the root node.
40#[instrument(skip_all, ret(level = Level::TRACE), err)]
41pub async fn ingest_entries<DS, S, E>(
42    directory_service: DS,
43    mut entries: S,
44) -> Result<Node, IngestionError<E>>
45where
46    DS: DirectoryService,
47    S: Stream<Item = Result<IngestionEntry, E>> + Send + std::marker::Unpin,
48    E: std::error::Error,
49{
50    // For a given path, this holds the [Directory] structs as they are populated.
51    let mut directories: HashMap<PathBuf, Directory> = HashMap::default();
52    let mut maybe_directory_putter: Option<Box<dyn DirectoryPutter>> = None;
53
54    // Directory digests we already sent out. When ingesting a tree containing two identical subtrees,
55    // we don't want to send these directories twice.
56    let mut sent_directories: HashSet<B3Digest> = HashSet::new();
57
58    let root_node = loop {
59        let entry = entries
60            .next()
61            .await
62            // The last entry of the stream must have 1 path component, after which
63            // we break the loop manually.
64            .ok_or(IngestionError::UnexpectedEndOfStream)??;
65
66        let (path, node) = match entry {
67            IngestionEntry::Dir { path } => {
68                // If the entry is a directory, we traversed all its children (and
69                // populated it in `directories`).
70                // If we don't have it in directories, it's a directory without
71                // children.
72                let directory = directories
73                    .remove(&path)
74                    // In that case, it contained no children
75                    .unwrap_or_default();
76
77                let directory_size = directory.size();
78                let directory_digest = directory.digest();
79
80                // Get a directory putter, or create a new one if this is the first directory uploaded.
81                let directory_putter = maybe_directory_putter
82                    .get_or_insert_with(|| directory_service.put_multiple_start());
83
84                // Use the directory_putter to upload the directory, if we didn't upload it yet.
85                if let hash_set::Entry::Vacant(vacant_entry) =
86                    sent_directories.entry(directory_digest)
87                {
88                    // upload, ...
89                    if let Err(e) = directory_putter.put(directory).await {
90                        return Err(IngestionError::UploadDirectoryError(path, e));
91                    }
92                    // and mark in sent_directories.
93                    vacant_entry.insert();
94                }
95
96                // return the Node::Directory, so it can be used in its parents.
97                (
98                    path,
99                    Node::Directory {
100                        digest: directory_digest,
101                        size: directory_size,
102                    },
103                )
104            }
105            IngestionEntry::Symlink { path, target } => {
106                let target: crate::SymlinkTarget = bytes::Bytes::from(target)
107                    .try_into()
108                    .map_err(|err| IngestionError::InvalidSymlinkTarget(path.clone(), err))?;
109                (path, Node::Symlink { target })
110            }
111            IngestionEntry::Regular {
112                path,
113                size,
114                executable,
115                digest,
116            } => (
117                path,
118                Node::File {
119                    digest,
120                    size,
121                    executable,
122                },
123            ),
124        };
125
126        let parent = path.parent().expect("Snix bug: got entry with root node");
127
128        if parent == crate::Path::ROOT {
129            break node;
130        } else {
131            let name = path
132                .file_name()
133                // If this is the root node, it will have an empty name.
134                .unwrap_or_else(|| "".try_into().unwrap())
135                .to_owned();
136
137            // record node in parent directory, creating a new [Directory] if not there yet.
138            directories
139                .entry_ref(parent)
140                .or_default()
141                .add(name, node)
142                .map_err(|e| IngestionError::UploadDirectoryError(path, Box::new(e)))?;
143        }
144    };
145
146    assert!(
147        entries.count().await == 0,
148        "Snix bug: left over elements in the stream"
149    );
150
151    assert!(
152        directories.is_empty(),
153        "Snix bug: left over directories after processing ingestion stream"
154    );
155
156    // if there were directories uploaded, make sure we flush the putter, so
157    // they're all persisted to the backend.
158    if let Some(mut directory_putter) = maybe_directory_putter {
159        #[cfg_attr(not(debug_assertions), allow(unused))]
160        let root_directory_digest = directory_putter
161            .close()
162            .await
163            .map_err(|e| IngestionError::FinalizeDirectoryUpload(e))?;
164
165        #[cfg(debug_assertions)]
166        {
167            if let Node::Directory { digest, .. } = &root_node {
168                debug_assert_eq!(&root_directory_digest, digest);
169            } else {
170                unreachable!("Snix bug: directory putter initialized but no root directory node");
171            }
172        }
173    };
174
175    Ok(root_node)
176}
177
/// A single entry of the stream consumed by [ingest_entries].
///
/// Every variant carries the [PathBuf] of the entry. See [ingest_entries] for the ordering
/// and path invariants the stream must uphold.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum IngestionEntry {
    /// A regular file. It is ingested as a [Node::File] carrying `size`, `executable`
    /// and `digest` unchanged.
    Regular {
        path: PathBuf,
        // NOTE(review): presumably the file size in bytes; confirm against producers.
        size: u64,
        executable: bool,
        // Stored as the digest of the resulting [Node::File]; presumably the digest of the
        // file's blob contents (uploaded elsewhere). Confirm against producers.
        digest: B3Digest,
    },
    /// A symbolic link. Its raw `target` bytes are converted into a [crate::SymlinkTarget]
    /// during ingestion; a rejected target yields [IngestionError::InvalidSymlinkTarget].
    Symlink {
        path: PathBuf,
        target: Vec<u8>,
    },
    /// A directory. Its children must appear earlier in the stream. A directory without any
    /// preceding children is ingested as an empty [Directory].
    Dir {
        path: PathBuf,
    },
}
194
195impl IngestionEntry {
196    fn path(&self) -> &Path {
197        match self {
198            IngestionEntry::Regular { path, .. } => path,
199            IngestionEntry::Symlink { path, .. } => path,
200            IngestionEntry::Dir { path } => path,
201        }
202    }
203
204    fn is_dir(&self) -> bool {
205        matches!(self, IngestionEntry::Dir { .. })
206    }
207}
208
#[cfg(test)]
mod test {
    use rstest::rstest;

    use crate::fixtures::DUMMY_DIGEST;
    use crate::fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP, EMPTY_BLOB_DIGEST};
    use crate::utils::gen_test_directory_service;
    use crate::{Directory, Node};

    use super::IngestionEntry;
    use super::ingest_entries;

    // Happy path: each case feeds a well-formed entry stream (children before parents, root
    // entry last) and checks the root node returned by `ingest_entries`.
    #[rstest]
    #[case::single_file(vec![IngestionEntry::Regular {
        path: "foo".parse().unwrap(),
        size: 42,
        executable: true,
        digest: *DUMMY_DIGEST,
    }],
        Node::File{digest: *DUMMY_DIGEST, size: 42, executable: true}
    )]
    #[case::single_symlink(vec![IngestionEntry::Symlink {
        path: "foo".parse().unwrap(),
        target: b"blub".into(),
    }],
        Node::Symlink{target: "blub".try_into().unwrap()}
    )]
    // A directory entry without preceding children ingests as an empty Directory.
    #[case::single_dir(vec![IngestionEntry::Dir {
        path: "foo".parse().unwrap(),
    }],
        Node::Directory{digest: Directory::default().digest(), size: Directory::default().size()}
    )]
    #[case::dir_with_keep(vec![
        IngestionEntry::Regular {
            path: "foo/.keep".parse().unwrap(),
            size: 0,
            executable: false,
            digest: *EMPTY_BLOB_DIGEST,
        },
        IngestionEntry::Dir {
            path: "foo".parse().unwrap(),
        },
    ],
        Node::Directory{ digest: DIRECTORY_WITH_KEEP.digest(), size: DIRECTORY_WITH_KEEP.size()}
    )]
    /// This is intentionally a bit unsorted, though it still satisfies all
    /// requirements we have on the order of elements in the stream.
    #[case::directory_complicated(vec![
        IngestionEntry::Regular {
            path: "blub/.keep".parse().unwrap(),
            size: 0,
            executable: false,
            digest: *EMPTY_BLOB_DIGEST,
        },
        IngestionEntry::Regular {
            path: "blub/keep/.keep".parse().unwrap(),
            size: 0,
            executable: false,
            digest: *EMPTY_BLOB_DIGEST,
        },
        IngestionEntry::Dir {
            path: "blub/keep".parse().unwrap(),
        },
        IngestionEntry::Symlink {
            path: "blub/aa".parse().unwrap(),
            target: b"/nix/store/somewhereelse".into(),
        },
        IngestionEntry::Dir {
            path: "blub".parse().unwrap(),
        },
    ],
    Node::Directory{ digest: DIRECTORY_COMPLICATED.digest(), size: DIRECTORY_COMPLICATED.size() }
    )]
    #[tokio::test]
    async fn test_ingestion(#[case] entries: Vec<IngestionEntry>, #[case] exp_root_node: Node) {
        let root_node = ingest_entries(
            gen_test_directory_service(),
            futures::stream::iter(entries.into_iter().map(Ok::<_, std::io::Error>)),
        )
        .await
        .expect("must succeed");

        assert_eq!(exp_root_node, root_node, "root node should match");
    }

    // Streams that run out before any single-component (root) entry arrives must return
    // `UnexpectedEndOfStream` instead of panicking. In `missing_intermediate_dir`, the only
    // entry has a non-root parent, so it is buffered and the stream then ends.
    #[rstest]
    #[case::empty_entries(vec![])]
    #[case::missing_intermediate_dir(vec![
        IngestionEntry::Regular {
            path: "blub/.keep".parse().unwrap(),
            size: 0,
            executable: false,
            digest: *EMPTY_BLOB_DIGEST,
        },
    ])]
    #[tokio::test]
    async fn test_end_of_stream(#[case] entries: Vec<IngestionEntry>) {
        use crate::import::IngestionError;

        let result = ingest_entries(
            gen_test_directory_service(),
            futures::stream::iter(entries.into_iter().map(Ok::<_, std::io::Error>)),
        )
        .await;
        assert!(matches!(result, Err(IngestionError::UnexpectedEndOfStream)));
    }

    // Invariant violations are expected to panic ("Snix bug: ...").
    // `leaf_after_parent`: the single-component `blub` entry ends ingestion, so the trailing
    // `blub/.keep` trips the "left over elements in the stream" assertion.
    // NOTE(review): in `root_in_entry`, `.keep` also has a single path component, so ingestion
    // stops on the first entry and the panic most likely comes from the "left over elements"
    // assertion (or from `"".parse().unwrap()` if the empty path fails to parse), not from the
    // "got entry with root node" expect. Consider `#[should_panic(expected = ...)]` to pin it.
    #[rstest]
    #[should_panic]
    #[case::leaf_after_parent(vec![
        IngestionEntry::Dir {
            path: "blub".parse().unwrap(),
        },
        IngestionEntry::Regular {
            path: "blub/.keep".parse().unwrap(),
            size: 0,
            executable: false,
            digest: *EMPTY_BLOB_DIGEST,
        },
    ])]
    #[should_panic]
    #[case::root_in_entry(vec![
        IngestionEntry::Regular {
            path: ".keep".parse().unwrap(),
            size: 0,
            executable: false,
            digest: *EMPTY_BLOB_DIGEST,
        },
        IngestionEntry::Dir {
            path: "".parse().unwrap(),
        },
    ])]
    #[tokio::test]
    async fn test_ingestion_fail(#[case] entries: Vec<IngestionEntry>) {
        let _ = ingest_entries(
            gen_test_directory_service(),
            futures::stream::iter(entries.into_iter().map(Ok::<_, std::io::Error>)),
        )
        .await;
    }
}