Skip to main content

snix_castore/blobservice/
combinator.rs

1use std::sync::Arc;
2
3use tokio::io::AsyncRead;
4use tonic::async_trait;
5use tracing::instrument;
6
7use crate::B3Digest;
8use crate::composition::{CompositionContext, ServiceBuilder};
9
10use super::{BlobReader, BlobService, BlobWriter, ChunkedReader};
11
/// Cache for a BlobService, using a "near" and "far" blobservice.
///
/// Requests are tried in (and returned from) the near store first; only if
/// things are not present there is the far BlobService queried.
/// In case the near blobservice doesn't have the blob, we ask the far
/// blobservice for chunks, and try to read each of these chunks from the near
/// blobservice again, before falling back to the far one.
/// While doing so, the full blob is written to the near blobservice.
/// The far BlobService is never written to.
///
/// No trait bounds are placed on the type parameters here; the impl blocks
/// that need `Clone` (or `BlobService`) state those bounds themselves.
#[derive(Clone)]
pub struct Cache<BN, BF> {
    /// Name of this instance, used as a field in tracing spans.
    instance_name: String,
    /// BlobService consulted first, and the one fetched blobs are copied into.
    near: BN,
    /// BlobService used as the fallback source; only ever read from.
    far: BF,
}
30
31impl<BN, BF> Cache<BN, BF>
32where
33    BN: Clone,
34    BF: Clone,
35{
36    pub fn new(instance_name: String, near: BN, far: BF) -> Self {
37        Self {
38            instance_name,
39            near,
40            far,
41        }
42    }
43}
44
45#[async_trait]
46impl<BN, BF> BlobService for Cache<BN, BF>
47where
48    BN: BlobService + Clone + 'static,
49    BF: BlobService + Clone + 'static,
50{
51    #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name))]
52    async fn has(&self, digest: &B3Digest) -> std::io::Result<bool> {
53        Ok(self.near.has(digest).await? || self.far.has(digest).await?)
54    }
55
56    #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name), err)]
57    async fn open_read(&self, digest: &B3Digest) -> std::io::Result<Option<Box<dyn BlobReader>>> {
58        if self.near.has(digest).await? {
59            // near store has the blob, so we can assume it also has all chunks.
60            self.near.open_read(digest).await
61        } else {
62            // near store doesn't have the blob.
63            // Ask the remote one for the list of chunks,
64            // and create a chunked reader that uses self.open_read() for
65            // individual chunks. There's a chance we already have some chunks
66            // in near, meaning we don't need to fetch them all from the far
67            // BlobService.
68            match self.far.chunks(digest).await? {
69                // blob doesn't exist on the far side either, nothing we can do.
70                None => Ok(None),
71                Some(remote_chunks) => {
72                    let mut far_reader = {
73                        // if there's no more granular chunks, or the far
74                        // blobservice doesn't support chunks, read the blob from
75                        // the far blobservice directly.
76                        if remote_chunks.is_empty() {
77                            if let Some(reader) = self.far.open_read(digest).await? {
78                                Box::new(reader) as Box<dyn AsyncRead + Unpin + Send>
79                            } else {
80                                return Ok(None);
81                            }
82                        } else {
83                            // otherwise, a chunked reader, which will always try the
84                            // near backend first.
85                            Box::new(ChunkedReader::from_chunks(
86                                remote_chunks.into_iter().map(|chunk| {
87                                    (
88                                        chunk.digest.try_into().expect("invalid b3 digest"),
89                                        chunk.size,
90                                    )
91                                }),
92                                self.clone(),
93                            )) as Box<dyn AsyncRead + Unpin + Send>
94                        }
95                    };
96
97                    // Blob is present on the remote blobservice.
98                    // Copy it into the near blobservice, then return from there.
99                    let mut near_blobwriter = self.near.open_write().await;
100                    tokio::io::copy(&mut far_reader, &mut near_blobwriter).await?;
101
102                    let written_digest = near_blobwriter.close().await?;
103                    if written_digest != *digest {
104                        return Err(std::io::Error::other(
105                            "blob written to near blobservice returned unexpected digest",
106                        ));
107                    }
108
109                    return self.near.open_read(digest).await;
110                }
111            }
112        }
113    }
114
115    #[instrument(skip_all, fields(instance_name=%self.instance_name))]
116    async fn open_write(&self) -> Box<dyn BlobWriter> {
117        // direct writes to the near one.
118        self.near.open_write().await
119    }
120}
121
122#[derive(serde::Deserialize, Debug, Clone)]
123#[serde(deny_unknown_fields)]
124pub struct CacheBlobServiceConfig {
125    near: String,
126    far: String,
127}
128
129impl TryFrom<url::Url> for CacheBlobServiceConfig {
130    type Error = Box<dyn std::error::Error + Send + Sync>;
131    fn try_from(_url: url::Url) -> Result<Self, Self::Error> {
132        Err("Instantiating a CacheBlobService from a url is not supported".into())
133    }
134}
135
136#[async_trait]
137impl ServiceBuilder for CacheBlobServiceConfig {
138    type Output = dyn BlobService;
139    async fn build<'a>(
140        &'a self,
141        instance_name: &str,
142        context: &CompositionContext,
143    ) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync>> {
144        let (near, far) = futures::join!(
145            context.resolve::<dyn BlobService>(&self.near),
146            context.resolve::<dyn BlobService>(&self.far)
147        );
148        Ok(Arc::new(Cache {
149            instance_name: instance_name.to_string(),
150            near: near?,
151            far: far?,
152        }))
153    }
154}