1use crate::{
26 multipart::{MultipartStore, PartId},
27 path::Path,
28 signer::Signer,
29 GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta, ObjectStore,
30 PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, UploadPart,
31};
32use async_trait::async_trait;
33use futures::stream::BoxStream;
34use reqwest::Method;
35use std::fmt::Debug;
36use std::sync::Arc;
37use std::time::Duration;
38use url::Url;
39
40use crate::client::get::GetClientExt;
41use crate::client::list::ListClientExt;
42use crate::client::CredentialProvider;
43pub use credential::{authority_hosts, AzureAccessKey, AzureAuthorizer};
44
45mod builder;
46mod client;
47mod credential;
48
49pub type AzureCredentialProvider = Arc<dyn CredentialProvider<Credential = AzureCredential>>;
51use crate::azure::client::AzureClient;
52use crate::client::parts::Parts;
53pub use builder::{AzureConfigKey, MicrosoftAzureBuilder};
54pub use credential::AzureCredential;
55
56const STORE: &str = "MicrosoftAzure";
57
58#[derive(Debug)]
60pub struct MicrosoftAzure {
61 client: Arc<client::AzureClient>,
62}
63
64impl MicrosoftAzure {
65 pub fn credentials(&self) -> &AzureCredentialProvider {
67 &self.client.config().credentials
68 }
69
70 fn path_url(&self, path: &Path) -> url::Url {
72 self.client.config().path_url(path)
73 }
74}
75
76impl std::fmt::Display for MicrosoftAzure {
77 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78 write!(
79 f,
80 "MicrosoftAzure {{ account: {}, container: {} }}",
81 self.client.config().account,
82 self.client.config().container
83 )
84 }
85}
86
87#[async_trait]
88impl ObjectStore for MicrosoftAzure {
89 async fn put_opts(
90 &self,
91 location: &Path,
92 payload: PutPayload,
93 opts: PutOptions,
94 ) -> Result<PutResult> {
95 self.client.put_blob(location, payload, opts).await
96 }
97
98 async fn put_multipart_opts(
99 &self,
100 location: &Path,
101 opts: PutMultipartOpts,
102 ) -> Result<Box<dyn MultipartUpload>> {
103 Ok(Box::new(AzureMultiPartUpload {
104 part_idx: 0,
105 opts,
106 state: Arc::new(UploadState {
107 client: Arc::clone(&self.client),
108 location: location.clone(),
109 parts: Default::default(),
110 }),
111 }))
112 }
113
114 async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
115 self.client.get_opts(location, options).await
116 }
117
118 async fn delete(&self, location: &Path) -> Result<()> {
119 self.client.delete_request(location, &()).await
120 }
121
122 fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
123 self.client.list(prefix)
124 }
125
126 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
127 self.client.list_with_delimiter(prefix).await
128 }
129
130 async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
131 self.client.copy_request(from, to, true).await
132 }
133
134 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
135 self.client.copy_request(from, to, false).await
136 }
137}
138
139#[async_trait]
140impl Signer for MicrosoftAzure {
141 async fn signed_url(&self, method: Method, path: &Path, expires_in: Duration) -> Result<Url> {
172 let mut url = self.path_url(path);
173 let signer = self.client.signer(expires_in).await?;
174 signer.sign(&method, &mut url)?;
175 Ok(url)
176 }
177
178 async fn signed_urls(
179 &self,
180 method: Method,
181 paths: &[Path],
182 expires_in: Duration,
183 ) -> Result<Vec<Url>> {
184 let mut urls = Vec::with_capacity(paths.len());
185 let signer = self.client.signer(expires_in).await?;
186 for path in paths {
187 let mut url = self.path_url(path);
188 signer.sign(&method, &mut url)?;
189 urls.push(url);
190 }
191 Ok(urls)
192 }
193}
194
195#[derive(Debug)]
201struct AzureMultiPartUpload {
202 part_idx: usize,
203 state: Arc<UploadState>,
204 opts: PutMultipartOpts,
205}
206
207#[derive(Debug)]
208struct UploadState {
209 location: Path,
210 parts: Parts,
211 client: Arc<AzureClient>,
212}
213
214#[async_trait]
215impl MultipartUpload for AzureMultiPartUpload {
216 fn put_part(&mut self, data: PutPayload) -> UploadPart {
217 let idx = self.part_idx;
218 self.part_idx += 1;
219 let state = Arc::clone(&self.state);
220 Box::pin(async move {
221 let part = state.client.put_block(&state.location, idx, data).await?;
222 state.parts.put(idx, part);
223 Ok(())
224 })
225 }
226
227 async fn complete(&mut self) -> Result<PutResult> {
228 let parts = self.state.parts.finish(self.part_idx)?;
229
230 self.state
231 .client
232 .put_block_list(&self.state.location, parts, std::mem::take(&mut self.opts))
233 .await
234 }
235
236 async fn abort(&mut self) -> Result<()> {
237 Ok(())
239 }
240}
241
242#[async_trait]
243impl MultipartStore for MicrosoftAzure {
244 async fn create_multipart(&self, _: &Path) -> Result<MultipartId> {
245 Ok(String::new())
246 }
247
248 async fn put_part(
249 &self,
250 path: &Path,
251 _: &MultipartId,
252 part_idx: usize,
253 data: PutPayload,
254 ) -> Result<PartId> {
255 self.client.put_block(path, part_idx, data).await
256 }
257
258 async fn complete_multipart(
259 &self,
260 path: &Path,
261 _: &MultipartId,
262 parts: Vec<PartId>,
263 ) -> Result<PutResult> {
264 self.client
265 .put_block_list(path, parts, Default::default())
266 .await
267 }
268
269 async fn abort_multipart(&self, _: &Path, _: &MultipartId) -> Result<()> {
270 Ok(())
273 }
274}
275
276#[cfg(test)]
277mod tests {
278 use super::*;
279 use crate::integration::*;
280 use crate::tests::*;
281 use bytes::Bytes;
282
283 #[tokio::test]
284 async fn azure_blob_test() {
285 maybe_skip_integration!();
286 let integration = MicrosoftAzureBuilder::from_env().build().unwrap();
287
288 put_get_delete_list(&integration).await;
289 get_opts(&integration).await;
290 list_uses_directories_correctly(&integration).await;
291 list_with_delimiter(&integration).await;
292 rename_and_copy(&integration).await;
293 copy_if_not_exists(&integration).await;
294 stream_get(&integration).await;
295 put_opts(&integration, true).await;
296 multipart(&integration, &integration).await;
297 signing(&integration).await;
298
299 let validate = !integration.client.config().disable_tagging;
300 tagging(
301 Arc::new(MicrosoftAzure {
302 client: Arc::clone(&integration.client),
303 }),
304 validate,
305 |p| {
306 let client = Arc::clone(&integration.client);
307 async move { client.get_blob_tagging(&p).await }
308 },
309 )
310 .await;
311
312 if !integration.client.config().is_emulator {
314 put_get_attributes(&integration).await;
315 }
316 }
317
318 #[ignore = "Used for manual testing against a real storage account."]
319 #[tokio::test]
320 async fn test_user_delegation_key() {
321 let account = std::env::var("AZURE_ACCOUNT_NAME").unwrap();
322 let container = std::env::var("AZURE_CONTAINER_NAME").unwrap();
323 let client_id = std::env::var("AZURE_CLIENT_ID").unwrap();
324 let client_secret = std::env::var("AZURE_CLIENT_SECRET").unwrap();
325 let tenant_id = std::env::var("AZURE_TENANT_ID").unwrap();
326 let integration = MicrosoftAzureBuilder::new()
327 .with_account(account)
328 .with_container_name(container)
329 .with_client_id(client_id)
330 .with_client_secret(client_secret)
331 .with_tenant_id(&tenant_id)
332 .build()
333 .unwrap();
334
335 let data = Bytes::from("hello world");
336 let path = Path::from("file.txt");
337 integration.put(&path, data.clone().into()).await.unwrap();
338
339 let signed = integration
340 .signed_url(Method::GET, &path, Duration::from_secs(60))
341 .await
342 .unwrap();
343
344 let resp = reqwest::get(signed).await.unwrap();
345 let loaded = resp.bytes().await.unwrap();
346
347 assert_eq!(data, loaded);
348 }
349
350 #[test]
351 fn azure_test_config_get_value() {
352 let azure_client_id = "object_store:fake_access_key_id".to_string();
353 let azure_storage_account_name = "object_store:fake_secret_key".to_string();
354 let azure_storage_token = "object_store:fake_default_region".to_string();
355 let builder = MicrosoftAzureBuilder::new()
356 .with_config(AzureConfigKey::ClientId, &azure_client_id)
357 .with_config(AzureConfigKey::AccountName, &azure_storage_account_name)
358 .with_config(AzureConfigKey::Token, &azure_storage_token);
359
360 assert_eq!(
361 builder.get_config_value(&AzureConfigKey::ClientId).unwrap(),
362 azure_client_id
363 );
364 assert_eq!(
365 builder
366 .get_config_value(&AzureConfigKey::AccountName)
367 .unwrap(),
368 azure_storage_account_name
369 );
370 assert_eq!(
371 builder.get_config_value(&AzureConfigKey::Token).unwrap(),
372 azure_storage_token
373 );
374 }
375}