snix_store/pathinfoservice/
bigtable.rs1use super::{PathInfo, PathInfoService};
2use crate::{pathinfoservice, proto};
3use async_stream::try_stream;
4use bigtable_rs::{bigtable, google::bigtable::v2 as bigtable_v2};
5use data_encoding::HEXLOWER;
6use futures::{StreamExt, TryStreamExt, stream::BoxStream};
7use nix_compat::nixbase32;
8use prost::Message;
9use serde::{Deserialize, Serialize};
10use serde_with::{DurationSeconds, serde_as};
11use snix_castore::composition::{CompositionContext, ServiceBuilder};
12use std::sync::Arc;
13use tonic::async_trait;
14use tracing::{Span, instrument, trace};
15
/// Largest encoded `PathInfo`, in bytes, that `put` will store in a single cell (10 MiB).
///
/// Larger messages are rejected with [`Error::PathInfoTooBig`] before any request is sent.
const CELL_SIZE_LIMIT: u64 = 10 * (1 << 20);
19
20#[derive(Clone)]
35pub struct BigtablePathInfoService {
36 instance_name: String,
37 client: bigtable::BigTable,
38 params: BigtableParameters,
39
40 #[cfg(test)]
41 #[allow(dead_code)]
42 emulator: std::sync::Arc<(tempfile::TempDir, async_process::Child)>,
45}
46
47impl BigtablePathInfoService {
48 #[cfg(not(test))]
49 pub async fn connect(
50 instance_name: String,
51 params: BigtableParameters,
52 ) -> Result<Self, bigtable::Error> {
53 let connection = bigtable::BigTableConnection::new(
54 ¶ms.project_id,
55 ¶ms.instance_name,
56 params.is_read_only,
57 params.channel_size,
58 params.timeout,
59 )
60 .await?;
61
62 Ok(Self {
63 instance_name,
64 client: connection.client(),
65 params,
66 })
67 }
68
69 #[cfg(test)]
70 pub async fn connect(
71 instance_name: String,
72 params: BigtableParameters,
73 ) -> Result<Self, bigtable::Error> {
74 use std::time::Duration;
75
76 use async_process::{Command, Stdio};
77 use tempfile::TempDir;
78 use tokio_retry::{Retry, strategy::ExponentialBackoff};
79
80 let tmpdir = TempDir::new().unwrap();
81
82 let socket_path = tmpdir.path().join("cbtemulator.sock");
83
84 let emulator_process = Command::new("cbtemulator")
85 .arg("-address")
86 .arg(socket_path.clone())
87 .stderr(Stdio::piped())
88 .stdout(Stdio::piped())
89 .kill_on_drop(true)
90 .spawn()
91 .expect("failed to spawn emulator");
92
93 Retry::spawn(
94 ExponentialBackoff::from_millis(20)
95 .max_delay(Duration::from_secs(1))
96 .take(3),
97 || async {
98 if socket_path.exists() {
99 Ok(())
100 } else {
101 Err(())
102 }
103 },
104 )
105 .await
106 .expect("failed to wait for socket");
107
108 for cmd in &[
110 vec!["createtable", ¶ms.table_name],
111 vec!["createfamily", ¶ms.table_name, ¶ms.family_name],
112 ] {
113 Command::new("cbt")
114 .args({
115 let mut args = vec![
116 "-instance",
117 ¶ms.instance_name,
118 "-project",
119 ¶ms.project_id,
120 ];
121 args.extend_from_slice(cmd);
122 args
123 })
124 .env(
125 "BIGTABLE_EMULATOR_HOST",
126 format!("unix://{}", socket_path.to_string_lossy()),
127 )
128 .output()
129 .await
130 .expect("failed to run cbt setup command");
131 }
132
133 let connection = bigtable_rs::bigtable::BigTableConnection::new_with_emulator(
134 &format!("unix://{}", socket_path.to_string_lossy()),
135 ¶ms.project_id,
136 ¶ms.instance_name,
137 false,
138 None,
139 )?;
140
141 Ok(Self {
142 instance_name: instance_name.to_string(),
143 client: connection.client(),
144 params,
145 emulator: (tmpdir, emulator_process).into(),
146 })
147 }
148}
149
150fn derive_pathinfo_key(digest: &[u8; 20]) -> String {
153 HEXLOWER.encode(digest)
154}
155
156#[async_trait]
157impl PathInfoService for BigtablePathInfoService {
158 #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest), instance_name = %self.instance_name))]
159 async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, pathinfoservice::Error> {
160 let mut client = self.client.clone();
161 let path_info_key = derive_pathinfo_key(&digest);
162
163 let request = bigtable_v2::ReadRowsRequest {
164 app_profile_id: self.params.app_profile_id.to_string(),
165 table_name: client.get_full_table_name(&self.params.table_name),
166 rows_limit: 1,
167 rows: Some(bigtable_v2::RowSet {
168 row_keys: vec![path_info_key.clone().into()],
169 row_ranges: vec![],
170 }),
171 filter: Some(bigtable_v2::RowFilter {
174 filter: Some(bigtable_v2::row_filter::Filter::Chain(
175 bigtable_v2::row_filter::Chain {
176 filters: vec![
177 bigtable_v2::RowFilter {
178 filter: Some(
179 bigtable_v2::row_filter::Filter::FamilyNameRegexFilter(
180 self.params.family_name.to_string(),
181 ),
182 ),
183 },
184 bigtable_v2::RowFilter {
185 filter: Some(
186 bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
187 path_info_key.clone().into(),
188 ),
189 ),
190 },
191 ],
192 },
193 )),
194 }),
195 ..Default::default()
196 };
197
198 let mut response = client
199 .read_rows(request)
200 .await
201 .map_err(|e| Error::BigTable {
202 msg: "reading rows",
203 source: e,
204 })?;
205
206 if response.len() != 1 {
207 if response.len() > 1 {
208 Err(Error::UnexpectedDataReturned("got more than one row"))?
210 }
211 return Ok(None);
213 }
214
215 let (row_key, mut cells) = response.pop().unwrap();
216 if row_key != path_info_key.as_bytes() {
217 Err(Error::UnexpectedDataReturned("got wrong row key"))?
219 }
220
221 let cell = cells
222 .pop()
223 .ok_or_else(|| Error::UnexpectedDataReturned("found no cells"))?;
224
225 if !cells.is_empty() {
228 Err(Error::UnexpectedDataReturned("got more than one cell"))?
229 }
230
231 if path_info_key.as_bytes() != cell.qualifier {
234 Err(Error::UnexpectedDataReturned("unexpected cell qualifier"))?
235 }
236
237 let path_info_proto =
239 proto::PathInfo::decode(cell.value.as_slice()).map_err(Error::ProtobufDecode)?;
240
241 let path_info = PathInfo::try_from(path_info_proto).map_err(Error::PathInfoValidation)?;
242
243 if path_info.store_path.digest() != &digest {
244 Err(Error::PathInfoUnexpectedDigest)?;
245 }
246
247 Ok(Some(path_info))
248 }
249
250 #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node, instance_name = %self.instance_name))]
251 async fn put(&self, path_info: PathInfo) -> Result<PathInfo, pathinfoservice::Error> {
252 let mut client = self.client.clone();
253 let path_info_key = derive_pathinfo_key(path_info.store_path.digest());
254
255 let data = proto::PathInfo::from(path_info.clone()).encode_to_vec();
256 if data.len() as u64 > CELL_SIZE_LIMIT {
257 Err(Error::PathInfoTooBig)?;
258 }
259
260 let resp = client
261 .check_and_mutate_row(bigtable_v2::CheckAndMutateRowRequest {
262 table_name: client.get_full_table_name(&self.params.table_name),
263 app_profile_id: self.params.app_profile_id.to_string(),
264 authorized_view_name: "".to_string(),
265 row_key: path_info_key.clone().into(),
266 predicate_filter: Some(bigtable_v2::RowFilter {
267 filter: Some(bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
268 path_info_key.clone().into(),
269 )),
270 }),
271 true_mutations: vec![],
273 false_mutations: vec![
275 bigtable_v2::Mutation {
277 mutation: Some(bigtable_v2::mutation::Mutation::SetCell(
278 bigtable_v2::mutation::SetCell {
279 family_name: self.params.family_name.to_string(),
280 column_qualifier: path_info_key.clone().into(),
281 timestamp_micros: -1, value: data,
283 },
284 )),
285 },
286 ],
287 })
288 .await
289 .map_err(|e| Error::BigTable {
290 msg: "mutating rows",
291 source: e,
292 })?;
293
294 if resp.predicate_matched {
295 trace!("already existed")
296 }
297
298 Ok(path_info)
299 }
300
301 fn list(&self) -> BoxStream<'static, Result<PathInfo, pathinfoservice::Error>> {
302 let mut client = self.client.clone();
303
304 let request = bigtable_v2::ReadRowsRequest {
305 app_profile_id: self.params.app_profile_id.to_string(),
306 table_name: client.get_full_table_name(&self.params.table_name),
307 filter: Some(bigtable_v2::RowFilter {
308 filter: Some(bigtable_v2::row_filter::Filter::FamilyNameRegexFilter(
309 self.params.family_name.to_string(),
310 )),
311 }),
312 ..Default::default()
313 };
314
315 try_stream! {
316 let mut rows = client
317 .stream_rows(request)
318 .await
319 .map_err(|e| Error::BigTable {
320 msg: "send stream rows request",
321 source: e,
322 })?;
323
324 while let Some((row_key, mut cells)) =
325 rows.try_next().await.map_err(|e| Error::BigTable {
326 msg: "stream rows",
327 source: e,
328 })?
329 {
330 let span = Span::current();
331 span.record("row.key", bstr::BStr::new(&row_key).to_string());
332
333 let cell = cells
334 .pop()
335 .ok_or_else(|| Error::UnexpectedDataReturned("found no cells"))?;
336
337 if !cells.is_empty() {
340 Err(Error::UnexpectedDataReturned("got more than one cell"))?
341 }
342
343 if row_key != cell.qualifier {
345 Err(Error::UnexpectedDataReturned("unexpected cell qualifier"))?
346 }
347
348 let path_info_proto =
350 proto::PathInfo::decode(cell.value.as_slice()).map_err(Error::ProtobufDecode)?;
351
352 let path_info =
353 PathInfo::try_from(path_info_proto).map_err(Error::PathInfoValidation)?;
354
355 let exp_path_info_key = derive_pathinfo_key(path_info.store_path.digest());
356
357 if exp_path_info_key.as_bytes() != row_key.as_slice() {
358 Err(Error::PathInfoUnexpectedDigest)?;
359 }
360
361 yield path_info
362 }
363 }
364 .boxed()
365 }
366}
367
368#[derive(thiserror::Error, Debug)]
369pub enum Error {
370 #[error("wrong arguments: {0}")]
371 WrongConfig(&'static str),
372 #[error("serde-qs error: {0}")]
373 SerdeQS(#[from] serde_qs::Error),
374
375 #[error("failed to decode protobuf: {0}")]
376 ProtobufDecode(#[from] prost::DecodeError),
377 #[error("failed to validate PathInfo: {0}")]
378 PathInfoValidation(#[from] crate::proto::ValidatePathInfoError),
379 #[error("PathInfo has unexpected digest")]
380 PathInfoUnexpectedDigest,
381 #[error("PathInfo exceeds cell limit on Bigtable")]
382 PathInfoTooBig,
383
384 #[error("bigtable returned unexpected data: {0}")]
387 UnexpectedDataReturned(&'static str),
388 #[error("bigtable error occured while {msg}: {source}")]
389 BigTable {
390 msg: &'static str,
391 #[source]
392 source: bigtable::Error,
393 },
394}
395
396#[serde_as]
400#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
401pub struct BigtableParameters {
402 project_id: String,
403 instance_name: String,
404 #[serde(default)]
405 is_read_only: bool,
406 #[serde(default = "default_channel_size")]
407 channel_size: usize,
408
409 #[serde_as(as = "Option<DurationSeconds<String>>")]
410 #[serde(default = "default_timeout")]
411 timeout: Option<std::time::Duration>,
412 table_name: String,
413 family_name: String,
414
415 #[serde(default = "default_app_profile_id")]
416 app_profile_id: String,
417}
418
/// Serde default for `BigtableParameters::app_profile_id`: the profile named `default`.
fn default_app_profile_id() -> String {
    String::from("default")
}
422
/// Serde default for `BigtableParameters::channel_size`.
fn default_channel_size() -> usize {
    const DEFAULT_CHANNEL_SIZE: usize = 4;
    DEFAULT_CHANNEL_SIZE
}
426
/// Serde default for `BigtableParameters::timeout`: four seconds.
fn default_timeout() -> Option<std::time::Duration> {
    let four_seconds = std::time::Duration::from_secs(4);
    Some(four_seconds)
}
430
431impl BigtableParameters {
432 #[cfg(test)]
433 pub fn default_for_tests() -> Self {
434 Self {
435 project_id: "project-1".into(),
436 instance_name: "instance-1".into(),
437 is_read_only: false,
438 channel_size: default_channel_size(),
439 timeout: default_timeout(),
440 table_name: "table-1".into(),
441 family_name: "cf1".into(),
442 app_profile_id: default_app_profile_id(),
443 }
444 }
445}
446
447#[async_trait]
448impl ServiceBuilder for BigtableParameters {
449 type Output = dyn PathInfoService;
450 async fn build<'a>(
451 &'a self,
452 instance_name: &str,
453 _context: &CompositionContext,
454 ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync>> {
455 Ok(Arc::new(
456 BigtablePathInfoService::connect(instance_name.to_string(), self.clone()).await?,
457 ))
458 }
459}
460
461impl TryFrom<url::Url> for BigtableParameters {
462 type Error = Box<dyn std::error::Error + Send + Sync>;
463 fn try_from(mut url: url::Url) -> Result<Self, Self::Error> {
464 let instance_name = url
466 .host_str()
467 .ok_or_else(|| Error::WrongConfig("instance name missing"))?
468 .to_owned();
469
470 url.query_pairs_mut()
472 .append_pair("instance_name", &instance_name);
473
474 let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default())?;
475
476 Ok(params)
477 }
478}