// bigtable_rs/bigtable/read_rows.rs
use crate::bigtable::{Error, Result, RowCell, RowKey};
2use crate::google::bigtable::v2::read_rows_response::cell_chunk::RowStatus;
3use crate::google::bigtable::v2::read_rows_response::CellChunk;
4use crate::google::bigtable::v2::ReadRowsResponse;
5use futures_util::stream::iter;
6use futures_util::{Stream, StreamExt};
7use log::trace;
8use std::collections::HashSet;
9use std::time::{Duration, Instant};
10use tonic::Streaming;
11
12pub async fn decode_read_rows_response(
15 timeout: &Option<Duration>,
16 mut rrr: Streaming<ReadRowsResponse>,
17) -> Result<Vec<(RowKey, Vec<RowCell>)>> {
18 let mut rows: Vec<(RowKey, Vec<RowCell>)> = vec![];
19
20 let started = Instant::now();
21 while let Some(res) = rrr.message().await? {
22 if let Some(timeout) = timeout.as_ref() {
23 if Instant::now().duration_since(started) > *timeout {
24 return Err(Error::TimeoutError(timeout.as_secs()));
25 }
26 }
27 let rows_part = decode_read_rows_response_to_vec(res.chunks);
28 for part in rows_part.into_iter() {
29 match part {
30 Ok(part) => rows.push(part),
31 Err(e) => return Err(e),
32 }
33 }
34 }
35 Ok(rows)
36}
37
38pub async fn decode_read_rows_response_stream(
40 rrr: Streaming<ReadRowsResponse>,
41) -> impl Stream<Item = Result<(RowKey, Vec<RowCell>)>> {
42 rrr.flat_map(|message| match message {
43 Ok(response) => {
44 let results = decode_read_rows_response_to_vec(response.chunks);
45 iter(results)
46 }
47 Err(e) => iter(vec![Err(Error::RpcError(e))]),
48 })
49}
50
/// Decodes a batch of `CellChunk`s into complete rows.
///
/// Implements the chunk-reassembly state machine for Bigtable's ReadRows
/// protocol: cells may be split across chunks (`value_size != 0` marks a
/// continuation), a row is only final on `CommitRow`, and `ResetRow`
/// discards everything received for the current row so far.
///
/// On any protocol violation the output is truncated back to the last
/// committed row boundary and a single `Err(Error::ChunkError(..))` is
/// appended as the final element.
pub fn decode_read_rows_response_to_vec(
    chunks: Vec<CellChunk>,
) -> Vec<Result<(RowKey, Vec<RowCell>)>> {
    let mut rows: Vec<Result<(RowKey, Vec<RowCell>)>> = vec![];
    // Key of the row currently being assembled (None until first chunk).
    let mut row_key = None;
    // Cells accumulated for the in-progress row.
    let mut row_data: Vec<RowCell> = vec![];

    // State of the in-progress cell; family/qualifier carry over between
    // chunks because the protocol omits them when unchanged.
    let mut cell_family_name = None;
    let mut cell_name = None;
    let mut cell_timestamp = 0;
    let mut cell_value = vec![];
    let mut cell_value_size: usize;
    let mut cell_labels = vec![];

    // True once the current cell's value is complete and the next chunk
    // should begin a new cell.
    let mut start_new_cell = false;
    // Number of entries in `rows` confirmed by a CommitRow; used to roll
    // back on errors and ResetRow.
    let mut committed_row_cell_count = 0usize;
    // True while a row has been started but not yet committed.
    let mut start_new_row = false;
    // Committed row keys seen so far, for duplicate detection.
    let mut key_set: HashSet<Vec<u8>> = HashSet::new();
    // Whether the current chunk carried no value bytes (checked on ResetRow).
    let mut chunk_value_is_empty: bool;

    if chunks.is_empty() {
        return rows;
    }

    for (i, mut chunk) in chunks.into_iter().enumerate() {
        trace!("chunk {}: {:?}", i, chunk.value);

        if !chunk.row_key.is_empty() {
            // A new (different) row key must not appear while the previous
            // row is still uncommitted. `take()` empties `row_key`; it is
            // unconditionally re-set just below.
            if row_key.is_none() || row_key.take().unwrap() != chunk.row_key {
                if start_new_row {
                    rows.truncate(committed_row_cell_count);
                    rows.push(Err(Error::ChunkError(
                        "Invalid - no commit before key changes".to_owned(),
                    )));
                    return rows;
                }
                start_new_row = true;
            }
            row_key = Some(chunk.row_key);
        } else {
            // An empty row_key is only legal as a continuation of an
            // already-started row.
            if !start_new_row {
                rows.truncate(committed_row_cell_count);
                rows.push(Err(Error::ChunkError(
                    "Invalid - new row missing row key".to_owned(),
                )));
                return rows;
            }
        }

        // A family change must always come with an explicit qualifier.
        if chunk.family_name.is_some()
            && !chunk.family_name.eq(&cell_family_name)
            && chunk.qualifier.is_none()
        {
            rows.truncate(committed_row_cell_count);
            rows.push(Err(Error::ChunkError(
                "new col family but no specified qualifier".to_owned(),
            )));
            return rows;
        }

        // Begin a new cell when the previous one finished, or when the
        // chunk explicitly names a qualifier.
        if (start_new_cell && cell_name.is_some()) || chunk.qualifier.is_some() {
            // value_size != 0 announces the total size of a split value,
            // letting us pre-size the buffer; 0 means the chunk is complete.
            if chunk.value_size == 0 {
                cell_value_size = chunk.value.len();
            } else {
                cell_value_size = chunk.value_size as usize;
            }
            cell_value = Vec::with_capacity(cell_value_size);
            // Omitted family/qualifier inherit the previous cell's values.
            cell_family_name = chunk.family_name.or(cell_family_name);
            cell_name = chunk.qualifier.or(cell_name);
            cell_timestamp = chunk.timestamp_micros;
            cell_labels = chunk.labels;
            start_new_cell = false;
        }

        // Remember emptiness before `append` drains the chunk's value.
        chunk_value_is_empty = chunk.value.is_empty();
        cell_value.append(&mut chunk.value);

        // value_size == 0 means this chunk completes the cell's value.
        if chunk.value_size == 0 {
            if cell_name.is_some() {
                let row_cell = RowCell {
                    family_name: cell_family_name.clone().unwrap_or("".to_owned()),
                    qualifier: cell_name.clone().unwrap(),
                    value: cell_value,
                    timestamp_micros: cell_timestamp,
                    labels: cell_labels,
                };
                // Reset per-cell buffers; family/qualifier persist for
                // inheritance by later chunks.
                cell_value = vec![];
                cell_labels = vec![];
                row_data.push(row_cell);
            }
            start_new_cell = true;
        }

        match chunk.row_status {
            None => {}
            Some(RowStatus::CommitRow(flag)) => {
                // Emit the assembled row even before checking the flag.
                if let Some(row_key) = row_key.clone() {
                    rows.push(Ok((row_key, row_data)));
                    row_data = vec![];
                }
                if flag {
                    // Each key may be committed at most once per response.
                    if let Some(row_key) = row_key.clone() {
                        let no_duplicated_key = key_set.insert(row_key);
                        if !no_duplicated_key {
                            rows.truncate(committed_row_cell_count);
                            rows.push(Err(Error::ChunkError(
                                "Invalid - duplicate row key".to_owned(),
                            )));
                            return rows;
                        }
                    }
                    // Committing mid-value (split cell still open) is invalid.
                    if chunk.value_size != 0 {
                        rows.truncate(committed_row_cell_count);
                        rows.push(Err(Error::ChunkError(
                            "Invalid - commit with chunk not ended".to_owned(),
                        )));
                        return rows;
                    }

                    // Advance the rollback point past the committed row.
                    committed_row_cell_count = rows.len();
                    start_new_row = false;
                }
            }
            Some(RowStatus::ResetRow(_)) => {
                // Discard everything accumulated since the last commit.
                row_key = None;
                row_data = vec![];
                start_new_row = false;
                rows.truncate(committed_row_cell_count);

                // A reset chunk must not carry cell data of its own.
                if !chunk_value_is_empty {
                    rows.truncate(committed_row_cell_count);
                    rows.push(Err(Error::ChunkError(
                        "Invalid - reset with chunk".to_owned(),
                    )));
                    return rows;
                }
            }
        }
    }

    // Stream ended mid-row with nothing committed at all.
    if start_new_row && committed_row_cell_count == 0 {
        return vec![Err(Error::ChunkError("No rows committed".to_owned()))];
    }

    // Stream ended mid-row after some committed rows: keep them, append error.
    if start_new_row {
        rows.truncate(committed_row_cell_count);
        rows.push(Err(Error::ChunkError(
            "Invalid - last row missing commit".to_owned(),
        )));
        return rows;
    }

    return rows;
}