// tor_config/file_watcher.rs

//! Code to watch configuration files for any changes.
//!
// TODO: perhaps this shouldn't live in tor-config? But it doesn't seem substantial enough to have
// its own crate, and it can't live in e.g. tor-basic-utils, because it depends on tor-rtcompat.

6use std::collections::hash_map::Entry;
7use std::collections::{HashMap, HashSet};
8use std::io;
9use std::marker::PhantomData;
10use std::path::{Path, PathBuf};
11use std::pin::Pin;
12use std::sync::{Arc, Mutex};
13use std::task::{Context, Poll};
14
15use tor_rtcompat::Runtime;
16
17use amplify::Getters;
18use notify::{EventKind, Watcher};
19use postage::watch;
20
21use futures::{Stream, StreamExt as _};
22
23/// `Result` whose `Err` is [`FileWatcherBuildError`].
24pub type Result<T> = std::result::Result<T, FileWatcherBuildError>;
25
26cfg_if::cfg_if! {
27    if #[cfg(any(target_os = "linux", target_os = "android", target_os = "windows"))] {
28        /// The concrete type of the underlying watcher.
29        type NotifyWatcher = notify::RecommendedWatcher;
30    } else {
31        /// The concrete type of the underlying watcher.
32        type NotifyWatcher = notify::PollWatcher;
33    }
34}
35
36/// A wrapper around a `notify::Watcher` to watch a set of parent
37/// directories in order to learn about changes in some specific files that they
38/// contain.
39///
40/// The `Watcher` implementation in `notify` has a weakness: it gives sensible
41/// results when you're watching directories, but if you start watching
42/// non-directory files, it won't notice when those files get replaced.  That's
43/// a problem for users who want to change their configuration atomically by
44/// making new files and then moving them into place over the old ones.
45///
46/// For more background on the issues with `notify`, see
47/// <https://github.com/notify-rs/notify/issues/165> and
48/// <https://github.com/notify-rs/notify/pull/166>.
49///
50/// ## Limitations
51///
52/// On backends using kqueue, this uses a polling watcher
53/// to work around a bug in the `notify` crate[^1].
54/// This introduces a perceivable delay,
55/// and can be very expensive for large file trees.
56///
57/// [^1]: See <https://github.com/notify-rs/notify/issues/644>
58#[derive(Getters)]
59pub struct FileWatcher {
60    /// An underlying `notify` watcher that tells us about directory changes.
61    // this field is kept only so the watcher is not dropped
62    #[getter(skip)]
63    _watcher: NotifyWatcher,
64    /// The list of directories that we're currently watching.
65    watching_dirs: HashSet<PathBuf>,
66}
67
68impl FileWatcher {
69    /// Create a `FileWatcherBuilder`
70    pub fn builder<R: Runtime>(runtime: R) -> FileWatcherBuilder<R> {
71        FileWatcherBuilder::new(runtime)
72    }
73}
74
/// Event possibly triggering a configuration reload
//
// WARNING!
//
// Simply adding new, more specific, events, to this enum, would be wrong.
// This is because internally, we transmit the events via a postage::watch,
// which means that receivers might not receive all events!
// (Each new event overwrites the value held by the watch channel, so a more
// specific event could be replaced by another before a receiver reads it.)
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum Event {
    /// Some files may have been modified.
    ///
    /// This is semantically equivalent to `Rescan`, since in neither case
    /// do we say *which* files may have been changed.
    FileChanged,
    /// Some filesystem events may have been missed.
    ///
    /// This is also the initial value held by a channel created with [`channel`].
    Rescan,
}

/// Builder used to configure a [`FileWatcher`] before it starts watching for changes.
///
/// Register paths with [`watch_path`](FileWatcherBuilder::watch_path) and
/// [`watch_dir`](FileWatcherBuilder::watch_dir), then call
/// [`start_watching`](FileWatcherBuilder::start_watching).
pub struct FileWatcherBuilder<R: Runtime> {
    /// The runtime.  We used to use this.
    ///
    /// TODO get rid of this, but after we decide whether to keep using postage::watch.
    /// See the Warning note on Event.
    // Never read: only the type parameter is recorded.
    #[allow(dead_code)]
    runtime: PhantomData<R>,
    /// The directories that will be watched once `start_watching` is called.
    ///
    /// Each directory has a set of filters that indicates whether a given notify::Event
    /// is relevant or not: an event path is relevant if any filter registered for its
    /// parent directory accepts it.
    watching_dirs: HashMap<PathBuf, HashSet<DirEventFilter>>,
}

/// A filter for deciding what to do with a notify::Event pertaining
/// to files that are relative to one of the directories we are watching.
///
// Private, as this is an implementation detail.
// If/when we decide to make this public, this might need revisiting.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
enum DirEventFilter {
    /// Notify the caller about the event, if the file has the specified extension.
    MatchesExtension(String),
    /// Notify the caller about the event, if the file has the specified path.
    MatchesPath(PathBuf),
}

impl DirEventFilter {
    /// Return `true` if this filter considers `path` interesting.
    ///
    /// * `MatchesExtension(wanted)`: the extension of `path` (as returned by
    ///   [`Path::extension`]) must be valid UTF-8 and exactly equal to `wanted`;
    ///   the comparison is case-sensitive.
    /// * `MatchesPath(expected)`: `path` must compare equal to `expected`
    ///   (component-wise, as `Path`'s `PartialEq` does).
    fn accepts_path(&self, path: &Path) -> bool {
        match self {
            Self::MatchesExtension(wanted) => {
                // A missing or non-UTF-8 extension yields `None`, which never matches.
                let actual = path.extension().and_then(|os| os.to_str());
                actual == Some(wanted.as_str())
            }
            Self::MatchesPath(expected) => expected.as_path() == path,
        }
    }
}

136impl<R: Runtime> FileWatcherBuilder<R> {
137    /// Create a `FileWatcherBuilder`
138    pub fn new(_runtime: R) -> Self {
139        FileWatcherBuilder {
140            runtime: PhantomData,
141            watching_dirs: HashMap::new(),
142        }
143    }
144
145    /// Add a single path to the list of things to watch.
146    ///
147    /// The event receiver will be notified if the path is created, modified, renamed, or removed.
148    ///
149    /// If the path is a directory, its contents will **not** be watched.
150    /// To watch the contents of a directory, use [`watch_dir`](FileWatcherBuilder::watch_dir).
151    ///
152    /// Idempotent: does nothing if we're already watching that path.
153    pub fn watch_path<P: AsRef<Path>>(&mut self, path: P) -> Result<()> {
154        self.watch_just_parents(path.as_ref())?;
155        Ok(())
156    }
157
158    /// Add a directory (but not any subdirs) to the list of things to watch.
159    ///
160    /// The event receiver will be notified whenever a file with the specified `extension`
161    /// is created within this directory, or if an existing file with this extension
162    /// is modified, renamed, or removed.
163    /// Changes to files that have a different extension are ignored.
164    ///
165    /// Idempotent.
166    pub fn watch_dir<P: AsRef<Path>, S: AsRef<str>>(
167        &mut self,
168        path: P,
169        extension: S,
170    ) -> Result<()> {
171        let path = self.watch_just_parents(path.as_ref())?;
172        self.watch_just_abs_dir(
173            &path,
174            DirEventFilter::MatchesExtension(extension.as_ref().into()),
175        );
176        Ok(())
177    }
178
179    /// Add the parents of `path` to the list of things to watch.
180    ///
181    /// Returns the absolute path of `path`.
182    ///
183    /// Idempotent.
184    fn watch_just_parents(&mut self, path: &Path) -> Result<PathBuf> {
185        // Make the path absolute (without necessarily making it canonical).
186        //
187        // We do this because `notify` reports all of its events in terms of
188        // absolute paths, so if we were to tell it to watch a directory by its
189        // relative path, we'd get reports about the absolute paths of the files
190        // in that directory.
191        let cwd = std::env::current_dir()
192            .map_err(|e| FileWatcherBuildError::CurrentDirectory(Arc::new(e)))?;
193        let path = cwd.join(path);
194        debug_assert!(path.is_absolute());
195
196        // See what directory we should watch in order to watch this file.
197        let watch_target = match path.parent() {
198            // The file has a parent, so watch that.
199            Some(parent) => parent,
200            // The file has no parent.  Given that it's absolute, that means
201            // that we're looking at the root directory.  There's nowhere to go
202            // "up" from there.
203            None => path.as_ref(),
204        };
205
206        // Note this file as one that we're watching, so that we can see changes
207        // to it later on.
208        self.watch_just_abs_dir(watch_target, DirEventFilter::MatchesPath(path.clone()));
209
210        Ok(path)
211    }
212
213    /// Add just this (absolute) directory to the list of things to watch.
214    ///
215    /// Does not watch any of the parents.
216    ///
217    /// Idempotent.
218    fn watch_just_abs_dir(&mut self, watch_target: &Path, filter: DirEventFilter) {
219        match self.watching_dirs.entry(watch_target.to_path_buf()) {
220            Entry::Occupied(mut o) => {
221                let _: bool = o.get_mut().insert(filter);
222            }
223            Entry::Vacant(v) => {
224                let _ = v.insert(HashSet::from([filter]));
225            }
226        }
227    }
228
229    /// Build a `FileWatcher` and start sending events to `tx`.
230    ///
231    /// On startup, the watcher sends a [`Rescan`](Event::Rescan) event.
232    /// This helps mitigate the event loss that occurs if the watched files are modified between
233    /// the time they are initially loaded and the time when the watcher is set up.
234    pub fn start_watching(self, tx: FileEventSender) -> Result<FileWatcher> {
235        let watching_dirs = self.watching_dirs.clone();
236        let event_sender = move |event: notify::Result<notify::Event>| {
237            let event = handle_event(event, &watching_dirs);
238            if let Some(event) = event {
239                // NB!  This can lose events!  See the internal warning comment on `Event`
240                *tx.0.lock().expect("poisoned").borrow_mut() = event;
241            }
242        };
243
244        cfg_if::cfg_if! {
245            if #[cfg(any(target_os = "linux", target_os = "android", target_os = "windows"))] {
246                let config = notify::Config::default();
247            } else {
248                /// The polling frequency, for use with the `PollWatcher`.
249                #[cfg(not(any(test, feature = "testing")))]
250                const WATCHER_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_secs(5);
251
252                #[cfg(any(test, feature = "testing"))]
253                const WATCHER_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10);
254
255                let config = notify::Config::default()
256                    .with_poll_interval(WATCHER_POLL_INTERVAL);
257
258                // When testing, compare the contents of the files too, not just their mtime
259                // Otherwise, because the polling backend detects changes based on mtime,
260                // if the test creates/writes files too fast,
261                // it will fail to notice changes (this can happen, for example, on a tmpfs).
262                #[cfg(any(test, feature = "testing"))]
263                let config = config.with_compare_contents(true);
264            }
265        }
266
267        let mut watcher = NotifyWatcher::new(event_sender, config).map_err(Arc::new)?;
268
269        let watching_dirs: HashSet<_> = self.watching_dirs.keys().cloned().collect();
270        for dir in &watching_dirs {
271            watcher
272                .watch(dir, notify::RecursiveMode::NonRecursive)
273                .map_err(Arc::new)?;
274        }
275
276        Ok(FileWatcher {
277            _watcher: watcher,
278            watching_dirs,
279        })
280    }
281}
282
283/// Map a `notify` event to the [`Event`] type returned by [`FileWatcher`].
284fn handle_event(
285    event: notify::Result<notify::Event>,
286    watching_dirs: &HashMap<PathBuf, HashSet<DirEventFilter>>,
287) -> Option<Event> {
288    let watching = |f: &PathBuf| {
289        // For paths with no parent (i.e. root), the watcher is added for the path itself,
290        // so we do the same here.
291        let parent = f.parent().unwrap_or_else(|| f.as_ref());
292
293        // Find the filters that apply to this directory
294        match watching_dirs
295            .iter()
296            .find_map(|(dir, filters)| (dir == parent).then_some(filters))
297        {
298            Some(filters) => {
299                // This event is interesting, if any of the filters apply.
300                filters.iter().any(|filter| filter.accepts_path(f.as_ref()))
301            }
302            None => false,
303        }
304    };
305
306    // filter events we don't want and map to event code
307    match event {
308        Ok(event) => {
309            if event.need_rescan() {
310                Some(Event::Rescan)
311            } else if ignore_event_kind(&event.kind) {
312                None
313            } else if event.paths.iter().any(watching) {
314                Some(Event::FileChanged)
315            } else {
316                None
317            }
318        }
319        Err(error) => {
320            if error.paths.iter().any(watching) {
321                Some(Event::FileChanged)
322            } else {
323                None
324            }
325        }
326    }
327}
328
329/// Check whether this is a kind of [`notify::Event`] that we want to ignore.
330///
331/// Returns `true` for
332///   * events that trigger on non-mutating file accesses
333///   * catch-all events (used by `notify` for unsupported/unknown events)
334///   * "other" meta-events
335fn ignore_event_kind(kind: &EventKind) -> bool {
336    use EventKind::*;
337    matches!(kind, Access(_) | Any | Other)
338}
339
/// The sender half of a watch channel used by a [`FileWatcher`] for sending [`Event`]s.
///
/// For use with [`FileWatcherBuilder::start_watching`].
///
/// **Important**: to avoid contention, avoid sharing clones of the same `FileEventSender`
/// with multiple [`FileWatcherBuilder`]s. This type is [`Clone`] to support creating new
/// [`FileWatcher`]s from an existing [`channel`], which enables existing receivers to receive
/// events from new `FileWatcher`s (any old `FileWatcher`s are supposed to be discarded).
//
// Clones share one underlying sender through the `Arc`; the `Mutex` is locked
// every time a `FileWatcher` publishes an event.
#[derive(Clone)]
pub struct FileEventSender(Arc<Mutex<watch::Sender<Event>>>);

/// The receiver half of a watch channel used for receiving [`Event`]s sent by a [`FileWatcher`].
///
/// Read it as a [`Stream`], or poll it without blocking via
/// [`try_recv`](FileEventReceiver::try_recv). Because the underlying channel only
/// holds the most recent event, a slow reader may observe fewer events than were sent.
#[derive(Clone)]
pub struct FileEventReceiver(watch::Receiver<Event>);

355impl Stream for FileEventReceiver {
356    type Item = Event;
357
358    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
359        self.0.poll_next_unpin(cx)
360    }
361}
362
363impl FileEventReceiver {
364    /// Try to read a message from the stream, without blocking.
365    ///
366    /// Returns `Some` if a message is ready.
367    /// Returns `None` if the stream is open, but no messages are available,
368    /// or if the stream is closed.
369    pub fn try_recv(&mut self) -> Option<Event> {
370        use postage::prelude::Stream;
371
372        self.0.try_recv().ok()
373    }
374}
375
376/// Create a new channel for use with a [`FileWatcher`].
377//
378// Note: the [`FileEventSender`] and [`FileEventReceiver`]  wrappers exist
379// so we don't expose the channel's underlying type
380// in our public API.
381pub fn channel() -> (FileEventSender, FileEventReceiver) {
382    let (tx, rx) = watch::channel_with(Event::Rescan);
383    (
384        FileEventSender(Arc::new(Mutex::new(tx))),
385        FileEventReceiver(rx),
386    )
387}
388
/// An error coming from a [`FileWatcherBuilder`].
///
/// The underlying errors are wrapped in [`Arc`] so that this type can derive `Clone`.
#[derive(Debug, Clone, thiserror::Error)]
#[non_exhaustive]
pub enum FileWatcherBuildError {
    /// Invalid current working directory.
    ///
    /// This error can happen if the current directory does not exist,
    /// or if we don't have the necessary permissions to access it.
    #[error("Invalid current working directory")]
    CurrentDirectory(#[source] Arc<io::Error>),

    /// Encountered a problem while creating a `Watcher`,
    /// or while asking it to watch a directory.
    #[error("Problem creating Watcher")]
    Notify(#[from] Arc<notify::Error>),
}

#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use super::*;
    use notify::event::{AccessKind, ModifyKind};
    use test_temp_dir::{test_temp_dir, TestTempDir};

    /// Write `data` to file `name` within `dir`.
    ///
    /// Returns the path of the written file; panics if the write fails.
    fn write_file(dir: &TestTempDir, name: &str, data: &[u8]) -> PathBuf {
        let path = dir.as_path_untracked().join(name);
        std::fs::write(&path, data).unwrap();
        path
    }

    /// Return an event that has the Rescan flag set
    /// (with kind `Any` and no paths).
    fn rescan_event() -> notify::Event {
        let event = notify::Event::new(notify::EventKind::Any);
        event.set_flag(notify::event::Flag::Rescan)
    }

    /// Assert that at least one FileChanged event is received.
    ///
    /// Waits for the next event, then drains every event that is already
    /// available without blocking, checking that each is also `FileChanged`.
    async fn assert_file_changed(rx: &mut FileEventReceiver) {
        assert_eq!(rx.next().await, Some(Event::FileChanged));

        // The write might trigger more than one event
        while let Some(ev) = rx.try_recv() {
            assert_eq!(ev, Event::FileChanged);
        }
    }

    /// Set the `EventKind` of `event` to an uninteresting `EventKind`
    /// and assert that it is ignored by `handle_event`.
    fn assert_ignored(event: &notify::Event, watching: &HashMap<PathBuf, HashSet<DirEventFilter>>) {
        for kind in [EventKind::Access(AccessKind::Any), EventKind::Other] {
            let ignored_event = event.clone().set_kind(kind);
            assert_eq!(handle_event(Ok(ignored_event.clone()), watching), None);
            // ...but if the rescan flag is set, the event is *not* ignored
            let event = ignored_event.set_flag(notify::event::Flag::Rescan);
            assert_eq!(handle_event(Ok(event), watching), Some(Event::Rescan));
        }
    }

    /// Exercise `handle_event` directly, without a real filesystem watcher.
    #[test]
    fn notify_event_handler() {
        let mut event = notify::Event::new(notify::EventKind::Modify(ModifyKind::Any));

        // With nothing watched, only the rescan flag produces an event.
        let mut watching_dirs = Default::default();
        assert_eq!(handle_event(Ok(event.clone()), &watching_dirs), None);
        assert_eq!(
            handle_event(Ok(rescan_event()), &watching_dirs),
            Some(Event::Rescan)
        );

        // Watch some directories
        watching_dirs.insert(
            "/foo/baz".into(),
            HashSet::from([DirEventFilter::MatchesExtension("auth".into())]),
        );
        assert_eq!(handle_event(Ok(event.clone()), &watching_dirs), None);
        assert_eq!(
            handle_event(Ok(rescan_event()), &watching_dirs),
            Some(Event::Rescan)
        );

        // Paths in /foo/bar are ignored, since only /foo/baz is watched so far.
        event = event.add_path("/foo/bar/alice.authh".into());
        assert_eq!(handle_event(Ok(event.clone()), &watching_dirs), None);

        event = event.add_path("/foo/bar/alice.auth".into());
        assert_eq!(handle_event(Ok(event.clone()), &watching_dirs), None);

        // A single matching path is enough for the whole event to count.
        event = event.add_path("/foo/baz/bob.auth".into());
        assert_eq!(
            handle_event(Ok(event.clone()), &watching_dirs),
            Some(Event::FileChanged)
        );

        // The same event, but with an irrelevant kind, gets ignored:
        assert_ignored(&event, &watching_dirs);

        // Watch some files within /foo/bar
        watching_dirs.insert(
            "/foo/bar".into(),
            HashSet::from([DirEventFilter::MatchesPath("/foo/bar/abc".into())]),
        );

        assert_eq!(
            handle_event(Ok(event.clone()), &watching_dirs),
            Some(Event::FileChanged)
        );
        assert_eq!(
            handle_event(Ok(rescan_event()), &watching_dirs),
            Some(Event::Rescan)
        );

        // The same event, but with an irrelevant kind, gets ignored:
        assert_ignored(&event, &watching_dirs);

        // Watch some other files
        // (the trailing slash on the watched directory must not prevent a match).
        let event = notify::Event::new(notify::EventKind::Modify(ModifyKind::Any))
            .add_path("/a/b/c/d".into());
        let watching_dirs = [(
            "/a/b/c/".into(),
            HashSet::from([DirEventFilter::MatchesPath("/a/b/c/d".into())]),
        )]
        .into_iter()
        .collect();
        assert_eq!(
            handle_event(Ok(event), &watching_dirs),
            Some(Event::FileChanged)
        );
        assert_eq!(
            handle_event(Ok(rescan_event()), &watching_dirs),
            Some(Event::Rescan)
        );

        // Errors can also trigger an event
        // (but only if they name a watched path).
        let err = notify::Error::path_not_found();
        assert_eq!(handle_event(Err(err), &watching_dirs), None);
        let mut err = notify::Error::path_not_found();
        err = err.add_path("/a/b/c/d".into());
        assert_eq!(
            handle_event(Err(err), &watching_dirs),
            Some(Event::FileChanged)
        );
    }

    /// Watching a directory reports new files with the requested extension.
    #[test]
    fn watch_dirs() {
        tor_rtcompat::test_with_one_runtime!(|rt| async move {
            let temp_dir = test_temp_dir!();
            let (tx, mut rx) = channel();
            // Watch for changes in .foo files from temp_dir
            let mut builder = FileWatcher::builder(rt.clone());
            builder
                .watch_dir(temp_dir.as_path_untracked(), "foo")
                .unwrap();
            let watcher = builder.start_watching(tx).unwrap();

            // On startup, the watcher sends a Event::Rescan event.
            // This is because the watcher is often set up after loading
            // the files or directories it is watching.
            assert_eq!(rx.try_recv(), Some(Event::Rescan));
            assert_eq!(rx.try_recv(), None);

            // Write a file with extension "foo".
            write_file(&temp_dir, "bar.foo", b"hello");

            assert_eq!(rx.next().await, Some(Event::FileChanged));

            // The sender was moved into the watcher's callback; this loop relies on
            // the stream ending once the watcher has been dropped.
            drop(watcher);
            // The write might trigger more than one event
            while let Some(ev) = rx.next().await {
                assert_eq!(ev.clone(), Event::FileChanged);
            }
        });
    }

    /// Watching a single file reports writes, removal, and atomic replacement.
    #[test]
    fn watch_file_path() {
        tor_rtcompat::test_with_one_runtime!(|rt| async move {
            let temp_dir = test_temp_dir!();
            let (tx, mut rx) = channel();
            // Watch for changes to hello.txt
            let path = write_file(&temp_dir, "hello.txt", b"hello");
            let mut builder = FileWatcher::builder(rt.clone());
            builder.watch_path(&path).unwrap();
            let _watcher = builder.start_watching(tx).unwrap();

            // On startup, the watcher sends a Event::Rescan event.
            assert_eq!(rx.try_recv(), Some(Event::Rescan));
            assert_eq!(rx.try_recv(), None);

            // Write to hello.txt
            let _: PathBuf = write_file(&temp_dir, "hello.txt", b"good-bye");

            assert_file_changed(&mut rx).await;

            // Remove hello.txt
            std::fs::remove_file(&path).unwrap();
            assert_file_changed(&mut rx).await;

            // Create a new file
            let tmp_hello = write_file(&temp_dir, "hello.tmp", b"new hello");
            // Copy it over to the watched hello.txt location
            std::fs::rename(&tmp_hello, &path).unwrap();
            assert_file_changed(&mut rx).await;
        });
    }

    /// Watching a directory *path* reports moves of the directory itself,
    /// but not changes to its contents.
    #[test]
    fn watch_dir_path() {
        tor_rtcompat::test_with_one_runtime!(|rt| async move {
            let temp_dir1 = tempfile::TempDir::new().unwrap();
            let (tx, mut rx) = channel();
            // Watch temp_dir for changes
            let mut builder = FileWatcher::builder(rt.clone());
            builder.watch_path(temp_dir1.path()).unwrap();

            let _watcher = builder.start_watching(tx).unwrap();

            // On startup, the watcher sends a Event::Rescan event.
            assert_eq!(rx.try_recv(), Some(Event::Rescan));
            assert_eq!(rx.try_recv(), None);

            // Writing a file to this directory shouldn't trigger an event
            // NOTE(review): this check runs immediately after the write, so it
            // cannot detect an event that is delivered slightly later.
            std::fs::write(temp_dir1.path().join("hello.txt"), b"hello").unwrap();
            assert_eq!(rx.try_recv(), None);

            // Move temp_dir1 to temp_dir2
            let temp_dir2 = tempfile::TempDir::new().unwrap();
            std::fs::rename(&temp_dir1, &temp_dir2).unwrap();

            // Moving the directory triggers an event...
            assert_file_changed(&mut rx).await;
            // ...and so does moving it back to its original location
            std::fs::rename(&temp_dir2, &temp_dir1).unwrap();
            assert_file_changed(&mut rx).await;
        });
    }
}