object_store/azure/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! An object store implementation for Azure blob storage
19//!
20//! ## Streaming uploads
21//!
22//! [ObjectStore::put_multipart] will upload data in blocks and write a blob from those blocks.
23//!
24//! Unused blocks will automatically be dropped after 7 days.
25use crate::{
26    multipart::{MultipartStore, PartId},
27    path::Path,
28    signer::Signer,
29    GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta, ObjectStore,
30    PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, UploadPart,
31};
32use async_trait::async_trait;
33use futures::stream::BoxStream;
34use reqwest::Method;
35use std::fmt::Debug;
36use std::sync::Arc;
37use std::time::Duration;
38use url::Url;
39
40use crate::client::get::GetClientExt;
41use crate::client::list::ListClientExt;
42use crate::client::CredentialProvider;
43pub use credential::{authority_hosts, AzureAccessKey, AzureAuthorizer};
44
45mod builder;
46mod client;
47mod credential;
48
/// [`CredentialProvider`] for [`MicrosoftAzure`]
///
/// This is a shared, reference-counted trait object whose associated credential
/// type is [`AzureCredential`].
pub type AzureCredentialProvider = Arc<dyn CredentialProvider<Credential = AzureCredential>>;
51use crate::azure::client::AzureClient;
52use crate::client::parts::Parts;
53pub use builder::{AzureConfigKey, MicrosoftAzureBuilder};
54pub use credential::AzureCredential;
55
/// Name identifying this store implementation.
///
/// NOTE(review): not referenced in the code visible here; presumably consumed by the
/// `builder`, `client` and `credential` submodules (e.g. when reporting errors) —
/// confirm before renaming or removing.
const STORE: &str = "MicrosoftAzure";
57
58/// Interface for [Microsoft Azure Blob Storage](https://azure.microsoft.com/en-us/services/storage/blobs/).
59#[derive(Debug)]
60pub struct MicrosoftAzure {
61    client: Arc<client::AzureClient>,
62}
63
64impl MicrosoftAzure {
65    /// Returns the [`AzureCredentialProvider`] used by [`MicrosoftAzure`]
66    pub fn credentials(&self) -> &AzureCredentialProvider {
67        &self.client.config().credentials
68    }
69
70    /// Create a full URL to the resource specified by `path` with this instance's configuration.
71    fn path_url(&self, path: &Path) -> url::Url {
72        self.client.config().path_url(path)
73    }
74}
75
76impl std::fmt::Display for MicrosoftAzure {
77    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78        write!(
79            f,
80            "MicrosoftAzure {{ account: {}, container: {} }}",
81            self.client.config().account,
82            self.client.config().container
83        )
84    }
85}
86
87#[async_trait]
88impl ObjectStore for MicrosoftAzure {
89    async fn put_opts(
90        &self,
91        location: &Path,
92        payload: PutPayload,
93        opts: PutOptions,
94    ) -> Result<PutResult> {
95        self.client.put_blob(location, payload, opts).await
96    }
97
98    async fn put_multipart_opts(
99        &self,
100        location: &Path,
101        opts: PutMultipartOpts,
102    ) -> Result<Box<dyn MultipartUpload>> {
103        Ok(Box::new(AzureMultiPartUpload {
104            part_idx: 0,
105            opts,
106            state: Arc::new(UploadState {
107                client: Arc::clone(&self.client),
108                location: location.clone(),
109                parts: Default::default(),
110            }),
111        }))
112    }
113
114    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
115        self.client.get_opts(location, options).await
116    }
117
118    async fn delete(&self, location: &Path) -> Result<()> {
119        self.client.delete_request(location, &()).await
120    }
121
122    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
123        self.client.list(prefix)
124    }
125
126    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
127        self.client.list_with_delimiter(prefix).await
128    }
129
130    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
131        self.client.copy_request(from, to, true).await
132    }
133
134    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
135        self.client.copy_request(from, to, false).await
136    }
137}
138
139#[async_trait]
140impl Signer for MicrosoftAzure {
141    /// Create a URL containing the relevant [Service SAS] query parameters that authorize a request
142    /// via `method` to the resource at `path` valid for the duration specified in `expires_in`.
143    ///
144    /// [Service SAS]: https://learn.microsoft.com/en-us/rest/api/storageservices/create-service-sas
145    ///
146    /// # Example
147    ///
148    /// This example returns a URL that will enable a user to upload a file to
149    /// "some-folder/some-file.txt" in the next hour.
150    ///
151    /// ```
152    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
153    /// # use object_store::{azure::MicrosoftAzureBuilder, path::Path, signer::Signer};
154    /// # use reqwest::Method;
155    /// # use std::time::Duration;
156    /// #
157    /// let azure = MicrosoftAzureBuilder::new()
158    ///     .with_account("my-account")
159    ///     .with_access_key("my-access-key")
160    ///     .with_container_name("my-container")
161    ///     .build()?;
162    ///
163    /// let url = azure.signed_url(
164    ///     Method::PUT,
165    ///     &Path::from("some-folder/some-file.txt"),
166    ///     Duration::from_secs(60 * 60)
167    /// ).await?;
168    /// #     Ok(())
169    /// # }
170    /// ```
171    async fn signed_url(&self, method: Method, path: &Path, expires_in: Duration) -> Result<Url> {
172        let mut url = self.path_url(path);
173        let signer = self.client.signer(expires_in).await?;
174        signer.sign(&method, &mut url)?;
175        Ok(url)
176    }
177
178    async fn signed_urls(
179        &self,
180        method: Method,
181        paths: &[Path],
182        expires_in: Duration,
183    ) -> Result<Vec<Url>> {
184        let mut urls = Vec::with_capacity(paths.len());
185        let signer = self.client.signer(expires_in).await?;
186        for path in paths {
187            let mut url = self.path_url(path);
188            signer.sign(&method, &mut url)?;
189            urls.push(url);
190        }
191        Ok(urls)
192    }
193}
194
/// [`MultipartUpload`] implementation for Azure Blob Storage.
///
/// Relevant docs: <https://azure.github.io/Storage/docs/application-and-user-data/basics/azure-blob-storage-upload-apis/>
///
/// In Azure Blob Store, parts are "blocks":
///
/// - `put_part` -> PUT block
/// - `complete` -> PUT block list
/// - `abort` -> No equivalent; blocks are simply dropped after 7 days
#[derive(Debug)]
struct AzureMultiPartUpload {
    /// Index handed to the next part; incremented on every `put_part` call
    part_idx: usize,
    /// State shared with the futures returned by `put_part`
    state: Arc<UploadState>,
    /// Options supplied when the upload was created; taken (left as default) by `complete`
    opts: PutMultipartOpts,
}
206
/// State of a single multipart upload, shared between [`AzureMultiPartUpload`] and the
/// futures it returns for the individual parts.
#[derive(Debug)]
struct UploadState {
    /// Destination of the upload
    location: Path,
    /// Parts recorded so far, stored under the index they were uploaded with
    parts: Parts,
    /// Client used to upload the blocks and the final block list
    client: Arc<AzureClient>,
}
213
214#[async_trait]
215impl MultipartUpload for AzureMultiPartUpload {
216    fn put_part(&mut self, data: PutPayload) -> UploadPart {
217        let idx = self.part_idx;
218        self.part_idx += 1;
219        let state = Arc::clone(&self.state);
220        Box::pin(async move {
221            let part = state.client.put_block(&state.location, idx, data).await?;
222            state.parts.put(idx, part);
223            Ok(())
224        })
225    }
226
227    async fn complete(&mut self) -> Result<PutResult> {
228        let parts = self.state.parts.finish(self.part_idx)?;
229
230        self.state
231            .client
232            .put_block_list(&self.state.location, parts, std::mem::take(&mut self.opts))
233            .await
234    }
235
236    async fn abort(&mut self) -> Result<()> {
237        // Nothing to do
238        Ok(())
239    }
240}
241
242#[async_trait]
243impl MultipartStore for MicrosoftAzure {
244    async fn create_multipart(&self, _: &Path) -> Result<MultipartId> {
245        Ok(String::new())
246    }
247
248    async fn put_part(
249        &self,
250        path: &Path,
251        _: &MultipartId,
252        part_idx: usize,
253        data: PutPayload,
254    ) -> Result<PartId> {
255        self.client.put_block(path, part_idx, data).await
256    }
257
258    async fn complete_multipart(
259        &self,
260        path: &Path,
261        _: &MultipartId,
262        parts: Vec<PartId>,
263    ) -> Result<PutResult> {
264        self.client
265            .put_block_list(path, parts, Default::default())
266            .await
267    }
268
269    async fn abort_multipart(&self, _: &Path, _: &MultipartId) -> Result<()> {
270        // There is no way to drop blocks that have been uploaded. Instead, they simply
271        // expire in 7 days.
272        Ok(())
273    }
274}
275
#[cfg(test)]
mod tests {
    use super::*;
    use crate::integration::*;
    use crate::tests::*;
    use bytes::Bytes;

    /// Reads a required environment variable, panicking with the variable's name when it
    /// is missing or invalid so that a misconfigured manual run is easy to diagnose.
    fn required_env(name: &str) -> String {
        std::env::var(name)
            .unwrap_or_else(|e| panic!("environment variable {} must be set: {}", name, e))
    }

    /// Runs the shared integration suite against the store configured via the environment.
    #[tokio::test]
    async fn azure_blob_test() {
        maybe_skip_integration!();
        let integration = MicrosoftAzureBuilder::from_env().build().unwrap();

        put_get_delete_list(&integration).await;
        get_opts(&integration).await;
        list_uses_directories_correctly(&integration).await;
        list_with_delimiter(&integration).await;
        rename_and_copy(&integration).await;
        copy_if_not_exists(&integration).await;
        stream_get(&integration).await;
        put_opts(&integration, true).await;
        multipart(&integration, &integration).await;
        signing(&integration).await;

        // Tags are only validated when tagging has not been disabled in the configuration.
        let validate = !integration.client.config().disable_tagging;
        tagging(
            Arc::new(MicrosoftAzure {
                client: Arc::clone(&integration.client),
            }),
            validate,
            |p| {
                let client = Arc::clone(&integration.client);
                async move { client.get_blob_tagging(&p).await }
            },
        )
        .await;

        // Azurite doesn't support attributes properly
        if !integration.client.config().is_emulator {
            put_get_attributes(&integration).await;
        }
    }

    /// Uploads a small object, signs a GET URL for it and downloads it through that URL.
    #[ignore = "Used for manual testing against a real storage account."]
    #[tokio::test]
    async fn test_user_delegation_key() {
        let account = required_env("AZURE_ACCOUNT_NAME");
        let container = required_env("AZURE_CONTAINER_NAME");
        let client_id = required_env("AZURE_CLIENT_ID");
        let client_secret = required_env("AZURE_CLIENT_SECRET");
        let tenant_id = required_env("AZURE_TENANT_ID");
        let integration = MicrosoftAzureBuilder::new()
            .with_account(account)
            .with_container_name(container)
            .with_client_id(client_id)
            .with_client_secret(client_secret)
            .with_tenant_id(&tenant_id)
            .build()
            .unwrap();

        let data = Bytes::from("hello world");
        let path = Path::from("file.txt");
        integration.put(&path, data.clone().into()).await.unwrap();

        let signed = integration
            .signed_url(Method::GET, &path, Duration::from_secs(60))
            .await
            .unwrap();

        // Fail on a non-success status: otherwise a rejected signature would only show up
        // as a confusing mismatch between `data` and the body of the error response.
        let resp = reqwest::get(signed)
            .await
            .unwrap()
            .error_for_status()
            .unwrap();
        let loaded = resp.bytes().await.unwrap();

        assert_eq!(data, loaded);
    }

    /// Values set through `with_config` must be returned unchanged by `get_config_value`.
    #[test]
    fn azure_test_config_get_value() {
        // The values are arbitrary placeholders; only the round trip is checked.
        let azure_client_id = "object_store:fake_access_key_id".to_string();
        let azure_storage_account_name = "object_store:fake_secret_key".to_string();
        let azure_storage_token = "object_store:fake_default_region".to_string();
        let builder = MicrosoftAzureBuilder::new()
            .with_config(AzureConfigKey::ClientId, &azure_client_id)
            .with_config(AzureConfigKey::AccountName, &azure_storage_account_name)
            .with_config(AzureConfigKey::Token, &azure_storage_token);

        assert_eq!(
            builder.get_config_value(&AzureConfigKey::ClientId).unwrap(),
            azure_client_id
        );
        assert_eq!(
            builder
                .get_config_value(&AzureConfigKey::AccountName)
                .unwrap(),
            azure_storage_account_name
        );
        assert_eq!(
            builder.get_config_value(&AzureConfigKey::Token).unwrap(),
            azure_storage_token
        );
    }
}