1
use std::fmt::{Display, Write as _};
2
use std::io::{BufWriter, Read, Seek, Write};
3
use std::ops::{AddAssign, Deref, DerefMut};
4
use std::str::FromStr;
5

            
6
use crc32c::crc32c;
7
use okaywal::EntryId;
8

            
9
use crate::commit_log::CommitLogEntry;
10
use crate::{Error, Result};
11

            
12
287436
#[derive(Clone, Copy, Eq, PartialEq, Hash, Ord, PartialOrd)]
13
pub struct GrainId(u64);
14

            
15
impl GrainId {
16
    pub const NONE: Self = Self(u64::MAX);
17

            
18
30756
    pub const fn new(basin: BasinId, stratum: StratumId, id: LocalGrainId) -> Self {
19
30756
        Self((basin.0 as u64) << 61 | stratum.0 << 20 | id.0)
20
30756
    }
21

            
22
35940
    pub fn from_bytes(bytes: &[u8]) -> Option<Self> {
23
35940
        let value = u64::from_be_bytes(bytes.try_into().ok()?);
24
35940
        if value != u64::MAX {
25
35011
            Some(Self(value))
26
        } else {
27
929
            None
28
        }
29
35940
    }
30

            
31
30523
    pub const fn to_bytes(self) -> [u8; 8] {
32
30523
        self.0.to_be_bytes()
33
30523
    }
34

            
35
213459
    pub const fn basin_id(self) -> BasinId {
36
213459
        BasinId((self.0 >> 61) as u8)
37
213459
    }
38

            
39
1127
    pub fn local_grain_id(self) -> LocalGrainId {
40
1127
        LocalGrainId(self.0 & 0xF_FFFF)
41
1127
    }
42

            
43
155838
    pub const fn local_grain_index(self) -> GrainIndex {
44
155838
        GrainIndex(((self.0 >> 6) & 0x3FFF) as u16)
45
155838
    }
46

            
47
253220
    pub const fn grain_count(self) -> u8 {
48
253220
        (self.0 & 0x3f) as u8
49
253220
    }
50

            
51
2563
    pub const fn basin_and_stratum(self) -> BasinAndStratum {
52
2563
        BasinAndStratum(self.0 >> 20)
53
2563
    }
54

            
55
153843
    pub const fn stratum_id(self) -> StratumId {
56
153843
        StratumId((self.0 >> 20) & 0x1ff_ffff_ffff)
57
153843
    }
58

            
59
59539
    pub(crate) const fn file_position(self) -> u64 {
60
59539
        let grain_size = self.basin_id().grain_stripe_bytes() as u64;
61
59539
        let index = self.local_grain_index().as_u16() as u64;
62
59539
        let header_size = StratumHeader::BYTES * 2;
63
59539

            
64
59539
        header_size + index * grain_size
65
59539
    }
66
}
67

            
68
impl FromStr for GrainId {
69
    type Err = GrainIdError;
70

            
71
8
    fn from_str(s: &str) -> Result<Self, Self::Err> {
72
8
        let mut parts = s.split('-');
73
8
        let basin_and_stratum = parts.next().ok_or(GrainIdError::InvalidFormat)?;
74
8
        let index = parts.next().ok_or(GrainIdError::InvalidFormat)?;
75
8
        if parts.next().is_some() || basin_and_stratum.len() < 2 {
76
1
            return Err(GrainIdError::InvalidFormat);
77
7
        }
78

            
79
7
        let basin_and_stratum = BasinAndStratum::from_str(basin_and_stratum)?;
80

            
81
3
        let index_and_count =
82
4
            u64::from_str_radix(index, 16).map_err(|_| GrainIdError::InvalidGrainIndex)?;
83
3
        let count = (index_and_count & 0x3f) as u8;
84
3
        let index = GrainIndex::new((index_and_count >> 6) as u16)
85
3
            .ok_or(GrainIdError::InvalidGrainIndex)?;
86
2
        let id = LocalGrainId::from_parts(index, count).ok_or(GrainIdError::InvalidGrainIndex)?;
87

            
88
2
        Ok(Self::new(
89
2
            basin_and_stratum.basin(),
90
2
            basin_and_stratum.stratum(),
91
2
            id,
92
2
        ))
93
8
    }
94
}
95

            
96
impl std::fmt::Debug for GrainId {
97
364
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98
364
        let basin_id = self.basin_id();
99
364
        let stratum_id = self.stratum_id();
100
364
        let local_index = self.local_grain_index();
101
364
        let count = self.grain_count();
102
364
        f.debug_struct("GrainId")
103
364
            .field("basin", &basin_id.0)
104
364
            .field("stratum", &stratum_id.0)
105
364
            .field("index", &local_index.0)
106
364
            .field("count", &count)
107
364
            .finish()
108
364
    }
109
}
110

            
111
#[derive(Debug, Eq, PartialEq)]
112
pub enum GrainIdError {
113
    InvalidFormat,
114
    InvalidBasinId,
115
    InvalidStratum,
116
    InvalidGrainIndex,
117
    InvalidGrainCount,
118
}
119

            
120
impl Display for GrainId {
121
2
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
122
2
        let basin_and_stratum = self.basin_and_stratum();
123
2
        let local_index = self.local_grain_id();
124
2
        write!(f, "{basin_and_stratum}-{local_index}")
125
2
    }
126
}
127

            
128
1
#[test]
129
1
fn grain_id_strings() {
130
1
    let zero = GrainId(0);
131
1
    assert_eq!(zero.to_string(), "00-0");
132
1
    let none = GrainId::NONE;
133
1
    assert_eq!(none.to_string(), "71ffffffffff-fffff");
134
1
    assert_eq!(
135
1
        GrainId::from_str("71ffffffffff-fffff").unwrap(),
136
1
        GrainId::NONE
137
1
    );
138
1
    assert!(GrainId::from_str("72fffffffffff-fffff").is_err());
139
1
    assert!(GrainId::from_str("71fffffffffff-1fffff").is_err());
140
1
    assert!(GrainId::from_str("81fffffffffff-3fff").is_err());
141
1
    assert!(GrainId::from_str("---").is_err());
142
1
    assert!(GrainId::from_str("71ffffffffff-FFFFFFFFFFFFFFFFF").is_err());
143
1
    assert!(GrainId::from_str("0FFFFFFFFFFFFFFFFF-3fff").is_err());
144
1
}
145

            
146
30525
#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq, PartialOrd, Ord)]
147
pub struct StratumId(u64);
148

            
149
impl StratumId {
150
691
    pub const fn new(id: u64) -> Option<Self> {
151
691
        if id < 2_u64.pow(45) {
152
690
            Some(Self(id))
153
        } else {
154
1
            None
155
        }
156
691
    }
157

            
158
152036
    pub const fn as_usize(self) -> usize {
159
152036
        self.0 as usize
160
152036
    }
161

            
162
596
    pub const fn as_u64(self) -> u64 {
163
596
        self.0
164
596
    }
165
}
166

            
167
impl Display for StratumId {
168
114
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169
114
        write!(f, "{:0x}", self.0)
170
114
    }
171
}
172

            
173
246032
#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq, Default)]
174
pub struct BasinId(u8);
175

            
176
impl BasinId {
177
    pub const MAX: Self = BasinId(7);
178
    pub const MIN: Self = BasinId(0);
179

            
180
251551
    pub const fn new(id: u8) -> Option<Self> {
181
251551
        if id < 8 {
182
250862
            Some(Self(id))
183
        } else {
184
689
            None
185
        }
186
251551
    }
187

            
188
122
    pub fn to_char(self) -> char {
189
122
        (b'0' + self.0) as char
190
122
    }
191

            
192
1227
    pub fn from_char(ch: char) -> Option<Self> {
193
1227
        if ('0'..='7').contains(&ch) {
194
314
            Some(Self(ch as u8 - b'0'))
195
        } else {
196
913
            None
197
        }
198
1227
    }
199

            
200
421220
    pub const fn index(self) -> u8 {
201
421220
        self.0
202
421220
    }
203

            
204
5520
    pub const fn next(self) -> Option<Self> {
205
5520
        Self::new(self.0 + 1)
206
5520
    }
207
}
208

            
209
impl Display for BasinId {
210
114
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
211
114
        f.write_char(self.to_char())
212
114
    }
213
}
214

            
215
impl BasinId {
216
305257
    pub const fn grain_stripe_bytes(self) -> u32 {
217
305257
        match self.0 {
218
67233
            0 => 2_u32.pow(5),
219
53500
            1 => 2_u32.pow(8),
220
30754
            2 => 2_u32.pow(12),
221
30754
            3 => 2_u32.pow(16),
222
30754
            4 => 2_u32.pow(20),
223
30754
            5 => 2_u32.pow(24),
224
30754
            6 => 2_u32.pow(28),
225
30754
            7 => 2_u32.pow(31),
226
            _ => unreachable!(),
227
        }
228
305257
    }
229
}
230

            
231
1399
#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq, PartialOrd, Ord)]
232
pub struct BasinAndStratum(u64);
233

            
234
impl BasinAndStratum {
235
51
    pub const fn from_parts(basin: BasinId, stratum: StratumId) -> Self {
236
51
        Self((basin.0 as u64) << 41 | stratum.0)
237
51
    }
238

            
239
1255
    pub fn basin(self) -> BasinId {
240
1255
        BasinId((self.0 >> 41) as u8)
241
1255
    }
242

            
243
963
    pub fn stratum(self) -> StratumId {
244
963
        StratumId(self.0 & 0x1ff_ffff_ffff)
245
963
    }
246
}
247

            
248
impl Display for BasinAndStratum {
249
70
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
250
70
        let basin = self.basin();
251
70
        let stratum = self.stratum();
252
70
        write!(f, "{basin}{stratum}")
253
70
    }
254
}
255

            
256
impl FromStr for BasinAndStratum {
257
    type Err = GrainIdError;
258

            
259
1219
    fn from_str(basin_and_stratum: &str) -> Result<Self, Self::Err> {
260
1219
        let (basin, stratum) = basin_and_stratum.split_at(1);
261
1219
        let Some(basin) = BasinId::from_char(basin.as_bytes()[0] as char)
262
913
            else { return Err(GrainIdError::InvalidBasinId) };
263

            
264
306
        let stratum = u64::from_str_radix(stratum, 16).map_err(|_| GrainIdError::InvalidStratum)?;
265
305
        let stratum = StratumId::new(stratum).ok_or(GrainIdError::InvalidStratum)?;
266

            
267
304
        Ok(Self(u64::from(basin.0) << 41 | stratum.0))
268
1219
    }
269
}
270

            
271
1
#[test]
272
1
fn basin_id_encoding() {
273
8
    for (ch, value) in ('0'..='7').zip(0..=7) {
274
8
        let expected = BasinId(value);
275
8
        assert_eq!(BasinId::from_char(ch), Some(expected));
276
8
        assert_eq!(expected.to_char(), ch);
277
    }
278
1
}
279

            
280
261068
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
281
pub struct GrainIndex(u16);
282

            
283
impl GrainIndex {
284
62399
    pub const fn new(id: u16) -> Option<Self> {
285
62399
        if id < 2_u16.pow(14) {
286
62398
            Some(Self(id))
287
        } else {
288
1
            None
289
        }
290
62399
    }
291

            
292
515707
    pub const fn as_u16(self) -> u16 {
293
515707
        self.0
294
515707
    }
295
}
296

            
297
impl AddAssign<u8> for GrainIndex {
298
30741
    fn add_assign(&mut self, rhs: u8) {
299
30741
        self.0 += u16::from(rhs);
300
30741
    }
301
}
302

            
303
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
304
pub struct LocalGrainId(u64);
305

            
306
impl LocalGrainId {
307
30760
    pub const fn from_parts(index: GrainIndex, grain_count: u8) -> Option<Self> {
308
30760
        if grain_count < 64 {
309
30760
            Some(Self((index.0 as u64) << 6 | grain_count as u64))
310
        } else {
311
            None
312
        }
313
30760
    }
314

            
315
1135
    pub const fn grain_index(self) -> GrainIndex {
316
1135
        GrainIndex((self.0 >> 6) as u16)
317
1135
    }
318

            
319
1135
    pub const fn grain_count(self) -> u8 {
320
1135
        (self.0 & 0x3f) as u8
321
1135
    }
322
}
323

            
324
impl Display for LocalGrainId {
325
16
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
326
16
        write!(f, "{:0x}", self.0)
327
16
    }
328
}
329

            
330
39203
#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord, Default)]
331
pub struct TransactionId(u64);
332

            
333
impl TransactionId {
334
42719
    pub fn to_be_bytes(self) -> [u8; 8] {
335
42719
        self.0.to_be_bytes()
336
42719
    }
337

            
338
1661
    pub const fn from_be_bytes(bytes: [u8; 8]) -> Self {
339
1661
        Self(u64::from_be_bytes(bytes))
340
1661
    }
341
}
342

            
343
impl From<u64> for TransactionId {
344
7
    fn from(id: u64) -> Self {
345
7
        TransactionId(id)
346
7
    }
347
}
348

            
349
impl From<TransactionId> for u64 {
350
    fn from(id: TransactionId) -> Self {
351
        id.0
352
    }
353
}
354

            
355
impl From<EntryId> for TransactionId {
356
9358
    fn from(id: EntryId) -> Self {
357
9358
        Self(id.0)
358
9358
    }
359
}
360

            
361
impl From<TransactionId> for EntryId {
362
    fn from(tx_id: TransactionId) -> Self {
363
        EntryId(tx_id.0)
364
    }
365
}
366

            
367
impl PartialEq<u64> for TransactionId {
368
    fn eq(&self, other: &u64) -> bool {
369
        self.0 == *other
370
    }
371
}
372

            
373
impl PartialEq<EntryId> for TransactionId {
374
    fn eq(&self, other: &EntryId) -> bool {
375
        self.0 == other.0
376
    }
377
}
378

            
379
impl PartialOrd<EntryId> for TransactionId {
380
3015
    fn partial_cmp(&self, other: &EntryId) -> Option<std::cmp::Ordering> {
381
3015
        self.0.partial_cmp(&other.0)
382
3015
    }
383
}
384

            
385
pub enum FileHeader<T> {
386
    Both(T, T),
387
    First(T),
388
    Second(T),
389
}
390

            
391
impl<T> FileHeader<T>
392
where
393
    T: Duplicable,
394
{
395
600
    pub fn read_from<R: Read + Seek>(mut file: R, scratch: &mut Vec<u8>) -> Result<Self> {
396
600
        let first_header = T::read_from(&mut file, scratch);
397
600
        if first_header.is_err() {
398
4
            file.seek(std::io::SeekFrom::Start(T::BYTES))?;
399
596
        }
400
600
        let second_header = T::read_from(&mut file, scratch);
401
600
        match (first_header, second_header) {
402
591
            (Ok(first_header), Ok(second_header)) => Ok(Self::Both(first_header, second_header)),
403
1
            (Err(err), Err(_)) => Err(err),
404
5
            (Ok(first_header), Err(_)) => Ok(Self::First(first_header)),
405
3
            (Err(_), Ok(second_header)) => Ok(Self::Second(second_header)),
406
        }
407
600
    }
408

            
409
589
    pub fn as_options(&self) -> (Option<&T>, Option<&T>) {
410
589
        match self {
411
581
            FileHeader::Both(first, second) => (Some(first), Some(second)),
412
4
            FileHeader::First(first) => (Some(first), None),
413
4
            FileHeader::Second(second) => (None, Some(second)),
414
        }
415
589
    }
416

            
417
150
    pub fn into_first(self) -> T {
418
150
        match self {
419
150
            FileHeader::Both(first, _) | FileHeader::First(first) => first,
420
            FileHeader::Second(_) => unreachable!("did not contain a valid first"),
421
        }
422
150
    }
423

            
424
142
    pub fn into_second(self) -> T {
425
142
        match self {
426
142
            FileHeader::Both(_, second) | FileHeader::Second(second) => second,
427
            FileHeader::First(_) => unreachable!("did not contain a valid second"),
428
        }
429
142
    }
430
}
431

            
432
pub trait Duplicable: Sized {
433
    const BYTES: u64;
434

            
435
    fn read_from<R: Read>(reader: R, scratch: &mut Vec<u8>) -> Result<Self>;
436
    fn write_to<W: Write>(&mut self, writer: W) -> Result<()>;
437
}
438

            
439
/// A header inside of an "Index" file.
440
///
441
/// This data structure is serialized as:
442
///
443
/// - `transaction_id`: 8 bytes
444
/// - `embedded_header_data`: 8 bytes
445
/// - `commit_log_head`: 8 bytes
446
/// - `checkpoint_target`: 8 bytes
447
/// - `checkpointed_to`: 8 bytes
448
/// - `basin_strata_count`: 8 x 6 bytes (48 bytes).
449
/// - `crc32`: 4 bytes (checksum of previous 88 bytes)
450
///
451
/// The total header length is 36 bytes.
452
///
453
/// # About the Index file
454
///
455
/// Index files are the root of a Sediment database. The header is responsible
456
/// for pointing to several key pieces of data, which will be stored within the
457
/// other files.
458
///
459
/// The Index file is serialized in this fashion:
460
///
461
/// - Magic code + version (4 bytes)
462
/// - [`IndexHeader`]
463
/// - [`IndexHeader`]
464
///
465
/// The record with the highest transaction id should be checked upon recovery
466
/// to ensure that `embedded_header_data` is written with the same
467
/// [`TransactionId`].
468
36
#[derive(Debug, Clone, Eq, PartialEq, Default)]
469
pub struct IndexHeader {
470
    pub transaction_id: TransactionId,
471
    pub embedded_header_data: Option<GrainId>,
472
    pub commit_log_head: Option<GrainId>,
473
    pub checkpoint_target: TransactionId,
474
    pub checkpointed_to: TransactionId,
475
    pub basin_strata_count: [u64; 8],
476
    pub crc32: u32,
477
}
478

            
479
impl Duplicable for IndexHeader {
480
    const BYTES: u64 = 92;
481

            
482
600
    fn read_from<R: Read>(mut file: R, scratch: &mut Vec<u8>) -> Result<Self> {
483
600
        scratch.resize(Self::BYTES as usize, 0);
484
600
        file.read_exact(scratch)?;
485
600
        let crc32 = u32::from_be_bytes(scratch[88..].try_into().expect("u32 is 4 bytes"));
486
600
        let computed_crc = crc32c(&scratch[..88]);
487
600
        if crc32 != computed_crc {
488
4
            return Err(Error::ChecksumFailed);
489
596
        }
490
596

            
491
596
        let (transaction_bytes, remaining) = scratch.split_at(8);
492
596
        let transaction_id = TransactionId(u64::from_be_bytes(
493
596
            transaction_bytes.try_into().expect("u64 is 8 bytes"),
494
596
        ));
495
596
        let (embedded_header_bytes, remaining) = remaining.split_at(8);
496
596
        let embedded_header_data = GrainId::from_bytes(embedded_header_bytes);
497
596
        let (commit_log_head_bytes, remaining) = remaining.split_at(8);
498
596
        let commit_log_head = GrainId::from_bytes(commit_log_head_bytes);
499
596
        let (checkpoint_target_bytes, remaining) = remaining.split_at(8);
500
596
        let checkpoint_target = TransactionId(u64::from_be_bytes(
501
596
            checkpoint_target_bytes.try_into().expect("u64 is 8 bytes"),
502
596
        ));
503
596
        let (checkpointed_to_bytes, mut remaining) = remaining.split_at(8);
504
596
        let checkpointed_to = TransactionId(u64::from_be_bytes(
505
596
            checkpointed_to_bytes.try_into().expect("u64 is 8 bytes"),
506
596
        ));
507
596
        let mut basin_strata_count = [0; 8];
508
5364
        for count in &mut basin_strata_count {
509
4768
            let mut padded_bytes = [0; 8];
510
4768
            padded_bytes[2..].copy_from_slice(&remaining[..6]);
511
4768
            remaining = &remaining[6..];
512
4768
            *count = u64::from_be_bytes(padded_bytes);
513
4768
        }
514

            
515
596
        Ok(Self {
516
596
            transaction_id,
517
596
            embedded_header_data,
518
596
            commit_log_head,
519
596
            checkpoint_target,
520
596
            checkpointed_to,
521
596
            basin_strata_count,
522
596
            crc32,
523
596
        })
524
600
    }
525

            
526
439
    fn write_to<W: std::io::Write>(&mut self, writer: W) -> Result<()> {
527
439
        let mut writer = ChecksumWriter::new(writer);
528
439
        writer.write_all(&self.transaction_id.to_be_bytes())?;
529
439
        writer.write_all(
530
439
            &self
531
439
                .embedded_header_data
532
439
                .unwrap_or(GrainId::NONE)
533
439
                .0
534
439
                .to_be_bytes(),
535
439
        )?;
536
439
        writer.write_all(
537
439
            &self
538
439
                .commit_log_head
539
439
                .unwrap_or(GrainId::NONE)
540
439
                .0
541
439
                .to_be_bytes(),
542
439
        )?;
543
439
        writer.write_all(&self.checkpoint_target.to_be_bytes())?;
544
439
        writer.write_all(&self.checkpointed_to.to_be_bytes())?;
545
3951
        for count in &self.basin_strata_count {
546
3512
            writer.write_all(&count.to_be_bytes()[2..])?;
547
        }
548
439
        let (_, crc32) = writer.write_crc32_and_finish()?;
549
439
        self.crc32 = crc32;
550
439

            
551
439
        Ok(())
552
439
    }
553
}
554

            
555
/// Each Stratum header is 16kb, and describes the state of allocation of each
556
/// grain within the Stratum.
557
///
558
/// It is serialized as:
559
///
560
/// - [`TransactionId`]: 8 bytes
561
/// - [`GrainAllocationInfo`]: 16,372 one-byte entries
562
/// - CRC32: 4 bytes
563
///
564
/// The grain size is determined by the name of the file that contains the
565
/// header.
566
///
567
/// # About Statum files
568
///
569
/// Strata contain the data written to the Sediment database.
570
///
571
/// The header consists of two [`StratumHeader`]s serialized one after another.
572
/// The header with the latest [`TransactionId`] is considered the current
573
/// record. When updating the header, the inactive copy should be overwritten.
574
///
575
/// If an aborted write is detected and a rollback needs to happen, the rolled
576
/// back header should be overwritten with a second copy of the previous
577
/// version.
578
///
579
/// Directly after the two [`StratumHeader`]s is a tightly packed list of
580
/// grains. Each grain is serialized as:
581
///
582
/// - [`TransactionId`]: 8 bytes
583
/// - Data Length: 4 bytes
584
/// - Grain Data: The contiguous data stored within the grain.
585
/// - CRC32: The CRC of the [`TransactionId`] and the grain data.
586
///
587
/// Strata are grouped together to form a Basin. In each Basin, the grain stripe
588
/// size is always the same. The Basin's grain size is determined by the name of
589
/// the Stratum file. The first character is a single Base32 character whose
590
/// value is the exponent of the grain size equation: `2^(grain_exponent)`.
591
/// Because each piece of data must have 16 extra bytes allocated to it, the
592
/// smallest usable grain exponent is 5 (`F`).
593
///
594
/// To find the data associated with a grain, its local grain index must be
595
/// computed. Because each Stratum can contain a maximum of 16,372 grains, the
596
/// remaining characters in a Stratum's file name is a hexadecimal
597
/// representation of the top 50 bits of a `GrainId` in big endian. The
598
/// remaining 14 bits contain the local grain index.
599
///
600
/// The offset of a local grain index is `32kb + local_grain_index *
601
/// grain_size`. Because grains can be stored in stripes of up to 64 consecutive
602
/// grains, not every local grain index will point to the start of a grain
603
/// record. The [`StratumHeader`] must be used to determine if a given local
604
/// grain index is valid before trusting the data stored.
605
#[derive(Debug)]
606
pub struct StratumHeader {
607
    pub transaction_id: TransactionId,
608
    pub grains: [u8; 16372],
609
    pub crc32: u32,
610
}
611

            
612
impl StratumHeader {
613
2835514
    pub const fn grain_info(&self, index: usize) -> GrainAllocationInfo {
614
2835514
        GrainAllocationInfo(self.grains[index])
615
2835514
    }
616

            
617
581
    pub fn reflects_changes_from(&self, commit_log: &CommitLogEntry) -> bool {
618
581
        let new_grains = commit_log
619
581
            .new_grains
620
581
            .iter()
621
581
            .map(|new_grain| (GrainAllocationStatus::Allocated, new_grain.id));
622
581
        let archived_grains = commit_log
623
581
            .archived_grains
624
581
            .iter()
625
581
            .map(|grain| (GrainAllocationStatus::Archived, *grain));
626
581
        let freed_grains = commit_log
627
581
            .freed_grains
628
581
            .iter()
629
581
            .map(|grain| (GrainAllocationStatus::Free, *grain));
630
641
        for (expected_status, grain_id) in new_grains.chain(archived_grains).chain(freed_grains) {
631
641
            let start = usize::from(grain_id.local_grain_index().as_u16());
632
641
            let mut expected_count = grain_id.grain_count();
633
33793
            for info in self
634
641
                .grains
635
641
                .iter()
636
641
                .skip(start)
637
641
                .take(usize::from(expected_count))
638
            {
639
33793
                let info = GrainAllocationInfo(*info);
640

            
641
33793
                let matches = if info.status() == Some(expected_status) {
642
33787
                    if expected_status == GrainAllocationStatus::Free {
643
78
                        info.count() == 0
644
                    } else {
645
33709
                        info.count() == expected_count
646
                    }
647
                } else {
648
6
                    false
649
                };
650

            
651
33793
                if !matches {
652
6
                    return false;
653
33787
                }
654
33787

            
655
33787
                expected_count -= 1;
656
            }
657
        }
658

            
659
575
        true
660
581
    }
661
}
662

            
663
impl Duplicable for StratumHeader {
664
    const BYTES: u64 = 16_384;
665

            
666
600
    fn read_from<R: Read>(mut file: R, scratch: &mut Vec<u8>) -> Result<Self> {
667
600
        scratch.resize(16_384, 0);
668
600
        file.read_exact(scratch)?;
669

            
670
600
        let mut grains = [0; 16_372];
671
600

            
672
600
        let crc32 = u32::from_be_bytes(scratch[16_380..].try_into().expect("u32 is 4 bytes"));
673
600
        let computed_crc = crc32c(&scratch[..16_380]);
674
600
        if crc32 != computed_crc {
675
393222
            if scratch.iter().all(|b| b == &0) {
676
24
                return Ok(Self {
677
24
                    transaction_id: TransactionId::default(),
678
24
                    grains,
679
24
                    crc32: 0,
680
24
                });
681
6
            }
682
6

            
683
6
            return Err(Error::ChecksumFailed);
684
570
        }
685
570

            
686
570
        let transaction_id = TransactionId(u64::from_be_bytes(
687
570
            scratch[..8].try_into().expect("u64 is 8 bytes"),
688
570
        ));
689
570

            
690
570
        grains.copy_from_slice(&scratch[8..16_372 + 8]);
691
570

            
692
570
        Ok(Self {
693
570
            transaction_id,
694
570
            grains,
695
570
            crc32,
696
570
        })
697
600
    }
698

            
699
408
    fn write_to<W: std::io::Write>(&mut self, writer: W) -> Result<()> {
700
408
        let mut writer = ChecksumWriter::new(BufWriter::new(writer));
701
408
        writer.write_all(&self.transaction_id.to_be_bytes())?;
702
408
        writer.write_all(&self.grains)?;
703
408
        self.crc32 = writer.crc32();
704
408
        writer.write_all(&self.crc32.to_be_bytes())?;
705

            
706
408
        writer.flush()?;
707

            
708
408
        Ok(())
709
408
    }
710
}
711

            
712
impl Default for StratumHeader {
713
45
    fn default() -> Self {
714
45
        Self {
715
45
            transaction_id: Default::default(),
716
45
            grains: [0; 16372],
717
45
            crc32: Default::default(),
718
45
        }
719
45
    }
720
}
721

            
722
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
723
#[repr(transparent)]
724
pub struct GrainAllocationInfo(pub u8);
725

            
726
impl GrainAllocationInfo {
727
217800
    pub const fn allocated(count: u8) -> Self {
728
217800
        assert!(count < 64);
729
217800
        Self((1 << 6) | count)
730
217800
    }
731

            
732
3618
    pub const fn archived(count: u8) -> Self {
733
3618
        assert!(count < 64);
734
3618
        Self((2 << 6) | count)
735
3618
    }
736

            
737
5577606
    pub fn status(self) -> Option<GrainAllocationStatus> {
738
5577606
        match self.0 >> 6 {
739
5419819
            0 => Some(GrainAllocationStatus::Free),
740
157540
            1 => Some(GrainAllocationStatus::Allocated),
741
247
            2 => Some(GrainAllocationStatus::Archived),
742
            _ => None,
743
        }
744
5577606
    }
745

            
746
2869301
    pub fn count(self) -> u8 {
747
2869301
        self.0 & 0b0011_1111
748
2869301
    }
749
}
750

            
751
319498
#[derive(Debug, Ord, PartialOrd, Eq, PartialEq, Clone, Copy)]
752
pub enum GrainAllocationStatus {
753
    Allocated,
754
    Archived,
755
    Free,
756
}
757

            
758
pub trait ByteUtil {
759
    fn to_be_bytes(&self) -> [u8; 8];
760
    fn from_be_bytes(bytes: [u8; 8]) -> Self;
761
}
762

            
763
macro_rules! impl_bytes_for {
764
    ($type:ident) => {
765
        impl ByteUtil for $type {
766
43950
            fn to_be_bytes(&self) -> [u8; 8] {
767
43950
                self.0.to_be_bytes()
768
43950
            }
769

            
770
            fn from_be_bytes(bytes: [u8; 8]) -> Self {
771
                Self(u64::from_be_bytes(bytes))
772
            }
773
        }
774
    };
775
}
776

            
777
impl_bytes_for!(GrainId);
778

            
779
impl ByteUtil for Option<GrainId> {
780
6706
    fn to_be_bytes(&self) -> [u8; 8] {
781
6706
        self.unwrap_or(GrainId::NONE).to_be_bytes()
782
6706
    }
783

            
784
    fn from_be_bytes(bytes: [u8; 8]) -> Self {
785
        let id = GrainId::from_be_bytes(bytes);
786
        if id != GrainId::NONE {
787
            Some(id)
788
        } else {
789
            None
790
        }
791
    }
792
}
793

            
794
pub struct ChecksumWriter<W> {
795
    writer: W,
796
    crc32: u32,
797
}
798

            
799
impl<W> ChecksumWriter<W>
800
where
801
    W: Write,
802
{
803
847
    pub fn new(writer: W) -> Self {
804
847
        Self { writer, crc32: 0 }
805
847
    }
806

            
807
408
    pub fn crc32(&self) -> u32 {
808
408
        self.crc32
809
408
    }
810

            
811
    pub fn write_crc32_and_finish(mut self) -> Result<(W, u32)> {
812
439
        self.write_all(&self.crc32.to_be_bytes())?;
813
439
        Ok((self.writer, self.crc32))
814
439
    }
815
}
816

            
817
impl<W> Write for ChecksumWriter<W>
818
where
819
    W: Write,
820
{
821
7370
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
822
7370
        let bytes_written = self.writer.write(buf)?;
823
7370
        if bytes_written > 0 {
824
7370
            self.crc32 = crc32c::crc32c_append(self.crc32, &buf[..bytes_written]);
825
7370
        }
826
7370
        Ok(bytes_written)
827
7370
    }
828

            
829
408
    fn flush(&mut self) -> std::io::Result<()> {
830
408
        self.writer.flush()
831
408
    }
832
}
833

            
834
#[derive(Debug)]
835
pub struct Stored<T> {
836
    pub grain_id: GrainId,
837
    pub stored: T,
838
}
839

            
840
impl<T> Deref for Stored<T> {
841
    type Target = T;
842

            
843
30616
    fn deref(&self) -> &Self::Target {
844
30616
        &self.stored
845
30616
    }
846
}
847

            
848
impl<T> DerefMut for Stored<T> {
849
    fn deref_mut(&mut self) -> &mut Self::Target {
850
        &mut self.stored
851
    }
852
}