snix_castore/directoryservice/
order_validator.rs

1use async_stream::try_stream;
2use futures::StreamExt;
3use futures::{Stream, stream::BoxStream};
4use std::collections::{HashMap, HashSet, hash_map};
5use tracing::warn;
6
7use super::Directory;
8use crate::{B3Digest, Node};
9
/// Emitted when directories are sent in the wrong order, or when the sequence
/// is otherwise inconsistent (size mismatches, unrelated or missing
/// directories).
///
/// Returned by the `try_accept` and `finalize` methods of
/// [RootToLeavesValidator] and [LeavesToRootValidator].
#[derive(thiserror::Error, Debug, Eq, PartialEq)]
pub enum OrderingError {
    /// The size attached to a pointer to `digest` and the size of the
    /// directory with that digest disagree.
    ///
    /// [RootToLeavesValidator::try_accept] reports the size of the directory
    /// it just received; [LeavesToRootValidator::try_accept] reports the size
    /// it recorded when the referenced directory was accepted.
    #[error("wrong size {size} for digest {digest}")]
    WrongSize { digest: B3Digest, size: u64 },

    /// In Leaves-To-Root order, the directory `parent_digest` references a
    /// child `digest` (under the name `path_component`) that has not been
    /// accepted before.
    #[error("unknown digest {digest} referenced for {path_component} in parent {parent_digest}")]
    UnknownLTR {
        digest: B3Digest,
        parent_digest: B3Digest,
        path_component: crate::PathComponent,
    },

    /// In Root-To-Leaves order, a directory was received whose digest has not
    /// been introduced by any previously seen directory.
    #[error("unexpected Directory with digest {0} encountered", directory.digest())]
    Unexpected { directory: Directory },

    /// Returned from `finalize`, carrying the digests that are still pending.
    ///
    /// [RootToLeavesValidator::finalize] returns it when any introduced
    /// directory was never received; [LeavesToRootValidator::finalize] returns
    /// it when the number of unreferenced directories is not exactly one.
    #[error("found more than one pending directory")]
    MoreThanOnePending(HashSet<B3Digest>),

    /// Returned from [LeavesToRootValidator::finalize] if no directory has
    /// been accepted at all.
    #[error("No directories received")]
    NoNodesReceived,
}
32
/// A struct holding state while consuming a sequence of Directories in
/// Root-To-Leaves order.
///
/// It allows querying whether a certain digest could be acceptable
/// (to be able to skip parsing entirely if present in serialized form).
///
/// Validates that newly received directories are already referenced from
/// the root via existing directories.
/// It also ensures the actual directory sizes are the same as the sizes
/// communicated previously alongside the pointers.
/// Commonly used when _receiving_ a directory closure _from_ a store.
///
/// Internally keeps a map of the digests introduced so far (pointers in
/// previously received directories), to recognize getting sent unrelated
/// directories, as well as a set of introduced, but not yet received digests
/// (to detect still-missing directories).
pub struct RootToLeavesValidator {
    /// Directory digests introduced so far, mapped to the size each of them
    /// is expected to have.
    introduced_directories: HashMap<B3Digest, u64>,

    /// The subset of `introduced_directories` that we are still waiting to
    /// receive.
    pending_directories: HashSet<B3Digest>,

    /// Poison flag: [Self::would_accept] and [Self::try_accept] panic while it
    /// is set.
    /// Intended to mark a validator on which [Self::finalize] has been called,
    /// or on which an error has occurred while trying to accept.
    poison: bool,
}
60
61impl RootToLeavesValidator {
62    /// Initialize with the passed root directory
63    /// That directory is implicitly accepted and should not be sent again
64    pub fn new_with_root(directory: &Directory) -> Self {
65        let mut this = Self {
66            introduced_directories: Default::default(),
67            pending_directories: Default::default(),
68            poison: false,
69        };
70        this.introduce_children_of(directory);
71        this
72    }
73
74    /// Checks a directory digest on whether it's introduced.
75    /// Particularly useful when receiving directories in canonical protobuf
76    /// encoding, so that directories not connected to the root can be rejected
77    /// without parsing.
78    ///
79    /// After parsing, the directory can be passed to [Self::try_accept]
80    /// to add its children to the list of expected digests.
81    pub fn would_accept(&self, digest: &B3Digest) -> bool {
82        assert!(!self.poison, "Snix bug: RootToLeavesValidator poisoned");
83        self.introduced_directories.contains_key(digest)
84    }
85
86    /// Accepts a directory if previously introduced, or returns an error if it's unknown.
87    pub fn try_accept(&mut self, directory: &Directory) -> Result<(), OrderingError> {
88        assert!(!self.poison, "Snix bug: RootToLeavesValidator poisoned");
89
90        // every incoming directory must already have been introduced.
91        let size = directory.size();
92        let digest = directory.digest();
93
94        match self.introduced_directories.get(&digest) {
95            Some(s) if *s == size => {
96                if !self.pending_directories.remove(&digest) {
97                    warn!("directory received multiple times");
98                };
99                Ok(())
100            }
101            Some(_) => Err(OrderingError::WrongSize { digest, size }),
102            None => Err(OrderingError::Unexpected {
103                directory: directory.clone(),
104            }),
105        }
106    }
107
108    /// Should be called after accepting the last Directory
109    /// Ensures there's no more pending directories.
110    pub fn finalize(mut self) -> Result<(), OrderingError> {
111        // At the end of the stream, pending must be empty.
112        if !self.pending_directories.is_empty() {
113            return Err(OrderingError::MoreThanOnePending(
114                self.pending_directories.clone(),
115            ));
116        }
117
118        self.poison = true;
119
120        Ok(())
121    }
122
123    // Adds each child node to introduced_directories and pending_directories.
124    fn introduce_children_of(&mut self, directory: &Directory) {
125        for (_name, node) in directory.nodes() {
126            if let Node::Directory { digest, size } = node {
127                // if there's a pointer to a new directory
128                if self
129                    .introduced_directories
130                    .insert(digest.to_owned(), *size)
131                    .is_none()
132                {
133                    self.pending_directories.insert(digest.to_owned());
134                }
135            }
136        }
137    }
138
139    /// This receives a stream of Directories, validating them to be in Root-To-Leaves order.
140    /// If the order is correct, they are yielded wrapped in an Ok().
141    /// If not, we yield an error.
142    pub fn validate_stream<'s, S>(directories: S) -> BoxStream<'s, Result<Directory, OrderingError>>
143    where
144        S: Stream<Item = Directory> + Send + 's,
145    {
146        let mut directories = directories.boxed();
147
148        Box::pin(try_stream! {
149            // in the else case (empty stream), we emit an empty stream.
150            if let Some(first_incoming_directory) = directories.next().await {
151                let mut validator = RootToLeavesValidator::new_with_root(&first_incoming_directory);
152
153                while let Some(incoming_directory) = directories.next().await {
154                    validator.try_accept(&incoming_directory)?;
155                    yield incoming_directory;
156                }
157
158                validator.finalize()?;
159            }
160
161        })
162    }
163}
164
#[derive(Default)]
/// A struct holding state while consuming a sequence of Directories in
/// Leaves-To-Root order.
///
/// Validates that newly accepted directories only reference directories which
/// have already been accepted before, and that the sizes attached alongside the
/// pointers match the actual sizes.
/// Commonly used when _uploading_ a directory closure _to_ a store.
pub struct LeavesToRootValidator {
    /// Tracks accepted directories, and the sizes observed for them.
    accepted_directories: HashMap<B3Digest, u64>,

    /// Tracks accepted directories which are not yet referenced by any parent
    /// (root candidates).
    pending_directories: HashSet<B3Digest>,

    /// Tracks the digest of the last newly accepted directory.
    /// Only present in builds with debug assertions, where [Self::finalize]
    /// uses it for a sanity check.
    #[cfg(debug_assertions)]
    last_inserted_digest: Option<B3Digest>,

    /// Tracks whether [Self::finalize] has been called,
    /// or an error has occurred while trying to accept.
    /// [Self::try_accept] and [Self::finalize] panic while it is set.
    poison: bool,
}
189
190impl LeavesToRootValidator {
191    pub fn new() -> Self {
192        Self {
193            accepted_directories: Default::default(),
194            pending_directories: Default::default(),
195            #[cfg(debug_assertions)]
196            last_inserted_digest: None,
197            poison: false,
198        }
199    }
200
201    /// Accepts a directory if previously introduced, or returns an error if it's unknown.
202    pub fn try_accept(&mut self, directory: &Directory) -> Result<(), OrderingError> {
203        assert!(!self.poison, "Snix bug: LeavesToRootValidator poisoned");
204
205        // every directory referenced must already have been seen.
206        // Remove them from pending if still in there.
207        for (name, node) in directory.nodes() {
208            if let Node::Directory { digest, size } = node {
209                match self.accepted_directories.get(digest) {
210                    Some(s) if s == size => {
211                        self.pending_directories.remove(digest);
212                    }
213                    Some(s) => {
214                        self.poison = true;
215                        Err(OrderingError::WrongSize {
216                            digest: digest.to_owned(),
217                            size: *s,
218                        })?
219                    }
220                    None => {
221                        self.poison = true;
222                        Err(OrderingError::UnknownLTR {
223                            digest: digest.to_owned(),
224                            parent_digest: directory.digest(),
225                            path_component: name.to_owned(),
226                        })?
227                    }
228                }
229            }
230        }
231
232        // All elements were checked to only refer to directories previously seen,
233        // we can accept the directory, and add it to pending.
234        let directory_digest = directory.digest();
235        match self.accepted_directories.entry(directory_digest.clone()) {
236            hash_map::Entry::Occupied(_) => {
237                warn!("directory received multiple times");
238            }
239            hash_map::Entry::Vacant(entry) => {
240                entry.insert(directory.size());
241                #[cfg(debug_assertions)]
242                {
243                    self.last_inserted_digest = Some(directory_digest.clone())
244                }
245                self.pending_directories.insert(directory_digest);
246            }
247        }
248
249        Ok(())
250    }
251
252    /// Should be called before Drop, to ensure there's no introduced but unsent
253    /// directories.
254    #[allow(unused_mut)]
255    pub fn finalize(mut self) -> Result<(), OrderingError> {
256        assert!(!self.poison, "Snix bug: LeavesToRootValidator poisoned");
257
258        if self.accepted_directories.is_empty() {
259            return Err(OrderingError::NoNodesReceived);
260        }
261
262        // At the end, there may only be one unreferenced directory
263        // (which is the root)
264        if self.pending_directories.len() != 1 {
265            Err(OrderingError::MoreThanOnePending(
266                self.pending_directories.clone(),
267            ))?
268        }
269        #[cfg(debug_assertions)]
270        {
271            let last_inserted_digest = self
272                .last_inserted_digest
273                .clone()
274                .expect("Snix bug: have dangling_directories, but no last_inserted_digest");
275            self.pending_directories
276                .get(&last_inserted_digest)
277                .expect("Snix bug: dangling directory is not last inserted one");
278            self.poison = true;
279        }
280
281        Ok(())
282    }
283
284    /// This receives a stream of Directories, validating them to be in Leaves-To-Root order.
285    /// If the order is correct, they are yielded wrapped in an Ok().
286    /// If not, we yield an error.
287    pub fn validate_stream<'s, S>(directories: S) -> BoxStream<'s, Result<Directory, OrderingError>>
288    where
289        S: Stream<Item = Directory> + Send + 's,
290    {
291        let mut directories = directories.boxed();
292        let mut validator = Self::new();
293
294        Box::pin(try_stream! {
295            while let Some(directory) = directories.next().await {
296                validator.try_accept(&directory)?;
297                yield directory;
298            }
299
300            validator.finalize()?;
301        })
302    }
303}
304
#[cfg(test)]
mod tests {
    use super::{LeavesToRootValidator, RootToLeavesValidator};
    use crate::directoryservice::Directory;
    use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C};
    use rstest::rstest;

    /// Feeds `directories_to_upload` into a [LeavesToRootValidator], in order.
    ///
    /// If `exp_fail_upload_last` is set, accepting the last directory has to
    /// fail (and finalize is not called). Otherwise all directories have to be
    /// accepted, and finalize has to fail or succeed according to
    /// `exp_fail_finalize`.
    #[rstest]
    /// Uploading an empty directory should succeed.
    #[case::empty_directory(&[&*DIRECTORY_A], false, false)]
    /// Uploading A, then B (referring to A) should succeed.
    #[case::simple_closure(&[&*DIRECTORY_A, &*DIRECTORY_B], false, false)]
    /// Uploading A, then A, then C (referring to A twice) should succeed.
    /// We pretend to be a dumb client not deduping directories.
    #[case::same_child(&[&*DIRECTORY_A, &*DIRECTORY_A, &*DIRECTORY_C], false, false)]
    /// Uploading A, then C (referring to A twice) should succeed.
    #[case::same_child_dedup(&[&*DIRECTORY_A, &*DIRECTORY_C], false, false)]
    /// Uploading A, then C (referring to A twice), then B (itself referring to A) should fail during close,
    /// as B itself would be left unconnected.
    #[case::unconnected_node(&[&*DIRECTORY_A, &*DIRECTORY_C, &*DIRECTORY_B], false, true)]
    /// Uploading B (referring to A) should fail immediately, because A was never uploaded.
    #[case::dangling_pointer(&[&*DIRECTORY_B], true, false)]
    /// An empty set is disallowed.
    #[case::empty(&[], false, true)]
    fn leaves_to_root(
        #[case] directories_to_upload: &[&Directory],
        #[case] exp_fail_upload_last: bool,
        #[case] exp_fail_finalize: bool,
    ) {
        let mut validator = LeavesToRootValidator::default();
        let mut it = directories_to_upload.iter().peekable();

        while let Some(d) = it.next() {
            if it.peek().is_none() /* is last */ && exp_fail_upload_last {
                validator
                    .try_accept(d)
                    .expect_err("last try_accept to fail");
            } else {
                assert!(validator.try_accept(d).is_ok(), "try_accept to succeed");
            }
        }

        // A failed try_accept poisons the validator, so finalize may only be
        // called if all directories were accepted.
        if !exp_fail_upload_last {
            if !exp_fail_finalize {
                validator.finalize().expect("finalize to succeed");
            } else {
                // Actually assert the failure, rather than discarding the result.
                validator.finalize().expect_err("finalize to fail");
            }
        }
    }

    /// Seeds a [RootToLeavesValidator] with `root`, then feeds
    /// `directories_to_upload` into it, in order.
    ///
    /// If `exp_fail_upload_last` is set, the last directory has to be rejected
    /// by both would_accept and try_accept. Otherwise all directories have to
    /// be accepted, and finalize has to succeed.
    #[rstest]
    /// Downloading an empty directory should succeed.
    #[case::empty_directory(&*DIRECTORY_A, &[], false)]
    /// Downloading B, then A (referenced by B) should succeed.
    #[case::simple_closure(&*DIRECTORY_B, &[&*DIRECTORY_A], false)]
    /// Downloading C (referring to A twice), then A should succeed.
    #[case::same_child_dedup(&*DIRECTORY_C, &[&*DIRECTORY_A], false)]
    /// Downloading C, then B (both referring to A but not referring to each other) should fail immediately as B has no connection to C (the root)
    #[case::unconnected_node(&*DIRECTORY_C, &[&*DIRECTORY_B], true)]
    fn root_to_leaves(
        #[case] root: &Directory,
        #[case] directories_to_upload: &[&Directory],
        #[case] exp_fail_upload_last: bool,
    ) {
        let mut validator = RootToLeavesValidator::new_with_root(root);
        let mut it = directories_to_upload.iter().peekable();

        while let Some(d) = it.next() {
            if it.peek().is_none() /* is last */ && exp_fail_upload_last {
                assert!(
                    !validator.would_accept(&d.digest()),
                    "would_accept not expected to accept last failing element"
                );

                validator
                    .try_accept(d)
                    .expect_err("last try_accept to fail");
            } else {
                assert!(
                    validator.would_accept(&d.digest()),
                    "would_accept expected to accept directory"
                );
                assert!(validator.try_accept(d).is_ok(), "try_accept to succeed");
            }
        }

        if !exp_fail_upload_last {
            validator.finalize().expect("finalize to succeed");
        }
    }
}