//! Implement a cache for onion descriptors and the facility to remember a bit
//! about onion service history.

use std::fmt::Debug;
use std::mem;
use std::panic::AssertUnwindSafe;
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::{Duration, Instant};

use futures::task::{SpawnError, SpawnExt as _};
use futures::FutureExt as _;

use async_trait::async_trait;
use educe::Educe;
use either::Either::{self, *};
use postage::stream::Stream as _;
use tracing::{debug, error, trace};

use safelog::sensitive as sv;
use tor_basic_utils::define_accessor_trait;
use tor_circmgr::isolation::Isolation;
use tor_error::{debug_report, error_report, internal, Bug, ErrorReport as _};
use tor_hscrypto::pk::HsId;
use tor_netdir::NetDir;
use tor_rtcompat::Runtime;

use crate::isol_map;
use crate::{ConnError, HsClientConnector, HsClientSecretKeys};

slotmap_careful::new_key_type! {
    struct TableIndex;
}

/// Configuration, currently just some retry parameters
#[derive(Default, Debug)]
// This is not really public.
// It has to be `pub` because it appears in one of the methods in `MockableConnectorData`.
// That has to be because that trait is a bound on a parameter for `HsClientConnector`.
// `Config` is not re-exported.  (This is isomorphic to the trait sealing pattern.)
//
// This means that this struct cannot live in the crate root, so we put it here.
pub struct Config {
    /// Retry parameters
    pub(crate) retry: tor_circmgr::CircuitTiming,
}

define_accessor_trait! {
    /// Configuration for an HS client connector
    ///
    /// If the HS client connector gains new configurabilities, this trait will gain additional
    /// supertraits, as an API break.
    ///
    /// Prefer to use `TorClientConfig`, which will always implement this trait.
    //
    // This arrangement is very like that for `CircMgrConfig`.
    pub trait HsClientConnectorConfig {
        circuit_timing: tor_circmgr::CircuitTiming,
    }
}
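
// Note: `define_accessor_trait!` (from `tor_basic_utils`) turns the field
// list above into accessor methods.  Roughly (a sketch, not the exact
// expansion):
//
// ```rust,ignore
// pub trait HsClientConnectorConfig {
//     fn circuit_timing(&self) -> &tor_circmgr::CircuitTiming;
// }
// ```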

/// Number of times we're willing to iterate round the state machine loop
///
/// **Not** the number of retries of failed descriptor downloads, circuits, etc.
///
/// The state machine loop is a condition variable loop.
/// It repeatedly transforms the [`ServiceState`] to try to get to `Open`,
/// converting stale data to `Closed` and `Closed` to `Working`, and so on.
/// This ought only to go forwards, so in principle we could use an infinite loop.
/// But if we have a logic error, we want to crash eventually.
/// The `rechecks` counter is for detecting such a situation.
///
/// This is fairly arbitrary, but we shouldn't get anywhere near it.
///
/// Note that this is **not** a number of operational retries
/// of fallible retriable operations.
/// Such retries are handled in [`connect.rs`](crate::connect).
const MAX_RECHECKS: u32 = 10;

/// C Tor `MaxCircuitDirtiness`
///
/// As per
///    <https://gitlab.torproject.org/tpo/core/arti/-/issues/913#note_2914433>
///
/// And C Tor's `tor(1)`, which says:
///
/// > MaxCircuitDirtiness NUM
/// >
/// > Feel free to reuse a circuit that was first used at most NUM
/// > seconds ago, but never attach a new stream to a circuit that is
/// > too old.  For hidden services, this applies to the last time a
/// > circuit was used, not the first.  Circuits with streams
/// > constructed with SOCKS authentication via SocksPorts that have
/// > KeepAliveIsolateSOCKSAuth also remain alive for
/// > MaxCircuitDirtiness seconds after carrying the last such
/// > stream. (Default: 10 minutes)
///
/// However, we're not entirely sure this is the right behaviour.
/// See <https://gitlab.torproject.org/tpo/core/arti/-/issues/916>
///
// TODO SPEC: Explain C Tor `MaxCircuitDirtiness` behaviour
//
// TODO HS CFG: This should be configurable somehow
const RETAIN_CIRCUIT_AFTER_LAST_USE: Duration = Duration::from_secs(10 * 60);

/// How long to retain cached data about a hidden service
///
/// This is simply to reclaim space, not for correctness.
/// So we only check this during housekeeping, not operation.
///
/// The starting point for this interval is the last time we used the data,
/// or a circuit derived from it.
///
/// Note that this is a *maximum* for the length of time we will retain a descriptor;
/// HS descriptors' lifetimes (as declared in the descriptor) *are* honoured;
/// but that's done by the code in `connect.rs`, not here.
///
/// We're not sure this is the right value.
/// See <https://gitlab.torproject.org/tpo/core/arti/-/issues/916>
//
// TODO SPEC: State how long IPT and descriptor data should be retained after use
//
// TODO HS CFG: Perhaps this should be configurable somehow?
const RETAIN_DATA_AFTER_LAST_USE: Duration = Duration::from_secs(48 * 3600 /*hours*/);

/// Hidden services, our connections to them, and history of connections, etc.
///
/// Table containing state of our ideas about services.
/// Data structure is keyed (indexed) by:
///  * `HsId`, hidden service identity
///  * any secret keys we are to use
///  * circuit isolation
///
/// We treat different values for any of the above as completely independent,
/// except that we try isolation joining (narrowing) if everything else matches.
///
/// In other words,
///  * Two HS connection requests cannot share state and effort
///    (descriptor downloads, descriptors, intro pt history)
///    unless the restricted discovery keys to be used are the same.
///  * This criterion is checked before looking at isolations,
///    which may further restrict sharing:
///    Two HS connection requests will only share state subject to isolations.
///
/// Here "state and effort" includes underlying circuits such as hsdir circuits,
/// since each HS connection state will use `launch_specific_isolated` for those.
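///
/// For example (cf. the `coalesce` test below): with the same `HsId` and the
/// same secret keys, a request isolated as "a" can share a circuit with an
/// unisolated request (the joint isolation narrows to "a"), but a request
/// isolated as "b" must then get a fresh circuit.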
#[derive(Default, Debug)]
pub(crate) struct Services<D: MockableConnectorData> {
    /// The actual records of our connections/attempts for each service, as separated
    records: isol_map::MultikeyIsolatedMap<TableIndex, HsId, HsClientSecretKeys, ServiceState<D>>,

    /// Configuration
    ///
    /// `Arc` so that it can be shared with individual hs connector tasks
    config: Arc<Config>,
}

/// Entry in the 2nd-level lookup array
#[allow(dead_code)] // This alias is here for documentation if nothing else
type ServiceRecord<D> = isol_map::Record<HsClientSecretKeys, ServiceState<D>>;

/// Value in the `Services` data structure
///
/// State and history of our connections, including a connection to any connection task.
///
/// `last_used` is used to expire data eventually.
//
// TODO unify this with channels and circuits.  See arti#778.
#[derive(Educe)]
#[educe(Debug)]
enum ServiceState<D: MockableConnectorData> {
    /// We don't have a circuit
    Closed {
        /// The state
        data: D,
        /// Last time we touched this, including reuse
        last_used: Instant,
    },
    /// We have an open circuit, which we can (hopefully) just use
    Open {
        /// The state
        data: D,
        /// The circuit
        #[educe(Debug(ignore))]
        circuit: Arc<D::ClientCirc>,
        /// Last time we touched this, including reuse
        ///
        /// This is set when we created the circuit, and updated when we
        /// hand out this circuit again in response to a new request.
        ///
        /// We believe this mirrors C Tor behaviour;
        /// see [`RETAIN_CIRCUIT_AFTER_LAST_USE`].
        last_used: Instant,
        /// We have a task that will close the circuit when required
        ///
        /// This field serves to require construction sites of Open
        /// to demonstrate that there *is* an expiry task.
        /// In the future, it may also serve to cancel old expiry tasks.
        circuit_expiry_task: CircuitExpiryTask,
    },
    /// We have a task trying to find the service and establish the circuit
    ///
    /// The cached data (`D`) is owned by the task.
    Working {
        /// Signals instances of `get_or_launch_connection` when the task completes
        barrier_recv: postage::barrier::Receiver,
        /// Where the task will store the error.
        ///
        /// Lock hierarchy: this lock is "inside" the big lock on `Services`.
        error: Arc<Mutex<Option<ConnError>>>,
    },
    /// Dummy value for use with temporary `mem::replace`
    Dummy,
}
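
// Informal summary of the state transitions implemented below:
//
//    Closed --(request arrives)--> Working --(connect succeeds)--> Open
//    Working --(task failed; next request notices)--> Closed (fresh blank state)
//    Open --(circuit broken, or expiry task fires)--> Closed
//
// `Dummy` appears only transiently, while a state is taken via `mem::replace`.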

impl<D: MockableConnectorData> ServiceState<D> {
    /// Make a new (blank) `ServiceState::Closed`
    fn blank(runtime: &impl Runtime) -> Self {
        ServiceState::Closed {
            data: D::default(),
            last_used: runtime.now(),
        }
    }
}

/// "Continuation" return type from `obtain_circuit_or_continuation_info`
type Continuation = (Arc<Mutex<Option<ConnError>>>, postage::barrier::Receiver);

/// Represents a task which is waiting to see when the circuit needs to be expired
///
/// TODO: Replace this with a task handle that cancels the task when dropped.
/// Until then, if the circuit is closed earlier, the expiry task will
/// uselessly wake up some time later.
#[derive(Debug)] // Not Clone
struct CircuitExpiryTask {}
// We impl Drop already, partly to allow explicit drop(CircuitExpiryTask) without clippy complaint
impl Drop for CircuitExpiryTask {
    fn drop(&mut self) {}
}

/// Obtain a circuit from the `Services` table, or return a continuation
///
/// This is the workhorse function for `get_or_launch_connection`.
///
/// `get_or_launch_connection`, together with `obtain_circuit_or_continuation_info`,
/// form a condition variable loop:
///
/// We check to see if we have a circuit.  If so, we return it.
/// Otherwise, we make sure that a circuit is being constructed,
/// and then go into a condvar wait;
/// we'll be signaled when the construction completes.
///
/// So the connection task we spawn does not return the circuit, or error,
/// via an inter-task stream.
/// It stores it in the data structure and wakes up all the client tasks.
/// (This means there is only one success path for the client task code.)
///
/// There are some wrinkles:
///
/// ### Existence of this as a separate function
///
/// The usual structure for a condition variable loop would be something like this:
///
/// ```rust,ignore
/// loop {
///    test state and maybe break;
///    cv.wait(guard).await; // consumes guard, unlocking after enqueueing us as a waiter
///    guard = lock();
/// }
/// ```
///
/// However, Rust does not currently understand that the mutex is not
/// actually a captured variable held across an await point,
/// when the variable is consumed before the await, and re-stored afterwards.
/// As a result, the async future becomes erroneously `!Send`:
/// <https://github.com/rust-lang/rust/issues/104883>.
/// We want the unstable feature `-Zdrop-tracking`:
/// <https://github.com/rust-lang/rust/issues/97331>.
///
/// Instead, to convince the compiler, we must use a scope-based drop of the mutex guard.
/// That means converting the "test state and maybe break" part into a sub-function.
/// That's what this function is.
///
/// It returns `Right` if the loop should be exited, returning the circuit to the caller.
/// It returns `Left` if the loop needs to do a condition variable wait.
///
/// ### We're using a barrier as a condition variable
///
/// We want to be signaled when the task exits.  Indeed, *only* when it exits.
/// This functionality is most conveniently provided by a `postage::barrier`.
///
/// ### Nested loops
///
/// Sometimes we want to go round again *without* unlocking.
/// Sometimes we must unlock and wait and relock.
///
/// The drop tracking workaround (see above) means we have to do these two
/// in separate scopes.
/// So there are two nested loops: one here, and one in `get_or_launch_connection`.
/// They both use the same backstop rechecks counter.
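///
/// So the caller's loop ends up with this shape (a simplified sketch of the
/// code in `get_or_launch_connection`, not the exact text):
///
/// ```rust,ignore
/// loop {
///     let guard = lock();
///     // The "test state and maybe break" sub-function consumes the guard:
///     match obtain_circuit_or_continuation_info(.., guard)? {
///         Right(circuit) => return Ok(circuit),
///         Left((error, mut barrier_recv)) => {
///             barrier_recv.recv().await; // wait for the connection task
///             // then check `error` and go round again
///         }
///     }
/// }
/// ```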
fn obtain_circuit_or_continuation_info<D: MockableConnectorData>(
    connector: &HsClientConnector<impl Runtime, D>,
    netdir: &Arc<NetDir>,
    hsid: &HsId,
    secret_keys: &HsClientSecretKeys,
    table_index: TableIndex,
    rechecks: &mut impl Iterator,
    mut guard: MutexGuard<'_, Services<D>>,
) -> Result<Either<Continuation, Arc<D::ClientCirc>>, ConnError> {
    let blank_state = || ServiceState::blank(&connector.runtime);

    for _recheck in rechecks {
        let record = guard
            .records
            .by_index_mut(table_index)
            .ok_or_else(|| internal!("guard table entry vanished!"))?;
        let state = &mut **record;

        trace!("HS conn state: {state:?}");

        let (data, barrier_send) = match state {
            ServiceState::Open {
                data: _,
                circuit,
                last_used,
                circuit_expiry_task: _,
            } => {
                let now = connector.runtime.now();
                if !D::circuit_is_ok(circuit) {
                    // Well that's no good, we need a fresh one, but keep the data
                    let data = match mem::replace(state, ServiceState::Dummy) {
                        ServiceState::Open {
                            data,
                            last_used: _,
                            circuit: _,
                            circuit_expiry_task: _,
                        } => data,
                        _ => panic!("state changed between matches"),
                    };
                    *state = ServiceState::Closed {
                        data,
                        last_used: now,
                    };
                    continue;
                }
                *last_used = now;
                // No need to tell expiry task about revised expiry time;
                // it will see the new last_used when it wakes up at the old expiry time.

                return Ok::<_, ConnError>(Right(circuit.clone()));
            }
            ServiceState::Working {
                barrier_recv,
                error,
            } => {
                if !matches!(
                    barrier_recv.try_recv(),
                    Err(postage::stream::TryRecvError::Pending)
                ) {
                    // This information is stale; the task no longer exists.
                    // We want information from a fresh task.
                    *state = blank_state();
                    continue;
                }
                let barrier_recv = barrier_recv.clone();

                // This clone of the error field Arc<Mutex<..>> allows us to collect errors
                // which happened due to the currently-running task, which we have just
                // found exists.  Ie, it will see errors that occurred after we entered
                // `get_or_launch`.  Stale errors, from previous tasks, were cleared above.
                let error = error.clone();

                // Wait for the task to complete (at which point it drops the barrier)
                return Ok(Left((error, barrier_recv)));
            }
            ServiceState::Closed { .. } => {
                let (barrier_send, barrier_recv) = postage::barrier::channel();
                let data = match mem::replace(
                    state,
                    ServiceState::Working {
                        barrier_recv,
                        error: Arc::new(Mutex::new(None)),
                    },
                ) {
                    ServiceState::Closed { data, .. } => data,
                    _ => panic!("state changed between matches"),
                };
                (data, barrier_send)
            }
            ServiceState::Dummy => {
                *state = blank_state();
                return Err(internal!("HS connector found dummy state").into());
            }
        };

        // Make a connection
        let runtime = &connector.runtime;
        let connector = (*connector).clone();
        let config = guard.config.clone();
        let netdir = netdir.clone();
        let secret_keys = secret_keys.clone();
        let hsid = *hsid;
        let connect_future = async move {
            let mut data = data;

            let got = AssertUnwindSafe(D::connect(
                &connector,
                netdir,
                config,
                hsid,
                &mut data,
                secret_keys,
            ))
            .catch_unwind()
            .await
            .unwrap_or_else(|_| {
                data = D::default();
                Err(internal!("hidden service connector task panicked!").into())
            });
            let now = connector.runtime.now();
            let last_used = now;

            let got = got.and_then(|circuit| {
                let circuit_expiry_task = ServiceState::spawn_circuit_expiry_task(
                    &connector,
                    hsid,
                    table_index,
                    last_used,
                    now,
                )
                .map_err(|cause| ConnError::Spawn {
                    spawning: "circuit expiry task",
                    cause: cause.into(),
                })?;
                Ok((circuit, circuit_expiry_task))
            });

            let got_error = got.as_ref().map(|_| ()).map_err(Clone::clone);

            // Block for handling inability to store the result
            let stored = async {
                let mut guard = connector.services()?;
                let record = guard
                    .records
                    .by_index_mut(table_index)
                    .ok_or_else(|| internal!("HS table entry removed while task running"))?;
                // Always match this, so we check what we're overwriting
                let state = &mut **record;
                let error_store = match state {
                    ServiceState::Working { error, .. } => error,
                    _ => return Err(internal!("HS task found state other than Working")),
                };

                match got {
                    Ok((circuit, circuit_expiry_task)) => {
                        *state = ServiceState::Open {
                            data,
                            circuit,
                            last_used,
                            circuit_expiry_task,
                        }
                    }
                    Err(error) => {
                        let mut error_store = error_store
                            .lock()
                            .map_err(|_| internal!("Working error poisoned, cannot store error"))?;
                        *error_store = Some(error);
                    }
                };

                Ok(())
            }
            .await;

            match (got_error, stored) {
                (Ok::<(), ConnError>(()), Ok::<(), Bug>(())) => {}
                (Err(got_error), Ok(())) => {
                    debug_report!(got_error, "HS connection failure for {}", sv(hsid));
                }
                (Ok(()), Err(bug)) => {
                    error_report!(
                        bug,
                        "internal error storing built HS circuit for {}",
                        sv(hsid)
                    );
                }
                (Err(got_error), Err(bug)) => {
                    // We're reporting two errors, so we'll construct the event
                    // manually.
                    error!(
                        "internal error storing HS connection error for {}: {}; {}",
                        sv(hsid),
                        got_error.report(),
                        bug.report(),
                    );
                }
            };
            drop(barrier_send);
        };
        runtime
            .spawn_obj(Box::new(connect_future).into())
            .map_err(|cause| ConnError::Spawn {
                spawning: "connection task",
                cause: cause.into(),
            })?;
    }

    Err(internal!("HS connector state management malfunction (exceeded MAX_RECHECKS)").into())
}

impl<D: MockableConnectorData> Services<D> {
    /// Create a new empty `Services`
    pub(crate) fn new(config: Config) -> Self {
        Services {
            records: Default::default(),
            config: Arc::new(config),
        }
    }

    /// Connect to a hidden service
    // We *do* drop guard.  There is *one* await point, just after drop(guard).
    pub(crate) async fn get_or_launch_connection(
        connector: &HsClientConnector<impl Runtime, D>,
        netdir: &Arc<NetDir>,
        hs_id: HsId,
        isolation: Box<dyn Isolation>,
        secret_keys: HsClientSecretKeys,
    ) -> Result<Arc<D::ClientCirc>, ConnError> {
        let blank_state = || ServiceState::blank(&connector.runtime);

        let mut rechecks = 0..MAX_RECHECKS;

        let mut obtain = |table_index, guard| {
            obtain_circuit_or_continuation_info(
                connector,
                netdir,
                &hs_id,
                &secret_keys,
                table_index,
                &mut rechecks,
                guard,
            )
        };

        let mut got;
        let table_index;
        {
            let mut guard = connector.services()?;
            let services = &mut *guard;

            trace!("HS conn get_or_launch: {hs_id:?} {isolation:?} {secret_keys:?}");
            //trace!("HS conn services: {services:?}");

            table_index =
                services
                    .records
                    .index_or_insert_with(&hs_id, &secret_keys, isolation, blank_state);

            let guard = guard;
            got = obtain(table_index, guard);
        }
        loop {
            // The parts of this loop which run after a `Left` is returned
            // logically belong in the case in `obtain_circuit_or_continuation_info`
            // for `ServiceState::Working`, where that function decides we need to wait.
            // This code has to be out here to help the compiler's drop tracking.
            {
                // Block to scope the acquisition of `error`, a guard
                // for the mutex-protected error field in the state,
                // and, for neatness, barrier_recv.

                let (error, mut barrier_recv) = match got? {
                    Right(ret) => return Ok(ret),
                    Left(continuation) => continuation,
                };

                barrier_recv.recv().await;

                let error = error
                    .lock()
                    .map_err(|_| internal!("Working error poisoned"))?;
                if let Some(error) = &*error {
                    return Err(error.clone());
                }
            }

            let guard = connector.services()?;

            got = obtain(table_index, guard);
        }
    }

    /// Perform housekeeping - delete data we aren't interested in any more
    pub(crate) fn run_housekeeping(&mut self, now: Instant) {
        self.expire_old_data(now);
    }

    /// Delete data we aren't interested in any more
    fn expire_old_data(&mut self, now: Instant) {
        self.records
            .retain(|hsid, record, _table_index| match &**record {
                ServiceState::Closed { data: _, last_used } => {
                    let Some(expiry_time) = last_used.checked_add(RETAIN_DATA_AFTER_LAST_USE)
                    else {
                        return false;
                    };
                    now <= expiry_time
                }
                ServiceState::Open { .. } | ServiceState::Working { .. } => true,
                ServiceState::Dummy => {
                    error!(
                        "bug: found dummy data during HS housekeeping, for {}",
                        sv(hsid)
                    );
                    false
                }
            });
    }
}

impl<D: MockableConnectorData> ServiceState<D> {
    /// Spawn a task that will drop our reference to the rendezvous circuit
    /// at `table_index` when it has gone too long without any use.
    ///
    /// According to [`RETAIN_CIRCUIT_AFTER_LAST_USE`].
    //
    // As it happens, this function is always called with `last_used` equal to `now`,
    // but we pass separate arguments for clarity.
    fn spawn_circuit_expiry_task(
        connector: &HsClientConnector<impl Runtime, D>,
        hsid: HsId,
        table_index: TableIndex,
        last_used: Instant,
        now: Instant,
    ) -> Result<CircuitExpiryTask, SpawnError> {
        /// Returns the duration until expiry, or `None` if it should expire now
        fn calculate_expiry_wait(last_used: Instant, now: Instant) -> Option<Duration> {
            let expiry = last_used
                .checked_add(RETAIN_CIRCUIT_AFTER_LAST_USE)
                .or_else(|| {
                    error!("bug: time overflow calculating HS circuit expiry, killing circuit!");
                    None
                })?;
            let wait = expiry.checked_duration_since(now).unwrap_or_default();
            if wait == Duration::ZERO {
                return None;
            }
            Some(wait)
        }

        let mut maybe_wait = calculate_expiry_wait(last_used, now);
        let () = connector.runtime.spawn({
            let connector = connector.clone();
            async move {
                // This loop is slightly odd.  The wait ought naturally to be at the end,
                // but that would mean a useless re-lock and re-check right after creation,
                // or jumping into the middle of the loop.
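                //
                // Worked example: suppose the circuit was last used at time T.
                // We sleep until T + RETAIN_CIRCUIT_AFTER_LAST_USE; if a request
                // reused the circuit at, say, T+9min, we wake, observe the newer
                // last_used, and sleep again until T+9min plus the retention
                // period.  Only when a full retention period passes with no
                // reuse do we actually expire the circuit.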
                loop {
                    if let Some(yes_wait) = maybe_wait {
                        connector.runtime.sleep(yes_wait).await;
                    }
                    // If it's None, we can't rely on that to say we should expire it,
                    // since that information crossed a time when we didn't hold the lock.

                    let Ok(mut guard) = connector.services() else {
                        break;
                    };
                    let Some(record) = guard.records.by_index_mut(table_index) else {
                        break;
                    };
                    let state = &mut **record;
                    let last_used = match state {
                        ServiceState::Closed { .. } => break,
                        ServiceState::Open { last_used, .. } => *last_used,
                        ServiceState::Working { .. } => break, // someone else will respawn
                        ServiceState::Dummy => break,          // someone else will (report and) fix
                    };
                    maybe_wait = calculate_expiry_wait(last_used, connector.runtime.now());
                    if maybe_wait.is_none() {
                        match mem::replace(state, ServiceState::Dummy) {
                            ServiceState::Open {
                                data,
                                circuit,
                                last_used,
                                circuit_expiry_task,
                            } => {
                                debug!("HS connection expires: {hsid}");
                                drop(circuit);
                                drop(circuit_expiry_task); // that's us
                                *state = ServiceState::Closed { data, last_used };
                                break;
                            }
                            _ => panic!("state now {state:?} even though we just saw it Open"),
                        }
                    }
                }
            }
        })?;
        Ok(CircuitExpiryTask {})
    }
}

/// Mocking for actual HS connection work, to let us test the `Services` state machine
//
// Does *not* mock circmgr, chanmgr, etc. - those won't be used by the tests, since our
// `connect` won't call them.  But mocking them pollutes many types with `R` and is
// generally tiresome.  So let's not.  Instead the tests can make dummy ones.
//
// This trait is actually crate-private, since it isn't re-exported, but it must
// be `pub` because it appears as a default for a type parameter in HsClientConnector.
#[async_trait]
pub trait MockableConnectorData: Default + Debug + Send + Sync + 'static {
    /// Client circuit
    type ClientCirc: Sync + Send + 'static;

    /// Mock state
    type MockGlobalState: Clone + Sync + Send + 'static;

    /// Connect
    async fn connect<R: Runtime>(
        connector: &HsClientConnector<R, Self>,
        netdir: Arc<NetDir>,
        config: Arc<Config>,
        hsid: HsId,
        data: &mut Self,
        secret_keys: HsClientSecretKeys,
    ) -> Result<Arc<Self::ClientCirc>, ConnError>;

    /// Is circuit OK?  Ie, not `.is_closing()`.
    fn circuit_is_ok(circuit: &Self::ClientCirc) -> bool;
}
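
// For a minimal example implementation of this trait, see `MockData` in the
// test module below: its `connect` just counts calls and waits on a mock
// "give" channel before producing a fresh `MockCirc`.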

#[cfg(test)]
pub(crate) mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use crate::*;
    use futures::{poll, SinkExt};
    use std::fmt;
    use std::task::Poll::{self, *};
    use tokio::pin;
    use tokio_crate as tokio;
    use tor_memquota::ArcMemoryQuotaTrackerExt as _;
    use tor_proto::memquota::ToplevelAccount;
    use tor_rtcompat::{test_with_one_runtime, SleepProvider};
    use tor_rtmock::MockRuntime;
    use tracing_test::traced_test;

    use ConnError as E;

    #[derive(Debug, Default)]
    struct MockData {
        connect_called: usize,
    }

    /// Type indicating what our `connect()` should return; it always makes a fresh MockCirc
    type MockGive = Poll<Result<(), E>>;

    #[derive(Debug, Clone)]
    struct MockGlobalState {
        // things will appear here when we have more sophisticated tests
        give: postage::watch::Receiver<MockGive>,
    }

    #[derive(Clone, Educe)]
    #[educe(Debug)]
    struct MockCirc {
        #[educe(Debug(method = "debug_arc_mutex"))]
        ok: Arc<Mutex<bool>>,
        connect_called: usize,
    }

    fn debug_arc_mutex(val: &Arc<Mutex<impl Debug>>, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "@{:?}", Arc::as_ptr(val))?;
        let guard = val.lock();
        let guard = guard.or_else(|g| {
            write!(f, ",POISON")?;
            Ok::<_, fmt::Error>(g.into_inner())
        })?;
        write!(f, " ")?;
        Debug::fmt(&*guard, f)
    }

    impl PartialEq for MockCirc {
        fn eq(&self, other: &MockCirc) -> bool {
            Arc::ptr_eq(&self.ok, &other.ok)
        }
    }

    impl MockCirc {
        fn new(connect_called: usize) -> Self {
            let ok = Arc::new(Mutex::new(true));
            MockCirc { ok, connect_called }
        }
    }

    #[async_trait]
    impl MockableConnectorData for MockData {
        type ClientCirc = MockCirc;
        type MockGlobalState = MockGlobalState;

        async fn connect<R: Runtime>(
            connector: &HsClientConnector<R, MockData>,
            _netdir: Arc<NetDir>,
            _config: Arc<Config>,
            _hsid: HsId,
            data: &mut MockData,
            _secret_keys: HsClientSecretKeys,
        ) -> Result<Arc<Self::ClientCirc>, E> {
            data.connect_called += 1;
            let make = {
                let connect_called = data.connect_called;
                move |()| Arc::new(MockCirc::new(connect_called))
            };
            let mut give = connector.mock_for_state.give.clone();
            if let Ready(ret) = &*give.borrow() {
                return ret.clone().map(make);
            }
            loop {
                match give.recv().await.expect("EOF on mock_global_state stream") {
                    Pending => {}
                    Ready(ret) => return ret.map(make),
                }
            }
        }

        fn circuit_is_ok(circuit: &Self::ClientCirc) -> bool {
            *circuit.ok.lock().unwrap()
        }
    }

    /// Makes a non-empty `HsClientSecretKeys`, containing (somehow) `kk`
    fn mk_keys(kk: u8) -> HsClientSecretKeys {
        let mut ss = [0_u8; 32];
        ss[0] = kk;
        let keypair = tor_llcrypto::pk::ed25519::Keypair::from_bytes(&ss);
        let mut b = HsClientSecretKeysBuilder::default();
        #[allow(deprecated)]
        b.ks_hsc_intro_auth(keypair.into());
        b.build().unwrap()
    }

    fn mk_hsconn<R: Runtime>(
        runtime: R,
    ) -> (
        HsClientConnector<R, MockData>,
        HsClientSecretKeys,
        postage::watch::Sender<MockGive>,
    ) {
        let chanmgr = tor_chanmgr::ChanMgr::new(
            runtime.clone(),
            &Default::default(),
            tor_chanmgr::Dormancy::Dormant,
            &Default::default(),
            ToplevelAccount::new_noop(),
        );
        let guardmgr = tor_guardmgr::GuardMgr::new(
            runtime.clone(),
            tor_persist::TestingStateMgr::new(),
            &tor_guardmgr::TestConfig::default(),
        )
        .unwrap();

        let circmgr = Arc::new(
            tor_circmgr::CircMgr::new(
                &tor_circmgr::TestConfig::default(),
                tor_persist::TestingStateMgr::new(),
                &runtime,
                Arc::new(chanmgr),
                &guardmgr,
            )
            .unwrap(),
        );
        let circpool = Arc::new(HsCircPool::new(&circmgr));
        let (give_send, give) = postage::watch::channel_with(Ready(Ok(())));
        let mock_for_state = MockGlobalState { give };
        #[allow(clippy::let_and_return)] // we'll probably add more in this function
        let hscc = HsClientConnector {
            runtime,
            circpool,
            services: Default::default(),
            mock_for_state,
        };
        let keys = HsClientSecretKeysBuilder::default().build().unwrap();
        (hscc, keys, give_send)
    }

    #[allow(clippy::unnecessary_wraps)]
    fn mk_isol(s: &str) -> Option<NarrowableIsolation> {
        Some(NarrowableIsolation(s.into()))
    }

    async fn launch_one(
        hsconn: &HsClientConnector<impl Runtime, MockData>,
        id: u8,
        secret_keys: &HsClientSecretKeys,
        isolation: Option<NarrowableIsolation>,
    ) -> Result<Arc<MockCirc>, ConnError> {
        let netdir = tor_netdir::testnet::construct_netdir()
            .unwrap_if_sufficient()
            .unwrap();
        let netdir = Arc::new(netdir);

        let hs_id = {
            let mut hs_id = [0_u8; 32];
            hs_id[0] = id;
            hs_id.into()
        };
        #[allow(clippy::redundant_closure)] // srsly, that would be worse
        let isolation = isolation.unwrap_or_default().into();
        Services::get_or_launch_connection(hsconn, &netdir, hs_id, isolation, secret_keys.clone())
            .await
    }

    #[derive(Default, Debug, Clone)]
    // TODO move this to tor-circmgr under a test feature?
    pub(crate) struct NarrowableIsolation(pub(crate) String);
    impl tor_circmgr::isolation::IsolationHelper for NarrowableIsolation {
        fn compatible_same_type(&self, other: &Self) -> bool {
            self.join_same_type(other).is_some()
        }
        fn join_same_type(&self, other: &Self) -> Option<Self> {
            Some(if self.0.starts_with(&other.0) {
                self.clone()
            } else if other.0.starts_with(&self.0) {
                other.clone()
            } else {
                return None;
            })
        }
    }

    #[test]
    #[traced_test]
    fn simple() {
        test_with_one_runtime!(|runtime| async {
            let (hsconn, keys, _give_send) = mk_hsconn(runtime);

            let circuit = launch_one(&hsconn, 0, &keys, None).await.unwrap();
            eprintln!("{:?}", circuit);
        });
    }

    #[test]
    #[traced_test]
    fn expiry() {
        MockRuntime::test_with_various(|runtime| async move {
            // This is the amount by which we adjust clock advances to make sure we
            // hit more or less than a particular value, to avoid edge cases and
            // cope with real time advancing too.
            // This does *not* represent an actual delay to real test runs.
            const TIMEOUT_SLOP: Duration = Duration::from_secs(10);

            let (hsconn, keys, _give_send) = mk_hsconn(runtime.clone());

            let advance = |duration| {
                let hsconn = hsconn.clone();
                let runtime = &runtime;
                async move {
                    // let expiry task get going and choose its expiry (wakeup) time
                    runtime.progress_until_stalled().await;
                    // TODO: Make this use runtime.advance_by() when that's not very slow
                    runtime.mock_sleep().advance(duration);
                    // let expiry task run
                    runtime.progress_until_stalled().await;
                    hsconn.services().unwrap().run_housekeeping(runtime.now());
                }
            };

            // make circuit1
            let circuit1 = launch_one(&hsconn, 0, &keys, None).await.unwrap();

            // expire it
            advance(RETAIN_CIRCUIT_AFTER_LAST_USE + TIMEOUT_SLOP).await;

            // make circuit2 (a)
            let circuit2a = launch_one(&hsconn, 0, &keys, None).await.unwrap();
            assert_ne!(circuit1, circuit2a);

            // nearly expire it, then reuse it
            advance(RETAIN_CIRCUIT_AFTER_LAST_USE - TIMEOUT_SLOP).await;
            let circuit2b = launch_one(&hsconn, 0, &keys, None).await.unwrap();
            assert_eq!(circuit2a, circuit2b);

            // nearly expire it again, then reuse it
            advance(RETAIN_CIRCUIT_AFTER_LAST_USE - TIMEOUT_SLOP).await;
            let circuit2c = launch_one(&hsconn, 0, &keys, None).await.unwrap();
            assert_eq!(circuit2a, circuit2c);

            // actually expire it
            advance(RETAIN_CIRCUIT_AFTER_LAST_USE + TIMEOUT_SLOP).await;
            let circuit3 = launch_one(&hsconn, 0, &keys, None).await.unwrap();
            assert_ne!(circuit2c, circuit3);
            assert_eq!(circuit3.connect_called, 3);

            // expire the cached data too; a fresh `MockData` restarts the count
            advance(RETAIN_DATA_AFTER_LAST_USE + Duration::from_secs(10)).await;
            let circuit4 = launch_one(&hsconn, 0, &keys, None).await.unwrap();
            assert_eq!(circuit4.connect_called, 1);
        });
    }

    #[test]
    #[traced_test]
    fn coalesce() {
        test_with_one_runtime!(|runtime| async {
            let (hsconn, keys, mut give_send) = mk_hsconn(runtime);

            give_send.send(Pending).await.unwrap();

            let c1f = launch_one(&hsconn, 0, &keys, None);
            pin!(c1f);
            for _ in 0..10 {
                assert!(poll!(&mut c1f).is_pending());
            }

            // c2f will find Working
            let c2f = launch_one(&hsconn, 0, &keys, None);
            pin!(c2f);
            for _ in 0..10 {
                assert!(poll!(&mut c1f).is_pending());
                assert!(poll!(&mut c2f).is_pending());
            }

            give_send.send(Ready(Ok(()))).await.unwrap();
            let c1 = c1f.await.unwrap();
            let c2 = c2f.await.unwrap();
            assert_eq!(c1, c2);

            // c2 will find Open
            let c3 = launch_one(&hsconn, 0, &keys, None).await.unwrap();
            assert_eq!(c1, c3);

            assert_ne!(c1, launch_one(&hsconn, 1, &keys, None).await.unwrap());
            assert_ne!(
                c1,
                launch_one(&hsconn, 0, &mk_keys(42), None).await.unwrap()
            );

            let c_isol_1 = launch_one(&hsconn, 0, &keys, mk_isol("a")).await.unwrap();
            assert_eq!(c1, c_isol_1); // We can reuse, but now we've narrowed the isol

            let c_isol_2 = launch_one(&hsconn, 0, &keys, mk_isol("b")).await.unwrap();
            assert_ne!(c1, c_isol_2);
        });
    }
}