snix_castore/directoryservice/redb.rs

1use futures::{StreamExt, TryStreamExt, stream::BoxStream};
2use prost::Message;
3use redb::{ReadableDatabase, TableDefinition};
4use std::{path::PathBuf, sync::Arc};
5use tonic::async_trait;
6use tracing::{instrument, warn};
7
8use super::{Directory, DirectoryPutter, DirectoryService, traversal};
9use crate::{
10    B3Digest,
11    composition::{CompositionContext, ServiceBuilder},
12    directoryservice::directory_graph::DirectoryGraphBuilder,
13    proto,
14};
15
16const DIRECTORY_TABLE: TableDefinition<[u8; B3Digest::LENGTH], Vec<u8>> =
17    TableDefinition::new("directory");
18
19enum Db {
20    ReadOnly(redb::ReadOnlyDatabase),
21    ReadWrite(redb::Database),
22}
23
24impl Db {
25    fn begin_read(&self) -> Result<redb::ReadTransaction, Error> {
26        match self {
27            Db::ReadOnly(db) => Ok(db.begin_read()?),
28            Db::ReadWrite(db) => Ok(db.begin_read()?),
29        }
30    }
31
32    fn begin_write(&self) -> Result<redb::WriteTransaction, Error> {
33        match self {
34            Db::ReadOnly(_) => Err(Error::OpenedReadonly),
35            Db::ReadWrite(db) => Ok(db.begin_write()?),
36        }
37    }
38}
39
40#[derive(Clone)]
41pub struct RedbDirectoryService {
42    instance_name: String,
43
44    /// An Arc'ed Database, read-only or writeable.
45    db: Arc<Db>,
46}
47
48impl RedbDirectoryService {
49    /// Constructs a new instance using the specified config.
50    pub async fn new(
51        instance_name: String,
52        config: RedbDirectoryServiceConfig,
53    ) -> Result<Self, Error> {
54        if let Some(path) = config.path.clone() {
55            if &path == "" {
56                return Err(Error::WrongConfig("empty path is disallowed"));
57            }
58            if &path == "/" {
59                return Err(Error::WrongConfig("cowardly refusing to open / with redb"));
60            }
61
62            if config.read_only {
63                let db =
64                    tokio::task::spawn_blocking(|| redb::Database::builder().open_read_only(path))
65                        .await??;
66
67                return Ok(Self {
68                    instance_name,
69                    db: Arc::new(Db::ReadOnly(db)),
70                });
71            }
72
73            if let Some(parent) = path.parent() {
74                tokio::fs::create_dir_all(parent).await?;
75            }
76
77            let db = tokio::task::spawn_blocking(move || {
78                let mut builder = redb::Database::builder();
79                configure_builder(&mut builder, &config);
80
81                let db = builder.create(path)?;
82                create_schema(&db)?;
83                Ok::<_, Error>(db)
84            })
85            .await??;
86
87            Ok(Self {
88                instance_name,
89                db: Arc::new(Db::ReadWrite(db)),
90            })
91        } else {
92            Self::new_temporary(instance_name, config)
93        }
94    }
95
96    /// Constructs a new instance using the in-memory backend.
97    /// Sync, as there's no real IO happening.
98    pub fn new_temporary(
99        instance_name: String,
100        config: RedbDirectoryServiceConfig,
101    ) -> Result<Self, Error> {
102        debug_assert!(
103            config.path.is_none(),
104            "Snix bug: config.path is not None, but new_temporary requested"
105        );
106
107        if config.read_only {
108            return Err(Error::WrongConfig("in-memory database cannot be read-only"));
109        }
110
111        let mut builder = redb::Database::builder();
112        configure_builder(&mut builder, &config);
113
114        let db = builder.create_with_backend(redb::backends::InMemoryBackend::new())?;
115
116        create_schema(&db)?;
117
118        Ok(Self {
119            instance_name,
120            db: Arc::new(Db::ReadWrite(db)),
121        })
122    }
123}
124
125/// Applies options from [RedbDirectoryServiceConfig] to a [redb::Builder].
126fn configure_builder(builder: &mut redb::Builder, config: &RedbDirectoryServiceConfig) {
127    if let Some(cache_size) = config.cache_size {
128        builder.set_cache_size(cache_size);
129    }
130}
131
132/// Ensures all tables are present.
133/// Opens a write transaction and calls open_table on DIRECTORY_TABLE, which will
134/// create it if not present.
135#[allow(clippy::result_large_err)]
136fn create_schema(db: &redb::Database) -> Result<(), Error> {
137    let txn = db.begin_write()?;
138    txn.open_table(DIRECTORY_TABLE)?;
139    txn.commit()?;
140
141    Ok(())
142}
143
144#[async_trait]
145impl DirectoryService for RedbDirectoryService {
146    #[instrument(skip(self, digest), fields(directory.digest = %digest, instance_name = %self.instance_name))]
147    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, super::Error> {
148        let db = self.db.clone();
149        let digest = *digest;
150        // Retrieves the protobuf-encoded Directory for the corresponding digest.
151        let directory_data = match tokio::task::spawn_blocking(move || -> Result<_, Error> {
152            let txn = db.begin_read()?;
153            let table = txn.open_table(DIRECTORY_TABLE)?;
154            Ok(table.get(*digest)?)
155        })
156        .await??
157        {
158            // The Directory was not found, return None.
159            None => return Ok(None),
160            Some(directory_data) => directory_data.value(),
161        };
162
163        // We check that the digest of the retrieved Directory matches the expected digest.
164        let actual = B3Digest::from(blake3::hash(&directory_data));
165        if actual != digest {
166            return Err(Error::WrongDigest {
167                expected: digest,
168                actual,
169            }
170            .into());
171        }
172
173        // Attempt to decode the retrieved protobuf-encoded Directory
174        let proto_directory =
175            proto::Directory::decode(directory_data.as_slice()).map_err(Error::ProtobufDecode)?;
176        let directory = Directory::try_from(proto_directory).map_err(Error::DirectoryValidation)?;
177
178        Ok(Some(directory))
179    }
180
181    #[instrument(skip(self, directory), fields(directory.digest = %directory.digest(), instance_name = %self.instance_name))]
182    async fn put(&self, directory: Directory) -> Result<B3Digest, super::Error> {
183        let db = self.db.clone();
184        let digest = tokio::task::spawn_blocking(move || -> Result<_, Error> {
185            let digest = directory.digest();
186
187            // Store the directory in the table.
188            let txn = db.begin_write()?;
189            {
190                let mut table = txn.open_table(DIRECTORY_TABLE)?;
191                table.insert(
192                    digest.as_ref(),
193                    proto::Directory::from(directory).encode_to_vec(),
194                )?;
195            }
196            txn.commit()?;
197
198            Ok(digest)
199        })
200        .await??;
201
202        Ok(digest)
203    }
204
205    #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))]
206    fn get_recursive(
207        &self,
208        root_directory_digest: &B3Digest,
209    ) -> BoxStream<'static, Result<Directory, super::Error>> {
210        // FUTUREWORK: Ideally we should have all of the directory traversing happen in a single
211        // redb transaction to avoid constantly closing and opening new transactions for the
212        // database.
213        let svc = self.clone();
214        traversal::root_to_leaves(*root_directory_digest, move |digest| {
215            let svc = svc.clone();
216            async move { svc.get(&digest).await }
217        })
218        .map_err(|err| Box::new(Error::DirectoryTraversal(err)))
219        .err_into()
220        .boxed()
221    }
222
223    #[instrument(skip_all)]
224    fn put_multiple_start(&self) -> Box<dyn DirectoryPutter + '_> {
225        Box::new(RedbDirectoryPutter {
226            db: &self.db,
227            builder: Some(DirectoryGraphBuilder::new_leaves_to_root()),
228        })
229    }
230}
231
232pub struct RedbDirectoryPutter<'a> {
233    db: &'a Db,
234
235    /// The directories (inside the directory validator) that we insert later,
236    /// or None, if they were already inserted.
237    builder: Option<DirectoryGraphBuilder>,
238}
239
240#[async_trait]
241impl DirectoryPutter for RedbDirectoryPutter<'_> {
242    #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
243    async fn put(&mut self, directory: Directory) -> Result<(), super::Error> {
244        let builder = self
245            .builder
246            .as_mut()
247            .ok_or_else(|| Error::DirectoryPutterAlreadyClosed)?;
248
249        builder
250            .try_insert(directory)
251            .map_err(Error::DirectoryOrdering)?;
252
253        Ok(())
254    }
255
256    #[instrument(level = "trace", skip_all, ret, err)]
257    async fn close(&mut self) -> Result<B3Digest, super::Error> {
258        let builder = self
259            .builder
260            .take()
261            .ok_or_else(|| Error::DirectoryPutterAlreadyClosed)?;
262
263        // Insert all directories as a batch.
264        let root_digest = tokio::task::spawn_blocking({
265            let txn = self.db.begin_write()?;
266            move || {
267                // Retrieve the validated directories.
268                let directory_graph = builder.build().map_err(Error::DirectoryOrdering)?;
269                let root_digest = directory_graph.root().digest();
270
271                // Looping over all the verified directories, queuing them up for a
272                // batch insertion.
273                {
274                    let mut table = txn.open_table(DIRECTORY_TABLE)?;
275                    for directory in directory_graph.drain_leaves_to_root() {
276                        table.insert(
277                            directory.digest().as_ref(),
278                            proto::Directory::from(directory).encode_to_vec(),
279                        )?;
280                    }
281                }
282                txn.commit()?;
283
284                Ok::<_, Error>(root_digest)
285            }
286        })
287        .await??;
288
289        Ok(root_digest)
290    }
291}
292
293#[derive(thiserror::Error, Debug)]
294pub enum Error {
295    #[error("wrong arguments: {0}")]
296    WrongConfig(&'static str),
297    #[error("serde-qs error: {0}")]
298    SerdeQS(#[from] serde_qs::Error),
299
300    #[error("Directory Graph ordering error")]
301    DirectoryOrdering(#[from] crate::directoryservice::OrderingError),
302
303    #[error("DirectoryPutter already closed")]
304    DirectoryPutterAlreadyClosed,
305
306    #[error("failure during directory traversal")]
307    DirectoryTraversal(#[source] traversal::Error),
308
309    #[error("requested directory has wrong digest, expected {expected}, actual {actual}")]
310    WrongDigest {
311        expected: B3Digest,
312        actual: B3Digest,
313    },
314    #[error("failed to decode protobuf: {0}")]
315    ProtobufDecode(#[from] prost::DecodeError),
316    #[error("failed to validate directory: {0}")]
317    DirectoryValidation(#[from] crate::DirectoryError),
318
319    #[error("unable to open write txn, database opened read-only")]
320    OpenedReadonly,
321    #[error("redb commit error: {0}")]
322    RedbCommit(#[from] redb::CommitError),
323    #[error("redb database error: {0}")]
324    RedbDatabase(#[from] redb::DatabaseError),
325    #[error("redb error: {0}")]
326    Redb(#[from] redb::Error),
327    #[error("redb storage error: {0}")]
328    RedbStorage(#[from] redb::StorageError),
329    #[error("redb table error: {0}")]
330    RedbTable(#[from] redb::TableError),
331    #[error("redb txn error: {0}")]
332    RedbTransaction(#[from] redb::TransactionError),
333
334    #[error("join error: {0}")]
335    TokioJoin(#[from] tokio::task::JoinError),
336    #[error("io error: {0}")]
337    IO(#[from] std::io::Error),
338}
339
340#[derive(Clone, Default, serde::Deserialize)]
341#[serde(deny_unknown_fields)]
342pub struct RedbDirectoryServiceConfig {
343    path: Option<PathBuf>,
344
345    /// The amount of memory (in bytes) used for caching data
346    cache_size: Option<usize>,
347
348    /// Whether to open read-only.
349    #[serde(default)]
350    read_only: bool,
351}
352
353impl TryFrom<url::Url> for RedbDirectoryServiceConfig {
354    type Error = Box<dyn std::error::Error + Send + Sync>;
355
356    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
357        if url.has_host() {
358            return Err(Error::WrongConfig("no host allowed").into());
359        }
360
361        let path: Option<PathBuf> = match (url.scheme(), url.has_authority(), url.path()) {
362            ("redb+memory", false, "") => None,
363            ("redb+memory", false, _) => Err(Box::new(Error::WrongConfig(
364                "redb+memory with path is disallowed",
365            )))?,
366            ("redb+memory", true, _) => Err(Box::new(Error::WrongConfig(
367                "redb+memory may not have authority",
368            )))?,
369            ("redb", _, "") => Err(Box::new(Error::WrongConfig(
370                "redb without path is disallowed, use redb+memory if you want in-memory",
371            )))?,
372            ("redb", _, path) => Some(path.into()),
373            (_scheme, _, _) => Err(Box::new(Error::WrongConfig("unrecognized scheme")))?,
374        };
375
376        let mut config: RedbDirectoryServiceConfig =
377            serde_qs::from_str(url.query().unwrap_or_default())?;
378
379        config.path = path;
380
381        Ok(config)
382    }
383}
384
385#[async_trait]
386impl ServiceBuilder for RedbDirectoryServiceConfig {
387    type Output = dyn DirectoryService;
388    async fn build<'a>(
389        &'a self,
390        instance_name: &str,
391        _context: &CompositionContext,
392    ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
393        Ok(Arc::new(
394            RedbDirectoryService::new(instance_name.to_string(), self.to_owned()).await?,
395        ))
396    }
397}
398
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use tempfile::TempDir;

    use crate::{
        directoryservice::{DirectoryService, RedbDirectoryService, RedbDirectoryServiceConfig},
        fixtures::DIRECTORY_A,
    };

    /// Builds a file-backed config for `path` with the default cache size.
    fn config_for(path: PathBuf, read_only: bool) -> RedbDirectoryServiceConfig {
        RedbDirectoryServiceConfig {
            path: Some(path),
            cache_size: None,
            read_only,
        }
    }

    #[tokio::test]
    async fn reopen_as_read_only() {
        let tempdir = TempDir::new().unwrap();
        let db_path = tempdir.path().join("data.redb");

        // Populate the database through a read-write instance, which is
        // dropped at the end of this scope.
        {
            let rw = RedbDirectoryService::new("rw".to_string(), config_for(db_path.clone(), false))
                .await
                .expect("to construct");
            rw.put(DIRECTORY_A.clone()).await.expect("to insert");
        }

        // Two read-only instances on the same file at once.
        let ro_services = [
            RedbDirectoryService::new("ro1".to_string(), config_for(db_path.clone(), true))
                .await
                .expect("to construct"),
            RedbDirectoryService::new("ro2".to_string(), config_for(db_path, true))
                .await
                .expect("to construct"),
        ];

        for svc in &ro_services {
            let fetched = svc
                .get(&DIRECTORY_A.digest())
                .await
                .expect("get to succeed")
                .expect("to be Some(_)");
            assert_eq!(fetched.digest(), DIRECTORY_A.digest());
        }
    }

    #[tokio::test]
    async fn read_only_nonexistent() {
        let tempdir = TempDir::new().unwrap();

        let result = RedbDirectoryService::new(
            "test".to_string(),
            config_for(tempdir.path().join("data.redb"), true),
        )
        .await;

        // Opening a read-only redb should fail if the path doesn't exist.
        assert!(result.is_err(), "opening new path r/o should fail");
    }

    #[tokio::test]
    async fn open_rw_and_ro() {
        let tempdir = TempDir::new().unwrap();
        let db_path = tempdir.path().join("data.redb");

        // Keep the read-write instance alive for the rest of the test.
        let _rw = RedbDirectoryService::new("rw".to_string(), config_for(db_path.clone(), false))
            .await
            .expect("to construct");

        let ro_result =
            RedbDirectoryService::new("ro".to_string(), config_for(db_path, true)).await;

        // Opening a read-only redb should fail if it's already opened read-write.
        assert!(ro_result.is_err(), "opening r/o should fail if still open r/w");
    }
}