1use std::fs::{metadata, symlink_metadata, File, Metadata, OpenOptions};
20use std::io::{ErrorKind, Read, Seek, SeekFrom, Write};
21use std::ops::Range;
22use std::sync::Arc;
23use std::time::SystemTime;
24use std::{collections::BTreeSet, convert::TryFrom, io};
25use std::{collections::VecDeque, path::PathBuf};
26
27use async_trait::async_trait;
28use bytes::Bytes;
29use chrono::{DateTime, Utc};
30use futures::{stream::BoxStream, StreamExt};
31use futures::{FutureExt, TryStreamExt};
32use parking_lot::Mutex;
33use snafu::{ensure, OptionExt, ResultExt, Snafu};
34use url::Url;
35use walkdir::{DirEntry, WalkDir};
36
37use crate::{
38 maybe_spawn_blocking,
39 path::{absolute_path_to_url, Path},
40 util::InvalidGetRange,
41 Attributes, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta,
42 ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, UploadPart,
43};
44
/// Errors raised by the local filesystem store.
///
/// Values of this type are converted into the crate-level error type by the
/// `From<Error>` implementation that follows this enum.
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
pub(crate) enum Error {
    /// A file length (reported as `u64`) could not be converted to `usize`.
    #[snafu(display("File size for {} did not fit in a usize: {}", path, source))]
    FileSizeOverflowedUsize {
        source: std::num::TryFromIntError,
        path: String,
    },

    /// The directory walker reported an error other than "not found".
    #[snafu(display("Unable to walk dir: {}", source))]
    UnableToWalkDir {
        source: walkdir::Error,
    },

    /// Metadata for a file or directory entry could not be read.
    #[snafu(display("Unable to access metadata for {}: {}", path, source))]
    Metadata {
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
        path: String,
    },

    /// Writing payload bytes into a staged file failed.
    #[snafu(display("Unable to copy data to file: {}", source))]
    UnableToCopyDataToFile {
        source: io::Error,
    },

    /// Moving (or hard-linking) a staged file onto its destination failed.
    #[snafu(display("Unable to rename file: {}", source))]
    UnableToRenameFile {
        source: io::Error,
    },

    /// Creating the parent directories of a destination failed.
    #[snafu(display("Unable to create dir {}: {}", path.display(), source))]
    UnableToCreateDir {
        source: io::Error,
        path: PathBuf,
    },

    /// A file could not be created; used when the target path has no parent
    /// directory to create.
    #[snafu(display("Unable to create file {}: {}", path.display(), source))]
    UnableToCreateFile {
        source: io::Error,
        path: PathBuf,
    },

    /// Removing a file failed for a reason other than it not existing.
    #[snafu(display("Unable to delete file {}: {}", path.display(), source))]
    UnableToDeleteFile {
        source: io::Error,
        path: PathBuf,
    },

    /// Opening a file failed for a reason other than it not existing.
    #[snafu(display("Unable to open file {}: {}", path.display(), source))]
    UnableToOpenFile {
        source: io::Error,
        path: PathBuf,
    },

    /// Reading bytes from an open file failed.
    #[snafu(display("Unable to read data from file {}: {}", path.display(), source))]
    UnableToReadBytes {
        source: io::Error,
        path: PathBuf,
    },

    /// A read produced a different number of bytes (`actual`) than the
    /// requested range length (`expected`).
    #[snafu(display("Out of range of file {}, expected: {}, actual: {}", path.display(), expected, actual))]
    OutOfRange {
        path: PathBuf,
        expected: usize,
        actual: usize,
    },

    /// The requested get-range could not be resolved against the object size.
    #[snafu(display("Requested range was invalid"))]
    InvalidRange {
        source: InvalidGetRange,
    },

    /// A copy or rename between two filesystem paths failed.
    #[snafu(display("Unable to copy file from {} to {}: {}", from.display(), to.display(), source))]
    UnableToCopyFile {
        from: PathBuf,
        to: PathBuf,
        source: io::Error,
    },

    /// The file does not exist (or is a directory); converted to the
    /// crate-level `NotFound` variant.
    NotFound {
        path: PathBuf,
        source: io::Error,
    },

    /// Seeking within an open file failed.
    #[snafu(display("Error seeking file {}: {}", path.display(), source))]
    Seek {
        source: io::Error,
        path: PathBuf,
    },

    /// A URL could not be turned into a filesystem path.
    #[snafu(display("Unable to convert URL \"{}\" to filesystem path", url))]
    InvalidUrl {
        url: Url,
    },

    /// The destination already exists; converted to the crate-level
    /// `AlreadyExists` variant.
    AlreadyExists {
        path: String,
        source: io::Error,
    },

    /// The configured root prefix could not be canonicalized.
    #[snafu(display("Unable to canonicalize filesystem root: {}", path.display()))]
    UnableToCanonicalize {
        path: PathBuf,
        source: io::Error,
    },

    /// The location's filename was rejected by `is_valid_file_path`.
    #[snafu(display("Filenames containing trailing '/#\\d+/' are not supported: {}", path))]
    InvalidPath {
        path: String,
    },

    /// The multipart upload no longer owns a staged file because `complete`
    /// or `abort` has already been called.
    #[snafu(display("Upload aborted"))]
    Aborted,
}
160
161impl From<Error> for super::Error {
162 fn from(source: Error) -> Self {
163 match source {
164 Error::NotFound { path, source } => Self::NotFound {
165 path: path.to_string_lossy().to_string(),
166 source: source.into(),
167 },
168 Error::AlreadyExists { path, source } => Self::AlreadyExists {
169 path,
170 source: source.into(),
171 },
172 _ => Self::Generic {
173 store: "LocalFileSystem",
174 source: Box::new(source),
175 },
176 }
177 }
178}
179
/// [`ObjectStore`] implementation that maps object locations onto files
/// beneath a root URL on the local filesystem.
#[derive(Debug)]
pub struct LocalFileSystem {
    /// Shared configuration holding the root URL.
    config: Arc<Config>,
    /// When `true`, `delete` also attempts to remove the deleted file's parent
    /// directories (never the root), stopping at the first removal that fails.
    automatic_cleanup: bool,
}
246
/// Configuration of a [`LocalFileSystem`], shared through an `Arc` with the
/// closures that perform the filesystem work.
#[derive(Debug)]
struct Config {
    /// URL of the directory that object locations are resolved against.
    root: Url,
}
251
252impl std::fmt::Display for LocalFileSystem {
253 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
254 write!(f, "LocalFileSystem({})", self.config.root)
255 }
256}
257
258impl Default for LocalFileSystem {
259 fn default() -> Self {
260 Self::new()
261 }
262}
263
264impl LocalFileSystem {
265 pub fn new() -> Self {
267 Self {
268 config: Arc::new(Config {
269 root: Url::parse("file:///").unwrap(),
270 }),
271 automatic_cleanup: false,
272 }
273 }
274
275 pub fn new_with_prefix(prefix: impl AsRef<std::path::Path>) -> Result<Self> {
280 let path = std::fs::canonicalize(&prefix).context(UnableToCanonicalizeSnafu {
281 path: prefix.as_ref(),
282 })?;
283
284 Ok(Self {
285 config: Arc::new(Config {
286 root: absolute_path_to_url(path)?,
287 }),
288 automatic_cleanup: false,
289 })
290 }
291
292 pub fn path_to_filesystem(&self, location: &Path) -> Result<PathBuf> {
294 ensure!(
295 is_valid_file_path(location),
296 InvalidPathSnafu {
297 path: location.as_ref()
298 }
299 );
300 let path = self.config.prefix_to_filesystem(location)?;
301
302 #[cfg(target_os = "windows")]
303 let path = {
304 let path = path.to_string_lossy();
305
306 let mut out = String::new();
308 let drive = &path[..2]; let filepath = &path[2..].replace(':', "%3A"); out.push_str(drive);
311 out.push_str(filepath);
312 PathBuf::from(out)
313 };
314
315 Ok(path)
316 }
317
318 pub fn with_automatic_cleanup(mut self, automatic_cleanup: bool) -> Self {
320 self.automatic_cleanup = automatic_cleanup;
321 self
322 }
323}
324
325impl Config {
326 fn prefix_to_filesystem(&self, location: &Path) -> Result<PathBuf> {
328 let mut url = self.root.clone();
329 url.path_segments_mut()
330 .expect("url path")
331 .pop_if_empty()
334 .extend(location.parts());
335
336 url.to_file_path()
337 .map_err(|_| Error::InvalidUrl { url }.into())
338 }
339
340 fn filesystem_to_path(&self, location: &std::path::Path) -> Result<Path> {
342 Ok(Path::from_absolute_path_with_base(
343 location,
344 Some(&self.root),
345 )?)
346 }
347}
348
349fn is_valid_file_path(path: &Path) -> bool {
350 match path.filename() {
351 Some(p) => match p.split_once('#') {
352 Some((_, suffix)) if !suffix.is_empty() => {
353 !suffix.as_bytes().iter().all(|x| x.is_ascii_digit())
355 }
356 _ => true,
357 },
358 None => false,
359 }
360}
361
362#[async_trait]
363impl ObjectStore for LocalFileSystem {
364 async fn put_opts(
365 &self,
366 location: &Path,
367 payload: PutPayload,
368 opts: PutOptions,
369 ) -> Result<PutResult> {
370 if matches!(opts.mode, PutMode::Update(_)) {
371 return Err(crate::Error::NotImplemented);
372 }
373
374 if !opts.attributes.is_empty() {
375 return Err(crate::Error::NotImplemented);
376 }
377
378 let path = self.path_to_filesystem(location)?;
379 maybe_spawn_blocking(move || {
380 let (mut file, staging_path) = new_staged_upload(&path)?;
381 let mut e_tag = None;
382
383 let err = match payload.iter().try_for_each(|x| file.write_all(x)) {
384 Ok(_) => {
385 let metadata = file.metadata().map_err(|e| Error::Metadata {
386 source: e.into(),
387 path: path.to_string_lossy().to_string(),
388 })?;
389 e_tag = Some(get_etag(&metadata));
390 match opts.mode {
391 PutMode::Overwrite => {
392 std::mem::drop(file);
395 match std::fs::rename(&staging_path, &path) {
396 Ok(_) => None,
397 Err(source) => Some(Error::UnableToRenameFile { source }),
398 }
399 }
400 PutMode::Create => match std::fs::hard_link(&staging_path, &path) {
401 Ok(_) => {
402 let _ = std::fs::remove_file(&staging_path); None
404 }
405 Err(source) => match source.kind() {
406 ErrorKind::AlreadyExists => Some(Error::AlreadyExists {
407 path: path.to_str().unwrap().to_string(),
408 source,
409 }),
410 _ => Some(Error::UnableToRenameFile { source }),
411 },
412 },
413 PutMode::Update(_) => unreachable!(),
414 }
415 }
416 Err(source) => Some(Error::UnableToCopyDataToFile { source }),
417 };
418
419 if let Some(err) = err {
420 let _ = std::fs::remove_file(&staging_path); return Err(err.into());
422 }
423
424 Ok(PutResult {
425 e_tag,
426 version: None,
427 })
428 })
429 .await
430 }
431
432 async fn put_multipart_opts(
433 &self,
434 location: &Path,
435 opts: PutMultipartOpts,
436 ) -> Result<Box<dyn MultipartUpload>> {
437 if !opts.attributes.is_empty() {
438 return Err(crate::Error::NotImplemented);
439 }
440
441 let dest = self.path_to_filesystem(location)?;
442 let (file, src) = new_staged_upload(&dest)?;
443 Ok(Box::new(LocalUpload::new(src, dest, file)))
444 }
445
446 async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
447 let location = location.clone();
448 let path = self.path_to_filesystem(&location)?;
449 maybe_spawn_blocking(move || {
450 let (file, metadata) = open_file(&path)?;
451 let meta = convert_metadata(metadata, location)?;
452 options.check_preconditions(&meta)?;
453
454 let range = match options.range {
455 Some(r) => r.as_range(meta.size).context(InvalidRangeSnafu)?,
456 None => 0..meta.size,
457 };
458
459 Ok(GetResult {
460 payload: GetResultPayload::File(file, path),
461 attributes: Attributes::default(),
462 range,
463 meta,
464 })
465 })
466 .await
467 }
468
469 async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
470 let path = self.path_to_filesystem(location)?;
471 maybe_spawn_blocking(move || {
472 let (mut file, _) = open_file(&path)?;
473 read_range(&mut file, &path, range)
474 })
475 .await
476 }
477
478 async fn get_ranges(&self, location: &Path, ranges: &[Range<usize>]) -> Result<Vec<Bytes>> {
479 let path = self.path_to_filesystem(location)?;
480 let ranges = ranges.to_vec();
481 maybe_spawn_blocking(move || {
482 let (mut file, _) = open_file(&path)?;
484 ranges
485 .into_iter()
486 .map(|r| read_range(&mut file, &path, r))
487 .collect()
488 })
489 .await
490 }
491
492 async fn delete(&self, location: &Path) -> Result<()> {
493 let config = Arc::clone(&self.config);
494 let path = self.path_to_filesystem(location)?;
495 let automactic_cleanup = self.automatic_cleanup;
496 maybe_spawn_blocking(move || {
497 if let Err(e) = std::fs::remove_file(&path) {
498 Err(match e.kind() {
499 ErrorKind::NotFound => Error::NotFound { path, source: e }.into(),
500 _ => Error::UnableToDeleteFile { path, source: e }.into(),
501 })
502 } else if automactic_cleanup {
503 let root = &config.root;
504 let root = root
505 .to_file_path()
506 .map_err(|_| Error::InvalidUrl { url: root.clone() })?;
507
508 let mut parent = path.parent();
510
511 while let Some(loc) = parent {
512 if loc != root && std::fs::remove_dir(loc).is_ok() {
513 parent = loc.parent();
514 } else {
515 break;
516 }
517 }
518
519 Ok(())
520 } else {
521 Ok(())
522 }
523 })
524 .await
525 }
526
527 fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
528 let config = Arc::clone(&self.config);
529
530 let root_path = match prefix {
531 Some(prefix) => match config.prefix_to_filesystem(prefix) {
532 Ok(path) => path,
533 Err(e) => return futures::future::ready(Err(e)).into_stream().boxed(),
534 },
535 None => self.config.root.to_file_path().unwrap(),
536 };
537
538 let walkdir = WalkDir::new(root_path)
539 .min_depth(1)
541 .follow_links(true);
542
543 let s = walkdir.into_iter().flat_map(move |result_dir_entry| {
544 let entry = match convert_walkdir_result(result_dir_entry).transpose()? {
545 Ok(entry) => entry,
546 Err(e) => return Some(Err(e)),
547 };
548
549 if !entry.path().is_file() {
550 return None;
551 }
552
553 match config.filesystem_to_path(entry.path()) {
554 Ok(path) => match is_valid_file_path(&path) {
555 true => convert_entry(entry, path).transpose(),
556 false => None,
557 },
558 Err(e) => Some(Err(e)),
559 }
560 });
561
562 if tokio::runtime::Handle::try_current().is_err() {
565 return futures::stream::iter(s).boxed();
566 }
567
568 const CHUNK_SIZE: usize = 1024;
570
571 let buffer = VecDeque::with_capacity(CHUNK_SIZE);
572 futures::stream::try_unfold((s, buffer), |(mut s, mut buffer)| async move {
573 if buffer.is_empty() {
574 (s, buffer) = tokio::task::spawn_blocking(move || {
575 for _ in 0..CHUNK_SIZE {
576 match s.next() {
577 Some(r) => buffer.push_back(r),
578 None => break,
579 }
580 }
581 (s, buffer)
582 })
583 .await?;
584 }
585
586 match buffer.pop_front() {
587 Some(Err(e)) => Err(e),
588 Some(Ok(meta)) => Ok(Some((meta, (s, buffer)))),
589 None => Ok(None),
590 }
591 })
592 .boxed()
593 }
594
595 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
596 let config = Arc::clone(&self.config);
597
598 let prefix = prefix.cloned().unwrap_or_default();
599 let resolved_prefix = config.prefix_to_filesystem(&prefix)?;
600
601 maybe_spawn_blocking(move || {
602 let walkdir = WalkDir::new(&resolved_prefix)
603 .min_depth(1)
604 .max_depth(1)
605 .follow_links(true);
606
607 let mut common_prefixes = BTreeSet::new();
608 let mut objects = Vec::new();
609
610 for entry_res in walkdir.into_iter().map(convert_walkdir_result) {
611 if let Some(entry) = entry_res? {
612 let is_directory = entry.file_type().is_dir();
613 let entry_location = config.filesystem_to_path(entry.path())?;
614 if !is_directory && !is_valid_file_path(&entry_location) {
615 continue;
616 }
617
618 let mut parts = match entry_location.prefix_match(&prefix) {
619 Some(parts) => parts,
620 None => continue,
621 };
622
623 let common_prefix = match parts.next() {
624 Some(p) => p,
625 None => continue,
626 };
627
628 drop(parts);
629
630 if is_directory {
631 common_prefixes.insert(prefix.child(common_prefix));
632 } else if let Some(metadata) = convert_entry(entry, entry_location)? {
633 objects.push(metadata);
634 }
635 }
636 }
637
638 Ok(ListResult {
639 common_prefixes: common_prefixes.into_iter().collect(),
640 objects,
641 })
642 })
643 .await
644 }
645
646 async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
647 let from = self.path_to_filesystem(from)?;
648 let to = self.path_to_filesystem(to)?;
649 let mut id = 0;
650 maybe_spawn_blocking(move || loop {
657 let staged = staged_upload_path(&to, &id.to_string());
658 match std::fs::hard_link(&from, &staged) {
659 Ok(_) => {
660 return std::fs::rename(&staged, &to).map_err(|source| {
661 let _ = std::fs::remove_file(&staged); Error::UnableToCopyFile { from, to, source }.into()
663 });
664 }
665 Err(source) => match source.kind() {
666 ErrorKind::AlreadyExists => id += 1,
667 ErrorKind::NotFound => match from.exists() {
668 true => create_parent_dirs(&to, source)?,
669 false => return Err(Error::NotFound { path: from, source }.into()),
670 },
671 _ => return Err(Error::UnableToCopyFile { from, to, source }.into()),
672 },
673 }
674 })
675 .await
676 }
677
678 async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
679 let from = self.path_to_filesystem(from)?;
680 let to = self.path_to_filesystem(to)?;
681 maybe_spawn_blocking(move || loop {
682 match std::fs::rename(&from, &to) {
683 Ok(_) => return Ok(()),
684 Err(source) => match source.kind() {
685 ErrorKind::NotFound => match from.exists() {
686 true => create_parent_dirs(&to, source)?,
687 false => return Err(Error::NotFound { path: from, source }.into()),
688 },
689 _ => return Err(Error::UnableToCopyFile { from, to, source }.into()),
690 },
691 }
692 })
693 .await
694 }
695
696 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
697 let from = self.path_to_filesystem(from)?;
698 let to = self.path_to_filesystem(to)?;
699
700 maybe_spawn_blocking(move || loop {
701 match std::fs::hard_link(&from, &to) {
702 Ok(_) => return Ok(()),
703 Err(source) => match source.kind() {
704 ErrorKind::AlreadyExists => {
705 return Err(Error::AlreadyExists {
706 path: to.to_str().unwrap().to_string(),
707 source,
708 }
709 .into())
710 }
711 ErrorKind::NotFound => match from.exists() {
712 true => create_parent_dirs(&to, source)?,
713 false => return Err(Error::NotFound { path: from, source }.into()),
714 },
715 _ => return Err(Error::UnableToCopyFile { from, to, source }.into()),
716 },
717 }
718 })
719 .await
720 }
721}
722
723fn create_parent_dirs(path: &std::path::Path, source: io::Error) -> Result<()> {
725 let parent = path.parent().ok_or_else(|| Error::UnableToCreateFile {
726 path: path.to_path_buf(),
727 source,
728 })?;
729
730 std::fs::create_dir_all(parent).context(UnableToCreateDirSnafu { path: parent })?;
731 Ok(())
732}
733
734fn new_staged_upload(base: &std::path::Path) -> Result<(File, PathBuf)> {
738 let mut multipart_id = 1;
739 loop {
740 let suffix = multipart_id.to_string();
741 let path = staged_upload_path(base, &suffix);
742 let mut options = OpenOptions::new();
743 match options.read(true).write(true).create_new(true).open(&path) {
744 Ok(f) => return Ok((f, path)),
745 Err(source) => match source.kind() {
746 ErrorKind::AlreadyExists => multipart_id += 1,
747 ErrorKind::NotFound => create_parent_dirs(&path, source)?,
748 _ => return Err(Error::UnableToOpenFile { source, path }.into()),
749 },
750 }
751 }
752}
753
/// Returns the staging path for `dest`: the destination path with `#` and
/// `suffix` appended to its final component.
fn staged_upload_path(dest: &std::path::Path, suffix: &str) -> PathBuf {
    let mut staged = std::ffi::OsString::from(dest);
    staged.push("#");
    staged.push(suffix);
    PathBuf::from(staged)
}
761
/// In-progress multipart upload that writes its parts into a staged file.
#[derive(Debug)]
struct LocalUpload {
    /// Destination path and staged file handle, shared with the closures that
    /// perform the writes.
    state: Arc<UploadState>,
    /// Path of the staged file; `None` once `complete` or `abort` has taken it.
    src: Option<PathBuf>,
    /// Byte offset in the staged file at which the next part will be written.
    offset: u64,
}
771
/// State of a [`LocalUpload`] that is shared with its write tasks.
#[derive(Debug)]
struct UploadState {
    /// Final path the staged file is renamed to on completion.
    dest: PathBuf,
    /// Handle to the staged file; locked for the duration of each write.
    file: Mutex<File>,
}
777
778impl LocalUpload {
779 pub fn new(src: PathBuf, dest: PathBuf, file: File) -> Self {
780 Self {
781 state: Arc::new(UploadState {
782 dest,
783 file: Mutex::new(file),
784 }),
785 src: Some(src),
786 offset: 0,
787 }
788 }
789}
790
791#[async_trait]
792impl MultipartUpload for LocalUpload {
793 fn put_part(&mut self, data: PutPayload) -> UploadPart {
794 let offset = self.offset;
795 self.offset += data.content_length() as u64;
796
797 let s = Arc::clone(&self.state);
798 maybe_spawn_blocking(move || {
799 let mut file = s.file.lock();
800 file.seek(SeekFrom::Start(offset))
801 .context(SeekSnafu { path: &s.dest })?;
802
803 data.iter()
804 .try_for_each(|x| file.write_all(x))
805 .context(UnableToCopyDataToFileSnafu)?;
806
807 Ok(())
808 })
809 .boxed()
810 }
811
812 async fn complete(&mut self) -> Result<PutResult> {
813 let src = self.src.take().context(AbortedSnafu)?;
814 let s = Arc::clone(&self.state);
815 maybe_spawn_blocking(move || {
816 let file = s.file.lock();
818 std::fs::rename(&src, &s.dest).context(UnableToRenameFileSnafu)?;
819 let metadata = file.metadata().map_err(|e| Error::Metadata {
820 source: e.into(),
821 path: src.to_string_lossy().to_string(),
822 })?;
823
824 Ok(PutResult {
825 e_tag: Some(get_etag(&metadata)),
826 version: None,
827 })
828 })
829 .await
830 }
831
832 async fn abort(&mut self) -> Result<()> {
833 let src = self.src.take().context(AbortedSnafu)?;
834 maybe_spawn_blocking(move || {
835 std::fs::remove_file(&src).context(UnableToDeleteFileSnafu { path: &src })?;
836 Ok(())
837 })
838 .await
839 }
840}
841
842impl Drop for LocalUpload {
843 fn drop(&mut self) {
844 if let Some(src) = self.src.take() {
845 match tokio::runtime::Handle::try_current() {
847 Ok(r) => drop(r.spawn_blocking(move || std::fs::remove_file(src))),
848 Err(_) => drop(std::fs::remove_file(src)),
849 };
850 }
851 }
852}
853
854pub(crate) fn chunked_stream(
855 mut file: File,
856 path: PathBuf,
857 range: Range<usize>,
858 chunk_size: usize,
859) -> BoxStream<'static, Result<Bytes, super::Error>> {
860 futures::stream::once(async move {
861 let (file, path) = maybe_spawn_blocking(move || {
862 file.seek(SeekFrom::Start(range.start as _))
863 .map_err(|source| Error::Seek {
864 source,
865 path: path.clone(),
866 })?;
867 Ok((file, path))
868 })
869 .await?;
870
871 let stream = futures::stream::try_unfold(
872 (file, path, range.end - range.start),
873 move |(mut file, path, remaining)| {
874 maybe_spawn_blocking(move || {
875 if remaining == 0 {
876 return Ok(None);
877 }
878
879 let to_read = remaining.min(chunk_size);
880 let mut buffer = Vec::with_capacity(to_read);
881 let read = (&mut file)
882 .take(to_read as u64)
883 .read_to_end(&mut buffer)
884 .map_err(|e| Error::UnableToReadBytes {
885 source: e,
886 path: path.clone(),
887 })?;
888
889 Ok(Some((buffer.into(), (file, path, remaining - read))))
890 })
891 },
892 );
893 Ok::<_, super::Error>(stream)
894 })
895 .try_flatten()
896 .boxed()
897}
898
899pub(crate) fn read_range(file: &mut File, path: &PathBuf, range: Range<usize>) -> Result<Bytes> {
900 let to_read = range.end - range.start;
901 file.seek(SeekFrom::Start(range.start as u64))
902 .context(SeekSnafu { path })?;
903
904 let mut buf = Vec::with_capacity(to_read);
905 let read = file
906 .take(to_read as u64)
907 .read_to_end(&mut buf)
908 .context(UnableToReadBytesSnafu { path })?;
909
910 ensure!(
911 read == to_read,
912 OutOfRangeSnafu {
913 path,
914 expected: to_read,
915 actual: read
916 }
917 );
918 Ok(buf.into())
919}
920
921fn open_file(path: &PathBuf) -> Result<(File, Metadata)> {
922 let ret = match File::open(path).and_then(|f| Ok((f.metadata()?, f))) {
923 Err(e) => Err(match e.kind() {
924 ErrorKind::NotFound => Error::NotFound {
925 path: path.clone(),
926 source: e,
927 },
928 _ => Error::UnableToOpenFile {
929 path: path.clone(),
930 source: e,
931 },
932 }),
933 Ok((metadata, file)) => match !metadata.is_dir() {
934 true => Ok((file, metadata)),
935 false => Err(Error::NotFound {
936 path: path.clone(),
937 source: io::Error::new(ErrorKind::NotFound, "is directory"),
938 }),
939 },
940 }?;
941 Ok(ret)
942}
943
944fn convert_entry(entry: DirEntry, location: Path) -> Result<Option<ObjectMeta>> {
945 match entry.metadata() {
946 Ok(metadata) => convert_metadata(metadata, location).map(Some),
947 Err(e) => {
948 if let Some(io_err) = e.io_error() {
949 if io_err.kind() == ErrorKind::NotFound {
950 return Ok(None);
951 }
952 }
953 Err(Error::Metadata {
954 source: e.into(),
955 path: location.to_string(),
956 })?
957 }
958 }
959}
960
961fn last_modified(metadata: &Metadata) -> DateTime<Utc> {
962 metadata
963 .modified()
964 .expect("Modified file time should be supported on this platform")
965 .into()
966}
967
968fn get_etag(metadata: &Metadata) -> String {
969 let inode = get_inode(metadata);
970 let size = metadata.len();
971 let mtime = metadata
972 .modified()
973 .ok()
974 .and_then(|mtime| mtime.duration_since(SystemTime::UNIX_EPOCH).ok())
975 .unwrap_or_default()
976 .as_micros();
977
978 format!("{inode:x}-{mtime:x}-{size:x}")
982}
983
984fn convert_metadata(metadata: Metadata, location: Path) -> Result<ObjectMeta> {
985 let last_modified = last_modified(&metadata);
986 let size = usize::try_from(metadata.len()).context(FileSizeOverflowedUsizeSnafu {
987 path: location.as_ref(),
988 })?;
989
990 Ok(ObjectMeta {
991 location,
992 last_modified,
993 size,
994 e_tag: Some(get_etag(&metadata)),
995 version: None,
996 })
997}
998
/// Returns the inode number recorded in `metadata`.
#[cfg(unix)]
fn get_inode(metadata: &Metadata) -> u64 {
    std::os::unix::fs::MetadataExt::ino(metadata)
}

/// Returns `0` on platforms other than unix, where this function does not
/// read an inode number.
#[cfg(not(unix))]
fn get_inode(_metadata: &Metadata) -> u64 {
    // The parameter is intentionally unused; the underscore keeps the
    // signature aligned with the unix variant without an unused warning.
    0
}
1011
1012fn convert_walkdir_result(
1015 res: std::result::Result<DirEntry, walkdir::Error>,
1016) -> Result<Option<DirEntry>> {
1017 match res {
1018 Ok(entry) => {
1019 match symlink_metadata(entry.path()) {
1022 Ok(attr) => {
1023 if attr.is_symlink() {
1024 let target_metadata = metadata(entry.path());
1025 match target_metadata {
1026 Ok(_) => {
1027 Ok(Some(entry))
1029 }
1030 Err(_) => {
1031 Ok(None)
1033 }
1034 }
1035 } else {
1036 Ok(Some(entry))
1037 }
1038 }
1039 Err(_) => Ok(None),
1040 }
1041 }
1042
1043 Err(walkdir_err) => match walkdir_err.io_error() {
1044 Some(io_err) => match io_err.kind() {
1045 ErrorKind::NotFound => Ok(None),
1046 _ => Err(Error::UnableToWalkDir {
1047 source: walkdir_err,
1048 }
1049 .into()),
1050 },
1051 None => Err(Error::UnableToWalkDir {
1052 source: walkdir_err,
1053 }
1054 .into()),
1055 },
1056 }
1057}
1058
1059#[cfg(test)]
1060mod tests {
1061 use std::fs;
1062
1063 use futures::TryStreamExt;
1064 use tempfile::{NamedTempFile, TempDir};
1065
1066 use crate::integration::*;
1067
1068 use super::*;
1069
1070 #[tokio::test]
1071 #[cfg(target_family = "unix")]
1072 async fn file_test() {
1073 let root = TempDir::new().unwrap();
1074 let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1075
1076 put_get_delete_list(&integration).await;
1077 get_opts(&integration).await;
1078 list_uses_directories_correctly(&integration).await;
1079 list_with_delimiter(&integration).await;
1080 rename_and_copy(&integration).await;
1081 copy_if_not_exists(&integration).await;
1082 copy_rename_nonexistent_object(&integration).await;
1083 stream_get(&integration).await;
1084 put_opts(&integration, false).await;
1085 }
1086
1087 #[test]
1088 #[cfg(target_family = "unix")]
1089 fn test_non_tokio() {
1090 let root = TempDir::new().unwrap();
1091 let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1092 futures::executor::block_on(async move {
1093 put_get_delete_list(&integration).await;
1094 list_uses_directories_correctly(&integration).await;
1095 list_with_delimiter(&integration).await;
1096
1097 let p = Path::from("manual_upload");
1099 let mut upload = integration.put_multipart(&p).await.unwrap();
1100 upload.put_part("123".into()).await.unwrap();
1101 upload.put_part("45678".into()).await.unwrap();
1102 let r = upload.complete().await.unwrap();
1103
1104 let get = integration.get(&p).await.unwrap();
1105 assert_eq!(get.meta.e_tag.as_ref().unwrap(), r.e_tag.as_ref().unwrap());
1106 let actual = get.bytes().await.unwrap();
1107 assert_eq!(actual.as_ref(), b"12345678");
1108 });
1109 }
1110
1111 #[tokio::test]
1112 async fn creates_dir_if_not_present() {
1113 let root = TempDir::new().unwrap();
1114 let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1115
1116 let location = Path::from("nested/file/test_file");
1117
1118 let data = Bytes::from("arbitrary data");
1119
1120 integration
1121 .put(&location, data.clone().into())
1122 .await
1123 .unwrap();
1124
1125 let read_data = integration
1126 .get(&location)
1127 .await
1128 .unwrap()
1129 .bytes()
1130 .await
1131 .unwrap();
1132 assert_eq!(&*read_data, data);
1133 }
1134
1135 #[tokio::test]
1136 async fn unknown_length() {
1137 let root = TempDir::new().unwrap();
1138 let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1139
1140 let location = Path::from("some_file");
1141
1142 let data = Bytes::from("arbitrary data");
1143
1144 integration
1145 .put(&location, data.clone().into())
1146 .await
1147 .unwrap();
1148
1149 let read_data = integration
1150 .get(&location)
1151 .await
1152 .unwrap()
1153 .bytes()
1154 .await
1155 .unwrap();
1156 assert_eq!(&*read_data, data);
1157 }
1158
1159 #[tokio::test]
1160 #[cfg(target_family = "unix")]
1161 #[ignore]
1163 async fn bubble_up_io_errors() {
1164 use std::{fs::set_permissions, os::unix::prelude::PermissionsExt};
1165
1166 let root = TempDir::new().unwrap();
1167
1168 let metadata = root.path().metadata().unwrap();
1170 let mut permissions = metadata.permissions();
1171 permissions.set_mode(0o000);
1172 set_permissions(root.path(), permissions).unwrap();
1173
1174 let store = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1175
1176 let mut stream = store.list(None);
1177 let mut any_err = false;
1178 while let Some(res) = stream.next().await {
1179 if res.is_err() {
1180 any_err = true;
1181 }
1182 }
1183 assert!(any_err);
1184
1185 assert!(store.list_with_delimiter(None).await.is_err());
1187 }
1188
    /// Object name that is never created by the tests using it.
    const NON_EXISTENT_NAME: &str = "nonexistentname";
1190
1191 #[tokio::test]
1192 async fn get_nonexistent_location() {
1193 let root = TempDir::new().unwrap();
1194 let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1195
1196 let location = Path::from(NON_EXISTENT_NAME);
1197
1198 let err = get_nonexistent_object(&integration, Some(location))
1199 .await
1200 .unwrap_err();
1201 if let crate::Error::NotFound { path, source } = err {
1202 let source_variant = source.downcast_ref::<std::io::Error>();
1203 assert!(
1204 matches!(source_variant, Some(std::io::Error { .. }),),
1205 "got: {source_variant:?}"
1206 );
1207 assert!(path.ends_with(NON_EXISTENT_NAME), "{}", path);
1208 } else {
1209 panic!("unexpected error type: {err:?}");
1210 }
1211 }
1212
1213 #[tokio::test]
1214 async fn root() {
1215 let integration = LocalFileSystem::new();
1216
1217 let canonical = std::path::Path::new("Cargo.toml").canonicalize().unwrap();
1218 let url = Url::from_directory_path(&canonical).unwrap();
1219 let path = Path::parse(url.path()).unwrap();
1220
1221 let roundtrip = integration.path_to_filesystem(&path).unwrap();
1222
1223 let roundtrip = roundtrip.canonicalize().unwrap();
1226
1227 assert_eq!(roundtrip, canonical);
1228
1229 integration.head(&path).await.unwrap();
1230 }
1231
1232 #[tokio::test]
1233 #[cfg(target_family = "windows")]
1234 async fn test_list_root() {
1235 let fs = LocalFileSystem::new();
1236 let r = fs.list_with_delimiter(None).await.unwrap_err().to_string();
1237
1238 assert!(
1239 r.contains("Unable to convert URL \"file:///\" to filesystem path"),
1240 "{}",
1241 r
1242 );
1243 }
1244
1245 #[tokio::test]
1246 #[cfg(target_os = "linux")]
1247 async fn test_list_root() {
1248 let fs = LocalFileSystem::new();
1249 fs.list_with_delimiter(None).await.unwrap();
1250 }
1251
1252 async fn check_list(integration: &LocalFileSystem, prefix: Option<&Path>, expected: &[&str]) {
1253 let result: Vec<_> = integration.list(prefix).try_collect().await.unwrap();
1254
1255 let mut strings: Vec<_> = result.iter().map(|x| x.location.as_ref()).collect();
1256 strings.sort_unstable();
1257 assert_eq!(&strings, expected)
1258 }
1259
    /// Walks through how the store treats symlinks: links to files and
    /// directories are followed when listing and reading, broken links are
    /// ignored, and deleting through a link only removes what the link path
    /// names.
    #[tokio::test]
    #[cfg(target_family = "unix")]
    async fn test_symlink() {
        let root = TempDir::new().unwrap();
        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();

        // A regular file in a regular sub-directory.
        let subdir = root.path().join("a");
        std::fs::create_dir(&subdir).unwrap();
        let file = subdir.join("file.parquet");
        std::fs::write(file, "test").unwrap();

        check_list(&integration, None, &["a/file.parquet"]).await;
        integration
            .head(&Path::from("a/file.parquet"))
            .await
            .unwrap();

        // A symlink inside the root pointing at a temporary file elsewhere.
        let other = NamedTempFile::new().unwrap();
        std::os::unix::fs::symlink(other.path(), root.path().join("test.parquet")).unwrap();

        // The linked file shows up in listings...
        check_list(&integration, None, &["a/file.parquet", "test.parquet"]).await;

        // ...and can be queried through the link.
        integration.head(&Path::from("test.parquet")).await.unwrap();

        // A symlink `b` pointing at directory `a`: its content is listed under
        // both names.
        std::os::unix::fs::symlink(&subdir, root.path().join("b")).unwrap();
        check_list(
            &integration,
            None,
            &["a/file.parquet", "b/file.parquet", "test.parquet"],
        )
        .await;
        check_list(&integration, Some(&Path::from("b")), &["b/file.parquet"]).await;

        // Files are reachable through the directory link.
        integration
            .head(&Path::from("b/file.parquet"))
            .await
            .unwrap();

        // A symlink `c` whose target `foo.parquet` is never created.
        std::os::unix::fs::symlink(root.path().join("foo.parquet"), root.path().join("c")).unwrap();

        // The broken link does not change the listing.
        check_list(
            &integration,
            None,
            &["a/file.parquet", "b/file.parquet", "test.parquet"],
        )
        .await;

        // Delimited listing reports both directories as prefixes and only the
        // linked file as an object.
        let mut r = integration.list_with_delimiter(None).await.unwrap();
        r.common_prefixes.sort_unstable();
        assert_eq!(r.common_prefixes.len(), 2);
        assert_eq!(r.common_prefixes[0].as_ref(), "a");
        assert_eq!(r.common_prefixes[1].as_ref(), "b");
        assert_eq!(r.objects.len(), 1);
        assert_eq!(r.objects[0].location.as_ref(), "test.parquet");

        let r = integration
            .list_with_delimiter(Some(&Path::from("a")))
            .await
            .unwrap();
        assert_eq!(r.common_prefixes.len(), 0);
        assert_eq!(r.objects.len(), 1);
        assert_eq!(r.objects[0].location.as_ref(), "a/file.parquet");

        // Deleting the file link removes the link but leaves its target.
        integration
            .delete(&Path::from("test.parquet"))
            .await
            .unwrap();
        assert!(other.path().exists());

        check_list(&integration, None, &["a/file.parquet", "b/file.parquet"]).await;

        // Deleting through the directory link removes the shared file, so it
        // disappears under both names.
        integration
            .delete(&Path::from("b/file.parquet"))
            .await
            .unwrap();

        check_list(&integration, None, &[]).await;

        // Writing through the directory link makes the file visible under both
        // names again.
        integration
            .put(&Path::from("b/file.parquet"), vec![0, 1, 2].into())
            .await
            .unwrap();

        check_list(&integration, None, &["a/file.parquet", "b/file.parquet"]).await;
    }
1354
1355 #[tokio::test]
1356 async fn invalid_path() {
1357 let root = TempDir::new().unwrap();
1358 let root = root.path().join("🙀");
1359 std::fs::create_dir(root.clone()).unwrap();
1360
1361 let integration = LocalFileSystem::new_with_prefix(root.clone()).unwrap();
1363
1364 let directory = Path::from("directory");
1365 let object = directory.child("child.txt");
1366 let data = Bytes::from("arbitrary");
1367 integration.put(&object, data.clone().into()).await.unwrap();
1368 integration.head(&object).await.unwrap();
1369 let result = integration.get(&object).await.unwrap();
1370 assert_eq!(result.bytes().await.unwrap(), data);
1371
1372 flatten_list_stream(&integration, None).await.unwrap();
1373 flatten_list_stream(&integration, Some(&directory))
1374 .await
1375 .unwrap();
1376
1377 let result = integration
1378 .list_with_delimiter(Some(&directory))
1379 .await
1380 .unwrap();
1381 assert_eq!(result.objects.len(), 1);
1382 assert!(result.common_prefixes.is_empty());
1383 assert_eq!(result.objects[0].location, object);
1384
1385 let emoji = root.join("💀");
1386 std::fs::write(emoji, "foo").unwrap();
1387
1388 let mut paths = flatten_list_stream(&integration, None).await.unwrap();
1390 paths.sort_unstable();
1391
1392 assert_eq!(
1393 paths,
1394 vec![
1395 Path::parse("directory/child.txt").unwrap(),
1396 Path::parse("💀").unwrap()
1397 ]
1398 );
1399 }
1400
1401 #[tokio::test]
1402 async fn list_hides_incomplete_uploads() {
1403 let root = TempDir::new().unwrap();
1404 let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1405 let location = Path::from("some_file");
1406
1407 let data = PutPayload::from("arbitrary data");
1408 let mut u1 = integration.put_multipart(&location).await.unwrap();
1409 u1.put_part(data.clone()).await.unwrap();
1410
1411 let mut u2 = integration.put_multipart(&location).await.unwrap();
1412 u2.put_part(data).await.unwrap();
1413
1414 let list = flatten_list_stream(&integration, None).await.unwrap();
1415 assert_eq!(list.len(), 0);
1416
1417 assert_eq!(
1418 integration
1419 .list_with_delimiter(None)
1420 .await
1421 .unwrap()
1422 .objects
1423 .len(),
1424 0
1425 );
1426 }
1427
1428 #[tokio::test]
1429 async fn filesystem_filename_with_percent() {
1430 let temp_dir = TempDir::new().unwrap();
1431 let integration = LocalFileSystem::new_with_prefix(temp_dir.path()).unwrap();
1432 let filename = "L%3ABC.parquet";
1433
1434 std::fs::write(temp_dir.path().join(filename), "foo").unwrap();
1435
1436 let res: Vec<_> = integration.list(None).try_collect().await.unwrap();
1437 assert_eq!(res.len(), 1);
1438 assert_eq!(res[0].location.as_ref(), filename);
1439
1440 let res = integration.list_with_delimiter(None).await.unwrap();
1441 assert_eq!(res.objects.len(), 1);
1442 assert_eq!(res.objects[0].location.as_ref(), filename);
1443 }
1444
1445 #[tokio::test]
1446 async fn relative_paths() {
1447 LocalFileSystem::new_with_prefix(".").unwrap();
1448 LocalFileSystem::new_with_prefix("..").unwrap();
1449 LocalFileSystem::new_with_prefix("../..").unwrap();
1450
1451 let integration = LocalFileSystem::new();
1452 let path = Path::from_filesystem_path(".").unwrap();
1453 integration.list_with_delimiter(Some(&path)).await.unwrap();
1454 }
1455
/// Table-driven check of `is_valid_file_path`.
///
/// Per the cases below, a path is rejected when its final segment ends in `#`
/// followed by digits. A `#<digits>` earlier in the path, or a `#` followed by
/// something other than digits, is accepted.
#[test]
fn test_valid_path() {
    const CASES: [(&str, bool); 5] = [
        ("foo#123/test.txt", true),
        ("foo#123/test#23.txt", true),
        ("foo#123/test#34", false),
        ("foo😁/test#34", false),
        ("foo/test#😁34", true),
    ];

    for (raw, want) in CASES {
        let parsed = Path::parse(raw).unwrap();
        assert_eq!(is_valid_file_path(&parsed), want);
    }
}
1471
1472 #[tokio::test]
1473 async fn test_intermediate_files() {
1474 let root = TempDir::new().unwrap();
1475 let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1476
1477 let a = Path::parse("foo#123/test.txt").unwrap();
1478 integration.put(&a, "test".into()).await.unwrap();
1479
1480 let list = flatten_list_stream(&integration, None).await.unwrap();
1481 assert_eq!(list, vec![a.clone()]);
1482
1483 std::fs::write(root.path().join("bar#123"), "test").unwrap();
1484
1485 let list = flatten_list_stream(&integration, None).await.unwrap();
1487 assert_eq!(list, vec![a.clone()]);
1488
1489 let b = Path::parse("bar#123").unwrap();
1490 let err = integration.get(&b).await.unwrap_err().to_string();
1491 assert_eq!(err, "Generic LocalFileSystem error: Filenames containing trailing '/#\\d+/' are not supported: bar#123");
1492
1493 let c = Path::parse("foo#123.txt").unwrap();
1494 integration.put(&c, "test".into()).await.unwrap();
1495
1496 let mut list = flatten_list_stream(&integration, None).await.unwrap();
1497 list.sort_unstable();
1498 assert_eq!(list, vec![c, a]);
1499 }
1500
/// On Windows, an object written as `file:name.parquet` is listed as
/// `file%3Aname.parquet` and can still be read back via the original location.
#[tokio::test]
#[cfg(target_os = "windows")]
async fn filesystem_filename_with_colon() {
    let root = TempDir::new().unwrap();
    let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
    let listed_as = Path::parse("file%3Aname.parquet").unwrap();
    let location = Path::parse("file:name.parquet").unwrap();

    integration.put(&location, "test".into()).await.unwrap();

    let listing = flatten_list_stream(&integration, None).await.unwrap();
    assert_eq!(listing, vec![listed_as]);

    let contents = integration
        .get(&location)
        .await
        .unwrap()
        .bytes()
        .await
        .unwrap();
    assert_eq!(contents, Bytes::from("test"));
}
1522
1523 #[tokio::test]
1524 async fn delete_dirs_automatically() {
1525 let root = TempDir::new().unwrap();
1526 let integration = LocalFileSystem::new_with_prefix(root.path())
1527 .unwrap()
1528 .with_automatic_cleanup(true);
1529 let location = Path::from("nested/file/test_file");
1530 let data = Bytes::from("arbitrary data");
1531
1532 integration
1533 .put(&location, data.clone().into())
1534 .await
1535 .unwrap();
1536
1537 let read_data = integration
1538 .get(&location)
1539 .await
1540 .unwrap()
1541 .bytes()
1542 .await
1543 .unwrap();
1544
1545 assert_eq!(&*read_data, data);
1546 assert!(fs::read_dir(root.path()).unwrap().count() > 0);
1547 integration.delete(&location).await.unwrap();
1548 assert!(fs::read_dir(root.path()).unwrap().count() == 0);
1549 }
1550}
1551
#[cfg(not(target_arch = "wasm32"))]
#[cfg(test)]
mod not_wasm_tests {
    use std::time::Duration;
    use tempfile::TempDir;

    use crate::local::LocalFileSystem;
    use crate::{ObjectStore, Path, PutPayload};

    /// Number of entries directly inside `dir`.
    fn entry_count(dir: &std::path::Path) -> usize {
        std::fs::read_dir(dir).unwrap().count()
    }

    /// Dropping a multipart upload that has written a part must eventually
    /// remove the file it left under the store root.
    #[tokio::test]
    async fn test_cleanup_intermediate_files() {
        let root = TempDir::new().unwrap();
        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();

        let location = Path::from("some_file");
        let data = PutPayload::from_static(b"hello");
        let mut upload = integration.put_multipart(&location).await.unwrap();
        upload.put_part(data).await.unwrap();

        // Exactly one file exists while the upload is in progress.
        assert_eq!(entry_count(root.path()), 1);
        drop(upload);

        // The test does not assume the cleanup has finished by the time
        // `drop` returns, so poll for up to 100 one-millisecond ticks.
        for _tick in 0..100 {
            tokio::time::sleep(Duration::from_millis(1)).await;
            if entry_count(root.path()) == 0 {
                return;
            }
        }
        panic!("Failed to cleanup file in 100ms")
    }
}
1585
#[cfg(target_family = "unix")]
#[cfg(test)]
mod unix_test {
    use std::fs::OpenOptions;

    use nix::sys::stat;
    use nix::unistd;
    use tempfile::TempDir;

    use crate::local::LocalFileSystem;
    use crate::{ObjectStore, Path};

    /// `head` and `get` must both succeed when the object is a named pipe.
    #[tokio::test]
    async fn test_fifo() {
        let filename = "some_file";
        let root = TempDir::new().unwrap();
        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();

        let fifo_path = root.path().join(filename);
        unistd::mkfifo(&fifo_path, stat::Mode::S_IRWXU).unwrap();

        // Open the write side on a blocking thread so it runs in parallel
        // with the store calls below.
        // NOTE(review): presumably needed because opening one end of a FIFO
        // blocks until the other end is opened — confirm against fifo(7).
        let writer = tokio::task::spawn_blocking(move || {
            OpenOptions::new().write(true).open(fifo_path).unwrap()
        });

        let location = Path::from(filename);
        integration.head(&location).await.unwrap();
        integration.get(&location).await.unwrap();

        // Surface any panic raised on the writer thread.
        writer.await.unwrap();
    }
}