tor_proto/tunnel/reactor/
conflux.rs

1//! Conflux-related functionality
2
3#[cfg(feature = "conflux")]
4mod msghandler;
5
6use std::future::Future;
7use std::pin::Pin;
8use std::sync::atomic::{self, AtomicU64};
9use std::sync::{Arc, Mutex};
10
11use futures::StreamExt;
12use futures::{select_biased, stream::FuturesUnordered, FutureExt as _};
13use smallvec::{smallvec, SmallVec};
14use tor_rtcompat::SleepProviderExt as _;
15use tracing::{info, trace, warn};
16
17use tor_async_utils::SinkPrepareExt as _;
18use tor_basic_utils::flatten;
19use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCmd};
20use tor_error::{bad_api_usage, internal, into_bad_api_usage, Bug};
21use tor_linkspec::HasRelayIds as _;
22
23use crate::circuit::path::HopDetail;
24use crate::circuit::{TunnelMutableState, UniqId};
25use crate::crypto::cell::HopNum;
26use crate::tunnel::reactor::circuit::ConfluxStatus;
27use crate::tunnel::{streammap, TunnelId};
28use crate::util::err::ReactorError;
29
30use super::circuit::CircHop;
31use super::{Circuit, CircuitAction, RemoveLegReason, SendRelayCell};
32
33#[cfg(feature = "conflux")]
34use {
35    tor_cell::relaycell::conflux::{V1DesiredUx, V1LinkPayload, V1Nonce},
36    tor_cell::relaycell::msg::{ConfluxLink, ConfluxSwitch},
37};
38
39#[cfg(feature = "conflux")]
40pub(crate) use msghandler::{ConfluxAction, ConfluxMsgHandler, OooRelayMsg};
41
/// The maximum number of conflux legs to store in the conflux set SmallVec.
///
/// This is the inline capacity of the `legs` field of [`ConfluxSet`].
/// Attempting to store more legs will cause the SmallVec to spill to the heap.
///
/// Note: this value was picked arbitrarily and may not be suitable.
const MAX_CONFLUX_LEGS: usize = 16;
48
/// A set with one or more circuits.
///
/// ### Conflux set life cycle
///
/// Conflux sets are created by the reactor using [`ConfluxSet::new`].
///
/// Every `ConfluxSet` starts out as a single-path set consisting of a single 0-length circuit.
///
/// After constructing a `ConfluxSet`, the reactor will proceed to extend its (only) circuit.
/// At this point, the `ConfluxSet` will be a single-path set with a single n-length circuit.
///
/// The reactor can then turn the `ConfluxSet` into a multi-path set
/// (a multi-path set is a conflux set that contains more than 1 circuit).
/// This is done using [`ConfluxSet::add_legs`], in response to a `CtrlMsg` sent
/// by the reactor user (also referred to as the "conflux handshake initiator").
/// After that, the conflux set is said to be a multi-path set with multiple N-length circuits.
///
/// Circuits can be removed from the set using [`ConfluxSet::remove`].
///
/// The lifetime of a `ConfluxSet` is tied to the lifetime of the reactor.
/// When the reactor is dropped, its underlying `ConfluxSet` is dropped too.
/// This can happen on an explicit shutdown request, or if a fatal error occurs.
///
/// Conversely, the `ConfluxSet` can also trigger a reactor shutdown.
/// For example, if after being instructed to remove a circuit from the set
/// using [`ConfluxSet::remove`], the set is completely depleted,
/// the `ConfluxSet` will return a [`ReactorError::Shutdown`] error,
/// which will cause the reactor to shut down.
pub(super) struct ConfluxSet {
    /// The unique identifier of the tunnel this conflux set belongs to.
    ///
    /// Used for setting the internal [`TunnelId`] of [`Circuit`]s
    /// that gets used for logging purposes.
    tunnel_id: TunnelId,
    /// The circuits in this conflux set.
    ///
    /// Up to [`MAX_CONFLUX_LEGS`] circuits are stored inline.
    legs: SmallVec<[Circuit; MAX_CONFLUX_LEGS]>,
    /// Tunnel state, shared with `ClientCirc`.
    ///
    /// Contains the [`MutableState`](super::MutableState) of each circuit in the set.
    mutable: Arc<TunnelMutableState>,
    /// The unique identifier of the primary leg.
    ///
    /// This is the leg [`ConfluxSet::remove`] refers to as the sending leg.
    primary_id: UniqId,
    /// The join point of the set, if this is a multi-path set.
    ///
    /// Initially the conflux set starts out as a single-path set with no join point.
    /// When it is converted to a multipath set using [`add_legs`](Self::add_legs),
    /// the join point is initialized to the last hop in the tunnel.
    //
    // TODO(#2017): for simplicity, we currently force all legs to have the same length,
    // to ensure the HopNum of the join point is the same for all of them.
    //
    // In the future we might want to relax this restriction.
    join_point: Option<JoinPoint>,
    /// The nonce associated with the circuits from this set.
    #[cfg(feature = "conflux")]
    nonce: V1Nonce,
    /// The desired UX.
    ///
    /// Determines how the primary leg is chosen
    /// (see `select_primary_leg`).
    #[cfg(feature = "conflux")]
    desired_ux: V1DesiredUx,
    /// The absolute sequence number of the last cell delivered to a stream.
    ///
    /// A clone of this is shared with each [`ConfluxMsgHandler`] created.
    ///
    /// When a message is received on a circuit leg, the `ConfluxMsgHandler`
    /// of the leg compares the (leg-local) sequence number of the message
    /// with this sequence number to determine whether the message is in-order.
    ///
    /// If the message is in-order, the `ConfluxMsgHandler` instructs the circuit
    /// to deliver it to its corresponding stream.
    ///
    /// If the message is out-of-order, the `ConfluxMsgHandler` instructs the circuit
    /// to instruct the reactor to buffer the message.
    last_seq_delivered: Arc<AtomicU64>,
    /// Whether we have selected our initial primary leg,
    /// if this is a multipath conflux set.
    selected_init_primary: bool,
}
126
/// The conflux join point.
///
/// Describes the hop shared by all the legs of a multi-path conflux set.
#[derive(Clone, derive_more::Debug)]
struct JoinPoint {
    /// The hop number.
    hop: HopNum,
    /// The [`HopDetail`] of the hop.
    detail: HopDetail,
    /// The stream map of the join point, shared with each circuit leg.
    ///
    /// Omitted from the `Debug` output.
    #[debug(skip)]
    streams: Arc<Mutex<streammap::StreamMap>>,
}
138
139impl ConfluxSet {
140    /// Create a new conflux set, consisting of a single leg.
141    ///
142    /// Returns the newly created set and a reference to its [`TunnelMutableState`].
143    pub(super) fn new(
144        tunnel_id: TunnelId,
145        circuit_leg: Circuit,
146    ) -> (Self, Arc<TunnelMutableState>) {
147        let primary_id = circuit_leg.unique_id();
148        let circ_mutable = Arc::clone(circuit_leg.mutable());
149        let legs = smallvec![circuit_leg];
150        // Note: the join point is only set for multi-path tunnels
151        let join_point = None;
152
153        // TODO(#2035): read this from the consensus/config.
154        #[cfg(feature = "conflux")]
155        let desired_ux = V1DesiredUx::NO_OPINION;
156
157        let mutable = Arc::new(TunnelMutableState::default());
158        mutable.insert(primary_id, circ_mutable);
159
160        let set = Self {
161            tunnel_id,
162            legs,
163            primary_id,
164            join_point,
165            mutable: mutable.clone(),
166            #[cfg(feature = "conflux")]
167            nonce: V1Nonce::new(&mut rand::rng()),
168            #[cfg(feature = "conflux")]
169            desired_ux,
170            last_seq_delivered: Arc::new(AtomicU64::new(0)),
171            selected_init_primary: false,
172        };
173
174        (set, mutable)
175    }
176
177    /// Remove and return the only leg of this conflux set.
178    ///
179    /// Returns an error if there is more than one leg in the set,
180    /// or if called before any circuit legs are available.
181    ///
182    /// Calling this function will empty the [`ConfluxSet`].
183    pub(super) fn take_single_leg(&mut self) -> Result<Circuit, NotSingleLegError> {
184        let primary_index =
185            element_idx(self.legs.iter(), self.primary_id).ok_or(NotSingleError::None)?;
186        Ok(self.legs.remove(primary_index))
187    }
188
189    /// Return a reference to the only leg of this conflux set,
190    /// along with the leg's ID.
191    ///
192    /// Returns an error if there is more than one leg in the set,
193    /// or if called before any circuit legs are available.
194    pub(super) fn single_leg(&self) -> Result<&Circuit, NotSingleLegError> {
195        Ok(get_single(self.legs.iter())?)
196    }
197
198    /// Return a mutable reference to the only leg of this conflux set,
199    /// along with the leg's ID.
200    ///
201    /// Returns an error if there is more than one leg in the set,
202    /// or if called before any circuit legs are available.
203    pub(super) fn single_leg_mut(&mut self) -> Result<&mut Circuit, NotSingleLegError> {
204        Ok(get_single(self.legs.iter_mut())?)
205    }
206
207    /// Return the primary leg of this conflux set.
208    ///
209    /// Returns an error if called before any circuit legs are available.
210    pub(super) fn primary_leg_mut(&mut self) -> Result<&mut Circuit, Bug> {
211        #[cfg(not(feature = "conflux"))]
212        if self.legs.len() > 1 {
213            return Err(internal!(
214                "got multipath tunnel, but conflux feature is disabled?!"
215            ));
216        }
217
218        if self.legs.is_empty() {
219            Err(bad_api_usage!(
220                "tried to get circuit leg before creating it?!"
221            ))
222        } else {
223            let circ = self
224                .leg_mut(self.primary_id)
225                .ok_or_else(|| internal!("conflux set is empty?!"))?;
226
227            Ok(circ)
228        }
229    }
230
231    /// Return a reference to the leg of this conflux set with the given id.
232    pub(super) fn leg(&self, leg_id: UniqId) -> Option<&Circuit> {
233        self.legs.iter().find(|circ| circ.unique_id() == leg_id)
234    }
235
236    /// Return a mutable reference to the leg of this conflux set with the given id.
237    pub(super) fn leg_mut(&mut self, leg_id: UniqId) -> Option<&mut Circuit> {
238        self.legs.iter_mut().find(|circ| circ.unique_id() == leg_id)
239    }
240
241    /// Return the number of legs in this conflux set.
242    pub(super) fn len(&self) -> usize {
243        self.legs.len()
244    }
245
246    /// Return whether this conflux set is empty.
247    pub(super) fn is_empty(&self) -> bool {
248        self.legs.len() == 0
249    }
250
251    /// Remove the specified leg from this conflux set.
252    ///
253    /// Returns an error if the given leg doesn't exist in the set.
254    ///
255    /// Returns an error instructing the reactor to perform a clean shutdown
256    /// ([`ReactorError::Shutdown`]), tearing down the entire [`ConfluxSet`], if
257    ///
258    ///   * the set is depleted (empty) after removing the specified leg
259    ///   * `leg` is currently the sending (primary) leg of this set
260    ///   * the closed leg had the highest non-zero last_seq_recv/sent
261    ///   * the closed leg had some in-progress data (inflight > cc_sendme_inc)
262    ///
263    /// We do not yet support resumption. See [2.4.3. Closing circuits] in prop329.
264    ///
265    /// [2.4.3. Closing circuits]: https://spec.torproject.org/proposals/329-traffic-splitting.html#243-closing-circuits
266    pub(super) fn remove(&mut self, leg: UniqId) -> Result<Circuit, ReactorError> {
267        let idx = element_idx(self.legs.iter(), leg)
268            .ok_or_else(|| bad_api_usage!("leg {leg:?} not found in conflux set"))?;
269        let circ: Circuit = self.legs.remove(idx);
270
271        tracing::trace!(
272            circ_id = %circ.unique_id(),
273            "Circuit removed from conflux set"
274        );
275
276        self.mutable.remove(circ.unique_id());
277
278        if self.legs.is_empty() {
279            // TODO: log the tunnel ID
280            tracing::debug!("Conflux set is now empty, tunnel reactor shutting down");
281
282            // The last circuit in the set has just died, so the reactor should exit.
283            return Err(ReactorError::Shutdown);
284        }
285
286        if leg == self.primary_id {
287            // We have just removed our sending leg,
288            // so it's time to close the entire conflux set.
289            return Err(ReactorError::Shutdown);
290        }
291
292        cfg_if::cfg_if! {
293            if #[cfg(feature = "conflux")] {
294                self.remove_conflux(circ)
295            } else {
296                // Conflux is disabled, so we can't possibly continue running if the only
297                // leg in the tunnel is gone.
298                //
299                // Technically this should be unreachable (because of the is_empty()
300                // check above)
301                Err(internal!("Multiple legs in single-path tunnel?!").into())
302            }
303        }
304    }
305
306    /// Handle the removal of a circuit,
307    /// returning an error if the reactor needs to shut down.
308    #[cfg(feature = "conflux")]
309    fn remove_conflux(&self, circ: Circuit) -> Result<Circuit, ReactorError> {
310        let Some(status) = circ.conflux_status() else {
311            return Err(internal!("Found non-conflux circuit in conflux set?!").into());
312        };
313
314        // TODO(conflux): should the circmgr be notified about the leg removal?
315        //
316        // "For circuits that are unlinked, the origin SHOULD immediately relaunch a new leg when it
317        // is closed, subject to the limits in [SIDE_CHANNELS]."
318
319        // If we've reached this point and the conflux set is non-empty,
320        // it means it's a multi-path set.
321        //
322        // Time to check if we need to tear down the entire set.
323        match status {
324            ConfluxStatus::Unlinked => {
325                // This circuit hasn't yet begun the conflux handshake,
326                // so we can safely remove it from the set
327                Ok(circ)
328            }
329            ConfluxStatus::Pending | ConfluxStatus::Linked => {
330                let (circ_last_seq_recv, circ_last_seq_sent) =
331                    (|| Ok::<_, ReactorError>((circ.last_seq_recv()?, circ.last_seq_sent()?)))()?;
332
333                // If the closed leg had the highest non-zero last_seq_recv/sent, close the set
334                if let Some(max_last_seq_recv) = self.max_last_seq_recv() {
335                    if circ_last_seq_recv > max_last_seq_recv {
336                        return Err(ReactorError::Shutdown);
337                    }
338                }
339
340                if let Some(max_last_seq_sent) = self.max_last_seq_sent() {
341                    if circ_last_seq_sent > max_last_seq_sent {
342                        return Err(ReactorError::Shutdown);
343                    }
344                }
345
346                let hop = self.join_point_hop(&circ)?;
347
348                let (inflight, cwnd) = (|| {
349                    let ccontrol = hop.ccontrol();
350                    let inflight = ccontrol.inflight()?;
351                    let cwnd = ccontrol.cwnd()?;
352
353                    Some((inflight, cwnd))
354                })()
355                .ok_or_else(|| {
356                    internal!("Congestion control algorithm doesn't track inflight cells or cwnd?!")
357                })?;
358
359                // If data is in progress on the leg (inflight > cc_sendme_inc),
360                // then all legs must be closed
361                if inflight >= cwnd.params().sendme_inc() {
362                    return Err(ReactorError::Shutdown);
363                }
364
365                Ok(circ)
366            }
367        }
368    }
369
370    /// Return the maximum relative last_seq_recv across all circuits.
371    #[cfg(feature = "conflux")]
372    fn max_last_seq_recv(&self) -> Option<u64> {
373        self.legs
374            .iter()
375            .filter_map(|leg| leg.last_seq_recv().ok())
376            .max()
377    }
378
379    /// Return the maximum relative last_seq_sent across all circuits.
380    #[cfg(feature = "conflux")]
381    fn max_last_seq_sent(&self) -> Option<u64> {
382        self.legs
383            .iter()
384            .filter_map(|leg| leg.last_seq_sent().ok())
385            .max()
386    }
387
388    /// Get the [`CircHop`] of the join point on the specified `circ`,
389    /// returning an error if this is a single path conflux set.
390    fn join_point_hop<'c>(&self, circ: &'c Circuit) -> Result<&'c CircHop, Bug> {
391        let Some(join_point) = self.join_point.as_ref().map(|p| p.hop) else {
392            return Err(internal!("No join point on conflux tunnel?!"));
393        };
394
395        circ.hop(join_point)
396            .ok_or_else(|| internal!("Conflux join point disappeared?!"))
397    }
398
399    /// Return an iterator of all circuits in the conflux set.
400    fn circuits(&self) -> impl Iterator<Item = &Circuit> {
401        self.legs.iter()
402    }
403
404    /// Add legs to the this conflux set.
405    ///
406    /// Returns an error if any of the legs is invalid.
407    ///
408    /// A leg is considered valid if
409    ///
410    ///   * the circuit has the same length as all the other circuits in the set
411    ///   * its last hop is equal to the designated join point
412    ///   * the circuit has no streams attached to any of its hops
413    ///   * the circuit is not already part of a conflux set
414    ///
415    /// Note: the circuits will not begin linking until
416    /// [`link_circuits`](Self::link_circuits) is called.
417    ///
418    /// IMPORTANT: this function does not prevent the construction of conflux sets
419    /// where the circuit legs share guard or middle relays. It is the responsibility
420    /// of the caller to enforce the following invariant from prop354:
421    ///
422    /// "If building a conflux leg: Reject any circuits that have the same Guard as the other conflux
423    /// "leg(s) in the current conflux set, EXCEPT when one of the primary Guards is also the chosen
424    /// "Exit of this conflux set (in which case, re-use the non-Exit Guard)."
425    ///
426    /// This is because at this level we don't actually know which relays are the guards,
427    /// so we can't know if the join point happens to be one of the Guard + Exit relays.
428    #[cfg(feature = "conflux")]
429    pub(super) fn add_legs(
430        &mut self,
431        legs: Vec<Circuit>,
432        runtime: &tor_rtcompat::DynTimeProvider,
433    ) -> Result<(), Bug> {
434        if legs.is_empty() {
435            return Err(bad_api_usage!("asked to add empty leg list to conflux set"));
436        }
437
438        let join_point = match self.join_point.take() {
439            Some(p) => {
440                // Preserve the existing join point, if there is one.
441                p
442            }
443            None => {
444                let (hop, detail, streams) = (|| {
445                    let first_leg = self.circuits().next()?;
446                    let first_leg_path = first_leg.path();
447                    let all_hops = first_leg_path.all_hops();
448                    let hop_num = first_leg.last_hop_num()?;
449                    let detail = all_hops.last()?;
450                    let hop = first_leg.hop(hop_num)?;
451                    let streams = Arc::clone(hop.stream_map());
452                    Some((hop_num, detail.clone(), streams))
453                })()
454                .ok_or_else(|| bad_api_usage!("asked to join circuit with no hops"))?;
455
456                JoinPoint {
457                    hop,
458                    detail,
459                    streams,
460                }
461            }
462        };
463
464        // Check two HopDetails for equality.
465        //
466        // Returns an error if one of the hops is virtual.
467        let hops_eq = |h1: &HopDetail, h2: &HopDetail| {
468            match (h1, h2) {
469                (HopDetail::Relay(t1), HopDetail::Relay(ref t2)) => Ok(t1.same_relay_ids(t2)),
470                #[cfg(feature = "hs-common")]
471                (HopDetail::Virtual, HopDetail::Virtual) => {
472                    // TODO(#2016): support onion service conflux
473                    Err(internal!("onion service conflux not supported"))
474                }
475                _ => Ok(false),
476            }
477        };
478
479        // A leg is considered valid if
480        //
481        //   * the circuit has the expected length
482        //     (the length of the first circuit we added to the set)
483        //   * its last hop is equal to the designated join point
484        //     (the last hop of the first circuit we added)
485        //   * the circuit has no streams attached to any of its hops
486        //   * the circuit is not already part of a conflux tunnel
487        //
488        // Returns an error if any hops are virtual.
489        let leg_is_valid = |leg: &Circuit| -> Result<bool, Bug> {
490            use crate::ccparams::Algorithm;
491
492            let path = leg.path();
493            let Some(last_hop) = path.all_hops().last() else {
494                // A circuit with no hops is invalid
495                return Ok(false);
496            };
497
498            // TODO: this sort of duplicates the check above.
499            // The difference is that above we read the hop detail
500            // information from the circuit Path, whereas here we get
501            // the actual last CircHop of the circuit.
502            let Some(last_hop_num) = leg.last_hop_num() else {
503                // A circuit with no hops is invalid
504                return Ok(false);
505            };
506
507            let circhop = leg
508                .hop(last_hop_num)
509                .ok_or_else(|| internal!("hop disappeared?!"))?;
510
511            // Ensure we negotiated a suitable cc algorithm
512            let is_cc_suitable = match circhop.ccontrol().algorithm() {
513                Algorithm::FixedWindow(_) => false,
514                Algorithm::Vegas(_) => true,
515            };
516
517            if !is_cc_suitable {
518                return Ok(false);
519            }
520
521            Ok(last_hop_num == join_point.hop
522                && hops_eq(last_hop, &join_point.detail)?
523                && !leg.has_streams()
524                && leg.conflux_status().is_none())
525        };
526
527        for leg in &legs {
528            if !leg_is_valid(leg)? {
529                return Err(bad_api_usage!("one more more conflux circuits are invalid"));
530            }
531        }
532
533        // Select a join point, or put the existing one back into self.
534        self.join_point = Some(join_point.clone());
535
536        // The legs are valid, so add them to the set.
537        for circ in legs {
538            let mutable = Arc::clone(circ.mutable());
539            let unique_id = circ.unique_id();
540            self.legs.push(circ);
541            // Merge the mutable state of the circuit into our tunnel state.
542            self.mutable.insert(unique_id, mutable);
543        }
544
545        for circ in self.legs.iter_mut() {
546            // The circuits that have a None status don't know they're part of
547            // a multi-path tunnel yet. They need to be initialized with a
548            // conflux message handler, and have their join point fixed up
549            // to share a stream map with the join point on all the other circuits.
550            if circ.conflux_status().is_none() {
551                let conflux_handler = ConfluxMsgHandler::new_client(
552                    join_point.hop,
553                    self.nonce,
554                    Arc::clone(&self.last_seq_delivered),
555                    runtime.clone(),
556                );
557
558                circ.add_to_conflux_tunnel(self.tunnel_id, conflux_handler);
559
560                // Ensure the stream map of the last hop is shared by all the legs
561                let last_hop = circ
562                    .hop_mut(join_point.hop)
563                    .ok_or_else(|| bad_api_usage!("asked to join circuit with no hops"))?;
564                last_hop.set_stream_map(Arc::clone(&join_point.streams))?;
565            }
566        }
567
568        Ok(())
569    }
570
571    /// Try to update the primary leg based on the configured desired UX,
572    /// if needed.
573    ///
574    /// Returns the SWITCH cell to send on the primary leg,
575    /// if we switched primary leg.
576    #[cfg(feature = "conflux")]
577    pub(super) fn maybe_update_primary_leg(&mut self) -> crate::Result<Option<SendRelayCell>> {
578        use tor_error::into_internal;
579
580        let Some(join_point) = self.join_point.as_ref() else {
581            // Return early if this is not a multi-path tunnel
582            return Ok(None);
583        };
584
585        let join_point = join_point.hop;
586
587        if !self.should_update_primary_leg() {
588            // Nothing to do
589            return Ok(None);
590        }
591
592        let Some(new_primary_id) = self.select_primary_leg()? else {
593            // None of the legs satisfy our UX requirements, continue using the existing one.
594            return Ok(None);
595        };
596
597        // Check that the newly selected leg is actually different from the previous
598        if self.primary_id == new_primary_id {
599            // The primary leg stays the same, nothing to do.
600            return Ok(None);
601        }
602
603        let prev_last_seq_sent = self.primary_leg_mut()?.last_seq_sent()?;
604        self.primary_id = new_primary_id;
605        let new_last_seq_sent = self.primary_leg_mut()?.last_seq_sent()?;
606
607        // If this fails, it means we haven't updated our primary leg in a very long time.
608        //
609        // TODO(#2036): there are currently no safeguards to prevent us from staying
610        // on the same leg for "too long". Perhaps we should design should_update_primary_leg()
611        // such that it forces us to switch legs periodically, to prevent the seqno delta from
612        // getting too big?
613        let seqno_delta = u32::try_from(prev_last_seq_sent - new_last_seq_sent).map_err(
614            into_internal!("Seqno delta for switch does not fit in u32?!"),
615        )?;
616
617        // We need to carry the last_seq_sent over to the next leg
618        // (the next cell sent will have seqno = prev_last_seq_sent + 1)
619        self.primary_leg_mut()?
620            .set_last_seq_sent(prev_last_seq_sent)?;
621
622        let switch = ConfluxSwitch::new(seqno_delta);
623        let cell = AnyRelayMsgOuter::new(None, switch.into());
624        Ok(Some(SendRelayCell {
625            hop: join_point,
626            early: false,
627            cell,
628        }))
629    }
630
631    /// Whether it's time to select a new primary leg.
632    #[cfg(feature = "conflux")]
633    fn should_update_primary_leg(&mut self) -> bool {
634        if !self.selected_init_primary {
635            self.maybe_select_init_primary();
636            return false;
637        }
638
639        // If we don't have at least 2 legs,
640        // we can't switch our primary leg.
641        if self.legs.len() < 2 {
642            return false;
643        }
644
645        // TODO(conflux-tuning): if it turns out we switch legs too frequently,
646        // we might want to implement some sort of rate-limiting here
647        // (see c-tor's conflux_can_switch).
648
649        true
650    }
651
652    /// Return the best leg according to the configured desired UX.
653    ///
654    /// Returns `None` if no suitable leg was found.
655    #[cfg(feature = "conflux")]
656    fn select_primary_leg(&self) -> Result<Option<UniqId>, Bug> {
657        match self.desired_ux {
658            V1DesiredUx::NO_OPINION | V1DesiredUx::MIN_LATENCY => {
659                self.select_primary_leg_min_rtt(false)
660            }
661            V1DesiredUx::HIGH_THROUGHPUT => self.select_primary_leg_min_rtt(true),
662            V1DesiredUx::LOW_MEM_LATENCY | V1DesiredUx::LOW_MEM_THROUGHPUT => {
663                // TODO(conflux-tuning): add support for low-memory algorithms
664                self.select_primary_leg_min_rtt(false)
665            }
666            _ => {
667                // Default to MIN_RTT if we don't recognize the desired UX value
668                warn!(
669                    tunnel_id = %self.tunnel_id,
670                    "Ignoring unrecognized conflux desired UX {}, using MIN_LATENCY",
671                    self.desired_ux
672                );
673                self.select_primary_leg_min_rtt(false)
674            }
675        }
676    }
677
678    /// Try to choose an initial primary leg, if we have an initial RTT measurement
679    /// for at least one of the legs.
680    #[cfg(feature = "conflux")]
681    fn maybe_select_init_primary(&mut self) {
682        let best = self
683            .legs
684            .iter()
685            .filter_map(|leg| leg.init_rtt().map(|rtt| (leg, rtt)))
686            .min_by_key(|(_leg, rtt)| *rtt)
687            .map(|(leg, _rtt)| leg.unique_id());
688
689        if let Some(best) = best {
690            self.primary_id = best;
691            self.selected_init_primary = true;
692        }
693    }
694
695    /// Return the leg with the best (lowest) RTT.
696    ///
697    /// If `check_can_send` is true, selects the lowest RTT leg that is ready to send.
698    ///
699    /// Returns `None` if no suitable leg was found.
700    #[cfg(feature = "conflux")]
701    fn select_primary_leg_min_rtt(&self, check_can_send: bool) -> Result<Option<UniqId>, Bug> {
702        let mut best = None;
703
704        for circ in self.legs.iter() {
705            let leg_id = circ.unique_id();
706            let join_point = self.join_point_hop(circ)?;
707            let ccontrol = join_point.ccontrol();
708
709            if check_can_send && !ccontrol.can_send() {
710                continue;
711            }
712
713            let rtt = ccontrol.rtt();
714            let init_rtt_usec = || {
715                circ.init_rtt()
716                    .map(|rtt| u32::try_from(rtt.as_micros()).unwrap_or(u32::MAX))
717            };
718
719            let Some(ewma_rtt) = rtt.ewma_rtt_usec().or_else(init_rtt_usec) else {
720                return Err(internal!(
721                    "attempted to select primary leg before handshake completed?!"
722                ));
723            };
724
725            match best.take() {
726                None => {
727                    best = Some((leg_id, ewma_rtt));
728                }
729                Some(best_so_far) => {
730                    if best_so_far.1 <= ewma_rtt {
731                        best = Some(best_so_far);
732                    } else {
733                        best = Some((leg_id, ewma_rtt));
734                    }
735                }
736            }
737        }
738
739        Ok(best.map(|(leg_id, _)| leg_id))
740    }
741
742    /// Returns `true` if our conflux join point is blocked on congestion control
743    /// on the specified `circuit`.
744    ///
745    /// Returns `false` if the join point is not blocked on cc,
746    /// or if this is a single-path set.
747    ///
748    /// Returns an error if this is a multipath tunnel,
749    /// but the joint point hop doesn't exist on the specified circuit.
750    #[cfg(feature = "conflux")]
751    fn is_join_point_blocked_on_cc(join_hop: HopNum, circuit: &Circuit) -> Result<bool, Bug> {
752        let join_circhop = circuit.hop(join_hop).ok_or_else(|| {
753            internal!(
754                "Join point hop {} not found on circuit {}?!",
755                join_hop.display(),
756                circuit.unique_id(),
757            )
758        })?;
759
760        Ok(!join_circhop.ccontrol().can_send())
761    }
762
763    /// Returns the `HopNum` of the join point
764    /// if [`next_circ_action`](Self::next_circ_action)
765    /// should avoid polling the join point streams entirely.
766    #[cfg(feature = "conflux")]
767    fn should_skip_join_point(&self) -> Result<Option<HopNum>, Bug> {
768        let Some(primary_join_point) = self.primary_join_point() else {
769            // Single-path, there is no join point
770            return Ok(None);
771        };
772
773        let join_hop = primary_join_point.1;
774        let primary_blocked_on_cc = {
775            let primary = self
776                .leg(self.primary_id)
777                .ok_or_else(|| internal!("primary leg disappeared?!"))?;
778            Self::is_join_point_blocked_on_cc(join_hop, primary)?
779        };
780
781        if !primary_blocked_on_cc {
782            // Easy, we can just carry on
783            return Ok(None);
784        }
785
786        // Now, if the primary *is* blocked on cc, we may still be able to poll
787        // the join point streams (if we're using the right desired UX)
788        let exclude = if self.desired_ux != V1DesiredUx::HIGH_THROUGHPUT {
789            // The primary leg is blocked on cc, and we can't switch because we're
790            // not using the high throughput algorithm, so we must stop reading
791            // the join point streams.
792            //
793            // Note: if the selected algorithm is HIGH_THROUGHPUT,
794            // it's okay to continue reading from the edge connection,
795            // because maybe_update_primary_leg() will select a new,
796            // non-blocked primary leg, just before sending.
797            trace!(
798                tunnel_id = %self.tunnel_id,
799                join_point = ?primary_join_point,
800                reason = "sending leg blocked on congestion control",
801                "Pausing join point stream reads"
802            );
803
804            Some(join_hop)
805        } else {
806            // Ah-ha, the desired UX is HIGH_THROUGHPUT, which means we can switch
807            // to an unblocked leg before sending any cells over the join point,
808            // as long as there are some unblocked legs.
809
810            // TODO: figure out how to rewrite this with an idiomatic iterator combinator
811            let mut all_blocked_on_cc = true;
812            for leg in &self.legs {
813                all_blocked_on_cc = Self::is_join_point_blocked_on_cc(join_hop, leg)?;
814                if !all_blocked_on_cc {
815                    break;
816                }
817            }
818
819            if all_blocked_on_cc {
820                // All legs are blocked on cc, so we must stop reading from
821                // the join point streams for now.
822                trace!(
823                    tunnel_id = %self.tunnel_id,
824                    join_point = ?primary_join_point,
825                    reason = "all legs blocked on congestion control",
826                    "Pausing join point stream reads"
827                );
828
829                Some(join_hop)
830            } else {
831                // At least one leg is not blocked, so we can continue reading
832                // from the join point streams
833                None
834            }
835        };
836
837        Ok(exclude)
838    }
839
840    /// Returns the next ready [`CircuitAction`],
841    /// obtained from processing the incoming/outgoing messages on all the circuits in this set.
842    ///
843    /// Will return an error if there are no circuits in this set,
844    /// or other internal errors occur.
845    ///
846    /// This is cancellation-safe.
847    #[allow(clippy::unnecessary_wraps)] // Can return Err if conflux is enabled
848    pub(super) fn next_circ_action<'a>(
849        &'a mut self,
850        runtime: &'a tor_rtcompat::DynTimeProvider,
851    ) -> Result<impl Future<Output = Result<CircuitAction, crate::Error>> + 'a, Bug> {
852        // Avoid polling the streams on the join point if our primary
853        // leg is blocked on cc
854        cfg_if::cfg_if! {
855            if #[cfg(feature = "conflux")] {
856                let exclude_hop = self.should_skip_join_point()?;
857            } else {
858                let exclude_hop: Option<HopNum> = None;
859            }
860        };
861
862        Ok(self.legs
863            .iter_mut()
864            .map(|leg| {
865                let unique_id = leg.unique_id();
866                let tunnel_id = self.tunnel_id;
867
868                // The client SHOULD abandon and close circuit if the LINKED message takes too long to
869                // arrive. This timeout MUST be no larger than the normal SOCKS/stream timeout in use for
870                // RELAY_BEGIN, but MAY be the Circuit Build Timeout value, instead. (The C-Tor
871                // implementation currently uses Circuit Build Timeout).
872                let conflux_hs_timeout = if leg.conflux_status() == Some(ConfluxStatus::Pending) {
873                    if let Some(timeout) = leg.conflux_hs_timeout() {
874                        // TODO: ask Diziet if we can have a sleep_until_instant() function
875                        Box::pin(runtime.sleep_until_wallclock(timeout))
876                            as Pin<Box<dyn Future<Output = ()> + Send>>
877                    } else {
878                        Box::pin(std::future::pending())
879                    }
880                } else {
881                    Box::pin(std::future::pending())
882                };
883
884                let mut ready_streams = leg.ready_streams_iterator(exclude_hop);
885                let input = &mut leg.input;
886                // TODO: we don't really need prepare_send_from here
887                // because the inner select_biased! is cancel-safe.
888                // We should replace this with a simple sink readiness check
889                let send_fut = leg.chan_sender.prepare_send_from(async move {
890                    // A future to wait for the next ready stream.
891                    let next_ready_stream = async {
892                        match ready_streams.next().await {
893                            Some(x) => x,
894                            None => {
895                                info!(circ_id=%unique_id, "no ready streams (maybe blocked on cc?)");
896                                // There are no ready streams (for example, they may all be
897                                // blocked due to congestion control), so there is nothing
898                                // to do.
899                                // We await an infinitely pending future so that we don't
900                                // immediately return a `None` in the `select_biased!` below.
901                                // We'd rather wait on `input.next()` than immediately return with
902                                // no `CircuitAction`, which could put the reactor into a spin loop.
903                                let () = std::future::pending().await;
904                                unreachable!();
905                            }
906                        }
907                    };
908
909                    // NOTE: the stream returned by this function is polled in the select_biased!
910                    // from Reactor::run_once(), so each block from *this* select_biased! must be
911                    // cancellation-safe
912                    select_biased! {
913                        // Check whether we've got an input message pending.
914                        ret = input.next().fuse() => {
915                            let Some(cell) = ret else {
916                                return Ok(CircuitAction::RemoveLeg {
917                                    leg: unique_id,
918                                    reason: RemoveLegReason::ChannelClosed,
919                                });
920                            };
921
922                            Ok(CircuitAction::HandleCell { leg: unique_id, cell })
923                        },
924                        ret = next_ready_stream.fuse() => {
925                            let ret = ret.map(|cmd| {
926                                Ok(CircuitAction::RunCmd { leg: unique_id, cmd })
927                            });
928
929                            flatten(ret)
930                        },
931                    }
932                });
933
934                let mut send_fut = Box::pin(send_fut);
935
936                async move {
937                    select_biased! {
938                        () = conflux_hs_timeout.fuse() => {
939                            warn!(
940                                tunnel_id = %tunnel_id,
941                                circ_id = %unique_id,
942                                "Conflux handshake timed out on circuit"
943                            );
944
945                            // Conflux handshake has timed out, time to remove this circuit leg,
946                            // and notify the handshake initiator.
947                            Ok(Ok(CircuitAction::RemoveLeg {
948                                leg: unique_id,
949                                reason: RemoveLegReason::ConfluxHandshakeTimeout,
950                            }))
951                        }
952                        ret = send_fut => {
953                            // Note: We don't actually use the returned SinkSendable,
954                            // and continue writing to the SometimesUboundedSink in the reactor :(
955                            ret.map(|ret| ret.0)
956                        }
957                    }
958                }
959            })
960            .collect::<FuturesUnordered<_>>()
961            // We only return the first ready action as a Future.
962            // Can't use `next()` since it borrows the stream.
963            .into_future()
964            .map(|(next, _)| next.ok_or(internal!("empty conflux set").into()))
965            // Clean up the nested `Result`s before returning to the caller.
966            .map(|res| flatten(flatten(res))))
967    }
968
969    /// The join point on the current primary leg.
970    pub(super) fn primary_join_point(&self) -> Option<(UniqId, HopNum)> {
971        self.join_point
972            .as_ref()
973            .map(|join_point| (self.primary_id, join_point.hop))
974    }
975
976    /// Does congestion control use stream SENDMEs for the given hop?
977    ///
978    /// Returns `None` if either the `leg` or `hop` don't exist.
979    pub(super) fn uses_stream_sendme(&self, leg: UniqId, hop: HopNum) -> Option<bool> {
980        self.leg(leg)?.uses_stream_sendme(hop)
981    }
982
983    /// Encode `msg`, encrypt it, and send it to the 'hop'th hop.
984    ///
985    /// See [`Circuit::send_relay_cell`].
986    pub(super) async fn send_relay_cell_on_leg(
987        &mut self,
988        msg: SendRelayCell,
989        leg: Option<UniqId>,
990    ) -> crate::Result<()> {
991        let conflux_join_point = self.join_point.as_ref().map(|join_point| join_point.hop);
992        let leg = if let Some(join_point) = conflux_join_point {
993            // Conflux circuits always send multiplexed relay commands to
994            // to the last hop (the join point).
995            if cmd_counts_towards_seqno(msg.cell.cmd()) {
996                if msg.hop != join_point {
997                    // For leaky pipe, we must continue using the original leg
998                    leg
999                } else {
1000                    let old_primary_leg = self.primary_id;
1001                    // Check if it's time to switch our primary leg.
1002                    #[cfg(feature = "conflux")]
1003                    if let Some(switch_cell) = self.maybe_update_primary_leg()? {
1004                        trace!(
1005                            old = ?old_primary_leg,
1006                            new = ?self.primary_id,
1007                            "Switching primary conflux leg..."
1008                        );
1009
1010                        self.primary_leg_mut()?.send_relay_cell(switch_cell).await?;
1011                    }
1012
1013                    // Use the possibly updated primary leg
1014                    Some(self.primary_id)
1015                }
1016            } else {
1017                // Non-multiplexed commands go on their original
1018                // circuit and hop
1019                leg
1020            }
1021        } else {
1022            // If there is no join point, it means this is not
1023            // a multi-path tunnel, so we continue using
1024            // the leg_id/hop the cmd came from.
1025            leg
1026        };
1027
1028        let leg = leg.unwrap_or(self.primary_id);
1029
1030        let circ = self
1031            .leg_mut(leg)
1032            .ok_or_else(|| internal!("leg disappeared?!"))?;
1033
1034        circ.send_relay_cell(msg).await
1035    }
1036
1037    /// Send a LINK cell down each unlinked leg.
1038    #[cfg(feature = "conflux")]
1039    pub(super) async fn link_circuits(
1040        &mut self,
1041        runtime: &tor_rtcompat::DynTimeProvider,
1042    ) -> crate::Result<()> {
1043        let (_leg_id, join_point) = self
1044            .primary_join_point()
1045            .ok_or_else(|| internal!("no join point when trying to send LINK"))?;
1046
1047        // Link all the circuits that haven't started the conflux handshake yet.
1048        for circ in self
1049            .legs
1050            .iter_mut()
1051            // TODO: it is an internal error if any of the legs don't have a conflux handler
1052            // (i.e. if conflux_status() returns None)
1053            .filter(|circ| circ.conflux_status() == Some(ConfluxStatus::Unlinked))
1054        {
1055            let v1_payload = V1LinkPayload::new(self.nonce, self.desired_ux);
1056            let link = ConfluxLink::new(v1_payload);
1057            let cell = AnyRelayMsgOuter::new(None, link.into());
1058
1059            circ.begin_conflux_link(join_point, cell, runtime).await?;
1060        }
1061
1062        // TODO(conflux): the caller should take care to not allow opening streams
1063        // until the conflux set is ready (i.e. until at least one of the legs completes
1064        // the handshake).
1065        //
1066        // We will probably need a channel for notifying the caller
1067        // of handshake completion/conflux set readiness
1068
1069        Ok(())
1070    }
1071
1072    /// Get the number of unlinked or non-conflux legs.
1073    #[cfg(feature = "conflux")]
1074    pub(super) fn num_unlinked(&self) -> usize {
1075        self.circuits()
1076            .filter(|circ| {
1077                let status = circ.conflux_status();
1078                status.is_none() || status == Some(ConfluxStatus::Unlinked)
1079            })
1080            .count()
1081    }
1082
1083    /// Check if the specified sequence number is the sequence number of the
1084    /// next message we're expecting to handle.
1085    pub(super) fn is_seqno_in_order(&self, seq_recv: u64) -> bool {
1086        let last_seq_delivered = self.last_seq_delivered.load(atomic::Ordering::Acquire);
1087        seq_recv == last_seq_delivered + 1
1088    }
1089}
1090
1091/// Get the index of the specified element in `iterator`.
1092fn element_idx<'a>(
1093    mut iterator: impl Iterator<Item = &'a Circuit>,
1094    circ_id: UniqId,
1095) -> Option<usize> {
1096    iterator.position(|circ| circ.unique_id() == circ_id)
1097}
1098
1099// TODO: replace this with Itertools::exactly_one()?
1100//
1101/// Get the only item from an iterator.
1102///
1103/// Returns an error if the iterator is empty or has more than one item.
1104fn get_single<T>(mut iterator: impl Iterator<Item = T>) -> Result<T, NotSingleError> {
1105    let Some(rv) = iterator.next() else {
1106        return Err(NotSingleError::None);
1107    };
1108    if iterator.next().is_some() {
1109        return Err(NotSingleError::Multiple);
1110    }
1111    Ok(rv)
1112}
1113
1114/// An error returned from [`get_single`].
1115#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
1116pub(super) enum NotSingleError {
1117    /// The iterator had no items.
1118    #[error("the iterator had no items")]
1119    None,
1120    /// The iterator had more than one item.
1121    #[error("the iterator had more than one item")]
1122    Multiple,
1123}
1124
1125/// An error returned when a method is expecting a single-leg conflux circuit,
1126/// but it is not single-leg.
1127#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
1128pub(super) enum NotSingleLegError {
1129    /// Conflux set has no legs.
1130    #[error("the conflux set has no legs")]
1131    EmptyConfluxSet,
1132    /// Conflux set is multi-path.
1133    #[error("the conflux set is multi-path")]
1134    IsMultipath,
1135}
1136
1137impl From<NotSingleLegError> for Bug {
1138    fn from(e: NotSingleLegError) -> Self {
1139        into_bad_api_usage!("not a single leg conflux set")(e)
1140    }
1141}
1142
1143impl From<NotSingleLegError> for crate::Error {
1144    fn from(e: NotSingleLegError) -> Self {
1145        Self::from(Bug::from(e))
1146    }
1147}
1148
1149impl From<NotSingleLegError> for ReactorError {
1150    fn from(e: NotSingleLegError) -> Self {
1151        Self::from(Bug::from(e))
1152    }
1153}
1154
1155impl From<NotSingleError> for NotSingleLegError {
1156    fn from(e: NotSingleError) -> Self {
1157        match e {
1158            NotSingleError::None => Self::EmptyConfluxSet,
1159            NotSingleError::Multiple => Self::IsMultipath,
1160        }
1161    }
1162}
1163
1164/// Whether the specified `cmd` counts towards the conflux sequence numbers.
1165fn cmd_counts_towards_seqno(cmd: RelayCmd) -> bool {
1166    // Note: copy-pasted from c-tor
1167    match cmd {
1168        // These are all fine to multiplex, and must be so that ordering is preserved
1169        RelayCmd::BEGIN | RelayCmd::DATA | RelayCmd::END | RelayCmd::CONNECTED => true,
1170
1171        // We can't multiplex these because they are circuit-specific
1172        RelayCmd::SENDME
1173        | RelayCmd::EXTEND
1174        | RelayCmd::EXTENDED
1175        | RelayCmd::TRUNCATE
1176        | RelayCmd::TRUNCATED
1177        | RelayCmd::DROP => false,
1178
1179        //  We must multiplex RESOLVEs because their ordering impacts begin/end.
1180        RelayCmd::RESOLVE | RelayCmd::RESOLVED => true,
1181
1182        // These are all circuit-specific
1183        RelayCmd::BEGIN_DIR
1184        | RelayCmd::EXTEND2
1185        | RelayCmd::EXTENDED2
1186        | RelayCmd::ESTABLISH_INTRO
1187        | RelayCmd::ESTABLISH_RENDEZVOUS
1188        | RelayCmd::INTRODUCE1
1189        | RelayCmd::INTRODUCE2
1190        | RelayCmd::RENDEZVOUS1
1191        | RelayCmd::RENDEZVOUS2
1192        | RelayCmd::INTRO_ESTABLISHED
1193        | RelayCmd::RENDEZVOUS_ESTABLISHED
1194        | RelayCmd::INTRODUCE_ACK
1195        | RelayCmd::PADDING_NEGOTIATE
1196        | RelayCmd::PADDING_NEGOTIATED => false,
1197
1198        // Flow control cells must be ordered (see prop 329).
1199        RelayCmd::XOFF | RelayCmd::XON => true,
1200
1201        // These two are not multiplexed, because they must be processed immediately
1202        // to update sequence numbers before any other cells are processed on the circuit
1203        RelayCmd::CONFLUX_SWITCH
1204        | RelayCmd::CONFLUX_LINK
1205        | RelayCmd::CONFLUX_LINKED
1206        | RelayCmd::CONFLUX_LINKED_ACK => false,
1207
1208        _ => {
1209            tracing::warn!("Conflux asked to multiplex unknown relay command {cmd}");
1210            false
1211        }
1212    }
1213}
1214
#[cfg(test)]
mod test {
    // This module is intentionally empty:
    // the code in this file is tested in [`crate::tunnel::circuit::test`].
}