// redb/tree_store/btree_base.rs

1use crate::tree_store::page_store::{Page, PageImpl, PageMut, TransactionalMemory, xxh3_checksum};
2use crate::tree_store::{PageNumber, PageTrackerPolicy};
3use crate::types::{Key, MutInPlaceValue, Value};
4use crate::{Result, StorageError};
5use std::cmp::Ordering;
6use std::marker::PhantomData;
7use std::mem::size_of;
8use std::ops::Range;
9use std::sync::{Arc, Mutex};
10use std::thread;
11
// Page type discriminants stored in the first byte of every btree page
// (written by the builders, debug-asserted by the accessors).
pub(crate) const LEAF: u8 = 1;
pub(crate) const BRANCH: u8 = 2;

pub(crate) type Checksum = u128;
// Dummy value. Final value will be computed during commit
pub(crate) const DEFERRED: Checksum = 999;
18
19pub(super) fn leaf_checksum<T: Page>(
20    page: &T,
21    fixed_key_size: Option<usize>,
22    fixed_value_size: Option<usize>,
23) -> Result<Checksum, StorageError> {
24    let accessor = LeafAccessor::new(page.memory(), fixed_key_size, fixed_value_size);
25    let last_pair = accessor.num_pairs().checked_sub(1).ok_or_else(|| {
26        StorageError::Corrupted(format!(
27            "Leaf page {:?} corrupted. Number of pairs is zero",
28            page.get_page_number()
29        ))
30    })?;
31    let end = accessor.value_end(last_pair).ok_or_else(|| {
32        StorageError::Corrupted(format!(
33            "Leaf page {:?} corrupted. Couldn't find offset for pair {}",
34            page.get_page_number(),
35            last_pair,
36        ))
37    })?;
38    if end > page.memory().len() {
39        Err(StorageError::Corrupted(format!(
40            "Leaf page {:?} corrupted. Last offset {} beyond end of data {}",
41            page.get_page_number(),
42            end,
43            page.memory().len()
44        )))
45    } else {
46        Ok(xxh3_checksum(&page.memory()[..end]))
47    }
48}
49
50pub(super) fn branch_checksum<T: Page>(
51    page: &T,
52    fixed_key_size: Option<usize>,
53) -> Result<Checksum, StorageError> {
54    let accessor = BranchAccessor::new(page, fixed_key_size);
55    let last_key = accessor.num_keys().checked_sub(1).ok_or_else(|| {
56        StorageError::Corrupted(format!(
57            "Branch page {:?} corrupted. Number of keys is zero",
58            page.get_page_number()
59        ))
60    })?;
61    let end = accessor.key_end(last_key).ok_or_else(|| {
62        StorageError::Corrupted(format!(
63            "Branch page {:?} corrupted. Can't find offset for key {}",
64            page.get_page_number(),
65            last_key
66        ))
67    })?;
68    if end > page.memory().len() {
69        Err(StorageError::Corrupted(format!(
70            "Branch page {:?} corrupted. Last offset {} beyond end of data {}",
71            page.get_page_number(),
72            end,
73            page.memory().len()
74        )))
75    } else {
76        Ok(xxh3_checksum(&page.memory()[..end]))
77    }
78}
79
// Serializable summary of a btree: its root page, the root's checksum, and
// the tree length (a u64 count maintained by callers).
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) struct BtreeHeader {
    pub(crate) root: PageNumber,
    // May hold the DEFERRED placeholder until the real checksum is computed at commit
    pub(crate) checksum: Checksum,
    pub(crate) length: u64,
}
86
87impl BtreeHeader {
88    pub(crate) fn new(root: PageNumber, checksum: Checksum, length: u64) -> Self {
89        Self {
90            root,
91            checksum,
92            length,
93        }
94    }
95
96    pub(crate) const fn serialized_size() -> usize {
97        PageNumber::serialized_size() + size_of::<Checksum>() + size_of::<u64>()
98    }
99
100    pub(crate) fn from_le_bytes(bytes: [u8; Self::serialized_size()]) -> Self {
101        let root =
102            PageNumber::from_le_bytes(bytes[..PageNumber::serialized_size()].try_into().unwrap());
103        let mut offset = PageNumber::serialized_size();
104        let checksum = Checksum::from_le_bytes(
105            bytes[offset..(offset + size_of::<Checksum>())]
106                .try_into()
107                .unwrap(),
108        );
109        offset += size_of::<Checksum>();
110        let length = u64::from_le_bytes(
111            bytes[offset..(offset + size_of::<u64>())]
112                .try_into()
113                .unwrap(),
114        );
115
116        Self {
117            root,
118            checksum,
119            length,
120        }
121    }
122
123    pub(crate) fn to_le_bytes(self) -> [u8; Self::serialized_size()] {
124        let mut result = [0; Self::serialized_size()];
125        result[..PageNumber::serialized_size()].copy_from_slice(&self.root.to_le_bytes());
126        result[PageNumber::serialized_size()
127            ..(PageNumber::serialized_size() + size_of::<Checksum>())]
128            .copy_from_slice(&self.checksum.to_le_bytes());
129        result[(PageNumber::serialized_size() + size_of::<Checksum>())..]
130            .copy_from_slice(&self.length.to_le_bytes());
131
132        result
133    }
134}
135
// Action an AccessGuard performs when dropped.
enum OnDrop {
    // Do nothing
    None,
    // Remove the entry at `position` from the guarded (mutable) leaf page
    RemoveEntry {
        position: usize,
        fixed_key_size: Option<usize>,
    },
}
143
// The different byte stores an AccessGuard can borrow its value from.
enum EitherPage {
    Immutable(PageImpl),
    Mutable(PageMut),
    OwnedMemory(Vec<u8>),
    ArcMemory(Arc<[u8]>),
}
150
151impl EitherPage {
152    fn memory(&self) -> &[u8] {
153        match self {
154            EitherPage::Immutable(page) => page.memory(),
155            EitherPage::Mutable(page) => page.memory(),
156            EitherPage::OwnedMemory(mem) => mem.as_slice(),
157            EitherPage::ArcMemory(mem) => mem,
158        }
159    }
160}
161
// Read guard over a value's bytes; may optionally remove the entry on drop.
pub struct AccessGuard<'a, V: Value + 'static> {
    // Backing bytes for the value
    page: EitherPage,
    // Byte range of the value within `page`
    offset: usize,
    len: usize,
    // What to do when the guard is dropped (see OnDrop)
    on_drop: OnDrop,
    _value_type: PhantomData<V>,
    // Used so that logical references into a Table respect the appropriate lifetime
    _lifetime: PhantomData<&'a ()>,
}
171
172impl<V: Value + 'static> AccessGuard<'_, V> {
173    pub(crate) fn with_page(page: PageImpl, range: Range<usize>) -> Self {
174        Self {
175            page: EitherPage::Immutable(page),
176            offset: range.start,
177            len: range.len(),
178            on_drop: OnDrop::None,
179            _value_type: Default::default(),
180            _lifetime: Default::default(),
181        }
182    }
183
184    pub(crate) fn with_arc_page(page: Arc<[u8]>, range: Range<usize>) -> Self {
185        Self {
186            page: EitherPage::ArcMemory(page),
187            offset: range.start,
188            len: range.len(),
189            on_drop: OnDrop::None,
190            _value_type: Default::default(),
191            _lifetime: Default::default(),
192        }
193    }
194
195    pub(crate) fn with_owned_value(value: Vec<u8>) -> Self {
196        let len = value.len();
197        Self {
198            page: EitherPage::OwnedMemory(value),
199            offset: 0,
200            len,
201            on_drop: OnDrop::None,
202            _value_type: Default::default(),
203            _lifetime: Default::default(),
204        }
205    }
206
207    pub(super) fn remove_on_drop(
208        page: PageMut,
209        offset: usize,
210        len: usize,
211        position: usize,
212        fixed_key_size: Option<usize>,
213    ) -> Self {
214        Self {
215            page: EitherPage::Mutable(page),
216            offset,
217            len,
218            on_drop: OnDrop::RemoveEntry {
219                position,
220                fixed_key_size,
221            },
222            _value_type: Default::default(),
223            _lifetime: Default::default(),
224        }
225    }
226
227    pub fn value(&self) -> V::SelfType<'_> {
228        V::from_bytes(&self.page.memory()[self.offset..(self.offset + self.len)])
229    }
230}
231
impl<V: Value + 'static> Drop for AccessGuard<'_, V> {
    // Performs the deferred removal for guards built via `remove_on_drop`.
    fn drop(&mut self) {
        match self.on_drop {
            OnDrop::None => {}
            OnDrop::RemoveEntry {
                position,
                fixed_key_size,
            } => {
                // RemoveEntry is only ever paired with a Mutable page (see
                // remove_on_drop), so any other variant is a logic error --
                // but don't panic again while already unwinding, since a
                // double panic would abort the process.
                if let EitherPage::Mutable(ref mut mut_page) = self.page {
                    let mut mutator = LeafMutator::new(mut_page, fixed_key_size, V::fixed_width());
                    mutator.remove(position);
                } else if !thread::panicking() {
                    unreachable!();
                }
            }
        }
    }
}
250
// Write guard over a value's bytes within a mutable page.
pub struct AccessGuardMut<'a, V: Value + 'static> {
    page: PageMut,
    // Byte range of the value within `page`
    offset: usize,
    len: usize,
    _value_type: PhantomData<V>,
    // Used so that logical references into a Table respect the appropriate lifetime
    _lifetime: PhantomData<&'a ()>,
}
259
260impl<V: Value + 'static> AccessGuardMut<'_, V> {
261    pub(crate) fn new(page: PageMut, offset: usize, len: usize) -> Self {
262        AccessGuardMut {
263            page,
264            offset,
265            len,
266            _value_type: Default::default(),
267            _lifetime: Default::default(),
268        }
269    }
270}
271
impl<V: MutInPlaceValue + 'static> AsMut<V::BaseRefType> for AccessGuardMut<'_, V> {
    // Exposes the guarded byte range as V's mutable reference type.
    fn as_mut(&mut self) -> &mut V::BaseRefType {
        V::from_bytes_mut(&mut self.page.memory_mut()[self.offset..(self.offset + self.len)])
    }
}
277
// Provides a simple zero-copy way to access entries
pub struct EntryAccessor<'a> {
    key: &'a [u8],
    value: &'a [u8],
}

impl<'a> EntryAccessor<'a> {
    // Pairs up borrowed key and value slices from a leaf page.
    fn new(key: &'a [u8], value: &'a [u8]) -> Self {
        EntryAccessor { key, value }
    }
}
289
impl<'a: 'b, 'b> EntryAccessor<'a> {
    // Key bytes; the slice borrows from the underlying page ('a), not the accessor.
    pub(crate) fn key(&'b self) -> &'a [u8] {
        self.key
    }

    // Value bytes; the slice borrows from the underlying page ('a), not the accessor.
    pub(crate) fn value(&'b self) -> &'a [u8] {
        self.value
    }
}
299
// Provides a simple zero-copy way to access a leaf page
pub(crate) struct LeafAccessor<'a> {
    // Raw page bytes: header, optional offset arrays, key data, value data
    page: &'a [u8],
    // Some(width) when keys are fixed-width; then no key offset array is stored
    fixed_key_size: Option<usize>,
    // Some(width) when values are fixed-width; then no value offset array is stored
    fixed_value_size: Option<usize>,
    // Pair count, cached from the page header (bytes 2..4)
    num_pairs: usize,
}
307
impl<'a> LeafAccessor<'a> {
    /// Wraps raw leaf-page bytes. Reads the pair count from the header
    /// (bytes 2..4) and debug-asserts the page type byte is LEAF.
    pub(crate) fn new(
        page: &'a [u8],
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
    ) -> Self {
        debug_assert_eq!(page[0], LEAF);
        let num_pairs = u16::from_le_bytes(page[2..4].try_into().unwrap()) as usize;
        LeafAccessor {
            page,
            fixed_key_size,
            fixed_value_size,
            num_pairs,
        }
    }

    /// Debug helper: prints every key (and optionally every value) to stderr.
    pub(super) fn print_node<K: Key, V: Value>(&self, include_value: bool) {
        let mut i = 0;
        while let Some(entry) = self.entry(i) {
            eprint!(" key_{i}={:?}", K::from_bytes(entry.key()));
            if include_value {
                eprint!(" value_{i}={:?}", V::from_bytes(entry.value()));
            }
            i += 1;
        }
    }

    /// Binary search for `query` using K's comparator. Returns (index, true)
    /// on an exact match, or (insertion index, false) otherwise. The insertion
    /// index may equal num_pairs(), i.e. past the last entry.
    pub(crate) fn position<K: Key>(&self, query: &[u8]) -> (usize, bool) {
        // inclusive
        let mut min_entry = 0;
        // inclusive. Start past end, since it might be positioned beyond the end of the leaf
        let mut max_entry = self.num_pairs();
        while min_entry < max_entry {
            let mid = min_entry.midpoint(max_entry);
            let key = self.key_unchecked(mid);
            match K::compare(query, key) {
                Ordering::Less => {
                    max_entry = mid;
                }
                Ordering::Equal => {
                    return (mid, true);
                }
                Ordering::Greater => {
                    min_entry = mid + 1;
                }
            }
        }
        debug_assert_eq!(min_entry, max_entry);
        (min_entry, false)
    }

    /// Index of `query` in this leaf, or None when absent.
    pub(crate) fn find_key<K: Key>(&self, query: &[u8]) -> Option<usize> {
        let (entry, found) = self.position::<K>(query);
        if found { Some(entry) } else { None }
    }

    /// Offset where key data begins: past the 4-byte header and the key/value
    /// end-offset arrays (each array exists only for variable-width types).
    fn key_section_start(&self) -> usize {
        let mut offset = 4;
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }
        if self.fixed_value_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }

        offset
    }

    /// Start offset of the n-th key (keys are stored contiguously, so this is
    /// the previous key's end). None when n is out of range.
    fn key_start(&self, n: usize) -> Option<usize> {
        if n == 0 {
            Some(self.key_section_start())
        } else {
            self.key_end(n - 1)
        }
    }

    /// End offset (exclusive) of the n-th key. Computed for fixed-width keys,
    /// otherwise read from the key offset array. None when n is out of range
    /// or the offset entry lies outside the page.
    fn key_end(&self, n: usize) -> Option<usize> {
        if n >= self.num_pairs() {
            None
        } else {
            if let Some(fixed) = self.fixed_key_size {
                return Some(self.key_section_start() + fixed * (n + 1));
            }
            let offset = 4 + size_of::<u32>() * n;
            let end = u32::from_le_bytes(
                self.page
                    .get(offset..(offset + size_of::<u32>()))?
                    .try_into()
                    .unwrap(),
            ) as usize;
            Some(end)
        }
    }

    /// Start offset of the n-th value. Value data begins where the last key
    /// ends; values are stored contiguously after that.
    fn value_start(&self, n: usize) -> Option<usize> {
        if n == 0 {
            self.key_end(self.num_pairs() - 1)
        } else {
            self.value_end(n - 1)
        }
    }

    /// End offset (exclusive) of the n-th value. Computed for fixed-width
    /// values, otherwise read from the value offset array. None when n is out
    /// of range or the offset entry lies outside the page.
    fn value_end(&self, n: usize) -> Option<usize> {
        if n >= self.num_pairs() {
            None
        } else {
            if let Some(fixed) = self.fixed_value_size {
                return Some(self.key_end(self.num_pairs.checked_sub(1)?)? + fixed * (n + 1));
            }
            let mut offset = 4 + size_of::<u32>() * n;
            if self.fixed_key_size.is_none() {
                offset += size_of::<u32>() * self.num_pairs;
            }
            let end = u32::from_le_bytes(
                self.page
                    .get(offset..(offset + size_of::<u32>()))?
                    .try_into()
                    .unwrap(),
            ) as usize;
            Some(end)
        }
    }

    /// Number of key/value pairs stored in this leaf.
    pub(crate) fn num_pairs(&self) -> usize {
        self.num_pairs
    }

    /// Offset of the first value's data. Panics on an empty leaf.
    pub(super) fn offset_of_first_value(&self) -> usize {
        self.offset_of_value(0).unwrap()
    }

    /// Offset of the n-th value's data, or None when out of range.
    pub(super) fn offset_of_value(&self, n: usize) -> Option<usize> {
        self.value_start(n)
    }

    /// (start, end) byte offsets of the n-th value, or None when out of range.
    pub(super) fn value_range(&self, n: usize) -> Option<(usize, usize)> {
        Some((self.value_start(n)?, self.value_end(n)?))
    }

    // Returns the length of all keys and values between [start, end)
    pub(crate) fn length_of_pairs(&self, start: usize, end: usize) -> usize {
        self.length_of_values(start, end) + self.length_of_keys(start, end)
    }

    // Total value bytes between [start, end); 0 when end == 0.
    fn length_of_values(&self, start: usize, end: usize) -> usize {
        if end == 0 {
            return 0;
        }
        let end_offset = self.value_end(end - 1).unwrap();
        let start_offset = self.value_start(start).unwrap();
        end_offset - start_offset
    }

    // Returns the length of all keys between [start, end)
    pub(crate) fn length_of_keys(&self, start: usize, end: usize) -> usize {
        if end == 0 {
            return 0;
        }
        let end_offset = self.key_end(end - 1).unwrap();
        let start_offset = self.key_start(start).unwrap();
        end_offset - start_offset
    }

    /// Total bytes used by this leaf's data (header through the last value).
    /// Panics on an empty leaf.
    pub(crate) fn total_length(&self) -> usize {
        // Values are stored last
        self.value_end(self.num_pairs() - 1).unwrap()
    }

    // Key bytes for index n; panics when n is out of range.
    fn key_unchecked(&self, n: usize) -> &[u8] {
        &self.page[self.key_start(n).unwrap()..self.key_end(n).unwrap()]
    }

    /// Zero-copy accessor for the n-th entry, or None when out of range.
    pub(crate) fn entry(&self, n: usize) -> Option<EntryAccessor<'a>> {
        let key = &self.page[self.key_start(n)?..self.key_end(n)?];
        let value = &self.page[self.value_start(n)?..self.value_end(n)?];
        Some(EntryAccessor::new(key, value))
    }

    /// Byte ranges (key, value) of the n-th entry, or None when out of range.
    pub(crate) fn entry_ranges(&self, n: usize) -> Option<(Range<usize>, Range<usize>)> {
        let key = self.key_start(n)?..self.key_end(n)?;
        let value = self.value_start(n)?..self.value_end(n)?;
        Some((key, value))
    }

    /// Accessor for the last entry. Panics on an empty leaf.
    pub(super) fn last_entry(&self) -> EntryAccessor<'a> {
        self.entry(self.num_pairs() - 1).unwrap()
    }
}
496
// Accumulates key/value pairs and writes them out as one or two leaf pages,
// allocating the destination page(s) from a TransactionalMemory.
pub(super) struct LeafBuilder<'a, 'b> {
    pairs: Vec<(&'a [u8], &'a [u8])>,
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
    // Running byte totals, updated by push(); used to size the output page(s)
    total_key_bytes: usize,
    total_value_bytes: usize,
    mem: &'b TransactionalMemory,
    // Tracker updated when pages are allocated during build
    allocated_pages: &'b Mutex<PageTrackerPolicy>,
}
506
507impl<'a, 'b> LeafBuilder<'a, 'b> {
508    pub(super) fn required_bytes(&self, num_pairs: usize, keys_values_bytes: usize) -> usize {
509        RawLeafBuilder::required_bytes(
510            num_pairs,
511            keys_values_bytes,
512            self.fixed_key_size,
513            self.fixed_value_size,
514        )
515    }
516
517    pub(super) fn new(
518        mem: &'b TransactionalMemory,
519        allocated_pages: &'b Mutex<PageTrackerPolicy>,
520        capacity: usize,
521        fixed_key_size: Option<usize>,
522        fixed_value_size: Option<usize>,
523    ) -> Self {
524        Self {
525            pairs: Vec::with_capacity(capacity),
526            fixed_key_size,
527            fixed_value_size,
528            total_key_bytes: 0,
529            total_value_bytes: 0,
530            mem,
531            allocated_pages,
532        }
533    }
534
535    pub(super) fn push(&mut self, key: &'a [u8], value: &'a [u8]) {
536        self.total_key_bytes += key.len();
537        self.total_value_bytes += value.len();
538        self.pairs.push((key, value));
539    }
540
541    pub(super) fn push_all_except(
542        &mut self,
543        accessor: &'a LeafAccessor<'_>,
544        except: Option<usize>,
545    ) {
546        for i in 0..accessor.num_pairs() {
547            if let Some(except) = except {
548                if except == i {
549                    continue;
550                }
551            }
552            let entry = accessor.entry(i).unwrap();
553            self.push(entry.key(), entry.value());
554        }
555    }
556
557    pub(super) fn should_split(&self) -> bool {
558        let required_size = self.required_bytes(
559            self.pairs.len(),
560            self.total_key_bytes + self.total_value_bytes,
561        );
562        required_size > self.mem.get_page_size() && self.pairs.len() > 1
563    }
564
565    pub(super) fn build_split(self) -> Result<(PageMut, &'a [u8], PageMut)> {
566        let total_size = self.total_key_bytes + self.total_value_bytes;
567        let mut division = 0;
568        let mut first_split_key_bytes = 0;
569        let mut first_split_value_bytes = 0;
570        for (key, value) in self.pairs.iter().take(self.pairs.len() - 1) {
571            first_split_key_bytes += key.len();
572            first_split_value_bytes += value.len();
573            division += 1;
574            if first_split_key_bytes + first_split_value_bytes >= total_size / 2 {
575                break;
576            }
577        }
578
579        let required_size =
580            self.required_bytes(division, first_split_key_bytes + first_split_value_bytes);
581        let mut allocated_pages = self.allocated_pages.lock().unwrap();
582        let mut page1 = self.mem.allocate(required_size, &mut allocated_pages)?;
583        let mut builder = RawLeafBuilder::new(
584            page1.memory_mut(),
585            division,
586            self.fixed_key_size,
587            self.fixed_value_size,
588            first_split_key_bytes,
589        );
590        for (key, value) in self.pairs.iter().take(division) {
591            builder.append(key, value);
592        }
593        drop(builder);
594
595        let required_size = self.required_bytes(
596            self.pairs.len() - division,
597            self.total_key_bytes + self.total_value_bytes
598                - first_split_key_bytes
599                - first_split_value_bytes,
600        );
601        let mut page2 = self.mem.allocate(required_size, &mut allocated_pages)?;
602        let mut builder = RawLeafBuilder::new(
603            page2.memory_mut(),
604            self.pairs.len() - division,
605            self.fixed_key_size,
606            self.fixed_value_size,
607            self.total_key_bytes - first_split_key_bytes,
608        );
609        for (key, value) in &self.pairs[division..] {
610            builder.append(key, value);
611        }
612        drop(builder);
613
614        Ok((page1, self.pairs[division - 1].0, page2))
615    }
616
617    pub(super) fn build(self) -> Result<PageMut> {
618        let required_size = self.required_bytes(
619            self.pairs.len(),
620            self.total_key_bytes + self.total_value_bytes,
621        );
622        let mut allocated_pages = self.allocated_pages.lock().unwrap();
623        let mut page = self.mem.allocate(required_size, &mut allocated_pages)?;
624        let mut builder = RawLeafBuilder::new(
625            page.memory_mut(),
626            self.pairs.len(),
627            self.fixed_key_size,
628            self.fixed_value_size,
629            self.total_key_bytes,
630        );
631        for (key, value) in self.pairs {
632            builder.append(key, value);
633        }
634        drop(builder);
635        Ok(page)
636    }
637}
638
// Note the caller is responsible for ensuring that the buffer is large enough
// and rewriting all fields if any dynamically sized fields are written
// Layout is:
// 1 byte: type
// 1 byte: reserved (padding to 32bits aligned)
// 2 bytes: num_entries (number of pairs)
// (optional) repeating (num_entries times):
// 4 bytes: key_end
// (optional) repeating (num_entries times):
// 4 bytes: value_end
// repeating (num_entries times):
// * n bytes: key data
// repeating (num_entries times):
// * n bytes: value data
pub(crate) struct RawLeafBuilder<'a> {
    // Destination buffer, pre-sized via required_bytes()
    page: &'a mut [u8],
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
    num_pairs: usize,
    // Total key bytes reserved up front; values are written after this region
    provisioned_key_bytes: usize,
    pairs_written: usize, // used for debugging
}
661
impl<'a> RawLeafBuilder<'a> {
    /// Total page bytes needed for `num_pairs` pairs with `keys_values_bytes`
    /// bytes of combined key+value data. Fixed-width keys/values need no
    /// per-entry end-offset arrays.
    pub(crate) fn required_bytes(
        num_pairs: usize,
        keys_values_bytes: usize,
        key_size: Option<usize>,
        value_size: Option<usize>,
    ) -> usize {
        // Page id & header;
        let mut result = 4;
        // key & value lengths
        if key_size.is_none() {
            result += num_pairs * size_of::<u32>();
        }
        if value_size.is_none() {
            result += num_pairs * size_of::<u32>();
        }
        result += keys_values_bytes;

        result
    }

    /// Writes the page header for a leaf with `num_pairs` entries.
    /// `key_bytes` is the total key bytes that will be appended; value data
    /// is laid out immediately after this provisioned key region.
    pub(crate) fn new(
        page: &'a mut [u8],
        num_pairs: usize,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
        key_bytes: usize,
    ) -> Self {
        page[0] = LEAF;
        page[2..4].copy_from_slice(&u16::try_from(num_pairs).unwrap().to_le_bytes());
        #[cfg(debug_assertions)]
        {
            // Poison all the key & value offsets, in case the caller forgets to write them
            let mut last = 4;
            if fixed_key_size.is_none() {
                last += size_of::<u32>() * num_pairs;
            }
            if fixed_value_size.is_none() {
                last += size_of::<u32>() * num_pairs;
            }
            for x in &mut page[4..last] {
                *x = 0xFF;
            }
        }
        RawLeafBuilder {
            page,
            fixed_key_size,
            fixed_value_size,
            num_pairs,
            provisioned_key_bytes: key_bytes,
            pairs_written: 0,
        }
    }

    // End offset (exclusive) of the n-th value: computed for fixed-width
    // values, otherwise read back from the value end-offset array.
    fn value_end(&self, n: usize) -> usize {
        if let Some(fixed) = self.fixed_value_size {
            return self.key_section_start() + self.provisioned_key_bytes + fixed * (n + 1);
        }
        let mut offset = 4 + size_of::<u32>() * n;
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }
        u32::from_le_bytes(
            self.page[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        ) as usize
    }

    // Offset where key data begins: past the 4-byte header and any end-offset
    // arrays (present only for variable-width keys/values).
    fn key_section_start(&self) -> usize {
        let mut offset = 4;
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }
        if self.fixed_value_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }

        offset
    }

    // End offset (exclusive) of the n-th key: computed for fixed-width keys,
    // otherwise read back from the key end-offset array.
    fn key_end(&self, n: usize) -> usize {
        if let Some(fixed) = self.fixed_key_size {
            return self.key_section_start() + fixed * (n + 1);
        }
        let offset = 4 + size_of::<u32>() * n;
        u32::from_le_bytes(
            self.page[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        ) as usize
    }

    /// Writes the next key/value pair. Pairs must be appended in order; this
    /// writes the key and value bytes and, for variable-width types, records
    /// their end offsets in the header arrays.
    pub(crate) fn append(&mut self, key: &[u8], value: &[u8]) {
        if let Some(key_width) = self.fixed_key_size {
            assert_eq!(key_width, key.len());
        }
        if let Some(value_width) = self.fixed_value_size {
            assert_eq!(value_width, value.len());
        }
        // Key goes right after the previously written key (or at the start
        // of the key section for the first pair)
        let key_offset = if self.pairs_written == 0 {
            self.key_section_start()
        } else {
            self.key_end(self.pairs_written - 1)
        };
        // Value goes after the provisioned key region (or the previous value)
        let value_offset = if self.pairs_written == 0 {
            self.key_section_start() + self.provisioned_key_bytes
        } else {
            self.value_end(self.pairs_written - 1)
        };

        let n = self.pairs_written;
        if self.fixed_key_size.is_none() {
            let offset = 4 + size_of::<u32>() * n;
            self.page[offset..(offset + size_of::<u32>())]
                .copy_from_slice(&u32::try_from(key_offset + key.len()).unwrap().to_le_bytes());
        }
        self.page[key_offset..(key_offset + key.len())].copy_from_slice(key);
        let written_key_len = key_offset + key.len() - self.key_section_start();
        // Guard against key data overflowing into the value region
        assert!(written_key_len <= self.provisioned_key_bytes);

        if self.fixed_value_size.is_none() {
            let mut offset = 4 + size_of::<u32>() * n;
            if self.fixed_key_size.is_none() {
                offset += size_of::<u32>() * self.num_pairs;
            }
            self.page[offset..(offset + size_of::<u32>())].copy_from_slice(
                &u32::try_from(value_offset + value.len())
                    .unwrap()
                    .to_le_bytes(),
            );
        }
        self.page[value_offset..(value_offset + value.len())].copy_from_slice(value);
        self.pairs_written += 1;
    }
}
798
impl Drop for RawLeafBuilder<'_> {
    fn drop(&mut self) {
        // Enforce that the builder was filled exactly: every pair appended and
        // the key bytes written matching what was provisioned. Skipped while
        // unwinding, since a second panic would abort the process.
        if !thread::panicking() {
            assert_eq!(self.pairs_written, self.num_pairs);
            assert_eq!(
                self.key_section_start() + self.provisioned_key_bytes,
                self.key_end(self.num_pairs - 1)
            );
        }
    }
}
810
// In-place editor for a mutable leaf page: shifts key/value data and fixes up
// the end-offset arrays to insert, overwrite, or remove entries.
pub(crate) struct LeafMutator<'b> {
    page: &'b mut PageMut,
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
}
816
817impl<'b> LeafMutator<'b> {
    /// Wraps a mutable leaf page for in-place editing.
    /// Asserts that the page's type byte marks it as a leaf.
    pub(crate) fn new(
        page: &'b mut PageMut,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
    ) -> Self {
        assert_eq!(page.memory_mut()[0], LEAF);
        Self {
            page,
            fixed_key_size,
            fixed_value_size,
        }
    }
830
    /// Returns true when `page` has enough free space to insert (or, with
    /// `overwrite`, replace) the pair at `position` in place, without
    /// rebuilding the leaf.
    pub(super) fn sufficient_insert_inplace_space(
        page: &'_ PageImpl,
        position: usize,
        overwrite: bool,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
        new_key: &[u8],
        new_value: &[u8],
    ) -> bool {
        let accessor = LeafAccessor::new(page.memory(), fixed_key_size, fixed_value_size);
        if overwrite {
            // Replacing an existing pair: only the size difference matters
            let remaining = page.memory().len() - accessor.total_length();
            let required_delta = isize::try_from(new_key.len() + new_value.len()).unwrap()
                - isize::try_from(accessor.length_of_pairs(position, position + 1)).unwrap();
            required_delta <= isize::try_from(remaining).unwrap()
        } else {
            // If this is a large page, only allow in-place appending to avoid write amplification
            //
            // Note: this check is also required to avoid inserting an unbounded number of small values
            // into a large page, which could result in overflowing a u16 which is the maximum number of entries per leaf
            if page.get_page_number().page_order > 0 && position < accessor.num_pairs() {
                return false;
            }
            let remaining = page.memory().len() - accessor.total_length();
            // New entry needs its data plus a 4-byte end offset for each
            // variable-width dimension
            let mut required_delta = new_key.len() + new_value.len();
            if fixed_key_size.is_none() {
                required_delta += size_of::<u32>();
            }
            if fixed_value_size.is_none() {
                required_delta += size_of::<u32>();
            }
            required_delta <= remaining
        }
    }
865
    // Insert the given key, value pair at index i and shift all following pairs to the right
    //
    // If `overwrite` is true the pair already at index i is replaced instead: only the value
    // bytes are rewritten (the key bytes on the page are left untouched), so the caller must
    // pass a `key` whose length matches the existing key at i — TODO confirm callers always
    // pass the identical key. The caller is also responsible for ensuring the page has enough
    // free space (asserted below).
    pub(crate) fn insert(&mut self, i: usize, overwrite: bool, key: &[u8], value: &[u8]) {
        let accessor = LeafAccessor::new(
            self.page.memory(),
            self.fixed_key_size,
            self.fixed_value_size,
        );
        // Net change in total page bytes: new key+value minus the replaced pair (overwrite),
        // or key+value plus any new u32 end-offset pointers (plain insert).
        let required_delta = if overwrite {
            isize::try_from(key.len() + value.len()).unwrap()
                - isize::try_from(accessor.length_of_pairs(i, i + 1)).unwrap()
        } else {
            let mut delta = key.len() + value.len();
            if self.fixed_key_size.is_none() {
                delta += size_of::<u32>();
            }
            if self.fixed_value_size.is_none() {
                delta += size_of::<u32>();
            }
            delta.try_into().unwrap()
        };
        // The new data must fit within the page
        assert!(
            isize::try_from(accessor.total_length()).unwrap() + required_delta
                <= isize::try_from(self.page.memory().len()).unwrap()
        );

        let num_pairs = accessor.num_pairs();
        let last_key_end = accessor.key_end(accessor.num_pairs() - 1).unwrap();
        let last_value_end = accessor.value_end(accessor.num_pairs() - 1).unwrap();
        // Data from shift_index onward moves right; on overwrite, pair i itself stays put
        let shift_index = if overwrite { i + 1 } else { i };
        // When inserting/overwriting at the end, there is nothing after, so fall back to
        // the end of the existing data
        let shift_key_start = accessor.key_start(shift_index).unwrap_or(last_key_end);
        let shift_value_start = accessor.value_start(shift_index).unwrap_or(last_value_end);
        let existing_value_len = accessor
            .value_range(i)
            .map(|(start, end)| end - start)
            .unwrap_or_default();

        // How far value data after the insertion point moves
        let value_delta = if overwrite {
            isize::try_from(value.len()).unwrap() - isize::try_from(existing_value_len).unwrap()
        } else {
            value.len().try_into().unwrap()
        };

        // Update all the pointers
        let key_ptr_size: usize = if self.fixed_key_size.is_none() { 4 } else { 0 };
        let value_ptr_size: usize = if self.fixed_value_size.is_none() {
            4
        } else {
            0
        };
        if !overwrite {
            // Pairs before i keep their order, but their data shifts right by the two new
            // end-offset pointers (and, for value offsets, the newly inserted key bytes)
            for j in 0..i {
                self.update_key_end(j, (key_ptr_size + value_ptr_size).try_into().unwrap());
                let value_delta: isize = (key_ptr_size + value_ptr_size + key.len())
                    .try_into()
                    .unwrap();
                self.update_value_end(j, value_delta);
            }
        }
        for j in i..num_pairs {
            if overwrite {
                // Key bytes are unchanged on overwrite; only value offsets move
                self.update_value_end(j, value_delta);
            } else {
                let key_delta: isize = (key_ptr_size + value_ptr_size + key.len())
                    .try_into()
                    .unwrap();
                self.update_key_end(j, key_delta);
                let value_delta = key_delta + isize::try_from(value.len()).unwrap();
                self.update_value_end(j, value_delta);
            }
        }

        let new_num_pairs = if overwrite { num_pairs } else { num_pairs + 1 };
        // Bytes 2..4 of the header hold the little-endian u16 pair count
        self.page.memory_mut()[2..4]
            .copy_from_slice(&u16::try_from(new_num_pairs).unwrap().to_le_bytes());

        // Right shift the trailing values
        let mut dest = if overwrite {
            (isize::try_from(shift_value_start).unwrap() + value_delta)
                .try_into()
                .unwrap()
        } else {
            shift_value_start + key_ptr_size + value_ptr_size + key.len() + value.len()
        };
        let start = shift_value_start;
        let end = last_value_end;
        self.page.memory_mut().copy_within(start..end, dest);

        // Insert the value. After the copy above, `dest` is exactly the end of the slot
        // reserved for the new value
        let inserted_value_end: u32 = dest.try_into().unwrap();
        dest -= value.len();
        self.page.memory_mut()[dest..(dest + value.len())].copy_from_slice(value);

        if !overwrite {
            // Right shift the trailing key data & preceding value data
            let start = shift_key_start;
            let end = shift_value_start;
            dest -= end - start;
            self.page.memory_mut().copy_within(start..end, dest);

            // Insert the key
            let inserted_key_end: u32 = dest.try_into().unwrap();
            dest -= key.len();
            self.page.memory_mut()[dest..(dest + key.len())].copy_from_slice(key);

            // Right shift the trailing value pointers & preceding key data
            let start = 4 + key_ptr_size * num_pairs + value_ptr_size * i;
            let end = shift_key_start;
            dest -= end - start;
            debug_assert_eq!(
                dest,
                4 + key_ptr_size * new_num_pairs + value_ptr_size * (i + 1)
            );
            self.page.memory_mut().copy_within(start..end, dest);

            // Insert the value pointer (only stored for variable-size values)
            if self.fixed_value_size.is_none() {
                dest -= size_of::<u32>();
                self.page.memory_mut()[dest..(dest + size_of::<u32>())]
                    .copy_from_slice(&inserted_value_end.to_le_bytes());
            }

            // Right shift the trailing key pointers & preceding value pointers
            let start = 4 + key_ptr_size * i;
            let end = 4 + key_ptr_size * num_pairs + value_ptr_size * i;
            dest -= end - start;
            debug_assert_eq!(dest, 4 + key_ptr_size * (i + 1));
            self.page.memory_mut().copy_within(start..end, dest);

            // Insert the key pointer (only stored for variable-size keys)
            if self.fixed_key_size.is_none() {
                dest -= size_of::<u32>();
                self.page.memory_mut()[dest..(dest + size_of::<u32>())]
                    .copy_from_slice(&inserted_key_end.to_le_bytes());
            }
            debug_assert_eq!(dest, 4 + key_ptr_size * i);
        }
    }
1003
    // Removes the pair at index i and left shifts all following pairs over it.
    // The page must contain at least two pairs: removing the last pair would leave an
    // empty leaf, which is not a valid page (see leaf_checksum).
    pub(super) fn remove(&mut self, i: usize) {
        let accessor = LeafAccessor::new(
            self.page.memory(),
            self.fixed_key_size,
            self.fixed_value_size,
        );
        let num_pairs = accessor.num_pairs();
        assert!(i < num_pairs);
        assert!(num_pairs > 1);
        let key_start = accessor.key_start(i).unwrap();
        let key_end = accessor.key_end(i).unwrap();
        let value_start = accessor.value_start(i).unwrap();
        let value_end = accessor.value_end(i).unwrap();
        let last_value_end = accessor.value_end(accessor.num_pairs() - 1).unwrap();

        // Update all the pointers
        let key_ptr_size = if self.fixed_key_size.is_none() {
            size_of::<u32>()
        } else {
            0
        };
        let value_ptr_size = if self.fixed_value_size.is_none() {
            size_of::<u32>()
        } else {
            0
        };
        // Pairs before i keep their position in the pair order, but their data shifts
        // left by the removed end-offset pointers (and, for value offsets, the removed
        // key bytes)
        for j in 0..i {
            self.update_key_end(j, -isize::try_from(key_ptr_size + value_ptr_size).unwrap());
            let value_delta = -isize::try_from(key_ptr_size + value_ptr_size).unwrap()
                - isize::try_from(key_end - key_start).unwrap();
            self.update_value_end(j, value_delta);
        }
        // Pairs after i additionally shift left over the removed key and value bytes
        for j in (i + 1)..num_pairs {
            let key_delta = -isize::try_from(key_ptr_size + value_ptr_size).unwrap()
                - isize::try_from(key_end - key_start).unwrap();
            self.update_key_end(j, key_delta);
            let value_delta = key_delta - isize::try_from(value_end - value_start).unwrap();
            self.update_value_end(j, value_delta);
        }

        // Left shift all the pointers & data

        let new_num_pairs = num_pairs - 1;
        // Bytes 2..4 of the header hold the little-endian u16 pair count
        self.page.memory_mut()[2..4]
            .copy_from_slice(&u16::try_from(new_num_pairs).unwrap().to_le_bytes());
        // Left shift the trailing key pointers & preceding value pointers
        let mut dest = 4 + key_ptr_size * i;
        // First trailing key pointer
        let start = 4 + key_ptr_size * (i + 1);
        // Last preceding value pointer
        let end = 4 + key_ptr_size * num_pairs + value_ptr_size * i;
        self.page.memory_mut().copy_within(start..end, dest);
        dest += end - start;
        debug_assert_eq!(dest, 4 + key_ptr_size * new_num_pairs + value_ptr_size * i);

        // Left shift the trailing value pointers & preceding key data
        let start = 4 + key_ptr_size * num_pairs + value_ptr_size * (i + 1);
        let end = key_start;
        self.page.memory_mut().copy_within(start..end, dest);
        dest += end - start;

        let preceding_key_len = key_start - (4 + (key_ptr_size + value_ptr_size) * num_pairs);
        debug_assert_eq!(
            dest,
            4 + (key_ptr_size + value_ptr_size) * new_num_pairs + preceding_key_len
        );

        // Left shift the trailing key data & preceding value data
        let start = key_end;
        let end = value_start;
        self.page.memory_mut().copy_within(start..end, dest);
        dest += end - start;

        // Left shift the trailing value data
        let preceding_data_len =
            value_start - (4 + (key_ptr_size + value_ptr_size) * num_pairs) - (key_end - key_start);
        debug_assert_eq!(
            dest,
            4 + (key_ptr_size + value_ptr_size) * new_num_pairs + preceding_data_len
        );
        let start = value_end;
        let end = last_value_end;
        self.page.memory_mut().copy_within(start..end, dest);
    }
1088
1089    fn update_key_end(&mut self, i: usize, delta: isize) {
1090        if self.fixed_key_size.is_some() {
1091            return;
1092        }
1093        let offset = 4 + size_of::<u32>() * i;
1094        let mut ptr = u32::from_le_bytes(
1095            self.page.memory()[offset..(offset + size_of::<u32>())]
1096                .try_into()
1097                .unwrap(),
1098        );
1099        ptr = (isize::try_from(ptr).unwrap() + delta).try_into().unwrap();
1100        self.page.memory_mut()[offset..(offset + size_of::<u32>())]
1101            .copy_from_slice(&ptr.to_le_bytes());
1102    }
1103
1104    fn update_value_end(&mut self, i: usize, delta: isize) {
1105        if self.fixed_value_size.is_some() {
1106            return;
1107        }
1108        let accessor = LeafAccessor::new(
1109            self.page.memory(),
1110            self.fixed_key_size,
1111            self.fixed_value_size,
1112        );
1113        let num_pairs = accessor.num_pairs();
1114        let mut offset = 4 + size_of::<u32>() * i;
1115        if self.fixed_key_size.is_none() {
1116            offset += size_of::<u32>() * num_pairs;
1117        }
1118        let mut ptr = u32::from_le_bytes(
1119            self.page.memory()[offset..(offset + size_of::<u32>())]
1120                .try_into()
1121                .unwrap(),
1122        );
1123        ptr = (isize::try_from(ptr).unwrap() + delta).try_into().unwrap();
1124        self.page.memory_mut()[offset..(offset + size_of::<u32>())]
1125            .copy_from_slice(&ptr.to_le_bytes());
1126    }
1127}
1128
// Provides a simple zero-copy way to access a branch page
// TODO: this should be pub(super) and the multimap btree stuff should be moved into this package
pub(crate) struct BranchAccessor<'a: 'b, 'b, T: Page + 'a> {
    // The underlying branch page being read
    page: &'b T,
    // Cached number of keys, read from the page header in new()
    num_keys: usize,
    // Some(n) when every key is exactly n bytes; None for variable-length keys
    fixed_key_size: Option<usize>,
    _page_lifetime: PhantomData<&'a ()>,
}
1137
impl<'a: 'b, 'b, T: Page + 'a> BranchAccessor<'a, 'b, T> {
    // Wraps `page`, which must already be initialized as a branch page
    pub(crate) fn new(page: &'b T, fixed_key_size: Option<usize>) -> Self {
        debug_assert_eq!(page.memory()[0], BRANCH);
        // num_keys is stored as a little-endian u16 at bytes 2..4 of the header
        let num_keys = u16::from_le_bytes(page.memory()[2..4].try_into().unwrap()) as usize;
        BranchAccessor {
            page,
            num_keys,
            fixed_key_size,
            _page_lifetime: Default::default(),
        }
    }

    // Debugging aid: dumps this node's children and keys to stderr
    pub(super) fn print_node<K: Key>(&self) {
        eprint!(
            "Internal[ (page={:?}), child_0={:?}",
            self.page.get_page_number(),
            self.child_page(0).unwrap()
        );
        for i in 0..(self.count_children() - 1) {
            if let Some(child) = self.child_page(i + 1) {
                let key = self.key(i).unwrap();
                eprint!(" key_{i}={:?}", K::from_bytes(key));
                eprint!(" child_{}={child:?}", i + 1);
            }
        }
        eprint!("]");
    }

    // Total number of bytes used by this node's serialized data
    pub(crate) fn total_length(&self) -> usize {
        // Keys are stored at the end
        self.key_end(self.num_keys() - 1).unwrap()
    }

    // Binary searches the keys for `query` and returns (child index, child page
    // number) of the subtree that may contain it
    pub(super) fn child_for_key<K: Key>(&self, query: &[u8]) -> (usize, PageNumber) {
        let mut min_child = 0; // inclusive
        let mut max_child = self.num_keys(); // inclusive
        while min_child < max_child {
            let mid = min_child.midpoint(max_child);
            match K::compare(query, self.key(mid).unwrap()) {
                Ordering::Less => {
                    max_child = mid;
                }
                Ordering::Equal => {
                    // Child mid holds keys <= key(mid) (see write_nth_key), so an
                    // exact match descends into child mid
                    return (mid, self.child_page(mid).unwrap());
                }
                Ordering::Greater => {
                    min_child = mid + 1;
                }
            }
        }
        debug_assert_eq!(min_child, max_child);

        (min_child, self.child_page(min_child).unwrap())
    }

    // Byte offset at which key data begins: after the 8-byte header, the checksum
    // and page number arrays, and (for variable-size keys) the key end offset array
    fn key_section_start(&self) -> usize {
        if self.fixed_key_size.is_none() {
            8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * self.count_children()
                + size_of::<u32>() * self.num_keys()
        } else {
            8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * self.count_children()
        }
    }

    // Byte offset at which the nth key begins (the end of key n-1)
    fn key_offset(&self, n: usize) -> usize {
        if n == 0 {
            self.key_section_start()
        } else {
            self.key_end(n - 1).unwrap()
        }
    }

    // Byte offset just past the nth key, or None if the stored offset lies
    // outside the page
    fn key_end(&self, n: usize) -> Option<usize> {
        if let Some(fixed) = self.fixed_key_size {
            // Fixed-size keys are laid out contiguously; no offsets are stored
            return Some(self.key_section_start() + fixed * (n + 1));
        }
        let offset = 8
            + (PageNumber::serialized_size() + size_of::<Checksum>()) * self.count_children()
            + size_of::<u32>() * n;
        Some(u32::from_le_bytes(
            self.page
                .memory()
                .get(offset..(offset + size_of::<u32>()))?
                .try_into()
                .unwrap(),
        ) as usize)
    }

    // Returns the nth key, or None if n is out of range
    pub(super) fn key(&self, n: usize) -> Option<&[u8]> {
        if n >= self.num_keys() {
            return None;
        }
        let offset = self.key_offset(n);
        let end = self.key_end(n)?;
        Some(&self.page.memory()[offset..end])
    }

    // A branch always has exactly one more child than keys
    pub(crate) fn count_children(&self) -> usize {
        self.num_keys() + 1
    }

    // Returns the stored checksum of the nth child, or None if out of range
    pub(crate) fn child_checksum(&self, n: usize) -> Option<Checksum> {
        if n >= self.count_children() {
            return None;
        }

        // Checksums start right after the 8-byte header
        let offset = 8 + size_of::<Checksum>() * n;
        Some(Checksum::from_le_bytes(
            self.page.memory()[offset..(offset + size_of::<Checksum>())]
                .try_into()
                .unwrap(),
        ))
    }

    // Returns the page number of the nth child, or None if out of range
    pub(crate) fn child_page(&self, n: usize) -> Option<PageNumber> {
        if n >= self.count_children() {
            return None;
        }

        // Page numbers follow the checksum array
        let offset =
            8 + size_of::<Checksum>() * self.count_children() + PageNumber::serialized_size() * n;
        Some(PageNumber::from_le_bytes(
            self.page.memory()[offset..(offset + PageNumber::serialized_size())]
                .try_into()
                .unwrap(),
        ))
    }

    fn num_keys(&self) -> usize {
        self.num_keys
    }
}
1270
// Accumulates children & separator keys and serializes them into one newly
// allocated branch page (build) or two (build_split)
pub(super) struct BranchBuilder<'a, 'b> {
    // (page number, checksum) of each child, in order
    children: Vec<(PageNumber, Checksum)>,
    // Separator keys; must end up as children.len() - 1 entries before building
    keys: Vec<&'a [u8]>,
    // Running total of pushed key bytes, used for page size calculations
    total_key_bytes: usize,
    // Some(n) when every key is exactly n bytes; None for variable-length keys
    fixed_key_size: Option<usize>,
    mem: &'b TransactionalMemory,
    // Tracks pages allocated while building
    allocated_pages: &'b Mutex<PageTrackerPolicy>,
}
1279
impl<'a, 'b> BranchBuilder<'a, 'b> {
    // Creates a builder with capacity for `child_capacity` children
    // (and therefore `child_capacity - 1` keys)
    pub(super) fn new(
        mem: &'b TransactionalMemory,
        allocated_pages: &'b Mutex<PageTrackerPolicy>,
        child_capacity: usize,
        fixed_key_size: Option<usize>,
    ) -> Self {
        Self {
            children: Vec::with_capacity(child_capacity),
            keys: Vec::with_capacity(child_capacity - 1),
            total_key_bytes: 0,
            fixed_key_size,
            mem,
            allocated_pages,
        }
    }

    // Replaces the child at `index` with a new page & checksum
    pub(super) fn replace_child(&mut self, index: usize, child: PageNumber, checksum: Checksum) {
        self.children[index] = (child, checksum);
    }

    pub(super) fn push_child(&mut self, child: PageNumber, checksum: Checksum) {
        self.children.push((child, checksum));
    }

    pub(super) fn push_key(&mut self, key: &'a [u8]) {
        self.keys.push(key);
        // Keep the byte total current so size calculations don't re-sum the keys
        self.total_key_bytes += key.len();
    }

    // Copies every child and key from an existing branch page into this builder
    pub(super) fn push_all<T: Page>(&mut self, accessor: &'a BranchAccessor<'_, '_, T>) {
        for i in 0..accessor.count_children() {
            let child = accessor.child_page(i).unwrap();
            let checksum = accessor.child_checksum(i).unwrap();
            self.push_child(child, checksum);
        }
        for i in 0..(accessor.count_children() - 1) {
            self.push_key(accessor.key(i).unwrap());
        }
    }

    // If at most one child was pushed, returns it so the caller can elide the
    // redundant branch. Panics if no children were pushed.
    pub(super) fn to_single_child(&self) -> Option<(PageNumber, Checksum)> {
        if self.children.len() > 1 {
            None
        } else {
            Some(self.children[0])
        }
    }

    // Allocates a page and serializes all pushed children & keys into it
    pub(super) fn build(self) -> Result<PageMut> {
        // A branch must have exactly one more child than keys
        assert_eq!(self.children.len(), self.keys.len() + 1);
        let size = RawBranchBuilder::required_bytes(
            self.keys.len(),
            self.total_key_bytes,
            self.fixed_key_size,
        );
        let mut allocated_pages = self.allocated_pages.lock().unwrap();
        let mut page = self.mem.allocate(size, &mut allocated_pages)?;
        let mut builder = RawBranchBuilder::new(&mut page, self.keys.len(), self.fixed_key_size);
        builder.write_first_page(self.children[0].0, self.children[0].1);
        for i in 1..self.children.len() {
            // Key i-1 separates child i-1 from child i
            let key = &self.keys[i - 1];
            builder.write_nth_key(key.as_ref(), self.children[i].0, self.children[i].1, i - 1);
        }
        // Runs the builder's Drop-time check that every key was written
        drop(builder);

        Ok(page)
    }

    // True when the serialized node would exceed one page and there are enough
    // keys (>= 3) for build_split() to produce two valid nodes
    pub(super) fn should_split(&self) -> bool {
        let size = RawBranchBuilder::required_bytes(
            self.keys.len(),
            self.total_key_bytes,
            self.fixed_key_size,
        );
        size > self.mem.get_page_size() && self.keys.len() >= 3
    }

    // Splits the pushed children & keys into two newly allocated pages, returning
    // (first page, division key, second page). The division key is stored in
    // neither page; the caller must promote it into the parent.
    pub(super) fn build_split(self) -> Result<(PageMut, &'a [u8], PageMut)> {
        let mut allocated_pages = self.allocated_pages.lock().unwrap();
        assert_eq!(self.children.len(), self.keys.len() + 1);
        assert!(self.keys.len() >= 3);
        let division = self.keys.len() / 2;
        let first_split_key_len: usize = self.keys.iter().take(division).map(|k| k.len()).sum();
        let division_key = self.keys[division];
        let second_split_key_len = self.total_key_bytes - first_split_key_len - division_key.len();

        // First page gets children[0..=division] and keys[0..division]
        let size =
            RawBranchBuilder::required_bytes(division, first_split_key_len, self.fixed_key_size);
        let mut page1 = self.mem.allocate(size, &mut allocated_pages)?;
        let mut builder = RawBranchBuilder::new(&mut page1, division, self.fixed_key_size);
        builder.write_first_page(self.children[0].0, self.children[0].1);
        for i in 0..division {
            let key = &self.keys[i];
            builder.write_nth_key(
                key.as_ref(),
                self.children[i + 1].0,
                self.children[i + 1].1,
                i,
            );
        }
        drop(builder);

        // Second page gets children[division+1..] and keys[division+1..]
        let size = RawBranchBuilder::required_bytes(
            self.keys.len() - division - 1,
            second_split_key_len,
            self.fixed_key_size,
        );
        let mut page2 = self.mem.allocate(size, &mut allocated_pages)?;
        let mut builder = RawBranchBuilder::new(
            &mut page2,
            self.keys.len() - division - 1,
            self.fixed_key_size,
        );
        builder.write_first_page(self.children[division + 1].0, self.children[division + 1].1);
        for i in (division + 1)..self.keys.len() {
            let key = &self.keys[i];
            builder.write_nth_key(
                key.as_ref(),
                self.children[i + 1].0,
                self.children[i + 1].1,
                i - division - 1,
            );
        }
        drop(builder);

        Ok((page1, division_key, page2))
    }
}
1409
1410// Note the caller is responsible for ensuring that the buffer is large enough
1411// and rewriting all fields if any dynamically sized fields are written
1412// Layout is:
1413// 1 byte: type
1414// 1 byte: padding (padding to 16bits aligned)
1415// 2 bytes: num_keys (number of keys)
1416// 4 byte: padding (padding to 64bits aligned)
1417// repeating (num_keys + 1 times):
1418// 16 bytes: child page checksum
1419// repeating (num_keys + 1 times):
1420// 8 bytes: page number
1421// (optional) repeating (num_keys times):
1422// * 4 bytes: key end. Ending offset of the key, exclusive
1423// repeating (num_keys times):
1424// * n bytes: key data
pub(super) struct RawBranchBuilder<'b> {
    // Destination page; must be at least required_bytes() long
    page: &'b mut PageMut,
    // Some(n) when every key is exactly n bytes; None for variable-length keys
    fixed_key_size: Option<usize>,
    // Total number of keys the caller has promised to write
    num_keys: usize,
    keys_written: usize, // used for debugging
}
1431
impl<'b> RawBranchBuilder<'b> {
    // Number of bytes needed to serialize a branch with `num_keys` keys totaling
    // `size_of_keys` bytes of key data (see the layout comment above)
    pub(super) fn required_bytes(
        num_keys: usize,
        size_of_keys: usize,
        fixed_key_size: Option<usize>,
    ) -> usize {
        if fixed_key_size.is_none() {
            // Variable-size keys also need a u32 end offset per key
            let fixed_size = 8
                + (PageNumber::serialized_size() + size_of::<Checksum>()) * (num_keys + 1)
                + size_of::<u32>() * num_keys;
            size_of_keys + fixed_size
        } else {
            let fixed_size =
                8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * (num_keys + 1);
            size_of_keys + fixed_size
        }
    }

    // Caller MUST write num_keys values
    pub(super) fn new(
        page: &'b mut PageMut,
        num_keys: usize,
        fixed_key_size: Option<usize>,
    ) -> Self {
        assert!(num_keys > 0);
        page.memory_mut()[0] = BRANCH;
        // num_keys is stored as a little-endian u16 at bytes 2..4
        page.memory_mut()[2..4].copy_from_slice(&u16::try_from(num_keys).unwrap().to_le_bytes());
        #[cfg(debug_assertions)]
        {
            // Poison all the child pointers & key offsets, in case the caller forgets to write them
            let start = 8 + size_of::<Checksum>() * (num_keys + 1);
            let last = 8
                + (PageNumber::serialized_size() + size_of::<Checksum>()) * (num_keys + 1)
                + size_of::<u32>() * num_keys;
            for x in &mut page.memory_mut()[start..last] {
                *x = 0xFF;
            }
        }
        RawBranchBuilder {
            page,
            fixed_key_size,
            num_keys,
            keys_written: 0,
        }
    }

    // Writes the checksum and page number of child 0 (the leftmost child)
    pub(super) fn write_first_page(&mut self, page_number: PageNumber, checksum: Checksum) {
        // Checksums start right after the 8-byte header
        let offset = 8;
        self.page.memory_mut()[offset..(offset + size_of::<Checksum>())]
            .copy_from_slice(&checksum.to_le_bytes());
        // Page numbers follow the checksum array (num_keys + 1 entries)
        let offset = 8 + size_of::<Checksum>() * (self.num_keys + 1);
        self.page.memory_mut()[offset..(offset + PageNumber::serialized_size())]
            .copy_from_slice(&page_number.to_le_bytes());
    }

    // Byte offset at which key data begins
    fn key_section_start(&self) -> usize {
        let mut offset =
            8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * (self.num_keys + 1);
        if self.fixed_key_size.is_none() {
            // Skip over the key end offset array
            offset += size_of::<u32>() * self.num_keys;
        }

        offset
    }

    // Byte offset just past the end of the nth key. For variable-size keys this
    // reads back the offset written by write_nth_key(), so key n must already
    // have been written
    fn key_end(&self, n: usize) -> usize {
        if let Some(fixed) = self.fixed_key_size {
            // Fixed-size keys are contiguous; no offsets are stored
            return self.key_section_start() + fixed * (n + 1);
        }
        let offset = 8
            + (PageNumber::serialized_size() + size_of::<Checksum>()) * (self.num_keys + 1)
            + size_of::<u32>() * n;
        u32::from_le_bytes(
            self.page.memory()[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        ) as usize
    }

    // Write the nth key and page of values greater than this key, but less than or equal to the next
    // Caller must write keys & pages in increasing order
    pub(super) fn write_nth_key(
        &mut self,
        key: &[u8],
        page_number: PageNumber,
        checksum: Checksum,
        n: usize,
    ) {
        assert!(n < self.num_keys);
        assert_eq!(n, self.keys_written);
        self.keys_written += 1;
        // Checksum slot for child n + 1 (child 0 is written by write_first_page)
        let offset = 8 + size_of::<Checksum>() * (n + 1);
        self.page.memory_mut()[offset..(offset + size_of::<Checksum>())]
            .copy_from_slice(&checksum.to_le_bytes());
        // Page number slot for child n + 1
        let offset = 8
            + size_of::<Checksum>() * (self.num_keys + 1)
            + PageNumber::serialized_size() * (n + 1);
        self.page.memory_mut()[offset..(offset + PageNumber::serialized_size())]
            .copy_from_slice(&page_number.to_le_bytes());

        // Key bytes are appended immediately after the previous key
        let data_offset = if n > 0 {
            self.key_end(n - 1)
        } else {
            self.key_section_start()
        };
        if self.fixed_key_size.is_none() {
            // Record this key's end offset in the offset array
            let offset = 8
                + (PageNumber::serialized_size() + size_of::<Checksum>()) * (self.num_keys + 1)
                + size_of::<u32>() * n;
            self.page.memory_mut()[offset..(offset + size_of::<u32>())].copy_from_slice(
                &u32::try_from(data_offset + key.len())
                    .unwrap()
                    .to_le_bytes(),
            );
        }

        // Key data must land strictly after the pointer section
        debug_assert!(data_offset > offset);
        self.page.memory_mut()[data_offset..(data_offset + key.len())].copy_from_slice(key);
    }
}
1552
1553impl Drop for RawBranchBuilder<'_> {
1554    fn drop(&mut self) {
1555        if !thread::panicking() {
1556            assert_eq!(self.keys_written, self.num_keys);
1557        }
1558    }
1559}
1560
// Provides in-place mutation of the child pointers & checksums of an existing branch page
pub(crate) struct BranchMutator<'b> {
    page: &'b mut PageMut,
}
1564
1565impl<'b> BranchMutator<'b> {
1566    pub(crate) fn new(page: &'b mut PageMut) -> Self {
1567        assert_eq!(page.memory()[0], BRANCH);
1568        Self { page }
1569    }
1570
1571    fn num_keys(&self) -> usize {
1572        u16::from_le_bytes(self.page.memory()[2..4].try_into().unwrap()) as usize
1573    }
1574
1575    pub(crate) fn write_child_page(
1576        &mut self,
1577        i: usize,
1578        page_number: PageNumber,
1579        checksum: Checksum,
1580    ) {
1581        debug_assert!(i <= self.num_keys());
1582        let offset = 8 + size_of::<Checksum>() * i;
1583        self.page.memory_mut()[offset..(offset + size_of::<Checksum>())]
1584            .copy_from_slice(&checksum.to_le_bytes());
1585        let offset =
1586            8 + size_of::<Checksum>() * (self.num_keys() + 1) + PageNumber::serialized_size() * i;
1587        self.page.memory_mut()[offset..(offset + PageNumber::serialized_size())]
1588            .copy_from_slice(&page_number.to_le_bytes());
1589    }
1590}