1use std::sync::Weak;
4use std::time::Duration;
5
6use anyhow::Context;
7use arti_client::config::Reconfigure;
8use arti_client::TorClient;
9use futures::{select_biased, FutureExt as _, Stream};
10use tor_config::file_watcher::{self, FileWatcherBuilder, FileEventSender, FileWatcher};
11use tor_config::{sources::FoundConfigFiles, ConfigurationSource, ConfigurationSources};
12use tor_rtcompat::Runtime;
13use tracing::{debug, error, info, warn};
14use futures::task::SpawnExt;
15use futures::StreamExt;
16
17#[cfg(target_family = "unix")]
18use crate::process::sighup_stream;
19
20#[cfg(not(target_family = "unix"))]
21use futures::stream;
22
23use crate::{ArtiCombinedConfig, ArtiConfig};
24
25const DEBOUNCE_INTERVAL: Duration = Duration::from_secs(1);
27
28#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
36pub(crate) trait ReconfigurableModule: Send + Sync {
37 fn reconfigure(&self, new: &ArtiCombinedConfig) -> anyhow::Result<()>;
45}
46
47#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
58pub(crate) fn watch_for_config_changes<R: Runtime>(
59 runtime: &R,
60 sources: ConfigurationSources,
61 config: &ArtiConfig,
62 modules: Vec<Weak<dyn ReconfigurableModule>>,
63) -> anyhow::Result<()> {
64 let watch_file = config.application().watch_configuration;
65
66 cfg_if::cfg_if! {
67 if #[cfg(target_family = "unix")] {
68 let sighup_stream = sighup_stream()?;
69 } else {
70 let sighup_stream = stream::pending();
71 }
72 }
73
74 let rt = runtime.clone();
75 let () = runtime.clone().spawn(async move {
76 let res: anyhow::Result<()> = run_watcher(
77 rt,
78 sources,
79 modules,
80 watch_file,
81 sighup_stream,
82 Some(DEBOUNCE_INTERVAL)
83 ).await;
84
85 match res {
86 Ok(()) => debug!("Config watcher task exiting"),
87 Err(e) => error!("Config watcher task exiting: {}", tor_error::Report(e)),
89 }
90 }).context("failed to spawn task")?;
91
92 Ok(())
93}
94
95#[allow(clippy::cognitive_complexity)] async fn run_watcher<R: Runtime>(
100 runtime: R,
101 sources: ConfigurationSources,
102 modules: Vec<Weak<dyn ReconfigurableModule>>,
103 watch_file: bool,
104 mut sighup_stream: impl Stream<Item = ()> + Unpin,
105 debounce_interval: Option<Duration>,
106) -> anyhow::Result<()> {
107 let (tx, mut rx) = file_watcher::channel();
108 let mut watcher = if watch_file {
109 let mut watcher = FileWatcher::builder(runtime.clone());
110 prepare(&mut watcher, &sources)?;
111 Some(watcher.start_watching(tx.clone())?)
112 } else {
113 None
114 };
115
116 debug!("Entering FS event loop");
117
118 loop {
119 select_biased! {
120 event = sighup_stream.next().fuse() => {
121 let Some(()) = event else {
122 break;
123 };
124
125 info!("Received SIGHUP");
126
127 watcher = reload_configuration(
128 runtime.clone(),
129 watcher,
130 &sources,
131 &modules,
132 tx.clone()
133 ).await?;
134 },
135 event = rx.next().fuse() => {
136 if let Some(debounce_interval) = debounce_interval {
137 runtime.sleep(debounce_interval).await;
138 }
139
140 while let Some(_ignore) = rx.try_recv() {
141 }
148 debug!("Config reload event {:?}: reloading configuration.", event);
149 watcher = reload_configuration(
150 runtime.clone(),
151 watcher,
152 &sources,
153 &modules,
154 tx.clone()
155 ).await?;
156 },
157 }
158 }
159
160 Ok(())
161}
162
163#[allow(clippy::cognitive_complexity)] async fn reload_configuration<R: Runtime>(
166 runtime: R,
167 mut watcher: Option<FileWatcher>,
168 sources: &ConfigurationSources,
169 modules: &[Weak<dyn ReconfigurableModule>],
170 tx: FileEventSender,
171) -> anyhow::Result<Option<FileWatcher>> {
172
173 let found_files = if watcher.is_some() {
174 let mut new_watcher = FileWatcher::builder(runtime.clone());
175 let found_files = prepare(&mut new_watcher, sources)
176 .context("FS watch: failed to rescan config and re-establish watch")?;
177 let new_watcher = new_watcher
178 .start_watching(tx.clone())
179 .context("FS watch: failed to start watching config")?;
180 watcher = Some(new_watcher);
181 found_files
182 } else {
183 sources
184 .scan()
185 .context("FS watch: failed to rescan config")?
186 };
187
188 match reconfigure(found_files, modules) {
189 Ok(watch) => {
190 info!("Successfully reloaded configuration.");
191 if watch && watcher.is_none() {
192 info!("Starting watching over configuration.");
193 let mut new_watcher = FileWatcher::builder(runtime.clone());
194 let _found_files = prepare(&mut new_watcher, sources).context(
195 "FS watch: failed to rescan config and re-establish watch: {}",
196 )?;
197 let new_watcher = new_watcher.start_watching(tx.clone()).context(
198 "FS watch: failed to rescan config and re-establish watch: {}",
199 )?;
200 watcher = Some(new_watcher);
201 } else if !watch && watcher.is_some() {
202 info!("Stopped watching over configuration.");
203 watcher = None;
204 }
205 }
206 Err(e) => warn!("Couldn't reload configuration: {}", tor_error::Report(e)),
208 }
209
210 Ok(watcher)
211}
212
213impl<R: Runtime> ReconfigurableModule for TorClient<R> {
214 fn reconfigure(&self, new: &ArtiCombinedConfig) -> anyhow::Result<()> {
215 TorClient::reconfigure(self, &new.1, Reconfigure::WarnOnFailures)?;
216 Ok(())
217 }
218}
219
220pub(crate) struct Application {
222 original_config: ArtiConfig,
227}
228
229impl Application {
230 pub(crate) fn new(cfg: ArtiConfig) -> Self {
233 Self {
234 original_config: cfg,
235 }
236 }
237}
238
239impl ReconfigurableModule for Application {
240 #[allow(clippy::cognitive_complexity)]
243 fn reconfigure(&self, new: &ArtiCombinedConfig) -> anyhow::Result<()> {
244 let original = &self.original_config;
245 let config = &new.0;
246
247 if config.proxy() != original.proxy() {
248 warn!("Can't (yet) reconfigure proxy settings while arti is running.");
249 }
250 if config.logging() != original.logging() {
251 warn!("Can't (yet) reconfigure logging settings while arti is running.");
252 }
253 #[cfg(feature = "rpc")]
254 if config.rpc != original.rpc {
255 warn!("Can't (yet) change RPC settings while arti is running.");
256 }
257 if config.application().permit_debugging && !original.application().permit_debugging {
258 warn!("Cannot disable application hardening when it has already been enabled.");
259 }
260
261 if !config.application().permit_debugging {
263 #[cfg(feature = "harden")]
264 crate::process::enable_process_hardening()?;
265 }
266
267 Ok(())
268 }
269}
270
271fn prepare<'a, R: Runtime>(
273 watcher: &mut FileWatcherBuilder<R>,
274 sources: &'a ConfigurationSources,
275) -> anyhow::Result<FoundConfigFiles<'a>> {
276 let sources = sources.scan()?;
277 for source in sources.iter() {
278 match source {
279 ConfigurationSource::Dir(dir) => watcher.watch_dir(dir, "toml")?,
280 ConfigurationSource::File(file) => watcher.watch_path(file)?,
281 ConfigurationSource::Verbatim(_) => {}
282 }
283 }
284 Ok(sources)
285}
286
287fn reconfigure(
295 found_files: FoundConfigFiles<'_>,
296 reconfigurable: &[Weak<dyn ReconfigurableModule>],
297) -> anyhow::Result<bool> {
298 let _ = reconfigurable;
299 let config = found_files.load()?;
300 let config = tor_config::resolve::<ArtiCombinedConfig>(config)?;
301
302 let reconfigurable = reconfigurable.iter().flat_map(Weak::upgrade);
304 let mut has_modules = false;
306
307 for module in reconfigurable {
308 has_modules = true;
309 module.reconfigure(&config)?;
310 }
311
312 Ok(has_modules && config.0.application().watch_configuration)
313}
314
315#[cfg(test)]
316mod test {
317 #![allow(clippy::bool_assert_comparison)]
319 #![allow(clippy::clone_on_copy)]
320 #![allow(clippy::dbg_macro)]
321 #![allow(clippy::mixed_attributes_style)]
322 #![allow(clippy::print_stderr)]
323 #![allow(clippy::print_stdout)]
324 #![allow(clippy::single_char_pattern)]
325 #![allow(clippy::unwrap_used)]
326 #![allow(clippy::unchecked_duration_subtraction)]
327 #![allow(clippy::useless_vec)]
328 #![allow(clippy::needless_pass_by_value)]
329 use crate::ArtiConfigBuilder;
332
333 use super::*;
334 use futures::channel::mpsc;
335 use futures::SinkExt as _;
336 use tor_config::sources::MustRead;
337 use std::path::PathBuf;
338 use std::sync::{Arc, Mutex};
339 use test_temp_dir::{test_temp_dir, TestTempDir};
340 use postage::watch;
341 use tor_async_utils::PostageWatchSenderExt;
342
343 const CONFIG_NAME1: &str = "config1.toml";
345 const CONFIG_NAME2: &str = "config2.toml";
347 const CONFIG_NAME3: &str = "config3.toml";
349
350 struct TestModule {
351 tx: Arc<Mutex<watch::Sender<ArtiCombinedConfig>>>,
353 }
354
355 impl ReconfigurableModule for TestModule {
356 fn reconfigure(&self, new: &ArtiCombinedConfig) -> anyhow::Result<()> {
357 let config = new.clone();
358 self.tx.lock().unwrap().maybe_send(|_| config);
359
360 Ok(())
361 }
362 }
363
364 async fn create_module(
368 ) -> (Arc<dyn ReconfigurableModule>, watch::Receiver<ArtiCombinedConfig>) {
369 let (tx, mut rx) = watch::channel();
370 let _: ArtiCombinedConfig = rx.next().await.unwrap();
373
374 (Arc::new(TestModule { tx: Arc::new(Mutex::new(tx)) }), rx)
375 }
376
377 fn write_file(dir: &TestTempDir, name: &str, data: &[u8]) -> PathBuf {
379 let tmp = dir.as_path_untracked().join("tmp");
380 std::fs::write(&tmp, data).unwrap();
381 let path = dir.as_path_untracked().join(name);
382 std::fs::rename(tmp, &path).unwrap();
384 path
385 }
386
387 fn write_config(dir: &TestTempDir, name: &str, config: &ArtiConfigBuilder) -> PathBuf {
389 let s = toml::to_string(&config).unwrap();
390 write_file(dir, name, s.as_bytes())
391 }
392
393 #[test]
394 #[ignore] fn watch_single_file() {
396 tor_rtcompat::test_with_one_runtime!(|rt| async move {
397 let temp_dir = test_temp_dir!();
398 let mut config_builder = ArtiConfigBuilder::default();
399 config_builder.application().watch_configuration(true);
400
401 let cfg_file = write_config(&temp_dir, CONFIG_NAME1, &config_builder);
402 let mut cfg_sources = ConfigurationSources::new_empty();
403 cfg_sources.push_source(ConfigurationSource::File(cfg_file), MustRead::MustRead);
404
405 let (module, mut rx) = create_module().await;
406
407 let (mut sighup_tx, sighup_rx) = mpsc::unbounded();
410 let runtime = rt.clone();
411 let () = rt.spawn(async move {
412 run_watcher(
413 runtime,
414 cfg_sources,
415 vec![Arc::downgrade(&module)],
416 true,
417 sighup_rx,
418 None,
419 ).await.unwrap();
420 }).unwrap();
421
422 config_builder.logging().log_sensitive_information(true);
423 let _: PathBuf = write_config(&temp_dir, CONFIG_NAME1, &config_builder);
424 sighup_tx.send(()).await.unwrap();
425 let config = rx.next().await.unwrap();
427 assert_eq!(config.0, config_builder.build().unwrap());
428
429 config_builder.logging().log_sensitive_information(false);
431 let _: PathBuf = write_config(&temp_dir, CONFIG_NAME1, &config_builder);
432 let config = rx.next().await.unwrap();
434 assert_eq!(config.0, config_builder.build().unwrap());
435
436 });
437 }
438
439 #[test]
440 fn watch_multiple() {
441 tor_rtcompat::test_with_one_runtime!(|rt| async move {
442 let temp_dir = test_temp_dir!();
443 let mut config_builder1 = ArtiConfigBuilder::default();
444 config_builder1.application().watch_configuration(true);
445
446 let _: PathBuf = write_config(&temp_dir, CONFIG_NAME1, &config_builder1);
447 let mut cfg_sources = ConfigurationSources::new_empty();
448 cfg_sources.push_source(
449 ConfigurationSource::Dir(temp_dir.as_path_untracked().to_path_buf()),
450 MustRead::MustRead
451 );
452
453 let (module, mut rx) = create_module().await;
454 let (mut sighup_tx, sighup_rx) = mpsc::unbounded();
457 let runtime = rt.clone();
458 let () = rt.spawn(async move {
459 run_watcher(
460 runtime,
461 cfg_sources,
462 vec![Arc::downgrade(&module)],
463 true,
464 sighup_rx,
465 None,
466 ).await.unwrap();
467 }).unwrap();
468
469 config_builder1.logging().log_sensitive_information(true);
470 let _: PathBuf = write_config(&temp_dir, CONFIG_NAME1, &config_builder1);
471 sighup_tx.send(()).await.unwrap();
472 let config = rx.next().await.unwrap();
474 assert_eq!(config.0, config_builder1.build().unwrap());
475
476 let mut config_builder2 = ArtiConfigBuilder::default();
477 config_builder2.application().watch_configuration(true);
478 config_builder2.system().max_files(0_u64);
480 let _: PathBuf = write_config(&temp_dir, CONFIG_NAME2, &config_builder2);
481 let mut config_builder_combined = config_builder1.clone();
483 config_builder_combined.system().max_files(0_u64);
484 let config = rx.next().await.unwrap();
485 assert_eq!(config.0, config_builder_combined.build().unwrap());
486 config_builder2.logging().console("foo".to_string());
488 let mut config_builder_combined2 = config_builder_combined.clone();
489 config_builder_combined2.logging().console("foo".to_string());
490 let config3: PathBuf = write_config(&temp_dir, CONFIG_NAME3, &config_builder2);
491 let config = rx.next().await.unwrap();
492 assert_eq!(config.0, config_builder_combined2.build().unwrap());
493
494 std::fs::remove_file(config3).unwrap();
496 let config = rx.next().await.unwrap();
497 assert_eq!(config.0, config_builder_combined.build().unwrap());
498 });
499 }
500}