1use bytes::Bytes;
20use futures::{stream::BoxStream, StreamExt, TryStreamExt};
21use std::ops::Range;
22
23use crate::path::Path;
24use crate::{
25 GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts,
26 PutOptions, PutPayload, PutResult, Result,
27};
28
29#[doc(hidden)]
30#[deprecated(note = "Use PrefixStore")]
31pub type PrefixObjectStore<T> = PrefixStore<T>;
32
33#[derive(Debug, Clone)]
35pub struct PrefixStore<T: ObjectStore> {
36 prefix: Path,
37 inner: T,
38}
39
40impl<T: ObjectStore> std::fmt::Display for PrefixStore<T> {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 write!(f, "PrefixObjectStore({})", self.prefix.as_ref())
43 }
44}
45
46impl<T: ObjectStore> PrefixStore<T> {
47 pub fn new(store: T, prefix: impl Into<Path>) -> Self {
49 Self {
50 prefix: prefix.into(),
51 inner: store,
52 }
53 }
54
55 fn full_path(&self, location: &Path) -> Path {
57 self.prefix.parts().chain(location.parts()).collect()
58 }
59
60 fn strip_prefix(&self, path: Path) -> Path {
62 if let Some(suffix) = path.prefix_match(&self.prefix) {
64 return suffix.collect();
65 }
66 path
67 }
68
69 fn strip_meta(&self, meta: ObjectMeta) -> ObjectMeta {
71 ObjectMeta {
72 last_modified: meta.last_modified,
73 size: meta.size,
74 location: self.strip_prefix(meta.location),
75 e_tag: meta.e_tag,
76 version: None,
77 }
78 }
79}
80
81#[async_trait::async_trait]
82impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
83 async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
84 let full_path = self.full_path(location);
85 self.inner.put(&full_path, payload).await
86 }
87
88 async fn put_opts(
89 &self,
90 location: &Path,
91 payload: PutPayload,
92 opts: PutOptions,
93 ) -> Result<PutResult> {
94 let full_path = self.full_path(location);
95 self.inner.put_opts(&full_path, payload, opts).await
96 }
97
98 async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
99 let full_path = self.full_path(location);
100 self.inner.put_multipart(&full_path).await
101 }
102
103 async fn put_multipart_opts(
104 &self,
105 location: &Path,
106 opts: PutMultipartOpts,
107 ) -> Result<Box<dyn MultipartUpload>> {
108 let full_path = self.full_path(location);
109 self.inner.put_multipart_opts(&full_path, opts).await
110 }
111
112 async fn get(&self, location: &Path) -> Result<GetResult> {
113 let full_path = self.full_path(location);
114 self.inner.get(&full_path).await
115 }
116
117 async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
118 let full_path = self.full_path(location);
119 self.inner.get_range(&full_path, range).await
120 }
121
122 async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
123 let full_path = self.full_path(location);
124 self.inner.get_opts(&full_path, options).await
125 }
126
127 async fn get_ranges(&self, location: &Path, ranges: &[Range<usize>]) -> Result<Vec<Bytes>> {
128 let full_path = self.full_path(location);
129 self.inner.get_ranges(&full_path, ranges).await
130 }
131
132 async fn head(&self, location: &Path) -> Result<ObjectMeta> {
133 let full_path = self.full_path(location);
134 let meta = self.inner.head(&full_path).await?;
135 Ok(self.strip_meta(meta))
136 }
137
138 async fn delete(&self, location: &Path) -> Result<()> {
139 let full_path = self.full_path(location);
140 self.inner.delete(&full_path).await
141 }
142
143 fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
144 let prefix = self.full_path(prefix.unwrap_or(&Path::default()));
145 let s = self.inner.list(Some(&prefix));
146 s.map_ok(|meta| self.strip_meta(meta)).boxed()
147 }
148
149 fn list_with_offset(
150 &self,
151 prefix: Option<&Path>,
152 offset: &Path,
153 ) -> BoxStream<'_, Result<ObjectMeta>> {
154 let offset = self.full_path(offset);
155 let prefix = self.full_path(prefix.unwrap_or(&Path::default()));
156 let s = self.inner.list_with_offset(Some(&prefix), &offset);
157 s.map_ok(|meta| self.strip_meta(meta)).boxed()
158 }
159
160 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
161 let prefix = self.full_path(prefix.unwrap_or(&Path::default()));
162 self.inner
163 .list_with_delimiter(Some(&prefix))
164 .await
165 .map(|lst| ListResult {
166 common_prefixes: lst
167 .common_prefixes
168 .into_iter()
169 .map(|p| self.strip_prefix(p))
170 .collect(),
171 objects: lst
172 .objects
173 .into_iter()
174 .map(|meta| self.strip_meta(meta))
175 .collect(),
176 })
177 }
178
179 async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
180 let full_from = self.full_path(from);
181 let full_to = self.full_path(to);
182 self.inner.copy(&full_from, &full_to).await
183 }
184
185 async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
186 let full_from = self.full_path(from);
187 let full_to = self.full_path(to);
188 self.inner.rename(&full_from, &full_to).await
189 }
190
191 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
192 let full_from = self.full_path(from);
193 let full_to = self.full_path(to);
194 self.inner.copy_if_not_exists(&full_from, &full_to).await
195 }
196
197 async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
198 let full_from = self.full_path(from);
199 let full_to = self.full_path(to);
200 self.inner.rename_if_not_exists(&full_from, &full_to).await
201 }
202}
203
204#[cfg(test)]
205mod tests {
206 use super::*;
207 use crate::integration::*;
208 use crate::local::LocalFileSystem;
209
210 use tempfile::TempDir;
211
212 #[tokio::test]
213 async fn prefix_test() {
214 let root = TempDir::new().unwrap();
215 let inner = LocalFileSystem::new_with_prefix(root.path()).unwrap();
216 let integration = PrefixStore::new(inner, "prefix");
217
218 put_get_delete_list(&integration).await;
219 get_opts(&integration).await;
220 list_uses_directories_correctly(&integration).await;
221 list_with_delimiter(&integration).await;
222 rename_and_copy(&integration).await;
223 copy_if_not_exists(&integration).await;
224 stream_get(&integration).await;
225 }
226
227 #[tokio::test]
228 async fn prefix_test_applies_prefix() {
229 let tmpdir = TempDir::new().unwrap();
230 let local = LocalFileSystem::new_with_prefix(tmpdir.path()).unwrap();
231
232 let location = Path::from("prefix/test_file.json");
233 let data = Bytes::from("arbitrary data");
234
235 local.put(&location, data.clone().into()).await.unwrap();
236
237 let prefix = PrefixStore::new(local, "prefix");
238 let location_prefix = Path::from("test_file.json");
239
240 let content_list = flatten_list_stream(&prefix, None).await.unwrap();
241 assert_eq!(content_list, &[location_prefix.clone()]);
242
243 let root = Path::from("/");
244 let content_list = flatten_list_stream(&prefix, Some(&root)).await.unwrap();
245 assert_eq!(content_list, &[location_prefix.clone()]);
246
247 let read_data = prefix
248 .get(&location_prefix)
249 .await
250 .unwrap()
251 .bytes()
252 .await
253 .unwrap();
254 assert_eq!(&*read_data, data);
255
256 let target_prefix = Path::from("/test_written.json");
257 prefix
258 .put(&target_prefix, data.clone().into())
259 .await
260 .unwrap();
261
262 prefix.delete(&location_prefix).await.unwrap();
263
264 let local = LocalFileSystem::new_with_prefix(tmpdir.path()).unwrap();
265
266 let err = local.get(&location).await.unwrap_err();
267 assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
268
269 let location = Path::from("prefix/test_written.json");
270 let read_data = local.get(&location).await.unwrap().bytes().await.unwrap();
271 assert_eq!(&*read_data, data)
272 }
273}