1use crate::db::TransactionGuard;
2use crate::error::CommitError;
3use crate::multimap_table::ReadOnlyUntypedMultimapTable;
4use crate::sealed::Sealed;
5use crate::table::ReadOnlyUntypedTable;
6use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker};
7use crate::tree_store::{
8 Btree, BtreeHeader, BtreeMut, BuddyAllocator, FreedPageList, FreedTableKey,
9 InternalTableDefinition, Page, PageHint, PageNumber, SerializedSavepoint, TableTree,
10 TableTreeMut, TableType, TransactionalMemory, MAX_PAIR_LENGTH, MAX_VALUE_LENGTH,
11};
12use crate::types::{Key, Value};
13use crate::{
14 AccessGuard, MultimapTable, MultimapTableDefinition, MultimapTableHandle, Range,
15 ReadOnlyMultimapTable, ReadOnlyTable, Result, Savepoint, SavepointError, StorageError, Table,
16 TableDefinition, TableError, TableHandle, TransactionError, TypeName,
17 UntypedMultimapTableHandle, UntypedTableHandle,
18};
19#[cfg(feature = "logging")]
20use log::{debug, warn};
21use std::borrow::Borrow;
22use std::cmp::min;
23use std::collections::{BTreeMap, HashMap, HashSet};
24use std::fmt::{Debug, Display, Formatter};
25use std::marker::PhantomData;
26use std::mem::size_of;
27use std::ops::RangeBounds;
28use std::ops::RangeFull;
29use std::sync::atomic::{AtomicBool, Ordering};
30use std::sync::{Arc, Mutex};
31use std::{panic, thread};
32
/// Upper bound on the number of pages handled by one compaction pass.
// NOTE(review): the code consuming this limit is outside the visible part of the file — confirm.
const MAX_PAGES_PER_COMPACTION: usize = 1_000_000;
/// System table with a single `()` key whose value is a `SavepointId`.
/// `persistent_savepoint()` stores the id following the newly created savepoint here.
const NEXT_SAVEPOINT_TABLE: SystemTableDefinition<(), SavepointId> =
    SystemTableDefinition::new("next_savepoint_id");
/// System table mapping the id of each persistent savepoint to its serialized form.
pub(crate) const SAVEPOINT_TABLE: SystemTableDefinition<SavepointId, SerializedSavepoint> =
    SystemTableDefinition::new("persistent_savepoints");
/// Name of the system table holding allocator state. `durable_commit()` deletes it on
/// every commit and recreates it only when quick-repair is enabled.
pub(crate) const ALLOCATOR_STATE_TABLE_NAME: &str = "allocator_state";
/// Mutable B-tree type of the allocator state table: `AllocatorStateKey` -> raw bytes.
pub(crate) type AllocatorStateTree<'a> = BtreeMut<'a, AllocatorStateKey, &'static [u8]>;
42
/// Key type of the allocator state table.
///
/// Its `Value` implementation serializes a key as one tag byte followed by a
/// little-endian `u32` payload (zero for the variants without a payload).
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)]
pub(crate) enum AllocatorStateKey {
    /// Tag `0`; carries a `u32` (presumably the region index — confirm against the allocator).
    Region(u32),
    /// Tag `1`; no payload.
    RegionTracker,
    /// Tag `2`; no payload.
    TransactionId,
}
49
50impl Value for AllocatorStateKey {
51 type SelfType<'a> = Self;
52 type AsBytes<'a> = [u8; 1 + size_of::<u32>()];
53
54 fn fixed_width() -> Option<usize> {
55 Some(1 + size_of::<u32>())
56 }
57
58 fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
59 where
60 Self: 'a,
61 {
62 match data[0] {
63 0 => Self::Region(u32::from_le_bytes(data[1..].try_into().unwrap())),
64 1 => Self::RegionTracker,
65 2 => Self::TransactionId,
66 _ => unreachable!(),
67 }
68 }
69
70 fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
71 where
72 Self: 'a,
73 Self: 'b,
74 {
75 let mut result = Self::AsBytes::default();
76 match value {
77 Self::Region(region) => {
78 result[0] = 0;
79 result[1..].copy_from_slice(&u32::to_le_bytes(*region));
80 }
81 Self::RegionTracker => {
82 result[0] = 1;
83 }
84 Self::TransactionId => {
85 result[0] = 2;
86 }
87 }
88
89 result
90 }
91
92 fn type_name() -> TypeName {
93 TypeName::internal("redb::AllocatorStateKey")
94 }
95}
96
97impl Key for AllocatorStateKey {
98 fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering {
99 Self::from_bytes(data1).cmp(&Self::from_bytes(data2))
100 }
101}
102
103pub struct SystemTableDefinition<'a, K: Key + 'static, V: Value + 'static> {
104 name: &'a str,
105 _key_type: PhantomData<K>,
106 _value_type: PhantomData<V>,
107}
108
109impl<'a, K: Key + 'static, V: Value + 'static> SystemTableDefinition<'a, K, V> {
110 pub const fn new(name: &'a str) -> Self {
111 assert!(!name.is_empty());
112 Self {
113 name,
114 _key_type: PhantomData,
115 _value_type: PhantomData,
116 }
117 }
118}
119
120impl<'a, K: Key + 'static, V: Value + 'static> TableHandle for SystemTableDefinition<'a, K, V> {
121 fn name(&self) -> &str {
122 self.name
123 }
124}
125
126impl<K: Key, V: Value> Sealed for SystemTableDefinition<'_, K, V> {}
127
128impl<'a, K: Key + 'static, V: Value + 'static> Clone for SystemTableDefinition<'a, K, V> {
129 fn clone(&self) -> Self {
130 *self
131 }
132}
133
134impl<'a, K: Key + 'static, V: Value + 'static> Copy for SystemTableDefinition<'a, K, V> {}
135
136impl<'a, K: Key + 'static, V: Value + 'static> Display for SystemTableDefinition<'a, K, V> {
137 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
138 write!(
139 f,
140 "{}<{}, {}>",
141 self.name,
142 K::type_name().name(),
143 V::type_name().name()
144 )
145 }
146}
147
/// Snapshot of database statistics. Every field is exposed through a read-only
/// accessor of (nearly) the same name.
#[derive(Debug)]
pub struct DatabaseStats {
    /// Tree height, see [`DatabaseStats::tree_height`].
    pub(crate) tree_height: u32,
    /// Number of allocated pages.
    pub(crate) allocated_pages: u64,
    /// Number of leaf pages.
    pub(crate) leaf_pages: u64,
    /// Number of branch pages.
    pub(crate) branch_pages: u64,
    /// Bytes stored in leaf pages; exposed as [`DatabaseStats::stored_bytes`].
    pub(crate) stored_leaf_bytes: u64,
    /// Bytes attributed to metadata.
    pub(crate) metadata_bytes: u64,
    /// Bytes attributed to fragmentation.
    pub(crate) fragmented_bytes: u64,
    /// Page size (presumably in bytes — confirm against the producer of this struct).
    pub(crate) page_size: usize,
}

impl DatabaseStats {
    /// Returns the recorded tree height.
    pub fn tree_height(&self) -> u32 {
        self.tree_height
    }

    /// Returns the recorded number of allocated pages.
    pub fn allocated_pages(&self) -> u64 {
        self.allocated_pages
    }

    /// Returns the recorded number of leaf pages.
    pub fn leaf_pages(&self) -> u64 {
        self.leaf_pages
    }

    /// Returns the recorded number of branch pages.
    pub fn branch_pages(&self) -> u64 {
        self.branch_pages
    }

    /// Returns the recorded number of bytes stored in leaf pages.
    pub fn stored_bytes(&self) -> u64 {
        self.stored_leaf_bytes
    }

    /// Returns the recorded number of metadata bytes.
    pub fn metadata_bytes(&self) -> u64 {
        self.metadata_bytes
    }

    /// Returns the recorded number of fragmented bytes.
    pub fn fragmented_bytes(&self) -> u64 {
        self.fragmented_bytes
    }

    /// Returns the recorded page size.
    pub fn page_size(&self) -> usize {
        self.page_size
    }
}
203
/// Durability level requested for the commit of a write transaction.
///
/// `WriteTransaction::set_durability` maps each variant to an internal level, which
/// the commit path uses to choose between `non_durable_commit()` and
/// `durable_commit()`.
#[derive(Copy, Clone, Debug)]
#[non_exhaustive]
pub enum Durability {
    /// Commit through the non-durable path (`non_durable_commit()`).
    None,
    /// Commit through `durable_commit()` with its `eventual` flag set.
    Eventual,
    /// Commit through `durable_commit()` with its `eventual` flag cleared.
    /// This is the level a new `WriteTransaction` starts with.
    Immediate,
    /// Deprecated: treated as `Immediate` with two-phase commit switched on.
    #[deprecated(since = "2.3.0", note = "use set_two_phase_commit(true) instead")]
    Paranoid,
}
224
/// Durability level as stored inside a `WriteTransaction`.
///
/// Mirrors the public `Durability` enum without the deprecated `Paranoid` variant,
/// which `set_durability` translates to `Immediate` plus two-phase commit.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum InternalDurability {
    /// Committed via `non_durable_commit()`.
    None,
    /// Committed via `durable_commit(true)`.
    Eventual,
    /// Committed via `durable_commit(false)`.
    Immediate,
}
233
/// Handle to an open system table.
///
/// When the handle is dropped it reports its current root and length back to the
/// owning `SystemNamespace`.
pub struct SystemTable<'db, 's, K: Key + 'static, V: Value + 'static> {
    /// Table name, passed back to the namespace on drop.
    name: String,
    /// Namespace the table was opened from; mutably borrowed for the handle's lifetime.
    namespace: &'s mut SystemNamespace<'db>,
    /// Mutable B-tree holding the table contents.
    tree: BtreeMut<'s, K, V>,
    /// Guard of the owning transaction; cloned into every range iterator handed out.
    transaction_guard: Arc<TransactionGuard>,
}
241
242impl<'db, 's, K: Key + 'static, V: Value + 'static> SystemTable<'db, 's, K, V> {
243 fn new(
244 name: &str,
245 table_root: Option<BtreeHeader>,
246 freed_pages: Arc<Mutex<Vec<PageNumber>>>,
247 guard: Arc<TransactionGuard>,
248 mem: Arc<TransactionalMemory>,
249 namespace: &'s mut SystemNamespace<'db>,
250 ) -> SystemTable<'db, 's, K, V> {
251 SystemTable {
252 name: name.to_string(),
253 namespace,
254 tree: BtreeMut::new(table_root, guard.clone(), mem, freed_pages),
255 transaction_guard: guard,
256 }
257 }
258
259 fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<Option<AccessGuard<V>>>
260 where
261 K: 'a,
262 {
263 self.tree.get(key.borrow())
264 }
265
266 fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<Range<K, V>>
267 where
268 K: 'a,
269 KR: Borrow<K::SelfType<'a>> + 'a,
270 {
271 self.tree
272 .range(&range)
273 .map(|x| Range::new(x, self.transaction_guard.clone()))
274 }
275
276 pub fn insert<'k, 'v>(
277 &mut self,
278 key: impl Borrow<K::SelfType<'k>>,
279 value: impl Borrow<V::SelfType<'v>>,
280 ) -> Result<Option<AccessGuard<V>>> {
281 let value_len = V::as_bytes(value.borrow()).as_ref().len();
282 if value_len > MAX_VALUE_LENGTH {
283 return Err(StorageError::ValueTooLarge(value_len));
284 }
285 let key_len = K::as_bytes(key.borrow()).as_ref().len();
286 if key_len > MAX_VALUE_LENGTH {
287 return Err(StorageError::ValueTooLarge(key_len));
288 }
289 if value_len + key_len > MAX_PAIR_LENGTH {
290 return Err(StorageError::ValueTooLarge(value_len + key_len));
291 }
292 self.tree.insert(key.borrow(), value.borrow())
293 }
294
295 pub fn remove<'a>(
296 &mut self,
297 key: impl Borrow<K::SelfType<'a>>,
298 ) -> Result<Option<AccessGuard<V>>>
299 where
300 K: 'a,
301 {
302 self.tree.remove(key.borrow())
303 }
304}
305
306impl<'db, 's, K: Key + 'static, V: Value + 'static> Drop for SystemTable<'db, 's, K, V> {
307 fn drop(&mut self) {
308 self.namespace.close_table(
309 &self.name,
310 &self.tree,
311 self.tree.get_root().map(|x| x.length).unwrap_or_default(),
312 );
313 }
314}
315
/// Namespace of the system tables belonging to one write transaction.
struct SystemNamespace<'db> {
    /// Table tree rooted at the system root.
    table_tree: TableTreeMut<'db>,
    /// Guard of the owning transaction; cloned into every system table opened here.
    transaction_guard: Arc<TransactionGuard>,
}
320
321impl<'db> SystemNamespace<'db> {
322 fn open_system_table<'txn, 's, K: Key + 'static, V: Value + 'static>(
323 &'s mut self,
324 transaction: &'txn WriteTransaction,
325 definition: SystemTableDefinition<K, V>,
326 ) -> Result<SystemTable<'db, 's, K, V>> {
327 #[cfg(feature = "logging")]
328 debug!("Opening system table: {}", definition);
329 let (root, _) = self
330 .table_tree
331 .get_or_create_table::<K, V>(definition.name(), TableType::Normal)
332 .map_err(|e| {
333 e.into_storage_error_or_corrupted("Internal error. System table is corrupted")
334 })?;
335 transaction.dirty.store(true, Ordering::Release);
336
337 Ok(SystemTable::new(
338 definition.name(),
339 root,
340 transaction.freed_pages.clone(),
341 self.transaction_guard.clone(),
342 transaction.mem.clone(),
343 self,
344 ))
345 }
346
347 fn close_table<K: Key + 'static, V: Value + 'static>(
348 &mut self,
349 name: &str,
350 table: &BtreeMut<K, V>,
351 length: u64,
352 ) {
353 self.table_tree
354 .stage_update_table_root(name, table.get_root(), length);
355 }
356}
357
/// Namespace of the user tables belonging to one write transaction.
struct TableNamespace<'db> {
    /// Names of the currently open tables, each mapped to the source location
    /// recorded when it was opened (reported in `TableError::TableAlreadyOpen`).
    open_tables: HashMap<String, &'static panic::Location<'static>>,
    /// Table tree rooted at the data root.
    table_tree: TableTreeMut<'db>,
}
362
363impl<'db> TableNamespace<'db> {
364 #[track_caller]
365 fn inner_open<K: Key + 'static, V: Value + 'static>(
366 &mut self,
367 name: &str,
368 table_type: TableType,
369 ) -> Result<(Option<BtreeHeader>, u64), TableError> {
370 if let Some(location) = self.open_tables.get(name) {
371 return Err(TableError::TableAlreadyOpen(name.to_string(), location));
372 }
373
374 let root = self
375 .table_tree
376 .get_or_create_table::<K, V>(name, table_type)?;
377 self.open_tables
378 .insert(name.to_string(), panic::Location::caller());
379
380 Ok(root)
381 }
382
383 #[track_caller]
384 pub fn open_multimap_table<'txn, K: Key + 'static, V: Key + 'static>(
385 &mut self,
386 transaction: &'txn WriteTransaction,
387 definition: MultimapTableDefinition<K, V>,
388 ) -> Result<MultimapTable<'txn, K, V>, TableError> {
389 #[cfg(feature = "logging")]
390 debug!("Opening multimap table: {}", definition);
391 let (root, length) = self.inner_open::<K, V>(definition.name(), TableType::Multimap)?;
392 transaction.dirty.store(true, Ordering::Release);
393
394 Ok(MultimapTable::new(
395 definition.name(),
396 root,
397 length,
398 transaction.freed_pages.clone(),
399 transaction.mem.clone(),
400 transaction,
401 ))
402 }
403
404 #[track_caller]
405 pub fn open_table<'txn, K: Key + 'static, V: Value + 'static>(
406 &mut self,
407 transaction: &'txn WriteTransaction,
408 definition: TableDefinition<K, V>,
409 ) -> Result<Table<'txn, K, V>, TableError> {
410 #[cfg(feature = "logging")]
411 debug!("Opening table: {}", definition);
412 let (root, _) = self.inner_open::<K, V>(definition.name(), TableType::Normal)?;
413 transaction.dirty.store(true, Ordering::Release);
414
415 Ok(Table::new(
416 definition.name(),
417 root,
418 transaction.freed_pages.clone(),
419 transaction.mem.clone(),
420 transaction,
421 ))
422 }
423
424 #[track_caller]
425 fn inner_delete(&mut self, name: &str, table_type: TableType) -> Result<bool, TableError> {
426 if let Some(location) = self.open_tables.get(name) {
427 return Err(TableError::TableAlreadyOpen(name.to_string(), location));
428 }
429
430 self.table_tree.delete_table(name, table_type)
431 }
432
433 #[track_caller]
434 fn delete_table(
435 &mut self,
436 transaction: &WriteTransaction,
437 name: &str,
438 ) -> Result<bool, TableError> {
439 #[cfg(feature = "logging")]
440 debug!("Deleting table: {}", name);
441 transaction.dirty.store(true, Ordering::Release);
442 self.inner_delete(name, TableType::Normal)
443 }
444
445 #[track_caller]
446 fn delete_multimap_table(
447 &mut self,
448 transaction: &WriteTransaction,
449 name: &str,
450 ) -> Result<bool, TableError> {
451 #[cfg(feature = "logging")]
452 debug!("Deleting multimap table: {}", name);
453 transaction.dirty.store(true, Ordering::Release);
454 self.inner_delete(name, TableType::Multimap)
455 }
456
457 pub(crate) fn close_table<K: Key + 'static, V: Value + 'static>(
458 &mut self,
459 name: &str,
460 table: &BtreeMut<K, V>,
461 length: u64,
462 ) {
463 self.open_tables.remove(name).unwrap();
464 self.table_tree
465 .stage_update_table_root(name, table.get_root(), length);
466 }
467}
468
/// A read/write transaction.
///
/// Table and savepoint operations take `&self` and go through the internal
/// mutexes; `commit()` and `abort()` consume the transaction.
pub struct WriteTransaction {
    /// Shared tracker used to register read transactions and to allocate, validate
    /// and deallocate savepoints.
    transaction_tracker: Arc<TransactionTracker>,
    /// Shared handle to the storage layer (roots, page allocation, commit, rollback).
    mem: Arc<TransactionalMemory>,
    /// Guard of this transaction; clones are handed to the trees and tables it opens.
    transaction_guard: Arc<TransactionGuard>,
    /// Id obtained from the guard when the transaction was created.
    transaction_id: TransactionId,
    /// Freed-page tree opened at `mem.get_freed_root()`. It is constructed with
    /// `post_commit_frees` as its own freed-page list.
    freed_tree: Mutex<BtreeMut<'static, FreedTableKey, FreedPageList<'static>>>,
    /// Freed-page list shared with the user and system table trees and with every
    /// table opened from this transaction.
    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
    /// Pages handed to `mem.free()` once a durable commit has succeeded.
    post_commit_frees: Arc<Mutex<Vec<PageNumber>>>,
    /// User table namespace.
    tables: Mutex<TableNamespace<'static>>,
    /// System table namespace.
    system_tables: Mutex<SystemNamespace<'static>>,
    /// Set by `commit()` and `abort()` before they do their work.
    // NOTE(review): presumably read by a `Drop` impl outside the visible chunk — confirm.
    completed: bool,
    /// Raised when a table is opened or deleted, a system table is opened, or a
    /// savepoint is restored; `ephemeral_savepoint()` fails once it is set.
    dirty: AtomicBool,
    /// Durability used by the commit; starts as `Immediate`.
    durability: InternalDurability,
    /// Passed to `mem.commit()`; forced on at commit time when `quick_repair` is set.
    two_phase_commit: bool,
    /// When set, a durable commit also writes the allocator state table.
    quick_repair: bool,
    /// Ids of the persistent savepoints created by this transaction; they are
    /// deleted again if the transaction aborts.
    created_persistent_savepoints: Mutex<HashSet<SavepointId>>,
    /// Persistent savepoints removed by this transaction, as (savepoint id,
    /// transaction id) pairs; deallocated in the tracker after a successful commit.
    deleted_persistent_savepoints: Mutex<Vec<(SavepointId, TransactionId)>>,
}
495
496impl WriteTransaction {
497 pub(crate) fn new(
498 guard: TransactionGuard,
499 transaction_tracker: Arc<TransactionTracker>,
500 mem: Arc<TransactionalMemory>,
501 ) -> Result<Self> {
502 let transaction_id = guard.id();
503 let guard = Arc::new(guard);
504
505 let root_page = mem.get_data_root();
506 let system_page = mem.get_system_root();
507 let freed_root = mem.get_freed_root();
508 let freed_pages = Arc::new(Mutex::new(vec![]));
509 let post_commit_frees = Arc::new(Mutex::new(vec![]));
510
511 let tables = TableNamespace {
512 open_tables: Default::default(),
513 table_tree: TableTreeMut::new(
514 root_page,
515 guard.clone(),
516 mem.clone(),
517 freed_pages.clone(),
518 ),
519 };
520 let system_tables = SystemNamespace {
521 table_tree: TableTreeMut::new(
522 system_page,
523 guard.clone(),
524 mem.clone(),
525 freed_pages.clone(),
526 ),
527 transaction_guard: guard.clone(),
528 };
529
530 Ok(Self {
531 transaction_tracker,
532 mem: mem.clone(),
533 transaction_guard: guard.clone(),
534 transaction_id,
535 tables: Mutex::new(tables),
536 system_tables: Mutex::new(system_tables),
537 freed_tree: Mutex::new(BtreeMut::new(
538 freed_root,
539 guard,
540 mem,
541 post_commit_frees.clone(),
542 )),
543 freed_pages,
544 post_commit_frees,
545 completed: false,
546 dirty: AtomicBool::new(false),
547 durability: InternalDurability::Immediate,
548 two_phase_commit: false,
549 quick_repair: false,
550 created_persistent_savepoints: Mutex::new(Default::default()),
551 deleted_persistent_savepoints: Mutex::new(vec![]),
552 })
553 }
554
    /// Debug helper (test and fuzzing builds only): prints every allocated page
    /// grouped by the structure that accounts for it, then prints the pages that no
    /// structure accounted for under "Leaked pages".
    ///
    /// Panics if a lock is poisoned or a tree cannot be read.
    #[cfg(any(test, fuzzing))]
    pub fn print_allocated_page_debug(&self) {
        // Start from every allocated page and strike pages out as they are accounted for.
        let mut all_allocated: HashSet<PageNumber> =
            HashSet::from_iter(self.mem.all_allocated_pages());

        let tracker = self.mem.tracker_page();
        all_allocated.remove(&tracker);
        println!("Tracker page");
        println!("{tracker:?}");

        // Pages visited by walking the user table tree.
        let mut table_pages = vec![];
        self.tables
            .lock()
            .unwrap()
            .table_tree
            .visit_all_pages(|path| {
                table_pages.push(path.page_number());
                Ok(())
            })
            .unwrap();
        println!("Tables");
        for p in table_pages {
            all_allocated.remove(&p);
            println!("{p:?}");
        }

        // Pages visited by walking the system table tree.
        let mut system_table_pages = vec![];
        self.system_tables
            .lock()
            .unwrap()
            .table_tree
            .visit_all_pages(|path| {
                system_table_pages.push(path.page_number());
                Ok(())
            })
            .unwrap();
        println!("System tables");
        for p in system_table_pages {
            all_allocated.remove(&p);
            println!("{p:?}");
        }

        // Pages yielded by the freed tree's own page iterator.
        println!("Free table");
        if let Some(freed_iter) = self.freed_tree.lock().unwrap().all_pages_iter().unwrap() {
            for p in freed_iter {
                let p = p.unwrap();
                all_allocated.remove(&p);
                println!("{p:?}");
            }
        }
        // Pages listed in the entries of the freed tree.
        println!("Pending free (i.e. in freed table)");
        for entry in self
            .freed_tree
            .lock()
            .unwrap()
            .range::<RangeFull, FreedTableKey>(&(..))
            .unwrap()
        {
            let entry = entry.unwrap();
            let value = entry.value();
            for i in 0..value.len() {
                let p = value.get(i);
                all_allocated.remove(&p);
                println!("{p:?}");
            }
        }
        // Pages still held in the in-memory lists of this transaction.
        {
            let pages = self.freed_pages.lock().unwrap();
            if !pages.is_empty() {
                println!("Pages in in-memory freed_pages");
                for p in pages.iter() {
                    println!("{p:?}");
                    all_allocated.remove(p);
                }
            }
        }
        {
            let pages = self.post_commit_frees.lock().unwrap();
            if !pages.is_empty() {
                println!("Pages in in-memory post_commit_frees");
                for p in pages.iter() {
                    println!("{p:?}");
                    all_allocated.remove(p);
                }
            }
        }
        // Whatever is left was allocated but not accounted for by anything above.
        if !all_allocated.is_empty() {
            println!("Leaked pages");
            for p in all_allocated {
                println!("{p:?}");
            }
        }
    }
648
649 pub fn persistent_savepoint(&self) -> Result<u64, SavepointError> {
658 if self.durability != InternalDurability::Immediate {
659 return Err(SavepointError::InvalidSavepoint);
660 }
661
662 let mut savepoint = self.ephemeral_savepoint()?;
663
664 let mut system_tables = self.system_tables.lock().unwrap();
665
666 let mut next_table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?;
667 next_table.insert((), savepoint.get_id().next())?;
668 drop(next_table);
669
670 let mut savepoint_table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
671 savepoint_table.insert(
672 savepoint.get_id(),
673 SerializedSavepoint::from_savepoint(&savepoint),
674 )?;
675
676 savepoint.set_persistent();
677
678 self.created_persistent_savepoints
679 .lock()
680 .unwrap()
681 .insert(savepoint.get_id());
682
683 Ok(savepoint.get_id().0)
684 }
685
686 pub(crate) fn transaction_guard(&self) -> Arc<TransactionGuard> {
687 self.transaction_guard.clone()
688 }
689
690 pub(crate) fn next_persistent_savepoint_id(&self) -> Result<Option<SavepointId>> {
691 let mut system_tables = self.system_tables.lock().unwrap();
692 let next_table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?;
693 let value = next_table.get(())?;
694 if let Some(next_id) = value {
695 Ok(Some(next_id.value()))
696 } else {
697 Ok(None)
698 }
699 }
700
701 pub fn get_persistent_savepoint(&self, id: u64) -> Result<Savepoint, SavepointError> {
703 let mut system_tables = self.system_tables.lock().unwrap();
704 let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
705 let value = table.get(SavepointId(id))?;
706
707 value
708 .map(|x| x.value().to_savepoint(self.transaction_tracker.clone()))
709 .ok_or(SavepointError::InvalidSavepoint)
710 }
711
712 pub fn delete_persistent_savepoint(&self, id: u64) -> Result<bool, SavepointError> {
719 if self.durability != InternalDurability::Immediate {
720 return Err(SavepointError::InvalidSavepoint);
721 }
722 let mut system_tables = self.system_tables.lock().unwrap();
723 let mut table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
724 let savepoint = table.remove(SavepointId(id))?;
725 if let Some(serialized) = savepoint {
726 let savepoint = serialized
727 .value()
728 .to_savepoint(self.transaction_tracker.clone());
729 self.deleted_persistent_savepoints
730 .lock()
731 .unwrap()
732 .push((savepoint.get_id(), savepoint.get_transaction_id()));
733 Ok(true)
734 } else {
735 Ok(false)
736 }
737 }
738
739 pub fn list_persistent_savepoints(&self) -> Result<impl Iterator<Item = u64>> {
741 let mut system_tables = self.system_tables.lock().unwrap();
742 let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
743 let mut savepoints = vec![];
744 for savepoint in table.range::<SavepointId>(..)? {
745 savepoints.push(savepoint?.0.value().0);
746 }
747 Ok(savepoints.into_iter())
748 }
749
750 fn allocate_read_transaction(&self) -> Result<TransactionGuard> {
752 let id = self
753 .transaction_tracker
754 .register_read_transaction(&self.mem)?;
755
756 Ok(TransactionGuard::new_read(
757 id,
758 self.transaction_tracker.clone(),
759 ))
760 }
761
762 fn allocate_savepoint(&self) -> Result<(SavepointId, TransactionId)> {
763 let id = self.transaction_tracker.allocate_savepoint();
764 Ok((id, self.allocate_read_transaction()?.leak()))
765 }
766
767 pub fn ephemeral_savepoint(&self) -> Result<Savepoint, SavepointError> {
773 if self.dirty.load(Ordering::Acquire) {
774 return Err(SavepointError::InvalidSavepoint);
775 }
776
777 let (id, transaction_id) = self.allocate_savepoint()?;
778 #[cfg(feature = "logging")]
779 debug!(
780 "Creating savepoint id={:?}, txn_id={:?}",
781 id, transaction_id
782 );
783
784 let regional_allocators = self.mem.get_raw_allocator_states();
785 let root = self.mem.get_data_root();
786 let system_root = self.mem.get_system_root();
787 let freed_root = self.mem.get_freed_root();
788 let savepoint = Savepoint::new_ephemeral(
789 &self.mem,
790 self.transaction_tracker.clone(),
791 id,
792 transaction_id,
793 root,
794 system_root,
795 freed_root,
796 regional_allocators,
797 );
798
799 Ok(savepoint)
800 }
801
    /// Restores the user tables to the state captured by `savepoint`.
    ///
    /// Steps performed:
    /// 1. Point the user table tree at the savepoint's user root.
    /// 2. Rewrite the freed tree entries of every transaction after the savepoint,
    ///    keeping only pages that belong to the savepoint's system or freed tree.
    /// 3. Free (or queue for freeing) pages allocated since the savepoint that are
    ///    not otherwise accounted for.
    /// 4. Invalidate newer savepoints and delete newer persistent savepoints.
    ///
    /// # Errors
    ///
    /// Returns `SavepointError::InvalidSavepoint` if the tracker no longer considers
    /// the savepoint valid; storage errors are forwarded.
    ///
    /// # Panics
    ///
    /// Panics if the savepoint belongs to a different transaction tracker, or if its
    /// version differs from the current version reported by the storage layer.
    pub fn restore_savepoint(&mut self, savepoint: &Savepoint) -> Result<(), SavepointError> {
        // The savepoint must have been created against the same transaction tracker.
        assert_eq!(
            std::ptr::from_ref(self.transaction_tracker.as_ref()),
            savepoint.db_address()
        );

        if !self
            .transaction_tracker
            .is_valid_savepoint(savepoint.get_id())
        {
            return Err(SavepointError::InvalidSavepoint);
        }
        #[cfg(feature = "logging")]
        debug!(
            "Beginning savepoint restore (id={:?}) in transaction id={:?}",
            savepoint.get_id(),
            self.transaction_id
        );
        // The savepoint must have been taken under the version currently in use.
        assert_eq!(self.mem.get_version(), savepoint.get_version());
        // From here on the transaction counts as modified.
        self.dirty.store(true, Ordering::Release);

        // Open read-only views of the system tree and freed tree as of the savepoint.
        let old_system_tree = TableTree::new(
            savepoint.get_system_root(),
            PageHint::None,
            self.transaction_guard.clone(),
            self.mem.clone(),
        )?;
        let old_freed_tree: Btree<FreedTableKey, FreedPageList<'static>> = Btree::new(
            savepoint.get_freed_root(),
            PageHint::None,
            self.transaction_guard.clone(),
            self.mem.clone(),
        )?;

        // Collect every page that makes up those two old trees.
        let mut old_system_and_freed_pages = HashSet::new();
        old_system_tree.visit_all_pages(|path| {
            old_system_and_freed_pages.insert(path.page_number());
            Ok(())
        })?;
        old_freed_tree.visit_all_pages(|path| {
            old_system_and_freed_pages.insert(path.page_number());
            Ok(())
        })?;

        // Step 1: replace the user table tree with one rooted at the savepoint's user root.
        {
            self.tables.lock().unwrap().table_tree = TableTreeMut::new(
                savepoint.get_user_root(),
                self.transaction_guard.clone(),
                self.mem.clone(),
                self.freed_pages.clone(),
            );
        }

        // Step 2: walk the freed tree one transaction id at a time, starting with the
        // transaction right after the savepoint's, until no entries remain at or
        // beyond the current id.
        let mut txn_id = savepoint.get_transaction_id().next().raw_id();
        let mut freed_tree = self.freed_tree.lock().unwrap();
        loop {
            let lower = FreedTableKey {
                transaction_id: txn_id,
                pagination_id: 0,
            };

            if freed_tree.range(&(lower..))?.next().is_none() {
                break;
            }
            let lower = FreedTableKey {
                transaction_id: txn_id,
                pagination_id: 0,
            };
            let upper = FreedTableKey {
                transaction_id: txn_id + 1,
                pagination_id: 0,
            };

            // Remove all entries of this transaction; remember only the pages that
            // belong to the old system or freed tree.
            let mut pending_pages = vec![];
            for entry in freed_tree.extract_from_if(&(lower..upper), |_, _| true)? {
                let item = entry?;
                for i in 0..item.value().len() {
                    let p = item.value().get(i);
                    if old_system_and_freed_pages.contains(&p) {
                        pending_pages.push(p);
                    }
                }
            }

            // Write the remembered pages back under the same transaction id, at most
            // `chunk_size` pages per entry, taken from the end of the list.
            let mut pagination_counter = 0u64;
            while !pending_pages.is_empty() {
                let chunk_size = 100;
                let buffer_size = FreedPageList::required_bytes(chunk_size);
                let key = FreedTableKey {
                    transaction_id: txn_id,
                    pagination_id: pagination_counter,
                };
                let mut access_guard =
                    freed_tree.insert_reserve(&key, buffer_size.try_into().unwrap())?;

                let len = pending_pages.len();
                access_guard.as_mut().clear();
                for page in pending_pages.drain(len - min(len, chunk_size)..) {
                    access_guard.as_mut().push_back(page);
                }
                drop(access_guard);

                pagination_counter += 1;
            }

            txn_id += 1;
        }

        // Collect every page of the current system tree and the rewritten freed tree;
        // step 3 must not free any of them.
        let mut current_system_and_freed_pages = HashSet::new();
        self.system_tables
            .lock()
            .unwrap()
            .table_tree
            .visit_all_pages(|path| {
                current_system_and_freed_pages.insert(path.page_number());
                Ok(())
            })?;
        freed_tree.visit_all_pages(|path| {
            current_system_and_freed_pages.insert(path.page_number());
            Ok(())
        })?;

        // Rebuild the allocator states as they were when the savepoint was taken.
        let mut old_allocators: Vec<BuddyAllocator> = savepoint
            .get_regional_allocators()
            .iter()
            .map(|data| BuddyAllocator::from_savepoint_state(data))
            .collect();

        {
            // First transaction id that still has an entry in the current freed
            // tree; the id of this transaction if the tree is empty.
            let oldest_unprocessed_transaction =
                if let Some(entry) = freed_tree.range::<RangeFull, FreedTableKey>(&(..))?.next() {
                    entry?.key().transaction_id
                } else {
                    self.transaction_id.raw_id()
                };

            let lookup_key = FreedTableKey {
                transaction_id: oldest_unprocessed_transaction,
                pagination_id: 0,
            };

            // Entries of the old freed tree below that id no longer exist in the
            // current freed tree; mark their pages as free in the old allocator states.
            // NOTE(review): presumably those pages were already freed for real since
            // the savepoint was taken — confirm.
            for entry in old_freed_tree.range(&(..lookup_key))? {
                let item = entry?;
                let pages: FreedPageList = item.value();
                for i in 0..pages.len() {
                    let page = pages.get(i);
                    assert!(old_allocators[page.region as usize]
                        .is_allocated(page.page_index, page.page_order));
                    old_allocators[page.region as usize].free(page.page_index, page.page_order);
                }
            }
        }

        // Step 3: handle every page allocated since the (adjusted) old allocator state.
        let mut freed_pages = self.freed_pages.lock().unwrap();
        let mut already_awaiting_free: HashSet<PageNumber> = freed_pages.iter().copied().collect();
        already_awaiting_free.extend(self.post_commit_frees.lock().unwrap().iter().copied());
        let to_free = self.mem.pages_allocated_since_raw_state(&old_allocators);
        for page in to_free {
            // Already queued in one of the in-memory free lists.
            if already_awaiting_free.contains(&page) {
                continue;
            }
            // Still in use by the current system tree or freed tree.
            if current_system_and_freed_pages.contains(&page) {
                continue;
            }
            // Uncommitted pages are freed immediately; all others are queued.
            if self.mem.uncommitted(page) {
                self.mem.free(page);
            } else {
                freed_pages.push(page);
            }
        }
        drop(freed_pages);

        // Step 4: savepoints newer than the restored one are no longer usable.
        self.transaction_tracker
            .invalidate_savepoints_after(savepoint.get_id());
        for persistent_savepoint in self.list_persistent_savepoints()? {
            if persistent_savepoint > savepoint.get_id().0 {
                self.delete_persistent_savepoint(persistent_savepoint)?;
            }
        }

        Ok(())
    }
1022
1023 pub fn set_durability(&mut self, durability: Durability) {
1028 let no_created = self
1029 .created_persistent_savepoints
1030 .lock()
1031 .unwrap()
1032 .is_empty();
1033 let no_deleted = self
1034 .deleted_persistent_savepoints
1035 .lock()
1036 .unwrap()
1037 .is_empty();
1038 assert!(no_created && no_deleted);
1039
1040 self.durability = match durability {
1041 Durability::None => InternalDurability::None,
1042 Durability::Eventual => InternalDurability::Eventual,
1043 Durability::Immediate => InternalDurability::Immediate,
1044 #[allow(deprecated)]
1045 Durability::Paranoid => {
1046 self.set_two_phase_commit(true);
1047 InternalDurability::Immediate
1048 }
1049 };
1050 }
1051
    /// Enables or disables two-phase commit for this transaction.
    ///
    /// The flag is passed to the storage layer's commit by `durable_commit()`, and
    /// is forced on at commit time when quick-repair is enabled.
    pub fn set_two_phase_commit(&mut self, enabled: bool) {
        self.two_phase_commit = enabled;
    }
1094
    /// Enables or disables quick-repair for this transaction.
    ///
    /// When enabled, the commit switches two-phase commit on and a durable commit
    /// additionally writes the allocator state table.
    pub fn set_quick_repair(&mut self, enabled: bool) {
        self.quick_repair = enabled;
    }
1108
1109 #[track_caller]
1113 pub fn open_table<'txn, K: Key + 'static, V: Value + 'static>(
1114 &'txn self,
1115 definition: TableDefinition<K, V>,
1116 ) -> Result<Table<'txn, K, V>, TableError> {
1117 self.tables.lock().unwrap().open_table(self, definition)
1118 }
1119
1120 #[track_caller]
1124 pub fn open_multimap_table<'txn, K: Key + 'static, V: Key + 'static>(
1125 &'txn self,
1126 definition: MultimapTableDefinition<K, V>,
1127 ) -> Result<MultimapTable<'txn, K, V>, TableError> {
1128 self.tables
1129 .lock()
1130 .unwrap()
1131 .open_multimap_table(self, definition)
1132 }
1133
1134 pub(crate) fn close_table<K: Key + 'static, V: Value + 'static>(
1135 &self,
1136 name: &str,
1137 table: &BtreeMut<K, V>,
1138 length: u64,
1139 ) {
1140 self.tables.lock().unwrap().close_table(name, table, length);
1141 }
1142
1143 pub fn delete_table(&self, definition: impl TableHandle) -> Result<bool, TableError> {
1147 let name = definition.name().to_string();
1148 drop(definition);
1150 self.tables.lock().unwrap().delete_table(self, &name)
1151 }
1152
1153 pub fn delete_multimap_table(
1157 &self,
1158 definition: impl MultimapTableHandle,
1159 ) -> Result<bool, TableError> {
1160 let name = definition.name().to_string();
1161 drop(definition);
1163 self.tables
1164 .lock()
1165 .unwrap()
1166 .delete_multimap_table(self, &name)
1167 }
1168
1169 pub fn list_tables(&self) -> Result<impl Iterator<Item = UntypedTableHandle> + '_> {
1171 self.tables
1172 .lock()
1173 .unwrap()
1174 .table_tree
1175 .list_tables(TableType::Normal)
1176 .map(|x| x.into_iter().map(UntypedTableHandle::new))
1177 }
1178
1179 pub fn list_multimap_tables(
1181 &self,
1182 ) -> Result<impl Iterator<Item = UntypedMultimapTableHandle> + '_> {
1183 self.tables
1184 .lock()
1185 .unwrap()
1186 .table_tree
1187 .list_tables(TableType::Multimap)
1188 .map(|x| x.into_iter().map(UntypedMultimapTableHandle::new))
1189 }
1190
    /// Commits the transaction, consuming it.
    ///
    /// # Errors
    ///
    /// Forwards the `CommitError` produced by `commit_inner()`.
    pub fn commit(mut self) -> Result<(), CommitError> {
        // Mark the transaction as completed before the work starts, so the flag is
        // set even if the commit fails.
        self.completed = true;
        self.commit_inner()
    }
1200
1201 fn commit_inner(&mut self) -> Result<(), CommitError> {
1202 if self.quick_repair {
1204 self.two_phase_commit = true;
1205 }
1206
1207 #[cfg(feature = "logging")]
1208 debug!(
1209 "Committing transaction id={:?} with durability={:?} two_phase={} quick_repair={}",
1210 self.transaction_id, self.durability, self.two_phase_commit, self.quick_repair
1211 );
1212 match self.durability {
1213 InternalDurability::None => self.non_durable_commit()?,
1214 InternalDurability::Eventual => self.durable_commit(true)?,
1215 InternalDurability::Immediate => self.durable_commit(false)?,
1216 }
1217
1218 for (savepoint, transaction) in self.deleted_persistent_savepoints.lock().unwrap().iter() {
1219 self.transaction_tracker
1220 .deallocate_savepoint(*savepoint, *transaction);
1221 }
1222
1223 #[cfg(feature = "logging")]
1224 debug!(
1225 "Finished commit of transaction id={:?}",
1226 self.transaction_id
1227 );
1228
1229 Ok(())
1230 }
1231
    /// Aborts the transaction, consuming it.
    ///
    /// # Errors
    ///
    /// Forwards the storage error produced by `abort_inner()`.
    pub fn abort(mut self) -> Result {
        // Mark the transaction as completed before the work starts, so the flag is
        // set even if the abort fails.
        self.completed = true;
        self.abort_inner()
    }
1240
1241 fn abort_inner(&mut self) -> Result {
1242 #[cfg(feature = "logging")]
1243 debug!("Aborting transaction id={:?}", self.transaction_id);
1244 for savepoint in self.created_persistent_savepoints.lock().unwrap().iter() {
1245 match self.delete_persistent_savepoint(savepoint.0) {
1246 Ok(_) => {}
1247 Err(err) => match err {
1248 SavepointError::InvalidSavepoint => {
1249 unreachable!();
1250 }
1251 SavepointError::Storage(storage_err) => {
1252 return Err(storage_err);
1253 }
1254 },
1255 }
1256 }
1257 self.tables
1258 .lock()
1259 .unwrap()
1260 .table_tree
1261 .clear_table_root_updates();
1262 self.mem.rollback_uncommitted_writes()?;
1263 #[cfg(feature = "logging")]
1264 debug!("Finished abort of transaction id={:?}", self.transaction_id);
1265 Ok(())
1266 }
1267
    /// Commits the transaction through the storage layer's durable commit path.
    ///
    /// `eventual` is passed straight through to `mem.commit()`.
    ///
    /// Sequence: process freed pages, flush and checksum the user tree, flush the
    /// system tree, rewrite the allocator state table (quick-repair only), store the
    /// freed pages, checksum the system and freed trees, commit, and finally free the
    /// pages collected in `post_commit_frees`.
    pub(crate) fn durable_commit(&mut self, eventual: bool) -> Result {
        // Freed pages are processed up to the transaction after the oldest live read
        // transaction, or up to this transaction if there are no live readers.
        let free_until_transaction = self
            .transaction_tracker
            .oldest_live_read_transaction()
            .map_or(self.transaction_id, |x| x.next());
        self.process_freed_pages(free_until_transaction)?;

        let user_root = self
            .tables
            .lock()
            .unwrap()
            .table_tree
            .flush_table_root_updates()?
            .finalize_dirty_checksums()?;

        // The system namespace stays locked until the end of the function.
        let mut system_tables = self.system_tables.lock().unwrap();
        let system_tree = system_tables.table_tree.flush_table_root_updates()?;
        // Any previously saved allocator state is removed on every durable commit.
        system_tree
            .delete_table(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal)
            .map_err(|e| e.into_storage_error_or_corrupted("Unexpected TableError"))?;

        if self.quick_repair {
            system_tree.create_table_and_flush_table_root(
                ALLOCATOR_STATE_TABLE_NAME,
                |tree: &mut AllocatorStateTree| {
                    // Carried across retries so successive calls continue the numbering.
                    let mut pagination_counter = 0;

                    loop {
                        let num_regions = self
                            .mem
                            .reserve_allocator_state(tree, self.transaction_id)?;

                        // Freed pages always go into the freed tree in this mode.
                        self.store_freed_pages(&mut pagination_counter, true)?;

                        if self.mem.try_save_allocator_state(tree, num_regions)? {
                            return Ok(());
                        }

                        // Saving did not succeed: empty the table, then retry.
                        while let Some(guards) = tree.last()? {
                            let key = guards.0.value();
                            drop(guards);
                            tree.remove(&key)?;
                        }
                    }
                },
            )?;
        } else {
            // The second argument of `store_freed_pages` is true only while some
            // savepoint exists.
            // NOTE(review): presumably because a savepoint may still reference pages
            // of the freed tree — confirm against `store_freed_pages`.
            let savepoint_exists = self.transaction_tracker.any_savepoint_exists();
            self.store_freed_pages(&mut 0, savepoint_exists)?;
        }

        let system_root = system_tree.finalize_dirty_checksums()?;

        let freed_root = self.freed_tree.lock().unwrap().finalize_dirty_checksums()?;

        self.mem.commit(
            user_root,
            system_root,
            freed_root,
            self.transaction_id,
            eventual,
            self.two_phase_commit,
        )?;

        // Earlier non-durable commits no longer need to be tracked as pending.
        self.transaction_tracker.clear_pending_non_durable_commits();

        // Pages collected for freeing after the commit are released only now.
        for page in self.post_commit_frees.lock().unwrap().drain(..) {
            self.mem.free(page);
        }

        Ok(())
    }
1355
1356 pub(crate) fn non_durable_commit(&mut self) -> Result {
1358 let user_root = self
1359 .tables
1360 .lock()
1361 .unwrap()
1362 .table_tree
1363 .flush_table_root_updates()?
1364 .finalize_dirty_checksums()?;
1365
1366 let system_root = self
1367 .system_tables
1368 .lock()
1369 .unwrap()
1370 .table_tree
1371 .flush_table_root_updates()?
1372 .finalize_dirty_checksums()?;
1373
1374 self.store_freed_pages(&mut 0, true)?;
1377
1378 let freed_root = self.freed_tree.lock().unwrap().finalize_dirty_checksums()?;
1380
1381 self.mem
1382 .non_durable_commit(user_root, system_root, freed_root, self.transaction_id)?;
1383 self.transaction_tracker
1386 .register_non_durable_commit(self.transaction_id);
1387 Ok(())
1388 }
1389
1390 pub(crate) fn compact_pages(&mut self) -> Result<bool> {
1393 let mut progress = false;
1394 if self.mem.relocate_region_tracker()? {
1396 progress = true;
1397 }
1398
1399 let mut highest_pages = BTreeMap::new();
1401 let mut tables = self.tables.lock().unwrap();
1402 let table_tree = &mut tables.table_tree;
1403 table_tree.highest_index_pages(MAX_PAGES_PER_COMPACTION, &mut highest_pages)?;
1404 let mut system_tables = self.system_tables.lock().unwrap();
1405 let system_table_tree = &mut system_tables.table_tree;
1406 system_table_tree.highest_index_pages(MAX_PAGES_PER_COMPACTION, &mut highest_pages)?;
1407
1408 let mut relocation_map = HashMap::new();
1410 for path in highest_pages.into_values().rev() {
1411 if relocation_map.contains_key(&path.page_number()) {
1412 continue;
1413 }
1414 let old_page = self.mem.get_page(path.page_number())?;
1415 let mut new_page = self.mem.allocate_lowest(old_page.memory().len())?;
1416 let new_page_number = new_page.get_page_number();
1417 new_page.memory_mut()[0] = old_page.memory()[0];
1420 drop(new_page);
1421 if new_page_number < path.page_number() {
1423 relocation_map.insert(path.page_number(), new_page_number);
1424 for parent in path.parents() {
1425 if relocation_map.contains_key(parent) {
1426 continue;
1427 }
1428 let old_parent = self.mem.get_page(*parent)?;
1429 let mut new_page = self.mem.allocate_lowest(old_parent.memory().len())?;
1430 let new_page_number = new_page.get_page_number();
1431 new_page.memory_mut()[0] = old_parent.memory()[0];
1434 drop(new_page);
1435 relocation_map.insert(*parent, new_page_number);
1436 }
1437 } else {
1438 self.mem.free(new_page_number);
1439 break;
1440 }
1441 }
1442
1443 if !relocation_map.is_empty() {
1444 progress = true;
1445 }
1446
1447 table_tree.relocate_tables(&relocation_map)?;
1448 system_table_tree.relocate_tables(&relocation_map)?;
1449
1450 Ok(progress)
1451 }
1452
1453 fn process_freed_pages(&mut self, free_until: TransactionId) -> Result {
1456 assert_eq!(PageNumber::serialized_size(), 8);
1458 let lookup_key = FreedTableKey {
1459 transaction_id: free_until.raw_id(),
1460 pagination_id: 0,
1461 };
1462
1463 let mut to_remove = vec![];
1464 let mut freed_tree = self.freed_tree.lock().unwrap();
1465 for entry in freed_tree.range(&(..lookup_key))? {
1466 let entry = entry?;
1467 to_remove.push(entry.key());
1468 let value = entry.value();
1469 for i in 0..value.len() {
1470 self.mem.free(value.get(i));
1471 }
1472 }
1473
1474 for key in to_remove {
1476 freed_tree.remove(&key)?;
1477 }
1478
1479 Ok(())
1480 }
1481
1482 fn store_freed_pages(
1483 &self,
1484 pagination_counter: &mut u64,
1485 include_post_commit_free: bool,
1486 ) -> Result {
1487 assert_eq!(PageNumber::serialized_size(), 8); let mut freed_tree = self.freed_tree.lock().unwrap();
1490 if include_post_commit_free {
1491 self.freed_pages
1494 .lock()
1495 .unwrap()
1496 .extend(self.post_commit_frees.lock().unwrap().drain(..));
1497 }
1498 while !self.freed_pages.lock().unwrap().is_empty() {
1499 let chunk_size = 100;
1500 let buffer_size = FreedPageList::required_bytes(chunk_size);
1501 let key = FreedTableKey {
1502 transaction_id: self.transaction_id.raw_id(),
1503 pagination_id: *pagination_counter,
1504 };
1505 let mut access_guard =
1506 freed_tree.insert_reserve(&key, buffer_size.try_into().unwrap())?;
1507
1508 let mut freed_pages = self.freed_pages.lock().unwrap();
1509 let len = freed_pages.len();
1510 access_guard.as_mut().clear();
1511 for page in freed_pages.drain(len - min(len, chunk_size)..) {
1512 access_guard.as_mut().push_back(page);
1513 }
1514 drop(access_guard);
1515
1516 *pagination_counter += 1;
1517
1518 if include_post_commit_free {
1519 freed_pages.extend(self.post_commit_frees.lock().unwrap().drain(..));
1522 }
1523 }
1524
1525 Ok(())
1526 }
1527
1528 pub fn stats(&self) -> Result<DatabaseStats> {
1530 let tables = self.tables.lock().unwrap();
1531 let table_tree = &tables.table_tree;
1532 let data_tree_stats = table_tree.stats()?;
1533
1534 let system_tables = self.system_tables.lock().unwrap();
1535 let system_table_tree = &system_tables.table_tree;
1536 let system_tree_stats = system_table_tree.stats()?;
1537
1538 let freed_tree_stats = self.freed_tree.lock().unwrap().stats()?;
1539
1540 let total_metadata_bytes = data_tree_stats.metadata_bytes()
1541 + system_tree_stats.metadata_bytes
1542 + system_tree_stats.stored_leaf_bytes
1543 + freed_tree_stats.metadata_bytes
1544 + freed_tree_stats.stored_leaf_bytes;
1545 let total_fragmented = data_tree_stats.fragmented_bytes()
1546 + system_tree_stats.fragmented_bytes
1547 + freed_tree_stats.fragmented_bytes
1548 + self.mem.count_free_pages()? * (self.mem.get_page_size() as u64);
1549
1550 Ok(DatabaseStats {
1551 tree_height: data_tree_stats.tree_height(),
1552 allocated_pages: self.mem.count_allocated_pages()?,
1553 leaf_pages: data_tree_stats.leaf_pages(),
1554 branch_pages: data_tree_stats.branch_pages(),
1555 stored_leaf_bytes: data_tree_stats.stored_bytes(),
1556 metadata_bytes: total_metadata_bytes,
1557 fragmented_bytes: total_fragmented,
1558 page_size: self.mem.get_page_size(),
1559 })
1560 }
1561
1562 #[cfg(any(test, fuzzing))]
1563 pub fn num_region_tracker_pages(&self) -> u64 {
1564 1 << self.mem.tracker_page().page_order
1565 }
1566
1567 #[allow(dead_code)]
1568 pub(crate) fn print_debug(&self) -> Result {
1569 let mut tables = self.tables.lock().unwrap();
1571 if let Some(page) = tables
1572 .table_tree
1573 .flush_table_root_updates()
1574 .unwrap()
1575 .finalize_dirty_checksums()
1576 .unwrap()
1577 {
1578 eprintln!("Master tree:");
1579 let master_tree: Btree<&str, InternalTableDefinition> = Btree::new(
1580 Some(page),
1581 PageHint::None,
1582 self.transaction_guard.clone(),
1583 self.mem.clone(),
1584 )?;
1585 master_tree.print_debug(true)?;
1586 }
1587
1588 Ok(())
1589 }
1590}
1591
1592impl Drop for WriteTransaction {
1593 fn drop(&mut self) {
1594 if !self.completed && !thread::panicking() && !self.mem.storage_failure() {
1595 #[allow(unused_variables)]
1596 if let Err(error) = self.abort_inner() {
1597 #[cfg(feature = "logging")]
1598 warn!("Failure automatically aborting transaction: {}", error);
1599 }
1600 }
1601 }
1602}
1603
1604pub struct ReadTransaction {
1608 mem: Arc<TransactionalMemory>,
1609 tree: TableTree,
1610}
1611
1612impl ReadTransaction {
1613 pub(crate) fn new(
1614 mem: Arc<TransactionalMemory>,
1615 guard: TransactionGuard,
1616 ) -> Result<Self, TransactionError> {
1617 let root_page = mem.get_data_root();
1618 let guard = Arc::new(guard);
1619 Ok(Self {
1620 mem: mem.clone(),
1621 tree: TableTree::new(root_page, PageHint::Clean, guard, mem)
1622 .map_err(TransactionError::Storage)?,
1623 })
1624 }
1625
1626 pub fn open_table<K: Key + 'static, V: Value + 'static>(
1628 &self,
1629 definition: TableDefinition<K, V>,
1630 ) -> Result<ReadOnlyTable<K, V>, TableError> {
1631 let header = self
1632 .tree
1633 .get_table::<K, V>(definition.name(), TableType::Normal)?
1634 .ok_or_else(|| TableError::TableDoesNotExist(definition.name().to_string()))?;
1635
1636 match header {
1637 InternalTableDefinition::Normal { table_root, .. } => Ok(ReadOnlyTable::new(
1638 definition.name().to_string(),
1639 table_root,
1640 PageHint::Clean,
1641 self.tree.transaction_guard().clone(),
1642 self.mem.clone(),
1643 )?),
1644 InternalTableDefinition::Multimap { .. } => unreachable!(),
1645 }
1646 }
1647
1648 pub fn open_untyped_table(
1650 &self,
1651 handle: impl TableHandle,
1652 ) -> Result<ReadOnlyUntypedTable, TableError> {
1653 let header = self
1654 .tree
1655 .get_table_untyped(handle.name(), TableType::Normal)?
1656 .ok_or_else(|| TableError::TableDoesNotExist(handle.name().to_string()))?;
1657
1658 match header {
1659 InternalTableDefinition::Normal {
1660 table_root,
1661 fixed_key_size,
1662 fixed_value_size,
1663 ..
1664 } => Ok(ReadOnlyUntypedTable::new(
1665 table_root,
1666 fixed_key_size,
1667 fixed_value_size,
1668 self.mem.clone(),
1669 )),
1670 InternalTableDefinition::Multimap { .. } => unreachable!(),
1671 }
1672 }
1673
1674 pub fn open_multimap_table<K: Key + 'static, V: Key + 'static>(
1676 &self,
1677 definition: MultimapTableDefinition<K, V>,
1678 ) -> Result<ReadOnlyMultimapTable<K, V>, TableError> {
1679 let header = self
1680 .tree
1681 .get_table::<K, V>(definition.name(), TableType::Multimap)?
1682 .ok_or_else(|| TableError::TableDoesNotExist(definition.name().to_string()))?;
1683
1684 match header {
1685 InternalTableDefinition::Normal { .. } => unreachable!(),
1686 InternalTableDefinition::Multimap {
1687 table_root,
1688 table_length,
1689 ..
1690 } => Ok(ReadOnlyMultimapTable::new(
1691 table_root,
1692 table_length,
1693 PageHint::Clean,
1694 self.tree.transaction_guard().clone(),
1695 self.mem.clone(),
1696 )?),
1697 }
1698 }
1699
1700 pub fn open_untyped_multimap_table(
1702 &self,
1703 handle: impl MultimapTableHandle,
1704 ) -> Result<ReadOnlyUntypedMultimapTable, TableError> {
1705 let header = self
1706 .tree
1707 .get_table_untyped(handle.name(), TableType::Multimap)?
1708 .ok_or_else(|| TableError::TableDoesNotExist(handle.name().to_string()))?;
1709
1710 match header {
1711 InternalTableDefinition::Normal { .. } => unreachable!(),
1712 InternalTableDefinition::Multimap {
1713 table_root,
1714 table_length,
1715 fixed_key_size,
1716 fixed_value_size,
1717 ..
1718 } => Ok(ReadOnlyUntypedMultimapTable::new(
1719 table_root,
1720 table_length,
1721 fixed_key_size,
1722 fixed_value_size,
1723 self.mem.clone(),
1724 )),
1725 }
1726 }
1727
1728 pub fn list_tables(&self) -> Result<impl Iterator<Item = UntypedTableHandle>> {
1730 self.tree
1731 .list_tables(TableType::Normal)
1732 .map(|x| x.into_iter().map(UntypedTableHandle::new))
1733 }
1734
1735 pub fn list_multimap_tables(&self) -> Result<impl Iterator<Item = UntypedMultimapTableHandle>> {
1737 self.tree
1738 .list_tables(TableType::Multimap)
1739 .map(|x| x.into_iter().map(UntypedMultimapTableHandle::new))
1740 }
1741
1742 pub fn close(self) -> Result<(), TransactionError> {
1750 if Arc::strong_count(self.tree.transaction_guard()) > 1 {
1751 return Err(TransactionError::ReadTransactionStillInUse(self));
1752 }
1753 Ok(())
1755 }
1756}
1757
1758impl Debug for ReadTransaction {
1759 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1760 f.write_str("ReadTransaction")
1761 }
1762}
1763
#[cfg(test)]
mod test {
    use crate::{Database, TableDefinition};

    const X: TableDefinition<&str, &str> = TableDefinition::new("x");

    // A write transaction started after closing and reopening the database file must get a
    // larger transaction id than the last one committed before the close.
    #[test]
    fn transaction_id_persistence() {
        let tmpfile = crate::create_tempfile();

        // First session: commit one insert and remember the id it was committed under.
        // The database handle is dropped at the end of this block.
        let first_txn_id = {
            let db = Database::create(tmpfile.path()).unwrap();
            let write_txn = db.begin_write().unwrap();
            {
                let mut table = write_txn.open_table(X).unwrap();
                table.insert("hello", "world").unwrap();
            }
            let committed_id = write_txn.transaction_id;
            write_txn.commit().unwrap();
            committed_id
        };

        // Second session on the same file.
        let reopened = Database::create(tmpfile.path()).unwrap();
        let next_txn = reopened.begin_write().unwrap();
        assert!(next_txn.transaction_id > first_txn_id);
    }
}