snix_castore/directoryservice/
bigtable.rs1use bigtable_rs::bigtable;
2use data_encoding::HEXLOWER;
3use futures::stream::BoxStream;
4use futures::{StreamExt, TryStreamExt};
5use googleapis_tonic_google_bigtable_v2::google::bigtable::v2 as bigtable_v2;
6use prost::Message;
7use serde::{Deserialize, Serialize};
8use serde_with::{DurationSeconds, serde_as};
9use std::sync::Arc;
10use tonic::async_trait;
11use tracing::{instrument, trace, warn};
12
13use super::{Directory, DirectoryPutter, DirectoryService, SimplePutter};
14use crate::composition::{CompositionContext, ServiceBuilder};
15use crate::directoryservice::traversal;
16use crate::{B3Digest, proto};
17
/// Largest encoded `Directory` message (10 MiB) that `put` will store in a
/// single Bigtable cell; larger directories are rejected with
/// `Error::DirectoryTooBig`.
const CELL_SIZE_LIMIT: u64 = 10 << 20;
21
22#[derive(Clone)]
39pub struct BigtableDirectoryService {
40 instance_name: String,
41 client: bigtable::BigTable,
42 params: BigtableParameters,
43
44 #[cfg(test)]
45 #[allow(dead_code)]
46 emulator: std::sync::Arc<(tempfile::TempDir, async_process::Child)>,
49}
50
51impl BigtableDirectoryService {
52 #[cfg(not(test))]
53 pub async fn connect(
54 instance_name: String,
55 params: BigtableParameters,
56 ) -> Result<Self, bigtable::Error> {
57 let connection = bigtable::BigTableConnection::new(
58 ¶ms.project_id,
59 ¶ms.instance_name,
60 params.is_read_only,
61 params.channel_size,
62 params.timeout,
63 )
64 .await?;
65
66 Ok(Self {
67 instance_name,
68 client: connection.client(),
69 params,
70 })
71 }
72
73 #[cfg(test)]
74 pub async fn connect(
75 instance_name: String,
76 params: BigtableParameters,
77 ) -> Result<Self, bigtable::Error> {
78 use std::time::Duration;
79
80 use async_process::{Command, Stdio};
81 use tempfile::TempDir;
82 use tokio_retry::{Retry, strategy::ExponentialBackoff};
83
84 let tmpdir = TempDir::new().unwrap();
85
86 let socket_path = tmpdir.path().join("cbtemulator.sock");
87
88 let emulator_process = Command::new("cbtemulator")
89 .arg("-address")
90 .arg(socket_path.clone())
91 .stderr(Stdio::piped())
92 .stdout(Stdio::piped())
93 .kill_on_drop(true)
94 .spawn()
95 .expect("failed to spawn emulator");
96
97 Retry::spawn(
98 ExponentialBackoff::from_millis(20)
99 .max_delay(Duration::from_secs(1))
100 .take(3),
101 || async {
102 if socket_path.exists() {
103 Ok(())
104 } else {
105 Err(())
106 }
107 },
108 )
109 .await
110 .expect("failed to wait for socket");
111
112 for cmd in &[
114 vec!["createtable", ¶ms.table_name],
115 vec!["createfamily", ¶ms.table_name, ¶ms.family_name],
116 ] {
117 Command::new("cbt")
118 .args({
119 let mut args = vec![
120 "-instance",
121 ¶ms.instance_name,
122 "-project",
123 ¶ms.project_id,
124 ];
125 args.extend_from_slice(cmd);
126 args
127 })
128 .env(
129 "BIGTABLE_EMULATOR_HOST",
130 format!("unix://{}", socket_path.to_string_lossy()),
131 )
132 .output()
133 .await
134 .expect("failed to run cbt setup command");
135 }
136
137 let connection = bigtable_rs::bigtable::BigTableConnection::new_with_emulator(
138 &format!("unix://{}", socket_path.to_string_lossy()),
139 ¶ms.project_id,
140 ¶ms.instance_name,
141 params.is_read_only,
142 1,
143 params.timeout,
144 )?;
145
146 Ok(Self {
147 instance_name,
148 client: connection.client(),
149 params,
150 emulator: (tmpdir, emulator_process).into(),
151 })
152 }
153}
154
155fn derive_directory_key(digest: &B3Digest) -> String {
158 HEXLOWER.encode(digest.as_slice())
159}
160
161#[async_trait]
162impl DirectoryService for BigtableDirectoryService {
163 #[instrument(skip(self, digest), err, fields(directory.digest = %digest, instance_name=%self.instance_name))]
164 async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, super::Error> {
165 let mut client = self.client.clone();
166 let directory_key = derive_directory_key(digest);
167
168 let request = bigtable_v2::ReadRowsRequest {
169 app_profile_id: self.params.app_profile_id.to_string(),
170 table_name: client.get_full_table_name(&self.params.table_name),
171 rows_limit: 1,
172 rows: Some(bigtable_v2::RowSet {
173 row_keys: vec![directory_key.clone().into()],
174 row_ranges: vec![],
175 }),
176 filter: Some(bigtable_v2::RowFilter {
179 filter: Some(bigtable_v2::row_filter::Filter::Chain(
180 bigtable_v2::row_filter::Chain {
181 filters: vec![
182 bigtable_v2::RowFilter {
183 filter: Some(
184 bigtable_v2::row_filter::Filter::FamilyNameRegexFilter(
185 self.params.family_name.to_string(),
186 ),
187 ),
188 },
189 bigtable_v2::RowFilter {
190 filter: Some(
191 bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
192 directory_key.clone().into(),
193 ),
194 ),
195 },
196 ],
197 },
198 )),
199 }),
200 ..Default::default()
201 };
202
203 let mut response = client
204 .read_rows(request)
205 .await
206 .map_err(|e| Error::BigTable {
207 msg: "reading rows",
208 source: e,
209 })?;
210
211 if response.len() != 1 {
212 if response.len() > 1 {
213 Err(Error::UnexpectedDataReturned("got more than one row"))?
215 }
216 return Ok(None);
218 }
219
220 let (row_key, mut row_cells) = response.pop().unwrap();
221 if row_key != directory_key.as_bytes() {
222 Err(Error::UnexpectedDataReturned("got wrong row key"))?
224 }
225
226 let row_cell = row_cells
227 .pop()
228 .ok_or_else(|| Error::UnexpectedDataReturned("found no cells"))?;
229
230 if !row_cells.is_empty() {
233 Err(Error::UnexpectedDataReturned("got more than one cell"))?;
234 }
235
236 if directory_key.as_bytes() != row_cell.qualifier {
239 Err(Error::UnexpectedDataReturned("unexpected cell qualifier"))?
240 }
241
242 let got_digest = B3Digest::from(blake3::hash(&row_cell.value).as_bytes());
244 if got_digest != *digest {
245 Err(Error::DirectoryUnexpectedDigest)?
246 }
247
248 let directory_proto =
250 proto::Directory::decode(row_cell.value.as_slice()).map_err(Error::ProtobufDecode)?;
251 let directory = Directory::try_from(directory_proto).map_err(Error::DirectoryValidation)?;
252
253 Ok(Some(directory))
254 }
255
256 #[instrument(skip(self, directory), err, fields(directory.digest = %directory.digest(), instance_name=%self.instance_name))]
257 async fn put(&self, directory: Directory) -> Result<B3Digest, super::Error> {
258 let directory_digest = directory.digest();
259 let mut client = self.client.clone();
260 let directory_key = derive_directory_key(&directory_digest);
261
262 let data = proto::Directory::from(directory).encode_to_vec();
263 if data.len() as u64 > CELL_SIZE_LIMIT {
264 Err(Error::DirectoryTooBig)?;
265 }
266
267 let resp = client
268 .check_and_mutate_row(bigtable_v2::CheckAndMutateRowRequest {
269 table_name: client.get_full_table_name(&self.params.table_name),
270 app_profile_id: self.params.app_profile_id.to_string(),
271 authorized_view_name: "".to_string(),
272 row_key: directory_key.clone().into(),
273 predicate_filter: Some(bigtable_v2::RowFilter {
274 filter: Some(bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
275 directory_key.clone().into(),
276 )),
277 }),
278 true_mutations: vec![],
280 false_mutations: vec![
282 bigtable_v2::Mutation {
284 mutation: Some(bigtable_v2::mutation::Mutation::SetCell(
285 bigtable_v2::mutation::SetCell {
286 family_name: self.params.family_name.to_string(),
287 column_qualifier: directory_key.clone().into(),
288 timestamp_micros: -1, value: data,
290 },
291 )),
292 },
293 ],
294 })
295 .await
296 .map_err(|e| Error::BigTable {
297 msg: "mutating rows",
298 source: e,
299 })?;
300
301 if resp.predicate_matched {
302 trace!("already existed")
303 }
304
305 Ok(directory_digest)
306 }
307
308 #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name=%self.instance_name))]
309 fn get_recursive(
310 &self,
311 root_directory_digest: &B3Digest,
312 ) -> BoxStream<'static, Result<Directory, super::Error>> {
313 let svc = self.clone();
314 super::traversal::root_to_leaves(*root_directory_digest, move |digest| {
315 let svc = svc.clone();
316 async move { svc.get(&digest).await }
317 })
318 .map_err(Error::DirectoryTraversal)
319 .err_into()
320 .boxed()
321 }
322
323 #[instrument(skip_all, fields(instance_name=%self.instance_name))]
324 fn put_multiple_start(&self) -> Box<dyn DirectoryPutter + '_> {
325 Box::new(SimplePutter::new(self))
326 }
327}
328
329#[derive(thiserror::Error, Debug)]
330pub enum Error {
331 #[error("wrong arguments: {0}")]
332 WrongConfig(&'static str),
333 #[error("serde-qs error: {0}")]
334 SerdeQS(#[from] serde_qs::Error),
335
336 #[error("failed to decode protobuf: {0}")]
337 ProtobufDecode(#[from] prost::DecodeError),
338 #[error("failed to validate directory: {0}")]
339 DirectoryValidation(#[from] crate::DirectoryError),
340 #[error("Directory has unexpected digest")]
341 DirectoryUnexpectedDigest,
342 #[error("Directory exceeds cell limit on Bigtable")]
343 DirectoryTooBig,
344
345 #[error("failure during directory traversal")]
346 DirectoryTraversal(#[source] traversal::Error),
347
348 #[error("bigtable returned unexpected data: {0}")]
351 UnexpectedDataReturned(&'static str),
352 #[error("bigtable error occurred while {msg}: {source}")]
353 BigTable {
354 msg: &'static str,
355 #[source]
356 source: bigtable::Error,
357 },
358}
359
360impl From<Error> for super::Error {
361 fn from(value: Error) -> Self {
362 Self(Box::new(value))
363 }
364}
365
366#[serde_as]
370#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
371#[serde(deny_unknown_fields)]
372pub struct BigtableParameters {
373 project_id: String,
374 instance_name: String,
375 #[serde(default)]
376 is_read_only: bool,
377 #[serde(default = "default_channel_size")]
378 channel_size: usize,
379
380 #[serde_as(as = "Option<DurationSeconds<String>>")]
381 #[serde(default = "default_timeout")]
382 timeout: Option<std::time::Duration>,
383 table_name: String,
384 family_name: String,
385
386 #[serde(default = "default_app_profile_id")]
387 app_profile_id: String,
388}
389
/// Serde default for `BigtableParameters::app_profile_id`.
fn default_app_profile_id() -> String {
    String::from("default")
}
393
/// Serde default for `BigtableParameters::channel_size`.
fn default_channel_size() -> usize {
    const DEFAULT_CHANNEL_SIZE: usize = 4;
    DEFAULT_CHANNEL_SIZE
}
397
/// Serde default for `BigtableParameters::timeout`: four seconds.
fn default_timeout() -> Option<std::time::Duration> {
    let four_seconds = std::time::Duration::new(4, 0);
    Some(four_seconds)
}
401
402#[async_trait]
403impl ServiceBuilder for BigtableParameters {
404 type Output = dyn DirectoryService;
405 async fn build<'a>(
406 &'a self,
407 instance_name: &str,
408 _context: &CompositionContext,
409 ) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync>> {
410 Ok(Arc::new(
411 BigtableDirectoryService::connect(instance_name.to_string(), self.clone()).await?,
412 ))
413 }
414}
415
416impl TryFrom<url::Url> for BigtableParameters {
417 type Error = Box<dyn std::error::Error + Send + Sync>;
418 fn try_from(mut url: url::Url) -> Result<Self, Self::Error> {
419 let instance_name = url
421 .host_str()
422 .ok_or_else(|| Error::WrongConfig("instance name missing"))?
423 .to_owned();
424
425 url.query_pairs_mut()
427 .append_pair("instance_name", &instance_name);
428
429 let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default())?;
430
431 Ok(params)
432 }
433}