nar_bridge/
nar.rs

1use axum::extract::Query;
2use axum::http::{Response, StatusCode};
3use axum::{body::Body, response::IntoResponse};
4use axum_extra::{headers::Range, TypedHeader};
5use axum_range::{KnownSize, Ranged};
6use bytes::Bytes;
7use data_encoding::BASE64URL_NOPAD;
8use futures::TryStreamExt;
9use nix_compat::{nix_http, nixbase32};
10use serde::Deserialize;
11use snix_store::nar::ingest_nar_and_hash;
12use std::io;
13use tokio_util::io::ReaderStream;
14use tracing::{instrument, warn, Span};
15
16use crate::AppState;
17
18#[derive(Debug, Deserialize)]
19pub(crate) struct GetNARParams {
20    #[serde(rename = "narsize")]
21    nar_size: Option<u64>,
22}
23
24#[instrument(skip_all)]
25pub async fn get_head(
26    method: axum::http::Method,
27    ranges: Option<TypedHeader<Range>>,
28    axum::extract::Path(root_node_enc): axum::extract::Path<String>,
29    axum::extract::Query(GetNARParams { nar_size }): Query<GetNARParams>,
30    axum::extract::State(AppState {
31        blob_service,
32        directory_service,
33        ..
34    }): axum::extract::State<AppState>,
35) -> Result<impl axum::response::IntoResponse, StatusCode> {
36    use prost::Message;
37    // We insist on the nar_size field being set.
38    // If it's not present, the client is misbehaving somehow.
39    let nar_size = nar_size.ok_or_else(|| {
40        warn!("no nar_size parameter set");
41        StatusCode::BAD_REQUEST
42    })?;
43
44    // b64decode the root node passed *by the user*
45    let root_node_proto = BASE64URL_NOPAD
46        .decode(root_node_enc.as_bytes())
47        .map_err(|e| {
48            warn!(err=%e, "unable to decode root node b64");
49            StatusCode::NOT_FOUND
50        })?;
51
52    // check the proto size to be somewhat reasonable before parsing it.
53    if root_node_proto.len() > 4096 {
54        warn!("rejected too large root node");
55        return Err(StatusCode::BAD_REQUEST);
56    }
57
58    // parse the proto
59    let root_node: snix_castore::proto::Entry = Message::decode(Bytes::from(root_node_proto))
60        .map_err(|e| {
61            warn!(err=%e, "unable to decode root node proto");
62            StatusCode::NOT_FOUND
63        })?;
64
65    let root_node = root_node.try_into_anonymous_node().map_err(|e| {
66        warn!(err=%e, "root node validation failed");
67        StatusCode::NOT_FOUND
68    })?;
69
70    Ok((
71        // headers
72        [
73            ("cache-control", "max-age=31536000, immutable"),
74            ("content-type", nix_http::MIME_TYPE_NAR),
75        ],
76        if method == axum::http::Method::HEAD {
77            // If this is a HEAD request, construct a response returning back the
78            // user-provided content-length, but don't actually talk to castore.
79            Response::builder()
80                .header("content-length", nar_size)
81                .body(Body::empty())
82                .unwrap()
83        } else if let Some(TypedHeader(ranges)) = ranges {
84            // If this is a range request, construct a seekable NAR reader.
85            let r =
86                snix_store::nar::seekable::Reader::new(root_node, blob_service, directory_service)
87                    .await
88                    .map_err(|e| {
89                        warn!(err=%e, "failed to construct seekable nar reader");
90                        StatusCode::INTERNAL_SERVER_ERROR
91                    })?;
92
93            // ensure the user-supplied nar size was correct, no point returning data otherwise.
94            if r.stream_len() != nar_size {
95                warn!(
96                    actual_nar_size = r.stream_len(),
97                    supplied_nar_size = nar_size,
98                    "wrong nar size supplied"
99                );
100                return Err(StatusCode::BAD_REQUEST);
101            }
102            Ranged::new(Some(ranges), KnownSize::sized(r, nar_size)).into_response()
103        } else {
104            // use the non-seekable codepath if there's no range(s) requested,
105            // as it uses less memory.
106            let (w, r) = tokio::io::duplex(1024 * 8);
107
108            // spawn a task rendering the NAR to the client.
109            tokio::spawn(async move {
110                if let Err(e) =
111                    snix_store::nar::write_nar(w, &root_node, blob_service, directory_service).await
112                {
113                    warn!(err=%e, "failed to write out NAR");
114                }
115            });
116
117            Response::builder()
118                .header("content-length", nar_size)
119                .body(Body::from_stream(ReaderStream::new(r)))
120                .unwrap()
121        },
122    ))
123}
124
125/// Handler to respond to GET/HEAD requests for recently uploaded NAR files.
126/// Nix probes at {filehash}.nar[.compression_suffix] to determine whether a NAR
127/// has already been uploaded, by responding to (some of) these requests we
128/// avoid it unnecessarily uploading.
129/// We don't keep a full K/V from NAR hash to root note around, only the
130/// in-memory cache used to connect to the castore node when processing a PUT
131/// for the NARInfo.
132#[instrument(skip_all, fields(nar_str))]
133pub async fn head_root_nodes(
134    axum::extract::Path(nar_str): axum::extract::Path<String>,
135    axum::extract::State(AppState { root_nodes, .. }): axum::extract::State<AppState>,
136) -> Result<impl axum::response::IntoResponse, StatusCode> {
137    let (nar_hash, compression_suffix) =
138        nix_http::parse_nar_str(&nar_str).ok_or(StatusCode::UNAUTHORIZED)?;
139
140    // No paths with compression suffix are supported.
141    if !compression_suffix.is_empty() {
142        warn!(%compression_suffix, "invalid compression suffix requested");
143        return Err(StatusCode::UNAUTHORIZED);
144    }
145
146    // Check root_nodes, updating the moving it to the most recently used,
147    // as it might be referred in a subsequent NARInfo upload.
148    if root_nodes.write().get(&nar_hash).is_some() {
149        Ok("")
150    } else {
151        Err(StatusCode::NOT_FOUND)
152    }
153}
154
155#[instrument(skip_all)]
156pub async fn put(
157    axum::extract::Path(nar_str): axum::extract::Path<String>,
158    axum::extract::State(AppState {
159        blob_service,
160        directory_service,
161        root_nodes,
162        ..
163    }): axum::extract::State<AppState>,
164    request: axum::extract::Request,
165) -> Result<&'static str, StatusCode> {
166    let (nar_hash_expected, compression_suffix) =
167        nix_http::parse_nar_str(&nar_str).ok_or(StatusCode::UNAUTHORIZED)?;
168
169    // No paths with compression suffix are supported.
170    if !compression_suffix.is_empty() {
171        warn!(%compression_suffix, "invalid compression suffix requested");
172        return Err(StatusCode::UNAUTHORIZED);
173    }
174
175    let s = request.into_body().into_data_stream();
176
177    let mut r = tokio_util::io::StreamReader::new(s.map_err(|e| {
178        warn!(err=%e, "failed to read request body");
179        io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
180    }));
181
182    // ingest the NAR
183    let (root_node, nar_hash_actual, nar_size) = ingest_nar_and_hash(
184        blob_service.clone(),
185        directory_service.clone(),
186        &mut r,
187        &None,
188    )
189    .await
190    .map_err(io::Error::other)
191    .map_err(|e| {
192        warn!(err=%e, "failed to ingest nar");
193        StatusCode::INTERNAL_SERVER_ERROR
194    })?;
195
196    let s = Span::current();
197    s.record("nar_hash.expected", nixbase32::encode(&nar_hash_expected));
198    s.record("nar_size", nar_size);
199
200    if nar_hash_expected != nar_hash_actual {
201        warn!(
202            nar_hash.expected = nixbase32::encode(&nar_hash_expected),
203            nar_hash.actual = nixbase32::encode(&nar_hash_actual),
204            "nar hash mismatch"
205        );
206        return Err(StatusCode::BAD_REQUEST);
207    }
208
209    // store mapping of narhash to root node into root_nodes.
210    // we need it later to populate the root node when accepting the PathInfo.
211    root_nodes.write().put(nar_hash_actual, root_node);
212
213    Ok("")
214}
215
#[cfg(test)]
mod tests {
    use std::{
        num::NonZero,
        sync::{Arc, LazyLock},
    };

    use axum::{http::Method, Router};
    use bytes::Bytes;
    use data_encoding::BASE64URL_NOPAD;
    use nix_compat::nixbase32;
    use sha2::Digest;
    use snix_castore::{
        blobservice::{BlobService, MemoryBlobService},
        directoryservice::{DirectoryService, MemoryDirectoryService},
        fixtures::HELLOWORLD_BLOB_DIGEST,
    };
    use snix_store::{
        fixtures::{
            CASTORE_NODE_COMPLICATED, CASTORE_NODE_SYMLINK, NAR_CONTENTS_COMPLICATED,
            NAR_CONTENTS_HELLOWORLD, NAR_CONTENTS_SYMLINK,
        },
        pathinfoservice::{MemoryPathInfoService, PathInfoService},
    };
    use tracing_test::traced_test;

    use crate::AppState;

    /// URL path component describing [CASTORE_NODE_SYMLINK]: the node wrapped
    /// in a nameless proto `Entry`, encoded as unpadded URL-safe base64.
    pub static NAR_STR_SYMLINK: LazyLock<String> = LazyLock::new(|| {
        use prost::Message;
        BASE64URL_NOPAD.encode(
            &snix_castore::proto::Entry::from_name_and_node(
                "".into(),
                CASTORE_NODE_SYMLINK.clone(),
            )
            .encode_to_vec(),
        )
    });

    /// Accepts a router without state, and returns a [axum_test::TestServer]
    /// together with the in-memory services backing its state.
    fn gen_server(
        router: axum::Router<AppState>,
    ) -> (
        axum_test::TestServer,
        impl BlobService,
        impl DirectoryService,
        impl PathInfoService,
    ) {
        let blob_service = Arc::new(MemoryBlobService::default());
        let directory_service = Arc::new(MemoryDirectoryService::default());
        let path_info_service = Arc::new(MemoryPathInfoService::default());

        let app = router.with_state(AppState::new(
            blob_service.clone(),
            directory_service.clone(),
            path_info_service.clone(),
            NonZero::new(100).unwrap(),
        ));

        (
            axum_test::TestServer::new(app).unwrap(),
            blob_service,
            directory_service,
            path_info_service,
        )
    }

    #[traced_test]
    #[tokio::test]
    async fn test_get_head() {
        let (server, _blob_service, _directory_service, _path_info_service) =
            gen_server(Router::new().route(
                "/nar/snix-castore/:root_node_enc",
                axum::routing::get(super::get_head),
            ));

        // Empty root node should be NotFound
        server
            .method(Method::HEAD, "/nar/snix-castore/")
            .expect_failure()
            .await
            .assert_status_not_found();

        let valid_url = &format!("/nar/snix-castore/{}", &*NAR_STR_SYMLINK);
        let qps = &[("narsize", &NAR_CONTENTS_SYMLINK.len().to_string())];

        // Missing narsize should be BadRequest
        server
            .method(Method::HEAD, valid_url)
            .expect_failure()
            .await
            .assert_status_bad_request();

        let invalid_url = {
            use prost::Message;
            let n = snix_castore::proto::Entry {
                entry: Some(snix_castore::proto::entry::Entry::Directory(
                    snix_castore::proto::DirectoryEntry {
                        name: "".into(),
                        digest: "invalid b64".into(),
                        size: 1,
                    },
                )),
            };
            &format!(
                "/nar/snix-castore/{}",
                BASE64URL_NOPAD.encode(&n.encode_to_vec())
            )
        };

        // Invalid node proto should return NotFound
        server
            .method(Method::HEAD, invalid_url)
            .add_query_params(qps)
            .expect_failure()
            .await
            .assert_status_not_found();

        // success, HEAD
        server
            .method(Method::HEAD, valid_url)
            .add_query_params(qps)
            .expect_success()
            .await;

        // success, GET
        assert_eq!(
            NAR_CONTENTS_SYMLINK.as_slice(),
            server
                .get(valid_url)
                .add_query_params(qps)
                .expect_success()
                .await
                .into_bytes(),
            "Expected to get back NAR_CONTENTS_SYMLINK"
        )
    }

    /// Uploading a NAR with a different file hash than what's specified in the URL
    /// is considered an error.
    #[traced_test]
    #[tokio::test]
    async fn test_put_wrong_narhash() {
        let (server, _blob_service, _directory_service, _path_info_service) =
            gen_server(Router::new().route("/nar/:nar_str", axum::routing::put(super::put)));

        // A well-formed NAR URL whose (all-zero) hash cannot match the upload.
        // Building it from bytes guarantees the URL itself parses, so the
        // request can only fail because of the hash mismatch.
        let nar_url = format!("/nar/{}.nar", nixbase32::encode(&[0u8; 32]));

        server
            .put(&nar_url)
            .bytes(Bytes::from_static(&NAR_CONTENTS_SYMLINK))
            .expect_failure()
            .await
            .assert_status_bad_request();
    }

    /// Uploading a NAR with compression is not supported.
    #[traced_test]
    #[tokio::test]
    async fn test_put_with_compression_fail() {
        let (server, _blob_service, _directory_service, _path_info_service) =
            gen_server(Router::new().route("/nar/:nar_str", axum::routing::put(super::put)));

        let nar_sha256: [u8; 32] = sha2::Sha256::new_with_prefix(NAR_CONTENTS_SYMLINK.as_slice())
            .finalize()
            .into();

        let nar_url = format!("/nar/{}.nar.zst", nixbase32::encode(&nar_sha256));

        server
            .put(&nar_url)
            .bytes(Bytes::from_static(&NAR_CONTENTS_SYMLINK))
            .expect_failure()
            .await
            .assert_status_unauthorized();
    }

    /// Upload a NAR with a single file, ensure the blob exists later on.
    #[traced_test]
    #[tokio::test]
    async fn test_put_success() {
        let (server, blob_service, _directory_service, _path_info_service) =
            gen_server(Router::new().route("/nar/:nar_str", axum::routing::put(super::put)));

        let nar_sha256: [u8; 32] =
            sha2::Sha256::new_with_prefix(NAR_CONTENTS_HELLOWORLD.as_slice())
                .finalize()
                .into();

        let nar_url = format!("/nar/{}.nar", nixbase32::encode(&nar_sha256));

        server
            .put(&nar_url)
            .bytes(Bytes::from_static(&NAR_CONTENTS_HELLOWORLD))
            .expect_success()
            .await;

        assert!(blob_service
            .has(&HELLOWORLD_BLOB_DIGEST)
            .await
            .expect("blobservice"))
    }

    /// Upload a NAR with blobs and directories, ensure blobs and directories
    /// were uploaded, by rendering the NAR stream from the root node we know
    /// describes these contents.
    #[traced_test]
    #[tokio::test]
    async fn test_put_success2() {
        let (server, blob_service, directory_service, _path_info_service) =
            gen_server(Router::new().route("/nar/:nar_str", axum::routing::put(super::put)));

        let nar_sha256: [u8; 32] =
            sha2::Sha256::new_with_prefix(NAR_CONTENTS_COMPLICATED.as_slice())
                .finalize()
                .into();

        let nar_url = format!("/nar/{}.nar", nixbase32::encode(&nar_sha256));

        server
            .put(&nar_url)
            .bytes(Bytes::from_static(&NAR_CONTENTS_COMPLICATED))
            .expect_success()
            .await;

        let mut buf = Vec::new();
        snix_store::nar::write_nar(
            &mut buf,
            &CASTORE_NODE_COMPLICATED,
            blob_service,
            directory_service,
        )
        .await
        .expect("write nar");

        assert_eq!(NAR_CONTENTS_COMPLICATED, buf[..]);
    }

    /// Upload a NAR, ensure a HEAD by NarHash returns a 2xx code.
    #[traced_test]
    #[tokio::test]
    async fn test_put_root_nodes() {
        let (server, _blob_service, _directory_service, _path_info_service) = gen_server(
            Router::new()
                .route("/nar/:nar_str", axum::routing::put(super::put))
                .route("/nar/:nar_str", axum::routing::get(super::head_root_nodes)),
        );

        let nar_sha256: [u8; 32] =
            sha2::Sha256::new_with_prefix(NAR_CONTENTS_COMPLICATED.as_slice())
                .finalize()
                .into();

        let nar_url = format!("/nar/{}.nar", nixbase32::encode(&nar_sha256));

        // upload NAR
        server
            .put(&nar_url)
            .bytes(Bytes::from_static(&NAR_CONTENTS_COMPLICATED))
            .expect_success()
            .await;

        // check HEAD by NarHash
        server.method(Method::HEAD, &nar_url).expect_success().await;
    }
}