1use std::{
2 cmp,
3 collections::VecDeque,
4 path::Path,
5 pin::Pin,
6 sync::{
7 atomic::{AtomicU64, Ordering},
8 Arc,
9 },
10 task::{Context, Poll},
11};
12use tokio::{
13 io::{self, AsyncRead as Read, AsyncReadExt},
14 sync::Mutex,
15};
16use tokio_stream::*;
17
18use crate::{
19 entry::{EntryFields, EntryIo},
20 error::TarError,
21 other, Entry, GnuExtSparseHeader, GnuSparseHeader, Header,
22};
23
24#[derive(Debug)]
28pub struct Archive<R: Read + Unpin> {
29 inner: Arc<ArchiveInner<R>>,
30}
31
32impl<R: Read + Unpin> Clone for Archive<R> {
33 fn clone(&self) -> Self {
34 Archive {
35 inner: self.inner.clone(),
36 }
37 }
38}
39
/// State shared between every clone of an [`Archive`] handle.
#[derive(Debug)]
pub struct ArchiveInner<R> {
    /// Number of bytes read through the archive so far; advanced by
    /// `Archive::poll_read` and used to work out how far to skip to the next
    /// header.
    pos: AtomicU64,
    /// Copied verbatim into every entry produced from this archive.
    /// NOTE(review): presumably controls extended-attribute extraction —
    /// confirm against the entry unpacking code.
    unpack_xattrs: bool,
    /// Copied verbatim into every entry produced from this archive.
    /// NOTE(review): presumably controls permission restoration — confirm.
    preserve_permissions: bool,
    /// Copied verbatim into every entry produced from this archive.
    /// NOTE(review): presumably controls mtime restoration — confirm.
    preserve_mtime: bool,
    /// When `false`, an all-zero header block ends iteration; when `true`,
    /// such blocks are skipped and reading continues.
    ignore_zeros: bool,
    /// The underlying reader, guarded so that clones can share it.
    obj: Mutex<R>,
}
49
50pub struct ArchiveBuilder<R: Read + Unpin> {
52 obj: R,
53 unpack_xattrs: bool,
54 preserve_permissions: bool,
55 preserve_mtime: bool,
56 ignore_zeros: bool,
57}
58
59impl<R: Read + Unpin> ArchiveBuilder<R> {
60 pub fn new(obj: R) -> Self {
62 ArchiveBuilder {
63 unpack_xattrs: false,
64 preserve_permissions: false,
65 preserve_mtime: true,
66 ignore_zeros: false,
67 obj,
68 }
69 }
70
71 pub fn set_unpack_xattrs(mut self, unpack_xattrs: bool) -> Self {
79 self.unpack_xattrs = unpack_xattrs;
80 self
81 }
82
83 pub fn set_preserve_permissions(mut self, preserve: bool) -> Self {
89 self.preserve_permissions = preserve;
90 self
91 }
92
93 pub fn set_preserve_mtime(mut self, preserve: bool) -> Self {
98 self.preserve_mtime = preserve;
99 self
100 }
101
102 pub fn set_ignore_zeros(mut self, ignore_zeros: bool) -> Self {
107 self.ignore_zeros = ignore_zeros;
108 self
109 }
110
111 pub fn build(self) -> Archive<R> {
113 let Self {
114 unpack_xattrs,
115 preserve_permissions,
116 preserve_mtime,
117 ignore_zeros,
118 obj,
119 } = self;
120
121 Archive {
122 inner: Arc::new(ArchiveInner {
123 unpack_xattrs,
124 preserve_permissions,
125 preserve_mtime,
126 ignore_zeros,
127 obj: Mutex::new(obj),
128 pos: 0.into(),
129 }),
130 }
131 }
132}
133
134impl<R: Read + Unpin> Archive<R> {
135 pub fn new(obj: R) -> Archive<R> {
137 Archive {
138 inner: Arc::new(ArchiveInner {
139 unpack_xattrs: false,
140 preserve_permissions: false,
141 preserve_mtime: true,
142 ignore_zeros: false,
143 obj: Mutex::new(obj),
144 pos: 0.into(),
145 }),
146 }
147 }
148
149 pub fn into_inner(self) -> Result<R, Self> {
151 let Self { inner } = self;
152
153 match Arc::try_unwrap(inner) {
154 Ok(inner) => Ok(inner.obj.into_inner()),
155 Err(inner) => Err(Self { inner }),
156 }
157 }
158
159 pub fn entries(&mut self) -> io::Result<Entries<R>> {
166 if self.inner.pos.load(Ordering::SeqCst) != 0 {
167 return Err(other(
168 "cannot call entries unless archive is at \
169 position 0",
170 ));
171 }
172
173 Ok(Entries {
174 archive: self.clone(),
175 current: (0, None, 0, None),
176 gnu_longlink: None,
177 gnu_longname: None,
178 pax_extensions: None,
179 })
180 }
181
182 pub fn entries_raw(&mut self) -> io::Result<RawEntries<R>> {
189 if self.inner.pos.load(Ordering::SeqCst) != 0 {
190 return Err(other(
191 "cannot call entries_raw unless archive is at \
192 position 0",
193 ));
194 }
195
196 Ok(RawEntries {
197 archive: self.clone(),
198 current: (0, None, 0),
199 })
200 }
201
202 pub async fn unpack<P: AsRef<Path>>(&mut self, dst: P) -> io::Result<()> {
226 let mut entries = self.entries()?;
227 let mut pinned = Pin::new(&mut entries);
228 while let Some(entry) = pinned.next().await {
229 let mut file = entry.map_err(|e| TarError::new("failed to iterate over archive", e))?;
230 file.unpack_in(dst.as_ref()).await?;
231 }
232 Ok(())
233 }
234}
235
236pub struct Entries<R: Read + Unpin> {
238 archive: Archive<R>,
239 current: (u64, Option<Header>, usize, Option<GnuExtSparseHeader>),
240 gnu_longname: Option<Vec<u8>>,
241 gnu_longlink: Option<Vec<u8>>,
242 pax_extensions: Option<Vec<u8>>,
243}
244
// Unwraps a `Poll<Option<Result<T, E>>>` inside a `poll_next` body.
//
// Evaluates to the `T` on success. In every other case it returns from the
// enclosing function: `Pending` is propagated, an exhausted stream becomes
// `Poll::Ready(None)` and an error becomes `Poll::Ready(Some(Err(..)))`.
macro_rules! ready_opt_err {
    ($poll:expr) => {
        match futures_core::ready!($poll) {
            None => return Poll::Ready(None),
            Some(Err(e)) => return Poll::Ready(Some(Err(e))),
            Some(Ok(item)) => item,
        }
    };
}
254
// Unwraps a `Poll<Result<T, E>>` inside a `poll_next` body.
//
// Evaluates to the `T` on success. `Pending` is propagated to the caller and
// an error makes the enclosing function return `Poll::Ready(Some(Err(..)))`.
macro_rules! ready_err {
    ($poll:expr) => {
        match futures_core::ready!($poll) {
            Err(e) => return Poll::Ready(Some(Err(e))),
            Ok(value) => value,
        }
    };
}
263
264impl<R: Read + Unpin> Stream for Entries<R> {
265 type Item = io::Result<Entry<Archive<R>>>;
266
267 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
268 loop {
269 let archive = self.archive.clone();
270 let (next, current_header, current_header_pos, _) = &mut self.current;
271 let entry = ready_opt_err!(poll_next_raw(
272 archive,
273 next,
274 current_header,
275 current_header_pos,
276 cx
277 ));
278
279 let is_recognized_header =
280 entry.header().as_gnu().is_some() || entry.header().as_ustar().is_some();
281 if is_recognized_header && entry.header().entry_type().is_gnu_longname() {
282 if self.gnu_longname.is_some() {
283 return Poll::Ready(Some(Err(other(
284 "two long name entries describing \
285 the same member",
286 ))));
287 }
288
289 let mut ef = EntryFields::from(entry);
290 let val = ready_err!(Pin::new(&mut ef).poll_read_all(cx));
291 self.gnu_longname = Some(val);
292 continue;
293 }
294
295 if is_recognized_header && entry.header().entry_type().is_gnu_longlink() {
296 if self.gnu_longlink.is_some() {
297 return Poll::Ready(Some(Err(other(
298 "two long name entries describing \
299 the same member",
300 ))));
301 }
302 let mut ef = EntryFields::from(entry);
303 let val = ready_err!(Pin::new(&mut ef).poll_read_all(cx));
304 self.gnu_longlink = Some(val);
305 continue;
306 }
307
308 if is_recognized_header && entry.header().entry_type().is_pax_local_extensions() {
309 if self.pax_extensions.is_some() {
310 return Poll::Ready(Some(Err(other(
311 "two pax extensions entries describing \
312 the same member",
313 ))));
314 }
315 let mut ef = EntryFields::from(entry);
316 let val = ready_err!(Pin::new(&mut ef).poll_read_all(cx));
317 self.pax_extensions = Some(val);
318 continue;
319 }
320
321 let mut fields = EntryFields::from(entry);
322 fields.long_pathname = self.gnu_longname.take();
323 fields.long_linkname = self.gnu_longlink.take();
324 fields.pax_extensions = self.pax_extensions.take();
325
326 let archive = self.archive.clone();
327 let (next, _, current_pos, current_ext) = &mut self.current;
328
329 ready_err!(poll_parse_sparse_header(
330 archive,
331 next,
332 current_ext,
333 current_pos,
334 &mut fields,
335 cx
336 ));
337
338 return Poll::Ready(Some(Ok(fields.into_entry())));
339 }
340 }
341}
342
343pub struct RawEntries<R: Read + Unpin> {
345 archive: Archive<R>,
346 current: (u64, Option<Header>, usize),
347}
348
349impl<R: Read + Unpin> Stream for RawEntries<R> {
350 type Item = io::Result<Entry<Archive<R>>>;
351
352 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
353 let archive = self.archive.clone();
354 let (next, current_header, current_header_pos) = &mut self.current;
355 poll_next_raw(archive, next, current_header, current_header_pos, cx)
356 }
357}
358
359fn poll_next_raw<R: Read + Unpin>(
360 mut archive: Archive<R>,
361 next: &mut u64,
362 current_header: &mut Option<Header>,
363 current_header_pos: &mut usize,
364 cx: &mut Context<'_>,
365) -> Poll<Option<io::Result<Entry<Archive<R>>>>> {
366 let mut header_pos = *next;
367
368 loop {
369 if current_header.is_none() {
371 let delta = *next - archive.inner.pos.load(Ordering::SeqCst);
372 match futures_core::ready!(poll_skip(&mut archive, cx, delta)) {
373 Ok(_) => {}
374 Err(err) => return Poll::Ready(Some(Err(err))),
375 }
376
377 *current_header = Some(Header::new_old());
378 *current_header_pos = 0;
379 }
380
381 let header = current_header.as_mut().unwrap();
382
383 match futures_core::ready!(poll_try_read_all(
385 &mut archive,
386 cx,
387 header.as_mut_bytes(),
388 current_header_pos,
389 )) {
390 Ok(true) => {}
391 Ok(false) => return Poll::Ready(None),
392 Err(err) => return Poll::Ready(Some(Err(err))),
393 }
394
395 if !header.as_bytes().iter().all(|i| *i == 0) {
399 *next += 512;
400 break;
401 }
402
403 if !archive.inner.ignore_zeros {
404 return Poll::Ready(None);
405 }
406
407 *next += 512;
408 header_pos = *next;
409 }
410
411 let header = current_header.as_mut().unwrap();
412
413 let sum = header.as_bytes()[..148]
415 .iter()
416 .chain(&header.as_bytes()[156..])
417 .fold(0, |a, b| a + (*b as u32))
418 + 8 * 32;
419 let cksum = header.cksum()?;
420 if sum != cksum {
421 return Poll::Ready(Some(Err(other("archive header checksum mismatch"))));
422 }
423
424 let file_pos = *next;
425 let size = header.entry_size()?;
426
427 let mut data = VecDeque::with_capacity(1);
428 data.push_back(EntryIo::Data(archive.clone().take(size)));
429
430 let header = current_header.take().unwrap();
431
432 let ret = EntryFields {
433 size,
434 header_pos,
435 file_pos,
436 data,
437 header,
438 long_pathname: None,
439 long_linkname: None,
440 pax_extensions: None,
441 unpack_xattrs: archive.inner.unpack_xattrs,
442 preserve_permissions: archive.inner.preserve_permissions,
443 preserve_mtime: archive.inner.preserve_mtime,
444 read_state: None,
445 };
446
447 let size = (size + 511) & !(512 - 1);
450 *next += size;
451
452 Poll::Ready(Some(Ok(ret.into_entry())))
453}
454
455fn poll_parse_sparse_header<R: Read + Unpin>(
456 mut archive: Archive<R>,
457 next: &mut u64,
458 current_ext: &mut Option<GnuExtSparseHeader>,
459 current_ext_pos: &mut usize,
460 entry: &mut EntryFields<Archive<R>>,
461 cx: &mut Context<'_>,
462) -> Poll<io::Result<()>> {
463 if !entry.header.entry_type().is_gnu_sparse() {
464 return Poll::Ready(Ok(()));
465 }
466
467 let gnu = match entry.header.as_gnu() {
468 Some(gnu) => gnu,
469 None => return Poll::Ready(Err(other("sparse entry type listed but not GNU header"))),
470 };
471
472 entry.data.truncate(0);
492
493 let mut cur = 0;
494 let mut remaining = entry.size;
495 {
496 let data = &mut entry.data;
497 let reader = archive.clone();
498 let size = entry.size;
499 let mut add_block = |block: &GnuSparseHeader| -> io::Result<_> {
500 if block.is_empty() {
501 return Ok(());
502 }
503 let off = block.offset()?;
504 let len = block.length()?;
505
506 if (size - remaining) % 512 != 0 {
507 return Err(other(
508 "previous block in sparse file was not \
509 aligned to 512-byte boundary",
510 ));
511 } else if off < cur {
512 return Err(other(
513 "out of order or overlapping sparse \
514 blocks",
515 ));
516 } else if cur < off {
517 let block = io::repeat(0).take(off - cur);
518 data.push_back(EntryIo::Pad(block));
519 }
520 cur = off
521 .checked_add(len)
522 .ok_or_else(|| other("more bytes listed in sparse file than u64 can hold"))?;
523 remaining = remaining.checked_sub(len).ok_or_else(|| {
524 other(
525 "sparse file consumed more data than the header \
526 listed",
527 )
528 })?;
529 data.push_back(EntryIo::Data(reader.clone().take(len)));
530 Ok(())
531 };
532 for block in gnu.sparse.iter() {
533 add_block(block)?
534 }
535 if gnu.is_extended() {
536 let started_header = current_ext.is_some();
537 if !started_header {
538 let mut ext = GnuExtSparseHeader::new();
539 ext.isextended[0] = 1;
540 *current_ext = Some(ext);
541 *current_ext_pos = 0;
542 }
543
544 let ext = current_ext.as_mut().unwrap();
545 while ext.is_extended() {
546 match futures_core::ready!(poll_try_read_all(
547 &mut archive,
548 cx,
549 ext.as_mut_bytes(),
550 current_ext_pos,
551 )) {
552 Ok(true) => {}
553 Ok(false) => return Poll::Ready(Err(other("failed to read extension"))),
554 Err(err) => return Poll::Ready(Err(err)),
555 }
556
557 *next += 512;
558 for block in ext.sparse.iter() {
559 add_block(block)?;
560 }
561 }
562 }
563 }
564 if cur != gnu.real_size()? {
565 return Poll::Ready(Err(other(
566 "mismatch in sparse file chunks and \
567 size in header",
568 )));
569 }
570 entry.size = cur;
571 if remaining > 0 {
572 return Poll::Ready(Err(other(
573 "mismatch in sparse file chunks and \
574 entry size in header",
575 )));
576 }
577
578 Poll::Ready(Ok(()))
579}
580
581impl<R: Read + Unpin> Read for Archive<R> {
582 fn poll_read(
583 self: Pin<&mut Self>,
584 cx: &mut Context<'_>,
585 into: &mut io::ReadBuf<'_>,
586 ) -> Poll<io::Result<()>> {
587 let mut r = if let Ok(v) = self.inner.obj.try_lock() {
588 v
589 } else {
590 return Poll::Pending;
591 };
592
593 let res = futures_core::ready!(Pin::new(&mut *r).poll_read(cx, into));
594 match res {
595 Ok(()) => {
596 self.inner
597 .pos
598 .fetch_add(into.filled().len() as u64, Ordering::SeqCst);
599 Poll::Ready(Ok(()))
600 }
601 Err(err) => Poll::Ready(Err(err)),
602 }
603 }
604}
605
606fn poll_try_read_all<R: Read + Unpin>(
611 mut source: R,
612 cx: &mut Context<'_>,
613 buf: &mut [u8],
614 pos: &mut usize,
615) -> Poll<io::Result<bool>> {
616 while *pos < buf.len() {
617 let mut read_buf = io::ReadBuf::new(&mut buf[*pos..]);
618 match futures_core::ready!(Pin::new(&mut source).poll_read(cx, &mut read_buf)) {
619 Ok(()) if read_buf.filled().is_empty() => {
620 if *pos == 0 {
621 return Poll::Ready(Ok(false));
622 }
623
624 return Poll::Ready(Err(other("failed to read entire block")));
625 }
626 Ok(()) => *pos += read_buf.filled().len(),
627 Err(err) => return Poll::Ready(Err(err)),
628 }
629 }
630
631 *pos = 0;
632 Poll::Ready(Ok(true))
633}
634
635fn poll_skip<R: Read + Unpin>(
637 mut source: R,
638 cx: &mut Context<'_>,
639 mut amt: u64,
640) -> Poll<io::Result<()>> {
641 let mut buf = [0u8; 4096 * 8];
642 while amt > 0 {
643 let n = cmp::min(amt, buf.len() as u64);
644 let mut read_buf = io::ReadBuf::new(&mut buf[..n as usize]);
645 match futures_core::ready!(Pin::new(&mut source).poll_read(cx, &mut read_buf)) {
646 Ok(()) if read_buf.filled().is_empty() => {
647 return Poll::Ready(Err(other("unexpected EOF during skip")));
648 }
649 Ok(()) => {
650 amt -= read_buf.filled().len() as u64;
651 }
652 Err(err) => return Poll::Ready(Err(err)),
653 }
654 }
655
656 Poll::Ready(Ok(()))
657}