1use std::collections::{BTreeMap, BTreeSet, HashMap};
20use std::ops::Range;
21use std::sync::Arc;
22
23use async_trait::async_trait;
24use bytes::Bytes;
25use chrono::{DateTime, Utc};
26use futures::{stream::BoxStream, StreamExt};
27use parking_lot::RwLock;
28use snafu::{OptionExt, ResultExt, Snafu};
29
30use crate::multipart::{MultipartStore, PartId};
31use crate::util::InvalidGetRange;
32use crate::{
33 path::Path, Attributes, GetRange, GetResult, GetResultPayload, ListResult, MultipartId,
34 MultipartUpload, ObjectMeta, ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutResult,
35 Result, UpdateVersion, UploadPart,
36};
37use crate::{GetOptions, PutPayload};
38
/// Errors raised by the in-memory store implementation in this file.
///
/// These are converted into the crate-level error by the `From<Error>`
/// implementation that follows this enum.
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
enum Error {
    /// No entry is stored under the requested location.
    #[snafu(display("No data in memory found. Location: {path}"))]
    NoDataInMemory { path: String },

    /// A requested byte range could not be resolved against the object's length.
    #[snafu(display("Invalid range: {source}"))]
    Range { source: InvalidGetRange },

    /// A create-only write or copy targeted a location that is already occupied.
    #[snafu(display("Object already exists at that location: {path}"))]
    AlreadyExists { path: String },

    /// A conditional update was requested without an ETag to compare against.
    #[snafu(display("ETag required for conditional update"))]
    MissingETag,

    /// The multipart upload id could not be parsed or is not registered.
    #[snafu(display("MultipartUpload not found: {id}"))]
    UploadNotFound { id: String },

    /// A multipart upload was completed while the part at this index was never written.
    #[snafu(display("Missing part at index: {part}"))]
    MissingPart { part: usize },
}
61
62impl From<Error> for super::Error {
63 fn from(source: Error) -> Self {
64 match source {
65 Error::NoDataInMemory { ref path } => Self::NotFound {
66 path: path.into(),
67 source: source.into(),
68 },
69 Error::AlreadyExists { ref path } => Self::AlreadyExists {
70 path: path.into(),
71 source: source.into(),
72 },
73 _ => Self::Generic {
74 store: "InMemory",
75 source: Box::new(source),
76 },
77 }
78 }
79}
80
/// Object store that keeps every object in process memory.
///
/// All state lives behind a shared, lock-protected `Storage`; multipart
/// uploads created from this store hold a handle to the same storage.
#[derive(Debug, Default)]
pub struct InMemory {
    // Shared handle to the object map and the pending multipart uploads.
    storage: SharedStorage,
}
87
/// A single stored object together with the metadata reported for it.
#[derive(Debug, Clone)]
struct Entry {
    /// Object contents.
    data: Bytes,
    /// Timestamp supplied when the entry was built.
    last_modified: DateTime<Utc>,
    /// Attributes stored alongside the object.
    attributes: Attributes,
    /// Numeric tag taken from the store's counter; stringified when reported to callers.
    e_tag: usize,
}
95
impl Entry {
    /// Builds an entry from its parts.
    ///
    /// Note that the argument order (`e_tag` before `attributes`) differs from
    /// the field declaration order of the struct.
    fn new(
        data: Bytes,
        last_modified: DateTime<Utc>,
        e_tag: usize,
        attributes: Attributes,
    ) -> Self {
        Self {
            data,
            last_modified,
            e_tag,
            attributes,
        }
    }
}
111
/// The mutable state behind an `InMemory` store.
#[derive(Debug, Default, Clone)]
struct Storage {
    /// Counter handing out the next ETag; also used to allocate multipart upload ids.
    next_etag: usize,
    /// Stored objects, ordered by path so listings can scan a key range.
    map: BTreeMap<Path, Entry>,
    /// In-progress multipart uploads, keyed by their numeric upload id.
    uploads: HashMap<usize, PartStorage>,
}
118
/// Parts received so far for one multipart upload.
#[derive(Debug, Default, Clone)]
struct PartStorage {
    /// Part payloads indexed by part number; `None` marks a part not yet written.
    parts: Vec<Option<Bytes>>,
}
123
/// Reference-counted, lock-protected handle to the store state.
type SharedStorage = Arc<RwLock<Storage>>;
125
126impl Storage {
127 fn insert(&mut self, location: &Path, bytes: Bytes, attributes: Attributes) -> usize {
128 let etag = self.next_etag;
129 self.next_etag += 1;
130 let entry = Entry::new(bytes, Utc::now(), etag, attributes);
131 self.overwrite(location, entry);
132 etag
133 }
134
135 fn overwrite(&mut self, location: &Path, entry: Entry) {
136 self.map.insert(location.clone(), entry);
137 }
138
139 fn create(&mut self, location: &Path, entry: Entry) -> Result<()> {
140 use std::collections::btree_map;
141 match self.map.entry(location.clone()) {
142 btree_map::Entry::Occupied(_) => Err(Error::AlreadyExists {
143 path: location.to_string(),
144 }
145 .into()),
146 btree_map::Entry::Vacant(v) => {
147 v.insert(entry);
148 Ok(())
149 }
150 }
151 }
152
153 fn update(&mut self, location: &Path, v: UpdateVersion, entry: Entry) -> Result<()> {
154 match self.map.get_mut(location) {
155 None => Err(crate::Error::Precondition {
157 path: location.to_string(),
158 source: format!("Object at location {location} not found").into(),
159 }),
160 Some(e) => {
161 let existing = e.e_tag.to_string();
162 let expected = v.e_tag.context(MissingETagSnafu)?;
163 if existing == expected {
164 *e = entry;
165 Ok(())
166 } else {
167 Err(crate::Error::Precondition {
168 path: location.to_string(),
169 source: format!("{existing} does not match {expected}").into(),
170 })
171 }
172 }
173 }
174 }
175
176 fn upload_mut(&mut self, id: &MultipartId) -> Result<&mut PartStorage> {
177 let parts = id
178 .parse()
179 .ok()
180 .and_then(|x| self.uploads.get_mut(&x))
181 .context(UploadNotFoundSnafu { id })?;
182 Ok(parts)
183 }
184
185 fn remove_upload(&mut self, id: &MultipartId) -> Result<PartStorage> {
186 let parts = id
187 .parse()
188 .ok()
189 .and_then(|x| self.uploads.remove(&x))
190 .context(UploadNotFoundSnafu { id })?;
191 Ok(parts)
192 }
193}
194
195impl std::fmt::Display for InMemory {
196 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
197 write!(f, "InMemory")
198 }
199}
200
201#[async_trait]
202impl ObjectStore for InMemory {
203 async fn put_opts(
204 &self,
205 location: &Path,
206 payload: PutPayload,
207 opts: PutOptions,
208 ) -> Result<PutResult> {
209 let mut storage = self.storage.write();
210 let etag = storage.next_etag;
211 let entry = Entry::new(payload.into(), Utc::now(), etag, opts.attributes);
212
213 match opts.mode {
214 PutMode::Overwrite => storage.overwrite(location, entry),
215 PutMode::Create => storage.create(location, entry)?,
216 PutMode::Update(v) => storage.update(location, v, entry)?,
217 }
218 storage.next_etag += 1;
219
220 Ok(PutResult {
221 e_tag: Some(etag.to_string()),
222 version: None,
223 })
224 }
225
226 async fn put_multipart_opts(
227 &self,
228 location: &Path,
229 opts: PutMultipartOpts,
230 ) -> Result<Box<dyn MultipartUpload>> {
231 Ok(Box::new(InMemoryUpload {
232 location: location.clone(),
233 attributes: opts.attributes,
234 parts: vec![],
235 storage: Arc::clone(&self.storage),
236 }))
237 }
238
239 async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
240 let entry = self.entry(location).await?;
241 let e_tag = entry.e_tag.to_string();
242
243 let meta = ObjectMeta {
244 location: location.clone(),
245 last_modified: entry.last_modified,
246 size: entry.data.len(),
247 e_tag: Some(e_tag),
248 version: None,
249 };
250 options.check_preconditions(&meta)?;
251
252 let (range, data) = match options.range {
253 Some(range) => {
254 let r = range.as_range(entry.data.len()).context(RangeSnafu)?;
255 (r.clone(), entry.data.slice(r))
256 }
257 None => (0..entry.data.len(), entry.data),
258 };
259 let stream = futures::stream::once(futures::future::ready(Ok(data)));
260
261 Ok(GetResult {
262 payload: GetResultPayload::Stream(stream.boxed()),
263 attributes: entry.attributes,
264 meta,
265 range,
266 })
267 }
268
269 async fn get_ranges(&self, location: &Path, ranges: &[Range<usize>]) -> Result<Vec<Bytes>> {
270 let entry = self.entry(location).await?;
271 ranges
272 .iter()
273 .map(|range| {
274 let r = GetRange::Bounded(range.clone())
275 .as_range(entry.data.len())
276 .context(RangeSnafu)?;
277
278 Ok(entry.data.slice(r))
279 })
280 .collect()
281 }
282
283 async fn head(&self, location: &Path) -> Result<ObjectMeta> {
284 let entry = self.entry(location).await?;
285
286 Ok(ObjectMeta {
287 location: location.clone(),
288 last_modified: entry.last_modified,
289 size: entry.data.len(),
290 e_tag: Some(entry.e_tag.to_string()),
291 version: None,
292 })
293 }
294
295 async fn delete(&self, location: &Path) -> Result<()> {
296 self.storage.write().map.remove(location);
297 Ok(())
298 }
299
300 fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
301 let root = Path::default();
302 let prefix = prefix.unwrap_or(&root);
303
304 let storage = self.storage.read();
305 let values: Vec<_> = storage
306 .map
307 .range((prefix)..)
308 .take_while(|(key, _)| key.as_ref().starts_with(prefix.as_ref()))
309 .filter(|(key, _)| {
310 key.prefix_match(prefix)
312 .map(|mut x| x.next().is_some())
313 .unwrap_or(false)
314 })
315 .map(|(key, value)| {
316 Ok(ObjectMeta {
317 location: key.clone(),
318 last_modified: value.last_modified,
319 size: value.data.len(),
320 e_tag: Some(value.e_tag.to_string()),
321 version: None,
322 })
323 })
324 .collect();
325
326 futures::stream::iter(values).boxed()
327 }
328
329 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
333 let root = Path::default();
334 let prefix = prefix.unwrap_or(&root);
335
336 let mut common_prefixes = BTreeSet::new();
337
338 let mut objects = vec![];
341 for (k, v) in self.storage.read().map.range((prefix)..) {
342 if !k.as_ref().starts_with(prefix.as_ref()) {
343 break;
344 }
345
346 let mut parts = match k.prefix_match(prefix) {
347 Some(parts) => parts,
348 None => continue,
349 };
350
351 let common_prefix = match parts.next() {
353 Some(p) => p,
354 None => continue,
356 };
357
358 if parts.next().is_some() {
359 common_prefixes.insert(prefix.child(common_prefix));
360 } else {
361 let object = ObjectMeta {
362 location: k.clone(),
363 last_modified: v.last_modified,
364 size: v.data.len(),
365 e_tag: Some(v.e_tag.to_string()),
366 version: None,
367 };
368 objects.push(object);
369 }
370 }
371
372 Ok(ListResult {
373 objects,
374 common_prefixes: common_prefixes.into_iter().collect(),
375 })
376 }
377
378 async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
379 let entry = self.entry(from).await?;
380 self.storage
381 .write()
382 .insert(to, entry.data, entry.attributes);
383 Ok(())
384 }
385
386 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
387 let entry = self.entry(from).await?;
388 let mut storage = self.storage.write();
389 if storage.map.contains_key(to) {
390 return Err(Error::AlreadyExists {
391 path: to.to_string(),
392 }
393 .into());
394 }
395 storage.insert(to, entry.data, entry.attributes);
396 Ok(())
397 }
398}
399
400#[async_trait]
401impl MultipartStore for InMemory {
402 async fn create_multipart(&self, _path: &Path) -> Result<MultipartId> {
403 let mut storage = self.storage.write();
404 let etag = storage.next_etag;
405 storage.next_etag += 1;
406 storage.uploads.insert(etag, Default::default());
407 Ok(etag.to_string())
408 }
409
410 async fn put_part(
411 &self,
412 _path: &Path,
413 id: &MultipartId,
414 part_idx: usize,
415 payload: PutPayload,
416 ) -> Result<PartId> {
417 let mut storage = self.storage.write();
418 let upload = storage.upload_mut(id)?;
419 if part_idx <= upload.parts.len() {
420 upload.parts.resize(part_idx + 1, None);
421 }
422 upload.parts[part_idx] = Some(payload.into());
423 Ok(PartId {
424 content_id: Default::default(),
425 })
426 }
427
428 async fn complete_multipart(
429 &self,
430 path: &Path,
431 id: &MultipartId,
432 _parts: Vec<PartId>,
433 ) -> Result<PutResult> {
434 let mut storage = self.storage.write();
435 let upload = storage.remove_upload(id)?;
436
437 let mut cap = 0;
438 for (part, x) in upload.parts.iter().enumerate() {
439 cap += x.as_ref().context(MissingPartSnafu { part })?.len();
440 }
441 let mut buf = Vec::with_capacity(cap);
442 for x in &upload.parts {
443 buf.extend_from_slice(x.as_ref().unwrap())
444 }
445 let etag = storage.insert(path, buf.into(), Default::default());
446 Ok(PutResult {
447 e_tag: Some(etag.to_string()),
448 version: None,
449 })
450 }
451
452 async fn abort_multipart(&self, _path: &Path, id: &MultipartId) -> Result<()> {
453 self.storage.write().remove_upload(id)?;
454 Ok(())
455 }
456}
457
458impl InMemory {
459 pub fn new() -> Self {
461 Self::default()
462 }
463
464 pub fn fork(&self) -> Self {
467 let storage = self.storage.read();
468 let storage = Arc::new(RwLock::new(storage.clone()));
469 Self { storage }
470 }
471
472 #[deprecated(note = "Use fork() instead")]
474 pub async fn clone(&self) -> Self {
475 self.fork()
476 }
477
478 async fn entry(&self, location: &Path) -> Result<Entry> {
479 let storage = self.storage.read();
480 let value = storage
481 .map
482 .get(location)
483 .cloned()
484 .context(NoDataInMemorySnafu {
485 path: location.to_string(),
486 })?;
487
488 Ok(value)
489 }
490}
491
/// Multipart upload handle that buffers its parts until completion.
#[derive(Debug)]
struct InMemoryUpload {
    /// Destination of the finished object.
    location: Path,
    /// Attributes to store with the finished object.
    attributes: Attributes,
    /// Payloads received so far, in the order they were pushed.
    parts: Vec<PutPayload>,
    /// Storage the finished object is written into.
    storage: Arc<RwLock<Storage>>,
}
499
500#[async_trait]
501impl MultipartUpload for InMemoryUpload {
502 fn put_part(&mut self, payload: PutPayload) -> UploadPart {
503 self.parts.push(payload);
504 Box::pin(futures::future::ready(Ok(())))
505 }
506
507 async fn complete(&mut self) -> Result<PutResult> {
508 let cap = self.parts.iter().map(|x| x.content_length()).sum();
509 let mut buf = Vec::with_capacity(cap);
510 let parts = self.parts.iter().flatten();
511 parts.for_each(|x| buf.extend_from_slice(x));
512 let etag = self.storage.write().insert(
513 &self.location,
514 buf.into(),
515 std::mem::take(&mut self.attributes),
516 );
517
518 Ok(PutResult {
519 e_tag: Some(etag.to_string()),
520 version: None,
521 })
522 }
523
524 async fn abort(&mut self) -> Result<()> {
525 Ok(())
526 }
527}
528
#[cfg(test)]
mod tests {
    use crate::integration::*;

    use super::*;

    // Runs the shared integration helpers against a plain `InMemory` store.
    // All helpers operate on the same store instance, in this order.
    #[tokio::test]
    async fn in_memory_test() {
        let integration = InMemory::new();

        put_get_delete_list(&integration).await;
        get_opts(&integration).await;
        list_uses_directories_correctly(&integration).await;
        list_with_delimiter(&integration).await;
        rename_and_copy(&integration).await;
        copy_if_not_exists(&integration).await;
        stream_get(&integration).await;
        put_opts(&integration, true).await;
        multipart(&integration, &integration).await;
        put_get_attributes(&integration).await;
    }

    // Same helpers, but going through a boxed trait object.
    #[tokio::test]
    async fn box_test() {
        let integration: Box<dyn ObjectStore> = Box::new(InMemory::new());

        put_get_delete_list(&integration).await;
        get_opts(&integration).await;
        list_uses_directories_correctly(&integration).await;
        list_with_delimiter(&integration).await;
        rename_and_copy(&integration).await;
        copy_if_not_exists(&integration).await;
        stream_get(&integration).await;
    }

    // Same helpers, but going through a reference-counted trait object.
    #[tokio::test]
    async fn arc_test() {
        let integration: Arc<dyn ObjectStore> = Arc::new(InMemory::new());

        put_get_delete_list(&integration).await;
        get_opts(&integration).await;
        list_uses_directories_correctly(&integration).await;
        list_with_delimiter(&integration).await;
        rename_and_copy(&integration).await;
        copy_if_not_exists(&integration).await;
        stream_get(&integration).await;
    }

    // Round-trips a small payload through `put` and `get`.
    #[tokio::test]
    async fn unknown_length() {
        let integration = InMemory::new();

        let location = Path::from("some_file");

        let data = Bytes::from("arbitrary data");

        integration
            .put(&location, data.clone().into())
            .await
            .unwrap();

        let read_data = integration
            .get(&location)
            .await
            .unwrap()
            .bytes()
            .await
            .unwrap();
        assert_eq!(&*read_data, data);
    }

    // Location that is never written by the test below.
    const NON_EXISTENT_NAME: &str = "nonexistentname";

    // Reading a missing object must surface as the crate-level `NotFound`
    // error whose source is this module's `NoDataInMemory` variant.
    #[tokio::test]
    async fn nonexistent_location() {
        let integration = InMemory::new();

        let location = Path::from(NON_EXISTENT_NAME);

        let err = get_nonexistent_object(&integration, Some(location))
            .await
            .unwrap_err();
        if let crate::Error::NotFound { path, source } = err {
            let source_variant = source.downcast_ref::<Error>();
            assert!(
                matches!(source_variant, Some(Error::NoDataInMemory { .. }),),
                "got: {source_variant:?}"
            );
            assert_eq!(path, NON_EXISTENT_NAME);
        } else {
            panic!("unexpected error type: {err:?}");
        }
    }
}