snix_castore/import/mod.rs

//! The main library function here is [ingest_entries], which receives a stream of
//! [IngestionEntry] items.
//!
//! Specific implementations, such as ingesting from the filesystem, live in
//! child modules.
6
7use crate::directoryservice::{DirectoryPutter, DirectoryService};
8use crate::path::{Path, PathBuf};
9use crate::{B3Digest, Directory, Node};
10use futures::{Stream, StreamExt};
11use tracing::Level;
12
13use hashbrown::HashMap;
14use tracing::instrument;
15
16mod error;
17pub use error::IngestionError;
18
19pub mod archive;
20pub mod blobs;
21pub mod fs;
22
23/// Ingests [IngestionEntry] from the given stream into a the passed [DirectoryService].
24/// On success, returns the root [Node].
25///
26/// The stream must have the following invariants:
27/// - All children entries must come before their parents.
28/// - The last entry must be the root node which must have a single path component.
29/// - Every entry should have a unique path, and only consist of normal components.
30///   This means, no windows path prefixes, absolute paths, `.` or `..`.
31/// - All referenced directories must have an associated directory entry in the stream.
32///   This means if there is a file entry for `foo/bar`, there must also be a `foo` directory
33///   entry.
34///
35/// Internally we maintain a [HashMap] of [PathBuf] to partially populated [Directory] at that
36/// path. Once we receive an [IngestionEntry] for the directory itself, we remove it from the
37/// map and upload it to the [DirectoryService] through a lazily created [DirectoryPutter].
38///
39/// On success, returns the root node.
40#[instrument(skip_all, ret(level = Level::TRACE), err)]
41pub async fn ingest_entries<DS, S, E>(
42    directory_service: DS,
43    mut entries: S,
44) -> Result<Node, IngestionError<E>>
45where
46    DS: DirectoryService,
47    S: Stream<Item = Result<IngestionEntry, E>> + Send + std::marker::Unpin,
48    E: std::error::Error,
49{
50    // For a given path, this holds the [Directory] structs as they are populated.
51    let mut directories: HashMap<PathBuf, Directory> = HashMap::default();
52    let mut maybe_directory_putter: Option<Box<dyn DirectoryPutter>> = None;
53
54    let root_node = loop {
55        let entry = entries
56            .next()
57            .await
58            // The last entry of the stream must have 1 path component, after which
59            // we break the loop manually.
60            .ok_or(IngestionError::UnexpectedEndOfStream)??;
61
62        let (path, node) = match entry {
63            IngestionEntry::Dir { path } => {
64                // If the entry is a directory, we traversed all its children (and
65                // populated it in `directories`).
66                // If we don't have it in directories, it's a directory without
67                // children.
68                let directory = directories
69                    .remove(&path)
70                    // In that case, it contained no children
71                    .unwrap_or_default();
72
73                let directory_size = directory.size();
74                let directory_digest = directory.digest();
75
76                // Use the directory_putter to upload the directory.
77                // If we don't have one yet (as that's the first one to upload),
78                // initialize the putter.
79                let directory_putter = maybe_directory_putter
80                    .get_or_insert_with(|| directory_service.put_multiple_start());
81
82                match directory_putter.put(directory).await {
83                    Ok(()) => (
84                        path,
85                        Node::Directory {
86                            digest: directory_digest,
87                            size: directory_size,
88                        },
89                    ),
90                    Err(e) => {
91                        return Err(IngestionError::UploadDirectoryError(path, e));
92                    }
93                }
94            }
95            IngestionEntry::Symlink { path, target } => {
96                match bytes::Bytes::from(target).try_into() {
97                    Ok(target) => (path, Node::Symlink { target }),
98                    Err(e) => {
99                        return Err(IngestionError::UploadDirectoryError(
100                            path,
101                            crate::Error::StorageError(format!("invalid symlink target: {}", e)),
102                        ));
103                    }
104                }
105            }
106            IngestionEntry::Regular {
107                path,
108                size,
109                executable,
110                digest,
111            } => (
112                path,
113                Node::File {
114                    digest: digest.clone(),
115                    size,
116                    executable,
117                },
118            ),
119        };
120
121        let parent = path.parent().expect("Snix bug: got entry with root node");
122
123        if parent == crate::Path::ROOT {
124            break node;
125        } else {
126            let name = path
127                .file_name()
128                // If this is the root node, it will have an empty name.
129                .unwrap_or_else(|| "".try_into().unwrap())
130                .to_owned();
131
132            // record node in parent directory, creating a new [Directory] if not there yet.
133            directories
134                .entry_ref(parent)
135                .or_default()
136                .add(name, node)
137                .map_err(|e| {
138                    IngestionError::UploadDirectoryError(
139                        path,
140                        crate::Error::StorageError(e.to_string()),
141                    )
142                })?;
143        }
144    };
145
146    assert!(
147        entries.count().await == 0,
148        "Snix bug: left over elements in the stream"
149    );
150
151    assert!(
152        directories.is_empty(),
153        "Snix bug: left over directories after processing ingestion stream"
154    );
155
156    // if there were directories uploaded, make sure we flush the putter, so
157    // they're all persisted to the backend.
158    if let Some(mut directory_putter) = maybe_directory_putter {
159        #[cfg_attr(not(debug_assertions), allow(unused))]
160        let root_directory_digest = directory_putter
161            .close()
162            .await
163            .map_err(|e| IngestionError::FinalizeDirectoryUpload(e))?;
164
165        #[cfg(debug_assertions)]
166        {
167            if let Node::Directory { digest, .. } = &root_node {
168                debug_assert_eq!(&root_directory_digest, digest);
169            } else {
170                unreachable!("Snix bug: directory putter initialized but no root directory node");
171            }
172        }
173    };
174
175    Ok(root_node)
176}
177
/// A single item of the stream consumed by [ingest_entries].
///
/// Every variant carries the path of the entry. See [ingest_entries] for the ordering and
/// path invariants the stream as a whole must uphold.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum IngestionEntry {
    /// A regular file; `digest`, `size` and `executable` are copied as-is into a [Node::File].
    Regular {
        path: PathBuf,
        /// Size of the file, stored unchanged in the resulting [Node::File].
        size: u64,
        /// Whether the file is marked executable in the resulting [Node::File].
        executable: bool,
        /// Digest referencing the file contents.
        /// NOTE(review): presumably the blob must already be uploaded (e.g. via the `blobs`
        /// module); this module does not upload it — confirm with callers.
        digest: B3Digest,
    },
    /// A symbolic link.
    Symlink {
        path: PathBuf,
        /// Raw link target bytes; [ingest_entries] validates them when converting into a
        /// [Node::Symlink] and returns an error for invalid targets.
        target: Vec<u8>,
    },
    /// A directory. All of its children must have been yielded before this entry.
    Dir {
        path: PathBuf,
    },
}
194
195impl IngestionEntry {
196    fn path(&self) -> &Path {
197        match self {
198            IngestionEntry::Regular { path, .. } => path,
199            IngestionEntry::Symlink { path, .. } => path,
200            IngestionEntry::Dir { path } => path,
201        }
202    }
203
204    fn is_dir(&self) -> bool {
205        matches!(self, IngestionEntry::Dir { .. })
206    }
207}
208
// Tests for [ingest_entries], driven by hand-written [IngestionEntry] streams.
#[cfg(test)]
mod test {
    use rstest::rstest;

    use crate::fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP, EMPTY_BLOB_DIGEST};
    use crate::{Directory, Node};
    use crate::{directoryservice::MemoryDirectoryService, fixtures::DUMMY_DIGEST};

    use super::IngestionEntry;
    use super::ingest_entries;

    // Well-formed streams: each case's entries are fed in order into `ingest_entries`
    // backed by an in-memory directory service, and the returned root node is compared
    // against the expected one. The last entry of every case has a single path component.
    #[rstest]
    #[case::single_file(vec![IngestionEntry::Regular {
        path: "foo".parse().unwrap(),
        size: 42,
        executable: true,
        digest: DUMMY_DIGEST.clone(),
    }],
        Node::File{digest: DUMMY_DIGEST.clone(), size: 42, executable: true}
    )]
    #[case::single_symlink(vec![IngestionEntry::Symlink {
        path: "foo".parse().unwrap(),
        target: b"blub".into(),
    }],
        Node::Symlink{target: "blub".try_into().unwrap()}
    )]
    #[case::single_dir(vec![IngestionEntry::Dir {
        path: "foo".parse().unwrap(),
    }],
        Node::Directory{digest: Directory::default().digest(), size: Directory::default().size()}
    )]
    #[case::dir_with_keep(vec![
        IngestionEntry::Regular {
            path: "foo/.keep".parse().unwrap(),
            size: 0,
            executable: false,
            digest: EMPTY_BLOB_DIGEST.clone(),
        },
        IngestionEntry::Dir {
            path: "foo".parse().unwrap(),
        },
    ],
        Node::Directory{ digest: DIRECTORY_WITH_KEEP.digest(), size: DIRECTORY_WITH_KEEP.size()}
    )]
    /// This is intentionally a bit unsorted, though it still satisfies all
    /// requirements we have on the order of elements in the stream.
    #[case::directory_complicated(vec![
        IngestionEntry::Regular {
            path: "blub/.keep".parse().unwrap(),
            size: 0,
            executable: false,
            digest: EMPTY_BLOB_DIGEST.clone(),
        },
        IngestionEntry::Regular {
            path: "blub/keep/.keep".parse().unwrap(),
            size: 0,
            executable: false,
            digest: EMPTY_BLOB_DIGEST.clone(),
        },
        IngestionEntry::Dir {
            path: "blub/keep".parse().unwrap(),
        },
        IngestionEntry::Symlink {
            path: "blub/aa".parse().unwrap(),
            target: b"/nix/store/somewhereelse".into(),
        },
        IngestionEntry::Dir {
            path: "blub".parse().unwrap(),
        },
    ],
    Node::Directory{ digest: DIRECTORY_COMPLICATED.digest(), size: DIRECTORY_COMPLICATED.size() }
    )]
    #[tokio::test]
    async fn test_ingestion(#[case] entries: Vec<IngestionEntry>, #[case] exp_root_node: Node) {
        let directory_service = MemoryDirectoryService::default();

        let root_node = ingest_entries(
            directory_service.clone(),
            futures::stream::iter(entries.into_iter().map(Ok::<_, std::io::Error>)),
        )
        .await
        .expect("must succeed");

        assert_eq!(exp_root_node, root_node, "root node should match");
    }

    // Streams that end before any entry with a single path component (the root) arrives
    // must yield `IngestionError::UnexpectedEndOfStream`:
    // - `empty_entries`: the very first `next()` returns `None`.
    // - `missing_intermediate_dir`: `blub/.keep` is recorded under its parent `blub`, but no
    //   `blub` directory entry follows before the stream ends.
    #[rstest]
    #[case::empty_entries(vec![])]
    #[case::missing_intermediate_dir(vec![
        IngestionEntry::Regular {
            path: "blub/.keep".parse().unwrap(),
            size: 0,
            executable: false,
            digest: EMPTY_BLOB_DIGEST.clone(),
        },
    ])]
    #[tokio::test]
    async fn test_end_of_stream(#[case] entries: Vec<IngestionEntry>) {
        use crate::import::IngestionError;

        let directory_service = MemoryDirectoryService::default();

        let result = ingest_entries(
            directory_service.clone(),
            futures::stream::iter(entries.into_iter().map(Ok::<_, std::io::Error>)),
        )
        .await;
        assert!(matches!(result, Err(IngestionError::UnexpectedEndOfStream)));
    }

    // Streams violating the ordering invariants, which `ingest_entries` treats as bugs (panics).
    // - `leaf_after_parent`: `blub` has a single component, so it is taken as the root and the
    //   loop ends; the trailing `blub/.keep` entry then trips the "left over elements" assert.
    // - `root_in_entry`: `.keep` already has a single component and ends the loop, so the
    //   trailing `""` entry also trips the "left over elements" assert.
    //   NOTE(review): this case therefore does not reach the "got entry with root node"
    //   `expect` in `ingest_entries`; confirm that is the intended coverage.
    #[rstest]
    #[should_panic]
    #[case::leaf_after_parent(vec![
        IngestionEntry::Dir {
            path: "blub".parse().unwrap(),
        },
        IngestionEntry::Regular {
            path: "blub/.keep".parse().unwrap(),
            size: 0,
            executable: false,
            digest: EMPTY_BLOB_DIGEST.clone(),
        },
    ])]
    #[should_panic]
    #[case::root_in_entry(vec![
        IngestionEntry::Regular {
            path: ".keep".parse().unwrap(),
            size: 0,
            executable: false,
            digest: EMPTY_BLOB_DIGEST.clone(),
        },
        IngestionEntry::Dir {
            path: "".parse().unwrap(),
        },
    ])]
    #[tokio::test]
    async fn test_ingestion_fail(#[case] entries: Vec<IngestionEntry>) {
        let directory_service = MemoryDirectoryService::default();

        let _ = ingest_entries(
            directory_service.clone(),
            futures::stream::iter(entries.into_iter().map(Ok::<_, std::io::Error>)),
        )
        .await;
    }
}