1use futures::{StreamExt, TryStreamExt, stream::BoxStream};
2use prost::Message;
3use redb::{ReadableDatabase, TableDefinition};
4use std::{path::PathBuf, sync::Arc};
5use tonic::async_trait;
6use tracing::{instrument, warn};
7
8use super::{Directory, DirectoryPutter, DirectoryService, traversal};
9use crate::{
10 B3Digest,
11 composition::{CompositionContext, ServiceBuilder},
12 directoryservice::directory_graph::DirectoryGraphBuilder,
13 proto,
14};
15
/// The single redb table used by this service.
///
/// Keys are the raw bytes of a directory's [`B3Digest`]; values are the
/// protobuf-encoded [`proto::Directory`] message of that directory.
const DIRECTORY_TABLE: TableDefinition<[u8; B3Digest::LENGTH], Vec<u8>> =
    TableDefinition::new("directory");
18
/// The underlying redb handle, in whichever mode it was opened.
enum Db {
    /// Database opened read-only; write transactions are refused.
    ReadOnly(redb::ReadOnlyDatabase),
    /// Database opened (or created) with write access.
    ReadWrite(redb::Database),
}
23
24impl Db {
25 fn begin_read(&self) -> Result<redb::ReadTransaction, Error> {
26 match self {
27 Db::ReadOnly(db) => Ok(db.begin_read()?),
28 Db::ReadWrite(db) => Ok(db.begin_read()?),
29 }
30 }
31
32 fn begin_write(&self) -> Result<redb::WriteTransaction, Error> {
33 match self {
34 Db::ReadOnly(_) => Err(Error::OpenedReadonly),
35 Db::ReadWrite(db) => Ok(db.begin_write()?),
36 }
37 }
38}
39
/// A [`DirectoryService`] storing directories in a redb database, either
/// file-backed or in-memory.
#[derive(Clone)]
pub struct RedbDirectoryService {
    /// Name of this instance, recorded on tracing spans.
    instance_name: String,

    // The database is kept behind an `Arc` so a handle can be cloned and
    // moved into `spawn_blocking` closures.
    db: Arc<Db>,
}
47
48impl RedbDirectoryService {
49 pub async fn new(
51 instance_name: String,
52 config: RedbDirectoryServiceConfig,
53 ) -> Result<Self, Error> {
54 if let Some(path) = config.path.clone() {
55 if &path == "" {
56 return Err(Error::WrongConfig("empty path is disallowed"));
57 }
58 if &path == "/" {
59 return Err(Error::WrongConfig("cowardly refusing to open / with redb"));
60 }
61
62 if config.read_only {
63 let db =
64 tokio::task::spawn_blocking(|| redb::Database::builder().open_read_only(path))
65 .await??;
66
67 return Ok(Self {
68 instance_name,
69 db: Arc::new(Db::ReadOnly(db)),
70 });
71 }
72
73 if let Some(parent) = path.parent() {
74 tokio::fs::create_dir_all(parent).await?;
75 }
76
77 let db = tokio::task::spawn_blocking(move || {
78 let mut builder = redb::Database::builder();
79 configure_builder(&mut builder, &config);
80
81 let db = builder.create(path)?;
82 create_schema(&db)?;
83 Ok::<_, Error>(db)
84 })
85 .await??;
86
87 Ok(Self {
88 instance_name,
89 db: Arc::new(Db::ReadWrite(db)),
90 })
91 } else {
92 Self::new_temporary(instance_name, config)
93 }
94 }
95
96 pub fn new_temporary(
99 instance_name: String,
100 config: RedbDirectoryServiceConfig,
101 ) -> Result<Self, Error> {
102 debug_assert!(
103 config.path.is_none(),
104 "Snix bug: config.path is not None, but new_temporary requested"
105 );
106
107 if config.read_only {
108 return Err(Error::WrongConfig("in-memory database cannot be read-only"));
109 }
110
111 let mut builder = redb::Database::builder();
112 configure_builder(&mut builder, &config);
113
114 let db = builder.create_with_backend(redb::backends::InMemoryBackend::new())?;
115
116 create_schema(&db)?;
117
118 Ok(Self {
119 instance_name,
120 db: Arc::new(Db::ReadWrite(db)),
121 })
122 }
123}
124
125fn configure_builder(builder: &mut redb::Builder, config: &RedbDirectoryServiceConfig) {
127 if let Some(cache_size) = config.cache_size {
128 builder.set_cache_size(cache_size);
129 }
130}
131
132#[allow(clippy::result_large_err)]
136fn create_schema(db: &redb::Database) -> Result<(), Error> {
137 let txn = db.begin_write()?;
138 txn.open_table(DIRECTORY_TABLE)?;
139 txn.commit()?;
140
141 Ok(())
142}
143
144#[async_trait]
145impl DirectoryService for RedbDirectoryService {
146 #[instrument(skip(self, digest), fields(directory.digest = %digest, instance_name = %self.instance_name))]
147 async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, super::Error> {
148 let db = self.db.clone();
149 let digest = *digest;
150 let directory_data = match tokio::task::spawn_blocking(move || -> Result<_, Error> {
152 let txn = db.begin_read()?;
153 let table = txn.open_table(DIRECTORY_TABLE)?;
154 Ok(table.get(*digest)?)
155 })
156 .await??
157 {
158 None => return Ok(None),
160 Some(directory_data) => directory_data.value(),
161 };
162
163 let actual = B3Digest::from(blake3::hash(&directory_data));
165 if actual != digest {
166 return Err(Error::WrongDigest {
167 expected: digest,
168 actual,
169 }
170 .into());
171 }
172
173 let proto_directory =
175 proto::Directory::decode(directory_data.as_slice()).map_err(Error::ProtobufDecode)?;
176 let directory = Directory::try_from(proto_directory).map_err(Error::DirectoryValidation)?;
177
178 Ok(Some(directory))
179 }
180
181 #[instrument(skip(self, directory), fields(directory.digest = %directory.digest(), instance_name = %self.instance_name))]
182 async fn put(&self, directory: Directory) -> Result<B3Digest, super::Error> {
183 let db = self.db.clone();
184 let digest = tokio::task::spawn_blocking(move || -> Result<_, Error> {
185 let digest = directory.digest();
186
187 let txn = db.begin_write()?;
189 {
190 let mut table = txn.open_table(DIRECTORY_TABLE)?;
191 table.insert(
192 digest.as_ref(),
193 proto::Directory::from(directory).encode_to_vec(),
194 )?;
195 }
196 txn.commit()?;
197
198 Ok(digest)
199 })
200 .await??;
201
202 Ok(digest)
203 }
204
205 #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))]
206 fn get_recursive(
207 &self,
208 root_directory_digest: &B3Digest,
209 ) -> BoxStream<'static, Result<Directory, super::Error>> {
210 let svc = self.clone();
214 traversal::root_to_leaves(*root_directory_digest, move |digest| {
215 let svc = svc.clone();
216 async move { svc.get(&digest).await }
217 })
218 .map_err(|err| Box::new(Error::DirectoryTraversal(err)))
219 .err_into()
220 .boxed()
221 }
222
223 #[instrument(skip_all)]
224 fn put_multiple_start(&self) -> Box<dyn DirectoryPutter + '_> {
225 Box::new(RedbDirectoryPutter {
226 db: &self.db,
227 builder: Some(DirectoryGraphBuilder::new_leaves_to_root()),
228 })
229 }
230}
231
/// [`DirectoryPutter`] that collects directories in memory and writes them
/// to the database in a single transaction when closed.
pub struct RedbDirectoryPutter<'a> {
    /// Database the collected directories are written to on close.
    db: &'a Db,

    /// Collects the inserted directories and checks their ordering.
    /// `None` once the putter has been closed.
    builder: Option<DirectoryGraphBuilder>,
}
239
240#[async_trait]
241impl DirectoryPutter for RedbDirectoryPutter<'_> {
242 #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
243 async fn put(&mut self, directory: Directory) -> Result<(), super::Error> {
244 let builder = self
245 .builder
246 .as_mut()
247 .ok_or_else(|| Error::DirectoryPutterAlreadyClosed)?;
248
249 builder
250 .try_insert(directory)
251 .map_err(Error::DirectoryOrdering)?;
252
253 Ok(())
254 }
255
256 #[instrument(level = "trace", skip_all, ret, err)]
257 async fn close(&mut self) -> Result<B3Digest, super::Error> {
258 let builder = self
259 .builder
260 .take()
261 .ok_or_else(|| Error::DirectoryPutterAlreadyClosed)?;
262
263 let root_digest = tokio::task::spawn_blocking({
265 let txn = self.db.begin_write()?;
266 move || {
267 let directory_graph = builder.build().map_err(Error::DirectoryOrdering)?;
269 let root_digest = directory_graph.root().digest();
270
271 {
274 let mut table = txn.open_table(DIRECTORY_TABLE)?;
275 for directory in directory_graph.drain_leaves_to_root() {
276 table.insert(
277 directory.digest().as_ref(),
278 proto::Directory::from(directory).encode_to_vec(),
279 )?;
280 }
281 }
282 txn.commit()?;
283
284 Ok::<_, Error>(root_digest)
285 }
286 })
287 .await??;
288
289 Ok(root_digest)
290 }
291}
292
/// Errors produced by the redb directory service and its configuration.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// The configuration or URL is invalid; the payload explains why.
    #[error("wrong arguments: {0}")]
    WrongConfig(&'static str),
    /// The URL query string could not be deserialized into the config.
    #[error("serde-qs error: {0}")]
    SerdeQS(#[from] serde_qs::Error),

    /// The directory graph builder rejected an insert or the final build.
    #[error("Directory Graph ordering error")]
    DirectoryOrdering(#[from] crate::directoryservice::OrderingError),

    /// `put` or `close` was called on a putter that was already closed.
    #[error("DirectoryPutter already closed")]
    DirectoryPutterAlreadyClosed,

    /// The traversal behind `get_recursive` failed.
    #[error("failure during directory traversal")]
    DirectoryTraversal(#[source] traversal::Error),

    /// The bytes stored under a digest hash to a different digest.
    #[error("requested directory has wrong digest, expected {expected}, actual {actual}")]
    WrongDigest {
        /// The digest that was requested.
        expected: B3Digest,
        /// The digest computed from the stored bytes.
        actual: B3Digest,
    },
    /// The stored bytes are not a valid protobuf `Directory` message.
    #[error("failed to decode protobuf: {0}")]
    ProtobufDecode(#[from] prost::DecodeError),
    /// The decoded protobuf message failed directory validation.
    #[error("failed to validate directory: {0}")]
    DirectoryValidation(#[from] crate::DirectoryError),

    /// A write transaction was requested on a read-only database.
    #[error("unable to open write txn, database opened read-only")]
    OpenedReadonly,
    // The variants below wrap the individual redb error types so they can be
    // propagated with `?`.
    #[error("redb commit error: {0}")]
    RedbCommit(#[from] redb::CommitError),
    #[error("redb database error: {0}")]
    RedbDatabase(#[from] redb::DatabaseError),
    #[error("redb error: {0}")]
    Redb(#[from] redb::Error),
    #[error("redb storage error: {0}")]
    RedbStorage(#[from] redb::StorageError),
    #[error("redb table error: {0}")]
    RedbTable(#[from] redb::TableError),
    #[error("redb txn error: {0}")]
    RedbTransaction(#[from] redb::TransactionError),

    /// A `spawn_blocking` task could not be joined.
    #[error("join error: {0}")]
    TokioJoin(#[from] tokio::task::JoinError),
    /// An I/O operation failed, e.g. creating the parent directories.
    #[error("io error: {0}")]
    IO(#[from] std::io::Error),
}
339
/// Configuration for [`RedbDirectoryService`].
#[derive(Clone, Default, serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct RedbDirectoryServiceConfig {
    /// Location of the database file. `None` selects an in-memory database.
    path: Option<PathBuf>,

    /// When set, passed to `redb::Builder::set_cache_size` by
    /// `configure_builder`.
    cache_size: Option<usize>,

    /// Whether to open the database read-only. Not allowed for in-memory
    /// databases. Defaults to `false` when absent.
    #[serde(default)]
    read_only: bool,
}
352
353impl TryFrom<url::Url> for RedbDirectoryServiceConfig {
354 type Error = Box<dyn std::error::Error + Send + Sync>;
355
356 fn try_from(url: url::Url) -> Result<Self, Self::Error> {
357 if url.has_host() {
358 return Err(Error::WrongConfig("no host allowed").into());
359 }
360
361 let path: Option<PathBuf> = match (url.scheme(), url.has_authority(), url.path()) {
362 ("redb+memory", false, "") => None,
363 ("redb+memory", false, _) => Err(Box::new(Error::WrongConfig(
364 "redb+memory with path is disallowed",
365 )))?,
366 ("redb+memory", true, _) => Err(Box::new(Error::WrongConfig(
367 "redb+memory may not have authority",
368 )))?,
369 ("redb", _, "") => Err(Box::new(Error::WrongConfig(
370 "redb without path is disallowed, use redb+memory if you want in-memory",
371 )))?,
372 ("redb", _, path) => Some(path.into()),
373 (_scheme, _, _) => Err(Box::new(Error::WrongConfig("unrecognized scheme")))?,
374 };
375
376 let mut config: RedbDirectoryServiceConfig =
377 serde_qs::from_str(url.query().unwrap_or_default())?;
378
379 config.path = path;
380
381 Ok(config)
382 }
383}
384
385#[async_trait]
386impl ServiceBuilder for RedbDirectoryServiceConfig {
387 type Output = dyn DirectoryService;
388 async fn build<'a>(
389 &'a self,
390 instance_name: &str,
391 _context: &CompositionContext,
392 ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
393 Ok(Arc::new(
394 RedbDirectoryService::new(instance_name.to_string(), self.to_owned()).await?,
395 ))
396 }
397}
398
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use crate::{
        directoryservice::{DirectoryService, RedbDirectoryService, RedbDirectoryServiceConfig},
        fixtures::DIRECTORY_A,
    };

    /// Config for a file-backed database at `path` with no explicit cache
    /// size.
    fn file_config(path: std::path::PathBuf, read_only: bool) -> RedbDirectoryServiceConfig {
        RedbDirectoryServiceConfig {
            path: Some(path),
            cache_size: None,
            read_only,
        }
    }

    /// Data written read-write can be read back by several read-only
    /// instances that are open at the same time.
    #[tokio::test]
    async fn reopen_as_read_only() {
        let tempdir = TempDir::new().unwrap();
        let db_path = tempdir.path().join("data.redb");

        {
            // The read-write instance is scoped so that it is dropped
            // before the database is reopened read-only.
            let writer =
                RedbDirectoryService::new("rw".to_string(), file_config(db_path.clone(), false))
                    .await
                    .expect("to construct");

            writer.put(DIRECTORY_A.clone()).await.expect("to insert");
        }

        let ro_config = file_config(db_path, true);

        let first_reader = RedbDirectoryService::new("ro1".to_string(), ro_config.clone())
            .await
            .expect("to construct");
        let second_reader = RedbDirectoryService::new("ro2".to_string(), ro_config)
            .await
            .expect("to construct");

        for reader in [&first_reader, &second_reader] {
            let fetched = reader
                .get(&DIRECTORY_A.digest())
                .await
                .expect("get to succeed")
                .expect("to be Some(_)");

            assert_eq!(fetched.digest(), DIRECTORY_A.digest());
        }
    }

    /// Opening a path that does not exist yet in read-only mode fails.
    #[tokio::test]
    async fn read_only_nonexistent() {
        let tempdir = TempDir::new().unwrap();
        let db_path = tempdir.path().join("data.redb");

        let attempt = RedbDirectoryService::new("test".to_string(), file_config(db_path, true)).await;

        assert!(attempt.is_err(), "opening new path r/o should fail");
    }

    /// A database that is still open read-write cannot be opened read-only.
    #[tokio::test]
    async fn open_rw_and_ro() {
        let tempdir = TempDir::new().unwrap();
        let db_path = tempdir.path().join("data.redb");

        // Kept alive until the end of the test on purpose.
        let _writer =
            RedbDirectoryService::new("rw".to_string(), file_config(db_path.clone(), false))
                .await
                .expect("to construct");

        let attempt = RedbDirectoryService::new("ro".to_string(), file_config(db_path, true)).await;

        assert!(
            attempt.is_err(),
            "opening r/o should fail if still open r/w"
        );
    }
}