1use futures::{StreamExt, TryStreamExt, stream::BoxStream};
2use prost::Message;
3use redb::{ReadableDatabase, TableDefinition};
4use std::{path::PathBuf, sync::Arc};
5use tonic::async_trait;
6use tracing::{instrument, warn};
7
8use super::{Directory, DirectoryPutter, DirectoryService, traversal};
9use crate::{
10 B3Digest,
11 composition::{CompositionContext, ServiceBuilder},
12 directoryservice::directory_graph::DirectoryGraphBuilder,
13 proto,
14};
15
16const DIRECTORY_TABLE: TableDefinition<[u8; B3Digest::LENGTH], Vec<u8>> =
17 TableDefinition::new("directory");
18
19enum Db {
20 ReadOnly(redb::ReadOnlyDatabase),
21 ReadWrite(redb::Database),
22}
23
24impl Db {
25 fn begin_read(&self) -> Result<redb::ReadTransaction, Error> {
26 match self {
27 Db::ReadOnly(db) => Ok(db.begin_read()?),
28 Db::ReadWrite(db) => Ok(db.begin_read()?),
29 }
30 }
31
32 fn begin_write(&self) -> Result<redb::WriteTransaction, Error> {
33 match self {
34 Db::ReadOnly(_) => Err(Error::OpenedReadonly),
35 Db::ReadWrite(db) => Ok(db.begin_write()?),
36 }
37 }
38}
39
40#[derive(Clone)]
41pub struct RedbDirectoryService {
42 instance_name: String,
43
44 db: Arc<Db>,
46}
47
48impl RedbDirectoryService {
49 pub async fn new(
51 instance_name: String,
52 config: RedbDirectoryServiceConfig,
53 ) -> Result<Self, Error> {
54 if let Some(path) = config.path.clone() {
55 if &path == "" {
56 return Err(Error::WrongConfig("empty path is disallowed"));
57 }
58 if &path == "/" {
59 return Err(Error::WrongConfig("cowardly refusing to open / with redb"));
60 }
61
62 if config.read_only {
63 let db =
64 tokio::task::spawn_blocking(|| redb::Database::builder().open_read_only(path))
65 .await??;
66
67 return Ok(Self {
68 instance_name,
69 db: Arc::new(Db::ReadOnly(db)),
70 });
71 }
72
73 if let Some(parent) = path.parent() {
74 tokio::fs::create_dir_all(parent).await?;
75 }
76
77 let db = tokio::task::spawn_blocking(move || {
78 let mut builder = redb::Database::builder();
79 configure_builder(&mut builder, &config);
80
81 let db = builder.create(path)?;
82 create_schema(&db)?;
83 Ok::<_, Error>(db)
84 })
85 .await??;
86
87 Ok(Self {
88 instance_name,
89 db: Arc::new(Db::ReadWrite(db)),
90 })
91 } else {
92 Self::new_temporary(instance_name, config)
93 }
94 }
95
96 pub fn new_temporary(
99 instance_name: String,
100 config: RedbDirectoryServiceConfig,
101 ) -> Result<Self, Error> {
102 debug_assert!(
103 config.path.is_none(),
104 "Snix bug: config.path is not None, but new_temporary requested"
105 );
106
107 if config.read_only {
108 return Err(Error::WrongConfig("in-memory database cannot be read-only"));
109 }
110
111 let mut builder = redb::Database::builder();
112 configure_builder(&mut builder, &config);
113
114 let db = builder.create_with_backend(redb::backends::InMemoryBackend::new())?;
115
116 create_schema(&db)?;
117
118 Ok(Self {
119 instance_name,
120 db: Arc::new(Db::ReadWrite(db)),
121 })
122 }
123}
124
125fn configure_builder(builder: &mut redb::Builder, config: &RedbDirectoryServiceConfig) {
127 if let Some(cache_size) = config.cache_size {
128 builder.set_cache_size(cache_size);
129 }
130}
131
132#[allow(clippy::result_large_err)]
136fn create_schema(db: &redb::Database) -> Result<(), Error> {
137 let txn = db.begin_write()?;
138 txn.open_table(DIRECTORY_TABLE)?;
139 txn.commit()?;
140
141 Ok(())
142}
143
144#[async_trait]
145impl DirectoryService for RedbDirectoryService {
146 #[instrument(skip(self, digest), fields(directory.digest = %digest, instance_name = %self.instance_name))]
147 async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, super::Error> {
148 let db = self.db.clone();
149 let digest = *digest;
150 let directory_data = match tokio::task::spawn_blocking(move || -> Result<_, Error> {
152 let txn = db.begin_read()?;
153 let table = txn.open_table(DIRECTORY_TABLE)?;
154 Ok(table.get(*digest)?)
155 })
156 .await??
157 {
158 None => return Ok(None),
160 Some(directory_data) => directory_data.value(),
161 };
162
163 let actual = B3Digest::from(blake3::hash(&directory_data));
165 if actual != digest {
166 return Err(Error::WrongDigest {
167 expected: digest,
168 actual,
169 }
170 .into());
171 }
172
173 let proto_directory =
175 proto::Directory::decode(directory_data.as_slice()).map_err(Error::ProtobufDecode)?;
176 let directory = Directory::try_from(proto_directory).map_err(Error::DirectoryValidation)?;
177
178 Ok(Some(directory))
179 }
180
181 #[instrument(skip(self, directory), fields(directory.digest = %directory.digest(), instance_name = %self.instance_name))]
182 async fn put(&self, directory: Directory) -> Result<B3Digest, super::Error> {
183 let db = self.db.clone();
184 let digest = tokio::task::spawn_blocking(move || -> Result<_, Error> {
185 let digest = directory.digest();
186
187 let txn = db.begin_write()?;
189 {
190 let mut table = txn.open_table(DIRECTORY_TABLE)?;
191 table.insert(
192 digest.as_ref(),
193 proto::Directory::from(directory).encode_to_vec(),
194 )?;
195 }
196 txn.commit()?;
197
198 Ok(digest)
199 })
200 .await??;
201
202 Ok(digest)
203 }
204
205 #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))]
206 fn get_recursive(
207 &self,
208 root_directory_digest: &B3Digest,
209 ) -> BoxStream<'static, Result<Directory, super::Error>> {
210 let svc = self.clone();
214 traversal::root_to_leaves(*root_directory_digest, move |digest| {
215 let svc = svc.clone();
216 async move { svc.get(&digest).await }
217 })
218 .map_err(|err| Box::new(Error::DirectoryTraversal(err)))
219 .err_into()
220 .boxed()
221 }
222
223 #[instrument(skip_all)]
224 fn put_multiple_start(&self) -> Box<dyn DirectoryPutter + '_> {
225 Box::new(RedbDirectoryPutter {
226 db: &self.db,
227 builder: Some(DirectoryGraphBuilder::new_leaves_to_root()),
228 })
229 }
230}
231
232pub struct RedbDirectoryPutter<'a> {
233 db: &'a Db,
234
235 builder: Option<DirectoryGraphBuilder>,
238}
239
240#[async_trait]
241impl DirectoryPutter for RedbDirectoryPutter<'_> {
242 #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
243 async fn put(&mut self, directory: Directory) -> Result<(), super::Error> {
244 let builder = self
245 .builder
246 .as_mut()
247 .ok_or_else(|| Error::DirectoryPutterAlreadyClosed)?;
248
249 builder
250 .try_insert(directory)
251 .map_err(Error::DirectoryOrdering)?;
252
253 Ok(())
254 }
255
256 #[instrument(level = "trace", skip_all, ret, err)]
257 async fn close(&mut self) -> Result<B3Digest, super::Error> {
258 let builder = self
259 .builder
260 .take()
261 .ok_or_else(|| Error::DirectoryPutterAlreadyClosed)?;
262
263 let root_digest = tokio::task::spawn_blocking({
265 let txn = self.db.begin_write()?;
266 move || {
267 let directory_graph = builder.build().map_err(Error::DirectoryOrdering)?;
269 let root_digest = directory_graph.root().digest();
270
271 {
274 let mut table = txn.open_table(DIRECTORY_TABLE)?;
275 for directory in directory_graph.drain_leaves_to_root() {
276 table.insert(
277 directory.digest().as_ref(),
278 proto::Directory::from(directory).encode_to_vec(),
279 )?;
280 }
281 }
282 txn.commit()?;
283
284 Ok::<_, Error>(root_digest)
285 }
286 })
287 .await??;
288
289 Ok(root_digest)
290 }
291}
292
293#[derive(thiserror::Error, Debug)]
294pub enum Error {
295 #[error("wrong arguments: {0}")]
296 WrongConfig(&'static str),
297 #[error("serde-qs error: {0}")]
298 SerdeQS(#[from] serde_qs::Error),
299
300 #[error("Directory Graph ordering error")]
301 DirectoryOrdering(#[from] crate::directoryservice::OrderingError),
302
303 #[error("DirectoryPutter already closed")]
304 DirectoryPutterAlreadyClosed,
305
306 #[error("failure during directory traversal")]
307 DirectoryTraversal(#[source] traversal::Error),
308
309 #[error("requested directory has wrong digest, expected {expected}, actual {actual}")]
310 WrongDigest {
311 expected: B3Digest,
312 actual: B3Digest,
313 },
314 #[error("failed to decode protobuf: {0}")]
315 ProtobufDecode(#[from] prost::DecodeError),
316 #[error("failed to validate directory: {0}")]
317 DirectoryValidation(#[from] crate::DirectoryError),
318
319 #[error("unable to open write txn, database opened read-only")]
320 OpenedReadonly,
321 #[error("redb commit error: {0}")]
322 RedbCommit(#[from] redb::CommitError),
323 #[error("redb database error: {0}")]
324 RedbDatabase(#[from] redb::DatabaseError),
325 #[error("redb error: {0}")]
326 Redb(#[from] redb::Error),
327 #[error("redb storage error: {0}")]
328 RedbStorage(#[from] redb::StorageError),
329 #[error("redb table error: {0}")]
330 RedbTable(#[from] redb::TableError),
331 #[error("redb txn error: {0}")]
332 RedbTransaction(#[from] redb::TransactionError),
333
334 #[error("join error: {0}")]
335 TokioJoin(#[from] tokio::task::JoinError),
336 #[error("io error: {0}")]
337 IO(#[from] std::io::Error),
338}
339
340#[derive(Clone, Default, serde::Deserialize)]
341#[serde(deny_unknown_fields)]
342pub struct RedbDirectoryServiceConfig {
343 path: Option<PathBuf>,
344
345 cache_size: Option<usize>,
347
348 #[serde(default)]
350 read_only: bool,
351}
352
353impl TryFrom<url::Url> for RedbDirectoryServiceConfig {
354 type Error = Box<dyn std::error::Error + Send + Sync>;
355
356 fn try_from(url: url::Url) -> Result<Self, Self::Error> {
357 if url.has_host() {
358 return Err(Error::WrongConfig("no host allowed").into());
359 }
360
361 let path: Option<PathBuf> = match (url.scheme(), url.has_authority(), url.path()) {
362 ("redb+memory", false, "") => None,
363 ("redb+memory", false, _) => Err(Box::new(Error::WrongConfig(
364 "redb+memory with path is disallowed",
365 )))?,
366 ("redb+memory", true, _) => Err(Box::new(Error::WrongConfig(
367 "redb+memory may not have authority",
368 )))?,
369 ("redb", _, "") => Err(Box::new(Error::WrongConfig(
370 "redb without path is disallowed, use redb+memory if you want in-memory",
371 )))?,
372 ("redb", true, _path) => Err(Box::new(Error::WrongConfig("authority disallowed")))?,
373 ("redb", false, path) => Some(path.into()),
374 (_scheme, _, _) => Err(Box::new(Error::WrongConfig("unrecognized scheme")))?,
375 };
376
377 let mut config: RedbDirectoryServiceConfig =
378 serde_qs::from_str(url.query().unwrap_or_default())?;
379
380 config.path = path;
381
382 Ok(config)
383 }
384}
385
386#[async_trait]
387impl ServiceBuilder for RedbDirectoryServiceConfig {
388 type Output = dyn DirectoryService;
389 async fn build<'a>(
390 &'a self,
391 instance_name: &str,
392 _context: &CompositionContext,
393 ) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync>> {
394 Ok(Arc::new(
395 RedbDirectoryService::new(instance_name.to_string(), self.to_owned()).await?,
396 ))
397 }
398}
399
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use crate::{
        directoryservice::{DirectoryService, RedbDirectoryService, RedbDirectoryServiceConfig},
        fixtures::DIRECTORY_A,
    };

    /// Returns a config pointing at `data.redb` inside `tempdir`.
    fn config_in(tempdir: &TempDir, read_only: bool) -> RedbDirectoryServiceConfig {
        RedbDirectoryServiceConfig {
            path: Some(tempdir.path().join("data.redb")),
            cache_size: None,
            read_only,
        }
    }

    /// Data written through a read-write service can be read back through
    /// read-only services, several of which may be open at once.
    #[tokio::test]
    async fn reopen_as_read_only() {
        let tempdir = TempDir::new().unwrap();

        // Populate the database, then drop the r/w service before reopening.
        {
            let rw = RedbDirectoryService::new("rw".to_string(), config_in(&tempdir, false))
                .await
                .expect("to construct");

            rw.put(DIRECTORY_A.clone()).await.expect("to insert");
        }

        let ro_config = config_in(&tempdir, true);

        let ro_1 = RedbDirectoryService::new("ro1".to_string(), ro_config.clone())
            .await
            .expect("to construct");
        let ro_2 = RedbDirectoryService::new("ro2".to_string(), ro_config)
            .await
            .expect("to construct");

        for service in [&ro_1, &ro_2] {
            let directory = service
                .get(&DIRECTORY_A.digest())
                .await
                .expect("get to succeed")
                .expect("to be Some(_)");

            assert_eq!(directory.digest(), DIRECTORY_A.digest());
        }
    }

    /// Read-only mode never creates a database file.
    #[tokio::test]
    async fn read_only_nonexistent() {
        let tempdir = TempDir::new().unwrap();

        let result =
            RedbDirectoryService::new("test".to_string(), config_in(&tempdir, true)).await;

        assert!(result.is_err(), "opening new path r/o should fail");
    }

    /// A database that is still open read-write cannot be opened read-only.
    #[tokio::test]
    async fn open_rw_and_ro() {
        let tempdir = TempDir::new().unwrap();

        // Keep the r/w service alive for the duration of the test.
        let _rw = RedbDirectoryService::new("rw".to_string(), config_in(&tempdir, false))
            .await
            .expect("to construct");

        let result = RedbDirectoryService::new("ro".to_string(), config_in(&tempdir, true)).await;

        assert!(result.is_err(), "opening r/o should fail if still open r/w");
    }
}