1use parking_lot::Mutex;
20use std::ops::Range;
21use std::{convert::TryInto, sync::Arc};
22
23use crate::multipart::{MultipartStore, PartId};
24use crate::{
25 path::Path, GetResult, GetResultPayload, ListResult, MultipartId, MultipartUpload, ObjectMeta,
26 ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result,
27};
28use crate::{GetOptions, UploadPart};
29use async_trait::async_trait;
30use bytes::Bytes;
31use futures::{stream::BoxStream, FutureExt, StreamExt};
32use std::time::Duration;
33
/// Artificial delays applied by [`ThrottledStore`] to the operations of the
/// wrapped store.
///
/// Every field defaults to a zero duration (via `Default`), which disables the
/// corresponding delay.
#[derive(Debug, Default, Clone, Copy)]
pub struct ThrottleConfig {
    /// Delay applied once per `delete` call, before the inner store is called.
    pub wait_delete_per_call: Duration,

    /// Delay applied per byte read.
    ///
    /// For `get` / `get_opts` the delay is applied chunk by chunk to the
    /// returned stream (chunk length times this value, for successful chunks
    /// only). For `get_range` / `get_ranges` it is applied up front, before
    /// the inner store is called, for the number of bytes requested.
    pub wait_get_per_byte: Duration,

    /// Delay applied once per `get`, `get_opts`, `get_range` and `get_ranges`
    /// call, before the inner store is called.
    pub wait_get_per_call: Duration,

    /// Delay applied once per `list` / `list_with_offset` stream, before the
    /// first entry is yielded.
    pub wait_list_per_call: Duration,

    /// Delay applied for each successfully listed entry of `list` /
    /// `list_with_offset`. Errors in the stream are passed through without
    /// this delay.
    pub wait_list_per_entry: Duration,

    /// Delay applied once per `list_with_delimiter` call, before the inner
    /// store is called.
    pub wait_list_with_delimiter_per_call: Duration,

    /// Delay applied after a successful `list_with_delimiter` call, multiplied
    /// by the number of objects in the result.
    pub wait_list_with_delimiter_per_entry: Duration,

    /// Delay applied once per `put` / `put_opts` call and per uploaded
    /// multipart part. The same value is also used by `head`, `copy`,
    /// `rename`, `copy_if_not_exists` and `rename_if_not_exists`.
    pub wait_put_per_call: Duration,
}
99
100async fn sleep(duration: Duration) {
102 if !duration.is_zero() {
103 tokio::time::sleep(duration).await
104 }
105}
106
/// Store wrapper that delays operations on the wrapped store according to a
/// [`ThrottleConfig`].
///
/// The configuration is kept behind a shared mutex so that it can be read and
/// changed through a shared reference to the store.
#[derive(Debug)]
pub struct ThrottledStore<T> {
    /// The wrapped store that actually performs the operations.
    inner: T,
    /// Current throttle settings; read on every throttled operation.
    config: Arc<Mutex<ThrottleConfig>>,
}
118
119impl<T> ThrottledStore<T> {
120 pub fn new(inner: T, config: ThrottleConfig) -> Self {
122 Self {
123 inner,
124 config: Arc::new(Mutex::new(config)),
125 }
126 }
127
128 pub fn config_mut<F>(&self, f: F)
130 where
131 F: Fn(&mut ThrottleConfig),
132 {
133 let mut guard = self.config.lock();
134 f(&mut guard)
135 }
136
137 pub fn config(&self) -> ThrottleConfig {
139 *self.config.lock()
140 }
141}
142
143impl<T: ObjectStore> std::fmt::Display for ThrottledStore<T> {
144 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
145 write!(f, "ThrottledStore({})", self.inner)
146 }
147}
148
149#[async_trait]
150impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
151 async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
152 sleep(self.config().wait_put_per_call).await;
153 self.inner.put(location, payload).await
154 }
155
156 async fn put_opts(
157 &self,
158 location: &Path,
159 payload: PutPayload,
160 opts: PutOptions,
161 ) -> Result<PutResult> {
162 sleep(self.config().wait_put_per_call).await;
163 self.inner.put_opts(location, payload, opts).await
164 }
165
166 async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
167 let upload = self.inner.put_multipart(location).await?;
168 Ok(Box::new(ThrottledUpload {
169 upload,
170 sleep: self.config().wait_put_per_call,
171 }))
172 }
173
174 async fn put_multipart_opts(
175 &self,
176 location: &Path,
177 opts: PutMultipartOpts,
178 ) -> Result<Box<dyn MultipartUpload>> {
179 let upload = self.inner.put_multipart_opts(location, opts).await?;
180 Ok(Box::new(ThrottledUpload {
181 upload,
182 sleep: self.config().wait_put_per_call,
183 }))
184 }
185
186 async fn get(&self, location: &Path) -> Result<GetResult> {
187 sleep(self.config().wait_get_per_call).await;
188
189 let wait_get_per_byte = self.config().wait_get_per_byte;
191
192 let result = self.inner.get(location).await?;
193 Ok(throttle_get(result, wait_get_per_byte))
194 }
195
196 async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
197 sleep(self.config().wait_get_per_call).await;
198
199 let wait_get_per_byte = self.config().wait_get_per_byte;
201
202 let result = self.inner.get_opts(location, options).await?;
203 Ok(throttle_get(result, wait_get_per_byte))
204 }
205
206 async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
207 let config = self.config();
208
209 let sleep_duration =
210 config.wait_get_per_call + config.wait_get_per_byte * (range.end - range.start) as u32;
211
212 sleep(sleep_duration).await;
213
214 self.inner.get_range(location, range).await
215 }
216
217 async fn get_ranges(&self, location: &Path, ranges: &[Range<usize>]) -> Result<Vec<Bytes>> {
218 let config = self.config();
219
220 let total_bytes: usize = ranges.iter().map(|range| range.end - range.start).sum();
221 let sleep_duration =
222 config.wait_get_per_call + config.wait_get_per_byte * total_bytes as u32;
223
224 sleep(sleep_duration).await;
225
226 self.inner.get_ranges(location, ranges).await
227 }
228
229 async fn head(&self, location: &Path) -> Result<ObjectMeta> {
230 sleep(self.config().wait_put_per_call).await;
231 self.inner.head(location).await
232 }
233
234 async fn delete(&self, location: &Path) -> Result<()> {
235 sleep(self.config().wait_delete_per_call).await;
236
237 self.inner.delete(location).await
238 }
239
240 fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
241 let stream = self.inner.list(prefix);
242 futures::stream::once(async move {
243 let wait_list_per_entry = self.config().wait_list_per_entry;
244 sleep(self.config().wait_list_per_call).await;
245 throttle_stream(stream, move |_| wait_list_per_entry)
246 })
247 .flatten()
248 .boxed()
249 }
250
251 fn list_with_offset(
252 &self,
253 prefix: Option<&Path>,
254 offset: &Path,
255 ) -> BoxStream<'_, Result<ObjectMeta>> {
256 let stream = self.inner.list_with_offset(prefix, offset);
257 futures::stream::once(async move {
258 let wait_list_per_entry = self.config().wait_list_per_entry;
259 sleep(self.config().wait_list_per_call).await;
260 throttle_stream(stream, move |_| wait_list_per_entry)
261 })
262 .flatten()
263 .boxed()
264 }
265
266 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
267 sleep(self.config().wait_list_with_delimiter_per_call).await;
268
269 match self.inner.list_with_delimiter(prefix).await {
270 Ok(list_result) => {
271 let entries_len = usize_to_u32_saturate(list_result.objects.len());
272 sleep(self.config().wait_list_with_delimiter_per_entry * entries_len).await;
273 Ok(list_result)
274 }
275 Err(err) => Err(err),
276 }
277 }
278
279 async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
280 sleep(self.config().wait_put_per_call).await;
281
282 self.inner.copy(from, to).await
283 }
284
285 async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
286 sleep(self.config().wait_put_per_call).await;
287
288 self.inner.rename(from, to).await
289 }
290
291 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
292 sleep(self.config().wait_put_per_call).await;
293
294 self.inner.copy_if_not_exists(from, to).await
295 }
296
297 async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
298 sleep(self.config().wait_put_per_call).await;
299
300 self.inner.rename_if_not_exists(from, to).await
301 }
302}
303
/// Converts `x` to `u32`, clamping to `u32::MAX` when it does not fit.
fn usize_to_u32_saturate(x: usize) -> u32 {
    match x.try_into() {
        Ok(fits) => fits,
        Err(_) => u32::MAX,
    }
}
308
309fn throttle_get(result: GetResult, wait_get_per_byte: Duration) -> GetResult {
310 let s = match result.payload {
311 GetResultPayload::Stream(s) => s,
312 GetResultPayload::File(_, _) => unimplemented!(),
313 };
314
315 let stream = throttle_stream(s, move |bytes| {
316 let bytes_len: u32 = usize_to_u32_saturate(bytes.len());
317 wait_get_per_byte * bytes_len
318 });
319
320 GetResult {
321 payload: GetResultPayload::Stream(stream),
322 ..result
323 }
324}
325
326fn throttle_stream<T: Send + 'static, E: Send + 'static, F>(
327 stream: BoxStream<'_, Result<T, E>>,
328 delay: F,
329) -> BoxStream<'_, Result<T, E>>
330where
331 F: Fn(&T) -> Duration + Send + Sync + 'static,
332{
333 stream
334 .then(move |result| {
335 let delay = result.as_ref().ok().map(&delay).unwrap_or_default();
336 sleep(delay).then(|_| futures::future::ready(result))
337 })
338 .boxed()
339}
340
341#[async_trait]
342impl<T: MultipartStore> MultipartStore for ThrottledStore<T> {
343 async fn create_multipart(&self, path: &Path) -> Result<MultipartId> {
344 self.inner.create_multipart(path).await
345 }
346
347 async fn put_part(
348 &self,
349 path: &Path,
350 id: &MultipartId,
351 part_idx: usize,
352 data: PutPayload,
353 ) -> Result<PartId> {
354 sleep(self.config().wait_put_per_call).await;
355 self.inner.put_part(path, id, part_idx, data).await
356 }
357
358 async fn complete_multipart(
359 &self,
360 path: &Path,
361 id: &MultipartId,
362 parts: Vec<PartId>,
363 ) -> Result<PutResult> {
364 self.inner.complete_multipart(path, id, parts).await
365 }
366
367 async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()> {
368 self.inner.abort_multipart(path, id).await
369 }
370}
371
/// Multipart upload handle that delays every uploaded part by a fixed
/// duration before handing it to the wrapped upload.
#[derive(Debug)]
struct ThrottledUpload {
    /// The wrapped upload that performs the actual work.
    upload: Box<dyn MultipartUpload>,
    /// Delay applied before each part upload.
    sleep: Duration,
}
377
378#[async_trait]
379impl MultipartUpload for ThrottledUpload {
380 fn put_part(&mut self, data: PutPayload) -> UploadPart {
381 let duration = self.sleep;
382 let put = self.upload.put_part(data);
383 Box::pin(async move {
384 sleep(duration).await;
385 put.await
386 })
387 }
388
389 async fn complete(&mut self) -> Result<PutResult> {
390 self.upload.complete().await
391 }
392
393 async fn abort(&mut self) -> Result<()> {
394 self.upload.abort().await
395 }
396}
397
// Tests for `ThrottledStore`, using an in-memory store as the wrapped store.
// Most tests measure wall-clock time and compare it against multiples of
// `WAIT_TIME`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{integration::*, memory::InMemory, GetResultPayload};
    use futures::TryStreamExt;
    use tokio::time::Duration;
    use tokio::time::Instant;

    // Unit of delay used by the timing tests.
    const WAIT_TIME: Duration = Duration::from_millis(100);
    // Used to switch a delay off again.
    const ZERO: Duration = Duration::from_millis(0);

    // Asserts that a measured duration lies within `[lower, upper)`, both
    // expressed in multiples of `WAIT_TIME`. With two arguments the upper
    // bound defaults to `lower + 2`.
    macro_rules! assert_bounds {
        ($d:expr, $lower:expr) => {
            assert_bounds!($d, $lower, $lower + 2);
        };
        ($d:expr, $lower:expr, $upper:expr) => {
            let d = $d;
            let lower = $lower * WAIT_TIME;
            let upper = $upper * WAIT_TIME;
            assert!(d >= lower, "{:?} must be >= than {:?}", d, lower);
            assert!(d < upper, "{:?} must be < than {:?}", d, upper);
        };
    }

    // Runs the shared integration checks against a store with all delays at
    // zero.
    #[tokio::test]
    async fn throttle_test() {
        let inner = InMemory::new();
        let store = ThrottledStore::new(inner, ThrottleConfig::default());

        put_get_delete_list(&store).await;
        list_uses_directories_correctly(&store).await;
        list_with_delimiter(&store).await;
        rename_and_copy(&store).await;
        copy_if_not_exists(&store).await;
        stream_get(&store).await;
        multipart(&store, &store).await;
    }

    // The delete delay is per call and independent of object size or presence.
    #[tokio::test]
    async fn delete_test() {
        let inner = InMemory::new();
        let store = ThrottledStore::new(inner, ThrottleConfig::default());

        assert_bounds!(measure_delete(&store, None).await, 0);
        assert_bounds!(measure_delete(&store, Some(0)).await, 0);
        assert_bounds!(measure_delete(&store, Some(10)).await, 0);

        store.config_mut(|cfg| cfg.wait_delete_per_call = WAIT_TIME);
        assert_bounds!(measure_delete(&store, None).await, 1);
        assert_bounds!(measure_delete(&store, Some(0)).await, 1);
        assert_bounds!(measure_delete(&store, Some(10)).await, 1);
    }

    // NOTE(review): presumably restricted to Linux because the timing bounds
    // are too tight for slower CI runners on other platforms - confirm.
    #[tokio::test]
    #[cfg(target_os = "linux")]
    async fn get_test() {
        let inner = InMemory::new();
        let store = ThrottledStore::new(inner, ThrottleConfig::default());

        assert_bounds!(measure_get(&store, None).await, 0);
        assert_bounds!(measure_get(&store, Some(0)).await, 0);
        assert_bounds!(measure_get(&store, Some(10)).await, 0);

        // Per-call delay only: one unit regardless of object size.
        store.config_mut(|cfg| cfg.wait_get_per_call = WAIT_TIME);
        assert_bounds!(measure_get(&store, None).await, 1);
        assert_bounds!(measure_get(&store, Some(0)).await, 1);
        assert_bounds!(measure_get(&store, Some(10)).await, 1);

        // Per-byte delay only: two bytes -> two units.
        store.config_mut(|cfg| {
            cfg.wait_get_per_call = ZERO;
            cfg.wait_get_per_byte = WAIT_TIME;
        });
        assert_bounds!(measure_get(&store, Some(2)).await, 2);

        // Both delays: one unit per call plus two units for two bytes.
        store.config_mut(|cfg| {
            cfg.wait_get_per_call = WAIT_TIME;
            cfg.wait_get_per_byte = WAIT_TIME;
        });
        assert_bounds!(measure_get(&store, Some(2)).await, 3);
    }

    // NOTE(review): presumably restricted to Linux because the timing bounds
    // are too tight for slower CI runners on other platforms - confirm.
    #[tokio::test]
    #[cfg(target_os = "linux")]
    async fn list_test() {
        let inner = InMemory::new();
        let store = ThrottledStore::new(inner, ThrottleConfig::default());

        assert_bounds!(measure_list(&store, 0).await, 0);
        assert_bounds!(measure_list(&store, 10).await, 0);

        // Per-call delay only: one unit regardless of the number of entries.
        store.config_mut(|cfg| cfg.wait_list_per_call = WAIT_TIME);
        assert_bounds!(measure_list(&store, 0).await, 1);
        assert_bounds!(measure_list(&store, 10).await, 1);

        // Per-entry delay only: two entries -> two units.
        store.config_mut(|cfg| {
            cfg.wait_list_per_call = ZERO;
            cfg.wait_list_per_entry = WAIT_TIME;
        });
        assert_bounds!(measure_list(&store, 2).await, 2);

        // Both delays: one unit per call plus two units for two entries.
        store.config_mut(|cfg| {
            cfg.wait_list_per_call = WAIT_TIME;
            cfg.wait_list_per_entry = WAIT_TIME;
        });
        assert_bounds!(measure_list(&store, 2).await, 3);
    }

    // NOTE(review): presumably restricted to Linux because the timing bounds
    // are too tight for slower CI runners on other platforms - confirm.
    #[tokio::test]
    #[cfg(target_os = "linux")]
    async fn list_with_delimiter_test() {
        let inner = InMemory::new();
        let store = ThrottledStore::new(inner, ThrottleConfig::default());

        assert_bounds!(measure_list_with_delimiter(&store, 0).await, 0);
        assert_bounds!(measure_list_with_delimiter(&store, 10).await, 0);

        // Per-call delay only: one unit regardless of the number of entries.
        store.config_mut(|cfg| cfg.wait_list_with_delimiter_per_call = WAIT_TIME);
        assert_bounds!(measure_list_with_delimiter(&store, 0).await, 1);
        assert_bounds!(measure_list_with_delimiter(&store, 10).await, 1);

        // Per-entry delay only: two entries -> two units.
        store.config_mut(|cfg| {
            cfg.wait_list_with_delimiter_per_call = ZERO;
            cfg.wait_list_with_delimiter_per_entry = WAIT_TIME;
        });
        assert_bounds!(measure_list_with_delimiter(&store, 2).await, 2);

        // Both delays: one unit per call plus two units for two entries.
        store.config_mut(|cfg| {
            cfg.wait_list_with_delimiter_per_call = WAIT_TIME;
            cfg.wait_list_with_delimiter_per_entry = WAIT_TIME;
        });
        assert_bounds!(measure_list_with_delimiter(&store, 2).await, 3);
    }

    // The put delay is per call and independent of the payload size; setting
    // it back to zero removes the delay again.
    #[tokio::test]
    async fn put_test() {
        let inner = InMemory::new();
        let store = ThrottledStore::new(inner, ThrottleConfig::default());

        assert_bounds!(measure_put(&store, 0).await, 0);
        assert_bounds!(measure_put(&store, 10).await, 0);

        store.config_mut(|cfg| cfg.wait_put_per_call = WAIT_TIME);
        assert_bounds!(measure_put(&store, 0).await, 1);
        assert_bounds!(measure_put(&store, 10).await, 1);

        store.config_mut(|cfg| cfg.wait_put_per_call = ZERO);
        assert_bounds!(measure_put(&store, 0).await, 0);
    }

    // Prepares the object at path "foo": with `Some(n)` it is written with `n`
    // bytes, with `None` it is deleted. Returns the path. The calls go through
    // the throttled store, so they are subject to the configured delays too.
    async fn place_test_object(store: &ThrottledStore<InMemory>, n_bytes: Option<usize>) -> Path {
        let path = Path::from("foo");

        if let Some(n_bytes) = n_bytes {
            let data: Vec<_> = std::iter::repeat(1u8).take(n_bytes).collect();
            store.put(&path, data.into()).await.unwrap();
        } else {
            // Presumably ensures the object is absent; relies on deleting a
            // missing object not being an error for `InMemory` - confirm.
            store.delete(&path).await.unwrap();
        }

        path
    }

    // Replaces everything under the prefix "foo" with exactly `n_entries`
    // small objects and returns the prefix.
    //
    // Only reached from the Linux-only tests, hence the `dead_code` allowance
    // for other targets.
    #[allow(dead_code)]
    async fn place_test_objects(store: &ThrottledStore<InMemory>, n_entries: usize) -> Path {
        let prefix = Path::from("foo");

        // Remove whatever a previous measurement left behind.
        let entries: Vec<_> = store.list(Some(&prefix)).try_collect().await.unwrap();

        for entry in entries {
            store.delete(&entry.location).await.unwrap();
        }

        // Create the requested number of entries.
        for i in 0..n_entries {
            let path = prefix.child(i.to_string().as_str());
            store.put(&path, "bar".into()).await.unwrap();
        }

        prefix
    }

    // Measures how long a single `delete` takes; the setup in
    // `place_test_object` is not part of the measurement.
    async fn measure_delete(store: &ThrottledStore<InMemory>, n_bytes: Option<usize>) -> Duration {
        let path = place_test_object(store, n_bytes).await;

        let t0 = Instant::now();
        store.delete(&path).await.unwrap();

        t0.elapsed()
    }

    // Measures how long a `get` takes, including draining the returned stream
    // when the object exists. With `None` the object is absent and the call is
    // expected to fail.
    //
    // Only used by the Linux-only `get_test`, hence the `dead_code` allowance
    // for other targets.
    #[allow(dead_code)]
    async fn measure_get(store: &ThrottledStore<InMemory>, n_bytes: Option<usize>) -> Duration {
        let path = place_test_object(store, n_bytes).await;

        let t0 = Instant::now();
        let res = store.get(&path).await;
        if n_bytes.is_some() {
            // The per-byte delay is applied while the stream is consumed, so
            // the stream has to be read to the end.
            let s = match res.unwrap().payload {
                GetResultPayload::Stream(s) => s,
                GetResultPayload::File(_, _) => unimplemented!(),
            };

            s.map_ok(|b| bytes::BytesMut::from(&b[..]))
                .try_concat()
                .await
                .unwrap();
        } else {
            assert!(res.is_err());
        }

        t0.elapsed()
    }

    // Measures how long it takes to fully consume `list` for a prefix holding
    // `n_entries` objects.
    //
    // Only used by the Linux-only `list_test`, hence the `dead_code` allowance
    // for other targets.
    #[allow(dead_code)]
    async fn measure_list(store: &ThrottledStore<InMemory>, n_entries: usize) -> Duration {
        let prefix = place_test_objects(store, n_entries).await;

        let t0 = Instant::now();
        store
            .list(Some(&prefix))
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        t0.elapsed()
    }

    // Measures how long `list_with_delimiter` takes for a prefix holding
    // `n_entries` objects.
    //
    // Only used by the Linux-only `list_with_delimiter_test`, hence the
    // `dead_code` allowance for other targets.
    #[allow(dead_code)]
    async fn measure_list_with_delimiter(
        store: &ThrottledStore<InMemory>,
        n_entries: usize,
    ) -> Duration {
        let prefix = place_test_objects(store, n_entries).await;

        let t0 = Instant::now();
        store.list_with_delimiter(Some(&prefix)).await.unwrap();

        t0.elapsed()
    }

    // Measures how long a single `put` of `n_bytes` bytes to "foo" takes.
    async fn measure_put(store: &ThrottledStore<InMemory>, n_bytes: usize) -> Duration {
        let data: Vec<_> = std::iter::repeat(1u8).take(n_bytes).collect();

        let t0 = Instant::now();
        store.put(&Path::from("foo"), data.into()).await.unwrap();

        t0.elapsed()
    }
}