object_store/
prefix.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! An object store wrapper handling a constant path prefix
19use bytes::Bytes;
20use futures::{stream::BoxStream, StreamExt, TryStreamExt};
21use std::ops::Range;
22
23use crate::path::Path;
24use crate::{
25    GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts,
26    PutOptions, PutPayload, PutResult, Result,
27};
28
29#[doc(hidden)]
30#[deprecated(note = "Use PrefixStore")]
31pub type PrefixObjectStore<T> = PrefixStore<T>;
32
33/// Store wrapper that applies a constant prefix to all paths handled by the store.
34#[derive(Debug, Clone)]
35pub struct PrefixStore<T: ObjectStore> {
36    prefix: Path,
37    inner: T,
38}
39
40impl<T: ObjectStore> std::fmt::Display for PrefixStore<T> {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        write!(f, "PrefixObjectStore({})", self.prefix.as_ref())
43    }
44}
45
46impl<T: ObjectStore> PrefixStore<T> {
47    /// Create a new instance of [`PrefixStore`]
48    pub fn new(store: T, prefix: impl Into<Path>) -> Self {
49        Self {
50            prefix: prefix.into(),
51            inner: store,
52        }
53    }
54
55    /// Create the full path from a path relative to prefix
56    fn full_path(&self, location: &Path) -> Path {
57        self.prefix.parts().chain(location.parts()).collect()
58    }
59
60    /// Strip the constant prefix from a given path
61    fn strip_prefix(&self, path: Path) -> Path {
62        // Note cannot use match because of borrow checker
63        if let Some(suffix) = path.prefix_match(&self.prefix) {
64            return suffix.collect();
65        }
66        path
67    }
68
69    /// Strip the constant prefix from a given ObjectMeta
70    fn strip_meta(&self, meta: ObjectMeta) -> ObjectMeta {
71        ObjectMeta {
72            last_modified: meta.last_modified,
73            size: meta.size,
74            location: self.strip_prefix(meta.location),
75            e_tag: meta.e_tag,
76            version: None,
77        }
78    }
79}
80
81#[async_trait::async_trait]
82impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
83    async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
84        let full_path = self.full_path(location);
85        self.inner.put(&full_path, payload).await
86    }
87
88    async fn put_opts(
89        &self,
90        location: &Path,
91        payload: PutPayload,
92        opts: PutOptions,
93    ) -> Result<PutResult> {
94        let full_path = self.full_path(location);
95        self.inner.put_opts(&full_path, payload, opts).await
96    }
97
98    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
99        let full_path = self.full_path(location);
100        self.inner.put_multipart(&full_path).await
101    }
102
103    async fn put_multipart_opts(
104        &self,
105        location: &Path,
106        opts: PutMultipartOpts,
107    ) -> Result<Box<dyn MultipartUpload>> {
108        let full_path = self.full_path(location);
109        self.inner.put_multipart_opts(&full_path, opts).await
110    }
111
112    async fn get(&self, location: &Path) -> Result<GetResult> {
113        let full_path = self.full_path(location);
114        self.inner.get(&full_path).await
115    }
116
117    async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
118        let full_path = self.full_path(location);
119        self.inner.get_range(&full_path, range).await
120    }
121
122    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
123        let full_path = self.full_path(location);
124        self.inner.get_opts(&full_path, options).await
125    }
126
127    async fn get_ranges(&self, location: &Path, ranges: &[Range<usize>]) -> Result<Vec<Bytes>> {
128        let full_path = self.full_path(location);
129        self.inner.get_ranges(&full_path, ranges).await
130    }
131
132    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
133        let full_path = self.full_path(location);
134        let meta = self.inner.head(&full_path).await?;
135        Ok(self.strip_meta(meta))
136    }
137
138    async fn delete(&self, location: &Path) -> Result<()> {
139        let full_path = self.full_path(location);
140        self.inner.delete(&full_path).await
141    }
142
143    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
144        let prefix = self.full_path(prefix.unwrap_or(&Path::default()));
145        let s = self.inner.list(Some(&prefix));
146        s.map_ok(|meta| self.strip_meta(meta)).boxed()
147    }
148
149    fn list_with_offset(
150        &self,
151        prefix: Option<&Path>,
152        offset: &Path,
153    ) -> BoxStream<'_, Result<ObjectMeta>> {
154        let offset = self.full_path(offset);
155        let prefix = self.full_path(prefix.unwrap_or(&Path::default()));
156        let s = self.inner.list_with_offset(Some(&prefix), &offset);
157        s.map_ok(|meta| self.strip_meta(meta)).boxed()
158    }
159
160    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
161        let prefix = self.full_path(prefix.unwrap_or(&Path::default()));
162        self.inner
163            .list_with_delimiter(Some(&prefix))
164            .await
165            .map(|lst| ListResult {
166                common_prefixes: lst
167                    .common_prefixes
168                    .into_iter()
169                    .map(|p| self.strip_prefix(p))
170                    .collect(),
171                objects: lst
172                    .objects
173                    .into_iter()
174                    .map(|meta| self.strip_meta(meta))
175                    .collect(),
176            })
177    }
178
179    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
180        let full_from = self.full_path(from);
181        let full_to = self.full_path(to);
182        self.inner.copy(&full_from, &full_to).await
183    }
184
185    async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
186        let full_from = self.full_path(from);
187        let full_to = self.full_path(to);
188        self.inner.rename(&full_from, &full_to).await
189    }
190
191    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
192        let full_from = self.full_path(from);
193        let full_to = self.full_path(to);
194        self.inner.copy_if_not_exists(&full_from, &full_to).await
195    }
196
197    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
198        let full_from = self.full_path(from);
199        let full_to = self.full_path(to);
200        self.inner.rename_if_not_exists(&full_from, &full_to).await
201    }
202}
203
204#[cfg(test)]
205mod tests {
206    use super::*;
207    use crate::integration::*;
208    use crate::local::LocalFileSystem;
209
210    use tempfile::TempDir;
211
212    #[tokio::test]
213    async fn prefix_test() {
214        let root = TempDir::new().unwrap();
215        let inner = LocalFileSystem::new_with_prefix(root.path()).unwrap();
216        let integration = PrefixStore::new(inner, "prefix");
217
218        put_get_delete_list(&integration).await;
219        get_opts(&integration).await;
220        list_uses_directories_correctly(&integration).await;
221        list_with_delimiter(&integration).await;
222        rename_and_copy(&integration).await;
223        copy_if_not_exists(&integration).await;
224        stream_get(&integration).await;
225    }
226
227    #[tokio::test]
228    async fn prefix_test_applies_prefix() {
229        let tmpdir = TempDir::new().unwrap();
230        let local = LocalFileSystem::new_with_prefix(tmpdir.path()).unwrap();
231
232        let location = Path::from("prefix/test_file.json");
233        let data = Bytes::from("arbitrary data");
234
235        local.put(&location, data.clone().into()).await.unwrap();
236
237        let prefix = PrefixStore::new(local, "prefix");
238        let location_prefix = Path::from("test_file.json");
239
240        let content_list = flatten_list_stream(&prefix, None).await.unwrap();
241        assert_eq!(content_list, &[location_prefix.clone()]);
242
243        let root = Path::from("/");
244        let content_list = flatten_list_stream(&prefix, Some(&root)).await.unwrap();
245        assert_eq!(content_list, &[location_prefix.clone()]);
246
247        let read_data = prefix
248            .get(&location_prefix)
249            .await
250            .unwrap()
251            .bytes()
252            .await
253            .unwrap();
254        assert_eq!(&*read_data, data);
255
256        let target_prefix = Path::from("/test_written.json");
257        prefix
258            .put(&target_prefix, data.clone().into())
259            .await
260            .unwrap();
261
262        prefix.delete(&location_prefix).await.unwrap();
263
264        let local = LocalFileSystem::new_with_prefix(tmpdir.path()).unwrap();
265
266        let err = local.get(&location).await.unwrap_err();
267        assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
268
269        let location = Path::from("prefix/test_written.json");
270        let read_data = local.get(&location).await.unwrap().bytes().await.unwrap();
271        assert_eq!(&*read_data, data)
272    }
273}