object_store/
throttle.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! A throttling object store wrapper
19use parking_lot::Mutex;
20use std::ops::Range;
21use std::{convert::TryInto, sync::Arc};
22
23use crate::multipart::{MultipartStore, PartId};
24use crate::{
25    path::Path, GetResult, GetResultPayload, ListResult, MultipartId, MultipartUpload, ObjectMeta,
26    ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result,
27};
28use crate::{GetOptions, UploadPart};
29use async_trait::async_trait;
30use bytes::Bytes;
31use futures::{stream::BoxStream, FutureExt, StreamExt};
32use std::time::Duration;
33
/// Configuration settings for throttled store
///
/// Every field is a sleep duration; the [`Default`] value of each is zero, which disables the
/// corresponding sleep entirely.
#[derive(Debug, Default, Clone, Copy)]
pub struct ThrottleConfig {
    /// Sleep duration for every call to [`delete`](ThrottledStore::delete).
    ///
    /// Sleeping is done before the underlying store is called and independently of the success of
    /// the operation.
    pub wait_delete_per_call: Duration,

    /// Sleep duration for every byte received during [`get`](ThrottledStore::get).
    ///
    /// Sleeping is performed after the underlying store returned and only for successful gets. The
    /// sleep duration is additive to [`wait_get_per_call`](Self::wait_get_per_call).
    ///
    /// Note that the per-byte sleep only happens as the user consumes the output bytes. Should
    /// there be an intermediate failure (i.e. after partly consuming the output bytes), the
    /// resulting sleep time will be partial as well.
    ///
    /// The ranged variants (`get_range` / `get_ranges`) are the exception: they compute the
    /// per-byte sleep from the *requested* range length(s) and sleep up front, before the
    /// underlying store is called.
    pub wait_get_per_byte: Duration,

    /// Sleep duration for every call to [`get`](ThrottledStore::get).
    ///
    /// Sleeping is done before the underlying store is called and independently of the success of
    /// the operation. The sleep duration is additive to
    /// [`wait_get_per_byte`](Self::wait_get_per_byte).
    ///
    /// The same per-call sleep is applied to `get_opts`, `get_range` and `get_ranges`.
    pub wait_get_per_call: Duration,

    /// Sleep duration for every call to [`list`](ThrottledStore::list).
    ///
    /// Sleeping is done before the underlying store is called and independently of the success of
    /// the operation. The sleep duration is additive to
    /// [`wait_list_per_entry`](Self::wait_list_per_entry).
    ///
    /// Also applied to `list_with_offset`.
    pub wait_list_per_call: Duration,

    /// Sleep duration for every entry received during [`list`](ThrottledStore::list).
    ///
    /// Sleeping is performed after the underlying store returned and only for successful lists.
    /// The sleep duration is additive to [`wait_list_per_call`](Self::wait_list_per_call).
    ///
    /// Note that the per-entry sleep only happens as the user consumes the output entries. Should
    /// there be an intermediate failure (i.e. after partly consuming the output entries), the
    /// resulting sleep time will be partial as well.
    pub wait_list_per_entry: Duration,

    /// Sleep duration for every call to
    /// [`list_with_delimiter`](ThrottledStore::list_with_delimiter).
    ///
    /// Sleeping is done before the underlying store is called and independently of the success of
    /// the operation. The sleep duration is additive to
    /// [`wait_list_with_delimiter_per_entry`](Self::wait_list_with_delimiter_per_entry).
    pub wait_list_with_delimiter_per_call: Duration,

    /// Sleep duration for every entry received during
    /// [`list_with_delimiter`](ThrottledStore::list_with_delimiter).
    ///
    /// Sleeping is performed after the underlying store returned and only for successful lists.
    /// The sleep duration is additive to
    /// [`wait_list_with_delimiter_per_call`](Self::wait_list_with_delimiter_per_call).
    ///
    /// Only the returned `objects` are counted as entries, and the whole per-entry sleep is
    /// performed in one go before the result is handed back to the caller.
    pub wait_list_with_delimiter_per_entry: Duration,

    /// Sleep duration for every call to [`put`](ThrottledStore::put).
    ///
    /// Sleeping is done before the underlying store is called and independently of the success of
    /// the operation.
    ///
    /// The same duration is also used by `put_opts`, by every part uploaded through a multipart
    /// upload, and by `head`, `copy`, `rename`, `copy_if_not_exists` and `rename_if_not_exists`.
    pub wait_put_per_call: Duration,
}
99
100/// Sleep only if non-zero duration
101async fn sleep(duration: Duration) {
102    if !duration.is_zero() {
103        tokio::time::sleep(duration).await
104    }
105}
106
/// Store wrapper that wraps an inner store with some `sleep` calls.
///
/// This can be used for performance testing.
///
/// **Note that the behavior of the wrapper is deterministic and might not reflect real-world
/// conditions!**
#[derive(Debug)]
pub struct ThrottledStore<T> {
    /// The wrapped store that every operation is eventually forwarded to.
    inner: T,
    /// Current throttle settings. Kept behind a mutex so they can be changed through a shared
    /// reference (see `config_mut`); operations read a copy via `config`.
    config: Arc<Mutex<ThrottleConfig>>,
}
118
119impl<T> ThrottledStore<T> {
120    /// Create new wrapper with zero waiting times.
121    pub fn new(inner: T, config: ThrottleConfig) -> Self {
122        Self {
123            inner,
124            config: Arc::new(Mutex::new(config)),
125        }
126    }
127
128    /// Mutate config.
129    pub fn config_mut<F>(&self, f: F)
130    where
131        F: Fn(&mut ThrottleConfig),
132    {
133        let mut guard = self.config.lock();
134        f(&mut guard)
135    }
136
137    /// Return copy of current config.
138    pub fn config(&self) -> ThrottleConfig {
139        *self.config.lock()
140    }
141}
142
143impl<T: ObjectStore> std::fmt::Display for ThrottledStore<T> {
144    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
145        write!(f, "ThrottledStore({})", self.inner)
146    }
147}
148
149#[async_trait]
150impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
151    async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
152        sleep(self.config().wait_put_per_call).await;
153        self.inner.put(location, payload).await
154    }
155
156    async fn put_opts(
157        &self,
158        location: &Path,
159        payload: PutPayload,
160        opts: PutOptions,
161    ) -> Result<PutResult> {
162        sleep(self.config().wait_put_per_call).await;
163        self.inner.put_opts(location, payload, opts).await
164    }
165
166    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
167        let upload = self.inner.put_multipart(location).await?;
168        Ok(Box::new(ThrottledUpload {
169            upload,
170            sleep: self.config().wait_put_per_call,
171        }))
172    }
173
174    async fn put_multipart_opts(
175        &self,
176        location: &Path,
177        opts: PutMultipartOpts,
178    ) -> Result<Box<dyn MultipartUpload>> {
179        let upload = self.inner.put_multipart_opts(location, opts).await?;
180        Ok(Box::new(ThrottledUpload {
181            upload,
182            sleep: self.config().wait_put_per_call,
183        }))
184    }
185
186    async fn get(&self, location: &Path) -> Result<GetResult> {
187        sleep(self.config().wait_get_per_call).await;
188
189        // need to copy to avoid moving / referencing `self`
190        let wait_get_per_byte = self.config().wait_get_per_byte;
191
192        let result = self.inner.get(location).await?;
193        Ok(throttle_get(result, wait_get_per_byte))
194    }
195
196    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
197        sleep(self.config().wait_get_per_call).await;
198
199        // need to copy to avoid moving / referencing `self`
200        let wait_get_per_byte = self.config().wait_get_per_byte;
201
202        let result = self.inner.get_opts(location, options).await?;
203        Ok(throttle_get(result, wait_get_per_byte))
204    }
205
206    async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
207        let config = self.config();
208
209        let sleep_duration =
210            config.wait_get_per_call + config.wait_get_per_byte * (range.end - range.start) as u32;
211
212        sleep(sleep_duration).await;
213
214        self.inner.get_range(location, range).await
215    }
216
217    async fn get_ranges(&self, location: &Path, ranges: &[Range<usize>]) -> Result<Vec<Bytes>> {
218        let config = self.config();
219
220        let total_bytes: usize = ranges.iter().map(|range| range.end - range.start).sum();
221        let sleep_duration =
222            config.wait_get_per_call + config.wait_get_per_byte * total_bytes as u32;
223
224        sleep(sleep_duration).await;
225
226        self.inner.get_ranges(location, ranges).await
227    }
228
229    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
230        sleep(self.config().wait_put_per_call).await;
231        self.inner.head(location).await
232    }
233
234    async fn delete(&self, location: &Path) -> Result<()> {
235        sleep(self.config().wait_delete_per_call).await;
236
237        self.inner.delete(location).await
238    }
239
240    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
241        let stream = self.inner.list(prefix);
242        futures::stream::once(async move {
243            let wait_list_per_entry = self.config().wait_list_per_entry;
244            sleep(self.config().wait_list_per_call).await;
245            throttle_stream(stream, move |_| wait_list_per_entry)
246        })
247        .flatten()
248        .boxed()
249    }
250
251    fn list_with_offset(
252        &self,
253        prefix: Option<&Path>,
254        offset: &Path,
255    ) -> BoxStream<'_, Result<ObjectMeta>> {
256        let stream = self.inner.list_with_offset(prefix, offset);
257        futures::stream::once(async move {
258            let wait_list_per_entry = self.config().wait_list_per_entry;
259            sleep(self.config().wait_list_per_call).await;
260            throttle_stream(stream, move |_| wait_list_per_entry)
261        })
262        .flatten()
263        .boxed()
264    }
265
266    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
267        sleep(self.config().wait_list_with_delimiter_per_call).await;
268
269        match self.inner.list_with_delimiter(prefix).await {
270            Ok(list_result) => {
271                let entries_len = usize_to_u32_saturate(list_result.objects.len());
272                sleep(self.config().wait_list_with_delimiter_per_entry * entries_len).await;
273                Ok(list_result)
274            }
275            Err(err) => Err(err),
276        }
277    }
278
279    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
280        sleep(self.config().wait_put_per_call).await;
281
282        self.inner.copy(from, to).await
283    }
284
285    async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
286        sleep(self.config().wait_put_per_call).await;
287
288        self.inner.rename(from, to).await
289    }
290
291    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
292        sleep(self.config().wait_put_per_call).await;
293
294        self.inner.copy_if_not_exists(from, to).await
295    }
296
297    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
298        sleep(self.config().wait_put_per_call).await;
299
300        self.inner.rename_if_not_exists(from, to).await
301    }
302}
303
/// Converts a `usize` to `u32`, clamping to `u32::MAX` when the value does not fit.
fn usize_to_u32_saturate(x: usize) -> u32 {
    match x.try_into() {
        Ok(fits) => fits,
        // The only possible failure is "too large", so clamp to the maximum.
        Err(_) => u32::MAX,
    }
}
308
309fn throttle_get(result: GetResult, wait_get_per_byte: Duration) -> GetResult {
310    let s = match result.payload {
311        GetResultPayload::Stream(s) => s,
312        GetResultPayload::File(_, _) => unimplemented!(),
313    };
314
315    let stream = throttle_stream(s, move |bytes| {
316        let bytes_len: u32 = usize_to_u32_saturate(bytes.len());
317        wait_get_per_byte * bytes_len
318    });
319
320    GetResult {
321        payload: GetResultPayload::Stream(stream),
322        ..result
323    }
324}
325
326fn throttle_stream<T: Send + 'static, E: Send + 'static, F>(
327    stream: BoxStream<'_, Result<T, E>>,
328    delay: F,
329) -> BoxStream<'_, Result<T, E>>
330where
331    F: Fn(&T) -> Duration + Send + Sync + 'static,
332{
333    stream
334        .then(move |result| {
335            let delay = result.as_ref().ok().map(&delay).unwrap_or_default();
336            sleep(delay).then(|_| futures::future::ready(result))
337        })
338        .boxed()
339}
340
341#[async_trait]
342impl<T: MultipartStore> MultipartStore for ThrottledStore<T> {
343    async fn create_multipart(&self, path: &Path) -> Result<MultipartId> {
344        self.inner.create_multipart(path).await
345    }
346
347    async fn put_part(
348        &self,
349        path: &Path,
350        id: &MultipartId,
351        part_idx: usize,
352        data: PutPayload,
353    ) -> Result<PartId> {
354        sleep(self.config().wait_put_per_call).await;
355        self.inner.put_part(path, id, part_idx, data).await
356    }
357
358    async fn complete_multipart(
359        &self,
360        path: &Path,
361        id: &MultipartId,
362        parts: Vec<PartId>,
363    ) -> Result<PutResult> {
364        self.inner.complete_multipart(path, id, parts).await
365    }
366
367    async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()> {
368        self.inner.abort_multipart(path, id).await
369    }
370}
371
/// Multipart upload wrapper that delays every part upload by a fixed duration.
#[derive(Debug)]
struct ThrottledUpload {
    /// The wrapped upload that parts, completion and abort are forwarded to.
    upload: Box<dyn MultipartUpload>,
    /// Delay applied before each part upload; fixed when the wrapper is constructed.
    sleep: Duration,
}
377
378#[async_trait]
379impl MultipartUpload for ThrottledUpload {
380    fn put_part(&mut self, data: PutPayload) -> UploadPart {
381        let duration = self.sleep;
382        let put = self.upload.put_part(data);
383        Box::pin(async move {
384            sleep(duration).await;
385            put.await
386        })
387    }
388
389    async fn complete(&mut self) -> Result<PutResult> {
390        self.upload.complete().await
391    }
392
393    async fn abort(&mut self) -> Result<()> {
394        self.upload.abort().await
395    }
396}
397
// Tests for the throttling wrapper.
//
// Besides running the shared integration suite against an unthrottled wrapper, these tests
// measure wall-clock time of individual operations and check that it falls into the window
// implied by the configured sleeps. All timing is expressed in multiples of `WAIT_TIME`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{integration::*, memory::InMemory, GetResultPayload};
    use futures::TryStreamExt;
    use tokio::time::Duration;
    use tokio::time::Instant;

    // Unit of sleep used by all timing tests.
    const WAIT_TIME: Duration = Duration::from_millis(100);
    const ZERO: Duration = Duration::from_millis(0); // Duration::default isn't constant

    // Asserts that a measured duration lies in `[lower * WAIT_TIME, upper * WAIT_TIME)`.
    //
    // The two-argument form uses `upper = lower + 2`, i.e. it tolerates up to two extra
    // `WAIT_TIME` units of overhead on top of the expected sleep.
    macro_rules! assert_bounds {
        ($d:expr, $lower:expr) => {
            assert_bounds!($d, $lower, $lower + 2);
        };
        ($d:expr, $lower:expr, $upper:expr) => {
            let d = $d;
            let lower = $lower * WAIT_TIME;
            let upper = $upper * WAIT_TIME;
            assert!(d >= lower, "{:?} must be >= than {:?}", d, lower);
            assert!(d < upper, "{:?} must be < than {:?}", d, upper);
        };
    }

    // Runs the shared integration suite against a wrapper with all waits set to zero.
    #[tokio::test]
    async fn throttle_test() {
        let inner = InMemory::new();
        let store = ThrottledStore::new(inner, ThrottleConfig::default());

        put_get_delete_list(&store).await;
        list_uses_directories_correctly(&store).await;
        list_with_delimiter(&store).await;
        rename_and_copy(&store).await;
        copy_if_not_exists(&store).await;
        stream_get(&store).await;
        multipart(&store, &store).await;
    }

    // `delete` takes one `WAIT_TIME` per call once configured, regardless of whether the object
    // exists or how large it is.
    #[tokio::test]
    async fn delete_test() {
        let inner = InMemory::new();
        let store = ThrottledStore::new(inner, ThrottleConfig::default());

        assert_bounds!(measure_delete(&store, None).await, 0);
        assert_bounds!(measure_delete(&store, Some(0)).await, 0);
        assert_bounds!(measure_delete(&store, Some(10)).await, 0);

        store.config_mut(|cfg| cfg.wait_delete_per_call = WAIT_TIME);
        assert_bounds!(measure_delete(&store, None).await, 1);
        assert_bounds!(measure_delete(&store, Some(0)).await, 1);
        assert_bounds!(measure_delete(&store, Some(10)).await, 1);
    }

    // `get` timing: per-call wait alone, per-byte wait alone (2 bytes => 2 units), and both
    // combined (1 + 2 units).
    #[tokio::test]
    // macos github runner is so slow it can't complete within WAIT_TIME*2
    #[cfg(target_os = "linux")]
    async fn get_test() {
        let inner = InMemory::new();
        let store = ThrottledStore::new(inner, ThrottleConfig::default());

        assert_bounds!(measure_get(&store, None).await, 0);
        assert_bounds!(measure_get(&store, Some(0)).await, 0);
        assert_bounds!(measure_get(&store, Some(10)).await, 0);

        store.config_mut(|cfg| cfg.wait_get_per_call = WAIT_TIME);
        assert_bounds!(measure_get(&store, None).await, 1);
        assert_bounds!(measure_get(&store, Some(0)).await, 1);
        assert_bounds!(measure_get(&store, Some(10)).await, 1);

        store.config_mut(|cfg| {
            cfg.wait_get_per_call = ZERO;
            cfg.wait_get_per_byte = WAIT_TIME;
        });
        assert_bounds!(measure_get(&store, Some(2)).await, 2);

        store.config_mut(|cfg| {
            cfg.wait_get_per_call = WAIT_TIME;
            cfg.wait_get_per_byte = WAIT_TIME;
        });
        assert_bounds!(measure_get(&store, Some(2)).await, 3);
    }

    // `list` timing: per-call wait alone, per-entry wait alone (2 entries => 2 units), and both
    // combined (1 + 2 units).
    #[tokio::test]
    // macos github runner is so slow it can't complete within WAIT_TIME*2
    #[cfg(target_os = "linux")]
    async fn list_test() {
        let inner = InMemory::new();
        let store = ThrottledStore::new(inner, ThrottleConfig::default());

        assert_bounds!(measure_list(&store, 0).await, 0);
        assert_bounds!(measure_list(&store, 10).await, 0);

        store.config_mut(|cfg| cfg.wait_list_per_call = WAIT_TIME);
        assert_bounds!(measure_list(&store, 0).await, 1);
        assert_bounds!(measure_list(&store, 10).await, 1);

        store.config_mut(|cfg| {
            cfg.wait_list_per_call = ZERO;
            cfg.wait_list_per_entry = WAIT_TIME;
        });
        assert_bounds!(measure_list(&store, 2).await, 2);

        store.config_mut(|cfg| {
            cfg.wait_list_per_call = WAIT_TIME;
            cfg.wait_list_per_entry = WAIT_TIME;
        });
        assert_bounds!(measure_list(&store, 2).await, 3);
    }

    // `list_with_delimiter` timing, structured like `list_test` but using the
    // `wait_list_with_delimiter_*` settings.
    #[tokio::test]
    // macos github runner is so slow it can't complete within WAIT_TIME*2
    #[cfg(target_os = "linux")]
    async fn list_with_delimiter_test() {
        let inner = InMemory::new();
        let store = ThrottledStore::new(inner, ThrottleConfig::default());

        assert_bounds!(measure_list_with_delimiter(&store, 0).await, 0);
        assert_bounds!(measure_list_with_delimiter(&store, 10).await, 0);

        store.config_mut(|cfg| cfg.wait_list_with_delimiter_per_call = WAIT_TIME);
        assert_bounds!(measure_list_with_delimiter(&store, 0).await, 1);
        assert_bounds!(measure_list_with_delimiter(&store, 10).await, 1);

        store.config_mut(|cfg| {
            cfg.wait_list_with_delimiter_per_call = ZERO;
            cfg.wait_list_with_delimiter_per_entry = WAIT_TIME;
        });
        assert_bounds!(measure_list_with_delimiter(&store, 2).await, 2);

        store.config_mut(|cfg| {
            cfg.wait_list_with_delimiter_per_call = WAIT_TIME;
            cfg.wait_list_with_delimiter_per_entry = WAIT_TIME;
        });
        assert_bounds!(measure_list_with_delimiter(&store, 2).await, 3);
    }

    // `put` takes one `WAIT_TIME` per call once configured, independent of payload size, and
    // goes back to no delay when the wait is reset to zero.
    #[tokio::test]
    async fn put_test() {
        let inner = InMemory::new();
        let store = ThrottledStore::new(inner, ThrottleConfig::default());

        assert_bounds!(measure_put(&store, 0).await, 0);
        assert_bounds!(measure_put(&store, 10).await, 0);

        store.config_mut(|cfg| cfg.wait_put_per_call = WAIT_TIME);
        assert_bounds!(measure_put(&store, 0).await, 1);
        assert_bounds!(measure_put(&store, 10).await, 1);

        store.config_mut(|cfg| cfg.wait_put_per_call = ZERO);
        assert_bounds!(measure_put(&store, 0).await, 0);
    }

    /// Prepares the object at path `foo` and returns that path.
    ///
    /// With `Some(n)`, an object of `n` bytes (all `1u8`) is written; with `None`, the path is
    /// deleted instead. The setup goes through the throttled store, so it is subject to the
    /// configured waits itself; callers start their timers only after it returns.
    async fn place_test_object(store: &ThrottledStore<InMemory>, n_bytes: Option<usize>) -> Path {
        let path = Path::from("foo");

        if let Some(n_bytes) = n_bytes {
            let data: Vec<_> = std::iter::repeat(1u8).take(n_bytes).collect();
            store.put(&path, data.into()).await.unwrap();
        } else {
            // ensure object is absent
            store.delete(&path).await.unwrap();
        }

        path
    }

    /// Replaces everything under prefix `foo` with exactly `n_entries` small objects named
    /// `foo/0`, `foo/1`, ... and returns the prefix.
    // Only used by the linux-only timing tests, hence unused on other targets.
    #[allow(dead_code)]
    async fn place_test_objects(store: &ThrottledStore<InMemory>, n_entries: usize) -> Path {
        let prefix = Path::from("foo");

        // clean up store
        let entries: Vec<_> = store.list(Some(&prefix)).try_collect().await.unwrap();

        for entry in entries {
            store.delete(&entry.location).await.unwrap();
        }

        // create new entries
        for i in 0..n_entries {
            let path = prefix.child(i.to_string().as_str());
            store.put(&path, "bar".into()).await.unwrap();
        }

        prefix
    }

    /// Returns how long a single `delete` of the prepared test object takes.
    async fn measure_delete(store: &ThrottledStore<InMemory>, n_bytes: Option<usize>) -> Duration {
        let path = place_test_object(store, n_bytes).await;

        let t0 = Instant::now();
        store.delete(&path).await.unwrap();

        t0.elapsed()
    }

    /// Returns how long a `get` of the prepared test object takes, including fully draining the
    /// returned byte stream. With `None` the object is absent and the `get` must fail.
    // Only used by the linux-only timing tests, hence unused on other targets.
    #[allow(dead_code)]
    async fn measure_get(store: &ThrottledStore<InMemory>, n_bytes: Option<usize>) -> Duration {
        let path = place_test_object(store, n_bytes).await;

        let t0 = Instant::now();
        let res = store.get(&path).await;
        if n_bytes.is_some() {
            // need to consume bytes to provoke sleep times
            let s = match res.unwrap().payload {
                GetResultPayload::Stream(s) => s,
                GetResultPayload::File(_, _) => unimplemented!(),
            };

            s.map_ok(|b| bytes::BytesMut::from(&b[..]))
                .try_concat()
                .await
                .unwrap();
        } else {
            assert!(res.is_err());
        }

        t0.elapsed()
    }

    /// Returns how long it takes to list and fully collect `n_entries` prepared objects.
    // Only used by the linux-only timing tests, hence unused on other targets.
    #[allow(dead_code)]
    async fn measure_list(store: &ThrottledStore<InMemory>, n_entries: usize) -> Duration {
        let prefix = place_test_objects(store, n_entries).await;

        let t0 = Instant::now();
        store
            .list(Some(&prefix))
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        t0.elapsed()
    }

    /// Returns how long a `list_with_delimiter` over `n_entries` prepared objects takes.
    // Only used by the linux-only timing tests, hence unused on other targets.
    #[allow(dead_code)]
    async fn measure_list_with_delimiter(
        store: &ThrottledStore<InMemory>,
        n_entries: usize,
    ) -> Duration {
        let prefix = place_test_objects(store, n_entries).await;

        let t0 = Instant::now();
        store.list_with_delimiter(Some(&prefix)).await.unwrap();

        t0.elapsed()
    }

    /// Returns how long a single `put` of `n_bytes` bytes to path `foo` takes.
    async fn measure_put(store: &ThrottledStore<InMemory>, n_bytes: usize) -> Duration {
        let data: Vec<_> = std::iter::repeat(1u8).take(n_bytes).collect();

        let t0 = Instant::now();
        store.put(&Path::from("foo"), data.into()).await.unwrap();

        t0.elapsed()
    }
}