snix_castore/directoryservice/
order_validator.rs1use async_stream::try_stream;
2use futures::StreamExt;
3use futures::{Stream, stream::BoxStream};
4use std::collections::{HashMap, HashSet, hash_map};
5use tracing::warn;
6
7use super::Directory;
8use crate::{B3Digest, Node};
9
10#[derive(thiserror::Error, Debug, Eq, PartialEq)]
12pub enum OrderingError {
13 #[error("wrong size {size} for digest {digest}")]
14 WrongSize { digest: B3Digest, size: u64 },
15
16 #[error("unknown digest {digest} referenced for {path_component} in parent {parent_digest}")]
17 UnknownLTR {
18 digest: B3Digest,
19 parent_digest: B3Digest,
20 path_component: crate::PathComponent,
21 },
22
23 #[error("unexpected Directory with digest {0} encountered", directory.digest())]
24 Unexpected { directory: Directory },
25
26 #[error("found more than one pending directory")]
27 MoreThanOnePending(HashSet<B3Digest>),
28
29 #[error("No directories received")]
30 NoNodesReceived,
31}
32
33pub struct RootToLeavesValidator {
50 introduced_directories: HashMap<B3Digest, u64>,
52
53 pending_directories: HashSet<B3Digest>,
55
56 poison: bool,
59}
60
61impl RootToLeavesValidator {
62 pub fn new_with_root(directory: &Directory) -> Self {
65 let mut this = Self {
66 introduced_directories: Default::default(),
67 pending_directories: Default::default(),
68 poison: false,
69 };
70 this.introduce_children_of(directory);
71 this
72 }
73
74 pub fn would_accept(&self, digest: &B3Digest) -> bool {
82 assert!(!self.poison, "Snix bug: RootToLeavesValidator poisoned");
83 self.introduced_directories.contains_key(digest)
84 }
85
86 pub fn try_accept(&mut self, directory: &Directory) -> Result<(), OrderingError> {
88 assert!(!self.poison, "Snix bug: RootToLeavesValidator poisoned");
89
90 let size = directory.size();
92 let digest = directory.digest();
93
94 match self.introduced_directories.get(&digest) {
95 Some(s) if *s == size => {
96 if !self.pending_directories.remove(&digest) {
97 warn!("directory received multiple times");
98 };
99 Ok(())
100 }
101 Some(_) => Err(OrderingError::WrongSize { digest, size }),
102 None => Err(OrderingError::Unexpected {
103 directory: directory.clone(),
104 }),
105 }
106 }
107
108 pub fn finalize(mut self) -> Result<(), OrderingError> {
111 if !self.pending_directories.is_empty() {
113 return Err(OrderingError::MoreThanOnePending(
114 self.pending_directories.clone(),
115 ));
116 }
117
118 self.poison = true;
119
120 Ok(())
121 }
122
123 fn introduce_children_of(&mut self, directory: &Directory) {
125 for (_name, node) in directory.nodes() {
126 if let Node::Directory { digest, size } = node {
127 if self
129 .introduced_directories
130 .insert(digest.to_owned(), *size)
131 .is_none()
132 {
133 self.pending_directories.insert(digest.to_owned());
134 }
135 }
136 }
137 }
138
139 pub fn validate_stream<'s, S>(directories: S) -> BoxStream<'s, Result<Directory, OrderingError>>
143 where
144 S: Stream<Item = Directory> + Send + 's,
145 {
146 let mut directories = directories.boxed();
147
148 Box::pin(try_stream! {
149 if let Some(first_incoming_directory) = directories.next().await {
151 let mut validator = RootToLeavesValidator::new_with_root(&first_incoming_directory);
152
153 while let Some(incoming_directory) = directories.next().await {
154 validator.try_accept(&incoming_directory)?;
155 yield incoming_directory;
156 }
157
158 validator.finalize()?;
159 }
160
161 })
162 }
163}
164
165#[derive(Default)]
166pub struct LeavesToRootValidator {
174 accepted_directories: HashMap<B3Digest, u64>,
176
177 pending_directories: HashSet<B3Digest>,
180
181 #[cfg(debug_assertions)]
183 last_inserted_digest: Option<B3Digest>,
184
185 poison: bool,
188}
189
190impl LeavesToRootValidator {
191 pub fn new() -> Self {
192 Self {
193 accepted_directories: Default::default(),
194 pending_directories: Default::default(),
195 #[cfg(debug_assertions)]
196 last_inserted_digest: None,
197 poison: false,
198 }
199 }
200
201 pub fn try_accept(&mut self, directory: &Directory) -> Result<(), OrderingError> {
203 assert!(!self.poison, "Snix bug: LeavesToRootValidator poisoned");
204
205 for (name, node) in directory.nodes() {
208 if let Node::Directory { digest, size } = node {
209 match self.accepted_directories.get(digest) {
210 Some(s) if s == size => {
211 self.pending_directories.remove(digest);
212 }
213 Some(s) => {
214 self.poison = true;
215 Err(OrderingError::WrongSize {
216 digest: digest.to_owned(),
217 size: *s,
218 })?
219 }
220 None => {
221 self.poison = true;
222 Err(OrderingError::UnknownLTR {
223 digest: digest.to_owned(),
224 parent_digest: directory.digest(),
225 path_component: name.to_owned(),
226 })?
227 }
228 }
229 }
230 }
231
232 let directory_digest = directory.digest();
235 match self.accepted_directories.entry(directory_digest.clone()) {
236 hash_map::Entry::Occupied(_) => {
237 warn!("directory received multiple times");
238 }
239 hash_map::Entry::Vacant(entry) => {
240 entry.insert(directory.size());
241 #[cfg(debug_assertions)]
242 {
243 self.last_inserted_digest = Some(directory_digest.clone())
244 }
245 self.pending_directories.insert(directory_digest);
246 }
247 }
248
249 Ok(())
250 }
251
252 #[allow(unused_mut)]
255 pub fn finalize(mut self) -> Result<(), OrderingError> {
256 assert!(!self.poison, "Snix bug: LeavesToRootValidator poisoned");
257
258 if self.accepted_directories.is_empty() {
259 return Err(OrderingError::NoNodesReceived);
260 }
261
262 if self.pending_directories.len() != 1 {
265 Err(OrderingError::MoreThanOnePending(
266 self.pending_directories.clone(),
267 ))?
268 }
269 #[cfg(debug_assertions)]
270 {
271 let last_inserted_digest = self
272 .last_inserted_digest
273 .clone()
274 .expect("Snix bug: have dangling_directories, but no last_inserted_digest");
275 self.pending_directories
276 .get(&last_inserted_digest)
277 .expect("Snix bug: dangling directory is not last inserted one");
278 self.poison = true;
279 }
280
281 Ok(())
282 }
283
284 pub fn validate_stream<'s, S>(directories: S) -> BoxStream<'s, Result<Directory, OrderingError>>
288 where
289 S: Stream<Item = Directory> + Send + 's,
290 {
291 let mut directories = directories.boxed();
292 let mut validator = Self::new();
293
294 Box::pin(try_stream! {
295 while let Some(directory) = directories.next().await {
296 validator.try_accept(&directory)?;
297 yield directory;
298 }
299
300 validator.finalize()?;
301 })
302 }
303}
304
305#[cfg(test)]
306mod tests {
307 use super::{LeavesToRootValidator, RootToLeavesValidator};
308 use crate::directoryservice::Directory;
309 use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C};
310 use rstest::rstest;
311
312 #[rstest]
313 #[case::empty_directory(&[&*DIRECTORY_A], false, false)]
315 #[case::simple_closure(&[&*DIRECTORY_A, &*DIRECTORY_B], false, false)]
317 #[case::same_child(&[&*DIRECTORY_A, &*DIRECTORY_A, &*DIRECTORY_C], false, false)]
320 #[case::same_child_dedup(&[&*DIRECTORY_A, &*DIRECTORY_C], false, false)]
322 #[case::unconnected_node(&[&*DIRECTORY_A, &*DIRECTORY_C, &*DIRECTORY_B], false, true)]
325 #[case::dangling_pointer(&[&*DIRECTORY_B], true, false)]
327 #[case::empty(&[], false, true)]
329 fn leaves_to_root(
330 #[case] directories_to_upload: &[&Directory],
331 #[case] exp_fail_upload_last: bool,
332 #[case] exp_fail_finalize: bool,
333 ) {
334 let mut validator = LeavesToRootValidator::default();
335 let mut it = directories_to_upload.iter().peekable();
336
337 while let Some(d) = it.next() {
338 if it.peek().is_none() && exp_fail_upload_last {
339 validator
340 .try_accept(d)
341 .expect_err("last try_accept to fail");
342 } else {
343 assert!(validator.try_accept(d).is_ok(), "try_accept to succeed");
344 }
345 }
346
347 if !exp_fail_upload_last {
348 if !exp_fail_finalize {
349 validator.finalize().expect("finalize to succeed");
350 } else {
351 let _ = validator.finalize();
352 }
353 }
354 }
355
356 #[rstest]
357 #[case::empty_directory(&*DIRECTORY_A, &[], false)]
359 #[case::simple_closure(&*DIRECTORY_B, &[&*DIRECTORY_A], false)]
361 #[case::same_child_dedup(&*DIRECTORY_C, &[&*DIRECTORY_A], false)]
363 #[case::unconnected_node(&*DIRECTORY_C, &[&*DIRECTORY_B], true)]
365 fn root_to_leaves(
366 #[case] root: &Directory,
367 #[case] directories_to_upload: &[&Directory],
368 #[case] exp_fail_upload_last: bool,
369 ) {
370 let mut validator = RootToLeavesValidator::new_with_root(root);
371 let mut it = directories_to_upload.iter().peekable();
372
373 while let Some(d) = it.next() {
374 if it.peek().is_none() && exp_fail_upload_last {
375 assert!(
376 !validator.would_accept(&d.digest()),
377 "would_accept not expected to accept last failing element"
378 );
379
380 validator
381 .try_accept(d)
382 .expect_err("last try_accept to fail");
383 } else {
384 assert!(
385 validator.would_accept(&d.digest()),
386 "would_accept expected to accept directory"
387 );
388 assert!(validator.try_accept(d).is_ok(), "try_accept to succeed");
389 }
390 }
391
392 if !exp_fail_upload_last {
393 validator.finalize().expect("finalize to succeed");
394 }
395 }
396}