// snix_castore/directoryservice/object_store.rs
use std::collections::HashMap;
2use std::collections::hash_map;
3use std::sync::Arc;
4
5use data_encoding::HEXLOWER;
6use futures::SinkExt;
7use futures::StreamExt;
8use futures::TryStreamExt;
9use futures::stream::BoxStream;
10use object_store::ObjectStoreExt;
11use object_store::{ObjectStore, path::Path};
12use prost::Message;
13use tokio::io::AsyncWriteExt;
14use tokio_util::codec::LengthDelimitedCodec;
15use tonic::async_trait;
16use tracing::{Level, instrument, trace, warn};
17use url::Url;
18
19use super::{Directory, DirectoryPutter, DirectoryService, RootToLeavesValidator};
20use crate::composition::{CompositionContext, ServiceBuilder};
21use crate::directoryservice::directory_graph::DirectoryGraphBuilder;
22use crate::{B3Digest, Node, proto};
23
/// A `DirectoryService` backed by an [ObjectStore]: a directory closure is
/// stored as a single object keyed by the root directory's blake3 digest
/// (see `derive_dirs_path` below for the layout).
#[derive(Clone)]
pub struct ObjectStoreDirectoryService {
    // Instance name, recorded in tracing spans of the trait methods.
    instance_name: String,
    // Backing object store; `Arc` so putters can hold their own handle.
    object_store: Arc<dyn ObjectStore>,
    // Path prefix under which all objects of this service live.
    base_path: Path,
}
37
38#[instrument(level=Level::TRACE, skip_all,fields(base_path=%base_path,blob.digest=%digest),ret(Display))]
39fn derive_dirs_path(base_path: &Path, digest: &B3Digest) -> Path {
40 base_path
41 .child("dirs")
42 .child("b3")
43 .child(HEXLOWER.encode(&digest.as_slice()[..2]))
44 .child(HEXLOWER.encode(digest.as_slice()))
45}
46
47fn parse_proto_directory<F>(
50 encoded_directory: &[u8],
51 digest_allowed: F,
52) -> Result<crate::Directory, Error>
53where
54 F: Fn(&B3Digest) -> bool,
55{
56 let actual_digest = B3Digest::from(blake3::hash(encoded_directory).as_bytes());
57 if !digest_allowed(&actual_digest) {
58 return Err(Error::UnexpectedDigest(actual_digest));
59 }
60
61 let directory_proto =
62 proto::Directory::decode(encoded_directory).map_err(Error::ProtobufDecode)?;
63
64 Directory::try_from(directory_proto).map_err(Error::DirectoryValidation)
65}
66
67#[allow(clippy::identity_op)]
68const MAX_FRAME_LENGTH: usize = 1 * 1024 * 1024 * 1000; impl ObjectStoreDirectoryService {
71 pub fn parse_url_opts<I, K, V>(url: &Url, options: I) -> Result<Self, object_store::Error>
77 where
78 I: IntoIterator<Item = (K, V)>,
79 K: AsRef<str>,
80 V: Into<String>,
81 {
82 let (object_store, path) = object_store::parse_url_opts(url, options)?;
83
84 Ok(Self {
85 instance_name: "root".into(),
86 object_store: Arc::new(object_store),
87 base_path: path,
88 })
89 }
90
91 pub fn parse_url(url: &Url) -> Result<Self, object_store::Error> {
93 Self::parse_url_opts(url, Vec::<(String, String)>::new())
94 }
95
96 pub fn new(instance_name: String, object_store: Arc<dyn ObjectStore>, base_path: Path) -> Self {
97 Self {
98 instance_name,
99 object_store,
100 base_path,
101 }
102 }
103}
104
#[async_trait]
impl DirectoryService for ObjectStoreDirectoryService {
    /// Fetches a single directory by digest: streams the stored closure and
    /// takes only its first (root) element.
    #[instrument(level = "trace", skip_all, fields(directory.digest = %digest, instance_name = %self.instance_name))]
    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, super::Error> {
        self.get_recursive(digest).take(1).next().await.transpose()
    }

    /// Uploads a single directory. Only directories without child-directory
    /// nodes may be put this way; closures with children must go through
    /// [Self::put_multiple_start] so the whole tree is written as one object.
    #[instrument(level = "trace", skip_all, fields(directory.digest = %directory.digest(), instance_name = %self.instance_name))]
    async fn put(&self, directory: Directory) -> Result<B3Digest, super::Error> {
        if directory
            .nodes()
            .any(|(_, e)| matches!(e, Node::Directory { .. }))
        {
            Err(Error::PutForDirectoryWithChildren)?
        }

        let mut handle = self.put_multiple_start();
        handle.put(directory).await?;
        handle.close().await
    }

    /// Streams the closure rooted at `root_directory_digest` in the order it
    /// was stored (root first, then descendants). A missing object produces
    /// an empty stream, not an error. Every frame is digest- and
    /// order-validated before being yielded.
    #[instrument(level = "trace", skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))]
    fn get_recursive(
        &self,
        root_directory_digest: &B3Digest,
    ) -> BoxStream<'_, Result<Directory, super::Error>> {
        // The whole closure lives in one object, keyed by the root digest.
        let dir_path = derive_dirs_path(&self.base_path, root_directory_digest);
        let object_store = &self.object_store;
        let root_directory_digest = *root_directory_digest;

        async_stream::try_stream! {
            let bytes_stream = match object_store.get(&dir_path).await {
                Ok(v) => v.into_stream(),
                // Absent object ⇒ empty stream (caller's get() sees None).
                Err(object_store::Error::NotFound { .. }) => {
                    return;
                }
                Err(e) => Err(Error::ObjectStore(e))?,
            };

            // On-wire layout: zstd-compressed sequence of u32-length-delimited
            // protobuf Directory frames (mirrors the writer in close() below).
            let r = tokio_util::io::StreamReader::new(bytes_stream);
            let decompressed_stream = async_compression::tokio::bufread::ZstdDecoder::new(r);

            let mut encoded_directories = LengthDelimitedCodec::builder()
                .max_frame_length(MAX_FRAME_LENGTH)
                .length_field_type::<u32>()
                .new_read(decompressed_stream)
                .err_into::<Error>();

            // Validator enforces root-to-leaves ordering starting at the
            // requested root digest.
            let mut order_validator = RootToLeavesValidator::new_with_root_digest(root_directory_digest);
            while let Some(encoded_directory) = encoded_directories.try_next().await? {
                // Reject frames whose digest the validator does not expect,
                // before even decoding them.
                let directory = parse_proto_directory(&encoded_directory, |digest| {
                    order_validator.would_accept(digest)
                })?;

                order_validator.try_accept(&directory).map_err(Error::DirectoryOrdering)?;

                yield directory;
            }

            // Final validator check once the stream is exhausted
            // (presumably completeness of the closure — semantics live in
            // RootToLeavesValidator::finalize).
            order_validator.finalize().map_err(Error::DirectoryOrdering)?;
        }.boxed()
    }

    /// Returns a putter that accumulates a closure in memory and uploads it
    /// as a single object on close.
    #[instrument(skip_all)]
    fn put_multiple_start(&self) -> Box<dyn DirectoryPutter + '_>
    where
        Self: Clone,
    {
        Box::new(ObjectStoreDirectoryPutter::new(
            self.object_store.clone(),
            &self.base_path,
        ))
    }
}
186
/// Internal error type of this backend. The trait impls return
/// `super::Error`, so a conversion is applied at the boundary via `?`
/// (NOTE(review): the `From<Error> for super::Error` impl is not visible in
/// this chunk — presumably defined elsewhere in the module).
#[derive(thiserror::Error, Debug)]
enum Error {
    // Configuration/URL problems detected while building the service.
    #[error("wrong arguments: {0}")]
    WrongConfig(&'static str),
    #[error("put() may only be used for directories without children")]
    PutForDirectoryWithChildren,

    // Stream violated root-to-leaves ordering, or the closure was incomplete.
    #[error("Directory Graph ordering error")]
    DirectoryOrdering(#[from] crate::directoryservice::OrderingError),
    // A frame's blake3 digest was not one the validator expected.
    #[error("requested directory has unexpected digest {0}")]
    UnexpectedDigest(B3Digest),
    #[error("failed to decode protobuf: {0}")]
    ProtobufDecode(#[from] prost::DecodeError),
    #[error("failed to validate directory: {0}")]
    DirectoryValidation(#[from] crate::DirectoryError),

    // put()/close() called after close() already consumed the builder.
    #[error("DirectoryPutter already closed")]
    DirectoryPutterAlreadyClosed,

    #[error("ObjectStore error: {0}")]
    ObjectStore(#[from] object_store::Error),

    #[error("io error: {0}")]
    IO(#[from] std::io::Error),
}
212
/// Deserializable configuration for [ObjectStoreDirectoryService]; can also
/// be produced from an `objectstore+…` URL (see the `TryFrom<url::Url>` impl
/// in this file).
#[derive(serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ObjectStoreDirectoryServiceConfig {
    // URL later handed to `object_store::parse_url_opts`.
    object_store_url: String,
    // Extra key/value options forwarded to the object-store client.
    #[serde(default)]
    object_store_options: HashMap<String, String>,
}
220
221impl TryFrom<url::Url> for ObjectStoreDirectoryServiceConfig {
222 type Error = Box<dyn std::error::Error + Send + Sync>;
223 fn try_from(url: url::Url) -> Result<Self, Self::Error> {
224 let trimmed_url = {
227 let s = url.to_string();
228 let mut url = Url::parse(s.strip_prefix("objectstore+").ok_or(Error::WrongConfig(
229 "Missing objectstore+ part in URI scheme",
230 ))?)?;
231 url.set_query(None);
233 url
234 };
235 Ok(ObjectStoreDirectoryServiceConfig {
236 object_store_url: trimmed_url.into(),
237 object_store_options: url
238 .query_pairs()
239 .into_iter()
240 .map(|(k, v)| (k.to_string(), v.to_string()))
241 .collect(),
242 })
243 }
244}
245
246#[async_trait]
247impl ServiceBuilder for ObjectStoreDirectoryServiceConfig {
248 type Output = dyn DirectoryService;
249 async fn build<'a>(
250 &'a self,
251 instance_name: &str,
252 _context: &CompositionContext,
253 ) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync>> {
254 let opts = {
255 let mut opts: HashMap<&str, _> = self
256 .object_store_options
257 .iter()
258 .map(|(k, v)| (k.as_str(), v.as_str()))
259 .collect();
260
261 if let hash_map::Entry::Vacant(e) =
262 opts.entry(object_store::ClientConfigKey::UserAgent.as_ref())
263 {
264 e.insert(crate::USER_AGENT);
265 }
266
267 opts
268 };
269
270 let (object_store, path) =
271 object_store::parse_url_opts(&self.object_store_url.parse()?, opts)?;
272 Ok(Arc::new(ObjectStoreDirectoryService::new(
273 instance_name.to_string(),
274 Arc::new(object_store),
275 path,
276 )))
277 }
278}
279
/// [DirectoryPutter] that buffers an entire directory closure in memory and
/// uploads it as one compressed object when closed.
struct ObjectStoreDirectoryPutter<'a> {
    object_store: Arc<dyn ObjectStore>,
    // Borrowed from the owning service; ties the putter's lifetime to it.
    base_path: &'a Path,

    // Some(..) while the putter is open; taken (left as None) by close().
    builder: Option<DirectoryGraphBuilder>,
}
286
287impl<'a> ObjectStoreDirectoryPutter<'a> {
288 fn new(object_store: Arc<dyn ObjectStore>, base_path: &'a Path) -> Self {
289 Self {
290 object_store,
291 base_path,
292 builder: Some(DirectoryGraphBuilder::new_leaves_to_root()),
293 }
294 }
295}
296
#[async_trait]
impl DirectoryPutter for ObjectStoreDirectoryPutter<'_> {
    /// Buffers one directory into the in-memory graph builder. Nothing is
    /// written to the object store until [Self::close].
    #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
    async fn put(&mut self, directory: Directory) -> Result<(), super::Error> {
        // close() takes the builder; a put() after that is a usage error.
        let builder = self
            .builder
            .as_mut()
            .ok_or_else(|| Error::DirectoryPutterAlreadyClosed)?;

        builder.try_insert(directory)?;

        Ok(())
    }

    /// Finalizes the buffered closure and uploads it as a single object,
    /// returning the root directory's digest. Idempotent with respect to
    /// already-stored content: if an object for the root digest exists, the
    /// upload is skipped.
    #[instrument(level = "trace", skip_all, ret, err)]
    async fn close(&mut self) -> Result<B3Digest, super::Error> {
        // Consume the builder so subsequent put()/close() calls fail cleanly.
        let builder = self
            .builder
            .take()
            .ok_or_else(|| Error::DirectoryPutterAlreadyClosed)?;

        let directory_graph = builder.build()?;
        let root_digest = directory_graph.root().digest();

        let dir_path = derive_dirs_path(self.base_path, &root_digest);

        match self.object_store.head(&dir_path).await {
            Ok(_) => {
                // Content-addressed, so an existing object is already correct.
                trace!("directory tree already exists");
            }

            Err(object_store::Error::NotFound { .. }) => {
                trace!("uploading directory tree");

                // Write path mirrors the read path in get_recursive():
                // u32-length-delimited protobuf frames, zstd-compressed,
                // buffered into the object store.
                let object_store_writer =
                    object_store::buffered::BufWriter::new(self.object_store.clone(), dir_path);
                let compressed_writer =
                    async_compression::tokio::write::ZstdEncoder::new(object_store_writer);
                let mut directories_sink = LengthDelimitedCodec::builder()
                    .max_frame_length(MAX_FRAME_LENGTH)
                    .length_field_type::<u32>()
                    .new_write(compressed_writer);

                // Stored root-first, matching the reader's order validation.
                for directory in directory_graph.drain_root_to_leaves() {
                    directories_sink
                        .send(proto::Directory::from(directory).encode_to_vec().into())
                        .await?;
                }

                // Explicit shutdown flushes the zstd frame and the buffered
                // writer; dropping without it would lose data.
                let mut compressed_writer = directories_sink.into_inner();
                compressed_writer.shutdown().await?;
            }
            Err(err) => Err(Error::ObjectStore(err))?,
        }

        Ok(root_digest)
    }
}