object_store/aws/
precondition.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::aws::dynamo::DynamoCommit;
19use crate::config::Parse;
20
21use itertools::Itertools;
22
23/// Configure how to provide [`ObjectStore::copy_if_not_exists`] for [`AmazonS3`].
24///
25/// [`ObjectStore::copy_if_not_exists`]: crate::ObjectStore::copy_if_not_exists
26/// [`AmazonS3`]: super::AmazonS3
27#[derive(Debug, Clone, PartialEq, Eq)]
28#[non_exhaustive]
29pub enum S3CopyIfNotExists {
30    /// Some S3-compatible stores, such as Cloudflare R2, support copy if not exists
31    /// semantics through custom headers.
32    ///
33    /// If set, [`ObjectStore::copy_if_not_exists`] will perform a normal copy operation
34    /// with the provided header pair, and expect the store to fail with `412 Precondition Failed`
35    /// if the destination file already exists.
36    ///
37    /// Encoded as `header:<HEADER_NAME>:<HEADER_VALUE>` ignoring whitespace
38    ///
39    /// For example `header: cf-copy-destination-if-none-match: *`, would set
40    /// the header `cf-copy-destination-if-none-match` to `*`
41    ///
42    /// [`ObjectStore::copy_if_not_exists`]: crate::ObjectStore::copy_if_not_exists
43    Header(String, String),
44    /// The same as [`S3CopyIfNotExists::Header`] but allows custom status code checking, for object stores that return values
45    /// other than 412.
46    ///
47    /// Encoded as `header-with-status:<HEADER_NAME>:<HEADER_VALUE>:<STATUS>` ignoring whitespace
48    HeaderWithStatus(String, String, reqwest::StatusCode),
49    /// The name of a DynamoDB table to use for coordination
50    ///
51    /// Encoded as either `dynamo:<TABLE_NAME>` or `dynamo:<TABLE_NAME>:<TIMEOUT_MILLIS>`
52    /// ignoring whitespace. The default timeout is used if not specified
53    ///
54    /// See [`DynamoCommit`] for more information
55    ///
56    /// This will use the same region, credentials and endpoint as configured for S3
57    Dynamo(DynamoCommit),
58}
59
60impl std::fmt::Display for S3CopyIfNotExists {
61    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62        match self {
63            Self::Header(k, v) => write!(f, "header: {}: {}", k, v),
64            Self::HeaderWithStatus(k, v, code) => {
65                write!(f, "header-with-status: {k}: {v}: {}", code.as_u16())
66            }
67            Self::Dynamo(lock) => write!(f, "dynamo: {}", lock.table_name()),
68        }
69    }
70}
71
72impl S3CopyIfNotExists {
73    fn from_str(s: &str) -> Option<Self> {
74        let (variant, value) = s.split_once(':')?;
75        match variant.trim() {
76            "header" => {
77                let (k, v) = value.split_once(':')?;
78                Some(Self::Header(k.trim().to_string(), v.trim().to_string()))
79            }
80            "header-with-status" => {
81                let (k, v, status) = value.split(':').collect_tuple()?;
82
83                let code = status.trim().parse().ok()?;
84
85                Some(Self::HeaderWithStatus(
86                    k.trim().to_string(),
87                    v.trim().to_string(),
88                    code,
89                ))
90            }
91            "dynamo" => Some(Self::Dynamo(DynamoCommit::from_str(value)?)),
92            _ => None,
93        }
94    }
95}
96
97impl Parse for S3CopyIfNotExists {
98    fn parse(v: &str) -> crate::Result<Self> {
99        Self::from_str(v).ok_or_else(|| crate::Error::Generic {
100            store: "Config",
101            source: format!("Failed to parse \"{v}\" as S3CopyIfNotExists").into(),
102        })
103    }
104}
105
106/// Configure how to provide conditional put support for [`AmazonS3`].
107///
108/// [`AmazonS3`]: super::AmazonS3
109#[derive(Debug, Clone, Eq, PartialEq)]
110#[allow(missing_copy_implementations)]
111#[non_exhaustive]
112pub enum S3ConditionalPut {
113    /// Some S3-compatible stores, such as Cloudflare R2 and minio support conditional
114    /// put using the standard [HTTP precondition] headers If-Match and If-None-Match
115    ///
116    /// Encoded as `etag` ignoring whitespace
117    ///
118    /// [HTTP precondition]: https://datatracker.ietf.org/doc/html/rfc9110#name-preconditions
119    ETagMatch,
120
121    /// The name of a DynamoDB table to use for coordination
122    ///
123    /// Encoded as either `dynamo:<TABLE_NAME>` or `dynamo:<TABLE_NAME>:<TIMEOUT_MILLIS>`
124    /// ignoring whitespace. The default timeout is used if not specified
125    ///
126    /// See [`DynamoCommit`] for more information
127    ///
128    /// This will use the same region, credentials and endpoint as configured for S3
129    Dynamo(DynamoCommit),
130}
131
132impl std::fmt::Display for S3ConditionalPut {
133    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
134        match self {
135            Self::ETagMatch => write!(f, "etag"),
136            Self::Dynamo(lock) => write!(f, "dynamo: {}", lock.table_name()),
137        }
138    }
139}
140
141impl S3ConditionalPut {
142    fn from_str(s: &str) -> Option<Self> {
143        match s.trim() {
144            "etag" => Some(Self::ETagMatch),
145            trimmed => match trimmed.split_once(':')? {
146                ("dynamo", s) => Some(Self::Dynamo(DynamoCommit::from_str(s)?)),
147                _ => None,
148            },
149        }
150    }
151}
152
153impl Parse for S3ConditionalPut {
154    fn parse(v: &str) -> crate::Result<Self> {
155        Self::from_str(v).ok_or_else(|| crate::Error::Generic {
156            store: "Config",
157            source: format!("Failed to parse \"{v}\" as S3PutConditional").into(),
158        })
159    }
160}
161
#[cfg(test)]
mod tests {
    use super::S3CopyIfNotExists;
    use crate::aws::{DynamoCommit, S3ConditionalPut};

    #[test]
    fn parse_s3_copy_if_not_exists_header() {
        let input = "header: cf-copy-destination-if-none-match: *";
        let expected = Some(S3CopyIfNotExists::Header(
            "cf-copy-destination-if-none-match".to_owned(),
            "*".to_owned(),
        ));

        assert_eq!(expected, S3CopyIfNotExists::from_str(input));
    }

    #[test]
    fn parse_s3_copy_if_not_exists_header_with_status() {
        let input = "header-with-status:key:value:403";
        let expected = Some(S3CopyIfNotExists::HeaderWithStatus(
            "key".to_owned(),
            "value".to_owned(),
            reqwest::StatusCode::FORBIDDEN,
        ));

        assert_eq!(expected, S3CopyIfNotExists::from_str(input));
    }

    #[test]
    fn parse_s3_copy_if_not_exists_dynamo() {
        let input = "dynamo: table:100";
        let expected = Some(S3CopyIfNotExists::Dynamo(
            DynamoCommit::new("table".into()).with_timeout(100),
        ));
        assert_eq!(expected, S3CopyIfNotExists::from_str(input));
    }

    #[test]
    fn parse_s3_copy_if_not_exists_rejects_malformed() {
        // Missing fields, an unparseable status and an unknown variant must all
        // be rejected rather than silently accepted.
        const INPUTS: &[&str] = &[
            "",
            "header",
            "header:only-name",
            "header-with-status:key:value",
            "header-with-status:key:value:abc",
            "unknown:a:b",
        ];

        for input in INPUTS {
            assert_eq!(
                None,
                S3CopyIfNotExists::from_str(input),
                "input: {:?}",
                input
            );
        }
    }

    #[test]
    fn parse_s3_condition_put_etag() {
        const INPUTS: &[&str] = &["etag", " etag", "etag ", " etag "];

        for input in INPUTS {
            assert_eq!(
                Some(S3ConditionalPut::ETagMatch),
                S3ConditionalPut::from_str(input),
                "input: {:?}",
                input
            );
        }
    }

    #[test]
    fn parse_s3_condition_put_dynamo() {
        let input = "dynamo: table:1300";
        let expected = Some(S3ConditionalPut::Dynamo(
            DynamoCommit::new("table".into()).with_timeout(1300),
        ));
        assert_eq!(expected, S3ConditionalPut::from_str(input));
    }

    #[test]
    fn parse_s3_condition_put_rejects_malformed() {
        const INPUTS: &[&str] = &["", "etags", "unknown: table"];

        for input in INPUTS {
            assert_eq!(
                None,
                S3ConditionalPut::from_str(input),
                "input: {:?}",
                input
            );
        }
    }

    #[test]
    fn parse_s3_copy_if_not_exists_header_whitespace_invariant() {
        let expected = Some(S3CopyIfNotExists::Header(
            "cf-copy-destination-if-none-match".to_owned(),
            "*".to_owned(),
        ));

        const INPUTS: &[&str] = &[
            "header:cf-copy-destination-if-none-match:*",
            "header: cf-copy-destination-if-none-match:*",
            "header: cf-copy-destination-if-none-match: *",
            "header : cf-copy-destination-if-none-match: *",
            "header : cf-copy-destination-if-none-match : *",
            "header : cf-copy-destination-if-none-match : * ",
        ];

        for input in INPUTS {
            // Include the input so a failure identifies the offending spelling.
            assert_eq!(
                expected,
                S3CopyIfNotExists::from_str(input),
                "input: {:?}",
                input
            );
        }
    }

    #[test]
    fn parse_s3_copy_if_not_exists_header_with_status_whitespace_invariant() {
        let expected = Some(S3CopyIfNotExists::HeaderWithStatus(
            "key".to_owned(),
            "value".to_owned(),
            reqwest::StatusCode::FORBIDDEN,
        ));

        const INPUTS: &[&str] = &[
            "header-with-status:key:value:403",
            "header-with-status: key:value:403",
            "header-with-status: key: value:403",
            "header-with-status: key: value: 403",
            "header-with-status : key: value: 403",
            "header-with-status : key : value: 403",
            "header-with-status : key : value : 403",
            "header-with-status : key : value : 403 ",
        ];

        for input in INPUTS {
            // Include the input so a failure identifies the offending spelling.
            assert_eq!(
                expected,
                S3CopyIfNotExists::from_str(input),
                "input: {:?}",
                input
            );
        }
    }
}