1use axum::extract::Query;
2use axum::http::{Response, StatusCode};
3use axum::{body::Body, response::IntoResponse};
4use axum_extra::{TypedHeader, headers::Range};
5use axum_range::{KnownSize, Ranged};
6use bytes::Bytes;
7use data_encoding::BASE64URL_NOPAD;
8use futures::TryStreamExt;
9use nix_compat::{nix_http, nixbase32};
10use serde::Deserialize;
11use snix_store::nar::ingest_nar_and_hash;
12use std::io;
13use tokio_util::io::ReaderStream;
14use tracing::{Span, instrument, warn};
15
16use crate::AppState;
17
18#[derive(Debug, Deserialize)]
19pub(crate) struct GetNARParams {
20 #[serde(rename = "narsize")]
21 nar_size: Option<u64>,
22}
23
24#[instrument(skip_all)]
25pub async fn get_head(
26 method: axum::http::Method,
27 ranges: Option<TypedHeader<Range>>,
28 axum::extract::Path(root_node_enc): axum::extract::Path<String>,
29 axum::extract::Query(GetNARParams { nar_size }): Query<GetNARParams>,
30 axum::extract::State(AppState {
31 blob_service,
32 directory_service,
33 ..
34 }): axum::extract::State<AppState>,
35) -> Result<impl axum::response::IntoResponse, StatusCode> {
36 use prost::Message;
37 let nar_size = nar_size.ok_or_else(|| {
40 warn!("no nar_size parameter set");
41 StatusCode::BAD_REQUEST
42 })?;
43
44 let root_node_proto = BASE64URL_NOPAD
46 .decode(root_node_enc.as_bytes())
47 .map_err(|e| {
48 warn!(err=%e, "unable to decode root node b64");
49 StatusCode::NOT_FOUND
50 })?;
51
52 if root_node_proto.len() > 4096 {
54 warn!("rejected too large root node");
55 return Err(StatusCode::BAD_REQUEST);
56 }
57
58 let root_node: snix_castore::proto::Entry = Message::decode(Bytes::from(root_node_proto))
60 .map_err(|e| {
61 warn!(err=%e, "unable to decode root node proto");
62 StatusCode::NOT_FOUND
63 })?;
64
65 let root_node = root_node.try_into_anonymous_node().map_err(|e| {
66 warn!(err=%e, "root node validation failed");
67 StatusCode::NOT_FOUND
68 })?;
69
70 Ok((
71 [
73 ("cache-control", "max-age=31536000, immutable"),
74 ("content-type", nix_http::MIME_TYPE_NAR),
75 ],
76 if method == axum::http::Method::HEAD {
77 Response::builder()
82 .header("content-length", nar_size)
83 .body(Body::empty())
84 .unwrap()
85 } else if let Some(TypedHeader(ranges)) = ranges {
86 let r =
88 snix_store::nar::seekable::Reader::new(root_node, blob_service, directory_service)
89 .await
90 .map_err(|e| {
91 warn!(err=%e, "failed to construct seekable nar reader");
92 StatusCode::INTERNAL_SERVER_ERROR
93 })?;
94
95 if r.stream_len() != nar_size {
97 warn!(
98 actual_nar_size = r.stream_len(),
99 supplied_nar_size = nar_size,
100 "wrong nar size supplied"
101 );
102 return Err(StatusCode::BAD_REQUEST);
103 }
104 Ranged::new(Some(ranges), KnownSize::sized(r, nar_size)).into_response()
105 } else {
106 let (w, r) = tokio::io::duplex(1024 * 8);
109
110 tokio::spawn(async move {
112 if let Err(e) =
113 snix_store::nar::write_nar(w, &root_node, blob_service, directory_service).await
114 {
115 warn!(err=%e, "failed to write out NAR");
116 }
117 });
118
119 Response::builder()
120 .header("content-length", nar_size)
123 .body(Body::from_stream(ReaderStream::new(r)))
124 .unwrap()
125 },
126 ))
127}
128
129#[instrument(skip_all, fields(nar_str))]
137pub async fn head_root_nodes(
138 axum::extract::Path(nar_str): axum::extract::Path<String>,
139 axum::extract::State(AppState { root_nodes, .. }): axum::extract::State<AppState>,
140) -> Result<impl axum::response::IntoResponse, StatusCode> {
141 let (nar_hash, compression_suffix) =
142 nix_http::parse_nar_str(&nar_str).ok_or(StatusCode::UNAUTHORIZED)?;
143
144 if !compression_suffix.is_empty() {
146 warn!(%compression_suffix, "invalid compression suffix requested");
147 return Err(StatusCode::UNAUTHORIZED);
148 }
149
150 if root_nodes.write().get(&nar_hash).is_some() {
153 Ok("")
154 } else {
155 Err(StatusCode::NOT_FOUND)
156 }
157}
158
159#[instrument(skip_all)]
160pub async fn put(
161 axum::extract::Path(nar_str): axum::extract::Path<String>,
162 axum::extract::State(AppState {
163 blob_service,
164 directory_service,
165 root_nodes,
166 ..
167 }): axum::extract::State<AppState>,
168 request: axum::extract::Request,
169) -> Result<&'static str, StatusCode> {
170 let (nar_hash_expected, compression_suffix) =
171 nix_http::parse_nar_str(&nar_str).ok_or(StatusCode::UNAUTHORIZED)?;
172
173 if !compression_suffix.is_empty() {
175 warn!(%compression_suffix, "invalid compression suffix requested");
176 return Err(StatusCode::UNAUTHORIZED);
177 }
178
179 let s = request.into_body().into_data_stream();
180
181 let mut r = tokio_util::io::StreamReader::new(s.map_err(|e| {
182 warn!(err=%e, "failed to read request body");
183 io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
184 }));
185
186 let (root_node, nar_hash_actual, nar_size) = ingest_nar_and_hash(
188 blob_service.clone(),
189 directory_service.clone(),
190 &mut r,
191 &None,
192 )
193 .await
194 .map_err(io::Error::other)
195 .map_err(|e| {
196 warn!(err=%e, "failed to ingest nar");
197 StatusCode::INTERNAL_SERVER_ERROR
198 })?;
199
200 let s = Span::current();
201 s.record("nar_hash.expected", nixbase32::encode(&nar_hash_expected));
202 s.record("nar_size", nar_size);
203
204 if nar_hash_expected != nar_hash_actual {
205 warn!(
206 nar_hash.expected = nixbase32::encode(&nar_hash_expected),
207 nar_hash.actual = nixbase32::encode(&nar_hash_actual),
208 "nar hash mismatch"
209 );
210 return Err(StatusCode::BAD_REQUEST);
211 }
212
213 root_nodes.write().put(nar_hash_actual, root_node);
216
217 Ok("")
218}
219
220#[cfg(test)]
221mod tests {
222 use std::{
223 num::NonZero,
224 sync::{Arc, LazyLock},
225 };
226
227 use axum::{Router, http::Method};
228 use bytes::Bytes;
229 use data_encoding::BASE64URL_NOPAD;
230 use nix_compat::nixbase32;
231 use sha2::Digest;
232 use snix_castore::{
233 blobservice::{BlobService, MemoryBlobService},
234 directoryservice::{DirectoryService, MemoryDirectoryService},
235 fixtures::HELLOWORLD_BLOB_DIGEST,
236 };
237 use snix_store::{
238 fixtures::{
239 CASTORE_NODE_COMPLICATED, CASTORE_NODE_SYMLINK, NAR_CONTENTS_COMPLICATED,
240 NAR_CONTENTS_HELLOWORLD, NAR_CONTENTS_SYMLINK,
241 },
242 pathinfoservice::{MemoryPathInfoService, PathInfoService},
243 };
244 use tracing_test::traced_test;
245
246 use crate::AppState;
247
248 pub static NAR_STR_SYMLINK: LazyLock<String> = LazyLock::new(|| {
249 use prost::Message;
250 BASE64URL_NOPAD.encode(
251 &snix_castore::proto::Entry::from_name_and_node(
252 "".into(),
253 CASTORE_NODE_SYMLINK.clone(),
254 )
255 .encode_to_vec(),
256 )
257 });
258
259 fn gen_server(
261 router: axum::Router<AppState>,
262 ) -> (
263 axum_test::TestServer,
264 impl BlobService,
265 impl DirectoryService,
266 impl PathInfoService,
267 ) {
268 let blob_service = Arc::new(MemoryBlobService::default());
269 let directory_service = Arc::new(MemoryDirectoryService::default());
270 let path_info_service = Arc::new(MemoryPathInfoService::default());
271
272 let app = router.with_state(AppState::new(
273 blob_service.clone(),
274 directory_service.clone(),
275 path_info_service.clone(),
276 NonZero::new(100).unwrap(),
277 ));
278
279 (
280 axum_test::TestServer::new(app).unwrap(),
281 blob_service,
282 directory_service,
283 path_info_service,
284 )
285 }
286
287 #[traced_test]
288 #[tokio::test]
289 async fn test_get_head() {
290 let (server, _blob_service, _directory_service, _path_info_service) =
291 gen_server(Router::new().route(
292 "/nar/snix-castore/:root_node_enc",
293 axum::routing::get(super::get_head),
294 ));
295
296 server
298 .method(Method::HEAD, "/nar/snix-castore/")
299 .expect_failure()
300 .await
301 .assert_status_not_found();
302
303 let valid_url = &format!("/nar/snix-castore/{}", &*NAR_STR_SYMLINK);
304 let qps = &[("narsize", &NAR_CONTENTS_SYMLINK.len().to_string())];
305
306 server
308 .method(Method::HEAD, valid_url)
309 .expect_failure()
310 .await
311 .assert_status_bad_request();
312
313 let invalid_url = {
314 use prost::Message;
315 let n = snix_castore::proto::Entry {
316 entry: Some(snix_castore::proto::entry::Entry::Directory(
317 snix_castore::proto::DirectoryEntry {
318 name: "".into(),
319 digest: "invalid b64".into(),
320 size: 1,
321 },
322 )),
323 };
324 &format!(
325 "/nar/snix-castore/{}",
326 BASE64URL_NOPAD.encode(&n.encode_to_vec())
327 )
328 };
329
330 server
332 .method(Method::HEAD, invalid_url)
333 .add_query_params(qps)
334 .expect_failure()
335 .await
336 .assert_status_not_found();
337
338 server
340 .method(Method::HEAD, valid_url)
341 .add_query_params(qps)
342 .expect_success()
343 .await;
344
345 assert_eq!(
347 NAR_CONTENTS_SYMLINK.as_slice(),
348 server
349 .get(valid_url)
350 .add_query_params(qps)
351 .expect_success()
352 .await
353 .into_bytes(),
354 "Expected to get back NAR_CONTENTS_SYMLINK"
355 )
356 }
357
358 #[traced_test]
361 #[tokio::test]
362 async fn test_put_wrong_narhash() {
363 let (server, _blob_service, _directory_service, _path_info_service) =
364 gen_server(Router::new().route("/nar/:nar_str", axum::routing::put(super::put)));
365
366 server
367 .put("/nar/0000000000000000000000000000000000000000000000000000.nar")
368 .bytes(Bytes::from_static(&NAR_CONTENTS_SYMLINK))
369 .expect_failure()
370 .await;
371 }
372
373 #[traced_test]
375 #[tokio::test]
376 async fn test_put_with_compression_fail() {
377 let (server, _blob_service, _directory_service, _path_info_service) =
378 gen_server(Router::new().route("/nar/:nar_str", axum::routing::put(super::put)));
379
380 let nar_sha256: [u8; 32] = sha2::Sha256::new_with_prefix(NAR_CONTENTS_SYMLINK.as_slice())
381 .finalize()
382 .into();
383
384 let nar_url = format!("/nar/{}.nar.zst", nixbase32::encode(&nar_sha256));
385
386 server
387 .put(&nar_url)
388 .bytes(Bytes::from_static(&NAR_CONTENTS_SYMLINK))
389 .expect_failure()
390 .await
391 .assert_status_unauthorized();
392 }
393
394 #[traced_test]
396 #[tokio::test]
397 async fn test_put_success() {
398 let (server, blob_service, _directory_service, _path_info_service) =
399 gen_server(Router::new().route("/nar/:nar_str", axum::routing::put(super::put)));
400
401 let nar_sha256: [u8; 32] =
402 sha2::Sha256::new_with_prefix(NAR_CONTENTS_HELLOWORLD.as_slice())
403 .finalize()
404 .into();
405
406 let nar_url = format!("/nar/{}.nar", nixbase32::encode(&nar_sha256));
407
408 server
409 .put(&nar_url)
410 .bytes(Bytes::from_static(&NAR_CONTENTS_HELLOWORLD))
411 .expect_success()
412 .await;
413
414 assert!(
415 blob_service
416 .has(&HELLOWORLD_BLOB_DIGEST)
417 .await
418 .expect("blobservice")
419 )
420 }
421
422 #[traced_test]
426 #[tokio::test]
427 async fn test_put_success2() {
428 let (server, blob_service, directory_service, _path_info_service) =
429 gen_server(Router::new().route("/nar/:nar_str", axum::routing::put(super::put)));
430
431 let nar_sha256: [u8; 32] =
432 sha2::Sha256::new_with_prefix(NAR_CONTENTS_COMPLICATED.as_slice())
433 .finalize()
434 .into();
435
436 let nar_url = format!("/nar/{}.nar", nixbase32::encode(&nar_sha256));
437
438 server
439 .put(&nar_url)
440 .bytes(Bytes::from_static(&NAR_CONTENTS_COMPLICATED))
441 .expect_success()
442 .await;
443
444 let mut buf = Vec::new();
445 snix_store::nar::write_nar(
446 &mut buf,
447 &CASTORE_NODE_COMPLICATED,
448 blob_service,
449 directory_service,
450 )
451 .await
452 .expect("write nar");
453
454 assert_eq!(NAR_CONTENTS_COMPLICATED, buf[..]);
455 }
456
457 #[traced_test]
459 #[tokio::test]
460 async fn test_put_root_nodes() {
461 let (server, _blob_service, _directory_servicee, _path_info_service) = gen_server(
462 Router::new()
463 .route("/nar/:nar_str", axum::routing::put(super::put))
464 .route("/nar/:nar_str", axum::routing::get(super::head_root_nodes)),
465 );
466
467 let nar_sha256: [u8; 32] =
468 sha2::Sha256::new_with_prefix(NAR_CONTENTS_COMPLICATED.as_slice())
469 .finalize()
470 .into();
471
472 let nar_url = format!("/nar/{}.nar", nixbase32::encode(&nar_sha256));
473
474 server
476 .put(&nar_url)
477 .bytes(Bytes::from_static(&NAR_CONTENTS_COMPLICATED))
478 .expect_success()
479 .await;
480
481 server.method(Method::HEAD, &nar_url).expect_success().await;
483 }
484}