bigtable_rs/
bigtable.rs

1//! `bigtable` module provides a few convenient structs for calling Google Bigtable from Rust code.
2//!
3//!
4//! Example usage:
5//! ```rust,no_run
6//! use bigtable_rs::bigtable;
7//! use bigtable_rs::google::bigtable::v2::row_filter::{Chain, Filter};
8//! use bigtable_rs::google::bigtable::v2::row_range::{EndKey, StartKey};
9//! use bigtable_rs::google::bigtable::v2::{ReadRowsRequest, RowFilter, RowRange, RowSet};
10//! use env_logger;
11//! use std::error::Error;
12//! use std::time::Duration;
13//!
14//! #[tokio::main]
15//! async fn main() -> Result<(), Box<dyn Error>> {
16//!     env_logger::init();
17//!
18//!     let project_id = "project-1";
19//!     let instance_name = "instance-1";
20//!     let table_name = "table-1";
21//!     let channel_size = 4;
22//!     let timeout = Duration::from_secs(10);
23//!
24//!     let key_start: String = "key1".to_owned();
25//!     let key_end: String = "key4".to_owned();
26//!
27//!     // make a bigtable client
28//!     let connection = bigtable::BigTableConnection::new(
29//!         project_id,
30//!         instance_name,
31//!         true,
32//!         channel_size,
33//!         Some(timeout),
34//!     )
35//!         .await?;
36//!     let mut bigtable = connection.client();
37//!
38//!     // prepare a ReadRowsRequest
39//!     let request = ReadRowsRequest {
40//!         app_profile_id: "default".to_owned(),
41//!         table_name: bigtable.get_full_table_name(table_name),
42//!         rows_limit: 10,
43//!         rows: Some(RowSet {
44//!             row_keys: vec![], // use this field to put keys for reading specific rows
45//!             row_ranges: vec![RowRange {
46//!                 start_key: Some(StartKey::StartKeyClosed(key_start.into_bytes())),
47//!                 end_key: Some(EndKey::EndKeyOpen(key_end.into_bytes())),
48//!             }],
49//!         }),
50//!         filter: Some(RowFilter {
51//!             filter: Some(Filter::Chain(Chain {
52//!                 filters: vec![
53//!                     RowFilter {
54//!                         filter: Some(Filter::FamilyNameRegexFilter("cf1".to_owned())),
55//!                     },
56//!                     RowFilter {
57//!                         filter: Some(Filter::ColumnQualifierRegexFilter("c1".as_bytes().to_vec())),
58//!                     },
59//!                     RowFilter {
60//!                         filter: Some(Filter::CellsPerColumnLimitFilter(1)),
61//!                     },
62//!                 ],
63//!             })),
64//!         }),
65//!         ..ReadRowsRequest::default()
66//!     };
67//!
68//!     // calling bigtable API to get results
69//!     let response = bigtable.read_rows(request).await?;
70//!
71//!     // simply print results for example usage
72//!     response.into_iter().for_each(|(key, data)| {
73//!         println!("------------\n{}", String::from_utf8(key.clone()).unwrap());
74//!         data.into_iter().for_each(|row_cell| {
75//!             println!(
76//!                 "    [{}:{}] \"{}\" @ {}",
77//!                 row_cell.family_name,
78//!                 String::from_utf8(row_cell.qualifier).unwrap(),
79//!                 String::from_utf8(row_cell.value).unwrap(),
80//!                 row_cell.timestamp_micros
81//!             )
82//!         })
83//!     });
84//!
85//!     Ok(())
86//! }
87//! ```
88
89use std::sync::Arc;
90use std::time::Duration;
91
92use futures_util::Stream;
93use gcp_auth::TokenProvider;
94use log::info;
95use thiserror::Error;
96use tokio::net::UnixStream;
97use tonic::transport::Endpoint;
98use tonic::{codec::Streaming, transport::Channel, transport::ClientTlsConfig, Response};
99use tower::ServiceBuilder;
100
101use crate::auth_service::AuthSvc;
102use crate::bigtable::read_rows::{decode_read_rows_response, decode_read_rows_response_stream};
103use crate::google::bigtable::v2::{
104    bigtable_client::BigtableClient, MutateRowRequest, MutateRowResponse, MutateRowsRequest,
105    MutateRowsResponse, ReadRowsRequest, RowSet, SampleRowKeysRequest, SampleRowKeysResponse,
106};
107use crate::google::bigtable::v2::{CheckAndMutateRowRequest, CheckAndMutateRowResponse};
108use crate::{root_ca_certificate, util::get_row_range_from_prefix};
109
110pub mod read_rows;
111
112/// An alias for Vec<u8> as row key
113type RowKey = Vec<u8>;
114/// A convenient Result type
115type Result<T> = std::result::Result<T, Error>;
116
/// A single cell read from a Bigtable row, as returned by the `read_rows` family of methods.
///
/// Cells are plain data, so they can be cloned and compared (useful in tests and caches).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RowCell {
    /// Name of the column family the cell belongs to.
    pub family_name: String,
    /// Column qualifier, as raw bytes.
    pub qualifier: Vec<u8>,
    /// Cell value, as raw bytes.
    pub value: Vec<u8>,
    /// Cell timestamp, in microseconds.
    pub timestamp_micros: i64,
    /// Labels attached to the cell.
    pub labels: Vec<String>,
}
126
127/// Error types the client may have
128#[derive(Debug, Error)]
129pub enum Error {
130    #[error("AccessToken error: {0}")]
131    AccessTokenError(String),
132
133    #[error("Certificate error: {0}")]
134    CertificateError(String),
135
136    #[error("I/O Error: {0}")]
137    IoError(std::io::Error),
138
139    #[error("Transport error: {0}")]
140    TransportError(tonic::transport::Error),
141
142    #[error("Chunk error")]
143    ChunkError(String),
144
145    #[error("Row not found")]
146    RowNotFound,
147
148    #[error("Row write failed")]
149    RowWriteFailed,
150
151    #[error("Object not found: {0}")]
152    ObjectNotFound(String),
153
154    #[error("Object is corrupt: {0}")]
155    ObjectCorrupt(String),
156
157    #[error("RPC error: {0}")]
158    RpcError(tonic::Status),
159
160    #[error("Timeout error after {0} seconds")]
161    TimeoutError(u64),
162
163    #[error("GCPAuthError error: {0}")]
164    GCPAuthError(#[from] gcp_auth::Error),
165}
166
167impl std::convert::From<std::io::Error> for Error {
168    fn from(err: std::io::Error) -> Self {
169        Self::IoError(err)
170    }
171}
172
173impl std::convert::From<tonic::transport::Error> for Error {
174    fn from(err: tonic::transport::Error) -> Self {
175        Self::TransportError(err)
176    }
177}
178
179impl std::convert::From<tonic::Status> for Error {
180    fn from(err: tonic::Status) -> Self {
181        Self::RpcError(err)
182    }
183}
184
185/// For initiate a Bigtable connection, then a `Bigtable` client can be made from it.
186#[derive(Clone)]
187pub struct BigTableConnection {
188    client: BigtableClient<AuthSvc>,
189    table_prefix: Arc<String>,
190    timeout: Arc<Option<Duration>>,
191}
192
193impl BigTableConnection {
194    /// Establish a connection to the BigTable instance named `instance_name`.  If read-only access
195    /// is required, the `read_only` flag should be used to reduce the requested OAuth2 scope.
196    ///
197    /// The GOOGLE_APPLICATION_CREDENTIALS environment variable will be used to determine the
198    /// program name that contains the BigTable instance in addition to access credentials.
199    ///
200    /// The BIGTABLE_EMULATOR_HOST environment variable is also respected.
201    ///
202    /// `channel_size` defines the number of connections (or channels) established to Bigtable
203    /// service, and the requests are load balanced onto all the channels. You must therefore
204    /// make sure all of these connections are open when a new request is to be sent.
205    /// Idle connections are automatically closed in "a few minutes". Therefore it is important to
206    /// make sure you have a high enough QPS to send at least one request through all the
207    /// connections (in every service host) every minute. If not, you should consider decreasing the
208    /// channel size. If you are not sure what value to pick and your load is low, just start with 1.
209    /// The recommended value could be 2 x the thread count in your tokio environment see info here
210    /// https://docs.rs/tokio/latest/tokio/attr.main.html, but it might be a very different case for
211    /// different applications.
212    ///
213    pub async fn new(
214        project_id: &str,
215        instance_name: &str,
216        is_read_only: bool,
217        channel_size: usize,
218        timeout: Option<Duration>,
219    ) -> Result<Self> {
220        match std::env::var("BIGTABLE_EMULATOR_HOST") {
221            Ok(endpoint) => Self::new_with_emulator(
222                endpoint.as_str(),
223                project_id,
224                instance_name,
225                is_read_only,
226                timeout,
227            ),
228
229            Err(_) => {
230                let token_provider = gcp_auth::provider().await?;
231                Self::new_with_token_provider(
232                    project_id,
233                    instance_name,
234                    is_read_only,
235                    channel_size,
236                    timeout,
237                    token_provider,
238                )
239            }
240        }
241    }
242    /// Establish a connection to the BigTable instance named `instance_name`.  If read-only access
243    /// is required, the `read_only` flag should be used to reduce the requested OAuth2 scope.
244    ///
245    /// The `authentication_manager` variable will be used to determine the
246    /// program name that contains the BigTable instance in addition to access credentials.
247    ///
248    ///
249    /// `channel_size` defines the number of connections (or channels) established to Bigtable
250    /// service, and the requests are load balanced onto all the channels. You must therefore
251    /// make sure all of these connections are open when a new request is to be sent.
252    /// Idle connections are automatically closed in "a few minutes". Therefore it is important to
253    /// make sure you have a high enough QPS to send at least one request through all the
254    /// connections (in every service host) every minute. If not, you should consider decreasing the
255    /// channel size. If you are not sure what value to pick and your load is low, just start with 1.
256    /// The recommended value could be 2 x the thread count in your tokio environment see info here
257    /// https://docs.rs/tokio/latest/tokio/attr.main.html, but it might be a very different case for
258    /// different applications.
259    ///
260    pub fn new_with_token_provider(
261        project_id: &str,
262        instance_name: &str,
263        is_read_only: bool,
264        channel_size: usize,
265        timeout: Option<Duration>,
266        token_provider: Arc<dyn TokenProvider>,
267    ) -> Result<Self> {
268        match std::env::var("BIGTABLE_EMULATOR_HOST") {
269            Ok(endpoint) => Self::new_with_emulator(
270                endpoint.as_str(),
271                project_id,
272                instance_name,
273                is_read_only,
274                timeout,
275            ),
276
277            Err(_) => {
278                let table_prefix = format!(
279                    "projects/{}/instances/{}/tables/",
280                    project_id, instance_name
281                );
282
283                let endpoints: Result<Vec<Endpoint>> = vec![0; channel_size.max(1)]
284                    .iter()
285                    .map(move |_| {
286                        Channel::from_static("https://bigtable.googleapis.com")
287                            .tls_config(
288                                ClientTlsConfig::new()
289                                    .ca_certificate(
290                                        root_ca_certificate::load()
291                                            .map_err(Error::CertificateError)
292                                            .expect("root certificate error"),
293                                    )
294                                    .domain_name("bigtable.googleapis.com"),
295                            )
296                            .map_err(Error::TransportError)
297                    })
298                    .collect();
299
300                let endpoints: Vec<Endpoint> = endpoints?
301                    .into_iter()
302                    .map(|ep| {
303                        ep.http2_keep_alive_interval(Duration::from_secs(60))
304                            .keep_alive_while_idle(true)
305                    })
306                    .map(|ep| {
307                        if let Some(timeout) = timeout {
308                            ep.timeout(timeout)
309                        } else {
310                            ep
311                        }
312                    })
313                    .collect();
314
315                // construct a channel, by balancing over all endpoints.
316                let channel = Channel::balance_list(endpoints.into_iter());
317
318                let token_provider = Some(token_provider);
319                Ok(Self {
320                    client: create_client(channel, token_provider, is_read_only),
321                    table_prefix: Arc::new(table_prefix),
322                    timeout: Arc::new(timeout),
323                })
324            }
325        }
326    }
327
328    /// Establish a connection to a BigTable emulator at [emulator_endpoint].
329    /// This is usually covered by [Self::new] or [Self::new_with_auth_manager],
330    /// which both support the `BIGTABLE_EMULATOR_HOST` env variable. However,
331    /// this function can also be used directly, in case setting
332    /// `BIGTABLE_EMULATOR_HOST` is inconvenient.
333    pub fn new_with_emulator(
334        emulator_endpoint: &str,
335        project_id: &str,
336        instance_name: &str,
337        is_read_only: bool,
338        timeout: Option<Duration>,
339    ) -> Result<Self> {
340        info!("Connecting to bigtable emulator at {}", emulator_endpoint);
341
342        // configures the endpoint with the specified parameters
343        fn configure_endpoint(endpoint: Endpoint, timeout: Option<Duration>) -> Endpoint {
344            let endpoint = endpoint
345                .http2_keep_alive_interval(Duration::from_secs(60))
346                .keep_alive_while_idle(true);
347
348            if let Some(timeout) = timeout {
349                endpoint.timeout(timeout)
350            } else {
351                endpoint
352            }
353        }
354
355        // Parse emulator_endpoint. Officially, it's only host:port,
356        // but unix:///path/to/unix.sock also works in the Go SDK at least.
357        // Having the emulator listen on unix domain sockets without ip2unix is
358        // covered in https://github.com/googleapis/google-cloud-go/pull/9665.
359        let channel = if let Some(path) = emulator_endpoint.strip_prefix("unix://") {
360            // the URL doesn't matter, we use a custom connector.
361            let endpoint = Endpoint::from_static("http://[::]:50051");
362            let endpoint = configure_endpoint(endpoint, timeout);
363
364            let path: String = path.to_string();
365            let connector = tower::service_fn({
366                move |_: tonic::transport::Uri| {
367                    let path = path.clone();
368                    async move {
369                        let stream = UnixStream::connect(path).await?;
370                        Ok::<_, std::io::Error>(hyper_util::rt::TokioIo::new(stream))
371                    }
372                }
373            });
374
375            endpoint.connect_with_connector_lazy(connector)
376        } else {
377            let endpoint = Channel::from_shared(format!("http://{}", emulator_endpoint))
378                .expect("invalid connection emulator uri");
379            let endpoint = configure_endpoint(endpoint, timeout);
380
381            endpoint.connect_lazy()
382        };
383
384        Ok(Self {
385            client: create_client(channel, None, is_read_only),
386            table_prefix: Arc::new(format!(
387                "projects/{}/instances/{}/tables/",
388                project_id, instance_name
389            )),
390            timeout: Arc::new(timeout),
391        })
392    }
393
394    /// Create a new BigTable client by cloning needed properties.
395    ///
396    /// Clients require `&mut self`, due to `Tonic::transport::Channel` limitations, however
397    /// the created new clients can be cheaply cloned and thus can be send to different threads
398    pub fn client(&self) -> BigTable {
399        BigTable {
400            client: self.client.clone(),
401            table_prefix: self.table_prefix.clone(),
402            timeout: self.timeout.clone(),
403        }
404    }
405
406    /// Provide a convenient method to update the inner `BigtableClient` so a newly configured client can be set
407    pub fn configure_inner_client(
408        &mut self,
409        config_fn: fn(BigtableClient<AuthSvc>) -> BigtableClient<AuthSvc>,
410    ) {
411        self.client = config_fn(self.client.clone());
412    }
413}
414
415/// Helper function to create a BigtableClient<AuthSvc>
416/// from a channel.
417fn create_client(
418    channel: Channel,
419    token_provider: Option<Arc<dyn TokenProvider>>,
420    read_only: bool,
421) -> BigtableClient<AuthSvc> {
422    let scopes = if read_only {
423        "https://www.googleapis.com/auth/bigtable.data.readonly"
424    } else {
425        "https://www.googleapis.com/auth/bigtable.data"
426    };
427
428    let auth_svc = ServiceBuilder::new()
429        .layer_fn(|c| AuthSvc::new(c, token_provider.clone(), scopes.to_string()))
430        .service(channel);
431    return BigtableClient::new(auth_svc);
432}
433
434/// The core struct for Bigtable client, which wraps a gPRC client defined by Bigtable proto.
435/// In order to easily use this struct in multiple threads, we only store cloneable references here.
436/// `BigtableClient<AuthSvc>` is a type alias of `BigtableClient` and it wraps a tonic Channel.
437/// Cloning on `Bigtable` is cheap.
438///
439/// Bigtable can be created via `bigtable::BigTableConnection::new()` and cloned
440/// ```rust,no_run
441/// #[tokio::main]
442/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
443///   use bigtable_rs::bigtable;
444///   let connection = bigtable::BigTableConnection::new("p-id", "i-id", true, 1, None).await?;
445///   let bt_client = connection.client();
446///   // Cheap to clone clients and used in other places.
447///   let bt_client2 = bt_client.clone();
448///   Ok(())
449/// }
450/// ```
451#[derive(Clone)]
452pub struct BigTable {
453    // clone is cheap with Channel, see https://docs.rs/tonic/latest/tonic/transport/struct.Channel.html
454    client: BigtableClient<AuthSvc>,
455    table_prefix: Arc<String>,
456    timeout: Arc<Option<Duration>>,
457}
458
459impl BigTable {
460    /// Wrapped `check_and_mutate_row` method
461    pub async fn check_and_mutate_row(
462        &mut self,
463        request: CheckAndMutateRowRequest,
464    ) -> Result<CheckAndMutateRowResponse> {
465        let response = self
466            .client
467            .check_and_mutate_row(request)
468            .await?
469            .into_inner();
470        Ok(response)
471    }
472
473    /// Wrapped `read_rows` method
474    pub async fn read_rows(
475        &mut self,
476        request: ReadRowsRequest,
477    ) -> Result<Vec<(RowKey, Vec<RowCell>)>> {
478        let response = self.client.read_rows(request).await?.into_inner();
479        decode_read_rows_response(self.timeout.as_ref(), response).await
480    }
481
482    /// Provide `read_rows_with_prefix` method to allow using a prefix as key
483    pub async fn read_rows_with_prefix(
484        &mut self,
485        mut request: ReadRowsRequest,
486        prefix: Vec<u8>,
487    ) -> Result<Vec<(RowKey, Vec<RowCell>)>> {
488        let row_range = get_row_range_from_prefix(prefix);
489        request.rows = Some(RowSet {
490            row_keys: vec![], // use this field to put keys for reading specific rows
491            row_ranges: vec![row_range],
492        });
493        let response = self.client.read_rows(request).await?.into_inner();
494        decode_read_rows_response(self.timeout.as_ref(), response).await
495    }
496
497    /// Streaming support for `read_rows` method
498    pub async fn stream_rows(
499        &mut self,
500        request: ReadRowsRequest,
501    ) -> Result<impl Stream<Item = Result<(RowKey, Vec<RowCell>)>>> {
502        let response = self.client.read_rows(request).await?.into_inner();
503        let stream = decode_read_rows_response_stream(response).await;
504        Ok(stream)
505    }
506
507    /// Streaming support for `read_rows_with_prefix` method
508    pub async fn stream_rows_with_prefix(
509        &mut self,
510        mut request: ReadRowsRequest,
511        prefix: Vec<u8>,
512    ) -> Result<impl Stream<Item = Result<(RowKey, Vec<RowCell>)>>> {
513        let row_range = get_row_range_from_prefix(prefix);
514        request.rows = Some(RowSet {
515            row_keys: vec![],
516            row_ranges: vec![row_range],
517        });
518        let response = self.client.read_rows(request).await?.into_inner();
519        let stream = decode_read_rows_response_stream(response).await;
520        Ok(stream)
521    }
522
523    /// Wrapped `sample_row_keys` method
524    pub async fn sample_row_keys(
525        &mut self,
526        request: SampleRowKeysRequest,
527    ) -> Result<Streaming<SampleRowKeysResponse>> {
528        let response = self.client.sample_row_keys(request).await?.into_inner();
529        Ok(response)
530    }
531
532    /// Wrapped `mutate_row` method
533    pub async fn mutate_row(
534        &mut self,
535        request: MutateRowRequest,
536    ) -> Result<Response<MutateRowResponse>> {
537        let response = self.client.mutate_row(request).await?;
538        Ok(response)
539    }
540
541    /// Wrapped `mutate_rows` method
542    pub async fn mutate_rows(
543        &mut self,
544        request: MutateRowsRequest,
545    ) -> Result<Streaming<MutateRowsResponse>> {
546        let response = self.client.mutate_rows(request).await?.into_inner();
547        Ok(response)
548    }
549
550    /// Provide a convenient method to get the inner `BigtableClient` so user can use any methods
551    /// defined from the Bigtable V2 gRPC API
552    pub fn get_client(&mut self) -> &mut BigtableClient<AuthSvc> {
553        &mut self.client
554    }
555
556    /// Provide a convenient method to update the inner `BigtableClient` config
557    pub fn configure_inner_client(
558        &mut self,
559        config_fn: fn(BigtableClient<AuthSvc>) -> BigtableClient<AuthSvc>,
560    ) {
561        self.client = config_fn(self.client.clone());
562    }
563
564    /// Provide a convenient method to get full table, which can be used for building requests
565    pub fn get_full_table_name(&self, table_name: &str) -> String {
566        [&self.table_prefix, table_name].concat()
567    }
568}