1use crate::tree_store::page_store::{Page, PageImpl, PageMut, TransactionalMemory, xxh3_checksum};
2use crate::tree_store::{PageNumber, PageTrackerPolicy};
3use crate::types::{Key, MutInPlaceValue, Value};
4use crate::{Result, StorageError};
5use std::cmp::Ordering;
6use std::marker::PhantomData;
7use std::mem::size_of;
8use std::ops::Range;
9use std::sync::{Arc, Mutex};
10use std::thread;
11
/// Tag stored in byte 0 of a leaf page (written by `RawLeafBuilder::new`, checked by
/// `LeafAccessor::new` and `LeafMutator::new`).
pub(crate) const LEAF: u8 = 1;
/// Tag expected in byte 0 of a branch page (checked by `BranchAccessor::new`).
pub(crate) const BRANCH: u8 = 2;

/// Integer type used for page and tree checksums.
pub(crate) type Checksum = u128;
// NOTE(review): presumably a placeholder checksum that is replaced with the real value
// later; nothing in this part of the file shows where -- confirm against the callers.
pub(crate) const DEFERRED: Checksum = 999;
18
19pub(super) fn leaf_checksum<T: Page>(
20 page: &T,
21 fixed_key_size: Option<usize>,
22 fixed_value_size: Option<usize>,
23) -> Result<Checksum, StorageError> {
24 let accessor = LeafAccessor::new(page.memory(), fixed_key_size, fixed_value_size);
25 let last_pair = accessor.num_pairs().checked_sub(1).ok_or_else(|| {
26 StorageError::Corrupted(format!(
27 "Leaf page {:?} corrupted. Number of pairs is zero",
28 page.get_page_number()
29 ))
30 })?;
31 let end = accessor.value_end(last_pair).ok_or_else(|| {
32 StorageError::Corrupted(format!(
33 "Leaf page {:?} corrupted. Couldn't find offset for pair {}",
34 page.get_page_number(),
35 last_pair,
36 ))
37 })?;
38 if end > page.memory().len() {
39 Err(StorageError::Corrupted(format!(
40 "Leaf page {:?} corrupted. Last offset {} beyond end of data {}",
41 page.get_page_number(),
42 end,
43 page.memory().len()
44 )))
45 } else {
46 Ok(xxh3_checksum(&page.memory()[..end]))
47 }
48}
49
50pub(super) fn branch_checksum<T: Page>(
51 page: &T,
52 fixed_key_size: Option<usize>,
53) -> Result<Checksum, StorageError> {
54 let accessor = BranchAccessor::new(page, fixed_key_size);
55 let last_key = accessor.num_keys().checked_sub(1).ok_or_else(|| {
56 StorageError::Corrupted(format!(
57 "Branch page {:?} corrupted. Number of keys is zero",
58 page.get_page_number()
59 ))
60 })?;
61 let end = accessor.key_end(last_key).ok_or_else(|| {
62 StorageError::Corrupted(format!(
63 "Branch page {:?} corrupted. Can't find offset for key {}",
64 page.get_page_number(),
65 last_key
66 ))
67 })?;
68 if end > page.memory().len() {
69 Err(StorageError::Corrupted(format!(
70 "Branch page {:?} corrupted. Last offset {} beyond end of data {}",
71 page.get_page_number(),
72 end,
73 page.memory().len()
74 )))
75 } else {
76 Ok(xxh3_checksum(&page.memory()[..end]))
77 }
78}
79
80#[derive(Debug, Copy, Clone, Eq, PartialEq)]
81pub(crate) struct BtreeHeader {
82 pub(crate) root: PageNumber,
83 pub(crate) checksum: Checksum,
84 pub(crate) length: u64,
85}
86
87impl BtreeHeader {
88 pub(crate) fn new(root: PageNumber, checksum: Checksum, length: u64) -> Self {
89 Self {
90 root,
91 checksum,
92 length,
93 }
94 }
95
96 pub(crate) const fn serialized_size() -> usize {
97 PageNumber::serialized_size() + size_of::<Checksum>() + size_of::<u64>()
98 }
99
100 pub(crate) fn from_le_bytes(bytes: [u8; Self::serialized_size()]) -> Self {
101 let root =
102 PageNumber::from_le_bytes(bytes[..PageNumber::serialized_size()].try_into().unwrap());
103 let mut offset = PageNumber::serialized_size();
104 let checksum = Checksum::from_le_bytes(
105 bytes[offset..(offset + size_of::<Checksum>())]
106 .try_into()
107 .unwrap(),
108 );
109 offset += size_of::<Checksum>();
110 let length = u64::from_le_bytes(
111 bytes[offset..(offset + size_of::<u64>())]
112 .try_into()
113 .unwrap(),
114 );
115
116 Self {
117 root,
118 checksum,
119 length,
120 }
121 }
122
123 pub(crate) fn to_le_bytes(self) -> [u8; Self::serialized_size()] {
124 let mut result = [0; Self::serialized_size()];
125 result[..PageNumber::serialized_size()].copy_from_slice(&self.root.to_le_bytes());
126 result[PageNumber::serialized_size()
127 ..(PageNumber::serialized_size() + size_of::<Checksum>())]
128 .copy_from_slice(&self.checksum.to_le_bytes());
129 result[(PageNumber::serialized_size() + size_of::<Checksum>())..]
130 .copy_from_slice(&self.length.to_le_bytes());
131
132 result
133 }
134}
135
/// Action an `AccessGuard` performs when it is dropped.
enum OnDrop {
    /// Do nothing.
    None,
    /// Remove the entry at `position` from the guard's leaf page.
    RemoveEntry {
        /// Index of the entry to remove.
        position: usize,
        /// Fixed key width handed to the `LeafMutator` that performs the removal.
        fixed_key_size: Option<usize>,
    },
}
143
144enum EitherPage {
145 Immutable(PageImpl),
146 Mutable(PageMut),
147 OwnedMemory(Vec<u8>),
148 ArcMemory(Arc<[u8]>),
149}
150
151impl EitherPage {
152 fn memory(&self) -> &[u8] {
153 match self {
154 EitherPage::Immutable(page) => page.memory(),
155 EitherPage::Mutable(page) => page.memory(),
156 EitherPage::OwnedMemory(mem) => mem.as_slice(),
157 EitherPage::ArcMemory(mem) => mem,
158 }
159 }
160}
161
162pub struct AccessGuard<'a, V: Value + 'static> {
163 page: EitherPage,
164 offset: usize,
165 len: usize,
166 on_drop: OnDrop,
167 _value_type: PhantomData<V>,
168 _lifetime: PhantomData<&'a ()>,
170}
171
172impl<V: Value + 'static> AccessGuard<'_, V> {
173 pub(crate) fn with_page(page: PageImpl, range: Range<usize>) -> Self {
174 Self {
175 page: EitherPage::Immutable(page),
176 offset: range.start,
177 len: range.len(),
178 on_drop: OnDrop::None,
179 _value_type: Default::default(),
180 _lifetime: Default::default(),
181 }
182 }
183
184 pub(crate) fn with_arc_page(page: Arc<[u8]>, range: Range<usize>) -> Self {
185 Self {
186 page: EitherPage::ArcMemory(page),
187 offset: range.start,
188 len: range.len(),
189 on_drop: OnDrop::None,
190 _value_type: Default::default(),
191 _lifetime: Default::default(),
192 }
193 }
194
195 pub(crate) fn with_owned_value(value: Vec<u8>) -> Self {
196 let len = value.len();
197 Self {
198 page: EitherPage::OwnedMemory(value),
199 offset: 0,
200 len,
201 on_drop: OnDrop::None,
202 _value_type: Default::default(),
203 _lifetime: Default::default(),
204 }
205 }
206
207 pub(super) fn remove_on_drop(
208 page: PageMut,
209 offset: usize,
210 len: usize,
211 position: usize,
212 fixed_key_size: Option<usize>,
213 ) -> Self {
214 Self {
215 page: EitherPage::Mutable(page),
216 offset,
217 len,
218 on_drop: OnDrop::RemoveEntry {
219 position,
220 fixed_key_size,
221 },
222 _value_type: Default::default(),
223 _lifetime: Default::default(),
224 }
225 }
226
227 pub fn value(&self) -> V::SelfType<'_> {
228 V::from_bytes(&self.page.memory()[self.offset..(self.offset + self.len)])
229 }
230}
231
232impl<V: Value + 'static> Drop for AccessGuard<'_, V> {
233 fn drop(&mut self) {
234 match self.on_drop {
235 OnDrop::None => {}
236 OnDrop::RemoveEntry {
237 position,
238 fixed_key_size,
239 } => {
240 if let EitherPage::Mutable(ref mut mut_page) = self.page {
241 let mut mutator = LeafMutator::new(mut_page, fixed_key_size, V::fixed_width());
242 mutator.remove(position);
243 } else if !thread::panicking() {
244 unreachable!();
245 }
246 }
247 }
248 }
249}
250
251pub struct AccessGuardMut<'a, V: Value + 'static> {
252 page: PageMut,
253 offset: usize,
254 len: usize,
255 _value_type: PhantomData<V>,
256 _lifetime: PhantomData<&'a ()>,
258}
259
260impl<V: Value + 'static> AccessGuardMut<'_, V> {
261 pub(crate) fn new(page: PageMut, offset: usize, len: usize) -> Self {
262 AccessGuardMut {
263 page,
264 offset,
265 len,
266 _value_type: Default::default(),
267 _lifetime: Default::default(),
268 }
269 }
270}
271
272impl<V: MutInPlaceValue + 'static> AsMut<V::BaseRefType> for AccessGuardMut<'_, V> {
273 fn as_mut(&mut self) -> &mut V::BaseRefType {
274 V::from_bytes_mut(&mut self.page.memory_mut()[self.offset..(self.offset + self.len)])
275 }
276}
277
/// Borrowed key and value bytes of a single entry.
pub struct EntryAccessor<'a> {
    key: &'a [u8],
    value: &'a [u8],
}

impl<'a> EntryAccessor<'a> {
    /// Pairs up the given key and value slices.
    fn new(key: &'a [u8], value: &'a [u8]) -> Self {
        Self { key, value }
    }
}

impl<'a: 'b, 'b> EntryAccessor<'a> {
    /// The key bytes; the returned slice may outlive this accessor.
    pub(crate) fn key(&'b self) -> &'a [u8] {
        let Self { key, .. } = self;
        key
    }

    /// The value bytes; the returned slice may outlive this accessor.
    pub(crate) fn value(&'b self) -> &'a [u8] {
        let Self { value, .. } = self;
        value
    }
}
299
300pub(crate) struct LeafAccessor<'a> {
302 page: &'a [u8],
303 fixed_key_size: Option<usize>,
304 fixed_value_size: Option<usize>,
305 num_pairs: usize,
306}
307
308impl<'a> LeafAccessor<'a> {
309 pub(crate) fn new(
310 page: &'a [u8],
311 fixed_key_size: Option<usize>,
312 fixed_value_size: Option<usize>,
313 ) -> Self {
314 debug_assert_eq!(page[0], LEAF);
315 let num_pairs = u16::from_le_bytes(page[2..4].try_into().unwrap()) as usize;
316 LeafAccessor {
317 page,
318 fixed_key_size,
319 fixed_value_size,
320 num_pairs,
321 }
322 }
323
324 pub(super) fn print_node<K: Key, V: Value>(&self, include_value: bool) {
325 let mut i = 0;
326 while let Some(entry) = self.entry(i) {
327 eprint!(" key_{i}={:?}", K::from_bytes(entry.key()));
328 if include_value {
329 eprint!(" value_{i}={:?}", V::from_bytes(entry.value()));
330 }
331 i += 1;
332 }
333 }
334
335 pub(crate) fn position<K: Key>(&self, query: &[u8]) -> (usize, bool) {
336 let mut min_entry = 0;
338 let mut max_entry = self.num_pairs();
340 while min_entry < max_entry {
341 let mid = min_entry.midpoint(max_entry);
342 let key = self.key_unchecked(mid);
343 match K::compare(query, key) {
344 Ordering::Less => {
345 max_entry = mid;
346 }
347 Ordering::Equal => {
348 return (mid, true);
349 }
350 Ordering::Greater => {
351 min_entry = mid + 1;
352 }
353 }
354 }
355 debug_assert_eq!(min_entry, max_entry);
356 (min_entry, false)
357 }
358
359 pub(crate) fn find_key<K: Key>(&self, query: &[u8]) -> Option<usize> {
360 let (entry, found) = self.position::<K>(query);
361 if found { Some(entry) } else { None }
362 }
363
364 fn key_section_start(&self) -> usize {
365 let mut offset = 4;
366 if self.fixed_key_size.is_none() {
367 offset += size_of::<u32>() * self.num_pairs;
368 }
369 if self.fixed_value_size.is_none() {
370 offset += size_of::<u32>() * self.num_pairs;
371 }
372
373 offset
374 }
375
376 fn key_start(&self, n: usize) -> Option<usize> {
377 if n == 0 {
378 Some(self.key_section_start())
379 } else {
380 self.key_end(n - 1)
381 }
382 }
383
384 fn key_end(&self, n: usize) -> Option<usize> {
385 if n >= self.num_pairs() {
386 None
387 } else {
388 if let Some(fixed) = self.fixed_key_size {
389 return Some(self.key_section_start() + fixed * (n + 1));
390 }
391 let offset = 4 + size_of::<u32>() * n;
392 let end = u32::from_le_bytes(
393 self.page
394 .get(offset..(offset + size_of::<u32>()))?
395 .try_into()
396 .unwrap(),
397 ) as usize;
398 Some(end)
399 }
400 }
401
402 fn value_start(&self, n: usize) -> Option<usize> {
403 if n == 0 {
404 self.key_end(self.num_pairs() - 1)
405 } else {
406 self.value_end(n - 1)
407 }
408 }
409
410 fn value_end(&self, n: usize) -> Option<usize> {
411 if n >= self.num_pairs() {
412 None
413 } else {
414 if let Some(fixed) = self.fixed_value_size {
415 return Some(self.key_end(self.num_pairs.checked_sub(1)?)? + fixed * (n + 1));
416 }
417 let mut offset = 4 + size_of::<u32>() * n;
418 if self.fixed_key_size.is_none() {
419 offset += size_of::<u32>() * self.num_pairs;
420 }
421 let end = u32::from_le_bytes(
422 self.page
423 .get(offset..(offset + size_of::<u32>()))?
424 .try_into()
425 .unwrap(),
426 ) as usize;
427 Some(end)
428 }
429 }
430
431 pub(crate) fn num_pairs(&self) -> usize {
432 self.num_pairs
433 }
434
435 pub(super) fn offset_of_first_value(&self) -> usize {
436 self.offset_of_value(0).unwrap()
437 }
438
439 pub(super) fn offset_of_value(&self, n: usize) -> Option<usize> {
440 self.value_start(n)
441 }
442
443 pub(super) fn value_range(&self, n: usize) -> Option<(usize, usize)> {
444 Some((self.value_start(n)?, self.value_end(n)?))
445 }
446
447 pub(crate) fn length_of_pairs(&self, start: usize, end: usize) -> usize {
449 self.length_of_values(start, end) + self.length_of_keys(start, end)
450 }
451
452 fn length_of_values(&self, start: usize, end: usize) -> usize {
453 if end == 0 {
454 return 0;
455 }
456 let end_offset = self.value_end(end - 1).unwrap();
457 let start_offset = self.value_start(start).unwrap();
458 end_offset - start_offset
459 }
460
461 pub(crate) fn length_of_keys(&self, start: usize, end: usize) -> usize {
463 if end == 0 {
464 return 0;
465 }
466 let end_offset = self.key_end(end - 1).unwrap();
467 let start_offset = self.key_start(start).unwrap();
468 end_offset - start_offset
469 }
470
471 pub(crate) fn total_length(&self) -> usize {
472 self.value_end(self.num_pairs() - 1).unwrap()
474 }
475
476 fn key_unchecked(&self, n: usize) -> &[u8] {
477 &self.page[self.key_start(n).unwrap()..self.key_end(n).unwrap()]
478 }
479
480 pub(crate) fn entry(&self, n: usize) -> Option<EntryAccessor<'a>> {
481 let key = &self.page[self.key_start(n)?..self.key_end(n)?];
482 let value = &self.page[self.value_start(n)?..self.value_end(n)?];
483 Some(EntryAccessor::new(key, value))
484 }
485
486 pub(crate) fn entry_ranges(&self, n: usize) -> Option<(Range<usize>, Range<usize>)> {
487 let key = self.key_start(n)?..self.key_end(n)?;
488 let value = self.value_start(n)?..self.value_end(n)?;
489 Some((key, value))
490 }
491
492 pub(super) fn last_entry(&self) -> EntryAccessor<'a> {
493 self.entry(self.num_pairs() - 1).unwrap()
494 }
495}
496
497pub(super) struct LeafBuilder<'a, 'b> {
498 pairs: Vec<(&'a [u8], &'a [u8])>,
499 fixed_key_size: Option<usize>,
500 fixed_value_size: Option<usize>,
501 total_key_bytes: usize,
502 total_value_bytes: usize,
503 mem: &'b TransactionalMemory,
504 allocated_pages: &'b Mutex<PageTrackerPolicy>,
505}
506
507impl<'a, 'b> LeafBuilder<'a, 'b> {
508 pub(super) fn required_bytes(&self, num_pairs: usize, keys_values_bytes: usize) -> usize {
509 RawLeafBuilder::required_bytes(
510 num_pairs,
511 keys_values_bytes,
512 self.fixed_key_size,
513 self.fixed_value_size,
514 )
515 }
516
517 pub(super) fn new(
518 mem: &'b TransactionalMemory,
519 allocated_pages: &'b Mutex<PageTrackerPolicy>,
520 capacity: usize,
521 fixed_key_size: Option<usize>,
522 fixed_value_size: Option<usize>,
523 ) -> Self {
524 Self {
525 pairs: Vec::with_capacity(capacity),
526 fixed_key_size,
527 fixed_value_size,
528 total_key_bytes: 0,
529 total_value_bytes: 0,
530 mem,
531 allocated_pages,
532 }
533 }
534
535 pub(super) fn push(&mut self, key: &'a [u8], value: &'a [u8]) {
536 self.total_key_bytes += key.len();
537 self.total_value_bytes += value.len();
538 self.pairs.push((key, value));
539 }
540
541 pub(super) fn push_all_except(
542 &mut self,
543 accessor: &'a LeafAccessor<'_>,
544 except: Option<usize>,
545 ) {
546 for i in 0..accessor.num_pairs() {
547 if let Some(except) = except {
548 if except == i {
549 continue;
550 }
551 }
552 let entry = accessor.entry(i).unwrap();
553 self.push(entry.key(), entry.value());
554 }
555 }
556
557 pub(super) fn should_split(&self) -> bool {
558 let required_size = self.required_bytes(
559 self.pairs.len(),
560 self.total_key_bytes + self.total_value_bytes,
561 );
562 required_size > self.mem.get_page_size() && self.pairs.len() > 1
563 }
564
565 pub(super) fn build_split(self) -> Result<(PageMut, &'a [u8], PageMut)> {
566 let total_size = self.total_key_bytes + self.total_value_bytes;
567 let mut division = 0;
568 let mut first_split_key_bytes = 0;
569 let mut first_split_value_bytes = 0;
570 for (key, value) in self.pairs.iter().take(self.pairs.len() - 1) {
571 first_split_key_bytes += key.len();
572 first_split_value_bytes += value.len();
573 division += 1;
574 if first_split_key_bytes + first_split_value_bytes >= total_size / 2 {
575 break;
576 }
577 }
578
579 let required_size =
580 self.required_bytes(division, first_split_key_bytes + first_split_value_bytes);
581 let mut allocated_pages = self.allocated_pages.lock().unwrap();
582 let mut page1 = self.mem.allocate(required_size, &mut allocated_pages)?;
583 let mut builder = RawLeafBuilder::new(
584 page1.memory_mut(),
585 division,
586 self.fixed_key_size,
587 self.fixed_value_size,
588 first_split_key_bytes,
589 );
590 for (key, value) in self.pairs.iter().take(division) {
591 builder.append(key, value);
592 }
593 drop(builder);
594
595 let required_size = self.required_bytes(
596 self.pairs.len() - division,
597 self.total_key_bytes + self.total_value_bytes
598 - first_split_key_bytes
599 - first_split_value_bytes,
600 );
601 let mut page2 = self.mem.allocate(required_size, &mut allocated_pages)?;
602 let mut builder = RawLeafBuilder::new(
603 page2.memory_mut(),
604 self.pairs.len() - division,
605 self.fixed_key_size,
606 self.fixed_value_size,
607 self.total_key_bytes - first_split_key_bytes,
608 );
609 for (key, value) in &self.pairs[division..] {
610 builder.append(key, value);
611 }
612 drop(builder);
613
614 Ok((page1, self.pairs[division - 1].0, page2))
615 }
616
617 pub(super) fn build(self) -> Result<PageMut> {
618 let required_size = self.required_bytes(
619 self.pairs.len(),
620 self.total_key_bytes + self.total_value_bytes,
621 );
622 let mut allocated_pages = self.allocated_pages.lock().unwrap();
623 let mut page = self.mem.allocate(required_size, &mut allocated_pages)?;
624 let mut builder = RawLeafBuilder::new(
625 page.memory_mut(),
626 self.pairs.len(),
627 self.fixed_key_size,
628 self.fixed_value_size,
629 self.total_key_bytes,
630 );
631 for (key, value) in self.pairs {
632 builder.append(key, value);
633 }
634 drop(builder);
635 Ok(page)
636 }
637}
638
639pub(crate) struct RawLeafBuilder<'a> {
654 page: &'a mut [u8],
655 fixed_key_size: Option<usize>,
656 fixed_value_size: Option<usize>,
657 num_pairs: usize,
658 provisioned_key_bytes: usize,
659 pairs_written: usize, }
661
662impl<'a> RawLeafBuilder<'a> {
663 pub(crate) fn required_bytes(
664 num_pairs: usize,
665 keys_values_bytes: usize,
666 key_size: Option<usize>,
667 value_size: Option<usize>,
668 ) -> usize {
669 let mut result = 4;
671 if key_size.is_none() {
673 result += num_pairs * size_of::<u32>();
674 }
675 if value_size.is_none() {
676 result += num_pairs * size_of::<u32>();
677 }
678 result += keys_values_bytes;
679
680 result
681 }
682
683 pub(crate) fn new(
684 page: &'a mut [u8],
685 num_pairs: usize,
686 fixed_key_size: Option<usize>,
687 fixed_value_size: Option<usize>,
688 key_bytes: usize,
689 ) -> Self {
690 page[0] = LEAF;
691 page[2..4].copy_from_slice(&u16::try_from(num_pairs).unwrap().to_le_bytes());
692 #[cfg(debug_assertions)]
693 {
694 let mut last = 4;
696 if fixed_key_size.is_none() {
697 last += size_of::<u32>() * num_pairs;
698 }
699 if fixed_value_size.is_none() {
700 last += size_of::<u32>() * num_pairs;
701 }
702 for x in &mut page[4..last] {
703 *x = 0xFF;
704 }
705 }
706 RawLeafBuilder {
707 page,
708 fixed_key_size,
709 fixed_value_size,
710 num_pairs,
711 provisioned_key_bytes: key_bytes,
712 pairs_written: 0,
713 }
714 }
715
716 fn value_end(&self, n: usize) -> usize {
717 if let Some(fixed) = self.fixed_value_size {
718 return self.key_section_start() + self.provisioned_key_bytes + fixed * (n + 1);
719 }
720 let mut offset = 4 + size_of::<u32>() * n;
721 if self.fixed_key_size.is_none() {
722 offset += size_of::<u32>() * self.num_pairs;
723 }
724 u32::from_le_bytes(
725 self.page[offset..(offset + size_of::<u32>())]
726 .try_into()
727 .unwrap(),
728 ) as usize
729 }
730
731 fn key_section_start(&self) -> usize {
732 let mut offset = 4;
733 if self.fixed_key_size.is_none() {
734 offset += size_of::<u32>() * self.num_pairs;
735 }
736 if self.fixed_value_size.is_none() {
737 offset += size_of::<u32>() * self.num_pairs;
738 }
739
740 offset
741 }
742
743 fn key_end(&self, n: usize) -> usize {
744 if let Some(fixed) = self.fixed_key_size {
745 return self.key_section_start() + fixed * (n + 1);
746 }
747 let offset = 4 + size_of::<u32>() * n;
748 u32::from_le_bytes(
749 self.page[offset..(offset + size_of::<u32>())]
750 .try_into()
751 .unwrap(),
752 ) as usize
753 }
754
755 pub(crate) fn append(&mut self, key: &[u8], value: &[u8]) {
756 if let Some(key_width) = self.fixed_key_size {
757 assert_eq!(key_width, key.len());
758 }
759 if let Some(value_width) = self.fixed_value_size {
760 assert_eq!(value_width, value.len());
761 }
762 let key_offset = if self.pairs_written == 0 {
763 self.key_section_start()
764 } else {
765 self.key_end(self.pairs_written - 1)
766 };
767 let value_offset = if self.pairs_written == 0 {
768 self.key_section_start() + self.provisioned_key_bytes
769 } else {
770 self.value_end(self.pairs_written - 1)
771 };
772
773 let n = self.pairs_written;
774 if self.fixed_key_size.is_none() {
775 let offset = 4 + size_of::<u32>() * n;
776 self.page[offset..(offset + size_of::<u32>())]
777 .copy_from_slice(&u32::try_from(key_offset + key.len()).unwrap().to_le_bytes());
778 }
779 self.page[key_offset..(key_offset + key.len())].copy_from_slice(key);
780 let written_key_len = key_offset + key.len() - self.key_section_start();
781 assert!(written_key_len <= self.provisioned_key_bytes);
782
783 if self.fixed_value_size.is_none() {
784 let mut offset = 4 + size_of::<u32>() * n;
785 if self.fixed_key_size.is_none() {
786 offset += size_of::<u32>() * self.num_pairs;
787 }
788 self.page[offset..(offset + size_of::<u32>())].copy_from_slice(
789 &u32::try_from(value_offset + value.len())
790 .unwrap()
791 .to_le_bytes(),
792 );
793 }
794 self.page[value_offset..(value_offset + value.len())].copy_from_slice(value);
795 self.pairs_written += 1;
796 }
797}
798
799impl Drop for RawLeafBuilder<'_> {
800 fn drop(&mut self) {
801 if !thread::panicking() {
802 assert_eq!(self.pairs_written, self.num_pairs);
803 assert_eq!(
804 self.key_section_start() + self.provisioned_key_bytes,
805 self.key_end(self.num_pairs - 1)
806 );
807 }
808 }
809}
810
/// Edits a leaf page in place (insert, overwrite, remove).
pub(crate) struct LeafMutator<'b> {
    // The leaf page being modified.
    page: &'b mut PageMut,
    // `Some(width)` for fixed-width keys; `None` when the page stores a `u32` end
    // offset per key.
    fixed_key_size: Option<usize>,
    // `Some(width)` for fixed-width values; `None` when the page stores a `u32` end
    // offset per value.
    fixed_value_size: Option<usize>,
}
816
817impl<'b> LeafMutator<'b> {
818 pub(crate) fn new(
819 page: &'b mut PageMut,
820 fixed_key_size: Option<usize>,
821 fixed_value_size: Option<usize>,
822 ) -> Self {
823 assert_eq!(page.memory_mut()[0], LEAF);
824 Self {
825 page,
826 fixed_key_size,
827 fixed_value_size,
828 }
829 }
830
831 pub(super) fn sufficient_insert_inplace_space(
832 page: &'_ PageImpl,
833 position: usize,
834 overwrite: bool,
835 fixed_key_size: Option<usize>,
836 fixed_value_size: Option<usize>,
837 new_key: &[u8],
838 new_value: &[u8],
839 ) -> bool {
840 let accessor = LeafAccessor::new(page.memory(), fixed_key_size, fixed_value_size);
841 if overwrite {
842 let remaining = page.memory().len() - accessor.total_length();
843 let required_delta = isize::try_from(new_key.len() + new_value.len()).unwrap()
844 - isize::try_from(accessor.length_of_pairs(position, position + 1)).unwrap();
845 required_delta <= isize::try_from(remaining).unwrap()
846 } else {
847 if page.get_page_number().page_order > 0 && position < accessor.num_pairs() {
852 return false;
853 }
854 let remaining = page.memory().len() - accessor.total_length();
855 let mut required_delta = new_key.len() + new_value.len();
856 if fixed_key_size.is_none() {
857 required_delta += size_of::<u32>();
858 }
859 if fixed_value_size.is_none() {
860 required_delta += size_of::<u32>();
861 }
862 required_delta <= remaining
863 }
864 }
865
    /// Inserts a new pair at index `i`, or, when `overwrite` is set, replaces the value
    /// of the existing pair at index `i`, shifting the rest of the page in place.
    ///
    /// In the overwrite case only the value bytes are rewritten; the stored key bytes
    /// are left untouched.
    ///
    /// # Panics
    ///
    /// Panics if the resulting data would not fit in the page, if the page has no
    /// pairs, or if any size/offset conversion fails.
    pub(crate) fn insert(&mut self, i: usize, overwrite: bool, key: &[u8], value: &[u8]) {
        let accessor = LeafAccessor::new(
            self.page.memory(),
            self.fixed_key_size,
            self.fixed_value_size,
        );
        // Net growth of the page: size difference for an overwrite, or the new pair
        // plus its offset slots for an insertion.
        let required_delta = if overwrite {
            isize::try_from(key.len() + value.len()).unwrap()
                - isize::try_from(accessor.length_of_pairs(i, i + 1)).unwrap()
        } else {
            let mut delta = key.len() + value.len();
            if self.fixed_key_size.is_none() {
                delta += size_of::<u32>();
            }
            if self.fixed_value_size.is_none() {
                delta += size_of::<u32>();
            }
            delta.try_into().unwrap()
        };
        assert!(
            isize::try_from(accessor.total_length()).unwrap() + required_delta
                <= isize::try_from(self.page.memory().len()).unwrap()
        );

        // Snapshot the current layout before anything is modified.
        let num_pairs = accessor.num_pairs();
        let last_key_end = accessor.key_end(accessor.num_pairs() - 1).unwrap();
        let last_value_end = accessor.value_end(accessor.num_pairs() - 1).unwrap();
        // First pair whose data has to move: the one after `i` for an overwrite,
        // `i` itself for an insertion.
        let shift_index = if overwrite { i + 1 } else { i };
        let shift_key_start = accessor.key_start(shift_index).unwrap_or(last_key_end);
        let shift_value_start = accessor.value_start(shift_index).unwrap_or(last_value_end);
        let existing_value_len = accessor
            .value_range(i)
            .map(|(start, end)| end - start)
            .unwrap_or_default();

        let value_delta = if overwrite {
            isize::try_from(value.len()).unwrap() - isize::try_from(existing_value_len).unwrap()
        } else {
            value.len().try_into().unwrap()
        };

        // Update all the stored end offsets. The loops run while the header still
        // holds the old pair count, which `update_value_end` relies on to locate the
        // value table.
        let key_ptr_size: usize = if self.fixed_key_size.is_none() { 4 } else { 0 };
        let value_ptr_size: usize = if self.fixed_value_size.is_none() {
            4
        } else {
            0
        };
        if !overwrite {
            // Pairs before `i`: keys move by the two new offset slots; values
            // additionally move by the inserted key.
            for j in 0..i {
                self.update_key_end(j, (key_ptr_size + value_ptr_size).try_into().unwrap());
                let value_delta: isize = (key_ptr_size + value_ptr_size + key.len())
                    .try_into()
                    .unwrap();
                self.update_value_end(j, value_delta);
            }
        }
        for j in i..num_pairs {
            if overwrite {
                self.update_value_end(j, value_delta);
            } else {
                // Pairs from `i` on: keys move by the slots plus the inserted key;
                // values additionally move by the inserted value.
                let key_delta: isize = (key_ptr_size + value_ptr_size + key.len())
                    .try_into()
                    .unwrap();
                self.update_key_end(j, key_delta);
                let value_delta = key_delta + isize::try_from(value.len()).unwrap();
                self.update_value_end(j, value_delta);
            }
        }

        let new_num_pairs = if overwrite { num_pairs } else { num_pairs + 1 };
        self.page.memory_mut()[2..4]
            .copy_from_slice(&u16::try_from(new_num_pairs).unwrap().to_le_bytes());

        // Shift the trailing values. From here on `dest` walks backwards through the
        // page, region by region.
        let mut dest = if overwrite {
            (isize::try_from(shift_value_start).unwrap() + value_delta)
                .try_into()
                .unwrap()
        } else {
            shift_value_start + key_ptr_size + value_ptr_size + key.len() + value.len()
        };
        let start = shift_value_start;
        let end = last_value_end;
        self.page.memory_mut().copy_within(start..end, dest);

        // Write the new value directly in front of the shifted values.
        let inserted_value_end: u32 = dest.try_into().unwrap();
        dest -= value.len();
        self.page.memory_mut()[dest..(dest + value.len())].copy_from_slice(value);

        if !overwrite {
            // Shift the trailing key data and the preceding value data.
            let start = shift_key_start;
            let end = shift_value_start;
            dest -= end - start;
            self.page.memory_mut().copy_within(start..end, dest);

            // Write the new key.
            let inserted_key_end: u32 = dest.try_into().unwrap();
            dest -= key.len();
            self.page.memory_mut()[dest..(dest + key.len())].copy_from_slice(key);

            // Shift the trailing value offsets and the preceding key data.
            let start = 4 + key_ptr_size * num_pairs + value_ptr_size * i;
            let end = shift_key_start;
            dest -= end - start;
            debug_assert_eq!(
                dest,
                4 + key_ptr_size * new_num_pairs + value_ptr_size * (i + 1)
            );
            self.page.memory_mut().copy_within(start..end, dest);

            // Write the new value's end offset.
            if self.fixed_value_size.is_none() {
                dest -= size_of::<u32>();
                self.page.memory_mut()[dest..(dest + size_of::<u32>())]
                    .copy_from_slice(&inserted_value_end.to_le_bytes());
            }

            // Shift the trailing key offsets and the preceding value offsets.
            let start = 4 + key_ptr_size * i;
            let end = 4 + key_ptr_size * num_pairs + value_ptr_size * i;
            dest -= end - start;
            debug_assert_eq!(dest, 4 + key_ptr_size * (i + 1));
            self.page.memory_mut().copy_within(start..end, dest);

            // Write the new key's end offset.
            if self.fixed_key_size.is_none() {
                dest -= size_of::<u32>();
                self.page.memory_mut()[dest..(dest + size_of::<u32>())]
                    .copy_from_slice(&inserted_key_end.to_le_bytes());
            }
            debug_assert_eq!(dest, 4 + key_ptr_size * i);
        }
    }
1003
    /// Removes the pair at index `i`, compacting the rest of the page in place.
    ///
    /// # Panics
    ///
    /// Panics if `i` is out of range or if the page holds only one pair (the last
    /// remaining pair cannot be removed this way).
    pub(super) fn remove(&mut self, i: usize) {
        let accessor = LeafAccessor::new(
            self.page.memory(),
            self.fixed_key_size,
            self.fixed_value_size,
        );
        let num_pairs = accessor.num_pairs();
        assert!(i < num_pairs);
        assert!(num_pairs > 1);
        // Snapshot the current layout before anything is modified.
        let key_start = accessor.key_start(i).unwrap();
        let key_end = accessor.key_end(i).unwrap();
        let value_start = accessor.value_start(i).unwrap();
        let value_end = accessor.value_end(i).unwrap();
        let last_value_end = accessor.value_end(accessor.num_pairs() - 1).unwrap();

        // Update all the stored end offsets. The loops run while the header still
        // holds the old pair count, which `update_value_end` relies on to locate the
        // value table.
        let key_ptr_size = if self.fixed_key_size.is_none() {
            size_of::<u32>()
        } else {
            0
        };
        let value_ptr_size = if self.fixed_value_size.is_none() {
            size_of::<u32>()
        } else {
            0
        };
        // Pairs before `i`: keys move back by the two removed offset slots; values
        // additionally move back by the removed key.
        for j in 0..i {
            self.update_key_end(j, -isize::try_from(key_ptr_size + value_ptr_size).unwrap());
            let value_delta = -isize::try_from(key_ptr_size + value_ptr_size).unwrap()
                - isize::try_from(key_end - key_start).unwrap();
            self.update_value_end(j, value_delta);
        }
        // Pairs after `i`: keys move back by the slots plus the removed key; values
        // additionally move back by the removed value.
        for j in (i + 1)..num_pairs {
            let key_delta = -isize::try_from(key_ptr_size + value_ptr_size).unwrap()
                - isize::try_from(key_end - key_start).unwrap();
            self.update_key_end(j, key_delta);
            let value_delta = key_delta - isize::try_from(value_end - value_start).unwrap();
            self.update_value_end(j, value_delta);
        }

        // Record the new pair count. From here on `dest` walks forwards through the
        // page, region by region.
        let new_num_pairs = num_pairs - 1;
        self.page.memory_mut()[2..4]
            .copy_from_slice(&u16::try_from(new_num_pairs).unwrap().to_le_bytes());
        // Shift the trailing key offsets and the preceding value offsets.
        let mut dest = 4 + key_ptr_size * i;
        let start = 4 + key_ptr_size * (i + 1);
        let end = 4 + key_ptr_size * num_pairs + value_ptr_size * i;
        self.page.memory_mut().copy_within(start..end, dest);
        dest += end - start;
        debug_assert_eq!(dest, 4 + key_ptr_size * new_num_pairs + value_ptr_size * i);

        // Shift the trailing value offsets and the preceding key data.
        let start = 4 + key_ptr_size * num_pairs + value_ptr_size * (i + 1);
        let end = key_start;
        self.page.memory_mut().copy_within(start..end, dest);
        dest += end - start;

        let preceding_key_len = key_start - (4 + (key_ptr_size + value_ptr_size) * num_pairs);
        debug_assert_eq!(
            dest,
            4 + (key_ptr_size + value_ptr_size) * new_num_pairs + preceding_key_len
        );

        // Shift the trailing key data and the preceding value data.
        let start = key_end;
        let end = value_start;
        self.page.memory_mut().copy_within(start..end, dest);
        dest += end - start;

        // Shift the trailing value data.
        let preceding_data_len =
            value_start - (4 + (key_ptr_size + value_ptr_size) * num_pairs) - (key_end - key_start);
        debug_assert_eq!(
            dest,
            4 + (key_ptr_size + value_ptr_size) * new_num_pairs + preceding_data_len
        );
        let start = value_end;
        let end = last_value_end;
        self.page.memory_mut().copy_within(start..end, dest);
    }
1088
1089 fn update_key_end(&mut self, i: usize, delta: isize) {
1090 if self.fixed_key_size.is_some() {
1091 return;
1092 }
1093 let offset = 4 + size_of::<u32>() * i;
1094 let mut ptr = u32::from_le_bytes(
1095 self.page.memory()[offset..(offset + size_of::<u32>())]
1096 .try_into()
1097 .unwrap(),
1098 );
1099 ptr = (isize::try_from(ptr).unwrap() + delta).try_into().unwrap();
1100 self.page.memory_mut()[offset..(offset + size_of::<u32>())]
1101 .copy_from_slice(&ptr.to_le_bytes());
1102 }
1103
1104 fn update_value_end(&mut self, i: usize, delta: isize) {
1105 if self.fixed_value_size.is_some() {
1106 return;
1107 }
1108 let accessor = LeafAccessor::new(
1109 self.page.memory(),
1110 self.fixed_key_size,
1111 self.fixed_value_size,
1112 );
1113 let num_pairs = accessor.num_pairs();
1114 let mut offset = 4 + size_of::<u32>() * i;
1115 if self.fixed_key_size.is_none() {
1116 offset += size_of::<u32>() * num_pairs;
1117 }
1118 let mut ptr = u32::from_le_bytes(
1119 self.page.memory()[offset..(offset + size_of::<u32>())]
1120 .try_into()
1121 .unwrap(),
1122 );
1123 ptr = (isize::try_from(ptr).unwrap() + delta).try_into().unwrap();
1124 self.page.memory_mut()[offset..(offset + size_of::<u32>())]
1125 .copy_from_slice(&ptr.to_le_bytes());
1126 }
1127}
1128
/// Read-only view over a branch page.
pub(crate) struct BranchAccessor<'a: 'b, 'b, T: Page + 'a> {
    // The page being read.
    page: &'b T,
    // Number of keys, read from bytes 2..4 of the page (little-endian `u16`).
    num_keys: usize,
    // `Some(width)` for fixed-width keys; `None` when the page stores a `u32` end
    // offset per key.
    fixed_key_size: Option<usize>,
    // Ties the otherwise unused lifetime `'a` to the struct.
    _page_lifetime: PhantomData<&'a ()>,
}
1137
1138impl<'a: 'b, 'b, T: Page + 'a> BranchAccessor<'a, 'b, T> {
1139 pub(crate) fn new(page: &'b T, fixed_key_size: Option<usize>) -> Self {
1140 debug_assert_eq!(page.memory()[0], BRANCH);
1141 let num_keys = u16::from_le_bytes(page.memory()[2..4].try_into().unwrap()) as usize;
1142 BranchAccessor {
1143 page,
1144 num_keys,
1145 fixed_key_size,
1146 _page_lifetime: Default::default(),
1147 }
1148 }
1149
1150 pub(super) fn print_node<K: Key>(&self) {
1151 eprint!(
1152 "Internal[ (page={:?}), child_0={:?}",
1153 self.page.get_page_number(),
1154 self.child_page(0).unwrap()
1155 );
1156 for i in 0..(self.count_children() - 1) {
1157 if let Some(child) = self.child_page(i + 1) {
1158 let key = self.key(i).unwrap();
1159 eprint!(" key_{i}={:?}", K::from_bytes(key));
1160 eprint!(" child_{}={child:?}", i + 1);
1161 }
1162 }
1163 eprint!("]");
1164 }
1165
1166 pub(crate) fn total_length(&self) -> usize {
1167 self.key_end(self.num_keys() - 1).unwrap()
1169 }
1170
1171 pub(super) fn child_for_key<K: Key>(&self, query: &[u8]) -> (usize, PageNumber) {
1172 let mut min_child = 0; let mut max_child = self.num_keys(); while min_child < max_child {
1175 let mid = min_child.midpoint(max_child);
1176 match K::compare(query, self.key(mid).unwrap()) {
1177 Ordering::Less => {
1178 max_child = mid;
1179 }
1180 Ordering::Equal => {
1181 return (mid, self.child_page(mid).unwrap());
1182 }
1183 Ordering::Greater => {
1184 min_child = mid + 1;
1185 }
1186 }
1187 }
1188 debug_assert_eq!(min_child, max_child);
1189
1190 (min_child, self.child_page(min_child).unwrap())
1191 }
1192
1193 fn key_section_start(&self) -> usize {
1194 if self.fixed_key_size.is_none() {
1195 8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * self.count_children()
1196 + size_of::<u32>() * self.num_keys()
1197 } else {
1198 8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * self.count_children()
1199 }
1200 }
1201
1202 fn key_offset(&self, n: usize) -> usize {
1203 if n == 0 {
1204 self.key_section_start()
1205 } else {
1206 self.key_end(n - 1).unwrap()
1207 }
1208 }
1209
1210 fn key_end(&self, n: usize) -> Option<usize> {
1211 if let Some(fixed) = self.fixed_key_size {
1212 return Some(self.key_section_start() + fixed * (n + 1));
1213 }
1214 let offset = 8
1215 + (PageNumber::serialized_size() + size_of::<Checksum>()) * self.count_children()
1216 + size_of::<u32>() * n;
1217 Some(u32::from_le_bytes(
1218 self.page
1219 .memory()
1220 .get(offset..(offset + size_of::<u32>()))?
1221 .try_into()
1222 .unwrap(),
1223 ) as usize)
1224 }
1225
1226 pub(super) fn key(&self, n: usize) -> Option<&[u8]> {
1227 if n >= self.num_keys() {
1228 return None;
1229 }
1230 let offset = self.key_offset(n);
1231 let end = self.key_end(n)?;
1232 Some(&self.page.memory()[offset..end])
1233 }
1234
1235 pub(crate) fn count_children(&self) -> usize {
1236 self.num_keys() + 1
1237 }
1238
1239 pub(crate) fn child_checksum(&self, n: usize) -> Option<Checksum> {
1240 if n >= self.count_children() {
1241 return None;
1242 }
1243
1244 let offset = 8 + size_of::<Checksum>() * n;
1245 Some(Checksum::from_le_bytes(
1246 self.page.memory()[offset..(offset + size_of::<Checksum>())]
1247 .try_into()
1248 .unwrap(),
1249 ))
1250 }
1251
1252 pub(crate) fn child_page(&self, n: usize) -> Option<PageNumber> {
1253 if n >= self.count_children() {
1254 return None;
1255 }
1256
1257 let offset =
1258 8 + size_of::<Checksum>() * self.count_children() + PageNumber::serialized_size() * n;
1259 Some(PageNumber::from_le_bytes(
1260 self.page.memory()[offset..(offset + PageNumber::serialized_size())]
1261 .try_into()
1262 .unwrap(),
1263 ))
1264 }
1265
1266 fn num_keys(&self) -> usize {
1267 self.num_keys
1268 }
1269}
1270
1271pub(super) struct BranchBuilder<'a, 'b> {
1272 children: Vec<(PageNumber, Checksum)>,
1273 keys: Vec<&'a [u8]>,
1274 total_key_bytes: usize,
1275 fixed_key_size: Option<usize>,
1276 mem: &'b TransactionalMemory,
1277 allocated_pages: &'b Mutex<PageTrackerPolicy>,
1278}
1279
1280impl<'a, 'b> BranchBuilder<'a, 'b> {
1281 pub(super) fn new(
1282 mem: &'b TransactionalMemory,
1283 allocated_pages: &'b Mutex<PageTrackerPolicy>,
1284 child_capacity: usize,
1285 fixed_key_size: Option<usize>,
1286 ) -> Self {
1287 Self {
1288 children: Vec::with_capacity(child_capacity),
1289 keys: Vec::with_capacity(child_capacity - 1),
1290 total_key_bytes: 0,
1291 fixed_key_size,
1292 mem,
1293 allocated_pages,
1294 }
1295 }
1296
1297 pub(super) fn replace_child(&mut self, index: usize, child: PageNumber, checksum: Checksum) {
1298 self.children[index] = (child, checksum);
1299 }
1300
1301 pub(super) fn push_child(&mut self, child: PageNumber, checksum: Checksum) {
1302 self.children.push((child, checksum));
1303 }
1304
1305 pub(super) fn push_key(&mut self, key: &'a [u8]) {
1306 self.keys.push(key);
1307 self.total_key_bytes += key.len();
1308 }
1309
1310 pub(super) fn push_all<T: Page>(&mut self, accessor: &'a BranchAccessor<'_, '_, T>) {
1311 for i in 0..accessor.count_children() {
1312 let child = accessor.child_page(i).unwrap();
1313 let checksum = accessor.child_checksum(i).unwrap();
1314 self.push_child(child, checksum);
1315 }
1316 for i in 0..(accessor.count_children() - 1) {
1317 self.push_key(accessor.key(i).unwrap());
1318 }
1319 }
1320
1321 pub(super) fn to_single_child(&self) -> Option<(PageNumber, Checksum)> {
1322 if self.children.len() > 1 {
1323 None
1324 } else {
1325 Some(self.children[0])
1326 }
1327 }
1328
1329 pub(super) fn build(self) -> Result<PageMut> {
1330 assert_eq!(self.children.len(), self.keys.len() + 1);
1331 let size = RawBranchBuilder::required_bytes(
1332 self.keys.len(),
1333 self.total_key_bytes,
1334 self.fixed_key_size,
1335 );
1336 let mut allocated_pages = self.allocated_pages.lock().unwrap();
1337 let mut page = self.mem.allocate(size, &mut allocated_pages)?;
1338 let mut builder = RawBranchBuilder::new(&mut page, self.keys.len(), self.fixed_key_size);
1339 builder.write_first_page(self.children[0].0, self.children[0].1);
1340 for i in 1..self.children.len() {
1341 let key = &self.keys[i - 1];
1342 builder.write_nth_key(key.as_ref(), self.children[i].0, self.children[i].1, i - 1);
1343 }
1344 drop(builder);
1345
1346 Ok(page)
1347 }
1348
1349 pub(super) fn should_split(&self) -> bool {
1350 let size = RawBranchBuilder::required_bytes(
1351 self.keys.len(),
1352 self.total_key_bytes,
1353 self.fixed_key_size,
1354 );
1355 size > self.mem.get_page_size() && self.keys.len() >= 3
1356 }
1357
1358 pub(super) fn build_split(self) -> Result<(PageMut, &'a [u8], PageMut)> {
1359 let mut allocated_pages = self.allocated_pages.lock().unwrap();
1360 assert_eq!(self.children.len(), self.keys.len() + 1);
1361 assert!(self.keys.len() >= 3);
1362 let division = self.keys.len() / 2;
1363 let first_split_key_len: usize = self.keys.iter().take(division).map(|k| k.len()).sum();
1364 let division_key = self.keys[division];
1365 let second_split_key_len = self.total_key_bytes - first_split_key_len - division_key.len();
1366
1367 let size =
1368 RawBranchBuilder::required_bytes(division, first_split_key_len, self.fixed_key_size);
1369 let mut page1 = self.mem.allocate(size, &mut allocated_pages)?;
1370 let mut builder = RawBranchBuilder::new(&mut page1, division, self.fixed_key_size);
1371 builder.write_first_page(self.children[0].0, self.children[0].1);
1372 for i in 0..division {
1373 let key = &self.keys[i];
1374 builder.write_nth_key(
1375 key.as_ref(),
1376 self.children[i + 1].0,
1377 self.children[i + 1].1,
1378 i,
1379 );
1380 }
1381 drop(builder);
1382
1383 let size = RawBranchBuilder::required_bytes(
1384 self.keys.len() - division - 1,
1385 second_split_key_len,
1386 self.fixed_key_size,
1387 );
1388 let mut page2 = self.mem.allocate(size, &mut allocated_pages)?;
1389 let mut builder = RawBranchBuilder::new(
1390 &mut page2,
1391 self.keys.len() - division - 1,
1392 self.fixed_key_size,
1393 );
1394 builder.write_first_page(self.children[division + 1].0, self.children[division + 1].1);
1395 for i in (division + 1)..self.keys.len() {
1396 let key = &self.keys[i];
1397 builder.write_nth_key(
1398 key.as_ref(),
1399 self.children[i + 1].0,
1400 self.children[i + 1].1,
1401 i - division - 1,
1402 );
1403 }
1404 drop(builder);
1405
1406 Ok((page1, division_key, page2))
1407 }
1408}
1409
1410pub(super) struct RawBranchBuilder<'b> {
1426 page: &'b mut PageMut,
1427 fixed_key_size: Option<usize>,
1428 num_keys: usize,
1429 keys_written: usize, }
1431
1432impl<'b> RawBranchBuilder<'b> {
1433 pub(super) fn required_bytes(
1434 num_keys: usize,
1435 size_of_keys: usize,
1436 fixed_key_size: Option<usize>,
1437 ) -> usize {
1438 if fixed_key_size.is_none() {
1439 let fixed_size = 8
1440 + (PageNumber::serialized_size() + size_of::<Checksum>()) * (num_keys + 1)
1441 + size_of::<u32>() * num_keys;
1442 size_of_keys + fixed_size
1443 } else {
1444 let fixed_size =
1445 8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * (num_keys + 1);
1446 size_of_keys + fixed_size
1447 }
1448 }
1449
1450 pub(super) fn new(
1452 page: &'b mut PageMut,
1453 num_keys: usize,
1454 fixed_key_size: Option<usize>,
1455 ) -> Self {
1456 assert!(num_keys > 0);
1457 page.memory_mut()[0] = BRANCH;
1458 page.memory_mut()[2..4].copy_from_slice(&u16::try_from(num_keys).unwrap().to_le_bytes());
1459 #[cfg(debug_assertions)]
1460 {
1461 let start = 8 + size_of::<Checksum>() * (num_keys + 1);
1463 let last = 8
1464 + (PageNumber::serialized_size() + size_of::<Checksum>()) * (num_keys + 1)
1465 + size_of::<u32>() * num_keys;
1466 for x in &mut page.memory_mut()[start..last] {
1467 *x = 0xFF;
1468 }
1469 }
1470 RawBranchBuilder {
1471 page,
1472 fixed_key_size,
1473 num_keys,
1474 keys_written: 0,
1475 }
1476 }
1477
1478 pub(super) fn write_first_page(&mut self, page_number: PageNumber, checksum: Checksum) {
1479 let offset = 8;
1480 self.page.memory_mut()[offset..(offset + size_of::<Checksum>())]
1481 .copy_from_slice(&checksum.to_le_bytes());
1482 let offset = 8 + size_of::<Checksum>() * (self.num_keys + 1);
1483 self.page.memory_mut()[offset..(offset + PageNumber::serialized_size())]
1484 .copy_from_slice(&page_number.to_le_bytes());
1485 }
1486
1487 fn key_section_start(&self) -> usize {
1488 let mut offset =
1489 8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * (self.num_keys + 1);
1490 if self.fixed_key_size.is_none() {
1491 offset += size_of::<u32>() * self.num_keys;
1492 }
1493
1494 offset
1495 }
1496
1497 fn key_end(&self, n: usize) -> usize {
1498 if let Some(fixed) = self.fixed_key_size {
1499 return self.key_section_start() + fixed * (n + 1);
1500 }
1501 let offset = 8
1502 + (PageNumber::serialized_size() + size_of::<Checksum>()) * (self.num_keys + 1)
1503 + size_of::<u32>() * n;
1504 u32::from_le_bytes(
1505 self.page.memory()[offset..(offset + size_of::<u32>())]
1506 .try_into()
1507 .unwrap(),
1508 ) as usize
1509 }
1510
1511 pub(super) fn write_nth_key(
1514 &mut self,
1515 key: &[u8],
1516 page_number: PageNumber,
1517 checksum: Checksum,
1518 n: usize,
1519 ) {
1520 assert!(n < self.num_keys);
1521 assert_eq!(n, self.keys_written);
1522 self.keys_written += 1;
1523 let offset = 8 + size_of::<Checksum>() * (n + 1);
1524 self.page.memory_mut()[offset..(offset + size_of::<Checksum>())]
1525 .copy_from_slice(&checksum.to_le_bytes());
1526 let offset = 8
1527 + size_of::<Checksum>() * (self.num_keys + 1)
1528 + PageNumber::serialized_size() * (n + 1);
1529 self.page.memory_mut()[offset..(offset + PageNumber::serialized_size())]
1530 .copy_from_slice(&page_number.to_le_bytes());
1531
1532 let data_offset = if n > 0 {
1533 self.key_end(n - 1)
1534 } else {
1535 self.key_section_start()
1536 };
1537 if self.fixed_key_size.is_none() {
1538 let offset = 8
1539 + (PageNumber::serialized_size() + size_of::<Checksum>()) * (self.num_keys + 1)
1540 + size_of::<u32>() * n;
1541 self.page.memory_mut()[offset..(offset + size_of::<u32>())].copy_from_slice(
1542 &u32::try_from(data_offset + key.len())
1543 .unwrap()
1544 .to_le_bytes(),
1545 );
1546 }
1547
1548 debug_assert!(data_offset > offset);
1549 self.page.memory_mut()[data_offset..(data_offset + key.len())].copy_from_slice(key);
1550 }
1551}
1552
1553impl Drop for RawBranchBuilder<'_> {
1554 fn drop(&mut self) {
1555 if !thread::panicking() {
1556 assert_eq!(self.keys_written, self.num_keys);
1557 }
1558 }
1559}
1560
1561pub(crate) struct BranchMutator<'b> {
1562 page: &'b mut PageMut,
1563}
1564
1565impl<'b> BranchMutator<'b> {
1566 pub(crate) fn new(page: &'b mut PageMut) -> Self {
1567 assert_eq!(page.memory()[0], BRANCH);
1568 Self { page }
1569 }
1570
1571 fn num_keys(&self) -> usize {
1572 u16::from_le_bytes(self.page.memory()[2..4].try_into().unwrap()) as usize
1573 }
1574
1575 pub(crate) fn write_child_page(
1576 &mut self,
1577 i: usize,
1578 page_number: PageNumber,
1579 checksum: Checksum,
1580 ) {
1581 debug_assert!(i <= self.num_keys());
1582 let offset = 8 + size_of::<Checksum>() * i;
1583 self.page.memory_mut()[offset..(offset + size_of::<Checksum>())]
1584 .copy_from_slice(&checksum.to_le_bytes());
1585 let offset =
1586 8 + size_of::<Checksum>() * (self.num_keys() + 1) + PageNumber::serialized_size() * i;
1587 self.page.memory_mut()[offset..(offset + PageNumber::serialized_size())]
1588 .copy_from_slice(&page_number.to_le_bytes());
1589 }
1590}