1use async_trait::async_trait;
35use futures::stream::BoxStream;
36use futures::{StreamExt, TryStreamExt};
37use itertools::Itertools;
38use snafu::{OptionExt, ResultExt, Snafu};
39use url::Url;
40
41use crate::client::get::GetClientExt;
42use crate::client::header::get_etag;
43use crate::http::client::Client;
44use crate::path::Path;
45use crate::{
46 ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta,
47 ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, RetryConfig,
48};
49
50mod client;
51
52#[derive(Debug, Snafu)]
53enum Error {
54 #[snafu(display("Must specify a URL"))]
55 MissingUrl,
56
57 #[snafu(display("Unable parse source url. Url: {}, Error: {}", url, source))]
58 UnableToParseUrl {
59 source: url::ParseError,
60 url: String,
61 },
62
63 #[snafu(display("Unable to extract metadata from headers: {}", source))]
64 Metadata {
65 source: crate::client::header::Error,
66 },
67}
68
69impl From<Error> for crate::Error {
70 fn from(err: Error) -> Self {
71 Self::Generic {
72 store: "HTTP",
73 source: Box::new(err),
74 }
75 }
76}
77
78#[derive(Debug)]
82pub struct HttpStore {
83 client: Client,
84}
85
86impl std::fmt::Display for HttpStore {
87 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88 write!(f, "HttpStore")
89 }
90}
91
92#[async_trait]
93impl ObjectStore for HttpStore {
94 async fn put_opts(
95 &self,
96 location: &Path,
97 payload: PutPayload,
98 opts: PutOptions,
99 ) -> Result<PutResult> {
100 if opts.mode != PutMode::Overwrite {
101 return Err(crate::Error::NotImplemented);
103 }
104
105 let response = self.client.put(location, payload, opts.attributes).await?;
106 let e_tag = match get_etag(response.headers()) {
107 Ok(e_tag) => Some(e_tag),
108 Err(crate::client::header::Error::MissingEtag) => None,
109 Err(source) => return Err(Error::Metadata { source }.into()),
110 };
111
112 Ok(PutResult {
113 e_tag,
114 version: None,
115 })
116 }
117
118 async fn put_multipart_opts(
119 &self,
120 _location: &Path,
121 _opts: PutMultipartOpts,
122 ) -> Result<Box<dyn MultipartUpload>> {
123 Err(crate::Error::NotImplemented)
124 }
125
126 async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
127 self.client.get_opts(location, options).await
128 }
129
130 async fn delete(&self, location: &Path) -> Result<()> {
131 self.client.delete(location).await
132 }
133
134 fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
135 let prefix_len = prefix.map(|p| p.as_ref().len()).unwrap_or_default();
136 let prefix = prefix.cloned();
137 futures::stream::once(async move {
138 let status = self.client.list(prefix.as_ref(), "infinity").await?;
139
140 let iter = status
141 .response
142 .into_iter()
143 .filter(|r| !r.is_dir())
144 .map(|response| {
145 response.check_ok()?;
146 response.object_meta(self.client.base_url())
147 })
148 .filter_ok(move |r| r.location.as_ref().len() > prefix_len);
150
151 Ok::<_, crate::Error>(futures::stream::iter(iter))
152 })
153 .try_flatten()
154 .boxed()
155 }
156
157 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
158 let status = self.client.list(prefix, "1").await?;
159 let prefix_len = prefix.map(|p| p.as_ref().len()).unwrap_or(0);
160
161 let mut objects: Vec<ObjectMeta> = Vec::with_capacity(status.response.len());
162 let mut common_prefixes = Vec::with_capacity(status.response.len());
163 for response in status.response {
164 response.check_ok()?;
165 match response.is_dir() {
166 false => {
167 let meta = response.object_meta(self.client.base_url())?;
168 if meta.location.as_ref().len() > prefix_len {
170 objects.push(meta);
171 }
172 }
173 true => {
174 let path = response.path(self.client.base_url())?;
175 if path.as_ref().len() > prefix_len {
177 common_prefixes.push(path);
178 }
179 }
180 }
181 }
182
183 Ok(ListResult {
184 common_prefixes,
185 objects,
186 })
187 }
188
189 async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
190 self.client.copy(from, to, true).await
191 }
192
193 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
194 self.client.copy(from, to, false).await
195 }
196}
197
198#[derive(Debug, Default, Clone)]
200pub struct HttpBuilder {
201 url: Option<String>,
202 client_options: ClientOptions,
203 retry_config: RetryConfig,
204}
205
206impl HttpBuilder {
207 pub fn new() -> Self {
209 Default::default()
210 }
211
212 pub fn with_url(mut self, url: impl Into<String>) -> Self {
214 self.url = Some(url.into());
215 self
216 }
217
218 pub fn with_retry(mut self, retry_config: RetryConfig) -> Self {
220 self.retry_config = retry_config;
221 self
222 }
223
224 pub fn with_config(mut self, key: ClientConfigKey, value: impl Into<String>) -> Self {
226 self.client_options = self.client_options.with_config(key, value);
227 self
228 }
229
230 pub fn with_client_options(mut self, options: ClientOptions) -> Self {
232 self.client_options = options;
233 self
234 }
235
236 pub fn build(self) -> Result<HttpStore> {
238 let url = self.url.context(MissingUrlSnafu)?;
239 let parsed = Url::parse(&url).context(UnableToParseUrlSnafu { url })?;
240
241 Ok(HttpStore {
242 client: Client::new(parsed, self.client_options, self.retry_config)?,
243 })
244 }
245}
246
247#[cfg(test)]
248mod tests {
249 use crate::integration::*;
250 use crate::tests::*;
251
252 use super::*;
253
254 #[tokio::test]
255 async fn http_test() {
256 maybe_skip_integration!();
257 let url = std::env::var("HTTP_URL").expect("HTTP_URL must be set");
258 let options = ClientOptions::new().with_allow_http(true);
259 let integration = HttpBuilder::new()
260 .with_url(url)
261 .with_client_options(options)
262 .build()
263 .unwrap();
264
265 put_get_delete_list(&integration).await;
266 list_uses_directories_correctly(&integration).await;
267 list_with_delimiter(&integration).await;
268 rename_and_copy(&integration).await;
269 copy_if_not_exists(&integration).await;
270 }
271}