object_store/client/
list.rs1use crate::client::pagination::stream_paginated;
19use crate::path::Path;
20use crate::Result;
21use crate::{ListResult, ObjectMeta};
22use async_trait::async_trait;
23use futures::stream::BoxStream;
24use futures::{StreamExt, TryStreamExt};
25use std::collections::BTreeSet;
26
27#[async_trait]
29pub trait ListClient: Send + Sync + 'static {
30 async fn list_request(
31 &self,
32 prefix: Option<&str>,
33 delimiter: bool,
34 token: Option<&str>,
35 offset: Option<&str>,
36 ) -> Result<(ListResult, Option<String>)>;
37}
38
39#[async_trait]
41pub trait ListClientExt {
42 fn list_paginated(
43 &self,
44 prefix: Option<&Path>,
45 delimiter: bool,
46 offset: Option<&Path>,
47 ) -> BoxStream<'_, Result<ListResult>>;
48
49 fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>>;
50
51 #[allow(unused)]
52 fn list_with_offset(
53 &self,
54 prefix: Option<&Path>,
55 offset: &Path,
56 ) -> BoxStream<'_, Result<ObjectMeta>>;
57
58 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult>;
59}
60
61#[async_trait]
62impl<T: ListClient> ListClientExt for T {
63 fn list_paginated(
64 &self,
65 prefix: Option<&Path>,
66 delimiter: bool,
67 offset: Option<&Path>,
68 ) -> BoxStream<'_, Result<ListResult>> {
69 let offset = offset.map(|x| x.to_string());
70 let prefix = prefix
71 .filter(|x| !x.as_ref().is_empty())
72 .map(|p| format!("{}{}", p.as_ref(), crate::path::DELIMITER));
73
74 stream_paginated(
75 (prefix, offset),
76 move |(prefix, offset), token| async move {
77 let (r, next_token) = self
78 .list_request(
79 prefix.as_deref(),
80 delimiter,
81 token.as_deref(),
82 offset.as_deref(),
83 )
84 .await?;
85 Ok((r, (prefix, offset), next_token))
86 },
87 )
88 .boxed()
89 }
90
91 fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
92 self.list_paginated(prefix, false, None)
93 .map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
94 .try_flatten()
95 .boxed()
96 }
97
98 fn list_with_offset(
99 &self,
100 prefix: Option<&Path>,
101 offset: &Path,
102 ) -> BoxStream<'_, Result<ObjectMeta>> {
103 self.list_paginated(prefix, false, Some(offset))
104 .map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
105 .try_flatten()
106 .boxed()
107 }
108
109 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
110 let mut stream = self.list_paginated(prefix, true, None);
111
112 let mut common_prefixes = BTreeSet::new();
113 let mut objects = Vec::new();
114
115 while let Some(result) = stream.next().await {
116 let response = result?;
117 common_prefixes.extend(response.common_prefixes.into_iter());
118 objects.extend(response.objects.into_iter());
119 }
120
121 Ok(ListResult {
122 common_prefixes: common_prefixes.into_iter().collect(),
123 objects,
124 })
125 }
126}