snix_castore/directoryservice/
combinators.rs

1use std::sync::Arc;
2
3use futures::StreamExt;
4use futures::TryFutureExt;
5use futures::TryStreamExt;
6use futures::stream::BoxStream;
7use tonic::async_trait;
8use tracing::{instrument, trace};
9
10use super::{Directory, DirectoryService, SimplePutter};
11use crate::B3Digest;
12use crate::Error;
13use crate::composition::{CompositionContext, ServiceBuilder};
14use crate::directoryservice::DirectoryPutter;
15use crate::directoryservice::directory_graph::DirectoryGraphBuilder;
16use crate::directoryservice::directory_graph::DirectoryOrder;
17
/// A read-through cache combining two directory services.
///
/// Lookups are answered from `near` whenever it has the requested directory.
/// On a miss, `far` is queried instead, and whatever it returns is *inserted*
/// into `near` so that later lookups can be served locally.
///
/// The cache always fetches the entire directory closure from `far` (even for
/// a single-directory lookup) and inserts all of it into `near`. This is
/// useful when `far` does not support accessing intermediate directories,
/// but `near` does.
///
/// There is no negative cache: a miss in `far` is not remembered.
/// Inserts (and listings) are not implemented for now.
pub struct Cache<DS1, DS2> {
    /// Name of this instance; recorded as a field on the tracing spans of the
    /// service methods.
    instance_name: String,
    /// The service that is asked first, and that gets populated from `far`.
    near: DS1,
    /// The service that is asked when `near` does not have the directory.
    far: DS2,
}
30
31impl<DS1, DS2> Cache<DS1, DS2> {
32    pub fn new(instance_name: String, near: DS1, far: DS2) -> Self {
33        Self {
34            instance_name,
35            near,
36            far,
37        }
38    }
39}
40
41#[async_trait]
42impl<DS1, DS2> DirectoryService for Cache<DS1, DS2>
43where
44    DS1: DirectoryService + Clone + 'static,
45    DS2: DirectoryService + Clone + 'static,
46{
47    #[instrument(skip(self, digest), fields(directory.digest = %digest, instance_name = %self.instance_name))]
48    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
49        // check near
50        if let Some(directory) = self.near.get(digest).await? {
51            trace!("serving from cache");
52            return Ok(Some(directory));
53        }
54
55        trace!("not found in near, asking remote…");
56        // We always ask recursive, and fire off a background task populating the children.
57        let mut directories = self.far.get_recursive(digest);
58        if let Some(first_directory) = directories.try_next().await? {
59            // construct graph in background
60            let mut graph_builder =
61                DirectoryGraphBuilder::new_with_insertion_order(DirectoryOrder::RootToLeaves);
62            graph_builder
63                .insert(first_directory.clone())
64                .expect("Snix bug: inserting first directory for RTL should always work");
65
66            tokio::spawn({
67                let digest = digest.clone();
68                let near = self.near.clone();
69                async move {
70                    let mut near_putter = near.put_multiple_start();
71
72                    // Consume the rest of the elements
73                    while let Some(directory) = directories.try_next().await? {
74                        graph_builder.insert(directory)?;
75                    }
76
77                    let directory_graph = graph_builder.build()?;
78
79                    // Drain into near
80                    for directory in directory_graph.drain(DirectoryOrder::LeavesToRoot) {
81                        near_putter.put(directory).await?;
82                    }
83
84                    let actual_digest = near_putter.close().await?;
85                    debug_assert_eq!(digest, actual_digest);
86
87                    Ok::<_, Error>(())
88                }
89            });
90
91            Ok(Some(first_directory))
92        } else {
93            Ok(None)
94        }
95    }
96
97    #[instrument(skip_all, fields(instance_name = %self.instance_name))]
98    async fn put(&self, _directory: Directory) -> Result<B3Digest, Error> {
99        Err(Error::StorageError("unimplemented".to_string()))
100    }
101
102    #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))]
103    fn get_recursive(
104        &self,
105        root_directory_digest: &B3Digest,
106    ) -> BoxStream<'static, Result<Directory, Error>> {
107        let near = self.near.clone();
108        let far = self.far.clone();
109        let digest = root_directory_digest.clone();
110        async move {
111            let mut stream = near.get_recursive(&digest);
112
113            if let Some(first) = stream.try_next().await? {
114                trace!("serving from cache");
115                return Ok(futures::stream::once(async { Ok(first) })
116                    .chain(stream)
117                    .boxed());
118            }
119
120            trace!("not found in near, asking remote…");
121
122            let mut directories = far.get_recursive(&digest);
123            let mut graph_builder =
124                DirectoryGraphBuilder::new_with_insertion_order(DirectoryOrder::RootToLeaves);
125
126            // Return to the client, while inserting to near.
127            Ok(async_stream::try_stream! {
128                let mut near_putter = near.put_multiple_start();
129
130                while let Some(directory) = directories.try_next().await? {
131                    graph_builder.insert(directory.clone())?;
132
133                    yield directory;
134                }
135
136                let directory_graph = graph_builder.build()?;
137
138                // Drain into near
139                for directory in directory_graph.drain(DirectoryOrder::LeavesToRoot) {
140                    near_putter.put(directory).await?;
141                }
142
143                let actual_digest = near_putter.close().await?;
144                debug_assert_eq!(digest, actual_digest);
145            }
146            .boxed())
147        }
148        .try_flatten_stream()
149        .boxed()
150    }
151
152    #[instrument(skip_all)]
153    fn put_multiple_start(&self) -> Box<dyn DirectoryPutter + '_> {
154        Box::new(SimplePutter::new(self))
155    }
156}
157
158#[derive(serde::Deserialize, Debug)]
159#[serde(deny_unknown_fields)]
160pub struct CacheConfig {
161    near: String,
162    far: String,
163}
164
165impl TryFrom<url::Url> for CacheConfig {
166    type Error = Box<dyn std::error::Error + Send + Sync>;
167    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
168        // cache doesn't support host or path in the URL.
169        if url.has_host() || !url.path().is_empty() {
170            return Err(Error::StorageError("invalid url".to_string()).into());
171        }
172        Ok(serde_qs::from_str(url.query().unwrap_or_default())?)
173    }
174}
175
176#[async_trait]
177impl ServiceBuilder for CacheConfig {
178    type Output = dyn DirectoryService;
179    async fn build<'a>(
180        &'a self,
181        instance_name: &str,
182        context: &CompositionContext,
183    ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
184        let (near, far) = futures::join!(
185            context.resolve::<Self::Output>(self.near.clone()),
186            context.resolve::<Self::Output>(self.far.clone())
187        );
188        Ok(Arc::new(Cache {
189            instance_name: instance_name.to_string(),
190            near: near?,
191            far: far?,
192        }))
193    }
194}