snix_store/pathinfoservice/
lru.rs

1use async_stream::try_stream;
2use futures::stream::BoxStream;
3use lru::LruCache;
4use nix_compat::nixbase32;
5use std::num::NonZeroUsize;
6use std::sync::Arc;
7use tokio::sync::RwLock;
8use tonic::async_trait;
9use tracing::instrument;
10
11use snix_castore::composition::{CompositionContext, ServiceBuilder};
12
13use crate::pathinfoservice;
14
15use super::{PathInfo, PathInfoService};
16
17pub struct LruPathInfoService {
18    instance_name: String,
19    lru: Arc<RwLock<LruCache<[u8; 20], PathInfo>>>,
20}
21
22impl LruPathInfoService {
23    pub fn with_capacity(instance_name: String, capacity: NonZeroUsize) -> Self {
24        Self {
25            instance_name,
26            lru: Arc::new(RwLock::new(LruCache::new(capacity))),
27        }
28    }
29}
30
31#[async_trait]
32impl PathInfoService for LruPathInfoService {
33    #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest), instance_name = %self.instance_name))]
34    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, pathinfoservice::Error> {
35        Ok(self.lru.write().await.get(&digest).cloned())
36    }
37
38    #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node, instance_name = %self.instance_name))]
39    async fn put(&self, path_info: PathInfo) -> Result<PathInfo, pathinfoservice::Error> {
40        self.lru
41            .write()
42            .await
43            .put(*path_info.store_path.digest(), path_info.clone());
44
45        Ok(path_info)
46    }
47
48    fn list(&self) -> BoxStream<'static, Result<PathInfo, pathinfoservice::Error>> {
49        let lru = self.lru.clone();
50        Box::pin(try_stream! {
51            let lru = lru.read().await;
52            let it = lru.iter();
53
54            for (_k,v) in it {
55                yield v.clone()
56            }
57        })
58    }
59}
60
61#[derive(thiserror::Error, Debug)]
62pub enum Error {
63    #[error("instantiating from a url is not supported")]
64    URLNotSupported,
65}
66
67#[derive(serde::Deserialize, Debug)]
68#[serde(deny_unknown_fields)]
69pub struct LruPathInfoServiceConfig {
70    capacity: NonZeroUsize,
71}
72
73impl TryFrom<url::Url> for LruPathInfoServiceConfig {
74    type Error = Box<dyn std::error::Error + Send + Sync>;
75    fn try_from(_url: url::Url) -> Result<Self, Self::Error> {
76        Err(Error::URLNotSupported)?
77    }
78}
79
80#[async_trait]
81impl ServiceBuilder for LruPathInfoServiceConfig {
82    type Output = dyn PathInfoService;
83    async fn build<'a>(
84        &'a self,
85        instance_name: &str,
86        _context: &CompositionContext,
87    ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
88        Ok(Arc::new(LruPathInfoService::with_capacity(
89            instance_name.to_string(),
90            self.capacity,
91        )))
92    }
93}
94
#[cfg(test)]
mod test {
    use nix_compat::store_path::StorePath;
    use std::{num::NonZeroUsize, sync::LazyLock};

    use crate::{
        fixtures::PATH_INFO,
        pathinfoservice::{LruPathInfoService, PathInfo, PathInfoService},
    };

    /// A copy of the `PATH_INFO` fixture with its store path replaced, so it
    /// can serve as a second, distinct entry.
    static PATHINFO_2: LazyLock<PathInfo> = LazyLock::new(|| {
        let mut second = PATH_INFO.clone();
        second.store_path = StorePath::from_name_and_digest_fixed("dummy", [1; 20]).unwrap();
        second
    });

    /// Store path digest of [`PATHINFO_2`].
    static PATHINFO_2_DIGEST: LazyLock<[u8; 20]> =
        LazyLock::new(|| *PATHINFO_2.store_path.digest());

    /// With a capacity of one, inserting a second entry evicts the first.
    #[tokio::test]
    async fn evict() {
        let capacity = NonZeroUsize::new(1).unwrap();
        let svc = LruPathInfoService::with_capacity("test".into(), capacity);
        let digest_1 = *PATH_INFO.store_path.digest();

        // Nothing has been inserted yet, so the lookup must miss.
        let before_insert = svc.get(digest_1).await.expect("no error");
        assert!(before_insert.is_none());

        // After inserting the first entry it can be read back.
        svc.put(PATH_INFO.clone()).await.expect("no error");
        let after_insert = svc.get(digest_1).await.expect("no error");
        assert_eq!(Some(PATH_INFO.clone()), after_insert);

        // Inserting the second entry exceeds the capacity of one …
        svc.put(PATHINFO_2.clone()).await.expect("no error");

        // … so the second entry is present …
        let second_lookup = svc.get(*PATHINFO_2_DIGEST).await.expect("no error");
        assert_eq!(Some(PATHINFO_2.clone()), second_lookup);

        // … and the first one has been evicted.
        let first_lookup = svc.get(digest_1).await.expect("no error");
        assert!(first_lookup.is_none());
    }
}