nar_bridge/
nar.rs

1use axum::extract::Query;
2use axum::http::{Response, StatusCode};
3use axum::{body::Body, response::IntoResponse};
4use axum_extra::{TypedHeader, headers::Range};
5use axum_range::{KnownSize, Ranged};
6use futures::TryStreamExt;
7use nix_compat::{nix_http, nixbase32};
8use serde::Deserialize;
9use snix_castore::proto::parse_urlsafe_proto;
10use snix_store::nar::ingest_nar_and_hash;
11use std::io;
12use tokio_util::io::ReaderStream;
13use tracing::{Span, instrument, warn};
14
15use crate::AppState;
16
17#[derive(Debug, Deserialize)]
18pub(crate) struct GetNARParams {
19    #[serde(rename = "narsize")]
20    nar_size: Option<u64>,
21}
22
23#[instrument(skip_all)]
24pub async fn get_head(
25    method: axum::http::Method,
26    ranges: Option<TypedHeader<Range>>,
27    axum::extract::Path(root_node_enc): axum::extract::Path<String>,
28    axum::extract::Query(GetNARParams { nar_size }): Query<GetNARParams>,
29    axum::extract::State(AppState {
30        blob_service,
31        directory_service,
32        ..
33    }): axum::extract::State<AppState>,
34) -> Result<impl axum::response::IntoResponse, StatusCode> {
35    // We insist on the nar_size field being set. If the client dropped it from
36    // the NARInfo we sent, it's misbehaving and we reject it.
37    let nar_size = nar_size.ok_or_else(|| {
38        warn!("no nar_size parameter set");
39        StatusCode::BAD_REQUEST
40    })?;
41
42    // Attempt to parse the root node passed *by the user*
43    let root_node = parse_urlsafe_proto(root_node_enc).ok_or_else(|| {
44        warn!("unable to parse castore-infused NAR path");
45        StatusCode::NOT_FOUND
46    })?;
47
48    Ok((
49        // headers
50        [
51            ("cache-control", "max-age=31536000, immutable"),
52            ("content-type", nix_http::MIME_TYPE_NAR),
53        ],
54        if method == axum::http::Method::HEAD {
55            // If this is a HEAD request, construct a response returning back the
56            // user-provided content-length, but don't actually talk to castore.
57            // If the client lied about it, we will echo back a wrong `Content-Length`,
58            // which is their problem.
59            Response::builder()
60                .header("content-length", nar_size)
61                .body(Body::empty())
62                .unwrap()
63        } else if let Some(TypedHeader(ranges)) = ranges {
64            // If this is a range request, construct a seekable NAR reader.
65            let r =
66                snix_store::nar::seekable::Reader::new(root_node, blob_service, directory_service)
67                    .await
68                    .map_err(|e| {
69                        warn!(err=%e, "failed to construct seekable nar reader");
70                        StatusCode::INTERNAL_SERVER_ERROR
71                    })?;
72
73            // ensure the user-supplied nar size was correct, no point returning data otherwise.
74            if r.stream_len() != nar_size {
75                warn!(
76                    actual_nar_size = r.stream_len(),
77                    supplied_nar_size = nar_size,
78                    "wrong nar size supplied"
79                );
80                return Err(StatusCode::BAD_REQUEST);
81            }
82            Ranged::new(Some(ranges), KnownSize::sized(r, nar_size)).into_response()
83        } else {
84            // use the non-seekable codepath if there's no range(s) requested,
85            // as it uses less memory.
86            let (w, r) = tokio::io::duplex(1024 * 8);
87
88            // spawn a task rendering the NAR to the client.
89            tokio::spawn(async move {
90                if let Err(e) =
91                    snix_store::nar::write_nar(w, &root_node, blob_service, directory_service).await
92                {
93                    warn!(err=%e, "failed to write out NAR");
94                }
95            });
96
97            Response::builder()
98                // If the client lied about it, we will echo back a wrong `Content-Length`,
99                // which is their problem.
100                .header("content-length", nar_size)
101                .body(Body::from_stream(ReaderStream::new(r)))
102                .unwrap()
103        },
104    ))
105}
106
107/// Handler to respond to GET/HEAD requests for recently uploaded NAR files.
108/// Nix probes at {filehash}.nar[.compression_suffix] to determine whether a NAR
109/// has already been uploaded, by responding to (some of) these requests we
110/// avoid it unnecessarily uploading.
111/// We don't keep a full K/V from NAR hash to root note around, only the
112/// in-memory cache used to connect to the castore node when processing a PUT
113/// for the NARInfo.
114#[instrument(skip_all, fields(nar_str))]
115pub async fn head_root_nodes(
116    axum::extract::Path(nar_str): axum::extract::Path<String>,
117    axum::extract::State(AppState { root_nodes, .. }): axum::extract::State<AppState>,
118) -> Result<impl axum::response::IntoResponse, StatusCode> {
119    let (nar_hash, compression_suffix) =
120        nix_http::parse_nar_str(&nar_str).ok_or(StatusCode::UNAUTHORIZED)?;
121
122    // No paths with compression suffix are supported.
123    if !compression_suffix.is_empty() {
124        warn!(%compression_suffix, "invalid compression suffix requested");
125        return Err(StatusCode::UNAUTHORIZED);
126    }
127
128    // Check root_nodes, updating the moving it to the most recently used,
129    // as it might be referred in a subsequent NARInfo upload.
130    if root_nodes.write().get(&nar_hash).is_some() {
131        Ok("")
132    } else {
133        Err(StatusCode::NOT_FOUND)
134    }
135}
136
137#[instrument(skip_all)]
138pub async fn put(
139    axum::extract::Path(nar_str): axum::extract::Path<String>,
140    axum::extract::State(AppState {
141        blob_service,
142        directory_service,
143        root_nodes,
144        ..
145    }): axum::extract::State<AppState>,
146    request: axum::extract::Request,
147) -> Result<&'static str, StatusCode> {
148    let (nar_hash_expected, compression_suffix) =
149        nix_http::parse_nar_str(&nar_str).ok_or(StatusCode::UNAUTHORIZED)?;
150
151    // No paths with compression suffix are supported.
152    if !compression_suffix.is_empty() {
153        warn!(%compression_suffix, "invalid compression suffix requested");
154        return Err(StatusCode::UNAUTHORIZED);
155    }
156
157    let s = request.into_body().into_data_stream();
158
159    let mut r = tokio_util::io::StreamReader::new(s.map_err(|e| {
160        warn!(err=%e, "failed to read request body");
161        io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
162    }));
163
164    // ingest the NAR
165    let (root_node, nar_hash_actual, nar_size) = ingest_nar_and_hash(
166        blob_service.clone(),
167        directory_service.clone(),
168        &mut r,
169        &None,
170    )
171    .await
172    .map_err(io::Error::other)
173    .map_err(|e| {
174        warn!(err=%e, "failed to ingest nar");
175        StatusCode::INTERNAL_SERVER_ERROR
176    })?;
177
178    let s = Span::current();
179    s.record("nar_hash.expected", nixbase32::encode(&nar_hash_expected));
180    s.record("nar_size", nar_size);
181
182    if nar_hash_expected != nar_hash_actual {
183        warn!(
184            nar_hash.expected = nixbase32::encode(&nar_hash_expected),
185            nar_hash.actual = nixbase32::encode(&nar_hash_actual),
186            "nar hash mismatch"
187        );
188        return Err(StatusCode::BAD_REQUEST);
189    }
190
191    // store mapping of narhash to root node into root_nodes.
192    // we need it later to populate the root node when accepting the PathInfo.
193    root_nodes.write().put(nar_hash_actual, root_node);
194
195    Ok("")
196}
197
#[cfg(test)]
mod tests {
    use std::{
        num::NonZero,
        sync::{Arc, LazyLock},
    };

    use axum::{Router, http::Method};
    use bytes::Bytes;
    use data_encoding::BASE64URL_NOPAD;
    use nix_compat::nixbase32;
    use sha2::Digest;
    use snix_castore::{
        blobservice::{BlobService, MemoryBlobService},
        directoryservice::DirectoryService,
        fixtures::HELLOWORLD_BLOB_DIGEST,
        utils::gen_test_directory_service,
    };
    use snix_store::{
        fixtures::{
            CASTORE_NODE_COMPLICATED, CASTORE_NODE_SYMLINK, NAR_CONTENTS_COMPLICATED,
            NAR_CONTENTS_HELLOWORLD, NAR_CONTENTS_SYMLINK,
        },
        pathinfoservice::PathInfoService,
        utils::gen_test_pathinfo_service,
    };
    use tracing_test::traced_test;

    use crate::AppState;

    /// URL-safe base64 (no padding) encoding of the castore entry for the
    /// symlink fixture, as used in the path of `snix-castore` NAR URLs.
    pub static NAR_STR_SYMLINK: LazyLock<String> = LazyLock::new(|| {
        use prost::Message;

        let entry =
            snix_castore::proto::Entry::from_name_and_node("".into(), CASTORE_NODE_SYMLINK.clone());
        BASE64URL_NOPAD.encode(&entry.encode_to_vec())
    });

    /// Builds the `/nar/{hash}{suffix}` upload URL for the given NAR
    /// contents, where `{hash}` is the nixbase32-encoded SHA-256 digest of
    /// `contents`.
    fn nar_url(contents: &[u8], suffix: &str) -> String {
        let nar_sha256: [u8; 32] = sha2::Sha256::digest(contents).into();
        format!("/nar/{}{}", nixbase32::encode(&nar_sha256), suffix)
    }

    /// Accepts a router without state, and returns a [axum_test::TestServer]
    /// together with the (freshly created) services backing its state.
    fn gen_server(
        router: axum::Router<AppState>,
    ) -> (
        axum_test::TestServer,
        impl BlobService,
        impl DirectoryService,
        impl PathInfoService,
    ) {
        let blob_service = Arc::new(MemoryBlobService::default());
        let directory_service = Arc::new(gen_test_directory_service());
        let path_info_service = Arc::new(gen_test_pathinfo_service());

        let state = AppState::new(
            blob_service.clone(),
            directory_service.clone(),
            path_info_service.clone(),
            NonZero::new(100).unwrap(),
        );
        let server = axum_test::TestServer::new(router.with_state(state)).unwrap();

        (server, blob_service, directory_service, path_info_service)
    }

    /// Exercises the failure modes of `get_head`, then a successful HEAD
    /// and a successful GET of the symlink fixture.
    #[traced_test]
    #[tokio::test]
    async fn test_get_head() {
        let router = Router::new().route(
            "/nar/snix-castore/:root_node_enc",
            axum::routing::get(super::get_head),
        );
        let (server, _blob_service, _directory_service, _path_info_service) = gen_server(router);

        // Empty nar_str should be NotFound
        server
            .method(Method::HEAD, "/nar/snix-castore/")
            .expect_failure()
            .await
            .assert_status_not_found();

        let valid_url = format!("/nar/snix-castore/{}", &*NAR_STR_SYMLINK);
        let nar_size = NAR_CONTENTS_SYMLINK.len().to_string();
        let qps = &[("narsize", &nar_size)];

        // Missing narsize should be BadRequest
        server
            .method(Method::HEAD, &valid_url)
            .expect_failure()
            .await
            .assert_status_bad_request();

        // A directory entry whose digest is not a valid digest.
        let invalid_url = {
            use prost::Message;

            let directory_entry = snix_castore::proto::DirectoryEntry {
                name: "".into(),
                digest: "invalid b64".into(),
                size: 1,
            };
            let entry = snix_castore::proto::Entry {
                entry: Some(snix_castore::proto::entry::Entry::Directory(
                    directory_entry,
                )),
            };

            format!(
                "/nar/snix-castore/{}",
                BASE64URL_NOPAD.encode(&entry.encode_to_vec())
            )
        };

        // Invalid node proto should return NotFound
        server
            .method(Method::HEAD, &invalid_url)
            .add_query_params(qps)
            .expect_failure()
            .await
            .assert_status_not_found();

        // success, HEAD
        server
            .method(Method::HEAD, &valid_url)
            .add_query_params(qps)
            .expect_success()
            .await;

        // success, GET
        let body = server
            .get(&valid_url)
            .add_query_params(qps)
            .expect_success()
            .await
            .into_bytes();
        assert_eq!(
            NAR_CONTENTS_SYMLINK.as_slice(),
            body,
            "Expected to get back NAR_CONTENTS_SYMLINK"
        )
    }

    /// Uploading a NAR with a different file hash than what's specified in the URL
    /// is considered an error.
    #[traced_test]
    #[tokio::test]
    async fn test_put_wrong_narhash() {
        let router = Router::new().route("/nar/:nar_str", axum::routing::put(super::put));
        let (server, _blob_service, _directory_service, _path_info_service) = gen_server(router);

        server
            .put("/nar/0000000000000000000000000000000000000000000000000000.nar")
            .bytes(Bytes::from_static(&NAR_CONTENTS_SYMLINK))
            .expect_failure()
            .await;
    }

    /// Uploading a NAR with compression is not supported.
    #[traced_test]
    #[tokio::test]
    async fn test_put_with_compression_fail() {
        let router = Router::new().route("/nar/:nar_str", axum::routing::put(super::put));
        let (server, _blob_service, _directory_service, _path_info_service) = gen_server(router);

        let upload_url = nar_url(NAR_CONTENTS_SYMLINK.as_slice(), ".nar.zst");

        server
            .put(&upload_url)
            .bytes(Bytes::from_static(&NAR_CONTENTS_SYMLINK))
            .expect_failure()
            .await
            .assert_status_unauthorized();
    }

    /// Upload a NAR with a single file, ensure the blob exists later on.
    #[traced_test]
    #[tokio::test]
    async fn test_put_success() {
        let router = Router::new().route("/nar/:nar_str", axum::routing::put(super::put));
        let (server, blob_service, _directory_service, _path_info_service) = gen_server(router);

        let upload_url = nar_url(NAR_CONTENTS_HELLOWORLD.as_slice(), ".nar");

        server
            .put(&upload_url)
            .bytes(Bytes::from_static(&NAR_CONTENTS_HELLOWORLD))
            .expect_success()
            .await;

        let blob_present = blob_service
            .has(&HELLOWORLD_BLOB_DIGEST)
            .await
            .expect("blobservice");
        assert!(blob_present)
    }

    /// Upload a NAR with blobs and directories, ensure blobs and directories
    /// were uploaded, by rendering the NAR stream from the root node we know
    /// describes these contents.
    #[traced_test]
    #[tokio::test]
    async fn test_put_success2() {
        let router = Router::new().route("/nar/:nar_str", axum::routing::put(super::put));
        let (server, blob_service, directory_service, _path_info_service) = gen_server(router);

        let upload_url = nar_url(NAR_CONTENTS_COMPLICATED.as_slice(), ".nar");

        server
            .put(&upload_url)
            .bytes(Bytes::from_static(&NAR_CONTENTS_COMPLICATED))
            .expect_success()
            .await;

        let mut rendered = Vec::new();
        snix_store::nar::write_nar(
            &mut rendered,
            &CASTORE_NODE_COMPLICATED,
            blob_service,
            directory_service,
        )
        .await
        .expect("write nar");

        assert_eq!(NAR_CONTENTS_COMPLICATED, rendered[..]);
    }

    /// Upload a NAR, ensure a HEAD by NarHash returns a 2xx code.
    #[traced_test]
    #[tokio::test]
    async fn test_put_root_nodes() {
        let router = Router::new()
            .route("/nar/:nar_str", axum::routing::put(super::put))
            .route("/nar/:nar_str", axum::routing::get(super::head_root_nodes));
        let (server, _blob_service, _directory_service, _path_info_service) = gen_server(router);

        let upload_url = nar_url(NAR_CONTENTS_COMPLICATED.as_slice(), ".nar");

        // upload NAR
        server
            .put(&upload_url)
            .bytes(Bytes::from_static(&NAR_CONTENTS_COMPLICATED))
            .expect_success()
            .await;

        // check HEAD by NarHash
        server
            .method(Method::HEAD, &upload_url)
            .expect_success()
            .await;
    }
}