snix_store/pathinfoservice/
redb.rs1use super::{PathInfo, PathInfoService};
2use crate::proto;
3use data_encoding::BASE64;
4use futures::{StreamExt, stream::BoxStream};
5use prost::Message;
6use redb::{Database, ReadableTable, TableDefinition};
7use snix_castore::{
8 Error,
9 composition::{CompositionContext, ServiceBuilder},
10};
11use std::{path::PathBuf, sync::Arc};
12use tokio_stream::wrappers::ReceiverStream;
13use tonic::async_trait;
14use tracing::{info, instrument, warn};
15
/// Definition of the single redb table used by this service.
///
/// Keys are the 20-byte store path digests, values are protobuf-encoded
/// [`proto::PathInfo`] messages.
const PATHINFO_TABLE: TableDefinition<[u8; 20], Vec<u8>> = TableDefinition::new("pathinfo");
17
/// A [`PathInfoService`] implementation that stores its data in a redb
/// database, either backed by a file or held in memory.
pub struct RedbPathInfoService {
    /// Name of this service instance; recorded as a field on tracing spans.
    instance_name: String,
    /// Shared handle to the database, cloned into blocking tasks.
    db: Arc<Database>,
}
27
28impl RedbPathInfoService {
29 pub async fn new(instance_name: String, path: PathBuf) -> Result<Self, Error> {
32 if path == PathBuf::from("/") {
33 return Err(Error::StorageError(
34 "cowardly refusing to open / with redb".to_string(),
35 ));
36 }
37
38 if let Some(parent) = path.parent() {
39 std::fs::create_dir_all(parent)?;
40 }
41
42 let db = tokio::task::spawn_blocking(|| -> Result<_, redb::Error> {
43 let mut db = redb::Database::builder()
44 .create_with_file_format_v3(true)
45 .create(path)?;
46
47 if db.upgrade()? {
49 info!("Upgraded database format");
50 };
51
52 create_schema(&db)?;
53 Ok(db)
54 })
55 .await??;
56
57 Ok(Self {
58 instance_name,
59 db: Arc::new(db),
60 })
61 }
62
63 pub fn new_temporary(instance_name: String) -> Result<Self, Error> {
65 let db = redb::Database::builder()
66 .create_with_file_format_v3(true)
67 .create_with_backend(redb::backends::InMemoryBackend::new())?;
68
69 create_schema(&db)?;
70
71 Ok(Self {
72 instance_name,
73 db: Arc::new(db),
74 })
75 }
76}
77
78#[allow(clippy::result_large_err)]
82fn create_schema(db: &redb::Database) -> Result<(), redb::Error> {
83 let txn = db.begin_write()?;
84 txn.open_table(PATHINFO_TABLE)?;
85 txn.commit()?;
86
87 Ok(())
88}
89
90#[async_trait]
91impl PathInfoService for RedbPathInfoService {
92 #[instrument(level = "trace", skip_all, fields(path_info.digest = BASE64.encode(&digest), instance_name = %self.instance_name))]
93 async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
94 let db = self.db.clone();
95
96 tokio::task::spawn_blocking({
97 move || {
98 let txn = db.begin_read()?;
99 let table = txn.open_table(PATHINFO_TABLE)?;
100 match table.get(digest)? {
101 Some(pathinfo_bytes) => Ok(Some(
102 proto::PathInfo::decode(pathinfo_bytes.value().as_slice())
103 .map_err(|e| {
104 warn!(err=%e, "failed to decode stored PathInfo");
105 Error::StorageError("failed to decode stored PathInfo".to_string())
106 })?
107 .try_into()
108 .map_err(|e| Error::StorageError(format!("Invalid path info: {e}")))?,
109 )),
110 None => Ok(None),
111 }
112 }
113 })
114 .await?
115 }
116
117 #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node, instance_name = %self.instance_name))]
118 async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
119 let db = self.db.clone();
120
121 tokio::task::spawn_blocking({
122 let path_info = path_info.clone();
123 move || -> Result<(), Error> {
124 let txn = db.begin_write()?;
125 {
126 let mut table = txn.open_table(PATHINFO_TABLE)?;
127 table
128 .insert(
129 *path_info.store_path.digest(),
130 proto::PathInfo::from(path_info).encode_to_vec(),
131 )
132 .map_err(|e| {
133 warn!(err=%e, "failed to insert PathInfo");
134 Error::StorageError("failed to insert PathInfo".to_string())
135 })?;
136 }
137 Ok(txn.commit()?)
138 }
139 })
140 .await??;
141
142 Ok(path_info)
143 }
144
145 fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
146 let db = self.db.clone();
147 let (tx, rx) = tokio::sync::mpsc::channel(64);
148
149 tokio::task::spawn_blocking(move || {
150 let result = (|| -> Result<(), Error> {
152 let read_txn = db.begin_read().map_err(|err| {
153 warn!(%err, "failed to open read transaction");
154 Error::StorageError("failed to open read transaction".to_string())
155 })?;
156
157 let table = read_txn.open_table(PATHINFO_TABLE).map_err(|err| {
158 warn!(%err, "failed to open table");
159 Error::StorageError("failed to open table".to_string())
160 })?;
161
162 let table_iter = table.iter().map_err(|err| {
163 warn!(%err, "failed to iterate over table items");
164 Error::StorageError("failed to iterate over table items".into())
165 })?;
166
167 for elem in table_iter {
168 let elem = elem.map_err(|err| {
169 warn!(%err, "failed to retrieve item");
170 Error::StorageError("failed to retrieve item".into())
171 })?;
172
173 let path_info_proto = proto::PathInfo::decode(elem.1.value().as_slice())
174 .map_err(|err| {
175 warn!(%err, "invalid PathInfo");
176 Error::StorageError("invalid PathInfo".into())
177 })?;
178
179 let path_info = PathInfo::try_from(path_info_proto)
180 .map_err(|e| Error::StorageError(format!("Invalid path info: {e}")))?;
181
182 if tx.blocking_send(Ok(path_info)).is_err() {
183 break;
184 }
185 }
186
187 Ok(())
188 })();
189
190 if let Err(err) = result {
191 let _ = tx.blocking_send(Err(err));
192 }
193 });
194
195 ReceiverStream::new(rx).boxed()
196 }
197}
198
/// Configuration for a [`RedbPathInfoService`].
///
/// Only two combinations are accepted when building the service:
/// `is_temporary = true` without a `path`, or `is_temporary = false` with a
/// `path`. Unknown fields are rejected during deserialization.
#[derive(serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct RedbPathInfoServiceConfig {
    /// Whether to use an in-memory database instead of a file.
    is_temporary: bool,
    /// Location of the database file; defaults to `None` when omitted.
    #[serde(default)]
    path: Option<PathBuf>,
}
207
208impl TryFrom<url::Url> for RedbPathInfoServiceConfig {
209 type Error = Box<dyn std::error::Error + Send + Sync>;
210 fn try_from(url: url::Url) -> Result<Self, Self::Error> {
211 if url.has_host() {
213 return Err(Error::StorageError("no host allowed".to_string()).into());
214 }
215
216 Ok(if url.path().is_empty() {
217 RedbPathInfoServiceConfig {
218 is_temporary: true,
219 path: None,
220 }
221 } else {
222 RedbPathInfoServiceConfig {
223 is_temporary: false,
224 path: Some(url.path().into()),
225 }
226 })
227 }
228}
229
230#[async_trait]
231impl ServiceBuilder for RedbPathInfoServiceConfig {
232 type Output = dyn PathInfoService;
233 async fn build<'a>(
234 &'a self,
235 instance_name: &str,
236 _context: &CompositionContext,
237 ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
238 match self {
239 RedbPathInfoServiceConfig {
240 is_temporary: true,
241 path: None,
242 } => Ok(Arc::new(RedbPathInfoService::new_temporary(
243 instance_name.to_string(),
244 )?)),
245 RedbPathInfoServiceConfig {
246 is_temporary: true,
247 path: Some(_),
248 } => Err(
249 Error::StorageError("Temporary RedbPathInfoService can not have path".into())
250 .into(),
251 ),
252 RedbPathInfoServiceConfig {
253 is_temporary: false,
254 path: None,
255 } => Err(Error::StorageError("RedbPathInfoService is missing path".into()).into()),
256 RedbPathInfoServiceConfig {
257 is_temporary: false,
258 path: Some(path),
259 } => Ok(Arc::new(
260 RedbPathInfoService::new(instance_name.to_string(), path.to_owned()).await?,
261 )),
262 }
263 }
264}