snix_store/pathinfoservice/
redb.rs1use super::{PathInfo, PathInfoService};
2use crate::proto;
3use data_encoding::BASE64;
4use futures::{StreamExt, stream::BoxStream};
5use prost::Message;
6use redb::{Database, ReadableTable, TableDefinition};
7use snix_castore::{
8 Error,
9 composition::{CompositionContext, ServiceBuilder},
10};
11use std::{path::PathBuf, sync::Arc};
12use tokio_stream::wrappers::ReceiverStream;
13use tonic::async_trait;
14use tracing::{instrument, warn};
15
16const PATHINFO_TABLE: TableDefinition<[u8; 20], Vec<u8>> = TableDefinition::new("pathinfo");
17
18pub struct RedbPathInfoService {
22 instance_name: String,
23 db: Arc<Database>,
26}
27
28impl RedbPathInfoService {
29 pub async fn new(instance_name: String, path: PathBuf) -> Result<Self, Error> {
32 if path == PathBuf::from("/") {
33 return Err(Error::StorageError(
34 "cowardly refusing to open / with redb".to_string(),
35 ));
36 }
37
38 let db = tokio::task::spawn_blocking(|| -> Result<_, redb::Error> {
39 let db = redb::Database::create(path)?;
40 create_schema(&db)?;
41 Ok(db)
42 })
43 .await??;
44
45 Ok(Self {
46 instance_name,
47 db: Arc::new(db),
48 })
49 }
50
51 pub fn new_temporary(instance_name: String) -> Result<Self, Error> {
53 let db =
54 redb::Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?;
55
56 create_schema(&db)?;
57
58 Ok(Self {
59 instance_name,
60 db: Arc::new(db),
61 })
62 }
63}
64
65fn create_schema(db: &redb::Database) -> Result<(), redb::Error> {
69 let txn = db.begin_write()?;
70 txn.open_table(PATHINFO_TABLE)?;
71 txn.commit()?;
72
73 Ok(())
74}
75
76#[async_trait]
77impl PathInfoService for RedbPathInfoService {
78 #[instrument(level = "trace", skip_all, fields(path_info.digest = BASE64.encode(&digest), instance_name = %self.instance_name))]
79 async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
80 let db = self.db.clone();
81
82 tokio::task::spawn_blocking({
83 move || {
84 let txn = db.begin_read()?;
85 let table = txn.open_table(PATHINFO_TABLE)?;
86 match table.get(digest)? {
87 Some(pathinfo_bytes) => Ok(Some(
88 proto::PathInfo::decode(pathinfo_bytes.value().as_slice())
89 .map_err(|e| {
90 warn!(err=%e, "failed to decode stored PathInfo");
91 Error::StorageError("failed to decode stored PathInfo".to_string())
92 })?
93 .try_into()
94 .map_err(|e| Error::StorageError(format!("Invalid path info: {e}")))?,
95 )),
96 None => Ok(None),
97 }
98 }
99 })
100 .await?
101 }
102
103 #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node, instance_name = %self.instance_name))]
104 async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
105 let db = self.db.clone();
106
107 tokio::task::spawn_blocking({
108 let path_info = path_info.clone();
109 move || -> Result<(), Error> {
110 let txn = db.begin_write()?;
111 {
112 let mut table = txn.open_table(PATHINFO_TABLE)?;
113 table
114 .insert(
115 *path_info.store_path.digest(),
116 proto::PathInfo::from(path_info).encode_to_vec(),
117 )
118 .map_err(|e| {
119 warn!(err=%e, "failed to insert PathInfo");
120 Error::StorageError("failed to insert PathInfo".to_string())
121 })?;
122 }
123 Ok(txn.commit()?)
124 }
125 })
126 .await??;
127
128 Ok(path_info)
129 }
130
131 fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
132 let db = self.db.clone();
133 let (tx, rx) = tokio::sync::mpsc::channel(50);
134
135 tokio::task::spawn_blocking({
137 move || -> Result<(), Error> {
138 let read_txn = db.begin_read()?;
139 let table = read_txn.open_table(PATHINFO_TABLE)?;
140
141 for elem in table.iter()? {
142 let elem = elem?;
143 tokio::runtime::Handle::current()
144 .block_on(tx.send(Ok({
145 let path_info_proto = proto::PathInfo::decode(
146 elem.1.value().as_slice(),
147 )
148 .map_err(|e| {
149 warn!(err=%e, "invalid PathInfo");
150 Error::StorageError("invalid PathInfo".to_string())
151 })?;
152 PathInfo::try_from(path_info_proto).map_err(|e| {
153 Error::StorageError(format!("Invalid path info: {e}"))
154 })?
155 })))
156 .map_err(|e| Error::StorageError(e.to_string()))?;
157 }
158
159 Ok(())
160 }
161 });
162
163 ReceiverStream::from(rx).boxed()
164 }
165}
166
167#[derive(serde::Deserialize)]
168#[serde(deny_unknown_fields)]
169pub struct RedbPathInfoServiceConfig {
170 is_temporary: bool,
171 #[serde(default)]
172 path: Option<PathBuf>,
174}
175
176impl TryFrom<url::Url> for RedbPathInfoServiceConfig {
177 type Error = Box<dyn std::error::Error + Send + Sync>;
178 fn try_from(url: url::Url) -> Result<Self, Self::Error> {
179 if url.has_host() {
181 return Err(Error::StorageError("no host allowed".to_string()).into());
182 }
183
184 Ok(if url.path().is_empty() {
185 RedbPathInfoServiceConfig {
186 is_temporary: true,
187 path: None,
188 }
189 } else {
190 RedbPathInfoServiceConfig {
191 is_temporary: false,
192 path: Some(url.path().into()),
193 }
194 })
195 }
196}
197
198#[async_trait]
199impl ServiceBuilder for RedbPathInfoServiceConfig {
200 type Output = dyn PathInfoService;
201 async fn build<'a>(
202 &'a self,
203 instance_name: &str,
204 _context: &CompositionContext,
205 ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
206 match self {
207 RedbPathInfoServiceConfig {
208 is_temporary: true,
209 path: None,
210 } => Ok(Arc::new(RedbPathInfoService::new_temporary(
211 instance_name.to_string(),
212 )?)),
213 RedbPathInfoServiceConfig {
214 is_temporary: true,
215 path: Some(_),
216 } => Err(
217 Error::StorageError("Temporary RedbPathInfoService can not have path".into())
218 .into(),
219 ),
220 RedbPathInfoServiceConfig {
221 is_temporary: false,
222 path: None,
223 } => Err(Error::StorageError("RedbPathInfoService is missing path".into()).into()),
224 RedbPathInfoServiceConfig {
225 is_temporary: false,
226 path: Some(path),
227 } => Ok(Arc::new(
228 RedbPathInfoService::new(instance_name.to_string(), path.to_owned()).await?,
229 )),
230 }
231 }
232}