1use crate::db::TransactionGuard;
2use crate::error::CommitError;
3use crate::multimap_table::ReadOnlyUntypedMultimapTable;
4use crate::sealed::Sealed;
5use crate::table::ReadOnlyUntypedTable;
6use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker};
7use crate::tree_store::{
8 Btree, BtreeHeader, BtreeMut, InternalTableDefinition, MAX_PAIR_LENGTH, MAX_VALUE_LENGTH, Page,
9 PageHint, PageListMut, PageNumber, PageTrackerPolicy, SerializedSavepoint, ShrinkPolicy,
10 TableTree, TableTreeMut, TableType, TransactionalMemory,
11};
12use crate::types::{Key, Value};
13use crate::{
14 AccessGuard, AccessGuardMutInPlace, ExtractIf, MultimapTable, MultimapTableDefinition,
15 MultimapTableHandle, MutInPlaceValue, Range, ReadOnlyMultimapTable, ReadOnlyTable, Result,
16 Savepoint, SavepointError, SetDurabilityError, StorageError, Table, TableDefinition,
17 TableError, TableHandle, TransactionError, TypeName, UntypedMultimapTableHandle,
18 UntypedTableHandle,
19};
20#[cfg(feature = "logging")]
21use log::{debug, warn};
22use std::borrow::Borrow;
23use std::cmp::min;
24use std::collections::{BTreeMap, HashMap, HashSet};
25use std::fmt::{Debug, Display, Formatter};
26use std::marker::PhantomData;
27use std::mem::size_of;
28use std::ops::RangeBounds;
29use std::sync::atomic::{AtomicBool, Ordering};
30use std::sync::{Arc, Mutex};
31use std::{panic, thread};
32
// Page-count cap for compaction, judging by the name; the use site is outside this chunk.
// TODO confirm exact semantics.
const MAX_PAGES_PER_COMPACTION: usize = 1_000_000;
/// System table with a single `()` key holding a [`SavepointId`].
/// `WriteTransaction::persistent_savepoint` stores the successor of the id it just allocated.
const NEXT_SAVEPOINT_TABLE: SystemTableDefinition<(), SavepointId> =
    SystemTableDefinition::new("next_savepoint_id");
/// System table mapping a persistent savepoint's id to its serialized form.
pub(crate) const SAVEPOINT_TABLE: SystemTableDefinition<SavepointId, SerializedSavepoint> =
    SystemTableDefinition::new("persistent_savepoints");
/// System table of page lists keyed by (transaction id, pagination id).
/// `WriteTransaction::restore_savepoint` reads it to find pages allocated by later transactions.
pub(crate) const DATA_ALLOCATED_TABLE: SystemTableDefinition<
    TransactionIdWithPagination,
    PageList,
> = SystemTableDefinition::new("data_pages_allocated");
/// System table of page lists keyed by (transaction id, pagination id), stored under the name
/// `data_pages_unreachable`.
pub(crate) const DATA_FREED_TABLE: SystemTableDefinition<TransactionIdWithPagination, PageList> =
    SystemTableDefinition::new("data_pages_unreachable");
/// System table of page lists keyed by (transaction id, pagination id), stored under the name
/// `system_pages_unreachable`.
pub(crate) const SYSTEM_FREED_TABLE: SystemTableDefinition<TransactionIdWithPagination, PageList> =
    SystemTableDefinition::new("system_pages_unreachable");
/// Name of the allocator state table (its readers/writers are outside this chunk).
pub(crate) const ALLOCATOR_STATE_TABLE_NAME: &str = "allocator_state";
/// Read-only B-tree keyed by [`AllocatorStateKey`] with raw byte-slice values.
pub(crate) type AllocatorStateTree = Btree<AllocatorStateKey, &'static [u8]>;
/// Mutable counterpart of [`AllocatorStateTree`].
pub(crate) type AllocatorStateTreeMut<'a> = BtreeMut<'a, AllocatorStateKey, &'static [u8]>;
/// Mutable B-tree with the same key/value types as [`SYSTEM_FREED_TABLE`].
pub(crate) type SystemFreedTree<'a> = BtreeMut<'a, TransactionIdWithPagination, PageList<'static>>;
58
/// Borrowed view over a serialized list of page numbers.
///
/// As read by `len` and `get`: a little-endian `u16` element count followed by fixed-width
/// serialized `PageNumber` entries.
#[derive(Debug)]
pub(crate) struct PageList<'a> {
    // Raw serialized bytes; not validated on construction.
    data: &'a [u8],
}
66
67impl PageList<'_> {
68 fn required_bytes(len: usize) -> usize {
69 2 + PageNumber::serialized_size() * len
70 }
71
72 pub(crate) fn len(&self) -> usize {
73 u16::from_le_bytes(self.data[..size_of::<u16>()].try_into().unwrap()).into()
74 }
75
76 pub(crate) fn get(&self, index: usize) -> PageNumber {
77 let start = size_of::<u16>() + PageNumber::serialized_size() * index;
78 PageNumber::from_le_bytes(
79 self.data[start..(start + PageNumber::serialized_size())]
80 .try_into()
81 .unwrap(),
82 )
83 }
84}
85
/// Storage encoding for [`PageList`]: the value *is* its serialized bytes, so both conversions
/// are zero-copy.
impl Value for PageList<'_> {
    type SelfType<'a>
        = PageList<'a>
    where
        Self: 'a;
    type AsBytes<'a>
        = &'a [u8]
    where
        Self: 'a;

    /// Variable-width: the length depends on the number of entries.
    fn fixed_width() -> Option<usize> {
        None
    }

    /// Wraps `data` without copying or validating it.
    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
    where
        Self: 'a,
    {
        PageList { data }
    }

    /// Returns the backing slice unchanged.
    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> &'b [u8]
    where
        Self: 'b,
    {
        value.data
    }

    fn type_name() -> TypeName {
        TypeName::internal("redb::PageList")
    }
}
118
/// In-place mutation support: a reserved value buffer can be viewed as a `PageListMut`.
impl MutInPlaceValue for PageList<'_> {
    type BaseRefType = PageListMut;

    /// Zeroes the first 8 bytes of a reserved buffer, which clears the `u16` count prefix that
    /// `PageList::len` reads.
    ///
    /// # Panics
    /// If `data` is shorter than 8 bytes.
    fn initialize(data: &mut [u8]) {
        // NOTE(review): the count prefix is only 2 bytes (see `required_bytes`), yet 8 bytes are
        // required and cleared here — confirm callers never reserve fewer than 8 bytes.
        assert!(data.len() >= 8);
        data[..8].fill(0);
    }

    /// Reinterprets the byte buffer as a `PageListMut`.
    fn from_bytes_mut(data: &mut [u8]) -> &mut Self::BaseRefType {
        // SAFETY (NOTE(review)): sound only if `PageListMut` is an unsized type with the same
        // layout as `[u8]` (e.g. a `#[repr(transparent)]` wrapper). Its definition is outside
        // this chunk — confirm.
        unsafe { &mut *(std::ptr::from_mut::<[u8]>(data) as *mut PageListMut) }
    }
}
132
/// Composite key: a transaction id plus a pagination id.
///
/// Serialized as two little-endian `u64`s and ordered by `transaction_id`, then
/// `pagination_id` (see the `Value` and `Key` impls).
#[derive(Debug)]
pub(crate) struct TransactionIdWithPagination {
    pub(crate) transaction_id: u64,
    // Secondary ordering component. Presumably distinguishes several entries belonging to the
    // same transaction — TODO confirm against the writers of these tables.
    pub(crate) pagination_id: u64,
}
138
139impl Value for TransactionIdWithPagination {
140 type SelfType<'a>
141 = TransactionIdWithPagination
142 where
143 Self: 'a;
144 type AsBytes<'a>
145 = [u8; 2 * size_of::<u64>()]
146 where
147 Self: 'a;
148
149 fn fixed_width() -> Option<usize> {
150 Some(2 * size_of::<u64>())
151 }
152
153 fn from_bytes<'a>(data: &'a [u8]) -> Self
154 where
155 Self: 'a,
156 {
157 let transaction_id = u64::from_le_bytes(data[..size_of::<u64>()].try_into().unwrap());
158 let pagination_id = u64::from_le_bytes(data[size_of::<u64>()..].try_into().unwrap());
159 Self {
160 transaction_id,
161 pagination_id,
162 }
163 }
164
165 fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> [u8; 2 * size_of::<u64>()]
166 where
167 Self: 'b,
168 {
169 let mut result = [0u8; 2 * size_of::<u64>()];
170 result[..size_of::<u64>()].copy_from_slice(&value.transaction_id.to_le_bytes());
171 result[size_of::<u64>()..].copy_from_slice(&value.pagination_id.to_le_bytes());
172 result
173 }
174
175 fn type_name() -> TypeName {
176 TypeName::internal("redb::TransactionIdWithPagination")
177 }
178}
179
180impl Key for TransactionIdWithPagination {
181 fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering {
182 let value1 = Self::from_bytes(data1);
183 let value2 = Self::from_bytes(data2);
184
185 match value1.transaction_id.cmp(&value2.transaction_id) {
186 std::cmp::Ordering::Greater => std::cmp::Ordering::Greater,
187 std::cmp::Ordering::Equal => value1.pagination_id.cmp(&value2.pagination_id),
188 std::cmp::Ordering::Less => std::cmp::Ordering::Less,
189 }
190 }
191}
192
/// Key type of the allocator state tree.
///
/// The derived `Ord` follows declaration order and `Key::compare` relies on it, so reordering
/// variants changes the on-disk sort order.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)]
pub(crate) enum AllocatorStateKey {
    // Decoded from tag bytes 0..=2 and encoded as tag 0 (see the `Value` impl).
    Deprecated,
    // Tag 3, followed by the region index as a little-endian u32.
    Region(u32),
    // Tag 4.
    RegionTracker,
    // Tag 5.
    TransactionId,
}
200
201impl Value for AllocatorStateKey {
202 type SelfType<'a> = Self;
203 type AsBytes<'a> = [u8; 1 + size_of::<u32>()];
204
205 fn fixed_width() -> Option<usize> {
206 Some(1 + size_of::<u32>())
207 }
208
209 fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
210 where
211 Self: 'a,
212 {
213 match data[0] {
214 0..=2 => Self::Deprecated,
216 3 => Self::Region(u32::from_le_bytes(data[1..].try_into().unwrap())),
217 4 => Self::RegionTracker,
218 5 => Self::TransactionId,
219 _ => unreachable!(),
220 }
221 }
222
223 fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
224 where
225 Self: 'a,
226 Self: 'b,
227 {
228 let mut result = Self::AsBytes::default();
229 match value {
230 Self::Region(region) => {
231 result[0] = 3;
232 result[1..].copy_from_slice(&u32::to_le_bytes(*region));
233 }
234 Self::RegionTracker => {
235 result[0] = 4;
236 }
237 Self::TransactionId => {
238 result[0] = 5;
239 }
240 AllocatorStateKey::Deprecated => {
241 result[0] = 0;
242 }
243 }
244
245 result
246 }
247
248 fn type_name() -> TypeName {
249 TypeName::internal("redb::AllocatorStateKey")
250 }
251}
252
253impl Key for AllocatorStateKey {
254 fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering {
255 Self::from_bytes(data1).cmp(&Self::from_bytes(data2))
256 }
257}
258
/// Typed, named handle for a table opened through `SystemNamespace::open_system_table`.
///
/// `K` and `V` exist only at the type level; the struct stores nothing but the name.
pub struct SystemTableDefinition<'a, K: Key + 'static, V: Value + 'static> {
    name: &'a str,
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}
264
impl<'a, K: Key + 'static, V: Value + 'static> SystemTableDefinition<'a, K, V> {
    /// Creates a definition for the table called `name`.
    ///
    /// # Panics
    /// If `name` is empty. When evaluated in a `const` context (as the constants in this file
    /// are) the failure is reported at compile time.
    pub const fn new(name: &'a str) -> Self {
        assert!(!name.is_empty());
        Self {
            name,
            _key_type: PhantomData,
            _value_type: PhantomData,
        }
    }
}
275
impl<K: Key + 'static, V: Value + 'static> TableHandle for SystemTableDefinition<'_, K, V> {
    /// The table name passed to `new`.
    fn name(&self) -> &str {
        self.name
    }
}

impl<K: Key, V: Value> Sealed for SystemTableDefinition<'_, K, V> {}

// `Clone`/`Copy` are written by hand rather than derived: a derive would add `K: Clone` and
// `V: Clone` bounds, which the `PhantomData` fields do not need.
impl<K: Key + 'static, V: Value + 'static> Clone for SystemTableDefinition<'_, K, V> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<K: Key + 'static, V: Value + 'static> Copy for SystemTableDefinition<'_, K, V> {}
291
292impl<K: Key + 'static, V: Value + 'static> Display for SystemTableDefinition<'_, K, V> {
293 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
294 write!(
295 f,
296 "{}<{}, {}>",
297 self.name,
298 K::type_name().name(),
299 V::type_name().name()
300 )
301 }
302}
303
/// Snapshot of storage statistics, exposed read-only through the accessor methods.
///
/// The fields are populated elsewhere in the crate; how each counter is computed is not visible
/// in this chunk.
#[derive(Debug)]
pub struct DatabaseStats {
    pub(crate) tree_height: u32,
    pub(crate) allocated_pages: u64,
    pub(crate) leaf_pages: u64,
    pub(crate) branch_pages: u64,
    // Exposed publicly as `stored_bytes()`.
    pub(crate) stored_leaf_bytes: u64,
    pub(crate) metadata_bytes: u64,
    pub(crate) fragmented_bytes: u64,
    pub(crate) page_size: usize,
}
316
impl DatabaseStats {
    /// Recorded tree height.
    pub fn tree_height(&self) -> u32 {
        self.tree_height
    }

    /// Recorded number of allocated pages.
    pub fn allocated_pages(&self) -> u64 {
        self.allocated_pages
    }

    /// Recorded number of leaf pages.
    pub fn leaf_pages(&self) -> u64 {
        self.leaf_pages
    }

    /// Recorded number of branch pages.
    pub fn branch_pages(&self) -> u64 {
        self.branch_pages
    }

    /// Recorded number of bytes stored in leaf pages (the `stored_leaf_bytes` counter).
    pub fn stored_bytes(&self) -> u64 {
        self.stored_leaf_bytes
    }

    /// Recorded number of metadata bytes.
    pub fn metadata_bytes(&self) -> u64 {
        self.metadata_bytes
    }

    /// Recorded number of fragmented bytes.
    pub fn fragmented_bytes(&self) -> u64 {
        self.fragmented_bytes
    }

    /// Recorded page size, in bytes.
    pub fn page_size(&self) -> usize {
        self.page_size
    }
}
359
/// Durability level requested for a write transaction via `WriteTransaction::set_durability`.
///
/// A transaction that has created or deleted a persistent savepoint only accepts `Immediate`.
#[derive(Copy, Clone, Debug)]
#[non_exhaustive]
pub enum Durability {
    // NOTE(review): presumably the commit is not guaranteed to be persisted on its own; the
    // commit path is outside this chunk — confirm before documenting publicly.
    None,
    // Default for new transactions (see `WriteTransaction::new`).
    Immediate,
}
370
/// Crate-private mirror of [`Durability`]. Unlike the public enum it implements `PartialEq`
/// and is not `#[non_exhaustive]`, so it can be compared with `==`/`!=` internally.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum InternalDurability {
    None,
    Immediate,
}
378
/// An open system table.
///
/// Holds an exclusive borrow of its `SystemNamespace` for as long as it lives; on drop it
/// stages its current root back into the namespace (see the `Drop` impl).
pub struct SystemTable<'db, 's, K: Key + 'static, V: Value + 'static> {
    name: String,
    namespace: &'s mut SystemNamespace<'db>,
    tree: BtreeMut<'s, K, V>,
    // Cloned into the `Range` iterators returned by `range()`.
    transaction_guard: Arc<TransactionGuard>,
}
386
387impl<'db, 's, K: Key + 'static, V: Value + 'static> SystemTable<'db, 's, K, V> {
388 fn new(
389 name: &str,
390 table_root: Option<BtreeHeader>,
391 freed_pages: Arc<Mutex<Vec<PageNumber>>>,
392 guard: Arc<TransactionGuard>,
393 mem: Arc<TransactionalMemory>,
394 namespace: &'s mut SystemNamespace<'db>,
395 ) -> SystemTable<'db, 's, K, V> {
396 let ignore = Arc::new(Mutex::new(PageTrackerPolicy::Ignore));
399 SystemTable {
400 name: name.to_string(),
401 namespace,
402 tree: BtreeMut::new(table_root, guard.clone(), mem, freed_pages, ignore),
403 transaction_guard: guard,
404 }
405 }
406
407 fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<Option<AccessGuard<'_, V>>>
408 where
409 K: 'a,
410 {
411 self.tree.get(key.borrow())
412 }
413
414 fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<Range<'_, K, V>>
415 where
416 K: 'a,
417 KR: Borrow<K::SelfType<'a>> + 'a,
418 {
419 self.tree
420 .range(&range)
421 .map(|x| Range::new(x, self.transaction_guard.clone()))
422 }
423
424 pub fn extract_from_if<'a, KR, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool>(
425 &mut self,
426 range: impl RangeBounds<KR> + 'a,
427 predicate: F,
428 ) -> Result<ExtractIf<'_, K, V, F>>
429 where
430 KR: Borrow<K::SelfType<'a>> + 'a,
431 {
432 self.tree
433 .extract_from_if(&range, predicate)
434 .map(ExtractIf::new)
435 }
436
437 pub fn insert<'k, 'v>(
438 &mut self,
439 key: impl Borrow<K::SelfType<'k>>,
440 value: impl Borrow<V::SelfType<'v>>,
441 ) -> Result<Option<AccessGuard<'_, V>>> {
442 let value_len = V::as_bytes(value.borrow()).as_ref().len();
443 if value_len > MAX_VALUE_LENGTH {
444 return Err(StorageError::ValueTooLarge(value_len));
445 }
446 let key_len = K::as_bytes(key.borrow()).as_ref().len();
447 if key_len > MAX_VALUE_LENGTH {
448 return Err(StorageError::ValueTooLarge(key_len));
449 }
450 if value_len + key_len > MAX_PAIR_LENGTH {
451 return Err(StorageError::ValueTooLarge(value_len + key_len));
452 }
453 self.tree.insert(key.borrow(), value.borrow())
454 }
455
456 pub fn remove<'a>(
457 &mut self,
458 key: impl Borrow<K::SelfType<'a>>,
459 ) -> Result<Option<AccessGuard<'_, V>>>
460 where
461 K: 'a,
462 {
463 self.tree.remove(key.borrow())
464 }
465}
466
467impl<K: Key + 'static, V: MutInPlaceValue + 'static> SystemTable<'_, '_, K, V> {
468 pub fn insert_reserve<'a>(
469 &mut self,
470 key: impl Borrow<K::SelfType<'a>>,
471 value_length: usize,
472 ) -> Result<AccessGuardMutInPlace<'_, V>> {
473 if value_length > MAX_VALUE_LENGTH {
474 return Err(StorageError::ValueTooLarge(value_length));
475 }
476 let key_len = K::as_bytes(key.borrow()).as_ref().len();
477 if key_len > MAX_VALUE_LENGTH {
478 return Err(StorageError::ValueTooLarge(key_len));
479 }
480 if value_length + key_len > MAX_PAIR_LENGTH {
481 return Err(StorageError::ValueTooLarge(value_length + key_len));
482 }
483 self.tree.insert_reserve(key.borrow(), value_length)
484 }
485}
486
487impl<K: Key + 'static, V: Value + 'static> Drop for SystemTable<'_, '_, K, V> {
488 fn drop(&mut self) {
489 self.namespace.close_table(
490 &self.name,
491 &self.tree,
492 self.tree.get_root().map(|x| x.length).unwrap_or_default(),
493 );
494 }
495}
496
/// Per-transaction state for the system table tree.
struct SystemNamespace<'db> {
    table_tree: TableTreeMut<'db>,
    // Shared with `table_tree` and with every `SystemTable` opened from this namespace.
    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
    transaction_guard: Arc<TransactionGuard>,
}
502
503impl<'db> SystemNamespace<'db> {
504 fn new(
505 root_page: Option<BtreeHeader>,
506 guard: Arc<TransactionGuard>,
507 mem: Arc<TransactionalMemory>,
508 ) -> Self {
509 let ignore = Arc::new(Mutex::new(PageTrackerPolicy::Ignore));
512 let freed_pages = Arc::new(Mutex::new(vec![]));
513 Self {
514 table_tree: TableTreeMut::new(
515 root_page,
516 guard.clone(),
517 mem,
518 freed_pages.clone(),
519 ignore,
520 ),
521 freed_pages,
522 transaction_guard: guard.clone(),
523 }
524 }
525
526 fn system_freed_pages(&self) -> Arc<Mutex<Vec<PageNumber>>> {
527 self.freed_pages.clone()
528 }
529
530 fn open_system_table<'txn, 's, K: Key + 'static, V: Value + 'static>(
531 &'s mut self,
532 transaction: &'txn WriteTransaction,
533 definition: SystemTableDefinition<K, V>,
534 ) -> Result<SystemTable<'db, 's, K, V>> {
535 let (root, _) = self
536 .table_tree
537 .get_or_create_table::<K, V>(definition.name(), TableType::Normal)
538 .map_err(|e| {
539 e.into_storage_error_or_corrupted("Internal error. System table is corrupted")
540 })?;
541 transaction.dirty.store(true, Ordering::Release);
542
543 Ok(SystemTable::new(
544 definition.name(),
545 root,
546 self.freed_pages.clone(),
547 self.transaction_guard.clone(),
548 transaction.mem.clone(),
549 self,
550 ))
551 }
552
553 fn close_table<K: Key + 'static, V: Value + 'static>(
554 &mut self,
555 name: &str,
556 table: &BtreeMut<K, V>,
557 length: u64,
558 ) {
559 self.table_tree
560 .stage_update_table_root(name, table.get_root(), length);
561 }
562}
563
/// Per-transaction state for the user (data) table tree.
struct TableNamespace<'db> {
    // Name of every currently open table, mapped to the source location that opened it
    // (reported in `TableError::TableAlreadyOpen`).
    open_tables: HashMap<String, &'static panic::Location<'static>>,
    // Allocation tracking policy, shared with `table_tree` and every table opened from here.
    allocated_pages: Arc<Mutex<PageTrackerPolicy>>,
    // Freed-page list, shared with `table_tree` and every table opened from here.
    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
    table_tree: TableTreeMut<'db>,
}
570
571impl TableNamespace<'_> {
572 fn new(
573 root_page: Option<BtreeHeader>,
574 guard: Arc<TransactionGuard>,
575 mem: Arc<TransactionalMemory>,
576 ) -> Self {
577 let allocated = Arc::new(Mutex::new(PageTrackerPolicy::new_tracking()));
578 let freed_pages = Arc::new(Mutex::new(vec![]));
579 let table_tree = TableTreeMut::new(
580 root_page,
581 guard,
582 mem,
583 freed_pages.clone(),
586 allocated.clone(),
587 );
588 Self {
589 open_tables: Default::default(),
590 table_tree,
591 freed_pages,
592 allocated_pages: allocated,
593 }
594 }
595
596 fn set_dirty(&mut self, transaction: &WriteTransaction) {
597 transaction.dirty.store(true, Ordering::Release);
598 if !transaction.transaction_tracker.any_savepoint_exists() {
599 *self.allocated_pages.lock().unwrap() = PageTrackerPolicy::Ignore;
602 }
603 }
604
605 fn set_root(&mut self, root: Option<BtreeHeader>) {
606 assert!(self.open_tables.is_empty());
607 self.table_tree.set_root(root);
608 }
609
610 #[track_caller]
611 fn inner_open<K: Key + 'static, V: Value + 'static>(
612 &mut self,
613 name: &str,
614 table_type: TableType,
615 ) -> Result<(Option<BtreeHeader>, u64), TableError> {
616 if let Some(location) = self.open_tables.get(name) {
617 return Err(TableError::TableAlreadyOpen(name.to_string(), location));
618 }
619
620 let root = self
621 .table_tree
622 .get_or_create_table::<K, V>(name, table_type)?;
623 self.open_tables
624 .insert(name.to_string(), panic::Location::caller());
625
626 Ok(root)
627 }
628
629 #[track_caller]
630 pub fn open_multimap_table<'txn, K: Key + 'static, V: Key + 'static>(
631 &mut self,
632 transaction: &'txn WriteTransaction,
633 definition: MultimapTableDefinition<K, V>,
634 ) -> Result<MultimapTable<'txn, K, V>, TableError> {
635 #[cfg(feature = "logging")]
636 debug!("Opening multimap table: {definition}");
637 let (root, length) = self.inner_open::<K, V>(definition.name(), TableType::Multimap)?;
638 self.set_dirty(transaction);
639
640 Ok(MultimapTable::new(
641 definition.name(),
642 root,
643 length,
644 self.freed_pages.clone(),
645 self.allocated_pages.clone(),
646 transaction.mem.clone(),
647 transaction,
648 ))
649 }
650
651 #[track_caller]
652 pub fn open_table<'txn, K: Key + 'static, V: Value + 'static>(
653 &mut self,
654 transaction: &'txn WriteTransaction,
655 definition: TableDefinition<K, V>,
656 ) -> Result<Table<'txn, K, V>, TableError> {
657 #[cfg(feature = "logging")]
658 debug!("Opening table: {definition}");
659 let (root, _) = self.inner_open::<K, V>(definition.name(), TableType::Normal)?;
660 self.set_dirty(transaction);
661
662 Ok(Table::new(
663 definition.name(),
664 root,
665 self.freed_pages.clone(),
666 self.allocated_pages.clone(),
667 transaction.mem.clone(),
668 transaction,
669 ))
670 }
671
672 #[track_caller]
673 fn inner_rename(
674 &mut self,
675 name: &str,
676 new_name: &str,
677 table_type: TableType,
678 ) -> Result<(), TableError> {
679 if let Some(location) = self.open_tables.get(name) {
680 return Err(TableError::TableAlreadyOpen(name.to_string(), location));
681 }
682
683 self.table_tree.rename_table(name, new_name, table_type)
684 }
685
686 #[track_caller]
687 fn rename_table(
688 &mut self,
689 transaction: &WriteTransaction,
690 name: &str,
691 new_name: &str,
692 ) -> Result<(), TableError> {
693 #[cfg(feature = "logging")]
694 debug!("Renaming table: {name} to {new_name}");
695 self.set_dirty(transaction);
696 self.inner_rename(name, new_name, TableType::Normal)
697 }
698
699 #[track_caller]
700 fn rename_multimap_table(
701 &mut self,
702 transaction: &WriteTransaction,
703 name: &str,
704 new_name: &str,
705 ) -> Result<(), TableError> {
706 #[cfg(feature = "logging")]
707 debug!("Renaming multimap table: {name} to {new_name}");
708 self.set_dirty(transaction);
709 self.inner_rename(name, new_name, TableType::Multimap)
710 }
711
712 #[track_caller]
713 fn inner_delete(&mut self, name: &str, table_type: TableType) -> Result<bool, TableError> {
714 if let Some(location) = self.open_tables.get(name) {
715 return Err(TableError::TableAlreadyOpen(name.to_string(), location));
716 }
717
718 self.table_tree.delete_table(name, table_type)
719 }
720
721 #[track_caller]
722 fn delete_table(
723 &mut self,
724 transaction: &WriteTransaction,
725 name: &str,
726 ) -> Result<bool, TableError> {
727 #[cfg(feature = "logging")]
728 debug!("Deleting table: {name}");
729 self.set_dirty(transaction);
730 self.inner_delete(name, TableType::Normal)
731 }
732
733 #[track_caller]
734 fn delete_multimap_table(
735 &mut self,
736 transaction: &WriteTransaction,
737 name: &str,
738 ) -> Result<bool, TableError> {
739 #[cfg(feature = "logging")]
740 debug!("Deleting multimap table: {name}");
741 self.set_dirty(transaction);
742 self.inner_delete(name, TableType::Multimap)
743 }
744
745 pub(crate) fn close_table<K: Key + 'static, V: Value + 'static>(
746 &mut self,
747 name: &str,
748 table: &BtreeMut<K, V>,
749 length: u64,
750 ) {
751 self.open_tables.remove(name).unwrap();
752 self.table_tree
753 .stage_update_table_root(name, table.get_root(), length);
754 }
755}
756
/// A read/write transaction.
///
/// Tables are opened through `&self`; the table namespaces therefore live behind mutexes and
/// the dirty flag is atomic.
pub struct WriteTransaction {
    transaction_tracker: Arc<TransactionTracker>,
    mem: Arc<TransactionalMemory>,
    transaction_guard: Arc<TransactionGuard>,
    transaction_id: TransactionId,
    // User (data) tables.
    tables: Mutex<TableNamespace<'static>>,
    // Internal system tables (savepoints, freed/allocated page lists, ...).
    system_tables: Mutex<SystemNamespace<'static>>,
    // Initialised to `false`; presumably set once the transaction commits or aborts — the
    // writers are outside this chunk.
    completed: bool,
    // Set whenever a table (user or system) is opened, renamed or deleted, or a savepoint is
    // restored. A dirty transaction refuses to create savepoints.
    dirty: AtomicBool,
    durability: InternalDurability,
    two_phase_commit: bool,
    shrink_policy: ShrinkPolicy,
    quick_repair: bool,
    // Ids of persistent savepoints created in this transaction.
    created_persistent_savepoints: Mutex<HashSet<SavepointId>>,
    // (savepoint id, savepoint's transaction id) of persistent savepoints deleted in this
    // transaction.
    deleted_persistent_savepoints: Mutex<Vec<(SavepointId, TransactionId)>>,
}
777
778impl WriteTransaction {
779 pub(crate) fn new(
780 guard: TransactionGuard,
781 transaction_tracker: Arc<TransactionTracker>,
782 mem: Arc<TransactionalMemory>,
783 ) -> Result<Self> {
784 let transaction_id = guard.id();
785 let guard = Arc::new(guard);
786
787 let root_page = mem.get_data_root();
788 let system_page = mem.get_system_root();
789
790 let tables = TableNamespace::new(root_page, guard.clone(), mem.clone());
791 let system_tables = SystemNamespace::new(system_page, guard.clone(), mem.clone());
792
793 Ok(Self {
794 transaction_tracker,
795 mem: mem.clone(),
796 transaction_guard: guard.clone(),
797 transaction_id,
798 tables: Mutex::new(tables),
799 system_tables: Mutex::new(system_tables),
800 completed: false,
801 dirty: AtomicBool::new(false),
802 durability: InternalDurability::Immediate,
803 two_phase_commit: false,
804 quick_repair: false,
805 shrink_policy: ShrinkPolicy::Default,
806 created_persistent_savepoints: Mutex::new(Default::default()),
807 deleted_persistent_savepoints: Mutex::new(vec![]),
808 })
809 }
810
    /// Overrides the shrink policy recorded for this transaction (initially
    /// `ShrinkPolicy::Default`).
    pub(crate) fn set_shrink_policy(&mut self, shrink_policy: ShrinkPolicy) {
        self.shrink_policy = shrink_policy;
    }
814
815 pub(crate) fn pending_free_pages(&self) -> Result<bool> {
816 let mut system_tables = self.system_tables.lock().unwrap();
817 if system_tables
818 .open_system_table(self, DATA_FREED_TABLE)?
819 .tree
820 .get_root()
821 .is_some()
822 {
823 return Ok(true);
824 }
825 if system_tables
826 .open_system_table(self, SYSTEM_FREED_TABLE)?
827 .tree
828 .get_root()
829 .is_some()
830 {
831 return Ok(true);
832 }
833
834 Ok(false)
835 }
836
    /// Debug-only audit of page ownership, printed to stdout.
    ///
    /// Starts from the set of all allocated pages and removes every page accounted for by the
    /// data table tree, the system table tree, the two on-disk freed tables and the two
    /// in-memory freed lists. Whatever remains is printed under "Leaked pages".
    ///
    /// Opening the freed tables marks the transaction dirty as a side effect.
    ///
    /// # Panics
    /// If a page is claimed by one of the sources but is missing from the allocated set (not
    /// allocated at all, or already claimed by an earlier source), or if any underlying
    /// storage operation fails.
    #[cfg(debug_assertions)]
    pub fn print_allocated_page_debug(&self) {
        let mut all_allocated: HashSet<PageNumber> =
            HashSet::from_iter(self.mem.all_allocated_pages());

        self.mem.debug_check_allocator_consistency();

        // Pages reachable from the user table tree.
        let mut table_pages = vec![];
        self.tables
            .lock()
            .unwrap()
            .table_tree
            .visit_all_pages(|path| {
                table_pages.push(path.page_number());
                Ok(())
            })
            .unwrap();
        println!("Tables");
        for p in table_pages {
            assert!(all_allocated.remove(&p));
            println!("{p:?}");
        }

        // Pages reachable from the system table tree.
        let mut system_table_pages = vec![];
        self.system_tables
            .lock()
            .unwrap()
            .table_tree
            .visit_all_pages(|path| {
                system_table_pages.push(path.page_number());
                Ok(())
            })
            .unwrap();
        println!("System tables");
        for p in system_table_pages {
            assert!(all_allocated.remove(&p));
            println!("{p:?}");
        }

        // Each of the following scopes takes and releases its own lock(s).
        {
            println!("Pending free (in data freed table)");
            let mut system_tables = self.system_tables.lock().unwrap();
            let data_freed = system_tables
                .open_system_table(self, DATA_FREED_TABLE)
                .unwrap();
            for entry in data_freed.range::<TransactionIdWithPagination>(..).unwrap() {
                let (_, entry) = entry.unwrap();
                let value = entry.value();
                for i in 0..value.len() {
                    let p = value.get(i);
                    assert!(all_allocated.remove(&p));
                    println!("{p:?}");
                }
            }
        }
        {
            println!("Pending free (in system freed table)");
            let mut system_tables = self.system_tables.lock().unwrap();
            let system_freed = system_tables
                .open_system_table(self, SYSTEM_FREED_TABLE)
                .unwrap();
            for entry in system_freed
                .range::<TransactionIdWithPagination>(..)
                .unwrap()
            {
                let (_, entry) = entry.unwrap();
                let value = entry.value();
                for i in 0..value.len() {
                    let p = value.get(i);
                    assert!(all_allocated.remove(&p));
                    println!("{p:?}");
                }
            }
        }
        {
            // Pages freed by this transaction's data tables but not yet written to a table.
            let tables = self.tables.lock().unwrap();
            let pages = tables.freed_pages.lock().unwrap();
            if !pages.is_empty() {
                println!("Pages in in-memory data freed_pages");
                for p in pages.iter() {
                    println!("{p:?}");
                    assert!(all_allocated.remove(p));
                }
            }
        }
        {
            // Same, for the system tables.
            let system_tables = self.system_tables.lock().unwrap();
            let pages = system_tables.freed_pages.lock().unwrap();
            if !pages.is_empty() {
                println!("Pages in in-memory system freed_pages");
                for p in pages.iter() {
                    println!("{p:?}");
                    assert!(all_allocated.remove(p));
                }
            }
        }
        // Anything still in the set is allocated but owned by none of the sources above.
        if !all_allocated.is_empty() {
            println!("Leaked pages");
            for p in all_allocated {
                println!("{p:?}");
            }
        }
    }
940
941 pub fn persistent_savepoint(&self) -> Result<u64, SavepointError> {
950 if self.durability != InternalDurability::Immediate {
951 return Err(SavepointError::InvalidSavepoint);
952 }
953
954 let mut savepoint = self.ephemeral_savepoint()?;
955
956 let mut system_tables = self.system_tables.lock().unwrap();
957
958 let mut next_table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?;
959 next_table.insert((), savepoint.get_id().next())?;
960 drop(next_table);
961
962 let mut savepoint_table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
963 savepoint_table.insert(
964 savepoint.get_id(),
965 SerializedSavepoint::from_savepoint(&savepoint),
966 )?;
967
968 savepoint.set_persistent();
969
970 self.created_persistent_savepoints
971 .lock()
972 .unwrap()
973 .insert(savepoint.get_id());
974
975 Ok(savepoint.get_id().0)
976 }
977
978 pub(crate) fn transaction_guard(&self) -> Arc<TransactionGuard> {
979 self.transaction_guard.clone()
980 }
981
982 pub(crate) fn next_persistent_savepoint_id(&self) -> Result<Option<SavepointId>> {
983 let mut system_tables = self.system_tables.lock().unwrap();
984 let next_table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?;
985 let value = next_table.get(())?;
986 if let Some(next_id) = value {
987 Ok(Some(next_id.value()))
988 } else {
989 Ok(None)
990 }
991 }
992
993 pub fn get_persistent_savepoint(&self, id: u64) -> Result<Savepoint, SavepointError> {
995 let mut system_tables = self.system_tables.lock().unwrap();
996 let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
997 let value = table.get(SavepointId(id))?;
998
999 value
1000 .map(|x| x.value().to_savepoint(self.transaction_tracker.clone()))
1001 .ok_or(SavepointError::InvalidSavepoint)
1002 }
1003
1004 pub fn delete_persistent_savepoint(&self, id: u64) -> Result<bool, SavepointError> {
1011 if self.durability != InternalDurability::Immediate {
1012 return Err(SavepointError::InvalidSavepoint);
1013 }
1014 let mut system_tables = self.system_tables.lock().unwrap();
1015 let mut table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
1016 let savepoint = table.remove(SavepointId(id))?;
1017 if let Some(serialized) = savepoint {
1018 let savepoint = serialized
1019 .value()
1020 .to_savepoint(self.transaction_tracker.clone());
1021 self.deleted_persistent_savepoints
1022 .lock()
1023 .unwrap()
1024 .push((savepoint.get_id(), savepoint.get_transaction_id()));
1025 Ok(true)
1026 } else {
1027 Ok(false)
1028 }
1029 }
1030
1031 pub fn list_persistent_savepoints(&self) -> Result<impl Iterator<Item = u64>> {
1033 let mut system_tables = self.system_tables.lock().unwrap();
1034 let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
1035 let mut savepoints = vec![];
1036 for savepoint in table.range::<SavepointId>(..)? {
1037 savepoints.push(savepoint?.0.value().0);
1038 }
1039 Ok(savepoints.into_iter())
1040 }
1041
1042 fn allocate_read_transaction(&self) -> Result<TransactionGuard> {
1044 let id = self
1045 .transaction_tracker
1046 .register_read_transaction(&self.mem)?;
1047
1048 Ok(TransactionGuard::new_read(
1049 id,
1050 self.transaction_tracker.clone(),
1051 ))
1052 }
1053
    /// Allocates a new savepoint id bound to a freshly registered read transaction.
    ///
    /// Returns the savepoint id together with that read transaction's id.
    fn allocate_savepoint(&self) -> Result<(SavepointId, TransactionId)> {
        // NOTE(review): `leak()` presumably consumes the guard without deregistering the read
        // transaction, so it stays registered for the savepoint's lifetime — its definition is
        // outside this chunk; confirm.
        let transaction_id = self.allocate_read_transaction()?.leak();
        let id = self.transaction_tracker.allocate_savepoint(transaction_id);
        Ok((id, transaction_id))
    }
1059
1060 pub fn ephemeral_savepoint(&self) -> Result<Savepoint, SavepointError> {
1066 if self.dirty.load(Ordering::Acquire) {
1067 return Err(SavepointError::InvalidSavepoint);
1068 }
1069
1070 let (id, transaction_id) = self.allocate_savepoint()?;
1071 #[cfg(feature = "logging")]
1072 debug!("Creating savepoint id={id:?}, txn_id={transaction_id:?}");
1073
1074 let root = self.mem.get_data_root();
1075 let savepoint = Savepoint::new_ephemeral(
1076 &self.mem,
1077 self.transaction_tracker.clone(),
1078 id,
1079 transaction_id,
1080 root,
1081 );
1082
1083 Ok(savepoint)
1084 }
1085
    /// Rolls the user table tree back to the state captured by `savepoint`.
    ///
    /// Steps, in order:
    /// 1. Reset the data table root to the savepoint's root.
    /// 2. Remove every data-freed-table entry whose transaction id is later than the
    ///    savepoint's transaction.
    /// 3. Queue every page recorded in the data-allocated table for those later transactions
    ///    onto the in-memory data freed-page list.
    /// 4. Invalidate newer savepoints in the tracker and delete persistent savepoints with a
    ///    larger id.
    ///
    /// The transaction is marked dirty before any of this happens.
    ///
    /// # Errors
    /// `SavepointError::InvalidSavepoint` if the tracker does not consider the savepoint valid,
    /// or (via `delete_persistent_savepoint`) if a newer persistent savepoint must be deleted
    /// while durability is not `Immediate`. Storage errors are propagated.
    ///
    /// # Panics
    /// If the savepoint's `db_address()` differs from this transaction's tracker address, if
    /// the savepoint's version differs from `mem`'s version, or if any user table is still
    /// open (asserted by `TableNamespace::set_root`).
    pub fn restore_savepoint(&mut self, savepoint: &Savepoint) -> Result<(), SavepointError> {
        // The savepoint must have been created against the same transaction tracker.
        assert_eq!(
            std::ptr::from_ref(self.transaction_tracker.as_ref()),
            savepoint.db_address()
        );

        if !self
            .transaction_tracker
            .is_valid_savepoint(savepoint.get_id())
        {
            return Err(SavepointError::InvalidSavepoint);
        }
        #[cfg(feature = "logging")]
        debug!(
            "Beginning savepoint restore (id={:?}) in transaction id={:?}",
            savepoint.get_id(),
            self.transaction_id
        );
        // Refuse to restore across a version change.
        assert_eq!(self.mem.get_version(), savepoint.get_version());
        self.dirty.store(true, Ordering::Release);

        // 1) Restore the table tree root. The lock is scoped so it is released before the
        //    system tables are locked below.
        {
            self.tables
                .lock()
                .unwrap()
                .set_root(savepoint.get_user_root());
        }

        // First transaction id after the savepoint; lower bound for the scans below.
        let txn_id = savepoint.get_transaction_id().next().raw_id();
        // 2) Drain all data-freed entries at or after that transaction. The extraction
        //    iterator is driven to completion and each item is checked for errors.
        {
            let lower = TransactionIdWithPagination {
                transaction_id: txn_id,
                pagination_id: 0,
            };
            let mut system_tables = self.system_tables.lock().unwrap();
            let mut data_freed = system_tables.open_system_table(self, DATA_FREED_TABLE)?;
            for entry in data_freed.extract_from_if(lower.., |_, _| true)? {
                entry?;
            }
        }

        // 3) Queue the pages allocated at or after that transaction for freeing.
        //    Lock order here: tables, then its freed-page list, then system tables.
        {
            let tables = self.tables.lock().unwrap();
            let mut data_freed_pages = tables.freed_pages.lock().unwrap();
            let mut system_tables = self.system_tables.lock().unwrap();
            let data_allocated = system_tables.open_system_table(self, DATA_ALLOCATED_TABLE)?;
            let lower = TransactionIdWithPagination {
                transaction_id: txn_id,
                pagination_id: 0,
            };
            for entry in data_allocated.range(lower..)? {
                let (_, value) = entry?;
                for i in 0..value.value().len() {
                    data_freed_pages.push(value.value().get(i));
                }
            }
        }

        // 4) Newer savepoints are no longer valid once this one has been restored.
        self.transaction_tracker
            .invalidate_savepoints_after(savepoint.get_id());
        for persistent_savepoint in self.list_persistent_savepoints()? {
            if persistent_savepoint > savepoint.get_id().0 {
                self.delete_persistent_savepoint(persistent_savepoint)?;
            }
        }

        Ok(())
    }
1174
1175 pub fn set_durability(&mut self, durability: Durability) -> Result<(), SetDurabilityError> {
1181 let created = !self
1182 .created_persistent_savepoints
1183 .lock()
1184 .unwrap()
1185 .is_empty();
1186 let deleted = !self
1187 .deleted_persistent_savepoints
1188 .lock()
1189 .unwrap()
1190 .is_empty();
1191 if (created || deleted) && !matches!(durability, Durability::Immediate) {
1192 return Err(SetDurabilityError::PersistentSavepointModified);
1193 }
1194
1195 self.durability = match durability {
1196 Durability::None => InternalDurability::None,
1197 Durability::Immediate => InternalDurability::Immediate,
1198 };
1199
1200 Ok(())
1201 }
1202
    /// Enables or disables the two-phase-commit flag for this transaction (off by default).
    ///
    /// The flag is only stored here; it is consumed by code outside this chunk.
    pub fn set_two_phase_commit(&mut self, enabled: bool) {
        self.two_phase_commit = enabled;
    }
1245
    /// Enables or disables the quick-repair flag for this transaction (off by default).
    ///
    /// The flag is only stored here; it is consumed by code outside this chunk.
    pub fn set_quick_repair(&mut self, enabled: bool) {
        self.quick_repair = enabled;
    }
1259
    /// Opens the table described by `definition` for use within this write transaction.
    ///
    /// The call is forwarded to the transaction's table namespace while its mutex is held;
    /// any [`TableError`] it reports is returned unchanged.
    ///
    /// # Panics
    ///
    /// Panics if the `tables` mutex is poisoned.
    #[track_caller]
    pub fn open_table<'txn, K: Key + 'static, V: Value + 'static>(
        &'txn self,
        definition: TableDefinition<K, V>,
    ) -> Result<Table<'txn, K, V>, TableError> {
        self.tables.lock().unwrap().open_table(self, definition)
    }
1270
    /// Opens the multimap table described by `definition` for use within this write
    /// transaction.
    ///
    /// The call is forwarded to the transaction's table namespace while its mutex is held;
    /// any [`TableError`] it reports is returned unchanged.
    ///
    /// # Panics
    ///
    /// Panics if the `tables` mutex is poisoned.
    #[track_caller]
    pub fn open_multimap_table<'txn, K: Key + 'static, V: Key + 'static>(
        &'txn self,
        definition: MultimapTableDefinition<K, V>,
    ) -> Result<MultimapTable<'txn, K, V>, TableError> {
        self.tables
            .lock()
            .unwrap()
            .open_multimap_table(self, definition)
    }
1284
    /// Forwards the closing of a table to the transaction's table namespace.
    ///
    /// `name`, the table's B-tree and its `length` are passed through unchanged while the
    /// `tables` mutex is held.
    /// NOTE(review): presumably called when a `Table` handle is dropped so the namespace can
    /// record the final root and length — confirm against the caller.
    pub(crate) fn close_table<K: Key + 'static, V: Value + 'static>(
        &self,
        name: &str,
        table: &BtreeMut<K, V>,
        length: u64,
    ) {
        self.tables.lock().unwrap().close_table(name, table, length);
    }
1293
1294 pub fn rename_table(
1296 &self,
1297 definition: impl TableHandle,
1298 new_name: impl TableHandle,
1299 ) -> Result<(), TableError> {
1300 let name = definition.name().to_string();
1301 drop(definition);
1303 self.tables
1304 .lock()
1305 .unwrap()
1306 .rename_table(self, &name, new_name.name())
1307 }
1308
1309 pub fn rename_multimap_table(
1311 &self,
1312 definition: impl MultimapTableHandle,
1313 new_name: impl MultimapTableHandle,
1314 ) -> Result<(), TableError> {
1315 let name = definition.name().to_string();
1316 drop(definition);
1318 self.tables
1319 .lock()
1320 .unwrap()
1321 .rename_multimap_table(self, &name, new_name.name())
1322 }
1323
1324 pub fn delete_table(&self, definition: impl TableHandle) -> Result<bool, TableError> {
1328 let name = definition.name().to_string();
1329 drop(definition);
1331 self.tables.lock().unwrap().delete_table(self, &name)
1332 }
1333
1334 pub fn delete_multimap_table(
1338 &self,
1339 definition: impl MultimapTableHandle,
1340 ) -> Result<bool, TableError> {
1341 let name = definition.name().to_string();
1342 drop(definition);
1344 self.tables
1345 .lock()
1346 .unwrap()
1347 .delete_multimap_table(self, &name)
1348 }
1349
1350 pub fn list_tables(&self) -> Result<impl Iterator<Item = UntypedTableHandle> + '_> {
1352 self.tables
1353 .lock()
1354 .unwrap()
1355 .table_tree
1356 .list_tables(TableType::Normal)
1357 .map(|x| x.into_iter().map(UntypedTableHandle::new))
1358 }
1359
1360 pub fn list_multimap_tables(
1362 &self,
1363 ) -> Result<impl Iterator<Item = UntypedMultimapTableHandle> + '_> {
1364 self.tables
1365 .lock()
1366 .unwrap()
1367 .table_tree
1368 .list_tables(TableType::Multimap)
1369 .map(|x| x.into_iter().map(UntypedMultimapTableHandle::new))
1370 }
1371
    /// Commits the transaction, consuming it.
    ///
    /// # Errors
    ///
    /// Returns whatever [`CommitError`] `commit_inner()` reports.
    pub fn commit(mut self) -> Result<(), CommitError> {
        // Mark the transaction completed before committing, so that `Drop` does not run its
        // automatic abort path afterwards even if the commit fails.
        self.completed = true;
        self.commit_inner()
    }
1381
    /// Performs the actual commit work on behalf of `commit()`.
    ///
    /// Steps, in order: flush and close the user table tree; record the pages it freed and
    /// allocated in the system tables; run either the durable or the non-durable commit
    /// according to `self.durability`; finally tell the transaction tracker to deallocate
    /// every persistent savepoint recorded in `deleted_persistent_savepoints`.
    ///
    /// # Panics
    ///
    /// Panics if the system freed-page list or the data freed-page list is still non-empty
    /// once the commit has finished, or if any of the involved mutexes is poisoned.
    fn commit_inner(&mut self) -> Result<(), CommitError> {
        // Quick-repair forces two-phase commit on.
        if self.quick_repair {
            self.two_phase_commit = true;
        }

        let (user_root, allocated_pages, data_freed) =
            self.tables.lock().unwrap().table_tree.flush_and_close()?;

        // Persist the bookkeeping for pages freed and allocated by this transaction.
        self.store_data_freed_pages(data_freed)?;
        self.store_allocated_pages(allocated_pages.into_iter().collect())?;

        #[cfg(feature = "logging")]
        debug!(
            "Committing transaction id={:?} with durability={:?} two_phase={} quick_repair={}",
            self.transaction_id, self.durability, self.two_phase_commit, self.quick_repair
        );
        match self.durability {
            InternalDurability::None => self.non_durable_commit(user_root)?,
            InternalDurability::Immediate => self.durable_commit(user_root)?,
        }

        // Only after the commit succeeded are the deleted savepoints released in the tracker.
        for (savepoint, transaction) in self.deleted_persistent_savepoints.lock().unwrap().iter() {
            self.transaction_tracker
                .deallocate_savepoint(*savepoint, *transaction);
        }

        // Sanity checks: both freed-page lists must have been fully drained by the commit.
        assert!(
            self.system_tables
                .lock()
                .unwrap()
                .system_freed_pages()
                .lock()
                .unwrap()
                .is_empty()
        );
        assert!(
            self.tables
                .lock()
                .unwrap()
                .freed_pages
                .lock()
                .unwrap()
                .is_empty()
        );

        #[cfg(feature = "logging")]
        debug!(
            "Finished commit of transaction id={:?}",
            self.transaction_id
        );

        Ok(())
    }
1436
1437 fn store_data_freed_pages(&self, mut freed_pages: Vec<PageNumber>) -> Result {
1438 let mut system_tables = self.system_tables.lock().unwrap();
1439 let mut freed_table = system_tables.open_system_table(self, DATA_FREED_TABLE)?;
1440 let mut pagination_counter = 0;
1441 while !freed_pages.is_empty() {
1442 let chunk_size = 400;
1443 let buffer_size = PageList::required_bytes(chunk_size);
1444 let key = TransactionIdWithPagination {
1445 transaction_id: self.transaction_id.raw_id(),
1446 pagination_id: pagination_counter,
1447 };
1448 let mut access_guard = freed_table.insert_reserve(&key, buffer_size)?;
1449
1450 let len = freed_pages.len();
1451 access_guard.as_mut().clear();
1452 for page in freed_pages.drain(len - min(len, chunk_size)..) {
1453 debug_assert!(
1455 self.mem.is_allocated(page),
1456 "Page is not allocated: {page:?}"
1457 );
1458 debug_assert!(!self.mem.uncommitted(page), "Page is uncommitted: {page:?}");
1459 access_guard.as_mut().push_back(page);
1460 }
1461
1462 pagination_counter += 1;
1463 }
1464
1465 Ok(())
1466 }
1467
1468 fn store_allocated_pages(&self, mut data_allocated_pages: Vec<PageNumber>) -> Result {
1469 let mut system_tables = self.system_tables.lock().unwrap();
1470 let mut allocated_table = system_tables.open_system_table(self, DATA_ALLOCATED_TABLE)?;
1471 let mut pagination_counter = 0;
1472 while !data_allocated_pages.is_empty() {
1473 let chunk_size = 400;
1474 let buffer_size = PageList::required_bytes(chunk_size);
1475 let key = TransactionIdWithPagination {
1476 transaction_id: self.transaction_id.raw_id(),
1477 pagination_id: pagination_counter,
1478 };
1479 let mut access_guard = allocated_table.insert_reserve(&key, buffer_size)?;
1480
1481 let len = data_allocated_pages.len();
1482 access_guard.as_mut().clear();
1483 for page in data_allocated_pages.drain(len - min(len, chunk_size)..) {
1484 debug_assert!(
1488 self.mem.is_allocated(page),
1489 "Page is not allocated: {page:?}"
1490 );
1491 debug_assert!(self.mem.uncommitted(page), "Page is committed: {page:?}");
1492 access_guard.as_mut().push_back(page);
1493 }
1494
1495 pagination_counter += 1;
1496 }
1497
1498 let oldest = self
1500 .transaction_tracker
1501 .oldest_savepoint()
1502 .map_or(u64::MAX, |(_, x)| x.raw_id());
1503 let key = TransactionIdWithPagination {
1504 transaction_id: oldest,
1505 pagination_id: 0,
1506 };
1507 for entry in allocated_table.extract_from_if(..key, |_, _| true)? {
1508 entry?;
1509 }
1510
1511 Ok(())
1512 }
1513
    /// Aborts the transaction, consuming it.
    ///
    /// # Errors
    ///
    /// Returns the storage error reported by `abort_inner()`, if any.
    pub fn abort(mut self) -> Result {
        // Mark the transaction completed first, so that `Drop` does not attempt a second,
        // automatic abort afterwards.
        self.completed = true;
        self.abort_inner()
    }
1522
1523 fn abort_inner(&mut self) -> Result {
1524 #[cfg(feature = "logging")]
1525 debug!("Aborting transaction id={:?}", self.transaction_id);
1526 self.tables
1527 .lock()
1528 .unwrap()
1529 .table_tree
1530 .clear_root_updates_and_close();
1531 for savepoint in self.created_persistent_savepoints.lock().unwrap().iter() {
1532 match self.delete_persistent_savepoint(savepoint.0) {
1533 Ok(_) => {}
1534 Err(err) => match err {
1535 SavepointError::InvalidSavepoint => {
1536 unreachable!();
1537 }
1538 SavepointError::Storage(storage_err) => {
1539 return Err(storage_err);
1540 }
1541 },
1542 }
1543 }
1544 self.mem.rollback_uncommitted_writes()?;
1545 #[cfg(feature = "logging")]
1546 debug!("Finished abort of transaction id={:?}", self.transaction_id);
1547 Ok(())
1548 }
1549
    /// Commits the transaction through the storage layer's `commit()` path.
    ///
    /// `user_root` is the root of the user table tree to be committed. Before committing,
    /// previously recorded freed pages that are old enough are released, the system tree is
    /// flushed, and the allocator state table is deleted (and re-created when quick-repair
    /// is enabled). After the commit, pending non-durable commits are cleared in the tracker
    /// and the pages freed from the system tree are released.
    pub(crate) fn durable_commit(&mut self, user_root: Option<BtreeHeader>) -> Result {
        // Freed-page entries with a transaction id below this bound are released: the
        // successor of the oldest live read transaction, or this transaction's own id when
        // the tracker reports none.
        let free_until_transaction = self
            .transaction_tracker
            .oldest_live_read_transaction()
            .map_or(self.transaction_id, |x| x.next());
        self.process_freed_pages(free_until_transaction)?;

        let mut system_tables = self.system_tables.lock().unwrap();
        let system_freed_pages = system_tables.system_freed_pages();
        let system_tree = system_tables.table_tree.flush_table_root_updates()?;
        // The previous allocator state table is always removed; it is only written again
        // below when quick-repair is enabled.
        system_tree
            .delete_table(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal)
            .map_err(|e| e.into_storage_error_or_corrupted("Unexpected TableError"))?;

        if self.quick_repair {
            system_tree.create_table_and_flush_table_root(
                ALLOCATOR_STATE_TABLE_NAME,
                |system_tree_ref, tree: &mut AllocatorStateTreeMut| {
                    // Shared across retries so that each retry continues with fresh
                    // pagination ids in the system freed table.
                    let mut pagination_counter = 0;

                    // Retry until the allocator state has been saved successfully.
                    loop {
                        let num_regions = self
                            .mem
                            .reserve_allocator_state(tree, self.transaction_id)?;

                        // Record the system-freed pages in the system freed table instead
                        // of freeing them directly (no `unpersisted_pages` sink is passed).
                        self.store_system_freed_pages(
                            system_tree_ref,
                            system_freed_pages.clone(),
                            None,
                            &mut pagination_counter,
                        )?;

                        if self.mem.try_save_allocator_state(tree, num_regions)? {
                            return Ok(());
                        }

                        // Saving failed: empty the table entry by entry, from the last key
                        // backwards, before trying again.
                        while let Some(guards) = tree.last()? {
                            let key = guards.0.value();
                            drop(guards);
                            tree.remove(&key)?;
                        }
                    }
                },
            )?;
        }

        let system_root = system_tree.finalize_dirty_checksums()?;

        self.mem.commit(
            user_root,
            system_root,
            self.transaction_id,
            self.two_phase_commit,
            self.shrink_policy,
        )?;

        self.transaction_tracker.clear_pending_non_durable_commits();

        // Whatever is still in the system freed-page list is released right after the commit.
        for page in system_freed_pages.lock().unwrap().drain(..) {
            self.mem.free(page, &mut PageTrackerPolicy::Ignore);
        }

        Ok(())
    }
1624
    /// Commits the transaction through the storage layer's `non_durable_commit()` path.
    ///
    /// `user_root` is the root of the user table tree to be committed. Pages freed from the
    /// system tree are either queued to be freed right after the commit (when the storage
    /// layer reports them as unpersisted) or recorded in the system freed table.
    pub(crate) fn non_durable_commit(&mut self, user_root: Option<BtreeHeader>) -> Result {
        // Upper bound for processing earlier freed-page entries: the successor of the oldest
        // live read (non-durable) transaction, or this transaction's id when there is none...
        let mut free_until_transaction = self
            .transaction_tracker
            .oldest_live_read_nondurable_transaction()
            .map_or(self.transaction_id, |x| x.next());
        // ...further capped by the transaction of the oldest savepoint, if one exists.
        if let Some((_, oldest_savepoint)) = self.transaction_tracker.oldest_savepoint() {
            free_until_transaction = TransactionId::min(free_until_transaction, oldest_savepoint);
        }
        self.process_freed_pages_nondurable(free_until_transaction)?;

        // Pages collected here are freed only after the commit below has succeeded.
        let mut post_commit_frees = vec![];

        let system_root = {
            let mut system_tables = self.system_tables.lock().unwrap();
            let system_freed_pages = system_tables.system_freed_pages();
            system_tables.table_tree.flush_table_root_updates()?;
            // Pull out the pages the storage layer reports as unpersisted.
            for page in system_freed_pages
                .lock()
                .unwrap()
                .extract_if(.., |p| self.mem.unpersisted(*p))
            {
                post_commit_frees.push(page);
            }
            // Record the remaining system-freed pages in the system freed table; unpersisted
            // pages encountered along the way are diverted into `post_commit_frees`.
            self.store_system_freed_pages(
                &mut system_tables.table_tree,
                system_freed_pages,
                Some(&mut post_commit_frees),
                &mut 0,
            )?;

            system_tables
                .table_tree
                .flush_table_root_updates()?
                .finalize_dirty_checksums()?
        };

        self.mem
            .non_durable_commit(user_root, system_root, self.transaction_id)?;
        // Register this commit with the tracker, together with the id of the last durable
        // transaction known to the storage layer.
        self.transaction_tracker.register_non_durable_commit(
            self.transaction_id,
            self.mem.get_last_durable_transaction_id()?,
        );

        for page in post_commit_frees {
            self.mem.free(page, &mut PageTrackerPolicy::Ignore);
        }

        Ok(())
    }
1686
1687 pub(crate) fn compact_pages(&mut self) -> Result<bool> {
1690 let mut progress = false;
1691
1692 let mut highest_pages = BTreeMap::new();
1694 let mut tables = self.tables.lock().unwrap();
1695 let table_tree = &mut tables.table_tree;
1696 table_tree.highest_index_pages(MAX_PAGES_PER_COMPACTION, &mut highest_pages)?;
1697 let mut system_tables = self.system_tables.lock().unwrap();
1698 let system_table_tree = &mut system_tables.table_tree;
1699 system_table_tree.highest_index_pages(MAX_PAGES_PER_COMPACTION, &mut highest_pages)?;
1700
1701 let mut relocation_map = HashMap::new();
1703 for path in highest_pages.into_values().rev() {
1704 if relocation_map.contains_key(&path.page_number()) {
1705 continue;
1706 }
1707 let old_page = self.mem.get_page(path.page_number())?;
1708 let mut new_page = self.mem.allocate_lowest(old_page.memory().len())?;
1709 let new_page_number = new_page.get_page_number();
1710 new_page.memory_mut()[0] = old_page.memory()[0];
1713 drop(new_page);
1714 if new_page_number < path.page_number() {
1716 relocation_map.insert(path.page_number(), new_page_number);
1717 for parent in path.parents() {
1718 if relocation_map.contains_key(parent) {
1719 continue;
1720 }
1721 let old_parent = self.mem.get_page(*parent)?;
1722 let mut new_page = self.mem.allocate_lowest(old_parent.memory().len())?;
1723 let new_page_number = new_page.get_page_number();
1724 new_page.memory_mut()[0] = old_parent.memory()[0];
1727 drop(new_page);
1728 relocation_map.insert(*parent, new_page_number);
1729 }
1730 } else {
1731 self.mem
1732 .free(new_page_number, &mut PageTrackerPolicy::Ignore);
1733 break;
1734 }
1735 }
1736
1737 if !relocation_map.is_empty() {
1738 progress = true;
1739 }
1740
1741 table_tree.relocate_tables(&relocation_map)?;
1742 system_table_tree.relocate_tables(&relocation_map)?;
1743
1744 Ok(progress)
1745 }
1746
1747 fn process_freed_pages(&mut self, free_until: TransactionId) -> Result {
1750 assert_eq!(PageNumber::serialized_size(), 8);
1752
1753 let mut system_tables = self.system_tables.lock().unwrap();
1755 {
1756 let mut data_freed = system_tables.open_system_table(self, DATA_FREED_TABLE)?;
1757 let key = TransactionIdWithPagination {
1758 transaction_id: free_until.raw_id(),
1759 pagination_id: 0,
1760 };
1761 for entry in data_freed.extract_from_if(..key, |_, _| true)? {
1762 let (_, page_list) = entry?;
1763 for i in 0..page_list.value().len() {
1764 self.mem
1765 .free(page_list.value().get(i), &mut PageTrackerPolicy::Ignore);
1766 }
1767 }
1768 }
1769
1770 {
1772 let mut system_freed = system_tables.open_system_table(self, SYSTEM_FREED_TABLE)?;
1773 let key = TransactionIdWithPagination {
1774 transaction_id: free_until.raw_id(),
1775 pagination_id: 0,
1776 };
1777 for entry in system_freed.extract_from_if(..key, |_, _| true)? {
1778 let (_, page_list) = entry?;
1779 for i in 0..page_list.value().len() {
1780 self.mem
1781 .free(page_list.value().get(i), &mut PageTrackerPolicy::Ignore);
1782 }
1783 }
1784 }
1785
1786 Ok(())
1787 }
1788
    /// Frees, where the storage layer allows it, pages recorded in the freed table
    /// `definition` by non-durable commits that the tracker still reports as unprocessed.
    ///
    /// Entries are considered in the key range from the oldest unprocessed non-durable
    /// commit (inclusive) up to `free_until` (exclusive). Pages that could not be freed stay
    /// in the table. Returns the ids of the transactions whose entries were visited.
    fn process_freed_pages_nondurable_helper(
        &mut self,
        free_until: TransactionId,
        definition: SystemTableDefinition<TransactionIdWithPagination, PageList>,
    ) -> Result<Vec<TransactionId>> {
        let mut processed = vec![];
        let mut system_tables = self.system_tables.lock().unwrap();

        // Exclusive upper bound of the scan.
        let last_key = TransactionIdWithPagination {
            transaction_id: free_until.raw_id(),
            pagination_id: 0,
        };
        // Without an unprocessed non-durable commit the lower bound equals the upper bound,
        // which makes the scanned range empty.
        let oldest_unprocessed = self
            .transaction_tracker
            .oldest_unprocessed_non_durable_commit()
            .map_or(free_until.raw_id(), |x| x.raw_id());
        let first_key = TransactionIdWithPagination {
            transaction_id: oldest_unprocessed,
            pagination_id: 0,
        };
        let mut data_freed = system_tables.open_system_table(self, definition)?;

        // First pass: collect the transactions to work on; the table is modified only in
        // the second pass below.
        // NOTE(review): a transaction with several pagination entries is pushed once per
        // entry, so ids can repeat in this list and in the returned vector — confirm that
        // this is intended.
        let mut candidate_transactions = vec![];
        for entry in data_freed.range(first_key..last_key)? {
            let (key, _) = entry?;
            let transaction_id = TransactionId::new(key.value().transaction_id);
            if self
                .transaction_tracker
                .is_unprocessed_non_durable_commit(transaction_id)
            {
                candidate_transactions.push(transaction_id);
            }
        }
        for transaction_id in candidate_transactions {
            let mut key = TransactionIdWithPagination {
                transaction_id: transaction_id.raw_id(),
                pagination_id: 0,
            };
            // Walk pagination ids 0, 1, 2, ... and stop at the first id with no entry.
            loop {
                let Some(entry) = data_freed.get(&key)? else {
                    break;
                };
                let pages = entry.value();
                // Pages for which `free_if_unpersisted` returns false are kept.
                let mut new_pages = vec![];
                for i in 0..pages.len() {
                    let page = pages.get(i);
                    if !self
                        .mem
                        .free_if_unpersisted(page, &mut PageTrackerPolicy::Ignore)
                    {
                        new_pages.push(page);
                    }
                }
                // Rewrite the entry only when at least one page was freed.
                if new_pages.len() != pages.len() {
                    // Release the read guard before mutating the table.
                    drop(entry);
                    if new_pages.is_empty() {
                        data_freed.remove(&key)?;
                    } else {
                        let required = PageList::required_bytes(new_pages.len());
                        // NOTE(review): unlike the store_* helpers, the reserved list is not
                        // `clear()`ed before pushing — confirm `insert_reserve` hands back
                        // an empty list.
                        let mut page_list_mut = data_freed.insert_reserve(&key, required)?;
                        for page in new_pages {
                            page_list_mut.as_mut().push_back(page);
                        }
                    }
                }
                key.pagination_id += 1;
            }
            processed.push(transaction_id);
        }

        Ok(processed)
    }
1861
1862 fn process_freed_pages_nondurable(&mut self, free_until: TransactionId) -> Result {
1869 assert_eq!(PageNumber::serialized_size(), 8);
1871
1872 let mut processed =
1874 self.process_freed_pages_nondurable_helper(free_until, DATA_FREED_TABLE)?;
1875
1876 processed
1878 .extend(self.process_freed_pages_nondurable_helper(free_until, SYSTEM_FREED_TABLE)?);
1879
1880 for transaction_id in processed {
1881 self.transaction_tracker
1882 .mark_unprocessed_non_durable_commit(transaction_id);
1883 }
1884
1885 Ok(())
1886 }
1887
1888 fn store_system_freed_pages(
1889 &self,
1890 system_tree: &mut TableTreeMut,
1891 system_freed_pages: Arc<Mutex<Vec<PageNumber>>>,
1892 mut unpersisted_pages: Option<&mut Vec<PageNumber>>,
1893 pagination_counter: &mut u64,
1894 ) -> Result {
1895 assert_eq!(PageNumber::serialized_size(), 8); system_tree.open_table_and_flush_table_root(
1898 SYSTEM_FREED_TABLE.name(),
1899 |system_freed_tree: &mut SystemFreedTree| {
1900 while !system_freed_pages.lock().unwrap().is_empty() {
1901 let chunk_size = 200;
1902 let buffer_size = PageList::required_bytes(chunk_size);
1903 let key = TransactionIdWithPagination {
1904 transaction_id: self.transaction_id.raw_id(),
1905 pagination_id: *pagination_counter,
1906 };
1907 let mut access_guard = system_freed_tree.insert_reserve(&key, buffer_size)?;
1908
1909 let mut freed_pages = system_freed_pages.lock().unwrap();
1910 let len = freed_pages.len();
1911 access_guard.as_mut().clear();
1912 for page in freed_pages.drain(len - min(len, chunk_size)..) {
1913 if let Some(ref mut unpersisted_pages) = unpersisted_pages
1914 && self.mem.unpersisted(page)
1915 {
1916 unpersisted_pages.push(page);
1917 } else {
1918 access_guard.as_mut().push_back(page);
1919 }
1920 }
1921 drop(access_guard);
1922
1923 *pagination_counter += 1;
1924 }
1925 Ok(())
1926 },
1927 )?;
1928
1929 Ok(())
1930 }
1931
1932 pub fn stats(&self) -> Result<DatabaseStats> {
1934 let tables = self.tables.lock().unwrap();
1935 let table_tree = &tables.table_tree;
1936 let data_tree_stats = table_tree.stats()?;
1937
1938 let system_tables = self.system_tables.lock().unwrap();
1939 let system_table_tree = &system_tables.table_tree;
1940 let system_tree_stats = system_table_tree.stats()?;
1941
1942 let total_metadata_bytes = data_tree_stats.metadata_bytes()
1943 + system_tree_stats.metadata_bytes
1944 + system_tree_stats.stored_leaf_bytes;
1945 let total_fragmented = data_tree_stats.fragmented_bytes()
1946 + system_tree_stats.fragmented_bytes
1947 + self.mem.count_free_pages()? * (self.mem.get_page_size() as u64);
1948
1949 Ok(DatabaseStats {
1950 tree_height: data_tree_stats.tree_height(),
1951 allocated_pages: self.mem.count_allocated_pages()?,
1952 leaf_pages: data_tree_stats.leaf_pages(),
1953 branch_pages: data_tree_stats.branch_pages(),
1954 stored_leaf_bytes: data_tree_stats.stored_bytes(),
1955 metadata_bytes: total_metadata_bytes,
1956 fragmented_bytes: total_fragmented,
1957 page_size: self.mem.get_page_size(),
1958 })
1959 }
1960
1961 #[allow(dead_code)]
1962 pub(crate) fn print_debug(&self) -> Result {
1963 let mut tables = self.tables.lock().unwrap();
1965 if let Some(page) = tables
1966 .table_tree
1967 .flush_table_root_updates()
1968 .unwrap()
1969 .finalize_dirty_checksums()
1970 .unwrap()
1971 {
1972 eprintln!("Master tree:");
1973 let master_tree: Btree<&str, InternalTableDefinition> = Btree::new(
1974 Some(page),
1975 PageHint::None,
1976 self.transaction_guard.clone(),
1977 self.mem.clone(),
1978 )?;
1979 master_tree.print_debug(true)?;
1980 }
1981
1982 let mut system_tables = self.system_tables.lock().unwrap();
1984 if let Some(page) = system_tables
1985 .table_tree
1986 .flush_table_root_updates()
1987 .unwrap()
1988 .finalize_dirty_checksums()
1989 .unwrap()
1990 {
1991 eprintln!("System tree:");
1992 let master_tree: Btree<&str, InternalTableDefinition> = Btree::new(
1993 Some(page),
1994 PageHint::None,
1995 self.transaction_guard.clone(),
1996 self.mem.clone(),
1997 )?;
1998 master_tree.print_debug(true)?;
1999 }
2000
2001 Ok(())
2002 }
2003}
2004
2005impl Drop for WriteTransaction {
2006 fn drop(&mut self) {
2007 if !self.completed && !thread::panicking() && !self.mem.storage_failure() {
2008 #[allow(unused_variables)]
2009 if let Err(error) = self.abort_inner() {
2010 #[cfg(feature = "logging")]
2011 warn!("Failure automatically aborting transaction: {error}");
2012 }
2013 } else if !self.completed && self.mem.storage_failure() {
2014 self.tables
2015 .lock()
2016 .unwrap()
2017 .table_tree
2018 .clear_root_updates_and_close();
2019 }
2020 }
2021}
2022
/// A read-only transaction.
///
/// Tables are opened against the data root that was current when the transaction was
/// created.
pub struct ReadTransaction {
    // Shared handle to the storage layer; cloned into every table that is opened.
    mem: Arc<TransactionalMemory>,
    // Table tree built from the data root captured in `new()`; it also holds the
    // transaction guard.
    tree: TableTree,
}
2030
2031impl ReadTransaction {
2032 pub(crate) fn new(
2033 mem: Arc<TransactionalMemory>,
2034 guard: TransactionGuard,
2035 ) -> Result<Self, TransactionError> {
2036 let root_page = mem.get_data_root();
2037 let guard = Arc::new(guard);
2038 Ok(Self {
2039 mem: mem.clone(),
2040 tree: TableTree::new(root_page, PageHint::Clean, guard, mem)
2041 .map_err(TransactionError::Storage)?,
2042 })
2043 }
2044
2045 pub fn open_table<K: Key + 'static, V: Value + 'static>(
2047 &self,
2048 definition: TableDefinition<K, V>,
2049 ) -> Result<ReadOnlyTable<K, V>, TableError> {
2050 let header = self
2051 .tree
2052 .get_table::<K, V>(definition.name(), TableType::Normal)?
2053 .ok_or_else(|| TableError::TableDoesNotExist(definition.name().to_string()))?;
2054
2055 match header {
2056 InternalTableDefinition::Normal { table_root, .. } => Ok(ReadOnlyTable::new(
2057 definition.name().to_string(),
2058 table_root,
2059 PageHint::Clean,
2060 self.tree.transaction_guard().clone(),
2061 self.mem.clone(),
2062 )?),
2063 InternalTableDefinition::Multimap { .. } => unreachable!(),
2064 }
2065 }
2066
2067 pub fn open_untyped_table(
2069 &self,
2070 handle: impl TableHandle,
2071 ) -> Result<ReadOnlyUntypedTable, TableError> {
2072 let header = self
2073 .tree
2074 .get_table_untyped(handle.name(), TableType::Normal)?
2075 .ok_or_else(|| TableError::TableDoesNotExist(handle.name().to_string()))?;
2076
2077 match header {
2078 InternalTableDefinition::Normal {
2079 table_root,
2080 fixed_key_size,
2081 fixed_value_size,
2082 ..
2083 } => Ok(ReadOnlyUntypedTable::new(
2084 table_root,
2085 fixed_key_size,
2086 fixed_value_size,
2087 self.mem.clone(),
2088 )),
2089 InternalTableDefinition::Multimap { .. } => unreachable!(),
2090 }
2091 }
2092
2093 pub fn open_multimap_table<K: Key + 'static, V: Key + 'static>(
2095 &self,
2096 definition: MultimapTableDefinition<K, V>,
2097 ) -> Result<ReadOnlyMultimapTable<K, V>, TableError> {
2098 let header = self
2099 .tree
2100 .get_table::<K, V>(definition.name(), TableType::Multimap)?
2101 .ok_or_else(|| TableError::TableDoesNotExist(definition.name().to_string()))?;
2102
2103 match header {
2104 InternalTableDefinition::Normal { .. } => unreachable!(),
2105 InternalTableDefinition::Multimap {
2106 table_root,
2107 table_length,
2108 ..
2109 } => Ok(ReadOnlyMultimapTable::new(
2110 table_root,
2111 table_length,
2112 PageHint::Clean,
2113 self.tree.transaction_guard().clone(),
2114 self.mem.clone(),
2115 )?),
2116 }
2117 }
2118
2119 pub fn open_untyped_multimap_table(
2121 &self,
2122 handle: impl MultimapTableHandle,
2123 ) -> Result<ReadOnlyUntypedMultimapTable, TableError> {
2124 let header = self
2125 .tree
2126 .get_table_untyped(handle.name(), TableType::Multimap)?
2127 .ok_or_else(|| TableError::TableDoesNotExist(handle.name().to_string()))?;
2128
2129 match header {
2130 InternalTableDefinition::Normal { .. } => unreachable!(),
2131 InternalTableDefinition::Multimap {
2132 table_root,
2133 table_length,
2134 fixed_key_size,
2135 fixed_value_size,
2136 ..
2137 } => Ok(ReadOnlyUntypedMultimapTable::new(
2138 table_root,
2139 table_length,
2140 fixed_key_size,
2141 fixed_value_size,
2142 self.mem.clone(),
2143 )),
2144 }
2145 }
2146
2147 pub fn list_tables(&self) -> Result<impl Iterator<Item = UntypedTableHandle>> {
2149 self.tree
2150 .list_tables(TableType::Normal)
2151 .map(|x| x.into_iter().map(UntypedTableHandle::new))
2152 }
2153
2154 pub fn list_multimap_tables(&self) -> Result<impl Iterator<Item = UntypedMultimapTableHandle>> {
2156 self.tree
2157 .list_tables(TableType::Multimap)
2158 .map(|x| x.into_iter().map(UntypedMultimapTableHandle::new))
2159 }
2160
2161 pub fn close(self) -> Result<(), TransactionError> {
2169 if Arc::strong_count(self.tree.transaction_guard()) > 1 {
2170 return Err(TransactionError::ReadTransactionStillInUse(Box::new(self)));
2171 }
2172 Ok(())
2174 }
2175}
2176
2177impl Debug for ReadTransaction {
2178 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2179 f.write_str("ReadTransaction")
2180 }
2181}
2182
#[cfg(test)]
mod test {
    use crate::{Database, TableDefinition};

    // Table used by the tests in this module.
    const X: TableDefinition<&str, &str> = TableDefinition::new("x");

    /// A write transaction started after the database was dropped and created again on the
    /// same path must receive a larger transaction id than one committed before.
    #[test]
    fn transaction_id_persistence() {
        let tmpfile = crate::create_tempfile();
        let db = Database::create(tmpfile.path()).unwrap();
        let write_txn = db.begin_write().unwrap();
        {
            // Scope the table so it is dropped before the commit.
            let mut table = write_txn.open_table(X).unwrap();
            table.insert("hello", "world").unwrap();
        }
        let first_txn_id = write_txn.transaction_id;
        write_txn.commit().unwrap();
        drop(db);

        // Create a second `Database` on the same file path.
        let db2 = Database::create(tmpfile.path()).unwrap();
        let write_txn = db2.begin_write().unwrap();
        assert!(write_txn.transaction_id > first_txn_id);
    }
}