snix_castore/directoryservice/
combinators.rs1use std::sync::Arc;
2
3use futures::StreamExt;
4use futures::TryStreamExt;
5use futures::stream::BoxStream;
6use tonic::async_trait;
7use tracing::{instrument, trace};
8
9use super::{Directory, DirectoryService, SimplePutter};
10use crate::B3Digest;
11use crate::composition::{CompositionContext, ServiceBuilder};
12use crate::directoryservice::DirectoryPutter;
13use crate::directoryservice::directory_graph::DirectoryGraphBuilder;
14
15pub struct Cache<DS1, DS2> {
23 instance_name: String,
24 near: DS1,
25 far: DS2,
26}
27
28impl<DS1, DS2> Cache<DS1, DS2> {
29 pub fn new(instance_name: String, near: DS1, far: DS2) -> Self {
30 Self {
31 instance_name,
32 near,
33 far,
34 }
35 }
36}
37
38#[async_trait]
39impl<DS1, DS2> DirectoryService for Cache<DS1, DS2>
40where
41 DS1: DirectoryService + Clone + 'static,
42 DS2: DirectoryService + Clone + 'static,
43{
44 #[instrument(skip(self, digest), fields(directory.digest = %digest, instance_name = %self.instance_name))]
45 async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, super::Error> {
46 if let Some(directory) = self.near.get(digest).await.map_err(Error::NearGet)? {
48 trace!("serving from cache");
49 return Ok(Some(directory));
50 }
51
52 trace!("not found in near, asking remote…");
53 let mut directories = self.far.get_recursive(digest);
59 let mut graph_builder = DirectoryGraphBuilder::new_root_to_leaves(*digest);
60
61 let mut resp_directory = None;
62 while let Some(directory) = directories.try_next().await.map_err(Error::FarGet)? {
63 graph_builder.try_insert(directory.clone())?;
64 if resp_directory.is_none() {
65 resp_directory = Some(directory);
66 }
67 }
68
69 if let Some(resp_directory) = resp_directory {
71 let directory_graph = graph_builder.build()?;
72 let mut near_putter = self.near.put_multiple_start();
74 for directory in directory_graph.drain_leaves_to_root() {
75 near_putter.put(directory).await.map_err(Error::NearPut)?;
76 }
77
78 let actual_digest = near_putter.close().await.map_err(Error::NearPut)?;
79 debug_assert_eq!(digest, &actual_digest);
80 Ok(Some(resp_directory))
81 } else {
82 Ok(None)
83 }
84 }
85
86 #[instrument(skip_all, fields(instance_name = %self.instance_name))]
87 async fn put(&self, _directory: Directory) -> Result<B3Digest, super::Error> {
88 Err(Error::Unimplemented.into())
89 }
90
91 #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))]
92 fn get_recursive(
93 &self,
94 root_directory_digest: &B3Digest,
95 ) -> BoxStream<'_, Result<Directory, super::Error>> {
96 let near = &self.near;
97 let far = &self.far;
98 let digest = *root_directory_digest;
99
100 async_stream::try_stream! {
101 let mut directories = near.get_recursive(&digest);
102
103 if let Some(first) = directories.try_next().await.map_err(Error::NearGet)? {
104 trace!("serving from cache");
105 yield first;
106
107 while let Some(dir) = directories.try_next().await.map_err(Error::NearGet)? {
108 yield dir;
109 }
110 return;
111 }
112
113 trace!("not found in near, asking remote…");
114
115 let mut directories = far.get_recursive(&digest);
116 let mut builder = DirectoryGraphBuilder::new_root_to_leaves(digest);
117
118 while let Some(directory) = directories.try_next().await.map_err(Error::FarGet)? {
120 builder.try_insert(directory.clone())?;
121 yield directory;
122 }
123
124 match builder.build() {
125 Ok(directory_graph) => {
126 let mut near_putter = near.put_multiple_start();
128 for directory in directory_graph.drain_leaves_to_root() {
129 near_putter.put(directory).await.map_err(Error::NearPut)?;
130 }
131 let actual_digest = near_putter.close().await.map_err(Error::NearPut)?;
132 debug_assert_eq!(digest, actual_digest);
133 }
134 Err(crate::directoryservice::OrderingError::EmptySet) => return,
135 Err(e) => Err(e)?
136 }
137 }
138 .boxed()
139 }
140
141 #[instrument(skip_all)]
142 fn put_multiple_start(&self) -> Box<dyn DirectoryPutter + '_> {
143 Box::new(SimplePutter::new(self))
144 }
145}
146
147#[derive(thiserror::Error, Debug)]
148pub enum Error {
149 #[error("wrong arguments: {0}")]
150 WrongConfig(&'static str),
151 #[error("serde-qs error: {0}")]
152 SerdeQS(#[from] serde_qs::Error),
153
154 #[error("getting from near: {0}")]
155 NearGet(#[source] super::Error),
156 #[error("putting into near: {0}")]
157 NearPut(#[source] super::Error),
158 #[error("getting from far: {0}")]
159 FarGet(#[source] super::Error),
160
161 #[error("puts are unimplemented")]
162 Unimplemented,
163}
164
165#[derive(serde::Deserialize, Debug)]
166#[serde(deny_unknown_fields)]
167pub struct CacheConfig {
168 near: String,
169 far: String,
170}
171
172impl TryFrom<url::Url> for CacheConfig {
173 type Error = Box<dyn std::error::Error + Send + Sync>;
174 fn try_from(url: url::Url) -> Result<Self, Self::Error> {
175 if url.has_host() || !url.path().is_empty() {
177 return Err(Error::WrongConfig("no host or path allowed").into());
178 }
179 Ok(serde_qs::from_str(url.query().unwrap_or_default())?)
180 }
181}
182
183#[async_trait]
184impl ServiceBuilder for CacheConfig {
185 type Output = dyn DirectoryService;
186 async fn build<'a>(
187 &'a self,
188 instance_name: &str,
189 context: &CompositionContext,
190 ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
191 let (near, far) = futures::join!(
192 context.resolve::<Self::Output>(&self.near),
193 context.resolve::<Self::Output>(&self.far)
194 );
195 Ok(Arc::new(Cache {
196 instance_name: instance_name.to_string(),
197 near: near?,
198 far: far?,
199 }))
200 }
201}