snix_store/
utils.rs

1use std::{
2    collections::HashMap,
3    pin::Pin,
4    sync::Arc,
5    task::{self, Poll},
6};
7use tokio::io::{self, AsyncWrite};
8
9use snix_castore::utils as castore_utils;
10use snix_castore::{blobservice::BlobService, directoryservice::DirectoryService};
11use url::Url;
12
13use crate::composition::REG;
14use crate::nar::{NarCalculationService, SimpleRenderer};
15use crate::pathinfoservice::PathInfoService;
16use snix_castore::composition::{
17    Composition, DeserializeWithRegistry, ServiceBuilder, with_registry,
18};
19
20#[derive(serde::Deserialize, Default)]
21pub struct CompositionConfigs {
22    pub blobservices:
23        HashMap<String, DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = dyn BlobService>>>>,
24    pub directoryservices: HashMap<
25        String,
26        DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>>,
27    >,
28    pub pathinfoservices: HashMap<
29        String,
30        DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>>,
31    >,
32}
33
34/// Provides a set of clap arguments to configure snix-\[ca\]store services.
35///
36/// This particular variant has defaults tailored for usecases accessing data
37/// directly locally, like the `snix-store daemon` command.
38#[derive(clap::Parser, Clone)]
39#[group(id = "StoreServiceUrls")]
40pub struct ServiceUrls {
41    #[clap(flatten)]
42    castore_service_addrs: castore_utils::ServiceUrls,
43
44    #[arg(long, env, default_value = "redb:///var/lib/snix-store/pathinfo.redb")]
45    path_info_service_addr: String,
46
47    /// Path to a TOML file describing the way the services should be composed
48    /// Experimental because the format is not final.
49    /// If specified, the other service addrs are ignored.
50    #[cfg(feature = "xp-composition-cli")]
51    #[arg(long, env)]
52    experimental_store_composition: Option<String>,
53}
54
55/// Provides a set of clap arguments to configure snix-\[ca\]store services.
56///
57/// This particular variant has defaults tailored for usecases accessing data
58/// from another running snix daemon, via gRPC.
59#[derive(clap::Parser, Clone)]
60#[group(id = "StoreServiceUrlsGrpc")]
61pub struct ServiceUrlsGrpc {
62    #[clap(flatten)]
63    castore_service_addrs: castore_utils::ServiceUrlsGrpc,
64
65    #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
66    path_info_service_addr: String,
67
68    #[cfg(feature = "xp-composition-cli")]
69    #[arg(long, env)]
70    experimental_store_composition: Option<String>,
71}
72
73/// Provides a set of clap arguments to configure snix-\[ca\]store services.
74///
75/// This particular variant has defaults tailored for usecases keeping all data
76/// in memory.
77/// It's currently used in snix-cli, as we don't really care about persistency
78/// there yet, and using something else here might make some perf output harder
79/// to interpret.
80#[derive(clap::Parser, Clone)]
81#[group(id = "StoreServiceUrlsMemory")]
82pub struct ServiceUrlsMemory {
83    #[clap(flatten)]
84    castore_service_addrs: castore_utils::ServiceUrlsMemory,
85
86    #[arg(long, env, default_value = "memory://")]
87    path_info_service_addr: String,
88
89    #[cfg(feature = "xp-composition-cli")]
90    #[arg(long, env)]
91    experimental_store_composition: Option<String>,
92}
93
94impl From<ServiceUrlsGrpc> for ServiceUrls {
95    fn from(urls: ServiceUrlsGrpc) -> ServiceUrls {
96        ServiceUrls {
97            castore_service_addrs: urls.castore_service_addrs.into(),
98            path_info_service_addr: urls.path_info_service_addr,
99            #[cfg(feature = "xp-composition-cli")]
100            experimental_store_composition: urls.experimental_store_composition,
101        }
102    }
103}
104
105impl From<ServiceUrlsMemory> for ServiceUrls {
106    fn from(urls: ServiceUrlsMemory) -> ServiceUrls {
107        ServiceUrls {
108            castore_service_addrs: urls.castore_service_addrs.into(),
109            path_info_service_addr: urls.path_info_service_addr,
110            #[cfg(feature = "xp-composition-cli")]
111            experimental_store_composition: urls.experimental_store_composition,
112        }
113    }
114}
115
116/// Deserializes service addresses into composition config, configuring each
117/// service as the single "root".
118/// If the `xp-composition-cli` feature is enabled, and a file specified in the
119/// `--experimental-store-composition` parameter, this is used instead.
120pub async fn addrs_to_configs(
121    urls: impl Into<ServiceUrls>,
122) -> Result<CompositionConfigs, Box<dyn std::error::Error + Send + Sync>> {
123    let urls: ServiceUrls = urls.into();
124
125    #[cfg(feature = "xp-composition-cli")]
126    if let Some(conf_path) = urls.experimental_store_composition {
127        let conf_text = tokio::fs::read_to_string(conf_path).await?;
128        return Ok(with_registry(&REG, || toml::from_str(&conf_text))?);
129    }
130
131    let mut configs: CompositionConfigs = Default::default();
132
133    let blob_service_url = Url::parse(&urls.castore_service_addrs.blob_service_addr)?;
134    let directory_service_url = Url::parse(&urls.castore_service_addrs.directory_service_addr)?;
135    let path_info_service_url = Url::parse(&urls.path_info_service_addr)?;
136
137    configs.blobservices.insert(
138        "root".into(),
139        with_registry(&REG, || blob_service_url.try_into())?,
140    );
141    configs.directoryservices.insert(
142        "root".into(),
143        with_registry(&REG, || directory_service_url.try_into())?,
144    );
145    configs.pathinfoservices.insert(
146        "root".into(),
147        with_registry(&REG, || path_info_service_url.try_into())?,
148    );
149
150    Ok(configs)
151}
152
153/// Construct the store handles from their addrs.
154pub async fn construct_services(
155    urls: impl Into<ServiceUrls>,
156) -> Result<
157    (
158        Arc<dyn BlobService>,
159        Arc<dyn DirectoryService>,
160        Arc<dyn PathInfoService>,
161        Box<dyn NarCalculationService>,
162    ),
163    Box<dyn std::error::Error + Send + Sync>,
164> {
165    let configs = addrs_to_configs(urls).await?;
166    construct_services_from_configs(configs).await
167}
168
169/// Construct the store handles from their addrs.
170pub async fn construct_services_from_configs(
171    configs: CompositionConfigs,
172) -> Result<
173    (
174        Arc<dyn BlobService>,
175        Arc<dyn DirectoryService>,
176        Arc<dyn PathInfoService>,
177        Box<dyn NarCalculationService>,
178    ),
179    Box<dyn std::error::Error + Send + Sync>,
180> {
181    let mut comp = Composition::new(&REG);
182
183    comp.extend(configs.blobservices);
184    comp.extend(configs.directoryservices);
185    comp.extend(configs.pathinfoservices);
186
187    let blob_service: Arc<dyn BlobService> = comp.build("root").await?;
188    let directory_service: Arc<dyn DirectoryService> = comp.build("root").await?;
189    let path_info_service: Arc<dyn PathInfoService> = comp.build("root").await?;
190
191    // HACK: The grpc client also implements NarCalculationService, and we
192    // really want to use it (otherwise we'd need to fetch everything again for hashing).
193    // Until we revamped store composition and config, detect this special case here.
194    let nar_calculation_service: Box<dyn NarCalculationService> = path_info_service
195        .nar_calculation_service()
196        .unwrap_or_else(|| {
197            Box::new(SimpleRenderer::new(
198                blob_service.clone(),
199                directory_service.clone(),
200            ))
201        });
202
203    Ok((
204        blob_service,
205        directory_service,
206        path_info_service,
207        nar_calculation_service,
208    ))
209}
210
211/// The inverse of [tokio_util::io::SyncIoBridge].
212/// Don't use this with anything that actually does blocking I/O.
213pub struct AsyncIoBridge<T>(pub T);
214
215impl<W: std::io::Write + Unpin> AsyncWrite for AsyncIoBridge<W> {
216    fn poll_write(
217        self: Pin<&mut Self>,
218        _cx: &mut task::Context<'_>,
219        buf: &[u8],
220    ) -> Poll<io::Result<usize>> {
221        Poll::Ready(self.get_mut().0.write(buf))
222    }
223
224    fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
225        Poll::Ready(self.get_mut().0.flush())
226    }
227
228    fn poll_shutdown(
229        self: Pin<&mut Self>,
230        _cx: &mut task::Context<'_>,
231    ) -> Poll<Result<(), io::Error>> {
232        Poll::Ready(Ok(()))
233    }
234}