1
use std::collections::{hash_map, HashMap};
2
use std::io::{Read, Write};
3
use std::sync::{Arc, Condvar, Mutex};
4

            
5
use okaywal::file_manager;
6

            
7
use crate::format::{ByteUtil, GrainId, Stored, TransactionId};
8
use crate::util::{u32_to_usize, usize_to_u32};
9
use crate::{Database, Error, Result};
10

            
11
#[derive(Debug)]
12
pub struct CommitLogEntry {
13
    pub transaction_id: TransactionId,
14
    pub next_entry: Option<GrainId>,
15
    pub new_grains: Vec<NewGrain>,
16
    pub archived_grains: Vec<GrainId>,
17
    pub freed_grains: Vec<GrainId>,
18

            
19
    pub embedded_header_data: Option<GrainId>,
20
    pub checkpoint_target: TransactionId,
21
    pub checkpointed_to: TransactionId,
22
}
23

            
24
impl CommitLogEntry {
25
3356
    pub fn new(
26
3356
        transaction_id: TransactionId,
27
3356
        next_entry: Option<GrainId>,
28
3356
        embedded_header_data: Option<GrainId>,
29
3356
        checkpoint_target: TransactionId,
30
3356
        checkpointed_to: TransactionId,
31
3356
    ) -> Self {
32
3356
        Self {
33
3356
            transaction_id,
34
3356
            next_entry,
35
3356
            new_grains: Vec::new(),
36
3356
            archived_grains: Vec::new(),
37
3356
            freed_grains: Vec::new(),
38
3356
            embedded_header_data,
39
3356
            checkpoint_target,
40
3356
            checkpointed_to,
41
3356
        }
42
3356
    }
43

            
44
3353
    pub fn serialize_to(&self, bytes: &mut Vec<u8>) -> Result<()> {
45
3353
        let total_size = 8 // transaction_id
46
3353
            + 8 // next_entry
47
3353
            + 8 // embedded_header_data
48
3353
            + 8 // checkpoint_target
49
3353
            + 8 // checkpointed_to
50
3353
            + 4 * 3 // u32 counts of all three grain types
51
3353
            + (self.new_grains.len() * NewGrain::BYTES)
52
3353
            + (self.archived_grains.len() + self.freed_grains.len()) * 8;
53
3353
        bytes.clear();
54
3353
        bytes.reserve(total_size);
55
3353

            
56
3353
        bytes.write_all(&self.transaction_id.to_be_bytes())?;
57
3353
        bytes.write_all(&self.next_entry.to_be_bytes())?;
58

            
59
3353
        bytes.write_all(&self.embedded_header_data.to_be_bytes())?;
60
3353
        bytes.write_all(&self.checkpoint_target.to_be_bytes())?;
61
3353
        bytes.write_all(&self.checkpointed_to.to_be_bytes())?;
62

            
63
3353
        bytes.write_all(&usize_to_u32(self.new_grains.len())?.to_be_bytes())?;
64
3353
        bytes.write_all(&usize_to_u32(self.archived_grains.len())?.to_be_bytes())?;
65
3353
        bytes.write_all(&usize_to_u32(self.freed_grains.len())?.to_be_bytes())?;
66

            
67
30752
        for grain in &self.new_grains {
68
27399
            bytes.write_all(&grain.id.to_bytes())?;
69
27399
            bytes.write_all(&grain.crc32.to_be_bytes())?;
70
        }
71

            
72
5358
        for grain in &self.archived_grains {
73
2005
            bytes.write_all(&grain.to_bytes())?;
74
        }
75

            
76
4472
        for grain in &self.freed_grains {
77
1119
            bytes.write_all(&grain.to_bytes())?;
78
        }
79

            
80
3353
        Ok(())
81
3353
    }
82

            
83
357
    pub fn read_from<R: Read>(mut reader: R) -> Result<Self> {
84
357
        let mut eight_bytes = [0; 8];
85
357
        reader.read_exact(&mut eight_bytes)?;
86
357
        let transaction_id = TransactionId::from_be_bytes(eight_bytes);
87
357
        reader.read_exact(&mut eight_bytes)?;
88
357
        let next_entry = GrainId::from_bytes(&eight_bytes);
89
357
        reader.read_exact(&mut eight_bytes)?;
90
357
        let embedded_header_data = GrainId::from_bytes(&eight_bytes);
91
357
        reader.read_exact(&mut eight_bytes)?;
92
357
        let checkpoint_target = TransactionId::from_be_bytes(eight_bytes);
93
357
        reader.read_exact(&mut eight_bytes)?;
94
357
        let checkpointed_to = TransactionId::from_be_bytes(eight_bytes);
95
357

            
96
357
        let mut four_bytes = [0; 4];
97
357
        reader.read_exact(&mut four_bytes)?;
98
357
        let new_grain_count = u32::from_be_bytes(four_bytes);
99
357
        reader.read_exact(&mut four_bytes)?;
100
357
        let archived_grain_count = u32::from_be_bytes(four_bytes);
101
357
        reader.read_exact(&mut four_bytes)?;
102
357
        let freed_grain_count = u32::from_be_bytes(four_bytes);
103

            
104
357
        let mut new_grains = Vec::with_capacity(u32_to_usize(new_grain_count)?);
105
357
        for _ in 0..new_grain_count {
106
356
            reader.read_exact(&mut eight_bytes)?;
107
356
            let id = GrainId::from_bytes(&eight_bytes).ok_or(Error::InvalidGrainId)?;
108
356
            reader.read_exact(&mut four_bytes)?;
109
356
            let crc32 = u32::from_be_bytes(four_bytes);
110
356
            new_grains.push(NewGrain { id, crc32 });
111
        }
112

            
113
357
        let mut archived_grains = Vec::with_capacity(u32_to_usize(archived_grain_count)?);
114
357
        for _ in 0..archived_grain_count {
115
96
            reader.read_exact(&mut eight_bytes)?;
116
96
            let id = GrainId::from_bytes(&eight_bytes).ok_or(Error::InvalidGrainId)?;
117
96
            archived_grains.push(id);
118
        }
119

            
120
357
        let mut freed_grains = Vec::with_capacity(u32_to_usize(freed_grain_count)?);
121
357
        for _ in 0..freed_grain_count {
122
72
            reader.read_exact(&mut eight_bytes)?;
123
72
            let id = GrainId::from_bytes(&eight_bytes).ok_or(Error::InvalidGrainId)?;
124
72
            freed_grains.push(id);
125
        }
126

            
127
357
        Ok(Self {
128
357
            transaction_id,
129
357
            next_entry,
130
357
            new_grains,
131
357
            archived_grains,
132
357
            freed_grains,
133
357
            embedded_header_data,
134
357
            checkpoint_target,
135
357
            checkpointed_to,
136
357
        })
137
357
    }
138

            
139
7641
    pub fn next_entry<FileManager>(
140
7641
        &self,
141
7641
        database: &Database<FileManager>,
142
7641
    ) -> Result<Option<Stored<Arc<Self>>>>
143
7641
    where
144
7641
        FileManager: file_manager::FileManager,
145
7641
    {
146
7641
        if self.transaction_id > database.checkpointed_to()? {
147
7641
            if let Some(entry_id) = self.next_entry {
148
7632
                if let Some(entry) = database.read_commit_log_entry(entry_id)? {
149
7632
                    return Ok(Some(Stored {
150
7632
                        grain_id: entry_id,
151
7632
                        stored: entry,
152
7632
                    }));
153
                }
154
9
            }
155
        }
156

            
157
9
        Ok(None)
158
7641
    }
159
}
160

            
161
#[derive(Debug)]
162
pub struct NewGrain {
163
    pub id: GrainId,
164
    pub crc32: u32,
165
}
166

            
167
impl NewGrain {
168
    const BYTES: usize = 12;
169
}
170

            
171
331
#[derive(Debug, Default)]
172
pub struct CommitLogs {
173
    // TODO this should be an LRU
174
    cached: Mutex<HashMap<GrainId, CommitLogCacheEntry>>,
175
    sync: Condvar,
176
}
177

            
178
impl CommitLogs {
179
3353
    /// Stores `entry` in the cache under `grain_id`, replacing whatever was
    /// recorded for that grain before.
    ///
    /// # Errors
    ///
    /// Returns an error if the cache lock cannot be acquired.
    pub fn cache(&self, grain_id: GrainId, entry: Arc<CommitLogEntry>) -> Result<()> {
        let slot = CommitLogCacheEntry::Cached(Some(entry));
        // The guard is a temporary, so the lock is released when this
        // statement ends.
        self.cached.lock()?.insert(grain_id, slot);
        Ok(())
    }
184

            
185
7671
    pub fn get_or_lookup<FileManager>(
186
7671
        &self,
187
7671
        grain_id: GrainId,
188
7671
        db: &Database<FileManager>,
189
7671
    ) -> Result<Option<Arc<CommitLogEntry>>>
190
7671
    where
191
7671
        FileManager: file_manager::FileManager,
192
7671
    {
193
7671
        let mut data = self.cached.lock()?;
194
7671
        loop {
195
7671
            match data.entry(grain_id) {
196
7612
                hash_map::Entry::Occupied(entry) => match entry.get() {
197
7612
                    CommitLogCacheEntry::Cached(cached) => return Ok(cached.clone()),
198
                    CommitLogCacheEntry::Caching => {
199
                        // Another thread is trying to cache this entry already.
200
                        data = self.sync.wait(data)?;
201
                    }
202
                },
203
59
                hash_map::Entry::Vacant(miss) => {
204
59
                    miss.insert(CommitLogCacheEntry::Caching);
205
59
                    drop(data);
206

            
207
                    // We want to be careful to not cause another thread to
208
                    // block indefinitely if we receive an error.
209
59
                    let result = match Self::read_entry(grain_id, db) {
210
59
                        Ok(entry) => {
211
59
                            let entry = entry.map(Arc::new);
212
59
                            data = self.cached.lock()?;
213
59
                            data.insert(grain_id, CommitLogCacheEntry::Cached(entry.clone()));
214
59
                            Ok(entry)
215
                        }
216
                        Err(err) => {
217
                            // We had an error reading, clear our entry in the
218
                            // cache before returning it.
219
                            data = self.cached.lock()?;
220
                            data.remove(&grain_id);
221
                            Err(err)
222
                        }
223
                    };
224

            
225
59
                    drop(data);
226
59

            
227
59
                    // This is wasteful to wake up all waiting threads, but we
228
59
                    // don't have a good way to notify just a single one.
229
59
                    self.sync.notify_all();
230
59

            
231
59
                    return result;
232
                }
233
            }
234
        }
235
7671
    }
236

            
237
59
    fn read_entry<FileManager>(
238
59
        grain_id: GrainId,
239
59
        db: &Database<FileManager>,
240
59
    ) -> Result<Option<CommitLogEntry>>
241
59
    where
242
59
        FileManager: file_manager::FileManager,
243
59
    {
244
59
        if let Some(reader) = db.read(grain_id)? {
245
59
            let data = reader.read_all_data()?;
246
59
            let entry = CommitLogEntry::read_from(&data[..])?;
247
59
            Ok(Some(entry))
248
        } else {
249
            Ok(None)
250
        }
251
59
    }
252
}
253

            
254
#[derive(Debug)]
255
enum CommitLogCacheEntry {
256
    Cached(Option<Arc<CommitLogEntry>>),
257
    Caching,
258
}