object_store/
lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
19#![warn(
20    missing_copy_implementations,
21    missing_debug_implementations,
22    missing_docs,
23    clippy::explicit_iter_loop,
24    clippy::future_not_send,
25    clippy::use_self,
26    clippy::clone_on_ref_ptr
27)]
28
29//! # object_store
30//!
31//! This crate provides a uniform API for interacting with object
32//! storage services and local files via the [`ObjectStore`]
33//! trait.
34//!
35//! Using this crate, the same binary and code can run in multiple
36//! clouds and local test environments, via a simple runtime
37//! configuration change.
38//!
39//! # Highlights
40//!
41//! 1. A high-performance async API focused on providing a consistent interface
42//! mirroring that of object stores such as [S3]
43//!
44//! 2. Production quality, leading this crate to be used in large
45//! scale production systems, such as [crates.io] and [InfluxDB IOx]
46//!
47//! 3. Support for advanced functionality, including atomic, conditional reads
48//! and writes, vectored IO, bulk deletion, and more...
49//!
50//! 4. Stable and predictable governance via the [Apache Arrow] project
51//!
52//! 5. Small dependency footprint, depending on only a small number of common crates
53//!
54//! Originally developed by [InfluxData] and subsequently donated
55//! to [Apache Arrow].
56//!
57//! [Apache Arrow]: https://arrow.apache.org/
58//! [InfluxData]: https://www.influxdata.com/
//! [crates.io]: https://github.com/rust-lang/crates.io
//! [InfluxDB IOx]: https://github.com/influxdata/influxdb_iox/
60//! [ACID]: https://en.wikipedia.org/wiki/ACID
61//! [S3]: https://aws.amazon.com/s3/
62//!
63//! # Available [`ObjectStore`] Implementations
64//!
65//! By default, this crate provides the following implementations:
66//!
67//! * Memory: [`InMemory`](memory::InMemory)
68//! * Local filesystem: [`LocalFileSystem`](local::LocalFileSystem)
69//!
70//! Feature flags are used to enable support for other implementations:
71//!
72#![cfg_attr(
73    feature = "gcp",
74    doc = "* [`gcp`]: [Google Cloud Storage](https://cloud.google.com/storage/) support. See [`GoogleCloudStorageBuilder`](gcp::GoogleCloudStorageBuilder)"
75)]
76#![cfg_attr(
77    feature = "aws",
78    doc = "* [`aws`]: [Amazon S3](https://aws.amazon.com/s3/). See [`AmazonS3Builder`](aws::AmazonS3Builder)"
79)]
80#![cfg_attr(
81    feature = "azure",
82    doc = "* [`azure`]: [Azure Blob Storage](https://azure.microsoft.com/en-gb/services/storage/blobs/). See [`MicrosoftAzureBuilder`](azure::MicrosoftAzureBuilder)"
83)]
84#![cfg_attr(
85    feature = "http",
86    doc = "* [`http`]: [HTTP/WebDAV Storage](https://datatracker.ietf.org/doc/html/rfc2518). See [`HttpBuilder`](http::HttpBuilder)"
87)]
88//!
89//! # Why not a Filesystem Interface?
90//!
91//! The [`ObjectStore`] interface is designed to mirror the APIs
92//! of object stores and *not* filesystems, and thus has stateless APIs instead
93//! of cursor based interfaces such as [`Read`] or [`Seek`] available in filesystems.
94//!
95//! This design provides the following advantages:
96//!
97//! * All operations are atomic, and readers cannot observe partial and/or failed writes
98//! * Methods map directly to object store APIs, providing both efficiency and predictability
99//! * Abstracts away filesystem and operating system specific quirks, ensuring portability
100//! * Allows for functionality not native to filesystems, such as operation preconditions
101//! and atomic multipart uploads
102//!
//! This crate does provide [`BufReader`] and [`BufWriter`] adapters
//! which provide a more filesystem-like API for working with the
//! [`ObjectStore`] trait; however, they should be used with care.
106//!
107//! [`BufReader`]: buffered::BufReader
108//! [`BufWriter`]: buffered::BufWriter
109//!
110//! # Adapters
111//!
112//! [`ObjectStore`] instances can be composed with various adapters
113//! which add additional functionality:
114//!
115//! * Rate Throttling: [`ThrottleConfig`](throttle::ThrottleConfig)
116//! * Concurrent Request Limit: [`LimitStore`](limit::LimitStore)
117//!
118//! # Configuration System
119//!
//! This crate provides a configuration system inspired by the APIs exposed by [fsspec],
//! [PyArrow FileSystem], and [Hadoop FileSystem], allowing a [`DynObjectStore`] to be created
//! from a URL and an optional list of key-value pairs. This provides a flexible interface
//! to support a wide variety of user-defined store configurations, with minimal additional
//! application complexity.
125//!
126//! ```no_run
127//! # #[cfg(feature = "aws")] {
128//! # use url::Url;
129//! # use object_store::{parse_url, parse_url_opts};
130//! # use object_store::aws::{AmazonS3, AmazonS3Builder};
131//! #
132//! #
133//! // Can manually create a specific store variant using the appropriate builder
134//! let store: AmazonS3 = AmazonS3Builder::from_env()
135//!     .with_bucket_name("my-bucket").build().unwrap();
136//!
137//! // Alternatively can create an ObjectStore from an S3 URL
138//! let url = Url::parse("s3://bucket/path").unwrap();
139//! let (store, path) = parse_url(&url).unwrap();
140//! assert_eq!(path.as_ref(), "path");
141//!
142//! // Potentially with additional options
143//! let (store, path) = parse_url_opts(&url, vec![("aws_access_key_id", "...")]).unwrap();
144//!
145//! // Or with URLs that encode the bucket name in the URL path
146//! let url = Url::parse("https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket/path").unwrap();
147//! let (store, path) = parse_url(&url).unwrap();
148//! assert_eq!(path.as_ref(), "path");
149//! # }
150//! ```
151//!
152//! [PyArrow FileSystem]: https://arrow.apache.org/docs/python/generated/pyarrow.fs.FileSystem.html#pyarrow.fs.FileSystem.from_uri
153//! [fsspec]: https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.filesystem
154//! [Hadoop FileSystem]: https://hadoop.apache.org/docs/r3.0.0/api/org/apache/hadoop/fs/FileSystem.html#get-java.net.URI-org.apache.hadoop.conf.Configuration-
155//!
156//! # List objects
157//!
158//! Use the [`ObjectStore::list`] method to iterate over objects in
159//! remote storage or files in the local filesystem:
160//!
161//! ```
162//! # use object_store::local::LocalFileSystem;
163//! # use std::sync::Arc;
164//! # use object_store::{path::Path, ObjectStore};
165//! # use futures::stream::StreamExt;
166//! # // use LocalFileSystem for example
167//! # fn get_object_store() -> Arc<dyn ObjectStore> {
168//! #   Arc::new(LocalFileSystem::new())
169//! # }
170//! #
171//! # async fn example() {
172//! #
173//! // create an ObjectStore
174//! let object_store: Arc<dyn ObjectStore> = get_object_store();
175//!
176//! // Recursively list all files below the 'data' path.
177//! // 1. On AWS S3 this would be the 'data/' prefix
178//! // 2. On a local filesystem, this would be the 'data' directory
179//! let prefix = Path::from("data");
180//!
181//! // Get an `async` stream of Metadata objects:
182//! let mut list_stream = object_store.list(Some(&prefix));
183//!
184//! // Print a line about each object
185//! while let Some(meta) = list_stream.next().await.transpose().unwrap() {
186//!     println!("Name: {}, size: {}", meta.location, meta.size);
187//! }
188//! # }
189//! ```
190//!
//! This will print something like the following:
192//!
193//! ```text
194//! Name: data/file01.parquet, size: 112832
195//! Name: data/file02.parquet, size: 143119
196//! Name: data/child/file03.parquet, size: 100
197//! ...
198//! ```
199//!
200//! # Fetch objects
201//!
202//! Use the [`ObjectStore::get`] method to fetch the data bytes
203//! from remote storage or files in the local filesystem as a stream.
204//!
205//! ```
206//! # use futures::TryStreamExt;
207//! # use object_store::local::LocalFileSystem;
208//! # use std::sync::Arc;
209//! #  use bytes::Bytes;
210//! # use object_store::{path::Path, ObjectStore, GetResult};
211//! # fn get_object_store() -> Arc<dyn ObjectStore> {
212//! #   Arc::new(LocalFileSystem::new())
213//! # }
214//! #
215//! # async fn example() {
216//! #
217//! // Create an ObjectStore
218//! let object_store: Arc<dyn ObjectStore> = get_object_store();
219//!
220//! // Retrieve a specific file
221//! let path = Path::from("data/file01.parquet");
222//!
223//! // Fetch just the file metadata
224//! let meta = object_store.head(&path).await.unwrap();
225//! println!("{meta:?}");
226//!
227//! // Fetch the object including metadata
228//! let result: GetResult = object_store.get(&path).await.unwrap();
229//! assert_eq!(result.meta, meta);
230//!
231//! // Buffer the entire object in memory
232//! let object: Bytes = result.bytes().await.unwrap();
233//! assert_eq!(object.len(), meta.size);
234//!
235//! // Alternatively stream the bytes from object storage
236//! let stream = object_store.get(&path).await.unwrap().into_stream();
237//!
238//! // Count the '0's using `try_fold` from `TryStreamExt` trait
239//! let num_zeros = stream
240//!     .try_fold(0, |acc, bytes| async move {
241//!         Ok(acc + bytes.iter().filter(|b| **b == 0).count())
242//!     }).await.unwrap();
243//!
244//! println!("Num zeros in {} is {}", path, num_zeros);
245//! # }
246//! ```
247//!
248//! # Put Object
249//!
250//! Use the [`ObjectStore::put`] method to atomically write data.
251//!
252//! ```
253//! # use object_store::local::LocalFileSystem;
254//! # use object_store::{ObjectStore, PutPayload};
255//! # use std::sync::Arc;
256//! # use object_store::path::Path;
257//! # fn get_object_store() -> Arc<dyn ObjectStore> {
258//! #   Arc::new(LocalFileSystem::new())
259//! # }
260//! # async fn put() {
261//! #
262//! let object_store: Arc<dyn ObjectStore> = get_object_store();
263//! let path = Path::from("data/file1");
264//! let payload = PutPayload::from_static(b"hello");
265//! object_store.put(&path, payload).await.unwrap();
266//! # }
267//! ```
268//!
269//! # Multipart Upload
270//!
//! Use the [`ObjectStore::put_multipart`] method to atomically write a large amount of data.
272//!
273//! ```
274//! # use object_store::local::LocalFileSystem;
275//! # use object_store::{ObjectStore, WriteMultipart};
276//! # use std::sync::Arc;
277//! # use bytes::Bytes;
278//! # use tokio::io::AsyncWriteExt;
279//! # use object_store::path::Path;
280//! # fn get_object_store() -> Arc<dyn ObjectStore> {
281//! #   Arc::new(LocalFileSystem::new())
282//! # }
283//! # async fn multi_upload() {
284//! #
285//! let object_store: Arc<dyn ObjectStore> = get_object_store();
286//! let path = Path::from("data/large_file");
287//! let upload =  object_store.put_multipart(&path).await.unwrap();
288//! let mut write = WriteMultipart::new(upload);
289//! write.write(b"hello");
290//! write.finish().await.unwrap();
291//! # }
292//! ```
293//!
294//! # Vectored Read
295//!
//! A common pattern, especially when reading structured datasets, is the need to fetch
//! multiple, potentially non-contiguous, ranges of a particular object.
298//!
299//! [`ObjectStore::get_ranges`] provides an efficient way to perform such vectored IO, and will
300//! automatically coalesce adjacent ranges into an appropriate number of parallel requests.
301//!
302//! ```
303//! # use object_store::local::LocalFileSystem;
304//! # use object_store::ObjectStore;
305//! # use std::sync::Arc;
306//! # use bytes::Bytes;
307//! # use tokio::io::AsyncWriteExt;
308//! # use object_store::path::Path;
309//! # fn get_object_store() -> Arc<dyn ObjectStore> {
310//! #   Arc::new(LocalFileSystem::new())
311//! # }
312//! # async fn multi_upload() {
313//! #
314//! let object_store: Arc<dyn ObjectStore> = get_object_store();
315//! let path = Path::from("data/large_file");
316//! let ranges = object_store.get_ranges(&path, &[90..100, 400..600, 0..10]).await.unwrap();
317//! assert_eq!(ranges.len(), 3);
318//! assert_eq!(ranges[0].len(), 10);
319//! # }
320//! ```
321//!
322//! # Vectored Write
323//!
324//! When writing data it is often the case that the size of the output is not known ahead of time.
325//!
326//! A common approach to handling this is to bump-allocate a `Vec`, whereby the underlying
327//! allocation is repeatedly reallocated, each time doubling the capacity. The performance of
328//! this is suboptimal as reallocating memory will often involve copying it to a new location.
329//!
330//! Fortunately, as [`PutPayload`] does not require memory regions to be contiguous, it is
331//! possible to instead allocate memory in chunks and avoid bump allocating. [`PutPayloadMut`]
//! encapsulates this approach.
333//!
334//! ```
335//! # use object_store::local::LocalFileSystem;
336//! # use object_store::{ObjectStore, PutPayloadMut};
337//! # use std::sync::Arc;
338//! # use bytes::Bytes;
339//! # use tokio::io::AsyncWriteExt;
340//! # use object_store::path::Path;
341//! # fn get_object_store() -> Arc<dyn ObjectStore> {
342//! #   Arc::new(LocalFileSystem::new())
343//! # }
344//! # async fn multi_upload() {
345//! #
346//! let object_store: Arc<dyn ObjectStore> = get_object_store();
347//! let path = Path::from("data/large_file");
348//! let mut buffer = PutPayloadMut::new().with_block_size(8192);
349//! for _ in 0..22 {
350//!     buffer.extend_from_slice(&[0; 1024]);
351//! }
352//! let payload = buffer.freeze();
353//!
354//! // Payload consists of 3 separate 8KB allocations
355//! assert_eq!(payload.as_ref().len(), 3);
356//! assert_eq!(payload.as_ref()[0].len(), 8192);
357//! assert_eq!(payload.as_ref()[1].len(), 8192);
358//! assert_eq!(payload.as_ref()[2].len(), 6144);
359//!
360//! object_store.put(&path, payload).await.unwrap();
361//! # }
362//! ```
363//!
364//! # Conditional Fetch
365//!
366//! More complex object retrieval can be supported by [`ObjectStore::get_opts`].
367//!
//! For example, this can be used to efficiently refresh a cache without re-fetching the
//! entire object if it hasn't been modified.
370//!
371//! ```
372//! # use std::collections::btree_map::Entry;
373//! # use std::collections::HashMap;
374//! # use object_store::{GetOptions, GetResult, ObjectStore, Result, Error};
375//! # use std::sync::Arc;
376//! # use std::time::{Duration, Instant};
377//! # use bytes::Bytes;
378//! # use tokio::io::AsyncWriteExt;
379//! # use object_store::path::Path;
380//! struct CacheEntry {
381//!     /// Data returned by last request
382//!     data: Bytes,
383//!     /// ETag identifying the object returned by the server
384//!     e_tag: String,
385//!     /// Instant of last refresh
386//!     refreshed_at: Instant,
387//! }
388//!
389//! /// Example cache that checks entries after 10 seconds for a new version
390//! struct Cache {
391//!     entries: HashMap<Path, CacheEntry>,
392//!     store: Arc<dyn ObjectStore>,
393//! }
394//!
395//! impl Cache {
396//!     pub async fn get(&mut self, path: &Path) -> Result<Bytes> {
397//!         Ok(match self.entries.get_mut(path) {
398//!             Some(e) => match e.refreshed_at.elapsed() < Duration::from_secs(10) {
399//!                 true => e.data.clone(), // Return cached data
400//!                 false => { // Check if remote version has changed
401//!                     let opts = GetOptions {
402//!                         if_none_match: Some(e.e_tag.clone()),
403//!                         ..GetOptions::default()
404//!                     };
405//!                     match self.store.get_opts(&path, opts).await {
406//!                         Ok(d) => e.data = d.bytes().await?,
407//!                         Err(Error::NotModified { .. }) => {} // Data has not changed
408//!                         Err(e) => return Err(e),
409//!                     };
410//!                     e.refreshed_at = Instant::now();
411//!                     e.data.clone()
412//!                 }
413//!             },
414//!             None => { // Not cached, fetch data
415//!                 let get = self.store.get(&path).await?;
416//!                 let e_tag = get.meta.e_tag.clone();
417//!                 let data = get.bytes().await?;
418//!                 if let Some(e_tag) = e_tag {
419//!                     let entry = CacheEntry {
420//!                         e_tag,
421//!                         data: data.clone(),
422//!                         refreshed_at: Instant::now(),
423//!                     };
424//!                     self.entries.insert(path.clone(), entry);
425//!                 }
426//!                 data
427//!             }
428//!         })
429//!     }
430//! }
431//! ```
432//!
433//! # Conditional Put
434//!
435//! The default behaviour when writing data is to upsert any existing object at the given path,
436//! overwriting any previous value. More complex behaviours can be achieved using [`PutMode`], and
437//! can be used to build [Optimistic Concurrency Control] based transactions. This facilitates
438//! building metadata catalogs, such as [Apache Iceberg] or [Delta Lake], directly on top of object
439//! storage, without relying on a separate DBMS.
440//!
441//! ```
442//! # use object_store::{Error, ObjectStore, PutMode, UpdateVersion};
443//! # use std::sync::Arc;
444//! # use bytes::Bytes;
445//! # use tokio::io::AsyncWriteExt;
446//! # use object_store::memory::InMemory;
447//! # use object_store::path::Path;
448//! # fn get_object_store() -> Arc<dyn ObjectStore> {
449//! #   Arc::new(InMemory::new())
450//! # }
451//! # fn do_update(b: Bytes) -> Bytes {b}
452//! # async fn conditional_put() {
453//! let store = get_object_store();
454//! let path = Path::from("test");
455//!
456//! // Perform a conditional update on path
457//! loop {
458//!     // Perform get request
459//!     let r = store.get(&path).await.unwrap();
460//!
461//!     // Save version information fetched
462//!     let version = UpdateVersion {
463//!         e_tag: r.meta.e_tag.clone(),
464//!         version: r.meta.version.clone(),
465//!     };
466//!
467//!     // Compute new version of object contents
468//!     let new = do_update(r.bytes().await.unwrap());
469//!
470//!     // Attempt to commit transaction
471//!     match store.put_opts(&path, new.into(), PutMode::Update(version).into()).await {
472//!         Ok(_) => break, // Successfully committed
473//!         Err(Error::Precondition { .. }) => continue, // Object has changed, try again
474//!         Err(e) => panic!("{e}")
475//!     }
476//! }
477//! # }
478//! ```
479//!
480//! [Optimistic Concurrency Control]: https://en.wikipedia.org/wiki/Optimistic_concurrency_control
481//! [Apache Iceberg]: https://iceberg.apache.org/
482//! [Delta Lake]: https://delta.io/
483//!
484//! # TLS Certificates
485//!
486//! Stores that use HTTPS/TLS (this is true for most cloud stores) can choose the source of their [CA]
487//! certificates. By default the system-bundled certificates are used (see
488//! [`rustls-native-certs`]). The `tls-webpki-roots` feature switch can be used to also bundle Mozilla's
489//! root certificates with the library/application (see [`webpki-roots`]).
490//!
491//! [CA]: https://en.wikipedia.org/wiki/Certificate_authority
492//! [`rustls-native-certs`]: https://crates.io/crates/rustls-native-certs/
493//! [`webpki-roots`]: https://crates.io/crates/webpki-roots
494//!
495
// Fail the build when targeting wasm32 with any of the `gcp`, `aws`, `azure` or `http`
// store features enabled: those stores are not supported on wasm, so report it with a
// clear compile-time message instead of attempting to build them.
#[cfg(all(
    target_arch = "wasm32",
    any(feature = "gcp", feature = "aws", feature = "azure", feature = "http")
))]
compile_error!("Features 'gcp', 'aws', 'azure', 'http' are not supported on wasm.");
501
502#[cfg(feature = "aws")]
503pub mod aws;
504#[cfg(feature = "azure")]
505pub mod azure;
506pub mod buffered;
507#[cfg(not(target_arch = "wasm32"))]
508pub mod chunked;
509pub mod delimited;
510#[cfg(feature = "gcp")]
511pub mod gcp;
512#[cfg(feature = "http")]
513pub mod http;
514pub mod limit;
515#[cfg(not(target_arch = "wasm32"))]
516pub mod local;
517pub mod memory;
518pub mod path;
519pub mod prefix;
520#[cfg(feature = "cloud")]
521pub mod signer;
522pub mod throttle;
523
524#[cfg(feature = "cloud")]
525mod client;
526
527#[cfg(feature = "cloud")]
528pub use client::{
529    backoff::BackoffConfig, retry::RetryConfig, ClientConfigKey, ClientOptions, CredentialProvider,
530    StaticCredentialProvider,
531};
532
533#[cfg(feature = "cloud")]
534mod config;
535
536mod tags;
537
538pub use tags::TagSet;
539
540pub mod multipart;
541mod parse;
542mod payload;
543mod upload;
544mod util;
545
546mod attributes;
547
548#[cfg(any(feature = "integration", test))]
549pub mod integration;
550
551pub use attributes::*;
552
553pub use parse::{parse_url, parse_url_opts, ObjectStoreScheme};
554pub use payload::*;
555pub use upload::*;
556pub use util::{coalesce_ranges, collect_bytes, GetRange, OBJECT_STORE_COALESCE_DEFAULT};
557
558use crate::path::Path;
559#[cfg(not(target_arch = "wasm32"))]
560use crate::util::maybe_spawn_blocking;
561use async_trait::async_trait;
562use bytes::Bytes;
563use chrono::{DateTime, Utc};
564use futures::{stream::BoxStream, StreamExt, TryStreamExt};
565use snafu::Snafu;
566use std::fmt::{Debug, Formatter};
567#[cfg(not(target_arch = "wasm32"))]
568use std::io::{Read, Seek, SeekFrom};
569use std::ops::Range;
570use std::sync::Arc;
571
/// An alias for a dynamically dispatched object store implementation.
///
/// As a trait object this type is unsized, so it is used behind a pointer such as
/// `Arc<DynObjectStore>` or `Box<DynObjectStore>`.
pub type DynObjectStore = dyn ObjectStore;

/// Id type for multipart uploads.
///
/// This is an alias for a plain `String`.
pub type MultipartId = String;
577
578/// Universal API to multiple object store services.
579#[async_trait]
580pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
581    /// Save the provided bytes to the specified location
582    ///
583    /// The operation is guaranteed to be atomic, it will either successfully
584    /// write the entirety of `payload` to `location`, or fail. No clients
585    /// should be able to observe a partially written object
586    async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
587        self.put_opts(location, payload, PutOptions::default())
588            .await
589    }
590
591    /// Save the provided `payload` to `location` with the given options
592    async fn put_opts(
593        &self,
594        location: &Path,
595        payload: PutPayload,
596        opts: PutOptions,
597    ) -> Result<PutResult>;
598
599    /// Perform a multipart upload
600    ///
601    /// Client should prefer [`ObjectStore::put`] for small payloads, as streaming uploads
602    /// typically require multiple separate requests. See [`MultipartUpload`] for more information
603    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
604        self.put_multipart_opts(location, PutMultipartOpts::default())
605            .await
606    }
607
608    /// Perform a multipart upload with options
609    ///
610    /// Client should prefer [`ObjectStore::put`] for small payloads, as streaming uploads
611    /// typically require multiple separate requests. See [`MultipartUpload`] for more information
612    async fn put_multipart_opts(
613        &self,
614        location: &Path,
615        opts: PutMultipartOpts,
616    ) -> Result<Box<dyn MultipartUpload>>;
617
618    /// Return the bytes that are stored at the specified location.
619    async fn get(&self, location: &Path) -> Result<GetResult> {
620        self.get_opts(location, GetOptions::default()).await
621    }
622
623    /// Perform a get request with options
624    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult>;
625
626    /// Return the bytes that are stored at the specified location
627    /// in the given byte range.
628    ///
629    /// See [`GetRange::Bounded`] for more details on how `range` gets interpreted
630    async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
631        let options = GetOptions {
632            range: Some(range.into()),
633            ..Default::default()
634        };
635        self.get_opts(location, options).await?.bytes().await
636    }
637
638    /// Return the bytes that are stored at the specified location
639    /// in the given byte ranges
640    async fn get_ranges(&self, location: &Path, ranges: &[Range<usize>]) -> Result<Vec<Bytes>> {
641        coalesce_ranges(
642            ranges,
643            |range| self.get_range(location, range),
644            OBJECT_STORE_COALESCE_DEFAULT,
645        )
646        .await
647    }
648
649    /// Return the metadata for the specified location
650    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
651        let options = GetOptions {
652            head: true,
653            ..Default::default()
654        };
655        Ok(self.get_opts(location, options).await?.meta)
656    }
657
658    /// Delete the object at the specified location.
659    async fn delete(&self, location: &Path) -> Result<()>;
660
661    /// Delete all the objects at the specified locations
662    ///
663    /// When supported, this method will use bulk operations that delete more
664    /// than one object per a request. The default implementation will call
665    /// the single object delete method for each location, but with up to 10
666    /// concurrent requests.
667    ///
668    /// The returned stream yields the results of the delete operations in the
669    /// same order as the input locations. However, some errors will be from
670    /// an overall call to a bulk delete operation, and not from a specific
671    /// location.
672    ///
673    /// If the object did not exist, the result may be an error or a success,
674    /// depending on the behavior of the underlying store. For example, local
675    /// filesystems, GCP, and Azure return an error, while S3 and in-memory will
676    /// return Ok. If it is an error, it will be [`Error::NotFound`].
677    ///
678    /// ```
679    /// # use futures::{StreamExt, TryStreamExt};
680    /// # use object_store::local::LocalFileSystem;
681    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
682    /// # let root = tempfile::TempDir::new().unwrap();
683    /// # let store = LocalFileSystem::new_with_prefix(root.path()).unwrap();
684    /// # use object_store::{ObjectStore, ObjectMeta};
685    /// # use object_store::path::Path;
686    /// # use futures::{StreamExt, TryStreamExt};
687    /// #
688    /// // Create two objects
689    /// store.put(&Path::from("foo"), "foo".into()).await?;
690    /// store.put(&Path::from("bar"), "bar".into()).await?;
691    ///
692    /// // List object
693    /// let locations = store.list(None).map_ok(|m| m.location).boxed();
694    ///
695    /// // Delete them
696    /// store.delete_stream(locations).try_collect::<Vec<Path>>().await?;
697    /// # Ok(())
698    /// # }
699    /// # let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
700    /// # rt.block_on(example()).unwrap();
701    /// ```
702    fn delete_stream<'a>(
703        &'a self,
704        locations: BoxStream<'a, Result<Path>>,
705    ) -> BoxStream<'a, Result<Path>> {
706        locations
707            .map(|location| async {
708                let location = location?;
709                self.delete(&location).await?;
710                Ok(location)
711            })
712            .buffered(10)
713            .boxed()
714    }
715
716    /// List all the objects with the given prefix.
717    ///
718    /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of
719    /// `foo/bar_baz/x`. List is recursive, i.e. `foo/bar/more/x` will be included.
720    ///
721    /// Note: the order of returned [`ObjectMeta`] is not guaranteed
722    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>>;
723
724    /// List all the objects with the given prefix and a location greater than `offset`
725    ///
726    /// Some stores, such as S3 and GCS, may be able to push `offset` down to reduce
727    /// the number of network requests required
728    ///
729    /// Note: the order of returned [`ObjectMeta`] is not guaranteed
730    fn list_with_offset(
731        &self,
732        prefix: Option<&Path>,
733        offset: &Path,
734    ) -> BoxStream<'_, Result<ObjectMeta>> {
735        let offset = offset.clone();
736        self.list(prefix)
737            .try_filter(move |f| futures::future::ready(f.location > offset))
738            .boxed()
739    }
740
741    /// List objects with the given prefix and an implementation specific
742    /// delimiter. Returns common prefixes (directories) in addition to object
743    /// metadata.
744    ///
745    /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of
746    /// `foo/bar_baz/x`. List is not recursive, i.e. `foo/bar/more/x` will not be included.
747    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult>;
748
749    /// Copy an object from one path to another in the same object store.
750    ///
751    /// If there exists an object at the destination, it will be overwritten.
752    async fn copy(&self, from: &Path, to: &Path) -> Result<()>;
753
754    /// Move an object from one path to another in the same object store.
755    ///
756    /// By default, this is implemented as a copy and then delete source. It may not
757    /// check when deleting source that it was the same object that was originally copied.
758    ///
759    /// If there exists an object at the destination, it will be overwritten.
760    async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
761        self.copy(from, to).await?;
762        self.delete(from).await
763    }
764
765    /// Copy an object from one path to another, only if destination is empty.
766    ///
767    /// Will return an error if the destination already has an object.
768    ///
769    /// Performs an atomic operation if the underlying object storage supports it.
770    /// If atomic operations are not supported by the underlying object storage (like S3)
771    /// it will return an error.
772    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()>;
773
774    /// Move an object from one path to another in the same object store.
775    ///
776    /// Will return an error if the destination already has an object.
777    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
778        self.copy_if_not_exists(from, to).await?;
779        self.delete(from).await
780    }
781}
782
783macro_rules! as_ref_impl {
784    ($type:ty) => {
785        #[async_trait]
786        impl ObjectStore for $type {
787            async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
788                self.as_ref().put(location, payload).await
789            }
790
791            async fn put_opts(
792                &self,
793                location: &Path,
794                payload: PutPayload,
795                opts: PutOptions,
796            ) -> Result<PutResult> {
797                self.as_ref().put_opts(location, payload, opts).await
798            }
799
800            async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
801                self.as_ref().put_multipart(location).await
802            }
803
804            async fn put_multipart_opts(
805                &self,
806                location: &Path,
807                opts: PutMultipartOpts,
808            ) -> Result<Box<dyn MultipartUpload>> {
809                self.as_ref().put_multipart_opts(location, opts).await
810            }
811
812            async fn get(&self, location: &Path) -> Result<GetResult> {
813                self.as_ref().get(location).await
814            }
815
816            async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
817                self.as_ref().get_opts(location, options).await
818            }
819
820            async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
821                self.as_ref().get_range(location, range).await
822            }
823
824            async fn get_ranges(
825                &self,
826                location: &Path,
827                ranges: &[Range<usize>],
828            ) -> Result<Vec<Bytes>> {
829                self.as_ref().get_ranges(location, ranges).await
830            }
831
832            async fn head(&self, location: &Path) -> Result<ObjectMeta> {
833                self.as_ref().head(location).await
834            }
835
836            async fn delete(&self, location: &Path) -> Result<()> {
837                self.as_ref().delete(location).await
838            }
839
840            fn delete_stream<'a>(
841                &'a self,
842                locations: BoxStream<'a, Result<Path>>,
843            ) -> BoxStream<'a, Result<Path>> {
844                self.as_ref().delete_stream(locations)
845            }
846
847            fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
848                self.as_ref().list(prefix)
849            }
850
851            fn list_with_offset(
852                &self,
853                prefix: Option<&Path>,
854                offset: &Path,
855            ) -> BoxStream<'_, Result<ObjectMeta>> {
856                self.as_ref().list_with_offset(prefix, offset)
857            }
858
859            async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
860                self.as_ref().list_with_delimiter(prefix).await
861            }
862
863            async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
864                self.as_ref().copy(from, to).await
865            }
866
867            async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
868                self.as_ref().rename(from, to).await
869            }
870
871            async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
872                self.as_ref().copy_if_not_exists(from, to).await
873            }
874
875            async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
876                self.as_ref().rename_if_not_exists(from, to).await
877            }
878        }
879    };
880}
881
882as_ref_impl!(Arc<dyn ObjectStore>);
883as_ref_impl!(Box<dyn ObjectStore>);
884
885/// Result of a list call that includes objects, prefixes (directories) and a
886/// token for the next set of results. Individual result sets may be limited to
887/// 1,000 objects based on the underlying object storage's limitations.
888#[derive(Debug)]
889pub struct ListResult {
890    /// Prefixes that are common (like directories)
891    pub common_prefixes: Vec<Path>,
892    /// Object metadata for the listing
893    pub objects: Vec<ObjectMeta>,
894}
895
896/// The metadata that describes an object.
897#[derive(Debug, Clone, PartialEq, Eq)]
898pub struct ObjectMeta {
899    /// The full path to the object
900    pub location: Path,
901    /// The last modified time
902    pub last_modified: DateTime<Utc>,
903    /// The size in bytes of the object
904    pub size: usize,
905    /// The unique identifier for the object
906    ///
907    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-etag>
908    pub e_tag: Option<String>,
909    /// A version indicator for this object
910    pub version: Option<String>,
911}
912
913/// Options for a get request, such as range
914#[derive(Debug, Default)]
915pub struct GetOptions {
916    /// Request will succeed if the `ObjectMeta::e_tag` matches
917    /// otherwise returning [`Error::Precondition`]
918    ///
919    /// See <https://datatracker.ietf.org/doc/html/rfc9110#name-if-match>
920    ///
921    /// Examples:
922    ///
923    /// ```text
924    /// If-Match: "xyzzy"
925    /// If-Match: "xyzzy", "r2d2xxxx", "c3piozzzz"
926    /// If-Match: *
927    /// ```
928    pub if_match: Option<String>,
929    /// Request will succeed if the `ObjectMeta::e_tag` does not match
930    /// otherwise returning [`Error::NotModified`]
931    ///
932    /// See <https://datatracker.ietf.org/doc/html/rfc9110#section-13.1.2>
933    ///
934    /// Examples:
935    ///
936    /// ```text
937    /// If-None-Match: "xyzzy"
938    /// If-None-Match: "xyzzy", "r2d2xxxx", "c3piozzzz"
939    /// If-None-Match: *
940    /// ```
941    pub if_none_match: Option<String>,
942    /// Request will succeed if the object has been modified since
943    ///
944    /// <https://datatracker.ietf.org/doc/html/rfc9110#section-13.1.3>
945    pub if_modified_since: Option<DateTime<Utc>>,
946    /// Request will succeed if the object has not been modified since
947    /// otherwise returning [`Error::Precondition`]
948    ///
949    /// Some stores, such as S3, will only return `NotModified` for exact
950    /// timestamp matches, instead of for any timestamp greater than or equal.
951    ///
952    /// <https://datatracker.ietf.org/doc/html/rfc9110#section-13.1.4>
953    pub if_unmodified_since: Option<DateTime<Utc>>,
954    /// Request transfer of only the specified range of bytes
955    /// otherwise returning [`Error::NotModified`]
956    ///
957    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-range>
958    pub range: Option<GetRange>,
959    /// Request a particular object version
960    pub version: Option<String>,
961    /// Request transfer of no content
962    ///
963    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-head>
964    pub head: bool,
965}
966
967impl GetOptions {
968    /// Returns an error if the modification conditions on this request are not satisfied
969    ///
970    /// <https://datatracker.ietf.org/doc/html/rfc7232#section-6>
971    fn check_preconditions(&self, meta: &ObjectMeta) -> Result<()> {
972        // The use of the invalid etag "*" means no ETag is equivalent to never matching
973        let etag = meta.e_tag.as_deref().unwrap_or("*");
974        let last_modified = meta.last_modified;
975
976        if let Some(m) = &self.if_match {
977            if m != "*" && m.split(',').map(str::trim).all(|x| x != etag) {
978                return Err(Error::Precondition {
979                    path: meta.location.to_string(),
980                    source: format!("{etag} does not match {m}").into(),
981                });
982            }
983        } else if let Some(date) = self.if_unmodified_since {
984            if last_modified > date {
985                return Err(Error::Precondition {
986                    path: meta.location.to_string(),
987                    source: format!("{date} < {last_modified}").into(),
988                });
989            }
990        }
991
992        if let Some(m) = &self.if_none_match {
993            if m == "*" || m.split(',').map(str::trim).any(|x| x == etag) {
994                return Err(Error::NotModified {
995                    path: meta.location.to_string(),
996                    source: format!("{etag} matches {m}").into(),
997                });
998            }
999        } else if let Some(date) = self.if_modified_since {
1000            if last_modified <= date {
1001                return Err(Error::NotModified {
1002                    path: meta.location.to_string(),
1003                    source: format!("{date} >= {last_modified}").into(),
1004                });
1005            }
1006        }
1007        Ok(())
1008    }
1009}
1010
1011/// Result for a get request
1012#[derive(Debug)]
1013pub struct GetResult {
1014    /// The [`GetResultPayload`]
1015    pub payload: GetResultPayload,
1016    /// The [`ObjectMeta`] for this object
1017    pub meta: ObjectMeta,
1018    /// The range of bytes returned by this request
1019    pub range: Range<usize>,
1020    /// Additional object attributes
1021    pub attributes: Attributes,
1022}
1023
1024/// The kind of a [`GetResult`]
1025///
1026/// This special cases the case of a local file, as some systems may
1027/// be able to optimise the case of a file already present on local disk
1028pub enum GetResultPayload {
1029    /// The file, path
1030    File(std::fs::File, std::path::PathBuf),
1031    /// An opaque stream of bytes
1032    Stream(BoxStream<'static, Result<Bytes>>),
1033}
1034
1035impl Debug for GetResultPayload {
1036    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1037        match self {
1038            Self::File(_, _) => write!(f, "GetResultPayload(File)"),
1039            Self::Stream(_) => write!(f, "GetResultPayload(Stream)"),
1040        }
1041    }
1042}
1043
1044impl GetResult {
1045    /// Collects the data into a [`Bytes`]
1046    pub async fn bytes(self) -> Result<Bytes> {
1047        let len = self.range.end - self.range.start;
1048        match self.payload {
1049            #[cfg(not(target_arch = "wasm32"))]
1050            GetResultPayload::File(mut file, path) => {
1051                maybe_spawn_blocking(move || {
1052                    file.seek(SeekFrom::Start(self.range.start as _))
1053                        .map_err(|source| local::Error::Seek {
1054                            source,
1055                            path: path.clone(),
1056                        })?;
1057
1058                    let mut buffer = Vec::with_capacity(len);
1059                    file.take(len as _)
1060                        .read_to_end(&mut buffer)
1061                        .map_err(|source| local::Error::UnableToReadBytes { source, path })?;
1062
1063                    Ok(buffer.into())
1064                })
1065                .await
1066            }
1067            GetResultPayload::Stream(s) => collect_bytes(s, Some(len)).await,
1068            #[cfg(target_arch = "wasm32")]
1069            _ => unimplemented!("File IO not implemented on wasm32."),
1070        }
1071    }
1072
1073    /// Converts this into a byte stream
1074    ///
1075    /// If the `self.kind` is [`GetResultPayload::File`] will perform chunked reads of the file,
1076    /// otherwise will return the [`GetResultPayload::Stream`].
1077    ///
1078    /// # Tokio Compatibility
1079    ///
1080    /// Tokio discourages performing blocking IO on a tokio worker thread, however,
1081    /// no major operating systems have stable async file APIs. Therefore if called from
1082    /// a tokio context, this will use [`tokio::runtime::Handle::spawn_blocking`] to dispatch
1083    /// IO to a blocking thread pool, much like `tokio::fs` does under-the-hood.
1084    ///
1085    /// If not called from a tokio context, this will perform IO on the current thread with
1086    /// no additional complexity or overheads
1087    pub fn into_stream(self) -> BoxStream<'static, Result<Bytes>> {
1088        match self.payload {
1089            #[cfg(not(target_arch = "wasm32"))]
1090            GetResultPayload::File(file, path) => {
1091                const CHUNK_SIZE: usize = 8 * 1024;
1092                local::chunked_stream(file, path, self.range, CHUNK_SIZE)
1093            }
1094            GetResultPayload::Stream(s) => s,
1095            #[cfg(target_arch = "wasm32")]
1096            _ => unimplemented!("File IO not implemented on wasm32."),
1097        }
1098    }
1099}
1100
1101/// Configure preconditions for the put operation
1102#[derive(Debug, Clone, PartialEq, Eq, Default)]
1103pub enum PutMode {
1104    /// Perform an atomic write operation, overwriting any object present at the provided path
1105    #[default]
1106    Overwrite,
1107    /// Perform an atomic write operation, returning [`Error::AlreadyExists`] if an
1108    /// object already exists at the provided path
1109    Create,
1110    /// Perform an atomic write operation if the current version of the object matches the
1111    /// provided [`UpdateVersion`], returning [`Error::Precondition`] otherwise
1112    Update(UpdateVersion),
1113}
1114
1115/// Uniquely identifies a version of an object to update
1116///
1117/// Stores will use differing combinations of `e_tag` and `version` to provide conditional
1118/// updates, and it is therefore recommended applications preserve both
1119#[derive(Debug, Clone, PartialEq, Eq)]
1120pub struct UpdateVersion {
1121    /// The unique identifier for the newly created object
1122    ///
1123    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-etag>
1124    pub e_tag: Option<String>,
1125    /// A version indicator for the newly created object
1126    pub version: Option<String>,
1127}
1128
1129impl From<PutResult> for UpdateVersion {
1130    fn from(value: PutResult) -> Self {
1131        Self {
1132            e_tag: value.e_tag,
1133            version: value.version,
1134        }
1135    }
1136}
1137
1138/// Options for a put request
1139#[derive(Debug, Clone, PartialEq, Eq, Default)]
1140pub struct PutOptions {
1141    /// Configure the [`PutMode`] for this operation
1142    pub mode: PutMode,
1143    /// Provide a [`TagSet`] for this object
1144    ///
1145    /// Implementations that don't support object tagging should ignore this
1146    pub tags: TagSet,
1147    /// Provide a set of [`Attributes`]
1148    ///
1149    /// Implementations that don't support an attribute should return an error
1150    pub attributes: Attributes,
1151}
1152
1153impl From<PutMode> for PutOptions {
1154    fn from(mode: PutMode) -> Self {
1155        Self {
1156            mode,
1157            ..Default::default()
1158        }
1159    }
1160}
1161
1162impl From<TagSet> for PutOptions {
1163    fn from(tags: TagSet) -> Self {
1164        Self {
1165            tags,
1166            ..Default::default()
1167        }
1168    }
1169}
1170
1171impl From<Attributes> for PutOptions {
1172    fn from(attributes: Attributes) -> Self {
1173        Self {
1174            attributes,
1175            ..Default::default()
1176        }
1177    }
1178}
1179
1180/// Options for [`ObjectStore::put_multipart_opts`]
1181#[derive(Debug, Clone, PartialEq, Eq, Default)]
1182pub struct PutMultipartOpts {
1183    /// Provide a [`TagSet`] for this object
1184    ///
1185    /// Implementations that don't support object tagging should ignore this
1186    pub tags: TagSet,
1187    /// Provide a set of [`Attributes`]
1188    ///
1189    /// Implementations that don't support an attribute should return an error
1190    pub attributes: Attributes,
1191}
1192
1193impl From<TagSet> for PutMultipartOpts {
1194    fn from(tags: TagSet) -> Self {
1195        Self {
1196            tags,
1197            ..Default::default()
1198        }
1199    }
1200}
1201
1202impl From<Attributes> for PutMultipartOpts {
1203    fn from(attributes: Attributes) -> Self {
1204        Self {
1205            attributes,
1206            ..Default::default()
1207        }
1208    }
1209}
1210
/// Identifiers returned by a successful put request
///
/// Can be converted into an [`UpdateVersion`] for a later conditional update.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct PutResult {
    /// Entity tag of the newly written object, if the store provides one
    ///
    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-etag>
    pub e_tag: Option<String>,
    /// Version indicator of the newly written object, if the store provides one
    pub version: Option<String>,
}
1221
/// A specialized `Result` for object store-related errors
///
/// The error type defaults to [`Error`] but can be overridden via the second parameter.
pub type Result<T, E = Error> = std::result::Result<T, E>;

/// A specialized `Error` for object store-related errors
///
/// Every variant carries an explicit `display` message, so the variant doc comments
/// below are for readers only and do not affect the rendered error text.
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
pub enum Error {
    /// A store-specific error with no more precise variant; `store` is rendered in
    /// the message as the store's name
    #[snafu(display("Generic {} error: {}", store, source))]
    Generic {
        store: &'static str,
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },

    /// No object exists at `path`
    #[snafu(display("Object at location {} not found: {}", path, source))]
    NotFound {
        path: String,
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },

    /// An object with an invalid path was encountered; `context(false)` lets a
    /// `path::Error` be converted into this variant directly with `?`
    #[snafu(
        display("Encountered object with invalid path: {}", source),
        context(false)
    )]
    InvalidPath { source: path::Error },

    /// A spawned task could not be joined; `context(false)` lets a
    /// `tokio::task::JoinError` be converted into this variant directly with `?`
    #[snafu(display("Error joining spawned task: {}", source), context(false))]
    JoinError { source: tokio::task::JoinError },

    /// The requested operation is not supported
    #[snafu(display("Operation not supported: {}", source))]
    NotSupported {
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },

    /// An object already exists at `path`, e.g. when writing with [`PutMode::Create`]
    #[snafu(display("Object at location {} already exists: {}", path, source))]
    AlreadyExists {
        path: String,
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },

    /// A request precondition was not met, e.g. `if_match` / `if_unmodified_since`
    /// on [`GetOptions`], or the version given to [`PutMode::Update`]
    #[snafu(display("Request precondition failure for path {}: {}", path, source))]
    Precondition {
        path: String,
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },

    /// The object at `path` was not modified with respect to `if_none_match` /
    /// `if_modified_since` on [`GetOptions`]
    #[snafu(display("Object at location {} not modified: {}", path, source))]
    NotModified {
        path: String,
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },

    /// The operation has not been implemented yet
    #[snafu(display("Operation not yet implemented."))]
    NotImplemented,

    /// Configuration `key` is not recognised by `store`
    #[snafu(display("Configuration key: '{}' is not valid for store '{}'.", key, store))]
    UnknownConfigurationKey { store: &'static str, key: String },
}
1279
1280impl From<Error> for std::io::Error {
1281    fn from(e: Error) -> Self {
1282        let kind = match &e {
1283            Error::NotFound { .. } => std::io::ErrorKind::NotFound,
1284            _ => std::io::ErrorKind::Other,
1285        };
1286        Self::new(kind, e)
1287    }
1288}
1289
#[cfg(test)]
mod tests {
    use super::*;
    use crate::buffered::BufWriter;
    use chrono::TimeZone;
    use tokio::io::AsyncWriteExt;

    // Returns early from the enclosing test unless the `TEST_INTEGRATION`
    // environment variable is set, making integration tests opt-in.
    macro_rules! maybe_skip_integration {
        () => {
            if std::env::var("TEST_INTEGRATION").is_err() {
                eprintln!("Skipping integration test - set TEST_INTEGRATION");
                return;
            }
        };
    }
    // Makes the macro importable from elsewhere in the crate (e.g. other test modules)
    pub(crate) use maybe_skip_integration;

    /// Test that the returned stream does not borrow the lifetime of Path
    ///
    /// `path` is a local dropped on return, so this only compiles if the stream's
    /// lifetime is tied to `store` alone.
    fn list_store<'a>(
        store: &'a dyn ObjectStore,
        path_str: &str,
    ) -> BoxStream<'a, Result<ObjectMeta>> {
        let path = Path::from(path_str);
        store.list(Some(&path))
    }

    /// Integration helper: uploads a small object, requests a signed GET URL valid
    /// for 60 seconds, fetches it over plain HTTP and checks the body round-trips
    #[cfg(any(feature = "azure", feature = "aws"))]
    pub async fn signing<T>(integration: &T)
    where
        T: ObjectStore + signer::Signer,
    {
        use reqwest::Method;
        use std::time::Duration;

        let data = Bytes::from("hello world");
        let path = Path::from("file.txt");
        integration.put(&path, data.clone().into()).await.unwrap();

        let signed = integration
            .signed_url(Method::GET, &path, Duration::from_secs(60))
            .await
            .unwrap();

        let resp = reqwest::get(signed).await.unwrap();
        let loaded = resp.bytes().await.unwrap();

        assert_eq!(data, loaded);
    }

    /// Integration helper: writes objects carrying a [`TagSet`] through `put_opts`,
    /// `put_multipart_opts` and [`BufWriter`], then, if `validate` is set, fetches
    /// each object's tags with `get_tags` and compares them with what was written
    #[cfg(any(feature = "aws", feature = "azure"))]
    pub async fn tagging<F, Fut>(storage: Arc<dyn ObjectStore>, validate: bool, get_tags: F)
    where
        F: Fn(Path) -> Fut + Send + Sync,
        Fut: std::future::Future<Output = Result<reqwest::Response>> + Send,
    {
        use bytes::Buf;
        use serde::Deserialize;

        // Minimal model of the XML tagging response body:
        // `<TagSet><Tag><Key>..</Key><Value>..</Value></Tag>..</TagSet>`
        #[derive(Deserialize)]
        struct Tagging {
            #[serde(rename = "TagSet")]
            list: TagList,
        }

        #[derive(Debug, Deserialize)]
        struct TagList {
            #[serde(rename = "Tag")]
            tags: Vec<Tag>,
        }

        #[derive(Debug, Deserialize, Eq, PartialEq)]
        #[serde(rename_all = "PascalCase")]
        struct Tag {
            key: String,
            value: String,
        }

        // Listed in key order so they compare equal to the response after sorting;
        // the keys and values include `=`, `/`, `.`, `-`, `_` and spaces
        let tags = vec![
            Tag {
                key: "foo.com=bar/s".to_string(),
                value: "bananas/foo.com-_".to_string(),
            },
            Tag {
                key: "namespace/key.foo".to_string(),
                value: "value with a space".to_string(),
            },
        ];
        let mut tag_set = TagSet::default();
        for t in &tags {
            tag_set.push(&t.key, &t.value)
        }

        // Single-request put
        let path = Path::from("tag_test");
        storage
            .put_opts(&path, "test".into(), tag_set.clone().into())
            .await
            .unwrap();

        // Multipart upload
        let multi_path = Path::from("tag_test_multi");
        let mut write = storage
            .put_multipart_opts(&multi_path, tag_set.clone().into())
            .await
            .unwrap();

        write.put_part("foo".into()).await.unwrap();
        write.complete().await.unwrap();

        // Buffered writer
        let buf_path = Path::from("tag_test_buf");
        let mut buf = BufWriter::new(storage, buf_path.clone()).with_tags(tag_set);
        buf.write_all(b"foo").await.unwrap();
        buf.shutdown().await.unwrap();

        // Write should always succeed, but certain configurations may simply ignore tags
        if !validate {
            return;
        }

        for path in [path, multi_path, buf_path] {
            let resp = get_tags(path.clone()).await.unwrap();
            let body = resp.bytes().await.unwrap();

            let mut resp: Tagging = quick_xml::de::from_reader(body.reader()).unwrap();
            resp.list.tags.sort_by(|a, b| a.key.cmp(&b.key));
            assert_eq!(resp.list.tags, tags);
        }
    }

    // Listing an empty store through `list_store` yields nothing; primarily this
    // exercises the lifetime property checked at compile time by `list_store`
    #[tokio::test]
    async fn test_list_lifetimes() {
        let store = memory::InMemory::new();
        let mut stream = list_store(&store, "path");
        assert!(stream.next().await.is_none());
    }

    // Exercises `GetOptions::check_preconditions` against an object last modified
    // at t = 100ns with ETag "123". `options` is mutated in sequence, so the order
    // of the statements below matters.
    #[test]
    fn test_preconditions() {
        let mut meta = ObjectMeta {
            location: Path::from("test"),
            last_modified: Utc.timestamp_nanos(100),
            size: 100,
            e_tag: Some("123".to_string()),
            version: None,
        };

        // No conditions always pass
        let mut options = GetOptions::default();
        options.check_preconditions(&meta).unwrap();

        // If-Modified-Since: passes only if modified strictly after the given time
        options.if_modified_since = Some(Utc.timestamp_nanos(50));
        options.check_preconditions(&meta).unwrap();

        options.if_modified_since = Some(Utc.timestamp_nanos(100));
        options.check_preconditions(&meta).unwrap_err();

        options.if_modified_since = Some(Utc.timestamp_nanos(101));
        options.check_preconditions(&meta).unwrap_err();

        options = GetOptions::default();

        // If-Unmodified-Since: passes only if not modified after the given time
        options.if_unmodified_since = Some(Utc.timestamp_nanos(50));
        options.check_preconditions(&meta).unwrap_err();

        options.if_unmodified_since = Some(Utc.timestamp_nanos(100));
        options.check_preconditions(&meta).unwrap();

        options.if_unmodified_since = Some(Utc.timestamp_nanos(101));
        options.check_preconditions(&meta).unwrap();

        options = GetOptions::default();

        // If-Match: single values, comma lists (with whitespace and a trailing
        // comma) and the `*` wildcard
        options.if_match = Some("123".to_string());
        options.check_preconditions(&meta).unwrap();

        options.if_match = Some("123,354".to_string());
        options.check_preconditions(&meta).unwrap();

        options.if_match = Some("354, 123,".to_string());
        options.check_preconditions(&meta).unwrap();

        options.if_match = Some("354".to_string());
        options.check_preconditions(&meta).unwrap_err();

        options.if_match = Some("*".to_string());
        options.check_preconditions(&meta).unwrap();

        // If-Match takes precedence
        options.if_unmodified_since = Some(Utc.timestamp_nanos(200));
        options.check_preconditions(&meta).unwrap();

        options = GetOptions::default();

        // If-None-Match: fails when the ETag is listed or `*` is given
        options.if_none_match = Some("123".to_string());
        options.check_preconditions(&meta).unwrap_err();

        options.if_none_match = Some("*".to_string());
        options.check_preconditions(&meta).unwrap_err();

        options.if_none_match = Some("1232".to_string());
        options.check_preconditions(&meta).unwrap();

        options.if_none_match = Some("23, 123".to_string());
        options.check_preconditions(&meta).unwrap_err();

        // If-None-Match takes precedence
        options.if_modified_since = Some(Utc.timestamp_nanos(10));
        options.check_preconditions(&meta).unwrap_err();

        // Check missing ETag
        meta.e_tag = None;
        options = GetOptions::default();

        options.if_none_match = Some("*".to_string()); // Fails if any file exists
        options.check_preconditions(&meta).unwrap_err();

        options = GetOptions::default();
        options.if_match = Some("*".to_string()); // Passes if file exists
        options.check_preconditions(&meta).unwrap();
    }
}