snix_store/nar/seekable.rs

1use std::{
2    cmp::min,
3    io,
4    pin::Pin,
5    sync::Arc,
6    task::{Context, Poll},
7};
8
9use super::RenderError;
10
11use bytes::{BufMut, Bytes};
12
13use nix_compat::nar::writer::sync as nar_writer;
14use snix_castore::directoryservice::{DirectoryGraph, DirectoryService};
15use snix_castore::{B3Digest, Node};
16use snix_castore::{Directory, directoryservice::DirectoryOrder};
17use snix_castore::{
18    blobservice::{BlobReader, BlobService},
19    directoryservice::DirectoryGraphBuilder,
20};
21
22use futures::FutureExt;
23use futures::TryStreamExt;
24use futures::future::{BoxFuture, FusedFuture, TryMaybeDone};
25
26use tokio::io::AsyncSeekExt;
27
/// Reference to a blob held by the [BlobService], used as the contents of a
/// file node in the NAR stream.
#[derive(Debug)]
struct BlobRef {
    // BLAKE3 digest identifying the blob in the blob service.
    digest: B3Digest,
    // Size of the blob in bytes; also the length this segment occupies in the NAR.
    size: u64,
}
33
/// One segment of the rendered NAR stream: either literal bytes precomputed at
/// construction time (NAR framing), or a "hole" to be filled from a blob at read time.
#[derive(Debug)]
enum Data {
    /// Precomputed NAR structure bytes (magic, headers, directory framing, padding).
    Literal(Bytes),
    /// File contents, streamed lazily from the blob service via [BlobRef].
    Blob(BlobRef),
}
39
40impl Data {
41    pub fn len(&self) -> u64 {
42        match self {
43            Data::Literal(data) => data.len() as u64,
44            Data::Blob(BlobRef { size, .. }) => *size,
45        }
46    }
47}
48
/// A seekable, asynchronously readable view of a NAR file rendered from a
/// castore node. Literal framing segments are precomputed at construction
/// time; file contents are fetched from the [BlobService] on demand.
pub struct Reader<B: BlobService> {
    /// Ordered list of `(absolute start offset, segment)` covering the whole
    /// NAR stream. Offsets are strictly derived from the preceding segments'
    /// lengths during construction.
    segments: Vec<(u64, Data)>,
    /// Current read position as an absolute byte offset into the NAR stream.
    position_bytes: u64,
    /// Index into `segments` of the segment containing `position_bytes`.
    position_index: usize,
    blob_service: Arc<B>,
    /// True between `start_seek` and the `poll_complete` (or blob poll in
    /// `poll_read`) that observes the seek's completion.
    seeking: bool,
    /// Reader for the currently active `Data::Blob` segment, possibly still in
    /// the middle of being opened/seeked; `Gone` when the current segment is
    /// not a blob.
    current_blob: TryMaybeDone<BoxFuture<'static, io::Result<Box<dyn BlobReader>>>>,
}
57
58/// Used during construction.
59/// Converts the current buffer (passed as `cur_segment`) into a `Data::Literal` segment and
60/// inserts it into `self.segments`.
61fn flush_segment(segments: &mut Vec<(u64, Data)>, offset: &mut u64, cur_segment: Vec<u8>) {
62    let segment_size = cur_segment.len();
63    segments.push((*offset, Data::Literal(cur_segment.into())));
64    *offset += segment_size as u64;
65}
66
/// Used during construction.
/// Recursively walks the node and its children, and fills `segments` with the appropriate
/// `Data::Literal` and `Data::Blob` elements.
///
/// `offset` tracks the absolute position in the NAR stream and is advanced as
/// segments are emitted. `get_directory` must resolve every directory digest
/// reachable from `node` (the callers guarantee this via the directory closure).
fn walk_node(
    segments: &mut Vec<(u64, Data)>,
    offset: &mut u64,
    get_directory: &impl Fn(&B3Digest) -> Directory,
    node: Node,
    // Includes a reference to the current segment's buffer
    nar_node: nar_writer::Node<'_, Vec<u8>>,
) -> Result<(), RenderError> {
    match node {
        snix_castore::Node::Symlink { target } => {
            // Symlinks are pure framing: everything ends up in the literal buffer.
            nar_node
                .symlink(target.as_ref())
                .map_err(RenderError::NARWriterError)?;
        }
        snix_castore::Node::File {
            digest,
            size,
            executable,
        } => {
            // Write the file header, but not the contents; `skip` lets us
            // account for the contents without actually writing them.
            let (cur_segment, skip) = nar_node
                .file_manual_write(executable, size)
                .map_err(RenderError::NARWriterError)?;

            // Flush the segment up until the beginning of the blob
            flush_segment(segments, offset, std::mem::take(cur_segment));

            // Insert the blob segment
            segments.push((*offset, Data::Blob(BlobRef { digest, size })));
            *offset += size;

            // Close the file node
            // We **intentionally** do not write the file contents anywhere.
            // Instead we have stored the blob reference in a Data::Blob segment,
            // and the poll_read implementation will take care of serving the
            // appropriate blob at this offset.
            skip.close(cur_segment)
                .map_err(RenderError::NARWriterError)?;
        }
        snix_castore::Node::Directory { digest, .. } => {
            let directory = get_directory(&digest);

            // start a directory node
            let mut nar_node_directory =
                nar_node.directory().map_err(RenderError::NARWriterError)?;

            // for each node in the directory, create a new entry with its name,
            // and then recurse on that entry.
            for (name, node) in directory.nodes() {
                let child_node = nar_node_directory
                    .entry(name.as_ref())
                    .map_err(RenderError::NARWriterError)?;

                walk_node(segments, offset, get_directory, node.clone(), child_node)?;
            }

            // close the directory
            nar_node_directory
                .close()
                .map_err(RenderError::NARWriterError)?;
        }
    }
    Ok(())
}
133
impl<B: BlobService + 'static> Reader<B> {
    /// Creates a new seekable NAR renderer for the given castore root node.
    ///
    /// This function pre-fetches the directory closure using `get_recursive()` and assembles the
    /// NAR structure, except the file contents which are stored as 'holes' with references to a blob
    /// of a specific BLAKE3 digest and known size. The AsyncRead implementation will then switch
    /// between serving the precomputed literal segments, and the appropriate blob for the file
    /// contents.
    pub async fn new(
        root_node: Node,
        blob_service: B,
        directory_service: impl DirectoryService,
    ) -> Result<Self, RenderError> {
        let maybe_directory_closure = match &root_node {
            // If this is a directory, resolve all subdirectories
            Node::Directory { digest, .. } => {
                let mut builder =
                    DirectoryGraphBuilder::new_with_insertion_order(DirectoryOrder::RootToLeaves);
                let mut directories = directory_service.get_recursive(digest);
                // Feed every directory from the closure into the graph builder;
                // storage and validation errors both surface as StoreError.
                while let Some(dir) = directories
                    .try_next()
                    .await
                    .map_err(|e| RenderError::StoreError(e.into()))?
                {
                    builder.insert(dir).map_err(|e| {
                        RenderError::StoreError(
                            snix_castore::Error::StorageError(e.to_string()).into(),
                        )
                    })?;
                }

                Some(builder.build().map_err(|e| {
                    RenderError::StoreError(snix_castore::Error::StorageError(e.to_string()).into())
                })?)
            }
            // If the top-level node is a file or a symlink, just pass it on
            Node::File { .. } => None,
            Node::Symlink { .. } => None,
        };

        Self::new_with_directory_closure(root_node, blob_service, maybe_directory_closure)
    }

    /// Creates a new seekable NAR renderer for the given castore root node.
    /// This version of the instantiation does not perform any I/O and as such is not async.
    /// However it requires all directories to be passed as a ValidatedDirectoryGraph.
    ///
    /// panics if the directory closure is not the closure of the root node
    pub fn new_with_directory_closure(
        root_node: Node,
        blob_service: B,
        directory_closure: Option<DirectoryGraph>,
    ) -> Result<Self, RenderError> {
        // Build a digest-sorted lookup table so walk_node's get_directory
        // closure can resolve directories with a binary search.
        let directories_sorted = directory_closure
            .map(|directory_closure| {
                let mut directories_sorted: Vec<(B3Digest, Directory)> = vec![];
                // Drain order doesn't really matter
                for dir in directory_closure.drain(DirectoryOrder::RootToLeaves) {
                    let digest = dir.digest();
                    // binary_search returns Err(pos) with the insertion point
                    // that keeps the vector sorted by digest.
                    let pos = directories_sorted
                        .binary_search_by_key(&digest.as_slice(), |(digest, _dir)| {
                            digest.as_slice()
                        })
                        .expect_err("duplicate directory"); // DirectoryGraph checks this
                    directories_sorted.insert(pos, (digest, dir));
                }
                directories_sorted
            })
            .unwrap_or_default();

        let mut segments = vec![];
        let mut cur_segment: Vec<u8> = vec![];
        let mut offset = 0;

        // Writes the NAR magic into the first literal segment's buffer.
        let nar_node = nar_writer::open(&mut cur_segment).map_err(RenderError::NARWriterError)?;

        walk_node(
            &mut segments,
            &mut offset,
            &|digest| {
                directories_sorted
                    .binary_search_by_key(&digest.as_slice(), |(digest, _dir)| digest.as_slice())
                    .map(|pos| directories_sorted[pos].clone())
                    .expect("missing directory") // DirectoryGraph checks this
                    .1
            },
            root_node,
            nar_node,
        )?;
        // Flush the final segment
        flush_segment(&mut segments, &mut offset, std::mem::take(&mut cur_segment));

        Ok(Reader {
            segments,
            position_bytes: 0,
            position_index: 0,
            blob_service: blob_service.into(),
            seeking: false,
            // No blob is active until we read/seek into a Data::Blob segment.
            current_blob: TryMaybeDone::Gone,
        })
    }

    /// Total length of the rendered NAR stream in bytes.
    ///
    /// Panics if there are no segments; construction always flushes at least
    /// the literal segment containing the NAR magic, so this holds for any
    /// Reader built via the constructors above.
    pub fn stream_len(&self) -> u64 {
        self.segments
            .last()
            .map(|&(off, ref data)| off + data.len())
            .expect("no segment found")
    }
}
243
impl<B: BlobService + 'static> tokio::io::AsyncSeek for Reader<B> {
    /// Records the new position and, when it lands inside a blob segment,
    /// queues a future that opens/seeks the corresponding blob reader.
    /// The future is driven to completion by `poll_complete` (or by the next
    /// `poll_read`).
    fn start_seek(mut self: Pin<&mut Self>, pos: io::SeekFrom) -> io::Result<()> {
        let stream_len = Reader::stream_len(&self);

        let this = &mut *self;
        if this.seeking {
            return Err(io::Error::other("Already seeking"));
        }
        this.seeking = true;

        // TODO(edef): be sane about overflows
        let pos = match pos {
            io::SeekFrom::Start(n) => n,
            io::SeekFrom::End(n) => (stream_len as i64 + n) as u64,
            io::SeekFrom::Current(n) => (this.position_bytes as i64 + n) as u64,
        };

        let prev_position_bytes = this.position_bytes;
        let prev_position_index = this.position_index;

        // Clamp to the end of the stream, then locate the containing segment.
        this.position_bytes = min(pos, stream_len);
        this.position_index = match this
            .segments
            .binary_search_by_key(&this.position_bytes, |&(off, _)| off)
        {
            Ok(idx) => idx,
            // Err(idx) is the insertion point; idx - 1 is the segment whose
            // range covers position_bytes. Safe: the first segment starts at
            // offset 0, so any Err here has idx >= 1.
            Err(idx) => idx - 1,
        };

        let Some((offset, Data::Blob(BlobRef { digest, .. }))) =
            this.segments.get(this.position_index)
        else {
            // If not seeking into a blob, we clear the active blob reader and then we're done
            this.current_blob = TryMaybeDone::Gone;
            return Ok(());
        };
        let offset_in_segment = this.position_bytes - offset;

        if prev_position_bytes == this.position_bytes {
            // position has not changed. do nothing
        } else if prev_position_index == this.position_index {
            // seeking within the same segment, re-use the blob reader
            // NOTE(review): `take_output().unwrap()` assumes `current_blob` has
            // already completed. poll_read can leave a not-yet-polled open_read
            // future here after crossing into a blob segment — confirm a seek
            // issued at that point cannot hit the unwrap.
            let mut prev = std::mem::replace(&mut this.current_blob, TryMaybeDone::Gone);
            this.current_blob = futures::future::try_maybe_done(
                (async move {
                    let mut reader = Pin::new(&mut prev).take_output().unwrap();
                    reader.seek(io::SeekFrom::Start(offset_in_segment)).await?;
                    Ok(reader)
                })
                .boxed(),
            );
        } else {
            // seek to a different segment
            let blob_service = this.blob_service.clone();
            let digest = digest.clone();
            this.current_blob = futures::future::try_maybe_done(
                (async move {
                    let mut reader =
                        blob_service
                            .open_read(&digest)
                            .await?
                            .ok_or(io::Error::new(
                                io::ErrorKind::NotFound,
                                RenderError::BlobNotFound(digest.clone(), Default::default()),
                            ))?;
                    // Only issue a seek when not starting at the blob's beginning.
                    if offset_in_segment != 0 {
                        reader.seek(io::SeekFrom::Start(offset_in_segment)).await?;
                    }
                    Ok(reader)
                })
                .boxed(),
            );
        };

        Ok(())
    }

    /// Drives the blob open/seek future queued by `start_seek` and reports the
    /// final position.
    ///
    /// NOTE(review): if the queued future fails, the `?` returns before
    /// `seeking` is reset, so `seeking` stays true until poll_complete is
    /// called again (the now-terminated future is then skipped) — confirm
    /// callers retry poll_complete after an error.
    fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<u64>> {
        let this = &mut *self;

        // Gone/completed futures count as terminated and must not be re-polled.
        if !this.current_blob.is_terminated() {
            futures::ready!(this.current_blob.poll_unpin(cx))?;
        }
        this.seeking = false;

        Poll::Ready(Ok(this.position_bytes))
    }
}
331
impl<B: BlobService + 'static> tokio::io::AsyncRead for Reader<B> {
    /// Serves bytes from the segment containing the current position: literal
    /// segments are copied from memory, blob segments are read from the active
    /// blob reader. On crossing into a new blob segment, eagerly queues the
    /// open_read so the next call only has to await it.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &mut tokio::io::ReadBuf,
    ) -> Poll<io::Result<()>> {
        let this = &mut *self;

        // Position past the last segment means end of the NAR stream.
        let Some(&(offset, ref segment)) = this.segments.get(this.position_index) else {
            return Poll::Ready(Ok(())); // EOF
        };

        let prev_read_buf_pos = buf.filled().len();
        match segment {
            Data::Literal(data) => {
                // Serve precomputed framing bytes straight from memory.
                let offset_in_segment = this.position_bytes - offset;
                let offset_in_segment = usize::try_from(offset_in_segment).unwrap();
                let remaining_data = data.len() - offset_in_segment;
                let read_size = std::cmp::min(remaining_data, buf.remaining());
                buf.put(&data[offset_in_segment..offset_in_segment + read_size]);
            }
            Data::Blob(BlobRef { size, .. }) => {
                // Wait for the blob reader (queued by start_seek or a previous
                // poll_read) to finish opening/seeking.
                futures::ready!(this.current_blob.poll_unpin(cx))?;
                // A completed blob future also completes any in-flight seek.
                this.seeking = false;
                let blob = Pin::new(&mut this.current_blob)
                    .output_mut()
                    .expect("missing blob");
                futures::ready!(Pin::new(blob).poll_read(cx, buf))?;
                let read_length = buf.filled().len() - prev_read_buf_pos;
                // Bytes this blob segment should still be able to provide.
                let maximum_expected_read_length = (offset + size) - this.position_bytes;
                let is_eof = read_length == 0;
                let too_much_returned = read_length as u64 > maximum_expected_read_length;
                match (is_eof, too_much_returned) {
                    // Blob ended before its advertised size: truncated/corrupt blob.
                    (true, false) => {
                        return Poll::Ready(Err(io::Error::new(
                            io::ErrorKind::UnexpectedEof,
                            "blob short read",
                        )));
                    }
                    // Blob yielded more than its advertised size: discard the
                    // extra bytes and report the inconsistency.
                    (false, true) => {
                        buf.set_filled(prev_read_buf_pos);
                        return Poll::Ready(Err(io::Error::new(
                            io::ErrorKind::InvalidInput,
                            "blob continued to yield data beyond end",
                        )));
                    }
                    _ => {}
                }
            }
        };
        // Advance the stream position by however many bytes we just produced.
        let new_read_buf_pos = buf.filled().len();
        this.position_bytes += (new_read_buf_pos - prev_read_buf_pos) as u64;

        // Skip every segment that is now fully consumed (zero-length segments
        // are skipped too, since consumed >= len holds trivially for them).
        let prev_position_index = this.position_index;
        while {
            if let Some(&(offset, ref segment)) = this.segments.get(this.position_index) {
                (this.position_bytes - offset) >= segment.len()
            } else {
                false
            }
        } {
            this.position_index += 1;
        }
        if prev_position_index != this.position_index {
            let Some((_offset, Data::Blob(BlobRef { digest, .. }))) =
                this.segments.get(this.position_index)
            else {
                // If the next segment is not a blob, we clear the active blob reader and then we're done
                this.current_blob = TryMaybeDone::Gone;
                return Poll::Ready(Ok(()));
            };

            // The next segment is a blob, open the BlobReader
            let blob_service = this.blob_service.clone();
            let digest = digest.clone();
            this.current_blob = futures::future::try_maybe_done(
                (async move {
                    let reader = blob_service
                        .open_read(&digest)
                        .await?
                        .ok_or(io::Error::new(
                            io::ErrorKind::NotFound,
                            RenderError::BlobNotFound(digest.clone(), Default::default()),
                        ))?;
                    Ok(reader)
                })
                .boxed(),
            );
        }

        Poll::Ready(Ok(()))
    }
}