// snix_castore/directoryservice/combinators.rs

1use std::sync::Arc;
2
3use futures::StreamExt;
4use futures::TryStreamExt;
5use futures::stream::BoxStream;
6use tonic::async_trait;
7use tracing::{instrument, trace};
8
9use super::{Directory, DirectoryService, SimplePutter};
10use crate::B3Digest;
11use crate::composition::{CompositionContext, ServiceBuilder};
12use crate::directoryservice::DirectoryPutter;
13use crate::directoryservice::directory_graph::DirectoryGraphBuilder;
14
/// A directory service combinator that puts a `near` service in front of a `far` one.
///
/// Lookups go to `near` first. When `near` doesn't have the requested directory,
/// `far` is asked, and whatever it returns is also written into `near`.
/// The full directory closure is always fetched from `far` and stored in `near`.
/// This makes the combinator useful when `far` cannot serve intermediate
/// (non-root) directories but `near` can.
/// Misses are not remembered: there is no negative cache.
/// Inserts and listings are not implemented for now.
pub struct Cache<DS1, DS2> {
    instance_name: String,
    near: DS1,
    far: DS2,
}

impl<DS1, DS2> Cache<DS1, DS2> {
    /// Builds a cache called `instance_name` that consults `near` before `far`.
    pub fn new(instance_name: String, near: DS1, far: DS2) -> Self {
        Cache {
            far,
            near,
            instance_name,
        }
    }
}
37
38#[async_trait]
39impl<DS1, DS2> DirectoryService for Cache<DS1, DS2>
40where
41    DS1: DirectoryService + Clone + 'static,
42    DS2: DirectoryService + Clone + 'static,
43{
44    #[instrument(skip(self, digest), fields(directory.digest = %digest, instance_name = %self.instance_name))]
45    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, super::Error> {
46        // check near
47        if let Some(directory) = self.near.get(digest).await.map_err(Error::NearGet)? {
48            trace!("serving from cache");
49            return Ok(Some(directory));
50        }
51
52        trace!("not found in near, asking remote…");
53        // We always ask recursive, and populate the children to support far not allowing non-root access
54        // We currently wait for all children to be received before returning
55        // the requested directory, so subsequent children requests don't fail when these
56        // stores are used.
57        // FUTUREWORK: make this configurable, allow firing off a background task populating the children.
58        let mut directories = self.far.get_recursive(digest);
59        let mut graph_builder = DirectoryGraphBuilder::new_root_to_leaves(*digest);
60
61        let mut resp_directory = None;
62        while let Some(directory) = directories.try_next().await.map_err(Error::FarGet)? {
63            graph_builder.try_insert(directory.clone())?;
64            if resp_directory.is_none() {
65                resp_directory = Some(directory);
66            }
67        }
68
69        // If far had the directory, put into near.
70        if let Some(resp_directory) = resp_directory {
71            let directory_graph = graph_builder.build()?;
72            // Drain into near
73            let mut near_putter = self.near.put_multiple_start();
74            for directory in directory_graph.drain_leaves_to_root() {
75                near_putter.put(directory).await.map_err(Error::NearPut)?;
76            }
77
78            let actual_digest = near_putter.close().await.map_err(Error::NearPut)?;
79            debug_assert_eq!(digest, &actual_digest);
80            Ok(Some(resp_directory))
81        } else {
82            Ok(None)
83        }
84    }
85
86    #[instrument(skip_all, fields(instance_name = %self.instance_name))]
87    async fn put(&self, _directory: Directory) -> Result<B3Digest, super::Error> {
88        Err(Error::Unimplemented.into())
89    }
90
91    #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))]
92    fn get_recursive(
93        &self,
94        root_directory_digest: &B3Digest,
95    ) -> BoxStream<'_, Result<Directory, super::Error>> {
96        let near = &self.near;
97        let far = &self.far;
98        let digest = *root_directory_digest;
99
100        async_stream::try_stream! {
101            let mut directories = near.get_recursive(&digest);
102
103            if let Some(first) = directories.try_next().await.map_err(Error::NearGet)? {
104                trace!("serving from cache");
105                yield first;
106
107                while let Some(dir) = directories.try_next().await.map_err(Error::NearGet)? {
108                    yield dir;
109                }
110                return;
111            }
112
113            trace!("not found in near, asking remote…");
114
115            let mut directories = far.get_recursive(&digest);
116            let mut builder = DirectoryGraphBuilder::new_root_to_leaves(digest);
117
118            // Return to the client, while inserting to the graph builder.
119            while let Some(directory) = directories.try_next().await.map_err(Error::FarGet)? {
120                builder.try_insert(directory.clone())?;
121                yield directory;
122            }
123
124            match builder.build() {
125                Ok(directory_graph) => {
126                    // Drain into near
127                    let mut near_putter = near.put_multiple_start();
128                    for directory in directory_graph.drain_leaves_to_root() {
129                        near_putter.put(directory).await.map_err(Error::NearPut)?;
130                    }
131                    let actual_digest = near_putter.close().await.map_err(Error::NearPut)?;
132                    debug_assert_eq!(digest, actual_digest);
133                }
134                Err(crate::directoryservice::OrderingError::EmptySet) => return,
135                Err(e) => Err(e)?
136            }
137        }
138        .boxed()
139    }
140
141    #[instrument(skip_all)]
142    fn put_multiple_start(&self) -> Box<dyn DirectoryPutter + '_> {
143        Box::new(SimplePutter::new(self))
144    }
145}
146
147#[derive(thiserror::Error, Debug)]
148pub enum Error {
149    #[error("wrong arguments: {0}")]
150    WrongConfig(&'static str),
151    #[error("serde-qs error: {0}")]
152    SerdeQS(#[from] serde_qs::Error),
153
154    #[error("getting from near: {0}")]
155    NearGet(#[source] super::Error),
156    #[error("putting into near: {0}")]
157    NearPut(#[source] super::Error),
158    #[error("getting from far: {0}")]
159    FarGet(#[source] super::Error),
160
161    #[error("puts are unimplemented")]
162    Unimplemented,
163}
164
165#[derive(serde::Deserialize, Debug)]
166#[serde(deny_unknown_fields)]
167pub struct CacheConfig {
168    near: String,
169    far: String,
170}
171
172impl TryFrom<url::Url> for CacheConfig {
173    type Error = Box<dyn std::error::Error + Send + Sync>;
174    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
175        // cache doesn't support host or path in the URL.
176        if url.has_host() || !url.path().is_empty() {
177            return Err(Error::WrongConfig("no host or path allowed").into());
178        }
179        Ok(serde_qs::from_str(url.query().unwrap_or_default())?)
180    }
181}
182
183#[async_trait]
184impl ServiceBuilder for CacheConfig {
185    type Output = dyn DirectoryService;
186    async fn build<'a>(
187        &'a self,
188        instance_name: &str,
189        context: &CompositionContext,
190    ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
191        let (near, far) = futures::join!(
192            context.resolve::<Self::Output>(&self.near),
193            context.resolve::<Self::Output>(&self.far)
194        );
195        Ok(Arc::new(Cache {
196            instance_name: instance_name.to_string(),
197            near: near?,
198            far: far?,
199        }))
200    }
201}