snix_store/pathinfoservice/
redb.rs1use super::{PathInfo, PathInfoService};
2use crate::{pathinfoservice, proto};
3use data_encoding::BASE64;
4use futures::{StreamExt, TryStreamExt, stream::BoxStream};
5use prost::Message;
6use redb::{ReadableDatabase, ReadableTable, TableDefinition};
7use snix_castore::composition::{CompositionContext, ServiceBuilder};
8use std::{path::PathBuf, sync::Arc};
9use tokio_stream::wrappers::ReceiverStream;
10use tonic::async_trait;
11use tracing::instrument;
12
/// The single redb table used by this service.
///
/// Keys are 20-byte store path digests (`put` uses `store_path.digest()`),
/// values are protobuf-encoded `proto::PathInfo` messages.
const PATHINFO_TABLE: TableDefinition<[u8; 20], Vec<u8>> = TableDefinition::new("pathinfo");
14
/// Handle to the underlying redb database, in one of the two modes it can be
/// opened in.
enum Db {
    /// Database opened read-only; write transactions are refused.
    ReadOnly(redb::ReadOnlyDatabase),
    /// Database opened (or created) for reading and writing.
    ReadWrite(redb::Database),
}
19
20impl Db {
21 fn begin_read(&self) -> Result<redb::ReadTransaction, redb::TransactionError> {
22 match self {
23 Db::ReadOnly(db) => db.begin_read(),
24 Db::ReadWrite(db) => db.begin_read(),
25 }
26 }
27
28 fn begin_write(&self) -> Result<redb::WriteTransaction, Error> {
29 match self {
30 Db::ReadOnly(_) => Err(Error::OpenedReadonly),
31 Db::ReadWrite(db) => Ok(db.begin_write()?),
32 }
33 }
34}
35
/// A `PathInfoService` implementation that stores its data in a redb
/// database (on disk or in memory).
///
/// Cloning is cheap: clones share the same database handle.
#[derive(Clone)]
pub struct RedbPathInfoService {
    /// Name of this service instance, recorded in tracing spans.
    instance_name: String,

    /// Shared database handle; cloned into blocking tasks for each operation.
    db: Arc<Db>,
}
46
47impl RedbPathInfoService {
48 pub async fn new(
50 instance_name: String,
51 config: RedbPathInfoServiceConfig,
52 ) -> Result<Self, Error> {
53 if let Some(path) = config.path.clone() {
54 if &path == "" {
55 return Err(Error::WrongConfig("empty path is disallowed"));
56 }
57 if &path == "/" {
58 return Err(Error::WrongConfig("cowardly refusing to open / with redb"));
59 }
60
61 if config.read_only {
62 let db =
63 tokio::task::spawn_blocking(|| redb::Database::builder().open_read_only(path))
64 .await??;
65
66 return Ok(Self {
67 instance_name,
68 db: Arc::new(Db::ReadOnly(db)),
69 });
70 }
71
72 if let Some(parent) = path.parent() {
73 tokio::fs::create_dir_all(parent).await?;
74 }
75
76 let db = tokio::task::spawn_blocking(move || {
77 let mut builder = redb::Database::builder();
78 configure_builder(&mut builder, &config);
79
80 let db = builder.create(path)?;
81 create_schema(&db)?;
82 Ok::<_, Error>(db)
83 })
84 .await??;
85
86 Ok(Self {
87 instance_name,
88 db: Arc::new(Db::ReadWrite(db)),
89 })
90 } else {
91 Self::new_temporary(instance_name, config)
92 }
93 }
94
95 pub fn new_temporary(
98 instance_name: String,
99 config: RedbPathInfoServiceConfig,
100 ) -> Result<Self, Error> {
101 debug_assert!(
102 config.path.is_none(),
103 "Snix bug: config.path is not None, but new_temporary requested"
104 );
105
106 if config.read_only {
107 return Err(Error::WrongConfig("in-memory database cannot be read-only"));
108 }
109
110 let mut builder = redb::Database::builder();
111 configure_builder(&mut builder, &config);
112
113 let db = builder.create_with_backend(redb::backends::InMemoryBackend::new())?;
114
115 create_schema(&db)?;
116
117 Ok(RedbPathInfoService {
118 instance_name,
119 db: Arc::new(Db::ReadWrite(db)),
120 })
121 }
122}
123
124fn configure_builder(builder: &mut redb::Builder, config: &RedbPathInfoServiceConfig) {
126 if let Some(cache_size) = config.cache_size {
127 builder.set_cache_size(cache_size);
128 }
129}
130
131#[allow(clippy::result_large_err)]
135fn create_schema(db: &redb::Database) -> Result<(), Error> {
136 let txn = db.begin_write()?;
137 txn.open_table(PATHINFO_TABLE)?;
138 txn.commit()?;
139
140 Ok(())
141}
142
143#[async_trait]
144impl PathInfoService for RedbPathInfoService {
145 #[instrument(level = "trace", skip_all, fields(path_info.digest = BASE64.encode(&digest), instance_name = %self.instance_name))]
146 async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, pathinfoservice::Error> {
147 let db = self.db.clone();
148
149 let path_info_bytes = match tokio::task::spawn_blocking({
150 move || -> Result<_, Error> {
151 let txn = db.begin_read()?;
152 let table = txn.open_table(PATHINFO_TABLE)?;
153 Ok(table.get(digest)?)
154 }
155 })
156 .await??
157 {
158 None => return Ok(None),
160 Some(path_info_data) => path_info_data.value(),
161 };
162
163 let pathinfo_proto =
164 proto::PathInfo::decode(path_info_bytes.as_slice()).map_err(Error::ProtobufDecode)?;
165 let path_info = PathInfo::try_from(pathinfo_proto).map_err(Error::PathInfoValidation)?;
166
167 return Ok(Some(path_info));
168 }
169
170 #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node, instance_name = %self.instance_name))]
171 async fn put(&self, path_info: PathInfo) -> Result<PathInfo, pathinfoservice::Error> {
172 let db = self.db.clone();
173 tokio::task::spawn_blocking({
174 let path_info = path_info.clone();
175 move || -> Result<(), Error> {
176 let txn = db.begin_write()?;
177 {
178 let mut table = txn.open_table(PATHINFO_TABLE)?;
179 table.insert(
180 *path_info.store_path.digest(),
181 proto::PathInfo::from(path_info).encode_to_vec(),
182 )?;
183 }
184 txn.commit()?;
185 Ok(())
186 }
187 })
188 .await??;
189
190 Ok(path_info)
191 }
192
193 fn list(&self) -> BoxStream<'static, Result<PathInfo, pathinfoservice::Error>> {
194 let db = self.db.clone();
195 let (tx, rx) = tokio::sync::mpsc::channel(64);
196
197 tokio::task::spawn_blocking(move || {
198 let result = (|| -> Result<(), Error> {
200 let read_txn = db.begin_read()?;
201
202 let table = read_txn.open_table(PATHINFO_TABLE)?;
203
204 let table_iter = table.iter()?;
205
206 for elem in table_iter {
207 let path_info_proto = proto::PathInfo::decode(elem?.1.value().as_slice())?;
208
209 let path_info = PathInfo::try_from(path_info_proto)?;
210
211 if tx.blocking_send(Ok(path_info)).is_err() {
212 break;
213 }
214 }
215
216 Ok(())
217 })();
218
219 if let Err(err) = result {
220 let _ = tx.blocking_send(Err(err));
221 }
222 });
223
224 ReceiverStream::new(rx).err_into().boxed()
225 }
226}
227
/// Errors produced by the redb-backed PathInfoService and its configuration
/// parsing.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// The supplied configuration or URL is invalid; the message says why.
    #[error("wrong arguments: {0}")]
    WrongConfig(&'static str),
    /// The URL query string could not be deserialized into the config.
    #[error("serde-qs error: {0}")]
    SerdeQS(#[from] serde_qs::Error),

    /// A stored value is not a valid protobuf-encoded `PathInfo`.
    #[error("failed to decode protobuf: {0}")]
    ProtobufDecode(#[from] prost::DecodeError),
    /// A stored `PathInfo` decoded fine but failed validation.
    #[error("failed to validate PathInfo: {0}")]
    PathInfoValidation(#[from] crate::proto::ValidatePathInfoError),

    /// A write transaction was requested on a database opened read-only.
    #[error("unable to open write txn, database opened read-only")]
    OpenedReadonly,

    /// Committing a redb write transaction failed.
    #[error("redb commit error: {0}")]
    RedbCommit(#[from] redb::CommitError),
    /// Opening or creating the redb database failed.
    #[error("redb database error: {0}")]
    RedbDatabase(#[from] redb::DatabaseError),
    /// Generic redb error.
    #[error("redb error: {0}")]
    Redb(#[from] redb::Error),
    /// redb storage-level error (e.g. while reading, inserting or iterating).
    #[error("redb storage error: {0}")]
    RedbStorage(#[from] redb::StorageError),
    /// Opening a redb table failed.
    #[error("redb table error: {0}")]
    RedbTable(#[from] redb::TableError),
    /// Starting a redb transaction failed.
    #[error("redb txn error: {0}")]
    RedbTransaction(#[from] redb::TransactionError),

    /// A blocking task spawned on the tokio runtime could not be joined.
    #[error("join error: {0}")]
    TokioJoin(#[from] tokio::task::JoinError),
    /// Filesystem error, e.g. while creating the database's parent directory.
    #[error("io error: {0}")]
    IO(#[from] std::io::Error),
}
261
/// Configuration for [`RedbPathInfoService`].
///
/// Can be deserialized directly or built from a `redb:` / `redb+memory:` URL
/// via `TryFrom<url::Url>`. Unknown fields are rejected.
#[derive(Clone, Default, serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct RedbPathInfoServiceConfig {
    /// Filesystem path of the database file. `None` selects an in-memory
    /// database. When parsing from a URL this is always taken from the URL
    /// path, overriding anything in the query string.
    path: Option<PathBuf>,

    /// If set, forwarded to redb's `Builder::set_cache_size`.
    cache_size: Option<usize>,

    /// Open an existing database read-only. Write transactions then fail
    /// with [`Error::OpenedReadonly`]. Defaults to `false` when absent.
    #[serde(default)]
    read_only: bool,
}
274
275impl TryFrom<url::Url> for RedbPathInfoServiceConfig {
276 type Error = Box<dyn std::error::Error + Send + Sync>;
277 fn try_from(url: url::Url) -> Result<Self, Self::Error> {
278 if url.has_host() {
279 return Err(Error::WrongConfig("no host allowed").into());
280 }
281
282 let path: Option<PathBuf> = match (url.scheme(), url.has_authority(), url.path()) {
283 ("redb+memory", false, "") => None,
284 ("redb+memory", false, _) => Err(Box::new(Error::WrongConfig(
285 "redb+memory with path is disallowed",
286 )))?,
287 ("redb+memory", true, _) => Err(Box::new(Error::WrongConfig(
288 "redb+memory may not have authority",
289 )))?,
290 ("redb", _, "") => Err(Box::new(Error::WrongConfig(
291 "redb without path is disallowed, use redb+memory if you want in-memory",
292 )))?,
293 ("redb", _, path) => Some(path.into()),
294 (_scheme, _, _) => Err(Box::new(Error::WrongConfig("unrecognized scheme")))?,
295 };
296
297 let mut config: RedbPathInfoServiceConfig =
298 serde_qs::from_str(url.query().unwrap_or_default())?;
299
300 config.path = path;
301
302 Ok(config)
303 }
304}
305
306#[async_trait]
307impl ServiceBuilder for RedbPathInfoServiceConfig {
308 type Output = dyn PathInfoService;
309 async fn build<'a>(
310 &'a self,
311 instance_name: &str,
312 _context: &CompositionContext,
313 ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
314 Ok(Arc::new(
315 RedbPathInfoService::new(instance_name.to_string(), self.to_owned()).await?,
316 ))
317 }
318}