arti/
reload_cfg.rs

1//! Code to watch configuration files for any changes.
2
3use std::sync::Weak;
4use std::time::Duration;
5
6use anyhow::Context;
7use arti_client::config::Reconfigure;
8use arti_client::TorClient;
9use futures::{select_biased, FutureExt as _, Stream};
10use tor_config::file_watcher::{self, FileWatcherBuilder, FileEventSender, FileWatcher};
11use tor_config::{sources::FoundConfigFiles, ConfigurationSource, ConfigurationSources};
12use tor_rtcompat::Runtime;
13use tracing::{debug, error, info, instrument, warn};
14use tor_rtcompat::SpawnExt;
15use futures::StreamExt;
16
17#[cfg(target_family = "unix")]
18use crate::process::sighup_stream;
19
20#[cfg(not(target_family = "unix"))]
21use futures::stream;
22
23use crate::{ArtiCombinedConfig, ArtiConfig};
24
/// How long to wait after receiving a file-watcher event before processing it.
///
/// `watch_for_config_changes` passes this to `run_watcher` as the debounce
/// interval: the watcher sleeps this long, discards whatever further events
/// are queued by then, and performs a single reload.
const DEBOUNCE_INTERVAL: Duration = Duration::from_secs(1);
27
/// An object that can be reconfigured when our configuration changes.
///
/// We use this trait so that we can represent abstract modules in our
/// application, and pass the configuration to each of them.
///
/// The config watcher in this module holds implementors as
/// `Weak<dyn ReconfigurableModule>`, so a module that has been dropped
/// elsewhere is simply skipped on the next reload.
//
// TODO: It is very likely we will want to refactor this even further once we
// have a notion of what our modules truly are.
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
pub(crate) trait ReconfigurableModule: Send + Sync {
    /// Try to reconfigure this module according to a newly loaded configuration.
    ///
    /// `new` is the complete, freshly resolved configuration; each module
    /// picks out the part it cares about.
    ///
    /// By convention, this should only return fatal errors; any such error
    /// should cause the program to exit.  For other cases, we should just warn.
    ///
    /// NOTE(review): the reload path in this file currently logs an error
    /// returned from here as a warning and keeps running — confirm whether
    /// that matches the convention above.
    //
    // TODO: This should probably take "how: Reconfigure" as an argument, and
    // pass it down as appropriate. See issue #1156.
    fn reconfigure(&self, new: &ArtiCombinedConfig) -> anyhow::Result<()>;
}
46
47/// Launch a thread to reload our configuration files.
48///
49/// If current configuration requires it, watch for changes in `sources`
50/// and try to reload our configuration. On unix platforms, also watch
51/// for SIGHUP and reload configuration then.
52///
53/// The modules are `Weak` references to prevent this background task
54/// from keeping them alive.
55///
56/// See the [`FileWatcher`](FileWatcher#Limitations) docs for limitations.
57#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
58#[instrument(level = "trace", skip_all)]
59pub(crate) fn watch_for_config_changes<R: Runtime>(
60    runtime: &R,
61    sources: ConfigurationSources,
62    config: &ArtiConfig,
63    modules: Vec<Weak<dyn ReconfigurableModule>>,
64) -> anyhow::Result<()> {
65    let watch_file = config.application().watch_configuration;
66
67    cfg_if::cfg_if! {
68        if #[cfg(target_family = "unix")] {
69            let sighup_stream = sighup_stream()?;
70        } else {
71            let sighup_stream = stream::pending();
72        }
73    }
74
75    let rt = runtime.clone();
76    let () = runtime.clone().spawn(async move {
77        let res: anyhow::Result<()> = run_watcher(
78            rt,
79            sources,
80            modules,
81            watch_file,
82            sighup_stream,
83            Some(DEBOUNCE_INTERVAL)
84        ).await;
85
86        match res {
87            Ok(()) => debug!("Config watcher task exiting"),
88            // TODO: warn_report does not work on anyhow::Error.
89            Err(e) => error!("Config watcher task exiting: {}", tor_error::Report(e)),
90        }
91    }).context("failed to spawn task")?;
92
93    Ok(())
94}
95
96/// Start watching for configuration changes.
97///
98/// Spawned from `watch_for_config_changes`.
99#[allow(clippy::cognitive_complexity)] // TODO: Refactor? Partly due to tracing.
100#[instrument(level = "trace", skip_all)]
101async fn run_watcher<R: Runtime>(
102    runtime: R,
103    sources: ConfigurationSources,
104    modules: Vec<Weak<dyn ReconfigurableModule>>,
105    watch_file: bool,
106    mut sighup_stream: impl Stream<Item = ()> + Unpin,
107    debounce_interval: Option<Duration>,
108) -> anyhow::Result<()> {
109    let (tx, mut rx) = file_watcher::channel();
110    let mut watcher = if watch_file {
111        let mut watcher = FileWatcher::builder(runtime.clone());
112        prepare(&mut watcher, &sources)?;
113        Some(watcher.start_watching(tx.clone())?)
114    } else {
115        None
116    };
117
118    debug!("Entering FS event loop");
119
120    loop {
121        select_biased! {
122            event = sighup_stream.next().fuse() => {
123                let Some(()) = event else {
124                    break;
125                };
126
127                info!("Received SIGHUP");
128
129                watcher = reload_configuration(
130                    runtime.clone(),
131                    watcher,
132                    &sources,
133                    &modules,
134                    tx.clone()
135                ).await?;
136            },
137            event = rx.next().fuse() => {
138                if let Some(debounce_interval) = debounce_interval {
139                    runtime.sleep(debounce_interval).await;
140                }
141
142                while let Some(_ignore) = rx.try_recv() {
143                    // Discard other events, so that we only reload once.
144                    //
145                    // We can afford to treat both error cases from try_recv [Empty
146                    // and Disconnected] as meaning that we've discarded other
147                    // events: if we're disconnected, we'll notice it when we next
148                    // call recv() in the outer loop.
149                }
150                debug!("Config reload event {:?}: reloading configuration.", event);
151                watcher = reload_configuration(
152                    runtime.clone(),
153                    watcher,
154                    &sources,
155                    &modules,
156                    tx.clone()
157                ).await?;
158            },
159        }
160    }
161
162    Ok(())
163}
164
165/// Reload the configuration.
166#[allow(clippy::cognitive_complexity)] // TODO: Refactor? Partly due to tracing.
167#[instrument(level = "trace", skip_all)]
168async fn reload_configuration<R: Runtime>(
169    runtime: R,
170    mut watcher: Option<FileWatcher>,
171    sources: &ConfigurationSources,
172    modules: &[Weak<dyn ReconfigurableModule>],
173    tx: FileEventSender,
174) -> anyhow::Result<Option<FileWatcher>> {
175
176    let found_files = if watcher.is_some() {
177        let mut new_watcher = FileWatcher::builder(runtime.clone());
178        let found_files = prepare(&mut new_watcher, sources)
179            .context("FS watch: failed to rescan config and re-establish watch")?;
180        let new_watcher = new_watcher
181            .start_watching(tx.clone())
182            .context("FS watch: failed to start watching config")?;
183        watcher = Some(new_watcher);
184        found_files
185    } else {
186        sources
187            .scan()
188            .context("FS watch: failed to rescan config")?
189    };
190
191    match reconfigure(found_files, modules) {
192        Ok(watch) => {
193            info!("Successfully reloaded configuration.");
194            if watch && watcher.is_none() {
195                info!("Starting watching over configuration.");
196                let mut new_watcher = FileWatcher::builder(runtime.clone());
197                let _found_files = prepare(&mut new_watcher, sources).context(
198                    "FS watch: failed to rescan config and re-establish watch: {}",
199                )?;
200                let new_watcher = new_watcher.start_watching(tx.clone()).context(
201                    "FS watch: failed to rescan config and re-establish watch: {}",
202                )?;
203                watcher = Some(new_watcher);
204            } else if !watch && watcher.is_some() {
205                info!("Stopped watching over configuration.");
206                watcher = None;
207            }
208        }
209        // TODO: warn_report does not work on anyhow::Error.
210        Err(e) => warn!("Couldn't reload configuration: {}", tor_error::Report(e)),
211    }
212
213    Ok(watcher)
214}
215
216impl<R: Runtime> ReconfigurableModule for TorClient<R> {
217    #[instrument(level = "trace", skip_all)]
218    fn reconfigure(&self, new: &ArtiCombinedConfig) -> anyhow::Result<()> {
219        TorClient::reconfigure(self, &new.1, Reconfigure::WarnOnFailures)?;
220        Ok(())
221    }
222}
223
/// Internal type to represent the Arti application as a `ReconfigurableModule`.
///
/// Its `reconfigure` implementation compares every newly loaded configuration
/// with the one captured at construction time and warns about changes that
/// cannot be applied while arti is running.
pub(crate) struct Application {
    /// The configuration that Arti had at startup.
    ///
    /// We use this to check whether the user is asking for any impermissible
    /// transitions.
    original_config: ArtiConfig,
}
232
233impl Application {
234    /// Construct a new `Application` to receive configuration changes for the
235    /// arti application.
236    pub(crate) fn new(cfg: ArtiConfig) -> Self {
237        Self {
238            original_config: cfg,
239        }
240    }
241}
242
243impl ReconfigurableModule for Application {
244    // TODO: This should probably take "how: Reconfigure" as an argument, and
245    // pass it down as appropriate. See issue #1156.
246    #[allow(clippy::cognitive_complexity)]
247    #[instrument(level = "trace", skip_all)]
248    fn reconfigure(&self, new: &ArtiCombinedConfig) -> anyhow::Result<()> {
249        let original = &self.original_config;
250        let config = &new.0;
251
252        if config.proxy() != original.proxy() {
253            warn!("Can't (yet) reconfigure proxy settings while arti is running.");
254        }
255        if config.logging() != original.logging() {
256            warn!("Can't (yet) reconfigure logging settings while arti is running.");
257        }
258        #[cfg(feature = "rpc")]
259        if config.rpc != original.rpc {
260            warn!("Can't (yet) change RPC settings while arti is running.");
261        }
262        if config.application().permit_debugging && !original.application().permit_debugging {
263            warn!("Cannot disable application hardening when it has already been enabled.");
264        }
265
266        // Note that this is the only config transition we actually perform so far.
267        if !config.application().permit_debugging {
268            #[cfg(feature = "harden")]
269            crate::process::enable_process_hardening()?;
270        }
271
272        Ok(())
273    }
274}
275
276/// Find the configuration files and prepare the watcher
277fn prepare<'a, R: Runtime>(
278    watcher: &mut FileWatcherBuilder<R>,
279    sources: &'a ConfigurationSources,
280) -> anyhow::Result<FoundConfigFiles<'a>> {
281    let sources = sources.scan()?;
282    for source in sources.iter() {
283        match source {
284            ConfigurationSource::Dir(dir) => watcher.watch_dir(dir, "toml")?,
285            ConfigurationSource::File(file) => watcher.watch_path(file)?,
286            ConfigurationSource::Verbatim(_) => {}
287        }
288    }
289    Ok(sources)
290}
291
292/// Reload the configuration files, apply the runtime configuration, and
293/// reconfigure the client as much as we can.
294///
295/// Return true if we should be watching for configuration changes.
296//
297// TODO: This should probably take "how: Reconfigure" as an argument, and
298// pass it down as appropriate. See issue #1156.
299#[instrument(level = "trace", skip_all)]
300fn reconfigure(
301    found_files: FoundConfigFiles<'_>,
302    reconfigurable: &[Weak<dyn ReconfigurableModule>],
303) -> anyhow::Result<bool> {
304    let _ = reconfigurable;
305    let config = found_files.load()?;
306    let config = tor_config::resolve::<ArtiCombinedConfig>(config)?;
307
308    // Filter out the modules that have been dropped
309    let reconfigurable = reconfigurable.iter().flat_map(Weak::upgrade);
310    // If there are no more modules, we should exit.
311    let mut has_modules = false;
312
313    for module in reconfigurable {
314        has_modules = true;
315        module.reconfigure(&config)?;
316    }
317
318    Ok(has_modules && config.0.application().watch_configuration)
319}
320
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use crate::ArtiConfigBuilder;

    use super::*;
    use futures::channel::mpsc;
    use futures::SinkExt as _;
    use tor_config::sources::MustRead;
    use std::path::PathBuf;
    use std::sync::{Arc, Mutex};
    use test_temp_dir::{test_temp_dir, TestTempDir};
    use postage::watch;
    use tor_async_utils::PostageWatchSenderExt;

    /// Filename for config1
    const CONFIG_NAME1: &str = "config1.toml";
    /// Filename for config2
    const CONFIG_NAME2: &str = "config2.toml";
    /// Filename for config3
    const CONFIG_NAME3: &str = "config3.toml";

    /// A `ReconfigurableModule` that forwards every configuration it is
    /// given to the test body over a `postage::watch` channel.
    struct TestModule {
        // A sender for sending the new config to the test function
        tx: Arc<Mutex<watch::Sender<ArtiCombinedConfig>>>,
    }

    impl ReconfigurableModule for TestModule {
        /// Publish a clone of `new` on the watch channel; never fails.
        fn reconfigure(&self, new: &ArtiCombinedConfig) -> anyhow::Result<()> {
            let config = new.clone();
            self.tx.lock().unwrap().maybe_send(|_| config);

            Ok(())
        }
    }

    /// Create a test reconfigurable module.
    ///
    /// Returns the module and a channel on which the new configs received by the module are sent.
    async fn create_module(
    ) -> (Arc<dyn ReconfigurableModule>, watch::Receiver<ArtiCombinedConfig>) {
        let (tx, mut rx) = watch::channel();
        // Read the initial value from the postage::watch stream
        // (the first observed value on this test stream is always the default config)
        let _: ArtiCombinedConfig = rx.next().await.unwrap();

        (Arc::new(TestModule { tx: Arc::new(Mutex::new(tx)) }), rx)
    }

    /// Write `data` to file `name` within `dir`.
    ///
    /// The data is first written to a scratch file called `tmp` in the same
    /// directory and then renamed into place.  Returns the final path.
    fn write_file(dir: &TestTempDir, name: &str, data: &[u8]) -> PathBuf {
        let tmp = dir.as_path_untracked().join("tmp");
        std::fs::write(&tmp, data).unwrap();
        let path = dir.as_path_untracked().join(name);
        // Atomically write the config file
        std::fs::rename(tmp, &path).unwrap();
        path
    }

    /// Write an `ArtiConfigBuilder` to a file within `dir`.
    ///
    /// The builder is serialized as TOML; returns the path that was written.
    fn write_config(dir: &TestTempDir, name: &str, config: &ArtiConfigBuilder) -> PathBuf {
        let s = toml::to_string(&config).unwrap();
        write_file(dir, name, s.as_bytes())
    }

    /// Watch a single configuration file: a fake SIGHUP must trigger a
    /// reload, and so must a later rewrite of the file.
    #[test]
    fn watch_single_file() {
        tor_rtcompat::test_with_one_runtime!(|rt| async move {
            let temp_dir = test_temp_dir!();
            let mut config_builder =  ArtiConfigBuilder::default();
            config_builder.application().watch_configuration(true);

            let cfg_file = write_config(&temp_dir, CONFIG_NAME1, &config_builder);
            let mut cfg_sources = ConfigurationSources::new_empty();
            cfg_sources.push_source(ConfigurationSource::File(cfg_file), MustRead::MustRead);

            let (module, mut rx) = create_module().await;

            // Change the file before the watcher is started; the SIGHUP sent
            // below is what makes the module see this version.
            config_builder.logging().log_sensitive_information(true);
            let _: PathBuf = write_config(&temp_dir, CONFIG_NAME1, &config_builder);

            // Use a fake sighup stream to wait until run_watcher()'s select_biased!
            // loop is entered
            let (mut sighup_tx, sighup_rx) = mpsc::unbounded();
            let runtime = rt.clone();
            let () = rt.spawn(async move {
                // No debounce interval, so file events are handled immediately.
                run_watcher(
                    runtime,
                    cfg_sources,
                    vec![Arc::downgrade(&module)],
                    true,
                    sighup_rx,
                    None,
                ).await.unwrap();
            }).unwrap();

            sighup_tx.send(()).await.unwrap();

            // The reconfigurable modules should've been reloaded in response to sighup
            let config = rx.next().await.unwrap();

            assert_eq!(config.0, config_builder.build().unwrap());

            // Overwrite the config
            config_builder.logging().log_sensitive_information(false);
            let _: PathBuf = write_config(&temp_dir, CONFIG_NAME1, &config_builder);
            // The reconfigurable modules should've been reloaded in response to the config change
            let config = rx.next().await.unwrap();
            assert_eq!(config.0, config_builder.build().unwrap());

        });
    }

    /// Watch a whole directory: adding, changing and removing config files
    /// in it must each trigger a reload of the merged configuration.
    // TODO: Ignored until #1607 is fixed
    #[test]
    #[ignore]
    fn watch_multiple() {
        tor_rtcompat::test_with_one_runtime!(|rt| async move {
            let temp_dir = test_temp_dir!();
            let mut config_builder1 =  ArtiConfigBuilder::default();
            config_builder1.application().watch_configuration(true);

            let _: PathBuf = write_config(&temp_dir, CONFIG_NAME1, &config_builder1);
            let mut cfg_sources = ConfigurationSources::new_empty();
            cfg_sources.push_source(
                ConfigurationSource::Dir(temp_dir.as_path_untracked().to_path_buf()),
                MustRead::MustRead
            );

            let (module, mut rx) = create_module().await;
            // Use a fake sighup stream to wait until run_watcher()'s select_biased!
            // loop is entered
            let (mut sighup_tx, sighup_rx) = mpsc::unbounded();
            let runtime = rt.clone();
            let () = rt.spawn(async move {
                // No debounce interval, so file events are handled immediately.
                run_watcher(
                    runtime,
                    cfg_sources,
                    vec![Arc::downgrade(&module)],
                    true,
                    sighup_rx,
                    None,
                ).await.unwrap();
            }).unwrap();

            config_builder1.logging().log_sensitive_information(true);
            let _: PathBuf = write_config(&temp_dir, CONFIG_NAME1, &config_builder1);
            sighup_tx.send(()).await.unwrap();
            // The reconfigurable modules should've been reloaded in response to sighup
            let config = rx.next().await.unwrap();
            assert_eq!(config.0, config_builder1.build().unwrap());

            let mut config_builder2 =  ArtiConfigBuilder::default();
            config_builder2.application().watch_configuration(true);
            // Write another config file...
            config_builder2.system().max_files(0_u64);
            let _: PathBuf = write_config(&temp_dir, CONFIG_NAME2, &config_builder2);
            // Check that the 2 config files are merged
            let mut config_builder_combined = config_builder1.clone();
            config_builder_combined.system().max_files(0_u64);
            let config = rx.next().await.unwrap();
            assert_eq!(config.0, config_builder_combined.build().unwrap());
            // Now write a new config file to the watched dir
            config_builder2.logging().console("foo".to_string());
            let mut config_builder_combined2 = config_builder_combined.clone();
            config_builder_combined2.logging().console("foo".to_string());
            let config3: PathBuf = write_config(&temp_dir, CONFIG_NAME3, &config_builder2);
            let config = rx.next().await.unwrap();
            assert_eq!(config.0, config_builder_combined2.build().unwrap());

            // Removing the file should also trigger an event
            std::fs::remove_file(config3).unwrap();
            let config = rx.next().await.unwrap();
            assert_eq!(config.0, config_builder_combined.build().unwrap());
        });
    }
}