redb/tree_store/page_store/
page_manager.rs

1use crate::transaction_tracker::TransactionId;
2use crate::transactions::{AllocatorStateKey, AllocatorStateTree};
3use crate::tree_store::btree_base::{BtreeHeader, Checksum};
4use crate::tree_store::page_store::base::{MAX_PAGE_INDEX, PageHint};
5use crate::tree_store::page_store::buddy_allocator::BuddyAllocator;
6use crate::tree_store::page_store::cached_file::PagedCachedFile;
7use crate::tree_store::page_store::header::{DB_HEADER_SIZE, DatabaseHeader, MAGICNUMBER};
8use crate::tree_store::page_store::layout::DatabaseLayout;
9use crate::tree_store::page_store::region::{Allocators, RegionTracker};
10use crate::tree_store::page_store::{PageImpl, PageMut, hash128_with_seed};
11use crate::tree_store::{Page, PageNumber, PageTrackerPolicy};
12use crate::{CacheStats, StorageBackend};
13use crate::{DatabaseError, Result, StorageError};
14#[cfg(feature = "logging")]
15use log::warn;
16use std::cmp::{max, min};
17#[cfg(debug_assertions)]
18use std::collections::HashMap;
19use std::collections::HashSet;
20use std::convert::TryInto;
21use std::io::ErrorKind;
22#[cfg(debug_assertions)]
23use std::sync::Arc;
24use std::sync::Mutex;
25use std::sync::atomic::{AtomicBool, Ordering};
26use std::thread;
27
// Regions have a maximum size of 4GiB. A `4GiB - overhead` value is the largest that can be represented,
// because the leaf node format uses 32bit offsets
const MAX_USABLE_REGION_SPACE: u64 = 4 * 1024 * 1024 * 1024;
// Largest page allocation order supported; used to size the RegionTracker
// TODO: remove this constant?
pub(crate) const MAX_MAX_PAGE_ORDER: u8 = 20;
// Lower bound on the number of usable pages in a freshly created database
pub(super) const MIN_USABLE_PAGES: u32 = 10;
// Lower bound on the usable size of a freshly created database, in bytes
const MIN_DESIRED_USABLE_BYTES: u64 = 1024 * 1024;

pub(super) const INITIAL_REGIONS: u32 = 1000; // Enough for a 4TiB database

// Original file format. No lengths stored with btrees
pub(crate) const FILE_FORMAT_VERSION1: u8 = 1;
// New file format. All btrees have a separate length stored in their header for constant time access
pub(crate) const FILE_FORMAT_VERSION2: u8 = 2;
// New file format:
// * Allocator state is stored in a system table, instead of in the region headers
// * Freed tree split into two system tables: one for the data tables, and one for the system tables
//   It is no longer stored in a separate tree
// * New "allocated pages table" which tracks the pages allocated, in the data tree, by a transaction.
//   This is a system table. It is only written when a savepoint exists
// * New persistent savepoint format
pub(crate) const FILE_FORMAT_VERSION3: u8 = 3;
50
// Returns ceil(log2(x)): the smallest order `n` such that `2^n >= x`.
//
// `next_power_of_two()` already returns `x` unchanged when `x` is a power of two
// (and returns 1 for 0), so the previous power-of-two special case was redundant
// and the two branches always computed the same value.
fn ceil_log2(x: usize) -> u8 {
    x.next_power_of_two().trailing_zeros().try_into().unwrap()
}
58
// Checksum function used throughout the page store: 128-bit xxh3 with a fixed seed of 0
pub(crate) fn xxh3_checksum(data: &[u8]) -> Checksum {
    hash128_with_seed(data, 0)
}
62
// In-memory mirror of the database's on-disk metadata: the header plus the page
// allocators for every region. Guarded by the `state` mutex in TransactionalMemory.
struct InMemoryState {
    // Copy of the database header (both commit slots, layout, flags)
    header: DatabaseHeader,
    // Region tracker plus one buddy allocator per region
    allocators: Allocators,
}
67
68impl InMemoryState {
69    fn from_bytes(header: DatabaseHeader, file: &PagedCachedFile, version: u8) -> Result<Self> {
70        // TODO: seems like there should be a nicer way to structure this, rather than having
71        // a format version check here
72        let allocators = if header.recovery_required || version >= FILE_FORMAT_VERSION3 {
73            Allocators::new(header.layout())
74        } else {
75            Allocators::from_bytes(&header, file)?
76        };
77        Ok(Self { header, allocators })
78    }
79
80    fn get_region(&self, region: u32) -> &BuddyAllocator {
81        &self.allocators.region_allocators[region as usize]
82    }
83
84    fn get_region_mut(&mut self, region: u32) -> &mut BuddyAllocator {
85        &mut self.allocators.region_allocators[region as usize]
86    }
87
88    fn get_region_tracker_mut(&mut self) -> &mut RegionTracker {
89        &mut self.allocators.region_tracker
90    }
91}
92
// Manages the database file: page allocation, the commit protocol (primary/secondary
// header slots), caching, and crash recovery bookkeeping.
pub(crate) struct TransactionalMemory {
    // Pages allocated since the last commit
    // TODO: maybe this should be moved to WriteTransaction?
    allocated_since_commit: Mutex<HashSet<PageNumber>>,
    // True if the allocator state was corrupted when the file was opened
    // TODO: maybe we can remove this flag now that CheckedBackend exists?
    needs_recovery: AtomicBool,
    // The underlying file, behind a paged read/write cache
    storage: PagedCachedFile,
    // In-memory copy of the header and the page allocators
    state: Mutex<InMemoryState>,
    // The set of PageMuts which are outstanding
    #[cfg(debug_assertions)]
    open_dirty_pages: Arc<Mutex<HashSet<PageNumber>>>,
    // Reference counts of PageImpls that are outstanding
    #[cfg(debug_assertions)]
    read_page_ref_counts: Arc<Mutex<HashMap<PageNumber, u64>>>,
    // Indicates that a non-durable commit has been made, so reads should be served from the secondary meta page
    read_from_secondary: AtomicBool,
    // Page size in bytes (a power of two, asserted in new())
    page_size: u32,
    // We store these separately from the layout because they're static, and accessed on the get_page()
    // code path where there is no locking
    region_size: u64,
    region_header_with_padding_size: u64,
    // File format version in use (one of the FILE_FORMAT_VERSION* constants)
    file_format: u8,
}
117
118impl TransactionalMemory {
    // Opens an existing database file, or initializes a new one.
    //
    // `allow_initialize` permits creating a fresh database when the file is empty
    // (true for Database::create(), false for open()). `page_size` must be a power of
    // two and at least DB_HEADER_SIZE. `requested_region_size`, if given, overrides the
    // default region size and must be a power of two. `default_to_file_format_v3`
    // selects the format version only for newly created files; existing files keep the
    // version recorded in their header.
    //
    // Errors with InvalidData if the magic number is wrong, or if the file is empty and
    // initialization is not allowed.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        file: Box<dyn StorageBackend>,
        // Allow initializing a new database in an empty file
        allow_initialize: bool,
        page_size: usize,
        requested_region_size: Option<u64>,
        read_cache_size_bytes: usize,
        write_cache_size_bytes: usize,
        default_to_file_format_v3: bool,
    ) -> Result<Self, DatabaseError> {
        assert!(page_size.is_power_of_two() && page_size >= DB_HEADER_SIZE);

        // Clamp the region size so that a page index always fits in MAX_PAGE_INDEX
        let region_size = requested_region_size.unwrap_or(MAX_USABLE_REGION_SPACE);
        let region_size = min(
            region_size,
            (u64::from(MAX_PAGE_INDEX) + 1) * page_size as u64,
        );
        assert!(region_size.is_power_of_two());

        let storage = PagedCachedFile::new(
            file,
            page_size as u64,
            read_cache_size_bytes,
            write_cache_size_bytes,
        )?;

        let initial_storage_len = storage.raw_file_len()?;

        // Read the magic number, if the file is long enough to contain one
        let magic_number: [u8; MAGICNUMBER.len()] =
            if initial_storage_len >= MAGICNUMBER.len() as u64 {
                storage
                    .read_direct(0, MAGICNUMBER.len())?
                    .try_into()
                    .unwrap()
            } else {
                [0; MAGICNUMBER.len()]
            };

        if initial_storage_len > 0 {
            // File already exists check that the magic number matches
            if magic_number != MAGICNUMBER {
                return Err(StorageError::Io(ErrorKind::InvalidData.into()).into());
            }
        } else {
            // File is empty, check that we're allowed to initialize a new database (i.e. the caller is Database::create() and not open())
            if !allow_initialize {
                return Err(StorageError::Io(ErrorKind::InvalidData.into()).into());
            }
        }

        // No magic number yet: initialize a brand new database
        if magic_number != MAGICNUMBER {
            let region_tracker_required_bytes =
                RegionTracker::new(INITIAL_REGIONS, MAX_MAX_PAGE_ORDER + 1)
                    .to_vec()
                    .len();

            // Make sure that there is enough room to allocate the region tracker into a page
            let size: u64 = max(
                MIN_DESIRED_USABLE_BYTES,
                page_size as u64 * u64::from(MIN_USABLE_PAGES),
            );
            // Round the tracker's space requirement up to whole pages
            let tracker_space =
                (page_size * region_tracker_required_bytes.div_ceil(page_size)) as u64;
            let starting_size = size + tracker_space;

            let layout = DatabaseLayout::calculate(
                starting_size,
                (region_size / u64::try_from(page_size).unwrap())
                    .try_into()
                    .unwrap(),
                page_size.try_into().unwrap(),
            );

            // Grow the file to cover the initial layout, if necessary
            {
                let file_len = storage.raw_file_len()?;

                if file_len < layout.len() {
                    storage.resize(layout.len())?;
                }
            }

            let mut allocators = Allocators::new(layout);

            // Allocate the region tracker in the zeroth region
            let tracker_page = {
                let tracker_required_pages =
                    allocators.region_tracker.to_vec().len().div_ceil(page_size);
                let required_order = ceil_log2(tracker_required_pages);
                let page_number = allocators.region_allocators[0]
                    .alloc(required_order)
                    .unwrap();
                PageNumber::new(0, page_number, required_order)
            };

            let file_format = if default_to_file_format_v3 {
                FILE_FORMAT_VERSION3
            } else {
                FILE_FORMAT_VERSION2
            };
            let mut header =
                DatabaseHeader::new(layout, TransactionId::new(0), file_format, tracker_page);

            header.recovery_required = false;
            header.two_phase_commit = true;
            // First write the header with the magic number zeroed out...
            storage
                .write(0, DB_HEADER_SIZE, true)?
                .mem_mut()
                .copy_from_slice(&header.to_bytes(false));
            allocators.flush_to(tracker_page, layout, &storage)?;

            storage.flush(false)?;
            // Write the magic number only after the data structure is initialized and written to disk
            // to ensure that it's crash safe
            storage
                .write(0, DB_HEADER_SIZE, true)?
                .mem_mut()
                .copy_from_slice(&header.to_bytes(true));
            storage.flush(false)?;
        }
        let header_bytes = storage.read_direct(0, DB_HEADER_SIZE)?;
        let (mut header, repair_info) = DatabaseHeader::from_bytes(&header_bytes)?;

        assert_eq!(header.page_size() as usize, page_size);
        assert!(storage.raw_file_len()? >= header.layout().len());
        // Recovery is needed after an unclean shutdown, or if the file length disagrees
        // with the recorded layout
        let needs_recovery =
            header.recovery_required || header.layout().len() != storage.raw_file_len()?;
        if needs_recovery {
            let layout = header.layout();
            let region_max_pages = layout.full_region_layout().num_pages();
            let region_header_pages = layout.full_region_layout().get_header_pages();
            // Recompute the layout from the actual file length
            header.set_layout(DatabaseLayout::recalculate(
                storage.raw_file_len()?,
                region_header_pages,
                region_max_pages,
                page_size.try_into().unwrap(),
            ));
            header.pick_primary_for_repair(repair_info)?;
            assert!(!repair_info.invalid_magic_number);
            storage
                .write(0, DB_HEADER_SIZE, true)?
                .mem_mut()
                .copy_from_slice(&header.to_bytes(true));
            storage.flush(false)?;
        }

        let layout = header.layout();
        assert_eq!(layout.len(), storage.raw_file_len()?);
        let region_size = layout.full_region_layout().len();
        let region_header_size = layout.full_region_layout().data_section().start;
        let version = header.primary_slot().version;
        let state = InMemoryState::from_bytes(header, &storage, version)?;

        assert!(page_size >= DB_HEADER_SIZE);

        Ok(Self {
            allocated_since_commit: Mutex::new(HashSet::new()),
            needs_recovery: AtomicBool::new(needs_recovery),
            storage,
            state: Mutex::new(state),
            #[cfg(debug_assertions)]
            open_dirty_pages: Arc::new(Mutex::new(HashSet::new())),
            #[cfg(debug_assertions)]
            read_page_ref_counts: Arc::new(Mutex::new(HashMap::new())),
            read_from_secondary: AtomicBool::new(false),
            page_size: page_size.try_into().unwrap(),
            region_size,
            region_header_with_padding_size: region_header_size,
            file_format: version,
        })
    }
290
    // Upgrades the file format to v3 in place.
    //
    // Re-commits the current data/system roots twice so that both header slots record
    // the v3 version number, then frees the region tracker page (v3 no longer stores
    // the tracker in a page). The freed root must be empty when this is called.
    // On a commit failure the storage is marked irrecoverable, since the slots may be
    // left inconsistent.
    pub(crate) fn upgrade_to_v3(&mut self) -> Result {
        let data_root = self.get_data_root();
        let system_root = self.get_system_root();
        let transaction_id = self.get_last_committed_transaction_id()?;
        assert!(self.get_freed_root().is_none());

        let tracker_page = self.tracker_page();
        self.file_format = FILE_FORMAT_VERSION3;
        // Repeat twice just to be sure both slots have the new version number
        for _ in 0..2 {
            match self.commit(data_root, system_root, None, transaction_id, false, true) {
                Ok(()) => {}
                Err(err) => {
                    self.storage.set_irrecoverable_io_error();
                    return Err(err);
                }
            }
        }
        self.free(tracker_page, &mut PageTrackerPolicy::Ignore);

        Ok(())
    }
313
314    pub(crate) fn file_format_v3(&self) -> bool {
315        self.file_format == FILE_FORMAT_VERSION3
316    }
317
    // Returns hit/miss statistics for the page cache
    pub(crate) fn cache_stats(&self) -> CacheStats {
        self.storage.cache_stats()
    }
321
    // Surfaces any I/O error recorded by the underlying storage layer
    pub(crate) fn check_io_errors(&self) -> Result {
        self.storage.check_io_errors()
    }
325
    // (test/fuzzing only) Returns every page currently marked allocated, across all regions
    #[cfg(any(test, fuzzing))]
    pub(crate) fn all_allocated_pages(&self) -> Vec<PageNumber> {
        self.state.lock().unwrap().allocators.all_allocated()
    }
330
    // The page number recorded in the header for the region tracker
    pub(crate) fn tracker_page(&self) -> PageNumber {
        self.state.lock().unwrap().header.region_tracker()
    }
334
    // Invalidates the entire read cache, forcing subsequent reads to hit the backend
    pub(crate) fn clear_read_cache(&self) {
        self.storage.invalidate_cache_all();
    }
338
    // Flushes and drops all cached pages, then re-reads the header from disk.
    //
    // Returns Ok(false) if the primary header slot had to be swapped during repair
    // (i.e. the file was not clean), Ok(true) otherwise. Must not be called while
    // uncommitted allocations exist (asserted).
    pub(crate) fn clear_cache_and_reload(&mut self) -> Result<bool, DatabaseError> {
        assert!(self.allocated_since_commit.lock().unwrap().is_empty());

        // Flush pending writes before invalidating the cache
        self.storage.flush(false)?;
        self.storage.invalidate_cache_all();

        let header_bytes = self.storage.read_direct(0, DB_HEADER_SIZE)?;
        let (mut header, repair_info) = DatabaseHeader::from_bytes(&header_bytes)?;
        // TODO: This ends up always being true because this is called from check_integrity() once the db is already open
        // TODO: Also we should recheck the layout
        let mut was_clean = true;
        if header.recovery_required {
            if !header.pick_primary_for_repair(repair_info)? {
                was_clean = false;
            }
            if repair_info.invalid_magic_number {
                return Err(StorageError::Corrupted("Invalid magic number".to_string()).into());
            }
            // Persist the repaired header before continuing
            self.storage
                .write(0, DB_HEADER_SIZE, true)?
                .mem_mut()
                .copy_from_slice(&header.to_bytes(true));
            self.storage.flush(false)?;
        }

        self.needs_recovery
            .store(header.recovery_required, Ordering::Release);
        self.state.lock().unwrap().header = header;

        Ok(was_clean)
    }
370
    // Marks the database dirty before the first write: sets recovery_required in the
    // header and flushes it, so that an unclean shutdown from this point on triggers
    // repair on the next open. Must not already be marked dirty (asserted).
    pub(crate) fn begin_writable(&self) -> Result {
        let mut state = self.state.lock().unwrap();
        assert!(!state.header.recovery_required);
        state.header.recovery_required = true;
        self.write_header(&state.header)?;
        self.storage.flush(false)
    }
378
    // Whether the header indicates an unclean shutdown that requires repair
    pub(crate) fn needs_repair(&self) -> Result<bool> {
        Ok(self.state.lock().unwrap().header.recovery_required)
    }
382
    // Whether the most recent commit recorded in the header used 2-phase commit
    pub(crate) fn used_two_phase_commit(&self) -> bool {
        self.state.lock().unwrap().header.two_phase_commit
    }
386
    // Hash over the full allocator state, usable for consistency comparisons
    pub(crate) fn allocator_hash(&self) -> u128 {
        self.state.lock().unwrap().allocators.xxh3_hash()
    }
390
    // TODO: need a clearer distinction between this and needs_repair()
    // Returns the needs_recovery flag, which is set when a commit or rollback failed
    // (leaving the in-memory allocator state possibly out of sync with disk)
    pub(crate) fn storage_failure(&self) -> bool {
        self.needs_recovery.load(Ordering::Acquire)
    }
395
    // Swaps the primary and secondary header slots; used during repair when the
    // current primary slot was found to be corrupted
    pub(crate) fn repair_primary_corrupted(&self) {
        let mut state = self.state.lock().unwrap();
        state.header.swap_primary_slot();
    }
400
    // Resets the in-memory allocators to an empty state so a repair can rebuild them
    // (NOTE(review): allocations appear to be re-recorded afterwards via
    // mark_page_allocated() — confirm against the repair caller)
    pub(crate) fn begin_repair(&self) -> Result<()> {
        let mut state = self.state.lock().unwrap();
        state.allocators = Allocators::new(state.header.layout());

        Ok(())
    }
407
408    pub(crate) fn mark_page_allocated(&self, page_number: PageNumber) {
409        let mut state = self.state.lock().unwrap();
410        let region_index = page_number.region;
411        let allocator = state.get_region_mut(region_index);
412        allocator.record_alloc(page_number.page_index, page_number.page_order);
413    }
414
    // Serializes `header` (magic number included) into the first DB_HEADER_SIZE bytes
    // of the file. The write goes through the cache; callers flush when durability
    // is required.
    fn write_header(&self, header: &DatabaseHeader) -> Result {
        self.storage
            .write(0, DB_HEADER_SIZE, true)?
            .mem_mut()
            .copy_from_slice(&header.to_bytes(true));

        Ok(())
    }
423
    // Completes a repair: for pre-v3 formats, re-allocates the region tracker page and
    // writes the rebuilt allocator state back into the region headers; then clears the
    // recovery flag in the header and flushes everything to disk.
    pub(crate) fn end_repair(&self) -> Result<()> {
        if !self.file_format_v3() {
            self.allocate_region_tracker_page()?;
        }

        let mut state = self.state.lock().unwrap();
        if !self.file_format_v3() {
            let tracker_page = state.header.region_tracker();
            state
                .allocators
                .flush_to(tracker_page, state.header.layout(), &self.storage)?;
        }

        state.header.recovery_required = false;
        self.write_header(&state.header)?;
        // Clear the in-memory flag only after attempting the flush
        let result = self.storage.flush(false);
        self.needs_recovery.store(false, Ordering::Release);

        result
    }
444
445    pub(crate) fn reserve_allocator_state(
446        &self,
447        tree: &mut AllocatorStateTree,
448        transaction_id: TransactionId,
449    ) -> Result<u32> {
450        let state = self.state.lock().unwrap();
451        let layout = state.header.layout();
452        let num_regions = layout.num_regions();
453        let region_tracker_len = state.allocators.region_tracker.to_vec().len();
454        let region_lens: Vec<usize> = state
455            .allocators
456            .region_allocators
457            .iter()
458            .map(|x| x.to_vec().len())
459            .collect();
460        drop(state);
461
462        for i in 0..num_regions {
463            let region_bytes_len = region_lens[i as usize];
464            tree.insert(
465                &AllocatorStateKey::Region(i),
466                &vec![0; region_bytes_len].as_ref(),
467            )?;
468        }
469
470        tree.insert(
471            &AllocatorStateKey::RegionTracker,
472            &vec![0; region_tracker_len].as_ref(),
473        )?;
474
475        tree.insert(
476            &AllocatorStateKey::TransactionId,
477            &transaction_id.raw_id().to_le_bytes().as_ref(),
478        )?;
479
480        Ok(num_regions)
481    }
482
    // Saves the current allocator state into the (pre-reserved) allocator state table.
    // Returns true on success, or false if the number of regions has changed
    pub(crate) fn try_save_allocator_state(
        &self,
        tree: &mut AllocatorStateTree,
        num_regions: u32,
    ) -> Result<bool> {
        // Has the number of regions changed since reserve_allocator_state() was called?
        let state = self.state.lock().unwrap();
        if num_regions != state.header.layout().num_regions() {
            return Ok(false);
        }

        // Temporarily free the region tracker page, because we don't want to include it in our
        // recorded allocations (only pre-v3 formats keep the tracker in a page)
        let tracker_page = if !self.file_format_v3() {
            let tracker_page = state.header.region_tracker();
            // Release the state lock first — free() presumably re-acquires it
            drop(state);
            self.free(tracker_page, &mut PageTrackerPolicy::Ignore);
            Some(tracker_page)
        } else {
            drop(state);
            None
        };

        let result = self.try_save_allocator_state_inner(tree, num_regions);

        // Restore the region tracker page, even if the inner save failed
        if let Some(tracker_page) = tracker_page {
            self.mark_page_allocated(tracker_page);
        }

        result
    }
516
    // Writes each region allocator and the region tracker into their reserved slots,
    // in place so the tree's shape (and thus the allocator state itself) doesn't change.
    //
    // The state lock is re-acquired per region rather than held across the tree writes
    // — NOTE(review): presumably to avoid holding it during tree I/O; confirm.
    fn try_save_allocator_state_inner(
        &self,
        tree: &mut AllocatorStateTree,
        num_regions: u32,
    ) -> Result<bool> {
        for i in 0..num_regions {
            let region_bytes =
                &self.state.lock().unwrap().allocators.region_allocators[i as usize].to_vec();
            tree.insert_inplace(&AllocatorStateKey::Region(i), &region_bytes.as_ref())?;
        }

        let region_tracker_bytes = self
            .state
            .lock()
            .unwrap()
            .allocators
            .region_tracker
            .to_vec();
        tree.insert_inplace(
            &AllocatorStateKey::RegionTracker,
            &region_tracker_bytes.as_ref(),
        )?;

        Ok(true)
    }
542
    // Returns true if the allocator state table is up to date, or false if it's stale
    pub(crate) fn is_valid_allocator_state(&self, tree: &AllocatorStateTree) -> Result<bool> {
        // See if this is stale allocator state left over from a previous transaction. That won't
        // happen during normal operation, since WriteTransaction::commit() always updates the
        // allocator state table before calling TransactionalMemory::commit(), but there are also
        // a few places where TransactionalMemory::commit() is called directly without using a
        // WriteTransaction. When that happens, any existing allocator state table will be left
        // in place but is no longer valid. (And even if there were no such calls today, it would
        // be an easy mistake to make! So it's good that we check.)
        //
        // The TransactionId key is always written by reserve_allocator_state(), so a
        // missing key here would be a bug — hence the unwrap.
        let transaction_id = TransactionId::new(u64::from_le_bytes(
            tree.get(&AllocatorStateKey::TransactionId)?
                .unwrap()
                .value()
                .try_into()
                .unwrap(),
        ));

        Ok(transaction_id == self.get_last_committed_transaction_id()?)
    }
562
    // Restores the in-memory allocators from the allocator state table, resizes them to
    // the current file layout, and clears the recovery flags. The table must be current
    // (asserted via is_valid_allocator_state()).
    pub(crate) fn load_allocator_state(&self, tree: &AllocatorStateTree) -> Result {
        assert!(self.is_valid_allocator_state(tree)?);

        // Load the allocator state
        let mut region_allocators = vec![];
        for region in
            tree.range(&(AllocatorStateKey::Region(0)..=AllocatorStateKey::Region(u32::MAX)))?
        {
            region_allocators.push(BuddyAllocator::from_bytes(region?.value()));
        }

        let region_tracker = RegionTracker::from_page(
            tree.get(&AllocatorStateKey::RegionTracker)?
                .unwrap()
                .value(),
        );

        let mut state = self.state.lock().unwrap();
        state.allocators = Allocators {
            region_tracker,
            region_allocators,
        };

        // Resize the allocators to match the current file size
        let layout = state.header.layout();
        state.allocators.resize_to(layout);
        drop(state);

        // Allocate a page for the region tracker
        // (v3 keeps the tracker in a system table, so no page is needed)
        if !self.file_format_v3() {
            self.allocate_region_tracker_page()?;
        }

        self.state.lock().unwrap().header.recovery_required = false;
        self.needs_recovery.store(false, Ordering::Release);

        Ok(())
    }
601
602    pub(crate) fn is_allocated(&self, page: PageNumber) -> bool {
603        let state = self.state.lock().unwrap();
604        let allocator = state.get_region(page.region);
605
606        allocator.is_allocated(page.page_index, page.page_order)
607    }
608
    // Allocate a page for the region tracker. If possible, this will pick the same page that
    // was used last time; otherwise it'll pick a new page and update the database header to
    // match
    fn allocate_region_tracker_page(&self) -> Result {
        let mut state = self.state.lock().unwrap();
        let tracker_len = state.allocators.region_tracker.to_vec().len();
        let tracker_page = state.header.region_tracker();

        let allocator = state.get_region_mut(tracker_page.region);
        // Pick a new tracker page, if the old one was overwritten or is too small
        if allocator.is_allocated(tracker_page.page_index, tracker_page.page_order)
            || tracker_page.page_size_bytes(self.page_size) < tracker_len as u64
        {
            // allocate_non_transactional() needs the state lock, so release it first
            drop(state);

            let new_tracker_page = self
                .allocate_non_transactional(tracker_len, false)?
                .get_page_number();

            let mut state = self.state.lock().unwrap();
            state.header.set_region_tracker(new_tracker_page);
            // Persist the new tracker location right away
            self.write_header(&state.header)?;
            self.storage.flush(false)?;
        } else {
            // The old page is available, so just mark it as allocated
            allocator.record_alloc(tracker_page.page_index, tracker_page.page_order);
            drop(state);
        }

        Ok(())
    }
640
    // Relocates the region tracker to a lower page, if possible
    // Returns true if the page was moved
    pub(crate) fn relocate_region_tracker(&self) -> Result<bool> {
        let state = self.state.lock().unwrap();
        let region_tracker_size = state
            .header
            .region_tracker()
            .page_size_bytes(self.page_size);
        let old_tracker_page = state.header.region_tracker();
        // allocate acquires this lock, so we need to drop it
        drop(state);
        let new_page =
            self.allocate_non_transactional(region_tracker_size.try_into().unwrap(), true)?;
        if new_page.get_page_number().is_before(old_tracker_page) {
            // The new page is lower: adopt it and free the old location
            let mut state = self.state.lock().unwrap();
            state.header.set_region_tracker(new_page.get_page_number());
            drop(state);
            self.free(old_tracker_page, &mut PageTrackerPolicy::Ignore);
            Ok(true)
        } else {
            // No improvement: release the speculative allocation and keep the old page
            let new_page_number = new_page.get_page_number();
            drop(new_page);
            self.free(new_page_number, &mut PageTrackerPolicy::Ignore);
            Ok(false)
        }
    }
667
    // Diffs region_states, which must be derived from get_raw_allocator_states(), against
    // the currently allocated set of pages
    pub(crate) fn pages_allocated_since_raw_state(
        &self,
        old_states: &[BuddyAllocator],
    ) -> Vec<PageNumber> {
        let mut result = vec![];
        let state = self.state.lock().unwrap();

        for i in 0..state.header.layout().num_regions() {
            let current_state = state.get_region(i);
            if let Some(old_state) = old_states.get(i as usize) {
                current_state.difference(i, old_state, &mut result);
            } else {
                // This region didn't exist, so everything is newly allocated
                current_state.get_allocated_pages(i, &mut result);
            }
        }

        // Don't include the region tracker, since we manage that internally to the TransactionalMemory
        // Otherwise restoring a savepoint would free it.
        result.retain(|x| *x != state.header.region_tracker());

        result
    }
693
694    pub(crate) fn get_raw_allocator_states(&self) -> Vec<Vec<u8>> {
695        let state = self.state.lock().unwrap();
696
697        let mut regional_allocators = vec![];
698        for i in 0..state.header.layout().num_regions() {
699            regional_allocators.push(state.get_region(i).make_state_for_savepoint());
700        }
701
702        regional_allocators
703    }
704
    // Commit all outstanding changes and make them visible as the primary
    //
    // Any failure sets the needs_recovery flag, since the on-disk state may be
    // partially written.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn commit(
        &self,
        data_root: Option<BtreeHeader>,
        system_root: Option<BtreeHeader>,
        freed_root: Option<BtreeHeader>,
        transaction_id: TransactionId,
        eventual: bool,
        two_phase: bool,
    ) -> Result {
        let result = self.commit_inner(
            data_root,
            system_root,
            freed_root,
            transaction_id,
            eventual,
            two_phase,
        );
        if result.is_err() {
            self.needs_recovery.store(true, Ordering::Release);
        }
        result
    }
729
    // The commit protocol: stage the new roots in the secondary header slot, optionally
    // flush (2-phase), then atomically promote the secondary slot to primary and flush
    // again. `eventual` selects an eventual (asynchronous) flush where supported.
    #[allow(clippy::too_many_arguments)]
    fn commit_inner(
        &self,
        data_root: Option<BtreeHeader>,
        system_root: Option<BtreeHeader>,
        freed_root: Option<BtreeHeader>,
        transaction_id: TransactionId,
        eventual: bool,
        two_phase: bool,
    ) -> Result {
        // All mutable pages must be dropped, this ensures that when a transaction completes
        // no more writes can happen to the pages it allocated. Thus it is safe to make them visible
        // to future read transactions
        #[cfg(debug_assertions)]
        debug_assert!(self.open_dirty_pages.lock().unwrap().is_empty());
        assert!(!self.needs_recovery.load(Ordering::Acquire));

        let mut state = self.state.lock().unwrap();
        // Trim surplus file space, before finalizing the commit
        let shrunk = Self::try_shrink(&mut state)?;
        // Copy the header so that we can release the state lock, while we flush the file
        let mut header = state.header.clone();
        drop(state);

        // Stage the new roots in the secondary slot
        let old_transaction_id = header.secondary_slot().transaction_id;
        let secondary = header.secondary_slot_mut();
        secondary.transaction_id = transaction_id;
        secondary.user_root = data_root;
        secondary.system_root = system_root;
        secondary.freed_root = freed_root;
        secondary.version = self.file_format;

        self.write_header(&header)?;

        // Use 2-phase commit, if checksums are disabled
        if two_phase {
            self.storage.flush(eventual)?;
        }

        // Make our new commit the primary, and record whether it was a 2-phase commit.
        // These two bits need to be written atomically
        header.swap_primary_slot();
        header.two_phase_commit = two_phase;

        // Write the new header to disk
        self.write_header(&header)?;
        self.storage.flush(eventual)?;

        // Only after the commit is durable is it safe to truncate the file
        if shrunk {
            let result = self.storage.resize(header.layout().len());
            if result.is_err() {
                // TODO: it would be nice to have a more cohesive approach to setting this.
                // we do it in commit() & rollback() on failure, but there are probably other places that need it
                self.needs_recovery.store(true, Ordering::Release);
                return result;
            }
        }
        self.allocated_since_commit.lock().unwrap().clear();

        let mut state = self.state.lock().unwrap();
        // No other commit should have raced with us: the secondary slot must be unchanged
        assert_eq!(
            state.header.secondary_slot().transaction_id,
            old_transaction_id
        );
        state.header = header;
        self.read_from_secondary.store(false, Ordering::Release);
        // Hold lock until read_from_secondary is set to false, so that the new primary state is read.
        // TODO: maybe we can remove the whole read_from_secondary flag?
        drop(state);

        Ok(())
    }
802
    // Make changes visible, without a durability guarantee
    //
    // Updates the in-memory secondary header slot with the new roots and transaction id,
    // then flips `read_from_secondary` so subsequent readers observe the new state.
    // Unlike a durable commit, nothing is written to the on-disk header here, so the
    // changes are only persisted by a later durable commit.
    pub(crate) fn non_durable_commit(
        &self,
        data_root: Option<BtreeHeader>,
        system_root: Option<BtreeHeader>,
        freed_root: Option<BtreeHeader>,
        transaction_id: TransactionId,
    ) -> Result {
        // All mutable pages must be dropped, this ensures that when a transaction completes
        // no more writes can happen to the pages it allocated. Thus it is safe to make them visible
        // to future read transactions
        #[cfg(debug_assertions)]
        debug_assert!(self.open_dirty_pages.lock().unwrap().is_empty());
        assert!(!self.needs_recovery.load(Ordering::Acquire));

        // The transaction is complete: its allocations are no longer subject to rollback
        self.allocated_since_commit.lock().unwrap().clear();
        // NOTE(review): presumably pushes buffered page writes down to the backend
        // without an fsync — confirm against PagedCachedFile::write_barrier
        self.storage.write_barrier()?;

        let mut state = self.state.lock().unwrap();
        let secondary = state.header.secondary_slot_mut();
        secondary.transaction_id = transaction_id;
        secondary.user_root = data_root;
        secondary.system_root = system_root;
        secondary.freed_root = freed_root;

        // TODO: maybe we can remove this flag and just update the in-memory DatabaseHeader state?
        self.read_from_secondary.store(true, Ordering::Release);

        Ok(())
    }
833
834    pub(crate) fn rollback_uncommitted_writes(&self) -> Result {
835        let result = self.rollback_uncommitted_writes_inner();
836        if result.is_err() {
837            self.needs_recovery.store(true, Ordering::Release);
838        }
839        result
840    }
841
    // Frees every page allocated since the last commit, undoing the transaction's
    // allocations. Each page is returned to its region's buddy allocator, and any
    // cached contents or queued writes for it are discarded so stale data cannot
    // be read back or flushed later.
    fn rollback_uncommitted_writes_inner(&self) -> Result {
        // In debug builds, verify no writable page handles are still outstanding
        #[cfg(debug_assertions)]
        {
            let dirty_pages = self.open_dirty_pages.lock().unwrap();
            debug_assert!(
                dirty_pages.is_empty(),
                "Dirty pages outstanding: {dirty_pages:?}"
            );
        }
        assert!(!self.needs_recovery.load(Ordering::Acquire));
        let mut state = self.state.lock().unwrap();
        let mut guard = self.allocated_since_commit.lock().unwrap();
        for page_number in guard.iter() {
            let region_index = page_number.region;
            // Record that this region has free space at this order again
            state
                .get_region_tracker_mut()
                .mark_free(page_number.page_order, region_index);
            // Return the page to the region's buddy allocator
            state
                .get_region_mut(region_index)
                .free(page_number.page_index, page_number.page_order);

            // Translate the page number into its byte range within the file
            let address = page_number.address_range(
                self.page_size.into(),
                self.region_size,
                self.region_header_with_padding_size,
                self.page_size,
            );
            let len: usize = (address.end - address.start).try_into().unwrap();
            // Drop cached data and abort any not-yet-written data for this page
            self.storage.invalidate_cache(address.start, len);
            self.storage.cancel_pending_write(address.start, len);
        }
        guard.clear();

        Ok(())
    }
877
    // TODO: make all callers explicitly provide a hint
    //
    // Reads the page identified by `page_number` as an immutable page, with no
    // caching hint. Convenience wrapper around get_page_extended().
    pub(crate) fn get_page(&self, page_number: PageNumber) -> Result<PageImpl> {
        self.get_page_extended(page_number, PageHint::None)
    }
882
    // Reads the page identified by `page_number` as an immutable page.
    //
    // `hint` is passed through to the storage layer to influence caching behavior.
    // In debug builds, this also increments a per-page read reference count so that
    // taking a mutable reference to a page with live readers can be detected.
    pub(crate) fn get_page_extended(
        &self,
        page_number: PageNumber,
        hint: PageHint,
    ) -> Result<PageImpl> {
        // Translate the page number into its byte range within the file
        let range = page_number.address_range(
            self.page_size.into(),
            self.region_size,
            self.region_header_with_padding_size,
            self.page_size,
        );
        let len: usize = (range.end - range.start).try_into().unwrap();
        let mem = self.storage.read(range.start, len, hint)?;

        // We must not retrieve an immutable reference to a page which already has a mutable ref to it
        #[cfg(debug_assertions)]
        {
            let dirty_pages = self.open_dirty_pages.lock().unwrap();
            debug_assert!(!dirty_pages.contains(&page_number), "{page_number:?}");
            // Track this reader; the count is decremented when the PageImpl drops
            *(self
                .read_page_ref_counts
                .lock()
                .unwrap()
                .entry(page_number)
                .or_default()) += 1;
            drop(dirty_pages);
        }

        Ok(PageImpl {
            mem,
            page_number,
            #[cfg(debug_assertions)]
            open_pages: self.read_page_ref_counts.clone(),
        })
    }
918
    // NOTE: the caller must ensure that the read cache has been invalidated or stale reads may occur
    //
    // Returns a writable handle to the page identified by `page_number`.
    // In debug builds, asserts that the page has no live readers and is not
    // already open for writing.
    pub(crate) fn get_page_mut(&self, page_number: PageNumber) -> Result<PageMut> {
        #[cfg(debug_assertions)]
        {
            assert!(
                !self
                    .read_page_ref_counts
                    .lock()
                    .unwrap()
                    .contains_key(&page_number)
            );
            assert!(!self.open_dirty_pages.lock().unwrap().contains(&page_number));
        }

        // Translate the page number into its byte range within the file
        let address_range = page_number.address_range(
            self.page_size.into(),
            self.region_size,
            self.region_header_with_padding_size,
            self.page_size,
        );
        let len: usize = (address_range.end - address_range.start)
            .try_into()
            .unwrap();
        // NOTE(review): the final `false` here differs from the `true` passed in
        // allocate_helper() — presumably it controls whether the existing contents
        // must be preserved; confirm against PagedCachedFile::write
        let mem = self.storage.write(address_range.start, len, false)?;

        // Record the page as dirty; the entry is removed when the PageMut drops
        #[cfg(debug_assertions)]
        {
            assert!(self.open_dirty_pages.lock().unwrap().insert(page_number));
        }

        Ok(PageMut {
            mem,
            page_number,
            #[cfg(debug_assertions)]
            open_pages: self.open_dirty_pages.clone(),
        })
    }
956
957    pub(crate) fn get_version(&self) -> u8 {
958        let state = self.state.lock().unwrap();
959        if self.read_from_secondary.load(Ordering::Acquire) {
960            state.header.secondary_slot().version
961        } else {
962            state.header.primary_slot().version
963        }
964    }
965
966    pub(crate) fn get_data_root(&self) -> Option<BtreeHeader> {
967        let state = self.state.lock().unwrap();
968        if self.read_from_secondary.load(Ordering::Acquire) {
969            state.header.secondary_slot().user_root
970        } else {
971            state.header.primary_slot().user_root
972        }
973    }
974
975    pub(crate) fn get_system_root(&self) -> Option<BtreeHeader> {
976        let state = self.state.lock().unwrap();
977        if self.read_from_secondary.load(Ordering::Acquire) {
978            state.header.secondary_slot().system_root
979        } else {
980            state.header.primary_slot().system_root
981        }
982    }
983
984    pub(crate) fn get_freed_root(&self) -> Option<BtreeHeader> {
985        let state = self.state.lock().unwrap();
986        if self.read_from_secondary.load(Ordering::Acquire) {
987            state.header.secondary_slot().freed_root
988        } else {
989            state.header.primary_slot().freed_root
990        }
991    }
992
993    pub(crate) fn get_last_committed_transaction_id(&self) -> Result<TransactionId> {
994        let state = self.state.lock()?;
995        if self.read_from_secondary.load(Ordering::Acquire) {
996            Ok(state.header.secondary_slot().transaction_id)
997        } else {
998            Ok(state.header.primary_slot().transaction_id)
999        }
1000    }
1001
1002    pub(crate) fn free(&self, page: PageNumber, allocated: &mut PageTrackerPolicy) {
1003        self.allocated_since_commit.lock().unwrap().remove(&page);
1004        self.free_helper(page, allocated);
1005    }
1006
    // Returns `page` to the allocator and discards any cached or pending data for it.
    //
    // Removes the page from the caller's tracker, frees it in its region's buddy
    // allocator, marks the region as having free space at that order, and then
    // invalidates the read cache and cancels any queued write for the page's byte
    // range so stale data cannot be observed or flushed later.
    fn free_helper(&self, page: PageNumber, allocated: &mut PageTrackerPolicy) {
        allocated.remove(page);
        let mut state = self.state.lock().unwrap();
        let region_index = page.region;
        // Free in the regional allocator
        state
            .get_region_mut(region_index)
            .free(page.page_index, page.page_order);
        // Ensure that the region is marked as having free space
        state
            .get_region_tracker_mut()
            .mark_free(page.page_order, region_index);

        // Translate the page number into its byte range within the file
        let address_range = page.address_range(
            self.page_size.into(),
            self.region_size,
            self.region_header_with_padding_size,
            self.page_size,
        );
        let len: usize = (address_range.end - address_range.start)
            .try_into()
            .unwrap();
        // Drop cached data and abort any not-yet-written data for this page
        self.storage.invalidate_cache(address_range.start, len);
        self.storage.cancel_pending_write(address_range.start, len);
    }
1032
1033    // Frees the page if it was allocated since the last commit. Returns true, if the page was freed
1034    pub(crate) fn free_if_uncommitted(
1035        &self,
1036        page: PageNumber,
1037        allocated: &mut PageTrackerPolicy,
1038    ) -> bool {
1039        if self.allocated_since_commit.lock().unwrap().remove(&page) {
1040            self.free_helper(page, allocated);
1041            true
1042        } else {
1043            false
1044        }
1045    }
1046
    // Page has not been committed
    //
    // Returns true if `page` was allocated since the last commit and would
    // therefore be reclaimed by a rollback.
    pub(crate) fn uncommitted(&self, page: PageNumber) -> bool {
        self.allocated_since_commit.lock().unwrap().contains(&page)
    }
1051
    // Allocates a page large enough for `allocation_size` bytes and returns it as
    // a writable page.
    //
    // `lowest` requests the lowest-addressed free page that fits. `transactional`
    // records the page in allocated_since_commit so it is reclaimed if the
    // transaction rolls back. Grows the database if no region has space.
    pub(crate) fn allocate_helper(
        &self,
        allocation_size: usize,
        lowest: bool,
        transactional: bool,
    ) -> Result<PageMut> {
        // Round up to whole pages, then to the buddy allocator's power-of-two order
        let required_pages = allocation_size.div_ceil(self.get_page_size());
        let required_order = ceil_log2(required_pages);

        let mut state = self.state.lock().unwrap();

        // If no region currently has space at this order, grow the file and retry;
        // the retry is expected to succeed after grow(), hence the unwrap()
        let page_number = if let Some(page_number) =
            Self::allocate_helper_retry(&mut state, required_order, lowest)?
        {
            page_number
        } else {
            self.grow(&mut state, required_order)?;
            Self::allocate_helper_retry(&mut state, required_order, lowest)?.unwrap()
        };

        // Sanity checks: a freshly allocated page must have no live readers or writers
        #[cfg(debug_assertions)]
        {
            assert!(
                !self
                    .read_page_ref_counts
                    .lock()
                    .unwrap()
                    .contains_key(&page_number),
                "Allocated a page that is still referenced! {page_number:?}"
            );
            assert!(!self.open_dirty_pages.lock().unwrap().contains(&page_number));
        }

        // Track the allocation so it can be rolled back if the transaction aborts
        if transactional {
            self.allocated_since_commit
                .lock()
                .unwrap()
                .insert(page_number);
        }

        // Translate the page number into its byte range within the file
        let address_range = page_number.address_range(
            self.page_size.into(),
            self.region_size,
            self.region_header_with_padding_size,
            self.page_size,
        );
        let len: usize = (address_range.end - address_range.start)
            .try_into()
            .unwrap();

        // `mut` is only needed for the debug-mode poisoning below
        #[allow(unused_mut)]
        let mut mem = self.storage.write(address_range.start, len, true)?;
        debug_assert!(mem.mem().len() >= allocation_size);

        #[cfg(debug_assertions)]
        {
            assert!(self.open_dirty_pages.lock().unwrap().insert(page_number));

            // Poison the memory in debug mode to help detect uninitialized reads
            mem.mem_mut().fill(0xFF);
        }

        Ok(PageMut {
            mem,
            page_number,
            #[cfg(debug_assertions)]
            open_pages: self.open_dirty_pages.clone(),
        })
    }
1121
1122    fn allocate_helper_retry(
1123        state: &mut InMemoryState,
1124        required_order: u8,
1125        lowest: bool,
1126    ) -> Result<Option<PageNumber>> {
1127        loop {
1128            let Some(candidate_region) = state.get_region_tracker_mut().find_free(required_order)
1129            else {
1130                return Ok(None);
1131            };
1132            let region = state.get_region_mut(candidate_region);
1133            let r = if lowest {
1134                region.alloc_lowest(required_order)
1135            } else {
1136                region.alloc(required_order)
1137            };
1138            if let Some(page) = r {
1139                return Ok(Some(PageNumber::new(
1140                    candidate_region,
1141                    page,
1142                    required_order,
1143                )));
1144            }
1145            // Mark the region, if it's full
1146            state
1147                .get_region_tracker_mut()
1148                .mark_full(required_order, candidate_region);
1149        }
1150    }
1151
    // Shrinks the in-memory layout if the last region has enough trailing free pages.
    //
    // Returns Ok(true) if the layout was reduced — the caller is responsible for
    // resizing the backing file afterwards — or Ok(false) if there was not enough
    // trailing free space to justify shrinking.
    fn try_shrink(state: &mut InMemoryState) -> Result<bool> {
        let layout = state.header.layout();
        let last_region_index = layout.num_regions() - 1;
        let last_allocator = state.get_region(last_region_index);
        let trailing_free = last_allocator.trailing_free_pages();
        let last_allocator_len = last_allocator.len();
        // Only shrink when at least half of the last region is trailing free space
        if trailing_free < last_allocator_len / 2 {
            return Ok(false);
        }
        // Drop the entire last region when it is completely free (and not the only
        // region); otherwise release only half of the trailing free pages
        let reduce_by = if layout.num_regions() > 1 && trailing_free == last_allocator_len {
            trailing_free
        } else {
            trailing_free / 2
        };

        let mut new_layout = layout;
        new_layout.reduce_last_region(reduce_by);
        state.allocators.resize_to(new_layout);
        assert!(new_layout.len() <= layout.len());
        state.header.set_layout(new_layout);

        Ok(true)
    }
1175
    // Grows the layout and backing file so an allocation of order
    // `required_order_allocation` can succeed.
    //
    // Sets needs_recovery and returns the error if resizing the storage fails,
    // since the in-memory state may then disagree with the file.
    fn grow(&self, state: &mut InMemoryState, required_order_allocation: u8) -> Result<()> {
        let layout = state.header.layout();
        // Bytes needed by the pending allocation
        let required_growth =
            2u64.pow(required_order_allocation.into()) * u64::from(state.header.page_size());
        let max_region_size = u64::from(state.header.layout().full_region_layout().num_pages())
            * u64::from(state.header.page_size());
        let next_desired_size = if layout.num_full_regions() > 0 {
            if let Some(trailing) = layout.trailing_region_layout() {
                if 2 * required_growth < max_region_size - trailing.usable_bytes() {
                    // Fill out the trailing region
                    layout.usable_bytes() + (max_region_size - trailing.usable_bytes())
                } else {
                    // Fill out trailing & Grow by 1 region
                    layout.usable_bytes() + 2 * max_region_size - trailing.usable_bytes()
                }
            } else {
                // Grow by 1 region
                layout.usable_bytes() + max_region_size
            }
        } else {
            // Small database: at least double, and leave room for twice the request
            max(
                layout.usable_bytes() * 2,
                layout.usable_bytes() + required_growth * 2,
            )
        };
        let new_layout = DatabaseLayout::calculate(
            next_desired_size,
            state.header.layout().full_region_layout().num_pages(),
            self.page_size,
        );
        assert!(new_layout.len() >= layout.len());

        // Grow the backing file before adopting the new layout in memory
        let result = self.storage.resize(new_layout.len());
        if result.is_err() {
            // TODO: it would be nice to have a more cohesive approach to setting this.
            // we do it in commit() & rollback() on failure, but there are probably other places that need it
            self.needs_recovery.store(true, Ordering::Release);
            return result;
        }

        state.allocators.resize_to(new_layout);
        state.header.set_layout(new_layout);
        Ok(())
    }
1220
1221    pub(crate) fn allocate(
1222        &self,
1223        allocation_size: usize,
1224        allocated: &mut PageTrackerPolicy,
1225    ) -> Result<PageMut> {
1226        let result = self.allocate_helper(allocation_size, false, true);
1227        if let Ok(ref page) = result {
1228            allocated.insert(page.get_page_number());
1229        }
1230        result
1231    }
1232
    // Allocates a transactional page, preferring the lowest-addressed free page
    // that fits (see allocate_helper's `lowest` flag).
    pub(crate) fn allocate_lowest(&self, allocation_size: usize) -> Result<PageMut> {
        self.allocate_helper(allocation_size, true, true)
    }
1236
    // Allocate a page not associated with any transaction. The page is immediately considered committed,
    // and won't be rolled back if an abort happens. This is only used for the region tracker
    //
    // `lowest` selects lowest-address allocation, as in allocate_helper().
    fn allocate_non_transactional(&self, allocation_size: usize, lowest: bool) -> Result<PageMut> {
        self.allocate_helper(allocation_size, lowest, false)
    }
1242
1243    pub(crate) fn count_allocated_pages(&self) -> Result<u64> {
1244        let state = self.state.lock().unwrap();
1245        let mut count = 0u64;
1246        for i in 0..state.header.layout().num_regions() {
1247            count += u64::from(state.get_region(i).count_allocated_pages());
1248        }
1249
1250        Ok(count)
1251    }
1252
1253    pub(crate) fn count_free_pages(&self) -> Result<u64> {
1254        let state = self.state.lock().unwrap();
1255        let mut count = 0u64;
1256        for i in 0..state.header.layout().num_regions() {
1257            count += u64::from(state.get_region(i).count_free_pages());
1258        }
1259
1260        Ok(count)
1261    }
1262
    // Returns the database page size in bytes.
    //
    // Panics only if the u32 page size does not fit in usize, which is not
    // expected on supported targets.
    pub(crate) fn get_page_size(&self) -> usize {
        self.page_size.try_into().unwrap()
    }
1266}
1267
impl Drop for TransactionalMemory {
    // Best-effort clean shutdown: flush allocator state and clear the on-disk
    // recovery_required flag so the next open can skip repair. All failures are
    // swallowed (Drop cannot fail) and simply leave the flag set.
    fn drop(&mut self) {
        // Skip entirely when unwinding or when recovery is already required;
        // the repair path at next open will rebuild the allocator state
        if thread::panicking() || self.needs_recovery.load(Ordering::Acquire) {
            return;
        }

        // Reallocate the region tracker page, which will grow it if necessary
        if !self.file_format_v3() {
            let tracker_page = self.state.lock().unwrap().header.region_tracker();
            self.free(tracker_page, &mut PageTrackerPolicy::Ignore);
            if self.allocate_region_tracker_page().is_err() {
                #[cfg(feature = "logging")]
                warn!("Failure while flushing allocator state. Repair required at restart.");
                return;
            }
        }

        let mut state = self.state.lock().unwrap();
        // File format v3 now relies on the quick repair code path
        if !self.file_format_v3()
            && state
                .allocators
                .flush_to(
                    state.header.region_tracker(),
                    state.header.layout(),
                    &self.storage,
                )
                .is_err()
        {
            #[cfg(feature = "logging")]
            warn!("Failure while flushing allocator state. Repair required at restart.");
            return;
        }

        // Only clear recovery_required once the allocator state reached disk;
        // the trailing flush persists the updated header
        if self.storage.flush(false).is_ok() && !self.needs_recovery.load(Ordering::Acquire) {
            state.header.recovery_required = false;
            let _ = self.write_header(&state.header);
            let _ = self.storage.flush(false);
        }
    }
}
1309
#[cfg(test)]
mod test {
    use crate::tree_store::page_store::page_manager::INITIAL_REGIONS;
    use crate::{Database, TableDefinition};

    // Test that the region tracker expansion code works, by adding more data than fits into the initial max regions
    #[test]
    fn out_of_regions() {
        let tmpfile = crate::create_tempfile();
        let table_definition: TableDefinition<u32, &[u8]> = TableDefinition::new("x");
        let page_size = 1024;
        // Each value spans several pages, so regions fill up quickly
        let big_value = vec![0u8; 5 * page_size];

        // Tiny regions (8 pages each) so more than INITIAL_REGIONS are needed
        let db = Database::builder()
            .set_region_size((8 * page_size).try_into().unwrap())
            .set_page_size(page_size)
            .create(tmpfile.path())
            .unwrap();

        let txn = db.begin_write().unwrap();
        {
            let mut table = txn.open_table(table_definition).unwrap();
            // One more insert than INITIAL_REGIONS, forcing tracker expansion
            for i in 0..=INITIAL_REGIONS {
                table.insert(&i, big_value.as_slice()).unwrap();
            }
        }
        txn.commit().unwrap();
        drop(db);

        // Reopen and verify the database is still consistent
        let mut db = Database::builder()
            .set_region_size((8 * page_size).try_into().unwrap())
            .set_page_size(page_size)
            .open(tmpfile.path())
            .unwrap();
        assert!(db.check_integrity().unwrap());
    }

    // Make sure the database remains consistent after a panic
    #[test]
    #[cfg(panic = "unwind")]
    fn panic() {
        let tmpfile = crate::create_tempfile();
        let table_definition: TableDefinition<u32, &[u8]> = TableDefinition::new("x");

        // Panic mid-transaction; catch_unwind keeps the test alive while the
        // Database is dropped during unwinding
        let _ = std::panic::catch_unwind(|| {
            let db = Database::create(&tmpfile).unwrap();
            let txn = db.begin_write().unwrap();
            txn.open_table(table_definition).unwrap();
            panic!();
        });

        // Reopening must succeed and the database must pass the integrity check
        let mut db = Database::open(tmpfile).unwrap();
        assert!(db.check_integrity().unwrap());
    }
}