object_store/gcp/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! An object store implementation for Google Cloud Storage
19//!
20//! ## Multipart uploads
21//!
22//! [Multipart uploads](https://cloud.google.com/storage/docs/multipart-uploads)
23//! can be initiated with the [ObjectStore::put_multipart] method. If neither
24//! [`MultipartUpload::complete`] nor [`MultipartUpload::abort`] is invoked, you may
25//! have parts uploaded to GCS but not used, that you will be charged for. It is recommended
26//! you configure a [lifecycle rule] to abort incomplete multipart uploads after a certain
27//! period of time to avoid being charged for storing partial uploads.
28//!
29//! ## Using HTTP/2
30//!
31//! Google Cloud Storage supports both HTTP/2 and HTTP/1. HTTP/1 is used by default
32//! because it allows much higher throughput in our benchmarks (see
33//! [#5194](https://github.com/apache/arrow-rs/issues/5194)). HTTP/2 can be
34//! enabled by setting [crate::ClientConfigKey::Http1Only] to false.
35//!
36//! [lifecycle rule]: https://cloud.google.com/storage/docs/lifecycle#abort-mpu
37use std::sync::Arc;
38use std::time::Duration;
39
40use crate::client::CredentialProvider;
41use crate::gcp::credential::GCSAuthorizer;
42use crate::signer::Signer;
43use crate::{
44    multipart::PartId, path::Path, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload,
45    ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result,
46    UploadPart,
47};
48use async_trait::async_trait;
49use client::GoogleCloudStorageClient;
50use futures::stream::BoxStream;
51use hyper::Method;
52use url::Url;
53
54use crate::client::get::GetClientExt;
55use crate::client::list::ListClientExt;
56use crate::client::parts::Parts;
57use crate::multipart::MultipartStore;
58pub use builder::{GoogleCloudStorageBuilder, GoogleConfigKey};
59pub use credential::{GcpCredential, GcpSigningCredential, ServiceAccountKey};
60
61mod builder;
62mod client;
63mod credential;
64
65const STORE: &str = "GCS";
66
67/// [`CredentialProvider`] for [`GoogleCloudStorage`]
68pub type GcpCredentialProvider = Arc<dyn CredentialProvider<Credential = GcpCredential>>;
69
70/// [`GcpSigningCredential`] for [`GoogleCloudStorage`]
71pub type GcpSigningCredentialProvider =
72    Arc<dyn CredentialProvider<Credential = GcpSigningCredential>>;
73
74/// Interface for [Google Cloud Storage](https://cloud.google.com/storage/).
75#[derive(Debug)]
76pub struct GoogleCloudStorage {
77    client: Arc<GoogleCloudStorageClient>,
78}
79
80impl std::fmt::Display for GoogleCloudStorage {
81    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82        write!(
83            f,
84            "GoogleCloudStorage({})",
85            self.client.config().bucket_name
86        )
87    }
88}
89
90impl GoogleCloudStorage {
91    /// Returns the [`GcpCredentialProvider`] used by [`GoogleCloudStorage`]
92    pub fn credentials(&self) -> &GcpCredentialProvider {
93        &self.client.config().credentials
94    }
95
96    /// Returns the [`GcpSigningCredentialProvider`] used by [`GoogleCloudStorage`]
97    pub fn signing_credentials(&self) -> &GcpSigningCredentialProvider {
98        &self.client.config().signing_credentials
99    }
100}
101
102#[derive(Debug)]
103struct GCSMultipartUpload {
104    state: Arc<UploadState>,
105    part_idx: usize,
106}
107
108#[derive(Debug)]
109struct UploadState {
110    client: Arc<GoogleCloudStorageClient>,
111    path: Path,
112    multipart_id: MultipartId,
113    parts: Parts,
114}
115
116#[async_trait]
117impl MultipartUpload for GCSMultipartUpload {
118    fn put_part(&mut self, payload: PutPayload) -> UploadPart {
119        let idx = self.part_idx;
120        self.part_idx += 1;
121        let state = Arc::clone(&self.state);
122        Box::pin(async move {
123            let part = state
124                .client
125                .put_part(&state.path, &state.multipart_id, idx, payload)
126                .await?;
127            state.parts.put(idx, part);
128            Ok(())
129        })
130    }
131
132    async fn complete(&mut self) -> Result<PutResult> {
133        let parts = self.state.parts.finish(self.part_idx)?;
134
135        self.state
136            .client
137            .multipart_complete(&self.state.path, &self.state.multipart_id, parts)
138            .await
139    }
140
141    async fn abort(&mut self) -> Result<()> {
142        self.state
143            .client
144            .multipart_cleanup(&self.state.path, &self.state.multipart_id)
145            .await
146    }
147}
148
149#[async_trait]
150impl ObjectStore for GoogleCloudStorage {
151    async fn put_opts(
152        &self,
153        location: &Path,
154        payload: PutPayload,
155        opts: PutOptions,
156    ) -> Result<PutResult> {
157        self.client.put(location, payload, opts).await
158    }
159
160    async fn put_multipart_opts(
161        &self,
162        location: &Path,
163        opts: PutMultipartOpts,
164    ) -> Result<Box<dyn MultipartUpload>> {
165        let upload_id = self.client.multipart_initiate(location, opts).await?;
166
167        Ok(Box::new(GCSMultipartUpload {
168            part_idx: 0,
169            state: Arc::new(UploadState {
170                client: Arc::clone(&self.client),
171                path: location.clone(),
172                multipart_id: upload_id.clone(),
173                parts: Default::default(),
174            }),
175        }))
176    }
177
178    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
179        self.client.get_opts(location, options).await
180    }
181
182    async fn delete(&self, location: &Path) -> Result<()> {
183        self.client.delete_request(location).await
184    }
185
186    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
187        self.client.list(prefix)
188    }
189
190    fn list_with_offset(
191        &self,
192        prefix: Option<&Path>,
193        offset: &Path,
194    ) -> BoxStream<'_, Result<ObjectMeta>> {
195        self.client.list_with_offset(prefix, offset)
196    }
197
198    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
199        self.client.list_with_delimiter(prefix).await
200    }
201
202    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
203        self.client.copy_request(from, to, false).await
204    }
205
206    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
207        self.client.copy_request(from, to, true).await
208    }
209}
210
211#[async_trait]
212impl MultipartStore for GoogleCloudStorage {
213    async fn create_multipart(&self, path: &Path) -> Result<MultipartId> {
214        self.client
215            .multipart_initiate(path, PutMultipartOpts::default())
216            .await
217    }
218
219    async fn put_part(
220        &self,
221        path: &Path,
222        id: &MultipartId,
223        part_idx: usize,
224        payload: PutPayload,
225    ) -> Result<PartId> {
226        self.client.put_part(path, id, part_idx, payload).await
227    }
228
229    async fn complete_multipart(
230        &self,
231        path: &Path,
232        id: &MultipartId,
233        parts: Vec<PartId>,
234    ) -> Result<PutResult> {
235        self.client.multipart_complete(path, id, parts).await
236    }
237
238    async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()> {
239        self.client.multipart_cleanup(path, id).await
240    }
241}
242
243#[async_trait]
244impl Signer for GoogleCloudStorage {
245    async fn signed_url(&self, method: Method, path: &Path, expires_in: Duration) -> Result<Url> {
246        if expires_in.as_secs() > 604800 {
247            return Err(crate::Error::Generic {
248                store: STORE,
249                source: "Expiration Time can't be longer than 604800 seconds (7 days).".into(),
250            });
251        }
252
253        let config = self.client.config();
254        let path_url = config.path_url(path);
255        let mut url = Url::parse(&path_url).map_err(|e| crate::Error::Generic {
256            store: STORE,
257            source: format!("Unable to parse url {path_url}: {e}").into(),
258        })?;
259
260        let signing_credentials = self.signing_credentials().get_credential().await?;
261        let authorizer = GCSAuthorizer::new(signing_credentials);
262
263        authorizer
264            .sign(method, &mut url, expires_in, &self.client)
265            .await?;
266
267        Ok(url)
268    }
269}
270
#[cfg(test)]
mod test {

    use credential::DEFAULT_GCS_BASE_URL;

    use crate::integration::*;
    use crate::tests::*;

    use super::*;

    const NON_EXISTENT_NAME: &str = "nonexistentname";

    /// Builds a store from the environment configuration.
    fn store_from_env() -> GoogleCloudStorage {
        GoogleCloudStorageBuilder::from_env().build().unwrap()
    }

    /// Builds a store from the environment, pointed at a bucket named
    /// [`NON_EXISTENT_NAME`].
    fn missing_bucket_store() -> GoogleCloudStorage {
        GoogleCloudStorageBuilder::from_env()
            .with_bucket_name(NON_EXISTENT_NAME)
            .build()
            .unwrap()
    }

    /// Path whose single segment is [`NON_EXISTENT_NAME`].
    fn missing_location() -> Path {
        Path::from_iter([NON_EXISTENT_NAME])
    }

    /// Asserts that `err` is the `NotFound` variant.
    fn assert_not_found(err: crate::Error) {
        assert!(
            matches!(err, crate::Error::NotFound { .. }),
            "unexpected error type: {err}"
        );
    }

    #[tokio::test]
    async fn gcs_test() {
        maybe_skip_integration!();
        let store = store_from_env();

        put_get_delete_list(&store).await;
        list_uses_directories_correctly(&store).await;
        list_with_delimiter(&store).await;
        rename_and_copy(&store).await;

        // Everything below runs only against the default GCS endpoint.
        let default_endpoint = store.client.config().base_url == DEFAULT_GCS_BASE_URL;
        if !default_endpoint {
            return;
        }

        // Fake GCS server doesn't currently honor ifGenerationMatch
        // https://github.com/fsouza/fake-gcs-server/issues/994
        copy_if_not_exists(&store).await;
        // Fake GCS server does not yet implement XML Multipart uploads
        // https://github.com/fsouza/fake-gcs-server/issues/852
        stream_get(&store).await;
        multipart(&store, &store).await;
        // Fake GCS server doesn't currently honor preconditions
        get_opts(&store).await;
        put_opts(&store, true).await;
        // Fake GCS server doesn't currently support attributes
        put_get_attributes(&store).await;
    }

    #[tokio::test]
    #[ignore]
    async fn gcs_test_sign() {
        maybe_skip_integration!();
        let store = store_from_env();

        let http = reqwest::Client::new();
        let object = Path::from("test_sign");
        let lifetime = Duration::from_secs(3600);

        // Upload through a signed PUT URL.
        let put_url = store
            .signed_url(Method::PUT, &object, lifetime)
            .await
            .unwrap();
        println!("PUT {put_url}");

        let put_resp = http.put(put_url).body("data").send().await.unwrap();
        put_resp.error_for_status().unwrap();

        // Read the object back through a signed GET URL.
        let get_url = store
            .signed_url(Method::GET, &object, lifetime)
            .await
            .unwrap();
        println!("GET {get_url}");

        let get_resp = http.get(get_url).send().await.unwrap();
        let get_resp = get_resp.error_for_status().unwrap();
        let body = get_resp.bytes().await.unwrap();
        assert_eq!(body.as_ref(), b"data");
    }

    #[tokio::test]
    async fn gcs_test_get_nonexistent_location() {
        maybe_skip_integration!();
        let store = store_from_env();
        let location = missing_location();

        let err = store.get(&location).await.unwrap_err();

        assert_not_found(err);
    }

    #[tokio::test]
    async fn gcs_test_get_nonexistent_bucket() {
        maybe_skip_integration!();
        let store = missing_bucket_store();
        let location = missing_location();

        let err = get_nonexistent_object(&store, Some(location))
            .await
            .unwrap_err();

        assert_not_found(err);
    }

    #[tokio::test]
    async fn gcs_test_delete_nonexistent_location() {
        maybe_skip_integration!();
        let store = store_from_env();
        let location = missing_location();

        let err = store.delete(&location).await.unwrap_err();

        assert_not_found(err);
    }

    #[tokio::test]
    async fn gcs_test_delete_nonexistent_bucket() {
        maybe_skip_integration!();
        let store = missing_bucket_store();
        let location = missing_location();

        let err = store.delete(&location).await.unwrap_err();

        assert_not_found(err);
    }

    #[tokio::test]
    async fn gcs_test_put_nonexistent_bucket() {
        maybe_skip_integration!();
        let store = missing_bucket_store();
        let location = missing_location();
        let payload = PutPayload::from("arbitrary data");

        let message = store
            .put(&location, payload)
            .await
            .unwrap_err()
            .to_string();

        assert!(
            message.contains("Client error with status 404 Not Found"),
            "{message}"
        )
    }
}
418        )
419    }
420}