object_store/
chunked.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! A [`ChunkedStore`] that can be used to test streaming behaviour
19
20use std::fmt::{Debug, Display, Formatter};
21use std::ops::Range;
22use std::sync::Arc;
23
24use async_trait::async_trait;
25use bytes::{BufMut, Bytes, BytesMut};
26use futures::stream::BoxStream;
27use futures::StreamExt;
28
29use crate::path::Path;
30use crate::{
31    GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
32    PutMultipartOpts, PutOptions, PutResult,
33};
34use crate::{PutPayload, Result};
35
/// Wraps an [`ObjectStore`] and makes its get response return chunks
/// in a controllable manner.
///
/// A `ChunkedStore` makes the memory consumption and performance of
/// the wrapped [`ObjectStore`] worse. It is intended for use within
/// tests, to control the chunks in the produced output streams. For
/// example, it is used to verify the delimiting logic in
/// newline_delimited_stream.
#[derive(Debug)]
pub struct ChunkedStore {
    /// The wrapped store that every operation is forwarded to.
    inner: Arc<dyn ObjectStore>,
    /// Size, in bytes, of the chunks produced for `get_opts` responses.
    chunk_size: usize,
}
49
50impl ChunkedStore {
51    /// Creates a new [`ChunkedStore`] with the specified chunk_size
52    pub fn new(inner: Arc<dyn ObjectStore>, chunk_size: usize) -> Self {
53        Self { inner, chunk_size }
54    }
55}
56
57impl Display for ChunkedStore {
58    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59        write!(f, "ChunkedStore({})", self.inner)
60    }
61}
62
63#[async_trait]
64impl ObjectStore for ChunkedStore {
65    async fn put_opts(
66        &self,
67        location: &Path,
68        payload: PutPayload,
69        opts: PutOptions,
70    ) -> Result<PutResult> {
71        self.inner.put_opts(location, payload, opts).await
72    }
73
74    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
75        self.inner.put_multipart(location).await
76    }
77
78    async fn put_multipart_opts(
79        &self,
80        location: &Path,
81        opts: PutMultipartOpts,
82    ) -> Result<Box<dyn MultipartUpload>> {
83        self.inner.put_multipart_opts(location, opts).await
84    }
85
86    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
87        let r = self.inner.get_opts(location, options).await?;
88        let stream = match r.payload {
89            GetResultPayload::File(file, path) => {
90                crate::local::chunked_stream(file, path, r.range.clone(), self.chunk_size)
91            }
92            GetResultPayload::Stream(stream) => {
93                let buffer = BytesMut::new();
94                futures::stream::unfold(
95                    (stream, buffer, false, self.chunk_size),
96                    |(mut stream, mut buffer, mut exhausted, chunk_size)| async move {
97                        // Keep accumulating bytes until we reach capacity as long as
98                        // the stream can provide them:
99                        if exhausted {
100                            return None;
101                        }
102                        while buffer.len() < chunk_size {
103                            match stream.next().await {
104                                None => {
105                                    exhausted = true;
106                                    let slice = buffer.split_off(0).freeze();
107                                    return Some((
108                                        Ok(slice),
109                                        (stream, buffer, exhausted, chunk_size),
110                                    ));
111                                }
112                                Some(Ok(bytes)) => {
113                                    buffer.put(bytes);
114                                }
115                                Some(Err(e)) => {
116                                    return Some((
117                                        Err(crate::Error::Generic {
118                                            store: "ChunkedStore",
119                                            source: Box::new(e),
120                                        }),
121                                        (stream, buffer, exhausted, chunk_size),
122                                    ))
123                                }
124                            };
125                        }
126                        // Return the chunked values as the next value in the stream
127                        let slice = buffer.split_to(chunk_size).freeze();
128                        Some((Ok(slice), (stream, buffer, exhausted, chunk_size)))
129                    },
130                )
131                .boxed()
132            }
133        };
134        Ok(GetResult {
135            payload: GetResultPayload::Stream(stream),
136            ..r
137        })
138    }
139
140    async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
141        self.inner.get_range(location, range).await
142    }
143
144    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
145        self.inner.head(location).await
146    }
147
148    async fn delete(&self, location: &Path) -> Result<()> {
149        self.inner.delete(location).await
150    }
151
152    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
153        self.inner.list(prefix)
154    }
155
156    fn list_with_offset(
157        &self,
158        prefix: Option<&Path>,
159        offset: &Path,
160    ) -> BoxStream<'_, Result<ObjectMeta>> {
161        self.inner.list_with_offset(prefix, offset)
162    }
163
164    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
165        self.inner.list_with_delimiter(prefix).await
166    }
167
168    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
169        self.inner.copy(from, to).await
170    }
171
172    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
173        self.inner.copy_if_not_exists(from, to).await
174    }
175}
176
#[cfg(test)]
mod tests {
    use futures::StreamExt;

    use crate::integration::*;
    use crate::local::LocalFileSystem;
    use crate::memory::InMemory;
    use crate::path::Path;

    use super::*;

    /// Every chunk read back through a `ChunkedStore` has exactly the
    /// configured size, except the last one, which holds the remainder.
    #[tokio::test]
    async fn test_chunked_basic() {
        // Deliberately not a multiple of any chunk size used below.
        const TOTAL_LEN: usize = 1001;

        let location = Path::parse("test").unwrap();
        let backing: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        backing
            .put(&location, vec![0; TOTAL_LEN].into())
            .await
            .unwrap();

        for chunk_size in [10, 20, 31] {
            let chunked = ChunkedStore::new(Arc::clone(&backing), chunk_size);
            let payload = chunked.get(&location).await.unwrap().payload;
            let mut chunks = match payload {
                GetResultPayload::Stream(s) => s,
                _ => unreachable!(),
            };

            // Count down the bytes still expected from the stream.
            let mut remaining = TOTAL_LEN;
            while let Some(chunk) = chunks.next().await {
                let size = chunk.unwrap().len();
                let expected = remaining.min(chunk_size);
                assert_eq!(size, expected);
                remaining -= expected;
            }
            assert_eq!(remaining, 0);
        }
    }

    /// Runs the shared integration suite against a `ChunkedStore` layered
    /// over an in-memory store and over a local filesystem store.
    #[tokio::test]
    async fn test_chunked() {
        // Keep the directory handle alive for the duration of the test.
        let temporary = tempfile::tempdir().unwrap();
        let backends: &[Arc<dyn ObjectStore>] = &[
            Arc::new(InMemory::new()),
            Arc::new(LocalFileSystem::new_with_prefix(temporary.path()).unwrap()),
        ];

        for backend in backends {
            let integration = ChunkedStore::new(Arc::clone(backend), 100);

            put_get_delete_list(&integration).await;
            get_opts(&integration).await;
            list_uses_directories_correctly(&integration).await;
            list_with_delimiter(&integration).await;
            rename_and_copy(&integration).await;
            copy_if_not_exists(&integration).await;
            stream_get(&integration).await;
        }
    }
}
232}