snix_castore/directoryservice/
directory_graph.rs1use std::collections::HashMap;
2
3use petgraph::{
4 Direction, Incoming,
5 graph::{DiGraph, NodeIndex},
6 visit::{Bfs, DfsPostOrder, EdgeRef, IntoNodeIdentifiers, Walker},
7};
8use tracing::instrument;
9
10use super::order_validator::{LeavesToRootValidator, OrderValidator, RootToLeavesValidator};
11use crate::{B3Digest, Directory, Node, path::PathComponent};
12
13#[derive(thiserror::Error, Debug)]
14pub enum Error {
15 #[error("{0}")]
16 ValidationError(String),
17}
18
19struct EdgeWeight {
20 name: PathComponent,
21 size: u64,
22}
23
24#[derive(Default)]
52pub struct DirectoryGraph<O> {
53 graph: DiGraph<Option<Directory>, Option<EdgeWeight>>,
62
63 digest_to_node_ix: HashMap<B3Digest, NodeIndex>,
65
66 order_validator: O,
67}
68
69pub struct ValidatedDirectoryGraph {
70 graph: DiGraph<Option<Directory>, Option<EdgeWeight>>,
71
72 root: Option<NodeIndex>,
73}
74
75fn check_edge(edge: &EdgeWeight, child: &Directory) -> Result<(), Error> {
76 if edge.size != child.size() {
78 return Err(Error::ValidationError(format!(
79 "'{}' has wrong size, specified {}, recorded {}",
80 edge.name,
81 edge.size,
82 child.size(),
83 )));
84 }
85 Ok(())
86}
87
88impl DirectoryGraph<LeavesToRootValidator> {
89 #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)]
91 pub fn add(&mut self, directory: Directory) -> Result<(), Error> {
92 if !self.order_validator.add_directory(&directory) {
93 return Err(Error::ValidationError(
94 "unknown directory was referenced".into(),
95 ));
96 }
97 self.add_order_unchecked(directory)
98 }
99}
100
101impl DirectoryGraph<RootToLeavesValidator> {
102 pub fn digest_allowed(&self, digest: B3Digest) -> bool {
106 self.order_validator.digest_allowed(&digest)
107 }
108
109 #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)]
111 pub fn add(&mut self, directory: Directory) -> Result<(), Error> {
112 let digest = directory.digest();
113 if !self.order_validator.digest_allowed(&digest) {
114 return Err(Error::ValidationError("unexpected digest".into()));
115 }
116 self.order_validator.add_directory_unchecked(&directory);
117 self.add_order_unchecked(directory)
118 }
119}
120
121impl<O: OrderValidator> DirectoryGraph<O> {
122 pub fn with_order(order_validator: O) -> Self {
124 Self {
125 graph: Default::default(),
126 digest_to_node_ix: Default::default(),
127 order_validator,
128 }
129 }
130
131 pub fn add_order_unchecked(&mut self, directory: Directory) -> Result<(), Error> {
133 let digest = directory.digest();
134
135 let ix = *self
137 .digest_to_node_ix
138 .entry(digest)
139 .or_insert_with(|| self.graph.add_node(None));
140
141 if self.graph[ix].is_some() {
142 return Ok(());
144 }
145
146 for (name, node) in directory.nodes() {
148 if let Node::Directory { digest, size } = node {
149 let child_ix = *self
150 .digest_to_node_ix
151 .entry(digest.clone())
152 .or_insert_with(|| self.graph.add_node(None));
153
154 let pending_edge_check = match &self.graph[child_ix] {
155 Some(child) => {
156 check_edge(
158 &EdgeWeight {
159 name: name.clone(),
160 size: *size,
161 },
162 child,
163 )?;
164 None
165 }
166 None => Some(EdgeWeight {
167 name: name.clone(),
168 size: *size,
169 }), };
171 self.graph.add_edge(ix, child_ix, pending_edge_check);
172 }
173 }
174
175 for edge_id in self
178 .graph
179 .edges_directed(ix, Direction::Incoming)
180 .map(|edge_ref| edge_ref.id())
181 .collect::<Vec<_>>()
182 .into_iter()
183 {
184 let edge_weight = self
185 .graph
186 .edge_weight_mut(edge_id)
187 .expect("edge not found")
188 .take()
189 .expect("edge is already validated");
190
191 check_edge(&edge_weight, &directory)?;
192 }
193
194 self.graph[ix] = Some(directory);
196
197 Ok(())
198 }
199
200 #[instrument(level = "trace", skip_all, err)]
201 pub fn validate(self) -> Result<ValidatedDirectoryGraph, Error> {
202 let mut roots = self
204 .graph
205 .node_identifiers()
206 .filter(|&a| self.graph.neighbors_directed(a, Incoming).next().is_none());
207
208 let root = roots.next();
209 if roots.next().is_some() {
210 return Err(Error::ValidationError(
211 "graph has disconnected roots".into(),
212 ));
213 }
214
215 if self.graph.raw_nodes().iter().any(|n| n.weight.is_none()) {
217 return Err(Error::ValidationError("graph is incomplete".into()));
218 }
219
220 Ok(ValidatedDirectoryGraph {
221 graph: self.graph,
222 root,
223 })
224 }
225}
226
227impl ValidatedDirectoryGraph {
228 #[instrument(level = "trace", skip_all)]
233 pub fn drain_root_to_leaves(self) -> impl Iterator<Item = Directory> {
234 let order = match self.root {
235 Some(root) => {
236 Bfs::new(&self.graph, root)
238 .iter(&self.graph)
239 .collect::<Vec<_>>()
240 }
241 None => vec![], };
243
244 let (mut nodes, _edges) = self.graph.into_nodes_edges();
245
246 order
247 .into_iter()
248 .filter_map(move |i| nodes[i.index()].weight.take())
249 }
250
251 #[instrument(level = "trace", skip_all)]
256 pub fn drain_leaves_to_root(self) -> impl Iterator<Item = Directory> {
257 let order = match self.root {
258 Some(root) => {
259 DfsPostOrder::new(&self.graph, root)
261 .iter(&self.graph)
262 .collect::<Vec<_>>()
263 }
264 None => vec![], };
266
267 let (mut nodes, _edges) = self.graph.into_nodes_edges();
268
269 order
270 .into_iter()
271 .filter_map(move |i| nodes[i.index()].weight.take())
272 }
273}
274
#[cfg(test)]
mod tests {
    use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C};
    use crate::{Directory, Node};
    use rstest::rstest;
    use std::sync::LazyLock;

    use super::{DirectoryGraph, LeavesToRootValidator, RootToLeavesValidator};

    /// A directory with a single entry "foo" that refers to `DIRECTORY_A` by
    /// its digest, but records a size that is off by 42.
    pub static BROKEN_PARENT_DIRECTORY: LazyLock<Directory> = LazyLock::new(|| {
        Directory::try_from_iter([(
            "foo".try_into().unwrap(),
            Node::Directory {
                digest: DIRECTORY_A.digest(),
                size: DIRECTORY_A.size() + 42,
            },
        )])
        .unwrap()
    });

    /// Feeds directories into a leaves-to-root graph and checks either that
    /// the last insert fails, or what validating and draining produces.
    #[rstest]
    // A single directory on its own.
    #[case::empty_directory(&[&*DIRECTORY_A], false, Some(vec![&*DIRECTORY_A]))]
    // A child followed by its parent.
    #[case::simple_closure(&[&*DIRECTORY_A, &*DIRECTORY_B], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_B]))]
    // The same directory sent twice is only drained once.
    #[case::same_child(&[&*DIRECTORY_A, &*DIRECTORY_A, &*DIRECTORY_C], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_C]))]
    #[case::same_child_dedup(&[&*DIRECTORY_A, &*DIRECTORY_C], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_C]))]
    // All inserts succeed, but validation has to fail.
    #[case::unconnected_node(&[&*DIRECTORY_A, &*DIRECTORY_C, &*DIRECTORY_B], false, None)]
    // The only insert has to fail.
    #[case::dangling_pointer(&[&*DIRECTORY_B], true, None)]
    // The parent records a wrong size for the child added before it.
    #[case::wrong_size_in_parent(&[&*DIRECTORY_A, &*BROKEN_PARENT_DIRECTORY], true, None)]
    fn test_uploads(
        #[case] directories_to_upload: &[&Directory],
        #[case] exp_fail_upload_last: bool,
        #[case] exp_finalize: Option<Vec<&Directory>>,
    ) {
        let mut dcv = DirectoryGraph::<LeavesToRootValidator>::default();
        let last_ix = directories_to_upload.len().checked_sub(1);

        for (ix, &directory) in directories_to_upload.iter().enumerate() {
            let outcome = dcv.add(directory.clone());

            if exp_fail_upload_last && Some(ix) == last_ix {
                assert!(outcome.is_err(), "expect last put to fail");

                // Nothing left to check once the expected failure happened.
                return;
            }

            assert!(outcome.is_ok(), "expect put to succeed");
        }

        let drained = dcv
            .validate()
            .map(|validated| validated.drain_leaves_to_root().collect::<Vec<_>>());

        if let Some(directories) = exp_finalize {
            let expected: Vec<Directory> = directories.into_iter().cloned().collect();
            assert_eq!(expected, drained.expect("drain should succeed"));
        } else {
            drained.expect_err("drain should fail");
        }
    }

    /// Feeds directories into a root-to-leaves graph anchored at `root` and
    /// checks either that the last insert fails, or what validating and
    /// draining produces.
    #[rstest]
    // A single directory on its own.
    #[case::empty_directory(&*DIRECTORY_A, &[&*DIRECTORY_A], false, Some(vec![&*DIRECTORY_A]))]
    // A parent followed by its child; draining is still leaves-to-root.
    #[case::simple_closure(&*DIRECTORY_B, &[&*DIRECTORY_B, &*DIRECTORY_A], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_B]))]
    #[case::same_child_dedup(&*DIRECTORY_C, &[&*DIRECTORY_C, &*DIRECTORY_A], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_C]))]
    // The second insert has to fail.
    #[case::unconnected_node(&*DIRECTORY_C, &[&*DIRECTORY_C, &*DIRECTORY_B], true, None)]
    // Inserting something other than the announced root has to fail.
    #[case::dangling_pointer(&*DIRECTORY_B, &[&*DIRECTORY_A], true, None)]
    // The child arrives after a parent that records a wrong size for it.
    #[case::wrong_size_in_parent(&*BROKEN_PARENT_DIRECTORY, &[&*BROKEN_PARENT_DIRECTORY, &*DIRECTORY_A], true, None)]
    fn test_downloads(
        #[case] root: &Directory,
        #[case] directories_to_upload: &[&Directory],
        #[case] exp_fail_upload_last: bool,
        #[case] exp_finalize: Option<Vec<&Directory>>,
    ) {
        let validator = RootToLeavesValidator::new_with_root_digest(root.digest());
        let mut dcv = DirectoryGraph::with_order(validator);
        let last_ix = directories_to_upload.len().checked_sub(1);

        for (ix, &directory) in directories_to_upload.iter().enumerate() {
            let outcome = dcv.add(directory.clone());

            if exp_fail_upload_last && Some(ix) == last_ix {
                assert!(outcome.is_err(), "expect last put to fail");

                // Nothing left to check once the expected failure happened.
                return;
            }

            assert!(outcome.is_ok(), "expect put to succeed");
        }

        let drained = dcv
            .validate()
            .map(|validated| validated.drain_leaves_to_root().collect::<Vec<_>>());

        if let Some(directories) = exp_finalize {
            let expected: Vec<Directory> = directories.into_iter().cloned().collect();
            assert_eq!(expected, drained.expect("drain should succeed"));
        } else {
            drained.expect_err("drain should fail");
        }
    }
}