1use crate::directoryservice::{DirectoryPutter, DirectoryService};
8use crate::path::{Path, PathBuf};
9use crate::{B3Digest, Directory, Node};
10use futures::{Stream, StreamExt};
11use tracing::Level;
12
13use hashbrown::HashMap;
14use tracing::instrument;
15
16mod error;
17pub use error::IngestionError;
18
19pub mod archive;
20pub mod blobs;
21pub mod fs;
22
23#[instrument(skip_all, ret(level = Level::TRACE), err)]
41pub async fn ingest_entries<DS, S, E>(
42 directory_service: DS,
43 mut entries: S,
44) -> Result<Node, IngestionError<E>>
45where
46 DS: DirectoryService,
47 S: Stream<Item = Result<IngestionEntry, E>> + Send + std::marker::Unpin,
48 E: std::error::Error,
49{
50 let mut directories: HashMap<PathBuf, Directory> = HashMap::default();
52 let mut maybe_directory_putter: Option<Box<dyn DirectoryPutter>> = None;
53
54 let root_node = loop {
55 let entry = entries
56 .next()
57 .await
58 .ok_or(IngestionError::UnexpectedEndOfStream)??;
61
62 let (path, node) = match entry {
63 IngestionEntry::Dir { path } => {
64 let directory = directories
69 .remove(&path)
70 .unwrap_or_default();
72
73 let directory_size = directory.size();
74 let directory_digest = directory.digest();
75
76 let directory_putter = maybe_directory_putter
80 .get_or_insert_with(|| directory_service.put_multiple_start());
81
82 match directory_putter.put(directory).await {
83 Ok(()) => (
84 path,
85 Node::Directory {
86 digest: directory_digest,
87 size: directory_size,
88 },
89 ),
90 Err(e) => {
91 return Err(IngestionError::UploadDirectoryError(path, e));
92 }
93 }
94 }
95 IngestionEntry::Symlink { path, target } => {
96 let target: crate::SymlinkTarget = bytes::Bytes::from(target)
97 .try_into()
98 .map_err(|err| IngestionError::InvalidSymlinkTarget(path.clone(), err))?;
99 (path, Node::Symlink { target })
100 }
101 IngestionEntry::Regular {
102 path,
103 size,
104 executable,
105 digest,
106 } => (
107 path,
108 Node::File {
109 digest,
110 size,
111 executable,
112 },
113 ),
114 };
115
116 let parent = path.parent().expect("Snix bug: got entry with root node");
117
118 if parent == crate::Path::ROOT {
119 break node;
120 } else {
121 let name = path
122 .file_name()
123 .unwrap_or_else(|| "".try_into().unwrap())
125 .to_owned();
126
127 directories
129 .entry_ref(parent)
130 .or_default()
131 .add(name, node)
132 .map_err(|e| IngestionError::UploadDirectoryError(path, Box::new(e)))?;
133 }
134 };
135
136 assert!(
137 entries.count().await == 0,
138 "Snix bug: left over elements in the stream"
139 );
140
141 assert!(
142 directories.is_empty(),
143 "Snix bug: left over directories after processing ingestion stream"
144 );
145
146 if let Some(mut directory_putter) = maybe_directory_putter {
149 #[cfg_attr(not(debug_assertions), allow(unused))]
150 let root_directory_digest = directory_putter
151 .close()
152 .await
153 .map_err(|e| IngestionError::FinalizeDirectoryUpload(e))?;
154
155 #[cfg(debug_assertions)]
156 {
157 if let Node::Directory { digest, .. } = &root_node {
158 debug_assert_eq!(&root_directory_digest, digest);
159 } else {
160 unreachable!("Snix bug: directory putter initialized but no root directory node");
161 }
162 }
163 };
164
165 Ok(root_node)
166}
167
168#[derive(Debug, Clone, Eq, PartialEq)]
169pub enum IngestionEntry {
170 Regular {
171 path: PathBuf,
172 size: u64,
173 executable: bool,
174 digest: B3Digest,
175 },
176 Symlink {
177 path: PathBuf,
178 target: Vec<u8>,
179 },
180 Dir {
181 path: PathBuf,
182 },
183}
184
185impl IngestionEntry {
186 fn path(&self) -> &Path {
187 match self {
188 IngestionEntry::Regular { path, .. } => path,
189 IngestionEntry::Symlink { path, .. } => path,
190 IngestionEntry::Dir { path } => path,
191 }
192 }
193
194 fn is_dir(&self) -> bool {
195 matches!(self, IngestionEntry::Dir { .. })
196 }
197}
198
#[cfg(test)]
mod test {
    use rstest::rstest;

    use crate::fixtures::DUMMY_DIGEST;
    use crate::fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP, EMPTY_BLOB_DIGEST};
    use crate::utils::gen_test_directory_service;
    use crate::{Directory, Node};

    use super::IngestionEntry;
    use super::ingest_entries;

    #[rstest]
    #[case::single_file(vec![IngestionEntry::Regular {
        path: "foo".parse().unwrap(),
        size: 42,
        executable: true,
        digest: *DUMMY_DIGEST,
    }],
        Node::File{digest: *DUMMY_DIGEST, size: 42, executable: true}
    )]
    #[case::single_symlink(vec![IngestionEntry::Symlink {
        path: "foo".parse().unwrap(),
        target: b"blub".into(),
    }],
        Node::Symlink{target: "blub".try_into().unwrap()}
    )]
    #[case::single_dir(vec![IngestionEntry::Dir {
        path: "foo".parse().unwrap(),
    }],
        Node::Directory{digest: Directory::default().digest(), size: Directory::default().size()}
    )]
    #[case::dir_with_keep(vec![
        IngestionEntry::Regular {
            path: "foo/.keep".parse().unwrap(),
            size: 0,
            executable: false,
            digest: *EMPTY_BLOB_DIGEST,
        },
        IngestionEntry::Dir {
            path: "foo".parse().unwrap(),
        },
    ],
        Node::Directory{ digest: DIRECTORY_WITH_KEEP.digest(), size: DIRECTORY_WITH_KEEP.size()}
    )]
    #[case::directory_complicated(vec![
        IngestionEntry::Regular {
            path: "blub/.keep".parse().unwrap(),
            size: 0,
            executable: false,
            digest: *EMPTY_BLOB_DIGEST,
        },
        IngestionEntry::Regular {
            path: "blub/keep/.keep".parse().unwrap(),
            size: 0,
            executable: false,
            digest: *EMPTY_BLOB_DIGEST,
        },
        IngestionEntry::Dir {
            path: "blub/keep".parse().unwrap(),
        },
        IngestionEntry::Symlink {
            path: "blub/aa".parse().unwrap(),
            target: b"/nix/store/somewhereelse".into(),
        },
        IngestionEntry::Dir {
            path: "blub".parse().unwrap(),
        },
    ],
        Node::Directory{ digest: DIRECTORY_COMPLICATED.digest(), size: DIRECTORY_COMPLICATED.size() }
    )]
    #[tokio::test]
    async fn test_ingestion(#[case] entries: Vec<IngestionEntry>, #[case] exp_root_node: Node) {
        let root_node = ingest_entries(
            gen_test_directory_service(),
            futures::stream::iter(entries.into_iter().map(Ok::<_, std::io::Error>)),
        )
        .await
        .expect("must succeed");

        assert_eq!(exp_root_node, root_node, "root node should match");
    }

    #[rstest]
    #[case::empty_entries(vec![])]
    #[case::missing_intermediate_dir(vec![
        IngestionEntry::Regular {
            path: "blub/.keep".parse().unwrap(),
            size: 0,
            executable: false,
            digest: *EMPTY_BLOB_DIGEST,
        },
    ])]
    #[tokio::test]
    async fn test_end_of_stream(#[case] entries: Vec<IngestionEntry>) {
        use crate::import::IngestionError;

        let result = ingest_entries(
            gen_test_directory_service(),
            futures::stream::iter(entries.into_iter().map(Ok::<_, std::io::Error>)),
        )
        .await;
        assert!(matches!(result, Err(IngestionError::UnexpectedEndOfStream)));
    }

    // Each failing case pins the panic message it must hit, so an unrelated
    // panic (e.g. in fixture construction) cannot make the test pass.
    #[rstest]
    // `blub` is already a top-level entry, so ingestion stops there and the
    // trailing child is reported as a left-over stream element.
    #[should_panic(expected = "Snix bug: left over elements in the stream")]
    #[case::leaf_after_parent(vec![
        IngestionEntry::Dir {
            path: "blub".parse().unwrap(),
        },
        IngestionEntry::Regular {
            path: "blub/.keep".parse().unwrap(),
            size: 0,
            executable: false,
            digest: *EMPTY_BLOB_DIGEST,
        },
    ])]
    // `.keep` is already a top-level entry, so ingestion stops before the
    // root-path `Dir` entry is reached; the left-over check fires instead.
    #[should_panic(expected = "Snix bug: left over elements in the stream")]
    #[case::root_in_entry(vec![
        IngestionEntry::Regular {
            path: ".keep".parse().unwrap(),
            size: 0,
            executable: false,
            digest: *EMPTY_BLOB_DIGEST,
        },
        IngestionEntry::Dir {
            path: "".parse().unwrap(),
        },
    ])]
    // An entry carrying the root path itself has no parent.
    #[should_panic(expected = "Snix bug: got entry with root node")]
    #[case::root_entry_only(vec![
        IngestionEntry::Dir {
            path: "".parse().unwrap(),
        },
    ])]
    #[tokio::test]
    async fn test_ingestion_fail(#[case] entries: Vec<IngestionEntry>) {
        let _ = ingest_entries(
            gen_test_directory_service(),
            futures::stream::iter(entries.into_iter().map(Ok::<_, std::io::Error>)),
        )
        .await;
    }
}