1use crate::directoryservice::{DirectoryPutter, DirectoryService};
8use crate::path::{Path, PathBuf};
9use crate::{B3Digest, Directory, Node};
10use futures::{Stream, StreamExt};
11use tracing::Level;
12
13use hashbrown::{HashMap, HashSet, hash_set};
14use tracing::instrument;
15
16mod error;
17pub use error::IngestionError;
18
19pub mod archive;
20pub mod blobs;
21pub mod fs;
22
23#[instrument(skip_all, ret(level = Level::TRACE), err)]
41pub async fn ingest_entries<DS, S, E>(
42 directory_service: DS,
43 mut entries: S,
44) -> Result<Node, IngestionError<E>>
45where
46 DS: DirectoryService,
47 S: Stream<Item = Result<IngestionEntry, E>> + Send + std::marker::Unpin,
48 E: std::error::Error,
49{
50 let mut directories: HashMap<PathBuf, Directory> = HashMap::default();
52 let mut maybe_directory_putter: Option<Box<dyn DirectoryPutter>> = None;
53
54 let mut sent_directories: HashSet<B3Digest> = HashSet::new();
57
58 let root_node = loop {
59 let entry = entries
60 .next()
61 .await
62 .ok_or(IngestionError::UnexpectedEndOfStream)??;
65
66 let (path, node) = match entry {
67 IngestionEntry::Dir { path } => {
68 let directory = directories
73 .remove(&path)
74 .unwrap_or_default();
76
77 let directory_size = directory.size();
78 let directory_digest = directory.digest();
79
80 let directory_putter = maybe_directory_putter
82 .get_or_insert_with(|| directory_service.put_multiple_start());
83
84 if let hash_set::Entry::Vacant(vacant_entry) =
86 sent_directories.entry(directory_digest)
87 {
88 if let Err(e) = directory_putter.put(directory).await {
90 return Err(IngestionError::UploadDirectoryError(path, e));
91 }
92 vacant_entry.insert();
94 }
95
96 (
98 path,
99 Node::Directory {
100 digest: directory_digest,
101 size: directory_size,
102 },
103 )
104 }
105 IngestionEntry::Symlink { path, target } => {
106 let target: crate::SymlinkTarget = bytes::Bytes::from(target)
107 .try_into()
108 .map_err(|err| IngestionError::InvalidSymlinkTarget(path.clone(), err))?;
109 (path, Node::Symlink { target })
110 }
111 IngestionEntry::Regular {
112 path,
113 size,
114 executable,
115 digest,
116 } => (
117 path,
118 Node::File {
119 digest,
120 size,
121 executable,
122 },
123 ),
124 };
125
126 let parent = path.parent().expect("Snix bug: got entry with root node");
127
128 if parent == crate::Path::ROOT {
129 break node;
130 } else {
131 let name = path
132 .file_name()
133 .unwrap_or_else(|| "".try_into().unwrap())
135 .to_owned();
136
137 directories
139 .entry_ref(parent)
140 .or_default()
141 .add(name, node)
142 .map_err(|e| IngestionError::UploadDirectoryError(path, Box::new(e)))?;
143 }
144 };
145
146 assert!(
147 entries.count().await == 0,
148 "Snix bug: left over elements in the stream"
149 );
150
151 assert!(
152 directories.is_empty(),
153 "Snix bug: left over directories after processing ingestion stream"
154 );
155
156 if let Some(mut directory_putter) = maybe_directory_putter {
159 #[cfg_attr(not(debug_assertions), allow(unused))]
160 let root_directory_digest = directory_putter
161 .close()
162 .await
163 .map_err(|e| IngestionError::FinalizeDirectoryUpload(e))?;
164
165 #[cfg(debug_assertions)]
166 {
167 if let Node::Directory { digest, .. } = &root_node {
168 debug_assert_eq!(&root_directory_digest, digest);
169 } else {
170 unreachable!("Snix bug: directory putter initialized but no root directory node");
171 }
172 }
173 };
174
175 Ok(root_node)
176}
177
178#[derive(Debug, Clone, Eq, PartialEq)]
179pub enum IngestionEntry {
180 Regular {
181 path: PathBuf,
182 size: u64,
183 executable: bool,
184 digest: B3Digest,
185 },
186 Symlink {
187 path: PathBuf,
188 target: Vec<u8>,
189 },
190 Dir {
191 path: PathBuf,
192 },
193}
194
195impl IngestionEntry {
196 fn path(&self) -> &Path {
197 match self {
198 IngestionEntry::Regular { path, .. } => path,
199 IngestionEntry::Symlink { path, .. } => path,
200 IngestionEntry::Dir { path } => path,
201 }
202 }
203
204 fn is_dir(&self) -> bool {
205 matches!(self, IngestionEntry::Dir { .. })
206 }
207}
208
#[cfg(test)]
mod test {
    use rstest::rstest;

    use super::IngestionEntry;
    use super::ingest_entries;
    use crate::fixtures::{
        DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP, DUMMY_DIGEST, EMPTY_BLOB_DIGEST,
    };
    use crate::import::IngestionError;
    use crate::utils::gen_test_directory_service;
    use crate::{Directory, Node};

    /// Builds the entry for an empty, non-executable regular file at `path`.
    fn empty_file(path: &str) -> IngestionEntry {
        IngestionEntry::Regular {
            path: path.parse().unwrap(),
            size: 0,
            executable: false,
            digest: *EMPTY_BLOB_DIGEST,
        }
    }

    /// Builds the entry for a directory at `path`.
    fn dir(path: &str) -> IngestionEntry {
        IngestionEntry::Dir {
            path: path.parse().unwrap(),
        }
    }

    /// Builds the entry for a symlink at `path` pointing to `target`.
    fn symlink(path: &str, target: &[u8]) -> IngestionEntry {
        IngestionEntry::Symlink {
            path: path.parse().unwrap(),
            target: target.to_vec(),
        }
    }

    /// Feeds `entries` into [`ingest_entries`], backed by a fresh test directory service.
    async fn ingest(entries: Vec<IngestionEntry>) -> Result<Node, IngestionError<std::io::Error>> {
        let stream = futures::stream::iter(entries.into_iter().map(Ok::<_, std::io::Error>));
        ingest_entries(gen_test_directory_service(), stream).await
    }

    /// Well-formed streams produce the expected root node.
    #[rstest]
    #[case::single_file(
        vec![IngestionEntry::Regular {
            path: "foo".parse().unwrap(),
            size: 42,
            executable: true,
            digest: *DUMMY_DIGEST,
        }],
        Node::File { digest: *DUMMY_DIGEST, size: 42, executable: true }
    )]
    #[case::single_symlink(
        vec![symlink("foo", b"blub")],
        Node::Symlink { target: "blub".try_into().unwrap() }
    )]
    #[case::single_dir(
        vec![dir("foo")],
        Node::Directory { digest: Directory::default().digest(), size: Directory::default().size() }
    )]
    #[case::dir_with_keep(
        vec![empty_file("foo/.keep"), dir("foo")],
        Node::Directory { digest: DIRECTORY_WITH_KEEP.digest(), size: DIRECTORY_WITH_KEEP.size() }
    )]
    #[case::directory_complicated(
        vec![
            empty_file("blub/.keep"),
            empty_file("blub/keep/.keep"),
            dir("blub/keep"),
            symlink("blub/aa", b"/nix/store/somewhereelse"),
            dir("blub"),
        ],
        Node::Directory { digest: DIRECTORY_COMPLICATED.digest(), size: DIRECTORY_COMPLICATED.size() }
    )]
    #[tokio::test]
    async fn test_ingestion(#[case] entries: Vec<IngestionEntry>, #[case] exp_root_node: Node) {
        let root_node = ingest(entries).await.expect("must succeed");

        assert_eq!(exp_root_node, root_node, "root node should match");
    }

    /// Streams that stop before an entry directly below the root was seen are reported as an
    /// unexpected end of stream.
    #[rstest]
    #[case::empty_entries(vec![])]
    #[case::missing_intermediate_dir(vec![empty_file("blub/.keep")])]
    #[tokio::test]
    async fn test_end_of_stream(#[case] entries: Vec<IngestionEntry>) {
        let result = ingest(entries).await;

        assert!(matches!(result, Err(IngestionError::UnexpectedEndOfStream)));
    }

    /// Streams violating the ordering contract make the ingestion panic.
    #[rstest]
    #[should_panic]
    #[case::leaf_after_parent(vec![dir("blub"), empty_file("blub/.keep")])]
    #[should_panic]
    #[case::root_in_entry(vec![empty_file(".keep"), dir("")])]
    #[tokio::test]
    async fn test_ingestion_fail(#[case] entries: Vec<IngestionEntry>) {
        let _ = ingest(entries).await;
    }
}