// snix_castore/directoryservice/combinators/priority.rs

use std::{
    collections::{BTreeMap, btree_map},
    fmt::Display,
    sync::Arc,
};

use futures::{StreamExt, TryStreamExt, stream::BoxStream};
use tonic::async_trait;
use tracing::instrument;

use crate::{
    B3Digest, Directory,
    composition::{CompositionContext, CompositionError, ServiceBuilder},
    directoryservice::{self, DirectoryPutter, DirectoryService},
};
16
/// Holds references to many different directory services, each with an associated priority.
///
/// Read requests try services sequentially, sorted by their priority, ascending.
/// Any error in a service bubbles up.
/// Write requests are not implemented.
pub struct Priority<DS> {
    /// Name of this combinator instance, recorded on the tracing spans of its methods.
    instance_name: String,
    /// The services, keyed by their priority.
    // NOTE: Arc<dyn DS> implements DS too, so you can put different service types in here.
    services: BTreeMap<Prio, DS>,
}

/// Priority of a service inside a [`Priority`] combinator.
///
/// Services are visited in ascending order, so lower values are tried first.
#[derive(Clone, Copy, Eq, PartialEq, PartialOrd, Ord, Debug)]
pub struct Prio(u64);

impl From<u64> for Prio {
    fn from(value: u64) -> Self {
        Self(value)
    }
}

impl Display for Prio {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Delegate to the inner integer so width/fill/alignment flags are honoured.
        Display::fmt(&self.0, f)
    }
}

impl<DS> Priority<DS> {
    /// Construct from an iterator of priorities and services.
    ///
    /// The iteration order of `iter` does not matter; services are stored
    /// sorted by their priority.
    ///
    /// # Panics
    ///
    /// Panics if two services share the same priority. Racing services of
    /// equal priority against each other is not implemented.
    pub fn new<I: IntoIterator<Item = (Prio, DS)>>(instance_name: String, iter: I) -> Priority<DS> {
        let mut services = BTreeMap::new();

        for (prio, service) in iter {
            match services.entry(prio) {
                btree_map::Entry::Vacant(entry) => {
                    entry.insert(service);
                }
                btree_map::Entry::Occupied(_entry) => {
                    unimplemented!(
                        "already got another service at prio {prio}, race not implemented"
                    );
                }
            }
        }

        Self {
            instance_name,
            services,
        }
    }
}
68
69#[async_trait]
70impl<DS> DirectoryService for Priority<DS>
71where
72    DS: DirectoryService,
73{
74    #[instrument(skip(self, digest), fields(directory.digest = %digest, instance_name = %self.instance_name))]
75    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, directoryservice::Error> {
76        // traverse the list of services in priority order. If any service has it, return from there.
77        // Errors cause the combinator to bail out early.
78        for (prio, service) in self.services.iter() {
79            if let Some(directory) = service
80                .get(digest)
81                .await
82                .map_err(|err| Error::Backend(*prio, err))?
83            {
84                return Ok(Some(directory));
85            }
86        }
87
88        Ok(None)
89    }
90
91    #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))]
92    fn get_recursive(
93        &self,
94        root_directory_digest: &B3Digest,
95    ) -> BoxStream<'_, Result<Directory, directoryservice::Error>> {
96        let digest = *root_directory_digest;
97        async_stream::try_stream! {
98            for (prio, service) in self.services.iter() {
99                let mut directories_stream = service.get_recursive(&digest);
100                // Once a service said it has a closure (non-empty stream), we return everything from there, including errors.
101                if let Some(directory) = directories_stream.try_next().await.map_err(|err| { Error::Backend(*prio, err)})? {
102                    yield directory;
103
104                    while let Some(directory) = directories_stream.try_next().await.map_err(|err| { Error::Backend(*prio, err)})? {
105                        yield directory;
106                    }
107                    // we're done
108                    return;
109                }
110                // try the next service in the list
111            }
112        }
113        .boxed()
114    }
115
116    #[instrument(skip_all, fields(instance_name = %self.instance_name))]
117    async fn put(&self, _directory: Directory) -> Result<B3Digest, directoryservice::Error> {
118        Err(Error::Unimplemented.into())
119    }
120
121    #[instrument(skip_all)]
122    fn put_multiple_start(&self) -> Box<dyn DirectoryPutter + '_> {
123        struct FailingPutter();
124
125        #[async_trait]
126        impl DirectoryPutter for FailingPutter {
127            async fn put(&mut self, _directory: Directory) -> Result<(), directoryservice::Error> {
128                Err(Error::Unimplemented)?
129            }
130            async fn close(&mut self) -> Result<B3Digest, directoryservice::Error> {
131                Err(Error::Unimplemented)?
132            }
133        }
134
135        Box::new(FailingPutter())
136    }
137}
138
139#[derive(thiserror::Error, Debug)]
140pub enum Error {
141    #[error("wrong arguments: {0}")]
142    WrongConfig(&'static str),
143
144    #[error("error from service with prio {0}")]
145    Backend(Prio, #[source] directoryservice::Error),
146
147    #[error("puts are unimplemented")]
148    Unimplemented,
149}
150
151impl From<Error> for directoryservice::Error {
152    fn from(value: Error) -> Self {
153        Self(Box::new(value))
154    }
155}
156
157#[derive(serde::Deserialize, Debug)]
158#[serde(deny_unknown_fields)]
159pub struct PriorityConfig {
160    services: BTreeMap<u64, String>,
161}
162
163impl TryFrom<url::Url> for PriorityConfig {
164    type Error = Box<dyn std::error::Error + Send + Sync>;
165    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
166        if url.has_authority() || !url.path().is_empty() {
167            return Err(Error::WrongConfig("no authority or path allowed").into());
168        }
169        Ok(serde_qs::from_str(url.query().unwrap_or_default())?)
170    }
171}
172
173#[async_trait]
174impl ServiceBuilder for PriorityConfig {
175    type Output = dyn DirectoryService;
176    async fn build<'a>(
177        &'a self,
178        instance_name: &str,
179        context: &CompositionContext,
180    ) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync>> {
181        let services = futures::future::try_join_all(self.services.iter().map(
182            |(prio, instance_ref)| async move {
183                Ok::<_, CompositionError>((
184                    Prio::from(*prio),
185                    context.resolve::<Self::Output>(instance_ref).await?,
186                ))
187            },
188        ))
189        .await?;
190
191        Ok(Arc::new(Priority::new(instance_name.to_string(), services)))
192    }
193}
194
#[cfg(test)]
mod test {
    use mockall::{Sequence, predicate};
    use pretty_assertions::{assert_eq, assert_matches};

    use super::*;
    use crate::{
        directoryservice::MockDirectoryService,
        fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_WITH_KEEP},
    };

    /// If first has something, last is never tried.
    #[tokio::test]
    async fn get_first_gets_tried_only() {
        let mut first = MockDirectoryService::new();
        let mut last = MockDirectoryService::new();

        first
            .expect_get()
            .with(predicate::eq(DIRECTORY_WITH_KEEP.digest()))
            .once()
            .returning(|_| Ok(Some(DIRECTORY_WITH_KEEP.clone())));

        last.expect_get().never();

        let uut = Priority::new("uut".to_string(), [(0.into(), first), (1.into(), last)]);

        assert_eq!(
            Some(DIRECTORY_WITH_KEEP.clone()),
            uut.get(&DIRECTORY_WITH_KEEP.digest())
                .await
                .expect("to succeed")
        )
    }

    /// If first doesn't have it, we try last.
    #[tokio::test]
    async fn get_first_then_last() {
        let mut first = MockDirectoryService::new();
        let mut last = MockDirectoryService::new();
        let mut seq = Sequence::new();

        first
            .expect_get()
            .with(predicate::eq(DIRECTORY_WITH_KEEP.digest()))
            .once()
            .in_sequence(&mut seq)
            .returning(|_| Ok(None));

        last.expect_get()
            .with(predicate::eq(DIRECTORY_WITH_KEEP.digest()))
            .once()
            .in_sequence(&mut seq)
            .returning(|_| Ok(Some(DIRECTORY_WITH_KEEP.clone())));

        let uut = Priority::new("uut".to_string(), [(0.into(), first), (1.into(), last)]);

        assert_eq!(
            Some(DIRECTORY_WITH_KEEP.clone()),
            uut.get(&DIRECTORY_WITH_KEEP.digest())
                .await
                .expect("to succeed")
        )
    }

    /// If none of the two have it, we return None.
    #[tokio::test]
    async fn get_first_then_last_not_found() {
        let mut first = MockDirectoryService::new();
        let mut last = MockDirectoryService::new();
        let mut seq = Sequence::new();

        first
            .expect_get()
            .with(predicate::eq(DIRECTORY_WITH_KEEP.digest()))
            .once()
            .in_sequence(&mut seq)
            .returning(|_| Ok(None));

        last.expect_get()
            .with(predicate::eq(DIRECTORY_WITH_KEEP.digest()))
            .once()
            .in_sequence(&mut seq)
            .returning(|_| Ok(None));

        let uut = Priority::new("uut".to_string(), [(0.into(), first), (1.into(), last)]);

        assert_eq!(
            None,
            uut.get(&DIRECTORY_WITH_KEEP.digest())
                .await
                .expect("to succeed")
        )
    }

    /// Errors are bubbled up from the first backend emitting the error,
    /// and the error identifies the backend that emitted the error.
    #[tokio::test]
    async fn get_bubble_up_error_first() {
        let mut first = MockDirectoryService::new();
        let mut last = MockDirectoryService::new();

        first
            .expect_get()
            .with(predicate::eq(DIRECTORY_WITH_KEEP.digest()))
            .once()
            .returning(|_| Err(directoryservice::Error("oh no".into())));

        last.expect_get().never();

        let uut = Priority::new("uut".to_string(), [(0.into(), first), (1.into(), last)]);

        let err = uut
            .get(&DIRECTORY_WITH_KEEP.digest())
            .await
            .expect_err("must fail")
            .0;

        let err = err.downcast_ref::<Error>().unwrap();
        assert_matches!(err, Error::Backend(Prio(0), _));
    }

    /// If the first backend responds to get_recursive, we return from there.
    #[tokio::test]
    async fn get_recursive_first() {
        let mut first = MockDirectoryService::new();
        let mut last = MockDirectoryService::new();

        first
            .expect_get_recursive()
            .with(predicate::eq(DIRECTORY_B.digest()))
            .once()
            .returning(|_| {
                futures::stream::iter([Ok(DIRECTORY_B.clone()), Ok(DIRECTORY_A.clone())]).boxed()
            });
        last.expect_get_recursive().never();

        let uut = Priority::new("uut".to_string(), [(0.into(), first), (1.into(), last)]);

        let directories = uut
            .get_recursive(&DIRECTORY_B.digest())
            .try_collect::<Vec<_>>()
            .await
            .expect("to succeed");

        assert_eq!(vec![DIRECTORY_B.clone(), DIRECTORY_A.clone()], directories);
    }

    /// If the first one doesn't have a directory closure, return from the next.
    #[tokio::test]
    async fn get_recursive_second() {
        let mut first = MockDirectoryService::new();
        let mut last = MockDirectoryService::new();
        let mut seq = Sequence::new();

        first
            .expect_get_recursive()
            .with(predicate::eq(DIRECTORY_B.digest()))
            .once()
            .in_sequence(&mut seq)
            .returning(|_| futures::stream::empty().boxed());

        last.expect_get_recursive()
            .with(predicate::eq(DIRECTORY_B.digest()))
            .once()
            .in_sequence(&mut seq)
            .returning(|_| {
                futures::stream::iter([Ok(DIRECTORY_B.clone()), Ok(DIRECTORY_A.clone())]).boxed()
            });

        let uut = Priority::new("uut".to_string(), [(0.into(), first), (1.into(), last)]);

        let directories = uut
            .get_recursive(&DIRECTORY_B.digest())
            .try_collect::<Vec<_>>()
            .await
            .expect("to succeed");

        assert_eq!(vec![DIRECTORY_B.clone(), DIRECTORY_A.clone()], directories);
    }

    /// Propagate errors from get_recursive
    #[tokio::test]
    async fn get_recursive_error_first() {
        let mut first = MockDirectoryService::new();
        let mut last = MockDirectoryService::new();

        first
            .expect_get_recursive()
            .with(predicate::eq(DIRECTORY_B.digest()))
            .once()
            .returning(|_| {
                futures::stream::iter([Err(directoryservice::Error("oh no".into()))]).boxed()
            });

        last.expect_get_recursive().never();

        let uut = Priority::new("uut".to_string(), [(0.into(), first), (1.into(), last)]);

        let err = uut
            .get_recursive(&DIRECTORY_B.digest())
            .try_collect::<Vec<_>>()
            .await
            .expect_err("to fail")
            .0;

        let err = err.downcast_ref::<Error>().unwrap();
        assert_matches!(err, Error::Backend(Prio(0), _));
    }

    /// put is unsupported, and not sent to the backend
    #[tokio::test]
    async fn put_unsupported() {
        let mut first = MockDirectoryService::new();
        first.expect_put().never();

        let uut = Priority::new("uut".to_string(), [(0.into(), first)]);

        let err = uut
            .put(DIRECTORY_WITH_KEEP.clone())
            .await
            .expect_err("must fail")
            .0;

        let err = err.downcast_ref::<Error>().unwrap();
        assert_matches!(err, Error::Unimplemented);
    }

    /// put_multiple_start hands out a putter on which both put and close are
    /// unsupported, and nothing is sent to the backend
    #[tokio::test]
    async fn put_recursive_unsupported() {
        let mut first = MockDirectoryService::new();
        first.expect_put().never();

        let uut = Priority::new("uut".to_string(), [(0.into(), first)]);

        let mut handle = uut.put_multiple_start();
        let err = handle
            .put(DIRECTORY_WITH_KEEP.clone())
            .await
            .expect_err("must fail")
            .0;

        let err = err.downcast_ref::<Error>().unwrap();
        assert_matches!(err, Error::Unimplemented);

        // Closing the putter must fail the same way.
        let err = handle.close().await.expect_err("must fail").0;

        let err = err.downcast_ref::<Error>().unwrap();
        assert_matches!(err, Error::Unimplemented);
    }
}