snix_store/pathinfoservice/
cache.rs1use std::sync::Arc;
2
3use futures::stream::BoxStream;
4use nix_compat::nixbase32;
5use snix_castore::Error;
6use snix_castore::composition::{CompositionContext, ServiceBuilder};
7use tonic::async_trait;
8use tracing::{debug, instrument};
9
10use super::{PathInfo, PathInfoService};
11
12pub struct Cache<PS1, PS2> {
18 instance_name: String,
19 near: PS1,
20 far: PS2,
21}
22
23impl<PS1, PS2> Cache<PS1, PS2> {
24 pub fn new(instance_name: String, near: PS1, far: PS2) -> Self {
25 Self {
26 instance_name,
27 near,
28 far,
29 }
30 }
31}
32
33#[async_trait]
34impl<PS1, PS2> PathInfoService for Cache<PS1, PS2>
35where
36 PS1: PathInfoService,
37 PS2: PathInfoService,
38{
39 #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest), instance_name = %self.instance_name))]
40 async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
41 match self.near.get(digest).await? {
42 Some(path_info) => {
43 debug!("serving from cache");
44 Ok(Some(path_info))
45 }
46 None => {
47 debug!("not found in near, asking remote…");
48 match self.far.get(digest).await? {
49 None => Ok(None),
50 Some(path_info) => {
51 debug!("found in remote, adding to cache");
52 self.near.put(path_info.clone()).await?;
53 Ok(Some(path_info))
54 }
55 }
56 }
57 }
58 }
59
60 async fn put(&self, _path_info: PathInfo) -> Result<PathInfo, Error> {
61 Err(Error::StorageError("unimplemented".to_string()))
62 }
63
64 fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
65 Box::pin(tokio_stream::once(Err(Error::StorageError(
66 "unimplemented".to_string(),
67 ))))
68 }
69}
70
71#[derive(serde::Deserialize)]
72pub struct CacheConfig {
73 pub near: String,
74 pub far: String,
75}
76
77impl TryFrom<url::Url> for CacheConfig {
78 type Error = Box<dyn std::error::Error + Send + Sync>;
79 fn try_from(_url: url::Url) -> Result<Self, Self::Error> {
80 Err(Error::StorageError(
81 "Instantiating a CombinedPathInfoService from a url is not supported".into(),
82 )
83 .into())
84 }
85}
86
87#[async_trait]
88impl ServiceBuilder for CacheConfig {
89 type Output = dyn PathInfoService;
90 async fn build<'a>(
91 &'a self,
92 instance_name: &str,
93 context: &CompositionContext,
94 ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
95 let (near, far) = futures::join!(
96 context.resolve::<Self::Output>(self.near.clone()),
97 context.resolve::<Self::Output>(self.far.clone())
98 );
99 Ok(Arc::new(Cache {
100 instance_name: instance_name.to_string(),
101 near: near?,
102 far: far?,
103 }))
104 }
105}
106
107#[cfg(test)]
108mod test {
109 use std::num::NonZeroUsize;
110
111 use crate::{
112 fixtures::PATH_INFO,
113 pathinfoservice::{LruPathInfoService, MemoryPathInfoService, PathInfoService},
114 };
115
116 async fn create_pathinfoservice() -> super::Cache<LruPathInfoService, MemoryPathInfoService> {
119 let far = MemoryPathInfoService::default();
121
122 let near = LruPathInfoService::with_capacity("test".into(), NonZeroUsize::new(1).unwrap());
124
125 super::Cache::new("test".into(), near, far)
127 }
128
129 #[tokio::test]
131 async fn test_populate_cache() {
132 let svc = create_pathinfoservice().await;
133
134 assert!(
136 svc.get(*PATH_INFO.store_path.digest())
137 .await
138 .unwrap()
139 .is_none()
140 );
141
142 svc.far.put(PATH_INFO.clone()).await.unwrap();
144
145 assert_eq!(
147 Some(PATH_INFO.clone()),
148 svc.get(*PATH_INFO.store_path.digest()).await.unwrap()
149 );
150
151 assert_eq!(
153 Some(PATH_INFO.clone()),
154 svc.near.get(*PATH_INFO.store_path.digest()).await.unwrap()
155 );
156 }
157}