use std::{
    cmp::min,
    io,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};

use super::RenderError;

use bytes::{BufMut, Bytes};

use nix_compat::nar::writer::sync as nar_writer;
use snix_castore::Directory;
use snix_castore::blobservice::{BlobReader, BlobService};
use snix_castore::directoryservice::{
    DirectoryGraph, DirectoryService, RootToLeavesValidator, ValidatedDirectoryGraph,
};
use snix_castore::{B3Digest, Node};

use futures::FutureExt;
use futures::TryStreamExt;
use futures::future::{BoxFuture, FusedFuture, TryMaybeDone};

use tokio::io::AsyncSeekExt;

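// How this works: the NAR framing (header, directory/symlink entries, padding,
// closing tokens) is rendered eagerly at construction time and kept in memory
// as `Data::Literal` segments. File contents are never copied; they are
// recorded as `Data::Blob` segments referencing the blob by digest. Reading
// concatenates the segments in order, fetching blob data on demand from the
// `BlobService`, and seeking maps an absolute NAR offset back to a
// (segment, offset-in-segment) pair, which makes the rendered NAR cheap to
// serve with random access.
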
/// Reference to a blob stored in the blob service, with its expected size.
#[derive(Debug)]
struct BlobRef {
    digest: B3Digest,
    size: u64,
}

/// A single segment of the NAR stream: either literal bytes produced by the
/// NAR writer, or the contents of a blob that is fetched lazily on read.
#[derive(Debug)]
enum Data {
    Literal(Bytes),
    Blob(BlobRef),
}

impl Data {
    pub fn len(&self) -> u64 {
        match self {
            Data::Literal(data) => data.len() as u64,
            Data::Blob(BlobRef { size, .. }) => *size,
        }
    }
}

pub struct Reader<B: BlobService> {
    /// All segments of the NAR, each paired with its absolute byte offset.
    segments: Vec<(u64, Data)>,
    /// Current position in the NAR stream, in bytes.
    position_bytes: u64,
    /// Index into `segments` of the segment containing `position_bytes`.
    position_index: usize,
    blob_service: Arc<B>,
    /// Set by `start_seek`, cleared once the pending seek has completed.
    seeking: bool,
    /// Future resolving to the `BlobReader` for the current blob segment, if any.
    current_blob: TryMaybeDone<BoxFuture<'static, io::Result<Box<dyn BlobReader>>>>,
}

/// Pushes the literal NAR bytes buffered in `cur_segment` as a new segment
/// and advances `offset` by its length.
fn flush_segment(segments: &mut Vec<(u64, Data)>, offset: &mut u64, cur_segment: Vec<u8>) {
    let segment_size = cur_segment.len();
    segments.push((*offset, Data::Literal(cur_segment.into())));
    *offset += segment_size as u64;
}

/// Recursively walks the castore graph rooted at `node`, writing NAR framing
/// into `segments` as literal data and recording file contents as blob references.
fn walk_node(
    segments: &mut Vec<(u64, Data)>,
    offset: &mut u64,
    get_directory: &impl Fn(&B3Digest) -> Directory,
    node: Node,
    nar_node: nar_writer::Node<'_, Vec<u8>>,
) -> Result<(), RenderError> {
    match node {
        snix_castore::Node::Symlink { target } => {
            nar_node
                .symlink(target.as_ref())
                .map_err(RenderError::NARWriterError)?;
        }
        snix_castore::Node::File {
            digest,
            size,
            executable,
        } => {
            let (cur_segment, skip) = nar_node
                .file_manual_write(executable, size)
                .map_err(RenderError::NARWriterError)?;

            // Flush the framing accumulated so far, then record the file
            // contents as a blob segment without copying them.
            flush_segment(segments, offset, std::mem::take(cur_segment));

            segments.push((*offset, Data::Blob(BlobRef { digest, size })));
            *offset += size;

            // Close the file node in the NAR writer, so any padding and
            // closing tokens land in the (now empty) literal buffer.
            skip.close(cur_segment)
                .map_err(RenderError::NARWriterError)?;
        }
        snix_castore::Node::Directory { digest, .. } => {
            let directory = get_directory(&digest);

            let mut nar_node_directory =
                nar_node.directory().map_err(RenderError::NARWriterError)?;

            for (name, node) in directory.nodes() {
                let child_node = nar_node_directory
                    .entry(name.as_ref())
                    .map_err(RenderError::NARWriterError)?;

                walk_node(segments, offset, get_directory, node.clone(), child_node)?;
            }

            nar_node_directory
                .close()
                .map_err(RenderError::NARWriterError)?;
        }
    }
    Ok(())
}

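// As an illustration (segment boundaries shown symbolically, not exact byte
// counts), a root node that is a single file of `size` bytes ends up as three
// segments:
//
//   (0,                  Data::Literal(..))            // NAR header + file preamble
//   (header_len,         Data::Blob { digest, size })  // raw contents, fetched lazily
//   (header_len + size,  Data::Literal(..))            // padding + closing tokens
//
// Directory and symlink nodes only ever contribute literal segments.
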
impl<B: BlobService + 'static> Reader<B> {
    /// Creates a new seekable NAR reader for `root_node`, fetching and
    /// validating the directory closure from `directory_service` up front.
    pub async fn new(
        root_node: Node,
        blob_service: B,
        directory_service: impl DirectoryService,
    ) -> Result<Self, RenderError> {
        let maybe_directory_closure = match &root_node {
            Node::Directory { digest, .. } => {
                // Fetch the whole directory closure, ordered root to leaves.
                let mut closure = DirectoryGraph::with_order(
                    RootToLeavesValidator::new_with_root_digest(digest.clone()),
                );
                let mut stream = directory_service.get_recursive(digest);
                while let Some(dir) = stream
                    .try_next()
                    .await
                    .map_err(|e| RenderError::StoreError(e.into()))?
                {
                    closure.add(dir).map_err(|e| {
                        RenderError::StoreError(
                            snix_castore::Error::StorageError(e.to_string()).into(),
                        )
                    })?;
                }
                Some(closure.validate().map_err(|e| {
                    RenderError::StoreError(snix_castore::Error::StorageError(e.to_string()).into())
                })?)
            }
            // Files and symlinks don't reference any directories.
            Node::File { .. } => None,
            Node::Symlink { .. } => None,
        };

        Self::new_with_directory_closure(root_node, blob_service, maybe_directory_closure)
    }

    /// Creates a new seekable NAR reader from an already fetched and validated
    /// directory closure, pre-rendering all NAR framing into literal segments.
    pub fn new_with_directory_closure(
        root_node: Node,
        blob_service: B,
        directory_closure: Option<ValidatedDirectoryGraph>,
    ) -> Result<Self, RenderError> {
        let directories = directory_closure
            .map(|directory_closure| {
                // Keep the directories sorted by digest, so lookups during the
                // walk below can use binary search.
                let mut directories: Vec<(B3Digest, Directory)> = vec![];
                for dir in directory_closure.drain_root_to_leaves() {
                    let digest = dir.digest();
                    let pos = directories
                        .binary_search_by_key(&digest.as_slice(), |(digest, _dir)| {
                            digest.as_slice()
                        })
                        .expect_err("duplicate directory");
                    directories.insert(pos, (digest, dir));
                }
                directories
            })
            .unwrap_or_default();

        let mut segments = vec![];
        let mut cur_segment: Vec<u8> = vec![];
        let mut offset = 0;

        let nar_node = nar_writer::open(&mut cur_segment).map_err(RenderError::NARWriterError)?;

        walk_node(
            &mut segments,
            &mut offset,
            &|digest| {
                directories
                    .binary_search_by_key(&digest.as_slice(), |(digest, _dir)| digest.as_slice())
                    .map(|pos| directories[pos].clone())
                    .expect("missing directory")
                    .1
            },
            root_node,
            nar_node,
        )?;
        // Flush whatever framing is left after the walk (padding, closing tokens).
        flush_segment(&mut segments, &mut offset, std::mem::take(&mut cur_segment));

        Ok(Reader {
            segments,
            position_bytes: 0,
            position_index: 0,
            blob_service: blob_service.into(),
            seeking: false,
            current_blob: TryMaybeDone::Gone,
        })
    }

    /// Returns the total length of the NAR stream, in bytes.
    pub fn stream_len(&self) -> u64 {
        self.segments
            .last()
            .map(|&(off, ref data)| off + data.len())
            .expect("no segment found")
    }
}

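// Usage sketch (illustrative only; `root_node`, `blob_service` and
// `directory_service` are assumed to be provided by the caller, and
// `AsyncSeekExt`/`AsyncReadExt` must be in scope):
//
//     let mut r = Reader::new(root_node, blob_service, directory_service).await?;
//     let len = r.stream_len();                        // total NAR size in bytes
//     r.seek(io::SeekFrom::Start(len / 2)).await?;     // random access into the NAR
//     let mut buf = [0u8; 1024];
//     let n = r.read(&mut buf).await?;                 // NAR bytes from the middle
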
impl<B: BlobService + 'static> tokio::io::AsyncSeek for Reader<B> {
    fn start_seek(mut self: Pin<&mut Self>, pos: io::SeekFrom) -> io::Result<()> {
        let stream_len = Reader::stream_len(&self);

        let this = &mut *self;
        if this.seeking {
            return Err(io::Error::new(io::ErrorKind::Other, "Already seeking"));
        }
        this.seeking = true;

        // Resolve the requested position to an absolute offset.
        let pos = match pos {
            io::SeekFrom::Start(n) => n,
            io::SeekFrom::End(n) => (stream_len as i64 + n) as u64,
            io::SeekFrom::Current(n) => (this.position_bytes as i64 + n) as u64,
        };

        let prev_position_bytes = this.position_bytes;
        let prev_position_index = this.position_index;

        // Clamp to the end of the stream and locate the segment containing the
        // new position.
        this.position_bytes = min(pos, stream_len);
        this.position_index = match this
            .segments
            .binary_search_by_key(&this.position_bytes, |&(off, _)| off)
        {
            Ok(idx) => idx,
            Err(idx) => idx - 1,
        };

        let Some((offset, Data::Blob(BlobRef { digest, .. }))) =
            this.segments.get(this.position_index)
        else {
            // The new position is in a literal segment (or past the end), so no
            // blob reader is needed.
            this.current_blob = TryMaybeDone::Gone;
            return Ok(());
        };
        let offset_in_segment = this.position_bytes - offset;

        if prev_position_bytes == this.position_bytes {
            // Position unchanged; keep the current blob reader as-is.
        } else if prev_position_index == this.position_index {
            // Same blob segment: seek the already opened blob reader.
            let mut prev = std::mem::replace(&mut this.current_blob, TryMaybeDone::Gone);
            this.current_blob = futures::future::try_maybe_done(
                (async move {
                    let mut reader = Pin::new(&mut prev).take_output().unwrap();
                    reader.seek(io::SeekFrom::Start(offset_in_segment)).await?;
                    Ok(reader)
                })
                .boxed(),
            );
        } else {
            // Different blob segment: open a new blob reader and seek it to the
            // right offset within the blob.
            let blob_service = this.blob_service.clone();
            let digest = digest.clone();
            this.current_blob = futures::future::try_maybe_done(
                (async move {
                    let mut reader =
                        blob_service
                            .open_read(&digest)
                            .await?
                            .ok_or(io::Error::new(
                                io::ErrorKind::NotFound,
                                RenderError::BlobNotFound(digest.clone(), Default::default()),
                            ))?;
                    if offset_in_segment != 0 {
                        reader.seek(io::SeekFrom::Start(offset_in_segment)).await?;
                    }
                    Ok(reader)
                })
                .boxed(),
            );
        };

        Ok(())
    }

    fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<u64>> {
        let this = &mut *self;

        // Drive the pending blob open/seek future to completion before
        // reporting the new position.
        if !this.current_blob.is_terminated() {
            futures::ready!(this.current_blob.poll_unpin(cx))?;
        }
        this.seeking = false;

        Poll::Ready(Ok(this.position_bytes))
    }
}

impl<B: BlobService + 'static> tokio::io::AsyncRead for Reader<B> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &mut tokio::io::ReadBuf,
    ) -> Poll<io::Result<()>> {
        let this = &mut *self;

        let Some(&(offset, ref segment)) = this.segments.get(this.position_index) else {
            // Past the last segment: signal EOF by not filling the buffer.
            return Poll::Ready(Ok(()));
        };

        let prev_read_buf_pos = buf.filled().len();
        match segment {
            Data::Literal(data) => {
                // Copy directly out of the pre-rendered NAR framing.
                let offset_in_segment = this.position_bytes - offset;
                let offset_in_segment = usize::try_from(offset_in_segment).unwrap();
                let remaining_data = data.len() - offset_in_segment;
                let read_size = std::cmp::min(remaining_data, buf.remaining());
                buf.put(&data[offset_in_segment..offset_in_segment + read_size]);
            }
            Data::Blob(BlobRef { size, .. }) => {
                // Wait for the blob reader (opened by a seek or by a previous
                // read) to become available, then read from it.
                futures::ready!(this.current_blob.poll_unpin(cx))?;
                this.seeking = false;
                let blob = Pin::new(&mut this.current_blob)
                    .output_mut()
                    .expect("missing blob");
                futures::ready!(Pin::new(blob).poll_read(cx, buf))?;
                let read_length = buf.filled().len() - prev_read_buf_pos;
                let maximum_expected_read_length = (offset + size) - this.position_bytes;
                let is_eof = read_length == 0;
                let too_much_returned = read_length as u64 > maximum_expected_read_length;
                match (is_eof, too_much_returned) {
                    // The blob ended before the size recorded in the node.
                    (true, false) => {
                        return Poll::Ready(Err(io::Error::new(
                            io::ErrorKind::UnexpectedEof,
                            "blob short read",
                        )));
                    }
                    // The blob yielded more data than the node claims it contains.
                    (false, true) => {
                        buf.set_filled(prev_read_buf_pos);
                        return Poll::Ready(Err(io::Error::new(
                            io::ErrorKind::InvalidInput,
                            "blob continued to yield data beyond end",
                        )));
                    }
                    _ => {}
                }
            }
        };
        let new_read_buf_pos = buf.filled().len();
        this.position_bytes += (new_read_buf_pos - prev_read_buf_pos) as u64;

        // Advance past all segments that are now fully consumed.
        let prev_position_index = this.position_index;
        while {
            if let Some(&(offset, ref segment)) = this.segments.get(this.position_index) {
                (this.position_bytes - offset) >= segment.len()
            } else {
                false
            }
        } {
            this.position_index += 1;
        }
        if prev_position_index != this.position_index {
            let Some((_offset, Data::Blob(BlobRef { digest, .. }))) =
                this.segments.get(this.position_index)
            else {
                // The next segment is literal data (or we reached the end);
                // drop any previously opened blob reader.
                this.current_blob = TryMaybeDone::Gone;
                return Poll::Ready(Ok(()));
            };

            // The next segment is a blob: start opening its reader so the next
            // poll_read can continue from offset 0 within it.
            let blob_service = this.blob_service.clone();
            let digest = digest.clone();
            this.current_blob = futures::future::try_maybe_done(
                (async move {
                    let reader = blob_service
                        .open_read(&digest)
                        .await?
                        .ok_or(io::Error::new(
                            io::ErrorKind::NotFound,
                            RenderError::BlobNotFound(digest.clone(), Default::default()),
                        ))?;
                    Ok(reader)
                })
                .boxed(),
            );
        }

        Poll::Ready(Ok(()))
    }
}