1use crate::tree_store::page_store::{Page, PageImpl, PageMut, TransactionalMemory, xxh3_checksum};
2use crate::tree_store::{PageNumber, PageTrackerPolicy};
3use crate::types::{Key, MutInPlaceValue, Value};
4use crate::{Result, StorageError};
5use std::borrow::Borrow;
6use std::cmp::Ordering;
7use std::marker::PhantomData;
8use std::mem::size_of;
9use std::ops::Range;
10use std::sync::{Arc, Mutex};
11use std::thread;
12
// Page type tags stored in the first byte of every btree page
pub(crate) const LEAF: u8 = 1;
pub(crate) const BRANCH: u8 = 2;

pub(crate) type Checksum = u128;
// Placeholder checksum written when the real checksum computation is deferred
// (see AccessGuardMut::insert, which stores DEFERRED after relocating a page).
// NOTE(review): presumably replaced by a real checksum before commit — confirm
pub(crate) const DEFERRED: Checksum = 999;
19
20pub(super) fn leaf_checksum<T: Page>(
21 page: &T,
22 fixed_key_size: Option<usize>,
23 fixed_value_size: Option<usize>,
24) -> Result<Checksum, StorageError> {
25 let accessor = LeafAccessor::new(page.memory(), fixed_key_size, fixed_value_size);
26 let last_pair = accessor.num_pairs().checked_sub(1).ok_or_else(|| {
27 StorageError::Corrupted(format!(
28 "Leaf page {:?} corrupted. Number of pairs is zero",
29 page.get_page_number()
30 ))
31 })?;
32 let end = accessor.value_end(last_pair).ok_or_else(|| {
33 StorageError::Corrupted(format!(
34 "Leaf page {:?} corrupted. Couldn't find offset for pair {}",
35 page.get_page_number(),
36 last_pair,
37 ))
38 })?;
39 if end > page.memory().len() {
40 Err(StorageError::Corrupted(format!(
41 "Leaf page {:?} corrupted. Last offset {} beyond end of data {}",
42 page.get_page_number(),
43 end,
44 page.memory().len()
45 )))
46 } else {
47 Ok(xxh3_checksum(&page.memory()[..end]))
48 }
49}
50
51pub(super) fn branch_checksum<T: Page>(
52 page: &T,
53 fixed_key_size: Option<usize>,
54) -> Result<Checksum, StorageError> {
55 let accessor = BranchAccessor::new(page, fixed_key_size);
56 let last_key = accessor.num_keys().checked_sub(1).ok_or_else(|| {
57 StorageError::Corrupted(format!(
58 "Branch page {:?} corrupted. Number of keys is zero",
59 page.get_page_number()
60 ))
61 })?;
62 let end = accessor.key_end(last_key).ok_or_else(|| {
63 StorageError::Corrupted(format!(
64 "Branch page {:?} corrupted. Can't find offset for key {}",
65 page.get_page_number(),
66 last_key
67 ))
68 })?;
69 if end > page.memory().len() {
70 Err(StorageError::Corrupted(format!(
71 "Branch page {:?} corrupted. Last offset {} beyond end of data {}",
72 page.get_page_number(),
73 end,
74 page.memory().len()
75 )))
76 } else {
77 Ok(xxh3_checksum(&page.memory()[..end]))
78 }
79}
80
/// Header describing one btree: its root page, the root's checksum, and the
/// number of entries stored in the tree.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) struct BtreeHeader {
    // Page number of the tree's root node
    pub(crate) root: PageNumber,
    // Checksum of the root page; may hold the DEFERRED placeholder
    pub(crate) checksum: Checksum,
    // Number of key-value pairs in the tree
    pub(crate) length: u64,
}
87
88impl BtreeHeader {
89 pub(crate) fn new(root: PageNumber, checksum: Checksum, length: u64) -> Self {
90 Self {
91 root,
92 checksum,
93 length,
94 }
95 }
96
97 pub(crate) const fn serialized_size() -> usize {
98 PageNumber::serialized_size() + size_of::<Checksum>() + size_of::<u64>()
99 }
100
101 pub(crate) fn from_le_bytes(bytes: [u8; Self::serialized_size()]) -> Self {
102 let root =
103 PageNumber::from_le_bytes(bytes[..PageNumber::serialized_size()].try_into().unwrap());
104 let mut offset = PageNumber::serialized_size();
105 let checksum = Checksum::from_le_bytes(
106 bytes[offset..(offset + size_of::<Checksum>())]
107 .try_into()
108 .unwrap(),
109 );
110 offset += size_of::<Checksum>();
111 let length = u64::from_le_bytes(
112 bytes[offset..(offset + size_of::<u64>())]
113 .try_into()
114 .unwrap(),
115 );
116
117 Self {
118 root,
119 checksum,
120 length,
121 }
122 }
123
124 pub(crate) fn to_le_bytes(self) -> [u8; Self::serialized_size()] {
125 let mut result = [0; Self::serialized_size()];
126 result[..PageNumber::serialized_size()].copy_from_slice(&self.root.to_le_bytes());
127 result[PageNumber::serialized_size()
128 ..(PageNumber::serialized_size() + size_of::<Checksum>())]
129 .copy_from_slice(&self.checksum.to_le_bytes());
130 result[(PageNumber::serialized_size() + size_of::<Checksum>())..]
131 .copy_from_slice(&self.length.to_le_bytes());
132
133 result
134 }
135}
136
/// Action an `AccessGuard` performs when it is dropped.
enum OnDrop {
    /// No cleanup required.
    None,
    /// Remove the entry at `position` from the (mutable) leaf page.
    RemoveEntry {
        position: usize,
        fixed_key_size: Option<usize>,
    },
}
144
/// Backing storage for an `AccessGuard`: an immutable or mutable page,
/// or a plain byte buffer (owned or reference-counted).
enum EitherPage {
    Immutable(PageImpl),
    Mutable(PageMut),
    OwnedMemory(Vec<u8>),
    ArcMemory(Arc<[u8]>),
}
151
152impl EitherPage {
153 fn memory(&self) -> &[u8] {
154 match self {
155 EitherPage::Immutable(page) => page.memory(),
156 EitherPage::Mutable(page) => page.memory(),
157 EitherPage::OwnedMemory(mem) => mem.as_slice(),
158 EitherPage::ArcMemory(mem) => mem,
159 }
160 }
161}
162
/// Read guard over a value stored in a btree leaf (or an owned copy of one).
///
/// Depending on `on_drop`, the guard may remove the referenced entry from the
/// page when it is dropped (see `remove_on_drop`).
pub struct AccessGuard<'a, V: Value + 'static> {
    page: EitherPage,
    // Byte offset of the value within the page's memory
    offset: usize,
    // Length of the value in bytes
    len: usize,
    on_drop: OnDrop,
    _value_type: PhantomData<V>,
    _lifetime: PhantomData<&'a ()>,
}
175
176impl<V: Value + 'static> AccessGuard<'_, V> {
177 pub(crate) fn with_page(page: PageImpl, range: Range<usize>) -> Self {
178 Self {
179 page: EitherPage::Immutable(page),
180 offset: range.start,
181 len: range.len(),
182 on_drop: OnDrop::None,
183 _value_type: Default::default(),
184 _lifetime: Default::default(),
185 }
186 }
187
188 pub(crate) fn with_arc_page(page: Arc<[u8]>, range: Range<usize>) -> Self {
189 Self {
190 page: EitherPage::ArcMemory(page),
191 offset: range.start,
192 len: range.len(),
193 on_drop: OnDrop::None,
194 _value_type: Default::default(),
195 _lifetime: Default::default(),
196 }
197 }
198
199 pub(crate) fn with_owned_value(value: Vec<u8>) -> Self {
200 let len = value.len();
201 Self {
202 page: EitherPage::OwnedMemory(value),
203 offset: 0,
204 len,
205 on_drop: OnDrop::None,
206 _value_type: Default::default(),
207 _lifetime: Default::default(),
208 }
209 }
210
211 pub(super) fn remove_on_drop(
212 page: PageMut,
213 offset: usize,
214 len: usize,
215 position: usize,
216 fixed_key_size: Option<usize>,
217 ) -> Self {
218 Self {
219 page: EitherPage::Mutable(page),
220 offset,
221 len,
222 on_drop: OnDrop::RemoveEntry {
223 position,
224 fixed_key_size,
225 },
226 _value_type: Default::default(),
227 _lifetime: Default::default(),
228 }
229 }
230
231 pub fn value(&self) -> V::SelfType<'_> {
233 V::from_bytes(&self.page.memory()[self.offset..(self.offset + self.len)])
234 }
235}
236
impl<V: Value + 'static> Drop for AccessGuard<'_, V> {
    fn drop(&mut self) {
        match self.on_drop {
            OnDrop::None => {}
            OnDrop::RemoveEntry {
                position,
                fixed_key_size,
            } => {
                // RemoveEntry is only ever constructed together with a mutable
                // page (see remove_on_drop), so any other variant is a bug.
                if let EitherPage::Mutable(ref mut mut_page) = self.page {
                    let mut mutator =
                        LeafMutator::new(mut_page.memory_mut(), fixed_key_size, V::fixed_width());
                    mutator.remove(position);
                } else if !thread::panicking() {
                    // Suppressed during unwinding to avoid a double panic
                    unreachable!();
                }
            }
        }
    }
}
256
/// Write guard for a value in a btree leaf. Supports reading the value and
/// replacing it via `insert`, which may relocate the leaf to a new page.
pub struct AccessGuardMut<'a, V: Value + 'static> {
    page: PageMut,
    // Byte offset of the value within the page's memory
    offset: usize,
    // Length of the value in bytes
    len: usize,
    // Index of the guarded entry within the leaf
    entry_index: usize,
    // Parent branch page and the child index that points at `page`, if any
    parent: Option<(PageMut, usize)>,
    mem: Arc<TransactionalMemory>,
    allocated: Arc<Mutex<PageTrackerPolicy>>,
    // Header to update when the root page itself is replaced
    root_ref: &'a mut BtreeHeader,
    // Fixed key width for this table, or None for variable-width keys
    key_width: Option<usize>,
    _value_type: PhantomData<V>,
}
269
impl<'a, V: Value + 'static> AccessGuardMut<'a, V> {
    /// Creates a guard for entry `entry_index` of `page`.
    ///
    /// Both `page` and the parent page (when present) must be uncommitted,
    /// since `insert` may rewrite or free them.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        page: PageMut,
        offset: usize,
        len: usize,
        entry_index: usize,
        parent: Option<(PageMut, usize)>,
        mem: Arc<TransactionalMemory>,
        allocated: Arc<Mutex<PageTrackerPolicy>>,
        root_ref: &'a mut BtreeHeader,
        key_width: Option<usize>,
    ) -> Self {
        assert!(mem.uncommitted(page.get_page_number()));
        if let Some((ref parent_page, _)) = parent {
            assert!(mem.uncommitted(parent_page.get_page_number()));
        }
        AccessGuardMut {
            page,
            offset,
            len,
            entry_index,
            parent,
            mem,
            allocated,
            root_ref,
            key_width,
            _value_type: Default::default(),
        }
    }

    /// Deserializes and returns the current value.
    pub fn value(&self) -> V::SelfType<'_> {
        V::from_bytes(&self.page.memory()[self.offset..(self.offset + self.len)])
    }

    /// Replaces the guarded entry's value.
    ///
    /// Overwrites in place when the page has room; otherwise rebuilds the leaf
    /// into a newly allocated page, repoints the parent (or the tree root) at
    /// it with a DEFERRED checksum, and frees the old page. Afterwards the
    /// guard's offset/len are refreshed to address the new value bytes.
    pub fn insert<'v>(&mut self, value: impl Borrow<V::SelfType<'v>>) -> Result<()> {
        let value_bytes = V::as_bytes(value.borrow());

        // Copy the key out, since the page may be rewritten below
        let key_bytes = {
            let accessor = LeafAccessor::new(self.page.memory(), self.key_width, V::fixed_width());
            accessor.key_unchecked(self.entry_index).to_vec()
        };

        if LeafMutator::sufficient_insert_inplace_space(
            &self.page,
            self.entry_index,
            true,
            self.key_width,
            V::fixed_width(),
            key_bytes.as_slice(),
            value_bytes.as_ref(),
        ) {
            // Fast path: overwrite within the existing page
            let mut mutator =
                LeafMutator::new(self.page.memory_mut(), self.key_width, V::fixed_width());
            mutator.insert(self.entry_index, true, &key_bytes, value_bytes.as_ref());
        } else {
            // Slow path: rebuild the leaf with the new value into a fresh page
            let accessor = LeafAccessor::new(self.page.memory(), self.key_width, V::fixed_width());
            let mut builder = LeafBuilder::new(
                &self.mem,
                &self.allocated,
                accessor.num_pairs(),
                self.key_width,
                V::fixed_width(),
            );

            for i in 0..accessor.num_pairs() {
                if i == self.entry_index {
                    builder.push(&key_bytes, value_bytes.as_ref());
                } else {
                    let entry = accessor.entry(i).unwrap();
                    builder.push(entry.key(), entry.value());
                }
            }

            let new_page = builder.build()?;

            // Repoint whoever references this leaf at the replacement page
            if let Some((ref mut parent_page, parent_entry_index)) = self.parent {
                let mut mutator = BranchMutator::new(parent_page.memory_mut());
                mutator.write_child_page(parent_entry_index, new_page.get_page_number(), DEFERRED);
            } else {
                self.root_ref.root = new_page.get_page_number();
                self.root_ref.checksum = DEFERRED;
            }

            let old_page_number = self.page.get_page_number();
            self.page = new_page;
            // The old page was asserted uncommitted in new(), so this must free it
            let mut allocated = self.allocated.lock().unwrap();
            assert!(
                self.mem
                    .free_if_uncommitted(old_page_number, &mut allocated)
            );
        }

        // Re-locate the value, whose offset/length may have changed
        let new_accessor = LeafAccessor::new(self.page.memory(), self.key_width, V::fixed_width());
        let (new_start, new_end) = new_accessor.value_range(self.entry_index).unwrap();

        self.offset = new_start;
        self.len = new_end - new_start;

        Ok(())
    }
}
377
/// Guard exposing a value's bytes for direct in-place mutation via `AsMut`.
pub struct AccessGuardMutInPlace<'a, V: Value + 'static> {
    page: PageMut,
    // Byte offset of the value within the page's memory
    offset: usize,
    // Length of the value in bytes
    len: usize,
    _value_type: PhantomData<V>,
    _lifetime: PhantomData<&'a ()>,
}
386
387impl<V: Value + 'static> AccessGuardMutInPlace<'_, V> {
388 pub(crate) fn new(page: PageMut, offset: usize, len: usize) -> Self {
389 AccessGuardMutInPlace {
390 page,
391 offset,
392 len,
393 _value_type: Default::default(),
394 _lifetime: Default::default(),
395 }
396 }
397}
398
impl<V: MutInPlaceValue + 'static> AsMut<V::BaseRefType> for AccessGuardMutInPlace<'_, V> {
    fn as_mut(&mut self) -> &mut V::BaseRefType {
        // Mutable view of the guarded byte range, reinterpreted as V's base type
        V::from_bytes_mut(&mut self.page.memory_mut()[self.offset..(self.offset + self.len)])
    }
}
404
/// Read-only view of a single key-value pair within a leaf page.
pub struct EntryAccessor<'a> {
    key: &'a [u8],
    value: &'a [u8],
}
410
impl<'a> EntryAccessor<'a> {
    /// Pairs a key slice with its value slice.
    fn new(key: &'a [u8], value: &'a [u8]) -> Self {
        EntryAccessor { key, value }
    }
}
416
impl<'a: 'b, 'b> EntryAccessor<'a> {
    /// The key bytes, borrowed for the page lifetime `'a`.
    pub(crate) fn key(&'b self) -> &'a [u8] {
        self.key
    }

    /// The value bytes, borrowed for the page lifetime `'a`.
    pub(crate) fn value(&'b self) -> &'a [u8] {
        self.value
    }
}
426
/// Read-only accessor for the serialized layout of a leaf page.
pub(crate) struct LeafAccessor<'a> {
    page: &'a [u8],
    // Fixed key width, or None when keys are variable-width
    fixed_key_size: Option<usize>,
    // Fixed value width, or None when values are variable-width
    fixed_value_size: Option<usize>,
    // Pair count, cached from the page header
    num_pairs: usize,
}
434
impl<'a> LeafAccessor<'a> {
    /// Wraps the raw bytes of a leaf page.
    ///
    /// Layout (as read by this accessor): byte 0 is the LEAF tag, bytes 2..4
    /// hold the pair count (little-endian u16), followed by optional u32
    /// end-offset arrays for variable-width keys and values, then the key
    /// bytes, then the value bytes.
    pub(crate) fn new(
        page: &'a [u8],
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
    ) -> Self {
        debug_assert_eq!(page[0], LEAF);
        let num_pairs = u16::from_le_bytes(page[2..4].try_into().unwrap()) as usize;
        LeafAccessor {
            page,
            fixed_key_size,
            fixed_value_size,
            num_pairs,
        }
    }

    /// Debug helper: prints every key (and optionally each value) to stderr.
    pub(super) fn print_node<K: Key, V: Value>(&self, include_value: bool) {
        let mut i = 0;
        while let Some(entry) = self.entry(i) {
            eprint!(" key_{i}={:?}", K::from_bytes(entry.key()));
            if include_value {
                eprint!(" value_{i}={:?}", V::from_bytes(entry.value()));
            }
            i += 1;
        }
    }

    /// Binary search for `query` using `K`'s comparator. Returns the matching
    /// index and `true`, or the insertion position and `false` if absent.
    pub(crate) fn position<K: Key>(&self, query: &[u8]) -> (usize, bool) {
        let mut min_entry = 0;
        let mut max_entry = self.num_pairs();
        while min_entry < max_entry {
            let mid = min_entry.midpoint(max_entry);
            let key = self.key_unchecked(mid);
            match K::compare(query, key) {
                Ordering::Less => {
                    max_entry = mid;
                }
                Ordering::Equal => {
                    return (mid, true);
                }
                Ordering::Greater => {
                    min_entry = mid + 1;
                }
            }
        }
        debug_assert_eq!(min_entry, max_entry);
        (min_entry, false)
    }

    /// Index of `query` in this leaf, or None when it is not present.
    pub(crate) fn find_key<K: Key>(&self, query: &[u8]) -> Option<usize> {
        let (entry, found) = self.position::<K>(query);
        if found { Some(entry) } else { None }
    }

    /// Offset where the key data begins: after the 4-byte header and any
    /// variable-width offset arrays (one u32 per pair for keys and/or values).
    fn key_section_start(&self) -> usize {
        let mut offset = 4;
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }
        if self.fixed_value_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }

        offset
    }

    /// Start offset of key `n` (the previous key's end, or the section start).
    fn key_start(&self, n: usize) -> Option<usize> {
        if n == 0 {
            Some(self.key_section_start())
        } else {
            self.key_end(n - 1)
        }
    }

    /// End offset of key `n`, or None when `n` is out of range.
    fn key_end(&self, n: usize) -> Option<usize> {
        if n >= self.num_pairs() {
            None
        } else {
            if let Some(fixed) = self.fixed_key_size {
                return Some(self.key_section_start() + fixed * (n + 1));
            }
            // Variable-width: the n-th u32 in the key-offset array stores the
            // absolute end offset of key n
            let offset = 4 + size_of::<u32>() * n;
            let end = u32::from_le_bytes(
                self.page
                    .get(offset..(offset + size_of::<u32>()))?
                    .try_into()
                    .unwrap(),
            ) as usize;
            Some(end)
        }
    }

    /// Start offset of value `n`: values begin right after the last key.
    fn value_start(&self, n: usize) -> Option<usize> {
        if n == 0 {
            self.key_end(self.num_pairs() - 1)
        } else {
            self.value_end(n - 1)
        }
    }

    /// End offset of value `n`, or None when `n` is out of range.
    fn value_end(&self, n: usize) -> Option<usize> {
        if n >= self.num_pairs() {
            None
        } else {
            if let Some(fixed) = self.fixed_value_size {
                return Some(self.key_end(self.num_pairs.checked_sub(1)?)? + fixed * (n + 1));
            }
            // Variable-width: the value-offset array follows the key-offset
            // array (when present) and stores absolute end offsets
            let mut offset = 4 + size_of::<u32>() * n;
            if self.fixed_key_size.is_none() {
                offset += size_of::<u32>() * self.num_pairs;
            }
            let end = u32::from_le_bytes(
                self.page
                    .get(offset..(offset + size_of::<u32>()))?
                    .try_into()
                    .unwrap(),
            ) as usize;
            Some(end)
        }
    }

    /// Number of key-value pairs in this leaf.
    pub(crate) fn num_pairs(&self) -> usize {
        self.num_pairs
    }

    /// Offset of the first value's bytes. Panics if the leaf is empty.
    pub(super) fn offset_of_first_value(&self) -> usize {
        self.offset_of_value(0).unwrap()
    }

    /// Offset of value `n`'s bytes, or None when out of range.
    pub(super) fn offset_of_value(&self, n: usize) -> Option<usize> {
        self.value_start(n)
    }

    /// (start, end) byte range of value `n`, or None when out of range.
    pub(super) fn value_range(&self, n: usize) -> Option<(usize, usize)> {
        Some((self.value_start(n)?, self.value_end(n)?))
    }

    /// Total bytes of key + value data for pairs in `start..end`.
    pub(crate) fn length_of_pairs(&self, start: usize, end: usize) -> usize {
        self.length_of_values(start, end) + self.length_of_keys(start, end)
    }

    /// Total bytes of value data for pairs in `start..end`.
    fn length_of_values(&self, start: usize, end: usize) -> usize {
        if end == 0 {
            return 0;
        }
        let end_offset = self.value_end(end - 1).unwrap();
        let start_offset = self.value_start(start).unwrap();
        end_offset - start_offset
    }

    /// Total bytes of key data for pairs in `start..end`.
    pub(crate) fn length_of_keys(&self, start: usize, end: usize) -> usize {
        if end == 0 {
            return 0;
        }
        let end_offset = self.key_end(end - 1).unwrap();
        let start_offset = self.key_start(start).unwrap();
        end_offset - start_offset
    }

    /// Total bytes used by the page's data: the end of the last value.
    pub(crate) fn total_length(&self) -> usize {
        self.value_end(self.num_pairs() - 1).unwrap()
    }

    /// Key bytes of pair `n`. Panics if `n` is out of range.
    fn key_unchecked(&self, n: usize) -> &[u8] {
        &self.page[self.key_start(n).unwrap()..self.key_end(n).unwrap()]
    }

    /// Accessor for pair `n`, or None when out of range.
    pub(crate) fn entry(&self, n: usize) -> Option<EntryAccessor<'a>> {
        let key = &self.page[self.key_start(n)?..self.key_end(n)?];
        let value = &self.page[self.value_start(n)?..self.value_end(n)?];
        Some(EntryAccessor::new(key, value))
    }

    /// Byte ranges of pair `n`'s key and value, or None when out of range.
    pub(crate) fn entry_ranges(&self, n: usize) -> Option<(Range<usize>, Range<usize>)> {
        let key = self.key_start(n)?..self.key_end(n)?;
        let value = self.value_start(n)?..self.value_end(n)?;
        Some((key, value))
    }

    /// Accessor for the last pair. Panics if the leaf is empty.
    pub(super) fn last_entry(&self) -> EntryAccessor<'a> {
        self.entry(self.num_pairs() - 1).unwrap()
    }
}
623
/// Accumulates key-value pairs, then serializes them into one newly allocated
/// leaf page (`build`) or two (`build_split`).
pub(super) struct LeafBuilder<'a, 'b> {
    // Pairs to be written, borrowed from their source pages/buffers
    pairs: Vec<(&'a [u8], &'a [u8])>,
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
    // Running totals used for sizing and split-point calculations
    total_key_bytes: usize,
    total_value_bytes: usize,
    mem: &'b TransactionalMemory,
    allocated_pages: &'b Mutex<PageTrackerPolicy>,
}
633
impl<'a, 'b> LeafBuilder<'a, 'b> {
    /// Bytes needed to serialize `num_pairs` pairs totaling `keys_values_bytes`
    /// of key + value data.
    pub(super) fn required_bytes(&self, num_pairs: usize, keys_values_bytes: usize) -> usize {
        RawLeafBuilder::required_bytes(
            num_pairs,
            keys_values_bytes,
            self.fixed_key_size,
            self.fixed_value_size,
        )
    }

    /// Creates an empty builder with room reserved for `capacity` pairs.
    pub(super) fn new(
        mem: &'b TransactionalMemory,
        allocated_pages: &'b Mutex<PageTrackerPolicy>,
        capacity: usize,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
    ) -> Self {
        Self {
            pairs: Vec::with_capacity(capacity),
            fixed_key_size,
            fixed_value_size,
            total_key_bytes: 0,
            total_value_bytes: 0,
            mem,
            allocated_pages,
        }
    }

    /// Appends one key-value pair.
    pub(super) fn push(&mut self, key: &'a [u8], value: &'a [u8]) {
        self.total_key_bytes += key.len();
        self.total_value_bytes += value.len();
        self.pairs.push((key, value));
    }

    /// Copies every pair from `accessor`, optionally skipping index `except`.
    pub(super) fn push_all_except(
        &mut self,
        accessor: &'a LeafAccessor<'_>,
        except: Option<usize>,
    ) {
        for i in 0..accessor.num_pairs() {
            if let Some(except) = except
                && except == i
            {
                continue;
            }
            let entry = accessor.entry(i).unwrap();
            self.push(entry.key(), entry.value());
        }
    }

    /// True when the accumulated pairs exceed one page and there is more than
    /// one pair, so the result must be split across two pages.
    pub(super) fn should_split(&self) -> bool {
        let required_size = self.required_bytes(
            self.pairs.len(),
            self.total_key_bytes + self.total_value_bytes,
        );
        required_size > self.mem.get_page_size() && self.pairs.len() > 1
    }

    /// Serializes the pairs into two pages, split roughly in half by byte
    /// size. Returns (first page, last key of the first page, second page).
    pub(super) fn build_split(self) -> Result<(PageMut, &'a [u8], PageMut)> {
        // Choose the split point: keep adding pairs to the first page until
        // it holds at least half of the data, always leaving one pair for
        // the second page.
        let total_size = self.total_key_bytes + self.total_value_bytes;
        let mut division = 0;
        let mut first_split_key_bytes = 0;
        let mut first_split_value_bytes = 0;
        for (key, value) in self.pairs.iter().take(self.pairs.len() - 1) {
            first_split_key_bytes += key.len();
            first_split_value_bytes += value.len();
            division += 1;
            if first_split_key_bytes + first_split_value_bytes >= total_size / 2 {
                break;
            }
        }

        // First page: pairs [0, division)
        let required_size =
            self.required_bytes(division, first_split_key_bytes + first_split_value_bytes);
        let mut allocated_pages = self.allocated_pages.lock().unwrap();
        let mut page1 = self.mem.allocate(required_size, &mut allocated_pages)?;
        let mut builder = RawLeafBuilder::new(
            page1.memory_mut(),
            division,
            self.fixed_key_size,
            self.fixed_value_size,
            first_split_key_bytes,
        );
        for (key, value) in self.pairs.iter().take(division) {
            builder.append(key, value);
        }
        // Drop runs the builder's completeness assertions
        drop(builder);

        // Second page: pairs [division, len)
        let required_size = self.required_bytes(
            self.pairs.len() - division,
            self.total_key_bytes + self.total_value_bytes
                - first_split_key_bytes
                - first_split_value_bytes,
        );
        let mut page2 = self.mem.allocate(required_size, &mut allocated_pages)?;
        let mut builder = RawLeafBuilder::new(
            page2.memory_mut(),
            self.pairs.len() - division,
            self.fixed_key_size,
            self.fixed_value_size,
            self.total_key_bytes - first_split_key_bytes,
        );
        for (key, value) in &self.pairs[division..] {
            builder.append(key, value);
        }
        drop(builder);

        Ok((page1, self.pairs[division - 1].0, page2))
    }

    /// Serializes all pairs into a single newly allocated page.
    pub(super) fn build(self) -> Result<PageMut> {
        let required_size = self.required_bytes(
            self.pairs.len(),
            self.total_key_bytes + self.total_value_bytes,
        );
        let mut allocated_pages = self.allocated_pages.lock().unwrap();
        let mut page = self.mem.allocate(required_size, &mut allocated_pages)?;
        let mut builder = RawLeafBuilder::new(
            page.memory_mut(),
            self.pairs.len(),
            self.fixed_key_size,
            self.fixed_value_size,
            self.total_key_bytes,
        );
        for (key, value) in self.pairs {
            builder.append(key, value);
        }
        // Drop runs the builder's completeness assertions
        drop(builder);
        Ok(page)
    }
}
765
/// Serializes a leaf page directly into a mutable byte buffer.
///
/// Exactly `num_pairs` pairs must be `append`ed before the builder is
/// dropped; `Drop` asserts this in non-panicking contexts.
pub(crate) struct RawLeafBuilder<'a> {
    page: &'a mut [u8],
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
    num_pairs: usize,
    // Total key bytes that will be written; locates where value data begins
    provisioned_key_bytes: usize,
    // Number of pairs appended so far
    pairs_written: usize, }
788
impl<'a> RawLeafBuilder<'a> {
    /// Bytes needed for a leaf with `num_pairs` pairs and `keys_values_bytes`
    /// of key + value data: a 4-byte header, plus one u32 end-offset per pair
    /// for each variable-width dimension, plus the data itself.
    pub(crate) fn required_bytes(
        num_pairs: usize,
        keys_values_bytes: usize,
        key_size: Option<usize>,
        value_size: Option<usize>,
    ) -> usize {
        let mut result = 4;
        if key_size.is_none() {
            result += num_pairs * size_of::<u32>();
        }
        if value_size.is_none() {
            result += num_pairs * size_of::<u32>();
        }
        result += keys_values_bytes;

        result
    }

    /// Initializes the page header (LEAF tag and pair count). `key_bytes` is
    /// the total key data that will be appended across all pairs.
    pub(crate) fn new(
        page: &'a mut [u8],
        num_pairs: usize,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
        key_bytes: usize,
    ) -> Self {
        page[0] = LEAF;
        page[2..4].copy_from_slice(&u16::try_from(num_pairs).unwrap().to_le_bytes());
        #[cfg(debug_assertions)]
        {
            // Poison the offset arrays so reading an unwritten slot is obvious
            let mut last = 4;
            if fixed_key_size.is_none() {
                last += size_of::<u32>() * num_pairs;
            }
            if fixed_value_size.is_none() {
                last += size_of::<u32>() * num_pairs;
            }
            for x in &mut page[4..last] {
                *x = 0xFF;
            }
        }
        RawLeafBuilder {
            page,
            fixed_key_size,
            fixed_value_size,
            num_pairs,
            provisioned_key_bytes: key_bytes,
            pairs_written: 0,
        }
    }

    /// End offset of value `n`, read back from the offset array (or computed
    /// for fixed-width values).
    fn value_end(&self, n: usize) -> usize {
        if let Some(fixed) = self.fixed_value_size {
            return self.key_section_start() + self.provisioned_key_bytes + fixed * (n + 1);
        }
        let mut offset = 4 + size_of::<u32>() * n;
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }
        u32::from_le_bytes(
            self.page[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        ) as usize
    }

    /// Offset where key data begins: after the header and any offset arrays.
    fn key_section_start(&self) -> usize {
        let mut offset = 4;
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }
        if self.fixed_value_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }

        offset
    }

    /// End offset of key `n`, read back from the offset array (or computed
    /// for fixed-width keys).
    fn key_end(&self, n: usize) -> usize {
        if let Some(fixed) = self.fixed_key_size {
            return self.key_section_start() + fixed * (n + 1);
        }
        let offset = 4 + size_of::<u32>() * n;
        u32::from_le_bytes(
            self.page[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        ) as usize
    }

    /// Writes the next pair: records end offsets (for variable widths) and
    /// copies the key into the key section and the value after the
    /// provisioned key bytes.
    pub(crate) fn append(&mut self, key: &[u8], value: &[u8]) {
        if let Some(key_width) = self.fixed_key_size {
            assert_eq!(key_width, key.len());
        }
        if let Some(value_width) = self.fixed_value_size {
            assert_eq!(value_width, value.len());
        }
        // Data for this pair starts where the previous pair's data ended
        let key_offset = if self.pairs_written == 0 {
            self.key_section_start()
        } else {
            self.key_end(self.pairs_written - 1)
        };
        let value_offset = if self.pairs_written == 0 {
            self.key_section_start() + self.provisioned_key_bytes
        } else {
            self.value_end(self.pairs_written - 1)
        };

        let n = self.pairs_written;
        if self.fixed_key_size.is_none() {
            let offset = 4 + size_of::<u32>() * n;
            self.page[offset..(offset + size_of::<u32>())]
                .copy_from_slice(&u32::try_from(key_offset + key.len()).unwrap().to_le_bytes());
        }
        self.page[key_offset..(key_offset + key.len())].copy_from_slice(key);
        // Must not spill past the bytes reserved for keys in new()
        let written_key_len = key_offset + key.len() - self.key_section_start();
        assert!(written_key_len <= self.provisioned_key_bytes);

        if self.fixed_value_size.is_none() {
            let mut offset = 4 + size_of::<u32>() * n;
            if self.fixed_key_size.is_none() {
                offset += size_of::<u32>() * self.num_pairs;
            }
            self.page[offset..(offset + size_of::<u32>())].copy_from_slice(
                &u32::try_from(value_offset + value.len())
                    .unwrap()
                    .to_le_bytes(),
            );
        }
        self.page[value_offset..(value_offset + value.len())].copy_from_slice(value);
        self.pairs_written += 1;
    }
}
925
impl Drop for RawLeafBuilder<'_> {
    fn drop(&mut self) {
        // Verify the builder was filled exactly: all pairs written and the key
        // section consumed precisely. Skipped while panicking to avoid a
        // double panic.
        if !thread::panicking() {
            assert_eq!(self.pairs_written, self.num_pairs);
            assert_eq!(
                self.key_section_start() + self.provisioned_key_bytes,
                self.key_end(self.num_pairs - 1)
            );
        }
    }
}
937
/// Mutates a serialized leaf page in place: insert, overwrite, or remove
/// entries by shifting the existing data and patching the offset arrays.
pub(crate) struct LeafMutator<'b> {
    page: &'b mut [u8],
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
}
943
944impl<'b> LeafMutator<'b> {
    /// Wraps the raw bytes of a leaf page for in-place mutation.
    /// Panics if the bytes are not tagged as a leaf.
    pub(crate) fn new(
        page: &'b mut [u8],
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
    ) -> Self {
        assert_eq!(page[0], LEAF);
        Self {
            page,
            fixed_key_size,
            fixed_value_size,
        }
    }
957
    /// Whether `page` has enough free space to insert (or, when `overwrite`
    /// is set, replace) the pair at `position` in place.
    pub(super) fn sufficient_insert_inplace_space(
        page: &'_ impl Page,
        position: usize,
        overwrite: bool,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
        new_key: &[u8],
        new_value: &[u8],
    ) -> bool {
        let accessor = LeafAccessor::new(page.memory(), fixed_key_size, fixed_value_size);
        if overwrite {
            // Replacing: only the size difference of the pair matters
            let remaining = page.memory().len() - accessor.total_length();
            let required_delta = isize::try_from(new_key.len() + new_value.len()).unwrap()
                - isize::try_from(accessor.length_of_pairs(position, position + 1)).unwrap();
            required_delta <= isize::try_from(remaining).unwrap()
        } else {
            // Refuse a non-appending insert on a higher-order page — the
            // resulting shift would touch a lot of data.
            // NOTE(review): presumably a cost bound on the memmove — confirm
            if page.get_page_number().page_order > 0 && position < accessor.num_pairs() {
                return false;
            }
            // Inserting: new data plus one u32 offset slot per variable-width
            // dimension must fit in the remaining space
            let remaining = page.memory().len() - accessor.total_length();
            let mut required_delta = new_key.len() + new_value.len();
            if fixed_key_size.is_none() {
                required_delta += size_of::<u32>();
            }
            if fixed_value_size.is_none() {
                required_delta += size_of::<u32>();
            }
            required_delta <= remaining
        }
    }
992
    /// Inserts a pair at index `i`, or replaces the pair at `i` when
    /// `overwrite` is set, shifting existing data and offsets in place.
    ///
    /// The caller must have checked `sufficient_insert_inplace_space` first;
    /// this method asserts the data still fits.
    pub(crate) fn insert(&mut self, i: usize, overwrite: bool, key: &[u8], value: &[u8]) {
        let accessor = LeafAccessor::new(self.page, self.fixed_key_size, self.fixed_value_size);
        // Net change in total page usage caused by this operation
        let required_delta = if overwrite {
            isize::try_from(key.len() + value.len()).unwrap()
                - isize::try_from(accessor.length_of_pairs(i, i + 1)).unwrap()
        } else {
            let mut delta = key.len() + value.len();
            if self.fixed_key_size.is_none() {
                delta += size_of::<u32>();
            }
            if self.fixed_value_size.is_none() {
                delta += size_of::<u32>();
            }
            delta.try_into().unwrap()
        };
        assert!(
            isize::try_from(accessor.total_length()).unwrap() + required_delta
                <= isize::try_from(self.page.len()).unwrap()
        );

        // Snapshot layout positions before any mutation
        let num_pairs = accessor.num_pairs();
        let last_key_end = accessor.key_end(accessor.num_pairs() - 1).unwrap();
        let last_value_end = accessor.value_end(accessor.num_pairs() - 1).unwrap();
        // First entry whose data must be shifted right
        let shift_index = if overwrite { i + 1 } else { i };
        let shift_key_start = accessor.key_start(shift_index).unwrap_or(last_key_end);
        let shift_value_start = accessor.value_start(shift_index).unwrap_or(last_value_end);
        let existing_value_len = accessor
            .value_range(i)
            .map(|(start, end)| end - start)
            .unwrap_or_default();

        let value_delta = if overwrite {
            isize::try_from(value.len()).unwrap() - isize::try_from(existing_value_len).unwrap()
        } else {
            value.len().try_into().unwrap()
        };

        // Size of one offset-array slot for each variable-width dimension
        let key_ptr_size: usize = if self.fixed_key_size.is_none() { 4 } else { 0 };
        let value_ptr_size: usize = if self.fixed_value_size.is_none() {
            4
        } else {
            0
        };
        // Patch stored end-offsets to reflect the new layout, before moving data
        if !overwrite {
            for j in 0..i {
                // Entries before i: data moves right by the new offset slots
                // (keys) and additionally by the new key bytes (values)
                self.update_key_end(j, (key_ptr_size + value_ptr_size).try_into().unwrap());
                let value_delta: isize = (key_ptr_size + value_ptr_size + key.len())
                    .try_into()
                    .unwrap();
                self.update_value_end(j, value_delta);
            }
        }
        for j in i..num_pairs {
            if overwrite {
                self.update_value_end(j, value_delta);
            } else {
                // Entries at or after i: shifted by the slots plus both the
                // new key and (for values) the new value bytes
                let key_delta: isize = (key_ptr_size + value_ptr_size + key.len())
                    .try_into()
                    .unwrap();
                self.update_key_end(j, key_delta);
                let value_delta = key_delta + isize::try_from(value.len()).unwrap();
                self.update_value_end(j, value_delta);
            }
        }

        let new_num_pairs = if overwrite { num_pairs } else { num_pairs + 1 };
        self.page[2..4].copy_from_slice(&u16::try_from(new_num_pairs).unwrap().to_le_bytes());

        // Move data from the back of the page forward so nothing is clobbered.
        // First shift the tail of the value section right...
        let mut dest = if overwrite {
            (isize::try_from(shift_value_start).unwrap() + value_delta)
                .try_into()
                .unwrap()
        } else {
            shift_value_start + key_ptr_size + value_ptr_size + key.len() + value.len()
        };
        let start = shift_value_start;
        let end = last_value_end;
        self.page.copy_within(start..end, dest);

        // ...then write the new value just before the shifted tail
        let inserted_value_end: u32 = dest.try_into().unwrap();
        dest -= value.len();
        self.page[dest..(dest + value.len())].copy_from_slice(value);

        if !overwrite {
            // Shift the tail of the key section and write the new key
            let start = shift_key_start;
            let end = shift_value_start;
            dest -= end - start;
            self.page.copy_within(start..end, dest);

            let inserted_key_end: u32 = dest.try_into().unwrap();
            dest -= key.len();
            self.page[dest..(dest + key.len())].copy_from_slice(key);

            // Shift the value-offset entries after slot i and splice in the
            // new value end-offset
            let start = 4 + key_ptr_size * num_pairs + value_ptr_size * i;
            let end = shift_key_start;
            dest -= end - start;
            debug_assert_eq!(
                dest,
                4 + key_ptr_size * new_num_pairs + value_ptr_size * (i + 1)
            );
            self.page.copy_within(start..end, dest);

            if self.fixed_value_size.is_none() {
                dest -= size_of::<u32>();
                self.page[dest..(dest + size_of::<u32>())]
                    .copy_from_slice(&inserted_value_end.to_le_bytes());
            }

            // Shift the key-offset entries after slot i and splice in the
            // new key end-offset
            let start = 4 + key_ptr_size * i;
            let end = 4 + key_ptr_size * num_pairs + value_ptr_size * i;
            dest -= end - start;
            debug_assert_eq!(dest, 4 + key_ptr_size * (i + 1));
            self.page.copy_within(start..end, dest);

            if self.fixed_key_size.is_none() {
                dest -= size_of::<u32>();
                self.page[dest..(dest + size_of::<u32>())]
                    .copy_from_slice(&inserted_key_end.to_le_bytes());
            }
            debug_assert_eq!(dest, 4 + key_ptr_size * i);
        }
    }
1125
    /// Removes the pair at index `i`, compacting the offsets and data in
    /// place. Panics if `i` is out of range or the leaf has only one pair.
    pub(super) fn remove(&mut self, i: usize) {
        let accessor = LeafAccessor::new(self.page, self.fixed_key_size, self.fixed_value_size);
        let num_pairs = accessor.num_pairs();
        assert!(i < num_pairs);
        assert!(num_pairs > 1);
        // Snapshot the removed pair's byte ranges before any mutation
        let key_start = accessor.key_start(i).unwrap();
        let key_end = accessor.key_end(i).unwrap();
        let value_start = accessor.value_start(i).unwrap();
        let value_end = accessor.value_end(i).unwrap();
        let last_value_end = accessor.value_end(accessor.num_pairs() - 1).unwrap();

        // Size of one offset-array slot for each variable-width dimension
        let key_ptr_size = if self.fixed_key_size.is_none() {
            size_of::<u32>()
        } else {
            0
        };
        let value_ptr_size = if self.fixed_value_size.is_none() {
            size_of::<u32>()
        } else {
            0
        };
        // Patch stored end-offsets for the entries that remain
        for j in 0..i {
            // Entries before i lose the removed offset slots; their values
            // additionally shift left by the removed key's bytes
            self.update_key_end(j, -isize::try_from(key_ptr_size + value_ptr_size).unwrap());
            let value_delta = -isize::try_from(key_ptr_size + value_ptr_size).unwrap()
                - isize::try_from(key_end - key_start).unwrap();
            self.update_value_end(j, value_delta);
        }
        for j in (i + 1)..num_pairs {
            // Entries after i also lose the removed key and value bytes
            let key_delta = -isize::try_from(key_ptr_size + value_ptr_size).unwrap()
                - isize::try_from(key_end - key_start).unwrap();
            self.update_key_end(j, key_delta);
            let value_delta = key_delta - isize::try_from(value_end - value_start).unwrap();
            self.update_value_end(j, value_delta);
        }

        let new_num_pairs = num_pairs - 1;
        self.page[2..4].copy_from_slice(&u16::try_from(new_num_pairs).unwrap().to_le_bytes());
        // Compact front-to-back: key offsets after slot i...
        let mut dest = 4 + key_ptr_size * i;
        let start = 4 + key_ptr_size * (i + 1);
        let end = 4 + key_ptr_size * num_pairs + value_ptr_size * i;
        self.page.copy_within(start..end, dest);
        dest += end - start;
        debug_assert_eq!(dest, 4 + key_ptr_size * new_num_pairs + value_ptr_size * i);

        // ...then value offsets after slot i plus the keys before key i...
        let start = 4 + key_ptr_size * num_pairs + value_ptr_size * (i + 1);
        let end = key_start;
        self.page.copy_within(start..end, dest);
        dest += end - start;

        let preceding_key_len = key_start - (4 + (key_ptr_size + value_ptr_size) * num_pairs);
        debug_assert_eq!(
            dest,
            4 + (key_ptr_size + value_ptr_size) * new_num_pairs + preceding_key_len
        );

        // ...then the keys after key i plus the values before value i...
        let start = key_end;
        let end = value_start;
        self.page.copy_within(start..end, dest);
        dest += end - start;

        let preceding_data_len =
            value_start - (4 + (key_ptr_size + value_ptr_size) * num_pairs) - (key_end - key_start);
        debug_assert_eq!(
            dest,
            4 + (key_ptr_size + value_ptr_size) * new_num_pairs + preceding_data_len
        );
        // ...and finally the values after value i
        let start = value_end;
        let end = last_value_end;
        self.page.copy_within(start..end, dest);
    }
1205
1206 fn update_key_end(&mut self, i: usize, delta: isize) {
1207 if self.fixed_key_size.is_some() {
1208 return;
1209 }
1210 let offset = 4 + size_of::<u32>() * i;
1211 let mut ptr = u32::from_le_bytes(
1212 self.page[offset..(offset + size_of::<u32>())]
1213 .try_into()
1214 .unwrap(),
1215 );
1216 ptr = (isize::try_from(ptr).unwrap() + delta).try_into().unwrap();
1217 self.page[offset..(offset + size_of::<u32>())].copy_from_slice(&ptr.to_le_bytes());
1218 }
1219
1220 fn update_value_end(&mut self, i: usize, delta: isize) {
1221 if self.fixed_value_size.is_some() {
1222 return;
1223 }
1224 let accessor = LeafAccessor::new(self.page, self.fixed_key_size, self.fixed_value_size);
1225 let num_pairs = accessor.num_pairs();
1226 let mut offset = 4 + size_of::<u32>() * i;
1227 if self.fixed_key_size.is_none() {
1228 offset += size_of::<u32>() * num_pairs;
1229 }
1230 let mut ptr = u32::from_le_bytes(
1231 self.page[offset..(offset + size_of::<u32>())]
1232 .try_into()
1233 .unwrap(),
1234 );
1235 ptr = (isize::try_from(ptr).unwrap() + delta).try_into().unwrap();
1236 self.page[offset..(offset + size_of::<u32>())].copy_from_slice(&ptr.to_le_bytes());
1237 }
1238}
1239
/// Read-only view over a serialized branch (internal) B-tree page.
///
/// Decodes child page numbers, child checksums, and separator keys from the
/// page's raw bytes without copying them.
pub(crate) struct BranchAccessor<'a: 'b, 'b, T: Page + 'a> {
    // Borrowed page whose memory() holds the serialized branch node.
    page: &'b T,
    // Key count cached from the page header (bytes 2..4) at construction.
    num_keys: usize,
    // Some(len) when all keys are a known fixed width (no offset table stored).
    fixed_key_size: Option<usize>,
    _page_lifetime: PhantomData<&'a ()>,
}
1248
impl<'a: 'b, 'b, T: Page + 'a> BranchAccessor<'a, 'b, T> {
    /// Wraps a branch page for reading.
    ///
    /// Branch layout (as used by the offset math below): byte 0 = BRANCH tag,
    /// bytes 2..4 = number of keys (u16 LE), then from byte 8: one checksum
    /// per child, one page number per child, the u32 key-end offset table
    /// (only for variable-width keys), and finally the key bytes.
    pub(crate) fn new(page: &'b T, fixed_key_size: Option<usize>) -> Self {
        debug_assert_eq!(page.memory()[0], BRANCH);
        let num_keys = u16::from_le_bytes(page.memory()[2..4].try_into().unwrap()) as usize;
        BranchAccessor {
            page,
            num_keys,
            fixed_key_size,
            _page_lifetime: Default::default(),
        }
    }

    /// Debug helper: prints this node's children and separator keys to stderr.
    pub(super) fn print_node<K: Key>(&self) {
        eprint!(
            "Internal[ (page={:?}), child_0={:?}",
            self.page.get_page_number(),
            self.child_page(0).unwrap()
        );
        for i in 0..(self.count_children() - 1) {
            if let Some(child) = self.child_page(i + 1) {
                let key = self.key(i).unwrap();
                eprint!(" key_{i}={:?}", K::from_bytes(key));
                eprint!(" child_{}={child:?}", i + 1);
            }
        }
        eprint!("]");
    }

    /// Total number of bytes used by this node's serialized form.
    /// The last key's end offset is also the end of all data on the page.
    pub(crate) fn total_length(&self) -> usize {
        self.key_end(self.num_keys() - 1).unwrap()
    }

    /// Binary-searches the separator keys and returns the index and page
    /// number of the child that may contain `query`.
    pub(super) fn child_for_key<K: Key>(&self, query: &[u8]) -> (usize, PageNumber) {
        let mut min_child = 0; // inclusive
        let mut max_child = self.num_keys(); // inclusive
        while min_child < max_child {
            let mid = min_child.midpoint(max_child);
            match K::compare(query, self.key(mid).unwrap()) {
                Ordering::Less => {
                    max_child = mid;
                }
                Ordering::Equal => {
                    // Exact separator match: descend into the child at the
                    // same index (keys at or above the separator live there).
                    return (mid, self.child_page(mid).unwrap());
                }
                Ordering::Greater => {
                    min_child = mid + 1;
                }
            }
        }
        debug_assert_eq!(min_child, max_child);

        (min_child, self.child_page(min_child).unwrap())
    }

    /// Byte offset where the raw key bytes begin: after the header, the
    /// per-child checksum and page-number arrays, and (for variable-width
    /// keys) the u32 key-end offset table.
    fn key_section_start(&self) -> usize {
        if self.fixed_key_size.is_none() {
            8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * self.count_children()
                + size_of::<u32>() * self.num_keys()
        } else {
            8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * self.count_children()
        }
    }

    /// Byte offset where key `n` begins (the previous key's end, or the
    /// start of the key section for the first key).
    fn key_offset(&self, n: usize) -> usize {
        if n == 0 {
            self.key_section_start()
        } else {
            self.key_end(n - 1).unwrap()
        }
    }

    /// Byte offset where key `n` ends. For fixed-width keys this is computed;
    /// otherwise it is read from the offset table. Returns None if the table
    /// entry lies beyond the page's memory.
    fn key_end(&self, n: usize) -> Option<usize> {
        if let Some(fixed) = self.fixed_key_size {
            return Some(self.key_section_start() + fixed * (n + 1));
        }
        let offset = 8
            + (PageNumber::serialized_size() + size_of::<Checksum>()) * self.count_children()
            + size_of::<u32>() * n;
        Some(u32::from_le_bytes(
            self.page
                .memory()
                .get(offset..(offset + size_of::<u32>()))?
                .try_into()
                .unwrap(),
        ) as usize)
    }

    /// Returns the bytes of separator key `n`, or None if out of range.
    pub(super) fn key(&self, n: usize) -> Option<&[u8]> {
        if n >= self.num_keys() {
            return None;
        }
        let offset = self.key_offset(n);
        let end = self.key_end(n)?;
        Some(&self.page.memory()[offset..end])
    }

    /// A branch with k separator keys always has k + 1 children.
    pub(crate) fn count_children(&self) -> usize {
        self.num_keys() + 1
    }

    /// Returns the stored checksum of child `n`, or None if out of range.
    /// Checksums are stored first, starting at byte 8.
    pub(crate) fn child_checksum(&self, n: usize) -> Option<Checksum> {
        if n >= self.count_children() {
            return None;
        }

        let offset = 8 + size_of::<Checksum>() * n;
        Some(Checksum::from_le_bytes(
            self.page.memory()[offset..(offset + size_of::<Checksum>())]
                .try_into()
                .unwrap(),
        ))
    }

    /// Returns the page number of child `n`, or None if out of range.
    /// Page numbers are stored immediately after the checksum array.
    pub(crate) fn child_page(&self, n: usize) -> Option<PageNumber> {
        if n >= self.count_children() {
            return None;
        }

        let offset =
            8 + size_of::<Checksum>() * self.count_children() + PageNumber::serialized_size() * n;
        Some(PageNumber::from_le_bytes(
            self.page.memory()[offset..(offset + PageNumber::serialized_size())]
                .try_into()
                .unwrap(),
        ))
    }

    /// Number of separator keys (cached from the header at construction).
    fn num_keys(&self) -> usize {
        self.num_keys
    }
}
1381
/// Accumulates children and separator keys, then serializes them into one
/// branch page (`build`) or two (`build_split`).
pub(super) struct BranchBuilder<'a, 'b> {
    // (page number, checksum) for each child, in order.
    children: Vec<(PageNumber, Checksum)>,
    // Separator keys; always one fewer than `children` at build time.
    keys: Vec<&'a [u8]>,
    // Running total of key byte lengths, used for page-size computation.
    total_key_bytes: usize,
    // Some(len) when all keys are a known fixed width.
    fixed_key_size: Option<usize>,
    mem: &'b TransactionalMemory,
    // Tracker for pages allocated by this builder.
    allocated_pages: &'b Mutex<PageTrackerPolicy>,
}
1390
1391impl<'a, 'b> BranchBuilder<'a, 'b> {
1392 pub(super) fn new(
1393 mem: &'b TransactionalMemory,
1394 allocated_pages: &'b Mutex<PageTrackerPolicy>,
1395 child_capacity: usize,
1396 fixed_key_size: Option<usize>,
1397 ) -> Self {
1398 Self {
1399 children: Vec::with_capacity(child_capacity),
1400 keys: Vec::with_capacity(child_capacity - 1),
1401 total_key_bytes: 0,
1402 fixed_key_size,
1403 mem,
1404 allocated_pages,
1405 }
1406 }
1407
1408 pub(super) fn replace_child(&mut self, index: usize, child: PageNumber, checksum: Checksum) {
1409 self.children[index] = (child, checksum);
1410 }
1411
1412 pub(super) fn push_child(&mut self, child: PageNumber, checksum: Checksum) {
1413 self.children.push((child, checksum));
1414 }
1415
1416 pub(super) fn push_key(&mut self, key: &'a [u8]) {
1417 self.keys.push(key);
1418 self.total_key_bytes += key.len();
1419 }
1420
1421 pub(super) fn push_all<T: Page>(&mut self, accessor: &'a BranchAccessor<'_, '_, T>) {
1422 for i in 0..accessor.count_children() {
1423 let child = accessor.child_page(i).unwrap();
1424 let checksum = accessor.child_checksum(i).unwrap();
1425 self.push_child(child, checksum);
1426 }
1427 for i in 0..(accessor.count_children() - 1) {
1428 self.push_key(accessor.key(i).unwrap());
1429 }
1430 }
1431
1432 pub(super) fn to_single_child(&self) -> Option<(PageNumber, Checksum)> {
1433 if self.children.len() > 1 {
1434 None
1435 } else {
1436 Some(self.children[0])
1437 }
1438 }
1439
1440 pub(super) fn build(self) -> Result<PageMut> {
1441 assert_eq!(self.children.len(), self.keys.len() + 1);
1442 let size = RawBranchBuilder::required_bytes(
1443 self.keys.len(),
1444 self.total_key_bytes,
1445 self.fixed_key_size,
1446 );
1447 let mut allocated_pages = self.allocated_pages.lock().unwrap();
1448 let mut page = self.mem.allocate(size, &mut allocated_pages)?;
1449 let mut builder =
1450 RawBranchBuilder::new(page.memory_mut(), self.keys.len(), self.fixed_key_size);
1451 builder.write_first_page(self.children[0].0, self.children[0].1);
1452 for i in 1..self.children.len() {
1453 let key = &self.keys[i - 1];
1454 builder.write_nth_key(key.as_ref(), self.children[i].0, self.children[i].1, i - 1);
1455 }
1456 drop(builder);
1457
1458 Ok(page)
1459 }
1460
1461 pub(super) fn should_split(&self) -> bool {
1462 let size = RawBranchBuilder::required_bytes(
1463 self.keys.len(),
1464 self.total_key_bytes,
1465 self.fixed_key_size,
1466 );
1467 size > self.mem.get_page_size() && self.keys.len() >= 3
1468 }
1469
1470 pub(super) fn build_split(self) -> Result<(PageMut, &'a [u8], PageMut)> {
1471 let mut allocated_pages = self.allocated_pages.lock().unwrap();
1472 assert_eq!(self.children.len(), self.keys.len() + 1);
1473 assert!(self.keys.len() >= 3);
1474 let division = self.keys.len() / 2;
1475 let first_split_key_len: usize = self.keys.iter().take(division).map(|k| k.len()).sum();
1476 let division_key = self.keys[division];
1477 let second_split_key_len = self.total_key_bytes - first_split_key_len - division_key.len();
1478
1479 let size =
1480 RawBranchBuilder::required_bytes(division, first_split_key_len, self.fixed_key_size);
1481 let mut page1 = self.mem.allocate(size, &mut allocated_pages)?;
1482 let mut builder = RawBranchBuilder::new(page1.memory_mut(), division, self.fixed_key_size);
1483 builder.write_first_page(self.children[0].0, self.children[0].1);
1484 for i in 0..division {
1485 let key = &self.keys[i];
1486 builder.write_nth_key(
1487 key.as_ref(),
1488 self.children[i + 1].0,
1489 self.children[i + 1].1,
1490 i,
1491 );
1492 }
1493 drop(builder);
1494
1495 let size = RawBranchBuilder::required_bytes(
1496 self.keys.len() - division - 1,
1497 second_split_key_len,
1498 self.fixed_key_size,
1499 );
1500 let mut page2 = self.mem.allocate(size, &mut allocated_pages)?;
1501 let mut builder = RawBranchBuilder::new(
1502 page2.memory_mut(),
1503 self.keys.len() - division - 1,
1504 self.fixed_key_size,
1505 );
1506 builder.write_first_page(self.children[division + 1].0, self.children[division + 1].1);
1507 for i in (division + 1)..self.keys.len() {
1508 let key = &self.keys[i];
1509 builder.write_nth_key(
1510 key.as_ref(),
1511 self.children[i + 1].0,
1512 self.children[i + 1].1,
1513 i - division - 1,
1514 );
1515 }
1516 drop(builder);
1517
1518 Ok((page1, division_key, page2))
1519 }
1520}
1521
/// Low-level serializer that writes a branch node directly into a page's
/// byte buffer. Keys must be written in order via `write_nth_key`; the Drop
/// impl asserts that all declared keys were written.
pub(super) struct RawBranchBuilder<'b> {
    // Destination page bytes.
    page: &'b mut [u8],
    // Some(len) when all keys are a known fixed width (no offset table written).
    fixed_key_size: Option<usize>,
    // Total number of keys this node was declared to hold.
    num_keys: usize,
    // How many keys have been written so far; checked against num_keys on drop.
    keys_written: usize,
}
1543
impl<'b> RawBranchBuilder<'b> {
    /// Number of bytes a branch node needs: 8-byte header, one
    /// (page number + checksum) slot per child (`num_keys + 1` children),
    /// a u32 key-end offset per key when keys are variable width, plus the
    /// key bytes themselves.
    pub(super) fn required_bytes(
        num_keys: usize,
        size_of_keys: usize,
        fixed_key_size: Option<usize>,
    ) -> usize {
        if fixed_key_size.is_none() {
            let fixed_size = 8
                + (PageNumber::serialized_size() + size_of::<Checksum>()) * (num_keys + 1)
                + size_of::<u32>() * num_keys;
            size_of_keys + fixed_size
        } else {
            let fixed_size =
                8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * (num_keys + 1);
            size_of_keys + fixed_size
        }
    }

    /// Begins writing a branch node into `page`.
    ///
    /// Writes the BRANCH tag and key count into the header. Panics if
    /// `num_keys` is zero.
    pub(super) fn new(page: &'b mut [u8], num_keys: usize, fixed_key_size: Option<usize>) -> Self {
        assert!(num_keys > 0);
        page[0] = BRANCH;
        page[2..4].copy_from_slice(&u16::try_from(num_keys).unwrap().to_le_bytes());
        #[cfg(debug_assertions)]
        {
            // Poison the page-number and key-offset region with 0xFF so that
            // debug builds will produce obviously-bogus values if any slot is
            // read before it is written.
            let start = 8 + size_of::<Checksum>() * (num_keys + 1);
            let last = 8
                + (PageNumber::serialized_size() + size_of::<Checksum>()) * (num_keys + 1)
                + size_of::<u32>() * num_keys;
            for x in &mut page[start..last] {
                *x = 0xFF;
            }
        }
        RawBranchBuilder {
            page,
            fixed_key_size,
            num_keys,
            keys_written: 0,
        }
    }

    /// Writes child 0's checksum (first checksum slot, at byte 8) and page
    /// number (first slot of the page-number array).
    pub(super) fn write_first_page(&mut self, page_number: PageNumber, checksum: Checksum) {
        let offset = 8;
        self.page[offset..(offset + size_of::<Checksum>())]
            .copy_from_slice(&checksum.to_le_bytes());
        let offset = 8 + size_of::<Checksum>() * (self.num_keys + 1);
        self.page[offset..(offset + PageNumber::serialized_size())]
            .copy_from_slice(&page_number.to_le_bytes());
    }

    /// Byte offset where the raw key bytes begin (after the checksum and
    /// page-number arrays and, for variable-width keys, the offset table).
    fn key_section_start(&self) -> usize {
        let mut offset =
            8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * (self.num_keys + 1);
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_keys;
        }

        offset
    }

    /// End offset of key `n`: computed for fixed-width keys, otherwise read
    /// back from the offset table (valid only for keys already written).
    fn key_end(&self, n: usize) -> usize {
        if let Some(fixed) = self.fixed_key_size {
            return self.key_section_start() + fixed * (n + 1);
        }
        let offset = 8
            + (PageNumber::serialized_size() + size_of::<Checksum>()) * (self.num_keys + 1)
            + size_of::<u32>() * n;
        u32::from_le_bytes(
            self.page[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        ) as usize
    }

    /// Writes separator key `n` along with the checksum and page number of
    /// child `n + 1`.
    ///
    /// Keys MUST be written strictly in order (asserted via `keys_written`),
    /// because each key's start position is the previous key's end.
    pub(super) fn write_nth_key(
        &mut self,
        key: &[u8],
        page_number: PageNumber,
        checksum: Checksum,
        n: usize,
    ) {
        assert!(n < self.num_keys);
        assert_eq!(n, self.keys_written);
        self.keys_written += 1;
        // Checksum slot n + 1 (slot 0 belongs to the first child).
        let offset = 8 + size_of::<Checksum>() * (n + 1);
        self.page[offset..(offset + size_of::<Checksum>())]
            .copy_from_slice(&checksum.to_le_bytes());
        // Page-number slot n + 1, after the checksum array.
        let offset = 8
            + size_of::<Checksum>() * (self.num_keys + 1)
            + PageNumber::serialized_size() * (n + 1);
        self.page[offset..(offset + PageNumber::serialized_size())]
            .copy_from_slice(&page_number.to_le_bytes());

        let data_offset = if n > 0 {
            self.key_end(n - 1)
        } else {
            self.key_section_start()
        };
        if self.fixed_key_size.is_none() {
            // Record this key's end in the offset table. The `let offset`
            // here is scoped to this block and does not affect the
            // debug_assert below.
            let offset = 8
                + (PageNumber::serialized_size() + size_of::<Checksum>()) * (self.num_keys + 1)
                + size_of::<u32>() * n;
            self.page[offset..(offset + size_of::<u32>())].copy_from_slice(
                &u32::try_from(data_offset + key.len())
                    .unwrap()
                    .to_le_bytes(),
            );
        }

        // `offset` is the page-number slot written above; key data must lie
        // strictly beyond the fixed-size arrays.
        debug_assert!(data_offset > offset);
        self.page[data_offset..(data_offset + key.len())].copy_from_slice(key);
    }
}
1660
1661impl Drop for RawBranchBuilder<'_> {
1662 fn drop(&mut self) {
1663 if !thread::panicking() {
1664 assert_eq!(self.keys_written, self.num_keys);
1665 }
1666 }
1667}
1668
/// In-place mutator for a serialized branch page: allows rewriting a child's
/// page number and checksum without rebuilding the node.
pub(crate) struct BranchMutator<'b> {
    // Raw bytes of the branch page being mutated.
    page: &'b mut [u8],
}
1672
1673impl<'b> BranchMutator<'b> {
1674 pub(crate) fn new(page: &'b mut [u8]) -> Self {
1675 assert_eq!(page[0], BRANCH);
1676 Self { page }
1677 }
1678
1679 fn num_keys(&self) -> usize {
1680 u16::from_le_bytes(self.page[2..4].try_into().unwrap()) as usize
1681 }
1682
1683 pub(crate) fn write_child_page(
1684 &mut self,
1685 i: usize,
1686 page_number: PageNumber,
1687 checksum: Checksum,
1688 ) {
1689 debug_assert!(i <= self.num_keys());
1690 let offset = 8 + size_of::<Checksum>() * i;
1691 self.page[offset..(offset + size_of::<Checksum>())]
1692 .copy_from_slice(&checksum.to_le_bytes());
1693 let offset =
1694 8 + size_of::<Checksum>() * (self.num_keys() + 1) + PageNumber::serialized_size() * i;
1695 self.page[offset..(offset + PageNumber::serialized_size())]
1696 .copy_from_slice(&page_number.to_le_bytes());
1697 }
1698}