snix_castore/directoryservice/
combinators.rs1use std::sync::Arc;
2
3use futures::StreamExt;
4use futures::TryFutureExt;
5use futures::TryStreamExt;
6use futures::stream::BoxStream;
7use tonic::async_trait;
8use tracing::{instrument, trace};
9
10use super::{Directory, DirectoryGraph, DirectoryService, RootToLeavesValidator, SimplePutter};
11use crate::B3Digest;
12use crate::Error;
13use crate::composition::{CompositionContext, ServiceBuilder};
14use crate::directoryservice::DirectoryPutter;
15
/// Read-through cache combinator over two directory services.
///
/// Lookups are answered from `near` when possible. On a miss the request is
/// forwarded to `far`, and whatever `far` returns is copied into `near`
/// before being handed back to the caller.
pub struct Cache<DS1, DS2> {
    /// Name attached to the tracing spans emitted by this instance.
    instance_name: String,
    /// Service that is consulted first and populated on a miss.
    near: DS1,
    /// Service that is consulted when `near` has no answer.
    far: DS2,
}
28
29impl<DS1, DS2> Cache<DS1, DS2> {
30 pub fn new(instance_name: String, near: DS1, far: DS2) -> Self {
31 Self {
32 instance_name,
33 near,
34 far,
35 }
36 }
37}
38
39#[async_trait]
40impl<DS1, DS2> DirectoryService for Cache<DS1, DS2>
41where
42 DS1: DirectoryService + Clone + 'static,
43 DS2: DirectoryService + Clone + 'static,
44{
45 #[instrument(skip(self, digest), fields(directory.digest = %digest, instance_name = %self.instance_name))]
46 async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
47 match self.near.get(digest).await? {
48 Some(directory) => {
49 trace!("serving from cache");
50 Ok(Some(directory))
51 }
52 None => {
53 trace!("not found in near, asking remote…");
54
55 let mut copy = DirectoryGraph::with_order(
56 RootToLeavesValidator::new_with_root_digest(digest.clone()),
57 );
58
59 let mut stream = self.far.get_recursive(digest);
60 let root = stream.try_next().await?;
61
62 if let Some(root) = root.clone() {
63 copy.add(root)
64 .map_err(|e| Error::StorageError(e.to_string()))?;
65 }
66
67 while let Some(dir) = stream.try_next().await? {
68 copy.add(dir)
69 .map_err(|e| Error::StorageError(e.to_string()))?;
70 }
71
72 let copy = copy
73 .validate()
74 .map_err(|e| Error::StorageError(e.to_string()))?;
75
76 let mut put = self.near.put_multiple_start();
77 for dir in copy.drain_leaves_to_root() {
78 put.put(dir).await?;
79 }
80 put.close().await?;
81
82 Ok(root)
83 }
84 }
85 }
86
87 #[instrument(skip_all, fields(instance_name = %self.instance_name))]
88 async fn put(&self, _directory: Directory) -> Result<B3Digest, Error> {
89 Err(Error::StorageError("unimplemented".to_string()))
90 }
91
92 #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))]
93 fn get_recursive(
94 &self,
95 root_directory_digest: &B3Digest,
96 ) -> BoxStream<'static, Result<Directory, Error>> {
97 let near = self.near.clone();
98 let far = self.far.clone();
99 let digest = root_directory_digest.clone();
100 Box::pin(
101 (async move {
102 let mut stream = near.get_recursive(&digest);
103 match stream.try_next().await? {
104 Some(first) => {
105 trace!("serving from cache");
106 Ok(futures::stream::once(async { Ok(first) })
107 .chain(stream)
108 .left_stream())
109 }
110 None => {
111 trace!("not found in near, asking remote…");
112
113 let mut copy_for_near = DirectoryGraph::with_order(
114 RootToLeavesValidator::new_with_root_digest(digest.clone()),
115 );
116 let mut copy_for_client = vec![];
117
118 let mut stream = far.get_recursive(&digest);
119 while let Some(dir) = stream.try_next().await? {
120 copy_for_near
121 .add(dir.clone())
122 .map_err(|e| Error::StorageError(e.to_string()))?;
123 copy_for_client.push(dir);
124 }
125
126 let copy_for_near = copy_for_near
127 .validate()
128 .map_err(|e| Error::StorageError(e.to_string()))?;
129 let mut put = near.put_multiple_start();
130 for dir in copy_for_near.drain_leaves_to_root() {
131 put.put(dir).await?;
132 }
133 put.close().await?;
134
135 Ok(futures::stream::iter(copy_for_client.into_iter().map(Ok))
136 .right_stream())
137 }
138 }
139 })
140 .try_flatten_stream(),
141 )
142 }
143
144 #[instrument(skip_all)]
145 fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + '_)> {
146 Box::new(SimplePutter::new(self))
147 }
148}
149
150#[derive(serde::Deserialize, Debug)]
151#[serde(deny_unknown_fields)]
152pub struct CacheConfig {
153 near: String,
154 far: String,
155}
156
157impl TryFrom<url::Url> for CacheConfig {
158 type Error = Box<dyn std::error::Error + Send + Sync>;
159 fn try_from(url: url::Url) -> Result<Self, Self::Error> {
160 if url.has_host() || !url.path().is_empty() {
162 return Err(Error::StorageError("invalid url".to_string()).into());
163 }
164 Ok(serde_qs::from_str(url.query().unwrap_or_default())?)
165 }
166}
167
168#[async_trait]
169impl ServiceBuilder for CacheConfig {
170 type Output = dyn DirectoryService;
171 async fn build<'a>(
172 &'a self,
173 instance_name: &str,
174 context: &CompositionContext,
175 ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
176 let (near, far) = futures::join!(
177 context.resolve::<Self::Output>(self.near.clone()),
178 context.resolve::<Self::Output>(self.far.clone())
179 );
180 Ok(Arc::new(Cache {
181 instance_name: instance_name.to_string(),
182 near: near?,
183 far: far?,
184 }))
185 }
186}