//! An object store implementation for Amazon S3.

use async_trait::async_trait;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH};
use reqwest::{Method, StatusCode};
use std::{sync::Arc, time::Duration};
use url::Url;

use crate::aws::client::{RequestError, S3Client};
use crate::client::get::GetClientExt;
use crate::client::list::ListClientExt;
use crate::client::CredentialProvider;
use crate::multipart::{MultipartStore, PartId};
use crate::signer::Signer;
use crate::util::STRICT_ENCODE_SET;
use crate::{
    Error, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta,
    ObjectStore, Path, PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result,
    UploadPart,
};

static TAGS_HEADER: HeaderName = HeaderName::from_static("x-amz-tagging");
static COPY_SOURCE_HEADER: HeaderName = HeaderName::from_static("x-amz-copy-source");

mod builder;
mod checksum;
mod client;
mod credential;
mod dynamo;
mod precondition;
mod resolve;

pub use builder::{AmazonS3Builder, AmazonS3ConfigKey, S3EncryptionHeaders};
pub use checksum::Checksum;
pub use dynamo::DynamoCommit;
pub use precondition::{S3ConditionalPut, S3CopyIfNotExists};
pub use resolve::resolve_bucket_region;

// Characters to percent-encode in object paths; '/' is kept as the path separator.
const STRICT_PATH_ENCODE_SET: percent_encoding::AsciiSet = STRICT_ENCODE_SET.remove(b'/');

const STORE: &str = "S3";

/// A [`CredentialProvider`] for [`AmazonS3`], yielding [`AwsCredential`]s.
pub type AwsCredentialProvider = Arc<dyn CredentialProvider<Credential = AwsCredential>>;
use crate::client::parts::Parts;
pub use credential::{AwsAuthorizer, AwsCredential};
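
/// Interface for [Amazon S3](https://aws.amazon.com/s3/).
///
/// Instances are constructed via [`AmazonS3Builder`]. A minimal construction sketch;
/// the region, bucket name, and the `object_store` crate path below are illustrative
/// assumptions, not working values:
///
/// ```no_run
/// # use object_store::aws::AmazonS3Builder;
/// let s3 = AmazonS3Builder::new()
///     .with_region("us-east-1")
///     .with_bucket_name("my-bucket")
///     .build()
///     .unwrap();
/// ```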
#[derive(Debug)]
pub struct AmazonS3 {
    client: Arc<S3Client>,
}

impl std::fmt::Display for AmazonS3 {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "AmazonS3({})", self.client.config.bucket)
    }
}

impl AmazonS3 {
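    /// Returns the [`AwsCredentialProvider`] used by this instance.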
    pub fn credentials(&self) -> &AwsCredentialProvider {
        &self.client.config.credentials
    }
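
    /// Create a full URL to the resource at `path` using this instance's configuration.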
    fn path_url(&self, path: &Path) -> String {
        self.client.config.path_url(path)
    }
}

#[async_trait]
impl Signer for AmazonS3 {
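    /// Create a URL containing the relevant AWS SigV4 query parameters that authorize
    /// a request via `method` to the resource at `path`, valid for `expires_in`.
    ///
    /// A usage sketch, assuming the crate is published as `object_store`; the region,
    /// bucket, and credentials below are placeholders:
    ///
    /// ```no_run
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # use object_store::{aws::AmazonS3Builder, path::Path, signer::Signer};
    /// # use reqwest::Method;
    /// # use std::time::Duration;
    /// let s3 = AmazonS3Builder::new()
    ///     .with_region("us-east-1")
    ///     .with_bucket_name("my-bucket")
    ///     .with_access_key_id("my-access-key-id")
    ///     .with_secret_access_key("my-secret-access-key")
    ///     .build()?;
    ///
    /// // Signed URL allowing a PUT to `some-folder/some-file.txt` for the next hour.
    /// let url = s3
    ///     .signed_url(
    ///         Method::PUT,
    ///         &Path::from("some-folder/some-file.txt"),
    ///         Duration::from_secs(60 * 60),
    ///     )
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```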
    async fn signed_url(&self, method: Method, path: &Path, expires_in: Duration) -> Result<Url> {
        let credential = self.credentials().get_credential().await?;
        let authorizer = AwsAuthorizer::new(&credential, "s3", &self.client.config.region);

        let path_url = self.path_url(path);
        let mut url = Url::parse(&path_url).map_err(|e| crate::Error::Generic {
            store: STORE,
            source: format!("Unable to parse url {path_url}: {e}").into(),
        })?;

        authorizer.sign(method, &mut url, expires_in);

        Ok(url)
    }
}

#[async_trait]
impl ObjectStore for AmazonS3 {
    async fn put_opts(
        &self,
        location: &Path,
        payload: PutPayload,
        opts: PutOptions,
    ) -> Result<PutResult> {
        let request = self
            .client
            .request(Method::PUT, location)
            .with_payload(payload)
            .with_attributes(opts.attributes)
            .with_tags(opts.tags)
            .with_encryption_headers();
        match (opts.mode, &self.client.config.conditional_put) {
            (PutMode::Overwrite, _) => request.idempotent(true).do_put().await,
            (PutMode::Create | PutMode::Update(_), None) => Err(Error::NotImplemented),
            (PutMode::Create, Some(S3ConditionalPut::ETagMatch)) => {
                match request.header(&IF_NONE_MATCH, "*").do_put().await {
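                    // Technically If-None-Match should return NotModified, but some
                    // S3-compatible stores (e.g. Cloudflare R2) return PreconditionFailed
                    // instead, so treat both as "already exists".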
                    Err(e @ Error::NotModified { .. } | e @ Error::Precondition { .. }) => {
                        Err(Error::AlreadyExists {
                            path: location.to_string(),
                            source: Box::new(e),
                        })
                    }
                    r => r,
                }
            }
            (PutMode::Create, Some(S3ConditionalPut::Dynamo(d))) => {
                d.conditional_op(&self.client, location, None, move || request.do_put())
                    .await
            }
            (PutMode::Update(v), Some(put)) => {
                let etag = v.e_tag.ok_or_else(|| Error::Generic {
                    store: STORE,
                    source: "ETag required for conditional put".to_string().into(),
                })?;
                match put {
                    S3ConditionalPut::ETagMatch => {
                        request.header(&IF_MATCH, etag.as_str()).do_put().await
                    }
                    S3ConditionalPut::Dynamo(d) => {
                        d.conditional_op(&self.client, location, Some(&etag), move || {
                            request.do_put()
                        })
                        .await
                    }
                }
            }
        }
    }

    async fn put_multipart_opts(
        &self,
        location: &Path,
        opts: PutMultipartOpts,
    ) -> Result<Box<dyn MultipartUpload>> {
        let upload_id = self.client.create_multipart(location, opts).await?;

        Ok(Box::new(S3MultiPartUpload {
            part_idx: 0,
            state: Arc::new(UploadState {
                client: Arc::clone(&self.client),
                location: location.clone(),
                upload_id: upload_id.clone(),
                parts: Default::default(),
            }),
        }))
    }

    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
        self.client.get_opts(location, options).await
    }

    async fn delete(&self, location: &Path) -> Result<()> {
        self.client.request(Method::DELETE, location).send().await?;
        Ok(())
    }

    fn delete_stream<'a>(
        &'a self,
        locations: BoxStream<'a, Result<Path>>,
    ) -> BoxStream<'a, Result<Path>> {
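        // S3's DeleteObjects API accepts at most 1,000 keys per request, so batch the
        // incoming stream into chunks of that size and issue bulk deletes concurrently.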
        locations
            .try_chunks(1_000)
            .map(move |locations| async {
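                // `try_chunks` surfaces errors as (partial chunk, error); propagate the
                // error and drop the paths already collected into that chunk.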
                let locations = locations.map_err(|e| e.1)?;
                self.client
                    .bulk_delete_request(locations)
                    .await
                    .map(futures::stream::iter)
            })
            .buffered(20)
            .try_flatten()
            .boxed()
    }

    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
        self.client.list(prefix)
    }

    fn list_with_offset(
        &self,
        prefix: Option<&Path>,
        offset: &Path,
    ) -> BoxStream<'_, Result<ObjectMeta>> {
        if self.client.config.is_s3_express() {
            let offset = offset.clone();
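            // S3 Express One Zone does not support the `start-after` listing parameter,
            // so list everything and filter client-side instead.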
            return self
                .client
                .list(prefix)
                .try_filter(move |f| futures::future::ready(f.location > offset))
                .boxed();
        }

        self.client.list_with_offset(prefix, offset)
    }

    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
        self.client.list_with_delimiter(prefix).await
    }

    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
        self.client
            .copy_request(from, to)
            .idempotent(true)
            .send()
            .await?;
        Ok(())
    }

    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
        let (k, v, status) = match &self.client.config.copy_if_not_exists {
            Some(S3CopyIfNotExists::Header(k, v)) => (k, v, StatusCode::PRECONDITION_FAILED),
            Some(S3CopyIfNotExists::HeaderWithStatus(k, v, status)) => (k, v, *status),
            Some(S3CopyIfNotExists::Dynamo(lock)) => {
                return lock.copy_if_not_exists(&self.client, from, to).await
            }
            None => {
                return Err(Error::NotSupported {
                    source: "S3 does not support copy-if-not-exists".to_string().into(),
                })
            }
        };

        let req = self.client.copy_request(from, to);
        match req.header(k, v).send().await {
            Err(RequestError::Retry { source, path }) if source.status() == Some(status) => {
                Err(Error::AlreadyExists {
                    source: Box::new(source),
                    path,
                })
            }
            Err(e) => Err(e.into()),
            Ok(_) => Ok(()),
        }
    }
}

#[derive(Debug)]
struct S3MultiPartUpload {
    part_idx: usize,
    state: Arc<UploadState>,
}

#[derive(Debug)]
struct UploadState {
    parts: Parts,
    location: Path,
    upload_id: String,
    client: Arc<S3Client>,
}

#[async_trait]
impl MultipartUpload for S3MultiPartUpload {
    fn put_part(&mut self, data: PutPayload) -> UploadPart {
        let idx = self.part_idx;
        self.part_idx += 1;
        let state = Arc::clone(&self.state);
        Box::pin(async move {
            let part = state
                .client
                .put_part(&state.location, &state.upload_id, idx, data)
                .await?;
            state.parts.put(idx, part);
            Ok(())
        })
    }

    async fn complete(&mut self) -> Result<PutResult> {
        let parts = self.state.parts.finish(self.part_idx)?;

        self.state
            .client
            .complete_multipart(&self.state.location, &self.state.upload_id, parts)
            .await
    }

    async fn abort(&mut self) -> Result<()> {
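        // AbortMultipartUpload maps to a DELETE on the object key with the `uploadId`
        // query parameter identifying the in-progress upload.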
        self.state
            .client
            .request(Method::DELETE, &self.state.location)
            .query(&[("uploadId", &self.state.upload_id)])
            .idempotent(true)
            .send()
            .await?;

        Ok(())
    }
}

#[async_trait]
impl MultipartStore for AmazonS3 {
    async fn create_multipart(&self, path: &Path) -> Result<MultipartId> {
        self.client
            .create_multipart(path, PutMultipartOpts::default())
            .await
    }

    async fn put_part(
        &self,
        path: &Path,
        id: &MultipartId,
        part_idx: usize,
        data: PutPayload,
    ) -> Result<PartId> {
        self.client.put_part(path, id, part_idx, data).await
    }

    async fn complete_multipart(
        &self,
        path: &Path,
        id: &MultipartId,
        parts: Vec<PartId>,
    ) -> Result<PutResult> {
        self.client.complete_multipart(path, id, parts).await
    }

    async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()> {
        self.client
            .request(Method::DELETE, path)
            .query(&[("uploadId", id)])
            .send()
            .await?;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::client::get::GetClient;
    use crate::integration::*;
    use crate::tests::*;
    use hyper::HeaderMap;

    const NON_EXISTENT_NAME: &str = "nonexistentname";

    #[tokio::test]
    async fn s3_test() {
        maybe_skip_integration!();
        let config = AmazonS3Builder::from_env();

        let integration = config.build().unwrap();
        let config = &integration.client.config;
        let test_not_exists = config.copy_if_not_exists.is_some();
        let test_conditional_put = config.conditional_put.is_some();

        put_get_delete_list(&integration).await;
        get_opts(&integration).await;
        list_uses_directories_correctly(&integration).await;
        list_with_delimiter(&integration).await;
        rename_and_copy(&integration).await;
        stream_get(&integration).await;
        multipart(&integration, &integration).await;
        signing(&integration).await;
        s3_encryption(&integration).await;
        put_get_attributes(&integration).await;
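
        // Skip the tagging tests when a session provider is configured (used for
        // S3 Express One Zone, which does not support object tagging).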
        if config.session_provider.is_none() {
            tagging(
                Arc::new(AmazonS3 {
                    client: Arc::clone(&integration.client),
                }),
                !config.disable_tagging,
                |p| {
                    let client = Arc::clone(&integration.client);
                    async move { client.get_object_tagging(&p).await }
                },
            )
            .await;
        }

        if test_not_exists {
            copy_if_not_exists(&integration).await;
        }
        if test_conditional_put {
            put_opts(&integration, true).await;
        }
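
        // Run the basic round-trip tests again with unsigned payloads enabled.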
        let builder = AmazonS3Builder::from_env().with_unsigned_payload(true);
        let integration = builder.build().unwrap();
        put_get_delete_list(&integration).await;
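
        // And again with SHA-256 request checksums enabled.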
        let builder = AmazonS3Builder::from_env().with_checksum_algorithm(Checksum::SHA256);
        let integration = builder.build().unwrap();
        put_get_delete_list(&integration).await;

        match &integration.client.config.copy_if_not_exists {
            Some(S3CopyIfNotExists::Dynamo(d)) => dynamo::integration_test(&integration, d).await,
            _ => eprintln!("Skipping dynamo integration test - dynamo not configured"),
        };
    }

    #[tokio::test]
    async fn s3_test_get_nonexistent_location() {
        maybe_skip_integration!();
        let integration = AmazonS3Builder::from_env().build().unwrap();

        let location = Path::from_iter([NON_EXISTENT_NAME]);

        let err = get_nonexistent_object(&integration, Some(location))
            .await
            .unwrap_err();
        assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
    }

    #[tokio::test]
    async fn s3_test_get_nonexistent_bucket() {
        maybe_skip_integration!();
        let config = AmazonS3Builder::from_env().with_bucket_name(NON_EXISTENT_NAME);
        let integration = config.build().unwrap();

        let location = Path::from_iter([NON_EXISTENT_NAME]);

        let err = integration.get(&location).await.unwrap_err();
        assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
    }

    #[tokio::test]
    async fn s3_test_put_nonexistent_bucket() {
        maybe_skip_integration!();
        let config = AmazonS3Builder::from_env().with_bucket_name(NON_EXISTENT_NAME);
        let integration = config.build().unwrap();

        let location = Path::from_iter([NON_EXISTENT_NAME]);
        let data = PutPayload::from("arbitrary data");

        let err = integration.put(&location, data).await.unwrap_err();
        assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
    }

    #[tokio::test]
    async fn s3_test_delete_nonexistent_location() {
        maybe_skip_integration!();
        let integration = AmazonS3Builder::from_env().build().unwrap();

        let location = Path::from_iter([NON_EXISTENT_NAME]);

        integration.delete(&location).await.unwrap();
    }

    #[tokio::test]
    async fn s3_test_delete_nonexistent_bucket() {
        maybe_skip_integration!();
        let config = AmazonS3Builder::from_env().with_bucket_name(NON_EXISTENT_NAME);
        let integration = config.build().unwrap();

        let location = Path::from_iter([NON_EXISTENT_NAME]);

        let err = integration.delete(&location).await.unwrap_err();
        assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
    }

    #[tokio::test]
    #[ignore = "Tests shouldn't use remote services by default"]
    async fn test_disable_creds() {
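        // Listing a public bucket should fail with bogus credentials but succeed when
        // signing is skipped entirely.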
        let v1 = AmazonS3Builder::new()
            .with_bucket_name("daylight-map-distribution")
            .with_region("us-west-1")
            .with_access_key_id("local")
            .with_secret_access_key("development")
            .build()
            .unwrap();

        let prefix = Path::from("release");

        v1.list_with_delimiter(Some(&prefix)).await.unwrap_err();

        let v2 = AmazonS3Builder::new()
            .with_bucket_name("daylight-map-distribution")
            .with_region("us-west-1")
            .with_skip_signature(true)
            .build()
            .unwrap();

        v2.list_with_delimiter(Some(&prefix)).await.unwrap();
    }

    async fn s3_encryption(store: &AmazonS3) {
        maybe_skip_integration!();

        let data = PutPayload::from(vec![3u8; 1024]);

        let encryption_headers: HeaderMap = store.client.config.encryption_headers.clone().into();
        let expected_encryption =
            if let Some(encryption_type) = encryption_headers.get("x-amz-server-side-encryption") {
                encryption_type
            } else {
                eprintln!("Skipping S3 encryption test - encryption not configured");
                return;
            };

        let locations = [
            Path::from("test-encryption-1"),
            Path::from("test-encryption-2"),
            Path::from("test-encryption-3"),
        ];

        store.put(&locations[0], data.clone()).await.unwrap();
        store.copy(&locations[0], &locations[1]).await.unwrap();

        let mut upload = store.put_multipart(&locations[2]).await.unwrap();
        upload.put_part(data.clone()).await.unwrap();
        upload.complete().await.unwrap();

        for location in &locations {
            let res = store
                .client
                .get_request(location, GetOptions::default())
                .await
                .unwrap();
            let headers = res.headers();
            assert_eq!(
                headers
                    .get("x-amz-server-side-encryption")
                    .expect("object is not encrypted"),
                expected_encryption
            );

            store.delete(location).await.unwrap();
        }
    }
}