snix_castore/import/blobs.rs

1use std::{
2    io::{Cursor, Write},
3    sync::Arc,
4};
5
6use tokio::{
7    io::AsyncRead,
8    sync::Semaphore,
9    task::{JoinError, JoinSet},
10};
11use tokio_util::io::InspectReader;
12use tracing::{Instrument, Level, instrument};
13
14use crate::{B3Digest, Path, PathBuf, blobservice::BlobService};
15
/// Files smaller than this threshold, in bytes, are uploaded to the [BlobService] in the
/// background.
///
/// This is a u32 since we acquire a weighted semaphore using the size of the blob.
/// [Semaphore::acquire_many_owned] takes a u32, so we need to ensure the size of
/// the blob can be represented using a u32 and will not cause an overflow.
const CONCURRENT_BLOB_UPLOAD_THRESHOLD: u32 = 1024 * 1024; // 1 MiB
23
/// The maximum amount of bytes allowed to be buffered in memory to perform async blob uploads.
/// Enforced via the weighted semaphore in [ConcurrentBlobUploader].
const MAX_BUFFER_SIZE: usize = 128 * 1024 * 1024; // 128 MiB
26
/// Errors that can occur while uploading blobs.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Reading blob contents from the source reader failed.
    #[error("unable to read blob contents for {0}: {1}")]
    BlobRead(PathBuf, std::io::Error),

    /// Querying the [BlobService] for an already-known digest failed.
    #[error("unable to check whether blob at {0} already exists: {1}")]
    BlobCheck(PathBuf, std::io::Error),

    // FUTUREWORK: proper error for blob finalize
    /// Closing/finalizing the blob writer failed.
    #[error("unable to finalize blob {0}: {1}")]
    BlobFinalize(PathBuf, std::io::Error),

    /// The number of bytes read differed from the size announced by the caller.
    #[error("unexpected size for {path} wanted: {wanted} got: {got}")]
    UnexpectedSize {
        path: PathBuf,
        wanted: u64,
        got: u64,
    },

    /// A background upload task panicked or was cancelled.
    #[error("blob upload join error: {0}")]
    JoinError(#[from] JoinError),
}
49
/// The concurrent blob uploader provides a mechanism for concurrently uploading small blobs.
/// This is useful when ingesting from sources like tarballs and archives where each blob entry
/// must be read sequentially. Ingesting many small blobs sequentially becomes slow due to
/// round trip time with the blob service. The concurrent blob uploader will buffer small
/// blobs in memory and upload them to the blob service in the background.
///
/// Once all blobs have been uploaded, make sure to call [ConcurrentBlobUploader::join] to wait
/// for all background jobs to complete and check for any errors.
pub struct ConcurrentBlobUploader<BS> {
    /// Destination all blobs are written to.
    blob_service: BS,
    /// In-flight background upload tasks, drained in [ConcurrentBlobUploader::join].
    upload_tasks: JoinSet<Result<(), Error>>,
    /// Weighted semaphore capping the total bytes buffered in memory for
    /// background uploads at [MAX_BUFFER_SIZE].
    upload_semaphore: Arc<Semaphore>,
}
63
impl<BS> ConcurrentBlobUploader<BS>
where
    // `Clone + 'static` is needed so the blob service can be moved into
    // spawned background upload tasks.
    BS: BlobService + Clone + 'static,
{
    /// Creates a new concurrent blob uploader which uploads blobs to the provided
    /// blob service.
    ///
    /// The total number of bytes buffered for background uploads is capped at
    /// [MAX_BUFFER_SIZE] via a weighted semaphore.
    pub fn new(blob_service: BS) -> Self {
        Self {
            blob_service,
            upload_tasks: JoinSet::new(),
            upload_semaphore: Arc::new(Semaphore::new(MAX_BUFFER_SIZE)),
        }
    }

    /// Uploads a blob to the blob service. If the blob is smaller than
    /// [CONCURRENT_BLOB_UPLOAD_THRESHOLD] it will be read into a buffer
    /// and uploaded in the background.
    /// This will read the entirety of the provided reader unless an error occurs, even if blobs
    /// are uploaded in the background.
    ///
    /// Returns the blake3 digest of the blob contents. Background upload
    /// failures are only reported by [ConcurrentBlobUploader::join].
    #[instrument(skip_all, fields(nar.path=%path, blob.size=expected_size), err, ret(level = Level::TRACE, Display))]
    pub async fn upload<R>(
        &mut self,
        path: &Path,
        expected_size: u64,
        mut r: R,
    ) -> Result<B3Digest, Error>
    where
        R: AsyncRead + Unpin,
    {
        if expected_size < CONCURRENT_BLOB_UPLOAD_THRESHOLD as u64 {
            let mut buffer = Vec::with_capacity(expected_size as usize);
            let mut hasher = blake3::Hasher::new();
            // Tee the reader through a blake3 hasher so the digest is known as
            // soon as the copy below completes, without a second pass over the data.
            let mut reader = InspectReader::new(&mut r, |bytes| {
                // Writing to a blake3 hasher is infallible.
                hasher.write_all(bytes).unwrap();
            });

            // Reserve `expected_size` bytes of the global buffer budget before
            // buffering the blob in memory. This backpressures ingestion so at
            // most [MAX_BUFFER_SIZE] bytes are buffered at any point in time.
            let permit = self
                .upload_semaphore
                .clone()
                // This cast is safe because we checked above that expected_size
                // is less than CONCURRENT_BLOB_UPLOAD_THRESHOLD, which is a u32.
                .acquire_many_owned(expected_size as u32)
                .await
                // The semaphore is never closed, so acquiring cannot fail.
                .unwrap();
            let size = tokio::io::copy(&mut reader, &mut buffer)
                .await
                .map_err(|e| Error::BlobRead(path.into(), e))?;
            let digest: B3Digest = hasher.finalize().as_bytes().into();

            if size != expected_size {
                return Err(Error::UnexpectedSize {
                    path: path.into(),
                    wanted: expected_size,
                    got: size,
                });
            }

            // Upload the buffered blob in the background; errors surface later
            // via [ConcurrentBlobUploader::join].
            self.upload_tasks.spawn({
                let blob_service = self.blob_service.clone();
                let path = path.to_owned();
                let r = Cursor::new(buffer);
                async move {
                    // We know the blob digest already, check it exists before sending it.
                    if blob_service
                        .has(&digest)
                        .await
                        .map_err(|e| Error::BlobCheck(path.clone(), e))?
                    {
                        drop(permit);
                        return Ok(());
                    }

                    let uploaded_digest =
                        upload_blob(&blob_service, &path, expected_size, r).await?;

                    // The digest computed while buffering must match what the blob
                    // service stored; a mismatch indicates a bug, not bad input.
                    assert_eq!(uploaded_digest, digest, "Snix bug: blob digest mismatch");

                    // Make sure we hold the permit until we finish writing the blob
                    // to the [BlobService].
                    drop(permit);
                    Ok(())
                }
                .in_current_span()
            });

            return Ok(digest);
        }

        // Large blobs are streamed straight to the blob service without buffering.
        upload_blob(&self.blob_service, path, expected_size, r).await
    }

    /// Waits for all background upload jobs to complete, returning any upload errors.
    pub async fn join(mut self) -> Result<(), Error> {
        while let Some(result) = self.upload_tasks.join_next().await {
            // Outer `?` propagates task panics/cancellation (JoinError),
            // inner `?` propagates upload errors from the task itself.
            result??;
        }
        Ok(())
    }
}
162
163async fn upload_blob<BS, R>(
164    blob_service: &BS,
165    path: &Path,
166    expected_size: u64,
167    mut r: R,
168) -> Result<B3Digest, Error>
169where
170    BS: BlobService,
171    R: AsyncRead + Unpin,
172{
173    let mut writer = blob_service.open_write().await;
174
175    let size = tokio::io::copy(&mut r, &mut writer)
176        .await
177        .map_err(|e| Error::BlobRead(path.into(), e))?;
178
179    let digest = writer
180        .close()
181        .await
182        .map_err(|e| Error::BlobFinalize(path.into(), e))?;
183
184    if size != expected_size {
185        return Err(Error::UnexpectedSize {
186            path: path.into(),
187            wanted: expected_size,
188            got: size,
189        });
190    }
191
192    Ok(digest)
193}