1
use std::{convert::Infallible, fmt::Display, sync::Arc};
2

            
3
use okaywal::{Configuration, LogVoid, WriteAheadLog};
4
use tempfile::TempDir;
5
use timings::{Benchmark, BenchmarkImplementation, Label, LabeledTimings, Timings};
6

            
7
1
fn main() {
8
1
    let measurements = Timings::default();
9
1
    let bench = Benchmark::for_each_config(vec![
10
1
        InsertConfig {
11
1
            number_of_bytes: 256,
12
1
            iters: 500,
13
1
        },
14
1
        InsertConfig {
15
1
            number_of_bytes: 1024,
16
1
            iters: 250,
17
1
        },
18
1
        InsertConfig {
19
1
            number_of_bytes: 4096,
20
1
            iters: 125,
21
1
        },
22
1
        InsertConfig {
23
1
            number_of_bytes: 1024 * 1024,
24
1
            iters: 75,
25
1
        },
26
1
    ])
27
1
    .with_each_number_of_threads([1, 2, 4, 8, 16])
28
1
    .with::<OkayWal>();
29
1

            
30
1
    #[cfg(feature = "sharded-log")]
31
1
    let bench = bench.with::<shardedlog::ShardedLog>();
32
1

            
33
1
    #[cfg(feature = "postgres")]
34
1
    let bench = bench.with::<postgres::Postgres>();
35
1

            
36
1
    #[cfg(feature = "sqlite")]
37
1
    let bench = bench.with::<sqlite::SQLite>();
38
1

            
39
1
    bench.run(&measurements).unwrap();
40
1

            
41
1
    let stats = measurements.wait_for_stats();
42
1
    timings::print_table_summaries(&stats).unwrap();
43
1
}
44

            
45
496
#[derive(Copy, Clone, Debug)]
46
struct InsertConfig {
47
    number_of_bytes: usize,
48
    iters: usize,
49
}
50

            
51
struct OkayWal {
52
    config: InsertConfig,
53
    _dir: Arc<TempDir>,
54
    log: WriteAheadLog,
55
}
56

            
57
impl BenchmarkImplementation<Label, InsertConfig, Infallible> for OkayWal {
58
    type SharedConfig = (InsertConfig, Arc<TempDir>, WriteAheadLog);
59

            
60
20
    fn label(number_of_threads: usize, _config: &InsertConfig) -> Label {
61
20
        Label::from(format!("okaywal-{number_of_threads:02}t"))
62
20
    }
63

            
64
20
    fn initialize_shared_config(
65
20
        _number_of_threads: usize,
66
20
        config: &InsertConfig,
67
20
    ) -> Result<Self::SharedConfig, Infallible> {
68
20
        let dir = Arc::new(TempDir::new_in(".").unwrap());
69
20
        let log = Configuration::default_for(&*dir).open(LogVoid).unwrap();
70
20
        Ok((*config, dir, log))
71
20
    }
72

            
73
40
    fn reset(_shutting_down: bool) -> Result<(), Infallible> {
74
40
        Ok(())
75
40
    }
76

            
77
124
    fn initialize(
78
124
        _number_of_threads: usize,
79
124
        (config, dir, log): Self::SharedConfig,
80
124
    ) -> Result<Self, Infallible> {
81
124
        Ok(Self {
82
124
            config,
83
124
            log,
84
124
            _dir: dir,
85
124
        })
86
124
    }
87

            
88
124
    fn measure(&mut self, measurements: &LabeledTimings<Label>) -> Result<(), Infallible> {
89
124
        let metric = Label::from(format!("commit-{}", Bytes(self.config.number_of_bytes)));
90
124
        let data = vec![42; self.config.number_of_bytes];
91
29450
        for _ in 0..self.config.iters {
92
29450
            let measurement = measurements.begin(metric.clone());
93
29450
            let mut session = self.log.begin_entry().unwrap();
94
29450
            session.write_chunk(&data).unwrap();
95
29450
            session.commit().unwrap();
96
29450
            measurement.finish();
97
29450
        }
98
124
        Ok(())
99
124
    }
100
}
101

            
102
#[cfg(feature = "sharded-log")]
103
mod shardedlog {
104
    use super::*;
105

            
106
    pub struct ShardedLog {
107
        config: InsertConfig,
108
        _dir: Arc<TempDir>,
109
        log: sharded_log::ShardedLog,
110
    }
111

            
112
    impl BenchmarkImplementation<Label, InsertConfig, Infallible> for ShardedLog {
113
        type SharedConfig = (InsertConfig, Arc<TempDir>, sharded_log::ShardedLog);
114

            
115
20
        fn label(number_of_threads: usize, _config: &InsertConfig) -> Label {
116
20
            Label::from(format!("sharded-log-{:02}t", number_of_threads))
117
20
        }
118

            
119
20
        fn initialize_shared_config(
120
20
            number_of_threads: usize,
121
20
            config: &InsertConfig,
122
20
        ) -> Result<Self::SharedConfig, Infallible> {
123
20
            let dir = Arc::new(TempDir::new_in(".").unwrap());
124
20
            let log = sharded_log::Config {
125
20
                path: dir.path().to_path_buf(),
126
20
                shards: u8::try_from(number_of_threads).unwrap(),
127
20
                ..sharded_log::Config::default()
128
20
            }
129
20
            .create()
130
20
            .unwrap();
131
20
            Ok((*config, dir, log))
132
20
        }
133

            
134
40
        fn reset(_shutting_down: bool) -> Result<(), Infallible> {
135
40
            Ok(())
136
40
        }
137

            
138
124
        fn initialize(
139
124
            _number_of_threads: usize,
140
124
            (config, dir, log): Self::SharedConfig,
141
124
        ) -> Result<Self, Infallible> {
142
124
            Ok(Self {
143
124
                config,
144
124
                log,
145
124
                _dir: dir,
146
124
            })
147
124
        }
148

            
149
124
        fn measure(&mut self, measurements: &LabeledTimings<Label>) -> Result<(), Infallible> {
150
124
            let metric = Label::from(format!("commit-{}", Bytes(self.config.number_of_bytes)));
151
124
            let data = vec![42; self.config.number_of_bytes];
152
29450
            for _ in 0..self.config.iters {
153
29450
                let measurement = measurements.begin(metric.clone());
154
29450
                self.log.write_batch(&[&data]).unwrap();
155
29450
                self.log.flush().unwrap();
156
29450
                measurement.finish();
157
29450
            }
158
124
            Ok(())
159
124
        }
160
    }
161
}
162

            
163
#[cfg(feature = "postgres")]
164
mod postgres {
165
    use ::postgres::NoTls;
166

            
167
    use super::*;
168

            
169
124
    #[derive(Clone, Debug)]
170
    pub struct Postgres {
171
        config: InsertConfig,
172
        pg_config: ::postgres::Config,
173
    }
174

            
175
    impl BenchmarkImplementation<Label, InsertConfig, Infallible> for Postgres {
176
        type SharedConfig = Self;
177

            
178
20
        fn label(number_of_threads: usize, _config: &InsertConfig) -> Label {
179
20
            Label::from(format!("postgres-{:02}t", number_of_threads))
180
20
        }
181

            
182
20
        fn initialize_shared_config(
183
20
            _number_of_threads: usize,
184
20
            config: &InsertConfig,
185
20
        ) -> Result<Self::SharedConfig, Infallible> {
186
20
            let mut pg_config = ::postgres::Config::new();
187
20
            pg_config
188
20
                .dbname("bench")
189
20
                .host("localhost")
190
20
                .user("bencher")
191
20
                .password("password");
192
20
            let mut client = pg_config.connect(NoTls).unwrap();
193
20
            client
194
20
                .execute("DROP TABLE IF EXISTS okaywal_inserts;", &[])
195
20
                .unwrap();
196
20
            client
197
20
                .execute("CREATE TABLE okaywal_inserts(data bytea);", &[])
198
20
                .unwrap();
199
20
            Ok(Self {
200
20
                config: *config,
201
20
                pg_config,
202
20
            })
203
20
        }
204

            
205
40
        fn reset(_shutting_down: bool) -> Result<(), Infallible> {
206
40
            Ok(())
207
40
        }
208

            
209
124
        fn initialize(
210
124
            _number_of_threads: usize,
211
124
            config: Self::SharedConfig,
212
124
        ) -> Result<Self, Infallible> {
213
124
            Ok(config)
214
124
        }
215

            
216
124
        fn measure(&mut self, measurements: &LabeledTimings<Label>) -> Result<(), Infallible> {
217
124
            let mut client = self.pg_config.connect(NoTls).unwrap();
218
124
            let metric = Label::from(format!("commit-{}", Bytes(self.config.number_of_bytes)));
219
124
            let data = vec![42_u8; self.config.number_of_bytes];
220
29450
            for _ in 0..self.config.iters {
221
29450
                let measurement = measurements.begin(metric.clone());
222
29450
                client
223
29450
                    .execute("INSERT INTO okaywal_inserts (data) values ($1)", &[&data])
224
29450
                    .unwrap();
225
29450

            
226
29450
                measurement.finish();
227
29450
            }
228
124
            Ok(())
229
124
        }
230
    }
231
}
232

            
233
#[cfg(feature = "sqlite")]
234
mod sqlite {
235

            
236
    use std::sync::Mutex;
237

            
238
    use rusqlite::Connection;
239
    use tempfile::NamedTempFile;
240

            
241
    use super::*;
242

            
243
124
    #[derive(Clone, Debug)]
244
    pub struct SQLite {
245
        config: InsertConfig,
246
        sqlite: Arc<Mutex<Connection>>,
247
        _file: Arc<NamedTempFile>,
248
    }
249

            
250
    impl BenchmarkImplementation<Label, InsertConfig, Infallible> for SQLite {
251
        type SharedConfig = Self;
252

            
253
20
        fn label(number_of_threads: usize, _config: &InsertConfig) -> Label {
254
20
            Label::from(format!("sqlite-{:02}t", number_of_threads))
255
20
        }
256

            
257
20
        fn initialize_shared_config(
258
20
            _number_of_threads: usize,
259
20
            config: &InsertConfig,
260
20
        ) -> Result<Self::SharedConfig, Infallible> {
261
20
            let tmp_file = NamedTempFile::new_in(".").unwrap();
262
20
            let sqlite = Connection::open(&tmp_file).unwrap();
263
20
            sqlite
264
20
                .busy_timeout(std::time::Duration::from_secs(3600))
265
20
                .unwrap();
266
20

            
267
20
            #[cfg(any(target_os = "macos", target_os = "ios"))]
268
20
            {
269
20
                // On macOS with built-in SQLite versions, despite the name and the SQLite
270
20
                // documentation, this pragma makes SQLite use `fcntl(_, F_BARRIER_FSYNC,
271
20
                // _)`. There's not a good practical way to make rusqlite's access of SQLite
272
20
                // on macOS use `F_FULLFSYNC`, which skews benchmarks heavily in favor of
273
20
                // SQLite when not enabling this feature.
274
20
                //
275
20
                // Enabling this feature reduces the durability guarantees, which breaks
276
20
                // ACID compliance. Unless performance is critical on macOS or you know that
277
20
                // ACID compliance is not important for your application, this feature
278
20
                // should be left disabled.
279
20
                //
280
20
                // <https://bonsaidb.io/blog/acid-on-apple/>
281
20
                // <https://www.sqlite.org/pragma.html#pragma_fullfsync>
282
20
                sqlite.pragma_update(None, "fullfsync", "on").unwrap();
283
20
                println!("The shipping version of SQLite on macOS is not actually ACID-compliant. See this blog post:\n<https://bonsaidb.io/blog/acid-on-apple/>");
284
20
            }
285
20
            sqlite.pragma_update(None, "journal_mode", "WAL").unwrap();
286
20

            
287
20
            sqlite
288
20
                .execute("create table if not exists okaywal_inserts (data BLOB)", [])
289
20
                .unwrap();
290
20
            Ok(Self {
291
20
                config: *config,
292
20
                sqlite: Arc::new(Mutex::new(sqlite)),
293
20
                _file: Arc::new(tmp_file),
294
20
            })
295
20
        }
296

            
297
40
        fn reset(_shutting_down: bool) -> Result<(), Infallible> {
298
40
            Ok(())
299
40
        }
300

            
301
124
        fn initialize(
302
124
            _number_of_threads: usize,
303
124
            config: Self::SharedConfig,
304
124
        ) -> Result<Self, Infallible> {
305
124
            Ok(config)
306
124
        }
307

            
308
124
        fn measure(&mut self, measurements: &LabeledTimings<Label>) -> Result<(), Infallible> {
309
124
            let metric = Label::from(format!("commit-{}", Bytes(self.config.number_of_bytes)));
310
124
            let data = vec![42_u8; self.config.number_of_bytes];
311
29450
            for _ in 0..self.config.iters {
312
29450
                let measurement = measurements.begin(metric.clone());
313
29450
                let client = self.sqlite.lock().unwrap(); // SQLite doesn't have a way to allow multi-threaded write access to a single database.
314
29450
                client
315
29450
                    .execute("INSERT INTO okaywal_inserts (data) values ($1)", [&data])
316
29450
                    .unwrap();
317
29450
                drop(client);
318
29450
                measurement.finish();
319
29450
            }
320
124
            Ok(())
321
124
        }
322
    }
323
}
324

            
325
struct Bytes(usize);
326

            
327
impl Display for Bytes {
328
496
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
329
496
        let (size_number, size_label) = match self.0 {
330
496
            0..=1023 => (self.0, "B"),
331
372
            1_024..=1048575 => (self.0 / 1024, "KB"),
332
124
            1048576..=1073741823 => (self.0 / 1024 / 1024, "MB"),
333
            _ => unreachable!(),
334
        };
335
496
        write!(f, "{size_number}{size_label}")
336
496
    }
337
}