object_store/upload.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::task::{Context, Poll};

use crate::{PutPayload, PutPayloadMut, PutResult, Result};
use async_trait::async_trait;
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::ready;
use tokio::task::JoinSet;

/// An upload part request
pub type UploadPart = BoxFuture<'static, Result<()>>;

/// A trait allowing writing an object in fixed size chunks
///
/// Consecutive chunks of data can be written by calling [`MultipartUpload::put_part`] and polling
/// the returned futures to completion. Multiple futures returned by [`MultipartUpload::put_part`]
/// may be polled in parallel, allowing for concurrent uploads.
///
/// Once all part uploads have been polled to completion, the upload can be completed by
/// calling [`MultipartUpload::complete`]. This will make the entire uploaded object visible
/// as an atomic operation. It is implementation defined behaviour if [`MultipartUpload::complete`]
/// is called before all [`UploadPart`] have been polled to completion.
#[async_trait]
pub trait MultipartUpload: Send + std::fmt::Debug {
    /// Upload the next part
    ///
    /// Most stores require that all parts excluding the last are at least 5 MiB, and some
    /// further require that all parts excluding the last be the same size, e.g. [R2].
    /// Clients wanting to maximise compatibility should therefore perform writes in
    /// fixed size blocks larger than 5 MiB.
    ///
    /// Callers may invoke this method multiple times and then await the
    /// returned futures in parallel
    ///
    /// ```no_run
    /// # use futures::StreamExt;
    /// # use object_store::MultipartUpload;
    /// #
    /// # async fn test() {
    /// #
    /// let mut upload: Box<dyn MultipartUpload> = todo!();
    /// let p1 = upload.put_part(vec![0; 10 * 1024 * 1024].into());
    /// let p2 = upload.put_part(vec![1; 10 * 1024 * 1024].into());
    /// futures::future::try_join(p1, p2).await.unwrap();
    /// upload.complete().await.unwrap();
    /// # }
    /// ```
    ///
    /// [R2]: https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations
    fn put_part(&mut self, data: PutPayload) -> UploadPart;

    /// Complete the multipart upload
    ///
    /// It is implementation defined behaviour if this method is called before polling
    /// all [`UploadPart`] returned by [`MultipartUpload::put_part`] to completion. Additionally,
    /// it is implementation defined behaviour to call [`MultipartUpload::complete`]
    /// on an already completed or aborted [`MultipartUpload`].
    async fn complete(&mut self) -> Result<PutResult>;

    /// Abort the multipart upload
    ///
    /// If a [`MultipartUpload`] is dropped without calling [`MultipartUpload::complete`],
    /// some object stores will automatically clean up any previously uploaded parts.
    /// However, some stores, such as S3 and GCS, cannot perform cleanup on drop.
    /// As such [`MultipartUpload::abort`] can be invoked to perform this cleanup.
    ///
    /// It will not be possible to call `abort` in all failure scenarios, for example
    /// non-graceful shutdown of the calling application. It is therefore recommended
    /// that object stores are configured with lifecycle rules to automatically clean up
    /// unused parts older than some threshold. See [crate::aws] and [crate::gcp]
    /// for more information.
    ///
    /// It is implementation defined behaviour to call [`MultipartUpload::abort`]
    /// on an already completed or aborted [`MultipartUpload`].
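    ///
    /// An illustrative sketch of aborting after a failed part upload; obtaining the
    /// upload from an object store is elided with `todo!()`:
    ///
    /// ```no_run
    /// # use object_store::MultipartUpload;
    /// #
    /// # async fn example() {
    /// let mut upload: Box<dyn MultipartUpload> = todo!();
    /// let part = upload.put_part(vec![0; 10 * 1024 * 1024].into());
    /// if part.await.is_err() {
    ///     // Best-effort cleanup of any parts uploaded so far
    ///     upload.abort().await.unwrap();
    /// }
    /// # }
    /// ```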
    async fn abort(&mut self) -> Result<()>;
}

#[async_trait]
impl<W: MultipartUpload + ?Sized> MultipartUpload for Box<W> {
    fn put_part(&mut self, data: PutPayload) -> UploadPart {
        (**self).put_part(data)
    }

    async fn complete(&mut self) -> Result<PutResult> {
        (**self).complete().await
    }

    async fn abort(&mut self) -> Result<()> {
        (**self).abort().await
    }
}

/// A synchronous write API for uploading data in parallel in fixed size chunks
///
/// Uses multiple tokio tasks in a [`JoinSet`] to multiplex upload tasks in parallel
///
/// The design also takes inspiration from [`Sink`], with [`WriteMultipart::wait_for_capacity`]
/// allowing producers to apply back pressure before buffering the next part. However, unlike
/// [`Sink`], this back pressure is optional, allowing integration with synchronous producers
///
/// [`Sink`]: futures::sink::Sink
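///
/// A minimal usage sketch; obtaining the upload via `ObjectStore::put_multipart`
/// is elided with `todo!()`:
///
/// ```no_run
/// # use object_store::{MultipartUpload, WriteMultipart};
/// #
/// # async fn example() {
/// let upload: Box<dyn MultipartUpload> = todo!();
/// let mut write = WriteMultipart::new(upload);
/// write.write(&[0; 1024]);
/// write.finish().await.unwrap();
/// # }
/// ```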
#[derive(Debug)]
pub struct WriteMultipart {
    /// The underlying [`MultipartUpload`]
    upload: Box<dyn MultipartUpload>,

    /// Data buffered until it reaches `chunk_size`
    buffer: PutPayloadMut,

    /// The size at which buffered data is flushed as a new part
    chunk_size: usize,

    /// In-flight part uploads
    tasks: JoinSet<Result<()>>,
}

impl WriteMultipart {
    /// Create a new [`WriteMultipart`] that will upload using 5 MiB chunks
    pub fn new(upload: Box<dyn MultipartUpload>) -> Self {
        Self::new_with_chunk_size(upload, 5 * 1024 * 1024)
    }

    /// Create a new [`WriteMultipart`] that will upload in fixed `chunk_size` sized chunks
    pub fn new_with_chunk_size(upload: Box<dyn MultipartUpload>, chunk_size: usize) -> Self {
        Self {
            upload,
            chunk_size,
            buffer: PutPayloadMut::new(),
            tasks: Default::default(),
        }
    }

    /// Polls for there to be fewer than `max_concurrency` [`UploadPart`] in progress
    ///
    /// See [`Self::wait_for_capacity`] for an async version of this function
    pub fn poll_for_capacity(
        &mut self,
        cx: &mut Context<'_>,
        max_concurrency: usize,
    ) -> Poll<Result<()>> {
        while !self.tasks.is_empty() && self.tasks.len() >= max_concurrency {
            // The set is non-empty, so `unwrap` the Option; `??` propagates both join and upload errors
            ready!(self.tasks.poll_join_next(cx)).unwrap()??
        }
        Poll::Ready(Ok(()))
    }

    /// Wait until there are fewer than `max_concurrency` [`UploadPart`] in progress
    ///
    /// See [`Self::poll_for_capacity`] for a [`Poll`] version of this function
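    ///
    /// A sketch of applying back pressure from a producer loop; the upload is
    /// elided with `todo!()` and the limit of 10 in-flight parts is arbitrary:
    ///
    /// ```no_run
    /// # use object_store::{MultipartUpload, WriteMultipart};
    /// #
    /// # async fn example() {
    /// let upload: Box<dyn MultipartUpload> = todo!();
    /// let mut write = WriteMultipart::new(upload);
    /// for _ in 0..100 {
    ///     // Wait until fewer than 10 uploads are in flight before buffering more
    ///     write.wait_for_capacity(10).await.unwrap();
    ///     write.write(&[0; 1024]);
    /// }
    /// write.finish().await.unwrap();
    /// # }
    /// ```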
    pub async fn wait_for_capacity(&mut self, max_concurrency: usize) -> Result<()> {
        futures::future::poll_fn(|cx| self.poll_for_capacity(cx, max_concurrency)).await
    }

    /// Write data to this [`WriteMultipart`]
    ///
    /// Data is buffered using [`PutPayloadMut::extend_from_slice`]. Callers looking to
    /// write data from owned buffers may prefer [`Self::put`] as this avoids copying.
    ///
    /// Note this method is synchronous (not `async`) and will immediately
    /// start new uploads as soon as the internal `chunk_size` is hit,
    /// regardless of how many outstanding uploads are already in progress.
    ///
    /// Back pressure can optionally be applied to producers by calling
    /// [`Self::wait_for_capacity`] prior to calling this method
    pub fn write(&mut self, mut buf: &[u8]) {
        while !buf.is_empty() {
            let remaining = self.chunk_size - self.buffer.content_length();
            let to_read = buf.len().min(remaining);
            self.buffer.extend_from_slice(&buf[..to_read]);
            if to_read == remaining {
                // The buffer has reached `chunk_size`, flush it as a new part upload
                let buffer = std::mem::take(&mut self.buffer);
                self.put_part(buffer.into())
            }
            buf = &buf[to_read..]
        }
    }

    /// Put a chunk of data into this [`WriteMultipart`] without copying
    ///
    /// Data is buffered using [`PutPayloadMut::push`]. Callers looking to
    /// perform writes from non-owned buffers should prefer [`Self::write`] as this
    /// will allow multiple calls to share the same underlying allocation.
    ///
    /// See [`Self::write`] for information on backpressure
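    ///
    /// A sketch of writing an owned [`Bytes`] buffer without copying; the upload
    /// is elided with `todo!()`:
    ///
    /// ```no_run
    /// # use bytes::Bytes;
    /// # use object_store::{MultipartUpload, WriteMultipart};
    /// #
    /// # async fn example() {
    /// let upload: Box<dyn MultipartUpload> = todo!();
    /// let mut write = WriteMultipart::new(upload);
    /// // The `Bytes` is moved into the internal buffer rather than copied
    /// write.put(Bytes::from_static(b"an owned buffer"));
    /// write.finish().await.unwrap();
    /// # }
    /// ```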
    pub fn put(&mut self, mut bytes: Bytes) {
        while !bytes.is_empty() {
            let remaining = self.chunk_size - self.buffer.content_length();
            if bytes.len() < remaining {
                self.buffer.push(bytes);
                return;
            }
            // Fill the buffer up to `chunk_size` and flush it as a new part upload
            self.buffer.push(bytes.split_to(remaining));
            let buffer = std::mem::take(&mut self.buffer);
            self.put_part(buffer.into())
        }
    }

    /// Spawn a task on the [`JoinSet`] to upload `part`
    pub(crate) fn put_part(&mut self, part: PutPayload) {
        self.tasks.spawn(self.upload.put_part(part));
    }

    /// Abort this upload, attempting to clean up any successfully uploaded parts
    pub async fn abort(mut self) -> Result<()> {
        self.tasks.shutdown().await;
        self.upload.abort().await
    }

    /// Flush the final chunk, and await completion of all in-flight requests
    pub async fn finish(mut self) -> Result<PutResult> {
        if !self.buffer.is_empty() {
            let part = std::mem::take(&mut self.buffer);
            self.put_part(part.into())
        }

        self.wait_for_capacity(0).await?;

        match self.upload.complete().await {
            Err(e) => {
                // Attempt to clean up any parts that were already uploaded
                self.tasks.shutdown().await;
                self.upload.abort().await?;
                Err(e)
            }
            Ok(result) => Ok(result),
        }
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use futures::FutureExt;
    use parking_lot::Mutex;
    use rand::prelude::StdRng;
    use rand::{Rng, SeedableRng};

    use crate::memory::InMemory;
    use crate::path::Path;
    use crate::throttle::{ThrottleConfig, ThrottledStore};
    use crate::ObjectStore;

    use super::*;

    #[tokio::test]
    async fn test_concurrency() {
        let config = ThrottleConfig {
            wait_put_per_call: Duration::from_millis(1),
            ..Default::default()
        };

        let path = Path::from("foo");
        let store = ThrottledStore::new(InMemory::new(), config);
        let upload = store.put_multipart(&path).await.unwrap();
        let mut write = WriteMultipart::new_with_chunk_size(upload, 10);

        for _ in 0..20 {
            write.write(&[0; 5]);
        }
        assert!(write.wait_for_capacity(10).now_or_never().is_none());
        write.wait_for_capacity(10).await.unwrap()
    }

    #[derive(Debug, Default)]
    struct InstrumentedUpload {
        chunks: Arc<Mutex<Vec<PutPayload>>>,
    }

    #[async_trait]
    impl MultipartUpload for InstrumentedUpload {
        fn put_part(&mut self, data: PutPayload) -> UploadPart {
            self.chunks.lock().push(data);
            futures::future::ready(Ok(())).boxed()
        }

        async fn complete(&mut self) -> Result<PutResult> {
            Ok(PutResult {
                e_tag: None,
                version: None,
            })
        }

        async fn abort(&mut self) -> Result<()> {
            unimplemented!()
        }
    }

    #[tokio::test]
    async fn test_write_multipart() {
        let mut rng = StdRng::seed_from_u64(42);

        for method in [0.0, 0.5, 1.0] {
            for _ in 0..10 {
                for chunk_size in [1, 17, 23] {
                    let upload = Box::<InstrumentedUpload>::default();
                    let chunks = Arc::clone(&upload.chunks);
                    let mut write = WriteMultipart::new_with_chunk_size(upload, chunk_size);

                    let mut expected = Vec::with_capacity(1024);

                    for _ in 0..50 {
                        let chunk_size = rng.gen_range(0..30);
                        let data: Vec<_> = (0..chunk_size).map(|_| rng.gen()).collect();
                        expected.extend_from_slice(&data);

                        match rng.gen_bool(method) {
                            true => write.put(data.into()),
                            false => write.write(&data),
                        }
                    }
                    write.finish().await.unwrap();

                    let chunks = chunks.lock();

                    let actual: Vec<_> = chunks.iter().flatten().flatten().copied().collect();
                    assert_eq!(expected, actual);

                    for chunk in chunks.iter().take(chunks.len() - 1) {
                        assert_eq!(chunk.content_length(), chunk_size)
                    }

                    let last_chunk = chunks.last().unwrap().content_length();
                    assert!(last_chunk <= chunk_size, "{chunk_size}");
                }
            }
        }
    }
}