bigtable_rs/bigtable/
read_rows.rs

1use crate::bigtable::{Error, Result, RowCell, RowKey};
2use crate::google::bigtable::v2::read_rows_response::cell_chunk::RowStatus;
3use crate::google::bigtable::v2::read_rows_response::CellChunk;
4use crate::google::bigtable::v2::ReadRowsResponse;
5use futures_util::stream::iter;
6use futures_util::{Stream, StreamExt};
7use log::trace;
8use std::collections::HashSet;
9use std::time::{Duration, Instant};
10use tonic::Streaming;
11
12/// As each `CellChunk` could be only part of a cell, this method reorganize multiple `CellChunk`
13/// from multiple `ReadRowsResponse` into a `Vec<(RowKey, Vec<RowCell>)>`.
14pub async fn decode_read_rows_response(
15    timeout: &Option<Duration>,
16    mut rrr: Streaming<ReadRowsResponse>,
17) -> Result<Vec<(RowKey, Vec<RowCell>)>> {
18    let mut rows: Vec<(RowKey, Vec<RowCell>)> = vec![];
19
20    let started = Instant::now();
21    while let Some(res) = rrr.message().await? {
22        if let Some(timeout) = timeout.as_ref() {
23            if Instant::now().duration_since(started) > *timeout {
24                return Err(Error::TimeoutError(timeout.as_secs()));
25            }
26        }
27        let rows_part = decode_read_rows_response_to_vec(res.chunks);
28        for part in rows_part.into_iter() {
29            match part {
30                Ok(part) => rows.push(part),
31                Err(e) => return Err(e),
32            }
33        }
34    }
35    Ok(rows)
36}
37
38/// Flatten and decode the stream of `ReadRowsResponse` into a stream of `Result<(RowKey, Vec<RowCell>)>>`.
39pub async fn decode_read_rows_response_stream(
40    rrr: Streaming<ReadRowsResponse>,
41) -> impl Stream<Item = Result<(RowKey, Vec<RowCell>)>> {
42    rrr.flat_map(|message| match message {
43        Ok(response) => {
44            let results = decode_read_rows_response_to_vec(response.chunks);
45            iter(results)
46        }
47        Err(e) => iter(vec![Err(Error::RpcError(e))]),
48    })
49}
50
51pub fn decode_read_rows_response_to_vec(
52    chunks: Vec<CellChunk>,
53) -> Vec<Result<(RowKey, Vec<RowCell>)>> {
54    let mut rows: Vec<Result<(RowKey, Vec<RowCell>)>> = vec![];
55    let mut row_key = None;
56    let mut row_data: Vec<RowCell> = vec![];
57
58    let mut cell_family_name = None;
59    let mut cell_name = None;
60    let mut cell_timestamp = 0;
61    let mut cell_value = vec![];
62    // If this CellChunk is part of a chunked cell value and this is
63    // not the final chunk of that cell, value_size will be set to the
64    // total length of the cell value.  The client can use this size
65    // to pre-allocate memory to hold the full cell value.
66    let mut cell_value_size: usize;
67    let mut cell_labels = vec![];
68
69    let mut start_new_cell = false;
70    let mut committed_row_cell_count = 0usize;
71    let mut start_new_row = false; // Marker for starting a new row. A commit will set this as false
72
73    let mut key_set: HashSet<Vec<u8>> = HashSet::new();
74    let mut chunk_value_is_empty: bool;
75
76    if chunks.is_empty() {
77        return rows;
78    }
79
80    for (i, mut chunk) in chunks.into_iter().enumerate() {
81        // The comments for `read_rows_response::CellChunk` provide essential details for
82        // understanding how the below decoding works...
83        trace!("chunk {}: {:?}", i, chunk.value);
84
85        // Starting a new row?
86        if !chunk.row_key.is_empty() {
87            if row_key.is_none() || row_key.take().unwrap() != chunk.row_key {
88                // a new key comes, start_new_row should be false at this time
89                if start_new_row {
90                    rows.truncate(committed_row_cell_count);
91                    rows.push(Err(Error::ChunkError(
92                        "Invalid - no commit before key changes".to_owned(),
93                    )));
94                    return rows;
95                }
96                start_new_row = true;
97            }
98            row_key = Some(chunk.row_key);
99        } else {
100            // row_key is empty
101            if !start_new_row {
102                rows.truncate(committed_row_cell_count);
103                rows.push(Err(Error::ChunkError(
104                    "Invalid - new row missing row key".to_owned(),
105                )));
106                return rows;
107            }
108        }
109
110        // when starting a new cell with new family name, then a qualifier must exist
111        if chunk.family_name.is_some()
112            && !chunk.family_name.eq(&cell_family_name)
113            && chunk.qualifier.is_none()
114        {
115            rows.truncate(committed_row_cell_count);
116            rows.push(Err(Error::ChunkError(
117                "new col family but no specified qualifier".to_owned(),
118            )));
119            return rows;
120        }
121
122        // start a new cell with the existing cell_name or new cell_name (chunk.qualifier)
123        if (start_new_cell && cell_name.is_some()) || chunk.qualifier.is_some() {
124            if chunk.value_size == 0 {
125                cell_value_size = chunk.value.len();
126            } else {
127                cell_value_size = chunk.value_size as usize;
128            }
129            cell_value = Vec::with_capacity(cell_value_size);
130            // when a new cell with the same qualifier starts, we need to reuse the old cell_name and cell_family_name
131            cell_family_name = chunk.family_name.or(cell_family_name);
132            cell_name = chunk.qualifier.or(cell_name);
133            cell_timestamp = chunk.timestamp_micros;
134            cell_labels = chunk.labels;
135            start_new_cell = false;
136        }
137
138        chunk_value_is_empty = chunk.value.is_empty();
139        cell_value.append(&mut chunk.value);
140
141        // last chunk for the cell?
142        if chunk.value_size == 0 {
143            // Close up the cell
144            if cell_name.is_some() {
145                let row_cell = RowCell {
146                    family_name: cell_family_name.clone().unwrap_or("".to_owned()),
147                    qualifier: cell_name.clone().unwrap(), // checked above
148                    value: cell_value,
149                    timestamp_micros: cell_timestamp,
150                    labels: cell_labels,
151                };
152                cell_value = vec![]; // borrow checker
153                cell_labels = vec![];
154                row_data.push(row_cell);
155            }
156            // make sure we start a new cell in case the qualifier doesn't change
157            start_new_cell = true;
158        }
159
160        // End of a row?
161        match chunk.row_status {
162            None => {
163                // more for this row, don't push to row_data or rows vector, let the next
164                // chunk close up those vectors.
165            }
166            Some(RowStatus::CommitRow(flag)) => {
167                if let Some(row_key) = row_key.clone() {
168                    rows.push(Ok((row_key, row_data)));
169                    row_data = vec![];
170                }
171                if flag {
172                    if let Some(row_key) = row_key.clone() {
173                        let no_duplicated_key = key_set.insert(row_key);
174                        if !no_duplicated_key {
175                            rows.truncate(committed_row_cell_count);
176                            rows.push(Err(Error::ChunkError(
177                                "Invalid - duplicate row key".to_owned(),
178                            )));
179                            return rows;
180                        }
181                    }
182                    if chunk.value_size != 0 {
183                        // meaning chunk is not ended yet
184                        rows.truncate(committed_row_cell_count);
185                        rows.push(Err(Error::ChunkError(
186                            "Invalid - commit with chunk not ended".to_owned(),
187                        )));
188                        return rows;
189                    }
190
191                    committed_row_cell_count = rows.len();
192                    start_new_row = false;
193                }
194            }
195            Some(RowStatus::ResetRow(_)) => {
196                // ResetRow indicates that the client should drop all previous chunks for
197                // `row_key`, as it will be re-read from the beginning.
198                row_key = None;
199                row_data = vec![];
200                start_new_row = false;
201                rows.truncate(committed_row_cell_count);
202
203                if !chunk_value_is_empty {
204                    rows.truncate(committed_row_cell_count);
205                    rows.push(Err(Error::ChunkError(
206                        "Invalid - reset with chunk".to_owned(),
207                    )));
208                    return rows;
209                }
210            }
211        }
212    }
213
214    if start_new_row && committed_row_cell_count == 0 {
215        return vec![Err(Error::ChunkError("No rows committed".to_owned()))];
216    }
217
218    if start_new_row {
219        rows.truncate(committed_row_cell_count);
220        rows.push(Err(Error::ChunkError(
221            "Invalid - last row missing commit".to_owned(),
222        )));
223        return rows;
224    }
225
226    return rows;
227}