1use std::{
2 cmp::min,
3 collections::HashMap,
4 io,
5 pin::Pin,
6 sync::Arc,
7 task::{Context, Poll},
8};
9
10use super::RenderError;
11
12use bytes::{BufMut, Bytes};
13
14use nix_compat::nar::writer::sync as nar_writer;
15use snix_castore::directoryservice::{DirectoryGraph, DirectoryService};
16use snix_castore::{B3Digest, Node};
17use snix_castore::{
18 Directory,
19 blobservice::{BlobReader, BlobService},
20 directoryservice::DirectoryGraphBuilder,
21};
22
23use futures::FutureExt;
24use futures::TryStreamExt;
25use futures::future::{BoxFuture, FusedFuture, TryMaybeDone};
26
27use tokio::io::AsyncSeekExt;
28use tracing::instrument;
29
30#[derive(Debug)]
31struct BlobRef {
32 digest: B3Digest,
33 size: u64,
34}
35
36#[derive(Debug)]
37enum Data {
38 Literal(Bytes),
39 Blob(BlobRef),
40}
41
42impl Data {
43 pub fn len(&self) -> u64 {
44 match self {
45 Data::Literal(data) => data.len() as u64,
46 Data::Blob(BlobRef { size, .. }) => *size,
47 }
48 }
49}
50
51pub struct Reader<B: BlobService> {
52 segments: Vec<(u64, Data)>,
53 position_bytes: u64,
54 position_index: usize,
55 blob_service: Arc<B>,
56 seeking: bool,
57 current_blob: TryMaybeDone<BoxFuture<'static, io::Result<Box<dyn BlobReader>>>>,
58}
59
60fn flush_segment(segments: &mut Vec<(u64, Data)>, offset: &mut u64, cur_segment: Vec<u8>) {
64 let segment_size = cur_segment.len();
65 segments.push((*offset, Data::Literal(cur_segment.into())));
66 *offset += segment_size as u64;
67}
68
69fn walk_node(
73 segments: &mut Vec<(u64, Data)>,
74 offset: &mut u64,
75 directories: &HashMap<B3Digest, Directory>,
76 node: &Node,
77 nar_node: nar_writer::Node<'_, Vec<u8>>,
79) -> Result<(), RenderError> {
80 match node {
81 snix_castore::Node::Symlink { target } => {
82 nar_node
83 .symlink(target.as_ref())
84 .map_err(RenderError::NARWriterError)?;
85 }
86 snix_castore::Node::File {
87 digest,
88 size,
89 executable,
90 } => {
91 let (cur_segment, skip) = nar_node
92 .file_manual_write(*executable, *size)
93 .map_err(RenderError::NARWriterError)?;
94
95 flush_segment(segments, offset, std::mem::take(cur_segment));
97
98 segments.push((
100 *offset,
101 Data::Blob(BlobRef {
102 digest: *digest,
103 size: *size,
104 }),
105 ));
106 *offset += size;
107
108 skip.close(cur_segment)
114 .map_err(RenderError::NARWriterError)?;
115 }
116 snix_castore::Node::Directory { digest, .. } => {
117 let directory = directories
118 .get(digest)
119 .expect("Snix bug: directory not found");
120
121 let mut nar_node_directory =
123 nar_node.directory().map_err(RenderError::NARWriterError)?;
124
125 for (name, node) in directory.nodes() {
128 let child_node = nar_node_directory
129 .entry(name.as_ref())
130 .map_err(RenderError::NARWriterError)?;
131
132 walk_node(segments, offset, directories, node, child_node)?;
133 }
134
135 nar_node_directory
137 .close()
138 .map_err(RenderError::NARWriterError)?;
139 }
140 }
141 Ok(())
142}
143
144impl<B: BlobService + 'static> Reader<B> {
145 #[instrument(skip(blob_service, directory_service), err)]
153 pub async fn new(
154 root_node: Node,
155 blob_service: B,
156 directory_service: impl DirectoryService,
157 ) -> Result<Self, RenderError> {
158 let maybe_directory_graph = if let Node::Directory { digest, .. } = &root_node {
160 let mut directories = directory_service.get_recursive(digest);
161 let mut builder = DirectoryGraphBuilder::new_root_to_leaves(digest.to_owned());
162
163 while let Some(directory) = directories
164 .try_next()
165 .await
166 .map_err(RenderError::DirectoryService)?
167 {
168 builder
169 .try_insert(directory)
170 .map_err(RenderError::OrderingError)?;
171 }
172
173 match builder.build() {
174 Ok(directory_graph) => Some(directory_graph),
175 Err(snix_castore::directoryservice::OrderingError::EmptySet) => None,
176 Err(e) => Err(RenderError::OrderingError(e))?,
177 }
178 } else {
179 None
181 };
182
183 Self::new_with_directory_graph(root_node, blob_service, maybe_directory_graph)
184 }
185
186 pub fn new_with_directory_graph(
192 root_node: Node,
193 blob_service: B,
194 directory_closure: Option<DirectoryGraph>,
195 ) -> Result<Self, RenderError> {
196 let directories: HashMap<B3Digest, Directory> = directory_closure
197 .map(|directory_graph| {
198 HashMap::from_iter(
200 directory_graph
201 .drain_leaves_to_root()
202 .map(|d| (d.digest(), d)),
203 )
204 })
205 .unwrap_or_default();
206
207 let mut segments = vec![];
208 let mut cur_segment: Vec<u8> = vec![];
209 let mut offset = 0;
210
211 let nar_node = nar_writer::open(&mut cur_segment).map_err(RenderError::NARWriterError)?;
212
213 walk_node(
214 &mut segments,
215 &mut offset,
216 &directories,
217 &root_node,
218 nar_node,
219 )?;
220 flush_segment(&mut segments, &mut offset, std::mem::take(&mut cur_segment));
222
223 Ok(Reader {
224 segments,
225 position_bytes: 0,
226 position_index: 0,
227 blob_service: blob_service.into(),
228 seeking: false,
229 current_blob: TryMaybeDone::Gone,
230 })
231 }
232
233 pub fn stream_len(&self) -> u64 {
234 self.segments
235 .last()
236 .map(|&(off, ref data)| off + data.len())
237 .expect("no segment found")
238 }
239}
240
241impl<B: BlobService + 'static> tokio::io::AsyncSeek for Reader<B> {
242 fn start_seek(mut self: Pin<&mut Self>, pos: io::SeekFrom) -> io::Result<()> {
243 let stream_len = Reader::stream_len(&self);
244
245 let this = &mut *self;
246 if this.seeking {
247 return Err(io::Error::other("Already seeking"));
248 }
249 this.seeking = true;
250
251 let pos = {
252 let (base, offset) = match pos {
253 io::SeekFrom::Start(n) => (n, 0),
254 io::SeekFrom::End(n) => (stream_len, n),
255 io::SeekFrom::Current(n) => (this.position_bytes, n),
256 };
257
258 base.saturating_add_signed(offset)
259 };
260
261 let prev_position_bytes = this.position_bytes;
262 let prev_position_index = this.position_index;
263
264 this.position_bytes = min(pos, stream_len);
265 this.position_index = match this
266 .segments
267 .binary_search_by_key(&this.position_bytes, |&(off, _)| off)
268 {
269 Ok(idx) => idx,
270 Err(idx) => idx - 1,
271 };
272
273 let Some((offset, Data::Blob(BlobRef { digest, .. }))) =
274 this.segments.get(this.position_index)
275 else {
276 this.current_blob = TryMaybeDone::Gone;
278 return Ok(());
279 };
280 let offset_in_segment = this.position_bytes - offset;
281
282 if prev_position_bytes == this.position_bytes {
283 } else if prev_position_index == this.position_index {
285 let mut prev = std::mem::replace(&mut this.current_blob, TryMaybeDone::Gone);
287 this.current_blob = futures::future::try_maybe_done(
288 (async move {
289 let mut reader = Pin::new(&mut prev).take_output().unwrap();
290 reader.seek(io::SeekFrom::Start(offset_in_segment)).await?;
291 Ok(reader)
292 })
293 .boxed(),
294 );
295 } else {
296 let blob_service = this.blob_service.clone();
298 let digest = *digest;
299 this.current_blob = futures::future::try_maybe_done(
300 (async move {
301 let mut reader =
302 blob_service
303 .open_read(&digest)
304 .await?
305 .ok_or(io::Error::new(
306 io::ErrorKind::NotFound,
307 RenderError::BlobNotFound(digest, Default::default()),
308 ))?;
309 if offset_in_segment != 0 {
310 reader.seek(io::SeekFrom::Start(offset_in_segment)).await?;
311 }
312 Ok(reader)
313 })
314 .boxed(),
315 );
316 };
317
318 Ok(())
319 }
320 fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<u64>> {
321 let this = &mut *self;
322
323 if !this.current_blob.is_terminated() {
324 futures::ready!(this.current_blob.poll_unpin(cx))?;
325 }
326 this.seeking = false;
327
328 Poll::Ready(Ok(this.position_bytes))
329 }
330}
331
332impl<B: BlobService + 'static> tokio::io::AsyncRead for Reader<B> {
333 fn poll_read(
334 mut self: Pin<&mut Self>,
335 cx: &mut Context,
336 buf: &mut tokio::io::ReadBuf,
337 ) -> Poll<io::Result<()>> {
338 let this = &mut *self;
339
340 let Some(&(offset, ref segment)) = this.segments.get(this.position_index) else {
341 return Poll::Ready(Ok(())); };
343
344 let prev_read_buf_pos = buf.filled().len();
345 match segment {
346 Data::Literal(data) => {
347 let offset_in_segment = this.position_bytes - offset;
348 let offset_in_segment = usize::try_from(offset_in_segment).unwrap();
349 let remaining_data = data.len() - offset_in_segment;
350 let read_size = std::cmp::min(remaining_data, buf.remaining());
351 buf.put(&data[offset_in_segment..offset_in_segment + read_size]);
352 }
353 Data::Blob(BlobRef { size, .. }) => {
354 futures::ready!(this.current_blob.poll_unpin(cx))?;
355 this.seeking = false;
356 let blob = Pin::new(&mut this.current_blob)
357 .output_mut()
358 .expect("missing blob");
359 futures::ready!(Pin::new(blob).poll_read(cx, buf))?;
360 let read_length = buf.filled().len() - prev_read_buf_pos;
361 let maximum_expected_read_length = (offset + size) - this.position_bytes;
362 let is_eof = read_length == 0;
363 let too_much_returned = read_length as u64 > maximum_expected_read_length;
364 match (is_eof, too_much_returned) {
365 (true, false) => {
366 return Poll::Ready(Err(io::Error::new(
367 io::ErrorKind::UnexpectedEof,
368 "blob short read",
369 )));
370 }
371 (false, true) => {
372 buf.set_filled(prev_read_buf_pos);
373 return Poll::Ready(Err(io::Error::new(
374 io::ErrorKind::InvalidInput,
375 "blob continued to yield data beyond end",
376 )));
377 }
378 _ => {}
379 }
380 }
381 };
382 let new_read_buf_pos = buf.filled().len();
383 this.position_bytes += (new_read_buf_pos - prev_read_buf_pos) as u64;
384
385 let prev_position_index = this.position_index;
386 while {
387 if let Some(&(offset, ref segment)) = this.segments.get(this.position_index) {
388 (this.position_bytes - offset) >= segment.len()
389 } else {
390 false
391 }
392 } {
393 this.position_index += 1;
394 }
395 if prev_position_index != this.position_index {
396 let Some((_offset, Data::Blob(BlobRef { digest, .. }))) =
397 this.segments.get(this.position_index)
398 else {
399 this.current_blob = TryMaybeDone::Gone;
401 return Poll::Ready(Ok(()));
402 };
403
404 let blob_service = this.blob_service.clone();
406 let digest = *digest;
407 this.current_blob = futures::future::try_maybe_done(
408 (async move {
409 let reader = blob_service
410 .open_read(&digest)
411 .await?
412 .ok_or(io::Error::new(
413 io::ErrorKind::NotFound,
414 RenderError::BlobNotFound(digest, Default::default()),
415 ))?;
416 Ok(reader)
417 })
418 .boxed(),
419 );
420 }
421
422 Poll::Ready(Ok(()))
423 }
424}