1use axum::extract::Query;
2use axum::http::{Response, StatusCode};
3use axum::{body::Body, response::IntoResponse};
4use axum_extra::{headers::Range, TypedHeader};
5use axum_range::{KnownSize, Ranged};
6use bytes::Bytes;
7use data_encoding::BASE64URL_NOPAD;
8use futures::TryStreamExt;
9use nix_compat::{nix_http, nixbase32};
10use serde::Deserialize;
11use snix_store::nar::ingest_nar_and_hash;
12use std::io;
13use tokio_util::io::ReaderStream;
14use tracing::{instrument, warn, Span};
15
16use crate::AppState;
17
18#[derive(Debug, Deserialize)]
19pub(crate) struct GetNARParams {
20 #[serde(rename = "narsize")]
21 nar_size: Option<u64>,
22}
23
24#[instrument(skip_all)]
25pub async fn get_head(
26 method: axum::http::Method,
27 ranges: Option<TypedHeader<Range>>,
28 axum::extract::Path(root_node_enc): axum::extract::Path<String>,
29 axum::extract::Query(GetNARParams { nar_size }): Query<GetNARParams>,
30 axum::extract::State(AppState {
31 blob_service,
32 directory_service,
33 ..
34 }): axum::extract::State<AppState>,
35) -> Result<impl axum::response::IntoResponse, StatusCode> {
36 use prost::Message;
37 let nar_size = nar_size.ok_or_else(|| {
40 warn!("no nar_size parameter set");
41 StatusCode::BAD_REQUEST
42 })?;
43
44 let root_node_proto = BASE64URL_NOPAD
46 .decode(root_node_enc.as_bytes())
47 .map_err(|e| {
48 warn!(err=%e, "unable to decode root node b64");
49 StatusCode::NOT_FOUND
50 })?;
51
52 if root_node_proto.len() > 4096 {
54 warn!("rejected too large root node");
55 return Err(StatusCode::BAD_REQUEST);
56 }
57
58 let root_node: snix_castore::proto::Entry = Message::decode(Bytes::from(root_node_proto))
60 .map_err(|e| {
61 warn!(err=%e, "unable to decode root node proto");
62 StatusCode::NOT_FOUND
63 })?;
64
65 let root_node = root_node.try_into_anonymous_node().map_err(|e| {
66 warn!(err=%e, "root node validation failed");
67 StatusCode::NOT_FOUND
68 })?;
69
70 Ok((
71 [
73 ("cache-control", "max-age=31536000, immutable"),
74 ("content-type", nix_http::MIME_TYPE_NAR),
75 ],
76 if method == axum::http::Method::HEAD {
77 Response::builder()
80 .header("content-length", nar_size)
81 .body(Body::empty())
82 .unwrap()
83 } else if let Some(TypedHeader(ranges)) = ranges {
84 let r =
86 snix_store::nar::seekable::Reader::new(root_node, blob_service, directory_service)
87 .await
88 .map_err(|e| {
89 warn!(err=%e, "failed to construct seekable nar reader");
90 StatusCode::INTERNAL_SERVER_ERROR
91 })?;
92
93 if r.stream_len() != nar_size {
95 warn!(
96 actual_nar_size = r.stream_len(),
97 supplied_nar_size = nar_size,
98 "wrong nar size supplied"
99 );
100 return Err(StatusCode::BAD_REQUEST);
101 }
102 Ranged::new(Some(ranges), KnownSize::sized(r, nar_size)).into_response()
103 } else {
104 let (w, r) = tokio::io::duplex(1024 * 8);
107
108 tokio::spawn(async move {
110 if let Err(e) =
111 snix_store::nar::write_nar(w, &root_node, blob_service, directory_service).await
112 {
113 warn!(err=%e, "failed to write out NAR");
114 }
115 });
116
117 Response::builder()
118 .header("content-length", nar_size)
119 .body(Body::from_stream(ReaderStream::new(r)))
120 .unwrap()
121 },
122 ))
123}
124
125#[instrument(skip_all, fields(nar_str))]
133pub async fn head_root_nodes(
134 axum::extract::Path(nar_str): axum::extract::Path<String>,
135 axum::extract::State(AppState { root_nodes, .. }): axum::extract::State<AppState>,
136) -> Result<impl axum::response::IntoResponse, StatusCode> {
137 let (nar_hash, compression_suffix) =
138 nix_http::parse_nar_str(&nar_str).ok_or(StatusCode::UNAUTHORIZED)?;
139
140 if !compression_suffix.is_empty() {
142 warn!(%compression_suffix, "invalid compression suffix requested");
143 return Err(StatusCode::UNAUTHORIZED);
144 }
145
146 if root_nodes.write().get(&nar_hash).is_some() {
149 Ok("")
150 } else {
151 Err(StatusCode::NOT_FOUND)
152 }
153}
154
155#[instrument(skip_all)]
156pub async fn put(
157 axum::extract::Path(nar_str): axum::extract::Path<String>,
158 axum::extract::State(AppState {
159 blob_service,
160 directory_service,
161 root_nodes,
162 ..
163 }): axum::extract::State<AppState>,
164 request: axum::extract::Request,
165) -> Result<&'static str, StatusCode> {
166 let (nar_hash_expected, compression_suffix) =
167 nix_http::parse_nar_str(&nar_str).ok_or(StatusCode::UNAUTHORIZED)?;
168
169 if !compression_suffix.is_empty() {
171 warn!(%compression_suffix, "invalid compression suffix requested");
172 return Err(StatusCode::UNAUTHORIZED);
173 }
174
175 let s = request.into_body().into_data_stream();
176
177 let mut r = tokio_util::io::StreamReader::new(s.map_err(|e| {
178 warn!(err=%e, "failed to read request body");
179 io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
180 }));
181
182 let (root_node, nar_hash_actual, nar_size) = ingest_nar_and_hash(
184 blob_service.clone(),
185 directory_service.clone(),
186 &mut r,
187 &None,
188 )
189 .await
190 .map_err(io::Error::other)
191 .map_err(|e| {
192 warn!(err=%e, "failed to ingest nar");
193 StatusCode::INTERNAL_SERVER_ERROR
194 })?;
195
196 let s = Span::current();
197 s.record("nar_hash.expected", nixbase32::encode(&nar_hash_expected));
198 s.record("nar_size", nar_size);
199
200 if nar_hash_expected != nar_hash_actual {
201 warn!(
202 nar_hash.expected = nixbase32::encode(&nar_hash_expected),
203 nar_hash.actual = nixbase32::encode(&nar_hash_actual),
204 "nar hash mismatch"
205 );
206 return Err(StatusCode::BAD_REQUEST);
207 }
208
209 root_nodes.write().put(nar_hash_actual, root_node);
212
213 Ok("")
214}
215
#[cfg(test)]
mod tests {
    use std::{
        num::NonZero,
        sync::{Arc, LazyLock},
    };

    use axum::{http::Method, Router};
    use bytes::Bytes;
    use data_encoding::BASE64URL_NOPAD;
    use nix_compat::nixbase32;
    use sha2::Digest;
    use snix_castore::{
        blobservice::{BlobService, MemoryBlobService},
        directoryservice::{DirectoryService, MemoryDirectoryService},
        fixtures::HELLOWORLD_BLOB_DIGEST,
    };
    use snix_store::{
        fixtures::{
            CASTORE_NODE_COMPLICATED, CASTORE_NODE_SYMLINK, NAR_CONTENTS_COMPLICATED,
            NAR_CONTENTS_HELLOWORLD, NAR_CONTENTS_SYMLINK,
        },
        pathinfoservice::{MemoryPathInfoService, PathInfoService},
    };
    use tracing_test::traced_test;

    use crate::AppState;

    /// `CASTORE_NODE_SYMLINK` wrapped in an empty-named `Entry`,
    /// protobuf-encoded and then encoded as URL-safe, unpadded base64.
    pub static NAR_STR_SYMLINK: LazyLock<String> = LazyLock::new(|| {
        use prost::Message;

        let entry = snix_castore::proto::Entry::from_name_and_node(
            "".into(),
            CASTORE_NODE_SYMLINK.clone(),
        );
        BASE64URL_NOPAD.encode(&entry.encode_to_vec())
    });

    /// Wires `router` up with fresh in-memory services and returns the test
    /// server together with handles to those services.
    fn gen_server(
        router: axum::Router<AppState>,
    ) -> (
        axum_test::TestServer,
        impl BlobService,
        impl DirectoryService,
        impl PathInfoService,
    ) {
        let blob_service = Arc::new(MemoryBlobService::default());
        let directory_service = Arc::new(MemoryDirectoryService::default());
        let path_info_service = Arc::new(MemoryPathInfoService::default());

        let state = AppState::new(
            blob_service.clone(),
            directory_service.clone(),
            path_info_service.clone(),
            NonZero::new(100).unwrap(),
        );
        let server = axum_test::TestServer::new(router.with_state(state)).unwrap();

        (server, blob_service, directory_service, path_info_service)
    }

    /// Router exposing only the NAR upload handler at `/nar/:nar_str`.
    fn put_router() -> Router<AppState> {
        Router::new().route("/nar/:nar_str", axum::routing::put(super::put))
    }

    /// Builds `/nar/{nixbase32(sha256(contents))}{suffix}`.
    fn upload_url(contents: &[u8], suffix: &str) -> String {
        let nar_sha256: [u8; 32] = sha2::Sha256::new_with_prefix(contents).finalize().into();

        format!("/nar/{}{}", nixbase32::encode(&nar_sha256), suffix)
    }

    /// Exercises `get_head`: missing root node, missing `narsize`, an entry
    /// that fails validation, and finally successful HEAD and GET requests.
    #[traced_test]
    #[tokio::test]
    async fn test_get_head() {
        let (server, _blob_service, _directory_service, _path_info_service) =
            gen_server(Router::new().route(
                "/nar/snix-castore/:root_node_enc",
                axum::routing::get(super::get_head),
            ));

        // No root node in the path at all.
        let response = server
            .method(Method::HEAD, "/nar/snix-castore/")
            .expect_failure()
            .await;
        response.assert_status_not_found();

        let valid_url = format!("/nar/snix-castore/{}", &*NAR_STR_SYMLINK);
        let nar_size = NAR_CONTENTS_SYMLINK.len().to_string();
        let qps = [("narsize", nar_size.as_str())];

        // Valid root node, but the `narsize` query parameter is missing.
        let response = server
            .method(Method::HEAD, &valid_url)
            .expect_failure()
            .await;
        response.assert_status_bad_request();

        // Well-formed protobuf whose directory digest is not a valid digest.
        let invalid_url = {
            use prost::Message;

            let directory = snix_castore::proto::DirectoryEntry {
                name: "".into(),
                digest: "invalid b64".into(),
                size: 1,
            };
            let entry = snix_castore::proto::Entry {
                entry: Some(snix_castore::proto::entry::Entry::Directory(directory)),
            };

            format!(
                "/nar/snix-castore/{}",
                BASE64URL_NOPAD.encode(&entry.encode_to_vec())
            )
        };

        let response = server
            .method(Method::HEAD, &invalid_url)
            .add_query_params(&qps)
            .expect_failure()
            .await;
        response.assert_status_not_found();

        // Valid root node plus `narsize`: HEAD succeeds ...
        server
            .method(Method::HEAD, &valid_url)
            .add_query_params(&qps)
            .expect_success()
            .await;

        // ... and GET returns the NAR itself.
        let body = server
            .get(&valid_url)
            .add_query_params(&qps)
            .expect_success()
            .await
            .into_bytes();

        assert_eq!(
            NAR_CONTENTS_SYMLINK.as_slice(),
            body,
            "Expected to get back NAR_CONTENTS_SYMLINK"
        )
    }

    /// Uploading NAR contents under a hash that does not match must fail.
    #[traced_test]
    #[tokio::test]
    async fn test_put_wrong_narhash() {
        let (server, _blob_service, _directory_service, _path_info_service) =
            gen_server(put_router());

        server
            .put("/nar/0000000000000000000000000000000000000000000000000000.nar")
            .bytes(Bytes::from_static(&NAR_CONTENTS_SYMLINK))
            .expect_failure()
            .await;
    }

    /// Uploading to a URL with a compression suffix is rejected as
    /// unauthorized, even though the hash matches the contents.
    #[traced_test]
    #[tokio::test]
    async fn test_put_with_compression_fail() {
        let (server, _blob_service, _directory_service, _path_info_service) =
            gen_server(put_router());

        let nar_url = upload_url(NAR_CONTENTS_SYMLINK.as_slice(), ".nar.zst");

        let response = server
            .put(&nar_url)
            .bytes(Bytes::from_static(&NAR_CONTENTS_SYMLINK))
            .expect_failure()
            .await;
        response.assert_status_unauthorized();
    }

    /// Uploading the hello-world NAR succeeds and leaves its blob in the
    /// blob service.
    #[traced_test]
    #[tokio::test]
    async fn test_put_success() {
        let (server, blob_service, _directory_service, _path_info_service) =
            gen_server(put_router());

        let nar_url = upload_url(NAR_CONTENTS_HELLOWORLD.as_slice(), ".nar");

        server
            .put(&nar_url)
            .bytes(Bytes::from_static(&NAR_CONTENTS_HELLOWORLD))
            .expect_success()
            .await;

        let blob_present = blob_service
            .has(&HELLOWORLD_BLOB_DIGEST)
            .await
            .expect("blobservice");
        assert!(blob_present)
    }

    /// Uploading the "complicated" NAR succeeds, and rendering
    /// `CASTORE_NODE_COMPLICATED` from the services afterwards reproduces the
    /// uploaded bytes.
    #[traced_test]
    #[tokio::test]
    async fn test_put_success2() {
        let (server, blob_service, directory_service, _path_info_service) =
            gen_server(put_router());

        let nar_url = upload_url(NAR_CONTENTS_COMPLICATED.as_slice(), ".nar");

        server
            .put(&nar_url)
            .bytes(Bytes::from_static(&NAR_CONTENTS_COMPLICATED))
            .expect_success()
            .await;

        let mut rendered = Vec::new();
        snix_store::nar::write_nar(
            &mut rendered,
            &CASTORE_NODE_COMPLICATED,
            blob_service,
            directory_service,
        )
        .await
        .expect("write nar");

        assert_eq!(NAR_CONTENTS_COMPLICATED, rendered[..]);
    }

    /// After a successful upload, a HEAD probe for the same NAR URL succeeds.
    /// The probe handler is registered through `axum::routing::get`.
    #[traced_test]
    #[tokio::test]
    async fn test_put_root_nodes() {
        let router =
            put_router().route("/nar/:nar_str", axum::routing::get(super::head_root_nodes));
        let (server, _blob_service, _directory_service, _path_info_service) = gen_server(router);

        let nar_url = upload_url(NAR_CONTENTS_COMPLICATED.as_slice(), ".nar");

        server
            .put(&nar_url)
            .bytes(Bytes::from_static(&NAR_CONTENTS_COMPLICATED))
            .expect_success()
            .await;

        server.method(Method::HEAD, &nar_url).expect_success().await;
    }
}