object_store/aws/
client.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::aws::builder::S3EncryptionHeaders;
19use crate::aws::checksum::Checksum;
20use crate::aws::credential::{AwsCredential, CredentialExt};
21use crate::aws::{
22    AwsAuthorizer, AwsCredentialProvider, S3ConditionalPut, S3CopyIfNotExists, COPY_SOURCE_HEADER,
23    STORE, STRICT_PATH_ENCODE_SET, TAGS_HEADER,
24};
25use crate::client::get::GetClient;
26use crate::client::header::{get_etag, HeaderConfig};
27use crate::client::header::{get_put_result, get_version};
28use crate::client::list::ListClient;
29use crate::client::retry::RetryExt;
30use crate::client::s3::{
31    CompleteMultipartUpload, CompleteMultipartUploadResult, InitiateMultipartUploadResult,
32    ListResponse,
33};
34use crate::client::GetOptionsExt;
35use crate::multipart::PartId;
36use crate::path::DELIMITER;
37use crate::{
38    Attribute, Attributes, ClientOptions, GetOptions, ListResult, MultipartId, Path,
39    PutMultipartOpts, PutPayload, PutResult, Result, RetryConfig, TagSet,
40};
41use async_trait::async_trait;
42use base64::prelude::BASE64_STANDARD;
43use base64::Engine;
44use bytes::{Buf, Bytes};
45use hyper::header::{
46    CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE, CONTENT_LENGTH,
47    CONTENT_TYPE,
48};
49use hyper::http::HeaderName;
50use hyper::{http, HeaderMap};
51use itertools::Itertools;
52use md5::{Digest, Md5};
53use percent_encoding::{utf8_percent_encode, PercentEncode};
54use quick_xml::events::{self as xml_events};
55use reqwest::{Client as ReqwestClient, Method, RequestBuilder, Response};
56use ring::digest;
57use ring::digest::Context;
58use serde::{Deserialize, Serialize};
59use snafu::{ResultExt, Snafu};
60use std::sync::Arc;
61
/// Name of the header from which the object version is read; handed to the
/// header-parsing helpers and to `HeaderConfig::version_header` in this file.
const VERSION_HEADER: &str = "x-amz-version-id";
/// Name of the request header that carries the base64-encoded SHA-256 digest
/// of the request body.
const SHA256_CHECKSUM: &str = "x-amz-checksum-sha256";
/// Prefix placed in front of a user-defined metadata key to form its header name.
const USER_DEFINED_METADATA_HEADER_PREFIX: &str = "x-amz-meta-";
65
/// Errors raised by the S3 client defined in this module.
///
/// Every variant is turned into [`crate::Error::Generic`] (tagged with the
/// store name) by the `From<Error> for crate::Error` impl in this file.
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
pub(crate) enum Error {
    // The DeleteObjects HTTP request itself failed (after retries).
    #[snafu(display("Error performing DeleteObjects request: {}", source))]
    DeleteObjectsRequest { source: crate::client::retry::Error },

    // The DeleteObjects response reported a failure for one specific key.
    #[snafu(display(
        "DeleteObjects request failed for key {}: {} (code: {})",
        path,
        message,
        code
    ))]
    DeleteFailed {
        path: String,
        code: String,
        message: String,
    },

    // Reading the body of the DeleteObjects response failed.
    #[snafu(display("Error getting DeleteObjects response body: {}", source))]
    DeleteObjectsResponse { source: reqwest::Error },

    // The DeleteObjects response could not be parsed or contained an unusable key.
    #[snafu(display("Got invalid DeleteObjects response: {}", source))]
    InvalidDeleteObjectsResponse {
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },

    // The list HTTP request failed (after retries).
    #[snafu(display("Error performing list request: {}", source))]
    ListRequest { source: crate::client::retry::Error },

    // Reading the body of the list response failed.
    #[snafu(display("Error getting list response body: {}", source))]
    ListResponseBody { source: reqwest::Error },

    // Reading the body of the create-multipart response failed.
    #[snafu(display("Error getting create multipart response body: {}", source))]
    CreateMultipartResponseBody { source: reqwest::Error },

    // The complete-multipart HTTP request failed (after retries).
    #[snafu(display("Error performing complete multipart request: {}", source))]
    CompleteMultipartRequest { source: crate::client::retry::Error },

    // Reading the body of the complete-multipart response failed.
    #[snafu(display("Error getting complete multipart response body: {}", source))]
    CompleteMultipartResponseBody { source: reqwest::Error },

    // The list response XML could not be deserialized.
    #[snafu(display("Got invalid list response: {}", source))]
    InvalidListResponse { source: quick_xml::de::DeError },

    // A multipart (create or complete) response XML could not be deserialized.
    #[snafu(display("Got invalid multipart response: {}", source))]
    InvalidMultipartResponse { source: quick_xml::de::DeError },

    // Required metadata (e.g. ETag or version) could not be read from response headers.
    #[snafu(display("Unable to extract metadata from headers: {}", source))]
    Metadata {
        source: crate::client::header::Error,
    },
}
119
120impl From<Error> for crate::Error {
121    fn from(err: Error) -> Self {
122        Self::Generic {
123            store: STORE,
124            source: Box::new(err),
125        }
126    }
127}
128
/// Deserialized body of a DeleteObjects response (root element `DeleteResult`).
#[derive(Deserialize)]
#[serde(rename_all = "PascalCase", rename = "DeleteResult")]
struct BatchDeleteResponse {
    // One entry per child element of the response; each is either a
    // `Deleted` or an `Error` record (see `DeleteObjectResult`).
    // NOTE(review): relies on quick-xml's `$value` convention for collecting
    // child elements — confirm against the quick-xml version in use.
    #[serde(rename = "$value")]
    content: Vec<DeleteObjectResult>,
}
135
/// A single per-key entry of a DeleteObjects response.
#[derive(Deserialize)]
enum DeleteObjectResult {
    // Successful deletion; the payload is never inspected in this file.
    #[allow(unused)]
    Deleted(DeletedObject),
    // Failed deletion, carrying the key plus the error code and message.
    Error(DeleteError),
}
142
/// `Deleted` entry of a DeleteObjects response.
#[derive(Deserialize)]
#[serde(rename_all = "PascalCase", rename = "Deleted")]
struct DeletedObject {
    // Deserialized but never read in this file, hence the lint allowance.
    #[allow(dead_code)]
    key: String,
}
149
/// `Error` entry of a DeleteObjects response; converted into
/// [`Error::DeleteFailed`] by the `From` impl in this file.
#[derive(Deserialize)]
#[serde(rename_all = "PascalCase", rename = "Error")]
struct DeleteError {
    // Key of the object whose deletion failed.
    key: String,
    // Error code reported for that key.
    code: String,
    // Error message reported for that key.
    message: String,
}
157
158impl From<DeleteError> for Error {
159    fn from(err: DeleteError) -> Self {
160        Self::DeleteFailed {
161            path: err.key,
162            code: err.code,
163            message: err.message,
164        }
165    }
166}
167
/// Configuration shared by every request issued through `S3Client`.
#[derive(Debug)]
pub struct S3Config {
    /// Region handed to the request signer.
    pub region: String,
    /// Not read anywhere in this file.
    /// NOTE(review): presumably the user-supplied endpoint override — confirm in the builder.
    pub endpoint: Option<String>,
    /// Bucket name; used to build the copy-source header value.
    pub bucket: String,
    /// Base URL of the bucket; object URLs are `{bucket_endpoint}/{encoded path}`
    /// and bucket-level requests (list, bulk delete) target it directly.
    pub bucket_endpoint: String,
    /// Provider of the credentials used to sign requests.
    pub credentials: AwsCredentialProvider,
    /// Optional provider of session credentials. When set it is preferred for
    /// session-credential requests, the `x-amz-s3session-token` header is used
    /// for the token, and `is_s3_express` reports `true`.
    pub session_provider: Option<AwsCredentialProvider>,
    /// Retry policy applied to every request.
    pub retry_config: RetryConfig,
    /// Options used to build the HTTP client and to derive a default content type.
    pub client_options: ClientOptions,
    /// Forwarded to the signer via `with_sign_payload`.
    pub sign_payload: bool,
    /// When `true`, no credentials are fetched and requests go out unsigned.
    pub skip_signature: bool,
    /// When `true`, `Request::with_tags` does not add the tagging header.
    pub disable_tagging: bool,
    /// When `Some(Checksum::SHA256)`, payload uploads carry a SHA-256 checksum header.
    pub checksum: Option<Checksum>,
    /// Not read anywhere in this file.
    /// NOTE(review): presumably selects the copy-if-not-exists strategy — confirm at call sites.
    pub copy_if_not_exists: Option<S3CopyIfNotExists>,
    /// Not read anywhere in this file.
    /// NOTE(review): presumably selects the conditional-put strategy — confirm at call sites.
    pub conditional_put: Option<S3ConditionalPut>,
    /// Headers added by `Request::with_encryption_headers`.
    pub encryption_headers: S3EncryptionHeaders,
}
186
187impl S3Config {
188    pub(crate) fn path_url(&self, path: &Path) -> String {
189        format!("{}/{}", self.bucket_endpoint, encode_path(path))
190    }
191
192    async fn get_session_credential(&self) -> Result<SessionCredential<'_>> {
193        let credential = match self.skip_signature {
194            false => {
195                let provider = self.session_provider.as_ref().unwrap_or(&self.credentials);
196                Some(provider.get_credential().await?)
197            }
198            true => None,
199        };
200
201        Ok(SessionCredential {
202            credential,
203            session_token: self.session_provider.is_some(),
204            config: self,
205        })
206    }
207
208    pub(crate) async fn get_credential(&self) -> Result<Option<Arc<AwsCredential>>> {
209        Ok(match self.skip_signature {
210            false => Some(self.credentials.get_credential().await?),
211            true => None,
212        })
213    }
214
215    #[inline]
216    pub(crate) fn is_s3_express(&self) -> bool {
217        self.session_provider.is_some()
218    }
219}
220
/// A resolved credential together with what is needed to build a signer from it.
struct SessionCredential<'a> {
    // `None` when signing is skipped; `authorizer` then returns `None` as well.
    credential: Option<Arc<AwsCredential>>,
    // Whether the token is sent in the `x-amz-s3session-token` header.
    session_token: bool,
    // Source of the region and the `sign_payload` flag.
    config: &'a S3Config,
}
226
227impl<'a> SessionCredential<'a> {
228    fn authorizer(&self) -> Option<AwsAuthorizer<'_>> {
229        let mut authorizer =
230            AwsAuthorizer::new(self.credential.as_deref()?, "s3", &self.config.region)
231                .with_sign_payload(self.config.sign_payload);
232
233        if self.session_token {
234            let token = HeaderName::from_static("x-amz-s3session-token");
235            authorizer = authorizer.with_token_header(token)
236        }
237
238        Some(authorizer)
239    }
240}
241
/// Error returned by `Request::send`.
///
/// Converted into [`crate::Error`] by the `From<RequestError>` impl in this file.
#[derive(Debug, Snafu)]
pub enum RequestError {
    /// An already-converted crate error (e.g. from credential lookup).
    /// `context(false)` lets `?` convert a `crate::Error` directly, which
    /// `Request::send` relies on.
    #[snafu(context(false))]
    Generic { source: crate::Error },
    /// The HTTP request failed after retries; `path` is the object path the
    /// request targeted and is used when converting to [`crate::Error`].
    Retry {
        source: crate::client::retry::Error,
        path: String,
    },
}
251
252impl From<RequestError> for crate::Error {
253    fn from(value: RequestError) -> Self {
254        match value {
255            RequestError::Generic { source } => source,
256            RequestError::Retry { source, path } => source.error(STORE, path),
257        }
258    }
259}
260
/// A builder for a request allowing customisation of the headers and query string
pub(crate) struct Request<'a> {
    // Object path the request targets; also used for default content-type
    // lookup and as context in retry errors.
    path: &'a Path,
    // Client configuration (credentials, retry policy, checksum settings, ...).
    config: &'a S3Config,
    // Underlying HTTP request builder being accumulated.
    builder: RequestBuilder,
    // SHA-256 of the payload, computed in `with_payload` and passed to the signer.
    payload_sha256: Option<digest::Digest>,
    // Body to send, if any.
    payload: Option<PutPayload>,
    // Whether `send` uses session credentials or the plain credentials.
    use_session_creds: bool,
    // Forwarded to the retry layer's `idempotent` setting in `send`.
    idempotent: bool,
}
271
272impl<'a> Request<'a> {
273    pub fn query<T: Serialize + ?Sized + Sync>(self, query: &T) -> Self {
274        let builder = self.builder.query(query);
275        Self { builder, ..self }
276    }
277
278    pub fn header<K>(self, k: K, v: &str) -> Self
279    where
280        HeaderName: TryFrom<K>,
281        <HeaderName as TryFrom<K>>::Error: Into<http::Error>,
282    {
283        let builder = self.builder.header(k, v);
284        Self { builder, ..self }
285    }
286
287    pub fn headers(self, headers: HeaderMap) -> Self {
288        let builder = self.builder.headers(headers);
289        Self { builder, ..self }
290    }
291
292    pub fn idempotent(self, idempotent: bool) -> Self {
293        Self { idempotent, ..self }
294    }
295
296    pub fn with_encryption_headers(self) -> Self {
297        let headers = self.config.encryption_headers.clone().into();
298        let builder = self.builder.headers(headers);
299        Self { builder, ..self }
300    }
301
302    pub fn with_session_creds(self, use_session_creds: bool) -> Self {
303        Self {
304            use_session_creds,
305            ..self
306        }
307    }
308
309    pub fn with_tags(mut self, tags: TagSet) -> Self {
310        let tags = tags.encoded();
311        if !tags.is_empty() && !self.config.disable_tagging {
312            self.builder = self.builder.header(&TAGS_HEADER, tags);
313        }
314        self
315    }
316
317    pub fn with_attributes(self, attributes: Attributes) -> Self {
318        let mut has_content_type = false;
319        let mut builder = self.builder;
320        for (k, v) in &attributes {
321            builder = match k {
322                Attribute::CacheControl => builder.header(CACHE_CONTROL, v.as_ref()),
323                Attribute::ContentDisposition => builder.header(CONTENT_DISPOSITION, v.as_ref()),
324                Attribute::ContentEncoding => builder.header(CONTENT_ENCODING, v.as_ref()),
325                Attribute::ContentLanguage => builder.header(CONTENT_LANGUAGE, v.as_ref()),
326                Attribute::ContentType => {
327                    has_content_type = true;
328                    builder.header(CONTENT_TYPE, v.as_ref())
329                }
330                Attribute::Metadata(k_suffix) => builder.header(
331                    &format!("{}{}", USER_DEFINED_METADATA_HEADER_PREFIX, k_suffix),
332                    v.as_ref(),
333                ),
334            };
335        }
336
337        if !has_content_type {
338            if let Some(value) = self.config.client_options.get_content_type(self.path) {
339                builder = builder.header(CONTENT_TYPE, value);
340            }
341        }
342        Self { builder, ..self }
343    }
344
345    pub fn with_payload(mut self, payload: PutPayload) -> Self {
346        if !self.config.skip_signature || self.config.checksum.is_some() {
347            let mut sha256 = Context::new(&digest::SHA256);
348            payload.iter().for_each(|x| sha256.update(x));
349            let payload_sha256 = sha256.finish();
350
351            if let Some(Checksum::SHA256) = self.config.checksum {
352                self.builder = self.builder.header(
353                    "x-amz-checksum-sha256",
354                    BASE64_STANDARD.encode(payload_sha256),
355                );
356            }
357            self.payload_sha256 = Some(payload_sha256);
358        }
359
360        let content_length = payload.content_length();
361        self.builder = self.builder.header(CONTENT_LENGTH, content_length);
362        self.payload = Some(payload);
363        self
364    }
365
366    pub async fn send(self) -> Result<Response, RequestError> {
367        let credential = match self.use_session_creds {
368            true => self.config.get_session_credential().await?,
369            false => SessionCredential {
370                credential: self.config.get_credential().await?,
371                session_token: false,
372                config: self.config,
373            },
374        };
375
376        let sha = self.payload_sha256.as_ref().map(|x| x.as_ref());
377
378        let path = self.path.as_ref();
379        self.builder
380            .with_aws_sigv4(credential.authorizer(), sha)
381            .retryable(&self.config.retry_config)
382            .idempotent(self.idempotent)
383            .payload(self.payload)
384            .send()
385            .await
386            .context(RetrySnafu { path })
387    }
388
389    pub async fn do_put(self) -> Result<PutResult> {
390        let response = self.send().await?;
391        Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?)
392    }
393}
394
/// Low-level S3 client: pairs the store configuration with the HTTP client
/// built from its `client_options`.
#[derive(Debug)]
pub(crate) struct S3Client {
    // Store configuration consulted for endpoints, credentials and retry policy.
    pub config: S3Config,
    // HTTP client used to issue every request.
    pub client: ReqwestClient,
}
400
401impl S3Client {
402    pub fn new(config: S3Config) -> Result<Self> {
403        let client = config.client_options.client()?;
404        Ok(Self { config, client })
405    }
406
407    pub fn request<'a>(&'a self, method: Method, path: &'a Path) -> Request<'a> {
408        let url = self.config.path_url(path);
409        Request {
410            path,
411            builder: self.client.request(method, url),
412            payload: None,
413            payload_sha256: None,
414            config: &self.config,
415            use_session_creds: true,
416            idempotent: false,
417        }
418    }
419
420    /// Make an S3 Delete Objects request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html>
421    ///
422    /// Produces a vector of results, one for each path in the input vector. If
423    /// the delete was successful, the path is returned in the `Ok` variant. If
424    /// there was an error for a certain path, the error will be returned in the
425    /// vector. If there was an issue with making the overall request, an error
426    /// will be returned at the top level.
427    pub async fn bulk_delete_request(&self, paths: Vec<Path>) -> Result<Vec<Result<Path>>> {
428        if paths.is_empty() {
429            return Ok(Vec::new());
430        }
431
432        let credential = self.config.get_session_credential().await?;
433        let url = format!("{}?delete", self.config.bucket_endpoint);
434
435        let mut buffer = Vec::new();
436        let mut writer = quick_xml::Writer::new(&mut buffer);
437        writer
438            .write_event(xml_events::Event::Start(
439                xml_events::BytesStart::new("Delete")
440                    .with_attributes([("xmlns", "http://s3.amazonaws.com/doc/2006-03-01/")]),
441            ))
442            .unwrap();
443        for path in &paths {
444            // <Object><Key>{path}</Key></Object>
445            writer
446                .write_event(xml_events::Event::Start(xml_events::BytesStart::new(
447                    "Object",
448                )))
449                .unwrap();
450            writer
451                .write_event(xml_events::Event::Start(xml_events::BytesStart::new("Key")))
452                .unwrap();
453            writer
454                .write_event(xml_events::Event::Text(xml_events::BytesText::new(
455                    path.as_ref(),
456                )))
457                .map_err(|err| crate::Error::Generic {
458                    store: STORE,
459                    source: Box::new(err),
460                })?;
461            writer
462                .write_event(xml_events::Event::End(xml_events::BytesEnd::new("Key")))
463                .unwrap();
464            writer
465                .write_event(xml_events::Event::End(xml_events::BytesEnd::new("Object")))
466                .unwrap();
467        }
468        writer
469            .write_event(xml_events::Event::End(xml_events::BytesEnd::new("Delete")))
470            .unwrap();
471
472        let body = Bytes::from(buffer);
473
474        let mut builder = self.client.request(Method::POST, url);
475
476        let digest = digest::digest(&digest::SHA256, &body);
477        builder = builder.header(SHA256_CHECKSUM, BASE64_STANDARD.encode(digest));
478
479        // S3 *requires* DeleteObjects to include a Content-MD5 header:
480        // https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
481        // >   "The Content-MD5 request header is required for all Multi-Object Delete requests"
482        // Some platforms, like MinIO, enforce this requirement and fail requests without the header.
483        let mut hasher = Md5::new();
484        hasher.update(&body);
485        builder = builder.header("Content-MD5", BASE64_STANDARD.encode(hasher.finalize()));
486
487        let response = builder
488            .header(CONTENT_TYPE, "application/xml")
489            .body(body)
490            .with_aws_sigv4(credential.authorizer(), Some(digest.as_ref()))
491            .send_retry(&self.config.retry_config)
492            .await
493            .context(DeleteObjectsRequestSnafu {})?
494            .bytes()
495            .await
496            .context(DeleteObjectsResponseSnafu {})?;
497
498        let response: BatchDeleteResponse =
499            quick_xml::de::from_reader(response.reader()).map_err(|err| {
500                Error::InvalidDeleteObjectsResponse {
501                    source: Box::new(err),
502                }
503            })?;
504
505        // Assume all were ok, then fill in errors. This guarantees output order
506        // matches input order.
507        let mut results: Vec<Result<Path>> = paths.iter().cloned().map(Ok).collect();
508        for content in response.content.into_iter() {
509            if let DeleteObjectResult::Error(error) = content {
510                let path =
511                    Path::parse(&error.key).map_err(|err| Error::InvalidDeleteObjectsResponse {
512                        source: Box::new(err),
513                    })?;
514                let i = paths.iter().find_position(|&p| p == &path).unwrap().0;
515                results[i] = Err(Error::from(error).into());
516            }
517        }
518
519        Ok(results)
520    }
521
522    /// Make an S3 Copy request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html>
523    pub fn copy_request<'a>(&'a self, from: &Path, to: &'a Path) -> Request<'a> {
524        let source = format!("{}/{}", self.config.bucket, encode_path(from));
525        self.request(Method::PUT, to)
526            .idempotent(true)
527            .header(&COPY_SOURCE_HEADER, &source)
528            .headers(self.config.encryption_headers.clone().into())
529            .with_session_creds(false)
530    }
531
532    pub async fn create_multipart(
533        &self,
534        location: &Path,
535        opts: PutMultipartOpts,
536    ) -> Result<MultipartId> {
537        let response = self
538            .request(Method::POST, location)
539            .query(&[("uploads", "")])
540            .with_encryption_headers()
541            .with_attributes(opts.attributes)
542            .with_tags(opts.tags)
543            .idempotent(true)
544            .send()
545            .await?
546            .bytes()
547            .await
548            .context(CreateMultipartResponseBodySnafu)?;
549
550        let response: InitiateMultipartUploadResult =
551            quick_xml::de::from_reader(response.reader()).context(InvalidMultipartResponseSnafu)?;
552
553        Ok(response.upload_id)
554    }
555
556    pub async fn put_part(
557        &self,
558        path: &Path,
559        upload_id: &MultipartId,
560        part_idx: usize,
561        data: PutPayload,
562    ) -> Result<PartId> {
563        let part = (part_idx + 1).to_string();
564
565        let response = self
566            .request(Method::PUT, path)
567            .with_payload(data)
568            .query(&[("partNumber", &part), ("uploadId", upload_id)])
569            .idempotent(true)
570            .send()
571            .await?;
572
573        let content_id = get_etag(response.headers()).context(MetadataSnafu)?;
574        Ok(PartId { content_id })
575    }
576
577    pub async fn complete_multipart(
578        &self,
579        location: &Path,
580        upload_id: &str,
581        parts: Vec<PartId>,
582    ) -> Result<PutResult> {
583        let parts = if parts.is_empty() {
584            // If no parts were uploaded, upload an empty part
585            // otherwise the completion request will fail
586            let part = self
587                .put_part(location, &upload_id.to_string(), 0, PutPayload::default())
588                .await?;
589            vec![part]
590        } else {
591            parts
592        };
593        let request = CompleteMultipartUpload::from(parts);
594        let body = quick_xml::se::to_string(&request).unwrap();
595
596        let credential = self.config.get_session_credential().await?;
597        let url = self.config.path_url(location);
598
599        let response = self
600            .client
601            .request(Method::POST, url)
602            .query(&[("uploadId", upload_id)])
603            .body(body)
604            .with_aws_sigv4(credential.authorizer(), None)
605            .retryable(&self.config.retry_config)
606            .idempotent(true)
607            .send()
608            .await
609            .context(CompleteMultipartRequestSnafu)?;
610
611        let version = get_version(response.headers(), VERSION_HEADER).context(MetadataSnafu)?;
612
613        let data = response
614            .bytes()
615            .await
616            .context(CompleteMultipartResponseBodySnafu)?;
617
618        let response: CompleteMultipartUploadResult =
619            quick_xml::de::from_reader(data.reader()).context(InvalidMultipartResponseSnafu)?;
620
621        Ok(PutResult {
622            e_tag: Some(response.e_tag),
623            version,
624        })
625    }
626
627    #[cfg(test)]
628    pub async fn get_object_tagging(&self, path: &Path) -> Result<Response> {
629        let credential = self.config.get_session_credential().await?;
630        let url = format!("{}?tagging", self.config.path_url(path));
631        let response = self
632            .client
633            .request(Method::GET, url)
634            .with_aws_sigv4(credential.authorizer(), None)
635            .send_retry(&self.config.retry_config)
636            .await
637            .map_err(|e| e.error(STORE, path.to_string()))?;
638        Ok(response)
639    }
640}
641
642#[async_trait]
643impl GetClient for S3Client {
644    const STORE: &'static str = STORE;
645
646    const HEADER_CONFIG: HeaderConfig = HeaderConfig {
647        etag_required: false,
648        last_modified_required: false,
649        version_header: Some(VERSION_HEADER),
650        user_defined_metadata_prefix: Some(USER_DEFINED_METADATA_HEADER_PREFIX),
651    };
652
653    /// Make an S3 GET request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html>
654    async fn get_request(&self, path: &Path, options: GetOptions) -> Result<Response> {
655        let credential = self.config.get_session_credential().await?;
656        let url = self.config.path_url(path);
657        let method = match options.head {
658            true => Method::HEAD,
659            false => Method::GET,
660        };
661
662        let mut builder = self.client.request(method, url);
663
664        if let Some(v) = &options.version {
665            builder = builder.query(&[("versionId", v)])
666        }
667
668        let response = builder
669            .with_get_options(options)
670            .with_aws_sigv4(credential.authorizer(), None)
671            .send_retry(&self.config.retry_config)
672            .await
673            .map_err(|e| e.error(STORE, path.to_string()))?;
674
675        Ok(response)
676    }
677}
678
679#[async_trait]
680impl ListClient for S3Client {
681    /// Make an S3 List request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html>
682    async fn list_request(
683        &self,
684        prefix: Option<&str>,
685        delimiter: bool,
686        token: Option<&str>,
687        offset: Option<&str>,
688    ) -> Result<(ListResult, Option<String>)> {
689        let credential = self.config.get_session_credential().await?;
690        let url = self.config.bucket_endpoint.clone();
691
692        let mut query = Vec::with_capacity(4);
693
694        if let Some(token) = token {
695            query.push(("continuation-token", token))
696        }
697
698        if delimiter {
699            query.push(("delimiter", DELIMITER))
700        }
701
702        query.push(("list-type", "2"));
703
704        if let Some(prefix) = prefix {
705            query.push(("prefix", prefix))
706        }
707
708        if let Some(offset) = offset {
709            query.push(("start-after", offset))
710        }
711
712        let response = self
713            .client
714            .request(Method::GET, &url)
715            .query(&query)
716            .with_aws_sigv4(credential.authorizer(), None)
717            .send_retry(&self.config.retry_config)
718            .await
719            .context(ListRequestSnafu)?
720            .bytes()
721            .await
722            .context(ListResponseBodySnafu)?;
723
724        let mut response: ListResponse =
725            quick_xml::de::from_reader(response.reader()).context(InvalidListResponseSnafu)?;
726        let token = response.next_continuation_token.take();
727
728        Ok((response.try_into()?, token))
729    }
730}
731
732fn encode_path(path: &Path) -> PercentEncode<'_> {
733    utf8_percent_encode(path.as_ref(), &STRICT_PATH_ENCODE_SET)
734}