Skip to main content

snix_castore/import/
fs.rs

1//! Import from a real filesystem.
2
3use futures::StreamExt;
4use futures::stream::BoxStream;
5use std::fs::FileType;
6use std::os::unix::ffi::OsStringExt;
7use std::os::unix::fs::MetadataExt;
8use std::os::unix::fs::PermissionsExt;
9use tokio::io::BufReader;
10use tokio_util::io::InspectReader;
11use tracing::{Instrument, info_span, instrument, trace_span};
12use tracing_indicatif::span_ext::IndicatifSpanExt;
13use walkdir::DirEntry;
14use walkdir::WalkDir;
15
16use crate::blobservice::BlobService;
17use crate::directoryservice::DirectoryService;
18use crate::refscan::{ReferenceReader, ReferenceScanner};
19use crate::{B3Digest, Node};
20
21use super::IngestionEntry;
22use super::IngestionError;
23use super::ingest_entries;
24
25/// Ingests the contents at a given path into the snix store, interacting with a [BlobService] and
26/// [DirectoryService]. It returns the root node or an error.
27///
28/// It does not follow symlinks at the root, they will be ingested as actual symlinks.
29///
30/// This function will walk the filesystem using `walkdir` and will consume
31/// `O(#number of entries)` space.
32#[instrument(
33    skip(blob_service, directory_service, reference_scanner),
34    fields(path),
35    err
36)]
37pub async fn ingest_path<BS, DS, P, P2>(
38    blob_service: BS,
39    directory_service: DS,
40    path: P,
41    reference_scanner: Option<&ReferenceScanner<P2>>,
42) -> Result<Node, IngestionError<Error>>
43where
44    P: AsRef<std::path::Path>,
45    BS: BlobService + Clone,
46    DS: DirectoryService,
47    P2: AsRef<[u8]> + Send + Sync,
48{
49    let iter = WalkDir::new(path.as_ref())
50        .follow_links(false)
51        .follow_root_links(false)
52        .contents_first(true)
53        .into_iter();
54
55    ingest_entries(
56        directory_service,
57        dir_entries_to_ingestion_stream(blob_service, iter, path.as_ref(), reference_scanner),
58    )
59    .await
60}
61
62/// Converts an iterator of [walkdir::DirEntry]s into a stream of ingestion entries.
63/// This can then be fed into [ingest_entries] to ingest all the entries into the castore.
64///
65/// The produced stream is buffered, so uploads can happen concurrently.
66///
67/// The root is the [std::path::Path] in the filesystem that is being ingested
68/// into castore.
69pub fn dir_entries_to_ingestion_stream<'a, BS, I, P>(
70    blob_service: BS,
71    walkdir_direntries: I,
72    root: &'a std::path::Path,
73    reference_scanner: Option<&'a ReferenceScanner<P>>,
74) -> BoxStream<'a, Result<IngestionEntry, Error>>
75where
76    BS: BlobService + Clone + 'a,
77    I: Iterator<Item = Result<DirEntry, walkdir::Error>> + Send + 'a,
78    P: AsRef<[u8]> + Send + Sync,
79{
80    let prefix = root.parent().unwrap_or_else(|| std::path::Path::new(""));
81
82    futures::stream::iter(walkdir_direntries)
83        .map(move |x| {
84            let blob_service = blob_service.clone();
85            async move {
86                match x {
87                    Ok(dir_entry) => {
88                        dir_entry_to_ingestion_entry(
89                            blob_service,
90                            &dir_entry,
91                            prefix,
92                            reference_scanner,
93                        )
94                        .await
95                    }
96                    Err(e) => Err(Error::Stat(
97                        prefix.to_path_buf(),
98                        e.into_io_error().expect("walkdir err must be some"),
99                    )),
100                }
101            }
102            .instrument(trace_span!("process_walkdir_direntry"))
103        })
104        .buffered(50)
105        .boxed()
106}
107
108/// Converts a [walkdir::DirEntry] into an [IngestionEntry], uploading blobs to the
109/// provided [BlobService].
110///
111/// The prefix path is stripped from the path of each entry. This is usually the parent path
112/// of the path being ingested so that the last element of the stream only has one component.
113pub async fn dir_entry_to_ingestion_entry<BS, P>(
114    blob_service: BS,
115    walkdir_direntry: &DirEntry,
116    prefix: &std::path::Path,
117    reference_scanner: Option<&ReferenceScanner<P>>,
118) -> Result<IngestionEntry, Error>
119where
120    BS: BlobService,
121    P: AsRef<[u8]>,
122{
123    let file_type = walkdir_direntry.file_type();
124
125    let fs_path = walkdir_direntry
126        .path()
127        .strip_prefix(prefix)
128        .expect("Snix bug: failed to strip root path prefix");
129
130    // convert to castore PathBuf
131    let path = crate::path::PathBuf::from_host_path(fs_path, false)
132        .unwrap_or_else(|e| panic!("Snix bug: walkdir direntry cannot be parsed: {e}"));
133
134    if file_type.is_dir() {
135        Ok(IngestionEntry::Dir { path })
136    } else if file_type.is_symlink() {
137        let target = std::fs::read_link(walkdir_direntry.path())
138            .map_err(|e| Error::Stat(walkdir_direntry.path().to_path_buf(), e))?
139            .into_os_string()
140            .into_vec();
141
142        if let Some(reference_scanner) = &reference_scanner {
143            reference_scanner.scan(&target);
144        }
145
146        Ok(IngestionEntry::Symlink { path, target })
147    } else if file_type.is_file() {
148        let metadata = walkdir_direntry
149            .metadata()
150            .map_err(|e| Error::Stat(walkdir_direntry.path().to_path_buf(), e.into()))?;
151
152        let digest = upload_blob(blob_service, walkdir_direntry.path(), reference_scanner).await?;
153
154        Ok(IngestionEntry::Regular {
155            path,
156            size: metadata.size(),
157            // If it's executable by the user, it'll become executable.
158            // This matches nix's dump() function behaviour.
159            executable: metadata.permissions().mode() & 64 != 0,
160            digest,
161        })
162    } else {
163        Err(Error::FileType(fs_path.to_path_buf(), file_type))
164    }
165}
166
167/// Uploads the file at the provided [std::path::Path] to the [BlobService].
168#[instrument(skip_all, fields(blob.path=%path.as_ref().display()), err)]
169async fn upload_blob<BS, P>(
170    blob_service: BS,
171    path: impl AsRef<std::path::Path>,
172    reference_scanner: Option<&ReferenceScanner<P>>,
173) -> Result<B3Digest, Error>
174where
175    BS: BlobService,
176    P: AsRef<[u8]>,
177{
178    let progress_span = info_span!("upload_blobs", "indicatif.pb_show" = tracing::field::Empty);
179    progress_span.pb_set_style(&snix_tracing::PB_TRANSFER_STYLE);
180    progress_span.pb_start();
181    progress_span.pb_set_message(&format!("Uploading blob at {:?}", path.as_ref()));
182
183    let file = tokio::fs::File::open(path.as_ref())
184        .await
185        .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?;
186
187    let metadata = file
188        .metadata()
189        .await
190        .map_err(|e| Error::Stat(path.as_ref().to_path_buf(), e))?;
191
192    progress_span.pb_set_length(metadata.len());
193    let reader = InspectReader::new(file, |d| {
194        progress_span.pb_inc(d.len() as u64);
195    });
196
197    let mut writer = blob_service.open_write().await;
198    let mut reader = BufReader::with_capacity(128 * 1024, reader);
199    if let Some(reference_scanner) = reference_scanner {
200        let mut reader = ReferenceReader::new(reference_scanner, reader);
201        tokio::io::copy_buf(&mut reader, &mut writer)
202            .await
203            .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?;
204    } else {
205        tokio::io::copy_buf(&mut reader, &mut writer)
206            .await
207            .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?;
208    }
209
210    let digest = writer
211        .close()
212        .await
213        .map_err(|e| Error::BlobFinalize(path.as_ref().to_path_buf(), e))?;
214
215    Ok(digest)
216}
217
/// Errors that can occur while importing from a real filesystem.
///
/// Every variant carries the filesystem path the error relates to as its
/// first field.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// The entry at the path has a file type that cannot be ingested.
    #[error("unsupported file type at {0}: {1:?}")]
    FileType(std::path::PathBuf, FileType),

    /// Retrieving information about the path failed with the given I/O error.
    #[error("unable to stat {0}: {1}")]
    Stat(std::path::PathBuf, std::io::Error),

    /// Opening the path failed with the given I/O error.
    #[error("unable to open {0}: {1}")]
    Open(std::path::PathBuf, std::io::Error),

    /// Reading the contents of the path failed with the given I/O error.
    #[error("unable to read {0}: {1}")]
    BlobRead(std::path::PathBuf, std::io::Error),

    /// Finalizing the blob written for the path failed with the given I/O error.
    // TODO: proper error for blob finalize
    #[error("unable to finalize blob {0}: {1}")]
    BlobFinalize(std::path::PathBuf, std::io::Error),
}