snix_castore/directoryservice/redb.rs

use futures::stream::BoxStream;
use prost::Message;
use redb::{Database, ReadableDatabase, TableDefinition};
use std::{path::PathBuf, sync::Arc};
use tonic::async_trait;
use tracing::{instrument, warn};

use super::{Directory, DirectoryPutter, DirectoryService, traverse_directory};
use crate::{
    B3Digest, Error,
    composition::{CompositionContext, ServiceBuilder},
    directoryservice::directory_graph::{DirectoryGraphBuilder, DirectoryOrder},
    proto,
};

const DIRECTORY_TABLE: TableDefinition<[u8; B3Digest::LENGTH], Vec<u8>> =
    TableDefinition::new("directory");

/// A [`DirectoryService`] implementation backed by a redb database.
#[derive(Clone)]
pub struct RedbDirectoryService {
    instance_name: String,
    // The redb database.
    db: Arc<Database>,
}

impl RedbDirectoryService {
    /// Constructs a new instance using the specified filesystem path for
    /// storage.
    pub async fn new(instance_name: String, path: PathBuf) -> Result<Self, Error> {
        if path == PathBuf::from("/") {
            return Err(Error::StorageError(
                "cowardly refusing to open / with redb".to_string(),
            ));
        }

        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)?;
        }

        let db = tokio::task::spawn_blocking(|| -> Result<_, redb::Error> {
            let db = redb::Database::builder().create(path)?;

            create_schema(&db)?;
            Ok(db)
        })
        .await??;

        Ok(Self {
            instance_name,
            db: Arc::new(db),
        })
    }

    /// Constructs a new instance using an in-memory redb backend, useful for testing.
    pub fn new_temporary() -> Result<Self, Error> {
        let db =
            redb::Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?;

        create_schema(&db)?;

        Ok(Self {
            instance_name: "root".into(),
            db: Arc::new(db),
        })
    }
}

/// Ensures all tables are present.
#[allow(clippy::result_large_err)]
fn create_schema(db: &redb::Database) -> Result<(), redb::Error> {
    let txn = db.begin_write()?;
    txn.open_table(DIRECTORY_TABLE)?;
    txn.commit()?;

    Ok(())
}

#[async_trait]
impl DirectoryService for RedbDirectoryService {
    #[instrument(skip(self, digest), fields(directory.digest = %digest, instance_name = %self.instance_name))]
    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
        let db = self.db.clone();

        // Retrieve the protobuf-encoded Directory for the requested digest.
        let db_get_resp = tokio::task::spawn_blocking({
            let digest = *digest.as_ref();
            move || -> Result<_, redb::Error> {
                let txn = db.begin_read()?;
                let table = txn.open_table(DIRECTORY_TABLE)?;
                Ok(table.get(digest)?)
            }
        })
        .await?
        .map_err(|e| {
            warn!(err=%e, "failed to retrieve Directory");
            Error::StorageError("failed to retrieve Directory".to_string())
        })?;

        // The Directory was not found, return None.
        let directory_data = match db_get_resp {
            None => return Ok(None),
            Some(d) => d,
        };

        // Ensure the data we read back actually hashes to the requested digest.
        let actual_digest = blake3::hash(directory_data.value().as_slice());
        if actual_digest.as_bytes() != digest.as_slice() {
            warn!(directory.actual_digest=%actual_digest, "requested Directory got the wrong digest");
            return Err(Error::StorageError(
                "requested Directory got the wrong digest".to_string(),
            ));
        }

        // Parse the protobuf-encoded Directory and validate it.
        let directory = match proto::Directory::decode(&*directory_data.value()) {
            Ok(dir) => {
                dir.try_into().map_err(|e| {
                    warn!(err=%e, "Directory failed validation");
                    Error::StorageError("Directory failed validation".to_string())
                })?
            }
            Err(e) => {
                warn!(err=%e, "failed to parse Directory");
                return Err(Error::StorageError("failed to parse Directory".to_string()));
            }
        };

        Ok(Some(directory))
    }

    #[instrument(skip(self, directory), fields(directory.digest = %directory.digest(), instance_name = %self.instance_name))]
    async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
        tokio::task::spawn_blocking({
            let db = self.db.clone();
            move || {
                let digest = directory.digest();

                // Store the Directory in the table, keyed by its digest.
                let txn = db.begin_write()?;
                {
                    let mut table = txn.open_table(DIRECTORY_TABLE)?;
                    table.insert(
                        digest.as_ref(),
                        proto::Directory::from(directory).encode_to_vec(),
                    )?;
                }
                txn.commit()?;

                Ok(digest)
            }
        })
        .await?
    }

    #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))]
    fn get_recursive(
        &self,
        root_directory_digest: &B3Digest,
    ) -> BoxStream<'static, Result<Directory, Error>> {
        traverse_directory(self.clone(), root_directory_digest)
    }

    #[instrument(skip_all)]
    fn put_multiple_start(&self) -> Box<dyn DirectoryPutter + '_> {
        Box::new(RedbDirectoryPutter {
            db: &self.db,
            builder: Some(DirectoryGraphBuilder::new_with_insertion_order(
                DirectoryOrder::LeavesToRoot,
            )),
        })
    }
}

pub struct RedbDirectoryPutter<'a> {
    db: &'a Database,

    /// The graph of Directories collected via put(), or None once the putter
    /// has been closed.
    builder: Option<DirectoryGraphBuilder>,
}

#[async_trait]
impl DirectoryPutter for RedbDirectoryPutter<'_> {
    #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
    async fn put(&mut self, directory: Directory) -> Result<(), Error> {
        let builder = self
            .builder
            .as_mut()
            .ok_or_else(|| Error::StorageError("already closed".to_string()))?;

        builder.insert(directory)?;

        Ok(())
    }

    #[instrument(level = "trace", skip_all, ret, err)]
    async fn close(&mut self) -> Result<B3Digest, Error> {
        let builder = self
            .builder
            .take()
            .ok_or_else(|| Error::StorageError("already closed".to_string()))?;

        // Insert all collected directories in a single write transaction.
        tokio::task::spawn_blocking({
            let txn = self.db.begin_write()?;
            move || {
                let directory_graph = builder.build()?;
                let root_digest = directory_graph.root().digest();

                // Write the directories from the leaves up to the root.
                {
                    let mut table = txn.open_table(DIRECTORY_TABLE)?;
                    for directory in directory_graph.drain(DirectoryOrder::LeavesToRoot) {
                        table.insert(
                            directory.digest().as_ref(),
                            proto::Directory::from(directory).encode_to_vec(),
                        )?;
                    }
                }
                txn.commit()?;

                Ok(root_digest)
            }
        })
        .await?
    }
}

#[derive(serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct RedbDirectoryServiceConfig {
    is_temporary: bool,
    #[serde(default)]
    /// Path to the redb database file. Required (and only allowed) when
    /// `is_temporary` is false.
    path: Option<PathBuf>,
}

impl TryFrom<url::Url> for RedbDirectoryServiceConfig {
    type Error = Box<dyn std::error::Error + Send + Sync>;
    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
        // redb doesn't accept a host in the URL; only an (optional) path
        // pointing to the database file is allowed.
        if url.has_host() {
            return Err(Error::StorageError("no host allowed".to_string()).into());
        }

        Ok(if url.path().is_empty() {
            RedbDirectoryServiceConfig {
                is_temporary: true,
                path: None,
            }
        } else {
            RedbDirectoryServiceConfig {
                is_temporary: false,
                path: Some(url.path().into()),
            }
        })
    }
}
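
// A minimal sketch of how the two URL forms map onto `RedbDirectoryServiceConfig`,
// assuming the `redb://` (temporary, in-memory) and `redb:///absolute/path` (on-disk)
// address conventions. Module and test names here are illustrative only.
#[cfg(test)]
mod config_url_sketch {
    use super::*;

    #[test]
    fn url_without_path_is_temporary() {
        let url = url::Url::parse("redb://").expect("must parse");
        let config = RedbDirectoryServiceConfig::try_from(url).expect("must convert");
        assert!(config.is_temporary);
        assert!(config.path.is_none());
    }

    #[test]
    fn url_with_path_is_persistent() {
        let url = url::Url::parse("redb:///some/path/directories.redb").expect("must parse");
        let config = RedbDirectoryServiceConfig::try_from(url).expect("must convert");
        assert!(!config.is_temporary);
        assert_eq!(
            config.path.as_deref(),
            Some(std::path::Path::new("/some/path/directories.redb"))
        );
    }
}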

#[async_trait]
impl ServiceBuilder for RedbDirectoryServiceConfig {
    type Output = dyn DirectoryService;
    async fn build<'a>(
        &'a self,
        instance_name: &str,
        _context: &CompositionContext,
    ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
        match self {
            RedbDirectoryServiceConfig {
                is_temporary: true,
                path: None,
            } => Ok(Arc::new(RedbDirectoryService::new_temporary()?)),
            RedbDirectoryServiceConfig {
                is_temporary: true,
                path: Some(_),
            } => Err(Error::StorageError(
                "Temporary RedbDirectoryService can not have path".into(),
            )
            .into()),
            RedbDirectoryServiceConfig {
                is_temporary: false,
                path: None,
            } => Err(Error::StorageError("RedbDirectoryService is missing path".into()).into()),
            RedbDirectoryServiceConfig {
                is_temporary: false,
                path: Some(path),
            } => Ok(Arc::new(
                RedbDirectoryService::new(instance_name.to_string(), path.into()).await?,
            )),
        }
    }
}
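
// A minimal round-trip sketch: store a Directory using the in-memory backend and read it
// back by digest. This assumes `Directory::new()` constructs an empty Directory and that
// `Directory` implements `Clone`/`PartialEq`; adjust to whatever constructor the crate
// actually provides.
#[cfg(test)]
mod roundtrip_sketch {
    use super::*;

    #[tokio::test]
    async fn put_then_get() {
        let svc = RedbDirectoryService::new_temporary().expect("create temporary service");

        // Assumption: an empty Directory can be constructed with `new()`.
        let directory = Directory::new();
        let digest = directory.digest();

        // put() returns the digest of the stored Directory.
        let returned_digest = svc.put(directory.clone()).await.expect("put must succeed");
        assert_eq!(digest, returned_digest);

        // get() returns the same Directory for that digest.
        let fetched = svc.get(&digest).await.expect("get must succeed");
        assert_eq!(Some(directory), fetched);
    }
}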