snix_castore/directoryservice/
directory_graph.rs

1use std::collections::HashMap;
2
3use petgraph::{
4    Direction, Incoming,
5    graph::{DiGraph, NodeIndex},
6    visit::{Bfs, DfsPostOrder, EdgeRef, IntoNodeIdentifiers, Walker},
7};
8use tracing::instrument;
9
10use super::order_validator::{LeavesToRootValidator, OrderValidator, RootToLeavesValidator};
11use crate::{B3Digest, Directory, Node, path::PathComponent};
12
13#[derive(thiserror::Error, Debug)]
14pub enum Error {
15    #[error("{0}")]
16    ValidationError(String),
17}
18
19struct EdgeWeight {
20    name: PathComponent,
21    size: u64,
22}
23
24/// This can be used to validate and/or re-order a Directory closure (DAG of
25/// connected Directories), and their insertion order.
26///
27/// The DirectoryGraph is parametrized on the insertion order, and can be
28/// constructed using the Default trait, or using `with_order` if the
29/// OrderValidator needs to be customized.
30///
31/// If the user is receiving directories from canonical protobuf encoding in
32/// root-to-leaves order, and parsing them, she can call `digest_allowed`
33/// _before_ parsing the protobuf record and then add it with `add_unchecked`.
34/// All other users insert the directories via `add`, in their specified order.
35/// During insertion, we validate as much as we can at that time:
36///
37///  - individual validation of Directory messages
38///  - validation of insertion order
39///  - validation of size fields of referred Directories
40///
41/// Internally it keeps all received Directories in a directed graph,
42/// with node weights being the Directories and edges pointing to child/parent
43/// directories.
44///
45/// Once all Directories have been inserted, a validate function can be
46/// called to perform a check for graph connectivity and ensure there's no
47/// disconnected components or missing nodes.
48/// Finally, the `drain_leaves_to_root` or `drain_root_to_leaves` can be
49/// _chained_ on validate to get an iterator over the (deduplicated and)
50/// validated list of directories in either order.
51#[derive(Default)]
52pub struct DirectoryGraph<O> {
53    // A directed graph, using Directory as node weight.
54    // Edges point from parents to children.
55    //
56    // Nodes with None weigths might exist when a digest has been referred to but the directory
57    // with this digest has not yet been sent.
58    //
59    // The option in the edge weight tracks the pending validation state of the respective edge, for example if
60    // the child has not been added yet.
61    graph: DiGraph<Option<Directory>, Option<EdgeWeight>>,
62
63    // A lookup table from directory digest to node index.
64    digest_to_node_ix: HashMap<B3Digest, NodeIndex>,
65
66    order_validator: O,
67}
68
69pub struct ValidatedDirectoryGraph {
70    graph: DiGraph<Option<Directory>, Option<EdgeWeight>>,
71
72    root: Option<NodeIndex>,
73}
74
75fn check_edge(edge: &EdgeWeight, child: &Directory) -> Result<(), Error> {
76    // Ensure the size specified in the child node matches our records.
77    if edge.size != child.size() {
78        return Err(Error::ValidationError(format!(
79            "'{}' has wrong size, specified {}, recorded {}",
80            edge.name,
81            edge.size,
82            child.size(),
83        )));
84    }
85    Ok(())
86}
87
88impl DirectoryGraph<LeavesToRootValidator> {
89    /// Insert a new Directory into the closure
90    #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)]
91    pub fn add(&mut self, directory: Directory) -> Result<(), Error> {
92        if !self.order_validator.add_directory(&directory) {
93            return Err(Error::ValidationError(
94                "unknown directory was referenced".into(),
95            ));
96        }
97        self.add_order_unchecked(directory)
98    }
99}
100
101impl DirectoryGraph<RootToLeavesValidator> {
102    /// If the user is parsing directories from canonical protobuf encoding, she can
103    /// call `digest_allowed` _before_ parsing the protobuf record and then add it
104    /// with `add_unchecked`.
105    pub fn digest_allowed(&self, digest: B3Digest) -> bool {
106        self.order_validator.digest_allowed(&digest)
107    }
108
109    /// Insert a new Directory into the closure
110    #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)]
111    pub fn add(&mut self, directory: Directory) -> Result<(), Error> {
112        let digest = directory.digest();
113        if !self.order_validator.digest_allowed(&digest) {
114            return Err(Error::ValidationError("unexpected digest".into()));
115        }
116        self.order_validator.add_directory_unchecked(&directory);
117        self.add_order_unchecked(directory)
118    }
119}
120
121impl<O: OrderValidator> DirectoryGraph<O> {
122    /// Customize the ordering, i.e. for pre-setting the root of the RootToLeavesValidator
123    pub fn with_order(order_validator: O) -> Self {
124        Self {
125            graph: Default::default(),
126            digest_to_node_ix: Default::default(),
127            order_validator,
128        }
129    }
130
131    /// Adds a directory which has already been confirmed to be in-order to the graph
132    pub fn add_order_unchecked(&mut self, directory: Directory) -> Result<(), Error> {
133        let digest = directory.digest();
134
135        // Teach the graph about the existence of a node with this digest
136        let ix = *self
137            .digest_to_node_ix
138            .entry(digest)
139            .or_insert_with(|| self.graph.add_node(None));
140
141        if self.graph[ix].is_some() {
142            // The node is already in the graph, there is nothing to do here.
143            return Ok(());
144        }
145
146        // set up edges to all child directories
147        for (name, node) in directory.nodes() {
148            if let Node::Directory { digest, size } = node {
149                let child_ix = *self
150                    .digest_to_node_ix
151                    .entry(digest.clone())
152                    .or_insert_with(|| self.graph.add_node(None));
153
154                let pending_edge_check = match &self.graph[child_ix] {
155                    Some(child) => {
156                        // child is already available, validate the edge now
157                        check_edge(
158                            &EdgeWeight {
159                                name: name.clone(),
160                                size: *size,
161                            },
162                            child,
163                        )?;
164                        None
165                    }
166                    None => Some(EdgeWeight {
167                        name: name.clone(),
168                        size: *size,
169                    }), // pending validation
170                };
171                self.graph.add_edge(ix, child_ix, pending_edge_check);
172            }
173        }
174
175        // validate the edges from parents to this node
176        // this collects edge ids in a Vec because there is no edges_directed_mut :'c
177        for edge_id in self
178            .graph
179            .edges_directed(ix, Direction::Incoming)
180            .map(|edge_ref| edge_ref.id())
181            .collect::<Vec<_>>()
182            .into_iter()
183        {
184            let edge_weight = self
185                .graph
186                .edge_weight_mut(edge_id)
187                .expect("edge not found")
188                .take()
189                .expect("edge is already validated");
190
191            check_edge(&edge_weight, &directory)?;
192        }
193
194        // finally, store the directory information in the node weight
195        self.graph[ix] = Some(directory);
196
197        Ok(())
198    }
199
200    #[instrument(level = "trace", skip_all, err)]
201    pub fn validate(self) -> Result<ValidatedDirectoryGraph, Error> {
202        // find all initial nodes (nodes without incoming edges)
203        let mut roots = self
204            .graph
205            .node_identifiers()
206            .filter(|&a| self.graph.neighbors_directed(a, Incoming).next().is_none());
207
208        let root = roots.next();
209        if roots.next().is_some() {
210            return Err(Error::ValidationError(
211                "graph has disconnected roots".into(),
212            ));
213        }
214
215        // test that the graph is complete
216        if self.graph.raw_nodes().iter().any(|n| n.weight.is_none()) {
217            return Err(Error::ValidationError("graph is incomplete".into()));
218        }
219
220        Ok(ValidatedDirectoryGraph {
221            graph: self.graph,
222            root,
223        })
224    }
225}
226
227impl ValidatedDirectoryGraph {
228    /// Return the list of directories in from-root-to-leaves order.
229    /// In case no elements have been inserted, returns an empty list.
230    ///
231    /// panics if the specified root is not in the graph
232    #[instrument(level = "trace", skip_all)]
233    pub fn drain_root_to_leaves(self) -> impl Iterator<Item = Directory> {
234        let order = match self.root {
235            Some(root) => {
236                // do a BFS traversal of the graph, starting with the root node
237                Bfs::new(&self.graph, root)
238                    .iter(&self.graph)
239                    .collect::<Vec<_>>()
240            }
241            None => vec![], // No nodes have been inserted, do not traverse
242        };
243
244        let (mut nodes, _edges) = self.graph.into_nodes_edges();
245
246        order
247            .into_iter()
248            .filter_map(move |i| nodes[i.index()].weight.take())
249    }
250
251    /// Return the list of directories in from-leaves-to-root order.
252    /// In case no elements have been inserted, returns an empty list.
253    ///
254    /// panics when the specified root is not in the graph
255    #[instrument(level = "trace", skip_all)]
256    pub fn drain_leaves_to_root(self) -> impl Iterator<Item = Directory> {
257        let order = match self.root {
258            Some(root) => {
259                // do a DFS Post-Order traversal of the graph, starting with the root node
260                DfsPostOrder::new(&self.graph, root)
261                    .iter(&self.graph)
262                    .collect::<Vec<_>>()
263            }
264            None => vec![], // No nodes have been inserted, do not traverse
265        };
266
267        let (mut nodes, _edges) = self.graph.into_nodes_edges();
268
269        order
270            .into_iter()
271            .filter_map(move |i| nodes[i.index()].weight.take())
272    }
273}
274
#[cfg(test)]
mod tests {
    use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C};
    use crate::{Directory, Node};
    use rstest::rstest;
    use std::sync::LazyLock;

    use super::{DirectoryGraph, LeavesToRootValidator, RootToLeavesValidator};

    pub static BROKEN_PARENT_DIRECTORY: LazyLock<Directory> = LazyLock::new(|| {
        Directory::try_from_iter([(
            "foo".try_into().unwrap(),
            Node::Directory {
                digest: DIRECTORY_A.digest(),
                size: DIRECTORY_A.size() + 42, // wrong!
            },
        )])
        .unwrap()
    });

    #[rstest]
    /// Uploading an empty directory should succeed.
    #[case::empty_directory(&[&*DIRECTORY_A], false, Some(vec![&*DIRECTORY_A]))]
    /// Uploading A, then B (referring to A) should succeed.
    #[case::simple_closure(&[&*DIRECTORY_A, &*DIRECTORY_B], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_B]))]
    /// Uploading A, then A, then C (referring to A twice) should succeed.
    /// We pretend to be a dumb client not deduping directories.
    #[case::same_child(&[&*DIRECTORY_A, &*DIRECTORY_A, &*DIRECTORY_C], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_C]))]
    /// Uploading A, then C (referring to A twice) should succeed.
    #[case::same_child_dedup(&[&*DIRECTORY_A, &*DIRECTORY_C], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_C]))]
    /// Uploading A, then C (referring to A twice), then B (itself referring to A) should fail during close,
    /// as B itself would be left unconnected.
    #[case::unconnected_node(&[&*DIRECTORY_A, &*DIRECTORY_C, &*DIRECTORY_B], false, None)]
    /// Uploading B (referring to A) should fail immediately, because A was never uploaded.
    #[case::dangling_pointer(&[&*DIRECTORY_B], true, None)]
    /// Uploading a directory which refers to another Directory with a wrong size should fail.
    #[case::wrong_size_in_parent(&[&*DIRECTORY_A, &*BROKEN_PARENT_DIRECTORY], true, None)]
    fn test_uploads(
        #[case] directories_to_upload: &[&Directory],
        #[case] exp_fail_upload_last: bool,
        #[case] exp_finalize: Option<Vec<&Directory>>, // Some(_) if finalize successful, None if not.
    ) {
        let mut dcv = DirectoryGraph::<LeavesToRootValidator>::default();
        let len_directories_to_upload = directories_to_upload.len();

        for (i, d) in directories_to_upload.iter().enumerate() {
            let resp = dcv.add((*d).clone());
            if i == len_directories_to_upload - 1 && exp_fail_upload_last {
                assert!(resp.is_err(), "expect last put to fail");

                // We don't really care anymore what finalize() would return, as
                // the add() failed.
                return;
            } else {
                assert!(resp.is_ok(), "expect put to succeed");
            }
        }

        // everything was uploaded successfully. Test finalize().
        let resp = dcv
            .validate()
            .map(|validated| validated.drain_leaves_to_root().collect::<Vec<_>>());

        match exp_finalize {
            Some(directories) => {
                assert_eq!(
                    Vec::from_iter(directories.iter().map(|e| (*e).to_owned())),
                    resp.expect("drain should succeed")
                );
            }
            None => {
                resp.expect_err("drain should fail");
            }
        }
    }

    #[rstest]
    /// Downloading an empty directory should succeed.
    #[case::empty_directory(&*DIRECTORY_A, &[&*DIRECTORY_A], false, Some(vec![&*DIRECTORY_A]))]
    /// Downloading B, then A (referenced by B) should succeed.
    #[case::simple_closure(&*DIRECTORY_B, &[&*DIRECTORY_B, &*DIRECTORY_A], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_B]))]
    /// Downloading C (referring to A twice), then A should succeed.
    #[case::same_child_dedup(&*DIRECTORY_C, &[&*DIRECTORY_C, &*DIRECTORY_A], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_C]))]
    /// Downloading C, then B (both referring to A but not referring to each other) should fail immediately as B has no connection to C (the root)
    #[case::unconnected_node(&*DIRECTORY_C, &[&*DIRECTORY_C, &*DIRECTORY_B], true, None)]
    /// Downloading B (specified as the root) but receiving A instead should fail immediately, because A has no connection to B (the root).
    #[case::dangling_pointer(&*DIRECTORY_B, &[&*DIRECTORY_A], true, None)]
    /// Downloading a directory which refers to another Directory with a wrong size should fail.
    #[case::wrong_size_in_parent(&*BROKEN_PARENT_DIRECTORY, &[&*BROKEN_PARENT_DIRECTORY, &*DIRECTORY_A], true, None)]
    fn test_downloads(
        #[case] root: &Directory,
        #[case] directories_to_upload: &[&Directory],
        #[case] exp_fail_upload_last: bool,
        #[case] exp_finalize: Option<Vec<&Directory>>, // Some(_) if finalize successful, None if not.
    ) {
        let mut dcv =
            DirectoryGraph::with_order(RootToLeavesValidator::new_with_root_digest(root.digest()));
        let len_directories_to_upload = directories_to_upload.len();

        for (i, d) in directories_to_upload.iter().enumerate() {
            let resp = dcv.add((*d).clone());
            if i == len_directories_to_upload - 1 && exp_fail_upload_last {
                assert!(resp.is_err(), "expect last put to fail");

                // We don't really care anymore what finalize() would return, as
                // the add() failed.
                return;
            } else {
                assert!(resp.is_ok(), "expect put to succeed");
            }
        }

        // everything was uploaded successfully. Test finalize().
        let resp = dcv
            .validate()
            .map(|validated| validated.drain_leaves_to_root().collect::<Vec<_>>());

        match exp_finalize {
            Some(directories) => {
                assert_eq!(
                    Vec::from_iter(directories.iter().map(|e| (*e).to_owned())),
                    resp.expect("drain should succeed")
                );
            }
            None => {
                resp.expect_err("drain should fail");
            }
        }
    }

    #[rstest]
    /// A single directory is its own root.
    #[case::empty_directory(&*DIRECTORY_A, &[&*DIRECTORY_A], vec![&*DIRECTORY_A])]
    /// B refers to A, so B must be drained before A.
    #[case::simple_closure(&*DIRECTORY_B, &[&*DIRECTORY_B, &*DIRECTORY_A], vec![&*DIRECTORY_B, &*DIRECTORY_A])]
    /// C refers to A twice; A must only be drained once.
    #[case::same_child_dedup(&*DIRECTORY_C, &[&*DIRECTORY_C, &*DIRECTORY_A], vec![&*DIRECTORY_C, &*DIRECTORY_A])]
    fn test_drain_root_to_leaves(
        #[case] root: &Directory,
        #[case] directories_to_add: &[&Directory],
        #[case] exp_order: Vec<&Directory>,
    ) {
        let mut dcv =
            DirectoryGraph::with_order(RootToLeavesValidator::new_with_root_digest(root.digest()));
        for d in directories_to_add {
            dcv.add((*d).clone()).expect("add should succeed");
        }

        let drained = dcv
            .validate()
            .expect("validate should succeed")
            .drain_root_to_leaves()
            .collect::<Vec<_>>();

        assert_eq!(exp_order.into_iter().cloned().collect::<Vec<_>>(), drained);
    }

    /// An empty graph validates and drains to nothing, in either order.
    #[test]
    fn test_empty_graph() {
        let leaves_to_root = DirectoryGraph::<LeavesToRootValidator>::default()
            .validate()
            .expect("empty graph should validate");
        assert_eq!(leaves_to_root.drain_leaves_to_root().count(), 0);

        let root_to_leaves = DirectoryGraph::<LeavesToRootValidator>::default()
            .validate()
            .expect("empty graph should validate");
        assert_eq!(root_to_leaves.drain_root_to_leaves().count(), 0);
    }
}