use crate::transaction_tracker::{TransactionId, TransactionTracker};
use crate::tree_store::{
    BtreeHeader, InternalTableDefinition, PAGE_SIZE, PageHint, PageNumber, ReadOnlyBackend,
    ShrinkPolicy, TableTree, TableType, TransactionalMemory,
};
use crate::types::{Key, Value};
use crate::{
    CompactionError, DatabaseError, Error, ReadOnlyTable, SavepointError, StorageError, TableError,
};
use crate::{ReadTransaction, Result, WriteTransaction};
use std::fmt::{Debug, Display, Formatter};

use std::fs::{File, OpenOptions};
use std::marker::PhantomData;
use std::path::Path;
use std::sync::Arc;
use std::{io, thread};

use crate::error::TransactionError;
use crate::sealed::Sealed;
use crate::transactions::{
    ALLOCATOR_STATE_TABLE_NAME, AllocatorStateKey, AllocatorStateTree, DATA_ALLOCATED_TABLE,
    DATA_FREED_TABLE, PageList, SYSTEM_FREED_TABLE, SystemTableDefinition,
    TransactionIdWithPagination,
};
use crate::tree_store::file_backend::FileBackend;
#[cfg(feature = "logging")]
use log::{debug, info, warn};

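/// Implements persistent storage for a database.
///
/// Offsets and lengths are in bytes. `len`, `read`, `write`, and `set_len` mirror the usual file
/// operations, `sync_data` must flush all buffered writes to persistent storage, and `close` is
/// an optional hook whose default implementation does nothing.
///
/// The default implementation is [`FileBackend`], which stores the database in a single file; a
/// custom backend can be supplied with [`Builder::create_with_backend`].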
#[allow(clippy::len_without_is_empty)]
pub trait StorageBackend: 'static + Debug + Send + Sync {
    fn len(&self) -> std::result::Result<u64, io::Error>;

    fn read(&self, offset: u64, out: &mut [u8]) -> std::result::Result<(), io::Error>;

    fn set_len(&self, len: u64) -> std::result::Result<(), io::Error>;

    fn sync_data(&self) -> std::result::Result<(), io::Error>;

    fn write(&self, offset: u64, data: &[u8]) -> std::result::Result<(), io::Error>;

    fn close(&self) -> std::result::Result<(), io::Error> {
        Ok(())
    }
}

pub trait TableHandle: Sealed {
    fn name(&self) -> &str;
}

#[derive(Clone)]
pub struct UntypedTableHandle {
    name: String,
}

impl UntypedTableHandle {
    pub(crate) fn new(name: String) -> Self {
        Self { name }
    }
}

impl TableHandle for UntypedTableHandle {
    fn name(&self) -> &str {
        &self.name
    }
}

impl Sealed for UntypedTableHandle {}

pub trait MultimapTableHandle: Sealed {
    fn name(&self) -> &str;
}

#[derive(Clone)]
pub struct UntypedMultimapTableHandle {
    name: String,
}

impl UntypedMultimapTableHandle {
    pub(crate) fn new(name: String) -> Self {
        Self { name }
    }
}

impl MultimapTableHandle for UntypedMultimapTableHandle {
    fn name(&self) -> &str {
        &self.name
    }
}

impl Sealed for UntypedMultimapTableHandle {}

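/// Defines the name and types of a table.
///
/// A `TableDefinition` only names a table and fixes its key and value types; it does not touch
/// storage by itself. `new` panics if the name is empty.
///
/// A minimal sketch (the table name is arbitrary):
///
/// ```ignore
/// const EXAMPLE: TableDefinition<u64, &str> = TableDefinition::new("example");
/// ```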
pub struct TableDefinition<'a, K: Key + 'static, V: Value + 'static> {
    name: &'a str,
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}

impl<'a, K: Key + 'static, V: Value + 'static> TableDefinition<'a, K, V> {
    pub const fn new(name: &'a str) -> Self {
        assert!(!name.is_empty());
        Self {
            name,
            _key_type: PhantomData,
            _value_type: PhantomData,
        }
    }
}

impl<K: Key + 'static, V: Value + 'static> TableHandle for TableDefinition<'_, K, V> {
    fn name(&self) -> &str {
        self.name
    }
}

impl<K: Key, V: Value> Sealed for TableDefinition<'_, K, V> {}

impl<K: Key + 'static, V: Value + 'static> Clone for TableDefinition<'_, K, V> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<K: Key + 'static, V: Value + 'static> Copy for TableDefinition<'_, K, V> {}

impl<K: Key + 'static, V: Value + 'static> Display for TableDefinition<'_, K, V> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{}<{}, {}>",
            self.name,
            K::type_name().name(),
            V::type_name().name()
        )
    }
}

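/// Defines the name and types of a multimap table, in which each key can map to multiple values.
///
/// Like [`TableDefinition`], this is only a typed name; `new` panics if the name is empty.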
pub struct MultimapTableDefinition<'a, K: Key + 'static, V: Key + 'static> {
    name: &'a str,
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}

impl<'a, K: Key + 'static, V: Key + 'static> MultimapTableDefinition<'a, K, V> {
    pub const fn new(name: &'a str) -> Self {
        assert!(!name.is_empty());
        Self {
            name,
            _key_type: PhantomData,
            _value_type: PhantomData,
        }
    }
}

impl<K: Key + 'static, V: Key + 'static> MultimapTableHandle for MultimapTableDefinition<'_, K, V> {
    fn name(&self) -> &str {
        self.name
    }
}

impl<K: Key, V: Key> Sealed for MultimapTableDefinition<'_, K, V> {}

impl<K: Key + 'static, V: Key + 'static> Clone for MultimapTableDefinition<'_, K, V> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<K: Key + 'static, V: Key + 'static> Copy for MultimapTableDefinition<'_, K, V> {}

impl<K: Key + 'static, V: Key + 'static> Display for MultimapTableDefinition<'_, K, V> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{}<{}, {}>",
            self.name,
            K::type_name().name(),
            V::type_name().name()
        )
    }
}

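/// Statistics about the database's in-memory page cache.
///
/// `evictions()` reports how many pages have been evicted from the cache.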
#[derive(Debug)]
pub struct CacheStats {
    pub(crate) evictions: u64,
}

impl CacheStats {
    pub fn evictions(&self) -> u64 {
        self.evictions
    }
}

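/// RAII guard that keeps a transaction registered with the [`TransactionTracker`].
///
/// Dropping the guard ends the write transaction or deallocates the read transaction it tracks.
/// `fake()` creates a guard with no tracker for internal repair and integrity-check paths, and
/// `leak()` hands the id back to the caller without deregistering it.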
pub(crate) struct TransactionGuard {
    transaction_tracker: Option<Arc<TransactionTracker>>,
    transaction_id: Option<TransactionId>,
    write_transaction: bool,
}

impl TransactionGuard {
    pub(crate) fn new_read(
        transaction_id: TransactionId,
        tracker: Arc<TransactionTracker>,
    ) -> Self {
        Self {
            transaction_tracker: Some(tracker),
            transaction_id: Some(transaction_id),
            write_transaction: false,
        }
    }

    pub(crate) fn new_write(
        transaction_id: TransactionId,
        tracker: Arc<TransactionTracker>,
    ) -> Self {
        Self {
            transaction_tracker: Some(tracker),
            transaction_id: Some(transaction_id),
            write_transaction: true,
        }
    }

    pub(crate) fn fake() -> Self {
        Self {
            transaction_tracker: None,
            transaction_id: None,
            write_transaction: false,
        }
    }

    pub(crate) fn id(&self) -> TransactionId {
        self.transaction_id.unwrap()
    }

    pub(crate) fn leak(mut self) -> TransactionId {
        self.transaction_id.take().unwrap()
    }
}

impl Drop for TransactionGuard {
    fn drop(&mut self) {
        if self.transaction_tracker.is_none() {
            return;
        }
        if let Some(transaction_id) = self.transaction_id {
            if self.write_transaction {
                self.transaction_tracker
                    .as_ref()
                    .unwrap()
                    .end_write_transaction(transaction_id);
            } else {
                self.transaction_tracker
                    .as_ref()
                    .unwrap()
                    .deallocate_read_transaction(transaction_id);
            }
        }
    }
}

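/// Read operations common to [`Database`] and [`ReadOnlyDatabase`]: beginning read transactions
/// and inspecting page-cache statistics.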
pub trait ReadableDatabase {
    fn begin_read(&self) -> Result<ReadTransaction, TransactionError>;

    fn cache_stats(&self) -> CacheStats;
}

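/// A read-only view of a database file.
///
/// Opening succeeds only if the database was shut down cleanly and left a valid allocator state
/// table behind; otherwise [`DatabaseError::RepairAborted`] is returned, since a read-only
/// handle cannot repair the file.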
pub struct ReadOnlyDatabase {
    mem: Arc<TransactionalMemory>,
    transaction_tracker: Arc<TransactionTracker>,
}

impl ReadableDatabase for ReadOnlyDatabase {
    fn begin_read(&self) -> Result<ReadTransaction, TransactionError> {
        let id = self
            .transaction_tracker
            .register_read_transaction(&self.mem)?;
        #[cfg(feature = "logging")]
        debug!("Beginning read transaction id={id:?}");

        let guard = TransactionGuard::new_read(id, self.transaction_tracker.clone());

        ReadTransaction::new(self.mem.clone(), guard)
    }

    fn cache_stats(&self) -> CacheStats {
        self.mem.cache_stats()
    }
}

impl ReadOnlyDatabase {
    pub fn open(path: impl AsRef<Path>) -> Result<ReadOnlyDatabase, DatabaseError> {
        Builder::new().open_read_only(path)
    }

    fn new(
        file: Box<dyn StorageBackend>,
        page_size: usize,
        region_size: Option<u64>,
        read_cache_size_bytes: usize,
    ) -> Result<Self, DatabaseError> {
        #[cfg(feature = "logging")]
        let file_path = format!("{:?}", &file);
        #[cfg(feature = "logging")]
        info!("Opening database in read-only {:?}", &file_path);
        let mem = TransactionalMemory::new(
            Box::new(ReadOnlyBackend::new(file)),
            false,
            page_size,
            region_size,
            read_cache_size_bytes,
            0,
            true,
        )?;
        let mem = Arc::new(mem);
        if let Some(tree) = Database::get_allocator_state_table(&mem)? {
            mem.load_allocator_state(&tree)?;
        } else {
            #[cfg(feature = "logging")]
            warn!(
                "Database {:?} not shutdown cleanly. Repair required",
                &file_path
            );
            return Err(DatabaseError::RepairAborted);
        }

        let next_transaction_id = mem.get_last_committed_transaction_id()?.next();
        let db = Self {
            mem,
            transaction_tracker: Arc::new(TransactionTracker::new(next_transaction_id)),
        };

        Ok(db)
    }
}

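/// A database opened for reading and writing.
///
/// If the database was not shut down cleanly, opening it triggers a repair pass whose progress
/// can be observed through [`Builder::set_repair_callback`].
///
/// A minimal usage sketch (the path and table name are illustrative):
///
/// ```ignore
/// const TABLE: TableDefinition<u64, &str> = TableDefinition::new("example");
///
/// let db = Database::create("example.redb")?;
/// let write_txn = db.begin_write()?;
/// {
///     let mut table = write_txn.open_table(TABLE)?;
///     table.insert(&0, "hello")?;
/// }
/// write_txn.commit()?;
///
/// let read_txn = db.begin_read()?;
/// let table = read_txn.open_table(TABLE)?;
/// assert_eq!(table.get(&0)?.unwrap().value(), "hello");
/// ```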
pub struct Database {
    mem: Arc<TransactionalMemory>,
    transaction_tracker: Arc<TransactionTracker>,
}

impl ReadableDatabase for Database {
    fn begin_read(&self) -> Result<ReadTransaction, TransactionError> {
        let guard = self.allocate_read_transaction()?;
        #[cfg(feature = "logging")]
        debug!("Beginning read transaction id={:?}", guard.id());
        ReadTransaction::new(self.get_memory(), guard)
    }

    fn cache_stats(&self) -> CacheStats {
        self.mem.cache_stats()
    }
}

impl Database {
    pub fn create(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
        Self::builder().create(path)
    }

    pub fn open(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
        Self::builder().open(path)
    }

    pub(crate) fn get_memory(&self) -> Arc<TransactionalMemory> {
        self.mem.clone()
    }

    pub(crate) fn verify_primary_checksums(mem: Arc<TransactionalMemory>) -> Result<bool> {
        let table_tree = TableTree::new(
            mem.get_data_root(),
            PageHint::None,
            Arc::new(TransactionGuard::fake()),
            mem.clone(),
        )?;
        if !table_tree.verify_checksums()? {
            return Ok(false);
        }
        let system_table_tree = TableTree::new(
            mem.get_system_root(),
            PageHint::None,
            Arc::new(TransactionGuard::fake()),
            mem.clone(),
        )?;
        if !system_table_tree.verify_checksums()? {
            return Ok(false);
        }

        Ok(true)
    }

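    /// Forces a full integrity check and repair of the database, verifying the checksums of all
    /// btrees and rebuilding the allocator state.
    ///
    /// Returns `Ok(true)` if the database was already clean, and `Ok(false)` if a repair had to
    /// be performed.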
    pub fn check_integrity(&mut self) -> Result<bool, DatabaseError> {
        let allocator_hash = self.mem.allocator_hash();
        let mut was_clean = Arc::get_mut(&mut self.mem)
            .unwrap()
            .clear_cache_and_reload()?;

        let old_roots = [self.mem.get_data_root(), self.mem.get_system_root()];

        let new_roots = Self::do_repair(&mut self.mem, &|_| {}).map_err(|err| match err {
            DatabaseError::Storage(storage_err) => storage_err,
            _ => unreachable!(),
        })?;

        if old_roots != new_roots || allocator_hash != self.mem.allocator_hash() {
            was_clean = false;
        }

        if !was_clean {
            let next_transaction_id = self.mem.get_last_committed_transaction_id()?.next();
            let [data_root, system_root] = new_roots;
            self.mem.commit(
                data_root,
                system_root,
                next_transaction_id,
                true,
                ShrinkPolicy::Never,
            )?;
        }

        self.mem.begin_writable()?;

        Ok(was_clean)
    }

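    /// Compacts the database file and releases unused space back to the filesystem.
    ///
    /// Fails if any read transaction, ephemeral savepoint, or persistent savepoint is
    /// outstanding. Returns `Ok(true)` if compaction made progress and `Ok(false)` if the file
    /// was already compact.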
    pub fn compact(&mut self) -> Result<bool, CompactionError> {
        if self
            .transaction_tracker
            .oldest_live_read_transaction()
            .is_some()
        {
            return Err(CompactionError::TransactionInProgress);
        }
        let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        if txn.list_persistent_savepoints()?.next().is_some() {
            return Err(CompactionError::PersistentSavepointExists);
        }
        if self.transaction_tracker.any_savepoint_exists() {
            return Err(CompactionError::EphemeralSavepointExists);
        }
        txn.set_two_phase_commit(true);
        txn.commit().map_err(|e| e.into_storage_error())?;
        let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        txn.set_two_phase_commit(true);
        txn.commit().map_err(|e| e.into_storage_error())?;
        let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        assert!(!txn.pending_free_pages()?);
        txn.abort()?;

        let mut compacted = false;
        loop {
            let mut progress = false;

            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            if txn.compact_pages()? {
                progress = true;
                txn.commit().map_err(|e| e.into_storage_error())?;
            } else {
                txn.abort()?;
            }

            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            txn.set_two_phase_commit(true);
            txn.set_shrink_policy(ShrinkPolicy::Maximum);
            txn.commit().map_err(|e| e.into_storage_error())?;
            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            txn.set_two_phase_commit(true);
            txn.set_shrink_policy(ShrinkPolicy::Maximum);
            txn.commit().map_err(|e| e.into_storage_error())?;
            let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            assert!(!txn.pending_free_pages()?);
            txn.abort()?;

            if !progress {
                break;
            }

            compacted = true;
        }

        Ok(compacted)
    }

    #[cfg_attr(not(debug_assertions), expect(dead_code))]
    fn check_repaired_allocated_pages_table(
        system_root: Option<BtreeHeader>,
        mem: Arc<TransactionalMemory>,
    ) -> Result {
        let table_tree = TableTree::new(
            system_root,
            PageHint::None,
            Arc::new(TransactionGuard::fake()),
            mem.clone(),
        )?;
        if let Some(table_def) = table_tree
            .get_table::<TransactionIdWithPagination, PageList>(
                DATA_ALLOCATED_TABLE.name(),
                TableType::Normal,
            )
            .map_err(|e| e.into_storage_error_or_corrupted("Allocated pages table corrupted"))?
        {
            let InternalTableDefinition::Normal { table_root, .. } = table_def else {
                unreachable!()
            };
            let table: ReadOnlyTable<TransactionIdWithPagination, PageList> = ReadOnlyTable::new(
                DATA_ALLOCATED_TABLE.name().to_string(),
                table_root,
                PageHint::None,
                Arc::new(TransactionGuard::fake()),
                mem.clone(),
            )?;
            for result in table.range::<TransactionIdWithPagination>(..)? {
                let (_, pages) = result?;
                for i in 0..pages.value().len() {
                    assert!(mem.is_allocated(pages.value().get(i)));
                }
            }
        }

        Ok(())
    }

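    /// Visits every page referenced from the given freed-page system table (if the table exists)
    /// and passes it to `visitor`. Used while repairing to re-mark pages that are still owned by
    /// the freed trees, and in debug builds to cross-check the allocator state.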
    fn visit_freed_tree<K: Key, V: Value, F>(
        system_root: Option<BtreeHeader>,
        table_def: SystemTableDefinition<K, V>,
        mem: Arc<TransactionalMemory>,
        mut visitor: F,
    ) -> Result
    where
        F: FnMut(PageNumber) -> Result,
    {
        let fake_guard = Arc::new(TransactionGuard::fake());
        let system_tree = TableTree::new(system_root, PageHint::None, fake_guard, mem.clone())?;
        let table_name = table_def.name();
        let result = match system_tree.get_table::<K, V>(table_name, TableType::Normal) {
            Ok(result) => result,
            Err(TableError::Storage(err)) => {
                return Err(err);
            }
            Err(TableError::TableDoesNotExist(_)) => {
                return Ok(());
            }
            Err(_) => {
                return Err(StorageError::Corrupted(format!(
                    "Unable to open {table_name}"
                )));
            }
        };

        if let Some(definition) = result {
            let table_root = match definition {
                InternalTableDefinition::Normal { table_root, .. } => table_root,
                InternalTableDefinition::Multimap { .. } => unreachable!(),
            };
            let table: ReadOnlyTable<TransactionIdWithPagination, PageList<'static>> =
                ReadOnlyTable::new(
                    table_name.to_string(),
                    table_root,
                    PageHint::None,
                    Arc::new(TransactionGuard::fake()),
                    mem.clone(),
                )?;
            for result in table.range::<TransactionIdWithPagination>(..)? {
                let (_, page_list) = result?;
                for i in 0..page_list.value().len() {
                    visitor(page_list.value().get(i))?;
                }
            }
        }

        Ok(())
    }

    #[cfg(debug_assertions)]
    fn mark_allocated_page_for_debug(mem: &mut Arc<TransactionalMemory>) -> Result {
        let data_root = mem.get_data_root();
        {
            let fake = Arc::new(TransactionGuard::fake());
            let tables = TableTree::new(data_root, PageHint::None, fake, mem.clone())?;
            tables.visit_all_pages(|path| {
                mem.mark_debug_allocated_page(path.page_number());
                Ok(())
            })?;
        }

        let system_root = mem.get_system_root();
        {
            let fake = Arc::new(TransactionGuard::fake());
            let system_tables = TableTree::new(system_root, PageHint::None, fake, mem.clone())?;
            system_tables.visit_all_pages(|path| {
                mem.mark_debug_allocated_page(path.page_number());
                Ok(())
            })?;
        }

        Self::visit_freed_tree(system_root, DATA_FREED_TABLE, mem.clone(), |page| {
            mem.mark_debug_allocated_page(page);
            Ok(())
        })?;
        Self::visit_freed_tree(system_root, SYSTEM_FREED_TABLE, mem.clone(), |page| {
            mem.mark_debug_allocated_page(page);
            Ok(())
        })?;

        Ok(())
    }

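    /// Rebuilds the allocator state by walking every reachable page in the data tree, the system
    /// tree, and the freed-page tables, repairing the primary commit first if its checksums do
    /// not verify. Returns the data and system roots that the caller should commit.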
    fn do_repair(
        mem: &mut Arc<TransactionalMemory>,
        repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
    ) -> Result<[Option<BtreeHeader>; 2], DatabaseError> {
        if !Self::verify_primary_checksums(mem.clone())? {
            if mem.used_two_phase_commit() {
                return Err(DatabaseError::Storage(StorageError::Corrupted(
                    "Primary is corrupted despite 2-phase commit".to_string(),
                )));
            }

            let mut handle = RepairSession::new(0.3);
            repair_callback(&mut handle);
            if handle.aborted() {
                return Err(DatabaseError::RepairAborted);
            }

            mem.repair_primary_corrupted();
            mem.clear_read_cache();
            if !Self::verify_primary_checksums(mem.clone())? {
                return Err(DatabaseError::Storage(StorageError::Corrupted(
                    "Failed to repair database. All roots are corrupted".to_string(),
                )));
            }
        }
        let mut handle = RepairSession::new(0.6);
        repair_callback(&mut handle);
        if handle.aborted() {
            return Err(DatabaseError::RepairAborted);
        }

        mem.begin_repair()?;

        let data_root = mem.get_data_root();
        {
            let fake = Arc::new(TransactionGuard::fake());
            let tables = TableTree::new(data_root, PageHint::None, fake, mem.clone())?;
            tables.visit_all_pages(|path| {
                mem.mark_page_allocated(path.page_number());
                Ok(())
            })?;
        }

        let mut handle = RepairSession::new(0.9);
        repair_callback(&mut handle);
        if handle.aborted() {
            return Err(DatabaseError::RepairAborted);
        }

        let system_root = mem.get_system_root();
        {
            let fake = Arc::new(TransactionGuard::fake());
            let system_tables = TableTree::new(system_root, PageHint::None, fake, mem.clone())?;
            system_tables.visit_all_pages(|path| {
                mem.mark_page_allocated(path.page_number());
                Ok(())
            })?;
        }

        Self::visit_freed_tree(system_root, DATA_FREED_TABLE, mem.clone(), |page| {
            mem.mark_page_allocated(page);
            Ok(())
        })?;
        Self::visit_freed_tree(system_root, SYSTEM_FREED_TABLE, mem.clone(), |page| {
            mem.mark_page_allocated(page);
            Ok(())
        })?;
        #[cfg(debug_assertions)]
        {
            Self::check_repaired_allocated_pages_table(system_root, mem.clone())?;
        }

        mem.end_repair()?;

        mem.clear_read_cache();

        Ok([data_root, system_root])
    }

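    /// Opens a database on the given backend (initializing an empty file when `allow_initialize`
    /// is set), loading the saved allocator state if the previous shutdown was clean and running
    /// a full repair otherwise, then restores the persistent-savepoint bookkeeping.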
    #[allow(clippy::too_many_arguments)]
    fn new(
        file: Box<dyn StorageBackend>,
        allow_initialize: bool,
        page_size: usize,
        region_size: Option<u64>,
        read_cache_size_bytes: usize,
        write_cache_size_bytes: usize,
        repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
    ) -> Result<Self, DatabaseError> {
        #[cfg(feature = "logging")]
        let file_path = format!("{:?}", &file);
        #[cfg(feature = "logging")]
        info!("Opening database {:?}", &file_path);
        let mem = TransactionalMemory::new(
            file,
            allow_initialize,
            page_size,
            region_size,
            read_cache_size_bytes,
            write_cache_size_bytes,
            false,
        )?;
        let mut mem = Arc::new(mem);
        if let Some(tree) = Self::get_allocator_state_table(&mem)? {
            #[cfg(feature = "logging")]
            info!("Found valid allocator state, full repair not needed");
            mem.load_allocator_state(&tree)?;
            #[cfg(debug_assertions)]
            Self::mark_allocated_page_for_debug(&mut mem)?;
        } else {
            #[cfg(feature = "logging")]
            warn!("Database {:?} not shutdown cleanly. Repairing", &file_path);
            let mut handle = RepairSession::new(0.0);
            repair_callback(&mut handle);
            if handle.aborted() {
                return Err(DatabaseError::RepairAborted);
            }
            let [data_root, system_root] = Self::do_repair(&mut mem, repair_callback)?;
            let next_transaction_id = mem.get_last_committed_transaction_id()?.next();
            mem.commit(
                data_root,
                system_root,
                next_transaction_id,
                true,
                ShrinkPolicy::Never,
            )?;
        }

        mem.begin_writable()?;
        let next_transaction_id = mem.get_last_committed_transaction_id()?.next();

        let db = Database {
            mem,
            transaction_tracker: Arc::new(TransactionTracker::new(next_transaction_id)),
        };

        let txn = db.begin_write().map_err(|e| e.into_storage_error())?;
        if let Some(next_id) = txn.next_persistent_savepoint_id()? {
            db.transaction_tracker
                .restore_savepoint_counter_state(next_id);
        }
        for id in txn.list_persistent_savepoints()? {
            let savepoint = match txn.get_persistent_savepoint(id) {
                Ok(savepoint) => savepoint,
                Err(err) => match err {
                    SavepointError::InvalidSavepoint => unreachable!(),
                    SavepointError::Storage(storage) => {
                        return Err(storage.into());
                    }
                },
            };
            db.transaction_tracker
                .register_persistent_savepoint(&savepoint);
        }
        txn.abort()?;

        Ok(db)
    }

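    /// Returns the persisted allocator state table, or `None` when the database was not shut
    /// down with a two-phase commit or the saved state is no longer valid; in that case a full
    /// repair is required before the allocator can be used.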
    fn get_allocator_state_table(
        mem: &Arc<TransactionalMemory>,
    ) -> Result<Option<AllocatorStateTree>> {
        if !mem.used_two_phase_commit() {
            return Ok(None);
        }

        let system_table_tree = TableTree::new(
            mem.get_system_root(),
            PageHint::None,
            Arc::new(TransactionGuard::fake()),
            mem.clone(),
        )?;
        let Some(allocator_state_table) = system_table_tree
            .get_table::<AllocatorStateKey, &[u8]>(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal)
            .map_err(|e| e.into_storage_error_or_corrupted("Unexpected TableError"))?
        else {
            return Ok(None);
        };

        let InternalTableDefinition::Normal { table_root, .. } = allocator_state_table else {
            unreachable!();
        };
        let tree = AllocatorStateTree::new(
            table_root,
            PageHint::None,
            Arc::new(TransactionGuard::fake()),
            mem.clone(),
        )?;

        if !mem.is_valid_allocator_state(&tree)? {
            return Ok(None);
        }

        Ok(Some(tree))
    }

    fn allocate_read_transaction(&self) -> Result<TransactionGuard> {
        let id = self
            .transaction_tracker
            .register_read_transaction(&self.mem)?;

        Ok(TransactionGuard::new_read(
            id,
            self.transaction_tracker.clone(),
        ))
    }

    pub fn builder() -> Builder {
        Builder::new()
    }

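    /// Begins a write transaction.
    ///
    /// Only one write transaction may be in progress at a time, so this call blocks until any
    /// previous write transaction has completed. Returns an error if a previous commit failed
    /// with an I/O error.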
    pub fn begin_write(&self) -> Result<WriteTransaction, TransactionError> {
        self.mem.check_io_errors()?;
        let guard = TransactionGuard::new_write(
            self.transaction_tracker.start_write_transaction(),
            self.transaction_tracker.clone(),
        );
        WriteTransaction::new(guard, self.transaction_tracker.clone(), self.mem.clone())
            .map_err(|e| e.into())
    }

    fn ensure_allocator_state_table_and_trim(&self) -> Result<(), Error> {
        #[cfg(feature = "logging")]
        debug!("Writing allocator state table");
        let mut tx = self.begin_write()?;
        tx.set_quick_repair(true);
        tx.set_shrink_policy(ShrinkPolicy::Maximum);
        tx.commit()?;

        Ok(())
    }
}

impl Drop for Database {
    fn drop(&mut self) {
        if !thread::panicking() && self.ensure_allocator_state_table_and_trim().is_err() {
            #[cfg(feature = "logging")]
            warn!("Failed to write allocator state table. Repair may be required at restart.");
        }

        if self.mem.close().is_err() {
            #[cfg(feature = "logging")]
            warn!("Failed to flush database file. Repair may be required at restart.");
        }
    }
}

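/// Handle passed to the repair callback registered with [`Builder::set_repair_callback`].
///
/// `progress()` reports the approximate fraction of the repair that has completed (0.0 to 1.0),
/// and `abort()` requests cancellation, which makes opening the database fail with
/// [`DatabaseError::RepairAborted`].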
pub struct RepairSession {
    progress: f64,
    aborted: bool,
}

impl RepairSession {
    pub(crate) fn new(progress: f64) -> Self {
        Self {
            progress,
            aborted: false,
        }
    }

    pub(crate) fn aborted(&self) -> bool {
        self.aborted
    }

    pub fn abort(&mut self) {
        self.aborted = true;
    }

    pub fn progress(&self) -> f64 {
        self.progress
    }
}

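/// Configures and opens [`Database`] and [`ReadOnlyDatabase`] instances.
///
/// By default the cache size is 1 GiB, split 90/10 between the read cache and the write cache.
///
/// A minimal sketch (the path is illustrative):
///
/// ```ignore
/// let db = Database::builder()
///     .set_cache_size(64 * 1024 * 1024)
///     .create("example.redb")?;
/// ```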
pub struct Builder {
    page_size: usize,
    region_size: Option<u64>,
    read_cache_size_bytes: usize,
    write_cache_size_bytes: usize,
    repair_callback: Box<dyn Fn(&mut RepairSession)>,
}

impl Builder {
    #[allow(clippy::new_without_default)]
    pub fn new() -> Self {
        let mut result = Self {
            page_size: PAGE_SIZE,
            region_size: None,
            read_cache_size_bytes: 0,
            write_cache_size_bytes: 0,
            repair_callback: Box::new(|_| {}),
        };

        result.set_cache_size(1024 * 1024 * 1024);
        result
    }

    pub fn set_repair_callback(
        &mut self,
        callback: impl Fn(&mut RepairSession) + 'static,
    ) -> &mut Self {
        self.repair_callback = Box::new(callback);
        self
    }

    #[cfg(any(fuzzing, test))]
    pub fn set_page_size(&mut self, size: usize) -> &mut Self {
        assert!(size.is_power_of_two());
        self.page_size = std::cmp::max(size, 512);
        self
    }

    pub fn set_cache_size(&mut self, bytes: usize) -> &mut Self {
        self.read_cache_size_bytes = bytes / 10 * 9;
        self.write_cache_size_bytes = bytes / 10;
        self
    }

    #[cfg(any(test, fuzzing))]
    pub fn set_region_size(&mut self, size: u64) -> &mut Self {
        assert!(size.is_power_of_two());
        self.region_size = Some(size);
        self
    }

    pub fn create(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(path)?;

        Database::new(
            Box::new(FileBackend::new(file)?),
            true,
            self.page_size,
            self.region_size,
            self.read_cache_size_bytes,
            self.write_cache_size_bytes,
            &self.repair_callback,
        )
    }

    pub fn open(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
        let file = OpenOptions::new().read(true).write(true).open(path)?;

        Database::new(
            Box::new(FileBackend::new(file)?),
            false,
            self.page_size,
            None,
            self.read_cache_size_bytes,
            self.write_cache_size_bytes,
            &self.repair_callback,
        )
    }

    pub fn open_read_only(
        &self,
        path: impl AsRef<Path>,
    ) -> Result<ReadOnlyDatabase, DatabaseError> {
        let file = OpenOptions::new().read(true).open(path)?;

        ReadOnlyDatabase::new(
            Box::new(FileBackend::new_internal(file, true)?),
            self.page_size,
            None,
            self.read_cache_size_bytes,
        )
    }

    pub fn create_file(&self, file: File) -> Result<Database, DatabaseError> {
        Database::new(
            Box::new(FileBackend::new(file)?),
            true,
            self.page_size,
            self.region_size,
            self.read_cache_size_bytes,
            self.write_cache_size_bytes,
            &self.repair_callback,
        )
    }

    pub fn create_with_backend(
        &self,
        backend: impl StorageBackend,
    ) -> Result<Database, DatabaseError> {
        Database::new(
            Box::new(backend),
            true,
            self.page_size,
            self.region_size,
            self.read_cache_size_bytes,
            self.write_cache_size_bytes,
            &self.repair_callback,
        )
    }
}

impl std::fmt::Debug for Database {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Database").finish()
    }
}

#[cfg(test)]
mod test {
    use crate::backends::FileBackend;
    use crate::{
        CommitError, Database, DatabaseError, Durability, ReadableTable, StorageBackend,
        StorageError, TableDefinition, TransactionError,
    };
    use std::fs::File;
    use std::io::{ErrorKind, Read, Seek, SeekFrom};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};

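    /// Test backend that wraps a [`FileBackend`] and starts failing I/O after a countdown of
    /// writes is exhausted: every write decrements the counter, and once it reaches zero all
    /// reads, syncs, and writes return errors.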
    #[derive(Debug)]
    struct FailingBackend {
        inner: FileBackend,
        countdown: Arc<AtomicU64>,
    }

    impl FailingBackend {
        fn new(backend: FileBackend, countdown: u64) -> Self {
            Self {
                inner: backend,
                countdown: Arc::new(AtomicU64::new(countdown)),
            }
        }

        fn check_countdown(&self) -> Result<(), std::io::Error> {
            if self.countdown.load(Ordering::SeqCst) == 0 {
                return Err(std::io::Error::from(ErrorKind::Other));
            }

            Ok(())
        }

        fn decrement_countdown(&self) -> Result<(), std::io::Error> {
            if self
                .countdown
                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
                    if x > 0 { Some(x - 1) } else { None }
                })
                .is_err()
            {
                return Err(std::io::Error::from(ErrorKind::Other));
            }

            Ok(())
        }
    }

    impl StorageBackend for FailingBackend {
        fn len(&self) -> Result<u64, std::io::Error> {
            self.inner.len()
        }

        fn read(&self, offset: u64, out: &mut [u8]) -> Result<(), std::io::Error> {
            self.check_countdown()?;
            self.inner.read(offset, out)
        }

        fn set_len(&self, len: u64) -> Result<(), std::io::Error> {
            self.inner.set_len(len)
        }

        fn sync_data(&self) -> Result<(), std::io::Error> {
            self.check_countdown()?;
            self.inner.sync_data()
        }

        fn write(&self, offset: u64, data: &[u8]) -> Result<(), std::io::Error> {
            self.decrement_countdown()?;
            self.inner.write(offset, data)
        }
    }

    #[test]
    fn crash_regression4() {
        let tmpfile = crate::create_tempfile();
        let (file, path) = tmpfile.into_parts();

        let backend = FailingBackend::new(FileBackend::new(file).unwrap(), 20);
        let db = Database::builder()
            .set_cache_size(12686)
            .set_page_size(8 * 1024)
            .set_region_size(32 * 4096)
            .create_with_backend(backend)
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let tx = db.begin_write().unwrap();
        let _savepoint = tx.ephemeral_savepoint().unwrap();
        let _persistent_savepoint = tx.persistent_savepoint().unwrap();
        tx.commit().unwrap();
        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            let _ = table.insert_reserve(118821, 360).unwrap();
        }
        let result = tx.commit();
        assert!(result.is_err());

        drop(db);
        Database::builder()
            .set_cache_size(1024 * 1024)
            .set_page_size(8 * 1024)
            .set_region_size(32 * 4096)
            .create(&path)
            .unwrap();
    }

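    /// After an I/O error during commit, further write transactions must fail with
    /// `StorageError::PreviousIo` until the database is reopened. The final read of the header
    /// byte checks that a repair-needed flag is left set (interpretation of the tested bit).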
    #[test]
    fn transient_io_error() {
        let tmpfile = crate::create_tempfile();
        let (file, path) = tmpfile.into_parts();

        let backend = FailingBackend::new(FileBackend::new(file).unwrap(), u64::MAX);
        let countdown = backend.countdown.clone();
        let db = Database::builder()
            .set_cache_size(0)
            .create_with_backend(backend)
            .unwrap();

        let table_def: TableDefinition<u64, u64> = TableDefinition::new("x");

        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            table.insert(0, 0).unwrap();
        }
        tx.commit().unwrap();
        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            table.insert(0, 1).unwrap();
        }
        tx.commit().unwrap();

        let tx = db.begin_write().unwrap();
        countdown.store(0, Ordering::SeqCst);
        let result = tx.commit().err().unwrap();
        assert!(matches!(result, CommitError::Storage(StorageError::Io(_))));
        let result = db.begin_write().err().unwrap();
        assert!(matches!(
            result,
            TransactionError::Storage(StorageError::PreviousIo)
        ));
        countdown.store(u64::MAX, Ordering::SeqCst);
        drop(db);

        let mut file = File::open(&path).unwrap();
        file.seek(SeekFrom::Start(9)).unwrap();
        let mut god_byte = vec![0u8];
        assert_eq!(file.read(&mut god_byte).unwrap(), 1);
        assert_ne!(god_byte[0] & 2, 0);
    }

    #[test]
    fn small_pages() {
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_page_size(512)
            .create(tmpfile.path())
            .unwrap();

        let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
        let txn = db.begin_write().unwrap();
        {
            txn.open_table(table_definition).unwrap();
        }
        txn.commit().unwrap();
    }

    #[test]
    fn small_pages2() {
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_page_size(512)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint0 = tx.ephemeral_savepoint().unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint1 = tx.ephemeral_savepoint().unwrap();
        tx.restore_savepoint(&savepoint0).unwrap();
        tx.set_durability(Durability::None).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            t.insert_reserve(&660503, 489).unwrap().as_mut().fill(0xFF);
            assert!(t.remove(&291295).unwrap().is_none());
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        tx.restore_savepoint(&savepoint0).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint2 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint0);
        tx.restore_savepoint(&savepoint2).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.get(&2059).unwrap().is_none());
            assert!(t.remove(&145227).unwrap().is_none());
            assert!(t.remove(&145227).unwrap().is_none());
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint3 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint1);
        tx.restore_savepoint(&savepoint3).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint4 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint2);
        tx.restore_savepoint(&savepoint3).unwrap();
        tx.set_durability(Durability::None).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.remove(&207936).unwrap().is_none());
        }
        tx.abort().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint5 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint3);
        assert!(tx.restore_savepoint(&savepoint4).is_err());
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        tx.restore_savepoint(&savepoint5).unwrap();
        tx.set_durability(Durability::None).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();
    }

    #[test]
    fn small_pages3() {
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_page_size(1024)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let mut tx = db.begin_write().unwrap();
        let _savepoint0 = tx.ephemeral_savepoint().unwrap();
        tx.set_durability(Durability::None).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            let value = vec![0; 306];
            t.insert(&539717, value.as_slice()).unwrap();
        }
        tx.abort().unwrap();

        let mut tx = db.begin_write().unwrap();
        let savepoint1 = tx.ephemeral_savepoint().unwrap();
        tx.restore_savepoint(&savepoint1).unwrap();
        tx.set_durability(Durability::None).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            let value = vec![0; 2008];
            t.insert(&784384, value.as_slice()).unwrap();
        }
        tx.abort().unwrap();
    }

    #[test]
    fn small_pages4() {
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_cache_size(1024 * 1024)
            .set_page_size(1024)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let tx = db.begin_write().unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let tx = db.begin_write().unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.get(&131072).unwrap().is_none());
            let value = vec![0xFF; 1130];
            t.insert(&42394, value.as_slice()).unwrap();
            t.insert_reserve(&744037, 3645).unwrap().as_mut().fill(0xFF);
            assert!(t.get(&0).unwrap().is_none());
        }
        tx.abort().unwrap();

        let tx = db.begin_write().unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            t.insert_reserve(&118749, 734).unwrap().as_mut().fill(0xFF);
        }
        tx.abort().unwrap();
    }

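    /// Filling a table with roughly 2 MiB of values and then deleting them should let the
    /// database shrink the file below its peak size after a few follow-up commits.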
    #[test]
    fn dynamic_shrink() {
        let tmpfile = crate::create_tempfile();
        let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
        let big_value = vec![0u8; 1024];

        let db = Database::builder()
            .set_region_size(1024 * 1024)
            .create(tmpfile.path())
            .unwrap();

        let txn = db.begin_write().unwrap();
        {
            let mut table = txn.open_table(table_definition).unwrap();
            for i in 0..2048 {
                table.insert(&i, big_value.as_slice()).unwrap();
            }
        }
        txn.commit().unwrap();

        let file_size = tmpfile.as_file().metadata().unwrap().len();

        let txn = db.begin_write().unwrap();
        {
            let mut table = txn.open_table(table_definition).unwrap();
            for i in 0..2048 {
                table.remove(&i).unwrap();
            }
        }
        txn.commit().unwrap();

        let txn = db.begin_write().unwrap();
        {
            let mut table = txn.open_table(table_definition).unwrap();
            table.insert(0, [].as_slice()).unwrap();
        }
        txn.commit().unwrap();
        let txn = db.begin_write().unwrap();
        {
            let mut table = txn.open_table(table_definition).unwrap();
            table.remove(0).unwrap();
        }
        txn.commit().unwrap();
        let txn = db.begin_write().unwrap();
        txn.commit().unwrap();

        let final_file_size = tmpfile.as_file().metadata().unwrap().len();
        assert!(final_file_size < file_size);
    }

    #[test]
    fn create_new_db_in_empty_file() {
        let tmpfile = crate::create_tempfile();

        let _db = Database::builder()
            .create_file(tmpfile.into_file())
            .unwrap();
    }

    #[test]
    fn open_missing_file() {
        let tmpfile = crate::create_tempfile();

        let err = Database::builder()
            .open(tmpfile.path().with_extension("missing"))
            .unwrap_err();

        match err {
            DatabaseError::Storage(StorageError::Io(err)) if err.kind() == ErrorKind::NotFound => {}
            err => panic!("Unexpected error for empty file: {err}"),
        }
    }

    #[test]
    fn open_empty_file() {
        let tmpfile = crate::create_tempfile();

        let err = Database::builder().open(tmpfile.path()).unwrap_err();

        match err {
            DatabaseError::Storage(StorageError::Io(err))
                if err.kind() == ErrorKind::InvalidData => {}
            err => panic!("Unexpected error for empty file: {err}"),
        }
    }
}