object_store/client/
get.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::ops::Range;
19
20use crate::client::header::{header_meta, HeaderConfig};
21use crate::path::Path;
22use crate::{Attribute, Attributes, GetOptions, GetRange, GetResult, GetResultPayload, Result};
23use async_trait::async_trait;
24use futures::{StreamExt, TryStreamExt};
25use hyper::header::{
26    CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE, CONTENT_RANGE,
27    CONTENT_TYPE,
28};
29use hyper::StatusCode;
30use reqwest::header::ToStrError;
31use reqwest::Response;
32use snafu::{ensure, OptionExt, ResultExt, Snafu};
33
/// A client that can perform a get request
#[async_trait]
pub trait GetClient: Send + Sync + 'static {
    /// Name of the store, reported in the `store` field of
    /// [`crate::Error::Generic`] when a get fails
    const STORE: &'static str;

    /// Configure the [`HeaderConfig`] for this client
    const HEADER_CONFIG: HeaderConfig;

    /// Perform the get request for `path` with the provided `options`,
    /// returning the raw [`Response`]
    async fn get_request(&self, path: &Path, options: GetOptions) -> Result<Response>;
}
44
/// Extension trait for [`GetClient`] that adds common retrieval functionality
#[async_trait]
pub trait GetClientExt {
    /// Fetch the object at `location` according to `options`, returning a [`GetResult`]
    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult>;
}
50
51#[async_trait]
52impl<T: GetClient> GetClientExt for T {
53    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
54        let range = options.range.clone();
55        if let Some(r) = range.as_ref() {
56            r.is_valid().map_err(|e| crate::Error::Generic {
57                store: T::STORE,
58                source: Box::new(e),
59            })?;
60        }
61        let response = self.get_request(location, options).await?;
62        get_result::<T>(location, range, response).map_err(|e| crate::Error::Generic {
63            store: T::STORE,
64            source: Box::new(e),
65        })
66    }
67}
68
/// The parsed value of a `Content-Range` response header
struct ContentRange {
    /// The range of the object returned
    range: Range<usize>,
    /// The total size of the object being requested
    size: usize,
}

impl ContentRange {
    /// Parse a content range of the form `bytes <range-start>-<range-end>/<size>`
    ///
    /// The header's `<range-end>` is inclusive, whereas the returned `range` is
    /// half-open, i.e. `bytes 2-5/12` yields `2..6`.
    ///
    /// Returns `None` if the value does not have this form (including an
    /// unknown size, `*`) or if the exclusive end does not fit in a `usize`.
    ///
    /// <https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Range>
    fn from_str(s: &str) -> Option<Self> {
        let rem = s.trim().strip_prefix("bytes ")?;
        let (range, size) = rem.split_once('/')?;
        let size = size.parse().ok()?;

        let (start_s, end_s) = range.split_once('-')?;

        let start = start_s.parse().ok()?;
        let end: usize = end_s.parse().ok()?;

        // The header value is supplied by the server, so `end` may be
        // `usize::MAX`; a plain `end + 1` would panic (debug) or wrap to 0
        // (release). Treat such a value as unparseable instead.
        let end = end.checked_add(1)?;

        Some(Self {
            size,
            range: start..end,
        })
    }
}
96
97/// A specialized `Error` for get-related errors
98#[derive(Debug, Snafu)]
99#[allow(missing_docs)]
100enum GetResultError {
101    #[snafu(context(false))]
102    Header {
103        source: crate::client::header::Error,
104    },
105
106    #[snafu(context(false))]
107    InvalidRangeRequest {
108        source: crate::util::InvalidGetRange,
109    },
110
111    #[snafu(display("Received non-partial response when range requested"))]
112    NotPartial,
113
114    #[snafu(display("Content-Range header not present in partial response"))]
115    NoContentRange,
116
117    #[snafu(display("Failed to parse value for CONTENT_RANGE header: \"{value}\""))]
118    ParseContentRange { value: String },
119
120    #[snafu(display("Content-Range header contained non UTF-8 characters"))]
121    InvalidContentRange { source: ToStrError },
122
123    #[snafu(display("Cache-Control header contained non UTF-8 characters"))]
124    InvalidCacheControl { source: ToStrError },
125
126    #[snafu(display("Content-Disposition header contained non UTF-8 characters"))]
127    InvalidContentDisposition { source: ToStrError },
128
129    #[snafu(display("Content-Encoding header contained non UTF-8 characters"))]
130    InvalidContentEncoding { source: ToStrError },
131
132    #[snafu(display("Content-Language header contained non UTF-8 characters"))]
133    InvalidContentLanguage { source: ToStrError },
134
135    #[snafu(display("Content-Type header contained non UTF-8 characters"))]
136    InvalidContentType { source: ToStrError },
137
138    #[snafu(display("Metadata value for \"{key:?}\" contained non UTF-8 characters"))]
139    InvalidMetadata { key: String },
140
141    #[snafu(display("Requested {expected:?}, got {actual:?}"))]
142    UnexpectedRange {
143        expected: Range<usize>,
144        actual: Range<usize>,
145    },
146}
147
148fn get_result<T: GetClient>(
149    location: &Path,
150    range: Option<GetRange>,
151    response: Response,
152) -> Result<GetResult, GetResultError> {
153    let mut meta = header_meta(location, response.headers(), T::HEADER_CONFIG)?;
154
155    // ensure that we receive the range we asked for
156    let range = if let Some(expected) = range {
157        ensure!(
158            response.status() == StatusCode::PARTIAL_CONTENT,
159            NotPartialSnafu
160        );
161        let val = response
162            .headers()
163            .get(CONTENT_RANGE)
164            .context(NoContentRangeSnafu)?;
165
166        let value = val.to_str().context(InvalidContentRangeSnafu)?;
167        let value = ContentRange::from_str(value).context(ParseContentRangeSnafu { value })?;
168        let actual = value.range;
169
170        // Update size to reflect full size of object (#5272)
171        meta.size = value.size;
172
173        let expected = expected.as_range(meta.size)?;
174
175        ensure!(
176            actual == expected,
177            UnexpectedRangeSnafu { expected, actual }
178        );
179
180        actual
181    } else {
182        0..meta.size
183    };
184
185    macro_rules! parse_attributes {
186        ($headers:expr, $(($header:expr, $attr:expr, $err:expr)),*) => {{
187            let mut attributes = Attributes::new();
188            $(
189            if let Some(x) = $headers.get($header) {
190                let x = x.to_str().context($err)?;
191                attributes.insert($attr, x.to_string().into());
192            }
193            )*
194            attributes
195        }}
196    }
197
198    let mut attributes = parse_attributes!(
199        response.headers(),
200        (
201            CACHE_CONTROL,
202            Attribute::CacheControl,
203            InvalidCacheControlSnafu
204        ),
205        (
206            CONTENT_DISPOSITION,
207            Attribute::ContentDisposition,
208            InvalidContentDispositionSnafu
209        ),
210        (
211            CONTENT_ENCODING,
212            Attribute::ContentEncoding,
213            InvalidContentEncodingSnafu
214        ),
215        (
216            CONTENT_LANGUAGE,
217            Attribute::ContentLanguage,
218            InvalidContentLanguageSnafu
219        ),
220        (
221            CONTENT_TYPE,
222            Attribute::ContentType,
223            InvalidContentTypeSnafu
224        )
225    );
226
227    // Add attributes that match the user-defined metadata prefix (e.g. x-amz-meta-)
228    if let Some(prefix) = T::HEADER_CONFIG.user_defined_metadata_prefix {
229        for (key, val) in response.headers() {
230            if let Some(suffix) = key.as_str().strip_prefix(prefix) {
231                if let Ok(val_str) = val.to_str() {
232                    attributes.insert(
233                        Attribute::Metadata(suffix.to_string().into()),
234                        val_str.to_string().into(),
235                    );
236                } else {
237                    return Err(GetResultError::InvalidMetadata {
238                        key: key.to_string(),
239                    });
240                }
241            }
242        }
243    }
244
245    let stream = response
246        .bytes_stream()
247        .map_err(|source| crate::Error::Generic {
248            store: T::STORE,
249            source: Box::new(source),
250        })
251        .boxed();
252
253    Ok(GetResult {
254        range,
255        meta,
256        attributes,
257        payload: GetResultPayload::Stream(stream),
258    })
259}
260
#[cfg(test)]
mod tests {
    use super::*;
    use hyper::http;
    use hyper::http::header::*;

    /// Minimal [`GetClient`] used only to supply `STORE` and `HEADER_CONFIG`
    struct TestClient {}

    #[async_trait]
    impl GetClient for TestClient {
        const STORE: &'static str = "TEST";

        const HEADER_CONFIG: HeaderConfig = HeaderConfig {
            etag_required: false,
            last_modified_required: false,
            version_header: None,
            user_defined_metadata_prefix: Some("x-test-meta-"),
        };

        async fn get_request(&self, _: &Path, _: GetOptions) -> Result<Response> {
            unimplemented!()
        }
    }

    /// Build a fake response
    ///
    /// * `object_size` - size of the body when `range` is `None`
    /// * `range` - when set, the body contains only `range.end - range.start` bytes
    /// * `status` - the response status
    /// * `content_range` - value of the `Content-Range` header, if any
    /// * `headers` - additional headers to attach
    fn make_response(
        object_size: usize,
        range: Option<Range<usize>>,
        status: StatusCode,
        content_range: Option<&str>,
        headers: Option<Vec<(&str, &str)>>,
    ) -> Response {
        let mut builder = http::Response::builder();
        if let Some(range) = content_range {
            builder = builder.header(CONTENT_RANGE, range);
        }

        let body = match range {
            Some(range) => vec![0_u8; range.end - range.start],
            None => vec![0_u8; object_size],
        };

        if let Some(headers) = headers {
            for (key, value) in headers {
                builder = builder.header(key, value);
            }
        }

        // Advertise the length of the body actually sent, so that for partial
        // responses the full object size can only come from Content-Range
        builder
            .status(status)
            .header(CONTENT_LENGTH, body.len())
            .body(body)
            .unwrap()
            .into()
    }

    #[tokio::test]
    async fn test_get_result() {
        let path = Path::from("test");

        // Full object, no range requested
        let resp = make_response(12, None, StatusCode::OK, None, None);
        let res = get_result::<TestClient>(&path, None, resp).unwrap();
        assert_eq!(res.meta.size, 12);
        assert_eq!(res.range, 0..12);
        let bytes = res.bytes().await.unwrap();
        assert_eq!(bytes.len(), 12);

        let get_range = GetRange::from(2..3);

        // Partial response matching the requested range
        let resp = make_response(
            12,
            Some(2..3),
            StatusCode::PARTIAL_CONTENT,
            Some("bytes 2-2/12"),
            None,
        );
        let res = get_result::<TestClient>(&path, Some(get_range.clone()), resp).unwrap();
        assert_eq!(res.meta.size, 12);
        assert_eq!(res.range, 2..3);
        let bytes = res.bytes().await.unwrap();
        assert_eq!(bytes.len(), 1);

        // Range requested but the response is not a 206
        let resp = make_response(12, Some(2..3), StatusCode::OK, None, None);
        let err = get_result::<TestClient>(&path, Some(get_range.clone()), resp).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Received non-partial response when range requested"
        );

        // Content-Range describes a different range than requested
        let resp = make_response(
            12,
            Some(2..3),
            StatusCode::PARTIAL_CONTENT,
            Some("bytes 2-3/12"),
            None,
        );
        let err = get_result::<TestClient>(&path, Some(get_range.clone()), resp).unwrap_err();
        assert_eq!(err.to_string(), "Requested 2..3, got 2..4");

        // Content-Range with an unknown total size cannot be parsed
        let resp = make_response(
            12,
            Some(2..3),
            StatusCode::PARTIAL_CONTENT,
            Some("bytes 2-2/*"),
            None,
        );
        let err = get_result::<TestClient>(&path, Some(get_range.clone()), resp).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Failed to parse value for CONTENT_RANGE header: \"bytes 2-2/*\""
        );

        // Partial response without a Content-Range header
        let resp = make_response(12, Some(2..3), StatusCode::PARTIAL_CONTENT, None, None);
        let err = get_result::<TestClient>(&path, Some(get_range.clone()), resp).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Content-Range header not present in partial response"
        );

        // Requested range starts at the end of the object
        let resp = make_response(
            2,
            Some(2..3),
            StatusCode::PARTIAL_CONTENT,
            Some("bytes 2-3/2"),
            None,
        );
        let err = get_result::<TestClient>(&path, Some(get_range.clone()), resp).unwrap_err();
        assert_eq!(
            err.to_string(),
            "InvalidRangeRequest: Wanted range starting at 2, but object was only 2 bytes long"
        );

        // Suffix request resolved against the size reported by Content-Range
        let resp = make_response(
            6,
            Some(2..6),
            StatusCode::PARTIAL_CONTENT,
            Some("bytes 2-5/6"),
            None,
        );
        let res = get_result::<TestClient>(&path, Some(GetRange::Suffix(4)), resp).unwrap();
        assert_eq!(res.meta.size, 6);
        assert_eq!(res.range, 2..6);
        let bytes = res.bytes().await.unwrap();
        assert_eq!(bytes.len(), 4);

        // Suffix request answered with a shorter range than requested
        let resp = make_response(
            6,
            Some(2..6),
            StatusCode::PARTIAL_CONTENT,
            Some("bytes 2-3/6"),
            None,
        );
        let err = get_result::<TestClient>(&path, Some(GetRange::Suffix(4)), resp).unwrap_err();
        assert_eq!(err.to_string(), "Requested 2..6, got 2..4");

        // Headers with the user-defined metadata prefix become attributes
        let resp = make_response(
            12,
            None,
            StatusCode::OK,
            None,
            Some(vec![("x-test-meta-foo", "bar")]),
        );
        let res = get_result::<TestClient>(&path, None, resp).unwrap();
        assert_eq!(res.meta.size, 12);
        assert_eq!(res.range, 0..12);
        assert_eq!(
            res.attributes.get(&Attribute::Metadata("foo".into())),
            Some(&"bar".into())
        );
        let bytes = res.bytes().await.unwrap();
        assert_eq!(bytes.len(), 12);
    }
}