1
use std::fmt::Debug;
2
use std::num::NonZeroUsize;
3
use std::ops::Range;
4
use std::path::Path;
5
use std::sync::Arc;
6

            
7
use rand::prelude::StdRng;
8
use rand::{Rng, SeedableRng};
9
use sediment::format::TransactionId;
10
use sediment::Database;
11
use timings::{Benchmark, BenchmarkImplementation, Label, LabeledTimings, Timings};
12

            
13
const ITERS: u128 = 100;
14
const INSERTS_PER_BATCH: usize = 20;
15

            
16
1
fn main() {
17
1
    #[cfg(any(target_os = "macos", target_os = "ios"))]
18
1
    {
19
1
        if cfg!(feature = "sqlite") && !cfg!(feature = "fbarrier-fsync") {
20
1
            eprintln!("SQLite bundled in macOS uses F_BARRIERFSYNC instead of F_FULLFSYNC, which means it does not provide ACID guarantees. Enable feature `fbarrier-fsync` to configure Sediment to use the same synchronization primitive. See <https://bonsaidb.io/blog/acid-on-apple/> for more information.");
21
1
        }
22
1

            
23
1
        if cfg!(feature = "rocksdb") {
24
1
            if cfg!(feature = "fbarrier-fsync") {
25
1
                eprintln!("RocksDB prior to 7.3.1 only utilizes fdatasync. As of writing this, RocksDB does not support F_BARRIERFSYNC. The current version used by the rocksdb crate is 7.1.2.");
26
1
                eprintln!("rocksdb crate's built version: <https://github.com/rust-rocksdb/rust-rocksdb/blob/master/librocksdb-sys/build_version.cc#L11>");
27
1
                eprintln!("ACID on Apple: <https://bonsaidb.io/blog/acid-on-apple/>");
28
1
            } else {
29
1
                eprintln!("RocksDB does not use F_FULLFSYNC until version 7.3.1. The current version used by the rocksdb crate is 7.1.2.");
30
1
                eprintln!("rocksdb crate's built version: <https://github.com/rust-rocksdb/rust-rocksdb/blob/master/librocksdb-sys/build_version.cc#L11>");
31
1
            }
32
1
        }
33
1
    }
34
1

            
35
1
    let measurements = Timings::default();
36
1

            
37
1
    let source = vec![0; 4096];
38
1
    let mut ranges = Vec::new();
39
1
    let mut rng = StdRng::from_seed([0; 32]);
40
101
    for _ in 0..ITERS {
41
100
        let mut batch = Vec::with_capacity(rng.gen_range(1..INSERTS_PER_BATCH));
42
930
        for _ in 0..batch.capacity() {
43
930
            let start = rng.gen_range(0..source.len());
44
930
            let end = rng.gen_range(start..source.len());
45
930
            batch.push(start..end);
46
930
        }
47
100
        ranges.push(batch);
48
    }
49

            
50
1
    let threads = std::thread::available_parallelism()
51
1
        .map(NonZeroUsize::get)
52
1
        .unwrap_or(4)
53
1
        .max(4);
54
1

            
55
1
    let mut benchmark = Benchmark::for_config(Arc::new(ThreadedInsertsData { source, ranges }))
56
1
        .with_each_number_of_threads([threads * 4, threads * 2, threads, 1]);
57
1

            
58
1
    #[cfg(feature = "sqlite")]
59
1
    {
60
1
        benchmark = benchmark.with::<SqliteThreadedInserts>();
61
1
    }
62
1
    // #[cfg(feature = "rocksdb")]
63
1
    // {
64
1
    //     benchmark = benchmark.with::<self::rocksdb::ThreadedInserts>();
65
1
    // }
66
1

            
67
1
    benchmark = benchmark.with::<SedimentThreadedInserts>();
68
1

            
69
1
    benchmark.run(&measurements).unwrap();
70
1
    // return;
71
1

            
72
1
    measure_sediment(&measurements);
73
1
    #[cfg(feature = "marble")]
74
1
    marble::measure(&measurements);
75
1
    #[cfg(feature = "sqlite")]
76
1
    measure_sqlite(&measurements);
77
1
    // #[cfg(feature = "rocksdb")]
78
1
    // self::rocksdb::measure(&measurements);
79
1

            
80
1
    let stats = measurements.wait_for_stats();
81
1
    timings::print_table_summaries(&stats).unwrap();
82
1
}
83

            
84
1
fn measure_sediment(measurements: &Timings<String>) {
85
1
    let path = Path::new(".bench-suite.sediment");
86
1
    if path.exists() {
87
        std::fs::remove_dir_all(path).unwrap();
88
1
    }
89

            
90
1
    let sediment = Database::recover(path).unwrap();
91
1
    let mut checkpoint_to = TransactionId::default();
92
101
    for i in 0_u128..ITERS {
93
100
        let measurement = measurements.begin("sediment", String::from("insert 16b"));
94
100
        let mut session = sediment.begin_transaction().unwrap();
95
100
        session.write(&i.to_le_bytes()).unwrap();
96
100
        session.checkpoint_to(checkpoint_to).unwrap();
97
100
        checkpoint_to = session.commit().unwrap();
98
100
        measurement.finish();
99
100
    }
100

            
101
1
    sediment.shutdown().unwrap();
102
1
    std::fs::remove_dir_all(path).unwrap();
103
1
}
104

            
105
#[cfg(feature = "sqlite")]
/// Measures SQLite's single-threaded throughput: one 16-byte blob per
/// transaction, `ITERS` transactions.
fn measure_sqlite(measurements: &Timings<String>) {
    let path = Path::new("./bench-suite.sqlite");
    if path.exists() {
        std::fs::remove_file(path).unwrap();
    }
    let mut db = initialize_sqlite(path);

    for counter in 0_u128..ITERS {
        let timer = measurements.begin("sqlite", String::from("insert 16b"));
        let tx = db.transaction().unwrap();
        tx.execute("insert into blobs (value) values ($1)", [&counter.to_le_bytes()])
            .unwrap();
        tx.commit().unwrap();
        timer.finish();
    }

    // Close the connection before removing the database file.
    drop(db);
    std::fs::remove_file(path).unwrap();
}
125

            
126
#[cfg(feature = "sqlite")]
/// Opens (or creates) a SQLite database at `path` and prepares it for the
/// benchmarks: a long busy timeout for the multi-writer runs and a `blobs`
/// table to insert into.
fn initialize_sqlite(path: &Path) -> rusqlite::Connection {
    let connection = rusqlite::Connection::open(path).unwrap();
    // Several benchmark threads write to the same file; wait (up to an hour)
    // rather than failing with SQLITE_BUSY.
    connection
        .busy_timeout(std::time::Duration::from_secs(3600))
        .unwrap();

    #[cfg(any(target_os = "macos", target_os = "ios"))]
    {
        // Despite its name and the SQLite documentation, on macOS's bundled
        // SQLite this pragma causes `fcntl(_, F_BARRIER_FSYNC, _)` to be used.
        // There is no practical way to force rusqlite's SQLite on macOS to use
        // `F_FULLFSYNC`, which would otherwise skew the comparison heavily in
        // SQLite's favor when the `fbarrier-fsync` feature is disabled.
        //
        // Enabling that feature weakens durability guarantees and breaks ACID
        // compliance; unless macOS performance is critical or ACID compliance
        // is known to be unimportant for the application, it should stay off.
        //
        // <https://bonsaidb.io/blog/acid-on-apple/>
        // <https://www.sqlite.org/pragma.html#pragma_fullfsync>
        connection.pragma_update(None, "fullfsync", "on").unwrap();
    }

    connection
        .execute("create table if not exists blobs (value BLOB)", [])
        .unwrap();
    connection
}
156

            
157
#[cfg(feature = "marble")]
mod marble {
    use super::*;

    /// Measures marble's single-threaded throughput: one 16-byte object per
    /// write batch, running `maintenance` after every batch so its cost is
    /// included in the timing.
    pub fn measure(measurements: &Timings<String>) {
        let path = Path::new("./bench-suite.marble");
        if path.exists() {
            std::fs::remove_dir_all(path).unwrap();
        }
        let db = ::marble::open(path).unwrap();

        for counter in 0_u128..ITERS {
            let timer = measurements.begin("marble", String::from("insert 16b"));
            // Ids start at 1 — presumably marble reserves id 0; confirm
            // against the marble documentation.
            db.write_batch([(counter as u64 + 1, Some(counter.to_le_bytes()))])
                .unwrap();
            db.maintenance().unwrap();
            timer.finish();
        }

        // Close before deleting the data directory.
        drop(db);
        std::fs::remove_dir_all(path).unwrap();
    }
}
181

            
182
/// Shared, read-only workload for the threaded insert benchmarks: a source
/// byte buffer and, per transaction, the subranges of that buffer to insert.
pub struct ThreadedInsertsData {
    source: Vec<u8>,
    ranges: Vec<Vec<Range<usize>>>,
}

impl Debug for ThreadedInsertsData {
    /// Summarizes the workload (batch and insert counts) instead of dumping
    /// thousands of individual ranges.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut total_inserts = 0;
        for batch in &self.ranges {
            total_inserts += batch.len();
        }
        f.debug_struct("ThreadedInsertsData")
            .field("batches", &self.ranges.len())
            .field("total_inserts", &total_inserts)
            .finish_non_exhaustive()
    }
}
197

            
198
29
#[derive(Debug, Clone)]
199
pub struct SedimentThreadedInserts {
200
    db: Arc<Database>,
201
    number_of_threads: usize,
202
    data: Arc<ThreadedInsertsData>,
203
}
204

            
205
impl BenchmarkImplementation<String, Arc<ThreadedInsertsData>, ()> for SedimentThreadedInserts {
206
    type SharedConfig = Self;
207

            
208
4
    fn initialize_shared_config(
209
4
        number_of_threads: usize,
210
4
        config: &Arc<ThreadedInsertsData>,
211
4
    ) -> Result<Self::SharedConfig, ()> {
212
4
        Ok(Self {
213
4
            db: Arc::new(Database::recover(".threaded-inserts.sediment").unwrap()),
214
4
            number_of_threads,
215
4
            data: config.clone(),
216
4
        })
217
4
    }
218

            
219
8
    fn reset(shutting_down: bool) -> Result<(), ()> {
220
8
        if shutting_down {
221
4
            let path = Path::new(".threaded-inserts.sediment");
222
4
            if path.exists() {
223
4
                println!("Cleaning up.");
224
4
                std::fs::remove_dir_all(path).unwrap();
225
4
            }
226
4
        }
227
8
        Ok(())
228
8
    }
229

            
230
29
    fn initialize(_number_of_threads: usize, config: Self) -> Result<Self, ()> {
231
29
        Ok(config)
232
29
    }
233

            
234
29
    fn measure(&mut self, measurements: &LabeledTimings<String>) -> Result<(), ()> {
235
29
        let mut checkpoint_to = TransactionId::default();
236
2900
        for batch in &self.data.ranges {
237
2900
            let measurement =
238
2900
                measurements.begin(format!("{}-threads-inserts", self.number_of_threads));
239
2900
            let mut session = self.db.begin_transaction().unwrap();
240
29870
            for range in batch {
241
26970
                session.write(&self.data.source[range.clone()]).unwrap();
242
26970
            }
243
2900
            session.checkpoint_to(checkpoint_to).unwrap();
244
2900
            checkpoint_to = session.commit().unwrap();
245
2900
            //session.commit().unwrap();
246
2900
            measurement.finish();
247
        }
248

            
249
        // dbg!(self.db.statistics());
250

            
251
29
        Ok(())
252
29
    }
253

            
254
4
    fn label(_number_of_threads: usize, _config: &Arc<ThreadedInsertsData>) -> Label {
255
4
        Label::from("sediment")
256
4
    }
257
}
258

            
259
impl Drop for SedimentThreadedInserts {
260
33
    fn drop(&mut self) {
261
33
        if Arc::strong_count(&self.db) == 1 {
262
4
            // This is the last instance of this database, shut it down before
263
4
            // the benchmark invokes reset. Otherwise, the checkpointer or the
264
4
            // wal could encounter an error after the files are deleted out from
265
4
            // underneath it.
266
4
            self.db.as_ref().clone().shutdown().unwrap();
267
29
        }
268
33
    }
269
}
270

            
271
#[cfg(feature = "sqlite")]
/// Per-thread state for the SQLite threaded-insert benchmark. Unlike the
/// sediment variant, each worker opens its own connection in `measure`.
#[derive(Debug)]
pub struct SqliteThreadedInserts {
    // Recorded so measurements can be labeled per thread-count configuration.
    number_of_threads: usize,
    // Pre-generated insert batches shared by all workers.
    data: Arc<ThreadedInsertsData>,
}
277

            
278
#[cfg(feature = "sqlite")]
impl BenchmarkImplementation<String, Arc<ThreadedInsertsData>, ()> for SqliteThreadedInserts {
    type SharedConfig = Arc<ThreadedInsertsData>;

    /// The workload itself is the only shared state; connections are opened
    /// per thread inside `measure`.
    fn initialize_shared_config(
        _number_of_threads: usize,
        config: &Arc<ThreadedInsertsData>,
    ) -> Result<Self::SharedConfig, ()> {
        Ok(config.clone())
    }

    fn initialize(number_of_threads: usize, config: Arc<ThreadedInsertsData>) -> Result<Self, ()> {
        Ok(Self {
            number_of_threads,
            data: config,
        })
    }

    /// Opens a private connection to the shared database file and replays
    /// every pre-generated batch as one transaction.
    fn measure(&mut self, measurements: &LabeledTimings<String>) -> Result<(), ()> {
        let path = Path::new(".threaded-inserts.sqlite3");
        let mut db = initialize_sqlite(path);

        for batch in &self.data.ranges {
            let timer =
                measurements.begin(format!("{}-threads-inserts", self.number_of_threads));
            let tx = db.transaction().unwrap();
            for range in batch {
                let blob = &self.data.source[range.clone()];
                tx.execute("insert into blobs (value) values ($1)", [blob])
                    .unwrap();
            }
            tx.commit().unwrap();
            timer.finish();
        }

        Ok(())
    }

    /// Deletes the database file between configurations and at shutdown.
    fn reset(_shutting_down: bool) -> Result<(), ()> {
        let path = Path::new(".threaded-inserts.sqlite3");
        if path.exists() {
            std::fs::remove_file(path).unwrap();
        }
        Ok(())
    }

    fn label(_number_of_threads: usize, _config: &Arc<ThreadedInsertsData>) -> Label {
        Label::from("sqlite")
    }
}
330

            
331
// #[cfg(feature = "rocksdb")]
332
// mod rocksdb {
333
//     use std::path::Path;
334
//     use std::sync::atomic::{AtomicU64, Ordering};
335
//     use std::sync::Arc;
336

            
337
//     use rocksdb::{DBWithThreadMode, MultiThreaded, WriteBatch, WriteOptions, DB};
338
//     use timings::{BenchmarkImplementation, LabeledTimings, Timings};
339

            
340
//     use super::ITERS;
341
//     use crate::ThreadedInsertsData;
342

            
343
//     pub fn measure(measurements: &Timings<String>) {
344
//         let path = Path::new("./bench-suite.rocksdb");
345
//         if path.exists() {
346
//             std::fs::remove_dir_all(path).unwrap();
347
//         }
348
//         let db = DB::open_default(path).unwrap();
349
//         let mut write_opts = WriteOptions::new();
350
//         write_opts.set_sync(true);
351

            
352
//         for i in 0_u128..ITERS {
353
//             let measurement = measurements.begin("rocksdb", String::from("insert 16b"));
354

            
355
//             db.put_opt(i.to_be_bytes(), i.to_le_bytes(), &write_opts)
356
//                 .unwrap();
357
//             measurement.finish();
358
//         }
359

            
360
//         drop(db);
361
//         std::fs::remove_dir_all(path).unwrap();
362
//     }
363

            
364
//     pub struct ThreadedInserts {
365
//         number_of_threads: usize,
366
//         config: ThreadedInsertsConfig,
367
//     }
368

            
369
//     #[derive(Clone)]
370
//     pub struct ThreadedInsertsConfig {
371
//         db: Arc<rocksdb::DBWithThreadMode<MultiThreaded>>,
372
//         unique_id_counter: Arc<AtomicU64>,
373
//         data: Arc<ThreadedInsertsData>,
374
//     }
375

            
376
//     impl BenchmarkImplementation<String, Arc<ThreadedInsertsData>, ()> for ThreadedInserts {
377
//         type SharedConfig = ThreadedInsertsConfig;
378

            
379
//         fn initialize_shared_config(
380
//             _number_of_threads: usize,
381
//             config: &Arc<ThreadedInsertsData>,
382
//         ) -> Result<Self::SharedConfig, ()> {
383
//             let path = Path::new("./.threaded-inserts.rocksdb");
384
//             let db = DBWithThreadMode::<MultiThreaded>::open_default(path).unwrap();
385
//             Ok(ThreadedInsertsConfig {
386
//                 db: Arc::new(db),
387
//                 unique_id_counter: Arc::default(),
388
//                 data: config.clone(),
389
//             })
390
//         }
391

            
392
//         fn initialize(number_of_threads: usize, config: ThreadedInsertsConfig) -> Result<Self, ()> {
393
//             Ok(Self {
394
//                 number_of_threads,
395
//                 config,
396
//             })
397
//         }
398

            
399
//         #[allow(clippy::unnecessary_to_owned)] // TODO submit PR against rocksdb to allow ?Sized
400
//         fn measure(&mut self, measurements: &LabeledTimings<String>) -> Result<(), ()> {
401
//             let mut write_opts = WriteOptions::new();
402
//             write_opts.set_sync(true);
403
//             for batch in &self.config.data.ranges {
404
//                 let measurement =
405
//                     measurements.begin(format!("{}-threads-inserts", self.number_of_threads));
406
//                 let mut write_batch = WriteBatch::default();
407

            
408
//                 for range in batch {
409
//                     let unique_id = self.config.unique_id_counter.fetch_add(1, Ordering::SeqCst);
410
//                     write_batch.put(
411
//                         unique_id.to_be_bytes(),
412
//                         &self.config.data.source[range.clone()].to_vec(),
413
//                     );
414
//                 }
415
//                 self.config.db.write_opt(write_batch, &write_opts).unwrap();
416
//                 measurement.finish();
417
//             }
418

            
419
//             Ok(())
420
//         }
421

            
422
//         fn reset(shutting_down: bool) -> Result<(), ()> {
423
//             let path = Path::new("./.threaded-inserts.rocksdb");
424
//             if path.exists() {
425
//                 std::fs::remove_dir_all(path).unwrap();
426
//             }
427
//             if !shutting_down {
428
//                 std::fs::create_dir(path).unwrap();
429
//             }
430
//             Ok(())
431
//         }
432

            
433
//         fn label(_number_of_threads: usize, _config: &Arc<ThreadedInsertsData>) -> timings::Label {
434
//             "rocksdb".into()
435
//         }
436
//     }
437
// }