snix_store/pathinfoservice/
cache.rs1use std::sync::Arc;
2
3use futures::{TryStreamExt, stream::BoxStream};
4use nix_compat::nixbase32;
5use snix_castore::composition::{CompositionContext, ServiceBuilder};
6use tonic::async_trait;
7use tracing::{debug, instrument};
8
9use crate::pathinfoservice;
10
11use super::{PathInfo, PathInfoService};
12
/// A [`PathInfoService`] composed of two other services, `near` and `far`.
///
/// Lookups are answered from `near` when possible. On a miss, `far` is asked,
/// and a [`PathInfo`] found there is stored in `near` before it is returned.
pub struct Cache<PS1, PS2> {
    /// Name of this instance; recorded as a field on the tracing spans.
    instance_name: String,
    /// Service that is consulted first and filled with results from `far`.
    near: PS1,
    /// Service that is consulted when `near` has no entry.
    far: PS2,
}
23
24impl<PS1, PS2> Cache<PS1, PS2> {
25 pub fn new(instance_name: String, near: PS1, far: PS2) -> Self {
26 Self {
27 instance_name,
28 near,
29 far,
30 }
31 }
32}
33
34#[async_trait]
35impl<PS1, PS2> PathInfoService for Cache<PS1, PS2>
36where
37 PS1: PathInfoService,
38 PS2: PathInfoService,
39{
40 #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest), instance_name = %self.instance_name))]
41 async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, pathinfoservice::Error> {
42 match self.near.get(digest).await.map_err(Error::NearGet)? {
43 Some(path_info) => {
44 debug!("serving from cache");
45 Ok(Some(path_info))
46 }
47 None => {
48 debug!("not found in near, asking remote…");
49 match self.far.get(digest).await.map_err(Error::FarGet)? {
50 None => Ok(None),
51 Some(path_info) => {
52 debug!("found in remote, adding to cache");
53 self.near
54 .put(path_info.clone())
55 .await
56 .map_err(Error::NearPut)?;
57 Ok(Some(path_info))
58 }
59 }
60 }
61 }
62 }
63
64 #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest), instance_name = %self.instance_name))]
65 async fn has(&self, digest: [u8; 20]) -> Result<bool, pathinfoservice::Error> {
66 Ok(self.near.has(digest).await.map_err(Error::NearGet)?
68 || self.far.has(digest).await.map_err(Error::FarGet)?)
69 }
70
71 async fn put(&self, _path_info: PathInfo) -> Result<PathInfo, pathinfoservice::Error> {
72 Err(Error::Unimplemented)?
73 }
74
75 fn list(&self) -> BoxStream<'static, Result<PathInfo, pathinfoservice::Error>> {
76 Box::pin(tokio_stream::once(Err(Error::Unimplemented)).err_into())
77 }
78}
79
80#[derive(serde::Deserialize)]
81#[serde(deny_unknown_fields)]
82pub struct CacheConfig {
83 pub near: String,
84 pub far: String,
85}
86
87#[derive(thiserror::Error, Debug)]
88pub enum Error {
89 #[error("instantiating from a url is not supported")]
90 URLNotSupported,
91
92 #[error("getting from near: {0}")]
93 NearGet(#[source] pathinfoservice::Error),
94 #[error("putting into near: {0}")]
95 NearPut(#[source] pathinfoservice::Error),
96 #[error("getting from far: {0}")]
97 FarGet(#[source] pathinfoservice::Error),
98
99 #[error("puts are unimplemented")]
100 Unimplemented,
101}
102
103impl TryFrom<url::Url> for CacheConfig {
104 type Error = Box<dyn std::error::Error + Send + Sync>;
105 fn try_from(_url: url::Url) -> Result<Self, Self::Error> {
106 Err(Error::URLNotSupported)?
107 }
108}
109
110#[async_trait]
111impl ServiceBuilder for CacheConfig {
112 type Output = dyn PathInfoService;
113 async fn build<'a>(
114 &'a self,
115 instance_name: &str,
116 context: &CompositionContext,
117 ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
118 let (near, far) = futures::join!(
119 context.resolve::<Self::Output>(&self.near),
120 context.resolve::<Self::Output>(&self.far)
121 );
122 Ok(Arc::new(Cache {
123 instance_name: instance_name.to_string(),
124 near: near?,
125 far: far?,
126 }))
127 }
128}
129
#[cfg(test)]
mod test {
    use std::num::NonZeroUsize;

    use crate::{
        fixtures::PATH_INFO,
        pathinfoservice::{LruPathInfoService, PathInfoService},
        utils::gen_test_pathinfo_service,
    };

    /// Builds a [`super::Cache`] whose `near` side is an LRU service with a
    /// capacity of one entry and whose `far` side is the generic test service.
    async fn create_pathinfoservice() -> super::Cache<LruPathInfoService, impl PathInfoService> {
        let far = gen_test_pathinfo_service();

        let capacity = NonZeroUsize::new(1).unwrap();
        let near = LruPathInfoService::with_capacity("near".into(), capacity);

        super::Cache::new("root".into(), near, far)
    }

    /// A path that only exists in `far` is returned by the cache and ends up
    /// in `near` afterwards.
    #[tokio::test]
    async fn test_populate_cache() {
        let cache = create_pathinfoservice().await;
        let digest = *PATH_INFO.store_path.digest();

        // Nothing has been inserted yet, so the lookup has to miss.
        let before_insert = cache.get(digest).await.unwrap();
        assert!(before_insert.is_none());

        // Insert into `far` directly, bypassing the cache.
        cache.far.put(PATH_INFO.clone()).await.unwrap();

        // The cache now finds the path via `far` ...
        let via_cache = cache.get(digest).await.unwrap();
        assert_eq!(Some(PATH_INFO.clone()), via_cache);

        // ... and has copied it into `near` as part of that lookup.
        let in_near = cache.near.get(digest).await.unwrap();
        assert_eq!(Some(PATH_INFO.clone()), in_near);
    }
}