// object_store/http/client.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use crate::client::get::GetClient;
19use crate::client::header::HeaderConfig;
20use crate::client::retry::{self, RetryConfig, RetryExt};
21use crate::client::GetOptionsExt;
22use crate::path::{Path, DELIMITER};
23use crate::util::deserialize_rfc1123;
24use crate::{Attribute, Attributes, ClientOptions, GetOptions, ObjectMeta, PutPayload, Result};
25use async_trait::async_trait;
26use bytes::Buf;
27use chrono::{DateTime, Utc};
28use hyper::header::{
29    CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE, CONTENT_LENGTH,
30    CONTENT_TYPE,
31};
32use percent_encoding::percent_decode_str;
33use reqwest::{Method, Response, StatusCode};
34use serde::Deserialize;
35use snafu::{OptionExt, ResultExt, Snafu};
36use url::Url;
37
/// Errors raised by the HTTP/WebDAV client.
///
/// These are surfaced to callers wrapped in `crate::Error::Generic` with the
/// store name `"HTTP"` (see the `From<Error> for crate::Error` impl).
#[derive(Debug, Snafu)]
enum Error {
    /// A request issued through the retrying client failed
    #[snafu(display("Request error: {}", source))]
    Request { source: retry::Error },

    /// A `reqwest` operation outside the retry layer failed
    /// (in this file: reading the PROPFIND response body)
    #[snafu(display("Request error: {}", source))]
    Reqwest { source: reqwest::Error },

    /// A byte range was requested but the server did not answer with
    /// `206 Partial Content`
    #[snafu(display("Range request not supported by {}", href))]
    RangeNotSupported { href: String },

    /// The PROPFIND (list) response body could not be deserialized from XML
    #[snafu(display("Error decoding PROPFIND response: {}", source))]
    InvalidPropFind { source: quick_xml::de::DeError },

    /// A PROPFIND entry carried no `getcontentlength` property
    #[snafu(display("Missing content size for {}", href))]
    MissingSize { href: String },

    /// A PROPFIND entry's `propstat` status did not report `200 OK`
    #[snafu(display("Error getting properties of \"{}\" got \"{}\"", href, status))]
    PropStatus { href: String, status: String },

    /// A PROPFIND entry's `href` could not be resolved against the base URL
    #[snafu(display("Failed to parse href \"{}\": {}", href, source))]
    InvalidHref {
        href: String,
        source: url::ParseError,
    },

    /// The percent-decoded URL path was not valid UTF-8
    #[snafu(display("Path \"{}\" contained non-unicode characters: {}", path, source))]
    NonUnicode {
        path: String,
        source: std::str::Utf8Error,
    },

    /// The decoded URL path could not be parsed as an object store `Path`
    #[snafu(display("Encountered invalid path \"{}\": {}", path, source))]
    InvalidPath {
        path: String,
        source: crate::path::Error,
    },
}
76
77impl From<Error> for crate::Error {
78    fn from(err: Error) -> Self {
79        Self::Generic {
80            store: "HTTP",
81            source: Box::new(err),
82        }
83    }
84}
85
/// Internal client for HttpStore
///
/// Bundles the base URL that object paths are resolved against, the
/// underlying `reqwest` client, and the configuration used for every request.
#[derive(Debug)]
pub struct Client {
    /// Base URL; object paths are appended to it as path segments
    url: Url,
    /// HTTP client built from `client_options` when the `Client` is constructed
    client: reqwest::Client,
    /// Retry policy passed to every request this client sends
    retry_config: RetryConfig,
    /// Options the HTTP client was built from; also consulted for a default
    /// `Content-Type` on PUT
    client_options: ClientOptions,
}
94
95impl Client {
96    pub fn new(url: Url, client_options: ClientOptions, retry_config: RetryConfig) -> Result<Self> {
97        let client = client_options.client()?;
98        Ok(Self {
99            url,
100            retry_config,
101            client_options,
102            client,
103        })
104    }
105
106    pub fn base_url(&self) -> &Url {
107        &self.url
108    }
109
110    fn path_url(&self, location: &Path) -> Url {
111        let mut url = self.url.clone();
112        url.path_segments_mut().unwrap().extend(location.parts());
113        url
114    }
115
116    /// Create a directory with `path` using MKCOL
117    async fn make_directory(&self, path: &str) -> Result<(), Error> {
118        let method = Method::from_bytes(b"MKCOL").unwrap();
119        let mut url = self.url.clone();
120        url.path_segments_mut()
121            .unwrap()
122            .extend(path.split(DELIMITER));
123
124        self.client
125            .request(method, url)
126            .send_retry(&self.retry_config)
127            .await
128            .context(RequestSnafu)?;
129
130        Ok(())
131    }
132
133    /// Recursively create parent directories
134    async fn create_parent_directories(&self, location: &Path) -> Result<()> {
135        let mut stack = vec![];
136
137        // Walk backwards until a request succeeds
138        let mut last_prefix = location.as_ref();
139        while let Some((prefix, _)) = last_prefix.rsplit_once(DELIMITER) {
140            last_prefix = prefix;
141
142            match self.make_directory(prefix).await {
143                Ok(_) => break,
144                Err(Error::Request { source })
145                    if matches!(source.status(), Some(StatusCode::CONFLICT)) =>
146                {
147                    // Need to create parent
148                    stack.push(prefix)
149                }
150                Err(e) => return Err(e.into()),
151            }
152        }
153
154        // Retry the failed requests, which should now succeed
155        for prefix in stack.into_iter().rev() {
156            self.make_directory(prefix).await?;
157        }
158
159        Ok(())
160    }
161
162    pub async fn put(
163        &self,
164        location: &Path,
165        payload: PutPayload,
166        attributes: Attributes,
167    ) -> Result<Response> {
168        let mut retry = false;
169        loop {
170            let url = self.path_url(location);
171            let mut builder = self.client.put(url);
172
173            let mut has_content_type = false;
174            for (k, v) in &attributes {
175                builder = match k {
176                    Attribute::CacheControl => builder.header(CACHE_CONTROL, v.as_ref()),
177                    Attribute::ContentDisposition => {
178                        builder.header(CONTENT_DISPOSITION, v.as_ref())
179                    }
180                    Attribute::ContentEncoding => builder.header(CONTENT_ENCODING, v.as_ref()),
181                    Attribute::ContentLanguage => builder.header(CONTENT_LANGUAGE, v.as_ref()),
182                    Attribute::ContentType => {
183                        has_content_type = true;
184                        builder.header(CONTENT_TYPE, v.as_ref())
185                    }
186                    // Ignore metadata attributes
187                    Attribute::Metadata(_) => builder,
188                };
189            }
190
191            if !has_content_type {
192                if let Some(value) = self.client_options.get_content_type(location) {
193                    builder = builder.header(CONTENT_TYPE, value);
194                }
195            }
196
197            let resp = builder
198                .header(CONTENT_LENGTH, payload.content_length())
199                .retryable(&self.retry_config)
200                .idempotent(true)
201                .payload(Some(payload.clone()))
202                .send()
203                .await;
204
205            match resp {
206                Ok(response) => return Ok(response),
207                Err(source) => match source.status() {
208                    // Some implementations return 404 instead of 409
209                    Some(StatusCode::CONFLICT | StatusCode::NOT_FOUND) if !retry => {
210                        retry = true;
211                        self.create_parent_directories(location).await?
212                    }
213                    _ => return Err(Error::Request { source }.into()),
214                },
215            }
216        }
217    }
218
219    pub async fn list(&self, location: Option<&Path>, depth: &str) -> Result<MultiStatus> {
220        let url = location
221            .map(|path| self.path_url(path))
222            .unwrap_or_else(|| self.url.clone());
223
224        let method = Method::from_bytes(b"PROPFIND").unwrap();
225        let result = self
226            .client
227            .request(method, url)
228            .header("Depth", depth)
229            .retryable(&self.retry_config)
230            .idempotent(true)
231            .send()
232            .await;
233
234        let response = match result {
235            Ok(result) => result.bytes().await.context(ReqwestSnafu)?,
236            Err(e) if matches!(e.status(), Some(StatusCode::NOT_FOUND)) => {
237                return match depth {
238                    "0" => {
239                        let path = location.map(|x| x.as_ref()).unwrap_or("");
240                        Err(crate::Error::NotFound {
241                            path: path.to_string(),
242                            source: Box::new(e),
243                        })
244                    }
245                    _ => {
246                        // If prefix not found, return empty result set
247                        Ok(Default::default())
248                    }
249                };
250            }
251            Err(source) => return Err(Error::Request { source }.into()),
252        };
253
254        let status = quick_xml::de::from_reader(response.reader()).context(InvalidPropFindSnafu)?;
255        Ok(status)
256    }
257
258    pub async fn delete(&self, path: &Path) -> Result<()> {
259        let url = self.path_url(path);
260        self.client
261            .delete(url)
262            .send_retry(&self.retry_config)
263            .await
264            .map_err(|source| match source.status() {
265                Some(StatusCode::NOT_FOUND) => crate::Error::NotFound {
266                    source: Box::new(source),
267                    path: path.to_string(),
268                },
269                _ => Error::Request { source }.into(),
270            })?;
271        Ok(())
272    }
273
274    pub async fn copy(&self, from: &Path, to: &Path, overwrite: bool) -> Result<()> {
275        let mut retry = false;
276        loop {
277            let method = Method::from_bytes(b"COPY").unwrap();
278
279            let mut builder = self
280                .client
281                .request(method, self.path_url(from))
282                .header("Destination", self.path_url(to).as_str());
283
284            if !overwrite {
285                // While the Overwrite header appears to duplicate
286                // the functionality of the If-Match: * header of HTTP/1.1, If-Match
287                // applies only to the Request-URI, and not to the Destination of a COPY
288                // or MOVE.
289                builder = builder.header("Overwrite", "F");
290            }
291
292            return match builder.send_retry(&self.retry_config).await {
293                Ok(_) => Ok(()),
294                Err(source) => Err(match source.status() {
295                    Some(StatusCode::PRECONDITION_FAILED) if !overwrite => {
296                        crate::Error::AlreadyExists {
297                            path: to.to_string(),
298                            source: Box::new(source),
299                        }
300                    }
301                    // Some implementations return 404 instead of 409
302                    Some(StatusCode::CONFLICT | StatusCode::NOT_FOUND) if !retry => {
303                        retry = true;
304                        self.create_parent_directories(to).await?;
305                        continue;
306                    }
307                    _ => Error::Request { source }.into(),
308                }),
309            };
310        }
311    }
312}
313
314#[async_trait]
315impl GetClient for Client {
316    const STORE: &'static str = "HTTP";
317
318    /// Override the [`HeaderConfig`] to be less strict to support a
319    /// broader range of HTTP servers (#4831)
320    const HEADER_CONFIG: HeaderConfig = HeaderConfig {
321        etag_required: false,
322        last_modified_required: false,
323        version_header: None,
324        user_defined_metadata_prefix: None,
325    };
326
327    async fn get_request(&self, path: &Path, options: GetOptions) -> Result<Response> {
328        let url = self.path_url(path);
329        let method = match options.head {
330            true => Method::HEAD,
331            false => Method::GET,
332        };
333        let has_range = options.range.is_some();
334        let builder = self.client.request(method, url);
335
336        let res = builder
337            .with_get_options(options)
338            .send_retry(&self.retry_config)
339            .await
340            .map_err(|source| match source.status() {
341                // Some stores return METHOD_NOT_ALLOWED for get on directories
342                Some(StatusCode::NOT_FOUND | StatusCode::METHOD_NOT_ALLOWED) => {
343                    crate::Error::NotFound {
344                        source: Box::new(source),
345                        path: path.to_string(),
346                    }
347                }
348                _ => Error::Request { source }.into(),
349            })?;
350
351        // We expect a 206 Partial Content response if a range was requested
352        // a 200 OK response would indicate the server did not fulfill the request
353        if has_range && res.status() != StatusCode::PARTIAL_CONTENT {
354            return Err(crate::Error::NotSupported {
355                source: Box::new(Error::RangeNotSupported {
356                    href: path.to_string(),
357                }),
358            });
359        }
360
361        Ok(res)
362    }
363}
364
/// The response returned by a PROPFIND request, i.e. list
///
/// Deserialized from the XML response body with `quick_xml`. The `Default`
/// value (no entries) is what `Client::list` returns when a listed prefix
/// is not found at a depth other than `"0"`.
#[derive(Deserialize, Default)]
pub struct MultiStatus {
    /// One entry per `response` element in the body
    pub response: Vec<MultiStatusResponse>,
}
370
/// A single `response` element of a PROPFIND multistatus body, describing
/// one resource
#[derive(Deserialize)]
pub struct MultiStatusResponse {
    /// URL of the resource; may be relative and percent-encoded, so it is
    /// resolved against the base URL and decoded before use
    href: String,
    /// The `propstat` element: the resource's properties plus their status
    #[serde(rename = "propstat")]
    prop_stat: PropStat,
}
377
378impl MultiStatusResponse {
379    /// Returns an error if this response is not OK
380    pub fn check_ok(&self) -> Result<()> {
381        match self.prop_stat.status.contains("200 OK") {
382            true => Ok(()),
383            false => Err(Error::PropStatus {
384                href: self.href.clone(),
385                status: self.prop_stat.status.clone(),
386            }
387            .into()),
388        }
389    }
390
391    /// Returns the resolved path of this element relative to `base_url`
392    pub fn path(&self, base_url: &Url) -> Result<Path> {
393        let url = Url::options()
394            .base_url(Some(base_url))
395            .parse(&self.href)
396            .context(InvalidHrefSnafu { href: &self.href })?;
397
398        // Reverse any percent encoding
399        let path = percent_decode_str(url.path())
400            .decode_utf8()
401            .context(NonUnicodeSnafu { path: url.path() })?;
402
403        Ok(Path::parse(path.as_ref()).context(InvalidPathSnafu { path })?)
404    }
405
406    fn size(&self) -> Result<usize> {
407        let size = self
408            .prop_stat
409            .prop
410            .content_length
411            .context(MissingSizeSnafu { href: &self.href })?;
412        Ok(size)
413    }
414
415    /// Returns this objects metadata as [`ObjectMeta`]
416    pub fn object_meta(&self, base_url: &Url) -> Result<ObjectMeta> {
417        let last_modified = self.prop_stat.prop.last_modified;
418        Ok(ObjectMeta {
419            location: self.path(base_url)?,
420            last_modified,
421            size: self.size()?,
422            e_tag: self.prop_stat.prop.e_tag.clone(),
423            version: None,
424        })
425    }
426
427    /// Returns true if this is a directory / collection
428    pub fn is_dir(&self) -> bool {
429        self.prop_stat.prop.resource_type.collection.is_some()
430    }
431}
432
/// A `propstat` element: the returned properties and the status that applies
/// to them
#[derive(Deserialize)]
pub struct PropStat {
    /// The properties reported for the resource
    prop: Prop,
    /// Status text for these properties; `MultiStatusResponse::check_ok`
    /// requires it to contain `200 OK`
    status: String,
}
438
/// The WebDAV properties of a resource that are used to build `ObjectMeta`
#[derive(Deserialize)]
pub struct Prop {
    /// `getlastmodified`, parsed as an RFC 1123 date. Not an `Option`, so an
    /// entry without it fails to deserialize.
    /// NOTE(review): GET uses a relaxed header config (Last-Modified not
    /// required), but listing still requires this property.
    #[serde(deserialize_with = "deserialize_rfc1123", rename = "getlastmodified")]
    last_modified: DateTime<Utc>,

    /// `getcontentlength`; when absent, `object_meta` reports a missing-size error
    #[serde(rename = "getcontentlength")]
    content_length: Option<usize>,

    /// `resourcetype`; used to tell collections (directories) from objects
    #[serde(rename = "resourcetype")]
    resource_type: ResourceType,

    /// `getetag`, passed through unchanged as the object's e-tag when present
    #[serde(rename = "getetag")]
    e_tag: Option<String>,
}
453
/// The `resourcetype` property of a resource
#[derive(Deserialize)]
pub struct ResourceType {
    /// `Some(())` when a `collection` child element is present, i.e. the
    /// resource is a directory
    collection: Option<()>,
}