snix_store/nar/seekable.rs

1use std::{
2    cmp::min,
3    io,
4    pin::Pin,
5    sync::Arc,
6    task::{Context, Poll},
7};
8
9use super::RenderError;
10
11use bytes::{BufMut, Bytes};
12
13use nix_compat::nar::writer::sync as nar_writer;
14use snix_castore::Directory;
15use snix_castore::blobservice::{BlobReader, BlobService};
16use snix_castore::directoryservice::{
17    DirectoryGraph, DirectoryService, RootToLeavesValidator, ValidatedDirectoryGraph,
18};
19use snix_castore::{B3Digest, Node};
20
21use futures::FutureExt;
22use futures::TryStreamExt;
23use futures::future::{BoxFuture, FusedFuture, TryMaybeDone};
24
25use tokio::io::AsyncSeekExt;
26
/// Reference to file contents stored in the blob service, representing a
/// "hole" in the precomputed NAR structure that is filled in at read time.
#[derive(Debug)]
struct BlobRef {
    // BLAKE3 digest identifying the blob in the blob service.
    digest: B3Digest,
    // Size of the blob contents in bytes.
    size: u64,
}
32
/// One contiguous segment of the rendered NAR stream.
#[derive(Debug)]
enum Data {
    /// Precomputed NAR structure bytes (headers, names, padding, …),
    /// served directly from memory.
    Literal(Bytes),
    /// File contents, streamed from the blob service on demand.
    Blob(BlobRef),
}
38
39impl Data {
40    pub fn len(&self) -> u64 {
41        match self {
42            Data::Literal(data) => data.len() as u64,
43            Data::Blob(BlobRef { size, .. }) => *size,
44        }
45    }
46}
47
/// Seekable NAR renderer: serves a NAR over `AsyncRead`/`AsyncSeek` by
/// switching between precomputed literal segments and blob contents.
pub struct Reader<B: BlobService> {
    /// `(offset, data)` pairs sorted by offset, covering the whole NAR.
    segments: Vec<(u64, Data)>,
    /// Absolute position in the NAR stream, in bytes.
    position_bytes: u64,
    /// Index into `segments` of the segment containing `position_bytes`.
    position_index: usize,
    blob_service: Arc<B>,
    /// Set between `start_seek` and the completing `poll_complete`.
    seeking: bool,
    /// Future yielding the blob reader for the current segment; `Gone` when
    /// the current segment is not a `Data::Blob`.
    current_blob: TryMaybeDone<BoxFuture<'static, io::Result<Box<dyn BlobReader>>>>,
}
56
57/// Used during construction.
58/// Converts the current buffer (passed as `cur_segment`) into a `Data::Literal` segment and
59/// inserts it into `self.segments`.
60fn flush_segment(segments: &mut Vec<(u64, Data)>, offset: &mut u64, cur_segment: Vec<u8>) {
61    let segment_size = cur_segment.len();
62    segments.push((*offset, Data::Literal(cur_segment.into())));
63    *offset += segment_size as u64;
64}
65
/// Used during construction.
/// Recursively walks the node and its children, and fills `segments` with the appropriate
/// `Data::Literal` and `Data::Blob` elements.
fn walk_node(
    segments: &mut Vec<(u64, Data)>,
    // Running absolute offset into the NAR stream; advanced as segments are emitted.
    offset: &mut u64,
    // Looks up a directory by digest; must resolve every directory reachable
    // from `node` (callers pass a lookup backed by a validated closure).
    get_directory: &impl Fn(&B3Digest) -> Directory,
    node: Node,
    // Includes a reference to the current segment's buffer
    nar_node: nar_writer::Node<'_, Vec<u8>>,
) -> Result<(), RenderError> {
    match node {
        snix_castore::Node::Symlink { target } => {
            // Symlinks are pure NAR structure, written entirely into the literal buffer.
            nar_node
                .symlink(target.as_ref())
                .map_err(RenderError::NARWriterError)?;
        }
        snix_castore::Node::File {
            digest,
            size,
            executable,
        } => {
            // Emit the file node up to (but excluding) its contents; `skip`
            // stands in for the `size` content bytes we deliberately omit.
            let (cur_segment, skip) = nar_node
                .file_manual_write(executable, size)
                .map_err(RenderError::NARWriterError)?;

            // Flush the segment up until the beginning of the blob
            flush_segment(segments, offset, std::mem::take(cur_segment));

            // Insert the blob segment
            segments.push((*offset, Data::Blob(BlobRef { digest, size })));
            *offset += size;

            // Close the file node
            // We **intentionally** do not write the file contents anywhere.
            // Instead we have stored the blob reference in a Data::Blob segment,
            // and the poll_read implementation will take care of serving the
            // appropriate blob at this offset.
            skip.close(cur_segment)
                .map_err(RenderError::NARWriterError)?;
        }
        snix_castore::Node::Directory { digest, .. } => {
            let directory = get_directory(&digest);

            // start a directory node
            let mut nar_node_directory =
                nar_node.directory().map_err(RenderError::NARWriterError)?;

            // for each node in the directory, create a new entry with its name,
            // and then recurse on that entry.
            for (name, node) in directory.nodes() {
                let child_node = nar_node_directory
                    .entry(name.as_ref())
                    .map_err(RenderError::NARWriterError)?;

                walk_node(segments, offset, get_directory, node.clone(), child_node)?;
            }

            // close the directory
            nar_node_directory
                .close()
                .map_err(RenderError::NARWriterError)?;
        }
    }
    Ok(())
}
132
impl<B: BlobService + 'static> Reader<B> {
    /// Creates a new seekable NAR renderer for the given castore root node.
    ///
    /// This function pre-fetches the directory closure using `get_recursive()` and assembles the
    /// NAR structure, except the file contents which are stored as 'holes' with references to a blob
    /// of a specific BLAKE3 digest and known size. The AsyncRead implementation will then switch
    /// between serving the precomputed literal segments, and the appropriate blob for the file
    /// contents.
    ///
    /// # Errors
    /// Returns a [RenderError] if fetching or validating the directory closure
    /// fails, or if assembling the NAR structure fails.
    pub async fn new(
        root_node: Node,
        blob_service: B,
        directory_service: impl DirectoryService,
    ) -> Result<Self, RenderError> {
        let maybe_directory_closure = match &root_node {
            // If this is a directory, resolve all subdirectories
            Node::Directory { digest, .. } => {
                // Collect the streamed closure into a DirectoryGraph validated
                // in root-to-leaves order before rendering from it.
                let mut closure = DirectoryGraph::with_order(
                    RootToLeavesValidator::new_with_root_digest(digest.clone()),
                );
                let mut stream = directory_service.get_recursive(digest);
                while let Some(dir) = stream
                    .try_next()
                    .await
                    .map_err(|e| RenderError::StoreError(e.into()))?
                {
                    closure.add(dir).map_err(|e| {
                        RenderError::StoreError(
                            snix_castore::Error::StorageError(e.to_string()).into(),
                        )
                    })?;
                }
                Some(closure.validate().map_err(|e| {
                    RenderError::StoreError(snix_castore::Error::StorageError(e.to_string()).into())
                })?)
            }
            // If the top-level node is a file or a symlink, just pass it on
            Node::File { .. } => None,
            Node::Symlink { .. } => None,
        };

        Self::new_with_directory_closure(root_node, blob_service, maybe_directory_closure)
    }

    /// Creates a new seekable NAR renderer for the given castore root node.
    /// This version of the instantiation does not perform any I/O and as such is not async.
    /// However it requires all directories to be passed as a ValidatedDirectoryGraph.
    ///
    /// panics if the directory closure is not the closure of the root node
    pub fn new_with_directory_closure(
        root_node: Node,
        blob_service: B,
        directory_closure: Option<ValidatedDirectoryGraph>,
    ) -> Result<Self, RenderError> {
        // Build a Vec of (digest, directory) kept sorted by digest, so the
        // per-directory lookups during walk_node below are binary searches.
        let directories = directory_closure
            .map(|directory_closure| {
                let mut directories: Vec<(B3Digest, Directory)> = vec![];
                for dir in directory_closure.drain_root_to_leaves() {
                    let digest = dir.digest();
                    let pos = directories
                        .binary_search_by_key(&digest.as_slice(), |(digest, _dir)| {
                            digest.as_slice()
                        })
                        .expect_err("duplicate directory"); // DirectoryGraph checks this
                    directories.insert(pos, (digest, dir));
                }
                directories
            })
            .unwrap_or_default();

        let mut segments = vec![];
        let mut cur_segment: Vec<u8> = vec![];
        let mut offset = 0;

        let nar_node = nar_writer::open(&mut cur_segment).map_err(RenderError::NARWriterError)?;

        walk_node(
            &mut segments,
            &mut offset,
            &|digest| {
                directories
                    .binary_search_by_key(&digest.as_slice(), |(digest, _dir)| digest.as_slice())
                    .map(|pos| directories[pos].clone())
                    .expect("missing directory") // DirectoryGraph checks this
                    .1
            },
            root_node,
            nar_node,
        )?;
        // Flush the final segment
        flush_segment(&mut segments, &mut offset, std::mem::take(&mut cur_segment));

        Ok(Reader {
            segments,
            position_bytes: 0,
            position_index: 0,
            blob_service: blob_service.into(),
            seeking: false,
            current_blob: TryMaybeDone::Gone,
        })
    }

    /// Total length of the rendered NAR stream, in bytes.
    ///
    /// The `expect` cannot fire for readers built by the constructors above:
    /// they always flush at least one (possibly empty) literal segment.
    pub fn stream_len(&self) -> u64 {
        self.segments
            .last()
            .map(|&(off, ref data)| off + data.len())
            .expect("no segment found")
    }
}
241
impl<B: BlobService + 'static> tokio::io::AsyncSeek for Reader<B> {
    /// Records the new position and, when landing inside a blob segment, sets
    /// up (but does not drive) the future that yields a correctly-positioned
    /// blob reader. `poll_complete` finishes the operation.
    fn start_seek(mut self: Pin<&mut Self>, pos: io::SeekFrom) -> io::Result<()> {
        let stream_len = Reader::stream_len(&self);

        let this = &mut *self;
        // A previous seek is still in flight; callers must poll_complete first.
        if this.seeking {
            return Err(io::Error::new(io::ErrorKind::Other, "Already seeking"));
        }
        this.seeking = true;

        // TODO(edef): be sane about overflows
        // Resolve the request to an absolute stream offset.
        let pos = match pos {
            io::SeekFrom::Start(n) => n,
            io::SeekFrom::End(n) => (stream_len as i64 + n) as u64,
            io::SeekFrom::Current(n) => (this.position_bytes as i64 + n) as u64,
        };

        let prev_position_bytes = this.position_bytes;
        let prev_position_index = this.position_index;

        // Clamp to stream end, then locate the containing segment.
        // Err(idx) means the position falls *inside* the segment starting
        // before idx, hence idx - 1 (idx 0 always matches exactly at offset 0).
        this.position_bytes = min(pos, stream_len);
        this.position_index = match this
            .segments
            .binary_search_by_key(&this.position_bytes, |&(off, _)| off)
        {
            Ok(idx) => idx,
            Err(idx) => idx - 1,
        };

        let Some((offset, Data::Blob(BlobRef { digest, .. }))) =
            this.segments.get(this.position_index)
        else {
            // If not seeking into a blob, we clear the active blob reader and then we're done
            this.current_blob = TryMaybeDone::Gone;
            return Ok(());
        };
        let offset_in_segment = this.position_bytes - offset;

        if prev_position_bytes == this.position_bytes {
            // position has not changed. do nothing
        } else if prev_position_index == this.position_index {
            // seeking within the same segment, re-use the blob reader
            // NOTE(review): take_output().unwrap() assumes the previous blob
            // future has already completed (it was driven to readiness by a
            // prior poll_complete/poll_read) — confirm callers uphold this.
            let mut prev = std::mem::replace(&mut this.current_blob, TryMaybeDone::Gone);
            this.current_blob = futures::future::try_maybe_done(
                (async move {
                    let mut reader = Pin::new(&mut prev).take_output().unwrap();
                    reader.seek(io::SeekFrom::Start(offset_in_segment)).await?;
                    Ok(reader)
                })
                .boxed(),
            );
        } else {
            // seek to a different segment
            // Open a fresh reader for the target blob, seeking within it only
            // when the position is not at the very start of the segment.
            let blob_service = this.blob_service.clone();
            let digest = digest.clone();
            this.current_blob = futures::future::try_maybe_done(
                (async move {
                    let mut reader =
                        blob_service
                            .open_read(&digest)
                            .await?
                            .ok_or(io::Error::new(
                                io::ErrorKind::NotFound,
                                RenderError::BlobNotFound(digest.clone(), Default::default()),
                            ))?;
                    if offset_in_segment != 0 {
                        reader.seek(io::SeekFrom::Start(offset_in_segment)).await?;
                    }
                    Ok(reader)
                })
                .boxed(),
            );
        };

        Ok(())
    }

    /// Drives the pending blob-reader future (if any) to completion, then
    /// reports the position established by the preceding `start_seek`.
    fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<u64>> {
        let this = &mut *self;

        if !this.current_blob.is_terminated() {
            futures::ready!(this.current_blob.poll_unpin(cx))?;
        }
        this.seeking = false;

        Poll::Ready(Ok(this.position_bytes))
    }
}
329
impl<B: BlobService + 'static> tokio::io::AsyncRead for Reader<B> {
    /// Serves the NAR byte stream: literal segments are copied from memory,
    /// blob segments are streamed from the blob service, with the blob's
    /// actual length validated against the size recorded at construction.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &mut tokio::io::ReadBuf,
    ) -> Poll<io::Result<()>> {
        let this = &mut *self;

        let Some(&(offset, ref segment)) = this.segments.get(this.position_index) else {
            return Poll::Ready(Ok(())); // EOF
        };

        let prev_read_buf_pos = buf.filled().len();
        match segment {
            Data::Literal(data) => {
                // Copy from the precomputed NAR structure bytes, bounded by
                // both the segment remainder and the caller's buffer space.
                let offset_in_segment = this.position_bytes - offset;
                let offset_in_segment = usize::try_from(offset_in_segment).unwrap();
                let remaining_data = data.len() - offset_in_segment;
                let read_size = std::cmp::min(remaining_data, buf.remaining());
                buf.put(&data[offset_in_segment..offset_in_segment + read_size]);
            }
            Data::Blob(BlobRef { size, .. }) => {
                // Wait for the blob reader (set up by start_seek or by a
                // previous poll_read crossing into this segment) to be ready.
                futures::ready!(this.current_blob.poll_unpin(cx))?;
                this.seeking = false;
                let blob = Pin::new(&mut this.current_blob)
                    .output_mut()
                    .expect("missing blob");
                futures::ready!(Pin::new(blob).poll_read(cx, buf))?;
                let read_length = buf.filled().len() - prev_read_buf_pos;
                let maximum_expected_read_length = (offset + size) - this.position_bytes;
                // The blob must neither end before the recorded size
                // (short read) nor keep yielding data past it.
                let is_eof = read_length == 0;
                let too_much_returned = read_length as u64 > maximum_expected_read_length;
                match (is_eof, too_much_returned) {
                    (true, false) => {
                        return Poll::Ready(Err(io::Error::new(
                            io::ErrorKind::UnexpectedEof,
                            "blob short read",
                        )));
                    }
                    (false, true) => {
                        // Roll the buffer back so the excess bytes are not
                        // exposed to the caller alongside the error.
                        buf.set_filled(prev_read_buf_pos);
                        return Poll::Ready(Err(io::Error::new(
                            io::ErrorKind::InvalidInput,
                            "blob continued to yield data beyond end",
                        )));
                    }
                    _ => {}
                }
            }
        };
        let new_read_buf_pos = buf.filled().len();
        this.position_bytes += (new_read_buf_pos - prev_read_buf_pos) as u64;

        // Advance past every segment the new position has fully consumed
        // (a loop, since zero-length literal segments may occur).
        let prev_position_index = this.position_index;
        while {
            if let Some(&(offset, ref segment)) = this.segments.get(this.position_index) {
                (this.position_bytes - offset) >= segment.len()
            } else {
                false
            }
        } {
            this.position_index += 1;
        }
        if prev_position_index != this.position_index {
            let Some((_offset, Data::Blob(BlobRef { digest, .. }))) =
                this.segments.get(this.position_index)
            else {
                // If the next segment is not a blob, we clear the active blob reader and then we're done
                this.current_blob = TryMaybeDone::Gone;
                return Poll::Ready(Ok(()));
            };

            // The next segment is a blob, open the BlobReader
            // (no in-blob seek needed: sequential reads enter a segment
            // exactly at its start).
            let blob_service = this.blob_service.clone();
            let digest = digest.clone();
            this.current_blob = futures::future::try_maybe_done(
                (async move {
                    let reader = blob_service
                        .open_read(&digest)
                        .await?
                        .ok_or(io::Error::new(
                            io::ErrorKind::NotFound,
                            RenderError::BlobNotFound(digest.clone(), Default::default()),
                        ))?;
                    Ok(reader)
                })
                .boxed(),
            );
        }

        Poll::Ready(Ok(()))
    }
}