object_store/
delimited.rs1use std::collections::VecDeque;
21
22use bytes::Bytes;
23use futures::{Stream, StreamExt};
24use snafu::{ensure, Snafu};
25
26use super::Result;
27
28#[derive(Debug, Snafu)]
29enum Error {
30 #[snafu(display("encountered unterminated string"))]
31 UnterminatedString,
32
33 #[snafu(display("encountered trailing escape character"))]
34 TrailingEscape,
35}
36
37impl From<Error> for super::Error {
38 fn from(err: Error) -> Self {
39 Self::Generic {
40 store: "LineDelimiter",
41 source: Box::new(err),
42 }
43 }
44}
45
/// ASCII double quote: toggles whether we are inside a quoted string, within
/// which newlines do not end a record.
const QUOTE: u8 = b'"';

/// ASCII line feed: ends a record when it is neither quoted nor escaped.
const NEWLINE: u8 = b'\n';

/// ASCII backslash: causes the byte that follows it to be taken literally.
const ESCAPE: u8 = b'\\';
54
55#[derive(Debug, Default)]
58struct LineDelimiter {
59 complete: VecDeque<Bytes>,
61 remainder: Vec<u8>,
63 is_escape: bool,
65 is_quote: bool,
67}
68
69impl LineDelimiter {
70 fn new() -> Self {
72 Self::default()
73 }
74
75 fn push(&mut self, val: impl Into<Bytes>) {
77 let val: Bytes = val.into();
78
79 let is_escape = &mut self.is_escape;
80 let is_quote = &mut self.is_quote;
81 let mut record_ends = val.iter().enumerate().filter_map(|(idx, v)| {
82 if *is_escape {
83 *is_escape = false;
84 None
85 } else if *v == ESCAPE {
86 *is_escape = true;
87 None
88 } else if *v == QUOTE {
89 *is_quote = !*is_quote;
90 None
91 } else if *is_quote {
92 None
93 } else {
94 (*v == NEWLINE).then_some(idx + 1)
95 }
96 });
97
98 let start_offset = match self.remainder.is_empty() {
99 true => 0,
100 false => match record_ends.next() {
101 Some(idx) => {
102 self.remainder.extend_from_slice(&val[0..idx]);
103 self.complete
104 .push_back(Bytes::from(std::mem::take(&mut self.remainder)));
105 idx
106 }
107 None => {
108 self.remainder.extend_from_slice(&val);
109 return;
110 }
111 },
112 };
113 let end_offset = record_ends.last().unwrap_or(start_offset);
114 if start_offset != end_offset {
115 self.complete.push_back(val.slice(start_offset..end_offset));
116 }
117
118 if end_offset != val.len() {
119 self.remainder.extend_from_slice(&val[end_offset..])
120 }
121 }
122
123 fn finish(&mut self) -> Result<bool> {
127 if !self.remainder.is_empty() {
128 ensure!(!self.is_quote, UnterminatedStringSnafu);
129 ensure!(!self.is_quote, TrailingEscapeSnafu);
130
131 self.complete
132 .push_back(Bytes::from(std::mem::take(&mut self.remainder)))
133 }
134 Ok(self.complete.is_empty())
135 }
136}
137
138impl Iterator for LineDelimiter {
139 type Item = Bytes;
140
141 fn next(&mut self) -> Option<Self::Item> {
142 self.complete.pop_front()
143 }
144}
145
146pub fn newline_delimited_stream<S>(s: S) -> impl Stream<Item = Result<Bytes>>
150where
151 S: Stream<Item = Result<Bytes>> + Unpin,
152{
153 let delimiter = LineDelimiter::new();
154
155 futures::stream::unfold(
156 (s, delimiter, false),
157 |(mut s, mut delimiter, mut exhausted)| async move {
158 loop {
159 if let Some(next) = delimiter.next() {
160 return Some((Ok(next), (s, delimiter, exhausted)));
161 } else if exhausted {
162 return None;
163 }
164
165 match s.next().await {
166 Some(Ok(bytes)) => delimiter.push(bytes),
167 Some(Err(e)) => return Some((Err(e), (s, delimiter, exhausted))),
168 None => {
169 exhausted = true;
170 match delimiter.finish() {
171 Ok(true) => return None,
172 Ok(false) => continue,
173 Err(e) => return Some((Err(e), (s, delimiter, exhausted))),
174 }
175 }
176 }
177 }
178 },
179 )
180}
181
#[cfg(test)]
mod tests {
    use futures::stream::{BoxStream, TryStreamExt};

    use super::*;

    #[test]
    fn test_delimiter() {
        let mut delimiter = LineDelimiter::new();
        delimiter.push("hello\nworld");
        delimiter.push("\n\n");

        assert_eq!(delimiter.next().unwrap(), Bytes::from("hello\n"));
        assert_eq!(delimiter.next().unwrap(), Bytes::from("world\n"));
        assert_eq!(delimiter.next().unwrap(), Bytes::from("\n"));
        assert!(delimiter.next().is_none());
    }

    #[test]
    fn test_delimiter_escaped() {
        let mut delimiter = LineDelimiter::new();
        delimiter.push("");
        delimiter.push("fo\\\n\"foo");
        delimiter.push("bo\n\"bar\n");
        delimiter.push("\"he");
        delimiter.push("llo\"\n");
        assert_eq!(
            delimiter.next().unwrap(),
            Bytes::from("fo\\\n\"foobo\n\"bar\n")
        );
        assert_eq!(delimiter.next().unwrap(), Bytes::from("\"hello\"\n"));
        assert!(delimiter.next().is_none());

        delimiter.push("\"foo\nbar\",\"fiz\\\"inner\\\"\"\nhello");
        assert!(!delimiter.finish().unwrap());

        assert_eq!(
            delimiter.next().unwrap(),
            Bytes::from("\"foo\nbar\",\"fiz\\\"inner\\\"\"\n")
        );
        assert_eq!(delimiter.next().unwrap(), Bytes::from("hello"));
        assert!(delimiter.finish().unwrap());
        assert!(delimiter.next().is_none());
    }

    #[test]
    fn test_delimiter_unterminated_string() {
        let mut delimiter = LineDelimiter::new();
        delimiter.push("\"hello\nworld");
        // The newline is inside an open quote, so no record is complete yet
        assert!(delimiter.next().is_none());
        assert!(delimiter.finish().is_err());
    }

    #[tokio::test]
    async fn test_delimiter_stream() {
        let input = vec!["hello\nworld\nbin", "go\ncup", "cakes"];
        let input_stream = futures::stream::iter(input.into_iter().map(|s| Ok(Bytes::from(s))));
        let stream = newline_delimited_stream(input_stream);

        let results: Vec<_> = stream.try_collect().await.unwrap();
        assert_eq!(
            results,
            vec![
                Bytes::from("hello\nworld\n"),
                Bytes::from("bingo\n"),
                Bytes::from("cupcakes")
            ]
        )
    }

    #[tokio::test]
    async fn test_delimiter_stream_unterminated_string() {
        let input = vec!["a\n\"b", "\nc"];
        let input_stream = futures::stream::iter(input.into_iter().map(|s| Ok(Bytes::from(s))));
        // `unfold` streams are not `Unpin`, so pin before calling `next`
        let mut stream = Box::pin(newline_delimited_stream(input_stream));

        assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("a\n"));
        // The input ends while the final record is still inside a quote
        assert!(stream.next().await.unwrap().is_err());
        assert!(stream.next().await.is_none());
    }

    #[tokio::test]
    async fn test_delimiter_unfold_stream() {
        let input_stream: BoxStream<'static, Result<Bytes>> = futures::stream::unfold(
            VecDeque::from(["hello\nworld\nbin", "go\ncup", "cakes"]),
            |mut input| async move {
                if !input.is_empty() {
                    Some((Ok(Bytes::from(input.pop_front().unwrap())), input))
                } else {
                    None
                }
            },
        )
        .boxed();
        let stream = newline_delimited_stream(input_stream);

        let results: Vec<_> = stream.try_collect().await.unwrap();
        assert_eq!(
            results,
            vec![
                Bytes::from("hello\nworld\n"),
                Bytes::from("bingo\n"),
                Bytes::from("cupcakes")
            ]
        )
    }
}