1use crate::client::get::GetClient;
19use crate::client::header::{get_put_result, get_version, HeaderConfig};
20use crate::client::list::ListClient;
21use crate::client::retry::RetryExt;
22use crate::client::s3::{
23 CompleteMultipartUpload, CompleteMultipartUploadResult, InitiateMultipartUploadResult,
24 ListResponse,
25};
26use crate::client::GetOptionsExt;
27use crate::gcp::{GcpCredential, GcpCredentialProvider, GcpSigningCredentialProvider, STORE};
28use crate::multipart::PartId;
29use crate::path::{Path, DELIMITER};
30use crate::util::hex_encode;
31use crate::{
32 Attribute, Attributes, ClientOptions, GetOptions, ListResult, MultipartId, PutMode,
33 PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, RetryConfig,
34};
35use async_trait::async_trait;
36use base64::prelude::BASE64_STANDARD;
37use base64::Engine;
38use bytes::Buf;
39use hyper::header::{
40 CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE, CONTENT_LENGTH,
41 CONTENT_TYPE,
42};
43use percent_encoding::{percent_encode, utf8_percent_encode, NON_ALPHANUMERIC};
44use reqwest::header::HeaderName;
45use reqwest::{Client, Method, RequestBuilder, Response, StatusCode};
46use serde::{Deserialize, Serialize};
47use snafu::{OptionExt, ResultExt, Snafu};
48use std::sync::Arc;
49
50const VERSION_HEADER: &str = "x-goog-generation";
51const DEFAULT_CONTENT_TYPE: &str = "application/octet-stream";
52const USER_DEFINED_METADATA_HEADER_PREFIX: &str = "x-goog-meta-";
53
54static VERSION_MATCH: HeaderName = HeaderName::from_static("x-goog-if-generation-match");
55
56#[derive(Debug, Snafu)]
57enum Error {
58 #[snafu(display("Error performing list request: {}", source))]
59 ListRequest { source: crate::client::retry::Error },
60
61 #[snafu(display("Error getting list response body: {}", source))]
62 ListResponseBody { source: reqwest::Error },
63
64 #[snafu(display("Got invalid list response: {}", source))]
65 InvalidListResponse { source: quick_xml::de::DeError },
66
67 #[snafu(display("Error performing get request {}: {}", path, source))]
68 GetRequest {
69 source: crate::client::retry::Error,
70 path: String,
71 },
72
73 #[snafu(display("Error performing request {}: {}", path, source))]
74 Request {
75 source: crate::client::retry::Error,
76 path: String,
77 },
78
79 #[snafu(display("Error getting put response body: {}", source))]
80 PutResponseBody { source: reqwest::Error },
81
82 #[snafu(display("Got invalid put response: {}", source))]
83 InvalidPutResponse { source: quick_xml::de::DeError },
84
85 #[snafu(display("Unable to extract metadata from headers: {}", source))]
86 Metadata {
87 source: crate::client::header::Error,
88 },
89
90 #[snafu(display("Version required for conditional update"))]
91 MissingVersion,
92
93 #[snafu(display("Error performing complete multipart request: {}", source))]
94 CompleteMultipartRequest { source: crate::client::retry::Error },
95
96 #[snafu(display("Error getting complete multipart response body: {}", source))]
97 CompleteMultipartResponseBody { source: reqwest::Error },
98
99 #[snafu(display("Got invalid multipart response: {}", source))]
100 InvalidMultipartResponse { source: quick_xml::de::DeError },
101
102 #[snafu(display("Error signing blob: {}", source))]
103 SignBlobRequest { source: crate::client::retry::Error },
104
105 #[snafu(display("Got invalid signing blob response: {}", source))]
106 InvalidSignBlobResponse { source: reqwest::Error },
107
108 #[snafu(display("Got invalid signing blob signature: {}", source))]
109 InvalidSignBlobSignature { source: base64::DecodeError },
110}
111
112impl From<Error> for crate::Error {
113 fn from(err: Error) -> Self {
114 match err {
115 Error::GetRequest { source, path } | Error::Request { source, path } => {
116 source.error(STORE, path)
117 }
118 _ => Self::Generic {
119 store: STORE,
120 source: Box::new(err),
121 },
122 }
123 }
124}
125
126#[derive(Debug)]
127pub struct GoogleCloudStorageConfig {
128 pub base_url: String,
129
130 pub credentials: GcpCredentialProvider,
131
132 pub signing_credentials: GcpSigningCredentialProvider,
133
134 pub bucket_name: String,
135
136 pub retry_config: RetryConfig,
137
138 pub client_options: ClientOptions,
139}
140
141impl GoogleCloudStorageConfig {
142 pub fn new(
143 base_url: String,
144 credentials: GcpCredentialProvider,
145 signing_credentials: GcpSigningCredentialProvider,
146 bucket_name: String,
147 retry_config: RetryConfig,
148 client_options: ClientOptions,
149 ) -> Self {
150 Self {
151 base_url,
152 credentials,
153 signing_credentials,
154 bucket_name,
155 retry_config,
156 client_options,
157 }
158 }
159
160 pub fn path_url(&self, path: &Path) -> String {
161 format!("{}/{}/{}", self.base_url, self.bucket_name, path)
162 }
163}
164
165pub struct Request<'a> {
167 path: &'a Path,
168 config: &'a GoogleCloudStorageConfig,
169 payload: Option<PutPayload>,
170 builder: RequestBuilder,
171 idempotent: bool,
172}
173
174impl<'a> Request<'a> {
175 fn header(self, k: &HeaderName, v: &str) -> Self {
176 let builder = self.builder.header(k, v);
177 Self { builder, ..self }
178 }
179
180 fn query<T: Serialize + ?Sized + Sync>(self, query: &T) -> Self {
181 let builder = self.builder.query(query);
182 Self { builder, ..self }
183 }
184
185 fn idempotent(mut self, idempotent: bool) -> Self {
186 self.idempotent = idempotent;
187 self
188 }
189
190 fn with_attributes(self, attributes: Attributes) -> Self {
191 let mut builder = self.builder;
192 let mut has_content_type = false;
193 for (k, v) in &attributes {
194 builder = match k {
195 Attribute::CacheControl => builder.header(CACHE_CONTROL, v.as_ref()),
196 Attribute::ContentDisposition => builder.header(CONTENT_DISPOSITION, v.as_ref()),
197 Attribute::ContentEncoding => builder.header(CONTENT_ENCODING, v.as_ref()),
198 Attribute::ContentLanguage => builder.header(CONTENT_LANGUAGE, v.as_ref()),
199 Attribute::ContentType => {
200 has_content_type = true;
201 builder.header(CONTENT_TYPE, v.as_ref())
202 }
203 Attribute::Metadata(k_suffix) => builder.header(
204 &format!("{}{}", USER_DEFINED_METADATA_HEADER_PREFIX, k_suffix),
205 v.as_ref(),
206 ),
207 };
208 }
209
210 if !has_content_type {
211 let value = self.config.client_options.get_content_type(self.path);
212 builder = builder.header(CONTENT_TYPE, value.unwrap_or(DEFAULT_CONTENT_TYPE))
213 }
214 Self { builder, ..self }
215 }
216
217 fn with_payload(self, payload: PutPayload) -> Self {
218 let content_length = payload.content_length();
219 Self {
220 builder: self.builder.header(CONTENT_LENGTH, content_length),
221 payload: Some(payload),
222 ..self
223 }
224 }
225
226 async fn send(self) -> Result<Response> {
227 let credential = self.config.credentials.get_credential().await?;
228 let resp = self
229 .builder
230 .bearer_auth(&credential.bearer)
231 .retryable(&self.config.retry_config)
232 .idempotent(self.idempotent)
233 .payload(self.payload)
234 .send()
235 .await
236 .context(RequestSnafu {
237 path: self.path.as_ref(),
238 })?;
239 Ok(resp)
240 }
241
242 async fn do_put(self) -> Result<PutResult> {
243 let response = self.send().await?;
244 Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?)
245 }
246}
247
248#[derive(Debug, Serialize)]
250struct SignBlobBody {
251 payload: String,
253}
254
255#[derive(Deserialize)]
257#[serde(rename_all = "camelCase")]
258struct SignBlobResponse {
259 signed_blob: String,
261}
262
263#[derive(Debug)]
264pub struct GoogleCloudStorageClient {
265 config: GoogleCloudStorageConfig,
266
267 client: Client,
268
269 bucket_name_encoded: String,
270
271 max_list_results: Option<String>,
273}
274
275impl GoogleCloudStorageClient {
276 pub fn new(config: GoogleCloudStorageConfig) -> Result<Self> {
277 let client = config.client_options.client()?;
278 let bucket_name_encoded =
279 percent_encode(config.bucket_name.as_bytes(), NON_ALPHANUMERIC).to_string();
280
281 Ok(Self {
282 config,
283 client,
284 bucket_name_encoded,
285 max_list_results: None,
286 })
287 }
288
289 pub fn config(&self) -> &GoogleCloudStorageConfig {
290 &self.config
291 }
292
293 async fn get_credential(&self) -> Result<Arc<GcpCredential>> {
294 self.config.credentials.get_credential().await
295 }
296
297 pub async fn sign_blob(&self, string_to_sign: &str, client_email: &str) -> Result<String> {
313 let credential = self.get_credential().await?;
314 let body = SignBlobBody {
315 payload: BASE64_STANDARD.encode(string_to_sign),
316 };
317
318 let url = format!(
319 "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{}:signBlob",
320 client_email
321 );
322
323 let response = self
324 .client
325 .post(&url)
326 .bearer_auth(&credential.bearer)
327 .json(&body)
328 .retryable(&self.config.retry_config)
329 .idempotent(true)
330 .send()
331 .await
332 .context(SignBlobRequestSnafu)?;
333
334 let response = response
336 .json::<SignBlobResponse>()
337 .await
338 .context(InvalidSignBlobResponseSnafu)?;
339
340 let signed_blob = BASE64_STANDARD
341 .decode(response.signed_blob)
342 .context(InvalidSignBlobSignatureSnafu)?;
343
344 Ok(hex_encode(&signed_blob))
345 }
346
347 pub fn object_url(&self, path: &Path) -> String {
348 let encoded = utf8_percent_encode(path.as_ref(), NON_ALPHANUMERIC);
349 format!(
350 "{}/{}/{}",
351 self.config.base_url, self.bucket_name_encoded, encoded
352 )
353 }
354
355 pub fn request<'a>(&'a self, method: Method, path: &'a Path) -> Request<'a> {
359 let builder = self.client.request(method, self.object_url(path));
360
361 Request {
362 path,
363 builder,
364 payload: None,
365 config: &self.config,
366 idempotent: false,
367 }
368 }
369
370 pub async fn put(
371 &self,
372 path: &Path,
373 payload: PutPayload,
374 opts: PutOptions,
375 ) -> Result<PutResult> {
376 let builder = self
377 .request(Method::PUT, path)
378 .with_payload(payload)
379 .with_attributes(opts.attributes);
380
381 let builder = match &opts.mode {
382 PutMode::Overwrite => builder.idempotent(true),
383 PutMode::Create => builder.header(&VERSION_MATCH, "0"),
384 PutMode::Update(v) => {
385 let etag = v.version.as_ref().context(MissingVersionSnafu)?;
386 builder.header(&VERSION_MATCH, etag)
387 }
388 };
389
390 match (opts.mode, builder.do_put().await) {
391 (PutMode::Create, Err(crate::Error::Precondition { path, source })) => {
392 Err(crate::Error::AlreadyExists { path, source })
393 }
394 (_, r) => r,
395 }
396 }
397
398 pub async fn put_part(
402 &self,
403 path: &Path,
404 upload_id: &MultipartId,
405 part_idx: usize,
406 data: PutPayload,
407 ) -> Result<PartId> {
408 let query = &[
409 ("partNumber", &format!("{}", part_idx + 1)),
410 ("uploadId", upload_id),
411 ];
412 let result = self
413 .request(Method::PUT, path)
414 .with_payload(data)
415 .query(query)
416 .idempotent(true)
417 .do_put()
418 .await?;
419
420 Ok(PartId {
421 content_id: result.e_tag.unwrap(),
422 })
423 }
424
425 pub async fn multipart_initiate(
427 &self,
428 path: &Path,
429 opts: PutMultipartOpts,
430 ) -> Result<MultipartId> {
431 let response = self
432 .request(Method::POST, path)
433 .with_attributes(opts.attributes)
434 .header(&CONTENT_LENGTH, "0")
435 .query(&[("uploads", "")])
436 .send()
437 .await?;
438
439 let data = response.bytes().await.context(PutResponseBodySnafu)?;
440 let result: InitiateMultipartUploadResult =
441 quick_xml::de::from_reader(data.as_ref().reader()).context(InvalidPutResponseSnafu)?;
442
443 Ok(result.upload_id)
444 }
445
446 pub async fn multipart_cleanup(&self, path: &Path, multipart_id: &MultipartId) -> Result<()> {
448 let credential = self.get_credential().await?;
449 let url = self.object_url(path);
450
451 self.client
452 .request(Method::DELETE, &url)
453 .bearer_auth(&credential.bearer)
454 .header(CONTENT_TYPE, "application/octet-stream")
455 .header(CONTENT_LENGTH, "0")
456 .query(&[("uploadId", multipart_id)])
457 .send_retry(&self.config.retry_config)
458 .await
459 .context(RequestSnafu {
460 path: path.as_ref(),
461 })?;
462
463 Ok(())
464 }
465
466 pub async fn multipart_complete(
467 &self,
468 path: &Path,
469 multipart_id: &MultipartId,
470 completed_parts: Vec<PartId>,
471 ) -> Result<PutResult> {
472 if completed_parts.is_empty() {
473 let result = self
475 .request(Method::PUT, path)
476 .idempotent(true)
477 .do_put()
478 .await?;
479 self.multipart_cleanup(path, multipart_id).await?;
480 return Ok(result);
481 }
482
483 let upload_id = multipart_id.clone();
484 let url = self.object_url(path);
485
486 let upload_info = CompleteMultipartUpload::from(completed_parts);
487 let credential = self.get_credential().await?;
488
489 let data = quick_xml::se::to_string(&upload_info)
490 .context(InvalidPutResponseSnafu)?
491 .replace(""", "\"");
495
496 let response = self
497 .client
498 .request(Method::POST, &url)
499 .bearer_auth(&credential.bearer)
500 .query(&[("uploadId", upload_id)])
501 .body(data)
502 .retryable(&self.config.retry_config)
503 .idempotent(true)
504 .send()
505 .await
506 .context(CompleteMultipartRequestSnafu)?;
507
508 let version = get_version(response.headers(), VERSION_HEADER).context(MetadataSnafu)?;
509
510 let data = response
511 .bytes()
512 .await
513 .context(CompleteMultipartResponseBodySnafu)?;
514
515 let response: CompleteMultipartUploadResult =
516 quick_xml::de::from_reader(data.reader()).context(InvalidMultipartResponseSnafu)?;
517
518 Ok(PutResult {
519 e_tag: Some(response.e_tag),
520 version,
521 })
522 }
523
524 pub async fn delete_request(&self, path: &Path) -> Result<()> {
526 self.request(Method::DELETE, path).send().await?;
527 Ok(())
528 }
529
530 pub async fn copy_request(&self, from: &Path, to: &Path, if_not_exists: bool) -> Result<()> {
532 let credential = self.get_credential().await?;
533 let url = self.object_url(to);
534
535 let from = utf8_percent_encode(from.as_ref(), NON_ALPHANUMERIC);
536 let source = format!("{}/{}", self.bucket_name_encoded, from);
537
538 let mut builder = self
539 .client
540 .request(Method::PUT, url)
541 .header("x-goog-copy-source", source);
542
543 if if_not_exists {
544 builder = builder.header(&VERSION_MATCH, 0);
545 }
546
547 builder
548 .bearer_auth(&credential.bearer)
549 .header(CONTENT_LENGTH, 0)
552 .retryable(&self.config.retry_config)
553 .idempotent(!if_not_exists)
554 .send()
555 .await
556 .map_err(|err| match err.status() {
557 Some(StatusCode::PRECONDITION_FAILED) => crate::Error::AlreadyExists {
558 source: Box::new(err),
559 path: to.to_string(),
560 },
561 _ => err.error(STORE, from.to_string()),
562 })?;
563
564 Ok(())
565 }
566}
567
568#[async_trait]
569impl GetClient for GoogleCloudStorageClient {
570 const STORE: &'static str = STORE;
571 const HEADER_CONFIG: HeaderConfig = HeaderConfig {
572 etag_required: true,
573 last_modified_required: true,
574 version_header: Some(VERSION_HEADER),
575 user_defined_metadata_prefix: Some(USER_DEFINED_METADATA_HEADER_PREFIX),
576 };
577
578 async fn get_request(&self, path: &Path, options: GetOptions) -> Result<Response> {
580 let credential = self.get_credential().await?;
581 let url = self.object_url(path);
582
583 let method = match options.head {
584 true => Method::HEAD,
585 false => Method::GET,
586 };
587
588 let mut request = self.client.request(method, url);
589
590 if let Some(version) = &options.version {
591 request = request.query(&[("generation", version)]);
592 }
593
594 if !credential.bearer.is_empty() {
595 request = request.bearer_auth(&credential.bearer);
596 }
597
598 let response = request
599 .with_get_options(options)
600 .send_retry(&self.config.retry_config)
601 .await
602 .context(GetRequestSnafu {
603 path: path.as_ref(),
604 })?;
605
606 Ok(response)
607 }
608}
609
610#[async_trait]
611impl ListClient for GoogleCloudStorageClient {
612 async fn list_request(
614 &self,
615 prefix: Option<&str>,
616 delimiter: bool,
617 page_token: Option<&str>,
618 offset: Option<&str>,
619 ) -> Result<(ListResult, Option<String>)> {
620 let credential = self.get_credential().await?;
621 let url = format!("{}/{}", self.config.base_url, self.bucket_name_encoded);
622
623 let mut query = Vec::with_capacity(5);
624 query.push(("list-type", "2"));
625 if delimiter {
626 query.push(("delimiter", DELIMITER))
627 }
628
629 if let Some(prefix) = &prefix {
630 query.push(("prefix", prefix))
631 }
632
633 if let Some(page_token) = page_token {
634 query.push(("continuation-token", page_token))
635 }
636
637 if let Some(max_results) = &self.max_list_results {
638 query.push(("max-keys", max_results))
639 }
640
641 if let Some(offset) = offset {
642 query.push(("start-after", offset))
643 }
644
645 let response = self
646 .client
647 .request(Method::GET, url)
648 .query(&query)
649 .bearer_auth(&credential.bearer)
650 .send_retry(&self.config.retry_config)
651 .await
652 .context(ListRequestSnafu)?
653 .bytes()
654 .await
655 .context(ListResponseBodySnafu)?;
656
657 let mut response: ListResponse =
658 quick_xml::de::from_reader(response.reader()).context(InvalidListResponseSnafu)?;
659
660 let token = response.next_continuation_token.take();
661 Ok((response.try_into()?, token))
662 }
663}