use std::{
    collections::{HashMap, hash_map},
    io::{self, Cursor},
    pin::pin,
    sync::Arc,
    task::Poll,
};

use data_encoding::HEXLOWER;
use fastcdc::v2020::AsyncStreamCDC;
use futures::Future;
use object_store::{ObjectStore, path::Path};
use pin_project_lite::pin_project;
use prost::Message;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio_stream::StreamExt;
use tonic::async_trait;
use tracing::{Level, debug, instrument, trace};
use url::Url;

use crate::{
    B3Digest, B3HashingReader, Error,
    composition::{CompositionContext, ServiceBuilder},
    proto::{StatBlobResponse, stat_blob_response::ChunkMeta},
};

use super::{BlobReader, BlobService, BlobWriter, ChunkedReader};

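/// A [BlobService] backed by an [ObjectStore], using content-defined chunking
/// (FastCDC) for blob data.
///
/// Chunk contents are stored zstd-compressed at
/// `${base_path}/chunks/b3/$digest_prefix/$digest`, and for each blob a
/// [StatBlobResponse] protobuf message listing its chunks is stored at
/// `${base_path}/blobs/b3/$digest_prefix/$digest` (the list is empty for
/// blobs small enough to be stored as a single chunk).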
#[derive(Clone)]
pub struct ObjectStoreBlobService {
    instance_name: String,
    object_store: Arc<dyn ObjectStore>,
    base_path: Path,

    /// Average chunk size to pass to the FastCDC chunker; the minimum and
    /// maximum chunk sizes are derived from it (half and double, see
    /// `open_write`).
    avg_chunk_size: u32,
}

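/// Derives the path inside the object store at which the chunk list
/// ([StatBlobResponse]) for a blob is stored:
/// `${base_path}/blobs/b3/$digest_prefix/$digest`, where `$digest_prefix` is
/// the lower-hex encoding of the first two bytes of the digest and `$digest`
/// the lower-hex encoding of the full digest.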
#[instrument(level = Level::TRACE, skip_all, fields(base_path=%base_path, blob.digest=%digest), ret(Display))]
fn derive_blob_path(base_path: &Path, digest: &B3Digest) -> Path {
    base_path
        .child("blobs")
        .child("b3")
        .child(HEXLOWER.encode(&digest[..2]))
        .child(HEXLOWER.encode(&digest[..]))
}

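/// Derives the path inside the object store at which the (zstd-compressed)
/// contents of a chunk are stored:
/// `${base_path}/chunks/b3/$digest_prefix/$digest`, using the same encoding
/// as [derive_blob_path].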
#[instrument(level = Level::TRACE, skip_all, fields(base_path=%base_path, chunk.digest=%digest), ret(Display))]
fn derive_chunk_path(base_path: &Path, digest: &B3Digest) -> Path {
    base_path
        .child("chunks")
        .child("b3")
        .child(HEXLOWER.encode(&digest[..2]))
        .child(HEXLOWER.encode(&digest[..]))
}

#[async_trait]
impl BlobService for ObjectStoreBlobService {
    #[instrument(skip_all, ret(level = Level::TRACE), err, fields(blob.digest=%digest, instance_name=%self.instance_name))]
    async fn has(&self, digest: &B3Digest) -> io::Result<bool> {
        // Check for the chunk list at the blob path first.
        let p = derive_blob_path(&self.base_path, digest);

        match self.object_store.head(&p).await {
            Ok(_) => Ok(true),
            Err(object_store::Error::NotFound { .. }) => {
                // No chunk list; check whether the blob exists as a single
                // chunk with the same digest.
                let p = derive_chunk_path(&self.base_path, digest);
                match self.object_store.head(&p).await {
                    Ok(_) => Ok(true),
                    Err(object_store::Error::NotFound { .. }) => Ok(false),
                    Err(e) => Err(e)?,
                }
            }
            Err(e) => Err(e)?,
        }
    }

    #[instrument(skip_all, err, fields(blob.digest=%digest, instance_name=%self.instance_name))]
    async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> {
        // Handle the empty blob without hitting the backend.
        if digest.as_slice() == blake3::hash(b"").as_bytes() {
            return Ok(Some(Box::new(Cursor::new(b"")) as Box<dyn BlobReader>));
        }

        // Try to serve the blob as a single chunk first.
        match self
            .object_store
            .get(&derive_chunk_path(&self.base_path, digest))
            .await
        {
            Ok(res) => {
                // Fetch the compressed chunk, decompress and validate it.
                let chunk_raw_bytes = res.bytes().await?;
                let chunk_contents = zstd::stream::decode_all(Cursor::new(chunk_raw_bytes))?;

                if *digest != blake3::hash(&chunk_contents).as_bytes().into() {
                    Err(io::Error::other("chunk contents invalid"))?;
                }

                Ok(Some(Box::new(Cursor::new(chunk_contents))))
            }
            Err(object_store::Error::NotFound { .. }) => {
                // There's no single chunk for this blob; look up its chunk
                // list and serve it through a ChunkedReader.
                if let Some(chunks) = self.chunks(digest).await? {
                    let chunked_reader = ChunkedReader::from_chunks(
                        chunks.into_iter().map(|chunk| {
                            (
                                chunk.digest.try_into().expect("invalid b3 digest"),
                                chunk.size,
                            )
                        }),
                        Arc::new(self.clone()) as Arc<dyn BlobService>,
                    );

                    Ok(Some(Box::new(chunked_reader)))
                } else {
                    Ok(None)
                }
            }
            Err(e) => Err(e.into()),
        }
    }

    #[instrument(skip_all, fields(instance_name=%self.instance_name))]
    async fn open_write(&self) -> Box<dyn BlobWriter> {
        // Set up a duplex pipe: the writer half is handed out to the caller,
        // while chunk_and_upload consumes the reader half, chunking and
        // uploading as data comes in.
        let (w, r) = tokio::io::duplex(self.avg_chunk_size as usize * 10);

        Box::new(ObjectStoreBlobWriter {
            writer: Some(w),
            fut: Some(Box::pin(chunk_and_upload(
                r,
                self.object_store.clone(),
                self.base_path.clone(),
                self.avg_chunk_size / 2,
                self.avg_chunk_size,
                self.avg_chunk_size * 2,
            ))),
            fut_output: None,
        })
    }

    #[instrument(skip_all, err, fields(blob.digest=%digest, instance_name=%self.instance_name))]
    async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> {
        match self
            .object_store
            .get(&derive_blob_path(&self.base_path, digest))
            .await
        {
            Ok(get_result) => {
                // The blob path exists: decode the StatBlobResponse listing
                // the individual chunks.
                let blob_data = get_result.bytes().await?;
                let stat_blob_response: StatBlobResponse = StatBlobResponse::decode(blob_data)?;

                debug!(
                    chunk.count = stat_blob_response.chunks.len(),
                    blob.size = stat_blob_response
                        .chunks
                        .iter()
                        .map(|x| x.size)
                        .sum::<u64>(),
                    "found more granular chunks"
                );

                Ok(Some(stat_blob_response.chunks))
            }
            Err(object_store::Error::NotFound { .. }) => {
                // No chunk list; check whether the blob exists as a single
                // chunk with the same digest.
                match self
                    .object_store
                    .head(&derive_chunk_path(&self.base_path, digest))
                    .await
                {
                    Ok(_) => {
                        debug!("found a single chunk");
                        // The blob exists, but has no more granular chunks.
                        Ok(Some(vec![]))
                    }
                    Err(object_store::Error::NotFound { .. }) => {
                        debug!("not found");
                        Ok(None)
                    }
                    Err(e) => Err(e.into()),
                }
            }
            Err(err) => Err(err.into()),
        }
    }
}

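/// Default average chunk size used if none is configured: 256 KiB.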
fn default_avg_chunk_size() -> u32 {
    256 * 1024
}

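/// Configuration for [ObjectStoreBlobService].
///
/// Can be deserialized from a composition config, or constructed from an
/// `objectstore+*` URL: everything after the `objectstore+` prefix is handed
/// to [object_store::parse_url_opts], with query parameters passed along as
/// object store options. For example, `objectstore+memory:///` selects the
/// in-memory backend used in the tests below; which other schemes are
/// available depends on the object_store features enabled at build time.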
#[derive(serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ObjectStoreBlobServiceConfig {
    object_store_url: String,
    #[serde(default = "default_avg_chunk_size")]
    avg_chunk_size: u32,
    object_store_options: HashMap<String, String>,
}

impl TryFrom<url::Url> for ObjectStoreBlobServiceConfig {
    type Error = Box<dyn std::error::Error + Send + Sync>;
    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
        // Strip the `objectstore+` prefix and drop the query string; the
        // query pairs are kept separately as object store options.
        let trimmed_url = {
            let s = url.to_string();
            let mut url = Url::parse(
                s.strip_prefix("objectstore+")
                    .ok_or(Error::StorageError("Missing objectstore uri".into()))?,
            )?;
            url.set_query(None);
            url
        };
        Ok(ObjectStoreBlobServiceConfig {
            object_store_url: trimmed_url.into(),
            object_store_options: url
                .query_pairs()
                .into_iter()
                .map(|(k, v)| (k.to_string(), v.to_string()))
                .collect(),
            avg_chunk_size: default_avg_chunk_size(),
        })
    }
}

#[async_trait]
impl ServiceBuilder for ObjectStoreBlobServiceConfig {
    type Output = dyn BlobService;
    async fn build<'a>(
        &'a self,
        instance_name: &str,
        _context: &CompositionContext,
    ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
        let opts = {
            let mut opts: HashMap<&str, _> = self
                .object_store_options
                .iter()
                .map(|(k, v)| (k.as_str(), v.as_str()))
                .collect();

            // Set a default user agent unless one was configured explicitly.
            if let hash_map::Entry::Vacant(e) =
                opts.entry(object_store::ClientConfigKey::UserAgent.as_ref())
            {
                e.insert(crate::USER_AGENT);
            }

            opts
        };

        let (object_store, path) =
            object_store::parse_url_opts(&self.object_store_url.parse()?, opts)?;
        Ok(Arc::new(ObjectStoreBlobService {
            instance_name: instance_name.to_string(),
            object_store: Arc::new(object_store),
            base_path: path,
            avg_chunk_size: self.avg_chunk_size,
        }))
    }
}

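/// Reads blob contents from `r`, chunks them using FastCDC with the given
/// chunk size parameters, and uploads each chunk (zstd-compressed) to the
/// object store. A [StatBlobResponse] listing the chunks (empty if the data
/// fits into a single chunk) is uploaded to the blob path unless it already
/// exists. Returns the blake3 digest of the whole blob.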
#[instrument(skip_all, fields(base_path=%base_path, min_chunk_size, avg_chunk_size, max_chunk_size), err)]
async fn chunk_and_upload<R: AsyncRead + Unpin>(
    r: R,
    object_store: Arc<dyn ObjectStore>,
    base_path: Path,
    min_chunk_size: u32,
    avg_chunk_size: u32,
    max_chunk_size: u32,
) -> io::Result<B3Digest> {
    // Wrap the reader in a B3HashingReader, so the blob digest is calculated
    // while the data is being chunked.
    let mut b3_r = B3HashingReader::from(r);
    let mut chunker =
        AsyncStreamCDC::new(&mut b3_r, min_chunk_size, avg_chunk_size, max_chunk_size);

    // Uploads a single chunk produced by the chunker, returning its ChunkMeta.
    async fn fastcdc_chunk_uploader(
        resp: Result<fastcdc::v2020::ChunkData, fastcdc::v2020::Error>,
        base_path: Path,
        object_store: Arc<dyn ObjectStore>,
    ) -> std::io::Result<ChunkMeta> {
        let chunk_data = resp?;
        let chunk_digest: B3Digest = blake3::hash(&chunk_data.data).as_bytes().into();
        let chunk_path = derive_chunk_path(&base_path, &chunk_digest);

        upload_chunk(object_store, chunk_digest, chunk_path, chunk_data.data).await
    }

    // Chunk the incoming data and upload each chunk, collecting the ChunkMeta
    // of all chunks in order.
    let chunks = chunker
        .as_stream()
        .then(|resp| fastcdc_chunk_uploader(resp, base_path.clone(), object_store.clone()))
        .collect::<io::Result<Vec<ChunkMeta>>>()
        .await?;

    // If the blob fits into a single chunk, it's retrievable as that chunk
    // directly (the chunk digest equals the blob digest), so record an empty
    // chunk list.
    let chunks = if chunks.len() < 2 {
        vec![]
    } else {
        chunks
    };

    let stat_blob_response = StatBlobResponse {
        chunks,
        bao: "".into(),
    };

    // The chunker consumed the entire reader, so the hashing reader now
    // yields the digest of the whole blob.
    let blob_digest: B3Digest = b3_r.digest().into();
    let blob_path = derive_blob_path(&base_path, &blob_digest);

    // Only upload the chunk list if it doesn't exist on the backend yet.
    match object_store.head(&blob_path).await {
        Ok(_) => {
            trace!(
                blob.digest = %blob_digest,
                blob.path = %blob_path,
                "blob already exists on backend"
            );
        }
        Err(object_store::Error::NotFound { .. }) => {
            debug!(
                blob.digest = %blob_digest,
                blob.path = %blob_path,
                "uploading blob"
            );
            object_store
                .put(&blob_path, stat_blob_response.encode_to_vec().into())
                .await?;
        }
        Err(err) => {
            // Propagate any other error.
            Err(err)?
        }
    }

    Ok(blob_digest)
}

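/// Uploads a single chunk to `chunk_path` in the object store, compressing it
/// with zstd, unless an object already exists at that path. Returns the
/// [ChunkMeta] (digest and uncompressed size) of the chunk.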
#[instrument(skip_all, fields(chunk.digest = %chunk_digest, chunk.size = chunk_data.len(), chunk.path = %chunk_path), err)]
async fn upload_chunk(
    object_store: Arc<dyn ObjectStore>,
    chunk_digest: B3Digest,
    chunk_path: Path,
    chunk_data: Vec<u8>,
) -> std::io::Result<ChunkMeta> {
    let chunk_size = chunk_data.len();
    match object_store.head(&chunk_path).await {
        // The chunk already exists, nothing to upload.
        Ok(_) => {
            debug!("chunk already exists");
        }

        Err(object_store::Error::NotFound { .. }) => {
            // Compress the chunk with zstd before uploading it.
            let chunk_data_compressed =
                zstd::encode_all(Cursor::new(chunk_data), zstd::DEFAULT_COMPRESSION_LEVEL)?;

            debug!(chunk.compressed_size = %chunk_data_compressed.len(), "uploading chunk");

            object_store
                .as_ref()
                .put(&chunk_path, chunk_data_compressed.into())
                .await?;
        }
        Err(err) => Err(err)?,
    }

    Ok(ChunkMeta {
        digest: chunk_digest.into(),
        size: chunk_size as u64,
    })
}

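// Writer returned by ObjectStoreBlobService::open_write.
//
// All writes go to the writing half of a duplex pipe, while `fut`
// (chunk_and_upload) reads from the other half and uploads chunks as data
// arrives. The future is polled on every write so the upload can make
// progress; closing the writer makes the future see EOF, after which awaiting
// it yields the blob digest.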
pin_project! {
    pub struct ObjectStoreBlobWriter<W, Fut>
    where
        W: AsyncWrite,
        Fut: Future,
    {
        #[pin]
        writer: Option<W>,

        #[pin]
        fut: Option<Fut>,

        fut_output: Option<io::Result<B3Digest>>
    }
}

impl<W, Fut> tokio::io::AsyncWrite for ObjectStoreBlobWriter<W, Fut>
where
    W: AsyncWrite + Send + Unpin,
    Fut: Future,
{
    fn poll_write(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &[u8],
    ) -> std::task::Poll<Result<usize, io::Error>> {
        let this = self.project();
        // Poll the upload future so it can make progress while we write.
        let fut = this.fut.as_pin_mut().expect("fut must be some");
        let fut_p = fut.poll(cx);
        // The writer hasn't been closed yet, so the future resolving here
        // means the upload ended prematurely and failed.
        if fut_p.is_ready() {
            return Poll::Ready(Err(io::Error::other("upload failed")));
        }

        // Write into the pipe feeding the chunker.
        this.writer
            .as_pin_mut()
            .expect("writer must be some")
            .poll_write(cx, buf)
    }

    fn poll_flush(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), io::Error>> {
        let this = self.project();
        // Poll the upload future, same as in poll_write.
        let fut = this.fut.as_pin_mut().expect("fut must be some");
        let fut_p = fut.poll(cx);
        if fut_p.is_ready() {
            return Poll::Ready(Err(io::Error::other("upload failed")));
        }

        this.writer
            .as_pin_mut()
            .expect("writer must be some")
            .poll_flush(cx)
    }

    fn poll_shutdown(
        self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), io::Error>> {
        // Shutdown is a no-op here; the upload is finalized in
        // BlobWriter::close, which drops the writer and awaits the future.
        std::task::Poll::Ready(Ok(()))
    }
}

#[async_trait]
impl<W, Fut> BlobWriter for ObjectStoreBlobWriter<W, Fut>
where
    W: AsyncWrite + Send + Unpin,
    Fut: Future<Output = io::Result<B3Digest>> + Send + Unpin,
{
    async fn close(&mut self) -> io::Result<B3Digest> {
        match self.writer.take() {
            Some(mut writer) => {
                // Shut down the writer, so the other end of the pipe sees EOF
                // and the upload future can finish.
                writer.shutdown().await?;

                // Await the upload future to obtain the blob digest.
                let fut = self.fut.take().expect("fut must be some");
                let resp = pin!(fut).await;

                // Remember the result, so subsequent calls to close() return
                // it again.
                match resp.as_ref() {
                    Ok(b3_digest) => {
                        self.fut_output = Some(Ok(b3_digest.clone()));
                    }
                    Err(e) => {
                        // io::Error doesn't implement Clone, so build an
                        // equivalent error from its kind and message.
                        self.fut_output = Some(Err(std::io::Error::new(e.kind(), e.to_string())))
                    }
                }
                resp
            }
            None => {
                // The writer was already closed; return the stored result.
                match self.fut_output.as_ref().unwrap() {
                    Ok(b3_digest) => Ok(b3_digest.clone()),
                    Err(e) => Err(std::io::Error::new(e.kind(), e.to_string())),
                }
            }
        }
    }
}

#[cfg(test)]
mod test {
    use super::{chunk_and_upload, default_avg_chunk_size};
    use crate::{
        blobservice::{BlobService, ObjectStoreBlobService},
        fixtures::{BLOB_A, BLOB_A_DIGEST, BLOB_B, BLOB_B_DIGEST},
    };
    use std::{io::Cursor, sync::Arc};
    use url::Url;

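    /// Runs [chunk_and_upload] against an in-memory object store, then checks
    /// the blob can be served by an [ObjectStoreBlobService] on top of the
    /// same store.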
    #[rstest::rstest]
    #[case::a(&BLOB_A, &BLOB_A_DIGEST)]
    #[case::b(&BLOB_B, &BLOB_B_DIGEST)]
    #[tokio::test]
    async fn test_chunk_and_upload(
        #[case] blob: &bytes::Bytes,
        #[case] blob_digest: &crate::B3Digest,
    ) {
        let (object_store, base_path) =
            object_store::parse_url(&Url::parse("memory:///").unwrap()).unwrap();
        let object_store: Arc<dyn object_store::ObjectStore> = Arc::from(object_store);
        let blobsvc = Arc::new(ObjectStoreBlobService {
            instance_name: "test".into(),
            object_store: object_store.clone(),
            avg_chunk_size: default_avg_chunk_size(),
            base_path,
        });

        let inserted_blob_digest = chunk_and_upload(
            &mut Cursor::new(blob.to_vec()),
            object_store,
            object_store::path::Path::from("/"),
            1024 / 2,
            1024,
            1024 * 2,
        )
        .await
        .expect("chunk_and_upload succeeds");

        assert_eq!(blob_digest.clone(), inserted_blob_digest);

        // The blob must now be present in the blob service.
        assert!(blobsvc.has(blob_digest).await.unwrap());

        // The chunk list must exist, too. Blobs smaller than the minimum
        // chunk size are stored as a single chunk (empty chunk list), while
        // blobs larger than the maximum chunk size must have been split into
        // at least two chunks.
        let chunks = blobsvc.chunks(blob_digest).await.unwrap().unwrap();
        if blob.len() < 1024 / 2 {
            assert!(chunks.is_empty());
        } else if blob.len() > 1024 * 2 {
            assert!(chunks.len() >= 2);
        }
    }
}