object_store/
delimited.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Utility for streaming newline delimited files from object storage
19
20use std::collections::VecDeque;
21
22use bytes::Bytes;
23use futures::{Stream, StreamExt};
24use snafu::{ensure, Snafu};
25
26use super::Result;
27
28#[derive(Debug, Snafu)]
29enum Error {
30    #[snafu(display("encountered unterminated string"))]
31    UnterminatedString,
32
33    #[snafu(display("encountered trailing escape character"))]
34    TrailingEscape,
35}
36
37impl From<Error> for super::Error {
38    fn from(err: Error) -> Self {
39        Self::Generic {
40            store: "LineDelimiter",
41            source: Box::new(err),
42        }
43    }
44}
45
/// Byte that terminates a record: ASCII line feed (`\n`)
const NEWLINE: u8 = b'\n';

/// Byte that opens and closes a quoted string: ASCII double quote (`"`)
const QUOTE: u8 = b'"';

/// Byte that escapes the byte following it: ASCII backslash (`\`)
const ESCAPE: u8 = b'\\';
54
55/// [`LineDelimiter`] is provided with a stream of [`Bytes`] and returns an iterator
56/// of [`Bytes`] containing a whole number of new line delimited records
57#[derive(Debug, Default)]
58struct LineDelimiter {
59    /// Complete chunks of [`Bytes`]
60    complete: VecDeque<Bytes>,
61    /// Remainder bytes that form the next record
62    remainder: Vec<u8>,
63    /// True if the last character was the escape character
64    is_escape: bool,
65    /// True if currently processing a quoted string
66    is_quote: bool,
67}
68
69impl LineDelimiter {
70    /// Creates a new [`LineDelimiter`] with the provided delimiter
71    fn new() -> Self {
72        Self::default()
73    }
74
75    /// Adds the next set of [`Bytes`]
76    fn push(&mut self, val: impl Into<Bytes>) {
77        let val: Bytes = val.into();
78
79        let is_escape = &mut self.is_escape;
80        let is_quote = &mut self.is_quote;
81        let mut record_ends = val.iter().enumerate().filter_map(|(idx, v)| {
82            if *is_escape {
83                *is_escape = false;
84                None
85            } else if *v == ESCAPE {
86                *is_escape = true;
87                None
88            } else if *v == QUOTE {
89                *is_quote = !*is_quote;
90                None
91            } else if *is_quote {
92                None
93            } else {
94                (*v == NEWLINE).then_some(idx + 1)
95            }
96        });
97
98        let start_offset = match self.remainder.is_empty() {
99            true => 0,
100            false => match record_ends.next() {
101                Some(idx) => {
102                    self.remainder.extend_from_slice(&val[0..idx]);
103                    self.complete
104                        .push_back(Bytes::from(std::mem::take(&mut self.remainder)));
105                    idx
106                }
107                None => {
108                    self.remainder.extend_from_slice(&val);
109                    return;
110                }
111            },
112        };
113        let end_offset = record_ends.last().unwrap_or(start_offset);
114        if start_offset != end_offset {
115            self.complete.push_back(val.slice(start_offset..end_offset));
116        }
117
118        if end_offset != val.len() {
119            self.remainder.extend_from_slice(&val[end_offset..])
120        }
121    }
122
123    /// Marks the end of the stream, delimiting any remaining bytes
124    ///
125    /// Returns `true` if there is no remaining data to be read
126    fn finish(&mut self) -> Result<bool> {
127        if !self.remainder.is_empty() {
128            ensure!(!self.is_quote, UnterminatedStringSnafu);
129            ensure!(!self.is_quote, TrailingEscapeSnafu);
130
131            self.complete
132                .push_back(Bytes::from(std::mem::take(&mut self.remainder)))
133        }
134        Ok(self.complete.is_empty())
135    }
136}
137
138impl Iterator for LineDelimiter {
139    type Item = Bytes;
140
141    fn next(&mut self) -> Option<Self::Item> {
142        self.complete.pop_front()
143    }
144}
145
146/// Given a [`Stream`] of [`Bytes`] returns a [`Stream`] where each
147/// yielded [`Bytes`] contains a whole number of new line delimited records
148/// accounting for `\` style escapes and `"` quotes
149pub fn newline_delimited_stream<S>(s: S) -> impl Stream<Item = Result<Bytes>>
150where
151    S: Stream<Item = Result<Bytes>> + Unpin,
152{
153    let delimiter = LineDelimiter::new();
154
155    futures::stream::unfold(
156        (s, delimiter, false),
157        |(mut s, mut delimiter, mut exhausted)| async move {
158            loop {
159                if let Some(next) = delimiter.next() {
160                    return Some((Ok(next), (s, delimiter, exhausted)));
161                } else if exhausted {
162                    return None;
163                }
164
165                match s.next().await {
166                    Some(Ok(bytes)) => delimiter.push(bytes),
167                    Some(Err(e)) => return Some((Err(e), (s, delimiter, exhausted))),
168                    None => {
169                        exhausted = true;
170                        match delimiter.finish() {
171                            Ok(true) => return None,
172                            Ok(false) => continue,
173                            Err(e) => return Some((Err(e), (s, delimiter, exhausted))),
174                        }
175                    }
176                }
177            }
178        },
179    )
180}
181
#[cfg(test)]
mod tests {
    use futures::stream::{BoxStream, TryStreamExt};

    use super::*;

    /// Input chunks for the stream tests; records straddle chunk boundaries
    fn input_chunks() -> Vec<&'static str> {
        vec!["hello\nworld\nbin", "go\ncup", "cakes"]
    }

    /// Output expected from delimiting [`input_chunks`]
    fn expected_output() -> Vec<Bytes> {
        vec![
            Bytes::from("hello\nworld\n"),
            Bytes::from("bingo\n"),
            Bytes::from("cupcakes"),
        ]
    }

    #[test]
    fn test_delimiter() {
        let mut lines = LineDelimiter::new();
        lines.push("hello\nworld");
        lines.push("\n\n");

        assert_eq!(lines.next().unwrap(), Bytes::from("hello\n"));
        assert_eq!(lines.next().unwrap(), Bytes::from("world\n"));
        assert_eq!(lines.next().unwrap(), Bytes::from("\n"));
        assert!(lines.next().is_none());
    }

    #[test]
    fn test_delimiter_escaped() {
        let mut lines = LineDelimiter::new();
        lines.push("");
        lines.push("fo\\\n\"foo");
        lines.push("bo\n\"bar\n");
        lines.push("\"he");
        lines.push("llo\"\n");

        // Escaped and quoted newlines must not split the first record
        let first = lines.next().unwrap();
        assert_eq!(first, Bytes::from("fo\\\n\"foobo\n\"bar\n"));
        let second = lines.next().unwrap();
        assert_eq!(second, Bytes::from("\"hello\"\n"));
        assert!(lines.next().is_none());

        // Verify can push further data
        lines.push("\"foo\nbar\",\"fiz\\\"inner\\\"\"\nhello");
        assert!(!lines.finish().unwrap());

        let third = lines.next().unwrap();
        assert_eq!(third, Bytes::from("\"foo\nbar\",\"fiz\\\"inner\\\"\"\n"));
        let tail = lines.next().unwrap();
        assert_eq!(tail, Bytes::from("hello"));
        assert!(lines.finish().unwrap());
        assert!(lines.next().is_none());
    }

    #[tokio::test]
    async fn test_delimiter_stream() {
        let chunks = input_chunks()
            .into_iter()
            .map(|chunk| Ok(Bytes::from(chunk)));
        let stream = newline_delimited_stream(futures::stream::iter(chunks));

        let results: Vec<_> = stream.try_collect().await.unwrap();
        assert_eq!(results, expected_output())
    }

    #[tokio::test]
    async fn test_delimiter_unfold_stream() {
        let input_stream: BoxStream<'static, Result<Bytes>> = futures::stream::unfold(
            VecDeque::from(input_chunks()),
            |mut pending| async move {
                pending
                    .pop_front()
                    .map(|chunk| (Ok(Bytes::from(chunk)), pending))
            },
        )
        .boxed();
        let stream = newline_delimited_stream(input_stream);

        let results: Vec<_> = stream.try_collect().await.unwrap();
        assert_eq!(results, expected_output())
    }
}