snix_castore/blobservice/
chunked_reader.rs1use futures::{TryStreamExt, ready};
2use pin_project_lite::pin_project;
3use tokio::io::{AsyncRead, AsyncSeekExt};
4use tokio_stream::StreamExt;
5use tokio_util::io::{ReaderStream, StreamReader};
6use tracing::{instrument, trace, warn};
7
8use crate::B3Digest;
9use std::{cmp::Ordering, pin::Pin};
10
11use super::{BlobReader, BlobService};
12
13pin_project! {
14 pub struct ChunkedReader<BS> {
19 chunked_blob: ChunkedBlob<BS>,
20
21 #[pin]
22 r: Box<dyn AsyncRead + Unpin + Send>,
23
24 pos: u64,
25 }
26}
27
28impl<BS> ChunkedReader<BS>
29where
30 BS: AsRef<dyn BlobService> + Clone + 'static + Send,
31{
32 pub fn from_chunks(chunks_it: impl Iterator<Item = (B3Digest, u64)>, blob_service: BS) -> Self {
35 let chunked_blob = ChunkedBlob::from_iter(chunks_it, blob_service);
36 let r = chunked_blob.reader_skipped_offset(0);
37
38 Self {
39 chunked_blob,
40 r,
41 pos: 0,
42 }
43 }
44}
45
46impl<BS> BlobReader for ChunkedReader<BS> where BS: Send + Clone + 'static + AsRef<dyn BlobService> {}
48
49impl<BS> tokio::io::AsyncRead for ChunkedReader<BS>
50where
51 BS: AsRef<dyn BlobService> + Clone + 'static,
52{
53 fn poll_read(
54 self: std::pin::Pin<&mut Self>,
55 cx: &mut std::task::Context<'_>,
56 buf: &mut tokio::io::ReadBuf<'_>,
57 ) -> std::task::Poll<std::io::Result<()>> {
58 let filled_before = buf.filled().len();
61
62 let this = self.project();
63
64 ready!(this.r.poll_read(cx, buf))?;
65 let bytes_read = buf.filled().len() - filled_before;
66 *this.pos += bytes_read as u64;
67
68 Ok(()).into()
69 }
70}
71
72impl<BS> tokio::io::AsyncSeek for ChunkedReader<BS>
73where
74 BS: AsRef<dyn BlobService> + Clone + Send + 'static,
75{
76 #[instrument(skip(self), err(Debug))]
77 fn start_seek(self: Pin<&mut Self>, position: std::io::SeekFrom) -> std::io::Result<()> {
78 let total_len = self.chunked_blob.blob_length();
79 let mut this = self.project();
80
81 let absolute_offset: u64 = match position {
82 std::io::SeekFrom::Start(from_start) => from_start,
83 std::io::SeekFrom::End(from_end) => {
84 total_len.checked_add_signed(from_end).ok_or_else(|| {
86 std::io::Error::new(
87 std::io::ErrorKind::InvalidInput,
88 "over/underflow while seeking",
89 )
90 })?
91 }
92 std::io::SeekFrom::Current(from_current) => {
93 (*this.pos)
95 .checked_add_signed(from_current)
96 .ok_or_else(|| {
97 std::io::Error::new(
98 std::io::ErrorKind::InvalidInput,
99 "over/underflow while seeking",
100 )
101 })?
102 }
103 };
104
105 if absolute_offset != *this.pos {
107 if absolute_offset > total_len {
109 Err(std::io::Error::new(
110 std::io::ErrorKind::InvalidInput,
111 "seeked beyond EOF",
112 ))?
113 }
114
115 *this.pos = absolute_offset;
117
118 *this.r = this.chunked_blob.reader_skipped_offset(absolute_offset);
121 }
122
123 Ok(())
124 }
125
126 fn poll_complete(
127 self: Pin<&mut Self>,
128 _cx: &mut std::task::Context<'_>,
129 ) -> std::task::Poll<std::io::Result<u64>> {
130 std::task::Poll::Ready(Ok(self.pos))
131 }
132}
133
134struct ChunkedBlob<BS> {
139 blob_service: BS,
140 chunks: Vec<(u64, u64, B3Digest)>,
141}
142
143impl<BS> ChunkedBlob<BS>
144where
145 BS: AsRef<dyn BlobService> + Clone + 'static + Send,
146{
147 fn from_iter(chunks_it: impl Iterator<Item = (B3Digest, u64)>, blob_service: BS) -> Self {
151 let mut chunks = Vec::new();
152 let mut offset: u64 = 0;
153
154 for (chunk_digest, chunk_size) in chunks_it {
155 chunks.push((offset, chunk_size, chunk_digest));
156 offset += chunk_size;
157 }
158
159 assert!(
160 !chunks.is_empty(),
161 "Chunks must be provided, don't use this for blobs without chunks"
162 );
163
164 Self {
165 blob_service,
166 chunks,
167 }
168 }
169
170 fn blob_length(&self) -> u64 {
172 self.chunks
173 .last()
174 .map(|(chunk_offset, chunk_size, _)| chunk_offset + chunk_size)
175 .unwrap_or(0)
176 }
177
178 #[instrument(level = "trace", skip(self), ret)]
181 fn get_chunk_idx_for_position(&self, pos: u64) -> Option<usize> {
182 self.chunks
184 .binary_search_by(|(chunk_start_pos, chunk_size, _)| {
185 if chunk_start_pos + chunk_size <= pos {
186 Ordering::Less
187 } else if *chunk_start_pos > pos {
188 Ordering::Greater
189 } else {
190 Ordering::Equal
191 }
192 })
193 .ok()
194 }
195
196 #[instrument(level = "trace", skip(self))]
203 fn reader_skipped_offset(&self, offset: u64) -> Box<dyn tokio::io::AsyncRead + Send + Unpin> {
204 if offset == self.blob_length() {
205 return Box::new(std::io::Cursor::new(vec![]));
206 }
207 let start_chunk_idx = self
209 .get_chunk_idx_for_position(offset)
210 .expect("outside of blob");
211 let skip_first_chunk_bytes = (offset - self.chunks[start_chunk_idx].0) as usize;
214
215 let blob_service = self.blob_service.clone();
216 let chunks: Vec<_> = self.chunks[start_chunk_idx..].to_vec();
217 let readers_stream = tokio_stream::iter(chunks.into_iter().enumerate()).map(
218 move |(nth_chunk, (_chunk_start_offset, chunk_size, chunk_digest))| {
219 let chunk_digest = chunk_digest.to_owned();
220 let blob_service = blob_service.clone();
221 async move {
222 trace!(chunk_size=%chunk_size, chunk_digest=%chunk_digest, "open_read on chunk in stream");
223 let mut blob_reader = blob_service
224 .as_ref()
225 .open_read(&chunk_digest.to_owned())
226 .await?
227 .ok_or_else(|| {
228 warn!(chunk.digest = %chunk_digest, "chunk not found");
229 std::io::Error::new(std::io::ErrorKind::NotFound, "chunk not found")
230 })?;
231
232 if nth_chunk == 0 && skip_first_chunk_bytes > 0 {
234 blob_reader
235 .seek(std::io::SeekFrom::Start(skip_first_chunk_bytes as u64))
236 .await?;
237 }
238 Ok::<_, std::io::Error>(blob_reader)
239 }
240 },
241 );
242
243 let bytes_streams = readers_stream.then(|elem| async { elem.await.map(ReaderStream::new) });
245
246 let bytes_stream = bytes_streams.try_flatten();
248
249 Box::new(StreamReader::new(Box::pin(bytes_stream)))
251 }
252}
253
#[cfg(test)]
mod test {
    use std::{
        io::SeekFrom,
        sync::{Arc, LazyLock},
    };

    use crate::{
        B3Digest,
        blobservice::{BlobService, MemoryBlobService, chunked_reader::ChunkedReader},
    };
    use hex_literal::hex;
    use tokio::io::{AsyncReadExt, AsyncSeekExt};

    use super::ChunkedBlob;

    // Five chunks of different sizes; concatenated they form the bytes
    // 0x00..=0x0f.
    const CHUNK_1: [u8; 2] = hex!("0001");
    const CHUNK_2: [u8; 4] = hex!("02030405");
    const CHUNK_3: [u8; 1] = hex!("06");
    const CHUNK_4: [u8; 2] = hex!("0708");
    const CHUNK_5: [u8; 7] = hex!("090a0b0c0d0e0f");

    /// Computes the BLAKE3 digest of `data`.
    fn digest_of(data: &[u8]) -> B3Digest {
        blake3::hash(data).as_bytes().into()
    }

    pub static CHUNK_1_DIGEST: LazyLock<B3Digest> = LazyLock::new(|| digest_of(&CHUNK_1));
    pub static CHUNK_2_DIGEST: LazyLock<B3Digest> = LazyLock::new(|| digest_of(&CHUNK_2));
    pub static CHUNK_3_DIGEST: LazyLock<B3Digest> = LazyLock::new(|| digest_of(&CHUNK_3));
    pub static CHUNK_4_DIGEST: LazyLock<B3Digest> = LazyLock::new(|| digest_of(&CHUNK_4));
    pub static CHUNK_5_DIGEST: LazyLock<B3Digest> = LazyLock::new(|| digest_of(&CHUNK_5));

    /// Chunk list (digest and size) of the blob made up of all five chunks.
    pub static BLOB_1_LIST: LazyLock<[(B3Digest, u64); 5]> = LazyLock::new(|| {
        [
            (CHUNK_1_DIGEST.clone(), 2),
            (CHUNK_2_DIGEST.clone(), 4),
            (CHUNK_3_DIGEST.clone(), 1),
            (CHUNK_4_DIGEST.clone(), 2),
            (CHUNK_5_DIGEST.clone(), 7),
        ]
    });

    /// Returns a fresh, empty in-memory blob service.
    fn empty_blob_service() -> Arc<dyn BlobService> {
        Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>
    }

    /// Writes every entry of `chunks` to `blob_service` as a blob of its own.
    async fn upload_chunks(blob_service: &Arc<dyn BlobService>, chunks: &[&[u8]]) {
        for chunk in chunks {
            let mut bw = blob_service.open_write().await;
            tokio::io::copy(&mut std::io::Cursor::new(chunk.to_vec()), &mut bw)
                .await
                .expect("writing blob");
            bw.close().await.expect("close blobwriter");
        }
    }

    /// Returns a blob service that has all five chunks uploaded.
    async fn gen_blobservice_blob1() -> Arc<dyn BlobService> {
        let blob_service = empty_blob_service();

        upload_chunks(
            &blob_service,
            &[
                &CHUNK_1[..],
                &CHUNK_2[..],
                &CHUNK_3[..],
                &CHUNK_4[..],
                &CHUNK_5[..],
            ],
        )
        .await;

        blob_service
    }

    /// Offsets must be the running sum of the preceding chunk sizes.
    #[test]
    fn from_iter() {
        let cb = ChunkedBlob::from_iter(BLOB_1_LIST.iter().cloned(), empty_blob_service());

        let expected = vec![
            (0, 2, CHUNK_1_DIGEST.clone()),
            (2, 4, CHUNK_2_DIGEST.clone()),
            (6, 1, CHUNK_3_DIGEST.clone()),
            (7, 2, CHUNK_4_DIGEST.clone()),
            (9, 7, CHUNK_5_DIGEST.clone()),
        ];

        assert_eq!(cb.chunks, expected);
    }

    /// Constructing a ChunkedBlob without any chunks must panic.
    #[test]
    #[should_panic]
    fn from_iter_empty() {
        ChunkedBlob::from_iter(std::iter::empty(), empty_blob_service());
    }

    /// Positions must be mapped to the chunk containing them, and positions
    /// outside the blob to `None`.
    #[test]
    fn chunk_idx_for_position() {
        let cb = ChunkedBlob::from_iter(BLOB_1_LIST.iter().cloned(), empty_blob_service());

        let cases: [(u64, Option<usize>, &str); 6] = [
            (0, Some(0), "start of blob"),
            (1, Some(0), "middle of first chunk"),
            (2, Some(1), "beginning of second chunk"),
            (15, Some(4), "right before the end of the blob"),
            (16, None, "right outside the blob"),
            (100, None, "way outside the blob"),
        ];

        for (pos, expected, description) in cases {
            assert_eq!(
                expected,
                cb.get_chunk_idx_for_position(pos),
                "{description}"
            );
        }
    }

    /// Reading from start to end must return all chunks, concatenated.
    #[tokio::test]
    async fn test_read() {
        let blob_service = gen_blobservice_blob1().await;
        let mut chunked_reader =
            ChunkedReader::from_chunks(BLOB_1_LIST.iter().cloned(), blob_service);

        let mut contents = Vec::new();
        tokio::io::copy(&mut chunked_reader, &mut contents)
            .await
            .expect("copy");

        assert_eq!(
            hex!("000102030405060708090a0b0c0d0e0f").to_vec(),
            contents,
            "read data must match"
        );
    }

    /// Exercises seeking relative to the end and to the current position.
    #[tokio::test]
    async fn test_seek() {
        let blob_service = gen_blobservice_blob1().await;
        let mut chunked_reader =
            ChunkedReader::from_chunks(BLOB_1_LIST.iter().cloned(), blob_service);

        // Seeking to the very end leaves nothing to read.
        chunked_reader
            .seek(SeekFrom::End(0))
            .await
            .expect("seek to end");
        let mut rest = Vec::new();
        chunked_reader
            .read_to_end(&mut rest)
            .await
            .expect("read to end");
        assert_eq!(hex!("").to_vec(), rest);

        // One byte before the end, exactly the last byte is left.
        chunked_reader.seek(SeekFrom::End(-1)).await.expect("seek");
        let mut rest = Vec::new();
        chunked_reader
            .read_to_end(&mut rest)
            .await
            .expect("read to end");
        assert_eq!(hex!("0f").to_vec(), rest);

        // The reader is at the end again; go back three bytes and read two.
        chunked_reader
            .seek(SeekFrom::Current(-3))
            .await
            .expect("seek");
        let mut two_bytes = [0u8; 2];
        chunked_reader
            .read_exact(&mut two_bytes)
            .await
            .expect("read exact");
        assert_eq!(hex!("0d0e"), two_bytes);
    }

    /// With only the first two chunks uploaded, reading inside them must work,
    /// seeking into a missing chunk must work too, and only the read that
    /// follows must fail.
    #[tokio::test]
    async fn test_read_missing_chunks() {
        let blob_service = empty_blob_service();
        upload_chunks(&blob_service, &[&CHUNK_1[..], &CHUNK_2[..]]).await;

        let mut chunked_reader =
            ChunkedReader::from_chunks(BLOB_1_LIST.iter().cloned(), blob_service);

        // The first five bytes are fully covered by the two uploaded chunks.
        let mut head = [0u8; 5];
        chunked_reader
            .read_exact(&mut head)
            .await
            .expect("read exact");
        assert_eq!(hex!("0001020304"), head);

        // Seeking past the uploaded chunks succeeds on its own.
        chunked_reader
            .seek(SeekFrom::Current(2))
            .await
            .expect("seek");

        // Reading from there has to open a missing chunk and must fail.
        let mut rest = Vec::new();
        chunked_reader
            .read_to_end(&mut rest)
            .await
            .expect_err("must fail");
    }
}