snix_store/pathinfoservice/redb.rs

1use super::{PathInfo, PathInfoService};
2use crate::proto;
3use data_encoding::BASE64;
4use futures::{StreamExt, stream::BoxStream};
5use prost::Message;
6use redb::{Database, ReadableDatabase, ReadableTable, TableDefinition};
7use snix_castore::{
8    Error,
9    composition::{CompositionContext, ServiceBuilder},
10};
11use std::{path::PathBuf, sync::Arc};
12use tokio_stream::wrappers::ReceiverStream;
13use tonic::async_trait;
14use tracing::{instrument, warn};
15
16const PATHINFO_TABLE: TableDefinition<[u8; 20], Vec<u8>> = TableDefinition::new("pathinfo");
17
18/// PathInfoService implementation using redb under the hood.
19/// redb stores all of its data in a single file with a K/V pointing from a path's output hash to
20/// its corresponding protobuf-encoded PathInfo.
21pub struct RedbPathInfoService {
22    instance_name: String,
23    // We wrap db in an Arc to be able to move it into spawn_blocking,
24    // as discussed in https://github.com/cberner/redb/issues/789
25    db: Arc<Database>,
26}
27
28impl RedbPathInfoService {
29    /// Constructs a new instance using the specified file system path for
30    /// storage.
31    pub async fn new(instance_name: String, path: PathBuf) -> Result<Self, Error> {
32        if &path == "/" {
33            return Err(Error::StorageError(
34                "cowardly refusing to open / with redb".to_string(),
35            ));
36        }
37
38        if let Some(parent) = path.parent() {
39            std::fs::create_dir_all(parent)?;
40        }
41
42        let db = tokio::task::spawn_blocking(|| -> Result<_, redb::Error> {
43            let db = redb::Database::builder().create(path)?;
44
45            create_schema(&db)?;
46            Ok(db)
47        })
48        .await??;
49
50        Ok(Self {
51            instance_name,
52            db: Arc::new(db),
53        })
54    }
55
56    /// Constructs a new instance using the in-memory backend.
57    pub fn new_temporary(instance_name: String) -> Result<Self, Error> {
58        let db =
59            redb::Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?;
60
61        create_schema(&db)?;
62
63        Ok(Self {
64            instance_name,
65            db: Arc::new(db),
66        })
67    }
68}
69
70/// Ensures all tables are present.
71/// Opens a write transaction and calls open_table on PATHINFO_TABLE, which will
72/// create it if not present.
73#[allow(clippy::result_large_err)]
74fn create_schema(db: &redb::Database) -> Result<(), redb::Error> {
75    let txn = db.begin_write()?;
76    txn.open_table(PATHINFO_TABLE)?;
77    txn.commit()?;
78
79    Ok(())
80}
81
82#[async_trait]
83impl PathInfoService for RedbPathInfoService {
84    #[instrument(level = "trace", skip_all, fields(path_info.digest = BASE64.encode(&digest), instance_name = %self.instance_name))]
85    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
86        let db = self.db.clone();
87
88        tokio::task::spawn_blocking({
89            move || {
90                let txn = db.begin_read()?;
91                let table = txn.open_table(PATHINFO_TABLE)?;
92                match table.get(digest)? {
93                    Some(pathinfo_bytes) => Ok(Some(
94                        proto::PathInfo::decode(pathinfo_bytes.value().as_slice())
95                            .map_err(|e| {
96                                warn!(err=%e, "failed to decode stored PathInfo");
97                                Error::StorageError("failed to decode stored PathInfo".to_string())
98                            })?
99                            .try_into()
100                            .map_err(|e| Error::StorageError(format!("Invalid path info: {e}")))?,
101                    )),
102                    None => Ok(None),
103                }
104            }
105        })
106        .await?
107    }
108
109    #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node, instance_name = %self.instance_name))]
110    async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
111        let db = self.db.clone();
112
113        tokio::task::spawn_blocking({
114            let path_info = path_info.clone();
115            move || -> Result<(), Error> {
116                let txn = db.begin_write()?;
117                {
118                    let mut table = txn.open_table(PATHINFO_TABLE)?;
119                    table
120                        .insert(
121                            *path_info.store_path.digest(),
122                            proto::PathInfo::from(path_info).encode_to_vec(),
123                        )
124                        .map_err(|e| {
125                            warn!(err=%e, "failed to insert PathInfo");
126                            Error::StorageError("failed to insert PathInfo".to_string())
127                        })?;
128                }
129                Ok(txn.commit()?)
130            }
131        })
132        .await??;
133
134        Ok(path_info)
135    }
136
137    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
138        let db = self.db.clone();
139        let (tx, rx) = tokio::sync::mpsc::channel(64);
140
141        tokio::task::spawn_blocking(move || {
142            // IIFE to be able to use ? for the error cases
143            let result = (|| -> Result<(), Error> {
144                let read_txn = db.begin_read().map_err(|err| {
145                    warn!(%err, "failed to open read transaction");
146                    Error::StorageError("failed to open read transaction".to_string())
147                })?;
148
149                let table = read_txn.open_table(PATHINFO_TABLE).map_err(|err| {
150                    warn!(%err, "failed to open table");
151                    Error::StorageError("failed to open table".to_string())
152                })?;
153
154                let table_iter = table.iter().map_err(|err| {
155                    warn!(%err, "failed to iterate over table items");
156                    Error::StorageError("failed to iterate over table items".into())
157                })?;
158
159                for elem in table_iter {
160                    let elem = elem.map_err(|err| {
161                        warn!(%err, "failed to retrieve item");
162                        Error::StorageError("failed to retrieve item".into())
163                    })?;
164
165                    let path_info_proto = proto::PathInfo::decode(elem.1.value().as_slice())
166                        .map_err(|err| {
167                            warn!(%err, "invalid PathInfo");
168                            Error::StorageError("invalid PathInfo".into())
169                        })?;
170
171                    let path_info = PathInfo::try_from(path_info_proto)
172                        .map_err(|e| Error::StorageError(format!("Invalid path info: {e}")))?;
173
174                    if tx.blocking_send(Ok(path_info)).is_err() {
175                        break;
176                    }
177                }
178
179                Ok(())
180            })();
181
182            if let Err(err) = result {
183                let _ = tx.blocking_send(Err(err));
184            }
185        });
186
187        ReceiverStream::new(rx).boxed()
188    }
189}
190
191#[derive(serde::Deserialize)]
192#[serde(deny_unknown_fields)]
193pub struct RedbPathInfoServiceConfig {
194    is_temporary: bool,
195    #[serde(default)]
196    /// required when is_temporary = false
197    path: Option<PathBuf>,
198}
199
200impl TryFrom<url::Url> for RedbPathInfoServiceConfig {
201    type Error = Box<dyn std::error::Error + Send + Sync>;
202    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
203        // redb doesn't support host, and a path can be provided (otherwise it'll live in memory only)
204        if url.has_host() {
205            return Err(Error::StorageError("no host allowed".to_string()).into());
206        }
207
208        Ok(if url.path().is_empty() {
209            RedbPathInfoServiceConfig {
210                is_temporary: true,
211                path: None,
212            }
213        } else {
214            RedbPathInfoServiceConfig {
215                is_temporary: false,
216                path: Some(url.path().into()),
217            }
218        })
219    }
220}
221
222#[async_trait]
223impl ServiceBuilder for RedbPathInfoServiceConfig {
224    type Output = dyn PathInfoService;
225    async fn build<'a>(
226        &'a self,
227        instance_name: &str,
228        _context: &CompositionContext,
229    ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
230        match self {
231            RedbPathInfoServiceConfig {
232                is_temporary: true,
233                path: None,
234            } => Ok(Arc::new(RedbPathInfoService::new_temporary(
235                instance_name.to_string(),
236            )?)),
237            RedbPathInfoServiceConfig {
238                is_temporary: true,
239                path: Some(_),
240            } => Err(
241                Error::StorageError("Temporary RedbPathInfoService can not have path".into())
242                    .into(),
243            ),
244            RedbPathInfoServiceConfig {
245                is_temporary: false,
246                path: None,
247            } => Err(Error::StorageError("RedbPathInfoService is missing path".into()).into()),
248            RedbPathInfoServiceConfig {
249                is_temporary: false,
250                path: Some(path),
251            } => Ok(Arc::new(
252                RedbPathInfoService::new(instance_name.to_string(), path.to_owned()).await?,
253            )),
254        }
255    }
256}