snix_castore/blobservice/
combinator.rs

1use std::sync::Arc;
2
3use tonic::async_trait;
4use tracing::instrument;
5
6use crate::composition::{CompositionContext, ServiceBuilder};
7use crate::{B3Digest, Error};
8
9use super::{BlobReader, BlobService, BlobWriter, ChunkedReader};
10
/// A [BlobService] that layers a "near" blobservice in front of a "far" one.
///
/// Reads are served from the near store whenever it has the blob. If it
/// doesn't, the far store is asked for the list of chunks making up the blob,
/// and every chunk is then looked up in the near store first, falling back to
/// the far store only for chunks the near store is missing.
///
/// Writes always go to the near store; the far store is never written to.
pub struct CombinedBlobService<BL, BR> {
    /// Name of this instance, recorded in the tracing spans of each request.
    instance_name: String,
    /// The blobservice that is consulted first, and receives all writes.
    near: BL,
    /// The blobservice used as a read-only fallback.
    far: BR,
}
23
24impl<BL, BR> Clone for CombinedBlobService<BL, BR>
25where
26    BL: Clone,
27    BR: Clone,
28{
29    fn clone(&self) -> Self {
30        Self {
31            instance_name: self.instance_name.clone(),
32            near: self.near.clone(),
33            far: self.far.clone(),
34        }
35    }
36}
37
38#[async_trait]
39impl<BL, BR> BlobService for CombinedBlobService<BL, BR>
40where
41    BL: AsRef<dyn BlobService> + Clone + Send + Sync + 'static,
42    BR: AsRef<dyn BlobService> + Clone + Send + Sync + 'static,
43{
44    #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name))]
45    async fn has(&self, digest: &B3Digest) -> std::io::Result<bool> {
46        Ok(self.near.as_ref().has(digest).await? || self.far.as_ref().has(digest).await?)
47    }
48
49    #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name), err)]
50    async fn open_read(&self, digest: &B3Digest) -> std::io::Result<Option<Box<dyn BlobReader>>> {
51        if self.near.as_ref().has(digest).await? {
52            // near store has the blob, so we can assume it also has all chunks.
53            self.near.as_ref().open_read(digest).await
54        } else {
55            // near store doesn't have the blob.
56            // Ask the remote one for the list of chunks,
57            // and create a chunked reader that uses self.open_read() for
58            // individual chunks. There's a chance we already have some chunks
59            // in near, meaning we don't need to fetch them all from the far
60            // BlobService.
61            match self.far.as_ref().chunks(digest).await? {
62                // blob doesn't exist on the near side either, nothing we can do.
63                None => Ok(None),
64                Some(remote_chunks) => {
65                    // if there's no more granular chunks, or the far
66                    // blobservice doesn't support chunks, read the blob from
67                    // the far blobservice directly.
68                    if remote_chunks.is_empty() {
69                        return self.far.as_ref().open_read(digest).await;
70                    }
71                    // otherwise, a chunked reader, which will always try the
72                    // near backend first.
73
74                    let chunked_reader = ChunkedReader::from_chunks(
75                        remote_chunks.into_iter().map(|chunk| {
76                            (
77                                chunk.digest.try_into().expect("invalid b3 digest"),
78                                chunk.size,
79                            )
80                        }),
81                        Arc::new(self.clone()) as Arc<dyn BlobService>,
82                    );
83                    Ok(Some(Box::new(chunked_reader)))
84                }
85            }
86        }
87    }
88
89    #[instrument(skip_all, fields(instance_name=%self.instance_name))]
90    async fn open_write(&self) -> Box<dyn BlobWriter> {
91        // direct writes to the near one.
92        self.near.as_ref().open_write().await
93    }
94}
95
96#[derive(serde::Deserialize, Debug, Clone)]
97#[serde(deny_unknown_fields)]
98pub struct CombinedBlobServiceConfig {
99    near: String,
100    far: String,
101}
102
103impl TryFrom<url::Url> for CombinedBlobServiceConfig {
104    type Error = Box<dyn std::error::Error + Send + Sync>;
105    fn try_from(_url: url::Url) -> Result<Self, Self::Error> {
106        Err(Error::StorageError(
107            "Instantiating a CombinedBlobService from a url is not supported".into(),
108        )
109        .into())
110    }
111}
112
113#[async_trait]
114impl ServiceBuilder for CombinedBlobServiceConfig {
115    type Output = dyn BlobService;
116    async fn build<'a>(
117        &'a self,
118        instance_name: &str,
119        context: &CompositionContext,
120    ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync>> {
121        let (local, remote) = futures::join!(
122            context.resolve(self.near.clone()),
123            context.resolve(self.far.clone())
124        );
125        Ok(Arc::new(CombinedBlobService {
126            instance_name: instance_name.to_string(),
127            near: local?,
128            far: remote?,
129        }))
130    }
131}