#![doc = include_str!(".crate-docs.md")]
#![forbid(unsafe_code)]
#![warn(
    clippy::cargo,
    missing_docs,
    clippy::pedantic,
    future_incompatible,
    rust_2018_idioms
)]
#![allow(
    clippy::option_if_let_else,
    clippy::module_name_repetitions,
    clippy::missing_errors_doc
)]

use std::{
    collections::{HashMap, VecDeque},
    ffi::OsStr,
    io::{self, BufReader, ErrorKind, Read, Seek, SeekFrom},
    path::Path,
    sync::{Arc, Weak},
    thread::JoinHandle,
    time::Instant,
};

use file_manager::{fs::StdFileManager, FileManager, OpenOptions, PathId};
use parking_lot::{Condvar, Mutex, MutexGuard};

pub use crate::{
    config::Configuration,
    entry::{ChunkRecord, EntryId, EntryWriter, LogPosition},
    log_file::{Entry, EntryChunk, ReadChunkResult, RecoveredSegment, SegmentReader},
    manager::{LogManager, LogVoid, Recovery},
};
use crate::{log_file::LogFile, to_io_result::ToIoResult};
pub use file_manager;

mod buffered;
mod config;
mod entry;
mod log_file;
mod manager;
mod to_io_result;

/// A [Write-Ahead Log][wal] that provides atomic and durable writes.
///
/// This type implements [`Clone`] and can be passed between multiple threads
/// safely.
///
/// When [`WriteAheadLog::begin_entry()`] is called, an exclusive lock to the
/// active log segment is acquired. The lock is released when the entry is
/// [rolled back](entry::EntryWriter::rollback) or once the entry is in queue
/// for synchronization during
/// [`EntryWriter::commit()`](entry::EntryWriter::commit).
///
/// Multiple threads may be queued for synchronization at any given time. This
/// allows good multi-threaded performance in spite of only using a single
/// active file.
///
/// [wal]: https://en.wikipedia.org/wiki/Write-ahead_logging
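///
/// # Example
///
/// A minimal usage sketch using [`LogVoid`], a [`LogManager`] that ignores
/// checkpointed data (`write_chunk` is assumed from the `entry` module; error
/// handling elided):
///
/// ```no_run
/// use okaywal::{LogVoid, WriteAheadLog};
///
/// # fn main() -> std::io::Result<()> {
/// // Open or recover the log stored in the "my-wal" directory.
/// let log = WriteAheadLog::recover("my-wal", LogVoid)?;
///
/// // Write an entry containing a single chunk, then commit it.
/// let mut entry = log.begin_entry()?;
/// entry.write_chunk(b"hello, world")?;
/// entry.commit()?;
/// # Ok(())
/// # }
/// ```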
#[derive(Debug, Clone)]
pub struct WriteAheadLog<M = StdFileManager>
where
    M: FileManager,
{
    data: Arc<Data<M>>,
}

#[derive(Debug)]
struct Data<M>
where
    M: FileManager,
{
    /// The currently tracked log files, including the active file.
    files: Mutex<Files<M::File>>,
    /// The user-provided manager that drives recovery and checkpointing.
    manager: Mutex<Box<dyn LogManager<M>>>,
    /// Notified when the active file becomes available again.
    active_sync: Condvar,
    /// Notified when a directory synchronization completes.
    dirfsync_sync: Condvar,
    config: Configuration<M>,
    /// Sends files to the background checkpointing thread.
    checkpoint_sender: flume::Sender<CheckpointCommand<M::File>>,
    checkpoint_thread: Mutex<Option<JoinHandle<io::Result<()>>>>,
    /// The number of open readers for each log file id.
    readers: Mutex<HashMap<u64, usize>>,
    /// Notified when a [`ChunkReader`] is dropped.
    readers_sync: Condvar,
}

impl WriteAheadLog<StdFileManager> {
    /// Creates or opens a log in `directory` using the provided log manager to
    /// drive recovery and checkpointing.
    pub fn recover<AsRefPath: AsRef<Path>, Manager: LogManager>(
        directory: AsRefPath,
        manager: Manager,
    ) -> io::Result<Self> {
        Configuration::default_for(directory).open(manager)
    }
}
impl<M> WriteAheadLog<M>
where
    M: FileManager,
{
    fn open<Manager: LogManager<M>>(
        config: Configuration<M>,
        mut manager: Manager,
    ) -> io::Result<Self> {
        if !config.file_manager.exists(&config.directory) {
            config.file_manager.create_dir_all(&config.directory)?;
        }

        // Find all files that match this pattern: "wal-[0-9]+(-cp)?"
        let mut discovered_files = Vec::new();
        for file in config.file_manager.list(&config.directory)? {
            if let Some(file_name) = file.file_name().and_then(OsStr::to_str) {
                let mut parts = file_name.split('-');
                let prefix = parts.next();
                let entry_id = parts.next().and_then(|id| id.parse::<u64>().ok());
                let suffix = parts.next();
                match (prefix, entry_id, suffix) {
                    (Some(prefix), Some(entry_id), suffix) if prefix == "wal" => {
                        let has_checkpointed = suffix == Some("cp");
                        discovered_files.push((entry_id, file, has_checkpointed));
                    }
                    _ => {}
                }
            }
        }

        // Sort by the entry id encoded in the file name.
        discovered_files.sort_by(|a, b| a.0.cmp(&b.0));

        let mut files = Files::default();
        let mut files_to_checkpoint = Vec::new();
        for (entry_id, path, has_checkpointed) in discovered_files {
            // We can safely assume that all entry ids prior to the one that
            // labels the file have been used.
            files.last_entry_id = EntryId(entry_id - 1);
            if has_checkpointed {
                let file = LogFile::write(entry_id, path, 0, None, &config)?;
                files.all.insert(entry_id, file.clone());
                files.inactive.push_back(file);
            } else {
                let mut reader = SegmentReader::new(&path, entry_id, &config.file_manager)?;
                match manager.should_recover_segment(&reader.header)? {
                    Recovery::Recover => {
                        while let Some(mut entry) = reader.read_entry()? {
                            manager.recover(&mut entry)?;
                            while let Some(chunk) = match entry.read_chunk()? {
                                ReadChunkResult::Chunk(chunk) => Some(chunk),
                                ReadChunkResult::EndOfEntry | ReadChunkResult::AbortedEntry => None,
                            } {
                                chunk.skip_remaining_bytes()?;
                            }
                            files.last_entry_id = entry.id();
                        }

                        let file = LogFile::write(
                            entry_id,
                            path,
                            reader.valid_until,
                            reader.last_entry_id,
                            &config,
                        )?;
                        files.all.insert(entry_id, file.clone());
                        files_to_checkpoint.push(file);
                    }
                    Recovery::Abandon => {
                        let file = LogFile::write(entry_id, path, 0, None, &config)?;
                        files.all.insert(entry_id, file.clone());
                        files.inactive.push_back(file);
                    }
                }
            }
        }

        // If we recovered a file that wasn't checkpointed, activate it.
        if let Some(latest_file) = files_to_checkpoint.pop() {
            files.active = Some(latest_file);
        } else {
            files.activate_new_file(&config)?;
        }

        let (checkpoint_sender, checkpoint_receiver) = flume::unbounded();
        let wal = Self {
            data: Arc::new(Data {
                files: Mutex::new(files),
                manager: Mutex::new(Box::new(manager)),
                active_sync: Condvar::new(),
                dirfsync_sync: Condvar::new(),
                config,
                checkpoint_sender,
                checkpoint_thread: Mutex::new(None),
                readers: Mutex::default(),
                readers_sync: Condvar::new(),
            }),
        };

        // Queue any recovered segments that still need to be checkpointed.
        for file_to_checkpoint in files_to_checkpoint {
            wal.data
                .checkpoint_sender
                .send(CheckpointCommand::Checkpoint(file_to_checkpoint))
                .to_io()?;
        }

        // The checkpoint thread holds a weak reference so that it doesn't
        // keep the log alive after all other instances are dropped.
        let weak_wal = Arc::downgrade(&wal.data);
        let mut checkpoint_thread = wal.data.checkpoint_thread.lock();
        *checkpoint_thread = Some(
            std::thread::Builder::new()
                .name(String::from("okaywal-cp"))
                .spawn(move || Self::checkpoint_thread(&weak_wal, &checkpoint_receiver))
                .expect("failed to spawn checkpointer thread"),
        );
        drop(checkpoint_thread);

        Ok(wal)
    }

    /// Begins writing an entry to this log.
    ///
    /// A new unique entry id will be allocated for the entry being written. If
    /// the entry is rolled back, the entry id will not be reused. The entry id
    /// can be retrieved through [`EntryWriter::id()`](entry::EntryWriter::id).
    ///
    /// This call will acquire an exclusive lock to the active file or block
    /// until it can be acquired.
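    ///
    /// # Example
    ///
    /// A sketch of the begin/commit flow (`write_chunk` is assumed from the
    /// `entry` module; error handling elided):
    ///
    /// ```no_run
    /// # fn example(log: &okaywal::WriteAheadLog) -> std::io::Result<()> {
    /// let mut entry = log.begin_entry()?;
    /// // The id is allocated up front and will not be reused, even if this
    /// // entry is rolled back.
    /// let entry_id = entry.id();
    /// entry.write_chunk(b"payload")?;
    /// entry.commit()?;
    /// # Ok(())
    /// # }
    /// ```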
    pub fn begin_entry(&self) -> io::Result<EntryWriter<'_, M>> {
        let mut files = self.data.files.lock();
        let file = loop {
            if let Some(file) = files.active.take() {
                break file;
            }

            // Wait for a free file
            self.data.active_sync.wait(&mut files);
        };
        files.last_entry_id.0 += 1;
        let entry_id = files.last_entry_id;
        drop(files);

        EntryWriter::new(self, entry_id, file)
    }

    /// Checkpoints the currently active file immediately, regardless of its
    /// size.
    ///
    /// A new log file is immediately activated for subsequent writes, and the
    /// previously active file is sent to the checkpoint thread for background
    /// checkpointing.
    ///
    /// This call will acquire an exclusive lock to the active file or block
    /// until it can be acquired.
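    ///
    /// # Example
    ///
    /// A sketch of forcing a checkpoint after a batch of writes:
    ///
    /// ```no_run
    /// # fn example(log: &okaywal::WriteAheadLog) -> std::io::Result<()> {
    /// // Rotate the active file and checkpoint it in the background.
    /// log.checkpoint_active()?;
    /// # Ok(())
    /// # }
    /// ```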
    pub fn checkpoint_active(&self) -> io::Result<()> {
        let mut files = self.data.files.lock();
        let file = loop {
            if let Some(file) = files.active.take() {
                break file;
            }

            // Wait for a free file
            self.data.active_sync.wait(&mut files);
        };
        drop(files);
        self.checkpoint_file_and_activate_new(file)?;

        Ok(())
    }

    fn reclaim(&self, file: LogFile<M::File>, result: WriteResult) -> io::Result<()> {
        if let WriteResult::Entry { new_length } = result {
            let last_directory_sync = if self.data.config.checkpoint_after_bytes <= new_length {
                self.checkpoint_file_and_activate_new(file.clone())?
            } else {
                // We wrote an entry, but the file isn't ready to be
                // checkpointed. Return the file to allow more writes to happen
                // in the meantime.
                let mut files = self.data.files.lock();
                files.active = Some(file.clone());
                let last_directory_sync = files.directory_synced_at;
                drop(files);
                self.data.active_sync.notify_one();
                last_directory_sync
            };

            // Before reporting success we need to synchronize the data to
            // disk. To enable as much throughput as possible, we only want
            // one thread at a time to synchronize this file.
            file.synchronize(new_length)?;

            // If this file was activated during this process, we need to
            // sync the directory to ensure the file's metadata is
            // up-to-date.
            if let Some(created_at) = file.created_at() {
                // We want to avoid acquiring the lock again if we don't
                // need to. Verify that the file was created after the last
                // directory sync.
                if last_directory_sync.is_none() || last_directory_sync.unwrap() < created_at {
                    let files = self.data.files.lock();
                    drop(self.sync_directory(files, created_at)?);
                }
            }
        } else {
            // Nothing happened, return the file back to be written.
            let mut files = self.data.files.lock();
            files.active = Some(file);
            drop(files);
            self.data.active_sync.notify_one();
        }

        Ok(())
    }

    fn checkpoint_file_and_activate_new(
        &self,
        file: LogFile<M::File>,
    ) -> io::Result<Option<Instant>> {
        // Checkpointing this file first requires activating a new file to
        // receive writes.
        let mut files = self.data.files.lock();
        files.activate_new_file(&self.data.config)?;
        let last_directory_sync = files.directory_synced_at;
        drop(files);
        self.data.active_sync.notify_one();

        // Now, send the file to the checkpointer.
        self.data
            .checkpoint_sender
            .send(CheckpointCommand::Checkpoint(file))
            .to_io()?;

        Ok(last_directory_sync)
    }

    fn checkpoint_thread(
        data: &Weak<Data<M>>,
        checkpoint_receiver: &flume::Receiver<CheckpointCommand<M::File>>,
    ) -> io::Result<()> {
        while let Ok(CheckpointCommand::Checkpoint(file_to_checkpoint)) = checkpoint_receiver.recv()
        {
            let wal = if let Some(data) = data.upgrade() {
                WriteAheadLog { data }
            } else {
                break;
            };

            // Ensure all of the file's data is fully synchronized before
            // checkpointing it.
            let mut writer = file_to_checkpoint.lock();
            let file_id = writer.id();
            while !writer.is_synchronized() {
                let synchronize_target = writer.position();
                writer = file_to_checkpoint.synchronize_locked(writer, synchronize_target)?;
            }
            if let Some(entry_id) = writer.last_entry_id() {
                let mut reader =
                    SegmentReader::new(writer.path(), file_id, &wal.data.config.file_manager)?;
                drop(writer);
                let mut manager = wal.data.manager.lock();
                manager.checkpoint_to(entry_id, &mut reader, &wal)?;
                writer = file_to_checkpoint.lock();
            }

            // Rename the file to denote that it's been checkpointed.
            let new_name = format!(
                "{}-cp",
                writer
                    .path()
                    .file_name()
                    .expect("missing name")
                    .to_str()
                    .expect("should be valid unicode")
            );
            writer.rename(&new_name)?;
            drop(writer);

            // Now, read attempts will fail, but any current readers are still
            // able to read the data. Before we truncate the file, we need to
            // wait for all existing readers to close.
            let mut readers = wal.data.readers.lock();
            while readers.get(&file_id).copied().unwrap_or(0) > 0 {
                wal.data.readers_sync.wait(&mut readers);
            }
            readers.remove(&file_id);
            drop(readers);

            // Now that there are no readers, we can safely prepare the file
            // for reuse.
            let mut writer = file_to_checkpoint.lock();
            writer.revert_to(0)?;
            drop(writer);

            let sync_target = Instant::now();

            let files = wal.data.files.lock();
            let mut files = wal.sync_directory(files, sync_target)?;
            files.inactive.push_back(file_to_checkpoint);
        }

        Ok(())
    }

    fn sync_directory<'a>(
        &'a self,
        mut files: MutexGuard<'a, Files<M::File>>,
        sync_on_or_after: Instant,
    ) -> io::Result<MutexGuard<'a, Files<M::File>>> {
        loop {
            if files.directory_synced_at.is_some()
                && files.directory_synced_at.unwrap() >= sync_on_or_after
            {
                // Another thread synchronized the directory recently enough.
                break;
            } else if files.directory_is_syncing {
                // Wait for the in-progress synchronization to complete, then
                // re-check whether it covered `sync_on_or_after`.
                self.data.dirfsync_sync.wait(&mut files);
            } else {
                // Synchronize the directory without holding the lock, then
                // notify any other waiting threads.
                files.directory_is_syncing = true;
                drop(files);
                let synced_at = Instant::now();
                self.data
                    .config
                    .file_manager
                    .sync_all(&self.data.config.directory)?;

                files = self.data.files.lock();
                files.directory_is_syncing = false;
                files.directory_synced_at = Some(synced_at);
                self.data.dirfsync_sync.notify_all();
                break;
            }
        }

        Ok(files)
    }

    /// Opens the log to read previously written data.
    ///
    /// # Errors
    ///
    /// May error if:
    ///
    /// - The file cannot be read.
    /// - The position refers to data that has been checkpointed.
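    ///
    /// # Example
    ///
    /// A sketch of reading back a chunk from a stored [`LogPosition`] (the
    /// position would typically come from the [`ChunkRecord`] returned when
    /// the chunk was written):
    ///
    /// ```no_run
    /// use std::io::Read;
    ///
    /// use okaywal::{LogPosition, WriteAheadLog};
    ///
    /// # fn example(log: &WriteAheadLog, position: LogPosition) -> std::io::Result<()> {
    /// let mut chunk = log.read_at(position)?;
    /// let mut data = Vec::new();
    /// chunk.read_to_end(&mut data)?;
    /// assert!(chunk.crc_is_valid()?);
    /// # Ok(())
    /// # }
    /// ```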
    pub fn read_at(&self, position: LogPosition) -> io::Result<ChunkReader<'_, M>> {
        // Before opening the file to read, we need to check that this position
        // has been written to disk fully.
        let files = self.data.files.lock();
        let log_file = files
            .all
            .get(&position.file_id)
            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "invalid log position"))?
            .clone();
        drop(files);

        log_file.synchronize(position.offset)?;

        let mut file = self.data.config.file_manager.open(
            &PathId::from(
                self.data
                    .config
                    .directory
                    .join(format!("wal-{}", position.file_id)),
            ),
            OpenOptions::new().read(true),
        )?;
        file.seek(SeekFrom::Start(position.offset))?;
        let mut reader = BufReader::new(file);
        // The chunk header is five bytes: a marker byte followed by the
        // chunk's length as a little-endian u32.
        let mut header_bytes = [0; 5];
        reader.read_exact(&mut header_bytes)?;
        let length = u32::from_le_bytes(header_bytes[1..5].try_into().expect("u32 is 4 bytes"));

        // Track this reader so the checkpointer waits for it to be dropped
        // before recycling the file.
        let mut readers = self.data.readers.lock();
        let file_readers = readers.entry(position.file_id).or_default();
        *file_readers += 1;
        drop(readers);

        Ok(ChunkReader {
            wal: self,
            file_id: position.file_id,
            reader,
            stored_crc32: None,
            length,
            bytes_remaining: length,
            read_crc32: 0,
        })
    }

    /// Waits for all other instances of [`WriteAheadLog`] to be dropped and
    /// for the checkpointing thread to complete.
    ///
    /// This call will not interrupt any writers, and will block indefinitely
    /// if another instance of this [`WriteAheadLog`] exists and is not
    /// eventually dropped. This was the safest behavior to implement, and
    /// because a WAL is primarily used in front of another storage layer, it
    /// follows that the shutdown logic of both layers should be synchronized.
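    ///
    /// # Example
    ///
    /// A sketch of a clean shutdown; all other clones of `log` must already
    /// be dropped, or this call will block:
    ///
    /// ```no_run
    /// # fn example(log: okaywal::WriteAheadLog) -> std::io::Result<()> {
    /// log.shutdown()?;
    /// # Ok(())
    /// # }
    /// ```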
    pub fn shutdown(self) -> io::Result<()> {
        let mut checkpoint_thread = self.data.checkpoint_thread.lock();
        let join_handle = checkpoint_thread.take().expect("shutdown already invoked");
        drop(checkpoint_thread);

        self.data
            .checkpoint_sender
            .send(CheckpointCommand::Shutdown)
            .map_err(|_| io::Error::from(ErrorKind::BrokenPipe))?;

        // Wait for the checkpoint thread to terminate.
        join_handle
            .join()
            .map_err(|_| io::Error::from(ErrorKind::BrokenPipe))?
    }
}

enum CheckpointCommand<F>
where
    F: file_manager::File,
{
    Checkpoint(LogFile<F>),
    Shutdown,
}

#[derive(Debug)]
struct Files<F>
where
    F: file_manager::File,
{
    active: Option<LogFile<F>>,
    inactive: VecDeque<LogFile<F>>,
    last_entry_id: EntryId,
    directory_synced_at: Option<Instant>,
    directory_is_syncing: bool,
    all: HashMap<u64, LogFile<F>>,
}

impl<F> Files<F>
where
    F: file_manager::File,
{
    pub fn activate_new_file(&mut self, config: &Configuration<F::Manager>) -> io::Result<()> {
        let next_id = self.last_entry_id.0 + 1;
        let file_name = format!("wal-{next_id}");
        let file = if let Some(file) = self.inactive.pop_front() {
            // Reuse a checkpointed file by renaming it in place.
            let old_id = file.lock().id();
            file.rename(next_id, &file_name)?;
            let all_entry = self.all.remove(&old_id).expect("missing all entry");
            self.all.insert(next_id, all_entry);
            file
        } else {
            // No inactive file is available for reuse; create a new one.
            let file = LogFile::write(
                next_id,
                config.directory.join(file_name).into(),
                0,
                None,
                config,
            )?;
            self.all.insert(next_id, file.clone());
            file
        };

        self.active = Some(file);

        Ok(())
    }
}

impl<F> Default for Files<F>
where
    F: file_manager::File,
{
    fn default() -> Self {
        Self {
            active: None,
            inactive: VecDeque::new(),
            last_entry_id: EntryId::default(),
            directory_synced_at: None,
            directory_is_syncing: false,
            all: HashMap::new(),
        }
    }
}

#[derive(Clone, Copy)]
enum WriteResult {
    RolledBack,
    Entry { new_length: u64 },
}

/// A buffered reader for a previously written data chunk.
///
/// This reader will stop returning bytes after reading all bytes previously
/// written.
#[derive(Debug)]
pub struct ChunkReader<'a, M>
where
    M: FileManager,
{
    wal: &'a WriteAheadLog<M>,
    file_id: u64,
    reader: BufReader<M::File>,
    bytes_remaining: u32,
    length: u32,
    stored_crc32: Option<u32>,
    read_crc32: u32,
}

impl<'a, M> ChunkReader<'a, M>
where
    M: FileManager,
{
    /// Returns the length of the data stored.
    ///
    /// This value will not change as data is read.
    #[must_use]
    pub const fn chunk_length(&self) -> u32 {
        self.length
    }

    /// Returns the number of bytes remaining to read.
    ///
    /// This value will be updated as read calls return data.
    #[must_use]
    pub const fn bytes_remaining(&self) -> u32 {
        self.bytes_remaining
    }

    /// Returns true if the stored checksum matches the checksum computed
    /// while reading.
    ///
    /// This function will only return `Ok` if the chunk has been fully read.
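    ///
    /// # Example
    ///
    /// A sketch of validating a chunk after reading it to the end:
    ///
    /// ```no_run
    /// # use std::io::Read;
    /// # use okaywal::{file_manager::FileManager, ChunkReader};
    /// # fn example<M: FileManager>(mut chunk: ChunkReader<'_, M>) -> std::io::Result<()> {
    /// let mut data = Vec::new();
    /// chunk.read_to_end(&mut data)?;
    /// assert!(chunk.crc_is_valid()?);
    /// # Ok(())
    /// # }
    /// ```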
    pub fn crc_is_valid(&mut self) -> io::Result<bool> {
        if self.bytes_remaining == 0 {
            if self.stored_crc32.is_none() {
                let mut stored_crc32 = [0; 4];
                // Bypass our internal read, otherwise our CRC would include the
                // CRC read itself.
                self.reader.read_exact(&mut stored_crc32)?;
                self.stored_crc32 = Some(u32::from_le_bytes(stored_crc32));
            }

            Ok(self.stored_crc32.expect("already initialized") == self.read_crc32)
        } else {
            Err(io::Error::new(
                io::ErrorKind::Other,
                "crc cannot be checked before reading all chunk bytes",
            ))
        }
    }
}

impl<'a, M> Read for ChunkReader<'a, M>
where
    M: FileManager,
{
    fn read(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
        // Clamp the read to the chunk's remaining bytes so this reader
        // returns EOF at the chunk boundary.
        let bytes_remaining =
            usize::try_from(self.bytes_remaining).expect("too large for platform");
        if buf.len() > bytes_remaining {
            buf = &mut buf[..bytes_remaining];
        }
        let bytes_read = self.reader.read(buf)?;
        self.read_crc32 = crc32c::crc32c_append(self.read_crc32, &buf[..bytes_read]);
        self.bytes_remaining -= u32::try_from(bytes_read).expect("can't be larger than buf.len()");
        Ok(bytes_read)
    }
}

impl<'a, M> Drop for ChunkReader<'a, M>
where
    M: FileManager,
{
    fn drop(&mut self) {
        // Decrement this file's reader count and wake the checkpointer in
        // case it is waiting to recycle the file.
        let mut readers = self.wal.data.readers.lock();
        let file_readers = readers
            .get_mut(&self.file_id)
            .expect("reader entry not present");
        *file_readers -= 1;
        drop(readers);
        self.wal.data.readers_sync.notify_one();
    }
}

#[cfg(test)]
mod tests;