snix_castore/directoryservice/
bigtable.rs1use bigtable_rs::{bigtable, google::bigtable::v2 as bigtable_v2};
2use bytes::Bytes;
3use data_encoding::HEXLOWER;
4use futures::stream::BoxStream;
5use prost::Message;
6use serde::{Deserialize, Serialize};
7use serde_with::{DurationSeconds, serde_as};
8use std::sync::Arc;
9use tonic::async_trait;
10use tracing::{instrument, trace, warn};
11
12use super::{
13 Directory, DirectoryPutter, DirectoryService, SimplePutter, utils::traverse_directory,
14};
15use crate::composition::{CompositionContext, ServiceBuilder};
16use crate::{B3Digest, Error, proto};
17
/// Upper bound on the serialized size of a Directory we're willing to store
/// in a single Bigtable cell (10 MiB); `put` rejects larger messages with a
/// clear error instead of letting the write fail server-side.
const CELL_SIZE_LIMIT: u64 = 10 * 1024 * 1024;
21
/// [DirectoryService] implementation backed by a Bigtable table.
/// Each Directory is stored as one row, keyed by the lowercase-hex blake3
/// digest of its canonical proto serialization (see `derive_directory_key`);
/// `get` re-verifies that digest on read.
#[derive(Clone)]
pub struct BigtableDirectoryService {
    // Instance name of this service, used in tracing span fields.
    instance_name: String,
    // Bigtable client handle; cloned per request in `get`/`put`.
    client: bigtable::BigTable,
    // Connection/table parameters this service was constructed with.
    params: BigtableParameters,

    // Keeps the cbtemulator tempdir and child process alive while any clone
    // of this service exists; the child is spawned with kill_on_drop, so it
    // is terminated when the last Arc is dropped.
    #[cfg(test)]
    #[allow(dead_code)]
    emulator: std::sync::Arc<(tempfile::TempDir, async_process::Child)>,
}
50
impl BigtableDirectoryService {
    /// Connect to a real Bigtable instance described by `params`.
    /// `instance_name` is only used for tracing span fields.
    #[cfg(not(test))]
    pub async fn connect(
        instance_name: String,
        params: BigtableParameters,
    ) -> Result<Self, bigtable::Error> {
        let connection = bigtable::BigTableConnection::new(
            &params.project_id,
            &params.instance_name,
            params.is_read_only,
            params.channel_size,
            params.timeout,
        )
        .await?;

        Ok(Self {
            instance_name,
            client: connection.client(),
            params,
        })
    }

    /// Test variant: spawn a local `cbtemulator` listening on a unix socket
    /// in a fresh tempdir, create the configured table and column family via
    /// the `cbt` CLI, then connect to the emulator. Requires `cbtemulator`
    /// and `cbt` binaries on PATH; panics (it's test-only) if setup fails.
    #[cfg(test)]
    pub async fn connect(
        instance_name: String,
        params: BigtableParameters,
    ) -> Result<Self, bigtable::Error> {
        use std::time::Duration;

        use async_process::{Command, Stdio};
        use tempfile::TempDir;
        use tokio_retry::{Retry, strategy::ExponentialBackoff};

        let tmpdir = TempDir::new().unwrap();

        let socket_path = tmpdir.path().join("cbtemulator.sock");

        // kill_on_drop ensures the emulator dies when `self.emulator` is
        // dropped; stdout/stderr are piped so it doesn't spam test output.
        let emulator_process = Command::new("cbtemulator")
            .arg("-address")
            .arg(socket_path.clone())
            .stderr(Stdio::piped())
            .stdout(Stdio::piped())
            .kill_on_drop(true)
            .spawn()
            .expect("failed to spawn emulator");

        // Poll (with exponential backoff) until the emulator has created its
        // listening socket.
        Retry::spawn(
            ExponentialBackoff::from_millis(20)
                .max_delay(Duration::from_secs(1))
                .take(3),
            || async {
                if socket_path.exists() {
                    Ok(())
                } else {
                    Err(())
                }
            },
        )
        .await
        .expect("failed to wait for socket");

        // Use the `cbt` CLI, pointed at the emulator socket via
        // BIGTABLE_EMULATOR_HOST, to create the table and column family the
        // service expects.
        for cmd in &[
            vec!["createtable", &params.table_name],
            vec!["createfamily", &params.table_name, &params.family_name],
        ] {
            Command::new("cbt")
                .args({
                    let mut args = vec![
                        "-instance",
                        &params.instance_name,
                        "-project",
                        &params.project_id,
                    ];
                    args.extend_from_slice(cmd);
                    args
                })
                .env(
                    "BIGTABLE_EMULATOR_HOST",
                    format!("unix://{}", socket_path.to_string_lossy()),
                )
                .output()
                .await
                .expect("failed to run cbt setup command");
        }

        let connection = bigtable_rs::bigtable::BigTableConnection::new_with_emulator(
            &format!("unix://{}", socket_path.to_string_lossy()),
            &params.project_id,
            &params.instance_name,
            params.is_read_only,
            params.timeout,
        )?;

        Ok(Self {
            instance_name,
            client: connection.client(),
            params,
            // Keep tempdir and emulator process alive alongside the service.
            emulator: (tmpdir, emulator_process).into(),
        })
    }
}
153
154fn derive_directory_key(digest: &B3Digest) -> String {
157 HEXLOWER.encode(digest.as_slice())
158}
159
#[async_trait]
impl DirectoryService for BigtableDirectoryService {
    /// Look up a single Directory by digest. Returns `Ok(None)` if the row
    /// doesn't exist; validates row key, cell qualifier and content digest
    /// before decoding.
    #[instrument(skip(self, digest), err, fields(directory.digest = %digest, instance_name=%self.instance_name))]
    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
        let mut client = self.client.clone();
        let directory_key = derive_directory_key(digest);

        let request = bigtable_v2::ReadRowsRequest {
            app_profile_id: self.params.app_profile_id.to_string(),
            table_name: client.get_full_table_name(&self.params.table_name),
            rows_limit: 1,
            rows: Some(bigtable_v2::RowSet {
                row_keys: vec![directory_key.clone().into()],
                row_ranges: vec![],
            }),
            // Filter to cells in our column family whose qualifier matches
            // the row key (the key is hex, so it's regex-safe).
            filter: Some(bigtable_v2::RowFilter {
                filter: Some(bigtable_v2::row_filter::Filter::Chain(
                    bigtable_v2::row_filter::Chain {
                        filters: vec![
                            bigtable_v2::RowFilter {
                                filter: Some(
                                    bigtable_v2::row_filter::Filter::FamilyNameRegexFilter(
                                        self.params.family_name.to_string(),
                                    ),
                                ),
                            },
                            bigtable_v2::RowFilter {
                                filter: Some(
                                    bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
                                        directory_key.clone().into(),
                                    ),
                                ),
                            },
                        ],
                    },
                )),
            }),
            ..Default::default()
        };

        let mut response = client
            .read_rows(request)
            .await
            .map_err(|e| Error::StorageError(format!("unable to read rows: {e}")))?;

        if response.len() != 1 {
            if response.len() > 1 {
                // Shouldn't happen: rows_limit is 1.
                return Err(Error::StorageError(
                    "got more than one row from bigtable".into(),
                ));
            }
            // Zero rows: the directory simply isn't stored.
            return Ok(None);
        }

        let (row_key, mut row_cells) = response.pop().unwrap();
        if row_key != directory_key.as_bytes() {
            // Shouldn't happen: we requested exactly this row key.
            return Err(Error::StorageError(
                "got wrong row key from bigtable".into(),
            ));
        }

        let row_cell = row_cells
            .pop()
            .ok_or_else(|| Error::StorageError("found no cells".into()))?;

        // Ensure we didn't silently discard data by popping only the last
        // cell: exactly one cell must have matched the filter.
        if !row_cells.is_empty() {
            return Err(Error::StorageError(
                "more than one cell returned from bigtable".into(),
            ));
        }

        // The qualifier is expected to equal the row key (that's how `put`
        // writes cells).
        if directory_key.as_bytes() != row_cell.qualifier {
            return Err(Error::StorageError("unexpected cell qualifier".into()));
        }

        // Verify the stored bytes actually hash to the requested digest
        // before trusting them.
        let got_digest = B3Digest::from(blake3::hash(&row_cell.value).as_bytes());
        if got_digest != *digest {
            return Err(Error::StorageError(format!("invalid digest: {got_digest}")));
        }

        // Decode the proto and convert into the validated Directory type.
        let directory = proto::Directory::decode(Bytes::from(row_cell.value))
            .map_err(|e| Error::StorageError(format!("unable to decode directory proto: {e}")))?
            .try_into()
            .map_err(|e| Error::StorageError(format!("invalid Directory message: {e}")))?;

        Ok(Some(directory))
    }

    /// Store a Directory, keyed by its digest. Idempotent: uses a
    /// check-and-mutate so an already-present row is left untouched.
    #[instrument(skip(self, directory), err, fields(directory.digest = %directory.digest(), instance_name=%self.instance_name))]
    async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
        let directory_digest = directory.digest();
        let mut client = self.client.clone();
        let directory_key = derive_directory_key(&directory_digest);

        // Refuse to store anything that wouldn't fit into a single cell.
        let data = proto::Directory::from(directory).encode_to_vec();
        if data.len() as u64 > CELL_SIZE_LIMIT {
            return Err(Error::StorageError(
                "Directory exceeds cell limit on Bigtable".into(),
            ));
        }

        let resp = client
            .check_and_mutate_row(bigtable_v2::CheckAndMutateRowRequest {
                table_name: client.get_full_table_name(&self.params.table_name),
                app_profile_id: self.params.app_profile_id.to_string(),
                authorized_view_name: "".to_string(),
                row_key: directory_key.clone().into(),
                // Predicate: a cell with our qualifier already exists.
                predicate_filter: Some(bigtable_v2::RowFilter {
                    filter: Some(bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
                        directory_key.clone().into(),
                    )),
                }),
                // If it already exists, do nothing…
                true_mutations: vec![],
                // …otherwise write the cell.
                false_mutations: vec![
                    bigtable_v2::Mutation {
                        mutation: Some(bigtable_v2::mutation::Mutation::SetCell(
                            bigtable_v2::mutation::SetCell {
                                family_name: self.params.family_name.to_string(),
                                column_qualifier: directory_key.clone().into(),
                                // -1 lets the Bigtable server assign the timestamp.
                                timestamp_micros: -1, value: data,
                            },
                        )),
                    },
                ],
            })
            .await
            .map_err(|e| Error::StorageError(format!("unable to mutate rows: {e}")))?;

        if resp.predicate_matched {
            trace!("already existed")
        }

        Ok(directory_digest)
    }

    #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name=%self.instance_name))]
    fn get_recursive(
        &self,
        root_directory_digest: &B3Digest,
    ) -> BoxStream<'static, Result<Directory, Error>> {
        // Delegate to the shared traversal helper, which repeatedly calls
        // `self.get`.
        traverse_directory(self.clone(), root_directory_digest)
    }

    #[instrument(skip_all, fields(instance_name=%self.instance_name))]
    fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + '_)> {
        // No batching support here; SimplePutter uploads one `put` at a time.
        Box::new(SimplePutter::new(self))
    }
}
323
/// Parameters describing how to connect to a Bigtable instance and which
/// table/family to use. Deserialized from service configuration, or from a
/// URL query string via the `TryFrom<url::Url>` impl below.
#[serde_as]
#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct BigtableParameters {
    project_id: String,
    instance_name: String,
    // Defaults to false (read-write).
    #[serde(default)]
    is_read_only: bool,
    #[serde(default = "default_channel_size")]
    channel_size: usize,

    // Timeout is (de)serialized as a whole number of seconds in a string;
    // defaults to 4 seconds (see `default_timeout`).
    #[serde_as(as = "Option<DurationSeconds<String>>")]
    #[serde(default = "default_timeout")]
    timeout: Option<std::time::Duration>,
    table_name: String,
    family_name: String,

    #[serde(default = "default_app_profile_id")]
    app_profile_id: String,
}
347
348#[async_trait]
349impl ServiceBuilder for BigtableParameters {
350 type Output = dyn DirectoryService;
351 async fn build<'a>(
352 &'a self,
353 instance_name: &str,
354 _context: &CompositionContext,
355 ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync>> {
356 Ok(Arc::new(
357 BigtableDirectoryService::connect(instance_name.to_string(), self.clone()).await?,
358 ))
359 }
360}
361
362impl TryFrom<url::Url> for BigtableParameters {
363 type Error = Box<dyn std::error::Error + Send + Sync>;
364 fn try_from(mut url: url::Url) -> Result<Self, Self::Error> {
365 let instance_name = url
367 .host_str()
368 .ok_or_else(|| Error::StorageError("instance name missing".into()))?
369 .to_string();
370
371 url.query_pairs_mut()
373 .append_pair("instance_name", &instance_name);
374
375 let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default())
376 .map_err(|e| Error::InvalidRequest(format!("failed to parse parameters: {e}")))?;
377
378 Ok(params)
379 }
380}
381
/// Serde default for `BigtableParameters::app_profile_id`.
fn default_app_profile_id() -> String {
    String::from("default")
}
385
/// Serde default for `BigtableParameters::channel_size`.
fn default_channel_size() -> usize {
    4
}
389
/// Serde default for `BigtableParameters::timeout`: 4 seconds.
fn default_timeout() -> Option<std::time::Duration> {
    let four_seconds = std::time::Duration::from_secs(4);
    Some(four_seconds)
}