redb/
transactions.rs

use crate::db::TransactionGuard;
use crate::error::CommitError;
use crate::multimap_table::ReadOnlyUntypedMultimapTable;
use crate::sealed::Sealed;
use crate::table::ReadOnlyUntypedTable;
use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker};
use crate::tree_store::{
    Btree, BtreeHeader, BtreeMut, BuddyAllocator, FreedPageList, FreedTableKey,
    InternalTableDefinition, Page, PageHint, PageNumber, SerializedSavepoint, TableTree,
    TableTreeMut, TableType, TransactionalMemory, MAX_PAIR_LENGTH, MAX_VALUE_LENGTH,
};
use crate::types::{Key, Value};
use crate::{
    AccessGuard, MultimapTable, MultimapTableDefinition, MultimapTableHandle, Range,
    ReadOnlyMultimapTable, ReadOnlyTable, Result, Savepoint, SavepointError, StorageError, Table,
    TableDefinition, TableError, TableHandle, TransactionError, TypeName,
    UntypedMultimapTableHandle, UntypedTableHandle,
};
#[cfg(feature = "logging")]
use log::{debug, warn};
use std::borrow::Borrow;
use std::cmp::min;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Display, Formatter};
use std::marker::PhantomData;
use std::mem::size_of;
use std::ops::RangeBounds;
use std::ops::RangeFull;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::{panic, thread};

const MAX_PAGES_PER_COMPACTION: usize = 1_000_000;
const NEXT_SAVEPOINT_TABLE: SystemTableDefinition<(), SavepointId> =
    SystemTableDefinition::new("next_savepoint_id");
pub(crate) const SAVEPOINT_TABLE: SystemTableDefinition<SavepointId, SerializedSavepoint> =
    SystemTableDefinition::new("persistent_savepoints");
// The allocator state table is stored in the system table tree, but it's accessed using
// raw btree operations rather than open_system_table(), so there's no SystemTableDefinition
pub(crate) const ALLOCATOR_STATE_TABLE_NAME: &str = "allocator_state";
pub(crate) type AllocatorStateTree<'a> = BtreeMut<'a, AllocatorStateKey, &'static [u8]>;

#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)]
pub(crate) enum AllocatorStateKey {
    Region(u32),
    RegionTracker,
    TransactionId,
}

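// Note on the encoding below: each key serializes to a fixed 5 bytes, a 1-byte variant tag
// followed by a little-endian u32. For example, AllocatorStateKey::Region(5) becomes
// [0, 5, 0, 0, 0]; RegionTracker and TransactionId use tags 1 and 2 and leave the remaining
// bytes zeroed.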
impl Value for AllocatorStateKey {
    type SelfType<'a> = Self;
    type AsBytes<'a> = [u8; 1 + size_of::<u32>()];

    fn fixed_width() -> Option<usize> {
        Some(1 + size_of::<u32>())
    }

    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
    where
        Self: 'a,
    {
        match data[0] {
            0 => Self::Region(u32::from_le_bytes(data[1..].try_into().unwrap())),
            1 => Self::RegionTracker,
            2 => Self::TransactionId,
            _ => unreachable!(),
        }
    }

    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
    where
        Self: 'a,
        Self: 'b,
    {
        let mut result = Self::AsBytes::default();
        match value {
            Self::Region(region) => {
                result[0] = 0;
                result[1..].copy_from_slice(&u32::to_le_bytes(*region));
            }
            Self::RegionTracker => {
                result[0] = 1;
            }
            Self::TransactionId => {
                result[0] = 2;
            }
        }

        result
    }

    fn type_name() -> TypeName {
        TypeName::internal("redb::AllocatorStateKey")
    }
}

impl Key for AllocatorStateKey {
    fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering {
        Self::from_bytes(data1).cmp(&Self::from_bytes(data2))
    }
}

pub struct SystemTableDefinition<'a, K: Key + 'static, V: Value + 'static> {
    name: &'a str,
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}

impl<'a, K: Key + 'static, V: Value + 'static> SystemTableDefinition<'a, K, V> {
    pub const fn new(name: &'a str) -> Self {
        assert!(!name.is_empty());
        Self {
            name,
            _key_type: PhantomData,
            _value_type: PhantomData,
        }
    }
}

impl<'a, K: Key + 'static, V: Value + 'static> TableHandle for SystemTableDefinition<'a, K, V> {
    fn name(&self) -> &str {
        self.name
    }
}

impl<K: Key, V: Value> Sealed for SystemTableDefinition<'_, K, V> {}

impl<'a, K: Key + 'static, V: Value + 'static> Clone for SystemTableDefinition<'a, K, V> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<'a, K: Key + 'static, V: Value + 'static> Copy for SystemTableDefinition<'a, K, V> {}

impl<'a, K: Key + 'static, V: Value + 'static> Display for SystemTableDefinition<'a, K, V> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{}<{}, {}>",
            self.name,
            K::type_name().name(),
            V::type_name().name()
        )
    }
}

/// Informational storage stats about the database
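///
/// A small sketch of how these figures relate, given a `DatabaseStats` value `stats`
/// (however it was obtained); the names below are illustrative:
///
/// ```no_run
/// # fn print_stats(stats: &redb::DatabaseStats) {
/// // Total bytes backed by allocated pages
/// let allocated_bytes = stats.allocated_pages() * stats.page_size() as u64;
/// println!(
///     "user data: {} B, metadata: {} B, fragmentation: {} B, allocated: {} B",
///     stats.stored_bytes(),
///     stats.metadata_bytes(),
///     stats.fragmented_bytes(),
///     allocated_bytes,
/// );
/// # }
/// ```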
#[derive(Debug)]
pub struct DatabaseStats {
    pub(crate) tree_height: u32,
    pub(crate) allocated_pages: u64,
    pub(crate) leaf_pages: u64,
    pub(crate) branch_pages: u64,
    pub(crate) stored_leaf_bytes: u64,
    pub(crate) metadata_bytes: u64,
    pub(crate) fragmented_bytes: u64,
    pub(crate) page_size: usize,
}

impl DatabaseStats {
    /// Maximum traversal distance to reach the deepest (key, value) pair, across all tables
    pub fn tree_height(&self) -> u32 {
        self.tree_height
    }

    /// Number of pages allocated
    pub fn allocated_pages(&self) -> u64 {
        self.allocated_pages
    }

    /// Number of leaf pages that store user data
    pub fn leaf_pages(&self) -> u64 {
        self.leaf_pages
    }

    /// Number of branch pages in btrees that store user data
    pub fn branch_pages(&self) -> u64 {
        self.branch_pages
    }

    /// Number of bytes consumed by keys and values that have been inserted.
    /// Does not include indexing overhead
    pub fn stored_bytes(&self) -> u64 {
        self.stored_leaf_bytes
    }

    /// Number of bytes consumed by keys in internal branch pages, plus other metadata
    pub fn metadata_bytes(&self) -> u64 {
        self.metadata_bytes
    }

    /// Number of bytes consumed by fragmentation, both in data pages and internal metadata tables
    pub fn fragmented_bytes(&self) -> u64 {
        self.fragmented_bytes
    }

    /// Number of bytes per page
    pub fn page_size(&self) -> usize {
        self.page_size
    }
}

#[derive(Copy, Clone, Debug)]
#[non_exhaustive]
pub enum Durability {
    /// Commits with this durability level will not be persisted to disk unless followed by a
    /// commit with a higher durability level.
    ///
    /// Note: Pages are only freed during commits with higher durability levels. Exclusively using
    /// this durability level will result in rapid growth of the database file.
    None,
    /// Commits with this durability level have been queued for persistence to disk, and should be
    /// persistent some time after [`WriteTransaction::commit`] returns.
    Eventual,
    /// Commits with this durability level are guaranteed to be persistent as soon as
    /// [`WriteTransaction::commit`] returns.
    Immediate,
    /// This is identical to `Durability::Immediate`, but also enables 2-phase commit. New code
    /// should call `set_two_phase_commit(true)` directly instead.
    #[deprecated(since = "2.3.0", note = "use set_two_phase_commit(true) instead")]
    Paranoid,
}

// These are the actual durability levels used internally. `Durability::Paranoid` is translated
// to `InternalDurability::Immediate`, and also enables 2-phase commit
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum InternalDurability {
    None,
    Eventual,
    Immediate,
}

// Like a Table but only one may be open at a time to avoid possible races
pub struct SystemTable<'db, 's, K: Key + 'static, V: Value + 'static> {
    name: String,
    namespace: &'s mut SystemNamespace<'db>,
    tree: BtreeMut<'s, K, V>,
    transaction_guard: Arc<TransactionGuard>,
}

impl<'db, 's, K: Key + 'static, V: Value + 'static> SystemTable<'db, 's, K, V> {
    fn new(
        name: &str,
        table_root: Option<BtreeHeader>,
        freed_pages: Arc<Mutex<Vec<PageNumber>>>,
        guard: Arc<TransactionGuard>,
        mem: Arc<TransactionalMemory>,
        namespace: &'s mut SystemNamespace<'db>,
    ) -> SystemTable<'db, 's, K, V> {
        SystemTable {
            name: name.to_string(),
            namespace,
            tree: BtreeMut::new(table_root, guard.clone(), mem, freed_pages),
            transaction_guard: guard,
        }
    }

    fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<Option<AccessGuard<V>>>
    where
        K: 'a,
    {
        self.tree.get(key.borrow())
    }

    fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<Range<K, V>>
    where
        K: 'a,
        KR: Borrow<K::SelfType<'a>> + 'a,
    {
        self.tree
            .range(&range)
            .map(|x| Range::new(x, self.transaction_guard.clone()))
    }

    pub fn insert<'k, 'v>(
        &mut self,
        key: impl Borrow<K::SelfType<'k>>,
        value: impl Borrow<V::SelfType<'v>>,
    ) -> Result<Option<AccessGuard<V>>> {
        let value_len = V::as_bytes(value.borrow()).as_ref().len();
        if value_len > MAX_VALUE_LENGTH {
            return Err(StorageError::ValueTooLarge(value_len));
        }
        let key_len = K::as_bytes(key.borrow()).as_ref().len();
        if key_len > MAX_VALUE_LENGTH {
            return Err(StorageError::ValueTooLarge(key_len));
        }
        if value_len + key_len > MAX_PAIR_LENGTH {
            return Err(StorageError::ValueTooLarge(value_len + key_len));
        }
        self.tree.insert(key.borrow(), value.borrow())
    }

    pub fn remove<'a>(
        &mut self,
        key: impl Borrow<K::SelfType<'a>>,
    ) -> Result<Option<AccessGuard<V>>>
    where
        K: 'a,
    {
        self.tree.remove(key.borrow())
    }
}

impl<'db, 's, K: Key + 'static, V: Value + 'static> Drop for SystemTable<'db, 's, K, V> {
    fn drop(&mut self) {
        self.namespace.close_table(
            &self.name,
            &self.tree,
            self.tree.get_root().map(|x| x.length).unwrap_or_default(),
        );
    }
}

struct SystemNamespace<'db> {
    table_tree: TableTreeMut<'db>,
    transaction_guard: Arc<TransactionGuard>,
}

impl<'db> SystemNamespace<'db> {
    fn open_system_table<'txn, 's, K: Key + 'static, V: Value + 'static>(
        &'s mut self,
        transaction: &'txn WriteTransaction,
        definition: SystemTableDefinition<K, V>,
    ) -> Result<SystemTable<'db, 's, K, V>> {
        #[cfg(feature = "logging")]
        debug!("Opening system table: {}", definition);
        let (root, _) = self
            .table_tree
            .get_or_create_table::<K, V>(definition.name(), TableType::Normal)
            .map_err(|e| {
                e.into_storage_error_or_corrupted("Internal error. System table is corrupted")
            })?;
        transaction.dirty.store(true, Ordering::Release);

        Ok(SystemTable::new(
            definition.name(),
            root,
            transaction.freed_pages.clone(),
            self.transaction_guard.clone(),
            transaction.mem.clone(),
            self,
        ))
    }

    fn close_table<K: Key + 'static, V: Value + 'static>(
        &mut self,
        name: &str,
        table: &BtreeMut<K, V>,
        length: u64,
    ) {
        self.table_tree
            .stage_update_table_root(name, table.get_root(), length);
    }
}

struct TableNamespace<'db> {
    open_tables: HashMap<String, &'static panic::Location<'static>>,
    table_tree: TableTreeMut<'db>,
}

impl<'db> TableNamespace<'db> {
    #[track_caller]
    fn inner_open<K: Key + 'static, V: Value + 'static>(
        &mut self,
        name: &str,
        table_type: TableType,
    ) -> Result<(Option<BtreeHeader>, u64), TableError> {
        if let Some(location) = self.open_tables.get(name) {
            return Err(TableError::TableAlreadyOpen(name.to_string(), location));
        }

        let root = self
            .table_tree
            .get_or_create_table::<K, V>(name, table_type)?;
        self.open_tables
            .insert(name.to_string(), panic::Location::caller());

        Ok(root)
    }

    #[track_caller]
    pub fn open_multimap_table<'txn, K: Key + 'static, V: Key + 'static>(
        &mut self,
        transaction: &'txn WriteTransaction,
        definition: MultimapTableDefinition<K, V>,
    ) -> Result<MultimapTable<'txn, K, V>, TableError> {
        #[cfg(feature = "logging")]
        debug!("Opening multimap table: {}", definition);
        let (root, length) = self.inner_open::<K, V>(definition.name(), TableType::Multimap)?;
        transaction.dirty.store(true, Ordering::Release);

        Ok(MultimapTable::new(
            definition.name(),
            root,
            length,
            transaction.freed_pages.clone(),
            transaction.mem.clone(),
            transaction,
        ))
    }

    #[track_caller]
    pub fn open_table<'txn, K: Key + 'static, V: Value + 'static>(
        &mut self,
        transaction: &'txn WriteTransaction,
        definition: TableDefinition<K, V>,
    ) -> Result<Table<'txn, K, V>, TableError> {
        #[cfg(feature = "logging")]
        debug!("Opening table: {}", definition);
        let (root, _) = self.inner_open::<K, V>(definition.name(), TableType::Normal)?;
        transaction.dirty.store(true, Ordering::Release);

        Ok(Table::new(
            definition.name(),
            root,
            transaction.freed_pages.clone(),
            transaction.mem.clone(),
            transaction,
        ))
    }

    #[track_caller]
    fn inner_delete(&mut self, name: &str, table_type: TableType) -> Result<bool, TableError> {
        if let Some(location) = self.open_tables.get(name) {
            return Err(TableError::TableAlreadyOpen(name.to_string(), location));
        }

        self.table_tree.delete_table(name, table_type)
    }

    #[track_caller]
    fn delete_table(
        &mut self,
        transaction: &WriteTransaction,
        name: &str,
    ) -> Result<bool, TableError> {
        #[cfg(feature = "logging")]
        debug!("Deleting table: {}", name);
        transaction.dirty.store(true, Ordering::Release);
        self.inner_delete(name, TableType::Normal)
    }

    #[track_caller]
    fn delete_multimap_table(
        &mut self,
        transaction: &WriteTransaction,
        name: &str,
    ) -> Result<bool, TableError> {
        #[cfg(feature = "logging")]
        debug!("Deleting multimap table: {}", name);
        transaction.dirty.store(true, Ordering::Release);
        self.inner_delete(name, TableType::Multimap)
    }

    pub(crate) fn close_table<K: Key + 'static, V: Value + 'static>(
        &mut self,
        name: &str,
        table: &BtreeMut<K, V>,
        length: u64,
    ) {
        self.open_tables.remove(name).unwrap();
        self.table_tree
            .stage_update_table_root(name, table.get_root(), length);
    }
}

/// A read/write transaction
///
/// Only a single [`WriteTransaction`] may exist at a time
pub struct WriteTransaction {
    transaction_tracker: Arc<TransactionTracker>,
    mem: Arc<TransactionalMemory>,
    transaction_guard: Arc<TransactionGuard>,
    transaction_id: TransactionId,
    // The table of freed pages by transaction. FreedTableKey -> binary.
    // The binary blob is a length-prefixed array of PageNumber
    freed_tree: Mutex<BtreeMut<'static, FreedTableKey, FreedPageList<'static>>>,
    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
    // Pages that were freed from the freed-tree. These can be freed immediately after commit(),
    // since read transactions do not access the freed-tree
    post_commit_frees: Arc<Mutex<Vec<PageNumber>>>,
    tables: Mutex<TableNamespace<'static>>,
    system_tables: Mutex<SystemNamespace<'static>>,
    completed: bool,
    dirty: AtomicBool,
    durability: InternalDurability,
    two_phase_commit: bool,
    quick_repair: bool,
    // Persistent savepoints created during this transaction
    created_persistent_savepoints: Mutex<HashSet<SavepointId>>,
    deleted_persistent_savepoints: Mutex<Vec<(SavepointId, TransactionId)>>,
}

impl WriteTransaction {
    pub(crate) fn new(
        guard: TransactionGuard,
        transaction_tracker: Arc<TransactionTracker>,
        mem: Arc<TransactionalMemory>,
    ) -> Result<Self> {
        let transaction_id = guard.id();
        let guard = Arc::new(guard);

        let root_page = mem.get_data_root();
        let system_page = mem.get_system_root();
        let freed_root = mem.get_freed_root();
        let freed_pages = Arc::new(Mutex::new(vec![]));
        let post_commit_frees = Arc::new(Mutex::new(vec![]));

        let tables = TableNamespace {
            open_tables: Default::default(),
            table_tree: TableTreeMut::new(
                root_page,
                guard.clone(),
                mem.clone(),
                freed_pages.clone(),
            ),
        };
        let system_tables = SystemNamespace {
            table_tree: TableTreeMut::new(
                system_page,
                guard.clone(),
                mem.clone(),
                freed_pages.clone(),
            ),
            transaction_guard: guard.clone(),
        };

        Ok(Self {
            transaction_tracker,
            mem: mem.clone(),
            transaction_guard: guard.clone(),
            transaction_id,
            tables: Mutex::new(tables),
            system_tables: Mutex::new(system_tables),
            freed_tree: Mutex::new(BtreeMut::new(
                freed_root,
                guard,
                mem,
                post_commit_frees.clone(),
            )),
            freed_pages,
            post_commit_frees,
            completed: false,
            dirty: AtomicBool::new(false),
            durability: InternalDurability::Immediate,
            two_phase_commit: false,
            quick_repair: false,
            created_persistent_savepoints: Mutex::new(Default::default()),
            deleted_persistent_savepoints: Mutex::new(vec![]),
        })
    }

    #[cfg(any(test, fuzzing))]
    pub fn print_allocated_page_debug(&self) {
        let mut all_allocated: HashSet<PageNumber> =
            HashSet::from_iter(self.mem.all_allocated_pages());

        let tracker = self.mem.tracker_page();
        all_allocated.remove(&tracker);
        println!("Tracker page");
        println!("{tracker:?}");

        let mut table_pages = vec![];
        self.tables
            .lock()
            .unwrap()
            .table_tree
            .visit_all_pages(|path| {
                table_pages.push(path.page_number());
                Ok(())
            })
            .unwrap();
        println!("Tables");
        for p in table_pages {
            all_allocated.remove(&p);
            println!("{p:?}");
        }

        let mut system_table_pages = vec![];
        self.system_tables
            .lock()
            .unwrap()
            .table_tree
            .visit_all_pages(|path| {
                system_table_pages.push(path.page_number());
                Ok(())
            })
            .unwrap();
        println!("System tables");
        for p in system_table_pages {
            all_allocated.remove(&p);
            println!("{p:?}");
        }

        println!("Free table");
        if let Some(freed_iter) = self.freed_tree.lock().unwrap().all_pages_iter().unwrap() {
            for p in freed_iter {
                let p = p.unwrap();
                all_allocated.remove(&p);
                println!("{p:?}");
            }
        }
        println!("Pending free (i.e. in freed table)");
        for entry in self
            .freed_tree
            .lock()
            .unwrap()
            .range::<RangeFull, FreedTableKey>(&(..))
            .unwrap()
        {
            let entry = entry.unwrap();
            let value = entry.value();
            for i in 0..value.len() {
                let p = value.get(i);
                all_allocated.remove(&p);
                println!("{p:?}");
            }
        }
        {
            let pages = self.freed_pages.lock().unwrap();
            if !pages.is_empty() {
                println!("Pages in in-memory freed_pages");
                for p in pages.iter() {
                    println!("{p:?}");
                    all_allocated.remove(p);
                }
            }
        }
        {
            let pages = self.post_commit_frees.lock().unwrap();
            if !pages.is_empty() {
                println!("Pages in in-memory post_commit_frees");
                for p in pages.iter() {
                    println!("{p:?}");
                    all_allocated.remove(p);
                }
            }
        }
        if !all_allocated.is_empty() {
            println!("Leaked pages");
            for p in all_allocated {
                println!("{p:?}");
            }
        }
    }

    /// Creates a snapshot of the current database state, which can be used to rollback the database.
    /// This savepoint will exist until it is deleted with [`Self::delete_persistent_savepoint()`].
    ///
    /// Note that while a savepoint exists, pages that become unused after it was created are not freed.
    /// Therefore, the lifetime of a savepoint should be minimized.
    ///
    /// Returns [`SavepointError::InvalidSavepoint`] if the transaction is "dirty" (any tables have been opened)
    /// or if the transaction's durability is less than [`Durability::Immediate`]
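    ///
    /// A sketch of the typical flow (the database path and error handling are illustrative):
    /// create the savepoint in one transaction, then retrieve and restore it from a later one.
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let db = redb::Database::create("example.redb")?;
    /// // Must be called before any tables are opened in this transaction
    /// let txn = db.begin_write()?;
    /// let savepoint_id = txn.persistent_savepoint()?;
    /// txn.commit()?;
    ///
    /// // ...later, possibly after reopening the database...
    /// let mut txn = db.begin_write()?;
    /// let savepoint = txn.get_persistent_savepoint(savepoint_id)?;
    /// txn.restore_savepoint(&savepoint)?;
    /// txn.commit()?;
    /// # Ok(())
    /// # }
    /// ```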
    pub fn persistent_savepoint(&self) -> Result<u64, SavepointError> {
        if self.durability != InternalDurability::Immediate {
            return Err(SavepointError::InvalidSavepoint);
        }

        let mut savepoint = self.ephemeral_savepoint()?;

        let mut system_tables = self.system_tables.lock().unwrap();

        let mut next_table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?;
        next_table.insert((), savepoint.get_id().next())?;
        drop(next_table);

        let mut savepoint_table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
        savepoint_table.insert(
            savepoint.get_id(),
            SerializedSavepoint::from_savepoint(&savepoint),
        )?;

        savepoint.set_persistent();

        self.created_persistent_savepoints
            .lock()
            .unwrap()
            .insert(savepoint.get_id());

        Ok(savepoint.get_id().0)
    }

    pub(crate) fn transaction_guard(&self) -> Arc<TransactionGuard> {
        self.transaction_guard.clone()
    }

    pub(crate) fn next_persistent_savepoint_id(&self) -> Result<Option<SavepointId>> {
        let mut system_tables = self.system_tables.lock().unwrap();
        let next_table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?;
        let value = next_table.get(())?;
        if let Some(next_id) = value {
            Ok(Some(next_id.value()))
        } else {
            Ok(None)
        }
    }

    /// Get a persistent savepoint given its id
    pub fn get_persistent_savepoint(&self, id: u64) -> Result<Savepoint, SavepointError> {
        let mut system_tables = self.system_tables.lock().unwrap();
        let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
        let value = table.get(SavepointId(id))?;

        value
            .map(|x| x.value().to_savepoint(self.transaction_tracker.clone()))
            .ok_or(SavepointError::InvalidSavepoint)
    }

    /// Delete the given persistent savepoint.
    ///
    /// Note that if the transaction is `abort()`'ed this deletion will be rolled back.
    ///
    /// Returns `true` if the savepoint existed.
    /// Returns [`SavepointError::InvalidSavepoint`] if the transaction's durability is less than [`Durability::Immediate`]
    pub fn delete_persistent_savepoint(&self, id: u64) -> Result<bool, SavepointError> {
        if self.durability != InternalDurability::Immediate {
            return Err(SavepointError::InvalidSavepoint);
        }
        let mut system_tables = self.system_tables.lock().unwrap();
        let mut table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
        let savepoint = table.remove(SavepointId(id))?;
        if let Some(serialized) = savepoint {
            let savepoint = serialized
                .value()
                .to_savepoint(self.transaction_tracker.clone());
            self.deleted_persistent_savepoints
                .lock()
                .unwrap()
                .push((savepoint.get_id(), savepoint.get_transaction_id()));
            Ok(true)
        } else {
            Ok(false)
        }
    }

    /// List all persistent savepoints
    pub fn list_persistent_savepoints(&self) -> Result<impl Iterator<Item = u64>> {
        let mut system_tables = self.system_tables.lock().unwrap();
        let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
        let mut savepoints = vec![];
        for savepoint in table.range::<SavepointId>(..)? {
            savepoints.push(savepoint?.0.value().0);
        }
        Ok(savepoints.into_iter())
    }

    // TODO: deduplicate this with the one in Database
    fn allocate_read_transaction(&self) -> Result<TransactionGuard> {
        let id = self
            .transaction_tracker
            .register_read_transaction(&self.mem)?;

        Ok(TransactionGuard::new_read(
            id,
            self.transaction_tracker.clone(),
        ))
    }

    fn allocate_savepoint(&self) -> Result<(SavepointId, TransactionId)> {
        let id = self.transaction_tracker.allocate_savepoint();
        Ok((id, self.allocate_read_transaction()?.leak()))
    }

    /// Creates a snapshot of the current database state, which can be used to rollback the database
    ///
    /// This savepoint will be freed as soon as the returned [`Savepoint`] is dropped.
    ///
    /// Returns [`SavepointError::InvalidSavepoint`] if the transaction is "dirty" (any tables have been opened)
    pub fn ephemeral_savepoint(&self) -> Result<Savepoint, SavepointError> {
        if self.dirty.load(Ordering::Acquire) {
            return Err(SavepointError::InvalidSavepoint);
        }

        let (id, transaction_id) = self.allocate_savepoint()?;
        #[cfg(feature = "logging")]
        debug!(
            "Creating savepoint id={:?}, txn_id={:?}",
            id, transaction_id
        );

        let regional_allocators = self.mem.get_raw_allocator_states();
        let root = self.mem.get_data_root();
        let system_root = self.mem.get_system_root();
        let freed_root = self.mem.get_freed_root();
        let savepoint = Savepoint::new_ephemeral(
            &self.mem,
            self.transaction_tracker.clone(),
            id,
            transaction_id,
            root,
            system_root,
            freed_root,
            regional_allocators,
        );

        Ok(savepoint)
    }

    /// Restore the state of the database to the given [`Savepoint`]
    ///
    /// Calling this method invalidates all [`Savepoint`]s created after this savepoint
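    ///
    /// A sketch using an ephemeral savepoint to discard writes made later in the same
    /// transaction (the table definition and values are illustrative):
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # use redb::TableDefinition;
    /// # const TABLE: TableDefinition<&str, u64> = TableDefinition::new("data");
    /// # let db = redb::Database::create("example.redb")?;
    /// let mut txn = db.begin_write()?;
    /// // Savepoints can only be created before any tables have been opened
    /// let savepoint = txn.ephemeral_savepoint()?;
    /// {
    ///     let mut table = txn.open_table(TABLE)?;
    ///     table.insert("key", &1)?;
    /// }
    /// // Roll the transaction's view of the database back to the savepoint
    /// txn.restore_savepoint(&savepoint)?;
    /// txn.commit()?;
    /// # Ok(())
    /// # }
    /// ```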
    pub fn restore_savepoint(&mut self, savepoint: &Savepoint) -> Result<(), SavepointError> {
        // Ensure that user does not try to restore a Savepoint that is from a different Database
        assert_eq!(
            std::ptr::from_ref(self.transaction_tracker.as_ref()),
            savepoint.db_address()
        );

        if !self
            .transaction_tracker
            .is_valid_savepoint(savepoint.get_id())
        {
            return Err(SavepointError::InvalidSavepoint);
        }
        #[cfg(feature = "logging")]
        debug!(
            "Beginning savepoint restore (id={:?}) in transaction id={:?}",
            savepoint.get_id(),
            self.transaction_id
        );
        // Restoring a savepoint that reverted a file format or checksum type change could corrupt
        // the database
        assert_eq!(self.mem.get_version(), savepoint.get_version());
        self.dirty.store(true, Ordering::Release);

        // Restoring a savepoint needs to accomplish the following:
        // 1) restore the table tree. This is trivial, since we have the old root
        // 1a) we also filter the freed tree to remove any pages referenced by the old root
        // 2) free all pages that were allocated since the savepoint and are unreachable
        //    from the restored table tree root. Here we diff the reachable pages from the old
        //    and new roots
        // 3) update the system tree to remove invalid persistent savepoints.

        let old_system_tree = TableTree::new(
            savepoint.get_system_root(),
            PageHint::None,
            self.transaction_guard.clone(),
            self.mem.clone(),
        )?;
        let old_freed_tree: Btree<FreedTableKey, FreedPageList<'static>> = Btree::new(
            savepoint.get_freed_root(),
            PageHint::None,
            self.transaction_guard.clone(),
            self.mem.clone(),
        )?;

        // Pages which are part of the system and freed trees in the savepoint should be freed
        // even after the savepoint is restored, because the system and freed trees only roll
        // forward
        let mut old_system_and_freed_pages = HashSet::new();
        old_system_tree.visit_all_pages(|path| {
            old_system_and_freed_pages.insert(path.page_number());
            Ok(())
        })?;
        old_freed_tree.visit_all_pages(|path| {
            old_system_and_freed_pages.insert(path.page_number());
            Ok(())
        })?;

        // 1) restore the table tree
        {
            self.tables.lock().unwrap().table_tree = TableTreeMut::new(
                savepoint.get_user_root(),
                self.transaction_guard.clone(),
                self.mem.clone(),
                self.freed_pages.clone(),
            );
        }

        // 1a) purge all transactions that happened after the savepoint from freed tree,
        // except pages from the old system or freed tree in the savepoint. Those still need to be
        // freed, since the system tree only rolls forward, never back. This brings all pages in the
        // old data root back to the committed state
        // This operation will also leak everything else that was allocated since the savepoint,
        // but we fix that below -- noting that all the system trees that existed between the savepoint
        // and now which might be referenced by other savepoints will become unreachable, since those
        // savepoints are invalidated by this restoration
        let mut txn_id = savepoint.get_transaction_id().next().raw_id();
        let mut freed_tree = self.freed_tree.lock().unwrap();
        loop {
            let lower = FreedTableKey {
                transaction_id: txn_id,
                pagination_id: 0,
            };

            if freed_tree.range(&(lower..))?.next().is_none() {
                break;
            }
            let lower = FreedTableKey {
                transaction_id: txn_id,
                pagination_id: 0,
            };
            let upper = FreedTableKey {
                transaction_id: txn_id + 1,
                pagination_id: 0,
            };

            // Find all the pending pages for this txn and filter them
            let mut pending_pages = vec![];
            for entry in freed_tree.extract_from_if(&(lower..upper), |_, _| true)? {
                let item = entry?;
                for i in 0..item.value().len() {
                    let p = item.value().get(i);
                    // Keep the old system and freed tree pages, but purge anything else
                    if old_system_and_freed_pages.contains(&p) {
                        pending_pages.push(p);
                    }
                }
            }

            let mut pagination_counter = 0u64;
            while !pending_pages.is_empty() {
                let chunk_size = 100;
                let buffer_size = FreedPageList::required_bytes(chunk_size);
                let key = FreedTableKey {
                    transaction_id: txn_id,
                    pagination_id: pagination_counter,
                };
                let mut access_guard =
                    freed_tree.insert_reserve(&key, buffer_size.try_into().unwrap())?;

                let len = pending_pages.len();
                access_guard.as_mut().clear();
                for page in pending_pages.drain(len - min(len, chunk_size)..) {
                    access_guard.as_mut().push_back(page);
                }
                drop(access_guard);

                pagination_counter += 1;
            }

            txn_id += 1;
        }

        let mut current_system_and_freed_pages = HashSet::new();
        self.system_tables
            .lock()
            .unwrap()
            .table_tree
            .visit_all_pages(|path| {
                current_system_and_freed_pages.insert(path.page_number());
                Ok(())
            })?;
        freed_tree.visit_all_pages(|path| {
            current_system_and_freed_pages.insert(path.page_number());
            Ok(())
        })?;

        let mut old_allocators: Vec<BuddyAllocator> = savepoint
            .get_regional_allocators()
            .iter()
            .map(|data| BuddyAllocator::from_savepoint_state(data))
            .collect();

        // Find the oldest transaction in the current freed tree, for use below
        {
            let oldest_unprocessed_transaction =
                if let Some(entry) = freed_tree.range::<RangeFull, FreedTableKey>(&(..))?.next() {
                    entry?.key().transaction_id
                } else {
                    self.transaction_id.raw_id()
                };

            let lookup_key = FreedTableKey {
                transaction_id: oldest_unprocessed_transaction,
                pagination_id: 0,
            };

            // Replay all finalized frees into the old allocator state to ensure that a page which
            // was pending free, freed, and then reallocated does not leak
            for entry in old_freed_tree.range(&(..lookup_key))? {
                let item = entry?;
                let pages: FreedPageList = item.value();
                for i in 0..pages.len() {
                    let page = pages.get(i);
                    assert!(old_allocators[page.region as usize]
                        .is_allocated(page.page_index, page.page_order));
                    old_allocators[page.region as usize].free(page.page_index, page.page_order);
                }
            }
        }

        // 2) free all pages that became unreachable
        let mut freed_pages = self.freed_pages.lock().unwrap();
        let mut already_awaiting_free: HashSet<PageNumber> = freed_pages.iter().copied().collect();
        already_awaiting_free.extend(self.post_commit_frees.lock().unwrap().iter().copied());
        let to_free = self.mem.pages_allocated_since_raw_state(&old_allocators);
        for page in to_free {
            if already_awaiting_free.contains(&page) {
                // Make sure that we don't double free something that is already going to be freed
                continue;
            }
            if current_system_and_freed_pages.contains(&page) {
                // Don't free pages which are part of the current system or freed tree, even though
                // these pages are new. Again this is because these trees only move forward;
                // never backwards as part of a savepoint restore
                continue;
            }
            if self.mem.uncommitted(page) {
                self.mem.free(page);
            } else {
                freed_pages.push(page);
            }
        }
        drop(freed_pages);

        // 3) Invalidate all savepoints that are newer than the one being applied to prevent the user
        // from later trying to restore a savepoint "on another timeline"
        self.transaction_tracker
            .invalidate_savepoints_after(savepoint.get_id());
        for persistent_savepoint in self.list_persistent_savepoints()? {
            if persistent_savepoint > savepoint.get_id().0 {
                self.delete_persistent_savepoint(persistent_savepoint)?;
            }
        }

        Ok(())
    }

    /// Set the desired durability level for writes made in this transaction.
    /// Defaults to [`Durability::Immediate`]
    ///
    /// Will panic if the durability is reduced below [`Durability::Immediate`] after a persistent savepoint has been created or deleted.
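    ///
    /// A sketch of using reduced durability for bulk loading (the table name and batch count are
    /// illustrative); the final commit at the default [`Durability::Immediate`] level persists
    /// the earlier non-durable commits:
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// use redb::{Database, Durability, TableDefinition};
    ///
    /// const TABLE: TableDefinition<u64, u64> = TableDefinition::new("bulk");
    ///
    /// let db = Database::create("bulk.redb")?;
    /// for batch in 0..10u64 {
    ///     let mut txn = db.begin_write()?;
    ///     // Skip fsync for the intermediate batches
    ///     txn.set_durability(Durability::None);
    ///     {
    ///         let mut table = txn.open_table(TABLE)?;
    ///         table.insert(batch, &(batch * batch))?;
    ///     }
    ///     txn.commit()?;
    /// }
    /// // The default durability is Immediate, so this commit persists all prior writes
    /// db.begin_write()?.commit()?;
    /// # Ok(())
    /// # }
    /// ```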
    pub fn set_durability(&mut self, durability: Durability) {
        let no_created = self
            .created_persistent_savepoints
            .lock()
            .unwrap()
            .is_empty();
        let no_deleted = self
            .deleted_persistent_savepoints
            .lock()
            .unwrap()
            .is_empty();
        assert!(no_created && no_deleted);

        self.durability = match durability {
            Durability::None => InternalDurability::None,
            Durability::Eventual => InternalDurability::Eventual,
            Durability::Immediate => InternalDurability::Immediate,
            #[allow(deprecated)]
            Durability::Paranoid => {
                self.set_two_phase_commit(true);
                InternalDurability::Immediate
            }
        };
    }

    /// Enable or disable 2-phase commit (defaults to disabled)
    ///
    /// By default, data is written using the following 1-phase commit algorithm:
    ///
    /// 1. Update the inactive commit slot with the new database state
    /// 2. Flip the god byte primary bit to activate the newly updated commit slot
    /// 3. Call `fsync` to ensure all writes have been persisted to disk
    ///
    /// All data is written with checksums. When opening the database after a crash, the most
    /// recent of the two commit slots with a valid checksum is used.
    ///
    /// Security considerations: The checksum used is xxhash, a fast, non-cryptographic hash
    /// function with close to perfect collision resistance when used with non-malicious input. An
    /// attacker with an extremely high degree of control over the database's workload, including
    /// the ability to cause the database process to crash, can cause invalid data to be written
    /// with a valid checksum, leaving the database in an invalid, attacker-controlled state.
    ///
    /// Alternatively, you can enable 2-phase commit, which writes data like this:
    ///
    /// 1. Update the inactive commit slot with the new database state
    /// 2. Call `fsync` to ensure the database state and commit slot update have been persisted
    /// 3. Flip the god byte primary bit to activate the newly updated commit slot
    /// 4. Call `fsync` to ensure the write to the god byte has been persisted
    ///
    /// This mitigates a theoretical attack where an attacker who
    /// 1. can control the order in which pages are flushed to disk
    /// 2. can introduce crashes during `fsync`,
    /// 3. has knowledge of the database file contents, and
    /// 4. can include arbitrary data in a write transaction
    ///
    /// could cause a transaction to partially commit (some but not all of the data is written).
    /// This is described in the design doc in further detail.
    ///
    /// Security considerations: Many hard disk drives and SSDs do not actually guarantee that data
    /// has been persisted to disk after calling `fsync`. Even with 2-phase commit, an attacker with
    /// a high degree of control over the database's workload, including the ability to cause the
    /// database process to crash, can cause the database to crash with the god byte primary bit
    /// pointing to an invalid commit slot, leaving the database in an invalid, potentially attacker-
    /// controlled state.
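    ///
    /// A short usage sketch (the database handle is assumed to already exist):
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let db = redb::Database::create("example.redb")?;
    /// let mut txn = db.begin_write()?;
    /// // Accept an extra fsync per commit in exchange for the stronger 2-phase protocol
    /// txn.set_two_phase_commit(true);
    /// txn.commit()?;
    /// # Ok(())
    /// # }
    /// ```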
    pub fn set_two_phase_commit(&mut self, enabled: bool) {
        self.two_phase_commit = enabled;
    }

    /// Enable or disable quick-repair (defaults to disabled)
    ///
    /// By default, when reopening the database after a crash, redb needs to do a full repair.
    /// This involves walking the entire database to verify the checksums and reconstruct the
    /// allocator state, so it can be very slow if the database is large.
    ///
    /// Alternatively, you can enable quick-repair. In this mode, redb saves the allocator state
    /// as part of each commit (so it doesn't need to be reconstructed), and enables 2-phase commit
    /// (which guarantees that the primary commit slot is valid without needing to look at the
    /// checksums). This means commits are slower, but recovery after a crash is almost instant.
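    ///
    /// A short usage sketch (the database handle is assumed to already exist):
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let db = redb::Database::create("example.redb")?;
    /// let mut txn = db.begin_write()?;
    /// // Slower commits, near-instant recovery after a crash; implies 2-phase commit
    /// txn.set_quick_repair(true);
    /// txn.commit()?;
    /// # Ok(())
    /// # }
    /// ```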
    pub fn set_quick_repair(&mut self, enabled: bool) {
        self.quick_repair = enabled;
    }

    /// Open the given table
    ///
    /// The table will be created if it does not exist
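    ///
    /// A minimal usage sketch (the table definition, path, and values are illustrative):
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// use redb::{Database, TableDefinition};
    ///
    /// const TABLE: TableDefinition<&str, u64> = TableDefinition::new("counters");
    ///
    /// let db = Database::create("example.redb")?;
    /// let txn = db.begin_write()?;
    /// {
    ///     // Created on first open; the table borrows the transaction, so scope it
    ///     // so the borrow ends before commit() consumes the transaction
    ///     let mut table = txn.open_table(TABLE)?;
    ///     table.insert("page_views", &1)?;
    /// }
    /// txn.commit()?;
    /// # Ok(())
    /// # }
    /// ```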
    #[track_caller]
    pub fn open_table<'txn, K: Key + 'static, V: Value + 'static>(
        &'txn self,
        definition: TableDefinition<K, V>,
    ) -> Result<Table<'txn, K, V>, TableError> {
        self.tables.lock().unwrap().open_table(self, definition)
    }

    /// Open the given table
    ///
    /// The table will be created if it does not exist
    #[track_caller]
    pub fn open_multimap_table<'txn, K: Key + 'static, V: Key + 'static>(
        &'txn self,
        definition: MultimapTableDefinition<K, V>,
    ) -> Result<MultimapTable<'txn, K, V>, TableError> {
        self.tables
            .lock()
            .unwrap()
            .open_multimap_table(self, definition)
    }

    pub(crate) fn close_table<K: Key + 'static, V: Value + 'static>(
        &self,
        name: &str,
        table: &BtreeMut<K, V>,
        length: u64,
    ) {
        self.tables.lock().unwrap().close_table(name, table, length);
    }

    /// Delete the given table
    ///
    /// Returns a bool indicating whether the table existed
    pub fn delete_table(&self, definition: impl TableHandle) -> Result<bool, TableError> {
        let name = definition.name().to_string();
        // Drop the definition so that callers can pass in a `Table` or `MultimapTable` to delete, without getting a TableAlreadyOpen error
        drop(definition);
        self.tables.lock().unwrap().delete_table(self, &name)
    }

    /// Delete the given table
    ///
    /// Returns a bool indicating whether the table existed
    pub fn delete_multimap_table(
        &self,
        definition: impl MultimapTableHandle,
    ) -> Result<bool, TableError> {
        let name = definition.name().to_string();
        // Drop the definition so that callers can pass in a `Table` or `MultimapTable` to delete, without getting a TableAlreadyOpen error
        drop(definition);
        self.tables
            .lock()
            .unwrap()
            .delete_multimap_table(self, &name)
    }

    /// List all the tables
    pub fn list_tables(&self) -> Result<impl Iterator<Item = UntypedTableHandle> + '_> {
        self.tables
            .lock()
            .unwrap()
            .table_tree
            .list_tables(TableType::Normal)
            .map(|x| x.into_iter().map(UntypedTableHandle::new))
    }

    /// List all the multimap tables
    pub fn list_multimap_tables(
        &self,
    ) -> Result<impl Iterator<Item = UntypedMultimapTableHandle> + '_> {
        self.tables
            .lock()
            .unwrap()
            .table_tree
            .list_tables(TableType::Multimap)
            .map(|x| x.into_iter().map(UntypedMultimapTableHandle::new))
    }

    /// Commit the transaction
    ///
    /// All writes performed in this transaction will be visible to future transactions, and are
    /// durable, consistent with the [`Durability`] level set by [`Self::set_durability`]
    pub fn commit(mut self) -> Result<(), CommitError> {
        // Set completed flag first, so that we don't go through the abort() path on drop, if this fails
        self.completed = true;
        self.commit_inner()
    }

    fn commit_inner(&mut self) -> Result<(), CommitError> {
        // Quick-repair requires 2-phase commit
        if self.quick_repair {
            self.two_phase_commit = true;
        }

        #[cfg(feature = "logging")]
        debug!(
            "Committing transaction id={:?} with durability={:?} two_phase={} quick_repair={}",
            self.transaction_id, self.durability, self.two_phase_commit, self.quick_repair
        );
        match self.durability {
            InternalDurability::None => self.non_durable_commit()?,
            InternalDurability::Eventual => self.durable_commit(true)?,
            InternalDurability::Immediate => self.durable_commit(false)?,
        }

        for (savepoint, transaction) in self.deleted_persistent_savepoints.lock().unwrap().iter() {
            self.transaction_tracker
                .deallocate_savepoint(*savepoint, *transaction);
        }

        #[cfg(feature = "logging")]
        debug!(
            "Finished commit of transaction id={:?}",
            self.transaction_id
        );

        Ok(())
    }

    /// Abort the transaction
    ///
    /// All writes performed in this transaction will be rolled back
    pub fn abort(mut self) -> Result {
        // Set completed flag first, so that we don't go through the abort() path on drop, if this fails
        self.completed = true;
        self.abort_inner()
    }

    fn abort_inner(&mut self) -> Result {
        #[cfg(feature = "logging")]
        debug!("Aborting transaction id={:?}", self.transaction_id);
        for savepoint in self.created_persistent_savepoints.lock().unwrap().iter() {
            match self.delete_persistent_savepoint(savepoint.0) {
                Ok(_) => {}
                Err(err) => match err {
                    SavepointError::InvalidSavepoint => {
                        unreachable!();
                    }
                    SavepointError::Storage(storage_err) => {
                        return Err(storage_err);
                    }
                },
            }
        }
        self.tables
            .lock()
            .unwrap()
            .table_tree
            .clear_table_root_updates();
        self.mem.rollback_uncommitted_writes()?;
        #[cfg(feature = "logging")]
        debug!("Finished abort of transaction id={:?}", self.transaction_id);
        Ok(())
    }

    pub(crate) fn durable_commit(&mut self, eventual: bool) -> Result {
        let free_until_transaction = self
            .transaction_tracker
            .oldest_live_read_transaction()
            .map_or(self.transaction_id, |x| x.next());
        self.process_freed_pages(free_until_transaction)?;

        let user_root = self
            .tables
            .lock()
            .unwrap()
            .table_tree
            .flush_table_root_updates()?
            .finalize_dirty_checksums()?;

        let mut system_tables = self.system_tables.lock().unwrap();
        let system_tree = system_tables.table_tree.flush_table_root_updates()?;
        system_tree
            .delete_table(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal)
            .map_err(|e| e.into_storage_error_or_corrupted("Unexpected TableError"))?;

        if self.quick_repair {
            system_tree.create_table_and_flush_table_root(
                ALLOCATOR_STATE_TABLE_NAME,
                |tree: &mut AllocatorStateTree| {
                    let mut pagination_counter = 0;

                    loop {
                        let num_regions = self
                            .mem
                            .reserve_allocator_state(tree, self.transaction_id)?;

                        // We can't free pages after the commit, because that would invalidate our
                        // saved allocator state. Everything needs to go through the transactional
                        // free mechanism
                        self.store_freed_pages(&mut pagination_counter, true)?;

                        if self.mem.try_save_allocator_state(tree, num_regions)? {
                            return Ok(());
                        }

                        // Clear out the table before retrying, just in case the number of regions
                        // has somehow shrunk. Don't use retain_in() for this, since it doesn't
                        // free the pages immediately -- we need to reuse those pages to guarantee
                        // that our retry loop will eventually terminate
                        while let Some(guards) = tree.last()? {
                            let key = guards.0.value();
                            drop(guards);
                            tree.remove(&key)?;
                        }
                    }
                },
            )?;
        } else {
            // If a savepoint exists it might reference the freed-tree, since it holds a reference to the
            // root of the freed-tree. Therefore, we must use the transactional free mechanism to free
            // those pages. If there are no save points then these can be immediately freed, which is
            // done at the end of this function.
            let savepoint_exists = self.transaction_tracker.any_savepoint_exists();
            self.store_freed_pages(&mut 0, savepoint_exists)?;
        }

        let system_root = system_tree.finalize_dirty_checksums()?;

        // Finalize freed table checksums, before doing the final commit
        let freed_root = self.freed_tree.lock().unwrap().finalize_dirty_checksums()?;

        self.mem.commit(
            user_root,
            system_root,
            freed_root,
            self.transaction_id,
            eventual,
            self.two_phase_commit,
        )?;

        // Mark any pending non-durable commits as fully committed.
        self.transaction_tracker.clear_pending_non_durable_commits();

        // Immediately free the pages that were freed from the freed-tree itself. These are only
        // accessed by write transactions, so it's safe to free them as soon as the commit is done.
        for page in self.post_commit_frees.lock().unwrap().drain(..) {
            self.mem.free(page);
        }

        Ok(())
    }
1355
1356    // Commit without a durability guarantee
1357    pub(crate) fn non_durable_commit(&mut self) -> Result {
1358        let user_root = self
1359            .tables
1360            .lock()
1361            .unwrap()
1362            .table_tree
1363            .flush_table_root_updates()?
1364            .finalize_dirty_checksums()?;
1365
1366        let system_root = self
1367            .system_tables
1368            .lock()
1369            .unwrap()
1370            .table_tree
1371            .flush_table_root_updates()?
1372            .finalize_dirty_checksums()?;
1373
1374        // Store all freed pages for a future commit(), since we can't free pages during a
1375        // non-durable commit (it's non-durable, so could be rolled back anytime in the future)
1376        self.store_freed_pages(&mut 0, true)?;
1377
1378        // Finalize all checksums, before doing the final commit
1379        let freed_root = self.freed_tree.lock().unwrap().finalize_dirty_checksums()?;
1380
1381        self.mem
1382            .non_durable_commit(user_root, system_root, freed_root, self.transaction_id)?;
1383        // Register this as a non-durable transaction to ensure that the freed pages we just pushed
1384        // are only processed after this has been persisted
1385        self.transaction_tracker
1386            .register_non_durable_commit(self.transaction_id);
1387        Ok(())
1388    }
1389
1390    // Relocate pages to lower number regions/pages
1391    // Returns true if a page(s) was moved
1392    pub(crate) fn compact_pages(&mut self) -> Result<bool> {
1393        let mut progress = false;
1394        // Relocate the region tracker page
1395        if self.mem.relocate_region_tracker()? {
1396            progress = true;
1397        }
1398
1399        // Find the 1M highest pages
        let mut highest_pages = BTreeMap::new();
        let mut tables = self.tables.lock().unwrap();
        let table_tree = &mut tables.table_tree;
        table_tree.highest_index_pages(MAX_PAGES_PER_COMPACTION, &mut highest_pages)?;
        let mut system_tables = self.system_tables.lock().unwrap();
        let system_table_tree = &mut system_tables.table_tree;
        system_table_tree.highest_index_pages(MAX_PAGES_PER_COMPACTION, &mut highest_pages)?;

        // Calculate how many of them can be relocated to lower pages, starting from the last page
        let mut relocation_map = HashMap::new();
        for path in highest_pages.into_values().rev() {
            if relocation_map.contains_key(&path.page_number()) {
                continue;
            }
            let old_page = self.mem.get_page(path.page_number())?;
            let mut new_page = self.mem.allocate_lowest(old_page.memory().len())?;
            let new_page_number = new_page.get_page_number();
            // We have to copy at least the page type into the new page.
            // Otherwise its cache priority will be calculated incorrectly
            new_page.memory_mut()[0] = old_page.memory()[0];
            drop(new_page);
            // We're able to move this to a lower page, so insert it and rewrite all its parents
            if new_page_number < path.page_number() {
                relocation_map.insert(path.page_number(), new_page_number);
                for parent in path.parents() {
                    if relocation_map.contains_key(parent) {
                        continue;
                    }
                    let old_parent = self.mem.get_page(*parent)?;
                    let mut new_page = self.mem.allocate_lowest(old_parent.memory().len())?;
                    let new_page_number = new_page.get_page_number();
                    // We have to copy at least the page type into the new page.
                    // Otherwise its cache priority will be calculated incorrectly
                    new_page.memory_mut()[0] = old_parent.memory()[0];
                    drop(new_page);
                    relocation_map.insert(*parent, new_page_number);
                }
            } else {
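                // The lowest page the allocator could offer isn't lower than this candidate.
                // Candidates are visited from the highest page number downward, so no remaining
                // candidate can be relocated either; free the page we just allocated and stop.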
                self.mem.free(new_page_number);
                break;
            }
        }

        if !relocation_map.is_empty() {
            progress = true;
        }

        table_tree.relocate_tables(&relocation_map)?;
        system_table_tree.relocate_tables(&relocation_map)?;

        Ok(progress)
    }

    // NOTE: must be called before store_freed_pages() during commit, since it can add more
    // pages to the set freed by the current transaction
    fn process_freed_pages(&mut self, free_until: TransactionId) -> Result {
        // We assume below that PageNumber is length 8
        assert_eq!(PageNumber::serialized_size(), 8);
        let lookup_key = FreedTableKey {
            transaction_id: free_until.raw_id(),
            pagination_id: 0,
        };

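        // Pages freed by transactions strictly older than `free_until` are handed back to the
        // allocator here; the caller is responsible for choosing a `free_until` that no reader
        // or savepoint can still reference.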
        let mut to_remove = vec![];
        let mut freed_tree = self.freed_tree.lock().unwrap();
        for entry in freed_tree.range(&(..lookup_key))? {
            let entry = entry?;
            to_remove.push(entry.key());
            let value = entry.value();
            for i in 0..value.len() {
                self.mem.free(value.get(i));
            }
        }

        // Remove all the old transactions
        for key in to_remove {
            freed_tree.remove(&key)?;
        }

        Ok(())
    }

    fn store_freed_pages(
        &self,
        pagination_counter: &mut u64,
        include_post_commit_free: bool,
    ) -> Result {
        assert_eq!(PageNumber::serialized_size(), 8); // We assume below that PageNumber is length 8

        let mut freed_tree = self.freed_tree.lock().unwrap();
        if include_post_commit_free {
            // Move all the post-commit pages that came from the freed-tree. These need to be stored
            // since we can't free pages until a durable commit
            self.freed_pages
                .lock()
                .unwrap()
                .extend(self.post_commit_frees.lock().unwrap().drain(..));
        }
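        // Drain the freed-page list in chunks. Each chunk becomes one freed-tree entry keyed by
        // (transaction id, pagination counter), so a single transaction can record an arbitrary
        // number of freed pages.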
        while !self.freed_pages.lock().unwrap().is_empty() {
            let chunk_size = 100;
            let buffer_size = FreedPageList::required_bytes(chunk_size);
            let key = FreedTableKey {
                transaction_id: self.transaction_id.raw_id(),
                pagination_id: *pagination_counter,
            };
            let mut access_guard =
                freed_tree.insert_reserve(&key, buffer_size.try_into().unwrap())?;

            let mut freed_pages = self.freed_pages.lock().unwrap();
            let len = freed_pages.len();
            access_guard.as_mut().clear();
            for page in freed_pages.drain(len - min(len, chunk_size)..) {
                access_guard.as_mut().push_back(page);
            }
            drop(access_guard);

            *pagination_counter += 1;

            if include_post_commit_free {
                // Move all the post-commit pages that came from the freed-tree. These need to be stored
                // since we can't free pages until a durable commit
                freed_pages.extend(self.post_commit_frees.lock().unwrap().drain(..));
            }
        }

        Ok(())
    }

    /// Retrieves information about storage usage in the database
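    ///
    /// A minimal usage sketch (illustrative only, not compiled as a doctest); it assumes an
    /// already-open `Database` bound to `db`, and the accessor names on `DatabaseStats` are
    /// shown as examples rather than an exhaustive list:
    ///
    /// ```ignore
    /// let write_txn = db.begin_write()?;
    /// let stats = write_txn.stats()?;
    /// println!("page size: {} bytes", stats.page_size());
    /// println!("allocated pages: {}", stats.allocated_pages());
    /// write_txn.abort()?;
    /// ```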
    pub fn stats(&self) -> Result<DatabaseStats> {
        let tables = self.tables.lock().unwrap();
        let table_tree = &tables.table_tree;
        let data_tree_stats = table_tree.stats()?;

        let system_tables = self.system_tables.lock().unwrap();
        let system_table_tree = &system_tables.table_tree;
        let system_tree_stats = system_table_tree.stats()?;

        let freed_tree_stats = self.freed_tree.lock().unwrap().stats()?;

        let total_metadata_bytes = data_tree_stats.metadata_bytes()
            + system_tree_stats.metadata_bytes
            + system_tree_stats.stored_leaf_bytes
            + freed_tree_stats.metadata_bytes
            + freed_tree_stats.stored_leaf_bytes;
        let total_fragmented = data_tree_stats.fragmented_bytes()
            + system_tree_stats.fragmented_bytes
            + freed_tree_stats.fragmented_bytes
            + self.mem.count_free_pages()? * (self.mem.get_page_size() as u64);

        Ok(DatabaseStats {
            tree_height: data_tree_stats.tree_height(),
            allocated_pages: self.mem.count_allocated_pages()?,
            leaf_pages: data_tree_stats.leaf_pages(),
            branch_pages: data_tree_stats.branch_pages(),
            stored_leaf_bytes: data_tree_stats.stored_bytes(),
            metadata_bytes: total_metadata_bytes,
            fragmented_bytes: total_fragmented,
            page_size: self.mem.get_page_size(),
        })
    }

    #[cfg(any(test, fuzzing))]
    pub fn num_region_tracker_pages(&self) -> u64 {
        1 << self.mem.tracker_page().page_order
    }

    #[allow(dead_code)]
    pub(crate) fn print_debug(&self) -> Result {
        // Flush any pending updates to make sure we get the latest root
        let mut tables = self.tables.lock().unwrap();
        if let Some(page) = tables
            .table_tree
            .flush_table_root_updates()
            .unwrap()
            .finalize_dirty_checksums()
            .unwrap()
        {
            eprintln!("Master tree:");
            let master_tree: Btree<&str, InternalTableDefinition> = Btree::new(
                Some(page),
                PageHint::None,
                self.transaction_guard.clone(),
                self.mem.clone(),
            )?;
            master_tree.print_debug(true)?;
        }

        Ok(())
    }
}

impl Drop for WriteTransaction {
    fn drop(&mut self) {
        if !self.completed && !thread::panicking() && !self.mem.storage_failure() {
            #[allow(unused_variables)]
            if let Err(error) = self.abort_inner() {
                #[cfg(feature = "logging")]
                warn!("Failure automatically aborting transaction: {}", error);
            }
        }
    }
}

/// A read-only transaction
///
/// Read-only transactions may exist concurrently with writes
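///
/// A minimal usage sketch (illustrative only, not compiled as a doctest); `MY_TABLE` is a
/// hypothetical `TableDefinition<&str, u64>` and `db` an already-open `Database`:
///
/// ```ignore
/// let read_txn = db.begin_read()?;
/// let table = read_txn.open_table(MY_TABLE)?;
/// if let Some(value) = table.get("some_key")? {
///     println!("some_key = {}", value.value());
/// }
/// ```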
pub struct ReadTransaction {
    mem: Arc<TransactionalMemory>,
    tree: TableTree,
}

impl ReadTransaction {
    pub(crate) fn new(
        mem: Arc<TransactionalMemory>,
        guard: TransactionGuard,
    ) -> Result<Self, TransactionError> {
        let root_page = mem.get_data_root();
        let guard = Arc::new(guard);
        Ok(Self {
            mem: mem.clone(),
            tree: TableTree::new(root_page, PageHint::Clean, guard, mem)
                .map_err(TransactionError::Storage)?,
        })
    }

    /// Open the given table
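    ///
    /// A minimal sketch (not compiled as a doctest); `read_txn` is assumed to be a
    /// `ReadTransaction` and `MY_TABLE` a hypothetical definition:
    ///
    /// ```ignore
    /// const MY_TABLE: TableDefinition<&str, u64> = TableDefinition::new("my_table");
    ///
    /// let table = read_txn.open_table(MY_TABLE)?;
    /// assert!(table.get("missing_key")?.is_none());
    /// ```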
    pub fn open_table<K: Key + 'static, V: Value + 'static>(
        &self,
        definition: TableDefinition<K, V>,
    ) -> Result<ReadOnlyTable<K, V>, TableError> {
        let header = self
            .tree
            .get_table::<K, V>(definition.name(), TableType::Normal)?
            .ok_or_else(|| TableError::TableDoesNotExist(definition.name().to_string()))?;

        match header {
            InternalTableDefinition::Normal { table_root, .. } => Ok(ReadOnlyTable::new(
                definition.name().to_string(),
                table_root,
                PageHint::Clean,
                self.tree.transaction_guard().clone(),
                self.mem.clone(),
            )?),
            InternalTableDefinition::Multimap { .. } => unreachable!(),
        }
    }

    /// Open the given table without a type
    pub fn open_untyped_table(
        &self,
        handle: impl TableHandle,
    ) -> Result<ReadOnlyUntypedTable, TableError> {
        let header = self
            .tree
            .get_table_untyped(handle.name(), TableType::Normal)?
            .ok_or_else(|| TableError::TableDoesNotExist(handle.name().to_string()))?;

        match header {
            InternalTableDefinition::Normal {
                table_root,
                fixed_key_size,
                fixed_value_size,
                ..
            } => Ok(ReadOnlyUntypedTable::new(
                table_root,
                fixed_key_size,
                fixed_value_size,
                self.mem.clone(),
            )),
            InternalTableDefinition::Multimap { .. } => unreachable!(),
        }
    }

    /// Open the given table
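    ///
    /// A minimal sketch (not compiled as a doctest); `read_txn` is assumed to be a
    /// `ReadTransaction` and `MY_MULTIMAP` a hypothetical definition:
    ///
    /// ```ignore
    /// const MY_MULTIMAP: MultimapTableDefinition<&str, u64> =
    ///     MultimapTableDefinition::new("my_multimap");
    ///
    /// let table = read_txn.open_multimap_table(MY_MULTIMAP)?;
    /// for value in table.get("some_key")? {
    ///     println!("{}", value?.value());
    /// }
    /// ```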
    pub fn open_multimap_table<K: Key + 'static, V: Key + 'static>(
        &self,
        definition: MultimapTableDefinition<K, V>,
    ) -> Result<ReadOnlyMultimapTable<K, V>, TableError> {
        let header = self
            .tree
            .get_table::<K, V>(definition.name(), TableType::Multimap)?
            .ok_or_else(|| TableError::TableDoesNotExist(definition.name().to_string()))?;

        match header {
            InternalTableDefinition::Normal { .. } => unreachable!(),
            InternalTableDefinition::Multimap {
                table_root,
                table_length,
                ..
            } => Ok(ReadOnlyMultimapTable::new(
                table_root,
                table_length,
                PageHint::Clean,
                self.tree.transaction_guard().clone(),
                self.mem.clone(),
            )?),
        }
    }

    /// Open the given table without a type
    pub fn open_untyped_multimap_table(
        &self,
        handle: impl MultimapTableHandle,
    ) -> Result<ReadOnlyUntypedMultimapTable, TableError> {
        let header = self
            .tree
            .get_table_untyped(handle.name(), TableType::Multimap)?
            .ok_or_else(|| TableError::TableDoesNotExist(handle.name().to_string()))?;

        match header {
            InternalTableDefinition::Normal { .. } => unreachable!(),
            InternalTableDefinition::Multimap {
                table_root,
                table_length,
                fixed_key_size,
                fixed_value_size,
                ..
            } => Ok(ReadOnlyUntypedMultimapTable::new(
                table_root,
                table_length,
                fixed_key_size,
                fixed_value_size,
                self.mem.clone(),
            )),
        }
    }

    /// List all the tables
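    ///
    /// A minimal sketch (not compiled as a doctest); `read_txn` is assumed to be a
    /// `ReadTransaction`:
    ///
    /// ```ignore
    /// for handle in read_txn.list_tables()? {
    ///     println!("table: {}", handle.name());
    /// }
    /// ```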
    pub fn list_tables(&self) -> Result<impl Iterator<Item = UntypedTableHandle>> {
        self.tree
            .list_tables(TableType::Normal)
            .map(|x| x.into_iter().map(UntypedTableHandle::new))
    }

    /// List all the multimap tables
    pub fn list_multimap_tables(&self) -> Result<impl Iterator<Item = UntypedMultimapTableHandle>> {
        self.tree
            .list_tables(TableType::Multimap)
            .map(|x| x.into_iter().map(UntypedMultimapTableHandle::new))
    }

    /// Close the transaction
    ///
    /// Transactions are automatically closed when they and all objects referencing them have been dropped,
    /// so this method does not normally need to be called.
    /// This method can be used to ensure that there are no outstanding objects remaining.
    ///
    /// Returns `ReadTransactionStillInUse` error if a table or other object retrieved from the transaction still references this transaction
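    ///
    /// A minimal sketch (not compiled as a doctest); `db` is assumed to be an open `Database`
    /// and `MY_TABLE` a hypothetical table definition:
    ///
    /// ```ignore
    /// let read_txn = db.begin_read()?;
    /// let table = read_txn.open_table(MY_TABLE)?;
    /// drop(table);
    /// // No outstanding objects reference the transaction anymore, so close() succeeds
    /// read_txn.close().unwrap();
    /// ```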
    pub fn close(self) -> Result<(), TransactionError> {
        if Arc::strong_count(self.tree.transaction_guard()) > 1 {
            return Err(TransactionError::ReadTransactionStillInUse(self));
        }
        // No-op, just drop ourself
        Ok(())
    }
}

impl Debug for ReadTransaction {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("ReadTransaction")
    }
}

#[cfg(test)]
mod test {
    use crate::{Database, TableDefinition};

    const X: TableDefinition<&str, &str> = TableDefinition::new("x");

    #[test]
    fn transaction_id_persistence() {
        let tmpfile = crate::create_tempfile();
        let db = Database::create(tmpfile.path()).unwrap();
        let write_txn = db.begin_write().unwrap();
        {
            let mut table = write_txn.open_table(X).unwrap();
            table.insert("hello", "world").unwrap();
        }
        let first_txn_id = write_txn.transaction_id;
        write_txn.commit().unwrap();
        drop(db);

        let db2 = Database::create(tmpfile.path()).unwrap();
        let write_txn = db2.begin_write().unwrap();
        assert!(write_txn.transaction_id > first_txn_id);
    }
}