snix_castore/
composition.rs

1//! The composition module allows composing different kinds of services based on a set of service
2//! configurations _at runtime_.
3//!
4//! Store configs are deserialized with serde. The registry provides a stateful mapping from the
5//! `type` tag of an internally tagged enum on the serde side to a Config struct which is
6//! deserialized and then returned as a `Box<dyn ServiceBuilder<Output = dyn BlobService>>`
7//! (the same for DirectoryService instead of BlobService etc).
8//!
9//! ### Example 1.: Implementing a new BlobService
10//!
11//! You need a Config struct which implements `DeserializeOwned` and
12//! `ServiceBuilder<Output = dyn BlobService>`.
13//! Provide the user with a function to call with
14//! their registry. You register your new type as:
15//!
16//! ```
17//! use std::sync::Arc;
18//!
19//! use snix_castore::composition::*;
20//! use snix_castore::blobservice::BlobService;
21//!
22//! #[derive(serde::Deserialize)]
23//! struct MyBlobServiceConfig {
24//! }
25//!
26//! #[tonic::async_trait]
27//! impl ServiceBuilder for MyBlobServiceConfig {
28//!     type Output = dyn BlobService;
29//!     async fn build(&self, _: &str, _: &CompositionContext) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync + 'static>> {
30//!         todo!()
31//!     }
32//! }
33//!
34//! impl TryFrom<url::Url> for MyBlobServiceConfig {
35//!     type Error = Box<dyn std::error::Error + Send + Sync>;
36//!     fn try_from(url: url::Url) -> Result<Self, Self::Error> {
37//!         todo!()
38//!     }
39//! }
40//!
41//! pub fn add_my_service(reg: &mut Registry) {
42//!     reg.register::<Box<dyn ServiceBuilder<Output = dyn BlobService>>, MyBlobServiceConfig>("myblobservicetype");
43//! }
44//! ```
45//!
46//! Now, when a user deserializes a store config with the type tag "myblobservicetype" into a
47//! `Box<dyn ServiceBuilder<Output = Arc<dyn BlobService>>>`, it will be done via `MyBlobServiceConfig`.
48//!
49//! ### Example 2.: Composing stores to get one store
50//!
51//! ```
52//! use std::sync::Arc;
53//! use snix_castore::composition::*;
54//! use snix_castore::blobservice::BlobService;
55//!
56//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
57//! # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async move {
58//! let blob_services_configs_json = serde_json::json!({
59//!   "blobstore1": {
60//!     "type": "memory"
61//!   },
62//!   "blobstore2": {
63//!     "type": "memory"
64//!   },
65//!   "root": {
66//!     "type": "combined",
67//!     "near": "blobstore1",
68//!     "far": "blobstore2"
69//!   }
70//! });
71//!
72//! let blob_services_configs = with_registry(&REG, || serde_json::from_value(blob_services_configs_json))?;
73//! let mut blob_service_composition = Composition::new(&REG);
74//! blob_service_composition.extend_with_configs::<dyn BlobService>(blob_services_configs);
75//! let blob_service: Arc<dyn BlobService> = blob_service_composition.build("root").await?;
76//! # Ok(())
77//! # })
78//! # }
79//! ```
80//!
81//! ### Example 3.: Creating another registry extending the default registry with third-party types
82//!
83//! ```
84//! # pub fn add_my_service(reg: &mut snix_castore::composition::Registry) {}
85//! let mut my_registry = snix_castore::composition::Registry::default();
86//! snix_castore::composition::add_default_services(&mut my_registry);
87//! add_my_service(&mut my_registry);
88//! ```
89//!
90//! Continue with Example 2, with my_registry instead of REG
91//!
92//! EXPERIMENTAL: If the xp-composition-url-refs feature is enabled,
93//! entrypoints can also be URL strings, which are created as
94//! anonymous stores. Instantiations of the same URL will
95//! result in a new, distinct anonymous store each time, so creating
96//! two `memory://` stores with this method will not share the same view.
97//! This behavior might change in the future.
98
99use erased_serde::deserialize;
100use futures::FutureExt;
101use futures::future::BoxFuture;
102use serde::de::DeserializeOwned;
103use serde_tagged::de::{BoxFnSeed, SeedFactory};
104use serde_tagged::util::TagString;
105use std::any::{Any, TypeId};
106use std::cell::Cell;
107use std::collections::BTreeMap;
108use std::collections::HashMap;
109use std::marker::PhantomData;
110use std::sync::{Arc, LazyLock};
111use tonic::async_trait;
112
113/// Resolves tag names to the corresponding Config type.
114// Registry implementation details:
115// This is really ugly. Really we would want to store this as a generic static field:
116//
117// ```
118// struct Registry<T>(BTreeMap<(&'static str), RegistryEntry<T>);
119// static REG<T>: Registry<T>;
120// ```
121//
122// so that one version of the static is generated for each Type that the registry is accessed for.
123// However, this is not possible, because generics are only a thing in functions, and even there
124// they will not interact with static items:
125// https://doc.rust-lang.org/reference/items/static-items.html#statics--generics
126//
127// So instead, we make this lookup at runtime by putting the TypeId into the key.
128// But now we can no longer store the `BoxFnSeed<T>` because we are lacking the generic parameter
129// T, so instead store it as `Box<dyn Any>` and downcast to `&BoxFnSeed<T>` when performing the
130// lookup.
131// I said it was ugly...
132#[derive(Default)]
133pub struct Registry(BTreeMap<(TypeId, &'static str), Box<dyn Any + Sync>>);
134pub type FromUrlSeed<T> =
135    Box<dyn Fn(url::Url) -> Result<T, Box<dyn std::error::Error + Send + Sync>> + Sync>;
136pub struct RegistryEntry<T> {
137    serde_deserialize_seed: BoxFnSeed<DeserializeWithRegistry<T>>,
138    from_url_seed: FromUrlSeed<DeserializeWithRegistry<T>>,
139}
140
141struct RegistryWithFakeType<'r, T>(&'r Registry, PhantomData<T>);
142
143impl<'r, 'de: 'r, T: 'static> SeedFactory<'de, TagString<'de>> for RegistryWithFakeType<'r, T> {
144    type Value = DeserializeWithRegistry<T>;
145    type Seed = &'r BoxFnSeed<Self::Value>;
146
147    // Required method
148    fn seed<E>(self, tag: TagString<'de>) -> Result<Self::Seed, E>
149    where
150        E: serde::de::Error,
151    {
152        // using find() and not get() because of https://github.com/rust-lang/rust/issues/80389
153        let seed: &Box<dyn Any + Sync> = self
154            .0
155            .0
156            .iter()
157            .find(|(k, _)| *k == &(TypeId::of::<T>(), tag.as_ref()))
158            .ok_or_else(|| serde::de::Error::custom(format!("Unknown type: {}", tag)))?
159            .1;
160
161        let entry: &RegistryEntry<T> = <dyn Any>::downcast_ref(&**seed).unwrap();
162
163        Ok(&entry.serde_deserialize_seed)
164    }
165}
166
167/// Wrapper type which implements Deserialize using the registry
168///
169/// Wrap your type in this in order to deserialize it using a registry, e.g.
170/// `RegistryWithFakeType<Box<dyn MyTrait>>`, then the types registered for `Box<dyn MyTrait>`
171/// will be used.
172pub struct DeserializeWithRegistry<T>(pub T);
173
174impl Registry {
175    /// Registers a mapping from type tag to a concrete type into the registry.
176    ///
177    /// The type parameters are very important:
178    /// After calling `register::<Box<dyn FooTrait>, FooStruct>("footype")`, when a user
179    /// deserializes into an input with the type tag "myblobservicetype" into a
180    /// `Box<dyn FooTrait>`, it will first call the Deserialize imple of `FooStruct` and
181    /// then convert it into a `Box<dyn FooTrait>` using From::from.
182    pub fn register<
183        T: 'static,
184        C: DeserializeOwned
185            + TryFrom<url::Url, Error = Box<dyn std::error::Error + Send + Sync>>
186            + Into<T>,
187    >(
188        &mut self,
189        type_name: &'static str,
190    ) {
191        self.0.insert(
192            (TypeId::of::<T>(), type_name),
193            Box::new(RegistryEntry {
194                serde_deserialize_seed: BoxFnSeed::new(|x| {
195                    deserialize::<C>(x)
196                        .map(Into::into)
197                        .map(DeserializeWithRegistry)
198                }),
199                from_url_seed: Box::new(|url| {
200                    C::try_from(url)
201                        .map(Into::into)
202                        .map(DeserializeWithRegistry)
203                }),
204            }),
205        );
206    }
207}
208
209impl<'de, T: 'static> serde::Deserialize<'de> for DeserializeWithRegistry<T> {
210    fn deserialize<D>(de: D) -> std::result::Result<Self, D::Error>
211    where
212        D: serde::Deserializer<'de>,
213    {
214        serde_tagged::de::internal::deserialize(
215            de,
216            "type",
217            RegistryWithFakeType(ACTIVE_REG.get().unwrap(), PhantomData::<T>),
218        )
219    }
220}
221
222#[derive(Debug, thiserror::Error)]
223enum TryFromUrlError {
224    #[error("Unknown type: {0}")]
225    UnknownTag(String),
226}
227
228impl<T: 'static> TryFrom<url::Url> for DeserializeWithRegistry<T> {
229    type Error = Box<dyn std::error::Error + Send + Sync>;
230    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
231        let tag = url.scheme().split('+').next().unwrap();
232        // same as in the SeedFactory impl: using find() and not get() because of https://github.com/rust-lang/rust/issues/80389
233        let seed = ACTIVE_REG
234            .get()
235            .unwrap()
236            .0
237            .iter()
238            .find(|(k, _)| *k == &(TypeId::of::<T>(), tag))
239            .ok_or_else(|| Box::new(TryFromUrlError::UnknownTag(tag.into())))?
240            .1;
241        let entry: &RegistryEntry<T> = <dyn Any>::downcast_ref(&**seed).unwrap();
242        (entry.from_url_seed)(url)
243    }
244}
245
246thread_local! {
247    /// The active Registry is global state, because there is no convenient and universal way to pass state
248    /// into the functions usually used for deserialization, e.g. `serde_json::from_str`, `toml::from_str`,
249    /// `serde_qs::from_str`.
250    static ACTIVE_REG: Cell<Option<&'static Registry>> = panic!("reg was accessed before initialization");
251}
252
253/// Run the provided closure with a registry context.
254/// Any serde deserialize calls within the closure will use the registry to resolve tag names to
255/// the corresponding Config type.
256pub fn with_registry<R>(reg: &'static Registry, f: impl FnOnce() -> R) -> R {
257    ACTIVE_REG.set(Some(reg));
258    let result = f();
259    ACTIVE_REG.set(None);
260    result
261}
262
263/// The provided registry of snix_castore, with all builtin BlobStore/DirectoryStore implementations
264pub static REG: LazyLock<&'static Registry> = LazyLock::new(|| {
265    let mut reg = Default::default();
266    add_default_services(&mut reg);
267    // explicitly leak to get an &'static, so that we gain `&Registry: Send` from `Registry: Sync`
268    Box::leak(Box::new(reg))
269});
270
271// ---------- End of generic registry code --------- //
272
273/// Register the builtin services of snix_castore (blob services and directory
274/// services) with the given registry.
275/// This can be used outside to create your own registry with the builtin types
276/// _and_ extra third party types.
277pub fn add_default_services(reg: &mut Registry) {
278    crate::blobservice::register_blob_services(reg);
279    crate::directoryservice::register_directory_services(reg);
280}
281
282pub struct CompositionContext<'a> {
283    // The stack used to detect recursive instantiations and prevent deadlocks
284    // The TypeId of the trait object is included to distinguish e.g. the
285    // BlobService "root" and the DirectoryService "root".
286    stack: Vec<(TypeId, String)>,
287    registry: &'static Registry,
288    composition: Option<&'a Composition>,
289}
290
291impl CompositionContext<'_> {
292    /// Get a composition context for one-off store creation.
293    pub fn blank(registry: &'static Registry) -> Self {
294        Self {
295            registry,
296            stack: Default::default(),
297            composition: None,
298        }
299    }
300
301    pub async fn resolve<T: ?Sized + Send + Sync + 'static>(
302        &self,
303        entrypoint: String,
304    ) -> Result<Arc<T>, Box<dyn std::error::Error + Send + Sync + 'static>> {
305        // disallow recursion
306        if self
307            .stack
308            .contains(&(TypeId::of::<T>(), entrypoint.clone()))
309        {
310            return Err(CompositionError::Recursion(
311                self.stack.iter().map(|(_, n)| n.clone()).collect(),
312            )
313            .into());
314        }
315
316        Ok(self.build_internal(entrypoint).await?)
317    }
318
319    #[cfg(feature = "xp-composition-url-refs")]
320    async fn build_anonymous<T: ?Sized + Send + Sync + 'static>(
321        &self,
322        entrypoint: String,
323    ) -> Result<Arc<T>, Box<dyn std::error::Error + Send + Sync>> {
324        let url = url::Url::parse(&entrypoint)?;
325        let config: DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = T>>> =
326            with_registry(self.registry, || url.try_into())?;
327        config.0.build("anonymous", self).await
328    }
329
330    fn build_internal<T: ?Sized + Send + Sync + 'static>(
331        &self,
332        entrypoint: String,
333    ) -> BoxFuture<'_, Result<Arc<T>, CompositionError>> {
334        #[cfg(feature = "xp-composition-url-refs")]
335        if entrypoint.contains("://") {
336            // There is a chance this is a url. we are building an anonymous store
337            return Box::pin(async move {
338                self.build_anonymous(entrypoint.clone())
339                    .await
340                    .map_err(|e| CompositionError::Failed(entrypoint, Arc::from(e)))
341            });
342        }
343
344        let mut stores = match self.composition {
345            Some(comp) => comp.stores.lock().unwrap(),
346            None => return Box::pin(futures::future::err(CompositionError::NotFound(entrypoint))),
347        };
348        let entry = match stores.get_mut(&(TypeId::of::<T>(), entrypoint.clone())) {
349            Some(v) => v,
350            None => return Box::pin(futures::future::err(CompositionError::NotFound(entrypoint))),
351        };
352        // for lifetime reasons, we put a placeholder value in the hashmap while we figure out what
353        // the new value should be. the Mutex stays locked the entire time, so nobody will ever see
354        // this temporary value.
355        let prev_val = std::mem::replace(
356            entry,
357            Box::new(InstantiationState::<T>::Done(Err(
358                CompositionError::Poisoned(entrypoint.clone()),
359            ))),
360        );
361        let (new_val, ret) = match *prev_val.downcast::<InstantiationState<T>>().unwrap() {
362            InstantiationState::Done(service) => (
363                InstantiationState::Done(service.clone()),
364                futures::future::ready(service).boxed(),
365            ),
366            // the construction of the store has not started yet.
367            InstantiationState::Config(config) => {
368                let (tx, rx) = tokio::sync::watch::channel(None);
369                (
370                    InstantiationState::InProgress(rx),
371                    (async move {
372                        let mut new_context = CompositionContext {
373                            composition: self.composition,
374                            registry: self.registry,
375                            stack: self.stack.clone(),
376                        };
377                        new_context
378                            .stack
379                            .push((TypeId::of::<T>(), entrypoint.clone()));
380                        let res =
381                            config.build(&entrypoint, &new_context).await.map_err(|e| {
382                                match e.downcast() {
383                                    Ok(e) => *e,
384                                    Err(e) => CompositionError::Failed(entrypoint, e.into()),
385                                }
386                            });
387                        tx.send(Some(res.clone())).unwrap();
388                        res
389                    })
390                    .boxed(),
391                )
392            }
393            // there is already a task driving forward the construction of this store, wait for it
394            // to notify us via the provided channel
395            InstantiationState::InProgress(mut recv) => {
396                (InstantiationState::InProgress(recv.clone()), {
397                    (async move {
398                        loop {
399                            if let Some(v) =
400                                recv.borrow_and_update().as_ref().map(|res| res.clone())
401                            {
402                                break v;
403                            }
404                            recv.changed().await.unwrap();
405                        }
406                    })
407                    .boxed()
408                })
409            }
410        };
411        *entry = Box::new(new_val);
412        ret
413    }
414}
415
416#[async_trait]
417/// This is the trait usually implemented on a per-store-type Config struct and
418/// used to instantiate it.
419pub trait ServiceBuilder: Send + Sync {
420    type Output: ?Sized;
421    async fn build(
422        &self,
423        instance_name: &str,
424        context: &CompositionContext,
425    ) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync + 'static>>;
426}
427
428impl<T: ?Sized, S: ServiceBuilder<Output = T> + 'static> From<S>
429    for Box<dyn ServiceBuilder<Output = T>>
430{
431    fn from(t: S) -> Self {
432        Box::new(t)
433    }
434}
435
436enum InstantiationState<T: ?Sized> {
437    Config(Box<dyn ServiceBuilder<Output = T>>),
438    InProgress(tokio::sync::watch::Receiver<Option<Result<Arc<T>, CompositionError>>>),
439    Done(Result<Arc<T>, CompositionError>),
440}
441
442pub struct Composition {
443    registry: &'static Registry,
444    stores: std::sync::Mutex<HashMap<(TypeId, String), Box<dyn Any + Send + Sync>>>,
445}
446
447#[derive(thiserror::Error, Clone, Debug)]
448pub enum CompositionError {
449    #[error("store not found: {0}")]
450    NotFound(String),
451    #[error("recursion not allowed {0:?}")]
452    Recursion(Vec<String>),
453    #[error("store construction panicked {0}")]
454    Poisoned(String),
455    #[error("instantiation of service {0} failed: {1}")]
456    Failed(String, Arc<dyn std::error::Error + Send + Sync>),
457}
458
459impl<T: ?Sized + Send + Sync + 'static>
460    Extend<(
461        String,
462        DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = T>>>,
463    )> for Composition
464{
465    fn extend<I>(&mut self, configs: I)
466    where
467        I: IntoIterator<
468            Item = (
469                String,
470                DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = T>>>,
471            ),
472        >,
473    {
474        self.stores
475            .lock()
476            .unwrap()
477            .extend(configs.into_iter().map(|(k, v)| {
478                (
479                    (TypeId::of::<T>(), k),
480                    Box::new(InstantiationState::Config(v.0)) as Box<dyn Any + Send + Sync>,
481                )
482            }))
483    }
484}
485
486impl Composition {
487    /// The given registry will be used for creation of anonymous stores during composition
488    pub fn new(registry: &'static Registry) -> Self {
489        Self {
490            registry,
491            stores: Default::default(),
492        }
493    }
494
495    pub fn extend_with_configs<T: ?Sized + Send + Sync + 'static>(
496        &mut self,
497        // Keep the concrete `HashMap` type here since it allows for type
498        // inference of what type is previously being deserialized.
499        configs: HashMap<String, DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = T>>>>,
500    ) {
501        self.extend(configs);
502    }
503
504    /// Looks up the entrypoint name in the composition and returns an instantiated service.
505    pub async fn build<T: ?Sized + Send + Sync + 'static>(
506        &self,
507        entrypoint: &str,
508    ) -> Result<Arc<T>, CompositionError> {
509        self.context().build_internal(entrypoint.to_string()).await
510    }
511
512    pub fn context(&self) -> CompositionContext {
513        CompositionContext {
514            registry: self.registry,
515            stack: vec![],
516            composition: Some(self),
517        }
518    }
519}
520
521#[cfg(test)]
522mod test {
523    use super::*;
524    use crate::blobservice::BlobService;
525    use std::sync::Arc;
526
527    /// Test that we return a reference to the same instance of MemoryBlobService (via ptr_eq)
528    /// when instantiating the same entrypoint twice. By instantiating concurrently, we also
529    /// test the channels notifying the second consumer when the store has been instantiated.
530    #[tokio::test]
531    async fn concurrent() {
532        let blob_services_configs_json = serde_json::json!({
533            "root": {
534                "type": "memory",
535            }
536        });
537
538        let blob_services_configs =
539            with_registry(&REG, || serde_json::from_value(blob_services_configs_json)).unwrap();
540        let mut blob_service_composition = Composition::new(&REG);
541        blob_service_composition.extend_with_configs::<dyn BlobService>(blob_services_configs);
542        let (blob_service1, blob_service2) = tokio::join!(
543            blob_service_composition.build::<dyn BlobService>("root"),
544            blob_service_composition.build::<dyn BlobService>("root")
545        );
546        assert!(Arc::ptr_eq(
547            &blob_service1.unwrap(),
548            &blob_service2.unwrap()
549        ));
550    }
551
552    /// Test that we throw the correct error when an instantiation would recurse (deadlock)
553    #[tokio::test]
554    async fn reject_recursion() {
555        let blob_services_configs_json = serde_json::json!({
556            "root": {
557                "type": "combined",
558                "near": "other",
559                "far": "other"
560            },
561            "other": {
562                "type": "combined",
563                "near": "root",
564                "far": "root"
565            }
566        });
567
568        let blob_services_configs =
569            with_registry(&REG, || serde_json::from_value(blob_services_configs_json)).unwrap();
570        let mut blob_service_composition = Composition::new(&REG);
571        blob_service_composition.extend_with_configs::<dyn BlobService>(blob_services_configs);
572        match blob_service_composition
573            .build::<dyn BlobService>("root")
574            .await
575        {
576            Err(CompositionError::Recursion(stack)) => {
577                assert_eq!(stack, vec!["root".to_string(), "other".to_string()])
578            }
579            other => panic!("should have returned an error, returned: {:?}", other.err()),
580        }
581    }
582}