snix_castore/directoryservice/
order_validator.rs1use async_stream::try_stream;
2use futures::StreamExt;
3use futures::{Stream, stream::BoxStream};
4use std::collections::{HashMap, HashSet, hash_map};
5use tracing::warn;
6
7use super::Directory;
8use crate::{B3Digest, Node};
9
10#[derive(thiserror::Error, Debug, Eq, PartialEq)]
12pub enum OrderingError {
13 #[error("wrong size {size} for digest {digest}")]
14 WrongSize { digest: B3Digest, size: u64 },
15
16 #[error("unknown digest {digest} referenced for {path_component} in parent {parent_digest}")]
17 UnknownLTR {
18 digest: B3Digest,
19 parent_digest: B3Digest,
20 path_component: crate::PathComponent,
21 },
22
23 #[error("unexpected Directory with digest {0} encountered", directory.digest())]
24 Unexpected { directory: Directory },
25
26 #[error("some directories missing")]
27 DirectoriesMissing(HashSet<B3Digest>),
28
29 #[error("no directories received")]
30 EmptySet,
31}
32
33pub struct RootToLeavesValidator {
50 root_digest: B3Digest,
52
53 referenced_directories: HashMap<B3Digest, u64>,
55
56 pending_directories: HashSet<B3Digest>,
60
61 poison: bool,
64}
65
66impl RootToLeavesValidator {
67 pub fn new_with_root_digest(root_digest: B3Digest) -> Self {
70 Self {
71 root_digest,
72 referenced_directories: HashMap::default(),
73 pending_directories: HashSet::from_iter([root_digest]),
74 poison: false,
75 }
76 }
77
78 pub fn would_accept(&self, digest: &B3Digest) -> bool {
86 assert!(!self.poison, "Snix bug: RootToLeavesValidator poisoned");
87 digest == &self.root_digest || self.referenced_directories.contains_key(digest)
88 }
89
90 pub fn try_accept(&mut self, directory: &Directory) -> Result<(), OrderingError> {
92 assert!(!self.poison, "Snix bug: RootToLeavesValidator poisoned");
93
94 let size = directory.size();
95 let digest = directory.digest();
96
97 match self.referenced_directories.get(&digest) {
99 Some(s) if *s == size => {
100 if !self.pending_directories.remove(&digest) {
101 warn!("directory received multiple times");
102 };
103
104 self.introduce_children_of(directory);
106 Ok(())
107 }
108 Some(_) => {
109 self.poison = true;
110 Err(OrderingError::WrongSize { digest, size })
111 }
112 None if digest == self.root_digest => {
114 self.introduce_children_of(directory);
116 self.pending_directories.remove(&self.root_digest);
117 Ok(())
118 }
119 None => {
120 self.poison = true;
121 Err(OrderingError::Unexpected {
122 directory: directory.clone(),
123 })
124 }
125 }
126 }
127
128 pub fn finalize(mut self) -> Result<(), OrderingError> {
131 if !self.pending_directories.is_empty() {
133 return Err(OrderingError::DirectoriesMissing(
134 self.pending_directories.clone(),
135 ));
136 }
137
138 self.poison = true;
139
140 Ok(())
141 }
142
143 fn introduce_children_of(&mut self, directory: &Directory) {
145 for (_name, node) in directory.nodes() {
146 if let Node::Directory { digest, size } = node {
147 if self
149 .referenced_directories
150 .insert(digest.to_owned(), *size)
151 .is_none()
152 {
153 self.pending_directories.insert(digest.to_owned());
154 }
155 }
156 }
157 }
158
159 pub fn validate_stream<'s, S>(
164 root_digest: B3Digest,
165 directories: S,
166 ) -> BoxStream<'s, Result<Directory, OrderingError>>
167 where
168 S: Stream<Item = Directory> + Send + 's,
169 {
170 let mut validator = RootToLeavesValidator::new_with_root_digest(root_digest);
171 let mut directories = directories.boxed();
172
173 Box::pin(try_stream! {
174 while let Some(directory) = directories.next().await {
175 validator.try_accept(&directory)?;
176 yield directory;
177 }
178 validator.finalize()?;
179 })
180 }
181}
182
183#[derive(Default)]
184pub struct LeavesToRootValidator {
192 accepted_directories: HashMap<B3Digest, u64>,
194
195 pending_directories: HashSet<B3Digest>,
198
199 #[cfg(debug_assertions)]
201 last_inserted_digest: Option<B3Digest>,
202
203 poison: bool,
206}
207
208impl LeavesToRootValidator {
209 pub fn new() -> Self {
210 Self {
211 accepted_directories: Default::default(),
212 pending_directories: Default::default(),
213 #[cfg(debug_assertions)]
214 last_inserted_digest: None,
215 poison: false,
216 }
217 }
218
219 pub fn try_accept(&mut self, directory: &Directory) -> Result<(), OrderingError> {
221 assert!(!self.poison, "Snix bug: LeavesToRootValidator poisoned");
222
223 for (name, node) in directory.nodes() {
226 if let Node::Directory { digest, size } = node {
227 match self.accepted_directories.get(digest) {
228 Some(s) if s == size => {
229 self.pending_directories.remove(digest);
230 }
231 Some(s) => {
232 self.poison = true;
233 Err(OrderingError::WrongSize {
234 digest: digest.to_owned(),
235 size: *s,
236 })?
237 }
238 None => {
239 self.poison = true;
240 Err(OrderingError::UnknownLTR {
241 digest: digest.to_owned(),
242 parent_digest: directory.digest(),
243 path_component: name.to_owned(),
244 })?
245 }
246 }
247 }
248 }
249
250 let directory_digest = directory.digest();
253 match self.accepted_directories.entry(directory_digest) {
254 hash_map::Entry::Occupied(_) => {
255 warn!("directory received multiple times");
256 }
257 hash_map::Entry::Vacant(entry) => {
258 entry.insert(directory.size());
259 #[cfg(debug_assertions)]
260 {
261 self.last_inserted_digest = Some(directory_digest)
262 }
263 self.pending_directories.insert(directory_digest);
264 }
265 }
266
267 Ok(())
268 }
269
270 #[allow(unused_mut)]
273 pub fn finalize(mut self) -> Result<(), OrderingError> {
274 assert!(!self.poison, "Snix bug: LeavesToRootValidator poisoned");
275
276 if self.accepted_directories.is_empty() {
277 return Err(OrderingError::EmptySet);
278 }
279
280 if self.pending_directories.len() != 1 {
283 Err(OrderingError::DirectoriesMissing(
284 self.pending_directories.clone(),
285 ))?
286 }
287 #[cfg(debug_assertions)]
288 {
289 let last_inserted_digest = self
290 .last_inserted_digest
291 .expect("Snix bug: have dangling_directories, but no last_inserted_digest");
292 self.pending_directories
293 .get(&last_inserted_digest)
294 .expect("Snix bug: dangling directory is not last inserted one");
295 self.poison = true;
296 }
297
298 Ok(())
299 }
300
301 pub fn validate_stream<'s, S>(directories: S) -> BoxStream<'s, Result<Directory, OrderingError>>
305 where
306 S: Stream<Item = Directory> + Send + 's,
307 {
308 let mut directories = directories.boxed();
309 let mut validator = Self::new();
310
311 Box::pin(try_stream! {
312 while let Some(directory) = directories.next().await {
313 validator.try_accept(&directory)?;
314 yield directory;
315 }
316
317 validator.finalize()?;
318 })
319 }
320}
321
322#[cfg(test)]
323mod tests {
324 use super::{LeavesToRootValidator, RootToLeavesValidator};
325 use crate::directoryservice::Directory;
326 use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C, DIRECTORY_D, DIRECTORY_E};
327 use futures::TryStreamExt;
328 use rstest::rstest;
329
330 #[rstest]
331 #[case::empty_directory(&[&*DIRECTORY_A], false, false)]
333 #[case::simple_closure(&[&*DIRECTORY_A, &*DIRECTORY_B], false, false)]
335 #[case::same_child(&[&*DIRECTORY_A, &*DIRECTORY_A, &*DIRECTORY_C], false, false)]
338 #[case::same_child_dedup(&[&*DIRECTORY_A, &*DIRECTORY_C], false, false)]
340 #[case::unconnected_node(&[&*DIRECTORY_A, &*DIRECTORY_C, &*DIRECTORY_B], false, true)]
343 #[case::dangling_pointer(&[&*DIRECTORY_B], true, false)]
345 #[case::empty(&[], false, true)]
347 fn leaves_to_root(
348 #[case] directories_to_upload: &[&Directory],
349 #[case] exp_fail_upload_last: bool,
350 #[case] exp_fail_finalize: bool,
351 ) {
352 let mut validator = LeavesToRootValidator::default();
353 let mut it = directories_to_upload.iter().peekable();
354
355 while let Some(d) = it.next() {
356 if it.peek().is_none() && exp_fail_upload_last {
357 validator
358 .try_accept(d)
359 .expect_err("last try_accept to fail");
360 } else {
361 assert!(validator.try_accept(d).is_ok(), "try_accept to succeed");
362 }
363 }
364
365 if !exp_fail_upload_last {
366 if !exp_fail_finalize {
367 validator.finalize().expect("finalize to succeed");
368 } else {
369 let _ = validator.finalize();
370 }
371 }
372 }
373
374 #[rstest]
375 #[case::empty_directory(&[&*DIRECTORY_A], false)]
377 #[case::simple_closure(&[&*DIRECTORY_B, &*DIRECTORY_A], false)]
379 #[case::same_child_dedup(&[&*DIRECTORY_C, &*DIRECTORY_A], false)]
381 #[case::same_child_redundant(&[&*DIRECTORY_C, &*DIRECTORY_A, &*DIRECTORY_A], false)]
383 #[case::with_root_sent_twice(&[&*DIRECTORY_C, &*DIRECTORY_C, &*DIRECTORY_A], false)]
385 #[case::more_levels(&[&*DIRECTORY_E, &*DIRECTORY_D, &*DIRECTORY_A, &*DIRECTORY_B], false)]
387 #[case::unconnected_node(&[&*DIRECTORY_C, &*DIRECTORY_B], true)]
389 fn root_to_leaves(
390 #[case] directories_to_upload: &[&Directory],
391 #[case] exp_fail_upload_last: bool,
392 ) {
393 let root_digest = directories_to_upload[0].digest();
394 let mut validator = RootToLeavesValidator::new_with_root_digest(root_digest);
395 let mut it = directories_to_upload.iter().peekable();
396
397 while let Some(d) = it.next() {
398 if it.peek().is_none() && exp_fail_upload_last {
399 assert!(
400 !validator.would_accept(&d.digest()),
401 "would_accept not expected to accept last failing element"
402 );
403
404 validator
405 .try_accept(d)
406 .expect_err("last try_accept to fail");
407 } else {
408 assert!(
409 validator.would_accept(&d.digest()),
410 "would_accept expected to accept directory"
411 );
412 assert!(validator.try_accept(d).is_ok(), "try_accept to succeed");
413 }
414 }
415
416 if !exp_fail_upload_last {
417 validator.finalize().expect("finalize to succeed");
418 }
419 }
420
421 #[test]
422 fn root_to_leaves_root_mismatch() {
424 let mut validator = RootToLeavesValidator::new_with_root_digest(DIRECTORY_A.digest());
425
426 validator
427 .try_accept(&DIRECTORY_B)
428 .expect_err("shouldn't accept wrong first directory");
429 validator.finalize().expect_err("expect finalize to fail");
430 }
431
432 #[tokio::test]
433 async fn root_to_leaves_stream() {
434 let directories_to_upload = vec![
435 DIRECTORY_E.to_owned(),
436 DIRECTORY_D.to_owned(),
437 DIRECTORY_A.to_owned(),
438 DIRECTORY_B.to_owned(),
439 ];
440 let root_digest = directories_to_upload[0].digest();
441
442 let validated_stream = RootToLeavesValidator::validate_stream(
443 root_digest,
444 futures::stream::iter(directories_to_upload.iter().map(|d| (*d).to_owned())),
445 );
446
447 let validated_directories: Vec<Directory> = validated_stream
448 .try_collect()
449 .await
450 .expect("stream to collect successfully");
451
452 assert_eq!(directories_to_upload, validated_directories);
453
454 RootToLeavesValidator::validate_stream(root_digest, futures::stream::empty())
455 .try_collect::<Vec<_>>()
456 .await
457 .expect_err("an empty stream to fail");
458 }
459}