snix_castore/directoryservice/combinators/
cache.rs1use std::sync::Arc;
2
3use futures::StreamExt;
4use futures::TryStreamExt;
5use futures::stream::BoxStream;
6use tonic::async_trait;
7use tracing::{instrument, trace};
8
9use crate::composition::{CompositionContext, ServiceBuilder};
10use crate::directoryservice::directory_graph::DirectoryGraphBuilder;
11use crate::directoryservice::{self, DirectoryPutter, DirectoryService, SimplePutter};
12use crate::{B3Digest, Directory};
13
/// Directory service combinator made of a `near` and a `far` backend.
///
/// Lookups are answered from `near` when possible; otherwise the data is
/// fetched from `far` and copied into `near` before being returned.
/// Direct puts are not implemented.
pub struct Cache<DN, DF> {
    /// Name recorded in the tracing spans emitted by this instance.
    instance_name: String,
    /// Backend that is queried first and populated on a miss.
    near: DN,
    /// Backend that is queried when `near` does not have the data.
    far: DF,
}
26
27impl<DN, DF> Cache<DN, DF> {
28 pub fn new(instance_name: String, near: DN, far: DF) -> Self {
29 Self {
30 instance_name,
31 near,
32 far,
33 }
34 }
35}
36
37#[async_trait]
38impl<DN, DF> DirectoryService for Cache<DN, DF>
39where
40 DN: DirectoryService + Clone + 'static,
41 DF: DirectoryService + Clone + 'static,
42{
43 #[instrument(skip(self, digest), fields(directory.digest = %digest, instance_name = %self.instance_name))]
44 async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, directoryservice::Error> {
45 if let Some(directory) = self.near.get(digest).await.map_err(Error::NearGet)? {
47 trace!("serving from cache");
48 return Ok(Some(directory));
49 }
50
51 trace!("not found in near, asking remote…");
52 let mut directories = self.far.get_recursive(digest);
58 let mut graph_builder = DirectoryGraphBuilder::new_root_to_leaves(*digest);
59
60 let mut resp_directory = None;
61 while let Some(directory) = directories.try_next().await.map_err(Error::FarGet)? {
62 graph_builder
63 .try_insert(directory.clone())
64 .map_err(Error::DirectoryOrdering)?;
65 if resp_directory.is_none() {
66 resp_directory = Some(directory);
67 }
68 }
69
70 if let Some(resp_directory) = resp_directory {
72 let directory_graph = graph_builder.build().map_err(Error::DirectoryOrdering)?;
73 let mut near_putter = self.near.put_multiple_start();
75 for directory in directory_graph.drain_leaves_to_root() {
76 near_putter.put(directory).await.map_err(Error::NearPut)?;
77 }
78
79 let actual_digest = near_putter.close().await.map_err(Error::NearPut)?;
80 debug_assert_eq!(digest, &actual_digest);
81 Ok(Some(resp_directory))
82 } else {
83 Ok(None)
84 }
85 }
86
87 #[instrument(skip_all, fields(instance_name = %self.instance_name))]
88 async fn put(&self, _directory: Directory) -> Result<B3Digest, directoryservice::Error> {
89 Err(Error::Unimplemented.into())
90 }
91
92 #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))]
93 fn get_recursive(
94 &self,
95 root_directory_digest: &B3Digest,
96 ) -> BoxStream<'_, Result<Directory, directoryservice::Error>> {
97 let near = &self.near;
98 let far = &self.far;
99 let digest = *root_directory_digest;
100
101 async_stream::try_stream! {
102 let mut directories = near.get_recursive(&digest);
103
104 if let Some(first) = directories.try_next().await.map_err(Error::NearGet)? {
105 trace!("serving from cache");
106 yield first;
107
108 while let Some(dir) = directories.try_next().await.map_err(Error::NearGet)? {
109 yield dir;
110 }
111 return;
112 }
113
114 trace!("not found in near, asking remote…");
115
116 let mut directories = far.get_recursive(&digest);
117 let mut builder = DirectoryGraphBuilder::new_root_to_leaves(digest);
118
119 while let Some(directory) = directories.try_next().await.map_err(Error::FarGet)? {
121 builder.try_insert(directory.clone()).map_err(Error::DirectoryOrdering)?;
122 yield directory;
123 }
124
125 match builder.build() {
126 Ok(directory_graph) => {
127 let mut near_putter = near.put_multiple_start();
129 for directory in directory_graph.drain_leaves_to_root() {
130 near_putter.put(directory).await.map_err(Error::NearPut)?;
131 }
132 let actual_digest = near_putter.close().await.map_err(Error::NearPut)?;
133 debug_assert_eq!(digest, actual_digest);
134 }
135 Err(crate::directoryservice::OrderingError::EmptySet) => return,
136 Err(err) => Err(Error::DirectoryOrdering(err))?
137 }
138 }
139 .boxed()
140 }
141
142 #[instrument(skip_all)]
143 fn put_multiple_start(&self) -> Box<dyn DirectoryPutter + '_> {
144 Box::new(SimplePutter::new(self))
145 }
146}
147
148#[derive(thiserror::Error, Debug)]
149pub enum Error {
150 #[error("wrong arguments: {0}")]
151 WrongConfig(&'static str),
152 #[error("Directory Graph ordering error: {0}")]
153 DirectoryOrdering(#[from] crate::directoryservice::OrderingError),
154 #[error("serde-qs error: {0}")]
155 SerdeQS(#[from] serde_qs::Error),
156
157 #[error("getting from near: {0}")]
158 NearGet(#[source] directoryservice::Error),
159 #[error("putting into near: {0}")]
160 NearPut(#[source] directoryservice::Error),
161 #[error("getting from far: {0}")]
162 FarGet(#[source] directoryservice::Error),
163
164 #[error("puts are unimplemented")]
165 Unimplemented,
166}
167
168impl From<Error> for directoryservice::Error {
169 fn from(value: Error) -> Self {
170 Self(Box::new(value))
171 }
172}
173
174#[derive(serde::Deserialize, Debug)]
175#[serde(deny_unknown_fields)]
176pub struct CacheConfig {
177 near: String,
178 far: String,
179}
180
181impl TryFrom<url::Url> for CacheConfig {
182 type Error = Box<dyn std::error::Error + Send + Sync>;
183 fn try_from(url: url::Url) -> Result<Self, Self::Error> {
184 if url.has_authority() || !url.path().is_empty() {
186 return Err(Error::WrongConfig("no authority or path allowed").into());
187 }
188 Ok(serde_qs::from_str(url.query().unwrap_or_default())?)
189 }
190}
191
192#[async_trait]
193impl ServiceBuilder for CacheConfig {
194 type Output = dyn DirectoryService;
195 async fn build<'a>(
196 &'a self,
197 instance_name: &str,
198 context: &CompositionContext,
199 ) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync>> {
200 let (near, far) = futures::join!(
201 context.resolve::<Self::Output>(&self.near),
202 context.resolve::<Self::Output>(&self.far)
203 );
204 Ok(Arc::new(Cache {
205 instance_name: instance_name.to_string(),
206 near: near?,
207 far: far?,
208 }))
209 }
210}