snix_castore/directoryservice/bigtable.rs

use bigtable_rs::{bigtable, google::bigtable::v2 as bigtable_v2};
use bytes::Bytes;
use data_encoding::HEXLOWER;
use futures::stream::BoxStream;
use prost::Message;
use serde::{Deserialize, Serialize};
use serde_with::{DurationSeconds, serde_as};
use std::sync::Arc;
use tonic::async_trait;
use tracing::{instrument, trace, warn};

use super::{
    Directory, DirectoryPutter, DirectoryService, SimplePutter, utils::traverse_directory,
};
use crate::composition::{CompositionContext, ServiceBuilder};
use crate::{B3Digest, Error, proto};

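// Bigtable recommends keeping individual cells below ~10 MiB, so we refuse
// to store directories larger than this.
// See https://cloud.google.com/bigtable/docs/schema-design#cells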
const CELL_SIZE_LIMIT: u64 = 10 * 1024 * 1024;

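/// DirectoryService implementation using Bigtable as a K/V store.
/// Each Directory proto is stored in a single cell, with both the row key and
/// the column qualifier set to the hex-encoded blake3 digest of the Directory.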
#[derive(Clone)]
pub struct BigtableDirectoryService {
    instance_name: String,
    client: bigtable::BigTable,
    params: BigtableParameters,

    // In tests, this keeps the temporary directory and the cbtemulator
    // process alive for the lifetime of the service; both are torn down
    // on drop.
    #[cfg(test)]
    #[allow(dead_code)]
    emulator: std::sync::Arc<(tempfile::TempDir, async_process::Child)>,
}

impl BigtableDirectoryService {
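    /// Connects to a real Bigtable instance using the given parameters.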
    #[cfg(not(test))]
    pub async fn connect(
        instance_name: String,
        params: BigtableParameters,
    ) -> Result<Self, bigtable::Error> {
        let connection = bigtable::BigTableConnection::new(
            &params.project_id,
            &params.instance_name,
            params.is_read_only,
            params.channel_size,
            params.timeout,
        )
        .await?;

        Ok(Self {
            instance_name,
            client: connection.client(),
            params,
        })
    }

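    /// In tests, spin up a local cbtemulator instance listening on a Unix
    /// socket, create the table and column family with `cbt`, and connect
    /// to that emulator instead of a real Bigtable instance.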
73 #[cfg(test)]
74 pub async fn connect(
75 instance_name: String,
76 params: BigtableParameters,
77 ) -> Result<Self, bigtable::Error> {
78 use std::time::Duration;
79
80 use async_process::{Command, Stdio};
81 use tempfile::TempDir;
82 use tokio_retry::{Retry, strategy::ExponentialBackoff};
83
84 let tmpdir = TempDir::new().unwrap();
85
86 let socket_path = tmpdir.path().join("cbtemulator.sock");
87
88 let emulator_process = Command::new("cbtemulator")
89 .arg("-address")
90 .arg(socket_path.clone())
91 .stderr(Stdio::piped())
92 .stdout(Stdio::piped())
93 .kill_on_drop(true)
94 .spawn()
95 .expect("failed to spawn emulator");
96
97 Retry::spawn(
98 ExponentialBackoff::from_millis(20)
99 .max_delay(Duration::from_secs(1))
100 .take(3),
101 || async {
102 if socket_path.exists() {
103 Ok(())
104 } else {
105 Err(())
106 }
107 },
108 )
109 .await
110 .expect("failed to wait for socket");
111
112 for cmd in &[
114 vec!["createtable", ¶ms.table_name],
115 vec!["createfamily", ¶ms.table_name, ¶ms.family_name],
116 ] {
117 Command::new("cbt")
118 .args({
119 let mut args = vec![
120 "-instance",
121 ¶ms.instance_name,
122 "-project",
123 ¶ms.project_id,
124 ];
125 args.extend_from_slice(cmd);
126 args
127 })
128 .env(
129 "BIGTABLE_EMULATOR_HOST",
130 format!("unix://{}", socket_path.to_string_lossy()),
131 )
132 .output()
133 .await
134 .expect("failed to run cbt setup command");
135 }
136
137 let connection = bigtable_rs::bigtable::BigTableConnection::new_with_emulator(
138 &format!("unix://{}", socket_path.to_string_lossy()),
139 ¶ms.project_id,
140 ¶ms.instance_name,
141 params.is_read_only,
142 params.timeout,
143 )?;
144
145 Ok(Self {
146 instance_name,
147 client: connection.client(),
148 params,
149 emulator: (tmpdir, emulator_process).into(),
150 })
151 }
152}
153
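/// Derives the Bigtable row/column key for a given directory digest, by
/// hex-encoding it. Hex output is safe to embed verbatim in the regex-based
/// row filters used below.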
fn derive_directory_key(digest: &B3Digest) -> String {
    HEXLOWER.encode(digest.as_slice())
}

#[async_trait]
impl DirectoryService for BigtableDirectoryService {
    #[instrument(skip(self, digest), err, fields(directory.digest = %digest, instance_name=%self.instance_name))]
    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
        let mut client = self.client.clone();
        let directory_key = derive_directory_key(digest);

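        // Fetch at most one row, restricted to the configured column family
        // and to the exact column qualifier matching the directory key.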
        let request = bigtable_v2::ReadRowsRequest {
            app_profile_id: self.params.app_profile_id.to_string(),
            table_name: client.get_full_table_name(&self.params.table_name),
            rows_limit: 1,
            rows: Some(bigtable_v2::RowSet {
                row_keys: vec![directory_key.clone().into()],
                row_ranges: vec![],
            }),
            filter: Some(bigtable_v2::RowFilter {
                filter: Some(bigtable_v2::row_filter::Filter::Chain(
                    bigtable_v2::row_filter::Chain {
                        filters: vec![
                            bigtable_v2::RowFilter {
                                filter: Some(
                                    bigtable_v2::row_filter::Filter::FamilyNameRegexFilter(
                                        self.params.family_name.to_string(),
                                    ),
                                ),
                            },
                            bigtable_v2::RowFilter {
                                filter: Some(
                                    bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
                                        directory_key.clone().into(),
                                    ),
                                ),
                            },
                        ],
                    },
                )),
            }),
            ..Default::default()
        };

        let mut response = client
            .read_rows(request)
            .await
            .map_err(|e| Error::StorageError(format!("unable to read rows: {}", e)))?;

        if response.len() != 1 {
            if response.len() > 1 {
                // This shouldn't happen, we limit the number of rows to 1.
                return Err(Error::StorageError(
                    "got more than one row from bigtable".into(),
                ));
            }
            // The row was simply not found.
            return Ok(None);
        }

        let (row_key, mut row_cells) = response.pop().unwrap();
        if row_key != directory_key.as_bytes() {
            // This shouldn't happen, we requested exactly this row key.
            return Err(Error::StorageError(
                "got wrong row key from bigtable".into(),
            ));
        }

        let row_cell = row_cells
            .pop()
            .ok_or_else(|| Error::StorageError("found no cells".into()))?;

        // Ensure there was only one cell (so none is left after the pop() above).
        if !row_cells.is_empty() {
            return Err(Error::StorageError(
                "more than one cell returned from bigtable".into(),
            ));
        }

        // The cell qualifier must match the key we requested.
        if directory_key.as_bytes() != row_cell.qualifier {
            return Err(Error::StorageError("unexpected cell qualifier".into()));
        }

        // Validate that the stored bytes hash back to the requested digest,
        // to detect corruption.
        let got_digest = B3Digest::from(blake3::hash(&row_cell.value).as_bytes());
        if got_digest != *digest {
            return Err(Error::StorageError(format!(
                "invalid digest: {}",
                got_digest
            )));
        }

        let directory = proto::Directory::decode(Bytes::from(row_cell.value))
            .map_err(|e| Error::StorageError(format!("unable to decode directory proto: {}", e)))?
            .try_into()
            .map_err(|e| Error::StorageError(format!("invalid Directory message: {}", e)))?;

        Ok(Some(directory))
    }

    #[instrument(skip(self, directory), err, fields(directory.digest = %directory.digest(), instance_name=%self.instance_name))]
    async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
        let directory_digest = directory.digest();
        let mut client = self.client.clone();
        let directory_key = derive_directory_key(&directory_digest);

        let data = proto::Directory::from(directory).encode_to_vec();
        if data.len() as u64 > CELL_SIZE_LIMIT {
            return Err(Error::StorageError(
                "Directory exceeds cell limit on Bigtable".into(),
            ));
        }

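        // Only write if the cell doesn't already exist: the predicate filter
        // checks for the column qualifier, true_mutations stays empty, and
        // the actual SetCell lives in false_mutations.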
        let resp = client
            .check_and_mutate_row(bigtable_v2::CheckAndMutateRowRequest {
                table_name: client.get_full_table_name(&self.params.table_name),
                app_profile_id: self.params.app_profile_id.to_string(),
                authorized_view_name: "".to_string(),
                row_key: directory_key.clone().into(),
                predicate_filter: Some(bigtable_v2::RowFilter {
                    filter: Some(bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
                        directory_key.clone().into(),
                    )),
                }),
                true_mutations: vec![],
                false_mutations: vec![
                    bigtable_v2::Mutation {
                        mutation: Some(bigtable_v2::mutation::Mutation::SetCell(
                            bigtable_v2::mutation::SetCell {
                                family_name: self.params.family_name.to_string(),
                                column_qualifier: directory_key.clone().into(),
                                timestamp_micros: -1, // use server time
                                value: data,
                            },
                        )),
                    },
                ],
            })
            .await
            .map_err(|e| Error::StorageError(format!("unable to mutate rows: {}", e)))?;

        if resp.predicate_matched {
            trace!("already existed")
        }

        Ok(directory_digest)
    }

    #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name=%self.instance_name))]
    fn get_recursive(
        &self,
        root_directory_digest: &B3Digest,
    ) -> BoxStream<'static, Result<Directory, Error>> {
        traverse_directory(self.clone(), root_directory_digest)
    }

    #[instrument(skip_all, fields(instance_name=%self.instance_name))]
    fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + '_)> {
        Box::new(SimplePutter::new(self))
    }
}

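/// Represents the configuration of a [BigtableDirectoryService]. This
/// currently conflates connection parameters with data model / client
/// behaviour parameters.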
#[serde_as]
#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct BigtableParameters {
    project_id: String,
    instance_name: String,
    #[serde(default)]
    is_read_only: bool,
    #[serde(default = "default_channel_size")]
    channel_size: usize,

    #[serde_as(as = "Option<DurationSeconds<String>>")]
    #[serde(default = "default_timeout")]
    timeout: Option<std::time::Duration>,
    table_name: String,
    family_name: String,

    #[serde(default = "default_app_profile_id")]
    app_profile_id: String,
}

#[async_trait]
impl ServiceBuilder for BigtableParameters {
    type Output = dyn DirectoryService;
    async fn build<'a>(
        &'a self,
        instance_name: &str,
        _context: &CompositionContext,
    ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync>> {
        Ok(Arc::new(
            BigtableDirectoryService::connect(instance_name.to_string(), self.clone()).await?,
        ))
    }
}

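// Parses parameters from a URL: the host is used as the instance name, and
// all remaining parameters are deserialized from the query string, e.g.
// (hypothetical values) ?project_id=my-project&table_name=directories&family_name=cf1.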
impl TryFrom<url::Url> for BigtableParameters {
    type Error = Box<dyn std::error::Error + Send + Sync>;
    fn try_from(mut url: url::Url) -> Result<Self, Self::Error> {
        // Extract the instance name from the URL host.
        let instance_name = url
            .host_str()
            .ok_or_else(|| Error::StorageError("instance name missing".into()))?
            .to_string();

        // Append the parsed instance name to the query string, so it ends up
        // in the deserialized parameters too.
        url.query_pairs_mut()
            .append_pair("instance_name", &instance_name);

        let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default())
            .map_err(|e| Error::InvalidRequest(format!("failed to parse parameters: {}", e)))?;

        Ok(params)
    }
}

fn default_app_profile_id() -> String {
    "default".to_owned()
}

fn default_channel_size() -> usize {
    4
}

fn default_timeout() -> Option<std::time::Duration> {
    Some(std::time::Duration::from_secs(4))
}