Skip to main content

snix_castore/directoryservice/
redb.rs

1use futures::{StreamExt, TryStreamExt, stream::BoxStream};
2use prost::Message;
3use redb::{ReadableDatabase, TableDefinition};
4use std::{path::PathBuf, sync::Arc};
5use tonic::async_trait;
6use tracing::{instrument, warn};
7
8use super::{Directory, DirectoryPutter, DirectoryService, traversal};
9use crate::{
10    B3Digest,
11    composition::{CompositionContext, ServiceBuilder},
12    directoryservice::directory_graph::DirectoryGraphBuilder,
13    proto,
14};
15
16const DIRECTORY_TABLE: TableDefinition<[u8; B3Digest::LENGTH], Vec<u8>> =
17    TableDefinition::new("directory");
18
19enum Db {
20    ReadOnly(redb::ReadOnlyDatabase),
21    ReadWrite(redb::Database),
22}
23
24impl Db {
25    fn begin_read(&self) -> Result<redb::ReadTransaction, Error> {
26        match self {
27            Db::ReadOnly(db) => Ok(db.begin_read()?),
28            Db::ReadWrite(db) => Ok(db.begin_read()?),
29        }
30    }
31
32    fn begin_write(&self) -> Result<redb::WriteTransaction, Error> {
33        match self {
34            Db::ReadOnly(_) => Err(Error::OpenedReadonly),
35            Db::ReadWrite(db) => Ok(db.begin_write()?),
36        }
37    }
38}
39
40#[derive(Clone)]
41pub struct RedbDirectoryService {
42    instance_name: String,
43
44    /// An Arc'ed Database, read-only or writeable.
45    db: Arc<Db>,
46}
47
48impl RedbDirectoryService {
49    /// Constructs a new instance using the specified config.
50    pub async fn new(
51        instance_name: String,
52        config: RedbDirectoryServiceConfig,
53    ) -> Result<Self, Error> {
54        if let Some(path) = config.path.clone() {
55            if &path == "" {
56                return Err(Error::WrongConfig("empty path is disallowed"));
57            }
58            if &path == "/" {
59                return Err(Error::WrongConfig("cowardly refusing to open / with redb"));
60            }
61
62            if config.read_only {
63                let db = tokio::task::spawn_blocking(move || {
64                    let mut builder = redb::Database::builder();
65                    configure_builder(&mut builder, &config);
66                    builder.open_read_only(path)
67                })
68                .await??;
69
70                return Ok(Self {
71                    instance_name,
72                    db: Arc::new(Db::ReadOnly(db)),
73                });
74            }
75
76            if let Some(parent) = path.parent() {
77                tokio::fs::create_dir_all(parent).await?;
78            }
79
80            let db = tokio::task::spawn_blocking(move || {
81                let mut builder = redb::Database::builder();
82                configure_builder(&mut builder, &config);
83
84                let db = builder.create(path)?;
85                create_schema(&db)?;
86                Ok::<_, Error>(db)
87            })
88            .await??;
89
90            Ok(Self {
91                instance_name,
92                db: Arc::new(Db::ReadWrite(db)),
93            })
94        } else {
95            Self::new_temporary(instance_name, config)
96        }
97    }
98
99    /// Constructs a new instance using the in-memory backend.
100    /// Sync, as there's no real IO happening.
101    pub fn new_temporary(
102        instance_name: String,
103        config: RedbDirectoryServiceConfig,
104    ) -> Result<Self, Error> {
105        debug_assert!(
106            config.path.is_none(),
107            "Snix bug: config.path is not None, but new_temporary requested"
108        );
109
110        if config.read_only {
111            return Err(Error::WrongConfig("in-memory database cannot be read-only"));
112        }
113
114        let mut builder = redb::Database::builder();
115        configure_builder(&mut builder, &config);
116
117        let db = builder.create_with_backend(redb::backends::InMemoryBackend::new())?;
118
119        create_schema(&db)?;
120
121        Ok(Self {
122            instance_name,
123            db: Arc::new(Db::ReadWrite(db)),
124        })
125    }
126}
127
128/// Applies options from [RedbDirectoryServiceConfig] to a [redb::Builder].
129fn configure_builder(builder: &mut redb::Builder, config: &RedbDirectoryServiceConfig) {
130    if let Some(cache_size) = config.cache_size {
131        builder.set_cache_size(cache_size);
132    }
133}
134
135/// Ensures all tables are present.
136/// Opens a write transaction and calls open_table on DIRECTORY_TABLE, which will
137/// create it if not present.
138#[allow(clippy::result_large_err)]
139fn create_schema(db: &redb::Database) -> Result<(), Error> {
140    let txn = db.begin_write()?;
141    txn.open_table(DIRECTORY_TABLE)?;
142    txn.commit()?;
143
144    Ok(())
145}
146
147#[async_trait]
148impl DirectoryService for RedbDirectoryService {
149    #[instrument(skip(self, digest), fields(directory.digest = %digest, instance_name = %self.instance_name))]
150    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, super::Error> {
151        let db = self.db.clone();
152        let digest = *digest;
153        // Retrieves the protobuf-encoded Directory for the corresponding digest.
154        let directory_data = match tokio::task::spawn_blocking(move || -> Result<_, Error> {
155            let txn = db.begin_read()?;
156            let table = txn.open_table(DIRECTORY_TABLE)?;
157            Ok(table.get(*digest)?)
158        })
159        .await
160        .map_err(Error::TokioJoin)??
161        {
162            // The Directory was not found, return None.
163            None => return Ok(None),
164            Some(directory_data) => directory_data.value(),
165        };
166
167        // We check that the digest of the retrieved Directory matches the expected digest.
168        let actual = B3Digest::from(blake3::hash(&directory_data));
169        if actual != digest {
170            return Err(Error::WrongDigest {
171                expected: digest,
172                actual,
173            }
174            .into());
175        }
176
177        // Attempt to decode the retrieved protobuf-encoded Directory
178        let proto_directory =
179            proto::Directory::decode(directory_data.as_slice()).map_err(Error::ProtobufDecode)?;
180        let directory = Directory::try_from(proto_directory).map_err(Error::DirectoryValidation)?;
181
182        Ok(Some(directory))
183    }
184
185    #[instrument(skip(self, directory), fields(directory.digest = %directory.digest(), instance_name = %self.instance_name))]
186    async fn put(&self, directory: Directory) -> Result<B3Digest, super::Error> {
187        let db = self.db.clone();
188        let digest = tokio::task::spawn_blocking(move || -> Result<_, Error> {
189            let digest = directory.digest();
190
191            // Store the directory in the table.
192            let txn = db.begin_write()?;
193            {
194                let mut table = txn.open_table(DIRECTORY_TABLE)?;
195                table.insert(
196                    digest.as_ref(),
197                    proto::Directory::from(directory).encode_to_vec(),
198                )?;
199            }
200            txn.commit()?;
201
202            Ok(digest)
203        })
204        .await
205        .map_err(Error::TokioJoin)??;
206
207        Ok(digest)
208    }
209
210    #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))]
211    fn get_recursive(
212        &self,
213        root_directory_digest: &B3Digest,
214    ) -> BoxStream<'static, Result<Directory, super::Error>> {
215        // FUTUREWORK: Ideally we should have all of the directory traversing happen in a single
216        // redb transaction to avoid constantly closing and opening new transactions for the
217        // database.
218        let svc = self.clone();
219        traversal::root_to_leaves(*root_directory_digest, move |digest| {
220            let svc = svc.clone();
221            async move { svc.get(&digest).await }
222        })
223        .map_err(Error::DirectoryTraversal)
224        .err_into()
225        .boxed()
226    }
227
228    #[instrument(skip_all)]
229    fn put_multiple_start(&self) -> Box<dyn DirectoryPutter> {
230        Box::new(RedbDirectoryPutter {
231            db: self.db.clone(),
232            builder: Some(DirectoryGraphBuilder::new_leaves_to_root()),
233        })
234    }
235}
236
237pub struct RedbDirectoryPutter {
238    db: Arc<Db>,
239
240    /// The directories (inside the directory validator) that we insert later,
241    /// or None, if they were already inserted.
242    builder: Option<DirectoryGraphBuilder>,
243}
244
245#[async_trait]
246impl DirectoryPutter for RedbDirectoryPutter {
247    #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
248    async fn put(&mut self, directory: Directory) -> Result<(), super::Error> {
249        let builder = self
250            .builder
251            .as_mut()
252            .ok_or_else(|| Error::DirectoryPutterAlreadyClosed)?;
253
254        builder
255            .try_insert(directory)
256            .map_err(Error::DirectoryOrdering)?;
257
258        Ok(())
259    }
260
261    #[instrument(level = "trace", skip_all, ret, err)]
262    async fn close(&mut self) -> Result<B3Digest, super::Error> {
263        let builder = self
264            .builder
265            .take()
266            .ok_or_else(|| Error::DirectoryPutterAlreadyClosed)?;
267
268        // Insert all directories as a batch.
269        let db = self.db.clone();
270        let root_digest = tokio::task::spawn_blocking(move || {
271            // Retrieve the validated directories.
272            let directory_graph = builder.build().map_err(Error::DirectoryOrdering)?;
273            let root_digest = directory_graph.root().digest();
274
275            let txn = db.begin_write()?;
276            // Looping over all the verified directories, queuing them up for a
277            // batch insertion.
278            {
279                let mut table = txn.open_table(DIRECTORY_TABLE)?;
280                for directory in directory_graph.drain_leaves_to_root() {
281                    table.insert(
282                        directory.digest().as_ref(),
283                        proto::Directory::from(directory).encode_to_vec(),
284                    )?;
285                }
286            }
287            txn.commit()?;
288
289            Ok::<_, Error>(root_digest)
290        })
291        .await
292        .map_err(Error::TokioJoin)??;
293
294        Ok(root_digest)
295    }
296}
297
298#[derive(thiserror::Error, Debug)]
299pub enum Error {
300    #[error("wrong arguments: {0}")]
301    WrongConfig(&'static str),
302    #[error("serde-qs error: {0}")]
303    SerdeQS(#[from] serde_qs::Error),
304
305    #[error("Directory Graph ordering error")]
306    DirectoryOrdering(#[from] crate::directoryservice::OrderingError),
307
308    #[error("DirectoryPutter already closed")]
309    DirectoryPutterAlreadyClosed,
310
311    #[error("failure during directory traversal")]
312    DirectoryTraversal(#[source] traversal::Error),
313
314    #[error("requested directory has wrong digest, expected {expected}, actual {actual}")]
315    WrongDigest {
316        expected: B3Digest,
317        actual: B3Digest,
318    },
319    #[error("failed to decode protobuf: {0}")]
320    ProtobufDecode(#[from] prost::DecodeError),
321    #[error("failed to validate directory: {0}")]
322    DirectoryValidation(#[from] crate::DirectoryError),
323
324    #[error("unable to open write txn, database opened read-only")]
325    OpenedReadonly,
326    #[error("redb commit error: {0}")]
327    RedbCommit(#[from] redb::CommitError),
328    #[error("redb database error: {0}")]
329    RedbDatabase(#[from] redb::DatabaseError),
330    #[error("redb error: {0}")]
331    Redb(#[from] redb::Error),
332    #[error("redb storage error: {0}")]
333    RedbStorage(#[from] redb::StorageError),
334    #[error("redb table error: {0}")]
335    RedbTable(#[from] redb::TableError),
336    #[error("redb txn error: {0}")]
337    RedbTransaction(#[from] redb::TransactionError),
338
339    #[error("join error: {0}")]
340    TokioJoin(#[from] tokio::task::JoinError),
341    #[error("io error: {0}")]
342    IO(#[from] std::io::Error),
343}
344
345impl From<Error> for super::Error {
346    fn from(value: Error) -> Self {
347        Self(Box::new(value))
348    }
349}
350
351#[derive(Clone, Default, serde::Deserialize)]
352#[serde(deny_unknown_fields)]
353pub struct RedbDirectoryServiceConfig {
354    path: Option<PathBuf>,
355
356    /// The amount of memory (in bytes) used for caching data
357    cache_size: Option<usize>,
358
359    /// Whether to open read-only.
360    #[serde(default)]
361    read_only: bool,
362}
363
364impl TryFrom<url::Url> for RedbDirectoryServiceConfig {
365    type Error = Box<dyn std::error::Error + Send + Sync>;
366
367    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
368        if url.has_host() {
369            return Err(Error::WrongConfig("no host allowed").into());
370        }
371
372        let path: Option<PathBuf> = match (url.scheme(), url.has_authority(), url.path()) {
373            ("redb+memory", false, "") => None,
374            ("redb+memory", false, _) => Err(Box::new(Error::WrongConfig(
375                "redb+memory with path is disallowed",
376            )))?,
377            ("redb+memory", true, _) => Err(Box::new(Error::WrongConfig(
378                "redb+memory may not have authority",
379            )))?,
380            ("redb", _, "") => Err(Box::new(Error::WrongConfig(
381                "redb without path is disallowed, use redb+memory if you want in-memory",
382            )))?,
383            ("redb", true, _path) => Err(Box::new(Error::WrongConfig("authority disallowed")))?,
384            ("redb", false, path) => Some(path.into()),
385            (_scheme, _, _) => Err(Box::new(Error::WrongConfig("unrecognized scheme")))?,
386        };
387
388        let mut config: RedbDirectoryServiceConfig =
389            serde_qs::from_str(url.query().unwrap_or_default())?;
390
391        config.path = path;
392
393        Ok(config)
394    }
395}
396
397#[async_trait]
398impl ServiceBuilder for RedbDirectoryServiceConfig {
399    type Output = dyn DirectoryService;
400    async fn build<'a>(
401        &'a self,
402        instance_name: &str,
403        _context: &CompositionContext,
404    ) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync>> {
405        Ok(Arc::new(
406            RedbDirectoryService::new(instance_name.to_string(), self.to_owned()).await?,
407        ))
408    }
409}
410
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use tempfile::TempDir;

    use crate::{
        directoryservice::{DirectoryService, RedbDirectoryService, RedbDirectoryServiceConfig},
        fixtures::DIRECTORY_A,
    };

    /// File-backed config at `path`, without a cache size override.
    fn config_at(path: PathBuf, read_only: bool) -> RedbDirectoryServiceConfig {
        RedbDirectoryServiceConfig {
            path: Some(path),
            cache_size: None,
            read_only,
        }
    }

    /// Asserts that `svc` returns [DIRECTORY_A] when asked for its digest.
    async fn assert_returns_directory_a(svc: &RedbDirectoryService) {
        let found = svc
            .get(&DIRECTORY_A.digest())
            .await
            .expect("get to succeed")
            .expect("to be Some(_)");
        assert_eq!(found.digest(), DIRECTORY_A.digest());
    }

    #[tokio::test]
    async fn reopen_as_read_only() {
        let tempdir = TempDir::new().unwrap();
        let db_path = tempdir.path().join("data.redb");

        // Create a read-write directory service and insert some data.
        {
            let rw = RedbDirectoryService::new("rw".to_string(), config_at(db_path.clone(), false))
                .await
                .expect("to construct");

            rw.put(DIRECTORY_A.clone()).await.expect("to insert");
        } // we drop the rw database here.

        // Re-open the same path in ro mode (twice)
        let ro_1 = RedbDirectoryService::new("ro1".to_string(), config_at(db_path.clone(), true))
            .await
            .expect("to construct");
        let ro_2 = RedbDirectoryService::new("ro2".to_string(), config_at(db_path, true))
            .await
            .expect("to construct");

        assert_returns_directory_a(&ro_1).await;
        assert_returns_directory_a(&ro_2).await;
    }

    #[tokio::test]
    async fn read_only_nonexistent() {
        let tempdir = TempDir::new().unwrap();
        let missing = tempdir.path().join("data.redb");

        // Opening a read-only redb should fail if the path doesn't exist.
        let outcome = RedbDirectoryService::new("test".to_string(), config_at(missing, true)).await;
        assert!(outcome.is_err(), "opening new path r/o should fail");
    }

    #[tokio::test]
    async fn open_rw_and_ro() {
        let tempdir = TempDir::new().unwrap();
        let db_path = tempdir.path().join("data.redb");

        // Keep the read-write instance alive for the rest of the test.
        let _rw = RedbDirectoryService::new("rw".to_string(), config_at(db_path.clone(), false))
            .await
            .expect("to construct");

        // Opening a read-only redb should fail if it's already opened read-write.
        let outcome = RedbDirectoryService::new("ro".to_string(), config_at(db_path, true)).await;
        assert!(outcome.is_err(), "opening r/o should fail if still open r/w");
    }
}