1
//! Net document storage backed by sqlite3.
2
//!
3
//! We store most objects in sqlite tables, except for very large ones,
4
//! which we store as "blob" files in a separate directory.
5

            
6
use super::ExpirationConfig;
7
use crate::docmeta::{AuthCertMeta, ConsensusMeta};
8
use crate::err::ReadOnlyStorageError;
9
use crate::storage::{InputString, Store};
10
use crate::{Error, Result};
11

            
12
use fs_mistrust::CheckedDir;
13
use tor_basic_utils::PathExt as _;
14
use tor_error::{into_internal, warn_report};
15
use tor_netdoc::doc::authcert::AuthCertKeyIds;
16
use tor_netdoc::doc::microdesc::MdDigest;
17
use tor_netdoc::doc::netstatus::{ConsensusFlavor, Lifetime};
18
#[cfg(feature = "routerdesc")]
19
use tor_netdoc::doc::routerdesc::RdDigest;
20

            
21
#[cfg(feature = "bridge-client")]
22
pub(crate) use {crate::storage::CachedBridgeDescriptor, tor_guardmgr::bridge::BridgeConfig};
23

            
24
use std::collections::{HashMap, HashSet};
25
use std::fs::OpenOptions;
26
use std::path::{Path, PathBuf};
27
use std::result::Result as StdResult;
28
use std::sync::Arc;
29
use std::time::SystemTime;
30

            
31
use rusqlite::{params, OpenFlags, OptionalExtension, Transaction};
32
use time::OffsetDateTime;
33
use tracing::{trace, warn};
34

            
35
/// Local directory cache using a Sqlite3 connection.
36
pub(crate) struct SqliteStore {
37
    /// Connection to the sqlite3 database.
38
    conn: rusqlite::Connection,
39
    /// Location for the sqlite3 database; used to reopen it.
40
    sql_path: Option<PathBuf>,
41
    /// Location to store blob files.
42
    blob_dir: CheckedDir,
43
    /// Lockfile to prevent concurrent write attempts from different
44
    /// processes.
45
    ///
46
    /// If this is None we aren't using a lockfile.  Watch out!
47
    ///
48
    /// (sqlite supports that with connection locking, but we want to
49
    /// be a little more coarse-grained here)
50
    lockfile: Option<fslock::LockFile>,
51
}
52

            
53
/// # Some notes on blob consistency, and the lack thereof.
54
///
55
/// We store large documents (currently, consensuses) in separate files,
/// called "blobs",
/// outside of the sqlite database.
/// We do this for performance reasons: for large objects,
/// mmap is far more efficient than sqlite in terms of RAM and CPU.
60
///
61
/// In the sqlite database, we keep track of our blobs
62
/// using the ExtDocs table.
63
/// This scheme makes it possible for the blobs and the table
/// to get out of sync.
65
///
66
/// In summary:
67
///   - _Vanished_ blobs (ones present only in ExtDocs) are possible;
68
///     we try to tolerate them.
69
///   - _Orphaned_ blobs (ones present only on the disk) are possible;
70
///     we try to tolerate them.
71
///   - _Corrupted_ blobs (ones with the wrong contents) are possible
72
///     but (we hope) unlikely;
73
///     we do not currently try to tolerate them.
74
///
75
/// In more detail:
76
///
77
/// Here are the practices we use when _writing_ blobs:
78
///
79
/// - We always create a blob before updating the ExtDocs table,
80
///   and remove an entry from the ExtDocs before deleting the blob.
81
/// - If we decide to roll back the transaction that adds the row to ExtDocs,
82
///   we delete the blob after doing so.
83
/// - We use [`CheckedDir::write_and_replace`] to store blobs,
84
///   so a half-formed blob shouldn't be common.
85
///   (We assume that "close" and "rename" are serialized by the OS,
86
///   so that _if_ the rename happens, the file is completely written.)
87
/// - Blob filenames include a digest of the file contents,
88
///   so collisions are unlikely.
89
///
90
/// Here are the practices we use when _deleting_ blobs:
91
/// - First, we drop the row from the ExtDocs table.
92
///   Only then do we delete the file.
93
///
94
/// These practices can result in _orphaned_ blobs
/// (ones with no row in the ExtDocs table),
/// or in _half-written_ blob files with tempfile names
/// (which also have no row in the ExtDocs table).
98
/// This happens if we crash at the wrong moment.
99
/// Such blobs can be safely removed;
100
/// we do so in [`SqliteStore::remove_unreferenced_blobs`].
101
///
102
/// Despite our efforts, _vanished_ blobs
103
/// (entries in the ExtDocs table with no corresponding file)
104
/// are also possible.  They could happen for these reasons:
105
/// - The filesystem might not serialize or sync things in a way that's
106
///   consistent with the DB.
107
/// - An automatic process might remove random cache files.
108
/// - The user might run around deleting things to free space.
109
///
110
/// We try to tolerate vanished blobs.
111
///
112
/// _Corrupted_ blobs are also possible.  They can happen on FS corruption,
113
/// or on somebody messing around with the cache directory manually.
114
/// We do not attempt to tolerate corrupted blobs.
115
///
116
/// ## On trade-offs
117
///
118
/// TODO: The practices described above are more likely
119
/// to create _orphaned_ blobs than _vanished_ blobs.
120
/// We initially made this trade-off decision on the mistaken theory
121
/// that we could avoid vanished blobs entirely.
122
/// We _may_ want to revisit this choice,
123
/// on the rationale that we can respond to vanished blobs as soon as we notice they're gone,
124
/// whereas we can only handle orphaned blobs with a periodic cleanup.
125
/// On the other hand, since we need to handle both cases,
126
/// it may not matter very much in practice.
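///
/// As an illustration of the write-side ordering described above
/// (a sketch only; `blob_dir`, `conn`, `fname`, `contents`, `digeststr`,
/// `expires`, and `doctype` are hypothetical locals, and the real logic
/// lives in `SqliteStore::save_blob_internal`):
///
/// ```rust,ignore
/// // 1. Write the blob first, atomically (write to a temp file, then rename).
/// blob_dir.write_and_replace(&fname, contents)?;
/// // 2. Only then add the ExtDocs row that references it, inside a transaction.
/// let tx = conn.unchecked_transaction()?;
/// tx.execute(INSERT_EXTDOC, params![digeststr, expires, doctype, fname])?;
/// tx.commit()?;
/// // On failure we roll back the transaction first and delete the blob file
/// // afterwards, never the other way around.
/// ```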
#[allow(unused)]
128
mod blob_consistency {}
129

            
130
/// Specific error returned when a blob will not be read.
131
///
132
/// This error is an internal type: it's never returned to the user.
133
#[derive(Debug)]
134
enum AbsentBlob {
135
    /// We did not find a blob file on the disk.
136
    VanishedFile,
137
    /// We did not even find a blob to read in ExtDocs.
138
    NothingToRead,
139
}
140

            
141
impl SqliteStore {
142
    /// Construct or open a new SqliteStore at some location on disk.
143
    /// The provided location must be a directory, or a possible
144
    /// location for a directory: the directory will be created if
145
    /// necessary.
146
    ///
147
    /// If readonly is true, the result will be a read-only store.
148
    /// Otherwise, when readonly is false, the result may be
149
    /// read-only or read-write, depending on whether we can acquire
150
    /// the lock.
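    ///
    /// A minimal usage sketch (the directory path here is made up):
    ///
    /// ```rust,ignore
    /// let mistrust = fs_mistrust::Mistrust::builder().build()?;
    /// // Open (or create) the store; pass `true` instead to force read-only mode.
    /// let store = SqliteStore::from_path_and_mistrust("/var/lib/arti/dir", &mistrust, false)?;
    /// ```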
151
    ///
152
    /// # Limitations:
153
    ///
154
    /// The file locking that we use to ensure that only one dirmgr is
155
    /// writing to a given storage directory at a time is currently
156
    /// _per process_. Therefore, you might get unexpected results if
    /// two SqliteStores are created in the same process with the
    /// same path.
159
166
    pub(crate) fn from_path_and_mistrust<P: AsRef<Path>>(
160
166
        path: P,
161
166
        mistrust: &fs_mistrust::Mistrust,
162
166
        mut readonly: bool,
163
166
    ) -> Result<Self> {
164
166
        let path = path.as_ref();
165
166
        let sqlpath = path.join("dir.sqlite3");
166
166
        let blobpath = path.join("dir_blobs/");
167
166
        let lockpath = path.join("dir.lock");
168
166

            
169
166
        let verifier = mistrust.verifier().permit_readable().check_content();
170

            
171
166
        let blob_dir = if readonly {
172
4
            verifier.secure_dir(blobpath)?
173
        } else {
174
162
            verifier.make_secure_dir(blobpath)?
175
        };
176

            
177
        // Check permissions on the sqlite and lock files; don't require them to
178
        // exist.
179
328
        for p in [&lockpath, &sqlpath] {
180
328
            match mistrust
181
328
                .verifier()
182
328
                .permit_readable()
183
328
                .require_file()
184
328
                .check(p)
185
            {
186
328
                Ok(()) | Err(fs_mistrust::Error::NotFound(_)) => {}
187
                Err(e) => return Err(e.into()),
188
            }
189
        }
190

            
191
164
        let mut lockfile = fslock::LockFile::open(&lockpath).map_err(Error::from_lockfile)?;
192
164
        if !readonly && !lockfile.try_lock().map_err(Error::from_lockfile)? {
193
            readonly = true; // we couldn't get the lock!
194
164
        };
195
164
        let flags = if readonly {
196
2
            OpenFlags::SQLITE_OPEN_READ_ONLY
197
        } else {
198
162
            OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE
199
        };
200
164
        let conn = rusqlite::Connection::open_with_flags(&sqlpath, flags)?;
201
164
        let mut store = SqliteStore::from_conn_internal(conn, blob_dir, readonly)?;
202
164
        store.sql_path = Some(sqlpath);
203
164
        store.lockfile = Some(lockfile);
204
164
        Ok(store)
205
166
    }
206

            
207
    /// Construct a new SqliteStore from a database connection and a location
208
    /// for blob files.
209
    ///
210
    /// Used for testing with a memory-backed database.
211
    ///
212
    /// Note: `blob_dir` must not be used for anything other than storing the blobs associated with
213
    /// this database, since we will freely remove unreferenced files from this directory.
214
    #[cfg(test)]
215
44
    fn from_conn(conn: rusqlite::Connection, blob_dir: CheckedDir) -> Result<Self> {
216
44
        Self::from_conn_internal(conn, blob_dir, false)
217
44
    }
218

            
219
    /// Construct a new SqliteStore from a database connection and a location
220
    /// for blob files.
221
    ///
222
    /// The `readonly` argument specifies whether the database connection should be read-only.
223
208
    fn from_conn_internal(
224
208
        conn: rusqlite::Connection,
225
208
        blob_dir: CheckedDir,
226
208
        readonly: bool,
227
208
    ) -> Result<Self> {
228
208
        // sqlite (as of Jun 2024) does not enforce foreign keys automatically unless you set this
229
208
        // pragma on the connection.
230
208
        conn.pragma_update(None, "foreign_keys", "ON")?;
231

            
232
208
        let mut result = SqliteStore {
233
208
            conn,
234
208
            blob_dir,
235
208
            lockfile: None,
236
208
            sql_path: None,
237
208
        };
238
208

            
239
208
        result.check_schema(readonly)?;
240

            
241
206
        Ok(result)
242
208
    }
243

            
244
    /// Check whether this database has a schema format we can read, and
245
    /// install or upgrade the schema if necessary.
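    ///
    /// A sketch of the decision rule for an existing database (the body below
    /// also handles installing the schema from scratch and the read-only cases):
    ///
    /// ```rust,ignore
    /// if version < SCHEMA_VERSION {
    ///     // Older schema: apply the remaining UPDATE_SCHEMA entries and commit.
    /// } else if readable_by > SCHEMA_VERSION {
    ///     // Too new for us to read: Error::UnrecognizedSchema.
    /// } else {
    ///     // Usable as-is.
    /// }
    /// ```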
    fn check_schema(&mut self, readonly: bool) -> Result<()> {
247
208
        let tx = self.conn.transaction()?;
248
208
        let db_n_tables: u32 = tx.query_row(
249
208
            "SELECT COUNT(name) FROM sqlite_master
250
208
             WHERE type='table'
251
208
             AND name NOT LIKE 'sqlite_%'",
252
208
            [],
253
246
            |row| row.get(0),
254
208
        )?;
255
208
        let db_exists = db_n_tables > 0;
256
208

            
257
208
        // Update the schema from current_vsn to the latest (does not commit)
258
242
        let update_schema = |tx: &rusqlite::Transaction, current_vsn| {
259
600
            for (from_vsn, update) in UPDATE_SCHEMA.iter().enumerate() {
260
600
                let from_vsn = u32::try_from(from_vsn).expect("schema version >2^32");
261
600
                let new_vsn = from_vsn + 1;
262
600
                if current_vsn < new_vsn {
263
600
                    tx.execute_batch(update)?;
264
600
                    tx.execute(UPDATE_SCHEMA_VERSION, params![new_vsn, new_vsn])?;
265
                }
266
            }
267
200
            Ok::<_, Error>(())
268
200
        };
269

            
270
208
        if !db_exists {
271
200
            if !readonly {
272
200
                tx.execute_batch(INSTALL_V0_SCHEMA)?;
273
200
                update_schema(&tx, 0)?;
274
200
                tx.commit()?;
275
            } else {
276
                // The other process should have created the database!
277
                return Err(Error::ReadOnlyStorage(ReadOnlyStorageError::NoDatabase));
278
            }
279
200
            return Ok(());
280
8
        }
281

            
282
8
        let (version, readable_by): (u32, u32) = tx.query_row(
283
8
            "SELECT version, readable_by FROM TorSchemaMeta
284
8
             WHERE name = 'TorDirStorage'",
285
8
            [],
286
12
            |row| Ok((row.get(0)?, row.get(1)?)),
287
8
        )?;
288

            
289
8
        if version < SCHEMA_VERSION {
290
            if !readonly {
291
                update_schema(&tx, version)?;
292
                tx.commit()?;
293
            } else {
294
                return Err(Error::ReadOnlyStorage(
295
                    ReadOnlyStorageError::IncompatibleSchema {
296
                        schema: version,
297
                        supported: SCHEMA_VERSION,
298
                    },
299
                ));
300
            }
301

            
302
            return Ok(());
303
8
        } else if readable_by > SCHEMA_VERSION {
304
2
            return Err(Error::UnrecognizedSchema {
305
2
                schema: readable_by,
306
2
                supported: SCHEMA_VERSION,
307
2
            });
308
6
        }
309
6

            
310
6
        // Dropping `tx` here rolls back the transaction, but no changes were made.
311
6
        Ok(())
312
208
    }
313

            
314
    /// Read a blob from disk, mapping it if possible.
315
    ///
316
    /// Return `Ok(Err(..))` if the file for the blob was not found on disk;
    /// return an error in other cases.
318
    ///
319
    /// (See [`blob_consistency`] for information on why the blob might be absent.)
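    ///
    /// Sketch of how a caller treats the nested result (`store` and `filename`
    /// are hypothetical here):
    ///
    /// ```rust,ignore
    /// match store.read_blob(&filename)? {
    ///     Ok(text) => { /* use the memory-mapped contents */ }
    ///     Err(_absent) => { /* no usable blob: treat it as a cache miss */ }
    /// }
    /// ```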
    fn read_blob(&self, path: &str) -> Result<StdResult<InputString, AbsentBlob>> {
321
22
        let file = match self.blob_dir.open(path, OpenOptions::new().read(true)) {
322
22
            Ok(file) => file,
323
            Err(fs_mistrust::Error::NotFound(_)) => {
324
                warn!(
325
                    "{:?} was listed in the database, but its corresponding file had been deleted",
326
                    path
327
                );
328
                return Ok(Err(AbsentBlob::VanishedFile));
329
            }
330
            Err(e) => return Err(e.into()),
331
        };
332

            
333
22
        InputString::load(file)
334
22
            .map_err(|err| Error::CacheFile {
335
                action: "loading",
336
                fname: PathBuf::from(path),
337
                error: Arc::new(err),
338
22
            })
339
22
            .map(Ok)
340
22
    }
341

            
342
    /// Write a file to disk as a blob, and record it in the ExtDocs table.
343
    ///
344
    /// Return a SavedBlobHandle that describes where the blob is, and which
345
    /// can be used either to commit the blob or delete it.
346
    ///
347
    /// See [`blob_consistency`] for more information on guarantees.
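    ///
    /// Usage sketch, mirroring `store_consensus` (`contents`, `digest`, and
    /// `expires` are assumed to be in scope; the doctype is illustrative):
    ///
    /// ```rust,ignore
    /// let h = store.save_blob_internal(contents, "con_microdesc", "sha3-256", &digest[..], expires)?;
    /// // Other rows may reference h.digest_string() using the transaction h.tx().
    /// h.commit()?;
    /// ```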
    fn save_blob_internal(
349
34
        &mut self,
350
34
        contents: &[u8],
351
34
        doctype: &str,
352
34
        digest_type: &str,
353
34
        digest: &[u8],
354
34
        expires: OffsetDateTime,
355
34
    ) -> Result<blob_handle::SavedBlobHandle<'_>> {
356
34
        let digest = hex::encode(digest);
357
34
        let digeststr = format!("{}-{}", digest_type, digest);
358
34
        let fname = format!("{}_{}", doctype, digeststr);
359

            
360
34
        let full_path = self.blob_dir.join(&fname)?;
361
34
        let unlinker = blob_handle::Unlinker::new(&full_path);
362
34
        self.blob_dir
363
34
            .write_and_replace(&fname, contents)
364
34
            .map_err(|e| match e {
365
                fs_mistrust::Error::Io { err, .. } => Error::CacheFile {
366
                    action: "saving",
367
                    fname: full_path,
368
                    error: err,
369
                },
370
                err => err.into(),
371
34
            })?;
372

            
373
34
        let tx = self.conn.unchecked_transaction()?;
374
34
        tx.execute(INSERT_EXTDOC, params![digeststr, expires, doctype, fname])?;
375

            
376
34
        Ok(blob_handle::SavedBlobHandle::new(
377
34
            tx, fname, digeststr, unlinker,
378
34
        ))
379
34
    }
380

            
381
    /// As `latest_consensus`, but do not retry.
382
16
    fn latest_consensus_internal(
383
16
        &self,
384
16
        flavor: ConsensusFlavor,
385
16
        pending: Option<bool>,
386
16
    ) -> Result<StdResult<InputString, AbsentBlob>> {
387
16
        trace!(?flavor, ?pending, "Loading latest consensus from cache");
388
16
        let rv: Option<(OffsetDateTime, OffsetDateTime, String)> = match pending {
389
12
            None => self
390
12
                .conn
391
16
                .query_row(FIND_CONSENSUS, params![flavor.name()], |row| row.try_into())
392
12
                .optional()?,
393
4
            Some(pending_val) => self
394
4
                .conn
395
4
                .query_row(
396
4
                    FIND_CONSENSUS_P,
397
4
                    params![pending_val, flavor.name()],
398
5
                    |row| row.try_into(),
399
4
                )
400
4
                .optional()?,
401
        };
402

            
403
16
        if let Some((_va, _vu, filename)) = rv {
404
            // TODO blobs: If the cache is inconsistent (because this blob is _vanished_), and the cache has not yet
405
            // been cleaned, this may fail to find the latest consensus that we actually have.
406
10
            self.read_blob(&filename)
407
        } else {
408
6
            Ok(Err(AbsentBlob::NothingToRead))
409
        }
410
16
    }
411

            
412
    /// Save a blob to disk and commit it.
413
    #[cfg(test)]
414
24
    fn save_blob(
415
24
        &mut self,
416
24
        contents: &[u8],
417
24
        doctype: &str,
418
24
        digest_type: &str,
419
24
        digest: &[u8],
420
24
        expires: OffsetDateTime,
421
24
    ) -> Result<String> {
422
24
        let h = self.save_blob_internal(contents, doctype, digest_type, digest, expires)?;
423
24
        let fname = h.fname().to_string();
424
24
        h.commit()?;
425
24
        Ok(fname)
426
24
    }
427

            
428
    /// Return the valid-after time for the latest non-pending consensus.
429
    #[cfg(test)]
430
    // We should revise the tests to use latest_consensus_meta instead.
431
6
    fn latest_consensus_time(&self, flavor: ConsensusFlavor) -> Result<Option<OffsetDateTime>> {
432
6
        Ok(self
433
6
            .latest_consensus_meta(flavor)?
434
7
            .map(|m| m.lifetime().valid_after().into()))
435
6
    }
436

            
437
    /// Remove the blob with name `fname`, but do not give an error on failure.
438
    ///
439
    /// See [`blob_consistency`]: we should call this only having first ensured
440
    /// that the blob is removed from the ExtDocs table.
441
2
    fn remove_blob_or_warn<P: AsRef<Path>>(&self, fname: P) {
442
2
        let fname = fname.as_ref();
443
2
        if let Err(e) = self.blob_dir.remove_file(fname) {
444
            warn_report!(e, "Unable to remove {}", fname.display_lossy());
445
2
        }
446
2
    }
447

            
448
    /// Delete any blob files that are old enough, and not mentioned in the ExtDocs table.
449
    ///
450
    /// There shouldn't typically be any, but we don't want to let our cache grow infinitely
451
    /// if we have a bug.
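    ///
    /// A blob file is removed only if both conditions hold; a sketch of the
    /// criterion applied by the loop below (names here are illustrative):
    ///
    /// ```rust,ignore
    /// let old_enough = mtime + expiration.consensuses < now; // not recently written
    /// let referenced = extdoc_rows_with_this_filename > 0;   // still listed in ExtDocs
    /// let remove_it = old_enough && !referenced;
    /// ```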
    fn remove_unreferenced_blobs(
453
10
        &self,
454
10
        now: OffsetDateTime,
455
10
        expiration: &ExpirationConfig,
456
10
    ) -> Result<()> {
457
        // Now, look for any unreferenced blobs that are a bit old.
458
10
        for ent in self.blob_dir.read_directory(".")?.flatten() {
459
8
            let md_error = |io_error| Error::CacheFile {
460
                action: "getting metadata",
461
                fname: ent.file_name().into(),
462
                error: Arc::new(io_error),
463
            };
464
8
            if ent
465
8
                .metadata()
466
8
                .map_err(md_error)?
467
8
                .modified()
468
8
                .map_err(md_error)?
469
8
                + expiration.consensuses
470
8
                >= now
471
            {
472
                // this file is sufficiently recent that we should not remove it, just to be cautious.
473
6
                continue;
474
2
            }
475
2
            let filename = match ent.file_name().into_string() {
476
2
                Ok(s) => s,
477
                Err(os_str) => {
478
                    // This filename wasn't utf-8.  We will never create one of these.
479
                    warn!(
480
                        "Removing bizarre file '{}' from blob store.",
481
                        os_str.to_string_lossy()
482
                    );
483
                    self.remove_blob_or_warn(ent.file_name());
484
                    continue;
485
                }
486
            };
487
2
            let found: (u32,) =
488
2
                self.conn
489
3
                    .query_row(COUNT_EXTDOC_BY_PATH, params![&filename], |row| {
490
2
                        row.try_into()
491
3
                    })?;
492
2
            if found == (0,) {
493
2
                warn!("Removing unreferenced file '{}' from blob store", &filename);
494
2
                self.remove_blob_or_warn(ent.file_name());
495
            }
496
        }
497

            
498
10
        Ok(())
499
10
    }
500

            
501
    /// Remove any entry in the ExtDocs table whose blob file has vanished.
    ///
    /// This method is `O(n)` in the size of the ExtDocs table and the size of the directory.
    /// It doesn't take `self`, to avoid problems with the borrow checker.
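    ///
    /// Call pattern (as used in `latest_consensus`): borrow the blob directory
    /// and a transaction on the connection at the same time by passing them in
    /// separately.
    ///
    /// ```rust,ignore
    /// let tx = self.conn.unchecked_transaction()?;
    /// Self::remove_entries_for_vanished_blobs(&self.blob_dir, &tx)?;
    /// tx.commit()?;
    /// ```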
    fn remove_entries_for_vanished_blobs<'a>(
506
2
        blob_dir: &CheckedDir,
507
2
        tx: &Transaction<'a>,
508
2
    ) -> Result<usize> {
509
2
        let in_directory: HashSet<PathBuf> = blob_dir
510
2
            .read_directory(".")?
511
2
            .flatten()
512
9
            .map(|dir_entry| PathBuf::from(dir_entry.file_name()))
513
2
            .collect();
514
2
        let in_db: Vec<String> = tx
515
2
            .prepare(FIND_ALL_EXTDOC_FILENAMES)?
516
17
            .query_map([], |row| row.get::<_, String>(0))?
517
2
            .collect::<StdResult<Vec<String>, _>>()?;
518

            
519
2
        let mut n_removed = 0;
520
18
        for fname in in_db {
521
16
            if in_directory.contains(Path::new(&fname)) {
522
                // The blob is present; great!
523
8
                continue;
524
8
            }
525
8

            
526
8
            n_removed += tx.execute(DELETE_EXTDOC_BY_FILENAME, [fname])?;
527
        }
528

            
529
2
        Ok(n_removed)
530
2
    }
531
}
532

            
533
impl Store for SqliteStore {
534
38
    fn is_readonly(&self) -> bool {
535
38
        match &self.lockfile {
536
14
            Some(f) => !f.owns_lock(),
537
24
            None => false,
538
        }
539
38
    }
540
4
    fn upgrade_to_readwrite(&mut self) -> Result<bool> {
541
4
        if self.is_readonly() && self.sql_path.is_some() {
542
2
            let lf = self
543
2
                .lockfile
544
2
                .as_mut()
545
2
                .expect("No lockfile open; cannot upgrade to read-write storage");
546
2
            if !lf.try_lock().map_err(Error::from_lockfile)? {
547
                // Somebody else has the lock.
548
                return Ok(false);
549
2
            }
550
2
            // Unwrap should be safe due to parent `.is_some()` check
551
2
            #[allow(clippy::unwrap_used)]
552
2
            match rusqlite::Connection::open(self.sql_path.as_ref().unwrap()) {
553
2
                Ok(conn) => {
554
2
                    self.conn = conn;
555
2
                }
556
                Err(e) => {
557
                    if let Err(e2) = lf.unlock() {
558
                        warn_report!(
559
                            e2,
560
                            "Unable to release lock file while upgrading DB to read/write"
561
                        );
562
                    }
563
                    return Err(e.into());
564
                }
565
            }
566
2
        }
567
4
        Ok(true)
568
4
    }
569
8
    fn expire_all(&mut self, expiration: &ExpirationConfig) -> Result<()> {
570
8
        let tx = self.conn.transaction()?;
571
        // This works around a false positive; see
572
        //   https://github.com/rust-lang/rust-clippy/issues/8114
573
        #[allow(clippy::let_and_return)]
574
8
        let expired_blobs: Vec<String> = {
575
8
            let mut stmt = tx.prepare(FIND_EXPIRED_EXTDOCS)?;
576
8
            let names: Vec<String> = stmt
577
9
                .query_map([], |row| row.get::<_, String>(0))?
578
8
                .collect::<StdResult<Vec<String>, _>>()?;
579
8
            names
580
8
        };
581
8

            
582
8
        let now = OffsetDateTime::now_utc();
583
8
        tx.execute(DROP_OLD_EXTDOCS, [])?;
584

            
585
        // In theory bad system clocks might generate table rows with times far in the future.
586
        // However, for data cached here that comes from the network consensus,
587
        // we rely on the fact that no consensus from the future exists, so this can't happen.
588
8
        tx.execute(DROP_OLD_MICRODESCS, [now - expiration.microdescs])?;
589
8
        tx.execute(DROP_OLD_AUTHCERTS, [now - expiration.authcerts])?;
590
8
        tx.execute(DROP_OLD_CONSENSUSES, [now - expiration.consensuses])?;
591
8
        tx.execute(DROP_OLD_ROUTERDESCS, [now - expiration.router_descs])?;
592

            
593
        // Bridge descriptors come from bridges and bridges might send crazy times,
594
        // so we need to discard any that look like they are from the future,
595
        // since otherwise wrong far-future timestamps might live in our DB indefinitely.
596
        #[cfg(feature = "bridge-client")]
597
8
        tx.execute(DROP_OLD_BRIDGEDESCS, [now, now])?;
598

            
599
        // Find all consensus blobs that are no longer referenced,
600
        // and delete their entries from extdocs.
601
8
        let remove_consensus_blobs = {
602
            // TODO: This query can be O(n); but that won't matter for clients.
603
            // For relays, we may want to add an index to speed it up, if we use this code there too.
604
8
            let mut stmt = tx.prepare(FIND_UNREFERENCED_CONSENSUS_EXTDOCS)?;
605
8
            let filenames: Vec<String> = stmt
606
9
                .query_map([], |row| row.get::<_, String>(0))?
607
8
                .collect::<StdResult<Vec<String>, _>>()?;
608
8
            drop(stmt);
609
8
            let mut stmt = tx.prepare(DELETE_EXTDOC_BY_FILENAME)?;
610
8
            for fname in filenames.iter() {
611
2
                stmt.execute([fname])?;
612
            }
613
8
            filenames
614
8
        };
615
8

            
616
8
        tx.commit()?;
617
        // Now that the transaction has been committed, these blobs are
618
        // unreferenced in the ExtDocs table, and we can remove them from disk.
619
8
        let mut remove_blob_files: HashSet<_> = expired_blobs.iter().collect();
620
8
        remove_blob_files.extend(remove_consensus_blobs.iter());
621

            
622
12
        for name in remove_blob_files {
623
4
            let fname = self.blob_dir.join(name);
624
4
            if let Ok(fname) = fname {
625
4
                if let Err(e) = std::fs::remove_file(&fname) {
626
                    warn_report!(
627
                        e,
628
                        "Couldn't remove orphaned blob file {}",
629
                        fname.display_lossy()
630
                    );
631
4
                }
632
            }
633
        }
634

            
635
8
        self.remove_unreferenced_blobs(now, expiration)?;
636

            
637
8
        Ok(())
638
8
    }
639

            
640
    // Note: We cannot, and do not, call this function when a transaction already exists.
641
16
    fn latest_consensus(
642
16
        &self,
643
16
        flavor: ConsensusFlavor,
644
16
        pending: Option<bool>,
645
16
    ) -> Result<Option<InputString>> {
646
16
        match self.latest_consensus_internal(flavor, pending)? {
647
10
            Ok(s) => return Ok(Some(s)),
648
6
            Err(AbsentBlob::NothingToRead) => return Ok(None),
649
            Err(AbsentBlob::VanishedFile) => {
650
                // If we get here, the file was vanished.  Clean up the DB and try again.
651
            }
652
        }
653

            
654
        // We use unchecked_transaction() here because this API takes a non-mutable `SqliteStore`.
655
        // `unchecked_transaction()` will give an error if it is used
656
        // when a transaction already exists.
657
        // That's fine: We don't call this function from inside this module,
658
        // when a transaction might exist,
659
        // and we can't call multiple SqliteStore functions at once: it isn't `Sync`.
660
        // Here we enforce that:
661
        static_assertions::assert_not_impl_any!(SqliteStore: Sync);
662

            
663
        // If we decide that this is unacceptable,
664
        // then since sqlite doesn't really support concurrent use of a connection,
665
        // we _could_ change the Store::latest_consensus API take &mut self,
666
        // or we could add a mutex,
667
        // or we could just not use a transaction object.
668
        let tx = self.conn.unchecked_transaction()?;
669
        Self::remove_entries_for_vanished_blobs(&self.blob_dir, &tx)?;
670
        tx.commit()?;
671

            
672
        match self.latest_consensus_internal(flavor, pending)? {
673
            Ok(s) => Ok(Some(s)),
674
            Err(AbsentBlob::NothingToRead) => Ok(None),
675
            Err(AbsentBlob::VanishedFile) => {
676
                warn!("Somehow remove_entries_for_vanished_blobs didn't resolve a VanishedFile");
677
                Ok(None)
678
            }
679
        }
680
16
    }
681

            
682
14
    fn latest_consensus_meta(&self, flavor: ConsensusFlavor) -> Result<Option<ConsensusMeta>> {
683
14
        let mut stmt = self.conn.prepare(FIND_LATEST_CONSENSUS_META)?;
684
14
        let mut rows = stmt.query(params![flavor.name()])?;
685
14
        if let Some(row) = rows.next()? {
686
6
            Ok(Some(cmeta_from_row(row)?))
687
        } else {
688
8
            Ok(None)
689
        }
690
14
    }
691
    #[cfg(test)]
692
4
    fn consensus_by_meta(&self, cmeta: &ConsensusMeta) -> Result<InputString> {
693
2
        if let Some((text, _)) =
694
4
            self.consensus_by_sha3_digest_of_signed_part(cmeta.sha3_256_of_signed())?
695
        {
696
2
            Ok(text)
697
        } else {
698
2
            Err(Error::CacheCorruption(
699
2
                "couldn't find a consensus we thought we had.",
700
2
            ))
701
        }
702
4
    }
703
16
    fn consensus_by_sha3_digest_of_signed_part(
704
16
        &self,
705
16
        d: &[u8; 32],
706
16
    ) -> Result<Option<(InputString, ConsensusMeta)>> {
707
16
        let digest = hex::encode(d);
708
16
        let mut stmt = self
709
16
            .conn
710
16
            .prepare(FIND_CONSENSUS_AND_META_BY_DIGEST_OF_SIGNED)?;
711
16
        let mut rows = stmt.query(params![digest])?;
712
16
        if let Some(row) = rows.next()? {
713
10
            let meta = cmeta_from_row(row)?;
714
10
            let fname: String = row.get(5)?;
715
10
            if let Ok(text) = self.read_blob(&fname)? {
716
10
                return Ok(Some((text, meta)));
717
            }
718
6
        }
719
6
        Ok(None)
720
16
    }
721
10
    fn store_consensus(
722
10
        &mut self,
723
10
        cmeta: &ConsensusMeta,
724
10
        flavor: ConsensusFlavor,
725
10
        pending: bool,
726
10
        contents: &str,
727
10
    ) -> Result<()> {
728
10
        let lifetime = cmeta.lifetime();
729
10
        let sha3_of_signed = cmeta.sha3_256_of_signed();
730
10
        let sha3_of_whole = cmeta.sha3_256_of_whole();
731
10
        let valid_after: OffsetDateTime = lifetime.valid_after().into();
732
10
        let fresh_until: OffsetDateTime = lifetime.fresh_until().into();
733
10
        let valid_until: OffsetDateTime = lifetime.valid_until().into();
734

            
735
        /// How long to keep a consensus around after it has expired
736
        const CONSENSUS_LIFETIME: time::Duration = time::Duration::days(4);
737

            
738
        // After a few days have passed, a consensus is no good for
739
        // anything at all, not even diffs.
740
10
        let expires = valid_until + CONSENSUS_LIFETIME;
741
10

            
742
10
        let doctype = format!("con_{}", flavor.name());
743

            
744
10
        let h = self.save_blob_internal(
745
10
            contents.as_bytes(),
746
10
            &doctype,
747
10
            "sha3-256",
748
10
            &sha3_of_whole[..],
749
10
            expires,
750
10
        )?;
751
10
        h.tx().execute(
752
10
            INSERT_CONSENSUS,
753
10
            params![
754
10
                valid_after,
755
10
                fresh_until,
756
10
                valid_until,
757
10
                flavor.name(),
758
10
                pending,
759
10
                hex::encode(sha3_of_signed),
760
10
                h.digest_string()
761
10
            ],
762
10
        )?;
763
10
        h.commit()?;
764
10
        Ok(())
765
10
    }
766
2
    fn mark_consensus_usable(&mut self, cmeta: &ConsensusMeta) -> Result<()> {
767
2
        let d = hex::encode(cmeta.sha3_256_of_whole());
768
2
        let digest = format!("sha3-256-{}", d);
769

            
770
2
        let tx = self.conn.transaction()?;
771
2
        let n = tx.execute(MARK_CONSENSUS_NON_PENDING, params![digest])?;
772
2
        trace!("Marked {} consensuses usable", n);
773
2
        tx.commit()?;
774

            
775
2
        Ok(())
776
2
    }
777
2
    fn delete_consensus(&mut self, cmeta: &ConsensusMeta) -> Result<()> {
778
2
        let d = hex::encode(cmeta.sha3_256_of_whole());
779
2
        let digest = format!("sha3-256-{}", d);
780

            
781
        // TODO: We should probably remove the blob as well, but for now
782
        // this is enough.
783
2
        let tx = self.conn.transaction()?;
784
2
        tx.execute(REMOVE_CONSENSUS, params![digest])?;
785
2
        tx.commit()?;
786

            
787
2
        Ok(())
788
2
    }
789

            
790
8
    fn authcerts(&self, certs: &[AuthCertKeyIds]) -> Result<HashMap<AuthCertKeyIds, String>> {
791
8
        let mut result = HashMap::new();
792
        // TODO(nickm): Do I need to get a transaction here for performance?
793
8
        let mut stmt = self.conn.prepare(FIND_AUTHCERT)?;
794

            
795
18
        for ids in certs {
796
10
            let id_digest = hex::encode(ids.id_fingerprint.as_bytes());
797
10
            let sk_digest = hex::encode(ids.sk_fingerprint.as_bytes());
798
10
            if let Some(contents) = stmt
799
13
                .query_row(params![id_digest, sk_digest], |row| row.get::<_, String>(0))
800
10
                .optional()?
801
6
            {
802
6
                result.insert(*ids, contents);
803
6
            }
804
        }
805

            
806
8
        Ok(result)
807
8
    }
808
6
    fn store_authcerts(&mut self, certs: &[(AuthCertMeta, &str)]) -> Result<()> {
809
6
        let tx = self.conn.transaction()?;
810
6
        let mut stmt = tx.prepare(INSERT_AUTHCERT)?;
811
14
        for (meta, content) in certs {
812
8
            let ids = meta.key_ids();
813
8
            let id_digest = hex::encode(ids.id_fingerprint.as_bytes());
814
8
            let sk_digest = hex::encode(ids.sk_fingerprint.as_bytes());
815
8
            let published: OffsetDateTime = meta.published().into();
816
8
            let expires: OffsetDateTime = meta.expires().into();
817
8
            stmt.execute(params![id_digest, sk_digest, published, expires, content])?;
818
        }
819
6
        stmt.finalize()?;
820
6
        tx.commit()?;
821
6
        Ok(())
822
6
    }
823

            
824
24
    fn microdescs(&self, digests: &[MdDigest]) -> Result<HashMap<MdDigest, String>> {
825
24
        let mut result = HashMap::new();
826
24
        let mut stmt = self.conn.prepare(FIND_MD)?;
827

            
828
        // TODO(nickm): Should I speed this up with a transaction, or
829
        // does it not matter for queries?
830
82
        for md_digest in digests {
831
58
            let h_digest = hex::encode(md_digest);
832
58
            if let Some(contents) = stmt
833
80
                .query_row(params![h_digest], |row| row.get::<_, String>(0))
834
58
                .optional()?
835
44
            {
836
44
                result.insert(*md_digest, contents);
837
44
            }
838
        }
839

            
840
24
        Ok(result)
841
24
    }
842
22
    fn store_microdescs(&mut self, digests: &[(&str, &MdDigest)], when: SystemTime) -> Result<()> {
843
22
        let when: OffsetDateTime = when.into();
844

            
845
22
        let tx = self.conn.transaction()?;
846
22
        let mut stmt = tx.prepare(INSERT_MD)?;
847

            
848
56
        for (content, md_digest) in digests {
849
34
            let h_digest = hex::encode(md_digest);
850
34
            stmt.execute(params![h_digest, when, content])?;
851
        }
852
22
        stmt.finalize()?;
853
22
        tx.commit()?;
854
22
        Ok(())
855
22
    }
856
4
    fn update_microdescs_listed(&mut self, digests: &[MdDigest], when: SystemTime) -> Result<()> {
857
4
        let tx = self.conn.transaction()?;
858
4
        let mut stmt = tx.prepare(UPDATE_MD_LISTED)?;
859
4
        let when: OffsetDateTime = when.into();
860

            
861
8
        for md_digest in digests {
862
4
            let h_digest = hex::encode(md_digest);
863
4
            stmt.execute(params![when, h_digest])?;
864
        }
865

            
866
4
        stmt.finalize()?;
867
4
        tx.commit()?;
868
4
        Ok(())
869
4
    }
870

            
871
    #[cfg(feature = "routerdesc")]
872
6
    fn routerdescs(&self, digests: &[RdDigest]) -> Result<HashMap<RdDigest, String>> {
873
6
        let mut result = HashMap::new();
874
6
        let mut stmt = self.conn.prepare(FIND_RD)?;
875

            
876
        // TODO(nickm): Should I speed this up with a transaction, or
877
        // does it not matter for queries?
878
20
        for rd_digest in digests {
879
14
            let h_digest = hex::encode(rd_digest);
880
14
            if let Some(contents) = stmt
881
18
                .query_row(params![h_digest], |row| row.get::<_, String>(0))
882
14
                .optional()?
883
8
            {
884
8
                result.insert(*rd_digest, contents);
885
8
            }
886
        }
887

            
888
6
        Ok(result)
889
6
    }
890
    #[cfg(feature = "routerdesc")]
891
4
    fn store_routerdescs(&mut self, digests: &[(&str, SystemTime, &RdDigest)]) -> Result<()> {
892
4
        let tx = self.conn.transaction()?;
893
4
        let mut stmt = tx.prepare(INSERT_RD)?;
894

            
895
14
        for (content, when, rd_digest) in digests {
896
10
            let when: OffsetDateTime = (*when).into();
897
10
            let h_digest = hex::encode(rd_digest);
898
10
            stmt.execute(params![h_digest, when, content])?;
899
        }
900
4
        stmt.finalize()?;
901
4
        tx.commit()?;
902
4
        Ok(())
903
4
    }
904

            
905
    #[cfg(feature = "bridge-client")]
906
92
    fn lookup_bridgedesc(&self, bridge: &BridgeConfig) -> Result<Option<CachedBridgeDescriptor>> {
907
92
        let bridge_line = bridge.to_string();
908
92
        Ok(self
909
92
            .conn
910
100
            .query_row(FIND_BRIDGEDESC, params![bridge_line], |row| {
911
16
                let (fetched, document): (OffsetDateTime, _) = row.try_into()?;
912
16
                let fetched = fetched.into();
913
16
                Ok(CachedBridgeDescriptor { fetched, document })
914
100
            })
915
92
            .optional()?)
916
92
    }
917

            
918
    #[cfg(feature = "bridge-client")]
919
24
    fn store_bridgedesc(
920
24
        &mut self,
921
24
        bridge: &BridgeConfig,
922
24
        entry: CachedBridgeDescriptor,
923
24
        until: SystemTime,
924
24
    ) -> Result<()> {
925
24
        if self.is_readonly() {
926
            // Hopefully whoever *does* have the lock will update the cache.
927
            // Otherwise it will contain a stale entry forever
928
            // (which we'll ignore, but waste effort on).
929
            return Ok(());
930
24
        }
931
24
        let bridge_line = bridge.to_string();
932
24
        let row = params![
933
24
            bridge_line,
934
24
            OffsetDateTime::from(entry.fetched),
935
24
            OffsetDateTime::from(until),
936
24
            entry.document,
937
24
        ];
938
24
        self.conn.execute(INSERT_BRIDGEDESC, row)?;
939
24
        Ok(())
940
24
    }
941

            
942
    #[cfg(feature = "bridge-client")]
943
    fn delete_bridgedesc(&mut self, bridge: &BridgeConfig) -> Result<()> {
944
        if self.is_readonly() {
945
            // This is called when we find corrupted or stale cache entries,
946
            // to stop us wasting time on them next time.
947
            // Hopefully whoever *does* have the lock will do this.
948
            return Ok(());
949
        }
950
        let bridge_line = bridge.to_string();
951
        self.conn.execute(DELETE_BRIDGEDESC, params![bridge_line])?;
952
        Ok(())
953
    }
954

            
955
4
    fn update_protocol_recommendations(
956
4
        &mut self,
957
4
        valid_after: SystemTime,
958
4
        protocols: &tor_netdoc::doc::netstatus::ProtoStatuses,
959
4
    ) -> Result<()> {
960
4
        let json =
961
4
            serde_json::to_string(&protocols).map_err(into_internal!("Cannot encode protocols"))?;
962
4
        let params = params![OffsetDateTime::from(valid_after), json];
963
4
        self.conn.execute(UPDATE_PROTOCOL_STATUS, params)?;
964
4
        Ok(())
965
4
    }
966

            
967
160
    fn cached_protocol_recommendations(
968
160
        &self,
969
160
    ) -> Result<Option<(SystemTime, tor_netdoc::doc::netstatus::ProtoStatuses)>> {
970
160
        let opt_row: Option<(OffsetDateTime, String)> = self
971
160
            .conn
972
162
            .query_row(FIND_LATEST_PROTOCOL_STATUS, [], |row| {
973
4
                Ok((row.get(0)?, row.get(1)?))
974
162
            })
975
160
            .optional()?;
976

            
977
160
        let (date, json) = match opt_row {
978
4
            Some(v) => v,
979
156
            None => return Ok(None),
980
        };
981

            
982
4
        let date = date.into();
983
4
        let statuses: tor_netdoc::doc::netstatus::ProtoStatuses =
984
4
            serde_json::from_str(json.as_str()).map_err(|e| Error::BadJsonInCache(Arc::new(e)))?;
985

            
986
4
        Ok(Some((date, statuses)))
987
160
    }
988
}
989

            
990
/// Functionality related to uncommitted blobs.
991
mod blob_handle {
992
    use std::path::{Path, PathBuf};
993

            
994
    use crate::Result;
995
    use rusqlite::Transaction;
996
    use tor_basic_utils::PathExt as _;
997
    use tor_error::warn_report;
998

            
999
    /// Handle to a blob that we have saved to disk but
    /// not yet committed to
    /// the database, and the database transaction where we added a reference to it.
    ///
    /// Used to either commit the blob (by calling [`SavedBlobHandle::commit`]),
    /// or roll it back (by dropping the [`SavedBlobHandle`] without committing it).
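    ///
    /// Rollback sketch (`contents`, `digest`, and `expires` are assumed to be
    /// in scope): dropping the handle without committing rolls back the pending
    /// ExtDocs row and then unlinks the blob file.
    ///
    /// ```rust,ignore
    /// let h = store.save_blob_internal(contents, "con_microdesc", "sha3-256", &digest[..], expires)?;
    /// drop(h); // nothing is left in the database or in the blob directory
    /// ```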
    #[must_use]
    pub(super) struct SavedBlobHandle<'a> {
        /// Transaction we're using to add the blob to the ExtDocs table.
        ///
        /// Note that struct fields are dropped in declaration order,
        /// so when we drop an uncommitted SavedBlobHandle,
        /// we roll back the transaction before we delete the file.
        /// (In practice, either order would be fine.)
        tx: Transaction<'a>,
        /// Filename for the file, with respect to the blob directory.
        fname: String,
        /// Declared digest string for this blob. Of the format
        /// "digesttype-hexstr".
        digeststr: String,
        /// An 'unlinker' for the blob file.
        unlinker: Unlinker,
    }
    impl<'a> SavedBlobHandle<'a> {
        /// Construct a SavedBlobHandle from its parts.
34
        pub(super) fn new(
34
            tx: Transaction<'a>,
34
            fname: String,
34
            digeststr: String,
34
            unlinker: Unlinker,
34
        ) -> Self {
34
            Self {
34
                tx,
34
                fname,
34
                digeststr,
34
                unlinker,
34
            }
34
        }
        /// Return a reference to the underlying database transaction.
10
        pub(super) fn tx(&self) -> &Transaction<'a> {
10
            &self.tx
10
        }
        /// Return the digest string of the saved blob.
        /// Other tables use this as a foreign key into ExtDocs.digest
10
        pub(super) fn digest_string(&self) -> &str {
10
            self.digeststr.as_ref()
10
        }
        /// Return the filename of this blob within the blob directory.
        #[allow(unused)] // used for testing.
24
        pub(super) fn fname(&self) -> &str {
24
            self.fname.as_ref()
24
        }
        /// Commit the relevant database transaction.
34
        pub(super) fn commit(self) -> Result<()> {
34
            // The blob has been written to disk, so it is safe to
34
            // commit the transaction.
34
            // If the commit returns an error, self.unlinker will remove the blob.
34
            // (This could result in a vanished blob if the commit reports an error,
34
            // but the transaction is still visible in the database.)
34
            self.tx.commit()?;
            // If we reach this point, we don't want to remove the file.
34
            self.unlinker.forget();
34
            Ok(())
34
        }
    }
    /// Handle to a file which we might have to delete.
    ///
    /// When this handle is dropped, the file gets deleted, unless you have
    /// first called [`Unlinker::forget`].
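    ///
    /// Usage sketch (module-internal; `write_the_blob` and `add_the_row` are
    /// hypothetical fallible steps):
    ///
    /// ```rust,ignore
    /// let unlinker = Unlinker::new(&full_path);
    /// write_the_blob(&full_path)?; // if a later step fails...
    /// add_the_row(&tx)?;           // ...dropping `unlinker` removes the file
    /// unlinker.forget();           // success: keep the file on disk
    /// ```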
    pub(super) struct Unlinker {
        /// The location of the file to remove, or None if we shouldn't
        /// remove it.
        p: Option<PathBuf>,
    }
    impl Unlinker {
        /// Make a new Unlinker for a given filename.
34
        pub(super) fn new<P: AsRef<Path>>(p: P) -> Self {
34
            Unlinker {
34
                p: Some(p.as_ref().to_path_buf()),
34
            }
34
        }
        /// Forget about this unlinker, so that the corresponding file won't
        /// get dropped.
34
        fn forget(mut self) {
34
            self.p = None;
34
        }
    }
    impl Drop for Unlinker {
34
        fn drop(&mut self) {
34
            if let Some(p) = self.p.take() {
                if let Err(e) = std::fs::remove_file(&p) {
                    warn_report!(
                        e,
                        "Couldn't remove rolled-back blob file {}",
                        p.display_lossy()
                    );
                }
34
            }
34
        }
    }
}
/// Convert a hexadecimal sha3-256 digest from the database into an array.
32
fn digest_from_hex(s: &str) -> Result<[u8; 32]> {
32
    let mut bytes = [0_u8; 32];
32
    hex::decode_to_slice(s, &mut bytes[..]).map_err(Error::BadHexInCache)?;
32
    Ok(bytes)
32
}
/// Convert a hexadecimal sha3-256 "digest string", as used in the
/// `digest` column of the database, into an array.
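///
/// For example (illustrative all-zero digest), `"sha3-256-"` followed by
/// 64 hex digits becomes a `[u8; 32]`:
///
/// ```rust,ignore
/// let bytes = digest_from_dstr(
///     "sha3-256-0000000000000000000000000000000000000000000000000000000000000000",
/// )?;
/// assert_eq!(bytes, [0_u8; 32]);
/// ```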
fn digest_from_dstr(s: &str) -> Result<[u8; 32]> {
16
    if let Some(stripped) = s.strip_prefix("sha3-256-") {
16
        digest_from_hex(stripped)
    } else {
        Err(Error::CacheCorruption("Invalid digest in database"))
    }
16
}
/// Create a ConsensusMeta from a `Row` returned by one of
/// `FIND_LATEST_CONSENSUS_META` or `FIND_CONSENSUS_AND_META_BY_DIGEST`.
16
fn cmeta_from_row(row: &rusqlite::Row<'_>) -> Result<ConsensusMeta> {
16
    let va: OffsetDateTime = row.get(0)?;
16
    let fu: OffsetDateTime = row.get(1)?;
16
    let vu: OffsetDateTime = row.get(2)?;
16
    let d_signed: String = row.get(3)?;
16
    let d_all: String = row.get(4)?;
16
    let lifetime = Lifetime::new(va.into(), fu.into(), vu.into())
16
        .map_err(|_| Error::CacheCorruption("inconsistent lifetime in database"))?;
    Ok(ConsensusMeta::new(
16
        lifetime,
16
        digest_from_hex(&d_signed)?,
16
        digest_from_dstr(&d_all)?,
    ))
16
}
/// Set up the tables for the arti cache schema in a sqlite database.
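///
/// Note that `Consensuses.digest` references `ExtDocs.digest` with
/// `ON DELETE CASCADE`, and `from_conn_internal` enables the `foreign_keys`
/// pragma, so removing an ExtDocs row also removes any consensus row that
/// points at it. A sketch (with a hypothetical `digeststr`):
///
/// ```rust,ignore
/// conn.execute("DELETE FROM ExtDocs WHERE digest = ?;", params![digeststr])?;
/// // ...and the matching row in Consensuses is now gone as well.
/// ```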
const INSTALL_V0_SCHEMA: &str = "
  -- Helps us version the schema.  The schema here corresponds to a
  -- version number called 'version', and it should be readable by
  -- anybody who is compliant with versions of at least 'readable_by'.
  CREATE TABLE TorSchemaMeta (
     name TEXT NOT NULL PRIMARY KEY,
     version INTEGER NOT NULL,
     readable_by INTEGER NOT NULL
  );
  INSERT INTO TorSchemaMeta (name, version, readable_by) VALUES ( 'TorDirStorage', 0, 0 );
  -- Keeps track of external blobs on disk.
  CREATE TABLE ExtDocs (
    -- Records a digest of the file contents, in the form '<digest_type>-hexstr'
    digest TEXT PRIMARY KEY NOT NULL,
    -- When was this file created?
    created DATE NOT NULL,
    -- After what time will this file definitely be useless?
    expires DATE NOT NULL,
    -- What is the type of this file? Currently supported are 'con_<flavor>'.
    --   (Before tor-dirmgr ~0.28.0, we would erroneously record 'sha3-256' here
    --   instead of 'con_<flavor>'. Nothing depends on this field yet, but it will
    --   be used in the future as we add more large-document types.)
    type TEXT NOT NULL,
    -- Filename for this file within our blob directory.
    filename TEXT NOT NULL
  );
  -- All the microdescriptors we know about.
  CREATE TABLE Microdescs (
    sha256_digest TEXT PRIMARY KEY NOT NULL,
    last_listed DATE NOT NULL,
    contents BLOB NOT NULL
  );
  -- All the authority certificates we know.
  CREATE TABLE Authcerts (
    id_digest TEXT NOT NULL,
    sk_digest TEXT NOT NULL,
    published DATE NOT NULL,
    expires DATE NOT NULL,
    contents BLOB NOT NULL,
    PRIMARY KEY (id_digest, sk_digest)
  );
  -- All the consensuses we're storing.
  CREATE TABLE Consensuses (
    valid_after DATE NOT NULL,
    fresh_until DATE NOT NULL,
    valid_until DATE NOT NULL,
    flavor TEXT NOT NULL,
    pending BOOLEAN NOT NULL,
    sha3_of_signed_part TEXT NOT NULL,
    digest TEXT NOT NULL,
    FOREIGN KEY (digest) REFERENCES ExtDocs (digest) ON DELETE CASCADE
  );
  CREATE INDEX Consensuses_vu on CONSENSUSES(valid_until);
";
/// Update the database schema, from each version to the next
const UPDATE_SCHEMA: &[&str] = &["
  -- Update the database schema from version 0 to version 1.
  CREATE TABLE RouterDescs (
    sha1_digest TEXT PRIMARY KEY NOT NULL,
    published DATE NOT NULL,
    contents BLOB NOT NULL
  );
","
  -- Update the database schema from version 1 to version 2.
  -- We create this table even if the bridge-client feature is disabled, but then don't touch it at all.
  CREATE TABLE BridgeDescs (
    bridge_line TEXT PRIMARY KEY NOT NULL,
    fetched DATE NOT NULL,
    until DATE NOT NULL,
    contents BLOB NOT NULL
  );
","
 -- Update the database schema from version 2 to version 3.
 -- Table to hold our latest ProtocolStatuses object, to tell us if we're obsolete.
 -- We hold this independently from our consensus,
 -- since we want to read it very early in our startup process,
 -- even if the consensus is expired.
 CREATE TABLE ProtocolStatus (
    -- Enforce that there is only one row in this table.
    -- (This is a bit kludgy, but I am assured that it is a common practice.)
    zero INTEGER PRIMARY KEY NOT NULL,
    -- valid-after date of the consensus from which we got this status
    date DATE NOT NULL,
    -- ProtoStatuses object, encoded as json
    statuses TEXT NOT NULL
 );
"];
/// Update the database schema version tracking, from each version to the next
const UPDATE_SCHEMA_VERSION: &str = "
  UPDATE TorSchemaMeta SET version=? WHERE version<?;
";
/// Version number used for this version of the arti cache schema.
const SCHEMA_VERSION: u32 = UPDATE_SCHEMA.len() as u32;
/// Query: find the latest-expiring consensus of a given flavor with a given
/// pending status.
const FIND_CONSENSUS_P: &str = "
  SELECT valid_after, valid_until, filename
  FROM Consensuses
  INNER JOIN ExtDocs ON ExtDocs.digest = Consensuses.digest
  WHERE pending = ? AND flavor = ?
  ORDER BY valid_until DESC
  LIMIT 1;
";
/// Query: find the latest-expiring consensus of a given flavor, regardless of
/// pending status.
const FIND_CONSENSUS: &str = "
  SELECT valid_after, valid_until, filename
  FROM Consensuses
  INNER JOIN ExtDocs ON ExtDocs.digest = Consensuses.digest
  WHERE flavor = ?
  ORDER BY valid_until DESC
  LIMIT 1;
";
/// Query: Find the valid-after time for the latest-expiring
/// non-pending consensus of a given flavor.
const FIND_LATEST_CONSENSUS_META: &str = "
  SELECT valid_after, fresh_until, valid_until, sha3_of_signed_part, digest
  FROM Consensuses
  WHERE pending = 0 AND flavor = ?
  ORDER BY valid_until DESC
  LIMIT 1;
";
/// Look up a consensus by its digest-of-signed-part string.
const FIND_CONSENSUS_AND_META_BY_DIGEST_OF_SIGNED: &str = "
  SELECT valid_after, fresh_until, valid_until, sha3_of_signed_part, Consensuses.digest, filename
  FROM Consensuses
  INNER JOIN ExtDocs on ExtDocs.digest = Consensuses.digest
  WHERE Consensuses.sha3_of_signed_part = ?
  LIMIT 1;
";
/// Query: Update the consensus whose digest field is 'digest' to call it
/// no longer pending.
const MARK_CONSENSUS_NON_PENDING: &str = "
  UPDATE Consensuses
  SET pending = 0
  WHERE digest = ?;
";
/// Query: Remove the consensus with a given digest field.
#[allow(dead_code)]
const REMOVE_CONSENSUS: &str = "
  DELETE FROM Consensuses
  WHERE digest = ?;
";
/// Query: Find the authority certificate with given key digests.
const FIND_AUTHCERT: &str = "
  SELECT contents FROM AuthCerts WHERE id_digest = ? AND sk_digest = ?;
";
/// Query: find the microdescriptor with a given hex-encoded sha256 digest
const FIND_MD: &str = "
  SELECT contents
  FROM Microdescs
  WHERE sha256_digest = ?
";
/// Query: find the router descriptors with a given hex-encoded sha1 digest
#[cfg(feature = "routerdesc")]
const FIND_RD: &str = "
  SELECT contents
  FROM RouterDescs
  WHERE sha1_digest = ?
";
/// Query: find every ExtDocs member that has expired.
const FIND_EXPIRED_EXTDOCS: &str = "
  SELECT filename FROM ExtDocs where expires < datetime('now');
";
/// Query: find whether an ExtDoc is listed.
const COUNT_EXTDOC_BY_PATH: &str = "
  SELECT COUNT(*) FROM ExtDocs WHERE filename = ?;
";
/// Query: Add a new entry to ExtDocs.
const INSERT_EXTDOC: &str = "
  INSERT OR REPLACE INTO ExtDocs ( digest, created, expires, type, filename )
  VALUES ( ?, datetime('now'), ?, ?, ? );
";
/// Query: Add a new consensus.
const INSERT_CONSENSUS: &str = "
  INSERT OR REPLACE INTO Consensuses
    ( valid_after, fresh_until, valid_until, flavor, pending, sha3_of_signed_part, digest )
  VALUES ( ?, ?, ?, ?, ?, ?, ? );
";
/// Query: Add a new AuthCert
const INSERT_AUTHCERT: &str = "
  INSERT OR REPLACE INTO Authcerts
    ( id_digest, sk_digest, published, expires, contents)
  VALUES ( ?, ?, ?, ?, ? );
";
/// Query: Add a new microdescriptor
const INSERT_MD: &str = "
  INSERT OR REPLACE INTO Microdescs ( sha256_digest, last_listed, contents )
  VALUES ( ?, ?, ? );
";
/// Query: Add a new router descriptor
#[allow(unused)]
#[cfg(feature = "routerdesc")]
const INSERT_RD: &str = "
  INSERT OR REPLACE INTO RouterDescs ( sha1_digest, published, contents )
  VALUES ( ?, ?, ? );
";
/// Query: Change the time when a given microdescriptor was last listed.
const UPDATE_MD_LISTED: &str = "
  UPDATE Microdescs
  SET last_listed = max(last_listed, ?)
  WHERE sha256_digest = ?;
";
/// Query: Find a cached bridge descriptor
#[cfg(feature = "bridge-client")]
const FIND_BRIDGEDESC: &str = "SELECT fetched, contents FROM BridgeDescs WHERE bridge_line = ?;";
/// Query: Record a cached bridge descriptor
#[cfg(feature = "bridge-client")]
const INSERT_BRIDGEDESC: &str = "
  INSERT OR REPLACE INTO BridgeDescs ( bridge_line, fetched, until, contents )
  VALUES ( ?, ?, ?, ? );
";
/// Query: Remove a cached bridge descriptor
#[cfg(feature = "bridge-client")]
#[allow(dead_code)]
const DELETE_BRIDGEDESC: &str = "DELETE FROM BridgeDescs WHERE bridge_line = ?;";
/// Query: Find all consensus extdocs that are not referenced in the consensus table.
///
/// Note: treating `type = 'sha3-256'` as a synonym for `con_%` here is a workaround.
const FIND_UNREFERENCED_CONSENSUS_EXTDOCS: &str = "
    SELECT filename FROM ExtDocs WHERE
         (type LIKE 'con_%' OR type = 'sha3-256')
    AND NOT EXISTS
         (SELECT digest FROM Consensuses WHERE Consensuses.digest = ExtDocs.digest);";
/// Query: Discard every expired extdoc.
///
/// External documents aren't exposed through [`Store`].
const DROP_OLD_EXTDOCS: &str = "DELETE FROM ExtDocs WHERE expires < datetime('now');";
/// Query: Discard an extdoc with a given path.
const DELETE_EXTDOC_BY_FILENAME: &str = "DELETE FROM ExtDocs WHERE filename = ?;";
/// Query: List all extdoc filenames.
const FIND_ALL_EXTDOC_FILENAMES: &str = "SELECT filename FROM ExtDocs;";
/// Query: Get the latest protocol status.
const FIND_LATEST_PROTOCOL_STATUS: &str = "SELECT date, statuses FROM ProtocolStatus WHERE zero=0;";
/// Query: Update the latest protocol status.
const UPDATE_PROTOCOL_STATUS: &str = "INSERT OR REPLACE INTO ProtocolStatus VALUES ( 0, ?, ? );";
/// Query: Discard every router descriptor that was published more than 3
/// months ago.
// TODO: Choose a more realistic time.
const DROP_OLD_ROUTERDESCS: &str = "DELETE FROM RouterDescs WHERE published < ?;";
/// Query: Discard every microdescriptor that hasn't been listed for 3 months.
// TODO: Choose a more realistic time.
const DROP_OLD_MICRODESCS: &str = "DELETE FROM Microdescs WHERE last_listed < ?;";
/// Query: Discard every expired authority certificate.
const DROP_OLD_AUTHCERTS: &str = "DELETE FROM Authcerts WHERE expires < ?;";
/// Query: Discard every consensus that's been expired for at least
/// two days.
const DROP_OLD_CONSENSUSES: &str = "DELETE FROM Consensuses WHERE valid_until < ?;";
/// Query: Discard every bridge descriptor that is too old, or from the future.  (Both `?` parameters are the current time.)
#[cfg(feature = "bridge-client")]
const DROP_OLD_BRIDGEDESCS: &str = "DELETE FROM BridgeDescs WHERE ? > until OR fetched > ?;";
#[cfg(test)]
pub(crate) mod test {
    #![allow(clippy::unwrap_used)]
    use super::*;
    use crate::storage::EXPIRATION_DEFAULTS;
    use digest::Digest;
    use hex_literal::hex;
    use tempfile::{tempdir, TempDir};
    use time::ext::NumericalDuration;
    use tor_llcrypto::d::Sha3_256;
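    /// Helper: return a new empty SqliteStore in a fresh temporary directory,
    /// along with the TempDir guard that keeps the directory alive.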
    pub(crate) fn new_empty() -> Result<(TempDir, SqliteStore)> {
        let tmp_dir = tempdir().unwrap();
        let sql_path = tmp_dir.path().join("db.sql");
        let conn = rusqlite::Connection::open(sql_path)?;
        let blob_path = tmp_dir.path().join("blobs");
        let blob_dir = fs_mistrust::Mistrust::builder()
            .dangerously_trust_everyone()
            .build()
            .unwrap()
            .verifier()
            .make_secure_dir(blob_path)
            .unwrap();
        let store = SqliteStore::from_conn(conn, blob_dir)?;
        Ok((tmp_dir, store))
    }
    #[test]
    fn init() -> Result<()> {
        let tmp_dir = tempdir().unwrap();
        let blob_dir = fs_mistrust::Mistrust::builder()
            .dangerously_trust_everyone()
            .build()
            .unwrap()
            .verifier()
            .secure_dir(&tmp_dir)
            .unwrap();
        let sql_path = tmp_dir.path().join("db.sql");
        // Initial setup: everything should work.
        {
            let conn = rusqlite::Connection::open(&sql_path)?;
            let _store = SqliteStore::from_conn(conn, blob_dir.clone())?;
        }
        // Second setup: shouldn't need to upgrade.
        {
            let conn = rusqlite::Connection::open(&sql_path)?;
            let _store = SqliteStore::from_conn(conn, blob_dir.clone())?;
        }
        // Third setup: the schema version is newer than ours, but still readable by us,
        // so opening should succeed.
        {
            let conn = rusqlite::Connection::open(&sql_path)?;
            conn.execute_batch("UPDATE TorSchemaMeta SET version = 9002;")?;
            let _store = SqliteStore::from_conn(conn, blob_dir.clone())?;
        }
        // Fourth: this says we can't read it, so we'll get an error.
        {
            let conn = rusqlite::Connection::open(&sql_path)?;
            conn.execute_batch("UPDATE TorSchemaMeta SET readable_by = 9001;")?;
            let val = SqliteStore::from_conn(conn, blob_dir);
            assert!(val.is_err());
        }
        Ok(())
    }
    #[test]
    fn bad_blob_fname() -> Result<()> {
        let (_tmp_dir, store) = new_empty()?;
        assert!(store.blob_dir.join("abcd").is_ok());
        assert!(store.blob_dir.join("abcd..").is_ok());
        assert!(store.blob_dir.join("..abcd..").is_ok());
        assert!(store.blob_dir.join(".abcd").is_ok());
        assert!(store.blob_dir.join("..").is_err());
        assert!(store.blob_dir.join("../abcd").is_err());
        assert!(store.blob_dir.join("/abcd").is_err());
        Ok(())
    }
    #[test]
    fn blobs() -> Result<()> {
        let (_tmp_dir, mut store) = new_empty()?;
        let now = OffsetDateTime::now_utc();
        let one_week = 1.weeks();
        let fname1 = store.save_blob(
            b"Hello world",
            "greeting",
            "sha1",
            &hex!("7b502c3a1f48c8609ae212cdfb639dee39673f5e"),
            now + one_week,
        )?;
        let fname2 = store.save_blob(
            b"Goodbye, dear friends",
            "greeting",
            "sha1",
            &hex!("2149c2a7dbf5be2bb36fb3c5080d0fb14cb3355c"),
            now - one_week,
        )?;
        assert_eq!(
            fname1,
            "greeting_sha1-7b502c3a1f48c8609ae212cdfb639dee39673f5e"
        );
        assert_eq!(
            &std::fs::read(store.blob_dir.join(&fname1)?).unwrap()[..],
            b"Hello world"
        );
        assert_eq!(
            &std::fs::read(store.blob_dir.join(&fname2)?).unwrap()[..],
            b"Goodbye, dear friends"
        );
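        // Both blobs should be listed in the ExtDocs table.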
        let n: u32 = store
            .conn
            .query_row("SELECT COUNT(filename) FROM ExtDocs", [], |row| row.get(0))?;
        assert_eq!(n, 2);
        let blob = store.read_blob(&fname2)?.unwrap();
        assert_eq!(blob.as_str().unwrap(), "Goodbye, dear friends");
        // Now expire: the second file should go away.
        store.expire_all(&EXPIRATION_DEFAULTS)?;
        assert_eq!(
            &std::fs::read(store.blob_dir.join(&fname1)?).unwrap()[..],
            b"Hello world"
        );
        assert!(std::fs::read(store.blob_dir.join(&fname2)?).is_err());
        let n: u32 = store
            .conn
            .query_row("SELECT COUNT(filename) FROM ExtDocs", [], |row| row.get(0))?;
        assert_eq!(n, 1);
        Ok(())
    }
    #[test]
    fn consensus() -> Result<()> {
        use tor_netdoc::doc::netstatus;
        let (_tmp_dir, mut store) = new_empty()?;
        let now = OffsetDateTime::now_utc();
        let one_hour = 1.hours();
        assert_eq!(
            store.latest_consensus_time(ConsensusFlavor::Microdesc)?,
            None
        );
        let cmeta = ConsensusMeta::new(
            netstatus::Lifetime::new(
                now.into(),
                (now + one_hour).into(),
                SystemTime::from(now + one_hour * 2),
            )
            .unwrap(),
            [0xAB; 32],
            [0xBC; 32],
        );
        store.store_consensus(
            &cmeta,
            ConsensusFlavor::Microdesc,
            true,
            "Pretend this is a consensus",
        )?;
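        // The consensus is still pending, so it isn't reported as the latest
        // usable consensus yet.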
        {
            assert_eq!(
                store.latest_consensus_time(ConsensusFlavor::Microdesc)?,
                None
            );
            let consensus = store
                .latest_consensus(ConsensusFlavor::Microdesc, None)?
                .unwrap();
            assert_eq!(consensus.as_str()?, "Pretend this is a consensus");
            let consensus = store.latest_consensus(ConsensusFlavor::Microdesc, Some(false))?;
            assert!(consensus.is_none());
        }
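        // Marking it usable clears the pending flag, so it now shows up everywhere.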
        store.mark_consensus_usable(&cmeta)?;
        {
            assert_eq!(
                store.latest_consensus_time(ConsensusFlavor::Microdesc)?,
                now.into()
            );
            let consensus = store
                .latest_consensus(ConsensusFlavor::Microdesc, None)?
                .unwrap();
            assert_eq!(consensus.as_str()?, "Pretend this is a consensus");
            let consensus = store
                .latest_consensus(ConsensusFlavor::Microdesc, Some(false))?
                .unwrap();
            assert_eq!(consensus.as_str()?, "Pretend this is a consensus");
        }
        {
            let consensus_text = store.consensus_by_meta(&cmeta)?;
            assert_eq!(consensus_text.as_str()?, "Pretend this is a consensus");
            let (is, _cmeta2) = store
                .consensus_by_sha3_digest_of_signed_part(&[0xAB; 32])?
                .unwrap();
            assert_eq!(is.as_str()?, "Pretend this is a consensus");
            let cmeta3 = ConsensusMeta::new(
                netstatus::Lifetime::new(
                    now.into(),
                    (now + one_hour).into(),
                    SystemTime::from(now + one_hour * 2),
                )
                .unwrap(),
                [0x99; 32],
                [0x99; 32],
            );
            assert!(store.consensus_by_meta(&cmeta3).is_err());
            assert!(store
                .consensus_by_sha3_digest_of_signed_part(&[0x99; 32])?
                .is_none());
        }
        {
            assert!(store
                .consensus_by_sha3_digest_of_signed_part(&[0xAB; 32])?
                .is_some());
            store.delete_consensus(&cmeta)?;
            assert!(store
                .consensus_by_sha3_digest_of_signed_part(&[0xAB; 32])?
                .is_none());
        }
        Ok(())
    }
    #[test]
    fn authcerts() -> Result<()> {
        let (_tmp_dir, mut store) = new_empty()?;
        let now = OffsetDateTime::now_utc();
        let one_hour = 1.hours();
        let keyids = AuthCertKeyIds {
            id_fingerprint: [3; 20].into(),
            sk_fingerprint: [4; 20].into(),
        };
        let keyids2 = AuthCertKeyIds {
            id_fingerprint: [4; 20].into(),
            sk_fingerprint: [3; 20].into(),
        };
        let m1 = AuthCertMeta::new(keyids, now.into(), SystemTime::from(now + one_hour * 24));
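        // Only the cert for `keyids` is stored; looking up `keyids2` should find nothing.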
        store.store_authcerts(&[(m1, "Pretend this is a cert")])?;
        let certs = store.authcerts(&[keyids, keyids2])?;
        assert_eq!(certs.len(), 1);
        assert_eq!(certs.get(&keyids).unwrap(), "Pretend this is a cert");
        Ok(())
    }
    #[test]
    fn microdescs() -> Result<()> {
        let (_tmp_dir, mut store) = new_empty()?;
        let now = OffsetDateTime::now_utc();
        let one_day = 1.days();
        let d1 = [5_u8; 32];
        let d2 = [7; 32];
        let d3 = [42; 32];
        let d4 = [99; 32];
        let long_ago: OffsetDateTime = now - one_day * 100;
        store.store_microdescs(
            &[
                ("Fake micro 1", &d1),
                ("Fake micro 2", &d2),
                ("Fake micro 3", &d3),
            ],
            long_ago.into(),
        )?;
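        // Refresh the last-listed time for d2 only; the others keep the old timestamp.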
        store.update_microdescs_listed(&[d2], now.into())?;
        let mds = store.microdescs(&[d2, d3, d4])?;
        assert_eq!(mds.len(), 2);
        assert_eq!(mds.get(&d1), None);
        assert_eq!(mds.get(&d2).unwrap(), "Fake micro 2");
        assert_eq!(mds.get(&d3).unwrap(), "Fake micro 3");
        assert_eq!(mds.get(&d4), None);
        // Now we'll expire.  That should drop everything but d2.
        store.expire_all(&EXPIRATION_DEFAULTS)?;
        let mds = store.microdescs(&[d2, d3, d4])?;
        assert_eq!(mds.len(), 1);
        assert_eq!(mds.get(&d2).unwrap(), "Fake micro 2");
        Ok(())
    }
    #[test]
    #[cfg(feature = "routerdesc")]
    fn routerdescs() -> Result<()> {
        let (_tmp_dir, mut store) = new_empty()?;
        let now = OffsetDateTime::now_utc();
        let one_day = 1.days();
        let long_ago: OffsetDateTime = now - one_day * 100;
        let recently = now - one_day;
        let d1 = [5_u8; 20];
        let d2 = [7; 20];
        let d3 = [42; 20];
        let d4 = [99; 20];
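        // Only d2 was published recently; d1 and d3 are old enough to be dropped when we expire below.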
        store.store_routerdescs(&[
            ("Fake routerdesc 1", long_ago.into(), &d1),
            ("Fake routerdesc 2", recently.into(), &d2),
            ("Fake routerdesc 3", long_ago.into(), &d3),
        ])?;
        let rds = store.routerdescs(&[d2, d3, d4])?;
        assert_eq!(rds.len(), 2);
        assert_eq!(rds.get(&d1), None);
        assert_eq!(rds.get(&d2).unwrap(), "Fake routerdesc 2");
        assert_eq!(rds.get(&d3).unwrap(), "Fake routerdesc 3");
        assert_eq!(rds.get(&d4), None);
        // Now we'll expire.  That should drop everything but d2.
        store.expire_all(&EXPIRATION_DEFAULTS)?;
        let rds = store.routerdescs(&[d2, d3, d4])?;
        assert_eq!(rds.len(), 1);
        assert_eq!(rds.get(&d2).unwrap(), "Fake routerdesc 2");
        Ok(())
    }
    #[test]
    fn from_path_rw() -> Result<()> {
        let tmp = tempdir().unwrap();
        let mistrust = fs_mistrust::Mistrust::new_dangerously_trust_everyone();
        // Nothing there: can't open read-only
        let r = SqliteStore::from_path_and_mistrust(tmp.path(), &mistrust, true);
        assert!(r.is_err());
        assert!(!tmp.path().join("dir_blobs").try_exists().unwrap());
        // Opening it read-write will create the files
        {
            let mut store = SqliteStore::from_path_and_mistrust(tmp.path(), &mistrust, false)?;
            assert!(tmp.path().join("dir_blobs").is_dir());
            assert!(store.lockfile.is_some());
            assert!(!store.is_readonly());
            assert!(store.upgrade_to_readwrite()?); // no-op.
        }
        // At this point, we can successfully make a read-only connection.
        {
            let mut store2 = SqliteStore::from_path_and_mistrust(tmp.path(), &mistrust, true)?;
            assert!(store2.is_readonly());
            // Nobody else is locking this, so we can upgrade.
            assert!(store2.upgrade_to_readwrite()?); // This time it actually upgrades.
            assert!(!store2.is_readonly());
        }
        Ok(())
    }
    #[test]
    fn orphaned_blobs() -> Result<()> {
        let (_tmp_dir, mut store) = new_empty()?;
        /*
        for ent in store.blob_dir.read_directory(".")?.flatten() {
            println!("{:?}", ent);
        }
        */
        assert_eq!(store.blob_dir.read_directory(".")?.count(), 0);
        let now = OffsetDateTime::now_utc();
        let one_week = 1.weeks();
        let _fname_good = store.save_blob(
            b"Goodbye, dear friends",
            "greeting",
            "sha1",
            &hex!("2149c2a7dbf5be2bb36fb3c5080d0fb14cb3355c"),
            now + one_week,
        )?;
        assert_eq!(store.blob_dir.read_directory(".")?.count(), 1);
        // Now, create two orphaned blobs: one with a recent timestamp, and one with an older
        // timestamp.
        store
            .blob_dir
            .write_and_replace("fairly_new", b"new contents will stay")?;
        store
            .blob_dir
            .write_and_replace("fairly_old", b"old contents will be removed")?;
        filetime::set_file_mtime(
            store.blob_dir.join("fairly_old")?,
            SystemTime::from(now - one_week).into(),
        )
        .expect("Can't adjust mtime");
        assert_eq!(store.blob_dir.read_directory(".")?.count(), 3);
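        // Only the orphan older than the cutoff should be removed; the recent one survives.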
        store.remove_unreferenced_blobs(now, &EXPIRATION_DEFAULTS)?;
        assert_eq!(store.blob_dir.read_directory(".")?.count(), 2);
        Ok(())
    }
    #[test]
    fn unreferenced_consensus_blob() -> Result<()> {
        let (_tmp_dir, mut store) = new_empty()?;
        let now = OffsetDateTime::now_utc();
        let one_week = 1.weeks();
        // Make a blob that claims to be a consensus, and which has not yet expired, but which is
        // not listed in the consensus table.  It should get removed.
        let fname = store.save_blob(
            b"pretend this is a consensus",
            "con_fake",
            "sha1",
            &hex!("803e5a45eea7766a62a735e051a25a50ffb9b1cf"),
            now + one_week,
        )?;
        assert_eq!(store.blob_dir.read_directory(".")?.count(), 1);
        assert_eq!(
            &std::fs::read(store.blob_dir.join(&fname)?).unwrap()[..],
            b"pretend this is a consensus"
        );
        let n: u32 = store
            .conn
            .query_row("SELECT COUNT(filename) FROM ExtDocs", [], |row| row.get(0))?;
        assert_eq!(n, 1);
        store.expire_all(&EXPIRATION_DEFAULTS)?;
        assert_eq!(store.blob_dir.read_directory(".")?.count(), 0);
        let n: u32 = store
            .conn
            .query_row("SELECT COUNT(filename) FROM ExtDocs", [], |row| row.get(0))?;
        assert_eq!(n, 0);
        Ok(())
    }
    #[test]
    fn vanished_blob_cleanup() -> Result<()> {
        let (_tmp_dir, mut store) = new_empty()?;
        let now = OffsetDateTime::now_utc();
        let one_week = 1.weeks();
        // Make a few blobs.
        let mut fnames = vec![];
        for idx in 0..8 {
            let content = format!("Example {idx}");
            let digest = Sha3_256::digest(content.as_bytes());
            let fname = store.save_blob(
                content.as_bytes(),
                "blob",
                "sha3-256",
                digest.as_slice(),
                now + one_week,
            )?;
            fnames.push(fname);
        }
        // Delete the odd-numbered blobs.
        store.blob_dir.remove_file(&fnames[1])?;
        store.blob_dir.remove_file(&fnames[3])?;
        store.blob_dir.remove_file(&fnames[5])?;
        store.blob_dir.remove_file(&fnames[7])?;
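        // Scan the blob directory and delete ExtDocs rows whose files no longer exist.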
        let n_removed = {
            let tx = store.conn.transaction()?;
            let n = SqliteStore::remove_entries_for_vanished_blobs(&store.blob_dir, &tx)?;
            tx.commit()?;
            n
        };
        assert_eq!(n_removed, 4);
        // Make sure that it was the _odd-numbered_ ones that got deleted from the DB.
        let (n_1,): (u32,) =
            store
                .conn
                .query_row(COUNT_EXTDOC_BY_PATH, params![&fnames[1]], |row| {
                    row.try_into()
                })?;
        let (n_2,): (u32,) =
            store
                .conn
                .query_row(COUNT_EXTDOC_BY_PATH, params![&fnames[2]], |row| {
                    row.try_into()
                })?;
        assert_eq!(n_1, 0);
        assert_eq!(n_2, 1);
        Ok(())
    }
    #[test]
    fn protocol_statuses() -> Result<()> {
        let (_tmp_dir, mut store) = new_empty()?;
        let now = SystemTime::now();
        let hour = 1.hours();
        let valid_after = now;
        let protocols = serde_json::from_str(
            r#"{
            "client":{
                "required":"Link=5 LinkAuth=3",
                "recommended":"Link=1-5 LinkAuth=2-5"
            },
            "relay":{
                "required":"Wombat=20-22 Knish=25-27",
                "recommended":"Wombat=20-30 Knish=20-30"
            }
            }"#,
        )
        .unwrap();
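        // Nothing has been cached yet.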
        let v = store.cached_protocol_recommendations()?;
        assert!(v.is_none());
        store.update_protocol_recommendations(valid_after, &protocols)?;
        let v = store.cached_protocol_recommendations()?.unwrap();
        assert_eq!(v.0, now);
        assert_eq!(
            serde_json::to_string(&protocols).unwrap(),
            serde_json::to_string(&v.1).unwrap()
        );
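        // Storing a newer set of recommendations should replace the cached entry.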
        let protocols2 = serde_json::from_str(
            r#"{
            "client":{
                "required":"Link=5 ",
                "recommended":"Link=1-5"
            },
            "relay":{
                "required":"Wombat=20",
                "recommended":"Cons=6"
            }
            }"#,
        )
        .unwrap();
        let valid_after_2 = now + hour;
        store.update_protocol_recommendations(valid_after_2, &protocols2)?;
        let v = store.cached_protocol_recommendations()?.unwrap();
        assert_eq!(v.0, now + hour);
        assert_eq!(
            serde_json::to_string(&protocols2).unwrap(),
            serde_json::to_string(&v.1).unwrap()
        );
        Ok(())
    }
}