1use crate::client::get::GetClient;
19use crate::client::header::HeaderConfig;
20use crate::client::retry::{self, RetryConfig, RetryExt};
21use crate::client::GetOptionsExt;
22use crate::path::{Path, DELIMITER};
23use crate::util::deserialize_rfc1123;
24use crate::{Attribute, Attributes, ClientOptions, GetOptions, ObjectMeta, PutPayload, Result};
25use async_trait::async_trait;
26use bytes::Buf;
27use chrono::{DateTime, Utc};
28use hyper::header::{
29 CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE, CONTENT_LENGTH,
30 CONTENT_TYPE,
31};
32use percent_encoding::percent_decode_str;
33use reqwest::{Method, Response, StatusCode};
34use serde::Deserialize;
35use snafu::{OptionExt, ResultExt, Snafu};
36use url::Url;
37
38#[derive(Debug, Snafu)]
39enum Error {
40 #[snafu(display("Request error: {}", source))]
41 Request { source: retry::Error },
42
43 #[snafu(display("Request error: {}", source))]
44 Reqwest { source: reqwest::Error },
45
46 #[snafu(display("Range request not supported by {}", href))]
47 RangeNotSupported { href: String },
48
49 #[snafu(display("Error decoding PROPFIND response: {}", source))]
50 InvalidPropFind { source: quick_xml::de::DeError },
51
52 #[snafu(display("Missing content size for {}", href))]
53 MissingSize { href: String },
54
55 #[snafu(display("Error getting properties of \"{}\" got \"{}\"", href, status))]
56 PropStatus { href: String, status: String },
57
58 #[snafu(display("Failed to parse href \"{}\": {}", href, source))]
59 InvalidHref {
60 href: String,
61 source: url::ParseError,
62 },
63
64 #[snafu(display("Path \"{}\" contained non-unicode characters: {}", path, source))]
65 NonUnicode {
66 path: String,
67 source: std::str::Utf8Error,
68 },
69
70 #[snafu(display("Encountered invalid path \"{}\": {}", path, source))]
71 InvalidPath {
72 path: String,
73 source: crate::path::Error,
74 },
75}
76
77impl From<Error> for crate::Error {
78 fn from(err: Error) -> Self {
79 Self::Generic {
80 store: "HTTP",
81 source: Box::new(err),
82 }
83 }
84}
85
86#[derive(Debug)]
88pub struct Client {
89 url: Url,
90 client: reqwest::Client,
91 retry_config: RetryConfig,
92 client_options: ClientOptions,
93}
94
95impl Client {
96 pub fn new(url: Url, client_options: ClientOptions, retry_config: RetryConfig) -> Result<Self> {
97 let client = client_options.client()?;
98 Ok(Self {
99 url,
100 retry_config,
101 client_options,
102 client,
103 })
104 }
105
106 pub fn base_url(&self) -> &Url {
107 &self.url
108 }
109
110 fn path_url(&self, location: &Path) -> Url {
111 let mut url = self.url.clone();
112 url.path_segments_mut().unwrap().extend(location.parts());
113 url
114 }
115
116 async fn make_directory(&self, path: &str) -> Result<(), Error> {
118 let method = Method::from_bytes(b"MKCOL").unwrap();
119 let mut url = self.url.clone();
120 url.path_segments_mut()
121 .unwrap()
122 .extend(path.split(DELIMITER));
123
124 self.client
125 .request(method, url)
126 .send_retry(&self.retry_config)
127 .await
128 .context(RequestSnafu)?;
129
130 Ok(())
131 }
132
133 async fn create_parent_directories(&self, location: &Path) -> Result<()> {
135 let mut stack = vec![];
136
137 let mut last_prefix = location.as_ref();
139 while let Some((prefix, _)) = last_prefix.rsplit_once(DELIMITER) {
140 last_prefix = prefix;
141
142 match self.make_directory(prefix).await {
143 Ok(_) => break,
144 Err(Error::Request { source })
145 if matches!(source.status(), Some(StatusCode::CONFLICT)) =>
146 {
147 stack.push(prefix)
149 }
150 Err(e) => return Err(e.into()),
151 }
152 }
153
154 for prefix in stack.into_iter().rev() {
156 self.make_directory(prefix).await?;
157 }
158
159 Ok(())
160 }
161
162 pub async fn put(
163 &self,
164 location: &Path,
165 payload: PutPayload,
166 attributes: Attributes,
167 ) -> Result<Response> {
168 let mut retry = false;
169 loop {
170 let url = self.path_url(location);
171 let mut builder = self.client.put(url);
172
173 let mut has_content_type = false;
174 for (k, v) in &attributes {
175 builder = match k {
176 Attribute::CacheControl => builder.header(CACHE_CONTROL, v.as_ref()),
177 Attribute::ContentDisposition => {
178 builder.header(CONTENT_DISPOSITION, v.as_ref())
179 }
180 Attribute::ContentEncoding => builder.header(CONTENT_ENCODING, v.as_ref()),
181 Attribute::ContentLanguage => builder.header(CONTENT_LANGUAGE, v.as_ref()),
182 Attribute::ContentType => {
183 has_content_type = true;
184 builder.header(CONTENT_TYPE, v.as_ref())
185 }
186 Attribute::Metadata(_) => builder,
188 };
189 }
190
191 if !has_content_type {
192 if let Some(value) = self.client_options.get_content_type(location) {
193 builder = builder.header(CONTENT_TYPE, value);
194 }
195 }
196
197 let resp = builder
198 .header(CONTENT_LENGTH, payload.content_length())
199 .retryable(&self.retry_config)
200 .idempotent(true)
201 .payload(Some(payload.clone()))
202 .send()
203 .await;
204
205 match resp {
206 Ok(response) => return Ok(response),
207 Err(source) => match source.status() {
208 Some(StatusCode::CONFLICT | StatusCode::NOT_FOUND) if !retry => {
210 retry = true;
211 self.create_parent_directories(location).await?
212 }
213 _ => return Err(Error::Request { source }.into()),
214 },
215 }
216 }
217 }
218
219 pub async fn list(&self, location: Option<&Path>, depth: &str) -> Result<MultiStatus> {
220 let url = location
221 .map(|path| self.path_url(path))
222 .unwrap_or_else(|| self.url.clone());
223
224 let method = Method::from_bytes(b"PROPFIND").unwrap();
225 let result = self
226 .client
227 .request(method, url)
228 .header("Depth", depth)
229 .retryable(&self.retry_config)
230 .idempotent(true)
231 .send()
232 .await;
233
234 let response = match result {
235 Ok(result) => result.bytes().await.context(ReqwestSnafu)?,
236 Err(e) if matches!(e.status(), Some(StatusCode::NOT_FOUND)) => {
237 return match depth {
238 "0" => {
239 let path = location.map(|x| x.as_ref()).unwrap_or("");
240 Err(crate::Error::NotFound {
241 path: path.to_string(),
242 source: Box::new(e),
243 })
244 }
245 _ => {
246 Ok(Default::default())
248 }
249 };
250 }
251 Err(source) => return Err(Error::Request { source }.into()),
252 };
253
254 let status = quick_xml::de::from_reader(response.reader()).context(InvalidPropFindSnafu)?;
255 Ok(status)
256 }
257
258 pub async fn delete(&self, path: &Path) -> Result<()> {
259 let url = self.path_url(path);
260 self.client
261 .delete(url)
262 .send_retry(&self.retry_config)
263 .await
264 .map_err(|source| match source.status() {
265 Some(StatusCode::NOT_FOUND) => crate::Error::NotFound {
266 source: Box::new(source),
267 path: path.to_string(),
268 },
269 _ => Error::Request { source }.into(),
270 })?;
271 Ok(())
272 }
273
274 pub async fn copy(&self, from: &Path, to: &Path, overwrite: bool) -> Result<()> {
275 let mut retry = false;
276 loop {
277 let method = Method::from_bytes(b"COPY").unwrap();
278
279 let mut builder = self
280 .client
281 .request(method, self.path_url(from))
282 .header("Destination", self.path_url(to).as_str());
283
284 if !overwrite {
285 builder = builder.header("Overwrite", "F");
290 }
291
292 return match builder.send_retry(&self.retry_config).await {
293 Ok(_) => Ok(()),
294 Err(source) => Err(match source.status() {
295 Some(StatusCode::PRECONDITION_FAILED) if !overwrite => {
296 crate::Error::AlreadyExists {
297 path: to.to_string(),
298 source: Box::new(source),
299 }
300 }
301 Some(StatusCode::CONFLICT | StatusCode::NOT_FOUND) if !retry => {
303 retry = true;
304 self.create_parent_directories(to).await?;
305 continue;
306 }
307 _ => Error::Request { source }.into(),
308 }),
309 };
310 }
311 }
312}
313
314#[async_trait]
315impl GetClient for Client {
316 const STORE: &'static str = "HTTP";
317
318 const HEADER_CONFIG: HeaderConfig = HeaderConfig {
321 etag_required: false,
322 last_modified_required: false,
323 version_header: None,
324 user_defined_metadata_prefix: None,
325 };
326
327 async fn get_request(&self, path: &Path, options: GetOptions) -> Result<Response> {
328 let url = self.path_url(path);
329 let method = match options.head {
330 true => Method::HEAD,
331 false => Method::GET,
332 };
333 let has_range = options.range.is_some();
334 let builder = self.client.request(method, url);
335
336 let res = builder
337 .with_get_options(options)
338 .send_retry(&self.retry_config)
339 .await
340 .map_err(|source| match source.status() {
341 Some(StatusCode::NOT_FOUND | StatusCode::METHOD_NOT_ALLOWED) => {
343 crate::Error::NotFound {
344 source: Box::new(source),
345 path: path.to_string(),
346 }
347 }
348 _ => Error::Request { source }.into(),
349 })?;
350
351 if has_range && res.status() != StatusCode::PARTIAL_CONTENT {
354 return Err(crate::Error::NotSupported {
355 source: Box::new(Error::RangeNotSupported {
356 href: path.to_string(),
357 }),
358 });
359 }
360
361 Ok(res)
362 }
363}
364
365#[derive(Deserialize, Default)]
367pub struct MultiStatus {
368 pub response: Vec<MultiStatusResponse>,
369}
370
371#[derive(Deserialize)]
372pub struct MultiStatusResponse {
373 href: String,
374 #[serde(rename = "propstat")]
375 prop_stat: PropStat,
376}
377
378impl MultiStatusResponse {
379 pub fn check_ok(&self) -> Result<()> {
381 match self.prop_stat.status.contains("200 OK") {
382 true => Ok(()),
383 false => Err(Error::PropStatus {
384 href: self.href.clone(),
385 status: self.prop_stat.status.clone(),
386 }
387 .into()),
388 }
389 }
390
391 pub fn path(&self, base_url: &Url) -> Result<Path> {
393 let url = Url::options()
394 .base_url(Some(base_url))
395 .parse(&self.href)
396 .context(InvalidHrefSnafu { href: &self.href })?;
397
398 let path = percent_decode_str(url.path())
400 .decode_utf8()
401 .context(NonUnicodeSnafu { path: url.path() })?;
402
403 Ok(Path::parse(path.as_ref()).context(InvalidPathSnafu { path })?)
404 }
405
406 fn size(&self) -> Result<usize> {
407 let size = self
408 .prop_stat
409 .prop
410 .content_length
411 .context(MissingSizeSnafu { href: &self.href })?;
412 Ok(size)
413 }
414
415 pub fn object_meta(&self, base_url: &Url) -> Result<ObjectMeta> {
417 let last_modified = self.prop_stat.prop.last_modified;
418 Ok(ObjectMeta {
419 location: self.path(base_url)?,
420 last_modified,
421 size: self.size()?,
422 e_tag: self.prop_stat.prop.e_tag.clone(),
423 version: None,
424 })
425 }
426
427 pub fn is_dir(&self) -> bool {
429 self.prop_stat.prop.resource_type.collection.is_some()
430 }
431}
432
433#[derive(Deserialize)]
434pub struct PropStat {
435 prop: Prop,
436 status: String,
437}
438
439#[derive(Deserialize)]
440pub struct Prop {
441 #[serde(deserialize_with = "deserialize_rfc1123", rename = "getlastmodified")]
442 last_modified: DateTime<Utc>,
443
444 #[serde(rename = "getcontentlength")]
445 content_length: Option<usize>,
446
447 #[serde(rename = "resourcetype")]
448 resource_type: ResourceType,
449
450 #[serde(rename = "getetag")]
451 e_tag: Option<String>,
452}
453
454#[derive(Deserialize)]
455pub struct ResourceType {
456 collection: Option<()>,
457}