tor_dirmgr/storage/sqlite.rs

1//! Net document storage backed by sqlite3.
2//!
3//! We store most objects in sqlite tables, except for very large ones,
4//! which we store as "blob" files in a separate directory.
5
6use super::ExpirationConfig;
7use crate::docmeta::{AuthCertMeta, ConsensusMeta};
8use crate::err::ReadOnlyStorageError;
9use crate::storage::{InputString, Store};
10use crate::{Error, Result};
11
12use fs_mistrust::CheckedDir;
13use tor_basic_utils::PathExt as _;
14use tor_error::{into_internal, warn_report};
15use tor_netdoc::doc::authcert::AuthCertKeyIds;
16use tor_netdoc::doc::microdesc::MdDigest;
17use tor_netdoc::doc::netstatus::{ConsensusFlavor, Lifetime};
18#[cfg(feature = "routerdesc")]
19use tor_netdoc::doc::routerdesc::RdDigest;
20
21#[cfg(feature = "bridge-client")]
22pub(crate) use {crate::storage::CachedBridgeDescriptor, tor_guardmgr::bridge::BridgeConfig};
23
24use std::collections::{HashMap, HashSet};
25use std::fs::OpenOptions;
26use std::path::{Path, PathBuf};
27use std::result::Result as StdResult;
28use std::sync::Arc;
29use std::time::SystemTime;
30
31use rusqlite::{params, OpenFlags, OptionalExtension, Transaction};
32use time::OffsetDateTime;
33use tracing::{trace, warn};
34
35/// Local directory cache using a Sqlite3 connection.
36pub(crate) struct SqliteStore {
37    /// Connection to the sqlite3 database.
38    conn: rusqlite::Connection,
39    /// Location for the sqlite3 database; used to reopen it.
40    sql_path: Option<PathBuf>,
41    /// Location to store blob files.
42    blob_dir: CheckedDir,
43    /// Lockfile to prevent concurrent write attempts from different
44    /// processes.
45    ///
46    /// If this is None we aren't using a lockfile.  Watch out!
47    ///
48    /// (sqlite supports that with connection locking, but we want to
49    /// be a little more coarse-grained here)
50    lockfile: Option<fslock::LockFile>,
51}
52
53/// # Some notes on blob consistency, and the lack thereof.
54///
55/// We store large documents (currently, consensuses) in separate files,
56/// called "blobs",
57/// outside of the sqlite database.
58/// We do this for performance reasons: for large objects,
59/// mmap is far more efficient than sqlite, in both RAM and CPU.
60///
61/// In the sqlite database, we keep track of our blobs
62/// using the ExtDocs table.
63/// This scheme makes it possible for the blobs and the table
64/// to get out of sync.
65///
66/// In summary:
67///   - _Vanished_ blobs (ones present only in ExtDocs) are possible;
68///     we try to tolerate them.
69///   - _Orphaned_ blobs (ones present only on the disk) are possible;
70///     we try to tolerate them.
71///   - _Corrupted_ blobs (ones with the wrong contents) are possible
72///     but (we hope) unlikely;
73///     we do not currently try to tolerate them.
74///
75/// In more detail:
76///
77/// Here are the practices we use when _writing_ blobs:
78///
79/// - We always create a blob before updating the ExtDocs table,
80///   and remove an entry from the ExtDocs before deleting the blob.
81/// - If we decide to roll back the transaction that adds the row to ExtDocs,
82///   we delete the blob after doing so.
83/// - We use [`CheckedDir::write_and_replace`] to store blobs,
84///   so a half-formed blob shouldn't be common.
85///   (We assume that "close" and "rename" are serialized by the OS,
86///   so that _if_ the rename happens, the file is completely written.)
87/// - Blob filenames include a digest of the file contents,
88///   so collisions are unlikely.
89///
90/// Here are the practices we use when _deleting_ blobs:
91/// - First, we drop the row from the ExtDocs table.
92///   Only then do we delete the file.
93///
94/// These practices can result in _orphaned_ blobs
95/// (ones with no row in the ExtDocs table),
96/// or in _half-written_ blob files with tempfile names
97/// (which also have no row in the ExtDocs table).
98/// This happens if we crash at the wrong moment.
99/// Such blobs can be safely removed;
100/// we do so in [`SqliteStore::remove_unreferenced_blobs`].
101///
102/// Despite our efforts, _vanished_ blobs
103/// (entries in the ExtDocs table with no corresponding file)
104/// are also possible.  They could happen for these reasons:
105/// - The filesystem might not serialize or sync things in a way that's
106///   consistent with the DB.
107/// - An automatic process might remove random cache files.
108/// - The user might run around deleting things to free space.
109///
110/// We try to tolerate vanished blobs.
111///
112/// _Corrupted_ blobs are also possible.  They can happen on FS corruption,
113/// or on somebody messing around with the cache directory manually.
114/// We do not attempt to tolerate corrupted blobs.
115///
116/// ## On trade-offs
117///
118/// TODO: The practices described above are more likely
119/// to create _orphaned_ blobs than _vanished_ blobs.
120/// We initially made this trade-off decision on the mistaken theory
121/// that we could avoid vanished blobs entirely.
122/// We _may_ want to revisit this choice,
123/// on the rationale that we can respond to vanished blobs as soon as we notice they're gone,
124/// whereas we can only handle orphaned blobs with a periodic cleanup.
125/// On the other hand, since we need to handle both cases,
126/// it may not matter very much in practice.
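///
/// ## An illustrative sketch
///
/// A simplified sketch of the write ordering described above (the real logic
/// lives in `SqliteStore::save_blob_internal` and `blob_handle::SavedBlobHandle`;
/// error handling is elided):
///
/// ```ignore
/// // 1. Write the blob file first, atomically (write to a temp file, then rename).
/// blob_dir.write_and_replace(&fname, contents)?;
/// // 2. Only then record it in the ExtDocs table, inside a transaction.
/// let tx = conn.unchecked_transaction()?;
/// tx.execute(INSERT_EXTDOC, params![digeststr, expires, doctype, fname])?;
/// // 3a. To keep the blob, commit the transaction...
/// tx.commit()?;
/// // 3b. ...or, to discard it, roll back (drop the transaction) first,
/// //     and delete the blob file afterwards.
/// ```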
127#[allow(unused)]
128mod blob_consistency {}
129
130/// Specific error returned when a blob will not be read.
131///
132/// This error is an internal type: it's never returned to the user.
133#[derive(Debug)]
134enum AbsentBlob {
135    /// We did not find a blob file on the disk.
136    VanishedFile,
137    /// We did not even find a blob to read in ExtDocs.
138    NothingToRead,
139}
140
141impl SqliteStore {
142    /// Construct or open a new SqliteStore at some location on disk.
143    /// The provided location must be a directory, or a possible
144    /// location for a directory: the directory will be created if
145    /// necessary.
146    ///
147    /// If readonly is true, the result will be a read-only store.
148    /// Otherwise, when readonly is false, the result may be
149    /// read-only or read-write, depending on whether we can acquire
150    /// the lock.
151    ///
152    /// # Limitations:
153    ///
154    /// The file locking that we use to ensure that only one dirmgr is
155    /// writing to a given storage directory at a time is currently
156    /// _per process_. Therefore, you might get unexpected results if
157    /// two SqliteStores are created in the same process with the
158    /// same path.
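    ///
    /// # Example
    ///
    /// A minimal sketch of how a caller might open a store (the `cache_dir`
    /// path is hypothetical, and error handling is elided):
    ///
    /// ```ignore
    /// let mistrust = fs_mistrust::Mistrust::new();
    /// let store = SqliteStore::from_path_and_mistrust(cache_dir, &mistrust, false)?;
    /// // If another process already holds the lock, the store silently becomes read-only:
    /// let can_write = !store.is_readonly();
    /// ```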
159    pub(crate) fn from_path_and_mistrust<P: AsRef<Path>>(
160        path: P,
161        mistrust: &fs_mistrust::Mistrust,
162        mut readonly: bool,
163    ) -> Result<Self> {
164        let path = path.as_ref();
165        let sqlpath = path.join("dir.sqlite3");
166        let blobpath = path.join("dir_blobs/");
167        let lockpath = path.join("dir.lock");
168
169        let verifier = mistrust.verifier().permit_readable().check_content();
170
171        let blob_dir = if readonly {
172            verifier.secure_dir(blobpath)?
173        } else {
174            verifier.make_secure_dir(blobpath)?
175        };
176
177        // Check permissions on the sqlite and lock files; don't require them to
178        // exist.
179        for p in [&lockpath, &sqlpath] {
180            match mistrust
181                .verifier()
182                .permit_readable()
183                .require_file()
184                .check(p)
185            {
186                Ok(()) | Err(fs_mistrust::Error::NotFound(_)) => {}
187                Err(e) => return Err(e.into()),
188            }
189        }
190
191        let mut lockfile = fslock::LockFile::open(&lockpath).map_err(Error::from_lockfile)?;
192        if !readonly && !lockfile.try_lock().map_err(Error::from_lockfile)? {
193            readonly = true; // we couldn't get the lock!
194        };
195        let flags = if readonly {
196            OpenFlags::SQLITE_OPEN_READ_ONLY
197        } else {
198            OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE
199        };
200        let conn = rusqlite::Connection::open_with_flags(&sqlpath, flags)?;
201        let mut store = SqliteStore::from_conn_internal(conn, blob_dir, readonly)?;
202        store.sql_path = Some(sqlpath);
203        store.lockfile = Some(lockfile);
204        Ok(store)
205    }
206
207    /// Construct a new SqliteStore from a database connection and a location
208    /// for blob files.
209    ///
210    /// Used for testing with a memory-backed database.
211    ///
212    /// Note: `blob_dir` must not be used for anything other than storing the blobs associated with
213    /// this database, since we will freely remove unreferenced files from this directory.
214    #[cfg(test)]
215    fn from_conn(conn: rusqlite::Connection, blob_dir: CheckedDir) -> Result<Self> {
216        Self::from_conn_internal(conn, blob_dir, false)
217    }
218
219    /// Construct a new SqliteStore from a database connection and a location
220    /// for blob files.
221    ///
222    /// The `readonly` argument specifies whether the database connection should be read-only.
223    fn from_conn_internal(
224        conn: rusqlite::Connection,
225        blob_dir: CheckedDir,
226        readonly: bool,
227    ) -> Result<Self> {
228        // sqlite (as of Jun 2024) does not enforce foreign keys automatically unless you set this
229        // pragma on the connection.
230        conn.pragma_update(None, "foreign_keys", "ON")?;
231
232        let mut result = SqliteStore {
233            conn,
234            blob_dir,
235            lockfile: None,
236            sql_path: None,
237        };
238
239        result.check_schema(readonly)?;
240
241        Ok(result)
242    }
243
244    /// Check whether this database has a schema format we can read, and
245    /// install or upgrade the schema if necessary.
246    fn check_schema(&mut self, readonly: bool) -> Result<()> {
247        let tx = self.conn.transaction()?;
248        let db_n_tables: u32 = tx.query_row(
249            "SELECT COUNT(name) FROM sqlite_master
250             WHERE type='table'
251             AND name NOT LIKE 'sqlite_%'",
252            [],
253            |row| row.get(0),
254        )?;
255        let db_exists = db_n_tables > 0;
256
257        // Update the schema from current_vsn to the latest (does not commit)
258        let update_schema = |tx: &rusqlite::Transaction, current_vsn| {
259            for (from_vsn, update) in UPDATE_SCHEMA.iter().enumerate() {
260                let from_vsn = u32::try_from(from_vsn).expect("schema version >2^32");
261                let new_vsn = from_vsn + 1;
262                if current_vsn < new_vsn {
263                    tx.execute_batch(update)?;
264                    tx.execute(UPDATE_SCHEMA_VERSION, params![new_vsn, new_vsn])?;
265                }
266            }
267            Ok::<_, Error>(())
268        };
269
270        if !db_exists {
271            if !readonly {
272                tx.execute_batch(INSTALL_V0_SCHEMA)?;
273                update_schema(&tx, 0)?;
274                tx.commit()?;
275            } else {
276                // The other process should have created the database!
277                return Err(Error::ReadOnlyStorage(ReadOnlyStorageError::NoDatabase));
278            }
279            return Ok(());
280        }
281
282        let (version, readable_by): (u32, u32) = tx.query_row(
283            "SELECT version, readable_by FROM TorSchemaMeta
284             WHERE name = 'TorDirStorage'",
285            [],
286            |row| Ok((row.get(0)?, row.get(1)?)),
287        )?;
288
289        if version < SCHEMA_VERSION {
290            if !readonly {
291                update_schema(&tx, version)?;
292                tx.commit()?;
293            } else {
294                return Err(Error::ReadOnlyStorage(
295                    ReadOnlyStorageError::IncompatibleSchema {
296                        schema: version,
297                        supported: SCHEMA_VERSION,
298                    },
299                ));
300            }
301
302            return Ok(());
303        } else if readable_by > SCHEMA_VERSION {
304            return Err(Error::UnrecognizedSchema {
305                schema: readable_by,
306                supported: SCHEMA_VERSION,
307            });
308        }
309
310        // rolls back the transaction, but nothing was done.
311        Ok(())
312    }
313
314    /// Read a blob from disk, mapping it if possible.
315    ///
316    /// Return `Ok(Err(.))` if the file for the blob was not found on disk;
317    /// returns an error in other cases.
318    ///
319    /// (See [`blob_consistency`] for information on why the blob might be absent.)
320    fn read_blob(&self, path: &str) -> Result<StdResult<InputString, AbsentBlob>> {
321        let file = match self.blob_dir.open(path, OpenOptions::new().read(true)) {
322            Ok(file) => file,
323            Err(fs_mistrust::Error::NotFound(_)) => {
324                warn!(
325                    "{:?} was listed in the database, but its corresponding file had been deleted",
326                    path
327                );
328                return Ok(Err(AbsentBlob::VanishedFile));
329            }
330            Err(e) => return Err(e.into()),
331        };
332
333        InputString::load(file)
334            .map_err(|err| Error::CacheFile {
335                action: "loading",
336                fname: PathBuf::from(path),
337                error: Arc::new(err),
338            })
339            .map(Ok)
340    }
341
342    /// Write a file to disk as a blob, and record it in the ExtDocs table.
343    ///
344    /// Return a SavedBlobHandle that describes where the blob is, and which
345    /// can be used either to commit the blob or delete it.
346    ///
347    /// See [`blob_consistency`] for more information on guarantees.
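    ///
    /// For illustration (matching the formatting code below): with a `doctype` of
    /// `"con_<flavor>"` and a `digest_type` of `"sha3-256"`, the blob file is named
    /// `con_<flavor>_sha3-256-<hex digest>`, and its ExtDocs digest string is
    /// `sha3-256-<hex digest>`.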
348    fn save_blob_internal(
349        &mut self,
350        contents: &[u8],
351        doctype: &str,
352        digest_type: &str,
353        digest: &[u8],
354        expires: OffsetDateTime,
355    ) -> Result<blob_handle::SavedBlobHandle<'_>> {
356        let digest = hex::encode(digest);
357        let digeststr = format!("{}-{}", digest_type, digest);
358        let fname = format!("{}_{}", doctype, digeststr);
359
360        let full_path = self.blob_dir.join(&fname)?;
361        let unlinker = blob_handle::Unlinker::new(&full_path);
362        self.blob_dir
363            .write_and_replace(&fname, contents)
364            .map_err(|e| match e {
365                fs_mistrust::Error::Io { err, .. } => Error::CacheFile {
366                    action: "saving",
367                    fname: full_path,
368                    error: err,
369                },
370                err => err.into(),
371            })?;
372
373        let tx = self.conn.unchecked_transaction()?;
374        tx.execute(INSERT_EXTDOC, params![digeststr, expires, doctype, fname])?;
375
376        Ok(blob_handle::SavedBlobHandle::new(
377            tx, fname, digeststr, unlinker,
378        ))
379    }
380
381    /// As `latest_consensus`, but do not retry.
382    fn latest_consensus_internal(
383        &self,
384        flavor: ConsensusFlavor,
385        pending: Option<bool>,
386    ) -> Result<StdResult<InputString, AbsentBlob>> {
387        trace!(?flavor, ?pending, "Loading latest consensus from cache");
388        let rv: Option<(OffsetDateTime, OffsetDateTime, String)> = match pending {
389            None => self
390                .conn
391                .query_row(FIND_CONSENSUS, params![flavor.name()], |row| row.try_into())
392                .optional()?,
393            Some(pending_val) => self
394                .conn
395                .query_row(
396                    FIND_CONSENSUS_P,
397                    params![pending_val, flavor.name()],
398                    |row| row.try_into(),
399                )
400                .optional()?,
401        };
402
403        if let Some((_va, _vu, filename)) = rv {
404            // TODO blobs: If the cache is inconsistent (because this blob is _vanished_), and the cache has not yet
405            // been cleaned, this may fail to find the latest consensus that we actually have.
406            self.read_blob(&filename)
407        } else {
408            Ok(Err(AbsentBlob::NothingToRead))
409        }
410    }
411
412    /// Save a blob to disk and commit it.
413    #[cfg(test)]
414    fn save_blob(
415        &mut self,
416        contents: &[u8],
417        doctype: &str,
418        digest_type: &str,
419        digest: &[u8],
420        expires: OffsetDateTime,
421    ) -> Result<String> {
422        let h = self.save_blob_internal(contents, doctype, digest_type, digest, expires)?;
423        let fname = h.fname().to_string();
424        h.commit()?;
425        Ok(fname)
426    }
427
428    /// Return the valid-after time for the latest non-pending consensus.
429    #[cfg(test)]
430    // We should revise the tests to use latest_consensus_meta instead.
431    fn latest_consensus_time(&self, flavor: ConsensusFlavor) -> Result<Option<OffsetDateTime>> {
432        Ok(self
433            .latest_consensus_meta(flavor)?
434            .map(|m| m.lifetime().valid_after().into()))
435    }
436
437    /// Remove the blob with name `fname`, but do not give an error on failure.
438    ///
439    /// See [`blob_consistency`]: we should call this only having first ensured
440    /// that the blob is removed from the ExtDocs table.
441    fn remove_blob_or_warn<P: AsRef<Path>>(&self, fname: P) {
442        let fname = fname.as_ref();
443        if let Err(e) = self.blob_dir.remove_file(fname) {
444            warn_report!(e, "Unable to remove {}", fname.display_lossy());
445        }
446    }
447
448    /// Delete any blob files that are old enough, and not mentioned in the ExtDocs table.
449    ///
450    /// There shouldn't typically be any, but we don't want to let our cache grow infinitely
451    /// if we have a bug.
452    fn remove_unreferenced_blobs(
453        &self,
454        now: OffsetDateTime,
455        expiration: &ExpirationConfig,
456    ) -> Result<()> {
457        // Now, look for any unreferenced blobs that are a bit old.
458        for ent in self.blob_dir.read_directory(".")?.flatten() {
459            let md_error = |io_error| Error::CacheFile {
460                action: "getting metadata",
461                fname: ent.file_name().into(),
462                error: Arc::new(io_error),
463            };
464            if ent
465                .metadata()
466                .map_err(md_error)?
467                .modified()
468                .map_err(md_error)?
469                + expiration.consensuses
470                >= now
471            {
472                // this file is sufficiently recent that we should not remove it, just to be cautious.
473                continue;
474            }
475            let filename = match ent.file_name().into_string() {
476                Ok(s) => s,
477                Err(os_str) => {
478                    // This filename wasn't utf-8.  We will never create one of these.
479                    warn!(
480                        "Removing bizarre file '{}' from blob store.",
481                        os_str.to_string_lossy()
482                    );
483                    self.remove_blob_or_warn(ent.file_name());
484                    continue;
485                }
486            };
487            let found: (u32,) =
488                self.conn
489                    .query_row(COUNT_EXTDOC_BY_PATH, params![&filename], |row| {
490                        row.try_into()
491                    })?;
492            if found == (0,) {
493                warn!("Removing unreferenced file '{}' from blob store", &filename);
494                self.remove_blob_or_warn(ent.file_name());
495            }
496        }
497
498        Ok(())
499    }
500
501    /// Remove any entry in the ExtDocs table for which a blob file is vanished.
502    ///
503    /// This method is `O(n)` in the size of the ExtDocs table and the size of the directory.
504    /// It doesn't take self, to avoid problems with the borrow checker.
505    fn remove_entries_for_vanished_blobs<'a>(
506        blob_dir: &CheckedDir,
507        tx: &Transaction<'a>,
508    ) -> Result<usize> {
509        let in_directory: HashSet<PathBuf> = blob_dir
510            .read_directory(".")?
511            .flatten()
512            .map(|dir_entry| PathBuf::from(dir_entry.file_name()))
513            .collect();
514        let in_db: Vec<String> = tx
515            .prepare(FIND_ALL_EXTDOC_FILENAMES)?
516            .query_map([], |row| row.get::<_, String>(0))?
517            .collect::<StdResult<Vec<String>, _>>()?;
518
519        let mut n_removed = 0;
520        for fname in in_db {
521            if in_directory.contains(Path::new(&fname)) {
522                // The blob is present; great!
523                continue;
524            }
525
526            n_removed += tx.execute(DELETE_EXTDOC_BY_FILENAME, [fname])?;
527        }
528
529        Ok(n_removed)
530    }
531}
532
533impl Store for SqliteStore {
534    fn is_readonly(&self) -> bool {
535        match &self.lockfile {
536            Some(f) => !f.owns_lock(),
537            None => false,
538        }
539    }
540    fn upgrade_to_readwrite(&mut self) -> Result<bool> {
541        if self.is_readonly() && self.sql_path.is_some() {
542            let lf = self
543                .lockfile
544                .as_mut()
545                .expect("No lockfile open; cannot upgrade to read-write storage");
546            if !lf.try_lock().map_err(Error::from_lockfile)? {
547                // Somebody else has the lock.
548                return Ok(false);
549            }
550            // Unwrap should be safe due to parent `.is_some()` check
551            #[allow(clippy::unwrap_used)]
552            match rusqlite::Connection::open(self.sql_path.as_ref().unwrap()) {
553                Ok(conn) => {
554                    self.conn = conn;
555                }
556                Err(e) => {
557                    if let Err(e2) = lf.unlock() {
558                        warn_report!(
559                            e2,
560                            "Unable to release lock file while upgrading DB to read/write"
561                        );
562                    }
563                    return Err(e.into());
564                }
565            }
566        }
567        Ok(true)
568    }
569    fn expire_all(&mut self, expiration: &ExpirationConfig) -> Result<()> {
570        let tx = self.conn.transaction()?;
571        // This works around a false positive; see
572        //   https://github.com/rust-lang/rust-clippy/issues/8114
573        #[allow(clippy::let_and_return)]
574        let expired_blobs: Vec<String> = {
575            let mut stmt = tx.prepare(FIND_EXPIRED_EXTDOCS)?;
576            let names: Vec<String> = stmt
577                .query_map([], |row| row.get::<_, String>(0))?
578                .collect::<StdResult<Vec<String>, _>>()?;
579            names
580        };
581
582        let now = OffsetDateTime::now_utc();
583        tx.execute(DROP_OLD_EXTDOCS, [])?;
584
585        // In theory bad system clocks might generate table rows with times far in the future.
586        // However, for the data cached here, which comes from the network consensus,
587        // we rely on the fact that no consensus from the future exists, so this can't happen.
588        tx.execute(DROP_OLD_MICRODESCS, [now - expiration.microdescs])?;
589        tx.execute(DROP_OLD_AUTHCERTS, [now - expiration.authcerts])?;
590        tx.execute(DROP_OLD_CONSENSUSES, [now - expiration.consensuses])?;
591        tx.execute(DROP_OLD_ROUTERDESCS, [now - expiration.router_descs])?;
592
593        // Bridge descriptors come from bridges and bridges might send crazy times,
594        // so we need to discard any that look like they are from the future,
595        // since otherwise wrong far-future timestamps might live in our DB indefinitely.
596        #[cfg(feature = "bridge-client")]
597        tx.execute(DROP_OLD_BRIDGEDESCS, [now, now])?;
598
599        // Find all consensus blobs that are no longer referenced,
600        // and delete their entries from extdocs.
601        let remove_consensus_blobs = {
602            // TODO: This query can be O(n); but that won't matter for clients.
603            // For relays, we may want to add an index to speed it up, if we use this code there too.
604            let mut stmt = tx.prepare(FIND_UNREFERENCED_CONSENSUS_EXTDOCS)?;
605            let filenames: Vec<String> = stmt
606                .query_map([], |row| row.get::<_, String>(0))?
607                .collect::<StdResult<Vec<String>, _>>()?;
608            drop(stmt);
609            let mut stmt = tx.prepare(DELETE_EXTDOC_BY_FILENAME)?;
610            for fname in filenames.iter() {
611                stmt.execute([fname])?;
612            }
613            filenames
614        };
615
616        tx.commit()?;
617        // Now that the transaction has been committed, these blobs are
618        // unreferenced in the ExtDocs table, and we can remove them from disk.
619        let mut remove_blob_files: HashSet<_> = expired_blobs.iter().collect();
620        remove_blob_files.extend(remove_consensus_blobs.iter());
621
622        for name in remove_blob_files {
623            let fname = self.blob_dir.join(name);
624            if let Ok(fname) = fname {
625                if let Err(e) = std::fs::remove_file(&fname) {
626                    warn_report!(
627                        e,
628                        "Couldn't remove orphaned blob file {}",
629                        fname.display_lossy()
630                    );
631                }
632            }
633        }
634
635        self.remove_unreferenced_blobs(now, expiration)?;
636
637        Ok(())
638    }
639
640    // Note: We cannot, and do not, call this function when a transaction already exists.
641    fn latest_consensus(
642        &self,
643        flavor: ConsensusFlavor,
644        pending: Option<bool>,
645    ) -> Result<Option<InputString>> {
646        match self.latest_consensus_internal(flavor, pending)? {
647            Ok(s) => return Ok(Some(s)),
648            Err(AbsentBlob::NothingToRead) => return Ok(None),
649            Err(AbsentBlob::VanishedFile) => {
650                // If we get here, the file was vanished.  Clean up the DB and try again.
651            }
652        }
653
654        // We use unchecked_transaction() here because this API takes a non-mutable `SqliteStore`.
655        // `unchecked_transaction()` will give an error if it is used
656        // when a transaction already exists.
657        // That's fine: We don't call this function from inside this module,
658        // when a transaction might exist,
659        // and we can't call multiple SqliteStore functions at once: it isn't sync.
660        // Here we enforce that:
661        static_assertions::assert_not_impl_any!(SqliteStore: Sync);
662
663        // If we decide that this is unacceptable,
664        // then since sqlite doesn't really support concurrent use of a connection,
665        // we _could_ change the Store::latest_consensus API take &mut self,
666        // or we could add a mutex,
667        // or we could just not use a transaction object.
668        let tx = self.conn.unchecked_transaction()?;
669        Self::remove_entries_for_vanished_blobs(&self.blob_dir, &tx)?;
670        tx.commit()?;
671
672        match self.latest_consensus_internal(flavor, pending)? {
673            Ok(s) => Ok(Some(s)),
674            Err(AbsentBlob::NothingToRead) => Ok(None),
675            Err(AbsentBlob::VanishedFile) => {
676                warn!("Somehow remove_entries_for_vanished_blobs didn't resolve a VanishedFile");
677                Ok(None)
678            }
679        }
680    }
681
682    fn latest_consensus_meta(&self, flavor: ConsensusFlavor) -> Result<Option<ConsensusMeta>> {
683        let mut stmt = self.conn.prepare(FIND_LATEST_CONSENSUS_META)?;
684        let mut rows = stmt.query(params![flavor.name()])?;
685        if let Some(row) = rows.next()? {
686            Ok(Some(cmeta_from_row(row)?))
687        } else {
688            Ok(None)
689        }
690    }
691    #[cfg(test)]
692    fn consensus_by_meta(&self, cmeta: &ConsensusMeta) -> Result<InputString> {
693        if let Some((text, _)) =
694            self.consensus_by_sha3_digest_of_signed_part(cmeta.sha3_256_of_signed())?
695        {
696            Ok(text)
697        } else {
698            Err(Error::CacheCorruption(
699                "couldn't find a consensus we thought we had.",
700            ))
701        }
702    }
703    fn consensus_by_sha3_digest_of_signed_part(
704        &self,
705        d: &[u8; 32],
706    ) -> Result<Option<(InputString, ConsensusMeta)>> {
707        let digest = hex::encode(d);
708        let mut stmt = self
709            .conn
710            .prepare(FIND_CONSENSUS_AND_META_BY_DIGEST_OF_SIGNED)?;
711        let mut rows = stmt.query(params![digest])?;
712        if let Some(row) = rows.next()? {
713            let meta = cmeta_from_row(row)?;
714            let fname: String = row.get(5)?;
715            if let Ok(text) = self.read_blob(&fname)? {
716                return Ok(Some((text, meta)));
717            }
718        }
719        Ok(None)
720    }
721    fn store_consensus(
722        &mut self,
723        cmeta: &ConsensusMeta,
724        flavor: ConsensusFlavor,
725        pending: bool,
726        contents: &str,
727    ) -> Result<()> {
728        let lifetime = cmeta.lifetime();
729        let sha3_of_signed = cmeta.sha3_256_of_signed();
730        let sha3_of_whole = cmeta.sha3_256_of_whole();
731        let valid_after: OffsetDateTime = lifetime.valid_after().into();
732        let fresh_until: OffsetDateTime = lifetime.fresh_until().into();
733        let valid_until: OffsetDateTime = lifetime.valid_until().into();
734
735        /// How long to keep a consensus around after it has expired
736        const CONSENSUS_LIFETIME: time::Duration = time::Duration::days(4);
737
738        // After a few days have passed, a consensus is no good for
739        // anything at all, not even diffs.
740        let expires = valid_until + CONSENSUS_LIFETIME;
741
742        let doctype = format!("con_{}", flavor.name());
743
744        let h = self.save_blob_internal(
745            contents.as_bytes(),
746            &doctype,
747            "sha3-256",
748            &sha3_of_whole[..],
749            expires,
750        )?;
751        h.tx().execute(
752            INSERT_CONSENSUS,
753            params![
754                valid_after,
755                fresh_until,
756                valid_until,
757                flavor.name(),
758                pending,
759                hex::encode(sha3_of_signed),
760                h.digest_string()
761            ],
762        )?;
763        h.commit()?;
764        Ok(())
765    }
766    fn mark_consensus_usable(&mut self, cmeta: &ConsensusMeta) -> Result<()> {
767        let d = hex::encode(cmeta.sha3_256_of_whole());
768        let digest = format!("sha3-256-{}", d);
769
770        let tx = self.conn.transaction()?;
771        let n = tx.execute(MARK_CONSENSUS_NON_PENDING, params![digest])?;
772        trace!("Marked {} consensuses usable", n);
773        tx.commit()?;
774
775        Ok(())
776    }
777    fn delete_consensus(&mut self, cmeta: &ConsensusMeta) -> Result<()> {
778        let d = hex::encode(cmeta.sha3_256_of_whole());
779        let digest = format!("sha3-256-{}", d);
780
781        // TODO: We should probably remove the blob as well, but for now
782        // this is enough.
783        let tx = self.conn.transaction()?;
784        tx.execute(REMOVE_CONSENSUS, params![digest])?;
785        tx.commit()?;
786
787        Ok(())
788    }
789
790    fn authcerts(&self, certs: &[AuthCertKeyIds]) -> Result<HashMap<AuthCertKeyIds, String>> {
791        let mut result = HashMap::new();
792        // TODO(nickm): Do I need to get a transaction here for performance?
793        let mut stmt = self.conn.prepare(FIND_AUTHCERT)?;
794
795        for ids in certs {
796            let id_digest = hex::encode(ids.id_fingerprint.as_bytes());
797            let sk_digest = hex::encode(ids.sk_fingerprint.as_bytes());
798            if let Some(contents) = stmt
799                .query_row(params![id_digest, sk_digest], |row| row.get::<_, String>(0))
800                .optional()?
801            {
802                result.insert(*ids, contents);
803            }
804        }
805
806        Ok(result)
807    }
808    fn store_authcerts(&mut self, certs: &[(AuthCertMeta, &str)]) -> Result<()> {
809        let tx = self.conn.transaction()?;
810        let mut stmt = tx.prepare(INSERT_AUTHCERT)?;
811        for (meta, content) in certs {
812            let ids = meta.key_ids();
813            let id_digest = hex::encode(ids.id_fingerprint.as_bytes());
814            let sk_digest = hex::encode(ids.sk_fingerprint.as_bytes());
815            let published: OffsetDateTime = meta.published().into();
816            let expires: OffsetDateTime = meta.expires().into();
817            stmt.execute(params![id_digest, sk_digest, published, expires, content])?;
818        }
819        stmt.finalize()?;
820        tx.commit()?;
821        Ok(())
822    }
823
824    fn microdescs(&self, digests: &[MdDigest]) -> Result<HashMap<MdDigest, String>> {
825        let mut result = HashMap::new();
826        let mut stmt = self.conn.prepare(FIND_MD)?;
827
828        // TODO(nickm): Should I speed this up with a transaction, or
829        // does it not matter for queries?
830        for md_digest in digests {
831            let h_digest = hex::encode(md_digest);
832            if let Some(contents) = stmt
833                .query_row(params![h_digest], |row| row.get::<_, String>(0))
834                .optional()?
835            {
836                result.insert(*md_digest, contents);
837            }
838        }
839
840        Ok(result)
841    }
842    fn store_microdescs(&mut self, digests: &[(&str, &MdDigest)], when: SystemTime) -> Result<()> {
843        let when: OffsetDateTime = when.into();
844
845        let tx = self.conn.transaction()?;
846        let mut stmt = tx.prepare(INSERT_MD)?;
847
848        for (content, md_digest) in digests {
849            let h_digest = hex::encode(md_digest);
850            stmt.execute(params![h_digest, when, content])?;
851        }
852        stmt.finalize()?;
853        tx.commit()?;
854        Ok(())
855    }
856    fn update_microdescs_listed(&mut self, digests: &[MdDigest], when: SystemTime) -> Result<()> {
857        let tx = self.conn.transaction()?;
858        let mut stmt = tx.prepare(UPDATE_MD_LISTED)?;
859        let when: OffsetDateTime = when.into();
860
861        for md_digest in digests {
862            let h_digest = hex::encode(md_digest);
863            stmt.execute(params![when, h_digest])?;
864        }
865
866        stmt.finalize()?;
867        tx.commit()?;
868        Ok(())
869    }
870
871    #[cfg(feature = "routerdesc")]
872    fn routerdescs(&self, digests: &[RdDigest]) -> Result<HashMap<RdDigest, String>> {
873        let mut result = HashMap::new();
874        let mut stmt = self.conn.prepare(FIND_RD)?;
875
876        // TODO(nickm): Should I speed this up with a transaction, or
877        // does it not matter for queries?
878        for rd_digest in digests {
879            let h_digest = hex::encode(rd_digest);
880            if let Some(contents) = stmt
881                .query_row(params![h_digest], |row| row.get::<_, String>(0))
882                .optional()?
883            {
884                result.insert(*rd_digest, contents);
885            }
886        }
887
888        Ok(result)
889    }
890    #[cfg(feature = "routerdesc")]
891    fn store_routerdescs(&mut self, digests: &[(&str, SystemTime, &RdDigest)]) -> Result<()> {
892        let tx = self.conn.transaction()?;
893        let mut stmt = tx.prepare(INSERT_RD)?;
894
895        for (content, when, rd_digest) in digests {
896            let when: OffsetDateTime = (*when).into();
897            let h_digest = hex::encode(rd_digest);
898            stmt.execute(params![h_digest, when, content])?;
899        }
900        stmt.finalize()?;
901        tx.commit()?;
902        Ok(())
903    }
904
905    #[cfg(feature = "bridge-client")]
906    fn lookup_bridgedesc(&self, bridge: &BridgeConfig) -> Result<Option<CachedBridgeDescriptor>> {
907        let bridge_line = bridge.to_string();
908        Ok(self
909            .conn
910            .query_row(FIND_BRIDGEDESC, params![bridge_line], |row| {
911                let (fetched, document): (OffsetDateTime, _) = row.try_into()?;
912                let fetched = fetched.into();
913                Ok(CachedBridgeDescriptor { fetched, document })
914            })
915            .optional()?)
916    }
917
918    #[cfg(feature = "bridge-client")]
919    fn store_bridgedesc(
920        &mut self,
921        bridge: &BridgeConfig,
922        entry: CachedBridgeDescriptor,
923        until: SystemTime,
924    ) -> Result<()> {
925        if self.is_readonly() {
926            // Hopefully whoever *does* have the lock will update the cache.
927            // Otherwise it will contain a stale entry forever
928            // (which we'll ignore, but waste effort on).
929            return Ok(());
930        }
931        let bridge_line = bridge.to_string();
932        let row = params![
933            bridge_line,
934            OffsetDateTime::from(entry.fetched),
935            OffsetDateTime::from(until),
936            entry.document,
937        ];
938        self.conn.execute(INSERT_BRIDGEDESC, row)?;
939        Ok(())
940    }
941
942    #[cfg(feature = "bridge-client")]
943    fn delete_bridgedesc(&mut self, bridge: &BridgeConfig) -> Result<()> {
944        if self.is_readonly() {
945            // This is called when we find corrupted or stale cache entries,
946            // to stop us wasting time on them next time.
947            // Hopefully whoever *does* have the lock will do this.
948            return Ok(());
949        }
950        let bridge_line = bridge.to_string();
951        self.conn.execute(DELETE_BRIDGEDESC, params![bridge_line])?;
952        Ok(())
953    }
954
955    fn update_protocol_recommendations(
956        &mut self,
957        valid_after: SystemTime,
958        protocols: &tor_netdoc::doc::netstatus::ProtoStatuses,
959    ) -> Result<()> {
960        let json =
961            serde_json::to_string(&protocols).map_err(into_internal!("Cannot encode protocols"))?;
962        let params = params![OffsetDateTime::from(valid_after), json];
963        self.conn.execute(UPDATE_PROTOCOL_STATUS, params)?;
964        Ok(())
965    }
966
967    fn cached_protocol_recommendations(
968        &self,
969    ) -> Result<Option<(SystemTime, tor_netdoc::doc::netstatus::ProtoStatuses)>> {
970        let opt_row: Option<(OffsetDateTime, String)> = self
971            .conn
972            .query_row(FIND_LATEST_PROTOCOL_STATUS, [], |row| {
973                Ok((row.get(0)?, row.get(1)?))
974            })
975            .optional()?;
976
977        let (date, json) = match opt_row {
978            Some(v) => v,
979            None => return Ok(None),
980        };
981
982        let date = date.into();
983        let statuses: tor_netdoc::doc::netstatus::ProtoStatuses =
984            serde_json::from_str(json.as_str()).map_err(|e| Error::BadJsonInCache(Arc::new(e)))?;
985
986        Ok(Some((date, statuses)))
987    }
988}
989
990/// Functionality related to uncommitted blobs.
991mod blob_handle {
992    use std::path::{Path, PathBuf};
993
994    use crate::Result;
995    use rusqlite::Transaction;
996    use tor_basic_utils::PathExt as _;
997    use tor_error::warn_report;
998
999    /// Handle to a blob that we have saved to disk but
1000    /// not yet committed to
1001    /// the database, and the database transaction where we added a reference to it.
1002    ///
1003    /// Used to either commit the blob (by calling [`SavedBlobHandle::commit`]),
1004    /// or roll it back (by dropping the [`SavedBlobHandle`] without committing it.)
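    ///
    /// A rough usage sketch (mirroring `SqliteStore::store_consensus`; error
    /// handling is elided):
    ///
    /// ```ignore
    /// let h = store.save_blob_internal(contents, doctype, "sha3-256", &digest, expires)?;
    /// // Add any rows that reference the blob, using the same transaction...
    /// h.tx().execute(INSERT_CONSENSUS, params![/* ..., */ h.digest_string()])?;
    /// // ...then commit.  Dropping `h` without committing rolls back the
    /// // transaction and deletes the blob file.
    /// h.commit()?;
    /// ```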
1005    #[must_use]
1006    pub(super) struct SavedBlobHandle<'a> {
1007        /// Transaction we're using to add the blob to the ExtDocs table.
1008        ///
1009        /// Note that struct fields are dropped in declaration order,
1010        /// so when we drop an uncommitted SavedBlobHandle,
1011        /// we roll back the transaction before we delete the file.
1012        /// (In practice, either order would be fine.)
1013        tx: Transaction<'a>,
1014        /// Filename for the file, with respect to the blob directory.
1015        fname: String,
1016        /// Declared digest string for this blob. Of the format
1017        /// "digesttype-hexstr".
1018        digeststr: String,
1019        /// An 'unlinker' for the blob file.
1020        unlinker: Unlinker,
1021    }
1022
1023    impl<'a> SavedBlobHandle<'a> {
1024        /// Construct a SavedBlobHandle from its parts.
1025        pub(super) fn new(
1026            tx: Transaction<'a>,
1027            fname: String,
1028            digeststr: String,
1029            unlinker: Unlinker,
1030        ) -> Self {
1031            Self {
1032                tx,
1033                fname,
1034                digeststr,
1035                unlinker,
1036            }
1037        }
1038
1039        /// Return a reference to the underlying database transaction.
1040        pub(super) fn tx(&self) -> &Transaction<'a> {
1041            &self.tx
1042        }
1043        /// Return the digest string of the saved blob.
1044        /// Other tables use this as a foreign key into ExtDocs.digest
1045        pub(super) fn digest_string(&self) -> &str {
1046            self.digeststr.as_ref()
1047        }
1048        /// Return the filename of this blob within the blob directory.
1049        #[allow(unused)] // used for testing.
1050        pub(super) fn fname(&self) -> &str {
1051            self.fname.as_ref()
1052        }
1053        /// Commit the relevant database transaction.
1054        pub(super) fn commit(self) -> Result<()> {
1055            // The blob has been written to disk, so it is safe to
1056            // commit the transaction.
1057            // If the commit returns an error, self.unlinker will remove the blob.
1058            // (This could result in a vanished blob if the commit reports an error,
1059            // but the transaction is still visible in the database.)
1060            self.tx.commit()?;
1061            // If we reach this point, we don't want to remove the file.
1062            self.unlinker.forget();
1063            Ok(())
1064        }
1065    }
1066
1067    /// Handle to a file which we might have to delete.
1068    ///
1069    /// When this handle is dropped, the file gets deleted, unless you have
1070    /// first called [`Unlinker::forget`].
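    ///
    /// A rough sketch of its use (mirroring `save_blob_internal` and
    /// `SavedBlobHandle::commit`):
    ///
    /// ```ignore
    /// let unlinker = Unlinker::new(&full_path);
    /// // ... write the file and record it in the database ...
    /// unlinker.forget(); // Success: keep the file.
    /// // Had `unlinker` been dropped without `forget()`, the file would have been removed.
    /// ```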
1071    pub(super) struct Unlinker {
1072        /// The location of the file to remove, or None if we shouldn't
1073        /// remove it.
1074        p: Option<PathBuf>,
1075    }
1076    impl Unlinker {
1077        /// Make a new Unlinker for a given filename.
1078        pub(super) fn new<P: AsRef<Path>>(p: P) -> Self {
1079            Unlinker {
1080                p: Some(p.as_ref().to_path_buf()),
1081            }
1082        }
1083        /// Forget about this unlinker, so that the corresponding file won't
1084        /// get dropped.
1085        fn forget(mut self) {
1086            self.p = None;
1087        }
1088    }
1089    impl Drop for Unlinker {
1090        fn drop(&mut self) {
1091            if let Some(p) = self.p.take() {
1092                if let Err(e) = std::fs::remove_file(&p) {
1093                    warn_report!(
1094                        e,
1095                        "Couldn't remove rolled-back blob file {}",
1096                        p.display_lossy()
1097                    );
1098                }
1099            }
1100        }
1101    }
1102}
1103
1104/// Convert a hexadecimal sha3-256 digest from the database into an array.
1105fn digest_from_hex(s: &str) -> Result<[u8; 32]> {
1106    let mut bytes = [0_u8; 32];
1107    hex::decode_to_slice(s, &mut bytes[..]).map_err(Error::BadHexInCache)?;
1108    Ok(bytes)
1109}
1110
1111/// Convert a hexadecimal sha3-256 "digest string", as used in the database's
1112/// digest column, into an array.
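///
/// For illustration: a digest string like `sha3-256-<64 hex digits>` has its
/// `sha3-256-` prefix stripped, and the remaining hex is decoded into 32 bytes.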
1113fn digest_from_dstr(s: &str) -> Result<[u8; 32]> {
1114    if let Some(stripped) = s.strip_prefix("sha3-256-") {
1115        digest_from_hex(stripped)
1116    } else {
1117        Err(Error::CacheCorruption("Invalid digest in database"))
1118    }
1119}
1120
1121/// Create a ConsensusMeta from a `Row` returned by one of
1122/// `FIND_LATEST_CONSENSUS_META` or `FIND_CONSENSUS_AND_META_BY_DIGEST_OF_SIGNED`.
1123fn cmeta_from_row(row: &rusqlite::Row<'_>) -> Result<ConsensusMeta> {
1124    let va: OffsetDateTime = row.get(0)?;
1125    let fu: OffsetDateTime = row.get(1)?;
1126    let vu: OffsetDateTime = row.get(2)?;
1127    let d_signed: String = row.get(3)?;
1128    let d_all: String = row.get(4)?;
1129    let lifetime = Lifetime::new(va.into(), fu.into(), vu.into())
1130        .map_err(|_| Error::CacheCorruption("inconsistent lifetime in database"))?;
1131    Ok(ConsensusMeta::new(
1132        lifetime,
1133        digest_from_hex(&d_signed)?,
1134        digest_from_dstr(&d_all)?,
1135    ))
1136}
1137
1138/// Set up the tables for the arti cache schema in a sqlite database.
1139const INSTALL_V0_SCHEMA: &str = "
1140  -- Helps us version the schema.  The schema here corresponds to a
1141  -- version number called 'version', and it should be readable by
1142  -- anybody who is compliant with versions of at least 'readable_by'.
1143  CREATE TABLE TorSchemaMeta (
1144     name TEXT NOT NULL PRIMARY KEY,
1145     version INTEGER NOT NULL,
1146     readable_by INTEGER NOT NULL
1147  );
1148
1149  INSERT INTO TorSchemaMeta (name, version, readable_by) VALUES ( 'TorDirStorage', 0, 0 );
1150
1151  -- Keeps track of external blobs on disk.
1152  CREATE TABLE ExtDocs (
1153    -- Records a digest of the file contents, in the form '<digest_type>-hexstr'
1154    digest TEXT PRIMARY KEY NOT NULL,
1155    -- When was this file created?
1156    created DATE NOT NULL,
1157    -- After what time will this file definitely be useless?
1158    expires DATE NOT NULL,
1159    -- What is the type of this file? Currently supported types are 'con_<flavor>'.
1160    --   (Before tor-dirmgr ~0.28.0, we would erroneously record the type as 'sha3-256'
1161    --   instead of 'con_<flavor>'. Nothing depends on this field yet, but it will be used
1162    --   in the future as we add more large-document types.)
1163    type TEXT NOT NULL,
1164    -- Filename for this file within our blob directory.
1165    filename TEXT NOT NULL
1166  );
1167
1168  -- All the microdescriptors we know about.
1169  CREATE TABLE Microdescs (
1170    sha256_digest TEXT PRIMARY KEY NOT NULL,
1171    last_listed DATE NOT NULL,
1172    contents BLOB NOT NULL
1173  );
1174
1175  -- All the authority certificates we know.
1176  CREATE TABLE Authcerts (
1177    id_digest TEXT NOT NULL,
1178    sk_digest TEXT NOT NULL,
1179    published DATE NOT NULL,
1180    expires DATE NOT NULL,
1181    contents BLOB NOT NULL,
1182    PRIMARY KEY (id_digest, sk_digest)
1183  );
1184
1185  -- All the consensuses we're storing.
1186  CREATE TABLE Consensuses (
1187    valid_after DATE NOT NULL,
1188    fresh_until DATE NOT NULL,
1189    valid_until DATE NOT NULL,
1190    flavor TEXT NOT NULL,
1191    pending BOOLEAN NOT NULL,
1192    sha3_of_signed_part TEXT NOT NULL,
1193    digest TEXT NOT NULL,
1194    FOREIGN KEY (digest) REFERENCES ExtDocs (digest) ON DELETE CASCADE
1195  );
1196  CREATE INDEX Consensuses_vu on CONSENSUSES(valid_until);
1197
1198";
1199
1200/// Update the database schema, from each version to the next
1201const UPDATE_SCHEMA: &[&str] = &["
1202  -- Update the database schema from version 0 to version 1.
1203  CREATE TABLE RouterDescs (
1204    sha1_digest TEXT PRIMARY KEY NOT NULL,
1205    published DATE NOT NULL,
1206    contents BLOB NOT NULL
1207  );
1208","
1209  -- Update the database schema from version 1 to version 2.
1210  -- We create this table even if the bridge-client feature is disabled, but then don't touch it at all.
1211  CREATE TABLE BridgeDescs (
1212    bridge_line TEXT PRIMARY KEY NOT NULL,
1213    fetched DATE NOT NULL,
1214    until DATE NOT NULL,
1215    contents BLOB NOT NULL
1216  );
1217","
1218 -- Update the database schema from version 2 to version 3.
1219
1220 -- Table to hold our latest ProtocolStatuses object, to tell us if we're obsolete.
1221 -- We hold this independently from our consensus,
1222 -- since we want to read it very early in our startup process,
1223 -- even if the consensus is expired.
1224 CREATE TABLE ProtocolStatus (
1225    -- Enforce that there is only one row in this table.
1226    -- (This is a bit kludgy, but I am assured that it is a common practice.)
1227    zero INTEGER PRIMARY KEY NOT NULL,
1228    -- valid-after date of the consensus from which we got this status
1229    date DATE NOT NULL,
1230    -- ProtoStatuses object, encoded as json
1231    statuses TEXT NOT NULL
1232 );
1233"];
1234
1235/// Update the database schema version tracking, from each version to the next
1236const UPDATE_SCHEMA_VERSION: &str = "
1237  UPDATE TorSchemaMeta SET version=? WHERE version<?;
1238";
1239
1240/// Version number used for this version of the arti cache schema.
1241const SCHEMA_VERSION: u32 = UPDATE_SCHEMA.len() as u32;
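//
// For example (illustrative): UPDATE_SCHEMA currently has three entries, so
// SCHEMA_VERSION is 3.  A database found at schema version 1 is upgraded by
// check_schema() running UPDATE_SCHEMA[1] (1 -> 2) and then UPDATE_SCHEMA[2] (2 -> 3).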
1242
1243/// Query: find the latest-expiring consensus of a given flavor with a given
1244/// pending status.
1245const FIND_CONSENSUS_P: &str = "
1246  SELECT valid_after, valid_until, filename
1247  FROM Consensuses
1248  INNER JOIN ExtDocs ON ExtDocs.digest = Consensuses.digest
1249  WHERE pending = ? AND flavor = ?
1250  ORDER BY valid_until DESC
1251  LIMIT 1;
1252";
1253
1254/// Query: find the latest-expiring consensus of a given flavor, regardless of
1255/// pending status.
1256const FIND_CONSENSUS: &str = "
1257  SELECT valid_after, valid_until, filename
1258  FROM Consensuses
1259  INNER JOIN ExtDocs ON ExtDocs.digest = Consensuses.digest
1260  WHERE flavor = ?
1261  ORDER BY valid_until DESC
1262  LIMIT 1;
1263";
1264
1265/// Query: Find the metadata (lifetime and digests) for the latest-expiring
1266/// non-pending consensus of a given flavor.
1267const FIND_LATEST_CONSENSUS_META: &str = "
1268  SELECT valid_after, fresh_until, valid_until, sha3_of_signed_part, digest
1269  FROM Consensuses
1270  WHERE pending = 0 AND flavor = ?
1271  ORDER BY valid_until DESC
1272  LIMIT 1;
1273";
1274
1275/// Look up a consensus by its digest-of-signed-part string.
1276const FIND_CONSENSUS_AND_META_BY_DIGEST_OF_SIGNED: &str = "
1277  SELECT valid_after, fresh_until, valid_until, sha3_of_signed_part, Consensuses.digest, filename
1278  FROM Consensuses
1279  INNER JOIN ExtDocs on ExtDocs.digest = Consensuses.digest
1280  WHERE Consensuses.sha3_of_signed_part = ?
1281  LIMIT 1;
1282";
1283
1284/// Query: Update the consensus whose digest field is 'digest' to call it
1285/// no longer pending.
1286const MARK_CONSENSUS_NON_PENDING: &str = "
1287  UPDATE Consensuses
1288  SET pending = 0
1289  WHERE digest = ?;
1290";
1291
1292/// Query: Remove the consensus with a given digest field.
1293#[allow(dead_code)]
1294const REMOVE_CONSENSUS: &str = "
1295  DELETE FROM Consensuses
1296  WHERE digest = ?;
1297";
1298
1299/// Query: Find the authority certificate with given key digests.
1300const FIND_AUTHCERT: &str = "
1301  SELECT contents FROM AuthCerts WHERE id_digest = ? AND sk_digest = ?;
1302";
1303
1304/// Query: find the microdescriptor with a given hex-encoded sha256 digest
1305const FIND_MD: &str = "
1306  SELECT contents
1307  FROM Microdescs
1308  WHERE sha256_digest = ?
1309";
1310
1311/// Query: find the router descriptors with a given hex-encoded sha1 digest
1312#[cfg(feature = "routerdesc")]
1313const FIND_RD: &str = "
1314  SELECT contents
1315  FROM RouterDescs
1316  WHERE sha1_digest = ?
1317";
1318
1319/// Query: find every ExtDocs member that has expired.
1320const FIND_EXPIRED_EXTDOCS: &str = "
1321  SELECT filename FROM ExtDocs where expires < datetime('now');
1322";
1323
1324/// Query: find whether an ExtDoc is listed.
1325const COUNT_EXTDOC_BY_PATH: &str = "
1326  SELECT COUNT(*) FROM ExtDocs WHERE filename = ?;
1327";
1328
1329/// Query: Add a new entry to ExtDocs.
1330const INSERT_EXTDOC: &str = "
1331  INSERT OR REPLACE INTO ExtDocs ( digest, created, expires, type, filename )
1332  VALUES ( ?, datetime('now'), ?, ?, ? );
1333";
1334
1335/// Query: Add a new consensus.
1336const INSERT_CONSENSUS: &str = "
1337  INSERT OR REPLACE INTO Consensuses
1338    ( valid_after, fresh_until, valid_until, flavor, pending, sha3_of_signed_part, digest )
1339  VALUES ( ?, ?, ?, ?, ?, ?, ? );
1340";
1341
1342/// Query: Add a new AuthCert
1343const INSERT_AUTHCERT: &str = "
1344  INSERT OR REPLACE INTO Authcerts
1345    ( id_digest, sk_digest, published, expires, contents)
1346  VALUES ( ?, ?, ?, ?, ? );
1347";
1348
1349/// Query: Add a new microdescriptor
1350const INSERT_MD: &str = "
1351  INSERT OR REPLACE INTO Microdescs ( sha256_digest, last_listed, contents )
1352  VALUES ( ?, ?, ? );
1353";
1354
1355/// Query: Add a new router descriptor
1356#[allow(unused)]
1357#[cfg(feature = "routerdesc")]
1358const INSERT_RD: &str = "
1359  INSERT OR REPLACE INTO RouterDescs ( sha1_digest, published, contents )
1360  VALUES ( ?, ?, ? );
1361";
1362
1363/// Query: Change the time when a given microdescriptor was last listed.
1364const UPDATE_MD_LISTED: &str = "
1365  UPDATE Microdescs
1366  SET last_listed = max(last_listed, ?)
1367  WHERE sha256_digest = ?;
1368";
1369
1370/// Query: Find a cached bridge descriptor
1371#[cfg(feature = "bridge-client")]
1372const FIND_BRIDGEDESC: &str = "SELECT fetched, contents FROM BridgeDescs WHERE bridge_line = ?;";
1373/// Query: Record a cached bridge descriptor
1374#[cfg(feature = "bridge-client")]
1375const INSERT_BRIDGEDESC: &str = "
1376  INSERT OR REPLACE INTO BridgeDescs ( bridge_line, fetched, until, contents )
1377  VALUES ( ?, ?, ?, ? );
1378";
1379/// Query: Remove a cached bridge descriptor
1380#[cfg(feature = "bridge-client")]
1381#[allow(dead_code)]
1382const DELETE_BRIDGEDESC: &str = "DELETE FROM BridgeDescs WHERE bridge_line = ?;";
1383
1384/// Query: Find all consensus extdocs that are not referenced in the consensus table.
1385///
1386/// Note: treating a type of `sha3-256` as a synonym for `con_%` works around entries
/// written before ~0.28.0, which erroneously recorded the type as 'sha3-256'
/// (see the ExtDocs schema comment).
1387const FIND_UNREFERENCED_CONSENSUS_EXTDOCS: &str = "
1388    SELECT filename FROM ExtDocs WHERE
1389         (type LIKE 'con_%' OR type = 'sha3-256')
1390    AND NOT EXISTS
1391         (SELECT digest FROM Consensuses WHERE Consensuses.digest = ExtDocs.digest);";
1392
1393/// Query: Discard every expired extdoc.
1394///
1395/// External documents aren't exposed through [`Store`].
1396const DROP_OLD_EXTDOCS: &str = "DELETE FROM ExtDocs WHERE expires < datetime('now');";
1397
1398/// Query: Discard an extdoc with a given path.
1399const DELETE_EXTDOC_BY_FILENAME: &str = "DELETE FROM ExtDocs WHERE filename = ?;";
1400
1401/// Query: List all extdoc filenames.
1402const FIND_ALL_EXTDOC_FILENAMES: &str = "SELECT filename FROM ExtDocs;";
1403
1404/// Query: Get the latest protocol status.
const FIND_LATEST_PROTOCOL_STATUS: &str = "SELECT date, statuses FROM ProtocolStatus WHERE zero=0;";
/// Query: Update the latest protocol status.
const UPDATE_PROTOCOL_STATUS: &str = "INSERT OR REPLACE INTO ProtocolStatus VALUES ( 0, ?, ? );";

/// Query: Discard every router descriptor that hasn't been listed for 3
/// months.
// TODO: Choose a more realistic time.
const DROP_OLD_ROUTERDESCS: &str = "DELETE FROM RouterDescs WHERE published < ?;";
/// Query: Discard every microdescriptor that hasn't been listed for 3 months.
// TODO: Choose a more realistic time.
const DROP_OLD_MICRODESCS: &str = "DELETE FROM Microdescs WHERE last_listed < ?;";
/// Query: Discard every expired authority certificate.
const DROP_OLD_AUTHCERTS: &str = "DELETE FROM Authcerts WHERE expires < ?;";
/// Query: Discard every consensus that's been expired for at least
/// two days.
const DROP_OLD_CONSENSUSES: &str = "DELETE FROM Consensuses WHERE valid_until < ?;";
/// Query: Discard every bridge descriptor that is too old, or from the future.  (Both ?=now.)
#[cfg(feature = "bridge-client")]
const DROP_OLD_BRIDGEDESCS: &str = "DELETE FROM BridgeDescs WHERE ? > until OR fetched > ?;";

#[cfg(test)]
pub(crate) mod test {
    #![allow(clippy::unwrap_used)]
    use super::*;
    use crate::storage::EXPIRATION_DEFAULTS;
    use digest::Digest;
    use hex_literal::hex;
    use tempfile::{tempdir, TempDir};
    use time::ext::NumericalDuration;
    use tor_llcrypto::d::Sha3_256;

    pub(crate) fn new_empty() -> Result<(TempDir, SqliteStore)> {
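        // Build a SqliteStore backed by a fresh temporary directory.
        // (The TempDir is returned too, so that it outlives the store.)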
        let tmp_dir = tempdir().unwrap();
        let sql_path = tmp_dir.path().join("db.sql");
        let conn = rusqlite::Connection::open(sql_path)?;
        let blob_path = tmp_dir.path().join("blobs");
        let blob_dir = fs_mistrust::Mistrust::builder()
            .dangerously_trust_everyone()
            .build()
            .unwrap()
            .verifier()
            .make_secure_dir(blob_path)
            .unwrap();
        let store = SqliteStore::from_conn(conn, blob_dir)?;

        Ok((tmp_dir, store))
    }

    #[test]
    fn init() -> Result<()> {
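        // Open the same database several times, exercising schema setup and
        // the version / readable_by checks.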
        let tmp_dir = tempdir().unwrap();
        let blob_dir = fs_mistrust::Mistrust::builder()
            .dangerously_trust_everyone()
            .build()
            .unwrap()
            .verifier()
            .secure_dir(&tmp_dir)
            .unwrap();
        let sql_path = tmp_dir.path().join("db.sql");
        // Initial setup: everything should work.
        {
            let conn = rusqlite::Connection::open(&sql_path)?;
            let _store = SqliteStore::from_conn(conn, blob_dir.clone())?;
        }
        // Second setup: shouldn't need to upgrade.
        {
            let conn = rusqlite::Connection::open(&sql_path)?;
            let _store = SqliteStore::from_conn(conn, blob_dir.clone())?;
        }
        // Third setup: the schema version is newer than ours, but still readable, so opening should succeed.
        {
            let conn = rusqlite::Connection::open(&sql_path)?;
            conn.execute_batch("UPDATE TorSchemaMeta SET version = 9002;")?;
            let _store = SqliteStore::from_conn(conn, blob_dir.clone())?;
        }
        // Fourth: this says we can't read it, so we'll get an error.
        {
            let conn = rusqlite::Connection::open(&sql_path)?;
            conn.execute_batch("UPDATE TorSchemaMeta SET readable_by = 9001;")?;
            let val = SqliteStore::from_conn(conn, blob_dir);
            assert!(val.is_err());
        }
        Ok(())
    }

    #[test]
    fn bad_blob_fname() -> Result<()> {
        let (_tmp_dir, store) = new_empty()?;

        assert!(store.blob_dir.join("abcd").is_ok());
        assert!(store.blob_dir.join("abcd..").is_ok());
        assert!(store.blob_dir.join("..abcd..").is_ok());
        assert!(store.blob_dir.join(".abcd").is_ok());

        assert!(store.blob_dir.join("..").is_err());
        assert!(store.blob_dir.join("../abcd").is_err());
        assert!(store.blob_dir.join("/abcd").is_err());

        Ok(())
    }

    #[test]
    fn blobs() -> Result<()> {
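        // Save one blob that expires in the future and one that has already
        // expired; only the first should survive expire_all().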
        let (_tmp_dir, mut store) = new_empty()?;

        let now = OffsetDateTime::now_utc();
        let one_week = 1.weeks();

        let fname1 = store.save_blob(
            b"Hello world",
            "greeting",
            "sha1",
            &hex!("7b502c3a1f48c8609ae212cdfb639dee39673f5e"),
            now + one_week,
        )?;

        let fname2 = store.save_blob(
            b"Goodbye, dear friends",
            "greeting",
            "sha1",
            &hex!("2149c2a7dbf5be2bb36fb3c5080d0fb14cb3355c"),
            now - one_week,
        )?;

        assert_eq!(
            fname1,
            "greeting_sha1-7b502c3a1f48c8609ae212cdfb639dee39673f5e"
        );
        assert_eq!(
            &std::fs::read(store.blob_dir.join(&fname1)?).unwrap()[..],
            b"Hello world"
        );
        assert_eq!(
            &std::fs::read(store.blob_dir.join(&fname2)?).unwrap()[..],
            b"Goodbye, dear friends"
        );

        let n: u32 = store
            .conn
            .query_row("SELECT COUNT(filename) FROM ExtDocs", [], |row| row.get(0))?;
        assert_eq!(n, 2);

        let blob = store.read_blob(&fname2)?.unwrap();
        assert_eq!(blob.as_str().unwrap(), "Goodbye, dear friends");

        // Now expire: the second file should go away.
        store.expire_all(&EXPIRATION_DEFAULTS)?;
        assert_eq!(
            &std::fs::read(store.blob_dir.join(&fname1)?).unwrap()[..],
            b"Hello world"
        );
        assert!(std::fs::read(store.blob_dir.join(&fname2)?).is_err());
        let n: u32 = store
            .conn
            .query_row("SELECT COUNT(filename) FROM ExtDocs", [], |row| row.get(0))?;
        assert_eq!(n, 1);

        Ok(())
    }

    #[test]
    fn consensus() -> Result<()> {
        use tor_netdoc::doc::netstatus;

        let (_tmp_dir, mut store) = new_empty()?;
        let now = OffsetDateTime::now_utc();
        let one_hour = 1.hours();

        assert_eq!(
            store.latest_consensus_time(ConsensusFlavor::Microdesc)?,
            None
        );

        let cmeta = ConsensusMeta::new(
            netstatus::Lifetime::new(
                now.into(),
                (now + one_hour).into(),
                SystemTime::from(now + one_hour * 2),
            )
            .unwrap(),
            [0xAB; 32],
            [0xBC; 32],
        );

        store.store_consensus(
            &cmeta,
            ConsensusFlavor::Microdesc,
            true,
            "Pretend this is a consensus",
        )?;

        {
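            // Until we mark the consensus usable, there is no latest-consensus
            // time, and querying with `Some(false)` finds nothing.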
            assert_eq!(
                store.latest_consensus_time(ConsensusFlavor::Microdesc)?,
                None
            );
            let consensus = store
                .latest_consensus(ConsensusFlavor::Microdesc, None)?
                .unwrap();
            assert_eq!(consensus.as_str()?, "Pretend this is a consensus");
            let consensus = store.latest_consensus(ConsensusFlavor::Microdesc, Some(false))?;
            assert!(consensus.is_none());
        }

        store.mark_consensus_usable(&cmeta)?;

        {
            assert_eq!(
                store.latest_consensus_time(ConsensusFlavor::Microdesc)?,
                now.into()
            );
            let consensus = store
                .latest_consensus(ConsensusFlavor::Microdesc, None)?
                .unwrap();
            assert_eq!(consensus.as_str()?, "Pretend this is a consensus");
            let consensus = store
                .latest_consensus(ConsensusFlavor::Microdesc, Some(false))?
                .unwrap();
            assert_eq!(consensus.as_str()?, "Pretend this is a consensus");
        }

        {
            let consensus_text = store.consensus_by_meta(&cmeta)?;
            assert_eq!(consensus_text.as_str()?, "Pretend this is a consensus");

            let (is, _cmeta2) = store
                .consensus_by_sha3_digest_of_signed_part(&[0xAB; 32])?
                .unwrap();
            assert_eq!(is.as_str()?, "Pretend this is a consensus");

            let cmeta3 = ConsensusMeta::new(
                netstatus::Lifetime::new(
                    now.into(),
                    (now + one_hour).into(),
                    SystemTime::from(now + one_hour * 2),
                )
                .unwrap(),
                [0x99; 32],
                [0x99; 32],
            );
            assert!(store.consensus_by_meta(&cmeta3).is_err());

            assert!(store
                .consensus_by_sha3_digest_of_signed_part(&[0x99; 32])?
                .is_none());
        }

        {
            assert!(store
                .consensus_by_sha3_digest_of_signed_part(&[0xAB; 32])?
                .is_some());
            store.delete_consensus(&cmeta)?;
            assert!(store
                .consensus_by_sha3_digest_of_signed_part(&[0xAB; 32])?
                .is_none());
        }

        Ok(())
    }

    #[test]
    fn authcerts() -> Result<()> {
        let (_tmp_dir, mut store) = new_empty()?;
        let now = OffsetDateTime::now_utc();
        let one_hour = 1.hours();

        let keyids = AuthCertKeyIds {
            id_fingerprint: [3; 20].into(),
            sk_fingerprint: [4; 20].into(),
        };
        let keyids2 = AuthCertKeyIds {
            id_fingerprint: [4; 20].into(),
            sk_fingerprint: [3; 20].into(),
        };

        let m1 = AuthCertMeta::new(keyids, now.into(), SystemTime::from(now + one_hour * 24));

        store.store_authcerts(&[(m1, "Pretend this is a cert")])?;

        let certs = store.authcerts(&[keyids, keyids2])?;
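        // Only `keyids` was stored, so we should get exactly one cert back.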
        assert_eq!(certs.len(), 1);
        assert_eq!(certs.get(&keyids).unwrap(), "Pretend this is a cert");

        Ok(())
    }

    #[test]
    fn microdescs() -> Result<()> {
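        // Store three microdescriptors as listed long ago, mark d2 as listed
        // just now, and check that expiration keeps only d2.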
        let (_tmp_dir, mut store) = new_empty()?;

        let now = OffsetDateTime::now_utc();
        let one_day = 1.days();

        let d1 = [5_u8; 32];
        let d2 = [7; 32];
        let d3 = [42; 32];
        let d4 = [99; 32];

        let long_ago: OffsetDateTime = now - one_day * 100;
        store.store_microdescs(
            &[
                ("Fake micro 1", &d1),
                ("Fake micro 2", &d2),
                ("Fake micro 3", &d3),
            ],
            long_ago.into(),
        )?;

        store.update_microdescs_listed(&[d2], now.into())?;

        let mds = store.microdescs(&[d2, d3, d4])?;
        assert_eq!(mds.len(), 2);
        assert_eq!(mds.get(&d1), None);
        assert_eq!(mds.get(&d2).unwrap(), "Fake micro 2");
        assert_eq!(mds.get(&d3).unwrap(), "Fake micro 3");
        assert_eq!(mds.get(&d4), None);

        // Now we'll expire.  That should drop everything but d2.
        store.expire_all(&EXPIRATION_DEFAULTS)?;
        let mds = store.microdescs(&[d2, d3, d4])?;
        assert_eq!(mds.len(), 1);
        assert_eq!(mds.get(&d2).unwrap(), "Fake micro 2");

        Ok(())
    }

    #[test]
    #[cfg(feature = "routerdesc")]
    fn routerdescs() -> Result<()> {
        let (_tmp_dir, mut store) = new_empty()?;

        let now = OffsetDateTime::now_utc();
        let one_day = 1.days();
        let long_ago: OffsetDateTime = now - one_day * 100;
        let recently = now - one_day;

        let d1 = [5_u8; 20];
        let d2 = [7; 20];
        let d3 = [42; 20];
        let d4 = [99; 20];

        store.store_routerdescs(&[
            ("Fake routerdesc 1", long_ago.into(), &d1),
            ("Fake routerdesc 2", recently.into(), &d2),
            ("Fake routerdesc 3", long_ago.into(), &d3),
        ])?;

        let rds = store.routerdescs(&[d2, d3, d4])?;
        assert_eq!(rds.len(), 2);
        assert_eq!(rds.get(&d1), None);
        assert_eq!(rds.get(&d2).unwrap(), "Fake routerdesc 2");
        assert_eq!(rds.get(&d3).unwrap(), "Fake routerdesc 3");
        assert_eq!(rds.get(&d4), None);

        // Now we'll expire.  That should drop everything but d2.
        store.expire_all(&EXPIRATION_DEFAULTS)?;
        let rds = store.routerdescs(&[d2, d3, d4])?;
        assert_eq!(rds.len(), 1);
        assert_eq!(rds.get(&d2).unwrap(), "Fake routerdesc 2");

        Ok(())
    }

    #[test]
    fn from_path_rw() -> Result<()> {
        let tmp = tempdir().unwrap();
        let mistrust = fs_mistrust::Mistrust::new_dangerously_trust_everyone();

        // Nothing there: can't open read-only
        let r = SqliteStore::from_path_and_mistrust(tmp.path(), &mistrust, true);
        assert!(r.is_err());
        assert!(!tmp.path().join("dir_blobs").try_exists().unwrap());

        // Opening it read-write will create the files
        {
            let mut store = SqliteStore::from_path_and_mistrust(tmp.path(), &mistrust, false)?;
            assert!(tmp.path().join("dir_blobs").is_dir());
            assert!(store.lockfile.is_some());
            assert!(!store.is_readonly());
            assert!(store.upgrade_to_readwrite()?); // no-op.
        }

        // At this point, we can successfully make a read-only connection.
        {
            let mut store2 = SqliteStore::from_path_and_mistrust(tmp.path(), &mistrust, true)?;
            assert!(store2.is_readonly());

            // Nobody else is locking this, so we can upgrade.
            assert!(store2.upgrade_to_readwrite()?); // this time it really upgrades.
            assert!(!store2.is_readonly());
        }
        Ok(())
    }

    #[test]
    fn orphaned_blobs() -> Result<()> {
        let (_tmp_dir, mut store) = new_empty()?;
        /*
        for ent in store.blob_dir.read_directory(".")?.flatten() {
            println!("{:?}", ent);
        }
        */
        assert_eq!(store.blob_dir.read_directory(".")?.count(), 0);

        let now = OffsetDateTime::now_utc();
        let one_week = 1.weeks();
        let _fname_good = store.save_blob(
            b"Goodbye, dear friends",
            "greeting",
            "sha1",
            &hex!("2149c2a7dbf5be2bb36fb3c5080d0fb14cb3355c"),
            now + one_week,
        )?;
        assert_eq!(store.blob_dir.read_directory(".")?.count(), 1);

        // Now, create two orphaned blobs: one with a recent timestamp, and one with an older
        // timestamp.
        store
            .blob_dir
            .write_and_replace("fairly_new", b"new contents will stay")?;
        store
            .blob_dir
            .write_and_replace("fairly_old", b"old contents will be removed")?;
        filetime::set_file_mtime(
            store.blob_dir.join("fairly_old")?,
            SystemTime::from(now - one_week).into(),
        )
        .expect("Can't adjust mtime");

        assert_eq!(store.blob_dir.read_directory(".")?.count(), 3);

        store.remove_unreferenced_blobs(now, &EXPIRATION_DEFAULTS)?;
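        // The stale orphan ("fairly_old") should be gone; the recent orphan and
        // the referenced blob remain.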
        assert_eq!(store.blob_dir.read_directory(".")?.count(), 2);

        Ok(())
    }

    #[test]
    fn unreferenced_consensus_blob() -> Result<()> {
        let (_tmp_dir, mut store) = new_empty()?;

        let now = OffsetDateTime::now_utc();
        let one_week = 1.weeks();

        // Make a blob that claims to be a consensus, and which has not yet expired, but which is
        // not listed in the consensus table.  It should get removed.
        let fname = store.save_blob(
            b"pretend this is a consensus",
            "con_fake",
            "sha1",
            &hex!("803e5a45eea7766a62a735e051a25a50ffb9b1cf"),
            now + one_week,
        )?;

        assert_eq!(store.blob_dir.read_directory(".")?.count(), 1);
        assert_eq!(
            &std::fs::read(store.blob_dir.join(&fname)?).unwrap()[..],
            b"pretend this is a consensus"
        );
        let n: u32 = store
            .conn
            .query_row("SELECT COUNT(filename) FROM ExtDocs", [], |row| row.get(0))?;
        assert_eq!(n, 1);

        store.expire_all(&EXPIRATION_DEFAULTS)?;
        assert_eq!(store.blob_dir.read_directory(".")?.count(), 0);

        let n: u32 = store
            .conn
            .query_row("SELECT COUNT(filename) FROM ExtDocs", [], |row| row.get(0))?;
        assert_eq!(n, 0);

        Ok(())
    }

    #[test]
    fn vanished_blob_cleanup() -> Result<()> {
        let (_tmp_dir, mut store) = new_empty()?;

        let now = OffsetDateTime::now_utc();
        let one_week = 1.weeks();

        // Make a few blobs.
        let mut fnames = vec![];
        for idx in 0..8 {
            let content = format!("Example {idx}");
            let digest = Sha3_256::digest(content.as_bytes());
            let fname = store.save_blob(
                content.as_bytes(),
                "blob",
                "sha3-256",
                digest.as_slice(),
                now + one_week,
            )?;
            fnames.push(fname);
        }

        // Delete the odd-numbered blobs.
        store.blob_dir.remove_file(&fnames[1])?;
        store.blob_dir.remove_file(&fnames[3])?;
        store.blob_dir.remove_file(&fnames[5])?;
        store.blob_dir.remove_file(&fnames[7])?;

        let n_removed = {
            let tx = store.conn.transaction()?;
            let n = SqliteStore::remove_entries_for_vanished_blobs(&store.blob_dir, &tx)?;
            tx.commit()?;
            n
        };
        assert_eq!(n_removed, 4);

        // Make sure that it was the _odd-numbered_ ones that got deleted from the DB.
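        // (`COUNT_EXTDOC_BY_PATH`, defined with the other query constants
        // earlier in this file, counts the ExtDocs rows with a given filename.)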
        let (n_1,): (u32,) =
            store
                .conn
                .query_row(COUNT_EXTDOC_BY_PATH, params![&fnames[1]], |row| {
                    row.try_into()
                })?;
        let (n_2,): (u32,) =
            store
                .conn
                .query_row(COUNT_EXTDOC_BY_PATH, params![&fnames[2]], |row| {
                    row.try_into()
                })?;
        assert_eq!(n_1, 0);
        assert_eq!(n_2, 1);
        Ok(())
    }

    #[test]
    fn protocol_statuses() -> Result<()> {
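        // Cache one set of protocol recommendations, then overwrite it with a
        // newer one, checking the stored values each time.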
        let (_tmp_dir, mut store) = new_empty()?;

        let now = SystemTime::now();
        let hour = 1.hours();

        let valid_after = now;
        let protocols = serde_json::from_str(
            r#"{
            "client":{
                "required":"Link=5 LinkAuth=3",
                "recommended":"Link=1-5 LinkAuth=2-5"
            },
            "relay":{
                "required":"Wombat=20-22 Knish=25-27",
                "recommended":"Wombat=20-30 Knish=20-30"
            }
            }"#,
        )
        .unwrap();

        let v = store.cached_protocol_recommendations()?;
        assert!(v.is_none());

        store.update_protocol_recommendations(valid_after, &protocols)?;
        let v = store.cached_protocol_recommendations()?.unwrap();
        assert_eq!(v.0, now);
        assert_eq!(
            serde_json::to_string(&protocols).unwrap(),
            serde_json::to_string(&v.1).unwrap()
        );

        let protocols2 = serde_json::from_str(
            r#"{
            "client":{
                "required":"Link=5 ",
                "recommended":"Link=1-5"
            },
            "relay":{
                "required":"Wombat=20",
                "recommended":"Cons=6"
            }
            }"#,
        )
        .unwrap();

        let valid_after_2 = now + hour;
        store.update_protocol_recommendations(valid_after_2, &protocols2)?;

        let v = store.cached_protocol_recommendations()?.unwrap();
        assert_eq!(v.0, now + hour);
        assert_eq!(
            serde_json::to_string(&protocols2).unwrap(),
            serde_json::to_string(&v.1).unwrap()
        );

        Ok(())
    }
}