1use futures::StreamExt;
4use futures::stream::BoxStream;
5use std::fs::FileType;
6use std::os::unix::ffi::OsStringExt;
7use std::os::unix::fs::MetadataExt;
8use std::os::unix::fs::PermissionsExt;
9use tokio::io::BufReader;
10use tokio_util::io::InspectReader;
11use tracing::Instrument;
12use tracing::Span;
13use tracing::info_span;
14use tracing::instrument;
15use tracing_indicatif::span_ext::IndicatifSpanExt;
16use walkdir::DirEntry;
17use walkdir::WalkDir;
18
19use crate::blobservice::BlobService;
20use crate::directoryservice::DirectoryService;
21use crate::refscan::{ReferenceReader, ReferenceScanner};
22use crate::{B3Digest, Node};
23
24use super::IngestionEntry;
25use super::IngestionError;
26use super::ingest_entries;
27
28#[instrument(
36 skip(blob_service, directory_service, reference_scanner),
37 fields(path),
38 err
39)]
40pub async fn ingest_path<BS, DS, P, P2>(
41 blob_service: BS,
42 directory_service: DS,
43 path: P,
44 reference_scanner: Option<&ReferenceScanner<P2>>,
45) -> Result<Node, IngestionError<Error>>
46where
47 P: AsRef<std::path::Path> + std::fmt::Debug,
48 BS: BlobService + Clone,
49 DS: DirectoryService,
50 P2: AsRef<[u8]> + Send + Sync,
51{
52 let span = Span::current();
53
54 let iter = WalkDir::new(path.as_ref())
55 .follow_links(false)
56 .follow_root_links(false)
57 .contents_first(true)
58 .into_iter();
59
60 let entries =
61 dir_entries_to_ingestion_stream(blob_service, iter, path.as_ref(), reference_scanner);
62 ingest_entries(
63 directory_service,
64 entries.inspect({
65 let span = span.clone();
66 move |e| {
67 if e.is_ok() {
68 span.pb_inc(1)
69 }
70 }
71 }),
72 )
73 .await
74}
75
76pub fn dir_entries_to_ingestion_stream<'a, BS, I, P>(
84 blob_service: BS,
85 iter: I,
86 root: &'a std::path::Path,
87 reference_scanner: Option<&'a ReferenceScanner<P>>,
88) -> BoxStream<'a, Result<IngestionEntry, Error>>
89where
90 BS: BlobService + Clone + 'a,
91 I: Iterator<Item = Result<DirEntry, walkdir::Error>> + Send + 'a,
92 P: AsRef<[u8]> + Send + Sync,
93{
94 let prefix = root.parent().unwrap_or_else(|| std::path::Path::new(""));
95
96 Box::pin(
97 futures::stream::iter(iter)
98 .map(move |x| {
99 let blob_service = blob_service.clone();
100 async move {
101 match x {
102 Ok(dir_entry) => {
103 dir_entry_to_ingestion_entry(
104 blob_service,
105 &dir_entry,
106 prefix,
107 reference_scanner,
108 )
109 .await
110 }
111 Err(e) => Err(Error::Stat(
112 prefix.to_path_buf(),
113 e.into_io_error().expect("walkdir err must be some"),
114 )),
115 }
116 }
117 })
118 .buffered(50),
119 )
120}
121
122pub async fn dir_entry_to_ingestion_entry<BS, P>(
128 blob_service: BS,
129 entry: &DirEntry,
130 prefix: &std::path::Path,
131 reference_scanner: Option<&ReferenceScanner<P>>,
132) -> Result<IngestionEntry, Error>
133where
134 BS: BlobService,
135 P: AsRef<[u8]>,
136{
137 let file_type = entry.file_type();
138
139 let fs_path = entry
140 .path()
141 .strip_prefix(prefix)
142 .expect("Snix bug: failed to strip root path prefix");
143
144 let path = crate::path::PathBuf::from_host_path(fs_path, false)
146 .unwrap_or_else(|e| panic!("Snix bug: walkdir direntry cannot be parsed: {}", e));
147
148 if file_type.is_dir() {
149 Ok(IngestionEntry::Dir { path })
150 } else if file_type.is_symlink() {
151 let target = std::fs::read_link(entry.path())
152 .map_err(|e| Error::Stat(entry.path().to_path_buf(), e))?
153 .into_os_string()
154 .into_vec();
155
156 if let Some(reference_scanner) = &reference_scanner {
157 reference_scanner.scan(&target);
158 }
159
160 Ok(IngestionEntry::Symlink { path, target })
161 } else if file_type.is_file() {
162 let metadata = entry
163 .metadata()
164 .map_err(|e| Error::Stat(entry.path().to_path_buf(), e.into()))?;
165
166 let digest = upload_blob(blob_service, entry.path().to_path_buf(), reference_scanner)
167 .instrument({
168 let span = info_span!("upload_blob", "indicatif.pb_show" = tracing::field::Empty);
169 span.pb_set_message(&format!("Uploading blob for {:?}", fs_path));
170 span.pb_set_style(&snix_tracing::PB_TRANSFER_STYLE);
171
172 span
173 })
174 .await?;
175
176 Ok(IngestionEntry::Regular {
177 path,
178 size: metadata.size(),
179 executable: metadata.permissions().mode() & 64 != 0,
182 digest,
183 })
184 } else {
185 return Err(Error::FileType(fs_path.to_path_buf(), file_type));
186 }
187}
188
189#[instrument(skip(blob_service, reference_scanner), fields(path), err)]
191async fn upload_blob<BS, P>(
192 blob_service: BS,
193 path: impl AsRef<std::path::Path>,
194 reference_scanner: Option<&ReferenceScanner<P>>,
195) -> Result<B3Digest, Error>
196where
197 BS: BlobService,
198 P: AsRef<[u8]>,
199{
200 let span = Span::current();
201 span.pb_start();
202
203 let file = tokio::fs::File::open(path.as_ref())
204 .await
205 .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?;
206
207 let metadata = file
208 .metadata()
209 .await
210 .map_err(|e| Error::Stat(path.as_ref().to_path_buf(), e))?;
211
212 span.pb_set_length(metadata.len());
213 let reader = InspectReader::new(file, |d| {
214 span.pb_inc(d.len() as u64);
215 });
216
217 let mut writer = blob_service.open_write().await;
218 if let Some(reference_scanner) = reference_scanner {
219 let mut reader = ReferenceReader::new(reference_scanner, BufReader::new(reader));
220 tokio::io::copy(&mut reader, &mut writer)
221 .await
222 .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?;
223 } else {
224 tokio::io::copy(&mut BufReader::new(reader), &mut writer)
225 .await
226 .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?;
227 }
228
229 let digest = writer
230 .close()
231 .await
232 .map_err(|e| Error::BlobFinalize(path.as_ref().to_path_buf(), e))?;
233
234 Ok(digest)
235}
236
/// Errors that can occur while ingesting from the filesystem.
///
/// Every variant carries the filesystem path the error relates to.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// The entry is neither a directory, a symlink nor a regular file.
    #[error("unsupported file type at {0}: {1:?}")]
    FileType(std::path::PathBuf, FileType),

    /// Walking the filesystem, reading a symlink target or querying file
    /// metadata failed.
    #[error("unable to stat {0}: {1}")]
    Stat(std::path::PathBuf, std::io::Error),

    /// The file could not be opened.
    #[error("unable to open {0}: {1}")]
    Open(std::path::PathBuf, std::io::Error),

    /// Reading the contents of the file for upload failed.
    #[error("unable to read {0}: {1}")]
    BlobRead(std::path::PathBuf, std::io::Error),

    /// Closing the blob writer after the upload failed.
    #[error("unable to finalize blob {0}: {1}")]
    BlobFinalize(std::path::PathBuf, std::io::Error),
}