tor_proto/tunnel/reactor/
conflux.rs

1//! Conflux-related functionality
2
3#[cfg(feature = "conflux")]
4mod msghandler;
5
6use std::future::Future;
7use std::pin::Pin;
8use std::sync::atomic::{self, AtomicU64};
9use std::sync::Arc;
10
11use futures::StreamExt;
12use futures::{select_biased, stream::FuturesUnordered, FutureExt as _};
13use itertools::Itertools as _;
14use slotmap_careful::SlotMap;
15use tor_rtcompat::SleepProviderExt as _;
16use tracing::warn;
17
18use tor_async_utils::SinkPrepareExt as _;
19use tor_basic_utils::flatten;
20use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCmd};
21use tor_error::{bad_api_usage, internal, into_bad_api_usage, Bug};
22use tor_linkspec::HasRelayIds as _;
23
24use crate::circuit::path::HopDetail;
25use crate::circuit::TunnelMutableState;
26use crate::crypto::cell::HopNum;
27use crate::tunnel::reactor::circuit::ConfluxStatus;
28use crate::tunnel::reactor::CircuitCmd;
29use crate::util::err::ReactorError;
30
31use super::circuit::CircHop;
32use super::{Circuit, CircuitAction, LegId, LegIdKey, RemoveLegReason};
33
34#[cfg(feature = "conflux")]
35use {
36    super::SendRelayCell,
37    tor_cell::relaycell::conflux::{V1DesiredUx, V1LinkPayload, V1Nonce},
38    tor_cell::relaycell::msg::{ConfluxLink, ConfluxSwitch},
39};
40
41#[cfg(feature = "conflux")]
42pub(crate) use msghandler::{ConfluxAction, ConfluxMsgHandler, OooRelayMsg};
43
/// A set with one or more circuits.
///
/// ### Conflux set life cycle
///
/// Conflux sets are created by the reactor using [`ConfluxSet::new`].
///
/// Every `ConfluxSet` starts out as a single-path set consisting of a single 0-length circuit.
///
/// After constructing a `ConfluxSet`, the reactor will proceed to extend its (only) circuit.
/// At this point, the `ConfluxSet` will be a single-path set with a single n-length circuit.
///
/// The reactor can then turn the `ConfluxSet` into a multi-path set
/// (a multi-path set is a conflux set that contains more than 1 circuit).
/// This is done using [`ConfluxSet::add_legs`], in response to a `CtrlMsg` sent
/// by the reactor user (also referred to as the "conflux handshake initiator").
/// After that, the conflux set is said to be a multi-path set with multiple N-length circuits.
///
/// Circuits can be removed from the set using [`ConfluxSet::remove`].
///
/// The lifetime of a `ConfluxSet` is tied to the lifetime of the reactor.
/// When the reactor is dropped, its underlying `ConfluxSet` is dropped too.
/// This can happen on an explicit shutdown request, or if a fatal error occurs.
///
/// Conversely, the `ConfluxSet` can also trigger a reactor shutdown.
/// For example, if after being instructed to remove a circuit from the set
/// using [`ConfluxSet::remove`], the set is completely depleted,
/// the `ConfluxSet` will return a [`ReactorError::Shutdown`] error,
/// which will cause the reactor to shut down.
pub(super) struct ConfluxSet {
    /// The circuits in this conflux set, keyed by [`LegIdKey`].
    legs: SlotMap<LegIdKey, Circuit>,
    /// Tunnel state, shared with `ClientCirc`.
    ///
    /// Contains the [`MutableState`](super::MutableState) of each circuit in the set.
    mutable: Arc<TunnelMutableState>,
    /// The key (in `legs`) of the primary leg.
    ///
    /// [`ConfluxSet::remove`] treats the primary leg as the sending leg:
    /// removing it shuts down the entire set.
    primary_id: LegIdKey,
    /// The join point of the set, if this is a multi-path set.
    ///
    /// Initially the conflux set starts out as a single-path set with no join point.
    /// When it is converted to a multipath set using [`add_legs`](Self::add_legs),
    /// the join point is initialized to the last hop in the tunnel.
    //
    // TODO(conflux): for simplicity, we currently force all legs to have the same length,
    // to ensure the HopNum of the join point is the same for all of them.
    //
    // In the future we might want to relax this restriction.
    join_point: Option<JoinPoint>,
    /// The nonce associated with the circuits from this set.
    ///
    /// Generated once, when the set is created, and handed to the
    /// `ConfluxMsgHandler` of every leg added to the set.
    #[cfg(feature = "conflux")]
    nonce: V1Nonce,
    /// The desired UX.
    ///
    /// Determines which algorithm is used for selecting the primary leg.
    #[cfg(feature = "conflux")]
    desired_ux: V1DesiredUx,
    /// The absolute sequence number of the last cell delivered to a stream.
    ///
    /// A clone of this is shared with each [`ConfluxMsgHandler`] created.
    ///
    /// When a message is received on a circuit leg, the `ConfluxMsgHandler`
    /// of the leg compares the (leg-local) sequence number of the message
    /// with this sequence number to determine whether the message is in-order.
    ///
    /// If the message is in-order, the `ConfluxMsgHandler` instructs the circuit
    /// to deliver it to its corresponding stream.
    ///
    /// If the message is out-of-order, the `ConfluxMsgHandler` instructs the circuit
    /// to instruct the reactor to buffer the message.
    last_seq_delivered: Arc<AtomicU64>,
    /// Whether we have selected our initial primary leg,
    /// if this is a multipath conflux set.
    ///
    /// Starts out `false`, and is set once an initial primary leg
    /// has been chosen based on the initial RTT measurements of the legs.
    selected_init_primary: bool,
}
116
/// The conflux join point.
///
/// Only multi-path sets have a join point.
/// `ConfluxSet::add_legs` only accepts circuits whose last hop number is `hop`,
/// and whose last hop matches `detail`.
#[derive(Debug, Clone)]
struct JoinPoint {
    /// The hop number of the join point (the same on every leg of the set).
    hop: HopNum,
    /// The [`HopDetail`] of the hop.
    detail: HopDetail,
}
125
126impl ConfluxSet {
127    /// Create a new conflux set, consisting of a single leg.
128    ///
129    /// Returns the newly created set and a reference to its [`TunnelMutableState`].
130    pub(super) fn new(circuit_leg: Circuit) -> (Self, Arc<TunnelMutableState>) {
131        let mut legs: SlotMap<LegIdKey, Circuit> = SlotMap::with_key();
132        let circ_mutable = Arc::clone(circuit_leg.mutable());
133        let unique_id = circuit_leg.unique_id();
134        let primary_id = legs.insert(circuit_leg);
135        // Note: the join point is only set for multi-path tunnels
136        let join_point = None;
137
138        // TODO(conflux): read this from the consensus/config.
139        #[cfg(feature = "conflux")]
140        let desired_ux = V1DesiredUx::NO_OPINION;
141
142        let mutable = Arc::new(TunnelMutableState::default());
143        mutable.insert(unique_id, primary_id.into(), circ_mutable);
144
145        let set = Self {
146            legs,
147            primary_id,
148            join_point,
149            mutable: mutable.clone(),
150            #[cfg(feature = "conflux")]
151            nonce: V1Nonce::new(&mut rand::rng()),
152            #[cfg(feature = "conflux")]
153            desired_ux,
154            last_seq_delivered: Arc::new(AtomicU64::new(0)),
155            selected_init_primary: false,
156        };
157
158        (set, mutable)
159    }
160
161    /// Remove and return the only leg of this conflux set.
162    ///
163    /// Returns an error if there is more than one leg in the set,
164    /// or if called before any circuit legs are available.
165    ///
166    /// Calling this function will empty the [`ConfluxSet`].
167    pub(super) fn take_single_leg(&mut self) -> Result<Circuit, NotSingleLegError> {
168        let circ = get_single(self.legs.remove(self.primary_id).into_iter())?;
169        Ok(circ)
170    }
171
172    /// Return a reference to the only leg of this conflux set,
173    /// along with the leg's ID.
174    ///
175    /// Returns an error if there is more than one leg in the set,
176    /// or if called before any circuit legs are available.
177    pub(super) fn single_leg(&self) -> Result<(LegId, &Circuit), NotSingleLegError> {
178        let (circ_id, circ) = get_single(self.legs.iter())?;
179        Ok((LegId(circ_id), circ))
180    }
181
182    /// Return a mutable reference to the only leg of this conflux set,
183    /// along with the leg's ID.
184    ///
185    /// Returns an error if there is more than one leg in the set,
186    /// or if called before any circuit legs are available.
187    pub(super) fn single_leg_mut(&mut self) -> Result<(LegId, &mut Circuit), NotSingleLegError> {
188        let (circ_id, circ) = get_single(self.legs.iter_mut())?;
189        Ok((LegId(circ_id), circ))
190    }
191
192    /// Return the primary leg of this conflux set.
193    ///
194    /// Returns an error if called before any circuit legs are available.
195    pub(super) fn primary_leg_mut(&mut self) -> Result<&mut Circuit, Bug> {
196        // TODO(conflux): support more than one leg,
197        // and remove this check
198        if self.legs.len() > 1 {
199            return Err(internal!("multipath not currently supported"));
200        }
201
202        if self.legs.is_empty() {
203            Err(bad_api_usage!(
204                "tried to get circuit leg before creating it?!"
205            ))
206        } else {
207            // TODO(conflux): implement primary leg selection
208            let circ = self
209                .legs
210                .get_mut(self.primary_id)
211                .ok_or_else(|| internal!("slotmap is empty?!"))?;
212
213            Ok(circ)
214        }
215    }
216
217    /// Return a reference to the leg of this conflux set with the given id.
218    pub(super) fn leg(&self, leg_id: LegId) -> Option<&Circuit> {
219        self.legs.get(leg_id.0)
220    }
221
222    /// Return a mutable reference to the leg of this conflux set with the given id.
223    pub(super) fn leg_mut(&mut self, leg_id: LegId) -> Option<&mut Circuit> {
224        self.legs.get_mut(leg_id.0)
225    }
226
227    /// Return the LegId of the primary leg.
228    pub(super) fn primary_leg_id(&self) -> LegId {
229        LegId(self.primary_id)
230    }
231
232    /// Return the number of legs in this conflux set.
233    pub(super) fn len(&self) -> usize {
234        self.legs.len()
235    }
236
237    /// Return whether this conflux set is empty.
238    pub(super) fn is_empty(&self) -> bool {
239        self.legs.len() == 0
240    }
241
242    /// Remove the specified leg from this conflux set.
243    ///
244    /// Returns an error if the given leg doesn't exist in the set.
245    ///
246    /// Returns an error instructing the reactor to perform a clean shutdown
247    /// ([`ReactorError::Shutdown`]), tearing down the entire [`ConfluxSet`], if
248    ///
249    ///   * the set is depleted (empty) after removing the specified leg
250    ///   * `leg` is currently the sending (primary) leg of this set
251    ///   * the closed leg had the highest non-zero last_seq_recv/sent
252    ///   * the closed leg had some in-progress data (inflight > cc_sendme_inc)
253    ///
254    /// We do not yet support resumption. See [2.4.3. Closing circuits] in prop329.
255    ///
256    /// [2.4.3. Closing circuits]: https://spec.torproject.org/proposals/329-traffic-splitting.html#243-closing-circuits
257    pub(super) fn remove(&mut self, leg: LegIdKey) -> Result<Circuit, ReactorError> {
258        let circ = self
259            .legs
260            .remove(leg)
261            .ok_or_else(|| bad_api_usage!("leg {leg:?} not found in conflux set"))?;
262
263        self.mutable.remove(circ.unique_id());
264
265        if self.legs.is_empty() {
266            // The last circuit in the set has just died, so the reactor should exit.
267            return Err(ReactorError::Shutdown);
268        }
269
270        if leg == self.primary_id {
271            // We have just removed our sending leg,
272            // so it's time to close the entire conflux set.
273            return Err(ReactorError::Shutdown);
274        }
275
276        cfg_if::cfg_if! {
277            if #[cfg(feature = "conflux")] {
278                self.remove_conflux(circ)
279            } else {
280                // Conflux is disabled, so we can't possible continue running if the only
281                // leg in the tunnel is gone.
282                //
283                // Technically this should be unreachable (because of the is_empty()
284                // check above)
285                return Err(internal!("Multiple legs in single-path tunnel?!").into());
286            }
287        }
288    }
289
290    /// Handle the removal of a circuit,
291    /// returning an error if the reactor needs to shut down.
292    #[cfg(feature = "conflux")]
293    fn remove_conflux(&self, circ: Circuit) -> Result<Circuit, ReactorError> {
294        let Some(status) = circ.conflux_status() else {
295            return Err(internal!("Found non-conflux circuit in conflux set?!").into());
296        };
297
298        // TODO(conflux): should the circmgr be notified about the leg removal?
299        //
300        // "For circuits that are unlinked, the origin SHOULD immediately relaunch a new leg when it
301        // is closed, subject to the limits in [SIDE_CHANNELS]."
302
303        // If we've reached this point and the conflux set is non-empty,
304        // it means it's a multi-path set.
305        //
306        // Time to check if we need to tear down the entire set.
307        match status {
308            ConfluxStatus::Unlinked => {
309                // This circuit hasn't yet begun the conflux handshake,
310                // so we can safely remove it from the set
311                Ok(circ)
312            }
313            ConfluxStatus::Pending | ConfluxStatus::Linked => {
314                let (circ_last_seq_recv, circ_last_seq_sent) =
315                    (|| Ok::<_, ReactorError>((circ.last_seq_recv()?, circ.last_seq_sent()?)))()?;
316
317                // If the closed leg had the highest non-zero last_seq_recv/sent, close the set
318                if let Some(max_last_seq_recv) = self.max_last_seq_recv() {
319                    if circ_last_seq_recv > max_last_seq_recv {
320                        return Err(ReactorError::Shutdown);
321                    }
322                }
323
324                if let Some(max_last_seq_sent) = self.max_last_seq_sent() {
325                    if circ_last_seq_sent > max_last_seq_sent {
326                        return Err(ReactorError::Shutdown);
327                    }
328                }
329
330                let hop = self.join_point_hop(&circ)?;
331
332                let (inflight, cwnd) = (|| {
333                    let ccontrol = hop.ccontrol();
334                    let inflight = ccontrol.inflight()?;
335                    let cwnd = ccontrol.cwnd()?;
336
337                    Some((inflight, cwnd))
338                })()
339                .ok_or_else(|| {
340                    internal!("Congestion control algorithm doesn't track inflight cells or cwnd?!")
341                })?;
342
343                // If data is in progress on the leg (inflight > cc_sendme_inc),
344                // then all legs must be closed
345                if inflight >= cwnd.params().sendme_inc() {
346                    return Err(ReactorError::Shutdown);
347                }
348
349                Ok(circ)
350            }
351        }
352    }
353
354    /// Return the maximum relative last_seq_recv across all circuits.
355    #[cfg(feature = "conflux")]
356    fn max_last_seq_recv(&self) -> Option<u64> {
357        self.legs
358            .iter()
359            .filter_map(|(_id, leg)| leg.last_seq_recv().ok())
360            .max()
361    }
362
363    /// Return the maximum relative last_seq_sent across all circuits.
364    #[cfg(feature = "conflux")]
365    fn max_last_seq_sent(&self) -> Option<u64> {
366        self.legs
367            .iter()
368            .filter_map(|(_id, leg)| leg.last_seq_sent().ok())
369            .max()
370    }
371
372    /// Get the [`CircHop`] of the join point on the specified `circ`,
373    /// returning an error if this is a single path conflux set.
374    fn join_point_hop<'c>(&self, circ: &'c Circuit) -> Result<&'c CircHop, Bug> {
375        let Some(join_point) = self.join_point.as_ref().map(|p| p.hop) else {
376            return Err(internal!("No join point on conflux tunnel?!"));
377        };
378
379        circ.hop(join_point)
380            .ok_or_else(|| internal!("Conflux join point disappeared?!"))
381    }
382
383    /// Return an iterator of all circuits in the conflux set.
384    fn circuits(&self) -> impl Iterator<Item = &Circuit> {
385        self.legs.iter().map(|(_id, leg)| leg)
386    }
387
388    /// Add legs to the this conflux set.
389    ///
390    // TODO(conflux): update this with the latest changes from torspec!369
391    ///
392    /// Returns an error if any of the legs are invalid,
393    /// or if adding the legs would cause the conflux set to contain
394    /// any circuits that have the same hop in the middle and guard positions
395    /// (legs of the form `G1 - M1 - E` and `G2 - M1 - E`,
396    /// or `G1 - M1 -E` and `G1 - M2 - E ` aren't allowed to be part
397    /// of the same conflux set).
398    ///
399    /// A leg is considered valid if
400    ///
401    ///   * the circuit has the same length as all the other circuits in the set
402    ///   * its last hop is equal to the designated join point
403    ///   * the circuit has no streams attached to any of its hops
404    ///   * the circuit is not already part of a conflux set
405    ///
406    /// Note: the circuits will not begin linking until
407    /// [`link_circuits`](Self::link_circuits) is called.
408    #[cfg(feature = "conflux")]
409    pub(super) fn add_legs(
410        &mut self,
411        legs: Vec<Circuit>,
412        runtime: &tor_rtcompat::DynTimeProvider,
413    ) -> Result<(), Bug> {
414        if legs.is_empty() {
415            return Err(bad_api_usage!("asked to add empty leg list to conflux set"));
416        }
417
418        let join_point = match self.join_point.take() {
419            Some(p) => {
420                // Preserve the existing join point, if there is one.
421                p
422            }
423            None => {
424                let (hop, detail) = (|| {
425                    let first_leg = legs.first()?;
426                    let all_hops = first_leg.path().all_hops();
427                    let hop = first_leg.last_hop_num()?;
428                    let detail = all_hops.last()?;
429                    Some((hop, detail.clone()))
430                })()
431                .ok_or_else(|| bad_api_usage!("asked to join circuit with no hops"))?;
432
433                JoinPoint { hop, detail }
434            }
435        };
436
437        // Check two HopDetails for equality.
438        let hops_eq = |h1: &HopDetail, h2: &HopDetail| {
439            match (h1, h2) {
440                (HopDetail::Relay(t1), HopDetail::Relay(ref t2)) => t1.same_relay_ids(t2),
441                (HopDetail::Virtual, HopDetail::Virtual) => {
442                    // TODO(conflux): HopDetail::Virtual are always considered equal,
443                    // but that's not exactly right. We should resolve the TODO
444                    // from HopDetail::Virtual, and store some additional context
445                    // for differentiating virtual hops
446                    true
447                }
448                _ => false,
449            }
450        };
451
452        // Check if the last hop of leg is the same as the one from the first hop.
453        let hop_eq_joint_point = |hop: Option<&HopDetail>| {
454            hop.map(|hop| hops_eq(hop, &join_point.detail))
455                .unwrap_or_default()
456        };
457
458        // A leg is considered valid if
459        //
460        //   * the circuit has the expected length
461        //     (the length of the first circuit we added to the set)
462        //   * its last hop is equal to the designated join point
463        //     (the last hop of the first circuit we added)
464        //   * the circuit has no streams attached to any of its hops
465        //   * the circuit is not already part of a conflux tunnel
466        let leg_is_valid = |leg: &Circuit| {
467            leg.last_hop_num() == Some(join_point.hop)
468                && hop_eq_joint_point(leg.path().all_hops().last())
469                && !leg.has_streams()
470                && leg.conflux_status().is_none()
471        };
472
473        if !legs.iter().all(leg_is_valid) {
474            return Err(bad_api_usage!("one more more conflux circuits are invalid"));
475        }
476
477        let check_legs_disjoint = |(leg1, leg2): (&Circuit, &Circuit)| {
478            // TODO(conflux): add a new Path API for getting an iterator over HopDetail.
479            let path1 = leg1.path().all_hops();
480            let path1_except_last = path1.iter().dropping_back(1);
481            let path2 = leg2.path().all_hops();
482            let path2_except_last = path2.iter().dropping_back(1);
483
484            // At this point we've already validated the lengths of the new legs,
485            // so we know they all have the same length.
486            path1_except_last
487                .zip(path2_except_last)
488                .all(|(h1, h2)| !hops_eq(h1, h2))
489        };
490
491        // TODO(conflux): reduce unnecessary iteration over `legs`
492        // without hurting readability
493
494        // Ensure the legs don't share guard or middle relays
495        //
496        // TODO(conflux): is this right?
497        // It means we allow legs of the form
498        //
499        //  N1 --- N2 ----
500        //                \
501        //                 E
502        //  N2 --- N1-----/
503        //
504        //  But I think that's alright.
505        if !self
506            .circuits()
507            .chain(legs.iter())
508            .cartesian_product(legs.iter())
509            .all(check_legs_disjoint)
510        {
511            return Err(bad_api_usage!(
512                "conflux circuits must not share hops in the same position"
513            ));
514        }
515
516        // Select a join point, or put the existing one back into self.
517        self.join_point = Some(join_point.clone());
518
519        // The legs are valid, so add them to the set.
520        for mut circ in legs {
521            let conflux_handler = ConfluxMsgHandler::new_client(
522                join_point.hop,
523                self.nonce,
524                Arc::clone(&self.last_seq_delivered),
525                runtime.clone(),
526            );
527
528            circ.install_conflux_handler(conflux_handler);
529            let mutable = Arc::clone(circ.mutable());
530            let unique_id = circ.unique_id();
531            let leg_id = self.legs.insert(circ);
532            // Merge the mutable state of the circuit into our tunnel state.
533            self.mutable.insert(unique_id, leg_id.into(), mutable);
534        }
535
536        Ok(())
537    }
538
539    /// Try to update the primary leg based on the configured desired UX,
540    /// if needed.
541    ///
542    /// Returns the SWITCH cell to send on the primary leg,
543    /// if we switched primary leg.
544    #[cfg(feature = "conflux")]
545    pub(super) fn maybe_update_primary_leg(&mut self) -> crate::Result<Option<SendRelayCell>> {
546        use tor_error::into_internal;
547
548        let Some(join_point) = self.join_point.as_ref() else {
549            // Return early if this is not a multi-path tunnel
550            return Ok(None);
551        };
552
553        let join_point = join_point.hop;
554
555        if !self.should_update_primary_leg() {
556            // Nothing to do
557            return Ok(None);
558        }
559
560        let Some(new_primary_id) = self.select_primary_leg()? else {
561            // None of the legs satisfy our UX requirements, continue using the existing one.
562            return Ok(None);
563        };
564
565        // Check that the newly selected leg is actually different from the previous
566        if self.primary_id == new_primary_id {
567            // The primary leg stays the same, nothing to do.
568            return Ok(None);
569        }
570
571        let prev_last_seq_sent = self.primary_leg_mut()?.last_seq_sent()?;
572        self.primary_id = new_primary_id;
573        let new_last_seq_sent = self.primary_leg_mut()?.last_seq_sent()?;
574
575        // If this fails, it means we haven't updated our primary leg in a very long time.
576        //
577        // TODO(conflux): there are currently no safeguards to prevent us from staying
578        // on the same leg for "too long". Perhaps we should design should_update_primary_leg()
579        // such that it forces us to switch legs periodically, to prevent the seqno delta from
580        // getting too big?
581        let seqno_delta = u32::try_from(prev_last_seq_sent - new_last_seq_sent).map_err(
582            into_internal!("Seqno delta for switch does not fit in u32?!"),
583        )?;
584
585        let switch = ConfluxSwitch::new(seqno_delta);
586        let cell = AnyRelayMsgOuter::new(None, switch.into());
587        Ok(Some(SendRelayCell {
588            hop: join_point,
589            early: false,
590            cell,
591        }))
592    }
593
594    /// Whether it's time to select a new primary leg.
595    #[cfg(feature = "conflux")]
596    fn should_update_primary_leg(&mut self) -> bool {
597        if !self.selected_init_primary {
598            self.maybe_select_init_primary();
599            return false;
600        }
601
602        // If we don't have at least 2 legs,
603        // we can't switch our primary leg.
604        if self.legs.len() < 2 {
605            return false;
606        }
607
608        // TODO(conflux-tuning): if it turns out we switch legs too frequently,
609        // we might want to implement some sort of rate-limiting here
610        // (see c-tor's conflux_can_switch).
611
612        true
613    }
614
615    /// Return the best leg according to the configured desired UX.
616    ///
617    /// Returns `None` if no suitable was found.
618    #[cfg(feature = "conflux")]
619    fn select_primary_leg(&self) -> Result<Option<LegIdKey>, Bug> {
620        match self.desired_ux {
621            V1DesiredUx::NO_OPINION | V1DesiredUx::MIN_LATENCY => {
622                self.select_primary_leg_min_rtt(false)
623            }
624            V1DesiredUx::HIGH_THROUGHPUT => self.select_primary_leg_min_rtt(true),
625            V1DesiredUx::LOW_MEM_LATENCY | V1DesiredUx::LOW_MEM_THROUGHPUT => {
626                // TODO(conflux-tuning): add support for low-memory algorithms
627                self.select_primary_leg_min_rtt(false)
628            }
629            _ => {
630                // Default to MIN_RTT if we don't recognized the desired UX value
631                warn!(
632                    "Ignoring unrecognized conflux desired UX {}, using MIN_LATENCY",
633                    self.desired_ux
634                );
635                self.select_primary_leg_min_rtt(false)
636            }
637        }
638    }
639
640    /// Try to choose an initial primary leg, if we have an initial RTT measurement
641    /// for at least one of the legs.
642    #[cfg(feature = "conflux")]
643    fn maybe_select_init_primary(&mut self) {
644        let best = self
645            .legs
646            .iter()
647            .filter_map(|(id, leg)| leg.init_rtt().map(|rtt| (LegId(id), rtt)))
648            .min_by_key(|(_leg_id, rtt)| *rtt)
649            .map(|(leg_id, _rtt)| leg_id.0);
650
651        if let Some(best) = best {
652            self.primary_id = best;
653            self.selected_init_primary = true;
654        }
655    }
656
657    /// Return the leg with the best (lowest) RTT.
658    ///
659    /// If `check_can_send` is true, selects the lowest RTT leg that is ready to send.
660    ///
661    /// Returns `None` if no suitable was found.
662    #[cfg(feature = "conflux")]
663    fn select_primary_leg_min_rtt(&self, check_can_send: bool) -> Result<Option<LegIdKey>, Bug> {
664        let mut best = None;
665
666        for (leg_id, circ) in self.legs.iter() {
667            let join_point = self.join_point_hop(circ)?;
668            let ccontrol = join_point.ccontrol();
669
670            if check_can_send && !ccontrol.can_send() {
671                continue;
672            }
673
674            let rtt = ccontrol.rtt();
675            let ewma_rtt = rtt.ewma_rtt_usec();
676
677            match best.take() {
678                None => {
679                    best = Some((leg_id, ewma_rtt));
680                }
681                Some(best_so_far) => {
682                    if best_so_far.1 < ewma_rtt {
683                        best = Some(best_so_far);
684                    } else {
685                        best = Some((leg_id, ewma_rtt));
686                    }
687                }
688            }
689        }
690
691        Ok(best.map(|(leg_id, _)| leg_id))
692    }
693
694    /// Returns the next ready [`CircuitAction`],
695    /// obtained from processing the incoming/outgoing messages on all the circuits in this set.
696    ///
697    /// Will return an error if there are no circuits in this set,
698    /// or other internal errors occur.
699    ///
700    /// This is cancellation-safe.
701    pub(super) fn next_circ_action<'a>(
702        &'a mut self,
703        runtime: &'a tor_rtcompat::DynTimeProvider,
704    ) -> impl Future<Output = Result<CircuitAction, crate::Error>> + 'a {
705        self.legs
706            .iter_mut()
707            .map(|(leg_id, leg)| {
708                // The client SHOULD abandon and close circuit if the LINKED message takes too long to
709                // arrive. This timeout MUST be no larger than the normal SOCKS/stream timeout in use for
710                // RELAY_BEGIN, but MAY be the Circuit Build Timeout value, instead. (The C-Tor
711                // implementation currently uses Circuit Build Timeout).
712                let conflux_hs_timeout = if let Some(timeout) =  leg.conflux_hs_timeout() {
713                    // TODO: ask Diziet if we can have a sleep_until_instant() function
714                    Box::pin(runtime.sleep_until_wallclock(timeout)) as Pin<Box<dyn Future<Output = ()> + Send>>
715                } else {
716                    Box::pin(std::future::pending())
717                };
718
719                let mut ready_streams = leg.ready_streams_iterator();
720                let input = &mut leg.input;
721                let primary_id = self.primary_id;
722                let conflux_join_point = self.join_point.as_ref().map(|join_point| join_point.hop);
723                // TODO: we don't really need prepare_send_from here
724                // because the inner select_biased! is cancel-safe.
725                // We should replace this with a simple sink readiness check
726                let send_fut = leg.chan_sender.prepare_send_from(async move {
727                    // A future to wait for the next ready stream.
728                    let next_ready_stream = async {
729                        match ready_streams.next().await {
730                            Some(x) => x,
731                            None => {
732                                // There are no ready streams (for example, they may all be
733                                // blocked due to congestion control), so there is nothing
734                                // to do.
735                                // We await an infinitely pending future so that we don't
736                                // immediately return a `None` in the `select_biased!` below.
737                                // We'd rather wait on `input.next()` than immediately return with
738                                // no `CircuitAction`, which could put the reactor into a spin loop.
739                                let () = std::future::pending().await;
740                                unreachable!();
741                            }
742                        }
743                    };
744
745                    // NOTE: the stream returned by this function is polled in the select_biased!
746                    // from Reactor::run_once(), so each block from *this* select_biased! must be
747                    // cancellation-safe
748                    select_biased! {
749                        // Check whether we've got an input message pending.
750                        ret = input.next().fuse() => {
751                            let Some(cell) = ret else {
752                                return Ok(CircuitAction::RemoveLeg {
753                                    leg: leg_id,
754                                    reason: RemoveLegReason::ChannelClosed,
755                                });
756                            };
757
758                            Ok(CircuitAction::HandleCell { leg: leg_id, cell })
759                        },
760                        ret = next_ready_stream.fuse() => {
761                            let ret = ret.map(|cmd| {
762                                // TODO(conflux): refactor this spaghetti
763                                let leg = if let Some(join_point) = conflux_join_point {
764                                    match &cmd {
765                                        CircuitCmd::Send(send) => {
                                            // Conflux circuits always send multiplexed relay commands
                                            // to the last hop (the join point).
768                                            if cmd_counts_towards_seqno(send.cell.cmd()) {
769                                                if send.hop != join_point {
770                                                    return Err(crate::Error::Bug(internal!(
771                                                        "Leaky pipe on conflux circuit?! (target_hop={}, join_point={})",
772                                                        send.hop.display(),
773                                                        join_point.display(),
774                                                    )));
775                                                }
776                                                // TODO(conflux): validate the hop? We should
777                                                // ensure the target hop is the join point, and
778                                                // error otherwise?
779
780                                                primary_id
781                                            } else {
782                                                // Non-multiplexed commands go on their original
783                                                // circuit and hop
784                                                leg_id
785                                            }
786                                        }
787                                        // The leg_id doesn't need to change (or doesn't matter)
788                                        // for these other commands.
789                                        CircuitCmd::HandleSendMe { .. } | CircuitCmd::CloseStream { .. }
790                                        | CircuitCmd::CleanShutdown => leg_id,
791                                        #[cfg(feature = "conflux")]
792                                        CircuitCmd::ConfluxRemove(_) | CircuitCmd::ConfluxHandshakeComplete (_) | CircuitCmd::Enqueue(_) => leg_id,
793                                    }
794                                } else {
795                                    // If there is no join point, it means this is not
796                                    // a multi-path tunnel, so we continue using
797                                    // the leg_id/hop the cmd came from.
798                                    leg_id
799                                };
800
801                                Ok(CircuitAction::RunCmd { leg, cmd })
802                            });
803
804                            flatten(ret)
805                        },
806                    }
807                });
808
809                let mut send_fut = Box::pin(send_fut);
810
811                async move {
812                    select_biased! {
813                        () = conflux_hs_timeout.fuse() => {
814                            // Conflux handshake has timed out, time to remove this circuit leg,
815                            // and notify the handshake initiator.
816                            Ok(Ok(CircuitAction::RemoveLeg {
817                                leg: leg_id,
818                                reason: RemoveLegReason::ConfluxHandshakeTimeout,
819                            }))
820                        }
821                        ret = send_fut => {
822                            // Note: We don't actually use the returned SinkSendable,
                            // and continue writing to the SometimesUnboundedSink in the reactor :(
824                            ret.map(|ret| ret.0)
825                        }
826                    }
827                }
828            })
829            .collect::<FuturesUnordered<_>>()
830            // We only return the first ready action as a Future.
831            // Can't use `next()` since it borrows the stream.
832            .into_future()
833            .map(|(next, _)| next.ok_or(internal!("empty conflux set").into()))
834            // Clean up the nested `Result`s before returning to the caller.
835            .map(|res| flatten(flatten(res)))
836    }
837
838    /// The join point on the current primary leg.
839    pub(super) fn primary_join_point(&self) -> Option<(LegId, HopNum)> {
840        self.join_point
841            .as_ref()
842            .map(|join_point| (LegId(self.primary_id), join_point.hop))
843    }
844
845    /// Does congestion control use stream SENDMEs for the given hop?
846    ///
847    /// Returns `None` if either the `leg` or `hop` don't exist.
848    pub(super) fn uses_stream_sendme(&self, leg: LegId, hop: HopNum) -> Option<bool> {
849        self.leg(leg)?.uses_stream_sendme(hop)
850    }
851
852    /// Send a LINK cell down each unlinked leg.
853    #[cfg(feature = "conflux")]
854    pub(super) async fn link_circuits(
855        &mut self,
856        runtime: &tor_rtcompat::DynTimeProvider,
857    ) -> crate::Result<()> {
858        let (_leg_id, join_point) = self
859            .primary_join_point()
860            .ok_or_else(|| internal!("no join point when trying to send LINK"))?;
861
862        // Link all the circuits that haven't started the conflux handshake yet.
863        for (_, circ) in self
864            .legs
865            .iter_mut()
866            // TODO: it is an internal error if any of the legs don't have a conflux handler
867            // (i.e. if conflux_status() returns None)
868            .filter(|(_, circ)| circ.conflux_status() == Some(ConfluxStatus::Unlinked))
869        {
870            let v1_payload = V1LinkPayload::new(self.nonce, self.desired_ux);
871            let link = ConfluxLink::new(v1_payload);
872            let cell = AnyRelayMsgOuter::new(None, link.into());
873
874            circ.begin_conflux_link(join_point, cell, runtime).await?;
875        }
876
877        // TODO(conflux): the caller should take care to not allow opening streams
878        // until the conflux set is ready (i.e. until at least one of the legs completes
879        // the handshake).
880        //
881        // We will probably need a channel for notifying the caller
882        // of handshake completion/conflux set readiness
883
884        Ok(())
885    }
886
887    /// Check if the specified sequence number is the sequence number of the
888    /// next message we're expecting to handle.
889    pub(super) fn is_seqno_in_order(&self, seq_recv: u64) -> bool {
890        let last_seq_delivered = self.last_seq_delivered.load(atomic::Ordering::Acquire);
891        seq_recv == last_seq_delivered + 1
892    }
893}
894
895// TODO(conflux): replace this with Itertools::exactly_one()?
896//
897/// Get the only item from an iterator.
898///
899/// Returns an error if the iterator is empty or has more than one item.
900fn get_single<T>(mut iterator: impl Iterator<Item = T>) -> Result<T, NotSingleError> {
901    let Some(rv) = iterator.next() else {
902        return Err(NotSingleError::None);
903    };
904    if iterator.next().is_some() {
905        return Err(NotSingleError::Multiple);
906    }
907    Ok(rv)
908}
909
910/// An error returned from [`get_single`].
911#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
912pub(super) enum NotSingleError {
913    /// The iterator had no items.
914    #[error("the iterator had no items")]
915    None,
916    /// The iterator had more than one item.
917    #[error("the iterator had more than one item")]
918    Multiple,
919}
920
921/// An error returned when a method is expecting a single-leg conflux circuit,
922/// but it is not single-leg.
923#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
924pub(super) enum NotSingleLegError {
925    /// Conflux set has no legs.
926    #[error("the conflux set has no legs")]
927    EmptyConfluxSet,
928    /// Conflux set is multi-path.
929    #[error("the conflux set is multi-path")]
930    IsMultipath,
931}
932
933impl From<NotSingleLegError> for Bug {
934    fn from(e: NotSingleLegError) -> Self {
935        into_bad_api_usage!("not a single leg conflux set")(e)
936    }
937}
938
939impl From<NotSingleLegError> for crate::Error {
940    fn from(e: NotSingleLegError) -> Self {
941        Self::from(Bug::from(e))
942    }
943}
944
945impl From<NotSingleLegError> for ReactorError {
946    fn from(e: NotSingleLegError) -> Self {
947        Self::from(Bug::from(e))
948    }
949}
950
951impl From<NotSingleError> for NotSingleLegError {
952    fn from(e: NotSingleError) -> Self {
953        match e {
954            NotSingleError::None => Self::EmptyConfluxSet,
955            NotSingleError::Multiple => Self::IsMultipath,
956        }
957    }
958}
959
960/// Whether the specified `cmd` counts towards the conflux sequence numbers.
961fn cmd_counts_towards_seqno(cmd: RelayCmd) -> bool {
962    // Note: copy-pasted from c-tor
963    match cmd {
964        // These are all fine to multiplex, and must be so that ordering is preserved
965        RelayCmd::BEGIN | RelayCmd::DATA | RelayCmd::END | RelayCmd::CONNECTED => true,
966
967        // We can't multiplex these because they are circuit-specific
968        RelayCmd::SENDME
969        | RelayCmd::EXTEND
970        | RelayCmd::EXTENDED
971        | RelayCmd::TRUNCATE
972        | RelayCmd::TRUNCATED
973        | RelayCmd::DROP => false,
974
975        //  We must multiplex RESOLVEs because their ordering impacts begin/end.
976        RelayCmd::RESOLVE | RelayCmd::RESOLVED => true,
977
978        // These are all circuit-specific
979        RelayCmd::BEGIN_DIR
980        | RelayCmd::EXTEND2
981        | RelayCmd::EXTENDED2
982        | RelayCmd::ESTABLISH_INTRO
983        | RelayCmd::ESTABLISH_RENDEZVOUS
984        | RelayCmd::INTRODUCE1
985        | RelayCmd::INTRODUCE2
986        | RelayCmd::RENDEZVOUS1
987        | RelayCmd::RENDEZVOUS2
988        | RelayCmd::INTRO_ESTABLISHED
989        | RelayCmd::RENDEZVOUS_ESTABLISHED
990        | RelayCmd::INTRODUCE_ACK
991        | RelayCmd::PADDING_NEGOTIATE
992        | RelayCmd::PADDING_NEGOTIATED => false,
993
994        // TODO(cc): XOFF/XON
995
996        // These two are not multiplexed, because they must be processed immediately
997        // to update sequence numbers before any other cells are processed on the circuit
998        RelayCmd::CONFLUX_SWITCH
999        | RelayCmd::CONFLUX_LINK
1000        | RelayCmd::CONFLUX_LINKED
1001        | RelayCmd::CONFLUX_LINKED_ACK => false,
1002
1003        _ => {
1004            tracing::warn!("Conflux asked to multiplex unknown relay command {cmd}");
1005            false
1006        }
1007    }
1008}