object_store/
memory.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! An in-memory object store implementation
19use std::collections::{BTreeMap, BTreeSet, HashMap};
20use std::ops::Range;
21use std::sync::Arc;
22
23use async_trait::async_trait;
24use bytes::Bytes;
25use chrono::{DateTime, Utc};
26use futures::{stream::BoxStream, StreamExt};
27use parking_lot::RwLock;
28use snafu::{OptionExt, ResultExt, Snafu};
29
30use crate::multipart::{MultipartStore, PartId};
31use crate::util::InvalidGetRange;
32use crate::{
33    path::Path, Attributes, GetRange, GetResult, GetResultPayload, ListResult, MultipartId,
34    MultipartUpload, ObjectMeta, ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutResult,
35    Result, UpdateVersion, UploadPart,
36};
37use crate::{GetOptions, PutPayload};
38
/// A specialized `Error` for in-memory object store-related errors
///
/// Values of this type are private to the module and are converted into the
/// crate-level error through the `From<Error> for super::Error` impl.
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
enum Error {
    /// No entry is stored under the requested path (raised by `InMemory::entry`).
    #[snafu(display("No data in memory found. Location: {path}"))]
    NoDataInMemory { path: String },

    /// A requested byte range could not be resolved against the object length.
    #[snafu(display("Invalid range: {source}"))]
    Range { source: InvalidGetRange },

    /// A create-only write or `copy_if_not_exists` found an existing entry.
    #[snafu(display("Object already exists at that location: {path}"))]
    AlreadyExists { path: String },

    /// A conditional update was requested without an e-tag to compare against.
    #[snafu(display("ETag required for conditional update"))]
    MissingETag,

    /// The multipart id did not parse or does not refer to a tracked upload.
    #[snafu(display("MultipartUpload not found: {id}"))]
    UploadNotFound { id: String },

    /// A multipart upload was completed while the slot at `part` was still empty.
    #[snafu(display("Missing part at index: {part}"))]
    MissingPart { part: usize },
}
61
62impl From<Error> for super::Error {
63    fn from(source: Error) -> Self {
64        match source {
65            Error::NoDataInMemory { ref path } => Self::NotFound {
66                path: path.into(),
67                source: source.into(),
68            },
69            Error::AlreadyExists { ref path } => Self::AlreadyExists {
70                path: path.into(),
71                source: source.into(),
72            },
73            _ => Self::Generic {
74                store: "InMemory",
75                source: Box::new(source),
76            },
77        }
78    }
79}
80
/// In-memory storage suitable for testing or for opting out of using a cloud
/// storage provider.
#[derive(Debug, Default)]
pub struct InMemory {
    // Lock-protected state; the same `Arc` is also handed to the uploads
    // created by `put_multipart_opts`.
    storage: SharedStorage,
}
87
/// A single stored object together with the metadata reported for it.
#[derive(Debug, Clone)]
struct Entry {
    // Object contents
    data: Bytes,
    // Timestamp taken when the entry was constructed for a write
    last_modified: DateTime<Utc>,
    // Attributes supplied alongside the payload
    attributes: Attributes,
    // Numeric tag taken from `Storage::next_etag`; exposed to callers as its
    // decimal string form
    e_tag: usize,
}
95
96impl Entry {
97    fn new(
98        data: Bytes,
99        last_modified: DateTime<Utc>,
100        e_tag: usize,
101        attributes: Attributes,
102    ) -> Self {
103        Self {
104            data,
105            last_modified,
106            e_tag,
107            attributes,
108        }
109    }
110}
111
/// The complete state of an [`InMemory`] store.
#[derive(Debug, Default, Clone)]
struct Storage {
    // Counter that supplies both object e-tags and multipart upload ids
    next_etag: usize,
    // Stored objects, ordered by path so listings can scan a key range
    map: BTreeMap<Path, Entry>,
    // In-progress uploads created through `MultipartStore`, keyed by upload id
    uploads: HashMap<usize, PartStorage>,
}
118
/// The parts received so far for one in-progress multipart upload.
#[derive(Debug, Default, Clone)]
struct PartStorage {
    // Indexed by part index; `None` marks a slot with no data yet
    parts: Vec<Option<Bytes>>,
}
123
/// Reference-counted, lock-protected handle to a [`Storage`].
type SharedStorage = Arc<RwLock<Storage>>;
125
126impl Storage {
127    fn insert(&mut self, location: &Path, bytes: Bytes, attributes: Attributes) -> usize {
128        let etag = self.next_etag;
129        self.next_etag += 1;
130        let entry = Entry::new(bytes, Utc::now(), etag, attributes);
131        self.overwrite(location, entry);
132        etag
133    }
134
135    fn overwrite(&mut self, location: &Path, entry: Entry) {
136        self.map.insert(location.clone(), entry);
137    }
138
139    fn create(&mut self, location: &Path, entry: Entry) -> Result<()> {
140        use std::collections::btree_map;
141        match self.map.entry(location.clone()) {
142            btree_map::Entry::Occupied(_) => Err(Error::AlreadyExists {
143                path: location.to_string(),
144            }
145            .into()),
146            btree_map::Entry::Vacant(v) => {
147                v.insert(entry);
148                Ok(())
149            }
150        }
151    }
152
153    fn update(&mut self, location: &Path, v: UpdateVersion, entry: Entry) -> Result<()> {
154        match self.map.get_mut(location) {
155            // Return Precondition instead of NotFound for consistency with stores
156            None => Err(crate::Error::Precondition {
157                path: location.to_string(),
158                source: format!("Object at location {location} not found").into(),
159            }),
160            Some(e) => {
161                let existing = e.e_tag.to_string();
162                let expected = v.e_tag.context(MissingETagSnafu)?;
163                if existing == expected {
164                    *e = entry;
165                    Ok(())
166                } else {
167                    Err(crate::Error::Precondition {
168                        path: location.to_string(),
169                        source: format!("{existing} does not match {expected}").into(),
170                    })
171                }
172            }
173        }
174    }
175
176    fn upload_mut(&mut self, id: &MultipartId) -> Result<&mut PartStorage> {
177        let parts = id
178            .parse()
179            .ok()
180            .and_then(|x| self.uploads.get_mut(&x))
181            .context(UploadNotFoundSnafu { id })?;
182        Ok(parts)
183    }
184
185    fn remove_upload(&mut self, id: &MultipartId) -> Result<PartStorage> {
186        let parts = id
187            .parse()
188            .ok()
189            .and_then(|x| self.uploads.remove(&x))
190            .context(UploadNotFoundSnafu { id })?;
191        Ok(parts)
192    }
193}
194
195impl std::fmt::Display for InMemory {
196    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
197        write!(f, "InMemory")
198    }
199}
200
201#[async_trait]
202impl ObjectStore for InMemory {
203    async fn put_opts(
204        &self,
205        location: &Path,
206        payload: PutPayload,
207        opts: PutOptions,
208    ) -> Result<PutResult> {
209        let mut storage = self.storage.write();
210        let etag = storage.next_etag;
211        let entry = Entry::new(payload.into(), Utc::now(), etag, opts.attributes);
212
213        match opts.mode {
214            PutMode::Overwrite => storage.overwrite(location, entry),
215            PutMode::Create => storage.create(location, entry)?,
216            PutMode::Update(v) => storage.update(location, v, entry)?,
217        }
218        storage.next_etag += 1;
219
220        Ok(PutResult {
221            e_tag: Some(etag.to_string()),
222            version: None,
223        })
224    }
225
226    async fn put_multipart_opts(
227        &self,
228        location: &Path,
229        opts: PutMultipartOpts,
230    ) -> Result<Box<dyn MultipartUpload>> {
231        Ok(Box::new(InMemoryUpload {
232            location: location.clone(),
233            attributes: opts.attributes,
234            parts: vec![],
235            storage: Arc::clone(&self.storage),
236        }))
237    }
238
239    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
240        let entry = self.entry(location).await?;
241        let e_tag = entry.e_tag.to_string();
242
243        let meta = ObjectMeta {
244            location: location.clone(),
245            last_modified: entry.last_modified,
246            size: entry.data.len(),
247            e_tag: Some(e_tag),
248            version: None,
249        };
250        options.check_preconditions(&meta)?;
251
252        let (range, data) = match options.range {
253            Some(range) => {
254                let r = range.as_range(entry.data.len()).context(RangeSnafu)?;
255                (r.clone(), entry.data.slice(r))
256            }
257            None => (0..entry.data.len(), entry.data),
258        };
259        let stream = futures::stream::once(futures::future::ready(Ok(data)));
260
261        Ok(GetResult {
262            payload: GetResultPayload::Stream(stream.boxed()),
263            attributes: entry.attributes,
264            meta,
265            range,
266        })
267    }
268
269    async fn get_ranges(&self, location: &Path, ranges: &[Range<usize>]) -> Result<Vec<Bytes>> {
270        let entry = self.entry(location).await?;
271        ranges
272            .iter()
273            .map(|range| {
274                let r = GetRange::Bounded(range.clone())
275                    .as_range(entry.data.len())
276                    .context(RangeSnafu)?;
277
278                Ok(entry.data.slice(r))
279            })
280            .collect()
281    }
282
283    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
284        let entry = self.entry(location).await?;
285
286        Ok(ObjectMeta {
287            location: location.clone(),
288            last_modified: entry.last_modified,
289            size: entry.data.len(),
290            e_tag: Some(entry.e_tag.to_string()),
291            version: None,
292        })
293    }
294
295    async fn delete(&self, location: &Path) -> Result<()> {
296        self.storage.write().map.remove(location);
297        Ok(())
298    }
299
300    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
301        let root = Path::default();
302        let prefix = prefix.unwrap_or(&root);
303
304        let storage = self.storage.read();
305        let values: Vec<_> = storage
306            .map
307            .range((prefix)..)
308            .take_while(|(key, _)| key.as_ref().starts_with(prefix.as_ref()))
309            .filter(|(key, _)| {
310                // Don't return for exact prefix match
311                key.prefix_match(prefix)
312                    .map(|mut x| x.next().is_some())
313                    .unwrap_or(false)
314            })
315            .map(|(key, value)| {
316                Ok(ObjectMeta {
317                    location: key.clone(),
318                    last_modified: value.last_modified,
319                    size: value.data.len(),
320                    e_tag: Some(value.e_tag.to_string()),
321                    version: None,
322                })
323            })
324            .collect();
325
326        futures::stream::iter(values).boxed()
327    }
328
329    /// The memory implementation returns all results, as opposed to the cloud
330    /// versions which limit their results to 1k or more because of API
331    /// limitations.
332    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
333        let root = Path::default();
334        let prefix = prefix.unwrap_or(&root);
335
336        let mut common_prefixes = BTreeSet::new();
337
338        // Only objects in this base level should be returned in the
339        // response. Otherwise, we just collect the common prefixes.
340        let mut objects = vec![];
341        for (k, v) in self.storage.read().map.range((prefix)..) {
342            if !k.as_ref().starts_with(prefix.as_ref()) {
343                break;
344            }
345
346            let mut parts = match k.prefix_match(prefix) {
347                Some(parts) => parts,
348                None => continue,
349            };
350
351            // Pop first element
352            let common_prefix = match parts.next() {
353                Some(p) => p,
354                // Should only return children of the prefix
355                None => continue,
356            };
357
358            if parts.next().is_some() {
359                common_prefixes.insert(prefix.child(common_prefix));
360            } else {
361                let object = ObjectMeta {
362                    location: k.clone(),
363                    last_modified: v.last_modified,
364                    size: v.data.len(),
365                    e_tag: Some(v.e_tag.to_string()),
366                    version: None,
367                };
368                objects.push(object);
369            }
370        }
371
372        Ok(ListResult {
373            objects,
374            common_prefixes: common_prefixes.into_iter().collect(),
375        })
376    }
377
378    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
379        let entry = self.entry(from).await?;
380        self.storage
381            .write()
382            .insert(to, entry.data, entry.attributes);
383        Ok(())
384    }
385
386    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
387        let entry = self.entry(from).await?;
388        let mut storage = self.storage.write();
389        if storage.map.contains_key(to) {
390            return Err(Error::AlreadyExists {
391                path: to.to_string(),
392            }
393            .into());
394        }
395        storage.insert(to, entry.data, entry.attributes);
396        Ok(())
397    }
398}
399
400#[async_trait]
401impl MultipartStore for InMemory {
402    async fn create_multipart(&self, _path: &Path) -> Result<MultipartId> {
403        let mut storage = self.storage.write();
404        let etag = storage.next_etag;
405        storage.next_etag += 1;
406        storage.uploads.insert(etag, Default::default());
407        Ok(etag.to_string())
408    }
409
410    async fn put_part(
411        &self,
412        _path: &Path,
413        id: &MultipartId,
414        part_idx: usize,
415        payload: PutPayload,
416    ) -> Result<PartId> {
417        let mut storage = self.storage.write();
418        let upload = storage.upload_mut(id)?;
419        if part_idx <= upload.parts.len() {
420            upload.parts.resize(part_idx + 1, None);
421        }
422        upload.parts[part_idx] = Some(payload.into());
423        Ok(PartId {
424            content_id: Default::default(),
425        })
426    }
427
428    async fn complete_multipart(
429        &self,
430        path: &Path,
431        id: &MultipartId,
432        _parts: Vec<PartId>,
433    ) -> Result<PutResult> {
434        let mut storage = self.storage.write();
435        let upload = storage.remove_upload(id)?;
436
437        let mut cap = 0;
438        for (part, x) in upload.parts.iter().enumerate() {
439            cap += x.as_ref().context(MissingPartSnafu { part })?.len();
440        }
441        let mut buf = Vec::with_capacity(cap);
442        for x in &upload.parts {
443            buf.extend_from_slice(x.as_ref().unwrap())
444        }
445        let etag = storage.insert(path, buf.into(), Default::default());
446        Ok(PutResult {
447            e_tag: Some(etag.to_string()),
448            version: None,
449        })
450    }
451
452    async fn abort_multipart(&self, _path: &Path, id: &MultipartId) -> Result<()> {
453        self.storage.write().remove_upload(id)?;
454        Ok(())
455    }
456}
457
458impl InMemory {
459    /// Create new in-memory storage.
460    pub fn new() -> Self {
461        Self::default()
462    }
463
464    /// Creates a fork of the store, with the current content copied into the
465    /// new store.
466    pub fn fork(&self) -> Self {
467        let storage = self.storage.read();
468        let storage = Arc::new(RwLock::new(storage.clone()));
469        Self { storage }
470    }
471
472    /// Creates a clone of the store
473    #[deprecated(note = "Use fork() instead")]
474    pub async fn clone(&self) -> Self {
475        self.fork()
476    }
477
478    async fn entry(&self, location: &Path) -> Result<Entry> {
479        let storage = self.storage.read();
480        let value = storage
481            .map
482            .get(location)
483            .cloned()
484            .context(NoDataInMemorySnafu {
485                path: location.to_string(),
486            })?;
487
488        Ok(value)
489    }
490}
491
/// A multipart upload that buffers its parts locally and writes them to the
/// shared storage on completion.
#[derive(Debug)]
struct InMemoryUpload {
    // Destination of the assembled object
    location: Path,
    // Attributes to store with the object; taken (left default) on completion
    attributes: Attributes,
    // Parts in the order they were submitted
    parts: Vec<PutPayload>,
    // Handle to the storage of the store that created this upload
    storage: Arc<RwLock<Storage>>,
}
499
500#[async_trait]
501impl MultipartUpload for InMemoryUpload {
502    fn put_part(&mut self, payload: PutPayload) -> UploadPart {
503        self.parts.push(payload);
504        Box::pin(futures::future::ready(Ok(())))
505    }
506
507    async fn complete(&mut self) -> Result<PutResult> {
508        let cap = self.parts.iter().map(|x| x.content_length()).sum();
509        let mut buf = Vec::with_capacity(cap);
510        let parts = self.parts.iter().flatten();
511        parts.for_each(|x| buf.extend_from_slice(x));
512        let etag = self.storage.write().insert(
513            &self.location,
514            buf.into(),
515            std::mem::take(&mut self.attributes),
516        );
517
518        Ok(PutResult {
519            e_tag: Some(etag.to_string()),
520            version: None,
521        })
522    }
523
    /// Aborts the upload.
    ///
    /// This is a no-op that always succeeds: it neither touches the shared
    /// storage nor clears the parts buffered in `self`.
    async fn abort(&mut self) -> Result<()> {
        Ok(())
    }
527}
528
#[cfg(test)]
mod tests {
    // Shared test helpers (`put_get_delete_list`, `get_opts`, ...) are pulled
    // in through this glob import.
    use crate::integration::*;

    use super::*;

    /// Runs the shared suites, including the multipart and attribute ones,
    /// against a concrete `InMemory`. All suites share one store instance.
    #[tokio::test]
    async fn in_memory_test() {
        let integration = InMemory::new();

        put_get_delete_list(&integration).await;
        get_opts(&integration).await;
        list_uses_directories_correctly(&integration).await;
        list_with_delimiter(&integration).await;
        rename_and_copy(&integration).await;
        copy_if_not_exists(&integration).await;
        stream_get(&integration).await;
        put_opts(&integration, true).await;
        // The same store is passed for both arguments of the multipart suite
        multipart(&integration, &integration).await;
        put_get_attributes(&integration).await;
    }

    /// Runs the shared suites through a `Box<dyn ObjectStore>`.
    #[tokio::test]
    async fn box_test() {
        let integration: Box<dyn ObjectStore> = Box::new(InMemory::new());

        put_get_delete_list(&integration).await;
        get_opts(&integration).await;
        list_uses_directories_correctly(&integration).await;
        list_with_delimiter(&integration).await;
        rename_and_copy(&integration).await;
        copy_if_not_exists(&integration).await;
        stream_get(&integration).await;
    }

    /// Runs the shared suites through an `Arc<dyn ObjectStore>`.
    #[tokio::test]
    async fn arc_test() {
        let integration: Arc<dyn ObjectStore> = Arc::new(InMemory::new());

        put_get_delete_list(&integration).await;
        get_opts(&integration).await;
        list_uses_directories_correctly(&integration).await;
        list_with_delimiter(&integration).await;
        rename_and_copy(&integration).await;
        copy_if_not_exists(&integration).await;
        stream_get(&integration).await;
    }

    /// Round-trips a small payload through `put` and `get` and checks the
    /// bytes read back equal the bytes written.
    #[tokio::test]
    async fn unknown_length() {
        let integration = InMemory::new();

        let location = Path::from("some_file");

        let data = Bytes::from("arbitrary data");

        integration
            .put(&location, data.clone().into())
            .await
            .unwrap();

        let read_data = integration
            .get(&location)
            .await
            .unwrap()
            .bytes()
            .await
            .unwrap();
        assert_eq!(&*read_data, data);
    }

    // Path used by `nonexistent_location`; nothing is ever written to it
    const NON_EXISTENT_NAME: &str = "nonexistentname";

    /// Reading a path that was never written must surface as
    /// `crate::Error::NotFound` whose source is the module-local
    /// `Error::NoDataInMemory`, carrying the requested path.
    #[tokio::test]
    async fn nonexistent_location() {
        let integration = InMemory::new();

        let location = Path::from(NON_EXISTENT_NAME);

        let err = get_nonexistent_object(&integration, Some(location))
            .await
            .unwrap_err();
        if let crate::Error::NotFound { path, source } = err {
            // Recover the concrete module-local error from the boxed source
            let source_variant = source.downcast_ref::<Error>();
            assert!(
                matches!(source_variant, Some(Error::NoDataInMemory { .. }),),
                "got: {source_variant:?}"
            );
            assert_eq!(path, NON_EXISTENT_NAME);
        } else {
            panic!("unexpected error type: {err:?}");
        }
    }
}