Skip to main content

snix_castore/blobservice/
chunked_reader.rs

1use futures::{TryStreamExt, ready};
2use pin_project_lite::pin_project;
3use tokio::io::{AsyncRead, AsyncSeekExt};
4use tokio_stream::StreamExt;
5use tokio_util::io::{ReaderStream, StreamReader};
6use tracing::{instrument, trace, warn};
7
8use crate::B3Digest;
9use std::{cmp::Ordering, pin::Pin};
10
11use super::{BlobReader, BlobService};
12
pin_project! {
    /// ChunkedReader provides a chunk-aware [BlobReader], which allows both
    /// reading from and seeking within a blob that is stored as a list of
    /// chunks.
    ///
    /// It internally holds a [ChunkedBlob], which stores the chunk
    /// information and is able to emit a reader positioned at a specific
    /// offset. Whenever the position changes due to a seek, the inner reader
    /// is replaced by such a freshly positioned reader.
    pub struct ChunkedReader<BS> {
        // Chunk list and blob service, used to build a new inner reader on seek.
        chunked_blob: ChunkedBlob<BS>,

        // Inner reader, yielding the blob contents from the current position on.
        #[pin]
        r: Box<dyn AsyncRead + Unpin + Send>,

        // Current absolute position in the blob, in bytes.
        pos: u64,
    }
}
27
28impl<BS> ChunkedReader<BS>
29where
30    BS: BlobService + Clone + 'static,
31{
32    /// Construct a new [ChunkedReader], by retrieving a list of chunks (their
33    /// blake3 digests and chunk sizes)
34    pub fn from_chunks(chunks_it: impl Iterator<Item = (B3Digest, u64)>, blob_service: BS) -> Self {
35        let chunked_blob = ChunkedBlob::from_iter(chunks_it, blob_service);
36        let r = chunked_blob.reader_skipped_offset(0);
37
38        Self {
39            chunked_blob,
40            r,
41            pos: 0,
42        }
43    }
44}
45
/// Marks [ChunkedReader] as a [BlobReader].
/// The impl body is empty, so no items need to be provided here.
impl<BS> BlobReader for ChunkedReader<BS> where BS: BlobService + Clone + 'static {}
48
49impl<BS> tokio::io::AsyncRead for ChunkedReader<BS>
50where
51    BS: BlobService + Clone + 'static,
52{
53    fn poll_read(
54        self: std::pin::Pin<&mut Self>,
55        cx: &mut std::task::Context<'_>,
56        buf: &mut tokio::io::ReadBuf<'_>,
57    ) -> std::task::Poll<std::io::Result<()>> {
58        // The amount of data read can be determined by the increase
59        // in the length of the slice returned by `ReadBuf::filled`.
60        let filled_before = buf.filled().len();
61
62        let this = self.project();
63
64        ready!(this.r.poll_read(cx, buf))?;
65        let bytes_read = buf.filled().len() - filled_before;
66        *this.pos += bytes_read as u64;
67
68        Ok(()).into()
69    }
70}
71
72impl<BS> tokio::io::AsyncSeek for ChunkedReader<BS>
73where
74    BS: BlobService + Clone + 'static,
75{
76    #[instrument(skip(self), err(Debug))]
77    fn start_seek(self: Pin<&mut Self>, position: std::io::SeekFrom) -> std::io::Result<()> {
78        let total_len = self.chunked_blob.blob_length();
79        let mut this = self.project();
80
81        let absolute_offset: u64 = match position {
82            std::io::SeekFrom::Start(from_start) => from_start,
83            std::io::SeekFrom::End(from_end) => {
84                // note from_end is i64, not u64, so this is usually negative.
85                total_len.checked_add_signed(from_end).ok_or_else(|| {
86                    std::io::Error::new(
87                        std::io::ErrorKind::InvalidInput,
88                        "over/underflow while seeking",
89                    )
90                })?
91            }
92            std::io::SeekFrom::Current(from_current) => {
93                // note from_end is i64, not u64, so this can be positive or negative.
94                (*this.pos)
95                    .checked_add_signed(from_current)
96                    .ok_or_else(|| {
97                        std::io::Error::new(
98                            std::io::ErrorKind::InvalidInput,
99                            "over/underflow while seeking",
100                        )
101                    })?
102            }
103        };
104
105        // check if the position actually did change.
106        if absolute_offset != *this.pos {
107            // ensure the new position still is inside the file.
108            if absolute_offset > total_len {
109                Err(std::io::Error::new(
110                    std::io::ErrorKind::InvalidInput,
111                    "seeked beyond EOF",
112                ))?
113            }
114
115            // Update the position and the internal reader.
116            *this.pos = absolute_offset;
117
118            // FUTUREWORK: if we can seek forward, avoid re-assembling.
119            // At least if it's still in the same chunk?
120            *this.r = this.chunked_blob.reader_skipped_offset(absolute_offset);
121        }
122
123        Ok(())
124    }
125
126    fn poll_complete(
127        self: Pin<&mut Self>,
128        _cx: &mut std::task::Context<'_>,
129    ) -> std::task::Poll<std::io::Result<u64>> {
130        std::task::Poll::Ready(Ok(self.pos))
131    }
132}
133
/// Holds a list of blake3 digests for individual chunks (and their sizes and
/// start offsets inside the blob).
/// Is able to construct a reader that is positioned at a certain offset, which
/// is useful to construct a BlobReader (that implements AsyncSeek).
struct ChunkedBlob<BS> {
    /// The blob service the individual chunks are opened from.
    blob_service: BS,
    /// One `(start offset in the blob, chunk size, chunk digest)` entry per
    /// chunk, in the order the chunks appear in the blob.
    chunks: Vec<(u64, u64, B3Digest)>,
}
142
143impl<BS> ChunkedBlob<BS>
144where
145    BS: BlobService + Clone + 'static,
146{
147    /// Constructs [Self] from a list of blake3 digests of chunks and their
148    /// sizes, and a reference to a blob service.
149    /// Initializing it with an empty list is disallowed.
150    fn from_iter(chunks_it: impl Iterator<Item = (B3Digest, u64)>, blob_service: BS) -> Self {
151        let mut chunks = Vec::new();
152        let mut offset: u64 = 0;
153
154        for (chunk_digest, chunk_size) in chunks_it {
155            chunks.push((offset, chunk_size, chunk_digest));
156            offset += chunk_size;
157        }
158
159        assert!(
160            !chunks.is_empty(),
161            "Chunks must be provided, don't use this for blobs without chunks"
162        );
163
164        Self {
165            blob_service,
166            chunks,
167        }
168    }
169
170    /// Returns the length of the blob.
171    fn blob_length(&self) -> u64 {
172        self.chunks
173            .last()
174            .map(|(chunk_offset, chunk_size, _)| chunk_offset + chunk_size)
175            .unwrap_or(0)
176    }
177
178    /// For a given position pos, return the chunk containing the data.
179    /// In case this would range outside the blob, None is returned.
180    #[instrument(level = "trace", skip(self), ret)]
181    fn get_chunk_idx_for_position(&self, pos: u64) -> Option<usize> {
182        // FUTUREWORK: benchmark when to use linear search, binary_search and BTreeSet
183        self.chunks
184            .binary_search_by(|(chunk_start_pos, chunk_size, _)| {
185                if chunk_start_pos + chunk_size <= pos {
186                    Ordering::Less
187                } else if *chunk_start_pos > pos {
188                    Ordering::Greater
189                } else {
190                    Ordering::Equal
191                }
192            })
193            .ok()
194    }
195
196    /// Returns a stream of bytes of the data in that blob.
197    /// It internally assembles a stream reading from each chunk (skipping over
198    /// chunks containing irrelevant data).
199    /// From the first relevant chunk, the irrelevant bytes are skipped too.
200    /// The returned boxed thing does not implement AsyncSeek on its own, but
201    /// ChunkedReader does.
202    #[instrument(level = "trace", skip(self))]
203    fn reader_skipped_offset(&self, offset: u64) -> Box<dyn tokio::io::AsyncRead + Send + Unpin> {
204        if offset == self.blob_length() {
205            return Box::new(std::io::Cursor::new(vec![]));
206        }
207        // construct a stream of all chunks starting with the given offset
208        let start_chunk_idx = self
209            .get_chunk_idx_for_position(offset)
210            .expect("outside of blob");
211        // It's ok to panic here, we can only reach this by seeking, and seeking should already reject out-of-file seeking.
212
213        let skip_first_chunk_bytes = (offset - self.chunks[start_chunk_idx].0) as usize;
214
215        let blob_service = self.blob_service.clone();
216        let chunks: Vec<_> = self.chunks[start_chunk_idx..].to_vec();
217        let readers_stream = tokio_stream::iter(chunks.into_iter().enumerate()).map(
218            move |(nth_chunk, (_chunk_start_offset, chunk_size, chunk_digest))| {
219                let chunk_digest = chunk_digest.to_owned();
220                let blob_service = blob_service.clone();
221                async move {
222                    trace!(chunk_size=%chunk_size, chunk_digest=%chunk_digest, "open_read on chunk in stream");
223                    let mut blob_reader = blob_service
224                        .open_read(&chunk_digest.to_owned())
225                        .await?
226                        .ok_or_else(|| {
227                            warn!(chunk.digest = %chunk_digest, "chunk not found");
228                            std::io::Error::new(std::io::ErrorKind::NotFound, "chunk not found")
229                        })?;
230
231                    // iff this is the first chunk in the stream, skip by skip_first_chunk_bytes
232                    if nth_chunk == 0 && skip_first_chunk_bytes > 0 {
233                        blob_reader
234                            .seek(std::io::SeekFrom::Start(skip_first_chunk_bytes as u64))
235                            .await?;
236                    }
237                    Ok::<_, std::io::Error>(blob_reader)
238                }
239            },
240        );
241
242        // convert the stream of readers to a stream of streams of byte chunks
243        let bytes_streams = readers_stream.then(|elem| async { elem.await.map(ReaderStream::new) });
244
245        // flatten into one stream of byte chunks
246        let bytes_stream = bytes_streams.try_flatten();
247
248        // convert into AsyncRead
249        Box::new(StreamReader::new(Box::pin(bytes_stream)))
250    }
251}
252
#[cfg(test)]
mod test {
    use std::{
        io::SeekFrom,
        sync::{Arc, LazyLock},
    };

    use crate::{
        B3Digest,
        blobservice::{BlobService, MemoryBlobService, chunked_reader::ChunkedReader},
    };
    use hex_literal::hex;
    use tokio::io::{AsyncReadExt, AsyncSeekExt};

    // Contents of the five chunks making up the test blob. Concatenated, they
    // are the bytes 0x00 up to (and including) 0x0f.
    const CHUNK_1: [u8; 2] = hex!("0001");
    const CHUNK_2: [u8; 4] = hex!("02030405");
    const CHUNK_3: [u8; 1] = hex!("06");
    const CHUNK_4: [u8; 2] = hex!("0708");
    const CHUNK_5: [u8; 7] = hex!("090a0b0c0d0e0f");

    // Layout of the test blob, chunk by chunk:
    // `[ 0 1 ] [ 2 3 4 5 ] [ 6 ] [ 7 8 ] [ 9 10 11 12 13 14 15 ]`
    pub static CHUNK_1_DIGEST: LazyLock<B3Digest> =
        LazyLock::new(|| blake3::hash(&CHUNK_1).as_bytes().into());
    pub static CHUNK_2_DIGEST: LazyLock<B3Digest> =
        LazyLock::new(|| blake3::hash(&CHUNK_2).as_bytes().into());
    pub static CHUNK_3_DIGEST: LazyLock<B3Digest> =
        LazyLock::new(|| blake3::hash(&CHUNK_3).as_bytes().into());
    pub static CHUNK_4_DIGEST: LazyLock<B3Digest> =
        LazyLock::new(|| blake3::hash(&CHUNK_4).as_bytes().into());
    pub static CHUNK_5_DIGEST: LazyLock<B3Digest> =
        LazyLock::new(|| blake3::hash(&CHUNK_5).as_bytes().into());
    // The chunk list (digest and size of each chunk) describing the test blob.
    pub static BLOB_1_LIST: LazyLock<[(B3Digest, u64); 5]> = LazyLock::new(|| {
        [
            (*CHUNK_1_DIGEST, 2),
            (*CHUNK_2_DIGEST, 4),
            (*CHUNK_3_DIGEST, 1),
            (*CHUNK_4_DIGEST, 2),
            (*CHUNK_5_DIGEST, 7),
        ]
    });

    use super::ChunkedBlob;

    /// ensure the start offsets are properly calculated.
    #[test]
    fn from_iter() {
        let cb = ChunkedBlob::from_iter(
            (*BLOB_1_LIST).into_iter(),
            Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>,
        );

        // entries are (start offset, size, digest)
        assert_eq!(
            cb.chunks,
            Vec::from_iter([
                (0, 2, *CHUNK_1_DIGEST),
                (2, 4, *CHUNK_2_DIGEST),
                (6, 1, *CHUNK_3_DIGEST),
                (7, 2, *CHUNK_4_DIGEST),
                (9, 7, *CHUNK_5_DIGEST),
            ])
        );
    }

    /// ensure ChunkedBlob can't be used with an empty list of chunks
    #[test]
    #[should_panic]
    fn from_iter_empty() {
        ChunkedBlob::from_iter(
            [].into_iter(),
            Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>,
        );
    }

    /// ensure the right chunk is selected
    #[test]
    fn chunk_idx_for_position() {
        let cb = ChunkedBlob::from_iter(
            (*BLOB_1_LIST).into_iter(),
            Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>,
        );

        assert_eq!(Some(0), cb.get_chunk_idx_for_position(0), "start of blob");

        assert_eq!(
            Some(0),
            cb.get_chunk_idx_for_position(1),
            "middle of first chunk"
        );
        assert_eq!(
            Some(1),
            cb.get_chunk_idx_for_position(2),
            "beginning of second chunk"
        );

        // The blob is 16 bytes long, so 15 is the last valid position.
        assert_eq!(
            Some(4),
            cb.get_chunk_idx_for_position(15),
            "right before the end of the blob"
        );
        assert_eq!(
            None,
            cb.get_chunk_idx_for_position(16),
            "right outside the blob"
        );
        assert_eq!(
            None,
            cb.get_chunk_idx_for_position(100),
            "way outside the blob"
        );
    }

    /// returns a blobservice with all chunks in BLOB_1 present.
    async fn gen_blobservice_blob1() -> Arc<dyn BlobService> {
        let blob_service = Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>;

        // seed blob service with all chunks, each written as its own blob
        for blob_contents in [
            CHUNK_1.to_vec(),
            CHUNK_2.to_vec(),
            CHUNK_3.to_vec(),
            CHUNK_4.to_vec(),
            CHUNK_5.to_vec(),
        ] {
            let mut bw = blob_service.open_write().await;
            tokio::io::copy(&mut std::io::Cursor::new(blob_contents), &mut bw)
                .await
                .expect("writing blob");
            bw.close().await.expect("close blobwriter");
        }

        blob_service
    }

    /// reading from start to end must yield the concatenation of all chunks.
    #[tokio::test]
    async fn test_read() {
        let blob_service = gen_blobservice_blob1().await;
        let mut chunked_reader =
            ChunkedReader::from_chunks((*BLOB_1_LIST).into_iter(), blob_service);

        // read all data
        let mut buf = Vec::new();
        tokio::io::copy(&mut chunked_reader, &mut buf)
            .await
            .expect("copy");

        assert_eq!(
            hex!("000102030405060708090a0b0c0d0e0f").to_vec(),
            buf,
            "read data must match"
        );
    }

    /// exercises seeking relative to the end and to the current position.
    /// The steps build on each other, as each one starts at the position the
    /// previous one left the reader at.
    #[tokio::test]
    async fn test_seek() {
        let blob_service = gen_blobservice_blob1().await;
        let mut chunked_reader =
            ChunkedReader::from_chunks((*BLOB_1_LIST).into_iter(), blob_service);

        // seek to the end
        // expect to read 0 bytes
        {
            chunked_reader
                .seek(SeekFrom::End(0))
                .await
                .expect("seek to end");

            let mut buf = Vec::new();
            chunked_reader
                .read_to_end(&mut buf)
                .await
                .expect("read to end");

            assert_eq!(hex!("").to_vec(), buf);
        }

        // seek one byte before the end
        // expect to read the last byte, leaving the position at the end again
        {
            chunked_reader.seek(SeekFrom::End(-1)).await.expect("seek");

            let mut buf = Vec::new();
            chunked_reader
                .read_to_end(&mut buf)
                .await
                .expect("read to end");

            assert_eq!(hex!("0f").to_vec(), buf);
        }

        // seek back three bytes, but using relative positioning
        // read two bytes
        {
            chunked_reader
                .seek(SeekFrom::Current(-3))
                .await
                .expect("seek");

            let mut buf = [0b0; 2];
            chunked_reader
                .read_exact(&mut buf)
                .await
                .expect("read exact");

            assert_eq!(hex!("0d0e"), buf);
        }
    }

    // seeds a blob service with only the first two chunks, reads a bit in the
    // front (which succeeds), but then tries to seek past and read more (which
    // should fail).
    #[tokio::test]
    async fn test_read_missing_chunks() {
        let blob_service = Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>;

        for blob_contents in [CHUNK_1.to_vec(), CHUNK_2.to_vec()] {
            let mut bw = blob_service.open_write().await;
            tokio::io::copy(&mut std::io::Cursor::new(blob_contents), &mut bw)
                .await
                .expect("writing blob");

            bw.close().await.expect("close blobwriter");
        }

        // the chunk list still describes all five chunks
        let mut chunked_reader =
            ChunkedReader::from_chunks((*BLOB_1_LIST).into_iter(), blob_service);

        // read a bit from the front (5 bytes out of 6 available)
        let mut buf = [0b0; 5];
        chunked_reader
            .read_exact(&mut buf)
            .await
            .expect("read exact");

        assert_eq!(hex!("0001020304"), buf);

        // seek 2 bytes forward, into an area where we don't have chunks
        // (position 7, which is the start of the fourth chunk)
        chunked_reader
            .seek(SeekFrom::Current(2))
            .await
            .expect("seek");

        // the seek itself succeeds, only reading surfaces the missing chunk
        let mut buf = Vec::new();
        chunked_reader
            .read_to_end(&mut buf)
            .await
            .expect_err("must fail");

        // FUTUREWORK: check semantics on errorkinds. Should this be InvalidData
        // or NotFound?
    }
}