1use std::collections::HashMap;
4
5use petgraph::Direction;
6use petgraph::graph::{DiGraph, NodeIndex};
7use petgraph::visit::{DfsPostOrder, EdgeRef};
8use tokio::io::AsyncRead;
9use tokio_stream::StreamExt;
10use tokio_tar::Archive;
11use tracing::{Level, instrument, warn};
12
13use crate::Node;
14use crate::blobservice::BlobService;
15use crate::directoryservice::DirectoryService;
16use crate::import::{IngestionEntry, IngestionError, ingest_entries};
17
18use super::blobs::{self, ConcurrentBlobUploader};
19
/// Path of an entry as read from the tar archive, before it is converted into a
/// `crate::path::PathBuf`. Used by the [`Error`] variants to name the entry
/// that caused a failure.
type TarPathBuf = std::path::PathBuf;
21
/// Errors raised while reading a tar archive and turning its entries into
/// ingestion entries.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// The stream of archive entries could not be created.
    #[error("unable to construct stream of entries: {0}")]
    Entries(std::io::Error),

    /// Advancing the entry stream to the next entry failed.
    #[error("unable to read next entry: {0}")]
    NextEntry(std::io::Error),

    /// The path of an entry could not be read from the archive.
    #[error("unable to read path for entry: {0}")]
    PathRead(std::io::Error),

    /// The tar path of an entry could not be converted into a
    /// `crate::path::PathBuf`.
    #[error("unable to convert path {0} for entry: {1}")]
    PathConvert(TarPathBuf, std::io::Error),

    /// The size field in the header of the named entry could not be read.
    #[error("unable to read size field for {0}: {1}")]
    Size(TarPathBuf, std::io::Error),

    /// The mode field in the header of the named entry could not be read.
    #[error("unable to read mode field for {0}: {1}")]
    Mode(TarPathBuf, std::io::Error),

    /// The link name of the named symlink entry could not be read.
    #[error("unable to read link name field for {0}: {1}")]
    LinkName(TarPathBuf, std::io::Error),

    /// The named entry has a type that `ingest_archive` neither ingests nor
    /// skips.
    #[error("unsupported tar entry {0} type: {1:?}")]
    EntryType(TarPathBuf, tokio_tar::EntryType),

    /// The named symlink entry does not carry a link name.
    #[error("symlink missing target {0}")]
    MissingSymlinkTarget(TarPathBuf),

    /// The archive does not consist of exactly one top-level directory: either
    /// there is no top-level entry, there are several with different names, or
    /// the top-level entry is not a directory.
    #[error("unexpected number of top level directory entries")]
    UnexpectedNumberOfTopLevelEntries,

    /// An error reported while uploading blob contents; see [`blobs::Error`].
    #[error(transparent)]
    BlobUploadError(#[from] blobs::Error),
}
57
58#[instrument(skip_all, ret(level = Level::TRACE), err)]
61pub async fn ingest_archive<BS, DS, R>(
62 blob_service: BS,
63 directory_service: DS,
64 mut archive: Archive<R>,
65) -> Result<Node, IngestionError<Error>>
66where
67 BS: BlobService + Clone + 'static,
68 DS: DirectoryService,
69 R: AsyncRead + Unpin,
70{
71 let mut nodes = IngestionEntryGraph::new();
77
78 let mut blob_uploader = ConcurrentBlobUploader::new(blob_service);
79
80 let mut entries_iter = archive.entries().map_err(Error::Entries)?;
81 while let Some(mut entry) = entries_iter.try_next().await.map_err(Error::NextEntry)? {
82 let tar_path: TarPathBuf = entry.path().map_err(Error::PathRead)?.into();
83
84 let path = crate::path::PathBuf::from_host_path(tar_path.as_path(), true)
86 .map_err(|e| Error::PathConvert(tar_path.clone(), e))?;
87
88 let header = entry.header();
89 let entry = match header.entry_type() {
90 tokio_tar::EntryType::Regular
91 | tokio_tar::EntryType::GNUSparse
92 | tokio_tar::EntryType::Continuous => {
93 let size = header
94 .size()
95 .map_err(|e| Error::Size(tar_path.clone(), e))?;
96
97 let digest = blob_uploader
98 .upload(&path, size, &mut entry)
99 .await
100 .map_err(Error::BlobUploadError)?;
101
102 let executable = entry
103 .header()
104 .mode()
105 .map_err(|e| Error::Mode(tar_path, e))?
106 & 64
107 != 0;
108
109 IngestionEntry::Regular {
110 path,
111 size,
112 executable,
113 digest,
114 }
115 }
116 tokio_tar::EntryType::Symlink => IngestionEntry::Symlink {
117 target: entry
118 .link_name()
119 .map_err(|e| Error::LinkName(tar_path.clone(), e))?
120 .ok_or_else(|| Error::MissingSymlinkTarget(tar_path.clone()))?
121 .into_owned()
122 .into_os_string()
123 .into_encoded_bytes(),
124 path,
125 },
126 tokio_tar::EntryType::Directory => IngestionEntry::Dir { path },
130
131 tokio_tar::EntryType::XGlobalHeader | tokio_tar::EntryType::XHeader => continue,
132
133 entry_type => return Err(Error::EntryType(tar_path, entry_type).into()),
134 };
135
136 nodes.add(entry)?;
137 }
138
139 blob_uploader.join().await.map_err(Error::BlobUploadError)?;
140
141 let root_node = ingest_entries(
142 directory_service,
143 futures::stream::iter(nodes.finalize()?.into_iter().map(Ok)),
144 )
145 .await?;
146
147 Ok(root_node)
148}
149
/// Collects [`IngestionEntry`] values in a directed graph keyed by path, so
/// that they can later be emitted with every entry preceding the directory
/// that contains it.
struct IngestionEntryGraph {
    /// Entries as node weights; an edge points from a directory to an entry
    /// directly inside it.
    graph: DiGraph<IngestionEntry, ()>,
    /// Maps the path of every entry added so far to its node in `graph`.
    path_to_index: HashMap<crate::path::PathBuf, NodeIndex>,
    /// Node of the single top-level entry, once one has been added.
    root_node: Option<NodeIndex>,
}
169
impl Default for IngestionEntryGraph {
    /// Returns an empty graph; equivalent to [`IngestionEntryGraph::new`].
    fn default() -> Self {
        Self::new()
    }
}
175
176impl IngestionEntryGraph {
177 pub fn new() -> Self {
179 IngestionEntryGraph {
180 graph: DiGraph::new(),
181 path_to_index: HashMap::new(),
182 root_node: None,
183 }
184 }
185
186 pub fn add(&mut self, entry: IngestionEntry) -> Result<NodeIndex, Error> {
191 let path = entry.path().to_owned();
192
193 let index = match self.path_to_index.get(entry.path()) {
194 Some(&index) => {
195 if !entry.is_dir() || !self.get_node(index).is_dir() {
198 self.replace_node(index, entry);
199 }
200
201 index
202 }
203 None => self.graph.add_node(entry),
204 };
205
206 if path.components().count() == 1 {
208 if let Some(root_node) = self.root_node {
211 if self.get_node(root_node).path() != path.as_ref() {
212 return Err(Error::UnexpectedNumberOfTopLevelEntries);
213 }
214 }
215
216 self.root_node = Some(index)
217 } else if let Some(parent_path) = path.parent() {
218 let parent_index = self.add(IngestionEntry::Dir {
220 path: parent_path.to_owned(),
221 })?;
222
223 self.graph.add_edge(parent_index, index, ());
225 }
226
227 self.path_to_index.insert(path, index);
228
229 Ok(index)
230 }
231
232 pub fn finalize(self) -> Result<Vec<IngestionEntry>, Error> {
236 let Some(root_node_index) = self.root_node else {
238 return Err(Error::UnexpectedNumberOfTopLevelEntries);
239 };
240
241 if !self.get_node(root_node_index).is_dir() {
243 return Err(Error::UnexpectedNumberOfTopLevelEntries);
244 }
245
246 let mut traversal = DfsPostOrder::new(&self.graph, root_node_index);
247 let mut nodes = Vec::with_capacity(self.graph.node_count());
248 while let Some(node_index) = traversal.next(&self.graph) {
249 nodes.push(self.get_node(node_index).clone());
250 }
251
252 Ok(nodes)
253 }
254
255 fn replace_node(&mut self, index: NodeIndex, new_entry: IngestionEntry) {
259 let entry = self
260 .graph
261 .node_weight_mut(index)
262 .expect("Snix bug: missing node entry");
263
264 debug_assert!(!(entry.is_dir() && new_entry.is_dir()));
265
266 warn!(
268 "saw duplicate entry in archive at path {:?}. old: {:?} new: {:?}",
269 entry.path(),
270 &entry,
271 &new_entry
272 );
273 *entry = new_entry;
274
275 let edges = self
277 .graph
278 .edges_directed(index, Direction::Outgoing)
279 .map(|edge| edge.id())
280 .collect::<Vec<_>>();
281 for edge in edges {
282 self.graph.remove_edge(edge);
283 }
284 }
285
286 fn get_node(&self, index: NodeIndex) -> &IngestionEntry {
287 self.graph
288 .node_weight(index)
289 .expect("Snix bug: missing node entry")
290 }
291}
292
293#[cfg(test)]
294mod test {
295 use std::sync::LazyLock;
296
297 use super::{Error, IngestionEntryGraph};
298 use crate::B3Digest;
299 use crate::import::IngestionEntry;
300
301 use rstest::rstest;
302
303 pub static EMPTY_DIGEST: LazyLock<B3Digest> =
304 LazyLock::new(|| blake3::hash(&[]).as_bytes().into());
305 pub static DIR_A: LazyLock<IngestionEntry> = LazyLock::new(|| IngestionEntry::Dir {
306 path: "a".parse().unwrap(),
307 });
308 pub static DIR_B: LazyLock<IngestionEntry> = LazyLock::new(|| IngestionEntry::Dir {
309 path: "b".parse().unwrap(),
310 });
311 pub static DIR_A_B: LazyLock<IngestionEntry> = LazyLock::new(|| IngestionEntry::Dir {
312 path: "a/b".parse().unwrap(),
313 });
314 pub static FILE_A: LazyLock<IngestionEntry> = LazyLock::new(|| IngestionEntry::Regular {
315 path: "a".parse().unwrap(),
316 size: 0,
317 executable: false,
318 digest: EMPTY_DIGEST.clone(),
319 });
320 pub static FILE_A_B: LazyLock<IngestionEntry> = LazyLock::new(|| IngestionEntry::Regular {
321 path: "a/b".parse().unwrap(),
322 size: 0,
323 executable: false,
324 digest: EMPTY_DIGEST.clone(),
325 });
326 pub static FILE_A_B_C: LazyLock<IngestionEntry> = LazyLock::new(|| IngestionEntry::Regular {
327 path: "a/b/c".parse().unwrap(),
328 size: 0,
329 executable: false,
330 digest: EMPTY_DIGEST.clone(),
331 });
332
333 #[rstest]
334 #[case::implicit_directories(&[&*FILE_A_B_C], &[&*FILE_A_B_C, &*DIR_A_B, &*DIR_A])]
335 #[case::explicit_directories(&[&*DIR_A, &*DIR_A_B, &*FILE_A_B_C], &[&*FILE_A_B_C, &*DIR_A_B, &*DIR_A])]
336 #[case::inaccesible_tree(&[&*DIR_A, &*DIR_A_B, &*FILE_A_B], &[&*FILE_A_B, &*DIR_A])]
337 fn node_ingestion_success(
338 #[case] in_entries: &[&IngestionEntry],
339 #[case] exp_entries: &[&IngestionEntry],
340 ) {
341 let mut nodes = IngestionEntryGraph::new();
342
343 for entry in in_entries {
344 nodes.add((*entry).clone()).expect("failed to add entry");
345 }
346
347 let entries = nodes.finalize().expect("invalid entries");
348
349 let exp_entries: Vec<IngestionEntry> =
350 exp_entries.iter().map(|entry| (*entry).clone()).collect();
351
352 assert_eq!(entries, exp_entries);
353 }
354
355 #[rstest]
356 #[case::no_top_level_entries(&[], Error::UnexpectedNumberOfTopLevelEntries)]
357 #[case::multiple_top_level_dirs(&[&*DIR_A, &*DIR_B], Error::UnexpectedNumberOfTopLevelEntries)]
358 #[case::top_level_file_entry(&[&*FILE_A], Error::UnexpectedNumberOfTopLevelEntries)]
359 fn node_ingestion_error(#[case] in_entries: &[&IngestionEntry], #[case] exp_error: Error) {
360 let mut nodes = IngestionEntryGraph::new();
361
362 let result = (|| {
363 for entry in in_entries {
364 nodes.add((*entry).clone())?;
365 }
366 nodes.finalize()
367 })();
368
369 let error = result.expect_err("expected error");
370 assert_eq!(error.to_string(), exp_error.to_string());
371 }
372}