snix_castore/import/
fs.rs

1//! Import from a real filesystem.
2
3use futures::StreamExt;
4use futures::stream::BoxStream;
5use std::fs::FileType;
6use std::os::unix::ffi::OsStringExt;
7use std::os::unix::fs::MetadataExt;
8use std::os::unix::fs::PermissionsExt;
9use tokio::io::BufReader;
10use tokio_util::io::InspectReader;
11use tracing::Instrument;
12use tracing::Span;
13use tracing::info_span;
14use tracing::instrument;
15use tracing_indicatif::span_ext::IndicatifSpanExt;
16use walkdir::DirEntry;
17use walkdir::WalkDir;
18
19use crate::blobservice::BlobService;
20use crate::directoryservice::DirectoryService;
21use crate::refscan::{ReferenceReader, ReferenceScanner};
22use crate::{B3Digest, Node};
23
24use super::IngestionEntry;
25use super::IngestionError;
26use super::ingest_entries;
27
28/// Ingests the contents at a given path into the snix store, interacting with a [BlobService] and
29/// [DirectoryService]. It returns the root node or an error.
30///
31/// It does not follow symlinks at the root, they will be ingested as actual symlinks.
32///
33/// This function will walk the filesystem using `walkdir` and will consume
34/// `O(#number of entries)` space.
35#[instrument(
36    skip(blob_service, directory_service, reference_scanner),
37    fields(path),
38    err
39)]
40pub async fn ingest_path<BS, DS, P, P2>(
41    blob_service: BS,
42    directory_service: DS,
43    path: P,
44    reference_scanner: Option<&ReferenceScanner<P2>>,
45) -> Result<Node, IngestionError<Error>>
46where
47    P: AsRef<std::path::Path> + std::fmt::Debug,
48    BS: BlobService + Clone,
49    DS: DirectoryService,
50    P2: AsRef<[u8]> + Send + Sync,
51{
52    let span = Span::current();
53
54    let iter = WalkDir::new(path.as_ref())
55        .follow_links(false)
56        .follow_root_links(false)
57        .contents_first(true)
58        .into_iter();
59
60    let entries =
61        dir_entries_to_ingestion_stream(blob_service, iter, path.as_ref(), reference_scanner);
62    ingest_entries(
63        directory_service,
64        entries.inspect({
65            let span = span.clone();
66            move |e| {
67                if e.is_ok() {
68                    span.pb_inc(1)
69                }
70            }
71        }),
72    )
73    .await
74}
75
76/// Converts an iterator of [walkdir::DirEntry]s into a stream of ingestion entries.
77/// This can then be fed into [ingest_entries] to ingest all the entries into the castore.
78///
79/// The produced stream is buffered, so uploads can happen concurrently.
80///
81/// The root is the [std::path::Path] in the filesystem that is being ingested
82/// into castore.
83pub fn dir_entries_to_ingestion_stream<'a, BS, I, P>(
84    blob_service: BS,
85    iter: I,
86    root: &'a std::path::Path,
87    reference_scanner: Option<&'a ReferenceScanner<P>>,
88) -> BoxStream<'a, Result<IngestionEntry, Error>>
89where
90    BS: BlobService + Clone + 'a,
91    I: Iterator<Item = Result<DirEntry, walkdir::Error>> + Send + 'a,
92    P: AsRef<[u8]> + Send + Sync,
93{
94    let prefix = root.parent().unwrap_or_else(|| std::path::Path::new(""));
95
96    Box::pin(
97        futures::stream::iter(iter)
98            .map(move |x| {
99                let blob_service = blob_service.clone();
100                async move {
101                    match x {
102                        Ok(dir_entry) => {
103                            dir_entry_to_ingestion_entry(
104                                blob_service,
105                                &dir_entry,
106                                prefix,
107                                reference_scanner,
108                            )
109                            .await
110                        }
111                        Err(e) => Err(Error::Stat(
112                            prefix.to_path_buf(),
113                            e.into_io_error().expect("walkdir err must be some"),
114                        )),
115                    }
116                }
117            })
118            .buffered(50),
119    )
120}
121
122/// Converts a [walkdir::DirEntry] into an [IngestionEntry], uploading blobs to the
123/// provided [BlobService].
124///
125/// The prefix path is stripped from the path of each entry. This is usually the parent path
126/// of the path being ingested so that the last element of the stream only has one component.
127pub async fn dir_entry_to_ingestion_entry<BS, P>(
128    blob_service: BS,
129    entry: &DirEntry,
130    prefix: &std::path::Path,
131    reference_scanner: Option<&ReferenceScanner<P>>,
132) -> Result<IngestionEntry, Error>
133where
134    BS: BlobService,
135    P: AsRef<[u8]>,
136{
137    let file_type = entry.file_type();
138
139    let fs_path = entry
140        .path()
141        .strip_prefix(prefix)
142        .expect("Snix bug: failed to strip root path prefix");
143
144    // convert to castore PathBuf
145    let path = crate::path::PathBuf::from_host_path(fs_path, false)
146        .unwrap_or_else(|e| panic!("Snix bug: walkdir direntry cannot be parsed: {}", e));
147
148    if file_type.is_dir() {
149        Ok(IngestionEntry::Dir { path })
150    } else if file_type.is_symlink() {
151        let target = std::fs::read_link(entry.path())
152            .map_err(|e| Error::Stat(entry.path().to_path_buf(), e))?
153            .into_os_string()
154            .into_vec();
155
156        if let Some(reference_scanner) = &reference_scanner {
157            reference_scanner.scan(&target);
158        }
159
160        Ok(IngestionEntry::Symlink { path, target })
161    } else if file_type.is_file() {
162        let metadata = entry
163            .metadata()
164            .map_err(|e| Error::Stat(entry.path().to_path_buf(), e.into()))?;
165
166        let digest = upload_blob(blob_service, entry.path().to_path_buf(), reference_scanner)
167            .instrument({
168                let span = info_span!("upload_blob", "indicatif.pb_show" = tracing::field::Empty);
169                span.pb_set_message(&format!("Uploading blob for {:?}", fs_path));
170                span.pb_set_style(&snix_tracing::PB_TRANSFER_STYLE);
171
172                span
173            })
174            .await?;
175
176        Ok(IngestionEntry::Regular {
177            path,
178            size: metadata.size(),
179            // If it's executable by the user, it'll become executable.
180            // This matches nix's dump() function behaviour.
181            executable: metadata.permissions().mode() & 64 != 0,
182            digest,
183        })
184    } else {
185        return Err(Error::FileType(fs_path.to_path_buf(), file_type));
186    }
187}
188
189/// Uploads the file at the provided [std::path::Path] the the [BlobService].
190#[instrument(skip(blob_service, reference_scanner), fields(path), err)]
191async fn upload_blob<BS, P>(
192    blob_service: BS,
193    path: impl AsRef<std::path::Path>,
194    reference_scanner: Option<&ReferenceScanner<P>>,
195) -> Result<B3Digest, Error>
196where
197    BS: BlobService,
198    P: AsRef<[u8]>,
199{
200    let span = Span::current();
201    span.pb_start();
202
203    let file = tokio::fs::File::open(path.as_ref())
204        .await
205        .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?;
206
207    let metadata = file
208        .metadata()
209        .await
210        .map_err(|e| Error::Stat(path.as_ref().to_path_buf(), e))?;
211
212    span.pb_set_length(metadata.len());
213    let reader = InspectReader::new(file, |d| {
214        span.pb_inc(d.len() as u64);
215    });
216
217    let mut writer = blob_service.open_write().await;
218    if let Some(reference_scanner) = reference_scanner {
219        let mut reader = ReferenceReader::new(reference_scanner, BufReader::new(reader));
220        tokio::io::copy(&mut reader, &mut writer)
221            .await
222            .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?;
223    } else {
224        tokio::io::copy(&mut BufReader::new(reader), &mut writer)
225            .await
226            .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?;
227    }
228
229    let digest = writer
230        .close()
231        .await
232        .map_err(|e| Error::BlobFinalize(path.as_ref().to_path_buf(), e))?;
233
234    Ok(digest)
235}
236
/// Errors that can occur while importing from the filesystem.
///
/// Every variant carries the filesystem path the failure relates to.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// The entry at the given path has a file type that cannot be ingested.
    /// The offending [FileType] is included for diagnostics.
    #[error("unsupported file type at {0}: {1:?}")]
    FileType(std::path::PathBuf, FileType),

    /// Querying filesystem information for the given path failed. In this
    /// module that covers fetching metadata, reading a symlink target and
    /// errors reported while walking the directory tree.
    #[error("unable to stat {0}: {1}")]
    Stat(std::path::PathBuf, std::io::Error),

    /// The file at the given path could not be opened.
    #[error("unable to open {0}: {1}")]
    Open(std::path::PathBuf, std::io::Error),

    /// Reading the file at the given path for upload failed.
    #[error("unable to read {0}: {1}")]
    BlobRead(std::path::PathBuf, std::io::Error),

    /// Closing the blob writer after the contents of the file at the given
    /// path were written to it failed.
    // TODO: proper error for blob finalize
    #[error("unable to finalize blob {0}: {1}")]
    BlobFinalize(std::path::PathBuf, std::io::Error),
}