1use futures::{StreamExt, TryStreamExt, stream::BoxStream};
2use prost::Message;
3use redb::{ReadableDatabase, TableDefinition};
4use std::{path::PathBuf, sync::Arc};
5use tonic::async_trait;
6use tracing::{instrument, warn};
7
8use super::{Directory, DirectoryPutter, DirectoryService, traversal};
9use crate::{
10 B3Digest,
11 composition::{CompositionContext, ServiceBuilder},
12 directoryservice::directory_graph::DirectoryGraphBuilder,
13 proto,
14};
15
/// The single redb table used by this service.
///
/// Keys are directory digests as fixed-size byte arrays of
/// `B3Digest::LENGTH` bytes; values are the protobuf-encoded
/// `proto::Directory` messages stored as raw bytes.
const DIRECTORY_TABLE: TableDefinition<[u8; B3Digest::LENGTH], Vec<u8>> =
    TableDefinition::new("directory");
18
/// Handle to the underlying redb database, tagged by the mode it was opened in.
enum Db {
    /// Database opened read-only; write transactions are refused.
    ReadOnly(redb::ReadOnlyDatabase),
    /// Database opened (or created) read-write.
    ReadWrite(redb::Database),
}
23
24impl Db {
25 fn begin_read(&self) -> Result<redb::ReadTransaction, Error> {
26 match self {
27 Db::ReadOnly(db) => Ok(db.begin_read()?),
28 Db::ReadWrite(db) => Ok(db.begin_read()?),
29 }
30 }
31
32 fn begin_write(&self) -> Result<redb::WriteTransaction, Error> {
33 match self {
34 Db::ReadOnly(_) => Err(Error::OpenedReadonly),
35 Db::ReadWrite(db) => Ok(db.begin_write()?),
36 }
37 }
38}
39
/// A [`DirectoryService`] that persists directories in a redb database.
///
/// Cloning is cheap: clones share the same database handle through an `Arc`.
#[derive(Clone)]
pub struct RedbDirectoryService {
    /// Name of this instance, recorded in tracing spans.
    instance_name: String,

    /// Shared handle to the opened database.
    db: Arc<Db>,
}
47
48impl RedbDirectoryService {
49 pub async fn new(
51 instance_name: String,
52 config: RedbDirectoryServiceConfig,
53 ) -> Result<Self, Error> {
54 if let Some(path) = config.path.clone() {
55 if &path == "" {
56 return Err(Error::WrongConfig("empty path is disallowed"));
57 }
58 if &path == "/" {
59 return Err(Error::WrongConfig("cowardly refusing to open / with redb"));
60 }
61
62 if config.read_only {
63 let db = tokio::task::spawn_blocking(move || {
64 let mut builder = redb::Database::builder();
65 configure_builder(&mut builder, &config);
66 builder.open_read_only(path)
67 })
68 .await??;
69
70 return Ok(Self {
71 instance_name,
72 db: Arc::new(Db::ReadOnly(db)),
73 });
74 }
75
76 if let Some(parent) = path.parent() {
77 tokio::fs::create_dir_all(parent).await?;
78 }
79
80 let db = tokio::task::spawn_blocking(move || {
81 let mut builder = redb::Database::builder();
82 configure_builder(&mut builder, &config);
83
84 let db = builder.create(path)?;
85 create_schema(&db)?;
86 Ok::<_, Error>(db)
87 })
88 .await??;
89
90 Ok(Self {
91 instance_name,
92 db: Arc::new(Db::ReadWrite(db)),
93 })
94 } else {
95 Self::new_temporary(instance_name, config)
96 }
97 }
98
99 pub fn new_temporary(
102 instance_name: String,
103 config: RedbDirectoryServiceConfig,
104 ) -> Result<Self, Error> {
105 debug_assert!(
106 config.path.is_none(),
107 "Snix bug: config.path is not None, but new_temporary requested"
108 );
109
110 if config.read_only {
111 return Err(Error::WrongConfig("in-memory database cannot be read-only"));
112 }
113
114 let mut builder = redb::Database::builder();
115 configure_builder(&mut builder, &config);
116
117 let db = builder.create_with_backend(redb::backends::InMemoryBackend::new())?;
118
119 create_schema(&db)?;
120
121 Ok(Self {
122 instance_name,
123 db: Arc::new(Db::ReadWrite(db)),
124 })
125 }
126}
127
128fn configure_builder(builder: &mut redb::Builder, config: &RedbDirectoryServiceConfig) {
130 if let Some(cache_size) = config.cache_size {
131 builder.set_cache_size(cache_size);
132 }
133}
134
135#[allow(clippy::result_large_err)]
139fn create_schema(db: &redb::Database) -> Result<(), Error> {
140 let txn = db.begin_write()?;
141 txn.open_table(DIRECTORY_TABLE)?;
142 txn.commit()?;
143
144 Ok(())
145}
146
147#[async_trait]
148impl DirectoryService for RedbDirectoryService {
149 #[instrument(skip(self, digest), fields(directory.digest = %digest, instance_name = %self.instance_name))]
150 async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, super::Error> {
151 let db = self.db.clone();
152 let digest = *digest;
153 let directory_data = match tokio::task::spawn_blocking(move || -> Result<_, Error> {
155 let txn = db.begin_read()?;
156 let table = txn.open_table(DIRECTORY_TABLE)?;
157 Ok(table.get(*digest)?)
158 })
159 .await
160 .map_err(Error::TokioJoin)??
161 {
162 None => return Ok(None),
164 Some(directory_data) => directory_data.value(),
165 };
166
167 let actual = B3Digest::from(blake3::hash(&directory_data));
169 if actual != digest {
170 return Err(Error::WrongDigest {
171 expected: digest,
172 actual,
173 }
174 .into());
175 }
176
177 let proto_directory =
179 proto::Directory::decode(directory_data.as_slice()).map_err(Error::ProtobufDecode)?;
180 let directory = Directory::try_from(proto_directory).map_err(Error::DirectoryValidation)?;
181
182 Ok(Some(directory))
183 }
184
185 #[instrument(skip(self, directory), fields(directory.digest = %directory.digest(), instance_name = %self.instance_name))]
186 async fn put(&self, directory: Directory) -> Result<B3Digest, super::Error> {
187 let db = self.db.clone();
188 let digest = tokio::task::spawn_blocking(move || -> Result<_, Error> {
189 let digest = directory.digest();
190
191 let txn = db.begin_write()?;
193 {
194 let mut table = txn.open_table(DIRECTORY_TABLE)?;
195 table.insert(
196 digest.as_ref(),
197 proto::Directory::from(directory).encode_to_vec(),
198 )?;
199 }
200 txn.commit()?;
201
202 Ok(digest)
203 })
204 .await
205 .map_err(Error::TokioJoin)??;
206
207 Ok(digest)
208 }
209
210 #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))]
211 fn get_recursive(
212 &self,
213 root_directory_digest: &B3Digest,
214 ) -> BoxStream<'static, Result<Directory, super::Error>> {
215 let svc = self.clone();
219 traversal::root_to_leaves(*root_directory_digest, move |digest| {
220 let svc = svc.clone();
221 async move { svc.get(&digest).await }
222 })
223 .map_err(Error::DirectoryTraversal)
224 .err_into()
225 .boxed()
226 }
227
228 #[instrument(skip_all)]
229 fn put_multiple_start(&self) -> Box<dyn DirectoryPutter> {
230 Box::new(RedbDirectoryPutter {
231 db: self.db.clone(),
232 builder: Some(DirectoryGraphBuilder::new_leaves_to_root()),
233 })
234 }
235}
236
/// Batched uploader handed out by `RedbDirectoryService::put_multiple_start`.
///
/// Directories are buffered in a graph builder and only written to the
/// database when the putter is closed.
pub struct RedbDirectoryPutter {
    /// Shared handle to the database the batch is written to.
    db: Arc<Db>,

    /// Graph builder collecting the uploaded directories; `None` once the
    /// putter has been closed.
    builder: Option<DirectoryGraphBuilder>,
}
244
245#[async_trait]
246impl DirectoryPutter for RedbDirectoryPutter {
247 #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
248 async fn put(&mut self, directory: Directory) -> Result<(), super::Error> {
249 let builder = self
250 .builder
251 .as_mut()
252 .ok_or_else(|| Error::DirectoryPutterAlreadyClosed)?;
253
254 builder
255 .try_insert(directory)
256 .map_err(Error::DirectoryOrdering)?;
257
258 Ok(())
259 }
260
261 #[instrument(level = "trace", skip_all, ret, err)]
262 async fn close(&mut self) -> Result<B3Digest, super::Error> {
263 let builder = self
264 .builder
265 .take()
266 .ok_or_else(|| Error::DirectoryPutterAlreadyClosed)?;
267
268 let db = self.db.clone();
270 let root_digest = tokio::task::spawn_blocking(move || {
271 let directory_graph = builder.build().map_err(Error::DirectoryOrdering)?;
273 let root_digest = directory_graph.root().digest();
274
275 let txn = db.begin_write()?;
276 {
279 let mut table = txn.open_table(DIRECTORY_TABLE)?;
280 for directory in directory_graph.drain_leaves_to_root() {
281 table.insert(
282 directory.digest().as_ref(),
283 proto::Directory::from(directory).encode_to_vec(),
284 )?;
285 }
286 }
287 txn.commit()?;
288
289 Ok::<_, Error>(root_digest)
290 })
291 .await
292 .map_err(Error::TokioJoin)??;
293
294 Ok(root_digest)
295 }
296}
297
/// Errors produced by the redb directory service, its putter and its
/// configuration parsing.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// The configuration or URL is invalid; the payload explains why.
    #[error("wrong arguments: {0}")]
    WrongConfig(&'static str),
    /// Wraps a `serde_qs` error.
    #[error("serde-qs error: {0}")]
    SerdeQS(#[from] serde_qs::Error),

    /// The directory graph builder rejected an insert or could not be built.
    #[error("Directory Graph ordering error")]
    DirectoryOrdering(#[from] crate::directoryservice::OrderingError),

    /// `put` or `close` was called on a putter that was already closed.
    #[error("DirectoryPutter already closed")]
    DirectoryPutterAlreadyClosed,

    /// The recursive traversal helper reported an error.
    #[error("failure during directory traversal")]
    DirectoryTraversal(#[source] traversal::Error),

    /// The bytes stored under a digest do not hash to that digest.
    #[error("requested directory has wrong digest, expected {expected}, actual {actual}")]
    WrongDigest {
        /// Digest that was requested.
        expected: B3Digest,
        /// Digest computed from the stored bytes.
        actual: B3Digest,
    },
    /// The stored bytes could not be decoded as a protobuf directory.
    #[error("failed to decode protobuf: {0}")]
    ProtobufDecode(#[from] prost::DecodeError),
    /// The decoded protobuf directory failed validation.
    #[error("failed to validate directory: {0}")]
    DirectoryValidation(#[from] crate::DirectoryError),

    /// A write transaction was requested on a database opened read-only.
    #[error("unable to open write txn, database opened read-only")]
    OpenedReadonly,
    /// redb failed to commit a transaction.
    #[error("redb commit error: {0}")]
    RedbCommit(#[from] redb::CommitError),
    /// redb failed to open or create the database.
    #[error("redb database error: {0}")]
    RedbDatabase(#[from] redb::DatabaseError),
    /// Generic redb error.
    #[error("redb error: {0}")]
    Redb(#[from] redb::Error),
    /// redb storage-level error.
    #[error("redb storage error: {0}")]
    RedbStorage(#[from] redb::StorageError),
    /// redb failed to open a table.
    #[error("redb table error: {0}")]
    RedbTable(#[from] redb::TableError),
    /// redb failed to begin a transaction.
    #[error("redb txn error: {0}")]
    RedbTransaction(#[from] redb::TransactionError),

    /// A task spawned on the blocking thread pool could not be joined.
    #[error("join error: {0}")]
    TokioJoin(#[from] tokio::task::JoinError),
    /// Filesystem error, e.g. while creating the database's parent directory.
    #[error("io error: {0}")]
    IO(#[from] std::io::Error),
}
344
345impl From<Error> for super::Error {
346 fn from(value: Error) -> Self {
347 Self(Box::new(value))
348 }
349}
350
/// Configuration for [`RedbDirectoryService`].
///
/// Deserializable with serde; unknown fields are rejected.
#[derive(Clone, Default, serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct RedbDirectoryServiceConfig {
    /// Path of the database file. `None` selects the in-memory backend.
    path: Option<PathBuf>,

    /// Optional cache size forwarded to the redb builder; the builder is
    /// left at its default when this is `None`.
    cache_size: Option<usize>,

    /// Open an existing database read-only instead of creating/opening it
    /// read-write. Defaults to `false` when absent.
    #[serde(default)]
    read_only: bool,
}
363
364impl TryFrom<url::Url> for RedbDirectoryServiceConfig {
365 type Error = Box<dyn std::error::Error + Send + Sync>;
366
367 fn try_from(url: url::Url) -> Result<Self, Self::Error> {
368 if url.has_host() {
369 return Err(Error::WrongConfig("no host allowed").into());
370 }
371
372 let path: Option<PathBuf> = match (url.scheme(), url.has_authority(), url.path()) {
373 ("redb+memory", false, "") => None,
374 ("redb+memory", false, _) => Err(Box::new(Error::WrongConfig(
375 "redb+memory with path is disallowed",
376 )))?,
377 ("redb+memory", true, _) => Err(Box::new(Error::WrongConfig(
378 "redb+memory may not have authority",
379 )))?,
380 ("redb", _, "") => Err(Box::new(Error::WrongConfig(
381 "redb without path is disallowed, use redb+memory if you want in-memory",
382 )))?,
383 ("redb", true, _path) => Err(Box::new(Error::WrongConfig("authority disallowed")))?,
384 ("redb", false, path) => Some(path.into()),
385 (_scheme, _, _) => Err(Box::new(Error::WrongConfig("unrecognized scheme")))?,
386 };
387
388 let mut config: RedbDirectoryServiceConfig =
389 serde_qs::from_str(url.query().unwrap_or_default())?;
390
391 config.path = path;
392
393 Ok(config)
394 }
395}
396
397#[async_trait]
398impl ServiceBuilder for RedbDirectoryServiceConfig {
399 type Output = dyn DirectoryService;
400 async fn build<'a>(
401 &'a self,
402 instance_name: &str,
403 _context: &CompositionContext,
404 ) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync>> {
405 Ok(Arc::new(
406 RedbDirectoryService::new(instance_name.to_string(), self.to_owned()).await?,
407 ))
408 }
409}
410
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use crate::{
        directoryservice::{DirectoryService, RedbDirectoryService, RedbDirectoryServiceConfig},
        fixtures::DIRECTORY_A,
    };

    /// Data written through a read-write service must be readable after
    /// reopening the same file read-only, including from two read-only
    /// services at once.
    #[tokio::test]
    async fn reopen_as_read_only() {
        let tempdir = TempDir::new().unwrap();
        let path = tempdir.path().join("data.redb");

        let config = RedbDirectoryServiceConfig {
            path: Some(path),
            cache_size: None,
            read_only: false,
        };

        {
            // Scope the read-write service so it is dropped before the file
            // is reopened read-only below.
            let directory_service = RedbDirectoryService::new("rw".to_string(), config.clone())
                .await
                .expect("to construct");

            directory_service
                .put(DIRECTORY_A.clone())
                .await
                .expect("to insert");
        }

        // Same path and cache settings, but read-only this time.
        let ro_config = RedbDirectoryServiceConfig {
            read_only: true,
            ..config
        };

        let directory_service_ro_1 =
            RedbDirectoryService::new("ro1".to_string(), ro_config.clone())
                .await
                .expect("to construct");
        let directory_service_ro_2 = RedbDirectoryService::new("ro2".to_string(), ro_config)
            .await
            .expect("to construct");

        assert_eq!(
            directory_service_ro_1
                .get(&DIRECTORY_A.digest())
                .await
                .expect("get to succeed")
                .expect("to be Some(_)")
                .digest(),
            DIRECTORY_A.digest()
        );
        assert_eq!(
            directory_service_ro_2
                .get(&DIRECTORY_A.digest())
                .await
                .expect("get to succeed")
                .expect("to be Some(_)")
                .digest(),
            DIRECTORY_A.digest()
        );
    }

    /// Opening a path that does not exist yet in read-only mode must fail.
    #[tokio::test]
    async fn read_only_nonexistent() {
        let tempdir = TempDir::new().unwrap();
        let path = tempdir.path().join("data.redb");

        let config = RedbDirectoryServiceConfig {
            path: Some(path),
            cache_size: None,
            read_only: true,
        };

        assert!(
            RedbDirectoryService::new("test".to_string(), config)
                .await
                .is_err(),
            "opening new path r/o should fail"
        );
    }

    /// While a read-write service is still alive, opening the same file
    /// read-only must fail.
    #[tokio::test]
    async fn open_rw_and_ro() {
        let tempdir = TempDir::new().unwrap();
        let path = tempdir.path().join("data.redb");

        let config = RedbDirectoryServiceConfig {
            path: Some(path),
            cache_size: None,
            read_only: false,
        };

        // Keep the read-write service alive for the rest of the test.
        let _directory_service = RedbDirectoryService::new("rw".to_string(), config.clone())
            .await
            .expect("to construct");

        assert!(
            RedbDirectoryService::new(
                "ro".to_string(),
                RedbDirectoryServiceConfig {
                    read_only: true,
                    ..config
                }
            )
            .await
            .is_err(),
            "opening r/o should fail if still open r/w"
        );
    }
}