tor_proto/tunnel/reactor/conflux.rs
1//! Conflux-related functionality
2
3#[cfg(feature = "conflux")]
4mod msghandler;
5
6use std::future::Future;
7use std::pin::Pin;
8use std::sync::atomic::{self, AtomicU64};
9use std::sync::Arc;
10
11use futures::StreamExt;
12use futures::{select_biased, stream::FuturesUnordered, FutureExt as _};
13use itertools::Itertools as _;
14use slotmap_careful::SlotMap;
15use tor_rtcompat::SleepProviderExt as _;
16use tracing::warn;
17
18use tor_async_utils::SinkPrepareExt as _;
19use tor_basic_utils::flatten;
20use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCmd};
21use tor_error::{bad_api_usage, internal, into_bad_api_usage, Bug};
22use tor_linkspec::HasRelayIds as _;
23
24use crate::circuit::path::HopDetail;
25use crate::circuit::TunnelMutableState;
26use crate::crypto::cell::HopNum;
27use crate::tunnel::reactor::circuit::ConfluxStatus;
28use crate::tunnel::reactor::CircuitCmd;
29use crate::util::err::ReactorError;
30
31use super::circuit::CircHop;
32use super::{Circuit, CircuitAction, LegId, LegIdKey, RemoveLegReason};
33
34#[cfg(feature = "conflux")]
35use {
36 super::SendRelayCell,
37 tor_cell::relaycell::conflux::{V1DesiredUx, V1LinkPayload, V1Nonce},
38 tor_cell::relaycell::msg::{ConfluxLink, ConfluxSwitch},
39};
40
41#[cfg(feature = "conflux")]
42pub(crate) use msghandler::{ConfluxAction, ConfluxMsgHandler, OooRelayMsg};
43
/// A set with one or more circuits.
///
/// ### Conflux set life cycle
///
/// Conflux sets are created by the reactor using [`ConfluxSet::new`].
///
/// Every `ConfluxSet` starts out as a single-path set consisting of a single 0-length circuit.
///
/// After constructing a `ConfluxSet`, the reactor will proceed to extend its (only) circuit.
/// At this point, the `ConfluxSet` will be a single-path set with a single n-length circuit.
///
/// The reactor can then turn the `ConfluxSet` into a multi-path set
/// (a multi-path set is a conflux set that contains more than 1 circuit).
/// This is done using [`ConfluxSet::add_legs`], in response to a `CtrlMsg` sent
/// by the reactor user (also referred to as the "conflux handshake initiator").
/// After that, the conflux set is said to be a multi-path set with multiple N-length circuits.
///
/// Circuits can be removed from the set using [`ConfluxSet::remove`].
///
/// The lifetime of a `ConfluxSet` is tied to the lifetime of the reactor.
/// When the reactor is dropped, its underlying `ConfluxSet` is dropped too.
/// This can happen on an explicit shutdown request, or if a fatal error occurs.
///
/// Conversely, the `ConfluxSet` can also trigger a reactor shutdown.
/// For example, if after being instructed to remove a circuit from the set
/// using [`ConfluxSet::remove`], the set is completely depleted,
/// the `ConfluxSet` will return a [`ReactorError::Shutdown`] error,
/// which will cause the reactor to shut down.
pub(super) struct ConfluxSet {
    /// The circuits in this conflux set, keyed by their leg ID.
    legs: SlotMap<LegIdKey, Circuit>,
    /// Tunnel state, shared with `ClientCirc`.
    ///
    /// Contains the [`MutableState`](super::MutableState) of each circuit in the set.
    mutable: Arc<TunnelMutableState>,
    /// The unique identifier of the primary leg.
    ///
    /// In a multi-path set, this is the leg that multiplexed relay commands
    /// are routed to (see [`next_circ_action`](Self::next_circ_action)).
    primary_id: LegIdKey,
    /// The join point of the set, if this is a multi-path set.
    ///
    /// Initially the conflux set starts out as a single-path set with no join point.
    /// When it is converted to a multipath set using [`add_legs`](Self::add_legs),
    /// the join point is initialized to the last hop in the tunnel.
    //
    // TODO(conflux): for simplicity, we currently force all legs to have the same length,
    // to ensure the HopNum of the join point is the same for all of them.
    //
    // In the future we might want to relax this restriction.
    join_point: Option<JoinPoint>,
    /// The nonce associated with the circuits from this set.
    #[cfg(feature = "conflux")]
    nonce: V1Nonce,
    /// The desired UX, used when picking the primary leg
    /// and advertised in the LINK payload.
    #[cfg(feature = "conflux")]
    desired_ux: V1DesiredUx,
    /// The absolute sequence number of the last cell delivered to a stream.
    ///
    /// A clone of this is shared with each [`ConfluxMsgHandler`] created.
    ///
    /// When a message is received on a circuit leg, the `ConfluxMsgHandler`
    /// of the leg compares the (leg-local) sequence number of the message
    /// with this sequence number to determine whether the message is in-order.
    ///
    /// If the message is in-order, the `ConfluxMsgHandler` instructs the circuit
    /// to deliver it to its corresponding stream.
    ///
    /// If the message is out-of-order, the `ConfluxMsgHandler` instructs the circuit
    /// to instruct the reactor to buffer the message.
    last_seq_delivered: Arc<AtomicU64>,
    /// Whether we have selected our initial primary leg,
    /// if this is a multipath conflux set.
    ///
    /// Starts out `false`, and is set once an initial RTT measurement
    /// is available for at least one leg.
    selected_init_primary: bool,
}
116
/// The conflux join point.
///
/// This is the hop shared by all the legs of a multi-path set.
/// It is recorded by [`ConfluxSet::add_legs`] and reused on later calls.
#[derive(Debug, Clone)]
struct JoinPoint {
    /// The hop number of the join point (the same on every leg).
    hop: HopNum,
    /// The [`HopDetail`] of the hop, used to check that new legs end at the same hop.
    detail: HopDetail,
}
125
126impl ConfluxSet {
127 /// Create a new conflux set, consisting of a single leg.
128 ///
129 /// Returns the newly created set and a reference to its [`TunnelMutableState`].
130 pub(super) fn new(circuit_leg: Circuit) -> (Self, Arc<TunnelMutableState>) {
131 let mut legs: SlotMap<LegIdKey, Circuit> = SlotMap::with_key();
132 let circ_mutable = Arc::clone(circuit_leg.mutable());
133 let unique_id = circuit_leg.unique_id();
134 let primary_id = legs.insert(circuit_leg);
135 // Note: the join point is only set for multi-path tunnels
136 let join_point = None;
137
138 // TODO(conflux): read this from the consensus/config.
139 #[cfg(feature = "conflux")]
140 let desired_ux = V1DesiredUx::NO_OPINION;
141
142 let mutable = Arc::new(TunnelMutableState::default());
143 mutable.insert(unique_id, primary_id.into(), circ_mutable);
144
145 let set = Self {
146 legs,
147 primary_id,
148 join_point,
149 mutable: mutable.clone(),
150 #[cfg(feature = "conflux")]
151 nonce: V1Nonce::new(&mut rand::rng()),
152 #[cfg(feature = "conflux")]
153 desired_ux,
154 last_seq_delivered: Arc::new(AtomicU64::new(0)),
155 selected_init_primary: false,
156 };
157
158 (set, mutable)
159 }
160
161 /// Remove and return the only leg of this conflux set.
162 ///
163 /// Returns an error if there is more than one leg in the set,
164 /// or if called before any circuit legs are available.
165 ///
166 /// Calling this function will empty the [`ConfluxSet`].
167 pub(super) fn take_single_leg(&mut self) -> Result<Circuit, NotSingleLegError> {
168 let circ = get_single(self.legs.remove(self.primary_id).into_iter())?;
169 Ok(circ)
170 }
171
172 /// Return a reference to the only leg of this conflux set,
173 /// along with the leg's ID.
174 ///
175 /// Returns an error if there is more than one leg in the set,
176 /// or if called before any circuit legs are available.
177 pub(super) fn single_leg(&self) -> Result<(LegId, &Circuit), NotSingleLegError> {
178 let (circ_id, circ) = get_single(self.legs.iter())?;
179 Ok((LegId(circ_id), circ))
180 }
181
182 /// Return a mutable reference to the only leg of this conflux set,
183 /// along with the leg's ID.
184 ///
185 /// Returns an error if there is more than one leg in the set,
186 /// or if called before any circuit legs are available.
187 pub(super) fn single_leg_mut(&mut self) -> Result<(LegId, &mut Circuit), NotSingleLegError> {
188 let (circ_id, circ) = get_single(self.legs.iter_mut())?;
189 Ok((LegId(circ_id), circ))
190 }
191
192 /// Return the primary leg of this conflux set.
193 ///
194 /// Returns an error if called before any circuit legs are available.
195 pub(super) fn primary_leg_mut(&mut self) -> Result<&mut Circuit, Bug> {
196 // TODO(conflux): support more than one leg,
197 // and remove this check
198 if self.legs.len() > 1 {
199 return Err(internal!("multipath not currently supported"));
200 }
201
202 if self.legs.is_empty() {
203 Err(bad_api_usage!(
204 "tried to get circuit leg before creating it?!"
205 ))
206 } else {
207 // TODO(conflux): implement primary leg selection
208 let circ = self
209 .legs
210 .get_mut(self.primary_id)
211 .ok_or_else(|| internal!("slotmap is empty?!"))?;
212
213 Ok(circ)
214 }
215 }
216
217 /// Return a reference to the leg of this conflux set with the given id.
218 pub(super) fn leg(&self, leg_id: LegId) -> Option<&Circuit> {
219 self.legs.get(leg_id.0)
220 }
221
222 /// Return a mutable reference to the leg of this conflux set with the given id.
223 pub(super) fn leg_mut(&mut self, leg_id: LegId) -> Option<&mut Circuit> {
224 self.legs.get_mut(leg_id.0)
225 }
226
    /// Return the LegId of the primary leg.
    ///
    /// This simply wraps the stored primary key; it does not check
    /// that the key still refers to a leg in the set.
    pub(super) fn primary_leg_id(&self) -> LegId {
        LegId(self.primary_id)
    }
231
    /// Return the number of legs (circuits) currently in this conflux set.
    pub(super) fn len(&self) -> usize {
        self.legs.len()
    }
236
237 /// Return whether this conflux set is empty.
238 pub(super) fn is_empty(&self) -> bool {
239 self.legs.len() == 0
240 }
241
242 /// Remove the specified leg from this conflux set.
243 ///
244 /// Returns an error if the given leg doesn't exist in the set.
245 ///
246 /// Returns an error instructing the reactor to perform a clean shutdown
247 /// ([`ReactorError::Shutdown`]), tearing down the entire [`ConfluxSet`], if
248 ///
249 /// * the set is depleted (empty) after removing the specified leg
250 /// * `leg` is currently the sending (primary) leg of this set
251 /// * the closed leg had the highest non-zero last_seq_recv/sent
252 /// * the closed leg had some in-progress data (inflight > cc_sendme_inc)
253 ///
254 /// We do not yet support resumption. See [2.4.3. Closing circuits] in prop329.
255 ///
256 /// [2.4.3. Closing circuits]: https://spec.torproject.org/proposals/329-traffic-splitting.html#243-closing-circuits
257 pub(super) fn remove(&mut self, leg: LegIdKey) -> Result<Circuit, ReactorError> {
258 let circ = self
259 .legs
260 .remove(leg)
261 .ok_or_else(|| bad_api_usage!("leg {leg:?} not found in conflux set"))?;
262
263 self.mutable.remove(circ.unique_id());
264
265 if self.legs.is_empty() {
266 // The last circuit in the set has just died, so the reactor should exit.
267 return Err(ReactorError::Shutdown);
268 }
269
270 if leg == self.primary_id {
271 // We have just removed our sending leg,
272 // so it's time to close the entire conflux set.
273 return Err(ReactorError::Shutdown);
274 }
275
276 cfg_if::cfg_if! {
277 if #[cfg(feature = "conflux")] {
278 self.remove_conflux(circ)
279 } else {
280 // Conflux is disabled, so we can't possible continue running if the only
281 // leg in the tunnel is gone.
282 //
283 // Technically this should be unreachable (because of the is_empty()
284 // check above)
285 return Err(internal!("Multiple legs in single-path tunnel?!").into());
286 }
287 }
288 }
289
290 /// Handle the removal of a circuit,
291 /// returning an error if the reactor needs to shut down.
292 #[cfg(feature = "conflux")]
293 fn remove_conflux(&self, circ: Circuit) -> Result<Circuit, ReactorError> {
294 let Some(status) = circ.conflux_status() else {
295 return Err(internal!("Found non-conflux circuit in conflux set?!").into());
296 };
297
298 // TODO(conflux): should the circmgr be notified about the leg removal?
299 //
300 // "For circuits that are unlinked, the origin SHOULD immediately relaunch a new leg when it
301 // is closed, subject to the limits in [SIDE_CHANNELS]."
302
303 // If we've reached this point and the conflux set is non-empty,
304 // it means it's a multi-path set.
305 //
306 // Time to check if we need to tear down the entire set.
307 match status {
308 ConfluxStatus::Unlinked => {
309 // This circuit hasn't yet begun the conflux handshake,
310 // so we can safely remove it from the set
311 Ok(circ)
312 }
313 ConfluxStatus::Pending | ConfluxStatus::Linked => {
314 let (circ_last_seq_recv, circ_last_seq_sent) =
315 (|| Ok::<_, ReactorError>((circ.last_seq_recv()?, circ.last_seq_sent()?)))()?;
316
317 // If the closed leg had the highest non-zero last_seq_recv/sent, close the set
318 if let Some(max_last_seq_recv) = self.max_last_seq_recv() {
319 if circ_last_seq_recv > max_last_seq_recv {
320 return Err(ReactorError::Shutdown);
321 }
322 }
323
324 if let Some(max_last_seq_sent) = self.max_last_seq_sent() {
325 if circ_last_seq_sent > max_last_seq_sent {
326 return Err(ReactorError::Shutdown);
327 }
328 }
329
330 let hop = self.join_point_hop(&circ)?;
331
332 let (inflight, cwnd) = (|| {
333 let ccontrol = hop.ccontrol();
334 let inflight = ccontrol.inflight()?;
335 let cwnd = ccontrol.cwnd()?;
336
337 Some((inflight, cwnd))
338 })()
339 .ok_or_else(|| {
340 internal!("Congestion control algorithm doesn't track inflight cells or cwnd?!")
341 })?;
342
343 // If data is in progress on the leg (inflight > cc_sendme_inc),
344 // then all legs must be closed
345 if inflight >= cwnd.params().sendme_inc() {
346 return Err(ReactorError::Shutdown);
347 }
348
349 Ok(circ)
350 }
351 }
352 }
353
354 /// Return the maximum relative last_seq_recv across all circuits.
355 #[cfg(feature = "conflux")]
356 fn max_last_seq_recv(&self) -> Option<u64> {
357 self.legs
358 .iter()
359 .filter_map(|(_id, leg)| leg.last_seq_recv().ok())
360 .max()
361 }
362
363 /// Return the maximum relative last_seq_sent across all circuits.
364 #[cfg(feature = "conflux")]
365 fn max_last_seq_sent(&self) -> Option<u64> {
366 self.legs
367 .iter()
368 .filter_map(|(_id, leg)| leg.last_seq_sent().ok())
369 .max()
370 }
371
372 /// Get the [`CircHop`] of the join point on the specified `circ`,
373 /// returning an error if this is a single path conflux set.
374 fn join_point_hop<'c>(&self, circ: &'c Circuit) -> Result<&'c CircHop, Bug> {
375 let Some(join_point) = self.join_point.as_ref().map(|p| p.hop) else {
376 return Err(internal!("No join point on conflux tunnel?!"));
377 };
378
379 circ.hop(join_point)
380 .ok_or_else(|| internal!("Conflux join point disappeared?!"))
381 }
382
383 /// Return an iterator of all circuits in the conflux set.
384 fn circuits(&self) -> impl Iterator<Item = &Circuit> {
385 self.legs.iter().map(|(_id, leg)| leg)
386 }
387
388 /// Add legs to the this conflux set.
389 ///
390 // TODO(conflux): update this with the latest changes from torspec!369
391 ///
392 /// Returns an error if any of the legs are invalid,
393 /// or if adding the legs would cause the conflux set to contain
394 /// any circuits that have the same hop in the middle and guard positions
395 /// (legs of the form `G1 - M1 - E` and `G2 - M1 - E`,
396 /// or `G1 - M1 -E` and `G1 - M2 - E ` aren't allowed to be part
397 /// of the same conflux set).
398 ///
399 /// A leg is considered valid if
400 ///
401 /// * the circuit has the same length as all the other circuits in the set
402 /// * its last hop is equal to the designated join point
403 /// * the circuit has no streams attached to any of its hops
404 /// * the circuit is not already part of a conflux set
405 ///
406 /// Note: the circuits will not begin linking until
407 /// [`link_circuits`](Self::link_circuits) is called.
408 #[cfg(feature = "conflux")]
409 pub(super) fn add_legs(
410 &mut self,
411 legs: Vec<Circuit>,
412 runtime: &tor_rtcompat::DynTimeProvider,
413 ) -> Result<(), Bug> {
414 if legs.is_empty() {
415 return Err(bad_api_usage!("asked to add empty leg list to conflux set"));
416 }
417
418 let join_point = match self.join_point.take() {
419 Some(p) => {
420 // Preserve the existing join point, if there is one.
421 p
422 }
423 None => {
424 let (hop, detail) = (|| {
425 let first_leg = legs.first()?;
426 let all_hops = first_leg.path().all_hops();
427 let hop = first_leg.last_hop_num()?;
428 let detail = all_hops.last()?;
429 Some((hop, detail.clone()))
430 })()
431 .ok_or_else(|| bad_api_usage!("asked to join circuit with no hops"))?;
432
433 JoinPoint { hop, detail }
434 }
435 };
436
437 // Check two HopDetails for equality.
438 let hops_eq = |h1: &HopDetail, h2: &HopDetail| {
439 match (h1, h2) {
440 (HopDetail::Relay(t1), HopDetail::Relay(ref t2)) => t1.same_relay_ids(t2),
441 (HopDetail::Virtual, HopDetail::Virtual) => {
442 // TODO(conflux): HopDetail::Virtual are always considered equal,
443 // but that's not exactly right. We should resolve the TODO
444 // from HopDetail::Virtual, and store some additional context
445 // for differentiating virtual hops
446 true
447 }
448 _ => false,
449 }
450 };
451
452 // Check if the last hop of leg is the same as the one from the first hop.
453 let hop_eq_joint_point = |hop: Option<&HopDetail>| {
454 hop.map(|hop| hops_eq(hop, &join_point.detail))
455 .unwrap_or_default()
456 };
457
458 // A leg is considered valid if
459 //
460 // * the circuit has the expected length
461 // (the length of the first circuit we added to the set)
462 // * its last hop is equal to the designated join point
463 // (the last hop of the first circuit we added)
464 // * the circuit has no streams attached to any of its hops
465 // * the circuit is not already part of a conflux tunnel
466 let leg_is_valid = |leg: &Circuit| {
467 leg.last_hop_num() == Some(join_point.hop)
468 && hop_eq_joint_point(leg.path().all_hops().last())
469 && !leg.has_streams()
470 && leg.conflux_status().is_none()
471 };
472
473 if !legs.iter().all(leg_is_valid) {
474 return Err(bad_api_usage!("one more more conflux circuits are invalid"));
475 }
476
477 let check_legs_disjoint = |(leg1, leg2): (&Circuit, &Circuit)| {
478 // TODO(conflux): add a new Path API for getting an iterator over HopDetail.
479 let path1 = leg1.path().all_hops();
480 let path1_except_last = path1.iter().dropping_back(1);
481 let path2 = leg2.path().all_hops();
482 let path2_except_last = path2.iter().dropping_back(1);
483
484 // At this point we've already validated the lengths of the new legs,
485 // so we know they all have the same length.
486 path1_except_last
487 .zip(path2_except_last)
488 .all(|(h1, h2)| !hops_eq(h1, h2))
489 };
490
491 // TODO(conflux): reduce unnecessary iteration over `legs`
492 // without hurting readability
493
494 // Ensure the legs don't share guard or middle relays
495 //
496 // TODO(conflux): is this right?
497 // It means we allow legs of the form
498 //
499 // N1 --- N2 ----
500 // \
501 // E
502 // N2 --- N1-----/
503 //
504 // But I think that's alright.
505 if !self
506 .circuits()
507 .chain(legs.iter())
508 .cartesian_product(legs.iter())
509 .all(check_legs_disjoint)
510 {
511 return Err(bad_api_usage!(
512 "conflux circuits must not share hops in the same position"
513 ));
514 }
515
516 // Select a join point, or put the existing one back into self.
517 self.join_point = Some(join_point.clone());
518
519 // The legs are valid, so add them to the set.
520 for mut circ in legs {
521 let conflux_handler = ConfluxMsgHandler::new_client(
522 join_point.hop,
523 self.nonce,
524 Arc::clone(&self.last_seq_delivered),
525 runtime.clone(),
526 );
527
528 circ.install_conflux_handler(conflux_handler);
529 let mutable = Arc::clone(circ.mutable());
530 let unique_id = circ.unique_id();
531 let leg_id = self.legs.insert(circ);
532 // Merge the mutable state of the circuit into our tunnel state.
533 self.mutable.insert(unique_id, leg_id.into(), mutable);
534 }
535
536 Ok(())
537 }
538
539 /// Try to update the primary leg based on the configured desired UX,
540 /// if needed.
541 ///
542 /// Returns the SWITCH cell to send on the primary leg,
543 /// if we switched primary leg.
544 #[cfg(feature = "conflux")]
545 pub(super) fn maybe_update_primary_leg(&mut self) -> crate::Result<Option<SendRelayCell>> {
546 use tor_error::into_internal;
547
548 let Some(join_point) = self.join_point.as_ref() else {
549 // Return early if this is not a multi-path tunnel
550 return Ok(None);
551 };
552
553 let join_point = join_point.hop;
554
555 if !self.should_update_primary_leg() {
556 // Nothing to do
557 return Ok(None);
558 }
559
560 let Some(new_primary_id) = self.select_primary_leg()? else {
561 // None of the legs satisfy our UX requirements, continue using the existing one.
562 return Ok(None);
563 };
564
565 // Check that the newly selected leg is actually different from the previous
566 if self.primary_id == new_primary_id {
567 // The primary leg stays the same, nothing to do.
568 return Ok(None);
569 }
570
571 let prev_last_seq_sent = self.primary_leg_mut()?.last_seq_sent()?;
572 self.primary_id = new_primary_id;
573 let new_last_seq_sent = self.primary_leg_mut()?.last_seq_sent()?;
574
575 // If this fails, it means we haven't updated our primary leg in a very long time.
576 //
577 // TODO(conflux): there are currently no safeguards to prevent us from staying
578 // on the same leg for "too long". Perhaps we should design should_update_primary_leg()
579 // such that it forces us to switch legs periodically, to prevent the seqno delta from
580 // getting too big?
581 let seqno_delta = u32::try_from(prev_last_seq_sent - new_last_seq_sent).map_err(
582 into_internal!("Seqno delta for switch does not fit in u32?!"),
583 )?;
584
585 let switch = ConfluxSwitch::new(seqno_delta);
586 let cell = AnyRelayMsgOuter::new(None, switch.into());
587 Ok(Some(SendRelayCell {
588 hop: join_point,
589 early: false,
590 cell,
591 }))
592 }
593
594 /// Whether it's time to select a new primary leg.
595 #[cfg(feature = "conflux")]
596 fn should_update_primary_leg(&mut self) -> bool {
597 if !self.selected_init_primary {
598 self.maybe_select_init_primary();
599 return false;
600 }
601
602 // If we don't have at least 2 legs,
603 // we can't switch our primary leg.
604 if self.legs.len() < 2 {
605 return false;
606 }
607
608 // TODO(conflux-tuning): if it turns out we switch legs too frequently,
609 // we might want to implement some sort of rate-limiting here
610 // (see c-tor's conflux_can_switch).
611
612 true
613 }
614
615 /// Return the best leg according to the configured desired UX.
616 ///
617 /// Returns `None` if no suitable was found.
618 #[cfg(feature = "conflux")]
619 fn select_primary_leg(&self) -> Result<Option<LegIdKey>, Bug> {
620 match self.desired_ux {
621 V1DesiredUx::NO_OPINION | V1DesiredUx::MIN_LATENCY => {
622 self.select_primary_leg_min_rtt(false)
623 }
624 V1DesiredUx::HIGH_THROUGHPUT => self.select_primary_leg_min_rtt(true),
625 V1DesiredUx::LOW_MEM_LATENCY | V1DesiredUx::LOW_MEM_THROUGHPUT => {
626 // TODO(conflux-tuning): add support for low-memory algorithms
627 self.select_primary_leg_min_rtt(false)
628 }
629 _ => {
630 // Default to MIN_RTT if we don't recognized the desired UX value
631 warn!(
632 "Ignoring unrecognized conflux desired UX {}, using MIN_LATENCY",
633 self.desired_ux
634 );
635 self.select_primary_leg_min_rtt(false)
636 }
637 }
638 }
639
640 /// Try to choose an initial primary leg, if we have an initial RTT measurement
641 /// for at least one of the legs.
642 #[cfg(feature = "conflux")]
643 fn maybe_select_init_primary(&mut self) {
644 let best = self
645 .legs
646 .iter()
647 .filter_map(|(id, leg)| leg.init_rtt().map(|rtt| (LegId(id), rtt)))
648 .min_by_key(|(_leg_id, rtt)| *rtt)
649 .map(|(leg_id, _rtt)| leg_id.0);
650
651 if let Some(best) = best {
652 self.primary_id = best;
653 self.selected_init_primary = true;
654 }
655 }
656
657 /// Return the leg with the best (lowest) RTT.
658 ///
659 /// If `check_can_send` is true, selects the lowest RTT leg that is ready to send.
660 ///
661 /// Returns `None` if no suitable was found.
662 #[cfg(feature = "conflux")]
663 fn select_primary_leg_min_rtt(&self, check_can_send: bool) -> Result<Option<LegIdKey>, Bug> {
664 let mut best = None;
665
666 for (leg_id, circ) in self.legs.iter() {
667 let join_point = self.join_point_hop(circ)?;
668 let ccontrol = join_point.ccontrol();
669
670 if check_can_send && !ccontrol.can_send() {
671 continue;
672 }
673
674 let rtt = ccontrol.rtt();
675 let ewma_rtt = rtt.ewma_rtt_usec();
676
677 match best.take() {
678 None => {
679 best = Some((leg_id, ewma_rtt));
680 }
681 Some(best_so_far) => {
682 if best_so_far.1 < ewma_rtt {
683 best = Some(best_so_far);
684 } else {
685 best = Some((leg_id, ewma_rtt));
686 }
687 }
688 }
689 }
690
691 Ok(best.map(|(leg_id, _)| leg_id))
692 }
693
694 /// Returns the next ready [`CircuitAction`],
695 /// obtained from processing the incoming/outgoing messages on all the circuits in this set.
696 ///
697 /// Will return an error if there are no circuits in this set,
698 /// or other internal errors occur.
699 ///
700 /// This is cancellation-safe.
701 pub(super) fn next_circ_action<'a>(
702 &'a mut self,
703 runtime: &'a tor_rtcompat::DynTimeProvider,
704 ) -> impl Future<Output = Result<CircuitAction, crate::Error>> + 'a {
705 self.legs
706 .iter_mut()
707 .map(|(leg_id, leg)| {
708 // The client SHOULD abandon and close circuit if the LINKED message takes too long to
709 // arrive. This timeout MUST be no larger than the normal SOCKS/stream timeout in use for
710 // RELAY_BEGIN, but MAY be the Circuit Build Timeout value, instead. (The C-Tor
711 // implementation currently uses Circuit Build Timeout).
712 let conflux_hs_timeout = if let Some(timeout) = leg.conflux_hs_timeout() {
713 // TODO: ask Diziet if we can have a sleep_until_instant() function
714 Box::pin(runtime.sleep_until_wallclock(timeout)) as Pin<Box<dyn Future<Output = ()> + Send>>
715 } else {
716 Box::pin(std::future::pending())
717 };
718
719 let mut ready_streams = leg.ready_streams_iterator();
720 let input = &mut leg.input;
721 let primary_id = self.primary_id;
722 let conflux_join_point = self.join_point.as_ref().map(|join_point| join_point.hop);
723 // TODO: we don't really need prepare_send_from here
724 // because the inner select_biased! is cancel-safe.
725 // We should replace this with a simple sink readiness check
726 let send_fut = leg.chan_sender.prepare_send_from(async move {
727 // A future to wait for the next ready stream.
728 let next_ready_stream = async {
729 match ready_streams.next().await {
730 Some(x) => x,
731 None => {
732 // There are no ready streams (for example, they may all be
733 // blocked due to congestion control), so there is nothing
734 // to do.
735 // We await an infinitely pending future so that we don't
736 // immediately return a `None` in the `select_biased!` below.
737 // We'd rather wait on `input.next()` than immediately return with
738 // no `CircuitAction`, which could put the reactor into a spin loop.
739 let () = std::future::pending().await;
740 unreachable!();
741 }
742 }
743 };
744
745 // NOTE: the stream returned by this function is polled in the select_biased!
746 // from Reactor::run_once(), so each block from *this* select_biased! must be
747 // cancellation-safe
748 select_biased! {
749 // Check whether we've got an input message pending.
750 ret = input.next().fuse() => {
751 let Some(cell) = ret else {
752 return Ok(CircuitAction::RemoveLeg {
753 leg: leg_id,
754 reason: RemoveLegReason::ChannelClosed,
755 });
756 };
757
758 Ok(CircuitAction::HandleCell { leg: leg_id, cell })
759 },
760 ret = next_ready_stream.fuse() => {
761 let ret = ret.map(|cmd| {
762 // TODO(conflux): refactor this spaghetti
763 let leg = if let Some(join_point) = conflux_join_point {
764 match &cmd {
765 CircuitCmd::Send(send) => {
766 // Conflux circuits always send multiplexed relay commands to
767 // to the last hop (the join point).
768 if cmd_counts_towards_seqno(send.cell.cmd()) {
769 if send.hop != join_point {
770 return Err(crate::Error::Bug(internal!(
771 "Leaky pipe on conflux circuit?! (target_hop={}, join_point={})",
772 send.hop.display(),
773 join_point.display(),
774 )));
775 }
776 // TODO(conflux): validate the hop? We should
777 // ensure the target hop is the join point, and
778 // error otherwise?
779
780 primary_id
781 } else {
782 // Non-multiplexed commands go on their original
783 // circuit and hop
784 leg_id
785 }
786 }
787 // The leg_id doesn't need to change (or doesn't matter)
788 // for these other commands.
789 CircuitCmd::HandleSendMe { .. } | CircuitCmd::CloseStream { .. }
790 | CircuitCmd::CleanShutdown => leg_id,
791 #[cfg(feature = "conflux")]
792 CircuitCmd::ConfluxRemove(_) | CircuitCmd::ConfluxHandshakeComplete (_) | CircuitCmd::Enqueue(_) => leg_id,
793 }
794 } else {
795 // If there is no join point, it means this is not
796 // a multi-path tunnel, so we continue using
797 // the leg_id/hop the cmd came from.
798 leg_id
799 };
800
801 Ok(CircuitAction::RunCmd { leg, cmd })
802 });
803
804 flatten(ret)
805 },
806 }
807 });
808
809 let mut send_fut = Box::pin(send_fut);
810
811 async move {
812 select_biased! {
813 () = conflux_hs_timeout.fuse() => {
814 // Conflux handshake has timed out, time to remove this circuit leg,
815 // and notify the handshake initiator.
816 Ok(Ok(CircuitAction::RemoveLeg {
817 leg: leg_id,
818 reason: RemoveLegReason::ConfluxHandshakeTimeout,
819 }))
820 }
821 ret = send_fut => {
822 // Note: We don't actually use the returned SinkSendable,
823 // and continue writing to the SometimesUboundedSink in the reactor :(
824 ret.map(|ret| ret.0)
825 }
826 }
827 }
828 })
829 .collect::<FuturesUnordered<_>>()
830 // We only return the first ready action as a Future.
831 // Can't use `next()` since it borrows the stream.
832 .into_future()
833 .map(|(next, _)| next.ok_or(internal!("empty conflux set").into()))
834 // Clean up the nested `Result`s before returning to the caller.
835 .map(|res| flatten(flatten(res)))
836 }
837
838 /// The join point on the current primary leg.
839 pub(super) fn primary_join_point(&self) -> Option<(LegId, HopNum)> {
840 self.join_point
841 .as_ref()
842 .map(|join_point| (LegId(self.primary_id), join_point.hop))
843 }
844
845 /// Does congestion control use stream SENDMEs for the given hop?
846 ///
847 /// Returns `None` if either the `leg` or `hop` don't exist.
848 pub(super) fn uses_stream_sendme(&self, leg: LegId, hop: HopNum) -> Option<bool> {
849 self.leg(leg)?.uses_stream_sendme(hop)
850 }
851
852 /// Send a LINK cell down each unlinked leg.
853 #[cfg(feature = "conflux")]
854 pub(super) async fn link_circuits(
855 &mut self,
856 runtime: &tor_rtcompat::DynTimeProvider,
857 ) -> crate::Result<()> {
858 let (_leg_id, join_point) = self
859 .primary_join_point()
860 .ok_or_else(|| internal!("no join point when trying to send LINK"))?;
861
862 // Link all the circuits that haven't started the conflux handshake yet.
863 for (_, circ) in self
864 .legs
865 .iter_mut()
866 // TODO: it is an internal error if any of the legs don't have a conflux handler
867 // (i.e. if conflux_status() returns None)
868 .filter(|(_, circ)| circ.conflux_status() == Some(ConfluxStatus::Unlinked))
869 {
870 let v1_payload = V1LinkPayload::new(self.nonce, self.desired_ux);
871 let link = ConfluxLink::new(v1_payload);
872 let cell = AnyRelayMsgOuter::new(None, link.into());
873
874 circ.begin_conflux_link(join_point, cell, runtime).await?;
875 }
876
877 // TODO(conflux): the caller should take care to not allow opening streams
878 // until the conflux set is ready (i.e. until at least one of the legs completes
879 // the handshake).
880 //
881 // We will probably need a channel for notifying the caller
882 // of handshake completion/conflux set readiness
883
884 Ok(())
885 }
886
887 /// Check if the specified sequence number is the sequence number of the
888 /// next message we're expecting to handle.
889 pub(super) fn is_seqno_in_order(&self, seq_recv: u64) -> bool {
890 let last_seq_delivered = self.last_seq_delivered.load(atomic::Ordering::Acquire);
891 seq_recv == last_seq_delivered + 1
892 }
893}
894
895// TODO(conflux): replace this with Itertools::exactly_one()?
896//
897/// Get the only item from an iterator.
898///
899/// Returns an error if the iterator is empty or has more than one item.
900fn get_single<T>(mut iterator: impl Iterator<Item = T>) -> Result<T, NotSingleError> {
901 let Some(rv) = iterator.next() else {
902 return Err(NotSingleError::None);
903 };
904 if iterator.next().is_some() {
905 return Err(NotSingleError::Multiple);
906 }
907 Ok(rv)
908}
909
/// An error returned from [`get_single`].
///
/// Describes why an iterator did not contain exactly one item.
#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
pub(super) enum NotSingleError {
    /// The iterator had no items.
    #[error("the iterator had no items")]
    None,
    /// The iterator had more than one item.
    #[error("the iterator had more than one item")]
    Multiple,
}
920
/// An error returned when a method is expecting a single-leg conflux circuit,
/// but it is not single-leg.
///
/// Can be converted into a [`Bug`], a `crate::Error`, or a [`ReactorError`].
#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
pub(super) enum NotSingleLegError {
    /// Conflux set has no legs.
    #[error("the conflux set has no legs")]
    EmptyConfluxSet,
    /// Conflux set is multi-path.
    #[error("the conflux set is multi-path")]
    IsMultipath,
}
932
/// Report a [`NotSingleLegError`] as a bad-API-usage [`Bug`],
/// keeping the original error as its source.
impl From<NotSingleLegError> for Bug {
    fn from(e: NotSingleLegError) -> Self {
        into_bad_api_usage!("not a single leg conflux set")(e)
    }
}
938
939impl From<NotSingleLegError> for crate::Error {
940 fn from(e: NotSingleLegError) -> Self {
941 Self::from(Bug::from(e))
942 }
943}
944
945impl From<NotSingleLegError> for ReactorError {
946 fn from(e: NotSingleLegError) -> Self {
947 Self::from(Bug::from(e))
948 }
949}
950
951impl From<NotSingleError> for NotSingleLegError {
952 fn from(e: NotSingleError) -> Self {
953 match e {
954 NotSingleError::None => Self::EmptyConfluxSet,
955 NotSingleError::Multiple => Self::IsMultipath,
956 }
957 }
958}
959
960/// Whether the specified `cmd` counts towards the conflux sequence numbers.
961fn cmd_counts_towards_seqno(cmd: RelayCmd) -> bool {
962 // Note: copy-pasted from c-tor
963 match cmd {
964 // These are all fine to multiplex, and must be so that ordering is preserved
965 RelayCmd::BEGIN | RelayCmd::DATA | RelayCmd::END | RelayCmd::CONNECTED => true,
966
967 // We can't multiplex these because they are circuit-specific
968 RelayCmd::SENDME
969 | RelayCmd::EXTEND
970 | RelayCmd::EXTENDED
971 | RelayCmd::TRUNCATE
972 | RelayCmd::TRUNCATED
973 | RelayCmd::DROP => false,
974
975 // We must multiplex RESOLVEs because their ordering impacts begin/end.
976 RelayCmd::RESOLVE | RelayCmd::RESOLVED => true,
977
978 // These are all circuit-specific
979 RelayCmd::BEGIN_DIR
980 | RelayCmd::EXTEND2
981 | RelayCmd::EXTENDED2
982 | RelayCmd::ESTABLISH_INTRO
983 | RelayCmd::ESTABLISH_RENDEZVOUS
984 | RelayCmd::INTRODUCE1
985 | RelayCmd::INTRODUCE2
986 | RelayCmd::RENDEZVOUS1
987 | RelayCmd::RENDEZVOUS2
988 | RelayCmd::INTRO_ESTABLISHED
989 | RelayCmd::RENDEZVOUS_ESTABLISHED
990 | RelayCmd::INTRODUCE_ACK
991 | RelayCmd::PADDING_NEGOTIATE
992 | RelayCmd::PADDING_NEGOTIATED => false,
993
994 // TODO(cc): XOFF/XON
995
996 // These two are not multiplexed, because they must be processed immediately
997 // to update sequence numbers before any other cells are processed on the circuit
998 RelayCmd::CONFLUX_SWITCH
999 | RelayCmd::CONFLUX_LINK
1000 | RelayCmd::CONFLUX_LINKED
1001 | RelayCmd::CONFLUX_LINKED_ACK => false,
1002
1003 _ => {
1004 tracing::warn!("Conflux asked to multiplex unknown relay command {cmd}");
1005 false
1006 }
1007 }
1008}