1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
1718use crate::aws::dynamo::DynamoCommit;
19use crate::config::Parse;
2021use itertools::Itertools;
2223/// Configure how to provide [`ObjectStore::copy_if_not_exists`] for [`AmazonS3`].
24///
25/// [`ObjectStore::copy_if_not_exists`]: crate::ObjectStore::copy_if_not_exists
26/// [`AmazonS3`]: super::AmazonS3
27#[derive(Debug, Clone, PartialEq, Eq)]
28#[non_exhaustive]
29pub enum S3CopyIfNotExists {
30/// Some S3-compatible stores, such as Cloudflare R2, support copy if not exists
31 /// semantics through custom headers.
32 ///
33 /// If set, [`ObjectStore::copy_if_not_exists`] will perform a normal copy operation
34 /// with the provided header pair, and expect the store to fail with `412 Precondition Failed`
35 /// if the destination file already exists.
36 ///
37 /// Encoded as `header:<HEADER_NAME>:<HEADER_VALUE>` ignoring whitespace
38 ///
39 /// For example `header: cf-copy-destination-if-none-match: *`, would set
40 /// the header `cf-copy-destination-if-none-match` to `*`
41 ///
42 /// [`ObjectStore::copy_if_not_exists`]: crate::ObjectStore::copy_if_not_exists
43Header(String, String),
44/// The same as [`S3CopyIfNotExists::Header`] but allows custom status code checking, for object stores that return values
45 /// other than 412.
46 ///
47 /// Encoded as `header-with-status:<HEADER_NAME>:<HEADER_VALUE>:<STATUS>` ignoring whitespace
48HeaderWithStatus(String, String, reqwest::StatusCode),
49/// The name of a DynamoDB table to use for coordination
50 ///
51 /// Encoded as either `dynamo:<TABLE_NAME>` or `dynamo:<TABLE_NAME>:<TIMEOUT_MILLIS>`
52 /// ignoring whitespace. The default timeout is used if not specified
53 ///
54 /// See [`DynamoCommit`] for more information
55 ///
56 /// This will use the same region, credentials and endpoint as configured for S3
57Dynamo(DynamoCommit),
58}
5960impl std::fmt::Display for S3CopyIfNotExists {
61fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62match self {
63Self::Header(k, v) => write!(f, "header: {}: {}", k, v),
64Self::HeaderWithStatus(k, v, code) => {
65write!(f, "header-with-status: {k}: {v}: {}", code.as_u16())
66 }
67Self::Dynamo(lock) => write!(f, "dynamo: {}", lock.table_name()),
68 }
69 }
70}
7172impl S3CopyIfNotExists {
73fn from_str(s: &str) -> Option<Self> {
74let (variant, value) = s.split_once(':')?;
75match variant.trim() {
76"header" => {
77let (k, v) = value.split_once(':')?;
78Some(Self::Header(k.trim().to_string(), v.trim().to_string()))
79 }
80"header-with-status" => {
81let (k, v, status) = value.split(':').collect_tuple()?;
8283let code = status.trim().parse().ok()?;
8485Some(Self::HeaderWithStatus(
86 k.trim().to_string(),
87 v.trim().to_string(),
88 code,
89 ))
90 }
91"dynamo" => Some(Self::Dynamo(DynamoCommit::from_str(value)?)),
92_ => None,
93 }
94 }
95}
9697impl Parse for S3CopyIfNotExists {
98fn parse(v: &str) -> crate::Result<Self> {
99Self::from_str(v).ok_or_else(|| crate::Error::Generic {
100 store: "Config",
101 source: format!("Failed to parse \"{v}\" as S3CopyIfNotExists").into(),
102 })
103 }
104}
105106/// Configure how to provide conditional put support for [`AmazonS3`].
107///
108/// [`AmazonS3`]: super::AmazonS3
109#[derive(Debug, Clone, Eq, PartialEq)]
110#[allow(missing_copy_implementations)]
111#[non_exhaustive]
112pub enum S3ConditionalPut {
113/// Some S3-compatible stores, such as Cloudflare R2 and minio support conditional
114 /// put using the standard [HTTP precondition] headers If-Match and If-None-Match
115 ///
116 /// Encoded as `etag` ignoring whitespace
117 ///
118 /// [HTTP precondition]: https://datatracker.ietf.org/doc/html/rfc9110#name-preconditions
119ETagMatch,
120121/// The name of a DynamoDB table to use for coordination
122 ///
123 /// Encoded as either `dynamo:<TABLE_NAME>` or `dynamo:<TABLE_NAME>:<TIMEOUT_MILLIS>`
124 /// ignoring whitespace. The default timeout is used if not specified
125 ///
126 /// See [`DynamoCommit`] for more information
127 ///
128 /// This will use the same region, credentials and endpoint as configured for S3
129Dynamo(DynamoCommit),
130}
131132impl std::fmt::Display for S3ConditionalPut {
133fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
134match self {
135Self::ETagMatch => write!(f, "etag"),
136Self::Dynamo(lock) => write!(f, "dynamo: {}", lock.table_name()),
137 }
138 }
139}
140141impl S3ConditionalPut {
142fn from_str(s: &str) -> Option<Self> {
143match s.trim() {
144"etag" => Some(Self::ETagMatch),
145 trimmed => match trimmed.split_once(':')? {
146 ("dynamo", s) => Some(Self::Dynamo(DynamoCommit::from_str(s)?)),
147_ => None,
148 },
149 }
150 }
151}
152153impl Parse for S3ConditionalPut {
154fn parse(v: &str) -> crate::Result<Self> {
155Self::from_str(v).ok_or_else(|| crate::Error::Generic {
156 store: "Config",
157 source: format!("Failed to parse \"{v}\" as S3PutConditional").into(),
158 })
159 }
160}
161162#[cfg(test)]
163mod tests {
164use super::S3CopyIfNotExists;
165use crate::aws::{DynamoCommit, S3ConditionalPut};
166167#[test]
168fn parse_s3_copy_if_not_exists_header() {
169let input = "header: cf-copy-destination-if-none-match: *";
170let expected = Some(S3CopyIfNotExists::Header(
171"cf-copy-destination-if-none-match".to_owned(),
172"*".to_owned(),
173 ));
174175assert_eq!(expected, S3CopyIfNotExists::from_str(input));
176 }
177178#[test]
179fn parse_s3_copy_if_not_exists_header_with_status() {
180let input = "header-with-status:key:value:403";
181let expected = Some(S3CopyIfNotExists::HeaderWithStatus(
182"key".to_owned(),
183"value".to_owned(),
184 reqwest::StatusCode::FORBIDDEN,
185 ));
186187assert_eq!(expected, S3CopyIfNotExists::from_str(input));
188 }
189190#[test]
191fn parse_s3_copy_if_not_exists_dynamo() {
192let input = "dynamo: table:100";
193let expected = Some(S3CopyIfNotExists::Dynamo(
194 DynamoCommit::new("table".into()).with_timeout(100),
195 ));
196assert_eq!(expected, S3CopyIfNotExists::from_str(input));
197 }
198199#[test]
200fn parse_s3_condition_put_dynamo() {
201let input = "dynamo: table:1300";
202let expected = Some(S3ConditionalPut::Dynamo(
203 DynamoCommit::new("table".into()).with_timeout(1300),
204 ));
205assert_eq!(expected, S3ConditionalPut::from_str(input));
206 }
207208#[test]
209fn parse_s3_copy_if_not_exists_header_whitespace_invariant() {
210let expected = Some(S3CopyIfNotExists::Header(
211"cf-copy-destination-if-none-match".to_owned(),
212"*".to_owned(),
213 ));
214215const INPUTS: &[&str] = &[
216"header:cf-copy-destination-if-none-match:*",
217"header: cf-copy-destination-if-none-match:*",
218"header: cf-copy-destination-if-none-match: *",
219"header : cf-copy-destination-if-none-match: *",
220"header : cf-copy-destination-if-none-match : *",
221"header : cf-copy-destination-if-none-match : * ",
222 ];
223224for input in INPUTS {
225assert_eq!(expected, S3CopyIfNotExists::from_str(input));
226 }
227 }
228229#[test]
230fn parse_s3_copy_if_not_exists_header_with_status_whitespace_invariant() {
231let expected = Some(S3CopyIfNotExists::HeaderWithStatus(
232"key".to_owned(),
233"value".to_owned(),
234 reqwest::StatusCode::FORBIDDEN,
235 ));
236237const INPUTS: &[&str] = &[
238"header-with-status:key:value:403",
239"header-with-status: key:value:403",
240"header-with-status: key: value:403",
241"header-with-status: key: value: 403",
242"header-with-status : key: value: 403",
243"header-with-status : key : value: 403",
244"header-with-status : key : value : 403",
245"header-with-status : key : value : 403 ",
246 ];
247248for input in INPUTS {
249assert_eq!(expected, S3CopyIfNotExists::from_str(input));
250 }
251 }
252}