snix_castore/directoryservice/combinators/
priority.rs1use std::{
2 collections::{BTreeMap, btree_map},
3 fmt::Display,
4 sync::Arc,
5};
6
7use futures::{StreamExt, TryStreamExt, stream::BoxStream};
8use tonic::async_trait;
9use tracing::instrument;
10
11use crate::{
12 B3Digest, Directory,
13 composition::{CompositionContext, CompositionError, ServiceBuilder},
14 directoryservice::{self, DirectoryPutter, DirectoryService},
15};
16
17pub struct Priority<DS> {
22 instance_name: String,
23 services: BTreeMap<Prio, DS>,
26}
27
28impl From<u64> for Prio {
29 fn from(value: u64) -> Self {
30 Self(value)
31 }
32}
33
34#[derive(Clone, Copy, Eq, PartialEq, PartialOrd, Ord, Debug)]
35pub struct Prio(u64);
36
37impl Display for Prio {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 write!(f, "{}", self.0)
40 }
41}
42
43impl<DS> Priority<DS> {
44 pub fn new<I: IntoIterator<Item = (Prio, DS)>>(instance_name: String, iter: I) -> Priority<DS> {
47 let mut services = BTreeMap::new();
48
49 for (prio, service) in iter {
50 match services.entry(prio) {
51 btree_map::Entry::Vacant(entry) => {
52 entry.insert(service);
53 }
54 btree_map::Entry::Occupied(_entry) => {
55 unimplemented!(
56 "already got another service at prio {prio}, race not implemented"
57 );
58 }
59 }
60 }
61
62 Self {
63 instance_name,
64 services,
65 }
66 }
67}
68
69#[async_trait]
70impl<DS> DirectoryService for Priority<DS>
71where
72 DS: DirectoryService,
73{
74 #[instrument(skip(self, digest), fields(directory.digest = %digest, instance_name = %self.instance_name))]
75 async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, directoryservice::Error> {
76 for (prio, service) in self.services.iter() {
79 if let Some(directory) = service
80 .get(digest)
81 .await
82 .map_err(|err| Error::Backend(*prio, err))?
83 {
84 return Ok(Some(directory));
85 }
86 }
87
88 Ok(None)
89 }
90
91 #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))]
92 fn get_recursive(
93 &self,
94 root_directory_digest: &B3Digest,
95 ) -> BoxStream<'_, Result<Directory, directoryservice::Error>> {
96 let digest = *root_directory_digest;
97 async_stream::try_stream! {
98 for (prio, service) in self.services.iter() {
99 let mut directories_stream = service.get_recursive(&digest);
100 if let Some(directory) = directories_stream.try_next().await.map_err(|err| { Error::Backend(*prio, err)})? {
102 yield directory;
103
104 while let Some(directory) = directories_stream.try_next().await.map_err(|err| { Error::Backend(*prio, err)})? {
105 yield directory;
106 }
107 return;
109 }
110 }
112 }
113 .boxed()
114 }
115
116 #[instrument(skip_all, fields(instance_name = %self.instance_name))]
117 async fn put(&self, _directory: Directory) -> Result<B3Digest, directoryservice::Error> {
118 Err(Error::Unimplemented.into())
119 }
120
121 #[instrument(skip_all)]
122 fn put_multiple_start(&self) -> Box<dyn DirectoryPutter + '_> {
123 struct FailingPutter();
124
125 #[async_trait]
126 impl DirectoryPutter for FailingPutter {
127 async fn put(&mut self, _directory: Directory) -> Result<(), directoryservice::Error> {
128 Err(Error::Unimplemented)?
129 }
130 async fn close(&mut self) -> Result<B3Digest, directoryservice::Error> {
131 Err(Error::Unimplemented)?
132 }
133 }
134
135 Box::new(FailingPutter())
136 }
137}
138
139#[derive(thiserror::Error, Debug)]
140pub enum Error {
141 #[error("wrong arguments: {0}")]
142 WrongConfig(&'static str),
143
144 #[error("error from service with prio {0}")]
145 Backend(Prio, #[source] directoryservice::Error),
146
147 #[error("puts are unimplemented")]
148 Unimplemented,
149}
150
151impl From<Error> for directoryservice::Error {
152 fn from(value: Error) -> Self {
153 Self(Box::new(value))
154 }
155}
156
157#[derive(serde::Deserialize, Debug)]
158#[serde(deny_unknown_fields)]
159pub struct PriorityConfig {
160 services: BTreeMap<u64, String>,
161}
162
163impl TryFrom<url::Url> for PriorityConfig {
164 type Error = Box<dyn std::error::Error + Send + Sync>;
165 fn try_from(url: url::Url) -> Result<Self, Self::Error> {
166 if url.has_authority() || !url.path().is_empty() {
167 return Err(Error::WrongConfig("no authority or path allowed").into());
168 }
169 Ok(serde_qs::from_str(url.query().unwrap_or_default())?)
170 }
171}
172
173#[async_trait]
174impl ServiceBuilder for PriorityConfig {
175 type Output = dyn DirectoryService;
176 async fn build<'a>(
177 &'a self,
178 instance_name: &str,
179 context: &CompositionContext,
180 ) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync>> {
181 let services = futures::future::try_join_all(self.services.iter().map(
182 |(prio, instance_ref)| async move {
183 Ok::<_, CompositionError>((
184 Prio::from(*prio),
185 context.resolve::<Self::Output>(instance_ref).await?,
186 ))
187 },
188 ))
189 .await?;
190
191 Ok(Arc::new(Priority::new(instance_name.to_string(), services)))
192 }
193}
194
195#[cfg(test)]
196mod test {
197 use mockall::{Sequence, predicate};
198 use pretty_assertions::{assert_eq, assert_matches};
199
200 use super::*;
201 use crate::{
202 directoryservice::MockDirectoryService,
203 fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_WITH_KEEP},
204 };
205
206 #[tokio::test]
208 async fn get_first_gets_tried_only() {
209 let mut first = MockDirectoryService::new();
210 let mut last = MockDirectoryService::new();
211
212 first
213 .expect_get()
214 .with(predicate::eq(DIRECTORY_WITH_KEEP.digest()))
215 .once()
216 .returning(|_| Ok(Some(DIRECTORY_WITH_KEEP.clone())));
217
218 last.expect_get().never();
219
220 let uut = Priority::new("uut".to_string(), [(0.into(), first), (1.into(), last)]);
221
222 assert_eq!(
223 Some(DIRECTORY_WITH_KEEP.clone()),
224 uut.get(&DIRECTORY_WITH_KEEP.digest())
225 .await
226 .expect("to succeed")
227 )
228 }
229
230 #[tokio::test]
232 async fn get_first_then_last() {
233 let mut first = MockDirectoryService::new();
234 let mut last = MockDirectoryService::new();
235 let mut seq = Sequence::new();
236
237 first
238 .expect_get()
239 .with(predicate::eq(DIRECTORY_WITH_KEEP.digest()))
240 .once()
241 .in_sequence(&mut seq)
242 .returning(|_| Ok(None));
243
244 last.expect_get()
245 .with(predicate::eq(DIRECTORY_WITH_KEEP.digest()))
246 .once()
247 .in_sequence(&mut seq)
248 .returning(|_| Ok(Some(DIRECTORY_WITH_KEEP.clone())));
249
250 let uut = Priority::new("uut".to_string(), [(0.into(), first), (1.into(), last)]);
251
252 assert_eq!(
253 Some(DIRECTORY_WITH_KEEP.clone()),
254 uut.get(&DIRECTORY_WITH_KEEP.digest())
255 .await
256 .expect("to succeed")
257 )
258 }
259
260 #[tokio::test]
262 async fn get_first_then_last_not_found() {
263 let mut first = MockDirectoryService::new();
264 let mut last = MockDirectoryService::new();
265 let mut seq = Sequence::new();
266
267 first
268 .expect_get()
269 .with(predicate::eq(DIRECTORY_WITH_KEEP.digest()))
270 .once()
271 .in_sequence(&mut seq)
272 .returning(|_| Ok(None));
273
274 last.expect_get()
275 .with(predicate::eq(DIRECTORY_WITH_KEEP.digest()))
276 .once()
277 .in_sequence(&mut seq)
278 .returning(|_| Ok(None));
279
280 let uut = Priority::new("uut".to_string(), [(0.into(), first), (1.into(), last)]);
281
282 assert_eq!(
283 None,
284 uut.get(&DIRECTORY_WITH_KEEP.digest())
285 .await
286 .expect("to succeed")
287 )
288 }
289
290 #[tokio::test]
293 async fn get_bubble_up_error_first() {
294 let mut first = MockDirectoryService::new();
295 let mut last = MockDirectoryService::new();
296
297 first
298 .expect_get()
299 .with(predicate::eq(DIRECTORY_WITH_KEEP.digest()))
300 .once()
301 .returning(|_| Err(directoryservice::Error("oh no".into())));
302
303 last.expect_get().never();
304
305 let uut = Priority::new("uut".to_string(), [(0.into(), first), (1.into(), last)]);
306
307 let err = uut
308 .get(&DIRECTORY_WITH_KEEP.digest())
309 .await
310 .expect_err("must fail")
311 .0;
312
313 let err = err.downcast_ref::<Error>().unwrap();
314 assert_matches!(err, Error::Backend(Prio(0), _));
315 }
316
317 #[tokio::test]
319 async fn get_recursive_first() {
320 let mut first = MockDirectoryService::new();
321 let mut last = MockDirectoryService::new();
322
323 first
324 .expect_get_recursive()
325 .with(predicate::eq(DIRECTORY_B.digest()))
326 .once()
327 .returning(|_| {
328 futures::stream::iter([Ok(DIRECTORY_B.clone()), Ok(DIRECTORY_A.clone())]).boxed()
329 });
330 last.expect_get_recursive().never();
331
332 let uut = Priority::new("uut".to_string(), [(0.into(), first), (1.into(), last)]);
333
334 let directories = uut
335 .get_recursive(&DIRECTORY_B.digest())
336 .try_collect::<Vec<_>>()
337 .await
338 .expect("to succeed");
339
340 assert_eq!(vec![DIRECTORY_B.clone(), DIRECTORY_A.clone()], directories);
341 }
342
343 #[tokio::test]
345 async fn get_recursive_second() {
346 let mut first = MockDirectoryService::new();
347 let mut last = MockDirectoryService::new();
348 let mut seq = Sequence::new();
349
350 first
351 .expect_get_recursive()
352 .with(predicate::eq(DIRECTORY_B.digest()))
353 .once()
354 .in_sequence(&mut seq)
355 .returning(|_| futures::stream::empty().boxed());
356
357 last.expect_get_recursive()
358 .with(predicate::eq(DIRECTORY_B.digest()))
359 .once()
360 .in_sequence(&mut seq)
361 .returning(|_| {
362 futures::stream::iter([Ok(DIRECTORY_B.clone()), Ok(DIRECTORY_A.clone())]).boxed()
363 });
364
365 let uut = Priority::new("uut".to_string(), [(0.into(), first), (1.into(), last)]);
366
367 let directories = uut
368 .get_recursive(&DIRECTORY_B.digest())
369 .try_collect::<Vec<_>>()
370 .await
371 .expect("to succeed");
372
373 assert_eq!(vec![DIRECTORY_B.clone(), DIRECTORY_A.clone()], directories);
374 }
375
376 #[tokio::test]
378 async fn get_recursive_error_first() {
379 let mut first = MockDirectoryService::new();
380 let mut last = MockDirectoryService::new();
381
382 first
383 .expect_get_recursive()
384 .with(predicate::eq(DIRECTORY_B.digest()))
385 .once()
386 .returning(|_| {
387 futures::stream::iter([Err(directoryservice::Error("oh no".into()))]).boxed()
388 });
389
390 last.expect_get_recursive().never();
391
392 let uut = Priority::new("uut".to_string(), [(0.into(), first), (1.into(), last)]);
393
394 let err = uut
395 .get_recursive(&DIRECTORY_B.digest())
396 .try_collect::<Vec<_>>()
397 .await
398 .expect_err("to fail")
399 .0;
400
401 let err = err.downcast_ref::<Error>().unwrap();
402 assert_matches!(err, Error::Backend(Prio(0), _));
403 }
404
405 #[tokio::test]
407 async fn put_unsupported() {
408 let mut first = MockDirectoryService::new();
409 first.expect_put().never();
410
411 let uut = Priority::new("uut".to_string(), [(0.into(), first)]);
412
413 let err = uut
414 .put(DIRECTORY_WITH_KEEP.clone())
415 .await
416 .expect_err("must fail")
417 .0;
418
419 let err = err.downcast_ref::<Error>().unwrap();
420 assert_matches!(err, Error::Unimplemented);
421 }
422
423 #[tokio::test]
425 async fn put_recursive_unsupported() {
426 let mut first = MockDirectoryService::new();
427 first.expect_put().never();
428
429 let uut = Priority::new("uut".to_string(), [(0.into(), first)]);
430
431 let mut handle = uut.put_multiple_start();
432 let err = handle
433 .put(DIRECTORY_WITH_KEEP.clone())
434 .await
435 .expect_err("must fail")
436 .0;
437
438 let err = err.downcast_ref::<Error>().unwrap();
439 assert_matches!(err, Error::Unimplemented);
440 }
441}