1use futures::StreamExt;
4use futures::stream::BoxStream;
5use std::fs::FileType;
6use std::os::unix::ffi::OsStringExt;
7use std::os::unix::fs::MetadataExt;
8use std::os::unix::fs::PermissionsExt;
9use tokio::io::BufReader;
10use tokio_util::io::InspectReader;
11use tracing::{Instrument, info_span, instrument, trace_span};
12use tracing_indicatif::span_ext::IndicatifSpanExt;
13use walkdir::DirEntry;
14use walkdir::WalkDir;
15
16use crate::blobservice::BlobService;
17use crate::directoryservice::DirectoryService;
18use crate::refscan::{ReferenceReader, ReferenceScanner};
19use crate::{B3Digest, Node};
20
21use super::IngestionEntry;
22use super::IngestionError;
23use super::ingest_entries;
24
25#[instrument(
33 skip(blob_service, directory_service, reference_scanner),
34 fields(path),
35 err
36)]
37pub async fn ingest_path<BS, DS, P, P2>(
38 blob_service: BS,
39 directory_service: DS,
40 path: P,
41 reference_scanner: Option<&ReferenceScanner<P2>>,
42) -> Result<Node, IngestionError<Error>>
43where
44 P: AsRef<std::path::Path>,
45 BS: BlobService + Clone,
46 DS: DirectoryService,
47 P2: AsRef<[u8]> + Send + Sync,
48{
49 let iter = WalkDir::new(path.as_ref())
50 .follow_links(false)
51 .follow_root_links(false)
52 .contents_first(true)
53 .into_iter();
54
55 ingest_entries(
56 directory_service,
57 dir_entries_to_ingestion_stream(blob_service, iter, path.as_ref(), reference_scanner),
58 )
59 .await
60}
61
62pub fn dir_entries_to_ingestion_stream<'a, BS, I, P>(
70 blob_service: BS,
71 walkdir_direntries: I,
72 root: &'a std::path::Path,
73 reference_scanner: Option<&'a ReferenceScanner<P>>,
74) -> BoxStream<'a, Result<IngestionEntry, Error>>
75where
76 BS: BlobService + Clone + 'a,
77 I: Iterator<Item = Result<DirEntry, walkdir::Error>> + Send + 'a,
78 P: AsRef<[u8]> + Send + Sync,
79{
80 let prefix = root.parent().unwrap_or_else(|| std::path::Path::new(""));
81
82 futures::stream::iter(walkdir_direntries)
83 .map(move |x| {
84 let blob_service = blob_service.clone();
85 async move {
86 match x {
87 Ok(dir_entry) => {
88 dir_entry_to_ingestion_entry(
89 blob_service,
90 &dir_entry,
91 prefix,
92 reference_scanner,
93 )
94 .await
95 }
96 Err(e) => Err(Error::Stat(
97 prefix.to_path_buf(),
98 e.into_io_error().expect("walkdir err must be some"),
99 )),
100 }
101 }
102 .instrument(trace_span!("process_walkdir_direntry"))
103 })
104 .buffered(50)
105 .boxed()
106}
107
108pub async fn dir_entry_to_ingestion_entry<BS, P>(
114 blob_service: BS,
115 walkdir_direntry: &DirEntry,
116 prefix: &std::path::Path,
117 reference_scanner: Option<&ReferenceScanner<P>>,
118) -> Result<IngestionEntry, Error>
119where
120 BS: BlobService,
121 P: AsRef<[u8]>,
122{
123 let file_type = walkdir_direntry.file_type();
124
125 let fs_path = walkdir_direntry
126 .path()
127 .strip_prefix(prefix)
128 .expect("Snix bug: failed to strip root path prefix");
129
130 let path = crate::path::PathBuf::from_host_path(fs_path, false)
132 .unwrap_or_else(|e| panic!("Snix bug: walkdir direntry cannot be parsed: {e}"));
133
134 if file_type.is_dir() {
135 Ok(IngestionEntry::Dir { path })
136 } else if file_type.is_symlink() {
137 let target = std::fs::read_link(walkdir_direntry.path())
138 .map_err(|e| Error::Stat(walkdir_direntry.path().to_path_buf(), e))?
139 .into_os_string()
140 .into_vec();
141
142 if let Some(reference_scanner) = &reference_scanner {
143 reference_scanner.scan(&target);
144 }
145
146 Ok(IngestionEntry::Symlink { path, target })
147 } else if file_type.is_file() {
148 let metadata = walkdir_direntry
149 .metadata()
150 .map_err(|e| Error::Stat(walkdir_direntry.path().to_path_buf(), e.into()))?;
151
152 let digest = upload_blob(blob_service, walkdir_direntry.path(), reference_scanner).await?;
153
154 Ok(IngestionEntry::Regular {
155 path,
156 size: metadata.size(),
157 executable: metadata.permissions().mode() & 64 != 0,
160 digest,
161 })
162 } else {
163 Err(Error::FileType(fs_path.to_path_buf(), file_type))
164 }
165}
166
167#[instrument(skip_all, fields(blob.path=%path.as_ref().display()), err)]
169async fn upload_blob<BS, P>(
170 blob_service: BS,
171 path: impl AsRef<std::path::Path>,
172 reference_scanner: Option<&ReferenceScanner<P>>,
173) -> Result<B3Digest, Error>
174where
175 BS: BlobService,
176 P: AsRef<[u8]>,
177{
178 let progress_span = info_span!("upload_blobs", "indicatif.pb_show" = tracing::field::Empty);
179 progress_span.pb_set_style(&snix_tracing::PB_TRANSFER_STYLE);
180 progress_span.pb_start();
181 progress_span.pb_set_message(&format!("Uploading blob at {:?}", path.as_ref()));
182
183 let file = tokio::fs::File::open(path.as_ref())
184 .await
185 .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?;
186
187 let metadata = file
188 .metadata()
189 .await
190 .map_err(|e| Error::Stat(path.as_ref().to_path_buf(), e))?;
191
192 progress_span.pb_set_length(metadata.len());
193 let reader = InspectReader::new(file, |d| {
194 progress_span.pb_inc(d.len() as u64);
195 });
196
197 let mut writer = blob_service.open_write().await;
198 let mut reader = BufReader::with_capacity(128 * 1024, reader);
199 if let Some(reference_scanner) = reference_scanner {
200 let mut reader = ReferenceReader::new(reference_scanner, reader);
201 tokio::io::copy_buf(&mut reader, &mut writer)
202 .await
203 .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?;
204 } else {
205 tokio::io::copy_buf(&mut reader, &mut writer)
206 .await
207 .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?;
208 }
209
210 let digest = writer
211 .close()
212 .await
213 .map_err(|e| Error::BlobFinalize(path.as_ref().to_path_buf(), e))?;
214
215 Ok(digest)
216}
217
/// Errors that can occur while ingesting a path from the local filesystem.
///
/// Every variant carries the host path the failure relates to.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// The entry is neither a directory, a symlink nor a regular file.
    #[error("unsupported file type at {0}: {1:?}")]
    FileType(std::path::PathBuf, FileType),

    /// Inspecting the path failed. This covers walking to it, reading its metadata, or
    /// reading a symlink target.
    #[error("unable to stat {0}: {1}")]
    Stat(std::path::PathBuf, std::io::Error),

    /// Opening the path failed.
    #[error("unable to open {0}: {1}")]
    Open(std::path::PathBuf, std::io::Error),

    /// Accessing the file contents at the path for a blob upload failed.
    #[error("unable to read {0}: {1}")]
    BlobRead(std::path::PathBuf, std::io::Error),

    /// Closing/finalizing the blob writer for the file at the path failed.
    #[error("unable to finalize blob {0}: {1}")]
    BlobFinalize(std::path::PathBuf, std::io::Error),
}