object_store/
local.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! An object store implementation for a local filesystem
19use std::fs::{metadata, symlink_metadata, File, Metadata, OpenOptions};
20use std::io::{ErrorKind, Read, Seek, SeekFrom, Write};
21use std::ops::Range;
22use std::sync::Arc;
23use std::time::SystemTime;
24use std::{collections::BTreeSet, convert::TryFrom, io};
25use std::{collections::VecDeque, path::PathBuf};
26
27use async_trait::async_trait;
28use bytes::Bytes;
29use chrono::{DateTime, Utc};
30use futures::{stream::BoxStream, StreamExt};
31use futures::{FutureExt, TryStreamExt};
32use parking_lot::Mutex;
33use snafu::{ensure, OptionExt, ResultExt, Snafu};
34use url::Url;
35use walkdir::{DirEntry, WalkDir};
36
37use crate::{
38    maybe_spawn_blocking,
39    path::{absolute_path_to_url, Path},
40    util::InvalidGetRange,
41    Attributes, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta,
42    ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, UploadPart,
43};
44
/// A specialized `Error` for filesystem object store-related errors
///
/// Values of this type are converted into the crate-level error by the
/// `From<Error> for super::Error` impl: `NotFound` and `AlreadyExists` map onto the
/// crate-level variants of the same name, and every other variant is wrapped as a
/// `Generic` error attributed to the `LocalFileSystem` store.
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
pub(crate) enum Error {
    #[snafu(display("File size for {} did not fit in a usize: {}", path, source))]
    FileSizeOverflowedUsize {
        source: std::num::TryFromIntError,
        path: String,
    },

    #[snafu(display("Unable to walk dir: {}", source))]
    UnableToWalkDir {
        source: walkdir::Error,
    },

    #[snafu(display("Unable to access metadata for {}: {}", path, source))]
    Metadata {
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
        path: String,
    },

    #[snafu(display("Unable to copy data to file: {}", source))]
    UnableToCopyDataToFile {
        source: io::Error,
    },

    #[snafu(display("Unable to rename file: {}", source))]
    UnableToRenameFile {
        source: io::Error,
    },

    #[snafu(display("Unable to create dir {}: {}", path.display(), source))]
    UnableToCreateDir {
        source: io::Error,
        path: PathBuf,
    },

    #[snafu(display("Unable to create file {}: {}", path.display(), source))]
    UnableToCreateFile {
        source: io::Error,
        path: PathBuf,
    },

    #[snafu(display("Unable to delete file {}: {}", path.display(), source))]
    UnableToDeleteFile {
        source: io::Error,
        path: PathBuf,
    },

    #[snafu(display("Unable to open file {}: {}", path.display(), source))]
    UnableToOpenFile {
        source: io::Error,
        path: PathBuf,
    },

    #[snafu(display("Unable to read data from file {}: {}", path.display(), source))]
    UnableToReadBytes {
        source: io::Error,
        path: PathBuf,
    },

    // `expected` / `actual` are byte counts
    #[snafu(display("Out of range of file {}, expected: {}, actual: {}", path.display(), expected, actual))]
    OutOfRange {
        path: PathBuf,
        expected: usize,
        actual: usize,
    },

    #[snafu(display("Requested range was invalid"))]
    InvalidRange {
        source: InvalidGetRange,
    },

    #[snafu(display("Unable to copy file from {} to {}: {}", from.display(), to.display(), source))]
    UnableToCopyFile {
        from: PathBuf,
        to: PathBuf,
        source: io::Error,
    },

    // No display attribute: converted into the crate-level `NotFound` variant by the
    // `From<Error> for super::Error` impl
    NotFound {
        path: PathBuf,
        source: io::Error,
    },

    #[snafu(display("Error seeking file {}: {}", path.display(), source))]
    Seek {
        source: io::Error,
        path: PathBuf,
    },

    #[snafu(display("Unable to convert URL \"{}\" to filesystem path", url))]
    InvalidUrl {
        url: Url,
    },

    // No display attribute: converted into the crate-level `AlreadyExists` variant by the
    // `From<Error> for super::Error` impl
    AlreadyExists {
        path: String,
        source: io::Error,
    },

    #[snafu(display("Unable to canonicalize filesystem root: {}", path.display()))]
    UnableToCanonicalize {
        path: PathBuf,
        source: io::Error,
    },

    // Raised for object paths whose filename ends in the `#<digits>` suffix reserved for
    // staging files
    #[snafu(display("Filenames containing trailing '/#\\d+/' are not supported: {}", path))]
    InvalidPath {
        path: String,
    },

    // Raised when a multipart upload is used after it was already completed or aborted
    #[snafu(display("Upload aborted"))]
    Aborted,
}
160
161impl From<Error> for super::Error {
162    fn from(source: Error) -> Self {
163        match source {
164            Error::NotFound { path, source } => Self::NotFound {
165                path: path.to_string_lossy().to_string(),
166                source: source.into(),
167            },
168            Error::AlreadyExists { path, source } => Self::AlreadyExists {
169                path,
170                source: source.into(),
171            },
172            _ => Self::Generic {
173                store: "LocalFileSystem",
174                source: Box::new(source),
175            },
176        }
177    }
178}
179
/// Local filesystem storage providing an [`ObjectStore`] interface to files on
/// local disk. Can optionally be created with a directory prefix
///
/// # Path Semantics
///
/// This implementation follows the [file URI] scheme outlined in [RFC 3986]. In
/// particular paths are delimited by `/`
///
/// [file URI]: https://en.wikipedia.org/wiki/File_URI_scheme
/// [RFC 3986]: https://www.rfc-editor.org/rfc/rfc3986
///
/// # Filesystem Restrictions
///
/// [`LocalFileSystem`] will expose the path semantics of the underlying filesystem, which may
/// have additional restrictions beyond those enforced by [`Path`].
///
/// For example:
///
/// * Windows forbids certain filenames, e.g. `COM0`,
/// * Windows forbids folders with trailing `.`
/// * Windows forbids certain ASCII characters, e.g. `<` or `|`
/// * OS X forbids filenames containing `:`
/// * Leading `-` are discouraged on Unix systems where they may be interpreted as CLI flags
/// * Filesystems may have restrictions on the maximum path or path segment length
/// * Filesystem support for non-ASCII characters is inconsistent
///
/// Additionally some filesystems, such as NTFS, are case-insensitive, whilst others like
/// FAT don't preserve case at all. Further some filesystems support non-unicode character
/// sequences, such as unpaired UTF-16 surrogates, and [`LocalFileSystem`] will error on
/// encountering such sequences.
///
/// Finally, filenames matching the regex `/.*#\d+/`, e.g. `foo.parquet#123`, are not supported
/// by [`LocalFileSystem`] as they are used to provide atomic writes. Such files will be ignored
/// for listing operations, and attempting to address such a file will error.
///
/// # Tokio Compatibility
///
/// Tokio discourages performing blocking IO on a tokio worker thread, however,
/// no major operating systems have stable async file APIs. Therefore if called from
/// a tokio context, this will use [`tokio::runtime::Handle::spawn_blocking`] to dispatch
/// IO to a blocking thread pool, much like `tokio::fs` does under-the-hood.
///
/// If not called from a tokio context, this will perform IO on the current thread with
/// no additional complexity or overheads
///
/// # Symlinks
///
/// [`LocalFileSystem`] will follow symlinks as normal, however, it is worth noting:
///
/// * Broken symlinks will be silently ignored by listing operations
/// * No effort is made to prevent breaking symlinks when deleting files
/// * Symlinks that resolve to paths outside the root **will** be followed
/// * Mutating a file through one or more symlinks will mutate the underlying file
/// * Deleting a path that resolves to a symlink will only delete the symlink
///
/// # Cross-Filesystem Copy
///
/// [`LocalFileSystem::copy`] is implemented using [`std::fs::hard_link`], and therefore
/// does not support copying across filesystem boundaries.
///
#[derive(Debug)]
pub struct LocalFileSystem {
    /// Shared configuration (the root URL); cloned cheaply into blocking tasks via `Arc`
    config: Arc<Config>,
    /// If `true`, deleting a file also removes any parent directories it leaves empty,
    /// stopping at the store root (see `LocalFileSystem::with_automatic_cleanup`)
    automatic_cleanup: bool,
}
246
/// Configuration shared (via `Arc`) between a [`LocalFileSystem`] and its blocking tasks
#[derive(Debug)]
struct Config {
    /// URL of the root directory; object [`Path`]s are appended to it as path segments to
    /// form filesystem paths (see `Config::prefix_to_filesystem`)
    root: Url,
}
251
252impl std::fmt::Display for LocalFileSystem {
253    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
254        write!(f, "LocalFileSystem({})", self.config.root)
255    }
256}
257
258impl Default for LocalFileSystem {
259    fn default() -> Self {
260        Self::new()
261    }
262}
263
264impl LocalFileSystem {
265    /// Create new filesystem storage with no prefix
266    pub fn new() -> Self {
267        Self {
268            config: Arc::new(Config {
269                root: Url::parse("file:///").unwrap(),
270            }),
271            automatic_cleanup: false,
272        }
273    }
274
275    /// Create new filesystem storage with `prefix` applied to all paths
276    ///
277    /// Returns an error if the path does not exist
278    ///
279    pub fn new_with_prefix(prefix: impl AsRef<std::path::Path>) -> Result<Self> {
280        let path = std::fs::canonicalize(&prefix).context(UnableToCanonicalizeSnafu {
281            path: prefix.as_ref(),
282        })?;
283
284        Ok(Self {
285            config: Arc::new(Config {
286                root: absolute_path_to_url(path)?,
287            }),
288            automatic_cleanup: false,
289        })
290    }
291
292    /// Return an absolute filesystem path of the given file location
293    pub fn path_to_filesystem(&self, location: &Path) -> Result<PathBuf> {
294        ensure!(
295            is_valid_file_path(location),
296            InvalidPathSnafu {
297                path: location.as_ref()
298            }
299        );
300        let path = self.config.prefix_to_filesystem(location)?;
301
302        #[cfg(target_os = "windows")]
303        let path = {
304            let path = path.to_string_lossy();
305
306            // Assume the first char is the drive letter and the next is a colon.
307            let mut out = String::new();
308            let drive = &path[..2]; // The drive letter and colon (e.g., "C:")
309            let filepath = &path[2..].replace(':', "%3A"); // Replace subsequent colons
310            out.push_str(drive);
311            out.push_str(filepath);
312            PathBuf::from(out)
313        };
314
315        Ok(path)
316    }
317
318    /// Enable automatic cleanup of empty directories when deleting files
319    pub fn with_automatic_cleanup(mut self, automatic_cleanup: bool) -> Self {
320        self.automatic_cleanup = automatic_cleanup;
321        self
322    }
323}
324
325impl Config {
326    /// Return an absolute filesystem path of the given location
327    fn prefix_to_filesystem(&self, location: &Path) -> Result<PathBuf> {
328        let mut url = self.root.clone();
329        url.path_segments_mut()
330            .expect("url path")
331            // technically not necessary as Path ignores empty segments
332            // but avoids creating paths with "//" which look odd in error messages.
333            .pop_if_empty()
334            .extend(location.parts());
335
336        url.to_file_path()
337            .map_err(|_| Error::InvalidUrl { url }.into())
338    }
339
340    /// Resolves the provided absolute filesystem path to a [`Path`] prefix
341    fn filesystem_to_path(&self, location: &std::path::Path) -> Result<Path> {
342        Ok(Path::from_absolute_path_with_base(
343            location,
344            Some(&self.root),
345        )?)
346    }
347}
348
349fn is_valid_file_path(path: &Path) -> bool {
350    match path.filename() {
351        Some(p) => match p.split_once('#') {
352            Some((_, suffix)) if !suffix.is_empty() => {
353                // Valid if contains non-digits
354                !suffix.as_bytes().iter().all(|x| x.is_ascii_digit())
355            }
356            _ => true,
357        },
358        None => false,
359    }
360}
361
362#[async_trait]
363impl ObjectStore for LocalFileSystem {
364    async fn put_opts(
365        &self,
366        location: &Path,
367        payload: PutPayload,
368        opts: PutOptions,
369    ) -> Result<PutResult> {
370        if matches!(opts.mode, PutMode::Update(_)) {
371            return Err(crate::Error::NotImplemented);
372        }
373
374        if !opts.attributes.is_empty() {
375            return Err(crate::Error::NotImplemented);
376        }
377
378        let path = self.path_to_filesystem(location)?;
379        maybe_spawn_blocking(move || {
380            let (mut file, staging_path) = new_staged_upload(&path)?;
381            let mut e_tag = None;
382
383            let err = match payload.iter().try_for_each(|x| file.write_all(x)) {
384                Ok(_) => {
385                    let metadata = file.metadata().map_err(|e| Error::Metadata {
386                        source: e.into(),
387                        path: path.to_string_lossy().to_string(),
388                    })?;
389                    e_tag = Some(get_etag(&metadata));
390                    match opts.mode {
391                        PutMode::Overwrite => {
392                            // For some fuse types of file systems, the file must be closed first
393                            // to trigger the upload operation, and then renamed, such as Blobfuse
394                            std::mem::drop(file);
395                            match std::fs::rename(&staging_path, &path) {
396                                Ok(_) => None,
397                                Err(source) => Some(Error::UnableToRenameFile { source }),
398                            }
399                        }
400                        PutMode::Create => match std::fs::hard_link(&staging_path, &path) {
401                            Ok(_) => {
402                                let _ = std::fs::remove_file(&staging_path); // Attempt to cleanup
403                                None
404                            }
405                            Err(source) => match source.kind() {
406                                ErrorKind::AlreadyExists => Some(Error::AlreadyExists {
407                                    path: path.to_str().unwrap().to_string(),
408                                    source,
409                                }),
410                                _ => Some(Error::UnableToRenameFile { source }),
411                            },
412                        },
413                        PutMode::Update(_) => unreachable!(),
414                    }
415                }
416                Err(source) => Some(Error::UnableToCopyDataToFile { source }),
417            };
418
419            if let Some(err) = err {
420                let _ = std::fs::remove_file(&staging_path); // Attempt to cleanup
421                return Err(err.into());
422            }
423
424            Ok(PutResult {
425                e_tag,
426                version: None,
427            })
428        })
429        .await
430    }
431
432    async fn put_multipart_opts(
433        &self,
434        location: &Path,
435        opts: PutMultipartOpts,
436    ) -> Result<Box<dyn MultipartUpload>> {
437        if !opts.attributes.is_empty() {
438            return Err(crate::Error::NotImplemented);
439        }
440
441        let dest = self.path_to_filesystem(location)?;
442        let (file, src) = new_staged_upload(&dest)?;
443        Ok(Box::new(LocalUpload::new(src, dest, file)))
444    }
445
446    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
447        let location = location.clone();
448        let path = self.path_to_filesystem(&location)?;
449        maybe_spawn_blocking(move || {
450            let (file, metadata) = open_file(&path)?;
451            let meta = convert_metadata(metadata, location)?;
452            options.check_preconditions(&meta)?;
453
454            let range = match options.range {
455                Some(r) => r.as_range(meta.size).context(InvalidRangeSnafu)?,
456                None => 0..meta.size,
457            };
458
459            Ok(GetResult {
460                payload: GetResultPayload::File(file, path),
461                attributes: Attributes::default(),
462                range,
463                meta,
464            })
465        })
466        .await
467    }
468
469    async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
470        let path = self.path_to_filesystem(location)?;
471        maybe_spawn_blocking(move || {
472            let (mut file, _) = open_file(&path)?;
473            read_range(&mut file, &path, range)
474        })
475        .await
476    }
477
478    async fn get_ranges(&self, location: &Path, ranges: &[Range<usize>]) -> Result<Vec<Bytes>> {
479        let path = self.path_to_filesystem(location)?;
480        let ranges = ranges.to_vec();
481        maybe_spawn_blocking(move || {
482            // Vectored IO might be faster
483            let (mut file, _) = open_file(&path)?;
484            ranges
485                .into_iter()
486                .map(|r| read_range(&mut file, &path, r))
487                .collect()
488        })
489        .await
490    }
491
492    async fn delete(&self, location: &Path) -> Result<()> {
493        let config = Arc::clone(&self.config);
494        let path = self.path_to_filesystem(location)?;
495        let automactic_cleanup = self.automatic_cleanup;
496        maybe_spawn_blocking(move || {
497            if let Err(e) = std::fs::remove_file(&path) {
498                Err(match e.kind() {
499                    ErrorKind::NotFound => Error::NotFound { path, source: e }.into(),
500                    _ => Error::UnableToDeleteFile { path, source: e }.into(),
501                })
502            } else if automactic_cleanup {
503                let root = &config.root;
504                let root = root
505                    .to_file_path()
506                    .map_err(|_| Error::InvalidUrl { url: root.clone() })?;
507
508                // here we will try to traverse up and delete an empty dir if possible until we reach the root or get an error
509                let mut parent = path.parent();
510
511                while let Some(loc) = parent {
512                    if loc != root && std::fs::remove_dir(loc).is_ok() {
513                        parent = loc.parent();
514                    } else {
515                        break;
516                    }
517                }
518
519                Ok(())
520            } else {
521                Ok(())
522            }
523        })
524        .await
525    }
526
527    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
528        let config = Arc::clone(&self.config);
529
530        let root_path = match prefix {
531            Some(prefix) => match config.prefix_to_filesystem(prefix) {
532                Ok(path) => path,
533                Err(e) => return futures::future::ready(Err(e)).into_stream().boxed(),
534            },
535            None => self.config.root.to_file_path().unwrap(),
536        };
537
538        let walkdir = WalkDir::new(root_path)
539            // Don't include the root directory itself
540            .min_depth(1)
541            .follow_links(true);
542
543        let s = walkdir.into_iter().flat_map(move |result_dir_entry| {
544            let entry = match convert_walkdir_result(result_dir_entry).transpose()? {
545                Ok(entry) => entry,
546                Err(e) => return Some(Err(e)),
547            };
548
549            if !entry.path().is_file() {
550                return None;
551            }
552
553            match config.filesystem_to_path(entry.path()) {
554                Ok(path) => match is_valid_file_path(&path) {
555                    true => convert_entry(entry, path).transpose(),
556                    false => None,
557                },
558                Err(e) => Some(Err(e)),
559            }
560        });
561
562        // If no tokio context, return iterator directly as no
563        // need to perform chunked spawn_blocking reads
564        if tokio::runtime::Handle::try_current().is_err() {
565            return futures::stream::iter(s).boxed();
566        }
567
568        // Otherwise list in batches of CHUNK_SIZE
569        const CHUNK_SIZE: usize = 1024;
570
571        let buffer = VecDeque::with_capacity(CHUNK_SIZE);
572        futures::stream::try_unfold((s, buffer), |(mut s, mut buffer)| async move {
573            if buffer.is_empty() {
574                (s, buffer) = tokio::task::spawn_blocking(move || {
575                    for _ in 0..CHUNK_SIZE {
576                        match s.next() {
577                            Some(r) => buffer.push_back(r),
578                            None => break,
579                        }
580                    }
581                    (s, buffer)
582                })
583                .await?;
584            }
585
586            match buffer.pop_front() {
587                Some(Err(e)) => Err(e),
588                Some(Ok(meta)) => Ok(Some((meta, (s, buffer)))),
589                None => Ok(None),
590            }
591        })
592        .boxed()
593    }
594
595    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
596        let config = Arc::clone(&self.config);
597
598        let prefix = prefix.cloned().unwrap_or_default();
599        let resolved_prefix = config.prefix_to_filesystem(&prefix)?;
600
601        maybe_spawn_blocking(move || {
602            let walkdir = WalkDir::new(&resolved_prefix)
603                .min_depth(1)
604                .max_depth(1)
605                .follow_links(true);
606
607            let mut common_prefixes = BTreeSet::new();
608            let mut objects = Vec::new();
609
610            for entry_res in walkdir.into_iter().map(convert_walkdir_result) {
611                if let Some(entry) = entry_res? {
612                    let is_directory = entry.file_type().is_dir();
613                    let entry_location = config.filesystem_to_path(entry.path())?;
614                    if !is_directory && !is_valid_file_path(&entry_location) {
615                        continue;
616                    }
617
618                    let mut parts = match entry_location.prefix_match(&prefix) {
619                        Some(parts) => parts,
620                        None => continue,
621                    };
622
623                    let common_prefix = match parts.next() {
624                        Some(p) => p,
625                        None => continue,
626                    };
627
628                    drop(parts);
629
630                    if is_directory {
631                        common_prefixes.insert(prefix.child(common_prefix));
632                    } else if let Some(metadata) = convert_entry(entry, entry_location)? {
633                        objects.push(metadata);
634                    }
635                }
636            }
637
638            Ok(ListResult {
639                common_prefixes: common_prefixes.into_iter().collect(),
640                objects,
641            })
642        })
643        .await
644    }
645
646    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
647        let from = self.path_to_filesystem(from)?;
648        let to = self.path_to_filesystem(to)?;
649        let mut id = 0;
650        // In order to make this atomic we:
651        //
652        // - hard link to a hidden temporary file
653        // - atomically rename this temporary file into place
654        //
655        // This is necessary because hard_link returns an error if the destination already exists
656        maybe_spawn_blocking(move || loop {
657            let staged = staged_upload_path(&to, &id.to_string());
658            match std::fs::hard_link(&from, &staged) {
659                Ok(_) => {
660                    return std::fs::rename(&staged, &to).map_err(|source| {
661                        let _ = std::fs::remove_file(&staged); // Attempt to clean up
662                        Error::UnableToCopyFile { from, to, source }.into()
663                    });
664                }
665                Err(source) => match source.kind() {
666                    ErrorKind::AlreadyExists => id += 1,
667                    ErrorKind::NotFound => match from.exists() {
668                        true => create_parent_dirs(&to, source)?,
669                        false => return Err(Error::NotFound { path: from, source }.into()),
670                    },
671                    _ => return Err(Error::UnableToCopyFile { from, to, source }.into()),
672                },
673            }
674        })
675        .await
676    }
677
678    async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
679        let from = self.path_to_filesystem(from)?;
680        let to = self.path_to_filesystem(to)?;
681        maybe_spawn_blocking(move || loop {
682            match std::fs::rename(&from, &to) {
683                Ok(_) => return Ok(()),
684                Err(source) => match source.kind() {
685                    ErrorKind::NotFound => match from.exists() {
686                        true => create_parent_dirs(&to, source)?,
687                        false => return Err(Error::NotFound { path: from, source }.into()),
688                    },
689                    _ => return Err(Error::UnableToCopyFile { from, to, source }.into()),
690                },
691            }
692        })
693        .await
694    }
695
696    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
697        let from = self.path_to_filesystem(from)?;
698        let to = self.path_to_filesystem(to)?;
699
700        maybe_spawn_blocking(move || loop {
701            match std::fs::hard_link(&from, &to) {
702                Ok(_) => return Ok(()),
703                Err(source) => match source.kind() {
704                    ErrorKind::AlreadyExists => {
705                        return Err(Error::AlreadyExists {
706                            path: to.to_str().unwrap().to_string(),
707                            source,
708                        }
709                        .into())
710                    }
711                    ErrorKind::NotFound => match from.exists() {
712                        true => create_parent_dirs(&to, source)?,
713                        false => return Err(Error::NotFound { path: from, source }.into()),
714                    },
715                    _ => return Err(Error::UnableToCopyFile { from, to, source }.into()),
716                },
717            }
718        })
719        .await
720    }
721}
722
723/// Creates the parent directories of `path` or returns an error based on `source` if no parent
724fn create_parent_dirs(path: &std::path::Path, source: io::Error) -> Result<()> {
725    let parent = path.parent().ok_or_else(|| Error::UnableToCreateFile {
726        path: path.to_path_buf(),
727        source,
728    })?;
729
730    std::fs::create_dir_all(parent).context(UnableToCreateDirSnafu { path: parent })?;
731    Ok(())
732}
733
734/// Generates a unique file path `{base}#{suffix}`, returning the opened `File` and `path`
735///
736/// Creates any directories if necessary
737fn new_staged_upload(base: &std::path::Path) -> Result<(File, PathBuf)> {
738    let mut multipart_id = 1;
739    loop {
740        let suffix = multipart_id.to_string();
741        let path = staged_upload_path(base, &suffix);
742        let mut options = OpenOptions::new();
743        match options.read(true).write(true).create_new(true).open(&path) {
744            Ok(f) => return Ok((f, path)),
745            Err(source) => match source.kind() {
746                ErrorKind::AlreadyExists => multipart_id += 1,
747                ErrorKind::NotFound => create_parent_dirs(&path, source)?,
748                _ => return Err(Error::UnableToOpenFile { source, path }.into()),
749            },
750        }
751    }
752}
753
/// Returns the staging path for `dest`: `dest` with `#` and then `suffix` appended
///
/// For example `/data/file.parquet` with suffix `2` becomes `/data/file.parquet#2`.
fn staged_upload_path(dest: &std::path::Path, suffix: &str) -> PathBuf {
    let mut staged = dest.as_os_str().to_os_string();
    for piece in ["#", suffix] {
        staged.push(piece);
    }
    PathBuf::from(staged)
}
761
/// An in-progress multipart upload to a [`LocalFileSystem`]
///
/// Parts are written into a staging file at `src`, which `complete` renames to the
/// destination. If the upload is dropped while `src` is still set, the `Drop` impl
/// attempts to remove the staging file.
#[derive(Debug)]
struct LocalUpload {
    /// The upload state, shared with the blocking tasks spawned by `put_part`
    state: Arc<UploadState>,
    /// The location of the temporary file; `None` once `complete` or `abort` has been called
    src: Option<PathBuf>,
    /// The next offset to write into the file
    offset: u64,
}
771
/// State shared between a `LocalUpload` and the blocking tasks that write its parts
#[derive(Debug)]
struct UploadState {
    /// The final location the staging file is renamed to on completion
    dest: PathBuf,
    /// The staging file. The lock keeps each part's seek + write pair together, and lets
    /// `complete` wait for in-flight writes before renaming.
    file: Mutex<File>,
}
777
778impl LocalUpload {
779    pub fn new(src: PathBuf, dest: PathBuf, file: File) -> Self {
780        Self {
781            state: Arc::new(UploadState {
782                dest,
783                file: Mutex::new(file),
784            }),
785            src: Some(src),
786            offset: 0,
787        }
788    }
789}
790
791#[async_trait]
792impl MultipartUpload for LocalUpload {
793    fn put_part(&mut self, data: PutPayload) -> UploadPart {
794        let offset = self.offset;
795        self.offset += data.content_length() as u64;
796
797        let s = Arc::clone(&self.state);
798        maybe_spawn_blocking(move || {
799            let mut file = s.file.lock();
800            file.seek(SeekFrom::Start(offset))
801                .context(SeekSnafu { path: &s.dest })?;
802
803            data.iter()
804                .try_for_each(|x| file.write_all(x))
805                .context(UnableToCopyDataToFileSnafu)?;
806
807            Ok(())
808        })
809        .boxed()
810    }
811
812    async fn complete(&mut self) -> Result<PutResult> {
813        let src = self.src.take().context(AbortedSnafu)?;
814        let s = Arc::clone(&self.state);
815        maybe_spawn_blocking(move || {
816            // Ensure no inflight writes
817            let file = s.file.lock();
818            std::fs::rename(&src, &s.dest).context(UnableToRenameFileSnafu)?;
819            let metadata = file.metadata().map_err(|e| Error::Metadata {
820                source: e.into(),
821                path: src.to_string_lossy().to_string(),
822            })?;
823
824            Ok(PutResult {
825                e_tag: Some(get_etag(&metadata)),
826                version: None,
827            })
828        })
829        .await
830    }
831
832    async fn abort(&mut self) -> Result<()> {
833        let src = self.src.take().context(AbortedSnafu)?;
834        maybe_spawn_blocking(move || {
835            std::fs::remove_file(&src).context(UnableToDeleteFileSnafu { path: &src })?;
836            Ok(())
837        })
838        .await
839    }
840}
841
842impl Drop for LocalUpload {
843    fn drop(&mut self) {
844        if let Some(src) = self.src.take() {
845            // Try to clean up intermediate file ignoring any error
846            match tokio::runtime::Handle::try_current() {
847                Ok(r) => drop(r.spawn_blocking(move || std::fs::remove_file(src))),
848                Err(_) => drop(std::fs::remove_file(src)),
849            };
850        }
851    }
852}
853
854pub(crate) fn chunked_stream(
855    mut file: File,
856    path: PathBuf,
857    range: Range<usize>,
858    chunk_size: usize,
859) -> BoxStream<'static, Result<Bytes, super::Error>> {
860    futures::stream::once(async move {
861        let (file, path) = maybe_spawn_blocking(move || {
862            file.seek(SeekFrom::Start(range.start as _))
863                .map_err(|source| Error::Seek {
864                    source,
865                    path: path.clone(),
866                })?;
867            Ok((file, path))
868        })
869        .await?;
870
871        let stream = futures::stream::try_unfold(
872            (file, path, range.end - range.start),
873            move |(mut file, path, remaining)| {
874                maybe_spawn_blocking(move || {
875                    if remaining == 0 {
876                        return Ok(None);
877                    }
878
879                    let to_read = remaining.min(chunk_size);
880                    let mut buffer = Vec::with_capacity(to_read);
881                    let read = (&mut file)
882                        .take(to_read as u64)
883                        .read_to_end(&mut buffer)
884                        .map_err(|e| Error::UnableToReadBytes {
885                            source: e,
886                            path: path.clone(),
887                        })?;
888
889                    Ok(Some((buffer.into(), (file, path, remaining - read))))
890                })
891            },
892        );
893        Ok::<_, super::Error>(stream)
894    })
895    .try_flatten()
896    .boxed()
897}
898
899pub(crate) fn read_range(file: &mut File, path: &PathBuf, range: Range<usize>) -> Result<Bytes> {
900    let to_read = range.end - range.start;
901    file.seek(SeekFrom::Start(range.start as u64))
902        .context(SeekSnafu { path })?;
903
904    let mut buf = Vec::with_capacity(to_read);
905    let read = file
906        .take(to_read as u64)
907        .read_to_end(&mut buf)
908        .context(UnableToReadBytesSnafu { path })?;
909
910    ensure!(
911        read == to_read,
912        OutOfRangeSnafu {
913            path,
914            expected: to_read,
915            actual: read
916        }
917    );
918    Ok(buf.into())
919}
920
921fn open_file(path: &PathBuf) -> Result<(File, Metadata)> {
922    let ret = match File::open(path).and_then(|f| Ok((f.metadata()?, f))) {
923        Err(e) => Err(match e.kind() {
924            ErrorKind::NotFound => Error::NotFound {
925                path: path.clone(),
926                source: e,
927            },
928            _ => Error::UnableToOpenFile {
929                path: path.clone(),
930                source: e,
931            },
932        }),
933        Ok((metadata, file)) => match !metadata.is_dir() {
934            true => Ok((file, metadata)),
935            false => Err(Error::NotFound {
936                path: path.clone(),
937                source: io::Error::new(ErrorKind::NotFound, "is directory"),
938            }),
939        },
940    }?;
941    Ok(ret)
942}
943
944fn convert_entry(entry: DirEntry, location: Path) -> Result<Option<ObjectMeta>> {
945    match entry.metadata() {
946        Ok(metadata) => convert_metadata(metadata, location).map(Some),
947        Err(e) => {
948            if let Some(io_err) = e.io_error() {
949                if io_err.kind() == ErrorKind::NotFound {
950                    return Ok(None);
951                }
952            }
953            Err(Error::Metadata {
954                source: e.into(),
955                path: location.to_string(),
956            })?
957        }
958    }
959}
960
961fn last_modified(metadata: &Metadata) -> DateTime<Utc> {
962    metadata
963        .modified()
964        .expect("Modified file time should be supported on this platform")
965        .into()
966}
967
968fn get_etag(metadata: &Metadata) -> String {
969    let inode = get_inode(metadata);
970    let size = metadata.len();
971    let mtime = metadata
972        .modified()
973        .ok()
974        .and_then(|mtime| mtime.duration_since(SystemTime::UNIX_EPOCH).ok())
975        .unwrap_or_default()
976        .as_micros();
977
978    // Use an ETag scheme based on that used by many popular HTTP servers
979    // <https://httpd.apache.org/docs/2.2/mod/core.html#fileetag>
980    // <https://stackoverflow.com/questions/47512043/how-etags-are-generated-and-configured>
981    format!("{inode:x}-{mtime:x}-{size:x}")
982}
983
984fn convert_metadata(metadata: Metadata, location: Path) -> Result<ObjectMeta> {
985    let last_modified = last_modified(&metadata);
986    let size = usize::try_from(metadata.len()).context(FileSizeOverflowedUsizeSnafu {
987        path: location.as_ref(),
988    })?;
989
990    Ok(ObjectMeta {
991        location,
992        last_modified,
993        size,
994        e_tag: Some(get_etag(&metadata)),
995        version: None,
996    })
997}
998
#[cfg(unix)]
/// Returns the file's inode number.
///
/// Including the inode makes the ETag more resistant to collisions, as done by popular
/// web servers such as [Apache](https://httpd.apache.org/docs/2.2/mod/core.html#fileetag)
fn get_inode(metadata: &Metadata) -> u64 {
    use std::os::unix::fs::MetadataExt;
    metadata.ino()
}
1005
#[cfg(not(unix))]
/// On platforms where an inode isn't available, always returns 0 so the ETag
/// falls back to relying on size and mtime alone.
///
/// The parameter is underscore-prefixed because it is intentionally unused here;
/// this avoids an `unused_variables` warning on non-unix targets.
fn get_inode(_metadata: &Metadata) -> u64 {
    0
}
1011
1012/// Convert walkdir results and converts not-found errors into `None`.
1013/// Convert broken symlinks to `None`.
1014fn convert_walkdir_result(
1015    res: std::result::Result<DirEntry, walkdir::Error>,
1016) -> Result<Option<DirEntry>> {
1017    match res {
1018        Ok(entry) => {
1019            // To check for broken symlink: call symlink_metadata() - it does not traverse symlinks);
1020            // if ok: check if entry is symlink; and try to read it by calling metadata().
1021            match symlink_metadata(entry.path()) {
1022                Ok(attr) => {
1023                    if attr.is_symlink() {
1024                        let target_metadata = metadata(entry.path());
1025                        match target_metadata {
1026                            Ok(_) => {
1027                                // symlink is valid
1028                                Ok(Some(entry))
1029                            }
1030                            Err(_) => {
1031                                // this is a broken symlink, return None
1032                                Ok(None)
1033                            }
1034                        }
1035                    } else {
1036                        Ok(Some(entry))
1037                    }
1038                }
1039                Err(_) => Ok(None),
1040            }
1041        }
1042
1043        Err(walkdir_err) => match walkdir_err.io_error() {
1044            Some(io_err) => match io_err.kind() {
1045                ErrorKind::NotFound => Ok(None),
1046                _ => Err(Error::UnableToWalkDir {
1047                    source: walkdir_err,
1048                }
1049                .into()),
1050            },
1051            None => Err(Error::UnableToWalkDir {
1052                source: walkdir_err,
1053            }
1054            .into()),
1055        },
1056    }
1057}
1058
// Tests for `LocalFileSystem`: the shared `crate::integration` suite plus
// filesystem-specific behavior (symlinks, unusual file names, intermediate upload
// files and automatic directory cleanup). Each test builds up on-disk state step by
// step, so statement order inside a test matters.
#[cfg(test)]
mod tests {
    use std::fs;

    use futures::TryStreamExt;
    use tempfile::{NamedTempFile, TempDir};

    use crate::integration::*;

    use super::*;

    // Runs the shared integration suite against a store rooted in a temporary directory.
    #[tokio::test]
    #[cfg(target_family = "unix")]
    async fn file_test() {
        let root = TempDir::new().unwrap();
        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();

        put_get_delete_list(&integration).await;
        get_opts(&integration).await;
        list_uses_directories_correctly(&integration).await;
        list_with_delimiter(&integration).await;
        rename_and_copy(&integration).await;
        copy_if_not_exists(&integration).await;
        copy_rename_nonexistent_object(&integration).await;
        stream_get(&integration).await;
        put_opts(&integration, false).await;
    }

    // Drives the store from a non-tokio executor (`futures::executor::block_on`).
    #[test]
    #[cfg(target_family = "unix")]
    fn test_non_tokio() {
        let root = TempDir::new().unwrap();
        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
        futures::executor::block_on(async move {
            put_get_delete_list(&integration).await;
            list_uses_directories_correctly(&integration).await;
            list_with_delimiter(&integration).await;

            // Can't use stream_get test as WriteMultipart uses a tokio JoinSet
            let p = Path::from("manual_upload");
            let mut upload = integration.put_multipart(&p).await.unwrap();
            upload.put_part("123".into()).await.unwrap();
            upload.put_part("45678".into()).await.unwrap();
            let r = upload.complete().await.unwrap();

            // The e-tag returned by `complete` must match the one reported by `get`
            let get = integration.get(&p).await.unwrap();
            assert_eq!(get.meta.e_tag.as_ref().unwrap(), r.e_tag.as_ref().unwrap());
            let actual = get.bytes().await.unwrap();
            assert_eq!(actual.as_ref(), b"12345678");
        });
    }

    // Writing to a nested location succeeds even though its parent directories do not
    // exist yet in the fresh temporary directory.
    #[tokio::test]
    async fn creates_dir_if_not_present() {
        let root = TempDir::new().unwrap();
        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();

        let location = Path::from("nested/file/test_file");

        let data = Bytes::from("arbitrary data");

        integration
            .put(&location, data.clone().into())
            .await
            .unwrap();

        let read_data = integration
            .get(&location)
            .await
            .unwrap()
            .bytes()
            .await
            .unwrap();
        assert_eq!(&*read_data, data);
    }

    // Round-trips a put/get of a top-level object.
    #[tokio::test]
    async fn unknown_length() {
        let root = TempDir::new().unwrap();
        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();

        let location = Path::from("some_file");

        let data = Bytes::from("arbitrary data");

        integration
            .put(&location, data.clone().into())
            .await
            .unwrap();

        let read_data = integration
            .get(&location)
            .await
            .unwrap()
            .bytes()
            .await
            .unwrap();
        assert_eq!(&*read_data, data);
    }

    // Listing an unreadable root must report errors rather than silently return nothing.
    #[tokio::test]
    #[cfg(target_family = "unix")]
    // Fails on github actions runner (which runs the tests as root)
    #[ignore]
    async fn bubble_up_io_errors() {
        use std::{fs::set_permissions, os::unix::prelude::PermissionsExt};

        let root = TempDir::new().unwrap();

        // make non-readable
        let metadata = root.path().metadata().unwrap();
        let mut permissions = metadata.permissions();
        permissions.set_mode(0o000);
        set_permissions(root.path(), permissions).unwrap();

        let store = LocalFileSystem::new_with_prefix(root.path()).unwrap();

        let mut stream = store.list(None);
        let mut any_err = false;
        while let Some(res) = stream.next().await {
            if res.is_err() {
                any_err = true;
            }
        }
        assert!(any_err);

        // `list_with_delimiter` must also surface the error
        assert!(store.list_with_delimiter(None).await.is_err());
    }

    const NON_EXISTENT_NAME: &str = "nonexistentname";

    // A missing object surfaces as `crate::Error::NotFound` wrapping an `io::Error`
    // and naming the requested path.
    #[tokio::test]
    async fn get_nonexistent_location() {
        let root = TempDir::new().unwrap();
        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();

        let location = Path::from(NON_EXISTENT_NAME);

        let err = get_nonexistent_object(&integration, Some(location))
            .await
            .unwrap_err();
        if let crate::Error::NotFound { path, source } = err {
            let source_variant = source.downcast_ref::<std::io::Error>();
            assert!(
                matches!(source_variant, Some(std::io::Error { .. }),),
                "got: {source_variant:?}"
            );
            assert!(path.ends_with(NON_EXISTENT_NAME), "{}", path);
        } else {
            panic!("unexpected error type: {err:?}");
        }
    }

    // A store created without a prefix maps an absolute object path back to the same
    // filesystem path, and can `head` it.
    #[tokio::test]
    async fn root() {
        let integration = LocalFileSystem::new();

        let canonical = std::path::Path::new("Cargo.toml").canonicalize().unwrap();
        let url = Url::from_directory_path(&canonical).unwrap();
        let path = Path::parse(url.path()).unwrap();

        let roundtrip = integration.path_to_filesystem(&path).unwrap();

        // Needed as on Windows canonicalize returns extended length path syntax
        // C:\Users\circleci -> \\?\C:\Users\circleci
        let roundtrip = roundtrip.canonicalize().unwrap();

        assert_eq!(roundtrip, canonical);

        integration.head(&path).await.unwrap();
    }

    // On Windows, listing the root fails because `file:///` cannot be converted to a
    // filesystem path.
    #[tokio::test]
    #[cfg(target_family = "windows")]
    async fn test_list_root() {
        let fs = LocalFileSystem::new();
        let r = fs.list_with_delimiter(None).await.unwrap_err().to_string();

        assert!(
            r.contains("Unable to convert URL \"file:///\" to filesystem path"),
            "{}",
            r
        );
    }

    // On Linux, listing the filesystem root succeeds.
    #[tokio::test]
    #[cfg(target_os = "linux")]
    async fn test_list_root() {
        let fs = LocalFileSystem::new();
        fs.list_with_delimiter(None).await.unwrap();
    }

    // Recursively lists `prefix` and asserts the sorted locations equal `expected`
    // (which therefore must itself be given in sorted order).
    async fn check_list(integration: &LocalFileSystem, prefix: Option<&Path>, expected: &[&str]) {
        let result: Vec<_> = integration.list(prefix).try_collect().await.unwrap();

        let mut strings: Vec<_> = result.iter().map(|x| x.location.as_ref()).collect();
        strings.sort_unstable();
        assert_eq!(&strings, expected)
    }

    // Symlinks are followed (both in and out of tree), broken symlinks are skipped,
    // and delete/put operate through links.
    #[tokio::test]
    #[cfg(target_family = "unix")]
    async fn test_symlink() {
        let root = TempDir::new().unwrap();
        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();

        let subdir = root.path().join("a");
        std::fs::create_dir(&subdir).unwrap();
        let file = subdir.join("file.parquet");
        std::fs::write(file, "test").unwrap();

        check_list(&integration, None, &["a/file.parquet"]).await;
        integration
            .head(&Path::from("a/file.parquet"))
            .await
            .unwrap();

        // Follow out of tree symlink
        let other = NamedTempFile::new().unwrap();
        std::os::unix::fs::symlink(other.path(), root.path().join("test.parquet")).unwrap();

        // Should return test.parquet even though out of tree
        check_list(&integration, None, &["a/file.parquet", "test.parquet"]).await;

        // Can fetch test.parquet
        integration.head(&Path::from("test.parquet")).await.unwrap();

        // Follow in tree symlink
        std::os::unix::fs::symlink(&subdir, root.path().join("b")).unwrap();
        check_list(
            &integration,
            None,
            &["a/file.parquet", "b/file.parquet", "test.parquet"],
        )
        .await;
        check_list(&integration, Some(&Path::from("b")), &["b/file.parquet"]).await;

        // Can fetch through symlink
        integration
            .head(&Path::from("b/file.parquet"))
            .await
            .unwrap();

        // Ignore broken symlink
        std::os::unix::fs::symlink(root.path().join("foo.parquet"), root.path().join("c")).unwrap();

        check_list(
            &integration,
            None,
            &["a/file.parquet", "b/file.parquet", "test.parquet"],
        )
        .await;

        let mut r = integration.list_with_delimiter(None).await.unwrap();
        r.common_prefixes.sort_unstable();
        assert_eq!(r.common_prefixes.len(), 2);
        assert_eq!(r.common_prefixes[0].as_ref(), "a");
        assert_eq!(r.common_prefixes[1].as_ref(), "b");
        assert_eq!(r.objects.len(), 1);
        assert_eq!(r.objects[0].location.as_ref(), "test.parquet");

        let r = integration
            .list_with_delimiter(Some(&Path::from("a")))
            .await
            .unwrap();
        assert_eq!(r.common_prefixes.len(), 0);
        assert_eq!(r.objects.len(), 1);
        assert_eq!(r.objects[0].location.as_ref(), "a/file.parquet");

        // Deleting a symlink doesn't delete the source file
        integration
            .delete(&Path::from("test.parquet"))
            .await
            .unwrap();
        assert!(other.path().exists());

        check_list(&integration, None, &["a/file.parquet", "b/file.parquet"]).await;

        // Deleting through a symlink deletes both files
        integration
            .delete(&Path::from("b/file.parquet"))
            .await
            .unwrap();

        check_list(&integration, None, &[]).await;

        // Adding a file through a symlink creates in both paths
        integration
            .put(&Path::from("b/file.parquet"), vec![0, 1, 2].into())
            .await
            .unwrap();

        check_list(&integration, None, &["a/file.parquet", "b/file.parquet"]).await;
    }

    // A store rooted under a non-ASCII directory works, and non-ASCII file names inside
    // it can be listed.
    #[tokio::test]
    async fn invalid_path() {
        let root = TempDir::new().unwrap();
        let root = root.path().join("🙀");
        std::fs::create_dir(root.clone()).unwrap();

        // Invalid paths supported above root of store
        let integration = LocalFileSystem::new_with_prefix(root.clone()).unwrap();

        let directory = Path::from("directory");
        let object = directory.child("child.txt");
        let data = Bytes::from("arbitrary");
        integration.put(&object, data.clone().into()).await.unwrap();
        integration.head(&object).await.unwrap();
        let result = integration.get(&object).await.unwrap();
        assert_eq!(result.bytes().await.unwrap(), data);

        flatten_list_stream(&integration, None).await.unwrap();
        flatten_list_stream(&integration, Some(&directory))
            .await
            .unwrap();

        let result = integration
            .list_with_delimiter(Some(&directory))
            .await
            .unwrap();
        assert_eq!(result.objects.len(), 1);
        assert!(result.common_prefixes.is_empty());
        assert_eq!(result.objects[0].location, object);

        let emoji = root.join("💀");
        std::fs::write(emoji, "foo").unwrap();

        // Can list illegal file
        let mut paths = flatten_list_stream(&integration, None).await.unwrap();
        paths.sort_unstable();

        assert_eq!(
            paths,
            vec![
                Path::parse("directory/child.txt").unwrap(),
                Path::parse("💀").unwrap()
            ]
        );
    }

    // Multipart uploads that have written parts but not completed must not appear in
    // either `list` or `list_with_delimiter`.
    #[tokio::test]
    async fn list_hides_incomplete_uploads() {
        let root = TempDir::new().unwrap();
        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
        let location = Path::from("some_file");

        let data = PutPayload::from("arbitrary data");
        let mut u1 = integration.put_multipart(&location).await.unwrap();
        u1.put_part(data.clone()).await.unwrap();

        let mut u2 = integration.put_multipart(&location).await.unwrap();
        u2.put_part(data).await.unwrap();

        let list = flatten_list_stream(&integration, None).await.unwrap();
        assert_eq!(list.len(), 0);

        assert_eq!(
            integration
                .list_with_delimiter(None)
                .await
                .unwrap()
                .objects
                .len(),
            0
        );
    }

    // A file name containing a literal `%3A` is listed verbatim by both list APIs.
    #[tokio::test]
    async fn filesystem_filename_with_percent() {
        let temp_dir = TempDir::new().unwrap();
        let integration = LocalFileSystem::new_with_prefix(temp_dir.path()).unwrap();
        let filename = "L%3ABC.parquet";

        std::fs::write(temp_dir.path().join(filename), "foo").unwrap();

        let res: Vec<_> = integration.list(None).try_collect().await.unwrap();
        assert_eq!(res.len(), 1);
        assert_eq!(res[0].location.as_ref(), filename);

        let res = integration.list_with_delimiter(None).await.unwrap();
        assert_eq!(res.objects.len(), 1);
        assert_eq!(res.objects[0].location.as_ref(), filename);
    }

    // Relative prefixes are accepted, and a relative filesystem path can be listed.
    #[tokio::test]
    async fn relative_paths() {
        LocalFileSystem::new_with_prefix(".").unwrap();
        LocalFileSystem::new_with_prefix("..").unwrap();
        LocalFileSystem::new_with_prefix("../..").unwrap();

        let integration = LocalFileSystem::new();
        let path = Path::from_filesystem_path(".").unwrap();
        integration.list_with_delimiter(Some(&path)).await.unwrap();
    }

    // `is_valid_file_path` rejects paths whose final segment ends in `#<digits>`
    // (NOTE(review): presumably the suffix reserved for intermediate upload files).
    #[test]
    fn test_valid_path() {
        let cases = [
            ("foo#123/test.txt", true),
            ("foo#123/test#23.txt", true),
            ("foo#123/test#34", false),
            ("foo😁/test#34", false),
            ("foo/test#😁34", true),
        ];

        for (case, expected) in cases {
            let path = Path::parse(case).unwrap();
            assert_eq!(is_valid_file_path(&path), expected);
        }
    }

    // Files ending in `#<digits>` are hidden from listings and cannot be fetched, while
    // other names containing `#` work normally.
    #[tokio::test]
    async fn test_intermediate_files() {
        let root = TempDir::new().unwrap();
        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();

        let a = Path::parse("foo#123/test.txt").unwrap();
        integration.put(&a, "test".into()).await.unwrap();

        let list = flatten_list_stream(&integration, None).await.unwrap();
        assert_eq!(list, vec![a.clone()]);

        std::fs::write(root.path().join("bar#123"), "test").unwrap();

        // Should ignore file
        let list = flatten_list_stream(&integration, None).await.unwrap();
        assert_eq!(list, vec![a.clone()]);

        let b = Path::parse("bar#123").unwrap();
        let err = integration.get(&b).await.unwrap_err().to_string();
        assert_eq!(err, "Generic LocalFileSystem error: Filenames containing trailing '/#\\d+/' are not supported: bar#123");

        let c = Path::parse("foo#123.txt").unwrap();
        integration.put(&c, "test".into()).await.unwrap();

        let mut list = flatten_list_stream(&integration, None).await.unwrap();
        list.sort_unstable();
        assert_eq!(list, vec![c, a]);
    }

    // On Windows a `:` in a location is stored percent-encoded, so it lists as `%3A`,
    // yet the original location can still be fetched.
    #[tokio::test]
    #[cfg(target_os = "windows")]
    async fn filesystem_filename_with_colon() {
        let root = TempDir::new().unwrap();
        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
        let path = Path::parse("file%3Aname.parquet").unwrap();
        let location = Path::parse("file:name.parquet").unwrap();

        integration.put(&location, "test".into()).await.unwrap();
        let list = flatten_list_stream(&integration, None).await.unwrap();
        assert_eq!(list, vec![path.clone()]);

        let result = integration
            .get(&location)
            .await
            .unwrap()
            .bytes()
            .await
            .unwrap();
        assert_eq!(result, Bytes::from("test"));
    }

    // With automatic cleanup enabled, deleting the only file also removes the parent
    // directories that were created for it, leaving the root empty.
    #[tokio::test]
    async fn delete_dirs_automatically() {
        let root = TempDir::new().unwrap();
        let integration = LocalFileSystem::new_with_prefix(root.path())
            .unwrap()
            .with_automatic_cleanup(true);
        let location = Path::from("nested/file/test_file");
        let data = Bytes::from("arbitrary data");

        integration
            .put(&location, data.clone().into())
            .await
            .unwrap();

        let read_data = integration
            .get(&location)
            .await
            .unwrap()
            .bytes()
            .await
            .unwrap();

        assert_eq!(&*read_data, data);
        assert!(fs::read_dir(root.path()).unwrap().count() > 0);
        integration.delete(&location).await.unwrap();
        assert!(fs::read_dir(root.path()).unwrap().count() == 0);
    }
}
1551
#[cfg(not(target_arch = "wasm32"))]
#[cfg(test)]
mod not_wasm_tests {
    use std::time::Duration;
    use tempfile::TempDir;

    use crate::local::LocalFileSystem;
    use crate::{ObjectStore, Path, PutPayload};

    /// Number of entries directly inside `dir`.
    fn entry_count(dir: &std::path::Path) -> usize {
        std::fs::read_dir(dir).unwrap().count()
    }

    /// Dropping an unfinished multipart upload removes its intermediate file.
    #[tokio::test]
    async fn test_cleanup_intermediate_files() {
        let root = TempDir::new().unwrap();
        let store = LocalFileSystem::new_with_prefix(root.path()).unwrap();

        let mut upload = store
            .put_multipart(&Path::from("some_file"))
            .await
            .unwrap();
        upload
            .put_part(PutPayload::from_static(b"hello"))
            .await
            .unwrap();

        // Only the intermediate file exists while the upload is in progress
        assert_eq!(entry_count(root.path()), 1);
        drop(upload);

        // Removal may run on a blocking thread, so poll for roughly 100ms
        for _ in 0..100 {
            tokio::time::sleep(Duration::from_millis(1)).await;
            if entry_count(root.path()) == 0 {
                return;
            }
        }
        panic!("Failed to cleanup file in 100ms")
    }
}
1585
// Unix-only tests that need special file types created through `nix`.
#[cfg(target_family = "unix")]
#[cfg(test)]
mod unix_test {
    use std::fs::OpenOptions;

    use nix::sys::stat;
    use nix::unistd;
    use tempfile::TempDir;

    use crate::local::LocalFileSystem;
    use crate::{ObjectStore, Path};

    // `head` and `get` on a FIFO (named pipe) inside the store should succeed.
    #[tokio::test]
    async fn test_fifo() {
        let filename = "some_file";
        let root = TempDir::new().unwrap();
        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
        let path = root.path().join(filename);
        unistd::mkfifo(&path, stat::Mode::S_IRWXU).unwrap();

        // Need to open read and write side in parallel: the write side is opened on a
        // blocking thread while the store operations below run on this task
        let spawned =
            tokio::task::spawn_blocking(|| OpenOptions::new().write(true).open(path).unwrap());

        let location = Path::from(filename);
        integration.head(&location).await.unwrap();
        integration.get(&location).await.unwrap();

        spawned.await.unwrap();
    }
}