fastcdc/v2020/
async_stream_cdc.rs

//
// Copyright (c) 2023 Nathan Fiedler
//

use super::*;

#[cfg(all(feature = "futures", not(feature = "tokio")))]
use futures::{
    io::{AsyncRead, AsyncReadExt},
    stream::Stream,
};

#[cfg(all(feature = "tokio", not(feature = "futures")))]
use tokio_stream::Stream;

#[cfg(all(feature = "tokio", not(feature = "futures")))]
use tokio::io::{AsyncRead, AsyncReadExt};

#[cfg(all(feature = "tokio", not(feature = "futures")))]
use async_stream::try_stream;

///
/// An async-streamable version of the FastCDC chunker implementation from
/// 2020.
///
/// Use `new` to construct an instance, and then `as_stream` to produce an
/// async [Stream] of the chunks.
///
/// Both `futures`- and `tokio`-based [AsyncRead] inputs are supported via
/// feature flags. If necessary, the
/// [`async_compat`](https://docs.rs/async-compat/latest/async_compat/) crate
/// can adapt an input from one runtime to the other.
///
/// Note that this struct allocates a `Vec<u8>` of `max_size` bytes to act as a
/// buffer when reading from the source and finding chunk boundaries.
///
/// ```no_run
/// # use fastcdc::v2020::AsyncStreamCDC;
/// # #[cfg(all(feature = "futures", not(feature = "tokio")))]
/// # use futures::stream::StreamExt;
/// # #[cfg(all(feature = "tokio", not(feature = "futures")))]
/// # use tokio_stream::StreamExt;
///
/// async fn run() {
///     let source = std::fs::read("test/fixtures/SekienAkashita.jpg").unwrap();
///     let mut chunker = AsyncStreamCDC::new(source.as_ref(), 4096, 16384, 65535);
///     let stream = chunker.as_stream();
///
///     let chunks = stream.collect::<Vec<_>>().await;
///
///     for result in chunks {
///         let chunk = result.unwrap();
///         println!("offset={} length={}", chunk.offset, chunk.length);
///     }
/// }
/// ```
///
pub struct AsyncStreamCDC<R> {
    /// Buffer of data from source for finding cut points.
    buffer: Vec<u8>,
    /// Maximum capacity of the buffer (always `max_size`).
    capacity: usize,
    /// Number of relevant bytes in the `buffer`.
    length: usize,
    /// Source from which data is read into `buffer`.
    source: R,
    /// Number of bytes read from the source so far.
    processed: u64,
    /// True when the source produces no more data.
    eof: bool,
    /// Smallest acceptable chunk size, in bytes.
    min_size: usize,
    /// Desired average chunk size, in bytes.
    avg_size: usize,
    /// Largest acceptable chunk size, in bytes.
    max_size: usize,
    /// Mask with more effective bits, used before the average point.
    mask_s: u64,
    /// Mask with fewer effective bits, used after the average point.
    mask_l: u64,
    /// `mask_s` shifted left by one bit.
    mask_s_ls: u64,
    /// `mask_l` shifted left by one bit.
    mask_l_ls: u64,
}

impl<R: AsyncRead + Unpin> AsyncStreamCDC<R> {
    ///
    /// Construct an `AsyncStreamCDC` that will process bytes from the given
    /// source.
    ///
    /// Uses chunk size normalization level 1 by default.
    ///
    pub fn new(source: R, min_size: u32, avg_size: u32, max_size: u32) -> Self {
        Self::with_level(source, min_size, avg_size, max_size, Normalization::Level1)
    }

    ///
    /// Create a new `AsyncStreamCDC` with the given normalization level.
    ///
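    /// A minimal usage sketch with a higher normalization level; the input
    /// and size parameters are illustrative, reusing the fixture from the
    /// example above.
    ///
    /// ```no_run
    /// # use fastcdc::v2020::{AsyncStreamCDC, Normalization};
    /// let source = std::fs::read("test/fixtures/SekienAkashita.jpg").unwrap();
    /// let chunker = AsyncStreamCDC::with_level(
    ///     source.as_ref(), 4096, 16384, 65535, Normalization::Level3);
    /// ```
    ///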
    pub fn with_level(
        source: R,
        min_size: u32,
        avg_size: u32,
        max_size: u32,
        level: Normalization,
    ) -> Self {
        assert!(min_size >= MINIMUM_MIN);
        assert!(min_size <= MINIMUM_MAX);
        assert!(avg_size >= AVERAGE_MIN);
        assert!(avg_size <= AVERAGE_MAX);
        assert!(max_size >= MAXIMUM_MIN);
        assert!(max_size <= MAXIMUM_MAX);
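        // Derive the cut-point masks from the average size: the "small" mask
        // has more effective bits and is used before the average point,
        // making cut points harder to find; the "large" mask has fewer bits
        // and is used after it, making cut points easier to find.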
        let bits = logarithm2(avg_size);
        let normalization = level.bits();
        let mask_s = MASKS[(bits + normalization) as usize];
        let mask_l = MASKS[(bits - normalization) as usize];
        Self {
            buffer: vec![0_u8; max_size as usize],
            capacity: max_size as usize,
            length: 0,
            source,
            eof: false,
            processed: 0,
            min_size: min_size as usize,
            avg_size: avg_size as usize,
            max_size: max_size as usize,
            mask_s,
            mask_l,
            mask_s_ls: mask_s << 1,
            mask_l_ls: mask_l << 1,
        }
    }

    /// Fill the buffer with data from the source, returning the number of bytes
    /// read (zero if end of source has been reached).
    async fn fill_buffer(&mut self) -> Result<usize, Error> {
        // this code originally copied from asuran crate
        if self.eof {
            Ok(0)
        } else {
            let mut all_bytes_read = 0;
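            // A single read() may return fewer bytes than requested, so keep
            // reading until the buffer is full or the source is exhausted.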
            while !self.eof && self.length < self.capacity {
                let bytes_read = self.source.read(&mut self.buffer[self.length..]).await?;
                if bytes_read == 0 {
                    self.eof = true;
                } else {
                    self.length += bytes_read;
                    all_bytes_read += bytes_read;
                }
            }
            Ok(all_bytes_read)
        }
    }

    /// Drains a specified number of bytes from the buffer, then resizes the
    /// buffer back to `capacity` size in preparation for further reads.
    fn drain_bytes(&mut self, count: usize) -> Result<Vec<u8>, Error> {
        // this code originally copied from asuran crate
        if count > self.length {
            Err(Error::Other(format!(
                "drain_bytes() called with count larger than length: {} > {}",
                count, self.length
            )))
        } else {
            let data = self.buffer.drain(..count).collect::<Vec<u8>>();
            self.length -= count;
            self.buffer.resize(self.capacity, 0_u8);
            Ok(data)
        }
    }

    /// Find the next chunk in the source. If the end of the source has been
    /// reached, returns `Error::Empty` as the error.
    async fn read_chunk(&mut self) -> Result<ChunkData, Error> {
        self.fill_buffer().await?;
        if self.length == 0 {
            Err(Error::Empty)
        } else {
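            // Find the next cut point within the valid portion of the buffer.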
            let (hash, count) = cut(
                &self.buffer[..self.length],
                self.min_size,
                self.avg_size,
                self.max_size,
                self.mask_s,
                self.mask_l,
                self.mask_s_ls,
                self.mask_l_ls,
            );
            if count == 0 {
                Err(Error::Empty)
            } else {
                let offset = self.processed;
                self.processed += count as u64;
                let data = self.drain_bytes(count)?;
                Ok(ChunkData {
                    hash,
                    offset,
                    length: count,
                    data,
                })
            }
        }
    }

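    /// Produce an async [Stream] of the chunks found in the source; the
    /// stream ends once the source is exhausted.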
199    #[cfg(all(feature = "tokio", not(feature = "futures")))]
200    pub fn as_stream(&mut self) -> impl Stream<Item = Result<ChunkData, Error>> + '_ {
201        try_stream! {
202            loop {
203                match self.read_chunk().await {
204                    Ok(chunk) => yield chunk,
205                    Err(Error::Empty) => {
206                        break;
207                    }
208                    error @ Err(_) => {
209                        error?;
210                    }
211                }
212            }
213        }
214    }
215
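    /// Produce an async [Stream] of the chunks found in the source; the
    /// stream ends once the source is exhausted.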
216    #[cfg(all(feature = "futures", not(feature = "tokio")))]
217    pub fn as_stream(&mut self) -> impl Stream<Item = Result<ChunkData, Error>> + '_ {
218        futures::stream::unfold(self, |this| async {
219            let chunk = this.read_chunk().await;
220            if let Err(Error::Empty) = chunk {
221                None
222            } else {
223                Some((chunk, this))
224            }
225        })
226    }
227}
228
#[cfg(test)]
mod tests {
    use super::AsyncStreamCDC;
    use crate::v2020::MASKS;
    use md5::{Digest, Md5};

    #[cfg(all(feature = "futures", not(feature = "tokio")))]
    use futures::stream::StreamExt;
    #[cfg(all(feature = "tokio", not(feature = "futures")))]
    use tokio_stream::StreamExt;

    #[test]
    #[should_panic]
    fn test_minimum_too_low() {
        let array = [0u8; 1024];
        AsyncStreamCDC::new(array.as_slice(), 63, 256, 1024);
    }

    #[test]
    #[should_panic]
    fn test_minimum_too_high() {
        let array = [0u8; 1024];
        AsyncStreamCDC::new(array.as_slice(), 67_108_867, 256, 1024);
    }

    #[test]
    #[should_panic]
    fn test_average_too_low() {
        let array = [0u8; 1024];
        AsyncStreamCDC::new(array.as_slice(), 64, 255, 1024);
    }

    #[test]
    #[should_panic]
    fn test_average_too_high() {
        let array = [0u8; 1024];
        AsyncStreamCDC::new(array.as_slice(), 64, 268_435_457, 1024);
    }

    #[test]
    #[should_panic]
    fn test_maximum_too_low() {
        let array = [0u8; 1024];
        AsyncStreamCDC::new(array.as_slice(), 64, 256, 1023);
    }

    #[test]
    #[should_panic]
    fn test_maximum_too_high() {
        let array = [0u8; 1024];
        AsyncStreamCDC::new(array.as_slice(), 64, 256, 1_073_741_825);
    }

    #[test]
    fn test_masks() {
        let source = [0u8; 1024];
        let chunker = AsyncStreamCDC::new(source.as_slice(), 64, 256, 1024);
        assert_eq!(chunker.mask_l, MASKS[7]);
        assert_eq!(chunker.mask_s, MASKS[9]);
        let chunker = AsyncStreamCDC::new(source.as_slice(), 8192, 16384, 32768);
        assert_eq!(chunker.mask_l, MASKS[13]);
        assert_eq!(chunker.mask_s, MASKS[15]);
        let chunker = AsyncStreamCDC::new(source.as_slice(), 1_048_576, 4_194_304, 16_777_216);
        assert_eq!(chunker.mask_l, MASKS[21]);
        assert_eq!(chunker.mask_s, MASKS[23]);
    }

    struct ExpectedChunk {
        hash: u64,
        offset: u64,
        length: usize,
        digest: String,
    }

305    #[cfg_attr(all(feature = "tokio", not(feature = "futures")), tokio::test)]
306    #[cfg_attr(all(feature = "futures", not(feature = "tokio")), futures_test::test)]
307    async fn test_iter_sekien_16k_chunks() {
308        let read_result = std::fs::read("test/fixtures/SekienAkashita.jpg");
309        assert!(read_result.is_ok());
310        let contents = read_result.unwrap();
        // The MD5 digests are computed from the file contents and serve to
        // validate that the stream returns the correct chunk data on each
        // iteration.
        let expected_chunks = vec![
            ExpectedChunk {
                hash: 17968276318003433923,
                offset: 0,
                length: 21325,
                digest: "2bb52734718194617c957f5e07ee6054".into(),
            },
            ExpectedChunk {
                hash: 8197189939299398838,
                offset: 21325,
                length: 17140,
                digest: "badfb0757fe081c20336902e7131f768".into(),
            },
            ExpectedChunk {
                hash: 13019990849178155730,
                offset: 38465,
                length: 28084,
                digest: "18412d7414de6eb42f638351711f729d".into(),
            },
            ExpectedChunk {
                hash: 4509236223063678303,
                offset: 66549,
                length: 18217,
                digest: "04fe1405fc5f960363bfcd834c056407".into(),
            },
            ExpectedChunk {
                hash: 2504464741100432583,
                offset: 84766,
                length: 24700,
                digest: "1aa7ad95f274d6ba34a983946ebc5af3".into(),
            },
        ];
        let mut chunker = AsyncStreamCDC::new(contents.as_ref(), 4096, 16384, 65535);
        let stream = chunker.as_stream();

        let chunks = stream.collect::<Vec<_>>().await;

        let mut index = 0;

        for chunk in chunks {
            let chunk = chunk.unwrap();
            assert_eq!(chunk.hash, expected_chunks[index].hash);
            assert_eq!(chunk.offset, expected_chunks[index].offset);
            assert_eq!(chunk.length, expected_chunks[index].length);
            let mut hasher = Md5::new();
            hasher
                .update(&contents[(chunk.offset as usize)..(chunk.offset as usize) + chunk.length]);
            let digest = format!("{:x}", hasher.finalize());
            assert_eq!(digest, expected_chunks[index].digest);
            index += 1;
        }
        assert_eq!(index, 5);
    }
}
368}