nix_compat/nar/reader/async/
mod.rs1use std::{
2 mem::MaybeUninit,
3 pin::Pin,
4 task::{self, Poll},
5};
6
7use tokio::io::{self, AsyncBufRead, AsyncRead, ErrorKind::InvalidData};
8
9use crate::{
11 nar::{self, wire::PadPar},
12 wire::{self, BytesReader},
13};
14
15mod read;
16#[cfg(test)]
17mod test;
18
19pub type Reader<'a> = dyn AsyncBufRead + Unpin + Send + 'a;
20
21pub async fn open<'a, 'r>(reader: &'a mut Reader<'r>) -> io::Result<Node<'a, 'r>> {
23 read::token(reader, &nar::wire::TOK_NAR).await?;
24 Node::new(reader).await
25}
26
27pub enum Node<'a, 'r: 'a> {
28 Symlink {
29 target: Vec<u8>,
30 },
31 File {
32 executable: bool,
33 reader: FileReader<'a, 'r>,
34 },
35 Directory(DirReader<'a, 'r>),
36}
37
38impl<'a, 'r: 'a> Node<'a, 'r> {
39 async fn new(reader: &'a mut Reader<'r>) -> io::Result<Self> {
44 Ok(match read::tag(reader).await? {
45 nar::wire::Node::Sym => {
46 let target = wire::read_bytes(reader, 1..=nar::wire::MAX_TARGET_LEN).await?;
47
48 if target.contains(&0) {
49 return Err(InvalidData.into());
50 }
51
52 read::token(reader, &nar::wire::TOK_PAR).await?;
53
54 Node::Symlink { target }
55 }
56 tag @ (nar::wire::Node::Reg | nar::wire::Node::Exe) => Node::File {
57 executable: tag == nar::wire::Node::Exe,
58 reader: FileReader {
59 inner: BytesReader::new_internal(reader, ..).await?,
60 },
61 },
62 nar::wire::Node::Dir => Node::Directory(DirReader::new(reader)),
63 })
64 }
65}
66
67pub struct FileReader<'a, 'r> {
75 inner: BytesReader<&'a mut Reader<'r>, PadPar>,
76}
77
78impl FileReader<'_, '_> {
79 pub fn is_empty(&self) -> bool {
80 self.len() == 0
81 }
82
83 pub fn len(&self) -> u64 {
84 self.inner.len()
85 }
86}
87
88impl AsyncRead for FileReader<'_, '_> {
89 fn poll_read(
90 self: Pin<&mut Self>,
91 cx: &mut task::Context,
92 buf: &mut io::ReadBuf,
93 ) -> Poll<io::Result<()>> {
94 Pin::new(&mut self.get_mut().inner).poll_read(cx, buf)
95 }
96}
97
98impl AsyncBufRead for FileReader<'_, '_> {
99 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<io::Result<&[u8]>> {
100 Pin::new(&mut self.get_mut().inner).poll_fill_buf(cx)
101 }
102
103 fn consume(self: Pin<&mut Self>, amt: usize) {
104 Pin::new(&mut self.get_mut().inner).consume(amt)
105 }
106}
107
108pub struct DirReader<'a, 'r> {
111 reader: &'a mut Reader<'r>,
112 prev_name: Vec<u8>,
115}
116
117pub struct Entry<'a, 'r> {
118 pub name: &'a [u8],
119 pub node: Node<'a, 'r>,
120}
121
122impl<'a, 'r> DirReader<'a, 'r> {
123 fn new(reader: &'a mut Reader<'r>) -> Self {
124 Self {
125 reader,
126 prev_name: vec![],
127 }
128 }
129
130 pub async fn next(&mut self) -> io::Result<Option<Entry<'_, 'r>>> {
140 if !self.prev_name.is_empty() {
143 read::token(self.reader, &nar::wire::TOK_PAR).await?;
144 }
145
146 if let nar::wire::Entry::None = read::tag(self.reader).await? {
147 return Ok(None);
148 }
149
150 let mut name = [MaybeUninit::uninit(); nar::wire::MAX_NAME_LEN + 1];
151 let name =
152 wire::read_bytes_buf(self.reader, &mut name, 1..=nar::wire::MAX_NAME_LEN).await?;
153
154 if name.contains(&0) || name.contains(&b'/') || name == b"." || name == b".." {
155 return Err(InvalidData.into());
156 }
157
158 if &self.prev_name[..] >= name {
160 return Err(InvalidData.into());
161 }
162
163 self.prev_name.clear();
164 self.prev_name.extend_from_slice(name);
165
166 read::token(self.reader, &nar::wire::TOK_NOD).await?;
167
168 Ok(Some(Entry {
169 name: &self.prev_name,
170 node: Node::new(self.reader).await?,
171 }))
172 }
173}