snix_store/pathinfoservice/
redb.rs

1use super::{PathInfo, PathInfoService};
2use crate::proto;
3use data_encoding::BASE64;
4use futures::{StreamExt, stream::BoxStream};
5use prost::Message;
6use redb::{Database, ReadableTable, TableDefinition};
7use snix_castore::{
8    Error,
9    composition::{CompositionContext, ServiceBuilder},
10};
11use std::{path::PathBuf, sync::Arc};
12use tokio_stream::wrappers::ReceiverStream;
13use tonic::async_trait;
14use tracing::{info, instrument, warn};
15
/// Definition of the redb table holding all stored path infos.
///
/// Keys are 20-byte arrays (the store path digest, as inserted by `put`);
/// values are the protobuf-encoded `proto::PathInfo` bytes.
const PATHINFO_TABLE: TableDefinition<[u8; 20], Vec<u8>> = TableDefinition::new("pathinfo");
17
/// PathInfoService implementation using redb under the hood.
/// redb stores all of its data in a single file with a K/V pointing from a path's output hash to
/// its corresponding protobuf-encoded PathInfo.
///
/// Instances are created with [`RedbPathInfoService::new`] (file-backed) or
/// [`RedbPathInfoService::new_temporary`] (in-memory backend).
pub struct RedbPathInfoService {
    /// Name of this service instance; recorded as the `instance_name` field on
    /// the tracing spans of `get` and `put`.
    instance_name: String,
    // We wrap db in an Arc to be able to move it into spawn_blocking,
    // as discussed in https://github.com/cberner/redb/issues/789
    /// Handle to the underlying redb database, cloned into each blocking task.
    db: Arc<Database>,
}
27
28impl RedbPathInfoService {
29    /// Constructs a new instance using the specified file system path for
30    /// storage.
31    pub async fn new(instance_name: String, path: PathBuf) -> Result<Self, Error> {
32        if path == PathBuf::from("/") {
33            return Err(Error::StorageError(
34                "cowardly refusing to open / with redb".to_string(),
35            ));
36        }
37
38        if let Some(parent) = path.parent() {
39            std::fs::create_dir_all(parent)?;
40        }
41
42        let db = tokio::task::spawn_blocking(|| -> Result<_, redb::Error> {
43            let mut db = redb::Database::builder()
44                .create_with_file_format_v3(true)
45                .create(path)?;
46
47            // Upgrade redb database file format.
48            if db.upgrade()? {
49                info!("Upgraded database format");
50            };
51
52            create_schema(&db)?;
53            Ok(db)
54        })
55        .await??;
56
57        Ok(Self {
58            instance_name,
59            db: Arc::new(db),
60        })
61    }
62
63    /// Constructs a new instance using the in-memory backend.
64    pub fn new_temporary(instance_name: String) -> Result<Self, Error> {
65        let db = redb::Database::builder()
66            .create_with_file_format_v3(true)
67            .create_with_backend(redb::backends::InMemoryBackend::new())?;
68
69        create_schema(&db)?;
70
71        Ok(Self {
72            instance_name,
73            db: Arc::new(db),
74        })
75    }
76}
77
78/// Ensures all tables are present.
79/// Opens a write transaction and calls open_table on PATHINFO_TABLE, which will
80/// create it if not present.
81#[allow(clippy::result_large_err)]
82fn create_schema(db: &redb::Database) -> Result<(), redb::Error> {
83    let txn = db.begin_write()?;
84    txn.open_table(PATHINFO_TABLE)?;
85    txn.commit()?;
86
87    Ok(())
88}
89
90#[async_trait]
91impl PathInfoService for RedbPathInfoService {
92    #[instrument(level = "trace", skip_all, fields(path_info.digest = BASE64.encode(&digest), instance_name = %self.instance_name))]
93    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
94        let db = self.db.clone();
95
96        tokio::task::spawn_blocking({
97            move || {
98                let txn = db.begin_read()?;
99                let table = txn.open_table(PATHINFO_TABLE)?;
100                match table.get(digest)? {
101                    Some(pathinfo_bytes) => Ok(Some(
102                        proto::PathInfo::decode(pathinfo_bytes.value().as_slice())
103                            .map_err(|e| {
104                                warn!(err=%e, "failed to decode stored PathInfo");
105                                Error::StorageError("failed to decode stored PathInfo".to_string())
106                            })?
107                            .try_into()
108                            .map_err(|e| Error::StorageError(format!("Invalid path info: {e}")))?,
109                    )),
110                    None => Ok(None),
111                }
112            }
113        })
114        .await?
115    }
116
117    #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node, instance_name = %self.instance_name))]
118    async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
119        let db = self.db.clone();
120
121        tokio::task::spawn_blocking({
122            let path_info = path_info.clone();
123            move || -> Result<(), Error> {
124                let txn = db.begin_write()?;
125                {
126                    let mut table = txn.open_table(PATHINFO_TABLE)?;
127                    table
128                        .insert(
129                            *path_info.store_path.digest(),
130                            proto::PathInfo::from(path_info).encode_to_vec(),
131                        )
132                        .map_err(|e| {
133                            warn!(err=%e, "failed to insert PathInfo");
134                            Error::StorageError("failed to insert PathInfo".to_string())
135                        })?;
136                }
137                Ok(txn.commit()?)
138            }
139        })
140        .await??;
141
142        Ok(path_info)
143    }
144
145    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
146        let db = self.db.clone();
147        let (tx, rx) = tokio::sync::mpsc::channel(64);
148
149        tokio::task::spawn_blocking(move || {
150            // IIFE to be able to use ? for the error cases
151            let result = (|| -> Result<(), Error> {
152                let read_txn = db.begin_read().map_err(|err| {
153                    warn!(%err, "failed to open read transaction");
154                    Error::StorageError("failed to open read transaction".to_string())
155                })?;
156
157                let table = read_txn.open_table(PATHINFO_TABLE).map_err(|err| {
158                    warn!(%err, "failed to open table");
159                    Error::StorageError("failed to open table".to_string())
160                })?;
161
162                let table_iter = table.iter().map_err(|err| {
163                    warn!(%err, "failed to iterate over table items");
164                    Error::StorageError("failed to iterate over table items".into())
165                })?;
166
167                for elem in table_iter {
168                    let elem = elem.map_err(|err| {
169                        warn!(%err, "failed to retrieve item");
170                        Error::StorageError("failed to retrieve item".into())
171                    })?;
172
173                    let path_info_proto = proto::PathInfo::decode(elem.1.value().as_slice())
174                        .map_err(|err| {
175                            warn!(%err, "invalid PathInfo");
176                            Error::StorageError("invalid PathInfo".into())
177                        })?;
178
179                    let path_info = PathInfo::try_from(path_info_proto)
180                        .map_err(|e| Error::StorageError(format!("Invalid path info: {e}")))?;
181
182                    if tx.blocking_send(Ok(path_info)).is_err() {
183                        break;
184                    }
185                }
186
187                Ok(())
188            })();
189
190            if let Err(err) = result {
191                let _ = tx.blocking_send(Err(err));
192            }
193        });
194
195        ReceiverStream::new(rx).boxed()
196    }
197}
198
/// Configuration from which a [`RedbPathInfoService`] is built.
///
/// Can be deserialized with serde (unknown fields are rejected) or derived
/// from a URL via the `TryFrom<url::Url>` impl.
#[derive(serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct RedbPathInfoServiceConfig {
    /// Whether to use the in-memory backend; when `true`, `path` must not be set.
    is_temporary: bool,
    #[serde(default)]
    /// Location of the database file.
    /// required when is_temporary = false
    path: Option<PathBuf>,
}
207
208impl TryFrom<url::Url> for RedbPathInfoServiceConfig {
209    type Error = Box<dyn std::error::Error + Send + Sync>;
210    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
211        // redb doesn't support host, and a path can be provided (otherwise it'll live in memory only)
212        if url.has_host() {
213            return Err(Error::StorageError("no host allowed".to_string()).into());
214        }
215
216        Ok(if url.path().is_empty() {
217            RedbPathInfoServiceConfig {
218                is_temporary: true,
219                path: None,
220            }
221        } else {
222            RedbPathInfoServiceConfig {
223                is_temporary: false,
224                path: Some(url.path().into()),
225            }
226        })
227    }
228}
229
230#[async_trait]
231impl ServiceBuilder for RedbPathInfoServiceConfig {
232    type Output = dyn PathInfoService;
233    async fn build<'a>(
234        &'a self,
235        instance_name: &str,
236        _context: &CompositionContext,
237    ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
238        match self {
239            RedbPathInfoServiceConfig {
240                is_temporary: true,
241                path: None,
242            } => Ok(Arc::new(RedbPathInfoService::new_temporary(
243                instance_name.to_string(),
244            )?)),
245            RedbPathInfoServiceConfig {
246                is_temporary: true,
247                path: Some(_),
248            } => Err(
249                Error::StorageError("Temporary RedbPathInfoService can not have path".into())
250                    .into(),
251            ),
252            RedbPathInfoServiceConfig {
253                is_temporary: false,
254                path: None,
255            } => Err(Error::StorageError("RedbPathInfoService is missing path".into()).into()),
256            RedbPathInfoServiceConfig {
257                is_temporary: false,
258                path: Some(path),
259            } => Ok(Arc::new(
260                RedbPathInfoService::new(instance_name.to_string(), path.to_owned()).await?,
261            )),
262        }
263    }
264}