snix_castore/blobservice/
chunked_reader.rs1use futures::{TryStreamExt, ready};
2use pin_project_lite::pin_project;
3use tokio::io::{AsyncRead, AsyncSeekExt};
4use tokio_stream::StreamExt;
5use tokio_util::io::{ReaderStream, StreamReader};
6use tracing::{instrument, trace, warn};
7
8use crate::B3Digest;
9use std::{cmp::Ordering, pin::Pin};
10
11use super::{BlobReader, BlobService};
12
13pin_project! {
14 pub struct ChunkedReader<BS> {
19 chunked_blob: ChunkedBlob<BS>,
20
21 #[pin]
22 r: Box<dyn AsyncRead + Unpin + Send>,
23
24 pos: u64,
25 }
26}
27
28impl<BS> ChunkedReader<BS>
29where
30 BS: BlobService + Clone + 'static,
31{
32 pub fn from_chunks(chunks_it: impl Iterator<Item = (B3Digest, u64)>, blob_service: BS) -> Self {
35 let chunked_blob = ChunkedBlob::from_iter(chunks_it, blob_service);
36 let r = chunked_blob.reader_skipped_offset(0);
37
38 Self {
39 chunked_blob,
40 r,
41 pos: 0,
42 }
43 }
44}
45
46impl<BS> BlobReader for ChunkedReader<BS> where BS: BlobService + Clone + 'static {}
48
49impl<BS> tokio::io::AsyncRead for ChunkedReader<BS>
50where
51 BS: BlobService + Clone + 'static,
52{
53 fn poll_read(
54 self: std::pin::Pin<&mut Self>,
55 cx: &mut std::task::Context<'_>,
56 buf: &mut tokio::io::ReadBuf<'_>,
57 ) -> std::task::Poll<std::io::Result<()>> {
58 let filled_before = buf.filled().len();
61
62 let this = self.project();
63
64 ready!(this.r.poll_read(cx, buf))?;
65 let bytes_read = buf.filled().len() - filled_before;
66 *this.pos += bytes_read as u64;
67
68 Ok(()).into()
69 }
70}
71
72impl<BS> tokio::io::AsyncSeek for ChunkedReader<BS>
73where
74 BS: BlobService + Clone + 'static,
75{
76 #[instrument(skip(self), err(Debug))]
77 fn start_seek(self: Pin<&mut Self>, position: std::io::SeekFrom) -> std::io::Result<()> {
78 let total_len = self.chunked_blob.blob_length();
79 let mut this = self.project();
80
81 let absolute_offset: u64 = match position {
82 std::io::SeekFrom::Start(from_start) => from_start,
83 std::io::SeekFrom::End(from_end) => {
84 total_len.checked_add_signed(from_end).ok_or_else(|| {
86 std::io::Error::new(
87 std::io::ErrorKind::InvalidInput,
88 "over/underflow while seeking",
89 )
90 })?
91 }
92 std::io::SeekFrom::Current(from_current) => {
93 (*this.pos)
95 .checked_add_signed(from_current)
96 .ok_or_else(|| {
97 std::io::Error::new(
98 std::io::ErrorKind::InvalidInput,
99 "over/underflow while seeking",
100 )
101 })?
102 }
103 };
104
105 if absolute_offset != *this.pos {
107 if absolute_offset > total_len {
109 Err(std::io::Error::new(
110 std::io::ErrorKind::InvalidInput,
111 "seeked beyond EOF",
112 ))?
113 }
114
115 *this.pos = absolute_offset;
117
118 *this.r = this.chunked_blob.reader_skipped_offset(absolute_offset);
121 }
122
123 Ok(())
124 }
125
126 fn poll_complete(
127 self: Pin<&mut Self>,
128 _cx: &mut std::task::Context<'_>,
129 ) -> std::task::Poll<std::io::Result<u64>> {
130 std::task::Poll::Ready(Ok(self.pos))
131 }
132}
133
134struct ChunkedBlob<BS> {
139 blob_service: BS,
140 chunks: Vec<(u64, u64, B3Digest)>,
141}
142
143impl<BS> ChunkedBlob<BS>
144where
145 BS: BlobService + Clone + 'static,
146{
147 fn from_iter(chunks_it: impl Iterator<Item = (B3Digest, u64)>, blob_service: BS) -> Self {
151 let mut chunks = Vec::new();
152 let mut offset: u64 = 0;
153
154 for (chunk_digest, chunk_size) in chunks_it {
155 chunks.push((offset, chunk_size, chunk_digest));
156 offset += chunk_size;
157 }
158
159 assert!(
160 !chunks.is_empty(),
161 "Chunks must be provided, don't use this for blobs without chunks"
162 );
163
164 Self {
165 blob_service,
166 chunks,
167 }
168 }
169
170 fn blob_length(&self) -> u64 {
172 self.chunks
173 .last()
174 .map(|(chunk_offset, chunk_size, _)| chunk_offset + chunk_size)
175 .unwrap_or(0)
176 }
177
178 #[instrument(level = "trace", skip(self), ret)]
181 fn get_chunk_idx_for_position(&self, pos: u64) -> Option<usize> {
182 self.chunks
184 .binary_search_by(|(chunk_start_pos, chunk_size, _)| {
185 if chunk_start_pos + chunk_size <= pos {
186 Ordering::Less
187 } else if *chunk_start_pos > pos {
188 Ordering::Greater
189 } else {
190 Ordering::Equal
191 }
192 })
193 .ok()
194 }
195
196 #[instrument(level = "trace", skip(self))]
203 fn reader_skipped_offset(&self, offset: u64) -> Box<dyn tokio::io::AsyncRead + Send + Unpin> {
204 if offset == self.blob_length() {
205 return Box::new(std::io::Cursor::new(vec![]));
206 }
207 let start_chunk_idx = self
209 .get_chunk_idx_for_position(offset)
210 .expect("outside of blob");
211 let skip_first_chunk_bytes = (offset - self.chunks[start_chunk_idx].0) as usize;
214
215 let blob_service = self.blob_service.clone();
216 let chunks: Vec<_> = self.chunks[start_chunk_idx..].to_vec();
217 let readers_stream = tokio_stream::iter(chunks.into_iter().enumerate()).map(
218 move |(nth_chunk, (_chunk_start_offset, chunk_size, chunk_digest))| {
219 let chunk_digest = chunk_digest.to_owned();
220 let blob_service = blob_service.clone();
221 async move {
222 trace!(chunk_size=%chunk_size, chunk_digest=%chunk_digest, "open_read on chunk in stream");
223 let mut blob_reader = blob_service
224 .open_read(&chunk_digest.to_owned())
225 .await?
226 .ok_or_else(|| {
227 warn!(chunk.digest = %chunk_digest, "chunk not found");
228 std::io::Error::new(std::io::ErrorKind::NotFound, "chunk not found")
229 })?;
230
231 if nth_chunk == 0 && skip_first_chunk_bytes > 0 {
233 blob_reader
234 .seek(std::io::SeekFrom::Start(skip_first_chunk_bytes as u64))
235 .await?;
236 }
237 Ok::<_, std::io::Error>(blob_reader)
238 }
239 },
240 );
241
242 let bytes_streams = readers_stream.then(|elem| async { elem.await.map(ReaderStream::new) });
244
245 let bytes_stream = bytes_streams.try_flatten();
247
248 Box::new(StreamReader::new(Box::pin(bytes_stream)))
250 }
251}
252
#[cfg(test)]
mod test {
    use std::{
        io::SeekFrom,
        sync::{Arc, LazyLock},
    };

    use hex_literal::hex;
    use tokio::io::{AsyncReadExt, AsyncSeekExt};

    use super::ChunkedBlob;
    use crate::{
        B3Digest,
        blobservice::{BlobService, MemoryBlobService, chunked_reader::ChunkedReader},
    };

    const CHUNK_1: [u8; 2] = hex!("0001");
    const CHUNK_2: [u8; 4] = hex!("02030405");
    const CHUNK_3: [u8; 1] = hex!("06");
    const CHUNK_4: [u8; 2] = hex!("0708");
    const CHUNK_5: [u8; 7] = hex!("090a0b0c0d0e0f");

    /// BLAKE3 digest of `data`, converted into a [B3Digest].
    fn digest_of(data: &[u8]) -> B3Digest {
        blake3::hash(data).as_bytes().into()
    }

    pub static CHUNK_1_DIGEST: LazyLock<B3Digest> = LazyLock::new(|| digest_of(&CHUNK_1));
    pub static CHUNK_2_DIGEST: LazyLock<B3Digest> = LazyLock::new(|| digest_of(&CHUNK_2));
    pub static CHUNK_3_DIGEST: LazyLock<B3Digest> = LazyLock::new(|| digest_of(&CHUNK_3));
    pub static CHUNK_4_DIGEST: LazyLock<B3Digest> = LazyLock::new(|| digest_of(&CHUNK_4));
    pub static CHUNK_5_DIGEST: LazyLock<B3Digest> = LazyLock::new(|| digest_of(&CHUNK_5));

    /// `(digest, size)` list describing the 16-byte blob `00 01 .. 0f`, split
    /// into the five chunks above.
    pub static BLOB_1_LIST: LazyLock<[(B3Digest, u64); 5]> = LazyLock::new(|| {
        [
            (*CHUNK_1_DIGEST, 2),
            (*CHUNK_2_DIGEST, 4),
            (*CHUNK_3_DIGEST, 1),
            (*CHUNK_4_DIGEST, 2),
            (*CHUNK_5_DIGEST, 7),
        ]
    });

    /// Builds a `ChunkedBlob` for `BLOB_1_LIST` over an empty blob service;
    /// the layout tests never fetch chunk data.
    fn blob1_chunked_blob() -> ChunkedBlob<Arc<dyn BlobService>> {
        ChunkedBlob::from_iter(
            (*BLOB_1_LIST).into_iter(),
            Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>,
        )
    }

    /// Uploads each slice in `blobs` into `blob_service` as a separate blob.
    async fn put_blobs(blob_service: &dyn BlobService, blobs: &[&[u8]]) {
        for &contents in blobs {
            let mut writer = blob_service.open_write().await;
            tokio::io::copy(&mut std::io::Cursor::new(contents), &mut writer)
                .await
                .expect("writing blob");
            writer.close().await.expect("close blobwriter");
        }
    }

    #[test]
    fn from_iter() {
        let cb = blob1_chunked_blob();

        let expected: Vec<(u64, u64, B3Digest)> = vec![
            (0, 2, *CHUNK_1_DIGEST),
            (2, 4, *CHUNK_2_DIGEST),
            (6, 1, *CHUNK_3_DIGEST),
            (7, 2, *CHUNK_4_DIGEST),
            (9, 7, *CHUNK_5_DIGEST),
        ];
        assert_eq!(cb.chunks, expected);
    }

    #[test]
    #[should_panic]
    fn from_iter_empty() {
        ChunkedBlob::from_iter(
            [].into_iter(),
            Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>,
        );
    }

    #[test]
    fn chunk_idx_for_position() {
        let cb = blob1_chunked_blob();

        let cases: [(u64, Option<usize>, &str); 6] = [
            (0, Some(0), "start of blob"),
            (1, Some(0), "middle of first chunk"),
            (2, Some(1), "beginning of second chunk"),
            (15, Some(4), "right before the end of the blob"),
            (16, None, "right outside the blob"),
            (100, None, "way outside the blob"),
        ];

        for (pos, expected, description) in cases {
            assert_eq!(
                expected,
                cb.get_chunk_idx_for_position(pos),
                "{description}"
            );
        }
    }

    /// A blob service containing all five chunks of `BLOB_1_LIST`.
    async fn gen_blobservice_blob1() -> Arc<dyn BlobService> {
        let blob_service: Arc<dyn BlobService> = Arc::new(MemoryBlobService::default());
        put_blobs(
            &*blob_service,
            &[
                CHUNK_1.as_slice(),
                CHUNK_2.as_slice(),
                CHUNK_3.as_slice(),
                CHUNK_4.as_slice(),
                CHUNK_5.as_slice(),
            ],
        )
        .await;
        blob_service
    }

    #[tokio::test]
    async fn test_read() {
        let blob_service = gen_blobservice_blob1().await;
        let mut chunked_reader =
            ChunkedReader::from_chunks((*BLOB_1_LIST).into_iter(), blob_service);

        let mut contents = Vec::new();
        tokio::io::copy(&mut chunked_reader, &mut contents)
            .await
            .expect("copy");

        assert_eq!(
            hex!("000102030405060708090a0b0c0d0e0f").to_vec(),
            contents,
            "read data must match"
        );
    }

    #[tokio::test]
    async fn test_seek() {
        let blob_service = gen_blobservice_blob1().await;
        let mut chunked_reader =
            ChunkedReader::from_chunks((*BLOB_1_LIST).into_iter(), blob_service);

        // At the very end there is nothing left to read.
        chunked_reader
            .seek(SeekFrom::End(0))
            .await
            .expect("seek to end");
        let mut at_end = Vec::new();
        chunked_reader
            .read_to_end(&mut at_end)
            .await
            .expect("read to end");
        assert_eq!(hex!("").to_vec(), at_end);

        // One byte before the end yields exactly the last byte.
        chunked_reader.seek(SeekFrom::End(-1)).await.expect("seek");
        let mut last_byte = Vec::new();
        chunked_reader
            .read_to_end(&mut last_byte)
            .await
            .expect("read to end");
        assert_eq!(hex!("0f").to_vec(), last_byte);

        // The reader is back at the end (16); stepping back three lands on 13.
        chunked_reader
            .seek(SeekFrom::Current(-3))
            .await
            .expect("seek");
        let mut pair = [0u8; 2];
        chunked_reader
            .read_exact(&mut pair)
            .await
            .expect("read exact");
        assert_eq!(hex!("0d0e"), pair);
    }

    /// Reads succeed while the data comes from chunks that exist, and fail
    /// once the reader reaches a chunk missing from the blob service.
    #[tokio::test]
    async fn test_read_missing_chunks() {
        let blob_service: Arc<dyn BlobService> = Arc::new(MemoryBlobService::default());
        // Only the first two chunks are uploaded.
        put_blobs(&*blob_service, &[CHUNK_1.as_slice(), CHUNK_2.as_slice()]).await;

        let mut chunked_reader =
            ChunkedReader::from_chunks((*BLOB_1_LIST).into_iter(), blob_service);

        // The first five bytes come from CHUNK_1 and CHUNK_2, which are present.
        let mut head = [0u8; 5];
        chunked_reader
            .read_exact(&mut head)
            .await
            .expect("read exact");
        assert_eq!(hex!("0001020304"), head);

        // Jump to offset 7, the start of CHUNK_4, which was never uploaded.
        chunked_reader
            .seek(SeekFrom::Current(2))
            .await
            .expect("seek");

        let mut rest = Vec::new();
        chunked_reader
            .read_to_end(&mut rest)
            .await
            .expect_err("must fail");
    }
}