1use std::sync::Weak;
4use std::time::Duration;
5
6use anyhow::Context;
7use arti_client::config::Reconfigure;
8use arti_client::TorClient;
9use futures::{select_biased, FutureExt as _, Stream};
10use tor_config::file_watcher::{self, FileWatcherBuilder, FileEventSender, FileWatcher};
11use tor_config::{sources::FoundConfigFiles, ConfigurationSource, ConfigurationSources};
12use tor_rtcompat::Runtime;
13use tracing::{debug, error, info, warn};
14use futures::task::SpawnExt;
15use futures::StreamExt;
16
17#[cfg(target_family = "unix")]
18use crate::process::sighup_stream;
19
20#[cfg(not(target_family = "unix"))]
21use futures::stream;
22
23use crate::{ArtiCombinedConfig, ArtiConfig};
24
/// Delay applied after a file-watcher event is received and before the
/// configuration is reloaded.
///
/// `watch_for_config_changes` passes this to `run_watcher`, which sleeps for
/// this long and then discards any events that queued up in the meantime, so
/// that a burst of events results in a single reload.
const DEBOUNCE_INTERVAL: Duration = Duration::from_secs(1);
27
/// A component whose settings can be updated while the program is running.
///
/// The configuration watcher in this module holds [`Weak`] references to
/// implementors and calls
/// [`reconfigure`](ReconfigurableModule::reconfigure) on every one that is
/// still alive each time the configuration is reloaded.
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
pub(crate) trait ReconfigurableModule: Send + Sync {
    /// Apply the freshly loaded configuration `new` to this module.
    ///
    /// # Errors
    ///
    /// Returns an error if the module could not apply the new configuration.
    /// The reload code in this file stops at the first module that fails and
    /// logs the error as a warning.
    fn reconfigure(&self, new: &ArtiCombinedConfig) -> anyhow::Result<()>;
}
46
47#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
58pub(crate) fn watch_for_config_changes<R: Runtime>(
59 runtime: &R,
60 sources: ConfigurationSources,
61 config: &ArtiConfig,
62 modules: Vec<Weak<dyn ReconfigurableModule>>,
63) -> anyhow::Result<()> {
64 let watch_file = config.application().watch_configuration;
65
66 cfg_if::cfg_if! {
67 if #[cfg(target_family = "unix")] {
68 let sighup_stream = sighup_stream()?;
69 } else {
70 let sighup_stream = stream::pending();
71 }
72 }
73
74 let rt = runtime.clone();
75 let () = runtime.clone().spawn(async move {
76 let res: anyhow::Result<()> = run_watcher(
77 rt,
78 sources,
79 modules,
80 watch_file,
81 sighup_stream,
82 Some(DEBOUNCE_INTERVAL)
83 ).await;
84
85 match res {
86 Ok(()) => debug!("Config watcher task exiting"),
87 Err(e) => error!("Config watcher task exiting: {}", tor_error::Report(e)),
89 }
90 }).context("failed to spawn task")?;
91
92 Ok(())
93}
94
/// Event loop that reloads the configuration on SIGHUP or on file changes.
///
/// * `runtime` is used to build file watchers and to sleep while debouncing.
/// * `sources` are the configuration sources to (re)scan and load.
/// * `modules` are the modules to notify after each successful load.
/// * `watch_file` selects whether a file watcher is started up front.
/// * `sighup_stream` yields `()` for every reload request; when it ends, the
///   loop exits and this function returns `Ok(())`.
/// * `debounce_interval`, if set, is how long to wait after a file event
///   before reloading.
///
/// # Errors
///
/// Returns an error if the initial watcher cannot be set up, or if
/// `reload_configuration` fails.
async fn run_watcher<R: Runtime>(
    runtime: R,
    sources: ConfigurationSources,
    modules: Vec<Weak<dyn ReconfigurableModule>>,
    watch_file: bool,
    mut sighup_stream: impl Stream<Item = ()> + Unpin,
    debounce_interval: Option<Duration>,
) -> anyhow::Result<()> {
    // `tx` is cloned into every watcher we start; `rx` receives their events.
    let (tx, mut rx) = file_watcher::channel();
    let mut watcher = if watch_file {
        let mut watcher = FileWatcher::builder(runtime.clone());
        prepare(&mut watcher, &sources)?;
        Some(watcher.start_watching(tx.clone())?)
    } else {
        None
    };

    debug!("Entering FS event loop");

    loop {
        // The SIGHUP branch is listed first, so `select_biased!` checks it
        // before the file-event branch.
        select_biased! {
            event = sighup_stream.next().fuse() => {
                // The SIGHUP stream has ended: leave the loop.
                let Some(()) = event else {
                    break;
                };

                info!("Received SIGHUP");

                // The reload consumes the current watcher and returns the
                // one to keep using, which may be a new one or `None`.
                watcher = reload_configuration(
                    runtime.clone(),
                    watcher,
                    &sources,
                    &modules,
                    tx.clone()
                ).await?;
            },
            event = rx.next().fuse() => {
                // Give a burst of related events time to arrive.
                if let Some(debounce_interval) = debounce_interval {
                    runtime.sleep(debounce_interval).await;
                }

                // Throw away everything that queued up in the meantime, so
                // that the whole burst causes only one reload.
                while let Some(_ignore) = rx.try_recv() {
                }
                debug!("Config reload event {:?}: reloading configuration.", event);
                watcher = reload_configuration(
                    runtime.clone(),
                    watcher,
                    &sources,
                    &modules,
                    tx.clone()
                ).await?;
            },
        }
    }

    Ok(())
}
161
162async fn reload_configuration<R: Runtime>(
164 runtime: R,
165 mut watcher: Option<FileWatcher>,
166 sources: &ConfigurationSources,
167 modules: &[Weak<dyn ReconfigurableModule>],
168 tx: FileEventSender,
169) -> anyhow::Result<Option<FileWatcher>> {
170
171 let found_files = if watcher.is_some() {
172 let mut new_watcher = FileWatcher::builder(runtime.clone());
173 let found_files = prepare(&mut new_watcher, sources)
174 .context("FS watch: failed to rescan config and re-establish watch")?;
175 let new_watcher = new_watcher
176 .start_watching(tx.clone())
177 .context("FS watch: failed to start watching config")?;
178 watcher = Some(new_watcher);
179 found_files
180 } else {
181 sources
182 .scan()
183 .context("FS watch: failed to rescan config")?
184 };
185
186 match reconfigure(found_files, modules) {
187 Ok(watch) => {
188 info!("Successfully reloaded configuration.");
189 if watch && watcher.is_none() {
190 info!("Starting watching over configuration.");
191 let mut new_watcher = FileWatcher::builder(runtime.clone());
192 let _found_files = prepare(&mut new_watcher, sources).context(
193 "FS watch: failed to rescan config and re-establish watch: {}",
194 )?;
195 let new_watcher = new_watcher.start_watching(tx.clone()).context(
196 "FS watch: failed to rescan config and re-establish watch: {}",
197 )?;
198 watcher = Some(new_watcher);
199 } else if !watch && watcher.is_some() {
200 info!("Stopped watching over configuration.");
201 watcher = None;
202 }
203 }
204 Err(e) => warn!("Couldn't reload configuration: {}", tor_error::Report(e)),
206 }
207
208 Ok(watcher)
209}
210
211impl<R: Runtime> ReconfigurableModule for TorClient<R> {
212 fn reconfigure(&self, new: &ArtiCombinedConfig) -> anyhow::Result<()> {
213 TorClient::reconfigure(self, &new.1, Reconfigure::WarnOnFailures)?;
214 Ok(())
215 }
216}
217
/// Stand-in for the application itself, so that application-level settings
/// can take part in reconfiguration through [`ReconfigurableModule`].
pub(crate) struct Application {
    /// The configuration this value was created with.
    ///
    /// Later configurations are compared against it; the code in this file
    /// never updates it.
    original_config: ArtiConfig,
}
226
impl Application {
    /// Build an `Application` that remembers `cfg` as its original
    /// configuration.
    pub(crate) fn new(cfg: ArtiConfig) -> Self {
        Self {
            original_config: cfg,
        }
    }
}
236
237impl ReconfigurableModule for Application {
238 #[allow(clippy::cognitive_complexity)]
241 fn reconfigure(&self, new: &ArtiCombinedConfig) -> anyhow::Result<()> {
242 let original = &self.original_config;
243 let config = &new.0;
244
245 if config.proxy() != original.proxy() {
246 warn!("Can't (yet) reconfigure proxy settings while arti is running.");
247 }
248 if config.logging() != original.logging() {
249 warn!("Can't (yet) reconfigure logging settings while arti is running.");
250 }
251 #[cfg(feature = "rpc")]
252 if config.rpc != original.rpc {
253 warn!("Can't (yet) change RPC settings while arti is running.");
254 }
255 if config.application().permit_debugging && !original.application().permit_debugging {
256 warn!("Cannot disable application hardening when it has already been enabled.");
257 }
258
259 if !config.application().permit_debugging {
261 #[cfg(feature = "harden")]
262 crate::process::enable_process_hardening()?;
263 }
264
265 Ok(())
266 }
267}
268
269fn prepare<'a, R: Runtime>(
271 watcher: &mut FileWatcherBuilder<R>,
272 sources: &'a ConfigurationSources,
273) -> anyhow::Result<FoundConfigFiles<'a>> {
274 let sources = sources.scan()?;
275 for source in sources.iter() {
276 match source {
277 ConfigurationSource::Dir(dir) => watcher.watch_dir(dir, "toml")?,
278 ConfigurationSource::File(file) => watcher.watch_path(file)?,
279 ConfigurationSource::Verbatim(_) => {}
280 }
281 }
282 Ok(sources)
283}
284
285fn reconfigure(
293 found_files: FoundConfigFiles<'_>,
294 reconfigurable: &[Weak<dyn ReconfigurableModule>],
295) -> anyhow::Result<bool> {
296 let _ = reconfigurable;
297 let config = found_files.load()?;
298 let config = tor_config::resolve::<ArtiCombinedConfig>(config)?;
299
300 let reconfigurable = reconfigurable.iter().flat_map(Weak::upgrade);
302 let mut has_modules = false;
304
305 for module in reconfigurable {
306 has_modules = true;
307 module.reconfigure(&config)?;
308 }
309
310 Ok(has_modules && config.0.application().watch_configuration)
311}
312
313#[cfg(test)]
314mod test {
315 #![allow(clippy::bool_assert_comparison)]
317 #![allow(clippy::clone_on_copy)]
318 #![allow(clippy::dbg_macro)]
319 #![allow(clippy::mixed_attributes_style)]
320 #![allow(clippy::print_stderr)]
321 #![allow(clippy::print_stdout)]
322 #![allow(clippy::single_char_pattern)]
323 #![allow(clippy::unwrap_used)]
324 #![allow(clippy::unchecked_duration_subtraction)]
325 #![allow(clippy::useless_vec)]
326 #![allow(clippy::needless_pass_by_value)]
327 use crate::ArtiConfigBuilder;
330
331 use super::*;
332 use futures::channel::mpsc;
333 use futures::SinkExt as _;
334 use tor_config::sources::MustRead;
335 use std::path::PathBuf;
336 use std::sync::{Arc, Mutex};
337 use test_temp_dir::{test_temp_dir, TestTempDir};
338 use postage::watch;
339 use tor_async_utils::PostageWatchSenderExt;
340
341 const CONFIG_NAME1: &str = "config1.toml";
343 const CONFIG_NAME2: &str = "config2.toml";
345 const CONFIG_NAME3: &str = "config3.toml";
347
    /// Test double that publishes every configuration it is handed.
    struct TestModule {
        /// Sending half of the watch channel the test observes.
        tx: Arc<Mutex<watch::Sender<ArtiCombinedConfig>>>,
    }
352
    impl ReconfigurableModule for TestModule {
        /// Publish a clone of `new` on the watch channel; never fails.
        fn reconfigure(&self, new: &ArtiCombinedConfig) -> anyhow::Result<()> {
            let config = new.clone();
            self.tx.lock().unwrap().maybe_send(|_| config);

            Ok(())
        }
    }
361
    /// Create a [`TestModule`] together with the receiver on which the test
    /// can observe the configurations the module is given.
    async fn create_module(
    ) -> (Arc<dyn ReconfigurableModule>, watch::Receiver<ArtiCombinedConfig>) {
        let (tx, mut rx) = watch::channel();
        // Consume the value that is already available on the new receiver,
        // so that later reads only see values sent by the module.
        let _: ArtiCombinedConfig = rx.next().await.unwrap();

        (Arc::new(TestModule { tx: Arc::new(Mutex::new(tx)) }), rx)
    }
374
375 fn write_file(dir: &TestTempDir, name: &str, data: &[u8]) -> PathBuf {
377 let tmp = dir.as_path_untracked().join("tmp");
378 std::fs::write(&tmp, data).unwrap();
379 let path = dir.as_path_untracked().join(name);
380 std::fs::rename(tmp, &path).unwrap();
382 path
383 }
384
385 fn write_config(dir: &TestTempDir, name: &str, config: &ArtiConfigBuilder) -> PathBuf {
387 let s = toml::to_string(&config).unwrap();
388 write_file(dir, name, s.as_bytes())
389 }
390
    /// A single watched file: a reload requested through the SIGHUP stream
    /// and a reload caused only by rewriting the file must both deliver the
    /// new configuration to the module.
    #[test]
    #[ignore] fn watch_single_file() {
        tor_rtcompat::test_with_one_runtime!(|rt| async move {
            let temp_dir = test_temp_dir!();
            let mut config_builder = ArtiConfigBuilder::default();
            config_builder.application().watch_configuration(true);

            let cfg_file = write_config(&temp_dir, CONFIG_NAME1, &config_builder);
            let mut cfg_sources = ConfigurationSources::new_empty();
            cfg_sources.push_source(ConfigurationSource::File(cfg_file), MustRead::MustRead);

            let (module, mut rx) = create_module().await;

            // Stands in for the real SIGHUP stream.
            let (mut sighup_tx, sighup_rx) = mpsc::unbounded();
            let runtime = rt.clone();
            let () = rt.spawn(async move {
                run_watcher(
                    runtime,
                    cfg_sources,
                    vec![Arc::downgrade(&module)],
                    true,
                    sighup_rx,
                    None,
                ).await.unwrap();
            }).unwrap();

            // Change the file and request a reload explicitly.
            config_builder.logging().log_sensitive_information(true);
            let _: PathBuf = write_config(&temp_dir, CONFIG_NAME1, &config_builder);
            sighup_tx.send(()).await.unwrap();
            let config = rx.next().await.unwrap();
            assert_eq!(config.0, config_builder.build().unwrap());

            // Change the file again without requesting a reload: the next
            // value has to come from the file watcher.
            config_builder.logging().log_sensitive_information(false);
            let _: PathBuf = write_config(&temp_dir, CONFIG_NAME1, &config_builder);
            let config = rx.next().await.unwrap();
            assert_eq!(config.0, config_builder.build().unwrap());

        });
    }
436
    /// A watched directory: adding, extending and removing configuration
    /// files must each deliver the expected combined configuration to the
    /// module.
    #[test]
    fn watch_multiple() {
        tor_rtcompat::test_with_one_runtime!(|rt| async move {
            let temp_dir = test_temp_dir!();
            let mut config_builder1 = ArtiConfigBuilder::default();
            config_builder1.application().watch_configuration(true);

            let _: PathBuf = write_config(&temp_dir, CONFIG_NAME1, &config_builder1);
            // The whole temporary directory is the configuration source.
            let mut cfg_sources = ConfigurationSources::new_empty();
            cfg_sources.push_source(
                ConfigurationSource::Dir(temp_dir.as_path_untracked().to_path_buf()),
                MustRead::MustRead
            );

            let (module, mut rx) = create_module().await;
            // Stands in for the real SIGHUP stream.
            let (mut sighup_tx, sighup_rx) = mpsc::unbounded();
            let runtime = rt.clone();
            let () = rt.spawn(async move {
                run_watcher(
                    runtime,
                    cfg_sources,
                    vec![Arc::downgrade(&module)],
                    true,
                    sighup_rx,
                    None,
                ).await.unwrap();
            }).unwrap();

            // Change the first file and request a reload explicitly.
            config_builder1.logging().log_sensitive_information(true);
            let _: PathBuf = write_config(&temp_dir, CONFIG_NAME1, &config_builder1);
            sighup_tx.send(()).await.unwrap();
            let config = rx.next().await.unwrap();
            assert_eq!(config.0, config_builder1.build().unwrap());

            // Add a second file; the expected result is the first file's
            // settings plus the `max_files` setting from the second.
            let mut config_builder2 = ArtiConfigBuilder::default();
            config_builder2.application().watch_configuration(true);
            config_builder2.system().max_files(0_u64);
            let _: PathBuf = write_config(&temp_dir, CONFIG_NAME2, &config_builder2);
            let mut config_builder_combined = config_builder1.clone();
            config_builder_combined.system().max_files(0_u64);
            let config = rx.next().await.unwrap();
            assert_eq!(config.0, config_builder_combined.build().unwrap());
            // Add a third file that additionally sets the console log
            // option.
            config_builder2.logging().console("foo".to_string());
            let mut config_builder_combined2 = config_builder_combined.clone();
            config_builder_combined2.logging().console("foo".to_string());
            let config3: PathBuf = write_config(&temp_dir, CONFIG_NAME3, &config_builder2);
            let config = rx.next().await.unwrap();
            assert_eq!(config.0, config_builder_combined2.build().unwrap());

            // Removing the third file must bring back the previous combined
            // configuration.
            std::fs::remove_file(config3).unwrap();
            let config = rx.next().await.unwrap();
            assert_eq!(config.0, config_builder_combined.build().unwrap());
        });
    }
498}