nar_bridge/
nar.rs

1use axum::extract::Query;
2use axum::http::{Response, StatusCode};
3use axum::{body::Body, response::IntoResponse};
4use axum_extra::{TypedHeader, headers::Range};
5use axum_range::{KnownSize, Ranged};
6use bytes::Bytes;
7use data_encoding::BASE64URL_NOPAD;
8use futures::TryStreamExt;
9use nix_compat::{nix_http, nixbase32};
10use serde::Deserialize;
11use snix_store::nar::ingest_nar_and_hash;
12use std::io;
13use tokio_util::io::ReaderStream;
14use tracing::{Span, instrument, warn};
15
16use crate::AppState;
17
18#[derive(Debug, Deserialize)]
19pub(crate) struct GetNARParams {
20    #[serde(rename = "narsize")]
21    nar_size: Option<u64>,
22}
23
24#[instrument(skip_all)]
25pub async fn get_head(
26    method: axum::http::Method,
27    ranges: Option<TypedHeader<Range>>,
28    axum::extract::Path(root_node_enc): axum::extract::Path<String>,
29    axum::extract::Query(GetNARParams { nar_size }): Query<GetNARParams>,
30    axum::extract::State(AppState {
31        blob_service,
32        directory_service,
33        ..
34    }): axum::extract::State<AppState>,
35) -> Result<impl axum::response::IntoResponse, StatusCode> {
36    use prost::Message;
37    // We insist on the nar_size field being set. If the client dropped it from
38    // the NARInfo we sent, it's misbehaving and we reject it.
39    let nar_size = nar_size.ok_or_else(|| {
40        warn!("no nar_size parameter set");
41        StatusCode::BAD_REQUEST
42    })?;
43
44    // b64decode the root node passed *by the user*
45    let root_node_proto = BASE64URL_NOPAD
46        .decode(root_node_enc.as_bytes())
47        .map_err(|e| {
48            warn!(err=%e, "unable to decode root node b64");
49            StatusCode::NOT_FOUND
50        })?;
51
52    // check the proto size to be somewhat reasonable before parsing it.
53    if root_node_proto.len() > 4096 {
54        warn!("rejected too large root node");
55        return Err(StatusCode::BAD_REQUEST);
56    }
57
58    // parse the proto
59    let root_node: snix_castore::proto::Entry = Message::decode(Bytes::from(root_node_proto))
60        .map_err(|e| {
61            warn!(err=%e, "unable to decode root node proto");
62            StatusCode::NOT_FOUND
63        })?;
64
65    let root_node = root_node.try_into_anonymous_node().map_err(|e| {
66        warn!(err=%e, "root node validation failed");
67        StatusCode::NOT_FOUND
68    })?;
69
70    Ok((
71        // headers
72        [
73            ("cache-control", "max-age=31536000, immutable"),
74            ("content-type", nix_http::MIME_TYPE_NAR),
75        ],
76        if method == axum::http::Method::HEAD {
77            // If this is a HEAD request, construct a response returning back the
78            // user-provided content-length, but don't actually talk to castore.
79            // If the client lied about it, we will echo back a wrong `Content-Length`,
80            // which is their problem.
81            Response::builder()
82                .header("content-length", nar_size)
83                .body(Body::empty())
84                .unwrap()
85        } else if let Some(TypedHeader(ranges)) = ranges {
86            // If this is a range request, construct a seekable NAR reader.
87            let r =
88                snix_store::nar::seekable::Reader::new(root_node, blob_service, directory_service)
89                    .await
90                    .map_err(|e| {
91                        warn!(err=%e, "failed to construct seekable nar reader");
92                        StatusCode::INTERNAL_SERVER_ERROR
93                    })?;
94
95            // ensure the user-supplied nar size was correct, no point returning data otherwise.
96            if r.stream_len() != nar_size {
97                warn!(
98                    actual_nar_size = r.stream_len(),
99                    supplied_nar_size = nar_size,
100                    "wrong nar size supplied"
101                );
102                return Err(StatusCode::BAD_REQUEST);
103            }
104            Ranged::new(Some(ranges), KnownSize::sized(r, nar_size)).into_response()
105        } else {
106            // use the non-seekable codepath if there's no range(s) requested,
107            // as it uses less memory.
108            let (w, r) = tokio::io::duplex(1024 * 8);
109
110            // spawn a task rendering the NAR to the client.
111            tokio::spawn(async move {
112                if let Err(e) =
113                    snix_store::nar::write_nar(w, &root_node, blob_service, directory_service).await
114                {
115                    warn!(err=%e, "failed to write out NAR");
116                }
117            });
118
119            Response::builder()
120                // If the client lied about it, we will echo back a wrong `Content-Length`,
121                // which is their problem.
122                .header("content-length", nar_size)
123                .body(Body::from_stream(ReaderStream::new(r)))
124                .unwrap()
125        },
126    ))
127}
128
129/// Handler to respond to GET/HEAD requests for recently uploaded NAR files.
130/// Nix probes at {filehash}.nar[.compression_suffix] to determine whether a NAR
131/// has already been uploaded, by responding to (some of) these requests we
132/// avoid it unnecessarily uploading.
133/// We don't keep a full K/V from NAR hash to root note around, only the
134/// in-memory cache used to connect to the castore node when processing a PUT
135/// for the NARInfo.
136#[instrument(skip_all, fields(nar_str))]
137pub async fn head_root_nodes(
138    axum::extract::Path(nar_str): axum::extract::Path<String>,
139    axum::extract::State(AppState { root_nodes, .. }): axum::extract::State<AppState>,
140) -> Result<impl axum::response::IntoResponse, StatusCode> {
141    let (nar_hash, compression_suffix) =
142        nix_http::parse_nar_str(&nar_str).ok_or(StatusCode::UNAUTHORIZED)?;
143
144    // No paths with compression suffix are supported.
145    if !compression_suffix.is_empty() {
146        warn!(%compression_suffix, "invalid compression suffix requested");
147        return Err(StatusCode::UNAUTHORIZED);
148    }
149
150    // Check root_nodes, updating the moving it to the most recently used,
151    // as it might be referred in a subsequent NARInfo upload.
152    if root_nodes.write().get(&nar_hash).is_some() {
153        Ok("")
154    } else {
155        Err(StatusCode::NOT_FOUND)
156    }
157}
158
159#[instrument(skip_all)]
160pub async fn put(
161    axum::extract::Path(nar_str): axum::extract::Path<String>,
162    axum::extract::State(AppState {
163        blob_service,
164        directory_service,
165        root_nodes,
166        ..
167    }): axum::extract::State<AppState>,
168    request: axum::extract::Request,
169) -> Result<&'static str, StatusCode> {
170    let (nar_hash_expected, compression_suffix) =
171        nix_http::parse_nar_str(&nar_str).ok_or(StatusCode::UNAUTHORIZED)?;
172
173    // No paths with compression suffix are supported.
174    if !compression_suffix.is_empty() {
175        warn!(%compression_suffix, "invalid compression suffix requested");
176        return Err(StatusCode::UNAUTHORIZED);
177    }
178
179    let s = request.into_body().into_data_stream();
180
181    let mut r = tokio_util::io::StreamReader::new(s.map_err(|e| {
182        warn!(err=%e, "failed to read request body");
183        io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
184    }));
185
186    // ingest the NAR
187    let (root_node, nar_hash_actual, nar_size) = ingest_nar_and_hash(
188        blob_service.clone(),
189        directory_service.clone(),
190        &mut r,
191        &None,
192    )
193    .await
194    .map_err(io::Error::other)
195    .map_err(|e| {
196        warn!(err=%e, "failed to ingest nar");
197        StatusCode::INTERNAL_SERVER_ERROR
198    })?;
199
200    let s = Span::current();
201    s.record("nar_hash.expected", nixbase32::encode(&nar_hash_expected));
202    s.record("nar_size", nar_size);
203
204    if nar_hash_expected != nar_hash_actual {
205        warn!(
206            nar_hash.expected = nixbase32::encode(&nar_hash_expected),
207            nar_hash.actual = nixbase32::encode(&nar_hash_actual),
208            "nar hash mismatch"
209        );
210        return Err(StatusCode::BAD_REQUEST);
211    }
212
213    // store mapping of narhash to root node into root_nodes.
214    // we need it later to populate the root node when accepting the PathInfo.
215    root_nodes.write().put(nar_hash_actual, root_node);
216
217    Ok("")
218}
219
#[cfg(test)]
mod tests {
    use std::{
        num::NonZero,
        sync::{Arc, LazyLock},
    };

    use axum::{Router, http::Method};
    use bytes::Bytes;
    use data_encoding::BASE64URL_NOPAD;
    use nix_compat::nixbase32;
    use sha2::Digest;
    use snix_castore::{
        blobservice::{BlobService, MemoryBlobService},
        directoryservice::{DirectoryService, MemoryDirectoryService},
        fixtures::HELLOWORLD_BLOB_DIGEST,
    };
    use snix_store::{
        fixtures::{
            CASTORE_NODE_COMPLICATED, CASTORE_NODE_SYMLINK, NAR_CONTENTS_COMPLICATED,
            NAR_CONTENTS_HELLOWORLD, NAR_CONTENTS_SYMLINK,
        },
        pathinfoservice::{MemoryPathInfoService, PathInfoService},
    };
    use tracing_test::traced_test;

    use crate::AppState;

    /// The symlink fixture node, proto-encoded with an empty name and then
    /// base64url-encoded (no padding), i.e. in the form `get_head` expects in
    /// the URL path.
    pub static NAR_STR_SYMLINK: LazyLock<String> = LazyLock::new(|| {
        use prost::Message;
        BASE64URL_NOPAD.encode(
            &snix_castore::proto::Entry::from_name_and_node(
                "".into(),
                CASTORE_NODE_SYMLINK.clone(),
            )
            .encode_to_vec(),
        )
    });

    /// Accepts a router without state, and returns a [axum_test::TestServer]
    /// together with the in-memory blob, directory and path info services
    /// backing it, so tests can inspect what the handlers stored.
    fn gen_server(
        router: axum::Router<AppState>,
    ) -> (
        axum_test::TestServer,
        impl BlobService,
        impl DirectoryService,
        impl PathInfoService,
    ) {
        let blob_service = Arc::new(MemoryBlobService::default());
        let directory_service = Arc::new(MemoryDirectoryService::default());
        let path_info_service = Arc::new(MemoryPathInfoService::default());

        let app = router.with_state(AppState::new(
            blob_service.clone(),
            directory_service.clone(),
            path_info_service.clone(),
            NonZero::new(100).unwrap(),
        ));

        (
            axum_test::TestServer::new(app).unwrap(),
            blob_service,
            directory_service,
            path_info_service,
        )
    }

    /// Exercises the error paths and the HEAD/GET success paths of `get_head`.
    #[traced_test]
    #[tokio::test]
    async fn test_get_head() {
        let (server, _blob_service, _directory_service, _path_info_service) =
            gen_server(Router::new().route(
                "/nar/snix-castore/:root_node_enc",
                axum::routing::get(super::get_head),
            ));

        // Empty nar_str should be NotFound
        server
            .method(Method::HEAD, "/nar/snix-castore/")
            .expect_failure()
            .await
            .assert_status_not_found();

        let valid_url = &format!("/nar/snix-castore/{}", &*NAR_STR_SYMLINK);
        let qps = &[("narsize", &NAR_CONTENTS_SYMLINK.len().to_string())];

        // Missing narsize should be BadRequest
        server
            .method(Method::HEAD, valid_url)
            .expect_failure()
            .await
            .assert_status_bad_request();

        let invalid_url = {
            use prost::Message;
            let n = snix_castore::proto::Entry {
                entry: Some(snix_castore::proto::entry::Entry::Directory(
                    snix_castore::proto::DirectoryEntry {
                        name: "".into(),
                        digest: "invalid b64".into(),
                        size: 1,
                    },
                )),
            };
            &format!(
                "/nar/snix-castore/{}",
                BASE64URL_NOPAD.encode(&n.encode_to_vec())
            )
        };

        // Invalid node proto should return NotFound
        server
            .method(Method::HEAD, invalid_url)
            .add_query_params(qps)
            .expect_failure()
            .await
            .assert_status_not_found();

        // success, HEAD
        server
            .method(Method::HEAD, valid_url)
            .add_query_params(qps)
            .expect_success()
            .await;

        // success, GET
        assert_eq!(
            NAR_CONTENTS_SYMLINK.as_slice(),
            server
                .get(valid_url)
                .add_query_params(qps)
                .expect_success()
                .await
                .into_bytes(),
            "Expected to get back NAR_CONTENTS_SYMLINK"
        )
    }

    /// Uploading a NAR with a different file hash than what's specified in the URL
    /// is considered an error.
    #[traced_test]
    #[tokio::test]
    async fn test_put_wrong_narhash() {
        let (server, _blob_service, _directory_service, _path_info_service) =
            gen_server(Router::new().route("/nar/:nar_str", axum::routing::put(super::put)));

        // The URL parses fine and the NAR ingests fine; only the hash
        // comparison fails, which the handler answers with BadRequest.
        server
            .put("/nar/0000000000000000000000000000000000000000000000000000.nar")
            .bytes(Bytes::from_static(&NAR_CONTENTS_SYMLINK))
            .expect_failure()
            .await
            .assert_status_bad_request();
    }

    /// Uploading a NAR with compression is not supported.
    #[traced_test]
    #[tokio::test]
    async fn test_put_with_compression_fail() {
        let (server, _blob_service, _directory_service, _path_info_service) =
            gen_server(Router::new().route("/nar/:nar_str", axum::routing::put(super::put)));

        let nar_sha256: [u8; 32] = sha2::Sha256::new_with_prefix(NAR_CONTENTS_SYMLINK.as_slice())
            .finalize()
            .into();

        let nar_url = format!("/nar/{}.nar.zst", nixbase32::encode(&nar_sha256));

        server
            .put(&nar_url)
            .bytes(Bytes::from_static(&NAR_CONTENTS_SYMLINK))
            .expect_failure()
            .await
            .assert_status_unauthorized();
    }

    /// Upload a NAR with a single file, ensure the blob exists later on.
    #[traced_test]
    #[tokio::test]
    async fn test_put_success() {
        let (server, blob_service, _directory_service, _path_info_service) =
            gen_server(Router::new().route("/nar/:nar_str", axum::routing::put(super::put)));

        let nar_sha256: [u8; 32] =
            sha2::Sha256::new_with_prefix(NAR_CONTENTS_HELLOWORLD.as_slice())
                .finalize()
                .into();

        let nar_url = format!("/nar/{}.nar", nixbase32::encode(&nar_sha256));

        server
            .put(&nar_url)
            .bytes(Bytes::from_static(&NAR_CONTENTS_HELLOWORLD))
            .expect_success()
            .await;

        assert!(
            blob_service
                .has(&HELLOWORLD_BLOB_DIGEST)
                .await
                .expect("blobservice")
        )
    }

    /// Upload a NAR with blobs and directories, ensure blobs and directories
    /// were uploaded, by rendering the NAR stream from the root node we know
    /// describes these contents.
    #[traced_test]
    #[tokio::test]
    async fn test_put_success2() {
        let (server, blob_service, directory_service, _path_info_service) =
            gen_server(Router::new().route("/nar/:nar_str", axum::routing::put(super::put)));

        let nar_sha256: [u8; 32] =
            sha2::Sha256::new_with_prefix(NAR_CONTENTS_COMPLICATED.as_slice())
                .finalize()
                .into();

        let nar_url = format!("/nar/{}.nar", nixbase32::encode(&nar_sha256));

        server
            .put(&nar_url)
            .bytes(Bytes::from_static(&NAR_CONTENTS_COMPLICATED))
            .expect_success()
            .await;

        let mut buf = Vec::new();
        snix_store::nar::write_nar(
            &mut buf,
            &CASTORE_NODE_COMPLICATED,
            blob_service,
            directory_service,
        )
        .await
        .expect("write nar");

        assert_eq!(NAR_CONTENTS_COMPLICATED, buf[..]);
    }

    /// Upload a NAR, ensure a HEAD by NarHash returns a 2xx code.
    #[traced_test]
    #[tokio::test]
    async fn test_put_root_nodes() {
        let (server, _blob_service, _directory_service, _path_info_service) = gen_server(
            Router::new()
                .route("/nar/:nar_str", axum::routing::put(super::put))
                .route("/nar/:nar_str", axum::routing::get(super::head_root_nodes)),
        );

        let nar_sha256: [u8; 32] =
            sha2::Sha256::new_with_prefix(NAR_CONTENTS_COMPLICATED.as_slice())
                .finalize()
                .into();

        let nar_url = format!("/nar/{}.nar", nixbase32::encode(&nar_sha256));

        // upload NAR
        server
            .put(&nar_url)
            .bytes(Bytes::from_static(&NAR_CONTENTS_COMPLICATED))
            .expect_success()
            .await;

        // check HEAD by NarHash
        server.method(Method::HEAD, &nar_url).expect_success().await;
    }
}