snix_store/pathinfoservice/
redb.rs1use super::{PathInfo, PathInfoService};
2use crate::proto;
3use data_encoding::BASE64;
4use futures::{StreamExt, stream::BoxStream};
5use prost::Message;
6use redb::{Database, ReadableDatabase, ReadableTable, TableDefinition};
7use snix_castore::{
8 Error,
9 composition::{CompositionContext, ServiceBuilder},
10};
11use std::{path::PathBuf, sync::Arc};
12use tokio_stream::wrappers::ReceiverStream;
13use tonic::async_trait;
14use tracing::{instrument, warn};
15
16const PATHINFO_TABLE: TableDefinition<[u8; 20], Vec<u8>> = TableDefinition::new("pathinfo");
17
18pub struct RedbPathInfoService {
22 instance_name: String,
23 db: Arc<Database>,
26}
27
28impl RedbPathInfoService {
29 pub async fn new(instance_name: String, path: PathBuf) -> Result<Self, Error> {
32 if &path == "/" {
33 return Err(Error::StorageError(
34 "cowardly refusing to open / with redb".to_string(),
35 ));
36 }
37
38 if let Some(parent) = path.parent() {
39 std::fs::create_dir_all(parent)?;
40 }
41
42 let db = tokio::task::spawn_blocking(|| -> Result<_, redb::Error> {
43 let db = redb::Database::builder().create(path)?;
44
45 create_schema(&db)?;
46 Ok(db)
47 })
48 .await??;
49
50 Ok(Self {
51 instance_name,
52 db: Arc::new(db),
53 })
54 }
55
56 pub fn new_temporary(instance_name: String) -> Result<Self, Error> {
58 let db =
59 redb::Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?;
60
61 create_schema(&db)?;
62
63 Ok(Self {
64 instance_name,
65 db: Arc::new(db),
66 })
67 }
68}
69
70#[allow(clippy::result_large_err)]
74fn create_schema(db: &redb::Database) -> Result<(), redb::Error> {
75 let txn = db.begin_write()?;
76 txn.open_table(PATHINFO_TABLE)?;
77 txn.commit()?;
78
79 Ok(())
80}
81
82#[async_trait]
83impl PathInfoService for RedbPathInfoService {
84 #[instrument(level = "trace", skip_all, fields(path_info.digest = BASE64.encode(&digest), instance_name = %self.instance_name))]
85 async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
86 let db = self.db.clone();
87
88 tokio::task::spawn_blocking({
89 move || {
90 let txn = db.begin_read()?;
91 let table = txn.open_table(PATHINFO_TABLE)?;
92 match table.get(digest)? {
93 Some(pathinfo_bytes) => Ok(Some(
94 proto::PathInfo::decode(pathinfo_bytes.value().as_slice())
95 .map_err(|e| {
96 warn!(err=%e, "failed to decode stored PathInfo");
97 Error::StorageError("failed to decode stored PathInfo".to_string())
98 })?
99 .try_into()
100 .map_err(|e| Error::StorageError(format!("Invalid path info: {e}")))?,
101 )),
102 None => Ok(None),
103 }
104 }
105 })
106 .await?
107 }
108
109 #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node, instance_name = %self.instance_name))]
110 async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
111 let db = self.db.clone();
112
113 tokio::task::spawn_blocking({
114 let path_info = path_info.clone();
115 move || -> Result<(), Error> {
116 let txn = db.begin_write()?;
117 {
118 let mut table = txn.open_table(PATHINFO_TABLE)?;
119 table
120 .insert(
121 *path_info.store_path.digest(),
122 proto::PathInfo::from(path_info).encode_to_vec(),
123 )
124 .map_err(|e| {
125 warn!(err=%e, "failed to insert PathInfo");
126 Error::StorageError("failed to insert PathInfo".to_string())
127 })?;
128 }
129 Ok(txn.commit()?)
130 }
131 })
132 .await??;
133
134 Ok(path_info)
135 }
136
137 fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
138 let db = self.db.clone();
139 let (tx, rx) = tokio::sync::mpsc::channel(64);
140
141 tokio::task::spawn_blocking(move || {
142 let result = (|| -> Result<(), Error> {
144 let read_txn = db.begin_read().map_err(|err| {
145 warn!(%err, "failed to open read transaction");
146 Error::StorageError("failed to open read transaction".to_string())
147 })?;
148
149 let table = read_txn.open_table(PATHINFO_TABLE).map_err(|err| {
150 warn!(%err, "failed to open table");
151 Error::StorageError("failed to open table".to_string())
152 })?;
153
154 let table_iter = table.iter().map_err(|err| {
155 warn!(%err, "failed to iterate over table items");
156 Error::StorageError("failed to iterate over table items".into())
157 })?;
158
159 for elem in table_iter {
160 let elem = elem.map_err(|err| {
161 warn!(%err, "failed to retrieve item");
162 Error::StorageError("failed to retrieve item".into())
163 })?;
164
165 let path_info_proto = proto::PathInfo::decode(elem.1.value().as_slice())
166 .map_err(|err| {
167 warn!(%err, "invalid PathInfo");
168 Error::StorageError("invalid PathInfo".into())
169 })?;
170
171 let path_info = PathInfo::try_from(path_info_proto)
172 .map_err(|e| Error::StorageError(format!("Invalid path info: {e}")))?;
173
174 if tx.blocking_send(Ok(path_info)).is_err() {
175 break;
176 }
177 }
178
179 Ok(())
180 })();
181
182 if let Err(err) = result {
183 let _ = tx.blocking_send(Err(err));
184 }
185 });
186
187 ReceiverStream::new(rx).boxed()
188 }
189}
190
191#[derive(serde::Deserialize)]
192#[serde(deny_unknown_fields)]
193pub struct RedbPathInfoServiceConfig {
194 is_temporary: bool,
195 #[serde(default)]
196 path: Option<PathBuf>,
198}
199
200impl TryFrom<url::Url> for RedbPathInfoServiceConfig {
201 type Error = Box<dyn std::error::Error + Send + Sync>;
202 fn try_from(url: url::Url) -> Result<Self, Self::Error> {
203 if url.has_host() {
205 return Err(Error::StorageError("no host allowed".to_string()).into());
206 }
207
208 Ok(if url.path().is_empty() {
209 RedbPathInfoServiceConfig {
210 is_temporary: true,
211 path: None,
212 }
213 } else {
214 RedbPathInfoServiceConfig {
215 is_temporary: false,
216 path: Some(url.path().into()),
217 }
218 })
219 }
220}
221
222#[async_trait]
223impl ServiceBuilder for RedbPathInfoServiceConfig {
224 type Output = dyn PathInfoService;
225 async fn build<'a>(
226 &'a self,
227 instance_name: &str,
228 _context: &CompositionContext,
229 ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
230 match self {
231 RedbPathInfoServiceConfig {
232 is_temporary: true,
233 path: None,
234 } => Ok(Arc::new(RedbPathInfoService::new_temporary(
235 instance_name.to_string(),
236 )?)),
237 RedbPathInfoServiceConfig {
238 is_temporary: true,
239 path: Some(_),
240 } => Err(
241 Error::StorageError("Temporary RedbPathInfoService can not have path".into())
242 .into(),
243 ),
244 RedbPathInfoServiceConfig {
245 is_temporary: false,
246 path: None,
247 } => Err(Error::StorageError("RedbPathInfoService is missing path".into()).into()),
248 RedbPathInfoServiceConfig {
249 is_temporary: false,
250 path: Some(path),
251 } => Ok(Arc::new(
252 RedbPathInfoService::new(instance_name.to_string(), path.to_owned()).await?,
253 )),
254 }
255 }
256}