snix_store/nar/seekable.rs

1use std::{
2    cmp::min,
3    collections::HashMap,
4    io,
5    pin::Pin,
6    sync::Arc,
7    task::{Context, Poll},
8};
9
10use super::RenderError;
11
12use bytes::{BufMut, Bytes};
13
14use nix_compat::nar::writer::sync as nar_writer;
15use snix_castore::directoryservice::{DirectoryGraph, DirectoryService};
16use snix_castore::{B3Digest, Node};
17use snix_castore::{
18    Directory,
19    blobservice::{BlobReader, BlobService},
20    directoryservice::DirectoryGraphBuilder,
21};
22
23use futures::FutureExt;
24use futures::TryStreamExt;
25use futures::future::{BoxFuture, FusedFuture, TryMaybeDone};
26
27use tokio::io::AsyncSeekExt;
28use tracing::instrument;
29
/// Reference to a blob held by the [BlobService]. Acts as a "hole" in the
/// precomputed NAR layout that gets filled with blob contents at read time.
#[derive(Debug)]
struct BlobRef {
    /// BLAKE3 digest identifying the blob.
    digest: B3Digest,
    /// Size of the blob contents in bytes.
    size: u64,
}
35
/// One segment of the rendered NAR stream: either literal bytes computed
/// upfront during construction, or a reference to a blob that is fetched
/// lazily while reading.
#[derive(Debug)]
enum Data {
    /// Precomputed NAR structure bytes (magic, tokens, padding, …).
    Literal(Bytes),
    /// Placeholder for file contents, served from the blob service.
    Blob(BlobRef),
}
41
42impl Data {
43    pub fn len(&self) -> u64 {
44        match self {
45            Data::Literal(data) => data.len() as u64,
46            Data::Blob(BlobRef { size, .. }) => *size,
47        }
48    }
49}
50
/// Seekable NAR renderer implementing [tokio::io::AsyncRead] and
/// [tokio::io::AsyncSeek] on top of a precomputed segment list.
pub struct Reader<B: BlobService> {
    /// Offset-sorted list of `(absolute NAR offset, segment)` pairs covering
    /// the entire stream.
    segments: Vec<(u64, Data)>,
    /// Current absolute read position in bytes.
    position_bytes: u64,
    /// Index into `segments` of the segment containing `position_bytes`.
    position_index: usize,
    /// Used to open blob readers for `Data::Blob` segments on demand.
    blob_service: Arc<B>,
    /// True between `start_seek` and a successful `poll_complete`.
    seeking: bool,
    /// Reader for the currently active blob segment (if any), possibly still
    /// being opened/seeked as a pending future.
    current_blob: TryMaybeDone<BoxFuture<'static, io::Result<Box<dyn BlobReader>>>>,
}
59
60/// Used during construction.
61/// Converts the current buffer (passed as `cur_segment`) into a `Data::Literal` segment and
62/// inserts it into `self.segments`.
63fn flush_segment(segments: &mut Vec<(u64, Data)>, offset: &mut u64, cur_segment: Vec<u8>) {
64    let segment_size = cur_segment.len();
65    segments.push((*offset, Data::Literal(cur_segment.into())));
66    *offset += segment_size as u64;
67}
68
/// Used during construction.
/// Recursively walks the node and its children, and fills `segments` with the appropriate
/// `Data::Literal` and `Data::Blob` elements.
///
/// `offset` tracks the absolute position in the rendered NAR stream and is
/// advanced for every segment emitted. `directories` must contain the full
/// directory closure of `node`; a missing directory is a bug and panics.
fn walk_node(
    segments: &mut Vec<(u64, Data)>,
    offset: &mut u64,
    directories: &HashMap<B3Digest, Directory>,
    node: &Node,
    // Includes a reference to the current segment's buffer
    nar_node: nar_writer::Node<'_, Vec<u8>>,
) -> Result<(), std::io::Error> {
    match node {
        snix_castore::Node::Symlink { target } => {
            // Symlinks are rendered fully into the current literal buffer.
            nar_node.symlink(target.as_ref())?;
        }
        snix_castore::Node::File {
            digest,
            size,
            executable,
        } => {
            // Write the file header tokens, but skip the contents themselves
            // (manual-write mode of the NAR writer).
            let (cur_segment, skip) = nar_node.file_manual_write(*executable, *size)?;

            // Flush the segment up until the beginning of the blob
            flush_segment(segments, offset, std::mem::take(cur_segment));

            // Insert the blob segment
            segments.push((
                *offset,
                Data::Blob(BlobRef {
                    digest: *digest,
                    size: *size,
                }),
            ));
            *offset += size;

            // Close the file node
            // We **intentionally** do not write the file contents anywhere.
            // Instead we have stored the blob reference in a Data::Blob segment,
            // and the poll_read implementation will take care of serving the
            // appropriate blob at this offset.
            skip.close(cur_segment)?;
        }
        snix_castore::Node::Directory { digest, .. } => {
            let directory = directories
                .get(digest)
                .expect("Snix bug: directory not found");

            // start a directory node
            let mut nar_node_directory = nar_node.directory()?;

            // for each node in the directory, create a new entry with its name,
            // and then recurse on that entry.
            for (name, node) in directory.nodes() {
                let child_node = nar_node_directory.entry(name.as_ref())?;

                walk_node(segments, offset, directories, node, child_node)?;
            }

            // close the directory
            nar_node_directory.close()?;
        }
    }
    Ok(())
}
133
impl<B: BlobService + 'static> Reader<B> {
    /// Creates a new seekable NAR renderer for the given castore root node.
    ///
    /// This function pre-fetches the directory closure using `get_recursive()` and assembles the
    /// NAR structure, except the file contents which are stored as 'holes' with references to a blob
    /// of a specific BLAKE3 digest and known size. The AsyncRead implementation will then switch
    /// between serving the precomputed literal segments, and the appropriate blob for the file
    /// contents.
    #[instrument(skip(blob_service, directory_service), err)]
    pub async fn new(
        root_node: Node,
        blob_service: B,
        directory_service: impl DirectoryService,
    ) -> Result<Self, RenderError> {
        // If this is a directory, resolve all subdirectories
        let maybe_directory_graph = if let Node::Directory { digest, .. } = &root_node {
            let mut directories = directory_service.get_recursive(digest);
            let mut builder = DirectoryGraphBuilder::new_root_to_leaves(digest.to_owned());

            // Insert directories in stream order (root to leaves); the
            // builder validates the ordering on each insert.
            while let Some(directory) = directories
                .try_next()
                .await
                .map_err(RenderError::DirectoryService)?
            {
                builder
                    .try_insert(directory)
                    .map_err(RenderError::OrderingError)?;
            }

            match builder.build() {
                Ok(directory_graph) => Some(directory_graph),
                // EmptySet (no directories received) is mapped to "no
                // closure" rather than an error.
                Err(snix_castore::directoryservice::OrderingError::EmptySet) => None,
                Err(e) => Err(RenderError::OrderingError(e))?,
            }
        } else {
            // If the top-level node is a file or a symlink, just pass it on
            None
        };

        Self::new_with_directory_graph(root_node, blob_service, maybe_directory_graph)
            .map_err(RenderError::NARWriterError)
    }

    /// Creates a new seekable NAR renderer for the given castore root node.
    /// This version of the instantiation does not perform any I/O and as such is not async.
    /// However it requires all directories to be passed as a [DirectoryGraph].
    ///
    /// panics if the directory closure is not the closure of the root node
    fn new_with_directory_graph(
        root_node: Node,
        blob_service: B,
        directory_closure: Option<DirectoryGraph>,
    ) -> Result<Self, std::io::Error> {
        // Index the closure by digest for O(1) lookup during the walk.
        let directories: HashMap<B3Digest, Directory> = directory_closure
            .map(|directory_graph| {
                // We don't really care about the drain order
                HashMap::from_iter(
                    directory_graph
                        .drain_leaves_to_root()
                        .map(|d| (d.digest(), d)),
                )
            })
            .unwrap_or_default();

        let mut segments = vec![];
        let mut cur_segment: Vec<u8> = vec![];
        let mut offset = 0;

        // Write the NAR header into the first literal segment's buffer.
        let nar_node = nar_writer::open(&mut cur_segment)?;

        walk_node(
            &mut segments,
            &mut offset,
            &directories,
            &root_node,
            nar_node,
        )?;
        // Flush the final segment
        flush_segment(&mut segments, &mut offset, std::mem::take(&mut cur_segment));

        Ok(Reader {
            segments,
            position_bytes: 0,
            position_index: 0,
            blob_service: blob_service.into(),
            seeking: false,
            current_blob: TryMaybeDone::Gone,
        })
    }

    /// Total length of the NAR stream in bytes.
    ///
    /// The `expect` cannot fire in practice: construction always flushes a
    /// final literal segment, so `segments` is non-empty.
    pub fn stream_len(&self) -> u64 {
        self.segments
            .last()
            .map(|&(off, ref data)| off + data.len())
            .expect("no segment found")
    }
}
231
232impl<B: BlobService + 'static> tokio::io::AsyncSeek for Reader<B> {
233    fn start_seek(mut self: Pin<&mut Self>, pos: io::SeekFrom) -> io::Result<()> {
234        let stream_len = Reader::stream_len(&self);
235
236        let this = &mut *self;
237        if this.seeking {
238            return Err(io::Error::other("Already seeking"));
239        }
240        this.seeking = true;
241
242        let pos = {
243            let (base, offset) = match pos {
244                io::SeekFrom::Start(n) => (n, 0),
245                io::SeekFrom::End(n) => (stream_len, n),
246                io::SeekFrom::Current(n) => (this.position_bytes, n),
247            };
248
249            base.saturating_add_signed(offset)
250        };
251
252        let prev_position_bytes = this.position_bytes;
253        let prev_position_index = this.position_index;
254
255        this.position_bytes = min(pos, stream_len);
256        this.position_index = match this
257            .segments
258            .binary_search_by_key(&this.position_bytes, |&(off, _)| off)
259        {
260            Ok(idx) => idx,
261            Err(idx) => idx - 1,
262        };
263
264        let Some((offset, Data::Blob(BlobRef { digest, .. }))) =
265            this.segments.get(this.position_index)
266        else {
267            // If not seeking into a blob, we clear the active blob reader and then we're done
268            this.current_blob = TryMaybeDone::Gone;
269            return Ok(());
270        };
271        let offset_in_segment = this.position_bytes - offset;
272
273        if prev_position_bytes == this.position_bytes {
274            // position has not changed. do nothing
275        } else if prev_position_index == this.position_index {
276            // seeking within the same segment, re-use the blob reader
277            let mut prev = std::mem::replace(&mut this.current_blob, TryMaybeDone::Gone);
278            this.current_blob = futures::future::try_maybe_done(
279                (async move {
280                    let mut reader = Pin::new(&mut prev).take_output().unwrap();
281                    reader.seek(io::SeekFrom::Start(offset_in_segment)).await?;
282                    Ok(reader)
283                })
284                .boxed(),
285            );
286        } else {
287            // seek to a different segment
288            let blob_service = this.blob_service.clone();
289            let digest = *digest;
290            this.current_blob = futures::future::try_maybe_done(
291                (async move {
292                    let mut reader =
293                        blob_service
294                            .open_read(&digest)
295                            .await?
296                            .ok_or(io::Error::new(
297                                io::ErrorKind::NotFound,
298                                RenderError::BlobNotFound(digest, Default::default()),
299                            ))?;
300                    if offset_in_segment != 0 {
301                        reader.seek(io::SeekFrom::Start(offset_in_segment)).await?;
302                    }
303                    Ok(reader)
304                })
305                .boxed(),
306            );
307        };
308
309        Ok(())
310    }
311    fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<u64>> {
312        let this = &mut *self;
313
314        if !this.current_blob.is_terminated() {
315            futures::ready!(this.current_blob.poll_unpin(cx))?;
316        }
317        this.seeking = false;
318
319        Poll::Ready(Ok(this.position_bytes))
320    }
321}
322
323impl<B: BlobService + 'static> tokio::io::AsyncRead for Reader<B> {
324    fn poll_read(
325        mut self: Pin<&mut Self>,
326        cx: &mut Context,
327        buf: &mut tokio::io::ReadBuf,
328    ) -> Poll<io::Result<()>> {
329        let this = &mut *self;
330
331        let Some(&(offset, ref segment)) = this.segments.get(this.position_index) else {
332            return Poll::Ready(Ok(())); // EOF
333        };
334
335        let prev_read_buf_pos = buf.filled().len();
336        match segment {
337            Data::Literal(data) => {
338                let offset_in_segment = this.position_bytes - offset;
339                let offset_in_segment = usize::try_from(offset_in_segment).unwrap();
340                let remaining_data = data.len() - offset_in_segment;
341                let read_size = std::cmp::min(remaining_data, buf.remaining());
342                buf.put(&data[offset_in_segment..offset_in_segment + read_size]);
343            }
344            Data::Blob(BlobRef { size, .. }) => {
345                futures::ready!(this.current_blob.poll_unpin(cx))?;
346                this.seeking = false;
347                let blob = Pin::new(&mut this.current_blob)
348                    .output_mut()
349                    .expect("missing blob");
350                futures::ready!(Pin::new(blob).poll_read(cx, buf))?;
351                let read_length = buf.filled().len() - prev_read_buf_pos;
352                let maximum_expected_read_length = (offset + size) - this.position_bytes;
353                let is_eof = read_length == 0;
354                let too_much_returned = read_length as u64 > maximum_expected_read_length;
355                match (is_eof, too_much_returned) {
356                    (true, false) => {
357                        return Poll::Ready(Err(io::Error::new(
358                            io::ErrorKind::UnexpectedEof,
359                            "blob short read",
360                        )));
361                    }
362                    (false, true) => {
363                        buf.set_filled(prev_read_buf_pos);
364                        return Poll::Ready(Err(io::Error::new(
365                            io::ErrorKind::InvalidInput,
366                            "blob continued to yield data beyond end",
367                        )));
368                    }
369                    _ => {}
370                }
371            }
372        };
373        let new_read_buf_pos = buf.filled().len();
374        this.position_bytes += (new_read_buf_pos - prev_read_buf_pos) as u64;
375
376        let prev_position_index = this.position_index;
377        while this
378            .segments
379            .get(this.position_index)
380            .is_some_and(|&(offset, ref segment)| (this.position_bytes - offset) >= segment.len())
381        {
382            this.position_index += 1;
383        }
384        if prev_position_index != this.position_index {
385            let Some((_offset, Data::Blob(BlobRef { digest, .. }))) =
386                this.segments.get(this.position_index)
387            else {
388                // If the next segment is not a blob, we clear the active blob reader and then we're done
389                this.current_blob = TryMaybeDone::Gone;
390                return Poll::Ready(Ok(()));
391            };
392
393            // The next segment is a blob, open the BlobReader
394            let blob_service = this.blob_service.clone();
395            let digest = *digest;
396            this.current_blob = futures::future::try_maybe_done(
397                (async move {
398                    let reader = blob_service
399                        .open_read(&digest)
400                        .await?
401                        .ok_or(io::Error::new(
402                            io::ErrorKind::NotFound,
403                            RenderError::BlobNotFound(digest, Default::default()),
404                        ))?;
405                    Ok(reader)
406                })
407                .boxed(),
408            );
409        }
410
411        Poll::Ready(Ok(()))
412    }
413}