1use std::fmt::{Debug, Display, Formatter};
21use std::ops::Range;
22use std::sync::Arc;
23
24use async_trait::async_trait;
25use bytes::{BufMut, Bytes, BytesMut};
26use futures::stream::BoxStream;
27use futures::StreamExt;
28
29use crate::path::Path;
30use crate::{
31 GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
32 PutMultipartOpts, PutOptions, PutResult,
33};
34use crate::{PutPayload, Result};
35
/// Wraps an [`ObjectStore`] and re-slices the data returned by `get_opts`
/// into chunks of a configurable size.
///
/// Every other operation is forwarded to the wrapped store unchanged.
#[derive(Debug)]
pub struct ChunkedStore {
    /// The wrapped store that all operations are delegated to.
    inner: Arc<dyn ObjectStore>,
    /// Target size, in bytes, of each chunk produced by `get_opts`.
    chunk_size: usize,
}
49
50impl ChunkedStore {
51 pub fn new(inner: Arc<dyn ObjectStore>, chunk_size: usize) -> Self {
53 Self { inner, chunk_size }
54 }
55}
56
57impl Display for ChunkedStore {
58 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59 write!(f, "ChunkedStore({})", self.inner)
60 }
61}
62
63#[async_trait]
64impl ObjectStore for ChunkedStore {
65 async fn put_opts(
66 &self,
67 location: &Path,
68 payload: PutPayload,
69 opts: PutOptions,
70 ) -> Result<PutResult> {
71 self.inner.put_opts(location, payload, opts).await
72 }
73
74 async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
75 self.inner.put_multipart(location).await
76 }
77
78 async fn put_multipart_opts(
79 &self,
80 location: &Path,
81 opts: PutMultipartOpts,
82 ) -> Result<Box<dyn MultipartUpload>> {
83 self.inner.put_multipart_opts(location, opts).await
84 }
85
86 async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
87 let r = self.inner.get_opts(location, options).await?;
88 let stream = match r.payload {
89 GetResultPayload::File(file, path) => {
90 crate::local::chunked_stream(file, path, r.range.clone(), self.chunk_size)
91 }
92 GetResultPayload::Stream(stream) => {
93 let buffer = BytesMut::new();
94 futures::stream::unfold(
95 (stream, buffer, false, self.chunk_size),
96 |(mut stream, mut buffer, mut exhausted, chunk_size)| async move {
97 if exhausted {
100 return None;
101 }
102 while buffer.len() < chunk_size {
103 match stream.next().await {
104 None => {
105 exhausted = true;
106 let slice = buffer.split_off(0).freeze();
107 return Some((
108 Ok(slice),
109 (stream, buffer, exhausted, chunk_size),
110 ));
111 }
112 Some(Ok(bytes)) => {
113 buffer.put(bytes);
114 }
115 Some(Err(e)) => {
116 return Some((
117 Err(crate::Error::Generic {
118 store: "ChunkedStore",
119 source: Box::new(e),
120 }),
121 (stream, buffer, exhausted, chunk_size),
122 ))
123 }
124 };
125 }
126 let slice = buffer.split_to(chunk_size).freeze();
128 Some((Ok(slice), (stream, buffer, exhausted, chunk_size)))
129 },
130 )
131 .boxed()
132 }
133 };
134 Ok(GetResult {
135 payload: GetResultPayload::Stream(stream),
136 ..r
137 })
138 }
139
140 async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
141 self.inner.get_range(location, range).await
142 }
143
144 async fn head(&self, location: &Path) -> Result<ObjectMeta> {
145 self.inner.head(location).await
146 }
147
148 async fn delete(&self, location: &Path) -> Result<()> {
149 self.inner.delete(location).await
150 }
151
152 fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
153 self.inner.list(prefix)
154 }
155
156 fn list_with_offset(
157 &self,
158 prefix: Option<&Path>,
159 offset: &Path,
160 ) -> BoxStream<'_, Result<ObjectMeta>> {
161 self.inner.list_with_offset(prefix, offset)
162 }
163
164 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
165 self.inner.list_with_delimiter(prefix).await
166 }
167
168 async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
169 self.inner.copy(from, to).await
170 }
171
172 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
173 self.inner.copy_if_not_exists(from, to).await
174 }
175}
176
#[cfg(test)]
mod tests {
    use futures::StreamExt;

    use crate::integration::*;
    use crate::local::LocalFileSystem;
    use crate::memory::InMemory;
    use crate::path::Path;

    use super::*;

    // Checks that a 1001-byte object comes back as full-size chunks followed
    // by a single shorter remainder, for several chunk sizes.
    #[tokio::test]
    async fn test_chunked_basic() {
        const OBJECT_LEN: usize = 1001;

        let location = Path::parse("test").unwrap();
        let backing: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        backing
            .put(&location, vec![0; OBJECT_LEN].into())
            .await
            .unwrap();

        for chunk_size in [10, 20, 31] {
            let chunked = ChunkedStore::new(Arc::clone(&backing), chunk_size);
            let fetched = chunked.get(&location).await.unwrap();
            let mut chunks = match fetched.payload {
                GetResultPayload::Stream(s) => s,
                _ => unreachable!(),
            };

            // Track how many bytes are still expected; each chunk must be
            // either a full chunk or whatever is left.
            let mut remaining = OBJECT_LEN;
            while let Some(chunk) = chunks.next().await {
                let size = chunk.unwrap().len();
                let expected = usize::min(remaining, chunk_size);
                assert_eq!(size, expected);
                remaining -= expected;
            }
            assert_eq!(remaining, 0);
        }
    }

    // Runs the shared integration suite against in-memory and local-file
    // stores, each wrapped in a `ChunkedStore` with 100-byte chunks.
    #[tokio::test]
    async fn test_chunked() {
        let temporary = tempfile::tempdir().unwrap();
        let integrations: [Arc<dyn ObjectStore>; 2] = [
            Arc::new(InMemory::new()),
            Arc::new(LocalFileSystem::new_with_prefix(temporary.path()).unwrap()),
        ];

        for inner in &integrations {
            let integration = ChunkedStore::new(Arc::clone(inner), 100);

            put_get_delete_list(&integration).await;
            get_opts(&integration).await;
            list_uses_directories_correctly(&integration).await;
            list_with_delimiter(&integration).await;
            rename_and_copy(&integration).await;
            copy_if_not_exists(&integration).await;
            stream_get(&integration).await;
        }
    }
}