redb/transactions.rs

1use crate::db::TransactionGuard;
2use crate::error::CommitError;
3use crate::multimap_table::ReadOnlyUntypedMultimapTable;
4use crate::sealed::Sealed;
5use crate::table::ReadOnlyUntypedTable;
6use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker};
7use crate::tree_store::{
8    Btree, BtreeHeader, BtreeMut, InternalTableDefinition, MAX_PAIR_LENGTH, MAX_VALUE_LENGTH, Page,
9    PageHint, PageListMut, PageNumber, PageTrackerPolicy, SerializedSavepoint, ShrinkPolicy,
10    TableTree, TableTreeMut, TableType, TransactionalMemory,
11};
12use crate::types::{Key, Value};
13use crate::{
14    AccessGuard, AccessGuardMutInPlace, ExtractIf, MultimapTable, MultimapTableDefinition,
15    MultimapTableHandle, MutInPlaceValue, Range, ReadOnlyMultimapTable, ReadOnlyTable, Result,
16    Savepoint, SavepointError, SetDurabilityError, StorageError, Table, TableDefinition,
17    TableError, TableHandle, TransactionError, TypeName, UntypedMultimapTableHandle,
18    UntypedTableHandle,
19};
20#[cfg(feature = "logging")]
21use log::{debug, warn};
22use std::borrow::Borrow;
23use std::cmp::min;
24use std::collections::{BTreeMap, HashMap, HashSet};
25use std::fmt::{Debug, Display, Formatter};
26use std::marker::PhantomData;
27use std::mem::size_of;
28use std::ops::RangeBounds;
29use std::sync::atomic::{AtomicBool, Ordering};
30use std::sync::{Arc, Mutex};
31use std::{panic, thread};
32
33const MAX_PAGES_PER_COMPACTION: usize = 1_000_000;
34const NEXT_SAVEPOINT_TABLE: SystemTableDefinition<(), SavepointId> =
35    SystemTableDefinition::new("next_savepoint_id");
36pub(crate) const SAVEPOINT_TABLE: SystemTableDefinition<SavepointId, SerializedSavepoint> =
37    SystemTableDefinition::new("persistent_savepoints");
38// Pages that were allocated in the data tree by a given transaction. Only updated when a savepoint
39// exists
40pub(crate) const DATA_ALLOCATED_TABLE: SystemTableDefinition<
41    TransactionIdWithPagination,
42    PageList,
43> = SystemTableDefinition::new("data_pages_allocated");
44// Pages in the data tree that are in the pending free state: i.e., they are unreachable from the
45// root as of the given transaction.
46pub(crate) const DATA_FREED_TABLE: SystemTableDefinition<TransactionIdWithPagination, PageList> =
47    SystemTableDefinition::new("data_pages_unreachable");
48// Pages in the system tree that are in the pending free state: i.e., they are unreachable from the
49// root as of the given transaction.
50pub(crate) const SYSTEM_FREED_TABLE: SystemTableDefinition<TransactionIdWithPagination, PageList> =
51    SystemTableDefinition::new("system_pages_unreachable");
52// The allocator state table is stored in the system table tree, but it's accessed using
53// raw btree operations rather than open_system_table(), so there's no SystemTableDefinition
54pub(crate) const ALLOCATOR_STATE_TABLE_NAME: &str = "allocator_state";
55pub(crate) type AllocatorStateTree = Btree<AllocatorStateKey, &'static [u8]>;
56pub(crate) type AllocatorStateTreeMut<'a> = BtreeMut<'a, AllocatorStateKey, &'static [u8]>;
57pub(crate) type SystemFreedTree<'a> = BtreeMut<'a, TransactionIdWithPagination, PageList<'static>>;
58
59// Format:
60// 2 bytes: length
61// length * size_of(PageNumber): array of page numbers
62#[derive(Debug)]
63pub(crate) struct PageList<'a> {
64    data: &'a [u8],
65}
66
67impl PageList<'_> {
68    fn required_bytes(len: usize) -> usize {
69        2 + PageNumber::serialized_size() * len
70    }
71
72    pub(crate) fn len(&self) -> usize {
73        u16::from_le_bytes(self.data[..size_of::<u16>()].try_into().unwrap()).into()
74    }
75
76    pub(crate) fn get(&self, index: usize) -> PageNumber {
77        let start = size_of::<u16>() + PageNumber::serialized_size() * index;
78        PageNumber::from_le_bytes(
79            self.data[start..(start + PageNumber::serialized_size())]
80                .try_into()
81                .unwrap(),
82        )
83    }
84}
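// Illustrative sketch of the layout described above: for a list holding two pages,
//     data[0..2]             = 2u16.to_le_bytes()          (number of entries)
//     data[2..2 + N]         = first page number           (N = PageNumber::serialized_size())
//     data[2 + N..2 + 2 * N] = second page number
// so `PageList::from_bytes(&data).len() == 2`, and `get(0)` / `get(1)` decode the two pages.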
85
86impl Value for PageList<'_> {
87    type SelfType<'a>
88        = PageList<'a>
89    where
90        Self: 'a;
91    type AsBytes<'a>
92        = &'a [u8]
93    where
94        Self: 'a;
95
96    fn fixed_width() -> Option<usize> {
97        None
98    }
99
100    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
101    where
102        Self: 'a,
103    {
104        PageList { data }
105    }
106
107    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> &'b [u8]
108    where
109        Self: 'b,
110    {
111        value.data
112    }
113
114    fn type_name() -> TypeName {
115        TypeName::internal("redb::PageList")
116    }
117}
118
119impl MutInPlaceValue for PageList<'_> {
120    type BaseRefType = PageListMut;
121
122    fn initialize(data: &mut [u8]) {
123        assert!(data.len() >= 8);
124        // Set the length to zero
125        data[..8].fill(0);
126    }
127
128    fn from_bytes_mut(data: &mut [u8]) -> &mut Self::BaseRefType {
129        unsafe { &mut *(std::ptr::from_mut::<[u8]>(data) as *mut PageListMut) }
130    }
131}
132
133#[derive(Debug)]
134pub(crate) struct TransactionIdWithPagination {
135    pub(crate) transaction_id: u64,
136    pub(crate) pagination_id: u64,
137}
138
139impl Value for TransactionIdWithPagination {
140    type SelfType<'a>
141        = TransactionIdWithPagination
142    where
143        Self: 'a;
144    type AsBytes<'a>
145        = [u8; 2 * size_of::<u64>()]
146    where
147        Self: 'a;
148
149    fn fixed_width() -> Option<usize> {
150        Some(2 * size_of::<u64>())
151    }
152
153    fn from_bytes<'a>(data: &'a [u8]) -> Self
154    where
155        Self: 'a,
156    {
157        let transaction_id = u64::from_le_bytes(data[..size_of::<u64>()].try_into().unwrap());
158        let pagination_id = u64::from_le_bytes(data[size_of::<u64>()..].try_into().unwrap());
159        Self {
160            transaction_id,
161            pagination_id,
162        }
163    }
164
165    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> [u8; 2 * size_of::<u64>()]
166    where
167        Self: 'b,
168    {
169        let mut result = [0u8; 2 * size_of::<u64>()];
170        result[..size_of::<u64>()].copy_from_slice(&value.transaction_id.to_le_bytes());
171        result[size_of::<u64>()..].copy_from_slice(&value.pagination_id.to_le_bytes());
172        result
173    }
174
175    fn type_name() -> TypeName {
176        TypeName::internal("redb::TransactionIdWithPagination")
177    }
178}
179
180impl Key for TransactionIdWithPagination {
181    fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering {
182        let value1 = Self::from_bytes(data1);
183        let value2 = Self::from_bytes(data2);
184
185        match value1.transaction_id.cmp(&value2.transaction_id) {
186            std::cmp::Ordering::Greater => std::cmp::Ordering::Greater,
187            std::cmp::Ordering::Equal => value1.pagination_id.cmp(&value2.pagination_id),
188            std::cmp::Ordering::Less => std::cmp::Ordering::Less,
189        }
190    }
191}
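// Illustrative ordering: keys compare by transaction id first, then by pagination id, e.g.
//     { transaction_id: 5, pagination_id: 0 } < { transaction_id: 5, pagination_id: 1 }
//                                             < { transaction_id: 6, pagination_id: 0 }
// which is what allows scanning "all entries at or after transaction N" with a range that
// starts at { transaction_id: N, pagination_id: 0 }.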
192
193#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)]
194pub(crate) enum AllocatorStateKey {
195    Deprecated,
196    Region(u32),
197    RegionTracker,
198    TransactionId,
199}
200
201impl Value for AllocatorStateKey {
202    type SelfType<'a> = Self;
203    type AsBytes<'a> = [u8; 1 + size_of::<u32>()];
204
205    fn fixed_width() -> Option<usize> {
206        Some(1 + size_of::<u32>())
207    }
208
209    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
210    where
211        Self: 'a,
212    {
213        match data[0] {
214            // 0, 1, 2 were used in redb 2.x and have a different format
215            0..=2 => Self::Deprecated,
216            3 => Self::Region(u32::from_le_bytes(data[1..].try_into().unwrap())),
217            4 => Self::RegionTracker,
218            5 => Self::TransactionId,
219            _ => unreachable!(),
220        }
221    }
222
223    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
224    where
225        Self: 'a,
226        Self: 'b,
227    {
228        let mut result = Self::AsBytes::default();
229        match value {
230            Self::Region(region) => {
231                result[0] = 3;
232                result[1..].copy_from_slice(&u32::to_le_bytes(*region));
233            }
234            Self::RegionTracker => {
235                result[0] = 4;
236            }
237            Self::TransactionId => {
238                result[0] = 5;
239            }
240            AllocatorStateKey::Deprecated => {
241                result[0] = 0;
242            }
243        }
244
245        result
246    }
247
248    fn type_name() -> TypeName {
249        TypeName::internal("redb::AllocatorStateKey")
250    }
251}
252
253impl Key for AllocatorStateKey {
254    fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering {
255        Self::from_bytes(data1).cmp(&Self::from_bytes(data2))
256    }
257}
258
259pub struct SystemTableDefinition<'a, K: Key + 'static, V: Value + 'static> {
260    name: &'a str,
261    _key_type: PhantomData<K>,
262    _value_type: PhantomData<V>,
263}
264
265impl<'a, K: Key + 'static, V: Value + 'static> SystemTableDefinition<'a, K, V> {
266    pub const fn new(name: &'a str) -> Self {
267        assert!(!name.is_empty());
268        Self {
269            name,
270            _key_type: PhantomData,
271            _value_type: PhantomData,
272        }
273    }
274}
275
276impl<K: Key + 'static, V: Value + 'static> TableHandle for SystemTableDefinition<'_, K, V> {
277    fn name(&self) -> &str {
278        self.name
279    }
280}
281
282impl<K: Key, V: Value> Sealed for SystemTableDefinition<'_, K, V> {}
283
284impl<K: Key + 'static, V: Value + 'static> Clone for SystemTableDefinition<'_, K, V> {
285    fn clone(&self) -> Self {
286        *self
287    }
288}
289
290impl<K: Key + 'static, V: Value + 'static> Copy for SystemTableDefinition<'_, K, V> {}
291
292impl<K: Key + 'static, V: Value + 'static> Display for SystemTableDefinition<'_, K, V> {
293    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
294        write!(
295            f,
296            "{}<{}, {}>",
297            self.name,
298            K::type_name().name(),
299            V::type_name().name()
300        )
301    }
302}
303
304/// Informational storage stats about the database
305#[derive(Debug)]
306pub struct DatabaseStats {
307    pub(crate) tree_height: u32,
308    pub(crate) allocated_pages: u64,
309    pub(crate) leaf_pages: u64,
310    pub(crate) branch_pages: u64,
311    pub(crate) stored_leaf_bytes: u64,
312    pub(crate) metadata_bytes: u64,
313    pub(crate) fragmented_bytes: u64,
314    pub(crate) page_size: usize,
315}
316
317impl DatabaseStats {
318    /// Maximum traversal distance to reach the deepest (key, value) pair, across all tables
319    pub fn tree_height(&self) -> u32 {
320        self.tree_height
321    }
322
323    /// Number of pages allocated
324    pub fn allocated_pages(&self) -> u64 {
325        self.allocated_pages
326    }
327
328    /// Number of leaf pages that store user data
329    pub fn leaf_pages(&self) -> u64 {
330        self.leaf_pages
331    }
332
333    /// Number of branch pages in btrees that store user data
334    pub fn branch_pages(&self) -> u64 {
335        self.branch_pages
336    }
337
338    /// Number of bytes consumed by keys and values that have been inserted.
339    /// Does not include indexing overhead
340    pub fn stored_bytes(&self) -> u64 {
341        self.stored_leaf_bytes
342    }
343
344    /// Number of bytes consumed by keys in internal branch pages, plus other metadata
345    pub fn metadata_bytes(&self) -> u64 {
346        self.metadata_bytes
347    }
348
349    /// Number of bytes consumed by fragmentation, both in data pages and internal metadata tables
350    pub fn fragmented_bytes(&self) -> u64 {
351        self.fragmented_bytes
352    }
353
354    /// Number of bytes per page
355    pub fn page_size(&self) -> usize {
356        self.page_size
357    }
358}
359
360#[derive(Copy, Clone, Debug)]
361#[non_exhaustive]
362pub enum Durability {
363    /// Commits with this durability level will not be persisted to disk unless followed by a
364    /// commit with [`Durability::Immediate`].
365    None,
366    /// Commits with this durability level are guaranteed to be persistent as soon as
367    /// [`WriteTransaction::commit`] returns.
368    Immediate,
369}
370
371// These are the actual durability levels used internally. `Durability::None` and `Durability::Immediate`
372// map directly onto these; 2-phase commit is enabled separately via `set_two_phase_commit()`
373#[derive(Copy, Clone, Debug, PartialEq, Eq)]
374enum InternalDurability {
375    None,
376    Immediate,
377}
378
379// Like a Table but only one may be open at a time to avoid possible races
380pub struct SystemTable<'db, 's, K: Key + 'static, V: Value + 'static> {
381    name: String,
382    namespace: &'s mut SystemNamespace<'db>,
383    tree: BtreeMut<'s, K, V>,
384    transaction_guard: Arc<TransactionGuard>,
385}
386
387impl<'db, 's, K: Key + 'static, V: Value + 'static> SystemTable<'db, 's, K, V> {
388    fn new(
389        name: &str,
390        table_root: Option<BtreeHeader>,
391        freed_pages: Arc<Mutex<Vec<PageNumber>>>,
392        guard: Arc<TransactionGuard>,
393        mem: Arc<TransactionalMemory>,
394        namespace: &'s mut SystemNamespace<'db>,
395    ) -> SystemTable<'db, 's, K, V> {
396        // No need to track allocations in the system tree. Savepoint restoration only relies on
397        // freeing in the data tree
398        let ignore = Arc::new(Mutex::new(PageTrackerPolicy::Ignore));
399        SystemTable {
400            name: name.to_string(),
401            namespace,
402            tree: BtreeMut::new(table_root, guard.clone(), mem, freed_pages, ignore),
403            transaction_guard: guard,
404        }
405    }
406
407    fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<Option<AccessGuard<'_, V>>>
408    where
409        K: 'a,
410    {
411        self.tree.get(key.borrow())
412    }
413
414    fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<Range<'_, K, V>>
415    where
416        K: 'a,
417        KR: Borrow<K::SelfType<'a>> + 'a,
418    {
419        self.tree
420            .range(&range)
421            .map(|x| Range::new(x, self.transaction_guard.clone()))
422    }
423
424    pub fn extract_from_if<'a, KR, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool>(
425        &mut self,
426        range: impl RangeBounds<KR> + 'a,
427        predicate: F,
428    ) -> Result<ExtractIf<'_, K, V, F>>
429    where
430        KR: Borrow<K::SelfType<'a>> + 'a,
431    {
432        self.tree
433            .extract_from_if(&range, predicate)
434            .map(ExtractIf::new)
435    }
436
437    pub fn insert<'k, 'v>(
438        &mut self,
439        key: impl Borrow<K::SelfType<'k>>,
440        value: impl Borrow<V::SelfType<'v>>,
441    ) -> Result<Option<AccessGuard<'_, V>>> {
442        let value_len = V::as_bytes(value.borrow()).as_ref().len();
443        if value_len > MAX_VALUE_LENGTH {
444            return Err(StorageError::ValueTooLarge(value_len));
445        }
446        let key_len = K::as_bytes(key.borrow()).as_ref().len();
447        if key_len > MAX_VALUE_LENGTH {
448            return Err(StorageError::ValueTooLarge(key_len));
449        }
450        if value_len + key_len > MAX_PAIR_LENGTH {
451            return Err(StorageError::ValueTooLarge(value_len + key_len));
452        }
453        self.tree.insert(key.borrow(), value.borrow())
454    }
455
456    pub fn remove<'a>(
457        &mut self,
458        key: impl Borrow<K::SelfType<'a>>,
459    ) -> Result<Option<AccessGuard<'_, V>>>
460    where
461        K: 'a,
462    {
463        self.tree.remove(key.borrow())
464    }
465}
466
467impl<K: Key + 'static, V: MutInPlaceValue + 'static> SystemTable<'_, '_, K, V> {
468    pub fn insert_reserve<'a>(
469        &mut self,
470        key: impl Borrow<K::SelfType<'a>>,
471        value_length: usize,
472    ) -> Result<AccessGuardMutInPlace<'_, V>> {
473        if value_length > MAX_VALUE_LENGTH {
474            return Err(StorageError::ValueTooLarge(value_length));
475        }
476        let key_len = K::as_bytes(key.borrow()).as_ref().len();
477        if key_len > MAX_VALUE_LENGTH {
478            return Err(StorageError::ValueTooLarge(key_len));
479        }
480        if value_length + key_len > MAX_PAIR_LENGTH {
481            return Err(StorageError::ValueTooLarge(value_length + key_len));
482        }
483        self.tree.insert_reserve(key.borrow(), value_length)
484    }
485}
486
487impl<K: Key + 'static, V: Value + 'static> Drop for SystemTable<'_, '_, K, V> {
488    fn drop(&mut self) {
489        self.namespace.close_table(
490            &self.name,
491            &self.tree,
492            self.tree.get_root().map(|x| x.length).unwrap_or_default(),
493        );
494    }
495}
496
497struct SystemNamespace<'db> {
498    table_tree: TableTreeMut<'db>,
499    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
500    transaction_guard: Arc<TransactionGuard>,
501}
502
503impl<'db> SystemNamespace<'db> {
504    fn new(
505        root_page: Option<BtreeHeader>,
506        guard: Arc<TransactionGuard>,
507        mem: Arc<TransactionalMemory>,
508    ) -> Self {
509        // No need to track allocations in the system tree. Savepoint restoration only relies on
510        // freeing in the data tree
511        let ignore = Arc::new(Mutex::new(PageTrackerPolicy::Ignore));
512        let freed_pages = Arc::new(Mutex::new(vec![]));
513        Self {
514            table_tree: TableTreeMut::new(
515                root_page,
516                guard.clone(),
517                mem,
518                freed_pages.clone(),
519                ignore,
520            ),
521            freed_pages,
522            transaction_guard: guard.clone(),
523        }
524    }
525
526    fn system_freed_pages(&self) -> Arc<Mutex<Vec<PageNumber>>> {
527        self.freed_pages.clone()
528    }
529
530    fn open_system_table<'txn, 's, K: Key + 'static, V: Value + 'static>(
531        &'s mut self,
532        transaction: &'txn WriteTransaction,
533        definition: SystemTableDefinition<K, V>,
534    ) -> Result<SystemTable<'db, 's, K, V>> {
535        let (root, _) = self
536            .table_tree
537            .get_or_create_table::<K, V>(definition.name(), TableType::Normal)
538            .map_err(|e| {
539                e.into_storage_error_or_corrupted("Internal error. System table is corrupted")
540            })?;
541        transaction.dirty.store(true, Ordering::Release);
542
543        Ok(SystemTable::new(
544            definition.name(),
545            root,
546            self.freed_pages.clone(),
547            self.transaction_guard.clone(),
548            transaction.mem.clone(),
549            self,
550        ))
551    }
552
553    fn close_table<K: Key + 'static, V: Value + 'static>(
554        &mut self,
555        name: &str,
556        table: &BtreeMut<K, V>,
557        length: u64,
558    ) {
559        self.table_tree
560            .stage_update_table_root(name, table.get_root(), length);
561    }
562}
563
564struct TableNamespace<'db> {
565    open_tables: HashMap<String, &'static panic::Location<'static>>,
566    allocated_pages: Arc<Mutex<PageTrackerPolicy>>,
567    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
568    table_tree: TableTreeMut<'db>,
569}
570
571impl TableNamespace<'_> {
572    fn new(
573        root_page: Option<BtreeHeader>,
574        guard: Arc<TransactionGuard>,
575        mem: Arc<TransactionalMemory>,
576    ) -> Self {
577        let allocated = Arc::new(Mutex::new(PageTrackerPolicy::new_tracking()));
578        let freed_pages = Arc::new(Mutex::new(vec![]));
579        let table_tree = TableTreeMut::new(
580            root_page,
581            guard,
582            mem,
583            // Committed pages which are no longer reachable and will be queued for freeing
584            // These are separated from the system freed pages
585            freed_pages.clone(),
586            allocated.clone(),
587        );
588        Self {
589            open_tables: Default::default(),
590            table_tree,
591            freed_pages,
592            allocated_pages: allocated,
593        }
594    }
595
596    fn set_dirty(&mut self, transaction: &WriteTransaction) {
597        transaction.dirty.store(true, Ordering::Release);
598        if !transaction.transaction_tracker.any_savepoint_exists() {
599            // No savepoints exist, and we don't allow savepoints to be created in a dirty transaction
600            // so we can disable allocation tracking now
601            *self.allocated_pages.lock().unwrap() = PageTrackerPolicy::Ignore;
602        }
603    }
604
605    fn set_root(&mut self, root: Option<BtreeHeader>) {
606        assert!(self.open_tables.is_empty());
607        self.table_tree.set_root(root);
608    }
609
610    #[track_caller]
611    fn inner_open<K: Key + 'static, V: Value + 'static>(
612        &mut self,
613        name: &str,
614        table_type: TableType,
615    ) -> Result<(Option<BtreeHeader>, u64), TableError> {
616        if let Some(location) = self.open_tables.get(name) {
617            return Err(TableError::TableAlreadyOpen(name.to_string(), location));
618        }
619
620        let root = self
621            .table_tree
622            .get_or_create_table::<K, V>(name, table_type)?;
623        self.open_tables
624            .insert(name.to_string(), panic::Location::caller());
625
626        Ok(root)
627    }
628
629    #[track_caller]
630    pub fn open_multimap_table<'txn, K: Key + 'static, V: Key + 'static>(
631        &mut self,
632        transaction: &'txn WriteTransaction,
633        definition: MultimapTableDefinition<K, V>,
634    ) -> Result<MultimapTable<'txn, K, V>, TableError> {
635        #[cfg(feature = "logging")]
636        debug!("Opening multimap table: {definition}");
637        let (root, length) = self.inner_open::<K, V>(definition.name(), TableType::Multimap)?;
638        self.set_dirty(transaction);
639
640        Ok(MultimapTable::new(
641            definition.name(),
642            root,
643            length,
644            self.freed_pages.clone(),
645            self.allocated_pages.clone(),
646            transaction.mem.clone(),
647            transaction,
648        ))
649    }
650
651    #[track_caller]
652    pub fn open_table<'txn, K: Key + 'static, V: Value + 'static>(
653        &mut self,
654        transaction: &'txn WriteTransaction,
655        definition: TableDefinition<K, V>,
656    ) -> Result<Table<'txn, K, V>, TableError> {
657        #[cfg(feature = "logging")]
658        debug!("Opening table: {definition}");
659        let (root, _) = self.inner_open::<K, V>(definition.name(), TableType::Normal)?;
660        self.set_dirty(transaction);
661
662        Ok(Table::new(
663            definition.name(),
664            root,
665            self.freed_pages.clone(),
666            self.allocated_pages.clone(),
667            transaction.mem.clone(),
668            transaction,
669        ))
670    }
671
672    #[track_caller]
673    fn inner_rename(
674        &mut self,
675        name: &str,
676        new_name: &str,
677        table_type: TableType,
678    ) -> Result<(), TableError> {
679        if let Some(location) = self.open_tables.get(name) {
680            return Err(TableError::TableAlreadyOpen(name.to_string(), location));
681        }
682
683        self.table_tree.rename_table(name, new_name, table_type)
684    }
685
686    #[track_caller]
687    fn rename_table(
688        &mut self,
689        transaction: &WriteTransaction,
690        name: &str,
691        new_name: &str,
692    ) -> Result<(), TableError> {
693        #[cfg(feature = "logging")]
694        debug!("Renaming table: {name} to {new_name}");
695        self.set_dirty(transaction);
696        self.inner_rename(name, new_name, TableType::Normal)
697    }
698
699    #[track_caller]
700    fn rename_multimap_table(
701        &mut self,
702        transaction: &WriteTransaction,
703        name: &str,
704        new_name: &str,
705    ) -> Result<(), TableError> {
706        #[cfg(feature = "logging")]
707        debug!("Renaming multimap table: {name} to {new_name}");
708        self.set_dirty(transaction);
709        self.inner_rename(name, new_name, TableType::Multimap)
710    }
711
712    #[track_caller]
713    fn inner_delete(&mut self, name: &str, table_type: TableType) -> Result<bool, TableError> {
714        if let Some(location) = self.open_tables.get(name) {
715            return Err(TableError::TableAlreadyOpen(name.to_string(), location));
716        }
717
718        self.table_tree.delete_table(name, table_type)
719    }
720
721    #[track_caller]
722    fn delete_table(
723        &mut self,
724        transaction: &WriteTransaction,
725        name: &str,
726    ) -> Result<bool, TableError> {
727        #[cfg(feature = "logging")]
728        debug!("Deleting table: {name}");
729        self.set_dirty(transaction);
730        self.inner_delete(name, TableType::Normal)
731    }
732
733    #[track_caller]
734    fn delete_multimap_table(
735        &mut self,
736        transaction: &WriteTransaction,
737        name: &str,
738    ) -> Result<bool, TableError> {
739        #[cfg(feature = "logging")]
740        debug!("Deleting multimap table: {name}");
741        self.set_dirty(transaction);
742        self.inner_delete(name, TableType::Multimap)
743    }
744
745    pub(crate) fn close_table<K: Key + 'static, V: Value + 'static>(
746        &mut self,
747        name: &str,
748        table: &BtreeMut<K, V>,
749        length: u64,
750    ) {
751        self.open_tables.remove(name).unwrap();
752        self.table_tree
753            .stage_update_table_root(name, table.get_root(), length);
754    }
755}
756
757/// A read/write transaction
758///
759/// Only a single [`WriteTransaction`] may exist at a time
760pub struct WriteTransaction {
761    transaction_tracker: Arc<TransactionTracker>,
762    mem: Arc<TransactionalMemory>,
763    transaction_guard: Arc<TransactionGuard>,
764    transaction_id: TransactionId,
765    tables: Mutex<TableNamespace<'static>>,
766    system_tables: Mutex<SystemNamespace<'static>>,
767    completed: bool,
768    dirty: AtomicBool,
769    durability: InternalDurability,
770    two_phase_commit: bool,
771    shrink_policy: ShrinkPolicy,
772    quick_repair: bool,
773    // Persistent savepoints created during this transaction
774    created_persistent_savepoints: Mutex<HashSet<SavepointId>>,
775    deleted_persistent_savepoints: Mutex<Vec<(SavepointId, TransactionId)>>,
776}
777
778impl WriteTransaction {
779    pub(crate) fn new(
780        guard: TransactionGuard,
781        transaction_tracker: Arc<TransactionTracker>,
782        mem: Arc<TransactionalMemory>,
783    ) -> Result<Self> {
784        let transaction_id = guard.id();
785        let guard = Arc::new(guard);
786
787        let root_page = mem.get_data_root();
788        let system_page = mem.get_system_root();
789
790        let tables = TableNamespace::new(root_page, guard.clone(), mem.clone());
791        let system_tables = SystemNamespace::new(system_page, guard.clone(), mem.clone());
792
793        Ok(Self {
794            transaction_tracker,
795            mem: mem.clone(),
796            transaction_guard: guard.clone(),
797            transaction_id,
798            tables: Mutex::new(tables),
799            system_tables: Mutex::new(system_tables),
800            completed: false,
801            dirty: AtomicBool::new(false),
802            durability: InternalDurability::Immediate,
803            two_phase_commit: false,
804            quick_repair: false,
805            shrink_policy: ShrinkPolicy::Default,
806            created_persistent_savepoints: Mutex::new(Default::default()),
807            deleted_persistent_savepoints: Mutex::new(vec![]),
808        })
809    }
810
811    pub(crate) fn set_shrink_policy(&mut self, shrink_policy: ShrinkPolicy) {
812        self.shrink_policy = shrink_policy;
813    }
814
815    pub(crate) fn pending_free_pages(&self) -> Result<bool> {
816        let mut system_tables = self.system_tables.lock().unwrap();
817        if system_tables
818            .open_system_table(self, DATA_FREED_TABLE)?
819            .tree
820            .get_root()
821            .is_some()
822        {
823            return Ok(true);
824        }
825        if system_tables
826            .open_system_table(self, SYSTEM_FREED_TABLE)?
827            .tree
828            .get_root()
829            .is_some()
830        {
831            return Ok(true);
832        }
833
834        Ok(false)
835    }
836
837    #[cfg(debug_assertions)]
838    pub fn print_allocated_page_debug(&self) {
839        let mut all_allocated: HashSet<PageNumber> =
840            HashSet::from_iter(self.mem.all_allocated_pages());
841
842        self.mem.debug_check_allocator_consistency();
843
844        let mut table_pages = vec![];
845        self.tables
846            .lock()
847            .unwrap()
848            .table_tree
849            .visit_all_pages(|path| {
850                table_pages.push(path.page_number());
851                Ok(())
852            })
853            .unwrap();
854        println!("Tables");
855        for p in table_pages {
856            assert!(all_allocated.remove(&p));
857            println!("{p:?}");
858        }
859
860        let mut system_table_pages = vec![];
861        self.system_tables
862            .lock()
863            .unwrap()
864            .table_tree
865            .visit_all_pages(|path| {
866                system_table_pages.push(path.page_number());
867                Ok(())
868            })
869            .unwrap();
870        println!("System tables");
871        for p in system_table_pages {
872            assert!(all_allocated.remove(&p));
873            println!("{p:?}");
874        }
875
876        {
877            println!("Pending free (in data freed table)");
878            let mut system_tables = self.system_tables.lock().unwrap();
879            let data_freed = system_tables
880                .open_system_table(self, DATA_FREED_TABLE)
881                .unwrap();
882            for entry in data_freed.range::<TransactionIdWithPagination>(..).unwrap() {
883                let (_, entry) = entry.unwrap();
884                let value = entry.value();
885                for i in 0..value.len() {
886                    let p = value.get(i);
887                    assert!(all_allocated.remove(&p));
888                    println!("{p:?}");
889                }
890            }
891        }
892        {
893            println!("Pending free (in system freed table)");
894            let mut system_tables = self.system_tables.lock().unwrap();
895            let system_freed = system_tables
896                .open_system_table(self, SYSTEM_FREED_TABLE)
897                .unwrap();
898            for entry in system_freed
899                .range::<TransactionIdWithPagination>(..)
900                .unwrap()
901            {
902                let (_, entry) = entry.unwrap();
903                let value = entry.value();
904                for i in 0..value.len() {
905                    let p = value.get(i);
906                    assert!(all_allocated.remove(&p));
907                    println!("{p:?}");
908                }
909            }
910        }
911        {
912            let tables = self.tables.lock().unwrap();
913            let pages = tables.freed_pages.lock().unwrap();
914            if !pages.is_empty() {
915                println!("Pages in in-memory data freed_pages");
916                for p in pages.iter() {
917                    println!("{p:?}");
918                    assert!(all_allocated.remove(p));
919                }
920            }
921        }
922        {
923            let system_tables = self.system_tables.lock().unwrap();
924            let pages = system_tables.freed_pages.lock().unwrap();
925            if !pages.is_empty() {
926                println!("Pages in in-memory system freed_pages");
927                for p in pages.iter() {
928                    println!("{p:?}");
929                    assert!(all_allocated.remove(p));
930                }
931            }
932        }
933        if !all_allocated.is_empty() {
934            println!("Leaked pages");
935            for p in all_allocated {
936                println!("{p:?}");
937            }
938        }
939    }
940
941    /// Creates a snapshot of the current database state, which can be used to roll back the database.
942    /// This savepoint will exist until it is deleted with [`Self::delete_persistent_savepoint()`].
943    ///
944    /// Note that while a savepoint exists, pages that become unused after it was created are not freed.
945    /// Therefore, the lifetime of a savepoint should be minimized.
946    ///
947    /// Returns [`SavepointError::InvalidSavepoint`] if the transaction is "dirty" (any tables have been opened)
948    /// or if the transaction's durability is less than [`Durability::Immediate`]
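    ///
    /// A minimal usage sketch (assumes a `Database` handle `db`; not a doctest):
    /// ```ignore
    /// let write_txn = db.begin_write()?;
    /// let id = write_txn.persistent_savepoint()?; // before any table is opened
    /// write_txn.commit()?;
    /// // ... later, possibly after reopening the database ...
    /// let write_txn = db.begin_write()?;
    /// let savepoint = write_txn.get_persistent_savepoint(id)?;
    /// ```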
949    pub fn persistent_savepoint(&self) -> Result<u64, SavepointError> {
950        if self.durability != InternalDurability::Immediate {
951            return Err(SavepointError::InvalidSavepoint);
952        }
953
954        let mut savepoint = self.ephemeral_savepoint()?;
955
956        let mut system_tables = self.system_tables.lock().unwrap();
957
958        let mut next_table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?;
959        next_table.insert((), savepoint.get_id().next())?;
960        drop(next_table);
961
962        let mut savepoint_table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
963        savepoint_table.insert(
964            savepoint.get_id(),
965            SerializedSavepoint::from_savepoint(&savepoint),
966        )?;
967
968        savepoint.set_persistent();
969
970        self.created_persistent_savepoints
971            .lock()
972            .unwrap()
973            .insert(savepoint.get_id());
974
975        Ok(savepoint.get_id().0)
976    }
977
978    pub(crate) fn transaction_guard(&self) -> Arc<TransactionGuard> {
979        self.transaction_guard.clone()
980    }
981
982    pub(crate) fn next_persistent_savepoint_id(&self) -> Result<Option<SavepointId>> {
983        let mut system_tables = self.system_tables.lock().unwrap();
984        let next_table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?;
985        let value = next_table.get(())?;
986        if let Some(next_id) = value {
987            Ok(Some(next_id.value()))
988        } else {
989            Ok(None)
990        }
991    }
992
993    /// Get a persistent savepoint given its id
994    pub fn get_persistent_savepoint(&self, id: u64) -> Result<Savepoint, SavepointError> {
995        let mut system_tables = self.system_tables.lock().unwrap();
996        let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
997        let value = table.get(SavepointId(id))?;
998
999        value
1000            .map(|x| x.value().to_savepoint(self.transaction_tracker.clone()))
1001            .ok_or(SavepointError::InvalidSavepoint)
1002    }
1003
1004    /// Delete the given persistent savepoint.
1005    ///
1006    /// Note that if the transaction is `abort()`'ed this deletion will be rolled back.
1007    ///
1008    /// Returns `true` if the savepoint existed
1009    /// Returns [`SavepointError::InvalidSavepoint`] if the transaction's durability is less than [`Durability::Immediate`]
1010    pub fn delete_persistent_savepoint(&self, id: u64) -> Result<bool, SavepointError> {
1011        if self.durability != InternalDurability::Immediate {
1012            return Err(SavepointError::InvalidSavepoint);
1013        }
1014        let mut system_tables = self.system_tables.lock().unwrap();
1015        let mut table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
1016        let savepoint = table.remove(SavepointId(id))?;
1017        if let Some(serialized) = savepoint {
1018            let savepoint = serialized
1019                .value()
1020                .to_savepoint(self.transaction_tracker.clone());
1021            self.deleted_persistent_savepoints
1022                .lock()
1023                .unwrap()
1024                .push((savepoint.get_id(), savepoint.get_transaction_id()));
1025            Ok(true)
1026        } else {
1027            Ok(false)
1028        }
1029    }
1030
1031    /// List all persistent savepoints
1032    pub fn list_persistent_savepoints(&self) -> Result<impl Iterator<Item = u64>> {
1033        let mut system_tables = self.system_tables.lock().unwrap();
1034        let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
1035        let mut savepoints = vec![];
1036        for savepoint in table.range::<SavepointId>(..)? {
1037            savepoints.push(savepoint?.0.value().0);
1038        }
1039        Ok(savepoints.into_iter())
1040    }
1041
1042    // TODO: deduplicate this with the one in Database
1043    fn allocate_read_transaction(&self) -> Result<TransactionGuard> {
1044        let id = self
1045            .transaction_tracker
1046            .register_read_transaction(&self.mem)?;
1047
1048        Ok(TransactionGuard::new_read(
1049            id,
1050            self.transaction_tracker.clone(),
1051        ))
1052    }
1053
1054    fn allocate_savepoint(&self) -> Result<(SavepointId, TransactionId)> {
1055        let transaction_id = self.allocate_read_transaction()?.leak();
1056        let id = self.transaction_tracker.allocate_savepoint(transaction_id);
1057        Ok((id, transaction_id))
1058    }
1059
1060    /// Creates a snapshot of the current database state, which can be used to roll back the database
1061    ///
1062    /// This savepoint will be freed as soon as the returned [`Savepoint`] is dropped.
1063    ///
1064    /// Returns [`SavepointError::InvalidSavepoint`] if the transaction is "dirty" (any tables have been opened)
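    ///
    /// Usage sketch (assumes a `Database` handle `db`; not a doctest):
    /// ```ignore
    /// let txn = db.begin_write()?;
    /// let savepoint = txn.ephemeral_savepoint()?; // must be called before any table is opened
    /// txn.commit()?;
    /// // `savepoint` remains usable (and keeps old pages from being freed) until it is dropped
    /// ```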
1065    pub fn ephemeral_savepoint(&self) -> Result<Savepoint, SavepointError> {
1066        if self.dirty.load(Ordering::Acquire) {
1067            return Err(SavepointError::InvalidSavepoint);
1068        }
1069
1070        let (id, transaction_id) = self.allocate_savepoint()?;
1071        #[cfg(feature = "logging")]
1072        debug!("Creating savepoint id={id:?}, txn_id={transaction_id:?}");
1073
1074        let root = self.mem.get_data_root();
1075        let savepoint = Savepoint::new_ephemeral(
1076            &self.mem,
1077            self.transaction_tracker.clone(),
1078            id,
1079            transaction_id,
1080            root,
1081        );
1082
1083        Ok(savepoint)
1084    }
1085
1086    /// Restore the state of the database to the given [`Savepoint`]
1087    ///
1088    /// Calling this method invalidates all [`Savepoint`]s created after this savepoint
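    ///
    /// Usage sketch (assumes a `Database` handle `db` and a previously created `savepoint`; not a doctest):
    /// ```ignore
    /// let mut txn = db.begin_write()?;
    /// txn.restore_savepoint(&savepoint)?; // contents revert to the savepoint's state
    /// txn.commit()?;                      // the restore only takes effect on commit
    /// ```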
1089    pub fn restore_savepoint(&mut self, savepoint: &Savepoint) -> Result<(), SavepointError> {
1090        // Ensure that the user does not try to restore a Savepoint from a different Database
1091        assert_eq!(
1092            std::ptr::from_ref(self.transaction_tracker.as_ref()),
1093            savepoint.db_address()
1094        );
1095
1096        if !self
1097            .transaction_tracker
1098            .is_valid_savepoint(savepoint.get_id())
1099        {
1100            return Err(SavepointError::InvalidSavepoint);
1101        }
1102        #[cfg(feature = "logging")]
1103        debug!(
1104            "Beginning savepoint restore (id={:?}) in transaction id={:?}",
1105            savepoint.get_id(),
1106            self.transaction_id
1107        );
1108        // Restoring a savepoint that reverted a file format or checksum type change could corrupt
1109        // the database
1110        assert_eq!(self.mem.get_version(), savepoint.get_version());
1111        self.dirty.store(true, Ordering::Release);
1112
1113        // Restoring a savepoint needs to accomplish the following:
1114        // 1) restore the table tree. This is trivial, since we have the old root
1115        // 1a) we also filter the freed tree to remove any pages referenced by the old root
1116        // 2) free all pages that were allocated since the savepoint and are unreachable
1117        //    from the restored table tree root. Here we diff the reachable pages from the old
1118        //    and new roots
1119        // 3) update the system tree to remove invalid persistent savepoints.
1120
1121        // 1) restore the table tree
1122        {
1123            self.tables
1124                .lock()
1125                .unwrap()
1126                .set_root(savepoint.get_user_root());
1127        }
1128
1129        // 1a) purge all transactions that happened after the savepoint from the data freed tree
1130        let txn_id = savepoint.get_transaction_id().next().raw_id();
1131        {
1132            let lower = TransactionIdWithPagination {
1133                transaction_id: txn_id,
1134                pagination_id: 0,
1135            };
1136            let mut system_tables = self.system_tables.lock().unwrap();
1137            let mut data_freed = system_tables.open_system_table(self, DATA_FREED_TABLE)?;
1138            for entry in data_freed.extract_from_if(lower.., |_, _| true)? {
1139                entry?;
1140            }
1141            // No need to process the system freed table, because it only rolls forward
1142        }
1143
1144        // 2) queue all pages that became unreachable
1145        {
1146            let tables = self.tables.lock().unwrap();
1147            let mut data_freed_pages = tables.freed_pages.lock().unwrap();
1148            let mut system_tables = self.system_tables.lock().unwrap();
1149            let data_allocated = system_tables.open_system_table(self, DATA_ALLOCATED_TABLE)?;
1150            let lower = TransactionIdWithPagination {
1151                transaction_id: txn_id,
1152                pagination_id: 0,
1153            };
1154            for entry in data_allocated.range(lower..)? {
1155                let (_, value) = entry?;
1156                for i in 0..value.value().len() {
1157                    data_freed_pages.push(value.value().get(i));
1158                }
1159            }
1160        }
1161
1162        // 3) Invalidate all savepoints that are newer than the one being applied to prevent the user
1163        // from later trying to restore a savepoint "on another timeline"
1164        self.transaction_tracker
1165            .invalidate_savepoints_after(savepoint.get_id());
1166        for persistent_savepoint in self.list_persistent_savepoints()? {
1167            if persistent_savepoint > savepoint.get_id().0 {
1168                self.delete_persistent_savepoint(persistent_savepoint)?;
1169            }
1170        }
1171
1172        Ok(())
1173    }
1174
1175    /// Set the desired durability level for writes made in this transaction
1176    /// Defaults to [`Durability::Immediate`]
1177    ///
1178    /// If a persistent savepoint has been created or deleted in this transaction, the durability may not
1179    /// be reduced below [`Durability::Immediate`]
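    ///
    /// Sketch (assumes a mutable write transaction `txn`; not a doctest):
    /// ```ignore
    /// txn.set_durability(Durability::None)?; // this commit will not fsync
    /// // ... writes ...
    /// txn.commit()?; // data becomes durable only after a later Durability::Immediate commit
    /// ```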
1180    pub fn set_durability(&mut self, durability: Durability) -> Result<(), SetDurabilityError> {
1181        let created = !self
1182            .created_persistent_savepoints
1183            .lock()
1184            .unwrap()
1185            .is_empty();
1186        let deleted = !self
1187            .deleted_persistent_savepoints
1188            .lock()
1189            .unwrap()
1190            .is_empty();
1191        if (created || deleted) && !matches!(durability, Durability::Immediate) {
1192            return Err(SetDurabilityError::PersistentSavepointModified);
1193        }
1194
1195        self.durability = match durability {
1196            Durability::None => InternalDurability::None,
1197            Durability::Immediate => InternalDurability::Immediate,
1198        };
1199
1200        Ok(())
1201    }
1202
1203    /// Enable or disable 2-phase commit (defaults to disabled)
1204    ///
1205    /// By default, data is written using the following 1-phase commit algorithm:
1206    ///
1207    /// 1. Update the inactive commit slot with the new database state
1208    /// 2. Flip the god byte primary bit to activate the newly updated commit slot
1209    /// 3. Call `fsync` to ensure all writes have been persisted to disk
1210    ///
1211    /// All data is written with checksums. When opening the database after a crash, the most
1212    /// recent of the two commit slots with a valid checksum is used.
1213    ///
1214    /// Security considerations: The checksum used is xxhash, a fast, non-cryptographic hash
1215    /// function with close to perfect collision resistance when used with non-malicious input. An
1216    /// attacker with an extremely high degree of control over the database's workload, including
1217    /// the ability to cause the database process to crash, can cause invalid data to be written
1218    /// with a valid checksum, leaving the database in an invalid, attacker-controlled state.
1219    ///
1220    /// Alternatively, you can enable 2-phase commit, which writes data like this:
1221    ///
1222    /// 1. Update the inactive commit slot with the new database state
1223    /// 2. Call `fsync` to ensure the database state and commit slot update have been persisted
1224    /// 3. Flip the god byte primary bit to activate the newly updated commit slot
1225    /// 4. Call `fsync` to ensure the write to the god byte has been persisted
1226    ///
1227    /// This mitigates a theoretical attack where an attacker who
1228    /// 1. can control the order in which pages are flushed to disk
1229    /// 2. can introduce crashes during `fsync`,
1230    /// 3. has knowledge of the database file contents, and
1231    /// 4. can include arbitrary data in a write transaction
1232    ///
1233    /// could cause a transaction to partially commit (some but not all of the data is written).
1234    /// This is described in the design doc in further detail.
1235    ///
1236    /// Security considerations: Many hard disk drives and SSDs do not actually guarantee that data
1237    /// has been persisted to disk after calling `fsync`. Even with 2-phase commit, an attacker with
1238    /// a high degree of control over the database's workload, including the ability to cause the
1239    /// database process to crash, can cause the database to crash with the god byte primary bit
1240    /// pointing to an invalid commit slot, leaving the database in an invalid, potentially attacker-
1241    /// controlled state.
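    ///
    /// Sketch (assumes a mutable write transaction `txn`; not a doctest):
    /// ```ignore
    /// txn.set_two_phase_commit(true); // trades an extra fsync per commit for the stronger guarantee
    /// txn.commit()?;
    /// ```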
1242    pub fn set_two_phase_commit(&mut self, enabled: bool) {
1243        self.two_phase_commit = enabled;
1244    }
1245
1246    /// Enable or disable quick-repair (defaults to disabled)
1247    ///
1248    /// By default, when reopening the database after a crash, redb needs to do a full repair.
1249    /// This involves walking the entire database to verify the checksums and reconstruct the
1250    /// allocator state, so it can be very slow if the database is large.
1251    ///
1252    /// Alternatively, you can enable quick-repair. In this mode, redb saves the allocator state
1253    /// as part of each commit (so it doesn't need to be reconstructed), and enables 2-phase commit
1254    /// (which guarantees that the primary commit slot is valid without needing to look at the
1255    /// checksums). This means commits are slower, but recovery after a crash is almost instant.
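    ///
    /// Sketch (assumes a mutable write transaction `txn`; not a doctest):
    /// ```ignore
    /// txn.set_quick_repair(true); // also forces 2-phase commit for this transaction
    /// txn.commit()?;              // allocator state is saved, so crash recovery is near-instant
    /// ```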
1256    pub fn set_quick_repair(&mut self, enabled: bool) {
1257        self.quick_repair = enabled;
1258    }
1259
1260    /// Open the given table
1261    ///
1262    /// The table will be created if it does not exist
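    ///
    /// Usage sketch (assumes a write transaction `write_txn`; not a doctest):
    /// ```ignore
    /// const TABLE: TableDefinition<&str, u64> = TableDefinition::new("my_data");
    /// let mut table = write_txn.open_table(TABLE)?;
    /// table.insert("some_key", &123)?;
    /// ```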
1263    #[track_caller]
1264    pub fn open_table<'txn, K: Key + 'static, V: Value + 'static>(
1265        &'txn self,
1266        definition: TableDefinition<K, V>,
1267    ) -> Result<Table<'txn, K, V>, TableError> {
1268        self.tables.lock().unwrap().open_table(self, definition)
1269    }
1270
1271    /// Open the given table
1272    ///
1273    /// The table will be created if it does not exist
1274    #[track_caller]
1275    pub fn open_multimap_table<'txn, K: Key + 'static, V: Key + 'static>(
1276        &'txn self,
1277        definition: MultimapTableDefinition<K, V>,
1278    ) -> Result<MultimapTable<'txn, K, V>, TableError> {
1279        self.tables
1280            .lock()
1281            .unwrap()
1282            .open_multimap_table(self, definition)
1283    }
1284
1285    pub(crate) fn close_table<K: Key + 'static, V: Value + 'static>(
1286        &self,
1287        name: &str,
1288        table: &BtreeMut<K, V>,
1289        length: u64,
1290    ) {
1291        self.tables.lock().unwrap().close_table(name, table, length);
1292    }
1293
1294    /// Rename the given table
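    ///
    /// Sketch (assumes hypothetical definitions `OLD` and `NEW`; not a doctest):
    /// ```ignore
    /// write_txn.rename_table(OLD, NEW)?;
    /// ```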
1295    pub fn rename_table(
1296        &self,
1297        definition: impl TableHandle,
1298        new_name: impl TableHandle,
1299    ) -> Result<(), TableError> {
1300        let name = definition.name().to_string();
1301        // Drop the definition so that callers can pass in a `Table` to rename, without getting a TableAlreadyOpen error
1302        drop(definition);
1303        self.tables
1304            .lock()
1305            .unwrap()
1306            .rename_table(self, &name, new_name.name())
1307    }
1308
1309    /// Rename the given multimap table
1310    pub fn rename_multimap_table(
1311        &self,
1312        definition: impl MultimapTableHandle,
1313        new_name: impl MultimapTableHandle,
1314    ) -> Result<(), TableError> {
1315        let name = definition.name().to_string();
1316        // Drop the definition so that callers can pass in a `MultimapTable` to rename, without getting a TableAlreadyOpen error
1317        drop(definition);
1318        self.tables
1319            .lock()
1320            .unwrap()
1321            .rename_multimap_table(self, &name, new_name.name())
1322    }
1323
1324    /// Delete the given table
1325    ///
1326    /// Returns a bool indicating whether the table existed
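    ///
    /// Sketch (assumes a table definition `TABLE`; not a doctest):
    /// ```ignore
    /// let existed = write_txn.delete_table(TABLE)?;
    /// ```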
1327    pub fn delete_table(&self, definition: impl TableHandle) -> Result<bool, TableError> {
1328        let name = definition.name().to_string();
1329        // Drop the definition so that callers can pass in a `Table` or `MultimapTable` to delete, without getting a TableAlreadyOpen error
1330        drop(definition);
1331        self.tables.lock().unwrap().delete_table(self, &name)
1332    }
1333
1334    /// Delete the given table
1335    ///
1336    /// Returns a bool indicating whether the table existed
1337    pub fn delete_multimap_table(
1338        &self,
1339        definition: impl MultimapTableHandle,
1340    ) -> Result<bool, TableError> {
1341        let name = definition.name().to_string();
1342        // Drop the definition so that callers can pass in a `Table` or `MultimapTable` to delete, without getting a TableAlreadyOpen error
1343        drop(definition);
1344        self.tables
1345            .lock()
1346            .unwrap()
1347            .delete_multimap_table(self, &name)
1348    }
1349
1350    /// List all the tables
1351    pub fn list_tables(&self) -> Result<impl Iterator<Item = UntypedTableHandle> + '_> {
1352        self.tables
1353            .lock()
1354            .unwrap()
1355            .table_tree
1356            .list_tables(TableType::Normal)
1357            .map(|x| x.into_iter().map(UntypedTableHandle::new))
1358    }
1359
1360    /// List all the multimap tables
1361    pub fn list_multimap_tables(
1362        &self,
1363    ) -> Result<impl Iterator<Item = UntypedMultimapTableHandle> + '_> {
1364        self.tables
1365            .lock()
1366            .unwrap()
1367            .table_tree
1368            .list_tables(TableType::Multimap)
1369            .map(|x| x.into_iter().map(UntypedMultimapTableHandle::new))
1370    }
1371
1372    /// Commit the transaction
1373    ///
1374    /// All writes performed in this transaction will be visible to future transactions, and are
1375    /// durable according to the [`Durability`] level set by [`Self::set_durability`]
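    ///
    /// Sketch of a typical write (assumes a `Database` handle `db` and a table definition `TABLE`; not a doctest):
    /// ```ignore
    /// let write_txn = db.begin_write()?;
    /// {
    ///     let mut table = write_txn.open_table(TABLE)?;
    ///     table.insert("key", &1u64)?;
    /// } // the table borrows the transaction, so drop it before commit()
    /// write_txn.commit()?;
    /// ```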
1376    pub fn commit(mut self) -> Result<(), CommitError> {
1377        // Set completed flag first, so that we don't go through the abort() path on drop, if this fails
1378        self.completed = true;
1379        self.commit_inner()
1380    }
1381
1382    fn commit_inner(&mut self) -> Result<(), CommitError> {
1383        // Quick-repair requires 2-phase commit
1384        if self.quick_repair {
1385            self.two_phase_commit = true;
1386        }
1387
1388        let (user_root, allocated_pages, data_freed) =
1389            self.tables.lock().unwrap().table_tree.flush_and_close()?;
1390
1391        self.store_data_freed_pages(data_freed)?;
1392        self.store_allocated_pages(allocated_pages.into_iter().collect())?;
1393
1394        #[cfg(feature = "logging")]
1395        debug!(
1396            "Committing transaction id={:?} with durability={:?} two_phase={} quick_repair={}",
1397            self.transaction_id, self.durability, self.two_phase_commit, self.quick_repair
1398        );
1399        match self.durability {
1400            InternalDurability::None => self.non_durable_commit(user_root)?,
1401            InternalDurability::Immediate => self.durable_commit(user_root)?,
1402        }
1403
1404        for (savepoint, transaction) in self.deleted_persistent_savepoints.lock().unwrap().iter() {
1405            self.transaction_tracker
1406                .deallocate_savepoint(*savepoint, *transaction);
1407        }
1408
1409        assert!(
1410            self.system_tables
1411                .lock()
1412                .unwrap()
1413                .system_freed_pages()
1414                .lock()
1415                .unwrap()
1416                .is_empty()
1417        );
1418        assert!(
1419            self.tables
1420                .lock()
1421                .unwrap()
1422                .freed_pages
1423                .lock()
1424                .unwrap()
1425                .is_empty()
1426        );
1427
1428        #[cfg(feature = "logging")]
1429        debug!(
1430            "Finished commit of transaction id={:?}",
1431            self.transaction_id
1432        );
1433
1434        Ok(())
1435    }
1436
1437    fn store_data_freed_pages(&self, mut freed_pages: Vec<PageNumber>) -> Result {
1438        let mut system_tables = self.system_tables.lock().unwrap();
1439        let mut freed_table = system_tables.open_system_table(self, DATA_FREED_TABLE)?;
1440        let mut pagination_counter = 0;
1441        while !freed_pages.is_empty() {
1442            let chunk_size = 400;
1443            let buffer_size = PageList::required_bytes(chunk_size);
1444            let key = TransactionIdWithPagination {
1445                transaction_id: self.transaction_id.raw_id(),
1446                pagination_id: pagination_counter,
1447            };
1448            let mut access_guard = freed_table.insert_reserve(&key, buffer_size)?;
1449
1450            let len = freed_pages.len();
1451            access_guard.as_mut().clear();
1452            for page in freed_pages.drain(len - min(len, chunk_size)..) {
1453                // Make sure that the page is currently allocated
1454                debug_assert!(
1455                    self.mem.is_allocated(page),
1456                    "Page is not allocated: {page:?}"
1457                );
1458                debug_assert!(!self.mem.uncommitted(page), "Page is uncommitted: {page:?}");
1459                access_guard.as_mut().push_back(page);
1460            }
1461
1462            pagination_counter += 1;
1463        }
1464
1465        Ok(())
1466    }
1467
1468    fn store_allocated_pages(&self, mut data_allocated_pages: Vec<PageNumber>) -> Result {
1469        let mut system_tables = self.system_tables.lock().unwrap();
1470        let mut allocated_table = system_tables.open_system_table(self, DATA_ALLOCATED_TABLE)?;
1471        let mut pagination_counter = 0;
1472        while !data_allocated_pages.is_empty() {
1473            let chunk_size = 400;
1474            let buffer_size = PageList::required_bytes(chunk_size);
1475            let key = TransactionIdWithPagination {
1476                transaction_id: self.transaction_id.raw_id(),
1477                pagination_id: pagination_counter,
1478            };
1479            let mut access_guard = allocated_table.insert_reserve(&key, buffer_size)?;
1480
1481            let len = data_allocated_pages.len();
1482            access_guard.as_mut().clear();
1483            for page in data_allocated_pages.drain(len - min(len, chunk_size)..) {
1484                // Make sure that the page is currently allocated. This catches scenarios like a
1485                // page being allocated and then deallocated within the same transaction, but
1486                // errantly left in the allocated pages list
1487                debug_assert!(
1488                    self.mem.is_allocated(page),
1489                    "Page is not allocated: {page:?}"
1490                );
1491                debug_assert!(self.mem.uncommitted(page), "Page is committed: {page:?}");
1492                access_guard.as_mut().push_back(page);
1493            }
1494
1495            pagination_counter += 1;
1496        }
1497
1498        // Purge the allocation records of any transactions that are no longer referenced by a savepoint
1499        let oldest = self
1500            .transaction_tracker
1501            .oldest_savepoint()
1502            .map_or(u64::MAX, |(_, x)| x.raw_id());
1503        let key = TransactionIdWithPagination {
1504            transaction_id: oldest,
1505            pagination_id: 0,
1506        };
1507        for entry in allocated_table.extract_from_if(..key, |_, _| true)? {
1508            entry?;
1509        }
1510
1511        Ok(())
1512    }
1513
1514    /// Abort the transaction
1515    ///
1516    /// All writes performed in this transaction will be rolled back
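    ///
    /// A minimal usage sketch (added for illustration; the database path, table definition, and
    /// key/value below are assumptions, not taken from this file):
    /// ```no_run
    /// use redb::{Database, Error, TableDefinition};
    ///
    /// const TABLE: TableDefinition<&str, u64> = TableDefinition::new("my_data");
    ///
    /// fn main() -> Result<(), Error> {
    ///     let db = Database::create("my_db.redb")?;
    ///     let write_txn = db.begin_write()?;
    ///     {
    ///         let mut table = write_txn.open_table(TABLE)?;
    ///         table.insert("my_key", &123)?;
    ///     }
    ///     // Discard the insert instead of committing it
    ///     write_txn.abort()?;
    ///     Ok(())
    /// }
    /// ```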
1517    pub fn abort(mut self) -> Result {
1518        // Set the completed flag first, so that if this fails we don't go through the abort() path on drop
1519        self.completed = true;
1520        self.abort_inner()
1521    }
1522
1523    fn abort_inner(&mut self) -> Result {
1524        #[cfg(feature = "logging")]
1525        debug!("Aborting transaction id={:?}", self.transaction_id);
1526        self.tables
1527            .lock()
1528            .unwrap()
1529            .table_tree
1530            .clear_root_updates_and_close();
1531        for savepoint in self.created_persistent_savepoints.lock().unwrap().iter() {
1532            match self.delete_persistent_savepoint(savepoint.0) {
1533                Ok(_) => {}
1534                Err(err) => match err {
1535                    SavepointError::InvalidSavepoint => {
1536                        unreachable!();
1537                    }
1538                    SavepointError::Storage(storage_err) => {
1539                        return Err(storage_err);
1540                    }
1541                },
1542            }
1543        }
1544        self.mem.rollback_uncommitted_writes()?;
1545        #[cfg(feature = "logging")]
1546        debug!("Finished abort of transaction id={:?}", self.transaction_id);
1547        Ok(())
1548    }
1549
1550    pub(crate) fn durable_commit(&mut self, user_root: Option<BtreeHeader>) -> Result {
1551        let free_until_transaction = self
1552            .transaction_tracker
1553            .oldest_live_read_transaction()
1554            .map_or(self.transaction_id, |x| x.next());
1555        self.process_freed_pages(free_until_transaction)?;
1556
1557        let mut system_tables = self.system_tables.lock().unwrap();
1558        let system_freed_pages = system_tables.system_freed_pages();
1559        let system_tree = system_tables.table_tree.flush_table_root_updates()?;
1560        system_tree
1561            .delete_table(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal)
1562            .map_err(|e| e.into_storage_error_or_corrupted("Unexpected TableError"))?;
1563
1564        if self.quick_repair {
1565            system_tree.create_table_and_flush_table_root(
1566                ALLOCATOR_STATE_TABLE_NAME,
1567                |system_tree_ref, tree: &mut AllocatorStateTreeMut| {
1568                    let mut pagination_counter = 0;
1569
1570                    loop {
1571                        let num_regions = self
1572                            .mem
1573                            .reserve_allocator_state(tree, self.transaction_id)?;
1574
1575                        // We can't free pages after the commit, because that would invalidate our
1576                        // saved allocator state. Everything needs to go through the transactional
1577                        // free mechanism
1578                        self.store_system_freed_pages(
1579                            system_tree_ref,
1580                            system_freed_pages.clone(),
1581                            None,
1582                            &mut pagination_counter,
1583                        )?;
1584
1585                        if self.mem.try_save_allocator_state(tree, num_regions)? {
1586                            return Ok(());
1587                        }
1588
1589                        // Clear out the table before retrying, just in case the number of regions
1590                        // has somehow shrunk. Don't use retain_in() for this, since it doesn't
1591                        // free the pages immediately -- we need to reuse those pages to guarantee
1592                        // that our retry loop will eventually terminate
1593                        while let Some(guards) = tree.last()? {
1594                            let key = guards.0.value();
1595                            drop(guards);
1596                            tree.remove(&key)?;
1597                        }
1598                    }
1599                },
1600            )?;
1601        }
1602
1603        let system_root = system_tree.finalize_dirty_checksums()?;
1604
1605        self.mem.commit(
1606            user_root,
1607            system_root,
1608            self.transaction_id,
1609            self.two_phase_commit,
1610            self.shrink_policy,
1611        )?;
1612
1613        // Mark any pending non-durable commits as fully committed.
1614        self.transaction_tracker.clear_pending_non_durable_commits();
1615
1616        // Immediately free the pages that were freed from the system-tree. These are only
1617        // accessed by write transactions, so it's safe to free them as soon as the commit is done.
1618        for page in system_freed_pages.lock().unwrap().drain(..) {
1619            self.mem.free(page, &mut PageTrackerPolicy::Ignore);
1620        }
1621
1622        Ok(())
1623    }
1624
1625    // Commit without a durability guarantee
1626    pub(crate) fn non_durable_commit(&mut self, user_root: Option<BtreeHeader>) -> Result {
1627        let mut free_until_transaction = self
1628            .transaction_tracker
1629            .oldest_live_read_nondurable_transaction()
1630            .map_or(self.transaction_id, |x| x.next());
1631        // TODO: refactor the non-durable freed-page processing to remove this
1632        // The reason it is needed is that non-durable commits edit previous non-durable commits,
1633        // but they only edit the freed tree of unpersisted pages.
1634        // The allocated tree, which savepoints rely on, is not edited, for performance reasons.
1635        // Therefore, we must not edit anything after a savepoint.
1636        // It would be better for non-durable transactions' unpersisted pages to be kept in memory,
1637        // in a data structure where the allocated list can be edited efficiently.
1638        if let Some((_, oldest_savepoint)) = self.transaction_tracker.oldest_savepoint() {
1639            free_until_transaction = TransactionId::min(free_until_transaction, oldest_savepoint);
1640        }
1641        self.process_freed_pages_nondurable(free_until_transaction)?;
1642
1643        let mut post_commit_frees = vec![];
1644
1645        let system_root = {
1646            let mut system_tables = self.system_tables.lock().unwrap();
1647            let system_freed_pages = system_tables.system_freed_pages();
1648            system_tables.table_tree.flush_table_root_updates()?;
1649            for page in system_freed_pages
1650                .lock()
1651                .unwrap()
1652                .extract_if(.., |p| self.mem.unpersisted(*p))
1653            {
1654                post_commit_frees.push(page);
1655            }
1656            // Store all freed pages for a future commit(), since we can't free pages during a
1657            // non-durable commit (it's non-durable, so could be rolled back anytime in the future)
1658            self.store_system_freed_pages(
1659                &mut system_tables.table_tree,
1660                system_freed_pages,
1661                Some(&mut post_commit_frees),
1662                &mut 0,
1663            )?;
1664
1665            system_tables
1666                .table_tree
1667                .flush_table_root_updates()?
1668                .finalize_dirty_checksums()?
1669        };
1670
1671        self.mem
1672            .non_durable_commit(user_root, system_root, self.transaction_id)?;
1673        // Register this as a non-durable transaction to ensure that the freed pages we just pushed
1674        // are only processed after this has been persisted
1675        self.transaction_tracker.register_non_durable_commit(
1676            self.transaction_id,
1677            self.mem.get_last_durable_transaction_id()?,
1678        );
1679
1680        for page in post_commit_frees {
1681            self.mem.free(page, &mut PageTrackerPolicy::Ignore);
1682        }
1683
1684        Ok(())
1685    }
1686
1687    // Relocate pages to lower-numbered regions/pages
1688    // Returns true if any pages were moved
1689    pub(crate) fn compact_pages(&mut self) -> Result<bool> {
1690        let mut progress = false;
1691
1692        // Find the highest pages (up to MAX_PAGES_PER_COMPACTION of them)
1693        let mut highest_pages = BTreeMap::new();
1694        let mut tables = self.tables.lock().unwrap();
1695        let table_tree = &mut tables.table_tree;
1696        table_tree.highest_index_pages(MAX_PAGES_PER_COMPACTION, &mut highest_pages)?;
1697        let mut system_tables = self.system_tables.lock().unwrap();
1698        let system_table_tree = &mut system_tables.table_tree;
1699        system_table_tree.highest_index_pages(MAX_PAGES_PER_COMPACTION, &mut highest_pages)?;
1700
1701        // Calculate how many of them can be relocated to lower pages, starting from the last page
1702        let mut relocation_map = HashMap::new();
1703        for path in highest_pages.into_values().rev() {
1704            if relocation_map.contains_key(&path.page_number()) {
1705                continue;
1706            }
1707            let old_page = self.mem.get_page(path.page_number())?;
1708            let mut new_page = self.mem.allocate_lowest(old_page.memory().len())?;
1709            let new_page_number = new_page.get_page_number();
1710            // We have to copy at least the page type into the new page.
1711            // Otherwise its cache priority will be calculated incorrectly
1712            new_page.memory_mut()[0] = old_page.memory()[0];
1713            drop(new_page);
1714            // We're able to move this to a lower page, so insert it and rewrite all its parents
1715            if new_page_number < path.page_number() {
1716                relocation_map.insert(path.page_number(), new_page_number);
1717                for parent in path.parents() {
1718                    if relocation_map.contains_key(parent) {
1719                        continue;
1720                    }
1721                    let old_parent = self.mem.get_page(*parent)?;
1722                    let mut new_page = self.mem.allocate_lowest(old_parent.memory().len())?;
1723                    let new_page_number = new_page.get_page_number();
1724                    // We have to copy at least the page type into the new page.
1725                    // Otherwise its cache priority will be calculated incorrectly
1726                    new_page.memory_mut()[0] = old_parent.memory()[0];
1727                    drop(new_page);
1728                    relocation_map.insert(*parent, new_page_number);
1729                }
1730            } else {
1731                self.mem
1732                    .free(new_page_number, &mut PageTrackerPolicy::Ignore);
1733                break;
1734            }
1735        }
1736
1737        if !relocation_map.is_empty() {
1738            progress = true;
1739        }
1740
1741        table_tree.relocate_tables(&relocation_map)?;
1742        system_table_tree.relocate_tables(&relocation_map)?;
1743
1744        Ok(progress)
1745    }
1746
1747    // NOTE: must be called before store_system_freed_pages() during commit, since it can add
1748    // more pages to those freed by the current transaction
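    //
    // Walks the data and system freed tables and frees every page recorded by a transaction older
    // than `free_until`.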
1749    fn process_freed_pages(&mut self, free_until: TransactionId) -> Result {
1750        // We assume below that PageNumber serializes to 8 bytes
1751        assert_eq!(PageNumber::serialized_size(), 8);
1752
1753        // Handle the data freed tree
1754        let mut system_tables = self.system_tables.lock().unwrap();
1755        {
1756            let mut data_freed = system_tables.open_system_table(self, DATA_FREED_TABLE)?;
1757            let key = TransactionIdWithPagination {
1758                transaction_id: free_until.raw_id(),
1759                pagination_id: 0,
1760            };
1761            for entry in data_freed.extract_from_if(..key, |_, _| true)? {
1762                let (_, page_list) = entry?;
1763                for i in 0..page_list.value().len() {
1764                    self.mem
1765                        .free(page_list.value().get(i), &mut PageTrackerPolicy::Ignore);
1766                }
1767            }
1768        }
1769
1770        // Handle the system freed tree
1771        {
1772            let mut system_freed = system_tables.open_system_table(self, SYSTEM_FREED_TABLE)?;
1773            let key = TransactionIdWithPagination {
1774                transaction_id: free_until.raw_id(),
1775                pagination_id: 0,
1776            };
1777            for entry in system_freed.extract_from_if(..key, |_, _| true)? {
1778                let (_, page_list) = entry?;
1779                for i in 0..page_list.value().len() {
1780                    self.mem
1781                        .free(page_list.value().get(i), &mut PageTrackerPolicy::Ignore);
1782                }
1783            }
1784        }
1785
1786        Ok(())
1787    }
1788
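    // Helper for process_freed_pages_nondurable(): for each unprocessed non-durable commit older
    // than `free_until`, frees the recorded pages that are still unpersisted, rewrites (or removes)
    // the corresponding entries, and returns the transactions that were processed.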
1789    fn process_freed_pages_nondurable_helper(
1790        &mut self,
1791        free_until: TransactionId,
1792        definition: SystemTableDefinition<TransactionIdWithPagination, PageList>,
1793    ) -> Result<Vec<TransactionId>> {
1794        let mut processed = vec![];
1795        let mut system_tables = self.system_tables.lock().unwrap();
1796
1797        let last_key = TransactionIdWithPagination {
1798            transaction_id: free_until.raw_id(),
1799            pagination_id: 0,
1800        };
1801        let oldest_unprocessed = self
1802            .transaction_tracker
1803            .oldest_unprocessed_non_durable_commit()
1804            .map_or(free_until.raw_id(), |x| x.raw_id());
1805        let first_key = TransactionIdWithPagination {
1806            transaction_id: oldest_unprocessed,
1807            pagination_id: 0,
1808        };
1809        let mut data_freed = system_tables.open_system_table(self, definition)?;
1810
1811        let mut candidate_transactions = vec![];
1812        for entry in data_freed.range(first_key..last_key)? {
1813            let (key, _) = entry?;
1814            let transaction_id = TransactionId::new(key.value().transaction_id);
1815            if self
1816                .transaction_tracker
1817                .is_unprocessed_non_durable_commit(transaction_id)
1818            {
1819                candidate_transactions.push(transaction_id);
1820            }
1821        }
1822        for transaction_id in candidate_transactions {
1823            let mut key = TransactionIdWithPagination {
1824                transaction_id: transaction_id.raw_id(),
1825                pagination_id: 0,
1826            };
1827            loop {
1828                let Some(entry) = data_freed.get(&key)? else {
1829                    break;
1830                };
1831                let pages = entry.value();
1832                let mut new_pages = vec![];
1833                for i in 0..pages.len() {
1834                    let page = pages.get(i);
1835                    if !self
1836                        .mem
1837                        .free_if_unpersisted(page, &mut PageTrackerPolicy::Ignore)
1838                    {
1839                        new_pages.push(page);
1840                    }
1841                }
1842                if new_pages.len() != pages.len() {
1843                    drop(entry);
1844                    if new_pages.is_empty() {
1845                        data_freed.remove(&key)?;
1846                    } else {
1847                        let required = PageList::required_bytes(new_pages.len());
1848                        let mut page_list_mut = data_freed.insert_reserve(&key, required)?;
1849                        for page in new_pages {
1850                            page_list_mut.as_mut().push_back(page);
1851                        }
1852                    }
1853                }
1854                key.pagination_id += 1;
1855            }
1856            processed.push(transaction_id);
1857        }
1858
1859        Ok(processed)
1860    }
1861
1862    // NOTE: must be called before store_system_freed_pages() during commit, since it can add
1863    // more pages to those freed by the current transaction
1864    //
1865    // This method only frees pages that are still unpersisted (i.e. written by non-durable
1866    // transactions), since it is called from a non-durable commit() and therefore must not modify
1867    // anything that the on-disk state of the last durable transaction might reference.
1868    fn process_freed_pages_nondurable(&mut self, free_until: TransactionId) -> Result {
1869        // We assume below that PageNumber serializes to 8 bytes
1870        assert_eq!(PageNumber::serialized_size(), 8);
1871
1872        // Handle the data freed tree
1873        let mut processed =
1874            self.process_freed_pages_nondurable_helper(free_until, DATA_FREED_TABLE)?;
1875
1876        // Handle the system freed tree
1877        processed
1878            .extend(self.process_freed_pages_nondurable_helper(free_until, SYSTEM_FREED_TABLE)?);
1879
1880        for transaction_id in processed {
1881            self.transaction_tracker
1882                .mark_unprocessed_non_durable_commit(transaction_id);
1883        }
1884
1885        Ok(())
1886    }
1887
1888    fn store_system_freed_pages(
1889        &self,
1890        system_tree: &mut TableTreeMut,
1891        system_freed_pages: Arc<Mutex<Vec<PageNumber>>>,
1892        mut unpersisted_pages: Option<&mut Vec<PageNumber>>,
1893        pagination_counter: &mut u64,
1894    ) -> Result {
1895        assert_eq!(PageNumber::serialized_size(), 8); // We assume below that PageNumber serializes to 8 bytes
1896
1897        system_tree.open_table_and_flush_table_root(
1898            SYSTEM_FREED_TABLE.name(),
1899            |system_freed_tree: &mut SystemFreedTree| {
1900                while !system_freed_pages.lock().unwrap().is_empty() {
1901                    let chunk_size = 200;
1902                    let buffer_size = PageList::required_bytes(chunk_size);
1903                    let key = TransactionIdWithPagination {
1904                        transaction_id: self.transaction_id.raw_id(),
1905                        pagination_id: *pagination_counter,
1906                    };
1907                    let mut access_guard = system_freed_tree.insert_reserve(&key, buffer_size)?;
1908
1909                    let mut freed_pages = system_freed_pages.lock().unwrap();
1910                    let len = freed_pages.len();
1911                    access_guard.as_mut().clear();
1912                    for page in freed_pages.drain(len - min(len, chunk_size)..) {
1913                        if let Some(ref mut unpersisted_pages) = unpersisted_pages
1914                            && self.mem.unpersisted(page)
1915                        {
1916                            unpersisted_pages.push(page);
1917                        } else {
1918                            access_guard.as_mut().push_back(page);
1919                        }
1920                    }
1921                    drop(access_guard);
1922
1923                    *pagination_counter += 1;
1924                }
1925                Ok(())
1926            },
1927        )?;
1928
1929        Ok(())
1930    }
1931
1932    /// Retrieves information about storage usage in the database
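    ///
    /// A minimal usage sketch (added for illustration; the path is an assumption, and the
    /// accessors are assumed from the public `DatabaseStats` API):
    /// ```no_run
    /// use redb::{Database, Error};
    ///
    /// fn main() -> Result<(), Error> {
    ///     let db = Database::create("my_db.redb")?;
    ///     let write_txn = db.begin_write()?;
    ///     let stats = write_txn.stats()?;
    ///     println!("page size: {} bytes", stats.page_size());
    ///     println!("stored data: {} bytes", stats.stored_bytes());
    ///     Ok(())
    /// }
    /// ```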
1933    pub fn stats(&self) -> Result<DatabaseStats> {
1934        let tables = self.tables.lock().unwrap();
1935        let table_tree = &tables.table_tree;
1936        let data_tree_stats = table_tree.stats()?;
1937
1938        let system_tables = self.system_tables.lock().unwrap();
1939        let system_table_tree = &system_tables.table_tree;
1940        let system_tree_stats = system_table_tree.stats()?;
1941
1942        let total_metadata_bytes = data_tree_stats.metadata_bytes()
1943            + system_tree_stats.metadata_bytes
1944            + system_tree_stats.stored_leaf_bytes;
1945        let total_fragmented = data_tree_stats.fragmented_bytes()
1946            + system_tree_stats.fragmented_bytes
1947            + self.mem.count_free_pages()? * (self.mem.get_page_size() as u64);
1948
1949        Ok(DatabaseStats {
1950            tree_height: data_tree_stats.tree_height(),
1951            allocated_pages: self.mem.count_allocated_pages()?,
1952            leaf_pages: data_tree_stats.leaf_pages(),
1953            branch_pages: data_tree_stats.branch_pages(),
1954            stored_leaf_bytes: data_tree_stats.stored_bytes(),
1955            metadata_bytes: total_metadata_bytes,
1956            fragmented_bytes: total_fragmented,
1957            page_size: self.mem.get_page_size(),
1958        })
1959    }
1960
1961    #[allow(dead_code)]
1962    pub(crate) fn print_debug(&self) -> Result {
1963        // Flush any pending updates to make sure we get the latest root
1964        let mut tables = self.tables.lock().unwrap();
1965        if let Some(page) = tables
1966            .table_tree
1967            .flush_table_root_updates()
1968            .unwrap()
1969            .finalize_dirty_checksums()
1970            .unwrap()
1971        {
1972            eprintln!("Master tree:");
1973            let master_tree: Btree<&str, InternalTableDefinition> = Btree::new(
1974                Some(page),
1975                PageHint::None,
1976                self.transaction_guard.clone(),
1977                self.mem.clone(),
1978            )?;
1979            master_tree.print_debug(true)?;
1980        }
1981
1982        // Flush any pending updates to make sure we get the latest root
1983        let mut system_tables = self.system_tables.lock().unwrap();
1984        if let Some(page) = system_tables
1985            .table_tree
1986            .flush_table_root_updates()
1987            .unwrap()
1988            .finalize_dirty_checksums()
1989            .unwrap()
1990        {
1991            eprintln!("System tree:");
1992            let system_tree: Btree<&str, InternalTableDefinition> = Btree::new(
1993                Some(page),
1994                PageHint::None,
1995                self.transaction_guard.clone(),
1996                self.mem.clone(),
1997            )?;
1998            system_tree.print_debug(true)?;
1999        }
2000
2001        Ok(())
2002    }
2003}
2004
2005impl Drop for WriteTransaction {
2006    fn drop(&mut self) {
2007        if !self.completed && !thread::panicking() && !self.mem.storage_failure() {
2008            #[allow(unused_variables)]
2009            if let Err(error) = self.abort_inner() {
2010                #[cfg(feature = "logging")]
2011                warn!("Failure automatically aborting transaction: {error}");
2012            }
2013        } else if !self.completed && self.mem.storage_failure() {
2014            self.tables
2015                .lock()
2016                .unwrap()
2017                .table_tree
2018                .clear_root_updates_and_close();
2019        }
2020    }
2021}
2022
2023/// A read-only transaction
2024///
2025/// Read-only transactions may exist concurrently with writes
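///
/// A minimal sketch of the concurrency this allows (added for illustration; the path is an
/// assumption):
/// ```no_run
/// use redb::{Database, Error};
///
/// fn main() -> Result<(), Error> {
///     let db = Database::create("my_db.redb")?;
///     let read_txn = db.begin_read()?;
///     // A write transaction can proceed while the read transaction is still open;
///     // the reader keeps seeing the state from when it began.
///     let write_txn = db.begin_write()?;
///     write_txn.commit()?;
///     drop(read_txn);
///     Ok(())
/// }
/// ```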
2026pub struct ReadTransaction {
2027    mem: Arc<TransactionalMemory>,
2028    tree: TableTree,
2029}
2030
2031impl ReadTransaction {
2032    pub(crate) fn new(
2033        mem: Arc<TransactionalMemory>,
2034        guard: TransactionGuard,
2035    ) -> Result<Self, TransactionError> {
2036        let root_page = mem.get_data_root();
2037        let guard = Arc::new(guard);
2038        Ok(Self {
2039            mem: mem.clone(),
2040            tree: TableTree::new(root_page, PageHint::Clean, guard, mem)
2041                .map_err(TransactionError::Storage)?,
2042        })
2043    }
2044
2045    /// Open the given table
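    ///
    /// Illustrative sketch (the path, table name, and key are assumptions):
    /// ```no_run
    /// use redb::{Database, Error, ReadableTable, TableDefinition};
    ///
    /// const TABLE: TableDefinition<&str, u64> = TableDefinition::new("my_data");
    ///
    /// fn main() -> Result<(), Error> {
    ///     let db = Database::open("my_db.redb")?;
    ///     let read_txn = db.begin_read()?;
    ///     let table = read_txn.open_table(TABLE)?;
    ///     if let Some(value) = table.get("my_key")? {
    ///         println!("my_key = {}", value.value());
    ///     }
    ///     Ok(())
    /// }
    /// ```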
2046    pub fn open_table<K: Key + 'static, V: Value + 'static>(
2047        &self,
2048        definition: TableDefinition<K, V>,
2049    ) -> Result<ReadOnlyTable<K, V>, TableError> {
2050        let header = self
2051            .tree
2052            .get_table::<K, V>(definition.name(), TableType::Normal)?
2053            .ok_or_else(|| TableError::TableDoesNotExist(definition.name().to_string()))?;
2054
2055        match header {
2056            InternalTableDefinition::Normal { table_root, .. } => Ok(ReadOnlyTable::new(
2057                definition.name().to_string(),
2058                table_root,
2059                PageHint::Clean,
2060                self.tree.transaction_guard().clone(),
2061                self.mem.clone(),
2062            )?),
2063            InternalTableDefinition::Multimap { .. } => unreachable!(),
2064        }
2065    }
2066
2067    /// Open the given table without a type
2068    pub fn open_untyped_table(
2069        &self,
2070        handle: impl TableHandle,
2071    ) -> Result<ReadOnlyUntypedTable, TableError> {
2072        let header = self
2073            .tree
2074            .get_table_untyped(handle.name(), TableType::Normal)?
2075            .ok_or_else(|| TableError::TableDoesNotExist(handle.name().to_string()))?;
2076
2077        match header {
2078            InternalTableDefinition::Normal {
2079                table_root,
2080                fixed_key_size,
2081                fixed_value_size,
2082                ..
2083            } => Ok(ReadOnlyUntypedTable::new(
2084                table_root,
2085                fixed_key_size,
2086                fixed_value_size,
2087                self.mem.clone(),
2088            )),
2089            InternalTableDefinition::Multimap { .. } => unreachable!(),
2090        }
2091    }
2092
2093    /// Open the given multimap table
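    ///
    /// Illustrative sketch (the path, table name, and key are assumptions):
    /// ```no_run
    /// use redb::{Database, Error, MultimapTableDefinition, ReadableMultimapTable};
    ///
    /// const TABLE: MultimapTableDefinition<&str, u64> = MultimapTableDefinition::new("my_multimap");
    ///
    /// fn main() -> Result<(), Error> {
    ///     let db = Database::open("my_db.redb")?;
    ///     let read_txn = db.begin_read()?;
    ///     let table = read_txn.open_multimap_table(TABLE)?;
    ///     // Iterate over every value stored under one key
    ///     for value in table.get("my_key")? {
    ///         println!("value = {}", value?.value());
    ///     }
    ///     Ok(())
    /// }
    /// ```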
2094    pub fn open_multimap_table<K: Key + 'static, V: Key + 'static>(
2095        &self,
2096        definition: MultimapTableDefinition<K, V>,
2097    ) -> Result<ReadOnlyMultimapTable<K, V>, TableError> {
2098        let header = self
2099            .tree
2100            .get_table::<K, V>(definition.name(), TableType::Multimap)?
2101            .ok_or_else(|| TableError::TableDoesNotExist(definition.name().to_string()))?;
2102
2103        match header {
2104            InternalTableDefinition::Normal { .. } => unreachable!(),
2105            InternalTableDefinition::Multimap {
2106                table_root,
2107                table_length,
2108                ..
2109            } => Ok(ReadOnlyMultimapTable::new(
2110                table_root,
2111                table_length,
2112                PageHint::Clean,
2113                self.tree.transaction_guard().clone(),
2114                self.mem.clone(),
2115            )?),
2116        }
2117    }
2118
2119    /// Open the given multimap table without a type
2120    pub fn open_untyped_multimap_table(
2121        &self,
2122        handle: impl MultimapTableHandle,
2123    ) -> Result<ReadOnlyUntypedMultimapTable, TableError> {
2124        let header = self
2125            .tree
2126            .get_table_untyped(handle.name(), TableType::Multimap)?
2127            .ok_or_else(|| TableError::TableDoesNotExist(handle.name().to_string()))?;
2128
2129        match header {
2130            InternalTableDefinition::Normal { .. } => unreachable!(),
2131            InternalTableDefinition::Multimap {
2132                table_root,
2133                table_length,
2134                fixed_key_size,
2135                fixed_value_size,
2136                ..
2137            } => Ok(ReadOnlyUntypedMultimapTable::new(
2138                table_root,
2139                table_length,
2140                fixed_key_size,
2141                fixed_value_size,
2142                self.mem.clone(),
2143            )),
2144        }
2145    }
2146
2147    /// List all the tables
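    ///
    /// Illustrative sketch (the path is an assumption):
    /// ```no_run
    /// use redb::{Database, Error, TableHandle};
    ///
    /// fn main() -> Result<(), Error> {
    ///     let db = Database::open("my_db.redb")?;
    ///     let read_txn = db.begin_read()?;
    ///     for handle in read_txn.list_tables()? {
    ///         println!("table: {}", handle.name());
    ///     }
    ///     Ok(())
    /// }
    /// ```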
2148    pub fn list_tables(&self) -> Result<impl Iterator<Item = UntypedTableHandle>> {
2149        self.tree
2150            .list_tables(TableType::Normal)
2151            .map(|x| x.into_iter().map(UntypedTableHandle::new))
2152    }
2153
2154    /// List all the multimap tables
2155    pub fn list_multimap_tables(&self) -> Result<impl Iterator<Item = UntypedMultimapTableHandle>> {
2156        self.tree
2157            .list_tables(TableType::Multimap)
2158            .map(|x| x.into_iter().map(UntypedMultimapTableHandle::new))
2159    }
2160
2161    /// Close the transaction
2162    ///
2163    /// Transactions are automatically closed when they and all objects referencing them have been dropped,
2164    /// so this method does not normally need to be called.
2165    /// This method can be used to ensure that there are no outstanding objects remaining.
2166    ///
2167    /// Returns a `ReadTransactionStillInUse` error if a table or other object retrieved from the transaction still references this transaction
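    ///
    /// Illustrative sketch of the failure case (the path and table definition are assumptions):
    /// ```no_run
    /// use redb::{Database, Error, TableDefinition, TransactionError};
    ///
    /// const TABLE: TableDefinition<&str, u64> = TableDefinition::new("my_data");
    ///
    /// fn main() -> Result<(), Error> {
    ///     let db = Database::open("my_db.redb")?;
    ///     let read_txn = db.begin_read()?;
    ///     let _table = read_txn.open_table(TABLE)?;
    ///     // Closing fails while `_table` still references the transaction
    ///     assert!(matches!(
    ///         read_txn.close(),
    ///         Err(TransactionError::ReadTransactionStillInUse(_))
    ///     ));
    ///     Ok(())
    /// }
    /// ```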
2168    pub fn close(self) -> Result<(), TransactionError> {
2169        if Arc::strong_count(self.tree.transaction_guard()) > 1 {
2170            return Err(TransactionError::ReadTransactionStillInUse(Box::new(self)));
2171        }
2172        // No-op, just drop ourself
2173        Ok(())
2174    }
2175}
2176
2177impl Debug for ReadTransaction {
2178    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2179        f.write_str("ReadTransaction")
2180    }
2181}
2182
2183#[cfg(test)]
2184mod test {
2185    use crate::{Database, TableDefinition};
2186
2187    const X: TableDefinition<&str, &str> = TableDefinition::new("x");
2188
2189    #[test]
2190    fn transaction_id_persistence() {
2191        let tmpfile = crate::create_tempfile();
2192        let db = Database::create(tmpfile.path()).unwrap();
2193        let write_txn = db.begin_write().unwrap();
2194        {
2195            let mut table = write_txn.open_table(X).unwrap();
2196            table.insert("hello", "world").unwrap();
2197        }
2198        let first_txn_id = write_txn.transaction_id;
2199        write_txn.commit().unwrap();
2200        drop(db);
2201
2202        let db2 = Database::create(tmpfile.path()).unwrap();
2203        let write_txn = db2.begin_write().unwrap();
2204        assert!(write_txn.transaction_id > first_txn_id);
2205    }
2206}