snix_castore/import/blobs.rs

use std::{
    io::{Cursor, Write},
    sync::Arc,
};

use tokio::{
    io::AsyncRead,
    sync::Semaphore,
    task::{JoinError, JoinSet},
};
use tokio_util::io::InspectReader;
use tracing::{Instrument, info_span};

use crate::{B3Digest, Path, PathBuf, blobservice::BlobService};

/// Blobs smaller than this size (in bytes) are buffered in memory and
/// uploaded to the [BlobService] in the background.
///
/// Larger blobs are uploaded sequentially, without buffering.
const CONCURRENT_BLOB_UPLOAD_THRESHOLD: u32 = 1024 * 1024;

/// The maximum number of bytes allowed to be buffered in memory at any one
/// time for background blob uploads.
const MAX_BUFFER_SIZE: usize = 128 * 1024 * 1024;

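// Illustrative sketch, not part of the original source: MAX_BUFFER_SIZE is
// spent as a byte budget through a tokio Semaphore, one permit per buffered
// byte. `acquire_many_owned(n)` waits once the budget is exhausted, and
// dropping the permit hands the bytes back. The demo below shows the pattern
// in isolation with a tiny budget.
#[allow(dead_code)]
async fn byte_budget_demo() {
    let budget = Arc::new(Semaphore::new(8));

    // Take 5 "bytes" of budget; 3 remain for other tasks.
    let permit = budget.clone().acquire_many_owned(5).await.unwrap();
    assert_eq!(budget.available_permits(), 3);

    // Dropping the permit frees the budget again.
    drop(permit);
    assert_eq!(budget.available_permits(), 8);
}
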
#[derive(Debug, thiserror::Error)]
pub enum Error {
    #[error("unable to read blob contents for {0}: {1}")]
    BlobRead(PathBuf, std::io::Error),

    #[error("unable to check whether blob at {0} already exists: {1}")]
    BlobCheck(PathBuf, std::io::Error),

    #[error("unable to finalize blob {0}: {1}")]
    BlobFinalize(PathBuf, std::io::Error),

    #[error("unexpected size for {path}: wanted {wanted}, got {got}")]
    UnexpectedSize {
        path: PathBuf,
        wanted: u64,
        got: u64,
    },

    #[error("blob upload join error: {0}")]
    JoinError(#[from] JoinError),
}

/// Uploads many small blobs to a [BlobService] concurrently.
///
/// This helps when ingesting from sources such as tarballs, where entries
/// must be read sequentially: uploading each small blob one at a time is
/// dominated by round trips to the blob service. Small blobs are buffered in
/// memory and uploaded in background tasks instead, while a semaphore caps
/// the total number of buffered bytes at [MAX_BUFFER_SIZE].
///
/// Call [ConcurrentBlobUploader::join] after the last upload to wait for the
/// background tasks and surface any errors.
pub struct ConcurrentBlobUploader<BS> {
    blob_service: BS,
    upload_tasks: JoinSet<Result<(), Error>>,
    upload_semaphore: Arc<Semaphore>,
}

impl<BS> ConcurrentBlobUploader<BS>
where
    BS: BlobService + Clone + 'static,
{
    /// Creates a new uploader with an empty task set and the full byte budget.
    pub fn new(blob_service: BS) -> Self {
        Self {
            blob_service,
            upload_tasks: JoinSet::new(),
            upload_semaphore: Arc::new(Semaphore::new(MAX_BUFFER_SIZE)),
        }
    }

    /// Uploads a blob read from `r` to the blob service.
    ///
    /// Blobs smaller than [CONCURRENT_BLOB_UPLOAD_THRESHOLD] are read into an
    /// in-memory buffer and uploaded in the background; larger blobs are
    /// uploaded sequentially. Either way, the reader is fully consumed (or an
    /// error is returned) before this function returns.
    pub async fn upload<R>(
        &mut self,
        path: &Path,
        expected_size: u64,
        mut r: R,
    ) -> Result<B3Digest, Error>
    where
        R: AsyncRead + Unpin,
    {
        if expected_size < CONCURRENT_BLOB_UPLOAD_THRESHOLD as u64 {
            let mut buffer = Vec::with_capacity(expected_size as usize);
            let mut hasher = blake3::Hasher::new();
            let mut reader = InspectReader::new(&mut r, |bytes| {
                hasher.write_all(bytes).unwrap();
            });

            // Acquire one permit per byte to be buffered, so that at most
            // MAX_BUFFER_SIZE bytes are held in memory at once. The cast is
            // fine, as expected_size is below the (u32) threshold here.
            let permit = self
                .upload_semaphore
                .clone()
                .acquire_many_owned(expected_size as u32)
                .await
                .unwrap();
            let size = tokio::io::copy(&mut reader, &mut buffer)
                .await
                .map_err(|e| Error::BlobRead(path.into(), e))?;
            let digest: B3Digest = hasher.finalize().as_bytes().into();

            if size != expected_size {
                return Err(Error::UnexpectedSize {
                    path: path.into(),
                    wanted: expected_size,
                    got: size,
                });
            }

            self.upload_tasks.spawn({
                let blob_service = self.blob_service.clone();
                let expected_digest = digest.clone();
                let path = path.to_owned();
                let r = Cursor::new(buffer);
                async move {
                    // The digest is already known, so skip the upload if the
                    // blob exists in the blob service.
                    if blob_service
                        .has(&expected_digest)
                        .await
                        .map_err(|e| Error::BlobCheck(path.clone(), e))?
                    {
                        drop(permit);
                        return Ok(());
                    }

                    let digest = upload_blob(&blob_service, &path, expected_size, r).await?;

                    assert_eq!(digest, expected_digest, "Snix bug: blob digest mismatch");

                    // Hold the buffer budget until the upload has completed.
                    drop(permit);
                    Ok(())
                }
                .instrument(info_span!("upload_task"))
            });

            return Ok(digest);
        }

        upload_blob(&self.blob_service, path, expected_size, r).await
    }

    /// Waits for all background upload tasks to finish, propagating the first
    /// error encountered.
    pub async fn join(mut self) -> Result<(), Error> {
        while let Some(result) = self.upload_tasks.join_next().await {
            result??;
        }
        Ok(())
    }
}

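// Illustrative usage sketch, not part of the original source: feeding a few
// small in-memory blobs through the uploader and joining at the end. The
// names here are assumptions; Cursor<Vec<u8>> stands in for e.g. a tarball
// entry reader, and PathBuf is assumed to deref to Path as with the std path
// types.
#[allow(dead_code)]
async fn example_usage<BS>(
    blob_service: BS,
    files: Vec<(PathBuf, Vec<u8>)>,
) -> Result<(), Error>
where
    BS: BlobService + Clone + 'static,
{
    let mut uploader = ConcurrentBlobUploader::new(blob_service);

    for (path, data) in files {
        let size = data.len() as u64;
        // Small blobs return their digest immediately and upload in the
        // background; the digest would typically go into a directory entry.
        let _digest = uploader.upload(&path, size, Cursor::new(data)).await?;
    }

    // Wait for all background uploads to finish, surfacing any errors.
    uploader.join().await
}
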
/// Uploads a blob to the blob service sequentially, returning its digest once
/// the writer has been finalized, and checking that the number of bytes read
/// matches the expected size.
async fn upload_blob<BS, R>(
    blob_service: &BS,
    path: &Path,
    expected_size: u64,
    mut r: R,
) -> Result<B3Digest, Error>
where
    BS: BlobService,
    R: AsyncRead + Unpin,
{
    let mut writer = blob_service.open_write().await;

    let size = tokio::io::copy(&mut r, &mut writer)
        .await
        .map_err(|e| Error::BlobRead(path.into(), e))?;

    let digest = writer
        .close()
        .await
        .map_err(|e| Error::BlobFinalize(path.into(), e))?;

    if size != expected_size {
        return Err(Error::UnexpectedSize {
            path: path.into(),
            wanted: expected_size,
            got: size,
        });
    }

    Ok(digest)
}