use std::{
    collections::{HashMap, hash_map},
    io::{self, Cursor},
    pin::pin,
    sync::Arc,
    task::Poll,
};

use data_encoding::HEXLOWER;
use fastcdc::v2020::AsyncStreamCDC;
use futures::{Future, TryStreamExt};
use object_store::{ObjectStore, path::Path};
use pin_project_lite::pin_project;
use prost::Message;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tonic::async_trait;
use tracing::{Level, debug, instrument, trace};
use url::Url;

use crate::{
    B3Digest, B3HashingReader,
    composition::{CompositionContext, ServiceBuilder},
    proto::{StatBlobResponse, stat_blob_response::ChunkMeta},
};

use super::{BlobReader, BlobService, BlobWriter, ChunkedReader};

const CONCURRENT_CHUNK_UPLOADS: usize = 64;

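/// [BlobService] implementation backed by an [ObjectStore].
///
/// Blob metadata (an encoded [StatBlobResponse] listing the chunks a blob is
/// composed of) is stored at
/// `<base_path>/blobs/b3/<first two digest bytes, hex>/<blob digest, hex>`,
/// while the zstd-compressed chunk payloads themselves are stored at
/// `<base_path>/chunks/b3/<first two digest bytes, hex>/<chunk digest, hex>`.
/// Incoming blobs are split with FastCDC, targeting `avg_chunk_size` bytes
/// per chunk.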
#[derive(Clone)]
pub struct ObjectStoreBlobService {
    instance_name: String,
    object_store: Arc<dyn ObjectStore>,
    base_path: Path,

    avg_chunk_size: u32,
}

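/// Derives the path of the blob metadata object for the given digest:
/// `<base_path>/blobs/b3/<first two digest bytes, hex>/<blob digest, hex>`.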
#[instrument(level = Level::TRACE, skip_all, fields(base_path=%base_path, blob.digest=%digest), ret(Display))]
fn derive_blob_path(base_path: &Path, digest: &B3Digest) -> Path {
    base_path
        .child("blobs")
        .child("b3")
        .child(HEXLOWER.encode(&digest[..2]))
        .child(HEXLOWER.encode(&digest[..]))
}

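/// Derives the path of the (zstd-compressed) chunk object for the given digest:
/// `<base_path>/chunks/b3/<first two digest bytes, hex>/<chunk digest, hex>`.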
#[instrument(level = Level::TRACE, skip_all, fields(base_path=%base_path, chunk.digest=%digest), ret(Display))]
fn derive_chunk_path(base_path: &Path, digest: &B3Digest) -> Path {
    base_path
        .child("chunks")
        .child("b3")
        .child(HEXLOWER.encode(&digest[..2]))
        .child(HEXLOWER.encode(&digest[..]))
}

#[async_trait]
impl BlobService for ObjectStoreBlobService {
    #[instrument(skip_all, ret(level = Level::TRACE), err, fields(blob.digest=%digest, instance_name=%self.instance_name))]
    async fn has(&self, digest: &B3Digest) -> io::Result<bool> {
        let p = derive_blob_path(&self.base_path, digest);

        match self.object_store.head(&p).await {
            Ok(_) => Ok(true),
            Err(object_store::Error::NotFound { .. }) => {
                let p = derive_chunk_path(&self.base_path, digest);
                match self.object_store.head(&p).await {
                    Ok(_) => Ok(true),
                    Err(object_store::Error::NotFound { .. }) => Ok(false),
                    Err(e) => Err(e)?,
                }
            }
            Err(e) => Err(e)?,
        }
    }

    #[instrument(skip_all, err, fields(blob.digest=%digest, instance_name=%self.instance_name))]
    async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> {
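        // Serve the empty blob straight from memory rather than hitting the
        // backend.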
        if digest.as_slice() == blake3::hash(b"").as_bytes() {
            return Ok(Some(Box::new(Cursor::new(b"")) as Box<dyn BlobReader>));
        }
        match self
            .object_store
            .get(&derive_chunk_path(&self.base_path, digest))
            .await
        {
            Ok(res) => {
                let chunk_raw_bytes = res.bytes().await?;
                let chunk_contents = zstd::stream::decode_all(Cursor::new(chunk_raw_bytes))?;

                if *digest != blake3::hash(&chunk_contents).as_bytes().into() {
                    Err(io::Error::other("chunk contents invalid"))?;
                }

                Ok(Some(Box::new(Cursor::new(chunk_contents))))
            }
            Err(object_store::Error::NotFound { .. }) => {
                if let Some(chunks) = self.chunks(digest).await? {
                    let chunked_reader = ChunkedReader::from_chunks(
                        chunks.into_iter().map(|chunk| {
                            (
                                chunk.digest.try_into().expect("invalid b3 digest"),
                                chunk.size,
                            )
                        }),
                        Arc::new(self.clone()) as Arc<dyn BlobService>,
                    );

                    Ok(Some(Box::new(chunked_reader)))
                } else {
                    Ok(None)
                }
            }
            Err(e) => Err(e.into()),
        }
    }

    #[instrument(skip_all, fields(instance_name=%self.instance_name))]
    async fn open_write(&self) -> Box<dyn BlobWriter> {
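        // Writes go into one half of a duplex pipe, while a background future
        // consumes the other half, chunking and uploading the data as it
        // arrives.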
        let (w, r) = tokio::io::duplex(self.avg_chunk_size as usize * 10);

        Box::new(ObjectStoreBlobWriter {
            writer: Some(w),
            fut: Some(Box::pin(chunk_and_upload(
                r,
                self.object_store.clone(),
                self.base_path.clone(),
                self.avg_chunk_size / 2,
                self.avg_chunk_size,
                self.avg_chunk_size * 2,
            ))),
            fut_output: None,
        })
    }

    #[instrument(skip_all, err, fields(blob.digest=%digest, instance_name=%self.instance_name))]
    async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> {
        match self
            .object_store
            .get(&derive_blob_path(&self.base_path, digest))
            .await
        {
            Ok(get_result) => {
                let blob_data = get_result.bytes().await?;
                let stat_blob_response: StatBlobResponse = StatBlobResponse::decode(blob_data)?;

                debug!(
                    chunk.count = stat_blob_response.chunks.len(),
                    blob.size = stat_blob_response
                        .chunks
                        .iter()
                        .map(|x| x.size)
                        .sum::<u64>(),
                    "found more granular chunks"
                );

                Ok(Some(stat_blob_response.chunks))
            }
            Err(object_store::Error::NotFound { .. }) => {
                match self
                    .object_store
                    .head(&derive_chunk_path(&self.base_path, digest))
                    .await
                {
                    Ok(_) => {
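                        // An object exists at the chunk path derived from the
                        // blob digest, so the blob is stored as a single chunk
                        // and there is no more granular chunk list to return.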
                        debug!("found a single chunk");
                        Ok(Some(vec![]))
                    }
                    Err(object_store::Error::NotFound { .. }) => {
                        debug!("not found");
                        Ok(None)
                    }
                    Err(e) => Err(e.into()),
                }
            }
            Err(err) => Err(err.into()),
        }
    }
}

fn default_avg_chunk_size() -> u32 {
    256 * 1024
}

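/// Configuration for [ObjectStoreBlobService].
///
/// Besides being deserialized directly, it can be constructed from an
/// `objectstore+<url>` URL, in which case the query parameters become
/// `object_store_options`.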
#[derive(serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ObjectStoreBlobServiceConfig {
    object_store_url: String,
    #[serde(default = "default_avg_chunk_size")]
    avg_chunk_size: u32,
    object_store_options: HashMap<String, String>,
}

impl TryFrom<url::Url> for ObjectStoreBlobServiceConfig {
    type Error = Box<dyn std::error::Error + Send + Sync>;
    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
        let trimmed_url = {
            let s = url.to_string();
            let mut url = Url::parse(
                s.strip_prefix("objectstore+")
                    .ok_or("Missing objectstore uri")?,
            )?;
            url.set_query(None);
            url
        };
        Ok(ObjectStoreBlobServiceConfig {
            object_store_url: trimmed_url.into(),
            object_store_options: url
                .query_pairs()
                .into_iter()
                .map(|(k, v)| (k.to_string(), v.to_string()))
                .collect(),
            avg_chunk_size: 256 * 1024,
        })
    }
}

#[async_trait]
impl ServiceBuilder for ObjectStoreBlobServiceConfig {
    type Output = dyn BlobService;
    async fn build<'a>(
        &'a self,
        instance_name: &str,
        _context: &CompositionContext,
    ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
        let opts = {
            let mut opts: HashMap<&str, _> = self
                .object_store_options
                .iter()
                .map(|(k, v)| (k.as_str(), v.as_str()))
                .collect();

            if let hash_map::Entry::Vacant(e) =
                opts.entry(object_store::ClientConfigKey::UserAgent.as_ref())
            {
                e.insert(crate::USER_AGENT);
            }

            opts
        };

        let (object_store, path) =
            object_store::parse_url_opts(&self.object_store_url.parse()?, opts)?;
        Ok(Arc::new(ObjectStoreBlobService {
            instance_name: instance_name.to_string(),
            object_store: Arc::new(object_store),
            base_path: path,
            avg_chunk_size: self.avg_chunk_size,
        }))
    }
}

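/// Reads blob contents from `r`, chunks them with FastCDC using the given
/// min/avg/max chunk sizes, uploads every chunk (zstd-compressed) under its
/// own BLAKE3 digest and finally stores a [StatBlobResponse] describing the
/// chunking under the blob path. Returns the digest of the entire blob.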
#[instrument(skip_all, fields(base_path=%base_path, min_chunk_size, avg_chunk_size, max_chunk_size), err)]
async fn chunk_and_upload<R: AsyncRead + Unpin>(
    r: R,
    object_store: Arc<dyn ObjectStore>,
    base_path: Path,
    min_chunk_size: u32,
    avg_chunk_size: u32,
    max_chunk_size: u32,
) -> io::Result<B3Digest> {
    let mut b3_r = B3HashingReader::from(r);
    let mut chunker =
        AsyncStreamCDC::new(&mut b3_r, min_chunk_size, avg_chunk_size, max_chunk_size);

    let chunks = chunker
        .as_stream()
        .err_into()
        .map_ok(|chunk_data| {
            let object_store = object_store.clone();
            let chunk_digest: B3Digest = blake3::hash(&chunk_data.data).as_bytes().into();
            let chunk_path = derive_chunk_path(&base_path, &chunk_digest);
            upload_chunk(object_store, chunk_digest, chunk_path, chunk_data.data)
        })
        .try_buffered(CONCURRENT_CHUNK_UPLOADS)
        .try_collect::<Vec<ChunkMeta>>()
        .await?;

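    // With fewer than two chunks there is nothing more granular to record:
    // the single chunk (if any) is already addressed by the blob digest
    // itself, so the blob metadata gets an empty chunk list.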
    let chunks = if chunks.len() < 2 {
        vec![]
    } else {
        chunks
    };

    let stat_blob_response = StatBlobResponse {
        chunks,
        bao: "".into(),
    };

    let blob_digest: B3Digest = b3_r.digest().into();
    let blob_path = derive_blob_path(&base_path, &blob_digest);

    match object_store.head(&blob_path).await {
        Ok(_) => {
            trace!(
                blob.digest = %blob_digest,
                blob.path = %blob_path,
                "blob already exists on backend"
            );
        }
        Err(object_store::Error::NotFound { .. }) => {
            debug!(
                blob.digest = %blob_digest,
                blob.path = %blob_path,
                "uploading blob"
            );
            object_store
                .put(&blob_path, stat_blob_response.encode_to_vec().into())
                .await?;
        }
        Err(err) => Err(err)?,
    }

    Ok(blob_digest)
}

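/// Uploads a single chunk to the object store, unless an object already
/// exists at `chunk_path`. The chunk data is zstd-compressed before upload.
/// Returns a [ChunkMeta] describing the (uncompressed) chunk.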
#[instrument(skip_all, fields(chunk.digest = %chunk_digest, chunk.size = chunk_data.len(), chunk.path = %chunk_path), err)]
async fn upload_chunk(
    object_store: Arc<dyn ObjectStore>,
    chunk_digest: B3Digest,
    chunk_path: Path,
    chunk_data: Vec<u8>,
) -> std::io::Result<ChunkMeta> {
    let chunk_size = chunk_data.len();
    match object_store.head(&chunk_path).await {
        Ok(_) => {
            debug!("chunk already exists");
        }
        Err(object_store::Error::NotFound { .. }) => {
            let chunk_data_compressed =
                zstd::encode_all(Cursor::new(chunk_data), zstd::DEFAULT_COMPRESSION_LEVEL)?;

            debug!(chunk.compressed_size=%chunk_data_compressed.len(), "uploading chunk");

            object_store
                .as_ref()
                .put(&chunk_path, chunk_data_compressed.into())
                .await?;
        }
        Err(err) => Err(err)?,
    }

    Ok(ChunkMeta {
        digest: chunk_digest.into(),
        size: chunk_size as u64,
    })
}

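// Writer returned by [ObjectStoreBlobService::open_write]. Writes go into the
// duplex pipe in `writer`, while `fut` drives [chunk_and_upload] on the read
// half. `fut_output` caches the result so close() stays idempotent.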
pin_project! {
    pub struct ObjectStoreBlobWriter<W, Fut>
    where
        W: AsyncWrite,
        Fut: Future,
    {
        #[pin]
        writer: Option<W>,

        #[pin]
        fut: Option<Fut>,

        fut_output: Option<io::Result<B3Digest>>,
    }
}

impl<W, Fut> tokio::io::AsyncWrite for ObjectStoreBlobWriter<W, Fut>
where
    W: AsyncWrite + Send + Unpin,
    Fut: Future,
{
    fn poll_write(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &[u8],
    ) -> std::task::Poll<Result<usize, io::Error>> {
        let this = self.project();
        let fut = this.fut.as_pin_mut().expect("fut must be some");
        let fut_p = fut.poll(cx);
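        // chunk_and_upload only completes after the write half has been shut
        // down (which happens in close()), so a Ready here while we are still
        // writing means the upload failed.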
        if fut_p.is_ready() {
            return Poll::Ready(Err(io::Error::other("upload failed")));
        }

        this.writer
            .as_pin_mut()
            .expect("writer must be some")
            .poll_write(cx, buf)
    }

    fn poll_flush(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), io::Error>> {
        let this = self.project();
        let fut = this.fut.as_pin_mut().expect("fut must be some");
        let fut_p = fut.poll(cx);
        if fut_p.is_ready() {
            return Poll::Ready(Err(io::Error::other("upload failed")));
        }

        this.writer
            .as_pin_mut()
            .expect("writer must be some")
            .poll_flush(cx)
    }

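    // Shutting down the writer half and driving the upload future to
    // completion both happen in close(), so there is nothing to do here.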
    fn poll_shutdown(
        self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), io::Error>> {
        std::task::Poll::Ready(Ok(()))
    }
}

#[async_trait]
impl<W, Fut> BlobWriter for ObjectStoreBlobWriter<W, Fut>
where
    W: AsyncWrite + Send + Unpin,
    Fut: Future<Output = io::Result<B3Digest>> + Send + Unpin,
{
    async fn close(&mut self) -> io::Result<B3Digest> {
        match self.writer.take() {
            Some(mut writer) => {
                writer.shutdown().await?;

                let fut = self.fut.take().expect("fut must be some");
                let resp = pin!(fut).await;

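                // Stash the outcome so a repeated close() can return the same
                // result without polling the already-completed future again.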
                match resp.as_ref() {
                    Ok(b3_digest) => {
                        self.fut_output = Some(Ok(*b3_digest));
                    }
                    Err(e) => {
                        self.fut_output = Some(Err(std::io::Error::new(e.kind(), e.to_string())))
                    }
                }
                resp
            }
            None => {
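                // close() was already called before; replay the stored result.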
                match self.fut_output.as_ref().unwrap() {
                    Ok(b3_digest) => Ok(*b3_digest),
                    Err(e) => Err(std::io::Error::new(e.kind(), e.to_string())),
                }
            }
        }
    }
}

#[cfg(test)]
mod test {
    use super::{chunk_and_upload, default_avg_chunk_size};
    use crate::{
        blobservice::{BlobService, ObjectStoreBlobService},
        fixtures::{BLOB_A, BLOB_A_DIGEST, BLOB_B, BLOB_B_DIGEST},
    };
    use std::{io::Cursor, sync::Arc};
    use url::Url;

    #[rstest::rstest]
    #[case::a(&BLOB_A, &BLOB_A_DIGEST)]
    #[case::b(&BLOB_B, &BLOB_B_DIGEST)]
    #[tokio::test]
    async fn test_chunk_and_upload(
        #[case] blob: &bytes::Bytes,
        #[case] blob_digest: &crate::B3Digest,
    ) {
        let (object_store, base_path) =
            object_store::parse_url(&Url::parse("memory:///").unwrap()).unwrap();
        let object_store: Arc<dyn object_store::ObjectStore> = Arc::from(object_store);
        let blobsvc = Arc::new(ObjectStoreBlobService {
            instance_name: "test".into(),
            object_store: object_store.clone(),
            avg_chunk_size: default_avg_chunk_size(),
            base_path,
        });

        let inserted_blob_digest = chunk_and_upload(
            &mut Cursor::new(blob.to_vec()),
            object_store,
            object_store::path::Path::from("/"),
            1024 / 2,
            1024,
            1024 * 2,
        )
        .await
        .expect("chunk_and_upload succeeds");

        assert_eq!(blob_digest.clone(), inserted_blob_digest);

        assert!(blobsvc.has(blob_digest).await.unwrap());

        let chunks = blobsvc.chunks(blob_digest).await.unwrap().unwrap();
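        // Blobs smaller than the minimum chunk size are stored as a single
        // chunk, so no granular chunk list is recorded; blobs larger than the
        // maximum chunk size must have been split into at least two chunks.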
        if blob.len() < 1024 / 2 {
            assert!(chunks.is_empty());
        } else if blob.len() > 1024 * 2 {
            assert!(chunks.len() >= 2);
        }
    }
}