snix_castore/directoryservice/traversal/
bfs.rs1use super::Error;
2use crate::{B3Digest, Directory, Node};
3use futures::StreamExt;
4use std::collections::{HashSet, VecDeque};
5use tracing::instrument;
6use tracing::warn;
7
8#[instrument(skip(get_directory))]
12pub fn root_to_leaves<F, Fut>(
13 root_directory_digest: B3Digest,
14 get_directory: F,
15) -> impl futures::Stream<Item = Result<Directory, Error>> + use<F, Fut>
16where
17 F: Fn(B3Digest) -> Fut + Sync + Send + 'static,
18 Fut: Future<Output = Result<Option<Directory>, crate::directoryservice::Error>> + Send,
19{
20 let mut worklist_directory_digests = VecDeque::from([root_directory_digest]);
24 let mut sent_directory_digests: HashSet<B3Digest> = HashSet::new();
27
28 async_stream::try_stream! {
29 while let Some(current_directory_digest) = worklist_directory_digests.pop_front() {
30 let current_directory = match get_directory(current_directory_digest).await.map_err(|e| {
31 Error::GetFailure(current_directory_digest, e)
32 })? {
33 None if current_directory_digest == root_directory_digest => break,
35 None => {
37 Err(Error::NotFound(current_directory_digest))?;
38 break;
39 }
40 Some(dir) => dir,
41 };
42
43 sent_directory_digests.insert(current_directory_digest);
46
47 for (_, child_directory_node) in current_directory.nodes() {
51 if let Node::Directory{digest: child_digest, ..} = child_directory_node {
52 if worklist_directory_digests.contains(child_digest)
53 || sent_directory_digests.contains(child_digest)
54 {
55 continue;
56 }
57 worklist_directory_digests.push_back(*child_digest);
58 }
59 }
60
61 yield current_directory;
62 }
63 }.boxed()
64}