use std::{
    collections::{HashMap, hash_map},
    io::{self, Cursor},
    pin::pin,
    sync::Arc,
    task::Poll,
};

use data_encoding::HEXLOWER;
use fastcdc::v2020::AsyncStreamCDC;
use futures::{Future, TryStreamExt};
use object_store::{ObjectStore, ObjectStoreScheme, path::Path};
use pin_project_lite::pin_project;
use prost::Message;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tonic::async_trait;
use tracing::{Level, debug, instrument, trace};
use url::Url;

use crate::{
    B3Digest, B3HashingReader,
    composition::{CompositionContext, ServiceBuilder},
    proto::{StatBlobResponse, stat_blob_response::ChunkMeta},
};

use super::{BlobReader, BlobService, BlobWriter, ChunkedReader};

#[cfg(feature = "cloud")]
mod aws;
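
/// Maximum number of chunk uploads to run concurrently while chunking a blob.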
const CONCURRENT_CHUNK_UPLOADS: usize = 64;
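
/// A [BlobService] implementation backed by any [ObjectStore] backend.
///
/// Blobs are stored at `${base_path}/blobs/b3/${shard}/${digest}` as a
/// serialized [StatBlobResponse] listing the more granular chunks, while the
/// chunk payloads themselves live at `${base_path}/chunks/b3/${shard}/${digest}`,
/// zstd-compressed. `${shard}` is a short prefix of the lowerhex-encoded
/// blake3 digest, which keeps directory fan-out manageable on backends such as
/// [object_store::local::LocalFileSystem].
///
/// Configured from URLs of the form `objectstore+<backend-url>`, e.g.
/// `objectstore+memory:///` for the in-memory backend.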
#[derive(Clone)]
pub struct ObjectStoreBlobService {
    instance_name: String,
    object_store: Arc<dyn ObjectStore>,
    base_path: Path,
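
    /// Average chunk size to pass to the FastCDC chunker, in bytes.
    /// The minimum is set to half, the maximum to double of this value.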
    avg_chunk_size: u32,
}
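
/// Derives the path of a blob (its chunk list) inside the object store,
/// sharded by a prefix of the lowerhex-encoded digest.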
#[instrument(level = Level::TRACE, skip_all, fields(base_path=%base_path, blob.digest=%digest), ret(Display))]
fn derive_blob_path(base_path: &Path, digest: &B3Digest) -> Path {
    base_path
        .child("blobs")
        .child("b3")
        .child(HEXLOWER.encode(&digest[..2]))
        .child(HEXLOWER.encode(&digest[..]))
}
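
/// Derives the path of a chunk inside the object store, sharded the same way
/// as blob paths.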
#[instrument(level = Level::TRACE, skip_all, fields(base_path=%base_path, chunk.digest=%digest), ret(Display))]
fn derive_chunk_path(base_path: &Path, digest: &B3Digest) -> Path {
    base_path
        .child("chunks")
        .child("b3")
        .child(HEXLOWER.encode(&digest[..2]))
        .child(HEXLOWER.encode(&digest[..]))
}

#[async_trait]
impl BlobService for ObjectStoreBlobService {
    #[instrument(skip_all, ret(level = Level::TRACE), err, fields(blob.digest=%digest, instance_name=%self.instance_name))]
    async fn has(&self, digest: &B3Digest) -> io::Result<bool> {
        let p = derive_blob_path(&self.base_path, digest);

        match self.object_store.head(&p).await {
            Ok(_) => Ok(true),
            Err(object_store::Error::NotFound { .. }) => {
                let p = derive_chunk_path(&self.base_path, digest);
                match self.object_store.head(&p).await {
                    Ok(_) => Ok(true),
                    Err(object_store::Error::NotFound { .. }) => Ok(false),
                    Err(e) => Err(e)?,
                }
            }
            Err(e) => Err(e)?,
        }
    }

    #[instrument(skip_all, err, fields(blob.digest=%digest, instance_name=%self.instance_name))]
    async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> {
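        // Serve the empty blob directly from memory, without querying the backend.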
        if digest.as_slice() == blake3::hash(b"").as_bytes() {
            return Ok(Some(Box::new(Cursor::new(b"")) as Box<dyn BlobReader>));
        }
        match self
            .object_store
            .get(&derive_chunk_path(&self.base_path, digest))
            .await
        {
            Ok(res) => {
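                // The blob is stored as a single chunk: fetch it entirely,
                // decompress, and verify the digest before serving from memory.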
                let chunk_raw_bytes = res.bytes().await?;
                let chunk_contents = zstd::stream::decode_all(Cursor::new(chunk_raw_bytes))?;

                if *digest != blake3::hash(&chunk_contents).as_bytes().into() {
                    Err(io::Error::other("chunk contents invalid"))?;
                }

                Ok(Some(Box::new(Cursor::new(chunk_contents))))
            }
            Err(object_store::Error::NotFound { .. }) => {
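                // No single chunk found; look up the chunk list and assemble
                // the blob on the fly with a ChunkedReader.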
                if let Some(chunks) = self.chunks(digest).await? {
                    let chunked_reader = ChunkedReader::from_chunks(
                        chunks.into_iter().map(|chunk| {
                            (
                                chunk.digest.try_into().expect("invalid b3 digest"),
                                chunk.size,
                            )
                        }),
                        Arc::new(self.clone()) as Arc<dyn BlobService>,
                    );

                    Ok(Some(Box::new(chunked_reader)))
                } else {
                    Ok(None)
                }
            }
            Err(e) => Err(e.into()),
        }
    }

    #[instrument(skip_all, fields(instance_name=%self.instance_name))]
    async fn open_write(&self) -> Box<dyn BlobWriter> {
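        // The chunking code wants an AsyncRead, while we hand out an
        // AsyncWrite, so connect the two sides with an in-memory duplex pipe.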
        let (w, r) = tokio::io::duplex(self.avg_chunk_size as usize * 10);

        Box::new(ObjectStoreBlobWriter {
            writer: Some(w),
            fut: Some(Box::pin(chunk_and_upload(
                r,
                self.object_store.clone(),
                self.base_path.clone(),
                self.avg_chunk_size / 2,
                self.avg_chunk_size,
                self.avg_chunk_size * 2,
            ))),
            fut_output: None,
        })
    }

    #[instrument(skip_all, err, fields(blob.digest=%digest, instance_name=%self.instance_name))]
    async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> {
        match self
            .object_store
            .get(&derive_blob_path(&self.base_path, digest))
            .await
        {
            Ok(get_result) => {
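                // The blob file contains a serialized StatBlobResponse
                // describing the list of chunks.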
                let blob_data = get_result.bytes().await?;
                let stat_blob_response: StatBlobResponse = StatBlobResponse::decode(blob_data)?;

                debug!(
                    chunk.count = stat_blob_response.chunks.len(),
                    blob.size = stat_blob_response
                        .chunks
                        .iter()
                        .map(|x| x.size)
                        .sum::<u64>(),
                    "found more granular chunks"
                );

                Ok(Some(stat_blob_response.chunks))
            }
            Err(object_store::Error::NotFound { .. }) => {
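                // No blob file; check whether the data exists as a single
                // chunk instead.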
                match self
                    .object_store
                    .head(&derive_chunk_path(&self.base_path, digest))
                    .await
                {
                    Ok(_) => {
                        debug!("found a single chunk");
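                        // A blob stored as a single chunk is reported with an
                        // empty chunk list.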
                        Ok(Some(vec![]))
                    }
                    Err(object_store::Error::NotFound { .. }) => {
                        debug!("not found");
                        Ok(None)
                    }
                    Err(e) => Err(e.into()),
                }
            }
            Err(err) => Err(err.into()),
        }
    }
}
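
/// Default average chunk size (256 KiB), used when the config doesn't
/// specify `avg_chunk_size`.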
fn default_avg_chunk_size() -> u32 {
    256 * 1024
}

#[derive(serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ObjectStoreBlobServiceConfig {
    object_store_url: String,
    #[serde(default = "default_avg_chunk_size")]
    avg_chunk_size: u32,
    object_store_options: HashMap<String, String>,
}

impl TryFrom<url::Url> for ObjectStoreBlobServiceConfig {
    type Error = Box<dyn std::error::Error + Send + Sync>;
    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
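        // Strip the `objectstore+` prefix and drop the query string; the
        // query pairs are passed through as object store options instead.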
        let trimmed_url = {
            let s = url.to_string();
            let mut url = Url::parse(
                s.strip_prefix("objectstore+")
                    .ok_or("Missing objectstore uri")?,
            )?;
            url.set_query(None);
            url
        };
        Ok(ObjectStoreBlobServiceConfig {
            object_store_url: trimmed_url.into(),
            object_store_options: url
                .query_pairs()
                .into_iter()
                .map(|(k, v)| (k.to_string(), v.to_string()))
                .collect(),
            avg_chunk_size: default_avg_chunk_size(),
        })
    }
}

#[async_trait]
impl ServiceBuilder for ObjectStoreBlobServiceConfig {
    type Output = dyn BlobService;
    async fn build<'a>(
        &'a self,
        instance_name: &str,
        _context: &CompositionContext,
    ) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync>> {
        let opts = {
            let mut opts: HashMap<&str, _> = self
                .object_store_options
                .iter()
                .map(|(k, v)| (k.as_str(), v.as_str()))
                .collect();

            if let hash_map::Entry::Vacant(e) =
                opts.entry(object_store::ClientConfigKey::UserAgent.as_ref())
            {
                e.insert(crate::USER_AGENT);
            }

            opts
        };

        let object_store_url: url::Url = self.object_store_url.parse()?;
        let (object_store_scheme, path) =
            object_store::ObjectStoreScheme::parse(&object_store_url)?;

        let (object_store, path) = match object_store_scheme {
            #[cfg(feature = "cloud")]
            ObjectStoreScheme::AmazonS3 => {
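                // The custom AWS setup below only handles plain s3:// URLs.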
                if object_store_url.scheme() != "s3" {
                    return Err(Box::new(std::io::Error::new(
                        std::io::ErrorKind::InvalidInput,
                        "only s3://-style URLs supported",
                    )));
                }

                let store = aws::setup_aws_object_store(&object_store_url, opts).await?;
                (Box::new(store) as Box<dyn ObjectStore>, path)
            }
            _ => object_store::parse_url_opts(&object_store_url, opts)?,
        };

        Ok(Arc::new(ObjectStoreBlobService {
            instance_name: instance_name.to_string(),
            object_store: Arc::new(object_store),
            base_path: path,
            avg_chunk_size: self.avg_chunk_size,
        }))
    }
}
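
/// Reads blob contents from the AsyncRead, chunks them with FastCDC and
/// uploads each chunk, then uploads a blob file pointing to the chunks
/// (unless the blob fits into a single chunk).
/// Returns the blake3 digest of the blob on success.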
#[instrument(skip_all, fields(base_path=%base_path, min_chunk_size, avg_chunk_size, max_chunk_size), err)]
async fn chunk_and_upload<R: AsyncRead + Unpin>(
    r: R,
    object_store: Arc<dyn ObjectStore>,
    base_path: Path,
    min_chunk_size: u32,
    avg_chunk_size: u32,
    max_chunk_size: u32,
) -> io::Result<B3Digest> {
    let mut b3_r = B3HashingReader::from(r);
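    // The B3HashingReader transparently hashes everything the chunker reads,
    // so the blob digest is computed while the data is being chunked.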
    let mut chunker =
        AsyncStreamCDC::new(&mut b3_r, min_chunk_size, avg_chunk_size, max_chunk_size);
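
    // Chunk the stream and upload the chunks concurrently, collecting their
    // ChunkMeta in order.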
    let chunks = chunker
        .as_stream()
        .err_into()
        .map_ok(|chunk_data| {
            let object_store = object_store.clone();
            let chunk_digest: B3Digest = blake3::hash(&chunk_data.data).as_bytes().into();
            let chunk_path = derive_chunk_path(&base_path, &chunk_digest);
            upload_chunk(object_store, chunk_digest, chunk_path, chunk_data.data)
        })
        .try_buffered(CONCURRENT_CHUNK_UPLOADS)
        .try_collect::<Vec<ChunkMeta>>()
        .await?;

    let chunks = if chunks.len() < 2 {
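        // A single chunk means blob and chunk are identical; represent this
        // with an empty chunk list, as the data is reachable via the chunk
        // path directly.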
        vec![]
    } else {
        chunks
    };

    let stat_blob_response = StatBlobResponse {
        chunks,
        bao: "".into(),
    };
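
    // The blob digest is available once the chunker has consumed the reader.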
    let blob_digest: B3Digest = b3_r.digest().into();
    let blob_path = derive_blob_path(&base_path, &blob_digest);

    match object_store.head(&blob_path).await {
        Ok(_) => {
            trace!(
                blob.digest = %blob_digest,
                blob.path = %blob_path,
                "blob already exists on backend"
            );
        }
        Err(object_store::Error::NotFound { .. }) => {
            debug!(
                blob.digest = %blob_digest,
                blob.path = %blob_path,
                "uploading blob"
            );
            object_store
                .put(&blob_path, stat_blob_response.encode_to_vec().into())
                .await?;
        }
        Err(err) => Err(err)?,
    }

    Ok(blob_digest)
}
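
/// Uploads a chunk to the object store if it doesn't exist yet, zstd-compressed.
/// Returns the [ChunkMeta] with the digest and the *uncompressed* size.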
#[instrument(skip_all, fields(chunk.digest = %chunk_digest, chunk.size = chunk_data.len(), chunk.path = %chunk_path), err)]
async fn upload_chunk(
    object_store: Arc<dyn ObjectStore>,
    chunk_digest: B3Digest,
    chunk_path: Path,
    chunk_data: Vec<u8>,
) -> std::io::Result<ChunkMeta> {
    let chunk_size = chunk_data.len();
    match object_store.head(&chunk_path).await {
        Ok(_) => {
            debug!("chunk already exists");
        }

        Err(object_store::Error::NotFound { .. }) => {
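            // Chunks are stored zstd-compressed, so compress before uploading.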
            let chunk_data_compressed =
                zstd::encode_all(Cursor::new(chunk_data), zstd::DEFAULT_COMPRESSION_LEVEL)?;

            debug!(chunk.compressed_size = %chunk_data_compressed.len(), "uploading chunk");

            object_store
                .as_ref()
                .put(&chunk_path, chunk_data_compressed.into())
                .await?;
        }
        Err(err) => Err(err)?,
    }

    Ok(ChunkMeta {
        digest: chunk_digest.into(),
        size: chunk_size as u64,
    })
}

pin_project! {
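    /// Handles blob uploads through a duplex pipe: writes are relayed to
    /// `writer`, while `fut` (the chunk_and_upload future) reads from the
    /// other side of the pipe. Close drops the writer, so the future sees
    /// EOF and can finish.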
    pub struct ObjectStoreBlobWriter<W, Fut>
    where
        W: AsyncWrite,
        Fut: Future,
    {
        #[pin]
        writer: Option<W>,

        #[pin]
        fut: Option<Fut>,

        fut_output: Option<io::Result<B3Digest>>
    }
}

impl<W, Fut> tokio::io::AsyncWrite for ObjectStoreBlobWriter<W, Fut>
where
    W: AsyncWrite + Send + Unpin,
    Fut: Future,
{
    fn poll_write(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &[u8],
    ) -> std::task::Poll<Result<usize, io::Error>> {
        let this = self.project();
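        // Poll the upload future, so chunking/uploading makes progress while
        // data is being written.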
        let fut = this.fut.as_pin_mut().expect("fut must be some");
        let fut_p = fut.poll(cx);
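        // If the future is ready this early, the upload must have failed:
        // it can only complete normally after the writer has been dropped.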
        if fut_p.is_ready() {
            return Poll::Ready(Err(io::Error::other("upload failed")));
        }

        this.writer
            .as_pin_mut()
            .expect("writer must be some")
            .poll_write(cx, buf)
    }

    fn poll_flush(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), io::Error>> {
        let this = self.project();
        let fut = this.fut.as_pin_mut().expect("fut must be some");
        let fut_p = fut.poll(cx);
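        // Same reasoning as in poll_write: a completed future here means the
        // upload failed.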
        if fut_p.is_ready() {
            return Poll::Ready(Err(io::Error::other("upload failed")));
        }

        this.writer
            .as_pin_mut()
            .expect("writer must be some")
            .poll_flush(cx)
    }

    fn poll_shutdown(
        self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), io::Error>> {
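        // Nothing to do here; the actual shutdown happens in
        // BlobWriter::close, which drops the writer so the upload future can
        // complete.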
        std::task::Poll::Ready(Ok(()))
    }
}

#[async_trait]
impl<W, Fut> BlobWriter for ObjectStoreBlobWriter<W, Fut>
where
    W: AsyncWrite + Send + Unpin,
    Fut: Future<Output = io::Result<B3Digest>> + Send + Unpin,
{
    async fn close(&mut self) -> io::Result<B3Digest> {
        match self.writer.take() {
            Some(mut writer) => {
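                // Shut down the writer; the resulting EOF lets the chunker
                // and the upload future run to completion.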
                writer.shutdown().await?;

                let fut = self.fut.take().expect("fut must be some");
                let resp = pin!(fut).await;

                match resp.as_ref() {
                    Ok(b3_digest) => {
                        self.fut_output = Some(Ok(b3_digest.clone()));
                    }
                    Err(e) => {
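                        // io::Error isn't Clone, so rebuild an equivalent
                        // error to store for later close calls.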
                        self.fut_output = Some(Err(std::io::Error::new(e.kind(), e.to_string())))
                    }
                }
                resp
            }
            None => {
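                // Already closed before: return the stored digest (or a copy
                // of the stored error).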
                match self.fut_output.as_ref().unwrap() {
                    Ok(b3_digest) => Ok(b3_digest.clone()),
                    Err(e) => Err(std::io::Error::new(e.kind(), e.to_string())),
                }
            }
        }
    }
}

#[cfg(test)]
mod test {
    use super::{chunk_and_upload, default_avg_chunk_size};
    use crate::{
        blobservice::{BlobService, ObjectStoreBlobService},
        fixtures::{BLOB_A, BLOB_A_DIGEST, BLOB_B, BLOB_B_DIGEST},
    };
    use std::{io::Cursor, sync::Arc};
    use url::Url;
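
    /// Exercises chunk_and_upload directly, bypassing the BlobWriter returned
    /// by open_write.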
    #[rstest::rstest]
    #[case::a(&BLOB_A, &BLOB_A_DIGEST)]
    #[case::b(&BLOB_B, &BLOB_B_DIGEST)]
    #[tokio::test]
    async fn test_chunk_and_upload(
        #[case] blob: &bytes::Bytes,
        #[case] blob_digest: &crate::B3Digest,
    ) {
        let (object_store, base_path) =
            object_store::parse_url(&Url::parse("memory:///").unwrap()).unwrap();
        let object_store: Arc<dyn object_store::ObjectStore> = Arc::from(object_store);
        let blobsvc = Arc::new(ObjectStoreBlobService {
            instance_name: "test".into(),
            object_store: object_store.clone(),
            avg_chunk_size: default_avg_chunk_size(),
            base_path,
        });
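
        // Use small chunk sizes (512/1024/2048 bytes) so larger fixtures are
        // split into multiple chunks.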
        let inserted_blob_digest = chunk_and_upload(
            &mut Cursor::new(blob.to_vec()),
            object_store,
            object_store::path::Path::from("/"),
            1024 / 2,
            1024,
            1024 * 2,
        )
        .await
        .expect("chunk_and_upload succeeds");

        assert_eq!(blob_digest.clone(), inserted_blob_digest);

        assert!(blobsvc.has(blob_digest).await.unwrap());

        let chunks = blobsvc.chunks(blob_digest).await.unwrap().unwrap();
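        // Blobs smaller than the min chunk size are stored as a single chunk
        // and report an empty chunk list; sufficiently large blobs should
        // have been split into multiple chunks.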
        if blob.len() < 1024 / 2 {
            assert!(chunks.is_empty());
        } else if blob.len() > 1024 * 2 {
            assert!(chunks.len() >= 2);
        }
    }
}