1
1
#![forbid(unsafe_code)]
2

            
3
use std::io::{self};
4
use std::num::TryFromIntError;
5
use std::path::Path;
6
use std::sync::{Arc, PoisonError};
7

            
8
use okaywal::file_manager::fs::StdFileManager;
9
use okaywal::file_manager::memory::MemoryFileManager;
10
use okaywal::file_manager::FSyncError;
11
use okaywal::{file_manager, WriteAheadLog};
12
pub use transaction::Transaction;
13

            
14
use crate::atlas::{Atlas, GrainReader};
15
use crate::checkpointer::Checkpointer;
16
use crate::commit_log::{CommitLogEntry, CommitLogs};
17
use crate::config::Config;
18
use crate::format::{GrainId, Stored, TransactionId};
19
use crate::store::Store;
20
use crate::transaction::TransactionLock;
21

            
22
mod allocations;
23
mod atlas;
24
mod basinmap;
25
mod checkpointer;
26
mod commit_log;
27
pub mod config;
28
pub mod format;
29
mod store;
30
#[cfg(test)]
31
mod tests;
32
mod transaction;
33
mod util;
34
mod wal;
35

            
36
4
#[derive(Debug, Clone)]
37
pub struct Database<FileManager = StdFileManager>
38
where
39
    FileManager: file_manager::FileManager,
40
{
41
    data: Arc<Data<FileManager>>,
42
    wal: WriteAheadLog<FileManager>,
43
}
44

            
45
impl Database<StdFileManager> {
46
7
    pub fn recover<AsRefPath: AsRef<Path>>(directory: AsRefPath) -> Result<Self> {
47
7
        Config::for_directory(directory).recover()
48
7
    }
49
}
50

            
51
impl Database<MemoryFileManager> {
52
    pub fn in_memory() -> Self {
53
        Config::in_memory()
54
            .recover()
55
            .expect("somehow failed to recover on default memory file manager")
56
    }
57
}
58

            
59
impl<FileManager> Database<FileManager>
60
where
61
    FileManager: file_manager::FileManager,
62
{
63
336
    fn recover_config(config: Config<FileManager>) -> Result<Self> {
64
        // Opening the store restores the database to the last fully committed
65
        // state. Each commit happens when the write ahead log is checkpointed.
66
336
        let store = Store::recover(
67
336
            config.wal.directory.as_ref(),
68
336
            config.wal.file_manager.clone(),
69
336
        )?;
70
331
        let atlas = Atlas::new(&store);
71
331
        let current_metadata = atlas.current_index_metadata()?;
72
331
        let (checkpointer, cp_spawner) = Checkpointer::new(current_metadata.checkpointed_to);
73
331
        let data = Arc::new(Data {
74
331
            store,
75
331
            atlas,
76
331
            tx_lock: TransactionLock::new(current_metadata),
77
331
            checkpointer,
78
331
            commit_logs: CommitLogs::default(),
79
331
        });
80

            
81
        // Recover any transactions from the write ahead log that haven't been
82
        // checkpointed to the store already.
83
331
        let wal = config.wal.open(wal::WalManager::new(&data))?;
84

            
85
        // The wal recovery process may have recovered sediment checkpoints that
86
        // are in the WAL but not yet in permanent storage. Refresh the metadata.
87
331
        let current_metadata = data.atlas.current_index_metadata()?;
88
331
        cp_spawner.spawn(current_metadata.checkpointed_to, &data, &wal)?;
89
331
        if current_metadata.checkpoint_target > current_metadata.checkpointed_to {
90
9
            data.checkpointer
91
9
                .checkpoint_to(current_metadata.checkpoint_target);
92
322
        }
93

            
94
331
        Ok(Self { data, wal })
95
336
    }
96

            
97
3356
    pub fn begin_transaction(&self) -> Result<Transaction<'_, FileManager>> {
98
3356
        let tx_guard = self.data.tx_lock.lock();
99
3356
        let wal_entry = self.wal.begin_entry()?;
100

            
101
3356
        Transaction::new(self, wal_entry, tx_guard)
102
3356
    }
103

            
104
31075
    pub fn read(&self, grain: GrainId) -> Result<Option<GrainReader<FileManager>>> {
105
31075
        self.data.atlas.find(grain, &self.wal)
106
31075
    }
107

            
108
7671
    pub fn read_commit_log_entry(&self, grain: GrainId) -> Result<Option<Arc<CommitLogEntry>>> {
109
7671
        self.data.commit_logs.get_or_lookup(grain, self)
110
7671
    }
111

            
112
    pub fn shutdown(self) -> Result<()> {
113
        // Shut the checkpointer down first, since it may try to access the
114
        // write-ahead log.
115
331
        self.data.checkpointer.shutdown()?;
116
        // Shut down the write-ahead log, which may still end up having its own
117
        // checkpointing process finishing up. This may require the file syncer.
118
331
        self.wal.shutdown()?;
119
        // With everything else shut down, we can now shut down the file
120
        // manager.
121
331
        self.data.store.file_manager.shutdown()?;
122

            
123
331
        Ok(())
124
331
    }
125

            
126
    pub fn checkpoint_target(&self) -> Result<TransactionId> {
127
        Ok(self.data.atlas.current_index_metadata()?.checkpoint_target)
128
    }
129

            
130
7641
    pub fn checkpointed_to(&self) -> Result<TransactionId> {
131
7641
        Ok(self.data.atlas.current_index_metadata()?.checkpointed_to)
132
7641
    }
133

            
134
4
    pub fn embedded_header(&self) -> Result<Option<GrainId>> {
135
4
        Ok(self
136
4
            .data
137
4
            .atlas
138
4
            .current_index_metadata()?
139
            .embedded_header_data)
140
4
    }
141

            
142
39
    pub fn commit_log_head(&self) -> Result<Option<Stored<Arc<CommitLogEntry>>>> {
143
39
        if let Some(entry_id) = self.data.atlas.current_index_metadata()?.commit_log_head {
144
39
            if let Some(entry) = self.read_commit_log_entry(entry_id)? {
145
39
                return Ok(Some(Stored {
146
39
                    grain_id: entry_id,
147
39
                    stored: entry,
148
39
                }));
149
            }
150
        }
151

            
152
        Ok(None)
153
39
    }
154
}
155

            
156
impl<FileManager> Eq for Database<FileManager> where FileManager: file_manager::FileManager {}
157

            
158
impl<FileManager> PartialEq for Database<FileManager>
159
where
160
    FileManager: file_manager::FileManager,
161
{
162
    fn eq(&self, other: &Self) -> bool {
163
        Arc::ptr_eq(&self.data, &other.data)
164
    }
165
}
166

            
167
#[derive(Debug)]
168
struct Data<FileManager>
169
where
170
    FileManager: file_manager::FileManager,
171
{
172
    store: Store<FileManager>,
173
    checkpointer: Checkpointer,
174
    atlas: Atlas<FileManager>,
175
    commit_logs: CommitLogs,
176
    tx_lock: TransactionLock,
177
}
178

            
179
#[derive(thiserror::Error, Debug)]
180
pub enum Error {
181
    #[error("a GrainId was used that was not allocated")]
182
    GrainNotAllocated,
183
    #[error("a poisoned lock was encountered, the database must be closed and reopened")]
184
    LockPoisoned,
185
    #[error("a thread was not able to be joined")]
186
    ThreadJoin,
187
    #[error("crc32 checksum mismatch")]
188
    ChecksumFailed,
189
    #[error("the value is too large to be stored in Sediment")]
190
    GrainTooLarge,
191
    #[error("an invalid grain id was encountered")]
192
    InvalidGrainId,
193
    #[error("the transaction id is not valid for this database")]
194
    InvalidTransactionId,
195
    #[error("value too large for target")]
196
    ValueOutOfBounds,
197
    #[error("io error: {0}")]
198
    Io(#[from] io::Error),
199
    #[error("the service has shut down")]
200
    Shutdown,
201
    #[error("database verification failed: {0}")]
202
    VerificationFailed(String),
203
}
204

            
205
impl Error {
206
11
    fn verification_failed(reason: impl Into<String>) -> Self {
207
11
        Self::VerificationFailed(reason.into())
208
11
    }
209
}
210

            
211
pub type Result<T, E = Error> = std::result::Result<T, E>;
212

            
213
impl From<Error> for io::Error {
214
    fn from(err: Error) -> Self {
215
        match err {
216
            Error::Io(err) => err,
217
            other => io::Error::new(io::ErrorKind::Other, other),
218
        }
219
    }
220
}
221

            
222
impl<T> From<PoisonError<T>> for Error {
223
    fn from(_: PoisonError<T>) -> Self {
224
        Self::LockPoisoned
225
    }
226
}
227

            
228
impl From<TryFromIntError> for Error {
229
    fn from(_: TryFromIntError) -> Self {
230
        Self::ValueOutOfBounds
231
    }
232
}
233

            
234
impl From<FSyncError> for Error {
235
    fn from(error: FSyncError) -> Self {
236
        match error {
237
            FSyncError::Shutdown => Self::Shutdown,
238
            FSyncError::ThreadJoin => Self::ThreadJoin,
239
            FSyncError::InternalInconstency => Self::LockPoisoned,
240
            FSyncError::Io(io) => Self::Io(io),
241
        }
242
    }
243
}