// snix_castore/directoryservice/combinators.rs

1use std::sync::Arc;
2
3use futures::StreamExt;
4use futures::TryFutureExt;
5use futures::TryStreamExt;
6use futures::stream::BoxStream;
7use tonic::async_trait;
8use tracing::{instrument, trace};
9
10use super::{Directory, DirectoryGraph, DirectoryService, RootToLeavesValidator, SimplePutter};
11use crate::B3Digest;
12use crate::Error;
13use crate::composition::{CompositionContext, ServiceBuilder};
14use crate::directoryservice::DirectoryPutter;
15
/// A directory service combinator that answers from `near` and falls back to `far`.
///
/// Lookups go to `near` first. On a miss there, the *entire* directory closure of the
/// requested digest is fetched from `far`, validated, and written into `near` before the
/// answer is returned. This makes the combinator useful when `far` cannot serve
/// intermediate directories on its own but `near` can.
///
/// Misses are not remembered (there is no negative cache). Inserts and listings are not
/// implemented for now.
pub struct Cache<DS1, DS2> {
    /// Name of this instance, attached to tracing spans.
    instance_name: String,
    /// The service consulted first, and populated with closures fetched from `far`.
    near: DS1,
    /// The service consulted when `near` does not have the requested directory.
    far: DS2,
}
28
29impl<DS1, DS2> Cache<DS1, DS2> {
30    pub fn new(instance_name: String, near: DS1, far: DS2) -> Self {
31        Self {
32            instance_name,
33            near,
34            far,
35        }
36    }
37}
38
39#[async_trait]
40impl<DS1, DS2> DirectoryService for Cache<DS1, DS2>
41where
42    DS1: DirectoryService + Clone + 'static,
43    DS2: DirectoryService + Clone + 'static,
44{
45    #[instrument(skip(self, digest), fields(directory.digest = %digest, instance_name = %self.instance_name))]
46    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
47        match self.near.get(digest).await? {
48            Some(directory) => {
49                trace!("serving from cache");
50                Ok(Some(directory))
51            }
52            None => {
53                trace!("not found in near, asking remote…");
54
55                let mut copy = DirectoryGraph::with_order(
56                    RootToLeavesValidator::new_with_root_digest(digest.clone()),
57                );
58
59                let mut stream = self.far.get_recursive(digest);
60                let root = stream.try_next().await?;
61
62                if let Some(root) = root.clone() {
63                    copy.add(root)
64                        .map_err(|e| Error::StorageError(e.to_string()))?;
65                }
66
67                while let Some(dir) = stream.try_next().await? {
68                    copy.add(dir)
69                        .map_err(|e| Error::StorageError(e.to_string()))?;
70                }
71
72                let copy = copy
73                    .validate()
74                    .map_err(|e| Error::StorageError(e.to_string()))?;
75
76                let mut put = self.near.put_multiple_start();
77                for dir in copy.drain_leaves_to_root() {
78                    put.put(dir).await?;
79                }
80                put.close().await?;
81
82                Ok(root)
83            }
84        }
85    }
86
87    #[instrument(skip_all, fields(instance_name = %self.instance_name))]
88    async fn put(&self, _directory: Directory) -> Result<B3Digest, Error> {
89        Err(Error::StorageError("unimplemented".to_string()))
90    }
91
92    #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))]
93    fn get_recursive(
94        &self,
95        root_directory_digest: &B3Digest,
96    ) -> BoxStream<'static, Result<Directory, Error>> {
97        let near = self.near.clone();
98        let far = self.far.clone();
99        let digest = root_directory_digest.clone();
100        Box::pin(
101            (async move {
102                let mut stream = near.get_recursive(&digest);
103                match stream.try_next().await? {
104                    Some(first) => {
105                        trace!("serving from cache");
106                        Ok(futures::stream::once(async { Ok(first) })
107                            .chain(stream)
108                            .left_stream())
109                    }
110                    None => {
111                        trace!("not found in near, asking remote…");
112
113                        let mut copy_for_near = DirectoryGraph::with_order(
114                            RootToLeavesValidator::new_with_root_digest(digest.clone()),
115                        );
116                        let mut copy_for_client = vec![];
117
118                        let mut stream = far.get_recursive(&digest);
119                        while let Some(dir) = stream.try_next().await? {
120                            copy_for_near
121                                .add(dir.clone())
122                                .map_err(|e| Error::StorageError(e.to_string()))?;
123                            copy_for_client.push(dir);
124                        }
125
126                        let copy_for_near = copy_for_near
127                            .validate()
128                            .map_err(|e| Error::StorageError(e.to_string()))?;
129                        let mut put = near.put_multiple_start();
130                        for dir in copy_for_near.drain_leaves_to_root() {
131                            put.put(dir).await?;
132                        }
133                        put.close().await?;
134
135                        Ok(futures::stream::iter(copy_for_client.into_iter().map(Ok))
136                            .right_stream())
137                    }
138                }
139            })
140            .try_flatten_stream(),
141        )
142    }
143
144    #[instrument(skip_all)]
145    fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + '_)> {
146        Box::new(SimplePutter::new(self))
147    }
148}
149
150#[derive(serde::Deserialize, Debug)]
151#[serde(deny_unknown_fields)]
152pub struct CacheConfig {
153    near: String,
154    far: String,
155}
156
157impl TryFrom<url::Url> for CacheConfig {
158    type Error = Box<dyn std::error::Error + Send + Sync>;
159    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
160        // cache doesn't support host or path in the URL.
161        if url.has_host() || !url.path().is_empty() {
162            return Err(Error::StorageError("invalid url".to_string()).into());
163        }
164        Ok(serde_qs::from_str(url.query().unwrap_or_default())?)
165    }
166}
167
168#[async_trait]
169impl ServiceBuilder for CacheConfig {
170    type Output = dyn DirectoryService;
171    async fn build<'a>(
172        &'a self,
173        instance_name: &str,
174        context: &CompositionContext,
175    ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
176        let (near, far) = futures::join!(
177            context.resolve::<Self::Output>(self.near.clone()),
178            context.resolve::<Self::Output>(self.far.clone())
179        );
180        Ok(Arc::new(Cache {
181            instance_name: instance_name.to_string(),
182            near: near?,
183            far: far?,
184        }))
185    }
186}