tor_proto/tunnel/reactor/
circuit.rs

1//! Module exposing types for representing circuits in the tunnel reactor.
2
3pub(crate) mod circhop;
4pub(super) mod create;
5pub(super) mod extender;
6
7use crate::channel::{Channel, ChannelSender};
8use crate::circuit::HopSettings;
9use crate::congestion::sendme;
10use crate::congestion::CongestionSignals;
11use crate::crypto::binding::CircuitBinding;
12use crate::crypto::cell::{
13    HopNum, InboundClientCrypt, InboundClientLayer, OutboundClientCrypt, OutboundClientLayer,
14    RelayCellBody,
15};
16use crate::crypto::handshake::fast::CreateFastClient;
17use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey};
18use crate::crypto::handshake::ntor_v3::{NtorV3Client, NtorV3PublicKey};
19use crate::crypto::handshake::{ClientHandshake, KeyGenerator};
20use crate::memquota::{CircuitAccount, SpecificAccount as _, StreamAccount};
21use crate::stream::{AnyCmdChecker, StreamStatus};
22use crate::tunnel::circuit::celltypes::{ClientCircChanMsg, CreateResponse};
23use crate::tunnel::circuit::handshake::{BoxedClientLayer, HandshakeRole};
24use crate::tunnel::circuit::path;
25use crate::tunnel::circuit::unique_id::UniqId;
26use crate::tunnel::circuit::{
27    CircuitRxReceiver, MutableState, StreamMpscReceiver, StreamMpscSender,
28};
29use crate::tunnel::handshake::RelayCryptLayerProtocol;
30use crate::tunnel::reactor::MetaCellDisposition;
31use crate::tunnel::streammap;
32use crate::tunnel::TunnelScopedCircId;
33use crate::util::err::ReactorError;
34use crate::util::sometimes_unbounded_sink::SometimesUnboundedSink;
35use crate::util::SinkExt as _;
36use crate::{ClockSkew, Error, Result};
37
38use tor_async_utils::{SinkTrySend as _, SinkTrySendError as _};
39use tor_cell::chancell::msg::{AnyChanMsg, HandshakeType, Relay};
40use tor_cell::chancell::{AnyChanCell, ChanCmd, CircId};
41use tor_cell::chancell::{BoxedCellBody, ChanMsg};
42use tor_cell::relaycell::extend::{CcRequest, CircRequestExt};
43use tor_cell::relaycell::msg::{AnyRelayMsg, End, Sendme, SendmeTag, Truncated};
44use tor_cell::relaycell::{
45    AnyRelayMsgOuter, RelayCellDecoderResult, RelayCellFormat, RelayCmd, StreamId, UnparsedRelayMsg,
46};
47use tor_error::{internal, Bug};
48use tor_linkspec::RelayIds;
49use tor_llcrypto::pk;
50use tor_memquota::mq_queue::{ChannelSpec as _, MpscSpec};
51
52use futures::{SinkExt as _, Stream};
53use oneshot_fused_workaround as oneshot;
54use safelog::sensitive as sv;
55use tracing::{debug, trace, warn};
56
57use super::{
58    CellHandlers, CircuitHandshake, CloseStreamBehavior, ReactorResultChannel, SendRelayCell,
59};
60
61use std::borrow::Borrow;
62use std::pin::Pin;
63use std::result::Result as StdResult;
64use std::sync::Arc;
65use std::task::Poll;
66use std::time::{Duration, SystemTime};
67
68use create::{Create2Wrap, CreateFastWrap, CreateHandshakeWrap};
69use extender::HandshakeAuxDataHandler;
70
71#[cfg(feature = "hs-service")]
72use {
73    crate::stream::{DataCmdChecker, IncomingStreamRequest},
74    tor_cell::relaycell::msg::Begin,
75};
76
77#[cfg(feature = "conflux")]
78use {
79    super::conflux::ConfluxMsgHandler,
80    super::conflux::{ConfluxAction, OooRelayMsg},
81    crate::tunnel::reactor::RemoveLegReason,
82    crate::tunnel::TunnelId,
83};
84
85pub(super) use circhop::{CircHop, CircHopList};
86
87/// Initial value for outbound flow-control window on streams.
88pub(super) const SEND_WINDOW_INIT: u16 = 500;
89/// Initial value for inbound flow-control window on streams.
90pub(crate) const RECV_WINDOW_INIT: u16 = 500;
91/// Size of the buffer used between the reactor and a `StreamReader`.
92///
93/// FIXME(eta): We pick 2× the receive window, which is very conservative (we arguably shouldn't
94///             get sent more than the receive window anyway!). We might do due to things that
95///             don't count towards the window though.
96pub(crate) const STREAM_READER_BUFFER: usize = (2 * RECV_WINDOW_INIT) as usize;
97
98/// A circuit "leg" from a tunnel.
99///
100/// Regular (non-multipath) circuits have a single leg.
101/// Conflux (multipath) circuits have `N` (usually, `N = 2`).
102pub(crate) struct Circuit {
103    /// The channel this circuit is attached to.
104    channel: Arc<Channel>,
105    /// Sender object used to actually send cells.
106    ///
107    /// NOTE: Control messages could potentially add unboundedly to this, although that's
108    ///       not likely to happen (and isn't triggereable from the network, either).
109    pub(super) chan_sender: SometimesUnboundedSink<AnyChanCell, ChannelSender>,
110    /// Input stream, on which we receive ChanMsg objects from this circuit's
111    /// channel.
112    ///
113    // TODO: could use a SPSC channel here instead.
114    pub(super) input: CircuitRxReceiver,
115    /// The cryptographic state for this circuit for inbound cells.
116    /// This object is divided into multiple layers, each of which is
117    /// shared with one hop of the circuit.
118    crypto_in: InboundClientCrypt,
119    /// The cryptographic state for this circuit for outbound cells.
120    crypto_out: OutboundClientCrypt,
121    /// List of hops state objects used by the reactor
122    hops: CircHopList,
123    /// Mutable information about this circuit,
124    /// shared with the reactor's `ConfluxSet`.
125    mutable: Arc<MutableState>,
126    /// This circuit's identifier on the upstream channel.
127    channel_id: CircId,
128    /// An identifier for logging about this reactor's circuit.
129    unique_id: TunnelScopedCircId,
130    /// A handler for conflux cells.
131    ///
132    /// Set once the conflux handshake is initiated by the reactor
133    /// using [`Reactor::handle_link_circuits`](super::Reactor::handle_link_circuits).
134    #[cfg(feature = "conflux")]
135    conflux_handler: Option<ConfluxMsgHandler>,
136    /// Memory quota account
137    #[allow(dead_code)] // Partly here to keep it alive as long as the circuit
138    memquota: CircuitAccount,
139}
140
141/// A command to run in response to a circuit event.
142///
143/// Unlike `RunOnceCmdInner`, doesn't know anything about `UniqId`s.
144/// The user of the `CircuitCmd`s is supposed to know the `UniqId`
145/// of the circuit the `CircuitCmd` came from.
146///
147/// This type gets mapped to a `RunOnceCmdInner` in the circuit reactor.
148#[derive(Debug)]
149pub(super) enum CircuitCmd {
150    /// Send a RELAY cell on the circuit leg this command originates from.
151    Send(SendRelayCell),
152    /// Handle a SENDME message received on the circuit leg this command originates from.
153    HandleSendMe {
154        /// The hop number.
155        hop: HopNum,
156        /// The SENDME message to handle.
157        sendme: Sendme,
158    },
159    /// Close the specified stream on the circuit leg this command originates from.
160    CloseStream {
161        /// The hop number.
162        hop: HopNum,
163        /// The ID of the stream to close.
164        sid: StreamId,
165        /// The stream-closing behavior.
166        behav: CloseStreamBehavior,
167        /// The reason for closing the stream.
168        reason: streammap::TerminateReason,
169    },
170    /// Remove this circuit from the conflux set.
171    ///
172    /// Returned by `ConfluxMsgHandler::handle_conflux_msg` for invalid messages
173    /// (originating from wrong hop), and for messages that are rejected
174    /// by its inner `AbstractMsgHandler`.
175    #[cfg(feature = "conflux")]
176    ConfluxRemove(RemoveLegReason),
177    /// This circuit has completed the conflux handshake,
178    /// and wants to send the specified cell.
179    ///
180    /// Returned by an `AbstractMsgHandler` to signal to the reactor that
181    /// the conflux handshake is complete.
182    #[cfg(feature = "conflux")]
183    ConfluxHandshakeComplete(SendRelayCell),
184    /// Perform a clean shutdown on this circuit.
185    CleanShutdown,
186    /// Enqueue an out-of-order cell in the reactor.
187    #[cfg(feature = "conflux")]
188    Enqueue(OooRelayMsg),
189}
190
191/// Return a `CircProto` error for the specified unsupported cell.
192///
193/// This error will shut down the reactor.
194///
195/// Note: this is a macro to simplify usage (this way the caller doesn't
196/// need to .map() the result to the appropriate type)
197macro_rules! unsupported_client_cell {
198    ($msg:expr) => {{
199        unsupported_client_cell!(@ $msg, "")
200    }};
201
202    ($msg:expr, $hopnum:expr) => {{
203        let hop: HopNum = $hopnum;
204        let hop_display = format!(" from hop {}", hop.display());
205        unsupported_client_cell!(@ $msg, hop_display)
206    }};
207
208    (@ $msg:expr, $hopnum_display:expr) => {
209        Err(crate::Error::CircProto(format!(
210            "Unexpected {} cell{} on client circuit",
211            $msg.cmd(),
212            $hopnum_display,
213        )))
214    };
215}
216
217pub(super) use unsupported_client_cell;
218
219impl Circuit {
220    /// Create a new non-multipath circuit.
221    pub(super) fn new(
222        channel: Arc<Channel>,
223        channel_id: CircId,
224        unique_id: TunnelScopedCircId,
225        input: CircuitRxReceiver,
226        memquota: CircuitAccount,
227        mutable: Arc<MutableState>,
228    ) -> Self {
229        let chan_sender = SometimesUnboundedSink::new(channel.sender());
230
231        let crypto_out = OutboundClientCrypt::new();
232        Circuit {
233            channel,
234            chan_sender,
235            input,
236            crypto_in: InboundClientCrypt::new(),
237            hops: CircHopList::default(),
238            unique_id,
239            channel_id,
240            crypto_out,
241            mutable,
242            #[cfg(feature = "conflux")]
243            conflux_handler: None,
244            memquota,
245        }
246    }
247
248    /// Return the process-unique identifier of this circuit.
249    pub(super) fn unique_id(&self) -> UniqId {
250        self.unique_id.unique_id()
251    }
252
253    /// Return the shared mutable state of this circuit.
254    pub(super) fn mutable(&self) -> &Arc<MutableState> {
255        &self.mutable
256    }
257
258    /// Add this circuit to a multipath tunnel, by associating it with a new [`TunnelId`],
259    /// and installing a [`ConfluxMsgHandler`] on this circuit.
260    ///
261    /// Once this is called, the circuit will be able to handle conflux cells.
262    #[cfg(feature = "conflux")]
263    pub(super) fn add_to_conflux_tunnel(
264        &mut self,
265        tunnel_id: TunnelId,
266        conflux_handler: ConfluxMsgHandler,
267    ) {
268        self.unique_id = TunnelScopedCircId::new(tunnel_id, self.unique_id.unique_id());
269        self.conflux_handler = Some(conflux_handler);
270    }
271
272    /// Send a LINK cell to the specified hop.
273    ///
274    /// This must be called *after* a [`ConfluxMsgHandler`] is installed
275    /// on the circuit with [`add_to_conflux_tunnel`](Self::add_to_conflux_tunnel).
276    #[cfg(feature = "conflux")]
277    pub(super) async fn begin_conflux_link(
278        &mut self,
279        hop: HopNum,
280        cell: AnyRelayMsgOuter,
281        runtime: &tor_rtcompat::DynTimeProvider,
282    ) -> Result<()> {
283        use tor_rtcompat::SleepProvider as _;
284
285        if self.conflux_handler.is_none() {
286            return Err(internal!(
287                "tried to send LINK cell before installing a ConfluxMsgHandler?!"
288            )
289            .into());
290        }
291
292        let cell = SendRelayCell {
293            hop,
294            early: false,
295            cell,
296        };
297        self.send_relay_cell(cell).await?;
298
299        let Some(conflux_handler) = self.conflux_handler.as_mut() else {
300            return Err(internal!("ConfluxMsgHandler disappeared?!").into());
301        };
302
303        Ok(conflux_handler.note_link_sent(runtime.wallclock())?)
304    }
305
306    /// Get the wallclock time when the handshake on this circuit is supposed to time out.
307    ///
308    /// Returns `None` if this handler hasn't started the handshake yet.
309    pub(super) fn conflux_hs_timeout(&self) -> Option<SystemTime> {
310        cfg_if::cfg_if! {
311            if #[cfg(feature = "conflux")] {
312                self.conflux_handler.as_ref().map(|handler| handler.handshake_timeout())?
313            } else {
314                None
315            }
316        }
317    }
318
319    /// Handle a [`CtrlMsg::AddFakeHop`](super::CtrlMsg::AddFakeHop) message.
320    #[cfg(test)]
321    pub(super) fn handle_add_fake_hop(
322        &mut self,
323        format: RelayCellFormat,
324        fwd_lasthop: bool,
325        rev_lasthop: bool,
326        dummy_peer_id: path::HopDetail,
327        params: &crate::circuit::CircParameters,
328        done: ReactorResultChannel<()>,
329    ) {
330        use tor_protover::{named, Protocols};
331
332        use crate::tunnel::circuit::test::DummyCrypto;
333
334        let fwd = Box::new(DummyCrypto::new(fwd_lasthop));
335        let rev = Box::new(DummyCrypto::new(rev_lasthop));
336        let binding = None;
337        let settings = HopSettings::from_params_and_caps(
338            params,
339            &[named::FLOWCTRL_CC].into_iter().collect::<Protocols>(),
340        )
341        .expect("Can't construct HopSettings");
342        self.add_hop(format, dummy_peer_id, fwd, rev, binding, &settings)
343            .expect("could not add hop to circuit");
344        let _ = done.send(Ok(()));
345    }
346
347    /// Encode `msg` and encrypt it, returning the resulting cell
348    /// and tag that should be expected for an authenticated SENDME sent
349    /// in response to that cell.
350    fn encode_relay_cell(
351        crypto_out: &mut OutboundClientCrypt,
352        relay_format: RelayCellFormat,
353        hop: HopNum,
354        early: bool,
355        msg: AnyRelayMsgOuter,
356    ) -> Result<(AnyChanMsg, SendmeTag)> {
357        let mut body: RelayCellBody = msg
358            .encode(relay_format, &mut rand::rng())
359            .map_err(|e| Error::from_cell_enc(e, "relay cell body"))?
360            .into();
361        let cmd = if early {
362            ChanCmd::RELAY_EARLY
363        } else {
364            ChanCmd::RELAY
365        };
366        let tag = crypto_out.encrypt(cmd, &mut body, hop)?;
367        let msg = Relay::from(BoxedCellBody::from(body));
368        let msg = if early {
369            AnyChanMsg::RelayEarly(msg.into())
370        } else {
371            AnyChanMsg::Relay(msg)
372        };
373
374        Ok((msg, tag))
375    }
376
377    /// Encode `msg`, encrypt it, and send it to the 'hop'th hop.
378    ///
379    /// If there is insufficient outgoing *circuit-level* or *stream-level*
380    /// SENDME window, an error is returned instead.
381    ///
382    /// Does not check whether the cell is well-formed or reasonable.
383    ///
384    /// NOTE: the reactor should not call this function directly, only via
385    /// [`ConfluxSet::send_relay_cell_on_leg`](super::conflux::ConfluxSet::send_relay_cell_on_leg),
386    /// which will reroute the message, if necessary to the primary leg.
387    pub(super) async fn send_relay_cell(&mut self, msg: SendRelayCell) -> Result<()> {
388        let SendRelayCell {
389            hop,
390            early,
391            cell: msg,
392        } = msg;
393
394        let is_conflux_link = msg.cmd() == RelayCmd::CONFLUX_LINK;
395        if !is_conflux_link && self.is_conflux_pending() {
396            // Note: it is the responsibility of the reactor user to wait until
397            // at least one of the legs completes the handshake.
398            return Err(internal!("tried to send cell on unlinked circuit").into());
399        }
400
401        trace!(circ_id = %self.unique_id, cell = ?msg, "sending relay cell");
402
403        let c_t_w = sendme::cmd_counts_towards_windows(msg.cmd());
404        let stream_id = msg.stream_id();
405        let circhop = self.hops.get_mut(hop).ok_or(Error::NoSuchHop)?;
406
407        // We need to apply stream-level flow control *before* encoding the message.
408        if c_t_w {
409            if let Some(stream_id) = stream_id {
410                circhop.take_capacity_to_send(stream_id, msg.msg())?;
411            }
412        }
413
414        // Save the RelayCmd of the message before it gets consumed below.
415        // We need this to tell our ConfluxMsgHandler about the cell we've just sent,
416        // so that it can update its counters.
417        let relay_cmd = msg.cmd();
418
419        // NOTE(eta): Now that we've encrypted the cell, we *must* either send it or abort
420        //            the whole circuit (e.g. by returning an error).
421        let (msg, tag) = Self::encode_relay_cell(
422            &mut self.crypto_out,
423            circhop.relay_format(),
424            hop,
425            early,
426            msg,
427        )?;
428        // The cell counted for congestion control, inform our algorithm of such and pass down the
429        // tag for authenticated SENDMEs.
430        if c_t_w {
431            circhop.ccontrol_mut().note_data_sent(&tag)?;
432        }
433
434        let cell = AnyChanCell::new(Some(self.channel_id), msg);
435        Pin::new(&mut self.chan_sender).send_unbounded(cell).await?;
436
437        #[cfg(feature = "conflux")]
438        if let Some(conflux) = self.conflux_handler.as_mut() {
439            conflux.note_cell_sent(relay_cmd);
440        }
441
442        Ok(())
443    }
444
445    /// Helper: process a cell on a channel.  Most cells get ignored
446    /// or rejected; a few get delivered to circuits.
447    ///
448    /// Return `CellStatus::CleanShutdown` if we should exit.
449    ///
450    // TODO: returning `Vec<CircuitCmd>` means we're unnecessarily
451    // allocating a `Vec` here. Generally, the number of commands is going to be small
452    // (usually 1, but > 1 when we start supporting packed cells).
453    //
454    // We should consider using smallvec instead. It might also be a good idea to have a
455    // separate higher-level type splitting this out into Single(CircuitCmd),
456    // and Multiple(SmallVec<[CircuitCmd; <capacity>]>).
457    pub(super) fn handle_cell(
458        &mut self,
459        handlers: &mut CellHandlers,
460        leg: UniqId,
461        cell: ClientCircChanMsg,
462    ) -> Result<Vec<CircuitCmd>> {
463        trace!(circ_id = %self.unique_id, cell = ?cell, "handling cell");
464        use ClientCircChanMsg::*;
465        match cell {
466            Relay(r) => self.handle_relay_cell(handlers, leg, r),
467            Destroy(d) => {
468                let reason = d.reason();
469                debug!(
470                    circ_id = %self.unique_id,
471                    "Received DESTROY cell. Reason: {} [{}]",
472                    reason.human_str(),
473                    reason
474                );
475
476                self.handle_destroy_cell().map(|c| vec![c])
477            }
478        }
479    }
480
481    /// Decode `cell`, returning its corresponding hop number, tag,
482    /// and decoded body.
483    fn decode_relay_cell(
484        &mut self,
485        cell: Relay,
486    ) -> Result<(HopNum, SendmeTag, RelayCellDecoderResult)> {
487        // This is always RELAY, not RELAY_EARLY, so long as this code is client-only.
488        let cmd = cell.cmd();
489        let mut body = cell.into_relay_body().into();
490
491        // Decrypt the cell. If it's recognized, then find the
492        // corresponding hop.
493        let (hopnum, tag) = self.crypto_in.decrypt(cmd, &mut body)?;
494
495        // Decode the cell.
496        let decode_res = self
497            .hop_mut(hopnum)
498            .ok_or_else(|| {
499                Error::from(internal!(
500                    "Trying to decode cell from nonexistent hop {:?}",
501                    hopnum
502                ))
503            })?
504            .decode(body.into())?;
505
506        Ok((hopnum, tag, decode_res))
507    }
508
509    /// React to a Relay or RelayEarly cell.
510    fn handle_relay_cell(
511        &mut self,
512        handlers: &mut CellHandlers,
513        leg: UniqId,
514        cell: Relay,
515    ) -> Result<Vec<CircuitCmd>> {
516        let (hopnum, tag, decode_res) = self.decode_relay_cell(cell)?;
517
518        let c_t_w = decode_res.cmds().any(sendme::cmd_counts_towards_windows);
519
520        // Decrement the circuit sendme windows, and see if we need to
521        // send a sendme cell.
522        let send_circ_sendme = if c_t_w {
523            self.hop_mut(hopnum)
524                .ok_or_else(|| Error::CircProto("Sendme from nonexistent hop".into()))?
525                .ccontrol_mut()
526                .note_data_received()?
527        } else {
528            false
529        };
530
531        let mut circ_cmds = vec![];
532        // If we do need to send a circuit-level SENDME cell, do so.
533        if send_circ_sendme {
534            // This always sends a V1 (tagged) sendme cell, and thereby assumes
535            // that SendmeEmitMinVersion is no more than 1.  If the authorities
536            // every increase that parameter to a higher number, this will
537            // become incorrect.  (Higher numbers are not currently defined.)
538            let sendme = Sendme::from(tag);
539            let cell = AnyRelayMsgOuter::new(None, sendme.into());
540            circ_cmds.push(CircuitCmd::Send(SendRelayCell {
541                hop: hopnum,
542                early: false,
543                cell,
544            }));
545
546            // Inform congestion control of the SENDME we are sending. This is a circuit level one.
547            self.hop_mut(hopnum)
548                .ok_or_else(|| {
549                    Error::from(internal!(
550                        "Trying to send SENDME to nonexistent hop {:?}",
551                        hopnum
552                    ))
553                })?
554                .ccontrol_mut()
555                .note_sendme_sent()?;
556        }
557
558        let (mut msgs, incomplete) = decode_res.into_parts();
559        while let Some(msg) = msgs.next() {
560            let msg_status = self.handle_relay_msg(handlers, hopnum, leg, c_t_w, msg)?;
561
562            match msg_status {
563                None => continue,
564                Some(msg @ CircuitCmd::CleanShutdown) => {
565                    for m in msgs {
566                        debug!(
567                            "{id}: Ignoring relay msg received after triggering shutdown: {m:?}",
568                            id = self.unique_id
569                        );
570                    }
571                    if let Some(incomplete) = incomplete {
572                        debug!(
573                            "{id}: Ignoring partial relay msg received after triggering shutdown: {:?}",
574                            incomplete,
575                            id=self.unique_id,
576                        );
577                    }
578                    circ_cmds.push(msg);
579                    return Ok(circ_cmds);
580                }
581                Some(msg) => {
582                    circ_cmds.push(msg);
583                }
584            }
585        }
586
587        Ok(circ_cmds)
588    }
589
590    /// Handle a single incoming relay message.
591    fn handle_relay_msg(
592        &mut self,
593        handlers: &mut CellHandlers,
594        hopnum: HopNum,
595        leg: UniqId,
596        cell_counts_toward_windows: bool,
597        msg: UnparsedRelayMsg,
598    ) -> Result<Option<CircuitCmd>> {
599        // If this msg wants/refuses to have a Stream ID, does it
600        // have/not have one?
601        let streamid = msg_streamid(&msg)?;
602
603        // If this doesn't have a StreamId, it's a meta cell,
604        // not meant for a particular stream.
605        let Some(streamid) = streamid else {
606            return self.handle_meta_cell(handlers, hopnum, msg);
607        };
608
609        #[cfg(feature = "conflux")]
610        let msg = if let Some(conflux) = self.conflux_handler.as_mut() {
611            match conflux.action_for_msg(hopnum, cell_counts_toward_windows, streamid, msg)? {
612                ConfluxAction::Deliver(msg) => {
613                    // The message either doesn't count towards the sequence numbers
614                    // or is already well-ordered, so we're ready to handle it.
615
616                    // It's possible that some of our buffered messages are now ready to be
617                    // handled. We don't check that here, however, because that's handled
618                    // by the reactor main loop.
619                    msg
620                }
621                ConfluxAction::Enqueue(msg) => {
622                    // Tell the reactor to enqueue this msg
623                    return Ok(Some(CircuitCmd::Enqueue(msg)));
624                }
625            }
626        } else {
627            // If we don't have a conflux_handler, it means this circuit is not part of
628            // a conflux tunnel, so we can just process the message.
629            msg
630        };
631
632        self.handle_in_order_relay_msg(
633            handlers,
634            hopnum,
635            leg,
636            cell_counts_toward_windows,
637            streamid,
638            msg,
639        )
640    }
641
642    /// Handle a single incoming relay message that is known to be in order.
643    pub(super) fn handle_in_order_relay_msg(
644        &mut self,
645        handlers: &mut CellHandlers,
646        hopnum: HopNum,
647        leg: UniqId,
648        cell_counts_toward_windows: bool,
649        streamid: StreamId,
650        msg: UnparsedRelayMsg,
651    ) -> Result<Option<CircuitCmd>> {
652        #[cfg(feature = "conflux")]
653        if let Some(conflux) = self.conflux_handler.as_mut() {
654            conflux.inc_last_seq_delivered(&msg);
655        }
656
657        let hop = self
658            .hop_mut(hopnum)
659            .ok_or_else(|| Error::CircProto("Cell from nonexistent hop!".into()))?;
660        let res = hop.handle_msg(cell_counts_toward_windows, streamid, msg)?;
661
662        if let Some(msg) = res {
663            cfg_if::cfg_if! {
664                if #[cfg(feature = "hs-service")] {
665                    return self.handle_incoming_stream_request(handlers, msg, streamid, hopnum, leg);
666                } else {
667                    return Err(internal!("incoming stream not rejected, but hs-service feature is disabled?!").into());
668                }
669            }
670        }
671
672        Ok(None)
673    }
674
675    /// Handle a conflux message coming from the specified hop.
676    ///
677    /// Returns an error if
678    ///
679    ///   * this is not a conflux circuit (i.e. it doesn't have a [`ConfluxMsgHandler`])
680    ///   * this is a client circuit and the conflux message originated an unexpected hop
681    ///   * the cell was sent in violation of the handshake protocol
682    #[cfg(feature = "conflux")]
683    fn handle_conflux_msg(
684        &mut self,
685        hop: HopNum,
686        msg: UnparsedRelayMsg,
687    ) -> Result<Option<CircuitCmd>> {
688        let Some(conflux_handler) = self.conflux_handler.as_mut() else {
689            // If conflux is not enabled, tear down the circuit
690            // (see 4.2.1. Cell Injection Side Channel Mitigations in prop329)
691            return Err(Error::CircProto(format!(
692                "Received {} cell from hop {} on non-conflux client circuit?!",
693                msg.cmd(),
694                hop.display(),
695            )));
696        };
697
698        Ok(conflux_handler.handle_conflux_msg(msg, hop))
699    }
700
701    /// For conflux: return the sequence number of the last cell sent on this leg.
702    ///
703    /// Returns an error if this circuit is not part of a conflux set.
704    #[cfg(feature = "conflux")]
705    pub(super) fn last_seq_sent(&self) -> Result<u64> {
706        let handler = self
707            .conflux_handler
708            .as_ref()
709            .ok_or_else(|| internal!("tried to get last_seq_sent of non-conflux circ"))?;
710
711        Ok(handler.last_seq_sent())
712    }
713
714    /// For conflux: return the sequence number of the last cell received on this leg.
715    ///
716    /// Returns an error if this circuit is not part of a conflux set.
717    #[cfg(feature = "conflux")]
718    pub(super) fn last_seq_recv(&self) -> Result<u64> {
719        let handler = self
720            .conflux_handler
721            .as_ref()
722            .ok_or_else(|| internal!("tried to get last_seq_recv of non-conflux circ"))?;
723
724        Ok(handler.last_seq_recv())
725    }
726
727    /// A helper for handling incoming stream requests.
728    ///
729    // TODO: can we make this a method on CircHop to avoid the double HopNum lookup?
730    #[cfg(feature = "hs-service")]
731    fn handle_incoming_stream_request(
732        &mut self,
733        handlers: &mut CellHandlers,
734        msg: UnparsedRelayMsg,
735        stream_id: StreamId,
736        hop_num: HopNum,
737        leg: UniqId,
738    ) -> Result<Option<CircuitCmd>> {
739        use super::syncview::ClientCircSyncView;
740        use tor_cell::relaycell::msg::EndReason;
741        use tor_error::into_internal;
742        use tor_log_ratelim::log_ratelim;
743
744        use crate::{circuit::CIRCUIT_BUFFER_SIZE, tunnel::reactor::StreamReqInfo};
745
746        // We need to construct this early so that we don't double-borrow &mut self
747
748        let Some(handler) = handlers.incoming_stream_req_handler.as_mut() else {
749            return Err(Error::CircProto(
750                "Cannot handle BEGIN cells on this circuit".into(),
751            ));
752        };
753
754        if hop_num != handler.hop_num {
755            return Err(Error::CircProto(format!(
756                "Expecting incoming streams from {}, but received {} cell from unexpected hop {}",
757                handler.hop_num.display(),
758                msg.cmd(),
759                hop_num.display()
760            )));
761        }
762
763        let message_closes_stream = handler.cmd_checker.check_msg(&msg)? == StreamStatus::Closed;
764
765        // TODO: we've already looked up the `hop` in handle_relay_cell, so we shouldn't
766        // have to look it up again! However, we can't pass the `&mut hop` reference from
767        // `handle_relay_cell` to this function, because that makes Rust angry (we'd be
768        // borrowing self as mutable more than once).
769        //
770        // TODO: we _could_ use self.hops.get_mut(..) instead self.hop_mut(..) inside
771        // handle_relay_cell to work around the problem described above
772        let hop = self.hops.get_mut(hop_num).ok_or(Error::CircuitClosed)?;
773
774        if message_closes_stream {
775            hop.ending_msg_received(stream_id)?;
776
777            return Ok(None);
778        }
779
780        let begin = msg
781            .decode::<Begin>()
782            .map_err(|e| Error::from_bytes_err(e, "Invalid Begin message"))?
783            .into_msg();
784
785        let req = IncomingStreamRequest::Begin(begin);
786
787        {
788            use crate::stream::IncomingStreamRequestDisposition::*;
789
790            let ctx = crate::stream::IncomingStreamRequestContext { request: &req };
791            // IMPORTANT: ClientCircSyncView::n_open_streams() (called via disposition() below)
792            // accesses the stream map mutexes!
793            //
794            // This means it's very important not to call this function while any of the hop's
795            // stream map mutex is held.
796            let view = ClientCircSyncView::new(&self.hops);
797
798            match handler.filter.as_mut().disposition(&ctx, &view)? {
799                Accept => {}
800                CloseCircuit => return Ok(Some(CircuitCmd::CleanShutdown)),
801                RejectRequest(end) => {
802                    let end_msg = AnyRelayMsgOuter::new(Some(stream_id), end.into());
803                    let cell = SendRelayCell {
804                        hop: hop_num,
805                        early: false,
806                        cell: end_msg,
807                    };
808                    return Ok(Some(CircuitCmd::Send(cell)));
809                }
810            }
811        }
812
813        // TODO: Sadly, we need to look up `&mut hop` yet again,
814        // since we needed to pass `&self.hops` by reference to our filter above. :(
815        let hop = self.hops.get_mut(hop_num).ok_or(Error::CircuitClosed)?;
816        let relay_cell_format = hop.relay_format();
817
818        let memquota = StreamAccount::new(&self.memquota)?;
819
820        let (sender, receiver) = MpscSpec::new(STREAM_READER_BUFFER).new_mq(
821            self.chan_sender.as_inner().time_provider().clone(),
822            memquota.as_raw_account(),
823        )?;
824        let (msg_tx, msg_rx) = MpscSpec::new(CIRCUIT_BUFFER_SIZE).new_mq(
825            self.chan_sender.as_inner().time_provider().clone(),
826            memquota.as_raw_account(),
827        )?;
828
829        let cmd_checker = DataCmdChecker::new_connected();
830        hop.add_ent_with_id(sender, msg_rx, stream_id, cmd_checker)?;
831
832        let outcome = Pin::new(&mut handler.incoming_sender).try_send(StreamReqInfo {
833            req,
834            stream_id,
835            hop_num,
836            leg,
837            msg_tx,
838            receiver,
839            memquota,
840            relay_cell_format,
841        });
842
843        log_ratelim!("Delivering message to incoming stream handler"; outcome);
844
845        if let Err(e) = outcome {
846            if e.is_full() {
847                // The IncomingStreamRequestHandler's stream is full; it isn't
848                // handling requests fast enough. So instead, we reply with an
849                // END cell.
850                let end_msg = AnyRelayMsgOuter::new(
851                    Some(stream_id),
852                    End::new_with_reason(EndReason::RESOURCELIMIT).into(),
853                );
854
855                let cell = SendRelayCell {
856                    hop: hop_num,
857                    early: false,
858                    cell: end_msg,
859                };
860                return Ok(Some(CircuitCmd::Send(cell)));
861            } else if e.is_disconnected() {
862                // The IncomingStreamRequestHandler's stream has been dropped.
863                // In the Tor protocol as it stands, this always means that the
864                // circuit itself is out-of-use and should be closed. (See notes
865                // on `allow_stream_requests.`)
866                //
867                // Note that we will _not_ reach this point immediately after
868                // the IncomingStreamRequestHandler is dropped; we won't hit it
869                // until we next get an incoming request.  Thus, if we do later
870                // want to add early detection for a dropped
871                // IncomingStreamRequestHandler, we need to do it elsewhere, in
872                // a different way.
873                debug!(
874                    circ_id = %self.unique_id,
875                    "Incoming stream request receiver dropped",
876                );
877                // This will _cause_ the circuit to get closed.
878                return Err(Error::CircuitClosed);
879            } else {
880                // There are no errors like this with the current design of
881                // futures::mpsc, but we shouldn't just ignore the possibility
882                // that they'll be added later.
883                return Err(Error::from((into_internal!(
884                    "try_send failed unexpectedly"
885                ))(e)));
886            }
887        }
888
889        Ok(None)
890    }
891
892    /// Helper: process a destroy cell.
893    #[allow(clippy::unnecessary_wraps)]
894    fn handle_destroy_cell(&mut self) -> Result<CircuitCmd> {
895        // I think there is nothing more to do here.
896        Ok(CircuitCmd::CleanShutdown)
897    }
898
899    /// Handle a [`CtrlMsg::Create`](super::CtrlMsg::Create) message.
900    pub(super) async fn handle_create(
901        &mut self,
902        recv_created: oneshot::Receiver<CreateResponse>,
903        handshake: CircuitHandshake,
904        settings: HopSettings,
905        done: ReactorResultChannel<()>,
906    ) -> StdResult<(), ReactorError> {
907        let ret = match handshake {
908            CircuitHandshake::CreateFast => self.create_firsthop_fast(recv_created, settings).await,
909            CircuitHandshake::Ntor {
910                public_key,
911                ed_identity,
912            } => {
913                self.create_firsthop_ntor(recv_created, ed_identity, public_key, settings)
914                    .await
915            }
916            CircuitHandshake::NtorV3 { public_key } => {
917                self.create_firsthop_ntor_v3(recv_created, public_key, settings)
918                    .await
919            }
920        };
921        let _ = done.send(ret); // don't care if sender goes away
922
923        // TODO: maybe we don't need to flush here?
924        // (we could let run_once() handle all the flushing)
925        self.chan_sender.flush().await?;
926
927        Ok(())
928    }
929
930    /// Helper: create the first hop of a circuit.
931    ///
932    /// This is parameterized not just on the RNG, but a wrapper object to
933    /// build the right kind of create cell, and a handshake object to perform
934    /// the cryptographic handshake.
935    async fn create_impl<H, W, M>(
936        &mut self,
937        cell_protocol: RelayCryptLayerProtocol,
938        recvcreated: oneshot::Receiver<CreateResponse>,
939        wrap: &W,
940        key: &H::KeyType,
941        mut settings: HopSettings,
942        msg: &M,
943    ) -> Result<()>
944    where
945        H: ClientHandshake + HandshakeAuxDataHandler,
946        W: CreateHandshakeWrap,
947        H::KeyGen: KeyGenerator,
948        M: Borrow<H::ClientAuxData>,
949    {
950        // We don't need to shut down the circuit on failure here, since this
951        // function consumes the PendingClientCirc and only returns
952        // a ClientCirc on success.
953
954        let (state, msg) = {
955            // done like this because holding the RNG across an await boundary makes the future
956            // non-Send
957            let mut rng = rand::rng();
958            H::client1(&mut rng, key, msg)?
959        };
960        let create_cell = wrap.to_chanmsg(msg);
961        trace!(
962            circ_id = %self.unique_id,
963            create = %create_cell.cmd(),
964            "Extending to hop 1",
965        );
966        self.send_msg(create_cell).await?;
967
968        let reply = recvcreated
969            .await
970            .map_err(|_| Error::CircProto("Circuit closed while waiting".into()))?;
971
972        let relay_handshake = wrap.decode_chanmsg(reply)?;
973        let (server_msg, keygen) = H::client2(state, relay_handshake)?;
974
975        H::handle_server_aux_data(&mut settings, &server_msg)?;
976
977        let relay_cell_format = cell_protocol.relay_cell_format();
978        let BoxedClientLayer { fwd, back, binding } =
979            cell_protocol.construct_client_layers(HandshakeRole::Initiator, keygen)?;
980
981        trace!(circ_id = %self.unique_id, "Handshake complete; circuit created.");
982
983        let peer_id = self.channel.target().clone();
984
985        self.add_hop(
986            relay_cell_format,
987            path::HopDetail::Relay(peer_id),
988            fwd,
989            back,
990            binding,
991            &settings,
992        )?;
993        Ok(())
994    }
995
996    /// Use the (questionable!) CREATE_FAST handshake to connect to the
997    /// first hop of this circuit.
998    ///
999    /// There's no authentication in CREATE_FAST,
1000    /// so we don't need to know whom we're connecting to: we're just
1001    /// connecting to whichever relay the channel is for.
1002    async fn create_firsthop_fast(
1003        &mut self,
1004        recvcreated: oneshot::Receiver<CreateResponse>,
1005        settings: HopSettings,
1006    ) -> Result<()> {
1007        // In a CREATE_FAST handshake, we can't negotiate a format other than this.
1008        let protocol = RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0);
1009        let wrap = CreateFastWrap;
1010        self.create_impl::<CreateFastClient, _, _>(protocol, recvcreated, &wrap, &(), settings, &())
1011            .await
1012    }
1013
1014    /// Use the ntor handshake to connect to the first hop of this circuit.
1015    ///
1016    /// Note that the provided keys must match the channel's target,
1017    /// or the handshake will fail.
1018    async fn create_firsthop_ntor(
1019        &mut self,
1020        recvcreated: oneshot::Receiver<CreateResponse>,
1021        ed_identity: pk::ed25519::Ed25519Identity,
1022        pubkey: NtorPublicKey,
1023        settings: HopSettings,
1024    ) -> Result<()> {
1025        // In an ntor handshake, we can't negotiate a format other than this.
1026        let relay_cell_protocol = RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0);
1027
1028        // Exit now if we have an Ed25519 or RSA identity mismatch.
1029        let target = RelayIds::builder()
1030            .ed_identity(ed_identity)
1031            .rsa_identity(pubkey.id)
1032            .build()
1033            .expect("Unable to build RelayIds");
1034        self.channel.check_match(&target)?;
1035
1036        let wrap = Create2Wrap {
1037            handshake_type: HandshakeType::NTOR,
1038        };
1039        self.create_impl::<NtorClient, _, _>(
1040            relay_cell_protocol,
1041            recvcreated,
1042            &wrap,
1043            &pubkey,
1044            settings,
1045            &(),
1046        )
1047        .await
1048    }
1049
1050    /// Use the ntor-v3 handshake to connect to the first hop of this circuit.
1051    ///
1052    /// Note that the provided key must match the channel's target,
1053    /// or the handshake will fail.
1054    async fn create_firsthop_ntor_v3(
1055        &mut self,
1056        recvcreated: oneshot::Receiver<CreateResponse>,
1057        pubkey: NtorV3PublicKey,
1058        settings: HopSettings,
1059    ) -> Result<()> {
1060        // Exit now if we have a mismatched key.
1061        let target = RelayIds::builder()
1062            .ed_identity(pubkey.id)
1063            .build()
1064            .expect("Unable to build RelayIds");
1065        self.channel.check_match(&target)?;
1066
1067        // TODO #1947: Add support for negotiating other formats.
1068        let relay_cell_protocol = RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0);
1069
1070        // Set the client extensions.
1071        let client_extensions = circ_extensions_from_settings(&settings)?;
1072        let wrap = Create2Wrap {
1073            handshake_type: HandshakeType::NTOR_V3,
1074        };
1075
1076        self.create_impl::<NtorV3Client, _, _>(
1077            relay_cell_protocol,
1078            recvcreated,
1079            &wrap,
1080            &pubkey,
1081            settings,
1082            &client_extensions,
1083        )
1084        .await
1085    }
1086
1087    /// Add a hop to the end of this circuit.
1088    ///
1089    /// Will return an error if the circuit already has [`u8::MAX`] hops.
1090    pub(super) fn add_hop(
1091        &mut self,
1092        format: RelayCellFormat,
1093        peer_id: path::HopDetail,
1094        fwd: Box<dyn OutboundClientLayer + 'static + Send>,
1095        rev: Box<dyn InboundClientLayer + 'static + Send>,
1096        binding: Option<CircuitBinding>,
1097        settings: &HopSettings,
1098    ) -> StdResult<(), Bug> {
1099        let hop_num = self.hops.len();
1100        debug_assert_eq!(hop_num, usize::from(self.num_hops()));
1101
1102        // There are several places in the code that assume that a `usize` hop number
1103        // can be cast or converted to a `u8` hop number,
1104        // so this check is important to prevent panics or incorrect behaviour.
1105        if hop_num == usize::from(u8::MAX) {
1106            return Err(internal!(
1107                "cannot add more hops to a circuit with `u8::MAX` hops"
1108            ));
1109        }
1110
1111        let hop_num = (hop_num as u8).into();
1112
1113        let hop = CircHop::new(self.unique_id, hop_num, format, settings);
1114        self.hops.push(hop);
1115        self.crypto_in.add_layer(rev);
1116        self.crypto_out.add_layer(fwd);
1117        self.mutable.add_hop(peer_id, binding);
1118
1119        Ok(())
1120    }
1121
1122    /// Handle a RELAY cell on this circuit with stream ID 0.
1123    #[allow(clippy::cognitive_complexity)]
1124    fn handle_meta_cell(
1125        &mut self,
1126        handlers: &mut CellHandlers,
1127        hopnum: HopNum,
1128        msg: UnparsedRelayMsg,
1129    ) -> Result<Option<CircuitCmd>> {
1130        // SENDME cells and TRUNCATED get handled internally by the circuit.
1131
1132        // TODO: This pattern (Check command, try to decode, map error) occurs
1133        // several times, and would be good to extract simplify. Such
1134        // simplification is obstructed by a couple of factors: First, that
1135        // there is not currently a good way to get the RelayCmd from _type_ of
1136        // a RelayMsg.  Second, that decode() [correctly] consumes the
1137        // UnparsedRelayMsg.  I tried a macro-based approach, and didn't care
1138        // for it. -nickm
1139        if msg.cmd() == RelayCmd::SENDME {
1140            let sendme = msg
1141                .decode::<Sendme>()
1142                .map_err(|e| Error::from_bytes_err(e, "sendme message"))?
1143                .into_msg();
1144
1145            return Ok(Some(CircuitCmd::HandleSendMe {
1146                hop: hopnum,
1147                sendme,
1148            }));
1149        }
1150        if msg.cmd() == RelayCmd::TRUNCATED {
1151            let truncated = msg
1152                .decode::<Truncated>()
1153                .map_err(|e| Error::from_bytes_err(e, "truncated message"))?
1154                .into_msg();
1155            let reason = truncated.reason();
1156            debug!(
1157                circ_id = %self.unique_id,
1158                "Truncated from hop {}. Reason: {} [{}]",
1159                hopnum.display(),
1160                reason.human_str(),
1161                reason
1162            );
1163
1164            return Ok(Some(CircuitCmd::CleanShutdown));
1165        }
1166
1167        trace!(circ_id = %self.unique_id, cell = ?msg, "Received meta-cell");
1168
1169        #[cfg(feature = "conflux")]
1170        if matches!(
1171            msg.cmd(),
1172            RelayCmd::CONFLUX_LINK
1173                | RelayCmd::CONFLUX_LINKED
1174                | RelayCmd::CONFLUX_LINKED_ACK
1175                | RelayCmd::CONFLUX_SWITCH
1176        ) {
1177            return self.handle_conflux_msg(hopnum, msg);
1178        }
1179
1180        if self.is_conflux_pending() {
1181            warn!(
1182                circ_id = %self.unique_id,
1183                "received unexpected cell {msg:?} on unlinked conflux circuit",
1184            );
1185            return Err(Error::CircProto(
1186                "Received unexpected cell on unlinked circuit".into(),
1187            ));
1188        }
1189
1190        // For all other command types, we'll only get them in response
1191        // to another command, which should have registered a responder.
1192        //
1193        // TODO: should the conflux state machine be a meta cell handler?
1194        // We'd need to add support for multiple meta handlers, and change the
1195        // MetaCellHandler API to support returning Option<RunOnceCmdInner>
1196        // (because some cells will require sending a response)
1197        if let Some(mut handler) = handlers.meta_handler.take() {
1198            if handler.expected_hop() == hopnum {
1199                // Somebody was waiting for a message -- maybe this message
1200                let ret = handler.handle_msg(msg, self);
1201                trace!(
1202                    circ_id = %self.unique_id,
1203                    result = ?ret,
1204                    "meta handler completed",
1205                );
1206                match ret {
1207                    #[cfg(feature = "send-control-msg")]
1208                    Ok(MetaCellDisposition::Consumed) => {
1209                        handlers.meta_handler = Some(handler);
1210                        Ok(None)
1211                    }
1212                    Ok(MetaCellDisposition::ConversationFinished) => Ok(None),
1213                    #[cfg(feature = "send-control-msg")]
1214                    Ok(MetaCellDisposition::CloseCirc) => Ok(Some(CircuitCmd::CleanShutdown)),
1215                    Err(e) => Err(e),
1216                }
1217            } else {
1218                // Somebody wanted a message from a different hop!  Put this
1219                // one back.
1220                handlers.meta_handler = Some(handler);
1221
1222                unsupported_client_cell!(msg, hopnum)
1223            }
1224        } else {
1225            // No need to call shutdown here, since this error will
1226            // propagate to the reactor shut it down.
1227            unsupported_client_cell!(msg)
1228        }
1229    }
1230
1231    /// Handle a RELAY_SENDME cell on this circuit with stream ID 0.
1232    pub(super) fn handle_sendme(
1233        &mut self,
1234        hopnum: HopNum,
1235        msg: Sendme,
1236        signals: CongestionSignals,
1237    ) -> Result<Option<CircuitCmd>> {
1238        // No need to call "shutdown" on errors in this function;
1239        // it's called from the reactor task and errors will propagate there.
1240        let hop = self
1241            .hop_mut(hopnum)
1242            .ok_or_else(|| Error::CircProto(format!("Couldn't find hop {}", hopnum.display())))?;
1243
1244        let tag = msg.into_sendme_tag().ok_or_else(||
1245                // Versions of Tor <=0.3.5 would omit a SENDME tag in this case;
1246                // but we don't support those any longer.
1247                 Error::CircProto("missing tag on circuit sendme".into()))?;
1248        // Update the CC object that we received a SENDME along with possible congestion signals.
1249        hop.ccontrol_mut().note_sendme_received(tag, signals)?;
1250        Ok(None)
1251    }
1252
1253    /// Send a message onto the circuit's channel.
1254    ///
1255    /// If the channel is ready to accept messages, it will be sent immediately. If not, the message
1256    /// will be enqueued for sending at a later iteration of the reactor loop.
1257    ///
1258    /// # Note
1259    ///
1260    /// Making use of the enqueuing capabilities of this function is discouraged! You should first
1261    /// check whether the channel is ready to receive messages (`self.channel.poll_ready`), and
1262    /// ideally use this to implement backpressure (such that you do not read from other sources
1263    /// that would send here while you know you're unable to forward the messages on).
1264    async fn send_msg(&mut self, msg: AnyChanMsg) -> Result<()> {
1265        let cell = AnyChanCell::new(Some(self.channel_id), msg);
1266        // Note: this future is always `Ready`, so await won't block.
1267        Pin::new(&mut self.chan_sender).send_unbounded(cell).await?;
1268        Ok(())
1269    }
1270
1271    /// Returns a [`Stream`] of [`CircuitCmd`] to poll from the main loop.
1272    ///
1273    /// The iterator contains at most one [`CircuitCmd`] for each hop,
1274    /// representing the instructions for handling the ready-item, if any,
1275    /// of its highest priority stream.
1276    ///
1277    /// IMPORTANT: this stream locks the stream map mutexes of each `CircHop`!
1278    /// To avoid contention, never create more than one [`Circuit::ready_streams_iterator`]
1279    /// stream at a time!
1280    ///
1281    /// This is cancellation-safe.
1282    pub(super) fn ready_streams_iterator(&self) -> impl Stream<Item = Result<CircuitCmd>> {
1283        self.hops.ready_streams_iterator()
1284    }
1285
1286    /// Return the congestion signals for this reactor. This is used by congestion control module.
1287    ///
1288    /// Note: This is only async because we need a Context to check the sink for readiness.
1289    pub(super) async fn congestion_signals(&mut self) -> CongestionSignals {
1290        futures::future::poll_fn(|cx| -> Poll<CongestionSignals> {
1291            Poll::Ready(CongestionSignals::new(
1292                self.chan_sender.poll_ready_unpin_bool(cx).unwrap_or(false),
1293                self.chan_sender.n_queued(),
1294            ))
1295        })
1296        .await
1297    }
1298
1299    /// Return a reference to the hop corresponding to `hopnum`, if there is one.
1300    pub(super) fn hop(&self, hopnum: HopNum) -> Option<&CircHop> {
1301        self.hops.hop(hopnum)
1302    }
1303
1304    /// Return a mutable reference to the hop corresponding to `hopnum`, if there is one.
1305    pub(super) fn hop_mut(&mut self, hopnum: HopNum) -> Option<&mut CircHop> {
1306        self.hops.get_mut(hopnum)
1307    }
1308
1309    /// Begin a stream with the provided hop in this circuit.
1310    pub(super) fn begin_stream(
1311        &mut self,
1312        hop_num: HopNum,
1313        message: AnyRelayMsg,
1314        sender: StreamMpscSender<UnparsedRelayMsg>,
1315        rx: StreamMpscReceiver<AnyRelayMsg>,
1316        cmd_checker: AnyCmdChecker,
1317    ) -> StdResult<Result<(SendRelayCell, StreamId)>, Bug> {
1318        let Some(hop) = self.hop_mut(hop_num) else {
1319            return Err(internal!(
1320                "{}: Attempting to send a BEGIN cell to an unknown hop {hop_num:?}",
1321                self.unique_id,
1322            ));
1323        };
1324
1325        Ok(hop.begin_stream(message, sender, rx, cmd_checker))
1326    }
1327
1328    /// Close the specified stream
1329    pub(super) async fn close_stream(
1330        &mut self,
1331        hop_num: HopNum,
1332        sid: StreamId,
1333        behav: CloseStreamBehavior,
1334        reason: streammap::TerminateReason,
1335    ) -> Result<()> {
1336        if let Some(hop) = self.hop_mut(hop_num) {
1337            let res = hop.close_stream(sid, behav, reason)?;
1338            if let Some(cell) = res {
1339                self.send_relay_cell(cell).await?;
1340            }
1341        }
1342        Ok(())
1343    }
1344
1345    /// Returns true if there are any streams on this circuit
1346    ///
1347    /// Important: this function locks the stream map of its each of the [`CircHop`]s
1348    /// in this circuit, so it must **not** be called from any function where the
1349    /// stream map lock is held (such as [`ready_streams_iterator`](Self::ready_streams_iterator).
1350    pub(super) fn has_streams(&self) -> bool {
1351        self.hops.has_streams()
1352    }
1353
1354    /// The number of hops in this circuit.
1355    pub(super) fn num_hops(&self) -> u8 {
1356        // `Circuit::add_hop` checks to make sure that we never have more than `u8::MAX` hops,
1357        // so `self.hops.len()` should be safe to cast to a `u8`.
1358        // If that assumption is violated,
1359        // we choose to panic rather than silently use the wrong hop due to an `as` cast.
1360        self.hops
1361            .len()
1362            .try_into()
1363            .expect("`hops.len()` has more than `u8::MAX` hops")
1364    }
1365
1366    /// Check whether this circuit has any hops.
1367    pub(super) fn has_hops(&self) -> bool {
1368        !self.hops.is_empty()
1369    }
1370
1371    /// Get the `HopNum` of the last hop, if this circuit is non-empty.
1372    ///
1373    /// Returns `None` if the circuit has no hops.
1374    pub(super) fn last_hop_num(&self) -> Option<HopNum> {
1375        let num_hops = self.num_hops();
1376        if num_hops == 0 {
1377            // asked for the last hop, but there are no hops
1378            return None;
1379        }
1380        Some(HopNum::from(num_hops - 1))
1381    }
1382
1383    /// Get the path of the circuit.
1384    ///
1385    /// **Warning:** Do not call while already holding the [`Self::mutable`] lock.
1386    pub(super) fn path(&self) -> Arc<path::Path> {
1387        self.mutable.path()
1388    }
1389
1390    /// Return a ClockSkew declaring how much clock skew the other side of this channel
1391    /// claimed that we had when we negotiated the connection.
1392    pub(super) fn clock_skew(&self) -> ClockSkew {
1393        self.channel.clock_skew()
1394    }
1395
1396    /// Does congestion control use stream SENDMEs for the given `hop`?
1397    ///
1398    /// Returns `None` if `hop` doesn't exist.
1399    pub(super) fn uses_stream_sendme(&self, hop: HopNum) -> Option<bool> {
1400        let hop = self.hop(hop)?;
1401        Some(hop.ccontrol().uses_stream_sendme())
1402    }
1403
1404    /// Returns whether this is a conflux circuit that is not linked yet.
1405    pub(super) fn is_conflux_pending(&self) -> bool {
1406        let Some(status) = self.conflux_status() else {
1407            return false;
1408        };
1409
1410        status != ConfluxStatus::Linked
1411    }
1412
1413    /// Returns the conflux status of this circuit.
1414    ///
1415    /// Returns `None` if this is not a conflux circuit.
1416    pub(super) fn conflux_status(&self) -> Option<ConfluxStatus> {
1417        cfg_if::cfg_if! {
1418            if #[cfg(feature = "conflux")] {
1419                self.conflux_handler
1420                    .as_ref()
1421                    .map(|handler| handler.status())
1422            } else {
1423                None
1424            }
1425        }
1426    }
1427
1428    /// Returns initial RTT on this leg, measured in the conflux handshake.
1429    #[cfg(feature = "conflux")]
1430    pub(super) fn init_rtt(&self) -> Option<Duration> {
1431        self.conflux_handler
1432            .as_ref()
1433            .map(|handler| handler.init_rtt())?
1434    }
1435}
1436
1437/// The conflux status of a conflux [`Circuit`].
1438#[derive(Copy, Clone, Debug, PartialEq, Eq)]
1439pub(super) enum ConfluxStatus {
1440    /// Circuit has not begun the conflux handshake yet.
1441    Unlinked,
1442    /// Conflux handshake is in progress.
1443    Pending,
1444    /// A linked conflux circuit.
1445    Linked,
1446}
1447
1448/// Return the stream ID of `msg`, if it has one.
1449///
1450/// Returns `Ok(None)` if `msg` is a meta cell.
1451fn msg_streamid(msg: &UnparsedRelayMsg) -> Result<Option<StreamId>> {
1452    let cmd = msg.cmd();
1453    let streamid = msg.stream_id();
1454    if !cmd.accepts_streamid_val(streamid) {
1455        return Err(Error::CircProto(format!(
1456            "Invalid stream ID {} for relay command {}",
1457            sv(StreamId::get_or_zero(streamid)),
1458            msg.cmd()
1459        )));
1460    }
1461
1462    Ok(streamid)
1463}
1464
1465impl Drop for Circuit {
1466    fn drop(&mut self) {
1467        let _ = self.channel.close_circuit(self.channel_id);
1468    }
1469}
1470
1471/// Return the client circuit-creation extensions that we should use in order to negotiate
1472/// a given set of circuit hop parameters.
1473#[allow(clippy::unnecessary_wraps)]
1474pub(super) fn circ_extensions_from_settings(params: &HopSettings) -> Result<Vec<CircRequestExt>> {
1475    // allow 'unused_mut' because of the combinations of `cfg` conditions below
1476    #[allow(unused_mut)]
1477    let mut client_extensions = Vec::new();
1478
1479    if params.ccontrol.is_enabled() {
1480        cfg_if::cfg_if! {
1481            if #[cfg(feature = "flowctl-cc")] {
1482                // TODO(arti#88): We have an `if false` in `exit_circparams_from_netparams`
1483                // which should prevent the above `is_enabled()` from ever being true,
1484                // even with the "flowctl-cc" feature enabled:
1485                // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/2932#note_3191196
1486                // The panic here is so that CI tests will hopefully catch if congestion
1487                // control is unexpectedly enabled.
1488                // We should remove this panic once xon/xoff flow is supported.
1489                #[cfg(not(test))]
1490                panic!("Congestion control is enabled on this circuit, but we don't yet support congestion control");
1491
1492                #[allow(unreachable_code)]
1493                client_extensions.push(CircRequestExt::CcRequest(CcRequest::default()));
1494            } else {
1495                return Err(
1496                    tor_error::internal!(
1497                        "Congestion control is enabled on this circuit, but 'flowctl-cc' feature is not enabled"
1498                    )
1499                    .into()
1500                );
1501            }
1502        }
1503    }
1504
1505    Ok(client_extensions)
1506}