// snix_castore/directoryservice/bigtable.rs

1use bigtable_rs::bigtable;
2use data_encoding::HEXLOWER;
3use futures::stream::BoxStream;
4use futures::{StreamExt, TryStreamExt};
5use googleapis_tonic_google_bigtable_v2::google::bigtable::v2 as bigtable_v2;
6use prost::Message;
7use serde::{Deserialize, Serialize};
8use serde_with::{DurationSeconds, serde_as};
9use std::sync::Arc;
10use tonic::async_trait;
11use tracing::{instrument, trace, warn};
12
13use super::{Directory, DirectoryPutter, DirectoryService, SimplePutter};
14use crate::composition::{CompositionContext, ServiceBuilder};
15use crate::directoryservice::traversal;
16use crate::{B3Digest, proto};
17
/// Maximum size (in bytes) of a serialized Directory we are willing to store
/// in a single Bigtable cell. Bigtable advises against cells above 10 MiB.
/// <https://cloud.google.com/bigtable/docs/schema-design#cells>
const CELL_SIZE_LIMIT: u64 = 10 << 20;
21
22/// Provides a [DirectoryService] implementation using
23/// [Bigtable](https://cloud.google.com/bigtable/docs/)
24/// as an underlying K/V store.
25///
26/// # Data format
27/// We use Bigtable as a plain K/V store.
28/// The row key is the digest of the directory, in hexlower.
29/// Inside the row, we currently have a single column/cell, again using the
30/// hexlower directory digest.
31/// Its value is the Directory message, serialized in canonical protobuf.
32/// We currently only populate this column.
33///
34/// In the future, we might want to introduce "bucketing", essentially storing
35/// all directories inserted via `put_multiple_start` in a batched form.
36/// This will prevent looking up intermediate Directories, which are not
37/// directly at the root, so rely on store composition.
38#[derive(Clone)]
39pub struct BigtableDirectoryService {
40    instance_name: String,
41    client: bigtable::BigTable,
42    params: BigtableParameters,
43
44    #[cfg(test)]
45    #[allow(dead_code)]
46    /// Holds the temporary directory containing the unix socket, and the
47    /// spawned emulator process.
48    emulator: std::sync::Arc<(tempfile::TempDir, async_process::Child)>,
49}
50
51impl BigtableDirectoryService {
52    #[cfg(not(test))]
53    pub async fn connect(
54        instance_name: String,
55        params: BigtableParameters,
56    ) -> Result<Self, bigtable::Error> {
57        let connection = bigtable::BigTableConnection::new(
58            &params.project_id,
59            &params.instance_name,
60            params.is_read_only,
61            params.channel_size,
62            params.timeout,
63        )
64        .await?;
65
66        Ok(Self {
67            instance_name,
68            client: connection.client(),
69            params,
70        })
71    }
72
73    #[cfg(test)]
74    pub async fn connect(
75        instance_name: String,
76        params: BigtableParameters,
77    ) -> Result<Self, bigtable::Error> {
78        use std::time::Duration;
79
80        use async_process::{Command, Stdio};
81        use tempfile::TempDir;
82        use tokio_retry::{Retry, strategy::ExponentialBackoff};
83
84        let tmpdir = TempDir::new().unwrap();
85
86        let socket_path = tmpdir.path().join("cbtemulator.sock");
87
88        let emulator_process = Command::new("cbtemulator")
89            .arg("-address")
90            .arg(socket_path.clone())
91            .stderr(Stdio::piped())
92            .stdout(Stdio::piped())
93            .kill_on_drop(true)
94            .spawn()
95            .expect("failed to spawn emulator");
96
97        Retry::spawn(
98            ExponentialBackoff::from_millis(20)
99                .max_delay(Duration::from_secs(1))
100                .take(3),
101            || async {
102                if socket_path.exists() {
103                    Ok(())
104                } else {
105                    Err(())
106                }
107            },
108        )
109        .await
110        .expect("failed to wait for socket");
111
112        // populate the emulator
113        for cmd in &[
114            vec!["createtable", &params.table_name],
115            vec!["createfamily", &params.table_name, &params.family_name],
116        ] {
117            Command::new("cbt")
118                .args({
119                    let mut args = vec![
120                        "-instance",
121                        &params.instance_name,
122                        "-project",
123                        &params.project_id,
124                    ];
125                    args.extend_from_slice(cmd);
126                    args
127                })
128                .env(
129                    "BIGTABLE_EMULATOR_HOST",
130                    format!("unix://{}", socket_path.to_string_lossy()),
131                )
132                .output()
133                .await
134                .expect("failed to run cbt setup command");
135        }
136
137        let connection = bigtable_rs::bigtable::BigTableConnection::new_with_emulator(
138            &format!("unix://{}", socket_path.to_string_lossy()),
139            &params.project_id,
140            &params.instance_name,
141            params.is_read_only,
142            1,
143            params.timeout,
144        )?;
145
146        Ok(Self {
147            instance_name,
148            client: connection.client(),
149            params,
150            emulator: (tmpdir, emulator_process).into(),
151        })
152    }
153}
154
155/// Derives the row/column key for a given blake3 digest.
156/// We use hexlower encoding, also because it can't be misinterpreted as RE2.
157fn derive_directory_key(digest: &B3Digest) -> String {
158    HEXLOWER.encode(digest.as_slice())
159}
160
161#[async_trait]
162impl DirectoryService for BigtableDirectoryService {
163    #[instrument(skip(self, digest), err, fields(directory.digest = %digest, instance_name=%self.instance_name))]
164    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, super::Error> {
165        let mut client = self.client.clone();
166        let directory_key = derive_directory_key(digest);
167
168        let request = bigtable_v2::ReadRowsRequest {
169            app_profile_id: self.params.app_profile_id.to_string(),
170            table_name: client.get_full_table_name(&self.params.table_name),
171            rows_limit: 1,
172            rows: Some(bigtable_v2::RowSet {
173                row_keys: vec![directory_key.clone().into()],
174                row_ranges: vec![],
175            }),
176            // Filter selected family name, and column qualifier matching our digest.
177            // This is to ensure we don't fail once we start bucketing.
178            filter: Some(bigtable_v2::RowFilter {
179                filter: Some(bigtable_v2::row_filter::Filter::Chain(
180                    bigtable_v2::row_filter::Chain {
181                        filters: vec![
182                            bigtable_v2::RowFilter {
183                                filter: Some(
184                                    bigtable_v2::row_filter::Filter::FamilyNameRegexFilter(
185                                        self.params.family_name.to_string(),
186                                    ),
187                                ),
188                            },
189                            bigtable_v2::RowFilter {
190                                filter: Some(
191                                    bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
192                                        directory_key.clone().into(),
193                                    ),
194                                ),
195                            },
196                        ],
197                    },
198                )),
199            }),
200            ..Default::default()
201        };
202
203        let mut response = client
204            .read_rows(request)
205            .await
206            .map_err(|e| Error::BigTable {
207                msg: "reading rows",
208                source: e,
209            })?;
210
211        if response.len() != 1 {
212            if response.len() > 1 {
213                // This shouldn't happen, we limit number of rows to 1
214                Err(Error::UnexpectedDataReturned("got more than one row"))?
215            }
216            // else, this is simply a "not found".
217            return Ok(None);
218        }
219
220        let (row_key, mut row_cells) = response.pop().unwrap();
221        if row_key != directory_key.as_bytes() {
222            // This shouldn't happen, we requested this row key.
223            Err(Error::UnexpectedDataReturned("got wrong row key"))?
224        }
225
226        let row_cell = row_cells
227            .pop()
228            .ok_or_else(|| Error::UnexpectedDataReturned("found no cells"))?;
229
230        // Ensure there's only one cell (so no more left after the pop())
231        // This shouldn't happen, We filter out other cells in our query.
232        if !row_cells.is_empty() {
233            Err(Error::UnexpectedDataReturned("got more than one cell"))?;
234        }
235
236        // We also require the qualifier to be correct in the filter above,
237        // so this shouldn't happen.
238        if directory_key.as_bytes() != row_cell.qualifier {
239            Err(Error::UnexpectedDataReturned("unexpected cell qualifier"))?
240        }
241
242        // For the data in that cell, ensure the digest matches what's requested, before parsing.
243        let got_digest = B3Digest::from(blake3::hash(&row_cell.value).as_bytes());
244        if got_digest != *digest {
245            Err(Error::DirectoryUnexpectedDigest)?
246        }
247
248        // Try to parse the value into a Directory message.
249        let directory_proto =
250            proto::Directory::decode(row_cell.value.as_slice()).map_err(Error::ProtobufDecode)?;
251        let directory = Directory::try_from(directory_proto).map_err(Error::DirectoryValidation)?;
252
253        Ok(Some(directory))
254    }
255
256    #[instrument(skip(self, directory), err, fields(directory.digest = %directory.digest(), instance_name=%self.instance_name))]
257    async fn put(&self, directory: Directory) -> Result<B3Digest, super::Error> {
258        let directory_digest = directory.digest();
259        let mut client = self.client.clone();
260        let directory_key = derive_directory_key(&directory_digest);
261
262        let data = proto::Directory::from(directory).encode_to_vec();
263        if data.len() as u64 > CELL_SIZE_LIMIT {
264            Err(Error::DirectoryTooBig)?;
265        }
266
267        let resp = client
268            .check_and_mutate_row(bigtable_v2::CheckAndMutateRowRequest {
269                table_name: client.get_full_table_name(&self.params.table_name),
270                app_profile_id: self.params.app_profile_id.to_string(),
271                authorized_view_name: "".to_string(),
272                row_key: directory_key.clone().into(),
273                predicate_filter: Some(bigtable_v2::RowFilter {
274                    filter: Some(bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
275                        directory_key.clone().into(),
276                    )),
277                }),
278                // If the column was already found, do nothing.
279                true_mutations: vec![],
280                // Else, do the insert.
281                false_mutations: vec![
282                    // https://cloud.google.com/bigtable/docs/writes
283                    bigtable_v2::Mutation {
284                        mutation: Some(bigtable_v2::mutation::Mutation::SetCell(
285                            bigtable_v2::mutation::SetCell {
286                                family_name: self.params.family_name.to_string(),
287                                column_qualifier: directory_key.clone().into(),
288                                timestamp_micros: -1, // use server time to fill timestamp
289                                value: data,
290                            },
291                        )),
292                    },
293                ],
294            })
295            .await
296            .map_err(|e| Error::BigTable {
297                msg: "mutating rows",
298                source: e,
299            })?;
300
301        if resp.predicate_matched {
302            trace!("already existed")
303        }
304
305        Ok(directory_digest)
306    }
307
308    #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name=%self.instance_name))]
309    fn get_recursive(
310        &self,
311        root_directory_digest: &B3Digest,
312    ) -> BoxStream<'static, Result<Directory, super::Error>> {
313        let svc = self.clone();
314        super::traversal::root_to_leaves(*root_directory_digest, move |digest| {
315            let svc = svc.clone();
316            async move { svc.get(&digest).await }
317        })
318        .map_err(Error::DirectoryTraversal)
319        .err_into()
320        .boxed()
321    }
322
323    #[instrument(skip_all, fields(instance_name=%self.instance_name))]
324    fn put_multiple_start(&self) -> Box<dyn DirectoryPutter + '_> {
325        Box::new(SimplePutter::new(self))
326    }
327}
328
329#[derive(thiserror::Error, Debug)]
330pub enum Error {
331    #[error("wrong arguments: {0}")]
332    WrongConfig(&'static str),
333    #[error("serde-qs error: {0}")]
334    SerdeQS(#[from] serde_qs::Error),
335
336    #[error("failed to decode protobuf: {0}")]
337    ProtobufDecode(#[from] prost::DecodeError),
338    #[error("failed to validate directory: {0}")]
339    DirectoryValidation(#[from] crate::DirectoryError),
340    #[error("Directory has unexpected digest")]
341    DirectoryUnexpectedDigest,
342    #[error("Directory exceeds cell limit on Bigtable")]
343    DirectoryTooBig,
344
345    #[error("failure during directory traversal")]
346    DirectoryTraversal(#[source] traversal::Error),
347
348    /// These should never happen, we simply double-check some of the data
349    /// returned by bigtable to actually match what we requested.
350    #[error("bigtable returned unexpected data: {0}")]
351    UnexpectedDataReturned(&'static str),
352    #[error("bigtable error occurred while {msg}: {source}")]
353    BigTable {
354        msg: &'static str,
355        #[source]
356        source: bigtable::Error,
357    },
358}
359
360impl From<Error> for super::Error {
361    fn from(value: Error) -> Self {
362        Self(Box::new(value))
363    }
364}
365
366/// Represents configuration of [BigtableDirectoryService].
367/// This currently conflates both connect parameters and data model/client
368/// behaviour parameters.
369#[serde_as]
370#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
371#[serde(deny_unknown_fields)]
372pub struct BigtableParameters {
373    project_id: String,
374    instance_name: String,
375    #[serde(default)]
376    is_read_only: bool,
377    #[serde(default = "default_channel_size")]
378    channel_size: usize,
379
380    #[serde_as(as = "Option<DurationSeconds<String>>")]
381    #[serde(default = "default_timeout")]
382    timeout: Option<std::time::Duration>,
383    table_name: String,
384    family_name: String,
385
386    #[serde(default = "default_app_profile_id")]
387    app_profile_id: String,
388}
389
/// Serde default for `BigtableParameters::app_profile_id`.
fn default_app_profile_id() -> String {
    String::from("default")
}
393
/// Serde default for `BigtableParameters::channel_size`.
fn default_channel_size() -> usize {
    const DEFAULT_CHANNEL_SIZE: usize = 4;
    DEFAULT_CHANNEL_SIZE
}
397
/// Serde default for `BigtableParameters::timeout`: four seconds.
fn default_timeout() -> Option<std::time::Duration> {
    let four_seconds = std::time::Duration::from_millis(4_000);
    Some(four_seconds)
}
401
402#[async_trait]
403impl ServiceBuilder for BigtableParameters {
404    type Output = dyn DirectoryService;
405    async fn build<'a>(
406        &'a self,
407        instance_name: &str,
408        _context: &CompositionContext,
409    ) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync>> {
410        Ok(Arc::new(
411            BigtableDirectoryService::connect(instance_name.to_string(), self.clone()).await?,
412        ))
413    }
414}
415
416impl TryFrom<url::Url> for BigtableParameters {
417    type Error = Box<dyn std::error::Error + Send + Sync>;
418    fn try_from(mut url: url::Url) -> Result<Self, Self::Error> {
419        // parse the instance name from the hostname.
420        let instance_name = url
421            .host_str()
422            .ok_or_else(|| Error::WrongConfig("instance name missing"))?
423            .to_owned();
424
425        // … but add it to the query string now, so we just need to parse that.
426        url.query_pairs_mut()
427            .append_pair("instance_name", &instance_name);
428
429        let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default())?;
430
431        Ok(params)
432    }
433}