// snix_castore/directoryservice/bigtable.rs
use bigtable_rs::{bigtable, google::bigtable::v2 as bigtable_v2};
2use data_encoding::HEXLOWER;
3use futures::stream::BoxStream;
4use futures::{StreamExt, TryStreamExt};
5use prost::Message;
6use serde::{Deserialize, Serialize};
7use serde_with::{DurationSeconds, serde_as};
8use std::sync::Arc;
9use tonic::async_trait;
10use tracing::{instrument, trace, warn};
11
12use super::{Directory, DirectoryPutter, DirectoryService, SimplePutter};
13use crate::composition::{CompositionContext, ServiceBuilder};
14use crate::directoryservice::traversal;
15use crate::{B3Digest, proto};
16
/// Maximum size (in bytes) of a serialized [Directory] we are willing to
/// store in a single Bigtable cell (10 MiB); [DirectoryService::put] rejects
/// anything larger with [Error::DirectoryTooBig].
const CELL_SIZE_LIMIT: u64 = 10 * 1024 * 1024;
20
/// [DirectoryService] implementation backed by a Google Bigtable table.
///
/// Directories are stored keyed by their blake3 digest, rendered as
/// lowercase hex (see [derive_directory_key]); the row/cell layout is
/// documented on the `DirectoryService` impl below.
#[derive(Clone)]
pub struct BigtableDirectoryService {
    // Name of this service instance; only used in tracing span fields.
    instance_name: String,
    // Cloneable Bigtable client handle; cloned per request.
    client: bigtable::BigTable,
    // Connection/table parameters this service was built with.
    params: BigtableParameters,

    // In tests, keeps the cbtemulator tempdir and child process alive for as
    // long as any clone of the service exists (the process is spawned with
    // `kill_on_drop`, so dropping the last Arc tears the emulator down).
    #[cfg(test)]
    #[allow(dead_code)]
    emulator: std::sync::Arc<(tempfile::TempDir, async_process::Child)>,
}
49
50impl BigtableDirectoryService {
51 #[cfg(not(test))]
52 pub async fn connect(
53 instance_name: String,
54 params: BigtableParameters,
55 ) -> Result<Self, bigtable::Error> {
56 let connection = bigtable::BigTableConnection::new(
57 ¶ms.project_id,
58 ¶ms.instance_name,
59 params.is_read_only,
60 params.channel_size,
61 params.timeout,
62 )
63 .await?;
64
65 Ok(Self {
66 instance_name,
67 client: connection.client(),
68 params,
69 })
70 }
71
72 #[cfg(test)]
73 pub async fn connect(
74 instance_name: String,
75 params: BigtableParameters,
76 ) -> Result<Self, bigtable::Error> {
77 use std::time::Duration;
78
79 use async_process::{Command, Stdio};
80 use tempfile::TempDir;
81 use tokio_retry::{Retry, strategy::ExponentialBackoff};
82
83 let tmpdir = TempDir::new().unwrap();
84
85 let socket_path = tmpdir.path().join("cbtemulator.sock");
86
87 let emulator_process = Command::new("cbtemulator")
88 .arg("-address")
89 .arg(socket_path.clone())
90 .stderr(Stdio::piped())
91 .stdout(Stdio::piped())
92 .kill_on_drop(true)
93 .spawn()
94 .expect("failed to spawn emulator");
95
96 Retry::spawn(
97 ExponentialBackoff::from_millis(20)
98 .max_delay(Duration::from_secs(1))
99 .take(3),
100 || async {
101 if socket_path.exists() {
102 Ok(())
103 } else {
104 Err(())
105 }
106 },
107 )
108 .await
109 .expect("failed to wait for socket");
110
111 for cmd in &[
113 vec!["createtable", ¶ms.table_name],
114 vec!["createfamily", ¶ms.table_name, ¶ms.family_name],
115 ] {
116 Command::new("cbt")
117 .args({
118 let mut args = vec![
119 "-instance",
120 ¶ms.instance_name,
121 "-project",
122 ¶ms.project_id,
123 ];
124 args.extend_from_slice(cmd);
125 args
126 })
127 .env(
128 "BIGTABLE_EMULATOR_HOST",
129 format!("unix://{}", socket_path.to_string_lossy()),
130 )
131 .output()
132 .await
133 .expect("failed to run cbt setup command");
134 }
135
136 let connection = bigtable_rs::bigtable::BigTableConnection::new_with_emulator(
137 &format!("unix://{}", socket_path.to_string_lossy()),
138 ¶ms.project_id,
139 ¶ms.instance_name,
140 params.is_read_only,
141 1,
142 params.timeout,
143 )?;
144
145 Ok(Self {
146 instance_name,
147 client: connection.client(),
148 params,
149 emulator: (tmpdir, emulator_process).into(),
150 })
151 }
152}
153
154fn derive_directory_key(digest: &B3Digest) -> String {
157 HEXLOWER.encode(digest.as_slice())
158}
159
#[async_trait]
impl DirectoryService for BigtableDirectoryService {
    /// Looks up a single [Directory] by its digest.
    ///
    /// Reads at most one row, keyed by the lowercase-hex digest, filtered to
    /// the configured column family and a column qualifier equal to the same
    /// hex key. Returns `Ok(None)` if no row was found; any deviation from
    /// the expected single-row/single-cell layout is an error.
    #[instrument(skip(self, digest), err, fields(directory.digest = %digest, instance_name=%self.instance_name))]
    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, super::Error> {
        let mut client = self.client.clone();
        let directory_key = derive_directory_key(digest);

        let request = bigtable_v2::ReadRowsRequest {
            app_profile_id: self.params.app_profile_id.to_string(),
            table_name: client.get_full_table_name(&self.params.table_name),
            rows_limit: 1,
            rows: Some(bigtable_v2::RowSet {
                row_keys: vec![directory_key.clone().into()],
                row_ranges: vec![],
            }),
            // Chain two filters: family name and column qualifier. The hex
            // key only contains [0-9a-f], so using it verbatim as a regex
            // pattern is safe (no metacharacters).
            filter: Some(bigtable_v2::RowFilter {
                filter: Some(bigtable_v2::row_filter::Filter::Chain(
                    bigtable_v2::row_filter::Chain {
                        filters: vec![
                            bigtable_v2::RowFilter {
                                filter: Some(
                                    bigtable_v2::row_filter::Filter::FamilyNameRegexFilter(
                                        self.params.family_name.to_string(),
                                    ),
                                ),
                            },
                            bigtable_v2::RowFilter {
                                filter: Some(
                                    bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
                                        directory_key.clone().into(),
                                    ),
                                ),
                            },
                        ],
                    },
                )),
            }),
            ..Default::default()
        };

        let mut response = client
            .read_rows(request)
            .await
            .map_err(|e| Error::BigTable {
                msg: "reading rows",
                source: e,
            })?;

        // We asked for a single row key with rows_limit 1, so anything other
        // than zero or one row is unexpected.
        if response.len() != 1 {
            if response.len() > 1 {
                Err(Error::UnexpectedDataReturned("got more than one row"))?
            }
            return Ok(None);
        }

        let (row_key, mut row_cells) = response.pop().unwrap();
        if row_key != directory_key.as_bytes() {
            Err(Error::UnexpectedDataReturned("got wrong row key"))?
        }

        let row_cell = row_cells
            .pop()
            .ok_or_else(|| Error::UnexpectedDataReturned("found no cells"))?;

        // Exactly one cell per row is expected for our storage layout.
        if !row_cells.is_empty() {
            Err(Error::UnexpectedDataReturned("got more than one cell"))?;
        }

        // The cell qualifier must also match the digest key.
        if directory_key.as_bytes() != row_cell.qualifier {
            Err(Error::UnexpectedDataReturned("unexpected cell qualifier"))?
        }

        // Re-hash the payload and compare to the requested digest, guarding
        // against corrupted or mislabeled data in the table.
        let got_digest = B3Digest::from(blake3::hash(&row_cell.value).as_bytes());
        if got_digest != *digest {
            Err(Error::DirectoryUnexpectedDigest)?
        }

        // Decode the protobuf payload and validate it into a [Directory].
        let directory_proto =
            proto::Directory::decode(row_cell.value.as_slice()).map_err(Error::ProtobufDecode)?;
        let directory = Directory::try_from(directory_proto).map_err(Error::DirectoryValidation)?;

        Ok(Some(directory))
    }

    /// Stores a [Directory] keyed by its digest.
    ///
    /// Uses CheckAndMutateRow with an empty `true_mutations` list, so the
    /// cell is only written if no cell with the same qualifier exists yet —
    /// the key is content-addressed, so an existing cell already holds the
    /// same data.
    #[instrument(skip(self, directory), err, fields(directory.digest = %directory.digest(), instance_name=%self.instance_name))]
    async fn put(&self, directory: Directory) -> Result<B3Digest, super::Error> {
        let directory_digest = directory.digest();
        let mut client = self.client.clone();
        let directory_key = derive_directory_key(&directory_digest);

        let data = proto::Directory::from(directory).encode_to_vec();
        // Refuse payloads that exceed the per-cell size limit.
        if data.len() as u64 > CELL_SIZE_LIMIT {
            Err(Error::DirectoryTooBig)?;
        }

        let resp = client
            .check_and_mutate_row(bigtable_v2::CheckAndMutateRowRequest {
                table_name: client.get_full_table_name(&self.params.table_name),
                app_profile_id: self.params.app_profile_id.to_string(),
                authorized_view_name: "".to_string(),
                row_key: directory_key.clone().into(),
                // Predicate: a cell with this qualifier already exists.
                predicate_filter: Some(bigtable_v2::RowFilter {
                    filter: Some(bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
                        directory_key.clone().into(),
                    )),
                }),
                // Predicate matched → data already present, do nothing.
                true_mutations: vec![],
                // Predicate not matched → write the cell.
                false_mutations: vec![
                    bigtable_v2::Mutation {
                        mutation: Some(bigtable_v2::mutation::Mutation::SetCell(
                            bigtable_v2::mutation::SetCell {
                                family_name: self.params.family_name.to_string(),
                                column_qualifier: directory_key.clone().into(),
                                // NOTE(review): -1 presumably requests a
                                // server-assigned timestamp (Bigtable SetCell
                                // convention) — confirm against the API docs.
                                timestamp_micros: -1,
                                value: data,
                            },
                        )),
                    },
                ],
            })
            .await
            .map_err(|e| Error::BigTable {
                msg: "mutating rows",
                source: e,
            })?;

        if resp.predicate_matched {
            trace!("already existed")
        }

        Ok(directory_digest)
    }

    /// Streams the closure of `root_directory_digest`, root to leaves,
    /// fetching each directory through [Self::get].
    #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name=%self.instance_name))]
    fn get_recursive(
        &self,
        root_directory_digest: &B3Digest,
    ) -> BoxStream<'static, Result<Directory, super::Error>> {
        let svc = self.clone();
        super::traversal::root_to_leaves(*root_directory_digest, move |digest| {
            let svc = svc.clone();
            async move { svc.get(&digest).await }
        })
        .map_err(|err| Box::new(Error::DirectoryTraversal(err)))
        .err_into()
        .boxed()
    }

    /// Returns an uploader that simply calls [Self::put] per directory.
    #[instrument(skip_all, fields(instance_name=%self.instance_name))]
    fn put_multiple_start(&self) -> Box<dyn DirectoryPutter + '_> {
        Box::new(SimplePutter::new(self))
    }
}
327
/// Errors emitted by [BigtableDirectoryService].
///
/// NOTE(review): the trait methods above return `super::Error` and rely on
/// `?`-conversion from this type, so a matching `From`/`Into` impl must
/// exist elsewhere in the crate — it is not visible in this file.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// Invalid configuration, e.g. a malformed connection URL.
    #[error("wrong arguments: {0}")]
    WrongConfig(&'static str),
    /// Failure deserializing URL query parameters into [BigtableParameters].
    #[error("serde-qs error: {0}")]
    SerdeQS(#[from] serde_qs::Error),

    #[error("failed to decode protobuf: {0}")]
    ProtobufDecode(#[from] prost::DecodeError),
    #[error("failed to validate directory: {0}")]
    DirectoryValidation(#[from] crate::DirectoryError),
    /// Stored payload hashed to a different digest than the one requested.
    #[error("Directory has unexpected digest")]
    DirectoryUnexpectedDigest,
    /// Serialized directory exceeds [CELL_SIZE_LIMIT].
    #[error("Directory exceeds cell limit on Bigtable")]
    DirectoryTooBig,

    #[error("failure during directory traversal")]
    DirectoryTraversal(#[source] traversal::Error),

    /// Bigtable returned rows/cells violating our single-row, single-cell
    /// storage layout.
    #[error("bigtable returned unexpected data: {0}")]
    UnexpectedDataReturned(&'static str),
    /// Transport/API error from the Bigtable client, with a short label of
    /// the operation that failed.
    #[error("bigtable error occurred while {msg}: {source}")]
    BigTable {
        msg: &'static str,
        #[source]
        source: bigtable::Error,
    },
}
358
/// Connection and table parameters for [BigtableDirectoryService].
///
/// Deserialized from URL query parameters (see the `TryFrom<url::Url>` impl
/// below); unknown fields are rejected.
#[serde_as]
#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct BigtableParameters {
    project_id: String,
    instance_name: String,
    #[serde(default)]
    is_read_only: bool,
    // Number of gRPC channels to open; defaults to 4 (default_channel_size).
    #[serde(default = "default_channel_size")]
    channel_size: usize,

    // Request timeout, (de)serialized as whole seconds in a string;
    // defaults to 4s (default_timeout).
    #[serde_as(as = "Option<DurationSeconds<String>>")]
    #[serde(default = "default_timeout")]
    timeout: Option<std::time::Duration>,
    table_name: String,
    family_name: String,

    // Bigtable app profile id; defaults to "default".
    #[serde(default = "default_app_profile_id")]
    app_profile_id: String,
}
382
/// Default value for [BigtableParameters::app_profile_id].
fn default_app_profile_id() -> String {
    String::from("default")
}

/// Default value for [BigtableParameters::channel_size].
fn default_channel_size() -> usize {
    4
}

/// Default value for [BigtableParameters::timeout]: 4 seconds.
fn default_timeout() -> Option<std::time::Duration> {
    std::time::Duration::from_secs(4).into()
}
394
#[async_trait]
impl ServiceBuilder for BigtableParameters {
    type Output = dyn DirectoryService;

    /// Builds a [BigtableDirectoryService] from these parameters, opening
    /// the Bigtable connection (or, under `cfg(test)`, spawning the
    /// emulator) as part of construction.
    async fn build<'a>(
        &'a self,
        instance_name: &str,
        _context: &CompositionContext,
    ) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync>> {
        Ok(Arc::new(
            BigtableDirectoryService::connect(instance_name.to_string(), self.clone()).await?,
        ))
    }
}
408
409impl TryFrom<url::Url> for BigtableParameters {
410 type Error = Box<dyn std::error::Error + Send + Sync>;
411 fn try_from(mut url: url::Url) -> Result<Self, Self::Error> {
412 let instance_name = url
414 .host_str()
415 .ok_or_else(|| Error::WrongConfig("instance name missing"))?
416 .to_owned();
417
418 url.query_pairs_mut()
420 .append_pair("instance_name", &instance_name);
421
422 let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default())?;
423
424 Ok(params)
425 }
426}