1use futures::StreamExt;
4use futures::stream::BoxStream;
5use std::fs::FileType;
6use std::os::unix::ffi::OsStringExt;
7use std::os::unix::fs::MetadataExt;
8use std::os::unix::fs::PermissionsExt;
9use tokio::io::BufReader;
10use tokio_util::io::InspectReader;
11use tracing::Instrument;
12use tracing::info;
13use tracing::{info_span, instrument};
14use tracing_indicatif::span_ext::IndicatifSpanExt;
15use walkdir::DirEntry;
16use walkdir::WalkDir;
17
18use crate::blobservice::BlobService;
19use crate::directoryservice::DirectoryService;
20use crate::refscan::{ReferenceReader, ReferenceScanner};
21use crate::{B3Digest, Node};
22
23use super::IngestionEntry;
24use super::IngestionError;
25use super::ingest_entries;
26
27#[instrument(
35 skip(blob_service, directory_service, reference_scanner),
36 fields(path),
37 err
38)]
39pub async fn ingest_path<BS, DS, P, P2>(
40 blob_service: BS,
41 directory_service: DS,
42 path: P,
43 reference_scanner: Option<&ReferenceScanner<P2>>,
44) -> Result<Node, IngestionError<Error>>
45where
46 P: AsRef<std::path::Path>,
47 BS: BlobService + Clone,
48 DS: DirectoryService,
49 P2: AsRef<[u8]> + Send + Sync,
50{
51 let iter = WalkDir::new(path.as_ref())
52 .follow_links(false)
53 .follow_root_links(false)
54 .contents_first(true)
55 .into_iter();
56
57 ingest_entries(
58 directory_service,
59 dir_entries_to_ingestion_stream(blob_service, iter, path.as_ref(), reference_scanner),
60 )
61 .await
62}
63
64pub fn dir_entries_to_ingestion_stream<'a, BS, I, P>(
72 blob_service: BS,
73 walkdir_direntries: I,
74 root: &'a std::path::Path,
75 reference_scanner: Option<&'a ReferenceScanner<P>>,
76) -> BoxStream<'a, Result<IngestionEntry, Error>>
77where
78 BS: BlobService + Clone + 'a,
79 I: Iterator<Item = Result<DirEntry, walkdir::Error>> + Send + 'a,
80 P: AsRef<[u8]> + Send + Sync,
81{
82 let prefix = root.parent().unwrap_or_else(|| std::path::Path::new(""));
83
84 futures::stream::iter(walkdir_direntries)
85 .map(move |x| {
86 let blob_service = blob_service.clone();
87 let span = info_span!("process_walkdir_direntry");
88 async move {
89 match x {
90 Ok(dir_entry) => {
91 dir_entry_to_ingestion_entry(
92 blob_service,
93 &dir_entry,
94 prefix,
95 reference_scanner,
96 )
97 .await
98 }
99 Err(e) => Err(Error::Stat(
100 prefix.to_path_buf(),
101 e.into_io_error().expect("walkdir err must be some"),
102 )),
103 }
104 }
105 .instrument(span)
106 })
107 .buffered(50)
108 .boxed()
109}
110
111pub async fn dir_entry_to_ingestion_entry<BS, P>(
117 blob_service: BS,
118 walkdir_direntry: &DirEntry,
119 prefix: &std::path::Path,
120 reference_scanner: Option<&ReferenceScanner<P>>,
121) -> Result<IngestionEntry, Error>
122where
123 BS: BlobService,
124 P: AsRef<[u8]>,
125{
126 let file_type = walkdir_direntry.file_type();
127
128 let fs_path = walkdir_direntry
129 .path()
130 .strip_prefix(prefix)
131 .expect("Snix bug: failed to strip root path prefix");
132
133 let path = crate::path::PathBuf::from_host_path(fs_path, false)
135 .unwrap_or_else(|e| panic!("Snix bug: walkdir direntry cannot be parsed: {e}"));
136
137 if file_type.is_dir() {
138 Ok(IngestionEntry::Dir { path })
139 } else if file_type.is_symlink() {
140 let target = std::fs::read_link(walkdir_direntry.path())
141 .map_err(|e| Error::Stat(walkdir_direntry.path().to_path_buf(), e))?
142 .into_os_string()
143 .into_vec();
144
145 if let Some(reference_scanner) = &reference_scanner {
146 reference_scanner.scan(&target);
147 }
148
149 Ok(IngestionEntry::Symlink { path, target })
150 } else if file_type.is_file() {
151 let metadata = walkdir_direntry
152 .metadata()
153 .map_err(|e| Error::Stat(walkdir_direntry.path().to_path_buf(), e.into()))?;
154
155 let digest = upload_blob(blob_service, walkdir_direntry.path(), reference_scanner).await?;
156
157 Ok(IngestionEntry::Regular {
158 path,
159 size: metadata.size(),
160 executable: metadata.permissions().mode() & 64 != 0,
163 digest,
164 })
165 } else {
166 Err(Error::FileType(fs_path.to_path_buf(), file_type))
167 }
168}
169
170#[instrument(skip_all, fields(blob.path=%path.as_ref().display()), err)]
172async fn upload_blob<BS, P>(
173 blob_service: BS,
174 path: impl AsRef<std::path::Path>,
175 reference_scanner: Option<&ReferenceScanner<P>>,
176) -> Result<B3Digest, Error>
177where
178 BS: BlobService,
179 P: AsRef<[u8]>,
180{
181 let progress_span = info_span!("upload_blobs", "indicatif.pb_show" = tracing::field::Empty);
182 progress_span.pb_set_style(&snix_tracing::PB_TRANSFER_STYLE);
183 progress_span.pb_start();
184 progress_span.pb_set_message(&format!("Uploading blob at {:?}", path.as_ref()));
185 let _enter = progress_span.enter();
186
187 let file = tokio::fs::File::open(path.as_ref())
188 .await
189 .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?;
190
191 let metadata = file
192 .metadata()
193 .await
194 .map_err(|e| Error::Stat(path.as_ref().to_path_buf(), e))?;
195
196 progress_span.pb_set_length(metadata.len());
197 let reader = InspectReader::new(file, |d| {
198 progress_span.pb_inc(d.len() as u64);
199 });
200
201 let mut writer = blob_service.open_write().await;
202 let mut reader = BufReader::with_capacity(1024 * 1024, reader);
203 if let Some(reference_scanner) = reference_scanner {
204 let mut reader = ReferenceReader::new(reference_scanner, reader);
205 tokio::io::copy(&mut reader, &mut writer)
206 .await
207 .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?;
208 } else {
209 tokio::io::copy(&mut reader, &mut writer)
210 .await
211 .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?;
212 }
213
214 let digest = writer
215 .close()
216 .await
217 .map_err(|e| Error::BlobFinalize(path.as_ref().to_path_buf(), e))?;
218
219 drop(_enter);
220
221 Ok(digest)
222}
223
224#[derive(Debug, thiserror::Error)]
225pub enum Error {
226 #[error("unsupported file type at {0}: {1:?}")]
227 FileType(std::path::PathBuf, FileType),
228
229 #[error("unable to stat {0}: {1}")]
230 Stat(std::path::PathBuf, std::io::Error),
231
232 #[error("unable to open {0}: {1}")]
233 Open(std::path::PathBuf, std::io::Error),
234
235 #[error("unable to read {0}: {1}")]
236 BlobRead(std::path::PathBuf, std::io::Error),
237
238 #[error("unable to finalize blob {0}: {1}")]
240 BlobFinalize(std::path::PathBuf, std::io::Error),
241}