object_store/client/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Generic utilities for reqwest-based ObjectStore implementations
19
20pub mod backoff;
21
22#[cfg(test)]
23pub mod mock_server;
24
25pub mod retry;
26
27#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
28pub mod pagination;
29
30pub mod get;
31
32#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
33pub mod list;
34
35#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
36pub mod token;
37
38pub mod header;
39
40#[cfg(any(feature = "aws", feature = "gcp"))]
41pub mod s3;
42
43#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
44pub mod parts;
45
46use async_trait::async_trait;
47use std::collections::HashMap;
48use std::str::FromStr;
49use std::sync::Arc;
50use std::time::Duration;
51
52use reqwest::header::{HeaderMap, HeaderValue};
53use reqwest::{Client, ClientBuilder, NoProxy, Proxy, RequestBuilder};
54use serde::{Deserialize, Serialize};
55
56use crate::config::{fmt_duration, ConfigValue};
57use crate::path::Path;
58use crate::{GetOptions, Result};
59
60fn map_client_error(e: reqwest::Error) -> super::Error {
61    super::Error::Generic {
62        store: "HTTP client",
63        source: Box::new(e),
64    }
65}
66
67static DEFAULT_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),);
68
/// Configuration keys for [`ClientOptions`]
///
/// Each key has a snake_case string form (see the `AsRef<str>` implementation)
/// and can be used with [`ClientOptions::with_config`] and
/// [`ClientOptions::get_config_value`].
#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy, Deserialize, Serialize)]
#[non_exhaustive]
pub enum ClientConfigKey {
    /// Allow non-TLS, i.e. non-HTTPS connections
    AllowHttp,
    /// Skip certificate validation on https connections.
    ///
    /// # Warning
    ///
    /// You should think very carefully before using this method. If
    /// invalid certificates are trusted, *any* certificate for *any* site
    /// will be trusted for use. This includes expired certificates. This
    /// introduces significant vulnerabilities, and should only be used
    /// as a last resort or for testing
    AllowInvalidCertificates,
    /// Timeout for only the connect phase of a Client
    ConnectTimeout,
    /// Default CONTENT_TYPE for uploads
    DefaultContentType,
    /// Only use http1 connections
    Http1Only,
    /// Interval at which HTTP2 Ping frames should be sent to keep a connection alive.
    Http2KeepAliveInterval,
    /// Timeout for receiving an acknowledgement of the keep-alive ping.
    Http2KeepAliveTimeout,
    /// Enable HTTP2 keep alive pings for idle connections
    Http2KeepAliveWhileIdle,
    /// Only use http2 connections
    Http2Only,
    /// The pool max idle timeout
    ///
    /// This is the length of time an idle connection will be kept alive
    PoolIdleTimeout,
    /// Maximum number of idle connections per host
    PoolMaxIdlePerHost,
    /// HTTP proxy to use for requests
    ProxyUrl,
    /// PEM-formatted CA certificate for proxy connections
    ProxyCaCertificate,
    /// List of hosts that bypass proxy
    ProxyExcludes,
    /// Request timeout
    ///
    /// The timeout is applied from when the request starts connecting until the
    /// response body has finished
    Timeout,
    /// User-Agent header to be used by this client
    UserAgent,
}
119
120impl AsRef<str> for ClientConfigKey {
121    fn as_ref(&self) -> &str {
122        match self {
123            Self::AllowHttp => "allow_http",
124            Self::AllowInvalidCertificates => "allow_invalid_certificates",
125            Self::ConnectTimeout => "connect_timeout",
126            Self::DefaultContentType => "default_content_type",
127            Self::Http1Only => "http1_only",
128            Self::Http2Only => "http2_only",
129            Self::Http2KeepAliveInterval => "http2_keep_alive_interval",
130            Self::Http2KeepAliveTimeout => "http2_keep_alive_timeout",
131            Self::Http2KeepAliveWhileIdle => "http2_keep_alive_while_idle",
132            Self::PoolIdleTimeout => "pool_idle_timeout",
133            Self::PoolMaxIdlePerHost => "pool_max_idle_per_host",
134            Self::ProxyUrl => "proxy_url",
135            Self::ProxyCaCertificate => "proxy_ca_certificate",
136            Self::ProxyExcludes => "proxy_excludes",
137            Self::Timeout => "timeout",
138            Self::UserAgent => "user_agent",
139        }
140    }
141}
142
143impl FromStr for ClientConfigKey {
144    type Err = super::Error;
145
146    fn from_str(s: &str) -> Result<Self, Self::Err> {
147        match s {
148            "allow_http" => Ok(Self::AllowHttp),
149            "allow_invalid_certificates" => Ok(Self::AllowInvalidCertificates),
150            "connect_timeout" => Ok(Self::ConnectTimeout),
151            "default_content_type" => Ok(Self::DefaultContentType),
152            "http1_only" => Ok(Self::Http1Only),
153            "http2_only" => Ok(Self::Http2Only),
154            "http2_keep_alive_interval" => Ok(Self::Http2KeepAliveInterval),
155            "http2_keep_alive_timeout" => Ok(Self::Http2KeepAliveTimeout),
156            "http2_keep_alive_while_idle" => Ok(Self::Http2KeepAliveWhileIdle),
157            "pool_idle_timeout" => Ok(Self::PoolIdleTimeout),
158            "pool_max_idle_per_host" => Ok(Self::PoolMaxIdlePerHost),
159            "proxy_url" => Ok(Self::ProxyUrl),
160            "timeout" => Ok(Self::Timeout),
161            "user_agent" => Ok(Self::UserAgent),
162            _ => Err(super::Error::UnknownConfigurationKey {
163                store: "HTTP",
164                key: s.into(),
165            }),
166        }
167    }
168}
169
/// HTTP client configuration for remote object stores
///
/// Options are set through the `with_*` builder methods or by key through
/// [`ClientOptions::with_config`].
#[derive(Debug, Clone)]
pub struct ClientOptions {
    /// User-Agent header; a crate-derived default is used when `None`
    user_agent: Option<ConfigValue<HeaderValue>>,
    /// Content type to use for a given file extension
    content_type_map: HashMap<String, String>,
    /// Content type used when no extension-specific entry matches
    default_content_type: Option<String>,
    /// Headers added to every request
    default_headers: Option<HeaderMap>,
    /// Proxy URL applied to all requests
    proxy_url: Option<String>,
    /// PEM-formatted CA certificate; only applied when `proxy_url` is set
    proxy_ca_certificate: Option<String>,
    /// Hosts that bypass the proxy; only applied when `proxy_url` is set
    proxy_excludes: Option<String>,
    /// Whether plain HTTP is permitted; when false the client is HTTPS only
    allow_http: ConfigValue<bool>,
    /// Whether invalid TLS certificates are accepted
    allow_insecure: ConfigValue<bool>,
    /// Overall request timeout; `None` leaves it unset on the client
    timeout: Option<ConfigValue<Duration>>,
    /// Timeout for the connect phase; `None` leaves it unset on the client
    connect_timeout: Option<ConfigValue<Duration>>,
    /// How long an idle pooled connection is kept alive
    pool_idle_timeout: Option<ConfigValue<Duration>>,
    /// Maximum number of idle connections per host
    pool_max_idle_per_host: Option<ConfigValue<usize>>,
    /// Interval between HTTP2 keep-alive pings
    http2_keep_alive_interval: Option<ConfigValue<Duration>>,
    /// Timeout for the acknowledgement of an HTTP2 keep-alive ping
    http2_keep_alive_timeout: Option<ConfigValue<Duration>>,
    /// Whether HTTP2 keep-alive pings are sent on idle connections
    http2_keep_alive_while_idle: ConfigValue<bool>,
    /// Restrict the client to HTTP1
    http1_only: ConfigValue<bool>,
    /// Restrict the client to HTTP2
    http2_only: ConfigValue<bool>,
}
192
193impl Default for ClientOptions {
194    fn default() -> Self {
195        // Defaults based on
196        // <https://docs.aws.amazon.com/sdkref/latest/guide/feature-smart-config-defaults.html>
197        // <https://docs.aws.amazon.com/whitepapers/latest/s3-optimizing-performance-best-practices/timeouts-and-retries-for-latency-sensitive-applications.html>
198        // Which recommend a connection timeout of 3.1s and a request timeout of 2s
199        //
200        // As object store requests may involve the transfer of non-trivial volumes of data
201        // we opt for a slightly higher default timeout of 30 seconds
202        Self {
203            user_agent: None,
204            content_type_map: Default::default(),
205            default_content_type: None,
206            default_headers: None,
207            proxy_url: None,
208            proxy_ca_certificate: None,
209            proxy_excludes: None,
210            allow_http: Default::default(),
211            allow_insecure: Default::default(),
212            timeout: Some(Duration::from_secs(30).into()),
213            connect_timeout: Some(Duration::from_secs(5).into()),
214            pool_idle_timeout: None,
215            pool_max_idle_per_host: None,
216            http2_keep_alive_interval: None,
217            http2_keep_alive_timeout: None,
218            http2_keep_alive_while_idle: Default::default(),
219            // HTTP2 is known to be significantly slower than HTTP1, so we default
220            // to HTTP1 for now.
221            // https://github.com/apache/arrow-rs/issues/5194
222            http1_only: true.into(),
223            http2_only: Default::default(),
224        }
225    }
226}
227
228impl ClientOptions {
229    /// Create a new [`ClientOptions`] with default values
230    pub fn new() -> Self {
231        Default::default()
232    }
233
234    /// Set an option by key
235    pub fn with_config(mut self, key: ClientConfigKey, value: impl Into<String>) -> Self {
236        match key {
237            ClientConfigKey::AllowHttp => self.allow_http.parse(value),
238            ClientConfigKey::AllowInvalidCertificates => self.allow_insecure.parse(value),
239            ClientConfigKey::ConnectTimeout => {
240                self.connect_timeout = Some(ConfigValue::Deferred(value.into()))
241            }
242            ClientConfigKey::DefaultContentType => self.default_content_type = Some(value.into()),
243            ClientConfigKey::Http1Only => self.http1_only.parse(value),
244            ClientConfigKey::Http2Only => self.http2_only.parse(value),
245            ClientConfigKey::Http2KeepAliveInterval => {
246                self.http2_keep_alive_interval = Some(ConfigValue::Deferred(value.into()))
247            }
248            ClientConfigKey::Http2KeepAliveTimeout => {
249                self.http2_keep_alive_timeout = Some(ConfigValue::Deferred(value.into()))
250            }
251            ClientConfigKey::Http2KeepAliveWhileIdle => {
252                self.http2_keep_alive_while_idle.parse(value)
253            }
254            ClientConfigKey::PoolIdleTimeout => {
255                self.pool_idle_timeout = Some(ConfigValue::Deferred(value.into()))
256            }
257            ClientConfigKey::PoolMaxIdlePerHost => {
258                self.pool_max_idle_per_host = Some(ConfigValue::Deferred(value.into()))
259            }
260            ClientConfigKey::ProxyUrl => self.proxy_url = Some(value.into()),
261            ClientConfigKey::ProxyCaCertificate => self.proxy_ca_certificate = Some(value.into()),
262            ClientConfigKey::ProxyExcludes => self.proxy_excludes = Some(value.into()),
263            ClientConfigKey::Timeout => self.timeout = Some(ConfigValue::Deferred(value.into())),
264            ClientConfigKey::UserAgent => {
265                self.user_agent = Some(ConfigValue::Deferred(value.into()))
266            }
267        }
268        self
269    }
270
271    /// Get an option by key
272    pub fn get_config_value(&self, key: &ClientConfigKey) -> Option<String> {
273        match key {
274            ClientConfigKey::AllowHttp => Some(self.allow_http.to_string()),
275            ClientConfigKey::AllowInvalidCertificates => Some(self.allow_insecure.to_string()),
276            ClientConfigKey::ConnectTimeout => self.connect_timeout.as_ref().map(fmt_duration),
277            ClientConfigKey::DefaultContentType => self.default_content_type.clone(),
278            ClientConfigKey::Http1Only => Some(self.http1_only.to_string()),
279            ClientConfigKey::Http2KeepAliveInterval => {
280                self.http2_keep_alive_interval.as_ref().map(fmt_duration)
281            }
282            ClientConfigKey::Http2KeepAliveTimeout => {
283                self.http2_keep_alive_timeout.as_ref().map(fmt_duration)
284            }
285            ClientConfigKey::Http2KeepAliveWhileIdle => {
286                Some(self.http2_keep_alive_while_idle.to_string())
287            }
288            ClientConfigKey::Http2Only => Some(self.http2_only.to_string()),
289            ClientConfigKey::PoolIdleTimeout => self.pool_idle_timeout.as_ref().map(fmt_duration),
290            ClientConfigKey::PoolMaxIdlePerHost => {
291                self.pool_max_idle_per_host.as_ref().map(|v| v.to_string())
292            }
293            ClientConfigKey::ProxyUrl => self.proxy_url.clone(),
294            ClientConfigKey::ProxyCaCertificate => self.proxy_ca_certificate.clone(),
295            ClientConfigKey::ProxyExcludes => self.proxy_excludes.clone(),
296            ClientConfigKey::Timeout => self.timeout.as_ref().map(fmt_duration),
297            ClientConfigKey::UserAgent => self
298                .user_agent
299                .as_ref()
300                .and_then(|v| v.get().ok())
301                .and_then(|v| v.to_str().ok().map(|s| s.to_string())),
302        }
303    }
304
305    /// Sets the User-Agent header to be used by this client
306    ///
307    /// Default is based on the version of this crate
308    pub fn with_user_agent(mut self, agent: HeaderValue) -> Self {
309        self.user_agent = Some(agent.into());
310        self
311    }
312
313    /// Set the default CONTENT_TYPE for uploads
314    pub fn with_default_content_type(mut self, mime: impl Into<String>) -> Self {
315        self.default_content_type = Some(mime.into());
316        self
317    }
318
319    /// Set the CONTENT_TYPE for a given file extension
320    pub fn with_content_type_for_suffix(
321        mut self,
322        extension: impl Into<String>,
323        mime: impl Into<String>,
324    ) -> Self {
325        self.content_type_map.insert(extension.into(), mime.into());
326        self
327    }
328
329    /// Sets the default headers for every request
330    pub fn with_default_headers(mut self, headers: HeaderMap) -> Self {
331        self.default_headers = Some(headers);
332        self
333    }
334
335    /// Sets what protocol is allowed. If `allow_http` is :
336    /// * false (default):  Only HTTPS are allowed
337    /// * true:  HTTP and HTTPS are allowed
338    pub fn with_allow_http(mut self, allow_http: bool) -> Self {
339        self.allow_http = allow_http.into();
340        self
341    }
342    /// Allows connections to invalid SSL certificates
343    /// * false (default):  Only valid HTTPS certificates are allowed
344    /// * true:  All HTTPS certificates are allowed
345    ///
346    /// # Warning
347    ///
348    /// You should think very carefully before using this method. If
349    /// invalid certificates are trusted, *any* certificate for *any* site
350    /// will be trusted for use. This includes expired certificates. This
351    /// introduces significant vulnerabilities, and should only be used
352    /// as a last resort or for testing
353    pub fn with_allow_invalid_certificates(mut self, allow_insecure: bool) -> Self {
354        self.allow_insecure = allow_insecure.into();
355        self
356    }
357
358    /// Only use http1 connections
359    ///
360    /// This is on by default, since http2 is known to be significantly slower than http1.
361    pub fn with_http1_only(mut self) -> Self {
362        self.http2_only = false.into();
363        self.http1_only = true.into();
364        self
365    }
366
367    /// Only use http2 connections
368    pub fn with_http2_only(mut self) -> Self {
369        self.http1_only = false.into();
370        self.http2_only = true.into();
371        self
372    }
373
374    /// Use http2 if supported, otherwise use http1.
375    pub fn with_allow_http2(mut self) -> Self {
376        self.http1_only = false.into();
377        self.http2_only = false.into();
378        self
379    }
380
381    /// Set a proxy URL to use for requests
382    pub fn with_proxy_url(mut self, proxy_url: impl Into<String>) -> Self {
383        self.proxy_url = Some(proxy_url.into());
384        self
385    }
386
387    /// Set a trusted proxy CA certificate
388    pub fn with_proxy_ca_certificate(mut self, proxy_ca_certificate: impl Into<String>) -> Self {
389        self.proxy_ca_certificate = Some(proxy_ca_certificate.into());
390        self
391    }
392
393    /// Set a list of hosts to exclude from proxy connections
394    pub fn with_proxy_excludes(mut self, proxy_excludes: impl Into<String>) -> Self {
395        self.proxy_excludes = Some(proxy_excludes.into());
396        self
397    }
398
399    /// Set a request timeout
400    ///
401    /// The timeout is applied from when the request starts connecting until the
402    /// response body has finished
403    ///
404    /// Default is 30 seconds
405    pub fn with_timeout(mut self, timeout: Duration) -> Self {
406        self.timeout = Some(ConfigValue::Parsed(timeout));
407        self
408    }
409
410    /// Disables the request timeout
411    ///
412    /// See [`Self::with_timeout`]
413    pub fn with_timeout_disabled(mut self) -> Self {
414        self.timeout = None;
415        self
416    }
417
418    /// Set a timeout for only the connect phase of a Client
419    ///
420    /// Default is 5 seconds
421    pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
422        self.connect_timeout = Some(ConfigValue::Parsed(timeout));
423        self
424    }
425
426    /// Disables the connection timeout
427    ///
428    /// See [`Self::with_connect_timeout`]
429    pub fn with_connect_timeout_disabled(mut self) -> Self {
430        self.timeout = None;
431        self
432    }
433
434    /// Set the pool max idle timeout
435    ///
436    /// This is the length of time an idle connection will be kept alive
437    ///
438    /// Default is 90 seconds enforced by reqwest
439    pub fn with_pool_idle_timeout(mut self, timeout: Duration) -> Self {
440        self.pool_idle_timeout = Some(ConfigValue::Parsed(timeout));
441        self
442    }
443
444    /// Set the maximum number of idle connections per host
445    ///
446    /// Default is no limit enforced by reqwest
447    pub fn with_pool_max_idle_per_host(mut self, max: usize) -> Self {
448        self.pool_max_idle_per_host = Some(max.into());
449        self
450    }
451
452    /// Sets an interval for HTTP2 Ping frames should be sent to keep a connection alive.
453    ///
454    /// Default is disabled enforced by reqwest
455    pub fn with_http2_keep_alive_interval(mut self, interval: Duration) -> Self {
456        self.http2_keep_alive_interval = Some(ConfigValue::Parsed(interval));
457        self
458    }
459
460    /// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
461    ///
462    /// If the ping is not acknowledged within the timeout, the connection will be closed.
463    /// Does nothing if http2_keep_alive_interval is disabled.
464    ///
465    /// Default is disabled enforced by reqwest
466    pub fn with_http2_keep_alive_timeout(mut self, interval: Duration) -> Self {
467        self.http2_keep_alive_timeout = Some(ConfigValue::Parsed(interval));
468        self
469    }
470
471    /// Enable HTTP2 keep alive pings for idle connections
472    ///
473    /// If disabled, keep-alive pings are only sent while there are open request/response
474    /// streams. If enabled, pings are also sent when no streams are active
475    ///
476    /// Default is disabled enforced by reqwest
477    pub fn with_http2_keep_alive_while_idle(mut self) -> Self {
478        self.http2_keep_alive_while_idle = true.into();
479        self
480    }
481
482    /// Get the mime type for the file in `path` to be uploaded
483    ///
484    /// Gets the file extension from `path`, and returns the
485    /// mime type if it was defined initially through
486    /// `ClientOptions::with_content_type_for_suffix`
487    ///
488    /// Otherwise, returns the default mime type if it was defined
489    /// earlier through `ClientOptions::with_default_content_type`
490    pub fn get_content_type(&self, path: &Path) -> Option<&str> {
491        match path.extension() {
492            Some(extension) => match self.content_type_map.get(extension) {
493                Some(ct) => Some(ct.as_str()),
494                None => self.default_content_type.as_deref(),
495            },
496            None => self.default_content_type.as_deref(),
497        }
498    }
499
500    /// Create a [`Client`] with overrides optimised for metadata endpoint access
501    ///
502    /// In particular:
503    /// * Allows HTTP as metadata endpoints do not use TLS
504    /// * Configures a low connection timeout to provide quick feedback if not present
505    #[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
506    pub(crate) fn metadata_client(&self) -> Result<Client> {
507        self.clone()
508            .with_allow_http(true)
509            .with_connect_timeout(Duration::from_secs(1))
510            .client()
511    }
512
513    pub(crate) fn client(&self) -> Result<Client> {
514        let mut builder = ClientBuilder::new();
515
516        match &self.user_agent {
517            Some(user_agent) => builder = builder.user_agent(user_agent.get()?),
518            None => builder = builder.user_agent(DEFAULT_USER_AGENT),
519        }
520
521        if let Some(headers) = &self.default_headers {
522            builder = builder.default_headers(headers.clone())
523        }
524
525        if let Some(proxy) = &self.proxy_url {
526            let mut proxy = Proxy::all(proxy).map_err(map_client_error)?;
527
528            if let Some(certificate) = &self.proxy_ca_certificate {
529                let certificate = reqwest::tls::Certificate::from_pem(certificate.as_bytes())
530                    .map_err(map_client_error)?;
531
532                builder = builder.add_root_certificate(certificate);
533            }
534
535            if let Some(proxy_excludes) = &self.proxy_excludes {
536                let no_proxy = NoProxy::from_string(proxy_excludes);
537
538                proxy = proxy.no_proxy(no_proxy);
539            }
540
541            builder = builder.proxy(proxy);
542        }
543
544        if let Some(timeout) = &self.timeout {
545            builder = builder.timeout(timeout.get()?)
546        }
547
548        if let Some(timeout) = &self.connect_timeout {
549            builder = builder.connect_timeout(timeout.get()?)
550        }
551
552        if let Some(timeout) = &self.pool_idle_timeout {
553            builder = builder.pool_idle_timeout(timeout.get()?)
554        }
555
556        if let Some(max) = &self.pool_max_idle_per_host {
557            builder = builder.pool_max_idle_per_host(max.get()?)
558        }
559
560        if let Some(interval) = &self.http2_keep_alive_interval {
561            builder = builder.http2_keep_alive_interval(interval.get()?)
562        }
563
564        if let Some(interval) = &self.http2_keep_alive_timeout {
565            builder = builder.http2_keep_alive_timeout(interval.get()?)
566        }
567
568        if self.http2_keep_alive_while_idle.get()? {
569            builder = builder.http2_keep_alive_while_idle(true)
570        }
571
572        if self.http1_only.get()? {
573            builder = builder.http1_only()
574        }
575
576        if self.http2_only.get()? {
577            builder = builder.http2_prior_knowledge()
578        }
579
580        if self.allow_insecure.get()? {
581            builder = builder.danger_accept_invalid_certs(true)
582        }
583
584        builder
585            .https_only(!self.allow_http.get()?)
586            .build()
587            .map_err(map_client_error)
588    }
589}
590
/// Extension trait for applying [`GetOptions`] to a request under construction
pub trait GetOptionsExt {
    /// Consumes `self` and returns it with the settings in `options` applied
    fn with_get_options(self, options: GetOptions) -> Self;
}
594
595impl GetOptionsExt for RequestBuilder {
596    fn with_get_options(mut self, options: GetOptions) -> Self {
597        use hyper::header::*;
598
599        if let Some(range) = options.range {
600            self = self.header(RANGE, range.to_string());
601        }
602
603        if let Some(tag) = options.if_match {
604            self = self.header(IF_MATCH, tag);
605        }
606
607        if let Some(tag) = options.if_none_match {
608            self = self.header(IF_NONE_MATCH, tag);
609        }
610
611        const DATE_FORMAT: &str = "%a, %d %b %Y %H:%M:%S GMT";
612        if let Some(date) = options.if_unmodified_since {
613            self = self.header(IF_UNMODIFIED_SINCE, date.format(DATE_FORMAT).to_string());
614        }
615
616        if let Some(date) = options.if_modified_since {
617            self = self.header(IF_MODIFIED_SINCE, date.format(DATE_FORMAT).to_string());
618        }
619
620        self
621    }
622}
623
/// Provides credentials for use when signing requests
///
/// Implementations must be [`Debug`](std::fmt::Debug), [`Send`] and [`Sync`]
#[async_trait]
pub trait CredentialProvider: std::fmt::Debug + Send + Sync {
    /// The type of credential returned by this provider
    type Credential;

    /// Return a credential, shared behind an [`Arc`]
    async fn get_credential(&self) -> Result<Arc<Self::Credential>>;
}
633
/// A static set of credentials
#[derive(Debug)]
pub struct StaticCredentialProvider<T> {
    /// The credential supplied at construction, held behind an [`Arc`]
    credential: Arc<T>,
}

impl<T> StaticCredentialProvider<T> {
    /// A [`CredentialProvider`] for a static credential of type `T`
    pub fn new(credential: T) -> Self {
        let credential = Arc::new(credential);
        Self { credential }
    }
}
648
649#[async_trait]
650impl<T> CredentialProvider for StaticCredentialProvider<T>
651where
652    T: std::fmt::Debug + Send + Sync,
653{
654    type Credential = T;
655
656    async fn get_credential(&self) -> Result<Arc<T>> {
657        Ok(Arc::clone(&self.credential))
658    }
659}
660
#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
mod cloud {
    use super::*;
    use crate::client::token::{TemporaryToken, TokenCache};
    use crate::RetryConfig;

    /// A source of temporary tokens, fetched using a [`Client`]
    #[async_trait]
    pub trait TokenProvider: std::fmt::Debug + Send + Sync {
        /// The type of credential carried by the fetched token
        type Credential: std::fmt::Debug + Send + Sync;

        /// Fetch a new token using `client`, with `retry` as the retry configuration
        async fn fetch_token(
            &self,
            client: &Client,
            retry: &RetryConfig,
        ) -> Result<TemporaryToken<Arc<Self::Credential>>>;
    }

    /// A [`CredentialProvider`] that uses [`Client`] to fetch temporary tokens
    #[derive(Debug)]
    pub struct TokenCredentialProvider<T: TokenProvider> {
        inner: T,
        client: Client,
        retry: RetryConfig,
        cache: TokenCache<Arc<T::Credential>>,
    }

    impl<T: TokenProvider> TokenCredentialProvider<T> {
        /// Create a provider that fetches tokens from `inner` through `client`,
        /// starting with a default token cache
        pub fn new(inner: T, client: Client, retry: RetryConfig) -> Self {
            let cache = TokenCache::default();
            Self {
                inner,
                client,
                retry,
                cache,
            }
        }

        /// Override the minimum remaining TTL for a cached token to be used
        #[cfg(feature = "aws")]
        pub fn with_min_ttl(self, min_ttl: Duration) -> Self {
            let cache = self.cache.with_min_ttl(min_ttl);
            Self { cache, ..self }
        }
    }

    #[async_trait]
    impl<T: TokenProvider> CredentialProvider for TokenCredentialProvider<T> {
        type Credential = T::Credential;

        /// Returns a credential from the token cache, passing the cache a
        /// closure that fetches a token from the inner [`TokenProvider`]
        async fn get_credential(&self) -> Result<Arc<Self::Credential>> {
            let fetch = || self.inner.fetch_token(&self.client, &self.retry);
            self.cache.get_or_insert_with(fetch).await
        }
    }
}
716
717#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
718pub use cloud::*;
719
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Options set by string key must be readable back, unchanged, through
    /// the corresponding [`ClientConfigKey`]
    #[test]
    fn client_test_config_from_map() {
        // (string key, expected enum key, value)
        let cases = [
            ("allow_http", ClientConfigKey::AllowHttp, "true"),
            (
                "allow_invalid_certificates",
                ClientConfigKey::AllowInvalidCertificates,
                "false",
            ),
            (
                "connect_timeout",
                ClientConfigKey::ConnectTimeout,
                "90 seconds",
            ),
            (
                "default_content_type",
                ClientConfigKey::DefaultContentType,
                "object_store:fake_default_content_type",
            ),
            ("http1_only", ClientConfigKey::Http1Only, "true"),
            ("http2_only", ClientConfigKey::Http2Only, "false"),
            (
                "http2_keep_alive_interval",
                ClientConfigKey::Http2KeepAliveInterval,
                "90 seconds",
            ),
            (
                "http2_keep_alive_timeout",
                ClientConfigKey::Http2KeepAliveTimeout,
                "91 seconds",
            ),
            // NOTE(review): "92 seconds" is not a boolean; the round trip only
            // holds if values are stored unparsed — confirm this is intended
            (
                "http2_keep_alive_while_idle",
                ClientConfigKey::Http2KeepAliveWhileIdle,
                "92 seconds",
            ),
            (
                "pool_idle_timeout",
                ClientConfigKey::PoolIdleTimeout,
                "93 seconds",
            ),
            (
                "pool_max_idle_per_host",
                ClientConfigKey::PoolMaxIdlePerHost,
                "94",
            ),
            (
                "proxy_url",
                ClientConfigKey::ProxyUrl,
                "https://fake_proxy_url",
            ),
            ("timeout", ClientConfigKey::Timeout, "95 seconds"),
            (
                "user_agent",
                ClientConfigKey::UserAgent,
                "object_store:fake_user_agent",
            ),
        ];

        let options: HashMap<&str, &str> = cases
            .iter()
            .map(|(name, _, value)| (*name, *value))
            .collect();

        // Apply every option through its string key
        let builder = options
            .iter()
            .fold(ClientOptions::new(), |builder, (name, value)| {
                builder.with_config(name.parse().unwrap(), *value)
            });

        // Read every option back through its enum key
        for (name, key, value) in cases.iter() {
            assert_eq!(
                builder.get_config_value(key).as_deref(),
                Some(*value),
                "unexpected value for key {}",
                name
            );
        }
    }
}
858}