snix_castore/import/blobs.rs

use std::{
    io::{Cursor, Write},
    sync::Arc,
};

use tokio::{
    io::AsyncRead,
    sync::Semaphore,
    task::{JoinError, JoinSet},
};
use tokio_util::io::InspectReader;
use tracing::{Instrument, Level, instrument};

use crate::{B3Digest, Path, PathBuf, blobservice::BlobService};

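/// Blobs smaller than this threshold, in bytes, are buffered in memory and
/// uploaded to the [BlobService] in a background task.
///
/// This is a u32 because a weighted semaphore permit is acquired per buffered
/// byte and [Semaphore::acquire_many_owned] takes a u32, so sizes below this
/// threshold are guaranteed to fit into a u32 without overflow.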
const CONCURRENT_BLOB_UPLOAD_THRESHOLD: u32 = 1024 * 1024;

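/// The maximum number of bytes allowed to be buffered in memory at any one
/// time for background blob uploads.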
const MAX_BUFFER_SIZE: usize = 128 * 1024 * 1024;

#[derive(Debug, thiserror::Error)]
pub enum Error {
    #[error("unable to read blob contents for {0}: {1}")]
    BlobRead(PathBuf, std::io::Error),

    #[error("unable to check whether blob at {0} already exists: {1}")]
    BlobCheck(PathBuf, std::io::Error),

    #[error("unable to finalize blob {0}: {1}")]
    BlobFinalize(PathBuf, std::io::Error),

    #[error("unexpected size for {path}: wanted {wanted}, got {got}")]
    UnexpectedSize {
        path: PathBuf,
        wanted: u64,
        got: u64,
    },

    #[error("blob upload join error: {0}")]
    JoinError(#[from] JoinError),
}

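/// Uploads blobs to a [BlobService] concurrently, buffering small blobs in
/// memory and uploading them in background tasks while a weighted semaphore
/// caps the total buffered bytes at [MAX_BUFFER_SIZE].
///
/// A minimal usage sketch (not compiled as a doctest; `blob_service`, `path`,
/// `size` and `reader` are placeholders for values provided by the caller):
///
/// ```ignore
/// let mut uploader = ConcurrentBlobUploader::new(blob_service);
/// let digest = uploader.upload(&path, size, reader).await?;
/// // Wait for all background uploads to finish before dropping the uploader.
/// uploader.join().await?;
/// ```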
pub struct ConcurrentBlobUploader<BS> {
    blob_service: BS,
    upload_tasks: JoinSet<Result<(), Error>>,
    upload_semaphore: Arc<Semaphore>,
}

impl<BS> ConcurrentBlobUploader<BS>
where
    BS: BlobService + Clone + 'static,
{
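    /// Creates a new uploader that writes blobs to the given [BlobService].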
    pub fn new(blob_service: BS) -> Self {
        Self {
            blob_service,
            upload_tasks: JoinSet::new(),
            upload_semaphore: Arc::new(Semaphore::new(MAX_BUFFER_SIZE)),
        }
    }

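    /// Uploads a blob read from `r`, returning its [B3Digest].
    ///
    /// Blobs smaller than [CONCURRENT_BLOB_UPLOAD_THRESHOLD] are buffered in
    /// memory and uploaded in a background task, skipping the upload entirely
    /// if the blob service already has the blob. Larger blobs are uploaded
    /// inline.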
    #[instrument(skip_all, fields(nar.path=%path, blob.size=expected_size), err, ret(level = Level::TRACE, Display))]
    pub async fn upload<R>(
        &mut self,
        path: &Path,
        expected_size: u64,
        mut r: R,
    ) -> Result<B3Digest, Error>
    where
        R: AsyncRead + Unpin,
    {
        if expected_size < CONCURRENT_BLOB_UPLOAD_THRESHOLD as u64 {
            let mut buffer = Vec::with_capacity(expected_size as usize);
            let mut hasher = blake3::Hasher::new();
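            // Tee everything read from `r` through a BLAKE3 hasher, so the
            // blob's digest is known as soon as it has been fully buffered.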
            let mut reader = InspectReader::new(&mut r, |bytes| {
                hasher.write_all(bytes).unwrap();
            });

            let permit = self
                .upload_semaphore
                .clone()
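                // This cast is safe: expected_size is below
                // CONCURRENT_BLOB_UPLOAD_THRESHOLD, which is itself a u32.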
                .acquire_many_owned(expected_size as u32)
                .await
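                // Acquiring can only fail if the semaphore is closed,
                // which never happens here.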
                .unwrap();
            let size = tokio::io::copy(&mut reader, &mut buffer)
                .await
                .map_err(|e| Error::BlobRead(path.into(), e))?;
            let digest: B3Digest = hasher.finalize().as_bytes().into();

            if size != expected_size {
                return Err(Error::UnexpectedSize {
                    path: path.into(),
                    wanted: expected_size,
                    got: size,
                });
            }

            self.upload_tasks.spawn({
                let blob_service = self.blob_service.clone();
                let path = path.to_owned();
                let r = Cursor::new(buffer);
                async move {
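                    // The digest is already known, so check whether the blob
                    // exists before uploading it again.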
                    if blob_service
                        .has(&digest)
                        .await
                        .map_err(|e| Error::BlobCheck(path.clone(), e))?
                    {
                        drop(permit);
                        return Ok(());
                    }

                    let uploaded_digest =
                        upload_blob(&blob_service, &path, expected_size, r).await?;

                    assert_eq!(uploaded_digest, digest, "Snix bug: blob digest mismatch");

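                    // Hold the permit until the upload has finished, keeping
                    // the buffered bytes accounted for against MAX_BUFFER_SIZE.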
                    drop(permit);
                    Ok(())
                }
                .in_current_span()
            });

            return Ok(digest);
        }

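        // Blobs at or above the threshold are streamed to the blob service
        // directly, without buffering them in memory first.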
        upload_blob(&self.blob_service, path, expected_size, r).await
    }

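    /// Waits for all background upload tasks to finish, surfacing any errors
    /// encountered while uploading.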
    pub async fn join(mut self) -> Result<(), Error> {
        while let Some(result) = self.upload_tasks.join_next().await {
            result??;
        }
        Ok(())
    }
}

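/// Uploads a single blob to the blob service, returning the digest reported by
/// the blob writer, or [Error::UnexpectedSize] if the number of bytes read
/// differs from `expected_size`.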
async fn upload_blob<BS, R>(
    blob_service: &BS,
    path: &Path,
    expected_size: u64,
    mut r: R,
) -> Result<B3Digest, Error>
where
    BS: BlobService,
    R: AsyncRead + Unpin,
{
    let mut writer = blob_service.open_write().await;

    let size = tokio::io::copy(&mut r, &mut writer)
        .await
        .map_err(|e| Error::BlobRead(path.into(), e))?;

    let digest = writer
        .close()
        .await
        .map_err(|e| Error::BlobFinalize(path.into(), e))?;

    if size != expected_size {
        return Err(Error::UnexpectedSize {
            path: path.into(),
            wanted: expected_size,
            got: size,
        });
    }

    Ok(digest)
}