object_store/
buffered.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Utilities for performing tokio-style buffered IO
19
20use crate::path::Path;
21use crate::{
22    Attributes, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayloadMut, TagSet,
23    WriteMultipart,
24};
25use bytes::Bytes;
26use futures::future::{BoxFuture, FutureExt};
27use futures::ready;
28use std::cmp::Ordering;
29use std::io::{Error, ErrorKind, SeekFrom};
30use std::pin::Pin;
31use std::sync::Arc;
32use std::task::{Context, Poll};
33use tokio::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
34
/// The default buffer size used by [`BufReader`], in bytes (1 MiB)
///
/// [`BufReader::new`] passes this value as the `capacity` of the reader it creates.
pub const DEFAULT_BUFFER_SIZE: usize = 1024 * 1024;
37
/// An async-buffered reader compatible with the tokio IO traits
///
/// Internally this maintains a buffer of the requested size, and uses [`ObjectStore::get_range`]
/// to populate its internal buffer once depleted. This buffer is cleared on seek.
///
/// Whilst simple, this interface will typically be outperformed by the native [`ObjectStore`]
/// methods that better map to the network APIs. This is because most object stores have
/// very [high first-byte latencies], on the order of 100-200ms, and so avoiding unnecessary
/// round-trips is critical to throughput.
///
/// Systems looking to sequentially scan a file should instead consider using [`ObjectStore::get`],
/// or [`ObjectStore::get_opts`], or [`ObjectStore::get_range`] to read a particular range.
///
/// Systems looking to read multiple ranges of a file should instead consider using
/// [`ObjectStore::get_ranges`], which will optimise the vectored IO.
///
/// [high first-byte latencies]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/optimizing-performance.html
pub struct BufReader {
    /// The object store to fetch data from
    store: Arc<dyn ObjectStore>,
    /// The size of the object in bytes, as reported by the [`ObjectMeta`] given at construction
    size: u64,
    /// The path to the object
    path: Path,
    /// The current position in the object, as a byte offset from its start
    ///
    /// Seeking may move this beyond `size`, in which case reads return no data
    cursor: u64,
    /// The number of bytes to read in a single request
    capacity: usize,
    /// The buffered data if any
    buffer: Buffer,
}
69
70impl std::fmt::Debug for BufReader {
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        f.debug_struct("BufReader")
73            .field("path", &self.path)
74            .field("size", &self.size)
75            .field("capacity", &self.capacity)
76            .finish()
77    }
78}
79
/// The state of the internal read buffer of a [`BufReader`]
enum Buffer {
    /// No data is buffered and no request is in flight
    Empty,
    /// A request for data is in flight; the future resolves to the fetched bytes
    Pending(BoxFuture<'static, std::io::Result<Bytes>>),
    /// Fetched bytes that have not yet been consumed
    Ready(Bytes),
}
85
86impl BufReader {
87    /// Create a new [`BufReader`] from the provided [`ObjectMeta`] and [`ObjectStore`]
88    pub fn new(store: Arc<dyn ObjectStore>, meta: &ObjectMeta) -> Self {
89        Self::with_capacity(store, meta, DEFAULT_BUFFER_SIZE)
90    }
91
92    /// Create a new [`BufReader`] from the provided [`ObjectMeta`], [`ObjectStore`], and `capacity`
93    pub fn with_capacity(store: Arc<dyn ObjectStore>, meta: &ObjectMeta, capacity: usize) -> Self {
94        Self {
95            path: meta.location.clone(),
96            size: meta.size as _,
97            store,
98            capacity,
99            cursor: 0,
100            buffer: Buffer::Empty,
101        }
102    }
103
104    fn poll_fill_buf_impl(
105        &mut self,
106        cx: &mut Context<'_>,
107        amnt: usize,
108    ) -> Poll<std::io::Result<&[u8]>> {
109        let buf = &mut self.buffer;
110        loop {
111            match buf {
112                Buffer::Empty => {
113                    let store = Arc::clone(&self.store);
114                    let path = self.path.clone();
115                    let start = self.cursor.min(self.size) as _;
116                    let end = self.cursor.saturating_add(amnt as u64).min(self.size) as _;
117
118                    if start == end {
119                        return Poll::Ready(Ok(&[]));
120                    }
121
122                    *buf = Buffer::Pending(Box::pin(async move {
123                        Ok(store.get_range(&path, start..end).await?)
124                    }))
125                }
126                Buffer::Pending(fut) => match ready!(fut.poll_unpin(cx)) {
127                    Ok(b) => *buf = Buffer::Ready(b),
128                    Err(e) => return Poll::Ready(Err(e)),
129                },
130                Buffer::Ready(r) => return Poll::Ready(Ok(r)),
131            }
132        }
133    }
134}
135
136impl AsyncSeek for BufReader {
137    fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> std::io::Result<()> {
138        self.cursor = match position {
139            SeekFrom::Start(offset) => offset,
140            SeekFrom::End(offset) => checked_add_signed(self.size, offset).ok_or_else(|| {
141                Error::new(
142                    ErrorKind::InvalidInput,
143                    format!(
144                        "Seeking {offset} from end of {} byte file would result in overflow",
145                        self.size
146                    ),
147                )
148            })?,
149            SeekFrom::Current(offset) => {
150                checked_add_signed(self.cursor, offset).ok_or_else(|| {
151                    Error::new(
152                        ErrorKind::InvalidInput,
153                        format!(
154                            "Seeking {offset} from current offset of {} would result in overflow",
155                            self.cursor
156                        ),
157                    )
158                })?
159            }
160        };
161        self.buffer = Buffer::Empty;
162        Ok(())
163    }
164
165    fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<u64>> {
166        Poll::Ready(Ok(self.cursor))
167    }
168}
169
170impl AsyncRead for BufReader {
171    fn poll_read(
172        mut self: Pin<&mut Self>,
173        cx: &mut Context<'_>,
174        out: &mut ReadBuf<'_>,
175    ) -> Poll<std::io::Result<()>> {
176        // Read the maximum of the internal buffer and `out`
177        let to_read = out.remaining().max(self.capacity);
178        let r = match ready!(self.poll_fill_buf_impl(cx, to_read)) {
179            Ok(buf) => {
180                let to_consume = out.remaining().min(buf.len());
181                out.put_slice(&buf[..to_consume]);
182                self.consume(to_consume);
183                Ok(())
184            }
185            Err(e) => Err(e),
186        };
187        Poll::Ready(r)
188    }
189}
190
191impl AsyncBufRead for BufReader {
192    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<&[u8]>> {
193        let capacity = self.capacity;
194        self.get_mut().poll_fill_buf_impl(cx, capacity)
195    }
196
197    fn consume(mut self: Pin<&mut Self>, amt: usize) {
198        match &mut self.buffer {
199            Buffer::Empty => assert_eq!(amt, 0, "cannot consume from empty buffer"),
200            Buffer::Ready(b) => match b.len().cmp(&amt) {
201                Ordering::Less => panic!("{amt} exceeds buffer sized of {}", b.len()),
202                Ordering::Greater => *b = b.slice(amt..),
203                Ordering::Equal => self.buffer = Buffer::Empty,
204            },
205            Buffer::Pending(_) => panic!("cannot consume from pending buffer"),
206        }
207        self.cursor += amt as u64;
208    }
209}
210
/// An async buffered writer compatible with the tokio IO traits
///
/// This writer adaptively uses [`ObjectStore::put`] or
/// [`ObjectStore::put_multipart`] depending on the amount of data that has
/// been written.
///
/// Data is buffered in memory while the total written stays below `capacity`, and is
/// flushed on shutdown using [`ObjectStore::put`]. Once a write would bring the total to
/// `capacity` or more, data will instead be streamed using [`ObjectStore::put_multipart`]
pub struct BufWriter {
    /// Buffering threshold in bytes, also used as the chunk size of the multipart upload
    capacity: usize,
    /// Limit handed to the multipart upload when waiting for capacity before a write
    max_concurrency: usize,
    /// Attributes for the uploaded object, if provided
    attributes: Option<Attributes>,
    /// Tags for the uploaded object, if provided
    tags: Option<TagSet>,
    /// The current stage of the write
    state: BufWriterState,
    /// The object store to write to
    store: Arc<dyn ObjectStore>,
}
228
229impl std::fmt::Debug for BufWriter {
230    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
231        f.debug_struct("BufWriter")
232            .field("capacity", &self.capacity)
233            .finish()
234    }
235}
236
/// The stages a [`BufWriter`] moves through
enum BufWriterState {
    /// Buffer up to capacity bytes
    ///
    /// Holds the destination path and the data buffered so far
    Buffer(Path, PutPayloadMut),
    /// [`ObjectStore::put_multipart`]
    ///
    /// A pending future that creates the multipart upload and queues the data buffered so far
    Prepare(BoxFuture<'static, crate::Result<WriteMultipart>>),
    /// Write to a multipart upload
    ///
    /// `None` once the upload has been taken by shutdown or abort
    Write(Option<WriteMultipart>),
    /// [`ObjectStore::put`]
    ///
    /// The pending future created on shutdown, which either puts the buffered data in a
    /// single request or finishes the multipart upload
    Flush(BoxFuture<'static, crate::Result<()>>),
}
247
248impl BufWriter {
249    /// Create a new [`BufWriter`] from the provided [`ObjectStore`] and [`Path`]
250    pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
251        Self::with_capacity(store, path, 10 * 1024 * 1024)
252    }
253
254    /// Create a new [`BufWriter`] from the provided [`ObjectStore`], [`Path`] and `capacity`
255    pub fn with_capacity(store: Arc<dyn ObjectStore>, path: Path, capacity: usize) -> Self {
256        Self {
257            capacity,
258            store,
259            max_concurrency: 8,
260            attributes: None,
261            tags: None,
262            state: BufWriterState::Buffer(path, PutPayloadMut::new()),
263        }
264    }
265
266    /// Override the maximum number of in-flight requests for this writer
267    ///
268    /// Defaults to 8
269    pub fn with_max_concurrency(self, max_concurrency: usize) -> Self {
270        Self {
271            max_concurrency,
272            ..self
273        }
274    }
275
276    /// Set the attributes of the uploaded object
277    pub fn with_attributes(self, attributes: Attributes) -> Self {
278        Self {
279            attributes: Some(attributes),
280            ..self
281        }
282    }
283
284    /// Set the tags of the uploaded object
285    pub fn with_tags(self, tags: TagSet) -> Self {
286        Self {
287            tags: Some(tags),
288            ..self
289        }
290    }
291
292    /// Write data to the writer in [`Bytes`].
293    ///
294    /// Unlike [`AsyncWrite::poll_write`], `put` can write data without extra copying.
295    ///
296    /// This API is recommended while the data source generates [`Bytes`].
297    pub async fn put(&mut self, bytes: Bytes) -> crate::Result<()> {
298        loop {
299            return match &mut self.state {
300                BufWriterState::Write(Some(write)) => {
301                    write.wait_for_capacity(self.max_concurrency).await?;
302                    write.put(bytes);
303                    Ok(())
304                }
305                BufWriterState::Write(None) | BufWriterState::Flush(_) => {
306                    panic!("Already shut down")
307                }
308                // NOTE
309                //
310                // This case should never happen in practice, but rust async API does
311                // make it possible for users to call `put` before `poll_write` returns `Ready`.
312                //
313                // We allow such usage by `await` the future and continue the loop.
314                BufWriterState::Prepare(f) => {
315                    self.state = BufWriterState::Write(f.await?.into());
316                    continue;
317                }
318                BufWriterState::Buffer(path, b) => {
319                    if b.content_length().saturating_add(bytes.len()) < self.capacity {
320                        b.push(bytes);
321                        Ok(())
322                    } else {
323                        let buffer = std::mem::take(b);
324                        let path = std::mem::take(path);
325                        let opts = PutMultipartOpts {
326                            attributes: self.attributes.take().unwrap_or_default(),
327                            tags: self.tags.take().unwrap_or_default(),
328                        };
329                        let upload = self.store.put_multipart_opts(&path, opts).await?;
330                        let mut chunked =
331                            WriteMultipart::new_with_chunk_size(upload, self.capacity);
332                        for chunk in buffer.freeze() {
333                            chunked.put(chunk);
334                        }
335                        chunked.put(bytes);
336                        self.state = BufWriterState::Write(Some(chunked));
337                        Ok(())
338                    }
339                }
340            };
341        }
342    }
343
344    /// Abort this writer, cleaning up any partially uploaded state
345    ///
346    /// # Panic
347    ///
348    /// Panics if this writer has already been shutdown or aborted
349    pub async fn abort(&mut self) -> crate::Result<()> {
350        match &mut self.state {
351            BufWriterState::Buffer(_, _) | BufWriterState::Prepare(_) => Ok(()),
352            BufWriterState::Flush(_) => panic!("Already shut down"),
353            BufWriterState::Write(x) => x.take().unwrap().abort().await,
354        }
355    }
356}
357
358impl AsyncWrite for BufWriter {
359    fn poll_write(
360        mut self: Pin<&mut Self>,
361        cx: &mut Context<'_>,
362        buf: &[u8],
363    ) -> Poll<Result<usize, Error>> {
364        let cap = self.capacity;
365        let max_concurrency = self.max_concurrency;
366        loop {
367            return match &mut self.state {
368                BufWriterState::Write(Some(write)) => {
369                    ready!(write.poll_for_capacity(cx, max_concurrency))?;
370                    write.write(buf);
371                    Poll::Ready(Ok(buf.len()))
372                }
373                BufWriterState::Write(None) | BufWriterState::Flush(_) => {
374                    panic!("Already shut down")
375                }
376                BufWriterState::Prepare(f) => {
377                    self.state = BufWriterState::Write(ready!(f.poll_unpin(cx)?).into());
378                    continue;
379                }
380                BufWriterState::Buffer(path, b) => {
381                    if b.content_length().saturating_add(buf.len()) >= cap {
382                        let buffer = std::mem::take(b);
383                        let path = std::mem::take(path);
384                        let opts = PutMultipartOpts {
385                            attributes: self.attributes.take().unwrap_or_default(),
386                            tags: self.tags.take().unwrap_or_default(),
387                        };
388                        let store = Arc::clone(&self.store);
389                        self.state = BufWriterState::Prepare(Box::pin(async move {
390                            let upload = store.put_multipart_opts(&path, opts).await?;
391                            let mut chunked = WriteMultipart::new_with_chunk_size(upload, cap);
392                            for chunk in buffer.freeze() {
393                                chunked.put(chunk);
394                            }
395                            Ok(chunked)
396                        }));
397                        continue;
398                    }
399                    b.extend_from_slice(buf);
400                    Poll::Ready(Ok(buf.len()))
401                }
402            };
403        }
404    }
405
406    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
407        loop {
408            return match &mut self.state {
409                BufWriterState::Write(_) | BufWriterState::Buffer(_, _) => Poll::Ready(Ok(())),
410                BufWriterState::Flush(_) => panic!("Already shut down"),
411                BufWriterState::Prepare(f) => {
412                    self.state = BufWriterState::Write(ready!(f.poll_unpin(cx)?).into());
413                    continue;
414                }
415            };
416        }
417    }
418
419    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
420        loop {
421            match &mut self.state {
422                BufWriterState::Prepare(f) => {
423                    self.state = BufWriterState::Write(ready!(f.poll_unpin(cx)?).into());
424                }
425                BufWriterState::Buffer(p, b) => {
426                    let buf = std::mem::take(b);
427                    let path = std::mem::take(p);
428                    let opts = PutOptions {
429                        attributes: self.attributes.take().unwrap_or_default(),
430                        tags: self.tags.take().unwrap_or_default(),
431                        ..Default::default()
432                    };
433                    let store = Arc::clone(&self.store);
434                    self.state = BufWriterState::Flush(Box::pin(async move {
435                        store.put_opts(&path, buf.into(), opts).await?;
436                        Ok(())
437                    }));
438                }
439                BufWriterState::Flush(f) => return f.poll_unpin(cx).map_err(std::io::Error::from),
440                BufWriterState::Write(x) => {
441                    let upload = x.take().ok_or_else(|| {
442                        std::io::Error::new(
443                            ErrorKind::InvalidInput,
444                            "Cannot shutdown a writer that has already been shut down",
445                        )
446                    })?;
447                    self.state = BufWriterState::Flush(
448                        async move {
449                            upload.finish().await?;
450                            Ok(())
451                        }
452                        .boxed(),
453                    )
454                }
455            }
456        }
457    }
458}
459
/// Port of standardised function as requires Rust 1.66
///
/// Adds the signed `rhs` to the unsigned `a`, returning `None` if the exact result does not
/// fit in a `u64` (it would be negative or exceed `u64::MAX`).
///
/// <https://github.com/rust-lang/rust/pull/87601/files#diff-b9390ee807a1dae3c3128dce36df56748ad8d23c6e361c0ebba4d744bf6efdb9R1533>
#[inline]
fn checked_add_signed(a: u64, rhs: i64) -> Option<u64> {
    // Adding the two's complement bit pattern of `rhs` wraps exactly when `rhs` is negative
    // and the true result is in range, so a carry is expected if and only if `rhs < 0`
    let (sum, carried) = a.overflowing_add(rhs as u64);
    if carried == (rhs < 0) {
        Some(sum)
    } else {
        None
    }
}
469
470#[cfg(test)]
471mod tests {
472    use super::*;
473    use crate::memory::InMemory;
474    use crate::path::Path;
475    use crate::{Attribute, GetOptions};
476    use itertools::Itertools;
477    use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
478
    /// Exercises reading, seeking and buffer refills of [`BufReader`] against an in-memory
    /// store holding a 4096 byte object made of the repeated pattern `12345678`
    #[tokio::test]
    async fn test_buf_reader() {
        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;

        let existent = Path::from("exists.txt");
        const BYTES: usize = 4096;

        let data: Bytes = b"12345678".iter().cycle().copied().take(BYTES).collect();
        store.put(&existent, data.clone().into()).await.unwrap();

        let meta = store.head(&existent).await.unwrap();

        // Reading to the end returns the whole object and leaves the cursor at 4096
        let mut reader = BufReader::new(Arc::clone(&store), &meta);
        let mut out = Vec::with_capacity(BYTES);
        let read = reader.read_to_end(&mut out).await.unwrap();

        assert_eq!(read, BYTES);
        assert_eq!(&out, &data);

        // A relative seek that would move the cursor below zero is rejected
        let err = reader.seek(SeekFrom::Current(i64::MIN)).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Seeking -9223372036854775808 from current offset of 4096 would result in overflow"
        );

        reader.rewind().await.unwrap();

        let err = reader.seek(SeekFrom::Current(-1)).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Seeking -1 from current offset of 0 would result in overflow"
        );

        // Seeking beyond the bounds of the file is permitted but should return no data
        reader.seek(SeekFrom::Start(u64::MAX)).await.unwrap();
        let buf = reader.fill_buf().await.unwrap();
        assert!(buf.is_empty());

        // A relative seek that would move the cursor above u64::MAX is rejected
        let err = reader.seek(SeekFrom::Current(1)).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Seeking 1 from current offset of 18446744073709551615 would result in overflow"
        );

        // Repeat the buffered-read checks with capacities below, equal to and above the object size
        for capacity in [200, 1024, 4096, DEFAULT_BUFFER_SIZE] {
            let store = Arc::clone(&store);
            let mut reader = BufReader::with_capacity(store, &meta, capacity);

            // Consume the object 8 bytes at a time; every refill must start on a pattern boundary
            let mut bytes_read = 0;
            loop {
                let buf = reader.fill_buf().await.unwrap();
                if buf.is_empty() {
                    assert_eq!(bytes_read, BYTES);
                    break;
                }
                assert!(buf.starts_with(b"12345678"));
                bytes_read += 8;
                reader.consume(8);
            }

            // Seek backwards from the end and read the tail of the object
            let mut buf = Vec::with_capacity(76);
            reader.seek(SeekFrom::Current(-76)).await.unwrap();
            reader.read_to_end(&mut buf).await.unwrap();
            assert_eq!(&buf, &data[BYTES - 76..]);

            // A fill after a seek returns at most `capacity` bytes starting at the new cursor
            reader.rewind().await.unwrap();
            let buffer = reader.fill_buf().await.unwrap();
            assert_eq!(buffer, &data[..capacity.min(BYTES)]);

            reader.seek(SeekFrom::Start(325)).await.unwrap();
            let buffer = reader.fill_buf().await.unwrap();
            assert_eq!(buffer, &data[325..(325 + capacity).min(BYTES)]);

            // Positioned exactly at the end there is nothing left to read
            reader.seek(SeekFrom::End(0)).await.unwrap();
            let buffer = reader.fill_buf().await.unwrap();
            assert!(buffer.is_empty());
        }
    }
557
    // Note: `BufWriter::with_tags` functionality is tested in `crate::tests::tagging`
    /// Writes through the [`AsyncWrite`] interface of a [`BufWriter`] with a capacity of 30
    /// bytes, checking the stored size and attributes for a write that stays below the
    /// capacity and for one that exceeds it
    #[tokio::test]
    async fn test_buf_writer() {
        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
        let path = Path::from("file.txt");
        let attributes = Attributes::from_iter([
            (Attribute::ContentType, "text/html"),
            (Attribute::CacheControl, "max-age=604800"),
        ]);

        // Test put
        // 20 + 5 bytes stays below the capacity of 30, so the data is only buffered until shutdown
        let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30)
            .with_attributes(attributes.clone());
        writer.write_all(&[0; 20]).await.unwrap();
        writer.flush().await.unwrap();
        writer.write_all(&[0; 5]).await.unwrap();
        writer.shutdown().await.unwrap();
        let response = store
            .get_opts(
                &path,
                GetOptions {
                    head: true,
                    ..Default::default()
                },
            )
            .await
            .unwrap();
        assert_eq!(response.meta.size, 25);
        assert_eq!(response.attributes, attributes);

        // Test multipart
        // 20 + 20 bytes reaches the capacity of 30, so the second write switches to multipart
        let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30)
            .with_attributes(attributes.clone());
        writer.write_all(&[0; 20]).await.unwrap();
        writer.flush().await.unwrap();
        writer.write_all(&[0; 20]).await.unwrap();
        writer.shutdown().await.unwrap();
        let response = store
            .get_opts(
                &path,
                GetOptions {
                    head: true,
                    ..Default::default()
                },
            )
            .await
            .unwrap();
        assert_eq!(response.meta.size, 40);
        assert_eq!(response.attributes, attributes);
    }
608
    /// Writes through [`BufWriter::put`] with a capacity of 30 bytes, checking the stored size
    /// and content for a write that stays below the capacity and for one that exceeds it
    #[tokio::test]
    async fn test_buf_writer_with_put() {
        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
        let path = Path::from("file.txt");

        // Test put
        // 20 + 5 bytes stays below the capacity of 30, so the data is only buffered until shutdown
        let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30);
        writer
            .put(Bytes::from((0..20).collect_vec()))
            .await
            .unwrap();
        writer
            .put(Bytes::from((20..25).collect_vec()))
            .await
            .unwrap();
        writer.shutdown().await.unwrap();
        let response = store
            .get_opts(
                &path,
                GetOptions {
                    head: true,
                    ..Default::default()
                },
            )
            .await
            .unwrap();
        assert_eq!(response.meta.size, 25);
        assert_eq!(response.bytes().await.unwrap(), (0..25).collect_vec());

        // Test multipart
        // 20 + 20 bytes reaches the capacity of 30, so the second put switches to multipart
        let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30);
        writer
            .put(Bytes::from((0..20).collect_vec()))
            .await
            .unwrap();
        writer
            .put(Bytes::from((20..40).collect_vec()))
            .await
            .unwrap();
        writer.shutdown().await.unwrap();
        let response = store
            .get_opts(
                &path,
                GetOptions {
                    head: true,
                    ..Default::default()
                },
            )
            .await
            .unwrap();
        assert_eq!(response.meta.size, 40);
        assert_eq!(response.bytes().await.unwrap(), (0..40).collect_vec());
    }
662}