snix_castore/import/blobs.rs

1use std::{
2    io::{Cursor, Write},
3    sync::Arc,
4};
5
6use tokio::{
7    io::AsyncRead,
8    sync::Semaphore,
9    task::{JoinError, JoinSet},
10};
11use tokio_util::io::InspectReader;
12use tracing::{Instrument, info_span};
13
14use crate::{B3Digest, Path, PathBuf, blobservice::BlobService};
15
/// Files smaller than this threshold, in bytes, are uploaded to the [BlobService] in the
/// background.
///
/// This is a u32 since we acquire a weighted semaphore using the size of the blob.
/// [Semaphore::acquire_many_owned] takes a u32, so we need to ensure the size of
/// the blob can be represented using a u32 and will not cause an overflow.
const CONCURRENT_BLOB_UPLOAD_THRESHOLD: u32 = 1024 * 1024;

/// The maximum amount of bytes allowed to be buffered in memory to perform async blob uploads.
/// Enforced via the weighted semaphore in [ConcurrentBlobUploader]: each in-flight background
/// upload holds permits equal to its buffered size.
const MAX_BUFFER_SIZE: usize = 128 * 1024 * 1024;
26
/// Errors that can occur while reading blobs from a source and uploading
/// them to a [BlobService].
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Reading the blob contents from the source reader failed.
    #[error("unable to read blob contents for {0}: {1}")]
    BlobRead(PathBuf, std::io::Error),

    /// Querying the blob service for an already-known digest failed.
    #[error("unable to check whether blob at {0} already exists: {1}")]
    BlobCheck(PathBuf, std::io::Error),

    // FUTUREWORK: proper error for blob finalize
    /// Closing/finalizing the blob writer failed.
    #[error("unable to finalize blob {0}: {1}")]
    BlobFinalize(PathBuf, std::io::Error),

    /// The number of bytes read from the source did not match the size the
    /// caller declared up front.
    #[error("unexpected size for {path} wanted: {wanted} got: {got}")]
    UnexpectedSize {
        path: PathBuf,
        wanted: u64,
        got: u64,
    },

    /// A background upload task panicked or was cancelled.
    #[error("blob upload join error: {0}")]
    JoinError(#[from] JoinError),
}
49
/// The concurrent blob uploader provides a mechanism for concurrently uploading small blobs.
/// This is useful when ingesting from sources like tarballs and archives which each blob entry
/// must be read sequentially. Ingesting many small blobs sequentially becomes slow due to
/// round trip time with the blob service. The concurrent blob uploader will buffer small
/// blobs in memory and upload them to the blob service in the background.
///
/// Once all blobs have been uploaded, make sure to call [ConcurrentBlobUploader::join] to wait
/// for all background jobs to complete and check for any errors.
pub struct ConcurrentBlobUploader<BS> {
    // Destination for all uploaded blobs; cloned into each background task.
    blob_service: BS,
    // In-flight background uploads; drained by [ConcurrentBlobUploader::join].
    upload_tasks: JoinSet<Result<(), Error>>,
    // Weighted semaphore bounding total buffered bytes to MAX_BUFFER_SIZE.
    upload_semaphore: Arc<Semaphore>,
}
63
64impl<BS> ConcurrentBlobUploader<BS>
65where
66    BS: BlobService + Clone + 'static,
67{
68    /// Creates a new concurrent blob uploader which uploads blobs to the provided
69    /// blob service.
70    pub fn new(blob_service: BS) -> Self {
71        Self {
72            blob_service,
73            upload_tasks: JoinSet::new(),
74            upload_semaphore: Arc::new(Semaphore::new(MAX_BUFFER_SIZE)),
75        }
76    }
77
78    /// Uploads a blob to the blob service. If the blob is small enough it will be read to a buffer
79    /// and uploaded in the background.
80    /// This will read the entirety of the provided reader unless an error occurs, even if blobs
81    /// are uploaded in the background..
82    pub async fn upload<R>(
83        &mut self,
84        path: &Path,
85        expected_size: u64,
86        mut r: R,
87    ) -> Result<B3Digest, Error>
88    where
89        R: AsyncRead + Unpin,
90    {
91        if expected_size < CONCURRENT_BLOB_UPLOAD_THRESHOLD as u64 {
92            let mut buffer = Vec::with_capacity(expected_size as usize);
93            let mut hasher = blake3::Hasher::new();
94            let mut reader = InspectReader::new(&mut r, |bytes| {
95                hasher.write_all(bytes).unwrap();
96            });
97
98            let permit = self
99                .upload_semaphore
100                .clone()
101                // This cast is safe because ensure the header_size is less than
102                // CONCURRENT_BLOB_UPLOAD_THRESHOLD which is a u32.
103                .acquire_many_owned(expected_size as u32)
104                .await
105                .unwrap();
106            let size = tokio::io::copy(&mut reader, &mut buffer)
107                .await
108                .map_err(|e| Error::BlobRead(path.into(), e))?;
109            let digest: B3Digest = hasher.finalize().as_bytes().into();
110
111            if size != expected_size {
112                return Err(Error::UnexpectedSize {
113                    path: path.into(),
114                    wanted: expected_size,
115                    got: size,
116                });
117            }
118
119            self.upload_tasks.spawn({
120                let blob_service = self.blob_service.clone();
121                let expected_digest = digest.clone();
122                let path = path.to_owned();
123                let r = Cursor::new(buffer);
124                async move {
125                    // We know the blob digest already, check it exists before sending it.
126                    if blob_service
127                        .has(&expected_digest)
128                        .await
129                        .map_err(|e| Error::BlobCheck(path.clone(), e))?
130                    {
131                        drop(permit);
132                        return Ok(());
133                    }
134
135                    let digest = upload_blob(&blob_service, &path, expected_size, r).await?;
136
137                    assert_eq!(digest, expected_digest, "Snix bug: blob digest mismatch");
138
139                    // Make sure we hold the permit until we finish writing the blob
140                    // to the [BlobService].
141                    drop(permit);
142                    Ok(())
143                }
144                .instrument(info_span!("upload_task"))
145            });
146
147            return Ok(digest);
148        }
149
150        upload_blob(&self.blob_service, path, expected_size, r).await
151    }
152
153    /// Waits for all background upload jobs to complete, returning any upload errors.
154    pub async fn join(mut self) -> Result<(), Error> {
155        while let Some(result) = self.upload_tasks.join_next().await {
156            result??;
157        }
158        Ok(())
159    }
160}
161
162async fn upload_blob<BS, R>(
163    blob_service: &BS,
164    path: &Path,
165    expected_size: u64,
166    mut r: R,
167) -> Result<B3Digest, Error>
168where
169    BS: BlobService,
170    R: AsyncRead + Unpin,
171{
172    let mut writer = blob_service.open_write().await;
173
174    let size = tokio::io::copy(&mut r, &mut writer)
175        .await
176        .map_err(|e| Error::BlobRead(path.into(), e))?;
177
178    let digest = writer
179        .close()
180        .await
181        .map_err(|e| Error::BlobFinalize(path.into(), e))?;
182
183    if size != expected_size {
184        return Err(Error::UnexpectedSize {
185            path: path.into(),
186            wanted: expected_size,
187            got: size,
188        });
189    }
190
191    Ok(digest)
192}