snix_castore/directoryservice/
utils.rs1use super::Directory;
2use super::DirectoryService;
3use crate::B3Digest;
4use crate::Error;
5use crate::Node;
6use async_stream::try_stream;
7use futures::stream::BoxStream;
8use std::collections::{HashSet, VecDeque};
9use tracing::instrument;
10use tracing::warn;
11
12#[instrument(skip(directory_service))]
16pub fn traverse_directory<'a, DS: DirectoryService + 'static>(
17 directory_service: DS,
18 root_directory_digest: &B3Digest,
19) -> BoxStream<'a, Result<Directory, Error>> {
20 let mut worklist_directory_digests: VecDeque<B3Digest> =
24 VecDeque::from([root_directory_digest.clone()]);
25 let mut sent_directory_digests: HashSet<B3Digest> = HashSet::new();
28
29 let root_directory_digest = root_directory_digest.clone();
30
31 Box::pin(try_stream! {
32 while let Some(current_directory_digest) = worklist_directory_digests.pop_front() {
33 let current_directory = match directory_service.get(¤t_directory_digest).await.map_err(|e| {
34 warn!("failed to look up directory");
35 Error::StorageError(format!(
36 "unable to look up directory {}: {}",
37 current_directory_digest, e
38 ))
39 })? {
40 None if current_directory_digest == root_directory_digest => break,
42 None => {
44 warn!("directory {} does not exist", current_directory_digest);
45 Err(Error::StorageError(format!(
46 "directory {} does not exist",
47 current_directory_digest
48 )))?;
49 break;
50 }
51 Some(dir) => dir,
52 };
53
54 sent_directory_digests.insert(current_directory_digest);
57
58 for (_, child_directory_node) in current_directory.nodes() {
62 if let Node::Directory{digest: child_digest, ..} = child_directory_node {
63 if worklist_directory_digests.contains(child_digest)
64 || sent_directory_digests.contains(child_digest)
65 {
66 continue;
67 }
68 worklist_directory_digests.push_back(child_digest.clone());
69 }
70 }
71
72 yield current_directory;
73 }
74 })
75}