snix_castore/directoryservice/
bigtable.rs

1use bigtable_rs::{bigtable, google::bigtable::v2 as bigtable_v2};
2use data_encoding::HEXLOWER;
3use futures::stream::BoxStream;
4use futures::{StreamExt, TryStreamExt};
5use prost::Message;
6use serde::{Deserialize, Serialize};
7use serde_with::{DurationSeconds, serde_as};
8use std::sync::Arc;
9use tonic::async_trait;
10use tracing::{instrument, trace, warn};
11
12use super::{Directory, DirectoryPutter, DirectoryService, SimplePutter};
13use crate::composition::{CompositionContext, ServiceBuilder};
14use crate::directoryservice::traversal;
15use crate::{B3Digest, proto};
16
/// There should not be more than 10 MiB in a single cell.
/// <https://cloud.google.com/bigtable/docs/schema-design#cells>
/// Enforced in [DirectoryService::put]: oversized serialized Directory
/// messages are rejected with [Error::DirectoryTooBig].
const CELL_SIZE_LIMIT: u64 = 10 * 1024 * 1024;
20
/// Provides a [DirectoryService] implementation using
/// [Bigtable](https://cloud.google.com/bigtable/docs/)
/// as an underlying K/V store.
///
/// # Data format
/// We use Bigtable as a plain K/V store.
/// The row key is the digest of the directory, in hexlower.
/// Inside the row, we currently have a single column/cell, again using the
/// hexlower directory digest.
/// Its value is the Directory message, serialized in canonical protobuf.
/// We currently only populate this column.
///
/// In the future, we might want to introduce "bucketing", essentially storing
/// all directories inserted via `put_multiple_start` in a batched form.
/// This would prevent looking up intermediate Directories which are not
/// directly at the root — rely on store composition for that.
#[derive(Clone)]
pub struct BigtableDirectoryService {
    /// Composition instance name; only used to tag tracing spans.
    instance_name: String,
    /// Bigtable client handle; cloned per request, since requests need a
    /// mutable handle.
    client: bigtable::BigTable,
    /// Connection and data-model parameters (table, family, app profile, …).
    params: BigtableParameters,

    #[cfg(test)]
    #[allow(dead_code)]
    /// Holds the temporary directory containing the unix socket, and the
    /// spawned emulator process.
    emulator: std::sync::Arc<(tempfile::TempDir, async_process::Child)>,
}
49
impl BigtableDirectoryService {
    /// Connects to the Bigtable instance described by `params`.
    ///
    /// `instance_name` is the composition instance name, only used to tag
    /// tracing spans — the Bigtable instance itself is `params.instance_name`.
    #[cfg(not(test))]
    pub async fn connect(
        instance_name: String,
        params: BigtableParameters,
    ) -> Result<Self, bigtable::Error> {
        let connection = bigtable::BigTableConnection::new(
            &params.project_id,
            &params.instance_name,
            params.is_read_only,
            params.channel_size,
            params.timeout,
        )
        .await?;

        Ok(Self {
            instance_name,
            client: connection.client(),
            params,
        })
    }

    /// Test-only variant of `connect`: spawns a local `cbtemulator` process
    /// listening on a unix socket inside a fresh temporary directory, creates
    /// the configured table and column family via the `cbt` CLI, then
    /// connects to that emulator instead of a real Bigtable instance.
    /// The tempdir and child process are kept alive in `self.emulator`.
    #[cfg(test)]
    pub async fn connect(
        instance_name: String,
        params: BigtableParameters,
    ) -> Result<Self, bigtable::Error> {
        use std::time::Duration;

        use async_process::{Command, Stdio};
        use tempfile::TempDir;
        use tokio_retry::{Retry, strategy::ExponentialBackoff};

        let tmpdir = TempDir::new().unwrap();

        let socket_path = tmpdir.path().join("cbtemulator.sock");

        // kill_on_drop ensures the emulator dies together with the handle
        // stored in self.emulator.
        let emulator_process = Command::new("cbtemulator")
            .arg("-address")
            .arg(socket_path.clone())
            .stderr(Stdio::piped())
            .stdout(Stdio::piped())
            .kill_on_drop(true)
            .spawn()
            .expect("failed to spawn emulator");

        // Wait (exponential backoff, a handful of attempts) until the
        // emulator has created its unix socket.
        Retry::spawn(
            ExponentialBackoff::from_millis(20)
                .max_delay(Duration::from_secs(1))
                .take(3),
            || async {
                if socket_path.exists() {
                    Ok(())
                } else {
                    Err(())
                }
            },
        )
        .await
        .expect("failed to wait for socket");

        // populate the emulator: create the table and its column family.
        for cmd in &[
            vec!["createtable", &params.table_name],
            vec!["createfamily", &params.table_name, &params.family_name],
        ] {
            Command::new("cbt")
                .args({
                    let mut args = vec![
                        "-instance",
                        &params.instance_name,
                        "-project",
                        &params.project_id,
                    ];
                    args.extend_from_slice(cmd);
                    args
                })
                .env(
                    "BIGTABLE_EMULATOR_HOST",
                    format!("unix://{}", socket_path.to_string_lossy()),
                )
                .output()
                .await
                .expect("failed to run cbt setup command");
        }

        // channel_size is fixed to 1 here; the emulator is local anyway.
        let connection = bigtable_rs::bigtable::BigTableConnection::new_with_emulator(
            &format!("unix://{}", socket_path.to_string_lossy()),
            &params.project_id,
            &params.instance_name,
            params.is_read_only,
            1,
            params.timeout,
        )?;

        Ok(Self {
            instance_name,
            client: connection.client(),
            params,
            emulator: (tmpdir, emulator_process).into(),
        })
    }
}
153
154/// Derives the row/column key for a given blake3 digest.
155/// We use hexlower encoding, also because it can't be misinterpreted as RE2.
156fn derive_directory_key(digest: &B3Digest) -> String {
157    HEXLOWER.encode(digest.as_slice())
158}
159
#[async_trait]
impl DirectoryService for BigtableDirectoryService {
    // Looks up a single Directory by digest. Returns Ok(None) when the row
    // is absent; all returned data is validated (row key, cell qualifier,
    // content digest) before the protobuf is decoded.
    #[instrument(skip(self, digest), err, fields(directory.digest = %digest, instance_name=%self.instance_name))]
    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, super::Error> {
        // read_rows needs a mutable handle, so clone the client.
        let mut client = self.client.clone();
        let directory_key = derive_directory_key(digest);

        // Point read of exactly one row, keyed by the hexlower digest.
        let request = bigtable_v2::ReadRowsRequest {
            app_profile_id: self.params.app_profile_id.to_string(),
            table_name: client.get_full_table_name(&self.params.table_name),
            rows_limit: 1,
            rows: Some(bigtable_v2::RowSet {
                row_keys: vec![directory_key.clone().into()],
                row_ranges: vec![],
            }),
            // Filter selected family name, and column qualifier matching our digest.
            // This is to ensure we don't fail once we start bucketing.
            filter: Some(bigtable_v2::RowFilter {
                filter: Some(bigtable_v2::row_filter::Filter::Chain(
                    bigtable_v2::row_filter::Chain {
                        filters: vec![
                            bigtable_v2::RowFilter {
                                filter: Some(
                                    bigtable_v2::row_filter::Filter::FamilyNameRegexFilter(
                                        self.params.family_name.to_string(),
                                    ),
                                ),
                            },
                            bigtable_v2::RowFilter {
                                filter: Some(
                                    bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
                                        directory_key.clone().into(),
                                    ),
                                ),
                            },
                        ],
                    },
                )),
            }),
            ..Default::default()
        };

        let mut response = client
            .read_rows(request)
            .await
            .map_err(|e| Error::BigTable {
                msg: "reading rows",
                source: e,
            })?;

        if response.len() != 1 {
            if response.len() > 1 {
                // This shouldn't happen, we limit number of rows to 1
                Err(Error::UnexpectedDataReturned("got more than one row"))?
            }
            // else, this is simply a "not found".
            return Ok(None);
        }

        // Exactly one row from here on.
        let (row_key, mut row_cells) = response.pop().unwrap();
        if row_key != directory_key.as_bytes() {
            // This shouldn't happen, we requested this row key.
            Err(Error::UnexpectedDataReturned("got wrong row key"))?
        }

        let row_cell = row_cells
            .pop()
            .ok_or_else(|| Error::UnexpectedDataReturned("found no cells"))?;

        // Ensure there's only one cell (so no more left after the pop())
        // This shouldn't happen, We filter out other cells in our query.
        if !row_cells.is_empty() {
            Err(Error::UnexpectedDataReturned("got more than one cell"))?;
        }

        // We also require the qualifier to be correct in the filter above,
        // so this shouldn't happen.
        if directory_key.as_bytes() != row_cell.qualifier {
            Err(Error::UnexpectedDataReturned("unexpected cell qualifier"))?
        }

        // For the data in that cell, ensure the digest matches what's requested, before parsing.
        let got_digest = B3Digest::from(blake3::hash(&row_cell.value).as_bytes());
        if got_digest != *digest {
            Err(Error::DirectoryUnexpectedDigest)?
        }

        // Try to parse the value into a Directory message.
        let directory_proto =
            proto::Directory::decode(row_cell.value.as_slice()).map_err(Error::ProtobufDecode)?;
        let directory = Directory::try_from(directory_proto).map_err(Error::DirectoryValidation)?;

        Ok(Some(directory))
    }

    // Stores a Directory, keyed by its digest. Idempotent: if the cell
    // already exists, nothing is written.
    #[instrument(skip(self, directory), err, fields(directory.digest = %directory.digest(), instance_name=%self.instance_name))]
    async fn put(&self, directory: Directory) -> Result<B3Digest, super::Error> {
        let directory_digest = directory.digest();
        let mut client = self.client.clone();
        let directory_key = derive_directory_key(&directory_digest);

        // Serialize to canonical protobuf and enforce the Bigtable cell size
        // limit before attempting the write.
        let data = proto::Directory::from(directory).encode_to_vec();
        if data.len() as u64 > CELL_SIZE_LIMIT {
            Err(Error::DirectoryTooBig)?;
        }

        // Insert-if-absent via CheckAndMutateRow: the predicate matches when
        // the digest-named column already exists; only then is nothing done.
        let resp = client
            .check_and_mutate_row(bigtable_v2::CheckAndMutateRowRequest {
                table_name: client.get_full_table_name(&self.params.table_name),
                app_profile_id: self.params.app_profile_id.to_string(),
                authorized_view_name: "".to_string(),
                row_key: directory_key.clone().into(),
                predicate_filter: Some(bigtable_v2::RowFilter {
                    filter: Some(bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
                        directory_key.clone().into(),
                    )),
                }),
                // If the column was already found, do nothing.
                true_mutations: vec![],
                // Else, do the insert.
                false_mutations: vec![
                    // https://cloud.google.com/bigtable/docs/writes
                    bigtable_v2::Mutation {
                        mutation: Some(bigtable_v2::mutation::Mutation::SetCell(
                            bigtable_v2::mutation::SetCell {
                                family_name: self.params.family_name.to_string(),
                                column_qualifier: directory_key.clone().into(),
                                timestamp_micros: -1, // use server time to fill timestamp
                                value: data,
                            },
                        )),
                    },
                ],
            })
            .await
            .map_err(|e| Error::BigTable {
                msg: "mutating rows",
                source: e,
            })?;

        if resp.predicate_matched {
            trace!("already existed")
        }

        Ok(directory_digest)
    }

    // Streams directories starting from the root digest, delegating to the
    // shared traversal helper; each directory is fetched via `get`.
    #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name=%self.instance_name))]
    fn get_recursive(
        &self,
        root_directory_digest: &B3Digest,
    ) -> BoxStream<'static, Result<Directory, super::Error>> {
        // Clone self into the closure so the stream can outlive &self ('static).
        let svc = self.clone();
        super::traversal::root_to_leaves(*root_directory_digest, move |digest| {
            let svc = svc.clone();
            async move { svc.get(&digest).await }
        })
        .map_err(|err| Box::new(Error::DirectoryTraversal(err)))
        .err_into()
        .boxed()
    }

    // No batched upload yet (see the bucketing note on the struct); the
    // generic SimplePutter wrapper is used instead.
    #[instrument(skip_all, fields(instance_name=%self.instance_name))]
    fn put_multiple_start(&self) -> Box<dyn DirectoryPutter + '_> {
        Box::new(SimplePutter::new(self))
    }
}
327
/// Errors specific to the Bigtable backend. Converted into [super::Error]
/// at the [DirectoryService] boundary (the `?` conversions in `get`/`put`
/// rely on a From impl defined elsewhere).
#[derive(thiserror::Error, Debug)]
pub enum Error {
    #[error("wrong arguments: {0}")]
    WrongConfig(&'static str),
    #[error("serde-qs error: {0}")]
    SerdeQS(#[from] serde_qs::Error),

    #[error("failed to decode protobuf: {0}")]
    ProtobufDecode(#[from] prost::DecodeError),
    #[error("failed to validate directory: {0}")]
    DirectoryValidation(#[from] crate::DirectoryError),
    /// Cell contents did not hash to the digest that was requested.
    #[error("Directory has unexpected digest")]
    DirectoryUnexpectedDigest,
    /// Serialized Directory exceeds [CELL_SIZE_LIMIT].
    #[error("Directory exceeds cell limit on Bigtable")]
    DirectoryTooBig,

    #[error("failure during directory traversal")]
    DirectoryTraversal(#[source] traversal::Error),

    /// These should never happen, we simply double-check some of the data
    /// returned by bigtable to actually match what we requested.
    #[error("bigtable returned unexpected data: {0}")]
    UnexpectedDataReturned(&'static str),
    #[error("bigtable error occurred while {msg}: {source}")]
    BigTable {
        msg: &'static str,
        #[source]
        source: bigtable::Error,
    },
}
358
/// Represents configuration of [BigtableDirectoryService].
/// This currently conflates both connect parameters and data model/client
/// behaviour parameters.
#[serde_as]
#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct BigtableParameters {
    /// GCP project id hosting the Bigtable instance.
    project_id: String,
    /// Name of the Bigtable instance to connect to.
    instance_name: String,
    /// Open the connection read-only. Defaults to false.
    #[serde(default)]
    is_read_only: bool,
    /// Number of gRPC channels towards Bigtable. Defaults to 4.
    #[serde(default = "default_channel_size")]
    channel_size: usize,

    /// Request timeout, serialized as whole seconds. Defaults to 4s.
    #[serde_as(as = "Option<DurationSeconds<String>>")]
    #[serde(default = "default_timeout")]
    timeout: Option<std::time::Duration>,
    /// Table holding the directory rows.
    table_name: String,
    /// Column family (inside `table_name`) the directory cells live in.
    family_name: String,

    /// Bigtable app profile id to use for requests. Defaults to "default".
    #[serde(default = "default_app_profile_id")]
    app_profile_id: String,
}
382
/// serde default for [BigtableParameters::app_profile_id].
fn default_app_profile_id() -> String {
    String::from("default")
}
386
/// serde default for [BigtableParameters::channel_size]: four gRPC channels.
fn default_channel_size() -> usize {
    4_usize
}
390
/// serde default for [BigtableParameters::timeout]: four seconds.
fn default_timeout() -> Option<std::time::Duration> {
    let four_seconds = std::time::Duration::from_secs(4);
    Some(four_seconds)
}
394
#[async_trait]
impl ServiceBuilder for BigtableParameters {
    type Output = dyn DirectoryService;
    /// Builds a [BigtableDirectoryService] by connecting with these
    /// parameters. `instance_name` is the composition instance name
    /// (used for tracing only); the composition context is unused.
    async fn build<'a>(
        &'a self,
        instance_name: &str,
        _context: &CompositionContext,
    ) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync>> {
        Ok(Arc::new(
            BigtableDirectoryService::connect(instance_name.to_string(), self.clone()).await?,
        ))
    }
}
408
/// Parses [BigtableParameters] from a URL: the host becomes `instance_name`,
/// all remaining fields come from the query string (via serde_qs), e.g.
/// `…://<instance>?project_id=…&table_name=…&family_name=…`.
impl TryFrom<url::Url> for BigtableParameters {
    type Error = Box<dyn std::error::Error + Send + Sync>;
    fn try_from(mut url: url::Url) -> Result<Self, Self::Error> {
        // parse the instance name from the hostname.
        let instance_name = url
            .host_str()
            .ok_or_else(|| Error::WrongConfig("instance name missing"))?
            .to_owned();

        // … but add it to the query string now, so we just need to parse that.
        url.query_pairs_mut()
            .append_pair("instance_name", &instance_name);

        // deny_unknown_fields on the struct rejects stray query parameters.
        let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default())?;

        Ok(params)
    }
}