//! Net document storage backed by sqlite3.
//!
//! We store most objects in sqlite tables, except for very large ones,
//! which we store as "blob" files in a separate directory.
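//!
//! As a rough sketch (derived from the path handling in
//! `from_path_and_mistrust` below), the on-disk layout looks like this:
//!
//! ```text
//! <cache dir>/
//! ├── dir.sqlite3   (the sqlite3 database)
//! ├── dir.lock      (lockfile preventing concurrent read-write access)
//! └── dir_blobs/    (large documents, one file per blob)
//! ```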

use super::ExpirationConfig;
use crate::docmeta::{AuthCertMeta, ConsensusMeta};
use crate::err::ReadOnlyStorageError;
use crate::storage::{InputString, Store};
use crate::{Error, Result};

use fs_mistrust::CheckedDir;
use tor_basic_utils::PathExt as _;
use tor_error::warn_report;
use tor_netdoc::doc::authcert::AuthCertKeyIds;
use tor_netdoc::doc::microdesc::MdDigest;
use tor_netdoc::doc::netstatus::{ConsensusFlavor, Lifetime};
#[cfg(feature = "routerdesc")]
use tor_netdoc::doc::routerdesc::RdDigest;

#[cfg(feature = "bridge-client")]
pub(crate) use {crate::storage::CachedBridgeDescriptor, tor_guardmgr::bridge::BridgeConfig};

use std::collections::HashMap;
use std::fs::OpenOptions;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::SystemTime;

use rusqlite::{params, OpenFlags, OptionalExtension, Transaction};
use time::OffsetDateTime;
use tracing::{trace, warn};

/// Local directory cache using a Sqlite3 connection.
pub(crate) struct SqliteStore {
    /// Connection to the sqlite3 database.
    conn: rusqlite::Connection,
    /// Location for the sqlite3 database; used to reopen it.
    sql_path: Option<PathBuf>,
    /// Location to store blob files.
    blob_dir: CheckedDir,
    /// Lockfile to prevent concurrent write attempts from different
    /// processes.
    ///
    /// If this is None we aren't using a lockfile.  Watch out!
    ///
    /// (sqlite supports that with connection locking, but we want to
    /// be a little more coarse-grained here)
    lockfile: Option<fslock::LockFile>,
}

impl SqliteStore {
    /// Construct or open a new SqliteStore at some location on disk.
    /// The provided location must be a directory, or a possible
    /// location for a directory: the directory will be created if
    /// necessary.
    ///
    /// If `readonly` is true, the result will be a read-only store.
    /// Otherwise, when `readonly` is false, the result may be
    /// read-only or read-write, depending on whether we can acquire
    /// the lock.
    ///
    /// # Limitations
    ///
    /// The file locking that we use to ensure that only one dirmgr is
    /// writing to a given storage directory at a time is currently
    /// _per process_. Therefore, you might get unexpected results if
    /// two SqliteStores are created in the same process with the same
    /// path.
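    ///
    /// # Example
    ///
    /// A minimal usage sketch (not a compiled doctest, since this type is
    /// crate-private; the cache directory path is hypothetical):
    ///
    /// ```ignore
    /// let mistrust = fs_mistrust::Mistrust::default();
    /// // Open or create the store, taking the write lock if we can.
    /// let store = SqliteStore::from_path_and_mistrust("./cache", &mistrust, false)?;
    /// assert!(!store.is_readonly());
    /// ```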
    pub(crate) fn from_path_and_mistrust<P: AsRef<Path>>(
        path: P,
        mistrust: &fs_mistrust::Mistrust,
        mut readonly: bool,
    ) -> Result<Self> {
        let path = path.as_ref();
        let sqlpath = path.join("dir.sqlite3");
        let blobpath = path.join("dir_blobs/");
        let lockpath = path.join("dir.lock");

        let verifier = mistrust.verifier().permit_readable().check_content();

        let blob_dir = if readonly {
            verifier.secure_dir(blobpath)?
        } else {
            verifier.make_secure_dir(blobpath)?
        };

        // Check permissions on the sqlite and lock files; don't require them to
        // exist.
        for p in [&lockpath, &sqlpath] {
            match mistrust
                .verifier()
                .permit_readable()
                .require_file()
                .check(p)
            {
                Ok(()) | Err(fs_mistrust::Error::NotFound(_)) => {}
                Err(e) => return Err(e.into()),
            }
        }

        let mut lockfile = fslock::LockFile::open(&lockpath).map_err(Error::from_lockfile)?;
        if !readonly && !lockfile.try_lock().map_err(Error::from_lockfile)? {
            readonly = true; // we couldn't get the lock!
        };
        let flags = if readonly {
            OpenFlags::SQLITE_OPEN_READ_ONLY
        } else {
            OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE
        };
        let conn = rusqlite::Connection::open_with_flags(&sqlpath, flags)?;
        let mut store = SqliteStore::from_conn_internal(conn, blob_dir, readonly)?;
        store.sql_path = Some(sqlpath);
        store.lockfile = Some(lockfile);
        Ok(store)
    }

    /// Construct a new SqliteStore from a database connection and a location
    /// for blob files.
    ///
    /// Used for testing with a memory-backed database.
    ///
    /// Note: `blob_dir` must not be used for anything other than storing the blobs associated with
    /// this database, since we will freely remove unreferenced files from this directory.
    #[cfg(test)]
    fn from_conn(conn: rusqlite::Connection, blob_dir: CheckedDir) -> Result<Self> {
        Self::from_conn_internal(conn, blob_dir, false)
    }

    /// Construct a new SqliteStore from a database connection and a location
    /// for blob files.
    ///
    /// The `readonly` argument specifies whether the database connection should be read-only.
    fn from_conn_internal(
        conn: rusqlite::Connection,
        blob_dir: CheckedDir,
        readonly: bool,
    ) -> Result<Self> {
        // sqlite (as of Jun 2024) does not enforce foreign keys automatically unless you set this
        // pragma on the connection.
        conn.pragma_update(None, "foreign_keys", "ON")?;

        let mut result = SqliteStore {
            conn,
            blob_dir,
            lockfile: None,
            sql_path: None,
        };

        result.check_schema(readonly)?;

        Ok(result)
    }

    /// Check whether this database has a schema format we can read, and
    /// install or upgrade the schema if necessary.
    fn check_schema(&mut self, readonly: bool) -> Result<()> {
        let tx = self.conn.transaction()?;
        let db_n_tables: u32 = tx.query_row(
            "SELECT COUNT(name) FROM sqlite_master
             WHERE type='table'
             AND name NOT LIKE 'sqlite_%'",
            [],
            |row| row.get(0),
        )?;
        let db_exists = db_n_tables > 0;

        // Update the schema from current_vsn to the latest (does not commit).
        let update_schema = |tx: &rusqlite::Transaction, current_vsn| {
            for (from_vsn, update) in UPDATE_SCHEMA.iter().enumerate() {
                let from_vsn = u32::try_from(from_vsn).expect("schema version >2^32");
                let new_vsn = from_vsn + 1;
                if current_vsn < new_vsn {
                    tx.execute_batch(update)?;
                    tx.execute(UPDATE_SCHEMA_VERSION, params![new_vsn, new_vsn])?;
                }
            }
            Ok::<_, Error>(())
        };

        if !db_exists {
            if !readonly {
                tx.execute_batch(INSTALL_V0_SCHEMA)?;
                update_schema(&tx, 0)?;
                tx.commit()?;
            } else {
                // The other process should have created the database!
                return Err(Error::ReadOnlyStorage(ReadOnlyStorageError::NoDatabase));
            }
            return Ok(());
        }

        let (version, readable_by): (u32, u32) = tx.query_row(
            "SELECT version, readable_by FROM TorSchemaMeta
             WHERE name = 'TorDirStorage'",
            [],
            |row| Ok((row.get(0)?, row.get(1)?)),
        )?;

        if version < SCHEMA_VERSION {
            if !readonly {
                update_schema(&tx, version)?;
                tx.commit()?;
            } else {
                return Err(Error::ReadOnlyStorage(
                    ReadOnlyStorageError::IncompatibleSchema {
                        schema: version,
                        supported: SCHEMA_VERSION,
                    },
                ));
            }

            return Ok(());
        } else if readable_by > SCHEMA_VERSION {
            return Err(Error::UnrecognizedSchema {
                schema: readable_by,
                supported: SCHEMA_VERSION,
            });
        }

        // Dropping `tx` rolls back the transaction; but nothing was done.
        Ok(())
    }

    /// Read a blob from disk, mapping it if possible.
    ///
    /// Returns `Ok(None)` if the file for the blob was not found on disk
    /// (in which case its entry is also removed from the ExtDocs table);
    /// returns an error in other cases.
    fn read_blob(&self, path: &str) -> Result<Option<InputString>> {
        let file = match self.blob_dir.open(path, OpenOptions::new().read(true)) {
            Ok(file) => file,
            Err(fs_mistrust::Error::NotFound(_)) => {
                warn!(
                    "{:?} was listed in the database, but its corresponding file had been deleted",
                    path
                );
                self.conn
                    .execute(DELETE_EXTDOC_BY_FILENAME, params![path])?;
                return Ok(None);
            }
            Err(e) => return Err(e.into()),
        };

        InputString::load(file)
            .map_err(|err| Error::CacheFile {
                action: "loading",
                fname: PathBuf::from(path),
                error: Arc::new(err),
            })
            .map(Some)
    }

    /// Write a file to disk as a blob, and record it in the ExtDocs table.
    ///
    /// Return a SavedBlobHandle that describes where the blob is, and which
    /// can be used either to commit the blob or delete it.
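    ///
    /// # Example
    ///
    /// A sketch of the intended commit-or-roll-back pattern (not a doctest;
    /// compare `save_blob` and `store_consensus` below):
    ///
    /// ```ignore
    /// let h = store.save_blob_internal(contents, doctype, dtype, &digest, expires)?;
    /// // ... add any rows that reference the blob to h.tx here ...
    /// h.tx.commit()?;      // Make the database entries permanent,
    /// h.unlinker.forget(); // and keep the blob file on disk.
    /// // (If we drop `h` instead, the transaction rolls back and the
    /// // blob file is unlinked.)
    /// ```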
    fn save_blob_internal(
        &mut self,
        contents: &[u8],
        doctype: &str,
        dtype: &str,
        digest: &[u8],
        expires: OffsetDateTime,
    ) -> Result<SavedBlobHandle<'_>> {
        let digest = hex::encode(digest);
        let digeststr = format!("{}-{}", dtype, digest);
        let fname = format!("{}_{}", doctype, digeststr);

        let full_path = self.blob_dir.join(&fname)?;
        let unlinker = Unlinker::new(&full_path);
        self.blob_dir
            .write_and_replace(&fname, contents)
            .map_err(|e| match e {
                fs_mistrust::Error::Io { err, .. } => Error::CacheFile {
                    action: "saving",
                    fname: full_path,
                    error: err,
                },
                err => err.into(),
            })?;

        let tx = self.conn.unchecked_transaction()?;
        tx.execute(INSERT_EXTDOC, params![digeststr, expires, dtype, fname])?;

        Ok(SavedBlobHandle {
            tx,
            fname,
            digeststr,
            unlinker,
        })
    }

    /// Save a blob to disk and commit it.
    #[cfg(test)]
    fn save_blob(
        &mut self,
        contents: &[u8],
        doctype: &str,
        dtype: &str,
        digest: &[u8],
        expires: OffsetDateTime,
    ) -> Result<String> {
        let h = self.save_blob_internal(contents, doctype, dtype, digest, expires)?;
        let SavedBlobHandle {
            tx,
            digeststr,
            fname,
            unlinker,
        } = h;
        let _ = digeststr;
        tx.commit()?;
        unlinker.forget();
        Ok(fname)
    }

    /// Return the valid-after time for the latest non-pending consensus.
    #[cfg(test)]
    // We should revise the tests to use latest_consensus_meta instead.
    fn latest_consensus_time(&self, flavor: ConsensusFlavor) -> Result<Option<OffsetDateTime>> {
        Ok(self
            .latest_consensus_meta(flavor)?
            .map(|m| m.lifetime().valid_after().into()))
    }

    /// Remove the blob with name `fname`, but do not give an error on failure.
    fn remove_blob_or_warn<P: AsRef<Path>>(&self, fname: P) {
        let fname = fname.as_ref();
        if let Err(e) = self.blob_dir.remove_file(fname) {
            warn_report!(e, "Unable to remove {}", fname.display_lossy());
        }
    }

    /// Delete any blob files that are old enough, and not mentioned in the ExtDocs table.
    ///
    /// There shouldn't actually be any, but we don't want to let our cache grow infinitely
    /// if we have a bug.
    fn remove_unreferenced_blobs(
        &self,
        now: OffsetDateTime,
        expiration: &ExpirationConfig,
    ) -> Result<()> {
        // Now, look for any unreferenced blobs that are a bit old.
        for ent in self.blob_dir.read_directory(".")?.flatten() {
            let md_error = |io_error| Error::CacheFile {
                action: "getting metadata",
                fname: ent.file_name().into(),
                error: Arc::new(io_error),
            };
            if ent
                .metadata()
                .map_err(md_error)?
                .modified()
                .map_err(md_error)?
                + expiration.consensuses
                >= now
            {
                // This file is sufficiently recent that we should not remove it, just to be cautious.
                continue;
            }
            let filename = match ent.file_name().into_string() {
                Ok(s) => s,
                Err(os_str) => {
                    // This filename wasn't utf-8.  We will never create one of these.
                    warn!(
                        "Removing bizarre file '{}' from blob store.",
                        os_str.to_string_lossy()
                    );
                    self.remove_blob_or_warn(ent.file_name());
                    continue;
                }
            };
            let found: (u32,) =
                self.conn
                    .query_row(COUNT_EXTDOC_BY_PATH, params![&filename], |row| {
                        row.try_into()
                    })?;
            if found == (0,) {
                warn!("Removing unreferenced file '{}' from blob store", &filename);
                self.remove_blob_or_warn(ent.file_name());
            }
        }

        Ok(())
    }
}

impl Store for SqliteStore {
    fn is_readonly(&self) -> bool {
        match &self.lockfile {
            Some(f) => !f.owns_lock(),
            None => false,
        }
    }

    fn upgrade_to_readwrite(&mut self) -> Result<bool> {
        if self.is_readonly() && self.sql_path.is_some() {
            let lf = self
                .lockfile
                .as_mut()
                .expect("No lockfile open; cannot upgrade to read-write storage");
            if !lf.try_lock().map_err(Error::from_lockfile)? {
                // Somebody else has the lock.
                return Ok(false);
            }
            // Unwrap should be safe due to parent `.is_some()` check
            #[allow(clippy::unwrap_used)]
            match rusqlite::Connection::open(self.sql_path.as_ref().unwrap()) {
                Ok(conn) => {
                    self.conn = conn;
                }
                Err(e) => {
                    let _ignore = lf.unlock();
                    return Err(e.into());
                }
            }
        }
        Ok(true)
    }

    fn expire_all(&mut self, expiration: &ExpirationConfig) -> Result<()> {
        let tx = self.conn.transaction()?;
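        // Note the filenames of the expired blobs *before* dropping their
        // rows below, so that we can unlink the files once the deletion has
        // been committed.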
        // This works around a false positive; see
        //   https://github.com/rust-lang/rust-clippy/issues/8114
        #[allow(clippy::let_and_return)]
        let expired_blobs: Vec<String> = {
            let mut stmt = tx.prepare(FIND_EXPIRED_EXTDOCS)?;
            let names = stmt
                .query_map([], |row| row.get::<_, String>(0))?
                .filter_map(std::result::Result::ok)
                .collect();
            names
        };

        let now = OffsetDateTime::now_utc();
        tx.execute(DROP_OLD_EXTDOCS, [])?;

        // In theory bad system clocks might generate table rows with times far in the future.
        // However, the data cached here comes from the network consensus, and we rely on
        // the fact that no consensus from the future exists, so this can't happen.
        tx.execute(DROP_OLD_MICRODESCS, [now - expiration.microdescs])?;
        tx.execute(DROP_OLD_AUTHCERTS, [now - expiration.authcerts])?;
        tx.execute(DROP_OLD_CONSENSUSES, [now - expiration.consensuses])?;
        tx.execute(DROP_OLD_ROUTERDESCS, [now - expiration.router_descs])?;

        // Bridge descriptors come from bridges, and bridges might send crazy times,
        // so we need to discard any that look like they are from the future,
        // since otherwise wrong far-future timestamps might live in our DB indefinitely.
        #[cfg(feature = "bridge-client")]
        tx.execute(DROP_OLD_BRIDGEDESCS, [now, now])?;

        tx.commit()?;
        for name in expired_blobs {
            let fname = self.blob_dir.join(name);
            if let Ok(fname) = fname {
                let _ignore = std::fs::remove_file(fname);
            }
        }

        self.remove_unreferenced_blobs(now, expiration)?;

        Ok(())
    }

    fn latest_consensus(
        &self,
        flavor: ConsensusFlavor,
        pending: Option<bool>,
    ) -> Result<Option<InputString>> {
        trace!(?flavor, ?pending, "Loading latest consensus from cache");
        let rv: Option<(OffsetDateTime, OffsetDateTime, String)> = match pending {
            None => self
                .conn
                .query_row(FIND_CONSENSUS, params![flavor.name()], |row| row.try_into())
                .optional()?,
            Some(pending_val) => self
                .conn
                .query_row(
                    FIND_CONSENSUS_P,
                    params![pending_val, flavor.name()],
                    |row| row.try_into(),
                )
                .optional()?,
        };

        if let Some((_va, _vu, filename)) = rv {
            // TODO: If the cache is corrupt (because this blob is missing), and the cache has not yet
            // been cleaned, this may fail to find the latest consensus that we actually have.
            self.read_blob(&filename)
        } else {
            Ok(None)
        }
    }

    fn latest_consensus_meta(&self, flavor: ConsensusFlavor) -> Result<Option<ConsensusMeta>> {
        let mut stmt = self.conn.prepare(FIND_LATEST_CONSENSUS_META)?;
        let mut rows = stmt.query(params![flavor.name()])?;
        if let Some(row) = rows.next()? {
            Ok(Some(cmeta_from_row(row)?))
        } else {
            Ok(None)
        }
    }

    #[cfg(test)]
    fn consensus_by_meta(&self, cmeta: &ConsensusMeta) -> Result<InputString> {
        if let Some((text, _)) =
            self.consensus_by_sha3_digest_of_signed_part(cmeta.sha3_256_of_signed())?
        {
            Ok(text)
        } else {
            Err(Error::CacheCorruption(
                "couldn't find a consensus we thought we had.",
            ))
        }
    }

    fn consensus_by_sha3_digest_of_signed_part(
        &self,
        d: &[u8; 32],
    ) -> Result<Option<(InputString, ConsensusMeta)>> {
        let digest = hex::encode(d);
        let mut stmt = self
            .conn
            .prepare(FIND_CONSENSUS_AND_META_BY_DIGEST_OF_SIGNED)?;
        let mut rows = stmt.query(params![digest])?;
        if let Some(row) = rows.next()? {
            let meta = cmeta_from_row(row)?;
            let fname: String = row.get(5)?;
            if let Some(text) = self.read_blob(&fname)? {
                return Ok(Some((text, meta)));
            }
        }
        Ok(None)
    }

    fn store_consensus(
        &mut self,
        cmeta: &ConsensusMeta,
        flavor: ConsensusFlavor,
        pending: bool,
        contents: &str,
    ) -> Result<()> {
        let lifetime = cmeta.lifetime();
        let sha3_of_signed = cmeta.sha3_256_of_signed();
        let sha3_of_whole = cmeta.sha3_256_of_whole();
        let valid_after: OffsetDateTime = lifetime.valid_after().into();
        let fresh_until: OffsetDateTime = lifetime.fresh_until().into();
        let valid_until: OffsetDateTime = lifetime.valid_until().into();

        /// How long to keep a consensus around after it has expired
        const CONSENSUS_LIFETIME: time::Duration = time::Duration::days(4);

        // After a few days have passed, a consensus is no good for
        // anything at all, not even diffs.
        let expires = valid_until + CONSENSUS_LIFETIME;

        let doctype = format!("con_{}", flavor.name());

        let h = self.save_blob_internal(
            contents.as_bytes(),
            &doctype,
            "sha3-256",
            &sha3_of_whole[..],
            expires,
        )?;
        h.tx.execute(
            INSERT_CONSENSUS,
            params![
                valid_after,
                fresh_until,
                valid_until,
                flavor.name(),
                pending,
                hex::encode(sha3_of_signed),
                h.digeststr
            ],
        )?;
        h.tx.commit()?;
        h.unlinker.forget();
        Ok(())
    }

    fn mark_consensus_usable(&mut self, cmeta: &ConsensusMeta) -> Result<()> {
        let d = hex::encode(cmeta.sha3_256_of_whole());
        let digest = format!("sha3-256-{}", d);

        let tx = self.conn.transaction()?;
        let n = tx.execute(MARK_CONSENSUS_NON_PENDING, params![digest])?;
        trace!("Marked {} consensuses usable", n);
        tx.commit()?;

        Ok(())
    }

    fn delete_consensus(&mut self, cmeta: &ConsensusMeta) -> Result<()> {
        let d = hex::encode(cmeta.sha3_256_of_whole());
        let digest = format!("sha3-256-{}", d);

        // TODO: We should probably remove the blob as well, but for now
        // this is enough.
        let tx = self.conn.transaction()?;
        tx.execute(REMOVE_CONSENSUS, params![digest])?;
        tx.commit()?;

        Ok(())
    }

    fn authcerts(&self, certs: &[AuthCertKeyIds]) -> Result<HashMap<AuthCertKeyIds, String>> {
        let mut result = HashMap::new();
        // TODO(nickm): Do I need to get a transaction here for performance?
        let mut stmt = self.conn.prepare(FIND_AUTHCERT)?;

        for ids in certs {
            let id_digest = hex::encode(ids.id_fingerprint.as_bytes());
            let sk_digest = hex::encode(ids.sk_fingerprint.as_bytes());
            if let Some(contents) = stmt
                .query_row(params![id_digest, sk_digest], |row| row.get::<_, String>(0))
                .optional()?
            {
                result.insert(*ids, contents);
            }
        }

        Ok(result)
    }

    fn store_authcerts(&mut self, certs: &[(AuthCertMeta, &str)]) -> Result<()> {
        let tx = self.conn.transaction()?;
        let mut stmt = tx.prepare(INSERT_AUTHCERT)?;
        for (meta, content) in certs {
            let ids = meta.key_ids();
            let id_digest = hex::encode(ids.id_fingerprint.as_bytes());
            let sk_digest = hex::encode(ids.sk_fingerprint.as_bytes());
            let published: OffsetDateTime = meta.published().into();
            let expires: OffsetDateTime = meta.expires().into();
            stmt.execute(params![id_digest, sk_digest, published, expires, content])?;
        }
        stmt.finalize()?;
        tx.commit()?;
        Ok(())
    }

    fn microdescs(&self, digests: &[MdDigest]) -> Result<HashMap<MdDigest, String>> {
        let mut result = HashMap::new();
        let mut stmt = self.conn.prepare(FIND_MD)?;

        // TODO(nickm): Should I speed this up with a transaction, or
        // does it not matter for queries?
        for md_digest in digests {
            let h_digest = hex::encode(md_digest);
            if let Some(contents) = stmt
                .query_row(params![h_digest], |row| row.get::<_, String>(0))
                .optional()?
            {
                result.insert(*md_digest, contents);
            }
        }

        Ok(result)
    }

    fn store_microdescs(&mut self, digests: &[(&str, &MdDigest)], when: SystemTime) -> Result<()> {
        let when: OffsetDateTime = when.into();

        let tx = self.conn.transaction()?;
        let mut stmt = tx.prepare(INSERT_MD)?;

        for (content, md_digest) in digests {
            let h_digest = hex::encode(md_digest);
            stmt.execute(params![h_digest, when, content])?;
        }
        stmt.finalize()?;
        tx.commit()?;
        Ok(())
    }

    fn update_microdescs_listed(&mut self, digests: &[MdDigest], when: SystemTime) -> Result<()> {
        let tx = self.conn.transaction()?;
        let mut stmt = tx.prepare(UPDATE_MD_LISTED)?;
        let when: OffsetDateTime = when.into();

        for md_digest in digests {
            let h_digest = hex::encode(md_digest);
            stmt.execute(params![when, h_digest])?;
        }

        stmt.finalize()?;
        tx.commit()?;
        Ok(())
    }

    #[cfg(feature = "routerdesc")]
    fn routerdescs(&self, digests: &[RdDigest]) -> Result<HashMap<RdDigest, String>> {
        let mut result = HashMap::new();
        let mut stmt = self.conn.prepare(FIND_RD)?;

        // TODO(nickm): Should I speed this up with a transaction, or
        // does it not matter for queries?
        for rd_digest in digests {
            let h_digest = hex::encode(rd_digest);
            if let Some(contents) = stmt
                .query_row(params![h_digest], |row| row.get::<_, String>(0))
                .optional()?
            {
                result.insert(*rd_digest, contents);
            }
        }

        Ok(result)
    }

    #[cfg(feature = "routerdesc")]
    fn store_routerdescs(&mut self, digests: &[(&str, SystemTime, &RdDigest)]) -> Result<()> {
        let tx = self.conn.transaction()?;
        let mut stmt = tx.prepare(INSERT_RD)?;

        for (content, when, rd_digest) in digests {
            let when: OffsetDateTime = (*when).into();
            let h_digest = hex::encode(rd_digest);
            stmt.execute(params![h_digest, when, content])?;
        }
        stmt.finalize()?;
        tx.commit()?;
        Ok(())
    }

    #[cfg(feature = "bridge-client")]
    fn lookup_bridgedesc(&self, bridge: &BridgeConfig) -> Result<Option<CachedBridgeDescriptor>> {
        let bridge_line = bridge.to_string();
        Ok(self
            .conn
            .query_row(FIND_BRIDGEDESC, params![bridge_line], |row| {
                let (fetched, document): (OffsetDateTime, _) = row.try_into()?;
                let fetched = fetched.into();
                Ok(CachedBridgeDescriptor { fetched, document })
            })
            .optional()?)
    }

    #[cfg(feature = "bridge-client")]
    fn store_bridgedesc(
        &mut self,
        bridge: &BridgeConfig,
        entry: CachedBridgeDescriptor,
        until: SystemTime,
    ) -> Result<()> {
        if self.is_readonly() {
            // Hopefully whoever *does* have the lock will update the cache.
            // Otherwise it will contain a stale entry forever
            // (which we'll ignore, but waste effort on).
            return Ok(());
        }
        let bridge_line = bridge.to_string();
        let row = params![
            bridge_line,
            OffsetDateTime::from(entry.fetched),
            OffsetDateTime::from(until),
            entry.document,
        ];
        self.conn.execute(INSERT_BRIDGEDESC, row)?;
        Ok(())
    }

    #[cfg(feature = "bridge-client")]
    fn delete_bridgedesc(&mut self, bridge: &BridgeConfig) -> Result<()> {
        if self.is_readonly() {
            // This is called when we find corrupted or stale cache entries,
            // to stop us wasting time on them next time.
            // Hopefully whoever *does* have the lock will do this.
            return Ok(());
        }
        let bridge_line = bridge.to_string();
        self.conn.execute(DELETE_BRIDGEDESC, params![bridge_line])?;
        Ok(())
    }
}

/// Handle to a blob that we have saved to disk but not yet committed to
/// the database.
struct SavedBlobHandle<'a> {
    /// Transaction we're using to add the blob to the ExtDocs table.
    tx: Transaction<'a>,
    /// Filename for the file, with respect to the blob directory.
    #[allow(unused)]
    fname: String,
    /// Declared digest string for this blob. Of the format
    /// "digesttype-hexstr".
    digeststr: String,
    /// An 'unlinker' for the blob file.
    unlinker: Unlinker,
}

/// Handle to a file which we might have to delete.
///
/// When this handle is dropped, the file gets deleted, unless you have
/// first called [`Unlinker::forget`].
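///
/// # Example
///
/// A sketch of the intended pattern (not a doctest; `path` and `write_file`
/// are hypothetical):
///
/// ```ignore
/// let unlinker = Unlinker::new(&path);
/// write_file(&path)?;  // If this or any later step fails, dropping
///                      // `unlinker` removes the half-written file.
/// unlinker.forget();   // On success, keep the file.
/// ```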
struct Unlinker {
    /// The location of the file to remove, or None if we shouldn't
    /// remove it.
    p: Option<PathBuf>,
}

impl Unlinker {
    /// Make a new Unlinker for a given filename.
    fn new<P: AsRef<Path>>(p: P) -> Self {
        Unlinker {
            p: Some(p.as_ref().to_path_buf()),
        }
    }

    /// Forget about this unlinker, so that the corresponding file won't
    /// get removed.
    fn forget(mut self) {
        self.p = None;
    }
}

impl Drop for Unlinker {
    fn drop(&mut self) {
        if let Some(p) = self.p.take() {
            let _ignore_err = std::fs::remove_file(p);
        }
    }
}

/// Convert a hexadecimal sha3-256 digest from the database into an array.
fn digest_from_hex(s: &str) -> Result<[u8; 32]> {
    let mut bytes = [0_u8; 32];
    hex::decode_to_slice(s, &mut bytes[..]).map_err(Error::BadHexInCache)?;
    Ok(bytes)
}

/// Convert a hexadecimal sha3-256 "digest string" as used in the
/// digest column from the database into an array.
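///
/// For example, a well-formed digest string looks like
/// `"sha3-256-<64 hex digits>"`; anything without the `sha3-256-` prefix is
/// reported as cache corruption.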
fn digest_from_dstr(s: &str) -> Result<[u8; 32]> {
    if let Some(stripped) = s.strip_prefix("sha3-256-") {
        digest_from_hex(stripped)
    } else {
        Err(Error::CacheCorruption("Invalid digest in database"))
    }
}

/// Create a ConsensusMeta from a `Row` returned by one of
/// `FIND_LATEST_CONSENSUS_META` or `FIND_CONSENSUS_AND_META_BY_DIGEST_OF_SIGNED`.
fn cmeta_from_row(row: &rusqlite::Row<'_>) -> Result<ConsensusMeta> {
    let va: OffsetDateTime = row.get(0)?;
    let fu: OffsetDateTime = row.get(1)?;
    let vu: OffsetDateTime = row.get(2)?;
    let d_signed: String = row.get(3)?;
    let d_all: String = row.get(4)?;
    let lifetime = Lifetime::new(va.into(), fu.into(), vu.into())
        .map_err(|_| Error::CacheCorruption("inconsistent lifetime in database"))?;
    Ok(ConsensusMeta::new(
        lifetime,
        digest_from_hex(&d_signed)?,
        digest_from_dstr(&d_all)?,
    ))
}

/// Set up the tables for the arti cache schema in a sqlite database.
const INSTALL_V0_SCHEMA: &str = "
  -- Helps us version the schema.  The schema here corresponds to a
  -- version number called 'version', and it should be readable by
  -- anybody who is compliant with versions of at least 'readable_by'.
  CREATE TABLE TorSchemaMeta (
     name TEXT NOT NULL PRIMARY KEY,
     version INTEGER NOT NULL,
     readable_by INTEGER NOT NULL
  );

  INSERT INTO TorSchemaMeta (name, version, readable_by) VALUES ( 'TorDirStorage', 0, 0 );

  -- Keeps track of external blobs on disk.
  CREATE TABLE ExtDocs (
    -- Records a digest of the file contents, in the form 'dtype-hexstr'
    digest TEXT PRIMARY KEY NOT NULL,
    -- When was this file created?
    created DATE NOT NULL,
    -- After what time will this file definitely be useless?
    expires DATE NOT NULL,
    -- What is the type of this file? Currently supported are 'con:<flavor>'.
    type TEXT NOT NULL,
    -- Filename for this file within our blob directory.
    filename TEXT NOT NULL
  );

  -- All the microdescriptors we know about.
  CREATE TABLE Microdescs (
    sha256_digest TEXT PRIMARY KEY NOT NULL,
    last_listed DATE NOT NULL,
    contents BLOB NOT NULL
  );

  -- All the authority certificates we know.
  CREATE TABLE Authcerts (
    id_digest TEXT NOT NULL,
    sk_digest TEXT NOT NULL,
    published DATE NOT NULL,
    expires DATE NOT NULL,
    contents BLOB NOT NULL,
    PRIMARY KEY (id_digest, sk_digest)
  );

  -- All the consensuses we're storing.
  CREATE TABLE Consensuses (
    valid_after DATE NOT NULL,
    fresh_until DATE NOT NULL,
    valid_until DATE NOT NULL,
    flavor TEXT NOT NULL,
    pending BOOLEAN NOT NULL,
    sha3_of_signed_part TEXT NOT NULL,
    digest TEXT NOT NULL,
    FOREIGN KEY (digest) REFERENCES ExtDocs (digest) ON DELETE CASCADE
  );
  CREATE INDEX Consensuses_vu on CONSENSUSES(valid_until);

";

/// Update the database schema, from each version to the next.
const UPDATE_SCHEMA: &[&str] = &["
  -- Update the database schema from version 0 to version 1.
  CREATE TABLE RouterDescs (
    sha1_digest TEXT PRIMARY KEY NOT NULL,
    published DATE NOT NULL,
    contents BLOB NOT NULL
  );
","
  -- Update the database schema from version 1 to version 2.
  -- We create this table even if the bridge-client feature is disabled, but then don't touch it at all.
  CREATE TABLE BridgeDescs (
    bridge_line TEXT PRIMARY KEY NOT NULL,
    fetched DATE NOT NULL,
    until DATE NOT NULL,
    contents BLOB NOT NULL
  );
"];

/// Update the database schema version tracking, from each version to the next.
const UPDATE_SCHEMA_VERSION: &str = "
  UPDATE TorSchemaMeta SET version=? WHERE version<?;
";

/// Version number used for this version of the arti cache schema.
const SCHEMA_VERSION: u32 = UPDATE_SCHEMA.len() as u32;
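
// For orientation: UPDATE_SCHEMA[i] upgrades a database from schema version i
// to version i + 1, so a fresh database is installed at version 0 and then
// migrated 0 -> 1 -> 2 (with the two updates above), while a database already
// at SCHEMA_VERSION runs no updates at all.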

/// Query: find the latest-expiring consensus of a given flavor, with a given
/// pending status.
const FIND_CONSENSUS_P: &str = "
  SELECT valid_after, valid_until, filename
  FROM Consensuses
  INNER JOIN ExtDocs ON ExtDocs.digest = Consensuses.digest
  WHERE pending = ? AND flavor = ?
  ORDER BY valid_until DESC
  LIMIT 1;
";

/// Query: find the latest-expiring consensus of a given flavor, regardless of
/// pending status.
const FIND_CONSENSUS: &str = "
  SELECT valid_after, valid_until, filename
  FROM Consensuses
  INNER JOIN ExtDocs ON ExtDocs.digest = Consensuses.digest
  WHERE flavor = ?
  ORDER BY valid_until DESC
  LIMIT 1;
";

/// Query: Find the metadata for the latest-expiring
/// non-pending consensus of a given flavor.
const FIND_LATEST_CONSENSUS_META: &str = "
  SELECT valid_after, fresh_until, valid_until, sha3_of_signed_part, digest
  FROM Consensuses
  WHERE pending = 0 AND flavor = ?
  ORDER BY valid_until DESC
  LIMIT 1;
";

/// Query: Look up a consensus by its digest-of-signed-part string.
const FIND_CONSENSUS_AND_META_BY_DIGEST_OF_SIGNED: &str = "
  SELECT valid_after, fresh_until, valid_until, sha3_of_signed_part, Consensuses.digest, filename
  FROM Consensuses
  INNER JOIN ExtDocs on ExtDocs.digest = Consensuses.digest
  WHERE Consensuses.sha3_of_signed_part = ?
  LIMIT 1;
";

/// Query: Update the consensus whose digest field is 'digest' to mark it
/// no longer pending.
const MARK_CONSENSUS_NON_PENDING: &str = "
  UPDATE Consensuses
  SET pending = 0
  WHERE digest = ?;
";

/// Query: Remove the consensus with a given digest field.
#[allow(dead_code)]
const REMOVE_CONSENSUS: &str = "
  DELETE FROM Consensuses
  WHERE digest = ?;
";

/// Query: Find the authority certificate with given key digests.
const FIND_AUTHCERT: &str = "
  SELECT contents FROM AuthCerts WHERE id_digest = ? AND sk_digest = ?;
";

/// Query: find the microdescriptor with a given hex-encoded sha256 digest
const FIND_MD: &str = "
  SELECT contents
  FROM Microdescs
  WHERE sha256_digest = ?
";

/// Query: find the router descriptor with a given hex-encoded sha1 digest
#[cfg(feature = "routerdesc")]
const FIND_RD: &str = "
  SELECT contents
  FROM RouterDescs
  WHERE sha1_digest = ?
";

/// Query: find every ExtDocs member that has expired.
const FIND_EXPIRED_EXTDOCS: &str = "
  SELECT filename FROM ExtDocs where expires < datetime('now');
";

/// Query: count the ExtDocs entries with a given filename.
const COUNT_EXTDOC_BY_PATH: &str = "
  SELECT COUNT(*) FROM ExtDocs WHERE filename = ?;
";

/// Query: Add a new entry to ExtDocs.
const INSERT_EXTDOC: &str = "
  INSERT OR REPLACE INTO ExtDocs ( digest, created, expires, type, filename )
  VALUES ( ?, datetime('now'), ?, ?, ? );
";

/// Query: Add a new consensus.
const INSERT_CONSENSUS: &str = "
  INSERT OR REPLACE INTO Consensuses
    ( valid_after, fresh_until, valid_until, flavor, pending, sha3_of_signed_part, digest )
  VALUES ( ?, ?, ?, ?, ?, ?, ? );
";

/// Query: Add a new AuthCert
const INSERT_AUTHCERT: &str = "
  INSERT OR REPLACE INTO Authcerts
    ( id_digest, sk_digest, published, expires, contents)
  VALUES ( ?, ?, ?, ?, ? );
";

/// Query: Add a new microdescriptor
const INSERT_MD: &str = "
  INSERT OR REPLACE INTO Microdescs ( sha256_digest, last_listed, contents )
  VALUES ( ?, ?, ? );
";

/// Query: Add a new router descriptor
#[allow(unused)]
#[cfg(feature = "routerdesc")]
const INSERT_RD: &str = "
  INSERT OR REPLACE INTO RouterDescs ( sha1_digest, published, contents )
  VALUES ( ?, ?, ? );
";

/// Query: Change the time when a given microdescriptor was last listed.
const UPDATE_MD_LISTED: &str = "
  UPDATE Microdescs
  SET last_listed = max(last_listed, ?)
  WHERE sha256_digest = ?;
";

/// Query: Find a cached bridge descriptor
#[cfg(feature = "bridge-client")]
const FIND_BRIDGEDESC: &str = "SELECT fetched, contents FROM BridgeDescs WHERE bridge_line = ?;";

/// Query: Record a cached bridge descriptor
#[cfg(feature = "bridge-client")]
const INSERT_BRIDGEDESC: &str = "
  INSERT OR REPLACE INTO BridgeDescs ( bridge_line, fetched, until, contents )
  VALUES ( ?, ?, ?, ? );
";

/// Query: Remove a cached bridge descriptor
#[cfg(feature = "bridge-client")]
#[allow(dead_code)]
const DELETE_BRIDGEDESC: &str = "DELETE FROM BridgeDescs WHERE bridge_line = ?;";

/// Query: Discard every expired extdoc.
///
/// External documents aren't exposed through [`Store`].
const DROP_OLD_EXTDOCS: &str = "DELETE FROM ExtDocs WHERE expires < datetime('now');";

/// Query: Discard an extdoc with a given path.
const DELETE_EXTDOC_BY_FILENAME: &str = "DELETE FROM ExtDocs WHERE filename = ?;";

/// Query: Discard every router descriptor that hasn't been listed for 3
/// months.
// TODO: Choose a more realistic time.
const DROP_OLD_ROUTERDESCS: &str = "DELETE FROM RouterDescs WHERE published < ?;";

/// Query: Discard every microdescriptor that hasn't been listed for 3 months.
// TODO: Choose a more realistic time.
const DROP_OLD_MICRODESCS: &str = "DELETE FROM Microdescs WHERE last_listed < ?;";

/// Query: Discard every expired authority certificate.
const DROP_OLD_AUTHCERTS: &str = "DELETE FROM Authcerts WHERE expires < ?;";

/// Query: Discard every consensus that's been expired for at least
/// two days.
const DROP_OLD_CONSENSUSES: &str = "DELETE FROM Consensuses WHERE valid_until < ?;";

/// Query: Discard every bridge descriptor that is too old, or from the future.  (Both ?=now.)
#[cfg(feature = "bridge-client")]
const DROP_OLD_BRIDGEDESCS: &str = "DELETE FROM BridgeDescs WHERE ? > until OR fetched > ?;";

#[cfg(test)]
pub(crate) mod test {
    #![allow(clippy::unwrap_used)]

    use super::*;
    use crate::storage::EXPIRATION_DEFAULTS;
    use hex_literal::hex;
    use tempfile::{tempdir, TempDir};
    use time::ext::NumericalDuration;

    pub(crate) fn new_empty() -> Result<(TempDir, SqliteStore)> {
        let tmp_dir = tempdir().unwrap();
        let sql_path = tmp_dir.path().join("db.sql");
        let conn = rusqlite::Connection::open(sql_path)?;
        let blob_path = tmp_dir.path().join("blobs");
        let blob_dir = fs_mistrust::Mistrust::builder()
            .dangerously_trust_everyone()
            .build()
            .unwrap()
            .verifier()
            .make_secure_dir(blob_path)
            .unwrap();
        let store = SqliteStore::from_conn(conn, blob_dir)?;
        Ok((tmp_dir, store))
    }

    #[test]
    fn init() -> Result<()> {
        let tmp_dir = tempdir().unwrap();
        let blob_dir = fs_mistrust::Mistrust::builder()
            .dangerously_trust_everyone()
            .build()
            .unwrap()
            .verifier()
            .secure_dir(&tmp_dir)
            .unwrap();
        let sql_path = tmp_dir.path().join("db.sql");
        // Initial setup: everything should work.
        {
            let conn = rusqlite::Connection::open(&sql_path)?;
            let _store = SqliteStore::from_conn(conn, blob_dir.clone())?;
        }
        // Second setup: shouldn't need to upgrade.
        {
            let conn = rusqlite::Connection::open(&sql_path)?;
            let _store = SqliteStore::from_conn(conn, blob_dir.clone())?;
        }
        // Third setup: a version from the future is okay, as long as
        // readable_by says we can still read the database.
        {
            let conn = rusqlite::Connection::open(&sql_path)?;
            conn.execute_batch("UPDATE TorSchemaMeta SET version = 9002;")?;
            let _store = SqliteStore::from_conn(conn, blob_dir.clone())?;
        }
        // Fourth: this says we can't read it, so we'll get an error.
        {
            let conn = rusqlite::Connection::open(&sql_path)?;
            conn.execute_batch("UPDATE TorSchemaMeta SET readable_by = 9001;")?;
            let val = SqliteStore::from_conn(conn, blob_dir);
            assert!(val.is_err());
        }
        Ok(())
    }

    #[test]
    fn bad_blob_fname() -> Result<()> {
        let (_tmp_dir, store) = new_empty()?;
        assert!(store.blob_dir.join("abcd").is_ok());
        assert!(store.blob_dir.join("abcd..").is_ok());
        assert!(store.blob_dir.join("..abcd..").is_ok());
        assert!(store.blob_dir.join(".abcd").is_ok());
        assert!(store.blob_dir.join("..").is_err());
        assert!(store.blob_dir.join("../abcd").is_err());
        assert!(store.blob_dir.join("/abcd").is_err());
        Ok(())
    }

    #[test]
    fn blobs() -> Result<()> {
        let (_tmp_dir, mut store) = new_empty()?;
        let now = OffsetDateTime::now_utc();
        let one_week = 1.weeks();
        let fname1 = store.save_blob(
            b"Hello world",
            "greeting",
            "sha1",
            &hex!("7b502c3a1f48c8609ae212cdfb639dee39673f5e"),
            now + one_week,
        )?;
        let fname2 = store.save_blob(
            b"Goodbye, dear friends",
            "greeting",
            "sha1",
            &hex!("2149c2a7dbf5be2bb36fb3c5080d0fb14cb3355c"),
            now - one_week,
        )?;
        assert_eq!(
            fname1,
            "greeting_sha1-7b502c3a1f48c8609ae212cdfb639dee39673f5e"
        );
        assert_eq!(
            &std::fs::read(store.blob_dir.join(&fname1)?).unwrap()[..],
            b"Hello world"
        );
        assert_eq!(
            &std::fs::read(store.blob_dir.join(&fname2)?).unwrap()[..],
            b"Goodbye, dear friends"
        );
        let n: u32 = store
            .conn
            .query_row("SELECT COUNT(filename) FROM ExtDocs", [], |row| row.get(0))?;
        assert_eq!(n, 2);
        let blob = store.read_blob(&fname2)?.unwrap();
        assert_eq!(blob.as_str().unwrap(), "Goodbye, dear friends");
        // Now expire: the second file should go away.
        store.expire_all(&EXPIRATION_DEFAULTS)?;
        assert_eq!(
            &std::fs::read(store.blob_dir.join(&fname1)?).unwrap()[..],
            b"Hello world"
        );
        assert!(std::fs::read(store.blob_dir.join(&fname2)?).is_err());
        let n: u32 = store
            .conn
            .query_row("SELECT COUNT(filename) FROM ExtDocs", [], |row| row.get(0))?;
        assert_eq!(n, 1);
        Ok(())
    }

    #[test]
    fn consensus() -> Result<()> {
        use tor_netdoc::doc::netstatus;
        let (_tmp_dir, mut store) = new_empty()?;
        let now = OffsetDateTime::now_utc();
        let one_hour = 1.hours();
        assert_eq!(
            store.latest_consensus_time(ConsensusFlavor::Microdesc)?,
            None
        );
        let cmeta = ConsensusMeta::new(
            netstatus::Lifetime::new(
                now.into(),
                (now + one_hour).into(),
                SystemTime::from(now + one_hour * 2),
            )
            .unwrap(),
            [0xAB; 32],
            [0xBC; 32],
        );
        store.store_consensus(
            &cmeta,
            ConsensusFlavor::Microdesc,
            true,
            "Pretend this is a consensus",
        )?;
        {
            assert_eq!(
                store.latest_consensus_time(ConsensusFlavor::Microdesc)?,
                None
            );
            let consensus = store
                .latest_consensus(ConsensusFlavor::Microdesc, None)?
                .unwrap();
            assert_eq!(consensus.as_str()?, "Pretend this is a consensus");
            let consensus = store.latest_consensus(ConsensusFlavor::Microdesc, Some(false))?;
            assert!(consensus.is_none());
        }
        store.mark_consensus_usable(&cmeta)?;
        {
            assert_eq!(
                store.latest_consensus_time(ConsensusFlavor::Microdesc)?,
                now.into()
            );
            let consensus = store
                .latest_consensus(ConsensusFlavor::Microdesc, None)?
                .unwrap();
            assert_eq!(consensus.as_str()?, "Pretend this is a consensus");
            let consensus = store
                .latest_consensus(ConsensusFlavor::Microdesc, Some(false))?
                .unwrap();
            assert_eq!(consensus.as_str()?, "Pretend this is a consensus");
        }
        {
            let consensus_text = store.consensus_by_meta(&cmeta)?;
            assert_eq!(consensus_text.as_str()?, "Pretend this is a consensus");
            let (is, _cmeta2) = store
                .consensus_by_sha3_digest_of_signed_part(&[0xAB; 32])?
                .unwrap();
            assert_eq!(is.as_str()?, "Pretend this is a consensus");
            let cmeta3 = ConsensusMeta::new(
                netstatus::Lifetime::new(
                    now.into(),
                    (now + one_hour).into(),
                    SystemTime::from(now + one_hour * 2),
                )
                .unwrap(),
                [0x99; 32],
                [0x99; 32],
            );
            assert!(store.consensus_by_meta(&cmeta3).is_err());
            assert!(store
                .consensus_by_sha3_digest_of_signed_part(&[0x99; 32])?
                .is_none());
        }
        {
            assert!(store
                .consensus_by_sha3_digest_of_signed_part(&[0xAB; 32])?
                .is_some());
            store.delete_consensus(&cmeta)?;
            assert!(store
                .consensus_by_sha3_digest_of_signed_part(&[0xAB; 32])?
                .is_none());
        }
        Ok(())
    }

    #[test]
    fn authcerts() -> Result<()> {
        let (_tmp_dir, mut store) = new_empty()?;
        let now = OffsetDateTime::now_utc();
        let one_hour = 1.hours();
        let keyids = AuthCertKeyIds {
            id_fingerprint: [3; 20].into(),
            sk_fingerprint: [4; 20].into(),
        };
        let keyids2 = AuthCertKeyIds {
            id_fingerprint: [4; 20].into(),
            sk_fingerprint: [3; 20].into(),
        };
        let m1 = AuthCertMeta::new(keyids, now.into(), SystemTime::from(now + one_hour * 24));
        store.store_authcerts(&[(m1, "Pretend this is a cert")])?;
        let certs = store.authcerts(&[keyids, keyids2])?;
        assert_eq!(certs.len(), 1);
        assert_eq!(certs.get(&keyids).unwrap(), "Pretend this is a cert");
        Ok(())
    }

    #[test]
    fn microdescs() -> Result<()> {
        let (_tmp_dir, mut store) = new_empty()?;
        let now = OffsetDateTime::now_utc();
        let one_day = 1.days();
        let d1 = [5_u8; 32];
        let d2 = [7; 32];
        let d3 = [42; 32];
        let d4 = [99; 32];
        let long_ago: OffsetDateTime = now - one_day * 100;
        store.store_microdescs(
            &[
                ("Fake micro 1", &d1),
                ("Fake micro 2", &d2),
                ("Fake micro 3", &d3),
            ],
            long_ago.into(),
        )?;
        store.update_microdescs_listed(&[d2], now.into())?;
        let mds = store.microdescs(&[d2, d3, d4])?;
        assert_eq!(mds.len(), 2);
        assert_eq!(mds.get(&d1), None);
        assert_eq!(mds.get(&d2).unwrap(), "Fake micro 2");
        assert_eq!(mds.get(&d3).unwrap(), "Fake micro 3");
        assert_eq!(mds.get(&d4), None);
        // Now we'll expire.  That should drop everything but d2.
        store.expire_all(&EXPIRATION_DEFAULTS)?;
        let mds = store.microdescs(&[d2, d3, d4])?;
        assert_eq!(mds.len(), 1);
        assert_eq!(mds.get(&d2).unwrap(), "Fake micro 2");
        Ok(())
    }

    #[test]
    #[cfg(feature = "routerdesc")]
    fn routerdescs() -> Result<()> {
        let (_tmp_dir, mut store) = new_empty()?;
        let now = OffsetDateTime::now_utc();
        let one_day = 1.days();
        let long_ago: OffsetDateTime = now - one_day * 100;
        let recently = now - one_day;
        let d1 = [5_u8; 20];
        let d2 = [7; 20];
        let d3 = [42; 20];
        let d4 = [99; 20];
        store.store_routerdescs(&[
            ("Fake routerdesc 1", long_ago.into(), &d1),
            ("Fake routerdesc 2", recently.into(), &d2),
            ("Fake routerdesc 3", long_ago.into(), &d3),
        ])?;
        let rds = store.routerdescs(&[d2, d3, d4])?;
        assert_eq!(rds.len(), 2);
        assert_eq!(rds.get(&d1), None);
        assert_eq!(rds.get(&d2).unwrap(), "Fake routerdesc 2");
        assert_eq!(rds.get(&d3).unwrap(), "Fake routerdesc 3");
        assert_eq!(rds.get(&d4), None);
        // Now we'll expire.  That should drop everything but d2.
        store.expire_all(&EXPIRATION_DEFAULTS)?;
        let rds = store.routerdescs(&[d2, d3, d4])?;
        assert_eq!(rds.len(), 1);
        assert_eq!(rds.get(&d2).unwrap(), "Fake routerdesc 2");
        Ok(())
    }

    #[test]
    fn from_path_rw() -> Result<()> {
        let tmp = tempdir().unwrap();
        let mistrust = fs_mistrust::Mistrust::new_dangerously_trust_everyone();
        // Nothing there: can't open read-only
        let r = SqliteStore::from_path_and_mistrust(tmp.path(), &mistrust, true);
        assert!(r.is_err());
        assert!(!tmp.path().join("dir_blobs").try_exists().unwrap());
        // Opening it read-write will create the files
        {
            let mut store = SqliteStore::from_path_and_mistrust(tmp.path(), &mistrust, false)?;
            assert!(tmp.path().join("dir_blobs").is_dir());
            assert!(store.lockfile.is_some());
            assert!(!store.is_readonly());
            assert!(store.upgrade_to_readwrite()?); // no-op.
        }
        // At this point, we can successfully make a read-only connection.
        {
            let mut store2 = SqliteStore::from_path_and_mistrust(tmp.path(), &mistrust, true)?;
            assert!(store2.is_readonly());
            // Nobody else is locking this, so we can upgrade.
            assert!(store2.upgrade_to_readwrite()?); // not a no-op this time.
            assert!(!store2.is_readonly());
        }
        Ok(())
    }

    #[test]
    fn orphaned_blobs() -> Result<()> {
        let (_tmp_dir, mut store) = new_empty()?;
        assert_eq!(store.blob_dir.read_directory(".")?.count(), 0);
        let now = OffsetDateTime::now_utc();
        let one_week = 1.weeks();
        let _fname_good = store.save_blob(
            b"Goodbye, dear friends",
            "greeting",
            "sha1",
            &hex!("2149c2a7dbf5be2bb36fb3c5080d0fb14cb3355c"),
            now + one_week,
        )?;
        assert_eq!(store.blob_dir.read_directory(".")?.count(), 1);
        // Now, create two orphaned blobs: one with a recent timestamp, and one with an older
        // timestamp.
        store
            .blob_dir
            .write_and_replace("fairly_new", b"new contents will stay")?;
        store
            .blob_dir
            .write_and_replace("fairly_old", b"old contents will be removed")?;
        filetime::set_file_mtime(
            store.blob_dir.join("fairly_old")?,
            SystemTime::from(now - one_week).into(),
        )
        .expect("Can't adjust mtime");
        assert_eq!(store.blob_dir.read_directory(".")?.count(), 3);
        store.remove_unreferenced_blobs(now, &EXPIRATION_DEFAULTS)?;
        assert_eq!(store.blob_dir.read_directory(".")?.count(), 2);
        Ok(())
    }
}