tor_proto/tunnel/reactor/conflux.rs
1//! Conflux-related functionality
2
3#[cfg(feature = "conflux")]
4mod msghandler;
5
6use std::future::Future;
7use std::pin::Pin;
8use std::sync::atomic::{self, AtomicU64};
9use std::sync::{Arc, Mutex};
10
11use futures::StreamExt;
12use futures::{select_biased, stream::FuturesUnordered, FutureExt as _};
13use smallvec::{smallvec, SmallVec};
14use tor_rtcompat::SleepProviderExt as _;
15use tracing::{info, trace, warn};
16
17use tor_async_utils::SinkPrepareExt as _;
18use tor_basic_utils::flatten;
19use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCmd};
20use tor_error::{bad_api_usage, internal, into_bad_api_usage, Bug};
21use tor_linkspec::HasRelayIds as _;
22
23use crate::circuit::path::HopDetail;
24use crate::circuit::{TunnelMutableState, UniqId};
25use crate::crypto::cell::HopNum;
26use crate::tunnel::reactor::circuit::ConfluxStatus;
27use crate::tunnel::{streammap, TunnelId};
28use crate::util::err::ReactorError;
29
30use super::circuit::CircHop;
31use super::{Circuit, CircuitAction, RemoveLegReason, SendRelayCell};
32
33#[cfg(feature = "conflux")]
34use {
35 tor_cell::relaycell::conflux::{V1DesiredUx, V1LinkPayload, V1Nonce},
36 tor_cell::relaycell::msg::{ConfluxLink, ConfluxSwitch},
37};
38
39#[cfg(feature = "conflux")]
40pub(crate) use msghandler::{ConfluxAction, ConfluxMsgHandler, OooRelayMsg};
41
/// Inline capacity of the [`SmallVec`] that stores the legs of a conflux set.
///
/// A set holding more legs than this spills its legs over to the heap.
///
/// Note: this value was picked arbitrarily and may not be suitable.
const MAX_CONFLUX_LEGS: usize = 16;
48
49/// A set with one or more circuits.
50///
51/// ### Conflux set life cycle
52///
53/// Conflux sets are created by the reactor using [`ConfluxSet::new`].
54///
55/// Every `ConfluxSet` starts out as a single-path set consisting of a single 0-length circuit.
56///
57/// After constructing a `ConfluxSet`, the reactor will proceed to extend its (only) circuit.
58/// At this point, the `ConfluxSet` will be a single-path set with a single n-length circuit.
59///
60/// The reactor can then turn the `ConfluxSet` into a multi-path set
61/// (a multi-path set is a conflux set that contains more than 1 circuit).
62/// This is done using [`ConfluxSet::add_legs`], in response to a `CtrlMsg` sent
63/// by the reactor user (also referred to as the "conflux handshake initiator").
64/// After that, the conflux set is said to be a multi-path set with multiple N-length circuits.
65///
66/// Circuits can be removed from the set using [`ConfluxSet::remove`].
67///
68/// The lifetime of a `ConfluxSet` is tied to the lifetime of the reactor.
69/// When the reactor is dropped, its underlying `ConfluxSet` is dropped too.
70/// This can happen on an explicit shutdown request, or if a fatal error occurs.
71///
72/// Conversely, the `ConfluxSet` can also trigger a reactor shutdown.
73/// For example, if after being instructed to remove a circuit from the set
74/// using [`ConfluxSet::remove`], the set is completely depleted,
75/// the `ConfluxSet` will return a [`ReactorError::Shutdown`] error,
76/// which will cause the reactor to shut down.
77pub(super) struct ConfluxSet {
78 /// The unique identifier of the tunnel this conflux set belongs to.
79 ///
80 /// Used for setting the internal [`TunnelId`] of [`Circuit`]s
81 /// that gets used for logging purposes.
82 tunnel_id: TunnelId,
83 /// The circuits in this conflux set.
84 legs: SmallVec<[Circuit; MAX_CONFLUX_LEGS]>,
85 /// Tunnel state, shared with `ClientCirc`.
86 ///
87 /// Contains the [`MutableState`](super::MutableState) of each circuit in the set.
88 mutable: Arc<TunnelMutableState>,
89 /// The unique identifier of the primary leg
90 primary_id: UniqId,
91 /// The join point of the set, if this is a multi-path set.
92 ///
93 /// Initially the conflux set starts out as a single-path set with no join point.
94 /// When it is converted to a multipath set using [`add_legs`](Self::add_legs),
95 /// the join point is initialized to the last hop in the tunnel.
96 //
97 // TODO(#2017): for simplicity, we currently we force all legs to have the same length,
98 // to ensure the HopNum of the join point is the same for all of them.
99 //
100 // In the future we might want to relax this restriction.
101 join_point: Option<JoinPoint>,
102 /// The nonce associated with the circuits from this set.
103 #[cfg(feature = "conflux")]
104 nonce: V1Nonce,
105 /// The desired UX
106 #[cfg(feature = "conflux")]
107 desired_ux: V1DesiredUx,
108 /// The absolute sequence number of the last cell delivered to a stream.
109 ///
110 /// A clone of this is shared with each [`ConfluxMsgHandler`] created.
111 ///
112 /// When a message is received on a circuit leg, the `ConfluxMsgHandler`
113 /// of the leg compares the (leg-local) sequence number of the message
114 /// with this sequence number to determine whether the message is in-order.
115 ///
116 /// If the message is in-order, the `ConfluxMsgHandler` instructs the circuit
117 /// to deliver it to its corresponding stream.
118 ///
119 /// If the message is out-of-order, the `ConfluxMsgHandler` instructs the circuit
120 /// to instruct the reactor to buffer the message.
121 last_seq_delivered: Arc<AtomicU64>,
122 /// Whether we have selected our initial primary leg,
123 /// if this is a multipath conflux set.
124 selected_init_primary: bool,
125}
126
127/// The conflux join point.
128#[derive(Clone, derive_more::Debug)]
129struct JoinPoint {
130 /// The hop number.
131 hop: HopNum,
132 /// The [`HopDetail`] of the hop.
133 detail: HopDetail,
134 /// The stream map of the joint point, shared with each circuit leg.
135 #[debug(skip)]
136 streams: Arc<Mutex<streammap::StreamMap>>,
137}
138
139impl ConfluxSet {
140 /// Create a new conflux set, consisting of a single leg.
141 ///
142 /// Returns the newly created set and a reference to its [`TunnelMutableState`].
143 pub(super) fn new(
144 tunnel_id: TunnelId,
145 circuit_leg: Circuit,
146 ) -> (Self, Arc<TunnelMutableState>) {
147 let primary_id = circuit_leg.unique_id();
148 let circ_mutable = Arc::clone(circuit_leg.mutable());
149 let legs = smallvec![circuit_leg];
150 // Note: the join point is only set for multi-path tunnels
151 let join_point = None;
152
153 // TODO(#2035): read this from the consensus/config.
154 #[cfg(feature = "conflux")]
155 let desired_ux = V1DesiredUx::NO_OPINION;
156
157 let mutable = Arc::new(TunnelMutableState::default());
158 mutable.insert(primary_id, circ_mutable);
159
160 let set = Self {
161 tunnel_id,
162 legs,
163 primary_id,
164 join_point,
165 mutable: mutable.clone(),
166 #[cfg(feature = "conflux")]
167 nonce: V1Nonce::new(&mut rand::rng()),
168 #[cfg(feature = "conflux")]
169 desired_ux,
170 last_seq_delivered: Arc::new(AtomicU64::new(0)),
171 selected_init_primary: false,
172 };
173
174 (set, mutable)
175 }
176
177 /// Remove and return the only leg of this conflux set.
178 ///
179 /// Returns an error if there is more than one leg in the set,
180 /// or if called before any circuit legs are available.
181 ///
182 /// Calling this function will empty the [`ConfluxSet`].
183 pub(super) fn take_single_leg(&mut self) -> Result<Circuit, NotSingleLegError> {
184 let primary_index =
185 element_idx(self.legs.iter(), self.primary_id).ok_or(NotSingleError::None)?;
186 Ok(self.legs.remove(primary_index))
187 }
188
189 /// Return a reference to the only leg of this conflux set,
190 /// along with the leg's ID.
191 ///
192 /// Returns an error if there is more than one leg in the set,
193 /// or if called before any circuit legs are available.
194 pub(super) fn single_leg(&self) -> Result<&Circuit, NotSingleLegError> {
195 Ok(get_single(self.legs.iter())?)
196 }
197
198 /// Return a mutable reference to the only leg of this conflux set,
199 /// along with the leg's ID.
200 ///
201 /// Returns an error if there is more than one leg in the set,
202 /// or if called before any circuit legs are available.
203 pub(super) fn single_leg_mut(&mut self) -> Result<&mut Circuit, NotSingleLegError> {
204 Ok(get_single(self.legs.iter_mut())?)
205 }
206
207 /// Return the primary leg of this conflux set.
208 ///
209 /// Returns an error if called before any circuit legs are available.
210 pub(super) fn primary_leg_mut(&mut self) -> Result<&mut Circuit, Bug> {
211 #[cfg(not(feature = "conflux"))]
212 if self.legs.len() > 1 {
213 return Err(internal!(
214 "got multipath tunnel, but conflux feature is disabled?!"
215 ));
216 }
217
218 if self.legs.is_empty() {
219 Err(bad_api_usage!(
220 "tried to get circuit leg before creating it?!"
221 ))
222 } else {
223 let circ = self
224 .leg_mut(self.primary_id)
225 .ok_or_else(|| internal!("conflux set is empty?!"))?;
226
227 Ok(circ)
228 }
229 }
230
231 /// Return a reference to the leg of this conflux set with the given id.
232 pub(super) fn leg(&self, leg_id: UniqId) -> Option<&Circuit> {
233 self.legs.iter().find(|circ| circ.unique_id() == leg_id)
234 }
235
236 /// Return a mutable reference to the leg of this conflux set with the given id.
237 pub(super) fn leg_mut(&mut self, leg_id: UniqId) -> Option<&mut Circuit> {
238 self.legs.iter_mut().find(|circ| circ.unique_id() == leg_id)
239 }
240
241 /// Return the number of legs in this conflux set.
242 pub(super) fn len(&self) -> usize {
243 self.legs.len()
244 }
245
246 /// Return whether this conflux set is empty.
247 pub(super) fn is_empty(&self) -> bool {
248 self.legs.len() == 0
249 }
250
251 /// Remove the specified leg from this conflux set.
252 ///
253 /// Returns an error if the given leg doesn't exist in the set.
254 ///
255 /// Returns an error instructing the reactor to perform a clean shutdown
256 /// ([`ReactorError::Shutdown`]), tearing down the entire [`ConfluxSet`], if
257 ///
258 /// * the set is depleted (empty) after removing the specified leg
259 /// * `leg` is currently the sending (primary) leg of this set
260 /// * the closed leg had the highest non-zero last_seq_recv/sent
261 /// * the closed leg had some in-progress data (inflight > cc_sendme_inc)
262 ///
263 /// We do not yet support resumption. See [2.4.3. Closing circuits] in prop329.
264 ///
265 /// [2.4.3. Closing circuits]: https://spec.torproject.org/proposals/329-traffic-splitting.html#243-closing-circuits
266 pub(super) fn remove(&mut self, leg: UniqId) -> Result<Circuit, ReactorError> {
267 let idx = element_idx(self.legs.iter(), leg)
268 .ok_or_else(|| bad_api_usage!("leg {leg:?} not found in conflux set"))?;
269 let circ: Circuit = self.legs.remove(idx);
270
271 tracing::trace!(
272 circ_id = %circ.unique_id(),
273 "Circuit removed from conflux set"
274 );
275
276 self.mutable.remove(circ.unique_id());
277
278 if self.legs.is_empty() {
279 // TODO: log the tunnel ID
280 tracing::debug!("Conflux set is now empty, tunnel reactor shutting down");
281
282 // The last circuit in the set has just died, so the reactor should exit.
283 return Err(ReactorError::Shutdown);
284 }
285
286 if leg == self.primary_id {
287 // We have just removed our sending leg,
288 // so it's time to close the entire conflux set.
289 return Err(ReactorError::Shutdown);
290 }
291
292 cfg_if::cfg_if! {
293 if #[cfg(feature = "conflux")] {
294 self.remove_conflux(circ)
295 } else {
296 // Conflux is disabled, so we can't possibly continue running if the only
297 // leg in the tunnel is gone.
298 //
299 // Technically this should be unreachable (because of the is_empty()
300 // check above)
301 Err(internal!("Multiple legs in single-path tunnel?!").into())
302 }
303 }
304 }
305
    /// Handle the removal of a circuit from a multi-path set,
    /// returning an error if the reactor needs to shut down.
    ///
    /// This is called from [`remove`](Self::remove) after `circ` has been taken out of
    /// `self.legs` and the set was found to be non-empty. As a result, the
    /// `max_last_seq_recv`/`max_last_seq_sent` comparisons below are made
    /// against the *remaining* legs only.
    ///
    /// Returns `Ok(circ)` if the remaining legs can keep running,
    /// `Err(ReactorError::Shutdown)` if the entire set must be closed,
    /// or an internal error if `circ` is not a conflux circuit, lacks the join point hop,
    /// or its congestion control algorithm doesn't track inflight cells or cwnd.
    #[cfg(feature = "conflux")]
    fn remove_conflux(&self, circ: Circuit) -> Result<Circuit, ReactorError> {
        let Some(status) = circ.conflux_status() else {
            return Err(internal!("Found non-conflux circuit in conflux set?!").into());
        };

        // TODO(conflux): should the circmgr be notified about the leg removal?
        //
        // "For circuits that are unlinked, the origin SHOULD immediately relaunch a new leg when it
        // is closed, subject to the limits in [SIDE_CHANNELS]."

        // If we've reached this point and the conflux set is non-empty,
        // it means it's a multi-path set.
        //
        // Time to check if we need to tear down the entire set.
        match status {
            ConfluxStatus::Unlinked => {
                // This circuit hasn't yet begun the conflux handshake,
                // so we can safely remove it from the set
                Ok(circ)
            }
            ConfluxStatus::Pending | ConfluxStatus::Linked => {
                // Read both of the leg's sequence numbers, propagating
                // the first error (if any) as a ReactorError.
                let (circ_last_seq_recv, circ_last_seq_sent) =
                    (|| Ok::<_, ReactorError>((circ.last_seq_recv()?, circ.last_seq_sent()?)))()?;

                // If the closed leg had the highest non-zero last_seq_recv/sent, close the set.
                //
                // The comparison is strict, so a leg that is merely tied with the
                // highest remaining value (including the all-zero case) does not
                // trigger a teardown. If none of the remaining legs report a
                // sequence number (the max is `None`), the check is skipped.
                if let Some(max_last_seq_recv) = self.max_last_seq_recv() {
                    if circ_last_seq_recv > max_last_seq_recv {
                        return Err(ReactorError::Shutdown);
                    }
                }

                if let Some(max_last_seq_sent) = self.max_last_seq_sent() {
                    if circ_last_seq_sent > max_last_seq_sent {
                        return Err(ReactorError::Shutdown);
                    }
                }

                let hop = self.join_point_hop(&circ)?;

                // Both values are required by the in-progress data check below;
                // a cc algorithm that doesn't track them is an internal error.
                let (inflight, cwnd) = (|| {
                    let ccontrol = hop.ccontrol();
                    let inflight = ccontrol.inflight()?;
                    let cwnd = ccontrol.cwnd()?;

                    Some((inflight, cwnd))
                })()
                .ok_or_else(|| {
                    internal!("Congestion control algorithm doesn't track inflight cells or cwnd?!")
                })?;

                // If data is in progress on the leg (inflight >= the join point's
                // sendme_inc), then all legs must be closed.
                //
                // NOTE(review): prop329 words this as "inflight > cc_sendme_inc",
                // while this check uses `>=` — confirm which is intended.
                if inflight >= cwnd.params().sendme_inc() {
                    return Err(ReactorError::Shutdown);
                }

                Ok(circ)
            }
        }
    }
369
370 /// Return the maximum relative last_seq_recv across all circuits.
371 #[cfg(feature = "conflux")]
372 fn max_last_seq_recv(&self) -> Option<u64> {
373 self.legs
374 .iter()
375 .filter_map(|leg| leg.last_seq_recv().ok())
376 .max()
377 }
378
379 /// Return the maximum relative last_seq_sent across all circuits.
380 #[cfg(feature = "conflux")]
381 fn max_last_seq_sent(&self) -> Option<u64> {
382 self.legs
383 .iter()
384 .filter_map(|leg| leg.last_seq_sent().ok())
385 .max()
386 }
387
388 /// Get the [`CircHop`] of the join point on the specified `circ`,
389 /// returning an error if this is a single path conflux set.
390 fn join_point_hop<'c>(&self, circ: &'c Circuit) -> Result<&'c CircHop, Bug> {
391 let Some(join_point) = self.join_point.as_ref().map(|p| p.hop) else {
392 return Err(internal!("No join point on conflux tunnel?!"));
393 };
394
395 circ.hop(join_point)
396 .ok_or_else(|| internal!("Conflux join point disappeared?!"))
397 }
398
399 /// Return an iterator of all circuits in the conflux set.
400 fn circuits(&self) -> impl Iterator<Item = &Circuit> {
401 self.legs.iter()
402 }
403
404 /// Add legs to the this conflux set.
405 ///
406 /// Returns an error if any of the legs is invalid.
407 ///
408 /// A leg is considered valid if
409 ///
410 /// * the circuit has the same length as all the other circuits in the set
411 /// * its last hop is equal to the designated join point
412 /// * the circuit has no streams attached to any of its hops
413 /// * the circuit is not already part of a conflux set
414 ///
415 /// Note: the circuits will not begin linking until
416 /// [`link_circuits`](Self::link_circuits) is called.
417 ///
418 /// IMPORTANT: this function does not prevent the construction of conflux sets
419 /// where the circuit legs share guard or middle relays. It is the responsibility
420 /// of the caller to enforce the following invariant from prop354:
421 ///
422 /// "If building a conflux leg: Reject any circuits that have the same Guard as the other conflux
423 /// "leg(s) in the current conflux set, EXCEPT when one of the primary Guards is also the chosen
424 /// "Exit of this conflux set (in which case, re-use the non-Exit Guard)."
425 ///
426 /// This is because at this level we don't actually know which relays are the guards,
427 /// so we can't know if the join point happens to be one of the Guard + Exit relays.
428 #[cfg(feature = "conflux")]
429 pub(super) fn add_legs(
430 &mut self,
431 legs: Vec<Circuit>,
432 runtime: &tor_rtcompat::DynTimeProvider,
433 ) -> Result<(), Bug> {
434 if legs.is_empty() {
435 return Err(bad_api_usage!("asked to add empty leg list to conflux set"));
436 }
437
438 let join_point = match self.join_point.take() {
439 Some(p) => {
440 // Preserve the existing join point, if there is one.
441 p
442 }
443 None => {
444 let (hop, detail, streams) = (|| {
445 let first_leg = self.circuits().next()?;
446 let first_leg_path = first_leg.path();
447 let all_hops = first_leg_path.all_hops();
448 let hop_num = first_leg.last_hop_num()?;
449 let detail = all_hops.last()?;
450 let hop = first_leg.hop(hop_num)?;
451 let streams = Arc::clone(hop.stream_map());
452 Some((hop_num, detail.clone(), streams))
453 })()
454 .ok_or_else(|| bad_api_usage!("asked to join circuit with no hops"))?;
455
456 JoinPoint {
457 hop,
458 detail,
459 streams,
460 }
461 }
462 };
463
464 // Check two HopDetails for equality.
465 //
466 // Returns an error if one of the hops is virtual.
467 let hops_eq = |h1: &HopDetail, h2: &HopDetail| {
468 match (h1, h2) {
469 (HopDetail::Relay(t1), HopDetail::Relay(ref t2)) => Ok(t1.same_relay_ids(t2)),
470 #[cfg(feature = "hs-common")]
471 (HopDetail::Virtual, HopDetail::Virtual) => {
472 // TODO(#2016): support onion service conflux
473 Err(internal!("onion service conflux not supported"))
474 }
475 _ => Ok(false),
476 }
477 };
478
479 // A leg is considered valid if
480 //
481 // * the circuit has the expected length
482 // (the length of the first circuit we added to the set)
483 // * its last hop is equal to the designated join point
484 // (the last hop of the first circuit we added)
485 // * the circuit has no streams attached to any of its hops
486 // * the circuit is not already part of a conflux tunnel
487 //
488 // Returns an error if any hops are virtual.
489 let leg_is_valid = |leg: &Circuit| -> Result<bool, Bug> {
490 use crate::ccparams::Algorithm;
491
492 let path = leg.path();
493 let Some(last_hop) = path.all_hops().last() else {
494 // A circuit with no hops is invalid
495 return Ok(false);
496 };
497
498 // TODO: this sort of duplicates the check above.
499 // The difference is that above we read the hop detail
500 // information from the circuit Path, whereas here we get
501 // the actual last CircHop of the circuit.
502 let Some(last_hop_num) = leg.last_hop_num() else {
503 // A circuit with no hops is invalid
504 return Ok(false);
505 };
506
507 let circhop = leg
508 .hop(last_hop_num)
509 .ok_or_else(|| internal!("hop disappeared?!"))?;
510
511 // Ensure we negotiated a suitable cc algorithm
512 let is_cc_suitable = match circhop.ccontrol().algorithm() {
513 Algorithm::FixedWindow(_) => false,
514 Algorithm::Vegas(_) => true,
515 };
516
517 if !is_cc_suitable {
518 return Ok(false);
519 }
520
521 Ok(last_hop_num == join_point.hop
522 && hops_eq(last_hop, &join_point.detail)?
523 && !leg.has_streams()
524 && leg.conflux_status().is_none())
525 };
526
527 for leg in &legs {
528 if !leg_is_valid(leg)? {
529 return Err(bad_api_usage!("one more more conflux circuits are invalid"));
530 }
531 }
532
533 // Select a join point, or put the existing one back into self.
534 self.join_point = Some(join_point.clone());
535
536 // The legs are valid, so add them to the set.
537 for circ in legs {
538 let mutable = Arc::clone(circ.mutable());
539 let unique_id = circ.unique_id();
540 self.legs.push(circ);
541 // Merge the mutable state of the circuit into our tunnel state.
542 self.mutable.insert(unique_id, mutable);
543 }
544
545 for circ in self.legs.iter_mut() {
546 // The circuits that have a None status don't know they're part of
547 // a multi-path tunnel yet. They need to be initialized with a
548 // conflux message handler, and have their join point fixed up
549 // to share a stream map with the join point on all the other circuits.
550 if circ.conflux_status().is_none() {
551 let conflux_handler = ConfluxMsgHandler::new_client(
552 join_point.hop,
553 self.nonce,
554 Arc::clone(&self.last_seq_delivered),
555 runtime.clone(),
556 );
557
558 circ.add_to_conflux_tunnel(self.tunnel_id, conflux_handler);
559
560 // Ensure the stream map of the last hop is shared by all the legs
561 let last_hop = circ
562 .hop_mut(join_point.hop)
563 .ok_or_else(|| bad_api_usage!("asked to join circuit with no hops"))?;
564 last_hop.set_stream_map(Arc::clone(&join_point.streams))?;
565 }
566 }
567
568 Ok(())
569 }
570
571 /// Try to update the primary leg based on the configured desired UX,
572 /// if needed.
573 ///
574 /// Returns the SWITCH cell to send on the primary leg,
575 /// if we switched primary leg.
576 #[cfg(feature = "conflux")]
577 pub(super) fn maybe_update_primary_leg(&mut self) -> crate::Result<Option<SendRelayCell>> {
578 use tor_error::into_internal;
579
580 let Some(join_point) = self.join_point.as_ref() else {
581 // Return early if this is not a multi-path tunnel
582 return Ok(None);
583 };
584
585 let join_point = join_point.hop;
586
587 if !self.should_update_primary_leg() {
588 // Nothing to do
589 return Ok(None);
590 }
591
592 let Some(new_primary_id) = self.select_primary_leg()? else {
593 // None of the legs satisfy our UX requirements, continue using the existing one.
594 return Ok(None);
595 };
596
597 // Check that the newly selected leg is actually different from the previous
598 if self.primary_id == new_primary_id {
599 // The primary leg stays the same, nothing to do.
600 return Ok(None);
601 }
602
603 let prev_last_seq_sent = self.primary_leg_mut()?.last_seq_sent()?;
604 self.primary_id = new_primary_id;
605 let new_last_seq_sent = self.primary_leg_mut()?.last_seq_sent()?;
606
607 // If this fails, it means we haven't updated our primary leg in a very long time.
608 //
609 // TODO(#2036): there are currently no safeguards to prevent us from staying
610 // on the same leg for "too long". Perhaps we should design should_update_primary_leg()
611 // such that it forces us to switch legs periodically, to prevent the seqno delta from
612 // getting too big?
613 let seqno_delta = u32::try_from(prev_last_seq_sent - new_last_seq_sent).map_err(
614 into_internal!("Seqno delta for switch does not fit in u32?!"),
615 )?;
616
617 // We need to carry the last_seq_sent over to the next leg
618 // (the next cell sent will have seqno = prev_last_seq_sent + 1)
619 self.primary_leg_mut()?
620 .set_last_seq_sent(prev_last_seq_sent)?;
621
622 let switch = ConfluxSwitch::new(seqno_delta);
623 let cell = AnyRelayMsgOuter::new(None, switch.into());
624 Ok(Some(SendRelayCell {
625 hop: join_point,
626 early: false,
627 cell,
628 }))
629 }
630
631 /// Whether it's time to select a new primary leg.
632 #[cfg(feature = "conflux")]
633 fn should_update_primary_leg(&mut self) -> bool {
634 if !self.selected_init_primary {
635 self.maybe_select_init_primary();
636 return false;
637 }
638
639 // If we don't have at least 2 legs,
640 // we can't switch our primary leg.
641 if self.legs.len() < 2 {
642 return false;
643 }
644
645 // TODO(conflux-tuning): if it turns out we switch legs too frequently,
646 // we might want to implement some sort of rate-limiting here
647 // (see c-tor's conflux_can_switch).
648
649 true
650 }
651
652 /// Return the best leg according to the configured desired UX.
653 ///
654 /// Returns `None` if no suitable leg was found.
655 #[cfg(feature = "conflux")]
656 fn select_primary_leg(&self) -> Result<Option<UniqId>, Bug> {
657 match self.desired_ux {
658 V1DesiredUx::NO_OPINION | V1DesiredUx::MIN_LATENCY => {
659 self.select_primary_leg_min_rtt(false)
660 }
661 V1DesiredUx::HIGH_THROUGHPUT => self.select_primary_leg_min_rtt(true),
662 V1DesiredUx::LOW_MEM_LATENCY | V1DesiredUx::LOW_MEM_THROUGHPUT => {
663 // TODO(conflux-tuning): add support for low-memory algorithms
664 self.select_primary_leg_min_rtt(false)
665 }
666 _ => {
667 // Default to MIN_RTT if we don't recognize the desired UX value
668 warn!(
669 tunnel_id = %self.tunnel_id,
670 "Ignoring unrecognized conflux desired UX {}, using MIN_LATENCY",
671 self.desired_ux
672 );
673 self.select_primary_leg_min_rtt(false)
674 }
675 }
676 }
677
678 /// Try to choose an initial primary leg, if we have an initial RTT measurement
679 /// for at least one of the legs.
680 #[cfg(feature = "conflux")]
681 fn maybe_select_init_primary(&mut self) {
682 let best = self
683 .legs
684 .iter()
685 .filter_map(|leg| leg.init_rtt().map(|rtt| (leg, rtt)))
686 .min_by_key(|(_leg, rtt)| *rtt)
687 .map(|(leg, _rtt)| leg.unique_id());
688
689 if let Some(best) = best {
690 self.primary_id = best;
691 self.selected_init_primary = true;
692 }
693 }
694
695 /// Return the leg with the best (lowest) RTT.
696 ///
697 /// If `check_can_send` is true, selects the lowest RTT leg that is ready to send.
698 ///
699 /// Returns `None` if no suitable leg was found.
700 #[cfg(feature = "conflux")]
701 fn select_primary_leg_min_rtt(&self, check_can_send: bool) -> Result<Option<UniqId>, Bug> {
702 let mut best = None;
703
704 for circ in self.legs.iter() {
705 let leg_id = circ.unique_id();
706 let join_point = self.join_point_hop(circ)?;
707 let ccontrol = join_point.ccontrol();
708
709 if check_can_send && !ccontrol.can_send() {
710 continue;
711 }
712
713 let rtt = ccontrol.rtt();
714 let init_rtt_usec = || {
715 circ.init_rtt()
716 .map(|rtt| u32::try_from(rtt.as_micros()).unwrap_or(u32::MAX))
717 };
718
719 let Some(ewma_rtt) = rtt.ewma_rtt_usec().or_else(init_rtt_usec) else {
720 return Err(internal!(
721 "attempted to select primary leg before handshake completed?!"
722 ));
723 };
724
725 match best.take() {
726 None => {
727 best = Some((leg_id, ewma_rtt));
728 }
729 Some(best_so_far) => {
730 if best_so_far.1 <= ewma_rtt {
731 best = Some(best_so_far);
732 } else {
733 best = Some((leg_id, ewma_rtt));
734 }
735 }
736 }
737 }
738
739 Ok(best.map(|(leg_id, _)| leg_id))
740 }
741
742 /// Returns `true` if our conflux join point is blocked on congestion control
743 /// on the specified `circuit`.
744 ///
745 /// Returns `false` if the join point is not blocked on cc,
746 /// or if this is a single-path set.
747 ///
748 /// Returns an error if this is a multipath tunnel,
749 /// but the joint point hop doesn't exist on the specified circuit.
750 #[cfg(feature = "conflux")]
751 fn is_join_point_blocked_on_cc(join_hop: HopNum, circuit: &Circuit) -> Result<bool, Bug> {
752 let join_circhop = circuit.hop(join_hop).ok_or_else(|| {
753 internal!(
754 "Join point hop {} not found on circuit {}?!",
755 join_hop.display(),
756 circuit.unique_id(),
757 )
758 })?;
759
760 Ok(!join_circhop.ccontrol().can_send())
761 }
762
763 /// Returns the `HopNum` of the join point
764 /// if [`next_circ_action`](Self::next_circ_action)
765 /// should avoid polling the join point streams entirely.
766 #[cfg(feature = "conflux")]
767 fn should_skip_join_point(&self) -> Result<Option<HopNum>, Bug> {
768 let Some(primary_join_point) = self.primary_join_point() else {
769 // Single-path, there is no join point
770 return Ok(None);
771 };
772
773 let join_hop = primary_join_point.1;
774 let primary_blocked_on_cc = {
775 let primary = self
776 .leg(self.primary_id)
777 .ok_or_else(|| internal!("primary leg disappeared?!"))?;
778 Self::is_join_point_blocked_on_cc(join_hop, primary)?
779 };
780
781 if !primary_blocked_on_cc {
782 // Easy, we can just carry on
783 return Ok(None);
784 }
785
786 // Now, if the primary *is* blocked on cc, we may still be able to poll
787 // the join point streams (if we're using the right desired UX)
788 let exclude = if self.desired_ux != V1DesiredUx::HIGH_THROUGHPUT {
789 // The primary leg is blocked on cc, and we can't switch because we're
790 // not using the high throughput algorithm, so we must stop reading
791 // the join point streams.
792 //
793 // Note: if the selected algorithm is HIGH_THROUGHPUT,
794 // it's okay to continue reading from the edge connection,
795 // because maybe_update_primary_leg() will select a new,
796 // non-blocked primary leg, just before sending.
797 trace!(
798 tunnel_id = %self.tunnel_id,
799 join_point = ?primary_join_point,
800 reason = "sending leg blocked on congestion control",
801 "Pausing join point stream reads"
802 );
803
804 Some(join_hop)
805 } else {
806 // Ah-ha, the desired UX is HIGH_THROUGHPUT, which means we can switch
807 // to an unblocked leg before sending any cells over the join point,
808 // as long as there are some unblocked legs.
809
810 // TODO: figure out how to rewrite this with an idiomatic iterator combinator
811 let mut all_blocked_on_cc = true;
812 for leg in &self.legs {
813 all_blocked_on_cc = Self::is_join_point_blocked_on_cc(join_hop, leg)?;
814 if !all_blocked_on_cc {
815 break;
816 }
817 }
818
819 if all_blocked_on_cc {
820 // All legs are blocked on cc, so we must stop reading from
821 // the join point streams for now.
822 trace!(
823 tunnel_id = %self.tunnel_id,
824 join_point = ?primary_join_point,
825 reason = "all legs blocked on congestion control",
826 "Pausing join point stream reads"
827 );
828
829 Some(join_hop)
830 } else {
831 // At least one leg is not blocked, so we can continue reading
832 // from the join point streams
833 None
834 }
835 };
836
837 Ok(exclude)
838 }
839
840 /// Returns the next ready [`CircuitAction`],
841 /// obtained from processing the incoming/outgoing messages on all the circuits in this set.
842 ///
843 /// Will return an error if there are no circuits in this set,
844 /// or other internal errors occur.
845 ///
846 /// This is cancellation-safe.
847 #[allow(clippy::unnecessary_wraps)] // Can return Err if conflux is enabled
848 pub(super) fn next_circ_action<'a>(
849 &'a mut self,
850 runtime: &'a tor_rtcompat::DynTimeProvider,
851 ) -> Result<impl Future<Output = Result<CircuitAction, crate::Error>> + 'a, Bug> {
852 // Avoid polling the streams on the join point if our primary
853 // leg is blocked on cc
854 cfg_if::cfg_if! {
855 if #[cfg(feature = "conflux")] {
856 let exclude_hop = self.should_skip_join_point()?;
857 } else {
858 let exclude_hop: Option<HopNum> = None;
859 }
860 };
861
862 Ok(self.legs
863 .iter_mut()
864 .map(|leg| {
865 let unique_id = leg.unique_id();
866 let tunnel_id = self.tunnel_id;
867
868 // The client SHOULD abandon and close circuit if the LINKED message takes too long to
869 // arrive. This timeout MUST be no larger than the normal SOCKS/stream timeout in use for
870 // RELAY_BEGIN, but MAY be the Circuit Build Timeout value, instead. (The C-Tor
871 // implementation currently uses Circuit Build Timeout).
872 let conflux_hs_timeout = if leg.conflux_status() == Some(ConfluxStatus::Pending) {
873 if let Some(timeout) = leg.conflux_hs_timeout() {
874 // TODO: ask Diziet if we can have a sleep_until_instant() function
875 Box::pin(runtime.sleep_until_wallclock(timeout))
876 as Pin<Box<dyn Future<Output = ()> + Send>>
877 } else {
878 Box::pin(std::future::pending())
879 }
880 } else {
881 Box::pin(std::future::pending())
882 };
883
884 let mut ready_streams = leg.ready_streams_iterator(exclude_hop);
885 let input = &mut leg.input;
886 // TODO: we don't really need prepare_send_from here
887 // because the inner select_biased! is cancel-safe.
888 // We should replace this with a simple sink readiness check
889 let send_fut = leg.chan_sender.prepare_send_from(async move {
890 // A future to wait for the next ready stream.
891 let next_ready_stream = async {
892 match ready_streams.next().await {
893 Some(x) => x,
894 None => {
895 info!(circ_id=%unique_id, "no ready streams (maybe blocked on cc?)");
896 // There are no ready streams (for example, they may all be
897 // blocked due to congestion control), so there is nothing
898 // to do.
899 // We await an infinitely pending future so that we don't
900 // immediately return a `None` in the `select_biased!` below.
901 // We'd rather wait on `input.next()` than immediately return with
902 // no `CircuitAction`, which could put the reactor into a spin loop.
903 let () = std::future::pending().await;
904 unreachable!();
905 }
906 }
907 };
908
909 // NOTE: the stream returned by this function is polled in the select_biased!
910 // from Reactor::run_once(), so each block from *this* select_biased! must be
911 // cancellation-safe
912 select_biased! {
913 // Check whether we've got an input message pending.
914 ret = input.next().fuse() => {
915 let Some(cell) = ret else {
916 return Ok(CircuitAction::RemoveLeg {
917 leg: unique_id,
918 reason: RemoveLegReason::ChannelClosed,
919 });
920 };
921
922 Ok(CircuitAction::HandleCell { leg: unique_id, cell })
923 },
924 ret = next_ready_stream.fuse() => {
925 let ret = ret.map(|cmd| {
926 Ok(CircuitAction::RunCmd { leg: unique_id, cmd })
927 });
928
929 flatten(ret)
930 },
931 }
932 });
933
934 let mut send_fut = Box::pin(send_fut);
935
936 async move {
937 select_biased! {
938 () = conflux_hs_timeout.fuse() => {
939 warn!(
940 tunnel_id = %tunnel_id,
941 circ_id = %unique_id,
942 "Conflux handshake timed out on circuit"
943 );
944
945 // Conflux handshake has timed out, time to remove this circuit leg,
946 // and notify the handshake initiator.
947 Ok(Ok(CircuitAction::RemoveLeg {
948 leg: unique_id,
949 reason: RemoveLegReason::ConfluxHandshakeTimeout,
950 }))
951 }
952 ret = send_fut => {
953 // Note: We don't actually use the returned SinkSendable,
954 // and continue writing to the SometimesUboundedSink in the reactor :(
955 ret.map(|ret| ret.0)
956 }
957 }
958 }
959 })
960 .collect::<FuturesUnordered<_>>()
961 // We only return the first ready action as a Future.
962 // Can't use `next()` since it borrows the stream.
963 .into_future()
964 .map(|(next, _)| next.ok_or(internal!("empty conflux set").into()))
965 // Clean up the nested `Result`s before returning to the caller.
966 .map(|res| flatten(flatten(res))))
967 }
968
969 /// The join point on the current primary leg.
970 pub(super) fn primary_join_point(&self) -> Option<(UniqId, HopNum)> {
971 self.join_point
972 .as_ref()
973 .map(|join_point| (self.primary_id, join_point.hop))
974 }
975
976 /// Does congestion control use stream SENDMEs for the given hop?
977 ///
978 /// Returns `None` if either the `leg` or `hop` don't exist.
979 pub(super) fn uses_stream_sendme(&self, leg: UniqId, hop: HopNum) -> Option<bool> {
980 self.leg(leg)?.uses_stream_sendme(hop)
981 }
982
983 /// Encode `msg`, encrypt it, and send it to the 'hop'th hop.
984 ///
985 /// See [`Circuit::send_relay_cell`].
986 pub(super) async fn send_relay_cell_on_leg(
987 &mut self,
988 msg: SendRelayCell,
989 leg: Option<UniqId>,
990 ) -> crate::Result<()> {
991 let conflux_join_point = self.join_point.as_ref().map(|join_point| join_point.hop);
992 let leg = if let Some(join_point) = conflux_join_point {
993 // Conflux circuits always send multiplexed relay commands to
994 // to the last hop (the join point).
995 if cmd_counts_towards_seqno(msg.cell.cmd()) {
996 if msg.hop != join_point {
997 // For leaky pipe, we must continue using the original leg
998 leg
999 } else {
1000 let old_primary_leg = self.primary_id;
1001 // Check if it's time to switch our primary leg.
1002 #[cfg(feature = "conflux")]
1003 if let Some(switch_cell) = self.maybe_update_primary_leg()? {
1004 trace!(
1005 old = ?old_primary_leg,
1006 new = ?self.primary_id,
1007 "Switching primary conflux leg..."
1008 );
1009
1010 self.primary_leg_mut()?.send_relay_cell(switch_cell).await?;
1011 }
1012
1013 // Use the possibly updated primary leg
1014 Some(self.primary_id)
1015 }
1016 } else {
1017 // Non-multiplexed commands go on their original
1018 // circuit and hop
1019 leg
1020 }
1021 } else {
1022 // If there is no join point, it means this is not
1023 // a multi-path tunnel, so we continue using
1024 // the leg_id/hop the cmd came from.
1025 leg
1026 };
1027
1028 let leg = leg.unwrap_or(self.primary_id);
1029
1030 let circ = self
1031 .leg_mut(leg)
1032 .ok_or_else(|| internal!("leg disappeared?!"))?;
1033
1034 circ.send_relay_cell(msg).await
1035 }
1036
1037 /// Send a LINK cell down each unlinked leg.
1038 #[cfg(feature = "conflux")]
1039 pub(super) async fn link_circuits(
1040 &mut self,
1041 runtime: &tor_rtcompat::DynTimeProvider,
1042 ) -> crate::Result<()> {
1043 let (_leg_id, join_point) = self
1044 .primary_join_point()
1045 .ok_or_else(|| internal!("no join point when trying to send LINK"))?;
1046
1047 // Link all the circuits that haven't started the conflux handshake yet.
1048 for circ in self
1049 .legs
1050 .iter_mut()
1051 // TODO: it is an internal error if any of the legs don't have a conflux handler
1052 // (i.e. if conflux_status() returns None)
1053 .filter(|circ| circ.conflux_status() == Some(ConfluxStatus::Unlinked))
1054 {
1055 let v1_payload = V1LinkPayload::new(self.nonce, self.desired_ux);
1056 let link = ConfluxLink::new(v1_payload);
1057 let cell = AnyRelayMsgOuter::new(None, link.into());
1058
1059 circ.begin_conflux_link(join_point, cell, runtime).await?;
1060 }
1061
1062 // TODO(conflux): the caller should take care to not allow opening streams
1063 // until the conflux set is ready (i.e. until at least one of the legs completes
1064 // the handshake).
1065 //
1066 // We will probably need a channel for notifying the caller
1067 // of handshake completion/conflux set readiness
1068
1069 Ok(())
1070 }
1071
1072 /// Get the number of unlinked or non-conflux legs.
1073 #[cfg(feature = "conflux")]
1074 pub(super) fn num_unlinked(&self) -> usize {
1075 self.circuits()
1076 .filter(|circ| {
1077 let status = circ.conflux_status();
1078 status.is_none() || status == Some(ConfluxStatus::Unlinked)
1079 })
1080 .count()
1081 }
1082
1083 /// Check if the specified sequence number is the sequence number of the
1084 /// next message we're expecting to handle.
1085 pub(super) fn is_seqno_in_order(&self, seq_recv: u64) -> bool {
1086 let last_seq_delivered = self.last_seq_delivered.load(atomic::Ordering::Acquire);
1087 seq_recv == last_seq_delivered + 1
1088 }
1089}
1090
1091/// Get the index of the specified element in `iterator`.
1092fn element_idx<'a>(
1093 mut iterator: impl Iterator<Item = &'a Circuit>,
1094 circ_id: UniqId,
1095) -> Option<usize> {
1096 iterator.position(|circ| circ.unique_id() == circ_id)
1097}
1098
1099// TODO: replace this with Itertools::exactly_one()?
1100//
1101/// Get the only item from an iterator.
1102///
1103/// Returns an error if the iterator is empty or has more than one item.
1104fn get_single<T>(mut iterator: impl Iterator<Item = T>) -> Result<T, NotSingleError> {
1105 let Some(rv) = iterator.next() else {
1106 return Err(NotSingleError::None);
1107 };
1108 if iterator.next().is_some() {
1109 return Err(NotSingleError::Multiple);
1110 }
1111 Ok(rv)
1112}
1113
1114/// An error returned from [`get_single`].
1115#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
1116pub(super) enum NotSingleError {
1117 /// The iterator had no items.
1118 #[error("the iterator had no items")]
1119 None,
1120 /// The iterator had more than one item.
1121 #[error("the iterator had more than one item")]
1122 Multiple,
1123}
1124
1125/// An error returned when a method is expecting a single-leg conflux circuit,
1126/// but it is not single-leg.
1127#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
1128pub(super) enum NotSingleLegError {
1129 /// Conflux set has no legs.
1130 #[error("the conflux set has no legs")]
1131 EmptyConfluxSet,
1132 /// Conflux set is multi-path.
1133 #[error("the conflux set is multi-path")]
1134 IsMultipath,
1135}
1136
1137impl From<NotSingleLegError> for Bug {
1138 fn from(e: NotSingleLegError) -> Self {
1139 into_bad_api_usage!("not a single leg conflux set")(e)
1140 }
1141}
1142
1143impl From<NotSingleLegError> for crate::Error {
1144 fn from(e: NotSingleLegError) -> Self {
1145 Self::from(Bug::from(e))
1146 }
1147}
1148
1149impl From<NotSingleLegError> for ReactorError {
1150 fn from(e: NotSingleLegError) -> Self {
1151 Self::from(Bug::from(e))
1152 }
1153}
1154
1155impl From<NotSingleError> for NotSingleLegError {
1156 fn from(e: NotSingleError) -> Self {
1157 match e {
1158 NotSingleError::None => Self::EmptyConfluxSet,
1159 NotSingleError::Multiple => Self::IsMultipath,
1160 }
1161 }
1162}
1163
1164/// Whether the specified `cmd` counts towards the conflux sequence numbers.
1165fn cmd_counts_towards_seqno(cmd: RelayCmd) -> bool {
1166 // Note: copy-pasted from c-tor
1167 match cmd {
1168 // These are all fine to multiplex, and must be so that ordering is preserved
1169 RelayCmd::BEGIN | RelayCmd::DATA | RelayCmd::END | RelayCmd::CONNECTED => true,
1170
1171 // We can't multiplex these because they are circuit-specific
1172 RelayCmd::SENDME
1173 | RelayCmd::EXTEND
1174 | RelayCmd::EXTENDED
1175 | RelayCmd::TRUNCATE
1176 | RelayCmd::TRUNCATED
1177 | RelayCmd::DROP => false,
1178
1179 // We must multiplex RESOLVEs because their ordering impacts begin/end.
1180 RelayCmd::RESOLVE | RelayCmd::RESOLVED => true,
1181
1182 // These are all circuit-specific
1183 RelayCmd::BEGIN_DIR
1184 | RelayCmd::EXTEND2
1185 | RelayCmd::EXTENDED2
1186 | RelayCmd::ESTABLISH_INTRO
1187 | RelayCmd::ESTABLISH_RENDEZVOUS
1188 | RelayCmd::INTRODUCE1
1189 | RelayCmd::INTRODUCE2
1190 | RelayCmd::RENDEZVOUS1
1191 | RelayCmd::RENDEZVOUS2
1192 | RelayCmd::INTRO_ESTABLISHED
1193 | RelayCmd::RENDEZVOUS_ESTABLISHED
1194 | RelayCmd::INTRODUCE_ACK
1195 | RelayCmd::PADDING_NEGOTIATE
1196 | RelayCmd::PADDING_NEGOTIATED => false,
1197
1198 // Flow control cells must be ordered (see prop 329).
1199 RelayCmd::XOFF | RelayCmd::XON => true,
1200
1201 // These two are not multiplexed, because they must be processed immediately
1202 // to update sequence numbers before any other cells are processed on the circuit
1203 RelayCmd::CONFLUX_SWITCH
1204 | RelayCmd::CONFLUX_LINK
1205 | RelayCmd::CONFLUX_LINKED
1206 | RelayCmd::CONFLUX_LINKED_ACK => false,
1207
1208 _ => {
1209 tracing::warn!("Conflux asked to multiplex unknown relay command {cmd}");
1210 false
1211 }
1212 }
1213}
1214
1215#[cfg(test)]
1216mod test {
1217 // Tested in [`crate::tunnel::circuit::test`].
1218}