tor_proto/tunnel/reactor/conflux.rs
1//! Conflux-related functionality
2
3#[cfg(feature = "conflux")]
4mod msghandler;
5
6use std::future::Future;
7use std::pin::Pin;
8use std::sync::atomic::{self, AtomicU64};
9use std::sync::{Arc, Mutex};
10
11use futures::StreamExt;
12use futures::{select_biased, stream::FuturesUnordered, FutureExt as _};
13use itertools::structs::ExactlyOneError;
14use itertools::Itertools;
15use smallvec::{smallvec, SmallVec};
16use tor_rtcompat::SleepProviderExt as _;
17use tracing::{info, trace, warn};
18
19use tor_async_utils::SinkPrepareExt as _;
20use tor_basic_utils::flatten;
21use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCmd};
22use tor_error::{bad_api_usage, internal, Bug};
23use tor_linkspec::HasRelayIds as _;
24
25use crate::circuit::path::HopDetail;
26use crate::circuit::{TunnelMutableState, UniqId};
27use crate::crypto::cell::HopNum;
28use crate::tunnel::reactor::circuit::ConfluxStatus;
29use crate::tunnel::{streammap, TunnelId};
30use crate::util::err::ReactorError;
31
32use super::circuit::CircHop;
33use super::{Circuit, CircuitAction, RemoveLegReason, SendRelayCell};
34
35#[cfg(feature = "conflux")]
36use {
37 tor_cell::relaycell::conflux::{V1DesiredUx, V1LinkPayload, V1Nonce},
38 tor_cell::relaycell::msg::{ConfluxLink, ConfluxSwitch},
39};
40
41#[cfg(feature = "conflux")]
42pub(crate) use msghandler::{ConfluxAction, ConfluxMsgHandler, OooRelayMsg};
43
/// Inline capacity of the `SmallVec` that stores the legs of a conflux set.
///
/// A set holding more legs than this still works,
/// but its `SmallVec` spills over to the heap.
///
/// Note: this value was chosen arbitrarily, and might need tuning.
const MAX_CONFLUX_LEGS: usize = 16;
50
51/// A set with one or more circuits.
52///
53/// ### Conflux set life cycle
54///
55/// Conflux sets are created by the reactor using [`ConfluxSet::new`].
56///
57/// Every `ConfluxSet` starts out as a single-path set consisting of a single 0-length circuit.
58///
59/// After constructing a `ConfluxSet`, the reactor will proceed to extend its (only) circuit.
60/// At this point, the `ConfluxSet` will be a single-path set with a single n-length circuit.
61///
62/// The reactor can then turn the `ConfluxSet` into a multi-path set
63/// (a multi-path set is a conflux set that contains more than 1 circuit).
64/// This is done using [`ConfluxSet::add_legs`], in response to a `CtrlMsg` sent
65/// by the reactor user (also referred to as the "conflux handshake initiator").
66/// After that, the conflux set is said to be a multi-path set with multiple N-length circuits.
67///
68/// Circuits can be removed from the set using [`ConfluxSet::remove`].
69///
70/// The lifetime of a `ConfluxSet` is tied to the lifetime of the reactor.
71/// When the reactor is dropped, its underlying `ConfluxSet` is dropped too.
72/// This can happen on an explicit shutdown request, or if a fatal error occurs.
73///
74/// Conversely, the `ConfluxSet` can also trigger a reactor shutdown.
75/// For example, if after being instructed to remove a circuit from the set
76/// using [`ConfluxSet::remove`], the set is completely depleted,
77/// the `ConfluxSet` will return a [`ReactorError::Shutdown`] error,
78/// which will cause the reactor to shut down.
79pub(super) struct ConfluxSet {
80 /// The unique identifier of the tunnel this conflux set belongs to.
81 ///
82 /// Used for setting the internal [`TunnelId`] of [`Circuit`]s
83 /// that gets used for logging purposes.
84 tunnel_id: TunnelId,
85 /// The circuits in this conflux set.
86 legs: SmallVec<[Circuit; MAX_CONFLUX_LEGS]>,
87 /// Tunnel state, shared with `ClientCirc`.
88 ///
89 /// Contains the [`MutableState`](super::MutableState) of each circuit in the set.
90 mutable: Arc<TunnelMutableState>,
91 /// The unique identifier of the primary leg
92 primary_id: UniqId,
93 /// The join point of the set, if this is a multi-path set.
94 ///
95 /// Initially the conflux set starts out as a single-path set with no join point.
96 /// When it is converted to a multipath set using [`add_legs`](Self::add_legs),
97 /// the join point is initialized to the last hop in the tunnel.
98 //
99 // TODO(#2017): for simplicity, we currently we force all legs to have the same length,
100 // to ensure the HopNum of the join point is the same for all of them.
101 //
102 // In the future we might want to relax this restriction.
103 join_point: Option<JoinPoint>,
104 /// The nonce associated with the circuits from this set.
105 #[cfg(feature = "conflux")]
106 nonce: V1Nonce,
107 /// The desired UX
108 #[cfg(feature = "conflux")]
109 desired_ux: V1DesiredUx,
110 /// The absolute sequence number of the last cell delivered to a stream.
111 ///
112 /// A clone of this is shared with each [`ConfluxMsgHandler`] created.
113 ///
114 /// When a message is received on a circuit leg, the `ConfluxMsgHandler`
115 /// of the leg compares the (leg-local) sequence number of the message
116 /// with this sequence number to determine whether the message is in-order.
117 ///
118 /// If the message is in-order, the `ConfluxMsgHandler` instructs the circuit
119 /// to deliver it to its corresponding stream.
120 ///
121 /// If the message is out-of-order, the `ConfluxMsgHandler` instructs the circuit
122 /// to instruct the reactor to buffer the message.
123 last_seq_delivered: Arc<AtomicU64>,
124 /// Whether we have selected our initial primary leg,
125 /// if this is a multipath conflux set.
126 selected_init_primary: bool,
127}
128
129/// The conflux join point.
130#[derive(Clone, derive_more::Debug)]
131struct JoinPoint {
132 /// The hop number.
133 hop: HopNum,
134 /// The [`HopDetail`] of the hop.
135 detail: HopDetail,
136 /// The stream map of the joint point, shared with each circuit leg.
137 #[debug(skip)]
138 streams: Arc<Mutex<streammap::StreamMap>>,
139}
140
141impl ConfluxSet {
142 /// Create a new conflux set, consisting of a single leg.
143 ///
144 /// Returns the newly created set and a reference to its [`TunnelMutableState`].
145 pub(super) fn new(
146 tunnel_id: TunnelId,
147 circuit_leg: Circuit,
148 ) -> (Self, Arc<TunnelMutableState>) {
149 let primary_id = circuit_leg.unique_id();
150 let circ_mutable = Arc::clone(circuit_leg.mutable());
151 let legs = smallvec![circuit_leg];
152 // Note: the join point is only set for multi-path tunnels
153 let join_point = None;
154
155 // TODO(#2035): read this from the consensus/config.
156 #[cfg(feature = "conflux")]
157 let desired_ux = V1DesiredUx::NO_OPINION;
158
159 let mutable = Arc::new(TunnelMutableState::default());
160 mutable.insert(primary_id, circ_mutable);
161
162 let set = Self {
163 tunnel_id,
164 legs,
165 primary_id,
166 join_point,
167 mutable: mutable.clone(),
168 #[cfg(feature = "conflux")]
169 nonce: V1Nonce::new(&mut rand::rng()),
170 #[cfg(feature = "conflux")]
171 desired_ux,
172 last_seq_delivered: Arc::new(AtomicU64::new(0)),
173 selected_init_primary: false,
174 };
175
176 (set, mutable)
177 }
178
179 /// Remove and return the only leg of this conflux set.
180 ///
181 /// Returns an error if there is more than one leg in the set,
182 /// or if called before any circuit legs are available.
183 ///
184 /// Calling this function will empty the [`ConfluxSet`].
185 pub(super) fn take_single_leg(&mut self) -> Result<Circuit, Bug> {
186 let circ = self
187 .legs
188 .iter()
189 .exactly_one()
190 .map_err(NotSingleLegError::from)?;
191 let circ_id = circ.unique_id();
192
193 debug_assert!(circ_id == self.primary_id);
194
195 self.remove_unchecked(circ_id)
196 }
197
198 /// Return a reference to the only leg of this conflux set,
199 /// along with the leg's ID.
200 ///
201 /// Returns an error if there is more than one leg in the set,
202 /// or if called before any circuit legs are available.
203 pub(super) fn single_leg(&self) -> Result<&Circuit, NotSingleLegError> {
204 Ok(self.legs.iter().exactly_one()?)
205 }
206
207 /// Return a mutable reference to the only leg of this conflux set,
208 /// along with the leg's ID.
209 ///
210 /// Returns an error if there is more than one leg in the set,
211 /// or if called before any circuit legs are available.
212 pub(super) fn single_leg_mut(&mut self) -> Result<&mut Circuit, NotSingleLegError> {
213 Ok(self.legs.iter_mut().exactly_one()?)
214 }
215
216 /// Return the primary leg of this conflux set.
217 ///
218 /// Returns an error if called before any circuit legs are available.
219 pub(super) fn primary_leg_mut(&mut self) -> Result<&mut Circuit, Bug> {
220 #[cfg(not(feature = "conflux"))]
221 if self.legs.len() > 1 {
222 return Err(internal!(
223 "got multipath tunnel, but conflux feature is disabled?!"
224 ));
225 }
226
227 if self.legs.is_empty() {
228 Err(bad_api_usage!(
229 "tried to get circuit leg before creating it?!"
230 ))
231 } else {
232 let circ = self
233 .leg_mut(self.primary_id)
234 .ok_or_else(|| internal!("conflux set is empty?!"))?;
235
236 Ok(circ)
237 }
238 }
239
240 /// Return a reference to the leg of this conflux set with the given id.
241 pub(super) fn leg(&self, leg_id: UniqId) -> Option<&Circuit> {
242 self.legs.iter().find(|circ| circ.unique_id() == leg_id)
243 }
244
245 /// Return a mutable reference to the leg of this conflux set with the given id.
246 pub(super) fn leg_mut(&mut self, leg_id: UniqId) -> Option<&mut Circuit> {
247 self.legs.iter_mut().find(|circ| circ.unique_id() == leg_id)
248 }
249
250 /// Return the number of legs in this conflux set.
251 pub(super) fn len(&self) -> usize {
252 self.legs.len()
253 }
254
255 /// Return whether this conflux set is empty.
256 pub(super) fn is_empty(&self) -> bool {
257 self.legs.len() == 0
258 }
259
260 /// Remove the specified leg from this conflux set.
261 ///
262 /// Returns an error if the given leg doesn't exist in the set.
263 ///
264 /// Returns an error instructing the reactor to perform a clean shutdown
265 /// ([`ReactorError::Shutdown`]), tearing down the entire [`ConfluxSet`], if
266 ///
267 /// * the set is depleted (empty) after removing the specified leg
268 /// * `leg` is currently the sending (primary) leg of this set
269 /// * the closed leg had the highest non-zero last_seq_recv/sent
270 /// * the closed leg had some in-progress data (inflight > cc_sendme_inc)
271 ///
272 /// We do not yet support resumption. See [2.4.3. Closing circuits] in prop329.
273 ///
274 /// [2.4.3. Closing circuits]: https://spec.torproject.org/proposals/329-traffic-splitting.html#243-closing-circuits
275 pub(super) fn remove(&mut self, leg: UniqId) -> Result<Circuit, ReactorError> {
276 let circ = self.remove_unchecked(leg)?;
277
278 tracing::trace!(
279 circ_id = %circ.unique_id(),
280 "Circuit removed from conflux set"
281 );
282
283 self.mutable.remove(circ.unique_id());
284
285 if self.legs.is_empty() {
286 // TODO: log the tunnel ID
287 tracing::debug!("Conflux set is now empty, tunnel reactor shutting down");
288
289 // The last circuit in the set has just died, so the reactor should exit.
290 return Err(ReactorError::Shutdown);
291 }
292
293 if leg == self.primary_id {
294 // We have just removed our sending leg,
295 // so it's time to close the entire conflux set.
296 return Err(ReactorError::Shutdown);
297 }
298
299 cfg_if::cfg_if! {
300 if #[cfg(feature = "conflux")] {
301 self.remove_conflux(circ)
302 } else {
303 // Conflux is disabled, so we can't possibly continue running if the only
304 // leg in the tunnel is gone.
305 //
306 // Technically this should be unreachable (because of the is_empty()
307 // check above)
308 Err(internal!("Multiple legs in single-path tunnel?!").into())
309 }
310 }
311 }
312
/// Decide whether the set can survive the removal of `circ`.
///
/// `circ` must have already been taken out of `self.legs`.
///
/// Returns the removed circuit if the remaining legs can keep running,
/// or an error if the reactor needs to shut down.
#[cfg(feature = "conflux")]
fn remove_conflux(&self, circ: Circuit) -> Result<Circuit, ReactorError> {
    let Some(status) = circ.conflux_status() else {
        return Err(internal!("Found non-conflux circuit in conflux set?!").into());
    };

    // TODO(conflux): should the circmgr be notified about the leg removal?
    //
    // "For circuits that are unlinked, the origin SHOULD immediately relaunch a new leg when it
    // is closed, subject to the limits in [SIDE_CHANNELS]."

    // Getting here with a non-empty conflux set means the set is multi-path,
    // so we need to work out whether the whole set has to be torn down.
    if matches!(status, ConfluxStatus::Unlinked) {
        // The conflux handshake never started on this circuit,
        // so removing it from the set is harmless.
        return Ok(circ);
    }

    // From here on, the circuit is either Pending or Linked.
    let circ_last_seq_recv = circ.last_seq_recv()?;
    let circ_last_seq_sent = circ.last_seq_sent()?;

    // Close the set if the closed leg had the highest non-zero last_seq_recv/sent
    // (the maxima below only cover the legs that are still in the set).
    if matches!(self.max_last_seq_recv(), Some(max) if circ_last_seq_recv > max) {
        return Err(ReactorError::Shutdown);
    }

    if matches!(self.max_last_seq_sent(), Some(max) if circ_last_seq_sent > max) {
        return Err(ReactorError::Shutdown);
    }

    let hop = self.join_point_hop(&circ)?;

    let (inflight, cwnd) = {
        let missing_cc_info = || {
            internal!("Congestion control algorithm doesn't track inflight cells or cwnd?!")
        };

        let ccontrol = hop.ccontrol();
        let inflight = ccontrol.inflight().ok_or_else(missing_cc_info)?;
        let cwnd = ccontrol.cwnd().ok_or_else(missing_cc_info)?;

        (inflight, cwnd)
    };

    // All legs must be closed if the removed leg still had data in progress.
    //
    // NOTE(review): prop329 words this condition as "inflight > cc_sendme_inc",
    // whereas this check uses `>=` -- confirm which of the two is intended.
    if inflight >= cwnd.params().sendme_inc() {
        return Err(ReactorError::Shutdown);
    }

    Ok(circ)
}
376
/// The largest relative last_seq_recv of all the circuits in the set.
///
/// Legs whose last_seq_recv cannot be read are ignored.
/// Returns `None` if no leg yields a value.
#[cfg(feature = "conflux")]
fn max_last_seq_recv(&self) -> Option<u64> {
    self.legs
        .iter()
        .filter_map(|leg| leg.last_seq_recv().ok())
        .reduce(u64::max)
}
385
/// The largest relative last_seq_sent of all the circuits in the set.
///
/// Legs whose last_seq_sent cannot be read are ignored.
/// Returns `None` if no leg yields a value.
#[cfg(feature = "conflux")]
fn max_last_seq_sent(&self) -> Option<u64> {
    self.legs
        .iter()
        .filter_map(|leg| leg.last_seq_sent().ok())
        .reduce(u64::max)
}
394
395 /// Get the [`CircHop`] of the join point on the specified `circ`,
396 /// returning an error if this is a single path conflux set.
397 fn join_point_hop<'c>(&self, circ: &'c Circuit) -> Result<&'c CircHop, Bug> {
398 let Some(join_point) = self.join_point.as_ref().map(|p| p.hop) else {
399 return Err(internal!("No join point on conflux tunnel?!"));
400 };
401
402 circ.hop(join_point)
403 .ok_or_else(|| internal!("Conflux join point disappeared?!"))
404 }
405
406 /// Return an iterator of all circuits in the conflux set.
407 fn circuits(&self) -> impl Iterator<Item = &Circuit> {
408 self.legs.iter()
409 }
410
/// Add legs to this conflux set.
///
/// Returns an error if `legs` is empty, or if any of the legs is invalid.
/// If validation fails, no leg is added and the join point
/// of the set is left exactly as it was before the call.
///
/// A leg is considered valid if
///
/// * the circuit has the same length as all the other circuits in the set
/// * its last hop is equal to the designated join point
/// * the circuit has no streams attached to any of its hops
/// * the circuit is not already part of a conflux set
/// * the congestion control algorithm of its last hop is Vegas
///
/// Note: the circuits will not begin linking until
/// [`link_circuits`](Self::link_circuits) is called.
///
/// IMPORTANT: this function does not prevent the construction of conflux sets
/// where the circuit legs share guard or middle relays. It is the responsibility
/// of the caller to enforce the following invariant from prop354:
///
/// "If building a conflux leg: Reject any circuits that have the same Guard as the other conflux
/// "leg(s) in the current conflux set, EXCEPT when one of the primary Guards is also the chosen
/// "Exit of this conflux set (in which case, re-use the non-Exit Guard)."
///
/// This is because at this level we don't actually know which relays are the guards,
/// so we can't know if the join point happens to be one of the Guard + Exit relays.
#[cfg(feature = "conflux")]
pub(super) fn add_legs(
    &mut self,
    legs: Vec<Circuit>,
    runtime: &tor_rtcompat::DynTimeProvider,
) -> Result<(), Bug> {
    if legs.is_empty() {
        return Err(bad_api_usage!("asked to add empty leg list to conflux set"));
    }

    // Work on a *copy* of the existing join point (if there is one).
    //
    // We must not `take()` it out of `self` here: the validation below can
    // bail out early, and an early return must not leave a multi-path set
    // without its join point.
    let join_point = match self.join_point.clone() {
        Some(p) => p,
        None => {
            // No join point yet: derive it from the last hop of the first leg.
            let (hop, detail, streams) = (|| {
                let first_leg = self.circuits().next()?;
                let first_leg_path = first_leg.path();
                let all_hops = first_leg_path.all_hops();
                let hop_num = first_leg.last_hop_num()?;
                let detail = all_hops.last()?;
                let hop = first_leg.hop(hop_num)?;
                let streams = Arc::clone(hop.stream_map());
                Some((hop_num, detail.clone(), streams))
            })()
            .ok_or_else(|| bad_api_usage!("asked to join circuit with no hops"))?;

            JoinPoint {
                hop,
                detail,
                streams,
            }
        }
    };

    // Check two HopDetails for equality.
    //
    // Returns an error if both of the hops are virtual.
    let hops_eq = |h1: &HopDetail, h2: &HopDetail| {
        match (h1, h2) {
            (HopDetail::Relay(t1), HopDetail::Relay(t2)) => Ok(t1.same_relay_ids(t2)),
            #[cfg(feature = "hs-common")]
            (HopDetail::Virtual, HopDetail::Virtual) => {
                // TODO(#2016): support onion service conflux
                Err(internal!("onion service conflux not supported"))
            }
            _ => Ok(false),
        }
    };

    // A leg is considered valid if
    //
    //   * the circuit has the expected length
    //     (the length of the first circuit we added to the set)
    //   * its last hop is equal to the designated join point
    //     (the last hop of the first circuit we added)
    //   * the circuit has no streams attached to any of its hops
    //   * the circuit is not already part of a conflux tunnel
    //   * its last hop negotiated a suitable congestion control algorithm
    //
    // Returns an error if `hops_eq` does (see above).
    let leg_is_valid = |leg: &Circuit| -> Result<bool, Bug> {
        use crate::ccparams::Algorithm;

        let path = leg.path();
        let Some(last_hop) = path.all_hops().last() else {
            // A circuit with no hops is invalid
            return Ok(false);
        };

        // TODO: this sort of duplicates the check above.
        // The difference is that above we read the hop detail
        // information from the circuit Path, whereas here we get
        // the actual last CircHop of the circuit.
        let Some(last_hop_num) = leg.last_hop_num() else {
            // A circuit with no hops is invalid
            return Ok(false);
        };

        let circhop = leg
            .hop(last_hop_num)
            .ok_or_else(|| internal!("hop disappeared?!"))?;

        // Ensure we negotiated a suitable cc algorithm
        let is_cc_suitable = match circhop.ccontrol().algorithm() {
            Algorithm::FixedWindow(_) => false,
            Algorithm::Vegas(_) => true,
        };

        if !is_cc_suitable {
            return Ok(false);
        }

        Ok(last_hop_num == join_point.hop
            && hops_eq(last_hop, &join_point.detail)?
            && !leg.has_streams()
            && leg.conflux_status().is_none())
    };

    // Validate *all* the legs before touching any of our own state.
    for leg in &legs {
        if !leg_is_valid(leg)? {
            return Err(bad_api_usage!("one or more conflux circuits are invalid"));
        }
    }

    // Commit the join point
    // (this is a no-op if the set already had one, since we cloned it above).
    self.join_point = Some(join_point.clone());

    // The legs are valid, so add them to the set.
    for circ in legs {
        let mutable = Arc::clone(circ.mutable());
        let unique_id = circ.unique_id();
        self.legs.push(circ);
        // Merge the mutable state of the circuit into our tunnel state.
        self.mutable.insert(unique_id, mutable);
    }

    for circ in self.legs.iter_mut() {
        // The circuits that have a None status don't know they're part of
        // a multi-path tunnel yet. They need to be initialized with a
        // conflux message handler, and have their join point fixed up
        // to share a stream map with the join point on all the other circuits.
        if circ.conflux_status().is_none() {
            let conflux_handler = ConfluxMsgHandler::new_client(
                join_point.hop,
                self.nonce,
                Arc::clone(&self.last_seq_delivered),
                runtime.clone(),
            );

            circ.add_to_conflux_tunnel(self.tunnel_id, conflux_handler);

            // Ensure the stream map of the last hop is shared by all the legs
            let last_hop = circ
                .hop_mut(join_point.hop)
                .ok_or_else(|| bad_api_usage!("asked to join circuit with no hops"))?;
            last_hop.set_stream_map(Arc::clone(&join_point.streams))?;
        }
    }

    Ok(())
}
577
/// Try to update the primary leg based on the configured desired UX,
/// if needed.
///
/// Returns the SWITCH cell to send on the (new) primary leg,
/// if we switched primary leg, and `None` if the primary leg is unchanged
/// (for example because this is not a multi-path tunnel, or because
/// no better leg was found).
///
/// Returns an error if the sequence numbers of the legs can't be read,
/// or if the relative seqno of the SWITCH cell can't be computed.
/// In the latter case the primary leg is left unchanged.
#[cfg(feature = "conflux")]
pub(super) fn maybe_update_primary_leg(&mut self) -> crate::Result<Option<SendRelayCell>> {
    use tor_error::into_internal;

    let Some(join_point) = self.join_point.as_ref() else {
        // Return early if this is not a multi-path tunnel
        return Ok(None);
    };

    let join_point = join_point.hop;

    if !self.should_update_primary_leg() {
        // Nothing to do
        return Ok(None);
    }

    let Some(new_primary_id) = self.select_primary_leg()? else {
        // None of the legs satisfy our UX requirements, continue using the existing one.
        return Ok(None);
    };

    // Check that the newly selected leg is actually different from the previous
    if self.primary_id == new_primary_id {
        // The primary leg stays the same, nothing to do.
        return Ok(None);
    }

    let prev_last_seq_sent = self.primary_leg_mut()?.last_seq_sent()?;
    let new_last_seq_sent = self
        .leg(new_primary_id)
        .ok_or_else(|| internal!("newly selected primary leg disappeared?!"))?
        .last_seq_sent()?;

    // Compute the relative seqno *before* committing to the new primary leg,
    // so a failure here doesn't leave us half-switched.
    //
    // The subtraction is checked: the sequence number of the sending leg
    // is expected to be at least as high as that of any other leg,
    // but if that invariant is ever violated we want an error,
    // not a panic (debug builds) or a wrapped-around value (release builds).
    let seqno_delta = prev_last_seq_sent
        .checked_sub(new_last_seq_sent)
        .ok_or_else(|| {
            internal!("last_seq_sent of new primary leg is ahead of the previous primary?!")
        })?;

    // If this fails, it means we haven't updated our primary leg in a very long time.
    //
    // TODO(#2036): there are currently no safeguards to prevent us from staying
    // on the same leg for "too long". Perhaps we should design should_update_primary_leg()
    // such that it forces us to switch legs periodically, to prevent the seqno delta from
    // getting too big?
    let seqno_delta = u32::try_from(seqno_delta).map_err(into_internal!(
        "Seqno delta for switch does not fit in u32?!"
    ))?;

    // Switch to the new primary leg.
    self.primary_id = new_primary_id;

    // We need to carry the last_seq_sent over to the next leg
    // (the next cell sent will have seqno = prev_last_seq_sent + 1)
    self.primary_leg_mut()?
        .set_last_seq_sent(prev_last_seq_sent)?;

    let switch = ConfluxSwitch::new(seqno_delta);
    let cell = AnyRelayMsgOuter::new(None, switch.into());
    Ok(Some(SendRelayCell {
        hop: join_point,
        early: false,
        cell,
    }))
}
637
/// Decide whether a new primary leg should be selected now.
///
/// If the initial primary leg has not been selected yet, this tries to select it
/// (see [`maybe_select_init_primary`](Self::maybe_select_init_primary))
/// and returns `false`.
#[cfg(feature = "conflux")]
fn should_update_primary_leg(&mut self) -> bool {
    if self.selected_init_primary {
        // Switching primary legs requires at least 2 legs to choose from.
        //
        // TODO(conflux-tuning): if it turns out we switch legs too frequently,
        // we might want to implement some sort of rate-limiting here
        // (see c-tor's conflux_can_switch).
        self.legs.len() >= 2
    } else {
        self.maybe_select_init_primary();
        false
    }
}
658
/// Pick the best leg, as dictated by the configured desired UX.
///
/// Every desired UX currently maps to the minimum-RTT algorithm;
/// only `HIGH_THROUGHPUT` additionally requires the leg to be ready to send.
///
/// Returns `None` if there is no suitable leg.
#[cfg(feature = "conflux")]
fn select_primary_leg(&self) -> Result<Option<UniqId>, Bug> {
    let check_can_send = match self.desired_ux {
        V1DesiredUx::HIGH_THROUGHPUT => true,
        V1DesiredUx::NO_OPINION | V1DesiredUx::MIN_LATENCY => false,
        // TODO(conflux-tuning): add support for low-memory algorithms
        V1DesiredUx::LOW_MEM_LATENCY | V1DesiredUx::LOW_MEM_THROUGHPUT => false,
        _ => {
            // Fall back to MIN_RTT for desired UX values we don't recognize
            warn!(
                tunnel_id = %self.tunnel_id,
                "Ignoring unrecognized conflux desired UX {}, using MIN_LATENCY",
                self.desired_ux
            );
            false
        }
    };

    self.select_primary_leg_min_rtt(check_can_send)
}
684
/// Select the initial primary leg, provided at least one of the legs
/// has an initial RTT measurement.
///
/// The leg with the lowest initial RTT wins (the earliest such leg, on ties).
/// If none of the legs has a measurement yet, this does nothing.
#[cfg(feature = "conflux")]
fn maybe_select_init_primary(&mut self) {
    let fastest = self
        .legs
        .iter()
        .filter_map(|leg| Some((leg.init_rtt()?, leg.unique_id())))
        .min_by_key(|(rtt, _leg_id)| *rtt);

    let Some((_rtt, fastest_id)) = fastest else {
        // No RTT measurements yet; the caller will retry later.
        return;
    };

    self.primary_id = fastest_id;
    self.selected_init_primary = true;
}
701
/// Find the leg with the best (lowest) RTT.
///
/// The RTT of a leg is the EWMA RTT of its join point hop,
/// falling back to the initial RTT of the circuit (in microseconds,
/// saturating at `u32::MAX`) if there is no EWMA RTT yet.
/// On ties, the earliest leg in the set wins.
///
/// If `check_can_send` is true, legs that are not ready to send are skipped.
///
/// Returns `None` if no suitable leg was found,
/// and an error if a candidate leg has no RTT measurement at all.
#[cfg(feature = "conflux")]
fn select_primary_leg_min_rtt(&self, check_can_send: bool) -> Result<Option<UniqId>, Bug> {
    let mut best: Option<(UniqId, u32)> = None;

    for circ in &self.legs {
        let leg_id = circ.unique_id();
        let join_point = self.join_point_hop(circ)?;
        let ccontrol = join_point.ccontrol();

        if check_can_send && !ccontrol.can_send() {
            // This leg can't send right now, so it is not a candidate.
            continue;
        }

        let leg_rtt = ccontrol.rtt().ewma_rtt_usec().or_else(|| {
            let init_rtt = circ.init_rtt()?;
            Some(u32::try_from(init_rtt.as_micros()).unwrap_or(u32::MAX))
        });

        let Some(leg_rtt) = leg_rtt else {
            return Err(internal!(
                "attempted to select primary leg before handshake completed?!"
            ));
        };

        // Only a strictly lower RTT displaces the current best leg.
        let is_improvement = best.map_or(true, |(_, best_rtt)| leg_rtt < best_rtt);
        if is_improvement {
            best = Some((leg_id, leg_rtt));
        }
    }

    Ok(best.map(|(leg_id, _rtt)| leg_id))
}
748
/// Check whether the join point hop `join_hop` is blocked on congestion control
/// on the specified `circuit`.
///
/// Returns `true` if the congestion control of the hop says it cannot send,
/// and `false` otherwise.
///
/// Returns an error if the join point hop doesn't exist on the specified circuit.
#[cfg(feature = "conflux")]
fn is_join_point_blocked_on_cc(join_hop: HopNum, circuit: &Circuit) -> Result<bool, Bug> {
    let Some(join_circhop) = circuit.hop(join_hop) else {
        return Err(internal!(
            "Join point hop {} not found on circuit {}?!",
            join_hop.display(),
            circuit.unique_id(),
        ));
    };

    let can_send = join_circhop.ccontrol().can_send();

    Ok(!can_send)
}
769
/// Work out whether [`next_circ_action`](Self::next_circ_action)
/// should avoid polling the join point streams entirely.
///
/// Returns the `HopNum` of the join point if its streams must not be polled,
/// and `None` if they can be polled (or if this is a single-path set).
#[cfg(feature = "conflux")]
fn should_skip_join_point(&self) -> Result<Option<HopNum>, Bug> {
    let Some(primary_join_point) = self.primary_join_point() else {
        // There is no join point in a single-path set
        return Ok(None);
    };
    let join_hop = primary_join_point.1;

    let primary = self
        .leg(self.primary_id)
        .ok_or_else(|| internal!("primary leg disappeared?!"))?;

    if !Self::is_join_point_blocked_on_cc(join_hop, primary)? {
        // The primary leg can send, so there is no reason to pause
        return Ok(None);
    }

    // The primary *is* blocked on cc. Whether we can keep polling
    // the join point streams now depends on the desired UX.
    if self.desired_ux != V1DesiredUx::HIGH_THROUGHPUT {
        // Without the high throughput algorithm we can't switch away from
        // the blocked primary leg, so reading the join point streams must stop.
        //
        // Note: with HIGH_THROUGHPUT, it's fine to keep reading from
        // the edge connection, because maybe_update_primary_leg()
        // will select a new, non-blocked primary leg, just before sending.
        trace!(
            tunnel_id = %self.tunnel_id,
            join_point = ?primary_join_point,
            reason = "sending leg blocked on congestion control",
            "Pausing join point stream reads"
        );

        return Ok(Some(join_hop));
    }

    // The desired UX is HIGH_THROUGHPUT, so we are able to switch to an unblocked leg
    // before sending any cells over the join point, if such a leg exists.
    for leg in self.legs.iter() {
        if !Self::is_join_point_blocked_on_cc(join_hop, leg)? {
            // Found a leg that is not blocked, so reading
            // from the join point streams can continue
            return Ok(None);
        }
    }

    // Every single leg is blocked on cc, so reading from
    // the join point streams has to stop for now.
    trace!(
        tunnel_id = %self.tunnel_id,
        join_point = ?primary_join_point,
        reason = "all legs blocked on congestion control",
        "Pausing join point stream reads"
    );

    Ok(Some(join_hop))
}
846
847 /// Returns the next ready [`CircuitAction`],
848 /// obtained from processing the incoming/outgoing messages on all the circuits in this set.
849 ///
850 /// Will return an error if there are no circuits in this set,
851 /// or other internal errors occur.
852 ///
853 /// This is cancellation-safe.
854 #[allow(clippy::unnecessary_wraps)] // Can return Err if conflux is enabled
855 pub(super) fn next_circ_action<'a>(
856 &'a mut self,
857 runtime: &'a tor_rtcompat::DynTimeProvider,
858 ) -> Result<impl Future<Output = Result<CircuitAction, crate::Error>> + 'a, Bug> {
859 // Avoid polling the streams on the join point if our primary
860 // leg is blocked on cc
861 cfg_if::cfg_if! {
862 if #[cfg(feature = "conflux")] {
863 let exclude_hop = self.should_skip_join_point()?;
864 } else {
865 let exclude_hop: Option<HopNum> = None;
866 }
867 };
868
869 Ok(self.legs
870 .iter_mut()
871 .map(|leg| {
872 let unique_id = leg.unique_id();
873 let tunnel_id = self.tunnel_id;
874
875 // The client SHOULD abandon and close circuit if the LINKED message takes too long to
876 // arrive. This timeout MUST be no larger than the normal SOCKS/stream timeout in use for
877 // RELAY_BEGIN, but MAY be the Circuit Build Timeout value, instead. (The C-Tor
878 // implementation currently uses Circuit Build Timeout).
879 let conflux_hs_timeout = if leg.conflux_status() == Some(ConfluxStatus::Pending) {
880 if let Some(timeout) = leg.conflux_hs_timeout() {
881 // TODO: ask Diziet if we can have a sleep_until_instant() function
882 Box::pin(runtime.sleep_until_wallclock(timeout))
883 as Pin<Box<dyn Future<Output = ()> + Send>>
884 } else {
885 Box::pin(std::future::pending())
886 }
887 } else {
888 Box::pin(std::future::pending())
889 };
890
891 let mut ready_streams = leg.ready_streams_iterator(exclude_hop);
892 let input = &mut leg.input;
893 // TODO: we don't really need prepare_send_from here
894 // because the inner select_biased! is cancel-safe.
895 // We should replace this with a simple sink readiness check
896 let send_fut = leg.chan_sender.prepare_send_from(async move {
897 // A future to wait for the next ready stream.
898 let next_ready_stream = async {
899 match ready_streams.next().await {
900 Some(x) => x,
901 None => {
902 info!(circ_id=%unique_id, "no ready streams (maybe blocked on cc?)");
903 // There are no ready streams (for example, they may all be
904 // blocked due to congestion control), so there is nothing
905 // to do.
906 // We await an infinitely pending future so that we don't
907 // immediately return a `None` in the `select_biased!` below.
908 // We'd rather wait on `input.next()` than immediately return with
909 // no `CircuitAction`, which could put the reactor into a spin loop.
910 let () = std::future::pending().await;
911 unreachable!();
912 }
913 }
914 };
915
916 // NOTE: the stream returned by this function is polled in the select_biased!
917 // from Reactor::run_once(), so each block from *this* select_biased! must be
918 // cancellation-safe
919 select_biased! {
920 // Check whether we've got an input message pending.
921 ret = input.next().fuse() => {
922 let Some(cell) = ret else {
923 return Ok(CircuitAction::RemoveLeg {
924 leg: unique_id,
925 reason: RemoveLegReason::ChannelClosed,
926 });
927 };
928
929 Ok(CircuitAction::HandleCell { leg: unique_id, cell })
930 },
931 ret = next_ready_stream.fuse() => {
932 let ret = ret.map(|cmd| {
933 Ok(CircuitAction::RunCmd { leg: unique_id, cmd })
934 });
935
936 flatten(ret)
937 },
938 }
939 });
940
941 let mut send_fut = Box::pin(send_fut);
942
943 async move {
944 select_biased! {
945 () = conflux_hs_timeout.fuse() => {
946 warn!(
947 tunnel_id = %tunnel_id,
948 circ_id = %unique_id,
949 "Conflux handshake timed out on circuit"
950 );
951
952 // Conflux handshake has timed out, time to remove this circuit leg,
953 // and notify the handshake initiator.
954 Ok(Ok(CircuitAction::RemoveLeg {
955 leg: unique_id,
956 reason: RemoveLegReason::ConfluxHandshakeTimeout,
957 }))
958 }
959 ret = send_fut => {
960 // Note: We don't actually use the returned SinkSendable,
961 // and continue writing to the SometimesUboundedSink in the reactor :(
962 ret.map(|ret| ret.0)
963 }
964 }
965 }
966 })
967 .collect::<FuturesUnordered<_>>()
968 // We only return the first ready action as a Future.
969 // Can't use `next()` since it borrows the stream.
970 .into_future()
971 .map(|(next, _)| next.ok_or(internal!("empty conflux set").into()))
972 // Clean up the nested `Result`s before returning to the caller.
973 .map(|res| flatten(flatten(res))))
974 }
975
976 /// The join point on the current primary leg.
977 pub(super) fn primary_join_point(&self) -> Option<(UniqId, HopNum)> {
978 self.join_point
979 .as_ref()
980 .map(|join_point| (self.primary_id, join_point.hop))
981 }
982
983 /// Does congestion control use stream SENDMEs for the given hop?
984 ///
985 /// Returns `None` if either the `leg` or `hop` don't exist.
986 pub(super) fn uses_stream_sendme(&self, leg: UniqId, hop: HopNum) -> Option<bool> {
987 self.leg(leg)?.uses_stream_sendme(hop)
988 }
989
990 /// Encode `msg`, encrypt it, and send it to the 'hop'th hop.
991 ///
992 /// See [`Circuit::send_relay_cell`].
993 pub(super) async fn send_relay_cell_on_leg(
994 &mut self,
995 msg: SendRelayCell,
996 leg: Option<UniqId>,
997 ) -> crate::Result<()> {
998 let conflux_join_point = self.join_point.as_ref().map(|join_point| join_point.hop);
999 let leg = if let Some(join_point) = conflux_join_point {
1000 // Conflux circuits always send multiplexed relay commands to
1001 // to the last hop (the join point).
1002 if cmd_counts_towards_seqno(msg.cell.cmd()) {
1003 if msg.hop != join_point {
1004 // For leaky pipe, we must continue using the original leg
1005 leg
1006 } else {
1007 let old_primary_leg = self.primary_id;
1008 // Check if it's time to switch our primary leg.
1009 #[cfg(feature = "conflux")]
1010 if let Some(switch_cell) = self.maybe_update_primary_leg()? {
1011 trace!(
1012 old = ?old_primary_leg,
1013 new = ?self.primary_id,
1014 "Switching primary conflux leg..."
1015 );
1016
1017 self.primary_leg_mut()?.send_relay_cell(switch_cell).await?;
1018 }
1019
1020 // Use the possibly updated primary leg
1021 Some(self.primary_id)
1022 }
1023 } else {
1024 // Non-multiplexed commands go on their original
1025 // circuit and hop
1026 leg
1027 }
1028 } else {
1029 // If there is no join point, it means this is not
1030 // a multi-path tunnel, so we continue using
1031 // the leg_id/hop the cmd came from.
1032 leg
1033 };
1034
1035 let leg = leg.unwrap_or(self.primary_id);
1036
1037 let circ = self
1038 .leg_mut(leg)
1039 .ok_or_else(|| internal!("leg disappeared?!"))?;
1040
1041 circ.send_relay_cell(msg).await
1042 }
1043
/// Send a LINK cell down each unlinked leg.
///
/// The legs that have a conflux status other than `Unlinked`
/// (or no conflux status at all) are skipped.
///
/// Returns an internal error if this set has no join point,
/// or the first error encountered while starting the handshake on a leg.
#[cfg(feature = "conflux")]
pub(super) async fn link_circuits(
    &mut self,
    runtime: &tor_rtcompat::DynTimeProvider,
) -> crate::Result<()> {
    let Some((_leg_id, join_point)) = self.primary_join_point() else {
        return Err(internal!("no join point when trying to send LINK").into());
    };

    // Link all the circuits that haven't started the conflux handshake yet.
    for circ in self.legs.iter_mut() {
        // TODO: it is an internal error if any of the legs don't have a conflux handler
        // (i.e. if conflux_status() returns None)
        if circ.conflux_status() != Some(ConfluxStatus::Unlinked) {
            continue;
        }

        // Every leg gets a LINK message built from the nonce and desired UX of the set.
        let link = ConfluxLink::new(V1LinkPayload::new(self.nonce, self.desired_ux));
        let cell = AnyRelayMsgOuter::new(None, link.into());

        circ.begin_conflux_link(join_point, cell, runtime).await?;
    }

    // TODO(conflux): the caller should take care to not allow opening streams
    // until the conflux set is ready (i.e. until at least one of the legs completes
    // the handshake).
    //
    // We will probably need a channel for notifying the caller
    // of handshake completion/conflux set readiness

    Ok(())
}
1078
/// Get the number of unlinked or non-conflux legs.
///
/// A leg is counted if it has no conflux status at all,
/// or if its conflux status is `Unlinked`.
#[cfg(feature = "conflux")]
pub(super) fn num_unlinked(&self) -> usize {
    self.circuits()
        .filter(|circ| {
            circ.conflux_status()
                .map_or(true, |status| status == ConfluxStatus::Unlinked)
        })
        .count()
}
1089
1090 /// Check if the specified sequence number is the sequence number of the
1091 /// next message we're expecting to handle.
1092 pub(super) fn is_seqno_in_order(&self, seq_recv: u64) -> bool {
1093 let last_seq_delivered = self.last_seq_delivered.load(atomic::Ordering::Acquire);
1094 seq_recv == last_seq_delivered + 1
1095 }
1096
1097 /// Remove the circuit leg with the specified `UniqId` from this conflux set.
1098 ///
1099 /// Unlike [`ConfluxSet::remove`], this function does not check
1100 /// if the removal of the leg ought to trigger a reactor shutdown.
1101 ///
1102 /// Returns an error if the leg doesn't exit in the conflux set.
1103 fn remove_unchecked(&mut self, circ_id: UniqId) -> Result<Circuit, Bug> {
1104 let idx = self
1105 .legs
1106 .iter()
1107 .position(|circ| circ.unique_id() == circ_id)
1108 .ok_or_else(|| internal!("leg {circ_id:?} not found in conflux set"))?;
1109
1110 Ok(self.legs.remove(idx))
1111 }
1112}
1113
/// An error returned when a method is expecting a single-leg conflux circuit,
/// but the conflux set is not single-leg.
///
/// This is a thin wrapper around the underlying [`Bug`],
/// which is also reported as the source of this error.
#[derive(Clone, Debug, derive_more::Display, thiserror::Error)]
pub(super) struct NotSingleLegError(#[source] Bug);
1118
1119impl From<NotSingleLegError> for Bug {
1120 fn from(e: NotSingleLegError) -> Self {
1121 e.0
1122 }
1123}
1124
1125impl From<NotSingleLegError> for crate::Error {
1126 fn from(e: NotSingleLegError) -> Self {
1127 Self::from(e.0)
1128 }
1129}
1130
1131impl From<NotSingleLegError> for ReactorError {
1132 fn from(e: NotSingleLegError) -> Self {
1133 Self::from(e.0)
1134 }
1135}
1136
1137impl<I: Iterator> From<ExactlyOneError<I>> for NotSingleLegError {
1138 fn from(e: ExactlyOneError<I>) -> Self {
1139 // TODO: cannot wrap the ExactlyOneError with into_bad_api_usage
1140 // because it's not Send + Sync
1141 Self(bad_api_usage!("not a single leg conflux set ({e})"))
1142 }
1143}
1144
1145/// Whether the specified `cmd` counts towards the conflux sequence numbers.
1146fn cmd_counts_towards_seqno(cmd: RelayCmd) -> bool {
1147 // Note: copy-pasted from c-tor
1148 match cmd {
1149 // These are all fine to multiplex, and must be so that ordering is preserved
1150 RelayCmd::BEGIN | RelayCmd::DATA | RelayCmd::END | RelayCmd::CONNECTED => true,
1151
1152 // We can't multiplex these because they are circuit-specific
1153 RelayCmd::SENDME
1154 | RelayCmd::EXTEND
1155 | RelayCmd::EXTENDED
1156 | RelayCmd::TRUNCATE
1157 | RelayCmd::TRUNCATED
1158 | RelayCmd::DROP => false,
1159
1160 // We must multiplex RESOLVEs because their ordering impacts begin/end.
1161 RelayCmd::RESOLVE | RelayCmd::RESOLVED => true,
1162
1163 // These are all circuit-specific
1164 RelayCmd::BEGIN_DIR
1165 | RelayCmd::EXTEND2
1166 | RelayCmd::EXTENDED2
1167 | RelayCmd::ESTABLISH_INTRO
1168 | RelayCmd::ESTABLISH_RENDEZVOUS
1169 | RelayCmd::INTRODUCE1
1170 | RelayCmd::INTRODUCE2
1171 | RelayCmd::RENDEZVOUS1
1172 | RelayCmd::RENDEZVOUS2
1173 | RelayCmd::INTRO_ESTABLISHED
1174 | RelayCmd::RENDEZVOUS_ESTABLISHED
1175 | RelayCmd::INTRODUCE_ACK
1176 | RelayCmd::PADDING_NEGOTIATE
1177 | RelayCmd::PADDING_NEGOTIATED => false,
1178
1179 // Flow control cells must be ordered (see prop 329).
1180 RelayCmd::XOFF | RelayCmd::XON => true,
1181
1182 // These two are not multiplexed, because they must be processed immediately
1183 // to update sequence numbers before any other cells are processed on the circuit
1184 RelayCmd::CONFLUX_SWITCH
1185 | RelayCmd::CONFLUX_LINK
1186 | RelayCmd::CONFLUX_LINKED
1187 | RelayCmd::CONFLUX_LINKED_ACK => false,
1188
1189 _ => {
1190 tracing::warn!("Conflux asked to multiplex unknown relay command {cmd}");
1191 false
1192 }
1193 }
1194}
1195
#[cfg(test)]
mod test {
    // There are no tests in this module:
    // the code from this file is tested in [`crate::tunnel::circuit::test`].
}