snix_castore/directoryservice/
combinators.rs1use std::sync::Arc;
2
3use futures::StreamExt;
4use futures::TryFutureExt;
5use futures::TryStreamExt;
6use futures::stream::BoxStream;
7use tonic::async_trait;
8use tracing::{instrument, trace};
9
10use super::{Directory, DirectoryService, SimplePutter};
11use crate::B3Digest;
12use crate::Error;
13use crate::composition::{CompositionContext, ServiceBuilder};
14use crate::directoryservice::DirectoryPutter;
15use crate::directoryservice::directory_graph::DirectoryGraphBuilder;
16use crate::directoryservice::directory_graph::DirectoryOrder;
17
/// A two-tier directory service combinator.
///
/// Lookups consult `near` first; when `near` has no answer, `far` is asked
/// and whatever it returns is copied into `near`.
pub struct Cache<DS1, DS2> {
    /// Name attached to the tracing spans emitted by this instance.
    instance_name: String,
    /// The service that is queried first and that receives copies of
    /// directories fetched from `far`.
    near: DS1,
    /// The fallback service, queried only when `near` has no answer.
    far: DS2,
}
30
31impl<DS1, DS2> Cache<DS1, DS2> {
32 pub fn new(instance_name: String, near: DS1, far: DS2) -> Self {
33 Self {
34 instance_name,
35 near,
36 far,
37 }
38 }
39}
40
41#[async_trait]
42impl<DS1, DS2> DirectoryService for Cache<DS1, DS2>
43where
44 DS1: DirectoryService + Clone + 'static,
45 DS2: DirectoryService + Clone + 'static,
46{
47 #[instrument(skip(self, digest), fields(directory.digest = %digest, instance_name = %self.instance_name))]
48 async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
49 if let Some(directory) = self.near.get(digest).await? {
51 trace!("serving from cache");
52 return Ok(Some(directory));
53 }
54
55 trace!("not found in near, asking remote…");
56 let mut directories = self.far.get_recursive(digest);
58 if let Some(first_directory) = directories.try_next().await? {
59 let mut graph_builder =
61 DirectoryGraphBuilder::new_with_insertion_order(DirectoryOrder::RootToLeaves);
62 graph_builder
63 .insert(first_directory.clone())
64 .expect("Snix bug: inserting first directory for RTL should always work");
65
66 tokio::spawn({
67 let digest = digest.clone();
68 let near = self.near.clone();
69 async move {
70 let mut near_putter = near.put_multiple_start();
71
72 while let Some(directory) = directories.try_next().await? {
74 graph_builder.insert(directory)?;
75 }
76
77 let directory_graph = graph_builder.build()?;
78
79 for directory in directory_graph.drain(DirectoryOrder::LeavesToRoot) {
81 near_putter.put(directory).await?;
82 }
83
84 let actual_digest = near_putter.close().await?;
85 debug_assert_eq!(digest, actual_digest);
86
87 Ok::<_, Error>(())
88 }
89 });
90
91 Ok(Some(first_directory))
92 } else {
93 Ok(None)
94 }
95 }
96
97 #[instrument(skip_all, fields(instance_name = %self.instance_name))]
98 async fn put(&self, _directory: Directory) -> Result<B3Digest, Error> {
99 Err(Error::StorageError("unimplemented".to_string()))
100 }
101
102 #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))]
103 fn get_recursive(
104 &self,
105 root_directory_digest: &B3Digest,
106 ) -> BoxStream<'static, Result<Directory, Error>> {
107 let near = self.near.clone();
108 let far = self.far.clone();
109 let digest = root_directory_digest.clone();
110 async move {
111 let mut stream = near.get_recursive(&digest);
112
113 if let Some(first) = stream.try_next().await? {
114 trace!("serving from cache");
115 return Ok(futures::stream::once(async { Ok(first) })
116 .chain(stream)
117 .boxed());
118 }
119
120 trace!("not found in near, asking remote…");
121
122 let mut directories = far.get_recursive(&digest);
123 let mut graph_builder =
124 DirectoryGraphBuilder::new_with_insertion_order(DirectoryOrder::RootToLeaves);
125
126 Ok(async_stream::try_stream! {
128 let mut near_putter = near.put_multiple_start();
129
130 while let Some(directory) = directories.try_next().await? {
131 graph_builder.insert(directory.clone())?;
132
133 yield directory;
134 }
135
136 let directory_graph = graph_builder.build()?;
137
138 for directory in directory_graph.drain(DirectoryOrder::LeavesToRoot) {
140 near_putter.put(directory).await?;
141 }
142
143 let actual_digest = near_putter.close().await?;
144 debug_assert_eq!(digest, actual_digest);
145 }
146 .boxed())
147 }
148 .try_flatten_stream()
149 .boxed()
150 }
151
152 #[instrument(skip_all)]
153 fn put_multiple_start(&self) -> Box<dyn DirectoryPutter + '_> {
154 Box::new(SimplePutter::new(self))
155 }
156}
157
158#[derive(serde::Deserialize, Debug)]
159#[serde(deny_unknown_fields)]
160pub struct CacheConfig {
161 near: String,
162 far: String,
163}
164
165impl TryFrom<url::Url> for CacheConfig {
166 type Error = Box<dyn std::error::Error + Send + Sync>;
167 fn try_from(url: url::Url) -> Result<Self, Self::Error> {
168 if url.has_host() || !url.path().is_empty() {
170 return Err(Error::StorageError("invalid url".to_string()).into());
171 }
172 Ok(serde_qs::from_str(url.query().unwrap_or_default())?)
173 }
174}
175
176#[async_trait]
177impl ServiceBuilder for CacheConfig {
178 type Output = dyn DirectoryService;
179 async fn build<'a>(
180 &'a self,
181 instance_name: &str,
182 context: &CompositionContext,
183 ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
184 let (near, far) = futures::join!(
185 context.resolve::<Self::Output>(self.near.clone()),
186 context.resolve::<Self::Output>(self.far.clone())
187 );
188 Ok(Arc::new(Cache {
189 instance_name: instance_name.to_string(),
190 near: near?,
191 far: far?,
192 }))
193 }
194}