bigtable_rs/
auth_service.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use gcp_auth::TokenProvider;
7use http::{HeaderValue, Request, Response};
8use log::debug;
9use tonic::body::BoxBody;
10use tonic::transport::Channel;
11use tower::Service;
12
13#[derive(Clone)]
14pub struct AuthSvc {
15    inner: Channel,
16    token_provider: Option<Arc<dyn TokenProvider>>,
17    scopes: String,
18}
19
20impl AuthSvc {
21    pub fn new(
22        inner: Channel,
23        authentication_manager: Option<Arc<dyn TokenProvider>>,
24        scopes: String,
25    ) -> Self {
26        AuthSvc {
27            inner,
28            token_provider: authentication_manager,
29            scopes,
30        }
31    }
32}
33
34impl Service<Request<BoxBody>> for AuthSvc {
35    type Response = Response<BoxBody>;
36    type Error = Box<dyn std::error::Error + Send + Sync>;
37    #[allow(clippy::type_complexity)]
38    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
39
40    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
41        self.inner.poll_ready(cx).map_err(Into::into)
42    }
43
44    fn call(&mut self, mut request: Request<BoxBody>) -> Self::Future {
45        // This is necessary because tonic internally uses `tower::buffer::Buffer`.
46        // See https://github.com/tower-rs/tower/issues/547#issuecomment-767629149
47        // for details on why this is necessary
48        let clone = self.inner.clone();
49        let mut inner = std::mem::replace(&mut self.inner, clone);
50        let token_provider = self.token_provider.clone();
51        let scopes = self.scopes.clone();
52
53        Box::pin(async move {
54            let scopes = &[scopes.as_ref()];
55            let token_f_opt = token_provider.as_ref().map(|m| m.token(scopes));
56
57            return match token_f_opt {
58                None => {
59                    debug!("auth intercepting and not attaching token");
60                    let response = inner.call(request).await?;
61                    Ok(response)
62                }
63                Some(token_future) => {
64                    let token = token_future.await?;
65                    let token = token.as_str().parse::<String>()?;
66                    let bearer_header =
67                        HeaderValue::from_str(format!("Bearer {}", token.as_str()).as_str())
68                            .unwrap();
69                    debug!(
70                        "auth intercepting with scope {:?} and attaching token's head {}",
71                        scopes,
72                        std::str::from_utf8(&token.as_bytes()[..5]).unwrap_or("")
73                    );
74                    request.headers_mut().insert("authorization", bearer_header);
75                    let response = inner.call(request).await?;
76                    Ok(response)
77                }
78            };
79        })
80    }
81}