1
use std::io::{self, Read, Write};
2

            
3
use crc32c::crc32c_append;
4
use file_manager::FileManager;
5
use parking_lot::MutexGuard;
6

            
7
use crate::{
8
    log_file::{LogFile, LogFileWriter},
9
    to_io_result::ToIoResult,
10
    WriteAheadLog, WriteResult,
11
};
12

            
13
/// A writer for an entry in a [`WriteAheadLog`].
14
///
15
/// Only one writer can be active for a given [`WriteAheadLog`] at any given
16
/// time. See [`WriteAheadLog::begin_entry()`] for more information.
17
#[derive(Debug)]
18
pub struct EntryWriter<'a, M>
19
where
20
    M: FileManager,
21
{
22
    id: EntryId,
23
    log: &'a WriteAheadLog<M>,
24
    file: Option<LogFile<M::File>>,
25
    original_length: u64,
26
}
27

            
28
/// Tag byte written at the start of every entry, immediately before the
/// entry's id.
pub const NEW_ENTRY: u8 = 1;
29
/// Tag byte written at the start of every chunk, immediately before the
/// chunk's little-endian `u32` length.
pub const CHUNK: u8 = 2;
30
/// Tag byte written when an entry is committed, marking the end of the entry.
pub const END_OF_ENTRY: u8 = 3;
31

            
32
impl<'a, M> EntryWriter<'a, M>
33
where
34
    M: FileManager,
35
{
36
29749
    pub(super) fn new(
37
29749
        log: &'a WriteAheadLog<M>,
38
29749
        id: EntryId,
39
29749
        file: LogFile<M::File>,
40
29749
    ) -> io::Result<Self> {
41
29749
        let mut writer = file.lock();
42
29749
        let original_length = writer.position();
43
29749

            
44
29749
        writer.write_all(&[NEW_ENTRY])?;
45
29749
        writer.write_all(&id.0.to_le_bytes())?;
46
29749
        drop(writer);
47
29749

            
48
29749
        Ok(Self {
49
29749
            id,
50
29749
            log,
51
29749
            file: Some(file),
52
29749
            original_length,
53
29749
        })
54
29749
    }
55

            
56
    /// Returns the unique id of the log entry being written.
57
    #[must_use]
58
91
    pub const fn id(&self) -> EntryId {
59
91
        self.id
60
91
    }
61

            
62
    /// Commits this entry to the log. Once this call returns, all data is
63
    /// atomically updated and synchronized to disk.
64
    ///
65
    /// While the entry is being committed, other writers will be allowed to
66
    /// write to the log. See [`WriteAheadLog::begin_entry()`] for more
67
    /// information.
68
29746
    pub fn commit(self) -> io::Result<EntryId> {
69
29746
        self.commit_and(|_file| Ok(()))
70
29746
    }
71

            
72
29748
    pub(crate) fn commit_and<F: FnOnce(&mut LogFileWriter<M::File>) -> io::Result<()>>(
73
29748
        mut self,
74
29748
        callback: F,
75
29748
    ) -> io::Result<EntryId> {
76
29748
        let file = self.file.take().expect("already committed");
77
29748

            
78
29748
        let mut writer = file.lock();
79
29748

            
80
29748
        writer.write_all(&[END_OF_ENTRY])?;
81
29748
        let new_length = writer.position();
82
29748
        callback(&mut writer)?;
83
29748
        writer.set_last_entry_id(Some(self.id));
84
29748
        drop(writer);
85
29748

            
86
29748
        self.log.reclaim(file, WriteResult::Entry { new_length })?;
87

            
88
29748
        Ok(self.id)
89
29748
    }
90

            
91
    /// Abandons this entry, preventing the entry from being recovered in the
92
    /// future. This is automatically done when dropped, but errors that occur
93
    /// during drop will panic.
94
    pub fn rollback(mut self) -> io::Result<()> {
95
        self.rollback_session()
96
    }
97

            
98
1
    fn rollback_session(&mut self) -> io::Result<()> {
99
1
        let file = self.file.take().expect("file already dropped");
100
1

            
101
1
        let mut writer = file.lock();
102
1
        writer.revert_to(self.original_length)?;
103
1
        drop(writer);
104
1

            
105
1
        self.log.reclaim(file, WriteResult::RolledBack).unwrap();
106
1

            
107
1
        Ok(())
108
1
    }
109

            
110
    /// Appends a chunk of data to this log entry. Each chunk of data is able to
111
    /// be read using [`Entry::read_chunk`](crate::Entry).
112
30075
    pub fn write_chunk(&mut self, data: &[u8]) -> io::Result<ChunkRecord> {
113
30075
        let mut writer = self.begin_chunk(u32::try_from(data.len()).to_io()?)?;
114
30075
        writer.write_all(data)?;
115
30075
        writer.finish()
116
30075
    }
117

            
118
    /// Begins writing a chunk with the given `length`.
119
    ///
120
    /// The writer returned already contains an internal buffer. This function
121
    /// can be used to write a complex payload without needing to first
122
    /// combine it in another buffer.
123
30075
    pub fn begin_chunk(&mut self, length: u32) -> io::Result<ChunkWriter<'_, M::File>> {
124
30075
        let mut file = self.file.as_ref().expect("already dropped").lock();
125
30075

            
126
30075
        let position = LogPosition {
127
30075
            file_id: file.id(),
128
30075
            offset: file.position(),
129
30075
        };
130
30075

            
131
30075
        file.write_all(&[CHUNK])?;
132
30075
        file.write_all(&length.to_le_bytes())?;
133

            
134
30075
        Ok(ChunkWriter {
135
30075
            file,
136
30075
            position,
137
30075
            length,
138
30075
            bytes_remaining: length,
139
30075
            crc32: 0,
140
30075
            finished: false,
141
30075
        })
142
30075
    }
143
}
144

            
145
impl<'a, M> Drop for EntryWriter<'a, M>
146
where
147
    M: FileManager,
148
{
149
29749
    fn drop(&mut self) {
150
29749
        if self.file.is_some() {
151
1
            self.rollback_session().unwrap();
152
29748
        }
153
29749
    }
154
}
155

            
156
pub struct ChunkWriter<'a, F>
157
where
158
    F: file_manager::File,
159
{
160
    file: MutexGuard<'a, LogFileWriter<F>>,
161
    position: LogPosition,
162
    length: u32,
163
    bytes_remaining: u32,
164
    crc32: u32,
165
    finished: bool,
166
}
167

            
168
impl<'a, F> ChunkWriter<'a, F>
169
where
170
    F: file_manager::File,
171
{
172
    pub fn finish(mut self) -> io::Result<ChunkRecord> {
173
30075
        self.write_tail()?;
174
30075
        Ok(ChunkRecord {
175
30075
            position: self.position,
176
30075
            crc: self.crc32,
177
30075
            length: self.length,
178
30075
        })
179
30075
    }
180

            
181
30075
    fn write_tail(&mut self) -> io::Result<()> {
182
30075
        self.finished = true;
183
30075

            
184
30075
        if self.bytes_remaining != 0 {
185
            return Err(io::Error::new(
186
                io::ErrorKind::Other,
187
                "written length does not match expected length",
188
            ));
189
30075
        }
190
30075

            
191
30075
        self.file.write_all(&self.crc32.to_le_bytes())
192
30075
    }
193
}
194

            
195
impl<'a, F> Drop for ChunkWriter<'a, F>
196
where
197
    F: file_manager::File,
198
{
199
30075
    fn drop(&mut self) {
200
30075
        if !self.finished {
201
            self.write_tail()
202
                .expect("chunk writer dropped without finishing");
203
30075
        }
204
30075
    }
205
}
206

            
207
impl<'a, F> Write for ChunkWriter<'a, F>
208
where
209
    F: file_manager::File,
210
{
211
30641
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
212
30641
        let bytes_to_write = buf
213
30641
            .len()
214
30641
            .min(usize::try_from(self.bytes_remaining).to_io()?);
215

            
216
30641
        let bytes_written = self.file.write(&buf[..bytes_to_write])?;
217
30641
        if bytes_written > 0 {
218
30641
            self.bytes_remaining -= u32::try_from(bytes_written).to_io()?;
219
30641
            self.crc32 = crc32c_append(self.crc32, &buf[..bytes_written]);
220
        }
221
30641
        Ok(bytes_written)
222
30641
    }
223

            
224
    fn flush(&mut self) -> io::Result<()> {
225
        Ok(())
226
    }
227
}
228

            
229
/// Identifies where a chunk of data is located within a [`WriteAheadLog`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct LogPosition {
    // Id of the log file that contains the chunk.
    pub(crate) file_id: u64,
    // Position within that file at which the chunk begins.
    pub(crate) offset: u64,
}
235

            
236
impl LogPosition {
237
    /// The number of bytes required to serialize a `LogPosition` using
238
    /// [`LogPosition::serialize_to()`].
239
    pub const SERIALIZED_LENGTH: u8 = 16;
240

            
241
    /// Serializes this position to `destination`.
242
    ///
243
    /// This writes [`LogPosition::SERIALIZED_LENGTH`] bytes to `destination`.
244
1
    pub fn serialize_to<W: Write>(&self, mut destination: W) -> io::Result<()> {
245
1
        let mut all_bytes = [0; 16];
246
1
        all_bytes[..8].copy_from_slice(&self.file_id.to_le_bytes());
247
1
        all_bytes[8..].copy_from_slice(&self.offset.to_le_bytes());
248
1
        destination.write_all(&all_bytes)
249
1
    }
250

            
251
    /// Deserializes a `LogPosition` from `read`.
252
    ///
253
    /// This reads [`LogPosition::SERIALIZED_LENGTH`] bytes from `read` and
254
    /// returns the deserialized log position.
255
1
    pub fn deserialize_from<R: Read>(mut read: R) -> io::Result<Self> {
256
1
        let mut all_bytes = [0; 16];
257
1
        read.read_exact(&mut all_bytes)?;
258

            
259
1
        let file_id = u64::from_le_bytes(all_bytes[..8].try_into().expect("u64 is 8 bytes"));
260
1
        let offset = u64::from_le_bytes(all_bytes[8..].try_into().expect("u64 is 8 bytes"));
261
1

            
262
1
        Ok(Self { file_id, offset })
263
1
    }
264
}
265

            
266
1
/// A serialized `LogPosition` has the advertised length and deserializes back
/// into an equal value.
#[test]
fn log_position_serialization() {
    let position = LogPosition {
        file_id: 1,
        offset: 2,
    };

    let mut buffer = Vec::new();
    position.serialize_to(&mut buffer).unwrap();

    let expected_length = usize::from(LogPosition::SERIALIZED_LENGTH);
    assert_eq!(buffer.len(), expected_length);

    let round_tripped = LogPosition::deserialize_from(&buffer[..]).unwrap();
    assert_eq!(position, round_tripped);
}
281

            
282
/// A record of a chunk that was written to a [`WriteAheadLog`].
283
204
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
284
pub struct ChunkRecord {
285
    /// The position of the chunk.
286
    pub position: LogPosition,
287
    /// The CRC calculated for the chunk.
288
    pub crc: u32,
289
    /// The length of the data contained inside of the chunk.
290
    pub length: u32,
291
}
292

            
293
/// The unique id of an entry written to a [`WriteAheadLog`].
///
/// Ids are ordered by the time at which the [`EntryWriter`] for the
/// corresponding entry was created.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct EntryId(pub u64);