1use crate::directoryservice::{DirectoryPutter, DirectoryService};
8use crate::path::{Path, PathBuf};
9use crate::{B3Digest, Directory, Node};
10use futures::{Stream, StreamExt};
11use tracing::Level;
12
13use hashbrown::HashMap;
14use tracing::instrument;
15
16mod error;
17pub use error::IngestionError;
18
19pub mod archive;
20pub mod blobs;
21pub mod fs;
22
23#[instrument(skip_all, ret(level = Level::TRACE), err)]
41pub async fn ingest_entries<DS, S, E>(
42 directory_service: DS,
43 mut entries: S,
44) -> Result<Node, IngestionError<E>>
45where
46 DS: DirectoryService,
47 S: Stream<Item = Result<IngestionEntry, E>> + Send + std::marker::Unpin,
48 E: std::error::Error,
49{
50 let mut directories: HashMap<PathBuf, Directory> = HashMap::default();
52 let mut maybe_directory_putter: Option<Box<dyn DirectoryPutter>> = None;
53
54 let root_node = loop {
55 let entry = entries
56 .next()
57 .await
58 .ok_or(IngestionError::UnexpectedEndOfStream)??;
61
62 let (path, node) = match entry {
63 IngestionEntry::Dir { path } => {
64 let directory = directories
69 .remove(&path)
70 .unwrap_or_default();
72
73 let directory_size = directory.size();
74 let directory_digest = directory.digest();
75
76 let directory_putter = maybe_directory_putter
80 .get_or_insert_with(|| directory_service.put_multiple_start());
81
82 match directory_putter.put(directory).await {
83 Ok(()) => (
84 path,
85 Node::Directory {
86 digest: directory_digest,
87 size: directory_size,
88 },
89 ),
90 Err(e) => {
91 return Err(IngestionError::UploadDirectoryError(path, e));
92 }
93 }
94 }
95 IngestionEntry::Symlink { path, target } => {
96 match bytes::Bytes::from(target).try_into() {
97 Ok(target) => (path, Node::Symlink { target }),
98 Err(e) => {
99 return Err(IngestionError::UploadDirectoryError(
100 path,
101 crate::Error::StorageError(format!("invalid symlink target: {}", e)),
102 ));
103 }
104 }
105 }
106 IngestionEntry::Regular {
107 path,
108 size,
109 executable,
110 digest,
111 } => (
112 path,
113 Node::File {
114 digest: digest.clone(),
115 size,
116 executable,
117 },
118 ),
119 };
120
121 let parent = path.parent().expect("Snix bug: got entry with root node");
122
123 if parent == crate::Path::ROOT {
124 break node;
125 } else {
126 let name = path
127 .file_name()
128 .unwrap_or_else(|| "".try_into().unwrap())
130 .to_owned();
131
132 directories
134 .entry_ref(parent)
135 .or_default()
136 .add(name, node)
137 .map_err(|e| {
138 IngestionError::UploadDirectoryError(
139 path,
140 crate::Error::StorageError(e.to_string()),
141 )
142 })?;
143 }
144 };
145
146 assert!(
147 entries.count().await == 0,
148 "Snix bug: left over elements in the stream"
149 );
150
151 assert!(
152 directories.is_empty(),
153 "Snix bug: left over directories after processing ingestion stream"
154 );
155
156 if let Some(mut directory_putter) = maybe_directory_putter {
159 #[cfg_attr(not(debug_assertions), allow(unused))]
160 let root_directory_digest = directory_putter
161 .close()
162 .await
163 .map_err(|e| IngestionError::FinalizeDirectoryUpload(e))?;
164
165 #[cfg(debug_assertions)]
166 {
167 if let Node::Directory { digest, .. } = &root_node {
168 debug_assert_eq!(&root_directory_digest, digest);
169 } else {
170 unreachable!("Snix bug: directory putter initialized but no root directory node");
171 }
172 }
173 };
174
175 Ok(root_node)
176}
177
178#[derive(Debug, Clone, Eq, PartialEq)]
179pub enum IngestionEntry {
180 Regular {
181 path: PathBuf,
182 size: u64,
183 executable: bool,
184 digest: B3Digest,
185 },
186 Symlink {
187 path: PathBuf,
188 target: Vec<u8>,
189 },
190 Dir {
191 path: PathBuf,
192 },
193}
194
195impl IngestionEntry {
196 fn path(&self) -> &Path {
197 match self {
198 IngestionEntry::Regular { path, .. } => path,
199 IngestionEntry::Symlink { path, .. } => path,
200 IngestionEntry::Dir { path } => path,
201 }
202 }
203
204 fn is_dir(&self) -> bool {
205 matches!(self, IngestionEntry::Dir { .. })
206 }
207}
208
#[cfg(test)]
mod test {
    use rstest::rstest;

    use super::{IngestionEntry, IngestionError, ingest_entries};
    use crate::directoryservice::MemoryDirectoryService;
    use crate::fixtures::{
        DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP, DUMMY_DIGEST, EMPTY_BLOB_DIGEST,
    };
    use crate::{Directory, Node};

    /// Feeds `entries`, each wrapped in `Ok`, to [`ingest_entries`] together with a freshly
    /// created [`MemoryDirectoryService`] and hands back the unmodified outcome.
    async fn ingest(entries: Vec<IngestionEntry>) -> Result<Node, IngestionError<std::io::Error>> {
        let directory_service = MemoryDirectoryService::default();
        let stream = futures::stream::iter(entries.into_iter().map(Ok::<_, std::io::Error>));

        ingest_entries(directory_service.clone(), stream).await
    }

    /// Well-formed streams must produce the expected root node.
    #[rstest]
    #[case::single_file(
        vec![IngestionEntry::Regular {
            path: "foo".parse().unwrap(),
            size: 42,
            executable: true,
            digest: DUMMY_DIGEST.clone(),
        }],
        Node::File { digest: DUMMY_DIGEST.clone(), size: 42, executable: true }
    )]
    #[case::single_symlink(
        vec![IngestionEntry::Symlink {
            path: "foo".parse().unwrap(),
            target: b"blub".into(),
        }],
        Node::Symlink { target: "blub".try_into().unwrap() }
    )]
    #[case::single_dir(
        vec![IngestionEntry::Dir {
            path: "foo".parse().unwrap(),
        }],
        Node::Directory { digest: Directory::default().digest(), size: Directory::default().size() }
    )]
    #[case::dir_with_keep(
        vec![
            IngestionEntry::Regular {
                path: "foo/.keep".parse().unwrap(),
                size: 0,
                executable: false,
                digest: EMPTY_BLOB_DIGEST.clone(),
            },
            IngestionEntry::Dir {
                path: "foo".parse().unwrap(),
            },
        ],
        Node::Directory { digest: DIRECTORY_WITH_KEEP.digest(), size: DIRECTORY_WITH_KEEP.size() }
    )]
    // Siblings are not listed in sorted order here, but every child still precedes the
    // directory that contains it.
    #[case::directory_complicated(
        vec![
            IngestionEntry::Regular {
                path: "blub/.keep".parse().unwrap(),
                size: 0,
                executable: false,
                digest: EMPTY_BLOB_DIGEST.clone(),
            },
            IngestionEntry::Regular {
                path: "blub/keep/.keep".parse().unwrap(),
                size: 0,
                executable: false,
                digest: EMPTY_BLOB_DIGEST.clone(),
            },
            IngestionEntry::Dir {
                path: "blub/keep".parse().unwrap(),
            },
            IngestionEntry::Symlink {
                path: "blub/aa".parse().unwrap(),
                target: b"/nix/store/somewhereelse".into(),
            },
            IngestionEntry::Dir {
                path: "blub".parse().unwrap(),
            },
        ],
        Node::Directory { digest: DIRECTORY_COMPLICATED.digest(), size: DIRECTORY_COMPLICATED.size() }
    )]
    #[tokio::test]
    async fn test_ingestion(#[case] entries: Vec<IngestionEntry>, #[case] exp_root_node: Node) {
        let root_node = ingest(entries).await.expect("must succeed");

        assert_eq!(exp_root_node, root_node, "root node should match");
    }

    /// Streams that end before a root entry was seen must report `UnexpectedEndOfStream`.
    #[rstest]
    #[case::empty_entries(vec![])]
    #[case::missing_intermediate_dir(vec![
        IngestionEntry::Regular {
            path: "blub/.keep".parse().unwrap(),
            size: 0,
            executable: false,
            digest: EMPTY_BLOB_DIGEST.clone(),
        },
    ])]
    #[tokio::test]
    async fn test_end_of_stream(#[case] entries: Vec<IngestionEntry>) {
        let result = ingest(entries).await;

        assert!(matches!(result, Err(IngestionError::UnexpectedEndOfStream)));
    }

    /// Streams that break the ordering or root-path invariants are expected to panic.
    #[rstest]
    #[should_panic]
    #[case::leaf_after_parent(vec![
        IngestionEntry::Dir {
            path: "blub".parse().unwrap(),
        },
        IngestionEntry::Regular {
            path: "blub/.keep".parse().unwrap(),
            size: 0,
            executable: false,
            digest: EMPTY_BLOB_DIGEST.clone(),
        },
    ])]
    #[should_panic]
    #[case::root_in_entry(vec![
        IngestionEntry::Regular {
            path: ".keep".parse().unwrap(),
            size: 0,
            executable: false,
            digest: EMPTY_BLOB_DIGEST.clone(),
        },
        IngestionEntry::Dir {
            path: "".parse().unwrap(),
        },
    ])]
    #[tokio::test]
    async fn test_ingestion_fail(#[case] entries: Vec<IngestionEntry>) {
        // Only the panic matters; whatever result comes back is discarded.
        let _ = ingest(entries).await;
    }
}