tor_proto/tunnel/reactor/
conflux.rs

1//! Conflux-related functionality
2
3#[cfg(feature = "conflux")]
4mod msghandler;
5
6use std::future::Future;
7use std::pin::Pin;
8use std::sync::atomic::{self, AtomicU64};
9use std::sync::{Arc, Mutex};
10
11use futures::StreamExt;
12use futures::{select_biased, stream::FuturesUnordered, FutureExt as _};
13use itertools::structs::ExactlyOneError;
14use itertools::Itertools;
15use smallvec::{smallvec, SmallVec};
16use tor_rtcompat::SleepProviderExt as _;
17use tracing::{info, trace, warn};
18
19use tor_async_utils::SinkPrepareExt as _;
20use tor_basic_utils::flatten;
21use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCmd};
22use tor_error::{bad_api_usage, internal, Bug};
23use tor_linkspec::HasRelayIds as _;
24
25use crate::circuit::path::HopDetail;
26use crate::circuit::{TunnelMutableState, UniqId};
27use crate::crypto::cell::HopNum;
28use crate::tunnel::reactor::circuit::ConfluxStatus;
29use crate::tunnel::{streammap, TunnelId};
30use crate::util::err::ReactorError;
31
32use super::circuit::CircHop;
33use super::{Circuit, CircuitAction, RemoveLegReason, SendRelayCell};
34
35#[cfg(feature = "conflux")]
36use {
37    tor_cell::relaycell::conflux::{V1DesiredUx, V1LinkPayload, V1Nonce},
38    tor_cell::relaycell::msg::{ConfluxLink, ConfluxSwitch},
39};
40
41#[cfg(feature = "conflux")]
42pub(crate) use msghandler::{ConfluxAction, ConfluxMsgHandler, OooRelayMsg};
43
/// The maximum number of conflux legs to store inline in the conflux set `SmallVec`.
///
/// Attempting to store more legs will cause the `SmallVec` to spill to the heap.
///
/// Note: this value was picked arbitrarily and may not be suitable.
const MAX_CONFLUX_LEGS: usize = 16;
50
/// A set with one or more circuits.
///
/// ### Conflux set life cycle
///
/// Conflux sets are created by the reactor using [`ConfluxSet::new`].
///
/// Every `ConfluxSet` starts out as a single-path set consisting of a single 0-length circuit.
///
/// After constructing a `ConfluxSet`, the reactor will proceed to extend its (only) circuit.
/// At this point, the `ConfluxSet` will be a single-path set with a single n-length circuit.
///
/// The reactor can then turn the `ConfluxSet` into a multi-path set
/// (a multi-path set is a conflux set that contains more than 1 circuit).
/// This is done using [`ConfluxSet::add_legs`], in response to a `CtrlMsg` sent
/// by the reactor user (also referred to as the "conflux handshake initiator").
/// After that, the conflux set is said to be a multi-path set with multiple N-length circuits.
///
/// Circuits can be removed from the set using [`ConfluxSet::remove`].
///
/// The lifetime of a `ConfluxSet` is tied to the lifetime of the reactor.
/// When the reactor is dropped, its underlying `ConfluxSet` is dropped too.
/// This can happen on an explicit shutdown request, or if a fatal error occurs.
///
/// Conversely, the `ConfluxSet` can also trigger a reactor shutdown.
/// For example, if after being instructed to remove a circuit from the set
/// using [`ConfluxSet::remove`], the set is completely depleted,
/// the `ConfluxSet` will return a [`ReactorError::Shutdown`] error,
/// which will cause the reactor to shut down.
pub(super) struct ConfluxSet {
    /// The unique identifier of the tunnel this conflux set belongs to.
    ///
    /// Used for setting the internal [`TunnelId`] of [`Circuit`]s
    /// that gets used for logging purposes.
    tunnel_id: TunnelId,
    /// The circuits in this conflux set.
    ///
    /// Up to [`MAX_CONFLUX_LEGS`] circuits are stored inline.
    legs: SmallVec<[Circuit; MAX_CONFLUX_LEGS]>,
    /// Tunnel state, shared with `ClientCirc`.
    ///
    /// Contains the [`MutableState`](super::MutableState) of each circuit in the set.
    mutable: Arc<TunnelMutableState>,
    /// The unique identifier of the primary leg.
    ///
    /// Removing the leg with this ID causes [`ConfluxSet::remove`]
    /// to request a reactor shutdown.
    primary_id: UniqId,
    /// The join point of the set, if this is a multi-path set.
    ///
    /// Initially the conflux set starts out as a single-path set with no join point.
    /// When it is converted to a multipath set using [`add_legs`](Self::add_legs),
    /// the join point is initialized to the last hop in the tunnel.
    //
    // TODO(#2017): for simplicity, we currently force all legs to have the same length,
    // to ensure the HopNum of the join point is the same for all of them.
    //
    // In the future we might want to relax this restriction.
    join_point: Option<JoinPoint>,
    /// The nonce associated with the circuits from this set.
    #[cfg(feature = "conflux")]
    nonce: V1Nonce,
    /// The desired UX, used for choosing the primary leg selection algorithm.
    #[cfg(feature = "conflux")]
    desired_ux: V1DesiredUx,
    /// The absolute sequence number of the last cell delivered to a stream.
    ///
    /// A clone of this is shared with each [`ConfluxMsgHandler`] created.
    ///
    /// When a message is received on a circuit leg, the `ConfluxMsgHandler`
    /// of the leg compares the (leg-local) sequence number of the message
    /// with this sequence number to determine whether the message is in-order.
    ///
    /// If the message is in-order, the `ConfluxMsgHandler` instructs the circuit
    /// to deliver it to its corresponding stream.
    ///
    /// If the message is out-of-order, the `ConfluxMsgHandler` instructs the circuit
    /// to instruct the reactor to buffer the message.
    last_seq_delivered: Arc<AtomicU64>,
    /// Whether we have selected our initial primary leg,
    /// if this is a multipath conflux set.
    selected_init_primary: bool,
}
128
/// The conflux join point.
///
/// This is the hop where all the legs of a multi-path conflux set meet.
#[derive(Clone, derive_more::Debug)]
struct JoinPoint {
    /// The hop number.
    hop: HopNum,
    /// The [`HopDetail`] of the hop.
    detail: HopDetail,
    /// The stream map of the join point, shared with each circuit leg.
    ///
    /// Skipped in the `Debug` output.
    #[debug(skip)]
    streams: Arc<Mutex<streammap::StreamMap>>,
}
140
141impl ConfluxSet {
142    /// Create a new conflux set, consisting of a single leg.
143    ///
144    /// Returns the newly created set and a reference to its [`TunnelMutableState`].
145    pub(super) fn new(
146        tunnel_id: TunnelId,
147        circuit_leg: Circuit,
148    ) -> (Self, Arc<TunnelMutableState>) {
149        let primary_id = circuit_leg.unique_id();
150        let circ_mutable = Arc::clone(circuit_leg.mutable());
151        let legs = smallvec![circuit_leg];
152        // Note: the join point is only set for multi-path tunnels
153        let join_point = None;
154
155        // TODO(#2035): read this from the consensus/config.
156        #[cfg(feature = "conflux")]
157        let desired_ux = V1DesiredUx::NO_OPINION;
158
159        let mutable = Arc::new(TunnelMutableState::default());
160        mutable.insert(primary_id, circ_mutable);
161
162        let set = Self {
163            tunnel_id,
164            legs,
165            primary_id,
166            join_point,
167            mutable: mutable.clone(),
168            #[cfg(feature = "conflux")]
169            nonce: V1Nonce::new(&mut rand::rng()),
170            #[cfg(feature = "conflux")]
171            desired_ux,
172            last_seq_delivered: Arc::new(AtomicU64::new(0)),
173            selected_init_primary: false,
174        };
175
176        (set, mutable)
177    }
178
179    /// Remove and return the only leg of this conflux set.
180    ///
181    /// Returns an error if there is more than one leg in the set,
182    /// or if called before any circuit legs are available.
183    ///
184    /// Calling this function will empty the [`ConfluxSet`].
185    pub(super) fn take_single_leg(&mut self) -> Result<Circuit, Bug> {
186        let circ = self
187            .legs
188            .iter()
189            .exactly_one()
190            .map_err(NotSingleLegError::from)?;
191        let circ_id = circ.unique_id();
192
193        debug_assert!(circ_id == self.primary_id);
194
195        self.remove_unchecked(circ_id)
196    }
197
    /// Return a reference to the only leg of this conflux set.
    ///
    /// Returns an error if there is more than one leg in the set,
    /// or if called before any circuit legs are available.
    pub(super) fn single_leg(&self) -> Result<&Circuit, NotSingleLegError> {
        Ok(self.legs.iter().exactly_one()?)
    }
206
    /// Return a mutable reference to the only leg of this conflux set.
    ///
    /// Returns an error if there is more than one leg in the set,
    /// or if called before any circuit legs are available.
    pub(super) fn single_leg_mut(&mut self) -> Result<&mut Circuit, NotSingleLegError> {
        Ok(self.legs.iter_mut().exactly_one()?)
    }
215
216    /// Return the primary leg of this conflux set.
217    ///
218    /// Returns an error if called before any circuit legs are available.
219    pub(super) fn primary_leg_mut(&mut self) -> Result<&mut Circuit, Bug> {
220        #[cfg(not(feature = "conflux"))]
221        if self.legs.len() > 1 {
222            return Err(internal!(
223                "got multipath tunnel, but conflux feature is disabled?!"
224            ));
225        }
226
227        if self.legs.is_empty() {
228            Err(bad_api_usage!(
229                "tried to get circuit leg before creating it?!"
230            ))
231        } else {
232            let circ = self
233                .leg_mut(self.primary_id)
234                .ok_or_else(|| internal!("conflux set is empty?!"))?;
235
236            Ok(circ)
237        }
238    }
239
240    /// Return a reference to the leg of this conflux set with the given id.
241    pub(super) fn leg(&self, leg_id: UniqId) -> Option<&Circuit> {
242        self.legs.iter().find(|circ| circ.unique_id() == leg_id)
243    }
244
    /// Return a mutable reference to the leg of this conflux set with the given id.
    ///
    /// Returns `None` if no leg in the set has that id.
    pub(super) fn leg_mut(&mut self, leg_id: UniqId) -> Option<&mut Circuit> {
        self.legs.iter_mut().find(|circ| circ.unique_id() == leg_id)
    }
249
    /// Return the number of legs (circuits) in this conflux set.
    pub(super) fn len(&self) -> usize {
        self.legs.len()
    }
254
255    /// Return whether this conflux set is empty.
256    pub(super) fn is_empty(&self) -> bool {
257        self.legs.len() == 0
258    }
259
260    /// Remove the specified leg from this conflux set.
261    ///
262    /// Returns an error if the given leg doesn't exist in the set.
263    ///
264    /// Returns an error instructing the reactor to perform a clean shutdown
265    /// ([`ReactorError::Shutdown`]), tearing down the entire [`ConfluxSet`], if
266    ///
267    ///   * the set is depleted (empty) after removing the specified leg
268    ///   * `leg` is currently the sending (primary) leg of this set
269    ///   * the closed leg had the highest non-zero last_seq_recv/sent
270    ///   * the closed leg had some in-progress data (inflight > cc_sendme_inc)
271    ///
272    /// We do not yet support resumption. See [2.4.3. Closing circuits] in prop329.
273    ///
274    /// [2.4.3. Closing circuits]: https://spec.torproject.org/proposals/329-traffic-splitting.html#243-closing-circuits
275    pub(super) fn remove(&mut self, leg: UniqId) -> Result<Circuit, ReactorError> {
276        let circ = self.remove_unchecked(leg)?;
277
278        tracing::trace!(
279            circ_id = %circ.unique_id(),
280            "Circuit removed from conflux set"
281        );
282
283        self.mutable.remove(circ.unique_id());
284
285        if self.legs.is_empty() {
286            // TODO: log the tunnel ID
287            tracing::debug!("Conflux set is now empty, tunnel reactor shutting down");
288
289            // The last circuit in the set has just died, so the reactor should exit.
290            return Err(ReactorError::Shutdown);
291        }
292
293        if leg == self.primary_id {
294            // We have just removed our sending leg,
295            // so it's time to close the entire conflux set.
296            return Err(ReactorError::Shutdown);
297        }
298
299        cfg_if::cfg_if! {
300            if #[cfg(feature = "conflux")] {
301                self.remove_conflux(circ)
302            } else {
303                // Conflux is disabled, so we can't possibly continue running if the only
304                // leg in the tunnel is gone.
305                //
306                // Technically this should be unreachable (because of the is_empty()
307                // check above)
308                Err(internal!("Multiple legs in single-path tunnel?!").into())
309            }
310        }
311    }
312
313    /// Handle the removal of a circuit,
314    /// returning an error if the reactor needs to shut down.
315    #[cfg(feature = "conflux")]
316    fn remove_conflux(&self, circ: Circuit) -> Result<Circuit, ReactorError> {
317        let Some(status) = circ.conflux_status() else {
318            return Err(internal!("Found non-conflux circuit in conflux set?!").into());
319        };
320
321        // TODO(conflux): should the circmgr be notified about the leg removal?
322        //
323        // "For circuits that are unlinked, the origin SHOULD immediately relaunch a new leg when it
324        // is closed, subject to the limits in [SIDE_CHANNELS]."
325
326        // If we've reached this point and the conflux set is non-empty,
327        // it means it's a multi-path set.
328        //
329        // Time to check if we need to tear down the entire set.
330        match status {
331            ConfluxStatus::Unlinked => {
332                // This circuit hasn't yet begun the conflux handshake,
333                // so we can safely remove it from the set
334                Ok(circ)
335            }
336            ConfluxStatus::Pending | ConfluxStatus::Linked => {
337                let (circ_last_seq_recv, circ_last_seq_sent) =
338                    (|| Ok::<_, ReactorError>((circ.last_seq_recv()?, circ.last_seq_sent()?)))()?;
339
340                // If the closed leg had the highest non-zero last_seq_recv/sent, close the set
341                if let Some(max_last_seq_recv) = self.max_last_seq_recv() {
342                    if circ_last_seq_recv > max_last_seq_recv {
343                        return Err(ReactorError::Shutdown);
344                    }
345                }
346
347                if let Some(max_last_seq_sent) = self.max_last_seq_sent() {
348                    if circ_last_seq_sent > max_last_seq_sent {
349                        return Err(ReactorError::Shutdown);
350                    }
351                }
352
353                let hop = self.join_point_hop(&circ)?;
354
355                let (inflight, cwnd) = (|| {
356                    let ccontrol = hop.ccontrol();
357                    let inflight = ccontrol.inflight()?;
358                    let cwnd = ccontrol.cwnd()?;
359
360                    Some((inflight, cwnd))
361                })()
362                .ok_or_else(|| {
363                    internal!("Congestion control algorithm doesn't track inflight cells or cwnd?!")
364                })?;
365
366                // If data is in progress on the leg (inflight > cc_sendme_inc),
367                // then all legs must be closed
368                if inflight >= cwnd.params().sendme_inc() {
369                    return Err(ReactorError::Shutdown);
370                }
371
372                Ok(circ)
373            }
374        }
375    }
376
377    /// Return the maximum relative last_seq_recv across all circuits.
378    #[cfg(feature = "conflux")]
379    fn max_last_seq_recv(&self) -> Option<u64> {
380        self.legs
381            .iter()
382            .filter_map(|leg| leg.last_seq_recv().ok())
383            .max()
384    }
385
386    /// Return the maximum relative last_seq_sent across all circuits.
387    #[cfg(feature = "conflux")]
388    fn max_last_seq_sent(&self) -> Option<u64> {
389        self.legs
390            .iter()
391            .filter_map(|leg| leg.last_seq_sent().ok())
392            .max()
393    }
394
395    /// Get the [`CircHop`] of the join point on the specified `circ`,
396    /// returning an error if this is a single path conflux set.
397    fn join_point_hop<'c>(&self, circ: &'c Circuit) -> Result<&'c CircHop, Bug> {
398        let Some(join_point) = self.join_point.as_ref().map(|p| p.hop) else {
399            return Err(internal!("No join point on conflux tunnel?!"));
400        };
401
402        circ.hop(join_point)
403            .ok_or_else(|| internal!("Conflux join point disappeared?!"))
404    }
405
    /// Return an iterator over all the circuits (legs) in the conflux set.
    fn circuits(&self) -> impl Iterator<Item = &Circuit> {
        self.legs.iter()
    }
410
411    /// Add legs to the this conflux set.
412    ///
413    /// Returns an error if any of the legs is invalid.
414    ///
415    /// A leg is considered valid if
416    ///
417    ///   * the circuit has the same length as all the other circuits in the set
418    ///   * its last hop is equal to the designated join point
419    ///   * the circuit has no streams attached to any of its hops
420    ///   * the circuit is not already part of a conflux set
421    ///
422    /// Note: the circuits will not begin linking until
423    /// [`link_circuits`](Self::link_circuits) is called.
424    ///
425    /// IMPORTANT: this function does not prevent the construction of conflux sets
426    /// where the circuit legs share guard or middle relays. It is the responsibility
427    /// of the caller to enforce the following invariant from prop354:
428    ///
429    /// "If building a conflux leg: Reject any circuits that have the same Guard as the other conflux
430    /// "leg(s) in the current conflux set, EXCEPT when one of the primary Guards is also the chosen
431    /// "Exit of this conflux set (in which case, re-use the non-Exit Guard)."
432    ///
433    /// This is because at this level we don't actually know which relays are the guards,
434    /// so we can't know if the join point happens to be one of the Guard + Exit relays.
435    #[cfg(feature = "conflux")]
436    pub(super) fn add_legs(
437        &mut self,
438        legs: Vec<Circuit>,
439        runtime: &tor_rtcompat::DynTimeProvider,
440    ) -> Result<(), Bug> {
441        if legs.is_empty() {
442            return Err(bad_api_usage!("asked to add empty leg list to conflux set"));
443        }
444
445        let join_point = match self.join_point.take() {
446            Some(p) => {
447                // Preserve the existing join point, if there is one.
448                p
449            }
450            None => {
451                let (hop, detail, streams) = (|| {
452                    let first_leg = self.circuits().next()?;
453                    let first_leg_path = first_leg.path();
454                    let all_hops = first_leg_path.all_hops();
455                    let hop_num = first_leg.last_hop_num()?;
456                    let detail = all_hops.last()?;
457                    let hop = first_leg.hop(hop_num)?;
458                    let streams = Arc::clone(hop.stream_map());
459                    Some((hop_num, detail.clone(), streams))
460                })()
461                .ok_or_else(|| bad_api_usage!("asked to join circuit with no hops"))?;
462
463                JoinPoint {
464                    hop,
465                    detail,
466                    streams,
467                }
468            }
469        };
470
471        // Check two HopDetails for equality.
472        //
473        // Returns an error if one of the hops is virtual.
474        let hops_eq = |h1: &HopDetail, h2: &HopDetail| {
475            match (h1, h2) {
476                (HopDetail::Relay(t1), HopDetail::Relay(ref t2)) => Ok(t1.same_relay_ids(t2)),
477                #[cfg(feature = "hs-common")]
478                (HopDetail::Virtual, HopDetail::Virtual) => {
479                    // TODO(#2016): support onion service conflux
480                    Err(internal!("onion service conflux not supported"))
481                }
482                _ => Ok(false),
483            }
484        };
485
486        // A leg is considered valid if
487        //
488        //   * the circuit has the expected length
489        //     (the length of the first circuit we added to the set)
490        //   * its last hop is equal to the designated join point
491        //     (the last hop of the first circuit we added)
492        //   * the circuit has no streams attached to any of its hops
493        //   * the circuit is not already part of a conflux tunnel
494        //
495        // Returns an error if any hops are virtual.
496        let leg_is_valid = |leg: &Circuit| -> Result<bool, Bug> {
497            use crate::ccparams::Algorithm;
498
499            let path = leg.path();
500            let Some(last_hop) = path.all_hops().last() else {
501                // A circuit with no hops is invalid
502                return Ok(false);
503            };
504
505            // TODO: this sort of duplicates the check above.
506            // The difference is that above we read the hop detail
507            // information from the circuit Path, whereas here we get
508            // the actual last CircHop of the circuit.
509            let Some(last_hop_num) = leg.last_hop_num() else {
510                // A circuit with no hops is invalid
511                return Ok(false);
512            };
513
514            let circhop = leg
515                .hop(last_hop_num)
516                .ok_or_else(|| internal!("hop disappeared?!"))?;
517
518            // Ensure we negotiated a suitable cc algorithm
519            let is_cc_suitable = match circhop.ccontrol().algorithm() {
520                Algorithm::FixedWindow(_) => false,
521                Algorithm::Vegas(_) => true,
522            };
523
524            if !is_cc_suitable {
525                return Ok(false);
526            }
527
528            Ok(last_hop_num == join_point.hop
529                && hops_eq(last_hop, &join_point.detail)?
530                && !leg.has_streams()
531                && leg.conflux_status().is_none())
532        };
533
534        for leg in &legs {
535            if !leg_is_valid(leg)? {
536                return Err(bad_api_usage!("one more more conflux circuits are invalid"));
537            }
538        }
539
540        // Select a join point, or put the existing one back into self.
541        self.join_point = Some(join_point.clone());
542
543        // The legs are valid, so add them to the set.
544        for circ in legs {
545            let mutable = Arc::clone(circ.mutable());
546            let unique_id = circ.unique_id();
547            self.legs.push(circ);
548            // Merge the mutable state of the circuit into our tunnel state.
549            self.mutable.insert(unique_id, mutable);
550        }
551
552        for circ in self.legs.iter_mut() {
553            // The circuits that have a None status don't know they're part of
554            // a multi-path tunnel yet. They need to be initialized with a
555            // conflux message handler, and have their join point fixed up
556            // to share a stream map with the join point on all the other circuits.
557            if circ.conflux_status().is_none() {
558                let conflux_handler = ConfluxMsgHandler::new_client(
559                    join_point.hop,
560                    self.nonce,
561                    Arc::clone(&self.last_seq_delivered),
562                    runtime.clone(),
563                );
564
565                circ.add_to_conflux_tunnel(self.tunnel_id, conflux_handler);
566
567                // Ensure the stream map of the last hop is shared by all the legs
568                let last_hop = circ
569                    .hop_mut(join_point.hop)
570                    .ok_or_else(|| bad_api_usage!("asked to join circuit with no hops"))?;
571                last_hop.set_stream_map(Arc::clone(&join_point.streams))?;
572            }
573        }
574
575        Ok(())
576    }
577
578    /// Try to update the primary leg based on the configured desired UX,
579    /// if needed.
580    ///
581    /// Returns the SWITCH cell to send on the primary leg,
582    /// if we switched primary leg.
583    #[cfg(feature = "conflux")]
584    pub(super) fn maybe_update_primary_leg(&mut self) -> crate::Result<Option<SendRelayCell>> {
585        use tor_error::into_internal;
586
587        let Some(join_point) = self.join_point.as_ref() else {
588            // Return early if this is not a multi-path tunnel
589            return Ok(None);
590        };
591
592        let join_point = join_point.hop;
593
594        if !self.should_update_primary_leg() {
595            // Nothing to do
596            return Ok(None);
597        }
598
599        let Some(new_primary_id) = self.select_primary_leg()? else {
600            // None of the legs satisfy our UX requirements, continue using the existing one.
601            return Ok(None);
602        };
603
604        // Check that the newly selected leg is actually different from the previous
605        if self.primary_id == new_primary_id {
606            // The primary leg stays the same, nothing to do.
607            return Ok(None);
608        }
609
610        let prev_last_seq_sent = self.primary_leg_mut()?.last_seq_sent()?;
611        self.primary_id = new_primary_id;
612        let new_last_seq_sent = self.primary_leg_mut()?.last_seq_sent()?;
613
614        // If this fails, it means we haven't updated our primary leg in a very long time.
615        //
616        // TODO(#2036): there are currently no safeguards to prevent us from staying
617        // on the same leg for "too long". Perhaps we should design should_update_primary_leg()
618        // such that it forces us to switch legs periodically, to prevent the seqno delta from
619        // getting too big?
620        let seqno_delta = u32::try_from(prev_last_seq_sent - new_last_seq_sent).map_err(
621            into_internal!("Seqno delta for switch does not fit in u32?!"),
622        )?;
623
624        // We need to carry the last_seq_sent over to the next leg
625        // (the next cell sent will have seqno = prev_last_seq_sent + 1)
626        self.primary_leg_mut()?
627            .set_last_seq_sent(prev_last_seq_sent)?;
628
629        let switch = ConfluxSwitch::new(seqno_delta);
630        let cell = AnyRelayMsgOuter::new(None, switch.into());
631        Ok(Some(SendRelayCell {
632            hop: join_point,
633            early: false,
634            cell,
635        }))
636    }
637
638    /// Whether it's time to select a new primary leg.
639    #[cfg(feature = "conflux")]
640    fn should_update_primary_leg(&mut self) -> bool {
641        if !self.selected_init_primary {
642            self.maybe_select_init_primary();
643            return false;
644        }
645
646        // If we don't have at least 2 legs,
647        // we can't switch our primary leg.
648        if self.legs.len() < 2 {
649            return false;
650        }
651
652        // TODO(conflux-tuning): if it turns out we switch legs too frequently,
653        // we might want to implement some sort of rate-limiting here
654        // (see c-tor's conflux_can_switch).
655
656        true
657    }
658
659    /// Return the best leg according to the configured desired UX.
660    ///
661    /// Returns `None` if no suitable leg was found.
662    #[cfg(feature = "conflux")]
663    fn select_primary_leg(&self) -> Result<Option<UniqId>, Bug> {
664        match self.desired_ux {
665            V1DesiredUx::NO_OPINION | V1DesiredUx::MIN_LATENCY => {
666                self.select_primary_leg_min_rtt(false)
667            }
668            V1DesiredUx::HIGH_THROUGHPUT => self.select_primary_leg_min_rtt(true),
669            V1DesiredUx::LOW_MEM_LATENCY | V1DesiredUx::LOW_MEM_THROUGHPUT => {
670                // TODO(conflux-tuning): add support for low-memory algorithms
671                self.select_primary_leg_min_rtt(false)
672            }
673            _ => {
674                // Default to MIN_RTT if we don't recognize the desired UX value
675                warn!(
676                    tunnel_id = %self.tunnel_id,
677                    "Ignoring unrecognized conflux desired UX {}, using MIN_LATENCY",
678                    self.desired_ux
679                );
680                self.select_primary_leg_min_rtt(false)
681            }
682        }
683    }
684
685    /// Try to choose an initial primary leg, if we have an initial RTT measurement
686    /// for at least one of the legs.
687    #[cfg(feature = "conflux")]
688    fn maybe_select_init_primary(&mut self) {
689        let best = self
690            .legs
691            .iter()
692            .filter_map(|leg| leg.init_rtt().map(|rtt| (leg, rtt)))
693            .min_by_key(|(_leg, rtt)| *rtt)
694            .map(|(leg, _rtt)| leg.unique_id());
695
696        if let Some(best) = best {
697            self.primary_id = best;
698            self.selected_init_primary = true;
699        }
700    }
701
702    /// Return the leg with the best (lowest) RTT.
703    ///
704    /// If `check_can_send` is true, selects the lowest RTT leg that is ready to send.
705    ///
706    /// Returns `None` if no suitable leg was found.
707    #[cfg(feature = "conflux")]
708    fn select_primary_leg_min_rtt(&self, check_can_send: bool) -> Result<Option<UniqId>, Bug> {
709        let mut best = None;
710
711        for circ in self.legs.iter() {
712            let leg_id = circ.unique_id();
713            let join_point = self.join_point_hop(circ)?;
714            let ccontrol = join_point.ccontrol();
715
716            if check_can_send && !ccontrol.can_send() {
717                continue;
718            }
719
720            let rtt = ccontrol.rtt();
721            let init_rtt_usec = || {
722                circ.init_rtt()
723                    .map(|rtt| u32::try_from(rtt.as_micros()).unwrap_or(u32::MAX))
724            };
725
726            let Some(ewma_rtt) = rtt.ewma_rtt_usec().or_else(init_rtt_usec) else {
727                return Err(internal!(
728                    "attempted to select primary leg before handshake completed?!"
729                ));
730            };
731
732            match best.take() {
733                None => {
734                    best = Some((leg_id, ewma_rtt));
735                }
736                Some(best_so_far) => {
737                    if best_so_far.1 <= ewma_rtt {
738                        best = Some(best_so_far);
739                    } else {
740                        best = Some((leg_id, ewma_rtt));
741                    }
742                }
743            }
744        }
745
746        Ok(best.map(|(leg_id, _)| leg_id))
747    }
748
749    /// Returns `true` if our conflux join point is blocked on congestion control
750    /// on the specified `circuit`.
751    ///
752    /// Returns `false` if the join point is not blocked on cc,
753    /// or if this is a single-path set.
754    ///
755    /// Returns an error if this is a multipath tunnel,
756    /// but the joint point hop doesn't exist on the specified circuit.
757    #[cfg(feature = "conflux")]
758    fn is_join_point_blocked_on_cc(join_hop: HopNum, circuit: &Circuit) -> Result<bool, Bug> {
759        let join_circhop = circuit.hop(join_hop).ok_or_else(|| {
760            internal!(
761                "Join point hop {} not found on circuit {}?!",
762                join_hop.display(),
763                circuit.unique_id(),
764            )
765        })?;
766
767        Ok(!join_circhop.ccontrol().can_send())
768    }
769
770    /// Returns the `HopNum` of the join point
771    /// if [`next_circ_action`](Self::next_circ_action)
772    /// should avoid polling the join point streams entirely.
773    #[cfg(feature = "conflux")]
774    fn should_skip_join_point(&self) -> Result<Option<HopNum>, Bug> {
775        let Some(primary_join_point) = self.primary_join_point() else {
776            // Single-path, there is no join point
777            return Ok(None);
778        };
779
780        let join_hop = primary_join_point.1;
781        let primary_blocked_on_cc = {
782            let primary = self
783                .leg(self.primary_id)
784                .ok_or_else(|| internal!("primary leg disappeared?!"))?;
785            Self::is_join_point_blocked_on_cc(join_hop, primary)?
786        };
787
788        if !primary_blocked_on_cc {
789            // Easy, we can just carry on
790            return Ok(None);
791        }
792
793        // Now, if the primary *is* blocked on cc, we may still be able to poll
794        // the join point streams (if we're using the right desired UX)
795        let exclude = if self.desired_ux != V1DesiredUx::HIGH_THROUGHPUT {
796            // The primary leg is blocked on cc, and we can't switch because we're
797            // not using the high throughput algorithm, so we must stop reading
798            // the join point streams.
799            //
800            // Note: if the selected algorithm is HIGH_THROUGHPUT,
801            // it's okay to continue reading from the edge connection,
802            // because maybe_update_primary_leg() will select a new,
803            // non-blocked primary leg, just before sending.
804            trace!(
805                tunnel_id = %self.tunnel_id,
806                join_point = ?primary_join_point,
807                reason = "sending leg blocked on congestion control",
808                "Pausing join point stream reads"
809            );
810
811            Some(join_hop)
812        } else {
813            // Ah-ha, the desired UX is HIGH_THROUGHPUT, which means we can switch
814            // to an unblocked leg before sending any cells over the join point,
815            // as long as there are some unblocked legs.
816
817            // TODO: figure out how to rewrite this with an idiomatic iterator combinator
818            let mut all_blocked_on_cc = true;
819            for leg in &self.legs {
820                all_blocked_on_cc = Self::is_join_point_blocked_on_cc(join_hop, leg)?;
821                if !all_blocked_on_cc {
822                    break;
823                }
824            }
825
826            if all_blocked_on_cc {
827                // All legs are blocked on cc, so we must stop reading from
828                // the join point streams for now.
829                trace!(
830                    tunnel_id = %self.tunnel_id,
831                    join_point = ?primary_join_point,
832                    reason = "all legs blocked on congestion control",
833                    "Pausing join point stream reads"
834                );
835
836                Some(join_hop)
837            } else {
838                // At least one leg is not blocked, so we can continue reading
839                // from the join point streams
840                None
841            }
842        };
843
844        Ok(exclude)
845    }
846
847    /// Returns the next ready [`CircuitAction`],
848    /// obtained from processing the incoming/outgoing messages on all the circuits in this set.
849    ///
850    /// Will return an error if there are no circuits in this set,
851    /// or other internal errors occur.
852    ///
853    /// This is cancellation-safe.
854    #[allow(clippy::unnecessary_wraps)] // Can return Err if conflux is enabled
855    pub(super) fn next_circ_action<'a>(
856        &'a mut self,
857        runtime: &'a tor_rtcompat::DynTimeProvider,
858    ) -> Result<impl Future<Output = Result<CircuitAction, crate::Error>> + 'a, Bug> {
859        // Avoid polling the streams on the join point if our primary
860        // leg is blocked on cc
861        cfg_if::cfg_if! {
862            if #[cfg(feature = "conflux")] {
863                let exclude_hop = self.should_skip_join_point()?;
864            } else {
865                let exclude_hop: Option<HopNum> = None;
866            }
867        };
868
869        Ok(self.legs
870            .iter_mut()
871            .map(|leg| {
872                let unique_id = leg.unique_id();
873                let tunnel_id = self.tunnel_id;
874
875                // The client SHOULD abandon and close circuit if the LINKED message takes too long to
876                // arrive. This timeout MUST be no larger than the normal SOCKS/stream timeout in use for
877                // RELAY_BEGIN, but MAY be the Circuit Build Timeout value, instead. (The C-Tor
878                // implementation currently uses Circuit Build Timeout).
879                let conflux_hs_timeout = if leg.conflux_status() == Some(ConfluxStatus::Pending) {
880                    if let Some(timeout) = leg.conflux_hs_timeout() {
881                        // TODO: ask Diziet if we can have a sleep_until_instant() function
882                        Box::pin(runtime.sleep_until_wallclock(timeout))
883                            as Pin<Box<dyn Future<Output = ()> + Send>>
884                    } else {
885                        Box::pin(std::future::pending())
886                    }
887                } else {
888                    Box::pin(std::future::pending())
889                };
890
891                let mut ready_streams = leg.ready_streams_iterator(exclude_hop);
892                let input = &mut leg.input;
893                // TODO: we don't really need prepare_send_from here
894                // because the inner select_biased! is cancel-safe.
895                // We should replace this with a simple sink readiness check
896                let send_fut = leg.chan_sender.prepare_send_from(async move {
897                    // A future to wait for the next ready stream.
898                    let next_ready_stream = async {
899                        match ready_streams.next().await {
900                            Some(x) => x,
901                            None => {
902                                info!(circ_id=%unique_id, "no ready streams (maybe blocked on cc?)");
903                                // There are no ready streams (for example, they may all be
904                                // blocked due to congestion control), so there is nothing
905                                // to do.
906                                // We await an infinitely pending future so that we don't
907                                // immediately return a `None` in the `select_biased!` below.
908                                // We'd rather wait on `input.next()` than immediately return with
909                                // no `CircuitAction`, which could put the reactor into a spin loop.
910                                let () = std::future::pending().await;
911                                unreachable!();
912                            }
913                        }
914                    };
915
916                    // NOTE: the stream returned by this function is polled in the select_biased!
917                    // from Reactor::run_once(), so each block from *this* select_biased! must be
918                    // cancellation-safe
919                    select_biased! {
920                        // Check whether we've got an input message pending.
921                        ret = input.next().fuse() => {
922                            let Some(cell) = ret else {
923                                return Ok(CircuitAction::RemoveLeg {
924                                    leg: unique_id,
925                                    reason: RemoveLegReason::ChannelClosed,
926                                });
927                            };
928
929                            Ok(CircuitAction::HandleCell { leg: unique_id, cell })
930                        },
931                        ret = next_ready_stream.fuse() => {
932                            let ret = ret.map(|cmd| {
933                                Ok(CircuitAction::RunCmd { leg: unique_id, cmd })
934                            });
935
936                            flatten(ret)
937                        },
938                    }
939                });
940
941                let mut send_fut = Box::pin(send_fut);
942
943                async move {
944                    select_biased! {
945                        () = conflux_hs_timeout.fuse() => {
946                            warn!(
947                                tunnel_id = %tunnel_id,
948                                circ_id = %unique_id,
949                                "Conflux handshake timed out on circuit"
950                            );
951
952                            // Conflux handshake has timed out, time to remove this circuit leg,
953                            // and notify the handshake initiator.
954                            Ok(Ok(CircuitAction::RemoveLeg {
955                                leg: unique_id,
956                                reason: RemoveLegReason::ConfluxHandshakeTimeout,
957                            }))
958                        }
959                        ret = send_fut => {
960                            // Note: We don't actually use the returned SinkSendable,
961                            // and continue writing to the SometimesUboundedSink in the reactor :(
962                            ret.map(|ret| ret.0)
963                        }
964                    }
965                }
966            })
967            .collect::<FuturesUnordered<_>>()
968            // We only return the first ready action as a Future.
969            // Can't use `next()` since it borrows the stream.
970            .into_future()
971            .map(|(next, _)| next.ok_or(internal!("empty conflux set").into()))
972            // Clean up the nested `Result`s before returning to the caller.
973            .map(|res| flatten(flatten(res))))
974    }
975
976    /// The join point on the current primary leg.
977    pub(super) fn primary_join_point(&self) -> Option<(UniqId, HopNum)> {
978        self.join_point
979            .as_ref()
980            .map(|join_point| (self.primary_id, join_point.hop))
981    }
982
983    /// Does congestion control use stream SENDMEs for the given hop?
984    ///
985    /// Returns `None` if either the `leg` or `hop` don't exist.
986    pub(super) fn uses_stream_sendme(&self, leg: UniqId, hop: HopNum) -> Option<bool> {
987        self.leg(leg)?.uses_stream_sendme(hop)
988    }
989
990    /// Encode `msg`, encrypt it, and send it to the 'hop'th hop.
991    ///
992    /// See [`Circuit::send_relay_cell`].
993    pub(super) async fn send_relay_cell_on_leg(
994        &mut self,
995        msg: SendRelayCell,
996        leg: Option<UniqId>,
997    ) -> crate::Result<()> {
998        let conflux_join_point = self.join_point.as_ref().map(|join_point| join_point.hop);
999        let leg = if let Some(join_point) = conflux_join_point {
1000            // Conflux circuits always send multiplexed relay commands to
1001            // to the last hop (the join point).
1002            if cmd_counts_towards_seqno(msg.cell.cmd()) {
1003                if msg.hop != join_point {
1004                    // For leaky pipe, we must continue using the original leg
1005                    leg
1006                } else {
1007                    let old_primary_leg = self.primary_id;
1008                    // Check if it's time to switch our primary leg.
1009                    #[cfg(feature = "conflux")]
1010                    if let Some(switch_cell) = self.maybe_update_primary_leg()? {
1011                        trace!(
1012                            old = ?old_primary_leg,
1013                            new = ?self.primary_id,
1014                            "Switching primary conflux leg..."
1015                        );
1016
1017                        self.primary_leg_mut()?.send_relay_cell(switch_cell).await?;
1018                    }
1019
1020                    // Use the possibly updated primary leg
1021                    Some(self.primary_id)
1022                }
1023            } else {
1024                // Non-multiplexed commands go on their original
1025                // circuit and hop
1026                leg
1027            }
1028        } else {
1029            // If there is no join point, it means this is not
1030            // a multi-path tunnel, so we continue using
1031            // the leg_id/hop the cmd came from.
1032            leg
1033        };
1034
1035        let leg = leg.unwrap_or(self.primary_id);
1036
1037        let circ = self
1038            .leg_mut(leg)
1039            .ok_or_else(|| internal!("leg disappeared?!"))?;
1040
1041        circ.send_relay_cell(msg).await
1042    }
1043
1044    /// Send a LINK cell down each unlinked leg.
1045    #[cfg(feature = "conflux")]
1046    pub(super) async fn link_circuits(
1047        &mut self,
1048        runtime: &tor_rtcompat::DynTimeProvider,
1049    ) -> crate::Result<()> {
1050        let (_leg_id, join_point) = self
1051            .primary_join_point()
1052            .ok_or_else(|| internal!("no join point when trying to send LINK"))?;
1053
1054        // Link all the circuits that haven't started the conflux handshake yet.
1055        for circ in self
1056            .legs
1057            .iter_mut()
1058            // TODO: it is an internal error if any of the legs don't have a conflux handler
1059            // (i.e. if conflux_status() returns None)
1060            .filter(|circ| circ.conflux_status() == Some(ConfluxStatus::Unlinked))
1061        {
1062            let v1_payload = V1LinkPayload::new(self.nonce, self.desired_ux);
1063            let link = ConfluxLink::new(v1_payload);
1064            let cell = AnyRelayMsgOuter::new(None, link.into());
1065
1066            circ.begin_conflux_link(join_point, cell, runtime).await?;
1067        }
1068
1069        // TODO(conflux): the caller should take care to not allow opening streams
1070        // until the conflux set is ready (i.e. until at least one of the legs completes
1071        // the handshake).
1072        //
1073        // We will probably need a channel for notifying the caller
1074        // of handshake completion/conflux set readiness
1075
1076        Ok(())
1077    }
1078
1079    /// Get the number of unlinked or non-conflux legs.
1080    #[cfg(feature = "conflux")]
1081    pub(super) fn num_unlinked(&self) -> usize {
1082        self.circuits()
1083            .filter(|circ| {
1084                let status = circ.conflux_status();
1085                status.is_none() || status == Some(ConfluxStatus::Unlinked)
1086            })
1087            .count()
1088    }
1089
1090    /// Check if the specified sequence number is the sequence number of the
1091    /// next message we're expecting to handle.
1092    pub(super) fn is_seqno_in_order(&self, seq_recv: u64) -> bool {
1093        let last_seq_delivered = self.last_seq_delivered.load(atomic::Ordering::Acquire);
1094        seq_recv == last_seq_delivered + 1
1095    }
1096
1097    /// Remove the circuit leg with the specified `UniqId` from this conflux set.
1098    ///
1099    /// Unlike [`ConfluxSet::remove`], this function does not check
1100    /// if the removal of the leg ought to trigger a reactor shutdown.
1101    ///
1102    /// Returns an error if the leg doesn't exit in the conflux set.
1103    fn remove_unchecked(&mut self, circ_id: UniqId) -> Result<Circuit, Bug> {
1104        let idx = self
1105            .legs
1106            .iter()
1107            .position(|circ| circ.unique_id() == circ_id)
1108            .ok_or_else(|| internal!("leg {circ_id:?} not found in conflux set"))?;
1109
1110        Ok(self.legs.remove(idx))
1111    }
1112}
1113
/// An error returned when a method is expecting a single-leg conflux circuit,
/// but it is not single-leg.
///
/// This is a thin wrapper around the underlying [`Bug`],
/// which is also reported as the source of this error.
#[derive(Clone, Debug, derive_more::Display, thiserror::Error)]
pub(super) struct NotSingleLegError(#[source] Bug);
1118
1119impl From<NotSingleLegError> for Bug {
1120    fn from(e: NotSingleLegError) -> Self {
1121        e.0
1122    }
1123}
1124
1125impl From<NotSingleLegError> for crate::Error {
1126    fn from(e: NotSingleLegError) -> Self {
1127        Self::from(e.0)
1128    }
1129}
1130
1131impl From<NotSingleLegError> for ReactorError {
1132    fn from(e: NotSingleLegError) -> Self {
1133        Self::from(e.0)
1134    }
1135}
1136
1137impl<I: Iterator> From<ExactlyOneError<I>> for NotSingleLegError {
1138    fn from(e: ExactlyOneError<I>) -> Self {
1139        // TODO: cannot wrap the ExactlyOneError with into_bad_api_usage
1140        // because it's not Send + Sync
1141        Self(bad_api_usage!("not a single leg conflux set ({e})"))
1142    }
1143}
1144
1145/// Whether the specified `cmd` counts towards the conflux sequence numbers.
1146fn cmd_counts_towards_seqno(cmd: RelayCmd) -> bool {
1147    // Note: copy-pasted from c-tor
1148    match cmd {
1149        // These are all fine to multiplex, and must be so that ordering is preserved
1150        RelayCmd::BEGIN | RelayCmd::DATA | RelayCmd::END | RelayCmd::CONNECTED => true,
1151
1152        // We can't multiplex these because they are circuit-specific
1153        RelayCmd::SENDME
1154        | RelayCmd::EXTEND
1155        | RelayCmd::EXTENDED
1156        | RelayCmd::TRUNCATE
1157        | RelayCmd::TRUNCATED
1158        | RelayCmd::DROP => false,
1159
1160        //  We must multiplex RESOLVEs because their ordering impacts begin/end.
1161        RelayCmd::RESOLVE | RelayCmd::RESOLVED => true,
1162
1163        // These are all circuit-specific
1164        RelayCmd::BEGIN_DIR
1165        | RelayCmd::EXTEND2
1166        | RelayCmd::EXTENDED2
1167        | RelayCmd::ESTABLISH_INTRO
1168        | RelayCmd::ESTABLISH_RENDEZVOUS
1169        | RelayCmd::INTRODUCE1
1170        | RelayCmd::INTRODUCE2
1171        | RelayCmd::RENDEZVOUS1
1172        | RelayCmd::RENDEZVOUS2
1173        | RelayCmd::INTRO_ESTABLISHED
1174        | RelayCmd::RENDEZVOUS_ESTABLISHED
1175        | RelayCmd::INTRODUCE_ACK
1176        | RelayCmd::PADDING_NEGOTIATE
1177        | RelayCmd::PADDING_NEGOTIATED => false,
1178
1179        // Flow control cells must be ordered (see prop 329).
1180        RelayCmd::XOFF | RelayCmd::XON => true,
1181
1182        // These two are not multiplexed, because they must be processed immediately
1183        // to update sequence numbers before any other cells are processed on the circuit
1184        RelayCmd::CONFLUX_SWITCH
1185        | RelayCmd::CONFLUX_LINK
1186        | RelayCmd::CONFLUX_LINKED
1187        | RelayCmd::CONFLUX_LINKED_ACK => false,
1188
1189        _ => {
1190            tracing::warn!("Conflux asked to multiplex unknown relay command {cmd}");
1191            false
1192        }
1193    }
1194}
1195
#[cfg(test)]
mod test {
    // This module is intentionally empty:
    // the functionality from this file is
    // tested in [`crate::tunnel::circuit::test`].
}