arti/
reload_cfg.rs

1//! Code to watch configuration files for any changes.
2
3use std::sync::Weak;
4use std::time::Duration;
5
6use anyhow::Context;
7use arti_client::config::Reconfigure;
8use arti_client::TorClient;
9use futures::{select_biased, FutureExt as _, Stream};
10use tor_config::file_watcher::{self, FileWatcherBuilder, FileEventSender, FileWatcher};
11use tor_config::{sources::FoundConfigFiles, ConfigurationSource, ConfigurationSources};
12use tor_rtcompat::Runtime;
13use tracing::{debug, error, info, warn};
14use futures::task::SpawnExt;
15use futures::StreamExt;
16
17#[cfg(target_family = "unix")]
18use crate::process::sighup_stream;
19
20#[cfg(not(target_family = "unix"))]
21use futures::stream;
22
23use crate::{ArtiCombinedConfig, ArtiConfig};
24
/// Delay between receiving a file-watcher event and acting on it.
///
/// Passed to `run_watcher` as its debounce interval.
const DEBOUNCE_INTERVAL: Duration = Duration::from_millis(1_000);
27
28/// An object that can be reconfigured when our configuration changes.
29///
30/// We use this trait so that we can represent abstract modules in our
31/// application, and pass the configuration to each of them.
32//
33// TODO: It is very likely we will want to refactor this even further once we
34// have a notion of what our modules truly are.
35#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
36pub(crate) trait ReconfigurableModule: Send + Sync {
37    /// Try to reconfigure this module according to a newly loaded configuration.
38    ///
39    /// By convention, this should only return fatal errors; any such error
40    /// should cause the program to exit.  For other cases, we should just warn.
41    //
42    // TODO: This should probably take "how: Reconfigure" as an argument, and
43    // pass it down as appropriate. See issue #1156.
44    fn reconfigure(&self, new: &ArtiCombinedConfig) -> anyhow::Result<()>;
45}
46
47/// Launch a thread to reload our configuration files.
48///
49/// If current configuration requires it, watch for changes in `sources`
50/// and try to reload our configuration. On unix platforms, also watch
51/// for SIGHUP and reload configuration then.
52///
53/// The modules are `Weak` references to prevent this background task
54/// from keeping them alive.
55///
56/// See the [`FileWatcher`](FileWatcher#Limitations) docs for limitations.
57#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
58pub(crate) fn watch_for_config_changes<R: Runtime>(
59    runtime: &R,
60    sources: ConfigurationSources,
61    config: &ArtiConfig,
62    modules: Vec<Weak<dyn ReconfigurableModule>>,
63) -> anyhow::Result<()> {
64    let watch_file = config.application().watch_configuration;
65
66    cfg_if::cfg_if! {
67        if #[cfg(target_family = "unix")] {
68            let sighup_stream = sighup_stream()?;
69        } else {
70            let sighup_stream = stream::pending();
71        }
72    }
73
74    let rt = runtime.clone();
75    let () = runtime.clone().spawn(async move {
76        let res: anyhow::Result<()> = run_watcher(
77            rt,
78            sources,
79            modules,
80            watch_file,
81            sighup_stream,
82            Some(DEBOUNCE_INTERVAL)
83        ).await;
84
85        match res {
86            Ok(()) => debug!("Config watcher task exiting"),
87            // TODO: warn_report does not work on anyhow::Error.
88            Err(e) => error!("Config watcher task exiting: {}", tor_error::Report(e)),
89        }
90    }).context("failed to spawn task")?;
91
92    Ok(())
93}
94
95/// Start watching for configuration changes.
96///
97/// Spawned from `watch_for_config_changes`.
98#[allow(clippy::cognitive_complexity)] // TODO: Refactor? Partly due to tracing.
99async fn run_watcher<R: Runtime>(
100    runtime: R,
101    sources: ConfigurationSources,
102    modules: Vec<Weak<dyn ReconfigurableModule>>,
103    watch_file: bool,
104    mut sighup_stream: impl Stream<Item = ()> + Unpin,
105    debounce_interval: Option<Duration>,
106) -> anyhow::Result<()> {
107    let (tx, mut rx) = file_watcher::channel();
108    let mut watcher = if watch_file {
109        let mut watcher = FileWatcher::builder(runtime.clone());
110        prepare(&mut watcher, &sources)?;
111        Some(watcher.start_watching(tx.clone())?)
112    } else {
113        None
114    };
115
116    debug!("Entering FS event loop");
117
118    loop {
119        select_biased! {
120            event = sighup_stream.next().fuse() => {
121                let Some(()) = event else {
122                    break;
123                };
124
125                info!("Received SIGHUP");
126
127                watcher = reload_configuration(
128                    runtime.clone(),
129                    watcher,
130                    &sources,
131                    &modules,
132                    tx.clone()
133                ).await?;
134            },
135            event = rx.next().fuse() => {
136                if let Some(debounce_interval) = debounce_interval {
137                    runtime.sleep(debounce_interval).await;
138                }
139
140                while let Some(_ignore) = rx.try_recv() {
141                    // Discard other events, so that we only reload once.
142                    //
143                    // We can afford to treat both error cases from try_recv [Empty
144                    // and Disconnected] as meaning that we've discarded other
145                    // events: if we're disconnected, we'll notice it when we next
146                    // call recv() in the outer loop.
147                }
148                debug!("Config reload event {:?}: reloading configuration.", event);
149                watcher = reload_configuration(
150                    runtime.clone(),
151                    watcher,
152                    &sources,
153                    &modules,
154                    tx.clone()
155                ).await?;
156            },
157        }
158    }
159
160    Ok(())
161}
162
163/// Reload the configuration.
164#[allow(clippy::cognitive_complexity)] // TODO: Refactor? Partly due to tracing.
165async fn reload_configuration<R: Runtime>(
166    runtime: R,
167    mut watcher: Option<FileWatcher>,
168    sources: &ConfigurationSources,
169    modules: &[Weak<dyn ReconfigurableModule>],
170    tx: FileEventSender,
171) -> anyhow::Result<Option<FileWatcher>> {
172
173    let found_files = if watcher.is_some() {
174        let mut new_watcher = FileWatcher::builder(runtime.clone());
175        let found_files = prepare(&mut new_watcher, sources)
176            .context("FS watch: failed to rescan config and re-establish watch")?;
177        let new_watcher = new_watcher
178            .start_watching(tx.clone())
179            .context("FS watch: failed to start watching config")?;
180        watcher = Some(new_watcher);
181        found_files
182    } else {
183        sources
184            .scan()
185            .context("FS watch: failed to rescan config")?
186    };
187
188    match reconfigure(found_files, modules) {
189        Ok(watch) => {
190            info!("Successfully reloaded configuration.");
191            if watch && watcher.is_none() {
192                info!("Starting watching over configuration.");
193                let mut new_watcher = FileWatcher::builder(runtime.clone());
194                let _found_files = prepare(&mut new_watcher, sources).context(
195                    "FS watch: failed to rescan config and re-establish watch: {}",
196                )?;
197                let new_watcher = new_watcher.start_watching(tx.clone()).context(
198                    "FS watch: failed to rescan config and re-establish watch: {}",
199                )?;
200                watcher = Some(new_watcher);
201            } else if !watch && watcher.is_some() {
202                info!("Stopped watching over configuration.");
203                watcher = None;
204            }
205        }
206        // TODO: warn_report does not work on anyhow::Error.
207        Err(e) => warn!("Couldn't reload configuration: {}", tor_error::Report(e)),
208    }
209
210    Ok(watcher)
211}
212
213impl<R: Runtime> ReconfigurableModule for TorClient<R> {
214    fn reconfigure(&self, new: &ArtiCombinedConfig) -> anyhow::Result<()> {
215        TorClient::reconfigure(self, &new.1, Reconfigure::WarnOnFailures)?;
216        Ok(())
217    }
218}
219
220/// Internal type to represent the Arti application as a `ReconfigurableModule`.
221pub(crate) struct Application {
222    /// The configuration that Arti had at startup.
223    ///
224    /// We use this to check whether the user is asking for any impermissible
225    /// transitions.
226    original_config: ArtiConfig,
227}
228
229impl Application {
230    /// Construct a new `Application` to receive configuration changes for the
231    /// arti application.
232    pub(crate) fn new(cfg: ArtiConfig) -> Self {
233        Self {
234            original_config: cfg,
235        }
236    }
237}
238
239impl ReconfigurableModule for Application {
240    // TODO: This should probably take "how: Reconfigure" as an argument, and
241    // pass it down as appropriate. See issue #1156.
242    #[allow(clippy::cognitive_complexity)]
243    fn reconfigure(&self, new: &ArtiCombinedConfig) -> anyhow::Result<()> {
244        let original = &self.original_config;
245        let config = &new.0;
246
247        if config.proxy() != original.proxy() {
248            warn!("Can't (yet) reconfigure proxy settings while arti is running.");
249        }
250        if config.logging() != original.logging() {
251            warn!("Can't (yet) reconfigure logging settings while arti is running.");
252        }
253        #[cfg(feature = "rpc")]
254        if config.rpc != original.rpc {
255            warn!("Can't (yet) change RPC settings while arti is running.");
256        }
257        if config.application().permit_debugging && !original.application().permit_debugging {
258            warn!("Cannot disable application hardening when it has already been enabled.");
259        }
260
261        // Note that this is the only config transition we actually perform so far.
262        if !config.application().permit_debugging {
263            #[cfg(feature = "harden")]
264            crate::process::enable_process_hardening()?;
265        }
266
267        Ok(())
268    }
269}
270
271/// Find the configuration files and prepare the watcher
272fn prepare<'a, R: Runtime>(
273    watcher: &mut FileWatcherBuilder<R>,
274    sources: &'a ConfigurationSources,
275) -> anyhow::Result<FoundConfigFiles<'a>> {
276    let sources = sources.scan()?;
277    for source in sources.iter() {
278        match source {
279            ConfigurationSource::Dir(dir) => watcher.watch_dir(dir, "toml")?,
280            ConfigurationSource::File(file) => watcher.watch_path(file)?,
281            ConfigurationSource::Verbatim(_) => {}
282        }
283    }
284    Ok(sources)
285}
286
287/// Reload the configuration files, apply the runtime configuration, and
288/// reconfigure the client as much as we can.
289///
290/// Return true if we should be watching for configuration changes.
291//
292// TODO: This should probably take "how: Reconfigure" as an argument, and
293// pass it down as appropriate. See issue #1156.
294fn reconfigure(
295    found_files: FoundConfigFiles<'_>,
296    reconfigurable: &[Weak<dyn ReconfigurableModule>],
297) -> anyhow::Result<bool> {
298    let _ = reconfigurable;
299    let config = found_files.load()?;
300    let config = tor_config::resolve::<ArtiCombinedConfig>(config)?;
301
302    // Filter out the modules that have been dropped
303    let reconfigurable = reconfigurable.iter().flat_map(Weak::upgrade);
304    // If there are no more modules, we should exit.
305    let mut has_modules = false;
306
307    for module in reconfigurable {
308        has_modules = true;
309        module.reconfigure(&config)?;
310    }
311
312    Ok(has_modules && config.0.application().watch_configuration)
313}
314
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use super::*;
    use crate::ArtiConfigBuilder;

    use futures::channel::mpsc;
    use futures::SinkExt as _;
    use postage::watch;
    use std::path::PathBuf;
    use std::sync::{Arc, Mutex};
    use test_temp_dir::{test_temp_dir, TestTempDir};
    use tor_async_utils::PostageWatchSenderExt;
    use tor_config::sources::MustRead;

    /// Filename for config1
    const CONFIG_NAME1: &str = "config1.toml";
    /// Filename for config2
    const CONFIG_NAME2: &str = "config2.toml";
    /// Filename for config3
    const CONFIG_NAME3: &str = "config3.toml";

    /// A module that passes every configuration it is given on to the test.
    struct TestModule {
        /// Sender used to hand the new config to the test function.
        tx: Arc<Mutex<watch::Sender<ArtiCombinedConfig>>>,
    }

    impl ReconfigurableModule for TestModule {
        fn reconfigure(&self, new: &ArtiCombinedConfig) -> anyhow::Result<()> {
            let received = new.clone();
            self.tx.lock().unwrap().maybe_send(|_| received);

            Ok(())
        }
    }

    /// Build a reconfigurable module for use in tests.
    ///
    /// Returns the module, along with a channel on which the test can observe
    /// the configs that the module receives.
    async fn create_module(
    ) -> (Arc<dyn ReconfigurableModule>, watch::Receiver<ArtiCombinedConfig>) {
        let (config_tx, mut config_rx) = watch::channel();
        // Consume the initial value from the postage::watch stream
        // (the first observed value on this test stream is always the default config)
        let _: ArtiCombinedConfig = config_rx.next().await.unwrap();

        let module = TestModule { tx: Arc::new(Mutex::new(config_tx)) };
        (Arc::new(module), config_rx)
    }

    /// Write `data` to the file called `name` inside `dir`, and return its path.
    fn write_file(dir: &TestTempDir, name: &str, data: &[u8]) -> PathBuf {
        let staging = dir.as_path_untracked().join("tmp");
        std::fs::write(&staging, data).unwrap();
        let target = dir.as_path_untracked().join(name);
        // Rename into place, so that the config file is written atomically
        std::fs::rename(staging, &target).unwrap();
        target
    }

    /// Serialize `config` as TOML into the file called `name` inside `dir`.
    fn write_config(dir: &TestTempDir, name: &str, config: &ArtiConfigBuilder) -> PathBuf {
        let serialized = toml::to_string(&config).unwrap();
        write_file(dir, name, serialized.as_bytes())
    }

    #[test]
    #[ignore] // TODO(#1607): Re-enable
    fn watch_single_file() {
        tor_rtcompat::test_with_one_runtime!(|rt| async move {
            let temp_dir = test_temp_dir!();
            let mut builder = ArtiConfigBuilder::default();
            builder.application().watch_configuration(true);

            let cfg_path = write_config(&temp_dir, CONFIG_NAME1, &builder);
            let mut cfg_sources = ConfigurationSources::new_empty();
            cfg_sources.push_source(ConfigurationSource::File(cfg_path), MustRead::MustRead);

            let (module, mut config_rx) = create_module().await;

            // A fake sighup stream lets us wait until run_watcher()'s
            // select_biased! loop has been entered
            let (mut sighup_tx, sighup_rx) = mpsc::unbounded();
            let watcher_rt = rt.clone();
            let () = rt.spawn(async move {
                run_watcher(
                    watcher_rt,
                    cfg_sources,
                    vec![Arc::downgrade(&module)],
                    true,
                    sighup_rx,
                    None,
                ).await.unwrap();
            }).unwrap();

            builder.logging().log_sensitive_information(true);
            let _: PathBuf = write_config(&temp_dir, CONFIG_NAME1, &builder);
            sighup_tx.send(()).await.unwrap();
            // The sighup should have caused the module to be reconfigured
            let seen = config_rx.next().await.unwrap();
            assert_eq!(seen.0, builder.build().unwrap());

            // Overwrite the config
            builder.logging().log_sensitive_information(false);
            let _: PathBuf = write_config(&temp_dir, CONFIG_NAME1, &builder);
            // The config change should have caused the module to be reconfigured
            let seen = config_rx.next().await.unwrap();
            assert_eq!(seen.0, builder.build().unwrap());
        });
    }

    #[test]
    fn watch_multiple() {
        tor_rtcompat::test_with_one_runtime!(|rt| async move {
            let temp_dir = test_temp_dir!();
            let mut first_builder = ArtiConfigBuilder::default();
            first_builder.application().watch_configuration(true);

            let _: PathBuf = write_config(&temp_dir, CONFIG_NAME1, &first_builder);
            let mut cfg_sources = ConfigurationSources::new_empty();
            cfg_sources.push_source(
                ConfigurationSource::Dir(temp_dir.as_path_untracked().to_path_buf()),
                MustRead::MustRead,
            );

            let (module, mut config_rx) = create_module().await;
            // A fake sighup stream lets us wait until run_watcher()'s
            // select_biased! loop has been entered
            let (mut sighup_tx, sighup_rx) = mpsc::unbounded();
            let watcher_rt = rt.clone();
            let () = rt.spawn(async move {
                run_watcher(
                    watcher_rt,
                    cfg_sources,
                    vec![Arc::downgrade(&module)],
                    true,
                    sighup_rx,
                    None,
                ).await.unwrap();
            }).unwrap();

            first_builder.logging().log_sensitive_information(true);
            let _: PathBuf = write_config(&temp_dir, CONFIG_NAME1, &first_builder);
            sighup_tx.send(()).await.unwrap();
            // The sighup should have caused the module to be reconfigured
            let seen = config_rx.next().await.unwrap();
            assert_eq!(seen.0, first_builder.build().unwrap());

            let mut second_builder = ArtiConfigBuilder::default();
            second_builder.application().watch_configuration(true);
            // Write a second config file...
            second_builder.system().max_files(0_u64);
            let _: PathBuf = write_config(&temp_dir, CONFIG_NAME2, &second_builder);
            // ...and check that the two config files get merged
            let mut merged = first_builder.clone();
            merged.system().max_files(0_u64);
            let seen = config_rx.next().await.unwrap();
            assert_eq!(seen.0, merged.build().unwrap());

            // Now add a third config file to the watched dir
            second_builder.logging().console("foo".to_string());
            let mut merged_with_console = merged.clone();
            merged_with_console.logging().console("foo".to_string());
            let third_path: PathBuf = write_config(&temp_dir, CONFIG_NAME3, &second_builder);
            let seen = config_rx.next().await.unwrap();
            assert_eq!(seen.0, merged_with_console.build().unwrap());

            // Removing the file should also trigger an event
            std::fs::remove_file(third_path).unwrap();
            let seen = config_rx.next().await.unwrap();
            assert_eq!(seen.0, merged.build().unwrap());
        });
    }
}