use crate::transaction_tracker::TransactionId;
use crate::transactions::{AllocatorStateKey, AllocatorStateTree};
use crate::tree_store::btree_base::{BtreeHeader, Checksum};
use crate::tree_store::page_store::base::{MAX_PAGE_INDEX, PageHint};
use crate::tree_store::page_store::buddy_allocator::BuddyAllocator;
use crate::tree_store::page_store::cached_file::PagedCachedFile;
use crate::tree_store::page_store::header::{DB_HEADER_SIZE, DatabaseHeader, MAGICNUMBER};
use crate::tree_store::page_store::layout::DatabaseLayout;
use crate::tree_store::page_store::region::{Allocators, RegionTracker};
use crate::tree_store::page_store::{PageImpl, PageMut, hash128_with_seed};
use crate::tree_store::{Page, PageNumber, PageTrackerPolicy};
use crate::{CacheStats, StorageBackend};
use crate::{DatabaseError, Result, StorageError};
#[cfg(feature = "logging")]
use log::warn;
use std::cmp::{max, min};
#[cfg(debug_assertions)]
use std::collections::HashMap;
use std::collections::HashSet;
use std::convert::TryInto;
use std::io::ErrorKind;
#[cfg(debug_assertions)]
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;

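// Default usable space per region (4GiB) when the caller does not request a region size.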
const MAX_USABLE_REGION_SPACE: u64 = 4 * 1024 * 1024 * 1024;
pub(crate) const MAX_MAX_PAGE_ORDER: u8 = 20;
pub(super) const MIN_USABLE_PAGES: u32 = 10;
const MIN_DESIRED_USABLE_BYTES: u64 = 1024 * 1024;

pub(super) const INITIAL_REGIONS: u32 = 1000;

pub(crate) const FILE_FORMAT_VERSION1: u8 = 1;
pub(crate) const FILE_FORMAT_VERSION2: u8 = 2;
pub(crate) const FILE_FORMAT_VERSION3: u8 = 3;

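// Smallest n such that 2^n >= x, i.e. ceil(log2(x)).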
fn ceil_log2(x: usize) -> u8 {
    if x.is_power_of_two() {
        x.trailing_zeros().try_into().unwrap()
    } else {
        x.next_power_of_two().trailing_zeros().try_into().unwrap()
    }
}

pub(crate) fn xxh3_checksum(data: &[u8]) -> Checksum {
    hash128_with_seed(data, 0)
}

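// In-memory copy of the database header together with the per-region buddy allocators
// and the region tracker. Always accessed through `TransactionalMemory::state`.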
struct InMemoryState {
    header: DatabaseHeader,
    allocators: Allocators,
}

impl InMemoryState {
    fn from_bytes(header: DatabaseHeader, file: &PagedCachedFile, version: u8) -> Result<Self> {
        let allocators = if header.recovery_required || version >= FILE_FORMAT_VERSION3 {
            Allocators::new(header.layout())
        } else {
            Allocators::from_bytes(&header, file)?
        };
        Ok(Self { header, allocators })
    }

    fn get_region(&self, region: u32) -> &BuddyAllocator {
        &self.allocators.region_allocators[region as usize]
    }

    fn get_region_mut(&mut self, region: u32) -> &mut BuddyAllocator {
        &mut self.allocators.region_allocators[region as usize]
    }

    fn get_region_tracker_mut(&mut self) -> &mut RegionTracker {
        &mut self.allocators.region_tracker
    }
}

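// Owns the cached file and all page-allocation state. Pages allocated since the last
// commit are tracked so they can be rolled back, and `needs_recovery` records any
// storage failure that requires repair on the next open.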
pub(crate) struct TransactionalMemory {
    allocated_since_commit: Mutex<HashSet<PageNumber>>,
    needs_recovery: AtomicBool,
    storage: PagedCachedFile,
    state: Mutex<InMemoryState>,
    #[cfg(debug_assertions)]
    open_dirty_pages: Arc<Mutex<HashSet<PageNumber>>>,
    #[cfg(debug_assertions)]
    read_page_ref_counts: Arc<Mutex<HashMap<PageNumber, u64>>>,
    read_from_secondary: AtomicBool,
    page_size: u32,
    region_size: u64,
    region_header_with_padding_size: u64,
    file_format: u8,
}

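// `new` opens the file (or initializes it when `allow_initialize` is set and the file is
// empty), validates the magic number and header, and recalculates the layout if the file
// was not shut down cleanly.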
impl TransactionalMemory {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        file: Box<dyn StorageBackend>,
        allow_initialize: bool,
        page_size: usize,
        requested_region_size: Option<u64>,
        read_cache_size_bytes: usize,
        write_cache_size_bytes: usize,
        default_to_file_format_v3: bool,
    ) -> Result<Self, DatabaseError> {
        assert!(page_size.is_power_of_two() && page_size >= DB_HEADER_SIZE);

        let region_size = requested_region_size.unwrap_or(MAX_USABLE_REGION_SPACE);
        let region_size = min(
            region_size,
            (u64::from(MAX_PAGE_INDEX) + 1) * page_size as u64,
        );
        assert!(region_size.is_power_of_two());

        let storage = PagedCachedFile::new(
            file,
            page_size as u64,
            read_cache_size_bytes,
            write_cache_size_bytes,
        )?;

        let initial_storage_len = storage.raw_file_len()?;

        let magic_number: [u8; MAGICNUMBER.len()] =
            if initial_storage_len >= MAGICNUMBER.len() as u64 {
                storage
                    .read_direct(0, MAGICNUMBER.len())?
                    .try_into()
                    .unwrap()
            } else {
                [0; MAGICNUMBER.len()]
            };

        if initial_storage_len > 0 {
            if magic_number != MAGICNUMBER {
                return Err(StorageError::Io(ErrorKind::InvalidData.into()).into());
            }
        } else {
            if !allow_initialize {
                return Err(StorageError::Io(ErrorKind::InvalidData.into()).into());
            }
        }

        if magic_number != MAGICNUMBER {
            let region_tracker_required_bytes =
                RegionTracker::new(INITIAL_REGIONS, MAX_MAX_PAGE_ORDER + 1)
                    .to_vec()
                    .len();

            let size: u64 = max(
                MIN_DESIRED_USABLE_BYTES,
                page_size as u64 * u64::from(MIN_USABLE_PAGES),
            );
            let tracker_space =
                (page_size * region_tracker_required_bytes.div_ceil(page_size)) as u64;
            let starting_size = size + tracker_space;

            let layout = DatabaseLayout::calculate(
                starting_size,
                (region_size / u64::try_from(page_size).unwrap())
                    .try_into()
                    .unwrap(),
                page_size.try_into().unwrap(),
            );

            {
                let file_len = storage.raw_file_len()?;

                if file_len < layout.len() {
                    storage.resize(layout.len())?;
                }
            }

            let mut allocators = Allocators::new(layout);

            let tracker_page = {
                let tracker_required_pages =
                    allocators.region_tracker.to_vec().len().div_ceil(page_size);
                let required_order = ceil_log2(tracker_required_pages);
                let page_number = allocators.region_allocators[0]
                    .alloc(required_order)
                    .unwrap();
                PageNumber::new(0, page_number, required_order)
            };

            let file_format = if default_to_file_format_v3 {
                FILE_FORMAT_VERSION3
            } else {
                FILE_FORMAT_VERSION2
            };
            let mut header =
                DatabaseHeader::new(layout, TransactionId::new(0), file_format, tracker_page);

            header.recovery_required = false;
            header.two_phase_commit = true;
            storage
                .write(0, DB_HEADER_SIZE, true)?
                .mem_mut()
                .copy_from_slice(&header.to_bytes(false));
            allocators.flush_to(tracker_page, layout, &storage)?;

            storage.flush(false)?;
            storage
                .write(0, DB_HEADER_SIZE, true)?
                .mem_mut()
                .copy_from_slice(&header.to_bytes(true));
            storage.flush(false)?;
        }
        let header_bytes = storage.read_direct(0, DB_HEADER_SIZE)?;
        let (mut header, repair_info) = DatabaseHeader::from_bytes(&header_bytes)?;

        assert_eq!(header.page_size() as usize, page_size);
        assert!(storage.raw_file_len()? >= header.layout().len());
        let needs_recovery =
            header.recovery_required || header.layout().len() != storage.raw_file_len()?;
        if needs_recovery {
            let layout = header.layout();
            let region_max_pages = layout.full_region_layout().num_pages();
            let region_header_pages = layout.full_region_layout().get_header_pages();
            header.set_layout(DatabaseLayout::recalculate(
                storage.raw_file_len()?,
                region_header_pages,
                region_max_pages,
                page_size.try_into().unwrap(),
            ));
            header.pick_primary_for_repair(repair_info)?;
            assert!(!repair_info.invalid_magic_number);
            storage
                .write(0, DB_HEADER_SIZE, true)?
                .mem_mut()
                .copy_from_slice(&header.to_bytes(true));
            storage.flush(false)?;
        }

        let layout = header.layout();
        assert_eq!(layout.len(), storage.raw_file_len()?);
        let region_size = layout.full_region_layout().len();
        let region_header_size = layout.full_region_layout().data_section().start;
        let version = header.primary_slot().version;
        let state = InMemoryState::from_bytes(header, &storage, version)?;

        assert!(page_size >= DB_HEADER_SIZE);

        Ok(Self {
            allocated_since_commit: Mutex::new(HashSet::new()),
            needs_recovery: AtomicBool::new(needs_recovery),
            storage,
            state: Mutex::new(state),
            #[cfg(debug_assertions)]
            open_dirty_pages: Arc::new(Mutex::new(HashSet::new())),
            #[cfg(debug_assertions)]
            read_page_ref_counts: Arc::new(Mutex::new(HashMap::new())),
            read_from_secondary: AtomicBool::new(false),
            page_size: page_size.try_into().unwrap(),
            region_size,
            region_header_with_padding_size: region_header_size,
            file_format: version,
        })
    }

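    // Upgrades the file format to v3: commits twice so that both header slots record the
    // new version, then frees the region tracker page, which v3 no longer persists.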
    pub(crate) fn upgrade_to_v3(&mut self) -> Result {
        let data_root = self.get_data_root();
        let system_root = self.get_system_root();
        let transaction_id = self.get_last_committed_transaction_id()?;
        assert!(self.get_freed_root().is_none());

        let tracker_page = self.tracker_page();
        self.file_format = FILE_FORMAT_VERSION3;
        for _ in 0..2 {
            match self.commit(data_root, system_root, None, transaction_id, false, true) {
                Ok(()) => {}
                Err(err) => {
                    self.storage.set_irrecoverable_io_error();
                    return Err(err);
                }
            }
        }
        self.free(tracker_page, &mut PageTrackerPolicy::Ignore);

        Ok(())
    }

    pub(crate) fn file_format_v3(&self) -> bool {
        self.file_format == FILE_FORMAT_VERSION3
    }

    pub(crate) fn cache_stats(&self) -> CacheStats {
        self.storage.cache_stats()
    }

    pub(crate) fn check_io_errors(&self) -> Result {
        self.storage.check_io_errors()
    }

    #[cfg(any(test, fuzzing))]
    pub(crate) fn all_allocated_pages(&self) -> Vec<PageNumber> {
        self.state.lock().unwrap().allocators.all_allocated()
    }

    pub(crate) fn tracker_page(&self) -> PageNumber {
        self.state.lock().unwrap().header.region_tracker()
    }

    pub(crate) fn clear_read_cache(&self) {
        self.storage.invalidate_cache_all();
    }

    pub(crate) fn clear_cache_and_reload(&mut self) -> Result<bool, DatabaseError> {
        assert!(self.allocated_since_commit.lock().unwrap().is_empty());

        self.storage.flush(false)?;
        self.storage.invalidate_cache_all();

        let header_bytes = self.storage.read_direct(0, DB_HEADER_SIZE)?;
        let (mut header, repair_info) = DatabaseHeader::from_bytes(&header_bytes)?;
        let mut was_clean = true;
        if header.recovery_required {
            if !header.pick_primary_for_repair(repair_info)? {
                was_clean = false;
            }
            if repair_info.invalid_magic_number {
                return Err(StorageError::Corrupted("Invalid magic number".to_string()).into());
            }
            self.storage
                .write(0, DB_HEADER_SIZE, true)?
                .mem_mut()
                .copy_from_slice(&header.to_bytes(true));
            self.storage.flush(false)?;
        }

        self.needs_recovery
            .store(header.recovery_required, Ordering::Release);
        self.state.lock().unwrap().header = header;

        Ok(was_clean)
    }

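    // Marks the header as `recovery_required` and flushes it before any transactional
    // writes land, so an interrupted transaction is detected on the next open.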
    pub(crate) fn begin_writable(&self) -> Result {
        let mut state = self.state.lock().unwrap();
        assert!(!state.header.recovery_required);
        state.header.recovery_required = true;
        self.write_header(&state.header)?;
        self.storage.flush(false)
    }

    pub(crate) fn needs_repair(&self) -> Result<bool> {
        Ok(self.state.lock().unwrap().header.recovery_required)
    }

    pub(crate) fn used_two_phase_commit(&self) -> bool {
        self.state.lock().unwrap().header.two_phase_commit
    }

    pub(crate) fn allocator_hash(&self) -> u128 {
        self.state.lock().unwrap().allocators.xxh3_hash()
    }

    pub(crate) fn storage_failure(&self) -> bool {
        self.needs_recovery.load(Ordering::Acquire)
    }

    pub(crate) fn repair_primary_corrupted(&self) {
        let mut state = self.state.lock().unwrap();
        state.header.swap_primary_slot();
    }

    pub(crate) fn begin_repair(&self) -> Result<()> {
        let mut state = self.state.lock().unwrap();
        state.allocators = Allocators::new(state.header.layout());

        Ok(())
    }

    pub(crate) fn mark_page_allocated(&self, page_number: PageNumber) {
        let mut state = self.state.lock().unwrap();
        let region_index = page_number.region;
        let allocator = state.get_region_mut(region_index);
        allocator.record_alloc(page_number.page_index, page_number.page_order);
    }

    fn write_header(&self, header: &DatabaseHeader) -> Result {
        self.storage
            .write(0, DB_HEADER_SIZE, true)?
            .mem_mut()
            .copy_from_slice(&header.to_bytes(true));

        Ok(())
    }

    pub(crate) fn end_repair(&self) -> Result<()> {
        if !self.file_format_v3() {
            self.allocate_region_tracker_page()?;
        }

        let mut state = self.state.lock().unwrap();
        if !self.file_format_v3() {
            let tracker_page = state.header.region_tracker();
            state
                .allocators
                .flush_to(tracker_page, state.header.layout(), &self.storage)?;
        }

        state.header.recovery_required = false;
        self.write_header(&state.header)?;
        let result = self.storage.flush(false);
        self.needs_recovery.store(false, Ordering::Release);

        result
    }

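    // Reserves zeroed placeholders in the allocator state tree: one entry per region,
    // plus the region tracker and the transaction id. Returns the number of regions
    // reserved so callers can detect growth before the state is actually saved.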
    pub(crate) fn reserve_allocator_state(
        &self,
        tree: &mut AllocatorStateTree,
        transaction_id: TransactionId,
    ) -> Result<u32> {
        let state = self.state.lock().unwrap();
        let layout = state.header.layout();
        let num_regions = layout.num_regions();
        let region_tracker_len = state.allocators.region_tracker.to_vec().len();
        let region_lens: Vec<usize> = state
            .allocators
            .region_allocators
            .iter()
            .map(|x| x.to_vec().len())
            .collect();
        drop(state);

        for i in 0..num_regions {
            let region_bytes_len = region_lens[i as usize];
            tree.insert(
                &AllocatorStateKey::Region(i),
                &vec![0; region_bytes_len].as_ref(),
            )?;
        }

        tree.insert(
            &AllocatorStateKey::RegionTracker,
            &vec![0; region_tracker_len].as_ref(),
        )?;

        tree.insert(
            &AllocatorStateKey::TransactionId,
            &transaction_id.raw_id().to_le_bytes().as_ref(),
        )?;

        Ok(num_regions)
    }

    pub(crate) fn try_save_allocator_state(
        &self,
        tree: &mut AllocatorStateTree,
        num_regions: u32,
    ) -> Result<bool> {
        let state = self.state.lock().unwrap();
        if num_regions != state.header.layout().num_regions() {
            return Ok(false);
        }

        let tracker_page = if !self.file_format_v3() {
            let tracker_page = state.header.region_tracker();
            drop(state);
            self.free(tracker_page, &mut PageTrackerPolicy::Ignore);
            Some(tracker_page)
        } else {
            drop(state);
            None
        };

        let result = self.try_save_allocator_state_inner(tree, num_regions);

        if let Some(tracker_page) = tracker_page {
            self.mark_page_allocated(tracker_page);
        }

        result
    }

    fn try_save_allocator_state_inner(
        &self,
        tree: &mut AllocatorStateTree,
        num_regions: u32,
    ) -> Result<bool> {
        for i in 0..num_regions {
            let region_bytes =
                &self.state.lock().unwrap().allocators.region_allocators[i as usize].to_vec();
            tree.insert_inplace(&AllocatorStateKey::Region(i), &region_bytes.as_ref())?;
        }

        let region_tracker_bytes = self
            .state
            .lock()
            .unwrap()
            .allocators
            .region_tracker
            .to_vec();
        tree.insert_inplace(
            &AllocatorStateKey::RegionTracker,
            &region_tracker_bytes.as_ref(),
        )?;

        Ok(true)
    }

    pub(crate) fn is_valid_allocator_state(&self, tree: &AllocatorStateTree) -> Result<bool> {
        let transaction_id = TransactionId::new(u64::from_le_bytes(
            tree.get(&AllocatorStateKey::TransactionId)?
                .unwrap()
                .value()
                .try_into()
                .unwrap(),
        ));

        Ok(transaction_id == self.get_last_committed_transaction_id()?)
    }

    pub(crate) fn load_allocator_state(&self, tree: &AllocatorStateTree) -> Result {
        assert!(self.is_valid_allocator_state(tree)?);

        let mut region_allocators = vec![];
        for region in
            tree.range(&(AllocatorStateKey::Region(0)..=AllocatorStateKey::Region(u32::MAX)))?
        {
            region_allocators.push(BuddyAllocator::from_bytes(region?.value()));
        }

        let region_tracker = RegionTracker::from_page(
            tree.get(&AllocatorStateKey::RegionTracker)?
                .unwrap()
                .value(),
        );

        let mut state = self.state.lock().unwrap();
        state.allocators = Allocators {
            region_tracker,
            region_allocators,
        };

        let layout = state.header.layout();
        state.allocators.resize_to(layout);
        drop(state);

        if !self.file_format_v3() {
            self.allocate_region_tracker_page()?;
        }

        self.state.lock().unwrap().header.recovery_required = false;
        self.needs_recovery.store(false, Ordering::Release);

        Ok(())
    }

    pub(crate) fn is_allocated(&self, page: PageNumber) -> bool {
        let state = self.state.lock().unwrap();
        let allocator = state.get_region(page.region);

        allocator.is_allocated(page.page_index, page.page_order)
    }

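    // Ensures the region tracker page recorded in the header is usable: re-claims it if it
    // is free and large enough, otherwise allocates a replacement page and rewrites the header.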
    fn allocate_region_tracker_page(&self) -> Result {
        let mut state = self.state.lock().unwrap();
        let tracker_len = state.allocators.region_tracker.to_vec().len();
        let tracker_page = state.header.region_tracker();

        let allocator = state.get_region_mut(tracker_page.region);
        if allocator.is_allocated(tracker_page.page_index, tracker_page.page_order)
            || tracker_page.page_size_bytes(self.page_size) < tracker_len as u64
        {
            drop(state);

            let new_tracker_page = self
                .allocate_non_transactional(tracker_len, false)?
                .get_page_number();

            let mut state = self.state.lock().unwrap();
            state.header.set_region_tracker(new_tracker_page);
            self.write_header(&state.header)?;
            self.storage.flush(false)?;
        } else {
            allocator.record_alloc(tracker_page.page_index, tracker_page.page_order);
            drop(state);
        }

        Ok(())
    }

    pub(crate) fn relocate_region_tracker(&self) -> Result<bool> {
        let state = self.state.lock().unwrap();
        let region_tracker_size = state
            .header
            .region_tracker()
            .page_size_bytes(self.page_size);
        let old_tracker_page = state.header.region_tracker();
        drop(state);
        let new_page =
            self.allocate_non_transactional(region_tracker_size.try_into().unwrap(), true)?;
        if new_page.get_page_number().is_before(old_tracker_page) {
            let mut state = self.state.lock().unwrap();
            state.header.set_region_tracker(new_page.get_page_number());
            drop(state);
            self.free(old_tracker_page, &mut PageTrackerPolicy::Ignore);
            Ok(true)
        } else {
            let new_page_number = new_page.get_page_number();
            drop(new_page);
            self.free(new_page_number, &mut PageTrackerPolicy::Ignore);
            Ok(false)
        }
    }

    pub(crate) fn pages_allocated_since_raw_state(
        &self,
        old_states: &[BuddyAllocator],
    ) -> Vec<PageNumber> {
        let mut result = vec![];
        let state = self.state.lock().unwrap();

        for i in 0..state.header.layout().num_regions() {
            let current_state = state.get_region(i);
            if let Some(old_state) = old_states.get(i as usize) {
                current_state.difference(i, old_state, &mut result);
            } else {
                current_state.get_allocated_pages(i, &mut result);
            }
        }

        result.retain(|x| *x != state.header.region_tracker());

        result
    }

    pub(crate) fn get_raw_allocator_states(&self) -> Vec<Vec<u8>> {
        let state = self.state.lock().unwrap();

        let mut regional_allocators = vec![];
        for i in 0..state.header.layout().num_regions() {
            regional_allocators.push(state.get_region(i).make_state_for_savepoint());
        }

        regional_allocators
    }

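    // Durable commit: the secondary slot is updated and written first; with `two_phase` an
    // extra flush is performed before the primary slot pointer is swapped, so the primary
    // never references data that has not been persisted.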
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn commit(
        &self,
        data_root: Option<BtreeHeader>,
        system_root: Option<BtreeHeader>,
        freed_root: Option<BtreeHeader>,
        transaction_id: TransactionId,
        eventual: bool,
        two_phase: bool,
    ) -> Result {
        let result = self.commit_inner(
            data_root,
            system_root,
            freed_root,
            transaction_id,
            eventual,
            two_phase,
        );
        if result.is_err() {
            self.needs_recovery.store(true, Ordering::Release);
        }
        result
    }

    #[allow(clippy::too_many_arguments)]
    fn commit_inner(
        &self,
        data_root: Option<BtreeHeader>,
        system_root: Option<BtreeHeader>,
        freed_root: Option<BtreeHeader>,
        transaction_id: TransactionId,
        eventual: bool,
        two_phase: bool,
    ) -> Result {
        #[cfg(debug_assertions)]
        debug_assert!(self.open_dirty_pages.lock().unwrap().is_empty());
        assert!(!self.needs_recovery.load(Ordering::Acquire));

        let mut state = self.state.lock().unwrap();
        let shrunk = Self::try_shrink(&mut state)?;
        let mut header = state.header.clone();
        drop(state);

        let old_transaction_id = header.secondary_slot().transaction_id;
        let secondary = header.secondary_slot_mut();
        secondary.transaction_id = transaction_id;
        secondary.user_root = data_root;
        secondary.system_root = system_root;
        secondary.freed_root = freed_root;
        secondary.version = self.file_format;

        self.write_header(&header)?;

        if two_phase {
            self.storage.flush(eventual)?;
        }

        header.swap_primary_slot();
        header.two_phase_commit = two_phase;

        self.write_header(&header)?;
        self.storage.flush(eventual)?;

        if shrunk {
            let result = self.storage.resize(header.layout().len());
            if result.is_err() {
                self.needs_recovery.store(true, Ordering::Release);
                return result;
            }
        }
        self.allocated_since_commit.lock().unwrap().clear();

        let mut state = self.state.lock().unwrap();
        assert_eq!(
            state.header.secondary_slot().transaction_id,
            old_transaction_id
        );
        state.header = header;
        self.read_from_secondary.store(false, Ordering::Release);
        drop(state);

        Ok(())
    }

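    // Non-durable commit: updates the secondary slot in memory after a write barrier and
    // switches reads to the secondary slot, without flushing the header to disk.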
    pub(crate) fn non_durable_commit(
        &self,
        data_root: Option<BtreeHeader>,
        system_root: Option<BtreeHeader>,
        freed_root: Option<BtreeHeader>,
        transaction_id: TransactionId,
    ) -> Result {
        #[cfg(debug_assertions)]
        debug_assert!(self.open_dirty_pages.lock().unwrap().is_empty());
        assert!(!self.needs_recovery.load(Ordering::Acquire));

        self.allocated_since_commit.lock().unwrap().clear();
        self.storage.write_barrier()?;

        let mut state = self.state.lock().unwrap();
        let secondary = state.header.secondary_slot_mut();
        secondary.transaction_id = transaction_id;
        secondary.user_root = data_root;
        secondary.system_root = system_root;
        secondary.freed_root = freed_root;

        self.read_from_secondary.store(true, Ordering::Release);

        Ok(())
    }

    pub(crate) fn rollback_uncommitted_writes(&self) -> Result {
        let result = self.rollback_uncommitted_writes_inner();
        if result.is_err() {
            self.needs_recovery.store(true, Ordering::Release);
        }
        result
    }

    fn rollback_uncommitted_writes_inner(&self) -> Result {
        #[cfg(debug_assertions)]
        {
            let dirty_pages = self.open_dirty_pages.lock().unwrap();
            debug_assert!(
                dirty_pages.is_empty(),
                "Dirty pages outstanding: {dirty_pages:?}"
            );
        }
        assert!(!self.needs_recovery.load(Ordering::Acquire));
        let mut state = self.state.lock().unwrap();
        let mut guard = self.allocated_since_commit.lock().unwrap();
        for page_number in guard.iter() {
            let region_index = page_number.region;
            state
                .get_region_tracker_mut()
                .mark_free(page_number.page_order, region_index);
            state
                .get_region_mut(region_index)
                .free(page_number.page_index, page_number.page_order);

            let address = page_number.address_range(
                self.page_size.into(),
                self.region_size,
                self.region_header_with_padding_size,
                self.page_size,
            );
            let len: usize = (address.end - address.start).try_into().unwrap();
            self.storage.invalidate_cache(address.start, len);
            self.storage.cancel_pending_write(address.start, len);
        }
        guard.clear();

        Ok(())
    }

    pub(crate) fn get_page(&self, page_number: PageNumber) -> Result<PageImpl> {
        self.get_page_extended(page_number, PageHint::None)
    }

    pub(crate) fn get_page_extended(
        &self,
        page_number: PageNumber,
        hint: PageHint,
    ) -> Result<PageImpl> {
        let range = page_number.address_range(
            self.page_size.into(),
            self.region_size,
            self.region_header_with_padding_size,
            self.page_size,
        );
        let len: usize = (range.end - range.start).try_into().unwrap();
        let mem = self.storage.read(range.start, len, hint)?;

        #[cfg(debug_assertions)]
        {
            let dirty_pages = self.open_dirty_pages.lock().unwrap();
            debug_assert!(!dirty_pages.contains(&page_number), "{page_number:?}");
            *(self
                .read_page_ref_counts
                .lock()
                .unwrap()
                .entry(page_number)
                .or_default()) += 1;
            drop(dirty_pages);
        }

        Ok(PageImpl {
            mem,
            page_number,
            #[cfg(debug_assertions)]
            open_pages: self.read_page_ref_counts.clone(),
        })
    }

    pub(crate) fn get_page_mut(&self, page_number: PageNumber) -> Result<PageMut> {
        #[cfg(debug_assertions)]
        {
            assert!(
                !self
                    .read_page_ref_counts
                    .lock()
                    .unwrap()
                    .contains_key(&page_number)
            );
            assert!(!self.open_dirty_pages.lock().unwrap().contains(&page_number));
        }

        let address_range = page_number.address_range(
            self.page_size.into(),
            self.region_size,
            self.region_header_with_padding_size,
            self.page_size,
        );
        let len: usize = (address_range.end - address_range.start)
            .try_into()
            .unwrap();
        let mem = self.storage.write(address_range.start, len, false)?;

        #[cfg(debug_assertions)]
        {
            assert!(self.open_dirty_pages.lock().unwrap().insert(page_number));
        }

        Ok(PageMut {
            mem,
            page_number,
            #[cfg(debug_assertions)]
            open_pages: self.open_dirty_pages.clone(),
        })
    }

    pub(crate) fn get_version(&self) -> u8 {
        let state = self.state.lock().unwrap();
        if self.read_from_secondary.load(Ordering::Acquire) {
            state.header.secondary_slot().version
        } else {
            state.header.primary_slot().version
        }
    }

    pub(crate) fn get_data_root(&self) -> Option<BtreeHeader> {
        let state = self.state.lock().unwrap();
        if self.read_from_secondary.load(Ordering::Acquire) {
            state.header.secondary_slot().user_root
        } else {
            state.header.primary_slot().user_root
        }
    }

    pub(crate) fn get_system_root(&self) -> Option<BtreeHeader> {
        let state = self.state.lock().unwrap();
        if self.read_from_secondary.load(Ordering::Acquire) {
            state.header.secondary_slot().system_root
        } else {
            state.header.primary_slot().system_root
        }
    }

    pub(crate) fn get_freed_root(&self) -> Option<BtreeHeader> {
        let state = self.state.lock().unwrap();
        if self.read_from_secondary.load(Ordering::Acquire) {
            state.header.secondary_slot().freed_root
        } else {
            state.header.primary_slot().freed_root
        }
    }

    pub(crate) fn get_last_committed_transaction_id(&self) -> Result<TransactionId> {
        let state = self.state.lock()?;
        if self.read_from_secondary.load(Ordering::Acquire) {
            Ok(state.header.secondary_slot().transaction_id)
        } else {
            Ok(state.header.primary_slot().transaction_id)
        }
    }

    pub(crate) fn free(&self, page: PageNumber, allocated: &mut PageTrackerPolicy) {
        self.allocated_since_commit.lock().unwrap().remove(&page);
        self.free_helper(page, allocated);
    }

    fn free_helper(&self, page: PageNumber, allocated: &mut PageTrackerPolicy) {
        allocated.remove(page);
        let mut state = self.state.lock().unwrap();
        let region_index = page.region;
        state
            .get_region_mut(region_index)
            .free(page.page_index, page.page_order);
        state
            .get_region_tracker_mut()
            .mark_free(page.page_order, region_index);

        let address_range = page.address_range(
            self.page_size.into(),
            self.region_size,
            self.region_header_with_padding_size,
            self.page_size,
        );
        let len: usize = (address_range.end - address_range.start)
            .try_into()
            .unwrap();
        self.storage.invalidate_cache(address_range.start, len);
        self.storage.cancel_pending_write(address_range.start, len);
    }

    pub(crate) fn free_if_uncommitted(
        &self,
        page: PageNumber,
        allocated: &mut PageTrackerPolicy,
    ) -> bool {
        if self.allocated_since_commit.lock().unwrap().remove(&page) {
            self.free_helper(page, allocated);
            true
        } else {
            false
        }
    }

    pub(crate) fn uncommitted(&self, page: PageNumber) -> bool {
        self.allocated_since_commit.lock().unwrap().contains(&page)
    }

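    // Core allocation path: rounds the request up to a power-of-two page order, grows the
    // file if no region can satisfy it, and returns the page as writable (dirty) memory.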
    pub(crate) fn allocate_helper(
        &self,
        allocation_size: usize,
        lowest: bool,
        transactional: bool,
    ) -> Result<PageMut> {
        let required_pages = allocation_size.div_ceil(self.get_page_size());
        let required_order = ceil_log2(required_pages);

        let mut state = self.state.lock().unwrap();

        let page_number = if let Some(page_number) =
            Self::allocate_helper_retry(&mut state, required_order, lowest)?
        {
            page_number
        } else {
            self.grow(&mut state, required_order)?;
            Self::allocate_helper_retry(&mut state, required_order, lowest)?.unwrap()
        };

        #[cfg(debug_assertions)]
        {
            assert!(
                !self
                    .read_page_ref_counts
                    .lock()
                    .unwrap()
                    .contains_key(&page_number),
                "Allocated a page that is still referenced! {page_number:?}"
            );
            assert!(!self.open_dirty_pages.lock().unwrap().contains(&page_number));
        }

        if transactional {
            self.allocated_since_commit
                .lock()
                .unwrap()
                .insert(page_number);
        }

        let address_range = page_number.address_range(
            self.page_size.into(),
            self.region_size,
            self.region_header_with_padding_size,
            self.page_size,
        );
        let len: usize = (address_range.end - address_range.start)
            .try_into()
            .unwrap();

        #[allow(unused_mut)]
        let mut mem = self.storage.write(address_range.start, len, true)?;
        debug_assert!(mem.mem().len() >= allocation_size);

        #[cfg(debug_assertions)]
        {
            assert!(self.open_dirty_pages.lock().unwrap().insert(page_number));

            mem.mem_mut().fill(0xFF);
        }

        Ok(PageMut {
            mem,
            page_number,
            #[cfg(debug_assertions)]
            open_pages: self.open_dirty_pages.clone(),
        })
    }

    fn allocate_helper_retry(
        state: &mut InMemoryState,
        required_order: u8,
        lowest: bool,
    ) -> Result<Option<PageNumber>> {
        loop {
            let Some(candidate_region) = state.get_region_tracker_mut().find_free(required_order)
            else {
                return Ok(None);
            };
            let region = state.get_region_mut(candidate_region);
            let r = if lowest {
                region.alloc_lowest(required_order)
            } else {
                region.alloc(required_order)
            };
            if let Some(page) = r {
                return Ok(Some(PageNumber::new(
                    candidate_region,
                    page,
                    required_order,
                )));
            }
            state
                .get_region_tracker_mut()
                .mark_full(required_order, candidate_region);
        }
    }

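    // Shrinks the last region when at least half of it is trailing free space; the whole
    // region is dropped only if it is completely free and not the only region.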
    fn try_shrink(state: &mut InMemoryState) -> Result<bool> {
        let layout = state.header.layout();
        let last_region_index = layout.num_regions() - 1;
        let last_allocator = state.get_region(last_region_index);
        let trailing_free = last_allocator.trailing_free_pages();
        let last_allocator_len = last_allocator.len();
        if trailing_free < last_allocator_len / 2 {
            return Ok(false);
        }
        let reduce_by = if layout.num_regions() > 1 && trailing_free == last_allocator_len {
            trailing_free
        } else {
            trailing_free / 2
        };

        let mut new_layout = layout;
        new_layout.reduce_last_region(reduce_by);
        state.allocators.resize_to(new_layout);
        assert!(new_layout.len() <= layout.len());
        state.header.set_layout(new_layout);

        Ok(true)
    }

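    // Grows the file to satisfy an allocation of the given order, adding roughly a region
    // (or doubling usable space while the first region is still filling), then resizes the
    // layout and allocators to match.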
    fn grow(&self, state: &mut InMemoryState, required_order_allocation: u8) -> Result<()> {
        let layout = state.header.layout();
        let required_growth =
            2u64.pow(required_order_allocation.into()) * u64::from(state.header.page_size());
        let max_region_size = u64::from(state.header.layout().full_region_layout().num_pages())
            * u64::from(state.header.page_size());
        let next_desired_size = if layout.num_full_regions() > 0 {
            if let Some(trailing) = layout.trailing_region_layout() {
                if 2 * required_growth < max_region_size - trailing.usable_bytes() {
                    layout.usable_bytes() + (max_region_size - trailing.usable_bytes())
                } else {
                    layout.usable_bytes() + 2 * max_region_size - trailing.usable_bytes()
                }
            } else {
                layout.usable_bytes() + max_region_size
            }
        } else {
            max(
                layout.usable_bytes() * 2,
                layout.usable_bytes() + required_growth * 2,
            )
        };
        let new_layout = DatabaseLayout::calculate(
            next_desired_size,
            state.header.layout().full_region_layout().num_pages(),
            self.page_size,
        );
        assert!(new_layout.len() >= layout.len());

        let result = self.storage.resize(new_layout.len());
        if result.is_err() {
            self.needs_recovery.store(true, Ordering::Release);
            return result;
        }

        state.allocators.resize_to(new_layout);
        state.header.set_layout(new_layout);
        Ok(())
    }

    pub(crate) fn allocate(
        &self,
        allocation_size: usize,
        allocated: &mut PageTrackerPolicy,
    ) -> Result<PageMut> {
        let result = self.allocate_helper(allocation_size, false, true);
        if let Ok(ref page) = result {
            allocated.insert(page.get_page_number());
        }
        result
    }

    pub(crate) fn allocate_lowest(&self, allocation_size: usize) -> Result<PageMut> {
        self.allocate_helper(allocation_size, true, true)
    }

    fn allocate_non_transactional(&self, allocation_size: usize, lowest: bool) -> Result<PageMut> {
        self.allocate_helper(allocation_size, lowest, false)
    }

    pub(crate) fn count_allocated_pages(&self) -> Result<u64> {
        let state = self.state.lock().unwrap();
        let mut count = 0u64;
        for i in 0..state.header.layout().num_regions() {
            count += u64::from(state.get_region(i).count_allocated_pages());
        }

        Ok(count)
    }

    pub(crate) fn count_free_pages(&self) -> Result<u64> {
        let state = self.state.lock().unwrap();
        let mut count = 0u64;
        for i in 0..state.header.layout().num_regions() {
            count += u64::from(state.get_region(i).count_free_pages());
        }

        Ok(count)
    }

    pub(crate) fn get_page_size(&self) -> usize {
        self.page_size.try_into().unwrap()
    }
}

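// On a clean drop (no panic and no unrecovered I/O error) the allocator state is flushed
// for pre-v3 files and `recovery_required` is cleared, so the next open can skip repair.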
impl Drop for TransactionalMemory {
    fn drop(&mut self) {
        if thread::panicking() || self.needs_recovery.load(Ordering::Acquire) {
            return;
        }

        if !self.file_format_v3() {
            let tracker_page = self.state.lock().unwrap().header.region_tracker();
            self.free(tracker_page, &mut PageTrackerPolicy::Ignore);
            if self.allocate_region_tracker_page().is_err() {
                #[cfg(feature = "logging")]
                warn!("Failure while flushing allocator state. Repair required at restart.");
                return;
            }
        }

        let mut state = self.state.lock().unwrap();
        if !self.file_format_v3()
            && state
                .allocators
                .flush_to(
                    state.header.region_tracker(),
                    state.header.layout(),
                    &self.storage,
                )
                .is_err()
        {
            #[cfg(feature = "logging")]
            warn!("Failure while flushing allocator state. Repair required at restart.");
            return;
        }

        if self.storage.flush(false).is_ok() && !self.needs_recovery.load(Ordering::Acquire) {
            state.header.recovery_required = false;
            let _ = self.write_header(&state.header);
            let _ = self.storage.flush(false);
        }
    }
}

#[cfg(test)]
mod test {
    use crate::tree_store::page_store::page_manager::INITIAL_REGIONS;
    use crate::{Database, TableDefinition};

    #[test]
    fn out_of_regions() {
        let tmpfile = crate::create_tempfile();
        let table_definition: TableDefinition<u32, &[u8]> = TableDefinition::new("x");
        let page_size = 1024;
        let big_value = vec![0u8; 5 * page_size];

        let db = Database::builder()
            .set_region_size((8 * page_size).try_into().unwrap())
            .set_page_size(page_size)
            .create(tmpfile.path())
            .unwrap();

        let txn = db.begin_write().unwrap();
        {
            let mut table = txn.open_table(table_definition).unwrap();
            for i in 0..=INITIAL_REGIONS {
                table.insert(&i, big_value.as_slice()).unwrap();
            }
        }
        txn.commit().unwrap();
        drop(db);

        let mut db = Database::builder()
            .set_region_size((8 * page_size).try_into().unwrap())
            .set_page_size(page_size)
            .open(tmpfile.path())
            .unwrap();
        assert!(db.check_integrity().unwrap());
    }

    #[test]
    #[cfg(panic = "unwind")]
    fn panic() {
        let tmpfile = crate::create_tempfile();
        let table_definition: TableDefinition<u32, &[u8]> = TableDefinition::new("x");

        let _ = std::panic::catch_unwind(|| {
            let db = Database::create(&tmpfile).unwrap();
            let txn = db.begin_write().unwrap();
            txn.open_table(table_definition).unwrap();
            panic!();
        });

        let mut db = Database::open(tmpfile).unwrap();
        assert!(db.check_integrity().unwrap());
    }
}