redb/db.rs

1use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker};
2use crate::tree_store::{
3    AllPageNumbersBtreeIter, BtreeHeader, BtreeRangeIter, FreedPageList, FreedTableKey,
4    InternalTableDefinition, PageHint, PageNumber, RawBtree, SerializedSavepoint, TableTreeMut,
5    TableType, TransactionalMemory, PAGE_SIZE,
6};
7use crate::types::{Key, Value};
8use crate::{CompactionError, DatabaseError, Error, ReadOnlyTable, SavepointError, StorageError};
9use crate::{ReadTransaction, Result, WriteTransaction};
10use std::fmt::{Debug, Display, Formatter};
11
12use std::fs::{File, OpenOptions};
13use std::marker::PhantomData;
14use std::ops::RangeFull;
15use std::path::Path;
16use std::sync::{Arc, Mutex};
17use std::{io, thread};
18
19use crate::error::TransactionError;
20use crate::sealed::Sealed;
21use crate::transactions::{
22    AllocatorStateKey, AllocatorStateTree, ALLOCATOR_STATE_TABLE_NAME, SAVEPOINT_TABLE,
23};
24use crate::tree_store::file_backend::FileBackend;
25#[cfg(feature = "logging")]
26use log::{debug, info, warn};
27
28#[allow(clippy::len_without_is_empty)]
29/// Implements persistent storage for a database.
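///
/// Custom backends can be supplied to redb via [`Builder::create_with_backend`]. The sketch below
/// implements the trait on top of a plain in-memory buffer; it is illustrative only (the
/// `MemoryBackend` name is invented for this example), and real applications will usually want
/// the built-in file backend instead.
///
/// ```rust
/// use redb::StorageBackend;
/// use std::sync::Mutex;
///
/// #[derive(Debug, Default)]
/// struct MemoryBackend(Mutex<Vec<u8>>);
///
/// impl StorageBackend for MemoryBackend {
///     fn len(&self) -> std::io::Result<u64> {
///         Ok(self.0.lock().unwrap().len() as u64)
///     }
///
///     fn read(&self, offset: u64, len: usize) -> std::io::Result<Vec<u8>> {
///         let data = self.0.lock().unwrap();
///         let start = offset as usize;
///         Ok(data[start..start + len].to_vec())
///     }
///
///     fn set_len(&self, len: u64) -> std::io::Result<()> {
///         // Newly added space must be zero initialized
///         self.0.lock().unwrap().resize(len as usize, 0);
///         Ok(())
///     }
///
///     fn sync_data(&self, _eventual: bool) -> std::io::Result<()> {
///         // Nothing to flush for an in-memory buffer
///         Ok(())
///     }
///
///     fn write(&self, offset: u64, data: &[u8]) -> std::io::Result<()> {
///         let mut buffer = self.0.lock().unwrap();
///         let start = offset as usize;
///         buffer[start..start + data.len()].copy_from_slice(data);
///         Ok(())
///     }
/// }
/// ```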
30pub trait StorageBackend: 'static + Debug + Send + Sync {
31    /// Gets the current length of the storage.
32    fn len(&self) -> std::result::Result<u64, io::Error>;
33
34    /// Reads the specified array of bytes from the storage.
35    ///
36    /// If `len` + `offset` exceeds the length of the storage, an appropriate `Error` should be returned or a panic may occur.
37    fn read(&self, offset: u64, len: usize) -> std::result::Result<Vec<u8>, io::Error>;
38
39    /// Sets the length of the storage.
40    ///
41    /// When extending the storage the new positions should be zero initialized.
42    fn set_len(&self, len: u64) -> std::result::Result<(), io::Error>;
43
44    /// Syncs all buffered data with the persistent storage.
45    ///
46    /// If `eventual` is true, data may become persistent at some point after this call returns,
47    /// but the storage must guarantee that a write barrier is inserted: i.e. all writes before this
48    /// call to `sync_data()` will become persistent before any writes that occur after.
49    fn sync_data(&self, eventual: bool) -> std::result::Result<(), io::Error>;
50
51    /// Writes the specified array to the storage.
52    fn write(&self, offset: u64, data: &[u8]) -> std::result::Result<(), io::Error>;
53}
54
55pub trait TableHandle: Sealed {
56    // Returns the name of the table
57    fn name(&self) -> &str;
58}
59
60#[derive(Clone)]
61pub struct UntypedTableHandle {
62    name: String,
63}
64
65impl UntypedTableHandle {
66    pub(crate) fn new(name: String) -> Self {
67        Self { name }
68    }
69}
70
71impl TableHandle for UntypedTableHandle {
72    fn name(&self) -> &str {
73        &self.name
74    }
75}
76
77impl Sealed for UntypedTableHandle {}
78
79pub trait MultimapTableHandle: Sealed {
80    // Returns the name of the multimap table
81    fn name(&self) -> &str;
82}
83
84#[derive(Clone)]
85pub struct UntypedMultimapTableHandle {
86    name: String,
87}
88
89impl UntypedMultimapTableHandle {
90    pub(crate) fn new(name: String) -> Self {
91        Self { name }
92    }
93}
94
95impl MultimapTableHandle for UntypedMultimapTableHandle {
96    fn name(&self) -> &str {
97        &self.name
98    }
99}
100
101impl Sealed for UntypedMultimapTableHandle {}
102
103/// Defines the name and types of a table
104///
105/// A [`TableDefinition`] should be opened for use by calling [`ReadTransaction::open_table`] or [`WriteTransaction::open_table`]
106///
107/// Note that the lifetime of the `K` and `V` type parameters does not impact the lifetimes of the data
108/// that is stored or retrieved from the table
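///
/// A brief sketch of defining and opening a table (the table and file names here are illustrative):
///
/// ```rust
/// use redb::*;
/// # use tempfile::NamedTempFile;
/// const COUNTS: TableDefinition<&str, u64> = TableDefinition::new("example_counts");
///
/// # fn main() -> Result<(), Error> {
/// # let tmpfile: NamedTempFile = NamedTempFile::new().unwrap();
/// let db = Database::create(tmpfile.path())?;
/// let write_txn = db.begin_write()?;
/// {
///     let mut table = write_txn.open_table(COUNTS)?;
///     table.insert("apples", &3)?;
/// }
/// write_txn.commit()?;
/// # Ok(())
/// # }
/// ```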
109pub struct TableDefinition<'a, K: Key + 'static, V: Value + 'static> {
110    name: &'a str,
111    _key_type: PhantomData<K>,
112    _value_type: PhantomData<V>,
113}
114
115impl<'a, K: Key + 'static, V: Value + 'static> TableDefinition<'a, K, V> {
116    /// Construct a new table with given `name`
117    ///
118    /// ## Invariant
119    ///
120    /// `name` must not be empty.
121    pub const fn new(name: &'a str) -> Self {
122        assert!(!name.is_empty());
123        Self {
124            name,
125            _key_type: PhantomData,
126            _value_type: PhantomData,
127        }
128    }
129}
130
131impl<'a, K: Key + 'static, V: Value + 'static> TableHandle for TableDefinition<'a, K, V> {
132    fn name(&self) -> &str {
133        self.name
134    }
135}
136
137impl<K: Key, V: Value> Sealed for TableDefinition<'_, K, V> {}
138
139impl<'a, K: Key + 'static, V: Value + 'static> Clone for TableDefinition<'a, K, V> {
140    fn clone(&self) -> Self {
141        *self
142    }
143}
144
145impl<'a, K: Key + 'static, V: Value + 'static> Copy for TableDefinition<'a, K, V> {}
146
147impl<'a, K: Key + 'static, V: Value + 'static> Display for TableDefinition<'a, K, V> {
148    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
149        write!(
150            f,
151            "{}<{}, {}>",
152            self.name,
153            K::type_name().name(),
154            V::type_name().name()
155        )
156    }
157}
158
159/// Defines the name and types of a multimap table
160///
161/// A [`MultimapTableDefinition`] should be opened for use by calling [`ReadTransaction::open_multimap_table`] or [`WriteTransaction::open_multimap_table`]
162///
163/// [Multimap tables](https://en.wikipedia.org/wiki/Multimap) may have multiple values associated with each key
164///
165/// Note that the lifetime of the `K` and `V` type parameters does not impact the lifetimes of the data
166/// that is stored or retrieved from the table
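///
/// A brief sketch storing several values under one key (the names here are illustrative):
///
/// ```rust
/// use redb::*;
/// # use tempfile::NamedTempFile;
/// const TAGS: MultimapTableDefinition<&str, &str> = MultimapTableDefinition::new("example_tags");
///
/// # fn main() -> Result<(), Error> {
/// # let tmpfile: NamedTempFile = NamedTempFile::new().unwrap();
/// let db = Database::create(tmpfile.path())?;
/// let write_txn = db.begin_write()?;
/// {
///     let mut table = write_txn.open_multimap_table(TAGS)?;
///     table.insert("post-1", "rust")?;
///     table.insert("post-1", "databases")?;
/// }
/// write_txn.commit()?;
/// # Ok(())
/// # }
/// ```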
167pub struct MultimapTableDefinition<'a, K: Key + 'static, V: Key + 'static> {
168    name: &'a str,
169    _key_type: PhantomData<K>,
170    _value_type: PhantomData<V>,
171}
172
173impl<'a, K: Key + 'static, V: Key + 'static> MultimapTableDefinition<'a, K, V> {
174    pub const fn new(name: &'a str) -> Self {
175        assert!(!name.is_empty());
176        Self {
177            name,
178            _key_type: PhantomData,
179            _value_type: PhantomData,
180        }
181    }
182}
183
184impl<'a, K: Key + 'static, V: Key + 'static> MultimapTableHandle
185    for MultimapTableDefinition<'a, K, V>
186{
187    fn name(&self) -> &str {
188        self.name
189    }
190}
191
192impl<K: Key, V: Key> Sealed for MultimapTableDefinition<'_, K, V> {}
193
194impl<'a, K: Key + 'static, V: Key + 'static> Clone for MultimapTableDefinition<'a, K, V> {
195    fn clone(&self) -> Self {
196        *self
197    }
198}
199
200impl<'a, K: Key + 'static, V: Key + 'static> Copy for MultimapTableDefinition<'a, K, V> {}
201
202impl<'a, K: Key + 'static, V: Key + 'static> Display for MultimapTableDefinition<'a, K, V> {
203    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
204        write!(
205            f,
206            "{}<{}, {}>",
207            self.name,
208            K::type_name().name(),
209            V::type_name().name()
210        )
211    }
212}
213
214/// Information regarding the usage of the in-memory cache
215///
216/// Note: these metrics are only collected when the "`cache_metrics`" feature is enabled
217#[derive(Debug)]
218pub struct CacheStats {
219    pub(crate) evictions: u64,
220}
221
222impl CacheStats {
223    /// Number of times that data has been evicted, due to the cache being full
224    ///
225    /// To increase the cache size use [`Builder::set_cache_size`]
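    ///
    /// A short usage sketch (the counter only advances when the `cache_metrics` feature is enabled):
    ///
    /// ```rust
    /// use redb::*;
    /// # use tempfile::NamedTempFile;
    /// # fn main() -> Result<(), Error> {
    /// # let tmpfile: NamedTempFile = NamedTempFile::new().unwrap();
    /// let db = Database::create(tmpfile.path())?;
    /// println!("evictions so far: {}", db.cache_stats().evictions());
    /// # Ok(())
    /// # }
    /// ```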
226    pub fn evictions(&self) -> u64 {
227        self.evictions
228    }
229}
230
231pub(crate) struct TransactionGuard {
232    transaction_tracker: Option<Arc<TransactionTracker>>,
233    transaction_id: Option<TransactionId>,
234    write_transaction: bool,
235}
236
237impl TransactionGuard {
238    pub(crate) fn new_read(
239        transaction_id: TransactionId,
240        tracker: Arc<TransactionTracker>,
241    ) -> Self {
242        Self {
243            transaction_tracker: Some(tracker),
244            transaction_id: Some(transaction_id),
245            write_transaction: false,
246        }
247    }
248
249    pub(crate) fn new_write(
250        transaction_id: TransactionId,
251        tracker: Arc<TransactionTracker>,
252    ) -> Self {
253        Self {
254            transaction_tracker: Some(tracker),
255            transaction_id: Some(transaction_id),
256            write_transaction: true,
257        }
258    }
259
260    // TODO: remove this hack
261    pub(crate) fn fake() -> Self {
262        Self {
263            transaction_tracker: None,
264            transaction_id: None,
265            write_transaction: false,
266        }
267    }
268
269    pub(crate) fn id(&self) -> TransactionId {
270        self.transaction_id.unwrap()
271    }
272
273    pub(crate) fn leak(mut self) -> TransactionId {
274        self.transaction_id.take().unwrap()
275    }
276}
277
278impl Drop for TransactionGuard {
279    fn drop(&mut self) {
280        if self.transaction_tracker.is_none() {
281            return;
282        }
283        if let Some(transaction_id) = self.transaction_id {
284            if self.write_transaction {
285                self.transaction_tracker
286                    .as_ref()
287                    .unwrap()
288                    .end_write_transaction(transaction_id);
289            } else {
290                self.transaction_tracker
291                    .as_ref()
292                    .unwrap()
293                    .deallocate_read_transaction(transaction_id);
294            }
295        }
296    }
297}
298
299/// Opened redb database file
300///
301/// Use [`Self::begin_read`] to get a [`ReadTransaction`] object that can be used to read from the database
302/// Use [`Self::begin_write`] to get a [`WriteTransaction`] object that can be used to read or write to the database
303///
304/// Multiple reads may be performed concurrently, with each other, and with writes. Only a single write
305/// may be in progress at a time.
306///
307/// # Examples
308///
309/// Basic usage:
310///
311/// ```rust
312/// use redb::*;
313/// # use tempfile::NamedTempFile;
314/// const TABLE: TableDefinition<u64, u64> = TableDefinition::new("my_data");
315///
316/// # fn main() -> Result<(), Error> {
317/// # let tmpfile: NamedTempFile = NamedTempFile::new().unwrap();
318/// # let filename = tmpfile.path();
319/// let db = Database::create(filename)?;
320/// let write_txn = db.begin_write()?;
321/// {
322///     let mut table = write_txn.open_table(TABLE)?;
323///     table.insert(&0, &0)?;
324/// }
325/// write_txn.commit()?;
326/// # Ok(())
327/// # }
328/// ```
329pub struct Database {
330    mem: Arc<TransactionalMemory>,
331    transaction_tracker: Arc<TransactionTracker>,
332}
333
334impl Database {
335    /// Opens the specified file as a redb database.
336    /// * if the file does not exist, or is an empty file, a new database will be initialized in it
337    /// * if the file is a valid redb database, it will be opened
338    /// * otherwise this function will return an error
339    pub fn create(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
340        Self::builder().create(path)
341    }
342
343    /// Opens an existing redb database.
344    pub fn open(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
345        Self::builder().open(path)
346    }
347
348    /// Information regarding the usage of the in-memory cache
349    ///
350    /// Note: these metrics are only collected when the "`cache_metrics`" feature is enabled
351    pub fn cache_stats(&self) -> CacheStats {
352        self.mem.cache_stats()
353    }
354
355    pub(crate) fn get_memory(&self) -> Arc<TransactionalMemory> {
356        self.mem.clone()
357    }
358
359    pub(crate) fn verify_primary_checksums(mem: Arc<TransactionalMemory>) -> Result<bool> {
360        let fake_freed_pages = Arc::new(Mutex::new(vec![]));
361        let table_tree = TableTreeMut::new(
362            mem.get_data_root(),
363            Arc::new(TransactionGuard::fake()),
364            mem.clone(),
365            fake_freed_pages.clone(),
366        );
367        if !table_tree.verify_checksums()? {
368            return Ok(false);
369        }
370        let system_table_tree = TableTreeMut::new(
371            mem.get_system_root(),
372            Arc::new(TransactionGuard::fake()),
373            mem.clone(),
374            fake_freed_pages.clone(),
375        );
376        if !system_table_tree.verify_checksums()? {
377            return Ok(false);
378        }
379        assert!(fake_freed_pages.lock().unwrap().is_empty());
380
381        if let Some(header) = mem.get_freed_root() {
382            if !RawBtree::new(
383                Some(header),
384                FreedTableKey::fixed_width(),
385                FreedPageList::fixed_width(),
386                mem.clone(),
387            )
388            .verify_checksum()?
389            {
390                return Ok(false);
391            }
392        }
393
394        Ok(true)
395    }
396
397    /// Force a check of the integrity of the database file, and repair it if possible.
398    ///
399    /// Note: Calling this function is unnecessary during normal operation. redb will automatically
400    /// detect and recover from crashes, power loss, and other unclean shutdowns. This function is
401    /// quite slow and should only be used when you suspect the database file may have been modified
402    /// externally to redb, or that a redb bug may have left the database in a corrupted state.
403    ///
404    /// Returns `Ok(true)` if the database passed integrity checks, `Ok(false)` if it failed but was repaired,
405    /// and `Err(Corrupted)` if the check failed and the file could not be repaired.
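    ///
    /// A usage sketch; note that this takes `&mut self`, so no other transactions may be in progress:
    ///
    /// ```rust
    /// use redb::*;
    /// # use tempfile::NamedTempFile;
    /// # fn main() -> Result<(), Error> {
    /// # let tmpfile: NamedTempFile = NamedTempFile::new().unwrap();
    /// let mut db = Database::create(tmpfile.path())?;
    /// if db.check_integrity()? {
    ///     println!("database passed integrity checks");
    /// } else {
    ///     println!("database was repaired");
    /// }
    /// # Ok(())
    /// # }
    /// ```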
406    pub fn check_integrity(&mut self) -> Result<bool, DatabaseError> {
407        let allocator_hash = self.mem.allocator_hash();
408        let mut was_clean = Arc::get_mut(&mut self.mem)
409            .unwrap()
410            .clear_cache_and_reload()?;
411
412        let old_roots = [
413            self.mem.get_data_root(),
414            self.mem.get_system_root(),
415            self.mem.get_freed_root(),
416        ];
417
418        let new_roots = Self::do_repair(&mut self.mem, &|_| {}).map_err(|err| match err {
419            DatabaseError::Storage(storage_err) => storage_err,
420            _ => unreachable!(),
421        })?;
422
423        if old_roots != new_roots || allocator_hash != self.mem.allocator_hash() {
424            was_clean = false;
425        }
426
427        if !was_clean {
428            let next_transaction_id = self.mem.get_last_committed_transaction_id()?.next();
429            let [data_root, system_root, freed_root] = new_roots;
430            self.mem.commit(
431                data_root,
432                system_root,
433                freed_root,
434                next_transaction_id,
435                false,
436                true,
437            )?;
438        }
439
440        self.mem.begin_writable()?;
441
442        Ok(was_clean)
443    }
444
445    /// Compacts the database file
446    ///
447    /// Returns `true` if compaction was performed, and `false` if no further compaction was possible
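    ///
    /// A usage sketch; compaction needs exclusive access (`&mut self`) and returns an error if any
    /// savepoints exist:
    ///
    /// ```rust
    /// use redb::*;
    /// # use tempfile::NamedTempFile;
    /// # fn main() -> Result<(), Error> {
    /// # let tmpfile: NamedTempFile = NamedTempFile::new().unwrap();
    /// let mut db = Database::create(tmpfile.path())?;
    /// // ... insert and then delete a large amount of data ...
    /// let compacted = db.compact()?;
    /// println!("compaction performed: {compacted}");
    /// # Ok(())
    /// # }
    /// ```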
448    pub fn compact(&mut self) -> Result<bool, CompactionError> {
449        if self
450            .transaction_tracker
451            .oldest_live_read_transaction()
452            .is_some()
453        {
454            return Err(CompactionError::TransactionInProgress);
455        }
456        // Commit to free up any pending free pages
457        // Use 2-phase commit to avoid any possible security issues. Plus this compaction is going to be so slow that it doesn't matter.
458        // Once https://github.com/cberner/redb/issues/829 is fixed, we should upgrade this to use quick-repair -- that way the user
459        // can cancel the compaction without requiring a full repair afterwards
460        let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
461        if txn.list_persistent_savepoints()?.next().is_some() {
462            return Err(CompactionError::PersistentSavepointExists);
463        }
464        if self.transaction_tracker.any_savepoint_exists() {
465            return Err(CompactionError::EphemeralSavepointExists);
466        }
467        txn.set_two_phase_commit(true);
468        txn.commit().map_err(|e| e.into_storage_error())?;
469        // Repeat, just in case executing list_persistent_savepoints() created a new table
470        let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
471        txn.set_two_phase_commit(true);
472        txn.commit().map_err(|e| e.into_storage_error())?;
473        // There can't be any outstanding transactions because we have a `&mut self`, so all pending free pages
474        // should have been cleared out by the above commit()
475        assert!(self.mem.get_freed_root().is_none());
476
477        let mut compacted = false;
478        // Iteratively compact until no progress is made
479        loop {
480            let mut progress = false;
481
482            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
483            if txn.compact_pages()? {
484                progress = true;
485                txn.commit().map_err(|e| e.into_storage_error())?;
486            } else {
487                txn.abort()?;
488            }
489
490            // Double commit to free up the relocated pages for reuse
491            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
492            txn.set_two_phase_commit(true);
493            txn.commit().map_err(|e| e.into_storage_error())?;
494            assert!(self.mem.get_freed_root().is_none());
495
496            if !progress {
497                break;
498            }
499
500            compacted = true;
501        }
502
503        Ok(compacted)
504    }
505
506    fn check_repaired_persistent_savepoints(
507        system_root: Option<BtreeHeader>,
508        mem: Arc<TransactionalMemory>,
509    ) -> Result {
510        let freed_list = Arc::new(Mutex::new(vec![]));
511        let table_tree = TableTreeMut::new(
512            system_root,
513            Arc::new(TransactionGuard::fake()),
514            mem.clone(),
515            freed_list,
516        );
517        let fake_transaction_tracker = Arc::new(TransactionTracker::new(TransactionId::new(0)));
518        if let Some(savepoint_table_def) = table_tree
519            .get_table::<SavepointId, SerializedSavepoint>(
520                SAVEPOINT_TABLE.name(),
521                TableType::Normal,
522            )
523            .map_err(|e| {
524                e.into_storage_error_or_corrupted("Persistent savepoint table corrupted")
525            })?
526        {
527            let savepoint_table_root =
528                if let InternalTableDefinition::Normal { table_root, .. } = savepoint_table_def {
529                    table_root
530                } else {
531                    unreachable!()
532                };
533            let savepoint_table: ReadOnlyTable<SavepointId, SerializedSavepoint> =
534                ReadOnlyTable::new(
535                    "internal savepoint table".to_string(),
536                    savepoint_table_root,
537                    PageHint::None,
538                    Arc::new(TransactionGuard::fake()),
539                    mem.clone(),
540                )?;
541            for result in savepoint_table.range::<SavepointId>(..)? {
542                let (_, savepoint_data) = result?;
543                let savepoint = savepoint_data
544                    .value()
545                    .to_savepoint(fake_transaction_tracker.clone());
546                if let Some(header) = savepoint.get_user_root() {
547                    Self::check_pages_allocated_recursive(header.root, mem.clone())?;
548                }
549            }
550        }
551
552        Ok(())
553    }
554
555    fn mark_freed_tree(freed_root: Option<BtreeHeader>, mem: Arc<TransactionalMemory>) -> Result {
556        if let Some(header) = freed_root {
557            let freed_pages_iter = AllPageNumbersBtreeIter::new(
558                header.root,
559                FreedTableKey::fixed_width(),
560                FreedPageList::fixed_width(),
561                mem.clone(),
562            )?;
563            for page in freed_pages_iter {
564                mem.mark_page_allocated(page?);
565            }
566        }
567
568        let freed_table: ReadOnlyTable<FreedTableKey, FreedPageList<'static>> = ReadOnlyTable::new(
569            "internal freed table".to_string(),
570            freed_root,
571            PageHint::None,
572            Arc::new(TransactionGuard::fake()),
573            mem.clone(),
574        )?;
575        for result in freed_table.range::<FreedTableKey>(..)? {
576            let (_, freed_page_list) = result?;
577            for i in 0..freed_page_list.value().len() {
578                mem.mark_page_allocated(freed_page_list.value().get(i));
579            }
580        }
581
582        Ok(())
583    }
584
585    fn check_pages_allocated_recursive(root: PageNumber, mem: Arc<TransactionalMemory>) -> Result {
586        // Repair the allocator state
587        // All pages in the master table
588        let master_pages_iter = AllPageNumbersBtreeIter::new(root, None, None, mem.clone())?;
589        for result in master_pages_iter {
590            let page = result?;
591            assert!(mem.is_allocated(page));
592        }
593
594        // Iterate over all other tables
595        let iter: BtreeRangeIter<&str, InternalTableDefinition> =
596            BtreeRangeIter::new::<RangeFull, &str>(&(..), Some(root), mem.clone())?;
597
598        // Chain all the other tables to the master table iter
599        for entry in iter {
600            let definition = entry?.value();
601            definition.visit_all_pages(mem.clone(), |path| {
602                assert!(mem.is_allocated(path.page_number()));
603                Ok(())
604            })?;
605        }
606
607        Ok(())
608    }
609
610    fn mark_tables_recursive(root: PageNumber, mem: Arc<TransactionalMemory>) -> Result {
611        // Repair the allocator state
612        // All pages in the master table
613        let master_pages_iter = AllPageNumbersBtreeIter::new(root, None, None, mem.clone())?;
614        for page in master_pages_iter {
615            mem.mark_page_allocated(page?);
616        }
617
618        // Iterate over all other tables
619        let iter: BtreeRangeIter<&str, InternalTableDefinition> =
620            BtreeRangeIter::new::<RangeFull, &str>(&(..), Some(root), mem.clone())?;
621
622        // Chain all the other tables to the master table iter
623        for entry in iter {
624            let definition = entry?.value();
625            definition.visit_all_pages(mem.clone(), |path| {
626                mem.mark_page_allocated(path.page_number());
627                Ok(())
628            })?;
629        }
630
631        Ok(())
632    }
633
634    fn do_repair(
635        mem: &mut Arc<TransactionalMemory>, // Only &mut to ensure exclusivity
636        repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
637    ) -> Result<[Option<BtreeHeader>; 3], DatabaseError> {
638        if !Self::verify_primary_checksums(mem.clone())? {
639            if mem.used_two_phase_commit() {
640                return Err(DatabaseError::Storage(StorageError::Corrupted(
641                    "Primary is corrupted despite 2-phase commit".to_string(),
642                )));
643            }
644
645            // 0.3 because the repair takes 3 full scans and the first is done now
646            let mut handle = RepairSession::new(0.3);
647            repair_callback(&mut handle);
648            if handle.aborted() {
649                return Err(DatabaseError::RepairAborted);
650            }
651
652            mem.repair_primary_corrupted();
653            // We need to invalidate the userspace cache, because walking the tree in verify_primary_checksums() may
654            // have poisoned it with pages that just got rolled back by repair_primary_corrupted(), since
655            // that rolls back a partially committed transaction.
656            mem.clear_read_cache();
657            if !Self::verify_primary_checksums(mem.clone())? {
658                return Err(DatabaseError::Storage(StorageError::Corrupted(
659                    "Failed to repair database. All roots are corrupted".to_string(),
660                )));
661            }
662        }
663        // 0.6 because the repair takes 3 full scans and the second is done now
664        let mut handle = RepairSession::new(0.6);
665        repair_callback(&mut handle);
666        if handle.aborted() {
667            return Err(DatabaseError::RepairAborted);
668        }
669
670        mem.begin_repair()?;
671
672        let data_root = mem.get_data_root();
673        if let Some(header) = data_root {
674            Self::mark_tables_recursive(header.root, mem.clone())?;
675        }
676
677        let freed_root = mem.get_freed_root();
678        // Allow processing of all transactions, since this is the main freed tree
679        Self::mark_freed_tree(freed_root, mem.clone())?;
680        let freed_table: ReadOnlyTable<FreedTableKey, FreedPageList<'static>> = ReadOnlyTable::new(
681            "internal freed table".to_string(),
682            freed_root,
683            PageHint::None,
684            Arc::new(TransactionGuard::fake()),
685            mem.clone(),
686        )?;
687        drop(freed_table);
688
689        // 0.9 because the repair takes 3 full scans and the third is done now. There are just some system tables left
690        let mut handle = RepairSession::new(0.9);
691        repair_callback(&mut handle);
692        if handle.aborted() {
693            return Err(DatabaseError::RepairAborted);
694        }
695
696        let system_root = mem.get_system_root();
697        if let Some(header) = system_root {
698            Self::mark_tables_recursive(header.root, mem.clone())?;
699        }
700        #[cfg(debug_assertions)]
701        {
702            // Savepoints only reference pages from a previous commit, so those are either referenced
703            // in the current root, or are in the freed tree
704            Self::check_repaired_persistent_savepoints(system_root, mem.clone())?;
705        }
706
707        mem.end_repair()?;
708
709        // We need to invalidate the userspace cache, because we're about to implicitly free the freed table
710        // by storing an empty root during the below commit()
711        mem.clear_read_cache();
712
713        Ok([data_root, system_root, freed_root])
714    }
715
716    fn new(
717        file: Box<dyn StorageBackend>,
718        allow_initialize: bool,
719        page_size: usize,
720        region_size: Option<u64>,
721        read_cache_size_bytes: usize,
722        write_cache_size_bytes: usize,
723        repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
724    ) -> Result<Self, DatabaseError> {
725        #[cfg(feature = "logging")]
726        let file_path = format!("{:?}", &file);
727        #[cfg(feature = "logging")]
728        info!("Opening database {:?}", &file_path);
729        let mem = TransactionalMemory::new(
730            file,
731            allow_initialize,
732            page_size,
733            region_size,
734            read_cache_size_bytes,
735            write_cache_size_bytes,
736        )?;
737        let mut mem = Arc::new(mem);
738        if mem.needs_repair()? {
739            // If the last transaction used 2-phase commit and updated the allocator state table, then
740            // we can just load the allocator state from there. Otherwise, we need a full repair
741            if let Some(tree) = Self::get_allocator_state_table(&mem)? {
742                #[cfg(feature = "logging")]
743                info!("Found valid allocator state, full repair not needed");
744                mem.load_allocator_state(&tree)?;
745            } else {
746                #[cfg(feature = "logging")]
747                warn!("Database {:?} not shutdown cleanly. Repairing", &file_path);
748                let mut handle = RepairSession::new(0.0);
749                repair_callback(&mut handle);
750                if handle.aborted() {
751                    return Err(DatabaseError::RepairAborted);
752                }
753                let [data_root, system_root, freed_root] =
754                    Self::do_repair(&mut mem, repair_callback)?;
755                let next_transaction_id = mem.get_last_committed_transaction_id()?.next();
756                mem.commit(
757                    data_root,
758                    system_root,
759                    freed_root,
760                    next_transaction_id,
761                    false,
762                    true,
763                )?;
764            }
765        }
766
767        mem.begin_writable()?;
768        let next_transaction_id = mem.get_last_committed_transaction_id()?.next();
769
770        let db = Database {
771            mem,
772            transaction_tracker: Arc::new(TransactionTracker::new(next_transaction_id)),
773        };
774
775        // Restore the tracker state for any persistent savepoints
776        let txn = db.begin_write().map_err(|e| e.into_storage_error())?;
777        if let Some(next_id) = txn.next_persistent_savepoint_id()? {
778            db.transaction_tracker
779                .restore_savepoint_counter_state(next_id);
780        }
781        for id in txn.list_persistent_savepoints()? {
782            let savepoint = match txn.get_persistent_savepoint(id) {
783                Ok(savepoint) => savepoint,
784                Err(err) => match err {
785                    SavepointError::InvalidSavepoint => unreachable!(),
786                    SavepointError::Storage(storage) => {
787                        return Err(storage.into());
788                    }
789                },
790            };
791            db.transaction_tracker
792                .register_persistent_savepoint(&savepoint);
793        }
794        txn.abort()?;
795
796        Ok(db)
797    }
798
799    fn get_allocator_state_table(
800        mem: &Arc<TransactionalMemory>,
801    ) -> Result<Option<AllocatorStateTree>> {
802        // The allocator state table is only valid if the primary was written using 2-phase commit
803        if !mem.used_two_phase_commit() {
804            return Ok(None);
805        }
806
807        // See if it's present in the system table tree
808        let fake_freed_pages = Arc::new(Mutex::new(vec![]));
809        let system_table_tree = TableTreeMut::new(
810            mem.get_system_root(),
811            Arc::new(TransactionGuard::fake()),
812            mem.clone(),
813            fake_freed_pages.clone(),
814        );
815        let Some(allocator_state_table) = system_table_tree
816            .get_table::<AllocatorStateKey, &[u8]>(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal)
817            .map_err(|e| e.into_storage_error_or_corrupted("Unexpected TableError"))?
818        else {
819            return Ok(None);
820        };
821
822        // Load the allocator state table
823        let InternalTableDefinition::Normal { table_root, .. } = allocator_state_table else {
824            unreachable!();
825        };
826        let tree = AllocatorStateTree::new(
827            table_root,
828            Arc::new(TransactionGuard::fake()),
829            mem.clone(),
830            fake_freed_pages,
831        );
832
833        // Make sure this isn't stale allocator state left over from a previous transaction
834        if !mem.is_valid_allocator_state(&tree)? {
835            return Ok(None);
836        }
837
838        Ok(Some(tree))
839    }
840
841    fn allocate_read_transaction(&self) -> Result<TransactionGuard> {
842        let id = self
843            .transaction_tracker
844            .register_read_transaction(&self.mem)?;
845
846        Ok(TransactionGuard::new_read(
847            id,
848            self.transaction_tracker.clone(),
849        ))
850    }
851
852    /// Convenience method for [`Builder::new`]
853    pub fn builder() -> Builder {
854        Builder::new()
855    }
856
857    /// Begins a write transaction
858    ///
859    /// Returns a [`WriteTransaction`] which may be used to read/write to the database. Only a single
860    /// write may be in progress at a time. If a write is in progress, this function will block
861    /// until it completes.
862    pub fn begin_write(&self) -> Result<WriteTransaction, TransactionError> {
863        // Fail early if there has been an I/O error -- nothing can be committed in that case
864        self.mem.check_io_errors()?;
865        let guard = TransactionGuard::new_write(
866            self.transaction_tracker.start_write_transaction(),
867            self.transaction_tracker.clone(),
868        );
869        WriteTransaction::new(guard, self.transaction_tracker.clone(), self.mem.clone())
870            .map_err(|e| e.into())
871    }
872
873    /// Begins a read transaction
874    ///
875    /// Captures a snapshot of the database, so that only data committed before calling this method
876    /// is visible in the transaction
877    ///
878    /// Returns a [`ReadTransaction`] which may be used to read from the database. Read transactions
879    /// may exist concurrently with writes
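    ///
    /// A short sketch of writing data and then reading it back through a snapshot:
    ///
    /// ```rust
    /// use redb::*;
    /// # use tempfile::NamedTempFile;
    /// const TABLE: TableDefinition<u64, u64> = TableDefinition::new("my_data");
    ///
    /// # fn main() -> Result<(), Error> {
    /// # let tmpfile: NamedTempFile = NamedTempFile::new().unwrap();
    /// let db = Database::create(tmpfile.path())?;
    /// let write_txn = db.begin_write()?;
    /// {
    ///     let mut table = write_txn.open_table(TABLE)?;
    ///     table.insert(&1, &10)?;
    /// }
    /// write_txn.commit()?;
    ///
    /// let read_txn = db.begin_read()?;
    /// let table = read_txn.open_table(TABLE)?;
    /// assert_eq!(table.get(&1)?.unwrap().value(), 10);
    /// # Ok(())
    /// # }
    /// ```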
880    pub fn begin_read(&self) -> Result<ReadTransaction, TransactionError> {
881        let guard = self.allocate_read_transaction()?;
882        #[cfg(feature = "logging")]
883        debug!("Beginning read transaction id={:?}", guard.id());
884        ReadTransaction::new(self.get_memory(), guard)
885    }
886
887    fn ensure_allocator_state_table(&self) -> Result<(), Error> {
888        // If the allocator state table is already up to date, we're done
889        if Self::get_allocator_state_table(&self.mem)?.is_some() {
890            return Ok(());
891        }
892
893        // Make a new quick-repair commit to update the allocator state table
894        #[cfg(feature = "logging")]
895        debug!("Writing allocator state table");
896        let mut tx = self.begin_write()?;
897        tx.set_quick_repair(true);
898        tx.commit()?;
899
900        Ok(())
901    }
902}
903
904impl Drop for Database {
905    fn drop(&mut self) {
906        if thread::panicking() {
907            return;
908        }
909
910        if self.ensure_allocator_state_table().is_err() {
911            #[cfg(feature = "logging")]
912            warn!("Failed to write allocator state table. Repair may be required at restart.")
913        }
914    }
915}
916
917pub struct RepairSession {
918    progress: f64,
919    aborted: bool,
920}
921
922impl RepairSession {
923    pub(crate) fn new(progress: f64) -> Self {
924        Self {
925            progress,
926            aborted: false,
927        }
928    }
929
930    pub(crate) fn aborted(&self) -> bool {
931        self.aborted
932    }
933
934    /// Abort the repair process. The corresponding call to [`Builder::open`] or [`Builder::create`] will return an error
935    pub fn abort(&mut self) {
936        self.aborted = true;
937    }
938
939    /// Returns an estimate of the repair progress in the range [0.0, 1.0), where 1.0 would indicate that the repair is complete.
940    pub fn progress(&self) -> f64 {
941        self.progress
942    }
943}
944
945/// Configuration builder of a redb [Database].
946pub struct Builder {
947    page_size: usize,
948    region_size: Option<u64>,
949    read_cache_size_bytes: usize,
950    write_cache_size_bytes: usize,
951    repair_callback: Box<dyn Fn(&mut RepairSession)>,
952}
953
954impl Builder {
955    /// Construct a new [Builder] with sensible defaults.
956    ///
957    /// ## Defaults
958    ///
959    /// - `cache_size_bytes`: 1GiB
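    ///
    /// A configuration sketch (the cache size shown is illustrative, not a recommendation):
    ///
    /// ```rust
    /// use redb::*;
    /// # use tempfile::NamedTempFile;
    /// # fn main() -> Result<(), Error> {
    /// # let tmpfile: NamedTempFile = NamedTempFile::new().unwrap();
    /// let db = Builder::new()
    ///     .set_cache_size(64 * 1024 * 1024) // 64 MiB
    ///     .create(tmpfile.path())?;
    /// # Ok(())
    /// # }
    /// ```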
960    #[allow(clippy::new_without_default)]
961    pub fn new() -> Self {
962        let mut result = Self {
963            // Default to 4k pages. Benchmarking showed that this was a good default on all platforms,
964            // including macOS with 16k pages. Therefore, users are not allowed to configure it at the moment.
965            // It is part of the file format, so it can be made configurable in the future.
966            page_size: PAGE_SIZE,
967            region_size: None,
968            // TODO: Default should probably take into account the total system memory
969            read_cache_size_bytes: 0,
970            // TODO: Default should probably take into account the total system memory
971            write_cache_size_bytes: 0,
972            repair_callback: Box::new(|_| {}),
973        };
974
975        result.set_cache_size(1024 * 1024 * 1024);
976        result
977    }
978
979    /// Set a callback which will be invoked periodically in the event that the database file needs
980    /// to be repaired.
981    ///
982    /// The [`RepairSession`] argument can be used to control the repair process.
983    ///
984    /// If the database file needs repair, the callback will be invoked at least once.
985    /// There is no upper limit on the number of times it may be called.
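    ///
    /// A sketch of a callback that reports progress and never aborts the repair:
    ///
    /// ```rust
    /// use redb::*;
    /// let mut builder = Builder::new();
    /// builder.set_repair_callback(|session: &mut RepairSession| {
    ///     println!("repair progress: {:.0}%", session.progress() * 100.0);
    /// });
    /// ```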
986    pub fn set_repair_callback(
987        &mut self,
988        callback: impl Fn(&mut RepairSession) + 'static,
989    ) -> &mut Self {
990        self.repair_callback = Box::new(callback);
991        self
992    }
993
994    /// Set the internal page size of the database
995    ///
996    /// Valid values are powers of two, greater than or equal to 512
997    ///
998    /// ## Defaults
999    ///
1000    /// Defaults to 4 KiB pages.
1001    #[cfg(any(fuzzing, test))]
1002    pub fn set_page_size(&mut self, size: usize) -> &mut Self {
1003        assert!(size.is_power_of_two());
1004        self.page_size = std::cmp::max(size, 512);
1005        self
1006    }
1007
1008    /// Set the amount of memory (in bytes) used for caching data
1009    pub fn set_cache_size(&mut self, bytes: usize) -> &mut Self {
1010        // TODO: allow dynamic expansion of the read/write cache
1011        self.read_cache_size_bytes = bytes / 10 * 9;
1012        self.write_cache_size_bytes = bytes / 10;
1013        self
1014    }
1015
1016    #[cfg(any(test, fuzzing))]
1017    pub fn set_region_size(&mut self, size: u64) -> &mut Self {
1018        assert!(size.is_power_of_two());
1019        self.region_size = Some(size);
1020        self
1021    }
1022
1023    /// Opens the specified file as a redb database.
1024    /// * if the file does not exist, or is an empty file, a new database will be initialized in it
1025    /// * if the file is a valid redb database, it will be opened
1026    /// * otherwise this function will return an error
1027    pub fn create(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
1028        let file = OpenOptions::new()
1029            .read(true)
1030            .write(true)
1031            .create(true)
1032            .truncate(false)
1033            .open(path)?;
1034
1035        Database::new(
1036            Box::new(FileBackend::new(file)?),
1037            true,
1038            self.page_size,
1039            self.region_size,
1040            self.read_cache_size_bytes,
1041            self.write_cache_size_bytes,
1042            &self.repair_callback,
1043        )
1044    }
1045
1046    /// Opens an existing redb database.
1047    pub fn open(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
1048        let file = OpenOptions::new().read(true).write(true).open(path)?;
1049
1050        Database::new(
1051            Box::new(FileBackend::new(file)?),
1052            false,
1053            self.page_size,
1054            None,
1055            self.read_cache_size_bytes,
1056            self.write_cache_size_bytes,
1057            &self.repair_callback,
1058        )
1059    }
1060
1061    /// Open an existing or create a new database in the given `file`.
1062    ///
1063    /// The file must be empty or contain a valid database.
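    ///
    /// A brief sketch using a pre-opened temporary file:
    ///
    /// ```rust
    /// use redb::*;
    /// # use tempfile::NamedTempFile;
    /// # fn main() -> Result<(), Error> {
    /// # let tmpfile: NamedTempFile = NamedTempFile::new().unwrap();
    /// let db = Builder::new().create_file(tmpfile.into_file())?;
    /// # Ok(())
    /// # }
    /// ```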
1064    pub fn create_file(&self, file: File) -> Result<Database, DatabaseError> {
1065        Database::new(
1066            Box::new(FileBackend::new(file)?),
1067            true,
1068            self.page_size,
1069            self.region_size,
1070            self.read_cache_size_bytes,
1071            self.write_cache_size_bytes,
1072            &self.repair_callback,
1073        )
1074    }
1075
1076    /// Open an existing or create a new database with the given backend.
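    ///
    /// A short sketch using the crate's in-memory backend (`backends::InMemoryBackend`), which is
    /// convenient for tests:
    ///
    /// ```rust
    /// use redb::backends::InMemoryBackend;
    /// use redb::*;
    /// # fn main() -> Result<(), Error> {
    /// let db = Builder::new().create_with_backend(InMemoryBackend::new())?;
    /// # Ok(())
    /// # }
    /// ```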
1077    pub fn create_with_backend(
1078        &self,
1079        backend: impl StorageBackend,
1080    ) -> Result<Database, DatabaseError> {
1081        Database::new(
1082            Box::new(backend),
1083            true,
1084            self.page_size,
1085            self.region_size,
1086            self.read_cache_size_bytes,
1087            self.write_cache_size_bytes,
1088            &self.repair_callback,
1089        )
1090    }
1091}
1092
1093impl std::fmt::Debug for Database {
1094    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1095        f.debug_struct("Database").finish()
1096    }
1097}
1098
1099#[cfg(test)]
1100mod test {
1101    use crate::backends::FileBackend;
1102    use crate::{
1103        CommitError, Database, DatabaseError, Durability, ReadableTable, StorageBackend,
1104        StorageError, TableDefinition, TransactionError,
1105    };
1106    use std::fs::File;
1107    use std::io::{ErrorKind, Read, Seek, SeekFrom};
1108    use std::sync::atomic::{AtomicU64, Ordering};
1109    use std::sync::Arc;
1110
1111    #[derive(Debug)]
1112    struct FailingBackend {
1113        inner: FileBackend,
1114        countdown: Arc<AtomicU64>,
1115    }
1116
1117    impl FailingBackend {
1118        fn new(backend: FileBackend, countdown: u64) -> Self {
1119            Self {
1120                inner: backend,
1121                countdown: Arc::new(AtomicU64::new(countdown)),
1122            }
1123        }
1124
1125        fn check_countdown(&self) -> Result<(), std::io::Error> {
1126            if self.countdown.load(Ordering::SeqCst) == 0 {
1127                return Err(std::io::Error::from(ErrorKind::Other));
1128            }
1129
1130            Ok(())
1131        }
1132
1133        fn decrement_countdown(&self) -> Result<(), std::io::Error> {
1134            if self
1135                .countdown
1136                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
1137                    if x > 0 {
1138                        Some(x - 1)
1139                    } else {
1140                        None
1141                    }
1142                })
1143                .is_err()
1144            {
1145                return Err(std::io::Error::from(ErrorKind::Other));
1146            }
1147
1148            Ok(())
1149        }
1150    }
1151
1152    impl StorageBackend for FailingBackend {
1153        fn len(&self) -> Result<u64, std::io::Error> {
1154            self.inner.len()
1155        }
1156
1157        fn read(&self, offset: u64, len: usize) -> Result<Vec<u8>, std::io::Error> {
1158            self.check_countdown()?;
1159            self.inner.read(offset, len)
1160        }
1161
1162        fn set_len(&self, len: u64) -> Result<(), std::io::Error> {
1163            self.inner.set_len(len)
1164        }
1165
1166        fn sync_data(&self, eventual: bool) -> Result<(), std::io::Error> {
1167            self.check_countdown()?;
1168            self.inner.sync_data(eventual)
1169        }
1170
1171        fn write(&self, offset: u64, data: &[u8]) -> Result<(), std::io::Error> {
1172            self.decrement_countdown()?;
1173            self.inner.write(offset, data)
1174        }
1175    }
1176
1177    #[test]
1178    fn crash_regression4() {
1179        let tmpfile = crate::create_tempfile();
1180        let (file, path) = tmpfile.into_parts();
1181
1182        let backend = FailingBackend::new(FileBackend::new(file).unwrap(), 23);
1183        let db = Database::builder()
1184            .set_cache_size(12686)
1185            .set_page_size(8 * 1024)
1186            .set_region_size(32 * 4096)
1187            .create_with_backend(backend)
1188            .unwrap();
1189
1190        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1191
1192        let tx = db.begin_write().unwrap();
1193        let _savepoint = tx.ephemeral_savepoint().unwrap();
1194        let _persistent_savepoint = tx.persistent_savepoint().unwrap();
1195        tx.commit().unwrap();
1196        let tx = db.begin_write().unwrap();
1197        {
1198            let mut table = tx.open_table(table_def).unwrap();
1199            let _ = table.insert_reserve(118821, 360).unwrap();
1200        }
1201        let result = tx.commit();
1202        assert!(result.is_err());
1203
1204        drop(db);
1205        Database::builder()
1206            .set_cache_size(1024 * 1024)
1207            .set_page_size(8 * 1024)
1208            .set_region_size(32 * 4096)
1209            .create(&path)
1210            .unwrap();
1211    }
1212
1213    #[test]
1214    fn transient_io_error() {
1215        let tmpfile = crate::create_tempfile();
1216        let (file, path) = tmpfile.into_parts();
1217
1218        let backend = FailingBackend::new(FileBackend::new(file).unwrap(), u64::MAX);
1219        let countdown = backend.countdown.clone();
1220        let db = Database::builder()
1221            .set_cache_size(0)
1222            .create_with_backend(backend)
1223            .unwrap();
1224
1225        let table_def: TableDefinition<u64, u64> = TableDefinition::new("x");
1226
1227        // Create some garbage
1228        let tx = db.begin_write().unwrap();
1229        {
1230            let mut table = tx.open_table(table_def).unwrap();
1231            table.insert(0, 0).unwrap();
1232        }
1233        tx.commit().unwrap();
1234        let tx = db.begin_write().unwrap();
1235        {
1236            let mut table = tx.open_table(table_def).unwrap();
1237            table.insert(0, 1).unwrap();
1238        }
1239        tx.commit().unwrap();
1240
1241        let tx = db.begin_write().unwrap();
1242        // Cause an error in the commit
1243        countdown.store(0, Ordering::SeqCst);
1244        let result = tx.commit().err().unwrap();
1245        assert!(matches!(result, CommitError::Storage(StorageError::Io(_))));
1246        let result = db.begin_write().err().unwrap();
1247        assert!(matches!(
1248            result,
1249            TransactionError::Storage(StorageError::PreviousIo)
1250        ));
1251        // Simulate a transient error
1252        countdown.store(u64::MAX, Ordering::SeqCst);
1253        drop(db);
1254
1255        // Check that recovery flag is set, even though the error has "cleared"
1256        let mut file = File::open(&path).unwrap();
1257        file.seek(SeekFrom::Start(9)).unwrap();
1258        let mut god_byte = vec![0u8];
1259        assert_eq!(file.read(&mut god_byte).unwrap(), 1);
1260        assert_ne!(god_byte[0] & 2, 0);
1261    }
1262
1263    #[test]
1264    fn small_pages() {
1265        let tmpfile = crate::create_tempfile();
1266
1267        let db = Database::builder()
1268            .set_page_size(512)
1269            .create(tmpfile.path())
1270            .unwrap();
1271
1272        let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1273        let txn = db.begin_write().unwrap();
1274        {
1275            txn.open_table(table_definition).unwrap();
1276        }
1277        txn.commit().unwrap();
1278    }
1279
1280    #[test]
1281    fn small_pages2() {
1282        let tmpfile = crate::create_tempfile();
1283
1284        let db = Database::builder()
1285            .set_page_size(512)
1286            .create(tmpfile.path())
1287            .unwrap();
1288
1289        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1290
1291        let mut tx = db.begin_write().unwrap();
1292        tx.set_two_phase_commit(true);
1293        let savepoint0 = tx.ephemeral_savepoint().unwrap();
1294        {
1295            tx.open_table(table_def).unwrap();
1296        }
1297        tx.commit().unwrap();
1298
1299        let mut tx = db.begin_write().unwrap();
1300        tx.set_two_phase_commit(true);
1301        let savepoint1 = tx.ephemeral_savepoint().unwrap();
1302        tx.restore_savepoint(&savepoint0).unwrap();
1303        tx.set_durability(Durability::None);
1304        {
1305            let mut t = tx.open_table(table_def).unwrap();
1306            t.insert_reserve(&660503, 489).unwrap().as_mut().fill(0xFF);
1307            assert!(t.remove(&291295).unwrap().is_none());
1308        }
1309        tx.commit().unwrap();
1310
1311        let mut tx = db.begin_write().unwrap();
1312        tx.set_two_phase_commit(true);
1313        tx.restore_savepoint(&savepoint0).unwrap();
1314        {
1315            tx.open_table(table_def).unwrap();
1316        }
1317        tx.commit().unwrap();
1318
1319        let mut tx = db.begin_write().unwrap();
1320        tx.set_two_phase_commit(true);
1321        let savepoint2 = tx.ephemeral_savepoint().unwrap();
1322        drop(savepoint0);
1323        tx.restore_savepoint(&savepoint2).unwrap();
1324        {
1325            let mut t = tx.open_table(table_def).unwrap();
1326            assert!(t.get(&2059).unwrap().is_none());
1327            assert!(t.remove(&145227).unwrap().is_none());
1328            assert!(t.remove(&145227).unwrap().is_none());
1329        }
1330        tx.commit().unwrap();
1331
1332        let mut tx = db.begin_write().unwrap();
1333        tx.set_two_phase_commit(true);
1334        let savepoint3 = tx.ephemeral_savepoint().unwrap();
1335        drop(savepoint1);
1336        tx.restore_savepoint(&savepoint3).unwrap();
1337        {
1338            tx.open_table(table_def).unwrap();
1339        }
1340        tx.commit().unwrap();
1341
1342        let mut tx = db.begin_write().unwrap();
1343        tx.set_two_phase_commit(true);
1344        let savepoint4 = tx.ephemeral_savepoint().unwrap();
1345        drop(savepoint2);
1346        tx.restore_savepoint(&savepoint3).unwrap();
1347        tx.set_durability(Durability::None);
1348        {
1349            let mut t = tx.open_table(table_def).unwrap();
1350            assert!(t.remove(&207936).unwrap().is_none());
1351        }
1352        tx.abort().unwrap();
1353
1354        let mut tx = db.begin_write().unwrap();
1355        tx.set_two_phase_commit(true);
1356        let savepoint5 = tx.ephemeral_savepoint().unwrap();
1357        drop(savepoint3);
1358        assert!(tx.restore_savepoint(&savepoint4).is_err());
1359        {
1360            tx.open_table(table_def).unwrap();
1361        }
1362        tx.commit().unwrap();
1363
1364        let mut tx = db.begin_write().unwrap();
1365        tx.set_two_phase_commit(true);
1366        tx.restore_savepoint(&savepoint5).unwrap();
1367        tx.set_durability(Durability::None);
1368        {
1369            tx.open_table(table_def).unwrap();
1370        }
1371        tx.commit().unwrap();
1372    }
1373
1374    #[test]
1375    fn small_pages3() {
1376        let tmpfile = crate::create_tempfile();
1377
1378        let db = Database::builder()
1379            .set_page_size(1024)
1380            .create(tmpfile.path())
1381            .unwrap();
1382
1383        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1384
1385        let mut tx = db.begin_write().unwrap();
1386        let _savepoint0 = tx.ephemeral_savepoint().unwrap();
1387        tx.set_durability(Durability::None);
1388        {
1389            let mut t = tx.open_table(table_def).unwrap();
1390            let value = vec![0; 306];
1391            t.insert(&539717, value.as_slice()).unwrap();
1392        }
1393        tx.abort().unwrap();
1394
1395        let mut tx = db.begin_write().unwrap();
1396        let savepoint1 = tx.ephemeral_savepoint().unwrap();
1397        tx.restore_savepoint(&savepoint1).unwrap();
1398        tx.set_durability(Durability::None);
1399        {
1400            let mut t = tx.open_table(table_def).unwrap();
1401            let value = vec![0; 2008];
1402            t.insert(&784384, value.as_slice()).unwrap();
1403        }
1404        tx.abort().unwrap();
1405    }
1406
1407    #[test]
1408    fn small_pages4() {
1409        let tmpfile = crate::create_tempfile();
1410
1411        let db = Database::builder()
1412            .set_cache_size(1024 * 1024)
1413            .set_page_size(1024)
1414            .create(tmpfile.path())
1415            .unwrap();
1416
1417        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1418
1419        let tx = db.begin_write().unwrap();
1420        {
1421            tx.open_table(table_def).unwrap();
1422        }
1423        tx.commit().unwrap();
1424
1425        let tx = db.begin_write().unwrap();
1426        {
1427            let mut t = tx.open_table(table_def).unwrap();
1428            assert!(t.get(&131072).unwrap().is_none());
1429            let value = vec![0xFF; 1130];
1430            t.insert(&42394, value.as_slice()).unwrap();
1431            t.insert_reserve(&744037, 3645).unwrap().as_mut().fill(0xFF);
1432            assert!(t.get(&0).unwrap().is_none());
1433        }
1434        tx.abort().unwrap();
1435
1436        let tx = db.begin_write().unwrap();
1437        {
1438            let mut t = tx.open_table(table_def).unwrap();
1439            t.insert_reserve(&118749, 734).unwrap().as_mut().fill(0xFF);
1440        }
1441        tx.abort().unwrap();
1442    }
1443
1444    #[test]
1445    fn dynamic_shrink() {
1446        let tmpfile = crate::create_tempfile();
1447        let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1448        let big_value = vec![0u8; 1024];
1449
1450        let db = Database::builder()
1451            .set_region_size(1024 * 1024)
1452            .create(tmpfile.path())
1453            .unwrap();
1454
1455        let txn = db.begin_write().unwrap();
1456        {
1457            let mut table = txn.open_table(table_definition).unwrap();
1458            for i in 0..2048 {
1459                table.insert(&i, big_value.as_slice()).unwrap();
1460            }
1461        }
1462        txn.commit().unwrap();
1463
1464        let file_size = tmpfile.as_file().metadata().unwrap().len();
1465
1466        let txn = db.begin_write().unwrap();
1467        {
1468            let mut table = txn.open_table(table_definition).unwrap();
1469            for i in 0..2048 {
1470                table.remove(&i).unwrap();
1471            }
1472        }
1473        txn.commit().unwrap();
1474
1475        // Perform a couple more commits to be sure the database has a chance to compact
1476        let txn = db.begin_write().unwrap();
1477        {
1478            let mut table = txn.open_table(table_definition).unwrap();
1479            table.insert(0, [].as_slice()).unwrap();
1480        }
1481        txn.commit().unwrap();
1482        let txn = db.begin_write().unwrap();
1483        {
1484            let mut table = txn.open_table(table_definition).unwrap();
1485            table.remove(0).unwrap();
1486        }
1487        txn.commit().unwrap();
1488        let txn = db.begin_write().unwrap();
1489        txn.commit().unwrap();
1490
1491        let final_file_size = tmpfile.as_file().metadata().unwrap().len();
1492        assert!(final_file_size < file_size);
1493    }
1494
1495    #[test]
1496    fn create_new_db_in_empty_file() {
1497        let tmpfile = crate::create_tempfile();
1498
1499        let _db = Database::builder()
1500            .create_file(tmpfile.into_file())
1501            .unwrap();
1502    }
1503
1504    #[test]
1505    fn open_missing_file() {
1506        let tmpfile = crate::create_tempfile();
1507
1508        let err = Database::builder()
1509            .open(tmpfile.path().with_extension("missing"))
1510            .unwrap_err();
1511
1512        match err {
1513            DatabaseError::Storage(StorageError::Io(err)) if err.kind() == ErrorKind::NotFound => {}
1514            err => panic!("Unexpected error for empty file: {err}"),
1515        }
1516    }
1517
1518    #[test]
1519    fn open_empty_file() {
1520        let tmpfile = crate::create_tempfile();
1521
1522        let err = Database::builder().open(tmpfile.path()).unwrap_err();
1523
1524        match err {
1525            DatabaseError::Storage(StorageError::Io(err))
1526                if err.kind() == ErrorKind::InvalidData => {}
1527            err => panic!("Unexpected error for empty file: {err}"),
1528        }
1529    }
1530}