1use super::{PathInfo, PathInfoService};
2use crate::{
3 nar::{NarIngestionError, ingest_nar_and_hash},
4 pathinfoservice,
5};
6use futures::{TryStreamExt, stream::BoxStream};
7use nix_compat::{
8 narinfo::{self, NarInfo, Signature},
9 nixbase32,
10 nixhash::NixHash,
11 store_path::StorePath,
12};
13use reqwest::StatusCode;
14use snix_castore::composition::{CompositionContext, ServiceBuilder};
15use snix_castore::{blobservice::BlobService, directoryservice::DirectoryService};
16use std::sync::Arc;
17use tokio::io::{self, AsyncRead};
18use tonic::async_trait;
19use tracing::{Span, instrument, warn};
20use url::Url;
21
/// A [`PathInfoService`] that answers lookups by talking to a Nix HTTP binary
/// cache.
///
/// A lookup fetches the `.narinfo` for the requested store path digest,
/// optionally checks its signatures, then downloads the referenced NAR and
/// ingests it into the configured blob and directory services.
/// The service is read-only: `put` and `list` always fail.
pub struct NixHTTPPathInfoService<BS, DS> {
    /// Name of this service instance; attached to tracing spans.
    instance_name: String,
    /// URL that `.narinfo` names and NAR locations are joined onto.
    base_url: url::Url,
    /// HTTP client (with tracing propagation middleware) used for all requests.
    http_client: reqwest_middleware::ClientWithMiddleware,

    /// Service handed to the NAR ingester together with `directory_service`.
    blob_service: BS,
    /// Service handed to the NAR ingester together with `blob_service`.
    directory_service: DS,

    /// Keys a NARInfo signature is checked against.
    /// When empty, signature verification is skipped entirely.
    trusted_public_keys: Vec<narinfo::VerifyingKey>,
}
50
51impl<BS, DS> NixHTTPPathInfoService<BS, DS> {
52 pub fn try_build(
53 instance_name: String,
54 config: NixHTTPPathInfoServiceConfig,
55 blob_service: BS,
56 directory_service: DS,
57 ) -> Result<Self, Error> {
58 let mut trusted_public_keys = Vec::new();
59 for s in config.params.trusted_public_keys {
60 trusted_public_keys.push(
61 narinfo::VerifyingKey::parse(&s).map_err(|e| Error::ParseTrustedPublicKey(s, e))?,
62 )
63 }
64
65 Ok(Self {
66 instance_name,
67 base_url: config.base_url,
68 http_client: reqwest_middleware::ClientBuilder::new(
69 reqwest::Client::builder()
70 .user_agent(crate::USER_AGENT)
71 .build()
72 .map_err(reqwest_middleware::Error::Reqwest)?,
73 )
74 .with(snix_tracing::propagate::reqwest::tracing_middleware())
75 .build(),
76 blob_service,
77 directory_service,
78
79 trusted_public_keys,
80 })
81 }
82
83 #[instrument(level=tracing::Level::TRACE, skip_all,fields(path.digest=nixbase32::encode(&digest)),err)]
84 fn derive_narinfo_url(&self, digest: [u8; 20]) -> Result<Url, Error> {
85 let s = format!("{}.narinfo", nixbase32::encode(&digest));
86 self.base_url
87 .join(&s)
88 .map_err(|e| Error::JoinUrl(self.base_url.to_owned(), s.to_owned(), e))
89 }
90}
91
92#[derive(Debug, thiserror::Error)]
93pub enum Error {
94 #[error("wrong arguments: {0}")]
95 WrongConfig(&'static str),
96 #[error("serde-qs error: {0}")]
97 SerdeQS(#[from] serde_qs::Error),
98 #[error("unable to parse pubkey {0}")]
99 ParseTrustedPublicKey(String, nix_compat::narinfo::VerifyingKeyError),
100
101 #[error("unable to join URL {0} with {1}")]
102 JoinUrl(Url, String, url::ParseError),
103 #[error("reqwest error")]
104 Reqwest(#[from] reqwest_middleware::Error),
105 #[error("unable to decode NARInfo response as string")]
106 DecodeBody(reqwest::Error),
107 #[error("unable to parse NARInfo")]
108 ParseNARInfo(nix_compat::narinfo::Error),
109 #[error("no valid signature found")]
110 NoValidSignature,
111 #[error("failed to request NAR, status {0}")]
112 FailedToRequestNAR(reqwest::StatusCode),
113 #[error("unsupported NAR compression: {0}")]
114 UnsupportedNARCompression(String),
115 #[error("failed to ingest NAR")]
116 IngestNAR(NarIngestionError),
117 #[error("NARSize mismatch, narinfo size {narinfo_size}, actual size {actual_size}")]
118 NARSizeMismatch { narinfo_size: u64, actual_size: u64 },
119 #[error("NARHash mismatch, narinfo NARHash {exp}, actual NARHash {act}",
120 exp = NixHash::Sha256(*.narinfo_nar_sha256),
121 act = NixHash::Sha256(*.actual_nar_sha256))]
122 NARHashMismatch {
123 narinfo_nar_sha256: [u8; 32],
124 actual_nar_sha256: [u8; 32],
125 },
126
127 #[error("put not supported")]
128 PutNotSupported,
129 #[error("list not supported")]
130 ListNotSupported,
131}
132
133#[async_trait]
134impl<BS, DS> PathInfoService for NixHTTPPathInfoService<BS, DS>
135where
136 BS: BlobService + Send + Sync + Clone + 'static,
137 DS: DirectoryService + Send + Sync + Clone + 'static,
138{
139 #[instrument(skip_all, err, fields(path.digest=nixbase32::encode(&digest), instance_name=%self.instance_name))]
140 async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, pathinfoservice::Error> {
141 let narinfo_url = self.derive_narinfo_url(digest)?;
142
143 let span = Span::current();
144 span.record("narinfo.url", narinfo_url.to_string());
145
146 let resp = self
147 .http_client
148 .get(narinfo_url)
149 .send()
150 .await
151 .map_err(Error::Reqwest)?;
152
153 if resp.status() == StatusCode::NOT_FOUND || resp.status() == StatusCode::FORBIDDEN {
157 return Ok(None);
158 }
159
160 let narinfo_str = resp.text().await.map_err(Error::DecodeBody)?;
161
162 let narinfo = NarInfo::parse(&narinfo_str).map_err(Error::ParseNARInfo)?;
164
165 if !self.trusted_public_keys.is_empty() {
167 let fingerprint = narinfo.fingerprint();
168
169 if !self.trusted_public_keys.iter().any(|pubkey| {
170 narinfo
171 .signatures
172 .iter()
173 .any(|sig| pubkey.verify(&fingerprint, sig))
174 }) {
175 Err(Error::NoValidSignature)?
176 }
177 }
178
179 let nar_url = self
187 .base_url
188 .join(narinfo.url)
189 .map_err(|e| Error::JoinUrl(self.base_url.clone(), narinfo.url.to_owned(), e))?;
190 span.record("nar.url", nar_url.to_string());
191
192 let resp = self
193 .http_client
194 .get(nar_url.clone())
195 .send()
196 .await
197 .map_err(Error::Reqwest)?;
198
199 if !resp.status().is_success() {
201 Err(Error::FailedToRequestNAR(resp.status()))?;
202 }
203
204 let r = tokio_util::io::StreamReader::new(resp.bytes_stream().map_err(|e| {
206 let e = e.without_url();
207 warn!(e=%e, "failed to get response body");
208 io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
209 }));
210
211 let mut r: Box<dyn AsyncRead + Send + Unpin> = match narinfo.compression {
213 None => Box::new(r) as Box<dyn AsyncRead + Send + Unpin>,
214 Some("bzip2") => Box::new(async_compression::tokio::bufread::BzDecoder::new(r))
215 as Box<dyn AsyncRead + Send + Unpin>,
216 Some("gzip") => Box::new(async_compression::tokio::bufread::GzipDecoder::new(r))
217 as Box<dyn AsyncRead + Send + Unpin>,
218 Some("xz") => Box::new(async_compression::tokio::bufread::XzDecoder::new(r))
219 as Box<dyn AsyncRead + Send + Unpin>,
220 Some("zstd") => Box::new(async_compression::tokio::bufread::ZstdDecoder::new(r))
221 as Box<dyn AsyncRead + Send + Unpin>,
222 Some(comp_str) => Err(Error::UnsupportedNARCompression(comp_str.to_owned()))?,
223 };
224
225 let (root_node, nar_hash, nar_size) = ingest_nar_and_hash(
226 self.blob_service.clone(),
227 &self.directory_service,
228 &mut r,
229 &narinfo.ca,
230 )
231 .await
232 .map_err(Error::IngestNAR)?;
233
234 if narinfo.nar_size != nar_size {
236 Err(Error::NARSizeMismatch {
237 narinfo_size: narinfo.nar_size,
238 actual_size: nar_size,
239 })?
240 }
241 if narinfo.nar_hash != nar_hash {
242 Err(Error::NARHashMismatch {
243 narinfo_nar_sha256: narinfo.nar_hash,
244 actual_nar_sha256: nar_hash,
245 })?
246 }
247
248 Ok(Some(PathInfo {
249 store_path: narinfo.store_path.to_owned(),
250 node: root_node,
251 references: narinfo.references.iter().map(StorePath::to_owned).collect(),
252 nar_size: narinfo.nar_size,
253 nar_sha256: narinfo.nar_hash,
254 deriver: narinfo.deriver.as_ref().map(StorePath::to_owned),
255 signatures: narinfo
256 .signatures
257 .into_iter()
258 .map(|s| Signature::<String>::new(s.name().to_string(), s.bytes().to_owned()))
259 .collect(),
260 ca: narinfo.ca,
261 }))
262 }
263
264 #[instrument(skip_all, err, fields(path.digest=nixbase32::encode(&digest), instance_name=%self.instance_name))]
265 async fn has(&self, digest: [u8; 20]) -> Result<bool, pathinfoservice::Error> {
266 let narinfo_url = self.derive_narinfo_url(digest)?;
267
268 let span = Span::current();
269 span.record("narinfo.url", narinfo_url.to_string());
270
271 let resp = self
272 .http_client
273 .head(narinfo_url)
274 .send()
275 .await
276 .map_err(Error::Reqwest)?;
277
278 if resp.status() == StatusCode::NOT_FOUND || resp.status() == StatusCode::FORBIDDEN {
282 Ok(false)
283 } else {
284 Ok(true)
285 }
286 }
287
288 #[instrument(skip_all, fields(path_info=?_path_info, instance_name=%self.instance_name))]
289 async fn put(&self, _path_info: PathInfo) -> Result<PathInfo, pathinfoservice::Error> {
290 Err(Box::new(Error::PutNotSupported))
291 }
292
293 fn list(&self) -> BoxStream<'static, Result<PathInfo, pathinfoservice::Error>> {
294 Box::pin(futures::stream::once(async {
295 Err(Error::ListNotSupported)?
296 }))
297 }
298}
299
/// Configuration of a [`NixHTTPPathInfoService`].
///
/// Can be deserialized directly, or derived from a `nix+http(s)://` URL via
/// the `TryFrom<Url>` implementation.
#[derive(serde::Deserialize, Clone, Debug, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct NixHTTPPathInfoServiceConfig {
    /// URL of the binary cache to query.
    base_url: Url,

    /// Remaining settings; flattened into this struct when deserializing.
    #[serde(flatten)]
    params: NixHTTPPathInfoServiceParams,
}
308
/// Settings of a [`NixHTTPPathInfoServiceConfig`] other than the base URL.
///
/// When the config is built from a URL, these are deserialized from its
/// query string.
#[derive(serde::Deserialize, Clone, Debug, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
struct NixHTTPPathInfoServiceParams {
    /// Name of the blob service to resolve from the composition context.
    /// Defaults to `&root`.
    #[serde(default = "default_blob_service")]
    blob_service: String,
    /// Name of the directory service to resolve from the composition context.
    /// Defaults to `&root`.
    #[serde(default = "default_directory_service")]
    directory_service: String,
    /// Public keys NARInfo signatures are verified against.
    /// Defaults to an empty list, in which case no verification happens.
    #[serde(default)]
    trusted_public_keys: Vec<String>,
}
321
/// Serde default for the `blob_service` parameter: the `&root` reference.
fn default_blob_service() -> String {
    String::from("&root")
}
/// Serde default for the `directory_service` parameter: the `&root` reference.
fn default_directory_service() -> String {
    String::from("&root")
}
328
329impl TryFrom<Url> for NixHTTPPathInfoServiceConfig {
330 type Error = Box<dyn std::error::Error + Send + Sync>;
331 fn try_from(url: Url) -> Result<Self, Self::Error> {
332 let scheme = url
333 .scheme()
334 .strip_prefix("nix+")
335 .ok_or_else(|| Error::WrongConfig("scheme must start with nix+"))?;
336
337 if !url.has_authority() {
338 Err(Error::WrongConfig("url must have authority component"))?
339 }
340 if !url.has_host() {
341 Err(Error::WrongConfig("url must have host component"))?
342 }
343 if !["http", "https"].contains(&scheme) {
344 Err(Error::WrongConfig("unknown scheme"))?
345 }
346
347 Ok(NixHTTPPathInfoServiceConfig {
348 base_url: {
354 let mut url: Url = url
355 .to_string()
356 .strip_prefix("nix+")
357 .unwrap()
358 .parse()
359 .expect("stripped URL to parse again");
360 url.set_query(None);
361 url
362 },
363 params: serde_qs::from_str(url.query().unwrap_or_default())?,
364 })
365 }
366}
367
368#[async_trait]
369impl ServiceBuilder for NixHTTPPathInfoServiceConfig {
370 type Output = dyn PathInfoService;
371 async fn build<'a>(
372 &'a self,
373 instance_name: &str,
374 context: &CompositionContext,
375 ) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync + 'static>> {
376 let (blob_service, directory_service) = futures::join!(
377 context.resolve::<dyn BlobService>(&self.params.blob_service),
378 context.resolve::<dyn DirectoryService>(&self.params.directory_service)
379 );
380 let svc = NixHTTPPathInfoService::try_build(
381 instance_name.to_string(),
382 self.to_owned(),
383 blob_service?,
384 directory_service?,
385 )?;
386 Ok(Arc::new(svc))
387 }
388}
389
#[cfg(test)]
mod tests {
    use super::{NixHTTPPathInfoServiceConfig, NixHTTPPathInfoServiceParams};
    use rstest::rstest;
    use url::Url;

    /// Table-driven check of `NixHTTPPathInfoServiceConfig::try_from(Url)`.
    ///
    /// Each case pairs a URL with the config it is expected to produce, or
    /// with `None` if the conversion is expected to fail.
    #[rstest]
    // https cache without query: all parameters take their defaults.
    #[case::correct_nix_https("nix+https://cache.nixos.org", Some(
        NixHTTPPathInfoServiceConfig {
            base_url: "https://cache.nixos.org".try_into().unwrap(),
            params: NixHTTPPathInfoServiceParams {
                blob_service: "&root".to_string(),
                directory_service: "&root".to_string(),
                trusted_public_keys: vec![]
            }
        }
    ))]
    // Plain http is accepted as well.
    #[case::correct_nix_http("nix+http://cache.nixos.org", Some(
        NixHTTPPathInfoServiceConfig {
            base_url: "http://cache.nixos.org".try_into().unwrap(),
            params: NixHTTPPathInfoServiceParams {
                blob_service: "&root".to_string(),
                directory_service: "&root".to_string(),
                trusted_public_keys: vec![]
            }
        }
    ))]
    // A path below the host is kept in the base URL.
    #[case::correct_nix_http_with_subpath("nix+http://192.0.2.1/foo", Some(
        NixHTTPPathInfoServiceConfig {
            base_url: "http://192.0.2.1/foo".try_into().unwrap(),
            params: NixHTTPPathInfoServiceParams {
                blob_service: "&root".to_string(),
                directory_service: "&root".to_string(),
                trusted_public_keys: vec![]
            }
        }
    ))]
    // IPv6 host, explicit port and a path are all kept in the base URL.
    #[case::correct_nix_http_with_subpath_and_port("nix+http://[::1]:8080/foo", Some(
        NixHTTPPathInfoServiceConfig {
            base_url: "http://[::1]:8080/foo".try_into().unwrap(),
            params: NixHTTPPathInfoServiceParams {
                blob_service: "&root".to_string(),
                directory_service: "&root".to_string(),
                trusted_public_keys: vec![]
            }
        }

    ))]
    // One trusted key, given with an explicit `[0]` index; the query is
    // removed from the base URL.
    #[case::correct_nix_https_with_trusted_public_key(
        "nix+https://cache.nixos.org?trusted_public_keys[0]=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=", Some(
        NixHTTPPathInfoServiceConfig {
            base_url: "https://cache.nixos.org".try_into().unwrap(),
            params: NixHTTPPathInfoServiceParams {
                blob_service: "&root".to_string(),
                directory_service: "&root".to_string(),
                trusted_public_keys: vec![
                    "cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=".to_string()
                ]
            }
        }
    ))]
    // Two trusted keys, given with `[0]` and `[1]` indices; order is kept.
    #[case::correct_nix_https_with_two_trusted_public_keys(
        "nix+https://cache.nixos.org?trusted_public_keys[0]=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=&trusted_public_keys[1]=foo:jp4fCEx9tBEId/L0ZsVJ26k0wC0fu7vJqLjjIGFkup8=", Some(
        NixHTTPPathInfoServiceConfig {
            base_url: "https://cache.nixos.org".try_into().unwrap(),
            params: NixHTTPPathInfoServiceParams {
                blob_service: "&root".to_string(),
                directory_service: "&root".to_string(),
                trusted_public_keys: vec![
                    "cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=".to_string(),
                    "foo:jp4fCEx9tBEId/L0ZsVJ26k0wC0fu7vJqLjjIGFkup8=".to_string()
                ]
            }
        }
    ))]
    // Only http and https are accepted after the `nix+` prefix.
    #[case::wrong_scheme("nix+grpc://example.com", None)]
    // An empty host is rejected.
    #[case::missing_host("nix+http:///", None)]
    // A URL without authority component is rejected.
    #[case::missing_authority("nix+http:", None)]
    // `trusted_public_keys` given without a `[n]` index is expected to fail.
    #[case::trusted_public_keys_no_sequence(
        "nix+https://cache.nixos.org?trusted_public_keys=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=",
        None
    )]
    // The query key is spelled `trustedpublickeys`, which is not a field of
    // the params struct; the conversion is expected to fail.
    #[case::trusted_public_keys_wrong_pubkey(
        "nix+https://cache.nixos.org?trustedpublickeys=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=",
        None
    )]
    fn parse_url(#[case] url_str: &str, #[case] exp_config: Option<NixHTTPPathInfoServiceConfig>) {
        // The input must at least be a syntactically valid URL.
        let url: Url = url_str.parse().expect("url to parse");

        match (NixHTTPPathInfoServiceConfig::try_from(url), exp_config) {
            (Ok(_), None) => panic!("parsing url unexpectedly succeeded"),
            (Ok(config), Some(exp_config)) => assert_eq!(exp_config, config),
            (Err(_), None) => {}
            (Err(e), Some(_)) => panic!("parsing url unexpectedly failed: {e}"),
        }
    }
}