snix_store/pathinfoservice/
cache.rs

1use std::sync::Arc;
2
3use futures::{TryStreamExt, stream::BoxStream};
4use nix_compat::nixbase32;
5use snix_castore::composition::{CompositionContext, ServiceBuilder};
6use tonic::async_trait;
7use tracing::{debug, instrument};
8
9use crate::pathinfoservice;
10
11use super::{PathInfo, PathInfoService};
12
13/// Asks near first, if not found, asks far.
14/// If found in there, returns it, and *inserts* it into
15/// near.
16/// There is no negative cache.
17/// Inserts and listings are not implemented for now.
18pub struct Cache<PS1, PS2> {
19    instance_name: String,
20    near: PS1,
21    far: PS2,
22}
23
24impl<PS1, PS2> Cache<PS1, PS2> {
25    pub fn new(instance_name: String, near: PS1, far: PS2) -> Self {
26        Self {
27            instance_name,
28            near,
29            far,
30        }
31    }
32}
33
34#[async_trait]
35impl<PS1, PS2> PathInfoService for Cache<PS1, PS2>
36where
37    PS1: PathInfoService,
38    PS2: PathInfoService,
39{
40    #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest), instance_name = %self.instance_name))]
41    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, pathinfoservice::Error> {
42        match self.near.get(digest).await.map_err(Error::NearGet)? {
43            Some(path_info) => {
44                debug!("serving from cache");
45                Ok(Some(path_info))
46            }
47            None => {
48                debug!("not found in near, asking remote…");
49                match self.far.get(digest).await.map_err(Error::FarGet)? {
50                    None => Ok(None),
51                    Some(path_info) => {
52                        debug!("found in remote, adding to cache");
53                        self.near
54                            .put(path_info.clone())
55                            .await
56                            .map_err(Error::NearPut)?;
57                        Ok(Some(path_info))
58                    }
59                }
60            }
61        }
62    }
63
64    #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest), instance_name = %self.instance_name))]
65    async fn has(&self, digest: [u8; 20]) -> Result<bool, pathinfoservice::Error> {
66        // FUTUREWORK: queue background tasks if ! self.near.has && self.far.has ? (configurable)
67        Ok(self.near.has(digest).await.map_err(Error::NearGet)?
68            || self.far.has(digest).await.map_err(Error::FarGet)?)
69    }
70
71    async fn put(&self, _path_info: PathInfo) -> Result<PathInfo, pathinfoservice::Error> {
72        Err(Error::Unimplemented)?
73    }
74
75    fn list(&self) -> BoxStream<'static, Result<PathInfo, pathinfoservice::Error>> {
76        Box::pin(tokio_stream::once(Err(Error::Unimplemented)).err_into())
77    }
78}
79
80#[derive(serde::Deserialize)]
81#[serde(deny_unknown_fields)]
82pub struct CacheConfig {
83    pub near: String,
84    pub far: String,
85}
86
87#[derive(thiserror::Error, Debug)]
88pub enum Error {
89    #[error("instantiating from a url is not supported")]
90    URLNotSupported,
91
92    #[error("getting from near: {0}")]
93    NearGet(#[source] pathinfoservice::Error),
94    #[error("putting into near: {0}")]
95    NearPut(#[source] pathinfoservice::Error),
96    #[error("getting from far: {0}")]
97    FarGet(#[source] pathinfoservice::Error),
98
99    #[error("puts are unimplemented")]
100    Unimplemented,
101}
102
103impl TryFrom<url::Url> for CacheConfig {
104    type Error = Box<dyn std::error::Error + Send + Sync>;
105    fn try_from(_url: url::Url) -> Result<Self, Self::Error> {
106        Err(Error::URLNotSupported)?
107    }
108}
109
110#[async_trait]
111impl ServiceBuilder for CacheConfig {
112    type Output = dyn PathInfoService;
113    async fn build<'a>(
114        &'a self,
115        instance_name: &str,
116        context: &CompositionContext,
117    ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
118        let (near, far) = futures::join!(
119            context.resolve::<Self::Output>(&self.near),
120            context.resolve::<Self::Output>(&self.far)
121        );
122        Ok(Arc::new(Cache {
123            instance_name: instance_name.to_string(),
124            near: near?,
125            far: far?,
126        }))
127    }
128}
129
130#[cfg(test)]
131mod test {
132    use std::num::NonZeroUsize;
133
134    use crate::{
135        fixtures::PATH_INFO,
136        pathinfoservice::{LruPathInfoService, PathInfoService},
137        utils::gen_test_pathinfo_service,
138    };
139
140    /// Helper function setting up an instance of a Cache PathInfoService.
141    async fn create_pathinfoservice() -> super::Cache<LruPathInfoService, impl PathInfoService> {
142        // Create an instance of a "far" PathInfoService.
143        let far = gen_test_pathinfo_service();
144
145        // … and an instance of a "near" PathInfoService.
146        let near = LruPathInfoService::with_capacity("near".into(), NonZeroUsize::new(1).unwrap());
147
148        // create a Pathinfoservice combining the two and return it.
149        super::Cache::new("root".into(), near, far)
150    }
151
152    /// Getting from the far backend is gonna insert it into the near one.
153    #[tokio::test]
154    async fn test_populate_cache() {
155        let svc = create_pathinfoservice().await;
156
157        // query the PathInfo, things should not be there.
158        assert!(
159            svc.get(*PATH_INFO.store_path.digest())
160                .await
161                .unwrap()
162                .is_none()
163        );
164
165        // insert it into the far one.
166        svc.far.put(PATH_INFO.clone()).await.unwrap();
167
168        // now try getting it again, it should succeed.
169        assert_eq!(
170            Some(PATH_INFO.clone()),
171            svc.get(*PATH_INFO.store_path.digest()).await.unwrap()
172        );
173
174        // peek near, it should now be there.
175        assert_eq!(
176            Some(PATH_INFO.clone()),
177            svc.near.get(*PATH_INFO.store_path.digest()).await.unwrap()
178        );
179    }
180}