snix_castore/directoryservice/
redb.rs

1use futures::stream::BoxStream;
2use prost::Message;
3use redb::{Database, TableDefinition};
4use std::{path::PathBuf, sync::Arc};
5use tonic::async_trait;
6use tracing::{instrument, warn};
7
8use super::{
9    Directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator,
10    traverse_directory,
11};
12use crate::{
13    B3Digest, Error,
14    composition::{CompositionContext, ServiceBuilder},
15    proto,
16};
17
/// The single redb table: maps a directory's BLAKE3 digest (fixed-size key)
/// to its protobuf-encoded `Directory` representation.
const DIRECTORY_TABLE: TableDefinition<[u8; B3Digest::LENGTH], Vec<u8>> =
    TableDefinition::new("directory");
20
/// A [DirectoryService] backed by a [redb] key-value store, keeping
/// protobuf-encoded Directory messages keyed by their digest.
#[derive(Clone)]
pub struct RedbDirectoryService {
    // Name of this instance, used in tracing spans.
    instance_name: String,
    // We wrap the db in an Arc to be able to move it into spawn_blocking,
    // as discussed in https://github.com/cberner/redb/issues/789
    db: Arc<Database>,
}
28
29impl RedbDirectoryService {
30    /// Constructs a new instance using the specified filesystem path for
31    /// storage.
32    pub async fn new(instance_name: String, path: PathBuf) -> Result<Self, Error> {
33        if path == PathBuf::from("/") {
34            return Err(Error::StorageError(
35                "cowardly refusing to open / with redb".to_string(),
36            ));
37        }
38
39        let db = tokio::task::spawn_blocking(|| -> Result<_, redb::Error> {
40            let db = redb::Database::create(path)?;
41            create_schema(&db)?;
42            Ok(db)
43        })
44        .await??;
45
46        Ok(Self {
47            instance_name,
48            db: Arc::new(db),
49        })
50    }
51
52    /// Constructs a new instance using the in-memory backend.
53    pub fn new_temporary() -> Result<Self, Error> {
54        let db =
55            redb::Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?;
56
57        create_schema(&db)?;
58
59        Ok(Self {
60            instance_name: "root".into(),
61            db: Arc::new(db),
62        })
63    }
64}
65
66/// Ensures all tables are present.
67/// Opens a write transaction and calls open_table on DIRECTORY_TABLE, which will
68/// create it if not present.
69fn create_schema(db: &redb::Database) -> Result<(), redb::Error> {
70    let txn = db.begin_write()?;
71    txn.open_table(DIRECTORY_TABLE)?;
72    txn.commit()?;
73
74    Ok(())
75}
76
77#[async_trait]
78impl DirectoryService for RedbDirectoryService {
79    #[instrument(skip(self, digest), fields(directory.digest = %digest, instance_name = %self.instance_name))]
80    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
81        let db = self.db.clone();
82
83        // Retrieves the protobuf-encoded Directory for the corresponding digest.
84        let db_get_resp = tokio::task::spawn_blocking({
85            let digest = *digest.as_ref();
86            move || -> Result<_, redb::Error> {
87                let txn = db.begin_read()?;
88                let table = txn.open_table(DIRECTORY_TABLE)?;
89                Ok(table.get(digest)?)
90            }
91        })
92        .await?
93        .map_err(|e| {
94            warn!(err=%e, "failed to retrieve Directory");
95            Error::StorageError("failed to retrieve Directory".to_string())
96        })?;
97
98        // The Directory was not found, return None.
99        let directory_data = match db_get_resp {
100            None => return Ok(None),
101            Some(d) => d,
102        };
103
104        // We check that the digest of the retrieved Directory matches the expected digest.
105        let actual_digest = blake3::hash(directory_data.value().as_slice());
106        if actual_digest.as_bytes() != digest.as_slice() {
107            warn!(directory.actual_digest=%actual_digest, "requested Directory got the wrong digest");
108            return Err(Error::StorageError(
109                "requested Directory got the wrong digest".to_string(),
110            ));
111        }
112
113        // Attempt to decode the retrieved protobuf-encoded Directory, returning a parsing error if
114        // the decoding failed.
115        let directory = match proto::Directory::decode(&*directory_data.value()) {
116            Ok(dir) => {
117                // The returned Directory must be valid.
118                dir.try_into().map_err(|e| {
119                    warn!(err=%e, "Directory failed validation");
120                    Error::StorageError("Directory failed validation".to_string())
121                })?
122            }
123            Err(e) => {
124                warn!(err=%e, "failed to parse Directory");
125                return Err(Error::StorageError("failed to parse Directory".to_string()));
126            }
127        };
128
129        Ok(Some(directory))
130    }
131
132    #[instrument(skip(self, directory), fields(directory.digest = %directory.digest(), instance_name = %self.instance_name))]
133    async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
134        tokio::task::spawn_blocking({
135            let db = self.db.clone();
136            move || {
137                let digest = directory.digest();
138
139                // Store the directory in the table.
140                let txn = db.begin_write()?;
141                {
142                    let mut table = txn.open_table(DIRECTORY_TABLE)?;
143                    table.insert(
144                        digest.as_ref(),
145                        proto::Directory::from(directory).encode_to_vec(),
146                    )?;
147                }
148                txn.commit()?;
149
150                Ok(digest)
151            }
152        })
153        .await?
154    }
155
156    #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))]
157    fn get_recursive(
158        &self,
159        root_directory_digest: &B3Digest,
160    ) -> BoxStream<'static, Result<Directory, Error>> {
161        // FUTUREWORK: Ideally we should have all of the directory traversing happen in a single
162        // redb transaction to avoid constantly closing and opening new transactions for the
163        // database.
164        traverse_directory(self.clone(), root_directory_digest)
165    }
166
167    #[instrument(skip_all)]
168    fn put_multiple_start(&self) -> Box<dyn DirectoryPutter + '_> {
169        Box::new(RedbDirectoryPutter {
170            db: &self.db,
171            directory_validator: Some(Default::default()),
172        })
173    }
174}
175
/// Batch-insertion handle for directories: `put` only buffers into the
/// validator, and the whole batch is written in a single redb write
/// transaction when `close` is called.
pub struct RedbDirectoryPutter<'a> {
    // Borrowed handle to the underlying database; transactions are opened on it.
    db: &'a Database,

    /// The directories (inside the directory validator) that we insert later,
    /// or None, if they were already inserted.
    directory_validator: Option<DirectoryGraph<LeavesToRootValidator>>,
}
183
#[async_trait]
impl DirectoryPutter for RedbDirectoryPutter<'_> {
    /// Queues a directory in the validator; nothing touches the database
    /// until `close` is called.
    #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
    async fn put(&mut self, directory: Directory) -> Result<(), Error> {
        match self.directory_validator {
            // The validator is `take`n on close, so a put after close errors out.
            None => return Err(Error::StorageError("already closed".to_string())),
            Some(ref mut validator) => {
                validator
                    .add(directory)
                    .map_err(|e| Error::StorageError(e.to_string()))?;
            }
        }

        Ok(())
    }

    /// Validates all queued directories, writes them in a single write
    /// transaction, and returns the root directory's digest.
    /// Calling close twice is an error.
    #[instrument(level = "trace", skip_all, ret, err)]
    async fn close(&mut self) -> Result<B3Digest, Error> {
        match self.directory_validator.take() {
            None => Err(Error::StorageError("already closed".to_string())),
            Some(validator) => {
                // Insert all directories as a batch.
                tokio::task::spawn_blocking({
                    // NOTE(review): begin_write runs here, on the async thread,
                    // because `self.db` is a non-'static borrow that can't move
                    // into spawn_blocking — only the owned transaction can.
                    // begin_write may block while acquiring the writer; confirm
                    // this is acceptable on the async executor.
                    let txn = self.db.begin_write()?;
                    move || {
                        // Retrieve the validated directories.
                        let directories = validator
                            .validate()
                            .map_err(|e| Error::StorageError(e.to_string()))?
                            .drain_leaves_to_root()
                            .collect::<Vec<_>>();

                        // Get the root digest, which is at the end (cf. insertion order)
                        let root_digest = directories
                            .last()
                            .ok_or_else(|| Error::StorageError("got no directories".to_string()))?
                            .digest();

                        {
                            let mut table = txn.open_table(DIRECTORY_TABLE)?;

                            // Looping over all the verified directories, queuing them up for a
                            // batch insertion.
                            for directory in directories {
                                table.insert(
                                    directory.digest().as_ref(),
                                    proto::Directory::from(directory).encode_to_vec(),
                                )?;
                            }
                        }

                        txn.commit()?;

                        Ok(root_digest)
                    }
                })
                .await?
            }
        }
    }
}
245
/// Configuration for [RedbDirectoryService], deserialized from composition
/// config or derived from a URL via `TryFrom<url::Url>`.
#[derive(serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct RedbDirectoryServiceConfig {
    // When true, the in-memory (temporary) backend is used and `path` must be unset.
    is_temporary: bool,
    #[serde(default)]
    /// required when is_temporary = false
    path: Option<PathBuf>,
}
254
255impl TryFrom<url::Url> for RedbDirectoryServiceConfig {
256    type Error = Box<dyn std::error::Error + Send + Sync>;
257    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
258        // redb doesn't support host, and a path can be provided (otherwise
259        // it'll live in memory only).
260        if url.has_host() {
261            return Err(Error::StorageError("no host allowed".to_string()).into());
262        }
263
264        Ok(if url.path().is_empty() {
265            RedbDirectoryServiceConfig {
266                is_temporary: true,
267                path: None,
268            }
269        } else {
270            RedbDirectoryServiceConfig {
271                is_temporary: false,
272                path: Some(url.path().into()),
273            }
274        })
275    }
276}
277
278#[async_trait]
279impl ServiceBuilder for RedbDirectoryServiceConfig {
280    type Output = dyn DirectoryService;
281    async fn build<'a>(
282        &'a self,
283        instance_name: &str,
284        _context: &CompositionContext,
285    ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
286        match self {
287            RedbDirectoryServiceConfig {
288                is_temporary: true,
289                path: None,
290            } => Ok(Arc::new(RedbDirectoryService::new_temporary()?)),
291            RedbDirectoryServiceConfig {
292                is_temporary: true,
293                path: Some(_),
294            } => Err(Error::StorageError(
295                "Temporary RedbDirectoryService can not have path".into(),
296            )
297            .into()),
298            RedbDirectoryServiceConfig {
299                is_temporary: false,
300                path: None,
301            } => Err(Error::StorageError("RedbDirectoryService is missing path".into()).into()),
302            RedbDirectoryServiceConfig {
303                is_temporary: false,
304                path: Some(path),
305            } => Ok(Arc::new(
306                RedbDirectoryService::new(instance_name.to_string(), path.into()).await?,
307            )),
308        }
309    }
310}