snix_castore/directoryservice/redb.rs

use futures::stream::BoxStream;
use prost::Message;
use redb::{Database, TableDefinition};
use std::{path::PathBuf, sync::Arc};
use tonic::async_trait;
use tracing::{info, instrument, warn};

use super::{
    Directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator,
    traverse_directory,
};
use crate::{
    B3Digest, Error,
    composition::{CompositionContext, ServiceBuilder},
    proto,
};

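// Maps the blake3 digest of a Directory (key) to its protobuf-encoded form (value).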
const DIRECTORY_TABLE: TableDefinition<[u8; B3Digest::LENGTH], Vec<u8>> =
    TableDefinition::new("directory");

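/// [DirectoryService] implementation backed by a [redb] database,
/// either on-disk (see [RedbDirectoryService::new]) or in-memory
/// (see [RedbDirectoryService::new_temporary]).
///
/// A minimal construction sketch; the path and instance name are
/// illustrative only:
///
/// ```ignore
/// let svc = RedbDirectoryService::new(
///     "root".to_string(),
///     "/var/lib/snix/directories.redb".into(),
/// )
/// .await?;
/// ```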
#[derive(Clone)]
pub struct RedbDirectoryService {
    instance_name: String,
    // We wrap the db in an Arc to be able to move it into spawn_blocking,
    // as discussed in https://github.com/cberner/redb/issues/789
    db: Arc<Database>,
}

impl RedbDirectoryService {
    /// Constructs a new instance using the specified filesystem path for
    /// storage.
    pub async fn new(instance_name: String, path: PathBuf) -> Result<Self, Error> {
        if path == PathBuf::from("/") {
            return Err(Error::StorageError(
                "cowardly refusing to open / with redb".to_string(),
            ));
        }

        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)?;
        }

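        // Opening or creating the database is blocking I/O, so run it on a
        // blocking task.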
        let db = tokio::task::spawn_blocking(|| -> Result<_, redb::Error> {
            let mut db = redb::Database::builder()
                .create_with_file_format_v3(true)
                .create(path)?;

            // Upgrade redb database file format.
            if db.upgrade()? {
                info!("Upgraded database format");
            };

            create_schema(&db)?;
            Ok(db)
        })
        .await??;

        Ok(Self {
            instance_name,
            db: Arc::new(db),
        })
    }

    /// Constructs a new instance using the in-memory backend.
    pub fn new_temporary() -> Result<Self, Error> {
        let db = redb::Database::builder()
            .create_with_file_format_v3(true)
            .create_with_backend(redb::backends::InMemoryBackend::new())?;

        create_schema(&db)?;

        Ok(Self {
            instance_name: "root".into(),
            db: Arc::new(db),
        })
    }
}

/// Ensures all tables are present.
/// Opens a write transaction and calls open_table on DIRECTORY_TABLE, which will
/// create it if not present.
#[allow(clippy::result_large_err)]
fn create_schema(db: &redb::Database) -> Result<(), redb::Error> {
    let txn = db.begin_write()?;
    txn.open_table(DIRECTORY_TABLE)?;
    txn.commit()?;

    Ok(())
}

#[async_trait]
impl DirectoryService for RedbDirectoryService {
    #[instrument(skip(self, digest), fields(directory.digest = %digest, instance_name = %self.instance_name))]
    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
        let db = self.db.clone();

        // Retrieves the protobuf-encoded Directory for the corresponding digest.
        let db_get_resp = tokio::task::spawn_blocking({
            let digest = *digest.as_ref();
            move || -> Result<_, redb::Error> {
                let txn = db.begin_read()?;
                let table = txn.open_table(DIRECTORY_TABLE)?;
                Ok(table.get(digest)?)
            }
        })
        .await?
        .map_err(|e| {
            warn!(err=%e, "failed to retrieve Directory");
            Error::StorageError("failed to retrieve Directory".to_string())
        })?;

        // The Directory was not found, return None.
        let directory_data = match db_get_resp {
            None => return Ok(None),
            Some(d) => d,
        };

        // We check that the digest of the retrieved Directory matches the expected digest.
        let actual_digest = blake3::hash(directory_data.value().as_slice());
        if actual_digest.as_bytes() != digest.as_slice() {
            warn!(directory.actual_digest=%actual_digest, "requested Directory got the wrong digest");
            return Err(Error::StorageError(
                "requested Directory got the wrong digest".to_string(),
            ));
        }

        // Attempt to decode the retrieved protobuf-encoded Directory, returning a parsing error if
        // the decoding failed.
        let directory = match proto::Directory::decode(&*directory_data.value()) {
            Ok(dir) => {
                // The returned Directory must be valid.
                dir.try_into().map_err(|e| {
                    warn!(err=%e, "Directory failed validation");
                    Error::StorageError("Directory failed validation".to_string())
                })?
            }
            Err(e) => {
                warn!(err=%e, "failed to parse Directory");
                return Err(Error::StorageError("failed to parse Directory".to_string()));
            }
        };

        Ok(Some(directory))
    }

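    // Note: each put() call stores a single Directory in its own write
    // transaction; put_multiple_start() below batches many inserts into one.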
    #[instrument(skip(self, directory), fields(directory.digest = %directory.digest(), instance_name = %self.instance_name))]
    async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
        tokio::task::spawn_blocking({
            let db = self.db.clone();
            move || {
                let digest = directory.digest();

                // Store the directory in the table.
                let txn = db.begin_write()?;
                {
                    let mut table = txn.open_table(DIRECTORY_TABLE)?;
                    table.insert(
                        digest.as_ref(),
                        proto::Directory::from(directory).encode_to_vec(),
                    )?;
                }
                txn.commit()?;

                Ok(digest)
            }
        })
        .await?
    }

    #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))]
    fn get_recursive(
        &self,
        root_directory_digest: &B3Digest,
    ) -> BoxStream<'static, Result<Directory, Error>> {
        // FUTUREWORK: Ideally we should have all of the directory traversing happen in a single
        // redb transaction to avoid constantly closing and opening new transactions for the
        // database.
        traverse_directory(self.clone(), root_directory_digest)
    }

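    // Returns a putter that buffers directories in a DirectoryGraph and
    // writes them all in a single write transaction on close().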
    #[instrument(skip_all)]
    fn put_multiple_start(&self) -> Box<dyn DirectoryPutter + '_> {
        Box::new(RedbDirectoryPutter {
            db: &self.db,
            directory_validator: Some(Default::default()),
        })
    }
}

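/// Inserts a batch of Directories into redb: directories are buffered and
/// validated via [DirectoryGraph], then written in a single write transaction
/// when [DirectoryPutter::close] is called.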
pub struct RedbDirectoryPutter<'a> {
    db: &'a Database,

    /// The directories (buffered inside the directory validator) that are
    /// inserted later, or None if they were already inserted.
    directory_validator: Option<DirectoryGraph<LeavesToRootValidator>>,
}

#[async_trait]
impl DirectoryPutter for RedbDirectoryPutter<'_> {
    #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
    async fn put(&mut self, directory: Directory) -> Result<(), Error> {
        match self.directory_validator {
            None => return Err(Error::StorageError("already closed".to_string())),
            Some(ref mut validator) => {
                validator
                    .add(directory)
                    .map_err(|e| Error::StorageError(e.to_string()))?;
            }
        }

        Ok(())
    }

    #[instrument(level = "trace", skip_all, ret, err)]
    async fn close(&mut self) -> Result<B3Digest, Error> {
        match self.directory_validator.take() {
            None => Err(Error::StorageError("already closed".to_string())),
            Some(validator) => {
                // Insert all directories as a batch.
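                // Note: the write transaction is created outside the closure
                // and moved into it, since `self.db` is only borrowed here and
                // can't be captured by the 'static spawn_blocking closure.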
                tokio::task::spawn_blocking({
                    let txn = self.db.begin_write()?;
                    move || {
                        // Retrieve the validated directories.
                        let directories = validator
                            .validate()
                            .map_err(|e| Error::StorageError(e.to_string()))?
                            .drain_leaves_to_root()
                            .collect::<Vec<_>>();

                        // Get the root digest, which is at the end (cf. insertion order)
                        let root_digest = directories
                            .last()
                            .ok_or_else(|| Error::StorageError("got no directories".to_string()))?
                            .digest();

                        {
                            let mut table = txn.open_table(DIRECTORY_TABLE)?;

                            // Looping over all the verified directories, queuing them up for a
                            // batch insertion.
                            for directory in directories {
                                table.insert(
                                    directory.digest().as_ref(),
                                    proto::Directory::from(directory).encode_to_vec(),
                                )?;
                            }
                        }

                        txn.commit()?;

                        Ok(root_digest)
                    }
                })
                .await?
            }
        }
    }
}

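/// Configuration for [RedbDirectoryService].
///
/// Either `is_temporary = true` (in-memory backend, `path` must be unset),
/// or `is_temporary = false` with a filesystem `path` for the database.
/// When constructed from a URL, an empty path selects the temporary backend,
/// a non-empty one the on-disk backend (see the `TryFrom<url::Url>` impl below).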
#[derive(serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct RedbDirectoryServiceConfig {
    is_temporary: bool,
    /// Required when `is_temporary = false`.
    #[serde(default)]
    path: Option<PathBuf>,
}

impl TryFrom<url::Url> for RedbDirectoryServiceConfig {
    type Error = Box<dyn std::error::Error + Send + Sync>;
    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
        // redb doesn't support a host, and a path can be provided (otherwise
        // the database will live in memory only).
        if url.has_host() {
            return Err(Error::StorageError("no host allowed".to_string()).into());
        }

        Ok(if url.path().is_empty() {
            RedbDirectoryServiceConfig {
                is_temporary: true,
                path: None,
            }
        } else {
            RedbDirectoryServiceConfig {
                is_temporary: false,
                path: Some(url.path().into()),
            }
        })
    }
}

#[async_trait]
impl ServiceBuilder for RedbDirectoryServiceConfig {
    type Output = dyn DirectoryService;
    async fn build<'a>(
        &'a self,
        instance_name: &str,
        _context: &CompositionContext,
    ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
        match self {
            RedbDirectoryServiceConfig {
                is_temporary: true,
                path: None,
            } => Ok(Arc::new(RedbDirectoryService::new_temporary()?)),
            RedbDirectoryServiceConfig {
                is_temporary: true,
                path: Some(_),
            } => Err(Error::StorageError(
                "Temporary RedbDirectoryService cannot have a path".into(),
            )
            .into()),
            RedbDirectoryServiceConfig {
                is_temporary: false,
                path: None,
            } => Err(Error::StorageError("RedbDirectoryService is missing a path".into()).into()),
            RedbDirectoryServiceConfig {
                is_temporary: false,
                path: Some(path),
            } => Ok(Arc::new(
                RedbDirectoryService::new(instance_name.to_string(), path.into()).await?,
            )),
        }
    }
}