snix_store/pathinfoservice/
bigtable.rs1use super::{PathInfo, PathInfoService};
2use crate::{pathinfoservice, proto};
3use async_stream::try_stream;
4use bigtable_rs::bigtable;
5use data_encoding::HEXLOWER;
6use futures::{StreamExt, TryStreamExt, stream::BoxStream};
7use googleapis_tonic_google_bigtable_v2::google::bigtable::v2 as bigtable_v2;
8use nix_compat::nixbase32;
9use prost::Message;
10use serde::{Deserialize, Serialize};
11use serde_with::{DurationSeconds, serde_as};
12use snix_castore::composition::{CompositionContext, ServiceBuilder};
13use std::sync::Arc;
14use tonic::async_trait;
15use tracing::{Span, instrument, trace};
16
/// Upper bound (10 MiB) on the encoded size of a single PathInfo cell.
///
/// Bigtable recommends keeping individual cells at or below 10 MiB;
/// larger PathInfos are rejected before they are written.
const CELL_SIZE_LIMIT: u64 = 10 << 20;
21#[derive(Clone)]
36pub struct BigtablePathInfoService {
37 instance_name: String,
38 client: bigtable::BigTable,
39 params: BigtableParameters,
40
41 #[cfg(test)]
42 #[allow(dead_code)]
43 emulator: std::sync::Arc<(tempfile::TempDir, async_process::Child)>,
46}
47
48impl BigtablePathInfoService {
49 #[cfg(not(test))]
50 pub async fn connect(
51 instance_name: String,
52 params: BigtableParameters,
53 ) -> Result<Self, bigtable::Error> {
54 let connection = bigtable::BigTableConnection::new(
55 ¶ms.project_id,
56 ¶ms.instance_name,
57 params.is_read_only,
58 params.channel_size,
59 params.timeout,
60 )
61 .await?;
62
63 Ok(Self {
64 instance_name,
65 client: connection.client(),
66 params,
67 })
68 }
69
70 #[cfg(test)]
71 pub async fn connect(
72 instance_name: String,
73 params: BigtableParameters,
74 ) -> Result<Self, bigtable::Error> {
75 use std::time::Duration;
76
77 use async_process::{Command, Stdio};
78 use tempfile::TempDir;
79 use tokio_retry::{Retry, strategy::ExponentialBackoff};
80
81 let tmpdir = TempDir::new().unwrap();
82
83 let socket_path = tmpdir.path().join("cbtemulator.sock");
84
85 let emulator_process = Command::new("cbtemulator")
86 .arg("-address")
87 .arg(socket_path.clone())
88 .stderr(Stdio::piped())
89 .stdout(Stdio::piped())
90 .kill_on_drop(true)
91 .spawn()
92 .expect("failed to spawn emulator");
93
94 Retry::spawn(
95 ExponentialBackoff::from_millis(20)
96 .max_delay(Duration::from_secs(1))
97 .take(3),
98 || async {
99 if socket_path.exists() {
100 Ok(())
101 } else {
102 Err(())
103 }
104 },
105 )
106 .await
107 .expect("failed to wait for socket");
108
109 for cmd in &[
111 vec!["createtable", ¶ms.table_name],
112 vec!["createfamily", ¶ms.table_name, ¶ms.family_name],
113 ] {
114 Command::new("cbt")
115 .args({
116 let mut args = vec![
117 "-instance",
118 ¶ms.instance_name,
119 "-project",
120 ¶ms.project_id,
121 ];
122 args.extend_from_slice(cmd);
123 args
124 })
125 .env(
126 "BIGTABLE_EMULATOR_HOST",
127 format!("unix://{}", socket_path.to_string_lossy()),
128 )
129 .output()
130 .await
131 .expect("failed to run cbt setup command");
132 }
133
134 let connection = bigtable_rs::bigtable::BigTableConnection::new_with_emulator(
135 &format!("unix://{}", socket_path.to_string_lossy()),
136 ¶ms.project_id,
137 ¶ms.instance_name,
138 false,
139 1,
140 None,
141 )?;
142
143 Ok(Self {
144 instance_name: instance_name.to_string(),
145 client: connection.client(),
146 params,
147 emulator: (tmpdir, emulator_process).into(),
148 })
149 }
150}
151
152fn derive_pathinfo_key(digest: &[u8; 20]) -> String {
155 HEXLOWER.encode(digest)
156}
157
158#[async_trait]
159impl PathInfoService for BigtablePathInfoService {
160 #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest), instance_name = %self.instance_name))]
161 async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, pathinfoservice::Error> {
162 let mut client = self.client.clone();
163 let path_info_key = derive_pathinfo_key(&digest);
164
165 let request = bigtable_v2::ReadRowsRequest {
166 app_profile_id: self.params.app_profile_id.to_string(),
167 table_name: client.get_full_table_name(&self.params.table_name),
168 rows_limit: 1,
169 rows: Some(bigtable_v2::RowSet {
170 row_keys: vec![path_info_key.clone().into()],
171 row_ranges: vec![],
172 }),
173 filter: Some(bigtable_v2::RowFilter {
176 filter: Some(bigtable_v2::row_filter::Filter::Chain(
177 bigtable_v2::row_filter::Chain {
178 filters: vec![
179 bigtable_v2::RowFilter {
180 filter: Some(
181 bigtable_v2::row_filter::Filter::FamilyNameRegexFilter(
182 self.params.family_name.to_string(),
183 ),
184 ),
185 },
186 bigtable_v2::RowFilter {
187 filter: Some(
188 bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
189 path_info_key.clone().into(),
190 ),
191 ),
192 },
193 ],
194 },
195 )),
196 }),
197 ..Default::default()
198 };
199
200 let mut response = client
201 .read_rows(request)
202 .await
203 .map_err(|e| Error::BigTable {
204 msg: "reading rows",
205 source: e,
206 })?;
207
208 if response.len() != 1 {
209 if response.len() > 1 {
210 Err(Error::UnexpectedDataReturned("got more than one row"))?
212 }
213 return Ok(None);
215 }
216
217 let (row_key, mut cells) = response.pop().unwrap();
218 if row_key != path_info_key.as_bytes() {
219 Err(Error::UnexpectedDataReturned("got wrong row key"))?
221 }
222
223 let cell = cells
224 .pop()
225 .ok_or_else(|| Error::UnexpectedDataReturned("found no cells"))?;
226
227 if !cells.is_empty() {
230 Err(Error::UnexpectedDataReturned("got more than one cell"))?
231 }
232
233 if path_info_key.as_bytes() != cell.qualifier {
236 Err(Error::UnexpectedDataReturned("unexpected cell qualifier"))?
237 }
238
239 let path_info_proto =
241 proto::PathInfo::decode(cell.value.as_slice()).map_err(Error::ProtobufDecode)?;
242
243 let path_info = PathInfo::try_from(path_info_proto).map_err(Error::PathInfoValidation)?;
244
245 if path_info.store_path.digest() != &digest {
246 Err(Error::PathInfoUnexpectedDigest)?;
247 }
248
249 Ok(Some(path_info))
250 }
251
252 #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node, instance_name = %self.instance_name))]
253 async fn put(&self, path_info: PathInfo) -> Result<PathInfo, pathinfoservice::Error> {
254 let mut client = self.client.clone();
255 let path_info_key = derive_pathinfo_key(path_info.store_path.digest());
256
257 let data = proto::PathInfo::from(path_info.clone()).encode_to_vec();
258 if data.len() as u64 > CELL_SIZE_LIMIT {
259 Err(Error::PathInfoTooBig)?;
260 }
261
262 let resp = client
263 .check_and_mutate_row(bigtable_v2::CheckAndMutateRowRequest {
264 table_name: client.get_full_table_name(&self.params.table_name),
265 app_profile_id: self.params.app_profile_id.to_string(),
266 authorized_view_name: "".to_string(),
267 row_key: path_info_key.clone().into(),
268 predicate_filter: Some(bigtable_v2::RowFilter {
269 filter: Some(bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
270 path_info_key.clone().into(),
271 )),
272 }),
273 true_mutations: vec![],
275 false_mutations: vec![
277 bigtable_v2::Mutation {
279 mutation: Some(bigtable_v2::mutation::Mutation::SetCell(
280 bigtable_v2::mutation::SetCell {
281 family_name: self.params.family_name.to_string(),
282 column_qualifier: path_info_key.clone().into(),
283 timestamp_micros: -1, value: data,
285 },
286 )),
287 },
288 ],
289 })
290 .await
291 .map_err(|e| Error::BigTable {
292 msg: "mutating rows",
293 source: e,
294 })?;
295
296 if resp.predicate_matched {
297 trace!("already existed")
298 }
299
300 Ok(path_info)
301 }
302
303 fn list(&self) -> BoxStream<'static, Result<PathInfo, pathinfoservice::Error>> {
304 let mut client = self.client.clone();
305
306 let request = bigtable_v2::ReadRowsRequest {
307 app_profile_id: self.params.app_profile_id.to_string(),
308 table_name: client.get_full_table_name(&self.params.table_name),
309 filter: Some(bigtable_v2::RowFilter {
310 filter: Some(bigtable_v2::row_filter::Filter::FamilyNameRegexFilter(
311 self.params.family_name.to_string(),
312 )),
313 }),
314 ..Default::default()
315 };
316
317 try_stream! {
318 let mut rows = client
319 .stream_rows(request)
320 .await
321 .map_err(|e| Error::BigTable {
322 msg: "send stream rows request",
323 source: e,
324 })?;
325
326 while let Some((row_key, mut cells)) =
327 rows.try_next().await.map_err(|e| Error::BigTable {
328 msg: "stream rows",
329 source: e,
330 })?
331 {
332 let span = Span::current();
333 span.record("row.key", bstr::BStr::new(&row_key).to_string());
334
335 let cell = cells
336 .pop()
337 .ok_or_else(|| Error::UnexpectedDataReturned("found no cells"))?;
338
339 if !cells.is_empty() {
342 Err(Error::UnexpectedDataReturned("got more than one cell"))?
343 }
344
345 if row_key != cell.qualifier {
347 Err(Error::UnexpectedDataReturned("unexpected cell qualifier"))?
348 }
349
350 let path_info_proto =
352 proto::PathInfo::decode(cell.value.as_slice()).map_err(Error::ProtobufDecode)?;
353
354 let path_info =
355 PathInfo::try_from(path_info_proto).map_err(Error::PathInfoValidation)?;
356
357 let exp_path_info_key = derive_pathinfo_key(path_info.store_path.digest());
358
359 if exp_path_info_key.as_bytes() != row_key.as_slice() {
360 Err(Error::PathInfoUnexpectedDigest)?;
361 }
362
363 yield path_info
364 }
365 }
366 .boxed()
367 }
368}
369
370#[derive(thiserror::Error, Debug)]
371pub enum Error {
372 #[error("wrong arguments: {0}")]
373 WrongConfig(&'static str),
374 #[error("serde-qs error: {0}")]
375 SerdeQS(#[from] serde_qs::Error),
376
377 #[error("failed to decode protobuf: {0}")]
378 ProtobufDecode(#[from] prost::DecodeError),
379 #[error("failed to validate PathInfo: {0}")]
380 PathInfoValidation(#[from] crate::proto::ValidatePathInfoError),
381 #[error("PathInfo has unexpected digest")]
382 PathInfoUnexpectedDigest,
383 #[error("PathInfo exceeds cell limit on Bigtable")]
384 PathInfoTooBig,
385
386 #[error("bigtable returned unexpected data: {0}")]
389 UnexpectedDataReturned(&'static str),
390 #[error("bigtable error occurred while {msg}: {source}")]
391 BigTable {
392 msg: &'static str,
393 #[source]
394 source: bigtable::Error,
395 },
396}
397
/// Configuration of [BigtablePathInfoService].
///
/// This mixes two kinds of parameters:
/// - connection parameters: project, instance, read-only flag, channel count,
///   timeout;
/// - data-model parameters: table, column family, app profile.
#[serde_as]
#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
pub struct BigtableParameters {
    /// GCP project ID.
    project_id: String,
    /// Bigtable instance name. When parsed from a URL, it is taken from the URL host.
    instance_name: String,
    /// Passed as `is_read_only` when connecting. Defaults to `false`.
    #[serde(default)]
    is_read_only: bool,
    /// Passed as `channel_size` when connecting. Defaults to 4.
    #[serde(default = "default_channel_size")]
    channel_size: usize,

    /// Passed as `timeout` when connecting. Expressed as a number of seconds,
    /// (de)serialized in string form. Defaults to 4 seconds.
    #[serde_as(as = "Option<DurationSeconds<String>>")]
    #[serde(default = "default_timeout")]
    timeout: Option<std::time::Duration>,
    /// Table the PathInfo rows are stored in.
    table_name: String,
    /// Column family the PathInfo cells are stored in.
    family_name: String,

    /// App profile ID sent with every request. Defaults to `"default"`.
    #[serde(default = "default_app_profile_id")]
    app_profile_id: String,
}
420
/// Serde default for `app_profile_id`: the app profile named `default`.
fn default_app_profile_id() -> String {
    String::from("default")
}
424
/// Serde default for `channel_size`, which is passed to the Bigtable connection.
fn default_channel_size() -> usize {
    const CHANNELS: usize = 4;
    CHANNELS
}
428
/// Serde default for `timeout`: four seconds.
fn default_timeout() -> Option<std::time::Duration> {
    std::time::Duration::from_secs(4).into()
}
432
433impl BigtableParameters {
434 #[cfg(test)]
435 pub fn default_for_tests() -> Self {
436 Self {
437 project_id: "project-1".into(),
438 instance_name: "instance-1".into(),
439 is_read_only: false,
440 channel_size: default_channel_size(),
441 timeout: default_timeout(),
442 table_name: "table-1".into(),
443 family_name: "cf1".into(),
444 app_profile_id: default_app_profile_id(),
445 }
446 }
447}
448
449#[async_trait]
450impl ServiceBuilder for BigtableParameters {
451 type Output = dyn PathInfoService;
452 async fn build<'a>(
453 &'a self,
454 instance_name: &str,
455 _context: &CompositionContext,
456 ) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync>> {
457 Ok(Arc::new(
458 BigtablePathInfoService::connect(instance_name.to_string(), self.clone()).await?,
459 ))
460 }
461}
462
463impl TryFrom<url::Url> for BigtableParameters {
464 type Error = Box<dyn std::error::Error + Send + Sync>;
465 fn try_from(mut url: url::Url) -> Result<Self, Self::Error> {
466 let instance_name = url
468 .host_str()
469 .ok_or_else(|| Error::WrongConfig("instance name missing"))?
470 .to_owned();
471
472 url.query_pairs_mut()
474 .append_pair("instance_name", &instance_name);
475
476 let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default())?;
477
478 Ok(params)
479 }
480}