// redb/db.rs

1use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker};
2use crate::tree_store::{
3    AllPageNumbersBtreeIter, BtreeHeader, BtreeRangeIter, FreedPageList, FreedTableKey,
4    InternalTableDefinition, PAGE_SIZE, PageHint, PageNumber, PageTrackerPolicy, RawBtree,
5    SerializedSavepoint, TableTree, TableTreeMut, TableType, TransactionalMemory,
6};
7use crate::types::{Key, Value};
8use crate::{
9    CompactionError, DatabaseError, Error, ReadOnlyTable, SavepointError, StorageError, TableError,
10};
11use crate::{ReadTransaction, Result, WriteTransaction};
12use std::fmt::{Debug, Display, Formatter};
13
14use std::fs::{File, OpenOptions};
15use std::marker::PhantomData;
16use std::ops::RangeFull;
17use std::path::Path;
18use std::sync::{Arc, Mutex};
19use std::{io, thread};
20
21use crate::error::{TransactionError, UpgradeError};
22use crate::sealed::Sealed;
23use crate::transactions::{
24    ALLOCATOR_STATE_TABLE_NAME, AllocatorStateKey, AllocatorStateTree, DATA_ALLOCATED_TABLE,
25    DATA_FREED_TABLE, PageList, SAVEPOINT_TABLE, SYSTEM_FREED_TABLE, SystemTableDefinition,
26    TransactionIdWithPagination,
27};
28use crate::tree_store::file_backend::FileBackend;
29#[cfg(feature = "logging")]
30use log::{debug, info, warn};
31
#[allow(clippy::len_without_is_empty)]
/// Implements persistent storage for a database.
pub trait StorageBackend: 'static + Debug + Send + Sync {
    /// Gets the current length of the storage.
    fn len(&self) -> std::result::Result<u64, io::Error>;

    /// Reads the specified array of bytes from the storage.
    ///
    /// If `len` + `offset` exceeds the length of the storage an appropriate `Error` should be returned or a panic may occur.
    fn read(&self, offset: u64, len: usize) -> std::result::Result<Vec<u8>, io::Error>;

    /// Sets the length of the storage.
    ///
    /// When extending the storage the new positions should be zero initialized.
    fn set_len(&self, len: u64) -> std::result::Result<(), io::Error>;

    /// Syncs all buffered data with the persistent storage.
    ///
    /// If `eventual` is true, data may become persistent at some point after this call returns,
    /// but the storage must guarantee that a write barrier is inserted: i.e. all writes before this
    /// call to `sync_data()` will become persistent before any writes that occur after.
    fn sync_data(&self, eventual: bool) -> std::result::Result<(), io::Error>;

    /// Writes the specified array to the storage.
    fn write(&self, offset: u64, data: &[u8]) -> std::result::Result<(), io::Error>;
}
58
pub trait TableHandle: Sealed {
    /// Returns the name of the table
    fn name(&self) -> &str;
}
63
/// Handle to a table whose key and value types are not statically known.
#[derive(Clone)]
pub struct UntypedTableHandle {
    // Name of the referenced table
    name: String,
}

impl UntypedTableHandle {
    pub(crate) fn new(name: String) -> Self {
        Self { name }
    }
}

impl TableHandle for UntypedTableHandle {
    fn name(&self) -> &str {
        &self.name
    }
}

impl Sealed for UntypedTableHandle {}
82
pub trait MultimapTableHandle: Sealed {
    /// Returns the name of the multimap table
    fn name(&self) -> &str;
}
87
/// Handle to a multimap table whose key and value types are not statically known.
#[derive(Clone)]
pub struct UntypedMultimapTableHandle {
    // Name of the referenced multimap table
    name: String,
}

impl UntypedMultimapTableHandle {
    pub(crate) fn new(name: String) -> Self {
        Self { name }
    }
}

impl MultimapTableHandle for UntypedMultimapTableHandle {
    fn name(&self) -> &str {
        &self.name
    }
}

impl Sealed for UntypedMultimapTableHandle {}
106
/// Defines the name and types of a table
///
/// A [`TableDefinition`] should be opened for use by calling [`ReadTransaction::open_table`] or [`WriteTransaction::open_table`]
///
/// Note that the lifetime of the `K` and `V` type parameters does not impact the lifetimes of the data
/// that is stored or retrieved from the table
pub struct TableDefinition<'a, K: Key + 'static, V: Value + 'static> {
    name: &'a str,
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}

impl<'a, K: Key + 'static, V: Value + 'static> TableDefinition<'a, K, V> {
    /// Construct a new table with given `name`
    ///
    /// ## Invariant
    ///
    /// `name` must not be empty.
    pub const fn new(name: &'a str) -> Self {
        // Enforced at construction so every definition refers to a nameable table
        assert!(!name.is_empty());
        Self {
            name,
            _key_type: PhantomData,
            _value_type: PhantomData,
        }
    }
}
134
impl<K: Key + 'static, V: Value + 'static> TableHandle for TableDefinition<'_, K, V> {
    fn name(&self) -> &str {
        self.name
    }
}

impl<K: Key, V: Value> Sealed for TableDefinition<'_, K, V> {}

// Clone/Copy are implemented manually so they do not require `K: Clone`/`V: Clone`
// bounds that a derive would add (the struct only stores PhantomData of K and V).
impl<K: Key + 'static, V: Value + 'static> Clone for TableDefinition<'_, K, V> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<K: Key + 'static, V: Value + 'static> Copy for TableDefinition<'_, K, V> {}
150
151impl<K: Key + 'static, V: Value + 'static> Display for TableDefinition<'_, K, V> {
152    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
153        write!(
154            f,
155            "{}<{}, {}>",
156            self.name,
157            K::type_name().name(),
158            V::type_name().name()
159        )
160    }
161}
162
/// Defines the name and types of a multimap table
///
/// A [`MultimapTableDefinition`] should be opened for use by calling [`ReadTransaction::open_multimap_table`] or [`WriteTransaction::open_multimap_table`]
///
/// [Multimap tables](https://en.wikipedia.org/wiki/Multimap) may have multiple values associated with each key
///
/// Note that the lifetime of the `K` and `V` type parameters does not impact the lifetimes of the data
/// that is stored or retrieved from the table
pub struct MultimapTableDefinition<'a, K: Key + 'static, V: Key + 'static> {
    name: &'a str,
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}

impl<'a, K: Key + 'static, V: Key + 'static> MultimapTableDefinition<'a, K, V> {
    /// Construct a new multimap table with given `name`
    ///
    /// `name` must not be empty.
    pub const fn new(name: &'a str) -> Self {
        assert!(!name.is_empty());
        Self {
            name,
            _key_type: PhantomData,
            _value_type: PhantomData,
        }
    }
}
187
impl<K: Key + 'static, V: Key + 'static> MultimapTableHandle for MultimapTableDefinition<'_, K, V> {
    fn name(&self) -> &str {
        self.name
    }
}

impl<K: Key, V: Key> Sealed for MultimapTableDefinition<'_, K, V> {}

// Clone/Copy are implemented manually so they do not require `K: Clone`/`V: Clone`
// bounds that a derive would add (the struct only stores PhantomData of K and V).
impl<K: Key + 'static, V: Key + 'static> Clone for MultimapTableDefinition<'_, K, V> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<K: Key + 'static, V: Key + 'static> Copy for MultimapTableDefinition<'_, K, V> {}
203
204impl<K: Key + 'static, V: Key + 'static> Display for MultimapTableDefinition<'_, K, V> {
205    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
206        write!(
207            f,
208            "{}<{}, {}>",
209            self.name,
210            K::type_name().name(),
211            V::type_name().name()
212        )
213    }
214}
215
/// Information regarding the usage of the in-memory cache
///
/// Note: these metrics are only collected when the "`cache_metrics`" feature is enabled
#[derive(Debug)]
pub struct CacheStats {
    // Cumulative count of evictions since the database was opened
    pub(crate) evictions: u64,
}

impl CacheStats {
    /// Number of times that data has been evicted, due to the cache being full
    ///
    /// To increase the cache size use [`Builder::set_cache_size`]
    pub fn evictions(&self) -> u64 {
        self.evictions
    }
}
232
// RAII guard that registers a live transaction with the TransactionTracker and
// unregisters it on drop.
pub(crate) struct TransactionGuard {
    // None only for guards created via fake(); then Drop is a no-op
    transaction_tracker: Option<Arc<TransactionTracker>>,
    // Taken (set to None) by leak(), which suppresses the Drop bookkeeping
    transaction_id: Option<TransactionId>,
    // Selects which tracker method runs on drop (write vs read bookkeeping)
    write_transaction: bool,
}
238
impl TransactionGuard {
    /// Guard for a read transaction; on drop the id is returned to the tracker
    /// via `deallocate_read_transaction()`.
    pub(crate) fn new_read(
        transaction_id: TransactionId,
        tracker: Arc<TransactionTracker>,
    ) -> Self {
        Self {
            transaction_tracker: Some(tracker),
            transaction_id: Some(transaction_id),
            write_transaction: false,
        }
    }

    /// Guard for a write transaction; on drop the tracker's
    /// `end_write_transaction()` is called.
    pub(crate) fn new_write(
        transaction_id: TransactionId,
        tracker: Arc<TransactionTracker>,
    ) -> Self {
        Self {
            transaction_tracker: Some(tracker),
            transaction_id: Some(transaction_id),
            write_transaction: true,
        }
    }

    // TODO: remove this hack
    /// Guard that tracks nothing: no tracker, no id, and Drop performs no bookkeeping.
    pub(crate) fn fake() -> Self {
        Self {
            transaction_tracker: None,
            transaction_id: None,
            write_transaction: false,
        }
    }

    /// Returns the transaction id.
    ///
    /// Panics if called on a `fake()` guard or after `leak()`.
    pub(crate) fn id(&self) -> TransactionId {
        self.transaction_id.unwrap()
    }

    /// Takes ownership of the transaction id without notifying the tracker:
    /// clearing the id makes the Drop impl skip its end-of-transaction call.
    ///
    /// Panics if called on a `fake()` guard.
    pub(crate) fn leak(mut self) -> TransactionId {
        self.transaction_id.take().unwrap()
    }
}
279
280impl Drop for TransactionGuard {
281    fn drop(&mut self) {
282        if self.transaction_tracker.is_none() {
283            return;
284        }
285        if let Some(transaction_id) = self.transaction_id {
286            if self.write_transaction {
287                self.transaction_tracker
288                    .as_ref()
289                    .unwrap()
290                    .end_write_transaction(transaction_id);
291            } else {
292                self.transaction_tracker
293                    .as_ref()
294                    .unwrap()
295                    .deallocate_read_transaction(transaction_id);
296            }
297        }
298    }
299}
300
/// Opened redb database file
///
/// Use [`Self::begin_read`] to get a [`ReadTransaction`] object that can be used to read from the database
/// Use [`Self::begin_write`] to get a [`WriteTransaction`] object that can be used to read or write to the database
///
/// Multiple reads may be performed concurrently, with each other, and with writes. Only a single write
/// may be in progress at a time.
///
/// # Examples
///
/// Basic usage:
///
/// ```rust
/// use redb::*;
/// # use tempfile::NamedTempFile;
/// const TABLE: TableDefinition<u64, u64> = TableDefinition::new("my_data");
///
/// # fn main() -> Result<(), Error> {
/// # let tmpfile: NamedTempFile = NamedTempFile::new().unwrap();
/// # let filename = tmpfile.path();
/// let db = Database::create(filename)?;
/// let write_txn = db.begin_write()?;
/// {
///     let mut table = write_txn.open_table(TABLE)?;
///     table.insert(&0, &0)?;
/// }
/// write_txn.commit()?;
/// # Ok(())
/// # }
/// ```
pub struct Database {
    // Page-oriented storage layer, shared with all transactions
    mem: Arc<TransactionalMemory>,
    // Tracks live read/write transactions and savepoints
    transaction_tracker: Arc<TransactionTracker>,
}
335
336impl Database {
    /// Opens the specified file as a redb database.
    /// * if the file does not exist, or is an empty file, a new database will be initialized in it
    /// * if the file is a valid redb database, it will be opened
    /// * otherwise this function will return an error
    pub fn create(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
        Self::builder().create(path)
    }

    /// Opens an existing redb database.
    pub fn open(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
        Self::builder().open(path)
    }

    /// Information regarding the usage of the in-memory cache
    ///
    /// Note: these metrics are only collected when the "`cache_metrics`" feature is enabled
    pub fn cache_stats(&self) -> CacheStats {
        self.mem.cache_stats()
    }

    // Hands out a shared reference to the storage layer for use by transactions
    pub(crate) fn get_memory(&self) -> Arc<TransactionalMemory> {
        self.mem.clone()
    }
360
    // Walks the data, system, and freed trees and verifies their checksums.
    // Returns Ok(false) on the first mismatch, Ok(true) if all pass.
    pub(crate) fn verify_primary_checksums(mem: Arc<TransactionalMemory>) -> Result<bool> {
        // Read-only walk: nothing should be freed or allocated, so use inert placeholders
        let fake_freed_pages = Arc::new(Mutex::new(vec![]));
        let fake_allocated = Arc::new(Mutex::new(PageTrackerPolicy::Closed));
        // TODO: seems like we should be able to use TableTree here, instead of TableTreeMut
        let table_tree = TableTreeMut::new(
            mem.get_data_root(),
            Arc::new(TransactionGuard::fake()),
            mem.clone(),
            fake_freed_pages.clone(),
            fake_allocated.clone(),
        );
        if !table_tree.verify_checksums()? {
            return Ok(false);
        }
        // TODO: seems like we should be able to use TableTree here, instead of TableTreeMut
        let system_table_tree = TableTreeMut::new(
            mem.get_system_root(),
            Arc::new(TransactionGuard::fake()),
            mem.clone(),
            fake_freed_pages.clone(),
            fake_allocated.clone(),
        );
        if !system_table_tree.verify_checksums()? {
            return Ok(false);
        }
        // Sanity check: verification must not have freed any pages
        assert!(fake_freed_pages.lock().unwrap().is_empty());

        if let Some(header) = mem.get_freed_root() {
            if !RawBtree::new(
                Some(header),
                FreedTableKey::fixed_width(),
                FreedPageList::fixed_width(),
                mem.clone(),
            )
            .verify_checksum()?
            {
                return Ok(false);
            }
        }

        Ok(true)
    }
403
    /// Force a check of the integrity of the database file, and repair it if possible.
    ///
    /// Note: Calling this function is unnecessary during normal operation. redb will automatically
    /// detect and recover from crashes, power loss, and other unclean shutdowns. This function is
    /// quite slow and should only be used when you suspect the database file may have been modified
    /// externally to redb, or that a redb bug may have left the database in a corrupted state.
    ///
    /// Returns `Ok(true)` if the database passed integrity checks; `Ok(false)` if it failed but was repaired,
    /// and `Err(Corrupted)` if the check failed and the file could not be repaired
    pub fn check_integrity(&mut self) -> Result<bool, DatabaseError> {
        // Snapshot allocator state so we can detect whether repair changed it
        let allocator_hash = self.mem.allocator_hash();
        // `&mut self` guarantees exclusive access, so get_mut cannot fail here
        let mut was_clean = Arc::get_mut(&mut self.mem)
            .unwrap()
            .clear_cache_and_reload()?;

        let old_roots = [
            self.mem.get_data_root(),
            self.mem.get_system_root(),
            self.mem.get_freed_root(),
        ];

        // Run a full repair pass; only storage errors are possible from do_repair() here
        let new_roots = Self::do_repair(&mut self.mem, &|_| {}).map_err(|err| match err {
            DatabaseError::Storage(storage_err) => storage_err,
            _ => unreachable!(),
        })?;

        // Any change to the roots or allocator state means the file was not clean
        if old_roots != new_roots || allocator_hash != self.mem.allocator_hash() {
            was_clean = false;
        }

        if !was_clean {
            // Persist the repaired roots under a fresh transaction id
            let next_transaction_id = self.mem.get_last_committed_transaction_id()?.next();
            let [data_root, system_root, freed_root] = new_roots;
            self.mem.commit(
                data_root,
                system_root,
                freed_root,
                next_transaction_id,
                false,
                true,
            )?;
        }

        self.mem.begin_writable()?;

        Ok(was_clean)
    }
451
    /// Upgrade the database file format
    ///
    /// Returns [`UpgradeError::PersistentSavepointExists`] or [`UpgradeError::EphemeralSavepointExists`]
    /// if there are any persistent or ephemeral savepoints. Delete them and then try again.
    ///
    /// Returns [`UpgradeError::TransactionInProgress`] if any user read transaction is in progress.
    ///
    /// Returns `true` if an upgrade was performed, and `false` if the database is already using the
    /// latest file format
    pub fn upgrade(&mut self) -> Result<bool, UpgradeError> {
        // Already on the latest format: nothing to do
        if self.mem.file_format_v3() {
            return Ok(false);
        }

        // Refuse to upgrade while any savepoint or read transaction exists
        let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        if txn.list_persistent_savepoints()?.next().is_some() {
            return Err(UpgradeError::PersistentSavepointExists);
        }
        txn.abort()?;

        if self.transaction_tracker.any_savepoint_exists() {
            return Err(UpgradeError::EphemeralSavepointExists);
        }

        if self.transaction_tracker.any_user_read_transaction() {
            return Err(UpgradeError::TransactionInProgress);
        }

        // Commit to free up any pending free pages
        // Use 2-phase commit to avoid any possible security issues.
        let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        txn.set_two_phase_commit(true);
        txn.commit()?;
        // Repeat, just in case executing list_persistent_savepoints() created a new table
        let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        txn.set_two_phase_commit(true);
        txn.commit()?;
        // There can't be any outstanding transactions because we have a `&mut self`, so all pending free pages
        // should have been cleared out by the above commit()
        let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        assert!(!txn.pending_free_pages()?);
        txn.abort()?;

        // `&mut self` guarantees exclusive access, so get_mut cannot fail here
        Arc::get_mut(&mut self.mem).unwrap().upgrade_to_v3()?;

        Ok(true)
    }
497
    /// Compacts the database file
    ///
    /// Returns `true` if compaction was performed, and `false` if no further compaction was possible
    pub fn compact(&mut self) -> Result<bool, CompactionError> {
        // Refuse to compact while any read transaction is live
        if self
            .transaction_tracker
            .oldest_live_read_transaction()
            .is_some()
        {
            return Err(CompactionError::TransactionInProgress);
        }
        // Commit to free up any pending free pages
        // Use 2-phase commit to avoid any possible security issues. Plus this compaction is going to be so slow that it doesn't matter.
        // Once https://github.com/cberner/redb/issues/829 is fixed, we should upgrade this to use quick-repair -- that way the user
        // can cancel the compaction without requiring a full repair afterwards
        let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        if txn.list_persistent_savepoints()?.next().is_some() {
            return Err(CompactionError::PersistentSavepointExists);
        }
        if self.transaction_tracker.any_savepoint_exists() {
            return Err(CompactionError::EphemeralSavepointExists);
        }
        txn.set_two_phase_commit(true);
        txn.commit().map_err(|e| e.into_storage_error())?;
        // Repeat, just in case executing list_persistent_savepoints() created a new table
        let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        txn.set_two_phase_commit(true);
        txn.commit().map_err(|e| e.into_storage_error())?;
        // There can't be any outstanding transactions because we have a `&mut self`, so all pending free pages
        // should have been cleared out by the above commit()
        let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        assert!(!txn.pending_free_pages()?);
        txn.abort()?;

        let mut compacted = false;
        // Iteratively compact until no progress is made
        loop {
            let mut progress = false;

            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            if txn.compact_pages()? {
                progress = true;
                txn.commit().map_err(|e| e.into_storage_error())?;
            } else {
                txn.abort()?;
            }

            // Double commit to free up the relocated pages for reuse
            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            txn.set_two_phase_commit(true);
            txn.commit().map_err(|e| e.into_storage_error())?;
            // Triple commit to free up the relocated pages for reuse
            // TODO: this really shouldn't be necessary, but the data freed tree is a system table
            // and so free'ing up its pages causes more deletes from the system tree
            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            txn.set_two_phase_commit(true);
            txn.commit().map_err(|e| e.into_storage_error())?;
            let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            assert!(!txn.pending_free_pages()?);
            txn.abort()?;

            if !progress {
                break;
            }

            compacted = true;
        }

        Ok(compacted)
    }
568
    // Debug-time check used after repair: every page recorded in DATA_ALLOCATED_TABLE
    // must be marked allocated in the rebuilt allocator state.
    fn check_repaired_allocated_pages_table(
        system_root: Option<BtreeHeader>,
        mem: Arc<TransactionalMemory>,
    ) -> Result {
        let table_tree = TableTree::new(
            system_root,
            PageHint::None,
            Arc::new(TransactionGuard::fake()),
            mem.clone(),
        )?;
        if let Some(table_def) = table_tree
            .get_table::<TransactionIdWithPagination, PageList>(
                DATA_ALLOCATED_TABLE.name(),
                TableType::Normal,
            )
            .map_err(|e| e.into_storage_error_or_corrupted("Allocated pages table corrupted"))?
        {
            // The allocated-pages table is always a normal (non-multimap) table
            let table_root = if let InternalTableDefinition::Normal { table_root, .. } = table_def {
                table_root
            } else {
                unreachable!()
            };
            let table: ReadOnlyTable<TransactionIdWithPagination, PageList> = ReadOnlyTable::new(
                DATA_ALLOCATED_TABLE.name().to_string(),
                table_root,
                PageHint::None,
                Arc::new(TransactionGuard::fake()),
                mem.clone(),
            )?;
            // Assert every page number listed in every entry is allocated
            for result in table.range::<TransactionIdWithPagination>(..)? {
                let (_, pages) = result?;
                for i in 0..pages.value().len() {
                    assert!(mem.is_allocated(pages.value().get(i)));
                }
            }
        }

        Ok(())
    }
608
    // Debug-time check used after repair: every page reachable from a persistent
    // savepoint's user root must be marked allocated in the rebuilt allocator state.
    fn check_repaired_persistent_savepoints(
        system_root: Option<BtreeHeader>,
        mem: Arc<TransactionalMemory>,
    ) -> Result {
        let table_tree = TableTree::new(
            system_root,
            PageHint::None,
            Arc::new(TransactionGuard::fake()),
            mem.clone(),
        )?;
        // to_savepoint() needs a tracker; this one is never used for real tracking
        let fake_transaction_tracker = Arc::new(TransactionTracker::new(TransactionId::new(0)));
        if let Some(savepoint_table_def) = table_tree
            .get_table::<SavepointId, SerializedSavepoint>(
                SAVEPOINT_TABLE.name(),
                TableType::Normal,
            )
            .map_err(|e| {
                e.into_storage_error_or_corrupted("Persistent savepoint table corrupted")
            })?
        {
            // The savepoint table is always a normal (non-multimap) table
            let savepoint_table_root =
                if let InternalTableDefinition::Normal { table_root, .. } = savepoint_table_def {
                    table_root
                } else {
                    unreachable!()
                };
            let savepoint_table: ReadOnlyTable<SavepointId, SerializedSavepoint> =
                ReadOnlyTable::new(
                    "internal savepoint table".to_string(),
                    savepoint_table_root,
                    PageHint::None,
                    Arc::new(TransactionGuard::fake()),
                    mem.clone(),
                )?;
            for result in savepoint_table.range::<SavepointId>(..)? {
                let (_, savepoint_data) = result?;
                let savepoint = savepoint_data
                    .value()
                    .to_savepoint(fake_transaction_tracker.clone());
                // Verify every page reachable from the savepoint's user root
                if let Some(header) = savepoint.get_user_root() {
                    Self::check_pages_allocated_recursive(header.root, mem.clone())?;
                }
            }
        }

        Ok(())
    }
656
    // Marks as allocated every page referenced by the freed tree: both the pages
    // making up the tree itself and the page numbers stored in its values.
    fn mark_freed_tree(freed_root: Option<BtreeHeader>, mem: Arc<TransactionalMemory>) -> Result {
        // Pages that make up the freed tree's own btree structure
        if let Some(header) = freed_root {
            let freed_pages_iter = AllPageNumbersBtreeIter::new(
                header.root,
                FreedTableKey::fixed_width(),
                FreedPageList::fixed_width(),
                mem.clone(),
            )?;
            for page in freed_pages_iter {
                mem.mark_page_allocated(page?);
            }
        }

        // Page numbers stored as values in the freed tree
        let freed_table: ReadOnlyTable<FreedTableKey, FreedPageList<'static>> = ReadOnlyTable::new(
            "internal freed table".to_string(),
            freed_root,
            PageHint::None,
            Arc::new(TransactionGuard::fake()),
            mem.clone(),
        )?;
        for result in freed_table.range::<FreedTableKey>(..)? {
            let (_, freed_page_list) = result?;
            for i in 0..freed_page_list.value().len() {
                mem.mark_page_allocated(freed_page_list.value().get(i));
            }
        }

        Ok(())
    }
686
    // Marks as allocated every page listed in one of the system-tree freed tables
    // (e.g. DATA_FREED_TABLE / SYSTEM_FREED_TABLE). A missing table is not an
    // error: there is simply nothing to mark.
    fn mark_freed_tree_v2<K: Key, V: Value>(
        system_root: Option<BtreeHeader>,
        table_def: SystemTableDefinition<K, V>,
        mem: Arc<TransactionalMemory>,
    ) -> Result {
        let fake_guard = Arc::new(TransactionGuard::fake());
        let system_tree = TableTree::new(system_root, PageHint::None, fake_guard, mem.clone())?;
        let table_name = table_def.name();
        let result = match system_tree.get_table::<K, V>(table_name, TableType::Normal) {
            Ok(result) => result,
            Err(TableError::Storage(err)) => {
                return Err(err);
            }
            // Table was never created: nothing to mark
            Err(TableError::TableDoesNotExist(_)) => {
                return Ok(());
            }
            // Any other table error here indicates corruption
            Err(_) => {
                return Err(StorageError::Corrupted(format!(
                    "Unable to open {table_name}"
                )));
            }
        };

        if let Some(definition) = result {
            // Freed tables are always normal (non-multimap) tables
            let table_root = match definition {
                InternalTableDefinition::Normal { table_root, .. } => table_root,
                InternalTableDefinition::Multimap { .. } => unreachable!(),
            };
            let table: ReadOnlyTable<TransactionIdWithPagination, PageList<'static>> =
                ReadOnlyTable::new(
                    table_name.to_string(),
                    table_root,
                    PageHint::None,
                    Arc::new(TransactionGuard::fake()),
                    mem.clone(),
                )?;
            for result in table.range::<TransactionIdWithPagination>(..)? {
                let (_, page_list) = result?;
                for i in 0..page_list.value().len() {
                    mem.mark_page_allocated(page_list.value().get(i));
                }
            }
        }

        Ok(())
    }
733
734    fn check_pages_allocated_recursive(root: PageNumber, mem: Arc<TransactionalMemory>) -> Result {
735        // Repair the allocator state
736        // All pages in the master table
737        let master_pages_iter = AllPageNumbersBtreeIter::new(root, None, None, mem.clone())?;
738        for result in master_pages_iter {
739            let page = result?;
740            assert!(mem.is_allocated(page));
741        }
742
743        // Iterate over all other tables
744        let iter: BtreeRangeIter<&str, InternalTableDefinition> =
745            BtreeRangeIter::new::<RangeFull, &str>(&(..), Some(root), mem.clone())?;
746
747        // Chain all the other tables to the master table iter
748        for entry in iter {
749            let definition = entry?.value();
750            definition.visit_all_pages(mem.clone(), |path| {
751                assert!(mem.is_allocated(path.page_number()));
752                Ok(())
753            })?;
754        }
755
756        Ok(())
757    }
758
759    fn mark_tables_recursive(root: PageNumber, mem: Arc<TransactionalMemory>) -> Result {
760        // Repair the allocator state
761        // All pages in the master table
762        let master_pages_iter = AllPageNumbersBtreeIter::new(root, None, None, mem.clone())?;
763        for page in master_pages_iter {
764            mem.mark_page_allocated(page?);
765        }
766
767        // Iterate over all other tables
768        let iter: BtreeRangeIter<&str, InternalTableDefinition> =
769            BtreeRangeIter::new::<RangeFull, &str>(&(..), Some(root), mem.clone())?;
770
771        // Chain all the other tables to the master table iter
772        for entry in iter {
773            let definition = entry?.value();
774            definition.visit_all_pages(mem.clone(), |path| {
775                mem.mark_page_allocated(path.page_number());
776                Ok(())
777            })?;
778        }
779
780        Ok(())
781    }
782
    // Verifies (and if necessary repairs) the primary commit, then rebuilds the
    // allocator state by walking every reachable tree and marking its pages.
    // `repair_callback` is invoked at each phase (progress 0.3/0.6/0.9) and may
    // abort the repair. Returns the [data, system, freed] roots.
    fn do_repair(
        mem: &mut Arc<TransactionalMemory>, // Only &mut to ensure exclusivity
        repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
    ) -> Result<[Option<BtreeHeader>; 3], DatabaseError> {
        if !Self::verify_primary_checksums(mem.clone())? {
            if mem.used_two_phase_commit() {
                return Err(DatabaseError::Storage(StorageError::Corrupted(
                    "Primary is corrupted despite 2-phase commit".to_string(),
                )));
            }

            // 0.3 because the repair takes 3 full scans and the first is done now
            let mut handle = RepairSession::new(0.3);
            repair_callback(&mut handle);
            if handle.aborted() {
                return Err(DatabaseError::RepairAborted);
            }

            mem.repair_primary_corrupted();
            // We need to invalidate the userspace cache, because walking the tree in verify_primary_checksums() may
            // have poisoned it with pages that just got rolled back by repair_primary_corrupted(), since
            // that rolls back a partially committed transaction.
            mem.clear_read_cache();
            if !Self::verify_primary_checksums(mem.clone())? {
                return Err(DatabaseError::Storage(StorageError::Corrupted(
                    "Failed to repair database. All roots are corrupted".to_string(),
                )));
            }
        }
        // 0.6 because the repair takes 3 full scans and the second is done now
        let mut handle = RepairSession::new(0.6);
        repair_callback(&mut handle);
        if handle.aborted() {
            return Err(DatabaseError::RepairAborted);
        }

        mem.begin_repair()?;

        let data_root = mem.get_data_root();
        if let Some(header) = data_root {
            Self::mark_tables_recursive(header.root, mem.clone())?;
        }

        let freed_root = mem.get_freed_root();
        // Allow processing of all transactions, since this is the main freed tree
        Self::mark_freed_tree(freed_root, mem.clone())?;
        // NOTE(review): this table is constructed and immediately dropped. Construction
        // can fail (note the `?`), so it may serve to validate the freed root -- confirm
        // whether it is leftover dead code that can be removed.
        let freed_table: ReadOnlyTable<FreedTableKey, FreedPageList<'static>> = ReadOnlyTable::new(
            "internal freed table".to_string(),
            freed_root,
            PageHint::None,
            Arc::new(TransactionGuard::fake()),
            mem.clone(),
        )?;
        drop(freed_table);

        // 0.9 because the repair takes 3 full scans and the third is done now. There is just some system tables left
        let mut handle = RepairSession::new(0.9);
        repair_callback(&mut handle);
        if handle.aborted() {
            return Err(DatabaseError::RepairAborted);
        }

        let system_root = mem.get_system_root();
        if let Some(header) = system_root {
            Self::mark_tables_recursive(header.root, mem.clone())?;
        }

        Self::mark_freed_tree_v2(system_root, DATA_FREED_TABLE, mem.clone())?;
        Self::mark_freed_tree_v2(system_root, SYSTEM_FREED_TABLE, mem.clone())?;
        #[cfg(debug_assertions)]
        {
            // Savepoints only reference pages from a previous commit, so those are either referenced
            // in the current root, or are in the freed tree
            Self::check_repaired_persistent_savepoints(system_root, mem.clone())?;

            Self::check_repaired_allocated_pages_table(system_root, mem.clone())?;
        }

        mem.end_repair()?;

        // We need to invalidate the userspace cache, because we're about to implicitly free the freed table
        // by storing an empty root during the below commit()
        mem.clear_read_cache();

        Ok([data_root, system_root, freed_root])
    }
869
    // Opens (or initializes) a database on `file` and returns a ready-to-use `Database`.
    //
    // Steps performed, in order:
    //   1. Wrap the backend in a `TransactionalMemory` (validates/initializes the file).
    //   2. If the file was not shut down cleanly (or uses format v3), try to load the
    //      persisted allocator state; fall back to a full repair otherwise.
    //   3. Mark the memory writable and construct the `Database`.
    //   4. Replay persistent-savepoint bookkeeping into the transaction tracker.
    #[allow(clippy::too_many_arguments)]
    fn new(
        file: Box<dyn StorageBackend>,
        // When true, an empty file may be initialized as a fresh database
        allow_initialize: bool,
        page_size: usize,
        region_size: Option<u64>,
        read_cache_size_bytes: usize,
        write_cache_size_bytes: usize,
        // Invoked periodically during repair; the callback may abort the repair
        repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
        default_to_file_format_v3: bool,
    ) -> Result<Self, DatabaseError> {
        #[cfg(feature = "logging")]
        let file_path = format!("{:?}", &file);
        #[cfg(feature = "logging")]
        info!("Opening database {:?}", &file_path);
        let mem = TransactionalMemory::new(
            file,
            allow_initialize,
            page_size,
            region_size,
            read_cache_size_bytes,
            write_cache_size_bytes,
            default_to_file_format_v3,
        )?;
        let mut mem = Arc::new(mem);
        // TODO: Seems like there should be a better way to structure this. We have a file format
        // check here, which matches the InMemoryState::from_bytes behavior
        if mem.needs_repair()? || mem.file_format_v3() {
            // If the last transaction used 2-phase commit and updated the allocator state table, then
            // we can just load the allocator state from there. Otherwise, we need a full repair
            if let Some(tree) = Self::get_allocator_state_table(&mem)? {
                #[cfg(feature = "logging")]
                info!("Found valid allocator state, full repair not needed");
                mem.load_allocator_state(&tree)?;
            } else {
                #[cfg(feature = "logging")]
                warn!("Database {:?} not shutdown cleanly. Repairing", &file_path);
                // Give the caller a chance to abort before the (potentially slow) repair starts
                let mut handle = RepairSession::new(0.0);
                repair_callback(&mut handle);
                if handle.aborted() {
                    return Err(DatabaseError::RepairAborted);
                }
                let [data_root, system_root, freed_root] =
                    Self::do_repair(&mut mem, repair_callback)?;
                let next_transaction_id = mem.get_last_committed_transaction_id()?.next();
                // Persist the repaired roots with a fresh commit
                mem.commit(
                    data_root,
                    system_root,
                    freed_root,
                    next_transaction_id,
                    false,
                    true,
                )?;
            }
        }

        mem.begin_writable()?;
        let next_transaction_id = mem.get_last_committed_transaction_id()?.next();

        let db = Database {
            mem,
            transaction_tracker: Arc::new(TransactionTracker::new(next_transaction_id)),
        };

        // Restore the tracker state for any persistent savepoints
        let txn = db.begin_write().map_err(|e| e.into_storage_error())?;
        if let Some(next_id) = txn.next_persistent_savepoint_id()? {
            db.transaction_tracker
                .restore_savepoint_counter_state(next_id);
        }
        for id in txn.list_persistent_savepoints()? {
            let savepoint = match txn.get_persistent_savepoint(id) {
                Ok(savepoint) => savepoint,
                Err(err) => match err {
                    // Ids came from list_persistent_savepoints(), so they must be valid
                    SavepointError::InvalidSavepoint => unreachable!(),
                    SavepointError::Storage(storage) => {
                        return Err(storage.into());
                    }
                },
            };
            db.transaction_tracker
                .register_persistent_savepoint(&savepoint);
        }
        txn.version_asserts()?;
        txn.abort()?;

        Ok(db)
    }
958
959    fn get_allocator_state_table(
960        mem: &Arc<TransactionalMemory>,
961    ) -> Result<Option<AllocatorStateTree>> {
962        // The allocator state table is only valid if the primary was written using 2-phase commit
963        if !mem.used_two_phase_commit() {
964            return Ok(None);
965        }
966
967        // See if it's present in the system table tree
968        let fake_freed_pages = Arc::new(Mutex::new(vec![]));
969        let fake_allocated = Arc::new(Mutex::new(PageTrackerPolicy::Closed));
970        let system_table_tree = TableTreeMut::new(
971            mem.get_system_root(),
972            Arc::new(TransactionGuard::fake()),
973            mem.clone(),
974            fake_freed_pages.clone(),
975            fake_allocated.clone(),
976        );
977        let Some(allocator_state_table) = system_table_tree
978            .get_table::<AllocatorStateKey, &[u8]>(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal)
979            .map_err(|e| e.into_storage_error_or_corrupted("Unexpected TableError"))?
980        else {
981            return Ok(None);
982        };
983
984        // Load the allocator state table
985        let InternalTableDefinition::Normal { table_root, .. } = allocator_state_table else {
986            unreachable!();
987        };
988        let tree = AllocatorStateTree::new(
989            table_root,
990            Arc::new(TransactionGuard::fake()),
991            mem.clone(),
992            fake_freed_pages,
993            fake_allocated,
994        );
995
996        // Make sure this isn't stale allocator state left over from a previous transaction
997        if !mem.is_valid_allocator_state(&tree)? {
998            return Ok(None);
999        }
1000
1001        Ok(Some(tree))
1002    }
1003
1004    fn allocate_read_transaction(&self) -> Result<TransactionGuard> {
1005        let id = self
1006            .transaction_tracker
1007            .register_read_transaction(&self.mem)?;
1008
1009        Ok(TransactionGuard::new_read(
1010            id,
1011            self.transaction_tracker.clone(),
1012        ))
1013    }
1014
    /// Convenience method for [`Builder::new`]
    ///
    /// Use the returned [`Builder`] to configure and then create or open a database.
    pub fn builder() -> Builder {
        Builder::new()
    }
1019
1020    /// Begins a write transaction
1021    ///
1022    /// Returns a [`WriteTransaction`] which may be used to read/write to the database. Only a single
1023    /// write may be in progress at a time. If a write is in progress, this function will block
1024    /// until it completes.
1025    pub fn begin_write(&self) -> Result<WriteTransaction, TransactionError> {
1026        // Fail early if there has been an I/O error -- nothing can be committed in that case
1027        self.mem.check_io_errors()?;
1028        let guard = TransactionGuard::new_write(
1029            self.transaction_tracker.start_write_transaction(),
1030            self.transaction_tracker.clone(),
1031        );
1032        WriteTransaction::new(guard, self.transaction_tracker.clone(), self.mem.clone())
1033            .map_err(|e| e.into())
1034    }
1035
1036    /// Begins a read transaction
1037    ///
1038    /// Captures a snapshot of the database, so that only data committed before calling this method
1039    /// is visible in the transaction
1040    ///
1041    /// Returns a [`ReadTransaction`] which may be used to read from the database. Read transactions
1042    /// may exist concurrently with writes
1043    pub fn begin_read(&self) -> Result<ReadTransaction, TransactionError> {
1044        let guard = self.allocate_read_transaction()?;
1045        #[cfg(feature = "logging")]
1046        debug!("Beginning read transaction id={:?}", guard.id());
1047        ReadTransaction::new(self.get_memory(), guard)
1048    }
1049
1050    fn ensure_allocator_state_table(&self) -> Result<(), Error> {
1051        // If the allocator state table is already up to date, we're done
1052        if Self::get_allocator_state_table(&self.mem)?.is_some() {
1053            return Ok(());
1054        }
1055
1056        // Make a new quick-repair commit to update the allocator state table
1057        #[cfg(feature = "logging")]
1058        debug!("Writing allocator state table");
1059        let mut tx = self.begin_write()?;
1060        tx.set_quick_repair(true);
1061        tx.commit()?;
1062
1063        Ok(())
1064    }
1065}
1066
1067impl Drop for Database {
1068    fn drop(&mut self) {
1069        if thread::panicking() {
1070            return;
1071        }
1072
1073        if self.ensure_allocator_state_table().is_err() {
1074            #[cfg(feature = "logging")]
1075            warn!("Failed to write allocator state table. Repair may be required at restart.")
1076        }
1077    }
1078}
1079
/// Handle passed to the repair callback; reports repair progress and lets the
/// callback abort the repair.
pub struct RepairSession {
    // Estimated fraction of the repair completed so far, in [0.0, 1.0)
    progress: f64,
    // Set to true once the callback has requested that the repair be abandoned
    aborted: bool,
}
1084
1085impl RepairSession {
1086    pub(crate) fn new(progress: f64) -> Self {
1087        Self {
1088            progress,
1089            aborted: false,
1090        }
1091    }
1092
1093    pub(crate) fn aborted(&self) -> bool {
1094        self.aborted
1095    }
1096
1097    /// Abort the repair process. The coorresponding call to [`Builder::open`] or [`Builder::create`] will return an error
1098    pub fn abort(&mut self) {
1099        self.aborted = true;
1100    }
1101
1102    /// Returns an estimate of the repair progress in the range [0.0, 1.0). At 1.0 the repair is complete.
1103    pub fn progress(&self) -> f64 {
1104        self.progress
1105    }
1106}
1107
/// Configuration builder of a redb [Database].
pub struct Builder {
    // Page size used when creating a new database file (part of the file format)
    page_size: usize,
    // Region size override (tests/fuzzing only); None selects the default
    region_size: Option<u64>,
    // Bytes of memory dedicated to the read cache
    read_cache_size_bytes: usize,
    // Bytes of memory dedicated to the write cache
    write_cache_size_bytes: usize,
    // Invoked periodically if the database requires repair on open
    repair_callback: Box<dyn Fn(&mut RepairSession)>,
    // When true, newly created databases use the v3 file format
    default_to_file_format_v3: bool,
}
1117
impl Builder {
    /// Construct a new [Builder] with sensible defaults.
    ///
    /// ## Defaults
    ///
    /// - `cache_size_bytes`: 1GiB
    #[allow(clippy::new_without_default)]
    pub fn new() -> Self {
        let mut result = Self {
            // Default to 4k pages. Benchmarking showed that this was a good default on all platforms,
            // including MacOS with 16k pages. Therefore, users are not allowed to configure it at the moment.
            // It is part of the file format, so can be enabled in the future.
            page_size: PAGE_SIZE,
            region_size: None,
            // TODO: Default should probably take into account the total system memory
            read_cache_size_bytes: 0,
            // TODO: Default should probably take into account the total system memory
            write_cache_size_bytes: 0,
            // No-op by default; callers may override via set_repair_callback()
            repair_callback: Box::new(|_| {}),
            default_to_file_format_v3: false,
        };

        // The cache fields above are placeholders; the real default (1GiB total) is set here
        result.set_cache_size(1024 * 1024 * 1024);
        result
    }

    /// Create new databases in the v3 file format
    ///
    /// The v3 format is only supported in redb 2.6 and newer.
    ///
    /// Defaults to `false`
    pub fn create_with_file_format_v3(&mut self, value: bool) -> &mut Self {
        self.default_to_file_format_v3 = value;
        self
    }

    /// Set a callback which will be invoked periodically in the event that the database file needs
    /// to be repaired.
    ///
    /// The [`RepairSession`] argument can be used to control the repair process.
    ///
    /// If the database file needs repair, the callback will be invoked at least once.
    /// There is no upper limit on the number of times it may be called.
    pub fn set_repair_callback(
        &mut self,
        callback: impl Fn(&mut RepairSession) + 'static,
    ) -> &mut Self {
        self.repair_callback = Box::new(callback);
        self
    }

    /// Set the internal page size of the database
    ///
    /// Valid values are powers of two, greater than or equal to 512
    ///
    /// ## Defaults
    ///
    /// Default to 4 Kib pages.
    #[cfg(any(fuzzing, test))]
    pub fn set_page_size(&mut self, size: usize) -> &mut Self {
        assert!(size.is_power_of_two());
        // Sizes below the 512 byte minimum are silently clamped up
        self.page_size = std::cmp::max(size, 512);
        self
    }

    /// Set the amount of memory (in bytes) used for caching data
    pub fn set_cache_size(&mut self, bytes: usize) -> &mut Self {
        // TODO: allow dynamic expansion of the read/write cache
        // The budget is split 90% read cache / 10% write cache
        self.read_cache_size_bytes = bytes / 10 * 9;
        self.write_cache_size_bytes = bytes / 10;
        self
    }

    #[cfg(any(test, fuzzing))]
    pub fn set_region_size(&mut self, size: u64) -> &mut Self {
        assert!(size.is_power_of_two());
        self.region_size = Some(size);
        self
    }

    /// Opens the specified file as a redb database.
    /// * if the file does not exist, or is an empty file, a new database will be initialized in it
    /// * if the file is a valid redb database, it will be opened
    /// * otherwise this function will return an error
    pub fn create(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(path)?;

        Database::new(
            Box::new(FileBackend::new(file)?),
            true, // allow_initialize: an empty/new file will be set up as a fresh database
            self.page_size,
            self.region_size,
            self.read_cache_size_bytes,
            self.write_cache_size_bytes,
            &self.repair_callback,
            self.default_to_file_format_v3,
        )
    }

    /// Opens an existing redb database.
    pub fn open(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
        let file = OpenOptions::new().read(true).write(true).open(path)?;

        Database::new(
            Box::new(FileBackend::new(file)?),
            false, // allow_initialize: an existing database is required
            self.page_size,
            // NOTE(review): the configured region_size is not passed when opening, unlike
            // create(); presumably region size is fixed at creation time -- confirm
            None,
            self.read_cache_size_bytes,
            self.write_cache_size_bytes,
            &self.repair_callback,
            self.default_to_file_format_v3,
        )
    }

    /// Open an existing or create a new database in the given `file`.
    ///
    /// The file must be empty or contain a valid database.
    pub fn create_file(&self, file: File) -> Result<Database, DatabaseError> {
        Database::new(
            Box::new(FileBackend::new(file)?),
            true, // allow_initialize: an empty file will be initialized as a new database
            self.page_size,
            self.region_size,
            self.read_cache_size_bytes,
            self.write_cache_size_bytes,
            &self.repair_callback,
            self.default_to_file_format_v3,
        )
    }

    /// Open an existing or create a new database with the given backend.
    pub fn create_with_backend(
        &self,
        backend: impl StorageBackend,
    ) -> Result<Database, DatabaseError> {
        Database::new(
            Box::new(backend),
            true, // allow_initialize: an empty backend will be initialized as a new database
            self.page_size,
            self.region_size,
            self.read_cache_size_bytes,
            self.write_cache_size_bytes,
            &self.repair_callback,
            self.default_to_file_format_v3,
        )
    }
}
1271
1272impl std::fmt::Debug for Database {
1273    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1274        f.debug_struct("Database").finish()
1275    }
1276}
1277
1278#[cfg(test)]
1279mod test {
1280    use crate::backends::FileBackend;
1281    use crate::{
1282        CommitError, Database, DatabaseError, Durability, ReadableTable, StorageBackend,
1283        StorageError, TableDefinition, TransactionError,
1284    };
1285    use std::fs::File;
1286    use std::io::{ErrorKind, Read, Seek, SeekFrom};
1287    use std::sync::Arc;
1288    use std::sync::atomic::{AtomicU64, Ordering};
1289
    // Test backend that wraps a real FileBackend and injects I/O errors once a shared
    // countdown of allowed writes is exhausted.
    #[derive(Debug)]
    struct FailingBackend {
        inner: FileBackend,
        // Remaining number of writes that are allowed to succeed; shared so tests can
        // flip it mid-run
        countdown: Arc<AtomicU64>,
    }
1295
1296    impl FailingBackend {
1297        fn new(backend: FileBackend, countdown: u64) -> Self {
1298            Self {
1299                inner: backend,
1300                countdown: Arc::new(AtomicU64::new(countdown)),
1301            }
1302        }
1303
1304        fn check_countdown(&self) -> Result<(), std::io::Error> {
1305            if self.countdown.load(Ordering::SeqCst) == 0 {
1306                return Err(std::io::Error::from(ErrorKind::Other));
1307            }
1308
1309            Ok(())
1310        }
1311
1312        fn decrement_countdown(&self) -> Result<(), std::io::Error> {
1313            if self
1314                .countdown
1315                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
1316                    if x > 0 { Some(x - 1) } else { None }
1317                })
1318                .is_err()
1319            {
1320                return Err(std::io::Error::from(ErrorKind::Other));
1321            }
1322
1323            Ok(())
1324        }
1325    }
1326
1327    impl StorageBackend for FailingBackend {
1328        fn len(&self) -> Result<u64, std::io::Error> {
1329            self.inner.len()
1330        }
1331
1332        fn read(&self, offset: u64, len: usize) -> Result<Vec<u8>, std::io::Error> {
1333            self.check_countdown()?;
1334            self.inner.read(offset, len)
1335        }
1336
1337        fn set_len(&self, len: u64) -> Result<(), std::io::Error> {
1338            self.inner.set_len(len)
1339        }
1340
1341        fn sync_data(&self, eventual: bool) -> Result<(), std::io::Error> {
1342            self.check_countdown()?;
1343            self.inner.sync_data(eventual)
1344        }
1345
1346        fn write(&self, offset: u64, data: &[u8]) -> Result<(), std::io::Error> {
1347            self.decrement_countdown()?;
1348            self.inner.write(offset, data)
1349        }
1350    }
1351
    /// Regression test: an I/O failure mid-commit (after ephemeral and persistent
    /// savepoints were created) must leave the file in a state that can be reopened
    /// (repaired) afterwards.
    #[test]
    fn crash_regression4() {
        let tmpfile = crate::create_tempfile();
        let (file, path) = tmpfile.into_parts();

        // Allow exactly 24 writes before the backend starts failing
        let backend = FailingBackend::new(FileBackend::new(file).unwrap(), 24);
        let db = Database::builder()
            .set_cache_size(12686)
            .set_page_size(8 * 1024)
            .set_region_size(32 * 4096)
            .create_with_backend(backend)
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let tx = db.begin_write().unwrap();
        let _savepoint = tx.ephemeral_savepoint().unwrap();
        let _persistent_savepoint = tx.persistent_savepoint().unwrap();
        tx.commit().unwrap();
        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            let _ = table.insert_reserve(118821, 360).unwrap();
        }
        // The write budget runs out here, so this commit must fail
        let result = tx.commit();
        assert!(result.is_err());

        // Reopening the file afterwards must succeed
        drop(db);
        Database::builder()
            .set_cache_size(1024 * 1024)
            .set_page_size(8 * 1024)
            .set_region_size(32 * 4096)
            .create(&path)
            .unwrap();
    }
1387
    /// After an I/O error during commit, the database must refuse further writes, and
    /// the on-disk recovery flag must remain set even if the error later "clears".
    #[test]
    fn transient_io_error() {
        let tmpfile = crate::create_tempfile();
        let (file, path) = tmpfile.into_parts();

        // Start with an effectively unlimited write budget
        let backend = FailingBackend::new(FileBackend::new(file).unwrap(), u64::MAX);
        let countdown = backend.countdown.clone();
        let db = Database::builder()
            .set_cache_size(0)
            .create_with_backend(backend)
            .unwrap();

        let table_def: TableDefinition<u64, u64> = TableDefinition::new("x");

        // Create some garbage
        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            table.insert(0, 0).unwrap();
        }
        tx.commit().unwrap();
        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            table.insert(0, 1).unwrap();
        }
        tx.commit().unwrap();

        let tx = db.begin_write().unwrap();
        // Cause an error in the commit
        countdown.store(0, Ordering::SeqCst);
        let result = tx.commit().err().unwrap();
        assert!(matches!(result, CommitError::Storage(StorageError::Io(_))));
        // Subsequent write transactions must fail fast with PreviousIo
        let result = db.begin_write().err().unwrap();
        assert!(matches!(
            result,
            TransactionError::Storage(StorageError::PreviousIo)
        ));
        // Simulate a transient error
        countdown.store(u64::MAX, Ordering::SeqCst);
        drop(db);

        // Check that recovery flag is set, even though the error has "cleared"
        // NOTE(review): offset 9 / bit 0x2 is assumed to be the recovery bit of the
        // header's god byte -- confirm against the file-format header layout
        let mut file = File::open(&path).unwrap();
        file.seek(SeekFrom::Start(9)).unwrap();
        let mut god_byte = vec![0u8];
        assert_eq!(file.read(&mut god_byte).unwrap(), 1);
        assert_ne!(god_byte[0] & 2, 0);
    }
1437
1438    #[test]
1439    fn small_pages() {
1440        let tmpfile = crate::create_tempfile();
1441
1442        let db = Database::builder()
1443            .set_page_size(512)
1444            .create(tmpfile.path())
1445            .unwrap();
1446
1447        let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1448        let txn = db.begin_write().unwrap();
1449        {
1450            txn.open_table(table_definition).unwrap();
1451        }
1452        txn.commit().unwrap();
1453    }
1454
    /// Fuzzer-derived regression test: a sequence of two-phase commits mixed with
    /// ephemeral savepoint creation, restoration, and invalidation on a database with
    /// the minimum (512 byte) page size.
    #[test]
    fn small_pages2() {
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_page_size(512)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint0 = tx.ephemeral_savepoint().unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint1 = tx.ephemeral_savepoint().unwrap();
        tx.restore_savepoint(&savepoint0).unwrap();
        tx.set_durability(Durability::None);
        {
            let mut t = tx.open_table(table_def).unwrap();
            t.insert_reserve(&660503, 489).unwrap().as_mut().fill(0xFF);
            assert!(t.remove(&291295).unwrap().is_none());
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        tx.restore_savepoint(&savepoint0).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint2 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint0);
        tx.restore_savepoint(&savepoint2).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.get(&2059).unwrap().is_none());
            assert!(t.remove(&145227).unwrap().is_none());
            assert!(t.remove(&145227).unwrap().is_none());
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint3 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint1);
        tx.restore_savepoint(&savepoint3).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        // This transaction (which created savepoint4) is aborted rather than committed
        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint4 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint2);
        tx.restore_savepoint(&savepoint3).unwrap();
        tx.set_durability(Durability::None);
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.remove(&207936).unwrap().is_none());
        }
        tx.abort().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint5 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint3);
        // NOTE(review): savepoint4 came from the aborted transaction above, which
        // presumably invalidates it -- hence the expected error here
        assert!(tx.restore_savepoint(&savepoint4).is_err());
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        tx.restore_savepoint(&savepoint5).unwrap();
        tx.set_durability(Durability::None);
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();
    }
1548
    /// Fuzzer-derived regression test: savepoint creation and restoration combined
    /// with non-durable writes and aborts on a database with 1 KiB pages.
    #[test]
    fn small_pages3() {
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_page_size(1024)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let mut tx = db.begin_write().unwrap();
        let _savepoint0 = tx.ephemeral_savepoint().unwrap();
        tx.set_durability(Durability::None);
        {
            let mut t = tx.open_table(table_def).unwrap();
            let value = vec![0; 306];
            t.insert(&539717, value.as_slice()).unwrap();
        }
        tx.abort().unwrap();

        let mut tx = db.begin_write().unwrap();
        let savepoint1 = tx.ephemeral_savepoint().unwrap();
        // Restore a savepoint taken within this same transaction
        tx.restore_savepoint(&savepoint1).unwrap();
        tx.set_durability(Durability::None);
        {
            let mut t = tx.open_table(table_def).unwrap();
            let value = vec![0; 2008];
            t.insert(&784384, value.as_slice()).unwrap();
        }
        tx.abort().unwrap();
    }
1581
    /// Fuzzer-derived regression test: large inserts and reserved inserts followed by
    /// aborts, on a database with 1 KiB pages and a small cache.
    #[test]
    fn small_pages4() {
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_cache_size(1024 * 1024)
            .set_page_size(1024)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let tx = db.begin_write().unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let tx = db.begin_write().unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.get(&131072).unwrap().is_none());
            let value = vec![0xFF; 1130];
            t.insert(&42394, value.as_slice()).unwrap();
            // Multi-page reserved insert (3645 bytes >> 1024 byte pages)
            t.insert_reserve(&744037, 3645).unwrap().as_mut().fill(0xFF);
            assert!(t.get(&0).unwrap().is_none());
        }
        tx.abort().unwrap();

        let tx = db.begin_write().unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            t.insert_reserve(&118749, 734).unwrap().as_mut().fill(0xFF);
        }
        tx.abort().unwrap();
    }
1618
1619    #[test]
1620    fn dynamic_shrink() {
1621        let tmpfile = crate::create_tempfile();
1622        let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1623        let big_value = vec![0u8; 1024];
1624
1625        let db = Database::builder()
1626            .set_region_size(1024 * 1024)
1627            .create(tmpfile.path())
1628            .unwrap();
1629
1630        let txn = db.begin_write().unwrap();
1631        {
1632            let mut table = txn.open_table(table_definition).unwrap();
1633            for i in 0..2048 {
1634                table.insert(&i, big_value.as_slice()).unwrap();
1635            }
1636        }
1637        txn.commit().unwrap();
1638
1639        let file_size = tmpfile.as_file().metadata().unwrap().len();
1640
1641        let txn = db.begin_write().unwrap();
1642        {
1643            let mut table = txn.open_table(table_definition).unwrap();
1644            for i in 0..2048 {
1645                table.remove(&i).unwrap();
1646            }
1647        }
1648        txn.commit().unwrap();
1649
1650        // Perform a couple more commits to be sure the database has a chance to compact
1651        let txn = db.begin_write().unwrap();
1652        {
1653            let mut table = txn.open_table(table_definition).unwrap();
1654            table.insert(0, [].as_slice()).unwrap();
1655        }
1656        txn.commit().unwrap();
1657        let txn = db.begin_write().unwrap();
1658        {
1659            let mut table = txn.open_table(table_definition).unwrap();
1660            table.remove(0).unwrap();
1661        }
1662        txn.commit().unwrap();
1663        let txn = db.begin_write().unwrap();
1664        txn.commit().unwrap();
1665
1666        let final_file_size = tmpfile.as_file().metadata().unwrap().len();
1667        assert!(final_file_size < file_size);
1668    }
1669
1670    #[test]
1671    fn create_new_db_in_empty_file() {
1672        let tmpfile = crate::create_tempfile();
1673
1674        let _db = Database::builder()
1675            .create_file(tmpfile.into_file())
1676            .unwrap();
1677    }
1678
1679    #[test]
1680    fn open_missing_file() {
1681        let tmpfile = crate::create_tempfile();
1682
1683        let err = Database::builder()
1684            .open(tmpfile.path().with_extension("missing"))
1685            .unwrap_err();
1686
1687        match err {
1688            DatabaseError::Storage(StorageError::Io(err)) if err.kind() == ErrorKind::NotFound => {}
1689            err => panic!("Unexpected error for empty file: {err}"),
1690        }
1691    }
1692
1693    #[test]
1694    fn open_empty_file() {
1695        let tmpfile = crate::create_tempfile();
1696
1697        let err = Database::builder().open(tmpfile.path()).unwrap_err();
1698
1699        match err {
1700            DatabaseError::Storage(StorageError::Io(err))
1701                if err.kind() == ErrorKind::InvalidData => {}
1702            err => panic!("Unexpected error for empty file: {err}"),
1703        }
1704    }
1705}