object_store/client/
list.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use crate::client::pagination::stream_paginated;
19use crate::path::Path;
20use crate::Result;
21use crate::{ListResult, ObjectMeta};
22use async_trait::async_trait;
23use futures::stream::BoxStream;
24use futures::{StreamExt, TryStreamExt};
25use std::collections::BTreeSet;
26
27/// A client that can perform paginated list requests
28#[async_trait]
29pub trait ListClient: Send + Sync + 'static {
30    async fn list_request(
31        &self,
32        prefix: Option<&str>,
33        delimiter: bool,
34        token: Option<&str>,
35        offset: Option<&str>,
36    ) -> Result<(ListResult, Option<String>)>;
37}
38
39/// Extension trait for [`ListClient`] that adds common listing functionality
40#[async_trait]
41pub trait ListClientExt {
42    fn list_paginated(
43        &self,
44        prefix: Option<&Path>,
45        delimiter: bool,
46        offset: Option<&Path>,
47    ) -> BoxStream<'_, Result<ListResult>>;
48
49    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>>;
50
51    #[allow(unused)]
52    fn list_with_offset(
53        &self,
54        prefix: Option<&Path>,
55        offset: &Path,
56    ) -> BoxStream<'_, Result<ObjectMeta>>;
57
58    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult>;
59}
60
61#[async_trait]
62impl<T: ListClient> ListClientExt for T {
63    fn list_paginated(
64        &self,
65        prefix: Option<&Path>,
66        delimiter: bool,
67        offset: Option<&Path>,
68    ) -> BoxStream<'_, Result<ListResult>> {
69        let offset = offset.map(|x| x.to_string());
70        let prefix = prefix
71            .filter(|x| !x.as_ref().is_empty())
72            .map(|p| format!("{}{}", p.as_ref(), crate::path::DELIMITER));
73
74        stream_paginated(
75            (prefix, offset),
76            move |(prefix, offset), token| async move {
77                let (r, next_token) = self
78                    .list_request(
79                        prefix.as_deref(),
80                        delimiter,
81                        token.as_deref(),
82                        offset.as_deref(),
83                    )
84                    .await?;
85                Ok((r, (prefix, offset), next_token))
86            },
87        )
88        .boxed()
89    }
90
91    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
92        self.list_paginated(prefix, false, None)
93            .map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
94            .try_flatten()
95            .boxed()
96    }
97
98    fn list_with_offset(
99        &self,
100        prefix: Option<&Path>,
101        offset: &Path,
102    ) -> BoxStream<'_, Result<ObjectMeta>> {
103        self.list_paginated(prefix, false, Some(offset))
104            .map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
105            .try_flatten()
106            .boxed()
107    }
108
109    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
110        let mut stream = self.list_paginated(prefix, true, None);
111
112        let mut common_prefixes = BTreeSet::new();
113        let mut objects = Vec::new();
114
115        while let Some(result) = stream.next().await {
116            let response = result?;
117            common_prefixes.extend(response.common_prefixes.into_iter());
118            objects.extend(response.objects.into_iter());
119        }
120
121        Ok(ListResult {
122            common_prefixes: common_prefixes.into_iter().collect(),
123            objects,
124        })
125    }
126}