1use std::sync::Arc;
38use std::time::Duration;
39
40use crate::client::CredentialProvider;
41use crate::gcp::credential::GCSAuthorizer;
42use crate::signer::Signer;
43use crate::{
44 multipart::PartId, path::Path, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload,
45 ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result,
46 UploadPart,
47};
48use async_trait::async_trait;
49use client::GoogleCloudStorageClient;
50use futures::stream::BoxStream;
51use hyper::Method;
52use url::Url;
53
54use crate::client::get::GetClientExt;
55use crate::client::list::ListClientExt;
56use crate::client::parts::Parts;
57use crate::multipart::MultipartStore;
58pub use builder::{GoogleCloudStorageBuilder, GoogleConfigKey};
59pub use credential::{GcpCredential, GcpSigningCredential, ServiceAccountKey};
60
61mod builder;
62mod client;
63mod credential;
64
65const STORE: &str = "GCS";
66
67pub type GcpCredentialProvider = Arc<dyn CredentialProvider<Credential = GcpCredential>>;
69
70pub type GcpSigningCredentialProvider =
72 Arc<dyn CredentialProvider<Credential = GcpSigningCredential>>;
73
74#[derive(Debug)]
76pub struct GoogleCloudStorage {
77 client: Arc<GoogleCloudStorageClient>,
78}
79
80impl std::fmt::Display for GoogleCloudStorage {
81 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82 write!(
83 f,
84 "GoogleCloudStorage({})",
85 self.client.config().bucket_name
86 )
87 }
88}
89
90impl GoogleCloudStorage {
91 pub fn credentials(&self) -> &GcpCredentialProvider {
93 &self.client.config().credentials
94 }
95
96 pub fn signing_credentials(&self) -> &GcpSigningCredentialProvider {
98 &self.client.config().signing_credentials
99 }
100}
101
102#[derive(Debug)]
103struct GCSMultipartUpload {
104 state: Arc<UploadState>,
105 part_idx: usize,
106}
107
108#[derive(Debug)]
109struct UploadState {
110 client: Arc<GoogleCloudStorageClient>,
111 path: Path,
112 multipart_id: MultipartId,
113 parts: Parts,
114}
115
116#[async_trait]
117impl MultipartUpload for GCSMultipartUpload {
118 fn put_part(&mut self, payload: PutPayload) -> UploadPart {
119 let idx = self.part_idx;
120 self.part_idx += 1;
121 let state = Arc::clone(&self.state);
122 Box::pin(async move {
123 let part = state
124 .client
125 .put_part(&state.path, &state.multipart_id, idx, payload)
126 .await?;
127 state.parts.put(idx, part);
128 Ok(())
129 })
130 }
131
132 async fn complete(&mut self) -> Result<PutResult> {
133 let parts = self.state.parts.finish(self.part_idx)?;
134
135 self.state
136 .client
137 .multipart_complete(&self.state.path, &self.state.multipart_id, parts)
138 .await
139 }
140
141 async fn abort(&mut self) -> Result<()> {
142 self.state
143 .client
144 .multipart_cleanup(&self.state.path, &self.state.multipart_id)
145 .await
146 }
147}
148
149#[async_trait]
150impl ObjectStore for GoogleCloudStorage {
151 async fn put_opts(
152 &self,
153 location: &Path,
154 payload: PutPayload,
155 opts: PutOptions,
156 ) -> Result<PutResult> {
157 self.client.put(location, payload, opts).await
158 }
159
160 async fn put_multipart_opts(
161 &self,
162 location: &Path,
163 opts: PutMultipartOpts,
164 ) -> Result<Box<dyn MultipartUpload>> {
165 let upload_id = self.client.multipart_initiate(location, opts).await?;
166
167 Ok(Box::new(GCSMultipartUpload {
168 part_idx: 0,
169 state: Arc::new(UploadState {
170 client: Arc::clone(&self.client),
171 path: location.clone(),
172 multipart_id: upload_id.clone(),
173 parts: Default::default(),
174 }),
175 }))
176 }
177
178 async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
179 self.client.get_opts(location, options).await
180 }
181
182 async fn delete(&self, location: &Path) -> Result<()> {
183 self.client.delete_request(location).await
184 }
185
186 fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
187 self.client.list(prefix)
188 }
189
190 fn list_with_offset(
191 &self,
192 prefix: Option<&Path>,
193 offset: &Path,
194 ) -> BoxStream<'_, Result<ObjectMeta>> {
195 self.client.list_with_offset(prefix, offset)
196 }
197
198 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
199 self.client.list_with_delimiter(prefix).await
200 }
201
202 async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
203 self.client.copy_request(from, to, false).await
204 }
205
206 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
207 self.client.copy_request(from, to, true).await
208 }
209}
210
211#[async_trait]
212impl MultipartStore for GoogleCloudStorage {
213 async fn create_multipart(&self, path: &Path) -> Result<MultipartId> {
214 self.client
215 .multipart_initiate(path, PutMultipartOpts::default())
216 .await
217 }
218
219 async fn put_part(
220 &self,
221 path: &Path,
222 id: &MultipartId,
223 part_idx: usize,
224 payload: PutPayload,
225 ) -> Result<PartId> {
226 self.client.put_part(path, id, part_idx, payload).await
227 }
228
229 async fn complete_multipart(
230 &self,
231 path: &Path,
232 id: &MultipartId,
233 parts: Vec<PartId>,
234 ) -> Result<PutResult> {
235 self.client.multipart_complete(path, id, parts).await
236 }
237
238 async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()> {
239 self.client.multipart_cleanup(path, id).await
240 }
241}
242
243#[async_trait]
244impl Signer for GoogleCloudStorage {
245 async fn signed_url(&self, method: Method, path: &Path, expires_in: Duration) -> Result<Url> {
246 if expires_in.as_secs() > 604800 {
247 return Err(crate::Error::Generic {
248 store: STORE,
249 source: "Expiration Time can't be longer than 604800 seconds (7 days).".into(),
250 });
251 }
252
253 let config = self.client.config();
254 let path_url = config.path_url(path);
255 let mut url = Url::parse(&path_url).map_err(|e| crate::Error::Generic {
256 store: STORE,
257 source: format!("Unable to parse url {path_url}: {e}").into(),
258 })?;
259
260 let signing_credentials = self.signing_credentials().get_credential().await?;
261 let authorizer = GCSAuthorizer::new(signing_credentials);
262
263 authorizer
264 .sign(method, &mut url, expires_in, &self.client)
265 .await?;
266
267 Ok(url)
268 }
269}
270
271#[cfg(test)]
272mod test {
273
274 use credential::DEFAULT_GCS_BASE_URL;
275
276 use crate::integration::*;
277 use crate::tests::*;
278
279 use super::*;
280
281 const NON_EXISTENT_NAME: &str = "nonexistentname";
282
283 #[tokio::test]
284 async fn gcs_test() {
285 maybe_skip_integration!();
286 let integration = GoogleCloudStorageBuilder::from_env().build().unwrap();
287
288 put_get_delete_list(&integration).await;
289 list_uses_directories_correctly(&integration).await;
290 list_with_delimiter(&integration).await;
291 rename_and_copy(&integration).await;
292 if integration.client.config().base_url == DEFAULT_GCS_BASE_URL {
293 copy_if_not_exists(&integration).await;
296 stream_get(&integration).await;
299 multipart(&integration, &integration).await;
300 get_opts(&integration).await;
302 put_opts(&integration, true).await;
303 put_get_attributes(&integration).await;
305 }
306 }
307
308 #[tokio::test]
309 #[ignore]
310 async fn gcs_test_sign() {
311 maybe_skip_integration!();
312 let integration = GoogleCloudStorageBuilder::from_env().build().unwrap();
313
314 let client = reqwest::Client::new();
315
316 let path = Path::from("test_sign");
317 let url = integration
318 .signed_url(Method::PUT, &path, Duration::from_secs(3600))
319 .await
320 .unwrap();
321 println!("PUT {url}");
322
323 let resp = client.put(url).body("data").send().await.unwrap();
324 resp.error_for_status().unwrap();
325
326 let url = integration
327 .signed_url(Method::GET, &path, Duration::from_secs(3600))
328 .await
329 .unwrap();
330 println!("GET {url}");
331
332 let resp = client.get(url).send().await.unwrap();
333 let resp = resp.error_for_status().unwrap();
334 let data = resp.bytes().await.unwrap();
335 assert_eq!(data.as_ref(), b"data");
336 }
337
338 #[tokio::test]
339 async fn gcs_test_get_nonexistent_location() {
340 maybe_skip_integration!();
341 let integration = GoogleCloudStorageBuilder::from_env().build().unwrap();
342
343 let location = Path::from_iter([NON_EXISTENT_NAME]);
344
345 let err = integration.get(&location).await.unwrap_err();
346
347 assert!(
348 matches!(err, crate::Error::NotFound { .. }),
349 "unexpected error type: {err}"
350 );
351 }
352
353 #[tokio::test]
354 async fn gcs_test_get_nonexistent_bucket() {
355 maybe_skip_integration!();
356 let config = GoogleCloudStorageBuilder::from_env();
357 let integration = config.with_bucket_name(NON_EXISTENT_NAME).build().unwrap();
358
359 let location = Path::from_iter([NON_EXISTENT_NAME]);
360
361 let err = get_nonexistent_object(&integration, Some(location))
362 .await
363 .unwrap_err();
364
365 assert!(
366 matches!(err, crate::Error::NotFound { .. }),
367 "unexpected error type: {err}"
368 );
369 }
370
371 #[tokio::test]
372 async fn gcs_test_delete_nonexistent_location() {
373 maybe_skip_integration!();
374 let integration = GoogleCloudStorageBuilder::from_env().build().unwrap();
375
376 let location = Path::from_iter([NON_EXISTENT_NAME]);
377
378 let err = integration.delete(&location).await.unwrap_err();
379 assert!(
380 matches!(err, crate::Error::NotFound { .. }),
381 "unexpected error type: {err}"
382 );
383 }
384
385 #[tokio::test]
386 async fn gcs_test_delete_nonexistent_bucket() {
387 maybe_skip_integration!();
388 let config = GoogleCloudStorageBuilder::from_env();
389 let integration = config.with_bucket_name(NON_EXISTENT_NAME).build().unwrap();
390
391 let location = Path::from_iter([NON_EXISTENT_NAME]);
392
393 let err = integration.delete(&location).await.unwrap_err();
394 assert!(
395 matches!(err, crate::Error::NotFound { .. }),
396 "unexpected error type: {err}"
397 );
398 }
399
400 #[tokio::test]
401 async fn gcs_test_put_nonexistent_bucket() {
402 maybe_skip_integration!();
403 let config = GoogleCloudStorageBuilder::from_env();
404 let integration = config.with_bucket_name(NON_EXISTENT_NAME).build().unwrap();
405
406 let location = Path::from_iter([NON_EXISTENT_NAME]);
407 let data = PutPayload::from("arbitrary data");
408
409 let err = integration
410 .put(&location, data)
411 .await
412 .unwrap_err()
413 .to_string();
414 assert!(
415 err.contains("Client error with status 404 Not Found"),
416 "{}",
417 err
418 )
419 }
420}