1use std::ops::Range;
19
20use crate::client::header::{header_meta, HeaderConfig};
21use crate::path::Path;
22use crate::{Attribute, Attributes, GetOptions, GetRange, GetResult, GetResultPayload, Result};
23use async_trait::async_trait;
24use futures::{StreamExt, TryStreamExt};
25use hyper::header::{
26 CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE, CONTENT_RANGE,
27 CONTENT_TYPE,
28};
29use hyper::StatusCode;
30use reqwest::header::ToStrError;
31use reqwest::Response;
32use snafu::{ensure, OptionExt, ResultExt, Snafu};
33
34#[async_trait]
36pub trait GetClient: Send + Sync + 'static {
37 const STORE: &'static str;
38
39 const HEADER_CONFIG: HeaderConfig;
41
42 async fn get_request(&self, path: &Path, options: GetOptions) -> Result<Response>;
43}
44
45#[async_trait]
47pub trait GetClientExt {
48 async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult>;
49}
50
51#[async_trait]
52impl<T: GetClient> GetClientExt for T {
53 async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
54 let range = options.range.clone();
55 if let Some(r) = range.as_ref() {
56 r.is_valid().map_err(|e| crate::Error::Generic {
57 store: T::STORE,
58 source: Box::new(e),
59 })?;
60 }
61 let response = self.get_request(location, options).await?;
62 get_result::<T>(location, range, response).map_err(|e| crate::Error::Generic {
63 store: T::STORE,
64 source: Box::new(e),
65 })
66 }
67}
68
69struct ContentRange {
70 range: Range<usize>,
72 size: usize,
74}
75
76impl ContentRange {
77 fn from_str(s: &str) -> Option<Self> {
81 let rem = s.trim().strip_prefix("bytes ")?;
82 let (range, size) = rem.split_once('/')?;
83 let size = size.parse().ok()?;
84
85 let (start_s, end_s) = range.split_once('-')?;
86
87 let start = start_s.parse().ok()?;
88 let end: usize = end_s.parse().ok()?;
89
90 Some(Self {
91 size,
92 range: start..end + 1,
93 })
94 }
95}
96
97#[derive(Debug, Snafu)]
99#[allow(missing_docs)]
100enum GetResultError {
101 #[snafu(context(false))]
102 Header {
103 source: crate::client::header::Error,
104 },
105
106 #[snafu(context(false))]
107 InvalidRangeRequest {
108 source: crate::util::InvalidGetRange,
109 },
110
111 #[snafu(display("Received non-partial response when range requested"))]
112 NotPartial,
113
114 #[snafu(display("Content-Range header not present in partial response"))]
115 NoContentRange,
116
117 #[snafu(display("Failed to parse value for CONTENT_RANGE header: \"{value}\""))]
118 ParseContentRange { value: String },
119
120 #[snafu(display("Content-Range header contained non UTF-8 characters"))]
121 InvalidContentRange { source: ToStrError },
122
123 #[snafu(display("Cache-Control header contained non UTF-8 characters"))]
124 InvalidCacheControl { source: ToStrError },
125
126 #[snafu(display("Content-Disposition header contained non UTF-8 characters"))]
127 InvalidContentDisposition { source: ToStrError },
128
129 #[snafu(display("Content-Encoding header contained non UTF-8 characters"))]
130 InvalidContentEncoding { source: ToStrError },
131
132 #[snafu(display("Content-Language header contained non UTF-8 characters"))]
133 InvalidContentLanguage { source: ToStrError },
134
135 #[snafu(display("Content-Type header contained non UTF-8 characters"))]
136 InvalidContentType { source: ToStrError },
137
138 #[snafu(display("Metadata value for \"{key:?}\" contained non UTF-8 characters"))]
139 InvalidMetadata { key: String },
140
141 #[snafu(display("Requested {expected:?}, got {actual:?}"))]
142 UnexpectedRange {
143 expected: Range<usize>,
144 actual: Range<usize>,
145 },
146}
147
148fn get_result<T: GetClient>(
149 location: &Path,
150 range: Option<GetRange>,
151 response: Response,
152) -> Result<GetResult, GetResultError> {
153 let mut meta = header_meta(location, response.headers(), T::HEADER_CONFIG)?;
154
155 let range = if let Some(expected) = range {
157 ensure!(
158 response.status() == StatusCode::PARTIAL_CONTENT,
159 NotPartialSnafu
160 );
161 let val = response
162 .headers()
163 .get(CONTENT_RANGE)
164 .context(NoContentRangeSnafu)?;
165
166 let value = val.to_str().context(InvalidContentRangeSnafu)?;
167 let value = ContentRange::from_str(value).context(ParseContentRangeSnafu { value })?;
168 let actual = value.range;
169
170 meta.size = value.size;
172
173 let expected = expected.as_range(meta.size)?;
174
175 ensure!(
176 actual == expected,
177 UnexpectedRangeSnafu { expected, actual }
178 );
179
180 actual
181 } else {
182 0..meta.size
183 };
184
185 macro_rules! parse_attributes {
186 ($headers:expr, $(($header:expr, $attr:expr, $err:expr)),*) => {{
187 let mut attributes = Attributes::new();
188 $(
189 if let Some(x) = $headers.get($header) {
190 let x = x.to_str().context($err)?;
191 attributes.insert($attr, x.to_string().into());
192 }
193 )*
194 attributes
195 }}
196 }
197
198 let mut attributes = parse_attributes!(
199 response.headers(),
200 (
201 CACHE_CONTROL,
202 Attribute::CacheControl,
203 InvalidCacheControlSnafu
204 ),
205 (
206 CONTENT_DISPOSITION,
207 Attribute::ContentDisposition,
208 InvalidContentDispositionSnafu
209 ),
210 (
211 CONTENT_ENCODING,
212 Attribute::ContentEncoding,
213 InvalidContentEncodingSnafu
214 ),
215 (
216 CONTENT_LANGUAGE,
217 Attribute::ContentLanguage,
218 InvalidContentLanguageSnafu
219 ),
220 (
221 CONTENT_TYPE,
222 Attribute::ContentType,
223 InvalidContentTypeSnafu
224 )
225 );
226
227 if let Some(prefix) = T::HEADER_CONFIG.user_defined_metadata_prefix {
229 for (key, val) in response.headers() {
230 if let Some(suffix) = key.as_str().strip_prefix(prefix) {
231 if let Ok(val_str) = val.to_str() {
232 attributes.insert(
233 Attribute::Metadata(suffix.to_string().into()),
234 val_str.to_string().into(),
235 );
236 } else {
237 return Err(GetResultError::InvalidMetadata {
238 key: key.to_string(),
239 });
240 }
241 }
242 }
243 }
244
245 let stream = response
246 .bytes_stream()
247 .map_err(|source| crate::Error::Generic {
248 store: T::STORE,
249 source: Box::new(source),
250 })
251 .boxed();
252
253 Ok(GetResult {
254 range,
255 meta,
256 attributes,
257 payload: GetResultPayload::Stream(stream),
258 })
259}
260
261#[cfg(test)]
262mod tests {
263 use super::*;
264 use hyper::http;
265 use hyper::http::header::*;
266
267 struct TestClient {}
268
269 #[async_trait]
270 impl GetClient for TestClient {
271 const STORE: &'static str = "TEST";
272
273 const HEADER_CONFIG: HeaderConfig = HeaderConfig {
274 etag_required: false,
275 last_modified_required: false,
276 version_header: None,
277 user_defined_metadata_prefix: Some("x-test-meta-"),
278 };
279
280 async fn get_request(&self, _: &Path, _: GetOptions) -> Result<Response> {
281 unimplemented!()
282 }
283 }
284
285 fn make_response(
286 object_size: usize,
287 range: Option<Range<usize>>,
288 status: StatusCode,
289 content_range: Option<&str>,
290 headers: Option<Vec<(&str, &str)>>,
291 ) -> Response {
292 let mut builder = http::Response::builder();
293 if let Some(range) = content_range {
294 builder = builder.header(CONTENT_RANGE, range);
295 }
296
297 let body = match range {
298 Some(range) => vec![0_u8; range.end - range.start],
299 None => vec![0_u8; object_size],
300 };
301
302 if let Some(headers) = headers {
303 for (key, value) in headers {
304 builder = builder.header(key, value);
305 }
306 }
307
308 builder
309 .status(status)
310 .header(CONTENT_LENGTH, object_size)
311 .body(body)
312 .unwrap()
313 .into()
314 }
315
316 #[tokio::test]
317 async fn test_get_result() {
318 let path = Path::from("test");
319
320 let resp = make_response(12, None, StatusCode::OK, None, None);
321 let res = get_result::<TestClient>(&path, None, resp).unwrap();
322 assert_eq!(res.meta.size, 12);
323 assert_eq!(res.range, 0..12);
324 let bytes = res.bytes().await.unwrap();
325 assert_eq!(bytes.len(), 12);
326
327 let get_range = GetRange::from(2..3);
328
329 let resp = make_response(
330 12,
331 Some(2..3),
332 StatusCode::PARTIAL_CONTENT,
333 Some("bytes 2-2/12"),
334 None,
335 );
336 let res = get_result::<TestClient>(&path, Some(get_range.clone()), resp).unwrap();
337 assert_eq!(res.meta.size, 12);
338 assert_eq!(res.range, 2..3);
339 let bytes = res.bytes().await.unwrap();
340 assert_eq!(bytes.len(), 1);
341
342 let resp = make_response(12, Some(2..3), StatusCode::OK, None, None);
343 let err = get_result::<TestClient>(&path, Some(get_range.clone()), resp).unwrap_err();
344 assert_eq!(
345 err.to_string(),
346 "Received non-partial response when range requested"
347 );
348
349 let resp = make_response(
350 12,
351 Some(2..3),
352 StatusCode::PARTIAL_CONTENT,
353 Some("bytes 2-3/12"),
354 None,
355 );
356 let err = get_result::<TestClient>(&path, Some(get_range.clone()), resp).unwrap_err();
357 assert_eq!(err.to_string(), "Requested 2..3, got 2..4");
358
359 let resp = make_response(
360 12,
361 Some(2..3),
362 StatusCode::PARTIAL_CONTENT,
363 Some("bytes 2-2/*"),
364 None,
365 );
366 let err = get_result::<TestClient>(&path, Some(get_range.clone()), resp).unwrap_err();
367 assert_eq!(
368 err.to_string(),
369 "Failed to parse value for CONTENT_RANGE header: \"bytes 2-2/*\""
370 );
371
372 let resp = make_response(12, Some(2..3), StatusCode::PARTIAL_CONTENT, None, None);
373 let err = get_result::<TestClient>(&path, Some(get_range.clone()), resp).unwrap_err();
374 assert_eq!(
375 err.to_string(),
376 "Content-Range header not present in partial response"
377 );
378
379 let resp = make_response(
380 2,
381 Some(2..3),
382 StatusCode::PARTIAL_CONTENT,
383 Some("bytes 2-3/2"),
384 None,
385 );
386 let err = get_result::<TestClient>(&path, Some(get_range.clone()), resp).unwrap_err();
387 assert_eq!(
388 err.to_string(),
389 "InvalidRangeRequest: Wanted range starting at 2, but object was only 2 bytes long"
390 );
391
392 let resp = make_response(
393 6,
394 Some(2..6),
395 StatusCode::PARTIAL_CONTENT,
396 Some("bytes 2-5/6"),
397 None,
398 );
399 let res = get_result::<TestClient>(&path, Some(GetRange::Suffix(4)), resp).unwrap();
400 assert_eq!(res.meta.size, 6);
401 assert_eq!(res.range, 2..6);
402 let bytes = res.bytes().await.unwrap();
403 assert_eq!(bytes.len(), 4);
404
405 let resp = make_response(
406 6,
407 Some(2..6),
408 StatusCode::PARTIAL_CONTENT,
409 Some("bytes 2-3/6"),
410 None,
411 );
412 let err = get_result::<TestClient>(&path, Some(GetRange::Suffix(4)), resp).unwrap_err();
413 assert_eq!(err.to_string(), "Requested 2..6, got 2..4");
414
415 let resp = make_response(
416 12,
417 None,
418 StatusCode::OK,
419 None,
420 Some(vec![("x-test-meta-foo", "bar")]),
421 );
422 let res = get_result::<TestClient>(&path, None, resp).unwrap();
423 assert_eq!(res.meta.size, 12);
424 assert_eq!(res.range, 0..12);
425 assert_eq!(
426 res.attributes.get(&Attribute::Metadata("foo".into())),
427 Some(&"bar".into())
428 );
429 let bytes = res.bytes().await.unwrap();
430 assert_eq!(bytes.len(), 12);
431 }
432}