1use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker};
2use crate::tree_store::{
3 AllPageNumbersBtreeIter, BtreeHeader, BtreeRangeIter, FreedPageList, FreedTableKey,
4 InternalTableDefinition, PAGE_SIZE, PageHint, PageNumber, PageTrackerPolicy, RawBtree,
5 SerializedSavepoint, TableTree, TableTreeMut, TableType, TransactionalMemory,
6};
7use crate::types::{Key, Value};
8use crate::{
9 CompactionError, DatabaseError, Error, ReadOnlyTable, SavepointError, StorageError, TableError,
10};
11use crate::{ReadTransaction, Result, WriteTransaction};
12use std::fmt::{Debug, Display, Formatter};
13
14use std::fs::{File, OpenOptions};
15use std::marker::PhantomData;
16use std::ops::RangeFull;
17use std::path::Path;
18use std::sync::{Arc, Mutex};
19use std::{io, thread};
20
21use crate::error::{TransactionError, UpgradeError};
22use crate::sealed::Sealed;
23use crate::transactions::{
24 ALLOCATOR_STATE_TABLE_NAME, AllocatorStateKey, AllocatorStateTree, DATA_ALLOCATED_TABLE,
25 DATA_FREED_TABLE, PageList, SAVEPOINT_TABLE, SYSTEM_FREED_TABLE, SystemTableDefinition,
26 TransactionIdWithPagination,
27};
28use crate::tree_store::file_backend::FileBackend;
29#[cfg(feature = "logging")]
30use log::{debug, info, warn};
31
32#[allow(clippy::len_without_is_empty)]
33pub trait StorageBackend: 'static + Debug + Send + Sync {
35 fn len(&self) -> std::result::Result<u64, io::Error>;
37
38 fn read(&self, offset: u64, len: usize) -> std::result::Result<Vec<u8>, io::Error>;
42
43 fn set_len(&self, len: u64) -> std::result::Result<(), io::Error>;
47
48 fn sync_data(&self, eventual: bool) -> std::result::Result<(), io::Error>;
54
55 fn write(&self, offset: u64, data: &[u8]) -> std::result::Result<(), io::Error>;
57}
58
59pub trait TableHandle: Sealed {
60 fn name(&self) -> &str;
62}
63
64#[derive(Clone)]
65pub struct UntypedTableHandle {
66 name: String,
67}
68
69impl UntypedTableHandle {
70 pub(crate) fn new(name: String) -> Self {
71 Self { name }
72 }
73}
74
75impl TableHandle for UntypedTableHandle {
76 fn name(&self) -> &str {
77 &self.name
78 }
79}
80
81impl Sealed for UntypedTableHandle {}
82
83pub trait MultimapTableHandle: Sealed {
84 fn name(&self) -> &str;
86}
87
88#[derive(Clone)]
89pub struct UntypedMultimapTableHandle {
90 name: String,
91}
92
93impl UntypedMultimapTableHandle {
94 pub(crate) fn new(name: String) -> Self {
95 Self { name }
96 }
97}
98
99impl MultimapTableHandle for UntypedMultimapTableHandle {
100 fn name(&self) -> &str {
101 &self.name
102 }
103}
104
105impl Sealed for UntypedMultimapTableHandle {}
106
107pub struct TableDefinition<'a, K: Key + 'static, V: Value + 'static> {
114 name: &'a str,
115 _key_type: PhantomData<K>,
116 _value_type: PhantomData<V>,
117}
118
119impl<'a, K: Key + 'static, V: Value + 'static> TableDefinition<'a, K, V> {
120 pub const fn new(name: &'a str) -> Self {
126 assert!(!name.is_empty());
127 Self {
128 name,
129 _key_type: PhantomData,
130 _value_type: PhantomData,
131 }
132 }
133}
134
135impl<K: Key + 'static, V: Value + 'static> TableHandle for TableDefinition<'_, K, V> {
136 fn name(&self) -> &str {
137 self.name
138 }
139}
140
141impl<K: Key, V: Value> Sealed for TableDefinition<'_, K, V> {}
142
143impl<K: Key + 'static, V: Value + 'static> Clone for TableDefinition<'_, K, V> {
144 fn clone(&self) -> Self {
145 *self
146 }
147}
148
149impl<K: Key + 'static, V: Value + 'static> Copy for TableDefinition<'_, K, V> {}
150
151impl<K: Key + 'static, V: Value + 'static> Display for TableDefinition<'_, K, V> {
152 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
153 write!(
154 f,
155 "{}<{}, {}>",
156 self.name,
157 K::type_name().name(),
158 V::type_name().name()
159 )
160 }
161}
162
163pub struct MultimapTableDefinition<'a, K: Key + 'static, V: Key + 'static> {
172 name: &'a str,
173 _key_type: PhantomData<K>,
174 _value_type: PhantomData<V>,
175}
176
177impl<'a, K: Key + 'static, V: Key + 'static> MultimapTableDefinition<'a, K, V> {
178 pub const fn new(name: &'a str) -> Self {
179 assert!(!name.is_empty());
180 Self {
181 name,
182 _key_type: PhantomData,
183 _value_type: PhantomData,
184 }
185 }
186}
187
188impl<K: Key + 'static, V: Key + 'static> MultimapTableHandle for MultimapTableDefinition<'_, K, V> {
189 fn name(&self) -> &str {
190 self.name
191 }
192}
193
194impl<K: Key, V: Key> Sealed for MultimapTableDefinition<'_, K, V> {}
195
196impl<K: Key + 'static, V: Key + 'static> Clone for MultimapTableDefinition<'_, K, V> {
197 fn clone(&self) -> Self {
198 *self
199 }
200}
201
202impl<K: Key + 'static, V: Key + 'static> Copy for MultimapTableDefinition<'_, K, V> {}
203
204impl<K: Key + 'static, V: Key + 'static> Display for MultimapTableDefinition<'_, K, V> {
205 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
206 write!(
207 f,
208 "{}<{}, {}>",
209 self.name,
210 K::type_name().name(),
211 V::type_name().name()
212 )
213 }
214}
215
216#[derive(Debug)]
220pub struct CacheStats {
221 pub(crate) evictions: u64,
222}
223
224impl CacheStats {
225 pub fn evictions(&self) -> u64 {
229 self.evictions
230 }
231}
232
233pub(crate) struct TransactionGuard {
234 transaction_tracker: Option<Arc<TransactionTracker>>,
235 transaction_id: Option<TransactionId>,
236 write_transaction: bool,
237}
238
239impl TransactionGuard {
240 pub(crate) fn new_read(
241 transaction_id: TransactionId,
242 tracker: Arc<TransactionTracker>,
243 ) -> Self {
244 Self {
245 transaction_tracker: Some(tracker),
246 transaction_id: Some(transaction_id),
247 write_transaction: false,
248 }
249 }
250
251 pub(crate) fn new_write(
252 transaction_id: TransactionId,
253 tracker: Arc<TransactionTracker>,
254 ) -> Self {
255 Self {
256 transaction_tracker: Some(tracker),
257 transaction_id: Some(transaction_id),
258 write_transaction: true,
259 }
260 }
261
262 pub(crate) fn fake() -> Self {
264 Self {
265 transaction_tracker: None,
266 transaction_id: None,
267 write_transaction: false,
268 }
269 }
270
271 pub(crate) fn id(&self) -> TransactionId {
272 self.transaction_id.unwrap()
273 }
274
275 pub(crate) fn leak(mut self) -> TransactionId {
276 self.transaction_id.take().unwrap()
277 }
278}
279
280impl Drop for TransactionGuard {
281 fn drop(&mut self) {
282 if self.transaction_tracker.is_none() {
283 return;
284 }
285 if let Some(transaction_id) = self.transaction_id {
286 if self.write_transaction {
287 self.transaction_tracker
288 .as_ref()
289 .unwrap()
290 .end_write_transaction(transaction_id);
291 } else {
292 self.transaction_tracker
293 .as_ref()
294 .unwrap()
295 .deallocate_read_transaction(transaction_id);
296 }
297 }
298 }
299}
300
301pub struct Database {
332 mem: Arc<TransactionalMemory>,
333 transaction_tracker: Arc<TransactionTracker>,
334}
335
336impl Database {
337 pub fn create(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
342 Self::builder().create(path)
343 }
344
345 pub fn open(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
347 Self::builder().open(path)
348 }
349
350 pub fn cache_stats(&self) -> CacheStats {
354 self.mem.cache_stats()
355 }
356
357 pub(crate) fn get_memory(&self) -> Arc<TransactionalMemory> {
358 self.mem.clone()
359 }
360
361 pub(crate) fn verify_primary_checksums(mem: Arc<TransactionalMemory>) -> Result<bool> {
362 let fake_freed_pages = Arc::new(Mutex::new(vec![]));
363 let fake_allocated = Arc::new(Mutex::new(PageTrackerPolicy::Closed));
364 let table_tree = TableTreeMut::new(
366 mem.get_data_root(),
367 Arc::new(TransactionGuard::fake()),
368 mem.clone(),
369 fake_freed_pages.clone(),
370 fake_allocated.clone(),
371 );
372 if !table_tree.verify_checksums()? {
373 return Ok(false);
374 }
375 let system_table_tree = TableTreeMut::new(
377 mem.get_system_root(),
378 Arc::new(TransactionGuard::fake()),
379 mem.clone(),
380 fake_freed_pages.clone(),
381 fake_allocated.clone(),
382 );
383 if !system_table_tree.verify_checksums()? {
384 return Ok(false);
385 }
386 assert!(fake_freed_pages.lock().unwrap().is_empty());
387
388 if let Some(header) = mem.get_freed_root() {
389 if !RawBtree::new(
390 Some(header),
391 FreedTableKey::fixed_width(),
392 FreedPageList::fixed_width(),
393 mem.clone(),
394 )
395 .verify_checksum()?
396 {
397 return Ok(false);
398 }
399 }
400
401 Ok(true)
402 }
403
404 pub fn check_integrity(&mut self) -> Result<bool, DatabaseError> {
414 let allocator_hash = self.mem.allocator_hash();
415 let mut was_clean = Arc::get_mut(&mut self.mem)
416 .unwrap()
417 .clear_cache_and_reload()?;
418
419 let old_roots = [
420 self.mem.get_data_root(),
421 self.mem.get_system_root(),
422 self.mem.get_freed_root(),
423 ];
424
425 let new_roots = Self::do_repair(&mut self.mem, &|_| {}).map_err(|err| match err {
426 DatabaseError::Storage(storage_err) => storage_err,
427 _ => unreachable!(),
428 })?;
429
430 if old_roots != new_roots || allocator_hash != self.mem.allocator_hash() {
431 was_clean = false;
432 }
433
434 if !was_clean {
435 let next_transaction_id = self.mem.get_last_committed_transaction_id()?.next();
436 let [data_root, system_root, freed_root] = new_roots;
437 self.mem.commit(
438 data_root,
439 system_root,
440 freed_root,
441 next_transaction_id,
442 false,
443 true,
444 )?;
445 }
446
447 self.mem.begin_writable()?;
448
449 Ok(was_clean)
450 }
451
452 pub fn upgrade(&mut self) -> Result<bool, UpgradeError> {
460 if self.mem.file_format_v3() {
461 return Ok(false);
462 }
463
464 let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
465 if txn.list_persistent_savepoints()?.next().is_some() {
466 return Err(UpgradeError::PersistentSavepointExists);
467 }
468 txn.abort()?;
469
470 if self.transaction_tracker.any_savepoint_exists() {
471 return Err(UpgradeError::EphemeralSavepointExists);
472 }
473
474 if self.transaction_tracker.any_user_read_transaction() {
475 return Err(UpgradeError::TransactionInProgress);
476 }
477
478 let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
481 txn.set_two_phase_commit(true);
482 txn.commit()?;
483 let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
485 txn.set_two_phase_commit(true);
486 txn.commit()?;
487 let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
490 assert!(!txn.pending_free_pages()?);
491 txn.abort()?;
492
493 Arc::get_mut(&mut self.mem).unwrap().upgrade_to_v3()?;
494
495 Ok(true)
496 }
497
498 pub fn compact(&mut self) -> Result<bool, CompactionError> {
502 if self
503 .transaction_tracker
504 .oldest_live_read_transaction()
505 .is_some()
506 {
507 return Err(CompactionError::TransactionInProgress);
508 }
509 let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
514 if txn.list_persistent_savepoints()?.next().is_some() {
515 return Err(CompactionError::PersistentSavepointExists);
516 }
517 if self.transaction_tracker.any_savepoint_exists() {
518 return Err(CompactionError::EphemeralSavepointExists);
519 }
520 txn.set_two_phase_commit(true);
521 txn.commit().map_err(|e| e.into_storage_error())?;
522 let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
524 txn.set_two_phase_commit(true);
525 txn.commit().map_err(|e| e.into_storage_error())?;
526 let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
529 assert!(!txn.pending_free_pages()?);
530 txn.abort()?;
531
532 let mut compacted = false;
533 loop {
535 let mut progress = false;
536
537 let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
538 if txn.compact_pages()? {
539 progress = true;
540 txn.commit().map_err(|e| e.into_storage_error())?;
541 } else {
542 txn.abort()?;
543 }
544
545 let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
547 txn.set_two_phase_commit(true);
548 txn.commit().map_err(|e| e.into_storage_error())?;
549 let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
553 txn.set_two_phase_commit(true);
554 txn.commit().map_err(|e| e.into_storage_error())?;
555 let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
556 assert!(!txn.pending_free_pages()?);
557 txn.abort()?;
558
559 if !progress {
560 break;
561 }
562
563 compacted = true;
564 }
565
566 Ok(compacted)
567 }
568
569 fn check_repaired_allocated_pages_table(
570 system_root: Option<BtreeHeader>,
571 mem: Arc<TransactionalMemory>,
572 ) -> Result {
573 let table_tree = TableTree::new(
574 system_root,
575 PageHint::None,
576 Arc::new(TransactionGuard::fake()),
577 mem.clone(),
578 )?;
579 if let Some(table_def) = table_tree
580 .get_table::<TransactionIdWithPagination, PageList>(
581 DATA_ALLOCATED_TABLE.name(),
582 TableType::Normal,
583 )
584 .map_err(|e| e.into_storage_error_or_corrupted("Allocated pages table corrupted"))?
585 {
586 let table_root = if let InternalTableDefinition::Normal { table_root, .. } = table_def {
587 table_root
588 } else {
589 unreachable!()
590 };
591 let table: ReadOnlyTable<TransactionIdWithPagination, PageList> = ReadOnlyTable::new(
592 DATA_ALLOCATED_TABLE.name().to_string(),
593 table_root,
594 PageHint::None,
595 Arc::new(TransactionGuard::fake()),
596 mem.clone(),
597 )?;
598 for result in table.range::<TransactionIdWithPagination>(..)? {
599 let (_, pages) = result?;
600 for i in 0..pages.value().len() {
601 assert!(mem.is_allocated(pages.value().get(i)));
602 }
603 }
604 }
605
606 Ok(())
607 }
608
609 fn check_repaired_persistent_savepoints(
610 system_root: Option<BtreeHeader>,
611 mem: Arc<TransactionalMemory>,
612 ) -> Result {
613 let table_tree = TableTree::new(
614 system_root,
615 PageHint::None,
616 Arc::new(TransactionGuard::fake()),
617 mem.clone(),
618 )?;
619 let fake_transaction_tracker = Arc::new(TransactionTracker::new(TransactionId::new(0)));
620 if let Some(savepoint_table_def) = table_tree
621 .get_table::<SavepointId, SerializedSavepoint>(
622 SAVEPOINT_TABLE.name(),
623 TableType::Normal,
624 )
625 .map_err(|e| {
626 e.into_storage_error_or_corrupted("Persistent savepoint table corrupted")
627 })?
628 {
629 let savepoint_table_root =
630 if let InternalTableDefinition::Normal { table_root, .. } = savepoint_table_def {
631 table_root
632 } else {
633 unreachable!()
634 };
635 let savepoint_table: ReadOnlyTable<SavepointId, SerializedSavepoint> =
636 ReadOnlyTable::new(
637 "internal savepoint table".to_string(),
638 savepoint_table_root,
639 PageHint::None,
640 Arc::new(TransactionGuard::fake()),
641 mem.clone(),
642 )?;
643 for result in savepoint_table.range::<SavepointId>(..)? {
644 let (_, savepoint_data) = result?;
645 let savepoint = savepoint_data
646 .value()
647 .to_savepoint(fake_transaction_tracker.clone());
648 if let Some(header) = savepoint.get_user_root() {
649 Self::check_pages_allocated_recursive(header.root, mem.clone())?;
650 }
651 }
652 }
653
654 Ok(())
655 }
656
657 fn mark_freed_tree(freed_root: Option<BtreeHeader>, mem: Arc<TransactionalMemory>) -> Result {
658 if let Some(header) = freed_root {
659 let freed_pages_iter = AllPageNumbersBtreeIter::new(
660 header.root,
661 FreedTableKey::fixed_width(),
662 FreedPageList::fixed_width(),
663 mem.clone(),
664 )?;
665 for page in freed_pages_iter {
666 mem.mark_page_allocated(page?);
667 }
668 }
669
670 let freed_table: ReadOnlyTable<FreedTableKey, FreedPageList<'static>> = ReadOnlyTable::new(
671 "internal freed table".to_string(),
672 freed_root,
673 PageHint::None,
674 Arc::new(TransactionGuard::fake()),
675 mem.clone(),
676 )?;
677 for result in freed_table.range::<FreedTableKey>(..)? {
678 let (_, freed_page_list) = result?;
679 for i in 0..freed_page_list.value().len() {
680 mem.mark_page_allocated(freed_page_list.value().get(i));
681 }
682 }
683
684 Ok(())
685 }
686
687 fn mark_freed_tree_v2<K: Key, V: Value>(
688 system_root: Option<BtreeHeader>,
689 table_def: SystemTableDefinition<K, V>,
690 mem: Arc<TransactionalMemory>,
691 ) -> Result {
692 let fake_guard = Arc::new(TransactionGuard::fake());
693 let system_tree = TableTree::new(system_root, PageHint::None, fake_guard, mem.clone())?;
694 let table_name = table_def.name();
695 let result = match system_tree.get_table::<K, V>(table_name, TableType::Normal) {
696 Ok(result) => result,
697 Err(TableError::Storage(err)) => {
698 return Err(err);
699 }
700 Err(TableError::TableDoesNotExist(_)) => {
701 return Ok(());
702 }
703 Err(_) => {
704 return Err(StorageError::Corrupted(format!(
705 "Unable to open {table_name}"
706 )));
707 }
708 };
709
710 if let Some(definition) = result {
711 let table_root = match definition {
712 InternalTableDefinition::Normal { table_root, .. } => table_root,
713 InternalTableDefinition::Multimap { .. } => unreachable!(),
714 };
715 let table: ReadOnlyTable<TransactionIdWithPagination, PageList<'static>> =
716 ReadOnlyTable::new(
717 table_name.to_string(),
718 table_root,
719 PageHint::None,
720 Arc::new(TransactionGuard::fake()),
721 mem.clone(),
722 )?;
723 for result in table.range::<TransactionIdWithPagination>(..)? {
724 let (_, page_list) = result?;
725 for i in 0..page_list.value().len() {
726 mem.mark_page_allocated(page_list.value().get(i));
727 }
728 }
729 }
730
731 Ok(())
732 }
733
734 fn check_pages_allocated_recursive(root: PageNumber, mem: Arc<TransactionalMemory>) -> Result {
735 let master_pages_iter = AllPageNumbersBtreeIter::new(root, None, None, mem.clone())?;
738 for result in master_pages_iter {
739 let page = result?;
740 assert!(mem.is_allocated(page));
741 }
742
743 let iter: BtreeRangeIter<&str, InternalTableDefinition> =
745 BtreeRangeIter::new::<RangeFull, &str>(&(..), Some(root), mem.clone())?;
746
747 for entry in iter {
749 let definition = entry?.value();
750 definition.visit_all_pages(mem.clone(), |path| {
751 assert!(mem.is_allocated(path.page_number()));
752 Ok(())
753 })?;
754 }
755
756 Ok(())
757 }
758
759 fn mark_tables_recursive(root: PageNumber, mem: Arc<TransactionalMemory>) -> Result {
760 let master_pages_iter = AllPageNumbersBtreeIter::new(root, None, None, mem.clone())?;
763 for page in master_pages_iter {
764 mem.mark_page_allocated(page?);
765 }
766
767 let iter: BtreeRangeIter<&str, InternalTableDefinition> =
769 BtreeRangeIter::new::<RangeFull, &str>(&(..), Some(root), mem.clone())?;
770
771 for entry in iter {
773 let definition = entry?.value();
774 definition.visit_all_pages(mem.clone(), |path| {
775 mem.mark_page_allocated(path.page_number());
776 Ok(())
777 })?;
778 }
779
780 Ok(())
781 }
782
783 fn do_repair(
784 mem: &mut Arc<TransactionalMemory>, repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
786 ) -> Result<[Option<BtreeHeader>; 3], DatabaseError> {
787 if !Self::verify_primary_checksums(mem.clone())? {
788 if mem.used_two_phase_commit() {
789 return Err(DatabaseError::Storage(StorageError::Corrupted(
790 "Primary is corrupted despite 2-phase commit".to_string(),
791 )));
792 }
793
794 let mut handle = RepairSession::new(0.3);
796 repair_callback(&mut handle);
797 if handle.aborted() {
798 return Err(DatabaseError::RepairAborted);
799 }
800
801 mem.repair_primary_corrupted();
802 mem.clear_read_cache();
806 if !Self::verify_primary_checksums(mem.clone())? {
807 return Err(DatabaseError::Storage(StorageError::Corrupted(
808 "Failed to repair database. All roots are corrupted".to_string(),
809 )));
810 }
811 }
812 let mut handle = RepairSession::new(0.6);
814 repair_callback(&mut handle);
815 if handle.aborted() {
816 return Err(DatabaseError::RepairAborted);
817 }
818
819 mem.begin_repair()?;
820
821 let data_root = mem.get_data_root();
822 if let Some(header) = data_root {
823 Self::mark_tables_recursive(header.root, mem.clone())?;
824 }
825
826 let freed_root = mem.get_freed_root();
827 Self::mark_freed_tree(freed_root, mem.clone())?;
829 let freed_table: ReadOnlyTable<FreedTableKey, FreedPageList<'static>> = ReadOnlyTable::new(
830 "internal freed table".to_string(),
831 freed_root,
832 PageHint::None,
833 Arc::new(TransactionGuard::fake()),
834 mem.clone(),
835 )?;
836 drop(freed_table);
837
838 let mut handle = RepairSession::new(0.9);
840 repair_callback(&mut handle);
841 if handle.aborted() {
842 return Err(DatabaseError::RepairAborted);
843 }
844
845 let system_root = mem.get_system_root();
846 if let Some(header) = system_root {
847 Self::mark_tables_recursive(header.root, mem.clone())?;
848 }
849
850 Self::mark_freed_tree_v2(system_root, DATA_FREED_TABLE, mem.clone())?;
851 Self::mark_freed_tree_v2(system_root, SYSTEM_FREED_TABLE, mem.clone())?;
852 #[cfg(debug_assertions)]
853 {
854 Self::check_repaired_persistent_savepoints(system_root, mem.clone())?;
857
858 Self::check_repaired_allocated_pages_table(system_root, mem.clone())?;
859 }
860
861 mem.end_repair()?;
862
863 mem.clear_read_cache();
866
867 Ok([data_root, system_root, freed_root])
868 }
869
870 #[allow(clippy::too_many_arguments)]
871 fn new(
872 file: Box<dyn StorageBackend>,
873 allow_initialize: bool,
874 page_size: usize,
875 region_size: Option<u64>,
876 read_cache_size_bytes: usize,
877 write_cache_size_bytes: usize,
878 repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
879 default_to_file_format_v3: bool,
880 ) -> Result<Self, DatabaseError> {
881 #[cfg(feature = "logging")]
882 let file_path = format!("{:?}", &file);
883 #[cfg(feature = "logging")]
884 info!("Opening database {:?}", &file_path);
885 let mem = TransactionalMemory::new(
886 file,
887 allow_initialize,
888 page_size,
889 region_size,
890 read_cache_size_bytes,
891 write_cache_size_bytes,
892 default_to_file_format_v3,
893 )?;
894 let mut mem = Arc::new(mem);
895 if mem.needs_repair()? || mem.file_format_v3() {
898 if let Some(tree) = Self::get_allocator_state_table(&mem)? {
901 #[cfg(feature = "logging")]
902 info!("Found valid allocator state, full repair not needed");
903 mem.load_allocator_state(&tree)?;
904 } else {
905 #[cfg(feature = "logging")]
906 warn!("Database {:?} not shutdown cleanly. Repairing", &file_path);
907 let mut handle = RepairSession::new(0.0);
908 repair_callback(&mut handle);
909 if handle.aborted() {
910 return Err(DatabaseError::RepairAborted);
911 }
912 let [data_root, system_root, freed_root] =
913 Self::do_repair(&mut mem, repair_callback)?;
914 let next_transaction_id = mem.get_last_committed_transaction_id()?.next();
915 mem.commit(
916 data_root,
917 system_root,
918 freed_root,
919 next_transaction_id,
920 false,
921 true,
922 )?;
923 }
924 }
925
926 mem.begin_writable()?;
927 let next_transaction_id = mem.get_last_committed_transaction_id()?.next();
928
929 let db = Database {
930 mem,
931 transaction_tracker: Arc::new(TransactionTracker::new(next_transaction_id)),
932 };
933
934 let txn = db.begin_write().map_err(|e| e.into_storage_error())?;
936 if let Some(next_id) = txn.next_persistent_savepoint_id()? {
937 db.transaction_tracker
938 .restore_savepoint_counter_state(next_id);
939 }
940 for id in txn.list_persistent_savepoints()? {
941 let savepoint = match txn.get_persistent_savepoint(id) {
942 Ok(savepoint) => savepoint,
943 Err(err) => match err {
944 SavepointError::InvalidSavepoint => unreachable!(),
945 SavepointError::Storage(storage) => {
946 return Err(storage.into());
947 }
948 },
949 };
950 db.transaction_tracker
951 .register_persistent_savepoint(&savepoint);
952 }
953 txn.version_asserts()?;
954 txn.abort()?;
955
956 Ok(db)
957 }
958
959 fn get_allocator_state_table(
960 mem: &Arc<TransactionalMemory>,
961 ) -> Result<Option<AllocatorStateTree>> {
962 if !mem.used_two_phase_commit() {
964 return Ok(None);
965 }
966
967 let fake_freed_pages = Arc::new(Mutex::new(vec![]));
969 let fake_allocated = Arc::new(Mutex::new(PageTrackerPolicy::Closed));
970 let system_table_tree = TableTreeMut::new(
971 mem.get_system_root(),
972 Arc::new(TransactionGuard::fake()),
973 mem.clone(),
974 fake_freed_pages.clone(),
975 fake_allocated.clone(),
976 );
977 let Some(allocator_state_table) = system_table_tree
978 .get_table::<AllocatorStateKey, &[u8]>(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal)
979 .map_err(|e| e.into_storage_error_or_corrupted("Unexpected TableError"))?
980 else {
981 return Ok(None);
982 };
983
984 let InternalTableDefinition::Normal { table_root, .. } = allocator_state_table else {
986 unreachable!();
987 };
988 let tree = AllocatorStateTree::new(
989 table_root,
990 Arc::new(TransactionGuard::fake()),
991 mem.clone(),
992 fake_freed_pages,
993 fake_allocated,
994 );
995
996 if !mem.is_valid_allocator_state(&tree)? {
998 return Ok(None);
999 }
1000
1001 Ok(Some(tree))
1002 }
1003
1004 fn allocate_read_transaction(&self) -> Result<TransactionGuard> {
1005 let id = self
1006 .transaction_tracker
1007 .register_read_transaction(&self.mem)?;
1008
1009 Ok(TransactionGuard::new_read(
1010 id,
1011 self.transaction_tracker.clone(),
1012 ))
1013 }
1014
1015 pub fn builder() -> Builder {
1017 Builder::new()
1018 }
1019
1020 pub fn begin_write(&self) -> Result<WriteTransaction, TransactionError> {
1026 self.mem.check_io_errors()?;
1028 let guard = TransactionGuard::new_write(
1029 self.transaction_tracker.start_write_transaction(),
1030 self.transaction_tracker.clone(),
1031 );
1032 WriteTransaction::new(guard, self.transaction_tracker.clone(), self.mem.clone())
1033 .map_err(|e| e.into())
1034 }
1035
1036 pub fn begin_read(&self) -> Result<ReadTransaction, TransactionError> {
1044 let guard = self.allocate_read_transaction()?;
1045 #[cfg(feature = "logging")]
1046 debug!("Beginning read transaction id={:?}", guard.id());
1047 ReadTransaction::new(self.get_memory(), guard)
1048 }
1049
1050 fn ensure_allocator_state_table(&self) -> Result<(), Error> {
1051 if Self::get_allocator_state_table(&self.mem)?.is_some() {
1053 return Ok(());
1054 }
1055
1056 #[cfg(feature = "logging")]
1058 debug!("Writing allocator state table");
1059 let mut tx = self.begin_write()?;
1060 tx.set_quick_repair(true);
1061 tx.commit()?;
1062
1063 Ok(())
1064 }
1065}
1066
1067impl Drop for Database {
1068 fn drop(&mut self) {
1069 if thread::panicking() {
1070 return;
1071 }
1072
1073 if self.ensure_allocator_state_table().is_err() {
1074 #[cfg(feature = "logging")]
1075 warn!("Failed to write allocator state table. Repair may be required at restart.")
1076 }
1077 }
1078}
1079
1080pub struct RepairSession {
1081 progress: f64,
1082 aborted: bool,
1083}
1084
1085impl RepairSession {
1086 pub(crate) fn new(progress: f64) -> Self {
1087 Self {
1088 progress,
1089 aborted: false,
1090 }
1091 }
1092
1093 pub(crate) fn aborted(&self) -> bool {
1094 self.aborted
1095 }
1096
1097 pub fn abort(&mut self) {
1099 self.aborted = true;
1100 }
1101
1102 pub fn progress(&self) -> f64 {
1104 self.progress
1105 }
1106}
1107
1108pub struct Builder {
1110 page_size: usize,
1111 region_size: Option<u64>,
1112 read_cache_size_bytes: usize,
1113 write_cache_size_bytes: usize,
1114 repair_callback: Box<dyn Fn(&mut RepairSession)>,
1115 default_to_file_format_v3: bool,
1116}
1117
1118impl Builder {
1119 #[allow(clippy::new_without_default)]
1125 pub fn new() -> Self {
1126 let mut result = Self {
1127 page_size: PAGE_SIZE,
1131 region_size: None,
1132 read_cache_size_bytes: 0,
1134 write_cache_size_bytes: 0,
1136 repair_callback: Box::new(|_| {}),
1137 default_to_file_format_v3: false,
1138 };
1139
1140 result.set_cache_size(1024 * 1024 * 1024);
1141 result
1142 }
1143
1144 pub fn create_with_file_format_v3(&mut self, value: bool) -> &mut Self {
1150 self.default_to_file_format_v3 = value;
1151 self
1152 }
1153
1154 pub fn set_repair_callback(
1162 &mut self,
1163 callback: impl Fn(&mut RepairSession) + 'static,
1164 ) -> &mut Self {
1165 self.repair_callback = Box::new(callback);
1166 self
1167 }
1168
1169 #[cfg(any(fuzzing, test))]
1177 pub fn set_page_size(&mut self, size: usize) -> &mut Self {
1178 assert!(size.is_power_of_two());
1179 self.page_size = std::cmp::max(size, 512);
1180 self
1181 }
1182
1183 pub fn set_cache_size(&mut self, bytes: usize) -> &mut Self {
1185 self.read_cache_size_bytes = bytes / 10 * 9;
1187 self.write_cache_size_bytes = bytes / 10;
1188 self
1189 }
1190
1191 #[cfg(any(test, fuzzing))]
1192 pub fn set_region_size(&mut self, size: u64) -> &mut Self {
1193 assert!(size.is_power_of_two());
1194 self.region_size = Some(size);
1195 self
1196 }
1197
1198 pub fn create(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
1203 let file = OpenOptions::new()
1204 .read(true)
1205 .write(true)
1206 .create(true)
1207 .truncate(false)
1208 .open(path)?;
1209
1210 Database::new(
1211 Box::new(FileBackend::new(file)?),
1212 true,
1213 self.page_size,
1214 self.region_size,
1215 self.read_cache_size_bytes,
1216 self.write_cache_size_bytes,
1217 &self.repair_callback,
1218 self.default_to_file_format_v3,
1219 )
1220 }
1221
1222 pub fn open(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
1224 let file = OpenOptions::new().read(true).write(true).open(path)?;
1225
1226 Database::new(
1227 Box::new(FileBackend::new(file)?),
1228 false,
1229 self.page_size,
1230 None,
1231 self.read_cache_size_bytes,
1232 self.write_cache_size_bytes,
1233 &self.repair_callback,
1234 self.default_to_file_format_v3,
1235 )
1236 }
1237
1238 pub fn create_file(&self, file: File) -> Result<Database, DatabaseError> {
1242 Database::new(
1243 Box::new(FileBackend::new(file)?),
1244 true,
1245 self.page_size,
1246 self.region_size,
1247 self.read_cache_size_bytes,
1248 self.write_cache_size_bytes,
1249 &self.repair_callback,
1250 self.default_to_file_format_v3,
1251 )
1252 }
1253
1254 pub fn create_with_backend(
1256 &self,
1257 backend: impl StorageBackend,
1258 ) -> Result<Database, DatabaseError> {
1259 Database::new(
1260 Box::new(backend),
1261 true,
1262 self.page_size,
1263 self.region_size,
1264 self.read_cache_size_bytes,
1265 self.write_cache_size_bytes,
1266 &self.repair_callback,
1267 self.default_to_file_format_v3,
1268 )
1269 }
1270}
1271
1272impl std::fmt::Debug for Database {
1273 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1274 f.debug_struct("Database").finish()
1275 }
1276}
1277
1278#[cfg(test)]
1279mod test {
1280 use crate::backends::FileBackend;
1281 use crate::{
1282 CommitError, Database, DatabaseError, Durability, ReadableTable, StorageBackend,
1283 StorageError, TableDefinition, TransactionError,
1284 };
1285 use std::fs::File;
1286 use std::io::{ErrorKind, Read, Seek, SeekFrom};
1287 use std::sync::Arc;
1288 use std::sync::atomic::{AtomicU64, Ordering};
1289
1290 #[derive(Debug)]
1291 struct FailingBackend {
1292 inner: FileBackend,
1293 countdown: Arc<AtomicU64>,
1294 }
1295
1296 impl FailingBackend {
1297 fn new(backend: FileBackend, countdown: u64) -> Self {
1298 Self {
1299 inner: backend,
1300 countdown: Arc::new(AtomicU64::new(countdown)),
1301 }
1302 }
1303
1304 fn check_countdown(&self) -> Result<(), std::io::Error> {
1305 if self.countdown.load(Ordering::SeqCst) == 0 {
1306 return Err(std::io::Error::from(ErrorKind::Other));
1307 }
1308
1309 Ok(())
1310 }
1311
1312 fn decrement_countdown(&self) -> Result<(), std::io::Error> {
1313 if self
1314 .countdown
1315 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
1316 if x > 0 { Some(x - 1) } else { None }
1317 })
1318 .is_err()
1319 {
1320 return Err(std::io::Error::from(ErrorKind::Other));
1321 }
1322
1323 Ok(())
1324 }
1325 }
1326
1327 impl StorageBackend for FailingBackend {
1328 fn len(&self) -> Result<u64, std::io::Error> {
1329 self.inner.len()
1330 }
1331
1332 fn read(&self, offset: u64, len: usize) -> Result<Vec<u8>, std::io::Error> {
1333 self.check_countdown()?;
1334 self.inner.read(offset, len)
1335 }
1336
1337 fn set_len(&self, len: u64) -> Result<(), std::io::Error> {
1338 self.inner.set_len(len)
1339 }
1340
1341 fn sync_data(&self, eventual: bool) -> Result<(), std::io::Error> {
1342 self.check_countdown()?;
1343 self.inner.sync_data(eventual)
1344 }
1345
1346 fn write(&self, offset: u64, data: &[u8]) -> Result<(), std::io::Error> {
1347 self.decrement_countdown()?;
1348 self.inner.write(offset, data)
1349 }
1350 }
1351
1352 #[test]
1353 fn crash_regression4() {
1354 let tmpfile = crate::create_tempfile();
1355 let (file, path) = tmpfile.into_parts();
1356
1357 let backend = FailingBackend::new(FileBackend::new(file).unwrap(), 24);
1358 let db = Database::builder()
1359 .set_cache_size(12686)
1360 .set_page_size(8 * 1024)
1361 .set_region_size(32 * 4096)
1362 .create_with_backend(backend)
1363 .unwrap();
1364
1365 let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1366
1367 let tx = db.begin_write().unwrap();
1368 let _savepoint = tx.ephemeral_savepoint().unwrap();
1369 let _persistent_savepoint = tx.persistent_savepoint().unwrap();
1370 tx.commit().unwrap();
1371 let tx = db.begin_write().unwrap();
1372 {
1373 let mut table = tx.open_table(table_def).unwrap();
1374 let _ = table.insert_reserve(118821, 360).unwrap();
1375 }
1376 let result = tx.commit();
1377 assert!(result.is_err());
1378
1379 drop(db);
1380 Database::builder()
1381 .set_cache_size(1024 * 1024)
1382 .set_page_size(8 * 1024)
1383 .set_region_size(32 * 4096)
1384 .create(&path)
1385 .unwrap();
1386 }
1387
1388 #[test]
1389 fn transient_io_error() {
1390 let tmpfile = crate::create_tempfile();
1391 let (file, path) = tmpfile.into_parts();
1392
1393 let backend = FailingBackend::new(FileBackend::new(file).unwrap(), u64::MAX);
1394 let countdown = backend.countdown.clone();
1395 let db = Database::builder()
1396 .set_cache_size(0)
1397 .create_with_backend(backend)
1398 .unwrap();
1399
1400 let table_def: TableDefinition<u64, u64> = TableDefinition::new("x");
1401
1402 let tx = db.begin_write().unwrap();
1404 {
1405 let mut table = tx.open_table(table_def).unwrap();
1406 table.insert(0, 0).unwrap();
1407 }
1408 tx.commit().unwrap();
1409 let tx = db.begin_write().unwrap();
1410 {
1411 let mut table = tx.open_table(table_def).unwrap();
1412 table.insert(0, 1).unwrap();
1413 }
1414 tx.commit().unwrap();
1415
1416 let tx = db.begin_write().unwrap();
1417 countdown.store(0, Ordering::SeqCst);
1419 let result = tx.commit().err().unwrap();
1420 assert!(matches!(result, CommitError::Storage(StorageError::Io(_))));
1421 let result = db.begin_write().err().unwrap();
1422 assert!(matches!(
1423 result,
1424 TransactionError::Storage(StorageError::PreviousIo)
1425 ));
1426 countdown.store(u64::MAX, Ordering::SeqCst);
1428 drop(db);
1429
1430 let mut file = File::open(&path).unwrap();
1432 file.seek(SeekFrom::Start(9)).unwrap();
1433 let mut god_byte = vec![0u8];
1434 assert_eq!(file.read(&mut god_byte).unwrap(), 1);
1435 assert_ne!(god_byte[0] & 2, 0);
1436 }
1437
1438 #[test]
1439 fn small_pages() {
1440 let tmpfile = crate::create_tempfile();
1441
1442 let db = Database::builder()
1443 .set_page_size(512)
1444 .create(tmpfile.path())
1445 .unwrap();
1446
1447 let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1448 let txn = db.begin_write().unwrap();
1449 {
1450 txn.open_table(table_definition).unwrap();
1451 }
1452 txn.commit().unwrap();
1453 }
1454
1455 #[test]
1456 fn small_pages2() {
1457 let tmpfile = crate::create_tempfile();
1458
1459 let db = Database::builder()
1460 .set_page_size(512)
1461 .create(tmpfile.path())
1462 .unwrap();
1463
1464 let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1465
1466 let mut tx = db.begin_write().unwrap();
1467 tx.set_two_phase_commit(true);
1468 let savepoint0 = tx.ephemeral_savepoint().unwrap();
1469 {
1470 tx.open_table(table_def).unwrap();
1471 }
1472 tx.commit().unwrap();
1473
1474 let mut tx = db.begin_write().unwrap();
1475 tx.set_two_phase_commit(true);
1476 let savepoint1 = tx.ephemeral_savepoint().unwrap();
1477 tx.restore_savepoint(&savepoint0).unwrap();
1478 tx.set_durability(Durability::None);
1479 {
1480 let mut t = tx.open_table(table_def).unwrap();
1481 t.insert_reserve(&660503, 489).unwrap().as_mut().fill(0xFF);
1482 assert!(t.remove(&291295).unwrap().is_none());
1483 }
1484 tx.commit().unwrap();
1485
1486 let mut tx = db.begin_write().unwrap();
1487 tx.set_two_phase_commit(true);
1488 tx.restore_savepoint(&savepoint0).unwrap();
1489 {
1490 tx.open_table(table_def).unwrap();
1491 }
1492 tx.commit().unwrap();
1493
1494 let mut tx = db.begin_write().unwrap();
1495 tx.set_two_phase_commit(true);
1496 let savepoint2 = tx.ephemeral_savepoint().unwrap();
1497 drop(savepoint0);
1498 tx.restore_savepoint(&savepoint2).unwrap();
1499 {
1500 let mut t = tx.open_table(table_def).unwrap();
1501 assert!(t.get(&2059).unwrap().is_none());
1502 assert!(t.remove(&145227).unwrap().is_none());
1503 assert!(t.remove(&145227).unwrap().is_none());
1504 }
1505 tx.commit().unwrap();
1506
1507 let mut tx = db.begin_write().unwrap();
1508 tx.set_two_phase_commit(true);
1509 let savepoint3 = tx.ephemeral_savepoint().unwrap();
1510 drop(savepoint1);
1511 tx.restore_savepoint(&savepoint3).unwrap();
1512 {
1513 tx.open_table(table_def).unwrap();
1514 }
1515 tx.commit().unwrap();
1516
1517 let mut tx = db.begin_write().unwrap();
1518 tx.set_two_phase_commit(true);
1519 let savepoint4 = tx.ephemeral_savepoint().unwrap();
1520 drop(savepoint2);
1521 tx.restore_savepoint(&savepoint3).unwrap();
1522 tx.set_durability(Durability::None);
1523 {
1524 let mut t = tx.open_table(table_def).unwrap();
1525 assert!(t.remove(&207936).unwrap().is_none());
1526 }
1527 tx.abort().unwrap();
1528
1529 let mut tx = db.begin_write().unwrap();
1530 tx.set_two_phase_commit(true);
1531 let savepoint5 = tx.ephemeral_savepoint().unwrap();
1532 drop(savepoint3);
1533 assert!(tx.restore_savepoint(&savepoint4).is_err());
1534 {
1535 tx.open_table(table_def).unwrap();
1536 }
1537 tx.commit().unwrap();
1538
1539 let mut tx = db.begin_write().unwrap();
1540 tx.set_two_phase_commit(true);
1541 tx.restore_savepoint(&savepoint5).unwrap();
1542 tx.set_durability(Durability::None);
1543 {
1544 tx.open_table(table_def).unwrap();
1545 }
1546 tx.commit().unwrap();
1547 }
1548
1549 #[test]
1550 fn small_pages3() {
1551 let tmpfile = crate::create_tempfile();
1552
1553 let db = Database::builder()
1554 .set_page_size(1024)
1555 .create(tmpfile.path())
1556 .unwrap();
1557
1558 let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1559
1560 let mut tx = db.begin_write().unwrap();
1561 let _savepoint0 = tx.ephemeral_savepoint().unwrap();
1562 tx.set_durability(Durability::None);
1563 {
1564 let mut t = tx.open_table(table_def).unwrap();
1565 let value = vec![0; 306];
1566 t.insert(&539717, value.as_slice()).unwrap();
1567 }
1568 tx.abort().unwrap();
1569
1570 let mut tx = db.begin_write().unwrap();
1571 let savepoint1 = tx.ephemeral_savepoint().unwrap();
1572 tx.restore_savepoint(&savepoint1).unwrap();
1573 tx.set_durability(Durability::None);
1574 {
1575 let mut t = tx.open_table(table_def).unwrap();
1576 let value = vec![0; 2008];
1577 t.insert(&784384, value.as_slice()).unwrap();
1578 }
1579 tx.abort().unwrap();
1580 }
1581
1582 #[test]
1583 fn small_pages4() {
1584 let tmpfile = crate::create_tempfile();
1585
1586 let db = Database::builder()
1587 .set_cache_size(1024 * 1024)
1588 .set_page_size(1024)
1589 .create(tmpfile.path())
1590 .unwrap();
1591
1592 let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1593
1594 let tx = db.begin_write().unwrap();
1595 {
1596 tx.open_table(table_def).unwrap();
1597 }
1598 tx.commit().unwrap();
1599
1600 let tx = db.begin_write().unwrap();
1601 {
1602 let mut t = tx.open_table(table_def).unwrap();
1603 assert!(t.get(&131072).unwrap().is_none());
1604 let value = vec![0xFF; 1130];
1605 t.insert(&42394, value.as_slice()).unwrap();
1606 t.insert_reserve(&744037, 3645).unwrap().as_mut().fill(0xFF);
1607 assert!(t.get(&0).unwrap().is_none());
1608 }
1609 tx.abort().unwrap();
1610
1611 let tx = db.begin_write().unwrap();
1612 {
1613 let mut t = tx.open_table(table_def).unwrap();
1614 t.insert_reserve(&118749, 734).unwrap().as_mut().fill(0xFF);
1615 }
1616 tx.abort().unwrap();
1617 }
1618
1619 #[test]
1620 fn dynamic_shrink() {
1621 let tmpfile = crate::create_tempfile();
1622 let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1623 let big_value = vec![0u8; 1024];
1624
1625 let db = Database::builder()
1626 .set_region_size(1024 * 1024)
1627 .create(tmpfile.path())
1628 .unwrap();
1629
1630 let txn = db.begin_write().unwrap();
1631 {
1632 let mut table = txn.open_table(table_definition).unwrap();
1633 for i in 0..2048 {
1634 table.insert(&i, big_value.as_slice()).unwrap();
1635 }
1636 }
1637 txn.commit().unwrap();
1638
1639 let file_size = tmpfile.as_file().metadata().unwrap().len();
1640
1641 let txn = db.begin_write().unwrap();
1642 {
1643 let mut table = txn.open_table(table_definition).unwrap();
1644 for i in 0..2048 {
1645 table.remove(&i).unwrap();
1646 }
1647 }
1648 txn.commit().unwrap();
1649
1650 let txn = db.begin_write().unwrap();
1652 {
1653 let mut table = txn.open_table(table_definition).unwrap();
1654 table.insert(0, [].as_slice()).unwrap();
1655 }
1656 txn.commit().unwrap();
1657 let txn = db.begin_write().unwrap();
1658 {
1659 let mut table = txn.open_table(table_definition).unwrap();
1660 table.remove(0).unwrap();
1661 }
1662 txn.commit().unwrap();
1663 let txn = db.begin_write().unwrap();
1664 txn.commit().unwrap();
1665
1666 let final_file_size = tmpfile.as_file().metadata().unwrap().len();
1667 assert!(final_file_size < file_size);
1668 }
1669
1670 #[test]
1671 fn create_new_db_in_empty_file() {
1672 let tmpfile = crate::create_tempfile();
1673
1674 let _db = Database::builder()
1675 .create_file(tmpfile.into_file())
1676 .unwrap();
1677 }
1678
1679 #[test]
1680 fn open_missing_file() {
1681 let tmpfile = crate::create_tempfile();
1682
1683 let err = Database::builder()
1684 .open(tmpfile.path().with_extension("missing"))
1685 .unwrap_err();
1686
1687 match err {
1688 DatabaseError::Storage(StorageError::Io(err)) if err.kind() == ErrorKind::NotFound => {}
1689 err => panic!("Unexpected error for empty file: {err}"),
1690 }
1691 }
1692
1693 #[test]
1694 fn open_empty_file() {
1695 let tmpfile = crate::create_tempfile();
1696
1697 let err = Database::builder().open(tmpfile.path()).unwrap_err();
1698
1699 match err {
1700 DatabaseError::Storage(StorageError::Io(err))
1701 if err.kind() == ErrorKind::InvalidData => {}
1702 err => panic!("Unexpected error for empty file: {err}"),
1703 }
1704 }
1705}