snix_castore/blobservice/object_store/
mod.rs

1use std::{
2    collections::{HashMap, hash_map},
3    io::{self, Cursor},
4    pin::pin,
5    sync::Arc,
6    task::Poll,
7};
8
9use data_encoding::HEXLOWER;
10use fastcdc::v2020::AsyncStreamCDC;
11use futures::{Future, TryStreamExt};
12use object_store::{ObjectStore, ObjectStoreExt, ObjectStoreScheme, path::Path};
13use pin_project_lite::pin_project;
14use prost::Message;
15use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
16use tonic::async_trait;
17use tracing::{Level, debug, instrument, trace};
18use url::Url;
19
20use crate::{
21    B3Digest, B3HashingReader,
22    composition::{CompositionContext, ServiceBuilder},
23    proto::{StatBlobResponse, stat_blob_response::ChunkMeta},
24};
25
26use super::{BlobReader, BlobService, BlobWriter, ChunkedReader};
27
// AWS-specific object store construction (custom credential-chain handling),
// only compiled in with the "cloud" feature.
#[cfg(feature = "cloud")]
mod aws;

/// The number of chunks that will be uploaded in parallel, per blob.
const CONCURRENT_CHUNK_UPLOADS: usize = 64;
33
34/// Uses any object storage supported by the [object_store] crate to provide a
35/// snix-castore [BlobService].
36///
37/// # Data format
38/// Data is organized in "blobs" and "chunks".
39/// Blobs don't hold the actual data, but instead contain a list of more
40/// granular chunks that assemble to the contents requested.
41/// This allows clients to seek, and not download chunks they already have
42/// locally, as it's referred to from other files.
43/// Check `rpc_blobstore` and more general BlobStore docs on that.
44///
45/// ## Blobs
46/// Stored at `${base_path}/blobs/b3/$digest_key`. They contains the serialized
47/// StatBlobResponse for the blob with the digest.
48///
49/// ## Chunks
50/// Chunks are stored at `${base_path}/chunks/b3/$digest_key`. They contain
51/// the literal contents of the chunk, but are zstd-compressed.
52///
/// ## Digest key sharding
/// The blake3 digest is encoded in lower hex, and sharded by its first two
/// bytes (i.e. the first four characters of the hex encoding).
/// The blob for "Hello World" is stored at
/// `${base_path}/blobs/b3/41f8/41f8394111eb713a22165c46c90ab8f0fd9399c92028fd6d288944b23ff5bf76`.
58///
59/// This reduces the number of files in the same directory, which would be a
60/// problem at least when using [object_store::local::LocalFileSystem].
61///
62/// # Future changes
63/// There's no guarantees about this being a final format yet.
64/// Once object_store gets support for additional metadata / content-types,
65/// we can eliminate some requests (small blobs only consisting of a single
66/// chunk can be stored as-is, without the blob index file).
67/// It also allows signalling any compression of chunks in the content-type.
68/// Migration *should* be possible by simply adding the right content-types to
69/// all keys stored so far, but no promises ;-)
#[derive(Clone)]
pub struct ObjectStoreBlobService {
    /// Name of this instance, included in tracing span fields.
    instance_name: String,
    /// The object store backend all blobs and chunks are read from / written to.
    object_store: Arc<dyn ObjectStore>,
    /// Prefix inside the object store under which `blobs/` and `chunks/` live.
    base_path: Path,

    /// Average chunk size for FastCDC, in bytes.
    /// min value is half, max value double of that number.
    avg_chunk_size: u32,
}
80
81#[instrument(level=Level::TRACE, skip_all,fields(base_path=%base_path,blob.digest=%digest),ret(Display))]
82fn derive_blob_path(base_path: &Path, digest: &B3Digest) -> Path {
83    base_path
84        .child("blobs")
85        .child("b3")
86        .child(HEXLOWER.encode(&digest[..2]))
87        .child(HEXLOWER.encode(&digest[..]))
88}
89
90#[instrument(level=Level::TRACE, skip_all,fields(base_path=%base_path,chunk.digest=%digest),ret(Display))]
91fn derive_chunk_path(base_path: &Path, digest: &B3Digest) -> Path {
92    base_path
93        .child("chunks")
94        .child("b3")
95        .child(HEXLOWER.encode(&digest[..2]))
96        .child(HEXLOWER.encode(&digest[..]))
97}
98
#[async_trait]
impl BlobService for ObjectStoreBlobService {
    /// Returns true if either a blob (chunk index) or a standalone chunk
    /// exists at the path derived from this digest.
    #[instrument(skip_all, ret(level = Level::TRACE), err, fields(blob.digest=%digest, instance_name=%self.instance_name))]
    async fn has(&self, digest: &B3Digest) -> io::Result<bool> {
        // TODO: clarify if this should work for chunks or not, and explicitly
        // document in the proto docs.
        let p = derive_blob_path(&self.base_path, digest);

        // Check the blob path first, and fall back to the chunk path on
        // NotFound. Any other backend error is propagated.
        match self.object_store.head(&p).await {
            Ok(_) => Ok(true),
            Err(object_store::Error::NotFound { .. }) => {
                let p = derive_chunk_path(&self.base_path, digest);
                match self.object_store.head(&p).await {
                    Ok(_) => Ok(true),
                    Err(object_store::Error::NotFound { .. }) => Ok(false),
                    Err(e) => Err(e)?,
                }
            }
            Err(e) => Err(e)?,
        }
    }

    /// Opens a reader for the blob with the given digest.
    /// Returns Ok(None) if neither a single chunk nor a blob (chunk index)
    /// with that digest exists.
    #[instrument(skip_all, err, fields(blob.digest=%digest, instance_name=%self.instance_name))]
    async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> {
        // handle reading the empty blob.
        if digest.as_slice() == blake3::hash(b"").as_bytes() {
            return Ok(Some(Box::new(Cursor::new(b"")) as Box<dyn BlobReader>));
        }
        match self
            .object_store
            .get(&derive_chunk_path(&self.base_path, digest))
            .await
        {
            Ok(res) => {
                // handle reading blobs that are small enough to fit inside a single chunk:
                // fetch the entire chunk into memory, decompress, ensure the b3 digest matches,
                // and return a io::Cursor over that data.
                // FUTUREWORK: use zstd::bulk to prevent decompression bombs

                let chunk_raw_bytes = res.bytes().await?;
                let chunk_contents = zstd::stream::decode_all(Cursor::new(chunk_raw_bytes))?;

                // Verify the decompressed contents actually hash to the
                // requested digest before handing them out.
                if *digest != blake3::hash(&chunk_contents).as_bytes().into() {
                    Err(io::Error::other("chunk contents invalid"))?;
                }

                Ok(Some(Box::new(Cursor::new(chunk_contents))))
            }
            Err(object_store::Error::NotFound { .. }) => {
                // NOTE: For public-facing things, we would want to stop here.
                // Clients should fetch granularly, so they can make use of
                // chunks they have locally.
                // However, if this is used directly, without any caches, do the
                // assembly here.
                // This is subject to change, once we have store composition.
                // TODO: make this configurable, and/or clarify behaviour for
                // the gRPC server surface (explicitly document behaviour in the
                // proto docs)
                if let Some(chunks) = self.chunks(digest).await? {
                    // Assemble a reader over all chunks listed in the blob's
                    // chunk index, fetching each chunk through ourselves.
                    let chunked_reader = ChunkedReader::from_chunks(
                        chunks.into_iter().map(|chunk| {
                            (
                                chunk.digest.try_into().expect("invalid b3 digest"),
                                chunk.size,
                            )
                        }),
                        Arc::new(self.clone()) as Arc<dyn BlobService>,
                    );

                    Ok(Some(Box::new(chunked_reader)))
                } else {
                    // This is neither a chunk nor a blob, return None.
                    Ok(None)
                }
            }
            Err(e) => Err(e.into()),
        }
    }

    /// Returns a [BlobWriter]; data written to it is chunked and uploaded in
    /// the background, and close() returns the blob digest.
    #[instrument(skip_all, fields(instance_name=%self.instance_name))]
    async fn open_write(&self) -> Box<dyn BlobWriter> {
        // ObjectStoreBlobWriter implements AsyncWrite, but all the chunking
        // needs an AsyncRead, so we create a pipe here.
        // In its `AsyncWrite` implementation, `ObjectStoreBlobWriter` delegates
        // writes to w. It periodically polls the future that's reading from the
        // other side.
        let (w, r) = tokio::io::duplex(self.avg_chunk_size as usize * 10);

        Box::new(ObjectStoreBlobWriter {
            writer: Some(w),
            fut: Some(Box::pin(chunk_and_upload(
                r,
                self.object_store.clone(),
                self.base_path.clone(),
                // FastCDC min/avg/max chunk sizes derived from avg_chunk_size.
                self.avg_chunk_size / 2,
                self.avg_chunk_size,
                self.avg_chunk_size * 2,
            ))),
            fut_output: None,
        })
    }

    /// Returns the list of more granular chunks for a blob, Some(vec![]) if
    /// the blob exists only as a single chunk, or None if it's unknown.
    #[instrument(skip_all, err, fields(blob.digest=%digest, instance_name=%self.instance_name))]
    async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> {
        match self
            .object_store
            .get(&derive_blob_path(&self.base_path, digest))
            .await
        {
            Ok(get_result) => {
                // fetch the data at the blob path
                let blob_data = get_result.bytes().await?;
                // parse into StatBlobResponse
                let stat_blob_response: StatBlobResponse = StatBlobResponse::decode(blob_data)?;

                debug!(
                    chunk.count = stat_blob_response.chunks.len(),
                    blob.size = stat_blob_response
                        .chunks
                        .iter()
                        .map(|x| x.size)
                        .sum::<u64>(),
                    "found more granular chunks"
                );

                Ok(Some(stat_blob_response.chunks))
            }
            Err(object_store::Error::NotFound { .. }) => {
                // If there's only a chunk, we must return the empty vec here, rather than None.
                match self
                    .object_store
                    .head(&derive_chunk_path(&self.base_path, digest))
                    .await
                {
                    Ok(_) => {
                        // present, but no more chunks available
                        debug!("found a single chunk");
                        Ok(Some(vec![]))
                    }
                    Err(object_store::Error::NotFound { .. }) => {
                        // Neither blob nor single chunk found
                        debug!("not found");
                        Ok(None)
                    }
                    // error checking for chunk
                    Err(e) => Err(e.into()),
                }
            }
            // error checking for blob
            Err(err) => Err(err.into()),
        }
    }
}
252
/// Default average FastCDC chunk size: 256 KiB.
fn default_avg_chunk_size() -> u32 {
    // 256 KiB, written as a shift to make the power-of-two nature obvious.
    256 << 10
}
256
/// Deserializable configuration for an [ObjectStoreBlobService].
#[derive(serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ObjectStoreBlobServiceConfig {
    // URL of the object store backend (without the `objectstore+` prefix,
    // which TryFrom<Url> strips off).
    object_store_url: String,
    // Average FastCDC chunk size in bytes; defaults to 256 KiB.
    #[serde(default = "default_avg_chunk_size")]
    avg_chunk_size: u32,
    // Additional options forwarded to the object store construction,
    // same keys as accepted by object_store::parse_url_opts.
    object_store_options: HashMap<String, String>,
}
265
266impl TryFrom<url::Url> for ObjectStoreBlobServiceConfig {
267    type Error = Box<dyn std::error::Error + Send + Sync>;
268    /// Constructs a new [ObjectStoreBlobService] from a [Url] supported by
269    /// [object_store].
270    /// Any path suffix becomes the base path of the object store.
271    /// additional options, the same as in [object_store::parse_url_opts] can
272    /// be passed.
273    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
274        // We need to convert the URL to string, strip the prefix there, and then
275        // parse it back as url, as Url::set_scheme() rejects some of the transitions we want to do.
276        let trimmed_url = {
277            let s = url.to_string();
278            let mut url = Url::parse(
279                s.strip_prefix("objectstore+")
280                    .ok_or("Missing objectstore uri")?,
281            )?;
282            // trim the query pairs, they might contain credentials or local settings we don't want to send as-is.
283            url.set_query(None);
284            url
285        };
286        Ok(ObjectStoreBlobServiceConfig {
287            object_store_url: trimmed_url.into(),
288            object_store_options: url
289                .query_pairs()
290                .into_iter()
291                .map(|(k, v)| (k.to_string(), v.to_string()))
292                .collect(),
293            avg_chunk_size: 256 * 1024,
294        })
295    }
296}
297
#[async_trait]
impl ServiceBuilder for ObjectStoreBlobServiceConfig {
    type Output = dyn BlobService;
    /// Builds an [ObjectStoreBlobService] from this config, constructing the
    /// underlying object store from `object_store_url` and the options.
    async fn build<'a>(
        &'a self,
        instance_name: &str,
        _context: &CompositionContext,
    ) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync>> {
        let opts = {
            // Borrow the configured options as &str pairs.
            let mut opts: HashMap<&str, _> = self
                .object_store_options
                .iter()
                .map(|(k, v)| (k.as_str(), v.as_str()))
                .collect();

            // Default the user agent to ours, unless one was explicitly
            // configured.
            if let hash_map::Entry::Vacant(e) =
                opts.entry(object_store::ClientConfigKey::UserAgent.as_ref())
            {
                e.insert(crate::USER_AGENT);
            }

            opts
        };

        // object_store doesn't sufficiently support the AWS credential chain.
        let object_store_url: url::Url = self.object_store_url.parse()?;
        let (object_store_scheme, path) =
            object_store::ObjectStoreScheme::parse(&object_store_url)?;

        let (object_store, path) = match object_store_scheme {
            #[cfg(feature = "cloud")]
            ObjectStoreScheme::AmazonS3 => {
                // In the AWS case, we only support s3:// URLs.
                if object_store_url.scheme() != "s3" {
                    return Err(Box::new(std::io::Error::new(
                        std::io::ErrorKind::InvalidInput,
                        "only s3://-style URLs supported",
                    )));
                }

                // Construct the S3 store through our own helper, which wires
                // up the AWS credential chain.
                let store = aws::setup_aws_object_store(&object_store_url, opts).await?;
                (Box::new(store) as Box<dyn ObjectStore>, path)
            }
            // All other schemes go through object_store's generic URL parsing.
            _ => object_store::parse_url_opts(&object_store_url, opts)?,
        };

        Ok(Arc::new(ObjectStoreBlobService {
            instance_name: instance_name.to_string(),
            object_store: Arc::new(object_store),
            base_path: path,
            avg_chunk_size: self.avg_chunk_size,
        }))
    }
}
352
/// Reads blob contents from a AsyncRead, chunks and uploads them.
/// On success, returns a [StatBlobResponse] pointing to the individual chunks.
///
/// `min_chunk_size`, `avg_chunk_size` and `max_chunk_size` are passed to the
/// FastCDC chunker. Chunks are uploaded concurrently (up to
/// [CONCURRENT_CHUNK_UPLOADS] in flight), then the blob (chunk index) is
/// written, unless it already exists. Returns the blake3 digest of the whole
/// blob contents.
#[instrument(skip_all, fields(base_path=%base_path, min_chunk_size, avg_chunk_size, max_chunk_size), err)]
async fn chunk_and_upload<R: AsyncRead + Unpin>(
    r: R,
    object_store: Arc<dyn ObjectStore>,
    base_path: Path,
    min_chunk_size: u32,
    avg_chunk_size: u32,
    max_chunk_size: u32,
) -> io::Result<B3Digest> {
    // wrap reader with something calculating the blake3 hash of all data read.
    let mut b3_r = B3HashingReader::from(r);
    // set up a fastcdc chunker
    let mut chunker =
        AsyncStreamCDC::new(&mut b3_r, min_chunk_size, avg_chunk_size, max_chunk_size);

    // Use the fastcdc chunker to produce a stream of chunks, and upload these
    // that don't exist to the backend.
    let chunks = chunker
        .as_stream()
        .err_into()
        .map_ok(|chunk_data| {
            // For each chunk, compute its digest and the path it should live
            // at, then produce the (not yet awaited) upload future.
            let object_store = object_store.clone();
            let chunk_digest: B3Digest = blake3::hash(&chunk_data.data).as_bytes().into();
            let chunk_path = derive_chunk_path(&base_path, &chunk_digest);
            upload_chunk(object_store, chunk_digest, chunk_path, chunk_data.data)
        })
        // Run up to CONCURRENT_CHUNK_UPLOADS uploads in parallel, preserving
        // chunk order in the collected result.
        .try_buffered(CONCURRENT_CHUNK_UPLOADS)
        .try_collect::<Vec<ChunkMeta>>()
        .await?;

    let chunks = if chunks.len() < 2 {
        // The chunker returned only one chunk, which is the entire blob.
        // According to the protocol, we must return an empty list of chunks
        // when the blob is not split up further.
        vec![]
    } else {
        chunks
    };

    let stat_blob_response = StatBlobResponse {
        chunks,
        bao: "".into(), // still todo
    };

    // check for Blob, if it doesn't exist, persist.
    let blob_digest: B3Digest = b3_r.digest().into();
    let blob_path = derive_blob_path(&base_path, &blob_digest);

    match object_store.head(&blob_path).await {
        // blob already exists, nothing to do
        Ok(_) => {
            trace!(
                blob.digest = %blob_digest,
                blob.path = %blob_path,
                "blob already exists on backend"
            );
        }
        // chunk does not yet exist, upload first
        Err(object_store::Error::NotFound { .. }) => {
            debug!(
                blob.digest = %blob_digest,
                blob.path = %blob_path,
                "uploading blob"
            );
            // Persist the serialized StatBlobResponse at the blob path.
            object_store
                .put(&blob_path, stat_blob_response.encode_to_vec().into())
                .await?;
        }
        Err(err) => {
            // other error
            Err(err)?
        }
    }

    Ok(blob_digest)
}
431
432/// upload chunk if it doesn't exist yet.
433#[instrument(skip_all, fields(chunk.digest = %chunk_digest, chunk.size = chunk_data.len(), chunk.path = %chunk_path), err)]
434async fn upload_chunk(
435    object_store: Arc<dyn ObjectStore>,
436    chunk_digest: B3Digest,
437    chunk_path: Path,
438    chunk_data: Vec<u8>,
439) -> std::io::Result<ChunkMeta> {
440    let chunk_size = chunk_data.len();
441    match object_store.head(&chunk_path).await {
442        // chunk already exists, nothing to do
443        Ok(_) => {
444            debug!("chunk already exists");
445        }
446
447        // chunk does not yet exist, compress and upload.
448        Err(object_store::Error::NotFound { .. }) => {
449            let chunk_data_compressed =
450                zstd::encode_all(Cursor::new(chunk_data), zstd::DEFAULT_COMPRESSION_LEVEL)?;
451
452            debug!(chunk.compressed_size=%chunk_data_compressed.len(), "uploading chunk");
453
454            object_store
455                .as_ref()
456                .put(&chunk_path, chunk_data_compressed.into())
457                .await?;
458        }
459        // other error
460        Err(err) => Err(err)?,
461    }
462
463    Ok(ChunkMeta {
464        digest: chunk_digest.into(),
465        size: chunk_size as u64,
466    })
467}
468
pin_project! {
    /// Takes care of blob uploads.
    /// All writes are relayed to self.writer, and we continuously poll the
    /// future (which will internally read from the other side of the pipe and
    /// upload chunks).
    /// Our BlobWriter::close() needs to drop self.writer, so the other side
    /// will read EOF and can finalize the blob.
    /// The future should then resolve and return the blob digest.
    pub struct ObjectStoreBlobWriter<W, Fut>
    where
        W: AsyncWrite,
        Fut: Future,
    {
        // Write half of the pipe; taken out (and shut down) on close(), so
        // the reading side sees EOF.
        #[pin]
        writer: Option<W>,

        // The chunk-and-upload future reading from the other side of the
        // pipe; taken out and awaited on close().
        #[pin]
        fut: Option<Fut>,

        // Result stored after `fut` resolved, so a second close() call can
        // return it again.
        fut_output: Option<io::Result<B3Digest>>
    }
}
491
492impl<W, Fut> tokio::io::AsyncWrite for ObjectStoreBlobWriter<W, Fut>
493where
494    W: AsyncWrite + Send + Unpin,
495    Fut: Future,
496{
497    fn poll_write(
498        self: std::pin::Pin<&mut Self>,
499        cx: &mut std::task::Context<'_>,
500        buf: &[u8],
501    ) -> std::task::Poll<Result<usize, io::Error>> {
502        let this = self.project();
503        // poll the future.
504        let fut = this.fut.as_pin_mut().expect("not future");
505        let fut_p = fut.poll(cx);
506        // if it's ready, the only way this could have happened is that the
507        // upload failed, because we're only closing `self.writer` after all
508        // writes happened.
509        if fut_p.is_ready() {
510            return Poll::Ready(Err(io::Error::other("upload failed")));
511        }
512
513        // write to the underlying writer
514        this.writer
515            .as_pin_mut()
516            .expect("writer must be some")
517            .poll_write(cx, buf)
518    }
519
520    fn poll_flush(
521        self: std::pin::Pin<&mut Self>,
522        cx: &mut std::task::Context<'_>,
523    ) -> std::task::Poll<Result<(), io::Error>> {
524        let this = self.project();
525        // poll the future.
526        let fut = this.fut.as_pin_mut().expect("not future");
527        let fut_p = fut.poll(cx);
528        // if it's ready, the only way this could have happened is that the
529        // upload failed, because we're only closing `self.writer` after all
530        // writes happened.
531        if fut_p.is_ready() {
532            return Poll::Ready(Err(io::Error::other("upload failed")));
533        }
534
535        // Call poll_flush on the writer
536        this.writer
537            .as_pin_mut()
538            .expect("writer must be some")
539            .poll_flush(cx)
540    }
541
542    fn poll_shutdown(
543        self: std::pin::Pin<&mut Self>,
544        _cx: &mut std::task::Context<'_>,
545    ) -> std::task::Poll<Result<(), io::Error>> {
546        // There's nothing to do on shutdown. We might have written some chunks
547        // that are nowhere else referenced, but cleaning them up here would be racy.
548        std::task::Poll::Ready(Ok(()))
549    }
550}
551
#[async_trait]
impl<W, Fut> BlobWriter for ObjectStoreBlobWriter<W, Fut>
where
    W: AsyncWrite + Send + Unpin,
    Fut: Future<Output = io::Result<B3Digest>> + Send + Unpin,
{
    /// Finalizes the blob: shuts down the pipe writer (signalling EOF to the
    /// chunking future), awaits the upload and returns the blob digest.
    /// Idempotent: subsequent calls return the stored result again.
    async fn close(&mut self) -> io::Result<B3Digest> {
        match self.writer.take() {
            Some(mut writer) => {
                // shut down the writer, so the other side will read EOF.
                writer.shutdown().await?;

                // take out the future.
                let fut = self.fut.take().expect("fut must be some");
                // await it.
                let resp = pin!(fut).await;

                match resp.as_ref() {
                    // In the case of an Ok value, we store it in self.fut_output,
                    // so future calls to close can return that.
                    Ok(b3_digest) => {
                        self.fut_output = Some(Ok(*b3_digest));
                    }
                    Err(e) => {
                        // for the error type, we need to cheat a bit, as
                        // they're not clone-able.
                        // Simply store a sloppy clone, with the same ErrorKind and message there.
                        self.fut_output = Some(Err(std::io::Error::new(e.kind(), e.to_string())))
                    }
                }
                resp
            }
            None => {
                // called a second time, return self.fut_output.
                match self.fut_output.as_ref().unwrap() {
                    Ok(b3_digest) => Ok(*b3_digest),
                    Err(e) => Err(std::io::Error::new(e.kind(), e.to_string())),
                }
            }
        }
    }
}
594
#[cfg(test)]
mod test {
    use super::{chunk_and_upload, default_avg_chunk_size};
    use crate::{
        blobservice::{BlobService, ObjectStoreBlobService},
        fixtures::{BLOB_A, BLOB_A_DIGEST, BLOB_B, BLOB_B_DIGEST},
    };
    use std::{io::Cursor, sync::Arc};
    use url::Url;

    /// Tests chunk_and_upload directly, bypassing the BlobWriter at open_write().
    #[rstest::rstest]
    #[case::a(&BLOB_A, &BLOB_A_DIGEST)]
    #[case::b(&BLOB_B, &BLOB_B_DIGEST)]
    #[tokio::test]
    async fn test_chunk_and_upload(
        #[case] blob: &bytes::Bytes,
        #[case] blob_digest: &crate::B3Digest,
    ) {
        // Use an in-memory object store so the test needs no real backend.
        let (object_store, base_path) =
            object_store::parse_url(&Url::parse("memory:///").unwrap()).unwrap();
        let object_store: Arc<dyn object_store::ObjectStore> = Arc::from(object_store);
        let blobsvc = Arc::new(ObjectStoreBlobService {
            instance_name: "test".into(),
            object_store: object_store.clone(),
            avg_chunk_size: default_avg_chunk_size(),
            base_path,
        });

        // Upload with a small 1 KiB average chunk size, so BLOB_B actually
        // gets split into multiple chunks.
        let inserted_blob_digest = chunk_and_upload(
            &mut Cursor::new(blob.to_vec()),
            object_store,
            object_store::path::Path::from("/"),
            1024 / 2,
            1024,
            1024 * 2,
        )
        .await
        .expect("chunk_and_upload succeeds");

        assert_eq!(blob_digest.clone(), inserted_blob_digest);

        // Now we should have the blob
        assert!(blobsvc.has(blob_digest).await.unwrap());

        // Check if it was chunked correctly
        let chunks = blobsvc.chunks(blob_digest).await.unwrap().unwrap();
        if blob.len() < 1024 / 2 {
            // The blob is smaller than the min chunk size, it should have been inserted as a whole
            assert!(chunks.is_empty());
        } else if blob.len() > 1024 * 2 {
            // The blob is larger than the max chunk size, make sure it was split up into at least
            // two chunks
            assert!(chunks.len() >= 2);
        }
    }
}
651}