1use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker};
2use crate::tree_store::{
3 AllPageNumbersBtreeIter, BtreeHeader, BtreeRangeIter, FreedPageList, FreedTableKey,
4 InternalTableDefinition, PageHint, PageNumber, RawBtree, SerializedSavepoint, TableTreeMut,
5 TableType, TransactionalMemory, PAGE_SIZE,
6};
7use crate::types::{Key, Value};
8use crate::{CompactionError, DatabaseError, Error, ReadOnlyTable, SavepointError, StorageError};
9use crate::{ReadTransaction, Result, WriteTransaction};
10use std::fmt::{Debug, Display, Formatter};
11
12use std::fs::{File, OpenOptions};
13use std::marker::PhantomData;
14use std::ops::RangeFull;
15use std::path::Path;
16use std::sync::{Arc, Mutex};
17use std::{io, thread};
18
19use crate::error::TransactionError;
20use crate::sealed::Sealed;
21use crate::transactions::{
22 AllocatorStateKey, AllocatorStateTree, ALLOCATOR_STATE_TABLE_NAME, SAVEPOINT_TABLE,
23};
24use crate::tree_store::file_backend::FileBackend;
25#[cfg(feature = "logging")]
26use log::{debug, info, warn};
27
/// Abstraction over the byte-addressable storage that holds a database.
///
/// Implementors must be `Debug`, `Send`, `Sync` and `'static`. Every method
/// takes `&self` and reports failures as [`io::Error`].
// `len()` is a size query on storage, not a collection length, so no `is_empty()` is offered.
#[allow(clippy::len_without_is_empty)]
pub trait StorageBackend: Debug + Send + Sync + 'static {
    /// Returns the current length of the storage.
    fn len(&self) -> io::Result<u64>;

    /// Reads `len` bytes beginning at byte `offset` and returns them.
    fn read(&self, offset: u64, len: usize) -> io::Result<Vec<u8>>;

    /// Sets the length of the storage to `len`.
    fn set_len(&self, len: u64) -> io::Result<()>;

    /// Synchronizes written data with the underlying storage.
    ///
    /// NOTE(review): `eventual` presumably permits a weaker/deferred flush —
    /// confirm against the concrete backends before relying on it.
    fn sync_data(&self, eventual: bool) -> io::Result<()>;

    /// Writes `data` beginning at byte `offset`.
    fn write(&self, offset: u64, data: &[u8]) -> io::Result<()>;
}
54
55pub trait TableHandle: Sealed {
56 fn name(&self) -> &str;
58}
59
60#[derive(Clone)]
61pub struct UntypedTableHandle {
62 name: String,
63}
64
65impl UntypedTableHandle {
66 pub(crate) fn new(name: String) -> Self {
67 Self { name }
68 }
69}
70
71impl TableHandle for UntypedTableHandle {
72 fn name(&self) -> &str {
73 &self.name
74 }
75}
76
77impl Sealed for UntypedTableHandle {}
78
79pub trait MultimapTableHandle: Sealed {
80 fn name(&self) -> &str;
82}
83
84#[derive(Clone)]
85pub struct UntypedMultimapTableHandle {
86 name: String,
87}
88
89impl UntypedMultimapTableHandle {
90 pub(crate) fn new(name: String) -> Self {
91 Self { name }
92 }
93}
94
95impl MultimapTableHandle for UntypedMultimapTableHandle {
96 fn name(&self) -> &str {
97 &self.name
98 }
99}
100
101impl Sealed for UntypedMultimapTableHandle {}
102
103pub struct TableDefinition<'a, K: Key + 'static, V: Value + 'static> {
110 name: &'a str,
111 _key_type: PhantomData<K>,
112 _value_type: PhantomData<V>,
113}
114
115impl<'a, K: Key + 'static, V: Value + 'static> TableDefinition<'a, K, V> {
116 pub const fn new(name: &'a str) -> Self {
122 assert!(!name.is_empty());
123 Self {
124 name,
125 _key_type: PhantomData,
126 _value_type: PhantomData,
127 }
128 }
129}
130
131impl<'a, K: Key + 'static, V: Value + 'static> TableHandle for TableDefinition<'a, K, V> {
132 fn name(&self) -> &str {
133 self.name
134 }
135}
136
137impl<K: Key, V: Value> Sealed for TableDefinition<'_, K, V> {}
138
139impl<'a, K: Key + 'static, V: Value + 'static> Clone for TableDefinition<'a, K, V> {
140 fn clone(&self) -> Self {
141 *self
142 }
143}
144
145impl<'a, K: Key + 'static, V: Value + 'static> Copy for TableDefinition<'a, K, V> {}
146
147impl<'a, K: Key + 'static, V: Value + 'static> Display for TableDefinition<'a, K, V> {
148 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
149 write!(
150 f,
151 "{}<{}, {}>",
152 self.name,
153 K::type_name().name(),
154 V::type_name().name()
155 )
156 }
157}
158
159pub struct MultimapTableDefinition<'a, K: Key + 'static, V: Key + 'static> {
168 name: &'a str,
169 _key_type: PhantomData<K>,
170 _value_type: PhantomData<V>,
171}
172
173impl<'a, K: Key + 'static, V: Key + 'static> MultimapTableDefinition<'a, K, V> {
174 pub const fn new(name: &'a str) -> Self {
175 assert!(!name.is_empty());
176 Self {
177 name,
178 _key_type: PhantomData,
179 _value_type: PhantomData,
180 }
181 }
182}
183
184impl<'a, K: Key + 'static, V: Key + 'static> MultimapTableHandle
185 for MultimapTableDefinition<'a, K, V>
186{
187 fn name(&self) -> &str {
188 self.name
189 }
190}
191
192impl<K: Key, V: Key> Sealed for MultimapTableDefinition<'_, K, V> {}
193
194impl<'a, K: Key + 'static, V: Key + 'static> Clone for MultimapTableDefinition<'a, K, V> {
195 fn clone(&self) -> Self {
196 *self
197 }
198}
199
200impl<'a, K: Key + 'static, V: Key + 'static> Copy for MultimapTableDefinition<'a, K, V> {}
201
202impl<'a, K: Key + 'static, V: Key + 'static> Display for MultimapTableDefinition<'a, K, V> {
203 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
204 write!(
205 f,
206 "{}<{}, {}>",
207 self.name,
208 K::type_name().name(),
209 V::type_name().name()
210 )
211 }
212}
213
/// Cache statistics as returned by `Database::cache_stats`.
#[derive(Debug)]
pub struct CacheStats {
    pub(crate) evictions: u64,
}

impl CacheStats {
    /// Returns the recorded `evictions` counter.
    pub fn evictions(&self) -> u64 {
        let CacheStats { evictions } = self;
        *evictions
    }
}
230
231pub(crate) struct TransactionGuard {
232 transaction_tracker: Option<Arc<TransactionTracker>>,
233 transaction_id: Option<TransactionId>,
234 write_transaction: bool,
235}
236
237impl TransactionGuard {
238 pub(crate) fn new_read(
239 transaction_id: TransactionId,
240 tracker: Arc<TransactionTracker>,
241 ) -> Self {
242 Self {
243 transaction_tracker: Some(tracker),
244 transaction_id: Some(transaction_id),
245 write_transaction: false,
246 }
247 }
248
249 pub(crate) fn new_write(
250 transaction_id: TransactionId,
251 tracker: Arc<TransactionTracker>,
252 ) -> Self {
253 Self {
254 transaction_tracker: Some(tracker),
255 transaction_id: Some(transaction_id),
256 write_transaction: true,
257 }
258 }
259
260 pub(crate) fn fake() -> Self {
262 Self {
263 transaction_tracker: None,
264 transaction_id: None,
265 write_transaction: false,
266 }
267 }
268
269 pub(crate) fn id(&self) -> TransactionId {
270 self.transaction_id.unwrap()
271 }
272
273 pub(crate) fn leak(mut self) -> TransactionId {
274 self.transaction_id.take().unwrap()
275 }
276}
277
278impl Drop for TransactionGuard {
279 fn drop(&mut self) {
280 if self.transaction_tracker.is_none() {
281 return;
282 }
283 if let Some(transaction_id) = self.transaction_id {
284 if self.write_transaction {
285 self.transaction_tracker
286 .as_ref()
287 .unwrap()
288 .end_write_transaction(transaction_id);
289 } else {
290 self.transaction_tracker
291 .as_ref()
292 .unwrap()
293 .deallocate_read_transaction(transaction_id);
294 }
295 }
296 }
297}
298
/// Handle to an opened database.
///
/// Created through [`Database::create`], [`Database::open`] or a [`Builder`].
/// Transactions are started with `begin_write` and `begin_read`.
pub struct Database {
    // Shared storage layer; a clone of this `Arc` is handed to each transaction.
    mem: Arc<TransactionalMemory>,
    // Hands out transaction ids and tracks live transactions and savepoints.
    transaction_tracker: Arc<TransactionTracker>,
}
333
334impl Database {
335 pub fn create(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
340 Self::builder().create(path)
341 }
342
343 pub fn open(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
345 Self::builder().open(path)
346 }
347
348 pub fn cache_stats(&self) -> CacheStats {
352 self.mem.cache_stats()
353 }
354
355 pub(crate) fn get_memory(&self) -> Arc<TransactionalMemory> {
356 self.mem.clone()
357 }
358
359 pub(crate) fn verify_primary_checksums(mem: Arc<TransactionalMemory>) -> Result<bool> {
360 let fake_freed_pages = Arc::new(Mutex::new(vec![]));
361 let table_tree = TableTreeMut::new(
362 mem.get_data_root(),
363 Arc::new(TransactionGuard::fake()),
364 mem.clone(),
365 fake_freed_pages.clone(),
366 );
367 if !table_tree.verify_checksums()? {
368 return Ok(false);
369 }
370 let system_table_tree = TableTreeMut::new(
371 mem.get_system_root(),
372 Arc::new(TransactionGuard::fake()),
373 mem.clone(),
374 fake_freed_pages.clone(),
375 );
376 if !system_table_tree.verify_checksums()? {
377 return Ok(false);
378 }
379 assert!(fake_freed_pages.lock().unwrap().is_empty());
380
381 if let Some(header) = mem.get_freed_root() {
382 if !RawBtree::new(
383 Some(header),
384 FreedTableKey::fixed_width(),
385 FreedPageList::fixed_width(),
386 mem.clone(),
387 )
388 .verify_checksum()?
389 {
390 return Ok(false);
391 }
392 }
393
394 Ok(true)
395 }
396
397 pub fn check_integrity(&mut self) -> Result<bool, DatabaseError> {
407 let allocator_hash = self.mem.allocator_hash();
408 let mut was_clean = Arc::get_mut(&mut self.mem)
409 .unwrap()
410 .clear_cache_and_reload()?;
411
412 let old_roots = [
413 self.mem.get_data_root(),
414 self.mem.get_system_root(),
415 self.mem.get_freed_root(),
416 ];
417
418 let new_roots = Self::do_repair(&mut self.mem, &|_| {}).map_err(|err| match err {
419 DatabaseError::Storage(storage_err) => storage_err,
420 _ => unreachable!(),
421 })?;
422
423 if old_roots != new_roots || allocator_hash != self.mem.allocator_hash() {
424 was_clean = false;
425 }
426
427 if !was_clean {
428 let next_transaction_id = self.mem.get_last_committed_transaction_id()?.next();
429 let [data_root, system_root, freed_root] = new_roots;
430 self.mem.commit(
431 data_root,
432 system_root,
433 freed_root,
434 next_transaction_id,
435 false,
436 true,
437 )?;
438 }
439
440 self.mem.begin_writable()?;
441
442 Ok(was_clean)
443 }
444
    /// Compacts the database by repeatedly relocating pages until a pass makes
    /// no progress.
    ///
    /// Returns `Ok(true)` if at least one pass moved pages, `Ok(false)` otherwise.
    ///
    /// # Errors
    ///
    /// * `CompactionError::TransactionInProgress` if a read transaction is live.
    /// * `CompactionError::PersistentSavepointExists` if any persistent savepoint exists.
    /// * `CompactionError::EphemeralSavepointExists` if the tracker reports any savepoint.
    /// * Storage errors from the underlying transactions.
    ///
    /// # Panics
    ///
    /// Panics if a freed root is still present after the empty two-phase commits.
    pub fn compact(&mut self) -> Result<bool, CompactionError> {
        // Refuse to run while any read transaction is still registered.
        if self
            .transaction_tracker
            .oldest_live_read_transaction()
            .is_some()
        {
            return Err(CompactionError::TransactionInProgress);
        }
        let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        // Savepoints block compaction; returning here drops `txn` without committing it.
        if txn.list_persistent_savepoints()?.next().is_some() {
            return Err(CompactionError::PersistentSavepointExists);
        }
        if self.transaction_tracker.any_savepoint_exists() {
            return Err(CompactionError::EphemeralSavepointExists);
        }
        // Two empty two-phase commits in a row.
        // NOTE(review): presumably these flush pending freed pages so that the
        // freed tree is empty before compaction starts — the assert below relies on it.
        txn.set_two_phase_commit(true);
        txn.commit().map_err(|e| e.into_storage_error())?;
        let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        txn.set_two_phase_commit(true);
        txn.commit().map_err(|e| e.into_storage_error())?;
        assert!(self.mem.get_freed_root().is_none());

        let mut compacted = false;
        loop {
            // Set when this pass relocated at least one page.
            let mut progress = false;

            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            if txn.compact_pages()? {
                progress = true;
                txn.commit().map_err(|e| e.into_storage_error())?;
            } else {
                txn.abort()?;
            }

            // Another empty two-phase commit after every pass, followed by the
            // same "no freed root" check as above.
            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            txn.set_two_phase_commit(true);
            txn.commit().map_err(|e| e.into_storage_error())?;
            assert!(self.mem.get_freed_root().is_none());

            if !progress {
                break;
            }

            compacted = true;
        }

        Ok(compacted)
    }
505
506 fn check_repaired_persistent_savepoints(
507 system_root: Option<BtreeHeader>,
508 mem: Arc<TransactionalMemory>,
509 ) -> Result {
510 let freed_list = Arc::new(Mutex::new(vec![]));
511 let table_tree = TableTreeMut::new(
512 system_root,
513 Arc::new(TransactionGuard::fake()),
514 mem.clone(),
515 freed_list,
516 );
517 let fake_transaction_tracker = Arc::new(TransactionTracker::new(TransactionId::new(0)));
518 if let Some(savepoint_table_def) = table_tree
519 .get_table::<SavepointId, SerializedSavepoint>(
520 SAVEPOINT_TABLE.name(),
521 TableType::Normal,
522 )
523 .map_err(|e| {
524 e.into_storage_error_or_corrupted("Persistent savepoint table corrupted")
525 })?
526 {
527 let savepoint_table_root =
528 if let InternalTableDefinition::Normal { table_root, .. } = savepoint_table_def {
529 table_root
530 } else {
531 unreachable!()
532 };
533 let savepoint_table: ReadOnlyTable<SavepointId, SerializedSavepoint> =
534 ReadOnlyTable::new(
535 "internal savepoint table".to_string(),
536 savepoint_table_root,
537 PageHint::None,
538 Arc::new(TransactionGuard::fake()),
539 mem.clone(),
540 )?;
541 for result in savepoint_table.range::<SavepointId>(..)? {
542 let (_, savepoint_data) = result?;
543 let savepoint = savepoint_data
544 .value()
545 .to_savepoint(fake_transaction_tracker.clone());
546 if let Some(header) = savepoint.get_user_root() {
547 Self::check_pages_allocated_recursive(header.root, mem.clone())?;
548 }
549 }
550 }
551
552 Ok(())
553 }
554
555 fn mark_freed_tree(freed_root: Option<BtreeHeader>, mem: Arc<TransactionalMemory>) -> Result {
556 if let Some(header) = freed_root {
557 let freed_pages_iter = AllPageNumbersBtreeIter::new(
558 header.root,
559 FreedTableKey::fixed_width(),
560 FreedPageList::fixed_width(),
561 mem.clone(),
562 )?;
563 for page in freed_pages_iter {
564 mem.mark_page_allocated(page?);
565 }
566 }
567
568 let freed_table: ReadOnlyTable<FreedTableKey, FreedPageList<'static>> = ReadOnlyTable::new(
569 "internal freed table".to_string(),
570 freed_root,
571 PageHint::None,
572 Arc::new(TransactionGuard::fake()),
573 mem.clone(),
574 )?;
575 for result in freed_table.range::<FreedTableKey>(..)? {
576 let (_, freed_page_list) = result?;
577 for i in 0..freed_page_list.value().len() {
578 mem.mark_page_allocated(freed_page_list.value().get(i));
579 }
580 }
581
582 Ok(())
583 }
584
585 fn check_pages_allocated_recursive(root: PageNumber, mem: Arc<TransactionalMemory>) -> Result {
586 let master_pages_iter = AllPageNumbersBtreeIter::new(root, None, None, mem.clone())?;
589 for result in master_pages_iter {
590 let page = result?;
591 assert!(mem.is_allocated(page));
592 }
593
594 let iter: BtreeRangeIter<&str, InternalTableDefinition> =
596 BtreeRangeIter::new::<RangeFull, &str>(&(..), Some(root), mem.clone())?;
597
598 for entry in iter {
600 let definition = entry?.value();
601 definition.visit_all_pages(mem.clone(), |path| {
602 assert!(mem.is_allocated(path.page_number()));
603 Ok(())
604 })?;
605 }
606
607 Ok(())
608 }
609
610 fn mark_tables_recursive(root: PageNumber, mem: Arc<TransactionalMemory>) -> Result {
611 let master_pages_iter = AllPageNumbersBtreeIter::new(root, None, None, mem.clone())?;
614 for page in master_pages_iter {
615 mem.mark_page_allocated(page?);
616 }
617
618 let iter: BtreeRangeIter<&str, InternalTableDefinition> =
620 BtreeRangeIter::new::<RangeFull, &str>(&(..), Some(root), mem.clone())?;
621
622 for entry in iter {
624 let definition = entry?.value();
625 definition.visit_all_pages(mem.clone(), |path| {
626 mem.mark_page_allocated(path.page_number());
627 Ok(())
628 })?;
629 }
630
631 Ok(())
632 }
633
634 fn do_repair(
635 mem: &mut Arc<TransactionalMemory>, repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
637 ) -> Result<[Option<BtreeHeader>; 3], DatabaseError> {
638 if !Self::verify_primary_checksums(mem.clone())? {
639 if mem.used_two_phase_commit() {
640 return Err(DatabaseError::Storage(StorageError::Corrupted(
641 "Primary is corrupted despite 2-phase commit".to_string(),
642 )));
643 }
644
645 let mut handle = RepairSession::new(0.3);
647 repair_callback(&mut handle);
648 if handle.aborted() {
649 return Err(DatabaseError::RepairAborted);
650 }
651
652 mem.repair_primary_corrupted();
653 mem.clear_read_cache();
657 if !Self::verify_primary_checksums(mem.clone())? {
658 return Err(DatabaseError::Storage(StorageError::Corrupted(
659 "Failed to repair database. All roots are corrupted".to_string(),
660 )));
661 }
662 }
663 let mut handle = RepairSession::new(0.6);
665 repair_callback(&mut handle);
666 if handle.aborted() {
667 return Err(DatabaseError::RepairAborted);
668 }
669
670 mem.begin_repair()?;
671
672 let data_root = mem.get_data_root();
673 if let Some(header) = data_root {
674 Self::mark_tables_recursive(header.root, mem.clone())?;
675 }
676
677 let freed_root = mem.get_freed_root();
678 Self::mark_freed_tree(freed_root, mem.clone())?;
680 let freed_table: ReadOnlyTable<FreedTableKey, FreedPageList<'static>> = ReadOnlyTable::new(
681 "internal freed table".to_string(),
682 freed_root,
683 PageHint::None,
684 Arc::new(TransactionGuard::fake()),
685 mem.clone(),
686 )?;
687 drop(freed_table);
688
689 let mut handle = RepairSession::new(0.9);
691 repair_callback(&mut handle);
692 if handle.aborted() {
693 return Err(DatabaseError::RepairAborted);
694 }
695
696 let system_root = mem.get_system_root();
697 if let Some(header) = system_root {
698 Self::mark_tables_recursive(header.root, mem.clone())?;
699 }
700 #[cfg(debug_assertions)]
701 {
702 Self::check_repaired_persistent_savepoints(system_root, mem.clone())?;
705 }
706
707 mem.end_repair()?;
708
709 mem.clear_read_cache();
712
713 Ok([data_root, system_root, freed_root])
714 }
715
716 fn new(
717 file: Box<dyn StorageBackend>,
718 allow_initialize: bool,
719 page_size: usize,
720 region_size: Option<u64>,
721 read_cache_size_bytes: usize,
722 write_cache_size_bytes: usize,
723 repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
724 ) -> Result<Self, DatabaseError> {
725 #[cfg(feature = "logging")]
726 let file_path = format!("{:?}", &file);
727 #[cfg(feature = "logging")]
728 info!("Opening database {:?}", &file_path);
729 let mem = TransactionalMemory::new(
730 file,
731 allow_initialize,
732 page_size,
733 region_size,
734 read_cache_size_bytes,
735 write_cache_size_bytes,
736 )?;
737 let mut mem = Arc::new(mem);
738 if mem.needs_repair()? {
739 if let Some(tree) = Self::get_allocator_state_table(&mem)? {
742 #[cfg(feature = "logging")]
743 info!("Found valid allocator state, full repair not needed");
744 mem.load_allocator_state(&tree)?;
745 } else {
746 #[cfg(feature = "logging")]
747 warn!("Database {:?} not shutdown cleanly. Repairing", &file_path);
748 let mut handle = RepairSession::new(0.0);
749 repair_callback(&mut handle);
750 if handle.aborted() {
751 return Err(DatabaseError::RepairAborted);
752 }
753 let [data_root, system_root, freed_root] =
754 Self::do_repair(&mut mem, repair_callback)?;
755 let next_transaction_id = mem.get_last_committed_transaction_id()?.next();
756 mem.commit(
757 data_root,
758 system_root,
759 freed_root,
760 next_transaction_id,
761 false,
762 true,
763 )?;
764 }
765 }
766
767 mem.begin_writable()?;
768 let next_transaction_id = mem.get_last_committed_transaction_id()?.next();
769
770 let db = Database {
771 mem,
772 transaction_tracker: Arc::new(TransactionTracker::new(next_transaction_id)),
773 };
774
775 let txn = db.begin_write().map_err(|e| e.into_storage_error())?;
777 if let Some(next_id) = txn.next_persistent_savepoint_id()? {
778 db.transaction_tracker
779 .restore_savepoint_counter_state(next_id);
780 }
781 for id in txn.list_persistent_savepoints()? {
782 let savepoint = match txn.get_persistent_savepoint(id) {
783 Ok(savepoint) => savepoint,
784 Err(err) => match err {
785 SavepointError::InvalidSavepoint => unreachable!(),
786 SavepointError::Storage(storage) => {
787 return Err(storage.into());
788 }
789 },
790 };
791 db.transaction_tracker
792 .register_persistent_savepoint(&savepoint);
793 }
794 txn.abort()?;
795
796 Ok(db)
797 }
798
799 fn get_allocator_state_table(
800 mem: &Arc<TransactionalMemory>,
801 ) -> Result<Option<AllocatorStateTree>> {
802 if !mem.used_two_phase_commit() {
804 return Ok(None);
805 }
806
807 let fake_freed_pages = Arc::new(Mutex::new(vec![]));
809 let system_table_tree = TableTreeMut::new(
810 mem.get_system_root(),
811 Arc::new(TransactionGuard::fake()),
812 mem.clone(),
813 fake_freed_pages.clone(),
814 );
815 let Some(allocator_state_table) = system_table_tree
816 .get_table::<AllocatorStateKey, &[u8]>(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal)
817 .map_err(|e| e.into_storage_error_or_corrupted("Unexpected TableError"))?
818 else {
819 return Ok(None);
820 };
821
822 let InternalTableDefinition::Normal { table_root, .. } = allocator_state_table else {
824 unreachable!();
825 };
826 let tree = AllocatorStateTree::new(
827 table_root,
828 Arc::new(TransactionGuard::fake()),
829 mem.clone(),
830 fake_freed_pages,
831 );
832
833 if !mem.is_valid_allocator_state(&tree)? {
835 return Ok(None);
836 }
837
838 Ok(Some(tree))
839 }
840
841 fn allocate_read_transaction(&self) -> Result<TransactionGuard> {
842 let id = self
843 .transaction_tracker
844 .register_read_transaction(&self.mem)?;
845
846 Ok(TransactionGuard::new_read(
847 id,
848 self.transaction_tracker.clone(),
849 ))
850 }
851
    /// Returns a [`Builder`] with default settings, used to configure a
    /// database before creating or opening it.
    pub fn builder() -> Builder {
        Builder::new()
    }
856
857 pub fn begin_write(&self) -> Result<WriteTransaction, TransactionError> {
863 self.mem.check_io_errors()?;
865 let guard = TransactionGuard::new_write(
866 self.transaction_tracker.start_write_transaction(),
867 self.transaction_tracker.clone(),
868 );
869 WriteTransaction::new(guard, self.transaction_tracker.clone(), self.mem.clone())
870 .map_err(|e| e.into())
871 }
872
873 pub fn begin_read(&self) -> Result<ReadTransaction, TransactionError> {
881 let guard = self.allocate_read_transaction()?;
882 #[cfg(feature = "logging")]
883 debug!("Beginning read transaction id={:?}", guard.id());
884 ReadTransaction::new(self.get_memory(), guard)
885 }
886
887 fn ensure_allocator_state_table(&self) -> Result<(), Error> {
888 if Self::get_allocator_state_table(&self.mem)?.is_some() {
890 return Ok(());
891 }
892
893 #[cfg(feature = "logging")]
895 debug!("Writing allocator state table");
896 let mut tx = self.begin_write()?;
897 tx.set_quick_repair(true);
898 tx.commit()?;
899
900 Ok(())
901 }
902}
903
// On drop, try to persist the allocator state table.
// NOTE(review): presumably this lets the next open skip a full repair —
// see `Database::new`, which loads a valid allocator state instead of repairing.
impl Drop for Database {
    fn drop(&mut self) {
        // Skip the extra commit while the thread is unwinding from a panic.
        if thread::panicking() {
            return;
        }

        // Best effort: `drop` cannot return an error, so a failure is only
        // logged (and silently ignored when the "logging" feature is off).
        if self.ensure_allocator_state_table().is_err() {
            #[cfg(feature = "logging")]
            warn!("Failed to write allocator state table. Repair may be required at restart.")
        }
    }
}
916
/// State handed to a repair callback: the current progress value and a flag
/// the callback can set to abort the repair.
pub struct RepairSession {
    progress: f64,
    aborted: bool,
}

impl RepairSession {
    /// Creates a session that reports `progress` and has not been aborted.
    pub(crate) fn new(progress: f64) -> Self {
        RepairSession {
            aborted: false,
            progress,
        }
    }

    /// Returns `true` once [`RepairSession::abort`] has been called.
    pub(crate) fn aborted(&self) -> bool {
        let RepairSession { aborted, .. } = self;
        *aborted
    }

    /// Requests that the repair be aborted.
    pub fn abort(&mut self) {
        self.aborted = true;
    }

    /// Returns the progress value this session was created with.
    pub fn progress(&self) -> f64 {
        let RepairSession { progress, .. } = self;
        *progress
    }
}
944
945pub struct Builder {
947 page_size: usize,
948 region_size: Option<u64>,
949 read_cache_size_bytes: usize,
950 write_cache_size_bytes: usize,
951 repair_callback: Box<dyn Fn(&mut RepairSession)>,
952}
953
954impl Builder {
955 #[allow(clippy::new_without_default)]
961 pub fn new() -> Self {
962 let mut result = Self {
963 page_size: PAGE_SIZE,
967 region_size: None,
968 read_cache_size_bytes: 0,
970 write_cache_size_bytes: 0,
972 repair_callback: Box::new(|_| {}),
973 };
974
975 result.set_cache_size(1024 * 1024 * 1024);
976 result
977 }
978
979 pub fn set_repair_callback(
987 &mut self,
988 callback: impl Fn(&mut RepairSession) + 'static,
989 ) -> &mut Self {
990 self.repair_callback = Box::new(callback);
991 self
992 }
993
994 #[cfg(any(fuzzing, test))]
1002 pub fn set_page_size(&mut self, size: usize) -> &mut Self {
1003 assert!(size.is_power_of_two());
1004 self.page_size = std::cmp::max(size, 512);
1005 self
1006 }
1007
1008 pub fn set_cache_size(&mut self, bytes: usize) -> &mut Self {
1010 self.read_cache_size_bytes = bytes / 10 * 9;
1012 self.write_cache_size_bytes = bytes / 10;
1013 self
1014 }
1015
1016 #[cfg(any(test, fuzzing))]
1017 pub fn set_region_size(&mut self, size: u64) -> &mut Self {
1018 assert!(size.is_power_of_two());
1019 self.region_size = Some(size);
1020 self
1021 }
1022
1023 pub fn create(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
1028 let file = OpenOptions::new()
1029 .read(true)
1030 .write(true)
1031 .create(true)
1032 .truncate(false)
1033 .open(path)?;
1034
1035 Database::new(
1036 Box::new(FileBackend::new(file)?),
1037 true,
1038 self.page_size,
1039 self.region_size,
1040 self.read_cache_size_bytes,
1041 self.write_cache_size_bytes,
1042 &self.repair_callback,
1043 )
1044 }
1045
1046 pub fn open(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
1048 let file = OpenOptions::new().read(true).write(true).open(path)?;
1049
1050 Database::new(
1051 Box::new(FileBackend::new(file)?),
1052 false,
1053 self.page_size,
1054 None,
1055 self.read_cache_size_bytes,
1056 self.write_cache_size_bytes,
1057 &self.repair_callback,
1058 )
1059 }
1060
1061 pub fn create_file(&self, file: File) -> Result<Database, DatabaseError> {
1065 Database::new(
1066 Box::new(FileBackend::new(file)?),
1067 true,
1068 self.page_size,
1069 self.region_size,
1070 self.read_cache_size_bytes,
1071 self.write_cache_size_bytes,
1072 &self.repair_callback,
1073 )
1074 }
1075
1076 pub fn create_with_backend(
1078 &self,
1079 backend: impl StorageBackend,
1080 ) -> Result<Database, DatabaseError> {
1081 Database::new(
1082 Box::new(backend),
1083 true,
1084 self.page_size,
1085 self.region_size,
1086 self.read_cache_size_bytes,
1087 self.write_cache_size_bytes,
1088 &self.repair_callback,
1089 )
1090 }
1091}
1092
1093impl std::fmt::Debug for Database {
1094 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1095 f.debug_struct("Database").finish()
1096 }
1097}
1098
1099#[cfg(test)]
1100mod test {
1101 use crate::backends::FileBackend;
1102 use crate::{
1103 CommitError, Database, DatabaseError, Durability, ReadableTable, StorageBackend,
1104 StorageError, TableDefinition, TransactionError,
1105 };
1106 use std::fs::File;
1107 use std::io::{ErrorKind, Read, Seek, SeekFrom};
1108 use std::sync::atomic::{AtomicU64, Ordering};
1109 use std::sync::Arc;
1110
1111 #[derive(Debug)]
1112 struct FailingBackend {
1113 inner: FileBackend,
1114 countdown: Arc<AtomicU64>,
1115 }
1116
1117 impl FailingBackend {
1118 fn new(backend: FileBackend, countdown: u64) -> Self {
1119 Self {
1120 inner: backend,
1121 countdown: Arc::new(AtomicU64::new(countdown)),
1122 }
1123 }
1124
1125 fn check_countdown(&self) -> Result<(), std::io::Error> {
1126 if self.countdown.load(Ordering::SeqCst) == 0 {
1127 return Err(std::io::Error::from(ErrorKind::Other));
1128 }
1129
1130 Ok(())
1131 }
1132
1133 fn decrement_countdown(&self) -> Result<(), std::io::Error> {
1134 if self
1135 .countdown
1136 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
1137 if x > 0 {
1138 Some(x - 1)
1139 } else {
1140 None
1141 }
1142 })
1143 .is_err()
1144 {
1145 return Err(std::io::Error::from(ErrorKind::Other));
1146 }
1147
1148 Ok(())
1149 }
1150 }
1151
1152 impl StorageBackend for FailingBackend {
1153 fn len(&self) -> Result<u64, std::io::Error> {
1154 self.inner.len()
1155 }
1156
1157 fn read(&self, offset: u64, len: usize) -> Result<Vec<u8>, std::io::Error> {
1158 self.check_countdown()?;
1159 self.inner.read(offset, len)
1160 }
1161
1162 fn set_len(&self, len: u64) -> Result<(), std::io::Error> {
1163 self.inner.set_len(len)
1164 }
1165
1166 fn sync_data(&self, eventual: bool) -> Result<(), std::io::Error> {
1167 self.check_countdown()?;
1168 self.inner.sync_data(eventual)
1169 }
1170
1171 fn write(&self, offset: u64, data: &[u8]) -> Result<(), std::io::Error> {
1172 self.decrement_countdown()?;
1173 self.inner.write(offset, data)
1174 }
1175 }
1176
    // Regression test: a commit that fails partway through (the backend allows
    // only 23 writes) must leave a file that can still be opened afterwards.
    #[test]
    fn crash_regression4() {
        let tmpfile = crate::create_tempfile();
        let (file, path) = tmpfile.into_parts();

        // After 23 successful writes every further write fails.
        let backend = FailingBackend::new(FileBackend::new(file).unwrap(), 23);
        let db = Database::builder()
            .set_cache_size(12686)
            .set_page_size(8 * 1024)
            .set_region_size(32 * 4096)
            .create_with_backend(backend)
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        // First transaction only creates savepoints and is expected to commit.
        let tx = db.begin_write().unwrap();
        let _savepoint = tx.ephemeral_savepoint().unwrap();
        let _persistent_savepoint = tx.persistent_savepoint().unwrap();
        tx.commit().unwrap();
        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            let _ = table.insert_reserve(118821, 360).unwrap();
        }
        // The write budget runs out during this commit.
        let result = tx.commit();
        assert!(result.is_err());

        // Reopen the same file through a normal file backend; this must succeed.
        drop(db);
        Database::builder()
            .set_cache_size(1024 * 1024)
            .set_page_size(8 * 1024)
            .set_region_size(32 * 4096)
            .create(&path)
            .unwrap();
    }
1212
    // An I/O error during commit must be remembered: later write transactions
    // are refused with `PreviousIo`, and the file header still carries a flag
    // bit after the backend has recovered and the database was dropped.
    #[test]
    fn transient_io_error() {
        let tmpfile = crate::create_tempfile();
        let (file, path) = tmpfile.into_parts();

        let backend = FailingBackend::new(FileBackend::new(file).unwrap(), u64::MAX);
        // Keep a handle to the countdown so failures can be switched on and off.
        let countdown = backend.countdown.clone();
        let db = Database::builder()
            .set_cache_size(0)
            .create_with_backend(backend)
            .unwrap();

        let table_def: TableDefinition<u64, u64> = TableDefinition::new("x");

        // Two successful commits while the backend is healthy.
        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            table.insert(0, 0).unwrap();
        }
        tx.commit().unwrap();
        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            table.insert(0, 1).unwrap();
        }
        tx.commit().unwrap();

        let tx = db.begin_write().unwrap();
        // From here on every backend read, write and sync fails.
        countdown.store(0, Ordering::SeqCst);
        let result = tx.commit().err().unwrap();
        assert!(matches!(result, CommitError::Storage(StorageError::Io(_))));
        let result = db.begin_write().err().unwrap();
        assert!(matches!(
            result,
            TransactionError::Storage(StorageError::PreviousIo)
        ));
        // Make the backend healthy again before the database is dropped.
        countdown.store(u64::MAX, Ordering::SeqCst);
        drop(db);

        // Inspect the byte at file offset 9: bit value 2 must be set.
        // NOTE(review): presumably this is the "recovery required" flag of the
        // header — confirm against the file format definition.
        let mut file = File::open(&path).unwrap();
        file.seek(SeekFrom::Start(9)).unwrap();
        let mut god_byte = vec![0u8];
        assert_eq!(file.read(&mut god_byte).unwrap(), 1);
        assert_ne!(god_byte[0] & 2, 0);
    }
1262
    // Smoke test: a table can be created and committed with 512-byte pages,
    // the smallest size `set_page_size` allows.
    #[test]
    fn small_pages() {
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_page_size(512)
            .create(tmpfile.path())
            .unwrap();

        let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
        let txn = db.begin_write().unwrap();
        {
            txn.open_table(table_definition).unwrap();
        }
        txn.commit().unwrap();
    }
1279
    // Regression sequence with 512-byte pages: interleaves ephemeral
    // savepoints, savepoint restores, two-phase commits, non-durable commits
    // and an abort. The exact order of operations is the test; every step is
    // expected to succeed except the restore of `savepoint4`.
    #[test]
    fn small_pages2() {
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_page_size(512)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint0 = tx.ephemeral_savepoint().unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint1 = tx.ephemeral_savepoint().unwrap();
        tx.restore_savepoint(&savepoint0).unwrap();
        tx.set_durability(Durability::None);
        {
            let mut t = tx.open_table(table_def).unwrap();
            t.insert_reserve(&660503, 489).unwrap().as_mut().fill(0xFF);
            assert!(t.remove(&291295).unwrap().is_none());
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        tx.restore_savepoint(&savepoint0).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint2 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint0);
        tx.restore_savepoint(&savepoint2).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.get(&2059).unwrap().is_none());
            assert!(t.remove(&145227).unwrap().is_none());
            assert!(t.remove(&145227).unwrap().is_none());
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint3 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint1);
        tx.restore_savepoint(&savepoint3).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        // This transaction is aborted, not committed.
        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint4 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint2);
        tx.restore_savepoint(&savepoint3).unwrap();
        tx.set_durability(Durability::None);
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.remove(&207936).unwrap().is_none());
        }
        tx.abort().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint5 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint3);
        // `savepoint4` was taken in the aborted transaction above; restoring it must fail.
        assert!(tx.restore_savepoint(&savepoint4).is_err());
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        tx.restore_savepoint(&savepoint5).unwrap();
        tx.set_durability(Durability::None);
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();
    }
1373
/// Exercises ephemeral savepoints together with aborted, non-durable writes
/// on a database created with `set_page_size(1024)`.
///
/// Two write transactions are run and both are aborted:
/// 1. take a savepoint, insert a 306-byte value, abort;
/// 2. take a savepoint, immediately restore it, insert a 2008-byte value, abort.
///
/// NOTE(review): the specific keys and sizes look like a minimized reproduction
/// of an earlier failure — confirm before altering them.
#[test]
fn small_pages3() {
    let tmpfile = crate::create_tempfile();
    let definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

    let db = Database::builder()
        .set_page_size(1024)
        .create(tmpfile.path())
        .unwrap();

    // First transaction: the savepoint is bound (not discarded) so that it
    // stays alive until the end of the test.
    let mut txn = db.begin_write().unwrap();
    let _savepoint0 = txn.ephemeral_savepoint().unwrap();
    txn.set_durability(Durability::None);
    {
        let mut table = txn.open_table(definition).unwrap();
        let payload = [0u8; 306];
        table.insert(&539717, payload.as_slice()).unwrap();
    }
    txn.abort().unwrap();

    // Second transaction: restore the savepoint that was just created in this
    // same transaction, then write a larger value and abort again.
    let mut txn = db.begin_write().unwrap();
    let savepoint1 = txn.ephemeral_savepoint().unwrap();
    txn.restore_savepoint(&savepoint1).unwrap();
    txn.set_durability(Durability::None);
    {
        let mut table = txn.open_table(definition).unwrap();
        let payload = [0u8; 2008];
        table.insert(&784384, payload.as_slice()).unwrap();
    }
    txn.abort().unwrap();
}
1406
/// Runs a commit followed by two aborted write transactions against a database
/// built with `set_cache_size(1024 * 1024)` and `set_page_size(1024)`.
///
/// The aborted transactions insert values of 1130, 3645 and 734 bytes; the
/// first two are larger than the configured page-size value.
///
/// NOTE(review): the keys and sizes look like a minimized reproduction of an
/// earlier failure — confirm before altering them.
#[test]
fn small_pages4() {
    let tmpfile = crate::create_tempfile();
    let definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

    let db = Database::builder()
        .set_cache_size(1024 * 1024)
        .set_page_size(1024)
        .create(tmpfile.path())
        .unwrap();

    // Open the table and release the handle right away, then commit.
    let txn = db.begin_write().unwrap();
    drop(txn.open_table(definition).unwrap());
    txn.commit().unwrap();

    // Mix lookups of absent keys with two inserts, then throw everything away.
    let txn = db.begin_write().unwrap();
    {
        let mut table = txn.open_table(definition).unwrap();
        assert!(table.get(&131072).unwrap().is_none());
        let payload = [0xFFu8; 1130];
        table.insert(&42394, payload.as_slice()).unwrap();
        table
            .insert_reserve(&744037, 3645)
            .unwrap()
            .as_mut()
            .fill(0xFF);
        assert!(table.get(&0).unwrap().is_none());
    }
    txn.abort().unwrap();

    // One more reserved insert in a fresh transaction, also aborted.
    let txn = db.begin_write().unwrap();
    {
        let mut table = txn.open_table(definition).unwrap();
        table
            .insert_reserve(&118749, 734)
            .unwrap()
            .as_mut()
            .fill(0xFF);
    }
    txn.abort().unwrap();
}
1443
/// Checks that the backing file gets smaller after its contents are removed.
///
/// The test fills a table with 2048 values of 1024 bytes each, records the
/// file length, removes every key, performs a few more small commits, and
/// finally asserts that the file length is strictly below the recorded one.
#[test]
fn dynamic_shrink() {
    let tmpfile = crate::create_tempfile();
    let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
    let big_value = vec![0u8; 1024];

    // Current on-disk length of the temp file, as reported by its metadata.
    let current_file_len = || tmpfile.as_file().metadata().unwrap().len();

    let db = Database::builder()
        .set_region_size(1024 * 1024)
        .create(tmpfile.path())
        .unwrap();

    // Populate the table.
    let txn = db.begin_write().unwrap();
    {
        let mut table = txn.open_table(table_definition).unwrap();
        for key in 0..2048 {
            table.insert(&key, big_value.as_slice()).unwrap();
        }
    }
    txn.commit().unwrap();

    let size_when_full = current_file_len();

    // Remove everything that was just inserted.
    let txn = db.begin_write().unwrap();
    {
        let mut table = txn.open_table(table_definition).unwrap();
        for key in 0..2048 {
            table.remove(&key).unwrap();
        }
    }
    txn.commit().unwrap();

    // Three further commits (insert one key, remove it, empty commit) happen
    // before the size is measured again — presumably to give the database a
    // chance to release the freed space; verify against the allocator.
    let txn = db.begin_write().unwrap();
    {
        let mut table = txn.open_table(table_definition).unwrap();
        table.insert(0, [].as_slice()).unwrap();
    }
    txn.commit().unwrap();

    let txn = db.begin_write().unwrap();
    {
        let mut table = txn.open_table(table_definition).unwrap();
        table.remove(0).unwrap();
    }
    txn.commit().unwrap();

    db.begin_write().unwrap().commit().unwrap();

    let size_after_cleanup = current_file_len();
    assert!(size_after_cleanup < size_when_full);
}
1494
/// Creating a database through `create_file` on the file handle of a freshly
/// made temp file must succeed.
///
/// NOTE(review): the temp file is presumably empty at this point, as the test
/// name says — `create_tempfile` is defined elsewhere.
#[test]
fn create_new_db_in_empty_file() {
    let file = crate::create_tempfile().into_file();

    // Bound (not discarded) so the database stays open until the test ends.
    let _db = Database::builder().create_file(file).unwrap();
}
1503
/// Opening a path that does not exist must fail with an I/O error of kind
/// `ErrorKind::NotFound`.
///
/// The path is the temp file's path with its extension replaced by `missing`,
/// so it is expected not to exist on disk.
#[test]
fn open_missing_file() {
    let tmpfile = crate::create_tempfile();

    let err = Database::builder()
        .open(tmpfile.path().with_extension("missing"))
        .unwrap_err();

    match err {
        DatabaseError::Storage(StorageError::Io(err)) if err.kind() == ErrorKind::NotFound => {}
        // The message names the scenario under test; it previously said
        // "empty file", which belongs to the sibling `open_empty_file` test.
        err => panic!("Unexpected error for missing file: {err}"),
    }
}
1517
/// Opening a freshly created temp file as an existing database must fail with
/// an I/O error of kind `ErrorKind::InvalidData`.
///
/// NOTE(review): the temp file is presumably empty at this point, as the test
/// name says — `create_tempfile` is defined elsewhere.
#[test]
fn open_empty_file() {
    let tmpfile = crate::create_tempfile();

    let err = Database::builder().open(tmpfile.path()).unwrap_err();

    // Any other error variant, or an I/O error of a different kind, fails the test.
    let is_invalid_data = matches!(
        &err,
        DatabaseError::Storage(StorageError::Io(io_err))
            if io_err.kind() == ErrorKind::InvalidData
    );
    assert!(is_invalid_data, "Unexpected error for empty file: {err}");
}
1530}