snix_store/pathinfoservice/bigtable.rs

1use super::{PathInfo, PathInfoService};
2use crate::proto;
3use async_stream::try_stream;
4use bigtable_rs::{bigtable, google::bigtable::v2 as bigtable_v2};
5use bytes::Bytes;
6use data_encoding::HEXLOWER;
7use futures::stream::BoxStream;
8use nix_compat::nixbase32;
9use prost::Message;
10use serde::{Deserialize, Serialize};
11use serde_with::{DurationSeconds, serde_as};
12use snix_castore::Error;
13use snix_castore::composition::{CompositionContext, ServiceBuilder};
14use std::sync::Arc;
15use tonic::async_trait;
16use tracing::{Span, instrument, trace, warn};
17
/// Maximum size of a single Bigtable cell: 10 MiB.
/// Serialized PathInfo messages larger than this must not be written.
/// <https://cloud.google.com/bigtable/docs/schema-design#cells>
const CELL_SIZE_LIMIT: u64 = 10 << 20;
21
22/// Provides a [PathInfoService] implementation using
23/// [Bigtable](https://cloud.google.com/bigtable/docs/)
24/// as an underlying K/V store.
25///
26/// # Data format
27/// We use Bigtable as a plain K/V store.
28/// The row key is the digest of the store path, in hexlower.
29/// Inside the row, we currently have a single column/cell, again using the
30/// hexlower store path digest.
31/// Its value is the PathInfo message, serialized in canonical protobuf.
32/// We currently only populate this column.
33///
34/// Listing is ranging over all rows, and calculate_nar is returning a
35/// "unimplemented" error.
36#[derive(Clone)]
37pub struct BigtablePathInfoService {
38    instance_name: String,
39    client: bigtable::BigTable,
40    params: BigtableParameters,
41
42    #[cfg(test)]
43    #[allow(dead_code)]
44    /// Holds the temporary directory containing the unix socket, and the
45    /// spawned emulator process.
46    emulator: std::sync::Arc<(tempfile::TempDir, async_process::Child)>,
47}
48
49impl BigtablePathInfoService {
50    #[cfg(not(test))]
51    pub async fn connect(
52        instance_name: String,
53        params: BigtableParameters,
54    ) -> Result<Self, bigtable::Error> {
55        let connection = bigtable::BigTableConnection::new(
56            &params.project_id,
57            &params.instance_name,
58            params.is_read_only,
59            params.channel_size,
60            params.timeout,
61        )
62        .await?;
63
64        Ok(Self {
65            instance_name,
66            client: connection.client(),
67            params,
68        })
69    }
70
71    #[cfg(test)]
72    pub async fn connect(
73        instance_name: String,
74        params: BigtableParameters,
75    ) -> Result<Self, bigtable::Error> {
76        use std::time::Duration;
77
78        use async_process::{Command, Stdio};
79        use tempfile::TempDir;
80        use tokio_retry::{Retry, strategy::ExponentialBackoff};
81
82        let tmpdir = TempDir::new().unwrap();
83
84        let socket_path = tmpdir.path().join("cbtemulator.sock");
85
86        let emulator_process = Command::new("cbtemulator")
87            .arg("-address")
88            .arg(socket_path.clone())
89            .stderr(Stdio::piped())
90            .stdout(Stdio::piped())
91            .kill_on_drop(true)
92            .spawn()
93            .expect("failed to spawn emulator");
94
95        Retry::spawn(
96            ExponentialBackoff::from_millis(20)
97                .max_delay(Duration::from_secs(1))
98                .take(3),
99            || async {
100                if socket_path.exists() {
101                    Ok(())
102                } else {
103                    Err(())
104                }
105            },
106        )
107        .await
108        .expect("failed to wait for socket");
109
110        // populate the emulator
111        for cmd in &[
112            vec!["createtable", &params.table_name],
113            vec!["createfamily", &params.table_name, &params.family_name],
114        ] {
115            Command::new("cbt")
116                .args({
117                    let mut args = vec![
118                        "-instance",
119                        &params.instance_name,
120                        "-project",
121                        &params.project_id,
122                    ];
123                    args.extend_from_slice(cmd);
124                    args
125                })
126                .env(
127                    "BIGTABLE_EMULATOR_HOST",
128                    format!("unix://{}", socket_path.to_string_lossy()),
129                )
130                .output()
131                .await
132                .expect("failed to run cbt setup command");
133        }
134
135        let connection = bigtable_rs::bigtable::BigTableConnection::new_with_emulator(
136            &format!("unix://{}", socket_path.to_string_lossy()),
137            &params.project_id,
138            &params.instance_name,
139            false,
140            None,
141        )?;
142
143        Ok(Self {
144            instance_name: instance_name.to_string(),
145            client: connection.client(),
146            params,
147            emulator: (tmpdir, emulator_process).into(),
148        })
149    }
150}
151
152/// Derives the row/column key for a given output path.
153/// We use hexlower encoding, also because it can't be misinterpreted as RE2.
154fn derive_pathinfo_key(digest: &[u8; 20]) -> String {
155    HEXLOWER.encode(digest)
156}
157
158#[async_trait]
159impl PathInfoService for BigtablePathInfoService {
160    #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest), instance_name = %self.instance_name))]
161    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
162        let mut client = self.client.clone();
163        let path_info_key = derive_pathinfo_key(&digest);
164
165        let request = bigtable_v2::ReadRowsRequest {
166            app_profile_id: self.params.app_profile_id.to_string(),
167            table_name: client.get_full_table_name(&self.params.table_name),
168            rows_limit: 1,
169            rows: Some(bigtable_v2::RowSet {
170                row_keys: vec![path_info_key.clone().into()],
171                row_ranges: vec![],
172            }),
173            // Filter selected family name, and column qualifier matching the digest.
174            // The latter is to ensure we don't fail once we start adding more metadata.
175            filter: Some(bigtable_v2::RowFilter {
176                filter: Some(bigtable_v2::row_filter::Filter::Chain(
177                    bigtable_v2::row_filter::Chain {
178                        filters: vec![
179                            bigtable_v2::RowFilter {
180                                filter: Some(
181                                    bigtable_v2::row_filter::Filter::FamilyNameRegexFilter(
182                                        self.params.family_name.to_string(),
183                                    ),
184                                ),
185                            },
186                            bigtable_v2::RowFilter {
187                                filter: Some(
188                                    bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
189                                        path_info_key.clone().into(),
190                                    ),
191                                ),
192                            },
193                        ],
194                    },
195                )),
196            }),
197            ..Default::default()
198        };
199
200        let mut response = client
201            .read_rows(request)
202            .await
203            .map_err(|e| Error::StorageError(format!("unable to read rows: {}", e)))?;
204
205        if response.len() != 1 {
206            if response.len() > 1 {
207                // This shouldn't happen, we limit number of rows to 1
208                return Err(Error::StorageError(
209                    "got more than one row from bigtable".into(),
210                ));
211            }
212            // else, this is simply a "not found".
213            return Ok(None);
214        }
215
216        let (row_key, mut cells) = response.pop().unwrap();
217        if row_key != path_info_key.as_bytes() {
218            // This shouldn't happen, we requested this row key.
219            return Err(Error::StorageError(
220                "got wrong row key from bigtable".into(),
221            ));
222        }
223
224        let cell = cells
225            .pop()
226            .ok_or_else(|| Error::StorageError("found no cells".into()))?;
227
228        // Ensure there's only one cell (so no more left after the pop())
229        // This shouldn't happen, We filter out other cells in our query.
230        if !cells.is_empty() {
231            return Err(Error::StorageError(
232                "more than one cell returned from bigtable".into(),
233            ));
234        }
235
236        // We also require the qualifier to be correct in the filter above,
237        // so this shouldn't happen.
238        if path_info_key.as_bytes() != cell.qualifier {
239            return Err(Error::StorageError("unexpected cell qualifier".into()));
240        }
241
242        // Try to parse the value into a PathInfo message
243        let path_info_proto = proto::PathInfo::decode(Bytes::from(cell.value))
244            .map_err(|e| Error::StorageError(format!("unable to decode pathinfo proto: {}", e)))?;
245
246        let path_info = PathInfo::try_from(path_info_proto)
247            .map_err(|e| Error::StorageError(format!("Invalid path info: {e}")))?;
248
249        if path_info.store_path.digest() != &digest {
250            return Err(Error::StorageError("PathInfo has unexpected digest".into()));
251        }
252
253        Ok(Some(path_info))
254    }
255
256    #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node, instance_name = %self.instance_name))]
257    async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
258        let mut client = self.client.clone();
259        let path_info_key = derive_pathinfo_key(path_info.store_path.digest());
260
261        let data = proto::PathInfo::from(path_info.clone()).encode_to_vec();
262        if data.len() as u64 > CELL_SIZE_LIMIT {
263            return Err(Error::StorageError(
264                "PathInfo exceeds cell limit on Bigtable".into(),
265            ));
266        }
267
268        let resp = client
269            .check_and_mutate_row(bigtable_v2::CheckAndMutateRowRequest {
270                table_name: client.get_full_table_name(&self.params.table_name),
271                app_profile_id: self.params.app_profile_id.to_string(),
272                authorized_view_name: "".to_string(),
273                row_key: path_info_key.clone().into(),
274                predicate_filter: Some(bigtable_v2::RowFilter {
275                    filter: Some(bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
276                        path_info_key.clone().into(),
277                    )),
278                }),
279                // If the column was already found, do nothing.
280                true_mutations: vec![],
281                // Else, do the insert.
282                false_mutations: vec![
283                    // https://cloud.google.com/bigtable/docs/writes
284                    bigtable_v2::Mutation {
285                        mutation: Some(bigtable_v2::mutation::Mutation::SetCell(
286                            bigtable_v2::mutation::SetCell {
287                                family_name: self.params.family_name.to_string(),
288                                column_qualifier: path_info_key.clone().into(),
289                                timestamp_micros: -1, // use server time to fill timestamp
290                                value: data,
291                            },
292                        )),
293                    },
294                ],
295            })
296            .await
297            .map_err(|e| Error::StorageError(format!("unable to mutate rows: {}", e)))?;
298
299        if resp.predicate_matched {
300            trace!("already existed")
301        }
302
303        Ok(path_info)
304    }
305
306    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
307        let mut client = self.client.clone();
308
309        let request = bigtable_v2::ReadRowsRequest {
310            app_profile_id: self.params.app_profile_id.to_string(),
311            table_name: client.get_full_table_name(&self.params.table_name),
312            filter: Some(bigtable_v2::RowFilter {
313                filter: Some(bigtable_v2::row_filter::Filter::FamilyNameRegexFilter(
314                    self.params.family_name.to_string(),
315                )),
316            }),
317            ..Default::default()
318        };
319
320        let stream = try_stream! {
321            let mut rows = client
322                .stream_rows(request)
323                .await
324                .map_err(|e| Error::StorageError(format!("unable to read rows: {}", e)))?.enumerate();
325
326            use futures::stream::StreamExt;
327
328            while let Some((i, elem)) = rows.next().await {
329                let (row_key, mut cells) = elem.map_err(|e| Error::StorageError(format!("unable to stream row {}: {}", i, e)))?;
330                let span = Span::current();
331                span.record("row.key", bstr::BStr::new(&row_key).to_string());
332
333                let cell = cells
334                    .pop()
335                    .ok_or_else(|| Error::StorageError("found no cells".into()))?;
336
337                // Ensure there's only one cell (so no more left after the pop())
338                // This shouldn't happen, We filter out other cells in our query.
339                if !cells.is_empty() {
340
341                    Err(Error::StorageError(
342                        "more than one cell returned from bigtable".into(),
343                    ))?
344                }
345
346                // The cell must have the same qualifier as the row key
347                if row_key != cell.qualifier {
348                    warn!("unexpected cell qualifier");
349                    Err(Error::StorageError("unexpected cell qualifier".into()))?;
350                }
351
352                // Try to parse the value into a PathInfo message.
353                let path_info_proto = proto::PathInfo::decode(Bytes::from(cell.value))
354                    .map_err(|e| Error::StorageError(format!("unable to decode pathinfo proto: {}", e)))?;
355
356                let path_info = PathInfo::try_from(path_info_proto).map_err(|e| Error::StorageError(format!("Invalid path info: {e}")))?;
357
358                let exp_path_info_key = derive_pathinfo_key(path_info.store_path.digest());
359
360                if exp_path_info_key.as_bytes() != row_key.as_slice() {
361                    Err(Error::StorageError("PathInfo has unexpected digest".into()))?
362                }
363
364
365                yield path_info
366            }
367        };
368
369        Box::pin(stream)
370    }
371}
372
373/// Represents configuration of [BigtablePathInfoService].
374/// This currently conflates both connect parameters and data model/client
375/// behaviour parameters.
376#[serde_as]
377#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
378pub struct BigtableParameters {
379    project_id: String,
380    instance_name: String,
381    #[serde(default)]
382    is_read_only: bool,
383    #[serde(default = "default_channel_size")]
384    channel_size: usize,
385
386    #[serde_as(as = "Option<DurationSeconds<String>>")]
387    #[serde(default = "default_timeout")]
388    timeout: Option<std::time::Duration>,
389    table_name: String,
390    family_name: String,
391
392    #[serde(default = "default_app_profile_id")]
393    app_profile_id: String,
394}
395
396impl BigtableParameters {
397    #[cfg(test)]
398    pub fn default_for_tests() -> Self {
399        Self {
400            project_id: "project-1".into(),
401            instance_name: "instance-1".into(),
402            is_read_only: false,
403            channel_size: default_channel_size(),
404            timeout: default_timeout(),
405            table_name: "table-1".into(),
406            family_name: "cf1".into(),
407            app_profile_id: default_app_profile_id(),
408        }
409    }
410}
411
/// Serde default for `app_profile_id`.
fn default_app_profile_id() -> String {
    String::from("default")
}

/// Serde default for `channel_size`.
fn default_channel_size() -> usize {
    4
}

/// Serde default for `timeout`: four seconds.
fn default_timeout() -> Option<std::time::Duration> {
    std::time::Duration::from_secs(4).into()
}
423
424#[async_trait]
425impl ServiceBuilder for BigtableParameters {
426    type Output = dyn PathInfoService;
427    async fn build<'a>(
428        &'a self,
429        instance_name: &str,
430        _context: &CompositionContext,
431    ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync>> {
432        Ok(Arc::new(
433            BigtablePathInfoService::connect(instance_name.to_string(), self.clone()).await?,
434        ))
435    }
436}
437
438impl TryFrom<url::Url> for BigtableParameters {
439    type Error = Box<dyn std::error::Error + Send + Sync>;
440    fn try_from(mut url: url::Url) -> Result<Self, Self::Error> {
441        // parse the instance name from the hostname.
442        let instance_name = url
443            .host_str()
444            .ok_or_else(|| Error::StorageError("instance name missing".into()))?
445            .to_string();
446
447        // … but add it to the query string now, so we just need to parse that.
448        url.query_pairs_mut()
449            .append_pair("instance_name", &instance_name);
450
451        let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default())
452            .map_err(|e| Error::InvalidRequest(format!("failed to parse parameters: {}", e)))?;
453
454        Ok(params)
455    }
456}