object_store/gcp/
client.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use crate::client::get::GetClient;
19use crate::client::header::{get_put_result, get_version, HeaderConfig};
20use crate::client::list::ListClient;
21use crate::client::retry::RetryExt;
22use crate::client::s3::{
23    CompleteMultipartUpload, CompleteMultipartUploadResult, InitiateMultipartUploadResult,
24    ListResponse,
25};
26use crate::client::GetOptionsExt;
27use crate::gcp::{GcpCredential, GcpCredentialProvider, GcpSigningCredentialProvider, STORE};
28use crate::multipart::PartId;
29use crate::path::{Path, DELIMITER};
30use crate::util::hex_encode;
31use crate::{
32    Attribute, Attributes, ClientOptions, GetOptions, ListResult, MultipartId, PutMode,
33    PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, RetryConfig,
34};
35use async_trait::async_trait;
36use base64::prelude::BASE64_STANDARD;
37use base64::Engine;
38use bytes::Buf;
39use hyper::header::{
40    CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE, CONTENT_LENGTH,
41    CONTENT_TYPE,
42};
43use percent_encoding::{percent_encode, utf8_percent_encode, NON_ALPHANUMERIC};
44use reqwest::header::HeaderName;
45use reqwest::{Client, Method, RequestBuilder, Response, StatusCode};
46use serde::{Deserialize, Serialize};
47use snafu::{OptionExt, ResultExt, Snafu};
48use std::sync::Arc;
49
50const VERSION_HEADER: &str = "x-goog-generation";
51const DEFAULT_CONTENT_TYPE: &str = "application/octet-stream";
52const USER_DEFINED_METADATA_HEADER_PREFIX: &str = "x-goog-meta-";
53
54static VERSION_MATCH: HeaderName = HeaderName::from_static("x-goog-if-generation-match");
55
56#[derive(Debug, Snafu)]
57enum Error {
58    #[snafu(display("Error performing list request: {}", source))]
59    ListRequest { source: crate::client::retry::Error },
60
61    #[snafu(display("Error getting list response body: {}", source))]
62    ListResponseBody { source: reqwest::Error },
63
64    #[snafu(display("Got invalid list response: {}", source))]
65    InvalidListResponse { source: quick_xml::de::DeError },
66
67    #[snafu(display("Error performing get request {}: {}", path, source))]
68    GetRequest {
69        source: crate::client::retry::Error,
70        path: String,
71    },
72
73    #[snafu(display("Error performing request {}: {}", path, source))]
74    Request {
75        source: crate::client::retry::Error,
76        path: String,
77    },
78
79    #[snafu(display("Error getting put response body: {}", source))]
80    PutResponseBody { source: reqwest::Error },
81
82    #[snafu(display("Got invalid put response: {}", source))]
83    InvalidPutResponse { source: quick_xml::de::DeError },
84
85    #[snafu(display("Unable to extract metadata from headers: {}", source))]
86    Metadata {
87        source: crate::client::header::Error,
88    },
89
90    #[snafu(display("Version required for conditional update"))]
91    MissingVersion,
92
93    #[snafu(display("Error performing complete multipart request: {}", source))]
94    CompleteMultipartRequest { source: crate::client::retry::Error },
95
96    #[snafu(display("Error getting complete multipart response body: {}", source))]
97    CompleteMultipartResponseBody { source: reqwest::Error },
98
99    #[snafu(display("Got invalid multipart response: {}", source))]
100    InvalidMultipartResponse { source: quick_xml::de::DeError },
101
102    #[snafu(display("Error signing blob: {}", source))]
103    SignBlobRequest { source: crate::client::retry::Error },
104
105    #[snafu(display("Got invalid signing blob response: {}", source))]
106    InvalidSignBlobResponse { source: reqwest::Error },
107
108    #[snafu(display("Got invalid signing blob signature: {}", source))]
109    InvalidSignBlobSignature { source: base64::DecodeError },
110}
111
112impl From<Error> for crate::Error {
113    fn from(err: Error) -> Self {
114        match err {
115            Error::GetRequest { source, path } | Error::Request { source, path } => {
116                source.error(STORE, path)
117            }
118            _ => Self::Generic {
119                store: STORE,
120                source: Box::new(err),
121            },
122        }
123    }
124}
125
126#[derive(Debug)]
127pub struct GoogleCloudStorageConfig {
128    pub base_url: String,
129
130    pub credentials: GcpCredentialProvider,
131
132    pub signing_credentials: GcpSigningCredentialProvider,
133
134    pub bucket_name: String,
135
136    pub retry_config: RetryConfig,
137
138    pub client_options: ClientOptions,
139}
140
141impl GoogleCloudStorageConfig {
142    pub fn new(
143        base_url: String,
144        credentials: GcpCredentialProvider,
145        signing_credentials: GcpSigningCredentialProvider,
146        bucket_name: String,
147        retry_config: RetryConfig,
148        client_options: ClientOptions,
149    ) -> Self {
150        Self {
151            base_url,
152            credentials,
153            signing_credentials,
154            bucket_name,
155            retry_config,
156            client_options,
157        }
158    }
159
160    pub fn path_url(&self, path: &Path) -> String {
161        format!("{}/{}/{}", self.base_url, self.bucket_name, path)
162    }
163}
164
165/// A builder for a put request allowing customisation of the headers and query string
166pub struct Request<'a> {
167    path: &'a Path,
168    config: &'a GoogleCloudStorageConfig,
169    payload: Option<PutPayload>,
170    builder: RequestBuilder,
171    idempotent: bool,
172}
173
174impl<'a> Request<'a> {
175    fn header(self, k: &HeaderName, v: &str) -> Self {
176        let builder = self.builder.header(k, v);
177        Self { builder, ..self }
178    }
179
180    fn query<T: Serialize + ?Sized + Sync>(self, query: &T) -> Self {
181        let builder = self.builder.query(query);
182        Self { builder, ..self }
183    }
184
185    fn idempotent(mut self, idempotent: bool) -> Self {
186        self.idempotent = idempotent;
187        self
188    }
189
190    fn with_attributes(self, attributes: Attributes) -> Self {
191        let mut builder = self.builder;
192        let mut has_content_type = false;
193        for (k, v) in &attributes {
194            builder = match k {
195                Attribute::CacheControl => builder.header(CACHE_CONTROL, v.as_ref()),
196                Attribute::ContentDisposition => builder.header(CONTENT_DISPOSITION, v.as_ref()),
197                Attribute::ContentEncoding => builder.header(CONTENT_ENCODING, v.as_ref()),
198                Attribute::ContentLanguage => builder.header(CONTENT_LANGUAGE, v.as_ref()),
199                Attribute::ContentType => {
200                    has_content_type = true;
201                    builder.header(CONTENT_TYPE, v.as_ref())
202                }
203                Attribute::Metadata(k_suffix) => builder.header(
204                    &format!("{}{}", USER_DEFINED_METADATA_HEADER_PREFIX, k_suffix),
205                    v.as_ref(),
206                ),
207            };
208        }
209
210        if !has_content_type {
211            let value = self.config.client_options.get_content_type(self.path);
212            builder = builder.header(CONTENT_TYPE, value.unwrap_or(DEFAULT_CONTENT_TYPE))
213        }
214        Self { builder, ..self }
215    }
216
217    fn with_payload(self, payload: PutPayload) -> Self {
218        let content_length = payload.content_length();
219        Self {
220            builder: self.builder.header(CONTENT_LENGTH, content_length),
221            payload: Some(payload),
222            ..self
223        }
224    }
225
226    async fn send(self) -> Result<Response> {
227        let credential = self.config.credentials.get_credential().await?;
228        let resp = self
229            .builder
230            .bearer_auth(&credential.bearer)
231            .retryable(&self.config.retry_config)
232            .idempotent(self.idempotent)
233            .payload(self.payload)
234            .send()
235            .await
236            .context(RequestSnafu {
237                path: self.path.as_ref(),
238            })?;
239        Ok(resp)
240    }
241
242    async fn do_put(self) -> Result<PutResult> {
243        let response = self.send().await?;
244        Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?)
245    }
246}
247
248/// Sign Blob Request Body
249#[derive(Debug, Serialize)]
250struct SignBlobBody {
251    /// The payload to sign
252    payload: String,
253}
254
255/// Sign Blob Response
256#[derive(Deserialize)]
257#[serde(rename_all = "camelCase")]
258struct SignBlobResponse {
259    /// The signature for the payload
260    signed_blob: String,
261}
262
263#[derive(Debug)]
264pub struct GoogleCloudStorageClient {
265    config: GoogleCloudStorageConfig,
266
267    client: Client,
268
269    bucket_name_encoded: String,
270
271    // TODO: Hook this up in tests
272    max_list_results: Option<String>,
273}
274
275impl GoogleCloudStorageClient {
276    pub fn new(config: GoogleCloudStorageConfig) -> Result<Self> {
277        let client = config.client_options.client()?;
278        let bucket_name_encoded =
279            percent_encode(config.bucket_name.as_bytes(), NON_ALPHANUMERIC).to_string();
280
281        Ok(Self {
282            config,
283            client,
284            bucket_name_encoded,
285            max_list_results: None,
286        })
287    }
288
289    pub fn config(&self) -> &GoogleCloudStorageConfig {
290        &self.config
291    }
292
293    async fn get_credential(&self) -> Result<Arc<GcpCredential>> {
294        self.config.credentials.get_credential().await
295    }
296
297    /// Create a signature from a string-to-sign using Google Cloud signBlob method.
298    /// form like:
299    /// ```plaintext
300    /// curl -X POST --data-binary @JSON_FILE_NAME \
301    /// -H "Authorization: Bearer OAUTH2_TOKEN" \
302    /// -H "Content-Type: application/json" \
303    /// "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/SERVICE_ACCOUNT_EMAIL:signBlob"
304    /// ```
305    ///
306    /// 'JSON_FILE_NAME' is a file containing the following JSON object:
307    /// ```plaintext
308    /// {
309    ///  "payload": "REQUEST_INFORMATION"
310    /// }
311    /// ```
312    pub async fn sign_blob(&self, string_to_sign: &str, client_email: &str) -> Result<String> {
313        let credential = self.get_credential().await?;
314        let body = SignBlobBody {
315            payload: BASE64_STANDARD.encode(string_to_sign),
316        };
317
318        let url = format!(
319            "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{}:signBlob",
320            client_email
321        );
322
323        let response = self
324            .client
325            .post(&url)
326            .bearer_auth(&credential.bearer)
327            .json(&body)
328            .retryable(&self.config.retry_config)
329            .idempotent(true)
330            .send()
331            .await
332            .context(SignBlobRequestSnafu)?;
333
334        //If successful, the signature is returned in the signedBlob field in the response.
335        let response = response
336            .json::<SignBlobResponse>()
337            .await
338            .context(InvalidSignBlobResponseSnafu)?;
339
340        let signed_blob = BASE64_STANDARD
341            .decode(response.signed_blob)
342            .context(InvalidSignBlobSignatureSnafu)?;
343
344        Ok(hex_encode(&signed_blob))
345    }
346
347    pub fn object_url(&self, path: &Path) -> String {
348        let encoded = utf8_percent_encode(path.as_ref(), NON_ALPHANUMERIC);
349        format!(
350            "{}/{}/{}",
351            self.config.base_url, self.bucket_name_encoded, encoded
352        )
353    }
354
355    /// Perform a put request <https://cloud.google.com/storage/docs/xml-api/put-object-upload>
356    ///
357    /// Returns the new ETag
358    pub fn request<'a>(&'a self, method: Method, path: &'a Path) -> Request<'a> {
359        let builder = self.client.request(method, self.object_url(path));
360
361        Request {
362            path,
363            builder,
364            payload: None,
365            config: &self.config,
366            idempotent: false,
367        }
368    }
369
370    pub async fn put(
371        &self,
372        path: &Path,
373        payload: PutPayload,
374        opts: PutOptions,
375    ) -> Result<PutResult> {
376        let builder = self
377            .request(Method::PUT, path)
378            .with_payload(payload)
379            .with_attributes(opts.attributes);
380
381        let builder = match &opts.mode {
382            PutMode::Overwrite => builder.idempotent(true),
383            PutMode::Create => builder.header(&VERSION_MATCH, "0"),
384            PutMode::Update(v) => {
385                let etag = v.version.as_ref().context(MissingVersionSnafu)?;
386                builder.header(&VERSION_MATCH, etag)
387            }
388        };
389
390        match (opts.mode, builder.do_put().await) {
391            (PutMode::Create, Err(crate::Error::Precondition { path, source })) => {
392                Err(crate::Error::AlreadyExists { path, source })
393            }
394            (_, r) => r,
395        }
396    }
397
398    /// Perform a put part request <https://cloud.google.com/storage/docs/xml-api/put-object-multipart>
399    ///
400    /// Returns the new [`PartId`]
401    pub async fn put_part(
402        &self,
403        path: &Path,
404        upload_id: &MultipartId,
405        part_idx: usize,
406        data: PutPayload,
407    ) -> Result<PartId> {
408        let query = &[
409            ("partNumber", &format!("{}", part_idx + 1)),
410            ("uploadId", upload_id),
411        ];
412        let result = self
413            .request(Method::PUT, path)
414            .with_payload(data)
415            .query(query)
416            .idempotent(true)
417            .do_put()
418            .await?;
419
420        Ok(PartId {
421            content_id: result.e_tag.unwrap(),
422        })
423    }
424
425    /// Initiate a multipart upload <https://cloud.google.com/storage/docs/xml-api/post-object-multipart>
426    pub async fn multipart_initiate(
427        &self,
428        path: &Path,
429        opts: PutMultipartOpts,
430    ) -> Result<MultipartId> {
431        let response = self
432            .request(Method::POST, path)
433            .with_attributes(opts.attributes)
434            .header(&CONTENT_LENGTH, "0")
435            .query(&[("uploads", "")])
436            .send()
437            .await?;
438
439        let data = response.bytes().await.context(PutResponseBodySnafu)?;
440        let result: InitiateMultipartUploadResult =
441            quick_xml::de::from_reader(data.as_ref().reader()).context(InvalidPutResponseSnafu)?;
442
443        Ok(result.upload_id)
444    }
445
446    /// Cleanup unused parts <https://cloud.google.com/storage/docs/xml-api/delete-multipart>
447    pub async fn multipart_cleanup(&self, path: &Path, multipart_id: &MultipartId) -> Result<()> {
448        let credential = self.get_credential().await?;
449        let url = self.object_url(path);
450
451        self.client
452            .request(Method::DELETE, &url)
453            .bearer_auth(&credential.bearer)
454            .header(CONTENT_TYPE, "application/octet-stream")
455            .header(CONTENT_LENGTH, "0")
456            .query(&[("uploadId", multipart_id)])
457            .send_retry(&self.config.retry_config)
458            .await
459            .context(RequestSnafu {
460                path: path.as_ref(),
461            })?;
462
463        Ok(())
464    }
465
466    pub async fn multipart_complete(
467        &self,
468        path: &Path,
469        multipart_id: &MultipartId,
470        completed_parts: Vec<PartId>,
471    ) -> Result<PutResult> {
472        if completed_parts.is_empty() {
473            // GCS doesn't allow empty multipart uploads
474            let result = self
475                .request(Method::PUT, path)
476                .idempotent(true)
477                .do_put()
478                .await?;
479            self.multipart_cleanup(path, multipart_id).await?;
480            return Ok(result);
481        }
482
483        let upload_id = multipart_id.clone();
484        let url = self.object_url(path);
485
486        let upload_info = CompleteMultipartUpload::from(completed_parts);
487        let credential = self.get_credential().await?;
488
489        let data = quick_xml::se::to_string(&upload_info)
490            .context(InvalidPutResponseSnafu)?
491            // We cannot disable the escaping that transforms "/" to "&quote;" :(
492            // https://github.com/tafia/quick-xml/issues/362
493            // https://github.com/tafia/quick-xml/issues/350
494            .replace("&quot;", "\"");
495
496        let response = self
497            .client
498            .request(Method::POST, &url)
499            .bearer_auth(&credential.bearer)
500            .query(&[("uploadId", upload_id)])
501            .body(data)
502            .retryable(&self.config.retry_config)
503            .idempotent(true)
504            .send()
505            .await
506            .context(CompleteMultipartRequestSnafu)?;
507
508        let version = get_version(response.headers(), VERSION_HEADER).context(MetadataSnafu)?;
509
510        let data = response
511            .bytes()
512            .await
513            .context(CompleteMultipartResponseBodySnafu)?;
514
515        let response: CompleteMultipartUploadResult =
516            quick_xml::de::from_reader(data.reader()).context(InvalidMultipartResponseSnafu)?;
517
518        Ok(PutResult {
519            e_tag: Some(response.e_tag),
520            version,
521        })
522    }
523
524    /// Perform a delete request <https://cloud.google.com/storage/docs/xml-api/delete-object>
525    pub async fn delete_request(&self, path: &Path) -> Result<()> {
526        self.request(Method::DELETE, path).send().await?;
527        Ok(())
528    }
529
530    /// Perform a copy request <https://cloud.google.com/storage/docs/xml-api/put-object-copy>
531    pub async fn copy_request(&self, from: &Path, to: &Path, if_not_exists: bool) -> Result<()> {
532        let credential = self.get_credential().await?;
533        let url = self.object_url(to);
534
535        let from = utf8_percent_encode(from.as_ref(), NON_ALPHANUMERIC);
536        let source = format!("{}/{}", self.bucket_name_encoded, from);
537
538        let mut builder = self
539            .client
540            .request(Method::PUT, url)
541            .header("x-goog-copy-source", source);
542
543        if if_not_exists {
544            builder = builder.header(&VERSION_MATCH, 0);
545        }
546
547        builder
548            .bearer_auth(&credential.bearer)
549            // Needed if reqwest is compiled with native-tls instead of rustls-tls
550            // See https://github.com/apache/arrow-rs/pull/3921
551            .header(CONTENT_LENGTH, 0)
552            .retryable(&self.config.retry_config)
553            .idempotent(!if_not_exists)
554            .send()
555            .await
556            .map_err(|err| match err.status() {
557                Some(StatusCode::PRECONDITION_FAILED) => crate::Error::AlreadyExists {
558                    source: Box::new(err),
559                    path: to.to_string(),
560                },
561                _ => err.error(STORE, from.to_string()),
562            })?;
563
564        Ok(())
565    }
566}
567
568#[async_trait]
569impl GetClient for GoogleCloudStorageClient {
570    const STORE: &'static str = STORE;
571    const HEADER_CONFIG: HeaderConfig = HeaderConfig {
572        etag_required: true,
573        last_modified_required: true,
574        version_header: Some(VERSION_HEADER),
575        user_defined_metadata_prefix: Some(USER_DEFINED_METADATA_HEADER_PREFIX),
576    };
577
578    /// Perform a get request <https://cloud.google.com/storage/docs/xml-api/get-object-download>
579    async fn get_request(&self, path: &Path, options: GetOptions) -> Result<Response> {
580        let credential = self.get_credential().await?;
581        let url = self.object_url(path);
582
583        let method = match options.head {
584            true => Method::HEAD,
585            false => Method::GET,
586        };
587
588        let mut request = self.client.request(method, url);
589
590        if let Some(version) = &options.version {
591            request = request.query(&[("generation", version)]);
592        }
593
594        if !credential.bearer.is_empty() {
595            request = request.bearer_auth(&credential.bearer);
596        }
597
598        let response = request
599            .with_get_options(options)
600            .send_retry(&self.config.retry_config)
601            .await
602            .context(GetRequestSnafu {
603                path: path.as_ref(),
604            })?;
605
606        Ok(response)
607    }
608}
609
610#[async_trait]
611impl ListClient for GoogleCloudStorageClient {
612    /// Perform a list request <https://cloud.google.com/storage/docs/xml-api/get-bucket-list>
613    async fn list_request(
614        &self,
615        prefix: Option<&str>,
616        delimiter: bool,
617        page_token: Option<&str>,
618        offset: Option<&str>,
619    ) -> Result<(ListResult, Option<String>)> {
620        let credential = self.get_credential().await?;
621        let url = format!("{}/{}", self.config.base_url, self.bucket_name_encoded);
622
623        let mut query = Vec::with_capacity(5);
624        query.push(("list-type", "2"));
625        if delimiter {
626            query.push(("delimiter", DELIMITER))
627        }
628
629        if let Some(prefix) = &prefix {
630            query.push(("prefix", prefix))
631        }
632
633        if let Some(page_token) = page_token {
634            query.push(("continuation-token", page_token))
635        }
636
637        if let Some(max_results) = &self.max_list_results {
638            query.push(("max-keys", max_results))
639        }
640
641        if let Some(offset) = offset {
642            query.push(("start-after", offset))
643        }
644
645        let response = self
646            .client
647            .request(Method::GET, url)
648            .query(&query)
649            .bearer_auth(&credential.bearer)
650            .send_retry(&self.config.retry_config)
651            .await
652            .context(ListRequestSnafu)?
653            .bytes()
654            .await
655            .context(ListResponseBodySnafu)?;
656
657        let mut response: ListResponse =
658            quick_xml::de::from_reader(response.reader()).context(InvalidListResponseSnafu)?;
659
660        let token = response.next_continuation_token.take();
661        Ok((response.try_into()?, token))
662    }
663}