// snix_store/pathinfoservice/redb.rs

1use super::{PathInfo, PathInfoService};
2use crate::{pathinfoservice, proto};
3use data_encoding::BASE64;
4use futures::{StreamExt, TryStreamExt, stream::BoxStream};
5use prost::Message;
6use redb::{ReadableDatabase, ReadableTable, TableDefinition};
7use snix_castore::composition::{CompositionContext, ServiceBuilder};
8use std::{path::PathBuf, sync::Arc};
9use tokio_stream::wrappers::ReceiverStream;
10use tonic::async_trait;
11use tracing::instrument;
12
/// Definition of the single redb table used by this service.
///
/// Keys are 20-byte store path digests, values are the protobuf-encoded
/// [proto::PathInfo] bytes (this is what the `put` implementation inserts).
const PATHINFO_TABLE: TableDefinition<[u8; 20], Vec<u8>> = TableDefinition::new("pathinfo");
14
/// The underlying redb handle, in one of the two modes it can be opened in.
enum Db {
    /// Opened read-only; [Db::begin_write] refuses to hand out write transactions.
    ReadOnly(redb::ReadOnlyDatabase),
    /// Opened (or created) writeable; supports both read and write transactions.
    ReadWrite(redb::Database),
}
19
20impl Db {
21    fn begin_read(&self) -> Result<redb::ReadTransaction, redb::TransactionError> {
22        match self {
23            Db::ReadOnly(db) => db.begin_read(),
24            Db::ReadWrite(db) => db.begin_read(),
25        }
26    }
27
28    fn begin_write(&self) -> Result<redb::WriteTransaction, Error> {
29        match self {
30            Db::ReadOnly(_) => Err(Error::OpenedReadonly),
31            Db::ReadWrite(db) => Ok(db.begin_write()?),
32        }
33    }
34}
35
/// PathInfoService implementation using redb under the hood.
/// redb stores all of its data in a single file, with a K/V table mapping a
/// store path's digest to its corresponding protobuf-encoded PathInfo.
#[derive(Clone)]
pub struct RedbPathInfoService {
    /// Name of this instance; recorded as the `instance_name` field on the
    /// tracing spans of `get` and `put`.
    instance_name: String,

    /// An Arc'ed Database, read-only or writeable.
    /// The Arc is cloned into the blocking tasks that do the actual redb work.
    db: Arc<Db>,
}
46
47impl RedbPathInfoService {
48    /// Constructs a new instance using the specified config.
49    pub async fn new(
50        instance_name: String,
51        config: RedbPathInfoServiceConfig,
52    ) -> Result<Self, Error> {
53        if let Some(path) = config.path.clone() {
54            if &path == "" {
55                return Err(Error::WrongConfig("empty path is disallowed"));
56            }
57            if &path == "/" {
58                return Err(Error::WrongConfig("cowardly refusing to open / with redb"));
59            }
60
61            if config.read_only {
62                let db = tokio::task::spawn_blocking(move || {
63                    let mut builder = redb::Database::builder();
64                    configure_builder(&mut builder, &config);
65                    builder.open_read_only(path)
66                })
67                .await??;
68
69                return Ok(Self {
70                    instance_name,
71                    db: Arc::new(Db::ReadOnly(db)),
72                });
73            }
74
75            if let Some(parent) = path.parent() {
76                tokio::fs::create_dir_all(parent).await?;
77            }
78
79            let db = tokio::task::spawn_blocking(move || {
80                let mut builder = redb::Database::builder();
81                configure_builder(&mut builder, &config);
82
83                let db = builder.create(path)?;
84                create_schema(&db)?;
85                Ok::<_, Error>(db)
86            })
87            .await??;
88
89            Ok(Self {
90                instance_name,
91                db: Arc::new(Db::ReadWrite(db)),
92            })
93        } else {
94            Self::new_temporary(instance_name, config)
95        }
96    }
97
98    /// Constructs a new instance using the in-memory backend.
99    /// Sync, as there's no real IO happening.
100    pub fn new_temporary(
101        instance_name: String,
102        config: RedbPathInfoServiceConfig,
103    ) -> Result<Self, Error> {
104        debug_assert!(
105            config.path.is_none(),
106            "Snix bug: config.path is not None, but new_temporary requested"
107        );
108
109        if config.read_only {
110            return Err(Error::WrongConfig("in-memory database cannot be read-only"));
111        }
112
113        let mut builder = redb::Database::builder();
114        configure_builder(&mut builder, &config);
115
116        let db = builder.create_with_backend(redb::backends::InMemoryBackend::new())?;
117
118        create_schema(&db)?;
119
120        Ok(RedbPathInfoService {
121            instance_name,
122            db: Arc::new(Db::ReadWrite(db)),
123        })
124    }
125}
126
127/// Applies options from [RedbPathInfoServiceConfig] to a [redb::Builder].
128fn configure_builder(builder: &mut redb::Builder, config: &RedbPathInfoServiceConfig) {
129    if let Some(cache_size) = config.cache_size {
130        builder.set_cache_size(cache_size);
131    }
132}
133
134/// Ensures all tables are present.
135/// Opens a write transaction and calls open_table on PATHINFO_TABLE, which will
136/// create it if not present.
137#[allow(clippy::result_large_err)]
138fn create_schema(db: &redb::Database) -> Result<(), Error> {
139    let txn = db.begin_write()?;
140    txn.open_table(PATHINFO_TABLE)?;
141    txn.commit()?;
142
143    Ok(())
144}
145
146#[async_trait]
147impl PathInfoService for RedbPathInfoService {
148    #[instrument(level = "trace", skip_all, fields(path_info.digest = BASE64.encode(&digest), instance_name = %self.instance_name))]
149    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, pathinfoservice::Error> {
150        let db = self.db.clone();
151
152        let path_info_bytes = match tokio::task::spawn_blocking({
153            move || -> Result<_, Error> {
154                let txn = db.begin_read()?;
155                let table = txn.open_table(PATHINFO_TABLE)?;
156                Ok(table.get(digest)?)
157            }
158        })
159        .await??
160        {
161            // The PathInfo was not found, return None.
162            None => return Ok(None),
163            Some(path_info_data) => path_info_data.value(),
164        };
165
166        let pathinfo_proto =
167            proto::PathInfo::decode(path_info_bytes.as_slice()).map_err(Error::ProtobufDecode)?;
168        let path_info = PathInfo::try_from(pathinfo_proto).map_err(Error::PathInfoValidation)?;
169
170        return Ok(Some(path_info));
171    }
172
173    #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node, instance_name = %self.instance_name))]
174    async fn put(&self, path_info: PathInfo) -> Result<PathInfo, pathinfoservice::Error> {
175        let db = self.db.clone();
176        tokio::task::spawn_blocking({
177            let path_info = path_info.clone();
178            move || -> Result<(), Error> {
179                let txn = db.begin_write()?;
180                {
181                    let mut table = txn.open_table(PATHINFO_TABLE)?;
182                    table.insert(
183                        *path_info.store_path.digest(),
184                        proto::PathInfo::from(path_info).encode_to_vec(),
185                    )?;
186                }
187                txn.commit()?;
188                Ok(())
189            }
190        })
191        .await??;
192
193        Ok(path_info)
194    }
195
196    fn list(&self) -> BoxStream<'static, Result<PathInfo, pathinfoservice::Error>> {
197        let db = self.db.clone();
198        let (tx, rx) = tokio::sync::mpsc::channel(64);
199
200        tokio::task::spawn_blocking(move || {
201            // IIFE to be able to use ? for the error cases
202            let result = (|| -> Result<(), Error> {
203                let read_txn = db.begin_read()?;
204
205                let table = read_txn.open_table(PATHINFO_TABLE)?;
206
207                let table_iter = table.iter()?;
208
209                for elem in table_iter {
210                    let path_info_proto = proto::PathInfo::decode(elem?.1.value().as_slice())?;
211
212                    let path_info = PathInfo::try_from(path_info_proto)?;
213
214                    if tx.blocking_send(Ok(path_info)).is_err() {
215                        break;
216                    }
217                }
218
219                Ok(())
220            })();
221
222            if let Err(err) = result {
223                let _ = tx.blocking_send(Err(err));
224            }
225        });
226
227        ReceiverStream::new(rx).err_into().boxed()
228    }
229}
230
/// Errors produced by the redb-backed PathInfoService and its configuration.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// The supplied configuration (or URL) is invalid; the message says why.
    #[error("wrong arguments: {0}")]
    WrongConfig(&'static str),
    /// Wraps a [serde_qs::Error].
    #[error("serde-qs error: {0}")]
    SerdeQS(#[from] serde_qs::Error),

    /// Stored bytes could not be decoded as a protobuf PathInfo.
    #[error("failed to decode protobuf: {0}")]
    ProtobufDecode(#[from] prost::DecodeError),
    /// A decoded protobuf PathInfo failed conversion into a [PathInfo].
    #[error("failed to validate PathInfo: {0}")]
    PathInfoValidation(#[from] crate::proto::ValidatePathInfoError),

    /// A write transaction was requested on a database opened read-only.
    #[error("unable to open write txn, database opened read-only")]
    OpenedReadonly,

    /// Wraps a [redb::CommitError].
    #[error("redb commit error: {0}")]
    RedbCommit(#[from] redb::CommitError),
    /// Wraps a [redb::DatabaseError] (e.g. from opening or creating the database).
    #[error("redb database error: {0}")]
    RedbDatabase(#[from] redb::DatabaseError),
    /// Wraps a generic [redb::Error].
    #[error("redb error: {0}")]
    Redb(#[from] redb::Error),
    /// Wraps a [redb::StorageError].
    #[error("redb storage error: {0}")]
    RedbStorage(#[from] redb::StorageError),
    /// Wraps a [redb::TableError] (e.g. from opening a table).
    #[error("redb table error: {0}")]
    RedbTable(#[from] redb::TableError),
    /// Wraps a [redb::TransactionError] (e.g. from beginning a transaction).
    #[error("redb txn error: {0}")]
    RedbTransaction(#[from] redb::TransactionError),

    /// A task spawned onto the blocking thread pool could not be joined.
    #[error("join error: {0}")]
    TokioJoin(#[from] tokio::task::JoinError),
    /// Wraps a [std::io::Error] (e.g. from creating the parent directory).
    #[error("io error: {0}")]
    IO(#[from] std::io::Error),
}
264
/// Configuration for [RedbPathInfoService].
///
/// Can be deserialized directly, or built from a URL via `TryFrom<url::Url>`.
#[derive(Clone, Default, serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct RedbPathInfoServiceConfig {
    /// Path of the redb database file.
    /// `None` selects the in-memory backend.
    path: Option<PathBuf>,

    /// The amount of memory (in bytes) used for caching data
    cache_size: Option<usize>,

    /// Whether to open read-only.
    #[serde(default)]
    read_only: bool,
}
277
278impl TryFrom<url::Url> for RedbPathInfoServiceConfig {
279    type Error = Box<dyn std::error::Error + Send + Sync>;
280    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
281        if url.has_host() {
282            return Err(Error::WrongConfig("no host allowed").into());
283        }
284
285        let path: Option<PathBuf> = match (url.scheme(), url.has_authority(), url.path()) {
286            ("redb+memory", false, "") => None,
287            ("redb+memory", false, _) => Err(Box::new(Error::WrongConfig(
288                "redb+memory with path is disallowed",
289            )))?,
290            ("redb+memory", true, _) => Err(Box::new(Error::WrongConfig(
291                "redb+memory may not have authority",
292            )))?,
293            ("redb", _, "") => Err(Box::new(Error::WrongConfig(
294                "redb without path is disallowed, use redb+memory if you want in-memory",
295            )))?,
296            ("redb", true, _path) => Err(Box::new(Error::WrongConfig("authority disallowed")))?,
297            ("redb", false, path) => Some(path.into()),
298            (_scheme, _, _) => Err(Box::new(Error::WrongConfig("unrecognized scheme")))?,
299        };
300
301        let mut config: RedbPathInfoServiceConfig =
302            serde_qs::from_str(url.query().unwrap_or_default())?;
303
304        config.path = path;
305
306        Ok(config)
307    }
308}
309
310#[async_trait]
311impl ServiceBuilder for RedbPathInfoServiceConfig {
312    type Output = dyn PathInfoService;
313    async fn build<'a>(
314        &'a self,
315        instance_name: &str,
316        _context: &CompositionContext,
317    ) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync>> {
318        Ok(Arc::new(
319            RedbPathInfoService::new(instance_name.to_string(), self.to_owned()).await?,
320        ))
321    }
322}
323
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use crate::fixtures::{DUMMY_PATH_DIGEST, PATH_INFO};
    use crate::pathinfoservice::{PathInfoService, RedbPathInfoService, RedbPathInfoServiceConfig};

    /// Config for a `data.redb` file inside `dir`, with default cache size.
    fn config_in(dir: &TempDir, read_only: bool) -> RedbPathInfoServiceConfig {
        RedbPathInfoServiceConfig {
            path: Some(dir.path().join("data.redb")),
            cache_size: None,
            read_only,
        }
    }

    /// Data written through a r/w instance is visible after re-opening the
    /// same file read-only, and several r/o instances can coexist.
    #[tokio::test]
    async fn reopen_as_read_only() {
        let tempdir = TempDir::new().unwrap();

        // Create a read-write path info service and insert some data.
        {
            let rw_service =
                RedbPathInfoService::new("rw".to_string(), config_in(&tempdir, false))
                    .await
                    .expect("to construct");

            rw_service
                .put(PATH_INFO.clone())
                .await
                .expect("to insert");
        } // we drop the rw database here.

        // Re-open the same path in ro mode (twice)
        let ro_config = config_in(&tempdir, true);

        let ro_service_1 = RedbPathInfoService::new("ro1".to_string(), ro_config.clone())
            .await
            .expect("to construct");
        let ro_service_2 = RedbPathInfoService::new("ro2".to_string(), ro_config)
            .await
            .expect("to construct");

        for ro_service in [&ro_service_1, &ro_service_2] {
            let found = ro_service
                .get(DUMMY_PATH_DIGEST)
                .await
                .expect("get to succeed")
                .expect("to be Some(_)");

            assert_eq!(found.store_path.digest(), &DUMMY_PATH_DIGEST);
        }
    }

    /// Opening a read-only redb should fail if the path doesn't exist.
    #[tokio::test]
    async fn read_only_nonexistent() {
        let tempdir = TempDir::new().unwrap();

        let outcome =
            RedbPathInfoService::new("test".to_string(), config_in(&tempdir, true)).await;

        assert!(outcome.is_err(), "opening new path r/o should fail");
    }

    /// Opening a read-only redb should fail if it's already opened read-write.
    #[tokio::test]
    async fn open_rw_and_ro() {
        let tempdir = TempDir::new().unwrap();

        // Named binding (not `_`), so the r/w instance stays open below.
        let _rw_service = RedbPathInfoService::new("rw".to_string(), config_in(&tempdir, false))
            .await
            .expect("to construct");

        let outcome = RedbPathInfoService::new("ro".to_string(), config_in(&tempdir, true)).await;

        assert!(
            outcome.is_err(),
            "opening r/o should fail if still open r/w"
        );
    }
}