snix_castore/directoryservice/bigtable.rs

use bigtable_rs::{bigtable, google::bigtable::v2 as bigtable_v2};
use data_encoding::HEXLOWER;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use prost::Message;
use serde::{Deserialize, Serialize};
use serde_with::{DurationSeconds, serde_as};
use std::sync::Arc;
use tonic::async_trait;
use tracing::{instrument, trace, warn};

use super::{Directory, DirectoryPutter, DirectoryService, SimplePutter};
use crate::composition::{CompositionContext, ServiceBuilder};
use crate::directoryservice::traversal;
use crate::{B3Digest, proto};

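/// Maximum size (in bytes) of a directory we're willing to store in a single
/// Bigtable cell. Directories whose encoded size exceeds this are rejected
/// with [Error::DirectoryTooBig].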
const CELL_SIZE_LIMIT: u64 = 10 * 1024 * 1024;

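/// [DirectoryService] implementation backed by
/// [Bigtable](https://cloud.google.com/bigtable/docs/) (or a local
/// `cbtemulator` in tests). Directories are stored as individual cells, with
/// both the row key and the column qualifier set to the lowercase hex
/// representation of the directory's blake3 digest.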
#[derive(Clone)]
pub struct BigtableDirectoryService {
    instance_name: String,
    client: bigtable::BigTable,
    params: BigtableParameters,

    #[cfg(test)]
    #[allow(dead_code)]
    emulator: std::sync::Arc<(tempfile::TempDir, async_process::Child)>,
}

impl BigtableDirectoryService {
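    /// Establishes a connection to Bigtable using the given
    /// [BigtableParameters], keeping `instance_name` around for tracing.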
    #[cfg(not(test))]
    pub async fn connect(
        instance_name: String,
        params: BigtableParameters,
    ) -> Result<Self, bigtable::Error> {
        let connection = bigtable::BigTableConnection::new(
            &params.project_id,
            &params.instance_name,
            params.is_read_only,
            params.channel_size,
            params.timeout,
        )
        .await?;

        Ok(Self {
            instance_name,
            client: connection.client(),
            params,
        })
    }

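    /// Test-only variant of `connect`: spawns a local `cbtemulator` listening
    /// on a unix socket inside a temporary directory, creates the table and
    /// column family through the `cbt` CLI, and then connects to the emulator.
    /// The emulator process and tempdir are kept alive for the lifetime of the
    /// service.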
    #[cfg(test)]
    pub async fn connect(
        instance_name: String,
        params: BigtableParameters,
    ) -> Result<Self, bigtable::Error> {
        use std::time::Duration;

        use async_process::{Command, Stdio};
        use tempfile::TempDir;
        use tokio_retry::{Retry, strategy::ExponentialBackoff};

        let tmpdir = TempDir::new().unwrap();

        let socket_path = tmpdir.path().join("cbtemulator.sock");

        let emulator_process = Command::new("cbtemulator")
            .arg("-address")
            .arg(socket_path.clone())
            .stderr(Stdio::piped())
            .stdout(Stdio::piped())
            .kill_on_drop(true)
            .spawn()
            .expect("failed to spawn emulator");

        Retry::spawn(
            ExponentialBackoff::from_millis(20)
                .max_delay(Duration::from_secs(1))
                .take(3),
            || async {
                if socket_path.exists() {
                    Ok(())
                } else {
                    Err(())
                }
            },
        )
        .await
        .expect("failed to wait for socket");

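        // Create the table and column family via the `cbt` CLI, pointing it
        // at the emulator through BIGTABLE_EMULATOR_HOST.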
        for cmd in &[
            vec!["createtable", &params.table_name],
            vec!["createfamily", &params.table_name, &params.family_name],
        ] {
            Command::new("cbt")
                .args({
                    let mut args = vec![
                        "-instance",
                        &params.instance_name,
                        "-project",
                        &params.project_id,
                    ];
                    args.extend_from_slice(cmd);
                    args
                })
                .env(
                    "BIGTABLE_EMULATOR_HOST",
                    format!("unix://{}", socket_path.to_string_lossy()),
                )
                .output()
                .await
                .expect("failed to run cbt setup command");
        }

        let connection = bigtable_rs::bigtable::BigTableConnection::new_with_emulator(
            &format!("unix://{}", socket_path.to_string_lossy()),
            &params.project_id,
            &params.instance_name,
            params.is_read_only,
            params.timeout,
        )?;

        Ok(Self {
            instance_name,
            client: connection.client(),
            params,
            emulator: (tmpdir, emulator_process).into(),
        })
    }
}

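/// Derives the key (used as both row key and column qualifier) for a given
/// directory digest: the lowercase hex encoding of the digest.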
fn derive_directory_key(digest: &B3Digest) -> String {
    HEXLOWER.encode(digest.as_slice())
}

#[async_trait]
impl DirectoryService for BigtableDirectoryService {
    #[instrument(skip(self, digest), err, fields(directory.digest = %digest, instance_name=%self.instance_name))]
    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, super::Error> {
        let mut client = self.client.clone();
        let directory_key = derive_directory_key(digest);

        let request = bigtable_v2::ReadRowsRequest {
            app_profile_id: self.params.app_profile_id.to_string(),
            table_name: client.get_full_table_name(&self.params.table_name),
            rows_limit: 1,
            rows: Some(bigtable_v2::RowSet {
                row_keys: vec![directory_key.clone().into()],
                row_ranges: vec![],
            }),
            // Limit the response to cells in the configured column family,
            // with a column qualifier matching the directory key.
            filter: Some(bigtable_v2::RowFilter {
                filter: Some(bigtable_v2::row_filter::Filter::Chain(
                    bigtable_v2::row_filter::Chain {
                        filters: vec![
                            bigtable_v2::RowFilter {
                                filter: Some(
                                    bigtable_v2::row_filter::Filter::FamilyNameRegexFilter(
                                        self.params.family_name.to_string(),
                                    ),
                                ),
                            },
                            bigtable_v2::RowFilter {
                                filter: Some(
                                    bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
                                        directory_key.clone().into(),
                                    ),
                                ),
                            },
                        ],
                    },
                )),
            }),
            ..Default::default()
        };

        let mut response = client
            .read_rows(request)
            .await
            .map_err(|e| Error::BigTable {
                msg: "reading rows",
                source: e,
            })?;

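        // The request was limited to a single row: more than one row coming
        // back is unexpected, while zero rows simply means the directory is
        // not present.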
        if response.len() != 1 {
            if response.len() > 1 {
                Err(Error::UnexpectedDataReturned("got more than one row"))?
            }
            return Ok(None);
        }

        let (row_key, mut row_cells) = response.pop().unwrap();
        // Ensure the row key matches the one we requested.
        if row_key != directory_key.as_bytes() {
            Err(Error::UnexpectedDataReturned("got wrong row key"))?
        }

        let row_cell = row_cells
            .pop()
            .ok_or_else(|| Error::UnexpectedDataReturned("found no cells"))?;

        // There should be exactly one cell in the row.
        if !row_cells.is_empty() {
            Err(Error::UnexpectedDataReturned("got more than one cell"))?;
        }

        // The cell qualifier must also match the directory key.
        if directory_key.as_bytes() != row_cell.qualifier {
            Err(Error::UnexpectedDataReturned("unexpected cell qualifier"))?
        }

        // Re-hash the payload and ensure it matches the requested digest.
        let got_digest = B3Digest::from(blake3::hash(&row_cell.value).as_bytes());
        if got_digest != *digest {
            Err(Error::DirectoryUnexpectedDigest)?
        }

        // Decode the payload into a Directory message and validate it.
        let directory_proto =
            proto::Directory::decode(row_cell.value.as_slice()).map_err(Error::ProtobufDecode)?;
        let directory = Directory::try_from(directory_proto).map_err(Error::DirectoryValidation)?;

        Ok(Some(directory))
    }

    #[instrument(skip(self, directory), err, fields(directory.digest = %directory.digest(), instance_name=%self.instance_name))]
    async fn put(&self, directory: Directory) -> Result<B3Digest, super::Error> {
        let directory_digest = directory.digest();
        let mut client = self.client.clone();
        let directory_key = derive_directory_key(&directory_digest);

        let data = proto::Directory::from(directory).encode_to_vec();
        if data.len() as u64 > CELL_SIZE_LIMIT {
            Err(Error::DirectoryTooBig)?;
        }

        let resp = client
            .check_and_mutate_row(bigtable_v2::CheckAndMutateRowRequest {
                table_name: client.get_full_table_name(&self.params.table_name),
                app_profile_id: self.params.app_profile_id.to_string(),
                authorized_view_name: "".to_string(),
                row_key: directory_key.clone().into(),
                predicate_filter: Some(bigtable_v2::RowFilter {
                    filter: Some(bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
                        directory_key.clone().into(),
                    )),
                }),
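                // If the cell already exists (the predicate matches), apply no
                // mutation; otherwise write the directory into a new cell.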
                true_mutations: vec![],
                false_mutations: vec![
                    bigtable_v2::Mutation {
                        mutation: Some(bigtable_v2::mutation::Mutation::SetCell(
                            bigtable_v2::mutation::SetCell {
                                family_name: self.params.family_name.to_string(),
                                column_qualifier: directory_key.clone().into(),
                                // -1 instructs Bigtable to fill in the server-side timestamp.
                                timestamp_micros: -1,
                                value: data,
                            },
                        )),
                    },
                ],
            })
            .await
            .map_err(|e| Error::BigTable {
                msg: "mutating rows",
                source: e,
            })?;

        // If the predicate matched, the cell was already present and nothing was written.
        if resp.predicate_matched {
            trace!("already existed")
        }

        Ok(directory_digest)
    }

    #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name=%self.instance_name))]
    fn get_recursive(
        &self,
        root_directory_digest: &B3Digest,
    ) -> BoxStream<'static, Result<Directory, super::Error>> {
        let svc = self.clone();
        super::traversal::root_to_leaves(*root_directory_digest, move |digest| {
            let svc = svc.clone();
            async move { svc.get(&digest).await }
        })
        .map_err(|err| Box::new(Error::DirectoryTraversal(err)))
        .err_into()
        .boxed()
    }

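    // Directory uploads use the generic [SimplePutter] helper, which hands
    // each directory to `put` individually.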
    #[instrument(skip_all, fields(instance_name=%self.instance_name))]
    fn put_multiple_start(&self) -> Box<dyn DirectoryPutter + '_> {
        Box::new(SimplePutter::new(self))
    }
}

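/// Errors that can occur in this Bigtable-backed [DirectoryService].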
#[derive(thiserror::Error, Debug)]
pub enum Error {
    #[error("wrong arguments: {0}")]
    WrongConfig(&'static str),
    #[error("serde-qs error: {0}")]
    SerdeQS(#[from] serde_qs::Error),

    #[error("failed to decode protobuf: {0}")]
    ProtobufDecode(#[from] prost::DecodeError),
    #[error("failed to validate directory: {0}")]
    DirectoryValidation(#[from] crate::DirectoryError),
    #[error("Directory has unexpected digest")]
    DirectoryUnexpectedDigest,
    #[error("Directory exceeds cell limit on Bigtable")]
    DirectoryTooBig,

    #[error("failure during directory traversal")]
    DirectoryTraversal(#[source] traversal::Error),

    #[error("bigtable returned unexpected data: {0}")]
    UnexpectedDataReturned(&'static str),
    #[error("bigtable error occurred while {msg}: {source}")]
    BigTable {
        msg: &'static str,
        #[source]
        source: bigtable::Error,
    },
}

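/// Parameters describing which Bigtable project/instance/table to talk to and
/// how to connect. They are deserialized from the query string of a
/// configuration URL (see the `TryFrom<url::Url>` impl below); assuming the
/// service is registered under a `bigtable` URL scheme, a configuration could
/// look like (illustrative values only):
/// `bigtable://instance-1?project_id=project-1&table_name=directories&family_name=cf1`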
#[serde_as]
#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct BigtableParameters {
    project_id: String,
    instance_name: String,
    #[serde(default)]
    is_read_only: bool,
    #[serde(default = "default_channel_size")]
    channel_size: usize,

    #[serde_as(as = "Option<DurationSeconds<String>>")]
    #[serde(default = "default_timeout")]
    timeout: Option<std::time::Duration>,
    table_name: String,
    family_name: String,

    #[serde(default = "default_app_profile_id")]
    app_profile_id: String,
}

fn default_app_profile_id() -> String {
    "default".to_owned()
}

fn default_channel_size() -> usize {
    4
}

fn default_timeout() -> Option<std::time::Duration> {
    Some(std::time::Duration::from_secs(4))
}

#[async_trait]
impl ServiceBuilder for BigtableParameters {
    type Output = dyn DirectoryService;
    async fn build<'a>(
        &'a self,
        instance_name: &str,
        _context: &CompositionContext,
    ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync>> {
        Ok(Arc::new(
            BigtableDirectoryService::connect(instance_name.to_string(), self.clone()).await?,
        ))
    }
}

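/// Builds [BigtableParameters] from a URL: the host portion becomes
/// `instance_name`, all other fields are taken from the query string.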
impl TryFrom<url::Url> for BigtableParameters {
    type Error = Box<dyn std::error::Error + Send + Sync>;
    fn try_from(mut url: url::Url) -> Result<Self, Self::Error> {
        // The hostname of the URL is the instance name.
        let instance_name = url
            .host_str()
            .ok_or_else(|| Error::WrongConfig("instance name missing"))?
            .to_owned();

        // Push the instance name into the query string, so it is picked up
        // together with the remaining parameters by serde_qs.
        url.query_pairs_mut()
            .append_pair("instance_name", &instance_name);

        let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default())?;

        Ok(params)
    }
}