snix_store/nar/seekable.rs

use std::{
    cmp::min,
    collections::HashMap,
    io,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};

use super::RenderError;

use bytes::{BufMut, Bytes};

use nix_compat::nar::writer::sync as nar_writer;
use snix_castore::directoryservice::{DirectoryGraph, DirectoryService};
use snix_castore::{B3Digest, Node};
use snix_castore::{
    Directory,
    blobservice::{BlobReader, BlobService},
    directoryservice::DirectoryGraphBuilder,
};

use futures::FutureExt;
use futures::TryStreamExt;
use futures::future::{BoxFuture, FusedFuture, TryMaybeDone};

use tokio::io::AsyncSeekExt;
use tracing::instrument;

/// Reference to a blob stored in a [BlobService], to be substituted into the
/// NAR stream when the corresponding range is read.
#[derive(Debug)]
struct BlobRef {
    digest: B3Digest,
    size: u64,
}

/// One segment of the NAR stream: either literal bytes rendered up front, or a
/// reference to a blob whose contents are fetched lazily while reading.
#[derive(Debug)]
enum Data {
    Literal(Bytes),
    Blob(BlobRef),
}

impl Data {
    pub fn len(&self) -> u64 {
        match self {
            Data::Literal(data) => data.len() as u64,
            Data::Blob(BlobRef { size, .. }) => *size,
        }
    }
}

/// Seekable NAR renderer, implementing [tokio::io::AsyncRead] and
/// [tokio::io::AsyncSeek]. File contents are fetched lazily from the
/// [BlobService] as the corresponding byte ranges are read.
pub struct Reader<B: BlobService> {
    /// Start offset and contents of every segment, in stream order.
    segments: Vec<(u64, Data)>,
    /// Current position in the NAR stream, in bytes.
    position_bytes: u64,
    /// Index into `segments` of the segment containing `position_bytes`.
    position_index: usize,
    blob_service: Arc<B>,
    /// Set between `start_seek` and the completing `poll_complete` call.
    seeking: bool,
    /// Reader for the blob backing the current `Data::Blob` segment, if any.
    current_blob: TryMaybeDone<BoxFuture<'static, io::Result<Box<dyn BlobReader>>>>,
}

/// Used during construction.
/// Converts the current buffer (passed as `cur_segment`) into a `Data::Literal` segment and
/// appends it to `segments`.
fn flush_segment(segments: &mut Vec<(u64, Data)>, offset: &mut u64, cur_segment: Vec<u8>) {
    let segment_size = cur_segment.len();
    segments.push((*offset, Data::Literal(cur_segment.into())));
    *offset += segment_size as u64;
}

/// Used during construction.
/// Recursively walks the node and its children, and fills `segments` with the appropriate
/// `Data::Literal` and `Data::Blob` elements.
fn walk_node(
    segments: &mut Vec<(u64, Data)>,
    offset: &mut u64,
    directories: &HashMap<B3Digest, Directory>,
    node: &Node,
    // Includes a reference to the current segment's buffer
    nar_node: nar_writer::Node<'_, Vec<u8>>,
) -> Result<(), RenderError> {
    match node {
        snix_castore::Node::Symlink { target } => {
            nar_node
                .symlink(target.as_ref())
                .map_err(RenderError::NARWriterError)?;
        }
        snix_castore::Node::File {
            digest,
            size,
            executable,
        } => {
            let (cur_segment, skip) = nar_node
                .file_manual_write(*executable, *size)
                .map_err(RenderError::NARWriterError)?;

            // Flush the segment up until the beginning of the blob
            flush_segment(segments, offset, std::mem::take(cur_segment));

            // Insert the blob segment
            segments.push((
                *offset,
                Data::Blob(BlobRef {
                    digest: *digest,
                    size: *size,
                }),
            ));
            *offset += size;

            // Close the file node
            // We **intentionally** do not write the file contents anywhere.
            // Instead we have stored the blob reference in a Data::Blob segment,
            // and the poll_read implementation will take care of serving the
            // appropriate blob at this offset.
            skip.close(cur_segment)
                .map_err(RenderError::NARWriterError)?;
        }
        snix_castore::Node::Directory { digest, .. } => {
            let directory = directories
                .get(digest)
                .expect("Snix bug: directory not found");

            // start a directory node
            let mut nar_node_directory =
                nar_node.directory().map_err(RenderError::NARWriterError)?;

            // for each node in the directory, create a new entry with its name,
            // and then recurse on that entry.
            for (name, node) in directory.nodes() {
                let child_node = nar_node_directory
                    .entry(name.as_ref())
                    .map_err(RenderError::NARWriterError)?;

                walk_node(segments, offset, directories, node, child_node)?;
            }

            // close the directory
            nar_node_directory
                .close()
                .map_err(RenderError::NARWriterError)?;
        }
    }
    Ok(())
}

impl<B: BlobService + 'static> Reader<B> {
    /// Creates a new seekable NAR renderer for the given castore root node.
    ///
    /// This function pre-fetches the directory closure using `get_recursive()` and assembles the
    /// NAR structure, except for the file contents, which are stored as 'holes' referencing a blob
    /// with a specific BLAKE3 digest and known size. The AsyncRead implementation will then switch
    /// between serving the precomputed literal segments and the appropriate blob for the file
    /// contents.
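    ///
    /// # Example
    ///
    /// A minimal usage sketch (not compiled here); `root_node`, `blob_service` and
    /// `directory_service` are assumed to be obtained elsewhere:
    ///
    /// ```ignore
    /// use tokio::io::{AsyncReadExt, AsyncSeekExt};
    ///
    /// let mut nar = Reader::new(root_node, blob_service, directory_service).await?;
    /// // Seek into the middle of the NAR and read a chunk from there.
    /// nar.seek(std::io::SeekFrom::Start(1024)).await?;
    /// let mut buf = vec![0u8; 512];
    /// nar.read_exact(&mut buf).await?;
    /// ```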
    #[instrument(skip(blob_service, directory_service), err)]
    pub async fn new(
        root_node: Node,
        blob_service: B,
        directory_service: impl DirectoryService,
    ) -> Result<Self, RenderError> {
        // If this is a directory, resolve all subdirectories
        let maybe_directory_graph = if let Node::Directory { digest, .. } = &root_node {
            let mut directories = directory_service.get_recursive(digest);
            let mut builder = DirectoryGraphBuilder::new_root_to_leaves(digest.to_owned());

            while let Some(directory) = directories
                .try_next()
                .await
                .map_err(RenderError::DirectoryService)?
            {
                builder
                    .try_insert(directory)
                    .map_err(RenderError::OrderingError)?;
            }

            match builder.build() {
                Ok(directory_graph) => Some(directory_graph),
                Err(snix_castore::directoryservice::OrderingError::EmptySet) => None,
                Err(e) => Err(RenderError::OrderingError(e))?,
            }
        } else {
            // If the top-level node is a file or a symlink, just pass it on
            None
        };

        Self::new_with_directory_graph(root_node, blob_service, maybe_directory_graph)
    }

    /// Creates a new seekable NAR renderer for the given castore root node.
    /// This variant does not perform any I/O and as such is not async.
    /// However, it requires all directories to be passed as a [DirectoryGraph].
    ///
    /// # Panics
    ///
    /// Panics if the directory closure is not the closure of the root node.
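    ///
    /// A sketch of how such a graph can be assembled (mirroring what [Reader::new]
    /// does internally); the iterator of directories is assumed to come from elsewhere,
    /// in root-to-leaves order:
    ///
    /// ```ignore
    /// let mut builder = DirectoryGraphBuilder::new_root_to_leaves(root_digest);
    /// for directory in directories {
    ///     builder.try_insert(directory)?;
    /// }
    /// let nar = Reader::new_with_directory_graph(root_node, blob_service, Some(builder.build()?))?;
    /// ```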
    pub fn new_with_directory_graph(
        root_node: Node,
        blob_service: B,
        directory_closure: Option<DirectoryGraph>,
    ) -> Result<Self, RenderError> {
        let directories: HashMap<B3Digest, Directory> = directory_closure
            .map(|directory_graph| {
                // We don't really care about the drain order
                HashMap::from_iter(
                    directory_graph
                        .drain_leaves_to_root()
                        .map(|d| (d.digest(), d)),
                )
            })
            .unwrap_or_default();

        let mut segments = vec![];
        let mut cur_segment: Vec<u8> = vec![];
        let mut offset = 0;

        let nar_node = nar_writer::open(&mut cur_segment).map_err(RenderError::NARWriterError)?;

        walk_node(
            &mut segments,
            &mut offset,
            &directories,
            &root_node,
            nar_node,
        )?;
        // Flush the final segment
        flush_segment(&mut segments, &mut offset, std::mem::take(&mut cur_segment));

        Ok(Reader {
            segments,
            position_bytes: 0,
            position_index: 0,
            blob_service: blob_service.into(),
            seeking: false,
            current_blob: TryMaybeDone::Gone,
        })
    }

    /// Total length of the rendered NAR stream, in bytes.
    pub fn stream_len(&self) -> u64 {
        self.segments
            .last()
            .map(|&(off, ref data)| off + data.len())
            .expect("no segment found")
    }
}

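// tokio's AsyncSeek is a two-step protocol: the caller first invokes start_seek,
// then drives poll_complete until it returns Ready. start_seek computes the new
// position, locates the containing segment and, if it is a blob, sets up a future
// yielding a correctly positioned BlobReader; poll_complete merely awaits that
// future.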
impl<B: BlobService + 'static> tokio::io::AsyncSeek for Reader<B> {
    fn start_seek(mut self: Pin<&mut Self>, pos: io::SeekFrom) -> io::Result<()> {
        let stream_len = Reader::stream_len(&self);

        let this = &mut *self;
        if this.seeking {
            return Err(io::Error::other("Already seeking"));
        }
        this.seeking = true;

        let pos = {
            let (base, offset) = match pos {
                io::SeekFrom::Start(n) => (n, 0),
                io::SeekFrom::End(n) => (stream_len, n),
                io::SeekFrom::Current(n) => (this.position_bytes, n),
            };

            base.saturating_add_signed(offset)
        };

        let prev_position_bytes = this.position_bytes;
        let prev_position_index = this.position_index;

        // Clamp the new position to the stream length, then find the segment
        // containing it by binary search over the segment start offsets.
        this.position_bytes = min(pos, stream_len);
        this.position_index = match this
            .segments
            .binary_search_by_key(&this.position_bytes, |&(off, _)| off)
        {
            Ok(idx) => idx,
            Err(idx) => idx - 1,
        };

        let Some((offset, Data::Blob(BlobRef { digest, .. }))) =
            this.segments.get(this.position_index)
        else {
            // If not seeking into a blob, we clear the active blob reader and then we're done
            this.current_blob = TryMaybeDone::Gone;
            return Ok(());
        };
        let offset_in_segment = this.position_bytes - offset;

        if prev_position_bytes == this.position_bytes {
            // position has not changed. do nothing
        } else if prev_position_index == this.position_index {
            // seeking within the same segment, re-use the blob reader
            let mut prev = std::mem::replace(&mut this.current_blob, TryMaybeDone::Gone);
            this.current_blob = futures::future::try_maybe_done(
                (async move {
                    let mut reader = Pin::new(&mut prev).take_output().unwrap();
                    reader.seek(io::SeekFrom::Start(offset_in_segment)).await?;
                    Ok(reader)
                })
                .boxed(),
            );
        } else {
            // seek to a different segment
            let blob_service = this.blob_service.clone();
            let digest = *digest;
            this.current_blob = futures::future::try_maybe_done(
                (async move {
                    let mut reader =
                        blob_service
                            .open_read(&digest)
                            .await?
                            .ok_or(io::Error::new(
                                io::ErrorKind::NotFound,
                                RenderError::BlobNotFound(digest, Default::default()),
                            ))?;
                    if offset_in_segment != 0 {
                        reader.seek(io::SeekFrom::Start(offset_in_segment)).await?;
                    }
                    Ok(reader)
                })
                .boxed(),
            );
        };

        Ok(())
    }

    fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<u64>> {
        let this = &mut *self;

        if !this.current_blob.is_terminated() {
            futures::ready!(this.current_blob.poll_unpin(cx))?;
        }
        this.seeking = false;

        Poll::Ready(Ok(this.position_bytes))
    }
}

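// poll_read serves data from the segment containing the current position:
// literal segments are copied straight out of memory, while blob segments are
// forwarded to the underlying BlobReader (with sanity checks against short or
// over-long reads). After each read the position is advanced, and when it
// crosses into a blob segment, a future opening the corresponding BlobReader is
// set up for the next read to await.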
impl<B: BlobService + 'static> tokio::io::AsyncRead for Reader<B> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &mut tokio::io::ReadBuf,
    ) -> Poll<io::Result<()>> {
        let this = &mut *self;

        let Some(&(offset, ref segment)) = this.segments.get(this.position_index) else {
            return Poll::Ready(Ok(())); // EOF
        };

        let prev_read_buf_pos = buf.filled().len();
        match segment {
            Data::Literal(data) => {
                let offset_in_segment = this.position_bytes - offset;
                let offset_in_segment = usize::try_from(offset_in_segment).unwrap();
                let remaining_data = data.len() - offset_in_segment;
                let read_size = std::cmp::min(remaining_data, buf.remaining());
                buf.put(&data[offset_in_segment..offset_in_segment + read_size]);
            }
            Data::Blob(BlobRef { size, .. }) => {
                futures::ready!(this.current_blob.poll_unpin(cx))?;
                this.seeking = false;
                let blob = Pin::new(&mut this.current_blob)
                    .output_mut()
                    .expect("missing blob");
                futures::ready!(Pin::new(blob).poll_read(cx, buf))?;
                let read_length = buf.filled().len() - prev_read_buf_pos;
                let maximum_expected_read_length = (offset + size) - this.position_bytes;
                let is_eof = read_length == 0;
                let too_much_returned = read_length as u64 > maximum_expected_read_length;
                match (is_eof, too_much_returned) {
                    (true, false) => {
                        return Poll::Ready(Err(io::Error::new(
                            io::ErrorKind::UnexpectedEof,
                            "blob short read",
                        )));
                    }
                    (false, true) => {
                        buf.set_filled(prev_read_buf_pos);
                        return Poll::Ready(Err(io::Error::new(
                            io::ErrorKind::InvalidInput,
                            "blob continued to yield data beyond end",
                        )));
                    }
                    _ => {}
                }
            }
        };
        let new_read_buf_pos = buf.filled().len();
        this.position_bytes += (new_read_buf_pos - prev_read_buf_pos) as u64;

        // Advance the segment index past any segments that are now fully consumed.
        let prev_position_index = this.position_index;
        while {
            if let Some(&(offset, ref segment)) = this.segments.get(this.position_index) {
                (this.position_bytes - offset) >= segment.len()
            } else {
                false
            }
        } {
            this.position_index += 1;
        }
        if prev_position_index != this.position_index {
            let Some((_offset, Data::Blob(BlobRef { digest, .. }))) =
                this.segments.get(this.position_index)
            else {
                // If the next segment is not a blob, we clear the active blob reader and then we're done
                this.current_blob = TryMaybeDone::Gone;
                return Poll::Ready(Ok(()));
            };

            // The next segment is a blob, open the BlobReader
            let blob_service = this.blob_service.clone();
            let digest = *digest;
            this.current_blob = futures::future::try_maybe_done(
                (async move {
                    let reader = blob_service
                        .open_read(&digest)
                        .await?
                        .ok_or(io::Error::new(
                            io::ErrorKind::NotFound,
                            RenderError::BlobNotFound(digest, Default::default()),
                        ))?;
                    Ok(reader)
                })
                .boxed(),
            );
        }

        Poll::Ready(Ok(()))
    }
}