1
use std::collections::{HashMap, HashSet, VecDeque};
2
use std::io::{BufReader, Read, Seek};
3
use std::path::PathBuf;
4
use std::sync::{Arc, Mutex};
5

            
6
use okaywal::file_manager::{OpenOptions, PathId};
7
use okaywal::{file_manager, ChunkReader, LogPosition, WriteAheadLog};
8
use tinyvec::ArrayVec;
9

            
10
use crate::allocations::FreeLocations;
11
use crate::basinmap::BasinMap;
12
use crate::format::{
13
    BasinAndStratum, BasinId, GrainAllocationStatus, GrainId, GrainIndex, StratumHeader, StratumId,
14
    TransactionId,
15
};
16
use crate::store::{BasinState, Store};
17
use crate::util::{u32_to_usize, usize_to_u32};
18
use crate::{Error, Result};
19

            
20
#[derive(Debug)]
21
pub struct Atlas<FileManager> {
22
    data: Mutex<Data<FileManager>>,
23
}
24

            
25
impl<FileManager> Atlas<FileManager>
26
where
27
    FileManager: file_manager::FileManager,
28
{
29
331
    pub fn new(store: &Store<FileManager>) -> Self {
30
331
        let disk_state = store.lock().expect("unable to lock store");
31
331

            
32
331
        let mut basins = BasinMap::new();
33

            
34
331
        for (basin_id, basin) in &disk_state.basins {
35
291
            basins[basin_id] = Some(Basin::from(basin));
36
291
        }
37

            
38
331
        Self {
39
331
            data: Mutex::new(Data {
40
331
                directory: store.directory.clone(),
41
331
                index: IndexMetadata {
42
331
                    commit_log_head: disk_state.index.active.commit_log_head,
43
331
                    embedded_header_data: disk_state.index.active.embedded_header_data,
44
331
                    checkpoint_target: disk_state.index.active.checkpoint_target,
45
331
                    checkpointed_to: disk_state.index.active.checkpointed_to,
46
331
                },
47
331
                basins,
48
331
                uncheckpointed_grains: HashMap::new(),
49
331
                file_manager: store.file_manager.clone(),
50
331
            }),
51
331
        }
52
331
    }
53

            
54
8348
    /// Returns a copy of the index metadata currently stored in the atlas.
    ///
    /// # Errors
    ///
    /// Returns an error if the internal lock cannot be acquired.
    pub fn current_index_metadata(&self) -> Result<IndexMetadata> {
        Ok(self.data.lock()?.index)
    }
58

            
59
31075
    pub fn find<'wal>(
60
31075
        &self,
61
31075
        grain: GrainId,
62
31075
        wal: &'wal WriteAheadLog<FileManager>,
63
31075
    ) -> Result<Option<GrainReader<'wal, FileManager>>> {
64
31075
        let data = self.data.lock()?;
65
31075
        match data.uncheckpointed_grains.get(&grain) {
66
14
            Some(UncheckpointedGrain::PendingCommit) => Ok(None),
67
16
            Some(UncheckpointedGrain::InWal(location)) => {
68
16
                let location = *location;
69
16
                let mut chunk_reader = wal.read_at(location)?;
70
                // We hold onto the data lock until after we read from the wal
71
                // to ensure a checkpoint doesn't happen before we start the
72
                // read operation.
73
16
                drop(data);
74
16

            
75
16
                // Skip over the WalChunk info.
76
16
                chunk_reader.read_exact(&mut [0; 9])?;
77

            
78
16
                Ok(Some(GrainReader::InWal(chunk_reader)))
79
            }
80
            None => {
81
31045
                if data.check_grain_validity(grain).is_err() {
82
28
                    return Ok(None);
83
31017
                }
84
31017

            
85
31017
                let file_path = data.basins[grain.basin_id()]
86
31017
                    .as_ref()
87
31017
                    .expect("grain validated")
88
31017
                    .strata
89
31017
                    .get(grain.stratum_id().as_usize())
90
31017
                    .expect("grain validated")
91
31017
                    .path
92
31017
                    .clone();
93
31017

            
94
31017
                // Remove the lock before we do any file operations.
95
31017
                let file_manager = data.file_manager.clone();
96
31017
                drop(data);
97

            
98
31017
                let mut file = file_manager.open(&file_path, OpenOptions::new().read(true))?;
99
                // The grain data starts with the transaction id, followed
100
                // by the byte length.
101
31017
                file.seek(std::io::SeekFrom::Start(grain.file_position() + 8))?;
102
31017
                let mut file = BufReader::new(file);
103
31017
                let mut length = [0; 4];
104
31017
                file.read_exact(&mut length)?;
105
31017
                let length = u32::from_be_bytes(length);
106
31017

            
107
31017
                return Ok(Some(GrainReader::InStratum(StratumGrainReader {
108
31017
                    file,
109
31017
                    length,
110
31017
                    bytes_remaining: length,
111
31017
                })));
112
            }
113
        }
114
31075
    }
115

            
116
30754
    pub fn reserve(&self, length: u32) -> Result<GrainId> {
117
        // First, determine what basins have been allocated, and within those,
118
        // which ones are the best fit (least amount of wasted space). For
119
        // example, storing a 80 byte value as 2 64 byte grains vs 3 32 byte
120
        // grains would waste 48 bytes in one case and waste 0 bytes in the
121
        // other.
122
30754
        let length_with_grain_info = length.checked_add(16).ok_or(Error::GrainTooLarge)?;
123
30754
        let mut data = self.data.lock()?;
124
        // Accessing fields through MutexGuard's DerefMut causes issues with the
125
        // borrow checker extending the lifetime of the borrow across both
126
        // basins and uncheckpointed_grains. So, we perform the DerefMut to get
127
        // the Data pointer first, allowing the borrow checker to see that the
128
        // mutable accesses are unique.
129
30754
        let data = &mut *data;
130
30754
        let mut eligible_basins = ArrayVec::<[(BasinId, u32, bool, u32); 8]>::new();
131
276786
        for basin in 0..=7 {
132
246032
            let basin_id = BasinId::new(basin).expect("valid basin id");
133
246032
            let grain_size = basin_id.grain_stripe_bytes();
134
246032
            let number_of_grains_needed =
135
246032
                if let Some(padded_length) = length_with_grain_info.checked_add(grain_size - 1) {
136
246032
                    padded_length / grain_size
137
                } else {
138
                    todo!("handle large grains")
139
                };
140
246032
            let extra_bytes = number_of_grains_needed * grain_size - length;
141
246032

            
142
246032
            if number_of_grains_needed <= 63 {
143
241624
                eligible_basins.push((
144
241624
                    basin_id,
145
241624
                    number_of_grains_needed,
146
241624
                    data.basins[basin_id].is_some(),
147
241624
                    extra_bytes,
148
241624
                ));
149
241624
            }
150
        }
151

            
152
210870
        eligible_basins.sort_by(|a, b| a.3.cmp(&b.3));
153

            
154
        // Now we have a list of basins to consider.
155
51674
        for (basin_id, number_of_grains_needed, _, _) in eligible_basins
156
30754
            .iter()
157
52009
            .filter(|(_, _, is_allocated, _)| *is_allocated)
158
        {
159
51642
            let basin = data.basins[*basin_id]
160
51642
                .as_mut()
161
51642
                .expect("filter should prevent none");
162
51642

            
163
51642
            let mut free_stata = basin.free_strata.iter_mut();
164
51904
            while let Some(stratum_id) = free_stata.next() {
165
30968
                let stratum = basin
166
30968
                    .strata
167
30968
                    .get_mut(stratum_id.as_usize())
168
30968
                    .expect("strata should be allocated");
169
30968
                if let Ok(grain_id) = allocate_grain_within_stratum(
170
30968
                    stratum,
171
30968
                    &mut data.uncheckpointed_grains,
172
30968
                    *basin_id,
173
30968
                    stratum_id,
174
30968
                    *number_of_grains_needed as u8,
175
30968
                ) {
176
30706
                    return Ok(grain_id);
177
262
                } else if stratum.allocations.is_full() {
178
10
                    free_stata.remove_current();
179
252
                }
180
            }
181
        }
182

            
183
        // We couldn't find an existing stratum that was able to fit the
184
        // allocation. Create a new one.
185
48
        let (basin_id, number_of_grains_needed, is_allocated, _) = eligible_basins
186
48
            .first()
187
48
            .expect("at least one basin should fit");
188
48
        if !*is_allocated {
189
41
            data.basins[*basin_id] = Some(Basin::default());
190
41
        }
191
48
        let basin = data.basins[*basin_id].as_mut().expect("just allocated");
192
48
        let new_id = StratumId::new(basin.strata.len() as u64).expect("valid stratum id");
193
48
        basin
194
48
            .strata
195
48
            .push(Stratum::default_for(PathId::from(data.directory.join(
196
48
                BasinAndStratum::from_parts(*basin_id, new_id).to_string(),
197
48
            ))));
198
48
        basin.free_strata.push(new_id);
199
48
        Ok(allocate_grain_within_stratum(
200
48
            basin.strata.last_mut().expect("just pushed"),
201
48
            &mut data.uncheckpointed_grains,
202
48
            *basin_id,
203
48
            new_id,
204
48
            *number_of_grains_needed as u8,
205
48
        )
206
48
        .expect("empty stratum should have room"))
207
30754
    }
208

            
209
3355
    pub fn note_transaction_committed(
210
3355
        &self,
211
3355
        new_metadata: IndexMetadata,
212
3355
        written_grains: impl IntoIterator<Item = (GrainId, LogPosition)>,
213
3355
        mut freed_grains: &[GrainId],
214
3355
        is_from_wal: bool,
215
3355
    ) -> Result<()> {
216
3355
        let mut data = self.data.lock()?;
217
3355
        let data = &mut *data; // This local deref helps avoid lifetime issues with borrows
218
3355
        data.index = new_metadata;
219
3355
        if is_from_wal {
220
6
            for (grain, log_position) in written_grains {
221
4
                data.uncheckpointed_grains
222
4
                    .insert(grain, UncheckpointedGrain::InWal(log_position));
223
4
                let basin = data.basins.get_or_default(grain.basin_id());
224

            
225
                // We may be committing a grain to a new stratum.
226
6
                while grain.stratum_id().as_usize() >= basin.strata.len() {
227
2
                    let new_id =
228
2
                        StratumId::new(basin.strata.len().try_into()?).expect("invalid statum id");
229
2
                    basin.strata.push(Stratum::default_for(PathId::from(
230
2
                        data.directory.join(
231
2
                            BasinAndStratum::from_parts(grain.basin_id(), new_id).to_string(),
232
2
                        ),
233
2
                    )));
234
                }
235

            
236
4
                let stratum = &mut basin.strata[grain.stratum_id().as_usize()];
237
4
                assert!(stratum.allocations.allocate_grain(grain.local_grain_id()));
238
4
                stratum.known_grains.insert(grain.local_grain_index());
239
            }
240
        } else {
241
34105
            for (grain, log_position) in written_grains {
242
30752
                if let Some(uncheckpointed) = data.uncheckpointed_grains.get_mut(&grain) {
243
30752
                    *uncheckpointed = UncheckpointedGrain::InWal(log_position);
244
30752
                }
245
            }
246
        }
247

            
248
        // We assume that freed_grains is sorted. To avoid continuing to re-look
249
        // up the basin and stratum for grains that are from the same stratum,
250
        // we use two loops -- one to get the stratum and one to do the actual
251
        // free operations. Only the inner loop advances the iterator.
252
3382
        while let Some(next_grain) = freed_grains.first().copied() {
253
27
            let basin = data.basins.get_or_default(next_grain.basin_id());
254
27
            let stratum = &mut basin.strata[next_grain.stratum_id().as_usize()];
255

            
256
1146
            while let Some(grain) = freed_grains
257
1146
                .first()
258
1146
                .filter(|g| g.basin_and_stratum() == next_grain.basin_and_stratum())
259
1146
                .copied()
260
1119
            {
261
1119
                freed_grains = &freed_grains[1..];
262
1119

            
263
1119
                stratum.allocations.free_grain(grain.local_grain_id());
264
1119
                stratum.known_grains.remove(&grain.local_grain_index());
265
1119
            }
266
        }
267

            
268
3355
        Ok(())
269
3355
    }
270

            
271
359
    pub fn note_grains_checkpointed<'a>(
272
359
        &self,
273
359
        checkpointed_grains: impl IntoIterator<Item = &'a (GrainId, GrainAllocationStatus)>,
274
359
    ) -> Result<()> {
275
359
        let mut data = self.data.lock()?;
276
30861
        for (grain, status) in checkpointed_grains {
277
30502
            match status {
278
27928
                GrainAllocationStatus::Allocated => {
279
27928
                    // The grain can now be found in the Stratum, so we can stop
280
27928
                    // returning readers to the WAL.
281
27928
                    data.uncheckpointed_grains.remove(grain);
282
27928
                }
283
1667
                GrainAllocationStatus::Archived => {
284
1667
                    // Archiving has no effect to the Atlas.
285
1667
                }
286
907
                GrainAllocationStatus::Free => {
287
907
                    // The grains area already removed during the WAL phase.
288
907
                }
289
            }
290
        }
291
359
        Ok(())
292
359
    }
293

            
294
3
    /// Undoes the reservations of `written_grains`.
    ///
    /// Each grain stops being tracked as uncheckpointed, its space is
    /// released in its stratum, and it is removed from the stratum's set of
    /// known grains.
    ///
    /// # Errors
    ///
    /// Returns an error if the internal lock cannot be acquired.
    ///
    /// # Panics
    ///
    /// Panics if a grain refers to a basin or stratum that does not exist.
    pub fn rollback_grains(&self, written_grains: impl IntoIterator<Item = GrainId>) -> Result<()> {
        let mut guard = self.data.lock()?;
        let data = &mut *guard;
        for grain in written_grains {
            data.uncheckpointed_grains.remove(&grain);

            let stratum = data.basins[grain.basin_id()]
                .as_mut()
                .expect("basin missing")
                .strata
                .get_mut(grain.stratum_id().as_usize())
                .expect("stratum missing");

            stratum.allocations.free_grain(grain.local_grain_id());
            stratum.known_grains.remove(&grain.local_grain_index());
        }
        Ok(())
    }
311

            
312
2005
    /// Checks that `grain` is known to the atlas.
    ///
    /// # Errors
    ///
    /// Returns an error if the internal lock cannot be acquired, or
    /// [`Error::GrainNotAllocated`] if the grain is not known.
    pub fn check_grain_validity(&self, grain: GrainId) -> Result<()> {
        self.data.lock()?.check_grain_validity(grain)
    }
316
}
317

            
318
#[derive(Debug)]
319
struct Data<FileManager> {
320
    directory: Arc<PathBuf>,
321
    index: IndexMetadata,
322
    basins: BasinMap<Basin>,
323
    uncheckpointed_grains: HashMap<GrainId, UncheckpointedGrain>,
324
    file_manager: FileManager,
325
}
326

            
327
impl<FileManager> Data<FileManager> {
328
33050
    pub fn check_grain_validity(&self, grain: GrainId) -> Result<()> {
329
33050
        let basin = self.basins[grain.basin_id()]
330
33050
            .as_ref()
331
33050
            .ok_or(Error::GrainNotAllocated)?;
332

            
333
33046
        let stratum = basin
334
33046
            .strata
335
33046
            .get(grain.stratum_id().as_usize())
336
33046
            .ok_or(Error::GrainNotAllocated)?;
337
33046
        if stratum.known_grains.contains(&grain.local_grain_index()) {
338
33022
            Ok(())
339
        } else {
340
24
            Err(Error::GrainNotAllocated)
341
        }
342
33050
    }
343
}
344

            
345
fn allocate_grain_within_stratum(
346
    stratum: &mut Stratum,
347
    uncheckpointed_grains: &mut HashMap<GrainId, UncheckpointedGrain>,
348

            
349
    basin_id: BasinId,
350
    stratum_id: StratumId,
351
    number_of_grains_needed: u8,
352
) -> Result<GrainId, ()> {
353
31016
    if let Some(index) = stratum.allocations.allocate(number_of_grains_needed) {
354
30754
        let id = GrainId::new(basin_id, stratum_id, index);
355
30754
        uncheckpointed_grains.insert(id, UncheckpointedGrain::PendingCommit);
356
30754
        stratum.known_grains.insert(id.local_grain_index());
357
30754
        Ok(id)
358
    } else {
359
262
        Err(())
360
    }
361
31016
}
362

            
363
43
#[derive(Debug, Default)]
364
struct Basin {
365
    strata: Vec<Stratum>,
366
    free_strata: StratumIdRing,
367
}
368

            
369
impl<'a, File> From<&'a BasinState<File>> for Basin
370
where
371
    File: file_manager::File,
372
{
373
291
    fn from(state: &'a BasinState<File>) -> Self {
374
291
        let mut strata = Vec::new();
375
291
        let mut free_strata = StratumIdRing::default();
376
582
        for stratum in &state.stratum {
377
291
            let stratum = Stratum::from_stratum(stratum.path.clone(), &stratum.header.active);
378
291

            
379
291
            if !stratum.allocations.is_full() {
380
291
                free_strata.push(StratumId::new(strata.len() as u64).expect("valid stratum id"));
381
291
            }
382

            
383
291
            strata.push(stratum);
384
        }
385

            
386
291
        Self {
387
291
            strata,
388
291
            free_strata,
389
291
        }
390
291
    }
391
}
392

            
393
#[derive(Debug)]
394
struct Stratum {
395
    path: PathId,
396
    allocations: FreeLocations,
397
    known_grains: HashSet<GrainIndex>,
398
}
399

            
400
impl Stratum {
401
291
    fn from_stratum(path: PathId, stratum: &StratumHeader) -> Self {
402
291
        let allocations = FreeLocations::from_stratum(stratum);
403
291

            
404
291
        let mut known_grains = HashSet::new();
405
291
        let mut index = 0;
406
2773483
        while index < 16_372 {
407
2773192
            let index_status = stratum.grain_info(index);
408
2773192
            let count = index_status.count();
409
2773192
            let allocated = !matches!(
410
2773192
                index_status.status().expect("invalid header"),
411
                GrainAllocationStatus::Free
412
            );
413

            
414
2773192
            if allocated {
415
62031
                known_grains.insert(
416
62031
                    GrainIndex::new(index.try_into().expect("only valid indexes are used"))
417
62031
                        .expect("only valid grains are used"),
418
62031
                );
419
62031
                index += usize::from(count);
420
2711161
            } else {
421
2711161
                index += 1;
422
2711161
            }
423
        }
424

            
425
291
        Self {
426
291
            path,
427
291
            allocations,
428
291
            known_grains,
429
291
        }
430
291
    }
431

            
432
50
    fn default_for(path: PathId) -> Self {
433
50
        Self {
434
50
            path,
435
50
            allocations: FreeLocations::default(),
436
50
            known_grains: HashSet::default(),
437
50
        }
438
50
    }
439
}
440

            
441
#[derive(Debug)]
442
pub enum GrainReader<'a, FileManager>
443
where
444
    FileManager: file_manager::FileManager,
445
{
446
    InWal(ChunkReader<'a, FileManager>),
447
    InStratum(StratumGrainReader<FileManager::File>),
448
}
449

            
450
impl<'a, FileManager> GrainReader<'a, FileManager>
451
where
452
    FileManager: file_manager::FileManager,
453
{
454
12
    pub const fn bytes_remaining(&self) -> u32 {
455
12
        match self {
456
12
            GrainReader::InWal(reader) => reader.bytes_remaining(),
457
            GrainReader::InStratum(reader) => reader.bytes_remaining,
458
        }
459
12
    }
460

            
461
8
    pub const fn length(&self) -> u32 {
462
8
        match self {
463
8
            GrainReader::InWal(reader) => reader.chunk_length() - 9,
464
            GrainReader::InStratum(reader) => reader.length,
465
        }
466
8
    }
467

            
468
31029
    pub fn read_all_data(mut self) -> Result<Vec<u8>> {
469
31029
        let mut data = Vec::new();
470
31029
        self.read_to_end(&mut data)?;
471

            
472
        // TODO offer a way to do a crc check?
473

            
474
31029
        Ok(data)
475
31029
    }
476
}
477

            
478
impl<'a, FileManager> Read for GrainReader<'a, FileManager>
479
where
480
    FileManager: file_manager::FileManager,
481
{
482
247553
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
483
247553
        match self {
484
38
            GrainReader::InWal(reader) => reader.read(buf),
485
247515
            GrainReader::InStratum(reader) => {
486
247515
                let bytes_remaining = u32_to_usize(reader.bytes_remaining)?;
487
247515
                let bytes_to_read = buf.len().min(bytes_remaining);
488
247515
                let bytes_read = reader.file.read(&mut buf[..bytes_to_read])?;
489
247515
                reader.bytes_remaining -= usize_to_u32(bytes_read)?;
490
247515
                Ok(bytes_read)
491
            }
492
        }
493
247553
    }
494
}
495

            
496
#[derive(Debug)]
497
pub struct StratumGrainReader<File>
498
where
499
    File: file_manager::File,
500
{
501
    file: BufReader<File>,
502
    length: u32,
503
    bytes_remaining: u32,
504
}
505

            
506
#[derive(Debug)]
507
enum UncheckpointedGrain {
508
    PendingCommit,
509
    InWal(LogPosition),
510
}
511

            
512
334
#[derive(Debug, Default)]
513
struct StratumIdRing(VecDeque<StratumId>);
514

            
515
impl StratumIdRing {
516
339
    pub fn push(&mut self, id: StratumId) {
517
339
        self.0.push_back(id);
518
339
    }
519

            
520
51642
    pub fn iter_mut(&mut self) -> StratumIdIter<'_> {
521
51642
        StratumIdIter {
522
51642
            ring: self,
523
51642
            iterated: 0,
524
51642
        }
525
51642
    }
526
}
527

            
528
struct StratumIdIter<'a> {
529
    ring: &'a mut StratumIdRing,
530
    iterated: usize,
531
}
532

            
533
impl<'a> Iterator for StratumIdIter<'a> {
534
    type Item = StratumId;
535

            
536
51904
    fn next(&mut self) -> Option<Self::Item> {
537
51904
        if self.iterated == self.ring.0.len() {
538
20936
            None
539
        } else {
540
            // Cycle the ring, moving the front to the end. We keep track of how
541
            // many times we've iterated to ensure we don't return the same id
542
            // twice.
543
30968
            self.iterated += 1;
544
30968
            self.ring.0.rotate_left(1);
545
30968
            self.ring.0.front().copied()
546
        }
547
51904
    }
548
}
549

            
550
impl<'a> StratumIdIter<'a> {
551
    /// Removes the current id from the ring.
552
    ///
553
    /// # Panics
554
    ///
555
    /// Panics if `Iterator::next()` wasn't called at least once before calling
556
    /// this function.
557
10
    pub fn remove_current(&mut self) {
558
10
        assert!(self.iterated > 0);
559
10
        self.ring.0.pop_front();
560
10
        self.iterated -= 1;
561
10
    }
562
}
563

            
564
#[derive(Debug, Clone, Copy, Default)]
565
pub struct IndexMetadata {
566
    pub embedded_header_data: Option<GrainId>,
567
    pub commit_log_head: Option<GrainId>,
568
    pub checkpoint_target: TransactionId,
569
    pub checkpointed_to: TransactionId,
570
}