snix_castore/blobservice/
chunked_reader.rs

1use futures::{TryStreamExt, ready};
2use pin_project_lite::pin_project;
3use tokio::io::{AsyncRead, AsyncSeekExt};
4use tokio_stream::StreamExt;
5use tokio_util::io::{ReaderStream, StreamReader};
6use tracing::{instrument, trace, warn};
7
8use crate::B3Digest;
9use std::{cmp::Ordering, pin::Pin};
10
11use super::{BlobReader, BlobService};
12
13pin_project! {
14    /// ChunkedReader provides a chunk-aware [BlobReader], so allows reading and
15    /// seeking into a blob.
16    /// It internally holds a [ChunkedBlob], which is storing chunk information
17    /// able to emit a reader seeked to a specific position whenever we need to seek.
18    pub struct ChunkedReader<BS> {
19        chunked_blob: ChunkedBlob<BS>,
20
21        #[pin]
22        r: Box<dyn AsyncRead + Unpin + Send>,
23
24        pos: u64,
25    }
26}
27
28impl<BS> ChunkedReader<BS>
29where
30    BS: AsRef<dyn BlobService> + Clone + 'static + Send,
31{
32    /// Construct a new [ChunkedReader], by retrieving a list of chunks (their
33    /// blake3 digests and chunk sizes)
34    pub fn from_chunks(chunks_it: impl Iterator<Item = (B3Digest, u64)>, blob_service: BS) -> Self {
35        let chunked_blob = ChunkedBlob::from_iter(chunks_it, blob_service);
36        let r = chunked_blob.reader_skipped_offset(0);
37
38        Self {
39            chunked_blob,
40            r,
41            pos: 0,
42        }
43    }
44}
45
46/// ChunkedReader implements BlobReader.
47impl<BS> BlobReader for ChunkedReader<BS> where BS: Send + Clone + 'static + AsRef<dyn BlobService> {}
48
49impl<BS> tokio::io::AsyncRead for ChunkedReader<BS>
50where
51    BS: AsRef<dyn BlobService> + Clone + 'static,
52{
53    fn poll_read(
54        self: std::pin::Pin<&mut Self>,
55        cx: &mut std::task::Context<'_>,
56        buf: &mut tokio::io::ReadBuf<'_>,
57    ) -> std::task::Poll<std::io::Result<()>> {
58        // The amount of data read can be determined by the increase
59        // in the length of the slice returned by `ReadBuf::filled`.
60        let filled_before = buf.filled().len();
61
62        let this = self.project();
63
64        ready!(this.r.poll_read(cx, buf))?;
65        let bytes_read = buf.filled().len() - filled_before;
66        *this.pos += bytes_read as u64;
67
68        Ok(()).into()
69    }
70}
71
72impl<BS> tokio::io::AsyncSeek for ChunkedReader<BS>
73where
74    BS: AsRef<dyn BlobService> + Clone + Send + 'static,
75{
76    #[instrument(skip(self), err(Debug))]
77    fn start_seek(self: Pin<&mut Self>, position: std::io::SeekFrom) -> std::io::Result<()> {
78        let total_len = self.chunked_blob.blob_length();
79        let mut this = self.project();
80
81        let absolute_offset: u64 = match position {
82            std::io::SeekFrom::Start(from_start) => from_start,
83            std::io::SeekFrom::End(from_end) => {
84                // note from_end is i64, not u64, so this is usually negative.
85                total_len.checked_add_signed(from_end).ok_or_else(|| {
86                    std::io::Error::new(
87                        std::io::ErrorKind::InvalidInput,
88                        "over/underflow while seeking",
89                    )
90                })?
91            }
92            std::io::SeekFrom::Current(from_current) => {
93                // note from_end is i64, not u64, so this can be positive or negative.
94                (*this.pos)
95                    .checked_add_signed(from_current)
96                    .ok_or_else(|| {
97                        std::io::Error::new(
98                            std::io::ErrorKind::InvalidInput,
99                            "over/underflow while seeking",
100                        )
101                    })?
102            }
103        };
104
105        // check if the position actually did change.
106        if absolute_offset != *this.pos {
107            // ensure the new position still is inside the file.
108            if absolute_offset > total_len {
109                Err(std::io::Error::new(
110                    std::io::ErrorKind::InvalidInput,
111                    "seeked beyond EOF",
112                ))?
113            }
114
115            // Update the position and the internal reader.
116            *this.pos = absolute_offset;
117
118            // FUTUREWORK: if we can seek forward, avoid re-assembling.
119            // At least if it's still in the same chunk?
120            *this.r = this.chunked_blob.reader_skipped_offset(absolute_offset);
121        }
122
123        Ok(())
124    }
125
126    fn poll_complete(
127        self: Pin<&mut Self>,
128        _cx: &mut std::task::Context<'_>,
129    ) -> std::task::Poll<std::io::Result<u64>> {
130        std::task::Poll::Ready(Ok(self.pos))
131    }
132}
133
134/// Holds a list of blake3 digest for individual chunks (and their sizes).
135/// Is able to construct a Reader that seeked to a certain offset, which
136/// is useful to construct a BlobReader (that implements AsyncSeek).
137/// - the current chunk index, and a `Custor<Vec<u8>>` holding the data of that chunk.
138struct ChunkedBlob<BS> {
139    blob_service: BS,
140    chunks: Vec<(u64, u64, B3Digest)>,
141}
142
143impl<BS> ChunkedBlob<BS>
144where
145    BS: AsRef<dyn BlobService> + Clone + 'static + Send,
146{
147    /// Constructs [Self] from a list of blake3 digests of chunks and their
148    /// sizes, and a reference to a blob service.
149    /// Initializing it with an empty list is disallowed.
150    fn from_iter(chunks_it: impl Iterator<Item = (B3Digest, u64)>, blob_service: BS) -> Self {
151        let mut chunks = Vec::new();
152        let mut offset: u64 = 0;
153
154        for (chunk_digest, chunk_size) in chunks_it {
155            chunks.push((offset, chunk_size, chunk_digest));
156            offset += chunk_size;
157        }
158
159        assert!(
160            !chunks.is_empty(),
161            "Chunks must be provided, don't use this for blobs without chunks"
162        );
163
164        Self {
165            blob_service,
166            chunks,
167        }
168    }
169
170    /// Returns the length of the blob.
171    fn blob_length(&self) -> u64 {
172        self.chunks
173            .last()
174            .map(|(chunk_offset, chunk_size, _)| chunk_offset + chunk_size)
175            .unwrap_or(0)
176    }
177
178    /// For a given position pos, return the chunk containing the data.
179    /// In case this would range outside the blob, None is returned.
180    #[instrument(level = "trace", skip(self), ret)]
181    fn get_chunk_idx_for_position(&self, pos: u64) -> Option<usize> {
182        // FUTUREWORK: benchmark when to use linear search, binary_search and BTreeSet
183        self.chunks
184            .binary_search_by(|(chunk_start_pos, chunk_size, _)| {
185                if chunk_start_pos + chunk_size <= pos {
186                    Ordering::Less
187                } else if *chunk_start_pos > pos {
188                    Ordering::Greater
189                } else {
190                    Ordering::Equal
191                }
192            })
193            .ok()
194    }
195
196    /// Returns a stream of bytes of the data in that blob.
197    /// It internally assembles a stream reading from each chunk (skipping over
198    /// chunks containing irrelevant data).
199    /// From the first relevant chunk, the irrelevant bytes are skipped too.
200    /// The returned boxed thing does not implement AsyncSeek on its own, but
201    /// ChunkedReader does.
202    #[instrument(level = "trace", skip(self))]
203    fn reader_skipped_offset(&self, offset: u64) -> Box<dyn tokio::io::AsyncRead + Send + Unpin> {
204        if offset == self.blob_length() {
205            return Box::new(std::io::Cursor::new(vec![]));
206        }
207        // construct a stream of all chunks starting with the given offset
208        let start_chunk_idx = self
209            .get_chunk_idx_for_position(offset)
210            .expect("outside of blob");
211        // It's ok to panic here, we can only reach this by seeking, and seeking should already reject out-of-file seeking.
212
213        let skip_first_chunk_bytes = (offset - self.chunks[start_chunk_idx].0) as usize;
214
215        let blob_service = self.blob_service.clone();
216        let chunks: Vec<_> = self.chunks[start_chunk_idx..].to_vec();
217        let readers_stream = tokio_stream::iter(chunks.into_iter().enumerate()).map(
218            move |(nth_chunk, (_chunk_start_offset, chunk_size, chunk_digest))| {
219                let chunk_digest = chunk_digest.to_owned();
220                let blob_service = blob_service.clone();
221                async move {
222                    trace!(chunk_size=%chunk_size, chunk_digest=%chunk_digest, "open_read on chunk in stream");
223                    let mut blob_reader = blob_service
224                        .as_ref()
225                        .open_read(&chunk_digest.to_owned())
226                        .await?
227                        .ok_or_else(|| {
228                            warn!(chunk.digest = %chunk_digest, "chunk not found");
229                            std::io::Error::new(std::io::ErrorKind::NotFound, "chunk not found")
230                        })?;
231
232                    // iff this is the first chunk in the stream, skip by skip_first_chunk_bytes
233                    if nth_chunk == 0 && skip_first_chunk_bytes > 0 {
234                        blob_reader
235                            .seek(std::io::SeekFrom::Start(skip_first_chunk_bytes as u64))
236                            .await?;
237                    }
238                    Ok::<_, std::io::Error>(blob_reader)
239                }
240            },
241        );
242
243        // convert the stream of readers to a stream of streams of byte chunks
244        let bytes_streams = readers_stream.then(|elem| async { elem.await.map(ReaderStream::new) });
245
246        // flatten into one stream of byte chunks
247        let bytes_stream = bytes_streams.try_flatten();
248
249        // convert into AsyncRead
250        Box::new(StreamReader::new(Box::pin(bytes_stream)))
251    }
252}
253
#[cfg(test)]
mod test {
    use std::{
        io::SeekFrom,
        sync::{Arc, LazyLock},
    };

    use hex_literal::hex;
    use tokio::io::{AsyncReadExt, AsyncSeekExt};

    use super::ChunkedBlob;
    use crate::{
        B3Digest,
        blobservice::{BlobService, MemoryBlobService, chunked_reader::ChunkedReader},
    };

    // The test blob is 16 bytes long, split into five chunks:
    // `[ 0 1 ] [ 2 3 4 5 ] [ 6 ] [ 7 8 ] [ 9 10 11 12 13 14 15 ]`
    const CHUNK_1: [u8; 2] = hex!("0001");
    const CHUNK_2: [u8; 4] = hex!("02030405");
    const CHUNK_3: [u8; 1] = hex!("06");
    const CHUNK_4: [u8; 2] = hex!("0708");
    const CHUNK_5: [u8; 7] = hex!("090a0b0c0d0e0f");

    /// Calculates the blake3 digest of some chunk contents.
    fn digest_of(data: &[u8]) -> B3Digest {
        blake3::hash(data).as_bytes().into()
    }

    pub static CHUNK_1_DIGEST: LazyLock<B3Digest> = LazyLock::new(|| digest_of(&CHUNK_1));
    pub static CHUNK_2_DIGEST: LazyLock<B3Digest> = LazyLock::new(|| digest_of(&CHUNK_2));
    pub static CHUNK_3_DIGEST: LazyLock<B3Digest> = LazyLock::new(|| digest_of(&CHUNK_3));
    pub static CHUNK_4_DIGEST: LazyLock<B3Digest> = LazyLock::new(|| digest_of(&CHUNK_4));
    pub static CHUNK_5_DIGEST: LazyLock<B3Digest> = LazyLock::new(|| digest_of(&CHUNK_5));

    /// The chunk list (digest and size) describing the test blob.
    pub static BLOB_1_LIST: LazyLock<[(B3Digest, u64); 5]> = LazyLock::new(|| {
        [
            (CHUNK_1_DIGEST.clone(), 2),
            (CHUNK_2_DIGEST.clone(), 4),
            (CHUNK_3_DIGEST.clone(), 1),
            (CHUNK_4_DIGEST.clone(), 2),
            (CHUNK_5_DIGEST.clone(), 7),
        ]
    });

    /// Returns a fresh, empty in-memory blob service.
    fn empty_blobservice() -> Arc<dyn BlobService> {
        Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>
    }

    /// Returns an in-memory blob service holding every entry of `chunks` as a
    /// blob of its own.
    async fn blobservice_with(chunks: &[&[u8]]) -> Arc<dyn BlobService> {
        let blob_service = empty_blobservice();

        for chunk in chunks {
            let mut bw = blob_service.open_write().await;
            tokio::io::copy(&mut std::io::Cursor::new(chunk.to_vec()), &mut bw)
                .await
                .expect("writing blob");
            bw.close().await.expect("close blobwriter");
        }

        blob_service
    }

    /// returns a blobservice with all chunks in BLOB_1 present.
    async fn gen_blobservice_blob1() -> Arc<dyn BlobService> {
        blobservice_with(&[
            &CHUNK_1[..],
            &CHUNK_2[..],
            &CHUNK_3[..],
            &CHUNK_4[..],
            &CHUNK_5[..],
        ])
        .await
    }

    /// ensure the start offsets are properly calculated.
    #[test]
    fn from_iter() {
        let cb = ChunkedBlob::from_iter(BLOB_1_LIST.clone().into_iter(), empty_blobservice());

        let expected = vec![
            (0, 2, CHUNK_1_DIGEST.clone()),
            (2, 4, CHUNK_2_DIGEST.clone()),
            (6, 1, CHUNK_3_DIGEST.clone()),
            (7, 2, CHUNK_4_DIGEST.clone()),
            (9, 7, CHUNK_5_DIGEST.clone()),
        ];

        assert_eq!(cb.chunks, expected);
    }

    /// ensure ChunkedBlob can't be used with an empty list of chunks
    #[test]
    #[should_panic]
    fn from_iter_empty() {
        ChunkedBlob::from_iter(std::iter::empty(), empty_blobservice());
    }

    /// ensure the right chunk is selected
    #[test]
    fn chunk_idx_for_position() {
        let cb = ChunkedBlob::from_iter(BLOB_1_LIST.clone().into_iter(), empty_blobservice());

        // (position, expected chunk index, description)
        let cases: [(u64, Option<usize>, &str); 6] = [
            (0, Some(0), "start of blob"),
            (1, Some(0), "middle of first chunk"),
            (2, Some(1), "beginning of second chunk"),
            (15, Some(4), "right before the end of the blob"),
            (16, None, "right outside the blob"),
            (100, None, "way outside the blob"),
        ];

        for (pos, expected_idx, description) in cases {
            assert_eq!(
                expected_idx,
                cb.get_chunk_idx_for_position(pos),
                "{description}"
            );
        }
    }

    /// Reading from start to end must yield all chunks, concatenated.
    #[tokio::test]
    async fn test_read() {
        let blob_service = gen_blobservice_blob1().await;
        let mut chunked_reader =
            ChunkedReader::from_chunks(BLOB_1_LIST.clone().into_iter(), blob_service);

        // read all data
        let mut everything = Vec::new();
        tokio::io::copy(&mut chunked_reader, &mut everything)
            .await
            .expect("copy");

        assert_eq!(
            hex!("000102030405060708090a0b0c0d0e0f").to_vec(),
            everything,
            "read data must match"
        );
    }

    /// Seeks relative to the end and to the current position, checking what
    /// can be read afterwards.
    #[tokio::test]
    async fn test_seek() {
        let blob_service = gen_blobservice_blob1().await;
        let mut chunked_reader =
            ChunkedReader::from_chunks(BLOB_1_LIST.clone().into_iter(), blob_service);

        // seek to the end
        // expect to read 0 bytes
        chunked_reader
            .seek(SeekFrom::End(0))
            .await
            .expect("seek to end");

        let mut at_end = Vec::new();
        chunked_reader
            .read_to_end(&mut at_end)
            .await
            .expect("read to end");
        assert_eq!(Vec::<u8>::new(), at_end);

        // seek one byte before the end
        chunked_reader.seek(SeekFrom::End(-1)).await.expect("seek");

        let mut last_byte = Vec::new();
        chunked_reader
            .read_to_end(&mut last_byte)
            .await
            .expect("read to end");
        assert_eq!(hex!("0f").to_vec(), last_byte);

        // seek back three bytes, but using relative positioning
        // read two bytes
        chunked_reader
            .seek(SeekFrom::Current(-3))
            .await
            .expect("seek");

        let mut two_bytes = [0u8; 2];
        chunked_reader
            .read_exact(&mut two_bytes)
            .await
            .expect("read exact");
        assert_eq!(hex!("0d0e"), two_bytes);
    }

    // seeds a blob service with only the first two chunks, reads a bit in the
    // front (which succeeds), but then tries to seek past and read more (which
    // should fail).
    #[tokio::test]
    async fn test_read_missing_chunks() {
        let blob_service = blobservice_with(&[&CHUNK_1[..], &CHUNK_2[..]]).await;

        let mut chunked_reader =
            ChunkedReader::from_chunks(BLOB_1_LIST.clone().into_iter(), blob_service);

        // read a bit from the front (5 bytes out of 6 available)
        let mut front = [0u8; 5];
        chunked_reader
            .read_exact(&mut front)
            .await
            .expect("read exact");
        assert_eq!(hex!("0001020304"), front);

        // seek 2 bytes forward, into an area where we don't have chunks
        chunked_reader
            .seek(SeekFrom::Current(2))
            .await
            .expect("seek");

        let mut rest = Vec::new();
        chunked_reader
            .read_to_end(&mut rest)
            .await
            .expect_err("must fail");

        // FUTUREWORK: check semantics on errorkinds. Should this be InvalidData
        // or NotFound?
    }
}
503}