tor_proto/client/reactor/conflux.rs
1//! Conflux-related functionality
2
3// TODO: replace Itertools::exactly_one() with a stdlib equivalent when there is one.
4//
5// See issue #48919 <https://github.com/rust-lang/rust/issues/48919>
6#![allow(unstable_name_collisions)]
7
8#[cfg(feature = "conflux")]
9pub(crate) mod msghandler;
10
11use std::pin::Pin;
12use std::sync::atomic::{self, AtomicU64};
13use std::sync::{Arc, Mutex};
14
15use futures::{FutureExt as _, StreamExt, select_biased};
16use itertools::Itertools;
17use itertools::structs::ExactlyOneError;
18use smallvec::{SmallVec, smallvec};
19use tor_rtcompat::{SleepProvider as _, SleepProviderExt as _};
20use tracing::{info, instrument, trace, warn};
21
22use tor_cell::relaycell::AnyRelayMsgOuter;
23use tor_error::{Bug, bad_api_usage, internal};
24use tor_linkspec::HasRelayIds as _;
25
26use crate::circuit::UniqId;
27use crate::circuit::circhop::SendRelayCell;
28use crate::client::circuit::TunnelMutableState;
29#[cfg(feature = "circ-padding")]
30use crate::client::circuit::padding::PaddingEvent;
31use crate::client::circuit::path::HopDetail;
32use crate::conflux::cmd_counts_towards_seqno;
33use crate::conflux::msghandler::{ConfluxStatus, RemoveLegReason};
34use crate::congestion::params::CongestionWindowParams;
35use crate::crypto::cell::HopNum;
36use crate::streammap;
37use crate::tunnel::TunnelId;
38use crate::util::err::ReactorError;
39use crate::util::poll_all::PollAll;
40use crate::util::tunnel_activity::TunnelActivity;
41
42use super::circuit::CircHop;
43use super::{Circuit, CircuitAction};
44
45#[cfg(feature = "conflux")]
46use {
47 crate::conflux::msghandler::ConfluxMsgHandler,
48 msghandler::ClientConfluxMsgHandler,
49 tor_cell::relaycell::conflux::{V1DesiredUx, V1LinkPayload, V1Nonce},
50 tor_cell::relaycell::msg::{ConfluxLink, ConfluxSwitch},
51};
52
53/// The maximum number of conflux legs to store in the conflux set SmallVec.
54///
55/// Attempting to store more legs will cause the SmallVec to spill to the heap.
56///
57/// Note: this value was picked arbitrarily and may not be suitable.
58const MAX_CONFLUX_LEGS: usize = 16;
59
60/// The number of futures we add to the per-circuit [`PollAll`] future in
61/// [`ConfluxSet::next_circ_action`].
62///
63/// Used for the SmallVec size estimate;
64const NUM_CIRC_FUTURES: usize = 2;
65
66/// The expected number of circuit actions to be returned from
67/// [`ConfluxSet::next_circ_action`]
68const CIRC_ACTION_COUNT: usize = MAX_CONFLUX_LEGS * NUM_CIRC_FUTURES;
69
70/// A set with one or more circuits.
71///
72/// ### Conflux set life cycle
73///
74/// Conflux sets are created by the reactor using [`ConfluxSet::new`].
75///
76/// Every `ConfluxSet` starts out as a single-path set consisting of a single 0-length circuit.
77///
78/// After constructing a `ConfluxSet`, the reactor will proceed to extend its (only) circuit.
79/// At this point, the `ConfluxSet` will be a single-path set with a single n-length circuit.
80///
81/// The reactor can then turn the `ConfluxSet` into a multi-path set
82/// (a multi-path set is a conflux set that contains more than 1 circuit).
83/// This is done using [`ConfluxSet::add_legs`], in response to a `CtrlMsg` sent
84/// by the reactor user (also referred to as the "conflux handshake initiator").
85/// After that, the conflux set is said to be a multi-path set with multiple N-length circuits.
86///
87/// Circuits can be removed from the set using [`ConfluxSet::remove`].
88///
89/// The lifetime of a `ConfluxSet` is tied to the lifetime of the reactor.
90/// When the reactor is dropped, its underlying `ConfluxSet` is dropped too.
91/// This can happen on an explicit shutdown request, or if a fatal error occurs.
92///
93/// Conversely, the `ConfluxSet` can also trigger a reactor shutdown.
94/// For example, if after being instructed to remove a circuit from the set
95/// using [`ConfluxSet::remove`], the set is completely depleted,
96/// the `ConfluxSet` will return a [`ReactorError::Shutdown`] error,
97/// which will cause the reactor to shut down.
98pub(super) struct ConfluxSet {
99 /// The unique identifier of the tunnel this conflux set belongs to.
100 ///
101 /// Used for setting the internal [`TunnelId`] of [`Circuit`]s
102 /// that gets used for logging purposes.
103 tunnel_id: TunnelId,
104 /// The circuits in this conflux set.
105 legs: SmallVec<[Circuit; MAX_CONFLUX_LEGS]>,
106 /// Tunnel state, shared with `ClientCirc`.
107 ///
108 /// Contains the [`MutableState`](super::MutableState) of each circuit in the set.
109 mutable: Arc<TunnelMutableState>,
110 /// The unique identifier of the primary leg
111 primary_id: UniqId,
112 /// The join point of the set, if this is a multi-path set.
113 ///
114 /// Initially the conflux set starts out as a single-path set with no join point.
115 /// When it is converted to a multipath set using [`add_legs`](Self::add_legs),
116 /// the join point is initialized to the last hop in the tunnel.
117 //
118 // TODO(#2017): for simplicity, we currently we force all legs to have the same length,
119 // to ensure the HopNum of the join point is the same for all of them.
120 //
121 // In the future we might want to relax this restriction.
122 join_point: Option<JoinPoint>,
123 /// The nonce associated with the circuits from this set.
124 #[cfg(feature = "conflux")]
125 nonce: V1Nonce,
126 /// The desired UX
127 #[cfg(feature = "conflux")]
128 desired_ux: V1DesiredUx,
129 /// The absolute sequence number of the last cell delivered to a stream.
130 ///
131 /// A clone of this is shared with each [`ConfluxMsgHandler`] created.
132 ///
133 /// When a message is received on a circuit leg, the `ConfluxMsgHandler`
134 /// of the leg compares the (leg-local) sequence number of the message
135 /// with this sequence number to determine whether the message is in-order.
136 ///
137 /// If the message is in-order, the `ConfluxMsgHandler` instructs the circuit
138 /// to deliver it to its corresponding stream.
139 ///
140 /// If the message is out-of-order, the `ConfluxMsgHandler` instructs the circuit
141 /// to instruct the reactor to buffer the message.
142 last_seq_delivered: Arc<AtomicU64>,
143 /// Whether we have selected our initial primary leg,
144 /// if this is a multipath conflux set.
145 selected_init_primary: bool,
146}
147
148/// The conflux join point.
149#[derive(Clone, derive_more::Debug)]
150struct JoinPoint {
151 /// The hop number.
152 hop: HopNum,
153 /// The [`HopDetail`] of the hop.
154 detail: HopDetail,
155 /// The stream map of the joint point, shared with each circuit leg.
156 #[debug(skip)]
157 streams: Arc<Mutex<streammap::StreamMap>>,
158}
159
160impl ConfluxSet {
161 /// Create a new conflux set, consisting of a single leg.
162 ///
163 /// Returns the newly created set and a reference to its [`TunnelMutableState`].
164 pub(super) fn new(
165 tunnel_id: TunnelId,
166 circuit_leg: Circuit,
167 ) -> (Self, Arc<TunnelMutableState>) {
168 let primary_id = circuit_leg.unique_id();
169 let circ_mutable = Arc::clone(circuit_leg.mutable());
170 let legs = smallvec![circuit_leg];
171 // Note: the join point is only set for multi-path tunnels
172 let join_point = None;
173
174 // TODO(#2035): read this from the consensus/config.
175 #[cfg(feature = "conflux")]
176 let desired_ux = V1DesiredUx::NO_OPINION;
177
178 let mutable = Arc::new(TunnelMutableState::default());
179 mutable.insert(primary_id, circ_mutable);
180
181 let set = Self {
182 tunnel_id,
183 legs,
184 primary_id,
185 join_point,
186 mutable: mutable.clone(),
187 #[cfg(feature = "conflux")]
188 nonce: V1Nonce::new(&mut rand::rng()),
189 #[cfg(feature = "conflux")]
190 desired_ux,
191 last_seq_delivered: Arc::new(AtomicU64::new(0)),
192 selected_init_primary: false,
193 };
194
195 (set, mutable)
196 }
197
198 /// Remove and return the only leg of this conflux set.
199 ///
200 /// Returns an error if there is more than one leg in the set,
201 /// or if called before any circuit legs are available.
202 ///
203 /// Calling this function will empty the [`ConfluxSet`].
204 pub(super) fn take_single_leg(&mut self) -> Result<Circuit, Bug> {
205 let circ = self
206 .legs
207 .iter()
208 .exactly_one()
209 .map_err(NotSingleLegError::from)?;
210 let circ_id = circ.unique_id();
211
212 debug_assert!(circ_id == self.primary_id);
213
214 self.remove_unchecked(circ_id)
215 }
216
217 /// Return a reference to the only leg of this conflux set,
218 /// along with the leg's ID.
219 ///
220 /// Returns an error if there is more than one leg in the set,
221 /// or if called before any circuit legs are available.
222 pub(super) fn single_leg(&self) -> Result<&Circuit, NotSingleLegError> {
223 Ok(self.legs.iter().exactly_one()?)
224 }
225
226 /// Return a mutable reference to the only leg of this conflux set,
227 /// along with the leg's ID.
228 ///
229 /// Returns an error if there is more than one leg in the set,
230 /// or if called before any circuit legs are available.
231 pub(super) fn single_leg_mut(&mut self) -> Result<&mut Circuit, NotSingleLegError> {
232 Ok(self.legs.iter_mut().exactly_one()?)
233 }
234
235 /// Return the primary leg of this conflux set.
236 ///
237 /// Returns an error if called before any circuit legs are available.
238 pub(super) fn primary_leg_mut(&mut self) -> Result<&mut Circuit, Bug> {
239 #[cfg(not(feature = "conflux"))]
240 if self.legs.len() > 1 {
241 return Err(internal!(
242 "got multipath tunnel, but conflux feature is disabled?!"
243 ));
244 }
245
246 if self.legs.is_empty() {
247 Err(bad_api_usage!(
248 "tried to get circuit leg before creating it?!"
249 ))
250 } else {
251 let circ = self
252 .leg_mut(self.primary_id)
253 .ok_or_else(|| internal!("conflux set is empty?!"))?;
254
255 Ok(circ)
256 }
257 }
258
259 /// Return a reference to the leg of this conflux set with the given id.
260 pub(super) fn leg(&self, leg_id: UniqId) -> Option<&Circuit> {
261 self.legs.iter().find(|circ| circ.unique_id() == leg_id)
262 }
263
264 /// Return a mutable reference to the leg of this conflux set with the given id.
265 pub(super) fn leg_mut(&mut self, leg_id: UniqId) -> Option<&mut Circuit> {
266 self.legs.iter_mut().find(|circ| circ.unique_id() == leg_id)
267 }
268
269 /// Return the number of legs in this conflux set.
270 pub(super) fn len(&self) -> usize {
271 self.legs.len()
272 }
273
274 /// Return whether this conflux set is empty.
275 pub(super) fn is_empty(&self) -> bool {
276 self.legs.len() == 0
277 }
278
279 /// Remove the specified leg from this conflux set.
280 ///
281 /// Returns an error if the given leg doesn't exist in the set.
282 ///
283 /// Returns an error instructing the reactor to perform a clean shutdown
284 /// ([`ReactorError::Shutdown`]), tearing down the entire [`ConfluxSet`], if
285 ///
286 /// * the set is depleted (empty) after removing the specified leg
287 /// * `leg` is currently the sending (primary) leg of this set
288 /// * the closed leg had the highest non-zero last_seq_recv/sent
289 /// * the closed leg had some in-progress data (inflight > cc_sendme_inc)
290 ///
291 /// We do not yet support resumption. See [2.4.3. Closing circuits] in prop329.
292 ///
293 /// [2.4.3. Closing circuits]: https://spec.torproject.org/proposals/329-traffic-splitting.html#243-closing-circuits
294 #[instrument(level = "trace", skip_all)]
295 pub(super) fn remove(&mut self, leg: UniqId) -> Result<Circuit, ReactorError> {
296 let circ = self.remove_unchecked(leg)?;
297
298 tracing::trace!(
299 circ_id = %circ.unique_id(),
300 "Circuit removed from conflux set"
301 );
302
303 self.mutable.remove(circ.unique_id());
304
305 if self.legs.is_empty() {
306 // TODO: log the tunnel ID
307 tracing::debug!("Conflux set is now empty, tunnel reactor shutting down");
308
309 // The last circuit in the set has just died, so the reactor should exit.
310 return Err(ReactorError::Shutdown);
311 }
312
313 if leg == self.primary_id {
314 // We have just removed our sending leg,
315 // so it's time to close the entire conflux set.
316 return Err(ReactorError::Shutdown);
317 }
318
319 cfg_if::cfg_if! {
320 if #[cfg(feature = "conflux")] {
321 self.remove_conflux(circ)
322 } else {
323 // Conflux is disabled, so we can't possibly continue running if the only
324 // leg in the tunnel is gone.
325 //
326 // Technically this should be unreachable (because of the is_empty()
327 // check above)
328 Err(internal!("Multiple legs in single-path tunnel?!").into())
329 }
330 }
331 }
332
333 /// Handle the removal of a circuit,
334 /// returning an error if the reactor needs to shut down.
335 #[cfg(feature = "conflux")]
336 fn remove_conflux(&self, circ: Circuit) -> Result<Circuit, ReactorError> {
337 let Some(status) = circ.conflux_status() else {
338 return Err(internal!("Found non-conflux circuit in conflux set?!").into());
339 };
340
341 // TODO(conflux): should the circmgr be notified about the leg removal?
342 //
343 // "For circuits that are unlinked, the origin SHOULD immediately relaunch a new leg when it
344 // is closed, subject to the limits in [SIDE_CHANNELS]."
345
346 // If we've reached this point and the conflux set is non-empty,
347 // it means it's a multi-path set.
348 //
349 // Time to check if we need to tear down the entire set.
350 match status {
351 ConfluxStatus::Unlinked => {
352 // This circuit hasn't yet begun the conflux handshake,
353 // so we can safely remove it from the set
354 Ok(circ)
355 }
356 ConfluxStatus::Pending | ConfluxStatus::Linked => {
357 let (circ_last_seq_recv, circ_last_seq_sent) =
358 (|| Ok::<_, ReactorError>((circ.last_seq_recv()?, circ.last_seq_sent()?)))()?;
359
360 // If the closed leg had the highest non-zero last_seq_recv/sent, close the set
361 if let Some(max_last_seq_recv) = self.max_last_seq_recv() {
362 if circ_last_seq_recv > max_last_seq_recv {
363 return Err(ReactorError::Shutdown);
364 }
365 }
366
367 if let Some(max_last_seq_sent) = self.max_last_seq_sent() {
368 if circ_last_seq_sent > max_last_seq_sent {
369 return Err(ReactorError::Shutdown);
370 }
371 }
372
373 let hop = self.join_point_hop(&circ)?;
374
375 let (inflight, cwnd) = (|| {
376 let ccontrol = hop.ccontrol();
377 let inflight = ccontrol.inflight()?;
378 let cwnd = ccontrol.cwnd()?;
379
380 Some((inflight, cwnd))
381 })()
382 .ok_or_else(|| {
383 internal!("Congestion control algorithm doesn't track inflight cells or cwnd?!")
384 })?;
385
386 // If data is in progress on the leg (inflight > cc_sendme_inc),
387 // then all legs must be closed
388 if inflight >= cwnd.params().sendme_inc() {
389 return Err(ReactorError::Shutdown);
390 }
391
392 Ok(circ)
393 }
394 }
395 }
396
397 /// Return the maximum relative last_seq_recv across all circuits.
398 #[cfg(feature = "conflux")]
399 fn max_last_seq_recv(&self) -> Option<u64> {
400 self.legs
401 .iter()
402 .filter_map(|leg| leg.last_seq_recv().ok())
403 .max()
404 }
405
406 /// Return the maximum relative last_seq_sent across all circuits.
407 #[cfg(feature = "conflux")]
408 fn max_last_seq_sent(&self) -> Option<u64> {
409 self.legs
410 .iter()
411 .filter_map(|leg| leg.last_seq_sent().ok())
412 .max()
413 }
414
415 /// Get the [`CircHop`] of the join point on the specified `circ`,
416 /// returning an error if this is a single path conflux set.
417 fn join_point_hop<'c>(&self, circ: &'c Circuit) -> Result<&'c CircHop, Bug> {
418 let Some(join_point) = self.join_point.as_ref().map(|p| p.hop) else {
419 return Err(internal!("No join point on conflux tunnel?!"));
420 };
421
422 circ.hop(join_point)
423 .ok_or_else(|| internal!("Conflux join point disappeared?!"))
424 }
425
426 /// Return an iterator of all circuits in the conflux set.
427 fn circuits(&self) -> impl Iterator<Item = &Circuit> {
428 self.legs.iter()
429 }
430
431 /// Return the most active [`TunnelActivity`] for any leg of this `ConfluxSet`.
432 pub(super) fn tunnel_activity(&self) -> TunnelActivity {
433 self.circuits()
434 .map(|c| c.hops.tunnel_activity())
435 .max()
436 .unwrap_or_else(TunnelActivity::never_used)
437 }
438
439 /// Add legs to the this conflux set.
440 ///
441 /// Returns an error if any of the legs is invalid.
442 ///
443 /// A leg is considered valid if
444 ///
445 /// * the circuit has the same length as all the other circuits in the set
446 /// * its last hop is equal to the designated join point
447 /// * the circuit has no streams attached to any of its hops
448 /// * the circuit is not already part of a conflux set
449 ///
450 /// Note: the circuits will not begin linking until
451 /// [`link_circuits`](Self::link_circuits) is called.
452 ///
453 /// IMPORTANT: this function does not prevent the construction of conflux sets
454 /// where the circuit legs share guard or middle relays. It is the responsibility
455 /// of the caller to enforce the following invariant from prop354:
456 ///
457 /// "If building a conflux leg: Reject any circuits that have the same Guard as the other conflux
458 /// "leg(s) in the current conflux set, EXCEPT when one of the primary Guards is also the chosen
459 /// "Exit of this conflux set (in which case, re-use the non-Exit Guard)."
460 ///
461 /// This is because at this level we don't actually know which relays are the guards,
462 /// so we can't know if the join point happens to be one of the Guard + Exit relays.
463 #[cfg(feature = "conflux")]
464 pub(super) fn add_legs(
465 &mut self,
466 legs: Vec<Circuit>,
467 runtime: &tor_rtcompat::DynTimeProvider,
468 ) -> Result<(), Bug> {
469 if legs.is_empty() {
470 return Err(bad_api_usage!("asked to add empty leg list to conflux set"));
471 }
472
473 let join_point = match self.join_point.take() {
474 Some(p) => {
475 // Preserve the existing join point, if there is one.
476 p
477 }
478 None => {
479 let (hop, detail, streams) = (|| {
480 let first_leg = self.circuits().next()?;
481 let first_leg_path = first_leg.path();
482 let all_hops = first_leg_path.all_hops();
483 let hop_num = first_leg.last_hop_num()?;
484 let detail = all_hops.last()?;
485 let hop = first_leg.hop(hop_num)?;
486 let streams = Arc::clone(hop.stream_map());
487 Some((hop_num, detail.clone(), streams))
488 })()
489 .ok_or_else(|| bad_api_usage!("asked to join circuit with no hops"))?;
490
491 JoinPoint {
492 hop,
493 detail,
494 streams,
495 }
496 }
497 };
498
499 // Check two HopDetails for equality.
500 //
501 // Returns an error if one of the hops is virtual.
502 let hops_eq = |h1: &HopDetail, h2: &HopDetail| {
503 match (h1, h2) {
504 (HopDetail::Relay(t1), HopDetail::Relay(t2)) => Ok(t1.same_relay_ids(t2)),
505 #[cfg(feature = "hs-common")]
506 (HopDetail::Virtual, HopDetail::Virtual) => {
507 // TODO(#2016): support onion service conflux
508 Err(internal!("onion service conflux not supported"))
509 }
510 _ => Ok(false),
511 }
512 };
513
514 // A leg is considered valid if
515 //
516 // * the circuit has the expected length
517 // (the length of the first circuit we added to the set)
518 // * its last hop is equal to the designated join point
519 // (the last hop of the first circuit we added)
520 // * the circuit has no streams attached to any of its hops
521 // * the circuit is not already part of a conflux tunnel
522 //
523 // Returns an error if any hops are virtual.
524 let leg_is_valid = |leg: &Circuit| -> Result<bool, Bug> {
525 use crate::ccparams::Algorithm;
526
527 let path = leg.path();
528 let Some(last_hop) = path.all_hops().last() else {
529 // A circuit with no hops is invalid
530 return Ok(false);
531 };
532
533 // TODO: this sort of duplicates the check above.
534 // The difference is that above we read the hop detail
535 // information from the circuit Path, whereas here we get
536 // the actual last CircHop of the circuit.
537 let Some(last_hop_num) = leg.last_hop_num() else {
538 // A circuit with no hops is invalid
539 return Ok(false);
540 };
541
542 let circhop = leg
543 .hop(last_hop_num)
544 .ok_or_else(|| internal!("hop disappeared?!"))?;
545
546 // Ensure we negotiated a suitable cc algorithm
547 let is_cc_suitable = match circhop.ccontrol().algorithm() {
548 Algorithm::FixedWindow(_) => false,
549 Algorithm::Vegas(_) => true,
550 };
551
552 if !is_cc_suitable {
553 return Ok(false);
554 }
555
556 Ok(last_hop_num == join_point.hop
557 && hops_eq(last_hop, &join_point.detail)?
558 && !leg.has_streams()
559 && leg.conflux_status().is_none())
560 };
561
562 for leg in &legs {
563 if !leg_is_valid(leg)? {
564 return Err(bad_api_usage!("one more more conflux circuits are invalid"));
565 }
566 }
567
568 // Select a join point, or put the existing one back into self.
569 self.join_point = Some(join_point.clone());
570
571 // The legs are valid, so add them to the set.
572 for circ in legs {
573 let mutable = Arc::clone(circ.mutable());
574 let unique_id = circ.unique_id();
575 self.legs.push(circ);
576 // Merge the mutable state of the circuit into our tunnel state.
577 self.mutable.insert(unique_id, mutable);
578 }
579
580 let cwnd_params = self.cwnd_params()?;
581 for circ in self.legs.iter_mut() {
582 // The circuits that have a None status don't know they're part of
583 // a multi-path tunnel yet. They need to be initialized with a
584 // conflux message handler, and have their join point fixed up
585 // to share a stream map with the join point on all the other circuits.
586 if circ.conflux_status().is_none() {
587 let handler = Box::new(ClientConfluxMsgHandler::new(
588 join_point.hop,
589 self.nonce,
590 Arc::clone(&self.last_seq_delivered),
591 cwnd_params,
592 runtime.clone(),
593 ));
594 let conflux_handler =
595 ConfluxMsgHandler::new(handler, Arc::clone(&self.last_seq_delivered));
596
597 circ.add_to_conflux_tunnel(self.tunnel_id, conflux_handler);
598
599 // Ensure the stream map of the last hop is shared by all the legs
600 let last_hop = circ
601 .hop_mut(join_point.hop)
602 .ok_or_else(|| bad_api_usage!("asked to join circuit with no hops"))?;
603 last_hop.set_stream_map(Arc::clone(&join_point.streams))?;
604 }
605 }
606
607 Ok(())
608 }
609
610 /// Get the [`CongestionWindowParams`] of the join point
611 /// on the first leg.
612 ///
613 /// Returns an error if the congestion control algorithm
614 /// doesn't have a congestion control window object,
615 /// or if the conflux set is empty, or the joint point hop
616 /// does not exist.
617 ///
618 // TODO: this function is a bit of a hack. In reality, we only
619 // need the cc_cwnd_init parameter (for SWITCH seqno validation).
620 // The fact that we obtain it from the cc params of the join point
621 // is an implementation detail (it's a workaround for the fact that
622 // at this point, these params can only obtained from a CircHop)
623 #[cfg(feature = "conflux")]
624 fn cwnd_params(&self) -> Result<CongestionWindowParams, Bug> {
625 let primary_leg = self
626 .leg(self.primary_id)
627 .ok_or_else(|| internal!("no primary leg?!"))?;
628 let join_point = self.join_point_hop(primary_leg)?;
629 let ccontrol = join_point.ccontrol();
630 let cwnd = ccontrol
631 .cwnd()
632 .ok_or_else(|| internal!("congestion control algorithm does not track the cwnd?!"))?;
633
634 Ok(*cwnd.params())
635 }
636
637 /// Try to update the primary leg based on the configured desired UX,
638 /// if needed.
639 ///
640 /// Returns the SWITCH cell to send on the primary leg,
641 /// if we switched primary leg.
642 #[cfg(feature = "conflux")]
643 pub(super) fn maybe_update_primary_leg(&mut self) -> crate::Result<Option<SendRelayCell>> {
644 use tor_error::into_internal;
645
646 let Some(join_point) = self.join_point.as_ref() else {
647 // Return early if this is not a multi-path tunnel
648 return Ok(None);
649 };
650
651 let join_point = join_point.hop;
652
653 if !self.should_update_primary_leg() {
654 // Nothing to do
655 return Ok(None);
656 }
657
658 let Some(new_primary_id) = self.select_primary_leg()? else {
659 // None of the legs satisfy our UX requirements, continue using the existing one.
660 return Ok(None);
661 };
662
663 // Check that the newly selected leg is actually different from the previous
664 if self.primary_id == new_primary_id {
665 // The primary leg stays the same, nothing to do.
666 return Ok(None);
667 }
668
669 let prev_last_seq_sent = self.primary_leg_mut()?.last_seq_sent()?;
670 self.primary_id = new_primary_id;
671 let new_last_seq_sent = self.primary_leg_mut()?.last_seq_sent()?;
672
673 // If this fails, it means we haven't updated our primary leg in a very long time.
674 //
675 // TODO(#2036): there are currently no safeguards to prevent us from staying
676 // on the same leg for "too long". Perhaps we should design should_update_primary_leg()
677 // such that it forces us to switch legs periodically, to prevent the seqno delta from
678 // getting too big?
679 let seqno_delta = u32::try_from(prev_last_seq_sent - new_last_seq_sent).map_err(
680 into_internal!("Seqno delta for switch does not fit in u32?!"),
681 )?;
682
683 // We need to carry the last_seq_sent over to the next leg
684 // (the next cell sent will have seqno = prev_last_seq_sent + 1)
685 self.primary_leg_mut()?
686 .set_last_seq_sent(prev_last_seq_sent)?;
687
688 let switch = ConfluxSwitch::new(seqno_delta);
689 let cell = AnyRelayMsgOuter::new(None, switch.into());
690 Ok(Some(SendRelayCell {
691 hop: Some(join_point),
692 early: false,
693 cell,
694 }))
695 }
696
697 /// Whether it's time to select a new primary leg.
698 #[cfg(feature = "conflux")]
699 fn should_update_primary_leg(&mut self) -> bool {
700 if !self.selected_init_primary {
701 self.maybe_select_init_primary();
702 return false;
703 }
704
705 // If we don't have at least 2 legs,
706 // we can't switch our primary leg.
707 if self.legs.len() < 2 {
708 return false;
709 }
710
711 // TODO(conflux-tuning): if it turns out we switch legs too frequently,
712 // we might want to implement some sort of rate-limiting here
713 // (see c-tor's conflux_can_switch).
714
715 true
716 }
717
718 /// Return the best leg according to the configured desired UX.
719 ///
720 /// Returns `None` if no suitable leg was found.
721 #[cfg(feature = "conflux")]
722 fn select_primary_leg(&self) -> Result<Option<UniqId>, Bug> {
723 match self.desired_ux {
724 V1DesiredUx::NO_OPINION | V1DesiredUx::MIN_LATENCY => {
725 self.select_primary_leg_min_rtt(false)
726 }
727 V1DesiredUx::HIGH_THROUGHPUT => self.select_primary_leg_min_rtt(true),
728 V1DesiredUx::LOW_MEM_LATENCY | V1DesiredUx::LOW_MEM_THROUGHPUT => {
729 // TODO(conflux-tuning): add support for low-memory algorithms
730 self.select_primary_leg_min_rtt(false)
731 }
732 _ => {
733 // Default to MIN_RTT if we don't recognize the desired UX value
734 warn!(
735 tunnel_id = %self.tunnel_id,
736 "Ignoring unrecognized conflux desired UX {}, using MIN_LATENCY",
737 self.desired_ux
738 );
739 self.select_primary_leg_min_rtt(false)
740 }
741 }
742 }
743
744 /// Try to choose an initial primary leg, if we have an initial RTT measurement
745 /// for at least one of the legs.
746 #[cfg(feature = "conflux")]
747 fn maybe_select_init_primary(&mut self) {
748 let best = self
749 .legs
750 .iter()
751 .filter_map(|leg| leg.init_rtt().map(|rtt| (leg, rtt)))
752 .min_by_key(|(_leg, rtt)| *rtt)
753 .map(|(leg, _rtt)| leg.unique_id());
754
755 if let Some(best) = best {
756 self.primary_id = best;
757 self.selected_init_primary = true;
758 }
759 }
760
761 /// Return the leg with the best (lowest) RTT.
762 ///
763 /// If `check_can_send` is true, selects the lowest RTT leg that is ready to send.
764 ///
765 /// Returns `None` if no suitable leg was found.
766 #[cfg(feature = "conflux")]
767 fn select_primary_leg_min_rtt(&self, check_can_send: bool) -> Result<Option<UniqId>, Bug> {
768 let mut best = None;
769
770 for circ in self.legs.iter() {
771 let leg_id = circ.unique_id();
772 let join_point = self.join_point_hop(circ)?;
773 let ccontrol = join_point.ccontrol();
774
775 if check_can_send && !ccontrol.can_send() {
776 continue;
777 }
778
779 let rtt = ccontrol.rtt();
780 let init_rtt_usec = || {
781 circ.init_rtt()
782 .map(|rtt| u32::try_from(rtt.as_micros()).unwrap_or(u32::MAX))
783 };
784
785 let Some(ewma_rtt) = rtt.ewma_rtt_usec().or_else(init_rtt_usec) else {
786 return Err(internal!(
787 "attempted to select primary leg before handshake completed?!"
788 ));
789 };
790
791 match best.take() {
792 None => {
793 best = Some((leg_id, ewma_rtt));
794 }
795 Some(best_so_far) => {
796 if best_so_far.1 <= ewma_rtt {
797 best = Some(best_so_far);
798 } else {
799 best = Some((leg_id, ewma_rtt));
800 }
801 }
802 }
803 }
804
805 Ok(best.map(|(leg_id, _)| leg_id))
806 }
807
808 /// Returns `true` if our conflux join point is blocked on congestion control
809 /// on the specified `circuit`.
810 ///
811 /// Returns `false` if the join point is not blocked on cc,
812 /// or if this is a single-path set.
813 ///
814 /// Returns an error if this is a multipath tunnel,
815 /// but the joint point hop doesn't exist on the specified circuit.
816 #[cfg(feature = "conflux")]
817 fn is_join_point_blocked_on_cc(join_hop: HopNum, circuit: &Circuit) -> Result<bool, Bug> {
818 let join_circhop = circuit.hop(join_hop).ok_or_else(|| {
819 internal!(
820 "Join point hop {} not found on circuit {}?!",
821 join_hop.display(),
822 circuit.unique_id(),
823 )
824 })?;
825
826 Ok(!join_circhop.ccontrol().can_send())
827 }
828
829 /// Returns whether [`next_circ_action`](Self::next_circ_action)
830 /// should avoid polling the join point streams entirely.
831 #[cfg(feature = "conflux")]
832 fn should_skip_join_point(&self) -> Result<bool, Bug> {
833 let Some(primary_join_point) = self.primary_join_point() else {
834 // Single-path, there is no join point
835 return Ok(false);
836 };
837
838 let join_hop = primary_join_point.1;
839 let primary_blocked_on_cc = {
840 let primary = self
841 .leg(self.primary_id)
842 .ok_or_else(|| internal!("primary leg disappeared?!"))?;
843 Self::is_join_point_blocked_on_cc(join_hop, primary)?
844 };
845
846 if !primary_blocked_on_cc {
847 // Easy, we can just carry on
848 return Ok(false);
849 }
850
851 // Now, if the primary *is* blocked on cc, we may still be able to poll
852 // the join point streams (if we're using the right desired UX)
853 let should_skip = if self.desired_ux != V1DesiredUx::HIGH_THROUGHPUT {
854 // The primary leg is blocked on cc, and we can't switch because we're
855 // not using the high throughput algorithm, so we must stop reading
856 // the join point streams.
857 //
858 // Note: if the selected algorithm is HIGH_THROUGHPUT,
859 // it's okay to continue reading from the edge connection,
860 // because maybe_update_primary_leg() will select a new,
861 // non-blocked primary leg, just before sending.
862 trace!(
863 tunnel_id = %self.tunnel_id,
864 join_point = ?primary_join_point,
865 reason = "sending leg blocked on congestion control",
866 "Pausing join point stream reads"
867 );
868
869 true
870 } else {
871 // Ah-ha, the desired UX is HIGH_THROUGHPUT, which means we can switch
872 // to an unblocked leg before sending any cells over the join point,
873 // as long as there are some unblocked legs.
874
875 // TODO: figure out how to rewrite this with an idiomatic iterator combinator
876 let mut all_blocked_on_cc = true;
877 for leg in &self.legs {
878 all_blocked_on_cc = Self::is_join_point_blocked_on_cc(join_hop, leg)?;
879 if !all_blocked_on_cc {
880 break;
881 }
882 }
883
884 if all_blocked_on_cc {
885 // All legs are blocked on cc, so we must stop reading from
886 // the join point streams for now.
887 trace!(
888 tunnel_id = %self.tunnel_id,
889 join_point = ?primary_join_point,
890 reason = "all legs blocked on congestion control",
891 "Pausing join point stream reads"
892 );
893
894 true
895 } else {
896 // At least one leg is not blocked, so we can continue reading
897 // from the join point streams
898 false
899 }
900 };
901
902 Ok(should_skip)
903 }
904
905 /// Returns the next ready [`CircuitAction`],
906 /// obtained from processing the incoming/outgoing messages on all the circuits in this set.
907 ///
908 /// Will return an error if there are no circuits in this set,
909 /// or other internal errors occur.
910 ///
911 /// This is cancellation-safe.
912 #[allow(clippy::unnecessary_wraps)] // Can return Err if conflux is enabled
913 #[instrument(level = "trace", skip_all)]
914 pub(super) async fn next_circ_action(
915 &mut self,
916 runtime: &tor_rtcompat::DynTimeProvider,
917 ) -> Result<SmallVec<[CircuitAction; CIRC_ACTION_COUNT]>, crate::Error> {
918 // Avoid polling the streams on the join point if our primary
919 // leg is blocked on cc
920 cfg_if::cfg_if! {
921 if #[cfg(feature = "conflux")] {
922 let mut should_poll_join_point = !self.should_skip_join_point()?;
923 } else {
924 let mut should_poll_join_point = true;
925 }
926 };
927 let join_point = self.primary_join_point().map(|join_point| join_point.1);
928
929 // Each circuit leg has a PollAll future (see poll_all_circ below)
930 // that drives two futures: one that reads from input channel,
931 // and another drives the application streams.
932 //
933 // *This* PollAll drives the PollAll futures of all circuit legs in lockstep,
934 // ensuring they all get a chance to make some progress on every reactor iteration.
935 //
936 // IMPORTANT: if you want to push additional futures into this,
937 // bear in mind that the ordering matters!
938 // If multiple futures resolve at the same time, their results will be processed
939 // in the order their corresponding futures were inserted into `PollAll`.
940 // So if futures A and B resolve at the same time, and future A was pushed
941 // into `PollAll` before future B, the result of future A will come
942 // before future B's result in the result list returned by poll_all.await.
943 //
944 // This means that the actions corresponding to the first circuit in the tunnel
945 // will be executed first, followed by the actions issued by the next circuit,
946 // and s on.
947 //
948 let mut poll_all =
949 PollAll::<MAX_CONFLUX_LEGS, SmallVec<[CircuitAction; NUM_CIRC_FUTURES]>>::new();
950
951 for leg in &mut self.legs {
952 let unique_id = leg.unique_id();
953 let tunnel_id = self.tunnel_id;
954 let runtime = runtime.clone();
955
956 // Garbage-collect all halfstreams that have expired.
957 //
958 // Note: this will iterate over the closed streams of all hops.
959 // If we think this will cause perf issues, one idea would be to make
960 // StreamMap::closed_streams into a min-heap, and add a branch to the
961 // select_biased! below to sleep until the first expiry is due
962 // (but my gut feeling is that iterating is cheaper)
963 leg.remove_expired_halfstreams(runtime.now());
964
965 // The client SHOULD abandon and close circuit if the LINKED message takes too long to
966 // arrive. This timeout MUST be no larger than the normal SOCKS/stream timeout in use for
967 // RELAY_BEGIN, but MAY be the Circuit Build Timeout value, instead. (The C-Tor
968 // implementation currently uses Circuit Build Timeout).
969 let conflux_hs_timeout = leg.conflux_hs_timeout();
970
971 let mut poll_all_circ = PollAll::<NUM_CIRC_FUTURES, CircuitAction>::new();
972
973 let input = leg.input.next().map(move |res| match res {
974 Some(cell) => CircuitAction::HandleCell {
975 leg: unique_id,
976 cell,
977 },
978 None => CircuitAction::RemoveLeg {
979 leg: unique_id,
980 reason: RemoveLegReason::ChannelClosed,
981 },
982 });
983 poll_all_circ.push(input);
984
985 // This future resolves when the chan_sender sink (i.e. the outgoing TCP connection)
986 // becomes ready. We need it inside the next_ready_stream future below,
987 // to prevent reading from the application streams before we are ready to send.
988 let chan_ready_fut = futures::future::poll_fn(|cx| {
989 use futures::Sink as _;
990
991 // Ensure the chan sender sink is ready before polling the ready streams.
992 Pin::new(&mut leg.chan_sender).poll_ready(cx)
993 });
994
995 let exclude_hop = if should_poll_join_point {
996 // Avoid polling the join point more than once per reactor loop.
997 should_poll_join_point = false;
998 None
999 } else {
1000 join_point
1001 };
1002
1003 let mut ready_streams = leg.hops.ready_streams_iterator(exclude_hop);
1004 let next_ready_stream = async move {
1005 // Avoid polling the application streams if the outgoing sink is blocked
1006 let _ = chan_ready_fut.await;
1007
1008 match ready_streams.next().await {
1009 Some(x) => x,
1010 None => {
1011 info!(circ_id=%unique_id, "no ready streams (maybe blocked on cc?)");
1012 // There are no ready streams (for example, they may all be
1013 // blocked due to congestion control), so there is nothing
1014 // to do.
1015 // We await an infinitely pending future so that we don't
1016 // immediately return a `None` in the `select_biased!` below.
1017 // We'd rather wait on `input.next()` than immediately return with
1018 // no `CircuitAction`, which could put the reactor into a spin loop.
1019 let () = std::future::pending().await;
1020 unreachable!();
1021 }
1022 }
1023 };
1024
1025 poll_all_circ.push(next_ready_stream.map(move |cmd| CircuitAction::RunCmd {
1026 leg: unique_id,
1027 cmd,
1028 }));
1029
1030 let mut next_padding_event_fut = leg.padding_event_stream.next();
1031
1032 // This selects between 3 actions that cannot be handled concurrently.
1033 //
1034 // If the conflux handshake times out, we need to remove the circuit leg
1035 // (any pending padding events or application stream data should be discarded;
1036 // in fact, there shouldn't even be any open streams on circuits that are
1037 // in the conflux handshake phase).
1038 //
1039 // If there's a padding event, we need to handle it immediately,
1040 // because it might tell us to start blocking the chan_sender sink,
1041 // which, in turn, means we need to stop trying to read from the application streams.
1042 poll_all.push(
1043 async move {
1044 let conflux_hs_timeout = if let Some(timeout) = conflux_hs_timeout {
1045 // TODO: ask Diziet if we can have a sleep_until_instant() function
1046 Box::pin(runtime.sleep_until_wallclock(timeout))
1047 as Pin<Box<dyn Future<Output = ()> + Send>>
1048 } else {
1049 Box::pin(std::future::pending())
1050 };
1051 select_biased! {
1052 () = conflux_hs_timeout.fuse() => {
1053 warn!(
1054 tunnel_id = %tunnel_id,
1055 circ_id = %unique_id,
1056 "Conflux handshake timed out on circuit"
1057 );
1058
1059 // Conflux handshake has timed out, time to remove this circuit leg,
1060 // and notify the handshake initiator.
1061 smallvec![CircuitAction::RemoveLeg {
1062 leg: unique_id,
1063 reason: RemoveLegReason::ConfluxHandshakeTimeout,
1064 }]
1065 }
1066 padding_action = next_padding_event_fut => {
1067 smallvec![CircuitAction::PaddingAction {
1068 leg: unique_id,
1069 padding_action:
1070 padding_action.expect("PaddingEventStream, surprisingly, was terminated!"),
1071 }]
1072 }
1073 ret = poll_all_circ.fuse() => ret,
1074 }
1075 }
1076 );
1077 }
1078
1079 // Flatten the nested SmallVecs to simplify the calling code
1080 // (which will handle all the returned actions sequentially).
1081 Ok(poll_all.await.into_iter().flatten().collect())
1082 }
1083
1084 /// The join point on the current primary leg.
1085 pub(super) fn primary_join_point(&self) -> Option<(UniqId, HopNum)> {
1086 self.join_point
1087 .as_ref()
1088 .map(|join_point| (self.primary_id, join_point.hop))
1089 }
1090
1091 /// Does congestion control use stream SENDMEs for the given hop?
1092 ///
1093 /// Returns `None` if either the `leg` or `hop` don't exist.
1094 pub(super) fn uses_stream_sendme(&self, leg: UniqId, hop: HopNum) -> Option<bool> {
1095 self.leg(leg)?.uses_stream_sendme(hop)
1096 }
1097
1098 /// Encode `msg`, encrypt it, and send it to the 'hop'th hop.
1099 ///
1100 /// See [`Circuit::send_relay_cell`].
1101 #[instrument(level = "trace", skip_all)]
1102 pub(super) async fn send_relay_cell_on_leg(
1103 &mut self,
1104 msg: SendRelayCell,
1105 leg: Option<UniqId>,
1106 ) -> crate::Result<()> {
1107 let conflux_join_point = self.join_point.as_ref().map(|join_point| join_point.hop);
1108 let leg = if let Some(join_point) = conflux_join_point {
1109 let hop = msg.hop.expect("missing hop in client SendRelayCell?!");
1110 // Conflux circuits always send multiplexed relay commands to
1111 // to the last hop (the join point).
1112 if cmd_counts_towards_seqno(msg.cell.cmd()) {
1113 if hop != join_point {
1114 // For leaky pipe, we must continue using the original leg
1115 leg
1116 } else {
1117 let old_primary_leg = self.primary_id;
1118 // Check if it's time to switch our primary leg.
1119 #[cfg(feature = "conflux")]
1120 if let Some(switch_cell) = self.maybe_update_primary_leg()? {
1121 trace!(
1122 old = ?old_primary_leg,
1123 new = ?self.primary_id,
1124 "Switching primary conflux leg..."
1125 );
1126
1127 self.primary_leg_mut()?.send_relay_cell(switch_cell).await?;
1128 }
1129
1130 // Use the possibly updated primary leg
1131 Some(self.primary_id)
1132 }
1133 } else {
1134 // Non-multiplexed commands go on their original
1135 // circuit and hop
1136 leg
1137 }
1138 } else {
1139 // If there is no join point, it means this is not
1140 // a multi-path tunnel, so we continue using
1141 // the leg_id/hop the cmd came from.
1142 leg
1143 };
1144
1145 let leg = leg.unwrap_or(self.primary_id);
1146
1147 let circ = self
1148 .leg_mut(leg)
1149 .ok_or_else(|| internal!("leg disappeared?!"))?;
1150
1151 circ.send_relay_cell(msg).await
1152 }
1153
1154 /// Send a LINK cell down each unlinked leg.
1155 #[cfg(feature = "conflux")]
1156 pub(super) async fn link_circuits(
1157 &mut self,
1158 runtime: &tor_rtcompat::DynTimeProvider,
1159 ) -> crate::Result<()> {
1160 let (_leg_id, join_point) = self
1161 .primary_join_point()
1162 .ok_or_else(|| internal!("no join point when trying to send LINK"))?;
1163
1164 // Link all the circuits that haven't started the conflux handshake yet.
1165 for circ in self
1166 .legs
1167 .iter_mut()
1168 // TODO: it is an internal error if any of the legs don't have a conflux handler
1169 // (i.e. if conflux_status() returns None)
1170 .filter(|circ| circ.conflux_status() == Some(ConfluxStatus::Unlinked))
1171 {
1172 let v1_payload = V1LinkPayload::new(self.nonce, self.desired_ux);
1173 let link = ConfluxLink::new(v1_payload);
1174 let cell = AnyRelayMsgOuter::new(None, link.into());
1175
1176 circ.begin_conflux_link(join_point, cell, runtime).await?;
1177 }
1178
1179 // TODO(conflux): the caller should take care to not allow opening streams
1180 // until the conflux set is ready (i.e. until at least one of the legs completes
1181 // the handshake).
1182 //
1183 // We will probably need a channel for notifying the caller
1184 // of handshake completion/conflux set readiness
1185
1186 Ok(())
1187 }
1188
1189 /// Get the number of unlinked or non-conflux legs.
1190 #[cfg(feature = "conflux")]
1191 pub(super) fn num_unlinked(&self) -> usize {
1192 self.circuits()
1193 .filter(|circ| {
1194 let status = circ.conflux_status();
1195 status.is_none() || status == Some(ConfluxStatus::Unlinked)
1196 })
1197 .count()
1198 }
1199
1200 /// Check if the specified sequence number is the sequence number of the
1201 /// next message we're expecting to handle.
1202 pub(super) fn is_seqno_in_order(&self, seq_recv: u64) -> bool {
1203 let last_seq_delivered = self.last_seq_delivered.load(atomic::Ordering::Acquire);
1204 seq_recv == last_seq_delivered + 1
1205 }
1206
1207 /// Remove the circuit leg with the specified `UniqId` from this conflux set.
1208 ///
1209 /// Unlike [`ConfluxSet::remove`], this function does not check
1210 /// if the removal of the leg ought to trigger a reactor shutdown.
1211 ///
1212 /// Returns an error if the leg doesn't exit in the conflux set.
1213 fn remove_unchecked(&mut self, circ_id: UniqId) -> Result<Circuit, Bug> {
1214 let idx = self
1215 .legs
1216 .iter()
1217 .position(|circ| circ.unique_id() == circ_id)
1218 .ok_or_else(|| internal!("leg {circ_id:?} not found in conflux set"))?;
1219
1220 Ok(self.legs.remove(idx))
1221 }
1222
1223 /// Perform some circuit-padding-based action on the specified circuit.
1224 #[cfg(feature = "circ-padding")]
1225 pub(super) async fn run_padding_action(
1226 &mut self,
1227 circ_id: UniqId,
1228 padding_action: PaddingEvent,
1229 ) -> crate::Result<()> {
1230 use PaddingEvent as E;
1231 let Some(circ) = self.leg_mut(circ_id) else {
1232 // No such circuit; it must have gone away after generating this event.
1233 // Just ignore it.
1234 return Ok(());
1235 };
1236
1237 match padding_action {
1238 E::SendPadding(send_padding) => {
1239 circ.send_padding(send_padding).await?;
1240 }
1241 E::StartBlocking(start_blocking) => {
1242 circ.start_blocking_for_padding(start_blocking);
1243 }
1244 E::StopBlocking => {
1245 circ.stop_blocking_for_padding();
1246 }
1247 }
1248 Ok(())
1249 }
1250}
1251
1252/// An error returned when a method is expecting a single-leg conflux circuit,
1253/// but it is not single-leg.
1254#[derive(Clone, Debug, derive_more::Display, thiserror::Error)]
1255pub(super) struct NotSingleLegError(#[source] Bug);
1256
1257impl From<NotSingleLegError> for Bug {
1258 fn from(e: NotSingleLegError) -> Self {
1259 e.0
1260 }
1261}
1262
1263impl From<NotSingleLegError> for crate::Error {
1264 fn from(e: NotSingleLegError) -> Self {
1265 Self::from(e.0)
1266 }
1267}
1268
1269impl From<NotSingleLegError> for ReactorError {
1270 fn from(e: NotSingleLegError) -> Self {
1271 Self::from(e.0)
1272 }
1273}
1274
1275impl<I: Iterator> From<ExactlyOneError<I>> for NotSingleLegError {
1276 fn from(e: ExactlyOneError<I>) -> Self {
1277 // TODO: cannot wrap the ExactlyOneError with into_bad_api_usage
1278 // because it's not Send + Sync
1279 Self(bad_api_usage!("not a single leg conflux set ({e})"))
1280 }
1281}
1282
1283#[cfg(test)]
1284mod test {
1285 // Tested in [`crate::client::circuit::test`].
1286}