snix_castore/directoryservice/order_validator.rs

1use async_stream::try_stream;
2use futures::StreamExt;
3use futures::{Stream, stream::BoxStream};
4use std::collections::{HashMap, HashSet, hash_map};
5use tracing::warn;
6
7use super::Directory;
8use crate::{B3Digest, Node};
9
/// Emitted when directories are sent in the wrong order, or when the sequence
/// of directories is otherwise inconsistent (size mismatches, missing or
/// unexpected directories).
#[derive(thiserror::Error, Debug, Eq, PartialEq)]
pub enum OrderingError {
    /// A size recorded for `digest` does not match another size observed for
    /// the same digest (e.g. the size stated alongside a pointer vs. the size
    /// of the directory itself).
    #[error("wrong size {size} for digest {digest}")]
    WrongSize { digest: B3Digest, size: u64 },

    /// Leaves-To-Root: the directory `parent_digest` points to `digest` (under
    /// the name `path_component`) before that directory was accepted.
    #[error("unknown digest {digest} referenced for {path_component} in parent {parent_digest}")]
    UnknownLTR {
        digest: B3Digest,
        parent_digest: B3Digest,
        path_component: crate::PathComponent,
    },

    /// Root-To-Leaves: a directory arrived that is neither the expected root
    /// nor referenced by any previously accepted directory.
    #[error("unexpected Directory with digest {0} encountered", directory.digest())]
    Unexpected { directory: Directory },

    /// Digests left over when finalizing.
    /// Root-To-Leaves: directories that were referenced (or the root) but never
    /// received. Leaves-To-Root: accepted directories not referenced by any
    /// other one, reported when there is not exactly one such (root) directory.
    #[error("some directories missing")]
    DirectoriesMissing(HashSet<B3Digest>),

    /// Leaves-To-Root: finalizing without having accepted any directory.
    #[error("no directories received")]
    EmptySet,
}
32
33/// A struct holding state while consuming a sequence of Directories in
34/// Root-To-Leaves order.
35///
36/// It allows querying whether a certain digest could be acceptable
37/// (to be able to skip parsing entirely if present in serialized form)
38///
39/// Validates that newly received directories are already referenced from
40/// the root via existing directories.
41/// It also ensures the actual directory sizes are the same as the sizes
42/// communicated previously alongside the pointers.
43/// Commonly used when _receiving_ a directory closure _from_ a store.
44///
45/// Internally keeps a list of digests introduced (pointers in previously
46/// received directories), to recognize getting sent unrelated directories,
47/// as well as a list of introduced, but not yet received digest (to detect
48/// still-missing directories).
49pub struct RootToLeavesValidator {
50    /// the expected root digest
51    root_digest: B3Digest,
52
53    /// references of (directory digest, size) we have seen so far.
54    referenced_directories: HashMap<B3Digest, u64>,
55
56    /// Directories we still wait to receive, because we heard about them but
57    /// didn't accept them yet.
58    /// These consist of the root digest and any subset of `referenced_directories`.
59    pending_directories: HashSet<B3Digest>,
60
61    /// tracks whether [Self::finalize] has been called,
62    /// or an error has occured while trying to accept.
63    poison: bool,
64}
65
66impl RootToLeavesValidator {
67    /// Initialize with an expected root directory
68    /// That directory should be sent next.
69    pub fn new_with_root_digest(root_digest: B3Digest) -> Self {
70        Self {
71            root_digest,
72            referenced_directories: HashMap::default(),
73            pending_directories: HashSet::from_iter([root_digest]),
74            poison: false,
75        }
76    }
77
78    /// Checks a directory digest on whether it's introduced.
79    /// Particularly useful when receiving directories in canonical protobuf
80    /// encoding, so that directories not connected to the root can be rejected
81    /// without parsing.
82    ///
83    /// After parsing, the directory can be passed to [Self::try_accept]
84    /// to add its children to the list of expected digests.
85    pub fn would_accept(&self, digest: &B3Digest) -> bool {
86        assert!(!self.poison, "Snix bug: RootToLeavesValidator poisoned");
87        digest == &self.root_digest || self.referenced_directories.contains_key(digest)
88    }
89
90    /// Accepts a directory if previously introduced, or returns an error if it's unknown.
91    pub fn try_accept(&mut self, directory: &Directory) -> Result<(), OrderingError> {
92        assert!(!self.poison, "Snix bug: RootToLeavesValidator poisoned");
93
94        let size = directory.size();
95        let digest = directory.digest();
96
97        // Every incoming directory must already have been introduced.
98        match self.referenced_directories.get(&digest) {
99            Some(s) if *s == size => {
100                if !self.pending_directories.remove(&digest) {
101                    warn!("directory received multiple times");
102                };
103
104                // Introduce children
105                self.introduce_children_of(directory);
106                Ok(())
107            }
108            Some(_) => {
109                self.poison = true;
110                Err(OrderingError::WrongSize { digest, size })
111            }
112            // The root may be inserted even if's not in self.referenced_directories.
113            None if digest == self.root_digest => {
114                // Introduce children
115                self.introduce_children_of(directory);
116                self.pending_directories.remove(&self.root_digest);
117                Ok(())
118            }
119            None => {
120                self.poison = true;
121                Err(OrderingError::Unexpected {
122                    directory: directory.clone(),
123                })
124            }
125        }
126    }
127
128    /// Should be called after accepting the last Directory
129    /// Ensures there's no more pending directories.
130    pub fn finalize(mut self) -> Result<(), OrderingError> {
131        // At the end of the stream, pending must be empty.
132        if !self.pending_directories.is_empty() {
133            return Err(OrderingError::DirectoriesMissing(
134                self.pending_directories.clone(),
135            ));
136        }
137
138        self.poison = true;
139
140        Ok(())
141    }
142
143    // Adds each child node to introduced_directories and pending_directories.
144    fn introduce_children_of(&mut self, directory: &Directory) {
145        for (_name, node) in directory.nodes() {
146            if let Node::Directory { digest, size } = node {
147                // if there's a pointer to a new directory
148                if self
149                    .referenced_directories
150                    .insert(digest.to_owned(), *size)
151                    .is_none()
152                {
153                    self.pending_directories.insert(digest.to_owned());
154                }
155            }
156        }
157    }
158
159    /// This receives a stream of Directories, validating them to be in Root-To-Leaves order.
160    /// The expected root digest needs to be passed in.
161    /// If the order is correct, they are yielded wrapped in an Ok().
162    /// If not, we yield an error.
163    pub fn validate_stream<'s, S>(
164        root_digest: B3Digest,
165        directories: S,
166    ) -> BoxStream<'s, Result<Directory, OrderingError>>
167    where
168        S: Stream<Item = Directory> + Send + 's,
169    {
170        let mut validator = RootToLeavesValidator::new_with_root_digest(root_digest);
171        let mut directories = directories.boxed();
172
173        Box::pin(try_stream! {
174            while let Some(directory) = directories.next().await {
175                        validator.try_accept(&directory)?;
176                        yield directory;
177            }
178            validator.finalize()?;
179        })
180    }
181}
182
183#[derive(Default)]
184/// A struct holding state while consuming a sequence of Directories in
185/// Leaves-To-Root order.
186///
187/// Validates that newly accepted directories only reference directories which
188/// have already been accepted before, and that the sizes attached alongside the
189/// pointers match the actual sizes.
190/// Commonly used when _uploading_ a directory closure _to_ a store.
191pub struct LeavesToRootValidator {
192    /// tracks inserted directories, and their sizes observed.
193    accepted_directories: HashMap<B3Digest, u64>,
194
195    /// tracks seen directories which are not yet referenced by parents.
196    /// (root candidates)
197    pending_directories: HashSet<B3Digest>,
198
199    /// Tracks the last received digest
200    #[cfg(debug_assertions)]
201    last_inserted_digest: Option<B3Digest>,
202
203    /// tracks whether [Self::finalize] has been called,
204    /// or an error has occured while trying to accept.
205    poison: bool,
206}
207
208impl LeavesToRootValidator {
209    pub fn new() -> Self {
210        Self {
211            accepted_directories: Default::default(),
212            pending_directories: Default::default(),
213            #[cfg(debug_assertions)]
214            last_inserted_digest: None,
215            poison: false,
216        }
217    }
218
219    /// Accepts a directory if previously introduced, or returns an error if it's unknown.
220    pub fn try_accept(&mut self, directory: &Directory) -> Result<(), OrderingError> {
221        assert!(!self.poison, "Snix bug: LeavesToRootValidator poisoned");
222
223        // every directory referenced must already have been seen.
224        // Remove them from pending if still in there.
225        for (name, node) in directory.nodes() {
226            if let Node::Directory { digest, size } = node {
227                match self.accepted_directories.get(digest) {
228                    Some(s) if s == size => {
229                        self.pending_directories.remove(digest);
230                    }
231                    Some(s) => {
232                        self.poison = true;
233                        Err(OrderingError::WrongSize {
234                            digest: digest.to_owned(),
235                            size: *s,
236                        })?
237                    }
238                    None => {
239                        self.poison = true;
240                        Err(OrderingError::UnknownLTR {
241                            digest: digest.to_owned(),
242                            parent_digest: directory.digest(),
243                            path_component: name.to_owned(),
244                        })?
245                    }
246                }
247            }
248        }
249
250        // All elements were checked to only refer to directories previously seen,
251        // we can accept the directory, and add it to pending.
252        let directory_digest = directory.digest();
253        match self.accepted_directories.entry(directory_digest) {
254            hash_map::Entry::Occupied(_) => {
255                warn!("directory received multiple times");
256            }
257            hash_map::Entry::Vacant(entry) => {
258                entry.insert(directory.size());
259                #[cfg(debug_assertions)]
260                {
261                    self.last_inserted_digest = Some(directory_digest)
262                }
263                self.pending_directories.insert(directory_digest);
264            }
265        }
266
267        Ok(())
268    }
269
270    /// Should be called before Drop, to ensure there's no introduced but unsent
271    /// directories.
272    #[allow(unused_mut)]
273    pub fn finalize(mut self) -> Result<(), OrderingError> {
274        assert!(!self.poison, "Snix bug: LeavesToRootValidator poisoned");
275
276        if self.accepted_directories.is_empty() {
277            return Err(OrderingError::EmptySet);
278        }
279
280        // At the end, there may only be one unreferenced directory
281        // (which is the root)
282        if self.pending_directories.len() != 1 {
283            Err(OrderingError::DirectoriesMissing(
284                self.pending_directories.clone(),
285            ))?
286        }
287        #[cfg(debug_assertions)]
288        {
289            let last_inserted_digest = self
290                .last_inserted_digest
291                .expect("Snix bug: have dangling_directories, but no last_inserted_digest");
292            self.pending_directories
293                .get(&last_inserted_digest)
294                .expect("Snix bug: dangling directory is not last inserted one");
295            self.poison = true;
296        }
297
298        Ok(())
299    }
300
301    /// This receives a stream of Directories, validating them to be in Leaves-To-Root order.
302    /// If the order is correct, they are yielded wrapped in an Ok().
303    /// If not, we yield an error.
304    pub fn validate_stream<'s, S>(directories: S) -> BoxStream<'s, Result<Directory, OrderingError>>
305    where
306        S: Stream<Item = Directory> + Send + 's,
307    {
308        let mut directories = directories.boxed();
309        let mut validator = Self::new();
310
311        Box::pin(try_stream! {
312            while let Some(directory) = directories.next().await {
313                validator.try_accept(&directory)?;
314                yield directory;
315            }
316
317            validator.finalize()?;
318        })
319    }
320}
321
#[cfg(test)]
mod tests {
    use super::{LeavesToRootValidator, RootToLeavesValidator};
    use crate::directoryservice::Directory;
    use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C, DIRECTORY_D, DIRECTORY_E};
    use futures::TryStreamExt;
    use rstest::rstest;

    #[rstest]
    /// Uploading an empty directory should succeed.
    #[case::empty_directory(&[&*DIRECTORY_A], false, false)]
    /// Uploading A, then B (referring to A) should succeed.
    #[case::simple_closure(&[&*DIRECTORY_A, &*DIRECTORY_B], false, false)]
    /// Uploading A, then A, then C (referring to A twice) should succeed.
    /// We pretend to be a dumb client not deduping directories.
    #[case::same_child(&[&*DIRECTORY_A, &*DIRECTORY_A, &*DIRECTORY_C], false, false)]
    /// Uploading A, then C (referring to A twice) should succeed.
    #[case::same_child_dedup(&[&*DIRECTORY_A, &*DIRECTORY_C], false, false)]
    /// Uploading A, then C (referring to A twice), then B (itself referring to A) should fail during close,
    /// as B itself would be left unconnected.
    #[case::unconnected_node(&[&*DIRECTORY_A, &*DIRECTORY_C, &*DIRECTORY_B], false, true)]
    /// Uploading B (referring to A) should fail immediately, because A was never uploaded.
    #[case::dangling_pointer(&[&*DIRECTORY_B], true, false)]
    /// An empty set is disallowed.
    #[case::empty(&[], false, true)]
    fn leaves_to_root(
        #[case] directories_to_upload: &[&Directory],
        #[case] exp_fail_upload_last: bool,
        #[case] exp_fail_finalize: bool,
    ) {
        let mut validator = LeavesToRootValidator::default();
        let mut it = directories_to_upload.iter().peekable();

        while let Some(d) = it.next() {
            if it.peek().is_none() /* is last */ && exp_fail_upload_last {
                validator
                    .try_accept(d)
                    .expect_err("last try_accept to fail");
            } else {
                validator.try_accept(d).expect("try_accept to succeed");
            }
        }

        // A failed try_accept poisons the validator, so only finalize otherwise.
        if !exp_fail_upload_last {
            if exp_fail_finalize {
                validator.finalize().expect_err("finalize to fail");
            } else {
                validator.finalize().expect("finalize to succeed");
            }
        }
    }

    #[rstest]
    /// Downloading an empty directory should succeed.
    #[case::empty_directory(&[&*DIRECTORY_A], false)]
    /// Downloading B, then A (referenced by B) should succeed.
    #[case::simple_closure(&[&*DIRECTORY_B, &*DIRECTORY_A], false)]
    /// Downloading C (referring to A twice), then A should succeed.
    #[case::same_child_dedup(&[&*DIRECTORY_C, &*DIRECTORY_A], false)]
    /// Downloading C, then A twice should succeed.
    #[case::same_child_redundant(&[&*DIRECTORY_C, &*DIRECTORY_A, &*DIRECTORY_A], false)]
    /// Downloading C, then A should succeed, even if we receive C twice
    #[case::with_root_sent_twice(&[&*DIRECTORY_C, &*DIRECTORY_C, &*DIRECTORY_A], false)]
    /// Downloading E -> D -> A,B should succeed.
    #[case::more_levels(&[&*DIRECTORY_E, &*DIRECTORY_D, &*DIRECTORY_A, &*DIRECTORY_B], false)]
    /// Downloading C, then B (both referring to A but not referring to each other) should fail immediately as B has no connection to C (the root)
    #[case::unconnected_node(&[&*DIRECTORY_C, &*DIRECTORY_B], true)]
    fn root_to_leaves(
        #[case] directories_to_upload: &[&Directory],
        #[case] exp_fail_upload_last: bool,
    ) {
        let root_digest = directories_to_upload[0].digest();
        let mut validator = RootToLeavesValidator::new_with_root_digest(root_digest);
        let mut it = directories_to_upload.iter().peekable();

        while let Some(d) = it.next() {
            if it.peek().is_none() /* is last */ && exp_fail_upload_last {
                assert!(
                    !validator.would_accept(&d.digest()),
                    "would_accept not expected to accept last failing element"
                );

                validator
                    .try_accept(d)
                    .expect_err("last try_accept to fail");
            } else {
                assert!(
                    validator.would_accept(&d.digest()),
                    "would_accept expected to accept directory"
                );
                validator.try_accept(d).expect("try_accept to succeed");
            }
        }

        if !exp_fail_upload_last {
            validator.finalize().expect("finalize to succeed");
        }
    }

    #[test]
    /// This initializes a validator with another root than what we try to upload.
    fn root_to_leaves_root_mismatch() {
        let mut validator = RootToLeavesValidator::new_with_root_digest(DIRECTORY_A.digest());

        validator
            .try_accept(&DIRECTORY_B)
            .expect_err("shouldn't accept wrong first directory");
        validator.finalize().expect_err("expect finalize to fail");
    }

    #[tokio::test]
    async fn root_to_leaves_stream() {
        let directories_to_upload = vec![
            DIRECTORY_E.to_owned(),
            DIRECTORY_D.to_owned(),
            DIRECTORY_A.to_owned(),
            DIRECTORY_B.to_owned(),
        ];
        let root_digest = directories_to_upload[0].digest();

        let validated_stream = RootToLeavesValidator::validate_stream(
            root_digest,
            futures::stream::iter(directories_to_upload.clone()),
        );

        let validated_directories: Vec<Directory> = validated_stream
            .try_collect()
            .await
            .expect("stream to collect successfully");

        assert_eq!(directories_to_upload, validated_directories);

        RootToLeavesValidator::validate_stream(root_digest, futures::stream::empty())
            .try_collect::<Vec<_>>()
            .await
            .expect_err("an empty stream to fail");
    }
}
459}