snix_castore/directoryservice/
redb.rs1use futures::stream::BoxStream;
2use prost::Message;
3use redb::{Database, TableDefinition};
4use std::{path::PathBuf, sync::Arc};
5use tonic::async_trait;
6use tracing::{info, instrument, warn};
7
8use super::{
9 Directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator,
10 traverse_directory,
11};
12use crate::{
13 B3Digest, Error,
14 composition::{CompositionContext, ServiceBuilder},
15 proto,
16};
17
18const DIRECTORY_TABLE: TableDefinition<[u8; B3Digest::LENGTH], Vec<u8>> =
19 TableDefinition::new("directory");
20
21#[derive(Clone)]
22pub struct RedbDirectoryService {
23 instance_name: String,
24 db: Arc<Database>,
27}
28
29impl RedbDirectoryService {
30 pub async fn new(instance_name: String, path: PathBuf) -> Result<Self, Error> {
33 if path == PathBuf::from("/") {
34 return Err(Error::StorageError(
35 "cowardly refusing to open / with redb".to_string(),
36 ));
37 }
38
39 if let Some(parent) = path.parent() {
40 std::fs::create_dir_all(parent)?;
41 }
42
43 let db = tokio::task::spawn_blocking(|| -> Result<_, redb::Error> {
44 let mut db = redb::Database::builder()
45 .create_with_file_format_v3(true)
46 .create(path)?;
47
48 if db.upgrade()? {
50 info!("Upgraded database format");
51 };
52
53 create_schema(&db)?;
54 Ok(db)
55 })
56 .await??;
57
58 Ok(Self {
59 instance_name,
60 db: Arc::new(db),
61 })
62 }
63
64 pub fn new_temporary() -> Result<Self, Error> {
66 let db = redb::Database::builder()
67 .create_with_file_format_v3(true)
68 .create_with_backend(redb::backends::InMemoryBackend::new())?;
69
70 create_schema(&db)?;
71
72 Ok(Self {
73 instance_name: "root".into(),
74 db: Arc::new(db),
75 })
76 }
77}
78
79#[allow(clippy::result_large_err)]
83fn create_schema(db: &redb::Database) -> Result<(), redb::Error> {
84 let txn = db.begin_write()?;
85 txn.open_table(DIRECTORY_TABLE)?;
86 txn.commit()?;
87
88 Ok(())
89}
90
91#[async_trait]
92impl DirectoryService for RedbDirectoryService {
93 #[instrument(skip(self, digest), fields(directory.digest = %digest, instance_name = %self.instance_name))]
94 async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
95 let db = self.db.clone();
96
97 let db_get_resp = tokio::task::spawn_blocking({
99 let digest = *digest.as_ref();
100 move || -> Result<_, redb::Error> {
101 let txn = db.begin_read()?;
102 let table = txn.open_table(DIRECTORY_TABLE)?;
103 Ok(table.get(digest)?)
104 }
105 })
106 .await?
107 .map_err(|e| {
108 warn!(err=%e, "failed to retrieve Directory");
109 Error::StorageError("failed to retrieve Directory".to_string())
110 })?;
111
112 let directory_data = match db_get_resp {
114 None => return Ok(None),
115 Some(d) => d,
116 };
117
118 let actual_digest = blake3::hash(directory_data.value().as_slice());
120 if actual_digest.as_bytes() != digest.as_slice() {
121 warn!(directory.actual_digest=%actual_digest, "requested Directory got the wrong digest");
122 return Err(Error::StorageError(
123 "requested Directory got the wrong digest".to_string(),
124 ));
125 }
126
127 let directory = match proto::Directory::decode(&*directory_data.value()) {
130 Ok(dir) => {
131 dir.try_into().map_err(|e| {
133 warn!(err=%e, "Directory failed validation");
134 Error::StorageError("Directory failed validation".to_string())
135 })?
136 }
137 Err(e) => {
138 warn!(err=%e, "failed to parse Directory");
139 return Err(Error::StorageError("failed to parse Directory".to_string()));
140 }
141 };
142
143 Ok(Some(directory))
144 }
145
146 #[instrument(skip(self, directory), fields(directory.digest = %directory.digest(), instance_name = %self.instance_name))]
147 async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
148 tokio::task::spawn_blocking({
149 let db = self.db.clone();
150 move || {
151 let digest = directory.digest();
152
153 let txn = db.begin_write()?;
155 {
156 let mut table = txn.open_table(DIRECTORY_TABLE)?;
157 table.insert(
158 digest.as_ref(),
159 proto::Directory::from(directory).encode_to_vec(),
160 )?;
161 }
162 txn.commit()?;
163
164 Ok(digest)
165 }
166 })
167 .await?
168 }
169
170 #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))]
171 fn get_recursive(
172 &self,
173 root_directory_digest: &B3Digest,
174 ) -> BoxStream<'static, Result<Directory, Error>> {
175 traverse_directory(self.clone(), root_directory_digest)
179 }
180
181 #[instrument(skip_all)]
182 fn put_multiple_start(&self) -> Box<dyn DirectoryPutter + '_> {
183 Box::new(RedbDirectoryPutter {
184 db: &self.db,
185 directory_validator: Some(Default::default()),
186 })
187 }
188}
189
190pub struct RedbDirectoryPutter<'a> {
191 db: &'a Database,
192
193 directory_validator: Option<DirectoryGraph<LeavesToRootValidator>>,
196}
197
198#[async_trait]
199impl DirectoryPutter for RedbDirectoryPutter<'_> {
200 #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
201 async fn put(&mut self, directory: Directory) -> Result<(), Error> {
202 match self.directory_validator {
203 None => return Err(Error::StorageError("already closed".to_string())),
204 Some(ref mut validator) => {
205 validator
206 .add(directory)
207 .map_err(|e| Error::StorageError(e.to_string()))?;
208 }
209 }
210
211 Ok(())
212 }
213
214 #[instrument(level = "trace", skip_all, ret, err)]
215 async fn close(&mut self) -> Result<B3Digest, Error> {
216 match self.directory_validator.take() {
217 None => Err(Error::StorageError("already closed".to_string())),
218 Some(validator) => {
219 tokio::task::spawn_blocking({
221 let txn = self.db.begin_write()?;
222 move || {
223 let directories = validator
225 .validate()
226 .map_err(|e| Error::StorageError(e.to_string()))?
227 .drain_leaves_to_root()
228 .collect::<Vec<_>>();
229
230 let root_digest = directories
232 .last()
233 .ok_or_else(|| Error::StorageError("got no directories".to_string()))?
234 .digest();
235
236 {
237 let mut table = txn.open_table(DIRECTORY_TABLE)?;
238
239 for directory in directories {
242 table.insert(
243 directory.digest().as_ref(),
244 proto::Directory::from(directory).encode_to_vec(),
245 )?;
246 }
247 }
248
249 txn.commit()?;
250
251 Ok(root_digest)
252 }
253 })
254 .await?
255 }
256 }
257 }
258}
259
260#[derive(serde::Deserialize)]
261#[serde(deny_unknown_fields)]
262pub struct RedbDirectoryServiceConfig {
263 is_temporary: bool,
264 #[serde(default)]
265 path: Option<PathBuf>,
267}
268
269impl TryFrom<url::Url> for RedbDirectoryServiceConfig {
270 type Error = Box<dyn std::error::Error + Send + Sync>;
271 fn try_from(url: url::Url) -> Result<Self, Self::Error> {
272 if url.has_host() {
275 return Err(Error::StorageError("no host allowed".to_string()).into());
276 }
277
278 Ok(if url.path().is_empty() {
279 RedbDirectoryServiceConfig {
280 is_temporary: true,
281 path: None,
282 }
283 } else {
284 RedbDirectoryServiceConfig {
285 is_temporary: false,
286 path: Some(url.path().into()),
287 }
288 })
289 }
290}
291
292#[async_trait]
293impl ServiceBuilder for RedbDirectoryServiceConfig {
294 type Output = dyn DirectoryService;
295 async fn build<'a>(
296 &'a self,
297 instance_name: &str,
298 _context: &CompositionContext,
299 ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
300 match self {
301 RedbDirectoryServiceConfig {
302 is_temporary: true,
303 path: None,
304 } => Ok(Arc::new(RedbDirectoryService::new_temporary()?)),
305 RedbDirectoryServiceConfig {
306 is_temporary: true,
307 path: Some(_),
308 } => Err(Error::StorageError(
309 "Temporary RedbDirectoryService can not have path".into(),
310 )
311 .into()),
312 RedbDirectoryServiceConfig {
313 is_temporary: false,
314 path: None,
315 } => Err(Error::StorageError("RedbDirectoryService is missing path".into()).into()),
316 RedbDirectoryServiceConfig {
317 is_temporary: false,
318 path: Some(path),
319 } => Ok(Arc::new(
320 RedbDirectoryService::new(instance_name.to_string(), path.into()).await?,
321 )),
322 }
323 }
324}