object_store/http/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! An object store implementation for generic HTTP servers
19//!
20//! This follows [rfc2518] commonly known as [WebDAV]
21//!
22//! Basic get support will work out of the box with most HTTP servers,
23//! even those that don't explicitly support [rfc2518]
24//!
25//! Other operations such as list, delete, copy, etc... will likely
26//! require server-side configuration. A list of HTTP servers with support
27//! can be found [here](https://wiki.archlinux.org/title/WebDAV#Server)
28//!
29//! Multipart uploads are not currently supported
30//!
31//! [rfc2518]: https://datatracker.ietf.org/doc/html/rfc2518
32//! [WebDAV]: https://en.wikipedia.org/wiki/WebDAV
33
34use async_trait::async_trait;
35use futures::stream::BoxStream;
36use futures::{StreamExt, TryStreamExt};
37use itertools::Itertools;
38use snafu::{OptionExt, ResultExt, Snafu};
39use url::Url;
40
41use crate::client::get::GetClientExt;
42use crate::client::header::get_etag;
43use crate::http::client::Client;
44use crate::path::Path;
45use crate::{
46    ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta,
47    ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, RetryConfig,
48};
49
50mod client;
51
52#[derive(Debug, Snafu)]
53enum Error {
54    #[snafu(display("Must specify a URL"))]
55    MissingUrl,
56
57    #[snafu(display("Unable parse source url. Url: {}, Error: {}", url, source))]
58    UnableToParseUrl {
59        source: url::ParseError,
60        url: String,
61    },
62
63    #[snafu(display("Unable to extract metadata from headers: {}", source))]
64    Metadata {
65        source: crate::client::header::Error,
66    },
67}
68
69impl From<Error> for crate::Error {
70    fn from(err: Error) -> Self {
71        Self::Generic {
72            store: "HTTP",
73            source: Box::new(err),
74        }
75    }
76}
77
78/// An [`ObjectStore`] implementation for generic HTTP servers
79///
80/// See [`crate::http`] for more information
81#[derive(Debug)]
82pub struct HttpStore {
83    client: Client,
84}
85
86impl std::fmt::Display for HttpStore {
87    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88        write!(f, "HttpStore")
89    }
90}
91
92#[async_trait]
93impl ObjectStore for HttpStore {
94    async fn put_opts(
95        &self,
96        location: &Path,
97        payload: PutPayload,
98        opts: PutOptions,
99    ) -> Result<PutResult> {
100        if opts.mode != PutMode::Overwrite {
101            // TODO: Add support for If header - https://datatracker.ietf.org/doc/html/rfc2518#section-9.4
102            return Err(crate::Error::NotImplemented);
103        }
104
105        let response = self.client.put(location, payload, opts.attributes).await?;
106        let e_tag = match get_etag(response.headers()) {
107            Ok(e_tag) => Some(e_tag),
108            Err(crate::client::header::Error::MissingEtag) => None,
109            Err(source) => return Err(Error::Metadata { source }.into()),
110        };
111
112        Ok(PutResult {
113            e_tag,
114            version: None,
115        })
116    }
117
118    async fn put_multipart_opts(
119        &self,
120        _location: &Path,
121        _opts: PutMultipartOpts,
122    ) -> Result<Box<dyn MultipartUpload>> {
123        Err(crate::Error::NotImplemented)
124    }
125
126    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
127        self.client.get_opts(location, options).await
128    }
129
130    async fn delete(&self, location: &Path) -> Result<()> {
131        self.client.delete(location).await
132    }
133
134    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
135        let prefix_len = prefix.map(|p| p.as_ref().len()).unwrap_or_default();
136        let prefix = prefix.cloned();
137        futures::stream::once(async move {
138            let status = self.client.list(prefix.as_ref(), "infinity").await?;
139
140            let iter = status
141                .response
142                .into_iter()
143                .filter(|r| !r.is_dir())
144                .map(|response| {
145                    response.check_ok()?;
146                    response.object_meta(self.client.base_url())
147                })
148                // Filter out exact prefix matches
149                .filter_ok(move |r| r.location.as_ref().len() > prefix_len);
150
151            Ok::<_, crate::Error>(futures::stream::iter(iter))
152        })
153        .try_flatten()
154        .boxed()
155    }
156
157    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
158        let status = self.client.list(prefix, "1").await?;
159        let prefix_len = prefix.map(|p| p.as_ref().len()).unwrap_or(0);
160
161        let mut objects: Vec<ObjectMeta> = Vec::with_capacity(status.response.len());
162        let mut common_prefixes = Vec::with_capacity(status.response.len());
163        for response in status.response {
164            response.check_ok()?;
165            match response.is_dir() {
166                false => {
167                    let meta = response.object_meta(self.client.base_url())?;
168                    // Filter out exact prefix matches
169                    if meta.location.as_ref().len() > prefix_len {
170                        objects.push(meta);
171                    }
172                }
173                true => {
174                    let path = response.path(self.client.base_url())?;
175                    // Exclude the current object
176                    if path.as_ref().len() > prefix_len {
177                        common_prefixes.push(path);
178                    }
179                }
180            }
181        }
182
183        Ok(ListResult {
184            common_prefixes,
185            objects,
186        })
187    }
188
189    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
190        self.client.copy(from, to, true).await
191    }
192
193    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
194        self.client.copy(from, to, false).await
195    }
196}
197
198/// Configure a connection to a generic HTTP server
199#[derive(Debug, Default, Clone)]
200pub struct HttpBuilder {
201    url: Option<String>,
202    client_options: ClientOptions,
203    retry_config: RetryConfig,
204}
205
206impl HttpBuilder {
207    /// Create a new [`HttpBuilder`] with default values.
208    pub fn new() -> Self {
209        Default::default()
210    }
211
212    /// Set the URL
213    pub fn with_url(mut self, url: impl Into<String>) -> Self {
214        self.url = Some(url.into());
215        self
216    }
217
218    /// Set the retry configuration
219    pub fn with_retry(mut self, retry_config: RetryConfig) -> Self {
220        self.retry_config = retry_config;
221        self
222    }
223
224    /// Set individual client configuration without overriding the entire config
225    pub fn with_config(mut self, key: ClientConfigKey, value: impl Into<String>) -> Self {
226        self.client_options = self.client_options.with_config(key, value);
227        self
228    }
229
230    /// Sets the client options, overriding any already set
231    pub fn with_client_options(mut self, options: ClientOptions) -> Self {
232        self.client_options = options;
233        self
234    }
235
236    /// Build an [`HttpStore`] with the configured options
237    pub fn build(self) -> Result<HttpStore> {
238        let url = self.url.context(MissingUrlSnafu)?;
239        let parsed = Url::parse(&url).context(UnableToParseUrlSnafu { url })?;
240
241        Ok(HttpStore {
242            client: Client::new(parsed, self.client_options, self.retry_config)?,
243        })
244    }
245}
246
#[cfg(test)]
mod tests {
    use crate::integration::*;
    use crate::tests::*;

    use super::*;

    /// Runs the shared integration checks against the server named by the
    /// `HTTP_URL` environment variable, with plain HTTP allowed.
    #[tokio::test]
    async fn http_test() {
        maybe_skip_integration!();

        let endpoint = std::env::var("HTTP_URL").expect("HTTP_URL must be set");
        let store = HttpBuilder::new()
            .with_client_options(ClientOptions::new().with_allow_http(true))
            .with_url(endpoint)
            .build()
            .unwrap();

        put_get_delete_list(&store).await;
        list_uses_directories_correctly(&store).await;
        list_with_delimiter(&store).await;
        rename_and_copy(&store).await;
        copy_if_not_exists(&store).await;
    }
}