1use crate::aws::builder::S3EncryptionHeaders;
19use crate::aws::checksum::Checksum;
20use crate::aws::credential::{AwsCredential, CredentialExt};
21use crate::aws::{
22 AwsAuthorizer, AwsCredentialProvider, S3ConditionalPut, S3CopyIfNotExists, COPY_SOURCE_HEADER,
23 STORE, STRICT_PATH_ENCODE_SET, TAGS_HEADER,
24};
25use crate::client::get::GetClient;
26use crate::client::header::{get_etag, HeaderConfig};
27use crate::client::header::{get_put_result, get_version};
28use crate::client::list::ListClient;
29use crate::client::retry::RetryExt;
30use crate::client::s3::{
31 CompleteMultipartUpload, CompleteMultipartUploadResult, InitiateMultipartUploadResult,
32 ListResponse,
33};
34use crate::client::GetOptionsExt;
35use crate::multipart::PartId;
36use crate::path::DELIMITER;
37use crate::{
38 Attribute, Attributes, ClientOptions, GetOptions, ListResult, MultipartId, Path,
39 PutMultipartOpts, PutPayload, PutResult, Result, RetryConfig, TagSet,
40};
41use async_trait::async_trait;
42use base64::prelude::BASE64_STANDARD;
43use base64::Engine;
44use bytes::{Buf, Bytes};
45use hyper::header::{
46 CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE, CONTENT_LENGTH,
47 CONTENT_TYPE,
48};
49use hyper::http::HeaderName;
50use hyper::{http, HeaderMap};
51use itertools::Itertools;
52use md5::{Digest, Md5};
53use percent_encoding::{utf8_percent_encode, PercentEncode};
54use quick_xml::events::{self as xml_events};
55use reqwest::{Client as ReqwestClient, Method, RequestBuilder, Response};
56use ring::digest;
57use ring::digest::Context;
58use serde::{Deserialize, Serialize};
59use snafu::{ResultExt, Snafu};
60use std::sync::Arc;
61
/// Response header read (via `get_put_result` / `get_version`) to obtain the object version id.
const VERSION_HEADER: &str = "x-amz-version-id";
/// Header carrying the base64-encoded SHA-256 of a request body.
const SHA256_CHECKSUM: &str = "x-amz-checksum-sha256";
/// Prefix prepended to `Attribute::Metadata` keys to form user-defined metadata headers.
const USER_DEFINED_METADATA_HEADER_PREFIX: &str = "x-amz-meta-";
65
/// Errors raised by the S3 client in this module; converted into
/// `crate::Error::Generic` (tagged with `STORE`) by `From<Error> for crate::Error`.
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
pub(crate) enum Error {
    /// Sending the DeleteObjects request failed.
    #[snafu(display("Error performing DeleteObjects request: {}", source))]
    DeleteObjectsRequest { source: crate::client::retry::Error },

    /// The DeleteObjects response reported an `<Error>` element for `path`.
    #[snafu(display(
        "DeleteObjects request failed for key {}: {} (code: {})",
        path,
        message,
        code
    ))]
    DeleteFailed {
        path: String,
        code: String,
        message: String,
    },

    /// Reading the DeleteObjects response body failed.
    #[snafu(display("Error getting DeleteObjects response body: {}", source))]
    DeleteObjectsResponse { source: reqwest::Error },

    /// The DeleteObjects response body could not be interpreted.
    #[snafu(display("Got invalid DeleteObjects response: {}", source))]
    InvalidDeleteObjectsResponse {
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },

    /// Sending a list request failed.
    #[snafu(display("Error performing list request: {}", source))]
    ListRequest { source: crate::client::retry::Error },

    /// Reading a list response body failed.
    #[snafu(display("Error getting list response body: {}", source))]
    ListResponseBody { source: reqwest::Error },

    /// Reading the CreateMultipartUpload response body failed.
    #[snafu(display("Error getting create multipart response body: {}", source))]
    CreateMultipartResponseBody { source: reqwest::Error },

    /// Sending the CompleteMultipartUpload request failed.
    #[snafu(display("Error performing complete multipart request: {}", source))]
    CompleteMultipartRequest { source: crate::client::retry::Error },

    /// Reading the CompleteMultipartUpload response body failed.
    #[snafu(display("Error getting complete multipart response body: {}", source))]
    CompleteMultipartResponseBody { source: reqwest::Error },

    /// A list response body was not valid XML for `ListResponse`.
    #[snafu(display("Got invalid list response: {}", source))]
    InvalidListResponse { source: quick_xml::de::DeError },

    /// A multipart (create/complete) response body could not be deserialized.
    #[snafu(display("Got invalid multipart response: {}", source))]
    InvalidMultipartResponse { source: quick_xml::de::DeError },

    /// Required response headers (e-tag, version, ...) were missing or malformed.
    #[snafu(display("Unable to extract metadata from headers: {}", source))]
    Metadata {
        source: crate::client::header::Error,
    },
}
119
120impl From<Error> for crate::Error {
121 fn from(err: Error) -> Self {
122 Self::Generic {
123 store: STORE,
124 source: Box::new(err),
125 }
126 }
127}
128
/// Body of a DeleteObjects response: the `<DeleteResult>` root element.
#[derive(Deserialize)]
#[serde(rename_all = "PascalCase", rename = "DeleteResult")]
struct BatchDeleteResponse {
    // quick_xml's `$value` collects the child elements; each is decoded into a
    // `DeleteObjectResult` variant chosen by element name.
    #[serde(rename = "$value")]
    content: Vec<DeleteObjectResult>,
}

/// One child of `<DeleteResult>`: a `<Deleted>` or an `<Error>` element.
#[derive(Deserialize)]
enum DeleteObjectResult {
    // Successful deletions are only matched, never inspected, so the payload is unused.
    #[allow(unused)]
    Deleted(DeletedObject),
    Error(DeleteError),
}

/// A `<Deleted>` element; only deserialized so the variant can be recognised.
#[derive(Deserialize)]
#[serde(rename_all = "PascalCase", rename = "Deleted")]
struct DeletedObject {
    #[allow(dead_code)]
    key: String,
}

/// An `<Error>` element reporting why deleting `key` failed.
#[derive(Deserialize)]
#[serde(rename_all = "PascalCase", rename = "Error")]
struct DeleteError {
    key: String,
    code: String,
    message: String,
}
157
158impl From<DeleteError> for Error {
159 fn from(err: DeleteError) -> Self {
160 Self::DeleteFailed {
161 path: err.key,
162 code: err.code,
163 message: err.message,
164 }
165 }
166}
167
/// Configuration shared by every request the S3 client issues.
#[derive(Debug)]
pub struct S3Config {
    /// Region passed to the SigV4 authorizer.
    pub region: String,
    /// Optional custom endpoint. Not read in this module.
    /// NOTE(review): presumably used upstream to derive `bucket_endpoint` — confirm.
    pub endpoint: Option<String>,
    /// Bucket name; forms the prefix of the copy-source header value in `copy_request`.
    pub bucket: String,
    /// Base URL for bucket requests; object URLs are `{bucket_endpoint}/{encoded path}`.
    pub bucket_endpoint: String,
    /// Provider of the base (non-session) credentials.
    pub credentials: AwsCredentialProvider,
    /// Optional session-credential provider. When set, `is_s3_express` is true and
    /// session-signed requests configure the `x-amz-s3session-token` header.
    pub session_provider: Option<AwsCredentialProvider>,
    /// Retry policy applied to every request.
    pub retry_config: RetryConfig,
    /// HTTP client options; also used to infer a default `Content-Type` from the path.
    pub client_options: ClientOptions,
    /// Forwarded to `AwsAuthorizer::with_sign_payload`.
    pub sign_payload: bool,
    /// When true, no credentials are fetched and the authorizer is `None`.
    pub skip_signature: bool,
    /// When true, `Request::with_tags` never sends the tagging header.
    pub disable_tagging: bool,
    /// Optional checksum; `Checksum::SHA256` adds `x-amz-checksum-sha256` to payload requests.
    pub checksum: Option<Checksum>,
    /// Not read in this module. NOTE(review): copy-if-not-exists strategy; confirm at use site.
    pub copy_if_not_exists: Option<S3CopyIfNotExists>,
    /// Not read in this module. NOTE(review): conditional-put strategy; confirm at use site.
    pub conditional_put: Option<S3ConditionalPut>,
    /// Encryption headers attached by `Request::with_encryption_headers` and `copy_request`.
    pub encryption_headers: S3EncryptionHeaders,
}
186
187impl S3Config {
188 pub(crate) fn path_url(&self, path: &Path) -> String {
189 format!("{}/{}", self.bucket_endpoint, encode_path(path))
190 }
191
192 async fn get_session_credential(&self) -> Result<SessionCredential<'_>> {
193 let credential = match self.skip_signature {
194 false => {
195 let provider = self.session_provider.as_ref().unwrap_or(&self.credentials);
196 Some(provider.get_credential().await?)
197 }
198 true => None,
199 };
200
201 Ok(SessionCredential {
202 credential,
203 session_token: self.session_provider.is_some(),
204 config: self,
205 })
206 }
207
208 pub(crate) async fn get_credential(&self) -> Result<Option<Arc<AwsCredential>>> {
209 Ok(match self.skip_signature {
210 false => Some(self.credentials.get_credential().await?),
211 true => None,
212 })
213 }
214
215 #[inline]
216 pub(crate) fn is_s3_express(&self) -> bool {
217 self.session_provider.is_some()
218 }
219}
220
/// Credentials resolved for one request, plus whether they must be presented as
/// an S3 session token.
struct SessionCredential<'a> {
    /// `None` when the config skips signatures.
    credential: Option<Arc<AwsCredential>>,
    /// When true, the authorizer is configured with the `x-amz-s3session-token` header.
    session_token: bool,
    config: &'a S3Config,
}
226
227impl<'a> SessionCredential<'a> {
228 fn authorizer(&self) -> Option<AwsAuthorizer<'_>> {
229 let mut authorizer =
230 AwsAuthorizer::new(self.credential.as_deref()?, "s3", &self.config.region)
231 .with_sign_payload(self.config.sign_payload);
232
233 if self.session_token {
234 let token = HeaderName::from_static("x-amz-s3session-token");
235 authorizer = authorizer.with_token_header(token)
236 }
237
238 Some(authorizer)
239 }
240}
241
/// Error returned by `Request::send`.
#[derive(Debug, Snafu)]
pub enum RequestError {
    // Failure outside the HTTP exchange (e.g. resolving credentials). `context(false)`
    // lets `?` convert a `crate::Error` into this variant directly.
    // (Plain `//` comments on purpose: snafu would turn variant doc comments into Display.)
    #[snafu(context(false))]
    Generic { source: crate::Error },
    // The retrying HTTP send failed; `path` is the object path the request targeted,
    // attached when converting into `crate::Error`.
    Retry {
        source: crate::client::retry::Error,
        path: String,
    },
}
251
252impl From<RequestError> for crate::Error {
253 fn from(value: RequestError) -> Self {
254 match value {
255 RequestError::Generic { source } => source,
256 RequestError::Retry { source, path } => source.error(STORE, path),
257 }
258 }
259}
260
/// Builder for a single request against one object path, finished with `send`/`do_put`.
pub(crate) struct Request<'a> {
    // Object path: used to infer a default content type and to label errors.
    path: &'a Path,
    config: &'a S3Config,
    builder: RequestBuilder,
    // SHA-256 of `payload`, set by `with_payload` when signing or a checksum needs it.
    payload_sha256: Option<digest::Digest>,
    payload: Option<PutPayload>,
    // When true, `send` signs via `S3Config::get_session_credential`; otherwise it uses
    // the base credentials without a session token.
    use_session_creds: bool,
    // Forwarded to the retry layer's `idempotent` setting.
    idempotent: bool,
}
271
272impl<'a> Request<'a> {
273 pub fn query<T: Serialize + ?Sized + Sync>(self, query: &T) -> Self {
274 let builder = self.builder.query(query);
275 Self { builder, ..self }
276 }
277
278 pub fn header<K>(self, k: K, v: &str) -> Self
279 where
280 HeaderName: TryFrom<K>,
281 <HeaderName as TryFrom<K>>::Error: Into<http::Error>,
282 {
283 let builder = self.builder.header(k, v);
284 Self { builder, ..self }
285 }
286
287 pub fn headers(self, headers: HeaderMap) -> Self {
288 let builder = self.builder.headers(headers);
289 Self { builder, ..self }
290 }
291
292 pub fn idempotent(self, idempotent: bool) -> Self {
293 Self { idempotent, ..self }
294 }
295
296 pub fn with_encryption_headers(self) -> Self {
297 let headers = self.config.encryption_headers.clone().into();
298 let builder = self.builder.headers(headers);
299 Self { builder, ..self }
300 }
301
302 pub fn with_session_creds(self, use_session_creds: bool) -> Self {
303 Self {
304 use_session_creds,
305 ..self
306 }
307 }
308
309 pub fn with_tags(mut self, tags: TagSet) -> Self {
310 let tags = tags.encoded();
311 if !tags.is_empty() && !self.config.disable_tagging {
312 self.builder = self.builder.header(&TAGS_HEADER, tags);
313 }
314 self
315 }
316
317 pub fn with_attributes(self, attributes: Attributes) -> Self {
318 let mut has_content_type = false;
319 let mut builder = self.builder;
320 for (k, v) in &attributes {
321 builder = match k {
322 Attribute::CacheControl => builder.header(CACHE_CONTROL, v.as_ref()),
323 Attribute::ContentDisposition => builder.header(CONTENT_DISPOSITION, v.as_ref()),
324 Attribute::ContentEncoding => builder.header(CONTENT_ENCODING, v.as_ref()),
325 Attribute::ContentLanguage => builder.header(CONTENT_LANGUAGE, v.as_ref()),
326 Attribute::ContentType => {
327 has_content_type = true;
328 builder.header(CONTENT_TYPE, v.as_ref())
329 }
330 Attribute::Metadata(k_suffix) => builder.header(
331 &format!("{}{}", USER_DEFINED_METADATA_HEADER_PREFIX, k_suffix),
332 v.as_ref(),
333 ),
334 };
335 }
336
337 if !has_content_type {
338 if let Some(value) = self.config.client_options.get_content_type(self.path) {
339 builder = builder.header(CONTENT_TYPE, value);
340 }
341 }
342 Self { builder, ..self }
343 }
344
345 pub fn with_payload(mut self, payload: PutPayload) -> Self {
346 if !self.config.skip_signature || self.config.checksum.is_some() {
347 let mut sha256 = Context::new(&digest::SHA256);
348 payload.iter().for_each(|x| sha256.update(x));
349 let payload_sha256 = sha256.finish();
350
351 if let Some(Checksum::SHA256) = self.config.checksum {
352 self.builder = self.builder.header(
353 "x-amz-checksum-sha256",
354 BASE64_STANDARD.encode(payload_sha256),
355 );
356 }
357 self.payload_sha256 = Some(payload_sha256);
358 }
359
360 let content_length = payload.content_length();
361 self.builder = self.builder.header(CONTENT_LENGTH, content_length);
362 self.payload = Some(payload);
363 self
364 }
365
366 pub async fn send(self) -> Result<Response, RequestError> {
367 let credential = match self.use_session_creds {
368 true => self.config.get_session_credential().await?,
369 false => SessionCredential {
370 credential: self.config.get_credential().await?,
371 session_token: false,
372 config: self.config,
373 },
374 };
375
376 let sha = self.payload_sha256.as_ref().map(|x| x.as_ref());
377
378 let path = self.path.as_ref();
379 self.builder
380 .with_aws_sigv4(credential.authorizer(), sha)
381 .retryable(&self.config.retry_config)
382 .idempotent(self.idempotent)
383 .payload(self.payload)
384 .send()
385 .await
386 .context(RetrySnafu { path })
387 }
388
389 pub async fn do_put(self) -> Result<PutResult> {
390 let response = self.send().await?;
391 Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?)
392 }
393}
394
/// S3 client: the store configuration plus the HTTP client built from its options.
#[derive(Debug)]
pub(crate) struct S3Client {
    pub config: S3Config,
    pub client: ReqwestClient,
}
400
401impl S3Client {
402 pub fn new(config: S3Config) -> Result<Self> {
403 let client = config.client_options.client()?;
404 Ok(Self { config, client })
405 }
406
407 pub fn request<'a>(&'a self, method: Method, path: &'a Path) -> Request<'a> {
408 let url = self.config.path_url(path);
409 Request {
410 path,
411 builder: self.client.request(method, url),
412 payload: None,
413 payload_sha256: None,
414 config: &self.config,
415 use_session_creds: true,
416 idempotent: false,
417 }
418 }
419
420 pub async fn bulk_delete_request(&self, paths: Vec<Path>) -> Result<Vec<Result<Path>>> {
428 if paths.is_empty() {
429 return Ok(Vec::new());
430 }
431
432 let credential = self.config.get_session_credential().await?;
433 let url = format!("{}?delete", self.config.bucket_endpoint);
434
435 let mut buffer = Vec::new();
436 let mut writer = quick_xml::Writer::new(&mut buffer);
437 writer
438 .write_event(xml_events::Event::Start(
439 xml_events::BytesStart::new("Delete")
440 .with_attributes([("xmlns", "http://s3.amazonaws.com/doc/2006-03-01/")]),
441 ))
442 .unwrap();
443 for path in &paths {
444 writer
446 .write_event(xml_events::Event::Start(xml_events::BytesStart::new(
447 "Object",
448 )))
449 .unwrap();
450 writer
451 .write_event(xml_events::Event::Start(xml_events::BytesStart::new("Key")))
452 .unwrap();
453 writer
454 .write_event(xml_events::Event::Text(xml_events::BytesText::new(
455 path.as_ref(),
456 )))
457 .map_err(|err| crate::Error::Generic {
458 store: STORE,
459 source: Box::new(err),
460 })?;
461 writer
462 .write_event(xml_events::Event::End(xml_events::BytesEnd::new("Key")))
463 .unwrap();
464 writer
465 .write_event(xml_events::Event::End(xml_events::BytesEnd::new("Object")))
466 .unwrap();
467 }
468 writer
469 .write_event(xml_events::Event::End(xml_events::BytesEnd::new("Delete")))
470 .unwrap();
471
472 let body = Bytes::from(buffer);
473
474 let mut builder = self.client.request(Method::POST, url);
475
476 let digest = digest::digest(&digest::SHA256, &body);
477 builder = builder.header(SHA256_CHECKSUM, BASE64_STANDARD.encode(digest));
478
479 let mut hasher = Md5::new();
484 hasher.update(&body);
485 builder = builder.header("Content-MD5", BASE64_STANDARD.encode(hasher.finalize()));
486
487 let response = builder
488 .header(CONTENT_TYPE, "application/xml")
489 .body(body)
490 .with_aws_sigv4(credential.authorizer(), Some(digest.as_ref()))
491 .send_retry(&self.config.retry_config)
492 .await
493 .context(DeleteObjectsRequestSnafu {})?
494 .bytes()
495 .await
496 .context(DeleteObjectsResponseSnafu {})?;
497
498 let response: BatchDeleteResponse =
499 quick_xml::de::from_reader(response.reader()).map_err(|err| {
500 Error::InvalidDeleteObjectsResponse {
501 source: Box::new(err),
502 }
503 })?;
504
505 let mut results: Vec<Result<Path>> = paths.iter().cloned().map(Ok).collect();
508 for content in response.content.into_iter() {
509 if let DeleteObjectResult::Error(error) = content {
510 let path =
511 Path::parse(&error.key).map_err(|err| Error::InvalidDeleteObjectsResponse {
512 source: Box::new(err),
513 })?;
514 let i = paths.iter().find_position(|&p| p == &path).unwrap().0;
515 results[i] = Err(Error::from(error).into());
516 }
517 }
518
519 Ok(results)
520 }
521
522 pub fn copy_request<'a>(&'a self, from: &Path, to: &'a Path) -> Request<'a> {
524 let source = format!("{}/{}", self.config.bucket, encode_path(from));
525 self.request(Method::PUT, to)
526 .idempotent(true)
527 .header(©_SOURCE_HEADER, &source)
528 .headers(self.config.encryption_headers.clone().into())
529 .with_session_creds(false)
530 }
531
532 pub async fn create_multipart(
533 &self,
534 location: &Path,
535 opts: PutMultipartOpts,
536 ) -> Result<MultipartId> {
537 let response = self
538 .request(Method::POST, location)
539 .query(&[("uploads", "")])
540 .with_encryption_headers()
541 .with_attributes(opts.attributes)
542 .with_tags(opts.tags)
543 .idempotent(true)
544 .send()
545 .await?
546 .bytes()
547 .await
548 .context(CreateMultipartResponseBodySnafu)?;
549
550 let response: InitiateMultipartUploadResult =
551 quick_xml::de::from_reader(response.reader()).context(InvalidMultipartResponseSnafu)?;
552
553 Ok(response.upload_id)
554 }
555
556 pub async fn put_part(
557 &self,
558 path: &Path,
559 upload_id: &MultipartId,
560 part_idx: usize,
561 data: PutPayload,
562 ) -> Result<PartId> {
563 let part = (part_idx + 1).to_string();
564
565 let response = self
566 .request(Method::PUT, path)
567 .with_payload(data)
568 .query(&[("partNumber", &part), ("uploadId", upload_id)])
569 .idempotent(true)
570 .send()
571 .await?;
572
573 let content_id = get_etag(response.headers()).context(MetadataSnafu)?;
574 Ok(PartId { content_id })
575 }
576
577 pub async fn complete_multipart(
578 &self,
579 location: &Path,
580 upload_id: &str,
581 parts: Vec<PartId>,
582 ) -> Result<PutResult> {
583 let parts = if parts.is_empty() {
584 let part = self
587 .put_part(location, &upload_id.to_string(), 0, PutPayload::default())
588 .await?;
589 vec![part]
590 } else {
591 parts
592 };
593 let request = CompleteMultipartUpload::from(parts);
594 let body = quick_xml::se::to_string(&request).unwrap();
595
596 let credential = self.config.get_session_credential().await?;
597 let url = self.config.path_url(location);
598
599 let response = self
600 .client
601 .request(Method::POST, url)
602 .query(&[("uploadId", upload_id)])
603 .body(body)
604 .with_aws_sigv4(credential.authorizer(), None)
605 .retryable(&self.config.retry_config)
606 .idempotent(true)
607 .send()
608 .await
609 .context(CompleteMultipartRequestSnafu)?;
610
611 let version = get_version(response.headers(), VERSION_HEADER).context(MetadataSnafu)?;
612
613 let data = response
614 .bytes()
615 .await
616 .context(CompleteMultipartResponseBodySnafu)?;
617
618 let response: CompleteMultipartUploadResult =
619 quick_xml::de::from_reader(data.reader()).context(InvalidMultipartResponseSnafu)?;
620
621 Ok(PutResult {
622 e_tag: Some(response.e_tag),
623 version,
624 })
625 }
626
627 #[cfg(test)]
628 pub async fn get_object_tagging(&self, path: &Path) -> Result<Response> {
629 let credential = self.config.get_session_credential().await?;
630 let url = format!("{}?tagging", self.config.path_url(path));
631 let response = self
632 .client
633 .request(Method::GET, url)
634 .with_aws_sigv4(credential.authorizer(), None)
635 .send_retry(&self.config.retry_config)
636 .await
637 .map_err(|e| e.error(STORE, path.to_string()))?;
638 Ok(response)
639 }
640}
641
642#[async_trait]
643impl GetClient for S3Client {
644 const STORE: &'static str = STORE;
645
646 const HEADER_CONFIG: HeaderConfig = HeaderConfig {
647 etag_required: false,
648 last_modified_required: false,
649 version_header: Some(VERSION_HEADER),
650 user_defined_metadata_prefix: Some(USER_DEFINED_METADATA_HEADER_PREFIX),
651 };
652
653 async fn get_request(&self, path: &Path, options: GetOptions) -> Result<Response> {
655 let credential = self.config.get_session_credential().await?;
656 let url = self.config.path_url(path);
657 let method = match options.head {
658 true => Method::HEAD,
659 false => Method::GET,
660 };
661
662 let mut builder = self.client.request(method, url);
663
664 if let Some(v) = &options.version {
665 builder = builder.query(&[("versionId", v)])
666 }
667
668 let response = builder
669 .with_get_options(options)
670 .with_aws_sigv4(credential.authorizer(), None)
671 .send_retry(&self.config.retry_config)
672 .await
673 .map_err(|e| e.error(STORE, path.to_string()))?;
674
675 Ok(response)
676 }
677}
678
679#[async_trait]
680impl ListClient for S3Client {
681 async fn list_request(
683 &self,
684 prefix: Option<&str>,
685 delimiter: bool,
686 token: Option<&str>,
687 offset: Option<&str>,
688 ) -> Result<(ListResult, Option<String>)> {
689 let credential = self.config.get_session_credential().await?;
690 let url = self.config.bucket_endpoint.clone();
691
692 let mut query = Vec::with_capacity(4);
693
694 if let Some(token) = token {
695 query.push(("continuation-token", token))
696 }
697
698 if delimiter {
699 query.push(("delimiter", DELIMITER))
700 }
701
702 query.push(("list-type", "2"));
703
704 if let Some(prefix) = prefix {
705 query.push(("prefix", prefix))
706 }
707
708 if let Some(offset) = offset {
709 query.push(("start-after", offset))
710 }
711
712 let response = self
713 .client
714 .request(Method::GET, &url)
715 .query(&query)
716 .with_aws_sigv4(credential.authorizer(), None)
717 .send_retry(&self.config.retry_config)
718 .await
719 .context(ListRequestSnafu)?
720 .bytes()
721 .await
722 .context(ListResponseBodySnafu)?;
723
724 let mut response: ListResponse =
725 quick_xml::de::from_reader(response.reader()).context(InvalidListResponseSnafu)?;
726 let token = response.next_continuation_token.take();
727
728 Ok((response.try_into()?, token))
729 }
730}
731
732fn encode_path(path: &Path) -> PercentEncode<'_> {
733 utf8_percent_encode(path.as_ref(), &STRICT_PATH_ENCODE_SET)
734}