1use std::sync::Weak;
4use std::time::Duration;
5
6use anyhow::Context;
7use arti_client::config::Reconfigure;
8use arti_client::TorClient;
9use futures::{select_biased, FutureExt as _, Stream};
10use tor_config::file_watcher::{self, FileWatcherBuilder, FileEventSender, FileWatcher};
11use tor_config::{sources::FoundConfigFiles, ConfigurationSource, ConfigurationSources};
12use tor_rtcompat::Runtime;
13use tracing::{debug, error, info, instrument, warn};
14use tor_rtcompat::SpawnExt;
15use futures::StreamExt;
16
17#[cfg(target_family = "unix")]
18use crate::process::sighup_stream;
19
20#[cfg(not(target_family = "unix"))]
21use futures::stream;
22
23use crate::{ArtiCombinedConfig, ArtiConfig};
24
25const DEBOUNCE_INTERVAL: Duration = Duration::from_secs(1);
27
28#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
36pub(crate) trait ReconfigurableModule: Send + Sync {
37 fn reconfigure(&self, new: &ArtiCombinedConfig) -> anyhow::Result<()>;
45}
46
47#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
58#[instrument(level = "trace", skip_all)]
59pub(crate) fn watch_for_config_changes<R: Runtime>(
60 runtime: &R,
61 sources: ConfigurationSources,
62 config: &ArtiConfig,
63 modules: Vec<Weak<dyn ReconfigurableModule>>,
64) -> anyhow::Result<()> {
65 let watch_file = config.application().watch_configuration;
66
67 cfg_if::cfg_if! {
68 if #[cfg(target_family = "unix")] {
69 let sighup_stream = sighup_stream()?;
70 } else {
71 let sighup_stream = stream::pending();
72 }
73 }
74
75 let rt = runtime.clone();
76 let () = runtime.clone().spawn(async move {
77 let res: anyhow::Result<()> = run_watcher(
78 rt,
79 sources,
80 modules,
81 watch_file,
82 sighup_stream,
83 Some(DEBOUNCE_INTERVAL)
84 ).await;
85
86 match res {
87 Ok(()) => debug!("Config watcher task exiting"),
88 Err(e) => error!("Config watcher task exiting: {}", tor_error::Report(e)),
90 }
91 }).context("failed to spawn task")?;
92
93 Ok(())
94}
95
96#[allow(clippy::cognitive_complexity)] #[instrument(level = "trace", skip_all)]
101async fn run_watcher<R: Runtime>(
102 runtime: R,
103 sources: ConfigurationSources,
104 modules: Vec<Weak<dyn ReconfigurableModule>>,
105 watch_file: bool,
106 mut sighup_stream: impl Stream<Item = ()> + Unpin,
107 debounce_interval: Option<Duration>,
108) -> anyhow::Result<()> {
109 let (tx, mut rx) = file_watcher::channel();
110 let mut watcher = if watch_file {
111 let mut watcher = FileWatcher::builder(runtime.clone());
112 prepare(&mut watcher, &sources)?;
113 Some(watcher.start_watching(tx.clone())?)
114 } else {
115 None
116 };
117
118 debug!("Entering FS event loop");
119
120 loop {
121 select_biased! {
122 event = sighup_stream.next().fuse() => {
123 let Some(()) = event else {
124 break;
125 };
126
127 info!("Received SIGHUP");
128
129 watcher = reload_configuration(
130 runtime.clone(),
131 watcher,
132 &sources,
133 &modules,
134 tx.clone()
135 ).await?;
136 },
137 event = rx.next().fuse() => {
138 if let Some(debounce_interval) = debounce_interval {
139 runtime.sleep(debounce_interval).await;
140 }
141
142 while let Some(_ignore) = rx.try_recv() {
143 }
150 debug!("Config reload event {:?}: reloading configuration.", event);
151 watcher = reload_configuration(
152 runtime.clone(),
153 watcher,
154 &sources,
155 &modules,
156 tx.clone()
157 ).await?;
158 },
159 }
160 }
161
162 Ok(())
163}
164
165#[allow(clippy::cognitive_complexity)] #[instrument(level = "trace", skip_all)]
168async fn reload_configuration<R: Runtime>(
169 runtime: R,
170 mut watcher: Option<FileWatcher>,
171 sources: &ConfigurationSources,
172 modules: &[Weak<dyn ReconfigurableModule>],
173 tx: FileEventSender,
174) -> anyhow::Result<Option<FileWatcher>> {
175
176 let found_files = if watcher.is_some() {
177 let mut new_watcher = FileWatcher::builder(runtime.clone());
178 let found_files = prepare(&mut new_watcher, sources)
179 .context("FS watch: failed to rescan config and re-establish watch")?;
180 let new_watcher = new_watcher
181 .start_watching(tx.clone())
182 .context("FS watch: failed to start watching config")?;
183 watcher = Some(new_watcher);
184 found_files
185 } else {
186 sources
187 .scan()
188 .context("FS watch: failed to rescan config")?
189 };
190
191 match reconfigure(found_files, modules) {
192 Ok(watch) => {
193 info!("Successfully reloaded configuration.");
194 if watch && watcher.is_none() {
195 info!("Starting watching over configuration.");
196 let mut new_watcher = FileWatcher::builder(runtime.clone());
197 let _found_files = prepare(&mut new_watcher, sources).context(
198 "FS watch: failed to rescan config and re-establish watch: {}",
199 )?;
200 let new_watcher = new_watcher.start_watching(tx.clone()).context(
201 "FS watch: failed to rescan config and re-establish watch: {}",
202 )?;
203 watcher = Some(new_watcher);
204 } else if !watch && watcher.is_some() {
205 info!("Stopped watching over configuration.");
206 watcher = None;
207 }
208 }
209 Err(e) => warn!("Couldn't reload configuration: {}", tor_error::Report(e)),
211 }
212
213 Ok(watcher)
214}
215
216impl<R: Runtime> ReconfigurableModule for TorClient<R> {
217 #[instrument(level = "trace", skip_all)]
218 fn reconfigure(&self, new: &ArtiCombinedConfig) -> anyhow::Result<()> {
219 TorClient::reconfigure(self, &new.1, Reconfigure::WarnOnFailures)?;
220 Ok(())
221 }
222}
223
224pub(crate) struct Application {
226 original_config: ArtiConfig,
231}
232
233impl Application {
234 pub(crate) fn new(cfg: ArtiConfig) -> Self {
237 Self {
238 original_config: cfg,
239 }
240 }
241}
242
243impl ReconfigurableModule for Application {
244 #[allow(clippy::cognitive_complexity)]
247 #[instrument(level = "trace", skip_all)]
248 fn reconfigure(&self, new: &ArtiCombinedConfig) -> anyhow::Result<()> {
249 let original = &self.original_config;
250 let config = &new.0;
251
252 if config.proxy() != original.proxy() {
253 warn!("Can't (yet) reconfigure proxy settings while arti is running.");
254 }
255 if config.logging() != original.logging() {
256 warn!("Can't (yet) reconfigure logging settings while arti is running.");
257 }
258 #[cfg(feature = "rpc")]
259 if config.rpc != original.rpc {
260 warn!("Can't (yet) change RPC settings while arti is running.");
261 }
262 if config.application().permit_debugging && !original.application().permit_debugging {
263 warn!("Cannot disable application hardening when it has already been enabled.");
264 }
265
266 if !config.application().permit_debugging {
268 #[cfg(feature = "harden")]
269 crate::process::enable_process_hardening()?;
270 }
271
272 Ok(())
273 }
274}
275
276fn prepare<'a, R: Runtime>(
278 watcher: &mut FileWatcherBuilder<R>,
279 sources: &'a ConfigurationSources,
280) -> anyhow::Result<FoundConfigFiles<'a>> {
281 let sources = sources.scan()?;
282 for source in sources.iter() {
283 match source {
284 ConfigurationSource::Dir(dir) => watcher.watch_dir(dir, "toml")?,
285 ConfigurationSource::File(file) => watcher.watch_path(file)?,
286 ConfigurationSource::Verbatim(_) => {}
287 }
288 }
289 Ok(sources)
290}
291
292#[instrument(level = "trace", skip_all)]
300fn reconfigure(
301 found_files: FoundConfigFiles<'_>,
302 reconfigurable: &[Weak<dyn ReconfigurableModule>],
303) -> anyhow::Result<bool> {
304 let _ = reconfigurable;
305 let config = found_files.load()?;
306 let config = tor_config::resolve::<ArtiCombinedConfig>(config)?;
307
308 let reconfigurable = reconfigurable.iter().flat_map(Weak::upgrade);
310 let mut has_modules = false;
312
313 for module in reconfigurable {
314 has_modules = true;
315 module.reconfigure(&config)?;
316 }
317
318 Ok(has_modules && config.0.application().watch_configuration)
319}
320
321#[cfg(test)]
322mod test {
323 #![allow(clippy::bool_assert_comparison)]
325 #![allow(clippy::clone_on_copy)]
326 #![allow(clippy::dbg_macro)]
327 #![allow(clippy::mixed_attributes_style)]
328 #![allow(clippy::print_stderr)]
329 #![allow(clippy::print_stdout)]
330 #![allow(clippy::single_char_pattern)]
331 #![allow(clippy::unwrap_used)]
332 #![allow(clippy::unchecked_time_subtraction)]
333 #![allow(clippy::useless_vec)]
334 #![allow(clippy::needless_pass_by_value)]
335 use crate::ArtiConfigBuilder;
338
339 use super::*;
340 use futures::channel::mpsc;
341 use futures::SinkExt as _;
342 use tor_config::sources::MustRead;
343 use std::path::PathBuf;
344 use std::sync::{Arc, Mutex};
345 use test_temp_dir::{test_temp_dir, TestTempDir};
346 use postage::watch;
347 use tor_async_utils::PostageWatchSenderExt;
348
349 const CONFIG_NAME1: &str = "config1.toml";
351 const CONFIG_NAME2: &str = "config2.toml";
353 const CONFIG_NAME3: &str = "config3.toml";
355
356 struct TestModule {
357 tx: Arc<Mutex<watch::Sender<ArtiCombinedConfig>>>,
359 }
360
361 impl ReconfigurableModule for TestModule {
362 fn reconfigure(&self, new: &ArtiCombinedConfig) -> anyhow::Result<()> {
363 let config = new.clone();
364 self.tx.lock().unwrap().maybe_send(|_| config);
365
366 Ok(())
367 }
368 }
369
370 async fn create_module(
374 ) -> (Arc<dyn ReconfigurableModule>, watch::Receiver<ArtiCombinedConfig>) {
375 let (tx, mut rx) = watch::channel();
376 let _: ArtiCombinedConfig = rx.next().await.unwrap();
379
380 (Arc::new(TestModule { tx: Arc::new(Mutex::new(tx)) }), rx)
381 }
382
383 fn write_file(dir: &TestTempDir, name: &str, data: &[u8]) -> PathBuf {
385 let tmp = dir.as_path_untracked().join("tmp");
386 std::fs::write(&tmp, data).unwrap();
387 let path = dir.as_path_untracked().join(name);
388 std::fs::rename(tmp, &path).unwrap();
390 path
391 }
392
393 fn write_config(dir: &TestTempDir, name: &str, config: &ArtiConfigBuilder) -> PathBuf {
395 let s = toml::to_string(&config).unwrap();
396 write_file(dir, name, s.as_bytes())
397 }
398
399 #[test]
400 fn watch_single_file() {
401 tor_rtcompat::test_with_one_runtime!(|rt| async move {
402 let temp_dir = test_temp_dir!();
403 let mut config_builder = ArtiConfigBuilder::default();
404 config_builder.application().watch_configuration(true);
405
406 let cfg_file = write_config(&temp_dir, CONFIG_NAME1, &config_builder);
407 let mut cfg_sources = ConfigurationSources::new_empty();
408 cfg_sources.push_source(ConfigurationSource::File(cfg_file), MustRead::MustRead);
409
410 let (module, mut rx) = create_module().await;
411
412 config_builder.logging().log_sensitive_information(true);
413 let _: PathBuf = write_config(&temp_dir, CONFIG_NAME1, &config_builder);
414
415 let (mut sighup_tx, sighup_rx) = mpsc::unbounded();
418 let runtime = rt.clone();
419 let () = rt.spawn(async move {
420 run_watcher(
421 runtime,
422 cfg_sources,
423 vec![Arc::downgrade(&module)],
424 true,
425 sighup_rx,
426 None,
427 ).await.unwrap();
428 }).unwrap();
429
430 sighup_tx.send(()).await.unwrap();
431
432 let config = rx.next().await.unwrap();
434
435 assert_eq!(config.0, config_builder.build().unwrap());
436
437 config_builder.logging().log_sensitive_information(false);
439 let _: PathBuf = write_config(&temp_dir, CONFIG_NAME1, &config_builder);
440 let config = rx.next().await.unwrap();
442 assert_eq!(config.0, config_builder.build().unwrap());
443
444 });
445 }
446
447 #[test]
449 #[ignore]
450 fn watch_multiple() {
451 tor_rtcompat::test_with_one_runtime!(|rt| async move {
452 let temp_dir = test_temp_dir!();
453 let mut config_builder1 = ArtiConfigBuilder::default();
454 config_builder1.application().watch_configuration(true);
455
456 let _: PathBuf = write_config(&temp_dir, CONFIG_NAME1, &config_builder1);
457 let mut cfg_sources = ConfigurationSources::new_empty();
458 cfg_sources.push_source(
459 ConfigurationSource::Dir(temp_dir.as_path_untracked().to_path_buf()),
460 MustRead::MustRead
461 );
462
463 let (module, mut rx) = create_module().await;
464 let (mut sighup_tx, sighup_rx) = mpsc::unbounded();
467 let runtime = rt.clone();
468 let () = rt.spawn(async move {
469 run_watcher(
470 runtime,
471 cfg_sources,
472 vec![Arc::downgrade(&module)],
473 true,
474 sighup_rx,
475 None,
476 ).await.unwrap();
477 }).unwrap();
478
479 config_builder1.logging().log_sensitive_information(true);
480 let _: PathBuf = write_config(&temp_dir, CONFIG_NAME1, &config_builder1);
481 sighup_tx.send(()).await.unwrap();
482 let config = rx.next().await.unwrap();
484 assert_eq!(config.0, config_builder1.build().unwrap());
485
486 let mut config_builder2 = ArtiConfigBuilder::default();
487 config_builder2.application().watch_configuration(true);
488 config_builder2.system().max_files(0_u64);
490 let _: PathBuf = write_config(&temp_dir, CONFIG_NAME2, &config_builder2);
491 let mut config_builder_combined = config_builder1.clone();
493 config_builder_combined.system().max_files(0_u64);
494 let config = rx.next().await.unwrap();
495 assert_eq!(config.0, config_builder_combined.build().unwrap());
496 config_builder2.logging().console("foo".to_string());
498 let mut config_builder_combined2 = config_builder_combined.clone();
499 config_builder_combined2.logging().console("foo".to_string());
500 let config3: PathBuf = write_config(&temp_dir, CONFIG_NAME3, &config_builder2);
501 let config = rx.next().await.unwrap();
502 assert_eq!(config.0, config_builder_combined2.build().unwrap());
503
504 std::fs::remove_file(config3).unwrap();
506 let config = rx.next().await.unwrap();
507 assert_eq!(config.0, config_builder_combined.build().unwrap());
508 });
509 }
510}