tor_proto/client/reactor/
conflux.rs

1//! Conflux-related functionality
2
3// TODO: replace Itertools::exactly_one() with a stdlib equivalent when there is one.
4//
5// See issue #48919 <https://github.com/rust-lang/rust/issues/48919>
6#![allow(unstable_name_collisions)]
7
8#[cfg(feature = "conflux")]
9pub(crate) mod msghandler;
10
11use std::pin::Pin;
12use std::sync::atomic::{self, AtomicU64};
13use std::sync::{Arc, Mutex};
14
15use futures::{FutureExt as _, StreamExt, select_biased};
16use itertools::Itertools;
17use itertools::structs::ExactlyOneError;
18use smallvec::{SmallVec, smallvec};
19use tor_rtcompat::{SleepProvider as _, SleepProviderExt as _};
20use tracing::{info, instrument, trace, warn};
21
22use tor_cell::relaycell::AnyRelayMsgOuter;
23use tor_error::{Bug, bad_api_usage, internal};
24use tor_linkspec::HasRelayIds as _;
25
26use crate::circuit::UniqId;
27use crate::circuit::circhop::SendRelayCell;
28use crate::client::circuit::TunnelMutableState;
29#[cfg(feature = "circ-padding")]
30use crate::client::circuit::padding::PaddingEvent;
31use crate::client::circuit::path::HopDetail;
32use crate::conflux::cmd_counts_towards_seqno;
33use crate::conflux::msghandler::{ConfluxStatus, RemoveLegReason};
34use crate::congestion::params::CongestionWindowParams;
35use crate::crypto::cell::HopNum;
36use crate::streammap;
37use crate::tunnel::TunnelId;
38use crate::util::err::ReactorError;
39use crate::util::poll_all::PollAll;
40use crate::util::tunnel_activity::TunnelActivity;
41
42use super::circuit::CircHop;
43use super::{Circuit, CircuitAction};
44
45#[cfg(feature = "conflux")]
46use {
47    crate::conflux::msghandler::ConfluxMsgHandler,
48    msghandler::ClientConfluxMsgHandler,
49    tor_cell::relaycell::conflux::{V1DesiredUx, V1LinkPayload, V1Nonce},
50    tor_cell::relaycell::msg::{ConfluxLink, ConfluxSwitch},
51};
52
53/// The maximum number of conflux legs to store in the conflux set SmallVec.
54///
55/// Attempting to store more legs will cause the SmallVec to spill to the heap.
56///
57/// Note: this value was picked arbitrarily and may not be suitable.
58const MAX_CONFLUX_LEGS: usize = 16;
59
60/// The number of futures we add to the per-circuit [`PollAll`] future in
61/// [`ConfluxSet::next_circ_action`].
62///
63/// Used for the SmallVec size estimate;
64const NUM_CIRC_FUTURES: usize = 2;
65
66/// The expected number of circuit actions to be returned from
67/// [`ConfluxSet::next_circ_action`]
68const CIRC_ACTION_COUNT: usize = MAX_CONFLUX_LEGS * NUM_CIRC_FUTURES;
69
70/// A set with one or more circuits.
71///
72/// ### Conflux set life cycle
73///
74/// Conflux sets are created by the reactor using [`ConfluxSet::new`].
75///
76/// Every `ConfluxSet` starts out as a single-path set consisting of a single 0-length circuit.
77///
78/// After constructing a `ConfluxSet`, the reactor will proceed to extend its (only) circuit.
79/// At this point, the `ConfluxSet` will be a single-path set with a single n-length circuit.
80///
81/// The reactor can then turn the `ConfluxSet` into a multi-path set
82/// (a multi-path set is a conflux set that contains more than 1 circuit).
83/// This is done using [`ConfluxSet::add_legs`], in response to a `CtrlMsg` sent
84/// by the reactor user (also referred to as the "conflux handshake initiator").
85/// After that, the conflux set is said to be a multi-path set with multiple N-length circuits.
86///
87/// Circuits can be removed from the set using [`ConfluxSet::remove`].
88///
89/// The lifetime of a `ConfluxSet` is tied to the lifetime of the reactor.
90/// When the reactor is dropped, its underlying `ConfluxSet` is dropped too.
91/// This can happen on an explicit shutdown request, or if a fatal error occurs.
92///
93/// Conversely, the `ConfluxSet` can also trigger a reactor shutdown.
94/// For example, if after being instructed to remove a circuit from the set
95/// using [`ConfluxSet::remove`], the set is completely depleted,
96/// the `ConfluxSet` will return a [`ReactorError::Shutdown`] error,
97/// which will cause the reactor to shut down.
98pub(super) struct ConfluxSet {
99    /// The unique identifier of the tunnel this conflux set belongs to.
100    ///
101    /// Used for setting the internal [`TunnelId`] of [`Circuit`]s
102    /// that gets used for logging purposes.
103    tunnel_id: TunnelId,
104    /// The circuits in this conflux set.
105    legs: SmallVec<[Circuit; MAX_CONFLUX_LEGS]>,
106    /// Tunnel state, shared with `ClientCirc`.
107    ///
108    /// Contains the [`MutableState`](super::MutableState) of each circuit in the set.
109    mutable: Arc<TunnelMutableState>,
110    /// The unique identifier of the primary leg
111    primary_id: UniqId,
112    /// The join point of the set, if this is a multi-path set.
113    ///
114    /// Initially the conflux set starts out as a single-path set with no join point.
115    /// When it is converted to a multipath set using [`add_legs`](Self::add_legs),
116    /// the join point is initialized to the last hop in the tunnel.
117    //
118    // TODO(#2017): for simplicity, we currently we force all legs to have the same length,
119    // to ensure the HopNum of the join point is the same for all of them.
120    //
121    // In the future we might want to relax this restriction.
122    join_point: Option<JoinPoint>,
123    /// The nonce associated with the circuits from this set.
124    #[cfg(feature = "conflux")]
125    nonce: V1Nonce,
126    /// The desired UX
127    #[cfg(feature = "conflux")]
128    desired_ux: V1DesiredUx,
129    /// The absolute sequence number of the last cell delivered to a stream.
130    ///
131    /// A clone of this is shared with each [`ConfluxMsgHandler`] created.
132    ///
133    /// When a message is received on a circuit leg, the `ConfluxMsgHandler`
134    /// of the leg compares the (leg-local) sequence number of the message
135    /// with this sequence number to determine whether the message is in-order.
136    ///
137    /// If the message is in-order, the `ConfluxMsgHandler` instructs the circuit
138    /// to deliver it to its corresponding stream.
139    ///
140    /// If the message is out-of-order, the `ConfluxMsgHandler` instructs the circuit
141    /// to instruct the reactor to buffer the message.
142    last_seq_delivered: Arc<AtomicU64>,
143    /// Whether we have selected our initial primary leg,
144    /// if this is a multipath conflux set.
145    selected_init_primary: bool,
146}
147
148/// The conflux join point.
149#[derive(Clone, derive_more::Debug)]
150struct JoinPoint {
151    /// The hop number.
152    hop: HopNum,
153    /// The [`HopDetail`] of the hop.
154    detail: HopDetail,
155    /// The stream map of the joint point, shared with each circuit leg.
156    #[debug(skip)]
157    streams: Arc<Mutex<streammap::StreamMap>>,
158}
159
160impl ConfluxSet {
161    /// Create a new conflux set, consisting of a single leg.
162    ///
163    /// Returns the newly created set and a reference to its [`TunnelMutableState`].
164    pub(super) fn new(
165        tunnel_id: TunnelId,
166        circuit_leg: Circuit,
167    ) -> (Self, Arc<TunnelMutableState>) {
168        let primary_id = circuit_leg.unique_id();
169        let circ_mutable = Arc::clone(circuit_leg.mutable());
170        let legs = smallvec![circuit_leg];
171        // Note: the join point is only set for multi-path tunnels
172        let join_point = None;
173
174        // TODO(#2035): read this from the consensus/config.
175        #[cfg(feature = "conflux")]
176        let desired_ux = V1DesiredUx::NO_OPINION;
177
178        let mutable = Arc::new(TunnelMutableState::default());
179        mutable.insert(primary_id, circ_mutable);
180
181        let set = Self {
182            tunnel_id,
183            legs,
184            primary_id,
185            join_point,
186            mutable: mutable.clone(),
187            #[cfg(feature = "conflux")]
188            nonce: V1Nonce::new(&mut rand::rng()),
189            #[cfg(feature = "conflux")]
190            desired_ux,
191            last_seq_delivered: Arc::new(AtomicU64::new(0)),
192            selected_init_primary: false,
193        };
194
195        (set, mutable)
196    }
197
198    /// Remove and return the only leg of this conflux set.
199    ///
200    /// Returns an error if there is more than one leg in the set,
201    /// or if called before any circuit legs are available.
202    ///
203    /// Calling this function will empty the [`ConfluxSet`].
204    pub(super) fn take_single_leg(&mut self) -> Result<Circuit, Bug> {
205        let circ = self
206            .legs
207            .iter()
208            .exactly_one()
209            .map_err(NotSingleLegError::from)?;
210        let circ_id = circ.unique_id();
211
212        debug_assert!(circ_id == self.primary_id);
213
214        self.remove_unchecked(circ_id)
215    }
216
217    /// Return a reference to the only leg of this conflux set,
218    /// along with the leg's ID.
219    ///
220    /// Returns an error if there is more than one leg in the set,
221    /// or if called before any circuit legs are available.
222    pub(super) fn single_leg(&self) -> Result<&Circuit, NotSingleLegError> {
223        Ok(self.legs.iter().exactly_one()?)
224    }
225
226    /// Return a mutable reference to the only leg of this conflux set,
227    /// along with the leg's ID.
228    ///
229    /// Returns an error if there is more than one leg in the set,
230    /// or if called before any circuit legs are available.
231    pub(super) fn single_leg_mut(&mut self) -> Result<&mut Circuit, NotSingleLegError> {
232        Ok(self.legs.iter_mut().exactly_one()?)
233    }
234
235    /// Return the primary leg of this conflux set.
236    ///
237    /// Returns an error if called before any circuit legs are available.
238    pub(super) fn primary_leg_mut(&mut self) -> Result<&mut Circuit, Bug> {
239        #[cfg(not(feature = "conflux"))]
240        if self.legs.len() > 1 {
241            return Err(internal!(
242                "got multipath tunnel, but conflux feature is disabled?!"
243            ));
244        }
245
246        if self.legs.is_empty() {
247            Err(bad_api_usage!(
248                "tried to get circuit leg before creating it?!"
249            ))
250        } else {
251            let circ = self
252                .leg_mut(self.primary_id)
253                .ok_or_else(|| internal!("conflux set is empty?!"))?;
254
255            Ok(circ)
256        }
257    }
258
259    /// Return a reference to the leg of this conflux set with the given id.
260    pub(super) fn leg(&self, leg_id: UniqId) -> Option<&Circuit> {
261        self.legs.iter().find(|circ| circ.unique_id() == leg_id)
262    }
263
264    /// Return a mutable reference to the leg of this conflux set with the given id.
265    pub(super) fn leg_mut(&mut self, leg_id: UniqId) -> Option<&mut Circuit> {
266        self.legs.iter_mut().find(|circ| circ.unique_id() == leg_id)
267    }
268
269    /// Return the number of legs in this conflux set.
270    pub(super) fn len(&self) -> usize {
271        self.legs.len()
272    }
273
274    /// Return whether this conflux set is empty.
275    pub(super) fn is_empty(&self) -> bool {
276        self.legs.len() == 0
277    }
278
279    /// Remove the specified leg from this conflux set.
280    ///
281    /// Returns an error if the given leg doesn't exist in the set.
282    ///
283    /// Returns an error instructing the reactor to perform a clean shutdown
284    /// ([`ReactorError::Shutdown`]), tearing down the entire [`ConfluxSet`], if
285    ///
286    ///   * the set is depleted (empty) after removing the specified leg
287    ///   * `leg` is currently the sending (primary) leg of this set
288    ///   * the closed leg had the highest non-zero last_seq_recv/sent
289    ///   * the closed leg had some in-progress data (inflight > cc_sendme_inc)
290    ///
291    /// We do not yet support resumption. See [2.4.3. Closing circuits] in prop329.
292    ///
293    /// [2.4.3. Closing circuits]: https://spec.torproject.org/proposals/329-traffic-splitting.html#243-closing-circuits
294    #[instrument(level = "trace", skip_all)]
295    pub(super) fn remove(&mut self, leg: UniqId) -> Result<Circuit, ReactorError> {
296        let circ = self.remove_unchecked(leg)?;
297
298        tracing::trace!(
299            circ_id = %circ.unique_id(),
300            "Circuit removed from conflux set"
301        );
302
303        self.mutable.remove(circ.unique_id());
304
305        if self.legs.is_empty() {
306            // TODO: log the tunnel ID
307            tracing::debug!("Conflux set is now empty, tunnel reactor shutting down");
308
309            // The last circuit in the set has just died, so the reactor should exit.
310            return Err(ReactorError::Shutdown);
311        }
312
313        if leg == self.primary_id {
314            // We have just removed our sending leg,
315            // so it's time to close the entire conflux set.
316            return Err(ReactorError::Shutdown);
317        }
318
319        cfg_if::cfg_if! {
320            if #[cfg(feature = "conflux")] {
321                self.remove_conflux(circ)
322            } else {
323                // Conflux is disabled, so we can't possibly continue running if the only
324                // leg in the tunnel is gone.
325                //
326                // Technically this should be unreachable (because of the is_empty()
327                // check above)
328                Err(internal!("Multiple legs in single-path tunnel?!").into())
329            }
330        }
331    }
332
333    /// Handle the removal of a circuit,
334    /// returning an error if the reactor needs to shut down.
335    #[cfg(feature = "conflux")]
336    fn remove_conflux(&self, circ: Circuit) -> Result<Circuit, ReactorError> {
337        let Some(status) = circ.conflux_status() else {
338            return Err(internal!("Found non-conflux circuit in conflux set?!").into());
339        };
340
341        // TODO(conflux): should the circmgr be notified about the leg removal?
342        //
343        // "For circuits that are unlinked, the origin SHOULD immediately relaunch a new leg when it
344        // is closed, subject to the limits in [SIDE_CHANNELS]."
345
346        // If we've reached this point and the conflux set is non-empty,
347        // it means it's a multi-path set.
348        //
349        // Time to check if we need to tear down the entire set.
350        match status {
351            ConfluxStatus::Unlinked => {
352                // This circuit hasn't yet begun the conflux handshake,
353                // so we can safely remove it from the set
354                Ok(circ)
355            }
356            ConfluxStatus::Pending | ConfluxStatus::Linked => {
357                let (circ_last_seq_recv, circ_last_seq_sent) =
358                    (|| Ok::<_, ReactorError>((circ.last_seq_recv()?, circ.last_seq_sent()?)))()?;
359
360                // If the closed leg had the highest non-zero last_seq_recv/sent, close the set
361                if let Some(max_last_seq_recv) = self.max_last_seq_recv() {
362                    if circ_last_seq_recv > max_last_seq_recv {
363                        return Err(ReactorError::Shutdown);
364                    }
365                }
366
367                if let Some(max_last_seq_sent) = self.max_last_seq_sent() {
368                    if circ_last_seq_sent > max_last_seq_sent {
369                        return Err(ReactorError::Shutdown);
370                    }
371                }
372
373                let hop = self.join_point_hop(&circ)?;
374
375                let (inflight, cwnd) = (|| {
376                    let ccontrol = hop.ccontrol();
377                    let inflight = ccontrol.inflight()?;
378                    let cwnd = ccontrol.cwnd()?;
379
380                    Some((inflight, cwnd))
381                })()
382                .ok_or_else(|| {
383                    internal!("Congestion control algorithm doesn't track inflight cells or cwnd?!")
384                })?;
385
386                // If data is in progress on the leg (inflight > cc_sendme_inc),
387                // then all legs must be closed
388                if inflight >= cwnd.params().sendme_inc() {
389                    return Err(ReactorError::Shutdown);
390                }
391
392                Ok(circ)
393            }
394        }
395    }
396
397    /// Return the maximum relative last_seq_recv across all circuits.
398    #[cfg(feature = "conflux")]
399    fn max_last_seq_recv(&self) -> Option<u64> {
400        self.legs
401            .iter()
402            .filter_map(|leg| leg.last_seq_recv().ok())
403            .max()
404    }
405
406    /// Return the maximum relative last_seq_sent across all circuits.
407    #[cfg(feature = "conflux")]
408    fn max_last_seq_sent(&self) -> Option<u64> {
409        self.legs
410            .iter()
411            .filter_map(|leg| leg.last_seq_sent().ok())
412            .max()
413    }
414
415    /// Get the [`CircHop`] of the join point on the specified `circ`,
416    /// returning an error if this is a single path conflux set.
417    fn join_point_hop<'c>(&self, circ: &'c Circuit) -> Result<&'c CircHop, Bug> {
418        let Some(join_point) = self.join_point.as_ref().map(|p| p.hop) else {
419            return Err(internal!("No join point on conflux tunnel?!"));
420        };
421
422        circ.hop(join_point)
423            .ok_or_else(|| internal!("Conflux join point disappeared?!"))
424    }
425
426    /// Return an iterator of all circuits in the conflux set.
427    fn circuits(&self) -> impl Iterator<Item = &Circuit> {
428        self.legs.iter()
429    }
430
431    /// Return the most active [`TunnelActivity`] for any leg of this `ConfluxSet`.
432    pub(super) fn tunnel_activity(&self) -> TunnelActivity {
433        self.circuits()
434            .map(|c| c.hops.tunnel_activity())
435            .max()
436            .unwrap_or_else(TunnelActivity::never_used)
437    }
438
439    /// Add legs to the this conflux set.
440    ///
441    /// Returns an error if any of the legs is invalid.
442    ///
443    /// A leg is considered valid if
444    ///
445    ///   * the circuit has the same length as all the other circuits in the set
446    ///   * its last hop is equal to the designated join point
447    ///   * the circuit has no streams attached to any of its hops
448    ///   * the circuit is not already part of a conflux set
449    ///
450    /// Note: the circuits will not begin linking until
451    /// [`link_circuits`](Self::link_circuits) is called.
452    ///
453    /// IMPORTANT: this function does not prevent the construction of conflux sets
454    /// where the circuit legs share guard or middle relays. It is the responsibility
455    /// of the caller to enforce the following invariant from prop354:
456    ///
457    /// "If building a conflux leg: Reject any circuits that have the same Guard as the other conflux
458    /// "leg(s) in the current conflux set, EXCEPT when one of the primary Guards is also the chosen
459    /// "Exit of this conflux set (in which case, re-use the non-Exit Guard)."
460    ///
461    /// This is because at this level we don't actually know which relays are the guards,
462    /// so we can't know if the join point happens to be one of the Guard + Exit relays.
463    #[cfg(feature = "conflux")]
464    pub(super) fn add_legs(
465        &mut self,
466        legs: Vec<Circuit>,
467        runtime: &tor_rtcompat::DynTimeProvider,
468    ) -> Result<(), Bug> {
469        if legs.is_empty() {
470            return Err(bad_api_usage!("asked to add empty leg list to conflux set"));
471        }
472
473        let join_point = match self.join_point.take() {
474            Some(p) => {
475                // Preserve the existing join point, if there is one.
476                p
477            }
478            None => {
479                let (hop, detail, streams) = (|| {
480                    let first_leg = self.circuits().next()?;
481                    let first_leg_path = first_leg.path();
482                    let all_hops = first_leg_path.all_hops();
483                    let hop_num = first_leg.last_hop_num()?;
484                    let detail = all_hops.last()?;
485                    let hop = first_leg.hop(hop_num)?;
486                    let streams = Arc::clone(hop.stream_map());
487                    Some((hop_num, detail.clone(), streams))
488                })()
489                .ok_or_else(|| bad_api_usage!("asked to join circuit with no hops"))?;
490
491                JoinPoint {
492                    hop,
493                    detail,
494                    streams,
495                }
496            }
497        };
498
499        // Check two HopDetails for equality.
500        //
501        // Returns an error if one of the hops is virtual.
502        let hops_eq = |h1: &HopDetail, h2: &HopDetail| {
503            match (h1, h2) {
504                (HopDetail::Relay(t1), HopDetail::Relay(t2)) => Ok(t1.same_relay_ids(t2)),
505                #[cfg(feature = "hs-common")]
506                (HopDetail::Virtual, HopDetail::Virtual) => {
507                    // TODO(#2016): support onion service conflux
508                    Err(internal!("onion service conflux not supported"))
509                }
510                _ => Ok(false),
511            }
512        };
513
514        // A leg is considered valid if
515        //
516        //   * the circuit has the expected length
517        //     (the length of the first circuit we added to the set)
518        //   * its last hop is equal to the designated join point
519        //     (the last hop of the first circuit we added)
520        //   * the circuit has no streams attached to any of its hops
521        //   * the circuit is not already part of a conflux tunnel
522        //
523        // Returns an error if any hops are virtual.
524        let leg_is_valid = |leg: &Circuit| -> Result<bool, Bug> {
525            use crate::ccparams::Algorithm;
526
527            let path = leg.path();
528            let Some(last_hop) = path.all_hops().last() else {
529                // A circuit with no hops is invalid
530                return Ok(false);
531            };
532
533            // TODO: this sort of duplicates the check above.
534            // The difference is that above we read the hop detail
535            // information from the circuit Path, whereas here we get
536            // the actual last CircHop of the circuit.
537            let Some(last_hop_num) = leg.last_hop_num() else {
538                // A circuit with no hops is invalid
539                return Ok(false);
540            };
541
542            let circhop = leg
543                .hop(last_hop_num)
544                .ok_or_else(|| internal!("hop disappeared?!"))?;
545
546            // Ensure we negotiated a suitable cc algorithm
547            let is_cc_suitable = match circhop.ccontrol().algorithm() {
548                Algorithm::FixedWindow(_) => false,
549                Algorithm::Vegas(_) => true,
550            };
551
552            if !is_cc_suitable {
553                return Ok(false);
554            }
555
556            Ok(last_hop_num == join_point.hop
557                && hops_eq(last_hop, &join_point.detail)?
558                && !leg.has_streams()
559                && leg.conflux_status().is_none())
560        };
561
562        for leg in &legs {
563            if !leg_is_valid(leg)? {
564                return Err(bad_api_usage!("one more more conflux circuits are invalid"));
565            }
566        }
567
568        // Select a join point, or put the existing one back into self.
569        self.join_point = Some(join_point.clone());
570
571        // The legs are valid, so add them to the set.
572        for circ in legs {
573            let mutable = Arc::clone(circ.mutable());
574            let unique_id = circ.unique_id();
575            self.legs.push(circ);
576            // Merge the mutable state of the circuit into our tunnel state.
577            self.mutable.insert(unique_id, mutable);
578        }
579
580        let cwnd_params = self.cwnd_params()?;
581        for circ in self.legs.iter_mut() {
582            // The circuits that have a None status don't know they're part of
583            // a multi-path tunnel yet. They need to be initialized with a
584            // conflux message handler, and have their join point fixed up
585            // to share a stream map with the join point on all the other circuits.
586            if circ.conflux_status().is_none() {
587                let handler = Box::new(ClientConfluxMsgHandler::new(
588                    join_point.hop,
589                    self.nonce,
590                    Arc::clone(&self.last_seq_delivered),
591                    cwnd_params,
592                    runtime.clone(),
593                ));
594                let conflux_handler =
595                    ConfluxMsgHandler::new(handler, Arc::clone(&self.last_seq_delivered));
596
597                circ.add_to_conflux_tunnel(self.tunnel_id, conflux_handler);
598
599                // Ensure the stream map of the last hop is shared by all the legs
600                let last_hop = circ
601                    .hop_mut(join_point.hop)
602                    .ok_or_else(|| bad_api_usage!("asked to join circuit with no hops"))?;
603                last_hop.set_stream_map(Arc::clone(&join_point.streams))?;
604            }
605        }
606
607        Ok(())
608    }
609
610    /// Get the [`CongestionWindowParams`] of the join point
611    /// on the first leg.
612    ///
613    /// Returns an error if the congestion control algorithm
614    /// doesn't have a congestion control window object,
615    /// or if the conflux set is empty, or the joint point hop
616    /// does not exist.
617    ///
618    // TODO: this function is a bit of a hack. In reality, we only
619    // need the cc_cwnd_init parameter (for SWITCH seqno validation).
620    // The fact that we obtain it from the cc params of the join point
621    // is an implementation detail (it's a workaround for the fact that
622    // at this point, these params can only obtained from a CircHop)
623    #[cfg(feature = "conflux")]
624    fn cwnd_params(&self) -> Result<CongestionWindowParams, Bug> {
625        let primary_leg = self
626            .leg(self.primary_id)
627            .ok_or_else(|| internal!("no primary leg?!"))?;
628        let join_point = self.join_point_hop(primary_leg)?;
629        let ccontrol = join_point.ccontrol();
630        let cwnd = ccontrol
631            .cwnd()
632            .ok_or_else(|| internal!("congestion control algorithm does not track the cwnd?!"))?;
633
634        Ok(*cwnd.params())
635    }
636
637    /// Try to update the primary leg based on the configured desired UX,
638    /// if needed.
639    ///
640    /// Returns the SWITCH cell to send on the primary leg,
641    /// if we switched primary leg.
642    #[cfg(feature = "conflux")]
643    pub(super) fn maybe_update_primary_leg(&mut self) -> crate::Result<Option<SendRelayCell>> {
644        use tor_error::into_internal;
645
646        let Some(join_point) = self.join_point.as_ref() else {
647            // Return early if this is not a multi-path tunnel
648            return Ok(None);
649        };
650
651        let join_point = join_point.hop;
652
653        if !self.should_update_primary_leg() {
654            // Nothing to do
655            return Ok(None);
656        }
657
658        let Some(new_primary_id) = self.select_primary_leg()? else {
659            // None of the legs satisfy our UX requirements, continue using the existing one.
660            return Ok(None);
661        };
662
663        // Check that the newly selected leg is actually different from the previous
664        if self.primary_id == new_primary_id {
665            // The primary leg stays the same, nothing to do.
666            return Ok(None);
667        }
668
669        let prev_last_seq_sent = self.primary_leg_mut()?.last_seq_sent()?;
670        self.primary_id = new_primary_id;
671        let new_last_seq_sent = self.primary_leg_mut()?.last_seq_sent()?;
672
673        // If this fails, it means we haven't updated our primary leg in a very long time.
674        //
675        // TODO(#2036): there are currently no safeguards to prevent us from staying
676        // on the same leg for "too long". Perhaps we should design should_update_primary_leg()
677        // such that it forces us to switch legs periodically, to prevent the seqno delta from
678        // getting too big?
679        let seqno_delta = u32::try_from(prev_last_seq_sent - new_last_seq_sent).map_err(
680            into_internal!("Seqno delta for switch does not fit in u32?!"),
681        )?;
682
683        // We need to carry the last_seq_sent over to the next leg
684        // (the next cell sent will have seqno = prev_last_seq_sent + 1)
685        self.primary_leg_mut()?
686            .set_last_seq_sent(prev_last_seq_sent)?;
687
688        let switch = ConfluxSwitch::new(seqno_delta);
689        let cell = AnyRelayMsgOuter::new(None, switch.into());
690        Ok(Some(SendRelayCell {
691            hop: Some(join_point),
692            early: false,
693            cell,
694        }))
695    }
696
697    /// Whether it's time to select a new primary leg.
698    #[cfg(feature = "conflux")]
699    fn should_update_primary_leg(&mut self) -> bool {
700        if !self.selected_init_primary {
701            self.maybe_select_init_primary();
702            return false;
703        }
704
705        // If we don't have at least 2 legs,
706        // we can't switch our primary leg.
707        if self.legs.len() < 2 {
708            return false;
709        }
710
711        // TODO(conflux-tuning): if it turns out we switch legs too frequently,
712        // we might want to implement some sort of rate-limiting here
713        // (see c-tor's conflux_can_switch).
714
715        true
716    }
717
718    /// Return the best leg according to the configured desired UX.
719    ///
720    /// Returns `None` if no suitable leg was found.
721    #[cfg(feature = "conflux")]
722    fn select_primary_leg(&self) -> Result<Option<UniqId>, Bug> {
723        match self.desired_ux {
724            V1DesiredUx::NO_OPINION | V1DesiredUx::MIN_LATENCY => {
725                self.select_primary_leg_min_rtt(false)
726            }
727            V1DesiredUx::HIGH_THROUGHPUT => self.select_primary_leg_min_rtt(true),
728            V1DesiredUx::LOW_MEM_LATENCY | V1DesiredUx::LOW_MEM_THROUGHPUT => {
729                // TODO(conflux-tuning): add support for low-memory algorithms
730                self.select_primary_leg_min_rtt(false)
731            }
732            _ => {
733                // Default to MIN_RTT if we don't recognize the desired UX value
734                warn!(
735                    tunnel_id = %self.tunnel_id,
736                    "Ignoring unrecognized conflux desired UX {}, using MIN_LATENCY",
737                    self.desired_ux
738                );
739                self.select_primary_leg_min_rtt(false)
740            }
741        }
742    }
743
744    /// Try to choose an initial primary leg, if we have an initial RTT measurement
745    /// for at least one of the legs.
746    #[cfg(feature = "conflux")]
747    fn maybe_select_init_primary(&mut self) {
748        let best = self
749            .legs
750            .iter()
751            .filter_map(|leg| leg.init_rtt().map(|rtt| (leg, rtt)))
752            .min_by_key(|(_leg, rtt)| *rtt)
753            .map(|(leg, _rtt)| leg.unique_id());
754
755        if let Some(best) = best {
756            self.primary_id = best;
757            self.selected_init_primary = true;
758        }
759    }
760
761    /// Return the leg with the best (lowest) RTT.
762    ///
763    /// If `check_can_send` is true, selects the lowest RTT leg that is ready to send.
764    ///
765    /// Returns `None` if no suitable leg was found.
766    #[cfg(feature = "conflux")]
767    fn select_primary_leg_min_rtt(&self, check_can_send: bool) -> Result<Option<UniqId>, Bug> {
768        let mut best = None;
769
770        for circ in self.legs.iter() {
771            let leg_id = circ.unique_id();
772            let join_point = self.join_point_hop(circ)?;
773            let ccontrol = join_point.ccontrol();
774
775            if check_can_send && !ccontrol.can_send() {
776                continue;
777            }
778
779            let rtt = ccontrol.rtt();
780            let init_rtt_usec = || {
781                circ.init_rtt()
782                    .map(|rtt| u32::try_from(rtt.as_micros()).unwrap_or(u32::MAX))
783            };
784
785            let Some(ewma_rtt) = rtt.ewma_rtt_usec().or_else(init_rtt_usec) else {
786                return Err(internal!(
787                    "attempted to select primary leg before handshake completed?!"
788                ));
789            };
790
791            match best.take() {
792                None => {
793                    best = Some((leg_id, ewma_rtt));
794                }
795                Some(best_so_far) => {
796                    if best_so_far.1 <= ewma_rtt {
797                        best = Some(best_so_far);
798                    } else {
799                        best = Some((leg_id, ewma_rtt));
800                    }
801                }
802            }
803        }
804
805        Ok(best.map(|(leg_id, _)| leg_id))
806    }
807
808    /// Returns `true` if our conflux join point is blocked on congestion control
809    /// on the specified `circuit`.
810    ///
811    /// Returns `false` if the join point is not blocked on cc,
812    /// or if this is a single-path set.
813    ///
814    /// Returns an error if this is a multipath tunnel,
815    /// but the joint point hop doesn't exist on the specified circuit.
816    #[cfg(feature = "conflux")]
817    fn is_join_point_blocked_on_cc(join_hop: HopNum, circuit: &Circuit) -> Result<bool, Bug> {
818        let join_circhop = circuit.hop(join_hop).ok_or_else(|| {
819            internal!(
820                "Join point hop {} not found on circuit {}?!",
821                join_hop.display(),
822                circuit.unique_id(),
823            )
824        })?;
825
826        Ok(!join_circhop.ccontrol().can_send())
827    }
828
829    /// Returns whether [`next_circ_action`](Self::next_circ_action)
830    /// should avoid polling the join point streams entirely.
831    #[cfg(feature = "conflux")]
832    fn should_skip_join_point(&self) -> Result<bool, Bug> {
833        let Some(primary_join_point) = self.primary_join_point() else {
834            // Single-path, there is no join point
835            return Ok(false);
836        };
837
838        let join_hop = primary_join_point.1;
839        let primary_blocked_on_cc = {
840            let primary = self
841                .leg(self.primary_id)
842                .ok_or_else(|| internal!("primary leg disappeared?!"))?;
843            Self::is_join_point_blocked_on_cc(join_hop, primary)?
844        };
845
846        if !primary_blocked_on_cc {
847            // Easy, we can just carry on
848            return Ok(false);
849        }
850
851        // Now, if the primary *is* blocked on cc, we may still be able to poll
852        // the join point streams (if we're using the right desired UX)
853        let should_skip = if self.desired_ux != V1DesiredUx::HIGH_THROUGHPUT {
854            // The primary leg is blocked on cc, and we can't switch because we're
855            // not using the high throughput algorithm, so we must stop reading
856            // the join point streams.
857            //
858            // Note: if the selected algorithm is HIGH_THROUGHPUT,
859            // it's okay to continue reading from the edge connection,
860            // because maybe_update_primary_leg() will select a new,
861            // non-blocked primary leg, just before sending.
862            trace!(
863                tunnel_id = %self.tunnel_id,
864                join_point = ?primary_join_point,
865                reason = "sending leg blocked on congestion control",
866                "Pausing join point stream reads"
867            );
868
869            true
870        } else {
871            // Ah-ha, the desired UX is HIGH_THROUGHPUT, which means we can switch
872            // to an unblocked leg before sending any cells over the join point,
873            // as long as there are some unblocked legs.
874
875            // TODO: figure out how to rewrite this with an idiomatic iterator combinator
876            let mut all_blocked_on_cc = true;
877            for leg in &self.legs {
878                all_blocked_on_cc = Self::is_join_point_blocked_on_cc(join_hop, leg)?;
879                if !all_blocked_on_cc {
880                    break;
881                }
882            }
883
884            if all_blocked_on_cc {
885                // All legs are blocked on cc, so we must stop reading from
886                // the join point streams for now.
887                trace!(
888                    tunnel_id = %self.tunnel_id,
889                    join_point = ?primary_join_point,
890                    reason = "all legs blocked on congestion control",
891                    "Pausing join point stream reads"
892                );
893
894                true
895            } else {
896                // At least one leg is not blocked, so we can continue reading
897                // from the join point streams
898                false
899            }
900        };
901
902        Ok(should_skip)
903    }
904
905    /// Returns the next ready [`CircuitAction`],
906    /// obtained from processing the incoming/outgoing messages on all the circuits in this set.
907    ///
908    /// Will return an error if there are no circuits in this set,
909    /// or other internal errors occur.
910    ///
911    /// This is cancellation-safe.
912    #[allow(clippy::unnecessary_wraps)] // Can return Err if conflux is enabled
913    #[instrument(level = "trace", skip_all)]
914    pub(super) async fn next_circ_action(
915        &mut self,
916        runtime: &tor_rtcompat::DynTimeProvider,
917    ) -> Result<SmallVec<[CircuitAction; CIRC_ACTION_COUNT]>, crate::Error> {
918        // Avoid polling the streams on the join point if our primary
919        // leg is blocked on cc
920        cfg_if::cfg_if! {
921            if #[cfg(feature = "conflux")] {
922                let mut should_poll_join_point = !self.should_skip_join_point()?;
923            } else {
924                let mut should_poll_join_point = true;
925            }
926        };
927        let join_point = self.primary_join_point().map(|join_point| join_point.1);
928
929        // Each circuit leg has a PollAll future (see poll_all_circ below)
930        // that drives two futures: one that reads from input channel,
931        // and another drives the application streams.
932        //
933        // *This* PollAll drives the PollAll futures of all circuit legs in lockstep,
934        // ensuring they all get a chance to make some progress on every reactor iteration.
935        //
936        // IMPORTANT: if you want to push additional futures into this,
937        // bear in mind that the ordering matters!
938        // If multiple futures resolve at the same time, their results will be processed
939        // in the order their corresponding futures were inserted into `PollAll`.
940        // So if futures A and B resolve at the same time, and future A was pushed
941        // into `PollAll` before future B, the result of future A will come
942        // before future B's result in the result list returned by poll_all.await.
943        //
944        // This means that the actions corresponding to the first circuit in the tunnel
945        // will be executed first, followed by the actions issued by the next circuit,
946        // and s on.
947        //
948        let mut poll_all =
949            PollAll::<MAX_CONFLUX_LEGS, SmallVec<[CircuitAction; NUM_CIRC_FUTURES]>>::new();
950
951        for leg in &mut self.legs {
952            let unique_id = leg.unique_id();
953            let tunnel_id = self.tunnel_id;
954            let runtime = runtime.clone();
955
956            // Garbage-collect all halfstreams that have expired.
957            //
958            // Note: this will iterate over the closed streams of all hops.
959            // If we think this will cause perf issues, one idea would be to make
960            // StreamMap::closed_streams into a min-heap, and add a branch to the
961            // select_biased! below to sleep until the first expiry is due
962            // (but my gut feeling is that iterating is cheaper)
963            leg.remove_expired_halfstreams(runtime.now());
964
965            // The client SHOULD abandon and close circuit if the LINKED message takes too long to
966            // arrive. This timeout MUST be no larger than the normal SOCKS/stream timeout in use for
967            // RELAY_BEGIN, but MAY be the Circuit Build Timeout value, instead. (The C-Tor
968            // implementation currently uses Circuit Build Timeout).
969            let conflux_hs_timeout = leg.conflux_hs_timeout();
970
971            let mut poll_all_circ = PollAll::<NUM_CIRC_FUTURES, CircuitAction>::new();
972
973            let input = leg.input.next().map(move |res| match res {
974                Some(cell) => CircuitAction::HandleCell {
975                    leg: unique_id,
976                    cell,
977                },
978                None => CircuitAction::RemoveLeg {
979                    leg: unique_id,
980                    reason: RemoveLegReason::ChannelClosed,
981                },
982            });
983            poll_all_circ.push(input);
984
985            // This future resolves when the chan_sender sink (i.e. the outgoing TCP connection)
986            // becomes ready. We need it inside the next_ready_stream future below,
987            // to prevent reading from the application streams before we are ready to send.
988            let chan_ready_fut = futures::future::poll_fn(|cx| {
989                use futures::Sink as _;
990
991                // Ensure the chan sender sink is ready before polling the ready streams.
992                Pin::new(&mut leg.chan_sender).poll_ready(cx)
993            });
994
995            let exclude_hop = if should_poll_join_point {
996                // Avoid polling the join point more than once per reactor loop.
997                should_poll_join_point = false;
998                None
999            } else {
1000                join_point
1001            };
1002
1003            let mut ready_streams = leg.hops.ready_streams_iterator(exclude_hop);
1004            let next_ready_stream = async move {
1005                // Avoid polling the application streams if the outgoing sink is blocked
1006                let _ = chan_ready_fut.await;
1007
1008                match ready_streams.next().await {
1009                    Some(x) => x,
1010                    None => {
1011                        info!(circ_id=%unique_id, "no ready streams (maybe blocked on cc?)");
1012                        // There are no ready streams (for example, they may all be
1013                        // blocked due to congestion control), so there is nothing
1014                        // to do.
1015                        // We await an infinitely pending future so that we don't
1016                        // immediately return a `None` in the `select_biased!` below.
1017                        // We'd rather wait on `input.next()` than immediately return with
1018                        // no `CircuitAction`, which could put the reactor into a spin loop.
1019                        let () = std::future::pending().await;
1020                        unreachable!();
1021                    }
1022                }
1023            };
1024
1025            poll_all_circ.push(next_ready_stream.map(move |cmd| CircuitAction::RunCmd {
1026                leg: unique_id,
1027                cmd,
1028            }));
1029
1030            let mut next_padding_event_fut = leg.padding_event_stream.next();
1031
1032            // This selects between 3 actions that cannot be handled concurrently.
1033            //
1034            // If the conflux handshake times out, we need to remove the circuit leg
1035            // (any pending padding events or application stream data should be discarded;
1036            // in fact, there shouldn't even be any open streams on circuits that are
1037            // in the conflux handshake phase).
1038            //
1039            // If there's a padding event, we need to handle it immediately,
1040            // because it might tell us to start blocking the chan_sender sink,
1041            // which, in turn, means we need to stop trying to read from the application streams.
1042            poll_all.push(
1043                async move {
1044                    let conflux_hs_timeout = if let Some(timeout) = conflux_hs_timeout {
1045                        // TODO: ask Diziet if we can have a sleep_until_instant() function
1046                        Box::pin(runtime.sleep_until_wallclock(timeout))
1047                            as Pin<Box<dyn Future<Output = ()> + Send>>
1048                    } else {
1049                        Box::pin(std::future::pending())
1050                    };
1051                    select_biased! {
1052                        () = conflux_hs_timeout.fuse() => {
1053                            warn!(
1054                                tunnel_id = %tunnel_id,
1055                                circ_id = %unique_id,
1056                                "Conflux handshake timed out on circuit"
1057                            );
1058
1059                            // Conflux handshake has timed out, time to remove this circuit leg,
1060                            // and notify the handshake initiator.
1061                            smallvec![CircuitAction::RemoveLeg {
1062                                leg: unique_id,
1063                                reason: RemoveLegReason::ConfluxHandshakeTimeout,
1064                            }]
1065                        }
1066                        padding_action = next_padding_event_fut => {
1067                            smallvec![CircuitAction::PaddingAction {
1068                                leg: unique_id,
1069                                padding_action:
1070                                    padding_action.expect("PaddingEventStream, surprisingly, was terminated!"),
1071                            }]
1072                        }
1073                        ret = poll_all_circ.fuse() => ret,
1074                    }
1075                }
1076            );
1077        }
1078
1079        // Flatten the nested SmallVecs to simplify the calling code
1080        // (which will handle all the returned actions sequentially).
1081        Ok(poll_all.await.into_iter().flatten().collect())
1082    }
1083
1084    /// The join point on the current primary leg.
1085    pub(super) fn primary_join_point(&self) -> Option<(UniqId, HopNum)> {
1086        self.join_point
1087            .as_ref()
1088            .map(|join_point| (self.primary_id, join_point.hop))
1089    }
1090
1091    /// Does congestion control use stream SENDMEs for the given hop?
1092    ///
1093    /// Returns `None` if either the `leg` or `hop` don't exist.
1094    pub(super) fn uses_stream_sendme(&self, leg: UniqId, hop: HopNum) -> Option<bool> {
1095        self.leg(leg)?.uses_stream_sendme(hop)
1096    }
1097
1098    /// Encode `msg`, encrypt it, and send it to the 'hop'th hop.
1099    ///
1100    /// See [`Circuit::send_relay_cell`].
1101    #[instrument(level = "trace", skip_all)]
1102    pub(super) async fn send_relay_cell_on_leg(
1103        &mut self,
1104        msg: SendRelayCell,
1105        leg: Option<UniqId>,
1106    ) -> crate::Result<()> {
1107        let conflux_join_point = self.join_point.as_ref().map(|join_point| join_point.hop);
1108        let leg = if let Some(join_point) = conflux_join_point {
1109            let hop = msg.hop.expect("missing hop in client SendRelayCell?!");
1110            // Conflux circuits always send multiplexed relay commands to
1111            // to the last hop (the join point).
1112            if cmd_counts_towards_seqno(msg.cell.cmd()) {
1113                if hop != join_point {
1114                    // For leaky pipe, we must continue using the original leg
1115                    leg
1116                } else {
1117                    let old_primary_leg = self.primary_id;
1118                    // Check if it's time to switch our primary leg.
1119                    #[cfg(feature = "conflux")]
1120                    if let Some(switch_cell) = self.maybe_update_primary_leg()? {
1121                        trace!(
1122                            old = ?old_primary_leg,
1123                            new = ?self.primary_id,
1124                            "Switching primary conflux leg..."
1125                        );
1126
1127                        self.primary_leg_mut()?.send_relay_cell(switch_cell).await?;
1128                    }
1129
1130                    // Use the possibly updated primary leg
1131                    Some(self.primary_id)
1132                }
1133            } else {
1134                // Non-multiplexed commands go on their original
1135                // circuit and hop
1136                leg
1137            }
1138        } else {
1139            // If there is no join point, it means this is not
1140            // a multi-path tunnel, so we continue using
1141            // the leg_id/hop the cmd came from.
1142            leg
1143        };
1144
1145        let leg = leg.unwrap_or(self.primary_id);
1146
1147        let circ = self
1148            .leg_mut(leg)
1149            .ok_or_else(|| internal!("leg disappeared?!"))?;
1150
1151        circ.send_relay_cell(msg).await
1152    }
1153
1154    /// Send a LINK cell down each unlinked leg.
1155    #[cfg(feature = "conflux")]
1156    pub(super) async fn link_circuits(
1157        &mut self,
1158        runtime: &tor_rtcompat::DynTimeProvider,
1159    ) -> crate::Result<()> {
1160        let (_leg_id, join_point) = self
1161            .primary_join_point()
1162            .ok_or_else(|| internal!("no join point when trying to send LINK"))?;
1163
1164        // Link all the circuits that haven't started the conflux handshake yet.
1165        for circ in self
1166            .legs
1167            .iter_mut()
1168            // TODO: it is an internal error if any of the legs don't have a conflux handler
1169            // (i.e. if conflux_status() returns None)
1170            .filter(|circ| circ.conflux_status() == Some(ConfluxStatus::Unlinked))
1171        {
1172            let v1_payload = V1LinkPayload::new(self.nonce, self.desired_ux);
1173            let link = ConfluxLink::new(v1_payload);
1174            let cell = AnyRelayMsgOuter::new(None, link.into());
1175
1176            circ.begin_conflux_link(join_point, cell, runtime).await?;
1177        }
1178
1179        // TODO(conflux): the caller should take care to not allow opening streams
1180        // until the conflux set is ready (i.e. until at least one of the legs completes
1181        // the handshake).
1182        //
1183        // We will probably need a channel for notifying the caller
1184        // of handshake completion/conflux set readiness
1185
1186        Ok(())
1187    }
1188
1189    /// Get the number of unlinked or non-conflux legs.
1190    #[cfg(feature = "conflux")]
1191    pub(super) fn num_unlinked(&self) -> usize {
1192        self.circuits()
1193            .filter(|circ| {
1194                let status = circ.conflux_status();
1195                status.is_none() || status == Some(ConfluxStatus::Unlinked)
1196            })
1197            .count()
1198    }
1199
1200    /// Check if the specified sequence number is the sequence number of the
1201    /// next message we're expecting to handle.
1202    pub(super) fn is_seqno_in_order(&self, seq_recv: u64) -> bool {
1203        let last_seq_delivered = self.last_seq_delivered.load(atomic::Ordering::Acquire);
1204        seq_recv == last_seq_delivered + 1
1205    }
1206
1207    /// Remove the circuit leg with the specified `UniqId` from this conflux set.
1208    ///
1209    /// Unlike [`ConfluxSet::remove`], this function does not check
1210    /// if the removal of the leg ought to trigger a reactor shutdown.
1211    ///
1212    /// Returns an error if the leg doesn't exit in the conflux set.
1213    fn remove_unchecked(&mut self, circ_id: UniqId) -> Result<Circuit, Bug> {
1214        let idx = self
1215            .legs
1216            .iter()
1217            .position(|circ| circ.unique_id() == circ_id)
1218            .ok_or_else(|| internal!("leg {circ_id:?} not found in conflux set"))?;
1219
1220        Ok(self.legs.remove(idx))
1221    }
1222
1223    /// Perform some circuit-padding-based action on the specified circuit.
1224    #[cfg(feature = "circ-padding")]
1225    pub(super) async fn run_padding_action(
1226        &mut self,
1227        circ_id: UniqId,
1228        padding_action: PaddingEvent,
1229    ) -> crate::Result<()> {
1230        use PaddingEvent as E;
1231        let Some(circ) = self.leg_mut(circ_id) else {
1232            // No such circuit; it must have gone away after generating this event.
1233            // Just ignore it.
1234            return Ok(());
1235        };
1236
1237        match padding_action {
1238            E::SendPadding(send_padding) => {
1239                circ.send_padding(send_padding).await?;
1240            }
1241            E::StartBlocking(start_blocking) => {
1242                circ.start_blocking_for_padding(start_blocking);
1243            }
1244            E::StopBlocking => {
1245                circ.stop_blocking_for_padding();
1246            }
1247        }
1248        Ok(())
1249    }
1250}
1251
1252/// An error returned when a method is expecting a single-leg conflux circuit,
1253/// but it is not single-leg.
1254#[derive(Clone, Debug, derive_more::Display, thiserror::Error)]
1255pub(super) struct NotSingleLegError(#[source] Bug);
1256
1257impl From<NotSingleLegError> for Bug {
1258    fn from(e: NotSingleLegError) -> Self {
1259        e.0
1260    }
1261}
1262
1263impl From<NotSingleLegError> for crate::Error {
1264    fn from(e: NotSingleLegError) -> Self {
1265        Self::from(e.0)
1266    }
1267}
1268
1269impl From<NotSingleLegError> for ReactorError {
1270    fn from(e: NotSingleLegError) -> Self {
1271        Self::from(e.0)
1272    }
1273}
1274
1275impl<I: Iterator> From<ExactlyOneError<I>> for NotSingleLegError {
1276    fn from(e: ExactlyOneError<I>) -> Self {
1277        // TODO: cannot wrap the ExactlyOneError with into_bad_api_usage
1278        // because it's not Send + Sync
1279        Self(bad_api_usage!("not a single leg conflux set ({e})"))
1280    }
1281}
1282
1283#[cfg(test)]
1284mod test {
1285    // Tested in [`crate::client::circuit::test`].
1286}