1
use std::collections::BTreeMap;
2
use std::ffi::OsStr;
3
use std::io::{self, BufReader, Read, Seek};
4
use std::path::{Path, PathBuf};
5
use std::str::FromStr;
6
use std::sync::{Arc, Mutex, MutexGuard};
7

            
8
use crc32c::crc32c;
9
use okaywal::file_manager::{self, FSyncBatch, FileManager, OpenOptions, PathId};
10

            
11
use crate::basinmap::BasinMap;
12
use crate::commit_log::CommitLogEntry;
13
use crate::format::{
14
    BasinAndStratum, BasinId, Duplicable, FileHeader, GrainId, IndexHeader, StratumHeader,
15
    StratumId, TransactionId,
16
};
17
use crate::util::u32_to_usize;
18
use crate::{Error, Result};
19

            
20
#[derive(Debug)]
21
pub struct Store<FileManager>
22
where
23
    FileManager: file_manager::FileManager,
24
{
25
    pub directory: Arc<PathBuf>,
26
    disk_state: Mutex<DiskState<FileManager::File>>,
27
    pub file_manager: FileManager,
28
}
29

            
30
impl<FileManager> Store<FileManager>
31
where
32
    FileManager: file_manager::FileManager,
33
{
34
336
    pub fn recover(path: &Path, file_manager: FileManager) -> Result<Self> {
35
336
        let disk_state = DiskState::recover(path, &file_manager)?;
36
331
        Ok(Self {
37
331
            directory: Arc::new(path.to_path_buf()),
38
331
            disk_state: Mutex::new(disk_state),
39
331
            file_manager,
40
331
        })
41
336
    }
42

            
43
690
    pub fn lock(&self) -> Result<MutexGuard<'_, DiskState<FileManager::File>>> {
44
690
        Ok(self.disk_state.lock()?)
45
690
    }
46
}
47

            
48
#[derive(Debug)]
49
pub struct DiskState<File>
50
where
51
    File: file_manager::File,
52
{
53
    pub needs_directory_sync: bool,
54
    pub directory: File,
55
    pub index: Duplicated<IndexHeader>,
56
    pub index_writer: File,
57
    pub basins: BasinMap<BasinState<File>>,
58
}
59

            
60
impl<File> DiskState<File>
61
where
62
    File: file_manager::File,
63
{
64
336
    pub fn recover(path: &Path, manager: &File::Manager) -> Result<Self> {
65
336
        let path = PathId::from(path);
66
336
        if !manager.exists(&path) {
67
32
            manager.create_dir_all(&path)?;
68
304
        }
69

            
70
336
        let directory = manager.open(&path, OpenOptions::new().read(true))?;
71

            
72
336
        let index_path = PathId::from(path.join("index"));
73
336

            
74
336
        let mut scratch = Vec::new();
75
336
        let mut discovered_strata = discover_strata(&path, manager, &mut scratch)?;
76

            
77
335
        if index_path.exists() {
78
300
            let mut index_writer =
79
300
                manager.open(&index_path, OpenOptions::new().read(true).write(true))?;
80

            
81
300
            let file_header =
82
300
                FileHeader::<IndexHeader>::read_from(&mut index_writer, &mut scratch)?;
83

            
84
300
            let (mut first_is_active, mut active, older) = match file_header {
85
296
                FileHeader::Both(first, second) => {
86
296
                    if first.transaction_id > second.transaction_id {
87
149
                        (true, first, Some(second))
88
                    } else {
89
147
                        (false, second, Some(first))
90
                    }
91
                }
92
3
                FileHeader::First(first) => (true, first, None),
93
1
                FileHeader::Second(second) => (false, second, None),
94
            };
95

            
96
300
            let mut strata_to_clean = None;
97
300
            let commit_log = match (BasinMap::verify(&active, &mut discovered_strata), older) {
98
291
                (Ok(commit_log), _) => commit_log,
99
8
                (Err(_), Some(older)) => {
100
8
                    let commit_log = BasinMap::verify(&older, &mut discovered_strata)?;
101

            
102
7
                    active = older;
103
7
                    first_is_active = !first_is_active;
104
7

            
105
7
                    let mut invalid_strata = Vec::new();
106
13
                    for (id, stratum) in &discovered_strata {
107
6
                        if stratum.should_exist(&active)
108
3
                            && stratum.needs_cleanup(commit_log.as_ref())
109
3
                        {
110
3
                            invalid_strata.push(*id);
111
3
                        }
112
                    }
113
7
                    strata_to_clean = Some(invalid_strata);
114
7
                    commit_log
115
                }
116
1
                (Err(err), None) => return Err(err),
117
            };
118

            
119
297
            let mut basins =
120
298
                BasinMap::load_from(&active, commit_log.as_ref(), discovered_strata, &path)?;
121

            
122
297
            let mut index = Duplicated {
123
297
                active,
124
297
                first_is_active,
125
297
            };
126

            
127
297
            if let Some(strata_to_clean) = strata_to_clean {
128
10
                for id in strata_to_clean {
129
3
                    let basin = basins[id.basin()].as_mut().expect("just loaded");
130
3
                    let stratum = &mut basin.stratum[id.stratum().as_usize()];
131
3
                    stratum
132
3
                        .header
133
3
                        .write_to(stratum.file.as_mut().expect("just loaded"))?;
134
                }
135

            
136
7
                index.write_to(&mut index_writer)?;
137
7
                index_writer.sync_data()?;
138
290
            }
139

            
140
297
            Ok(Self {
141
297
                needs_directory_sync: false,
142
297
                directory,
143
297
                index,
144
297
                index_writer,
145
297
                basins,
146
297
            })
147
        } else {
148
35
            let mut index_writer = manager.open(
149
35
                &index_path,
150
35
                OpenOptions::new().read(true).write(true).create(true),
151
35
            )?;
152

            
153
35
            let mut empty_header = IndexHeader::default();
154
35
            empty_header.write_to(&mut index_writer)?;
155
35
            empty_header.write_to(&mut index_writer)?;
156

            
157
            // Ensure the file is fully persisted to disk.
158
35
            index_writer.sync_all()?;
159
35
            directory.sync_all()?;
160

            
161
35
            if discovered_strata.is_empty() {
162
34
                Ok(Self {
163
34
                    needs_directory_sync: false,
164
34
                    directory,
165
34
                    index: Duplicated {
166
34
                        active: empty_header,
167
34
                        first_is_active: false,
168
34
                    },
169
34
                    index_writer,
170
34
                    basins: BasinMap::new(),
171
34
                })
172
            } else {
173
1
                Err(Error::verification_failed("existing strata found without an index file. If this is intentional, clean the directory being used for the database."))
174
            }
175
        }
176
336
    }
177

            
178
359
    pub fn write_header(
179
359
        &mut self,
180
359
        transaction_id: TransactionId,
181
359
        sync: &FSyncBatch<File::Manager>,
182
359
    ) -> Result<()> {
183
359
        self.index.active.transaction_id = transaction_id;
184
392
        for (basin, count) in (&self.basins)
185
359
            .into_iter()
186
359
            .zip(&mut self.index.active.basin_strata_count)
187
392
        {
188
392
            *count = basin.1.stratum.len() as u64;
189
392
        }
190
359
        self.index.write_to(&mut self.index_writer)?;
191

            
192
359
        sync.queue_fsync_data(self.index_writer.try_clone()?)?;
193

            
194
359
        Ok(())
195
359
    }
196
}
197

            
198
44
/// A value that is stored in one of two slots of a file.
///
/// Only the active copy is kept in memory, together with a flag recording
/// which slot it occupies.
#[derive(Debug, Default)]
pub struct Duplicated<T> {
    /// The copy currently considered active.
    pub active: T,
    /// `true` when `active` occupies the first slot, `false` for the second.
    pub first_is_active: bool,
}
203

            
204
impl<T> Duplicated<T>
205
where
206
    T: Duplicable,
207
{
208
771
    pub fn write_to<File>(&mut self, file: &mut File) -> Result<()>
209
771
    where
210
771
        File: file_manager::File,
211
771
    {
212
771
        let offset = if self.first_is_active { T::BYTES } else { 0 };
213

            
214
771
        file.seek(io::SeekFrom::Start(offset))?;
215
771
        self.active.write_to(file)?;
216
771
        self.first_is_active = !self.first_is_active;
217
771

            
218
771
        Ok(())
219
771
    }
220
}
221

            
222
#[derive(Debug)]
223
pub struct BasinState<File>
224
where
225
    File: file_manager::File,
226
{
227
    pub id: BasinId,
228
    pub stratum: Vec<StratumState<File>>,
229
}
230

            
231
impl<File> BasinState<File>
232
where
233
    File: file_manager::File,
234
{
235
329
    pub fn default_for(id: BasinId) -> Self {
236
329
        Self {
237
329
            id,
238
329
            stratum: Vec::new(),
239
329
        }
240
329
    }
241

            
242
28330
    pub fn get_or_allocate_stratum(
243
28330
        &mut self,
244
28330
        id: StratumId,
245
28330
        directory: &Path,
246
28330
    ) -> &mut StratumState<File> {
247
28374
        while id.as_usize() >= self.stratum.len() {
248
44
            let new_id =
249
44
                StratumId::new(u64::try_from(self.stratum.len()).expect("too large of a database"))
250
44
                    .expect("invalid id");
251
44
            self.stratum.push(StratumState::default_for(PathId::from(
252
44
                directory.join(format!("{}{}", self.id, new_id)),
253
44
            )))
254
        }
255

            
256
28330
        &mut self.stratum[id.as_usize()]
257
28330
    }
258
}
259

            
260
336
fn discover_strata<File>(
261
336
    path: &PathId,
262
336
    manager: &File::Manager,
263
336
    scratch: &mut Vec<u8>,
264
336
) -> Result<BTreeMap<BasinAndStratum, UnverifiedStratum<File>>>
265
336
where
266
336
    File: file_manager::File,
267
336
{
268
336
    let mut discovered = BTreeMap::new();
269

            
270
1217
    for entry in manager.list(path)? {
271
1212
        if let Some(name) = entry.file_name().and_then(OsStr::to_str) {
272
1212
            if let Ok(basin_and_stratum) = BasinAndStratum::from_str(name) {
273
                discovered.insert(
274
300
                    basin_and_stratum,
275
300
                    UnverifiedStratum::read_from(entry, manager, basin_and_stratum, scratch)?,
276
                );
277
912
            }
278
        }
279
    }
280

            
281
335
    Ok(discovered)
282
336
}
283

            
284
#[derive(Debug)]
285
pub struct StratumState<File>
286
where
287
    File: file_manager::File,
288
{
289
    pub path: PathId,
290
    pub header: Duplicated<StratumHeader>,
291
    pub file: Option<File>,
292
}
293

            
294
impl<File> StratumState<File>
295
where
296
    File: file_manager::File,
297
{
298
44
    fn default_for(path: PathId) -> Self {
299
44
        Self {
300
44
            path,
301
44
            header: Duplicated::default(),
302
44
            file: None,
303
44
        }
304
44
    }
305

            
306
27928
    pub fn get_or_open_file(
307
27928
        &mut self,
308
27928
        manager: &File::Manager,
309
27928
        needs_directory_sync: &mut bool,
310
27928
    ) -> Result<&mut File> {
311
27928
        if self.file.is_none() {
312
            // If this file doesn't exist, we need to do a directory sync to
313
            // ensure the file is persisted.
314
44
            *needs_directory_sync |= !self.path.exists();
315

            
316
44
            let file = manager.open(
317
44
                &self.path,
318
44
                OpenOptions::new().read(true).write(true).create(true),
319
44
            )?;
320

            
321
44
            self.file = Some(file);
322
27884
        }
323

            
324
27928
        Ok(self.file.as_mut().expect("file always allocated above"))
325
27928
    }
326

            
327
402
    pub fn write_header(
328
402
        &mut self,
329
402
        new_transaction_id: TransactionId,
330
402
        sync_batch: &FSyncBatch<File::Manager>,
331
402
    ) -> io::Result<()> {
332
402
        let file = self
333
402
            .file
334
402
            .as_mut()
335
402
            .expect("shouldn't ever write a file header if no data was written");
336
402

            
337
402
        self.header.active.transaction_id = new_transaction_id;
338
402
        self.header.write_to(file)?;
339

            
340
402
        let file_to_sync = file.try_clone()?;
341
402
        sync_batch.queue_fsync_data(file_to_sync)?;
342

            
343
402
        Ok(())
344
402
    }
345
}
346

            
347
pub struct UnverifiedStratum<File> {
348
    pub path: PathId,
349
    pub id: BasinAndStratum,
350
    pub header: FileHeader<StratumHeader>,
351
    pub file: File,
352
}
353

            
354
impl<File> UnverifiedStratum<File>
355
where
356
    File: file_manager::File,
357
{
358
300
    pub fn read_from(
359
300
        path: PathId,
360
300
        manager: &File::Manager,
361
300
        id: BasinAndStratum,
362
300
        scratch: &mut Vec<u8>,
363
300
    ) -> Result<Self> {
364
300
        let mut file = manager.open(&path, OpenOptions::new().read(true).write(true))?;
365
300
        let header = FileHeader::read_from(&mut file, scratch)?;
366
299
        Ok(Self {
367
299
            path,
368
299
            id,
369
299
            header,
370
299
            file,
371
299
        })
372
300
    }
373

            
374
294
    pub fn validate(&self, commit_log: &CommitLogEntry) -> Result<()> {
375
294
        let (first, second) = self.validate_headers(Some(commit_log));
376
294

            
377
294
        if first.is_some() || second.is_some() {
378
292
            Ok(())
379
        } else {
380
2
            Err(Error::verification_failed("neither header is valid"))
381
        }
382
294
    }
383

            
384
596
    pub fn should_exist(&self, index: &IndexHeader) -> bool {
385
596
        self.id.stratum().as_u64() < index.basin_strata_count[usize::from(self.id.basin().index())]
386
596
    }
387

            
388
3
    pub fn needs_cleanup(&self, commit_log: Option<&CommitLogEntry>) -> bool {
389
3
        !matches!(self.validate_headers(commit_log), (Some(_), Some(_)))
390
3
    }
391

            
392
589
    fn validate_headers(
393
589
        &self,
394
589
        commit_log: Option<&CommitLogEntry>,
395
589
    ) -> (Option<&StratumHeader>, Option<&StratumHeader>) {
396
1170
        fn is_valid(
397
1170
            header: &StratumHeader,
398
1170
            commit_transaction: TransactionId,
399
1170
            commit_log: Option<&CommitLogEntry>,
400
1170
        ) -> bool {
401
1170
            header.transaction_id < commit_transaction
402
596
                || (header.transaction_id == commit_transaction
403
589
                    && commit_log
404
581
                        .map_or(true, |commit_log| header.reflects_changes_from(commit_log)))
405
1170
        }
406
589
        let commit_transaction = commit_log.map_or_else(TransactionId::default, |commit_log| {
407
589
            commit_log.transaction_id
408
589
        });
409
589
        let (first, second) = self.header.as_options();
410
589
        let first = first
411
589
            .and_then(|first| is_valid(first, commit_transaction, commit_log).then_some(first));
412
589
        let second = second
413
589
            .and_then(|second| is_valid(second, commit_transaction, commit_log).then_some(second));
414
589

            
415
589
        (first, second)
416
589
    }
417
}
418

            
419
impl<File> BasinMap<BasinState<File>>
420
where
421
    File: file_manager::File,
422
{
423
308
    pub fn verify(
424
308
        index: &IndexHeader,
425
308
        discovered_strata: &mut BTreeMap<BasinAndStratum, UnverifiedStratum<File>>,
426
308
    ) -> Result<Option<CommitLogEntry>> {
427
308
        let mut scratch = Vec::new();
428
308
        if let Some(commit_log_head) = index.commit_log_head {
429
302
            if let Some(stratum) = discovered_strata.get_mut(&commit_log_head.basin_and_stratum()) {
430
298
                let mut reader = BufReader::new(&mut stratum.file);
431
298
                verify_read_grain(
432
298
                    commit_log_head,
433
298
                    &mut reader,
434
298
                    index.transaction_id,
435
298
                    None,
436
298
                    &mut scratch,
437
298
                )?;
438
298
                let commit_log_entry = CommitLogEntry::read_from(&scratch[..])?;
439
586
                for new_grain in &commit_log_entry.new_grains {
440
292
                    verify_read_grain(
441
292
                        new_grain.id,
442
292
                        &mut reader,
443
292
                        index.transaction_id,
444
292
                        Some(new_grain.crc32),
445
292
                        &mut scratch,
446
292
                    )?;
447
                }
448

            
449
294
                for stratum in discovered_strata.values() {
450
294
                    if stratum.should_exist(index) {
451
294
                        stratum.validate(&commit_log_entry)?;
452
                    }
453
                }
454
292
                return Ok(Some(commit_log_entry));
455
            } else {
456
4
                return Err(Error::verification_failed("commit log stratum not found"));
457
            }
458
6
        }
459
6

            
460
6
        Ok(None)
461
308
    }
462

            
463
298
    pub fn load_from(
464
298
        index: &IndexHeader,
465
298
        commit_log: Option<&CommitLogEntry>,
466
298
        discovered_strata: BTreeMap<BasinAndStratum, UnverifiedStratum<File>>,
467
298
        directory: &PathId,
468
298
    ) -> Result<Self> {
469
298
        let mut basins = Self::new();
470
298
        for stratum in discovered_strata.into_values() {
471
296
            if !stratum.should_exist(index) {
472
4
                std::fs::remove_file(directory.join(stratum.id.to_string()))?;
473
4
                continue;
474
292
            }
475

            
476
292
            let header = match stratum.validate_headers(commit_log) {
477
281
                (Some(first), Some(second)) => {
478
281
                    if first.transaction_id >= second.transaction_id {
479
142
                        Duplicated {
480
142
                            active: stratum.header.into_first(),
481
142
                            first_is_active: true,
482
142
                        }
483
                    } else {
484
139
                        Duplicated {
485
139
                            active: stratum.header.into_second(),
486
139
                            first_is_active: false,
487
139
                        }
488
                    }
489
                }
490
8
                (Some(_), _) => Duplicated {
491
8
                    active: stratum.header.into_first(),
492
8
                    first_is_active: true,
493
8
                },
494
3
                (_, Some(_)) => Duplicated {
495
3
                    active: stratum.header.into_second(),
496
3
                    first_is_active: false,
497
3
                },
498
                (None, None) => {
499
                    unreachable!("error is handled in validation phase")
500
                }
501
            };
502

            
503
292
            let basin = basins.get_or_insert_with(stratum.id.basin(), || {
504
292
                BasinState::default_for(stratum.id.basin())
505
292
            });
506
292
            if stratum.id.stratum().as_usize() != basin.stratum.len() {
507
1
                return Err(Error::verification_failed("strata are non-sequential"));
508
291
            }
509
291

            
510
291
            basin.stratum.push(StratumState {
511
291
                path: stratum.path,
512
291
                header,
513
291
                file: Some(stratum.file),
514
291
            });
515
        }
516
297
        Ok(basins)
517
298
    }
518
}
519

            
520
fn verify_read_grain<File>(
521
    grain: GrainId,
522
    file: &mut BufReader<&mut File>,
523
    transaction_id: TransactionId,
524
    expected_crc: Option<u32>,
525
    buffer: &mut Vec<u8>,
526
) -> Result<()>
527
where
528
    File: file_manager::File,
529
{
530
590
    file.seek(io::SeekFrom::Start(grain.file_position()))?;
531
590
    let mut eight_bytes = [0; 8];
532
590
    file.read_exact(&mut eight_bytes)?;
533
590
    let grain_transaction_id = TransactionId::from_be_bytes(eight_bytes);
534
590
    if grain_transaction_id != transaction_id {
535
2
        return Err(Error::verification_failed(
536
2
            "new grain was written in a different transaction",
537
2
        ));
538
588
    }
539
588

            
540
588
    let mut four_bytes = [0; 4];
541
588
    file.read_exact(&mut four_bytes)?;
542
588
    let length = u32::from_be_bytes(four_bytes);
543

            
544
588
    let length = u32_to_usize(length)?;
545
588
    buffer.resize(length, 0);
546
588
    file.read_exact(buffer)?;
547

            
548
588
    let computed_crc = crc32c(buffer);
549
588

            
550
588
    file.read_exact(&mut four_bytes)?;
551
588
    let stored_crc = u32::from_be_bytes(four_bytes);
552
588

            
553
588
    if computed_crc == stored_crc && expected_crc.map_or(true, |expected| expected == stored_crc) {
554
586
        Ok(())
555
    } else {
556
2
        Err(Error::ChecksumFailed)
557
    }
558
590
}