object_store/
payload.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use bytes::Bytes;
19use std::sync::Arc;
20
21/// A cheaply cloneable, ordered collection of [`Bytes`]
22#[derive(Debug, Clone)]
23pub struct PutPayload(Arc<[Bytes]>);
24
25impl Default for PutPayload {
26    fn default() -> Self {
27        Self(Arc::new([]))
28    }
29}
30
31impl PutPayload {
32    /// Create a new empty [`PutPayload`]
33    pub fn new() -> Self {
34        Self::default()
35    }
36
37    /// Creates a [`PutPayload`] from a static slice
38    pub fn from_static(s: &'static [u8]) -> Self {
39        s.into()
40    }
41
42    /// Creates a [`PutPayload`] from a [`Bytes`]
43    pub fn from_bytes(s: Bytes) -> Self {
44        s.into()
45    }
46
47    #[cfg(feature = "cloud")]
48    pub(crate) fn body(&self) -> reqwest::Body {
49        reqwest::Body::wrap_stream(futures::stream::iter(
50            self.clone().into_iter().map(Ok::<_, crate::Error>),
51        ))
52    }
53
54    /// Returns the total length of the [`Bytes`] in this payload
55    pub fn content_length(&self) -> usize {
56        self.0.iter().map(|b| b.len()).sum()
57    }
58
59    /// Returns an iterator over the [`Bytes`] in this payload
60    pub fn iter(&self) -> PutPayloadIter<'_> {
61        PutPayloadIter(self.0.iter())
62    }
63}
64
65impl AsRef<[Bytes]> for PutPayload {
66    fn as_ref(&self) -> &[Bytes] {
67        self.0.as_ref()
68    }
69}
70
71impl<'a> IntoIterator for &'a PutPayload {
72    type Item = &'a Bytes;
73    type IntoIter = PutPayloadIter<'a>;
74
75    fn into_iter(self) -> Self::IntoIter {
76        self.iter()
77    }
78}
79
80impl IntoIterator for PutPayload {
81    type Item = Bytes;
82    type IntoIter = PutPayloadIntoIter;
83
84    fn into_iter(self) -> Self::IntoIter {
85        PutPayloadIntoIter {
86            payload: self,
87            idx: 0,
88        }
89    }
90}
91
92/// An iterator over [`PutPayload`]
93#[derive(Debug)]
94pub struct PutPayloadIter<'a>(std::slice::Iter<'a, Bytes>);
95
96impl<'a> Iterator for PutPayloadIter<'a> {
97    type Item = &'a Bytes;
98
99    fn next(&mut self) -> Option<Self::Item> {
100        self.0.next()
101    }
102
103    fn size_hint(&self) -> (usize, Option<usize>) {
104        self.0.size_hint()
105    }
106}
107
108/// An owning iterator of [`PutPayload`]
109#[derive(Debug)]
110pub struct PutPayloadIntoIter {
111    payload: PutPayload,
112    idx: usize,
113}
114
115impl Iterator for PutPayloadIntoIter {
116    type Item = Bytes;
117
118    fn next(&mut self) -> Option<Self::Item> {
119        let p = self.payload.0.get(self.idx)?.clone();
120        self.idx += 1;
121        Some(p)
122    }
123
124    fn size_hint(&self) -> (usize, Option<usize>) {
125        let l = self.payload.0.len() - self.idx;
126        (l, Some(l))
127    }
128}
129
130impl From<Bytes> for PutPayload {
131    fn from(value: Bytes) -> Self {
132        Self(Arc::new([value]))
133    }
134}
135
136impl From<Vec<u8>> for PutPayload {
137    fn from(value: Vec<u8>) -> Self {
138        Self(Arc::new([value.into()]))
139    }
140}
141
142impl From<&'static str> for PutPayload {
143    fn from(value: &'static str) -> Self {
144        Bytes::from(value).into()
145    }
146}
147
148impl From<&'static [u8]> for PutPayload {
149    fn from(value: &'static [u8]) -> Self {
150        Bytes::from(value).into()
151    }
152}
153
154impl From<String> for PutPayload {
155    fn from(value: String) -> Self {
156        Bytes::from(value).into()
157    }
158}
159
160impl FromIterator<u8> for PutPayload {
161    fn from_iter<T: IntoIterator<Item = u8>>(iter: T) -> Self {
162        Bytes::from_iter(iter).into()
163    }
164}
165
166impl FromIterator<Bytes> for PutPayload {
167    fn from_iter<T: IntoIterator<Item = Bytes>>(iter: T) -> Self {
168        Self(iter.into_iter().collect())
169    }
170}
171
172impl From<PutPayload> for Bytes {
173    fn from(value: PutPayload) -> Self {
174        match value.0.len() {
175            0 => Self::new(),
176            1 => value.0[0].clone(),
177            _ => {
178                let mut buf = Vec::with_capacity(value.content_length());
179                value.iter().for_each(|x| buf.extend_from_slice(x));
180                buf.into()
181            }
182        }
183    }
184}
185
186/// A builder for [`PutPayload`] that avoids reallocating memory
187///
188/// Data is allocated in fixed blocks, which are flushed to [`Bytes`] once full.
189/// Unlike [`Vec`] this avoids needing to repeatedly reallocate blocks of memory,
190/// which typically involves copying all the previously written data to a new
191/// contiguous memory region.
192#[derive(Debug)]
193pub struct PutPayloadMut {
194    len: usize,
195    completed: Vec<Bytes>,
196    in_progress: Vec<u8>,
197    block_size: usize,
198}
199
200impl Default for PutPayloadMut {
201    fn default() -> Self {
202        Self {
203            len: 0,
204            completed: vec![],
205            in_progress: vec![],
206
207            block_size: 8 * 1024,
208        }
209    }
210}
211
212impl PutPayloadMut {
213    /// Create a new [`PutPayloadMut`]
214    pub fn new() -> Self {
215        Self::default()
216    }
217
218    /// Configures the minimum allocation size
219    ///
220    /// Defaults to 8KB
221    pub fn with_block_size(self, block_size: usize) -> Self {
222        Self { block_size, ..self }
223    }
224
225    /// Write bytes into this [`PutPayloadMut`]
226    ///
227    /// If there is an in-progress block, data will be first written to it, flushing
228    /// it to [`Bytes`] once full. If data remains to be written, a new block of memory
229    /// of at least the configured block size will be allocated, to hold the remaining data.
230    pub fn extend_from_slice(&mut self, slice: &[u8]) {
231        let remaining = self.in_progress.capacity() - self.in_progress.len();
232        let to_copy = remaining.min(slice.len());
233
234        self.in_progress.extend_from_slice(&slice[..to_copy]);
235        if self.in_progress.capacity() == self.in_progress.len() {
236            let new_cap = self.block_size.max(slice.len() - to_copy);
237            let completed = std::mem::replace(&mut self.in_progress, Vec::with_capacity(new_cap));
238            if !completed.is_empty() {
239                self.completed.push(completed.into())
240            }
241            self.in_progress.extend_from_slice(&slice[to_copy..])
242        }
243        self.len += slice.len();
244    }
245
246    /// Append a [`Bytes`] to this [`PutPayloadMut`] without copying
247    ///
248    /// This will close any currently buffered block populated by [`Self::extend_from_slice`],
249    /// and append `bytes` to this payload without copying.
250    pub fn push(&mut self, bytes: Bytes) {
251        if !self.in_progress.is_empty() {
252            let completed = std::mem::take(&mut self.in_progress);
253            self.completed.push(completed.into())
254        }
255        self.len += bytes.len();
256        self.completed.push(bytes);
257    }
258
259    /// Returns `true` if this [`PutPayloadMut`] contains no bytes
260    #[inline]
261    pub fn is_empty(&self) -> bool {
262        self.len == 0
263    }
264
265    /// Returns the total length of the [`Bytes`] in this payload
266    #[inline]
267    pub fn content_length(&self) -> usize {
268        self.len
269    }
270
271    /// Convert into [`PutPayload`]
272    pub fn freeze(mut self) -> PutPayload {
273        if !self.in_progress.is_empty() {
274            let completed = std::mem::take(&mut self.in_progress).into();
275            self.completed.push(completed);
276        }
277        PutPayload(self.completed.into())
278    }
279}
280
281impl From<PutPayloadMut> for PutPayload {
282    fn from(value: PutPayloadMut) -> Self {
283        value.freeze()
284    }
285}
286
#[cfg(test)]
mod test {
    use crate::PutPayloadMut;

    /// Writes that straddle a block boundary are split across blocks. When the
    /// remainder of a write exceeds the block size, the new block is sized to fit it.
    #[test]
    fn test_put_payload() {
        let writes: [(u8, usize); 6] = [(1, 16), (2, 32), (2, 5), (2, 21), (2, 40), (0, 0)];
        let mut builder = PutPayloadMut::new().with_block_size(23);
        for &(byte, count) in writes.iter() {
            builder.extend_from_slice(&vec![byte; count]);
        }
        builder.push("foobar".into());

        let payload = builder.freeze();
        assert_eq!(payload.content_length(), 120);

        let lengths: Vec<usize> = payload.as_ref().iter().map(|chunk| chunk.len()).collect();
        // The second block holds 32 - (23 - 16) = 25 bytes: the rest of the second write
        assert_eq!(lengths, [23, 25, 23, 23, 20, 6]);
    }

    /// `content_length` tracks both pushed chunks and copied slices
    #[test]
    fn test_content_length() {
        let mut builder = PutPayloadMut::new();

        builder.push(vec![0; 23].into());
        assert_eq!(builder.content_length(), 23);

        builder.extend_from_slice(&[0; 4]);
        assert_eq!(builder.content_length(), 23 + 4);

        builder.push(vec![0; 121].into());
        assert_eq!(builder.content_length(), 23 + 4 + 121);

        assert_eq!(builder.freeze().content_length(), 148);
    }
}
328}