1use crate::aws::client::{S3Client, S3Config};
19use crate::aws::credential::{
20 InstanceCredentialProvider, SessionProvider, TaskCredentialProvider, WebIdentityProvider,
21};
22use crate::aws::{
23 AmazonS3, AwsCredential, AwsCredentialProvider, Checksum, S3ConditionalPut, S3CopyIfNotExists,
24 STORE,
25};
26use crate::client::TokenCredentialProvider;
27use crate::config::ConfigValue;
28use crate::{ClientConfigKey, ClientOptions, Result, RetryConfig, StaticCredentialProvider};
29use itertools::Itertools;
30use reqwest::header::{HeaderMap, HeaderValue};
31use serde::{Deserialize, Serialize};
32use snafu::{OptionExt, ResultExt, Snafu};
33use std::str::FromStr;
34use std::sync::Arc;
35use std::time::Duration;
36use tracing::info;
37use url::Url;
38
/// Base URL handed to the instance credential provider by
/// [`AmazonS3Builder::build`] when no metadata endpoint has been configured.
static DEFAULT_METADATA_ENDPOINT: &str = "http://169.254.169.254";
41
42#[derive(Debug, Snafu)]
44#[allow(missing_docs)]
45enum Error {
46 #[snafu(display("Missing bucket name"))]
47 MissingBucketName,
48
49 #[snafu(display("Missing AccessKeyId"))]
50 MissingAccessKeyId,
51
52 #[snafu(display("Missing SecretAccessKey"))]
53 MissingSecretAccessKey,
54
55 #[snafu(display("Unable parse source url. Url: {}, Error: {}", url, source))]
56 UnableToParseUrl {
57 source: url::ParseError,
58 url: String,
59 },
60
61 #[snafu(display(
62 "Unknown url scheme cannot be parsed into storage location: {}",
63 scheme
64 ))]
65 UnknownUrlScheme { scheme: String },
66
67 #[snafu(display("URL did not match any known pattern for scheme: {}", url))]
68 UrlNotRecognised { url: String },
69
70 #[snafu(display("Configuration key: '{}' is not known.", key))]
71 UnknownConfigurationKey { key: String },
72
73 #[snafu(display("Invalid Zone suffix for bucket '{bucket}'"))]
74 ZoneSuffix { bucket: String },
75
76 #[snafu(display("Invalid encryption type: {}. Valid values are \"AES256\", \"sse:kms\", and \"sse:kms:dsse\".", passed))]
77 InvalidEncryptionType { passed: String },
78
79 #[snafu(display(
80 "Invalid encryption header values. Header: {}, source: {}",
81 header,
82 source
83 ))]
84 InvalidEncryptionHeader {
85 header: &'static str,
86 source: Box<dyn std::error::Error + Send + Sync + 'static>,
87 },
88}
89
90impl From<Error> for crate::Error {
91 fn from(source: Error) -> Self {
92 match source {
93 Error::UnknownConfigurationKey { key } => {
94 Self::UnknownConfigurationKey { store: STORE, key }
95 }
96 _ => Self::Generic {
97 store: STORE,
98 source: Box::new(source),
99 },
100 }
101 }
102}
103
/// Builder that collects configuration and produces an [`AmazonS3`] store
/// through [`AmazonS3Builder::build`].
#[derive(Debug, Default, Clone)]
pub struct AmazonS3Builder {
    /// Access key id used for static credentials.
    access_key_id: Option<String>,
    /// Secret access key used for static credentials.
    secret_access_key: Option<String>,
    /// Region; `build` falls back to `us-east-1` when unset.
    region: Option<String>,
    /// Bucket name; `build` fails when it is missing.
    bucket_name: Option<String>,
    /// Custom endpoint used instead of the generated `amazonaws.com` URL.
    endpoint: Option<String>,
    /// Optional session token attached to static credentials.
    token: Option<String>,
    /// URL parsed at the start of `build` to fill in bucket, region or endpoint.
    url: Option<String>,
    /// Retry configuration passed to credential providers and the client config.
    retry_config: RetryConfig,
    /// Flag handed to the instance credential provider as `imdsv1_fallback`.
    imdsv1_fallback: ConfigValue<bool>,
    /// Selects the virtual-hosted form of the bucket endpoint in `build`.
    virtual_hosted_style_request: ConfigValue<bool>,
    /// When true, `build` sets up a session provider and a zonal endpoint.
    s3_express: ConfigValue<bool>,
    /// When true, `build` sets `sign_payload` to false.
    unsigned_payload: ConfigValue<bool>,
    /// Checksum algorithm, stored unparsed until `build` when set from a string.
    checksum_algorithm: Option<ConfigValue<Checksum>>,
    /// Overrides `DEFAULT_METADATA_ENDPOINT` for the instance credential provider.
    metadata_endpoint: Option<String>,
    /// Relative URI appended to `http://169.254.170.2` for the task credential provider.
    container_credentials_relative_uri: Option<String>,
    /// HTTP client options used for every client created in `build`.
    client_options: ClientOptions,
    /// Explicit credential provider; takes precedence over every other source.
    credentials: Option<AwsCredentialProvider>,
    /// Copied into the client config as `skip_signature`.
    skip_signature: ConfigValue<bool>,
    /// Copy-if-not-exists behaviour, parsed in `build`.
    copy_if_not_exists: Option<ConfigValue<S3CopyIfNotExists>>,
    /// Conditional put behaviour, parsed in `build`.
    conditional_put: Option<ConfigValue<S3ConditionalPut>>,
    /// Copied into the client config as `disable_tagging`.
    disable_tagging: ConfigValue<bool>,
    /// Server-side encryption type; encryption headers are only built when set.
    encryption_type: Option<ConfigValue<S3EncryptionType>>,
    /// KMS key id emitted as an encryption header.
    encryption_kms_key_id: Option<String>,
    /// Bucket-key flag emitted as an encryption header.
    encryption_bucket_key_enabled: Option<ConfigValue<bool>>,
}
170
/// Configuration keys understood by [`AmazonS3Builder`].
///
/// Each key has a canonical string form (see the `AsRef<str>` impl) and may
/// be parsed from that form or from the aliases listed on each variant
/// (see the `FromStr` impl).
#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy, Serialize, Deserialize)]
#[non_exhaustive]
pub enum AmazonS3ConfigKey {
    /// Access key id. Parsed from `aws_access_key_id` or `access_key_id`.
    AccessKeyId,

    /// Secret access key. Parsed from `aws_secret_access_key` or `secret_access_key`.
    SecretAccessKey,

    /// Region. Parsed from `aws_region` or `region`.
    Region,

    /// Default region, applied only when no region has been set yet.
    /// Parsed from `aws_default_region` or `default_region`.
    DefaultRegion,

    /// Bucket name. Parsed from `aws_bucket`, `aws_bucket_name`, `bucket_name`
    /// or `bucket`.
    Bucket,

    /// Custom endpoint. Parsed from `aws_endpoint_url`, `aws_endpoint`,
    /// `endpoint_url` or `endpoint`.
    Endpoint,

    /// Session token. Parsed from `aws_session_token`, `aws_token`,
    /// `session_token` or `token`.
    Token,

    /// IMDSv1 fallback flag. Parsed from `aws_imdsv1_fallback` or `imdsv1_fallback`.
    ImdsV1Fallback,

    /// Virtual-hosted-style request flag. Parsed from
    /// `aws_virtual_hosted_style_request` or `virtual_hosted_style_request`.
    VirtualHostedStyleRequest,

    /// Unsigned payload flag. Parsed from `aws_unsigned_payload` or `unsigned_payload`.
    UnsignedPayload,

    /// Checksum algorithm. Parsed from `aws_checksum_algorithm` or `checksum_algorithm`.
    Checksum,

    /// Instance metadata endpoint. Parsed from `aws_metadata_endpoint` or
    /// `metadata_endpoint`.
    MetadataEndpoint,

    /// Container credentials relative URI. Parsed from
    /// `aws_container_credentials_relative_uri` only.
    ContainerCredentialsRelativeUri,

    /// Copy-if-not-exists behaviour. Parsed from `aws_copy_if_not_exists` or
    /// `copy_if_not_exists`.
    CopyIfNotExists,

    /// Conditional put behaviour. Parsed from `aws_conditional_put` or
    /// `conditional_put`.
    ConditionalPut,

    /// Skip-signature flag. Parsed from `aws_skip_signature` or `skip_signature`.
    SkipSignature,

    /// Disable-tagging flag. Parsed from `aws_disable_tagging` or `disable_tagging`.
    DisableTagging,

    /// S3 Express flag. Parsed from `aws_s3_express` or `s3_express`.
    S3Express,

    /// A key forwarded to the HTTP client options.
    Client(ClientConfigKey),

    /// A key controlling server-side encryption.
    Encryption(S3EncryptionConfigKey),
}
335
336impl AsRef<str> for AmazonS3ConfigKey {
337 fn as_ref(&self) -> &str {
338 match self {
339 Self::AccessKeyId => "aws_access_key_id",
340 Self::SecretAccessKey => "aws_secret_access_key",
341 Self::Region => "aws_region",
342 Self::Bucket => "aws_bucket",
343 Self::Endpoint => "aws_endpoint",
344 Self::Token => "aws_session_token",
345 Self::ImdsV1Fallback => "aws_imdsv1_fallback",
346 Self::VirtualHostedStyleRequest => "aws_virtual_hosted_style_request",
347 Self::S3Express => "aws_s3_express",
348 Self::DefaultRegion => "aws_default_region",
349 Self::MetadataEndpoint => "aws_metadata_endpoint",
350 Self::UnsignedPayload => "aws_unsigned_payload",
351 Self::Checksum => "aws_checksum_algorithm",
352 Self::ContainerCredentialsRelativeUri => "aws_container_credentials_relative_uri",
353 Self::SkipSignature => "aws_skip_signature",
354 Self::CopyIfNotExists => "aws_copy_if_not_exists",
355 Self::ConditionalPut => "aws_conditional_put",
356 Self::DisableTagging => "aws_disable_tagging",
357 Self::Client(opt) => opt.as_ref(),
358 Self::Encryption(opt) => opt.as_ref(),
359 }
360 }
361}
362
363impl FromStr for AmazonS3ConfigKey {
364 type Err = crate::Error;
365
366 fn from_str(s: &str) -> Result<Self, Self::Err> {
367 match s {
368 "aws_access_key_id" | "access_key_id" => Ok(Self::AccessKeyId),
369 "aws_secret_access_key" | "secret_access_key" => Ok(Self::SecretAccessKey),
370 "aws_default_region" | "default_region" => Ok(Self::DefaultRegion),
371 "aws_region" | "region" => Ok(Self::Region),
372 "aws_bucket" | "aws_bucket_name" | "bucket_name" | "bucket" => Ok(Self::Bucket),
373 "aws_endpoint_url" | "aws_endpoint" | "endpoint_url" | "endpoint" => Ok(Self::Endpoint),
374 "aws_session_token" | "aws_token" | "session_token" | "token" => Ok(Self::Token),
375 "aws_virtual_hosted_style_request" | "virtual_hosted_style_request" => {
376 Ok(Self::VirtualHostedStyleRequest)
377 }
378 "aws_s3_express" | "s3_express" => Ok(Self::S3Express),
379 "aws_imdsv1_fallback" | "imdsv1_fallback" => Ok(Self::ImdsV1Fallback),
380 "aws_metadata_endpoint" | "metadata_endpoint" => Ok(Self::MetadataEndpoint),
381 "aws_unsigned_payload" | "unsigned_payload" => Ok(Self::UnsignedPayload),
382 "aws_checksum_algorithm" | "checksum_algorithm" => Ok(Self::Checksum),
383 "aws_container_credentials_relative_uri" => Ok(Self::ContainerCredentialsRelativeUri),
384 "aws_skip_signature" | "skip_signature" => Ok(Self::SkipSignature),
385 "aws_copy_if_not_exists" | "copy_if_not_exists" => Ok(Self::CopyIfNotExists),
386 "aws_conditional_put" | "conditional_put" => Ok(Self::ConditionalPut),
387 "aws_disable_tagging" | "disable_tagging" => Ok(Self::DisableTagging),
388 "aws_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)),
390 "aws_server_side_encryption" => Ok(Self::Encryption(
391 S3EncryptionConfigKey::ServerSideEncryption,
392 )),
393 "aws_sse_kms_key_id" => Ok(Self::Encryption(S3EncryptionConfigKey::KmsKeyId)),
394 "aws_sse_bucket_key_enabled" => {
395 Ok(Self::Encryption(S3EncryptionConfigKey::BucketKeyEnabled))
396 }
397 _ => match s.parse() {
398 Ok(key) => Ok(Self::Client(key)),
399 Err(_) => Err(Error::UnknownConfigurationKey { key: s.into() }.into()),
400 },
401 }
402 }
403}
404
405impl AmazonS3Builder {
406 pub fn new() -> Self {
408 Default::default()
409 }
410
411 pub fn from_env() -> Self {
430 let mut builder: Self = Default::default();
431
432 for (os_key, os_value) in std::env::vars_os() {
433 if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) {
434 if key.starts_with("AWS_") {
435 if let Ok(config_key) = key.to_ascii_lowercase().parse() {
436 builder = builder.with_config(config_key, value);
437 }
438 }
439 }
440 }
441
442 builder
443 }
444
445 pub fn with_url(mut self, url: impl Into<String>) -> Self {
466 self.url = Some(url.into());
467 self
468 }
469
470 pub fn with_config(mut self, key: AmazonS3ConfigKey, value: impl Into<String>) -> Self {
472 match key {
473 AmazonS3ConfigKey::AccessKeyId => self.access_key_id = Some(value.into()),
474 AmazonS3ConfigKey::SecretAccessKey => self.secret_access_key = Some(value.into()),
475 AmazonS3ConfigKey::Region => self.region = Some(value.into()),
476 AmazonS3ConfigKey::Bucket => self.bucket_name = Some(value.into()),
477 AmazonS3ConfigKey::Endpoint => self.endpoint = Some(value.into()),
478 AmazonS3ConfigKey::Token => self.token = Some(value.into()),
479 AmazonS3ConfigKey::ImdsV1Fallback => self.imdsv1_fallback.parse(value),
480 AmazonS3ConfigKey::VirtualHostedStyleRequest => {
481 self.virtual_hosted_style_request.parse(value)
482 }
483 AmazonS3ConfigKey::S3Express => self.s3_express.parse(value),
484 AmazonS3ConfigKey::DefaultRegion => {
485 self.region = self.region.or_else(|| Some(value.into()))
486 }
487 AmazonS3ConfigKey::MetadataEndpoint => self.metadata_endpoint = Some(value.into()),
488 AmazonS3ConfigKey::UnsignedPayload => self.unsigned_payload.parse(value),
489 AmazonS3ConfigKey::Checksum => {
490 self.checksum_algorithm = Some(ConfigValue::Deferred(value.into()))
491 }
492 AmazonS3ConfigKey::ContainerCredentialsRelativeUri => {
493 self.container_credentials_relative_uri = Some(value.into())
494 }
495 AmazonS3ConfigKey::Client(key) => {
496 self.client_options = self.client_options.with_config(key, value)
497 }
498 AmazonS3ConfigKey::SkipSignature => self.skip_signature.parse(value),
499 AmazonS3ConfigKey::DisableTagging => self.disable_tagging.parse(value),
500 AmazonS3ConfigKey::CopyIfNotExists => {
501 self.copy_if_not_exists = Some(ConfigValue::Deferred(value.into()))
502 }
503 AmazonS3ConfigKey::ConditionalPut => {
504 self.conditional_put = Some(ConfigValue::Deferred(value.into()))
505 }
506 AmazonS3ConfigKey::Encryption(key) => match key {
507 S3EncryptionConfigKey::ServerSideEncryption => {
508 self.encryption_type = Some(ConfigValue::Deferred(value.into()))
509 }
510 S3EncryptionConfigKey::KmsKeyId => self.encryption_kms_key_id = Some(value.into()),
511 S3EncryptionConfigKey::BucketKeyEnabled => {
512 self.encryption_bucket_key_enabled = Some(ConfigValue::Deferred(value.into()))
513 }
514 },
515 };
516 self
517 }
518
519 pub fn get_config_value(&self, key: &AmazonS3ConfigKey) -> Option<String> {
531 match key {
532 AmazonS3ConfigKey::AccessKeyId => self.access_key_id.clone(),
533 AmazonS3ConfigKey::SecretAccessKey => self.secret_access_key.clone(),
534 AmazonS3ConfigKey::Region | AmazonS3ConfigKey::DefaultRegion => self.region.clone(),
535 AmazonS3ConfigKey::Bucket => self.bucket_name.clone(),
536 AmazonS3ConfigKey::Endpoint => self.endpoint.clone(),
537 AmazonS3ConfigKey::Token => self.token.clone(),
538 AmazonS3ConfigKey::ImdsV1Fallback => Some(self.imdsv1_fallback.to_string()),
539 AmazonS3ConfigKey::VirtualHostedStyleRequest => {
540 Some(self.virtual_hosted_style_request.to_string())
541 }
542 AmazonS3ConfigKey::S3Express => Some(self.s3_express.to_string()),
543 AmazonS3ConfigKey::MetadataEndpoint => self.metadata_endpoint.clone(),
544 AmazonS3ConfigKey::UnsignedPayload => Some(self.unsigned_payload.to_string()),
545 AmazonS3ConfigKey::Checksum => {
546 self.checksum_algorithm.as_ref().map(ToString::to_string)
547 }
548 AmazonS3ConfigKey::Client(key) => self.client_options.get_config_value(key),
549 AmazonS3ConfigKey::ContainerCredentialsRelativeUri => {
550 self.container_credentials_relative_uri.clone()
551 }
552 AmazonS3ConfigKey::SkipSignature => Some(self.skip_signature.to_string()),
553 AmazonS3ConfigKey::CopyIfNotExists => {
554 self.copy_if_not_exists.as_ref().map(ToString::to_string)
555 }
556 AmazonS3ConfigKey::ConditionalPut => {
557 self.conditional_put.as_ref().map(ToString::to_string)
558 }
559 AmazonS3ConfigKey::DisableTagging => Some(self.disable_tagging.to_string()),
560 AmazonS3ConfigKey::Encryption(key) => match key {
561 S3EncryptionConfigKey::ServerSideEncryption => {
562 self.encryption_type.as_ref().map(ToString::to_string)
563 }
564 S3EncryptionConfigKey::KmsKeyId => self.encryption_kms_key_id.clone(),
565 S3EncryptionConfigKey::BucketKeyEnabled => self
566 .encryption_bucket_key_enabled
567 .as_ref()
568 .map(ToString::to_string),
569 },
570 }
571 }
572
573 fn parse_url(&mut self, url: &str) -> Result<()> {
578 let parsed = Url::parse(url).context(UnableToParseUrlSnafu { url })?;
579 let host = parsed.host_str().context(UrlNotRecognisedSnafu { url })?;
580 match parsed.scheme() {
581 "s3" | "s3a" => self.bucket_name = Some(host.to_string()),
582 "https" => match host.splitn(4, '.').collect_tuple() {
583 Some(("s3", region, "amazonaws", "com")) => {
584 self.region = Some(region.to_string());
585 let bucket = parsed.path_segments().into_iter().flatten().next();
586 if let Some(bucket) = bucket {
587 self.bucket_name = Some(bucket.into());
588 }
589 }
590 Some((bucket, "s3", region, "amazonaws.com")) => {
591 self.bucket_name = Some(bucket.to_string());
592 self.region = Some(region.to_string());
593 self.virtual_hosted_style_request = true.into();
594 }
595 Some((account, "r2", "cloudflarestorage", "com")) => {
596 self.region = Some("auto".to_string());
597 let endpoint = format!("https://{account}.r2.cloudflarestorage.com");
598 self.endpoint = Some(endpoint);
599
600 let bucket = parsed.path_segments().into_iter().flatten().next();
601 if let Some(bucket) = bucket {
602 self.bucket_name = Some(bucket.into());
603 }
604 }
605 _ => return Err(UrlNotRecognisedSnafu { url }.build().into()),
606 },
607 scheme => return Err(UnknownUrlSchemeSnafu { scheme }.build().into()),
608 };
609 Ok(())
610 }
611
612 pub fn with_access_key_id(mut self, access_key_id: impl Into<String>) -> Self {
614 self.access_key_id = Some(access_key_id.into());
615 self
616 }
617
618 pub fn with_secret_access_key(mut self, secret_access_key: impl Into<String>) -> Self {
620 self.secret_access_key = Some(secret_access_key.into());
621 self
622 }
623
624 pub fn with_token(mut self, token: impl Into<String>) -> Self {
626 self.token = Some(token.into());
627 self
628 }
629
630 pub fn with_region(mut self, region: impl Into<String>) -> Self {
632 self.region = Some(region.into());
633 self
634 }
635
636 pub fn with_bucket_name(mut self, bucket_name: impl Into<String>) -> Self {
638 self.bucket_name = Some(bucket_name.into());
639 self
640 }
641
642 pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
656 self.endpoint = Some(endpoint.into());
657 self
658 }
659
660 pub fn with_credentials(mut self, credentials: AwsCredentialProvider) -> Self {
662 self.credentials = Some(credentials);
663 self
664 }
665
666 pub fn with_allow_http(mut self, allow_http: bool) -> Self {
670 self.client_options = self.client_options.with_allow_http(allow_http);
671 self
672 }
673
674 pub fn with_virtual_hosted_style_request(mut self, virtual_hosted_style_request: bool) -> Self {
685 self.virtual_hosted_style_request = virtual_hosted_style_request.into();
686 self
687 }
688
689 pub fn with_s3_express(mut self, s3_express: bool) -> Self {
691 self.s3_express = s3_express.into();
692 self
693 }
694
695 pub fn with_retry(mut self, retry_config: RetryConfig) -> Self {
697 self.retry_config = retry_config;
698 self
699 }
700
701 pub fn with_imdsv1_fallback(mut self) -> Self {
714 self.imdsv1_fallback = true.into();
715 self
716 }
717
718 pub fn with_unsigned_payload(mut self, unsigned_payload: bool) -> Self {
723 self.unsigned_payload = unsigned_payload.into();
724 self
725 }
726
727 pub fn with_skip_signature(mut self, skip_signature: bool) -> Self {
731 self.skip_signature = skip_signature.into();
732 self
733 }
734
735 pub fn with_checksum_algorithm(mut self, checksum_algorithm: Checksum) -> Self {
739 self.checksum_algorithm = Some(checksum_algorithm.into());
741 self
742 }
743
744 pub fn with_metadata_endpoint(mut self, endpoint: impl Into<String>) -> Self {
750 self.metadata_endpoint = Some(endpoint.into());
751 self
752 }
753
754 pub fn with_proxy_url(mut self, proxy_url: impl Into<String>) -> Self {
756 self.client_options = self.client_options.with_proxy_url(proxy_url);
757 self
758 }
759
760 pub fn with_proxy_ca_certificate(mut self, proxy_ca_certificate: impl Into<String>) -> Self {
762 self.client_options = self
763 .client_options
764 .with_proxy_ca_certificate(proxy_ca_certificate);
765 self
766 }
767
768 pub fn with_proxy_excludes(mut self, proxy_excludes: impl Into<String>) -> Self {
770 self.client_options = self.client_options.with_proxy_excludes(proxy_excludes);
771 self
772 }
773
774 pub fn with_client_options(mut self, options: ClientOptions) -> Self {
776 self.client_options = options;
777 self
778 }
779
780 pub fn with_copy_if_not_exists(mut self, config: S3CopyIfNotExists) -> Self {
782 self.copy_if_not_exists = Some(config.into());
783 self
784 }
785
786 pub fn with_conditional_put(mut self, config: S3ConditionalPut) -> Self {
788 self.conditional_put = Some(config.into());
789 self
790 }
791
792 pub fn with_disable_tagging(mut self, ignore: bool) -> Self {
794 self.disable_tagging = ignore.into();
795 self
796 }
797
798 pub fn with_sse_kms_encryption(mut self, kms_key_id: impl Into<String>) -> Self {
800 self.encryption_type = Some(ConfigValue::Parsed(S3EncryptionType::SseKms));
801 if let Some(kms_key_id) = kms_key_id.into().into() {
802 self.encryption_kms_key_id = Some(kms_key_id);
803 }
804 self
805 }
806
807 pub fn with_dsse_kms_encryption(mut self, kms_key_id: impl Into<String>) -> Self {
809 self.encryption_type = Some(ConfigValue::Parsed(S3EncryptionType::DsseKms));
810 if let Some(kms_key_id) = kms_key_id.into().into() {
811 self.encryption_kms_key_id = Some(kms_key_id);
812 }
813 self
814 }
815
816 pub fn with_bucket_key(mut self, enabled: bool) -> Self {
823 self.encryption_bucket_key_enabled = Some(ConfigValue::Parsed(enabled));
824 self
825 }
826
827 pub fn build(mut self) -> Result<AmazonS3> {
830 if let Some(url) = self.url.take() {
831 self.parse_url(&url)?;
832 }
833
834 let bucket = self.bucket_name.context(MissingBucketNameSnafu)?;
835 let region = self.region.unwrap_or_else(|| "us-east-1".to_string());
836 let checksum = self.checksum_algorithm.map(|x| x.get()).transpose()?;
837 let copy_if_not_exists = self.copy_if_not_exists.map(|x| x.get()).transpose()?;
838 let put_precondition = self.conditional_put.map(|x| x.get()).transpose()?;
839
840 let credentials = if let Some(credentials) = self.credentials {
841 credentials
842 } else if self.access_key_id.is_some() || self.secret_access_key.is_some() {
843 match (self.access_key_id, self.secret_access_key, self.token) {
844 (Some(key_id), Some(secret_key), token) => {
845 info!("Using Static credential provider");
846 let credential = AwsCredential {
847 key_id,
848 secret_key,
849 token,
850 };
851 Arc::new(StaticCredentialProvider::new(credential)) as _
852 }
853 (None, Some(_), _) => return Err(Error::MissingAccessKeyId.into()),
854 (Some(_), None, _) => return Err(Error::MissingSecretAccessKey.into()),
855 (None, None, _) => unreachable!(),
856 }
857 } else if let (Ok(token_path), Ok(role_arn)) = (
858 std::env::var("AWS_WEB_IDENTITY_TOKEN_FILE"),
859 std::env::var("AWS_ROLE_ARN"),
860 ) {
861 info!("Using WebIdentity credential provider");
863
864 let session_name = std::env::var("AWS_ROLE_SESSION_NAME")
865 .unwrap_or_else(|_| "WebIdentitySession".to_string());
866
867 let endpoint = format!("https://sts.{region}.amazonaws.com");
868
869 let client = self
871 .client_options
872 .clone()
873 .with_allow_http(false)
874 .client()?;
875
876 let token = WebIdentityProvider {
877 token_path,
878 session_name,
879 role_arn,
880 endpoint,
881 };
882
883 Arc::new(TokenCredentialProvider::new(
884 token,
885 client,
886 self.retry_config.clone(),
887 )) as _
888 } else if let Some(uri) = self.container_credentials_relative_uri {
889 info!("Using Task credential provider");
890 Arc::new(TaskCredentialProvider {
891 url: format!("http://169.254.170.2{uri}"),
892 retry: self.retry_config.clone(),
893 client: self.client_options.clone().with_allow_http(true).client()?,
895 cache: Default::default(),
896 }) as _
897 } else {
898 info!("Using Instance credential provider");
899
900 let token = InstanceCredentialProvider {
901 imdsv1_fallback: self.imdsv1_fallback.get()?,
902 metadata_endpoint: self
903 .metadata_endpoint
904 .unwrap_or_else(|| DEFAULT_METADATA_ENDPOINT.into()),
905 };
906
907 Arc::new(TokenCredentialProvider::new(
908 token,
909 self.client_options.metadata_client()?,
910 self.retry_config.clone(),
911 )) as _
912 };
913
914 let (session_provider, zonal_endpoint) = match self.s3_express.get()? {
915 true => {
916 let zone = parse_bucket_az(&bucket).context(ZoneSuffixSnafu { bucket: &bucket })?;
917
918 let endpoint = format!("https://{bucket}.s3express-{zone}.{region}.amazonaws.com");
920
921 let session = Arc::new(
922 TokenCredentialProvider::new(
923 SessionProvider {
924 endpoint: endpoint.clone(),
925 region: region.clone(),
926 credentials: Arc::clone(&credentials),
927 },
928 self.client_options.client()?,
929 self.retry_config.clone(),
930 )
931 .with_min_ttl(Duration::from_secs(60)), );
933 (Some(session as _), Some(endpoint))
934 }
935 false => (None, None),
936 };
937
938 let virtual_hosted = self.virtual_hosted_style_request.get()?;
941 let bucket_endpoint = match (&self.endpoint, zonal_endpoint, virtual_hosted) {
942 (Some(endpoint), _, true) => endpoint.clone(),
943 (Some(endpoint), _, false) => format!("{endpoint}/{bucket}"),
944 (None, Some(endpoint), _) => endpoint,
945 (None, None, true) => format!("https://{bucket}.s3.{region}.amazonaws.com"),
946 (None, None, false) => format!("https://s3.{region}.amazonaws.com/{bucket}"),
947 };
948
949 let encryption_headers = if let Some(encryption_type) = self.encryption_type {
950 S3EncryptionHeaders::try_new(
951 &encryption_type.get()?,
952 self.encryption_kms_key_id,
953 self.encryption_bucket_key_enabled
954 .map(|val| val.get())
955 .transpose()?,
956 )?
957 } else {
958 S3EncryptionHeaders::default()
959 };
960
961 let config = S3Config {
962 region,
963 endpoint: self.endpoint,
964 bucket,
965 bucket_endpoint,
966 credentials,
967 session_provider,
968 retry_config: self.retry_config,
969 client_options: self.client_options,
970 sign_payload: !self.unsigned_payload.get()?,
971 skip_signature: self.skip_signature.get()?,
972 disable_tagging: self.disable_tagging.get()?,
973 checksum,
974 copy_if_not_exists,
975 conditional_put: put_precondition,
976 encryption_headers,
977 };
978
979 let client = Arc::new(S3Client::new(config)?);
980
981 Ok(AmazonS3 { client })
982 }
983}
984
/// Extracts the zone id from a bucket name of the form
/// `<base>--<zone>--x-s3`.
///
/// Returns `None` when the name lacks the `--x-s3` suffix or when the
/// remainder contains no `--` separator.
fn parse_bucket_az(bucket: &str) -> Option<&str> {
    let without_suffix = bucket.strip_suffix("--x-s3")?;
    let (_base, zone) = without_suffix.rsplit_once("--")?;
    Some(zone)
}
991
/// Configuration keys that control server-side encryption.
///
/// Wrapped by `AmazonS3ConfigKey::Encryption`.
#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy, Serialize, Deserialize)]
#[non_exhaustive]
pub enum S3EncryptionConfigKey {
    /// Encryption type; canonical name `aws_server_side_encryption`.
    ServerSideEncryption,
    /// KMS key id; canonical name `aws_sse_kms_key_id`.
    KmsKeyId,
    /// Bucket-key flag; canonical name `aws_sse_bucket_key_enabled`.
    BucketKeyEnabled,
}
1014
1015impl AsRef<str> for S3EncryptionConfigKey {
1016 fn as_ref(&self) -> &str {
1017 match self {
1018 Self::ServerSideEncryption => "aws_server_side_encryption",
1019 Self::KmsKeyId => "aws_sse_kms_key_id",
1020 Self::BucketKeyEnabled => "aws_sse_bucket_key_enabled",
1021 }
1022 }
1023}
1024
/// Server-side encryption mode, sent as the value of the
/// `x-amz-server-side-encryption` header.
#[derive(Debug, Clone)]
enum S3EncryptionType {
    /// Header value `AES256`.
    S3,
    /// Header value `aws:kms`.
    SseKms,
    /// Header value `aws:kms:dsse`.
    DsseKms,
}
1031
1032impl crate::config::Parse for S3EncryptionType {
1033 fn parse(s: &str) -> Result<Self> {
1034 match s {
1035 "AES256" => Ok(Self::S3),
1036 "aws:kms" => Ok(Self::SseKms),
1037 "aws:kms:dsse" => Ok(Self::DsseKms),
1038 _ => Err(Error::InvalidEncryptionType { passed: s.into() }.into()),
1039 }
1040 }
1041}
1042
1043impl From<&S3EncryptionType> for &'static str {
1044 fn from(value: &S3EncryptionType) -> Self {
1045 match value {
1046 S3EncryptionType::S3 => "AES256",
1047 S3EncryptionType::SseKms => "aws:kms",
1048 S3EncryptionType::DsseKms => "aws:kms:dsse",
1049 }
1050 }
1051}
1052
1053impl std::fmt::Display for S3EncryptionType {
1054 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1055 f.write_str(self.into())
1056 }
1057}
1058
/// Header map holding the server-side encryption headers derived from the
/// builder's encryption settings.
///
/// The default value is an empty map, which `build` uses when no encryption
/// type is configured.
#[derive(Default, Clone, Debug)]
pub struct S3EncryptionHeaders(HeaderMap);
1066
1067impl S3EncryptionHeaders {
1068 fn try_new(
1069 encryption_type: &S3EncryptionType,
1070 key_id: Option<String>,
1071 bucket_key_enabled: Option<bool>,
1072 ) -> Result<Self> {
1073 let mut headers = HeaderMap::new();
1074 headers.insert(
1077 "x-amz-server-side-encryption",
1078 HeaderValue::from_static(encryption_type.into()),
1079 );
1080 if let Some(key_id) = key_id {
1081 headers.insert(
1082 "x-amz-server-side-encryption-aws-kms-key-id",
1083 key_id
1084 .try_into()
1085 .map_err(|err| Error::InvalidEncryptionHeader {
1086 header: "kms-key-id",
1087 source: Box::new(err),
1088 })?,
1089 );
1090 }
1091 if let Some(bucket_key_enabled) = bucket_key_enabled {
1092 headers.insert(
1093 "x-amz-server-side-encryption-bucket-key-enabled",
1094 HeaderValue::from_static(if bucket_key_enabled { "true" } else { "false" }),
1095 );
1096 }
1097 Ok(Self(headers))
1098 }
1099}
1100
1101impl From<S3EncryptionHeaders> for HeaderMap {
1102 fn from(headers: S3EncryptionHeaders) -> Self {
1103 headers.0
1104 }
1105}
1106
1107#[cfg(test)]
1108mod tests {
1109 use super::*;
1110 use std::collections::HashMap;
1111
1112 #[test]
1113 fn s3_test_config_from_map() {
1114 let aws_access_key_id = "object_store:fake_access_key_id".to_string();
1115 let aws_secret_access_key = "object_store:fake_secret_key".to_string();
1116 let aws_default_region = "object_store:fake_default_region".to_string();
1117 let aws_endpoint = "object_store:fake_endpoint".to_string();
1118 let aws_session_token = "object_store:fake_session_token".to_string();
1119 let options = HashMap::from([
1120 ("aws_access_key_id", aws_access_key_id.clone()),
1121 ("aws_secret_access_key", aws_secret_access_key),
1122 ("aws_default_region", aws_default_region.clone()),
1123 ("aws_endpoint", aws_endpoint.clone()),
1124 ("aws_session_token", aws_session_token.clone()),
1125 ("aws_unsigned_payload", "true".to_string()),
1126 ("aws_checksum_algorithm", "sha256".to_string()),
1127 ]);
1128
1129 let builder = options
1130 .into_iter()
1131 .fold(AmazonS3Builder::new(), |builder, (key, value)| {
1132 builder.with_config(key.parse().unwrap(), value)
1133 })
1134 .with_config(AmazonS3ConfigKey::SecretAccessKey, "new-secret-key");
1135
1136 assert_eq!(builder.access_key_id.unwrap(), aws_access_key_id.as_str());
1137 assert_eq!(builder.secret_access_key.unwrap(), "new-secret-key");
1138 assert_eq!(builder.region.unwrap(), aws_default_region);
1139 assert_eq!(builder.endpoint.unwrap(), aws_endpoint);
1140 assert_eq!(builder.token.unwrap(), aws_session_token);
1141 assert_eq!(
1142 builder.checksum_algorithm.unwrap().get().unwrap(),
1143 Checksum::SHA256
1144 );
1145 assert!(builder.unsigned_payload.get().unwrap());
1146 }
1147
1148 #[test]
1149 fn s3_test_config_get_value() {
1150 let aws_access_key_id = "object_store:fake_access_key_id".to_string();
1151 let aws_secret_access_key = "object_store:fake_secret_key".to_string();
1152 let aws_default_region = "object_store:fake_default_region".to_string();
1153 let aws_endpoint = "object_store:fake_endpoint".to_string();
1154 let aws_session_token = "object_store:fake_session_token".to_string();
1155
1156 let builder = AmazonS3Builder::new()
1157 .with_config(AmazonS3ConfigKey::AccessKeyId, &aws_access_key_id)
1158 .with_config(AmazonS3ConfigKey::SecretAccessKey, &aws_secret_access_key)
1159 .with_config(AmazonS3ConfigKey::DefaultRegion, &aws_default_region)
1160 .with_config(AmazonS3ConfigKey::Endpoint, &aws_endpoint)
1161 .with_config(AmazonS3ConfigKey::Token, &aws_session_token)
1162 .with_config(AmazonS3ConfigKey::UnsignedPayload, "true")
1163 .with_config("aws_server_side_encryption".parse().unwrap(), "AES256")
1164 .with_config("aws_sse_kms_key_id".parse().unwrap(), "some_key_id")
1165 .with_config("aws_sse_bucket_key_enabled".parse().unwrap(), "true");
1166
1167 assert_eq!(
1168 builder
1169 .get_config_value(&AmazonS3ConfigKey::AccessKeyId)
1170 .unwrap(),
1171 aws_access_key_id
1172 );
1173 assert_eq!(
1174 builder
1175 .get_config_value(&AmazonS3ConfigKey::SecretAccessKey)
1176 .unwrap(),
1177 aws_secret_access_key
1178 );
1179 assert_eq!(
1180 builder
1181 .get_config_value(&AmazonS3ConfigKey::DefaultRegion)
1182 .unwrap(),
1183 aws_default_region
1184 );
1185 assert_eq!(
1186 builder
1187 .get_config_value(&AmazonS3ConfigKey::Endpoint)
1188 .unwrap(),
1189 aws_endpoint
1190 );
1191 assert_eq!(
1192 builder.get_config_value(&AmazonS3ConfigKey::Token).unwrap(),
1193 aws_session_token
1194 );
1195 assert_eq!(
1196 builder
1197 .get_config_value(&AmazonS3ConfigKey::UnsignedPayload)
1198 .unwrap(),
1199 "true"
1200 );
1201 assert_eq!(
1202 builder
1203 .get_config_value(&"aws_server_side_encryption".parse().unwrap())
1204 .unwrap(),
1205 "AES256"
1206 );
1207 assert_eq!(
1208 builder
1209 .get_config_value(&"aws_sse_kms_key_id".parse().unwrap())
1210 .unwrap(),
1211 "some_key_id"
1212 );
1213 assert_eq!(
1214 builder
1215 .get_config_value(&"aws_sse_bucket_key_enabled".parse().unwrap())
1216 .unwrap(),
1217 "true"
1218 );
1219 }
1220
1221 #[test]
1222 fn s3_default_region() {
1223 let builder = AmazonS3Builder::new()
1224 .with_bucket_name("foo")
1225 .build()
1226 .unwrap();
1227 assert_eq!(builder.client.config.region, "us-east-1");
1228 }
1229
1230 #[test]
1231 fn s3_test_urls() {
1232 let mut builder = AmazonS3Builder::new();
1233 builder.parse_url("s3://bucket/path").unwrap();
1234 assert_eq!(builder.bucket_name, Some("bucket".to_string()));
1235
1236 let mut builder = AmazonS3Builder::new();
1237 builder
1238 .parse_url("s3://buckets.can.have.dots/path")
1239 .unwrap();
1240 assert_eq!(
1241 builder.bucket_name,
1242 Some("buckets.can.have.dots".to_string())
1243 );
1244
1245 let mut builder = AmazonS3Builder::new();
1246 builder
1247 .parse_url("https://s3.region.amazonaws.com")
1248 .unwrap();
1249 assert_eq!(builder.region, Some("region".to_string()));
1250
1251 let mut builder = AmazonS3Builder::new();
1252 builder
1253 .parse_url("https://s3.region.amazonaws.com/bucket")
1254 .unwrap();
1255 assert_eq!(builder.region, Some("region".to_string()));
1256 assert_eq!(builder.bucket_name, Some("bucket".to_string()));
1257
1258 let mut builder = AmazonS3Builder::new();
1259 builder
1260 .parse_url("https://s3.region.amazonaws.com/bucket.with.dot/path")
1261 .unwrap();
1262 assert_eq!(builder.region, Some("region".to_string()));
1263 assert_eq!(builder.bucket_name, Some("bucket.with.dot".to_string()));
1264
1265 let mut builder = AmazonS3Builder::new();
1266 builder
1267 .parse_url("https://bucket.s3.region.amazonaws.com")
1268 .unwrap();
1269 assert_eq!(builder.bucket_name, Some("bucket".to_string()));
1270 assert_eq!(builder.region, Some("region".to_string()));
1271 assert!(builder.virtual_hosted_style_request.get().unwrap());
1272
1273 let mut builder = AmazonS3Builder::new();
1274 builder
1275 .parse_url("https://account123.r2.cloudflarestorage.com/bucket-123")
1276 .unwrap();
1277
1278 assert_eq!(builder.bucket_name, Some("bucket-123".to_string()));
1279 assert_eq!(builder.region, Some("auto".to_string()));
1280 assert_eq!(
1281 builder.endpoint,
1282 Some("https://account123.r2.cloudflarestorage.com".to_string())
1283 );
1284
1285 let err_cases = [
1286 "mailto://bucket/path",
1287 "https://s3.bucket.mydomain.com",
1288 "https://s3.bucket.foo.amazonaws.com",
1289 "https://bucket.mydomain.region.amazonaws.com",
1290 "https://bucket.s3.region.bar.amazonaws.com",
1291 "https://bucket.foo.s3.amazonaws.com",
1292 ];
1293 let mut builder = AmazonS3Builder::new();
1294 for case in err_cases {
1295 builder.parse_url(case).unwrap_err();
1296 }
1297 }
1298
1299 #[tokio::test]
1300 async fn s3_test_proxy_url() {
1301 let s3 = AmazonS3Builder::new()
1302 .with_access_key_id("access_key_id")
1303 .with_secret_access_key("secret_access_key")
1304 .with_region("region")
1305 .with_bucket_name("bucket_name")
1306 .with_allow_http(true)
1307 .with_proxy_url("https://example.com")
1308 .build();
1309
1310 assert!(s3.is_ok());
1311
1312 let err = AmazonS3Builder::new()
1313 .with_access_key_id("access_key_id")
1314 .with_secret_access_key("secret_access_key")
1315 .with_region("region")
1316 .with_bucket_name("bucket_name")
1317 .with_allow_http(true)
1318 .with_proxy_url("asdf://example.com")
1319 .build()
1320 .unwrap_err()
1321 .to_string();
1322
1323 assert_eq!("Generic HTTP client error: builder error", err);
1324 }
1325
1326 #[test]
1327 fn test_invalid_config() {
1328 let err = AmazonS3Builder::new()
1329 .with_config(AmazonS3ConfigKey::ImdsV1Fallback, "enabled")
1330 .with_bucket_name("bucket")
1331 .with_region("region")
1332 .build()
1333 .unwrap_err()
1334 .to_string();
1335
1336 assert_eq!(
1337 err,
1338 "Generic Config error: failed to parse \"enabled\" as boolean"
1339 );
1340
1341 let err = AmazonS3Builder::new()
1342 .with_config(AmazonS3ConfigKey::Checksum, "md5")
1343 .with_bucket_name("bucket")
1344 .with_region("region")
1345 .build()
1346 .unwrap_err()
1347 .to_string();
1348
1349 assert_eq!(
1350 err,
1351 "Generic Config error: \"md5\" is not a valid checksum algorithm"
1352 );
1353 }
1354
1355 #[test]
1356 fn test_parse_bucket_az() {
1357 let cases = [
1358 ("bucket-base-name--usw2-az1--x-s3", Some("usw2-az1")),
1359 ("bucket-base--name--azid--x-s3", Some("azid")),
1360 ("bucket-base-name", None),
1361 ("bucket-base-name--x-s3", None),
1362 ];
1363
1364 for (bucket, expected) in cases {
1365 assert_eq!(parse_bucket_az(bucket), expected)
1366 }
1367 }
1368}