snix_castore/blobservice/
combinator.rs1use std::sync::Arc;
2
3use tokio::io::AsyncRead;
4use tonic::async_trait;
5use tracing::instrument;
6
7use crate::B3Digest;
8use crate::composition::{CompositionContext, ServiceBuilder};
9
10use super::{BlobReader, BlobService, BlobWriter, ChunkedReader};
11
12#[derive(Clone)]
21pub struct Cache<BN, BF>
22where
23 BN: Clone,
24 BF: Clone,
25{
26 instance_name: String,
27 near: BN,
28 far: BF,
29}
30
31impl<BN, BF> Cache<BN, BF>
32where
33 BN: Clone,
34 BF: Clone,
35{
36 pub fn new(instance_name: String, near: BN, far: BF) -> Self {
37 Self {
38 instance_name,
39 near,
40 far,
41 }
42 }
43}
44
45#[async_trait]
46impl<BN, BF> BlobService for Cache<BN, BF>
47where
48 BN: BlobService + Clone + 'static,
49 BF: BlobService + Clone + 'static,
50{
51 #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name))]
52 async fn has(&self, digest: &B3Digest) -> std::io::Result<bool> {
53 Ok(self.near.has(digest).await? || self.far.has(digest).await?)
54 }
55
56 #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name), err)]
57 async fn open_read(&self, digest: &B3Digest) -> std::io::Result<Option<Box<dyn BlobReader>>> {
58 if self.near.has(digest).await? {
59 self.near.open_read(digest).await
61 } else {
62 match self.far.chunks(digest).await? {
69 None => Ok(None),
71 Some(remote_chunks) => {
72 let mut far_reader = {
73 if remote_chunks.is_empty() {
77 if let Some(reader) = self.far.open_read(digest).await? {
78 Box::new(reader) as Box<dyn AsyncRead + Unpin + Send>
79 } else {
80 return Ok(None);
81 }
82 } else {
83 Box::new(ChunkedReader::from_chunks(
86 remote_chunks.into_iter().map(|chunk| {
87 (
88 chunk.digest.try_into().expect("invalid b3 digest"),
89 chunk.size,
90 )
91 }),
92 self.clone(),
93 )) as Box<dyn AsyncRead + Unpin + Send>
94 }
95 };
96
97 let mut near_blobwriter = self.near.open_write().await;
100 tokio::io::copy(&mut far_reader, &mut near_blobwriter).await?;
101
102 let written_digest = near_blobwriter.close().await?;
103 if written_digest != *digest {
104 return Err(std::io::Error::other(
105 "blob written to near blobservice returned unexpected digest",
106 ));
107 }
108
109 return self.near.open_read(digest).await;
110 }
111 }
112 }
113 }
114
115 #[instrument(skip_all, fields(instance_name=%self.instance_name))]
116 async fn open_write(&self) -> Box<dyn BlobWriter> {
117 self.near.open_write().await
119 }
120}
121
122#[derive(serde::Deserialize, Debug, Clone)]
123#[serde(deny_unknown_fields)]
124pub struct CacheBlobServiceConfig {
125 near: String,
126 far: String,
127}
128
129impl TryFrom<url::Url> for CacheBlobServiceConfig {
130 type Error = Box<dyn std::error::Error + Send + Sync>;
131 fn try_from(_url: url::Url) -> Result<Self, Self::Error> {
132 Err("Instantiating a CacheBlobService from a url is not supported".into())
133 }
134}
135
136#[async_trait]
137impl ServiceBuilder for CacheBlobServiceConfig {
138 type Output = dyn BlobService;
139 async fn build<'a>(
140 &'a self,
141 instance_name: &str,
142 context: &CompositionContext,
143 ) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync>> {
144 let (near, far) = futures::join!(
145 context.resolve::<dyn BlobService>(&self.near),
146 context.resolve::<dyn BlobService>(&self.far)
147 );
148 Ok(Arc::new(Cache {
149 instance_name: instance_name.to_string(),
150 near: near?,
151 far: far?,
152 }))
153 }
154}