// object_store/aws/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! An object store implementation for S3
19//!
20//! ## Multipart uploads
21//!
//! Multipart uploads can be initiated with the [`ObjectStore::put_multipart`] method.
23//!
24//! If the writer fails for any reason, you may have parts uploaded to AWS but not
25//! used that you will be charged for. [`MultipartUpload::abort`] may be invoked to drop
26//! these unneeded parts, however, it is recommended that you consider implementing
27//! [automatic cleanup] of unused parts that are older than some threshold.
28//!
29//! [automatic cleanup]: https://aws.amazon.com/blogs/aws/s3-lifecycle-management-update-support-for-multipart-uploads-and-delete-markers/
30
31use async_trait::async_trait;
32use futures::stream::BoxStream;
33use futures::{StreamExt, TryStreamExt};
34use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH};
35use reqwest::{Method, StatusCode};
36use std::{sync::Arc, time::Duration};
37use url::Url;
38
39use crate::aws::client::{RequestError, S3Client};
40use crate::client::get::GetClientExt;
41use crate::client::list::ListClientExt;
42use crate::client::CredentialProvider;
43use crate::multipart::{MultipartStore, PartId};
44use crate::signer::Signer;
45use crate::util::STRICT_ENCODE_SET;
46use crate::{
47    Error, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta,
48    ObjectStore, Path, PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result,
49    UploadPart,
50};
51
/// Request header used to attach object tags ("x-amz-tagging") to uploads
static TAGS_HEADER: HeaderName = HeaderName::from_static("x-amz-tagging");
/// Request header identifying the source object of a server-side copy
static COPY_SOURCE_HEADER: HeaderName = HeaderName::from_static("x-amz-copy-source");
54
55mod builder;
56mod checksum;
57mod client;
58mod credential;
59mod dynamo;
60mod precondition;
61mod resolve;
62
63pub use builder::{AmazonS3Builder, AmazonS3ConfigKey, S3EncryptionHeaders};
64pub use checksum::Checksum;
65pub use dynamo::DynamoCommit;
66pub use precondition::{S3ConditionalPut, S3CopyIfNotExists};
67pub use resolve::resolve_bucket_region;
68
/// Percent-encode set for URI paths: the strict set with `/` removed so that
/// path separators are preserved when encoding an object path into a URL
const STRICT_PATH_ENCODE_SET: percent_encoding::AsciiSet = STRICT_ENCODE_SET.remove(b'/');

/// Store identifier reported in [`Error`] variants raised by this module
const STORE: &str = "S3";

/// [`CredentialProvider`] for [`AmazonS3`]
pub type AwsCredentialProvider = Arc<dyn CredentialProvider<Credential = AwsCredential>>;
use crate::client::parts::Parts;
pub use credential::{AwsAuthorizer, AwsCredential};
78
/// Interface for [Amazon S3](https://aws.amazon.com/s3/).
#[derive(Debug)]
pub struct AmazonS3 {
    /// Shared HTTP client carrying the bucket configuration and credentials;
    /// also cloned (cheaply, via `Arc`) into multipart upload state
    client: Arc<S3Client>,
}
84
85impl std::fmt::Display for AmazonS3 {
86    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87        write!(f, "AmazonS3({})", self.client.config.bucket)
88    }
89}
90
91impl AmazonS3 {
92    /// Returns the [`AwsCredentialProvider`] used by [`AmazonS3`]
93    pub fn credentials(&self) -> &AwsCredentialProvider {
94        &self.client.config.credentials
95    }
96
97    /// Create a full URL to the resource specified by `path` with this instance's configuration.
98    fn path_url(&self, path: &Path) -> String {
99        self.client.config.path_url(path)
100    }
101}
102
103#[async_trait]
104impl Signer for AmazonS3 {
105    /// Create a URL containing the relevant [AWS SigV4] query parameters that authorize a request
106    /// via `method` to the resource at `path` valid for the duration specified in `expires_in`.
107    ///
108    /// [AWS SigV4]: https://docs.aws.amazon.com/IAM/latest/UserGuide/create-signed-request.html
109    ///
110    /// # Example
111    ///
112    /// This example returns a URL that will enable a user to upload a file to
113    /// "some-folder/some-file.txt" in the next hour.
114    ///
115    /// ```
116    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
117    /// # use object_store::{aws::AmazonS3Builder, path::Path, signer::Signer};
118    /// # use reqwest::Method;
119    /// # use std::time::Duration;
120    /// #
121    /// let region = "us-east-1";
122    /// let s3 = AmazonS3Builder::new()
123    ///     .with_region(region)
124    ///     .with_bucket_name("my-bucket")
125    ///     .with_access_key_id("my-access-key-id")
126    ///     .with_secret_access_key("my-secret-access-key")
127    ///     .build()?;
128    ///
129    /// let url = s3.signed_url(
130    ///     Method::PUT,
131    ///     &Path::from("some-folder/some-file.txt"),
132    ///     Duration::from_secs(60 * 60)
133    /// ).await?;
134    /// #     Ok(())
135    /// # }
136    /// ```
137    async fn signed_url(&self, method: Method, path: &Path, expires_in: Duration) -> Result<Url> {
138        let credential = self.credentials().get_credential().await?;
139        let authorizer = AwsAuthorizer::new(&credential, "s3", &self.client.config.region);
140
141        let path_url = self.path_url(path);
142        let mut url = Url::parse(&path_url).map_err(|e| crate::Error::Generic {
143            store: STORE,
144            source: format!("Unable to parse url {path_url}: {e}").into(),
145        })?;
146
147        authorizer.sign(method, &mut url, expires_in);
148
149        Ok(url)
150    }
151}
152
#[async_trait]
impl ObjectStore for AmazonS3 {
    /// Upload `payload` to `location`, honouring the mode, attributes and tags in `opts`.
    ///
    /// Conditional writes (`Create` / `Update`) require the store to be configured
    /// with a [`S3ConditionalPut`] strategy; without one they return
    /// [`Error::NotImplemented`].
    async fn put_opts(
        &self,
        location: &Path,
        payload: PutPayload,
        opts: PutOptions,
    ) -> Result<PutResult> {
        let request = self
            .client
            .request(Method::PUT, location)
            .with_payload(payload)
            .with_attributes(opts.attributes)
            .with_tags(opts.tags)
            .with_encryption_headers();

        match (opts.mode, &self.client.config.conditional_put) {
            // An unconditional PUT can safely be retried on transient failure
            (PutMode::Overwrite, _) => request.idempotent(true).do_put().await,
            (PutMode::Create | PutMode::Update(_), None) => Err(Error::NotImplemented),
            // Create-only via `If-None-Match: *`, mapping a precondition failure
            // to `AlreadyExists` so callers get a consistent error type
            (PutMode::Create, Some(S3ConditionalPut::ETagMatch)) => {
                match request.header(&IF_NONE_MATCH, "*").do_put().await {
                    // Technically If-None-Match should return NotModified but some stores,
                    // such as R2, instead return PreconditionFailed
                    // https://developers.cloudflare.com/r2/api/s3/extensions/#conditional-operations-in-putobject
                    Err(e @ Error::NotModified { .. } | e @ Error::Precondition { .. }) => {
                        Err(Error::AlreadyExists {
                            path: location.to_string(),
                            source: Box::new(e),
                        })
                    }
                    r => r,
                }
            }
            // Create-only coordinated through DynamoDB (no etag expected: `None`)
            (PutMode::Create, Some(S3ConditionalPut::Dynamo(d))) => {
                d.conditional_op(&self.client, location, None, move || request.do_put())
                    .await
            }
            // Update requires the caller to supply the etag of the version being replaced
            (PutMode::Update(v), Some(put)) => {
                let etag = v.e_tag.ok_or_else(|| Error::Generic {
                    store: STORE,
                    source: "ETag required for conditional put".to_string().into(),
                })?;
                match put {
                    S3ConditionalPut::ETagMatch => {
                        request.header(&IF_MATCH, etag.as_str()).do_put().await
                    }
                    S3ConditionalPut::Dynamo(d) => {
                        d.conditional_op(&self.client, location, Some(&etag), move || {
                            request.do_put()
                        })
                        .await
                    }
                }
            }
        }
    }

    /// Begin a multipart upload at `location`, returning a handle for
    /// uploading parts and completing (or aborting) the upload.
    async fn put_multipart_opts(
        &self,
        location: &Path,
        opts: PutMultipartOpts,
    ) -> Result<Box<dyn MultipartUpload>> {
        // CreateMultipartUpload request; S3 assigns the upload id
        let upload_id = self.client.create_multipart(location, opts).await?;

        Ok(Box::new(S3MultiPartUpload {
            part_idx: 0,
            state: Arc::new(UploadState {
                client: Arc::clone(&self.client),
                location: location.clone(),
                upload_id: upload_id.clone(),
                parts: Default::default(),
            }),
        }))
    }

    /// Fetch the object at `location`, honouring range/precondition `options`.
    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
        self.client.get_opts(location, options).await
    }

    /// Delete the object at `location`.
    async fn delete(&self, location: &Path) -> Result<()> {
        self.client.request(Method::DELETE, location).send().await?;
        Ok(())
    }

    /// Delete the stream of `locations` using bulk delete requests.
    ///
    /// Paths are batched into chunks of 1,000 (the DeleteObjects per-request
    /// maximum) with up to 20 bulk requests in flight concurrently; results are
    /// yielded per path in request order.
    fn delete_stream<'a>(
        &'a self,
        locations: BoxStream<'a, Result<Path>>,
    ) -> BoxStream<'a, Result<Path>> {
        locations
            .try_chunks(1_000)
            .map(move |locations| async {
                // Early return the error. We ignore the paths that have already been
                // collected into the chunk.
                let locations = locations.map_err(|e| e.1)?;
                self.client
                    .bulk_delete_request(locations)
                    .await
                    .map(futures::stream::iter)
            })
            .buffered(20)
            .try_flatten()
            .boxed()
    }

    /// List objects under `prefix`.
    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
        self.client.list(prefix)
    }

    /// List objects under `prefix` whose keys sort strictly after `offset`.
    fn list_with_offset(
        &self,
        prefix: Option<&Path>,
        offset: &Path,
    ) -> BoxStream<'_, Result<ObjectMeta>> {
        if self.client.config.is_s3_express() {
            let offset = offset.clone();
            // S3 Express does not support start-after
            return self
                .client
                .list(prefix)
                .try_filter(move |f| futures::future::ready(f.location > offset))
                .boxed();
        }

        self.client.list_with_offset(prefix, offset)
    }

    /// List objects and common prefixes directly under `prefix`.
    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
        self.client.list_with_delimiter(prefix).await
    }

    /// Server-side copy of `from` to `to`, overwriting any existing object.
    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
        self.client
            .copy_request(from, to)
            // An unconditional copy can safely be retried on transient failure
            .idempotent(true)
            .send()
            .await?;
        Ok(())
    }

    /// Server-side copy of `from` to `to`, failing with [`Error::AlreadyExists`]
    /// if `to` already exists.
    ///
    /// Requires a configured [`S3CopyIfNotExists`] strategy (header-based or
    /// DynamoDB-coordinated); S3 itself has no native copy-if-not-exists.
    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
        let (k, v, status) = match &self.client.config.copy_if_not_exists {
            Some(S3CopyIfNotExists::Header(k, v)) => (k, v, StatusCode::PRECONDITION_FAILED),
            Some(S3CopyIfNotExists::HeaderWithStatus(k, v, status)) => (k, v, *status),
            Some(S3CopyIfNotExists::Dynamo(lock)) => {
                return lock.copy_if_not_exists(&self.client, from, to).await
            }
            None => {
                return Err(Error::NotSupported {
                    source: "S3 does not support copy-if-not-exists".to_string().into(),
                })
            }
        };

        let req = self.client.copy_request(from, to);
        match req.header(k, v).send().await {
            // The configured status code signals the destination already exists
            Err(RequestError::Retry { source, path }) if source.status() == Some(status) => {
                Err(Error::AlreadyExists {
                    source: Box::new(source),
                    path,
                })
            }
            Err(e) => Err(e.into()),
            Ok(_) => Ok(()),
        }
    }
}
319
/// In-progress multipart upload, returned by [`AmazonS3::put_multipart_opts`]
#[derive(Debug)]
struct S3MultiPartUpload {
    /// Index to assign to the next part (equals the number of parts started so far)
    part_idx: usize,
    /// State shared with the in-flight part upload futures
    state: Arc<UploadState>,
}
325
/// State shared between an [`S3MultiPartUpload`] and its spawned part uploads
#[derive(Debug)]
struct UploadState {
    /// Completed [`PartId`]s, recorded under their part index
    parts: Parts,
    /// Destination path of the object being uploaded
    location: Path,
    /// Upload identifier assigned by S3 on creation
    upload_id: String,
    client: Arc<S3Client>,
}
333
#[async_trait]
impl MultipartUpload for S3MultiPartUpload {
    /// Start uploading `data` as the next part, returning a future for the upload.
    ///
    /// The part index is claimed synchronously before the async block runs, so
    /// successive calls get unique, ordered indices even though the uploads
    /// themselves may be polled concurrently.
    fn put_part(&mut self, data: PutPayload) -> UploadPart {
        let idx = self.part_idx;
        self.part_idx += 1;
        let state = Arc::clone(&self.state);
        Box::pin(async move {
            let part = state
                .client
                .put_part(&state.location, &state.upload_id, idx, data)
                .await?;
            // Record the completed part under its index for `complete`
            state.parts.put(idx, part);
            Ok(())
        })
    }

    /// Complete the upload, combining all uploaded parts into the final object.
    async fn complete(&mut self) -> Result<PutResult> {
        // Collects the recorded parts; presumably errors if any of the
        // `part_idx` started parts has not finished — see `Parts::finish`
        let parts = self.state.parts.finish(self.part_idx)?;

        self.state
            .client
            .complete_multipart(&self.state.location, &self.state.upload_id, parts)
            .await
    }

    /// Abort the upload, asking S3 to discard any parts uploaded so far.
    async fn abort(&mut self) -> Result<()> {
        self.state
            .client
            .request(Method::DELETE, &self.state.location)
            .query(&[("uploadId", &self.state.upload_id)])
            // Aborting an upload id is idempotent, so retry transient failures
            .idempotent(true)
            .send()
            .await?;

        Ok(())
    }
}
371
372#[async_trait]
373impl MultipartStore for AmazonS3 {
374    async fn create_multipart(&self, path: &Path) -> Result<MultipartId> {
375        self.client
376            .create_multipart(path, PutMultipartOpts::default())
377            .await
378    }
379
380    async fn put_part(
381        &self,
382        path: &Path,
383        id: &MultipartId,
384        part_idx: usize,
385        data: PutPayload,
386    ) -> Result<PartId> {
387        self.client.put_part(path, id, part_idx, data).await
388    }
389
390    async fn complete_multipart(
391        &self,
392        path: &Path,
393        id: &MultipartId,
394        parts: Vec<PartId>,
395    ) -> Result<PutResult> {
396        self.client.complete_multipart(path, id, parts).await
397    }
398
399    async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()> {
400        self.client
401            .request(Method::DELETE, path)
402            .query(&[("uploadId", id)])
403            .send()
404            .await?;
405        Ok(())
406    }
407}
408
#[cfg(test)]
mod tests {
    //! Integration tests against a real S3 (or compatible) endpoint.
    //!
    //! All `#[tokio::test]` functions are gated by `maybe_skip_integration!`,
    //! which skips the test unless the relevant environment is configured.
    use super::*;
    use crate::client::get::GetClient;
    use crate::integration::*;
    use crate::tests::*;
    use hyper::HeaderMap;

    // Used both as a bucket name and an object name that should not exist
    const NON_EXISTENT_NAME: &str = "nonexistentname";

    /// End-to-end smoke test running the shared integration suite, plus
    /// tagging, conditional-put, unsigned-payload, checksum and dynamo
    /// variants where the environment enables them.
    #[tokio::test]
    async fn s3_test() {
        maybe_skip_integration!();
        let config = AmazonS3Builder::from_env();

        let integration = config.build().unwrap();
        let config = &integration.client.config;
        // Optional features depend on the environment configuration
        let test_not_exists = config.copy_if_not_exists.is_some();
        let test_conditional_put = config.conditional_put.is_some();

        put_get_delete_list(&integration).await;
        get_opts(&integration).await;
        list_uses_directories_correctly(&integration).await;
        list_with_delimiter(&integration).await;
        rename_and_copy(&integration).await;
        stream_get(&integration).await;
        multipart(&integration, &integration).await;
        signing(&integration).await;
        s3_encryption(&integration).await;
        put_get_attributes(&integration).await;

        // Object tagging is not supported by S3 Express One Zone
        if config.session_provider.is_none() {
            tagging(
                Arc::new(AmazonS3 {
                    client: Arc::clone(&integration.client),
                }),
                !config.disable_tagging,
                |p| {
                    let client = Arc::clone(&integration.client);
                    async move { client.get_object_tagging(&p).await }
                },
            )
            .await;
        }

        if test_not_exists {
            copy_if_not_exists(&integration).await;
        }
        if test_conditional_put {
            put_opts(&integration, true).await;
        }

        // run integration test with unsigned payload enabled
        let builder = AmazonS3Builder::from_env().with_unsigned_payload(true);
        let integration = builder.build().unwrap();
        put_get_delete_list(&integration).await;

        // run integration test with checksum set to sha256
        let builder = AmazonS3Builder::from_env().with_checksum_algorithm(Checksum::SHA256);
        let integration = builder.build().unwrap();
        put_get_delete_list(&integration).await;

        match &integration.client.config.copy_if_not_exists {
            Some(S3CopyIfNotExists::Dynamo(d)) => dynamo::integration_test(&integration, d).await,
            _ => eprintln!("Skipping dynamo integration test - dynamo not configured"),
        };
    }

    /// Getting a missing object should surface `Error::NotFound`.
    #[tokio::test]
    async fn s3_test_get_nonexistent_location() {
        maybe_skip_integration!();
        let integration = AmazonS3Builder::from_env().build().unwrap();

        let location = Path::from_iter([NON_EXISTENT_NAME]);

        let err = get_nonexistent_object(&integration, Some(location))
            .await
            .unwrap_err();
        assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
    }

    /// Getting from a missing bucket should also surface `Error::NotFound`.
    #[tokio::test]
    async fn s3_test_get_nonexistent_bucket() {
        maybe_skip_integration!();
        let config = AmazonS3Builder::from_env().with_bucket_name(NON_EXISTENT_NAME);
        let integration = config.build().unwrap();

        let location = Path::from_iter([NON_EXISTENT_NAME]);

        let err = integration.get(&location).await.unwrap_err();
        assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
    }

    /// Putting into a missing bucket should surface `Error::NotFound`.
    #[tokio::test]
    async fn s3_test_put_nonexistent_bucket() {
        maybe_skip_integration!();
        let config = AmazonS3Builder::from_env().with_bucket_name(NON_EXISTENT_NAME);
        let integration = config.build().unwrap();

        let location = Path::from_iter([NON_EXISTENT_NAME]);
        let data = PutPayload::from("arbitrary data");

        let err = integration.put(&location, data).await.unwrap_err();
        assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
    }

    /// Deleting a missing object succeeds (S3 DELETE is idempotent).
    #[tokio::test]
    async fn s3_test_delete_nonexistent_location() {
        maybe_skip_integration!();
        let integration = AmazonS3Builder::from_env().build().unwrap();

        let location = Path::from_iter([NON_EXISTENT_NAME]);

        integration.delete(&location).await.unwrap();
    }

    /// Deleting from a missing bucket should surface `Error::NotFound`.
    #[tokio::test]
    async fn s3_test_delete_nonexistent_bucket() {
        maybe_skip_integration!();
        let config = AmazonS3Builder::from_env().with_bucket_name(NON_EXISTENT_NAME);
        let integration = config.build().unwrap();

        let location = Path::from_iter([NON_EXISTENT_NAME]);

        let err = integration.delete(&location).await.unwrap_err();
        assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
    }

    /// A public bucket should reject bogus credentials but accept
    /// unsigned (skip-signature) requests.
    #[tokio::test]
    #[ignore = "Tests shouldn't call use remote services by default"]
    async fn test_disable_creds() {
        // https://registry.opendata.aws/daylight-osm/
        let v1 = AmazonS3Builder::new()
            .with_bucket_name("daylight-map-distribution")
            .with_region("us-west-1")
            .with_access_key_id("local")
            .with_secret_access_key("development")
            .build()
            .unwrap();

        let prefix = Path::from("release");

        v1.list_with_delimiter(Some(&prefix)).await.unwrap_err();

        let v2 = AmazonS3Builder::new()
            .with_bucket_name("daylight-map-distribution")
            .with_region("us-west-1")
            .with_skip_signature(true)
            .build()
            .unwrap();

        v2.list_with_delimiter(Some(&prefix)).await.unwrap();
    }

    /// Helper (not itself a `#[tokio::test]`): verifies that put, copy and
    /// multipart uploads all produce objects carrying the configured
    /// server-side-encryption header; skipped when encryption is unset.
    async fn s3_encryption(store: &AmazonS3) {
        maybe_skip_integration!();

        let data = PutPayload::from(vec![3u8; 1024]);

        let encryption_headers: HeaderMap = store.client.config.encryption_headers.clone().into();
        let expected_encryption =
            if let Some(encryption_type) = encryption_headers.get("x-amz-server-side-encryption") {
                encryption_type
            } else {
                eprintln!("Skipping S3 encryption test - encryption not configured");
                return;
            };

        let locations = [
            Path::from("test-encryption-1"),
            Path::from("test-encryption-2"),
            Path::from("test-encryption-3"),
        ];

        // Exercise the three upload paths: simple put, server-side copy, multipart
        store.put(&locations[0], data.clone()).await.unwrap();
        store.copy(&locations[0], &locations[1]).await.unwrap();

        let mut upload = store.put_multipart(&locations[2]).await.unwrap();
        upload.put_part(data.clone()).await.unwrap();
        upload.complete().await.unwrap();

        for location in &locations {
            let res = store
                .client
                .get_request(location, GetOptions::default())
                .await
                .unwrap();
            let headers = res.headers();
            assert_eq!(
                headers
                    .get("x-amz-server-side-encryption")
                    .expect("object is not encrypted"),
                expected_encryption
            );

            store.delete(location).await.unwrap();
        }
    }
}