1use axum::extract::Query;
2use axum::http::{Response, StatusCode};
3use axum::{body::Body, response::IntoResponse};
4use axum_extra::{TypedHeader, headers::Range};
5use axum_range::{KnownSize, Ranged};
6use futures::TryStreamExt;
7use nix_compat::{nix_http, nixbase32};
8use serde::Deserialize;
9use snix_castore::proto::parse_urlsafe_proto;
10use snix_store::nar::ingest_nar_and_hash;
11use std::io;
12use tokio_util::io::ReaderStream;
13use tracing::{Span, instrument, warn};
14
15use crate::AppState;
16
17#[derive(Debug, Deserialize)]
18pub(crate) struct GetNARParams {
19 #[serde(rename = "narsize")]
20 nar_size: Option<u64>,
21}
22
23#[instrument(skip_all)]
24pub async fn get_head(
25 method: axum::http::Method,
26 ranges: Option<TypedHeader<Range>>,
27 axum::extract::Path(root_node_enc): axum::extract::Path<String>,
28 axum::extract::Query(GetNARParams { nar_size }): Query<GetNARParams>,
29 axum::extract::State(AppState {
30 blob_service,
31 directory_service,
32 ..
33 }): axum::extract::State<AppState>,
34) -> Result<impl axum::response::IntoResponse, StatusCode> {
35 let nar_size = nar_size.ok_or_else(|| {
38 warn!("no nar_size parameter set");
39 StatusCode::BAD_REQUEST
40 })?;
41
42 let root_node = parse_urlsafe_proto(root_node_enc).ok_or_else(|| {
44 warn!("unable to parse castore-infused NAR path");
45 StatusCode::NOT_FOUND
46 })?;
47
48 Ok((
49 [
51 ("cache-control", "max-age=31536000, immutable"),
52 ("content-type", nix_http::MIME_TYPE_NAR),
53 ],
54 if method == axum::http::Method::HEAD {
55 Response::builder()
60 .header("content-length", nar_size)
61 .body(Body::empty())
62 .unwrap()
63 } else if let Some(TypedHeader(ranges)) = ranges {
64 let r =
66 snix_store::nar::seekable::Reader::new(root_node, blob_service, directory_service)
67 .await
68 .map_err(|e| {
69 warn!(err=%e, "failed to construct seekable nar reader");
70 StatusCode::INTERNAL_SERVER_ERROR
71 })?;
72
73 if r.stream_len() != nar_size {
75 warn!(
76 actual_nar_size = r.stream_len(),
77 supplied_nar_size = nar_size,
78 "wrong nar size supplied"
79 );
80 return Err(StatusCode::BAD_REQUEST);
81 }
82 Ranged::new(Some(ranges), KnownSize::sized(r, nar_size)).into_response()
83 } else {
84 let (w, r) = tokio::io::duplex(1024 * 8);
87
88 tokio::spawn(async move {
90 if let Err(e) =
91 snix_store::nar::write_nar(w, &root_node, blob_service, directory_service).await
92 {
93 warn!(err=%e, "failed to write out NAR");
94 }
95 });
96
97 Response::builder()
98 .header("content-length", nar_size)
101 .body(Body::from_stream(ReaderStream::new(r)))
102 .unwrap()
103 },
104 ))
105}
106
107#[instrument(skip_all, fields(nar_str))]
115pub async fn head_root_nodes(
116 axum::extract::Path(nar_str): axum::extract::Path<String>,
117 axum::extract::State(AppState { root_nodes, .. }): axum::extract::State<AppState>,
118) -> Result<impl axum::response::IntoResponse, StatusCode> {
119 let (nar_hash, compression_suffix) =
120 nix_http::parse_nar_str(&nar_str).ok_or(StatusCode::UNAUTHORIZED)?;
121
122 if !compression_suffix.is_empty() {
124 warn!(%compression_suffix, "invalid compression suffix requested");
125 return Err(StatusCode::UNAUTHORIZED);
126 }
127
128 if root_nodes.write().get(&nar_hash).is_some() {
131 Ok("")
132 } else {
133 Err(StatusCode::NOT_FOUND)
134 }
135}
136
137#[instrument(skip_all)]
138pub async fn put(
139 axum::extract::Path(nar_str): axum::extract::Path<String>,
140 axum::extract::State(AppState {
141 blob_service,
142 directory_service,
143 root_nodes,
144 ..
145 }): axum::extract::State<AppState>,
146 request: axum::extract::Request,
147) -> Result<&'static str, StatusCode> {
148 let (nar_hash_expected, compression_suffix) =
149 nix_http::parse_nar_str(&nar_str).ok_or(StatusCode::UNAUTHORIZED)?;
150
151 if !compression_suffix.is_empty() {
153 warn!(%compression_suffix, "invalid compression suffix requested");
154 return Err(StatusCode::UNAUTHORIZED);
155 }
156
157 let s = request.into_body().into_data_stream();
158
159 let mut r = tokio_util::io::StreamReader::new(s.map_err(|e| {
160 warn!(err=%e, "failed to read request body");
161 io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
162 }));
163
164 let (root_node, nar_hash_actual, nar_size) = ingest_nar_and_hash(
166 blob_service.clone(),
167 directory_service.clone(),
168 &mut r,
169 &None,
170 )
171 .await
172 .map_err(io::Error::other)
173 .map_err(|e| {
174 warn!(err=%e, "failed to ingest nar");
175 StatusCode::INTERNAL_SERVER_ERROR
176 })?;
177
178 let s = Span::current();
179 s.record("nar_hash.expected", nixbase32::encode(&nar_hash_expected));
180 s.record("nar_size", nar_size);
181
182 if nar_hash_expected != nar_hash_actual {
183 warn!(
184 nar_hash.expected = nixbase32::encode(&nar_hash_expected),
185 nar_hash.actual = nixbase32::encode(&nar_hash_actual),
186 "nar hash mismatch"
187 );
188 return Err(StatusCode::BAD_REQUEST);
189 }
190
191 root_nodes.write().put(nar_hash_actual, root_node);
194
195 Ok("")
196}
197
198#[cfg(test)]
199mod tests {
200 use std::{
201 num::NonZero,
202 sync::{Arc, LazyLock},
203 };
204
205 use axum::{Router, http::Method};
206 use bytes::Bytes;
207 use data_encoding::BASE64URL_NOPAD;
208 use nix_compat::nixbase32;
209 use sha2::Digest;
210 use snix_castore::{
211 blobservice::{BlobService, MemoryBlobService},
212 directoryservice::DirectoryService,
213 fixtures::HELLOWORLD_BLOB_DIGEST,
214 utils::gen_test_directory_service,
215 };
216 use snix_store::{
217 fixtures::{
218 CASTORE_NODE_COMPLICATED, CASTORE_NODE_SYMLINK, NAR_CONTENTS_COMPLICATED,
219 NAR_CONTENTS_HELLOWORLD, NAR_CONTENTS_SYMLINK,
220 },
221 pathinfoservice::PathInfoService,
222 utils::gen_test_pathinfo_service,
223 };
224 use tracing_test::traced_test;
225
226 use crate::AppState;
227
228 pub static NAR_STR_SYMLINK: LazyLock<String> = LazyLock::new(|| {
229 use prost::Message;
230 BASE64URL_NOPAD.encode(
231 &snix_castore::proto::Entry::from_name_and_node(
232 "".into(),
233 CASTORE_NODE_SYMLINK.clone(),
234 )
235 .encode_to_vec(),
236 )
237 });
238
239 fn gen_server(
242 router: axum::Router<AppState>,
243 ) -> (
244 axum_test::TestServer,
245 impl BlobService,
246 impl DirectoryService,
247 impl PathInfoService,
248 ) {
249 let blob_service = Arc::new(MemoryBlobService::default());
250 let directory_service = Arc::new(gen_test_directory_service());
251 let path_info_service = Arc::new(gen_test_pathinfo_service());
252
253 let app = router.with_state(AppState::new(
254 blob_service.clone(),
255 directory_service.clone(),
256 path_info_service.clone(),
257 NonZero::new(100).unwrap(),
258 ));
259
260 (
261 axum_test::TestServer::new(app),
262 blob_service,
263 directory_service,
264 path_info_service,
265 )
266 }
267
268 #[traced_test]
269 #[tokio::test]
270 async fn test_get_head() {
271 let (server, _blob_service, _directory_service, _path_info_service) =
272 gen_server(Router::new().route(
273 "/nar/snix-castore/{root_node_enc}",
274 axum::routing::get(super::get_head),
275 ));
276
277 server
279 .method(Method::HEAD, "/nar/snix-castore/")
280 .expect_failure()
281 .await
282 .assert_status_not_found();
283
284 let valid_url = &format!("/nar/snix-castore/{}", &*NAR_STR_SYMLINK);
285 let qps = &[("narsize", &NAR_CONTENTS_SYMLINK.len().to_string())];
286
287 server
289 .method(Method::HEAD, valid_url)
290 .expect_failure()
291 .await
292 .assert_status_bad_request();
293
294 let invalid_url = {
295 use prost::Message;
296 let n = snix_castore::proto::Entry {
297 entry: Some(snix_castore::proto::entry::Entry::Directory(
298 snix_castore::proto::DirectoryEntry {
299 name: "".into(),
300 digest: "invalid b64".into(),
301 size: 1,
302 },
303 )),
304 };
305 &format!(
306 "/nar/snix-castore/{}",
307 BASE64URL_NOPAD.encode(&n.encode_to_vec())
308 )
309 };
310
311 server
313 .method(Method::HEAD, invalid_url)
314 .add_query_params(qps)
315 .expect_failure()
316 .await
317 .assert_status_not_found();
318
319 server
321 .method(Method::HEAD, valid_url)
322 .add_query_params(qps)
323 .expect_success()
324 .await;
325
326 assert_eq!(
328 NAR_CONTENTS_SYMLINK.as_slice(),
329 server
330 .get(valid_url)
331 .add_query_params(qps)
332 .expect_success()
333 .await
334 .into_bytes(),
335 "Expected to get back NAR_CONTENTS_SYMLINK"
336 )
337 }
338
339 #[traced_test]
342 #[tokio::test]
343 async fn test_put_wrong_narhash() {
344 let (server, _blob_service, _directory_service, _path_info_service) =
345 gen_server(Router::new().route("/nar/{nar_str}", axum::routing::put(super::put)));
346
347 server
348 .put("/nar/0000000000000000000000000000000000000000000000000000.nar")
349 .bytes(Bytes::from_static(&NAR_CONTENTS_SYMLINK))
350 .expect_failure()
351 .await;
352 }
353
354 #[traced_test]
356 #[tokio::test]
357 async fn test_put_with_compression_fail() {
358 let (server, _blob_service, _directory_service, _path_info_service) =
359 gen_server(Router::new().route("/nar/{nar_str}", axum::routing::put(super::put)));
360
361 let nar_sha256: [u8; 32] = sha2::Sha256::digest(NAR_CONTENTS_SYMLINK.as_slice()).into();
362
363 let nar_url = format!("/nar/{}.nar.zst", nixbase32::encode(&nar_sha256));
364
365 server
366 .put(&nar_url)
367 .bytes(Bytes::from_static(&NAR_CONTENTS_SYMLINK))
368 .expect_failure()
369 .await
370 .assert_status_unauthorized();
371 }
372
373 #[traced_test]
375 #[tokio::test]
376 async fn test_put_success() {
377 let (server, blob_service, _directory_service, _path_info_service) =
378 gen_server(Router::new().route("/nar/{nar_str}", axum::routing::put(super::put)));
379
380 let nar_sha256: [u8; 32] = sha2::Sha256::digest(NAR_CONTENTS_HELLOWORLD.as_slice()).into();
381 let nar_url = format!("/nar/{}.nar", nixbase32::encode(&nar_sha256));
382
383 server
384 .put(&nar_url)
385 .bytes(Bytes::from_static(&NAR_CONTENTS_HELLOWORLD))
386 .expect_success()
387 .await;
388
389 assert!(
390 blob_service
391 .has(&HELLOWORLD_BLOB_DIGEST)
392 .await
393 .expect("blobservice")
394 )
395 }
396
397 #[traced_test]
401 #[tokio::test]
402 async fn test_put_success2() {
403 let (server, blob_service, directory_service, _path_info_service) =
404 gen_server(Router::new().route("/nar/{nar_str}", axum::routing::put(super::put)));
405
406 let nar_sha256: [u8; 32] = sha2::Sha256::digest(NAR_CONTENTS_COMPLICATED.as_slice()).into();
407 let nar_url = format!("/nar/{}.nar", nixbase32::encode(&nar_sha256));
408
409 server
410 .put(&nar_url)
411 .bytes(Bytes::from_static(&NAR_CONTENTS_COMPLICATED))
412 .expect_success()
413 .await;
414
415 let mut buf = Vec::new();
416 snix_store::nar::write_nar(
417 &mut buf,
418 &CASTORE_NODE_COMPLICATED,
419 blob_service,
420 directory_service,
421 )
422 .await
423 .expect("write nar");
424
425 assert_eq!(NAR_CONTENTS_COMPLICATED, buf[..]);
426 }
427
428 #[traced_test]
430 #[tokio::test]
431 async fn test_put_root_nodes() {
432 let (server, _blob_service, _directory_servicee, _path_info_service) = gen_server(
433 Router::new()
434 .route("/nar/{nar_str}", axum::routing::put(super::put))
435 .route("/nar/{nar_str}", axum::routing::get(super::head_root_nodes)),
436 );
437
438 let nar_sha256: [u8; 32] = sha2::Sha256::digest(NAR_CONTENTS_COMPLICATED.as_slice()).into();
439 let nar_url = format!("/nar/{}.nar", nixbase32::encode(&nar_sha256));
440
441 server
443 .put(&nar_url)
444 .bytes(Bytes::from_static(&NAR_CONTENTS_COMPLICATED))
445 .expect_success()
446 .await;
447
448 server.method(Method::HEAD, &nar_url).expect_success().await;
450 }
451}