snix_castore/directoryservice/redb.rs
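
//! A [`DirectoryService`] backed by [redb], an embedded, single-file
//! key-value store. Directories are stored protobuf-encoded in a single
//! `directory` table, keyed by their BLAKE3 digest.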

use futures::stream::BoxStream;
use prost::Message;
use redb::{Database, ReadableDatabase, TableDefinition};
use std::{path::PathBuf, sync::Arc};
use tonic::async_trait;
use tracing::{instrument, warn};

use super::{Directory, DirectoryPutter, DirectoryService, traverse_directory};
use crate::{
    B3Digest, Error,
    composition::{CompositionContext, ServiceBuilder},
    directoryservice::directory_graph::{DirectoryGraphBuilder, DirectoryOrder},
    proto,
};

const DIRECTORY_TABLE: TableDefinition<[u8; B3Digest::LENGTH], Vec<u8>> =
    TableDefinition::new("directory");

#[derive(Clone)]
pub struct RedbDirectoryService {
    instance_name: String,
    // We wrap the db in an Arc to be able to move it into spawn_blocking,
    // as discussed in https://github.com/cberner/redb/issues/789
    db: Arc<Database>,
}

impl RedbDirectoryService {
    /// Constructs a new instance using the specified filesystem path for
    /// storage.
    pub async fn new(instance_name: String, path: PathBuf) -> Result<Self, Error> {
        if path == PathBuf::from("/") {
            return Err(Error::StorageError(
                "cowardly refusing to open / with redb".to_string(),
            ));
        }

        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)?;
        }

        let db = tokio::task::spawn_blocking(|| -> Result<_, redb::Error> {
            let db = redb::Database::builder().create(path)?;

            create_schema(&db)?;
            Ok(db)
        })
        .await??;

        Ok(Self {
            instance_name,
            db: Arc::new(db),
        })
    }

    /// Constructs a new instance using the in-memory backend.
    pub fn new_temporary() -> Result<Self, Error> {
        let db =
            redb::Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?;

        create_schema(&db)?;

        Ok(Self {
            instance_name: "root".into(),
            db: Arc::new(db),
        })
    }
}

/// Ensures all tables are present.
/// Opens a write transaction and calls open_table on DIRECTORY_TABLE, which will
/// create it if not present.
#[allow(clippy::result_large_err)]
fn create_schema(db: &redb::Database) -> Result<(), redb::Error> {
    let txn = db.begin_write()?;
    txn.open_table(DIRECTORY_TABLE)?;
    txn.commit()?;

    Ok(())
}

#[async_trait]
impl DirectoryService for RedbDirectoryService {
    #[instrument(skip(self, digest), fields(directory.digest = %digest, instance_name = %self.instance_name))]
    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
        let db = self.db.clone();

        // Retrieves the protobuf-encoded Directory for the corresponding digest.
        let db_get_resp = tokio::task::spawn_blocking({
            let digest = *digest.as_ref();
            move || -> Result<_, redb::Error> {
                let txn = db.begin_read()?;
                let table = txn.open_table(DIRECTORY_TABLE)?;
                Ok(table.get(digest)?)
            }
        })
        .await?
        .map_err(|e| {
            warn!(err=%e, "failed to retrieve Directory");
            Error::StorageError("failed to retrieve Directory".to_string())
        })?;

        // The Directory was not found, return None.
        let directory_data = match db_get_resp {
            None => return Ok(None),
            Some(d) => d,
        };

        // We check that the digest of the retrieved Directory matches the expected digest.
        let actual_digest = blake3::hash(directory_data.value().as_slice());
        if actual_digest.as_bytes() != digest.as_slice() {
            warn!(directory.actual_digest=%actual_digest, "requested Directory got the wrong digest");
            return Err(Error::StorageError(
                "requested Directory got the wrong digest".to_string(),
            ));
        }

        // Attempt to decode the retrieved protobuf-encoded Directory, returning a parsing error if
        // the decoding failed.
        let directory = match proto::Directory::decode(&*directory_data.value()) {
            Ok(dir) => {
                // The returned Directory must be valid.
                dir.try_into().map_err(|e| {
                    warn!(err=%e, "Directory failed validation");
                    Error::StorageError("Directory failed validation".to_string())
                })?
            }
            Err(e) => {
                warn!(err=%e, "failed to parse Directory");
                return Err(Error::StorageError("failed to parse Directory".to_string()));
            }
        };

        Ok(Some(directory))
    }

    #[instrument(skip(self, directory), fields(directory.digest = %directory.digest(), instance_name = %self.instance_name))]
    async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
        tokio::task::spawn_blocking({
            let db = self.db.clone();
            move || {
                let digest = directory.digest();

                // Store the directory in the table.
                let txn = db.begin_write()?;
                {
                    let mut table = txn.open_table(DIRECTORY_TABLE)?;
                    table.insert(
                        digest.as_ref(),
                        proto::Directory::from(directory).encode_to_vec(),
                    )?;
                }
                txn.commit()?;

                Ok(digest)
            }
        })
        .await?
    }

    #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))]
    fn get_recursive(
        &self,
        root_directory_digest: &B3Digest,
    ) -> BoxStream<'static, Result<Directory, Error>> {
        // FUTUREWORK: Ideally we should have all of the directory traversing happen in a single
        // redb transaction to avoid constantly closing and opening new transactions for the
        // database.
        traverse_directory(self.clone(), root_directory_digest)
    }

    #[instrument(skip_all)]
    fn put_multiple_start(&self) -> Box<dyn DirectoryPutter + '_> {
        Box::new(RedbDirectoryPutter {
            db: &self.db,
            builder: Some(DirectoryGraphBuilder::new_with_insertion_order(
                DirectoryOrder::LeavesToRoot,
            )),
        })
    }
}

pub struct RedbDirectoryPutter<'a> {
    db: &'a Database,

    /// The in-progress directory graph builder holding the directories to
    /// insert on close, or None if close was already called.
    builder: Option<DirectoryGraphBuilder>,
}

#[async_trait]
impl DirectoryPutter for RedbDirectoryPutter<'_> {
    #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
    async fn put(&mut self, directory: Directory) -> Result<(), Error> {
        let builder = self
            .builder
            .as_mut()
            .ok_or_else(|| Error::StorageError("already closed".to_string()))?;

        builder.insert(directory)?;

        Ok(())
    }

    #[instrument(level = "trace", skip_all, ret, err)]
    async fn close(&mut self) -> Result<B3Digest, Error> {
        let builder = self
            .builder
            .take()
            .ok_or_else(|| Error::StorageError("already closed".to_string()))?;

        // Insert all directories as a batch.
        tokio::task::spawn_blocking({
            let txn = self.db.begin_write()?;
            move || {
                // Build the validated directory graph.
                let directory_graph = builder.build()?;
                let root_digest = directory_graph.root().digest();

                // Loop over all the validated directories, inserting them into
                // the table; the commit below applies them as a single batch.
                {
                    let mut table = txn.open_table(DIRECTORY_TABLE)?;
                    for directory in directory_graph.drain(DirectoryOrder::LeavesToRoot) {
                        table.insert(
                            directory.digest().as_ref(),
                            proto::Directory::from(directory).encode_to_vec(),
                        )?;
                    }
                }
                txn.commit()?;

                Ok(root_digest)
            }
        })
        .await?
    }
}

#[derive(serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct RedbDirectoryServiceConfig {
    is_temporary: bool,
    #[serde(default)]
    /// required when is_temporary = false
    path: Option<PathBuf>,
}

impl TryFrom<url::Url> for RedbDirectoryServiceConfig {
    type Error = Box<dyn std::error::Error + Send + Sync>;
    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
        // redb doesn't support specifying a host, and a path can be provided
        // (otherwise the database will live in memory only).
        if url.has_host() {
            return Err(Error::StorageError("no host allowed".to_string()).into());
        }

        Ok(if url.path().is_empty() {
            RedbDirectoryServiceConfig {
                is_temporary: true,
                path: None,
            }
        } else {
            RedbDirectoryServiceConfig {
                is_temporary: false,
                path: Some(url.path().into()),
            }
        })
    }
}
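
// Illustrative mapping, as a sketch (the `redb://` scheme prefix itself is
// assumed to be dispatched by the composition layer, not by this impl):
//   a URL without a path (e.g. `redb://`)              -> is_temporary: true,  path: None
//   a URL with a path (e.g. `redb:///some/dirs.redb`)  -> is_temporary: false, path: Some("/some/dirs.redb")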

#[async_trait]
impl ServiceBuilder for RedbDirectoryServiceConfig {
    type Output = dyn DirectoryService;
    async fn build<'a>(
        &'a self,
        instance_name: &str,
        _context: &CompositionContext,
    ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
        match self {
            RedbDirectoryServiceConfig {
                is_temporary: true,
                path: None,
            } => Ok(Arc::new(RedbDirectoryService::new_temporary()?)),
            RedbDirectoryServiceConfig {
                is_temporary: true,
                path: Some(_),
            } => Err(Error::StorageError(
                "Temporary RedbDirectoryService cannot have a path".into(),
            )
            .into()),
            RedbDirectoryServiceConfig {
                is_temporary: false,
                path: None,
            } => Err(Error::StorageError("RedbDirectoryService is missing path".into()).into()),
            RedbDirectoryServiceConfig {
                is_temporary: false,
                path: Some(path),
            } => Ok(Arc::new(
                RedbDirectoryService::new(instance_name.to_string(), path.into()).await?,
            )),
        }
    }
}
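
// A minimal round-trip sketch, added as an illustration rather than taken from
// upstream: it assumes `Directory` derives `Default`, `Debug`, `Clone` and
// `PartialEq`, and that tests run under tokio via `#[tokio::test]`. Only the
// in-memory backend is exercised; the on-disk constructor is not covered here.
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn put_get_roundtrip() {
        let svc = RedbDirectoryService::new_temporary().expect("create temporary service");

        // Store an (empty) Directory; put() returns the digest it was stored under.
        let dir = Directory::default();
        let digest = svc.put(dir.clone()).await.expect("put must succeed");
        assert_eq!(dir.digest(), digest);

        // Fetching by that digest must return the same Directory.
        let fetched = svc
            .get(&digest)
            .await
            .expect("get must succeed")
            .expect("directory must be present");
        assert_eq!(dir, fetched);
    }
}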