Skip to main content

snix_store/pathinfoservice/
bigtable.rs

1use super::{PathInfo, PathInfoService};
2use crate::{pathinfoservice, proto};
3use async_stream::try_stream;
4use bigtable_rs::bigtable;
5use data_encoding::HEXLOWER;
6use futures::{StreamExt, TryStreamExt, stream::BoxStream};
7use googleapis_tonic_google_bigtable_v2::google::bigtable::v2 as bigtable_v2;
8use nix_compat::nixbase32;
9use prost::Message;
10use serde::{Deserialize, Serialize};
11use serde_with::{DurationSeconds, serde_as};
12use snix_castore::composition::{CompositionContext, ServiceBuilder};
13use std::sync::Arc;
14use tonic::async_trait;
15use tracing::{Span, instrument, trace};
16
/// Upper bound, in bytes, for the value we are willing to write into a single
/// Bigtable cell (10 MiB).
/// <https://cloud.google.com/bigtable/docs/schema-design#cells>
const CELL_SIZE_LIMIT: u64 = 10 * (1 << 20);
20
21/// Provides a [PathInfoService] implementation using
22/// [Bigtable](https://cloud.google.com/bigtable/docs/)
23/// as an underlying K/V store.
24///
25/// # Data format
26/// We use Bigtable as a plain K/V store.
27/// The row key is the digest of the store path, in hexlower.
28/// Inside the row, we currently have a single column/cell, again using the
29/// hexlower store path digest.
30/// Its value is the PathInfo message, serialized in canonical protobuf.
31/// We currently only populate this column.
32///
33/// Listing is ranging over all rows, and calculate_nar is returning a
34/// "unimplemented" error.
35#[derive(Clone)]
36pub struct BigtablePathInfoService {
37    instance_name: String,
38    client: bigtable::BigTable,
39    params: BigtableParameters,
40
41    #[cfg(test)]
42    #[allow(dead_code)]
43    /// Holds the temporary directory containing the unix socket, and the
44    /// spawned emulator process.
45    emulator: std::sync::Arc<(tempfile::TempDir, async_process::Child)>,
46}
47
48impl BigtablePathInfoService {
49    #[cfg(not(test))]
50    pub async fn connect(
51        instance_name: String,
52        params: BigtableParameters,
53    ) -> Result<Self, bigtable::Error> {
54        let connection = bigtable::BigTableConnection::new(
55            &params.project_id,
56            &params.instance_name,
57            params.is_read_only,
58            params.channel_size,
59            params.timeout,
60        )
61        .await?;
62
63        Ok(Self {
64            instance_name,
65            client: connection.client(),
66            params,
67        })
68    }
69
70    #[cfg(test)]
71    pub async fn connect(
72        instance_name: String,
73        params: BigtableParameters,
74    ) -> Result<Self, bigtable::Error> {
75        use std::time::Duration;
76
77        use async_process::{Command, Stdio};
78        use tempfile::TempDir;
79        use tokio_retry::{Retry, strategy::ExponentialBackoff};
80
81        let tmpdir = TempDir::new().unwrap();
82
83        let socket_path = tmpdir.path().join("cbtemulator.sock");
84
85        let emulator_process = Command::new("cbtemulator")
86            .arg("-address")
87            .arg(socket_path.clone())
88            .stderr(Stdio::piped())
89            .stdout(Stdio::piped())
90            .kill_on_drop(true)
91            .spawn()
92            .expect("failed to spawn emulator");
93
94        Retry::spawn(
95            ExponentialBackoff::from_millis(20)
96                .max_delay(Duration::from_secs(1))
97                .take(3),
98            || async {
99                if socket_path.exists() {
100                    Ok(())
101                } else {
102                    Err(())
103                }
104            },
105        )
106        .await
107        .expect("failed to wait for socket");
108
109        // populate the emulator
110        for cmd in &[
111            vec!["createtable", &params.table_name],
112            vec!["createfamily", &params.table_name, &params.family_name],
113        ] {
114            Command::new("cbt")
115                .args({
116                    let mut args = vec![
117                        "-instance",
118                        &params.instance_name,
119                        "-project",
120                        &params.project_id,
121                    ];
122                    args.extend_from_slice(cmd);
123                    args
124                })
125                .env(
126                    "BIGTABLE_EMULATOR_HOST",
127                    format!("unix://{}", socket_path.to_string_lossy()),
128                )
129                .output()
130                .await
131                .expect("failed to run cbt setup command");
132        }
133
134        let connection = bigtable_rs::bigtable::BigTableConnection::new_with_emulator(
135            &format!("unix://{}", socket_path.to_string_lossy()),
136            &params.project_id,
137            &params.instance_name,
138            false,
139            1,
140            None,
141        )?;
142
143        Ok(Self {
144            instance_name: instance_name.to_string(),
145            client: connection.client(),
146            params,
147            emulator: (tmpdir, emulator_process).into(),
148        })
149    }
150}
151
152/// Derives the row/column key for a given output path.
153/// We use hexlower encoding, also because it can't be misinterpreted as RE2.
154fn derive_pathinfo_key(digest: &[u8; 20]) -> String {
155    HEXLOWER.encode(digest)
156}
157
158#[async_trait]
159impl PathInfoService for BigtablePathInfoService {
160    #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest), instance_name = %self.instance_name))]
161    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, pathinfoservice::Error> {
162        let mut client = self.client.clone();
163        let path_info_key = derive_pathinfo_key(&digest);
164
165        let request = bigtable_v2::ReadRowsRequest {
166            app_profile_id: self.params.app_profile_id.to_string(),
167            table_name: client.get_full_table_name(&self.params.table_name),
168            rows_limit: 1,
169            rows: Some(bigtable_v2::RowSet {
170                row_keys: vec![path_info_key.clone().into()],
171                row_ranges: vec![],
172            }),
173            // Filter selected family name, and column qualifier matching the digest.
174            // The latter is to ensure we don't fail once we start adding more metadata.
175            filter: Some(bigtable_v2::RowFilter {
176                filter: Some(bigtable_v2::row_filter::Filter::Chain(
177                    bigtable_v2::row_filter::Chain {
178                        filters: vec![
179                            bigtable_v2::RowFilter {
180                                filter: Some(
181                                    bigtable_v2::row_filter::Filter::FamilyNameRegexFilter(
182                                        self.params.family_name.to_string(),
183                                    ),
184                                ),
185                            },
186                            bigtable_v2::RowFilter {
187                                filter: Some(
188                                    bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
189                                        path_info_key.clone().into(),
190                                    ),
191                                ),
192                            },
193                        ],
194                    },
195                )),
196            }),
197            ..Default::default()
198        };
199
200        let mut response = client
201            .read_rows(request)
202            .await
203            .map_err(|e| Error::BigTable {
204                msg: "reading rows",
205                source: e,
206            })?;
207
208        if response.len() != 1 {
209            if response.len() > 1 {
210                // This shouldn't happen, we limit number of rows to 1
211                Err(Error::UnexpectedDataReturned("got more than one row"))?
212            }
213            // else, this is simply a "not found".
214            return Ok(None);
215        }
216
217        let (row_key, mut cells) = response.pop().unwrap();
218        if row_key != path_info_key.as_bytes() {
219            // This shouldn't happen, we requested this row key.
220            Err(Error::UnexpectedDataReturned("got wrong row key"))?
221        }
222
223        let cell = cells
224            .pop()
225            .ok_or_else(|| Error::UnexpectedDataReturned("found no cells"))?;
226
227        // Ensure there's only one cell (so no more left after the pop())
228        // This shouldn't happen, We filter out other cells in our query.
229        if !cells.is_empty() {
230            Err(Error::UnexpectedDataReturned("got more than one cell"))?
231        }
232
233        // We also require the qualifier to be correct in the filter above,
234        // so this shouldn't happen.
235        if path_info_key.as_bytes() != cell.qualifier {
236            Err(Error::UnexpectedDataReturned("unexpected cell qualifier"))?
237        }
238
239        // Try to parse the value into a PathInfo message
240        let path_info_proto =
241            proto::PathInfo::decode(cell.value.as_slice()).map_err(Error::ProtobufDecode)?;
242
243        let path_info = PathInfo::try_from(path_info_proto).map_err(Error::PathInfoValidation)?;
244
245        if path_info.store_path.digest() != &digest {
246            Err(Error::PathInfoUnexpectedDigest)?;
247        }
248
249        Ok(Some(path_info))
250    }
251
252    #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node, instance_name = %self.instance_name))]
253    async fn put(&self, path_info: PathInfo) -> Result<PathInfo, pathinfoservice::Error> {
254        let mut client = self.client.clone();
255        let path_info_key = derive_pathinfo_key(path_info.store_path.digest());
256
257        let data = proto::PathInfo::from(path_info.clone()).encode_to_vec();
258        if data.len() as u64 > CELL_SIZE_LIMIT {
259            Err(Error::PathInfoTooBig)?;
260        }
261
262        let resp = client
263            .check_and_mutate_row(bigtable_v2::CheckAndMutateRowRequest {
264                table_name: client.get_full_table_name(&self.params.table_name),
265                app_profile_id: self.params.app_profile_id.to_string(),
266                authorized_view_name: "".to_string(),
267                row_key: path_info_key.clone().into(),
268                predicate_filter: Some(bigtable_v2::RowFilter {
269                    filter: Some(bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
270                        path_info_key.clone().into(),
271                    )),
272                }),
273                // If the column was already found, do nothing.
274                true_mutations: vec![],
275                // Else, do the insert.
276                false_mutations: vec![
277                    // https://cloud.google.com/bigtable/docs/writes
278                    bigtable_v2::Mutation {
279                        mutation: Some(bigtable_v2::mutation::Mutation::SetCell(
280                            bigtable_v2::mutation::SetCell {
281                                family_name: self.params.family_name.to_string(),
282                                column_qualifier: path_info_key.clone().into(),
283                                timestamp_micros: -1, // use server time to fill timestamp
284                                value: data,
285                            },
286                        )),
287                    },
288                ],
289            })
290            .await
291            .map_err(|e| Error::BigTable {
292                msg: "mutating rows",
293                source: e,
294            })?;
295
296        if resp.predicate_matched {
297            trace!("already existed")
298        }
299
300        Ok(path_info)
301    }
302
303    fn list(&self) -> BoxStream<'static, Result<PathInfo, pathinfoservice::Error>> {
304        let mut client = self.client.clone();
305
306        let request = bigtable_v2::ReadRowsRequest {
307            app_profile_id: self.params.app_profile_id.to_string(),
308            table_name: client.get_full_table_name(&self.params.table_name),
309            filter: Some(bigtable_v2::RowFilter {
310                filter: Some(bigtable_v2::row_filter::Filter::FamilyNameRegexFilter(
311                    self.params.family_name.to_string(),
312                )),
313            }),
314            ..Default::default()
315        };
316
317        try_stream! {
318            let mut rows = client
319                .stream_rows(request)
320                .await
321                .map_err(|e| Error::BigTable {
322                    msg: "send stream rows request",
323                    source: e,
324                })?;
325
326            while let Some((row_key, mut cells)) =
327                rows.try_next().await.map_err(|e| Error::BigTable {
328                    msg: "stream rows",
329                    source: e,
330                })?
331            {
332                let span = Span::current();
333                span.record("row.key", bstr::BStr::new(&row_key).to_string());
334
335                let cell = cells
336                    .pop()
337                    .ok_or_else(|| Error::UnexpectedDataReturned("found no cells"))?;
338
339                // Ensure there's only one cell (so no more left after the pop())
340                // This shouldn't happen, We filter out other cells in our query.
341                if !cells.is_empty() {
342                    Err(Error::UnexpectedDataReturned("got more than one cell"))?
343                }
344
345                // The cell must have the same qualifier as the row key
346                if row_key != cell.qualifier {
347                    Err(Error::UnexpectedDataReturned("unexpected cell qualifier"))?
348                }
349
350                // Try to parse the value into a PathInfo message.
351                let path_info_proto =
352                    proto::PathInfo::decode(cell.value.as_slice()).map_err(Error::ProtobufDecode)?;
353
354                let path_info =
355                    PathInfo::try_from(path_info_proto).map_err(Error::PathInfoValidation)?;
356
357                let exp_path_info_key = derive_pathinfo_key(path_info.store_path.digest());
358
359                if exp_path_info_key.as_bytes() != row_key.as_slice() {
360                    Err(Error::PathInfoUnexpectedDigest)?;
361                }
362
363                yield path_info
364            }
365        }
366        .boxed()
367    }
368}
369
370#[derive(thiserror::Error, Debug)]
371pub enum Error {
372    #[error("wrong arguments: {0}")]
373    WrongConfig(&'static str),
374    #[error("serde-qs error: {0}")]
375    SerdeQS(#[from] serde_qs::Error),
376
377    #[error("failed to decode protobuf: {0}")]
378    ProtobufDecode(#[from] prost::DecodeError),
379    #[error("failed to validate PathInfo: {0}")]
380    PathInfoValidation(#[from] crate::proto::ValidatePathInfoError),
381    #[error("PathInfo has unexpected digest")]
382    PathInfoUnexpectedDigest,
383    #[error("PathInfo exceeds cell limit on Bigtable")]
384    PathInfoTooBig,
385
386    /// These should never happen, we simply double-check some of the data
387    /// returned by bigtable to actually match what we requested.
388    #[error("bigtable returned unexpected data: {0}")]
389    UnexpectedDataReturned(&'static str),
390    #[error("bigtable error occurred while {msg}: {source}")]
391    BigTable {
392        msg: &'static str,
393        #[source]
394        source: bigtable::Error,
395    },
396}
397
398/// Represents configuration of [BigtablePathInfoService].
399/// This currently conflates both connect parameters and data model/client
400/// behaviour parameters.
401#[serde_as]
402#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
403pub struct BigtableParameters {
404    project_id: String,
405    instance_name: String,
406    #[serde(default)]
407    is_read_only: bool,
408    #[serde(default = "default_channel_size")]
409    channel_size: usize,
410
411    #[serde_as(as = "Option<DurationSeconds<String>>")]
412    #[serde(default = "default_timeout")]
413    timeout: Option<std::time::Duration>,
414    table_name: String,
415    family_name: String,
416
417    #[serde(default = "default_app_profile_id")]
418    app_profile_id: String,
419}
420
/// Serde default for `BigtableParameters::app_profile_id`.
fn default_app_profile_id() -> String {
    String::from("default")
}
424
/// Serde default for `BigtableParameters::channel_size`.
fn default_channel_size() -> usize {
    const DEFAULT_CHANNEL_SIZE: usize = 4;
    DEFAULT_CHANNEL_SIZE
}
428
/// Serde default for `BigtableParameters::timeout`: four seconds.
fn default_timeout() -> Option<std::time::Duration> {
    use std::time::Duration;

    const DEFAULT_TIMEOUT: Duration = Duration::from_secs(4);
    Some(DEFAULT_TIMEOUT)
}
432
433impl BigtableParameters {
434    #[cfg(test)]
435    pub fn default_for_tests() -> Self {
436        Self {
437            project_id: "project-1".into(),
438            instance_name: "instance-1".into(),
439            is_read_only: false,
440            channel_size: default_channel_size(),
441            timeout: default_timeout(),
442            table_name: "table-1".into(),
443            family_name: "cf1".into(),
444            app_profile_id: default_app_profile_id(),
445        }
446    }
447}
448
449#[async_trait]
450impl ServiceBuilder for BigtableParameters {
451    type Output = dyn PathInfoService;
452    async fn build<'a>(
453        &'a self,
454        instance_name: &str,
455        _context: &CompositionContext,
456    ) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync>> {
457        Ok(Arc::new(
458            BigtablePathInfoService::connect(instance_name.to_string(), self.clone()).await?,
459        ))
460    }
461}
462
463impl TryFrom<url::Url> for BigtableParameters {
464    type Error = Box<dyn std::error::Error + Send + Sync>;
465    fn try_from(mut url: url::Url) -> Result<Self, Self::Error> {
466        // parse the instance name from the hostname.
467        let instance_name = url
468            .host_str()
469            .ok_or_else(|| Error::WrongConfig("instance name missing"))?
470            .to_owned();
471
472        // … but add it to the query string now, so we just need to parse that.
473        url.query_pairs_mut()
474            .append_pair("instance_name", &instance_name);
475
476        let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default())?;
477
478        Ok(params)
479    }
480}