1use std::collections::hash_map::Entry;
7use std::collections::{HashMap, HashSet};
8use std::io;
9use std::marker::PhantomData;
10use std::path::{Path, PathBuf};
11use std::pin::Pin;
12use std::sync::{Arc, Mutex};
13use std::task::{Context, Poll};
14
15use tor_rtcompat::Runtime;
16
17use amplify::Getters;
18use notify::{EventKind, Watcher};
19use postage::watch;
20
21use futures::{Stream, StreamExt as _};
22
/// Convenience alias for results whose error type is [`FileWatcherBuildError`].
pub type Result<T> = std::result::Result<T, FileWatcherBuildError>;
25
cfg_if::cfg_if! {
    if #[cfg(any(target_os = "linux", target_os = "android", target_os = "windows"))] {
        /// The concrete `notify` watcher type used on Linux, Android and Windows:
        /// whatever `notify` recommends for the platform.
        type NotifyWatcher = notify::RecommendedWatcher;
    } else {
        /// The concrete `notify` watcher type used on every other target:
        /// the polling watcher.
        type NotifyWatcher = notify::PollWatcher;
    }
}
35
/// A handle on a running file watcher.
///
/// Values of this type are produced by [`FileWatcherBuilder::start_watching`].
#[derive(Getters)]
pub struct FileWatcher {
    /// The underlying `notify` watcher.
    ///
    /// Nothing in this module reads the field after construction (hence the
    /// leading underscore and the skipped getter); presumably it is held only
    /// so that the watcher stays alive as long as this handle does.
    #[getter(skip)]
    _watcher: NotifyWatcher,
    /// The directories that were registered with the underlying watcher.
    watching_dirs: HashSet<PathBuf>,
}
67
68impl FileWatcher {
69 pub fn builder<R: Runtime>(runtime: R) -> FileWatcherBuilder<R> {
71 FileWatcherBuilder::new(runtime)
72 }
73}
74
/// A notification delivered to the holder of a [`FileEventReceiver`].
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum Event {
    /// A path accepted by one of the registered filters was reported by the
    /// underlying watcher (either in an event or in an error).
    FileChanged,
    /// The underlying watcher reported that a rescan is needed.
    ///
    /// This is also the value a channel created with [`channel`] starts out with.
    Rescan,
}
93
/// Builder used to describe what to watch before starting a [`FileWatcher`].
pub struct FileWatcherBuilder<R: Runtime> {
    /// Marker recording the runtime type.
    ///
    /// The runtime value handed to `new` is not stored, and this field is
    /// never read, hence the `dead_code` allowance.
    #[allow(dead_code)]
    runtime: PhantomData<R>,
    /// For each directory that will be watched, the filters deciding which
    /// paths inside it are of interest.
    watching_dirs: HashMap<PathBuf, HashSet<DirEventFilter>>,
}
108
/// A rule deciding whether a path reported inside a watched directory is of interest.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
enum DirEventFilter {
    /// Accept any path whose extension is exactly this string.
    MatchesExtension(String),
    /// Accept only paths equal to this one.
    MatchesPath(PathBuf),
}

impl DirEventFilter {
    /// Return `true` if `path` satisfies this filter.
    ///
    /// For `MatchesExtension`, a path without an extension, or whose extension
    /// is not valid UTF-8, is rejected. The comparison is case-sensitive.
    fn accepts_path(&self, path: &Path) -> bool {
        match self {
            Self::MatchesPath(expected) => expected == path,
            Self::MatchesExtension(wanted) => {
                let actual = path.extension().and_then(|os| os.to_str());
                matches!(actual, Some(found) if found == wanted.as_str())
            }
        }
    }
}
135
136impl<R: Runtime> FileWatcherBuilder<R> {
137 pub fn new(_runtime: R) -> Self {
139 FileWatcherBuilder {
140 runtime: PhantomData,
141 watching_dirs: HashMap::new(),
142 }
143 }
144
145 pub fn watch_path<P: AsRef<Path>>(&mut self, path: P) -> Result<()> {
154 self.watch_just_parents(path.as_ref())?;
155 Ok(())
156 }
157
158 pub fn watch_dir<P: AsRef<Path>, S: AsRef<str>>(
167 &mut self,
168 path: P,
169 extension: S,
170 ) -> Result<()> {
171 let path = self.watch_just_parents(path.as_ref())?;
172 self.watch_just_abs_dir(
173 &path,
174 DirEventFilter::MatchesExtension(extension.as_ref().into()),
175 );
176 Ok(())
177 }
178
179 fn watch_just_parents(&mut self, path: &Path) -> Result<PathBuf> {
185 let cwd = std::env::current_dir()
192 .map_err(|e| FileWatcherBuildError::CurrentDirectory(Arc::new(e)))?;
193 let path = cwd.join(path);
194 debug_assert!(path.is_absolute());
195
196 let watch_target = match path.parent() {
198 Some(parent) => parent,
200 None => path.as_ref(),
204 };
205
206 self.watch_just_abs_dir(watch_target, DirEventFilter::MatchesPath(path.clone()));
209
210 Ok(path)
211 }
212
213 fn watch_just_abs_dir(&mut self, watch_target: &Path, filter: DirEventFilter) {
219 match self.watching_dirs.entry(watch_target.to_path_buf()) {
220 Entry::Occupied(mut o) => {
221 let _: bool = o.get_mut().insert(filter);
222 }
223 Entry::Vacant(v) => {
224 let _ = v.insert(HashSet::from([filter]));
225 }
226 }
227 }
228
229 pub fn start_watching(self, tx: FileEventSender) -> Result<FileWatcher> {
235 let watching_dirs = self.watching_dirs.clone();
236 let event_sender = move |event: notify::Result<notify::Event>| {
237 let event = handle_event(event, &watching_dirs);
238 if let Some(event) = event {
239 *tx.0.lock().expect("poisoned").borrow_mut() = event;
241 }
242 };
243
244 cfg_if::cfg_if! {
245 if #[cfg(any(target_os = "linux", target_os = "android", target_os = "windows"))] {
246 let config = notify::Config::default();
247 } else {
248 #[cfg(not(any(test, feature = "testing")))]
250 const WATCHER_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_secs(5);
251
252 #[cfg(any(test, feature = "testing"))]
253 const WATCHER_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10);
254
255 let config = notify::Config::default()
256 .with_poll_interval(WATCHER_POLL_INTERVAL);
257
258 #[cfg(any(test, feature = "testing"))]
263 let config = config.with_compare_contents(true);
264 }
265 }
266
267 let mut watcher = NotifyWatcher::new(event_sender, config).map_err(Arc::new)?;
268
269 let watching_dirs: HashSet<_> = self.watching_dirs.keys().cloned().collect();
270 for dir in &watching_dirs {
271 watcher
272 .watch(dir, notify::RecursiveMode::NonRecursive)
273 .map_err(Arc::new)?;
274 }
275
276 Ok(FileWatcher {
277 _watcher: watcher,
278 watching_dirs,
279 })
280 }
281}
282
283fn handle_event(
285 event: notify::Result<notify::Event>,
286 watching_dirs: &HashMap<PathBuf, HashSet<DirEventFilter>>,
287) -> Option<Event> {
288 let watching = |f: &PathBuf| {
289 let parent = f.parent().unwrap_or_else(|| f.as_ref());
292
293 match watching_dirs
295 .iter()
296 .find_map(|(dir, filters)| (dir == parent).then_some(filters))
297 {
298 Some(filters) => {
299 filters.iter().any(|filter| filter.accepts_path(f.as_ref()))
301 }
302 None => false,
303 }
304 };
305
306 match event {
308 Ok(event) => {
309 if event.need_rescan() {
310 Some(Event::Rescan)
311 } else if ignore_event_kind(&event.kind) {
312 None
313 } else if event.paths.iter().any(watching) {
314 Some(Event::FileChanged)
315 } else {
316 None
317 }
318 }
319 Err(error) => {
320 if error.paths.iter().any(watching) {
321 Some(Event::FileChanged)
322 } else {
323 None
324 }
325 }
326 }
327}
328
329fn ignore_event_kind(kind: &EventKind) -> bool {
336 use EventKind::*;
337 matches!(kind, Access(_) | Any | Other)
338}
339
/// The sending half of the channel created by [`channel`].
///
/// Wraps a `postage` watch sender in `Arc<Mutex<_>>`; cloning this type
/// therefore yields another handle on the same sender.
#[derive(Clone)]
pub struct FileEventSender(Arc<Mutex<watch::Sender<Event>>>);
350
/// The receiving half of the channel created by [`channel`].
///
/// Wraps a `postage` watch receiver carrying [`Event`]s.
#[derive(Clone)]
pub struct FileEventReceiver(watch::Receiver<Event>);
354
355impl Stream for FileEventReceiver {
356 type Item = Event;
357
358 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
359 self.0.poll_next_unpin(cx)
360 }
361}
362
363impl FileEventReceiver {
364 pub fn try_recv(&mut self) -> Option<Event> {
370 use postage::prelude::Stream;
371
372 self.0.try_recv().ok()
373 }
374}
375
376pub fn channel() -> (FileEventSender, FileEventReceiver) {
382 let (tx, rx) = watch::channel_with(Event::Rescan);
383 (
384 FileEventSender(Arc::new(Mutex::new(tx))),
385 FileEventReceiver(rx),
386 )
387}
388
/// An error encountered while configuring or starting a [`FileWatcher`].
#[derive(Debug, Clone, thiserror::Error)]
#[non_exhaustive]
pub enum FileWatcherBuildError {
    /// The current working directory could not be determined
    /// (it is used to turn the paths to watch into absolute paths).
    #[error("Invalid current working directory")]
    CurrentDirectory(#[source] Arc<io::Error>),

    /// The underlying `notify` watcher reported an error, either when it was
    /// created or when a directory was registered with it.
    #[error("Problem creating Watcher")]
    Notify(#[from] Arc<notify::Error>),
}
404
#[cfg(test)]
mod test {
    // Lints relaxed for test code.
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    use super::*;
    use notify::event::{AccessKind, ModifyKind};
    use test_temp_dir::{test_temp_dir, TestTempDir};

    /// Write `data` to the file called `name` inside `dir`, returning its path.
    fn write_file(dir: &TestTempDir, name: &str, data: &[u8]) -> PathBuf {
        let path = dir.as_path_untracked().join(name);
        std::fs::write(&path, data).unwrap();
        path
    }

    /// Return an event of kind `Any` with the `Rescan` flag set.
    fn rescan_event() -> notify::Event {
        let event = notify::Event::new(notify::EventKind::Any);
        event.set_flag(notify::event::Flag::Rescan)
    }

    /// Assert that at least one `FileChanged` event is received,
    /// and that anything else immediately available is `FileChanged` too.
    async fn assert_file_changed(rx: &mut FileEventReceiver) {
        assert_eq!(rx.next().await, Some(Event::FileChanged));

        // A single filesystem operation may be reported more than once.
        while let Some(ev) = rx.try_recv() {
            assert_eq!(ev, Event::FileChanged);
        }
    }

    /// Assert that `event` is dropped once its kind is changed to an ignored
    /// one, unless it also carries the `Rescan` flag.
    fn assert_ignored(event: &notify::Event, watching: &HashMap<PathBuf, HashSet<DirEventFilter>>) {
        for kind in [EventKind::Access(AccessKind::Any), EventKind::Other] {
            let ignored_event = event.clone().set_kind(kind);
            assert_eq!(handle_event(Ok(ignored_event.clone()), watching), None);
            // The rescan flag takes precedence over the event kind.
            let event = ignored_event.set_flag(notify::event::Flag::Rescan);
            assert_eq!(handle_event(Ok(event), watching), Some(Event::Rescan));
        }
    }

    #[test]
    fn notify_event_handler() {
        let mut event = notify::Event::new(notify::EventKind::Modify(ModifyKind::Any));

        // Nothing is being watched: only rescans get through.
        let mut watching_dirs = Default::default();
        assert_eq!(handle_event(Ok(event.clone()), &watching_dirs), None);
        assert_eq!(
            handle_event(Ok(rescan_event()), &watching_dirs),
            Some(Event::Rescan)
        );

        // Watch "*.auth" in /foo/baz; the event still has no paths.
        watching_dirs.insert(
            "/foo/baz".into(),
            HashSet::from([DirEventFilter::MatchesExtension("auth".into())]),
        );
        assert_eq!(handle_event(Ok(event.clone()), &watching_dirs), None);
        assert_eq!(
            handle_event(Ok(rescan_event()), &watching_dirs),
            Some(Event::Rescan)
        );

        // Wrong directory and wrong extension.
        event = event.add_path("/foo/bar/alice.authh".into());
        assert_eq!(handle_event(Ok(event.clone()), &watching_dirs), None);

        // Right extension, but still the wrong directory.
        event = event.add_path("/foo/bar/alice.auth".into());
        assert_eq!(handle_event(Ok(event.clone()), &watching_dirs), None);

        // One matching path is enough.
        event = event.add_path("/foo/baz/bob.auth".into());
        assert_eq!(
            handle_event(Ok(event.clone()), &watching_dirs),
            Some(Event::FileChanged)
        );

        assert_ignored(&event, &watching_dirs);

        // Also watch one specific path inside /foo/bar.
        watching_dirs.insert(
            "/foo/bar".into(),
            HashSet::from([DirEventFilter::MatchesPath("/foo/bar/abc".into())]),
        );

        assert_eq!(
            handle_event(Ok(event.clone()), &watching_dirs),
            Some(Event::FileChanged)
        );
        assert_eq!(
            handle_event(Ok(rescan_event()), &watching_dirs),
            Some(Event::Rescan)
        );

        assert_ignored(&event, &watching_dirs);

        // The watched directory is spelled with a trailing slash here.
        let event = notify::Event::new(notify::EventKind::Modify(ModifyKind::Any))
            .add_path("/a/b/c/d".into());
        let watching_dirs = [(
            "/a/b/c/".into(),
            HashSet::from([DirEventFilter::MatchesPath("/a/b/c/d".into())]),
        )]
        .into_iter()
        .collect();
        assert_eq!(
            handle_event(Ok(event), &watching_dirs),
            Some(Event::FileChanged)
        );
        assert_eq!(
            handle_event(Ok(rescan_event()), &watching_dirs),
            Some(Event::Rescan)
        );

        // Errors only count when they mention a watched path.
        let err = notify::Error::path_not_found();
        assert_eq!(handle_event(Err(err), &watching_dirs), None);
        let mut err = notify::Error::path_not_found();
        err = err.add_path("/a/b/c/d".into());
        assert_eq!(
            handle_event(Err(err), &watching_dirs),
            Some(Event::FileChanged)
        );
    }

    #[test]
    fn watch_dirs() {
        tor_rtcompat::test_with_one_runtime!(|rt| async move {
            let temp_dir = test_temp_dir!();
            let (tx, mut rx) = channel();
            let mut builder = FileWatcher::builder(rt.clone());
            builder
                .watch_dir(temp_dir.as_path_untracked(), "foo")
                .unwrap();
            let watcher = builder.start_watching(tx).unwrap();

            // The channel starts out holding `Rescan`, and nothing else.
            assert_eq!(rx.try_recv(), Some(Event::Rescan));
            assert_eq!(rx.try_recv(), None);

            // Create a file with the watched extension.
            write_file(&temp_dir, "bar.foo", b"hello");

            assert_eq!(rx.next().await, Some(Event::FileChanged));

            // Once the watcher is gone the stream must come to an end.
            drop(watcher);
            while let Some(ev) = rx.next().await {
                assert_eq!(ev.clone(), Event::FileChanged);
            }
        });
    }

    #[test]
    fn watch_file_path() {
        tor_rtcompat::test_with_one_runtime!(|rt| async move {
            let temp_dir = test_temp_dir!();
            let (tx, mut rx) = channel();
            let path = write_file(&temp_dir, "hello.txt", b"hello");
            let mut builder = FileWatcher::builder(rt.clone());
            builder.watch_path(&path).unwrap();
            let _watcher = builder.start_watching(tx).unwrap();

            // The channel starts out holding `Rescan`, and nothing else.
            assert_eq!(rx.try_recv(), Some(Event::Rescan));
            assert_eq!(rx.try_recv(), None);

            // Overwrite the watched file.
            let _: PathBuf = write_file(&temp_dir, "hello.txt", b"good-bye");

            assert_file_changed(&mut rx).await;

            // Remove the watched file.
            std::fs::remove_file(&path).unwrap();
            assert_file_changed(&mut rx).await;

            // Put a new file in its place by renaming.
            let tmp_hello = write_file(&temp_dir, "hello.tmp", b"new hello");
            std::fs::rename(&tmp_hello, &path).unwrap();
            assert_file_changed(&mut rx).await;
        });
    }

    #[test]
    fn watch_dir_path() {
        tor_rtcompat::test_with_one_runtime!(|rt| async move {
            let temp_dir1 = tempfile::TempDir::new().unwrap();
            let (tx, mut rx) = channel();
            let mut builder = FileWatcher::builder(rt.clone());
            // Watch the directory as a plain path, not its contents.
            builder.watch_path(temp_dir1.path()).unwrap();

            let _watcher = builder.start_watching(tx).unwrap();

            // The channel starts out holding `Rescan`, and nothing else.
            assert_eq!(rx.try_recv(), Some(Event::Rescan));
            assert_eq!(rx.try_recv(), None);

            // A file created inside the directory is not reported.
            std::fs::write(temp_dir1.path().join("hello.txt"), b"hello").unwrap();
            assert_eq!(rx.try_recv(), None);

            // Renaming the directory away is reported...
            let temp_dir2 = tempfile::TempDir::new().unwrap();
            std::fs::rename(&temp_dir1, &temp_dir2).unwrap();

            assert_file_changed(&mut rx).await;
            // ...and so is renaming it back.
            std::fs::rename(&temp_dir2, &temp_dir1).unwrap();
            assert_file_changed(&mut rx).await;
        });
    }
}