snix_castore/directoryservice/
redb.rs1use futures::stream::BoxStream;
2use prost::Message;
3use redb::{Database, TableDefinition};
4use std::{path::PathBuf, sync::Arc};
5use tonic::async_trait;
6use tracing::{instrument, warn};
7
8use super::{
9 Directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator,
10 traverse_directory,
11};
12use crate::{
13 B3Digest, Error,
14 composition::{CompositionContext, ServiceBuilder},
15 proto,
16};
17
18const DIRECTORY_TABLE: TableDefinition<[u8; B3Digest::LENGTH], Vec<u8>> =
19 TableDefinition::new("directory");
20
21#[derive(Clone)]
22pub struct RedbDirectoryService {
23 instance_name: String,
24 db: Arc<Database>,
27}
28
29impl RedbDirectoryService {
30 pub async fn new(instance_name: String, path: PathBuf) -> Result<Self, Error> {
33 if path == PathBuf::from("/") {
34 return Err(Error::StorageError(
35 "cowardly refusing to open / with redb".to_string(),
36 ));
37 }
38
39 let db = tokio::task::spawn_blocking(|| -> Result<_, redb::Error> {
40 let db = redb::Database::create(path)?;
41 create_schema(&db)?;
42 Ok(db)
43 })
44 .await??;
45
46 Ok(Self {
47 instance_name,
48 db: Arc::new(db),
49 })
50 }
51
52 pub fn new_temporary() -> Result<Self, Error> {
54 let db =
55 redb::Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?;
56
57 create_schema(&db)?;
58
59 Ok(Self {
60 instance_name: "root".into(),
61 db: Arc::new(db),
62 })
63 }
64}
65
66fn create_schema(db: &redb::Database) -> Result<(), redb::Error> {
70 let txn = db.begin_write()?;
71 txn.open_table(DIRECTORY_TABLE)?;
72 txn.commit()?;
73
74 Ok(())
75}
76
77#[async_trait]
78impl DirectoryService for RedbDirectoryService {
79 #[instrument(skip(self, digest), fields(directory.digest = %digest, instance_name = %self.instance_name))]
80 async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
81 let db = self.db.clone();
82
83 let db_get_resp = tokio::task::spawn_blocking({
85 let digest = *digest.as_ref();
86 move || -> Result<_, redb::Error> {
87 let txn = db.begin_read()?;
88 let table = txn.open_table(DIRECTORY_TABLE)?;
89 Ok(table.get(digest)?)
90 }
91 })
92 .await?
93 .map_err(|e| {
94 warn!(err=%e, "failed to retrieve Directory");
95 Error::StorageError("failed to retrieve Directory".to_string())
96 })?;
97
98 let directory_data = match db_get_resp {
100 None => return Ok(None),
101 Some(d) => d,
102 };
103
104 let actual_digest = blake3::hash(directory_data.value().as_slice());
106 if actual_digest.as_bytes() != digest.as_slice() {
107 warn!(directory.actual_digest=%actual_digest, "requested Directory got the wrong digest");
108 return Err(Error::StorageError(
109 "requested Directory got the wrong digest".to_string(),
110 ));
111 }
112
113 let directory = match proto::Directory::decode(&*directory_data.value()) {
116 Ok(dir) => {
117 dir.try_into().map_err(|e| {
119 warn!(err=%e, "Directory failed validation");
120 Error::StorageError("Directory failed validation".to_string())
121 })?
122 }
123 Err(e) => {
124 warn!(err=%e, "failed to parse Directory");
125 return Err(Error::StorageError("failed to parse Directory".to_string()));
126 }
127 };
128
129 Ok(Some(directory))
130 }
131
132 #[instrument(skip(self, directory), fields(directory.digest = %directory.digest(), instance_name = %self.instance_name))]
133 async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
134 tokio::task::spawn_blocking({
135 let db = self.db.clone();
136 move || {
137 let digest = directory.digest();
138
139 let txn = db.begin_write()?;
141 {
142 let mut table = txn.open_table(DIRECTORY_TABLE)?;
143 table.insert(
144 digest.as_ref(),
145 proto::Directory::from(directory).encode_to_vec(),
146 )?;
147 }
148 txn.commit()?;
149
150 Ok(digest)
151 }
152 })
153 .await?
154 }
155
156 #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))]
157 fn get_recursive(
158 &self,
159 root_directory_digest: &B3Digest,
160 ) -> BoxStream<'static, Result<Directory, Error>> {
161 traverse_directory(self.clone(), root_directory_digest)
165 }
166
167 #[instrument(skip_all)]
168 fn put_multiple_start(&self) -> Box<dyn DirectoryPutter + '_> {
169 Box::new(RedbDirectoryPutter {
170 db: &self.db,
171 directory_validator: Some(Default::default()),
172 })
173 }
174}
175
176pub struct RedbDirectoryPutter<'a> {
177 db: &'a Database,
178
179 directory_validator: Option<DirectoryGraph<LeavesToRootValidator>>,
182}
183
184#[async_trait]
185impl DirectoryPutter for RedbDirectoryPutter<'_> {
186 #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
187 async fn put(&mut self, directory: Directory) -> Result<(), Error> {
188 match self.directory_validator {
189 None => return Err(Error::StorageError("already closed".to_string())),
190 Some(ref mut validator) => {
191 validator
192 .add(directory)
193 .map_err(|e| Error::StorageError(e.to_string()))?;
194 }
195 }
196
197 Ok(())
198 }
199
200 #[instrument(level = "trace", skip_all, ret, err)]
201 async fn close(&mut self) -> Result<B3Digest, Error> {
202 match self.directory_validator.take() {
203 None => Err(Error::StorageError("already closed".to_string())),
204 Some(validator) => {
205 tokio::task::spawn_blocking({
207 let txn = self.db.begin_write()?;
208 move || {
209 let directories = validator
211 .validate()
212 .map_err(|e| Error::StorageError(e.to_string()))?
213 .drain_leaves_to_root()
214 .collect::<Vec<_>>();
215
216 let root_digest = directories
218 .last()
219 .ok_or_else(|| Error::StorageError("got no directories".to_string()))?
220 .digest();
221
222 {
223 let mut table = txn.open_table(DIRECTORY_TABLE)?;
224
225 for directory in directories {
228 table.insert(
229 directory.digest().as_ref(),
230 proto::Directory::from(directory).encode_to_vec(),
231 )?;
232 }
233 }
234
235 txn.commit()?;
236
237 Ok(root_digest)
238 }
239 })
240 .await?
241 }
242 }
243 }
244}
245
246#[derive(serde::Deserialize)]
247#[serde(deny_unknown_fields)]
248pub struct RedbDirectoryServiceConfig {
249 is_temporary: bool,
250 #[serde(default)]
251 path: Option<PathBuf>,
253}
254
255impl TryFrom<url::Url> for RedbDirectoryServiceConfig {
256 type Error = Box<dyn std::error::Error + Send + Sync>;
257 fn try_from(url: url::Url) -> Result<Self, Self::Error> {
258 if url.has_host() {
261 return Err(Error::StorageError("no host allowed".to_string()).into());
262 }
263
264 Ok(if url.path().is_empty() {
265 RedbDirectoryServiceConfig {
266 is_temporary: true,
267 path: None,
268 }
269 } else {
270 RedbDirectoryServiceConfig {
271 is_temporary: false,
272 path: Some(url.path().into()),
273 }
274 })
275 }
276}
277
278#[async_trait]
279impl ServiceBuilder for RedbDirectoryServiceConfig {
280 type Output = dyn DirectoryService;
281 async fn build<'a>(
282 &'a self,
283 instance_name: &str,
284 _context: &CompositionContext,
285 ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
286 match self {
287 RedbDirectoryServiceConfig {
288 is_temporary: true,
289 path: None,
290 } => Ok(Arc::new(RedbDirectoryService::new_temporary()?)),
291 RedbDirectoryServiceConfig {
292 is_temporary: true,
293 path: Some(_),
294 } => Err(Error::StorageError(
295 "Temporary RedbDirectoryService can not have path".into(),
296 )
297 .into()),
298 RedbDirectoryServiceConfig {
299 is_temporary: false,
300 path: None,
301 } => Err(Error::StorageError("RedbDirectoryService is missing path".into()).into()),
302 RedbDirectoryServiceConfig {
303 is_temporary: false,
304 path: Some(path),
305 } => Ok(Arc::new(
306 RedbDirectoryService::new(instance_name.to_string(), path.into()).await?,
307 )),
308 }
309 }
310}