redb/tree_store/btree_base.rs

use crate::tree_store::page_store::{Page, PageImpl, PageMut, TransactionalMemory, xxh3_checksum};
use crate::tree_store::{PageNumber, PageTrackerPolicy};
use crate::types::{Key, MutInPlaceValue, Value};
use crate::{Result, StorageError};
use std::borrow::Borrow;
use std::cmp::Ordering;
use std::marker::PhantomData;
use std::mem::size_of;
use std::ops::Range;
use std::sync::{Arc, Mutex};
use std::thread;

pub(crate) const LEAF: u8 = 1;
pub(crate) const BRANCH: u8 = 2;

pub(crate) type Checksum = u128;
// Dummy value. Final value will be computed during commit
pub(crate) const DEFERRED: Checksum = 999;

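// Checksums cover only the initialized prefix of a page (everything up to the
// end of the last value or key), since the trailing bytes of a page are unused
// and may contain stale data.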
pub(super) fn leaf_checksum<T: Page>(
    page: &T,
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
) -> Result<Checksum, StorageError> {
    let accessor = LeafAccessor::new(page.memory(), fixed_key_size, fixed_value_size);
    let last_pair = accessor.num_pairs().checked_sub(1).ok_or_else(|| {
        StorageError::Corrupted(format!(
            "Leaf page {:?} corrupted. Number of pairs is zero",
            page.get_page_number()
        ))
    })?;
    let end = accessor.value_end(last_pair).ok_or_else(|| {
        StorageError::Corrupted(format!(
            "Leaf page {:?} corrupted. Couldn't find offset for pair {}",
            page.get_page_number(),
            last_pair,
        ))
    })?;
    if end > page.memory().len() {
        Err(StorageError::Corrupted(format!(
            "Leaf page {:?} corrupted. Last offset {} beyond end of data {}",
            page.get_page_number(),
            end,
            page.memory().len()
        )))
    } else {
        Ok(xxh3_checksum(&page.memory()[..end]))
    }
}

pub(super) fn branch_checksum<T: Page>(
    page: &T,
    fixed_key_size: Option<usize>,
) -> Result<Checksum, StorageError> {
    let accessor = BranchAccessor::new(page, fixed_key_size);
    let last_key = accessor.num_keys().checked_sub(1).ok_or_else(|| {
        StorageError::Corrupted(format!(
            "Branch page {:?} corrupted. Number of keys is zero",
            page.get_page_number()
        ))
    })?;
    let end = accessor.key_end(last_key).ok_or_else(|| {
        StorageError::Corrupted(format!(
            "Branch page {:?} corrupted. Can't find offset for key {}",
            page.get_page_number(),
            last_key
        ))
    })?;
    if end > page.memory().len() {
        Err(StorageError::Corrupted(format!(
            "Branch page {:?} corrupted. Last offset {} beyond end of data {}",
            page.get_page_number(),
            end,
            page.memory().len()
        )))
    } else {
        Ok(xxh3_checksum(&page.memory()[..end]))
    }
}

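// Serialized layout: root page number (PageNumber::serialized_size() bytes),
// followed by a 16-byte checksum and an 8-byte length, all little-endian.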
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) struct BtreeHeader {
    pub(crate) root: PageNumber,
    pub(crate) checksum: Checksum,
    pub(crate) length: u64,
}

impl BtreeHeader {
    pub(crate) fn new(root: PageNumber, checksum: Checksum, length: u64) -> Self {
        Self {
            root,
            checksum,
            length,
        }
    }

    pub(crate) const fn serialized_size() -> usize {
        PageNumber::serialized_size() + size_of::<Checksum>() + size_of::<u64>()
    }

    pub(crate) fn from_le_bytes(bytes: [u8; Self::serialized_size()]) -> Self {
        let root =
            PageNumber::from_le_bytes(bytes[..PageNumber::serialized_size()].try_into().unwrap());
        let mut offset = PageNumber::serialized_size();
        let checksum = Checksum::from_le_bytes(
            bytes[offset..(offset + size_of::<Checksum>())]
                .try_into()
                .unwrap(),
        );
        offset += size_of::<Checksum>();
        let length = u64::from_le_bytes(
            bytes[offset..(offset + size_of::<u64>())]
                .try_into()
                .unwrap(),
        );

        Self {
            root,
            checksum,
            length,
        }
    }

    pub(crate) fn to_le_bytes(self) -> [u8; Self::serialized_size()] {
        let mut result = [0; Self::serialized_size()];
        result[..PageNumber::serialized_size()].copy_from_slice(&self.root.to_le_bytes());
        result[PageNumber::serialized_size()
            ..(PageNumber::serialized_size() + size_of::<Checksum>())]
            .copy_from_slice(&self.checksum.to_le_bytes());
        result[(PageNumber::serialized_size() + size_of::<Checksum>())..]
            .copy_from_slice(&self.length.to_le_bytes());

        result
    }
}

enum OnDrop {
    None,
    RemoveEntry {
        position: usize,
        fixed_key_size: Option<usize>,
    },
}

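// AccessGuard needs to work over several ownership models: a borrowed immutable
// page, a borrowed mutable page, a heap buffer, or shared Arc memory. EitherPage
// unifies them behind a single memory() accessor.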
enum EitherPage {
    Immutable(PageImpl),
    Mutable(PageMut),
    OwnedMemory(Vec<u8>),
    ArcMemory(Arc<[u8]>),
}

impl EitherPage {
    fn memory(&self) -> &[u8] {
        match self {
            EitherPage::Immutable(page) => page.memory(),
            EitherPage::Mutable(page) => page.memory(),
            EitherPage::OwnedMemory(mem) => mem.as_slice(),
            EitherPage::ArcMemory(mem) => mem,
        }
    }
}

/// Scoped accessor to data in the database
///
/// When this structure is dropped (goes out of scope), the data is released
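///
/// A sketch of typical usage (hypothetical table and key; doctest ignored):
///
/// ```ignore
/// if let Some(guard) = table.get("some_key")? {
///     println!("{}", guard.value());
/// } // guard dropped here, releasing its borrow of the underlying page
/// ```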
pub struct AccessGuard<'a, V: Value + 'static> {
    page: EitherPage,
    offset: usize,
    len: usize,
    on_drop: OnDrop,
    _value_type: PhantomData<V>,
    // Used so that logical references into a Table respect the appropriate lifetime
    _lifetime: PhantomData<&'a ()>,
}

impl<V: Value + 'static> AccessGuard<'_, V> {
    pub(crate) fn with_page(page: PageImpl, range: Range<usize>) -> Self {
        Self {
            page: EitherPage::Immutable(page),
            offset: range.start,
            len: range.len(),
            on_drop: OnDrop::None,
            _value_type: Default::default(),
            _lifetime: Default::default(),
        }
    }

    pub(crate) fn with_arc_page(page: Arc<[u8]>, range: Range<usize>) -> Self {
        Self {
            page: EitherPage::ArcMemory(page),
            offset: range.start,
            len: range.len(),
            on_drop: OnDrop::None,
            _value_type: Default::default(),
            _lifetime: Default::default(),
        }
    }

    pub(crate) fn with_owned_value(value: Vec<u8>) -> Self {
        let len = value.len();
        Self {
            page: EitherPage::OwnedMemory(value),
            offset: 0,
            len,
            on_drop: OnDrop::None,
            _value_type: Default::default(),
            _lifetime: Default::default(),
        }
    }

    pub(super) fn remove_on_drop(
        page: PageMut,
        offset: usize,
        len: usize,
        position: usize,
        fixed_key_size: Option<usize>,
    ) -> Self {
        Self {
            page: EitherPage::Mutable(page),
            offset,
            len,
            on_drop: OnDrop::RemoveEntry {
                position,
                fixed_key_size,
            },
            _value_type: Default::default(),
            _lifetime: Default::default(),
        }
    }

    /// Access the stored value
    pub fn value(&self) -> V::SelfType<'_> {
        V::from_bytes(&self.page.memory()[self.offset..(self.offset + self.len)])
    }
}

impl<V: Value + 'static> Drop for AccessGuard<'_, V> {
    fn drop(&mut self) {
        match self.on_drop {
            OnDrop::None => {}
            OnDrop::RemoveEntry {
                position,
                fixed_key_size,
            } => {
                if let EitherPage::Mutable(ref mut mut_page) = self.page {
                    let mut mutator =
                        LeafMutator::new(mut_page.memory_mut(), fixed_key_size, V::fixed_width());
                    mutator.remove(position);
                } else if !thread::panicking() {
                    unreachable!();
                }
            }
        }
    }
}

pub struct AccessGuardMut<'a, V: Value + 'static> {
    page: PageMut,
    offset: usize,
    len: usize,
    entry_index: usize,
    parent: Option<(PageMut, usize)>,
    mem: Arc<TransactionalMemory>,
    allocated: Arc<Mutex<PageTrackerPolicy>>,
    root_ref: &'a mut BtreeHeader,
    key_width: Option<usize>,
    _value_type: PhantomData<V>,
}

impl<'a, V: Value + 'static> AccessGuardMut<'a, V> {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        page: PageMut,
        offset: usize,
        len: usize,
        entry_index: usize,
        parent: Option<(PageMut, usize)>,
        mem: Arc<TransactionalMemory>,
        allocated: Arc<Mutex<PageTrackerPolicy>>,
        root_ref: &'a mut BtreeHeader,
        key_width: Option<usize>,
    ) -> Self {
        assert!(mem.uncommitted(page.get_page_number()));
        if let Some((ref parent_page, _)) = parent {
            assert!(mem.uncommitted(parent_page.get_page_number()));
        }
        AccessGuardMut {
            page,
            offset,
            len,
            entry_index,
            parent,
            mem,
            allocated,
            root_ref,
            key_width,
            _value_type: Default::default(),
        }
    }

    /// Access the stored value
    pub fn value(&self) -> V::SelfType<'_> {
        V::from_bytes(&self.page.memory()[self.offset..(self.offset + self.len)])
    }

    /// Replace the stored value
    pub fn insert<'v>(&mut self, value: impl Borrow<V::SelfType<'v>>) -> Result<()> {
        let value_bytes = V::as_bytes(value.borrow());

        // TODO: optimize this to avoid copying the key
        let key_bytes = {
            let accessor = LeafAccessor::new(self.page.memory(), self.key_width, V::fixed_width());
            accessor.key_unchecked(self.entry_index).to_vec()
        };

        if LeafMutator::sufficient_insert_inplace_space(
            &self.page,
            self.entry_index,
            true,
            self.key_width,
            V::fixed_width(),
            key_bytes.as_slice(),
            value_bytes.as_ref(),
        ) {
            let mut mutator =
                LeafMutator::new(self.page.memory_mut(), self.key_width, V::fixed_width());
            mutator.insert(self.entry_index, true, &key_bytes, value_bytes.as_ref());
        } else {
            let accessor = LeafAccessor::new(self.page.memory(), self.key_width, V::fixed_width());
            let mut builder = LeafBuilder::new(
                &self.mem,
                &self.allocated,
                accessor.num_pairs(),
                self.key_width,
                V::fixed_width(),
            );

            for i in 0..accessor.num_pairs() {
                if i == self.entry_index {
                    builder.push(&key_bytes, value_bytes.as_ref());
                } else {
                    let entry = accessor.entry(i).unwrap();
                    builder.push(entry.key(), entry.value());
                }
            }

            let new_page = builder.build()?;

            // Update parent branch page if it exists, otherwise update root
            if let Some((ref mut parent_page, parent_entry_index)) = self.parent {
                let mut mutator = BranchMutator::new(parent_page.memory_mut());
                mutator.write_child_page(parent_entry_index, new_page.get_page_number(), DEFERRED);
            } else {
                self.root_ref.root = new_page.get_page_number();
                self.root_ref.checksum = DEFERRED;
            }

            let old_page_number = self.page.get_page_number();
            self.page = new_page;
            let mut allocated = self.allocated.lock().unwrap();
            assert!(
                self.mem
                    .free_if_uncommitted(old_page_number, &mut allocated)
            );
        }

        // Update our page reference to the new page and recalculate offset/length
        let new_accessor = LeafAccessor::new(self.page.memory(), self.key_width, V::fixed_width());
        let (new_start, new_end) = new_accessor.value_range(self.entry_index).unwrap();

        self.offset = new_start;
        self.len = new_end - new_start;

        Ok(())
    }
}

pub struct AccessGuardMutInPlace<'a, V: Value + 'static> {
    page: PageMut,
    offset: usize,
    len: usize,
    _value_type: PhantomData<V>,
    // Used so that logical references into a Table respect the appropriate lifetime
    _lifetime: PhantomData<&'a ()>,
}

impl<V: Value + 'static> AccessGuardMutInPlace<'_, V> {
    pub(crate) fn new(page: PageMut, offset: usize, len: usize) -> Self {
        AccessGuardMutInPlace {
            page,
            offset,
            len,
            _value_type: Default::default(),
            _lifetime: Default::default(),
        }
    }
}

impl<V: MutInPlaceValue + 'static> AsMut<V::BaseRefType> for AccessGuardMutInPlace<'_, V> {
    fn as_mut(&mut self) -> &mut V::BaseRefType {
        V::from_bytes_mut(&mut self.page.memory_mut()[self.offset..(self.offset + self.len)])
    }
}

// Provides a simple zero-copy way to access entries
pub struct EntryAccessor<'a> {
    key: &'a [u8],
    value: &'a [u8],
}

impl<'a> EntryAccessor<'a> {
    fn new(key: &'a [u8], value: &'a [u8]) -> Self {
        EntryAccessor { key, value }
    }
}

impl<'a: 'b, 'b> EntryAccessor<'a> {
    pub(crate) fn key(&'b self) -> &'a [u8] {
        self.key
    }

    pub(crate) fn value(&'b self) -> &'a [u8] {
        self.value
    }
}

// Provides a simple zero-copy way to access a leaf page
pub(crate) struct LeafAccessor<'a> {
    page: &'a [u8],
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
    num_pairs: usize,
}

impl<'a> LeafAccessor<'a> {
    pub(crate) fn new(
        page: &'a [u8],
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
    ) -> Self {
        debug_assert_eq!(page[0], LEAF);
        let num_pairs = u16::from_le_bytes(page[2..4].try_into().unwrap()) as usize;
        LeafAccessor {
            page,
            fixed_key_size,
            fixed_value_size,
            num_pairs,
        }
    }

    pub(super) fn print_node<K: Key, V: Value>(&self, include_value: bool) {
        let mut i = 0;
        while let Some(entry) = self.entry(i) {
            eprint!(" key_{i}={:?}", K::from_bytes(entry.key()));
            if include_value {
                eprint!(" value_{i}={:?}", V::from_bytes(entry.value()));
            }
            i += 1;
        }
    }
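
    // For example, with keys ["a", "c", "e"], position::<K>(b"c") returns (1, true)
    // and position::<K>(b"d") returns (2, false), i.e. the index at which "d"
    // would be inserted.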
    pub(crate) fn position<K: Key>(&self, query: &[u8]) -> (usize, bool) {
        // inclusive
        let mut min_entry = 0;
        // inclusive. Starts one past the end, since the query might belong beyond the last entry of the leaf
        let mut max_entry = self.num_pairs();
        while min_entry < max_entry {
            let mid = min_entry.midpoint(max_entry);
            let key = self.key_unchecked(mid);
            match K::compare(query, key) {
                Ordering::Less => {
                    max_entry = mid;
                }
                Ordering::Equal => {
                    return (mid, true);
                }
                Ordering::Greater => {
                    min_entry = mid + 1;
                }
            }
        }
        debug_assert_eq!(min_entry, max_entry);
        (min_entry, false)
    }

    pub(crate) fn find_key<K: Key>(&self, query: &[u8]) -> Option<usize> {
        let (entry, found) = self.position::<K>(query);
        if found { Some(entry) } else { None }
    }

    fn key_section_start(&self) -> usize {
        let mut offset = 4;
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }
        if self.fixed_value_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }

        offset
    }

    fn key_start(&self, n: usize) -> Option<usize> {
        if n == 0 {
            Some(self.key_section_start())
        } else {
            self.key_end(n - 1)
        }
    }

    fn key_end(&self, n: usize) -> Option<usize> {
        if n >= self.num_pairs() {
            None
        } else {
            if let Some(fixed) = self.fixed_key_size {
                return Some(self.key_section_start() + fixed * (n + 1));
            }
            let offset = 4 + size_of::<u32>() * n;
            let end = u32::from_le_bytes(
                self.page
                    .get(offset..(offset + size_of::<u32>()))?
                    .try_into()
                    .unwrap(),
            ) as usize;
            Some(end)
        }
    }

    fn value_start(&self, n: usize) -> Option<usize> {
        if n == 0 {
            self.key_end(self.num_pairs() - 1)
        } else {
            self.value_end(n - 1)
        }
    }

    fn value_end(&self, n: usize) -> Option<usize> {
        if n >= self.num_pairs() {
            None
        } else {
            if let Some(fixed) = self.fixed_value_size {
                return Some(self.key_end(self.num_pairs.checked_sub(1)?)? + fixed * (n + 1));
            }
            let mut offset = 4 + size_of::<u32>() * n;
            if self.fixed_key_size.is_none() {
                offset += size_of::<u32>() * self.num_pairs;
            }
            let end = u32::from_le_bytes(
                self.page
                    .get(offset..(offset + size_of::<u32>()))?
                    .try_into()
                    .unwrap(),
            ) as usize;
            Some(end)
        }
    }

    pub(crate) fn num_pairs(&self) -> usize {
        self.num_pairs
    }

    pub(super) fn offset_of_first_value(&self) -> usize {
        self.offset_of_value(0).unwrap()
    }

    pub(super) fn offset_of_value(&self, n: usize) -> Option<usize> {
        self.value_start(n)
    }

    pub(super) fn value_range(&self, n: usize) -> Option<(usize, usize)> {
        Some((self.value_start(n)?, self.value_end(n)?))
    }

    // Returns the length of all keys and values between [start, end)
    pub(crate) fn length_of_pairs(&self, start: usize, end: usize) -> usize {
        self.length_of_values(start, end) + self.length_of_keys(start, end)
    }

    fn length_of_values(&self, start: usize, end: usize) -> usize {
        if end == 0 {
            return 0;
        }
        let end_offset = self.value_end(end - 1).unwrap();
        let start_offset = self.value_start(start).unwrap();
        end_offset - start_offset
    }

    // Returns the length of all keys between [start, end)
    pub(crate) fn length_of_keys(&self, start: usize, end: usize) -> usize {
        if end == 0 {
            return 0;
        }
        let end_offset = self.key_end(end - 1).unwrap();
        let start_offset = self.key_start(start).unwrap();
        end_offset - start_offset
    }

    pub(crate) fn total_length(&self) -> usize {
        // Values are stored last
        self.value_end(self.num_pairs() - 1).unwrap()
    }

    fn key_unchecked(&self, n: usize) -> &[u8] {
        &self.page[self.key_start(n).unwrap()..self.key_end(n).unwrap()]
    }

    pub(crate) fn entry(&self, n: usize) -> Option<EntryAccessor<'a>> {
        let key = &self.page[self.key_start(n)?..self.key_end(n)?];
        let value = &self.page[self.value_start(n)?..self.value_end(n)?];
        Some(EntryAccessor::new(key, value))
    }

    pub(crate) fn entry_ranges(&self, n: usize) -> Option<(Range<usize>, Range<usize>)> {
        let key = self.key_start(n)?..self.key_end(n)?;
        let value = self.value_start(n)?..self.value_end(n)?;
        Some((key, value))
    }

    pub(super) fn last_entry(&self) -> EntryAccessor<'a> {
        self.entry(self.num_pairs() - 1).unwrap()
    }
}

pub(super) struct LeafBuilder<'a, 'b> {
    pairs: Vec<(&'a [u8], &'a [u8])>,
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
    total_key_bytes: usize,
    total_value_bytes: usize,
    mem: &'b TransactionalMemory,
    allocated_pages: &'b Mutex<PageTrackerPolicy>,
}

impl<'a, 'b> LeafBuilder<'a, 'b> {
    pub(super) fn required_bytes(&self, num_pairs: usize, keys_values_bytes: usize) -> usize {
        RawLeafBuilder::required_bytes(
            num_pairs,
            keys_values_bytes,
            self.fixed_key_size,
            self.fixed_value_size,
        )
    }

    pub(super) fn new(
        mem: &'b TransactionalMemory,
        allocated_pages: &'b Mutex<PageTrackerPolicy>,
        capacity: usize,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
    ) -> Self {
        Self {
            pairs: Vec::with_capacity(capacity),
            fixed_key_size,
            fixed_value_size,
            total_key_bytes: 0,
            total_value_bytes: 0,
            mem,
            allocated_pages,
        }
    }

    pub(super) fn push(&mut self, key: &'a [u8], value: &'a [u8]) {
        self.total_key_bytes += key.len();
        self.total_value_bytes += value.len();
        self.pairs.push((key, value));
    }

    pub(super) fn push_all_except(
        &mut self,
        accessor: &'a LeafAccessor<'_>,
        except: Option<usize>,
    ) {
        for i in 0..accessor.num_pairs() {
            if let Some(except) = except
                && except == i
            {
                continue;
            }
            let entry = accessor.entry(i).unwrap();
            self.push(entry.key(), entry.value());
        }
    }

    pub(super) fn should_split(&self) -> bool {
        let required_size = self.required_bytes(
            self.pairs.len(),
            self.total_key_bytes + self.total_value_bytes,
        );
        required_size > self.mem.get_page_size() && self.pairs.len() > 1
    }

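    // Splits the pairs across two pages at the first boundary where the
    // accumulated key+value bytes reach half the total, and returns
    // (first page, greatest key of the first page, second page).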
    pub(super) fn build_split(self) -> Result<(PageMut, &'a [u8], PageMut)> {
        let total_size = self.total_key_bytes + self.total_value_bytes;
        let mut division = 0;
        let mut first_split_key_bytes = 0;
        let mut first_split_value_bytes = 0;
        for (key, value) in self.pairs.iter().take(self.pairs.len() - 1) {
            first_split_key_bytes += key.len();
            first_split_value_bytes += value.len();
            division += 1;
            if first_split_key_bytes + first_split_value_bytes >= total_size / 2 {
                break;
            }
        }

        let required_size =
            self.required_bytes(division, first_split_key_bytes + first_split_value_bytes);
        let mut allocated_pages = self.allocated_pages.lock().unwrap();
        let mut page1 = self.mem.allocate(required_size, &mut allocated_pages)?;
        let mut builder = RawLeafBuilder::new(
            page1.memory_mut(),
            division,
            self.fixed_key_size,
            self.fixed_value_size,
            first_split_key_bytes,
        );
        for (key, value) in self.pairs.iter().take(division) {
            builder.append(key, value);
        }
        drop(builder);

        let required_size = self.required_bytes(
            self.pairs.len() - division,
            self.total_key_bytes + self.total_value_bytes
                - first_split_key_bytes
                - first_split_value_bytes,
        );
        let mut page2 = self.mem.allocate(required_size, &mut allocated_pages)?;
        let mut builder = RawLeafBuilder::new(
            page2.memory_mut(),
            self.pairs.len() - division,
            self.fixed_key_size,
            self.fixed_value_size,
            self.total_key_bytes - first_split_key_bytes,
        );
        for (key, value) in &self.pairs[division..] {
            builder.append(key, value);
        }
        drop(builder);

        Ok((page1, self.pairs[division - 1].0, page2))
    }

    pub(super) fn build(self) -> Result<PageMut> {
        let required_size = self.required_bytes(
            self.pairs.len(),
            self.total_key_bytes + self.total_value_bytes,
        );
        let mut allocated_pages = self.allocated_pages.lock().unwrap();
        let mut page = self.mem.allocate(required_size, &mut allocated_pages)?;
        let mut builder = RawLeafBuilder::new(
            page.memory_mut(),
            self.pairs.len(),
            self.fixed_key_size,
            self.fixed_value_size,
            self.total_key_bytes,
        );
        for (key, value) in self.pairs {
            builder.append(key, value);
        }
        drop(builder);
        Ok(page)
    }
}

// Note the caller is responsible for ensuring that the buffer is large enough
// and rewriting all fields if any dynamically sized fields are written
// Layout is:
// 1 byte: type
// 1 byte: reserved (padding to 32bits aligned)
// 2 bytes: num_entries (number of pairs)
// (optional) repeating (num_entries times):
// 4 bytes: key_end
// (optional) repeating (num_entries times):
// 4 bytes: value_end
// repeating (num_entries times):
// * n bytes: key data
// repeating (num_entries times):
// * n bytes: value data
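//
// For example, a leaf with two variable-width pairs lays out as: bytes 0..4
// header, 4..12 the two key_end offsets, 12..20 the two value_end offsets,
// followed by both keys and then both values. All stored offsets are absolute
// from the start of the page.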
pub(crate) struct RawLeafBuilder<'a> {
    page: &'a mut [u8],
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
    num_pairs: usize,
    provisioned_key_bytes: usize,
    pairs_written: usize, // used for debugging
}

impl<'a> RawLeafBuilder<'a> {
    pub(crate) fn required_bytes(
        num_pairs: usize,
        keys_values_bytes: usize,
        key_size: Option<usize>,
        value_size: Option<usize>,
    ) -> usize {
        // Page type & header
        let mut result = 4;
        // key & value lengths
        if key_size.is_none() {
            result += num_pairs * size_of::<u32>();
        }
        if value_size.is_none() {
            result += num_pairs * size_of::<u32>();
        }
        result += keys_values_bytes;

        result
    }

    pub(crate) fn new(
        page: &'a mut [u8],
        num_pairs: usize,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
        key_bytes: usize,
    ) -> Self {
        page[0] = LEAF;
        page[2..4].copy_from_slice(&u16::try_from(num_pairs).unwrap().to_le_bytes());
        #[cfg(debug_assertions)]
        {
            // Poison all the key & value offsets, in case the caller forgets to write them
            let mut last = 4;
            if fixed_key_size.is_none() {
                last += size_of::<u32>() * num_pairs;
            }
            if fixed_value_size.is_none() {
                last += size_of::<u32>() * num_pairs;
            }
            for x in &mut page[4..last] {
                *x = 0xFF;
            }
        }
        RawLeafBuilder {
            page,
            fixed_key_size,
            fixed_value_size,
            num_pairs,
            provisioned_key_bytes: key_bytes,
            pairs_written: 0,
        }
    }

    fn value_end(&self, n: usize) -> usize {
        if let Some(fixed) = self.fixed_value_size {
            return self.key_section_start() + self.provisioned_key_bytes + fixed * (n + 1);
        }
        let mut offset = 4 + size_of::<u32>() * n;
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }
        u32::from_le_bytes(
            self.page[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        ) as usize
    }

    fn key_section_start(&self) -> usize {
        let mut offset = 4;
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }
        if self.fixed_value_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }

        offset
    }

    fn key_end(&self, n: usize) -> usize {
        if let Some(fixed) = self.fixed_key_size {
            return self.key_section_start() + fixed * (n + 1);
        }
        let offset = 4 + size_of::<u32>() * n;
        u32::from_le_bytes(
            self.page[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        ) as usize
    }

    pub(crate) fn append(&mut self, key: &[u8], value: &[u8]) {
        if let Some(key_width) = self.fixed_key_size {
            assert_eq!(key_width, key.len());
        }
        if let Some(value_width) = self.fixed_value_size {
            assert_eq!(value_width, value.len());
        }
        let key_offset = if self.pairs_written == 0 {
            self.key_section_start()
        } else {
            self.key_end(self.pairs_written - 1)
        };
        let value_offset = if self.pairs_written == 0 {
            self.key_section_start() + self.provisioned_key_bytes
        } else {
            self.value_end(self.pairs_written - 1)
        };

        let n = self.pairs_written;
        if self.fixed_key_size.is_none() {
            let offset = 4 + size_of::<u32>() * n;
            self.page[offset..(offset + size_of::<u32>())]
                .copy_from_slice(&u32::try_from(key_offset + key.len()).unwrap().to_le_bytes());
        }
        self.page[key_offset..(key_offset + key.len())].copy_from_slice(key);
        let written_key_len = key_offset + key.len() - self.key_section_start();
        assert!(written_key_len <= self.provisioned_key_bytes);

        if self.fixed_value_size.is_none() {
            let mut offset = 4 + size_of::<u32>() * n;
            if self.fixed_key_size.is_none() {
                offset += size_of::<u32>() * self.num_pairs;
            }
            self.page[offset..(offset + size_of::<u32>())].copy_from_slice(
                &u32::try_from(value_offset + value.len())
                    .unwrap()
                    .to_le_bytes(),
            );
        }
        self.page[value_offset..(value_offset + value.len())].copy_from_slice(value);
        self.pairs_written += 1;
    }
}

impl Drop for RawLeafBuilder<'_> {
    fn drop(&mut self) {
        if !thread::panicking() {
            assert_eq!(self.pairs_written, self.num_pairs);
            assert_eq!(
                self.key_section_start() + self.provisioned_key_bytes,
                self.key_end(self.num_pairs - 1)
            );
        }
    }
}

pub(crate) struct LeafMutator<'b> {
    page: &'b mut [u8],
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
}

impl<'b> LeafMutator<'b> {
    pub(crate) fn new(
        page: &'b mut [u8],
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
    ) -> Self {
        assert_eq!(page[0], LEAF);
        Self {
            page,
            fixed_key_size,
            fixed_value_size,
        }
    }

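    // Returns true if the page has enough free space after its current data to
    // absorb the insert (or overwrite) without being rewritten.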
    pub(super) fn sufficient_insert_inplace_space(
        page: &'_ impl Page,
        position: usize,
        overwrite: bool,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
        new_key: &[u8],
        new_value: &[u8],
    ) -> bool {
        let accessor = LeafAccessor::new(page.memory(), fixed_key_size, fixed_value_size);
        if overwrite {
            let remaining = page.memory().len() - accessor.total_length();
            let required_delta = isize::try_from(new_key.len() + new_value.len()).unwrap()
                - isize::try_from(accessor.length_of_pairs(position, position + 1)).unwrap();
            required_delta <= isize::try_from(remaining).unwrap()
        } else {
            // If this is a large page, only allow in-place appending to avoid write amplification
            //
            // Note: this check is also required to avoid inserting an unbounded number of small values
            // into a large page, which could overflow a u16 -- the maximum number of entries per leaf
            if page.get_page_number().page_order > 0 && position < accessor.num_pairs() {
                return false;
            }
            let remaining = page.memory().len() - accessor.total_length();
            let mut required_delta = new_key.len() + new_value.len();
            if fixed_key_size.is_none() {
                required_delta += size_of::<u32>();
            }
            if fixed_value_size.is_none() {
                required_delta += size_of::<u32>();
            }
            required_delta <= remaining
        }
    }

    // Insert the given key-value pair at index i and shift all following pairs to the right.
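    // If `overwrite` is true, the pair currently at index i is replaced instead and
    // the number of pairs is unchanged.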
    pub(crate) fn insert(&mut self, i: usize, overwrite: bool, key: &[u8], value: &[u8]) {
        let accessor = LeafAccessor::new(self.page, self.fixed_key_size, self.fixed_value_size);
        let required_delta = if overwrite {
            isize::try_from(key.len() + value.len()).unwrap()
                - isize::try_from(accessor.length_of_pairs(i, i + 1)).unwrap()
        } else {
            let mut delta = key.len() + value.len();
            if self.fixed_key_size.is_none() {
                delta += size_of::<u32>();
            }
            if self.fixed_value_size.is_none() {
                delta += size_of::<u32>();
            }
            delta.try_into().unwrap()
        };
        assert!(
            isize::try_from(accessor.total_length()).unwrap() + required_delta
                <= isize::try_from(self.page.len()).unwrap()
        );

        let num_pairs = accessor.num_pairs();
        let last_key_end = accessor.key_end(accessor.num_pairs() - 1).unwrap();
        let last_value_end = accessor.value_end(accessor.num_pairs() - 1).unwrap();
        let shift_index = if overwrite { i + 1 } else { i };
        let shift_key_start = accessor.key_start(shift_index).unwrap_or(last_key_end);
        let shift_value_start = accessor.value_start(shift_index).unwrap_or(last_value_end);
        let existing_value_len = accessor
            .value_range(i)
            .map(|(start, end)| end - start)
            .unwrap_or_default();

        let value_delta = if overwrite {
            isize::try_from(value.len()).unwrap() - isize::try_from(existing_value_len).unwrap()
        } else {
            value.len().try_into().unwrap()
        };

        // Update all the pointers
        let key_ptr_size: usize = if self.fixed_key_size.is_none() { 4 } else { 0 };
        let value_ptr_size: usize = if self.fixed_value_size.is_none() {
            4
        } else {
            0
        };
        if !overwrite {
            for j in 0..i {
                self.update_key_end(j, (key_ptr_size + value_ptr_size).try_into().unwrap());
                let value_delta: isize = (key_ptr_size + value_ptr_size + key.len())
                    .try_into()
                    .unwrap();
                self.update_value_end(j, value_delta);
            }
        }
        for j in i..num_pairs {
            if overwrite {
                self.update_value_end(j, value_delta);
            } else {
                let key_delta: isize = (key_ptr_size + value_ptr_size + key.len())
                    .try_into()
                    .unwrap();
                self.update_key_end(j, key_delta);
                let value_delta = key_delta + isize::try_from(value.len()).unwrap();
                self.update_value_end(j, value_delta);
            }
        }

        let new_num_pairs = if overwrite { num_pairs } else { num_pairs + 1 };
        self.page[2..4].copy_from_slice(&u16::try_from(new_num_pairs).unwrap().to_le_bytes());

        // Right shift the trailing values
        let mut dest = if overwrite {
            (isize::try_from(shift_value_start).unwrap() + value_delta)
                .try_into()
                .unwrap()
        } else {
            shift_value_start + key_ptr_size + value_ptr_size + key.len() + value.len()
        };
        let start = shift_value_start;
        let end = last_value_end;
        self.page.copy_within(start..end, dest);

        // Insert the value
        let inserted_value_end: u32 = dest.try_into().unwrap();
        dest -= value.len();
        self.page[dest..(dest + value.len())].copy_from_slice(value);

        if !overwrite {
            // Right shift the trailing key data & preceding value data
            let start = shift_key_start;
            let end = shift_value_start;
            dest -= end - start;
            self.page.copy_within(start..end, dest);

            // Insert the key
            let inserted_key_end: u32 = dest.try_into().unwrap();
            dest -= key.len();
            self.page[dest..(dest + key.len())].copy_from_slice(key);

            // Right shift the trailing value pointers & preceding key data
            let start = 4 + key_ptr_size * num_pairs + value_ptr_size * i;
            let end = shift_key_start;
            dest -= end - start;
            debug_assert_eq!(
                dest,
                4 + key_ptr_size * new_num_pairs + value_ptr_size * (i + 1)
            );
            self.page.copy_within(start..end, dest);

            // Insert the value pointer
            if self.fixed_value_size.is_none() {
                dest -= size_of::<u32>();
                self.page[dest..(dest + size_of::<u32>())]
                    .copy_from_slice(&inserted_value_end.to_le_bytes());
            }

            // Right shift the trailing key pointers & preceding value pointers
            let start = 4 + key_ptr_size * i;
            let end = 4 + key_ptr_size * num_pairs + value_ptr_size * i;
            dest -= end - start;
            debug_assert_eq!(dest, 4 + key_ptr_size * (i + 1));
            self.page.copy_within(start..end, dest);

            // Insert the key pointer
            if self.fixed_key_size.is_none() {
                dest -= size_of::<u32>();
                self.page[dest..(dest + size_of::<u32>())]
                    .copy_from_slice(&inserted_key_end.to_le_bytes());
            }
            debug_assert_eq!(dest, 4 + key_ptr_size * i);
        }
    }

    pub(super) fn remove(&mut self, i: usize) {
        let accessor = LeafAccessor::new(self.page, self.fixed_key_size, self.fixed_value_size);
        let num_pairs = accessor.num_pairs();
        assert!(i < num_pairs);
        assert!(num_pairs > 1);
        let key_start = accessor.key_start(i).unwrap();
        let key_end = accessor.key_end(i).unwrap();
        let value_start = accessor.value_start(i).unwrap();
        let value_end = accessor.value_end(i).unwrap();
        let last_value_end = accessor.value_end(accessor.num_pairs() - 1).unwrap();

        // Update all the pointers
        let key_ptr_size = if self.fixed_key_size.is_none() {
            size_of::<u32>()
        } else {
            0
        };
        let value_ptr_size = if self.fixed_value_size.is_none() {
            size_of::<u32>()
        } else {
            0
        };
        for j in 0..i {
            self.update_key_end(j, -isize::try_from(key_ptr_size + value_ptr_size).unwrap());
            let value_delta = -isize::try_from(key_ptr_size + value_ptr_size).unwrap()
                - isize::try_from(key_end - key_start).unwrap();
            self.update_value_end(j, value_delta);
        }
        for j in (i + 1)..num_pairs {
            let key_delta = -isize::try_from(key_ptr_size + value_ptr_size).unwrap()
                - isize::try_from(key_end - key_start).unwrap();
            self.update_key_end(j, key_delta);
            let value_delta = key_delta - isize::try_from(value_end - value_start).unwrap();
            self.update_value_end(j, value_delta);
        }

        // Left shift all the pointers & data

        let new_num_pairs = num_pairs - 1;
        self.page[2..4].copy_from_slice(&u16::try_from(new_num_pairs).unwrap().to_le_bytes());
        // Left shift the trailing key pointers & preceding value pointers
        let mut dest = 4 + key_ptr_size * i;
        // First trailing key pointer
        let start = 4 + key_ptr_size * (i + 1);
        // Last preceding value pointer
        let end = 4 + key_ptr_size * num_pairs + value_ptr_size * i;
        self.page.copy_within(start..end, dest);
        dest += end - start;
        debug_assert_eq!(dest, 4 + key_ptr_size * new_num_pairs + value_ptr_size * i);

        // Left shift the trailing value pointers & preceding key data
        let start = 4 + key_ptr_size * num_pairs + value_ptr_size * (i + 1);
        let end = key_start;
        self.page.copy_within(start..end, dest);
        dest += end - start;

        let preceding_key_len = key_start - (4 + (key_ptr_size + value_ptr_size) * num_pairs);
        debug_assert_eq!(
            dest,
            4 + (key_ptr_size + value_ptr_size) * new_num_pairs + preceding_key_len
        );

        // Left shift the trailing key data & preceding value data
        let start = key_end;
        let end = value_start;
        self.page.copy_within(start..end, dest);
        dest += end - start;

        // Left shift the trailing value data
        let preceding_data_len =
            value_start - (4 + (key_ptr_size + value_ptr_size) * num_pairs) - (key_end - key_start);
        debug_assert_eq!(
            dest,
            4 + (key_ptr_size + value_ptr_size) * new_num_pairs + preceding_data_len
        );
        let start = value_end;
        let end = last_value_end;
        self.page.copy_within(start..end, dest);
    }

    fn update_key_end(&mut self, i: usize, delta: isize) {
        if self.fixed_key_size.is_some() {
            return;
        }
        let offset = 4 + size_of::<u32>() * i;
        let mut ptr = u32::from_le_bytes(
            self.page[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        );
        ptr = (isize::try_from(ptr).unwrap() + delta).try_into().unwrap();
        self.page[offset..(offset + size_of::<u32>())].copy_from_slice(&ptr.to_le_bytes());
    }

    fn update_value_end(&mut self, i: usize, delta: isize) {
        if self.fixed_value_size.is_some() {
            return;
        }
        let accessor = LeafAccessor::new(self.page, self.fixed_key_size, self.fixed_value_size);
        let num_pairs = accessor.num_pairs();
        let mut offset = 4 + size_of::<u32>() * i;
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * num_pairs;
        }
        let mut ptr = u32::from_le_bytes(
            self.page[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        );
        ptr = (isize::try_from(ptr).unwrap() + delta).try_into().unwrap();
        self.page[offset..(offset + size_of::<u32>())].copy_from_slice(&ptr.to_le_bytes());
    }
}

// Provides a simple zero-copy way to access a branch page
// TODO: this should be pub(super) and the multimap btree stuff should be moved into this package
pub(crate) struct BranchAccessor<'a: 'b, 'b, T: Page + 'a> {
    page: &'b T,
    num_keys: usize,
    fixed_key_size: Option<usize>,
    _page_lifetime: PhantomData<&'a ()>,
}

impl<'a: 'b, 'b, T: Page + 'a> BranchAccessor<'a, 'b, T> {
    pub(crate) fn new(page: &'b T, fixed_key_size: Option<usize>) -> Self {
        debug_assert_eq!(page.memory()[0], BRANCH);
        let num_keys = u16::from_le_bytes(page.memory()[2..4].try_into().unwrap()) as usize;
        BranchAccessor {
            page,
            num_keys,
            fixed_key_size,
            _page_lifetime: Default::default(),
        }
    }

    pub(super) fn print_node<K: Key>(&self) {
        eprint!(
            "Internal[ (page={:?}), child_0={:?}",
            self.page.get_page_number(),
            self.child_page(0).unwrap()
        );
        for i in 0..(self.count_children() - 1) {
            if let Some(child) = self.child_page(i + 1) {
                let key = self.key(i).unwrap();
                eprint!(" key_{i}={:?}", K::from_bytes(key));
                eprint!(" child_{}={child:?}", i + 1);
            }
        }
        eprint!("]");
    }

    pub(crate) fn total_length(&self) -> usize {
        // Keys are stored at the end
        self.key_end(self.num_keys() - 1).unwrap()
    }

    pub(super) fn child_for_key<K: Key>(&self, query: &[u8]) -> (usize, PageNumber) {
        let mut min_child = 0; // inclusive
        let mut max_child = self.num_keys(); // inclusive
        while min_child < max_child {
            let mid = min_child.midpoint(max_child);
            match K::compare(query, self.key(mid).unwrap()) {
                Ordering::Less => {
                    max_child = mid;
                }
                Ordering::Equal => {
                    return (mid, self.child_page(mid).unwrap());
                }
                Ordering::Greater => {
                    min_child = mid + 1;
                }
            }
        }
        debug_assert_eq!(min_child, max_child);

        (min_child, self.child_page(min_child).unwrap())
    }

    fn key_section_start(&self) -> usize {
        if self.fixed_key_size.is_none() {
            8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * self.count_children()
                + size_of::<u32>() * self.num_keys()
        } else {
            8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * self.count_children()
        }
    }

    fn key_offset(&self, n: usize) -> usize {
        if n == 0 {
            self.key_section_start()
        } else {
            self.key_end(n - 1).unwrap()
        }
    }

    fn key_end(&self, n: usize) -> Option<usize> {
        if let Some(fixed) = self.fixed_key_size {
            return Some(self.key_section_start() + fixed * (n + 1));
        }
        let offset = 8
            + (PageNumber::serialized_size() + size_of::<Checksum>()) * self.count_children()
            + size_of::<u32>() * n;
        Some(u32::from_le_bytes(
            self.page
                .memory()
                .get(offset..(offset + size_of::<u32>()))?
                .try_into()
                .unwrap(),
        ) as usize)
    }

    pub(super) fn key(&self, n: usize) -> Option<&[u8]> {
        if n >= self.num_keys() {
            return None;
        }
        let offset = self.key_offset(n);
        let end = self.key_end(n)?;
        Some(&self.page.memory()[offset..end])
    }

    pub(crate) fn count_children(&self) -> usize {
        self.num_keys() + 1
    }

    pub(crate) fn child_checksum(&self, n: usize) -> Option<Checksum> {
        if n >= self.count_children() {
            return None;
        }

        let offset = 8 + size_of::<Checksum>() * n;
        Some(Checksum::from_le_bytes(
            self.page.memory()[offset..(offset + size_of::<Checksum>())]
                .try_into()
                .unwrap(),
        ))
    }

    pub(crate) fn child_page(&self, n: usize) -> Option<PageNumber> {
        if n >= self.count_children() {
            return None;
        }

        let offset =
            8 + size_of::<Checksum>() * self.count_children() + PageNumber::serialized_size() * n;
        Some(PageNumber::from_le_bytes(
            self.page.memory()[offset..(offset + PageNumber::serialized_size())]
                .try_into()
                .unwrap(),
        ))
    }

    fn num_keys(&self) -> usize {
        self.num_keys
    }
}

pub(super) struct BranchBuilder<'a, 'b> {
    children: Vec<(PageNumber, Checksum)>,
    keys: Vec<&'a [u8]>,
    total_key_bytes: usize,
    fixed_key_size: Option<usize>,
    mem: &'b TransactionalMemory,
    allocated_pages: &'b Mutex<PageTrackerPolicy>,
}

impl<'a, 'b> BranchBuilder<'a, 'b> {
    pub(super) fn new(
        mem: &'b TransactionalMemory,
        allocated_pages: &'b Mutex<PageTrackerPolicy>,
        child_capacity: usize,
        fixed_key_size: Option<usize>,
    ) -> Self {
        Self {
            children: Vec::with_capacity(child_capacity),
            keys: Vec::with_capacity(child_capacity - 1),
            total_key_bytes: 0,
            fixed_key_size,
            mem,
            allocated_pages,
        }
    }

    pub(super) fn replace_child(&mut self, index: usize, child: PageNumber, checksum: Checksum) {
        self.children[index] = (child, checksum);
    }

    pub(super) fn push_child(&mut self, child: PageNumber, checksum: Checksum) {
        self.children.push((child, checksum));
    }

    pub(super) fn push_key(&mut self, key: &'a [u8]) {
        self.keys.push(key);
        self.total_key_bytes += key.len();
    }

    pub(super) fn push_all<T: Page>(&mut self, accessor: &'a BranchAccessor<'_, '_, T>) {
        for i in 0..accessor.count_children() {
            let child = accessor.child_page(i).unwrap();
            let checksum = accessor.child_checksum(i).unwrap();
            self.push_child(child, checksum);
        }
        for i in 0..(accessor.count_children() - 1) {
            self.push_key(accessor.key(i).unwrap());
        }
    }

    pub(super) fn to_single_child(&self) -> Option<(PageNumber, Checksum)> {
        if self.children.len() > 1 {
            None
        } else {
            Some(self.children[0])
        }
    }

    pub(super) fn build(self) -> Result<PageMut> {
        assert_eq!(self.children.len(), self.keys.len() + 1);
        let size = RawBranchBuilder::required_bytes(
            self.keys.len(),
            self.total_key_bytes,
            self.fixed_key_size,
        );
        let mut allocated_pages = self.allocated_pages.lock().unwrap();
        let mut page = self.mem.allocate(size, &mut allocated_pages)?;
        let mut builder =
            RawBranchBuilder::new(page.memory_mut(), self.keys.len(), self.fixed_key_size);
        builder.write_first_page(self.children[0].0, self.children[0].1);
        for i in 1..self.children.len() {
            let key = &self.keys[i - 1];
            builder.write_nth_key(key.as_ref(), self.children[i].0, self.children[i].1, i - 1);
        }
        drop(builder);

        Ok(page)
    }

    pub(super) fn should_split(&self) -> bool {
        let size = RawBranchBuilder::required_bytes(
            self.keys.len(),
            self.total_key_bytes,
            self.fixed_key_size,
        );
        size > self.mem.get_page_size() && self.keys.len() >= 3
    }

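    // Splits the keys at the midpoint. The middle key is returned so the caller
    // can promote it into the parent; it appears in neither child page.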
    pub(super) fn build_split(self) -> Result<(PageMut, &'a [u8], PageMut)> {
        let mut allocated_pages = self.allocated_pages.lock().unwrap();
        assert_eq!(self.children.len(), self.keys.len() + 1);
        assert!(self.keys.len() >= 3);
        let division = self.keys.len() / 2;
        let first_split_key_len: usize = self.keys.iter().take(division).map(|k| k.len()).sum();
        let division_key = self.keys[division];
        let second_split_key_len = self.total_key_bytes - first_split_key_len - division_key.len();

        let size =
            RawBranchBuilder::required_bytes(division, first_split_key_len, self.fixed_key_size);
        let mut page1 = self.mem.allocate(size, &mut allocated_pages)?;
        let mut builder = RawBranchBuilder::new(page1.memory_mut(), division, self.fixed_key_size);
        builder.write_first_page(self.children[0].0, self.children[0].1);
        for i in 0..division {
            let key = &self.keys[i];
            builder.write_nth_key(
                key.as_ref(),
                self.children[i + 1].0,
                self.children[i + 1].1,
                i,
            );
        }
        drop(builder);

        let size = RawBranchBuilder::required_bytes(
            self.keys.len() - division - 1,
            second_split_key_len,
            self.fixed_key_size,
        );
        let mut page2 = self.mem.allocate(size, &mut allocated_pages)?;
        let mut builder = RawBranchBuilder::new(
            page2.memory_mut(),
            self.keys.len() - division - 1,
            self.fixed_key_size,
        );
        builder.write_first_page(self.children[division + 1].0, self.children[division + 1].1);
        for i in (division + 1)..self.keys.len() {
            let key = &self.keys[i];
            builder.write_nth_key(
                key.as_ref(),
                self.children[i + 1].0,
                self.children[i + 1].1,
                i - division - 1,
            );
        }
        drop(builder);

        Ok((page1, division_key, page2))
    }
}

// Note the caller is responsible for ensuring that the buffer is large enough
// and rewriting all fields if any dynamically sized fields are written
// Layout is:
// 1 byte: type
// 1 byte: padding (padding to 16bits aligned)
// 2 bytes: num_keys (number of keys)
// 4 bytes: padding (padding to 64bits aligned)
// repeating (num_keys + 1 times):
// 16 bytes: child page checksum
// repeating (num_keys + 1 times):
// 8 bytes: page number
// (optional) repeating (num_keys times):
// * 4 bytes: key end. Ending offset of the key, exclusive
// repeating (num_keys times):
// * n bytes: key data
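//
// For example, a branch with num_keys = 2 (three children) lays out as:
// bytes 0..8 header, 8..56 three 16-byte child checksums, 56..80 three 8-byte
// child page numbers, 80..88 the two key_end offsets, then the key data.
// Stored key offsets are absolute from the start of the page.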
pub(super) struct RawBranchBuilder<'b> {
    page: &'b mut [u8],
    fixed_key_size: Option<usize>,
    num_keys: usize,
    keys_written: usize, // used for debugging
}

impl<'b> RawBranchBuilder<'b> {
    pub(super) fn required_bytes(
        num_keys: usize,
        size_of_keys: usize,
        fixed_key_size: Option<usize>,
    ) -> usize {
        if fixed_key_size.is_none() {
            let fixed_size = 8
                + (PageNumber::serialized_size() + size_of::<Checksum>()) * (num_keys + 1)
                + size_of::<u32>() * num_keys;
            size_of_keys + fixed_size
        } else {
            let fixed_size =
                8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * (num_keys + 1);
            size_of_keys + fixed_size
        }
    }

    // Caller MUST write num_keys values
    pub(super) fn new(page: &'b mut [u8], num_keys: usize, fixed_key_size: Option<usize>) -> Self {
        assert!(num_keys > 0);
        page[0] = BRANCH;
        page[2..4].copy_from_slice(&u16::try_from(num_keys).unwrap().to_le_bytes());
        #[cfg(debug_assertions)]
        {
            // Poison all the child pointers & key offsets, in case the caller forgets to write them
            let start = 8 + size_of::<Checksum>() * (num_keys + 1);
            let last = 8
                + (PageNumber::serialized_size() + size_of::<Checksum>()) * (num_keys + 1)
                + size_of::<u32>() * num_keys;
            for x in &mut page[start..last] {
                *x = 0xFF;
            }
        }
        RawBranchBuilder {
            page,
            fixed_key_size,
            num_keys,
            keys_written: 0,
        }
    }

    pub(super) fn write_first_page(&mut self, page_number: PageNumber, checksum: Checksum) {
        let offset = 8;
        self.page[offset..(offset + size_of::<Checksum>())]
            .copy_from_slice(&checksum.to_le_bytes());
        let offset = 8 + size_of::<Checksum>() * (self.num_keys + 1);
        self.page[offset..(offset + PageNumber::serialized_size())]
            .copy_from_slice(&page_number.to_le_bytes());
    }

    fn key_section_start(&self) -> usize {
        let mut offset =
            8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * (self.num_keys + 1);
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_keys;
        }

        offset
    }

    fn key_end(&self, n: usize) -> usize {
        if let Some(fixed) = self.fixed_key_size {
            return self.key_section_start() + fixed * (n + 1);
        }
        let offset = 8
            + (PageNumber::serialized_size() + size_of::<Checksum>()) * (self.num_keys + 1)
            + size_of::<u32>() * n;
        u32::from_le_bytes(
            self.page[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        ) as usize
    }

    // Write the nth key and the page holding values greater than this key, but less
    // than or equal to the next key.
    // Caller must write keys & pages in increasing order
    pub(super) fn write_nth_key(
        &mut self,
        key: &[u8],
        page_number: PageNumber,
        checksum: Checksum,
        n: usize,
    ) {
        assert!(n < self.num_keys);
        assert_eq!(n, self.keys_written);
        self.keys_written += 1;
        let offset = 8 + size_of::<Checksum>() * (n + 1);
        self.page[offset..(offset + size_of::<Checksum>())]
            .copy_from_slice(&checksum.to_le_bytes());
        let offset = 8
            + size_of::<Checksum>() * (self.num_keys + 1)
            + PageNumber::serialized_size() * (n + 1);
        self.page[offset..(offset + PageNumber::serialized_size())]
            .copy_from_slice(&page_number.to_le_bytes());

        let data_offset = if n > 0 {
            self.key_end(n - 1)
        } else {
            self.key_section_start()
        };
        if self.fixed_key_size.is_none() {
            let offset = 8
                + (PageNumber::serialized_size() + size_of::<Checksum>()) * (self.num_keys + 1)
                + size_of::<u32>() * n;
            self.page[offset..(offset + size_of::<u32>())].copy_from_slice(
                &u32::try_from(data_offset + key.len())
                    .unwrap()
                    .to_le_bytes(),
            );
        }

        debug_assert!(data_offset > offset);
        self.page[data_offset..(data_offset + key.len())].copy_from_slice(key);
    }
}

impl Drop for RawBranchBuilder<'_> {
    fn drop(&mut self) {
        if !thread::panicking() {
            assert_eq!(self.keys_written, self.num_keys);
        }
    }
}

pub(crate) struct BranchMutator<'b> {
    page: &'b mut [u8],
}

impl<'b> BranchMutator<'b> {
    pub(crate) fn new(page: &'b mut [u8]) -> Self {
        assert_eq!(page[0], BRANCH);
        Self { page }
    }

    fn num_keys(&self) -> usize {
        u16::from_le_bytes(self.page[2..4].try_into().unwrap()) as usize
    }

    pub(crate) fn write_child_page(
        &mut self,
        i: usize,
        page_number: PageNumber,
        checksum: Checksum,
    ) {
        debug_assert!(i <= self.num_keys());
        let offset = 8 + size_of::<Checksum>() * i;
        self.page[offset..(offset + size_of::<Checksum>())]
            .copy_from_slice(&checksum.to_le_bytes());
        let offset =
            8 + size_of::<Checksum>() * (self.num_keys() + 1) + PageNumber::serialized_size() * i;
        self.page[offset..(offset + PageNumber::serialized_size())]
            .copy_from_slice(&page_number.to_le_bytes());
    }
}
1698}