snix_castore/import/
fs.rs

1//! Import from a real filesystem.
2
3use futures::StreamExt;
4use futures::stream::BoxStream;
5use std::fs::FileType;
6use std::os::unix::ffi::OsStringExt;
7use std::os::unix::fs::MetadataExt;
8use std::os::unix::fs::PermissionsExt;
9use tokio::io::BufReader;
10use tokio_util::io::InspectReader;
11use tracing::Instrument;
12use tracing::info;
13use tracing::{info_span, instrument};
14use tracing_indicatif::span_ext::IndicatifSpanExt;
15use walkdir::DirEntry;
16use walkdir::WalkDir;
17
18use crate::blobservice::BlobService;
19use crate::directoryservice::DirectoryService;
20use crate::refscan::{ReferenceReader, ReferenceScanner};
21use crate::{B3Digest, Node};
22
23use super::IngestionEntry;
24use super::IngestionError;
25use super::ingest_entries;
26
27/// Ingests the contents at a given path into the snix store, interacting with a [BlobService] and
28/// [DirectoryService]. It returns the root node or an error.
29///
30/// It does not follow symlinks at the root, they will be ingested as actual symlinks.
31///
32/// This function will walk the filesystem using `walkdir` and will consume
33/// `O(#number of entries)` space.
34#[instrument(
35    skip(blob_service, directory_service, reference_scanner),
36    fields(path),
37    err
38)]
39pub async fn ingest_path<BS, DS, P, P2>(
40    blob_service: BS,
41    directory_service: DS,
42    path: P,
43    reference_scanner: Option<&ReferenceScanner<P2>>,
44) -> Result<Node, IngestionError<Error>>
45where
46    P: AsRef<std::path::Path>,
47    BS: BlobService + Clone,
48    DS: DirectoryService,
49    P2: AsRef<[u8]> + Send + Sync,
50{
51    let iter = WalkDir::new(path.as_ref())
52        .follow_links(false)
53        .follow_root_links(false)
54        .contents_first(true)
55        .into_iter();
56
57    ingest_entries(
58        directory_service,
59        dir_entries_to_ingestion_stream(blob_service, iter, path.as_ref(), reference_scanner),
60    )
61    .await
62}
63
64/// Converts an iterator of [walkdir::DirEntry]s into a stream of ingestion entries.
65/// This can then be fed into [ingest_entries] to ingest all the entries into the castore.
66///
67/// The produced stream is buffered, so uploads can happen concurrently.
68///
69/// The root is the [std::path::Path] in the filesystem that is being ingested
70/// into castore.
71pub fn dir_entries_to_ingestion_stream<'a, BS, I, P>(
72    blob_service: BS,
73    walkdir_direntries: I,
74    root: &'a std::path::Path,
75    reference_scanner: Option<&'a ReferenceScanner<P>>,
76) -> BoxStream<'a, Result<IngestionEntry, Error>>
77where
78    BS: BlobService + Clone + 'a,
79    I: Iterator<Item = Result<DirEntry, walkdir::Error>> + Send + 'a,
80    P: AsRef<[u8]> + Send + Sync,
81{
82    let prefix = root.parent().unwrap_or_else(|| std::path::Path::new(""));
83
84    futures::stream::iter(walkdir_direntries)
85        .map(move |x| {
86            let blob_service = blob_service.clone();
87            let span = info_span!("process_walkdir_direntry");
88            async move {
89                match x {
90                    Ok(dir_entry) => {
91                        dir_entry_to_ingestion_entry(
92                            blob_service,
93                            &dir_entry,
94                            prefix,
95                            reference_scanner,
96                        )
97                        .await
98                    }
99                    Err(e) => Err(Error::Stat(
100                        prefix.to_path_buf(),
101                        e.into_io_error().expect("walkdir err must be some"),
102                    )),
103                }
104            }
105            .instrument(span)
106        })
107        .buffered(50)
108        .boxed()
109}
110
111/// Converts a [walkdir::DirEntry] into an [IngestionEntry], uploading blobs to the
112/// provided [BlobService].
113///
114/// The prefix path is stripped from the path of each entry. This is usually the parent path
115/// of the path being ingested so that the last element of the stream only has one component.
116pub async fn dir_entry_to_ingestion_entry<BS, P>(
117    blob_service: BS,
118    walkdir_direntry: &DirEntry,
119    prefix: &std::path::Path,
120    reference_scanner: Option<&ReferenceScanner<P>>,
121) -> Result<IngestionEntry, Error>
122where
123    BS: BlobService,
124    P: AsRef<[u8]>,
125{
126    let file_type = walkdir_direntry.file_type();
127
128    let fs_path = walkdir_direntry
129        .path()
130        .strip_prefix(prefix)
131        .expect("Snix bug: failed to strip root path prefix");
132
133    // convert to castore PathBuf
134    let path = crate::path::PathBuf::from_host_path(fs_path, false)
135        .unwrap_or_else(|e| panic!("Snix bug: walkdir direntry cannot be parsed: {e}"));
136
137    if file_type.is_dir() {
138        Ok(IngestionEntry::Dir { path })
139    } else if file_type.is_symlink() {
140        let target = std::fs::read_link(walkdir_direntry.path())
141            .map_err(|e| Error::Stat(walkdir_direntry.path().to_path_buf(), e))?
142            .into_os_string()
143            .into_vec();
144
145        if let Some(reference_scanner) = &reference_scanner {
146            reference_scanner.scan(&target);
147        }
148
149        Ok(IngestionEntry::Symlink { path, target })
150    } else if file_type.is_file() {
151        let metadata = walkdir_direntry
152            .metadata()
153            .map_err(|e| Error::Stat(walkdir_direntry.path().to_path_buf(), e.into()))?;
154
155        let digest = upload_blob(blob_service, walkdir_direntry.path(), reference_scanner).await?;
156
157        Ok(IngestionEntry::Regular {
158            path,
159            size: metadata.size(),
160            // If it's executable by the user, it'll become executable.
161            // This matches nix's dump() function behaviour.
162            executable: metadata.permissions().mode() & 64 != 0,
163            digest,
164        })
165    } else {
166        Err(Error::FileType(fs_path.to_path_buf(), file_type))
167    }
168}
169
170/// Uploads the file at the provided [std::path::Path] to the [BlobService].
171#[instrument(skip_all, fields(blob.path=%path.as_ref().display()), err)]
172async fn upload_blob<BS, P>(
173    blob_service: BS,
174    path: impl AsRef<std::path::Path>,
175    reference_scanner: Option<&ReferenceScanner<P>>,
176) -> Result<B3Digest, Error>
177where
178    BS: BlobService,
179    P: AsRef<[u8]>,
180{
181    let progress_span = info_span!("upload_blobs", "indicatif.pb_show" = tracing::field::Empty);
182    progress_span.pb_set_style(&snix_tracing::PB_TRANSFER_STYLE);
183    progress_span.pb_start();
184    progress_span.pb_set_message(&format!("Uploading blob at {:?}", path.as_ref()));
185    let _enter = progress_span.enter();
186
187    let file = tokio::fs::File::open(path.as_ref())
188        .await
189        .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?;
190
191    let metadata = file
192        .metadata()
193        .await
194        .map_err(|e| Error::Stat(path.as_ref().to_path_buf(), e))?;
195
196    progress_span.pb_set_length(metadata.len());
197    let reader = InspectReader::new(file, |d| {
198        progress_span.pb_inc(d.len() as u64);
199    });
200
201    let mut writer = blob_service.open_write().await;
202    let mut reader = BufReader::with_capacity(1024 * 1024, reader);
203    if let Some(reference_scanner) = reference_scanner {
204        let mut reader = ReferenceReader::new(reference_scanner, reader);
205        tokio::io::copy(&mut reader, &mut writer)
206            .await
207            .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?;
208    } else {
209        tokio::io::copy(&mut reader, &mut writer)
210            .await
211            .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?;
212    }
213
214    let digest = writer
215        .close()
216        .await
217        .map_err(|e| Error::BlobFinalize(path.as_ref().to_path_buf(), e))?;
218
219    drop(_enter);
220
221    Ok(digest)
222}
223
/// Errors that can occur while importing from the filesystem.
///
/// Every variant carries the filesystem path the error relates to.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// The entry is neither a directory, a symlink nor a regular file.
    /// Carries the [FileType] that was encountered.
    #[error("unsupported file type at {0}: {1:?}")]
    FileType(std::path::PathBuf, FileType),

    /// Retrieving information about a filesystem entry failed (walking the
    /// directory tree, reading a symlink target, or querying file metadata).
    #[error("unable to stat {0}: {1}")]
    Stat(std::path::PathBuf, std::io::Error),

    /// A file could not be opened.
    // NOTE(review): not constructed in the visible part of this file; open
    // failures in `upload_blob` are reported as `BlobRead` — confirm intent.
    #[error("unable to open {0}: {1}")]
    Open(std::path::PathBuf, std::io::Error),

    /// Opening the file for upload or copying its contents to the blob writer failed.
    #[error("unable to read {0}: {1}")]
    BlobRead(std::path::PathBuf, std::io::Error),

    /// Closing the blob writer (which yields the blob digest) failed.
    // TODO: proper error for blob finalize
    #[error("unable to finalize blob {0}: {1}")]
    BlobFinalize(std::path::PathBuf, std::io::Error),
}