Skip to main content

snix_castore/directoryservice/combinators/
cache.rs

1use std::sync::Arc;
2
3use futures::StreamExt;
4use futures::TryStreamExt;
5use futures::stream::BoxStream;
6use tonic::async_trait;
7use tracing::{instrument, trace};
8
9use crate::composition::{CompositionContext, ServiceBuilder};
10use crate::directoryservice::directory_graph::DirectoryGraphBuilder;
11use crate::directoryservice::{self, DirectoryPutter, DirectoryService, SimplePutter};
12use crate::{B3Digest, Directory};
13
/// A read-through combinator over two directory services.
///
/// Lookups are answered by `near` when it has the data. Otherwise `far` is
/// asked, and whatever it returns is handed back and *inserted* into `near`.
/// The complete directory closure is always fetched from `far` and stored in
/// `near`, which helps when `far` cannot serve intermediate directories on
/// their own (while `near` can).
/// Misses are not remembered: there is no negative cache.
/// Inserts and listings are not implemented for now.
pub struct Cache<DN, DF> {
    // Recorded as the `instance_name` field on this service's tracing spans.
    instance_name: String,
    // Consulted first on every lookup; populated from `far` on a miss.
    near: DN,
    // Fallback consulted only when `near` has no answer.
    far: DF,
}

impl<DN, DF> Cache<DN, DF> {
    /// Builds a cache that consults `near` before `far`.
    ///
    /// `instance_name` is stored as-is and only used for tracing.
    pub fn new(instance_name: String, near: DN, far: DF) -> Self {
        Cache {
            instance_name,
            near,
            far,
        }
    }
}
36
37#[async_trait]
38impl<DN, DF> DirectoryService for Cache<DN, DF>
39where
40    DN: DirectoryService + Clone + 'static,
41    DF: DirectoryService + Clone + 'static,
42{
43    #[instrument(skip(self, digest), fields(directory.digest = %digest, instance_name = %self.instance_name))]
44    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, directoryservice::Error> {
45        // check near
46        if let Some(directory) = self.near.get(digest).await.map_err(Error::NearGet)? {
47            trace!("serving from cache");
48            return Ok(Some(directory));
49        }
50
51        trace!("not found in near, asking remote…");
52        // We always ask recursive, and populate the children to support far not allowing non-root access
53        // We currently wait for all children to be received before returning
54        // the requested directory, so subsequent children requests don't fail when these
55        // stores are used.
56        // FUTUREWORK: make this configurable, allow firing off a background task populating the children.
57        let mut directories = self.far.get_recursive(digest);
58        let mut graph_builder = DirectoryGraphBuilder::new_root_to_leaves(*digest);
59
60        let mut resp_directory = None;
61        while let Some(directory) = directories.try_next().await.map_err(Error::FarGet)? {
62            graph_builder
63                .try_insert(directory.clone())
64                .map_err(Error::DirectoryOrdering)?;
65            if resp_directory.is_none() {
66                resp_directory = Some(directory);
67            }
68        }
69
70        // If far had the directory, put into near.
71        if let Some(resp_directory) = resp_directory {
72            let directory_graph = graph_builder.build().map_err(Error::DirectoryOrdering)?;
73            // Drain into near
74            let mut near_putter = self.near.put_multiple_start();
75            for directory in directory_graph.drain_leaves_to_root() {
76                near_putter.put(directory).await.map_err(Error::NearPut)?;
77            }
78
79            let actual_digest = near_putter.close().await.map_err(Error::NearPut)?;
80            debug_assert_eq!(digest, &actual_digest);
81            Ok(Some(resp_directory))
82        } else {
83            Ok(None)
84        }
85    }
86
87    #[instrument(skip_all, fields(instance_name = %self.instance_name))]
88    async fn put(&self, _directory: Directory) -> Result<B3Digest, directoryservice::Error> {
89        Err(Error::Unimplemented.into())
90    }
91
92    #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))]
93    fn get_recursive(
94        &self,
95        root_directory_digest: &B3Digest,
96    ) -> BoxStream<'_, Result<Directory, directoryservice::Error>> {
97        let near = &self.near;
98        let far = &self.far;
99        let digest = *root_directory_digest;
100
101        async_stream::try_stream! {
102            let mut directories = near.get_recursive(&digest);
103
104            if let Some(first) = directories.try_next().await.map_err(Error::NearGet)? {
105                trace!("serving from cache");
106                yield first;
107
108                while let Some(dir) = directories.try_next().await.map_err(Error::NearGet)? {
109                    yield dir;
110                }
111                return;
112            }
113
114            trace!("not found in near, asking remote…");
115
116            let mut directories = far.get_recursive(&digest);
117            let mut builder = DirectoryGraphBuilder::new_root_to_leaves(digest);
118
119            // Return to the client, while inserting to the graph builder.
120            while let Some(directory) = directories.try_next().await.map_err(Error::FarGet)? {
121                builder.try_insert(directory.clone()).map_err(Error::DirectoryOrdering)?;
122                yield directory;
123            }
124
125            match builder.build() {
126                Ok(directory_graph) => {
127                    // Drain into near
128                    let mut near_putter = near.put_multiple_start();
129                    for directory in directory_graph.drain_leaves_to_root() {
130                        near_putter.put(directory).await.map_err(Error::NearPut)?;
131                    }
132                    let actual_digest = near_putter.close().await.map_err(Error::NearPut)?;
133                    debug_assert_eq!(digest, actual_digest);
134                }
135                Err(crate::directoryservice::OrderingError::EmptySet) => return,
136                Err(err) => Err(Error::DirectoryOrdering(err))?
137            }
138        }
139        .boxed()
140    }
141
142    #[instrument(skip_all)]
143    fn put_multiple_start(&self) -> Box<dyn DirectoryPutter + '_> {
144        Box::new(SimplePutter::new(self))
145    }
146}
147
148#[derive(thiserror::Error, Debug)]
149pub enum Error {
150    #[error("wrong arguments: {0}")]
151    WrongConfig(&'static str),
152    #[error("Directory Graph ordering error: {0}")]
153    DirectoryOrdering(#[from] crate::directoryservice::OrderingError),
154    #[error("serde-qs error: {0}")]
155    SerdeQS(#[from] serde_qs::Error),
156
157    #[error("getting from near: {0}")]
158    NearGet(#[source] directoryservice::Error),
159    #[error("putting into near: {0}")]
160    NearPut(#[source] directoryservice::Error),
161    #[error("getting from far: {0}")]
162    FarGet(#[source] directoryservice::Error),
163
164    #[error("puts are unimplemented")]
165    Unimplemented,
166}
167
168impl From<Error> for directoryservice::Error {
169    fn from(value: Error) -> Self {
170        Self(Box::new(value))
171    }
172}
173
174#[derive(serde::Deserialize, Debug)]
175#[serde(deny_unknown_fields)]
176pub struct CacheConfig {
177    near: String,
178    far: String,
179}
180
181impl TryFrom<url::Url> for CacheConfig {
182    type Error = Box<dyn std::error::Error + Send + Sync>;
183    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
184        // cache doesn't support host or path in the URL.
185        if url.has_authority() || !url.path().is_empty() {
186            return Err(Error::WrongConfig("no authority or path allowed").into());
187        }
188        Ok(serde_qs::from_str(url.query().unwrap_or_default())?)
189    }
190}
191
192#[async_trait]
193impl ServiceBuilder for CacheConfig {
194    type Output = dyn DirectoryService;
195    async fn build<'a>(
196        &'a self,
197        instance_name: &str,
198        context: &CompositionContext,
199    ) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync>> {
200        let (near, far) = futures::join!(
201            context.resolve::<Self::Output>(&self.near),
202            context.resolve::<Self::Output>(&self.far)
203        );
204        Ok(Arc::new(Cache {
205            instance_name: instance_name.to_string(),
206            near: near?,
207            far: far?,
208        }))
209    }
210}