arti/
reload_cfg.rs

1//! Code to watch configuration files for any changes.
2
3use std::sync::Weak;
4use std::time::Duration;
5
6use anyhow::Context;
7use arti_client::config::Reconfigure;
8use arti_client::TorClient;
9use futures::{select_biased, FutureExt as _, Stream};
10use tor_config::file_watcher::{self, FileWatcherBuilder, FileEventSender, FileWatcher};
11use tor_config::{sources::FoundConfigFiles, ConfigurationSource, ConfigurationSources};
12use tor_rtcompat::Runtime;
13use tracing::{debug, error, info, warn};
14use futures::task::SpawnExt;
15use futures::StreamExt;
16
17#[cfg(target_family = "unix")]
18use crate::process::sighup_stream;
19
20#[cfg(not(target_family = "unix"))]
21use futures::stream;
22
23use crate::{ArtiCombinedConfig, ArtiConfig};
24
25/// How long to wait after an event got received, before we try to process it.
26const DEBOUNCE_INTERVAL: Duration = Duration::from_secs(1);
27
28/// An object that can be reconfigured when our configuration changes.
29///
30/// We use this trait so that we can represent abstract modules in our
31/// application, and pass the configuration to each of them.
32//
33// TODO: It is very likely we will want to refactor this even further once we
34// have a notion of what our modules truly are.
35#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
36pub(crate) trait ReconfigurableModule: Send + Sync {
37    /// Try to reconfigure this module according to a newly loaded configuration.
38    ///
39    /// By convention, this should only return fatal errors; any such error
40    /// should cause the program to exit.  For other cases, we should just warn.
41    //
42    // TODO: This should probably take "how: Reconfigure" as an argument, and
43    // pass it down as appropriate. See issue #1156.
44    fn reconfigure(&self, new: &ArtiCombinedConfig) -> anyhow::Result<()>;
45}
46
47/// Launch a thread to reload our configuration files.
48///
49/// If current configuration requires it, watch for changes in `sources`
50/// and try to reload our configuration. On unix platforms, also watch
51/// for SIGHUP and reload configuration then.
52///
53/// The modules are `Weak` references to prevent this background task
54/// from keeping them alive.
55///
56/// See the [`FileWatcher`](FileWatcher#Limitations) docs for limitations.
57#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
58pub(crate) fn watch_for_config_changes<R: Runtime>(
59    runtime: &R,
60    sources: ConfigurationSources,
61    config: &ArtiConfig,
62    modules: Vec<Weak<dyn ReconfigurableModule>>,
63) -> anyhow::Result<()> {
64    let watch_file = config.application().watch_configuration;
65
66    cfg_if::cfg_if! {
67        if #[cfg(target_family = "unix")] {
68            let sighup_stream = sighup_stream()?;
69        } else {
70            let sighup_stream = stream::pending();
71        }
72    }
73
74    let rt = runtime.clone();
75    let () = runtime.clone().spawn(async move {
76        let res: anyhow::Result<()> = run_watcher(
77            rt,
78            sources,
79            modules,
80            watch_file,
81            sighup_stream,
82            Some(DEBOUNCE_INTERVAL)
83        ).await;
84
85        match res {
86            Ok(()) => debug!("Config watcher task exiting"),
87            // TODO: warn_report does not work on anyhow::Error.
88            Err(e) => error!("Config watcher task exiting: {}", tor_error::Report(e)),
89        }
90    }).context("failed to spawn task")?;
91
92    Ok(())
93}
94
95/// Start watching for configuration changes.
96///
97/// Spawned from `watch_for_config_changes`.
98async fn run_watcher<R: Runtime>(
99    runtime: R,
100    sources: ConfigurationSources,
101    modules: Vec<Weak<dyn ReconfigurableModule>>,
102    watch_file: bool,
103    mut sighup_stream: impl Stream<Item = ()> + Unpin,
104    debounce_interval: Option<Duration>,
105) -> anyhow::Result<()> {
106    let (tx, mut rx) = file_watcher::channel();
107    let mut watcher = if watch_file {
108        let mut watcher = FileWatcher::builder(runtime.clone());
109        prepare(&mut watcher, &sources)?;
110        Some(watcher.start_watching(tx.clone())?)
111    } else {
112        None
113    };
114
115    debug!("Entering FS event loop");
116
117    loop {
118        select_biased! {
119            event = sighup_stream.next().fuse() => {
120                let Some(()) = event else {
121                    break;
122                };
123
124                info!("Received SIGHUP");
125
126                watcher = reload_configuration(
127                    runtime.clone(),
128                    watcher,
129                    &sources,
130                    &modules,
131                    tx.clone()
132                ).await?;
133            },
134            event = rx.next().fuse() => {
135                if let Some(debounce_interval) = debounce_interval {
136                    runtime.sleep(debounce_interval).await;
137                }
138
139                while let Some(_ignore) = rx.try_recv() {
140                    // Discard other events, so that we only reload once.
141                    //
142                    // We can afford to treat both error cases from try_recv [Empty
143                    // and Disconnected] as meaning that we've discarded other
144                    // events: if we're disconnected, we'll notice it when we next
145                    // call recv() in the outer loop.
146                }
147                debug!("Config reload event {:?}: reloading configuration.", event);
148                watcher = reload_configuration(
149                    runtime.clone(),
150                    watcher,
151                    &sources,
152                    &modules,
153                    tx.clone()
154                ).await?;
155            },
156        }
157    }
158
159    Ok(())
160}
161
162/// Reload the configuration.
163async fn reload_configuration<R: Runtime>(
164    runtime: R,
165    mut watcher: Option<FileWatcher>,
166    sources: &ConfigurationSources,
167    modules: &[Weak<dyn ReconfigurableModule>],
168    tx: FileEventSender,
169) -> anyhow::Result<Option<FileWatcher>> {
170
171    let found_files = if watcher.is_some() {
172        let mut new_watcher = FileWatcher::builder(runtime.clone());
173        let found_files = prepare(&mut new_watcher, sources)
174            .context("FS watch: failed to rescan config and re-establish watch")?;
175        let new_watcher = new_watcher
176            .start_watching(tx.clone())
177            .context("FS watch: failed to start watching config")?;
178        watcher = Some(new_watcher);
179        found_files
180    } else {
181        sources
182            .scan()
183            .context("FS watch: failed to rescan config")?
184    };
185
186    match reconfigure(found_files, modules) {
187        Ok(watch) => {
188            info!("Successfully reloaded configuration.");
189            if watch && watcher.is_none() {
190                info!("Starting watching over configuration.");
191                let mut new_watcher = FileWatcher::builder(runtime.clone());
192                let _found_files = prepare(&mut new_watcher, sources).context(
193                    "FS watch: failed to rescan config and re-establish watch: {}",
194                )?;
195                let new_watcher = new_watcher.start_watching(tx.clone()).context(
196                    "FS watch: failed to rescan config and re-establish watch: {}",
197                )?;
198                watcher = Some(new_watcher);
199            } else if !watch && watcher.is_some() {
200                info!("Stopped watching over configuration.");
201                watcher = None;
202            }
203        }
204        // TODO: warn_report does not work on anyhow::Error.
205        Err(e) => warn!("Couldn't reload configuration: {}", tor_error::Report(e)),
206    }
207
208    Ok(watcher)
209}
210
211impl<R: Runtime> ReconfigurableModule for TorClient<R> {
212    fn reconfigure(&self, new: &ArtiCombinedConfig) -> anyhow::Result<()> {
213        TorClient::reconfigure(self, &new.1, Reconfigure::WarnOnFailures)?;
214        Ok(())
215    }
216}
217
218/// Internal type to represent the Arti application as a `ReconfigurableModule`.
219pub(crate) struct Application {
220    /// The configuration that Arti had at startup.
221    ///
222    /// We use this to check whether the user is asking for any impermissible
223    /// transitions.
224    original_config: ArtiConfig,
225}
226
227impl Application {
228    /// Construct a new `Application` to receive configuration changes for the
229    /// arti application.
230    pub(crate) fn new(cfg: ArtiConfig) -> Self {
231        Self {
232            original_config: cfg,
233        }
234    }
235}
236
237impl ReconfigurableModule for Application {
238    // TODO: This should probably take "how: Reconfigure" as an argument, and
239    // pass it down as appropriate. See issue #1156.
240    #[allow(clippy::cognitive_complexity)]
241    fn reconfigure(&self, new: &ArtiCombinedConfig) -> anyhow::Result<()> {
242        let original = &self.original_config;
243        let config = &new.0;
244
245        if config.proxy() != original.proxy() {
246            warn!("Can't (yet) reconfigure proxy settings while arti is running.");
247        }
248        if config.logging() != original.logging() {
249            warn!("Can't (yet) reconfigure logging settings while arti is running.");
250        }
251        #[cfg(feature = "rpc")]
252        if config.rpc != original.rpc {
253            warn!("Can't (yet) change RPC settings while arti is running.");
254        }
255        if config.application().permit_debugging && !original.application().permit_debugging {
256            warn!("Cannot disable application hardening when it has already been enabled.");
257        }
258
259        // Note that this is the only config transition we actually perform so far.
260        if !config.application().permit_debugging {
261            #[cfg(feature = "harden")]
262            crate::process::enable_process_hardening()?;
263        }
264
265        Ok(())
266    }
267}
268
269/// Find the configuration files and prepare the watcher
270fn prepare<'a, R: Runtime>(
271    watcher: &mut FileWatcherBuilder<R>,
272    sources: &'a ConfigurationSources,
273) -> anyhow::Result<FoundConfigFiles<'a>> {
274    let sources = sources.scan()?;
275    for source in sources.iter() {
276        match source {
277            ConfigurationSource::Dir(dir) => watcher.watch_dir(dir, "toml")?,
278            ConfigurationSource::File(file) => watcher.watch_path(file)?,
279            ConfigurationSource::Verbatim(_) => {}
280        }
281    }
282    Ok(sources)
283}
284
285/// Reload the configuration files, apply the runtime configuration, and
286/// reconfigure the client as much as we can.
287///
288/// Return true if we should be watching for configuration changes.
289//
290// TODO: This should probably take "how: Reconfigure" as an argument, and
291// pass it down as appropriate. See issue #1156.
292fn reconfigure(
293    found_files: FoundConfigFiles<'_>,
294    reconfigurable: &[Weak<dyn ReconfigurableModule>],
295) -> anyhow::Result<bool> {
296    let _ = reconfigurable;
297    let config = found_files.load()?;
298    let config = tor_config::resolve::<ArtiCombinedConfig>(config)?;
299
300    // Filter out the modules that have been dropped
301    let reconfigurable = reconfigurable.iter().flat_map(Weak::upgrade);
302    // If there are no more modules, we should exit.
303    let mut has_modules = false;
304
305    for module in reconfigurable {
306        has_modules = true;
307        module.reconfigure(&config)?;
308    }
309
310    Ok(has_modules && config.0.application().watch_configuration)
311}
312
313#[cfg(test)]
314mod test {
315    // @@ begin test lint list maintained by maint/add_warning @@
316    #![allow(clippy::bool_assert_comparison)]
317    #![allow(clippy::clone_on_copy)]
318    #![allow(clippy::dbg_macro)]
319    #![allow(clippy::mixed_attributes_style)]
320    #![allow(clippy::print_stderr)]
321    #![allow(clippy::print_stdout)]
322    #![allow(clippy::single_char_pattern)]
323    #![allow(clippy::unwrap_used)]
324    #![allow(clippy::unchecked_duration_subtraction)]
325    #![allow(clippy::useless_vec)]
326    #![allow(clippy::needless_pass_by_value)]
327    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
328
329    use crate::ArtiConfigBuilder;
330
331    use super::*;
332    use futures::channel::mpsc;
333    use futures::SinkExt as _;
334    use tor_config::sources::MustRead;
335    use std::path::PathBuf;
336    use std::sync::{Arc, Mutex};
337    use test_temp_dir::{test_temp_dir, TestTempDir};
338    use postage::watch;
339    use tor_async_utils::PostageWatchSenderExt;
340
341    /// Filename for config1
342    const CONFIG_NAME1: &str = "config1.toml";
343    /// Filename for config2
344    const CONFIG_NAME2: &str = "config2.toml";
345    /// Filename for config3
346    const CONFIG_NAME3: &str = "config3.toml";
347
348    struct TestModule {
349        // A sender for sending the new config to the test function
350        tx: Arc<Mutex<watch::Sender<ArtiCombinedConfig>>>,
351    }
352
353    impl ReconfigurableModule for TestModule {
354        fn reconfigure(&self, new: &ArtiCombinedConfig) -> anyhow::Result<()> {
355            let config = new.clone();
356            self.tx.lock().unwrap().maybe_send(|_| config);
357
358            Ok(())
359        }
360    }
361
362    /// Create a test reconfigurable module.
363    ///
364    /// Returns the module and a channel on which the new configs received by the module are sent.
365    async fn create_module(
366    ) -> (Arc<dyn ReconfigurableModule>, watch::Receiver<ArtiCombinedConfig>) {
367        let (tx, mut rx) = watch::channel();
368        // Read the initial value from the postage::watch stream
369        // (the first observed value on this test stream is always the default config)
370        let _: ArtiCombinedConfig = rx.next().await.unwrap();
371
372        (Arc::new(TestModule { tx: Arc::new(Mutex::new(tx)) }), rx)
373    }
374
375    /// Write `data` to file `name` within `dir`.
376    fn write_file(dir: &TestTempDir, name: &str, data: &[u8]) -> PathBuf {
377        let tmp = dir.as_path_untracked().join("tmp");
378        std::fs::write(&tmp, data).unwrap();
379        let path = dir.as_path_untracked().join(name);
380        // Atomically write the config file
381        std::fs::rename(tmp, &path).unwrap();
382        path
383    }
384
385    /// Write an `ArtiConfigBuilder` to a file within `dir`.
386    fn write_config(dir: &TestTempDir, name: &str, config: &ArtiConfigBuilder) -> PathBuf {
387        let s = toml::to_string(&config).unwrap();
388        write_file(dir, name, s.as_bytes())
389    }
390
391    #[test]
392    #[ignore] // TODO(#1607): Re-enable
393    fn watch_single_file() {
394        tor_rtcompat::test_with_one_runtime!(|rt| async move {
395            let temp_dir = test_temp_dir!();
396            let mut config_builder =  ArtiConfigBuilder::default();
397            config_builder.application().watch_configuration(true);
398
399            let cfg_file = write_config(&temp_dir, CONFIG_NAME1, &config_builder);
400            let mut cfg_sources = ConfigurationSources::new_empty();
401            cfg_sources.push_source(ConfigurationSource::File(cfg_file), MustRead::MustRead);
402
403            let (module, mut rx) = create_module().await;
404
405            // Use a fake sighup stream to wait until run_watcher()'s select_biased!
406            // loop is entered
407            let (mut sighup_tx, sighup_rx) = mpsc::unbounded();
408            let runtime = rt.clone();
409            let () = rt.spawn(async move {
410                run_watcher(
411                    runtime,
412                    cfg_sources,
413                    vec![Arc::downgrade(&module)],
414                    true,
415                    sighup_rx,
416                    None,
417                ).await.unwrap();
418            }).unwrap();
419
420            config_builder.logging().log_sensitive_information(true);
421            let _: PathBuf = write_config(&temp_dir, CONFIG_NAME1, &config_builder);
422            sighup_tx.send(()).await.unwrap();
423            // The reconfigurable modules should've been reloaded in response to sighup
424            let config = rx.next().await.unwrap();
425            assert_eq!(config.0, config_builder.build().unwrap());
426
427            // Overwrite the config
428            config_builder.logging().log_sensitive_information(false);
429            let _: PathBuf = write_config(&temp_dir, CONFIG_NAME1, &config_builder);
430            // The reconfigurable modules should've been reloaded in response to the config change
431            let config = rx.next().await.unwrap();
432            assert_eq!(config.0, config_builder.build().unwrap());
433
434        });
435    }
436
437    #[test]
438    fn watch_multiple() {
439        tor_rtcompat::test_with_one_runtime!(|rt| async move {
440            let temp_dir = test_temp_dir!();
441            let mut config_builder1 =  ArtiConfigBuilder::default();
442            config_builder1.application().watch_configuration(true);
443
444            let _: PathBuf = write_config(&temp_dir, CONFIG_NAME1, &config_builder1);
445            let mut cfg_sources = ConfigurationSources::new_empty();
446            cfg_sources.push_source(
447                ConfigurationSource::Dir(temp_dir.as_path_untracked().to_path_buf()),
448                MustRead::MustRead
449            );
450
451            let (module, mut rx) = create_module().await;
452            // Use a fake sighup stream to wait until run_watcher()'s select_biased!
453            // loop is entered
454            let (mut sighup_tx, sighup_rx) = mpsc::unbounded();
455            let runtime = rt.clone();
456            let () = rt.spawn(async move {
457                run_watcher(
458                    runtime,
459                    cfg_sources,
460                    vec![Arc::downgrade(&module)],
461                    true,
462                    sighup_rx,
463                    None,
464                ).await.unwrap();
465            }).unwrap();
466
467            config_builder1.logging().log_sensitive_information(true);
468            let _: PathBuf = write_config(&temp_dir, CONFIG_NAME1, &config_builder1);
469            sighup_tx.send(()).await.unwrap();
470            // The reconfigurable modules should've been reloaded in response to sighup
471            let config = rx.next().await.unwrap();
472            assert_eq!(config.0, config_builder1.build().unwrap());
473
474            let mut config_builder2 =  ArtiConfigBuilder::default();
475            config_builder2.application().watch_configuration(true);
476            // Write another config file...
477            config_builder2.system().max_files(0_u64);
478            let _: PathBuf = write_config(&temp_dir, CONFIG_NAME2, &config_builder2);
479            // Check that the 2 config files are merged
480            let mut config_builder_combined = config_builder1.clone();
481            config_builder_combined.system().max_files(0_u64);
482            let config = rx.next().await.unwrap();
483            assert_eq!(config.0, config_builder_combined.build().unwrap());
484            // Now write a new config file to the watched dir
485            config_builder2.logging().console("foo".to_string());
486            let mut config_builder_combined2 = config_builder_combined.clone();
487            config_builder_combined2.logging().console("foo".to_string());
488            let config3: PathBuf = write_config(&temp_dir, CONFIG_NAME3, &config_builder2);
489            let config = rx.next().await.unwrap();
490            assert_eq!(config.0, config_builder_combined2.build().unwrap());
491
492            // Removing the file should also trigger an event
493            std::fs::remove_file(config3).unwrap();
494            let config = rx.next().await.unwrap();
495            assert_eq!(config.0, config_builder_combined.build().unwrap());
496        });
497    }
498}