redb/
db.rs

1use crate::transaction_tracker::{TransactionId, TransactionTracker};
2use crate::tree_store::{
3    BtreeHeader, InternalTableDefinition, PAGE_SIZE, PageHint, PageNumber, ReadOnlyBackend,
4    ShrinkPolicy, TableTree, TableType, TransactionalMemory,
5};
6use crate::types::{Key, Value};
7use crate::{
8    CompactionError, DatabaseError, Error, ReadOnlyTable, SavepointError, StorageError, TableError,
9};
10use crate::{ReadTransaction, Result, WriteTransaction};
11use std::fmt::{Debug, Display, Formatter};
12
13use std::fs::{File, OpenOptions};
14use std::marker::PhantomData;
15use std::path::Path;
16use std::sync::Arc;
17use std::{io, thread};
18
19use crate::error::TransactionError;
20use crate::sealed::Sealed;
21use crate::transactions::{
22    ALLOCATOR_STATE_TABLE_NAME, AllocatorStateKey, AllocatorStateTree, DATA_ALLOCATED_TABLE,
23    DATA_FREED_TABLE, PageList, SYSTEM_FREED_TABLE, SystemTableDefinition,
24    TransactionIdWithPagination,
25};
26use crate::tree_store::file_backend::FileBackend;
27#[cfg(feature = "logging")]
28use log::{debug, info, warn};
29
#[allow(clippy::len_without_is_empty)]
/// Implements persistent storage for a database.
///
/// Implementors provide random-access reads and writes over a linear byte range;
/// redb layers its page management on top of this trait.
pub trait StorageBackend: 'static + Debug + Send + Sync {
    /// Gets the current length of the storage.
    fn len(&self) -> std::result::Result<u64, io::Error>;

    /// Reads the specified array of bytes from the storage.
    ///
    /// If `out.len()` + `offset` exceeds the length of the storage an appropriate `Error` must be returned.
    fn read(&self, offset: u64, out: &mut [u8]) -> std::result::Result<(), io::Error>;

    /// Sets the length of the storage.
    ///
    /// New positions in the storage must be initialized to zero.
    fn set_len(&self, len: u64) -> std::result::Result<(), io::Error>;

    /// Syncs all buffered data with the persistent storage.
    fn sync_data(&self) -> std::result::Result<(), io::Error>;

    /// Writes the specified array to the storage, starting at `offset`.
    fn write(&self, offset: u64, data: &[u8]) -> std::result::Result<(), io::Error>;

    /// Release any resources held by the backend
    ///
    /// Note: redb will not access the backend after calling this method and will call it exactly
    /// once when the [`Database`] is dropped
    fn close(&self) -> std::result::Result<(), io::Error> {
        // Default is a no-op for backends with nothing to release
        Ok(())
    }
}
60
/// A named reference to a table (sealed: only implementable within this crate)
pub trait TableHandle: Sealed {
    /// Returns the name of the table
    fn name(&self) -> &str;
}
65
66#[derive(Clone)]
67pub struct UntypedTableHandle {
68    name: String,
69}
70
71impl UntypedTableHandle {
72    pub(crate) fn new(name: String) -> Self {
73        Self { name }
74    }
75}
76
77impl TableHandle for UntypedTableHandle {
78    fn name(&self) -> &str {
79        &self.name
80    }
81}
82
83impl Sealed for UntypedTableHandle {}
84
/// A named reference to a multimap table (sealed: only implementable within this crate)
pub trait MultimapTableHandle: Sealed {
    /// Returns the name of the multimap table
    fn name(&self) -> &str;
}
89
90#[derive(Clone)]
91pub struct UntypedMultimapTableHandle {
92    name: String,
93}
94
95impl UntypedMultimapTableHandle {
96    pub(crate) fn new(name: String) -> Self {
97        Self { name }
98    }
99}
100
101impl MultimapTableHandle for UntypedMultimapTableHandle {
102    fn name(&self) -> &str {
103        &self.name
104    }
105}
106
107impl Sealed for UntypedMultimapTableHandle {}
108
/// Defines the name and types of a table
///
/// A [`TableDefinition`] should be opened for use by calling [`ReadTransaction::open_table`] or [`WriteTransaction::open_table`]
///
/// Note that the lifetime of the `K` and `V` type parameters does not impact the lifetimes of the data
/// that is stored or retrieved from the table
pub struct TableDefinition<'a, K: Key + 'static, V: Value + 'static> {
    name: &'a str,
    // Zero-sized markers: K and V exist only at the type level
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}

impl<'a, K: Key + 'static, V: Value + 'static> TableDefinition<'a, K, V> {
    /// Construct a new table with given `name`
    ///
    /// ## Invariant
    ///
    /// `name` must not be empty.
    ///
    /// # Panics
    ///
    /// Panics if `name` is empty. Being `const`, this fails at compile time when
    /// used in a `const` context.
    pub const fn new(name: &'a str) -> Self {
        assert!(!name.is_empty());
        Self {
            name,
            _key_type: PhantomData,
            _value_type: PhantomData,
        }
    }
}
136
impl<K: Key + 'static, V: Value + 'static> TableHandle for TableDefinition<'_, K, V> {
    fn name(&self) -> &str {
        self.name
    }
}

// Sealed-trait marker: restricts TableHandle implementations to this crate
impl<K: Key, V: Value> Sealed for TableDefinition<'_, K, V> {}
144
// Manual impls rather than #[derive]: deriving would add spurious `K: Clone`
// / `V: Clone` bounds even though K and V are only PhantomData markers.
impl<K: Key + 'static, V: Value + 'static> Clone for TableDefinition<'_, K, V> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<K: Key + 'static, V: Value + 'static> Copy for TableDefinition<'_, K, V> {}
152
153impl<K: Key + 'static, V: Value + 'static> Display for TableDefinition<'_, K, V> {
154    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
155        write!(
156            f,
157            "{}<{}, {}>",
158            self.name,
159            K::type_name().name(),
160            V::type_name().name()
161        )
162    }
163}
164
/// Defines the name and types of a multimap table
///
/// A [`MultimapTableDefinition`] should be opened for use by calling [`ReadTransaction::open_multimap_table`] or [`WriteTransaction::open_multimap_table`]
///
/// [Multimap tables](https://en.wikipedia.org/wiki/Multimap) may have multiple values associated with each key
///
/// Note that the lifetime of the `K` and `V` type parameters does not impact the lifetimes of the data
/// that is stored or retrieved from the table
pub struct MultimapTableDefinition<'a, K: Key + 'static, V: Key + 'static> {
    name: &'a str,
    // Zero-sized markers: K and V exist only at the type level
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}

impl<'a, K: Key + 'static, V: Key + 'static> MultimapTableDefinition<'a, K, V> {
    /// Construct a new multimap table with given `name`
    ///
    /// # Panics
    ///
    /// Panics if `name` is empty.
    pub const fn new(name: &'a str) -> Self {
        assert!(!name.is_empty());
        Self {
            name,
            _key_type: PhantomData,
            _value_type: PhantomData,
        }
    }
}
189
impl<K: Key + 'static, V: Key + 'static> MultimapTableHandle for MultimapTableDefinition<'_, K, V> {
    fn name(&self) -> &str {
        self.name
    }
}

// Sealed-trait marker: restricts MultimapTableHandle implementations to this crate
impl<K: Key, V: Key> Sealed for MultimapTableDefinition<'_, K, V> {}
197
// Manual impls rather than #[derive]: deriving would add spurious `K: Clone`
// / `V: Clone` bounds even though K and V are only PhantomData markers.
impl<K: Key + 'static, V: Key + 'static> Clone for MultimapTableDefinition<'_, K, V> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<K: Key + 'static, V: Key + 'static> Copy for MultimapTableDefinition<'_, K, V> {}
205
206impl<K: Key + 'static, V: Key + 'static> Display for MultimapTableDefinition<'_, K, V> {
207    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
208        write!(
209            f,
210            "{}<{}, {}>",
211            self.name,
212            K::type_name().name(),
213            V::type_name().name()
214        )
215    }
216}
217
/// Information regarding the usage of the in-memory cache
///
/// Note: these metrics are only collected when the "`cache_metrics`" feature is enabled
#[derive(Debug)]
pub struct CacheStats {
    // Number of times data was evicted because the cache was full
    pub(crate) evictions: u64,
}

impl CacheStats {
    /// Number of times that data has been evicted, due to the cache being full
    ///
    /// To increase the cache size use [`Builder::set_cache_size`]
    pub fn evictions(&self) -> u64 {
        self.evictions
    }
}
234
235pub(crate) struct TransactionGuard {
236    transaction_tracker: Option<Arc<TransactionTracker>>,
237    transaction_id: Option<TransactionId>,
238    write_transaction: bool,
239}
240
241impl TransactionGuard {
242    pub(crate) fn new_read(
243        transaction_id: TransactionId,
244        tracker: Arc<TransactionTracker>,
245    ) -> Self {
246        Self {
247            transaction_tracker: Some(tracker),
248            transaction_id: Some(transaction_id),
249            write_transaction: false,
250        }
251    }
252
253    pub(crate) fn new_write(
254        transaction_id: TransactionId,
255        tracker: Arc<TransactionTracker>,
256    ) -> Self {
257        Self {
258            transaction_tracker: Some(tracker),
259            transaction_id: Some(transaction_id),
260            write_transaction: true,
261        }
262    }
263
264    // TODO: remove this hack
265    pub(crate) fn fake() -> Self {
266        Self {
267            transaction_tracker: None,
268            transaction_id: None,
269            write_transaction: false,
270        }
271    }
272
273    pub(crate) fn id(&self) -> TransactionId {
274        self.transaction_id.unwrap()
275    }
276
277    pub(crate) fn leak(mut self) -> TransactionId {
278        self.transaction_id.take().unwrap()
279    }
280}
281
282impl Drop for TransactionGuard {
283    fn drop(&mut self) {
284        if self.transaction_tracker.is_none() {
285            return;
286        }
287        if let Some(transaction_id) = self.transaction_id {
288            if self.write_transaction {
289                self.transaction_tracker
290                    .as_ref()
291                    .unwrap()
292                    .end_write_transaction(transaction_id);
293            } else {
294                self.transaction_tracker
295                    .as_ref()
296                    .unwrap()
297                    .deallocate_read_transaction(transaction_id);
298            }
299        }
300    }
301}
302
/// Read operations shared by [`Database`] and [`ReadOnlyDatabase`]
pub trait ReadableDatabase {
    /// Begins a read transaction
    ///
    /// Captures a snapshot of the database, so that only data committed before calling this method
    /// is visible in the transaction
    ///
    /// Returns a [`ReadTransaction`] which may be used to read from the database. Read transactions
    /// may exist concurrently with writes
    fn begin_read(&self) -> Result<ReadTransaction, TransactionError>;

    /// Information regarding the usage of the in-memory cache
    ///
    /// Note: these metrics are only collected when the "`cache_metrics`" feature is enabled
    fn cache_stats(&self) -> CacheStats;
}
318
/// A redb database opened in read-only mode
///
/// Use [`Self::begin_read`] to get a [`ReadTransaction`] object that can be used to read from the database
///
/// Multiple processes may open a [`ReadOnlyDatabase`], but it may not be opened concurrently
/// with a [`Database`].
///
/// # Examples
///
/// Basic usage:
///
/// ```rust
/// use redb::*;
/// # use tempfile::NamedTempFile;
/// const TABLE: TableDefinition<u64, u64> = TableDefinition::new("my_data");
///
/// # fn main() -> Result<(), Error> {
/// # #[cfg(not(target_os = "wasi"))]
/// # let tmpfile = NamedTempFile::new().unwrap();
/// # #[cfg(target_os = "wasi")]
/// # let tmpfile = NamedTempFile::new_in("/tmp").unwrap();
/// # let filename = tmpfile.path();
/// let db = Database::create(filename)?;
/// let txn = db.begin_write()?;
/// {
///     let mut table = txn.open_table(TABLE)?;
///     table.insert(&0, &0)?;
/// }
/// txn.commit()?;
/// drop(db);
///
/// let db = ReadOnlyDatabase::open(filename)?;
/// let txn = db.begin_read()?;
/// {
///     let mut table = txn.open_table(TABLE)?;
///     println!("{}", table.get(&0)?.unwrap().value());
/// }
/// # Ok(())
/// # }
/// ```
pub struct ReadOnlyDatabase {
    // Storage layer; constructed over a ReadOnlyBackend wrapper in `new()`
    mem: Arc<TransactionalMemory>,
    // Registry of live read transactions
    transaction_tracker: Arc<TransactionTracker>,
}
363
364impl ReadableDatabase for ReadOnlyDatabase {
365    fn begin_read(&self) -> Result<ReadTransaction, TransactionError> {
366        let id = self
367            .transaction_tracker
368            .register_read_transaction(&self.mem)?;
369        #[cfg(feature = "logging")]
370        debug!("Beginning read transaction id={id:?}");
371
372        let guard = TransactionGuard::new_read(id, self.transaction_tracker.clone());
373
374        ReadTransaction::new(self.mem.clone(), guard)
375    }
376
377    fn cache_stats(&self) -> CacheStats {
378        self.mem.cache_stats()
379    }
380}
381
impl ReadOnlyDatabase {
    /// Opens an existing redb database.
    ///
    /// Uses the default [`Builder`] settings
    pub fn open(path: impl AsRef<Path>) -> Result<ReadOnlyDatabase, DatabaseError> {
        Builder::new().open_read_only(path)
    }

    // Constructs a read-only database over the given backend. Unlike
    // Database::new(), this cannot repair an unclean shutdown: repair needs
    // write access, so such files are rejected with RepairAborted.
    fn new(
        file: Box<dyn StorageBackend>,
        page_size: usize,
        region_size: Option<u64>,
        read_cache_size_bytes: usize,
    ) -> Result<Self, DatabaseError> {
        #[cfg(feature = "logging")]
        let file_path = format!("{:?}", &file);
        #[cfg(feature = "logging")]
        info!("Opening database in read-only {:?}", &file_path);
        // Wrap the backend in ReadOnlyBackend; write cache size is 0 since
        // this handle never writes
        let mem = TransactionalMemory::new(
            Box::new(ReadOnlyBackend::new(file)),
            false,
            page_size,
            region_size,
            read_cache_size_bytes,
            0,
            true,
        )?;
        let mem = Arc::new(mem);
        // If the last transaction used 2-phase commit and updated the allocator state table, then
        // we can just load the allocator state from there. Otherwise, we need a full repair
        if let Some(tree) = Database::get_allocator_state_table(&mem)? {
            mem.load_allocator_state(&tree)?;
        } else {
            // Repair would require writing to the file, which we can't do here
            #[cfg(feature = "logging")]
            warn!(
                "Database {:?} not shutdown cleanly. Repair required",
                &file_path
            );
            return Err(DatabaseError::RepairAborted);
        }

        let next_transaction_id = mem.get_last_committed_transaction_id()?.next();
        let db = Self {
            mem,
            transaction_tracker: Arc::new(TransactionTracker::new(next_transaction_id)),
        };

        Ok(db)
    }
}
430
/// Opened redb database file
///
/// Use [`Self::begin_read`] to get a [`ReadTransaction`] object that can be used to read from the database
/// Use [`Self::begin_write`] to get a [`WriteTransaction`] object that can be used to read or write to the database
///
/// Multiple reads may be performed concurrently, with each other, and with writes. Only a single write
/// may be in progress at a time.
///
/// # Examples
///
/// Basic usage:
///
/// ```rust
/// use redb::*;
/// # use tempfile::NamedTempFile;
/// const TABLE: TableDefinition<u64, u64> = TableDefinition::new("my_data");
///
/// # fn main() -> Result<(), Error> {
/// # #[cfg(not(target_os = "wasi"))]
/// # let tmpfile = NamedTempFile::new().unwrap();
/// # #[cfg(target_os = "wasi")]
/// # let tmpfile = NamedTempFile::new_in("/tmp").unwrap();
/// # let filename = tmpfile.path();
/// let db = Database::create(filename)?;
/// let write_txn = db.begin_write()?;
/// {
///     let mut table = write_txn.open_table(TABLE)?;
///     table.insert(&0, &0)?;
/// }
/// write_txn.commit()?;
/// # Ok(())
/// # }
/// ```
pub struct Database {
    // Storage layer shared with all transactions spawned from this database
    mem: Arc<TransactionalMemory>,
    // Registry of live read and write transactions and savepoints
    transaction_tracker: Arc<TransactionTracker>,
}
468
469impl ReadableDatabase for Database {
470    fn begin_read(&self) -> Result<ReadTransaction, TransactionError> {
471        let guard = self.allocate_read_transaction()?;
472        #[cfg(feature = "logging")]
473        debug!("Beginning read transaction id={:?}", guard.id());
474        ReadTransaction::new(self.get_memory(), guard)
475    }
476
477    fn cache_stats(&self) -> CacheStats {
478        self.mem.cache_stats()
479    }
480}
481
482impl Database {
    /// Opens the specified file as a redb database.
    /// * if the file does not exist, or is an empty file, a new database will be initialized in it
    /// * if the file is a valid redb database, it will be opened
    /// * otherwise this function will return an error
    ///
    /// Uses the default [`Builder`] settings
    pub fn create(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
        Self::builder().create(path)
    }
490
    /// Opens an existing redb database.
    ///
    /// Uses the default [`Builder`] settings
    pub fn open(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
        Self::builder().open(path)
    }
495
    // Returns a new shared handle to the underlying storage layer
    pub(crate) fn get_memory(&self) -> Arc<TransactionalMemory> {
        self.mem.clone()
    }
499
500    pub(crate) fn verify_primary_checksums(mem: Arc<TransactionalMemory>) -> Result<bool> {
501        let table_tree = TableTree::new(
502            mem.get_data_root(),
503            PageHint::None,
504            Arc::new(TransactionGuard::fake()),
505            mem.clone(),
506        )?;
507        if !table_tree.verify_checksums()? {
508            return Ok(false);
509        }
510        let system_table_tree = TableTree::new(
511            mem.get_system_root(),
512            PageHint::None,
513            Arc::new(TransactionGuard::fake()),
514            mem.clone(),
515        )?;
516        if !system_table_tree.verify_checksums()? {
517            return Ok(false);
518        }
519
520        Ok(true)
521    }
522
    /// Force a check of the integrity of the database file, and repair it if possible.
    ///
    /// Note: Calling this function is unnecessary during normal operation. redb will automatically
    /// detect and recover from crashes, power loss, and other unclean shutdowns. This function is
    /// quite slow and should only be used when you suspect the database file may have been modified
    /// externally to redb, or that a redb bug may have left the database in a corrupted state.
    ///
    /// Returns `Ok(true)` if the database passed integrity checks; `Ok(false)` if it failed but was repaired,
    /// and `Err(Corrupted)` if the check failed and the file could not be repaired
    pub fn check_integrity(&mut self) -> Result<bool, DatabaseError> {
        // Snapshot the allocator state hash so we can detect whether repair changed it
        let allocator_hash = self.mem.allocator_hash();
        // `&mut self` guarantees exclusivity, so get_mut on the Arc cannot fail here
        let mut was_clean = Arc::get_mut(&mut self.mem)
            .unwrap()
            .clear_cache_and_reload()?;

        let old_roots = [self.mem.get_data_root(), self.mem.get_system_root()];

        // Repair with a no-op progress callback; since the callback never aborts,
        // only Storage errors can come out of do_repair() here
        let new_roots = Self::do_repair(&mut self.mem, &|_| {}).map_err(|err| match err {
            DatabaseError::Storage(storage_err) => storage_err,
            _ => unreachable!(),
        })?;

        // Any change to the roots or the allocator state means the file was not clean
        if old_roots != new_roots || allocator_hash != self.mem.allocator_hash() {
            was_clean = false;
        }

        if !was_clean {
            // Persist the repaired roots in a new commit
            let next_transaction_id = self.mem.get_last_committed_transaction_id()?.next();
            let [data_root, system_root] = new_roots;
            self.mem.commit(
                data_root,
                system_root,
                next_transaction_id,
                true,
                ShrinkPolicy::Never,
            )?;
        }

        self.mem.begin_writable()?;

        Ok(was_clean)
    }
565
    /// Compacts the database file
    ///
    /// Returns `true` if compaction was performed, and `false` if no further compaction was possible
    pub fn compact(&mut self) -> Result<bool, CompactionError> {
        // Live read transactions pin old pages, which would prevent relocation
        if self
            .transaction_tracker
            .oldest_live_read_transaction()
            .is_some()
        {
            return Err(CompactionError::TransactionInProgress);
        }
        // Commit to free up any pending free pages
        // Use 2-phase commit to avoid any possible security issues. Plus this compaction is going to be so slow that it doesn't matter.
        // Once https://github.com/cberner/redb/issues/829 is fixed, we should upgrade this to use quick-repair -- that way the user
        // can cancel the compaction without requiring a full repair afterwards
        let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        if txn.list_persistent_savepoints()?.next().is_some() {
            return Err(CompactionError::PersistentSavepointExists);
        }
        if self.transaction_tracker.any_savepoint_exists() {
            return Err(CompactionError::EphemeralSavepointExists);
        }
        txn.set_two_phase_commit(true);
        txn.commit().map_err(|e| e.into_storage_error())?;
        // Repeat, just in case executing list_persistent_savepoints() created a new table
        let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        txn.set_two_phase_commit(true);
        txn.commit().map_err(|e| e.into_storage_error())?;
        // There can't be any outstanding transactions because we have a `&mut self`, so all pending free pages
        // should have been cleared out by the above commit()
        let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        assert!(!txn.pending_free_pages()?);
        txn.abort()?;

        let mut compacted = false;
        // Iteratively compact until no progress is made
        loop {
            let mut progress = false;

            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            if txn.compact_pages()? {
                progress = true;
                txn.commit().map_err(|e| e.into_storage_error())?;
            } else {
                txn.abort()?;
            }

            // Double commit to free up the relocated pages for reuse
            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            txn.set_two_phase_commit(true);
            // Also shrink the database file by the maximum amount
            txn.set_shrink_policy(ShrinkPolicy::Maximum);
            txn.commit().map_err(|e| e.into_storage_error())?;
            // Triple commit to free up the relocated pages for reuse
            // TODO: this really shouldn't be necessary, but the data freed tree is a system table
            // and so free'ing up its pages causes more deletes from the system tree
            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            txn.set_two_phase_commit(true);
            // Also shrink the database file by the maximum amount
            txn.set_shrink_policy(ShrinkPolicy::Maximum);
            txn.commit().map_err(|e| e.into_storage_error())?;
            // Sanity check: after the extra commits nothing should remain pending
            let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            assert!(!txn.pending_free_pages()?);
            txn.abort()?;

            if !progress {
                break;
            }

            compacted = true;
        }

        Ok(compacted)
    }
640
    #[cfg_attr(not(debug_assertions), expect(dead_code))]
    // Debug-build sanity check after repair: every page recorded in
    // DATA_ALLOCATED_TABLE must be marked allocated in the allocator state
    fn check_repaired_allocated_pages_table(
        system_root: Option<BtreeHeader>,
        mem: Arc<TransactionalMemory>,
    ) -> Result {
        let table_tree = TableTree::new(
            system_root,
            PageHint::None,
            Arc::new(TransactionGuard::fake()),
            mem.clone(),
        )?;
        // The table may legitimately be absent; only check it when present
        if let Some(table_def) = table_tree
            .get_table::<TransactionIdWithPagination, PageList>(
                DATA_ALLOCATED_TABLE.name(),
                TableType::Normal,
            )
            .map_err(|e| e.into_storage_error_or_corrupted("Allocated pages table corrupted"))?
        {
            // The allocated-pages table is always a normal (non-multimap) table
            let InternalTableDefinition::Normal { table_root, .. } = table_def else {
                unreachable!()
            };
            let table: ReadOnlyTable<TransactionIdWithPagination, PageList> = ReadOnlyTable::new(
                DATA_ALLOCATED_TABLE.name().to_string(),
                table_root,
                PageHint::None,
                Arc::new(TransactionGuard::fake()),
                mem.clone(),
            )?;
            for result in table.range::<TransactionIdWithPagination>(..)? {
                let (_, pages) = result?;
                for i in 0..pages.value().len() {
                    assert!(mem.is_allocated(pages.value().get(i)));
                }
            }
        }

        Ok(())
    }
679
    // Invokes `visitor` on every page number stored in the given system table
    // (used with DATA_FREED_TABLE and SYSTEM_FREED_TABLE).
    //
    // A missing table is not an error: it may simply not have been created yet,
    // in which case there is nothing to visit.
    fn visit_freed_tree<K: Key, V: Value, F>(
        system_root: Option<BtreeHeader>,
        table_def: SystemTableDefinition<K, V>,
        mem: Arc<TransactionalMemory>,
        mut visitor: F,
    ) -> Result
    where
        F: FnMut(PageNumber) -> Result,
    {
        let fake_guard = Arc::new(TransactionGuard::fake());
        let system_tree = TableTree::new(system_root, PageHint::None, fake_guard, mem.clone())?;
        let table_name = table_def.name();
        let result = match system_tree.get_table::<K, V>(table_name, TableType::Normal) {
            Ok(result) => result,
            Err(TableError::Storage(err)) => {
                return Err(err);
            }
            // Table was never created: nothing to visit
            Err(TableError::TableDoesNotExist(_)) => {
                return Ok(());
            }
            // Any other table error means the system tree is corrupted
            Err(_) => {
                return Err(StorageError::Corrupted(format!(
                    "Unable to open {table_name}"
                )));
            }
        };

        if let Some(definition) = result {
            let table_root = match definition {
                InternalTableDefinition::Normal { table_root, .. } => table_root,
                // Freed tables are always normal (non-multimap) tables
                InternalTableDefinition::Multimap { .. } => unreachable!(),
            };
            let table: ReadOnlyTable<TransactionIdWithPagination, PageList<'static>> =
                ReadOnlyTable::new(
                    table_name.to_string(),
                    table_root,
                    PageHint::None,
                    Arc::new(TransactionGuard::fake()),
                    mem.clone(),
                )?;
            // Entries are keyed by transaction id; each value is a page list
            // (presumably the pages that transaction freed -- see table writers)
            for result in table.range::<TransactionIdWithPagination>(..)? {
                let (_, page_list) = result?;
                for i in 0..page_list.value().len() {
                    visitor(page_list.value().get(i))?;
                }
            }
        }

        Ok(())
    }
730
    #[cfg(debug_assertions)]
    // Debug-build bookkeeping: records every reachable page (data tree, system
    // tree, and both freed tables) via mark_debug_allocated_page(). Mirrors the
    // page walk performed by do_repair(), but without mutating allocator state.
    fn mark_allocated_page_for_debug(
        mem: &mut Arc<TransactionalMemory>, // Only &mut to ensure exclusivity
    ) -> Result {
        let data_root = mem.get_data_root();
        {
            let fake = Arc::new(TransactionGuard::fake());
            let tables = TableTree::new(data_root, PageHint::None, fake, mem.clone())?;
            tables.visit_all_pages(|path| {
                mem.mark_debug_allocated_page(path.page_number());
                Ok(())
            })?;
        }

        let system_root = mem.get_system_root();
        {
            let fake = Arc::new(TransactionGuard::fake());
            let system_tables = TableTree::new(system_root, PageHint::None, fake, mem.clone())?;
            system_tables.visit_all_pages(|path| {
                mem.mark_debug_allocated_page(path.page_number());
                Ok(())
            })?;
        }

        // Pages referenced by the freed trees are still allocated until reclaimed
        Self::visit_freed_tree(system_root, DATA_FREED_TABLE, mem.clone(), |page| {
            mem.mark_debug_allocated_page(page);
            Ok(())
        })?;
        Self::visit_freed_tree(system_root, SYSTEM_FREED_TABLE, mem.clone(), |page| {
            mem.mark_debug_allocated_page(page);
            Ok(())
        })?;

        Ok(())
    }
766
    // Rebuilds the allocator state by scanning all reachable pages. If the
    // primary checksums do not verify, first rolls back the partially-committed
    // transaction. Progress is reported through `repair_callback`, which may
    // abort the repair. Returns the (possibly rolled-back) [data, system] roots;
    // the caller is responsible for committing them.
    fn do_repair(
        mem: &mut Arc<TransactionalMemory>, // Only &mut to ensure exclusivity
        repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
    ) -> Result<[Option<BtreeHeader>; 2], DatabaseError> {
        if !Self::verify_primary_checksums(mem.clone())? {
            // 2-phase commit should make a corrupt primary impossible; treat it as corruption
            if mem.used_two_phase_commit() {
                return Err(DatabaseError::Storage(StorageError::Corrupted(
                    "Primary is corrupted despite 2-phase commit".to_string(),
                )));
            }

            // 0.3 because the repair takes 3 full scans and the first is done now
            let mut handle = RepairSession::new(0.3);
            repair_callback(&mut handle);
            if handle.aborted() {
                return Err(DatabaseError::RepairAborted);
            }

            mem.repair_primary_corrupted();
            // We need to invalidate the userspace cache, because walking the tree in verify_primary_checksums() may
            // have poisoned it with pages that just got rolled back by repair_primary_corrupted(), since
            // that rolls back a partially committed transaction.
            mem.clear_read_cache();
            if !Self::verify_primary_checksums(mem.clone())? {
                return Err(DatabaseError::Storage(StorageError::Corrupted(
                    "Failed to repair database. All roots are corrupted".to_string(),
                )));
            }
        }
        // 0.6 because the repair takes 3 full scans and the second is done now
        let mut handle = RepairSession::new(0.6);
        repair_callback(&mut handle);
        if handle.aborted() {
            return Err(DatabaseError::RepairAborted);
        }

        mem.begin_repair()?;

        // Mark every page reachable from the data tree as allocated
        let data_root = mem.get_data_root();
        {
            let fake = Arc::new(TransactionGuard::fake());
            let tables = TableTree::new(data_root, PageHint::None, fake, mem.clone())?;
            tables.visit_all_pages(|path| {
                mem.mark_page_allocated(path.page_number());
                Ok(())
            })?;
        }

        // 0.9 because the repair takes 3 full scans and the third is done now. There is just some system tables left
        let mut handle = RepairSession::new(0.9);
        repair_callback(&mut handle);
        if handle.aborted() {
            return Err(DatabaseError::RepairAborted);
        }

        // Mark every page reachable from the system tree as allocated
        let system_root = mem.get_system_root();
        {
            let fake = Arc::new(TransactionGuard::fake());
            let system_tables = TableTree::new(system_root, PageHint::None, fake, mem.clone())?;
            system_tables.visit_all_pages(|path| {
                mem.mark_page_allocated(path.page_number());
                Ok(())
            })?;
        }

        // Pages in the freed trees are still allocated until they are reclaimed
        Self::visit_freed_tree(system_root, DATA_FREED_TABLE, mem.clone(), |page| {
            mem.mark_page_allocated(page);
            Ok(())
        })?;
        Self::visit_freed_tree(system_root, SYSTEM_FREED_TABLE, mem.clone(), |page| {
            mem.mark_page_allocated(page);
            Ok(())
        })?;
        #[cfg(debug_assertions)]
        {
            Self::check_repaired_allocated_pages_table(system_root, mem.clone())?;
        }

        mem.end_repair()?;

        // We need to invalidate the userspace cache, because we're about to implicitly free the freed table
        // by storing an empty root during the below commit()
        mem.clear_read_cache();

        Ok([data_root, system_root])
    }
853
    /// Opens a database over `file`, repairing it first if it was not shut down
    /// cleanly, and restores any persistent-savepoint tracker state.
    ///
    /// `allow_initialize` permits initializing a brand-new/empty backend.
    /// `repair_callback` receives progress updates during repair and may abort it,
    /// in which case [`DatabaseError::RepairAborted`] is returned.
    #[allow(clippy::too_many_arguments)]
    fn new(
        file: Box<dyn StorageBackend>,
        allow_initialize: bool,
        page_size: usize,
        region_size: Option<u64>,
        read_cache_size_bytes: usize,
        write_cache_size_bytes: usize,
        repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
    ) -> Result<Self, DatabaseError> {
        #[cfg(feature = "logging")]
        let file_path = format!("{:?}", &file);
        #[cfg(feature = "logging")]
        info!("Opening database {:?}", &file_path);
        let mem = TransactionalMemory::new(
            file,
            allow_initialize,
            page_size,
            region_size,
            read_cache_size_bytes,
            write_cache_size_bytes,
            false,
        )?;
        let mut mem = Arc::new(mem);
        // If the last transaction used 2-phase commit and updated the allocator state table, then
        // we can just load the allocator state from there. Otherwise, we need a full repair
        if let Some(tree) = Self::get_allocator_state_table(&mem)? {
            #[cfg(feature = "logging")]
            info!("Found valid allocator state, full repair not needed");
            mem.load_allocator_state(&tree)?;
            #[cfg(debug_assertions)]
            Self::mark_allocated_page_for_debug(&mut mem)?;
        } else {
            #[cfg(feature = "logging")]
            warn!("Database {:?} not shutdown cleanly. Repairing", &file_path);
            // Give the callback a chance to abort before starting the repair scans
            let mut handle = RepairSession::new(0.0);
            repair_callback(&mut handle);
            if handle.aborted() {
                return Err(DatabaseError::RepairAborted);
            }
            let [data_root, system_root] = Self::do_repair(&mut mem, repair_callback)?;
            let next_transaction_id = mem.get_last_committed_transaction_id()?.next();
            // Persist the repaired roots. NOTE(review): the `true` flag presumably
            // requests 2-phase commit — confirm against TransactionalMemory::commit
            mem.commit(
                data_root,
                system_root,
                next_transaction_id,
                true,
                ShrinkPolicy::Never,
            )?;
        }

        mem.begin_writable()?;
        let next_transaction_id = mem.get_last_committed_transaction_id()?.next();

        let db = Database {
            mem,
            transaction_tracker: Arc::new(TransactionTracker::new(next_transaction_id)),
        };

        // Restore the tracker state for any persistent savepoints
        let txn = db.begin_write().map_err(|e| e.into_storage_error())?;
        if let Some(next_id) = txn.next_persistent_savepoint_id()? {
            db.transaction_tracker
                .restore_savepoint_counter_state(next_id);
        }
        for id in txn.list_persistent_savepoints()? {
            let savepoint = match txn.get_persistent_savepoint(id) {
                Ok(savepoint) => savepoint,
                Err(err) => match err {
                    // list_persistent_savepoints() just yielded this id, so it must exist
                    SavepointError::InvalidSavepoint => unreachable!(),
                    SavepointError::Storage(storage) => {
                        return Err(storage.into());
                    }
                },
            };
            db.transaction_tracker
                .register_persistent_savepoint(&savepoint);
        }
        txn.abort()?;

        Ok(db)
    }
936
937    fn get_allocator_state_table(
938        mem: &Arc<TransactionalMemory>,
939    ) -> Result<Option<AllocatorStateTree>> {
940        // The allocator state table is only valid if the primary was written using 2-phase commit
941        if !mem.used_two_phase_commit() {
942            return Ok(None);
943        }
944
945        // See if it's present in the system table tree
946        let system_table_tree = TableTree::new(
947            mem.get_system_root(),
948            PageHint::None,
949            Arc::new(TransactionGuard::fake()),
950            mem.clone(),
951        )?;
952        let Some(allocator_state_table) = system_table_tree
953            .get_table::<AllocatorStateKey, &[u8]>(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal)
954            .map_err(|e| e.into_storage_error_or_corrupted("Unexpected TableError"))?
955        else {
956            return Ok(None);
957        };
958
959        // Load the allocator state table
960        let InternalTableDefinition::Normal { table_root, .. } = allocator_state_table else {
961            unreachable!();
962        };
963        let tree = AllocatorStateTree::new(
964            table_root,
965            PageHint::None,
966            Arc::new(TransactionGuard::fake()),
967            mem.clone(),
968        )?;
969
970        // Make sure this isn't stale allocator state left over from a previous transaction
971        if !mem.is_valid_allocator_state(&tree)? {
972            return Ok(None);
973        }
974
975        Ok(Some(tree))
976    }
977
978    fn allocate_read_transaction(&self) -> Result<TransactionGuard> {
979        let id = self
980            .transaction_tracker
981            .register_read_transaction(&self.mem)?;
982
983        Ok(TransactionGuard::new_read(
984            id,
985            self.transaction_tracker.clone(),
986        ))
987    }
988
989    /// Convenience method for [`Builder::new`]
990    pub fn builder() -> Builder {
991        Builder::new()
992    }
993
994    /// Begins a write transaction
995    ///
996    /// Returns a [`WriteTransaction`] which may be used to read/write to the database. Only a single
997    /// write may be in progress at a time. If a write is in progress, this function will block
998    /// until it completes.
999    pub fn begin_write(&self) -> Result<WriteTransaction, TransactionError> {
1000        // Fail early if there has been an I/O error -- nothing can be committed in that case
1001        self.mem.check_io_errors()?;
1002        let guard = TransactionGuard::new_write(
1003            self.transaction_tracker.start_write_transaction(),
1004            self.transaction_tracker.clone(),
1005        );
1006        WriteTransaction::new(guard, self.transaction_tracker.clone(), self.mem.clone())
1007            .map_err(|e| e.into())
1008    }
1009
1010    fn ensure_allocator_state_table_and_trim(&self) -> Result<(), Error> {
1011        // Make a new quick-repair commit to update the allocator state table
1012        #[cfg(feature = "logging")]
1013        debug!("Writing allocator state table");
1014        let mut tx = self.begin_write()?;
1015        tx.set_quick_repair(true);
1016        tx.set_shrink_policy(ShrinkPolicy::Maximum);
1017        tx.commit()?;
1018
1019        Ok(())
1020    }
1021}
1022
1023impl Drop for Database {
1024    fn drop(&mut self) {
1025        if !thread::panicking() && self.ensure_allocator_state_table_and_trim().is_err() {
1026            #[cfg(feature = "logging")]
1027            warn!("Failed to write allocator state table. Repair may be required at restart.");
1028        }
1029
1030        if self.mem.close().is_err() {
1031            #[cfg(feature = "logging")]
1032            warn!("Failed to flush database file. Repair may be required at restart.");
1033        }
1034    }
1035}
1036
/// Handle passed to the repair callback: reports progress and lets the callback
/// cancel the repair.
pub struct RepairSession {
    /// Estimated fraction of the repair completed so far
    progress: f64,
    /// Set once the callback has requested cancellation
    aborted: bool,
}

impl RepairSession {
    pub(crate) fn new(progress: f64) -> Self {
        RepairSession {
            aborted: false,
            progress,
        }
    }

    pub(crate) fn aborted(&self) -> bool {
        self.aborted
    }

    /// Abort the repair process. The corresponding call to [`Builder::open`] or [`Builder::create`] will return an error
    pub fn abort(&mut self) {
        self.aborted = true;
    }

    /// Returns an estimate of the repair progress in the range [0.0, 1.0). At 1.0 the repair is complete.
    pub fn progress(&self) -> f64 {
        self.progress
    }
}
1064
/// Configuration builder of a redb [Database].
pub struct Builder {
    // Page size used when initializing a new database file (fixed to PAGE_SIZE;
    // only tests/fuzzing builds can change it)
    page_size: usize,
    // Optional region size override; only settable in tests/fuzzing builds
    region_size: Option<u64>,
    // Bytes of memory for the read cache (set via set_cache_size)
    read_cache_size_bytes: usize,
    // Bytes of memory for the write cache (set via set_cache_size)
    write_cache_size_bytes: usize,
    // Invoked periodically during repair; may abort the open
    repair_callback: Box<dyn Fn(&mut RepairSession)>,
}
1073
1074impl Builder {
1075    /// Construct a new [Builder] with sensible defaults.
1076    ///
1077    /// ## Defaults
1078    ///
1079    /// - `cache_size_bytes`: 1GiB
1080    #[allow(clippy::new_without_default)]
1081    pub fn new() -> Self {
1082        let mut result = Self {
1083            // Default to 4k pages. Benchmarking showed that this was a good default on all platforms,
1084            // including MacOS with 16k pages. Therefore, users are not allowed to configure it at the moment.
1085            // It is part of the file format, so can be enabled in the future.
1086            page_size: PAGE_SIZE,
1087            region_size: None,
1088            // TODO: Default should probably take into account the total system memory
1089            read_cache_size_bytes: 0,
1090            // TODO: Default should probably take into account the total system memory
1091            write_cache_size_bytes: 0,
1092            repair_callback: Box::new(|_| {}),
1093        };
1094
1095        result.set_cache_size(1024 * 1024 * 1024);
1096        result
1097    }
1098
1099    /// Set a callback which will be invoked periodically in the event that the database file needs
1100    /// to be repaired.
1101    ///
1102    /// The [`RepairSession`] argument can be used to control the repair process.
1103    ///
1104    /// If the database file needs repair, the callback will be invoked at least once.
1105    /// There is no upper limit on the number of times it may be called.
1106    pub fn set_repair_callback(
1107        &mut self,
1108        callback: impl Fn(&mut RepairSession) + 'static,
1109    ) -> &mut Self {
1110        self.repair_callback = Box::new(callback);
1111        self
1112    }
1113
1114    /// Set the internal page size of the database
1115    ///
1116    /// Valid values are powers of two, greater than or equal to 512
1117    ///
1118    /// ## Defaults
1119    ///
1120    /// Default to 4 Kib pages.
1121    #[cfg(any(fuzzing, test))]
1122    pub fn set_page_size(&mut self, size: usize) -> &mut Self {
1123        assert!(size.is_power_of_two());
1124        self.page_size = std::cmp::max(size, 512);
1125        self
1126    }
1127
1128    /// Set the amount of memory (in bytes) used for caching data
1129    pub fn set_cache_size(&mut self, bytes: usize) -> &mut Self {
1130        // TODO: allow dynamic expansion of the read/write cache
1131        self.read_cache_size_bytes = bytes / 10 * 9;
1132        self.write_cache_size_bytes = bytes / 10;
1133        self
1134    }
1135
1136    #[cfg(any(test, fuzzing))]
1137    pub fn set_region_size(&mut self, size: u64) -> &mut Self {
1138        assert!(size.is_power_of_two());
1139        self.region_size = Some(size);
1140        self
1141    }
1142
1143    /// Opens the specified file as a redb database.
1144    /// * if the file does not exist, or is an empty file, a new database will be initialized in it
1145    /// * if the file is a valid redb database, it will be opened
1146    /// * otherwise this function will return an error
1147    pub fn create(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
1148        let file = OpenOptions::new()
1149            .read(true)
1150            .write(true)
1151            .create(true)
1152            .truncate(false)
1153            .open(path)?;
1154
1155        Database::new(
1156            Box::new(FileBackend::new(file)?),
1157            true,
1158            self.page_size,
1159            self.region_size,
1160            self.read_cache_size_bytes,
1161            self.write_cache_size_bytes,
1162            &self.repair_callback,
1163        )
1164    }
1165
1166    /// Opens an existing redb database.
1167    pub fn open(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
1168        let file = OpenOptions::new().read(true).write(true).open(path)?;
1169
1170        Database::new(
1171            Box::new(FileBackend::new(file)?),
1172            false,
1173            self.page_size,
1174            None,
1175            self.read_cache_size_bytes,
1176            self.write_cache_size_bytes,
1177            &self.repair_callback,
1178        )
1179    }
1180
1181    /// Opens an existing redb database.
1182    ///
1183    /// If the file has been opened for writing (i.e. as a [`Database`]) [`DatabaseError::DatabaseAlreadyOpen`]
1184    /// will be returned on platforms which support file locks (macOS, Windows, Linux). On other platforms,
1185    /// the caller MUST avoid calling this method when the database is open for writing.
1186    pub fn open_read_only(
1187        &self,
1188        path: impl AsRef<Path>,
1189    ) -> Result<ReadOnlyDatabase, DatabaseError> {
1190        let file = OpenOptions::new().read(true).open(path)?;
1191
1192        ReadOnlyDatabase::new(
1193            Box::new(FileBackend::new_internal(file, true)?),
1194            self.page_size,
1195            None,
1196            self.read_cache_size_bytes,
1197        )
1198    }
1199
1200    /// Open an existing or create a new database in the given `file`.
1201    ///
1202    /// The file must be empty or contain a valid database.
1203    pub fn create_file(&self, file: File) -> Result<Database, DatabaseError> {
1204        Database::new(
1205            Box::new(FileBackend::new(file)?),
1206            true,
1207            self.page_size,
1208            self.region_size,
1209            self.read_cache_size_bytes,
1210            self.write_cache_size_bytes,
1211            &self.repair_callback,
1212        )
1213    }
1214
1215    /// Open an existing or create a new database with the given backend.
1216    pub fn create_with_backend(
1217        &self,
1218        backend: impl StorageBackend,
1219    ) -> Result<Database, DatabaseError> {
1220        Database::new(
1221            Box::new(backend),
1222            true,
1223            self.page_size,
1224            self.region_size,
1225            self.read_cache_size_bytes,
1226            self.write_cache_size_bytes,
1227            &self.repair_callback,
1228        )
1229    }
1230}
1231
1232impl std::fmt::Debug for Database {
1233    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1234        f.debug_struct("Database").finish()
1235    }
1236}
1237
1238#[cfg(test)]
1239mod test {
1240    use crate::backends::FileBackend;
1241    use crate::{
1242        CommitError, Database, DatabaseError, Durability, ReadableTable, StorageBackend,
1243        StorageError, TableDefinition, TransactionError,
1244    };
1245    use std::fs::File;
1246    use std::io::{ErrorKind, Read, Seek, SeekFrom};
1247    use std::sync::Arc;
1248    use std::sync::atomic::{AtomicU64, Ordering};
1249
    // Test backend wrapping a real FileBackend: writes succeed until `countdown`
    // reaches zero, after which reads, syncs, and writes return an I/O error.
    #[derive(Debug)]
    struct FailingBackend {
        inner: FileBackend,
        // Remaining successful writes; shared (Arc) so tests can change it mid-run
        countdown: Arc<AtomicU64>,
    }
1255
1256    impl FailingBackend {
1257        fn new(backend: FileBackend, countdown: u64) -> Self {
1258            Self {
1259                inner: backend,
1260                countdown: Arc::new(AtomicU64::new(countdown)),
1261            }
1262        }
1263
1264        fn check_countdown(&self) -> Result<(), std::io::Error> {
1265            if self.countdown.load(Ordering::SeqCst) == 0 {
1266                return Err(std::io::Error::from(ErrorKind::Other));
1267            }
1268
1269            Ok(())
1270        }
1271
1272        fn decrement_countdown(&self) -> Result<(), std::io::Error> {
1273            if self
1274                .countdown
1275                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
1276                    if x > 0 { Some(x - 1) } else { None }
1277                })
1278                .is_err()
1279            {
1280                return Err(std::io::Error::from(ErrorKind::Other));
1281            }
1282
1283            Ok(())
1284        }
1285    }
1286
1287    impl StorageBackend for FailingBackend {
1288        fn len(&self) -> Result<u64, std::io::Error> {
1289            self.inner.len()
1290        }
1291
1292        fn read(&self, offset: u64, out: &mut [u8]) -> Result<(), std::io::Error> {
1293            self.check_countdown()?;
1294            self.inner.read(offset, out)
1295        }
1296
1297        fn set_len(&self, len: u64) -> Result<(), std::io::Error> {
1298            self.inner.set_len(len)
1299        }
1300
1301        fn sync_data(&self) -> Result<(), std::io::Error> {
1302            self.check_countdown()?;
1303            self.inner.sync_data()
1304        }
1305
1306        fn write(&self, offset: u64, data: &[u8]) -> Result<(), std::io::Error> {
1307            self.decrement_countdown()?;
1308            self.inner.write(offset, data)
1309        }
1310    }
1311
    // Regression test: a commit that dies partway through (backend fails after 20
    // writes) must not corrupt the file -- reopening it afterwards must succeed.
    // The constants look fuzzer-generated; presumably minimized from a crash case.
    #[test]
    fn crash_regression4() {
        let tmpfile = crate::create_tempfile();
        let (file, path) = tmpfile.into_parts();

        let backend = FailingBackend::new(FileBackend::new(file).unwrap(), 20);
        let db = Database::builder()
            .set_cache_size(12686)
            .set_page_size(8 * 1024)
            .set_region_size(32 * 4096)
            .create_with_backend(backend)
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let tx = db.begin_write().unwrap();
        let _savepoint = tx.ephemeral_savepoint().unwrap();
        let _persistent_savepoint = tx.persistent_savepoint().unwrap();
        tx.commit().unwrap();
        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            let _ = table.insert_reserve(118821, 360).unwrap();
        }
        // By now the write budget is exhausted, so this commit must fail
        let result = tx.commit();
        assert!(result.is_err());

        drop(db);
        // Reopening the partially-written file must succeed (repair path)
        Database::builder()
            .set_cache_size(1024 * 1024)
            .set_page_size(8 * 1024)
            .set_region_size(32 * 4096)
            .create(&path)
            .unwrap();
    }
1347
    // After an I/O error during commit, subsequent begin_write must fail with
    // PreviousIo, and the on-disk recovery flag must remain set even if the
    // error later "clears".
    #[test]
    fn transient_io_error() {
        let tmpfile = crate::create_tempfile();
        let (file, path) = tmpfile.into_parts();

        let backend = FailingBackend::new(FileBackend::new(file).unwrap(), u64::MAX);
        let countdown = backend.countdown.clone();
        let db = Database::builder()
            .set_cache_size(0)
            .create_with_backend(backend)
            .unwrap();

        let table_def: TableDefinition<u64, u64> = TableDefinition::new("x");

        // Create some garbage
        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            table.insert(0, 0).unwrap();
        }
        tx.commit().unwrap();
        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            table.insert(0, 1).unwrap();
        }
        tx.commit().unwrap();

        let tx = db.begin_write().unwrap();
        // Cause an error in the commit
        countdown.store(0, Ordering::SeqCst);
        let result = tx.commit().err().unwrap();
        assert!(matches!(result, CommitError::Storage(StorageError::Io(_))));
        let result = db.begin_write().err().unwrap();
        assert!(matches!(
            result,
            TransactionError::Storage(StorageError::PreviousIo)
        ));
        // Simulate a transient error
        countdown.store(u64::MAX, Ordering::SeqCst);
        drop(db);

        // Check that recovery flag is set, even though the error has "cleared"
        // NOTE(review): byte 9 is presumably the "god byte" and bit 1 the
        // recovery-required flag -- confirm against the file-format definition
        let mut file = File::open(&path).unwrap();
        file.seek(SeekFrom::Start(9)).unwrap();
        let mut god_byte = vec![0u8];
        assert_eq!(file.read(&mut god_byte).unwrap(), 1);
        assert_ne!(god_byte[0] & 2, 0);
    }
1397
1398    #[test]
1399    fn small_pages() {
1400        let tmpfile = crate::create_tempfile();
1401
1402        let db = Database::builder()
1403            .set_page_size(512)
1404            .create(tmpfile.path())
1405            .unwrap();
1406
1407        let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1408        let txn = db.begin_write().unwrap();
1409        {
1410            txn.open_table(table_definition).unwrap();
1411        }
1412        txn.commit().unwrap();
1413    }
1414
    // Savepoint create/restore/drop sequence on a 512-byte-page database with
    // 2-phase commits. The exact ordering and magic values look fuzzer-derived;
    // presumably a minimized regression -- do not reorder the steps.
    #[test]
    fn small_pages2() {
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_page_size(512)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint0 = tx.ephemeral_savepoint().unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint1 = tx.ephemeral_savepoint().unwrap();
        tx.restore_savepoint(&savepoint0).unwrap();
        tx.set_durability(Durability::None).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            t.insert_reserve(&660503, 489).unwrap().as_mut().fill(0xFF);
            assert!(t.remove(&291295).unwrap().is_none());
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        tx.restore_savepoint(&savepoint0).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint2 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint0);
        tx.restore_savepoint(&savepoint2).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.get(&2059).unwrap().is_none());
            assert!(t.remove(&145227).unwrap().is_none());
            assert!(t.remove(&145227).unwrap().is_none());
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint3 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint1);
        tx.restore_savepoint(&savepoint3).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint4 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint2);
        tx.restore_savepoint(&savepoint3).unwrap();
        tx.set_durability(Durability::None).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.remove(&207936).unwrap().is_none());
        }
        tx.abort().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint5 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint3);
        // savepoint4 came from the aborted transaction above; restoring it is
        // expected to fail (presumably invalidated by the abort -- confirm)
        assert!(tx.restore_savepoint(&savepoint4).is_err());
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        tx.restore_savepoint(&savepoint5).unwrap();
        tx.set_durability(Durability::None).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();
    }
1508
    // Non-durable writes plus savepoint restore on a 1KiB-page database, each
    // transaction aborted. Magic values look fuzzer-derived; presumably a
    // minimized regression case.
    #[test]
    fn small_pages3() {
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_page_size(1024)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let mut tx = db.begin_write().unwrap();
        let _savepoint0 = tx.ephemeral_savepoint().unwrap();
        tx.set_durability(Durability::None).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            let value = vec![0; 306];
            t.insert(&539717, value.as_slice()).unwrap();
        }
        tx.abort().unwrap();

        let mut tx = db.begin_write().unwrap();
        let savepoint1 = tx.ephemeral_savepoint().unwrap();
        // Restoring a savepoint taken in this same transaction
        tx.restore_savepoint(&savepoint1).unwrap();
        tx.set_durability(Durability::None).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            let value = vec![0; 2008];
            t.insert(&784384, value.as_slice()).unwrap();
        }
        tx.abort().unwrap();
    }
1541
    // Aborted transactions with multi-page values on a 1KiB-page database and a
    // small cache. Magic values look fuzzer-derived; presumably a minimized
    // regression case.
    #[test]
    fn small_pages4() {
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_cache_size(1024 * 1024)
            .set_page_size(1024)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let tx = db.begin_write().unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let tx = db.begin_write().unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.get(&131072).unwrap().is_none());
            let value = vec![0xFF; 1130];
            t.insert(&42394, value.as_slice()).unwrap();
            // 3645 bytes spans several 1KiB pages
            t.insert_reserve(&744037, 3645).unwrap().as_mut().fill(0xFF);
            assert!(t.get(&0).unwrap().is_none());
        }
        tx.abort().unwrap();

        let tx = db.begin_write().unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            t.insert_reserve(&118749, 734).unwrap().as_mut().fill(0xFF);
        }
        tx.abort().unwrap();
    }
1578
    // After inserting ~2MiB of data, deleting it all, and committing a few more
    // times, the file must end up smaller than at its peak.
    #[test]
    fn dynamic_shrink() {
        let tmpfile = crate::create_tempfile();
        let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
        let big_value = vec![0u8; 1024];

        let db = Database::builder()
            .set_region_size(1024 * 1024)
            .create(tmpfile.path())
            .unwrap();

        let txn = db.begin_write().unwrap();
        {
            let mut table = txn.open_table(table_definition).unwrap();
            for i in 0..2048 {
                table.insert(&i, big_value.as_slice()).unwrap();
            }
        }
        txn.commit().unwrap();

        // File size at its peak, with all 2048 values stored
        let file_size = tmpfile.as_file().metadata().unwrap().len();

        let txn = db.begin_write().unwrap();
        {
            let mut table = txn.open_table(table_definition).unwrap();
            for i in 0..2048 {
                table.remove(&i).unwrap();
            }
        }
        txn.commit().unwrap();

        // Perform a couple more commits to be sure the database has a chance to compact
        let txn = db.begin_write().unwrap();
        {
            let mut table = txn.open_table(table_definition).unwrap();
            table.insert(0, [].as_slice()).unwrap();
        }
        txn.commit().unwrap();
        let txn = db.begin_write().unwrap();
        {
            let mut table = txn.open_table(table_definition).unwrap();
            table.remove(0).unwrap();
        }
        txn.commit().unwrap();
        let txn = db.begin_write().unwrap();
        txn.commit().unwrap();

        let final_file_size = tmpfile.as_file().metadata().unwrap().len();
        assert!(final_file_size < file_size);
    }
1629
1630    #[test]
1631    fn create_new_db_in_empty_file() {
1632        let tmpfile = crate::create_tempfile();
1633
1634        let _db = Database::builder()
1635            .create_file(tmpfile.into_file())
1636            .unwrap();
1637    }
1638
1639    #[test]
1640    fn open_missing_file() {
1641        let tmpfile = crate::create_tempfile();
1642
1643        let err = Database::builder()
1644            .open(tmpfile.path().with_extension("missing"))
1645            .unwrap_err();
1646
1647        match err {
1648            DatabaseError::Storage(StorageError::Io(err)) if err.kind() == ErrorKind::NotFound => {}
1649            err => panic!("Unexpected error for empty file: {err}"),
1650        }
1651    }
1652
1653    #[test]
1654    fn open_empty_file() {
1655        let tmpfile = crate::create_tempfile();
1656
1657        let err = Database::builder().open(tmpfile.path()).unwrap_err();
1658
1659        match err {
1660            DatabaseError::Storage(StorageError::Io(err))
1661                if err.kind() == ErrorKind::InvalidData => {}
1662            err => panic!("Unexpected error for empty file: {err}"),
1663        }
1664    }
1665}