use crate::transaction_tracker::TransactionId;
use crate::transactions::{AllocatorStateKey, AllocatorStateTree};
use crate::tree_store::btree_base::{BtreeHeader, Checksum};
use crate::tree_store::page_store::base::{PageHint, MAX_PAGE_INDEX};
use crate::tree_store::page_store::buddy_allocator::BuddyAllocator;
use crate::tree_store::page_store::cached_file::PagedCachedFile;
use crate::tree_store::page_store::header::{DatabaseHeader, DB_HEADER_SIZE, MAGICNUMBER};
use crate::tree_store::page_store::layout::DatabaseLayout;
use crate::tree_store::page_store::region::{Allocators, RegionTracker};
use crate::tree_store::page_store::{hash128_with_seed, PageImpl, PageMut};
use crate::tree_store::{Page, PageNumber};
use crate::{CacheStats, StorageBackend};
use crate::{DatabaseError, Result, StorageError};
#[cfg(feature = "logging")]
use log::warn;
use std::cmp::{max, min};
#[cfg(debug_assertions)]
use std::collections::HashMap;
use std::collections::HashSet;
use std::convert::TryInto;
use std::io::ErrorKind;
use std::sync::atomic::{AtomicBool, Ordering};
#[cfg(debug_assertions)]
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;

const MAX_USABLE_REGION_SPACE: u64 = 4 * 1024 * 1024 * 1024;
pub(crate) const MAX_MAX_PAGE_ORDER: u8 = 20;
pub(super) const MIN_USABLE_PAGES: u32 = 10;
const MIN_DESIRED_USABLE_BYTES: u64 = 1024 * 1024;

pub(super) const INITIAL_REGIONS: u32 = 1000;

pub(crate) const FILE_FORMAT_VERSION1: u8 = 1;
pub(crate) const FILE_FORMAT_VERSION2: u8 = 2;

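/// Smallest order `k` such that `2^k >= x`, i.e. the buddy-allocator order
/// needed to cover `x` pages.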
fn ceil_log2(x: usize) -> u8 {
    if x.is_power_of_two() {
        x.trailing_zeros().try_into().unwrap()
    } else {
        x.next_power_of_two().trailing_zeros().try_into().unwrap()
    }
}

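/// 128-bit xxh3 hash of `data` with a fixed seed of zero, used as this
/// crate's `Checksum`.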
pub(crate) fn xxh3_checksum(data: &[u8]) -> Checksum {
    hash128_with_seed(data, 0)
}

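// In-memory view of the database: the parsed header plus the per-region
// allocator state.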
struct InMemoryState {
    header: DatabaseHeader,
    allocators: Allocators,
}

impl InMemoryState {
    fn from_bytes(header: DatabaseHeader, file: &PagedCachedFile) -> Result<Self> {
        let allocators = if header.recovery_required {
            Allocators::new(header.layout())
        } else {
            Allocators::from_bytes(&header, file)?
        };
        Ok(Self { header, allocators })
    }

    fn get_region(&self, region: u32) -> &BuddyAllocator {
        &self.allocators.region_allocators[region as usize]
    }

    fn get_region_mut(&mut self, region: u32) -> &mut BuddyAllocator {
        &mut self.allocators.region_allocators[region as usize]
    }

    fn get_region_tracker_mut(&mut self) -> &mut RegionTracker {
        &mut self.allocators.region_tracker
    }
}

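// Handles allocation, reading, and writing of pages in the database file.
// Commits are made durable by writing the new roots into the secondary header
// slot and then swapping it to primary (see `commit_inner`).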
pub(crate) struct TransactionalMemory {
    allocated_since_commit: Mutex<HashSet<PageNumber>>,
    needs_recovery: AtomicBool,
    storage: PagedCachedFile,
    state: Mutex<InMemoryState>,
    #[cfg(debug_assertions)]
    open_dirty_pages: Arc<Mutex<HashSet<PageNumber>>>,
    #[cfg(debug_assertions)]
    read_page_ref_counts: Arc<Mutex<HashMap<PageNumber, u64>>>,
    read_from_secondary: AtomicBool,
    page_size: u32,
    region_size: u64,
    region_header_with_padding_size: u64,
}

impl TransactionalMemory {
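    // Opens an existing database file, or initializes a new one when the file
    // is empty and `allow_initialize` is true.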
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        file: Box<dyn StorageBackend>,
        allow_initialize: bool,
        page_size: usize,
        requested_region_size: Option<u64>,
        read_cache_size_bytes: usize,
        write_cache_size_bytes: usize,
    ) -> Result<Self, DatabaseError> {
        assert!(page_size.is_power_of_two() && page_size >= DB_HEADER_SIZE);

        let region_size = requested_region_size.unwrap_or(MAX_USABLE_REGION_SPACE);
        let region_size = min(
            region_size,
            (u64::from(MAX_PAGE_INDEX) + 1) * page_size as u64,
        );
        assert!(region_size.is_power_of_two());

        let storage = PagedCachedFile::new(
            file,
            page_size as u64,
            read_cache_size_bytes,
            write_cache_size_bytes,
        )?;

        let initial_storage_len = storage.raw_file_len()?;

        let magic_number: [u8; MAGICNUMBER.len()] =
            if initial_storage_len >= MAGICNUMBER.len() as u64 {
                storage
                    .read_direct(0, MAGICNUMBER.len())?
                    .try_into()
                    .unwrap()
            } else {
                [0; MAGICNUMBER.len()]
            };

        if initial_storage_len > 0 {
            if magic_number != MAGICNUMBER {
                return Err(StorageError::Io(ErrorKind::InvalidData.into()).into());
            }
        } else {
            if !allow_initialize {
                return Err(StorageError::Io(ErrorKind::InvalidData.into()).into());
            }
        }

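        // No valid magic number yet: initialize a fresh database by writing a
        // new header, allocator state, and region tracker.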
        if magic_number != MAGICNUMBER {
            let region_tracker_required_bytes =
                RegionTracker::new(INITIAL_REGIONS, MAX_MAX_PAGE_ORDER + 1)
                    .to_vec()
                    .len();

            let size: u64 = max(
                MIN_DESIRED_USABLE_BYTES,
                page_size as u64 * u64::from(MIN_USABLE_PAGES),
            );
            let tracker_space =
                (page_size * ((region_tracker_required_bytes + page_size - 1) / page_size)) as u64;
            let starting_size = size + tracker_space;

            let layout = DatabaseLayout::calculate(
                starting_size,
                (region_size / u64::try_from(page_size).unwrap())
                    .try_into()
                    .unwrap(),
                page_size.try_into().unwrap(),
            );

            {
                let file_len = storage.raw_file_len()?;

                if file_len < layout.len() {
                    storage.resize(layout.len())?;
                }
            }

            let mut allocators = Allocators::new(layout);

            let tracker_page = {
                let tracker_required_pages =
                    (allocators.region_tracker.to_vec().len() + page_size - 1) / page_size;
                let required_order = ceil_log2(tracker_required_pages);
                let page_number = allocators.region_allocators[0]
                    .alloc(required_order)
                    .unwrap();
                PageNumber::new(0, page_number, required_order)
            };

            let mut header = DatabaseHeader::new(
                layout,
                TransactionId::new(0),
                FILE_FORMAT_VERSION2,
                tracker_page,
            );

            header.recovery_required = false;
            header.two_phase_commit = true;
            storage
                .write(0, DB_HEADER_SIZE, true)?
                .mem_mut()
                .copy_from_slice(&header.to_bytes(false));
            allocators.flush_to(tracker_page, layout, &storage)?;

            storage.flush(false)?;
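
            // The header is rewritten after the first flush; the second write
            // (note the `true` argument to `to_bytes`) appears to be what
            // finalizes initialization, so a crash before this point should
            // leave a file that still fails the magic-number check above.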
218 storage
221 .write(0, DB_HEADER_SIZE, true)?
222 .mem_mut()
223 .copy_from_slice(&header.to_bytes(true));
224 storage.flush(false)?;
225 }
226 let header_bytes = storage.read_direct(0, DB_HEADER_SIZE)?;
227 let (mut header, repair_info) = DatabaseHeader::from_bytes(&header_bytes)?;
228
229 assert_eq!(header.page_size() as usize, page_size);
230 assert!(storage.raw_file_len()? >= header.layout().len());
231 let needs_recovery =
232 header.recovery_required || header.layout().len() != storage.raw_file_len()?;
233 if needs_recovery {
234 let layout = header.layout();
235 let region_max_pages = layout.full_region_layout().num_pages();
236 let region_header_pages = layout.full_region_layout().get_header_pages();
237 header.set_layout(DatabaseLayout::recalculate(
238 storage.raw_file_len()?,
239 region_header_pages,
240 region_max_pages,
241 page_size.try_into().unwrap(),
242 ));
243 header.pick_primary_for_repair(repair_info)?;
244 assert!(!repair_info.invalid_magic_number);
245 storage
246 .write(0, DB_HEADER_SIZE, true)?
247 .mem_mut()
248 .copy_from_slice(&header.to_bytes(true));
249 storage.flush(false)?;
250 }
251
252 let layout = header.layout();
253 assert_eq!(layout.len(), storage.raw_file_len()?);
254 let region_size = layout.full_region_layout().len();
255 let region_header_size = layout.full_region_layout().data_section().start;
256
257 let state = InMemoryState::from_bytes(header, &storage)?;
258
259 assert!(page_size >= DB_HEADER_SIZE);
260
261 Ok(Self {
262 allocated_since_commit: Mutex::new(HashSet::new()),
263 needs_recovery: AtomicBool::new(needs_recovery),
264 storage,
265 state: Mutex::new(state),
266 #[cfg(debug_assertions)]
267 open_dirty_pages: Arc::new(Mutex::new(HashSet::new())),
268 #[cfg(debug_assertions)]
269 read_page_ref_counts: Arc::new(Mutex::new(HashMap::new())),
270 read_from_secondary: AtomicBool::new(false),
271 page_size: page_size.try_into().unwrap(),
272 region_size,
273 region_header_with_padding_size: region_header_size,
274 })
275 }
276
277 pub(crate) fn cache_stats(&self) -> CacheStats {
278 self.storage.cache_stats()
279 }
280
281 pub(crate) fn check_io_errors(&self) -> Result {
282 self.storage.check_io_errors()
283 }
284
285 #[cfg(any(test, fuzzing))]
286 pub(crate) fn all_allocated_pages(&self) -> Vec<PageNumber> {
287 self.state.lock().unwrap().allocators.all_allocated()
288 }
289
290 #[cfg(any(test, fuzzing))]
291 pub(crate) fn tracker_page(&self) -> PageNumber {
292 self.state.lock().unwrap().header.region_tracker()
293 }
294
295 pub(crate) fn clear_read_cache(&self) {
296 self.storage.invalidate_cache_all();
297 }
298
299 pub(crate) fn clear_cache_and_reload(&mut self) -> Result<bool, DatabaseError> {
300 assert!(self.allocated_since_commit.lock().unwrap().is_empty());
301
302 self.storage.flush(false)?;
303 self.storage.invalidate_cache_all();
304
305 let header_bytes = self.storage.read_direct(0, DB_HEADER_SIZE)?;
306 let (mut header, repair_info) = DatabaseHeader::from_bytes(&header_bytes)?;
307 let mut was_clean = true;
310 if header.recovery_required {
311 if !header.pick_primary_for_repair(repair_info)? {
312 was_clean = false;
313 }
314 if repair_info.invalid_magic_number {
315 return Err(StorageError::Corrupted("Invalid magic number".to_string()).into());
316 }
317 self.storage
318 .write(0, DB_HEADER_SIZE, true)?
319 .mem_mut()
320 .copy_from_slice(&header.to_bytes(true));
321 self.storage.flush(false)?;
322 }
323
324 self.needs_recovery
325 .store(header.recovery_required, Ordering::Release);
326 self.state.lock().unwrap().header = header;
327
328 Ok(was_clean)
329 }
330
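    // Flags the header as `recovery_required` and flushes it before any pages
    // are modified, so an unclean shutdown is detected on the next open.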
    pub(crate) fn begin_writable(&self) -> Result {
        let mut state = self.state.lock().unwrap();
        assert!(!state.header.recovery_required);
        state.header.recovery_required = true;
        self.write_header(&state.header)?;
        self.storage.flush(false)
    }

    pub(crate) fn needs_repair(&self) -> Result<bool> {
        Ok(self.state.lock().unwrap().header.recovery_required)
    }

    pub(crate) fn used_two_phase_commit(&self) -> bool {
        self.state.lock().unwrap().header.two_phase_commit
    }

    pub(crate) fn allocator_hash(&self) -> u128 {
        self.state.lock().unwrap().allocators.xxh3_hash()
    }

    pub(crate) fn storage_failure(&self) -> bool {
        self.needs_recovery.load(Ordering::Acquire)
    }

    pub(crate) fn repair_primary_corrupted(&self) {
        let mut state = self.state.lock().unwrap();
        state.header.swap_primary_slot();
    }

    pub(crate) fn begin_repair(&self) -> Result<()> {
        let mut state = self.state.lock().unwrap();
        state.allocators = Allocators::new(state.header.layout());

        Ok(())
    }

    pub(crate) fn mark_page_allocated(&self, page_number: PageNumber) {
        let mut state = self.state.lock().unwrap();
        let region_index = page_number.region;
        let allocator = state.get_region_mut(region_index);
        allocator.record_alloc(page_number.page_index, page_number.page_order);
    }

    fn write_header(&self, header: &DatabaseHeader) -> Result {
        self.storage
            .write(0, DB_HEADER_SIZE, true)?
            .mem_mut()
            .copy_from_slice(&header.to_bytes(true));

        Ok(())
    }

    pub(crate) fn end_repair(&self) -> Result<()> {
        self.allocate_region_tracker_page()?;

        let mut state = self.state.lock().unwrap();
        let tracker_page = state.header.region_tracker();
        state
            .allocators
            .flush_to(tracker_page, state.header.layout(), &self.storage)?;

        state.header.recovery_required = false;
        self.write_header(&state.header)?;
        let result = self.storage.flush(false);
        self.needs_recovery.store(false, Ordering::Release);

        result
    }

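    // Reserves zero-filled entries in the allocator-state tree for every
    // region, the region tracker, and the transaction id, so that
    // `try_save_allocator_state` can later overwrite them in place.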
    pub(crate) fn reserve_allocator_state(
        &self,
        tree: &mut AllocatorStateTree,
        transaction_id: TransactionId,
    ) -> Result<u32> {
        let state = self.state.lock().unwrap();
        let layout = state.header.layout();
        let num_regions = layout.num_regions();
        let region_header_len = layout.full_region_layout().get_header_pages()
            * layout.full_region_layout().page_size();
        let region_tracker_len = state.allocators.region_tracker.to_vec().len();
        drop(state);

        for i in 0..num_regions {
            tree.insert(
                &AllocatorStateKey::Region(i),
                &vec![0; region_header_len as usize].as_ref(),
            )?;
        }

        tree.insert(
            &AllocatorStateKey::RegionTracker,
            &vec![0; region_tracker_len].as_ref(),
        )?;

        tree.insert(
            &AllocatorStateKey::TransactionId,
            &transaction_id.raw_id().to_le_bytes().as_ref(),
        )?;

        Ok(num_regions)
    }

    pub(crate) fn try_save_allocator_state(
        &self,
        tree: &mut AllocatorStateTree,
        num_regions: u32,
    ) -> Result<bool> {
        let state = self.state.lock().unwrap();
        if num_regions != state.header.layout().num_regions() {
            return Ok(false);
        }

        let tracker_page = state.header.region_tracker();
        drop(state);
        self.free(tracker_page);

        let result = self.try_save_allocator_state_inner(tree, num_regions);

        self.mark_page_allocated(tracker_page);

        result
    }

    fn try_save_allocator_state_inner(
        &self,
        tree: &mut AllocatorStateTree,
        num_regions: u32,
    ) -> Result<bool> {
        for i in 0..num_regions {
            let region_bytes =
                &self.state.lock().unwrap().allocators.region_allocators[i as usize].to_vec();
            tree.insert_inplace(&AllocatorStateKey::Region(i), &region_bytes.as_ref())?;
        }

        let region_tracker_bytes = self
            .state
            .lock()
            .unwrap()
            .allocators
            .region_tracker
            .to_vec();
        tree.insert_inplace(
            &AllocatorStateKey::RegionTracker,
            &region_tracker_bytes.as_ref(),
        )?;

        Ok(true)
    }

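    // The persisted allocator state is only usable if it was written by the
    // last committed transaction.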
    pub(crate) fn is_valid_allocator_state(&self, tree: &AllocatorStateTree) -> Result<bool> {
        let transaction_id = TransactionId::new(u64::from_le_bytes(
            tree.get(&AllocatorStateKey::TransactionId)?
                .unwrap()
                .value()
                .try_into()
                .unwrap(),
        ));

        Ok(transaction_id == self.get_last_committed_transaction_id()?)
    }

    pub(crate) fn load_allocator_state(&self, tree: &AllocatorStateTree) -> Result {
        assert!(self.is_valid_allocator_state(tree)?);

        let mut region_allocators = vec![];
        for region in
            tree.range(&(AllocatorStateKey::Region(0)..=AllocatorStateKey::Region(u32::MAX)))?
        {
            region_allocators.push(BuddyAllocator::from_bytes(region?.value()));
        }

        let region_tracker = RegionTracker::from_page(
            tree.get(&AllocatorStateKey::RegionTracker)?
                .unwrap()
                .value(),
        );

        let mut state = self.state.lock().unwrap();
        state.allocators = Allocators {
            region_tracker,
            region_allocators,
        };

        let layout = state.header.layout();
        state.allocators.resize_to(layout);
        drop(state);

        self.allocate_region_tracker_page()?;

        self.state.lock().unwrap().header.recovery_required = false;
        self.needs_recovery.store(false, Ordering::Release);

        Ok(())
    }

    pub(crate) fn is_allocated(&self, page: PageNumber) -> bool {
        let state = self.state.lock().unwrap();
        let allocator = state.get_region(page.region);

        allocator.is_allocated(page.page_index, page.page_order)
    }

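    // Ensures the region tracker has a page allocated for it. If the recorded
    // tracker page is already in use, or is too small for the current tracker,
    // a new page is allocated and the header is updated to point at it.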
    fn allocate_region_tracker_page(&self) -> Result {
        let mut state = self.state.lock().unwrap();
        let tracker_len = state.allocators.region_tracker.to_vec().len();
        let tracker_page = state.header.region_tracker();

        let allocator = state.get_region_mut(tracker_page.region);
        if allocator.is_allocated(tracker_page.page_index, tracker_page.page_order)
            || tracker_page.page_size_bytes(self.page_size) < tracker_len as u64
        {
            drop(state);

            let new_tracker_page = self
                .allocate_non_transactional(tracker_len, false)?
                .get_page_number();

            let mut state = self.state.lock().unwrap();
            state.header.set_region_tracker(new_tracker_page);
            self.write_header(&state.header)?;
            self.storage.flush(false)?;
        } else {
            allocator.record_alloc(tracker_page.page_index, tracker_page.page_order);
            drop(state);
        }

        Ok(())
    }

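    // Tries to move the region tracker to a lower page (returns true on
    // success): if the newly allocated page is not before the old one, the new
    // page is freed again and false is returned.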
    pub(crate) fn relocate_region_tracker(&self) -> Result<bool> {
        let state = self.state.lock().unwrap();
        let region_tracker_size = state
            .header
            .region_tracker()
            .page_size_bytes(self.page_size);
        let old_tracker_page = state.header.region_tracker();
        drop(state);
        let new_page =
            self.allocate_non_transactional(region_tracker_size.try_into().unwrap(), true)?;
        if new_page.get_page_number().is_before(old_tracker_page) {
            let mut state = self.state.lock().unwrap();
            state.header.set_region_tracker(new_page.get_page_number());
            drop(state);
            self.free(old_tracker_page);
            Ok(true)
        } else {
            let new_page_number = new_page.get_page_number();
            drop(new_page);
            self.free(new_page_number);
            Ok(false)
        }
    }

    pub(crate) fn pages_allocated_since_raw_state(
        &self,
        old_states: &[BuddyAllocator],
    ) -> Vec<PageNumber> {
        let mut result = vec![];
        let state = self.state.lock().unwrap();

        for i in 0..state.header.layout().num_regions() {
            let current_state = state.get_region(i);
            if let Some(old_state) = old_states.get(i as usize) {
                current_state.difference(i, old_state, &mut result);
            } else {
                current_state.get_allocated_pages(i, &mut result);
            }
        }

        result.retain(|x| *x != state.header.region_tracker());

        result
    }

    pub(crate) fn get_raw_allocator_states(&self) -> Vec<Vec<u8>> {
        let state = self.state.lock().unwrap();

        let mut regional_allocators = vec![];
        for i in 0..state.header.layout().num_regions() {
            regional_allocators.push(state.get_region(i).make_state_for_savepoint());
        }

        regional_allocators
    }

    #[allow(clippy::too_many_arguments)]
    pub(crate) fn commit(
        &self,
        data_root: Option<BtreeHeader>,
        system_root: Option<BtreeHeader>,
        freed_root: Option<BtreeHeader>,
        transaction_id: TransactionId,
        eventual: bool,
        two_phase: bool,
    ) -> Result {
        let result = self.commit_inner(
            data_root,
            system_root,
            freed_root,
            transaction_id,
            eventual,
            two_phase,
        );
        if result.is_err() {
            self.needs_recovery.store(true, Ordering::Release);
        }
        result
    }

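    // Durable commit: writes the new roots into the secondary header slot,
    // then swaps it to primary. With `two_phase` set, an extra flush separates
    // the slot update from the primary swap so the swap cannot hit disk before
    // the slot contents it promotes.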
    #[allow(clippy::too_many_arguments)]
    fn commit_inner(
        &self,
        data_root: Option<BtreeHeader>,
        system_root: Option<BtreeHeader>,
        freed_root: Option<BtreeHeader>,
        transaction_id: TransactionId,
        eventual: bool,
        two_phase: bool,
    ) -> Result {
        #[cfg(debug_assertions)]
        debug_assert!(self.open_dirty_pages.lock().unwrap().is_empty());
        assert!(!self.needs_recovery.load(Ordering::Acquire));

        let mut state = self.state.lock().unwrap();
        let shrunk = Self::try_shrink(&mut state)?;
        let mut header = state.header.clone();
        drop(state);

        let old_transaction_id = header.secondary_slot().transaction_id;
        let secondary = header.secondary_slot_mut();
        secondary.transaction_id = transaction_id;
        secondary.user_root = data_root;
        secondary.system_root = system_root;
        secondary.freed_root = freed_root;

        self.write_header(&header)?;

        if two_phase {
            self.storage.flush(eventual)?;
        }

        header.swap_primary_slot();
        header.two_phase_commit = two_phase;

        self.write_header(&header)?;
        self.storage.flush(eventual)?;

        if shrunk {
            let result = self.storage.resize(header.layout().len());
            if result.is_err() {
                self.needs_recovery.store(true, Ordering::Release);
                return result;
            }
        }
        self.allocated_since_commit.lock().unwrap().clear();

        let mut state = self.state.lock().unwrap();
        assert_eq!(
            state.header.secondary_slot().transaction_id,
            old_transaction_id
        );
        state.header = header;
        self.read_from_secondary.store(false, Ordering::Release);
        drop(state);

        Ok(())
    }

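    // Non-durable commit: after a write barrier, only the in-memory secondary
    // slot is updated and subsequent reads are directed at it via
    // `read_from_secondary`; nothing is fsynced here.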
    pub(crate) fn non_durable_commit(
        &self,
        data_root: Option<BtreeHeader>,
        system_root: Option<BtreeHeader>,
        freed_root: Option<BtreeHeader>,
        transaction_id: TransactionId,
    ) -> Result {
        #[cfg(debug_assertions)]
        debug_assert!(self.open_dirty_pages.lock().unwrap().is_empty());
        assert!(!self.needs_recovery.load(Ordering::Acquire));

        self.allocated_since_commit.lock().unwrap().clear();
        self.storage.write_barrier()?;

        let mut state = self.state.lock().unwrap();
        let secondary = state.header.secondary_slot_mut();
        secondary.transaction_id = transaction_id;
        secondary.user_root = data_root;
        secondary.system_root = system_root;
        secondary.freed_root = freed_root;

        self.read_from_secondary.store(true, Ordering::Release);

        Ok(())
    }

    pub(crate) fn rollback_uncommitted_writes(&self) -> Result {
        let result = self.rollback_uncommitted_writes_inner();
        if result.is_err() {
            self.needs_recovery.store(true, Ordering::Release);
        }
        result
    }

    fn rollback_uncommitted_writes_inner(&self) -> Result {
        #[cfg(debug_assertions)]
        {
            let dirty_pages = self.open_dirty_pages.lock().unwrap();
            debug_assert!(
                dirty_pages.is_empty(),
                "Dirty pages outstanding: {dirty_pages:?}"
            );
        }
        assert!(!self.needs_recovery.load(Ordering::Acquire));
        let mut state = self.state.lock().unwrap();
        let mut guard = self.allocated_since_commit.lock().unwrap();
        for page_number in guard.iter() {
            let region_index = page_number.region;
            state
                .get_region_tracker_mut()
                .mark_free(page_number.page_order, region_index);
            state
                .get_region_mut(region_index)
                .free(page_number.page_index, page_number.page_order);

            let address = page_number.address_range(
                self.page_size.into(),
                self.region_size,
                self.region_header_with_padding_size,
                self.page_size,
            );
            let len: usize = (address.end - address.start).try_into().unwrap();
            self.storage.invalidate_cache(address.start, len);
            self.storage.cancel_pending_write(address.start, len);
        }
        guard.clear();

        Ok(())
    }

    pub(crate) fn get_page(&self, page_number: PageNumber) -> Result<PageImpl> {
        self.get_page_extended(page_number, PageHint::None)
    }

    pub(crate) fn get_page_extended(
        &self,
        page_number: PageNumber,
        hint: PageHint,
    ) -> Result<PageImpl> {
        let range = page_number.address_range(
            self.page_size.into(),
            self.region_size,
            self.region_header_with_padding_size,
            self.page_size,
        );
        let len: usize = (range.end - range.start).try_into().unwrap();
        let mem = self.storage.read(range.start, len, hint)?;

        #[cfg(debug_assertions)]
        {
            let dirty_pages = self.open_dirty_pages.lock().unwrap();
            debug_assert!(!dirty_pages.contains(&page_number), "{page_number:?}");
            *(self
                .read_page_ref_counts
                .lock()
                .unwrap()
                .entry(page_number)
                .or_default()) += 1;
            drop(dirty_pages);
        }

        Ok(PageImpl {
            mem,
            page_number,
            #[cfg(debug_assertions)]
            open_pages: self.read_page_ref_counts.clone(),
        })
    }

    pub(crate) fn get_page_mut(&self, page_number: PageNumber) -> Result<PageMut> {
        #[cfg(debug_assertions)]
        {
            assert!(!self
                .read_page_ref_counts
                .lock()
                .unwrap()
                .contains_key(&page_number));
            assert!(!self.open_dirty_pages.lock().unwrap().contains(&page_number));
        }

        let address_range = page_number.address_range(
            self.page_size.into(),
            self.region_size,
            self.region_header_with_padding_size,
            self.page_size,
        );
        let len: usize = (address_range.end - address_range.start)
            .try_into()
            .unwrap();
        let mem = self.storage.write(address_range.start, len, false)?;

        #[cfg(debug_assertions)]
        {
            assert!(self.open_dirty_pages.lock().unwrap().insert(page_number));
        }

        Ok(PageMut {
            mem,
            page_number,
            #[cfg(debug_assertions)]
            open_pages: self.open_dirty_pages.clone(),
        })
    }

    pub(crate) fn get_version(&self) -> u8 {
        let state = self.state.lock().unwrap();
        if self.read_from_secondary.load(Ordering::Acquire) {
            state.header.secondary_slot().version
        } else {
            state.header.primary_slot().version
        }
    }

    pub(crate) fn get_data_root(&self) -> Option<BtreeHeader> {
        let state = self.state.lock().unwrap();
        if self.read_from_secondary.load(Ordering::Acquire) {
            state.header.secondary_slot().user_root
        } else {
            state.header.primary_slot().user_root
        }
    }

    pub(crate) fn get_system_root(&self) -> Option<BtreeHeader> {
        let state = self.state.lock().unwrap();
        if self.read_from_secondary.load(Ordering::Acquire) {
            state.header.secondary_slot().system_root
        } else {
            state.header.primary_slot().system_root
        }
    }

    pub(crate) fn get_freed_root(&self) -> Option<BtreeHeader> {
        let state = self.state.lock().unwrap();
        if self.read_from_secondary.load(Ordering::Acquire) {
            state.header.secondary_slot().freed_root
        } else {
            state.header.primary_slot().freed_root
        }
    }

    pub(crate) fn get_last_committed_transaction_id(&self) -> Result<TransactionId> {
        let state = self.state.lock().unwrap();
        if self.read_from_secondary.load(Ordering::Acquire) {
            Ok(state.header.secondary_slot().transaction_id)
        } else {
            Ok(state.header.primary_slot().transaction_id)
        }
    }

    pub(crate) fn free(&self, page: PageNumber) {
        self.allocated_since_commit.lock().unwrap().remove(&page);
        self.free_helper(page);
    }

    fn free_helper(&self, page: PageNumber) {
        let mut state = self.state.lock().unwrap();
        let region_index = page.region;
        state
            .get_region_mut(region_index)
            .free(page.page_index, page.page_order);
        state
            .get_region_tracker_mut()
            .mark_free(page.page_order, region_index);

        let address_range = page.address_range(
            self.page_size.into(),
            self.region_size,
            self.region_header_with_padding_size,
            self.page_size,
        );
        let len: usize = (address_range.end - address_range.start)
            .try_into()
            .unwrap();
        self.storage.invalidate_cache(address_range.start, len);
        self.storage.cancel_pending_write(address_range.start, len);
    }

    pub(crate) fn free_if_uncommitted(&self, page: PageNumber) -> bool {
        if self.allocated_since_commit.lock().unwrap().remove(&page) {
            self.free_helper(page);
            true
        } else {
            false
        }
    }

    pub(crate) fn uncommitted(&self, page: PageNumber) -> bool {
        self.allocated_since_commit.lock().unwrap().contains(&page)
    }

    pub(crate) fn allocate_helper(
        &self,
        allocation_size: usize,
        lowest: bool,
        transactional: bool,
    ) -> Result<PageMut> {
        let required_pages = (allocation_size + self.get_page_size() - 1) / self.get_page_size();
        let required_order = ceil_log2(required_pages);

        let mut state = self.state.lock().unwrap();

        let page_number = if let Some(page_number) =
            Self::allocate_helper_retry(&mut state, required_order, lowest)?
        {
            page_number
        } else {
            self.grow(&mut state, required_order)?;
            Self::allocate_helper_retry(&mut state, required_order, lowest)?.unwrap()
        };

        #[cfg(debug_assertions)]
        {
            assert!(
                !self
                    .read_page_ref_counts
                    .lock()
                    .unwrap()
                    .contains_key(&page_number),
                "Allocated a page that is still referenced! {page_number:?}"
            );
            assert!(!self.open_dirty_pages.lock().unwrap().contains(&page_number));
        }

        if transactional {
            self.allocated_since_commit
                .lock()
                .unwrap()
                .insert(page_number);
        }

        let address_range = page_number.address_range(
            self.page_size.into(),
            self.region_size,
            self.region_header_with_padding_size,
            self.page_size,
        );
        let len: usize = (address_range.end - address_range.start)
            .try_into()
            .unwrap();

        #[allow(unused_mut)]
        let mut mem = self.storage.write(address_range.start, len, true)?;
        debug_assert!(mem.mem().len() >= allocation_size);

        #[cfg(debug_assertions)]
        {
            assert!(self.open_dirty_pages.lock().unwrap().insert(page_number));

            mem.mem_mut().fill(0xFF);
        }

        Ok(PageMut {
            mem,
            page_number,
            #[cfg(debug_assertions)]
            open_pages: self.open_dirty_pages.clone(),
        })
    }

    fn allocate_helper_retry(
        state: &mut InMemoryState,
        required_order: u8,
        lowest: bool,
    ) -> Result<Option<PageNumber>> {
        loop {
            let Some(candidate_region) = state.get_region_tracker_mut().find_free(required_order)
            else {
                return Ok(None);
            };
            let region = state.get_region_mut(candidate_region);
            let r = if lowest {
                region.alloc_lowest(required_order)
            } else {
                region.alloc(required_order)
            };
            if let Some(page) = r {
                return Ok(Some(PageNumber::new(
                    candidate_region,
                    page,
                    required_order,
                )));
            }
            state
                .get_region_tracker_mut()
                .mark_full(required_order, candidate_region);
        }
    }

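    // Shrinks the last region when at least half of it is trailing free space:
    // the whole free tail is released if the region is completely empty (and
    // is not the only region), otherwise half of the trailing free pages are.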
    fn try_shrink(state: &mut InMemoryState) -> Result<bool> {
        let layout = state.header.layout();
        let last_region_index = layout.num_regions() - 1;
        let last_allocator = state.get_region(last_region_index);
        let trailing_free = last_allocator.trailing_free_pages();
        let last_allocator_len = last_allocator.len();
        if trailing_free < last_allocator_len / 2 {
            return Ok(false);
        }
        let reduce_by = if layout.num_regions() > 1 && trailing_free == last_allocator_len {
            trailing_free
        } else {
            trailing_free / 2
        };

        let mut new_layout = layout;
        new_layout.reduce_last_region(reduce_by);
        state.allocators.resize_to(new_layout);
        assert!(new_layout.len() <= layout.len());
        state.header.set_layout(new_layout);

        Ok(true)
    }

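    // Grows the file enough for an allocation of the requested order: usable
    // space is roughly doubled while the database is smaller than one region,
    // and extended by about one region at a time after that.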
    fn grow(&self, state: &mut InMemoryState, required_order_allocation: u8) -> Result<()> {
        let layout = state.header.layout();
        let required_growth =
            2u64.pow(required_order_allocation.into()) * u64::from(state.header.page_size());
        let max_region_size = u64::from(state.header.layout().full_region_layout().num_pages())
            * u64::from(state.header.page_size());
        let next_desired_size = if layout.num_full_regions() > 0 {
            if let Some(trailing) = layout.trailing_region_layout() {
                if 2 * required_growth < max_region_size - trailing.usable_bytes() {
                    layout.usable_bytes() + (max_region_size - trailing.usable_bytes())
                } else {
                    layout.usable_bytes() + 2 * max_region_size - trailing.usable_bytes()
                }
            } else {
                layout.usable_bytes() + max_region_size
            }
        } else {
            max(
                layout.usable_bytes() * 2,
                layout.usable_bytes() + required_growth * 2,
            )
        };
        let new_layout = DatabaseLayout::calculate(
            next_desired_size,
            state.header.layout().full_region_layout().num_pages(),
            self.page_size,
        );
        assert!(new_layout.len() >= layout.len());

        let result = self.storage.resize(new_layout.len());
        if result.is_err() {
            self.needs_recovery.store(true, Ordering::Release);
            return result;
        }

        state.allocators.resize_to(new_layout);
        state.header.set_layout(new_layout);
        Ok(())
    }

    pub(crate) fn allocate(&self, allocation_size: usize) -> Result<PageMut> {
        self.allocate_helper(allocation_size, false, true)
    }

    pub(crate) fn allocate_lowest(&self, allocation_size: usize) -> Result<PageMut> {
        self.allocate_helper(allocation_size, true, true)
    }

    fn allocate_non_transactional(&self, allocation_size: usize, lowest: bool) -> Result<PageMut> {
        self.allocate_helper(allocation_size, lowest, false)
    }

    pub(crate) fn count_allocated_pages(&self) -> Result<u64> {
        let state = self.state.lock().unwrap();
        let mut count = 0u64;
        for i in 0..state.header.layout().num_regions() {
            count += u64::from(state.get_region(i).count_allocated_pages());
        }

        Ok(count)
    }

    pub(crate) fn count_free_pages(&self) -> Result<u64> {
        let state = self.state.lock().unwrap();
        let mut count = 0u64;
        for i in 0..state.header.layout().num_regions() {
            count += u64::from(state.get_region(i).count_free_pages());
        }

        Ok(count)
    }

    pub(crate) fn get_page_size(&self) -> usize {
        self.page_size.try_into().unwrap()
    }
}

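// On a clean drop the allocator state is flushed to the file and the
// `recovery_required` flag is cleared, so the next open can skip repair.
// Nothing is persisted if the thread is panicking or the database already
// needs recovery.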
impl Drop for TransactionalMemory {
    fn drop(&mut self) {
        if thread::panicking() || self.needs_recovery.load(Ordering::Acquire) {
            return;
        }

        let tracker_page = self.state.lock().unwrap().header.region_tracker();
        self.free(tracker_page);
        if self.allocate_region_tracker_page().is_err() {
            #[cfg(feature = "logging")]
            warn!("Failure while flushing allocator state. Repair required at restart.");
            return;
        }

        let mut state = self.state.lock().unwrap();
        if state
            .allocators
            .flush_to(
                state.header.region_tracker(),
                state.header.layout(),
                &self.storage,
            )
            .is_err()
        {
            #[cfg(feature = "logging")]
            warn!("Failure while flushing allocator state. Repair required at restart.");
            return;
        }

        if self.storage.flush(false).is_ok() && !self.needs_recovery.load(Ordering::Acquire) {
            state.header.recovery_required = false;
            let _ = self.write_header(&state.header);
            let _ = self.storage.flush(false);
        }
    }
}

#[cfg(test)]
mod test {
    use crate::tree_store::page_store::page_manager::INITIAL_REGIONS;
    use crate::{Database, TableDefinition};

    #[test]
    fn out_of_regions() {
        let tmpfile = crate::create_tempfile();
        let table_definition: TableDefinition<u32, &[u8]> = TableDefinition::new("x");
        let page_size = 1024;
        let big_value = vec![0u8; 5 * page_size];

        let db = Database::builder()
            .set_region_size((8 * page_size).try_into().unwrap())
            .set_page_size(page_size)
            .create(tmpfile.path())
            .unwrap();

        let txn = db.begin_write().unwrap();
        {
            let mut table = txn.open_table(table_definition).unwrap();
            for i in 0..=INITIAL_REGIONS {
                table.insert(&i, big_value.as_slice()).unwrap();
            }
        }
        txn.commit().unwrap();
        drop(db);

        let mut db = Database::builder()
            .set_region_size((8 * page_size).try_into().unwrap())
            .set_page_size(page_size)
            .open(tmpfile.path())
            .unwrap();
        assert!(db.check_integrity().unwrap());
    }

    #[test]
    #[cfg(panic = "unwind")]
    fn panic() {
        let tmpfile = crate::create_tempfile();
        let table_definition: TableDefinition<u32, &[u8]> = TableDefinition::new("x");

        let _ = std::panic::catch_unwind(|| {
            let db = Database::create(&tmpfile).unwrap();
            let txn = db.begin_write().unwrap();
            txn.open_table(table_definition).unwrap();
            panic!();
        });

        let mut db = Database::open(tmpfile).unwrap();
        assert!(db.check_integrity().unwrap());
    }
}