// snix_store/pathinfoservice/bigtable.rs
use super::{PathInfo, PathInfoService};
2use crate::proto;
3use async_stream::try_stream;
4use bigtable_rs::{bigtable, google::bigtable::v2 as bigtable_v2};
5use bytes::Bytes;
6use data_encoding::HEXLOWER;
7use futures::stream::BoxStream;
8use nix_compat::nixbase32;
9use prost::Message;
10use serde::{Deserialize, Serialize};
11use serde_with::{DurationSeconds, serde_as};
12use snix_castore::Error;
13use snix_castore::composition::{CompositionContext, ServiceBuilder};
14use std::sync::Arc;
15use tonic::async_trait;
16use tracing::{Span, instrument, trace, warn};
17
/// Upper bound, in bytes (10 MiB), on the size of an encoded `PathInfo`
/// message. `put` refuses to write anything larger into a Bigtable cell.
const CELL_SIZE_LIMIT: u64 = 10 * 1024 * 1024;
21
/// A `PathInfoService` implementation that keeps `PathInfo` messages in a
/// Bigtable table, one row per store path digest.
#[derive(Clone)]
pub struct BigtablePathInfoService {
    /// Name of this service instance; recorded on the tracing spans of
    /// `get` and `put`.
    instance_name: String,
    /// Bigtable client handle; each operation works on its own clone.
    client: bigtable::BigTable,
    /// Connection, table and column family settings this service was built with.
    params: BigtableParameters,

    /// Test builds only: the temporary directory holding the emulator socket
    /// and the emulator child process. They are kept here so they live as long
    /// as any clone of the service does.
    #[cfg(test)]
    #[allow(dead_code)]
    emulator: std::sync::Arc<(tempfile::TempDir, async_process::Child)>,
}
48
49impl BigtablePathInfoService {
50 #[cfg(not(test))]
51 pub async fn connect(
52 instance_name: String,
53 params: BigtableParameters,
54 ) -> Result<Self, bigtable::Error> {
55 let connection = bigtable::BigTableConnection::new(
56 ¶ms.project_id,
57 ¶ms.instance_name,
58 params.is_read_only,
59 params.channel_size,
60 params.timeout,
61 )
62 .await?;
63
64 Ok(Self {
65 instance_name,
66 client: connection.client(),
67 params,
68 })
69 }
70
71 #[cfg(test)]
72 pub async fn connect(
73 instance_name: String,
74 params: BigtableParameters,
75 ) -> Result<Self, bigtable::Error> {
76 use std::time::Duration;
77
78 use async_process::{Command, Stdio};
79 use tempfile::TempDir;
80 use tokio_retry::{Retry, strategy::ExponentialBackoff};
81
82 let tmpdir = TempDir::new().unwrap();
83
84 let socket_path = tmpdir.path().join("cbtemulator.sock");
85
86 let emulator_process = Command::new("cbtemulator")
87 .arg("-address")
88 .arg(socket_path.clone())
89 .stderr(Stdio::piped())
90 .stdout(Stdio::piped())
91 .kill_on_drop(true)
92 .spawn()
93 .expect("failed to spawn emulator");
94
95 Retry::spawn(
96 ExponentialBackoff::from_millis(20)
97 .max_delay(Duration::from_secs(1))
98 .take(3),
99 || async {
100 if socket_path.exists() {
101 Ok(())
102 } else {
103 Err(())
104 }
105 },
106 )
107 .await
108 .expect("failed to wait for socket");
109
110 for cmd in &[
112 vec!["createtable", ¶ms.table_name],
113 vec!["createfamily", ¶ms.table_name, ¶ms.family_name],
114 ] {
115 Command::new("cbt")
116 .args({
117 let mut args = vec![
118 "-instance",
119 ¶ms.instance_name,
120 "-project",
121 ¶ms.project_id,
122 ];
123 args.extend_from_slice(cmd);
124 args
125 })
126 .env(
127 "BIGTABLE_EMULATOR_HOST",
128 format!("unix://{}", socket_path.to_string_lossy()),
129 )
130 .output()
131 .await
132 .expect("failed to run cbt setup command");
133 }
134
135 let connection = bigtable_rs::bigtable::BigTableConnection::new_with_emulator(
136 &format!("unix://{}", socket_path.to_string_lossy()),
137 ¶ms.project_id,
138 ¶ms.instance_name,
139 false,
140 None,
141 )?;
142
143 Ok(Self {
144 instance_name: instance_name.to_string(),
145 client: connection.client(),
146 params,
147 emulator: (tmpdir, emulator_process).into(),
148 })
149 }
150}
151
152fn derive_pathinfo_key(digest: &[u8; 20]) -> String {
155 HEXLOWER.encode(digest)
156}
157
158#[async_trait]
159impl PathInfoService for BigtablePathInfoService {
160 #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest), instance_name = %self.instance_name))]
161 async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
162 let mut client = self.client.clone();
163 let path_info_key = derive_pathinfo_key(&digest);
164
165 let request = bigtable_v2::ReadRowsRequest {
166 app_profile_id: self.params.app_profile_id.to_string(),
167 table_name: client.get_full_table_name(&self.params.table_name),
168 rows_limit: 1,
169 rows: Some(bigtable_v2::RowSet {
170 row_keys: vec![path_info_key.clone().into()],
171 row_ranges: vec![],
172 }),
173 filter: Some(bigtable_v2::RowFilter {
176 filter: Some(bigtable_v2::row_filter::Filter::Chain(
177 bigtable_v2::row_filter::Chain {
178 filters: vec![
179 bigtable_v2::RowFilter {
180 filter: Some(
181 bigtable_v2::row_filter::Filter::FamilyNameRegexFilter(
182 self.params.family_name.to_string(),
183 ),
184 ),
185 },
186 bigtable_v2::RowFilter {
187 filter: Some(
188 bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
189 path_info_key.clone().into(),
190 ),
191 ),
192 },
193 ],
194 },
195 )),
196 }),
197 ..Default::default()
198 };
199
200 let mut response = client
201 .read_rows(request)
202 .await
203 .map_err(|e| Error::StorageError(format!("unable to read rows: {}", e)))?;
204
205 if response.len() != 1 {
206 if response.len() > 1 {
207 return Err(Error::StorageError(
209 "got more than one row from bigtable".into(),
210 ));
211 }
212 return Ok(None);
214 }
215
216 let (row_key, mut cells) = response.pop().unwrap();
217 if row_key != path_info_key.as_bytes() {
218 return Err(Error::StorageError(
220 "got wrong row key from bigtable".into(),
221 ));
222 }
223
224 let cell = cells
225 .pop()
226 .ok_or_else(|| Error::StorageError("found no cells".into()))?;
227
228 if !cells.is_empty() {
231 return Err(Error::StorageError(
232 "more than one cell returned from bigtable".into(),
233 ));
234 }
235
236 if path_info_key.as_bytes() != cell.qualifier {
239 return Err(Error::StorageError("unexpected cell qualifier".into()));
240 }
241
242 let path_info_proto = proto::PathInfo::decode(Bytes::from(cell.value))
244 .map_err(|e| Error::StorageError(format!("unable to decode pathinfo proto: {}", e)))?;
245
246 let path_info = PathInfo::try_from(path_info_proto)
247 .map_err(|e| Error::StorageError(format!("Invalid path info: {e}")))?;
248
249 if path_info.store_path.digest() != &digest {
250 return Err(Error::StorageError("PathInfo has unexpected digest".into()));
251 }
252
253 Ok(Some(path_info))
254 }
255
256 #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node, instance_name = %self.instance_name))]
257 async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
258 let mut client = self.client.clone();
259 let path_info_key = derive_pathinfo_key(path_info.store_path.digest());
260
261 let data = proto::PathInfo::from(path_info.clone()).encode_to_vec();
262 if data.len() as u64 > CELL_SIZE_LIMIT {
263 return Err(Error::StorageError(
264 "PathInfo exceeds cell limit on Bigtable".into(),
265 ));
266 }
267
268 let resp = client
269 .check_and_mutate_row(bigtable_v2::CheckAndMutateRowRequest {
270 table_name: client.get_full_table_name(&self.params.table_name),
271 app_profile_id: self.params.app_profile_id.to_string(),
272 authorized_view_name: "".to_string(),
273 row_key: path_info_key.clone().into(),
274 predicate_filter: Some(bigtable_v2::RowFilter {
275 filter: Some(bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
276 path_info_key.clone().into(),
277 )),
278 }),
279 true_mutations: vec![],
281 false_mutations: vec![
283 bigtable_v2::Mutation {
285 mutation: Some(bigtable_v2::mutation::Mutation::SetCell(
286 bigtable_v2::mutation::SetCell {
287 family_name: self.params.family_name.to_string(),
288 column_qualifier: path_info_key.clone().into(),
289 timestamp_micros: -1, value: data,
291 },
292 )),
293 },
294 ],
295 })
296 .await
297 .map_err(|e| Error::StorageError(format!("unable to mutate rows: {}", e)))?;
298
299 if resp.predicate_matched {
300 trace!("already existed")
301 }
302
303 Ok(path_info)
304 }
305
306 fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
307 let mut client = self.client.clone();
308
309 let request = bigtable_v2::ReadRowsRequest {
310 app_profile_id: self.params.app_profile_id.to_string(),
311 table_name: client.get_full_table_name(&self.params.table_name),
312 filter: Some(bigtable_v2::RowFilter {
313 filter: Some(bigtable_v2::row_filter::Filter::FamilyNameRegexFilter(
314 self.params.family_name.to_string(),
315 )),
316 }),
317 ..Default::default()
318 };
319
320 let stream = try_stream! {
321 let mut rows = client
322 .stream_rows(request)
323 .await
324 .map_err(|e| Error::StorageError(format!("unable to read rows: {}", e)))?.enumerate();
325
326 use futures::stream::StreamExt;
327
328 while let Some((i, elem)) = rows.next().await {
329 let (row_key, mut cells) = elem.map_err(|e| Error::StorageError(format!("unable to stream row {}: {}", i, e)))?;
330 let span = Span::current();
331 span.record("row.key", bstr::BStr::new(&row_key).to_string());
332
333 let cell = cells
334 .pop()
335 .ok_or_else(|| Error::StorageError("found no cells".into()))?;
336
337 if !cells.is_empty() {
340
341 Err(Error::StorageError(
342 "more than one cell returned from bigtable".into(),
343 ))?
344 }
345
346 if row_key != cell.qualifier {
348 warn!("unexpected cell qualifier");
349 Err(Error::StorageError("unexpected cell qualifier".into()))?;
350 }
351
352 let path_info_proto = proto::PathInfo::decode(Bytes::from(cell.value))
354 .map_err(|e| Error::StorageError(format!("unable to decode pathinfo proto: {}", e)))?;
355
356 let path_info = PathInfo::try_from(path_info_proto).map_err(|e| Error::StorageError(format!("Invalid path info: {e}")))?;
357
358 let exp_path_info_key = derive_pathinfo_key(path_info.store_path.digest());
359
360 if exp_path_info_key.as_bytes() != row_key.as_slice() {
361 Err(Error::StorageError("PathInfo has unexpected digest".into()))?
362 }
363
364
365 yield path_info
366 }
367 };
368
369 Box::pin(stream)
370 }
371}
372
/// Settings needed to connect to Bigtable and to locate the table and column
/// family that hold the `PathInfo` data. Deserializable from configuration,
/// or from a URL via the `TryFrom<url::Url>` implementation.
#[serde_as]
#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
pub struct BigtableParameters {
    /// Project identifier handed to the Bigtable connection.
    project_id: String,
    /// Bigtable instance to connect to; taken from the host part when the
    /// parameters are parsed from a URL.
    instance_name: String,
    /// Handed to the Bigtable connection; `false` when omitted.
    #[serde(default)]
    is_read_only: bool,
    /// Handed to the Bigtable connection; see `default_channel_size` for the
    /// value used when omitted.
    #[serde(default = "default_channel_size")]
    channel_size: usize,

    /// Timeout handed to the Bigtable connection, (de)serialized as a number
    /// of seconds in string form; see `default_timeout` for the value used
    /// when omitted.
    #[serde_as(as = "Option<DurationSeconds<String>>")]
    #[serde(default = "default_timeout")]
    timeout: Option<std::time::Duration>,
    /// Name of the table that is read from and written to.
    table_name: String,
    /// Name of the column family that is read from and written to.
    family_name: String,

    /// App profile id sent along with every request; see
    /// `default_app_profile_id` for the value used when omitted.
    #[serde(default = "default_app_profile_id")]
    app_profile_id: String,
}
395
396impl BigtableParameters {
397 #[cfg(test)]
398 pub fn default_for_tests() -> Self {
399 Self {
400 project_id: "project-1".into(),
401 instance_name: "instance-1".into(),
402 is_read_only: false,
403 channel_size: default_channel_size(),
404 timeout: default_timeout(),
405 table_name: "table-1".into(),
406 family_name: "cf1".into(),
407 app_profile_id: default_app_profile_id(),
408 }
409 }
410}
411
/// App profile id used when the configuration does not specify one.
fn default_app_profile_id() -> String {
    String::from("default")
}
415
/// Channel size used when the configuration does not specify one.
fn default_channel_size() -> usize {
    const DEFAULT_CHANNEL_SIZE: usize = 4;
    DEFAULT_CHANNEL_SIZE
}
419
/// Timeout used when the configuration does not specify one: four seconds.
fn default_timeout() -> Option<std::time::Duration> {
    let four_seconds = std::time::Duration::from_secs(4);
    Some(four_seconds)
}
423
424#[async_trait]
425impl ServiceBuilder for BigtableParameters {
426 type Output = dyn PathInfoService;
427 async fn build<'a>(
428 &'a self,
429 instance_name: &str,
430 _context: &CompositionContext,
431 ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync>> {
432 Ok(Arc::new(
433 BigtablePathInfoService::connect(instance_name.to_string(), self.clone()).await?,
434 ))
435 }
436}
437
438impl TryFrom<url::Url> for BigtableParameters {
439 type Error = Box<dyn std::error::Error + Send + Sync>;
440 fn try_from(mut url: url::Url) -> Result<Self, Self::Error> {
441 let instance_name = url
443 .host_str()
444 .ok_or_else(|| Error::StorageError("instance name missing".into()))?
445 .to_string();
446
447 url.query_pairs_mut()
449 .append_pair("instance_name", &instance_name);
450
451 let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default())
452 .map_err(|e| Error::InvalidRequest(format!("failed to parse parameters: {}", e)))?;
453
454 Ok(params)
455 }
456}