tor_proto/tunnel/reactor/
circuit.rs

1//! Module exposing types for representing circuits in the tunnel reactor.
2
3pub(crate) mod circhop;
4pub(super) mod create;
5pub(super) mod extender;
6
7use crate::channel::{Channel, ChannelSender};
8#[cfg(feature = "counter-galois-onion")]
9use crate::circuit::handshake::RelayCryptLayerProtocol;
10use crate::circuit::HopSettings;
11use crate::congestion::sendme;
12use crate::congestion::CongestionSignals;
13use crate::crypto::binding::CircuitBinding;
14use crate::crypto::cell::{
15    HopNum, InboundClientCrypt, InboundClientLayer, OutboundClientCrypt, OutboundClientLayer,
16    RelayCellBody,
17};
18use crate::crypto::handshake::fast::CreateFastClient;
19use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey};
20use crate::crypto::handshake::ntor_v3::{NtorV3Client, NtorV3PublicKey};
21use crate::crypto::handshake::{ClientHandshake, KeyGenerator};
22use crate::memquota::{CircuitAccount, SpecificAccount as _, StreamAccount};
23use crate::stream::queue::{stream_queue, StreamQueueSender};
24use crate::stream::{AnyCmdChecker, DrainRateRequest, StreamRateLimit, StreamStatus};
25use crate::tunnel::circuit::celltypes::{ClientCircChanMsg, CreateResponse};
26use crate::tunnel::circuit::handshake::{BoxedClientLayer, HandshakeRole};
27use crate::tunnel::circuit::path;
28use crate::tunnel::circuit::unique_id::UniqId;
29use crate::tunnel::circuit::{CircuitRxReceiver, MutableState, StreamMpscReceiver};
30use crate::tunnel::reactor::MetaCellDisposition;
31use crate::tunnel::streammap;
32use crate::tunnel::TunnelScopedCircId;
33use crate::util::err::ReactorError;
34use crate::util::notify::NotifySender;
35use crate::util::sometimes_unbounded_sink::SometimesUnboundedSink;
36use crate::util::SinkExt as _;
37use crate::{ClockSkew, Error, Result};
38
39use tor_async_utils::{SinkTrySend as _, SinkTrySendError as _};
40use tor_cell::chancell::msg::{AnyChanMsg, HandshakeType, Relay};
41use tor_cell::chancell::{AnyChanCell, ChanCmd, CircId};
42use tor_cell::chancell::{BoxedCellBody, ChanMsg};
43use tor_cell::relaycell::extend::{CcRequest, CircRequestExt};
44use tor_cell::relaycell::msg::{AnyRelayMsg, End, Sendme, SendmeTag, Truncated};
45use tor_cell::relaycell::{
46    AnyRelayMsgOuter, RelayCellDecoderResult, RelayCellFormat, RelayCmd, StreamId, UnparsedRelayMsg,
47};
48use tor_error::{internal, Bug};
49use tor_linkspec::RelayIds;
50use tor_llcrypto::pk;
51use tor_memquota::mq_queue::{ChannelSpec as _, MpscSpec};
52
53use futures::{SinkExt as _, Stream};
54use oneshot_fused_workaround as oneshot;
55use postage::watch;
56use safelog::sensitive as sv;
57use tor_rtcompat::DynTimeProvider;
58use tracing::{debug, trace, warn};
59
60use super::{
61    CellHandlers, CircuitHandshake, CloseStreamBehavior, ReactorResultChannel, SendRelayCell,
62};
63
64use std::borrow::Borrow;
65use std::pin::Pin;
66use std::result::Result as StdResult;
67use std::sync::Arc;
68use std::task::Poll;
69use std::time::{Duration, SystemTime};
70
71use create::{Create2Wrap, CreateFastWrap, CreateHandshakeWrap};
72use extender::HandshakeAuxDataHandler;
73
74#[cfg(feature = "hs-service")]
75use {
76    crate::stream::{DataCmdChecker, IncomingStreamRequest},
77    tor_cell::relaycell::msg::Begin,
78};
79
80#[cfg(feature = "conflux")]
81use {
82    super::conflux::ConfluxMsgHandler,
83    super::conflux::{ConfluxAction, OooRelayMsg},
84    crate::tunnel::reactor::RemoveLegReason,
85    crate::tunnel::TunnelId,
86};
87
88pub(super) use circhop::{CircHop, CircHopList};
89
90/// Initial value for outbound flow-control window on streams.
91pub(super) const SEND_WINDOW_INIT: u16 = 500;
92/// Initial value for inbound flow-control window on streams.
93pub(crate) const RECV_WINDOW_INIT: u16 = 500;
94/// Size of the buffer used between the reactor and a `StreamReader`.
95///
96/// FIXME(eta): We pick 2× the receive window, which is very conservative (we arguably shouldn't
97///             get sent more than the receive window anyway!). We might do due to things that
98///             don't count towards the window though.
99pub(crate) const STREAM_READER_BUFFER: usize = (2 * RECV_WINDOW_INIT) as usize;
100
101/// A circuit "leg" from a tunnel.
102///
103/// Regular (non-multipath) circuits have a single leg.
104/// Conflux (multipath) circuits have `N` (usually, `N = 2`).
105pub(crate) struct Circuit {
106    /// The time provider.
107    runtime: DynTimeProvider,
108    /// The channel this circuit is attached to.
109    channel: Arc<Channel>,
110    /// Sender object used to actually send cells.
111    ///
112    /// NOTE: Control messages could potentially add unboundedly to this, although that's
113    ///       not likely to happen (and isn't triggereable from the network, either).
114    pub(super) chan_sender: SometimesUnboundedSink<AnyChanCell, ChannelSender>,
115    /// Input stream, on which we receive ChanMsg objects from this circuit's
116    /// channel.
117    ///
118    // TODO: could use a SPSC channel here instead.
119    pub(super) input: CircuitRxReceiver,
120    /// The cryptographic state for this circuit for inbound cells.
121    /// This object is divided into multiple layers, each of which is
122    /// shared with one hop of the circuit.
123    crypto_in: InboundClientCrypt,
124    /// The cryptographic state for this circuit for outbound cells.
125    crypto_out: OutboundClientCrypt,
126    /// List of hops state objects used by the reactor
127    hops: CircHopList,
128    /// Mutable information about this circuit,
129    /// shared with the reactor's `ConfluxSet`.
130    mutable: Arc<MutableState>,
131    /// This circuit's identifier on the upstream channel.
132    channel_id: CircId,
133    /// An identifier for logging about this reactor's circuit.
134    unique_id: TunnelScopedCircId,
135    /// A handler for conflux cells.
136    ///
137    /// Set once the conflux handshake is initiated by the reactor
138    /// using [`Reactor::handle_link_circuits`](super::Reactor::handle_link_circuits).
139    #[cfg(feature = "conflux")]
140    conflux_handler: Option<ConfluxMsgHandler>,
141    /// Memory quota account
142    #[allow(dead_code)] // Partly here to keep it alive as long as the circuit
143    memquota: CircuitAccount,
144}
145
146/// A command to run in response to a circuit event.
147///
148/// Unlike `RunOnceCmdInner`, doesn't know anything about `UniqId`s.
149/// The user of the `CircuitCmd`s is supposed to know the `UniqId`
150/// of the circuit the `CircuitCmd` came from.
151///
152/// This type gets mapped to a `RunOnceCmdInner` in the circuit reactor.
153#[derive(Debug)]
154pub(super) enum CircuitCmd {
155    /// Send a RELAY cell on the circuit leg this command originates from.
156    Send(SendRelayCell),
157    /// Handle a SENDME message received on the circuit leg this command originates from.
158    HandleSendMe {
159        /// The hop number.
160        hop: HopNum,
161        /// The SENDME message to handle.
162        sendme: Sendme,
163    },
164    /// Close the specified stream on the circuit leg this command originates from.
165    CloseStream {
166        /// The hop number.
167        hop: HopNum,
168        /// The ID of the stream to close.
169        sid: StreamId,
170        /// The stream-closing behavior.
171        behav: CloseStreamBehavior,
172        /// The reason for closing the stream.
173        reason: streammap::TerminateReason,
174    },
175    /// Remove this circuit from the conflux set.
176    ///
177    /// Returned by `ConfluxMsgHandler::handle_conflux_msg` for invalid messages
178    /// (originating from wrong hop), and for messages that are rejected
179    /// by its inner `AbstractMsgHandler`.
180    #[cfg(feature = "conflux")]
181    ConfluxRemove(RemoveLegReason),
182    /// This circuit has completed the conflux handshake,
183    /// and wants to send the specified cell.
184    ///
185    /// Returned by an `AbstractMsgHandler` to signal to the reactor that
186    /// the conflux handshake is complete.
187    #[cfg(feature = "conflux")]
188    ConfluxHandshakeComplete(SendRelayCell),
189    /// Perform a clean shutdown on this circuit.
190    CleanShutdown,
191    /// Enqueue an out-of-order cell in the reactor.
192    #[cfg(feature = "conflux")]
193    Enqueue(OooRelayMsg),
194}
195
196/// Return a `CircProto` error for the specified unsupported cell.
197///
198/// This error will shut down the reactor.
199///
200/// Note: this is a macro to simplify usage (this way the caller doesn't
201/// need to .map() the result to the appropriate type)
202macro_rules! unsupported_client_cell {
203    ($msg:expr) => {{
204        unsupported_client_cell!(@ $msg, "")
205    }};
206
207    ($msg:expr, $hopnum:expr) => {{
208        let hop: HopNum = $hopnum;
209        let hop_display = format!(" from hop {}", hop.display());
210        unsupported_client_cell!(@ $msg, hop_display)
211    }};
212
213    (@ $msg:expr, $hopnum_display:expr) => {
214        Err(crate::Error::CircProto(format!(
215            "Unexpected {} cell{} on client circuit",
216            $msg.cmd(),
217            $hopnum_display,
218        )))
219    };
220}
221
222pub(super) use unsupported_client_cell;
223
224impl Circuit {
225    /// Create a new non-multipath circuit.
226    pub(super) fn new(
227        runtime: DynTimeProvider,
228        channel: Arc<Channel>,
229        channel_id: CircId,
230        unique_id: TunnelScopedCircId,
231        input: CircuitRxReceiver,
232        memquota: CircuitAccount,
233        mutable: Arc<MutableState>,
234    ) -> Self {
235        let chan_sender = SometimesUnboundedSink::new(channel.sender());
236
237        let crypto_out = OutboundClientCrypt::new();
238        Circuit {
239            runtime,
240            channel,
241            chan_sender,
242            input,
243            crypto_in: InboundClientCrypt::new(),
244            hops: CircHopList::default(),
245            unique_id,
246            channel_id,
247            crypto_out,
248            mutable,
249            #[cfg(feature = "conflux")]
250            conflux_handler: None,
251            memquota,
252        }
253    }
254
255    /// Return the process-unique identifier of this circuit.
256    pub(super) fn unique_id(&self) -> UniqId {
257        self.unique_id.unique_id()
258    }
259
260    /// Return the shared mutable state of this circuit.
261    pub(super) fn mutable(&self) -> &Arc<MutableState> {
262        &self.mutable
263    }
264
265    /// Add this circuit to a multipath tunnel, by associating it with a new [`TunnelId`],
266    /// and installing a [`ConfluxMsgHandler`] on this circuit.
267    ///
268    /// Once this is called, the circuit will be able to handle conflux cells.
269    #[cfg(feature = "conflux")]
270    pub(super) fn add_to_conflux_tunnel(
271        &mut self,
272        tunnel_id: TunnelId,
273        conflux_handler: ConfluxMsgHandler,
274    ) {
275        self.unique_id = TunnelScopedCircId::new(tunnel_id, self.unique_id.unique_id());
276        self.conflux_handler = Some(conflux_handler);
277    }
278
279    /// Send a LINK cell to the specified hop.
280    ///
281    /// This must be called *after* a [`ConfluxMsgHandler`] is installed
282    /// on the circuit with [`add_to_conflux_tunnel`](Self::add_to_conflux_tunnel).
283    #[cfg(feature = "conflux")]
284    pub(super) async fn begin_conflux_link(
285        &mut self,
286        hop: HopNum,
287        cell: AnyRelayMsgOuter,
288        runtime: &tor_rtcompat::DynTimeProvider,
289    ) -> Result<()> {
290        use tor_rtcompat::SleepProvider as _;
291
292        if self.conflux_handler.is_none() {
293            return Err(internal!(
294                "tried to send LINK cell before installing a ConfluxMsgHandler?!"
295            )
296            .into());
297        }
298
299        let cell = SendRelayCell {
300            hop,
301            early: false,
302            cell,
303        };
304        self.send_relay_cell(cell).await?;
305
306        let Some(conflux_handler) = self.conflux_handler.as_mut() else {
307            return Err(internal!("ConfluxMsgHandler disappeared?!").into());
308        };
309
310        Ok(conflux_handler.note_link_sent(runtime.wallclock())?)
311    }
312
313    /// Get the wallclock time when the handshake on this circuit is supposed to time out.
314    ///
315    /// Returns `None` if this handler hasn't started the handshake yet.
316    pub(super) fn conflux_hs_timeout(&self) -> Option<SystemTime> {
317        cfg_if::cfg_if! {
318            if #[cfg(feature = "conflux")] {
319                self.conflux_handler.as_ref().map(|handler| handler.handshake_timeout())?
320            } else {
321                None
322            }
323        }
324    }
325
326    /// Handle a [`CtrlMsg::AddFakeHop`](super::CtrlMsg::AddFakeHop) message.
327    #[cfg(test)]
328    pub(super) fn handle_add_fake_hop(
329        &mut self,
330        format: RelayCellFormat,
331        fwd_lasthop: bool,
332        rev_lasthop: bool,
333        dummy_peer_id: path::HopDetail,
334        // TODO-CGO: Take HopSettings instead of CircParams.
335        // (Do this after we've got the virtual-hop refactorings done for
336        // virtual extending.)
337        params: &crate::circuit::CircParameters,
338        done: ReactorResultChannel<()>,
339    ) {
340        use tor_protover::{named, Protocols};
341
342        use crate::tunnel::circuit::test::DummyCrypto;
343
344        assert!(matches!(format, RelayCellFormat::V0));
345        let _ = format; // TODO-CGO: remove this once we have CGO+hs implemented.
346
347        let fwd = Box::new(DummyCrypto::new(fwd_lasthop));
348        let rev = Box::new(DummyCrypto::new(rev_lasthop));
349        let binding = None;
350
351        let settings = HopSettings::from_params_and_caps(
352            // This is for testing only, so we'll assume full negotiation took place.
353            crate::circuit::HopNegotiationType::Full,
354            params,
355            &[named::FLOWCTRL_CC].into_iter().collect::<Protocols>(),
356        )
357        .expect("Can't construct HopSettings");
358        self.add_hop(dummy_peer_id, fwd, rev, binding, &settings)
359            .expect("could not add hop to circuit");
360        let _ = done.send(Ok(()));
361    }
362
363    /// Encode `msg` and encrypt it, returning the resulting cell
364    /// and tag that should be expected for an authenticated SENDME sent
365    /// in response to that cell.
366    fn encode_relay_cell(
367        crypto_out: &mut OutboundClientCrypt,
368        relay_format: RelayCellFormat,
369        hop: HopNum,
370        early: bool,
371        msg: AnyRelayMsgOuter,
372    ) -> Result<(AnyChanMsg, SendmeTag)> {
373        let mut body: RelayCellBody = msg
374            .encode(relay_format, &mut rand::rng())
375            .map_err(|e| Error::from_cell_enc(e, "relay cell body"))?
376            .into();
377        let cmd = if early {
378            ChanCmd::RELAY_EARLY
379        } else {
380            ChanCmd::RELAY
381        };
382        let tag = crypto_out.encrypt(cmd, &mut body, hop)?;
383        let msg = Relay::from(BoxedCellBody::from(body));
384        let msg = if early {
385            AnyChanMsg::RelayEarly(msg.into())
386        } else {
387            AnyChanMsg::Relay(msg)
388        };
389
390        Ok((msg, tag))
391    }
392
393    /// Encode `msg`, encrypt it, and send it to the 'hop'th hop.
394    ///
395    /// If there is insufficient outgoing *circuit-level* or *stream-level*
396    /// SENDME window, an error is returned instead.
397    ///
398    /// Does not check whether the cell is well-formed or reasonable.
399    ///
400    /// NOTE: the reactor should not call this function directly, only via
401    /// [`ConfluxSet::send_relay_cell_on_leg`](super::conflux::ConfluxSet::send_relay_cell_on_leg),
402    /// which will reroute the message, if necessary to the primary leg.
403    pub(super) async fn send_relay_cell(&mut self, msg: SendRelayCell) -> Result<()> {
404        let SendRelayCell {
405            hop,
406            early,
407            cell: msg,
408        } = msg;
409
410        let is_conflux_link = msg.cmd() == RelayCmd::CONFLUX_LINK;
411        if !is_conflux_link && self.is_conflux_pending() {
412            // Note: it is the responsibility of the reactor user to wait until
413            // at least one of the legs completes the handshake.
414            return Err(internal!("tried to send cell on unlinked circuit").into());
415        }
416
417        trace!(circ_id = %self.unique_id, cell = ?msg, "sending relay cell");
418
419        // Cloned, because we borrow mutably from self when we get the circhop.
420        let runtime = self.runtime.clone();
421        let c_t_w = sendme::cmd_counts_towards_windows(msg.cmd());
422        let stream_id = msg.stream_id();
423        let circhop = self.hops.get_mut(hop).ok_or(Error::NoSuchHop)?;
424
425        // We might be out of capacity entirely; see if we are about to hit a limit.
426        //
427        // TODO: If we ever add a notion of _recoverable_ errors below, we'll
428        // need a way to restore this limit, and similarly for take_capacity_to_send().
429        circhop.decrement_outbound_cell_limit()?;
430
431        // We need to apply stream-level flow control *before* encoding the message.
432        if c_t_w {
433            if let Some(stream_id) = stream_id {
434                circhop.take_capacity_to_send(stream_id, msg.msg())?;
435            }
436        }
437
438        // Save the RelayCmd of the message before it gets consumed below.
439        // We need this to tell our ConfluxMsgHandler about the cell we've just sent,
440        // so that it can update its counters.
441        let relay_cmd = msg.cmd();
442
443        // NOTE(eta): Now that we've encrypted the cell, we *must* either send it or abort
444        //            the whole circuit (e.g. by returning an error).
445        let (msg, tag) = Self::encode_relay_cell(
446            &mut self.crypto_out,
447            circhop.relay_format(),
448            hop,
449            early,
450            msg,
451        )?;
452        // The cell counted for congestion control, inform our algorithm of such and pass down the
453        // tag for authenticated SENDMEs.
454        if c_t_w {
455            circhop.ccontrol_mut().note_data_sent(&runtime, &tag)?;
456        }
457
458        let cell = AnyChanCell::new(Some(self.channel_id), msg);
459        Pin::new(&mut self.chan_sender).send_unbounded(cell).await?;
460
461        #[cfg(feature = "conflux")]
462        if let Some(conflux) = self.conflux_handler.as_mut() {
463            conflux.note_cell_sent(relay_cmd);
464        }
465
466        Ok(())
467    }
468
469    /// Helper: process a cell on a channel.  Most cells get ignored
470    /// or rejected; a few get delivered to circuits.
471    ///
472    /// Return `CellStatus::CleanShutdown` if we should exit.
473    ///
474    // TODO: returning `Vec<CircuitCmd>` means we're unnecessarily
475    // allocating a `Vec` here. Generally, the number of commands is going to be small
476    // (usually 1, but > 1 when we start supporting packed cells).
477    //
478    // We should consider using smallvec instead. It might also be a good idea to have a
479    // separate higher-level type splitting this out into Single(CircuitCmd),
480    // and Multiple(SmallVec<[CircuitCmd; <capacity>]>).
481    pub(super) fn handle_cell(
482        &mut self,
483        handlers: &mut CellHandlers,
484        leg: UniqId,
485        cell: ClientCircChanMsg,
486    ) -> Result<Vec<CircuitCmd>> {
487        trace!(circ_id = %self.unique_id, cell = ?cell, "handling cell");
488        use ClientCircChanMsg::*;
489        match cell {
490            Relay(r) => self.handle_relay_cell(handlers, leg, r),
491            Destroy(d) => {
492                let reason = d.reason();
493                debug!(
494                    circ_id = %self.unique_id,
495                    "Received DESTROY cell. Reason: {} [{}]",
496                    reason.human_str(),
497                    reason
498                );
499
500                self.handle_destroy_cell().map(|c| vec![c])
501            }
502        }
503    }
504
505    /// Decode `cell`, returning its corresponding hop number, tag,
506    /// and decoded body.
507    fn decode_relay_cell(
508        &mut self,
509        cell: Relay,
510    ) -> Result<(HopNum, SendmeTag, RelayCellDecoderResult)> {
511        // This is always RELAY, not RELAY_EARLY, so long as this code is client-only.
512        let cmd = cell.cmd();
513        let mut body = cell.into_relay_body().into();
514
515        // Decrypt the cell. If it's recognized, then find the
516        // corresponding hop.
517        let (hopnum, tag) = self.crypto_in.decrypt(cmd, &mut body)?;
518
519        // Decode the cell.
520        let decode_res = self
521            .hop_mut(hopnum)
522            .ok_or_else(|| {
523                Error::from(internal!(
524                    "Trying to decode cell from nonexistent hop {:?}",
525                    hopnum
526                ))
527            })?
528            .decode(body.into())?;
529
530        Ok((hopnum, tag, decode_res))
531    }
532
533    /// React to a Relay or RelayEarly cell.
534    fn handle_relay_cell(
535        &mut self,
536        handlers: &mut CellHandlers,
537        leg: UniqId,
538        cell: Relay,
539    ) -> Result<Vec<CircuitCmd>> {
540        let (hopnum, tag, decode_res) = self.decode_relay_cell(cell)?;
541
542        // Check whether we are allowed to receive more data for this circuit hop.
543        self.hop_mut(hopnum)
544            .ok_or_else(|| internal!("nonexistent hop {:?}", hopnum))?
545            .decrement_inbound_cell_limit()?;
546
547        let c_t_w = decode_res.cmds().any(sendme::cmd_counts_towards_windows);
548
549        // Decrement the circuit sendme windows, and see if we need to
550        // send a sendme cell.
551        let send_circ_sendme = if c_t_w {
552            self.hop_mut(hopnum)
553                .ok_or_else(|| Error::CircProto("Sendme from nonexistent hop".into()))?
554                .ccontrol_mut()
555                .note_data_received()?
556        } else {
557            false
558        };
559
560        let mut circ_cmds = vec![];
561        // If we do need to send a circuit-level SENDME cell, do so.
562        if send_circ_sendme {
563            // This always sends a V1 (tagged) sendme cell, and thereby assumes
564            // that SendmeEmitMinVersion is no more than 1.  If the authorities
565            // every increase that parameter to a higher number, this will
566            // become incorrect.  (Higher numbers are not currently defined.)
567            let sendme = Sendme::from(tag);
568            let cell = AnyRelayMsgOuter::new(None, sendme.into());
569            circ_cmds.push(CircuitCmd::Send(SendRelayCell {
570                hop: hopnum,
571                early: false,
572                cell,
573            }));
574
575            // Inform congestion control of the SENDME we are sending. This is a circuit level one.
576            self.hop_mut(hopnum)
577                .ok_or_else(|| {
578                    Error::from(internal!(
579                        "Trying to send SENDME to nonexistent hop {:?}",
580                        hopnum
581                    ))
582                })?
583                .ccontrol_mut()
584                .note_sendme_sent()?;
585        }
586
587        let (mut msgs, incomplete) = decode_res.into_parts();
588        while let Some(msg) = msgs.next() {
589            let msg_status = self.handle_relay_msg(handlers, hopnum, leg, c_t_w, msg)?;
590
591            match msg_status {
592                None => continue,
593                Some(msg @ CircuitCmd::CleanShutdown) => {
594                    for m in msgs {
595                        debug!(
596                            "{id}: Ignoring relay msg received after triggering shutdown: {m:?}",
597                            id = self.unique_id
598                        );
599                    }
600                    if let Some(incomplete) = incomplete {
601                        debug!(
602                            "{id}: Ignoring partial relay msg received after triggering shutdown: {:?}",
603                            incomplete,
604                            id=self.unique_id,
605                        );
606                    }
607                    circ_cmds.push(msg);
608                    return Ok(circ_cmds);
609                }
610                Some(msg) => {
611                    circ_cmds.push(msg);
612                }
613            }
614        }
615
616        Ok(circ_cmds)
617    }
618
619    /// Handle a single incoming relay message.
620    fn handle_relay_msg(
621        &mut self,
622        handlers: &mut CellHandlers,
623        hopnum: HopNum,
624        leg: UniqId,
625        cell_counts_toward_windows: bool,
626        msg: UnparsedRelayMsg,
627    ) -> Result<Option<CircuitCmd>> {
628        // If this msg wants/refuses to have a Stream ID, does it
629        // have/not have one?
630        let streamid = msg_streamid(&msg)?;
631
632        // If this doesn't have a StreamId, it's a meta cell,
633        // not meant for a particular stream.
634        let Some(streamid) = streamid else {
635            return self.handle_meta_cell(handlers, hopnum, msg);
636        };
637
638        #[cfg(feature = "conflux")]
639        let msg = if let Some(conflux) = self.conflux_handler.as_mut() {
640            match conflux.action_for_msg(hopnum, cell_counts_toward_windows, streamid, msg)? {
641                ConfluxAction::Deliver(msg) => {
642                    // The message either doesn't count towards the sequence numbers
643                    // or is already well-ordered, so we're ready to handle it.
644
645                    // It's possible that some of our buffered messages are now ready to be
646                    // handled. We don't check that here, however, because that's handled
647                    // by the reactor main loop.
648                    msg
649                }
650                ConfluxAction::Enqueue(msg) => {
651                    // Tell the reactor to enqueue this msg
652                    return Ok(Some(CircuitCmd::Enqueue(msg)));
653                }
654            }
655        } else {
656            // If we don't have a conflux_handler, it means this circuit is not part of
657            // a conflux tunnel, so we can just process the message.
658            msg
659        };
660
661        self.handle_in_order_relay_msg(
662            handlers,
663            hopnum,
664            leg,
665            cell_counts_toward_windows,
666            streamid,
667            msg,
668        )
669    }
670
671    /// Handle a single incoming relay message that is known to be in order.
672    pub(super) fn handle_in_order_relay_msg(
673        &mut self,
674        handlers: &mut CellHandlers,
675        hopnum: HopNum,
676        leg: UniqId,
677        cell_counts_toward_windows: bool,
678        streamid: StreamId,
679        msg: UnparsedRelayMsg,
680    ) -> Result<Option<CircuitCmd>> {
681        #[cfg(feature = "conflux")]
682        if let Some(conflux) = self.conflux_handler.as_mut() {
683            conflux.inc_last_seq_delivered(&msg);
684        }
685
686        // Returns the original message if it's an an incoming stream request
687        // that we need to handle.
688        let hop = self
689            .hop_mut(hopnum)
690            .ok_or_else(|| Error::CircProto("Cell from nonexistent hop!".into()))?;
691        let res = hop.handle_msg(cell_counts_toward_windows, streamid, msg)?;
692
693        // If it was an incoming stream request, we don't need to worry about
694        // sending an XOFF as there's no stream data within this message.
695        if let Some(msg) = res {
696            cfg_if::cfg_if! {
697                if #[cfg(feature = "hs-service")] {
698                    return self.handle_incoming_stream_request(handlers, msg, streamid, hopnum, leg);
699                } else {
700                    return Err(internal!("incoming stream not rejected, but hs-service feature is disabled?!").into());
701                }
702            }
703        }
704
705        // We may want to send an XOFF if the incoming buffer is too large.
706        if let Some(cell) = hop.maybe_send_xoff(streamid)? {
707            let cell = AnyRelayMsgOuter::new(Some(streamid), cell.into());
708            let cell = SendRelayCell {
709                hop: hopnum,
710                early: false,
711                cell,
712            };
713            return Ok(Some(CircuitCmd::Send(cell)));
714        }
715
716        Ok(None)
717    }
718
719    /// Handle a conflux message coming from the specified hop.
720    ///
721    /// Returns an error if
722    ///
723    ///   * this is not a conflux circuit (i.e. it doesn't have a [`ConfluxMsgHandler`])
724    ///   * this is a client circuit and the conflux message originated an unexpected hop
725    ///   * the cell was sent in violation of the handshake protocol
726    #[cfg(feature = "conflux")]
727    fn handle_conflux_msg(
728        &mut self,
729        hop: HopNum,
730        msg: UnparsedRelayMsg,
731    ) -> Result<Option<CircuitCmd>> {
732        let Some(conflux_handler) = self.conflux_handler.as_mut() else {
733            // If conflux is not enabled, tear down the circuit
734            // (see 4.2.1. Cell Injection Side Channel Mitigations in prop329)
735            return Err(Error::CircProto(format!(
736                "Received {} cell from hop {} on non-conflux client circuit?!",
737                msg.cmd(),
738                hop.display(),
739            )));
740        };
741
742        Ok(conflux_handler.handle_conflux_msg(msg, hop))
743    }
744
745    /// For conflux: return the sequence number of the last cell sent on this leg.
746    ///
747    /// Returns an error if this circuit is not part of a conflux set.
748    #[cfg(feature = "conflux")]
749    pub(super) fn last_seq_sent(&self) -> Result<u64> {
750        let handler = self
751            .conflux_handler
752            .as_ref()
753            .ok_or_else(|| internal!("tried to get last_seq_sent of non-conflux circ"))?;
754
755        Ok(handler.last_seq_sent())
756    }
757
758    /// For conflux: set the sequence number of the last cell sent on this leg.
759    ///
760    /// Returns an error if this circuit is not part of a conflux set.
761    #[cfg(feature = "conflux")]
762    pub(super) fn set_last_seq_sent(&mut self, n: u64) -> Result<()> {
763        let handler = self
764            .conflux_handler
765            .as_mut()
766            .ok_or_else(|| internal!("tried to get last_seq_sent of non-conflux circ"))?;
767
768        handler.set_last_seq_sent(n);
769        Ok(())
770    }
771
772    /// For conflux: return the sequence number of the last cell received on this leg.
773    ///
774    /// Returns an error if this circuit is not part of a conflux set.
775    #[cfg(feature = "conflux")]
776    pub(super) fn last_seq_recv(&self) -> Result<u64> {
777        let handler = self
778            .conflux_handler
779            .as_ref()
780            .ok_or_else(|| internal!("tried to get last_seq_recv of non-conflux circ"))?;
781
782        Ok(handler.last_seq_recv())
783    }
784
785    /// A helper for handling incoming stream requests.
786    ///
787    // TODO: can we make this a method on CircHop to avoid the double HopNum lookup?
788    #[cfg(feature = "hs-service")]
789    fn handle_incoming_stream_request(
790        &mut self,
791        handlers: &mut CellHandlers,
792        msg: UnparsedRelayMsg,
793        stream_id: StreamId,
794        hop_num: HopNum,
795        leg: UniqId,
796    ) -> Result<Option<CircuitCmd>> {
797        use super::syncview::ClientCircSyncView;
798        use tor_cell::relaycell::msg::EndReason;
799        use tor_error::into_internal;
800        use tor_log_ratelim::log_ratelim;
801
802        use crate::{circuit::CIRCUIT_BUFFER_SIZE, tunnel::reactor::StreamReqInfo};
803
804        // We need to construct this early so that we don't double-borrow &mut self
805
806        let Some(handler) = handlers.incoming_stream_req_handler.as_mut() else {
807            return Err(Error::CircProto(
808                "Cannot handle BEGIN cells on this circuit".into(),
809            ));
810        };
811
812        if hop_num != handler.hop_num {
813            return Err(Error::CircProto(format!(
814                "Expecting incoming streams from {}, but received {} cell from unexpected hop {}",
815                handler.hop_num.display(),
816                msg.cmd(),
817                hop_num.display()
818            )));
819        }
820
821        let message_closes_stream = handler.cmd_checker.check_msg(&msg)? == StreamStatus::Closed;
822
823        // TODO: we've already looked up the `hop` in handle_relay_cell, so we shouldn't
824        // have to look it up again! However, we can't pass the `&mut hop` reference from
825        // `handle_relay_cell` to this function, because that makes Rust angry (we'd be
826        // borrowing self as mutable more than once).
827        //
828        // TODO: we _could_ use self.hops.get_mut(..) instead self.hop_mut(..) inside
829        // handle_relay_cell to work around the problem described above
830        let hop = self.hops.get_mut(hop_num).ok_or(Error::CircuitClosed)?;
831
832        if message_closes_stream {
833            hop.ending_msg_received(stream_id)?;
834
835            return Ok(None);
836        }
837
838        let begin = msg
839            .decode::<Begin>()
840            .map_err(|e| Error::from_bytes_err(e, "Invalid Begin message"))?
841            .into_msg();
842
843        let req = IncomingStreamRequest::Begin(begin);
844
845        {
846            use crate::stream::IncomingStreamRequestDisposition::*;
847
848            let ctx = crate::stream::IncomingStreamRequestContext { request: &req };
849            // IMPORTANT: ClientCircSyncView::n_open_streams() (called via disposition() below)
850            // accesses the stream map mutexes!
851            //
852            // This means it's very important not to call this function while any of the hop's
853            // stream map mutex is held.
854            let view = ClientCircSyncView::new(&self.hops);
855
856            match handler.filter.as_mut().disposition(&ctx, &view)? {
857                Accept => {}
858                CloseCircuit => return Ok(Some(CircuitCmd::CleanShutdown)),
859                RejectRequest(end) => {
860                    let end_msg = AnyRelayMsgOuter::new(Some(stream_id), end.into());
861                    let cell = SendRelayCell {
862                        hop: hop_num,
863                        early: false,
864                        cell: end_msg,
865                    };
866                    return Ok(Some(CircuitCmd::Send(cell)));
867                }
868            }
869        }
870
871        // TODO: Sadly, we need to look up `&mut hop` yet again,
872        // since we needed to pass `&self.hops` by reference to our filter above. :(
873        let hop = self.hops.get_mut(hop_num).ok_or(Error::CircuitClosed)?;
874        let relay_cell_format = hop.relay_format();
875
876        let memquota = StreamAccount::new(&self.memquota)?;
877
878        let (sender, receiver) = stream_queue(
879            STREAM_READER_BUFFER,
880            &memquota,
881            self.chan_sender.as_inner().time_provider(),
882        )?;
883
884        let (msg_tx, msg_rx) = MpscSpec::new(CIRCUIT_BUFFER_SIZE).new_mq(
885            self.chan_sender.as_inner().time_provider().clone(),
886            memquota.as_raw_account(),
887        )?;
888
889        let (rate_limit_tx, rate_limit_rx) = watch::channel_with(StreamRateLimit::MAX);
890
891        // A channel for the reactor to request a new drain rate from the reader.
892        // Typically this notification will be sent after an XOFF is sent so that the reader can
893        // send us a new drain rate when the stream data queue becomes empty.
894        let mut drain_rate_request_tx = NotifySender::new_typed();
895        let drain_rate_request_rx = drain_rate_request_tx.subscribe();
896
897        let cmd_checker = DataCmdChecker::new_connected();
898        hop.add_ent_with_id(
899            sender,
900            msg_rx,
901            rate_limit_tx,
902            drain_rate_request_tx,
903            stream_id,
904            cmd_checker,
905        )?;
906
907        let outcome = Pin::new(&mut handler.incoming_sender).try_send(StreamReqInfo {
908            req,
909            stream_id,
910            hop: (leg, hop_num).into(),
911            msg_tx,
912            receiver,
913            rate_limit_stream: rate_limit_rx,
914            drain_rate_request_stream: drain_rate_request_rx,
915            memquota,
916            relay_cell_format,
917        });
918
919        log_ratelim!("Delivering message to incoming stream handler"; outcome);
920
921        if let Err(e) = outcome {
922            if e.is_full() {
923                // The IncomingStreamRequestHandler's stream is full; it isn't
924                // handling requests fast enough. So instead, we reply with an
925                // END cell.
926                let end_msg = AnyRelayMsgOuter::new(
927                    Some(stream_id),
928                    End::new_with_reason(EndReason::RESOURCELIMIT).into(),
929                );
930
931                let cell = SendRelayCell {
932                    hop: hop_num,
933                    early: false,
934                    cell: end_msg,
935                };
936                return Ok(Some(CircuitCmd::Send(cell)));
937            } else if e.is_disconnected() {
938                // The IncomingStreamRequestHandler's stream has been dropped.
939                // In the Tor protocol as it stands, this always means that the
940                // circuit itself is out-of-use and should be closed. (See notes
941                // on `allow_stream_requests.`)
942                //
943                // Note that we will _not_ reach this point immediately after
944                // the IncomingStreamRequestHandler is dropped; we won't hit it
945                // until we next get an incoming request.  Thus, if we do later
946                // want to add early detection for a dropped
947                // IncomingStreamRequestHandler, we need to do it elsewhere, in
948                // a different way.
949                debug!(
950                    circ_id = %self.unique_id,
951                    "Incoming stream request receiver dropped",
952                );
953                // This will _cause_ the circuit to get closed.
954                return Err(Error::CircuitClosed);
955            } else {
956                // There are no errors like this with the current design of
957                // futures::mpsc, but we shouldn't just ignore the possibility
958                // that they'll be added later.
959                return Err(Error::from((into_internal!(
960                    "try_send failed unexpectedly"
961                ))(e)));
962            }
963        }
964
965        Ok(None)
966    }
967
968    /// Helper: process a destroy cell.
969    #[allow(clippy::unnecessary_wraps)]
970    fn handle_destroy_cell(&mut self) -> Result<CircuitCmd> {
971        // I think there is nothing more to do here.
972        Ok(CircuitCmd::CleanShutdown)
973    }
974
975    /// Handle a [`CtrlMsg::Create`](super::CtrlMsg::Create) message.
976    pub(super) async fn handle_create(
977        &mut self,
978        recv_created: oneshot::Receiver<CreateResponse>,
979        handshake: CircuitHandshake,
980        settings: HopSettings,
981        done: ReactorResultChannel<()>,
982    ) -> StdResult<(), ReactorError> {
983        let ret = match handshake {
984            CircuitHandshake::CreateFast => self.create_firsthop_fast(recv_created, settings).await,
985            CircuitHandshake::Ntor {
986                public_key,
987                ed_identity,
988            } => {
989                self.create_firsthop_ntor(recv_created, ed_identity, public_key, settings)
990                    .await
991            }
992            CircuitHandshake::NtorV3 { public_key } => {
993                self.create_firsthop_ntor_v3(recv_created, public_key, settings)
994                    .await
995            }
996        };
997        let _ = done.send(ret); // don't care if sender goes away
998
999        // TODO: maybe we don't need to flush here?
1000        // (we could let run_once() handle all the flushing)
1001        self.chan_sender.flush().await?;
1002
1003        Ok(())
1004    }
1005
1006    /// Helper: create the first hop of a circuit.
1007    ///
1008    /// This is parameterized not just on the RNG, but a wrapper object to
1009    /// build the right kind of create cell, and a handshake object to perform
1010    /// the cryptographic handshake.
1011    async fn create_impl<H, W, M>(
1012        &mut self,
1013        recvcreated: oneshot::Receiver<CreateResponse>,
1014        wrap: &W,
1015        key: &H::KeyType,
1016        mut settings: HopSettings,
1017        msg: &M,
1018    ) -> Result<()>
1019    where
1020        H: ClientHandshake + HandshakeAuxDataHandler,
1021        W: CreateHandshakeWrap,
1022        H::KeyGen: KeyGenerator,
1023        M: Borrow<H::ClientAuxData>,
1024    {
1025        // We don't need to shut down the circuit on failure here, since this
1026        // function consumes the PendingClientCirc and only returns
1027        // a ClientCirc on success.
1028
1029        let (state, msg) = {
1030            // done like this because holding the RNG across an await boundary makes the future
1031            // non-Send
1032            let mut rng = rand::rng();
1033            H::client1(&mut rng, key, msg)?
1034        };
1035        let create_cell = wrap.to_chanmsg(msg);
1036        trace!(
1037            circ_id = %self.unique_id,
1038            create = %create_cell.cmd(),
1039            "Extending to hop 1",
1040        );
1041        self.send_msg(create_cell).await?;
1042
1043        let reply = recvcreated
1044            .await
1045            .map_err(|_| Error::CircProto("Circuit closed while waiting".into()))?;
1046
1047        let relay_handshake = wrap.decode_chanmsg(reply)?;
1048        let (server_msg, keygen) = H::client2(state, relay_handshake)?;
1049
1050        H::handle_server_aux_data(&mut settings, &server_msg)?;
1051
1052        let BoxedClientLayer { fwd, back, binding } = settings
1053            .relay_crypt_protocol()
1054            .construct_client_layers(HandshakeRole::Initiator, keygen)?;
1055
1056        trace!(circ_id = %self.unique_id, "Handshake complete; circuit created.");
1057
1058        let peer_id = self.channel.target().clone();
1059
1060        self.add_hop(
1061            path::HopDetail::Relay(peer_id),
1062            fwd,
1063            back,
1064            binding,
1065            &settings,
1066        )?;
1067        Ok(())
1068    }
1069
1070    /// Use the (questionable!) CREATE_FAST handshake to connect to the
1071    /// first hop of this circuit.
1072    ///
1073    /// There's no authentication in CREATE_FAST,
1074    /// so we don't need to know whom we're connecting to: we're just
1075    /// connecting to whichever relay the channel is for.
1076    async fn create_firsthop_fast(
1077        &mut self,
1078        recvcreated: oneshot::Receiver<CreateResponse>,
1079        settings: HopSettings,
1080    ) -> Result<()> {
1081        // In a CREATE_FAST handshake, we can't negotiate a format other than this.
1082        let wrap = CreateFastWrap;
1083        self.create_impl::<CreateFastClient, _, _>(recvcreated, &wrap, &(), settings, &())
1084            .await
1085    }
1086
1087    /// Use the ntor handshake to connect to the first hop of this circuit.
1088    ///
1089    /// Note that the provided keys must match the channel's target,
1090    /// or the handshake will fail.
1091    async fn create_firsthop_ntor(
1092        &mut self,
1093        recvcreated: oneshot::Receiver<CreateResponse>,
1094        ed_identity: pk::ed25519::Ed25519Identity,
1095        pubkey: NtorPublicKey,
1096        settings: HopSettings,
1097    ) -> Result<()> {
1098        // Exit now if we have an Ed25519 or RSA identity mismatch.
1099        let target = RelayIds::builder()
1100            .ed_identity(ed_identity)
1101            .rsa_identity(pubkey.id)
1102            .build()
1103            .expect("Unable to build RelayIds");
1104        self.channel.check_match(&target)?;
1105
1106        let wrap = Create2Wrap {
1107            handshake_type: HandshakeType::NTOR,
1108        };
1109        self.create_impl::<NtorClient, _, _>(recvcreated, &wrap, &pubkey, settings, &())
1110            .await
1111    }
1112
1113    /// Use the ntor-v3 handshake to connect to the first hop of this circuit.
1114    ///
1115    /// Note that the provided key must match the channel's target,
1116    /// or the handshake will fail.
1117    async fn create_firsthop_ntor_v3(
1118        &mut self,
1119        recvcreated: oneshot::Receiver<CreateResponse>,
1120        pubkey: NtorV3PublicKey,
1121        settings: HopSettings,
1122    ) -> Result<()> {
1123        // Exit now if we have a mismatched key.
1124        let target = RelayIds::builder()
1125            .ed_identity(pubkey.id)
1126            .build()
1127            .expect("Unable to build RelayIds");
1128        self.channel.check_match(&target)?;
1129
1130        // Set the client extensions.
1131        let client_extensions = circ_extensions_from_settings(&settings)?;
1132        let wrap = Create2Wrap {
1133            handshake_type: HandshakeType::NTOR_V3,
1134        };
1135
1136        self.create_impl::<NtorV3Client, _, _>(
1137            recvcreated,
1138            &wrap,
1139            &pubkey,
1140            settings,
1141            &client_extensions,
1142        )
1143        .await
1144    }
1145
1146    /// Add a hop to the end of this circuit.
1147    ///
1148    /// Will return an error if the circuit already has [`u8::MAX`] hops.
1149    pub(super) fn add_hop(
1150        &mut self,
1151        peer_id: path::HopDetail,
1152        fwd: Box<dyn OutboundClientLayer + 'static + Send>,
1153        rev: Box<dyn InboundClientLayer + 'static + Send>,
1154        binding: Option<CircuitBinding>,
1155        settings: &HopSettings,
1156    ) -> StdResult<(), Bug> {
1157        let hop_num = self.hops.len();
1158        debug_assert_eq!(hop_num, usize::from(self.num_hops()));
1159
1160        // There are several places in the code that assume that a `usize` hop number
1161        // can be cast or converted to a `u8` hop number,
1162        // so this check is important to prevent panics or incorrect behaviour.
1163        if hop_num == usize::from(u8::MAX) {
1164            return Err(internal!(
1165                "cannot add more hops to a circuit with `u8::MAX` hops"
1166            ));
1167        }
1168
1169        let hop_num = (hop_num as u8).into();
1170
1171        let hop = CircHop::new(self.unique_id, hop_num, settings);
1172        self.hops.push(hop);
1173        self.crypto_in.add_layer(rev);
1174        self.crypto_out.add_layer(fwd);
1175        self.mutable.add_hop(peer_id, binding);
1176
1177        Ok(())
1178    }
1179
1180    /// Handle a RELAY cell on this circuit with stream ID 0.
1181    #[allow(clippy::cognitive_complexity)]
1182    fn handle_meta_cell(
1183        &mut self,
1184        handlers: &mut CellHandlers,
1185        hopnum: HopNum,
1186        msg: UnparsedRelayMsg,
1187    ) -> Result<Option<CircuitCmd>> {
1188        // SENDME cells and TRUNCATED get handled internally by the circuit.
1189
1190        // TODO: This pattern (Check command, try to decode, map error) occurs
1191        // several times, and would be good to extract simplify. Such
1192        // simplification is obstructed by a couple of factors: First, that
1193        // there is not currently a good way to get the RelayCmd from _type_ of
1194        // a RelayMsg.  Second, that decode() [correctly] consumes the
1195        // UnparsedRelayMsg.  I tried a macro-based approach, and didn't care
1196        // for it. -nickm
1197        if msg.cmd() == RelayCmd::SENDME {
1198            let sendme = msg
1199                .decode::<Sendme>()
1200                .map_err(|e| Error::from_bytes_err(e, "sendme message"))?
1201                .into_msg();
1202
1203            return Ok(Some(CircuitCmd::HandleSendMe {
1204                hop: hopnum,
1205                sendme,
1206            }));
1207        }
1208        if msg.cmd() == RelayCmd::TRUNCATED {
1209            let truncated = msg
1210                .decode::<Truncated>()
1211                .map_err(|e| Error::from_bytes_err(e, "truncated message"))?
1212                .into_msg();
1213            let reason = truncated.reason();
1214            debug!(
1215                circ_id = %self.unique_id,
1216                "Truncated from hop {}. Reason: {} [{}]",
1217                hopnum.display(),
1218                reason.human_str(),
1219                reason
1220            );
1221
1222            return Ok(Some(CircuitCmd::CleanShutdown));
1223        }
1224
1225        trace!(circ_id = %self.unique_id, cell = ?msg, "Received meta-cell");
1226
1227        #[cfg(feature = "conflux")]
1228        if matches!(
1229            msg.cmd(),
1230            RelayCmd::CONFLUX_LINK
1231                | RelayCmd::CONFLUX_LINKED
1232                | RelayCmd::CONFLUX_LINKED_ACK
1233                | RelayCmd::CONFLUX_SWITCH
1234        ) {
1235            return self.handle_conflux_msg(hopnum, msg);
1236        }
1237
1238        if self.is_conflux_pending() {
1239            warn!(
1240                circ_id = %self.unique_id,
1241                "received unexpected cell {msg:?} on unlinked conflux circuit",
1242            );
1243            return Err(Error::CircProto(
1244                "Received unexpected cell on unlinked circuit".into(),
1245            ));
1246        }
1247
1248        // For all other command types, we'll only get them in response
1249        // to another command, which should have registered a responder.
1250        //
1251        // TODO: should the conflux state machine be a meta cell handler?
1252        // We'd need to add support for multiple meta handlers, and change the
1253        // MetaCellHandler API to support returning Option<RunOnceCmdInner>
1254        // (because some cells will require sending a response)
1255        if let Some(mut handler) = handlers.meta_handler.take() {
1256            // The handler has a TargetHop so we do a quick convert for equality check.
1257            if handler.expected_hop() == (self.unique_id(), hopnum).into() {
1258                // Somebody was waiting for a message -- maybe this message
1259                let ret = handler.handle_msg(msg, self);
1260                trace!(
1261                    circ_id = %self.unique_id,
1262                    result = ?ret,
1263                    "meta handler completed",
1264                );
1265                match ret {
1266                    #[cfg(feature = "send-control-msg")]
1267                    Ok(MetaCellDisposition::Consumed) => {
1268                        handlers.meta_handler = Some(handler);
1269                        Ok(None)
1270                    }
1271                    Ok(MetaCellDisposition::ConversationFinished) => Ok(None),
1272                    #[cfg(feature = "send-control-msg")]
1273                    Ok(MetaCellDisposition::CloseCirc) => Ok(Some(CircuitCmd::CleanShutdown)),
1274                    Err(e) => Err(e),
1275                }
1276            } else {
1277                // Somebody wanted a message from a different hop!  Put this
1278                // one back.
1279                handlers.meta_handler = Some(handler);
1280
1281                unsupported_client_cell!(msg, hopnum)
1282            }
1283        } else {
1284            // No need to call shutdown here, since this error will
1285            // propagate to the reactor shut it down.
1286            unsupported_client_cell!(msg)
1287        }
1288    }
1289
1290    /// Handle a RELAY_SENDME cell on this circuit with stream ID 0.
1291    pub(super) fn handle_sendme(
1292        &mut self,
1293        hopnum: HopNum,
1294        msg: Sendme,
1295        signals: CongestionSignals,
1296    ) -> Result<Option<CircuitCmd>> {
1297        // Cloned, because we borrow mutably from self when we get the circhop.
1298        let runtime = self.runtime.clone();
1299
1300        // No need to call "shutdown" on errors in this function;
1301        // it's called from the reactor task and errors will propagate there.
1302        let hop = self
1303            .hop_mut(hopnum)
1304            .ok_or_else(|| Error::CircProto(format!("Couldn't find hop {}", hopnum.display())))?;
1305
1306        let tag = msg.into_sendme_tag().ok_or_else(||
1307                // Versions of Tor <=0.3.5 would omit a SENDME tag in this case;
1308                // but we don't support those any longer.
1309                 Error::CircProto("missing tag on circuit sendme".into()))?;
1310        // Update the CC object that we received a SENDME along with possible congestion signals.
1311        hop.ccontrol_mut()
1312            .note_sendme_received(&runtime, tag, signals)?;
1313        Ok(None)
1314    }
1315
1316    /// Send a message onto the circuit's channel.
1317    ///
1318    /// If the channel is ready to accept messages, it will be sent immediately. If not, the message
1319    /// will be enqueued for sending at a later iteration of the reactor loop.
1320    ///
1321    /// # Note
1322    ///
1323    /// Making use of the enqueuing capabilities of this function is discouraged! You should first
1324    /// check whether the channel is ready to receive messages (`self.channel.poll_ready`), and
1325    /// ideally use this to implement backpressure (such that you do not read from other sources
1326    /// that would send here while you know you're unable to forward the messages on).
1327    async fn send_msg(&mut self, msg: AnyChanMsg) -> Result<()> {
1328        let cell = AnyChanCell::new(Some(self.channel_id), msg);
1329        // Note: this future is always `Ready`, so await won't block.
1330        Pin::new(&mut self.chan_sender).send_unbounded(cell).await?;
1331        Ok(())
1332    }
1333
1334    /// Returns a [`Stream`] of [`CircuitCmd`] to poll from the main loop.
1335    ///
1336    /// The iterator contains at most one [`CircuitCmd`] for each hop,
1337    /// (excluding the `exclude` hop, if specified),
1338    /// representing the instructions for handling the ready-item, if any,
1339    /// of its highest priority stream.
1340    ///
1341    /// IMPORTANT: this stream locks the stream map mutexes of each `CircHop`!
1342    /// To avoid contention, never create more than one [`Circuit::ready_streams_iterator`]
1343    /// stream at a time!
1344    ///
1345    /// This is cancellation-safe.
1346    pub(super) fn ready_streams_iterator(
1347        &self,
1348        exclude: Option<HopNum>,
1349    ) -> impl Stream<Item = Result<CircuitCmd>> {
1350        self.hops.ready_streams_iterator(exclude)
1351    }
1352
1353    /// Return the congestion signals for this reactor. This is used by congestion control module.
1354    ///
1355    /// Note: This is only async because we need a Context to check the sink for readiness.
1356    pub(super) async fn congestion_signals(&mut self) -> CongestionSignals {
1357        futures::future::poll_fn(|cx| -> Poll<CongestionSignals> {
1358            Poll::Ready(CongestionSignals::new(
1359                self.chan_sender.poll_ready_unpin_bool(cx).unwrap_or(false),
1360                self.chan_sender.n_queued(),
1361            ))
1362        })
1363        .await
1364    }
1365
1366    /// Return a reference to the hop corresponding to `hopnum`, if there is one.
1367    pub(super) fn hop(&self, hopnum: HopNum) -> Option<&CircHop> {
1368        self.hops.hop(hopnum)
1369    }
1370
1371    /// Return a mutable reference to the hop corresponding to `hopnum`, if there is one.
1372    pub(super) fn hop_mut(&mut self, hopnum: HopNum) -> Option<&mut CircHop> {
1373        self.hops.get_mut(hopnum)
1374    }
1375
1376    /// Begin a stream with the provided hop in this circuit.
1377    // TODO: see if there's a way that we can clean this up
1378    #[allow(clippy::too_many_arguments)]
1379    pub(super) fn begin_stream(
1380        &mut self,
1381        hop_num: HopNum,
1382        message: AnyRelayMsg,
1383        sender: StreamQueueSender,
1384        rx: StreamMpscReceiver<AnyRelayMsg>,
1385        rate_limit_notifier: watch::Sender<StreamRateLimit>,
1386        drain_rate_requester: NotifySender<DrainRateRequest>,
1387        cmd_checker: AnyCmdChecker,
1388    ) -> StdResult<Result<(SendRelayCell, StreamId)>, Bug> {
1389        let Some(hop) = self.hop_mut(hop_num) else {
1390            return Err(internal!(
1391                "{}: Attempting to send a BEGIN cell to an unknown hop {hop_num:?}",
1392                self.unique_id,
1393            ));
1394        };
1395
1396        Ok(hop.begin_stream(
1397            message,
1398            sender,
1399            rx,
1400            rate_limit_notifier,
1401            drain_rate_requester,
1402            cmd_checker,
1403        ))
1404    }
1405
1406    /// Close the specified stream
1407    pub(super) async fn close_stream(
1408        &mut self,
1409        hop_num: HopNum,
1410        sid: StreamId,
1411        behav: CloseStreamBehavior,
1412        reason: streammap::TerminateReason,
1413    ) -> Result<()> {
1414        if let Some(hop) = self.hop_mut(hop_num) {
1415            let res = hop.close_stream(sid, behav, reason)?;
1416            if let Some(cell) = res {
1417                self.send_relay_cell(cell).await?;
1418            }
1419        }
1420        Ok(())
1421    }
1422
1423    /// Returns true if there are any streams on this circuit
1424    ///
1425    /// Important: this function locks the stream map of its each of the [`CircHop`]s
1426    /// in this circuit, so it must **not** be called from any function where the
1427    /// stream map lock is held (such as [`ready_streams_iterator`](Self::ready_streams_iterator).
1428    pub(super) fn has_streams(&self) -> bool {
1429        self.hops.has_streams()
1430    }
1431
1432    /// The number of hops in this circuit.
1433    pub(super) fn num_hops(&self) -> u8 {
1434        // `Circuit::add_hop` checks to make sure that we never have more than `u8::MAX` hops,
1435        // so `self.hops.len()` should be safe to cast to a `u8`.
1436        // If that assumption is violated,
1437        // we choose to panic rather than silently use the wrong hop due to an `as` cast.
1438        self.hops
1439            .len()
1440            .try_into()
1441            .expect("`hops.len()` has more than `u8::MAX` hops")
1442    }
1443
1444    /// Check whether this circuit has any hops.
1445    pub(super) fn has_hops(&self) -> bool {
1446        !self.hops.is_empty()
1447    }
1448
1449    /// Get the `HopNum` of the last hop, if this circuit is non-empty.
1450    ///
1451    /// Returns `None` if the circuit has no hops.
1452    pub(super) fn last_hop_num(&self) -> Option<HopNum> {
1453        let num_hops = self.num_hops();
1454        if num_hops == 0 {
1455            // asked for the last hop, but there are no hops
1456            return None;
1457        }
1458        Some(HopNum::from(num_hops - 1))
1459    }
1460
1461    /// Get the path of the circuit.
1462    ///
1463    /// **Warning:** Do not call while already holding the [`Self::mutable`] lock.
1464    pub(super) fn path(&self) -> Arc<path::Path> {
1465        self.mutable.path()
1466    }
1467
1468    /// Return a ClockSkew declaring how much clock skew the other side of this channel
1469    /// claimed that we had when we negotiated the connection.
1470    pub(super) fn clock_skew(&self) -> ClockSkew {
1471        self.channel.clock_skew()
1472    }
1473
1474    /// Does congestion control use stream SENDMEs for the given `hop`?
1475    ///
1476    /// Returns `None` if `hop` doesn't exist.
1477    pub(super) fn uses_stream_sendme(&self, hop: HopNum) -> Option<bool> {
1478        let hop = self.hop(hop)?;
1479        Some(hop.ccontrol().uses_stream_sendme())
1480    }
1481
1482    /// Returns whether this is a conflux circuit that is not linked yet.
1483    pub(super) fn is_conflux_pending(&self) -> bool {
1484        let Some(status) = self.conflux_status() else {
1485            return false;
1486        };
1487
1488        status != ConfluxStatus::Linked
1489    }
1490
1491    /// Returns the conflux status of this circuit.
1492    ///
1493    /// Returns `None` if this is not a conflux circuit.
1494    pub(super) fn conflux_status(&self) -> Option<ConfluxStatus> {
1495        cfg_if::cfg_if! {
1496            if #[cfg(feature = "conflux")] {
1497                self.conflux_handler
1498                    .as_ref()
1499                    .map(|handler| handler.status())
1500            } else {
1501                None
1502            }
1503        }
1504    }
1505
1506    /// Returns initial RTT on this leg, measured in the conflux handshake.
1507    #[cfg(feature = "conflux")]
1508    pub(super) fn init_rtt(&self) -> Option<Duration> {
1509        self.conflux_handler
1510            .as_ref()
1511            .map(|handler| handler.init_rtt())?
1512    }
1513}
1514
1515/// The conflux status of a conflux [`Circuit`].
1516#[derive(Copy, Clone, Debug, PartialEq, Eq)]
1517pub(super) enum ConfluxStatus {
1518    /// Circuit has not begun the conflux handshake yet.
1519    Unlinked,
1520    /// Conflux handshake is in progress.
1521    Pending,
1522    /// A linked conflux circuit.
1523    Linked,
1524}
1525
1526/// Return the stream ID of `msg`, if it has one.
1527///
1528/// Returns `Ok(None)` if `msg` is a meta cell.
1529fn msg_streamid(msg: &UnparsedRelayMsg) -> Result<Option<StreamId>> {
1530    let cmd = msg.cmd();
1531    let streamid = msg.stream_id();
1532    if !cmd.accepts_streamid_val(streamid) {
1533        return Err(Error::CircProto(format!(
1534            "Invalid stream ID {} for relay command {}",
1535            sv(StreamId::get_or_zero(streamid)),
1536            msg.cmd()
1537        )));
1538    }
1539
1540    Ok(streamid)
1541}
1542
1543impl Drop for Circuit {
1544    fn drop(&mut self) {
1545        let _ = self.channel.close_circuit(self.channel_id);
1546    }
1547}
1548
1549/// Return the client circuit-creation extensions that we should use in order to negotiate
1550/// a given set of circuit hop parameters.
1551#[allow(clippy::unnecessary_wraps)]
1552pub(super) fn circ_extensions_from_settings(params: &HopSettings) -> Result<Vec<CircRequestExt>> {
1553    // allow 'unused_mut' because of the combinations of `cfg` conditions below
1554    #[allow(unused_mut)]
1555    let mut client_extensions = Vec::new();
1556
1557    #[allow(unused, unused_mut)]
1558    let mut cc_extension_set = false;
1559
1560    if params.ccontrol.is_enabled() {
1561        cfg_if::cfg_if! {
1562            if #[cfg(feature = "flowctl-cc")] {
1563                // TODO(arti#88): We have an `if false` in `exit_circparams_from_netparams`
1564                // which should prevent the above `is_enabled()` from ever being true,
1565                // even with the "flowctl-cc" feature enabled:
1566                // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/2932#note_3191196
1567                // The panic here is so that CI tests will hopefully catch if congestion
1568                // control is unexpectedly enabled.
1569                // We should remove this panic once xon/xoff flow is supported.
1570                #[cfg(not(test))]
1571                panic!("Congestion control is enabled on this circuit, but we don't yet support congestion control");
1572
1573                #[allow(unreachable_code)]
1574                client_extensions.push(CircRequestExt::CcRequest(CcRequest::default()));
1575                cc_extension_set = true;
1576            } else {
1577                return Err(
1578                    tor_error::internal!(
1579                        "Congestion control is enabled on this circuit, but 'flowctl-cc' feature is not enabled"
1580                    )
1581                    .into()
1582                );
1583            }
1584        }
1585    }
1586
1587    // See whether we need to send a list of required protocol capabilities.
1588    // These aren't "negotiated" per se; they're simply demanded.
1589    // The relay will refuse the circuit if it doesn't support all of them,
1590    // and if any of them isn't supported in the SubprotocolRequest extension.
1591    //
1592    // (In other words, don't add capabilities here just because you want the
1593    // relay to have them! They must be explicitly listed as supported for use
1594    // with this extension. For the current list, see
1595    // https://spec.torproject.org/tor-spec/create-created-cells.html#subproto-request)
1596    //
1597    #[allow(unused_mut)]
1598    let mut required_protocol_capabilities: Vec<tor_protover::NamedSubver> = Vec::new();
1599
1600    #[cfg(feature = "counter-galois-onion")]
1601    if matches!(params.relay_crypt_protocol(), RelayCryptLayerProtocol::Cgo) {
1602        if !cc_extension_set {
1603            return Err(tor_error::internal!("Tried to negotiate CGO without CC.").into());
1604        }
1605        required_protocol_capabilities.push(tor_protover::named::RELAY_CRYPT_CGO);
1606    }
1607
1608    if !required_protocol_capabilities.is_empty() {
1609        client_extensions.push(CircRequestExt::SubprotocolRequest(
1610            required_protocol_capabilities.into_iter().collect(),
1611        ));
1612    }
1613
1614    Ok(client_extensions)
1615}