1use std::{
2 cmp::min,
3 collections::HashMap,
4 io,
5 pin::Pin,
6 sync::Arc,
7 task::{Context, Poll},
8};
9
10use super::RenderError;
11
12use bytes::{BufMut, Bytes};
13
14use nix_compat::nar::writer::sync as nar_writer;
15use snix_castore::directoryservice::{DirectoryGraph, DirectoryService};
16use snix_castore::{B3Digest, Node};
17use snix_castore::{
18 Directory,
19 blobservice::{BlobReader, BlobService},
20 directoryservice::DirectoryGraphBuilder,
21};
22
23use futures::FutureExt;
24use futures::TryStreamExt;
25use futures::future::{BoxFuture, FusedFuture, TryMaybeDone};
26
27use tokio::io::AsyncSeekExt;
28use tracing::instrument;
29
/// Reference to a blob in the blob service whose contents are spliced into
/// the NAR stream as the body of a regular file.
#[derive(Debug)]
struct BlobRef {
    /// Digest used to open the blob via `BlobService::open_read`.
    digest: B3Digest,
    /// Number of bytes the blob contributes to the NAR stream; taken from the
    /// `size` of the castore file node and used to validate blob reads.
    size: u64,
}
35
/// One contiguous piece of the rendered NAR stream.
#[derive(Debug)]
enum Data {
    /// Bytes produced by the NAR writer (everything except file contents),
    /// rendered up front and kept in memory.
    Literal(Bytes),
    /// File contents, streamed from the blob service on demand.
    Blob(BlobRef),
}
41
42impl Data {
43 pub fn len(&self) -> u64 {
44 match self {
45 Data::Literal(data) => data.len() as u64,
46 Data::Blob(BlobRef { size, .. }) => *size,
47 }
48 }
49}
50
/// Seekable async reader producing the NAR serialization of a castore node.
///
/// The stream is precomputed as a list of segments: literal bytes from the
/// NAR writer, interleaved with references to blobs that are opened lazily
/// when reading reaches them.
pub struct Reader<B: BlobService> {
    /// `(start offset, data)` pairs in stream order; each segment starts where
    /// the previous one ends.
    segments: Vec<(u64, Data)>,
    /// Current absolute position in the NAR stream.
    position_bytes: u64,
    /// Index into `segments` of the segment being read; may equal
    /// `segments.len()` once the end of the stream has been reached.
    position_index: usize,
    /// Service used to open readers for `Data::Blob` segments.
    blob_service: Arc<B>,
    /// Set by `start_seek`; cleared by `poll_complete` (or by `poll_read` on a
    /// blob segment).
    seeking: bool,
    /// Reader for the blob segment at `position_index`, possibly still being
    /// opened/seeked; set to `Gone` when positioned on a literal segment.
    current_blob: TryMaybeDone<BoxFuture<'static, io::Result<Box<dyn BlobReader>>>>,
}
59
60fn flush_segment(segments: &mut Vec<(u64, Data)>, offset: &mut u64, cur_segment: Vec<u8>) {
64 let segment_size = cur_segment.len();
65 segments.push((*offset, Data::Literal(cur_segment.into())));
66 *offset += segment_size as u64;
67}
68
69fn walk_node(
73 segments: &mut Vec<(u64, Data)>,
74 offset: &mut u64,
75 directories: &HashMap<B3Digest, Directory>,
76 node: &Node,
77 nar_node: nar_writer::Node<'_, Vec<u8>>,
79) -> Result<(), std::io::Error> {
80 match node {
81 snix_castore::Node::Symlink { target } => {
82 nar_node.symlink(target.as_ref())?;
83 }
84 snix_castore::Node::File {
85 digest,
86 size,
87 executable,
88 } => {
89 let (cur_segment, skip) = nar_node.file_manual_write(*executable, *size)?;
90
91 flush_segment(segments, offset, std::mem::take(cur_segment));
93
94 segments.push((
96 *offset,
97 Data::Blob(BlobRef {
98 digest: *digest,
99 size: *size,
100 }),
101 ));
102 *offset += size;
103
104 skip.close(cur_segment)?;
110 }
111 snix_castore::Node::Directory { digest, .. } => {
112 let directory = directories
113 .get(digest)
114 .expect("Snix bug: directory not found");
115
116 let mut nar_node_directory = nar_node.directory()?;
118
119 for (name, node) in directory.nodes() {
122 let child_node = nar_node_directory.entry(name.as_ref())?;
123
124 walk_node(segments, offset, directories, node, child_node)?;
125 }
126
127 nar_node_directory.close()?;
129 }
130 }
131 Ok(())
132}
133
134impl<B: BlobService + 'static> Reader<B> {
135 #[instrument(skip(blob_service, directory_service), err)]
143 pub async fn new(
144 root_node: Node,
145 blob_service: B,
146 directory_service: impl DirectoryService,
147 ) -> Result<Self, RenderError> {
148 let maybe_directory_graph = if let Node::Directory { digest, .. } = &root_node {
150 let mut directories = directory_service.get_recursive(digest);
151 let mut builder = DirectoryGraphBuilder::new_root_to_leaves(digest.to_owned());
152
153 while let Some(directory) = directories
154 .try_next()
155 .await
156 .map_err(RenderError::DirectoryService)?
157 {
158 builder
159 .try_insert(directory)
160 .map_err(RenderError::OrderingError)?;
161 }
162
163 match builder.build() {
164 Ok(directory_graph) => Some(directory_graph),
165 Err(snix_castore::directoryservice::OrderingError::EmptySet) => None,
166 Err(e) => Err(RenderError::OrderingError(e))?,
167 }
168 } else {
169 None
171 };
172
173 Self::new_with_directory_graph(root_node, blob_service, maybe_directory_graph)
174 .map_err(RenderError::NARWriterError)
175 }
176
177 fn new_with_directory_graph(
183 root_node: Node,
184 blob_service: B,
185 directory_closure: Option<DirectoryGraph>,
186 ) -> Result<Self, std::io::Error> {
187 let directories: HashMap<B3Digest, Directory> = directory_closure
188 .map(|directory_graph| {
189 HashMap::from_iter(
191 directory_graph
192 .drain_leaves_to_root()
193 .map(|d| (d.digest(), d)),
194 )
195 })
196 .unwrap_or_default();
197
198 let mut segments = vec![];
199 let mut cur_segment: Vec<u8> = vec![];
200 let mut offset = 0;
201
202 let nar_node = nar_writer::open(&mut cur_segment)?;
203
204 walk_node(
205 &mut segments,
206 &mut offset,
207 &directories,
208 &root_node,
209 nar_node,
210 )?;
211 flush_segment(&mut segments, &mut offset, std::mem::take(&mut cur_segment));
213
214 Ok(Reader {
215 segments,
216 position_bytes: 0,
217 position_index: 0,
218 blob_service: blob_service.into(),
219 seeking: false,
220 current_blob: TryMaybeDone::Gone,
221 })
222 }
223
224 pub fn stream_len(&self) -> u64 {
225 self.segments
226 .last()
227 .map(|&(off, ref data)| off + data.len())
228 .expect("no segment found")
229 }
230}
231
232impl<B: BlobService + 'static> tokio::io::AsyncSeek for Reader<B> {
233 fn start_seek(mut self: Pin<&mut Self>, pos: io::SeekFrom) -> io::Result<()> {
234 let stream_len = Reader::stream_len(&self);
235
236 let this = &mut *self;
237 if this.seeking {
238 return Err(io::Error::other("Already seeking"));
239 }
240 this.seeking = true;
241
242 let pos = {
243 let (base, offset) = match pos {
244 io::SeekFrom::Start(n) => (n, 0),
245 io::SeekFrom::End(n) => (stream_len, n),
246 io::SeekFrom::Current(n) => (this.position_bytes, n),
247 };
248
249 base.saturating_add_signed(offset)
250 };
251
252 let prev_position_bytes = this.position_bytes;
253 let prev_position_index = this.position_index;
254
255 this.position_bytes = min(pos, stream_len);
256 this.position_index = match this
257 .segments
258 .binary_search_by_key(&this.position_bytes, |&(off, _)| off)
259 {
260 Ok(idx) => idx,
261 Err(idx) => idx - 1,
262 };
263
264 let Some((offset, Data::Blob(BlobRef { digest, .. }))) =
265 this.segments.get(this.position_index)
266 else {
267 this.current_blob = TryMaybeDone::Gone;
269 return Ok(());
270 };
271 let offset_in_segment = this.position_bytes - offset;
272
273 if prev_position_bytes == this.position_bytes {
274 } else if prev_position_index == this.position_index {
276 let mut prev = std::mem::replace(&mut this.current_blob, TryMaybeDone::Gone);
278 this.current_blob = futures::future::try_maybe_done(
279 (async move {
280 let mut reader = Pin::new(&mut prev).take_output().unwrap();
281 reader.seek(io::SeekFrom::Start(offset_in_segment)).await?;
282 Ok(reader)
283 })
284 .boxed(),
285 );
286 } else {
287 let blob_service = this.blob_service.clone();
289 let digest = *digest;
290 this.current_blob = futures::future::try_maybe_done(
291 (async move {
292 let mut reader =
293 blob_service
294 .open_read(&digest)
295 .await?
296 .ok_or(io::Error::new(
297 io::ErrorKind::NotFound,
298 RenderError::BlobNotFound(digest, Default::default()),
299 ))?;
300 if offset_in_segment != 0 {
301 reader.seek(io::SeekFrom::Start(offset_in_segment)).await?;
302 }
303 Ok(reader)
304 })
305 .boxed(),
306 );
307 };
308
309 Ok(())
310 }
311 fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<u64>> {
312 let this = &mut *self;
313
314 if !this.current_blob.is_terminated() {
315 futures::ready!(this.current_blob.poll_unpin(cx))?;
316 }
317 this.seeking = false;
318
319 Poll::Ready(Ok(this.position_bytes))
320 }
321}
322
323impl<B: BlobService + 'static> tokio::io::AsyncRead for Reader<B> {
324 fn poll_read(
325 mut self: Pin<&mut Self>,
326 cx: &mut Context,
327 buf: &mut tokio::io::ReadBuf,
328 ) -> Poll<io::Result<()>> {
329 let this = &mut *self;
330
331 let Some(&(offset, ref segment)) = this.segments.get(this.position_index) else {
332 return Poll::Ready(Ok(())); };
334
335 let prev_read_buf_pos = buf.filled().len();
336 match segment {
337 Data::Literal(data) => {
338 let offset_in_segment = this.position_bytes - offset;
339 let offset_in_segment = usize::try_from(offset_in_segment).unwrap();
340 let remaining_data = data.len() - offset_in_segment;
341 let read_size = std::cmp::min(remaining_data, buf.remaining());
342 buf.put(&data[offset_in_segment..offset_in_segment + read_size]);
343 }
344 Data::Blob(BlobRef { size, .. }) => {
345 futures::ready!(this.current_blob.poll_unpin(cx))?;
346 this.seeking = false;
347 let blob = Pin::new(&mut this.current_blob)
348 .output_mut()
349 .expect("missing blob");
350 futures::ready!(Pin::new(blob).poll_read(cx, buf))?;
351 let read_length = buf.filled().len() - prev_read_buf_pos;
352 let maximum_expected_read_length = (offset + size) - this.position_bytes;
353 let is_eof = read_length == 0;
354 let too_much_returned = read_length as u64 > maximum_expected_read_length;
355 match (is_eof, too_much_returned) {
356 (true, false) => {
357 return Poll::Ready(Err(io::Error::new(
358 io::ErrorKind::UnexpectedEof,
359 "blob short read",
360 )));
361 }
362 (false, true) => {
363 buf.set_filled(prev_read_buf_pos);
364 return Poll::Ready(Err(io::Error::new(
365 io::ErrorKind::InvalidInput,
366 "blob continued to yield data beyond end",
367 )));
368 }
369 _ => {}
370 }
371 }
372 };
373 let new_read_buf_pos = buf.filled().len();
374 this.position_bytes += (new_read_buf_pos - prev_read_buf_pos) as u64;
375
376 let prev_position_index = this.position_index;
377 while this
378 .segments
379 .get(this.position_index)
380 .is_some_and(|&(offset, ref segment)| (this.position_bytes - offset) >= segment.len())
381 {
382 this.position_index += 1;
383 }
384 if prev_position_index != this.position_index {
385 let Some((_offset, Data::Blob(BlobRef { digest, .. }))) =
386 this.segments.get(this.position_index)
387 else {
388 this.current_blob = TryMaybeDone::Gone;
390 return Poll::Ready(Ok(()));
391 };
392
393 let blob_service = this.blob_service.clone();
395 let digest = *digest;
396 this.current_blob = futures::future::try_maybe_done(
397 (async move {
398 let reader = blob_service
399 .open_read(&digest)
400 .await?
401 .ok_or(io::Error::new(
402 io::ErrorKind::NotFound,
403 RenderError::BlobNotFound(digest, Default::default()),
404 ))?;
405 Ok(reader)
406 })
407 .boxed(),
408 );
409 }
410
411 Poll::Ready(Ok(()))
412 }
413}