snix_castore/blobservice/
combinator.rs1use std::sync::Arc;
2
3use tonic::async_trait;
4use tracing::instrument;
5
6use crate::composition::{CompositionContext, ServiceBuilder};
7use crate::{B3Digest, Error};
8
9use super::{BlobReader, BlobService, BlobWriter, ChunkedReader};
10
11pub struct CombinedBlobService<BL, BR> {
19 instance_name: String,
20 near: BL,
21 far: BR,
22}
23
24impl<BL, BR> Clone for CombinedBlobService<BL, BR>
25where
26 BL: Clone,
27 BR: Clone,
28{
29 fn clone(&self) -> Self {
30 Self {
31 instance_name: self.instance_name.clone(),
32 near: self.near.clone(),
33 far: self.far.clone(),
34 }
35 }
36}
37
38#[async_trait]
39impl<BL, BR> BlobService for CombinedBlobService<BL, BR>
40where
41 BL: AsRef<dyn BlobService> + Clone + Send + Sync + 'static,
42 BR: AsRef<dyn BlobService> + Clone + Send + Sync + 'static,
43{
44 #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name))]
45 async fn has(&self, digest: &B3Digest) -> std::io::Result<bool> {
46 Ok(self.near.as_ref().has(digest).await? || self.far.as_ref().has(digest).await?)
47 }
48
49 #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name), err)]
50 async fn open_read(&self, digest: &B3Digest) -> std::io::Result<Option<Box<dyn BlobReader>>> {
51 if self.near.as_ref().has(digest).await? {
52 self.near.as_ref().open_read(digest).await
54 } else {
55 match self.far.as_ref().chunks(digest).await? {
62 None => Ok(None),
64 Some(remote_chunks) => {
65 if remote_chunks.is_empty() {
69 return self.far.as_ref().open_read(digest).await;
70 }
71 let chunked_reader = ChunkedReader::from_chunks(
75 remote_chunks.into_iter().map(|chunk| {
76 (
77 chunk.digest.try_into().expect("invalid b3 digest"),
78 chunk.size,
79 )
80 }),
81 Arc::new(self.clone()) as Arc<dyn BlobService>,
82 );
83 Ok(Some(Box::new(chunked_reader)))
84 }
85 }
86 }
87 }
88
89 #[instrument(skip_all, fields(instance_name=%self.instance_name))]
90 async fn open_write(&self) -> Box<dyn BlobWriter> {
91 self.near.as_ref().open_write().await
93 }
94}
95
96#[derive(serde::Deserialize, Debug, Clone)]
97#[serde(deny_unknown_fields)]
98pub struct CombinedBlobServiceConfig {
99 near: String,
100 far: String,
101}
102
103impl TryFrom<url::Url> for CombinedBlobServiceConfig {
104 type Error = Box<dyn std::error::Error + Send + Sync>;
105 fn try_from(_url: url::Url) -> Result<Self, Self::Error> {
106 Err(Error::StorageError(
107 "Instantiating a CombinedBlobService from a url is not supported".into(),
108 )
109 .into())
110 }
111}
112
113#[async_trait]
114impl ServiceBuilder for CombinedBlobServiceConfig {
115 type Output = dyn BlobService;
116 async fn build<'a>(
117 &'a self,
118 instance_name: &str,
119 context: &CompositionContext,
120 ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync>> {
121 let (local, remote) = futures::join!(
122 context.resolve(self.near.clone()),
123 context.resolve(self.far.clone())
124 );
125 Ok(Arc::new(CombinedBlobService {
126 instance_name: instance_name.to_string(),
127 near: local?,
128 far: remote?,
129 }))
130 }
131}