snix_store/pathinfoservice/
lru.rs1use async_stream::try_stream;
2use futures::stream::BoxStream;
3use lru::LruCache;
4use nix_compat::nixbase32;
5use std::num::NonZeroUsize;
6use std::sync::Arc;
7use tokio::sync::RwLock;
8use tonic::async_trait;
9use tracing::instrument;
10
11use snix_castore::composition::{CompositionContext, ServiceBuilder};
12
13use crate::pathinfoservice;
14
15use super::{PathInfo, PathInfoService};
16
17pub struct LruPathInfoService {
18 instance_name: String,
19 lru: Arc<RwLock<LruCache<[u8; 20], PathInfo>>>,
20}
21
22impl LruPathInfoService {
23 pub fn with_capacity(instance_name: String, capacity: NonZeroUsize) -> Self {
24 Self {
25 instance_name,
26 lru: Arc::new(RwLock::new(LruCache::new(capacity))),
27 }
28 }
29}
30
31#[async_trait]
32impl PathInfoService for LruPathInfoService {
33 #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest), instance_name = %self.instance_name))]
34 async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, pathinfoservice::Error> {
35 Ok(self.lru.write().await.get(&digest).cloned())
36 }
37
38 #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node, instance_name = %self.instance_name))]
39 async fn put(&self, path_info: PathInfo) -> Result<PathInfo, pathinfoservice::Error> {
40 self.lru
41 .write()
42 .await
43 .put(*path_info.store_path.digest(), path_info.clone());
44
45 Ok(path_info)
46 }
47
48 fn list(&self) -> BoxStream<'static, Result<PathInfo, pathinfoservice::Error>> {
49 let lru = self.lru.clone();
50 Box::pin(try_stream! {
51 let lru = lru.read().await;
52 let it = lru.iter();
53
54 for (_k,v) in it {
55 yield v.clone()
56 }
57 })
58 }
59}
60
61#[derive(thiserror::Error, Debug)]
62pub enum Error {
63 #[error("instantiating from a url is not supported")]
64 URLNotSupported,
65}
66
67#[derive(serde::Deserialize, Debug)]
68#[serde(deny_unknown_fields)]
69pub struct LruPathInfoServiceConfig {
70 capacity: NonZeroUsize,
71}
72
73impl TryFrom<url::Url> for LruPathInfoServiceConfig {
74 type Error = Box<dyn std::error::Error + Send + Sync>;
75 fn try_from(_url: url::Url) -> Result<Self, Self::Error> {
76 Err(Error::URLNotSupported)?
77 }
78}
79
80#[async_trait]
81impl ServiceBuilder for LruPathInfoServiceConfig {
82 type Output = dyn PathInfoService;
83 async fn build<'a>(
84 &'a self,
85 instance_name: &str,
86 _context: &CompositionContext,
87 ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
88 Ok(Arc::new(LruPathInfoService::with_capacity(
89 instance_name.to_string(),
90 self.capacity,
91 )))
92 }
93}
94
#[cfg(test)]
mod test {
    use nix_compat::store_path::StorePath;
    use std::{num::NonZeroUsize, sync::LazyLock};

    use crate::{
        fixtures::PATH_INFO,
        pathinfoservice::{LruPathInfoService, PathInfo, PathInfoService},
    };

    /// A copy of the `PATH_INFO` fixture stored under a different store path,
    /// so that it lands under a different cache key.
    static PATHINFO_2: LazyLock<PathInfo> = LazyLock::new(|| {
        let mut second = PATH_INFO.clone();
        second.store_path = StorePath::from_name_and_digest_fixed("dummy", [1; 20]).unwrap();
        second
    });

    /// Digest (cache key) of [`PATHINFO_2`].
    static PATHINFO_2_DIGEST: LazyLock<[u8; 20]> =
        LazyLock::new(|| *PATHINFO_2.store_path.digest());

    /// With a capacity of one, inserting a second entry must evict the first.
    #[tokio::test]
    async fn evict() {
        let capacity = NonZeroUsize::new(1).unwrap();
        let svc = LruPathInfoService::with_capacity("test".into(), capacity);
        let first_digest = *PATH_INFO.store_path.digest();

        // A fresh cache does not know the first entry.
        let before_insert = svc.get(first_digest).await.expect("no error");
        assert!(before_insert.is_none());

        // Insert the first entry...
        svc.put(PATH_INFO.clone()).await.expect("no error");

        // ...and read it back.
        let after_first_insert = svc.get(first_digest).await.expect("no error");
        assert_eq!(Some(PATH_INFO.clone()), after_first_insert);

        // Insert the second entry, exceeding the capacity of one.
        svc.put(PATHINFO_2.clone()).await.expect("no error");

        // The second entry is present.
        let second_lookup = svc.get(*PATHINFO_2_DIGEST).await.expect("no error");
        assert_eq!(Some(PATHINFO_2.clone()), second_lookup);

        // The first entry has been evicted.
        let first_after_eviction = svc.get(first_digest).await.expect("no error");
        assert!(first_after_eviction.is_none());
    }
}