snix_build/buildservice/oci.rs

use anyhow::Context;
use bstr::BStr;
use snix_castore::{
    blobservice::BlobService,
    directoryservice::DirectoryService,
    fs::fuse::FuseDaemon,
    import::fs::ingest_path,
    refscan::{ReferencePattern, ReferenceScanner},
};
use tokio::process::{Child, Command};
use tonic::async_trait;
use tracing::{Span, debug, instrument, warn};
use uuid::Uuid;

use crate::buildservice::{BuildOutput, BuildRequest, BuildResult};
use crate::oci::{get_host_output_paths, make_bundle, make_spec};
use std::{ffi::OsStr, path::PathBuf, process::Stdio};

use super::BuildService;

const SANDBOX_SHELL: &str = env!("SNIX_BUILD_SANDBOX_SHELL");
const MAX_CONCURRENT_BUILDS: usize = 2; // TODO: make configurable

/// A [BuildService] that runs each build inside an OCI runtime bundle
/// executed by `runc`, with the build inputs exposed through a FUSE mount.
pub struct OCIBuildService<BS, DS> {
    /// Root path in which all bundles are created.
    bundle_root: PathBuf,

    /// Handle to a [BlobService], used by filesystems spawned during builds.
    blob_service: BS,
    /// Handle to a [DirectoryService], used by filesystems spawned during builds.
    directory_service: DS,

    /// Semaphore tracking the number of concurrently running builds.
    /// This is necessary, as otherwise we very quickly run out of open file handles.
    concurrent_builds: tokio::sync::Semaphore,
}

impl<BS, DS> OCIBuildService<BS, DS> {
    pub fn new(bundle_root: PathBuf, blob_service: BS, directory_service: DS) -> Self {
        // We map root inside the container to the uid/gid this process is
        // running as, and allocate one for uid 1000 inside the container from
        // the range we got in /etc/sub{u,g}id.
        // FUTUREWORK: use different uids?
        Self {
            bundle_root,
            blob_service,
            directory_service,
            concurrent_builds: tokio::sync::Semaphore::new(MAX_CONCURRENT_BUILDS),
        }
    }
}

#[async_trait]
impl<BS, DS> BuildService for OCIBuildService<BS, DS>
where
    BS: BlobService + Clone + 'static,
    DS: DirectoryService + Clone + 'static,
{
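    /// Runs the build in a fresh OCI bundle: renders the runtime spec and
    /// bundle, mounts the inputs via FUSE, spawns `runc`, waits for it to
    /// exit, and finally ingests the outputs back into the castore.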
    #[instrument(skip_all, err)]
    async fn do_build(&self, request: BuildRequest) -> std::io::Result<BuildResult> {
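        // Acquire a permit, limiting the number of concurrently running
        // builds. It is released when dropped at the end of this function.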
        let _permit = self.concurrent_builds.acquire().await.unwrap();

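        // Each build is assembled in a fresh, uniquely named bundle
        // directory underneath bundle_root.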
        let bundle_name = Uuid::new_v4();
        let bundle_path = self.bundle_root.join(bundle_name.to_string());

        let span = Span::current();
        span.record("bundle_name", bundle_name.to_string());

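        // Render the OCI runtime spec for this request, using the
        // compiled-in sandbox shell.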
        let runtime_spec = make_spec(&request, true, SANDBOX_SHELL)
            .context("failed to create spec")
            .map_err(std::io::Error::other)?;

        make_bundle(&request, &runtime_spec, &bundle_path)
            .context("failed to produce bundle")
            .map_err(std::io::Error::other)?;

        // Pre-calculate the locations we want to ingest later, in the order
        // of the original outputs.
        // If we can't calculate such a path, don't start the build in the
        // first place.
        let host_output_paths = get_host_output_paths(&request, &bundle_path)
            .context("failed to calculate host output paths")
            .map_err(std::io::Error::other)?;

        // Compile the reference scanning patterns from the needles in the request.
        let patterns = ReferencePattern::new(request.refscan_needles);
        // NOTE: impl Drop for FuseDaemon unmounts, so if this call is
        // cancelled, the filesystem gets unmounted again.
        let _fuse_daemon = tokio::task::spawn_blocking({
            let blob_service = self.blob_service.clone();
            let directory_service = self.directory_service.clone();

            let dest = bundle_path.join("inputs");

            // Pass the request inputs as the root nodes of the filesystem.
            let root_nodes = Box::new(request.inputs);
            move || {
                let fs = snix_castore::fs::SnixStoreFs::new(
                    blob_service,
                    directory_service,
                    root_nodes,
                    true,
                    false,
                );
                // mount the filesystem; it stays mounted until the returned
                // FuseDaemon is dropped.
                // FUTUREWORK: make fuse daemon threads configurable?
                FuseDaemon::new(fs, dest, 4, true).context("failed to start fuse daemon")
            }
        })
        .await?
        .context("mounting")
        .map_err(std::io::Error::other)?;
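        // The request's inputs are now visible at inputs/ inside the bundle;
        // holding on to _fuse_daemon keeps them mounted for the duration of
        // the build.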

        debug!(bundle.path=?bundle_path, bundle.name=%bundle_name, "about to spawn bundle");

        // start the bundle as another process.
        let child = spawn_bundle(bundle_path, &bundle_name.to_string())?;

        // wait for the process to exit
        // FUTUREWORK: change the trait to allow reporting progress / logs…
        let child_output = child
            .wait_with_output()
            .await
            .context("failed to run process")
            .map_err(std::io::Error::other)?;

        // Check the exit code
        if !child_output.status.success() {
            let stdout = BStr::new(&child_output.stdout);
            let stderr = BStr::new(&child_output.stderr);

            warn!(stdout=%stdout, stderr=%stderr, exit_code=%child_output.status, "build failed");

            return Err(std::io::Error::other("nonzero exit code"));
        }

        // Ingest build outputs into the castore.
        // We use try_join_all here. No need to spawn new tasks, as this is
        // mostly IO bound.
        let outputs = futures::future::try_join_all(host_output_paths.into_iter().enumerate().map(
            |(i, host_output_path)| {
                let output_path = &request.outputs[i];
                let patterns = patterns.clone();
                async move {
                    debug!(host.path=?host_output_path, output.path=?output_path, "ingesting path");

                    let scanner = ReferenceScanner::new(patterns);

                    Ok::<_, std::io::Error>(BuildOutput {
                        node: ingest_path(
                            self.blob_service.clone(),
                            &self.directory_service,
                            host_output_path,
                            Some(&scanner),
                        )
                        .await
                        .map_err(|e| {
                            std::io::Error::new(
                                std::io::ErrorKind::InvalidData,
                                format!("Unable to ingest output: {}", e),
                            )
                        })?,

                        // Record the indices of the needles that were seen
                        // in this output.
                        output_needles: scanner
                            .matches()
                            .into_iter()
                            .enumerate()
                            .filter(|(_, val)| *val)
                            .map(|(idx, _)| idx as u64)
                            .collect(),
                    })
                }
            },
        ))
        .await?;

        Ok(BuildResult { outputs })
    }
}

/// Spawns runc with the bundle at bundle_path.
/// On success, returns the child.
#[instrument(err)]
fn spawn_bundle(
    bundle_path: impl AsRef<OsStr> + std::fmt::Debug,
    bundle_name: &str,
) -> std::io::Result<Child> {
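    // Equivalent to invoking: runc run --bundle <bundle_path> <bundle_name>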
    let mut command = Command::new("runc");

    command
        .args(&[
            "run".into(),
            "--bundle".into(),
            bundle_path.as_ref().to_os_string(),
            bundle_name.into(),
        ])
        // Capture stdout/stderr so the caller can log build output; the
        // build itself gets no stdin.
        .stderr(Stdio::piped())
        .stdout(Stdio::piped())
        .stdin(Stdio::null());

    command.spawn()
}