snix_store/pathinfoservice/
bigtable.rs

1use super::{PathInfo, PathInfoService};
2use crate::{pathinfoservice, proto};
3use async_stream::try_stream;
4use bigtable_rs::{bigtable, google::bigtable::v2 as bigtable_v2};
5use data_encoding::HEXLOWER;
6use futures::{StreamExt, TryStreamExt, stream::BoxStream};
7use nix_compat::nixbase32;
8use prost::Message;
9use serde::{Deserialize, Serialize};
10use serde_with::{DurationSeconds, serde_as};
11use snix_castore::composition::{CompositionContext, ServiceBuilder};
12use std::sync::Arc;
13use tonic::async_trait;
14use tracing::{Span, instrument, trace};
15
/// Maximum number of bytes we are willing to store in a single cell (10 MiB).
/// There should not be more than that in one cell, see
/// <https://cloud.google.com/bigtable/docs/schema-design#cells>
const CELL_SIZE_LIMIT: u64 = 10 << 20;
19
20/// Provides a [PathInfoService] implementation using
21/// [Bigtable](https://cloud.google.com/bigtable/docs/)
22/// as an underlying K/V store.
23///
24/// # Data format
25/// We use Bigtable as a plain K/V store.
26/// The row key is the digest of the store path, in hexlower.
27/// Inside the row, we currently have a single column/cell, again using the
28/// hexlower store path digest.
29/// Its value is the PathInfo message, serialized in canonical protobuf.
30/// We currently only populate this column.
31///
32/// Listing is ranging over all rows, and calculate_nar is returning a
33/// "unimplemented" error.
34#[derive(Clone)]
35pub struct BigtablePathInfoService {
36    instance_name: String,
37    client: bigtable::BigTable,
38    params: BigtableParameters,
39
40    #[cfg(test)]
41    #[allow(dead_code)]
42    /// Holds the temporary directory containing the unix socket, and the
43    /// spawned emulator process.
44    emulator: std::sync::Arc<(tempfile::TempDir, async_process::Child)>,
45}
46
47impl BigtablePathInfoService {
48    #[cfg(not(test))]
49    pub async fn connect(
50        instance_name: String,
51        params: BigtableParameters,
52    ) -> Result<Self, bigtable::Error> {
53        let connection = bigtable::BigTableConnection::new(
54            &params.project_id,
55            &params.instance_name,
56            params.is_read_only,
57            params.channel_size,
58            params.timeout,
59        )
60        .await?;
61
62        Ok(Self {
63            instance_name,
64            client: connection.client(),
65            params,
66        })
67    }
68
69    #[cfg(test)]
70    pub async fn connect(
71        instance_name: String,
72        params: BigtableParameters,
73    ) -> Result<Self, bigtable::Error> {
74        use std::time::Duration;
75
76        use async_process::{Command, Stdio};
77        use tempfile::TempDir;
78        use tokio_retry::{Retry, strategy::ExponentialBackoff};
79
80        let tmpdir = TempDir::new().unwrap();
81
82        let socket_path = tmpdir.path().join("cbtemulator.sock");
83
84        let emulator_process = Command::new("cbtemulator")
85            .arg("-address")
86            .arg(socket_path.clone())
87            .stderr(Stdio::piped())
88            .stdout(Stdio::piped())
89            .kill_on_drop(true)
90            .spawn()
91            .expect("failed to spawn emulator");
92
93        Retry::spawn(
94            ExponentialBackoff::from_millis(20)
95                .max_delay(Duration::from_secs(1))
96                .take(3),
97            || async {
98                if socket_path.exists() {
99                    Ok(())
100                } else {
101                    Err(())
102                }
103            },
104        )
105        .await
106        .expect("failed to wait for socket");
107
108        // populate the emulator
109        for cmd in &[
110            vec!["createtable", &params.table_name],
111            vec!["createfamily", &params.table_name, &params.family_name],
112        ] {
113            Command::new("cbt")
114                .args({
115                    let mut args = vec![
116                        "-instance",
117                        &params.instance_name,
118                        "-project",
119                        &params.project_id,
120                    ];
121                    args.extend_from_slice(cmd);
122                    args
123                })
124                .env(
125                    "BIGTABLE_EMULATOR_HOST",
126                    format!("unix://{}", socket_path.to_string_lossy()),
127                )
128                .output()
129                .await
130                .expect("failed to run cbt setup command");
131        }
132
133        let connection = bigtable_rs::bigtable::BigTableConnection::new_with_emulator(
134            &format!("unix://{}", socket_path.to_string_lossy()),
135            &params.project_id,
136            &params.instance_name,
137            false,
138            None,
139        )?;
140
141        Ok(Self {
142            instance_name: instance_name.to_string(),
143            client: connection.client(),
144            params,
145            emulator: (tmpdir, emulator_process).into(),
146        })
147    }
148}
149
150/// Derives the row/column key for a given output path.
151/// We use hexlower encoding, also because it can't be misinterpreted as RE2.
152fn derive_pathinfo_key(digest: &[u8; 20]) -> String {
153    HEXLOWER.encode(digest)
154}
155
156#[async_trait]
157impl PathInfoService for BigtablePathInfoService {
158    #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest), instance_name = %self.instance_name))]
159    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, pathinfoservice::Error> {
160        let mut client = self.client.clone();
161        let path_info_key = derive_pathinfo_key(&digest);
162
163        let request = bigtable_v2::ReadRowsRequest {
164            app_profile_id: self.params.app_profile_id.to_string(),
165            table_name: client.get_full_table_name(&self.params.table_name),
166            rows_limit: 1,
167            rows: Some(bigtable_v2::RowSet {
168                row_keys: vec![path_info_key.clone().into()],
169                row_ranges: vec![],
170            }),
171            // Filter selected family name, and column qualifier matching the digest.
172            // The latter is to ensure we don't fail once we start adding more metadata.
173            filter: Some(bigtable_v2::RowFilter {
174                filter: Some(bigtable_v2::row_filter::Filter::Chain(
175                    bigtable_v2::row_filter::Chain {
176                        filters: vec![
177                            bigtable_v2::RowFilter {
178                                filter: Some(
179                                    bigtable_v2::row_filter::Filter::FamilyNameRegexFilter(
180                                        self.params.family_name.to_string(),
181                                    ),
182                                ),
183                            },
184                            bigtable_v2::RowFilter {
185                                filter: Some(
186                                    bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
187                                        path_info_key.clone().into(),
188                                    ),
189                                ),
190                            },
191                        ],
192                    },
193                )),
194            }),
195            ..Default::default()
196        };
197
198        let mut response = client
199            .read_rows(request)
200            .await
201            .map_err(|e| Error::BigTable {
202                msg: "reading rows",
203                source: e,
204            })?;
205
206        if response.len() != 1 {
207            if response.len() > 1 {
208                // This shouldn't happen, we limit number of rows to 1
209                Err(Error::UnexpectedDataReturned("got more than one row"))?
210            }
211            // else, this is simply a "not found".
212            return Ok(None);
213        }
214
215        let (row_key, mut cells) = response.pop().unwrap();
216        if row_key != path_info_key.as_bytes() {
217            // This shouldn't happen, we requested this row key.
218            Err(Error::UnexpectedDataReturned("got wrong row key"))?
219        }
220
221        let cell = cells
222            .pop()
223            .ok_or_else(|| Error::UnexpectedDataReturned("found no cells"))?;
224
225        // Ensure there's only one cell (so no more left after the pop())
226        // This shouldn't happen, We filter out other cells in our query.
227        if !cells.is_empty() {
228            Err(Error::UnexpectedDataReturned("got more than one cell"))?
229        }
230
231        // We also require the qualifier to be correct in the filter above,
232        // so this shouldn't happen.
233        if path_info_key.as_bytes() != cell.qualifier {
234            Err(Error::UnexpectedDataReturned("unexpected cell qualifier"))?
235        }
236
237        // Try to parse the value into a PathInfo message
238        let path_info_proto =
239            proto::PathInfo::decode(cell.value.as_slice()).map_err(Error::ProtobufDecode)?;
240
241        let path_info = PathInfo::try_from(path_info_proto).map_err(Error::PathInfoValidation)?;
242
243        if path_info.store_path.digest() != &digest {
244            Err(Error::PathInfoUnexpectedDigest)?;
245        }
246
247        Ok(Some(path_info))
248    }
249
250    #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node, instance_name = %self.instance_name))]
251    async fn put(&self, path_info: PathInfo) -> Result<PathInfo, pathinfoservice::Error> {
252        let mut client = self.client.clone();
253        let path_info_key = derive_pathinfo_key(path_info.store_path.digest());
254
255        let data = proto::PathInfo::from(path_info.clone()).encode_to_vec();
256        if data.len() as u64 > CELL_SIZE_LIMIT {
257            Err(Error::PathInfoTooBig)?;
258        }
259
260        let resp = client
261            .check_and_mutate_row(bigtable_v2::CheckAndMutateRowRequest {
262                table_name: client.get_full_table_name(&self.params.table_name),
263                app_profile_id: self.params.app_profile_id.to_string(),
264                authorized_view_name: "".to_string(),
265                row_key: path_info_key.clone().into(),
266                predicate_filter: Some(bigtable_v2::RowFilter {
267                    filter: Some(bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
268                        path_info_key.clone().into(),
269                    )),
270                }),
271                // If the column was already found, do nothing.
272                true_mutations: vec![],
273                // Else, do the insert.
274                false_mutations: vec![
275                    // https://cloud.google.com/bigtable/docs/writes
276                    bigtable_v2::Mutation {
277                        mutation: Some(bigtable_v2::mutation::Mutation::SetCell(
278                            bigtable_v2::mutation::SetCell {
279                                family_name: self.params.family_name.to_string(),
280                                column_qualifier: path_info_key.clone().into(),
281                                timestamp_micros: -1, // use server time to fill timestamp
282                                value: data,
283                            },
284                        )),
285                    },
286                ],
287            })
288            .await
289            .map_err(|e| Error::BigTable {
290                msg: "mutating rows",
291                source: e,
292            })?;
293
294        if resp.predicate_matched {
295            trace!("already existed")
296        }
297
298        Ok(path_info)
299    }
300
301    fn list(&self) -> BoxStream<'static, Result<PathInfo, pathinfoservice::Error>> {
302        let mut client = self.client.clone();
303
304        let request = bigtable_v2::ReadRowsRequest {
305            app_profile_id: self.params.app_profile_id.to_string(),
306            table_name: client.get_full_table_name(&self.params.table_name),
307            filter: Some(bigtable_v2::RowFilter {
308                filter: Some(bigtable_v2::row_filter::Filter::FamilyNameRegexFilter(
309                    self.params.family_name.to_string(),
310                )),
311            }),
312            ..Default::default()
313        };
314
315        try_stream! {
316            let mut rows = client
317                .stream_rows(request)
318                .await
319                .map_err(|e| Error::BigTable {
320                    msg: "send stream rows request",
321                    source: e,
322                })?;
323
324            while let Some((row_key, mut cells)) =
325                rows.try_next().await.map_err(|e| Error::BigTable {
326                    msg: "stream rows",
327                    source: e,
328                })?
329            {
330                let span = Span::current();
331                span.record("row.key", bstr::BStr::new(&row_key).to_string());
332
333                let cell = cells
334                    .pop()
335                    .ok_or_else(|| Error::UnexpectedDataReturned("found no cells"))?;
336
337                // Ensure there's only one cell (so no more left after the pop())
338                // This shouldn't happen, We filter out other cells in our query.
339                if !cells.is_empty() {
340                    Err(Error::UnexpectedDataReturned("got more than one cell"))?
341                }
342
343                // The cell must have the same qualifier as the row key
344                if row_key != cell.qualifier {
345                    Err(Error::UnexpectedDataReturned("unexpected cell qualifier"))?
346                }
347
348                // Try to parse the value into a PathInfo message.
349                let path_info_proto =
350                    proto::PathInfo::decode(cell.value.as_slice()).map_err(Error::ProtobufDecode)?;
351
352                let path_info =
353                    PathInfo::try_from(path_info_proto).map_err(Error::PathInfoValidation)?;
354
355                let exp_path_info_key = derive_pathinfo_key(path_info.store_path.digest());
356
357                if exp_path_info_key.as_bytes() != row_key.as_slice() {
358                    Err(Error::PathInfoUnexpectedDigest)?;
359                }
360
361                yield path_info
362            }
363        }
364        .boxed()
365    }
366}
367
368#[derive(thiserror::Error, Debug)]
369pub enum Error {
370    #[error("wrong arguments: {0}")]
371    WrongConfig(&'static str),
372    #[error("serde-qs error: {0}")]
373    SerdeQS(#[from] serde_qs::Error),
374
375    #[error("failed to decode protobuf: {0}")]
376    ProtobufDecode(#[from] prost::DecodeError),
377    #[error("failed to validate PathInfo: {0}")]
378    PathInfoValidation(#[from] crate::proto::ValidatePathInfoError),
379    #[error("PathInfo has unexpected digest")]
380    PathInfoUnexpectedDigest,
381    #[error("PathInfo exceeds cell limit on Bigtable")]
382    PathInfoTooBig,
383
384    /// These should never happen, we simply double-check some of the data
385    /// returned by bigtable to actually match what we requested.
386    #[error("bigtable returned unexpected data: {0}")]
387    UnexpectedDataReturned(&'static str),
388    #[error("bigtable error occured while {msg}: {source}")]
389    BigTable {
390        msg: &'static str,
391        #[source]
392        source: bigtable::Error,
393    },
394}
395
396/// Represents configuration of [BigtablePathInfoService].
397/// This currently conflates both connect parameters and data model/client
398/// behaviour parameters.
399#[serde_as]
400#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
401pub struct BigtableParameters {
402    project_id: String,
403    instance_name: String,
404    #[serde(default)]
405    is_read_only: bool,
406    #[serde(default = "default_channel_size")]
407    channel_size: usize,
408
409    #[serde_as(as = "Option<DurationSeconds<String>>")]
410    #[serde(default = "default_timeout")]
411    timeout: Option<std::time::Duration>,
412    table_name: String,
413    family_name: String,
414
415    #[serde(default = "default_app_profile_id")]
416    app_profile_id: String,
417}
418
/// App profile id used when the configuration doesn't specify one.
fn default_app_profile_id() -> String {
    String::from("default")
}
422
/// Channel size used when the configuration doesn't specify one.
fn default_channel_size() -> usize {
    const DEFAULT_CHANNEL_SIZE: usize = 4;
    DEFAULT_CHANNEL_SIZE
}
426
/// Timeout used when the configuration doesn't specify one: four seconds.
fn default_timeout() -> Option<std::time::Duration> {
    use std::time::Duration;

    const DEFAULT_TIMEOUT_SECS: u64 = 4;
    Some(Duration::from_secs(DEFAULT_TIMEOUT_SECS))
}
430
431impl BigtableParameters {
432    #[cfg(test)]
433    pub fn default_for_tests() -> Self {
434        Self {
435            project_id: "project-1".into(),
436            instance_name: "instance-1".into(),
437            is_read_only: false,
438            channel_size: default_channel_size(),
439            timeout: default_timeout(),
440            table_name: "table-1".into(),
441            family_name: "cf1".into(),
442            app_profile_id: default_app_profile_id(),
443        }
444    }
445}
446
447#[async_trait]
448impl ServiceBuilder for BigtableParameters {
449    type Output = dyn PathInfoService;
450    async fn build<'a>(
451        &'a self,
452        instance_name: &str,
453        _context: &CompositionContext,
454    ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync>> {
455        Ok(Arc::new(
456            BigtablePathInfoService::connect(instance_name.to_string(), self.clone()).await?,
457        ))
458    }
459}
460
461impl TryFrom<url::Url> for BigtableParameters {
462    type Error = Box<dyn std::error::Error + Send + Sync>;
463    fn try_from(mut url: url::Url) -> Result<Self, Self::Error> {
464        // parse the instance name from the hostname.
465        let instance_name = url
466            .host_str()
467            .ok_or_else(|| Error::WrongConfig("instance name missing"))?
468            .to_owned();
469
470        // … but add it to the query string now, so we just need to parse that.
471        url.query_pairs_mut()
472            .append_pair("instance_name", &instance_name);
473
474        let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default())?;
475
476        Ok(params)
477    }
478}