1use std::{
2 cmp::min,
3 io,
4 pin::Pin,
5 sync::Arc,
6 task::{Context, Poll},
7};
8
9use super::RenderError;
10
11use bytes::{BufMut, Bytes};
12
13use nix_compat::nar::writer::sync as nar_writer;
14use snix_castore::directoryservice::{DirectoryGraph, DirectoryService};
15use snix_castore::{B3Digest, Node};
16use snix_castore::{Directory, directoryservice::DirectoryOrder};
17use snix_castore::{
18 blobservice::{BlobReader, BlobService},
19 directoryservice::DirectoryGraphBuilder,
20};
21
22use futures::FutureExt;
23use futures::TryStreamExt;
24use futures::future::{BoxFuture, FusedFuture, TryMaybeDone};
25
26use tokio::io::AsyncSeekExt;
27
28#[derive(Debug)]
29struct BlobRef {
30 digest: B3Digest,
31 size: u64,
32}
33
34#[derive(Debug)]
35enum Data {
36 Literal(Bytes),
37 Blob(BlobRef),
38}
39
40impl Data {
41 pub fn len(&self) -> u64 {
42 match self {
43 Data::Literal(data) => data.len() as u64,
44 Data::Blob(BlobRef { size, .. }) => *size,
45 }
46 }
47}
48
49pub struct Reader<B: BlobService> {
50 segments: Vec<(u64, Data)>,
51 position_bytes: u64,
52 position_index: usize,
53 blob_service: Arc<B>,
54 seeking: bool,
55 current_blob: TryMaybeDone<BoxFuture<'static, io::Result<Box<dyn BlobReader>>>>,
56}
57
58fn flush_segment(segments: &mut Vec<(u64, Data)>, offset: &mut u64, cur_segment: Vec<u8>) {
62 let segment_size = cur_segment.len();
63 segments.push((*offset, Data::Literal(cur_segment.into())));
64 *offset += segment_size as u64;
65}
66
67fn walk_node(
71 segments: &mut Vec<(u64, Data)>,
72 offset: &mut u64,
73 get_directory: &impl Fn(&B3Digest) -> Directory,
74 node: Node,
75 nar_node: nar_writer::Node<'_, Vec<u8>>,
77) -> Result<(), RenderError> {
78 match node {
79 snix_castore::Node::Symlink { target } => {
80 nar_node
81 .symlink(target.as_ref())
82 .map_err(RenderError::NARWriterError)?;
83 }
84 snix_castore::Node::File {
85 digest,
86 size,
87 executable,
88 } => {
89 let (cur_segment, skip) = nar_node
90 .file_manual_write(executable, size)
91 .map_err(RenderError::NARWriterError)?;
92
93 flush_segment(segments, offset, std::mem::take(cur_segment));
95
96 segments.push((*offset, Data::Blob(BlobRef { digest, size })));
98 *offset += size;
99
100 skip.close(cur_segment)
106 .map_err(RenderError::NARWriterError)?;
107 }
108 snix_castore::Node::Directory { digest, .. } => {
109 let directory = get_directory(&digest);
110
111 let mut nar_node_directory =
113 nar_node.directory().map_err(RenderError::NARWriterError)?;
114
115 for (name, node) in directory.nodes() {
118 let child_node = nar_node_directory
119 .entry(name.as_ref())
120 .map_err(RenderError::NARWriterError)?;
121
122 walk_node(segments, offset, get_directory, node.clone(), child_node)?;
123 }
124
125 nar_node_directory
127 .close()
128 .map_err(RenderError::NARWriterError)?;
129 }
130 }
131 Ok(())
132}
133
134impl<B: BlobService + 'static> Reader<B> {
135 pub async fn new(
143 root_node: Node,
144 blob_service: B,
145 directory_service: impl DirectoryService,
146 ) -> Result<Self, RenderError> {
147 let maybe_directory_closure = match &root_node {
148 Node::Directory { digest, .. } => {
150 let mut builder =
151 DirectoryGraphBuilder::new_with_insertion_order(DirectoryOrder::RootToLeaves);
152 let mut directories = directory_service.get_recursive(digest);
153 while let Some(dir) = directories
154 .try_next()
155 .await
156 .map_err(|e| RenderError::StoreError(e.into()))?
157 {
158 builder.insert(dir).map_err(|e| {
159 RenderError::StoreError(
160 snix_castore::Error::StorageError(e.to_string()).into(),
161 )
162 })?;
163 }
164
165 Some(builder.build().map_err(|e| {
166 RenderError::StoreError(snix_castore::Error::StorageError(e.to_string()).into())
167 })?)
168 }
169 Node::File { .. } => None,
171 Node::Symlink { .. } => None,
172 };
173
174 Self::new_with_directory_closure(root_node, blob_service, maybe_directory_closure)
175 }
176
177 pub fn new_with_directory_closure(
183 root_node: Node,
184 blob_service: B,
185 directory_closure: Option<DirectoryGraph>,
186 ) -> Result<Self, RenderError> {
187 let directories_sorted = directory_closure
188 .map(|directory_closure| {
189 let mut directories_sorted: Vec<(B3Digest, Directory)> = vec![];
190 for dir in directory_closure.drain(DirectoryOrder::RootToLeaves) {
192 let digest = dir.digest();
193 let pos = directories_sorted
194 .binary_search_by_key(&digest.as_slice(), |(digest, _dir)| {
195 digest.as_slice()
196 })
197 .expect_err("duplicate directory"); directories_sorted.insert(pos, (digest, dir));
199 }
200 directories_sorted
201 })
202 .unwrap_or_default();
203
204 let mut segments = vec![];
205 let mut cur_segment: Vec<u8> = vec![];
206 let mut offset = 0;
207
208 let nar_node = nar_writer::open(&mut cur_segment).map_err(RenderError::NARWriterError)?;
209
210 walk_node(
211 &mut segments,
212 &mut offset,
213 &|digest| {
214 directories_sorted
215 .binary_search_by_key(&digest.as_slice(), |(digest, _dir)| digest.as_slice())
216 .map(|pos| directories_sorted[pos].clone())
217 .expect("missing directory") .1
219 },
220 root_node,
221 nar_node,
222 )?;
223 flush_segment(&mut segments, &mut offset, std::mem::take(&mut cur_segment));
225
226 Ok(Reader {
227 segments,
228 position_bytes: 0,
229 position_index: 0,
230 blob_service: blob_service.into(),
231 seeking: false,
232 current_blob: TryMaybeDone::Gone,
233 })
234 }
235
236 pub fn stream_len(&self) -> u64 {
237 self.segments
238 .last()
239 .map(|&(off, ref data)| off + data.len())
240 .expect("no segment found")
241 }
242}
243
244impl<B: BlobService + 'static> tokio::io::AsyncSeek for Reader<B> {
245 fn start_seek(mut self: Pin<&mut Self>, pos: io::SeekFrom) -> io::Result<()> {
246 let stream_len = Reader::stream_len(&self);
247
248 let this = &mut *self;
249 if this.seeking {
250 return Err(io::Error::other("Already seeking"));
251 }
252 this.seeking = true;
253
254 let pos = match pos {
256 io::SeekFrom::Start(n) => n,
257 io::SeekFrom::End(n) => (stream_len as i64 + n) as u64,
258 io::SeekFrom::Current(n) => (this.position_bytes as i64 + n) as u64,
259 };
260
261 let prev_position_bytes = this.position_bytes;
262 let prev_position_index = this.position_index;
263
264 this.position_bytes = min(pos, stream_len);
265 this.position_index = match this
266 .segments
267 .binary_search_by_key(&this.position_bytes, |&(off, _)| off)
268 {
269 Ok(idx) => idx,
270 Err(idx) => idx - 1,
271 };
272
273 let Some((offset, Data::Blob(BlobRef { digest, .. }))) =
274 this.segments.get(this.position_index)
275 else {
276 this.current_blob = TryMaybeDone::Gone;
278 return Ok(());
279 };
280 let offset_in_segment = this.position_bytes - offset;
281
282 if prev_position_bytes == this.position_bytes {
283 } else if prev_position_index == this.position_index {
285 let mut prev = std::mem::replace(&mut this.current_blob, TryMaybeDone::Gone);
287 this.current_blob = futures::future::try_maybe_done(
288 (async move {
289 let mut reader = Pin::new(&mut prev).take_output().unwrap();
290 reader.seek(io::SeekFrom::Start(offset_in_segment)).await?;
291 Ok(reader)
292 })
293 .boxed(),
294 );
295 } else {
296 let blob_service = this.blob_service.clone();
298 let digest = digest.clone();
299 this.current_blob = futures::future::try_maybe_done(
300 (async move {
301 let mut reader =
302 blob_service
303 .open_read(&digest)
304 .await?
305 .ok_or(io::Error::new(
306 io::ErrorKind::NotFound,
307 RenderError::BlobNotFound(digest.clone(), Default::default()),
308 ))?;
309 if offset_in_segment != 0 {
310 reader.seek(io::SeekFrom::Start(offset_in_segment)).await?;
311 }
312 Ok(reader)
313 })
314 .boxed(),
315 );
316 };
317
318 Ok(())
319 }
320 fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<u64>> {
321 let this = &mut *self;
322
323 if !this.current_blob.is_terminated() {
324 futures::ready!(this.current_blob.poll_unpin(cx))?;
325 }
326 this.seeking = false;
327
328 Poll::Ready(Ok(this.position_bytes))
329 }
330}
331
332impl<B: BlobService + 'static> tokio::io::AsyncRead for Reader<B> {
333 fn poll_read(
334 mut self: Pin<&mut Self>,
335 cx: &mut Context,
336 buf: &mut tokio::io::ReadBuf,
337 ) -> Poll<io::Result<()>> {
338 let this = &mut *self;
339
340 let Some(&(offset, ref segment)) = this.segments.get(this.position_index) else {
341 return Poll::Ready(Ok(())); };
343
344 let prev_read_buf_pos = buf.filled().len();
345 match segment {
346 Data::Literal(data) => {
347 let offset_in_segment = this.position_bytes - offset;
348 let offset_in_segment = usize::try_from(offset_in_segment).unwrap();
349 let remaining_data = data.len() - offset_in_segment;
350 let read_size = std::cmp::min(remaining_data, buf.remaining());
351 buf.put(&data[offset_in_segment..offset_in_segment + read_size]);
352 }
353 Data::Blob(BlobRef { size, .. }) => {
354 futures::ready!(this.current_blob.poll_unpin(cx))?;
355 this.seeking = false;
356 let blob = Pin::new(&mut this.current_blob)
357 .output_mut()
358 .expect("missing blob");
359 futures::ready!(Pin::new(blob).poll_read(cx, buf))?;
360 let read_length = buf.filled().len() - prev_read_buf_pos;
361 let maximum_expected_read_length = (offset + size) - this.position_bytes;
362 let is_eof = read_length == 0;
363 let too_much_returned = read_length as u64 > maximum_expected_read_length;
364 match (is_eof, too_much_returned) {
365 (true, false) => {
366 return Poll::Ready(Err(io::Error::new(
367 io::ErrorKind::UnexpectedEof,
368 "blob short read",
369 )));
370 }
371 (false, true) => {
372 buf.set_filled(prev_read_buf_pos);
373 return Poll::Ready(Err(io::Error::new(
374 io::ErrorKind::InvalidInput,
375 "blob continued to yield data beyond end",
376 )));
377 }
378 _ => {}
379 }
380 }
381 };
382 let new_read_buf_pos = buf.filled().len();
383 this.position_bytes += (new_read_buf_pos - prev_read_buf_pos) as u64;
384
385 let prev_position_index = this.position_index;
386 while {
387 if let Some(&(offset, ref segment)) = this.segments.get(this.position_index) {
388 (this.position_bytes - offset) >= segment.len()
389 } else {
390 false
391 }
392 } {
393 this.position_index += 1;
394 }
395 if prev_position_index != this.position_index {
396 let Some((_offset, Data::Blob(BlobRef { digest, .. }))) =
397 this.segments.get(this.position_index)
398 else {
399 this.current_blob = TryMaybeDone::Gone;
401 return Poll::Ready(Ok(()));
402 };
403
404 let blob_service = this.blob_service.clone();
406 let digest = digest.clone();
407 this.current_blob = futures::future::try_maybe_done(
408 (async move {
409 let reader = blob_service
410 .open_read(&digest)
411 .await?
412 .ok_or(io::Error::new(
413 io::ErrorKind::NotFound,
414 RenderError::BlobNotFound(digest.clone(), Default::default()),
415 ))?;
416 Ok(reader)
417 })
418 .boxed(),
419 );
420 }
421
422 Poll::Ready(Ok(()))
423 }
424}