bigtable_rs/bigtable.rs
1//! `bigtable` module provides a few convenient structs for calling Google Bigtable from Rust code.
2//!
3//!
4//! Example usage:
5//! ```rust,no_run
6//! use bigtable_rs::bigtable;
7//! use bigtable_rs::google::bigtable::v2::row_filter::{Chain, Filter};
8//! use bigtable_rs::google::bigtable::v2::row_range::{EndKey, StartKey};
9//! use bigtable_rs::google::bigtable::v2::{ReadRowsRequest, RowFilter, RowRange, RowSet};
10//! use env_logger;
11//! use std::error::Error;
12//! use std::time::Duration;
13//!
14//! #[tokio::main]
15//! async fn main() -> Result<(), Box<dyn Error>> {
16//! env_logger::init();
17//!
18//! let project_id = "project-1";
19//! let instance_name = "instance-1";
20//! let table_name = "table-1";
21//! let channel_size = 4;
22//! let timeout = Duration::from_secs(10);
23//!
24//! let key_start: String = "key1".to_owned();
25//! let key_end: String = "key4".to_owned();
26//!
27//! // make a bigtable client
28//! let connection = bigtable::BigTableConnection::new(
29//! project_id,
30//! instance_name,
31//! true,
32//! channel_size,
33//! Some(timeout),
34//! )
35//! .await?;
36//! let mut bigtable = connection.client();
37//!
38//! // prepare a ReadRowsRequest
39//! let request = ReadRowsRequest {
40//! app_profile_id: "default".to_owned(),
41//! table_name: bigtable.get_full_table_name(table_name),
42//! rows_limit: 10,
43//! rows: Some(RowSet {
44//! row_keys: vec![], // use this field to put keys for reading specific rows
45//! row_ranges: vec![RowRange {
46//! start_key: Some(StartKey::StartKeyClosed(key_start.into_bytes())),
47//! end_key: Some(EndKey::EndKeyOpen(key_end.into_bytes())),
48//! }],
49//! }),
50//! filter: Some(RowFilter {
51//! filter: Some(Filter::Chain(Chain {
52//! filters: vec![
53//! RowFilter {
54//! filter: Some(Filter::FamilyNameRegexFilter("cf1".to_owned())),
55//! },
56//! RowFilter {
57//! filter: Some(Filter::ColumnQualifierRegexFilter("c1".as_bytes().to_vec())),
58//! },
59//! RowFilter {
60//! filter: Some(Filter::CellsPerColumnLimitFilter(1)),
61//! },
62//! ],
63//! })),
64//! }),
65//! ..ReadRowsRequest::default()
66//! };
67//!
68//! // calling bigtable API to get results
69//! let response = bigtable.read_rows(request).await?;
70//!
71//! // simply print results for example usage
72//! response.into_iter().for_each(|(key, data)| {
73//! println!("------------\n{}", String::from_utf8(key.clone()).unwrap());
74//! data.into_iter().for_each(|row_cell| {
75//! println!(
76//! " [{}:{}] \"{}\" @ {}",
77//! row_cell.family_name,
78//! String::from_utf8(row_cell.qualifier).unwrap(),
79//! String::from_utf8(row_cell.value).unwrap(),
80//! row_cell.timestamp_micros
81//! )
82//! })
83//! });
84//!
85//! Ok(())
86//! }
87//! ```
88
89use std::sync::Arc;
90use std::time::Duration;
91
92use futures_util::Stream;
93use gcp_auth::TokenProvider;
94use log::info;
95use thiserror::Error;
96use tokio::net::UnixStream;
97use tonic::transport::Endpoint;
98use tonic::{codec::Streaming, transport::Channel, transport::ClientTlsConfig, Response};
99use tower::ServiceBuilder;
100
101use crate::auth_service::AuthSvc;
102use crate::bigtable::read_rows::{decode_read_rows_response, decode_read_rows_response_stream};
103use crate::google::bigtable::v2::{
104 bigtable_client::BigtableClient, MutateRowRequest, MutateRowResponse, MutateRowsRequest,
105 MutateRowsResponse, ReadRowsRequest, RowSet, SampleRowKeysRequest, SampleRowKeysResponse,
106};
107use crate::google::bigtable::v2::{CheckAndMutateRowRequest, CheckAndMutateRowResponse};
108use crate::{root_ca_certificate, util::get_row_range_from_prefix};
109
/// Decoding helpers for `ReadRows` responses; this module uses its
/// `decode_read_rows_response` and `decode_read_rows_response_stream` functions.
pub mod read_rows;
111
/// Raw bytes of a row key (an alias for `Vec<u8>`).
type RowKey = Vec<u8>;
/// Convenience `Result` alias whose error type is fixed to this module's [`Error`].
type Result<T> = std::result::Result<T, Error>;
116
/// The content of a single cell read from a row.
///
/// The read methods of this module return rows as `(RowKey, Vec<RowCell>)` pairs, i.e. a row
/// key together with the cells read for that row.
#[derive(Debug)]
pub struct RowCell {
    /// Name of the column family the cell was read from.
    pub family_name: String,
    /// Column qualifier as raw bytes; not guaranteed to be valid UTF-8.
    pub qualifier: Vec<u8>,
    /// Cell value as raw bytes; not guaranteed to be valid UTF-8.
    pub value: Vec<u8>,
    /// Timestamp of the cell. NOTE(review): presumably microseconds, as the name suggests —
    /// confirm against the decoder in `read_rows`.
    pub timestamp_micros: i64,
    /// Labels attached to the cell. NOTE(review): presumably the labels applied by read
    /// filters — confirm against the decoder in `read_rows`.
    pub labels: Vec<String>,
}
126
127/// Error types the client may have
128#[derive(Debug, Error)]
129pub enum Error {
130 #[error("AccessToken error: {0}")]
131 AccessTokenError(String),
132
133 #[error("Certificate error: {0}")]
134 CertificateError(String),
135
136 #[error("I/O Error: {0}")]
137 IoError(std::io::Error),
138
139 #[error("Transport error: {0}")]
140 TransportError(tonic::transport::Error),
141
142 #[error("Chunk error")]
143 ChunkError(String),
144
145 #[error("Row not found")]
146 RowNotFound,
147
148 #[error("Row write failed")]
149 RowWriteFailed,
150
151 #[error("Object not found: {0}")]
152 ObjectNotFound(String),
153
154 #[error("Object is corrupt: {0}")]
155 ObjectCorrupt(String),
156
157 #[error("RPC error: {0}")]
158 RpcError(tonic::Status),
159
160 #[error("Timeout error after {0} seconds")]
161 TimeoutError(u64),
162
163 #[error("GCPAuthError error: {0}")]
164 GCPAuthError(#[from] gcp_auth::Error),
165}
166
167impl std::convert::From<std::io::Error> for Error {
168 fn from(err: std::io::Error) -> Self {
169 Self::IoError(err)
170 }
171}
172
173impl std::convert::From<tonic::transport::Error> for Error {
174 fn from(err: tonic::transport::Error) -> Self {
175 Self::TransportError(err)
176 }
177}
178
179impl std::convert::From<tonic::Status> for Error {
180 fn from(err: tonic::Status) -> Self {
181 Self::RpcError(err)
182 }
183}
184
/// Holds an initiated Bigtable connection; [`BigTable`] clients are then made from it via
/// [`BigTableConnection::client`].
#[derive(Clone)]
pub struct BigTableConnection {
    // gRPC client over the (optionally authenticated) channel; cloned into every `BigTable`.
    client: BigtableClient<AuthSvc>,
    // `projects/{project}/instances/{instance}/tables/` prefix used to build full table names.
    table_prefix: Arc<String>,
    // Timeout given to the constructor; shared with every client made from this connection.
    timeout: Arc<Option<Duration>>,
}
192
193impl BigTableConnection {
194 /// Establish a connection to the BigTable instance named `instance_name`. If read-only access
195 /// is required, the `read_only` flag should be used to reduce the requested OAuth2 scope.
196 ///
197 /// The GOOGLE_APPLICATION_CREDENTIALS environment variable will be used to determine the
198 /// program name that contains the BigTable instance in addition to access credentials.
199 ///
200 /// The BIGTABLE_EMULATOR_HOST environment variable is also respected.
201 ///
202 /// `channel_size` defines the number of connections (or channels) established to Bigtable
203 /// service, and the requests are load balanced onto all the channels. You must therefore
204 /// make sure all of these connections are open when a new request is to be sent.
205 /// Idle connections are automatically closed in "a few minutes". Therefore it is important to
206 /// make sure you have a high enough QPS to send at least one request through all the
207 /// connections (in every service host) every minute. If not, you should consider decreasing the
208 /// channel size. If you are not sure what value to pick and your load is low, just start with 1.
209 /// The recommended value could be 2 x the thread count in your tokio environment see info here
210 /// https://docs.rs/tokio/latest/tokio/attr.main.html, but it might be a very different case for
211 /// different applications.
212 ///
213 pub async fn new(
214 project_id: &str,
215 instance_name: &str,
216 is_read_only: bool,
217 channel_size: usize,
218 timeout: Option<Duration>,
219 ) -> Result<Self> {
220 match std::env::var("BIGTABLE_EMULATOR_HOST") {
221 Ok(endpoint) => Self::new_with_emulator(
222 endpoint.as_str(),
223 project_id,
224 instance_name,
225 is_read_only,
226 timeout,
227 ),
228
229 Err(_) => {
230 let token_provider = gcp_auth::provider().await?;
231 Self::new_with_token_provider(
232 project_id,
233 instance_name,
234 is_read_only,
235 channel_size,
236 timeout,
237 token_provider,
238 )
239 }
240 }
241 }
242 /// Establish a connection to the BigTable instance named `instance_name`. If read-only access
243 /// is required, the `read_only` flag should be used to reduce the requested OAuth2 scope.
244 ///
245 /// The `authentication_manager` variable will be used to determine the
246 /// program name that contains the BigTable instance in addition to access credentials.
247 ///
248 ///
249 /// `channel_size` defines the number of connections (or channels) established to Bigtable
250 /// service, and the requests are load balanced onto all the channels. You must therefore
251 /// make sure all of these connections are open when a new request is to be sent.
252 /// Idle connections are automatically closed in "a few minutes". Therefore it is important to
253 /// make sure you have a high enough QPS to send at least one request through all the
254 /// connections (in every service host) every minute. If not, you should consider decreasing the
255 /// channel size. If you are not sure what value to pick and your load is low, just start with 1.
256 /// The recommended value could be 2 x the thread count in your tokio environment see info here
257 /// https://docs.rs/tokio/latest/tokio/attr.main.html, but it might be a very different case for
258 /// different applications.
259 ///
260 pub fn new_with_token_provider(
261 project_id: &str,
262 instance_name: &str,
263 is_read_only: bool,
264 channel_size: usize,
265 timeout: Option<Duration>,
266 token_provider: Arc<dyn TokenProvider>,
267 ) -> Result<Self> {
268 match std::env::var("BIGTABLE_EMULATOR_HOST") {
269 Ok(endpoint) => Self::new_with_emulator(
270 endpoint.as_str(),
271 project_id,
272 instance_name,
273 is_read_only,
274 timeout,
275 ),
276
277 Err(_) => {
278 let table_prefix = format!(
279 "projects/{}/instances/{}/tables/",
280 project_id, instance_name
281 );
282
283 let endpoints: Result<Vec<Endpoint>> = vec![0; channel_size.max(1)]
284 .iter()
285 .map(move |_| {
286 Channel::from_static("https://bigtable.googleapis.com")
287 .tls_config(
288 ClientTlsConfig::new()
289 .ca_certificate(
290 root_ca_certificate::load()
291 .map_err(Error::CertificateError)
292 .expect("root certificate error"),
293 )
294 .domain_name("bigtable.googleapis.com"),
295 )
296 .map_err(Error::TransportError)
297 })
298 .collect();
299
300 let endpoints: Vec<Endpoint> = endpoints?
301 .into_iter()
302 .map(|ep| {
303 ep.http2_keep_alive_interval(Duration::from_secs(60))
304 .keep_alive_while_idle(true)
305 })
306 .map(|ep| {
307 if let Some(timeout) = timeout {
308 ep.timeout(timeout)
309 } else {
310 ep
311 }
312 })
313 .collect();
314
315 // construct a channel, by balancing over all endpoints.
316 let channel = Channel::balance_list(endpoints.into_iter());
317
318 let token_provider = Some(token_provider);
319 Ok(Self {
320 client: create_client(channel, token_provider, is_read_only),
321 table_prefix: Arc::new(table_prefix),
322 timeout: Arc::new(timeout),
323 })
324 }
325 }
326 }
327
328 /// Establish a connection to a BigTable emulator at [emulator_endpoint].
329 /// This is usually covered by [Self::new] or [Self::new_with_auth_manager],
330 /// which both support the `BIGTABLE_EMULATOR_HOST` env variable. However,
331 /// this function can also be used directly, in case setting
332 /// `BIGTABLE_EMULATOR_HOST` is inconvenient.
333 pub fn new_with_emulator(
334 emulator_endpoint: &str,
335 project_id: &str,
336 instance_name: &str,
337 is_read_only: bool,
338 timeout: Option<Duration>,
339 ) -> Result<Self> {
340 info!("Connecting to bigtable emulator at {}", emulator_endpoint);
341
342 // configures the endpoint with the specified parameters
343 fn configure_endpoint(endpoint: Endpoint, timeout: Option<Duration>) -> Endpoint {
344 let endpoint = endpoint
345 .http2_keep_alive_interval(Duration::from_secs(60))
346 .keep_alive_while_idle(true);
347
348 if let Some(timeout) = timeout {
349 endpoint.timeout(timeout)
350 } else {
351 endpoint
352 }
353 }
354
355 // Parse emulator_endpoint. Officially, it's only host:port,
356 // but unix:///path/to/unix.sock also works in the Go SDK at least.
357 // Having the emulator listen on unix domain sockets without ip2unix is
358 // covered in https://github.com/googleapis/google-cloud-go/pull/9665.
359 let channel = if let Some(path) = emulator_endpoint.strip_prefix("unix://") {
360 // the URL doesn't matter, we use a custom connector.
361 let endpoint = Endpoint::from_static("http://[::]:50051");
362 let endpoint = configure_endpoint(endpoint, timeout);
363
364 let path: String = path.to_string();
365 let connector = tower::service_fn({
366 move |_: tonic::transport::Uri| {
367 let path = path.clone();
368 async move {
369 let stream = UnixStream::connect(path).await?;
370 Ok::<_, std::io::Error>(hyper_util::rt::TokioIo::new(stream))
371 }
372 }
373 });
374
375 endpoint.connect_with_connector_lazy(connector)
376 } else {
377 let endpoint = Channel::from_shared(format!("http://{}", emulator_endpoint))
378 .expect("invalid connection emulator uri");
379 let endpoint = configure_endpoint(endpoint, timeout);
380
381 endpoint.connect_lazy()
382 };
383
384 Ok(Self {
385 client: create_client(channel, None, is_read_only),
386 table_prefix: Arc::new(format!(
387 "projects/{}/instances/{}/tables/",
388 project_id, instance_name
389 )),
390 timeout: Arc::new(timeout),
391 })
392 }
393
394 /// Create a new BigTable client by cloning needed properties.
395 ///
396 /// Clients require `&mut self`, due to `Tonic::transport::Channel` limitations, however
397 /// the created new clients can be cheaply cloned and thus can be send to different threads
398 pub fn client(&self) -> BigTable {
399 BigTable {
400 client: self.client.clone(),
401 table_prefix: self.table_prefix.clone(),
402 timeout: self.timeout.clone(),
403 }
404 }
405
406 /// Provide a convenient method to update the inner `BigtableClient` so a newly configured client can be set
407 pub fn configure_inner_client(
408 &mut self,
409 config_fn: fn(BigtableClient<AuthSvc>) -> BigtableClient<AuthSvc>,
410 ) {
411 self.client = config_fn(self.client.clone());
412 }
413}
414
415/// Helper function to create a BigtableClient<AuthSvc>
416/// from a channel.
417fn create_client(
418 channel: Channel,
419 token_provider: Option<Arc<dyn TokenProvider>>,
420 read_only: bool,
421) -> BigtableClient<AuthSvc> {
422 let scopes = if read_only {
423 "https://www.googleapis.com/auth/bigtable.data.readonly"
424 } else {
425 "https://www.googleapis.com/auth/bigtable.data"
426 };
427
428 let auth_svc = ServiceBuilder::new()
429 .layer_fn(|c| AuthSvc::new(c, token_provider.clone(), scopes.to_string()))
430 .service(channel);
431 return BigtableClient::new(auth_svc);
432}
433
/// The core struct for the Bigtable client, which wraps a gRPC client defined by the Bigtable
/// proto. In order to easily use this struct in multiple threads, we only store cloneable
/// handles here. The inner `BigtableClient<AuthSvc>` wraps a tonic `Channel` behind the
/// `AuthSvc` layer. Cloning a `BigTable` is cheap.
///
/// A `BigTable` can be created via `bigtable::BigTableConnection::new()` and then cloned:
/// ```rust,no_run
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///   use bigtable_rs::bigtable;
///   let connection = bigtable::BigTableConnection::new("p-id", "i-id", true, 1, None).await?;
///   let bt_client = connection.client();
///   // Cheap to clone clients and used in other places.
///   let bt_client2 = bt_client.clone();
///   Ok(())
/// }
/// ```
#[derive(Clone)]
pub struct BigTable {
    // clone is cheap with Channel, see https://docs.rs/tonic/latest/tonic/transport/struct.Channel.html
    client: BigtableClient<AuthSvc>,
    // `projects/{project}/instances/{instance}/tables/` prefix, see `get_full_table_name`.
    table_prefix: Arc<String>,
    // Optional timeout handed to the non-streaming `read_rows` response decoder.
    timeout: Arc<Option<Duration>>,
}
458
459impl BigTable {
460 /// Wrapped `check_and_mutate_row` method
461 pub async fn check_and_mutate_row(
462 &mut self,
463 request: CheckAndMutateRowRequest,
464 ) -> Result<CheckAndMutateRowResponse> {
465 let response = self
466 .client
467 .check_and_mutate_row(request)
468 .await?
469 .into_inner();
470 Ok(response)
471 }
472
473 /// Wrapped `read_rows` method
474 pub async fn read_rows(
475 &mut self,
476 request: ReadRowsRequest,
477 ) -> Result<Vec<(RowKey, Vec<RowCell>)>> {
478 let response = self.client.read_rows(request).await?.into_inner();
479 decode_read_rows_response(self.timeout.as_ref(), response).await
480 }
481
482 /// Provide `read_rows_with_prefix` method to allow using a prefix as key
483 pub async fn read_rows_with_prefix(
484 &mut self,
485 mut request: ReadRowsRequest,
486 prefix: Vec<u8>,
487 ) -> Result<Vec<(RowKey, Vec<RowCell>)>> {
488 let row_range = get_row_range_from_prefix(prefix);
489 request.rows = Some(RowSet {
490 row_keys: vec![], // use this field to put keys for reading specific rows
491 row_ranges: vec![row_range],
492 });
493 let response = self.client.read_rows(request).await?.into_inner();
494 decode_read_rows_response(self.timeout.as_ref(), response).await
495 }
496
497 /// Streaming support for `read_rows` method
498 pub async fn stream_rows(
499 &mut self,
500 request: ReadRowsRequest,
501 ) -> Result<impl Stream<Item = Result<(RowKey, Vec<RowCell>)>>> {
502 let response = self.client.read_rows(request).await?.into_inner();
503 let stream = decode_read_rows_response_stream(response).await;
504 Ok(stream)
505 }
506
507 /// Streaming support for `read_rows_with_prefix` method
508 pub async fn stream_rows_with_prefix(
509 &mut self,
510 mut request: ReadRowsRequest,
511 prefix: Vec<u8>,
512 ) -> Result<impl Stream<Item = Result<(RowKey, Vec<RowCell>)>>> {
513 let row_range = get_row_range_from_prefix(prefix);
514 request.rows = Some(RowSet {
515 row_keys: vec![],
516 row_ranges: vec![row_range],
517 });
518 let response = self.client.read_rows(request).await?.into_inner();
519 let stream = decode_read_rows_response_stream(response).await;
520 Ok(stream)
521 }
522
523 /// Wrapped `sample_row_keys` method
524 pub async fn sample_row_keys(
525 &mut self,
526 request: SampleRowKeysRequest,
527 ) -> Result<Streaming<SampleRowKeysResponse>> {
528 let response = self.client.sample_row_keys(request).await?.into_inner();
529 Ok(response)
530 }
531
532 /// Wrapped `mutate_row` method
533 pub async fn mutate_row(
534 &mut self,
535 request: MutateRowRequest,
536 ) -> Result<Response<MutateRowResponse>> {
537 let response = self.client.mutate_row(request).await?;
538 Ok(response)
539 }
540
541 /// Wrapped `mutate_rows` method
542 pub async fn mutate_rows(
543 &mut self,
544 request: MutateRowsRequest,
545 ) -> Result<Streaming<MutateRowsResponse>> {
546 let response = self.client.mutate_rows(request).await?.into_inner();
547 Ok(response)
548 }
549
550 /// Provide a convenient method to get the inner `BigtableClient` so user can use any methods
551 /// defined from the Bigtable V2 gRPC API
552 pub fn get_client(&mut self) -> &mut BigtableClient<AuthSvc> {
553 &mut self.client
554 }
555
556 /// Provide a convenient method to update the inner `BigtableClient` config
557 pub fn configure_inner_client(
558 &mut self,
559 config_fn: fn(BigtableClient<AuthSvc>) -> BigtableClient<AuthSvc>,
560 ) {
561 self.client = config_fn(self.client.clone());
562 }
563
564 /// Provide a convenient method to get full table, which can be used for building requests
565 pub fn get_full_table_name(&self, table_name: &str) -> String {
566 [&self.table_prefix, table_name].concat()
567 }
568}