snix_castore/directoryservice/bigtable.rs

1use bigtable_rs::{bigtable, google::bigtable::v2 as bigtable_v2};
2use data_encoding::HEXLOWER;
3use futures::stream::BoxStream;
4use futures::{StreamExt, TryStreamExt};
5use prost::Message;
6use serde::{Deserialize, Serialize};
7use serde_with::{DurationSeconds, serde_as};
8use std::sync::Arc;
9use tonic::async_trait;
10use tracing::{instrument, trace, warn};
11
12use super::{Directory, DirectoryPutter, DirectoryService, SimplePutter};
13use crate::composition::{CompositionContext, ServiceBuilder};
14use crate::directoryservice::traversal;
15use crate::{B3Digest, proto};
16
17/// There should not be more than 10 MiB in a single cell.
18/// <https://cloud.google.com/bigtable/docs/schema-design#cells>
19const CELL_SIZE_LIMIT: u64 = 10 * 1024 * 1024;
20
21/// Provides a [DirectoryService] implementation using
22/// [Bigtable](https://cloud.google.com/bigtable/docs/)
23/// as an underlying K/V store.
24///
25/// # Data format
26/// We use Bigtable as a plain K/V store.
27/// The row key is the digest of the directory, in hexlower.
28/// Inside the row, we currently have a single column/cell, again using the
29/// hexlower directory digest.
30/// Its value is the Directory message, serialized in canonical protobuf.
31/// We currently only populate this column.
32///
33/// In the future, we might want to introduce "bucketing", essentially storing
34/// all directories inserted via `put_multiple_start` in a batched form.
35/// This will prevent looking up intermediate Directories, which are not
36/// directly at the root, so rely on store composition.
#[derive(Clone)]
pub struct BigtableDirectoryService {
    /// Name of this service instance; only used to tag tracing spans.
    instance_name: String,
    /// Bigtable client handle; cloned per request in the trait methods.
    client: bigtable::BigTable,
    /// Connection and data-model parameters (project, table/family names,
    /// timeout, app profile, …).
    params: BigtableParameters,

    #[cfg(test)]
    #[allow(dead_code)]
    /// Holds the temporary directory containing the unix socket, and the
    /// spawned emulator process.
    emulator: std::sync::Arc<(tempfile::TempDir, async_process::Child)>,
}
49
impl BigtableDirectoryService {
    /// Connects to a real Bigtable instance, using the connection parameters
    /// in `params`.
    ///
    /// `instance_name` is the composition instance name, only used in tracing
    /// spans; the Bigtable instance connected to is `params.instance_name`.
    #[cfg(not(test))]
    pub async fn connect(
        instance_name: String,
        params: BigtableParameters,
    ) -> Result<Self, bigtable::Error> {
        let connection = bigtable::BigTableConnection::new(
            &params.project_id,
            &params.instance_name,
            params.is_read_only,
            params.channel_size,
            params.timeout,
        )
        .await?;

        Ok(Self {
            instance_name,
            client: connection.client(),
            params,
        })
    }

    /// Test variant of `connect`: spawns a local `cbtemulator` process
    /// listening on a unix socket inside a fresh temporary directory, creates
    /// the configured table and column family in it via the `cbt` CLI, and
    /// connects to that emulator instead of a real Bigtable instance.
    ///
    /// The tempdir and the child process are kept alive in `self.emulator`
    /// for the lifetime of the service.
    #[cfg(test)]
    pub async fn connect(
        instance_name: String,
        params: BigtableParameters,
    ) -> Result<Self, bigtable::Error> {
        use std::time::Duration;

        use async_process::{Command, Stdio};
        use tempfile::TempDir;
        use tokio_retry::{Retry, strategy::ExponentialBackoff};

        let tmpdir = TempDir::new().unwrap();

        let socket_path = tmpdir.path().join("cbtemulator.sock");

        // kill_on_drop ties the emulator's lifetime to the Child handle we
        // later store in self.emulator.
        let emulator_process = Command::new("cbtemulator")
            .arg("-address")
            .arg(socket_path.clone())
            .stderr(Stdio::piped())
            .stdout(Stdio::piped())
            .kill_on_drop(true)
            .spawn()
            .expect("failed to spawn emulator");

        // Wait (with exponential backoff and a bounded number of retries)
        // until the emulator has created its listening socket.
        Retry::spawn(
            ExponentialBackoff::from_millis(20)
                .max_delay(Duration::from_secs(1))
                .take(3),
            || async {
                if socket_path.exists() {
                    Ok(())
                } else {
                    Err(())
                }
            },
        )
        .await
        .expect("failed to wait for socket");

        // populate the emulator: create the table and the column family,
        // using the `cbt` CLI pointed at the emulator socket.
        for cmd in &[
            vec!["createtable", &params.table_name],
            vec!["createfamily", &params.table_name, &params.family_name],
        ] {
            Command::new("cbt")
                .args({
                    let mut args = vec![
                        "-instance",
                        &params.instance_name,
                        "-project",
                        &params.project_id,
                    ];
                    args.extend_from_slice(cmd);
                    args
                })
                .env(
                    "BIGTABLE_EMULATOR_HOST",
                    format!("unix://{}", socket_path.to_string_lossy()),
                )
                .output()
                .await
                .expect("failed to run cbt setup command");
        }

        let connection = bigtable_rs::bigtable::BigTableConnection::new_with_emulator(
            &format!("unix://{}", socket_path.to_string_lossy()),
            &params.project_id,
            &params.instance_name,
            params.is_read_only,
            params.timeout,
        )?;

        Ok(Self {
            instance_name,
            client: connection.client(),
            params,
            emulator: (tmpdir, emulator_process).into(),
        })
    }
}
152
153/// Derives the row/column key for a given blake3 digest.
154/// We use hexlower encoding, also because it can't be misinterpreted as RE2.
155fn derive_directory_key(digest: &B3Digest) -> String {
156    HEXLOWER.encode(digest.as_slice())
157}
158
#[async_trait]
impl DirectoryService for BigtableDirectoryService {
    /// Fetches a single [Directory] by its digest.
    ///
    /// Returns `Ok(None)` when the row doesn't exist. Data returned by
    /// Bigtable is double-checked (row key, cell qualifier, and the blake3
    /// digest of the cell value) before parsing and validation, so a
    /// corrupted or foreign row is never returned as a valid Directory.
    #[instrument(skip(self, digest), err, fields(directory.digest = %digest, instance_name=%self.instance_name))]
    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, super::Error> {
        let mut client = self.client.clone();
        let directory_key = derive_directory_key(digest);

        let request = bigtable_v2::ReadRowsRequest {
            app_profile_id: self.params.app_profile_id.to_string(),
            table_name: client.get_full_table_name(&self.params.table_name),
            // We expect at most a single row for this key.
            rows_limit: 1,
            rows: Some(bigtable_v2::RowSet {
                row_keys: vec![directory_key.clone().into()],
                row_ranges: vec![],
            }),
            // Filter selected family name, and column qualifier matching our digest.
            // This is to ensure we don't fail once we start bucketing.
            filter: Some(bigtable_v2::RowFilter {
                filter: Some(bigtable_v2::row_filter::Filter::Chain(
                    bigtable_v2::row_filter::Chain {
                        filters: vec![
                            bigtable_v2::RowFilter {
                                filter: Some(
                                    bigtable_v2::row_filter::Filter::FamilyNameRegexFilter(
                                        self.params.family_name.to_string(),
                                    ),
                                ),
                            },
                            bigtable_v2::RowFilter {
                                filter: Some(
                                    bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
                                        directory_key.clone().into(),
                                    ),
                                ),
                            },
                        ],
                    },
                )),
            }),
            ..Default::default()
        };

        let mut response = client
            .read_rows(request)
            .await
            .map_err(|e| Error::BigTable {
                msg: "reading rows",
                source: e,
            })?;

        if response.len() != 1 {
            if response.len() > 1 {
                // This shouldn't happen, we limit number of rows to 1
                Err(Error::UnexpectedDataReturned("got more than one row"))?
            }
            // else, this is simply a "not found".
            return Ok(None);
        }

        let (row_key, mut row_cells) = response.pop().unwrap();
        if row_key != directory_key.as_bytes() {
            // This shouldn't happen, we requested this row key.
            Err(Error::UnexpectedDataReturned("got wrong row key"))?
        }

        let row_cell = row_cells
            .pop()
            .ok_or_else(|| Error::UnexpectedDataReturned("found no cells"))?;

        // Ensure there's only one cell (so no more left after the pop())
        // This shouldn't happen, We filter out other cells in our query.
        if !row_cells.is_empty() {
            Err(Error::UnexpectedDataReturned("got more than one cell"))?;
        }

        // We also require the qualifier to be correct in the filter above,
        // so this shouldn't happen.
        if directory_key.as_bytes() != row_cell.qualifier {
            Err(Error::UnexpectedDataReturned("unexpected cell qualifier"))?
        }

        // For the data in that cell, ensure the digest matches what's requested, before parsing.
        let got_digest = B3Digest::from(blake3::hash(&row_cell.value).as_bytes());
        if got_digest != *digest {
            Err(Error::DirectoryUnexpectedDigest)?
        }

        // Try to parse the value into a Directory message.
        let directory_proto =
            proto::Directory::decode(row_cell.value.as_slice()).map_err(Error::ProtobufDecode)?;
        let directory = Directory::try_from(directory_proto).map_err(Error::DirectoryValidation)?;

        Ok(Some(directory))
    }

    /// Inserts a [Directory], keyed by its digest.
    ///
    /// Rejects directories whose serialized form exceeds [CELL_SIZE_LIMIT].
    /// Uses a CheckAndMutateRow request so an existing cell is left
    /// untouched: when the predicate (column already present) matches,
    /// nothing is written.
    #[instrument(skip(self, directory), err, fields(directory.digest = %directory.digest(), instance_name=%self.instance_name))]
    async fn put(&self, directory: Directory) -> Result<B3Digest, super::Error> {
        let directory_digest = directory.digest();
        let mut client = self.client.clone();
        let directory_key = derive_directory_key(&directory_digest);

        let data = proto::Directory::from(directory).encode_to_vec();
        if data.len() as u64 > CELL_SIZE_LIMIT {
            Err(Error::DirectoryTooBig)?;
        }

        let resp = client
            .check_and_mutate_row(bigtable_v2::CheckAndMutateRowRequest {
                table_name: client.get_full_table_name(&self.params.table_name),
                app_profile_id: self.params.app_profile_id.to_string(),
                authorized_view_name: "".to_string(),
                row_key: directory_key.clone().into(),
                predicate_filter: Some(bigtable_v2::RowFilter {
                    filter: Some(bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
                        directory_key.clone().into(),
                    )),
                }),
                // If the column was already found, do nothing.
                true_mutations: vec![],
                // Else, do the insert.
                false_mutations: vec![
                    // https://cloud.google.com/bigtable/docs/writes
                    bigtable_v2::Mutation {
                        mutation: Some(bigtable_v2::mutation::Mutation::SetCell(
                            bigtable_v2::mutation::SetCell {
                                family_name: self.params.family_name.to_string(),
                                column_qualifier: directory_key.clone().into(),
                                timestamp_micros: -1, // use server time to fill timestamp
                                value: data,
                            },
                        )),
                    },
                ],
            })
            .await
            .map_err(|e| Error::BigTable {
                msg: "mutating rows",
                source: e,
            })?;

        if resp.predicate_matched {
            trace!("already existed")
        }

        Ok(directory_digest)
    }

    /// Returns a stream of all Directories reachable from
    /// `root_directory_digest`, resolving each one via [Self::get] through
    /// the `traversal::root_to_leaves` helper.
    #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name=%self.instance_name))]
    fn get_recursive(
        &self,
        root_directory_digest: &B3Digest,
    ) -> BoxStream<'static, Result<Directory, super::Error>> {
        // Clone self so the returned stream doesn't borrow from us ('static).
        let svc = self.clone();
        super::traversal::root_to_leaves(*root_directory_digest, move |digest| {
            let svc = svc.clone();
            async move { svc.get(&digest).await }
        })
        .map_err(|err| Box::new(Error::DirectoryTraversal(err)))
        .err_into()
        .boxed()
    }

    /// Returns a handle for uploading multiple directories.
    /// Currently a thin wrapper dispatching to [Self::put] per directory via
    /// [SimplePutter]; no batching is done yet.
    #[instrument(skip_all, fields(instance_name=%self.instance_name))]
    fn put_multiple_start(&self) -> Box<dyn DirectoryPutter + '_> {
        Box::new(SimplePutter::new(self))
    }
}
326
327#[derive(thiserror::Error, Debug)]
328pub enum Error {
329    #[error("wrong arguments: {0}")]
330    WrongConfig(&'static str),
331    #[error("serde-qs error: {0}")]
332    SerdeQS(#[from] serde_qs::Error),
333
334    #[error("failed to decode protobuf: {0}")]
335    ProtobufDecode(#[from] prost::DecodeError),
336    #[error("failed to validate directory: {0}")]
337    DirectoryValidation(#[from] crate::DirectoryError),
338    #[error("Directory has unexpected digest")]
339    DirectoryUnexpectedDigest,
340    #[error("Directory exceeds cell limit on Bigtable")]
341    DirectoryTooBig,
342
343    #[error("failure during directory traversal")]
344    DirectoryTraversal(#[source] traversal::Error),
345
346    /// These should never happen, we simply double-check some of the data
347    /// returned by bigtable to actually match what we requested.
348    #[error("bigtable returned unexpected data: {0}")]
349    UnexpectedDataReturned(&'static str),
350    #[error("bigtable error occured while {msg}: {source}")]
351    BigTable {
352        msg: &'static str,
353        #[source]
354        source: bigtable::Error,
355    },
356}
357
358/// Represents configuration of [BigtableDirectoryService].
359/// This currently conflates both connect parameters and data model/client
360/// behaviour parameters.
#[serde_as]
#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct BigtableParameters {
    /// GCP project id containing the Bigtable instance.
    project_id: String,
    /// Name of the Bigtable instance to connect to.
    instance_name: String,
    /// Whether to open the connection read-only. Defaults to false.
    #[serde(default)]
    is_read_only: bool,
    /// Number of gRPC channels to open. Defaults to [default_channel_size].
    #[serde(default = "default_channel_size")]
    channel_size: usize,

    /// Request timeout, expressed in whole seconds in the serialized form.
    /// Defaults to [default_timeout].
    #[serde_as(as = "Option<DurationSeconds<String>>")]
    #[serde(default = "default_timeout")]
    timeout: Option<std::time::Duration>,
    /// Table holding the directory rows.
    table_name: String,
    /// Column family inside `table_name` used for the directory cells.
    family_name: String,

    /// Bigtable app profile id. Defaults to [default_app_profile_id].
    #[serde(default = "default_app_profile_id")]
    app_profile_id: String,
}
381
/// Default Bigtable app profile id used when none is configured.
fn default_app_profile_id() -> String {
    String::from("default")
}
385
/// Default number of gRPC channels opened towards Bigtable.
fn default_channel_size() -> usize {
    4
}
389
/// Default request timeout (4 seconds) applied when none is configured.
fn default_timeout() -> Option<std::time::Duration> {
    use std::time::Duration;
    Some(Duration::from_secs(4))
}
393
#[async_trait]
impl ServiceBuilder for BigtableParameters {
    type Output = dyn DirectoryService;

    /// Builds a [BigtableDirectoryService] from these parameters.
    /// `instance_name` is the composition instance name, passed through for
    /// tracing; the composition context is unused.
    async fn build<'a>(
        &'a self,
        instance_name: &str,
        _context: &CompositionContext,
    ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync>> {
        Ok(Arc::new(
            BigtableDirectoryService::connect(instance_name.to_string(), self.clone()).await?,
        ))
    }
}
407
408impl TryFrom<url::Url> for BigtableParameters {
409    type Error = Box<dyn std::error::Error + Send + Sync>;
410    fn try_from(mut url: url::Url) -> Result<Self, Self::Error> {
411        // parse the instance name from the hostname.
412        let instance_name = url
413            .host_str()
414            .ok_or_else(|| Error::WrongConfig("instance name missing"))?
415            .to_owned();
416
417        // … but add it to the query string now, so we just need to parse that.
418        url.query_pairs_mut()
419            .append_pair("instance_name", &instance_name);
420
421        let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default())?;
422
423        Ok(params)
424    }
425}