1use crate::db::TransactionGuard;
2use crate::error::TableError;
3use crate::multimap_table::{
4 finalize_tree_and_subtree_checksums, multimap_btree_stats, verify_tree_and_subtree_checksums,
5};
6use crate::tree_store::btree::{btree_stats, UntypedBtreeMut};
7use crate::tree_store::btree_base::BtreeHeader;
8use crate::tree_store::{
9 Btree, BtreeMut, BtreeRangeIter, InternalTableDefinition, PageHint, PageNumber, PagePath,
10 RawBtree, TableType, TransactionalMemory,
11};
12use crate::types::{Key, MutInPlaceValue, TypeName, Value};
13use crate::{DatabaseStats, Result};
14use std::cmp::max;
15use std::collections::{BTreeMap, HashMap};
16use std::mem::size_of;
17use std::ops::RangeFull;
18use std::sync::{Arc, Mutex};
19
20#[derive(Debug)]
21pub(crate) struct FreedTableKey {
22 pub(crate) transaction_id: u64,
23 pub(crate) pagination_id: u64,
24}
25
26impl Value for FreedTableKey {
27 type SelfType<'a> = FreedTableKey
28 where
29 Self: 'a;
30 type AsBytes<'a> = [u8; 2 * size_of::<u64>()]
31 where
32 Self: 'a;
33
34 fn fixed_width() -> Option<usize> {
35 Some(2 * size_of::<u64>())
36 }
37
38 fn from_bytes<'a>(data: &'a [u8]) -> Self
39 where
40 Self: 'a,
41 {
42 let transaction_id = u64::from_le_bytes(data[..size_of::<u64>()].try_into().unwrap());
43 let pagination_id = u64::from_le_bytes(data[size_of::<u64>()..].try_into().unwrap());
44 Self {
45 transaction_id,
46 pagination_id,
47 }
48 }
49
50 fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> [u8; 2 * size_of::<u64>()]
51 where
52 Self: 'b,
53 {
54 let mut result = [0u8; 2 * size_of::<u64>()];
55 result[..size_of::<u64>()].copy_from_slice(&value.transaction_id.to_le_bytes());
56 result[size_of::<u64>()..].copy_from_slice(&value.pagination_id.to_le_bytes());
57 result
58 }
59
60 fn type_name() -> TypeName {
61 TypeName::internal("redb::FreedTableKey")
62 }
63}
64
65impl Key for FreedTableKey {
66 fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering {
67 let value1 = Self::from_bytes(data1);
68 let value2 = Self::from_bytes(data2);
69
70 match value1.transaction_id.cmp(&value2.transaction_id) {
71 std::cmp::Ordering::Greater => std::cmp::Ordering::Greater,
72 std::cmp::Ordering::Equal => value1.pagination_id.cmp(&value2.pagination_id),
73 std::cmp::Ordering::Less => std::cmp::Ordering::Less,
74 }
75 }
76}
77
78#[derive(Debug)]
82pub(crate) struct FreedPageList<'a> {
83 data: &'a [u8],
84}
85
86impl<'a> FreedPageList<'a> {
87 pub(crate) fn required_bytes(len: usize) -> usize {
88 2 + PageNumber::serialized_size() * len
89 }
90
91 pub(crate) fn len(&self) -> usize {
92 u16::from_le_bytes(self.data[..size_of::<u16>()].try_into().unwrap()).into()
93 }
94
95 pub(crate) fn get(&self, index: usize) -> PageNumber {
96 let start = size_of::<u16>() + PageNumber::serialized_size() * index;
97 PageNumber::from_le_bytes(
98 self.data[start..(start + PageNumber::serialized_size())]
99 .try_into()
100 .unwrap(),
101 )
102 }
103}
104
105#[derive(Debug)]
106#[repr(transparent)]
107pub(crate) struct FreedPageListMut {
108 data: [u8],
109}
110
111impl FreedPageListMut {
112 pub(crate) fn push_back(&mut self, value: PageNumber) {
113 let len = u16::from_le_bytes(self.data[..size_of::<u16>()].try_into().unwrap());
114 self.data[..size_of::<u16>()].copy_from_slice(&(len + 1).to_le_bytes());
115 let len: usize = len.into();
116 let start = size_of::<u16>() + PageNumber::serialized_size() * len;
117 self.data[start..(start + PageNumber::serialized_size())]
118 .copy_from_slice(&value.to_le_bytes());
119 }
120
121 pub(crate) fn clear(&mut self) {
122 self.data[..size_of::<u16>()].fill(0);
123 }
124}
125
126impl Value for FreedPageList<'_> {
127 type SelfType<'a> = FreedPageList<'a>
128 where
129 Self: 'a;
130 type AsBytes<'a> = &'a [u8]
131 where
132 Self: 'a;
133
134 fn fixed_width() -> Option<usize> {
135 None
136 }
137
138 fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
139 where
140 Self: 'a,
141 {
142 FreedPageList { data }
143 }
144
145 fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> &'b [u8]
146 where
147 Self: 'b,
148 {
149 value.data
150 }
151
152 fn type_name() -> TypeName {
153 TypeName::internal("redb::FreedPageList")
154 }
155}
156
157impl MutInPlaceValue for FreedPageList<'_> {
158 type BaseRefType = FreedPageListMut;
159
160 fn initialize(data: &mut [u8]) {
161 assert!(data.len() >= 8);
162 data[..8].fill(0);
164 }
165
166 fn from_bytes_mut(data: &mut [u8]) -> &mut Self::BaseRefType {
167 unsafe { &mut *(std::ptr::from_mut::<[u8]>(data) as *mut FreedPageListMut) }
168 }
169}
170
171pub struct TableNameIter {
172 inner: BtreeRangeIter<&'static str, InternalTableDefinition>,
173 table_type: TableType,
174}
175
176impl Iterator for TableNameIter {
177 type Item = Result<String>;
178
179 fn next(&mut self) -> Option<Self::Item> {
180 for entry in self.inner.by_ref() {
181 match entry {
182 Ok(entry) => {
183 if entry.value().get_type() == self.table_type {
184 return Some(Ok(entry.key().to_string()));
185 }
186 }
187 Err(err) => {
188 return Some(Err(err));
189 }
190 }
191 }
192 None
193 }
194}
195
196pub(crate) struct TableTree {
197 tree: Btree<&'static str, InternalTableDefinition>,
198 mem: Arc<TransactionalMemory>,
199}
200
201impl TableTree {
202 pub(crate) fn new(
203 master_root: Option<BtreeHeader>,
204 page_hint: PageHint,
205 guard: Arc<TransactionGuard>,
206 mem: Arc<TransactionalMemory>,
207 ) -> Result<Self> {
208 Ok(Self {
209 tree: Btree::new(master_root, page_hint, guard, mem.clone())?,
210 mem,
211 })
212 }
213
214 pub(crate) fn transaction_guard(&self) -> &Arc<TransactionGuard> {
215 self.tree.transaction_guard()
216 }
217
218 pub(crate) fn list_tables(&self, table_type: TableType) -> Result<Vec<String>> {
220 let iter = self.tree.range::<RangeFull, &str>(&(..))?;
221 let iter = TableNameIter {
222 inner: iter,
223 table_type,
224 };
225 let mut result = vec![];
226 for table in iter {
227 result.push(table?);
228 }
229 Ok(result)
230 }
231
232 pub(crate) fn get_table_untyped(
233 &self,
234 name: &str,
235 table_type: TableType,
236 ) -> Result<Option<InternalTableDefinition>, TableError> {
237 if let Some(guard) = self.tree.get(&name)? {
238 let definition = guard.value();
239 definition.check_match_untyped(table_type, name)?;
240 Ok(Some(definition))
241 } else {
242 Ok(None)
243 }
244 }
245
246 pub(crate) fn get_table<K: Key, V: Value>(
248 &self,
249 name: &str,
250 table_type: TableType,
251 ) -> Result<Option<InternalTableDefinition>, TableError> {
252 Ok(
253 if let Some(definition) = self.get_table_untyped(name, table_type)? {
254 definition.check_match::<K, V>(table_type, name)?;
256 Some(definition)
257 } else {
258 None
259 },
260 )
261 }
262
263 pub(crate) fn visit_all_pages<F>(&self, mut visitor: F) -> Result
264 where
265 F: FnMut(&PagePath) -> Result,
266 {
267 self.tree.visit_all_pages(&mut visitor)?;
269
270 for entry in self.list_tables(TableType::Normal)? {
272 let definition = self
273 .get_table_untyped(&entry, TableType::Normal)
274 .map_err(|e| e.into_storage_error_or_corrupted("Internal corruption"))?
275 .unwrap();
276 definition.visit_all_pages(self.mem.clone(), |path| visitor(path))?;
277 }
278
279 for entry in self.list_tables(TableType::Multimap)? {
280 let definition = self
281 .get_table_untyped(&entry, TableType::Multimap)
282 .map_err(|e| e.into_storage_error_or_corrupted("Internal corruption"))?
283 .unwrap();
284 definition.visit_all_pages(self.mem.clone(), |path| visitor(path))?;
285 }
286
287 Ok(())
288 }
289}
290
291pub(crate) struct TableTreeMut<'txn> {
292 tree: BtreeMut<'txn, &'static str, InternalTableDefinition>,
293 guard: Arc<TransactionGuard>,
294 mem: Arc<TransactionalMemory>,
295 pending_table_updates: HashMap<String, (Option<BtreeHeader>, u64)>,
297 freed_pages: Arc<Mutex<Vec<PageNumber>>>,
298}
299
300impl<'txn> TableTreeMut<'txn> {
301 pub(crate) fn new(
302 master_root: Option<BtreeHeader>,
303 guard: Arc<TransactionGuard>,
304 mem: Arc<TransactionalMemory>,
305 freed_pages: Arc<Mutex<Vec<PageNumber>>>,
306 ) -> Self {
307 Self {
308 tree: BtreeMut::new(master_root, guard.clone(), mem.clone(), freed_pages.clone()),
309 guard,
310 mem,
311 pending_table_updates: Default::default(),
312 freed_pages,
313 }
314 }
315
316 pub(crate) fn visit_all_pages<F>(&self, mut visitor: F) -> Result
317 where
318 F: FnMut(&PagePath) -> Result,
319 {
320 self.tree.visit_all_pages(&mut visitor)?;
322
323 for entry in self.list_tables(TableType::Normal)? {
325 let definition = self
326 .get_table_untyped(&entry, TableType::Normal)
327 .map_err(|e| e.into_storage_error_or_corrupted("Internal corruption"))?
328 .unwrap();
329 definition.visit_all_pages(self.mem.clone(), |path| visitor(path))?;
330 }
331
332 for entry in self.list_tables(TableType::Multimap)? {
333 let definition = self
334 .get_table_untyped(&entry, TableType::Multimap)
335 .map_err(|e| e.into_storage_error_or_corrupted("Internal corruption"))?
336 .unwrap();
337 definition.visit_all_pages(self.mem.clone(), |path| visitor(path))?;
338 }
339
340 Ok(())
341 }
342
343 pub(crate) fn stage_update_table_root(
345 &mut self,
346 name: &str,
347 table_root: Option<BtreeHeader>,
348 length: u64,
349 ) {
350 self.pending_table_updates
351 .insert(name.to_string(), (table_root, length));
352 }
353
354 pub(crate) fn clear_table_root_updates(&mut self) {
355 self.pending_table_updates.clear();
356 }
357
358 pub(crate) fn verify_checksums(&self) -> Result<bool> {
359 assert!(self.pending_table_updates.is_empty());
360 if !self.tree.verify_checksum()? {
361 return Ok(false);
362 }
363
364 for entry in self.tree.range::<RangeFull, &str>(&(..))? {
365 let entry = entry?;
366 let definition = entry.value();
367 match definition {
368 InternalTableDefinition::Normal {
369 table_root,
370 fixed_key_size,
371 fixed_value_size,
372 ..
373 } => {
374 if let Some(header) = table_root {
375 if !RawBtree::new(
376 Some(header),
377 fixed_key_size,
378 fixed_value_size,
379 self.mem.clone(),
380 )
381 .verify_checksum()?
382 {
383 return Ok(false);
384 }
385 }
386 }
387 InternalTableDefinition::Multimap {
388 table_root,
389 fixed_key_size,
390 fixed_value_size,
391 ..
392 } => {
393 if !verify_tree_and_subtree_checksums(
394 table_root,
395 fixed_key_size,
396 fixed_value_size,
397 self.mem.clone(),
398 )? {
399 return Ok(false);
400 }
401 }
402 }
403 }
404
405 Ok(true)
406 }
407
408 pub(crate) fn flush_table_root_updates(&mut self) -> Result<&mut Self> {
409 for (name, (new_root, new_length)) in self.pending_table_updates.drain() {
410 let mut definition = self.tree.get(&name.as_str())?.unwrap().value();
412 match definition {
414 InternalTableDefinition::Normal { table_root, .. }
415 | InternalTableDefinition::Multimap { table_root, .. } => {
416 if table_root == new_root {
417 continue;
418 }
419 }
420 }
421 match definition {
423 InternalTableDefinition::Normal {
424 ref mut table_root,
425 ref mut table_length,
426 fixed_key_size,
427 fixed_value_size,
428 ..
429 } => {
430 let mut tree = UntypedBtreeMut::new(
431 new_root,
432 self.mem.clone(),
433 self.freed_pages.clone(),
434 fixed_key_size,
435 fixed_value_size,
436 );
437 *table_root = tree.finalize_dirty_checksums()?;
438 *table_length = new_length;
439 }
440 InternalTableDefinition::Multimap {
441 ref mut table_root,
442 ref mut table_length,
443 fixed_key_size,
444 fixed_value_size,
445 ..
446 } => {
447 *table_root = finalize_tree_and_subtree_checksums(
448 new_root,
449 fixed_key_size,
450 fixed_value_size,
451 self.mem.clone(),
452 )?;
453 *table_length = new_length;
454 }
455 }
456 self.tree.insert(&name.as_str(), &definition)?;
457 }
458 Ok(self)
459 }
460
461 pub(crate) fn create_table_and_flush_table_root<K: Key + 'static, V: Value + 'static>(
465 &mut self,
466 name: &str,
467 f: impl FnOnce(&mut BtreeMut<K, V>) -> Result,
468 ) -> Result {
469 assert!(self.pending_table_updates.is_empty());
470 assert!(self.tree.get(&name)?.is_none());
471
472 self.tree.insert(
474 &name,
475 &InternalTableDefinition::new::<K, V>(TableType::Normal, None, 0),
476 )?;
477
478 let mut tree: BtreeMut<K, V> = BtreeMut::new(
480 None,
481 self.guard.clone(),
482 self.mem.clone(),
483 self.freed_pages.clone(),
484 );
485 f(&mut tree)?;
486
487 let table_root = tree.finalize_dirty_checksums()?;
489 let table_length = tree.get_root().map(|x| x.length).unwrap_or_default();
490
491 self.tree.insert_inplace(
493 &name,
494 &InternalTableDefinition::new::<K, V>(TableType::Normal, table_root, table_length),
495 )?;
496
497 Ok(())
498 }
499
500 pub(crate) fn finalize_dirty_checksums(&mut self) -> Result<Option<BtreeHeader>> {
501 self.tree.finalize_dirty_checksums()
502 }
503
504 pub(crate) fn list_tables(&self, table_type: TableType) -> Result<Vec<String>> {
506 let tree = TableTree::new(
507 self.tree.get_root(),
508 PageHint::None,
509 self.guard.clone(),
510 self.mem.clone(),
511 )?;
512 tree.list_tables(table_type)
513 }
514
515 pub(crate) fn get_table_untyped(
516 &self,
517 name: &str,
518 table_type: TableType,
519 ) -> Result<Option<InternalTableDefinition>, TableError> {
520 let tree = TableTree::new(
521 self.tree.get_root(),
522 PageHint::None,
523 self.guard.clone(),
524 self.mem.clone(),
525 )?;
526 let mut result = tree.get_table_untyped(name, table_type);
527
528 if let Ok(Some(definition)) = result.as_mut() {
529 if let Some((updated_root, updated_length)) = self.pending_table_updates.get(name) {
530 definition.set_header(*updated_root, *updated_length);
531 }
532 }
533
534 result
535 }
536
537 pub(crate) fn get_table<K: Key, V: Value>(
539 &self,
540 name: &str,
541 table_type: TableType,
542 ) -> Result<Option<InternalTableDefinition>, TableError> {
543 let tree = TableTree::new(
544 self.tree.get_root(),
545 PageHint::None,
546 self.guard.clone(),
547 self.mem.clone(),
548 )?;
549 let mut result = tree.get_table::<K, V>(name, table_type);
550
551 if let Ok(Some(definition)) = result.as_mut() {
552 if let Some((updated_root, updated_length)) = self.pending_table_updates.get(name) {
553 definition.set_header(*updated_root, *updated_length);
554 }
555 }
556
557 result
558 }
559
560 pub(crate) fn delete_table(
562 &mut self,
563 name: &str,
564 table_type: TableType,
565 ) -> Result<bool, TableError> {
566 if let Some(definition) = self.get_table_untyped(name, table_type)? {
567 let mut freed_pages = self.freed_pages.lock().unwrap();
568 definition.visit_all_pages(self.mem.clone(), |path| {
569 freed_pages.push(path.page_number());
570 Ok(())
571 })?;
572 drop(freed_pages);
573
574 self.pending_table_updates.remove(name);
575
576 let found = self.tree.remove(&name)?.is_some();
577 return Ok(found);
578 }
579
580 Ok(false)
581 }
582
583 pub(crate) fn get_or_create_table<K: Key, V: Value>(
584 &mut self,
585 name: &str,
586 table_type: TableType,
587 ) -> Result<(Option<BtreeHeader>, u64), TableError> {
588 let table = if let Some(found) = self.get_table::<K, V>(name, table_type)? {
589 found
590 } else {
591 let table = InternalTableDefinition::new::<K, V>(table_type, None, 0);
592 self.tree.insert(&name, &table)?;
593 table
594 };
595
596 match table {
597 InternalTableDefinition::Normal {
598 table_root,
599 table_length,
600 ..
601 }
602 | InternalTableDefinition::Multimap {
603 table_root,
604 table_length,
605 ..
606 } => Ok((table_root, table_length)),
607 }
608 }
609
610 pub(crate) fn highest_index_pages(
613 &self,
614 n: usize,
615 output: &mut BTreeMap<PageNumber, PagePath>,
616 ) -> Result {
617 for entry in self.tree.range::<RangeFull, &str>(&(..))? {
618 let entry = entry?;
619 let mut definition = entry.value();
620 if let Some((updated_root, updated_length)) =
621 self.pending_table_updates.get(entry.key())
622 {
623 definition.set_header(*updated_root, *updated_length);
624 }
625
626 definition.visit_all_pages(self.mem.clone(), |path| {
627 output.insert(path.page_number(), path.clone());
628 while output.len() > n {
629 output.pop_first();
630 }
631 Ok(())
632 })?;
633 }
634
635 self.tree.visit_all_pages(|path| {
636 output.insert(path.page_number(), path.clone());
637 while output.len() > n {
638 output.pop_first();
639 }
640 Ok(())
641 })?;
642
643 Ok(())
644 }
645
646 pub(crate) fn relocate_tables(
647 &mut self,
648 relocation_map: &HashMap<PageNumber, PageNumber>,
649 ) -> Result {
650 for entry in self.tree.range::<RangeFull, &str>(&(..))? {
651 let entry = entry?;
652 let mut definition = entry.value();
653 if let Some((updated_root, updated_length)) =
654 self.pending_table_updates.get(entry.key())
655 {
656 definition.set_header(*updated_root, *updated_length);
657 }
658
659 if let Some(new_root) = definition.relocate_tree(
660 self.mem.clone(),
661 self.freed_pages.clone(),
662 relocation_map,
663 )? {
664 self.pending_table_updates
665 .insert(entry.key().to_string(), (new_root, definition.get_length()));
666 }
667 }
668
669 self.tree.relocate(relocation_map)?;
670
671 Ok(())
672 }
673
674 pub fn stats(&self) -> Result<DatabaseStats> {
675 let master_tree_stats = self.tree.stats()?;
676 let mut max_subtree_height = 0;
677 let mut total_stored_bytes = 0;
678 let mut branch_pages = master_tree_stats.branch_pages + master_tree_stats.leaf_pages;
680 let mut leaf_pages = 0;
681 let mut total_metadata_bytes =
683 master_tree_stats.metadata_bytes + master_tree_stats.stored_leaf_bytes;
684 let mut total_fragmented = master_tree_stats.fragmented_bytes;
685
686 for entry in self.tree.range::<RangeFull, &str>(&(..))? {
687 let entry = entry?;
688 let mut definition = entry.value();
689 if let Some((updated_root, length)) = self.pending_table_updates.get(entry.key()) {
690 definition.set_header(*updated_root, *length);
691 }
692 match definition {
693 InternalTableDefinition::Normal {
694 table_root,
695 fixed_key_size,
696 fixed_value_size,
697 ..
698 } => {
699 let subtree_stats = btree_stats(
700 table_root.map(|x| x.root),
701 &self.mem,
702 fixed_key_size,
703 fixed_value_size,
704 )?;
705 max_subtree_height = max(max_subtree_height, subtree_stats.tree_height);
706 total_stored_bytes += subtree_stats.stored_leaf_bytes;
707 total_metadata_bytes += subtree_stats.metadata_bytes;
708 total_fragmented += subtree_stats.fragmented_bytes;
709 branch_pages += subtree_stats.branch_pages;
710 leaf_pages += subtree_stats.leaf_pages;
711 }
712 InternalTableDefinition::Multimap {
713 table_root,
714 fixed_key_size,
715 fixed_value_size,
716 ..
717 } => {
718 let subtree_stats = multimap_btree_stats(
719 table_root.map(|x| x.root),
720 &self.mem,
721 fixed_key_size,
722 fixed_value_size,
723 )?;
724 max_subtree_height = max(max_subtree_height, subtree_stats.tree_height);
725 total_stored_bytes += subtree_stats.stored_leaf_bytes;
726 total_metadata_bytes += subtree_stats.metadata_bytes;
727 total_fragmented += subtree_stats.fragmented_bytes;
728 branch_pages += subtree_stats.branch_pages;
729 leaf_pages += subtree_stats.leaf_pages;
730 }
731 }
732 }
733 Ok(DatabaseStats {
734 tree_height: master_tree_stats.tree_height + max_subtree_height,
735 allocated_pages: self.mem.count_allocated_pages()?,
736 leaf_pages,
737 branch_pages,
738 stored_leaf_bytes: total_stored_bytes,
739 metadata_bytes: total_metadata_bytes,
740 fragmented_bytes: total_fragmented,
741 page_size: self.mem.get_page_size(),
742 })
743 }
744}
745
746#[cfg(test)]
747mod test {
748 use crate::tree_store::table_tree_base::InternalTableDefinition;
749 use crate::types::TypeName;
750 use crate::Value;
751
752 #[test]
753 fn round_trip() {
754 let x = InternalTableDefinition::Multimap {
755 table_root: None,
756 table_length: 0,
757 fixed_key_size: None,
758 fixed_value_size: Some(5),
759 key_alignment: 6,
760 value_alignment: 7,
761 key_type: TypeName::new("test::Key"),
762 value_type: TypeName::new("test::Value"),
763 };
764 let y = InternalTableDefinition::from_bytes(InternalTableDefinition::as_bytes(&x).as_ref());
765 assert_eq!(x, y);
766 }
767}