snix_store/pathinfoservice/
redb.rs1use super::{PathInfo, PathInfoService};
2use crate::{pathinfoservice, proto};
3use data_encoding::BASE64;
4use futures::{StreamExt, TryStreamExt, stream::BoxStream};
5use prost::Message;
6use redb::{ReadableDatabase, ReadableTable, TableDefinition};
7use snix_castore::composition::{CompositionContext, ServiceBuilder};
8use std::{path::PathBuf, sync::Arc};
9use tokio_stream::wrappers::ReceiverStream;
10use tonic::async_trait;
11use tracing::instrument;
12
13const PATHINFO_TABLE: TableDefinition<[u8; 20], Vec<u8>> = TableDefinition::new("pathinfo");
14
15enum Db {
16 ReadOnly(redb::ReadOnlyDatabase),
17 ReadWrite(redb::Database),
18}
19
20impl Db {
21 fn begin_read(&self) -> Result<redb::ReadTransaction, redb::TransactionError> {
22 match self {
23 Db::ReadOnly(db) => db.begin_read(),
24 Db::ReadWrite(db) => db.begin_read(),
25 }
26 }
27
28 fn begin_write(&self) -> Result<redb::WriteTransaction, Error> {
29 match self {
30 Db::ReadOnly(_) => Err(Error::OpenedReadonly),
31 Db::ReadWrite(db) => Ok(db.begin_write()?),
32 }
33 }
34}
35
36#[derive(Clone)]
40pub struct RedbPathInfoService {
41 instance_name: String,
42
43 db: Arc<Db>,
45}
46
47impl RedbPathInfoService {
48 pub async fn new(
50 instance_name: String,
51 config: RedbPathInfoServiceConfig,
52 ) -> Result<Self, Error> {
53 if let Some(path) = config.path.clone() {
54 if &path == "" {
55 return Err(Error::WrongConfig("empty path is disallowed"));
56 }
57 if &path == "/" {
58 return Err(Error::WrongConfig("cowardly refusing to open / with redb"));
59 }
60
61 if config.read_only {
62 let db = tokio::task::spawn_blocking(move || {
63 let mut builder = redb::Database::builder();
64 configure_builder(&mut builder, &config);
65 builder.open_read_only(path)
66 })
67 .await??;
68
69 return Ok(Self {
70 instance_name,
71 db: Arc::new(Db::ReadOnly(db)),
72 });
73 }
74
75 if let Some(parent) = path.parent() {
76 tokio::fs::create_dir_all(parent).await?;
77 }
78
79 let db = tokio::task::spawn_blocking(move || {
80 let mut builder = redb::Database::builder();
81 configure_builder(&mut builder, &config);
82
83 let db = builder.create(path)?;
84 create_schema(&db)?;
85 Ok::<_, Error>(db)
86 })
87 .await??;
88
89 Ok(Self {
90 instance_name,
91 db: Arc::new(Db::ReadWrite(db)),
92 })
93 } else {
94 Self::new_temporary(instance_name, config)
95 }
96 }
97
98 pub fn new_temporary(
101 instance_name: String,
102 config: RedbPathInfoServiceConfig,
103 ) -> Result<Self, Error> {
104 debug_assert!(
105 config.path.is_none(),
106 "Snix bug: config.path is not None, but new_temporary requested"
107 );
108
109 if config.read_only {
110 return Err(Error::WrongConfig("in-memory database cannot be read-only"));
111 }
112
113 let mut builder = redb::Database::builder();
114 configure_builder(&mut builder, &config);
115
116 let db = builder.create_with_backend(redb::backends::InMemoryBackend::new())?;
117
118 create_schema(&db)?;
119
120 Ok(RedbPathInfoService {
121 instance_name,
122 db: Arc::new(Db::ReadWrite(db)),
123 })
124 }
125}
126
127fn configure_builder(builder: &mut redb::Builder, config: &RedbPathInfoServiceConfig) {
129 if let Some(cache_size) = config.cache_size {
130 builder.set_cache_size(cache_size);
131 }
132}
133
134#[allow(clippy::result_large_err)]
138fn create_schema(db: &redb::Database) -> Result<(), Error> {
139 let txn = db.begin_write()?;
140 txn.open_table(PATHINFO_TABLE)?;
141 txn.commit()?;
142
143 Ok(())
144}
145
146#[async_trait]
147impl PathInfoService for RedbPathInfoService {
148 #[instrument(level = "trace", skip_all, fields(path_info.digest = BASE64.encode(&digest), instance_name = %self.instance_name))]
149 async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, pathinfoservice::Error> {
150 let db = self.db.clone();
151
152 let path_info_bytes = match tokio::task::spawn_blocking({
153 move || -> Result<_, Error> {
154 let txn = db.begin_read()?;
155 let table = txn.open_table(PATHINFO_TABLE)?;
156 Ok(table.get(digest)?)
157 }
158 })
159 .await??
160 {
161 None => return Ok(None),
163 Some(path_info_data) => path_info_data.value(),
164 };
165
166 let pathinfo_proto =
167 proto::PathInfo::decode(path_info_bytes.as_slice()).map_err(Error::ProtobufDecode)?;
168 let path_info = PathInfo::try_from(pathinfo_proto).map_err(Error::PathInfoValidation)?;
169
170 return Ok(Some(path_info));
171 }
172
173 #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node, instance_name = %self.instance_name))]
174 async fn put(&self, path_info: PathInfo) -> Result<PathInfo, pathinfoservice::Error> {
175 let db = self.db.clone();
176 tokio::task::spawn_blocking({
177 let path_info = path_info.clone();
178 move || -> Result<(), Error> {
179 let txn = db.begin_write()?;
180 {
181 let mut table = txn.open_table(PATHINFO_TABLE)?;
182 table.insert(
183 *path_info.store_path.digest(),
184 proto::PathInfo::from(path_info).encode_to_vec(),
185 )?;
186 }
187 txn.commit()?;
188 Ok(())
189 }
190 })
191 .await??;
192
193 Ok(path_info)
194 }
195
196 fn list(&self) -> BoxStream<'static, Result<PathInfo, pathinfoservice::Error>> {
197 let db = self.db.clone();
198 let (tx, rx) = tokio::sync::mpsc::channel(64);
199
200 tokio::task::spawn_blocking(move || {
201 let result = (|| -> Result<(), Error> {
203 let read_txn = db.begin_read()?;
204
205 let table = read_txn.open_table(PATHINFO_TABLE)?;
206
207 let table_iter = table.iter()?;
208
209 for elem in table_iter {
210 let path_info_proto = proto::PathInfo::decode(elem?.1.value().as_slice())?;
211
212 let path_info = PathInfo::try_from(path_info_proto)?;
213
214 if tx.blocking_send(Ok(path_info)).is_err() {
215 break;
216 }
217 }
218
219 Ok(())
220 })();
221
222 if let Err(err) = result {
223 let _ = tx.blocking_send(Err(err));
224 }
225 });
226
227 ReceiverStream::new(rx).err_into().boxed()
228 }
229}
230
231#[derive(thiserror::Error, Debug)]
232pub enum Error {
233 #[error("wrong arguments: {0}")]
234 WrongConfig(&'static str),
235 #[error("serde-qs error: {0}")]
236 SerdeQS(#[from] serde_qs::Error),
237
238 #[error("failed to decode protobuf: {0}")]
239 ProtobufDecode(#[from] prost::DecodeError),
240 #[error("failed to validate PathInfo: {0}")]
241 PathInfoValidation(#[from] crate::proto::ValidatePathInfoError),
242
243 #[error("unable to open write txn, database opened read-only")]
244 OpenedReadonly,
245
246 #[error("redb commit error: {0}")]
247 RedbCommit(#[from] redb::CommitError),
248 #[error("redb database error: {0}")]
249 RedbDatabase(#[from] redb::DatabaseError),
250 #[error("redb error: {0}")]
251 Redb(#[from] redb::Error),
252 #[error("redb storage error: {0}")]
253 RedbStorage(#[from] redb::StorageError),
254 #[error("redb table error: {0}")]
255 RedbTable(#[from] redb::TableError),
256 #[error("redb txn error: {0}")]
257 RedbTransaction(#[from] redb::TransactionError),
258
259 #[error("join error: {0}")]
260 TokioJoin(#[from] tokio::task::JoinError),
261 #[error("io error: {0}")]
262 IO(#[from] std::io::Error),
263}
264
265#[derive(Clone, Default, serde::Deserialize)]
266#[serde(deny_unknown_fields)]
267pub struct RedbPathInfoServiceConfig {
268 path: Option<PathBuf>,
269
270 cache_size: Option<usize>,
272
273 #[serde(default)]
275 read_only: bool,
276}
277
278impl TryFrom<url::Url> for RedbPathInfoServiceConfig {
279 type Error = Box<dyn std::error::Error + Send + Sync>;
280 fn try_from(url: url::Url) -> Result<Self, Self::Error> {
281 if url.has_host() {
282 return Err(Error::WrongConfig("no host allowed").into());
283 }
284
285 let path: Option<PathBuf> = match (url.scheme(), url.has_authority(), url.path()) {
286 ("redb+memory", false, "") => None,
287 ("redb+memory", false, _) => Err(Box::new(Error::WrongConfig(
288 "redb+memory with path is disallowed",
289 )))?,
290 ("redb+memory", true, _) => Err(Box::new(Error::WrongConfig(
291 "redb+memory may not have authority",
292 )))?,
293 ("redb", _, "") => Err(Box::new(Error::WrongConfig(
294 "redb without path is disallowed, use redb+memory if you want in-memory",
295 )))?,
296 ("redb", true, _path) => Err(Box::new(Error::WrongConfig("authority disallowed")))?,
297 ("redb", false, path) => Some(path.into()),
298 (_scheme, _, _) => Err(Box::new(Error::WrongConfig("unrecognized scheme")))?,
299 };
300
301 let mut config: RedbPathInfoServiceConfig =
302 serde_qs::from_str(url.query().unwrap_or_default())?;
303
304 config.path = path;
305
306 Ok(config)
307 }
308}
309
310#[async_trait]
311impl ServiceBuilder for RedbPathInfoServiceConfig {
312 type Output = dyn PathInfoService;
313 async fn build<'a>(
314 &'a self,
315 instance_name: &str,
316 _context: &CompositionContext,
317 ) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync>> {
318 Ok(Arc::new(
319 RedbPathInfoService::new(instance_name.to_string(), self.to_owned()).await?,
320 ))
321 }
322}
323
324#[cfg(test)]
325mod tests {
326 use tempfile::TempDir;
327
328 use crate::fixtures::{DUMMY_PATH_DIGEST, PATH_INFO};
329 use crate::pathinfoservice::{PathInfoService, RedbPathInfoService, RedbPathInfoServiceConfig};
330
331 #[tokio::test]
332 async fn reopen_as_read_only() {
333 let tempdir = TempDir::new().unwrap();
334 let path = tempdir.path().join("data.redb");
335
336 let config = RedbPathInfoServiceConfig {
337 path: Some(path),
338 cache_size: None,
339 read_only: false,
340 };
341
342 {
344 let path_info_service = RedbPathInfoService::new("rw".to_string(), config.clone())
345 .await
346 .expect("to construct");
347
348 path_info_service
349 .put(PATH_INFO.clone())
350 .await
351 .expect("to insert");
352 } let ro_config = RedbPathInfoServiceConfig {
356 read_only: true,
357 ..config
358 };
359
360 let path_info_service_ro_1 = RedbPathInfoService::new("ro1".to_string(), ro_config.clone())
361 .await
362 .expect("to construct");
363 let path_info_service_ro_2 = RedbPathInfoService::new("ro2".to_string(), ro_config)
364 .await
365 .expect("to construct");
366
367 assert_eq!(
368 path_info_service_ro_1
369 .get(DUMMY_PATH_DIGEST)
370 .await
371 .expect("get to succeed")
372 .expect("to be Some(_)")
373 .store_path
374 .digest(),
375 &DUMMY_PATH_DIGEST,
376 );
377 assert_eq!(
378 path_info_service_ro_2
379 .get(DUMMY_PATH_DIGEST)
380 .await
381 .expect("get to succeed")
382 .expect("to be Some(_)")
383 .store_path
384 .digest(),
385 &DUMMY_PATH_DIGEST,
386 );
387 }
388
389 #[tokio::test]
390 async fn read_only_nonexistent() {
391 let tempdir = TempDir::new().unwrap();
392 let path = tempdir.path().join("data.redb");
393
394 let config = RedbPathInfoServiceConfig {
395 path: Some(path),
396 cache_size: None,
397 read_only: true,
398 };
399
400 assert!(
402 RedbPathInfoService::new("test".to_string(), config)
403 .await
404 .is_err(),
405 "opening new path r/o should fail"
406 );
407 }
408
409 #[tokio::test]
410 async fn open_rw_and_ro() {
411 let tempdir = TempDir::new().unwrap();
412 let path = tempdir.path().join("data.redb");
413
414 let config = RedbPathInfoServiceConfig {
415 path: Some(path),
416 cache_size: None,
417 read_only: false,
418 };
419
420 let _path_info_service = RedbPathInfoService::new("rw".to_string(), config.clone())
421 .await
422 .expect("to construct");
423
424 assert!(
426 RedbPathInfoService::new(
427 "ro".to_string(),
428 RedbPathInfoServiceConfig {
429 read_only: true,
430 ..config
431 }
432 )
433 .await
434 .is_err(),
435 "opening r/o should fail if still open r/w"
436 );
437 }
438}