// snix_castore/import/archive.rs

1//! Imports from an archive (tarballs)
2
3use std::collections::HashMap;
4
5use petgraph::Direction;
6use petgraph::graph::{DiGraph, NodeIndex};
7use petgraph::visit::{DfsPostOrder, EdgeRef};
8use tokio::io::AsyncRead;
9use tokio_stream::StreamExt;
10use tokio_tar::Archive;
11use tracing::{Level, instrument, warn};
12
13use crate::Node;
14use crate::blobservice::BlobService;
15use crate::directoryservice::DirectoryService;
16use crate::import::{IngestionEntry, IngestionError, ingest_entries};
17
18use super::blobs::{self, ConcurrentBlobUploader};
19
/// Path type as read out of tar entries; aliased to distinguish tar-side
/// host paths from the castore's own [`crate::path::PathBuf`].
type TarPathBuf = std::path::PathBuf;
21
/// Errors that can occur while reading a tar archive and ingesting its
/// entries into the castore.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// The archive reader failed while constructing the stream of entries.
    #[error("unable to construct stream of entries: {0}")]
    Entries(std::io::Error),

    /// Reading the next entry from the archive failed.
    #[error("unable to read next entry: {0}")]
    NextEntry(std::io::Error),

    /// The path field of an entry could not be read.
    #[error("unable to read path for entry: {0}")]
    PathRead(std::io::Error),

    /// The tar path could not be converted into a castore path.
    #[error("unable to convert path {0} for entry: {1}")]
    PathConvert(TarPathBuf, std::io::Error),

    /// The size field of the entry header could not be read.
    #[error("unable to read size field for {0}: {1}")]
    Size(TarPathBuf, std::io::Error),

    /// The mode field of the entry header could not be read.
    #[error("unable to read mode field for {0}: {1}")]
    Mode(TarPathBuf, std::io::Error),

    /// The link name field of a symlink entry could not be read.
    #[error("unable to read link name field for {0}: {1}")]
    LinkName(TarPathBuf, std::io::Error),

    /// The archive contained an entry type we do not support.
    #[error("unsupported tar entry {0} type: {1:?}")]
    EntryType(TarPathBuf, tokio_tar::EntryType),

    /// A symlink entry had no target set.
    #[error("symlink missing target {0}")]
    MissingSymlinkTarget(TarPathBuf),

    /// The archive did not contain exactly one top-level directory entry.
    #[error("unexpected number of top level directory entries")]
    UnexpectedNumberOfTopLevelEntries,

    /// An error bubbled up from the concurrent blob uploader.
    #[error(transparent)]
    BlobUploadError(#[from] blobs::Error),
}
57
/// Ingests elements from the given tar [`Archive`] into the passed [`BlobService`] and
/// [`DirectoryService`], returning the root [`Node`] of the ingested tree.
///
/// File contents are uploaded to the blob service while the archive is being
/// read (via [`ConcurrentBlobUploader`]); the directory structure is buffered
/// in an `IngestionEntryGraph` and only handed to the directory service once
/// the whole archive has been consumed, since tar entries may arrive in any
/// order.
#[instrument(skip_all, ret(level = Level::TRACE), err)]
pub async fn ingest_archive<BS, DS, R>(
    blob_service: BS,
    directory_service: DS,
    mut archive: Archive<R>,
) -> Result<Node, IngestionError<Error>>
where
    BS: BlobService + Clone + 'static,
    DS: DirectoryService,
    R: AsyncRead + Unpin,
{
    // Since tarballs can have entries in any arbitrary order, we need to
    // buffer all of the directory metadata so we can reorder directory
    // contents and entries to meet the requirements of the castore.

    // In the first phase, collect up all the regular files and symlinks.
    let mut nodes = IngestionEntryGraph::new();

    let mut blob_uploader = ConcurrentBlobUploader::new(blob_service);

    let mut entries_iter = archive.entries().map_err(Error::Entries)?;
    while let Some(mut entry) = entries_iter.try_next().await.map_err(Error::NextEntry)? {
        let tar_path: TarPathBuf = entry.path().map_err(Error::PathRead)?.into();

        // construct a castore PathBuf, which we use in the produced IngestionEntry.
        let path = crate::path::PathBuf::from_host_path(tar_path.as_path(), true)
            .map_err(|e| Error::PathConvert(tar_path.clone(), e))?;

        let header = entry.header();
        let entry = match header.entry_type() {
            // All three of these entry types carry regular file contents.
            tokio_tar::EntryType::Regular
            | tokio_tar::EntryType::GNUSparse
            | tokio_tar::EntryType::Continuous => {
                let size = header
                    .size()
                    .map_err(|e| Error::Size(tar_path.clone(), e))?;

                let digest = blob_uploader
                    .upload(&path, size, &mut entry)
                    .await
                    .map_err(Error::BlobUploadError)?;

                // 64 == 0o100 is the owner-execute bit of the Unix mode
                // field; any entry with it set is ingested as executable.
                let executable = entry
                    .header()
                    .mode()
                    .map_err(|e| Error::Mode(tar_path, e))?
                    & 64
                    != 0;

                IngestionEntry::Regular {
                    path,
                    size,
                    executable,
                    digest,
                }
            }
            tokio_tar::EntryType::Symlink => IngestionEntry::Symlink {
                target: entry
                    .link_name()
                    .map_err(|e| Error::LinkName(tar_path.clone(), e))?
                    .ok_or_else(|| Error::MissingSymlinkTarget(tar_path.clone()))?
                    .into_owned()
                    .into_os_string()
                    .into_encoded_bytes(),
                path,
            },
            // Push a bogus directory marker so we can make sure this directory gets
            // created. We don't know the digest and size until after reading the full
            // tarball.
            tokio_tar::EntryType::Directory => IngestionEntry::Dir { path },

            // Extended (pax) header entries only carry metadata; skip them.
            tokio_tar::EntryType::XGlobalHeader | tokio_tar::EntryType::XHeader => continue,

            entry_type => return Err(Error::EntryType(tar_path, entry_type).into()),
        };

        nodes.add(entry)?;
    }

    // Make sure all in-flight blob uploads finished before we emit directories
    // that reference their digests.
    blob_uploader.join().await.map_err(Error::BlobUploadError)?;

    let root_node = ingest_entries(
        directory_service,
        futures::stream::iter(nodes.finalize()?.into_iter().map(Ok)),
    )
    .await?;

    Ok(root_node)
}
149
/// Keep track of the directory structure of a file tree being ingested. This is used
/// for ingestion sources which do not provide any ordering or uniqueness guarantees
/// like tarballs.
///
/// If we ingest multiple entries with the same path and the two entries are not
/// both directories, the newer entry will replace the existing entry, disconnecting
/// the old node's children from the graph.
///
/// Once all nodes are ingested a call to [IngestionEntryGraph::finalize] will return
/// a list of entries computed by performing a DFS post order traversal of the graph
/// from the top-level directory entry.
///
/// This expects the directory structure to contain a single top-level directory entry.
/// An error is returned if this is not the case and ingestion will fail.
struct IngestionEntryGraph {
    // Directed graph whose nodes are ingestion entries; edges point from a
    // directory to its direct children.
    graph: DiGraph<IngestionEntry, ()>,
    // Lookup from an entry's path to its node index in `graph`.
    path_to_index: HashMap<crate::path::PathBuf, NodeIndex>,
    // Index of the single top-level entry, once one has been added.
    root_node: Option<NodeIndex>,
}
169
170impl Default for IngestionEntryGraph {
171    fn default() -> Self {
172        Self::new()
173    }
174}
175
176impl IngestionEntryGraph {
177    /// Creates a new ingestion entry graph.
178    pub fn new() -> Self {
179        IngestionEntryGraph {
180            graph: DiGraph::new(),
181            path_to_index: HashMap::new(),
182            root_node: None,
183        }
184    }
185
186    /// Adds a new entry to the graph. Parent directories are automatically inserted.
187    /// If a node exists in the graph with the same name as the new entry and both the old
188    /// and new nodes are not directories, the node is replaced and is disconnected from its
189    /// children.
190    pub fn add(&mut self, entry: IngestionEntry) -> Result<NodeIndex, Error> {
191        let path = entry.path().to_owned();
192
193        let index = match self.path_to_index.get(entry.path()) {
194            Some(&index) => {
195                // If either the old entry or new entry are not directories, we'll replace the old
196                // entry.
197                if !entry.is_dir() || !self.get_node(index).is_dir() {
198                    self.replace_node(index, entry);
199                }
200
201                index
202            }
203            None => self.graph.add_node(entry),
204        };
205
206        // for archives, a path with 1 component is the root node
207        if path.components().count() == 1 {
208            // We expect archives to contain a single root node, if there is another root node
209            // entry with a different path name, this is unsupported.
210            if let Some(root_node) = self.root_node {
211                if self.get_node(root_node).path() != path.as_ref() {
212                    return Err(Error::UnexpectedNumberOfTopLevelEntries);
213                }
214            }
215
216            self.root_node = Some(index)
217        } else if let Some(parent_path) = path.parent() {
218            // Recursively add the parent node until it hits the root node.
219            let parent_index = self.add(IngestionEntry::Dir {
220                path: parent_path.to_owned(),
221            })?;
222
223            // Insert an edge from the parent directory to the child entry.
224            self.graph.add_edge(parent_index, index, ());
225        }
226
227        self.path_to_index.insert(path, index);
228
229        Ok(index)
230    }
231
232    /// Traverses the graph in DFS post order and collects the entries into a [`Vec<IngestionEntry>`].
233    ///
234    /// Unreachable parts of the graph are not included in the result.
235    pub fn finalize(self) -> Result<Vec<IngestionEntry>, Error> {
236        // There must be a root node.
237        let Some(root_node_index) = self.root_node else {
238            return Err(Error::UnexpectedNumberOfTopLevelEntries);
239        };
240
241        // The root node must be a directory.
242        if !self.get_node(root_node_index).is_dir() {
243            return Err(Error::UnexpectedNumberOfTopLevelEntries);
244        }
245
246        let mut traversal = DfsPostOrder::new(&self.graph, root_node_index);
247        let mut nodes = Vec::with_capacity(self.graph.node_count());
248        while let Some(node_index) = traversal.next(&self.graph) {
249            nodes.push(self.get_node(node_index).clone());
250        }
251
252        Ok(nodes)
253    }
254
255    /// Replaces the node with the specified entry. The node's children are disconnected.
256    ///
257    /// This should never be called if both the old and new nodes are directories.
258    fn replace_node(&mut self, index: NodeIndex, new_entry: IngestionEntry) {
259        let entry = self
260            .graph
261            .node_weight_mut(index)
262            .expect("Snix bug: missing node entry");
263
264        debug_assert!(!(entry.is_dir() && new_entry.is_dir()));
265
266        // Replace the node itself.
267        warn!(
268            "saw duplicate entry in archive at path {:?}. old: {:?} new: {:?}",
269            entry.path(),
270            &entry,
271            &new_entry
272        );
273        *entry = new_entry;
274
275        // Remove any outgoing edges to disconnect the old node's children.
276        let edges = self
277            .graph
278            .edges_directed(index, Direction::Outgoing)
279            .map(|edge| edge.id())
280            .collect::<Vec<_>>();
281        for edge in edges {
282            self.graph.remove_edge(edge);
283        }
284    }
285
286    fn get_node(&self, index: NodeIndex) -> &IngestionEntry {
287        self.graph
288            .node_weight(index)
289            .expect("Snix bug: missing node entry")
290    }
291}
292
#[cfg(test)]
mod test {
    use std::sync::LazyLock;

    use super::{Error, IngestionEntryGraph};
    use crate::B3Digest;
    use crate::import::IngestionEntry;

    use rstest::rstest;

    pub static EMPTY_DIGEST: LazyLock<B3Digest> =
        LazyLock::new(|| blake3::hash(&[]).as_bytes().into());
    pub static DIR_A: LazyLock<IngestionEntry> = LazyLock::new(|| IngestionEntry::Dir {
        path: "a".parse().unwrap(),
    });
    pub static DIR_B: LazyLock<IngestionEntry> = LazyLock::new(|| IngestionEntry::Dir {
        path: "b".parse().unwrap(),
    });
    pub static DIR_A_B: LazyLock<IngestionEntry> = LazyLock::new(|| IngestionEntry::Dir {
        path: "a/b".parse().unwrap(),
    });
    pub static FILE_A: LazyLock<IngestionEntry> = LazyLock::new(|| IngestionEntry::Regular {
        path: "a".parse().unwrap(),
        size: 0,
        executable: false,
        digest: EMPTY_DIGEST.clone(),
    });
    pub static FILE_A_B: LazyLock<IngestionEntry> = LazyLock::new(|| IngestionEntry::Regular {
        path: "a/b".parse().unwrap(),
        size: 0,
        executable: false,
        digest: EMPTY_DIGEST.clone(),
    });
    pub static FILE_A_B_C: LazyLock<IngestionEntry> = LazyLock::new(|| IngestionEntry::Regular {
        path: "a/b/c".parse().unwrap(),
        size: 0,
        executable: false,
        digest: EMPTY_DIGEST.clone(),
    });

    /// Ingesting the given entries must succeed and finalize into exactly the
    /// expected DFS-post-order list.
    #[rstest]
    #[case::implicit_directories(&[&*FILE_A_B_C], &[&*FILE_A_B_C, &*DIR_A_B, &*DIR_A])]
    #[case::explicit_directories(&[&*DIR_A, &*DIR_A_B, &*FILE_A_B_C], &[&*FILE_A_B_C, &*DIR_A_B, &*DIR_A])]
    #[case::inaccesible_tree(&[&*DIR_A, &*DIR_A_B, &*FILE_A_B], &[&*FILE_A_B, &*DIR_A])]
    fn node_ingestion_success(
        #[case] in_entries: &[&IngestionEntry],
        #[case] exp_entries: &[&IngestionEntry],
    ) {
        let mut graph = IngestionEntryGraph::new();

        for entry in in_entries.iter() {
            graph.add((*entry).clone()).expect("failed to add entry");
        }

        let actual = graph.finalize().expect("invalid entries");

        let expected: Vec<IngestionEntry> =
            exp_entries.iter().copied().cloned().collect();

        assert_eq!(actual, expected);
    }

    /// Ingesting the given entries must fail with the expected error, either
    /// while adding entries or during finalization.
    #[rstest]
    #[case::no_top_level_entries(&[], Error::UnexpectedNumberOfTopLevelEntries)]
    #[case::multiple_top_level_dirs(&[&*DIR_A, &*DIR_B], Error::UnexpectedNumberOfTopLevelEntries)]
    #[case::top_level_file_entry(&[&*FILE_A], Error::UnexpectedNumberOfTopLevelEntries)]
    fn node_ingestion_error(#[case] in_entries: &[&IngestionEntry], #[case] exp_error: Error) {
        let mut graph = IngestionEntryGraph::new();

        // Stop at the first error while adding, then finalize if adding succeeded.
        let mut outcome: Result<(), Error> = Ok(());
        for entry in in_entries {
            if let Err(e) = graph.add((*entry).clone()) {
                outcome = Err(e);
                break;
            }
        }
        let outcome = outcome.and_then(|()| graph.finalize().map(|_| ()));

        let error = outcome.expect_err("expected error");
        assert_eq!(error.to_string(), exp_error.to_string());
    }
}