snix_store/pathinfoservice/
redb.rs

1use super::{PathInfo, PathInfoService};
2use crate::proto;
3use data_encoding::BASE64;
4use futures::{StreamExt, stream::BoxStream};
5use prost::Message;
6use redb::{Database, ReadableTable, TableDefinition};
7use snix_castore::{
8    Error,
9    composition::{CompositionContext, ServiceBuilder},
10};
11use std::{path::PathBuf, sync::Arc};
12use tokio_stream::wrappers::ReceiverStream;
13use tonic::async_trait;
14use tracing::{instrument, warn};
15
16const PATHINFO_TABLE: TableDefinition<[u8; 20], Vec<u8>> = TableDefinition::new("pathinfo");
17
18/// PathInfoService implementation using redb under the hood.
19/// redb stores all of its data in a single file with a K/V pointing from a path's output hash to
20/// its corresponding protobuf-encoded PathInfo.
21pub struct RedbPathInfoService {
22    instance_name: String,
23    // We wrap db in an Arc to be able to move it into spawn_blocking,
24    // as discussed in https://github.com/cberner/redb/issues/789
25    db: Arc<Database>,
26}
27
28impl RedbPathInfoService {
29    /// Constructs a new instance using the specified file system path for
30    /// storage.
31    pub async fn new(instance_name: String, path: PathBuf) -> Result<Self, Error> {
32        if path == PathBuf::from("/") {
33            return Err(Error::StorageError(
34                "cowardly refusing to open / with redb".to_string(),
35            ));
36        }
37
38        let db = tokio::task::spawn_blocking(|| -> Result<_, redb::Error> {
39            let db = redb::Database::create(path)?;
40            create_schema(&db)?;
41            Ok(db)
42        })
43        .await??;
44
45        Ok(Self {
46            instance_name,
47            db: Arc::new(db),
48        })
49    }
50
51    /// Constructs a new instance using the in-memory backend.
52    pub fn new_temporary(instance_name: String) -> Result<Self, Error> {
53        let db =
54            redb::Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?;
55
56        create_schema(&db)?;
57
58        Ok(Self {
59            instance_name,
60            db: Arc::new(db),
61        })
62    }
63}
64
65/// Ensures all tables are present.
66/// Opens a write transaction and calls open_table on PATHINFO_TABLE, which will
67/// create it if not present.
68fn create_schema(db: &redb::Database) -> Result<(), redb::Error> {
69    let txn = db.begin_write()?;
70    txn.open_table(PATHINFO_TABLE)?;
71    txn.commit()?;
72
73    Ok(())
74}
75
76#[async_trait]
77impl PathInfoService for RedbPathInfoService {
78    #[instrument(level = "trace", skip_all, fields(path_info.digest = BASE64.encode(&digest), instance_name = %self.instance_name))]
79    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
80        let db = self.db.clone();
81
82        tokio::task::spawn_blocking({
83            move || {
84                let txn = db.begin_read()?;
85                let table = txn.open_table(PATHINFO_TABLE)?;
86                match table.get(digest)? {
87                    Some(pathinfo_bytes) => Ok(Some(
88                        proto::PathInfo::decode(pathinfo_bytes.value().as_slice())
89                            .map_err(|e| {
90                                warn!(err=%e, "failed to decode stored PathInfo");
91                                Error::StorageError("failed to decode stored PathInfo".to_string())
92                            })?
93                            .try_into()
94                            .map_err(|e| Error::StorageError(format!("Invalid path info: {e}")))?,
95                    )),
96                    None => Ok(None),
97                }
98            }
99        })
100        .await?
101    }
102
103    #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node, instance_name = %self.instance_name))]
104    async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
105        let db = self.db.clone();
106
107        tokio::task::spawn_blocking({
108            let path_info = path_info.clone();
109            move || -> Result<(), Error> {
110                let txn = db.begin_write()?;
111                {
112                    let mut table = txn.open_table(PATHINFO_TABLE)?;
113                    table
114                        .insert(
115                            *path_info.store_path.digest(),
116                            proto::PathInfo::from(path_info).encode_to_vec(),
117                        )
118                        .map_err(|e| {
119                            warn!(err=%e, "failed to insert PathInfo");
120                            Error::StorageError("failed to insert PathInfo".to_string())
121                        })?;
122                }
123                Ok(txn.commit()?)
124            }
125        })
126        .await??;
127
128        Ok(path_info)
129    }
130
131    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
132        let db = self.db.clone();
133        let (tx, rx) = tokio::sync::mpsc::channel(50);
134
135        // Spawn a blocking task which writes all PathInfos to tx.
136        tokio::task::spawn_blocking({
137            move || -> Result<(), Error> {
138                let read_txn = db.begin_read()?;
139                let table = read_txn.open_table(PATHINFO_TABLE)?;
140
141                for elem in table.iter()? {
142                    let elem = elem?;
143                    tokio::runtime::Handle::current()
144                        .block_on(tx.send(Ok({
145                            let path_info_proto = proto::PathInfo::decode(
146                                elem.1.value().as_slice(),
147                            )
148                            .map_err(|e| {
149                                warn!(err=%e, "invalid PathInfo");
150                                Error::StorageError("invalid PathInfo".to_string())
151                            })?;
152                            PathInfo::try_from(path_info_proto).map_err(|e| {
153                                Error::StorageError(format!("Invalid path info: {e}"))
154                            })?
155                        })))
156                        .map_err(|e| Error::StorageError(e.to_string()))?;
157                }
158
159                Ok(())
160            }
161        });
162
163        ReceiverStream::from(rx).boxed()
164    }
165}
166
167#[derive(serde::Deserialize)]
168#[serde(deny_unknown_fields)]
169pub struct RedbPathInfoServiceConfig {
170    is_temporary: bool,
171    #[serde(default)]
172    /// required when is_temporary = false
173    path: Option<PathBuf>,
174}
175
176impl TryFrom<url::Url> for RedbPathInfoServiceConfig {
177    type Error = Box<dyn std::error::Error + Send + Sync>;
178    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
179        // redb doesn't support host, and a path can be provided (otherwise it'll live in memory only)
180        if url.has_host() {
181            return Err(Error::StorageError("no host allowed".to_string()).into());
182        }
183
184        Ok(if url.path().is_empty() {
185            RedbPathInfoServiceConfig {
186                is_temporary: true,
187                path: None,
188            }
189        } else {
190            RedbPathInfoServiceConfig {
191                is_temporary: false,
192                path: Some(url.path().into()),
193            }
194        })
195    }
196}
197
198#[async_trait]
199impl ServiceBuilder for RedbPathInfoServiceConfig {
200    type Output = dyn PathInfoService;
201    async fn build<'a>(
202        &'a self,
203        instance_name: &str,
204        _context: &CompositionContext,
205    ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
206        match self {
207            RedbPathInfoServiceConfig {
208                is_temporary: true,
209                path: None,
210            } => Ok(Arc::new(RedbPathInfoService::new_temporary(
211                instance_name.to_string(),
212            )?)),
213            RedbPathInfoServiceConfig {
214                is_temporary: true,
215                path: Some(_),
216            } => Err(
217                Error::StorageError("Temporary RedbPathInfoService can not have path".into())
218                    .into(),
219            ),
220            RedbPathInfoServiceConfig {
221                is_temporary: false,
222                path: None,
223            } => Err(Error::StorageError("RedbPathInfoService is missing path".into()).into()),
224            RedbPathInfoServiceConfig {
225                is_temporary: false,
226                path: Some(path),
227            } => Ok(Arc::new(
228                RedbPathInfoService::new(instance_name.to_string(), path.to_owned()).await?,
229            )),
230        }
231    }
232}