snix_store/pathinfoservice/
redb.rs

1use super::{PathInfo, PathInfoService};
2use crate::{pathinfoservice, proto};
3use data_encoding::BASE64;
4use futures::{StreamExt, TryStreamExt, stream::BoxStream};
5use prost::Message;
6use redb::{ReadableDatabase, ReadableTable, TableDefinition};
7use snix_castore::composition::{CompositionContext, ServiceBuilder};
8use std::{path::PathBuf, sync::Arc};
9use tokio_stream::wrappers::ReceiverStream;
10use tonic::async_trait;
11use tracing::instrument;
12
/// The single redb table used by this service.
/// Keys are 20-byte arrays (the store path digest, see `put`), values are byte vectors holding
/// the protobuf-encoded PathInfo.
const PATHINFO_TABLE: TableDefinition<[u8; 20], Vec<u8>> = TableDefinition::new("pathinfo");
14
/// Wrapper around the two kinds of redb database handles this service can hold.
/// Which variant is used is decided when the service is constructed, based on the `read_only`
/// config option.
enum Db {
    /// Database opened read-only; requests for a write transaction are refused.
    ReadOnly(redb::ReadOnlyDatabase),
    /// Database opened (or created) writeable.
    ReadWrite(redb::Database),
}
19
20impl Db {
21    fn begin_read(&self) -> Result<redb::ReadTransaction, redb::TransactionError> {
22        match self {
23            Db::ReadOnly(db) => db.begin_read(),
24            Db::ReadWrite(db) => db.begin_read(),
25        }
26    }
27
28    fn begin_write(&self) -> Result<redb::WriteTransaction, Error> {
29        match self {
30            Db::ReadOnly(_) => Err(Error::OpenedReadonly),
31            Db::ReadWrite(db) => Ok(db.begin_write()?),
32        }
33    }
34}
35
/// PathInfoService implementation using redb under the hood.
/// redb stores all of its data in a single file with a K/V pointing from a path's output hash to
/// its corresponding protobuf-encoded PathInfo.
///
/// Clones share the same underlying database, as it is held behind an [Arc].
#[derive(Clone)]
pub struct RedbPathInfoService {
    /// Name of this instance, recorded in the tracing spans of `get` and `put`.
    instance_name: String,

    /// An Arc'ed Database, read-only or writeable.
    db: Arc<Db>,
}
46
47impl RedbPathInfoService {
48    /// Constructs a new instance using the specified config.
49    pub async fn new(
50        instance_name: String,
51        config: RedbPathInfoServiceConfig,
52    ) -> Result<Self, Error> {
53        if let Some(path) = config.path.clone() {
54            if &path == "" {
55                return Err(Error::WrongConfig("empty path is disallowed"));
56            }
57            if &path == "/" {
58                return Err(Error::WrongConfig("cowardly refusing to open / with redb"));
59            }
60
61            if config.read_only {
62                let db =
63                    tokio::task::spawn_blocking(|| redb::Database::builder().open_read_only(path))
64                        .await??;
65
66                return Ok(Self {
67                    instance_name,
68                    db: Arc::new(Db::ReadOnly(db)),
69                });
70            }
71
72            if let Some(parent) = path.parent() {
73                tokio::fs::create_dir_all(parent).await?;
74            }
75
76            let db = tokio::task::spawn_blocking(move || {
77                let mut builder = redb::Database::builder();
78                configure_builder(&mut builder, &config);
79
80                let db = builder.create(path)?;
81                create_schema(&db)?;
82                Ok::<_, Error>(db)
83            })
84            .await??;
85
86            Ok(Self {
87                instance_name,
88                db: Arc::new(Db::ReadWrite(db)),
89            })
90        } else {
91            Self::new_temporary(instance_name, config)
92        }
93    }
94
95    /// Constructs a new instance using the in-memory backend.
96    /// Sync, as there's no real IO happening.
97    pub fn new_temporary(
98        instance_name: String,
99        config: RedbPathInfoServiceConfig,
100    ) -> Result<Self, Error> {
101        debug_assert!(
102            config.path.is_none(),
103            "Snix bug: config.path is not None, but new_temporary requested"
104        );
105
106        if config.read_only {
107            return Err(Error::WrongConfig("in-memory database cannot be read-only"));
108        }
109
110        let mut builder = redb::Database::builder();
111        configure_builder(&mut builder, &config);
112
113        let db = builder.create_with_backend(redb::backends::InMemoryBackend::new())?;
114
115        create_schema(&db)?;
116
117        Ok(RedbPathInfoService {
118            instance_name,
119            db: Arc::new(Db::ReadWrite(db)),
120        })
121    }
122}
123
124/// Applies options from [RedbPathInfoServiceConfig] to a [redb::Builder].
125fn configure_builder(builder: &mut redb::Builder, config: &RedbPathInfoServiceConfig) {
126    if let Some(cache_size) = config.cache_size {
127        builder.set_cache_size(cache_size);
128    }
129}
130
131/// Ensures all tables are present.
132/// Opens a write transaction and calls open_table on PATHINFO_TABLE, which will
133/// create it if not present.
134#[allow(clippy::result_large_err)]
135fn create_schema(db: &redb::Database) -> Result<(), Error> {
136    let txn = db.begin_write()?;
137    txn.open_table(PATHINFO_TABLE)?;
138    txn.commit()?;
139
140    Ok(())
141}
142
143#[async_trait]
144impl PathInfoService for RedbPathInfoService {
145    #[instrument(level = "trace", skip_all, fields(path_info.digest = BASE64.encode(&digest), instance_name = %self.instance_name))]
146    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, pathinfoservice::Error> {
147        let db = self.db.clone();
148
149        let path_info_bytes = match tokio::task::spawn_blocking({
150            move || -> Result<_, Error> {
151                let txn = db.begin_read()?;
152                let table = txn.open_table(PATHINFO_TABLE)?;
153                Ok(table.get(digest)?)
154            }
155        })
156        .await??
157        {
158            // The PathInfo was not found, return None.
159            None => return Ok(None),
160            Some(path_info_data) => path_info_data.value(),
161        };
162
163        let pathinfo_proto =
164            proto::PathInfo::decode(path_info_bytes.as_slice()).map_err(Error::ProtobufDecode)?;
165        let path_info = PathInfo::try_from(pathinfo_proto).map_err(Error::PathInfoValidation)?;
166
167        return Ok(Some(path_info));
168    }
169
170    #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node, instance_name = %self.instance_name))]
171    async fn put(&self, path_info: PathInfo) -> Result<PathInfo, pathinfoservice::Error> {
172        let db = self.db.clone();
173        tokio::task::spawn_blocking({
174            let path_info = path_info.clone();
175            move || -> Result<(), Error> {
176                let txn = db.begin_write()?;
177                {
178                    let mut table = txn.open_table(PATHINFO_TABLE)?;
179                    table.insert(
180                        *path_info.store_path.digest(),
181                        proto::PathInfo::from(path_info).encode_to_vec(),
182                    )?;
183                }
184                txn.commit()?;
185                Ok(())
186            }
187        })
188        .await??;
189
190        Ok(path_info)
191    }
192
193    fn list(&self) -> BoxStream<'static, Result<PathInfo, pathinfoservice::Error>> {
194        let db = self.db.clone();
195        let (tx, rx) = tokio::sync::mpsc::channel(64);
196
197        tokio::task::spawn_blocking(move || {
198            // IIFE to be able to use ? for the error cases
199            let result = (|| -> Result<(), Error> {
200                let read_txn = db.begin_read()?;
201
202                let table = read_txn.open_table(PATHINFO_TABLE)?;
203
204                let table_iter = table.iter()?;
205
206                for elem in table_iter {
207                    let path_info_proto = proto::PathInfo::decode(elem?.1.value().as_slice())?;
208
209                    let path_info = PathInfo::try_from(path_info_proto)?;
210
211                    if tx.blocking_send(Ok(path_info)).is_err() {
212                        break;
213                    }
214                }
215
216                Ok(())
217            })();
218
219            if let Err(err) = result {
220                let _ = tx.blocking_send(Err(err));
221            }
222        });
223
224        ReceiverStream::new(rx).err_into().boxed()
225    }
226}
227
/// Errors produced by the redb-backed PathInfoService and its configuration.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// The configuration (or the URL it was parsed from) was rejected; carries the reason.
    #[error("wrong arguments: {0}")]
    WrongConfig(&'static str),
    /// An error reported by serde_qs.
    #[error("serde-qs error: {0}")]
    SerdeQS(#[from] serde_qs::Error),

    /// Bytes read from the database could not be decoded as a PathInfo protobuf message.
    #[error("failed to decode protobuf: {0}")]
    ProtobufDecode(#[from] prost::DecodeError),
    /// A decoded PathInfo protobuf message failed validation.
    #[error("failed to validate PathInfo: {0}")]
    PathInfoValidation(#[from] crate::proto::ValidatePathInfoError),

    /// A write transaction was requested on a database opened read-only.
    #[error("unable to open write txn, database opened read-only")]
    OpenedReadonly,

    /// Committing a redb transaction failed.
    #[error("redb commit error: {0}")]
    RedbCommit(#[from] redb::CommitError),
    /// Opening or creating the redb database failed.
    #[error("redb database error: {0}")]
    RedbDatabase(#[from] redb::DatabaseError),
    /// A generic redb error.
    #[error("redb error: {0}")]
    Redb(#[from] redb::Error),
    /// A redb storage error, e.g. while reading from or writing to a table.
    #[error("redb storage error: {0}")]
    RedbStorage(#[from] redb::StorageError),
    /// Opening a redb table failed.
    #[error("redb table error: {0}")]
    RedbTable(#[from] redb::TableError),
    /// Starting a redb transaction failed.
    #[error("redb txn error: {0}")]
    RedbTransaction(#[from] redb::TransactionError),

    /// A task spawned onto the tokio runtime could not be joined.
    #[error("join error: {0}")]
    TokioJoin(#[from] tokio::task::JoinError),
    /// An IO error, e.g. while creating the parent directory of the database file.
    #[error("io error: {0}")]
    IO(#[from] std::io::Error),
}
261
/// Configuration for [RedbPathInfoService].
/// Can be deserialized with serde (unknown fields are rejected), or constructed from a URL via
/// the `TryFrom<url::Url>` impl.
#[derive(Clone, Default, serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct RedbPathInfoServiceConfig {
    /// Location of the database file. If unset, an in-memory database is used.
    path: Option<PathBuf>,

    /// The amount of memory (in bytes) used for caching data
    cache_size: Option<usize>,

    /// Whether to open read-only.
    #[serde(default)]
    read_only: bool,
}
274
275impl TryFrom<url::Url> for RedbPathInfoServiceConfig {
276    type Error = Box<dyn std::error::Error + Send + Sync>;
277    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
278        if url.has_host() {
279            return Err(Error::WrongConfig("no host allowed").into());
280        }
281
282        let path: Option<PathBuf> = match (url.scheme(), url.has_authority(), url.path()) {
283            ("redb+memory", false, "") => None,
284            ("redb+memory", false, _) => Err(Box::new(Error::WrongConfig(
285                "redb+memory with path is disallowed",
286            )))?,
287            ("redb+memory", true, _) => Err(Box::new(Error::WrongConfig(
288                "redb+memory may not have authority",
289            )))?,
290            ("redb", _, "") => Err(Box::new(Error::WrongConfig(
291                "redb without path is disallowed, use redb+memory if you want in-memory",
292            )))?,
293            ("redb", _, path) => Some(path.into()),
294            (_scheme, _, _) => Err(Box::new(Error::WrongConfig("unrecognized scheme")))?,
295        };
296
297        let mut config: RedbPathInfoServiceConfig =
298            serde_qs::from_str(url.query().unwrap_or_default())?;
299
300        config.path = path;
301
302        Ok(config)
303    }
304}
305
306#[async_trait]
307impl ServiceBuilder for RedbPathInfoServiceConfig {
308    type Output = dyn PathInfoService;
309    async fn build<'a>(
310        &'a self,
311        instance_name: &str,
312        _context: &CompositionContext,
313    ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
314        Ok(Arc::new(
315            RedbPathInfoService::new(instance_name.to_string(), self.to_owned()).await?,
316        ))
317    }
318}