1use axum::extract::Query;
2use axum::http::{Response, StatusCode};
3use axum::{body::Body, response::IntoResponse};
4use axum_extra::{TypedHeader, headers::Range};
5use axum_range::{KnownSize, Ranged};
6use futures::TryStreamExt;
7use nix_compat::{nix_http, nixbase32};
8use serde::Deserialize;
9use snix_castore::proto::parse_urlsafe_proto;
10use snix_store::nar::ingest_nar_and_hash;
11use std::io;
12use tokio_util::io::ReaderStream;
13use tracing::{Span, instrument, warn};
14
15use crate::AppState;
16
17#[derive(Debug, Deserialize)]
18pub(crate) struct GetNARParams {
19 #[serde(rename = "narsize")]
20 nar_size: Option<u64>,
21}
22
23#[instrument(skip_all)]
24pub async fn get_head(
25 method: axum::http::Method,
26 ranges: Option<TypedHeader<Range>>,
27 axum::extract::Path(root_node_enc): axum::extract::Path<String>,
28 axum::extract::Query(GetNARParams { nar_size }): Query<GetNARParams>,
29 axum::extract::State(AppState {
30 blob_service,
31 directory_service,
32 ..
33 }): axum::extract::State<AppState>,
34) -> Result<impl axum::response::IntoResponse, StatusCode> {
35 let nar_size = nar_size.ok_or_else(|| {
38 warn!("no nar_size parameter set");
39 StatusCode::BAD_REQUEST
40 })?;
41
42 let root_node = parse_urlsafe_proto(root_node_enc).ok_or_else(|| {
44 warn!("unable to parse castore-infused NAR path");
45 StatusCode::NOT_FOUND
46 })?;
47
48 Ok((
49 [
51 ("cache-control", "max-age=31536000, immutable"),
52 ("content-type", nix_http::MIME_TYPE_NAR),
53 ],
54 if method == axum::http::Method::HEAD {
55 Response::builder()
60 .header("content-length", nar_size)
61 .body(Body::empty())
62 .unwrap()
63 } else if let Some(TypedHeader(ranges)) = ranges {
64 let r =
66 snix_store::nar::seekable::Reader::new(root_node, blob_service, directory_service)
67 .await
68 .map_err(|e| {
69 warn!(err=%e, "failed to construct seekable nar reader");
70 StatusCode::INTERNAL_SERVER_ERROR
71 })?;
72
73 if r.stream_len() != nar_size {
75 warn!(
76 actual_nar_size = r.stream_len(),
77 supplied_nar_size = nar_size,
78 "wrong nar size supplied"
79 );
80 return Err(StatusCode::BAD_REQUEST);
81 }
82 Ranged::new(Some(ranges), KnownSize::sized(r, nar_size)).into_response()
83 } else {
84 let (w, r) = tokio::io::duplex(1024 * 8);
87
88 tokio::spawn(async move {
90 if let Err(e) =
91 snix_store::nar::write_nar(w, &root_node, blob_service, directory_service).await
92 {
93 warn!(err=%e, "failed to write out NAR");
94 }
95 });
96
97 Response::builder()
98 .header("content-length", nar_size)
101 .body(Body::from_stream(ReaderStream::new(r)))
102 .unwrap()
103 },
104 ))
105}
106
107#[instrument(skip_all, fields(nar_str))]
115pub async fn head_root_nodes(
116 axum::extract::Path(nar_str): axum::extract::Path<String>,
117 axum::extract::State(AppState { root_nodes, .. }): axum::extract::State<AppState>,
118) -> Result<impl axum::response::IntoResponse, StatusCode> {
119 let (nar_hash, compression_suffix) =
120 nix_http::parse_nar_str(&nar_str).ok_or(StatusCode::UNAUTHORIZED)?;
121
122 if !compression_suffix.is_empty() {
124 warn!(%compression_suffix, "invalid compression suffix requested");
125 return Err(StatusCode::UNAUTHORIZED);
126 }
127
128 if root_nodes.write().get(&nar_hash).is_some() {
131 Ok("")
132 } else {
133 Err(StatusCode::NOT_FOUND)
134 }
135}
136
137#[instrument(skip_all)]
138pub async fn put(
139 axum::extract::Path(nar_str): axum::extract::Path<String>,
140 axum::extract::State(AppState {
141 blob_service,
142 directory_service,
143 root_nodes,
144 ..
145 }): axum::extract::State<AppState>,
146 request: axum::extract::Request,
147) -> Result<&'static str, StatusCode> {
148 let (nar_hash_expected, compression_suffix) =
149 nix_http::parse_nar_str(&nar_str).ok_or(StatusCode::UNAUTHORIZED)?;
150
151 if !compression_suffix.is_empty() {
153 warn!(%compression_suffix, "invalid compression suffix requested");
154 return Err(StatusCode::UNAUTHORIZED);
155 }
156
157 let s = request.into_body().into_data_stream();
158
159 let mut r = tokio_util::io::StreamReader::new(s.map_err(|e| {
160 warn!(err=%e, "failed to read request body");
161 io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
162 }));
163
164 let (root_node, nar_hash_actual, nar_size) = ingest_nar_and_hash(
166 blob_service.clone(),
167 directory_service.clone(),
168 &mut r,
169 &None,
170 )
171 .await
172 .map_err(io::Error::other)
173 .map_err(|e| {
174 warn!(err=%e, "failed to ingest nar");
175 StatusCode::INTERNAL_SERVER_ERROR
176 })?;
177
178 let s = Span::current();
179 s.record("nar_hash.expected", nixbase32::encode(&nar_hash_expected));
180 s.record("nar_size", nar_size);
181
182 if nar_hash_expected != nar_hash_actual {
183 warn!(
184 nar_hash.expected = nixbase32::encode(&nar_hash_expected),
185 nar_hash.actual = nixbase32::encode(&nar_hash_actual),
186 "nar hash mismatch"
187 );
188 return Err(StatusCode::BAD_REQUEST);
189 }
190
191 root_nodes.write().put(nar_hash_actual, root_node);
194
195 Ok("")
196}
197
#[cfg(test)]
mod tests {
    use std::{
        num::NonZero,
        sync::{Arc, LazyLock},
    };

    use axum::{Router, http::Method};
    use bytes::Bytes;
    use data_encoding::BASE64URL_NOPAD;
    use nix_compat::nixbase32;
    use sha2::Digest;
    use snix_castore::{
        blobservice::{BlobService, MemoryBlobService},
        directoryservice::DirectoryService,
        fixtures::HELLOWORLD_BLOB_DIGEST,
        utils::gen_test_directory_service,
    };
    use snix_store::{
        fixtures::{
            CASTORE_NODE_COMPLICATED, CASTORE_NODE_SYMLINK, NAR_CONTENTS_COMPLICATED,
            NAR_CONTENTS_HELLOWORLD, NAR_CONTENTS_SYMLINK,
        },
        pathinfoservice::PathInfoService,
        utils::gen_test_pathinfo_service,
    };
    use tracing_test::traced_test;

    use crate::AppState;

    /// The symlink fixture node wrapped in an `Entry` with an empty name, protobuf-encoded
    /// and rendered as unpadded URL-safe base64.
    pub static NAR_STR_SYMLINK: LazyLock<String> = LazyLock::new(|| {
        use prost::Message;

        let entry =
            snix_castore::proto::Entry::from_name_and_node("".into(), CASTORE_NODE_SYMLINK.clone());
        BASE64URL_NOPAD.encode(&entry.encode_to_vec())
    });

    /// Builds a test server around `router`, backed by fresh test services.
    ///
    /// The services are returned alongside the server so tests can inspect what a request
    /// stored in them.
    fn gen_server(
        router: axum::Router<AppState>,
    ) -> (
        axum_test::TestServer,
        impl BlobService,
        impl DirectoryService,
        impl PathInfoService,
    ) {
        let blob_service = Arc::new(MemoryBlobService::default());
        let directory_service = Arc::new(gen_test_directory_service());
        let path_info_service = Arc::new(gen_test_pathinfo_service());

        let state = AppState::new(
            blob_service.clone(),
            directory_service.clone(),
            path_info_service.clone(),
            NonZero::new(100).unwrap(),
        );
        let server = axum_test::TestServer::new(router.with_state(state)).unwrap();

        (server, blob_service, directory_service, path_info_service)
    }

    /// Router exposing only the NAR upload handler.
    fn put_router() -> Router<AppState> {
        Router::new().route("/nar/:nar_str", axum::routing::put(super::put))
    }

    /// SHA-256 digest of `nar_contents`.
    fn nar_sha256(nar_contents: &[u8]) -> [u8; 32] {
        sha2::Sha256::digest(nar_contents).into()
    }

    /// Walks the GET/HEAD handler through its rejection paths and then its success paths.
    #[traced_test]
    #[tokio::test]
    async fn test_get_head() {
        let router = Router::new().route(
            "/nar/snix-castore/:root_node_enc",
            axum::routing::get(super::get_head),
        );
        let (server, _blob_service, _directory_service, _path_info_service) = gen_server(router);

        // A path without an encoded root node is a 404.
        server
            .method(Method::HEAD, "/nar/snix-castore/")
            .expect_failure()
            .await
            .assert_status_not_found();

        let symlink_url = format!("/nar/snix-castore/{}", &*NAR_STR_SYMLINK);
        let symlink_nar_size = NAR_CONTENTS_SYMLINK.len().to_string();
        let size_params = &[("narsize", &symlink_nar_size)];

        // A decodable root node without the narsize parameter is a 400.
        server
            .method(Method::HEAD, &symlink_url)
            .expect_failure()
            .await
            .assert_status_bad_request();

        // A directory entry whose digest is not a valid digest is rejected with a 404.
        let bogus_url = {
            use prost::Message;

            let bogus_directory = snix_castore::proto::DirectoryEntry {
                name: "".into(),
                digest: "invalid b64".into(),
                size: 1,
            };
            let bogus_entry = snix_castore::proto::Entry {
                entry: Some(snix_castore::proto::entry::Entry::Directory(
                    bogus_directory,
                )),
            };

            format!(
                "/nar/snix-castore/{}",
                BASE64URL_NOPAD.encode(&bogus_entry.encode_to_vec())
            )
        };

        server
            .method(Method::HEAD, &bogus_url)
            .add_query_params(size_params)
            .expect_failure()
            .await
            .assert_status_not_found();

        // HEAD with both a valid root node and narsize succeeds.
        server
            .method(Method::HEAD, &symlink_url)
            .add_query_params(size_params)
            .expect_success()
            .await;

        // GET with the same inputs returns the NAR itself.
        let body = server
            .get(&symlink_url)
            .add_query_params(size_params)
            .expect_success()
            .await
            .into_bytes();

        assert_eq!(
            NAR_CONTENTS_SYMLINK.as_slice(),
            body,
            "Expected to get back NAR_CONTENTS_SYMLINK"
        );
    }

    /// Uploading a NAR under a hash that does not match its contents must fail.
    #[traced_test]
    #[tokio::test]
    async fn test_put_wrong_narhash() {
        let (server, _blob_service, _directory_service, _path_info_service) =
            gen_server(put_router());

        server
            .put("/nar/0000000000000000000000000000000000000000000000000000.nar")
            .bytes(Bytes::from_static(&NAR_CONTENTS_SYMLINK))
            .expect_failure()
            .await;
    }

    /// Uploading to a path with a compression suffix is rejected with a 401, even when the
    /// hash in the path matches the contents.
    #[traced_test]
    #[tokio::test]
    async fn test_put_with_compression_fail() {
        let (server, _blob_service, _directory_service, _path_info_service) =
            gen_server(put_router());

        let digest = nar_sha256(NAR_CONTENTS_SYMLINK.as_slice());
        let upload_url = format!("/nar/{}.nar.zst", nixbase32::encode(&digest));

        server
            .put(&upload_url)
            .bytes(Bytes::from_static(&NAR_CONTENTS_SYMLINK))
            .expect_failure()
            .await
            .assert_status_unauthorized();
    }

    /// Uploading the hello-world NAR succeeds and leaves its blob in the blob service.
    #[traced_test]
    #[tokio::test]
    async fn test_put_success() {
        let (server, blob_service, _directory_service, _path_info_service) =
            gen_server(put_router());

        let digest = nar_sha256(NAR_CONTENTS_HELLOWORLD.as_slice());
        let upload_url = format!("/nar/{}.nar", nixbase32::encode(&digest));

        server
            .put(&upload_url)
            .bytes(Bytes::from_static(&NAR_CONTENTS_HELLOWORLD))
            .expect_success()
            .await;

        let blob_present = blob_service
            .has(&HELLOWORLD_BLOB_DIGEST)
            .await
            .expect("blobservice");
        assert!(blob_present)
    }

    /// Uploading the "complicated" NAR succeeds, and rendering the fixture root node from
    /// the populated services reproduces the uploaded bytes.
    #[traced_test]
    #[tokio::test]
    async fn test_put_success2() {
        let (server, blob_service, directory_service, _path_info_service) =
            gen_server(put_router());

        let digest = nar_sha256(NAR_CONTENTS_COMPLICATED.as_slice());
        let upload_url = format!("/nar/{}.nar", nixbase32::encode(&digest));

        server
            .put(&upload_url)
            .bytes(Bytes::from_static(&NAR_CONTENTS_COMPLICATED))
            .expect_success()
            .await;

        let mut buf = Vec::new();
        snix_store::nar::write_nar(
            &mut buf,
            &CASTORE_NODE_COMPLICATED,
            blob_service,
            directory_service,
        )
        .await
        .expect("write nar");

        assert_eq!(NAR_CONTENTS_COMPLICATED, buf[..]);
    }

    /// After a successful upload, a HEAD request for the same NAR path succeeds.
    #[traced_test]
    #[tokio::test]
    async fn test_put_root_nodes() {
        let router =
            put_router().route("/nar/:nar_str", axum::routing::get(super::head_root_nodes));
        let (server, _blob_service, _directory_service, _path_info_service) = gen_server(router);

        let digest = nar_sha256(NAR_CONTENTS_COMPLICATED.as_slice());
        let upload_url = format!("/nar/{}.nar", nixbase32::encode(&digest));

        server
            .put(&upload_url)
            .bytes(Bytes::from_static(&NAR_CONTENTS_COMPLICATED))
            .expect_success()
            .await;

        server
            .method(Method::HEAD, &upload_url)
            .expect_success()
            .await;
    }
}