1use super::*;
6
7#[cfg(all(feature = "futures", not(feature = "tokio")))]
8use futures::{
9 io::{AsyncRead, AsyncReadExt},
10 stream::Stream,
11};
12
13#[cfg(all(feature = "tokio", not(feature = "futures")))]
14use tokio_stream::Stream;
15
16#[cfg(all(feature = "tokio", not(feature = "futures")))]
17use tokio::io::{AsyncRead, AsyncReadExt};
18
19#[cfg(all(feature = "tokio", not(feature = "futures")))]
20use async_stream::try_stream;
21
/// An async-streaming version of the FastCDC chunker that reads from an
/// `AsyncRead` source and produces chunks via [`as_stream`](AsyncStreamCDC::as_stream).
pub struct AsyncStreamCDC<R> {
    // Working buffer of size `capacity` (== max_size) that chunk boundaries
    // are searched within; drained from the front as chunks are emitted.
    buffer: Vec<u8>,
    // Fixed buffer size; equals `max_size` so one buffer fill can always
    // hold a maximal chunk.
    capacity: usize,
    // Number of valid (unprocessed) bytes currently at the front of `buffer`.
    length: usize,
    // The async reader from which bytes are consumed.
    source: R,
    // Total bytes emitted so far; used as the offset of the next chunk.
    processed: u64,
    // Latched to true once `source` returns a zero-byte read.
    eof: bool,
    min_size: usize,
    avg_size: usize,
    max_size: usize,
    // "Small" mask used before reaching the average size (more bits set).
    mask_s: u64,
    // "Large" mask used after the average size (fewer bits set).
    mask_l: u64,
    // Left-shifted variants of the masks — presumably for the two-bytes-per-
    // iteration cut loop in `cut()`; confirm against the v2020 implementation.
    mask_s_ls: u64,
    mask_l_ls: u64,
}
80
impl<R: AsyncRead + Unpin> AsyncStreamCDC<R> {
    /// Construct a chunker over `source` with the given size bounds (bytes),
    /// using chunk-size normalization level 1.
    ///
    /// # Panics
    ///
    /// Panics if the size bounds fall outside the crate's
    /// `MINIMUM_*` / `AVERAGE_*` / `MAXIMUM_*` limits (see [`with_level`]).
    pub fn new(source: R, min_size: u32, avg_size: u32, max_size: u32) -> Self {
        Self::with_level(source, min_size, avg_size, max_size, Normalization::Level1)
    }

    /// Construct a chunker over `source` with the given size bounds and an
    /// explicit chunk-size normalization `level`.
    ///
    /// # Panics
    ///
    /// Panics if `min_size`, `avg_size`, or `max_size` are outside the
    /// ranges enforced by the assertions below.
    pub fn with_level(
        source: R,
        min_size: u32,
        avg_size: u32,
        max_size: u32,
        level: Normalization,
    ) -> Self {
        assert!(min_size >= MINIMUM_MIN);
        assert!(min_size <= MINIMUM_MAX);
        assert!(avg_size >= AVERAGE_MIN);
        assert!(avg_size <= AVERAGE_MAX);
        assert!(max_size >= MAXIMUM_MIN);
        assert!(max_size <= MAXIMUM_MAX);
        let bits = logarithm2(avg_size);
        let normalization = level.bits();
        // Harder mask (more leading bits) before the average point, easier
        // mask after it — this is what normalizes the chunk-size distribution.
        let mask_s = MASKS[(bits + normalization) as usize];
        let mask_l = MASKS[(bits - normalization) as usize];
        Self {
            // Buffer is allocated once at max_size and reused for the life
            // of the chunker.
            buffer: vec![0_u8; max_size as usize],
            capacity: max_size as usize,
            length: 0,
            source,
            eof: false,
            processed: 0,
            min_size: min_size as usize,
            avg_size: avg_size as usize,
            max_size: max_size as usize,
            mask_s,
            mask_l,
            mask_s_ls: mask_s << 1,
            mask_l_ls: mask_l << 1,
        }
    }

    /// Top up `buffer` from `source` until it is full or EOF is reached.
    ///
    /// Returns the number of bytes newly read (0 once EOF has been latched).
    /// Any I/O error from the reader is propagated via `?`.
    async fn fill_buffer(&mut self) -> Result<usize, Error> {
        // this code originally copied from asuran crate
        if self.eof {
            Ok(0)
        } else {
            let mut all_bytes_read = 0;
            // Keep reading into the unused tail of the buffer; a zero-byte
            // read marks end-of-stream and stops the loop.
            while !self.eof && self.length < self.capacity {
                let bytes_read = self.source.read(&mut self.buffer[self.length..]).await?;
                if bytes_read == 0 {
                    self.eof = true;
                } else {
                    self.length += bytes_read;
                    all_bytes_read += bytes_read;
                }
            }
            Ok(all_bytes_read)
        }
    }

    /// Remove and return the first `count` bytes of the buffer.
    ///
    /// Errors if `count` exceeds the number of valid bytes (`length`);
    /// `cut()` guarantees its count fits, so this guards an internal
    /// invariant rather than a normal runtime condition.
    fn drain_bytes(&mut self, count: usize) -> Result<Vec<u8>, Error> {
        if count > self.length {
            Err(Error::Other(format!(
                "drain_bytes() called with count larger than length: {} > {}",
                count, self.length
            )))
        } else {
            let data = self.buffer.drain(..count).collect::<Vec<u8>>();
            self.length -= count;
            // drain() shrank the Vec; restore it to full capacity so
            // fill_buffer() can write into the freed tail again.
            self.buffer.resize(self.capacity, 0_u8);
            Ok(data)
        }
    }

    /// Read and return the next chunk from the source.
    ///
    /// Returns `Error::Empty` when the input is exhausted; the stream
    /// adapters below use that as the end-of-stream sentinel.
    async fn read_chunk(&mut self) -> Result<ChunkData, Error> {
        self.fill_buffer().await?;
        if self.length == 0 {
            Err(Error::Empty)
        } else {
            // Find the next cut point within the valid prefix of the buffer.
            let (hash, count) = cut(
                &self.buffer[..self.length],
                self.min_size,
                self.avg_size,
                self.max_size,
                self.mask_s,
                self.mask_l,
                self.mask_s_ls,
                self.mask_l_ls,
            );
            if count == 0 {
                Err(Error::Empty)
            } else {
                let offset = self.processed;
                // Advance `processed` before draining; drain_bytes() cannot
                // fail here because cut() returns count <= length.
                self.processed += count as u64;
                let data = self.drain_bytes(count)?;
                Ok(ChunkData {
                    hash,
                    offset,
                    length: count,
                    data,
                })
            }
        }
    }

    /// Expose the chunker as a `Stream` of chunks (tokio flavor).
    ///
    /// The stream ends when `read_chunk` reports `Error::Empty`; any other
    /// error is yielded once and terminates the stream (via `try_stream!`).
    #[cfg(all(feature = "tokio", not(feature = "futures")))]
    pub fn as_stream(&mut self) -> impl Stream<Item = Result<ChunkData, Error>> + '_ {
        try_stream! {
            loop {
                match self.read_chunk().await {
                    Ok(chunk) => yield chunk,
                    Err(Error::Empty) => {
                        break;
                    }
                    error @ Err(_) => {
                        error?;
                    }
                }
            }
        }
    }

    /// Expose the chunker as a `Stream` of chunks (futures flavor).
    ///
    /// NOTE(review): unlike the tokio variant above, this unfold yields a
    /// non-`Empty` error and then keeps polling `read_chunk` — the stream
    /// only ends on `Error::Empty`. Confirm whether continuing after an I/O
    /// error is intended; the two feature variants currently differ here.
    #[cfg(all(feature = "futures", not(feature = "tokio")))]
    pub fn as_stream(&mut self) -> impl Stream<Item = Result<ChunkData, Error>> + '_ {
        futures::stream::unfold(self, |this| async {
            let chunk = this.read_chunk().await;
            if let Err(Error::Empty) = chunk {
                None
            } else {
                Some((chunk, this))
            }
        })
    }
}
228
#[cfg(test)]
mod tests {
    use crate::v2020::MASKS;

    use super::AsyncStreamCDC;

    // Each bound is validated by an assert in with_level(); feeding a value
    // just outside the legal range must panic.

    #[test]
    #[should_panic]
    fn test_minimum_too_low() {
        let data = [0u8; 1024];
        AsyncStreamCDC::new(data.as_slice(), 63, 256, 1024);
    }

    #[test]
    #[should_panic]
    fn test_minimum_too_high() {
        let data = [0u8; 1024];
        AsyncStreamCDC::new(data.as_slice(), 67_108_867, 256, 1024);
    }

    #[test]
    #[should_panic]
    fn test_average_too_low() {
        let data = [0u8; 1024];
        AsyncStreamCDC::new(data.as_slice(), 64, 255, 1024);
    }

    #[test]
    #[should_panic]
    fn test_average_too_high() {
        let data = [0u8; 1024];
        AsyncStreamCDC::new(data.as_slice(), 64, 268_435_457, 1024);
    }

    #[test]
    #[should_panic]
    fn test_maximum_too_low() {
        let data = [0u8; 1024];
        AsyncStreamCDC::new(data.as_slice(), 64, 256, 1023);
    }

    #[test]
    #[should_panic]
    fn test_maximum_too_high() {
        let data = [0u8; 1024];
        AsyncStreamCDC::new(data.as_slice(), 64, 256, 1_073_741_825);
    }

    // The masks are picked from MASKS at (log2(avg) ± normalization); with
    // the default level 1 that is one index above and below log2(avg).
    #[test]
    fn test_masks() {
        let source = [0u8; 1024];
        let cdc = AsyncStreamCDC::new(source.as_slice(), 64, 256, 1024);
        assert_eq!(cdc.mask_l, MASKS[7]);
        assert_eq!(cdc.mask_s, MASKS[9]);
        let cdc = AsyncStreamCDC::new(source.as_slice(), 8192, 16384, 32768);
        assert_eq!(cdc.mask_l, MASKS[13]);
        assert_eq!(cdc.mask_s, MASKS[15]);
        let cdc = AsyncStreamCDC::new(source.as_slice(), 1_048_576, 4_194_304, 16_777_216);
        assert_eq!(cdc.mask_l, MASKS[21]);
        assert_eq!(cdc.mask_s, MASKS[23]);
    }

    // Golden values for one chunk of the fixture image: FastCDC hash,
    // byte offset, length, and an MD5 of the chunk's bytes.
    struct ExpectedChunk {
        hash: u64,
        offset: u64,
        length: usize,
        digest: String,
    }

    use md5::{Digest, Md5};

    #[cfg(all(feature = "futures", not(feature = "tokio")))]
    use futures::stream::StreamExt;
    #[cfg(all(feature = "tokio", not(feature = "futures")))]
    use tokio_stream::StreamExt;

    #[cfg_attr(all(feature = "tokio", not(feature = "futures")), tokio::test)]
    #[cfg_attr(all(feature = "futures", not(feature = "tokio")), futures_test::test)]
    async fn test_iter_sekien_16k_chunks() {
        let read_result = std::fs::read("test/fixtures/SekienAkashita.jpg");
        assert!(read_result.is_ok());
        let contents = read_result.unwrap();
        // Golden chunking of the fixture with 4 KiB / 16 KiB / 64 KiB bounds.
        let expected_chunks = vec![
            ExpectedChunk {
                hash: 17968276318003433923,
                offset: 0,
                length: 21325,
                digest: "2bb52734718194617c957f5e07ee6054".into(),
            },
            ExpectedChunk {
                hash: 8197189939299398838,
                offset: 21325,
                length: 17140,
                digest: "badfb0757fe081c20336902e7131f768".into(),
            },
            ExpectedChunk {
                hash: 13019990849178155730,
                offset: 38465,
                length: 28084,
                digest: "18412d7414de6eb42f638351711f729d".into(),
            },
            ExpectedChunk {
                hash: 4509236223063678303,
                offset: 66549,
                length: 18217,
                digest: "04fe1405fc5f960363bfcd834c056407".into(),
            },
            ExpectedChunk {
                hash: 2504464741100432583,
                offset: 84766,
                length: 24700,
                digest: "1aa7ad95f274d6ba34a983946ebc5af3".into(),
            },
        ];
        let mut chunker = AsyncStreamCDC::new(contents.as_ref(), 4096, 16384, 65535);
        let chunks = chunker.as_stream().collect::<Vec<_>>().await;

        let mut seen = 0;
        // Indexing into expected_chunks (rather than zipping) so that any
        // surplus chunk panics instead of being silently ignored.
        for (index, chunk) in chunks.into_iter().enumerate() {
            let chunk = chunk.unwrap();
            let expected = &expected_chunks[index];
            assert_eq!(chunk.hash, expected.hash);
            assert_eq!(chunk.offset, expected.offset);
            assert_eq!(chunk.length, expected.length);
            // Verify the chunk's content, not just its boundaries.
            let start = chunk.offset as usize;
            let end = start + chunk.length;
            let mut hasher = Md5::new();
            hasher.update(&contents[start..end]);
            let digest = format!("{:x}", hasher.finalize());
            assert_eq!(digest, expected.digest);
            seen = index + 1;
        }
        assert_eq!(seen, 5);
    }
}