1
use std::fs::{self, OpenOptions};
2
use std::io::{Read, Seek, Write};
3
use std::path::Path;
4

            
5
use okaywal::file_manager::{self};
6

            
7
use crate::config::Config;
8
use crate::format::{
9
    BasinAndStratum, Duplicable, IndexHeader, StratumHeader, StratumId, TransactionId,
10
};
11
use crate::{Database, Error};
12

            
13
2
fn basic<FileManager>(config: Config<FileManager>)
14
2
where
15
2
    FileManager: file_manager::FileManager,
16
2
{
17
2
    if config.wal.file_manager.exists(&config.wal.directory) {
18
1
        config
19
1
            .wal
20
1
            .file_manager
21
1
            .remove_dir_all(&config.wal.directory)
22
1
            .unwrap();
23
1
    }
24

            
25
2
    let db = config.clone().recover().unwrap();
26
2
    assert!(db.embedded_header().unwrap().is_none());
27
2
    let mut tx = db.begin_transaction().unwrap();
28
2
    let grain = tx.write(b"hello, world").unwrap();
29
2
    println!("Wrote {grain:?}");
30
2
    tx.set_embedded_header(Some(grain)).unwrap();
31
2
    assert!(db.read(grain).unwrap().is_none());
32
2
    let tx_id = tx.commit().unwrap();
33
2
    assert_eq!(db.embedded_header().unwrap(), Some(grain));
34

            
35
4
    let verify = |db: &Database<FileManager>| {
36
4
        let mut reader = db.read(grain).unwrap().expect("grain not found");
37
4
        assert_eq!(reader.length(), 12);
38
4
        assert_eq!(reader.bytes_remaining(), reader.length());
39
4
        let mut read_contents = [0; 12];
40
4
        reader.read_exact(&mut read_contents[..6]).unwrap();
41
4
        assert_eq!(reader.bytes_remaining(), 6);
42
4
        reader.read_exact(&mut read_contents[6..]).unwrap();
43
4
        assert_eq!(reader.bytes_remaining(), 0);
44
4
        assert_eq!(&read_contents[..], b"hello, world");
45

            
46
4
        assert_eq!(reader.read(&mut read_contents).unwrap(), 0);
47

            
48
4
        let commit = db.commit_log_head().unwrap().expect("commit log missing");
49
4
        assert_eq!(commit.transaction_id, tx_id);
50
4
        assert_eq!(commit.new_grains.len(), 1);
51
4
        assert_eq!(commit.new_grains[0].id, grain);
52
4
        assert_eq!(commit.embedded_header_data, Some(grain));
53
4
        assert!(commit.freed_grains.is_empty());
54
4
        assert!(commit.archived_grains.is_empty());
55
4
        assert!(commit.next_entry(db).unwrap().is_none());
56
4
    };
57

            
58
2
    verify(&db);
59
2

            
60
2
    // Close the database and reopen it. Since this has a default WAL
61
2
    // configuration, this transaction will be recovered from the WAL, unlike a
62
2
    // lot of the other unit tests.
63
2
    db.shutdown().unwrap();
64
2
    let db = config.clone().recover().unwrap();
65
2

            
66
2
    verify(&db);
67
2
    db.shutdown().unwrap();
68
2

            
69
2
    config
70
2
        .wal
71
2
        .file_manager
72
2
        .remove_dir_all(&config.wal.directory)
73
2
        .unwrap();
74
2
}
75

            
76
1
/// Runs the `basic` scenario against a configuration created with
/// `Config::for_directory("test")`.
#[test]
fn basic_std() {
    let config = Config::for_directory("test");
    basic(config);
}
80

            
81
1
/// Runs the `basic` scenario against a configuration created with
/// `Config::in_memory()`.
#[test]
fn basic_memory() {
    let config = Config::in_memory();
    basic(config);
}
85

            
86
1
/// Verifies that a grain committed with a WAL configured to checkpoint after
/// 10 bytes can still be read after the database is shut down and recovered.
#[test]
fn wal_checkpoint() {
    let path = Path::new(".test-checkpoint");
    if path.exists() {
        fs::remove_dir_all(path).unwrap();
    }

    // Configure the WAL to checkpoint after 10 bytes -- "hello, world" is 12.
    // The same configuration is used for both the initial open and the reopen.
    let open = || {
        Config::for_directory(path)
            .configure_wal(|wal| wal.checkpoint_after_bytes(10))
            .recover()
            .unwrap()
    };

    let db = open();
    let mut tx = db.begin_transaction().unwrap();
    let grain = tx.write(b"hello, world").unwrap();
    // The grain must not be readable before the transaction is committed.
    assert!(db.read(grain).unwrap().is_none());
    tx.commit().unwrap();
    db.shutdown().unwrap();

    let reopened = open();
    let reader = reopened.read(grain).unwrap().expect("grain not found");
    let contents = reader.read_all_data().unwrap();
    assert_eq!(contents, b"hello, world");

    reopened.shutdown().unwrap();

    fs::remove_dir_all(path).unwrap();
}
120

            
121
1
/// Writes ten grains, one per database session, with a WAL configured to
/// checkpoint after 10 bytes.
///
/// After every commit, and again after a final reopen, every grain written so
/// far must be readable with its original contents. The commit log is then
/// walked from its head and must list the grains in reverse write order, one
/// per entry, with no entries missing.
#[test]
fn wal_checkpoint_loop() {
    let path = Path::new(".test-checkpoint-loop");
    if path.exists() {
        fs::remove_dir_all(path).unwrap();
    }

    let mut grains_written = Vec::new();
    for i in 0_usize..10 {
        println!("{i}");
        // Reopen the database on every iteration with the WAL configured to
        // checkpoint after 10 bytes. Each payload is `i` as big-endian bytes.
        let db = Config::for_directory(path)
            .configure_wal(|wal| wal.checkpoint_after_bytes(10))
            .recover()
            .unwrap();
        let mut tx = db.begin_transaction().unwrap();
        let grain = tx.write(&i.to_be_bytes()).unwrap();
        // The grain must not be readable before the transaction is committed.
        assert!(db.read(grain).unwrap().is_none());
        grains_written.push(grain);
        tx.commit().unwrap();

        // Every grain written so far, including the newest, must read back
        // the index it was written with.
        for (index, grain) in grains_written.iter().enumerate() {
            let contents = db
                .read(*grain)
                .unwrap()
                .expect("grain not found")
                .read_all_data()
                .unwrap();
            assert_eq!(contents, &index.to_be_bytes());
        }

        db.shutdown().unwrap();
    }

    let db = Config::for_directory(path)
        .configure_wal(|wal| wal.checkpoint_after_bytes(10))
        .recover()
        .unwrap();
    for (index, grain) in grains_written.iter().enumerate() {
        let contents = db
            .read(*grain)
            .unwrap()
            .expect("grain not found")
            .read_all_data()
            .unwrap();
        assert_eq!(contents, &index.to_be_bytes());
    }

    // Verify the commit log is correct. The commit log head will contain the
    // addition of the most recent grain, and we should be able to iterate
    // backwards and find each grain in each entry.
    let mut grains_to_read = grains_written.iter().rev();
    let mut current_commit_log_entry = db.commit_log_head().unwrap();
    while let Some(commit_log_entry) = current_commit_log_entry {
        let expected_grain = grains_to_read.next().expect("too many commit log entries");
        assert_eq!(&commit_log_entry.new_grains[0].id, expected_grain);
        current_commit_log_entry = commit_log_entry.next_entry(&db).unwrap();
    }
    // The walk must also have accounted for every grain; otherwise a commit
    // log that ends early would go unnoticed.
    assert!(
        grains_to_read.next().is_none(),
        "commit log ended before every written grain was found"
    );

    db.shutdown().unwrap();

    fs::remove_dir_all(path).unwrap();
}
185

            
186
1
/// Writes a data grain and an embedded-header grain in each of ten database
/// sessions, archiving the previous session's data grain and checkpointing to
/// the previous session's transaction id each time.
///
/// After a final reopen, the grains from the last two sessions must still be
/// readable. Ids from earlier sessions must either be unreadable or no longer
/// hold the data originally written to them.
#[test]
fn sediment_checkpoint_loop() {
    let path = Path::new(".test-sediment-checkpoint-loop");
    if path.exists() {
        fs::remove_dir_all(path).unwrap();
    }

    // Every session uses a WAL configured to checkpoint after 10 bytes.
    let open = || {
        Config::for_directory(path)
            .configure_wal(|wal| wal.checkpoint_after_bytes(10))
            .recover()
            .unwrap()
    };

    let mut grains_written = Vec::new();
    let mut headers_written = Vec::new();
    let mut tx_id = TransactionId::default();
    for i in 0_usize..10 {
        let db = open();
        let mut tx = db.begin_transaction().unwrap();
        let payload = i.to_be_bytes();

        let new_grain = tx.write(&payload).unwrap();
        // Archive the data grain written by the previous session, if any.
        if let Some(&previous_grain) = grains_written.last() {
            tx.archive(previous_grain).unwrap();
        }
        grains_written.push(new_grain);

        // The old headers are automatically archived.
        let new_header = tx.write(&payload).unwrap();
        tx.set_embedded_header(Some(new_header)).unwrap();
        headers_written.push(new_header);

        // Checkpoint to the id of the previously committed transaction (the
        // default id on the first pass), then remember this commit's id for
        // the next session.
        tx.checkpoint_to(tx_id).unwrap();
        tx_id = tx.commit().unwrap();

        db.shutdown().unwrap();
    }

    let db = open();

    // Because we close and reopen the database so often, we may not actually
    // have finished the sediment checkpoint yet. This thread sleep gives it
    // time to complete if it was run upon recovery.
    std::thread::sleep(std::time::Duration::from_millis(100));

    // The final session only checkpointed to the transaction before it, so the
    // grains from the last two sessions are required to be readable. Earlier
    // ids are expected to be gone or reused.
    let first_readable = grains_written.len() - 2;
    for (index, (grain, header)) in grains_written.iter().zip(&headers_written).enumerate() {
        let expected = index.to_be_bytes();
        let grain_reader = db.read(*grain).unwrap();
        let header_reader = db.read(*header).unwrap();
        if index >= first_readable {
            let contents = grain_reader
                .expect("grain not found")
                .read_all_data()
                .unwrap();
            assert_eq!(contents, &expected);
            let contents = header_reader
                .expect("grain not found")
                .read_all_data()
                .unwrap();
            assert_eq!(contents, &expected);
        } else if let Some(reader) = grain_reader.or(header_reader) {
            // Because grain IDs can be reused, we may have "lucked" out and
            // stumbled upon another written grain. If we get an error reading
            // the data or the contents aren't what we expect, this is a passed
            // check.
            if let Ok(contents) = reader.read_all_data() {
                assert_ne!(contents, &expected);
            }
        }
        // When neither id can be read there is nothing further to check.
    }

    db.shutdown().unwrap();

    fs::remove_dir_all(path).unwrap();
}
258

            
259
1
/// Verifies that a grain written in a transaction that is dropped without
/// committing is never published, and that the next transaction writing the
/// same data receives the same grain id.
#[test]
fn rollback() {
    let path = Path::new("rollback");
    if path.exists() {
        fs::remove_dir_all(path).unwrap();
    }

    let db = Database::recover(path).unwrap();

    // First attempt: write, then abandon the transaction by dropping it.
    let mut abandoned = db.begin_transaction().unwrap();
    let grain = abandoned.write(b"hello, world").unwrap();
    println!("Wrote {grain:?}");
    abandoned.set_embedded_header(Some(grain)).unwrap();
    assert!(db.read(grain).unwrap().is_none());
    drop(abandoned);

    // Ensure we still didn't get it published.
    assert!(db.read(grain).unwrap().is_none());

    // Trying again, we should get the same grain id back.
    let mut retry = db.begin_transaction().unwrap();
    let rewritten = retry.write(b"hello, world").unwrap();
    assert_eq!(rewritten, grain);
    retry.rollback().unwrap();

    db.shutdown().unwrap();

    fs::remove_dir_all(path).unwrap();
}
286

            
287
248
#[derive(Clone)]
288
enum WriteCommand<'a> {
289
    Write {
290
        target: Target,
291
        offset: u64,
292
        bytes: &'a [u8],
293
    },
294
    RemoveStratum(Option<StratumId>),
295
    RemoveIndex,
296
    DoNothing,
297
}
298

            
299
/// Selects the file a [`WriteCommand::Write`] modifies and what its offset is
/// measured from.
#[derive(Clone)]
enum Target {
    /// The stratum file holding the most recently written grain; the offset is
    /// added to that grain's file position.
    Grain,
    /// The stratum file holding the most recently written grain; the offset is
    /// measured from the start of the file.
    Stratum,
    /// The `index` file; the offset is measured from the start of the file.
    Index,
}
305

            
306
1
/// Simulates damage to the files behind the most recent commit (overwritten
/// headers or grain data, deleted stratum or index files) and checks how
/// recovery responds: either the database opens and earlier, undamaged grains
/// stay readable, or recovery fails when the scenario says it must.
#[test]
fn last_write_rollback() {
    /// Runs one scenario in the `last-write` directory.
    ///
    /// Each loop iteration recovers the database, checks the grains from
    /// earlier iterations, commits one new 2,000-byte grain, shuts down, and
    /// then applies the next entry of `commands` to the files on disk.
    ///
    /// With `expect_error` set, recovery must fail once every command has been
    /// applied; running out of commands without a failure is a test failure.
    /// Without it, every recovery must succeed and the scenario ends when the
    /// commands run out. A recovery error at any other point fails the test.
    #[track_caller]
    fn test_write_after(commands: &[WriteCommand], expect_error: bool) {
        let path = Path::new("last-write");
        if path.exists() {
            fs::remove_dir_all(path).unwrap();
        }

        // Grains followed by `DoNothing`: these must stay readable.
        let mut written_grains = Vec::new();
        // Grains whose files were overwritten right after being committed.
        let mut rolled_back_grains = Vec::new();
        let mut commands = commands.iter();
        let mut index = 0_usize;
        loop {
            let db = Config::for_directory(path)
                .configure_wal(|wal| wal.checkpoint_after_bytes(10))
                .recover();
            let db = match db {
                Ok(db) => db,
                // The expected failure only counts once every command has
                // been applied.
                Err(_) if commands.len() == 0 && expect_error => break,
                Err(err) => panic!("error when not expected: {err}"),
            };

            for (grain_id, expected_data) in &written_grains {
                assert_eq!(
                    &db.read(*grain_id)
                        .unwrap()
                        .expect("grain missing")
                        .read_all_data()
                        .unwrap(),
                    expected_data
                )
            }

            for (grain_id, expected_data) in &rolled_back_grains {
                if let Some(reader) = db.read(*grain_id).unwrap() {
                    // The grain id can be reused, but the contents shouldn't
                    // match. Note that this rollback required forcibly changing
                    // bits after the transaction was written. In a normal crash
                    // or power outage scenario, the grain id wouldn't have been
                    // returned until the data is fully synced to disk.
                    assert_ne!(&reader.read_all_data().unwrap(), expected_data);
                }
            }

            let mut tx = db.begin_transaction().unwrap();
            // 2,000 bytes built by repeating the big-endian bytes of `index`,
            // so every iteration writes distinct contents.
            let data = index
                .to_be_bytes()
                .into_iter()
                .cycle()
                .take(2000)
                .collect::<Vec<_>>();
            index += 1;
            let grain_id = tx.write(&data).unwrap();
            // The stratum-filling scenario at the end of the test relies on
            // each write occupying 63 grains.
            assert_eq!(grain_id.grain_count(), 63);
            tx.commit().unwrap();

            db.shutdown().unwrap();

            match commands.next() {
                Some(WriteCommand::Write {
                    target,
                    offset,
                    bytes,
                }) => {
                    let mut file = match target {
                        Target::Grain | Target::Stratum => OpenOptions::new()
                            .read(true)
                            .write(true)
                            .open(path.join(grain_id.basin_and_stratum().to_string()))
                            .unwrap(),
                        Target::Index => OpenOptions::new()
                            .read(true)
                            .write(true)
                            .open(path.join("index"))
                            .unwrap(),
                    };
                    // Grain offsets are relative to the grain's position in
                    // the stratum file; the others are absolute.
                    let position = match target {
                        Target::Grain => grain_id.file_position() + *offset,
                        Target::Stratum | Target::Index => *offset,
                    };
                    file.seek(std::io::SeekFrom::Start(position)).unwrap();
                    file.write_all(bytes).unwrap();
                    rolled_back_grains.push((grain_id, data));
                }
                Some(WriteCommand::RemoveStratum(Some(stratum))) => {
                    let id = BasinAndStratum::from_parts(grain_id.basin_id(), *stratum);
                    fs::remove_file(path.join(id.to_string())).unwrap();
                }
                Some(WriteCommand::RemoveStratum(None)) => {
                    fs::remove_file(path.join(grain_id.basin_and_stratum().to_string())).unwrap();
                }
                Some(WriteCommand::RemoveIndex) => {
                    fs::remove_file(path.join("index")).unwrap();
                }
                Some(WriteCommand::DoNothing) => written_grains.push((grain_id, data)),
                None if expect_error => panic!("expected error but no error was encountered"),
                None => break,
            }
        }

        fs::remove_dir_all(path).unwrap();
    }

    // Test removing the stratum after it's been created. This simulates a file
    // being written but the directory metadata not being synchronized, causing
    // the file's record to be entirely lost.
    test_write_after(&[WriteCommand::RemoveStratum(None)], false);
    // Test overwriting the headers with garbage (every byte set to 1) -- an
    // edge case where the file record was synced but the headers weren't.
    test_write_after(
        &[WriteCommand::Write {
            target: Target::Stratum,
            offset: 0,
            bytes: &[1; 16_384],
        }],
        false,
    );
    test_write_after(
        &[
            WriteCommand::DoNothing,
            WriteCommand::Write {
                target: Target::Stratum,
                offset: StratumHeader::BYTES,
                bytes: &[1; 16_384],
            },
        ],
        false,
    );
    // After two untouched commits, overwriting twice as many bytes from the
    // start of the stratum is expected to make recovery fail.
    test_write_after(
        &[
            WriteCommand::DoNothing,
            WriteCommand::DoNothing,
            WriteCommand::Write {
                target: Target::Stratum,
                offset: 0,
                bytes: &[1; 16_384 * 2],
            },
        ],
        true,
    );
    // Test overwriting the header with a valid but incorrect header. This
    // shouldn't ever happen in practice, because recovery is supposed to
    // overwrite the bad headers.
    let mut valid_header = StratumHeader::default();
    let mut valid_header_bytes = Vec::new();
    valid_header.transaction_id = TransactionId::from(1);
    valid_header.write_to(&mut valid_header_bytes).unwrap();
    test_write_after(
        &[WriteCommand::Write {
            target: Target::Stratum,
            offset: 0,
            bytes: &valid_header_bytes,
        }],
        false,
    );
    valid_header.transaction_id = TransactionId::from(2);
    valid_header_bytes.clear();
    valid_header.write_to(&mut valid_header_bytes).unwrap();
    test_write_after(
        &[
            WriteCommand::DoNothing,
            WriteCommand::Write {
                target: Target::Stratum,
                offset: StratumHeader::BYTES,
                bytes: &valid_header_bytes,
            },
        ],
        false,
    );
    // Test overwriting both headers with crc-valid, but not actually valid,
    // headers. The buffer is deliberately not cleared: a second copy of the
    // header is appended so the write starting at offset 0 covers both.
    valid_header.write_to(&mut valid_header_bytes).unwrap();
    test_write_after(
        &[
            WriteCommand::DoNothing,
            WriteCommand::Write {
                target: Target::Stratum,
                offset: 0,
                bytes: &valid_header_bytes,
            },
        ],
        true,
    );

    // Test overwriting a grain's transaction ID in both the first and second
    // headers.
    test_write_after(
        &[WriteCommand::Write {
            target: Target::Grain,
            offset: 0,
            bytes: &[0xFF],
        }],
        false,
    );

    test_write_after(
        &[
            WriteCommand::DoNothing,
            WriteCommand::Write {
                target: Target::Grain,
                offset: 0,
                bytes: &[0xFF],
            },
        ],
        false,
    );

    // Test mutating the grain data, causing its CRC to fail to validate.
    test_write_after(
        &[WriteCommand::Write {
            target: Target::Grain,
            offset: 13,
            bytes: &[0xFF],
        }],
        false,
    );

    test_write_after(
        &[
            WriteCommand::DoNothing,
            WriteCommand::Write {
                target: Target::Grain,
                offset: 13,
                bytes: &[0xFF],
            },
        ],
        false,
    );

    // Test overwriting the stratum header.
    test_write_after(
        &[WriteCommand::Write {
            target: Target::Stratum,
            offset: 0,
            bytes: &[0xFF],
        }],
        false,
    );

    test_write_after(
        &[
            WriteCommand::DoNothing,
            WriteCommand::Write {
                target: Target::Stratum,
                offset: StratumHeader::BYTES,
                bytes: &[0xFF],
            },
        ],
        false,
    );

    // Test mucking with the index file
    test_write_after(
        &[WriteCommand::Write {
            target: Target::Index,
            offset: 0,
            bytes: &[0xFF],
        }],
        false,
    );

    test_write_after(
        &[
            WriteCommand::DoNothing,
            WriteCommand::Write {
                target: Target::Index,
                offset: IndexHeader::BYTES,
                bytes: &[0xFF],
            },
        ],
        false,
    );

    // Test writing a "valid" index header, but with broken data.
    let mut index_header = IndexHeader {
        // Point the commit log to an invalid id.
        transaction_id: TransactionId::from(1),
        commit_log_head: "71fffffffffe-fffff".parse().ok(),
        ..IndexHeader::default()
    };
    let mut index_header_bytes = Vec::new();
    index_header.write_to(&mut index_header_bytes).unwrap();
    test_write_after(
        &[WriteCommand::Write {
            target: Target::Index,
            offset: 0,
            bytes: &index_header_bytes,
        }],
        false,
    );
    index_header.transaction_id = TransactionId::from(2);
    index_header_bytes.clear();
    index_header.write_to(&mut index_header_bytes).unwrap();
    test_write_after(
        &[
            WriteCommand::DoNothing,
            WriteCommand::Write {
                target: Target::Index,
                offset: IndexHeader::BYTES,
                bytes: &index_header_bytes,
            },
        ],
        false,
    );

    // Test removing the index file. This should generate an error because the
    // existing strata can be found.
    test_write_after(&[WriteCommand::RemoveIndex], true);

    // Test writing a valid index header, then overwriting part of the second header,
    // causing one header to fail to validate while the other can't parse due to
    // a crc error. This should never happen in real life.
    index_header.transaction_id = TransactionId::from(1);
    index_header_bytes.clear();
    index_header.write_to(&mut index_header_bytes).unwrap();
    // Overwrite part of the transaction id of the second header, causing its crc to fail.
    index_header_bytes.push(1);
    test_write_after(
        &[WriteCommand::Write {
            target: Target::Index,
            offset: 0,
            bytes: &index_header_bytes,
        }],
        true,
    );

    // Write enough data to need two strata, then remove the first to receive
    // an error. There are 3 grains required at the current allocation strategy
    // for a commit log entry that describes 1 new grain. The test function
    // writes each grain such that it takes up 63 consecutive grains.
    let mut commands = vec![WriteCommand::DoNothing; 16_372 / (3 + 63) + 1];
    *commands.last_mut().unwrap() = WriteCommand::RemoveStratum(Some(StratumId::new(0).unwrap()));
    test_write_after(&commands, true);
}
654

            
655
1
/// Verifies that, on a freshly created database, both `checkpoint_to` and
/// `checkpointed_to` reject `TransactionId::from(1)` with
/// `Error::InvalidTransactionId`.
#[test]
fn invalid_checkpointing() {
    let path = Path::new("invalid-checkpointing");
    if path.exists() {
        fs::remove_dir_all(path).unwrap();
    }

    let db = Database::recover(path).unwrap();
    let mut tx = db.begin_transaction().unwrap();

    let checkpoint_to_error = tx.checkpoint_to(TransactionId::from(1)).unwrap_err();
    assert!(matches!(checkpoint_to_error, Error::InvalidTransactionId));

    let checkpointed_to_error = tx.checkpointed_to(TransactionId::from(1)).unwrap_err();
    assert!(matches!(
        checkpointed_to_error,
        Error::InvalidTransactionId
    ));

    // Abandon the transaction without committing anything.
    drop(tx);

    db.shutdown().unwrap();
    fs::remove_dir_all(path).unwrap();
}