nar_bridge/
nar.rs

1use axum::extract::Query;
2use axum::http::{Response, StatusCode};
3use axum::{body::Body, response::IntoResponse};
4use axum_extra::{TypedHeader, headers::Range};
5use axum_range::{KnownSize, Ranged};
6use futures::TryStreamExt;
7use nix_compat::{nix_http, nixbase32};
8use serde::Deserialize;
9use snix_castore::proto::parse_urlsafe_proto;
10use snix_store::nar::ingest_nar_and_hash;
11use std::io;
12use tokio_util::io::ReaderStream;
13use tracing::{Span, instrument, warn};
14
15use crate::AppState;
16
17#[derive(Debug, Deserialize)]
18pub(crate) struct GetNARParams {
19    #[serde(rename = "narsize")]
20    nar_size: Option<u64>,
21}
22
23#[instrument(skip_all)]
24pub async fn get_head(
25    method: axum::http::Method,
26    ranges: Option<TypedHeader<Range>>,
27    axum::extract::Path(root_node_enc): axum::extract::Path<String>,
28    axum::extract::Query(GetNARParams { nar_size }): Query<GetNARParams>,
29    axum::extract::State(AppState {
30        blob_service,
31        directory_service,
32        ..
33    }): axum::extract::State<AppState>,
34) -> Result<impl axum::response::IntoResponse, StatusCode> {
35    // We insist on the nar_size field being set. If the client dropped it from
36    // the NARInfo we sent, it's misbehaving and we reject it.
37    let nar_size = nar_size.ok_or_else(|| {
38        warn!("no nar_size parameter set");
39        StatusCode::BAD_REQUEST
40    })?;
41
42    // Attempt to parse the root node passed *by the user*
43    let root_node = parse_urlsafe_proto(root_node_enc).ok_or_else(|| {
44        warn!("unable to parse castore-infused NAR path");
45        StatusCode::NOT_FOUND
46    })?;
47
48    Ok((
49        // headers
50        [
51            ("cache-control", "max-age=31536000, immutable"),
52            ("content-type", nix_http::MIME_TYPE_NAR),
53        ],
54        if method == axum::http::Method::HEAD {
55            // If this is a HEAD request, construct a response returning back the
56            // user-provided content-length, but don't actually talk to castore.
57            // If the client lied about it, we will echo back a wrong `Content-Length`,
58            // which is their problem.
59            Response::builder()
60                .header("content-length", nar_size)
61                .body(Body::empty())
62                .unwrap()
63        } else if let Some(TypedHeader(ranges)) = ranges {
64            // If this is a range request, construct a seekable NAR reader.
65            let r =
66                snix_store::nar::seekable::Reader::new(root_node, blob_service, directory_service)
67                    .await
68                    .map_err(|e| {
69                        warn!(err=%e, "failed to construct seekable nar reader");
70                        StatusCode::INTERNAL_SERVER_ERROR
71                    })?;
72
73            // ensure the user-supplied nar size was correct, no point returning data otherwise.
74            if r.stream_len() != nar_size {
75                warn!(
76                    actual_nar_size = r.stream_len(),
77                    supplied_nar_size = nar_size,
78                    "wrong nar size supplied"
79                );
80                return Err(StatusCode::BAD_REQUEST);
81            }
82            Ranged::new(Some(ranges), KnownSize::sized(r, nar_size)).into_response()
83        } else {
84            // use the non-seekable codepath if there's no range(s) requested,
85            // as it uses less memory.
86            let (w, r) = tokio::io::duplex(1024 * 8);
87
88            // spawn a task rendering the NAR to the client.
89            tokio::spawn(async move {
90                if let Err(e) =
91                    snix_store::nar::write_nar(w, &root_node, blob_service, directory_service).await
92                {
93                    warn!(err=%e, "failed to write out NAR");
94                }
95            });
96
97            Response::builder()
98                // If the client lied about it, we will echo back a wrong `Content-Length`,
99                // which is their problem.
100                .header("content-length", nar_size)
101                .body(Body::from_stream(ReaderStream::new(r)))
102                .unwrap()
103        },
104    ))
105}
106
107/// Handler to respond to GET/HEAD requests for recently uploaded NAR files.
108/// Nix probes at {filehash}.nar[.compression_suffix] to determine whether a NAR
109/// has already been uploaded, by responding to (some of) these requests we
110/// avoid it unnecessarily uploading.
111/// We don't keep a full K/V from NAR hash to root note around, only the
112/// in-memory cache used to connect to the castore node when processing a PUT
113/// for the NARInfo.
114#[instrument(skip_all, fields(nar_str))]
115pub async fn head_root_nodes(
116    axum::extract::Path(nar_str): axum::extract::Path<String>,
117    axum::extract::State(AppState { root_nodes, .. }): axum::extract::State<AppState>,
118) -> Result<impl axum::response::IntoResponse, StatusCode> {
119    let (nar_hash, compression_suffix) =
120        nix_http::parse_nar_str(&nar_str).ok_or(StatusCode::UNAUTHORIZED)?;
121
122    // No paths with compression suffix are supported.
123    if !compression_suffix.is_empty() {
124        warn!(%compression_suffix, "invalid compression suffix requested");
125        return Err(StatusCode::UNAUTHORIZED);
126    }
127
128    // Check root_nodes, updating the moving it to the most recently used,
129    // as it might be referred in a subsequent NARInfo upload.
130    if root_nodes.write().get(&nar_hash).is_some() {
131        Ok("")
132    } else {
133        Err(StatusCode::NOT_FOUND)
134    }
135}
136
137#[instrument(skip_all)]
138pub async fn put(
139    axum::extract::Path(nar_str): axum::extract::Path<String>,
140    axum::extract::State(AppState {
141        blob_service,
142        directory_service,
143        root_nodes,
144        ..
145    }): axum::extract::State<AppState>,
146    request: axum::extract::Request,
147) -> Result<&'static str, StatusCode> {
148    let (nar_hash_expected, compression_suffix) =
149        nix_http::parse_nar_str(&nar_str).ok_or(StatusCode::UNAUTHORIZED)?;
150
151    // No paths with compression suffix are supported.
152    if !compression_suffix.is_empty() {
153        warn!(%compression_suffix, "invalid compression suffix requested");
154        return Err(StatusCode::UNAUTHORIZED);
155    }
156
157    let s = request.into_body().into_data_stream();
158
159    let mut r = tokio_util::io::StreamReader::new(s.map_err(|e| {
160        warn!(err=%e, "failed to read request body");
161        io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
162    }));
163
164    // ingest the NAR
165    let (root_node, nar_hash_actual, nar_size) = ingest_nar_and_hash(
166        blob_service.clone(),
167        directory_service.clone(),
168        &mut r,
169        &None,
170    )
171    .await
172    .map_err(io::Error::other)
173    .map_err(|e| {
174        warn!(err=%e, "failed to ingest nar");
175        StatusCode::INTERNAL_SERVER_ERROR
176    })?;
177
178    let s = Span::current();
179    s.record("nar_hash.expected", nixbase32::encode(&nar_hash_expected));
180    s.record("nar_size", nar_size);
181
182    if nar_hash_expected != nar_hash_actual {
183        warn!(
184            nar_hash.expected = nixbase32::encode(&nar_hash_expected),
185            nar_hash.actual = nixbase32::encode(&nar_hash_actual),
186            "nar hash mismatch"
187        );
188        return Err(StatusCode::BAD_REQUEST);
189    }
190
191    // store mapping of narhash to root node into root_nodes.
192    // we need it later to populate the root node when accepting the PathInfo.
193    root_nodes.write().put(nar_hash_actual, root_node);
194
195    Ok("")
196}
197
198#[cfg(test)]
199mod tests {
200    use std::{
201        num::NonZero,
202        sync::{Arc, LazyLock},
203    };
204
205    use axum::{Router, http::Method};
206    use bytes::Bytes;
207    use data_encoding::BASE64URL_NOPAD;
208    use nix_compat::nixbase32;
209    use sha2::Digest;
210    use snix_castore::{
211        blobservice::{BlobService, MemoryBlobService},
212        directoryservice::DirectoryService,
213        fixtures::HELLOWORLD_BLOB_DIGEST,
214        utils::gen_test_directory_service,
215    };
216    use snix_store::{
217        fixtures::{
218            CASTORE_NODE_COMPLICATED, CASTORE_NODE_SYMLINK, NAR_CONTENTS_COMPLICATED,
219            NAR_CONTENTS_HELLOWORLD, NAR_CONTENTS_SYMLINK,
220        },
221        pathinfoservice::PathInfoService,
222        utils::gen_test_pathinfo_service,
223    };
224    use tracing_test::traced_test;
225
226    use crate::AppState;
227
228    pub static NAR_STR_SYMLINK: LazyLock<String> = LazyLock::new(|| {
229        use prost::Message;
230        BASE64URL_NOPAD.encode(
231            &snix_castore::proto::Entry::from_name_and_node(
232                "".into(),
233                CASTORE_NODE_SYMLINK.clone(),
234            )
235            .encode_to_vec(),
236        )
237    });
238
239    /// Accepts a router without state, and returns a [axum_test::TestServer].
240    /// Also returns the underlying services, so they can be poked with during testing.
241    fn gen_server(
242        router: axum::Router<AppState>,
243    ) -> (
244        axum_test::TestServer,
245        impl BlobService,
246        impl DirectoryService,
247        impl PathInfoService,
248    ) {
249        let blob_service = Arc::new(MemoryBlobService::default());
250        let directory_service = Arc::new(gen_test_directory_service());
251        let path_info_service = Arc::new(gen_test_pathinfo_service());
252
253        let app = router.with_state(AppState::new(
254            blob_service.clone(),
255            directory_service.clone(),
256            path_info_service.clone(),
257            NonZero::new(100).unwrap(),
258        ));
259
260        (
261            axum_test::TestServer::new(app),
262            blob_service,
263            directory_service,
264            path_info_service,
265        )
266    }
267
268    #[traced_test]
269    #[tokio::test]
270    async fn test_get_head() {
271        let (server, _blob_service, _directory_service, _path_info_service) =
272            gen_server(Router::new().route(
273                "/nar/snix-castore/{root_node_enc}",
274                axum::routing::get(super::get_head),
275            ));
276
277        // Empty nar_str should be NotFound
278        server
279            .method(Method::HEAD, "/nar/snix-castore/")
280            .expect_failure()
281            .await
282            .assert_status_not_found();
283
284        let valid_url = &format!("/nar/snix-castore/{}", &*NAR_STR_SYMLINK);
285        let qps = &[("narsize", &NAR_CONTENTS_SYMLINK.len().to_string())];
286
287        // Missing narsize should be BadRequest
288        server
289            .method(Method::HEAD, valid_url)
290            .expect_failure()
291            .await
292            .assert_status_bad_request();
293
294        let invalid_url = {
295            use prost::Message;
296            let n = snix_castore::proto::Entry {
297                entry: Some(snix_castore::proto::entry::Entry::Directory(
298                    snix_castore::proto::DirectoryEntry {
299                        name: "".into(),
300                        digest: "invalid b64".into(),
301                        size: 1,
302                    },
303                )),
304            };
305            &format!(
306                "/nar/snix-castore/{}",
307                BASE64URL_NOPAD.encode(&n.encode_to_vec())
308            )
309        };
310
311        // Invalid node proto should return NotFound
312        server
313            .method(Method::HEAD, invalid_url)
314            .add_query_params(qps)
315            .expect_failure()
316            .await
317            .assert_status_not_found();
318
319        // success, HEAD
320        server
321            .method(Method::HEAD, valid_url)
322            .add_query_params(qps)
323            .expect_success()
324            .await;
325
326        // success, GET
327        assert_eq!(
328            NAR_CONTENTS_SYMLINK.as_slice(),
329            server
330                .get(valid_url)
331                .add_query_params(qps)
332                .expect_success()
333                .await
334                .into_bytes(),
335            "Expected to get back NAR_CONTENTS_SYMLINK"
336        )
337    }
338
339    /// Uploading a NAR with a different file hash than what's specified in the URL
340    /// is considered an error.
341    #[traced_test]
342    #[tokio::test]
343    async fn test_put_wrong_narhash() {
344        let (server, _blob_service, _directory_service, _path_info_service) =
345            gen_server(Router::new().route("/nar/{nar_str}", axum::routing::put(super::put)));
346
347        server
348            .put("/nar/0000000000000000000000000000000000000000000000000000.nar")
349            .bytes(Bytes::from_static(&NAR_CONTENTS_SYMLINK))
350            .expect_failure()
351            .await;
352    }
353
354    /// Uploading a NAR with compression is not supported.
355    #[traced_test]
356    #[tokio::test]
357    async fn test_put_with_compression_fail() {
358        let (server, _blob_service, _directory_service, _path_info_service) =
359            gen_server(Router::new().route("/nar/{nar_str}", axum::routing::put(super::put)));
360
361        let nar_sha256: [u8; 32] = sha2::Sha256::digest(NAR_CONTENTS_SYMLINK.as_slice()).into();
362
363        let nar_url = format!("/nar/{}.nar.zst", nixbase32::encode(&nar_sha256));
364
365        server
366            .put(&nar_url)
367            .bytes(Bytes::from_static(&NAR_CONTENTS_SYMLINK))
368            .expect_failure()
369            .await
370            .assert_status_unauthorized();
371    }
372
373    /// Upload a NAR with a single file, ensure the blob exists later on.
374    #[traced_test]
375    #[tokio::test]
376    async fn test_put_success() {
377        let (server, blob_service, _directory_service, _path_info_service) =
378            gen_server(Router::new().route("/nar/{nar_str}", axum::routing::put(super::put)));
379
380        let nar_sha256: [u8; 32] = sha2::Sha256::digest(NAR_CONTENTS_HELLOWORLD.as_slice()).into();
381        let nar_url = format!("/nar/{}.nar", nixbase32::encode(&nar_sha256));
382
383        server
384            .put(&nar_url)
385            .bytes(Bytes::from_static(&NAR_CONTENTS_HELLOWORLD))
386            .expect_success()
387            .await;
388
389        assert!(
390            blob_service
391                .has(&HELLOWORLD_BLOB_DIGEST)
392                .await
393                .expect("blobservice")
394        )
395    }
396
397    // Upload a NAR with blobs and directories, ensure blobs and directories
398    // were uploaded, by rendering the NAR stream from the root node we know
399    // describes these contents.
400    #[traced_test]
401    #[tokio::test]
402    async fn test_put_success2() {
403        let (server, blob_service, directory_service, _path_info_service) =
404            gen_server(Router::new().route("/nar/{nar_str}", axum::routing::put(super::put)));
405
406        let nar_sha256: [u8; 32] = sha2::Sha256::digest(NAR_CONTENTS_COMPLICATED.as_slice()).into();
407        let nar_url = format!("/nar/{}.nar", nixbase32::encode(&nar_sha256));
408
409        server
410            .put(&nar_url)
411            .bytes(Bytes::from_static(&NAR_CONTENTS_COMPLICATED))
412            .expect_success()
413            .await;
414
415        let mut buf = Vec::new();
416        snix_store::nar::write_nar(
417            &mut buf,
418            &CASTORE_NODE_COMPLICATED,
419            blob_service,
420            directory_service,
421        )
422        .await
423        .expect("write nar");
424
425        assert_eq!(NAR_CONTENTS_COMPLICATED, buf[..]);
426    }
427
428    /// Upload a NAR, ensure a HEAD by NarHash returns a 2xx code.
429    #[traced_test]
430    #[tokio::test]
431    async fn test_put_root_nodes() {
432        let (server, _blob_service, _directory_servicee, _path_info_service) = gen_server(
433            Router::new()
434                .route("/nar/{nar_str}", axum::routing::put(super::put))
435                .route("/nar/{nar_str}", axum::routing::get(super::head_root_nodes)),
436        );
437
438        let nar_sha256: [u8; 32] = sha2::Sha256::digest(NAR_CONTENTS_COMPLICATED.as_slice()).into();
439        let nar_url = format!("/nar/{}.nar", nixbase32::encode(&nar_sha256));
440
441        // upload NAR
442        server
443            .put(&nar_url)
444            .bytes(Bytes::from_static(&NAR_CONTENTS_COMPLICATED))
445            .expect_success()
446            .await;
447
448        // check HEAD by NarHash
449        server.method(Method::HEAD, &nar_url).expect_success().await;
450    }
451}