tokio_tar/
archive.rs

1use std::{
2    cmp,
3    collections::VecDeque,
4    path::Path,
5    pin::Pin,
6    sync::{
7        atomic::{AtomicU64, Ordering},
8        Arc,
9    },
10    task::{Context, Poll},
11};
12use tokio::{
13    io::{self, AsyncRead as Read, AsyncReadExt},
14    sync::Mutex,
15};
16use tokio_stream::*;
17
18use crate::{
19    entry::{EntryFields, EntryIo},
20    error::TarError,
21    other, Entry, GnuExtSparseHeader, GnuSparseHeader, Header,
22};
23
24/// A top-level representation of an archive file.
25///
26/// This archive can have an entry added to it and it can be iterated over.
27#[derive(Debug)]
28pub struct Archive<R: Read + Unpin> {
29    inner: Arc<ArchiveInner<R>>,
30}
31
32impl<R: Read + Unpin> Clone for Archive<R> {
33    fn clone(&self) -> Self {
34        Archive {
35            inner: self.inner.clone(),
36        }
37    }
38}
39
40#[derive(Debug)]
41pub struct ArchiveInner<R> {
42    pos: AtomicU64,
43    unpack_xattrs: bool,
44    preserve_permissions: bool,
45    preserve_mtime: bool,
46    ignore_zeros: bool,
47    obj: Mutex<R>,
48}
49
50/// Configure the archive.
51pub struct ArchiveBuilder<R: Read + Unpin> {
52    obj: R,
53    unpack_xattrs: bool,
54    preserve_permissions: bool,
55    preserve_mtime: bool,
56    ignore_zeros: bool,
57}
58
59impl<R: Read + Unpin> ArchiveBuilder<R> {
60    /// Create a new builder.
61    pub fn new(obj: R) -> Self {
62        ArchiveBuilder {
63            unpack_xattrs: false,
64            preserve_permissions: false,
65            preserve_mtime: true,
66            ignore_zeros: false,
67            obj,
68        }
69    }
70
71    /// Indicate whether extended file attributes (xattrs on Unix) are preserved
72    /// when unpacking this archive.
73    ///
74    /// This flag is disabled by default and is currently only implemented on
75    /// Unix using xattr support. This may eventually be implemented for
76    /// Windows, however, if other archive implementations are found which do
77    /// this as well.
78    pub fn set_unpack_xattrs(mut self, unpack_xattrs: bool) -> Self {
79        self.unpack_xattrs = unpack_xattrs;
80        self
81    }
82
83    /// Indicate whether extended permissions (like suid on Unix) are preserved
84    /// when unpacking this entry.
85    ///
86    /// This flag is disabled by default and is currently only implemented on
87    /// Unix.
88    pub fn set_preserve_permissions(mut self, preserve: bool) -> Self {
89        self.preserve_permissions = preserve;
90        self
91    }
92
93    /// Indicate whether access time information is preserved when unpacking
94    /// this entry.
95    ///
96    /// This flag is enabled by default.
97    pub fn set_preserve_mtime(mut self, preserve: bool) -> Self {
98        self.preserve_mtime = preserve;
99        self
100    }
101
102    /// Ignore zeroed headers, which would otherwise indicate to the archive that it has no more
103    /// entries.
104    ///
105    /// This can be used in case multiple tar archives have been concatenated together.
106    pub fn set_ignore_zeros(mut self, ignore_zeros: bool) -> Self {
107        self.ignore_zeros = ignore_zeros;
108        self
109    }
110
111    /// Construct the archive, ready to accept inputs.
112    pub fn build(self) -> Archive<R> {
113        let Self {
114            unpack_xattrs,
115            preserve_permissions,
116            preserve_mtime,
117            ignore_zeros,
118            obj,
119        } = self;
120
121        Archive {
122            inner: Arc::new(ArchiveInner {
123                unpack_xattrs,
124                preserve_permissions,
125                preserve_mtime,
126                ignore_zeros,
127                obj: Mutex::new(obj),
128                pos: 0.into(),
129            }),
130        }
131    }
132}
133
134impl<R: Read + Unpin> Archive<R> {
135    /// Create a new archive with the underlying object as the reader.
136    pub fn new(obj: R) -> Archive<R> {
137        Archive {
138            inner: Arc::new(ArchiveInner {
139                unpack_xattrs: false,
140                preserve_permissions: false,
141                preserve_mtime: true,
142                ignore_zeros: false,
143                obj: Mutex::new(obj),
144                pos: 0.into(),
145            }),
146        }
147    }
148
149    /// Unwrap this archive, returning the underlying object.
150    pub fn into_inner(self) -> Result<R, Self> {
151        let Self { inner } = self;
152
153        match Arc::try_unwrap(inner) {
154            Ok(inner) => Ok(inner.obj.into_inner()),
155            Err(inner) => Err(Self { inner }),
156        }
157    }
158
159    /// Construct an stream over the entries in this archive.
160    ///
161    /// Note that care must be taken to consider each entry within an archive in
162    /// sequence. If entries are processed out of sequence (from what the
163    /// stream returns), then the contents read for each entry may be
164    /// corrupted.
165    pub fn entries(&mut self) -> io::Result<Entries<R>> {
166        if self.inner.pos.load(Ordering::SeqCst) != 0 {
167            return Err(other(
168                "cannot call entries unless archive is at \
169                 position 0",
170            ));
171        }
172
173        Ok(Entries {
174            archive: self.clone(),
175            current: (0, None, 0, None),
176            gnu_longlink: None,
177            gnu_longname: None,
178            pax_extensions: None,
179        })
180    }
181
182    /// Construct an stream over the raw entries in this archive.
183    ///
184    /// Note that care must be taken to consider each entry within an archive in
185    /// sequence. If entries are processed out of sequence (from what the
186    /// stream returns), then the contents read for each entry may be
187    /// corrupted.
188    pub fn entries_raw(&mut self) -> io::Result<RawEntries<R>> {
189        if self.inner.pos.load(Ordering::SeqCst) != 0 {
190            return Err(other(
191                "cannot call entries_raw unless archive is at \
192                 position 0",
193            ));
194        }
195
196        Ok(RawEntries {
197            archive: self.clone(),
198            current: (0, None, 0),
199        })
200    }
201
202    /// Unpacks the contents tarball into the specified `dst`.
203    ///
204    /// This function will iterate over the entire contents of this tarball,
205    /// extracting each file in turn to the location specified by the entry's
206    /// path name.
207    ///
208    /// This operation is relatively sensitive in that it will not write files
209    /// outside of the path specified by `dst`. Files in the archive which have
210    /// a '..' in their path are skipped during the unpacking process.
211    ///
212    /// # Examples
213    ///
214    /// ```no_run
215    /// # fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> { tokio::runtime::Runtime::new().unwrap().block_on(async {
216    /// #
217    /// use tokio::fs::File;
218    /// use tokio_tar::Archive;
219    ///
220    /// let mut ar = Archive::new(File::open("foo.tar").await?);
221    /// ar.unpack("foo").await?;
222    /// #
223    /// # Ok(()) }) }
224    /// ```
225    pub async fn unpack<P: AsRef<Path>>(&mut self, dst: P) -> io::Result<()> {
226        let mut entries = self.entries()?;
227        let mut pinned = Pin::new(&mut entries);
228        while let Some(entry) = pinned.next().await {
229            let mut file = entry.map_err(|e| TarError::new("failed to iterate over archive", e))?;
230            file.unpack_in(dst.as_ref()).await?;
231        }
232        Ok(())
233    }
234}
235
236/// Stream of `Entry`s.
237pub struct Entries<R: Read + Unpin> {
238    archive: Archive<R>,
239    current: (u64, Option<Header>, usize, Option<GnuExtSparseHeader>),
240    gnu_longname: Option<Vec<u8>>,
241    gnu_longlink: Option<Vec<u8>>,
242    pax_extensions: Option<Vec<u8>>,
243}
244
// Unwraps a `Poll<Option<Result<T, E>>>` to `T` inside a `poll_next` body.
// Pending, end-of-stream and errors are all returned from the enclosing
// function unchanged.
macro_rules! ready_opt_err {
    ($val:expr) => {
        match $val {
            Poll::Ready(Some(Ok(item))) => item,
            Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err))),
            Poll::Ready(None) => return Poll::Ready(None),
            Poll::Pending => return Poll::Pending,
        }
    };
}
254
// Unwraps a `Poll<Result<T, E>>` to `T` inside a `poll_next` body. Pending is
// returned as-is; an error is returned as a ready stream item.
macro_rules! ready_err {
    ($val:expr) => {
        match $val {
            Poll::Ready(Ok(item)) => item,
            Poll::Ready(Err(err)) => return Poll::Ready(Some(Err(err))),
            Poll::Pending => return Poll::Pending,
        }
    };
}
263
264impl<R: Read + Unpin> Stream for Entries<R> {
265    type Item = io::Result<Entry<Archive<R>>>;
266
267    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
268        loop {
269            let archive = self.archive.clone();
270            let (next, current_header, current_header_pos, _) = &mut self.current;
271            let entry = ready_opt_err!(poll_next_raw(
272                archive,
273                next,
274                current_header,
275                current_header_pos,
276                cx
277            ));
278
279            let is_recognized_header =
280                entry.header().as_gnu().is_some() || entry.header().as_ustar().is_some();
281            if is_recognized_header && entry.header().entry_type().is_gnu_longname() {
282                if self.gnu_longname.is_some() {
283                    return Poll::Ready(Some(Err(other(
284                        "two long name entries describing \
285                         the same member",
286                    ))));
287                }
288
289                let mut ef = EntryFields::from(entry);
290                let val = ready_err!(Pin::new(&mut ef).poll_read_all(cx));
291                self.gnu_longname = Some(val);
292                continue;
293            }
294
295            if is_recognized_header && entry.header().entry_type().is_gnu_longlink() {
296                if self.gnu_longlink.is_some() {
297                    return Poll::Ready(Some(Err(other(
298                        "two long name entries describing \
299                         the same member",
300                    ))));
301                }
302                let mut ef = EntryFields::from(entry);
303                let val = ready_err!(Pin::new(&mut ef).poll_read_all(cx));
304                self.gnu_longlink = Some(val);
305                continue;
306            }
307
308            if is_recognized_header && entry.header().entry_type().is_pax_local_extensions() {
309                if self.pax_extensions.is_some() {
310                    return Poll::Ready(Some(Err(other(
311                        "two pax extensions entries describing \
312                         the same member",
313                    ))));
314                }
315                let mut ef = EntryFields::from(entry);
316                let val = ready_err!(Pin::new(&mut ef).poll_read_all(cx));
317                self.pax_extensions = Some(val);
318                continue;
319            }
320
321            let mut fields = EntryFields::from(entry);
322            fields.long_pathname = self.gnu_longname.take();
323            fields.long_linkname = self.gnu_longlink.take();
324            fields.pax_extensions = self.pax_extensions.take();
325
326            let archive = self.archive.clone();
327            let (next, _, current_pos, current_ext) = &mut self.current;
328
329            ready_err!(poll_parse_sparse_header(
330                archive,
331                next,
332                current_ext,
333                current_pos,
334                &mut fields,
335                cx
336            ));
337
338            return Poll::Ready(Some(Ok(fields.into_entry())));
339        }
340    }
341}
342
343/// Stream of raw `Entry`s.
344pub struct RawEntries<R: Read + Unpin> {
345    archive: Archive<R>,
346    current: (u64, Option<Header>, usize),
347}
348
349impl<R: Read + Unpin> Stream for RawEntries<R> {
350    type Item = io::Result<Entry<Archive<R>>>;
351
352    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
353        let archive = self.archive.clone();
354        let (next, current_header, current_header_pos) = &mut self.current;
355        poll_next_raw(archive, next, current_header, current_header_pos, cx)
356    }
357}
358
359fn poll_next_raw<R: Read + Unpin>(
360    mut archive: Archive<R>,
361    next: &mut u64,
362    current_header: &mut Option<Header>,
363    current_header_pos: &mut usize,
364    cx: &mut Context<'_>,
365) -> Poll<Option<io::Result<Entry<Archive<R>>>>> {
366    let mut header_pos = *next;
367
368    loop {
369        // Seek to the start of the next header in the archive
370        if current_header.is_none() {
371            let delta = *next - archive.inner.pos.load(Ordering::SeqCst);
372            match futures_core::ready!(poll_skip(&mut archive, cx, delta)) {
373                Ok(_) => {}
374                Err(err) => return Poll::Ready(Some(Err(err))),
375            }
376
377            *current_header = Some(Header::new_old());
378            *current_header_pos = 0;
379        }
380
381        let header = current_header.as_mut().unwrap();
382
383        // EOF is an indicator that we are at the end of the archive.
384        match futures_core::ready!(poll_try_read_all(
385            &mut archive,
386            cx,
387            header.as_mut_bytes(),
388            current_header_pos,
389        )) {
390            Ok(true) => {}
391            Ok(false) => return Poll::Ready(None),
392            Err(err) => return Poll::Ready(Some(Err(err))),
393        }
394
395        // If a header is not all zeros, we have another valid header.
396        // Otherwise, check if we are ignoring zeros and continue, or break as if this is the
397        // end of the archive.
398        if !header.as_bytes().iter().all(|i| *i == 0) {
399            *next += 512;
400            break;
401        }
402
403        if !archive.inner.ignore_zeros {
404            return Poll::Ready(None);
405        }
406
407        *next += 512;
408        header_pos = *next;
409    }
410
411    let header = current_header.as_mut().unwrap();
412
413    // Make sure the checksum is ok
414    let sum = header.as_bytes()[..148]
415        .iter()
416        .chain(&header.as_bytes()[156..])
417        .fold(0, |a, b| a + (*b as u32))
418        + 8 * 32;
419    let cksum = header.cksum()?;
420    if sum != cksum {
421        return Poll::Ready(Some(Err(other("archive header checksum mismatch"))));
422    }
423
424    let file_pos = *next;
425    let size = header.entry_size()?;
426
427    let mut data = VecDeque::with_capacity(1);
428    data.push_back(EntryIo::Data(archive.clone().take(size)));
429
430    let header = current_header.take().unwrap();
431
432    let ret = EntryFields {
433        size,
434        header_pos,
435        file_pos,
436        data,
437        header,
438        long_pathname: None,
439        long_linkname: None,
440        pax_extensions: None,
441        unpack_xattrs: archive.inner.unpack_xattrs,
442        preserve_permissions: archive.inner.preserve_permissions,
443        preserve_mtime: archive.inner.preserve_mtime,
444        read_state: None,
445    };
446
447    // Store where the next entry is, rounding up by 512 bytes (the size of
448    // a header);
449    let size = (size + 511) & !(512 - 1);
450    *next += size;
451
452    Poll::Ready(Some(Ok(ret.into_entry())))
453}
454
455fn poll_parse_sparse_header<R: Read + Unpin>(
456    mut archive: Archive<R>,
457    next: &mut u64,
458    current_ext: &mut Option<GnuExtSparseHeader>,
459    current_ext_pos: &mut usize,
460    entry: &mut EntryFields<Archive<R>>,
461    cx: &mut Context<'_>,
462) -> Poll<io::Result<()>> {
463    if !entry.header.entry_type().is_gnu_sparse() {
464        return Poll::Ready(Ok(()));
465    }
466
467    let gnu = match entry.header.as_gnu() {
468        Some(gnu) => gnu,
469        None => return Poll::Ready(Err(other("sparse entry type listed but not GNU header"))),
470    };
471
472    // Sparse files are represented internally as a list of blocks that are
473    // read. Blocks are either a bunch of 0's or they're data from the
474    // underlying archive.
475    //
476    // Blocks of a sparse file are described by the `GnuSparseHeader`
477    // structure, some of which are contained in `GnuHeader` but some of
478    // which may also be contained after the first header in further
479    // headers.
480    //
481    // We read off all the blocks here and use the `add_block` function to
482    // incrementally add them to the list of I/O block (in `entry.data`).
483    // The `add_block` function also validates that each chunk comes after
484    // the previous, we don't overrun the end of the file, and each block is
485    // aligned to a 512-byte boundary in the archive itself.
486    //
487    // At the end we verify that the sparse file size (`Header::size`) is
488    // the same as the current offset (described by the list of blocks) as
489    // well as the amount of data read equals the size of the entry
490    // (`Header::entry_size`).
491    entry.data.truncate(0);
492
493    let mut cur = 0;
494    let mut remaining = entry.size;
495    {
496        let data = &mut entry.data;
497        let reader = archive.clone();
498        let size = entry.size;
499        let mut add_block = |block: &GnuSparseHeader| -> io::Result<_> {
500            if block.is_empty() {
501                return Ok(());
502            }
503            let off = block.offset()?;
504            let len = block.length()?;
505
506            if (size - remaining) % 512 != 0 {
507                return Err(other(
508                    "previous block in sparse file was not \
509                     aligned to 512-byte boundary",
510                ));
511            } else if off < cur {
512                return Err(other(
513                    "out of order or overlapping sparse \
514                     blocks",
515                ));
516            } else if cur < off {
517                let block = io::repeat(0).take(off - cur);
518                data.push_back(EntryIo::Pad(block));
519            }
520            cur = off
521                .checked_add(len)
522                .ok_or_else(|| other("more bytes listed in sparse file than u64 can hold"))?;
523            remaining = remaining.checked_sub(len).ok_or_else(|| {
524                other(
525                    "sparse file consumed more data than the header \
526                     listed",
527                )
528            })?;
529            data.push_back(EntryIo::Data(reader.clone().take(len)));
530            Ok(())
531        };
532        for block in gnu.sparse.iter() {
533            add_block(block)?
534        }
535        if gnu.is_extended() {
536            let started_header = current_ext.is_some();
537            if !started_header {
538                let mut ext = GnuExtSparseHeader::new();
539                ext.isextended[0] = 1;
540                *current_ext = Some(ext);
541                *current_ext_pos = 0;
542            }
543
544            let ext = current_ext.as_mut().unwrap();
545            while ext.is_extended() {
546                match futures_core::ready!(poll_try_read_all(
547                    &mut archive,
548                    cx,
549                    ext.as_mut_bytes(),
550                    current_ext_pos,
551                )) {
552                    Ok(true) => {}
553                    Ok(false) => return Poll::Ready(Err(other("failed to read extension"))),
554                    Err(err) => return Poll::Ready(Err(err)),
555                }
556
557                *next += 512;
558                for block in ext.sparse.iter() {
559                    add_block(block)?;
560                }
561            }
562        }
563    }
564    if cur != gnu.real_size()? {
565        return Poll::Ready(Err(other(
566            "mismatch in sparse file chunks and \
567             size in header",
568        )));
569    }
570    entry.size = cur;
571    if remaining > 0 {
572        return Poll::Ready(Err(other(
573            "mismatch in sparse file chunks and \
574             entry size in header",
575        )));
576    }
577
578    Poll::Ready(Ok(()))
579}
580
581impl<R: Read + Unpin> Read for Archive<R> {
582    fn poll_read(
583        self: Pin<&mut Self>,
584        cx: &mut Context<'_>,
585        into: &mut io::ReadBuf<'_>,
586    ) -> Poll<io::Result<()>> {
587        let mut r = if let Ok(v) = self.inner.obj.try_lock() {
588            v
589        } else {
590            return Poll::Pending;
591        };
592
593        let res = futures_core::ready!(Pin::new(&mut *r).poll_read(cx, into));
594        match res {
595            Ok(()) => {
596                self.inner
597                    .pos
598                    .fetch_add(into.filled().len() as u64, Ordering::SeqCst);
599                Poll::Ready(Ok(()))
600            }
601            Err(err) => Poll::Ready(Err(err)),
602        }
603    }
604}
605
606/// Try to fill the buffer from the reader.
607///
608/// If the reader reaches its end before filling the buffer at all, returns `false`.
609/// Otherwise returns `true`.
610fn poll_try_read_all<R: Read + Unpin>(
611    mut source: R,
612    cx: &mut Context<'_>,
613    buf: &mut [u8],
614    pos: &mut usize,
615) -> Poll<io::Result<bool>> {
616    while *pos < buf.len() {
617        let mut read_buf = io::ReadBuf::new(&mut buf[*pos..]);
618        match futures_core::ready!(Pin::new(&mut source).poll_read(cx, &mut read_buf)) {
619            Ok(()) if read_buf.filled().is_empty() => {
620                if *pos == 0 {
621                    return Poll::Ready(Ok(false));
622                }
623
624                return Poll::Ready(Err(other("failed to read entire block")));
625            }
626            Ok(()) => *pos += read_buf.filled().len(),
627            Err(err) => return Poll::Ready(Err(err)),
628        }
629    }
630
631    *pos = 0;
632    Poll::Ready(Ok(true))
633}
634
635/// Skip n bytes on the given source.
636fn poll_skip<R: Read + Unpin>(
637    mut source: R,
638    cx: &mut Context<'_>,
639    mut amt: u64,
640) -> Poll<io::Result<()>> {
641    let mut buf = [0u8; 4096 * 8];
642    while amt > 0 {
643        let n = cmp::min(amt, buf.len() as u64);
644        let mut read_buf = io::ReadBuf::new(&mut buf[..n as usize]);
645        match futures_core::ready!(Pin::new(&mut source).poll_read(cx, &mut read_buf)) {
646            Ok(()) if read_buf.filled().is_empty() => {
647                return Poll::Ready(Err(other("unexpected EOF during skip")));
648            }
649            Ok(()) => {
650                amt -= read_buf.filled().len() as u64;
651            }
652            Err(err) => return Poll::Ready(Err(err)),
653        }
654    }
655
656    Poll::Ready(Ok(()))
657}