snix_store/pathinfoservice/
nix_http.rs1use super::{PathInfo, PathInfoService};
2use crate::nar::ingest_nar_and_hash;
3use futures::{TryStreamExt, stream::BoxStream};
4use nix_compat::{
5 narinfo::{self, NarInfo, Signature},
6 nixbase32,
7 nixhash::NixHash,
8 store_path::StorePath,
9};
10use reqwest::StatusCode;
11use snix_castore::composition::{CompositionContext, ServiceBuilder};
12use snix_castore::{Error, blobservice::BlobService, directoryservice::DirectoryService};
13use std::sync::Arc;
14use tokio::io::{self, AsyncRead};
15use tonic::async_trait;
16use tracing::{debug, instrument, warn};
17use url::Url;
18
19pub struct NixHTTPPathInfoService<BS, DS> {
35 instance_name: String,
36 base_url: url::Url,
37 http_client: reqwest_middleware::ClientWithMiddleware,
38
39 blob_service: BS,
40 directory_service: DS,
41
42 public_keys: Option<Vec<narinfo::VerifyingKey>>,
45}
46
47impl<BS, DS> NixHTTPPathInfoService<BS, DS> {
48 pub fn new(
49 instance_name: String,
50 base_url: url::Url,
51 blob_service: BS,
52 directory_service: DS,
53 ) -> Self {
54 Self {
55 instance_name,
56 base_url,
57 http_client: reqwest_middleware::ClientBuilder::new(
58 reqwest::Client::builder()
59 .user_agent(crate::USER_AGENT)
60 .build()
61 .expect("Client::new()"),
62 )
63 .with(snix_tracing::propagate::reqwest::tracing_middleware())
64 .build(),
65 blob_service,
66 directory_service,
67
68 public_keys: None,
69 }
70 }
71
72 pub fn set_public_keys(&mut self, public_keys: Vec<narinfo::VerifyingKey>) {
74 self.public_keys = Some(public_keys);
75 }
76}
77
78#[async_trait]
79impl<BS, DS> PathInfoService for NixHTTPPathInfoService<BS, DS>
80where
81 BS: BlobService + Send + Sync + Clone + 'static,
82 DS: DirectoryService + Send + Sync + Clone + 'static,
83{
84 #[instrument(skip_all, err, fields(path.digest=nixbase32::encode(&digest), instance_name=%self.instance_name))]
85 async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
86 let narinfo_url = self
87 .base_url
88 .join(&format!("{}.narinfo", nixbase32::encode(&digest)))
89 .map_err(|e| {
90 warn!(e = %e, "unable to join URL");
91 io::Error::new(io::ErrorKind::InvalidInput, "unable to join url")
92 })?;
93
94 debug!(narinfo_url= %narinfo_url, "constructed NARInfo url");
95
96 let resp = self
97 .http_client
98 .get(narinfo_url)
99 .send()
100 .await
101 .map_err(|e| {
102 warn!(e=%e,"unable to send NARInfo request");
103 io::Error::new(
104 io::ErrorKind::InvalidInput,
105 "unable to send NARInfo request",
106 )
107 })?;
108
109 if resp.status() == StatusCode::NOT_FOUND || resp.status() == StatusCode::FORBIDDEN {
113 return Ok(None);
114 }
115
116 let narinfo_str = resp.text().await.map_err(|e| {
117 warn!(e=%e,"unable to decode response as string");
118 io::Error::new(
119 io::ErrorKind::InvalidData,
120 "unable to decode response as string",
121 )
122 })?;
123
124 let narinfo = NarInfo::parse(&narinfo_str).map_err(|e| {
126 warn!(e=%e,"unable to parse response as NarInfo");
127 io::Error::new(
128 io::ErrorKind::InvalidData,
129 "unable to parse response as NarInfo",
130 )
131 })?;
132
133 if let Some(public_keys) = &self.public_keys {
135 let fingerprint = narinfo.fingerprint();
136
137 if !public_keys.iter().any(|pubkey| {
138 narinfo
139 .signatures
140 .iter()
141 .any(|sig| pubkey.verify(&fingerprint, sig))
142 }) {
143 warn!("no valid signature found");
144 Err(io::Error::new(
145 io::ErrorKind::InvalidData,
146 "no valid signature found",
147 ))?;
148 }
149 }
150
151 let nar_url = self.base_url.join(narinfo.url).map_err(|e| {
159 warn!(e = %e, "unable to join URL");
160 io::Error::new(io::ErrorKind::InvalidInput, "unable to join url")
161 })?;
162 debug!(nar_url= %nar_url, "constructed NAR url");
163
164 let resp = self
165 .http_client
166 .get(nar_url.clone())
167 .send()
168 .await
169 .map_err(|e| {
170 warn!(e=%e,"unable to send NAR request");
171 io::Error::new(io::ErrorKind::InvalidInput, "unable to send NAR request")
172 })?;
173
174 if !resp.status().is_success() {
176 return Err(Error::StorageError(format!(
177 "unable to retrieve NAR at {}, status {}",
178 nar_url,
179 resp.status()
180 )));
181 }
182
183 let r = tokio_util::io::StreamReader::new(resp.bytes_stream().map_err(|e| {
185 let e = e.without_url();
186 warn!(e=%e, "failed to get response body");
187 io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
188 }));
189
190 let mut r: Box<dyn AsyncRead + Send + Unpin> = match narinfo.compression {
192 None => Box::new(r) as Box<dyn AsyncRead + Send + Unpin>,
193 Some("bzip2") => Box::new(async_compression::tokio::bufread::BzDecoder::new(r))
194 as Box<dyn AsyncRead + Send + Unpin>,
195 Some("gzip") => Box::new(async_compression::tokio::bufread::GzipDecoder::new(r))
196 as Box<dyn AsyncRead + Send + Unpin>,
197 Some("xz") => Box::new(async_compression::tokio::bufread::XzDecoder::new(r))
198 as Box<dyn AsyncRead + Send + Unpin>,
199 Some("zstd") => Box::new(async_compression::tokio::bufread::ZstdDecoder::new(r))
200 as Box<dyn AsyncRead + Send + Unpin>,
201 Some(comp_str) => {
202 return Err(Error::StorageError(format!(
203 "unsupported compression: {comp_str}"
204 )));
205 }
206 };
207
208 let (root_node, nar_hash, nar_size) = ingest_nar_and_hash(
209 self.blob_service.clone(),
210 self.directory_service.clone(),
211 &mut r,
212 &narinfo.ca,
213 )
214 .await
215 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
216
217 if narinfo.nar_size != nar_size {
219 warn!(
220 narinfo.nar_size = narinfo.nar_size,
221 http.nar_size = nar_size,
222 "NarSize mismatch"
223 );
224 Err(io::Error::new(
225 io::ErrorKind::InvalidData,
226 "NarSize mismatch".to_string(),
227 ))?;
228 }
229 if narinfo.nar_hash != nar_hash {
230 warn!(
231 narinfo.nar_hash = %NixHash::Sha256(narinfo.nar_hash),
232 http.nar_hash = %NixHash::Sha256(nar_hash),
233 "NarHash mismatch"
234 );
235 Err(io::Error::new(
236 io::ErrorKind::InvalidData,
237 "NarHash mismatch".to_string(),
238 ))?;
239 }
240
241 Ok(Some(PathInfo {
242 store_path: narinfo.store_path.to_owned(),
243 node: root_node,
244 references: narinfo.references.iter().map(StorePath::to_owned).collect(),
245 nar_size: narinfo.nar_size,
246 nar_sha256: narinfo.nar_hash,
247 deriver: narinfo.deriver.as_ref().map(StorePath::to_owned),
248 signatures: narinfo
249 .signatures
250 .into_iter()
251 .map(|s| Signature::<String>::new(s.name().to_string(), s.bytes().to_owned()))
252 .collect(),
253 ca: narinfo.ca,
254 }))
255 }
256
257 #[instrument(skip_all, fields(path_info=?_path_info, instance_name=%self.instance_name))]
258 async fn put(&self, _path_info: PathInfo) -> Result<PathInfo, Error> {
259 Err(Error::InvalidRequest(
260 "put not supported for this backend".to_string(),
261 ))
262 }
263
264 fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
265 Box::pin(futures::stream::once(async {
266 Err(Error::InvalidRequest(
267 "list not supported for this backend".to_string(),
268 ))
269 }))
270 }
271}
272
273#[derive(serde::Deserialize)]
274pub struct NixHTTPPathInfoServiceConfig {
275 base_url: String,
276 blob_service: String,
277 directory_service: String,
278 #[serde(default)]
279 public_keys: Option<Vec<String>>,
282}
283
284impl TryFrom<Url> for NixHTTPPathInfoServiceConfig {
285 type Error = Box<dyn std::error::Error + Send + Sync>;
286 fn try_from(url: Url) -> Result<Self, Self::Error> {
287 let mut public_keys: Option<Vec<String>> = None;
289 for (_, v) in url
290 .query_pairs()
291 .into_iter()
292 .filter(|(k, _)| k == "trusted-public-keys")
293 {
294 public_keys
295 .get_or_insert(Default::default())
296 .extend(v.split_ascii_whitespace().map(ToString::to_string));
297 }
298
299 let blob_service = url
301 .query_pairs()
302 .into_iter()
303 .find(|(k, _)| k == "blob_service")
304 .map(|(_, v)| v.to_string())
305 .unwrap_or("root".to_string());
306 let directory_service = url
307 .query_pairs()
308 .into_iter()
309 .find(|(k, _)| k == "directory_service")
310 .map(|(_, v)| v.to_string())
311 .unwrap_or("root".to_string());
312
313 Ok(NixHTTPPathInfoServiceConfig {
314 base_url: url.to_string().strip_prefix("nix+").unwrap().to_string(),
318 blob_service,
319 directory_service,
320 public_keys,
321 })
322 }
323}
324
325#[async_trait]
326impl ServiceBuilder for NixHTTPPathInfoServiceConfig {
327 type Output = dyn PathInfoService;
328 async fn build<'a>(
329 &'a self,
330 instance_name: &str,
331 context: &CompositionContext,
332 ) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync + 'static>> {
333 let (blob_service, directory_service) = futures::join!(
334 context.resolve::<dyn BlobService>(self.blob_service.clone()),
335 context.resolve::<dyn DirectoryService>(self.directory_service.clone())
336 );
337 let mut svc = NixHTTPPathInfoService::new(
338 instance_name.to_string(),
339 Url::parse(&self.base_url)?,
340 blob_service?,
341 directory_service?,
342 );
343 if let Some(public_keys) = &self.public_keys {
344 svc.set_public_keys(
345 public_keys
346 .iter()
347 .map(|pubkey_str| {
348 narinfo::VerifyingKey::parse(pubkey_str)
349 .map_err(|e| Error::StorageError(format!("invalid public key: {e}")))
350 })
351 .collect::<Result<Vec<_>, Error>>()?,
352 );
353 }
354 Ok(Arc::new(svc))
355 }
356}