object_store/aws/
dynamo.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! A DynamoDB based lock system
19
20use std::borrow::Cow;
21use std::collections::HashMap;
22use std::future::Future;
23use std::time::{Duration, Instant};
24
25use chrono::Utc;
26use reqwest::{Response, StatusCode};
27use serde::ser::SerializeMap;
28use serde::{Deserialize, Serialize, Serializer};
29
30use crate::aws::client::S3Client;
31use crate::aws::credential::CredentialExt;
32use crate::aws::{AwsAuthorizer, AwsCredential};
33use crate::client::get::GetClientExt;
34use crate::client::retry::Error as RetryError;
35use crate::client::retry::RetryExt;
36use crate::path::Path;
37use crate::{Error, GetOptions, Result};
38
/// Suffix of the `__type` field in a DynamoDB error response that signals a failed
/// conditional write, i.e. a conflict with an existing lock record
const CONFLICT: &str = "ConditionalCheckFailedException";

/// Store name attached to every [`Error::Generic`] raised by this module
const STORE: &str = "DynamoDB";
43
/// A DynamoDB-based commit protocol, used to provide conditional write support for S3
///
/// ## Limitations
///
/// Only conditional operations, e.g. `copy_if_not_exists` will be synchronized, and can
/// therefore race with non-conditional operations, e.g. `put`, `copy`, `delete`, or
/// conditional operations performed by writers not configured to synchronize with DynamoDB.
///
/// Workloads making use of this mechanism **must** ensure:
///
/// * Conditional and non-conditional operations are not performed on the same paths
/// * Conditional operations are only performed via similarly configured clients
///
/// Additionally as the locking mechanism relies on timeouts to detect stale locks,
/// performance will be poor for systems that frequently delete and then create
/// objects at the same path, instead being optimised for systems that primarily create
/// files with paths never used before, or perform conditional updates to existing files
///
/// ## Commit Protocol
///
/// The DynamoDB schema is as follows:
///
/// * A string partition key named `"path"`
/// * A string sort key named `"etag"`
/// * A numeric [TTL] attribute named `"ttl"`
/// * A numeric attribute named `"generation"`
/// * A numeric attribute named `"timeout"`
///
/// An appropriate DynamoDB table can be created with the CLI as follows:
///
/// ```bash
/// $ aws dynamodb create-table --table-name <TABLE_NAME> --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S
/// $ aws dynamodb update-time-to-live --table-name <TABLE_NAME> --time-to-live-specification Enabled=true,AttributeName=ttl
/// ```
///
/// To perform a conditional operation on an object with a given `path` and `etag` (`*` if creating),
/// the commit protocol is as follows:
///
/// 1. Perform HEAD request on `path` and error on precondition mismatch
/// 2. Create record in DynamoDB with given `path` and `etag` with the configured timeout
///     1. On Success: Perform operation with the configured timeout
///     2. On Conflict:
///         1. Periodically re-perform HEAD request on `path` and error on precondition mismatch
///         2. If `timeout * max_clock_skew_rate` has passed since the conflict was observed,
///            where `timeout` is the value stored in the conflicting record, replace the
///            record incrementing the `"generation"`
///             1. On Success: GOTO 2.1
///             2. On Conflict: GOTO 2.2
///
/// Provided no writer modifies an object with a given `path` and `etag` without first adding a
/// corresponding record to DynamoDB, we are guaranteed that only one writer will ever commit.
///
/// This is inspired by the [DynamoDB Lock Client] but simplified for the more limited
/// requirements of synchronizing object storage. The major changes are:
///
/// * Uses a monotonic generation count instead of a UUID rvn, as this is:
///     * Cheaper to generate, serialize and compare
///     * Cannot collide
///     * More human readable / interpretable
/// * Relies on [TTL] to eventually clean up old locks
///
/// It also draws inspiration from the DeltaLake [S3 Multi-Cluster] commit protocol, but
/// generalised to not make assumptions about the workload and not rely on first writing
/// to a temporary path.
///
/// [TTL]: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/howitworks-ttl.html
/// [DynamoDB Lock Client]: https://aws.amazon.com/blogs/database/building-distributed-locks-with-the-dynamodb-lock-client/
/// [S3 Multi-Cluster]: https://docs.google.com/document/d/1Gs4ZsTH19lMxth4BSdwlWjUNR-XhKHicDvBjd2RqNd8/edit#heading=h.mjjuxw9mcz9h
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct DynamoCommit {
    /// The name of the DynamoDB table holding the lock records
    table_name: String,
    /// The number of milliseconds a lease is valid for
    timeout: u64,
    /// The maximum clock skew rate tolerated by the system
    ///
    /// A conflicting lease is only considered stale once `timeout * max_clock_skew_rate`
    /// has elapsed since it was observed
    max_clock_skew_rate: u32,
    /// The length of time a record will be retained in DynamoDB before being cleaned up
    ///
    /// This is purely an optimisation to avoid indefinite growth of the DynamoDB table
    /// and does not impact how long clients may wait to acquire a lock
    ttl: Duration,
    /// The backoff duration before retesting a condition
    test_interval: Duration,
}
125
126impl DynamoCommit {
127    /// Create a new [`DynamoCommit`] with a given table name
128    pub fn new(table_name: String) -> Self {
129        Self {
130            table_name,
131            timeout: 20_000,
132            max_clock_skew_rate: 3,
133            ttl: Duration::from_secs(60 * 60),
134            test_interval: Duration::from_millis(100),
135        }
136    }
137
138    /// Overrides the lock timeout.
139    ///
140    /// A longer lock timeout reduces the probability of spurious commit failures and multi-writer
141    /// races, but will increase the time that writers must wait to reclaim a lock lost. The
142    /// default value of 20 seconds should be appropriate for must use-cases.
143    pub fn with_timeout(mut self, millis: u64) -> Self {
144        self.timeout = millis;
145        self
146    }
147
148    /// The maximum clock skew rate tolerated by the system.
149    ///
150    /// An environment in which the clock on the fastest node ticks twice as fast as the slowest
151    /// node, would have a clock skew rate of 2. The default value of 3 should be appropriate
152    /// for most environments.
153    pub fn with_max_clock_skew_rate(mut self, rate: u32) -> Self {
154        self.max_clock_skew_rate = rate;
155        self
156    }
157
158    /// The length of time a record should be retained in DynamoDB before being cleaned up
159    ///
160    /// This should be significantly larger than the configured lock timeout, with the default
161    /// value of 1 hour appropriate for most use-cases.
162    pub fn with_ttl(mut self, ttl: Duration) -> Self {
163        self.ttl = ttl;
164        self
165    }
166
167    /// Parse [`DynamoCommit`] from a string
168    pub(crate) fn from_str(value: &str) -> Option<Self> {
169        Some(match value.split_once(':') {
170            Some((table_name, timeout)) => {
171                Self::new(table_name.trim().to_string()).with_timeout(timeout.parse().ok()?)
172            }
173            None => Self::new(value.trim().to_string()),
174        })
175    }
176
177    /// Returns the name of the DynamoDB table.
178    pub(crate) fn table_name(&self) -> &str {
179        &self.table_name
180    }
181
182    pub(crate) async fn copy_if_not_exists(
183        &self,
184        client: &S3Client,
185        from: &Path,
186        to: &Path,
187    ) -> Result<()> {
188        self.conditional_op(client, to, None, || async {
189            client.copy_request(from, to).send().await?;
190            Ok(())
191        })
192        .await
193    }
194
195    #[allow(clippy::future_not_send)] // Generics confound this lint
196    pub(crate) async fn conditional_op<F, Fut, T>(
197        &self,
198        client: &S3Client,
199        to: &Path,
200        etag: Option<&str>,
201        op: F,
202    ) -> Result<T>
203    where
204        F: FnOnce() -> Fut,
205        Fut: Future<Output = Result<T, Error>>,
206    {
207        check_precondition(client, to, etag).await?;
208
209        let mut previous_lease = None;
210
211        loop {
212            let existing = previous_lease.as_ref();
213            match self.try_lock(client, to.as_ref(), etag, existing).await? {
214                TryLockResult::Ok(lease) => {
215                    let expiry = lease.acquire + lease.timeout;
216                    return match tokio::time::timeout_at(expiry.into(), op()).await {
217                        Ok(Ok(v)) => Ok(v),
218                        Ok(Err(e)) => Err(e),
219                        Err(_) => Err(Error::Generic {
220                            store: "DynamoDB",
221                            source: format!(
222                                "Failed to perform conditional operation in {} milliseconds",
223                                self.timeout
224                            )
225                            .into(),
226                        }),
227                    };
228                }
229                TryLockResult::Conflict(conflict) => {
230                    let mut interval = tokio::time::interval(self.test_interval);
231                    let expiry = conflict.timeout * self.max_clock_skew_rate;
232                    loop {
233                        interval.tick().await;
234                        check_precondition(client, to, etag).await?;
235                        if conflict.acquire.elapsed() > expiry {
236                            previous_lease = Some(conflict);
237                            break;
238                        }
239                    }
240                }
241            }
242        }
243    }
244
245    /// Attempt to acquire a lock, reclaiming an existing lease if provided
246    async fn try_lock(
247        &self,
248        s3: &S3Client,
249        path: &str,
250        etag: Option<&str>,
251        existing: Option<&Lease>,
252    ) -> Result<TryLockResult> {
253        let attributes;
254        let (next_gen, condition_expression, expression_attribute_values) = match existing {
255            None => (0_u64, "attribute_not_exists(#pk)", Map(&[])),
256            Some(existing) => {
257                attributes = [(":g", AttributeValue::Number(existing.generation))];
258                (
259                    existing.generation.checked_add(1).unwrap(),
260                    "attribute_exists(#pk) AND generation = :g",
261                    Map(attributes.as_slice()),
262                )
263            }
264        };
265
266        let ttl = (Utc::now() + self.ttl).timestamp();
267        let items = [
268            ("path", AttributeValue::from(path)),
269            ("etag", AttributeValue::from(etag.unwrap_or("*"))),
270            ("generation", AttributeValue::Number(next_gen)),
271            ("timeout", AttributeValue::Number(self.timeout)),
272            ("ttl", AttributeValue::Number(ttl as _)),
273        ];
274        let names = [("#pk", "path")];
275
276        let req = PutItem {
277            table_name: &self.table_name,
278            condition_expression,
279            expression_attribute_values,
280            expression_attribute_names: Map(&names),
281            item: Map(&items),
282            return_values: None,
283            return_values_on_condition_check_failure: Some(ReturnValues::AllOld),
284        };
285
286        let credential = s3.config.get_credential().await?;
287
288        let acquire = Instant::now();
289        match self
290            .request(s3, credential.as_deref(), "DynamoDB_20120810.PutItem", req)
291            .await
292        {
293            Ok(_) => Ok(TryLockResult::Ok(Lease {
294                acquire,
295                generation: next_gen,
296                timeout: Duration::from_millis(self.timeout),
297            })),
298            Err(e) => match parse_error_response(&e) {
299                Some(e) if e.error.ends_with(CONFLICT) => match extract_lease(&e.item) {
300                    Some(lease) => Ok(TryLockResult::Conflict(lease)),
301                    None => Err(Error::Generic {
302                        store: STORE,
303                        source: "Failed to extract lease from conflict ReturnValuesOnConditionCheckFailure response".into()
304                    }),
305                },
306                _ => Err(Error::Generic {
307                    store: STORE,
308                    source: Box::new(e),
309                }),
310            },
311        }
312    }
313
314    async fn request<R: Serialize + Send + Sync>(
315        &self,
316        s3: &S3Client,
317        cred: Option<&AwsCredential>,
318        target: &str,
319        req: R,
320    ) -> Result<Response, RetryError> {
321        let region = &s3.config.region;
322        let authorizer = cred.map(|x| AwsAuthorizer::new(x, "dynamodb", region));
323
324        let builder = match &s3.config.endpoint {
325            Some(e) => s3.client.post(e),
326            None => {
327                let url = format!("https://dynamodb.{region}.amazonaws.com");
328                s3.client.post(url)
329            }
330        };
331
332        builder
333            .timeout(Duration::from_millis(self.timeout))
334            .json(&req)
335            .header("X-Amz-Target", target)
336            .with_aws_sigv4(authorizer, None)
337            .send_retry(&s3.config.retry_config)
338            .await
339    }
340}
341
342#[derive(Debug)]
343enum TryLockResult {
344    /// Successfully acquired a lease
345    Ok(Lease),
346    /// An existing lease was found
347    Conflict(Lease),
348}
349
350/// Validates that `path` has the given `etag` or doesn't exist if `None`
351async fn check_precondition(client: &S3Client, path: &Path, etag: Option<&str>) -> Result<()> {
352    let options = GetOptions {
353        head: true,
354        ..Default::default()
355    };
356
357    match etag {
358        Some(expected) => match client.get_opts(path, options).await {
359            Ok(r) => match r.meta.e_tag {
360                Some(actual) if expected == actual => Ok(()),
361                actual => Err(Error::Precondition {
362                    path: path.to_string(),
363                    source: format!("{} does not match {expected}", actual.unwrap_or_default())
364                        .into(),
365                }),
366            },
367            Err(Error::NotFound { .. }) => Err(Error::Precondition {
368                path: path.to_string(),
369                source: format!("Object at location {path} not found").into(),
370            }),
371            Err(e) => Err(e),
372        },
373        None => match client.get_opts(path, options).await {
374            Ok(_) => Err(Error::AlreadyExists {
375                path: path.to_string(),
376                source: "Already Exists".to_string().into(),
377            }),
378            Err(Error::NotFound { .. }) => Ok(()),
379            Err(e) => Err(e),
380        },
381    }
382}
383
384/// Parses the error response if any
385fn parse_error_response(e: &RetryError) -> Option<ErrorResponse<'_>> {
386    match e {
387        RetryError::Client {
388            status: StatusCode::BAD_REQUEST,
389            body: Some(b),
390        } => serde_json::from_str(b).ok(),
391        _ => None,
392    }
393}
394
395/// Extracts a lease from `item`, returning `None` on error
396fn extract_lease(item: &HashMap<&str, AttributeValue<'_>>) -> Option<Lease> {
397    let generation = match item.get("generation") {
398        Some(AttributeValue::Number(generation)) => generation,
399        _ => return None,
400    };
401
402    let timeout = match item.get("timeout") {
403        Some(AttributeValue::Number(timeout)) => *timeout,
404        _ => return None,
405    };
406
407    Some(Lease {
408        acquire: Instant::now(),
409        generation: *generation,
410        timeout: Duration::from_millis(timeout),
411    })
412}
413
/// A lock lease, either held by this client or observed on a conflicting record
#[derive(Debug, Clone)]
struct Lease {
    /// Local instant from which the lease's validity is measured
    acquire: Instant,
    /// The record's `"generation"` value
    generation: u64,
    /// How long the lease remains valid after `acquire`
    timeout: Duration,
}
421
422/// A DynamoDB [PutItem] payload
423///
424/// [PutItem]: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_PutItem.html
425#[derive(Serialize)]
426#[serde(rename_all = "PascalCase")]
427struct PutItem<'a> {
428    /// The table name
429    table_name: &'a str,
430
431    /// A condition that must be satisfied in order for a conditional PutItem operation to succeed.
432    condition_expression: &'a str,
433
434    /// One or more substitution tokens for attribute names in an expression
435    expression_attribute_names: Map<'a, &'a str, &'a str>,
436
437    /// One or more values that can be substituted in an expression
438    expression_attribute_values: Map<'a, &'a str, AttributeValue<'a>>,
439
440    /// A map of attribute name/value pairs, one for each attribute
441    item: Map<'a, &'a str, AttributeValue<'a>>,
442
443    /// Use ReturnValues if you want to get the item attributes as they appeared
444    /// before they were updated with the PutItem request.
445    #[serde(skip_serializing_if = "Option::is_none")]
446    return_values: Option<ReturnValues>,
447
448    /// An optional parameter that returns the item attributes for a PutItem operation
449    /// that failed a condition check.
450    #[serde(skip_serializing_if = "Option::is_none")]
451    return_values_on_condition_check_failure: Option<ReturnValues>,
452}
453
454#[derive(Deserialize)]
455struct ErrorResponse<'a> {
456    #[serde(rename = "__type")]
457    error: &'a str,
458
459    #[serde(borrow, default, rename = "Item")]
460    item: HashMap<&'a str, AttributeValue<'a>>,
461}
462
463#[derive(Serialize)]
464#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
465enum ReturnValues {
466    AllOld,
467}
468
469/// A collection of key value pairs
470///
471/// This provides cheap, ordered serialization of maps
472struct Map<'a, K, V>(&'a [(K, V)]);
473
474impl<'a, K: Serialize, V: Serialize> Serialize for Map<'a, K, V> {
475    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
476    where
477        S: Serializer,
478    {
479        if self.0.is_empty() {
480            return serializer.serialize_none();
481        }
482        let mut map = serializer.serialize_map(Some(self.0.len()))?;
483        for (k, v) in self.0 {
484            map.serialize_entry(k, v)?
485        }
486        map.end()
487    }
488}
489
490/// A DynamoDB [AttributeValue]
491///
492/// [AttributeValue]: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_AttributeValue.html
493#[derive(Debug, Serialize, Deserialize)]
494enum AttributeValue<'a> {
495    #[serde(rename = "S")]
496    String(Cow<'a, str>),
497    #[serde(rename = "N", with = "number")]
498    Number(u64),
499}
500
501impl<'a> From<&'a str> for AttributeValue<'a> {
502    fn from(value: &'a str) -> Self {
503        Self::String(Cow::Borrowed(value))
504    }
505}
506
507/// Numbers are serialized as strings
508mod number {
509    use serde::{Deserialize, Deserializer, Serializer};
510
511    pub fn serialize<S: Serializer>(v: &u64, s: S) -> Result<S::Ok, S::Error> {
512        s.serialize_str(&v.to_string())
513    }
514
515    pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<u64, D::Error> {
516        let v: &str = Deserialize::deserialize(d)?;
517        v.parse().map_err(serde::de::Error::custom)
518    }
519}
520
/// Re-export integration_test to be called by s3_test
#[cfg(test)]
pub(crate) use tests::integration_test;

#[cfg(test)]
mod tests {
    use super::*;
    use crate::aws::AmazonS3;
    use crate::ObjectStore;
    use rand::distributions::Alphanumeric;
    use rand::{thread_rng, Rng};

    /// Numbers must round-trip through DynamoDB's string-encoded `N` representation
    #[test]
    fn test_attribute_serde() {
        let encoded = serde_json::to_string(&AttributeValue::Number(23)).unwrap();
        assert_eq!(encoded, "{\"N\":\"23\"}");

        let decoded: AttributeValue<'_> = serde_json::from_str(&encoded).unwrap();
        assert!(matches!(decoded, AttributeValue::Number(23)));
    }

    /// An integration test for DynamoDB
    ///
    /// Exposed as a plain function driven by s3_test, rather than a `#[test]`, to avoid
    /// test concurrency issues
    pub async fn integration_test(integration: &AmazonS3, d: &DynamoCommit) {
        let client = integration.client.as_ref();

        let src = Path::from("dynamo_path_src");
        integration.put(&src, "asd".into()).await.unwrap();

        let dst = Path::from("dynamo_path");
        let _ = integration.delete(&dst).await; // Delete if present
        let key: &str = dst.as_ref();

        // Take the lock, or adopt the lease left behind by an earlier run
        let first = match d.try_lock(client, key, None, None).await.unwrap() {
            TryLockResult::Ok(lease) | TryLockResult::Conflict(lease) => lease,
        };

        // A second attempt to create the lock must conflict
        let retry = d.try_lock(client, key, None, None).await;
        assert!(matches!(retry, Ok(TryLockResult::Conflict(_))));

        // Yet the conditional copy can still reclaim the lock and complete
        d.copy_if_not_exists(client, &src, &dst).await.unwrap();

        // Reclaiming must have bumped the generation by exactly one
        match d.try_lock(client, key, None, None).await.unwrap() {
            TryLockResult::Conflict(reclaimed) => {
                assert_eq!(reclaimed.generation, first.generation + 1);
            }
            TryLockResult::Ok(_) => panic!("Should conflict"),
        }

        let etag: String = thread_rng()
            .sample_iter(Alphanumeric)
            .take(32)
            .map(char::from)
            .collect();
        let t = Some(etag.as_str());

        // A random etag has never been locked, so this must succeed
        let held = match d.try_lock(client, key, t, None).await.unwrap() {
            TryLockResult::Ok(lease) => lease,
            TryLockResult::Conflict(_) => panic!("should not conflict"),
        };

        // Locking again observes the lease just written
        match d.try_lock(client, key, t, None).await.unwrap() {
            TryLockResult::Conflict(c) => assert_eq!(held.generation, c.generation),
            TryLockResult::Ok(_) => panic!("should conflict"),
        }

        // Presenting the held lease replaces it with the next generation
        match d.try_lock(client, key, t, Some(&held)).await.unwrap() {
            TryLockResult::Ok(next) => assert_eq!(next.generation, held.generation + 1),
            TryLockResult::Conflict(_) => panic!("should not conflict"),
        }
    }
}