object_store/upload.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::task::{Context, Poll};

use crate::{PutPayload, PutPayloadMut, PutResult, Result};
use async_trait::async_trait;
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::ready;
use tokio::task::JoinSet;

/// An upload part request
pub type UploadPart = BoxFuture<'static, Result<()>>;

/// A trait allowing writing an object in fixed size chunks
///
/// Consecutive chunks of data can be written by calling [`MultipartUpload::put_part`] and polling
/// the returned futures to completion. Multiple futures returned by [`MultipartUpload::put_part`]
/// may be polled in parallel, allowing for concurrent uploads.
///
/// Once all part uploads have been polled to completion, the upload can be completed by
/// calling [`MultipartUpload::complete`]. This will make the entire uploaded object visible
/// as an atomic operation. It is implementation defined behaviour if [`MultipartUpload::complete`]
/// is called before all [`UploadPart`] have been polled to completion.
#[async_trait]
pub trait MultipartUpload: Send + std::fmt::Debug {
    /// Upload the next part
    ///
    /// Most stores require that all parts excluding the last are at least 5 MiB, and some
    /// further require that all parts excluding the last be the same size, e.g. [R2].
    /// Clients wanting to maximise compatibility should therefore perform writes in
    /// fixed size blocks larger than 5 MiB.
    ///
    /// Callers may invoke this method multiple times and then await the returned
    /// futures in parallel.
    ///
    /// ```no_run
    /// # use futures::StreamExt;
    /// # use object_store::MultipartUpload;
    /// #
    /// # async fn test() {
    /// #
    /// let mut upload: Box<dyn MultipartUpload> = todo!();
    /// let p1 = upload.put_part(vec![0; 10 * 1024 * 1024].into());
    /// let p2 = upload.put_part(vec![1; 10 * 1024 * 1024].into());
    /// futures::future::try_join(p1, p2).await.unwrap();
    /// upload.complete().await.unwrap();
    /// # }
    /// ```
    ///
    /// [R2]: https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations
    fn put_part(&mut self, data: PutPayload) -> UploadPart;

    /// Complete the multipart upload
    ///
    /// It is implementation defined behaviour if this method is called before polling
    /// all [`UploadPart`] returned by [`MultipartUpload::put_part`] to completion. Additionally,
    /// it is implementation defined behaviour to call [`MultipartUpload::complete`]
    /// on an already completed or aborted [`MultipartUpload`].
    async fn complete(&mut self) -> Result<PutResult>;

    /// Abort the multipart upload
    ///
    /// If a [`MultipartUpload`] is dropped without calling [`MultipartUpload::complete`],
    /// some object stores will automatically clean up any previously uploaded parts.
    /// However, some stores, such as S3 and GCS, cannot perform cleanup on drop.
    /// As such [`MultipartUpload::abort`] can be invoked to perform this cleanup.
    ///
    /// It will not be possible to call `abort` in all failure scenarios, for example
    /// non-graceful shutdown of the calling application. It is therefore recommended
    /// that object stores are configured with lifecycle rules to automatically clean up
    /// unused parts older than some threshold. See [crate::aws] and [crate::gcp]
    /// for more information.
    ///
    /// It is implementation defined behaviour to call [`MultipartUpload::abort`]
    /// on an already completed or aborted [`MultipartUpload`].
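    ///
    /// An illustrative sketch (not part of the API contract) of cleaning up after a failed
    /// part upload; the `upload` argument is assumed to have been obtained from
    /// `ObjectStore::put_multipart`:
    ///
    /// ```no_run
    /// # use object_store::MultipartUpload;
    /// # async fn cleanup_on_error(mut upload: Box<dyn MultipartUpload>) {
    /// if upload.put_part(vec![0; 10 * 1024 * 1024].into()).await.is_err() {
    ///     // Attempt to remove any parts that were uploaded before the failure
    ///     upload.abort().await.unwrap();
    /// }
    /// # }
    /// ```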
    async fn abort(&mut self) -> Result<()>;
}

#[async_trait]
impl<W: MultipartUpload + ?Sized> MultipartUpload for Box<W> {
    fn put_part(&mut self, data: PutPayload) -> UploadPart {
        (**self).put_part(data)
    }

    async fn complete(&mut self) -> Result<PutResult> {
        (**self).complete().await
    }

    async fn abort(&mut self) -> Result<()> {
        (**self).abort().await
    }
}

/// A synchronous write API for uploading data in parallel in fixed size chunks
///
/// Uses multiple tokio tasks in a [`JoinSet`] to multiplex upload tasks in parallel
///
/// The design also takes inspiration from [`Sink`] with [`WriteMultipart::wait_for_capacity`]
/// allowing back pressure on producers, prior to buffering the next part. However, unlike
/// [`Sink`] this back pressure is optional, allowing integration with synchronous producers
///
/// [`Sink`]: futures::sink::Sink
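///
/// An illustrative sketch of the intended usage; the store, path, write sizes and
/// concurrency limit below are illustrative assumptions, not requirements of the API:
///
/// ```no_run
/// # use object_store::path::Path;
/// # use object_store::{ObjectStore, WriteMultipart};
/// # async fn example(store: &dyn ObjectStore, path: &Path) -> object_store::Result<()> {
/// let upload = store.put_multipart(path).await?;
/// let mut write = WriteMultipart::new(upload);
/// for _ in 0..16 {
///     // Optional back pressure: wait until fewer than 8 part uploads are in flight
///     write.wait_for_capacity(8).await?;
///     write.write(&[0u8; 1024]);
/// }
/// // Flush any buffered data and await all in-flight parts before completing
/// write.finish().await?;
/// # Ok(())
/// # }
/// ```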
#[derive(Debug)]
pub struct WriteMultipart {
    upload: Box<dyn MultipartUpload>,

    buffer: PutPayloadMut,

    chunk_size: usize,

    tasks: JoinSet<Result<()>>,
}

impl WriteMultipart {
    /// Create a new [`WriteMultipart`] that will upload using 5 MiB chunks
    pub fn new(upload: Box<dyn MultipartUpload>) -> Self {
        Self::new_with_chunk_size(upload, 5 * 1024 * 1024)
    }

    /// Create a new [`WriteMultipart`] that will upload in fixed `chunk_size` sized chunks
    pub fn new_with_chunk_size(upload: Box<dyn MultipartUpload>, chunk_size: usize) -> Self {
        Self {
            upload,
            chunk_size,
            buffer: PutPayloadMut::new(),
            tasks: Default::default(),
        }
    }

    /// Polls for there to be fewer than `max_concurrency` [`UploadPart`] in progress
    ///
    /// See [`Self::wait_for_capacity`] for an async version of this function
    pub fn poll_for_capacity(
        &mut self,
        cx: &mut Context<'_>,
        max_concurrency: usize,
    ) -> Poll<Result<()>> {
        while !self.tasks.is_empty() && self.tasks.len() >= max_concurrency {
            ready!(self.tasks.poll_join_next(cx)).unwrap()??
        }
        Poll::Ready(Ok(()))
    }

    /// Wait until there are fewer than `max_concurrency` [`UploadPart`] in progress
    ///
    /// See [`Self::poll_for_capacity`] for a [`Poll`] version of this function
    pub async fn wait_for_capacity(&mut self, max_concurrency: usize) -> Result<()> {
        futures::future::poll_fn(|cx| self.poll_for_capacity(cx, max_concurrency)).await
    }

    /// Write data to this [`WriteMultipart`]
    ///
    /// Data is buffered using [`PutPayloadMut::extend_from_slice`]. Callers looking to
    /// write data from owned buffers may prefer [`Self::put`] as this avoids copying.
    ///
    /// Note this method is synchronous (not `async`) and will immediately
    /// start new uploads as soon as the internal `chunk_size` is hit,
    /// regardless of how many outstanding uploads are already in progress.
    ///
    /// Back pressure can optionally be applied to producers by calling
    /// [`Self::wait_for_capacity`] prior to calling this method
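    ///
    /// For example, a purely synchronous producer can feed data into an upload; the
    /// `serialize_records` helper below is a hypothetical illustration, not part of this crate:
    ///
    /// ```no_run
    /// # use object_store::WriteMultipart;
    /// // `write` is not async, so it can be driven from ordinary synchronous code;
    /// // uploads are spawned in the background whenever a full chunk accumulates
    /// fn serialize_records(write: &mut WriteMultipart, records: &[Vec<u8>]) {
    ///     for record in records {
    ///         write.write(record);
    ///     }
    /// }
    /// ```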
    pub fn write(&mut self, mut buf: &[u8]) {
        while !buf.is_empty() {
            let remaining = self.chunk_size - self.buffer.content_length();
            let to_read = buf.len().min(remaining);
            self.buffer.extend_from_slice(&buf[..to_read]);
            if to_read == remaining {
                let buffer = std::mem::take(&mut self.buffer);
                self.put_part(buffer.into())
            }
            buf = &buf[to_read..]
        }
    }

    /// Put a chunk of data into this [`WriteMultipart`] without copying
    ///
    /// Data is buffered using [`PutPayloadMut::push`]. Callers looking to
    /// perform writes from non-owned buffers should prefer [`Self::write`] as this
    /// will allow multiple calls to share the same underlying allocation.
    ///
    /// See [`Self::write`] for information on back pressure
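    ///
    /// An illustrative sketch; the `frame` value below is a placeholder for data that is
    /// already held as [`Bytes`]:
    ///
    /// ```no_run
    /// # use bytes::Bytes;
    /// # use object_store::WriteMultipart;
    /// # fn example(write: &mut WriteMultipart) {
    /// let frame = Bytes::from_static(b"data that is already owned");
    /// // The Bytes is appended to the internal buffer without copying its contents
    /// write.put(frame);
    /// # }
    /// ```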
    pub fn put(&mut self, mut bytes: Bytes) {
        while !bytes.is_empty() {
            let remaining = self.chunk_size - self.buffer.content_length();
            if bytes.len() < remaining {
                self.buffer.push(bytes);
                return;
            }
            self.buffer.push(bytes.split_to(remaining));
            let buffer = std::mem::take(&mut self.buffer);
            self.put_part(buffer.into())
        }
    }

    pub(crate) fn put_part(&mut self, part: PutPayload) {
        self.tasks.spawn(self.upload.put_part(part));
    }

    /// Abort this upload, attempting to clean up any successfully uploaded parts
    pub async fn abort(mut self) -> Result<()> {
        self.tasks.shutdown().await;
        self.upload.abort().await
    }

    /// Flush final chunk, and await completion of all in-flight requests
    pub async fn finish(mut self) -> Result<PutResult> {
        if !self.buffer.is_empty() {
            let part = std::mem::take(&mut self.buffer);
            self.put_part(part.into())
        }

        self.wait_for_capacity(0).await?;

        match self.upload.complete().await {
            Err(e) => {
                self.tasks.shutdown().await;
                self.upload.abort().await?;
                Err(e)
            }
            Ok(result) => Ok(result),
        }
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use futures::FutureExt;
    use parking_lot::Mutex;
    use rand::prelude::StdRng;
    use rand::{Rng, SeedableRng};

    use crate::memory::InMemory;
    use crate::path::Path;
    use crate::throttle::{ThrottleConfig, ThrottledStore};
    use crate::ObjectStore;

    use super::*;

    #[tokio::test]
    async fn test_concurrency() {
        let config = ThrottleConfig {
            wait_put_per_call: Duration::from_millis(1),
            ..Default::default()
        };

        let path = Path::from("foo");
        let store = ThrottledStore::new(InMemory::new(), config);
        let upload = store.put_multipart(&path).await.unwrap();
        let mut write = WriteMultipart::new_with_chunk_size(upload, 10);

        for _ in 0..20 {
            write.write(&[0; 5]);
        }
        assert!(write.wait_for_capacity(10).now_or_never().is_none());
        write.wait_for_capacity(10).await.unwrap()
    }

    #[derive(Debug, Default)]
    struct InstrumentedUpload {
        chunks: Arc<Mutex<Vec<PutPayload>>>,
    }

    #[async_trait]
    impl MultipartUpload for InstrumentedUpload {
        fn put_part(&mut self, data: PutPayload) -> UploadPart {
            self.chunks.lock().push(data);
            futures::future::ready(Ok(())).boxed()
        }

        async fn complete(&mut self) -> Result<PutResult> {
            Ok(PutResult {
                e_tag: None,
                version: None,
            })
        }

        async fn abort(&mut self) -> Result<()> {
            unimplemented!()
        }
    }

    #[tokio::test]
    async fn test_write_multipart() {
        let mut rng = StdRng::seed_from_u64(42);

        for method in [0.0, 0.5, 1.0] {
            for _ in 0..10 {
                for chunk_size in [1, 17, 23] {
                    let upload = Box::<InstrumentedUpload>::default();
                    let chunks = Arc::clone(&upload.chunks);
                    let mut write = WriteMultipart::new_with_chunk_size(upload, chunk_size);

                    let mut expected = Vec::with_capacity(1024);

                    for _ in 0..50 {
                        let chunk_size = rng.gen_range(0..30);
                        let data: Vec<_> = (0..chunk_size).map(|_| rng.gen()).collect();
                        expected.extend_from_slice(&data);

                        match rng.gen_bool(method) {
                            true => write.put(data.into()),
                            false => write.write(&data),
                        }
                    }
                    write.finish().await.unwrap();

                    let chunks = chunks.lock();

                    let actual: Vec<_> = chunks.iter().flatten().flatten().copied().collect();
                    assert_eq!(expected, actual);

                    for chunk in chunks.iter().take(chunks.len() - 1) {
                        assert_eq!(chunk.content_length(), chunk_size)
                    }

                    let last_chunk = chunks.last().unwrap().content_length();
                    assert!(last_chunk <= chunk_size, "{chunk_size}");
                }
            }
        }
    }
}