tor_proto/tunnel/reactor/
circuit.rs

1//! Module exposing types for representing circuits in the tunnel reactor.
2
3pub(super) mod create;
4pub(super) mod extender;
5
6use crate::channel::{Channel, ChannelSender};
7use crate::congestion::sendme;
8use crate::congestion::{CongestionControl, CongestionSignals};
9use crate::crypto::binding::CircuitBinding;
10use crate::crypto::cell::{
11    HopNum, InboundClientCrypt, InboundClientLayer, OutboundClientCrypt, OutboundClientLayer,
12    RelayCellBody,
13};
14use crate::crypto::handshake::fast::CreateFastClient;
15use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey};
16use crate::crypto::handshake::ntor_v3::{NtorV3Client, NtorV3PublicKey};
17use crate::crypto::handshake::{ClientHandshake, KeyGenerator};
18use crate::memquota::{CircuitAccount, SpecificAccount as _, StreamAccount};
19use crate::stream::{AnyCmdChecker, StreamSendFlowControl, StreamStatus};
20use crate::tunnel::circuit::celltypes::{ClientCircChanMsg, CreateResponse};
21use crate::tunnel::circuit::handshake::{BoxedClientLayer, HandshakeRole};
22use crate::tunnel::circuit::path;
23use crate::tunnel::circuit::unique_id::UniqId;
24use crate::tunnel::circuit::{
25    CircParameters, CircuitRxReceiver, MutableState, StreamMpscReceiver, StreamMpscSender,
26};
27use crate::tunnel::handshake::RelayCryptLayerProtocol;
28use crate::tunnel::reactor::MetaCellDisposition;
29use crate::tunnel::streammap::{
30    self, EndSentStreamEnt, OpenStreamEnt, ShouldSendEnd, StreamEntMut,
31};
32use crate::util::err::ReactorError;
33use crate::util::sometimes_unbounded_sink::SometimesUnboundedSink;
34use crate::util::SinkExt as _;
35use crate::{ClockSkew, Error, Result};
36
37use tor_async_utils::{SinkTrySend as _, SinkTrySendError as _};
38use tor_cell::chancell::msg::{AnyChanMsg, HandshakeType, Relay};
39use tor_cell::chancell::{AnyChanCell, ChanCmd, CircId};
40use tor_cell::chancell::{BoxedCellBody, ChanMsg};
41use tor_cell::relaycell::extend::{CcRequest, CircRequestExt};
42use tor_cell::relaycell::msg::{AnyRelayMsg, End, Sendme, SendmeTag, Truncated};
43use tor_cell::relaycell::{
44    AnyRelayMsgOuter, RelayCellDecoder, RelayCellDecoderResult, RelayCellFormat, RelayCmd,
45    StreamId, UnparsedRelayMsg,
46};
47use tor_error::{internal, Bug};
48use tor_linkspec::RelayIds;
49use tor_llcrypto::pk;
50use tor_memquota::mq_queue::{ChannelSpec as _, MpscSpec};
51
52use futures::stream::FuturesUnordered;
53use futures::{SinkExt as _, Stream};
54use oneshot_fused_workaround as oneshot;
55use safelog::sensitive as sv;
56use tracing::{debug, trace, warn};
57
58#[cfg(feature = "conflux")]
59use super::conflux::ConfluxMsgHandler;
60use super::{
61    CellHandlers, CircuitHandshake, CloseStreamBehavior, LegId, ReactorResultChannel, SendRelayCell,
62};
63
64use std::borrow::Borrow;
65use std::pin::Pin;
66use std::result::Result as StdResult;
67use std::sync::{Arc, Mutex};
68use std::task::Poll;
69use std::time::{Duration, SystemTime};
70
71use create::{Create2Wrap, CreateFastWrap, CreateHandshakeWrap};
72use extender::HandshakeAuxDataHandler;
73
74#[cfg(feature = "hs-service")]
75use {
76    crate::stream::{DataCmdChecker, IncomingStreamRequest},
77    tor_cell::relaycell::msg::Begin,
78};
79
80#[cfg(feature = "conflux")]
81use {
82    super::conflux::{ConfluxAction, OooRelayMsg},
83    crate::tunnel::reactor::RemoveLegReason,
84};
85
86/// Initial value for outbound flow-control window on streams.
87pub(super) const SEND_WINDOW_INIT: u16 = 500;
88/// Initial value for inbound flow-control window on streams.
89pub(crate) const RECV_WINDOW_INIT: u16 = 500;
90/// Size of the buffer used between the reactor and a `StreamReader`.
91///
92/// FIXME(eta): We pick 2× the receive window, which is very conservative (we arguably shouldn't
93///             get sent more than the receive window anyway!). We might do due to things that
94///             don't count towards the window though.
95pub(crate) const STREAM_READER_BUFFER: usize = (2 * RECV_WINDOW_INIT) as usize;
96
97/// Represents the reactor's view of a single hop.
98pub(super) struct CircHop {
99    /// Reactor unique ID. Used for logging.
100    unique_id: UniqId,
101    /// Hop number in the path.
102    hop_num: HopNum,
103    /// Map from stream IDs to streams.
104    ///
105    /// We store this with the reactor instead of the circuit, since the
106    /// reactor needs it for every incoming cell on a stream, whereas
107    /// the circuit only needs it when allocating new streams.
108    ///
109    /// NOTE: this is behind a mutex because the reactor polls the `StreamMap`s
110    /// of all hops concurrently, in a [`FuturesUnordered`]. Without the mutex,
111    /// this wouldn't be possible, because it would mean holding multiple
112    /// mutable references to `self` (the reactor). Note, however,
113    /// that there should never be any contention on this mutex:
114    /// we never create more than one [`Circuit::ready_streams_iterator`] stream
115    /// at a time, and we never clone/lock the hop's `StreamMap` outside of
116    /// [`Circuit::ready_streams_iterator`].
117    ///
118    // TODO: encapsulate the Vec<CircHop> into a separate CircHops structure,
119    // and hide its internals from the Reactor. The CircHops implementation
120    // should enforce the invariant described in the note above.
121    map: Arc<Mutex<streammap::StreamMap>>,
122    /// Congestion control object.
123    ///
124    /// This object is also in charge of handling circuit level SENDME logic for this hop.
125    ccontrol: CongestionControl,
126    /// Decodes relay cells received from this hop.
127    inbound: RelayCellDecoder,
128    /// Format to use for relay cells.
129    //
130    // When we have packed/fragmented cells, this may be replaced by a RelayCellEncoder.
131    relay_format: RelayCellFormat,
132}
133
134/// A circuit "leg" from a tunnel.
135///
136/// Regular (non-multipath) circuits have a single leg.
137/// Conflux (multipath) circuits have `N` (usually, `N = 2`).
138pub(crate) struct Circuit {
139    /// The channel this circuit is attached to.
140    channel: Arc<Channel>,
141    /// Sender object used to actually send cells.
142    ///
143    /// NOTE: Control messages could potentially add unboundedly to this, although that's
144    ///       not likely to happen (and isn't triggereable from the network, either).
145    pub(super) chan_sender: SometimesUnboundedSink<AnyChanCell, ChannelSender>,
146    /// Input stream, on which we receive ChanMsg objects from this circuit's
147    /// channel.
148    ///
149    // TODO: could use a SPSC channel here instead.
150    pub(super) input: CircuitRxReceiver,
151    /// The cryptographic state for this circuit for inbound cells.
152    /// This object is divided into multiple layers, each of which is
153    /// shared with one hop of the circuit.
154    crypto_in: InboundClientCrypt,
155    /// The cryptographic state for this circuit for outbound cells.
156    crypto_out: OutboundClientCrypt,
157    /// List of hops state objects used by the reactor
158    hops: Vec<CircHop>,
159    /// Mutable information about this circuit,
160    /// shared with the reactor's `ConfluxSet`.
161    mutable: Arc<MutableState>,
162    /// This circuit's identifier on the upstream channel.
163    channel_id: CircId,
164    /// An identifier for logging about this reactor's circuit.
165    unique_id: UniqId,
166    /// A handler for conflux cells.
167    ///
168    /// Set once the conflux handshake is initiated by the reactor
169    /// using [`Reactor::handle_link_circuits`](super::Reactor::handle_link_circuits).
170    #[cfg(feature = "conflux")]
171    conflux_handler: Option<ConfluxMsgHandler>,
172    /// Memory quota account
173    #[allow(dead_code)] // Partly here to keep it alive as long as the circuit
174    memquota: CircuitAccount,
175}
176
177/// A command to run in response to a circuit event.
178///
179/// Unlike `RunOnceCmdInner`, doesn't know anything about `LegId`s.
180/// The user of the `CircuitCmd`s is supposed to know the `LegId`
181/// of the circuit the `CircuitCmd` came from.
182///
183/// This type gets mapped to a `RunOnceCmdInner` in the circuit reactor.
184#[derive(Debug)]
185pub(super) enum CircuitCmd {
186    /// Send a RELAY cell on the circuit leg this command originates from.
187    Send(SendRelayCell),
188    /// Handle a SENDME message received on the circuit leg this command originates from.
189    HandleSendMe {
190        /// The hop number.
191        hop: HopNum,
192        /// The SENDME message to handle.
193        sendme: Sendme,
194    },
195    /// Close the specified stream on the circuit leg this command originates from.
196    CloseStream {
197        /// The hop number.
198        hop: HopNum,
199        /// The ID of the stream to close.
200        sid: StreamId,
201        /// The stream-closing behavior.
202        behav: CloseStreamBehavior,
203        /// The reason for closing the stream.
204        reason: streammap::TerminateReason,
205    },
206    /// Remove this circuit from the conflux set.
207    ///
208    /// Returned by `ConfluxMsgHandler::handle_conflux_msg` for invalid messages
209    /// (originating from wrong hop), and for messages that are rejected
210    /// by its inner `AbstractMsgHandler`.
211    #[cfg(feature = "conflux")]
212    ConfluxRemove(RemoveLegReason),
213    /// This circuit has completed the conflux handshake,
214    /// and wants to send the specified cell.
215    ///
216    /// Returned by an `AbstractMsgHandler` to signal to the reactor that
217    /// the conflux handshake is complete.
218    #[cfg(feature = "conflux")]
219    ConfluxHandshakeComplete(SendRelayCell),
220    /// Perform a clean shutdown on this circuit.
221    CleanShutdown,
222    /// Enqueue an out-of-order cell in the reactor.
223    #[cfg(feature = "conflux")]
224    Enqueue(OooRelayMsg),
225}
226
227/// Return a `CircProto` error for the specified unsupported cell.
228///
229/// This error will shut down the reactor.
230///
231/// Note: this is a macro to simplify usage (this way the caller doesn't
232/// need to .map() the result to the appropriate type)
233macro_rules! unsupported_client_cell {
234    ($msg:expr) => {{
235        unsupported_client_cell!(@ $msg, "")
236    }};
237
238    ($msg:expr, $hopnum:expr) => {{
239        let hop: HopNum = $hopnum;
240        let hop_display = format!(" from hop {}", hop.display());
241        unsupported_client_cell!(@ $msg, hop_display)
242    }};
243
244    (@ $msg:expr, $hopnum_display:expr) => {
245        Err(crate::Error::CircProto(format!(
246            "Unexpected {} cell{} on client circuit",
247            $msg.cmd(),
248            $hopnum_display,
249        )))
250    };
251}
252
253pub(super) use unsupported_client_cell;
254
255impl Circuit {
256    /// Create a new non-multipath circuit.
257    pub(super) fn new(
258        channel: Arc<Channel>,
259        channel_id: CircId,
260        unique_id: UniqId,
261        input: CircuitRxReceiver,
262        memquota: CircuitAccount,
263        mutable: Arc<MutableState>,
264    ) -> Self {
265        let chan_sender = SometimesUnboundedSink::new(channel.sender());
266
267        let crypto_out = OutboundClientCrypt::new();
268        Circuit {
269            channel,
270            chan_sender,
271            input,
272            crypto_in: InboundClientCrypt::new(),
273            hops: vec![],
274            unique_id,
275            channel_id,
276            crypto_out,
277            mutable,
278            #[cfg(feature = "conflux")]
279            conflux_handler: None,
280            memquota,
281        }
282    }
283
284    /// Return the process-unique identifier of this circuit.
285    pub(super) fn unique_id(&self) -> UniqId {
286        self.unique_id
287    }
288
289    /// Return the shared mutable state of this circuit.
290    pub(super) fn mutable(&self) -> &Arc<MutableState> {
291        &self.mutable
292    }
293
294    /// Install a [`ConfluxMsgHandler`] on this circuit,
295    ///
296    /// Once this is called, the circuit will be able to handle conflux cells.
297    #[cfg(feature = "conflux")]
298    pub(super) fn install_conflux_handler(&mut self, conflux_handler: ConfluxMsgHandler) {
299        self.conflux_handler = Some(conflux_handler);
300    }
301
302    /// Send a LINK cell to the specified hop.
303    ///
304    /// This must be called *after* a [`ConfluxMsgHandler`] is installed
305    /// on the circuit with [`install_conflux_handler`](Self::install_conflux_handler).
306    #[cfg(feature = "conflux")]
307    pub(super) async fn begin_conflux_link(
308        &mut self,
309        hop: HopNum,
310        cell: AnyRelayMsgOuter,
311        runtime: &tor_rtcompat::DynTimeProvider,
312    ) -> Result<()> {
313        use tor_rtcompat::SleepProvider as _;
314
315        if self.conflux_handler.is_none() {
316            return Err(internal!(
317                "tried to send LINK cell before installing a ConfluxMsgHandler?!"
318            )
319            .into());
320        }
321
322        let cell = SendRelayCell {
323            hop,
324            early: false,
325            cell,
326        };
327        self.send_relay_cell(cell).await?;
328
329        let Some(conflux_handler) = self.conflux_handler.as_mut() else {
330            return Err(internal!("ConfluxMsgHandler disappeared?!").into());
331        };
332
333        Ok(conflux_handler.note_link_sent(runtime.wallclock())?)
334    }
335
336    /// Get the wallclock time when the handshake on this circuit is supposed to time out.
337    ///
338    /// Returns `None` if this handler hasn't started the handshake yet.
339    pub(super) fn conflux_hs_timeout(&self) -> Option<SystemTime> {
340        cfg_if::cfg_if! {
341            if #[cfg(feature = "conflux")] {
342                self.conflux_handler.as_ref().map(|handler| handler.handshake_timeout())?
343            } else {
344                None
345            }
346        }
347    }
348
349    /// Handle a [`CtrlMsg::AddFakeHop`](super::CtrlMsg::AddFakeHop) message.
350    #[cfg(test)]
351    pub(super) fn handle_add_fake_hop(
352        &mut self,
353        format: RelayCellFormat,
354        fwd_lasthop: bool,
355        rev_lasthop: bool,
356        params: &CircParameters,
357        done: ReactorResultChannel<()>,
358    ) {
359        use crate::tunnel::circuit::test::DummyCrypto;
360
361        let dummy_peer_id = tor_linkspec::OwnedChanTarget::builder()
362            .ed_identity([4; 32].into())
363            .rsa_identity([5; 20].into())
364            .build()
365            .expect("Could not construct fake hop");
366
367        let fwd = Box::new(DummyCrypto::new(fwd_lasthop));
368        let rev = Box::new(DummyCrypto::new(rev_lasthop));
369        let binding = None;
370        self.add_hop(
371            format,
372            path::HopDetail::Relay(dummy_peer_id),
373            fwd,
374            rev,
375            binding,
376            params,
377        )
378        .expect("could not add hop to circuit");
379        let _ = done.send(Ok(()));
380    }
381
382    /// Encode `msg` and encrypt it, returning the resulting cell
383    /// and tag that should be expected for an authenticated SENDME sent
384    /// in response to that cell.
385    fn encode_relay_cell(
386        crypto_out: &mut OutboundClientCrypt,
387        relay_format: RelayCellFormat,
388        hop: HopNum,
389        early: bool,
390        msg: AnyRelayMsgOuter,
391    ) -> Result<(AnyChanMsg, SendmeTag)> {
392        let mut body: RelayCellBody = msg
393            .encode(relay_format, &mut rand::rng())
394            .map_err(|e| Error::from_cell_enc(e, "relay cell body"))?
395            .into();
396        let cmd = if early {
397            ChanCmd::RELAY_EARLY
398        } else {
399            ChanCmd::RELAY
400        };
401        let tag = crypto_out.encrypt(cmd, &mut body, hop)?;
402        let msg = Relay::from(BoxedCellBody::from(body));
403        let msg = if early {
404            AnyChanMsg::RelayEarly(msg.into())
405        } else {
406            AnyChanMsg::Relay(msg)
407        };
408
409        Ok((msg, tag))
410    }
411
412    /// Encode `msg`, encrypt it, and send it to the 'hop'th hop.
413    ///
414    /// If there is insufficient outgoing *circuit-level* or *stream-level*
415    /// SENDME window, an error is returned instead.
416    ///
417    /// Does not check whether the cell is well-formed or reasonable.
418    pub(super) async fn send_relay_cell(&mut self, msg: SendRelayCell) -> Result<()> {
419        if self.is_conflux_pending() {
420            // TODO(conflux): is this right? Should we ensure all the legs are linked?
421            return Err(internal!("tried to send cell on unlinked circuit").into());
422        }
423
424        let SendRelayCell {
425            hop,
426            early,
427            cell: msg,
428        } = msg;
429
430        trace!("{}: sending relay cell: {:?}", self.unique_id, msg);
431
432        let c_t_w = sendme::cmd_counts_towards_windows(msg.cmd());
433        let stream_id = msg.stream_id();
434        let hop_num = Into::<usize>::into(hop);
435        let circhop = &mut self.hops.get_mut(hop_num).ok_or(Error::NoSuchHop)?;
436
437        // We need to apply stream-level flow control *before* encoding the message.
438        if c_t_w {
439            if let Some(stream_id) = stream_id {
440                let mut hop_map = circhop.map.lock().expect("lock poisoned");
441                let Some(StreamEntMut::Open(ent)) = hop_map.get_mut(stream_id) else {
442                    warn!(
443                        "{}: sending a relay cell for non-existent or non-open stream with ID {}!",
444                        self.unique_id, stream_id
445                    );
446                    return Err(Error::CircProto(format!(
447                        "tried to send a relay cell on non-open stream {}",
448                        sv(stream_id),
449                    )));
450                };
451                ent.take_capacity_to_send(msg.msg())?;
452            }
453        }
454
455        // Save the RelayCmd of the message before it gets consumed below.
456        // We need this to tell our ConfluxMsgHandler about the cell we've just sent,
457        // so that it can update its counters.
458        let relay_cmd = msg.cmd();
459
460        // NOTE(eta): Now that we've encrypted the cell, we *must* either send it or abort
461        //            the whole circuit (e.g. by returning an error).
462        let (msg, tag) =
463            Self::encode_relay_cell(&mut self.crypto_out, circhop.relay_format, hop, early, msg)?;
464        // The cell counted for congestion control, inform our algorithm of such and pass down the
465        // tag for authenticated SENDMEs.
466        if c_t_w {
467            circhop.ccontrol.note_data_sent(&tag)?;
468        }
469
470        let cell = AnyChanCell::new(Some(self.channel_id), msg);
471        Pin::new(&mut self.chan_sender).send_unbounded(cell).await?;
472
473        #[cfg(feature = "conflux")]
474        if let Some(conflux) = self.conflux_handler.as_mut() {
475            conflux.note_cell_sent(relay_cmd);
476        }
477
478        Ok(())
479    }
480
481    /// Helper: process a cell on a channel.  Most cells get ignored
482    /// or rejected; a few get delivered to circuits.
483    ///
484    /// Return `CellStatus::CleanShutdown` if we should exit.
485    ///
486    // TODO(conflux): returning `Vec<CircuitCmd>` means we're unnecessarily
487    // allocating a `Vec` here. Generally, the number of commands is going to be small
488    // (usually 1, but > 1 when we start supporting packed cells).
489    //
490    // We should consider using smallvec instead. It might also be a good idea to have a
491    // separate higher-level type splitting this out into Single(CircuitCmd),
492    // and Multiple(SmallVec<[CircuitCmd; <capacity>]>).
493    pub(super) fn handle_cell(
494        &mut self,
495        handlers: &mut CellHandlers,
496        leg: LegId,
497        cell: ClientCircChanMsg,
498    ) -> Result<Vec<CircuitCmd>> {
499        trace!("{}: handling cell: {:?}", self.unique_id, cell);
500        use ClientCircChanMsg::*;
501        match cell {
502            Relay(r) => self.handle_relay_cell(handlers, leg, r),
503            Destroy(d) => {
504                let reason = d.reason();
505                debug!(
506                    "{}: Received DESTROY cell. Reason: {} [{}]",
507                    self.unique_id,
508                    reason.human_str(),
509                    reason
510                );
511
512                self.handle_destroy_cell().map(|c| vec![c])
513            }
514        }
515    }
516
517    /// Decode `cell`, returning its corresponding hop number, tag,
518    /// and decoded body.
519    fn decode_relay_cell(
520        &mut self,
521        cell: Relay,
522    ) -> Result<(HopNum, SendmeTag, RelayCellDecoderResult)> {
523        // This is always RELAY, not RELAY_EARLY, so long as this code is client-only.
524        let cmd = cell.cmd();
525        let mut body = cell.into_relay_body().into();
526
527        // Decrypt the cell. If it's recognized, then find the
528        // corresponding hop.
529        let (hopnum, tag) = self.crypto_in.decrypt(cmd, &mut body)?;
530
531        // Decode the cell.
532        let decode_res = self
533            .hop_mut(hopnum)
534            .ok_or_else(|| {
535                Error::from(internal!(
536                    "Trying to decode cell from nonexistent hop {:?}",
537                    hopnum
538                ))
539            })?
540            .inbound
541            .decode(body.into())
542            .map_err(|e| Error::from_bytes_err(e, "relay cell"))?;
543
544        Ok((hopnum, tag, decode_res))
545    }
546
547    /// React to a Relay or RelayEarly cell.
548    fn handle_relay_cell(
549        &mut self,
550        handlers: &mut CellHandlers,
551        leg: LegId,
552        cell: Relay,
553    ) -> Result<Vec<CircuitCmd>> {
554        let (hopnum, tag, decode_res) = self.decode_relay_cell(cell)?;
555
556        let c_t_w = decode_res.cmds().any(sendme::cmd_counts_towards_windows);
557
558        // Decrement the circuit sendme windows, and see if we need to
559        // send a sendme cell.
560        let send_circ_sendme = if c_t_w {
561            self.hop_mut(hopnum)
562                .ok_or_else(|| Error::CircProto("Sendme from nonexistent hop".into()))?
563                .ccontrol
564                .note_data_received()?
565        } else {
566            false
567        };
568
569        let mut circ_cmds = vec![];
570        // If we do need to send a circuit-level SENDME cell, do so.
571        if send_circ_sendme {
572            // This always sends a V1 (tagged) sendme cell, and thereby assumes
573            // that SendmeEmitMinVersion is no more than 1.  If the authorities
574            // every increase that parameter to a higher number, this will
575            // become incorrect.  (Higher numbers are not currently defined.)
576            let sendme = Sendme::from(tag);
577            let cell = AnyRelayMsgOuter::new(None, sendme.into());
578            circ_cmds.push(CircuitCmd::Send(SendRelayCell {
579                hop: hopnum,
580                early: false,
581                cell,
582            }));
583
584            // Inform congestion control of the SENDME we are sending. This is a circuit level one.
585            self.hop_mut(hopnum)
586                .ok_or_else(|| {
587                    Error::from(internal!(
588                        "Trying to send SENDME to nonexistent hop {:?}",
589                        hopnum
590                    ))
591                })?
592                .ccontrol
593                .note_sendme_sent()?;
594        }
595
596        let (mut msgs, incomplete) = decode_res.into_parts();
597        while let Some(msg) = msgs.next() {
598            let msg_status = self.handle_relay_msg(handlers, hopnum, leg, c_t_w, msg)?;
599
600            match msg_status {
601                None => continue,
602                Some(msg @ CircuitCmd::CleanShutdown) => {
603                    for m in msgs {
604                        debug!(
605                            "{id}: Ignoring relay msg received after triggering shutdown: {m:?}",
606                            id = self.unique_id
607                        );
608                    }
609                    if let Some(incomplete) = incomplete {
610                        debug!(
611                            "{id}: Ignoring partial relay msg received after triggering shutdown: {:?}",
612                            incomplete,
613                            id=self.unique_id,
614                        );
615                    }
616                    circ_cmds.push(msg);
617                    return Ok(circ_cmds);
618                }
619                Some(msg) => {
620                    circ_cmds.push(msg);
621                }
622            }
623        }
624
625        Ok(circ_cmds)
626    }
627
628    /// Handle a single incoming relay message.
629    fn handle_relay_msg(
630        &mut self,
631        handlers: &mut CellHandlers,
632        hopnum: HopNum,
633        leg: LegId,
634        cell_counts_toward_windows: bool,
635        msg: UnparsedRelayMsg,
636    ) -> Result<Option<CircuitCmd>> {
637        // If this msg wants/refuses to have a Stream ID, does it
638        // have/not have one?
639        let streamid = msg_streamid(&msg)?;
640
641        // If this doesn't have a StreamId, it's a meta cell,
642        // not meant for a particular stream.
643        let Some(streamid) = streamid else {
644            return self.handle_meta_cell(handlers, hopnum, msg);
645        };
646
647        #[cfg(feature = "conflux")]
648        let msg = if let Some(conflux) = self.conflux_handler.as_mut() {
649            match conflux.action_for_msg(hopnum, cell_counts_toward_windows, streamid, msg)? {
650                ConfluxAction::Deliver(msg) => {
651                    // The message either doesn't count towards the sequence numbers
652                    // or is already well-ordered, so we're ready to handle it.
653
654                    // It's possible that some of our buffered messages are now ready to be
655                    // handled. We don't check that here, however, because that's handled
656                    // by the reactor main loop.
657                    msg
658                }
659                ConfluxAction::Enqueue(msg) => {
660                    // Tell the reactor to enqueue this msg
661                    return Ok(Some(CircuitCmd::Enqueue(msg)));
662                }
663            }
664        } else {
665            // If we don't have a conflux_handler, it means this circuit is not part of
666            // a conflux tunnel, so we can just process the message.
667            msg
668        };
669
670        self.handle_in_order_relay_msg(
671            handlers,
672            hopnum,
673            leg,
674            cell_counts_toward_windows,
675            streamid,
676            msg,
677        )
678    }
679
680    /// Handle a single incoming relay message that is known to be in order.
681    pub(super) fn handle_in_order_relay_msg(
682        &mut self,
683        handlers: &mut CellHandlers,
684        hopnum: HopNum,
685        leg: LegId,
686        cell_counts_toward_windows: bool,
687        streamid: StreamId,
688        msg: UnparsedRelayMsg,
689    ) -> Result<Option<CircuitCmd>> {
690        #[cfg(feature = "conflux")]
691        if let Some(conflux) = self.conflux_handler.as_mut() {
692            conflux.inc_last_seq_delivered(&msg);
693        }
694
695        let hop = self
696            .hop_mut(hopnum)
697            .ok_or_else(|| Error::CircProto("Cell from nonexistent hop!".into()))?;
698        let mut hop_map = hop.map.lock().expect("lock poisoned");
699        match hop_map.get_mut(streamid) {
700            Some(StreamEntMut::Open(ent)) => {
701                // Can't have a stream level SENDME when congestion control is enabled.
702                let message_closes_stream =
703                    Self::deliver_msg_to_stream(streamid, ent, cell_counts_toward_windows, msg)?;
704
705                if message_closes_stream {
706                    hop_map.ending_msg_received(streamid)?;
707                }
708            }
709            #[cfg(feature = "hs-service")]
710            Some(StreamEntMut::EndSent(_))
711                if matches!(
712                    msg.cmd(),
713                    RelayCmd::BEGIN | RelayCmd::BEGIN_DIR | RelayCmd::RESOLVE
714                ) =>
715            {
716                // If the other side is sending us a BEGIN but hasn't yet acknowledged our END
717                // message, just remove the old stream from the map and stop waiting for a
718                // response
719                hop_map.ending_msg_received(streamid)?;
720                drop(hop_map);
721                return self.handle_incoming_stream_request(handlers, msg, streamid, hopnum, leg);
722            }
723            Some(StreamEntMut::EndSent(EndSentStreamEnt { half_stream, .. })) => {
724                // We sent an end but maybe the other side hasn't heard.
725
726                match half_stream.handle_msg(msg)? {
727                    StreamStatus::Open => {}
728                    StreamStatus::Closed => {
729                        hop_map.ending_msg_received(streamid)?;
730                    }
731                }
732            }
733            #[cfg(feature = "hs-service")]
734            None if matches!(
735                msg.cmd(),
736                RelayCmd::BEGIN | RelayCmd::BEGIN_DIR | RelayCmd::RESOLVE
737            ) =>
738            {
739                drop(hop_map);
740                return self.handle_incoming_stream_request(handlers, msg, streamid, hopnum, leg);
741            }
742            _ => {
743                // No stream wants this message, or ever did.
744                return Err(Error::CircProto(
745                    "Cell received on nonexistent stream!?".into(),
746                ));
747            }
748        }
749        Ok(None)
750    }
751
752    /// Handle a conflux message coming from the specified hop.
753    ///
754    /// Returns an error if
755    ///
756    ///   * this is not a conflux circuit (i.e. it doesn't have a [`ConfluxMsgHandler`])
757    ///   * this is a client circuit and the conflux message originated an unexpected hop
758    ///   * the cell was sent in violation of the handshake protocol
759    #[cfg(feature = "conflux")]
760    fn handle_conflux_msg(
761        &mut self,
762        hop: HopNum,
763        msg: UnparsedRelayMsg,
764    ) -> Result<Option<CircuitCmd>> {
765        let Some(conflux_handler) = self.conflux_handler.as_mut() else {
766            // If conflux is not enabled, tear down the circuit
767            // (see 4.2.1. Cell Injection Side Channel Mitigations in prop329)
768            //
769            // TODO(conflux): make sure this is properly implemented
770            return Err(Error::CircProto(format!(
771                "Received {} cell from hop {} on non-conflux client circuit?!",
772                msg.cmd(),
773                hop.display(),
774            )));
775        };
776
777        Ok(conflux_handler.handle_conflux_msg(msg, hop))
778    }
779
780    /// For conflux: return the sequence number of the last cell sent on this leg.
781    ///
782    /// Returns an error if this circuit is not part of a conflux set.
783    #[cfg(feature = "conflux")]
784    pub(super) fn last_seq_sent(&self) -> Result<u64> {
785        let handler = self
786            .conflux_handler
787            .as_ref()
788            .ok_or_else(|| internal!("tried to get last_seq_sent of non-conflux circ"))?;
789
790        Ok(handler.last_seq_sent())
791    }
792
793    /// For conflux: return the sequence number of the last cell received on this leg.
794    ///
795    /// Returns an error if this circuit is not part of a conflux set.
796    #[cfg(feature = "conflux")]
797    pub(super) fn last_seq_recv(&self) -> Result<u64> {
798        let handler = self
799            .conflux_handler
800            .as_ref()
801            .ok_or_else(|| internal!("tried to get last_seq_recv of non-conflux circ"))?;
802
803        Ok(handler.last_seq_recv())
804    }
805
806    /// Deliver `msg` to the specified open stream entry `ent`.
807    fn deliver_msg_to_stream(
808        streamid: StreamId,
809        ent: &mut OpenStreamEnt,
810        cell_counts_toward_windows: bool,
811        msg: UnparsedRelayMsg,
812    ) -> Result<bool> {
813        // The stream for this message exists, and is open.
814
815        if msg.cmd() == RelayCmd::SENDME {
816            let _sendme = msg
817                .decode::<Sendme>()
818                .map_err(|e| Error::from_bytes_err(e, "Sendme message on stream"))?
819                .into_msg();
820
821            // We need to handle sendmes here, not in the stream's
822            // recv() method, or else we'd never notice them if the
823            // stream isn't reading.
824            ent.put_for_incoming_sendme()?;
825            return Ok(false);
826        }
827
828        let message_closes_stream = ent.cmd_checker.check_msg(&msg)? == StreamStatus::Closed;
829
830        if let Err(e) = Pin::new(&mut ent.sink).try_send(msg) {
831            if e.is_full() {
832                // If we get here, we either have a logic bug (!), or an attacker
833                // is sending us more cells than we asked for via congestion control.
834                return Err(Error::CircProto(format!(
835                    "Stream sink would block; received too many cells on stream ID {}",
836                    sv(streamid),
837                )));
838            }
839            if e.is_disconnected() && cell_counts_toward_windows {
840                // the other side of the stream has gone away; remember
841                // that we received a cell that we couldn't queue for it.
842                //
843                // Later this value will be recorded in a half-stream.
844                ent.dropped += 1;
845            }
846        }
847
848        Ok(message_closes_stream)
849    }
850
851    /// A helper for handling incoming stream requests.
852    #[cfg(feature = "hs-service")]
853    fn handle_incoming_stream_request(
854        &mut self,
855        handlers: &mut CellHandlers,
856        msg: UnparsedRelayMsg,
857        stream_id: StreamId,
858        hop_num: HopNum,
859        leg: LegId,
860    ) -> Result<Option<CircuitCmd>> {
861        use super::syncview::ClientCircSyncView;
862        use tor_cell::relaycell::msg::EndReason;
863        use tor_error::into_internal;
864        use tor_log_ratelim::log_ratelim;
865
866        use crate::{circuit::CIRCUIT_BUFFER_SIZE, tunnel::reactor::StreamReqInfo};
867
868        // We need to construct this early so that we don't double-borrow &mut self
869
870        let Some(handler) = handlers.incoming_stream_req_handler.as_mut() else {
871            return Err(Error::CircProto(
872                "Cannot handle BEGIN cells on this circuit".into(),
873            ));
874        };
875
876        if hop_num != handler.hop_num {
877            return Err(Error::CircProto(format!(
878                "Expecting incoming streams from {}, but received {} cell from unexpected hop {}",
879                handler.hop_num.display(),
880                msg.cmd(),
881                hop_num.display()
882            )));
883        }
884
885        let message_closes_stream = handler.cmd_checker.check_msg(&msg)? == StreamStatus::Closed;
886
887        // TODO: we've already looked up the `hop` in handle_relay_cell, so we shouldn't
888        // have to look it up again! However, we can't pass the `&mut hop` reference from
889        // `handle_relay_cell` to this function, because that makes Rust angry (we'd be
890        // borrowing self as mutable more than once).
891        //
892        // TODO: we _could_ use self.hops.get_mut(..) instead self.hop_mut(..) inside
893        // handle_relay_cell to work around the problem described above
894        let hop = self
895            .hops
896            .get_mut(Into::<usize>::into(hop_num))
897            .ok_or(Error::CircuitClosed)?;
898
899        if message_closes_stream {
900            hop.map
901                .lock()
902                .expect("lock poisoned")
903                .ending_msg_received(stream_id)?;
904
905            return Ok(None);
906        }
907
908        let begin = msg
909            .decode::<Begin>()
910            .map_err(|e| Error::from_bytes_err(e, "Invalid Begin message"))?
911            .into_msg();
912
913        let req = IncomingStreamRequest::Begin(begin);
914
915        {
916            use crate::stream::IncomingStreamRequestDisposition::*;
917
918            let ctx = crate::stream::IncomingStreamRequestContext { request: &req };
919            // IMPORTANT: ClientCircSyncView::n_open_streams() (called via disposition() below)
920            // accesses the stream map mutexes!
921            //
922            // This means it's very important not to call this function while any of the hop's
923            // stream map mutex is held.
924            let view = ClientCircSyncView::new(&self.hops);
925
926            match handler.filter.as_mut().disposition(&ctx, &view)? {
927                Accept => {}
928                CloseCircuit => return Ok(Some(CircuitCmd::CleanShutdown)),
929                RejectRequest(end) => {
930                    let end_msg = AnyRelayMsgOuter::new(Some(stream_id), end.into());
931                    let cell = SendRelayCell {
932                        hop: hop_num,
933                        early: false,
934                        cell: end_msg,
935                    };
936                    return Ok(Some(CircuitCmd::Send(cell)));
937                }
938            }
939        }
940
941        // TODO: Sadly, we need to look up `&mut hop` yet again,
942        // since we needed to pass `&self.hops` by reference to our filter above. :(
943        let hop = self
944            .hops
945            .get_mut(Into::<usize>::into(hop_num))
946            .ok_or(Error::CircuitClosed)?;
947        let relay_cell_format = hop.relay_format;
948
949        let memquota = StreamAccount::new(&self.memquota)?;
950
951        let (sender, receiver) = MpscSpec::new(STREAM_READER_BUFFER).new_mq(
952            self.chan_sender.as_inner().time_provider().clone(),
953            memquota.as_raw_account(),
954        )?;
955        let (msg_tx, msg_rx) = MpscSpec::new(CIRCUIT_BUFFER_SIZE).new_mq(
956            self.chan_sender.as_inner().time_provider().clone(),
957            memquota.as_raw_account(),
958        )?;
959
960        let cmd_checker = DataCmdChecker::new_connected();
961        hop.map.lock().expect("lock poisoned").add_ent_with_id(
962            sender,
963            msg_rx,
964            hop.build_send_flow_ctrl(),
965            stream_id,
966            cmd_checker,
967        )?;
968
969        let outcome = Pin::new(&mut handler.incoming_sender).try_send(StreamReqInfo {
970            req,
971            stream_id,
972            hop_num,
973            leg,
974            msg_tx,
975            receiver,
976            memquota,
977            relay_cell_format,
978        });
979
980        log_ratelim!("Delivering message to incoming stream handler"; outcome);
981
982        if let Err(e) = outcome {
983            if e.is_full() {
984                // The IncomingStreamRequestHandler's stream is full; it isn't
985                // handling requests fast enough. So instead, we reply with an
986                // END cell.
987                let end_msg = AnyRelayMsgOuter::new(
988                    Some(stream_id),
989                    End::new_with_reason(EndReason::RESOURCELIMIT).into(),
990                );
991
992                let cell = SendRelayCell {
993                    hop: hop_num,
994                    early: false,
995                    cell: end_msg,
996                };
997                return Ok(Some(CircuitCmd::Send(cell)));
998            } else if e.is_disconnected() {
999                // The IncomingStreamRequestHandler's stream has been dropped.
1000                // In the Tor protocol as it stands, this always means that the
1001                // circuit itself is out-of-use and should be closed. (See notes
1002                // on `allow_stream_requests.`)
1003                //
1004                // Note that we will _not_ reach this point immediately after
1005                // the IncomingStreamRequestHandler is dropped; we won't hit it
1006                // until we next get an incoming request.  Thus, if we do later
1007                // want to add early detection for a dropped
1008                // IncomingStreamRequestHandler, we need to do it elsewhere, in
1009                // a different way.
1010                debug!(
1011                    "{}: Incoming stream request receiver dropped",
1012                    self.unique_id
1013                );
1014                // This will _cause_ the circuit to get closed.
1015                return Err(Error::CircuitClosed);
1016            } else {
1017                // There are no errors like this with the current design of
1018                // futures::mpsc, but we shouldn't just ignore the possibility
1019                // that they'll be added later.
1020                return Err(Error::from((into_internal!(
1021                    "try_send failed unexpectedly"
1022                ))(e)));
1023            }
1024        }
1025
1026        Ok(None)
1027    }
1028
1029    /// Helper: process a destroy cell.
1030    #[allow(clippy::unnecessary_wraps)]
1031    fn handle_destroy_cell(&mut self) -> Result<CircuitCmd> {
1032        // I think there is nothing more to do here.
1033        Ok(CircuitCmd::CleanShutdown)
1034    }
1035
1036    /// Handle a [`CtrlMsg::Create`](super::CtrlMsg::Create) message.
1037    pub(super) async fn handle_create(
1038        &mut self,
1039        recv_created: oneshot::Receiver<CreateResponse>,
1040        handshake: CircuitHandshake,
1041        params: &mut CircParameters,
1042        done: ReactorResultChannel<()>,
1043    ) -> StdResult<(), ReactorError> {
1044        let ret = match handshake {
1045            CircuitHandshake::CreateFast => self.create_firsthop_fast(recv_created, params).await,
1046            CircuitHandshake::Ntor {
1047                public_key,
1048                ed_identity,
1049            } => {
1050                self.create_firsthop_ntor(recv_created, ed_identity, public_key, params)
1051                    .await
1052            }
1053            CircuitHandshake::NtorV3 { public_key } => {
1054                self.create_firsthop_ntor_v3(recv_created, public_key, params)
1055                    .await
1056            }
1057        };
1058        let _ = done.send(ret); // don't care if sender goes away
1059
1060        // TODO: maybe we don't need to flush here?
1061        // (we could let run_once() handle all the flushing)
1062        self.chan_sender.flush().await?;
1063
1064        Ok(())
1065    }
1066
1067    /// Helper: create the first hop of a circuit.
1068    ///
1069    /// This is parameterized not just on the RNG, but a wrapper object to
1070    /// build the right kind of create cell, and a handshake object to perform
1071    /// the cryptographic handshake.
1072    async fn create_impl<H, W, M>(
1073        &mut self,
1074        cell_protocol: RelayCryptLayerProtocol,
1075        recvcreated: oneshot::Receiver<CreateResponse>,
1076        wrap: &W,
1077        key: &H::KeyType,
1078        params: &mut CircParameters,
1079        msg: &M,
1080    ) -> Result<()>
1081    where
1082        H: ClientHandshake + HandshakeAuxDataHandler,
1083        W: CreateHandshakeWrap,
1084        H::KeyGen: KeyGenerator,
1085        M: Borrow<H::ClientAuxData>,
1086    {
1087        // We don't need to shut down the circuit on failure here, since this
1088        // function consumes the PendingClientCirc and only returns
1089        // a ClientCirc on success.
1090
1091        let (state, msg) = {
1092            // done like this because holding the RNG across an await boundary makes the future
1093            // non-Send
1094            let mut rng = rand::rng();
1095            H::client1(&mut rng, key, msg)?
1096        };
1097        let create_cell = wrap.to_chanmsg(msg);
1098        trace!(
1099            "{}: Extending to hop 1 with {}",
1100            self.unique_id,
1101            create_cell.cmd()
1102        );
1103        self.send_msg(create_cell).await?;
1104
1105        let reply = recvcreated
1106            .await
1107            .map_err(|_| Error::CircProto("Circuit closed while waiting".into()))?;
1108
1109        let relay_handshake = wrap.decode_chanmsg(reply)?;
1110        let (server_msg, keygen) = H::client2(state, relay_handshake)?;
1111
1112        H::handle_server_aux_data(params, &server_msg)?;
1113
1114        let relay_cell_format = cell_protocol.relay_cell_format();
1115        let BoxedClientLayer { fwd, back, binding } =
1116            cell_protocol.construct_client_layers(HandshakeRole::Initiator, keygen)?;
1117
1118        trace!("{}: Handshake complete; circuit created.", self.unique_id);
1119
1120        let peer_id = self.channel.target().clone();
1121
1122        self.add_hop(
1123            relay_cell_format,
1124            path::HopDetail::Relay(peer_id),
1125            fwd,
1126            back,
1127            binding,
1128            params,
1129        )?;
1130        Ok(())
1131    }
1132
1133    /// Use the (questionable!) CREATE_FAST handshake to connect to the
1134    /// first hop of this circuit.
1135    ///
1136    /// There's no authentication in CREATE_FAST,
1137    /// so we don't need to know whom we're connecting to: we're just
1138    /// connecting to whichever relay the channel is for.
1139    async fn create_firsthop_fast(
1140        &mut self,
1141        recvcreated: oneshot::Receiver<CreateResponse>,
1142        params: &mut CircParameters,
1143    ) -> Result<()> {
1144        // In a CREATE_FAST handshake, we can't negotiate a format other than this.
1145        let protocol = RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0);
1146        let wrap = CreateFastWrap;
1147        self.create_impl::<CreateFastClient, _, _>(protocol, recvcreated, &wrap, &(), params, &())
1148            .await
1149    }
1150
1151    /// Use the ntor handshake to connect to the first hop of this circuit.
1152    ///
1153    /// Note that the provided keys must match the channel's target,
1154    /// or the handshake will fail.
1155    async fn create_firsthop_ntor(
1156        &mut self,
1157        recvcreated: oneshot::Receiver<CreateResponse>,
1158        ed_identity: pk::ed25519::Ed25519Identity,
1159        pubkey: NtorPublicKey,
1160        params: &mut CircParameters,
1161    ) -> Result<()> {
1162        // In an ntor handshake, we can't negotiate a format other than this.
1163        let relay_cell_protocol = RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0);
1164
1165        // Exit now if we have an Ed25519 or RSA identity mismatch.
1166        let target = RelayIds::builder()
1167            .ed_identity(ed_identity)
1168            .rsa_identity(pubkey.id)
1169            .build()
1170            .expect("Unable to build RelayIds");
1171        self.channel.check_match(&target)?;
1172
1173        let wrap = Create2Wrap {
1174            handshake_type: HandshakeType::NTOR,
1175        };
1176        self.create_impl::<NtorClient, _, _>(
1177            relay_cell_protocol,
1178            recvcreated,
1179            &wrap,
1180            &pubkey,
1181            params,
1182            &(),
1183        )
1184        .await
1185    }
1186
1187    /// Use the ntor-v3 handshake to connect to the first hop of this circuit.
1188    ///
1189    /// Note that the provided key must match the channel's target,
1190    /// or the handshake will fail.
1191    async fn create_firsthop_ntor_v3(
1192        &mut self,
1193        recvcreated: oneshot::Receiver<CreateResponse>,
1194        pubkey: NtorV3PublicKey,
1195        params: &mut CircParameters,
1196    ) -> Result<()> {
1197        // Exit now if we have a mismatched key.
1198        let target = RelayIds::builder()
1199            .ed_identity(pubkey.id)
1200            .build()
1201            .expect("Unable to build RelayIds");
1202        self.channel.check_match(&target)?;
1203
1204        // TODO #1947: Add support for negotiating other formats.
1205        let relay_cell_protocol = RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0);
1206
1207        // Set the client extensions.
1208        // allow 'unused_mut' because of the combinations of `cfg` conditions below
1209        #[allow(unused_mut)]
1210        let mut client_extensions = Vec::new();
1211
1212        if params.ccontrol.is_enabled() {
1213            cfg_if::cfg_if! {
1214                if #[cfg(feature = "flowctl-cc")] {
1215                    // TODO(arti#88): We have an `if false` in `exit_circparams_from_netparams`
1216                    // which should prevent the above `is_enabled()` from ever being true,
1217                    // even with the "flowctl-cc" feature enabled:
1218                    // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/2932#note_3191196
1219                    // The panic here is so that CI tests will hopefully catch if congestion
1220                    // control is unexpectedly enabled.
1221                    // We should remove this panic once xon/xoff flow is supported.
1222                    #[cfg(not(test))]
1223                    panic!("Congestion control is enabled on this circuit, but we don't yet support congestion control");
1224
1225                    #[allow(unreachable_code)]
1226                    client_extensions.push(CircRequestExt::CcRequest(CcRequest::default()));
1227                } else {
1228                    return Err(
1229                        internal!(
1230                            "Congestion control is enabled on this circuit, but 'flowctl-cc' feature is not enabled"
1231                        )
1232                        .into()
1233                    );
1234                }
1235            }
1236        }
1237
1238        let wrap = Create2Wrap {
1239            handshake_type: HandshakeType::NTOR_V3,
1240        };
1241
1242        self.create_impl::<NtorV3Client, _, _>(
1243            relay_cell_protocol,
1244            recvcreated,
1245            &wrap,
1246            &pubkey,
1247            params,
1248            &client_extensions,
1249        )
1250        .await
1251    }
1252
1253    /// Add a hop to the end of this circuit.
1254    ///
1255    /// Will return an error if the circuit already has [`u8::MAX`] hops.
1256    pub(super) fn add_hop(
1257        &mut self,
1258        format: RelayCellFormat,
1259        peer_id: path::HopDetail,
1260        fwd: Box<dyn OutboundClientLayer + 'static + Send>,
1261        rev: Box<dyn InboundClientLayer + 'static + Send>,
1262        binding: Option<CircuitBinding>,
1263        params: &CircParameters,
1264    ) -> StdResult<(), Bug> {
1265        let hop_num = self.hops.len();
1266        debug_assert_eq!(hop_num, usize::from(self.num_hops()));
1267
1268        // There are several places in the code that assume that a `usize` hop number
1269        // can be cast or converted to a `u8` hop number,
1270        // so this check is important to prevent panics or incorrect behaviour.
1271        if hop_num == usize::from(u8::MAX) {
1272            return Err(internal!(
1273                "cannot add more hops to a circuit with `u8::MAX` hops"
1274            ));
1275        }
1276
1277        let hop_num = (hop_num as u8).into();
1278
1279        let hop = CircHop::new(self.unique_id, hop_num, format, params);
1280        self.hops.push(hop);
1281        self.crypto_in.add_layer(rev);
1282        self.crypto_out.add_layer(fwd);
1283        self.mutable.add_hop(peer_id, binding);
1284
1285        Ok(())
1286    }
1287
1288    /// Handle a RELAY cell on this circuit with stream ID 0.
1289    #[allow(clippy::cognitive_complexity)]
1290    fn handle_meta_cell(
1291        &mut self,
1292        handlers: &mut CellHandlers,
1293        hopnum: HopNum,
1294        msg: UnparsedRelayMsg,
1295    ) -> Result<Option<CircuitCmd>> {
1296        // SENDME cells and TRUNCATED get handled internally by the circuit.
1297
1298        // TODO: This pattern (Check command, try to decode, map error) occurs
1299        // several times, and would be good to extract simplify. Such
1300        // simplification is obstructed by a couple of factors: First, that
1301        // there is not currently a good way to get the RelayCmd from _type_ of
1302        // a RelayMsg.  Second, that decode() [correctly] consumes the
1303        // UnparsedRelayMsg.  I tried a macro-based approach, and didn't care
1304        // for it. -nickm
1305        if msg.cmd() == RelayCmd::SENDME {
1306            let sendme = msg
1307                .decode::<Sendme>()
1308                .map_err(|e| Error::from_bytes_err(e, "sendme message"))?
1309                .into_msg();
1310
1311            return Ok(Some(CircuitCmd::HandleSendMe {
1312                hop: hopnum,
1313                sendme,
1314            }));
1315        }
1316        if msg.cmd() == RelayCmd::TRUNCATED {
1317            let truncated = msg
1318                .decode::<Truncated>()
1319                .map_err(|e| Error::from_bytes_err(e, "truncated message"))?
1320                .into_msg();
1321            let reason = truncated.reason();
1322            debug!(
1323                "{}: Truncated from hop {}. Reason: {} [{}]",
1324                self.unique_id,
1325                hopnum.display(),
1326                reason.human_str(),
1327                reason
1328            );
1329
1330            return Ok(Some(CircuitCmd::CleanShutdown));
1331        }
1332
1333        trace!("{}: Received meta-cell {:?}", self.unique_id, msg);
1334
1335        #[cfg(feature = "conflux")]
1336        if matches!(
1337            msg.cmd(),
1338            RelayCmd::CONFLUX_LINK
1339                | RelayCmd::CONFLUX_LINKED
1340                | RelayCmd::CONFLUX_LINKED_ACK
1341                | RelayCmd::CONFLUX_SWITCH
1342        ) {
1343            return self.handle_conflux_msg(hopnum, msg);
1344        }
1345
1346        if self.is_conflux_pending() {
1347            warn!(
1348                "{}: received unexpected cell {msg:?} on unlinked conflux circuit",
1349                self.unique_id,
1350            );
1351            return Err(Error::CircProto(
1352                "Received unexpected cell on unlinked circuit".into(),
1353            ));
1354        }
1355
1356        // For all other command types, we'll only get them in response
1357        // to another command, which should have registered a responder.
1358        //
1359        // TODO:(conflux): should the conflux state machine be a meta cell handler?
1360        // We'd need to add support for multiple meta handlers, and change the
1361        // MetaCellHandler API to support returning Option<RunOnceCmdInner>
1362        // (because some cells will require sending a response)
1363        if let Some(mut handler) = handlers.meta_handler.take() {
1364            if handler.expected_hop() == hopnum {
1365                // Somebody was waiting for a message -- maybe this message
1366                let ret = handler.handle_msg(msg, self);
1367                trace!(
1368                    "{}: meta handler completed with result: {:?}",
1369                    self.unique_id,
1370                    ret
1371                );
1372                match ret {
1373                    #[cfg(feature = "send-control-msg")]
1374                    Ok(MetaCellDisposition::Consumed) => {
1375                        handlers.meta_handler = Some(handler);
1376                        Ok(None)
1377                    }
1378                    Ok(MetaCellDisposition::ConversationFinished) => Ok(None),
1379                    #[cfg(feature = "send-control-msg")]
1380                    Ok(MetaCellDisposition::CloseCirc) => Ok(Some(CircuitCmd::CleanShutdown)),
1381                    Err(e) => Err(e),
1382                }
1383            } else {
1384                // Somebody wanted a message from a different hop!  Put this
1385                // one back.
1386                handlers.meta_handler = Some(handler);
1387
1388                unsupported_client_cell!(msg, hopnum)
1389            }
1390        } else {
1391            // No need to call shutdown here, since this error will
1392            // propagate to the reactor shut it down.
1393            unsupported_client_cell!(msg)
1394        }
1395    }
1396
1397    /// Handle a RELAY_SENDME cell on this circuit with stream ID 0.
1398    pub(super) fn handle_sendme(
1399        &mut self,
1400        hopnum: HopNum,
1401        msg: Sendme,
1402        signals: CongestionSignals,
1403    ) -> Result<Option<CircuitCmd>> {
1404        // No need to call "shutdown" on errors in this function;
1405        // it's called from the reactor task and errors will propagate there.
1406        let hop = self
1407            .hop_mut(hopnum)
1408            .ok_or_else(|| Error::CircProto(format!("Couldn't find hop {}", hopnum.display())))?;
1409
1410        let tag = msg.into_sendme_tag().ok_or_else(||
1411                // Versions of Tor <=0.3.5 would omit a SENDME tag in this case;
1412                // but we don't support those any longer.
1413                 Error::CircProto("missing tag on circuit sendme".into()))?;
1414        // Update the CC object that we received a SENDME along with possible congestion signals.
1415        hop.ccontrol.note_sendme_received(tag, signals)?;
1416        Ok(None)
1417    }
1418
1419    /// Send a message onto the circuit's channel.
1420    ///
1421    /// If the channel is ready to accept messages, it will be sent immediately. If not, the message
1422    /// will be enqueued for sending at a later iteration of the reactor loop.
1423    ///
1424    /// # Note
1425    ///
1426    /// Making use of the enqueuing capabilities of this function is discouraged! You should first
1427    /// check whether the channel is ready to receive messages (`self.channel.poll_ready`), and
1428    /// ideally use this to implement backpressure (such that you do not read from other sources
1429    /// that would send here while you know you're unable to forward the messages on).
1430    async fn send_msg(&mut self, msg: AnyChanMsg) -> Result<()> {
1431        let cell = AnyChanCell::new(Some(self.channel_id), msg);
1432        // Note: this future is always `Ready`, so await won't block.
1433        Pin::new(&mut self.chan_sender).send_unbounded(cell).await?;
1434        Ok(())
1435    }
1436
1437    /// Returns a [`Stream`] of [`CircuitCmd`] to poll from the main loop.
1438    ///
1439    /// The iterator contains at most one [`CircuitCmd`] for each hop,
1440    /// representing the instructions for handling the ready-item, if any,
1441    /// of its highest priority stream.
1442    ///
1443    /// IMPORTANT: this stream locks the stream map mutexes of each `CircHop`!
1444    /// To avoid contention, never create more than one [`Circuit::ready_streams_iterator`]
1445    /// stream at a time!
1446    ///
1447    /// This is cancellation-safe.
1448    pub(super) fn ready_streams_iterator(&self) -> impl Stream<Item = Result<CircuitCmd>> {
1449        self.hops
1450            .iter()
1451            .enumerate()
1452            .filter_map(|(i, hop)| {
1453                if !hop.ccontrol.can_send() {
1454                    // We can't send anything on this hop that counts towards SENDME windows.
1455                    //
1456                    // In theory we could send messages that don't count towards
1457                    // windows (like `RESOLVE`), and process end-of-stream
1458                    // events (to send an `END`), but it's probably not worth
1459                    // doing an O(N) iteration over flow-control-ready streams
1460                    // to see if that's the case.
1461                    //
1462                    // This *doesn't* block outgoing flow-control messages (e.g.
1463                    // SENDME), which are initiated via the control-message
1464                    // channel, handled above.
1465                    //
1466                    // TODO: Consider revisiting. OTOH some extra throttling when circuit-level
1467                    // congestion control has "bottomed out" might not be so bad, and the
1468                    // alternatives have complexity and/or performance costs.
1469                    return None;
1470                }
1471
1472                let hop_num = HopNum::from(i as u8);
1473                let hop_map = Arc::clone(&self.hops[i].map);
1474                Some(futures::future::poll_fn(move |cx| {
1475                    // Process an outbound message from the first ready stream on
1476                    // this hop. The stream map implements round robin scheduling to
1477                    // ensure fairness across streams.
1478                    // TODO: Consider looping here to process multiple ready
1479                    // streams. Need to be careful though to balance that with
1480                    // continuing to service incoming and control messages.
1481                    let mut hop_map = hop_map.lock().expect("lock poisoned");
1482                    let Some((sid, msg)) = hop_map.poll_ready_streams_iter(cx).next() else {
1483                        // No ready streams for this hop.
1484                        return Poll::Pending;
1485                    };
1486
1487                    if msg.is_none() {
1488                        return Poll::Ready(Ok(CircuitCmd::CloseStream {
1489                            hop: hop_num,
1490                            sid,
1491                            behav: CloseStreamBehavior::default(),
1492                            reason: streammap::TerminateReason::StreamTargetClosed,
1493                        }));
1494                    };
1495                    let msg = hop_map.take_ready_msg(sid).expect("msg disappeared");
1496
1497                    #[allow(unused)] // unused in non-debug builds
1498                    let Some(StreamEntMut::Open(s)) = hop_map.get_mut(sid) else {
1499                        panic!("Stream {sid} disappeared");
1500                    };
1501
1502                    debug_assert!(
1503                        s.can_send(&msg),
1504                        "Stream {sid} produced a message it can't send: {msg:?}"
1505                    );
1506
1507                    let cell = SendRelayCell {
1508                        hop: hop_num,
1509                        early: false,
1510                        cell: AnyRelayMsgOuter::new(Some(sid), msg),
1511                    };
1512                    Poll::Ready(Ok(CircuitCmd::Send(cell)))
1513                }))
1514            })
1515            .collect::<FuturesUnordered<_>>()
1516    }
1517
1518    /// Return the congestion signals for this reactor. This is used by congestion control module.
1519    ///
1520    /// Note: This is only async because we need a Context to check the sink for readiness.
1521    pub(super) async fn congestion_signals(&mut self) -> CongestionSignals {
1522        futures::future::poll_fn(|cx| -> Poll<CongestionSignals> {
1523            Poll::Ready(CongestionSignals::new(
1524                self.chan_sender.poll_ready_unpin_bool(cx).unwrap_or(false),
1525                self.chan_sender.n_queued(),
1526            ))
1527        })
1528        .await
1529    }
1530
1531    /// Return a reference to the hop corresponding to `hopnum`, if there is one.
1532    pub(super) fn hop(&self, hopnum: HopNum) -> Option<&CircHop> {
1533        self.hops.get(Into::<usize>::into(hopnum))
1534    }
1535
1536    /// Return a mutable reference to the hop corresponding to `hopnum`, if there is one.
1537    pub(super) fn hop_mut(&mut self, hopnum: HopNum) -> Option<&mut CircHop> {
1538        self.hops.get_mut(Into::<usize>::into(hopnum))
1539    }
1540
1541    /// Begin a stream with the provided hop in this circuit.
1542    pub(super) fn begin_stream(
1543        &mut self,
1544        hop_num: HopNum,
1545        message: AnyRelayMsg,
1546        sender: StreamMpscSender<UnparsedRelayMsg>,
1547        rx: StreamMpscReceiver<AnyRelayMsg>,
1548        cmd_checker: AnyCmdChecker,
1549    ) -> StdResult<Result<(SendRelayCell, StreamId)>, Bug> {
1550        let Some(hop) = self.hop_mut(hop_num) else {
1551            return Err(internal!(
1552                "{}: Attempting to send a BEGIN cell to an unknown hop {hop_num:?}",
1553                self.unique_id,
1554            ));
1555        };
1556
1557        Ok(hop.begin_stream(message, sender, rx, cmd_checker))
1558    }
1559
1560    /// Close the specified stream
1561    pub(super) async fn close_stream(
1562        &mut self,
1563        hop_num: HopNum,
1564        sid: StreamId,
1565        behav: CloseStreamBehavior,
1566        reason: streammap::TerminateReason,
1567    ) -> Result<()> {
1568        if let Some(hop) = self.hop_mut(hop_num) {
1569            let res = hop.close_stream(sid, behav, reason)?;
1570            if let Some(cell) = res {
1571                self.send_relay_cell(cell).await?;
1572            }
1573        }
1574        Ok(())
1575    }
1576
1577    /// Returns true if there are any streams on this circuit
1578    ///
1579    /// Important: this function locks the stream map of its each of the [`CircHop`]s
1580    /// in this circuit, so it must **not** be called from any function where the
1581    /// stream map lock is held (such as [`ready_streams_iterator`](Self::ready_streams_iterator).
1582    pub(super) fn has_streams(&self) -> bool {
1583        self.hops
1584            .iter()
1585            .any(|hop| hop.map.lock().expect("lock poisoned").n_open_streams() > 0)
1586    }
1587
1588    /// The number of hops in this circuit.
1589    pub(super) fn num_hops(&self) -> u8 {
1590        // `Circuit::add_hop` checks to make sure that we never have more than `u8::MAX` hops,
1591        // so `self.hops.len()` should be safe to cast to a `u8`.
1592        // If that assumption is violated,
1593        // we choose to panic rather than silently use the wrong hop due to an `as` cast.
1594        self.hops
1595            .len()
1596            .try_into()
1597            .expect("`hops.len()` has more than `u8::MAX` hops")
1598    }
1599
1600    /// Check whether this circuit has any hops.
1601    pub(super) fn has_hops(&self) -> bool {
1602        !self.hops.is_empty()
1603    }
1604
1605    /// Get the `HopNum` of the last hop, if this circuit is non-empty.
1606    ///
1607    /// Returns `None` if the circuit has no hops.
1608    pub(super) fn last_hop_num(&self) -> Option<HopNum> {
1609        let num_hops = self.num_hops();
1610        if num_hops == 0 {
1611            // asked for the last hop, but there are no hops
1612            return None;
1613        }
1614        Some(HopNum::from(num_hops - 1))
1615    }
1616
1617    /// Get the path of the circuit.
1618    ///
1619    /// **Warning:** Do not call while already holding the [`Self::mutable`] lock.
1620    pub(super) fn path(&self) -> Arc<path::Path> {
1621        self.mutable.path()
1622    }
1623
1624    /// Return a ClockSkew declaring how much clock skew the other side of this channel
1625    /// claimed that we had when we negotiated the connection.
1626    pub(super) fn clock_skew(&self) -> ClockSkew {
1627        self.channel.clock_skew()
1628    }
1629
1630    /// Does congestion control use stream SENDMEs for the given `hop`?
1631    ///
1632    /// Returns `None` if `hop` doesn't exist.
1633    pub(super) fn uses_stream_sendme(&self, hop: HopNum) -> Option<bool> {
1634        let hop = self.hop(hop)?;
1635        Some(hop.ccontrol.uses_stream_sendme())
1636    }
1637
1638    /// Returns whether this is a conflux circuit that is not linked yet.
1639    pub(super) fn is_conflux_pending(&self) -> bool {
1640        let Some(status) = self.conflux_status() else {
1641            return false;
1642        };
1643
1644        status != ConfluxStatus::Linked
1645    }
1646
1647    /// Returns the conflux status of this circuit.
1648    ///
1649    /// Returns `None` if this is not a conflux circuit.
1650    pub(super) fn conflux_status(&self) -> Option<ConfluxStatus> {
1651        cfg_if::cfg_if! {
1652            if #[cfg(feature = "conflux")] {
1653                self.conflux_handler
1654                    .as_ref()
1655                    .map(|handler| handler.status())
1656            } else {
1657                None
1658            }
1659        }
1660    }
1661
1662    /// Returns initial RTT on this leg, measured in the conflux handshake.
1663    #[cfg(feature = "conflux")]
1664    pub(super) fn init_rtt(&self) -> Option<Duration> {
1665        self.conflux_handler
1666            .as_ref()
1667            .map(|handler| handler.init_rtt())?
1668    }
1669}
1670
1671/// The conflux status of a conflux [`Circuit`].
1672#[derive(Copy, Clone, Debug, PartialEq, Eq)]
1673pub(super) enum ConfluxStatus {
1674    /// Circuit has not begun the conflux handshake yet.
1675    Unlinked,
1676    /// Conflux handshake is in progress.
1677    Pending,
1678    /// A linked conflux circuit.
1679    Linked,
1680}
1681
1682/// Return the stream ID of `msg`, if it has one.
1683///
1684/// Returns `Ok(None)` if `msg` is a meta cell.
1685fn msg_streamid(msg: &UnparsedRelayMsg) -> Result<Option<StreamId>> {
1686    let cmd = msg.cmd();
1687    let streamid = msg.stream_id();
1688    if !cmd.accepts_streamid_val(streamid) {
1689        return Err(Error::CircProto(format!(
1690            "Invalid stream ID {} for relay command {}",
1691            sv(StreamId::get_or_zero(streamid)),
1692            msg.cmd()
1693        )));
1694    }
1695
1696    Ok(streamid)
1697}
1698
1699impl Drop for Circuit {
1700    fn drop(&mut self) {
1701        let _ = self.channel.close_circuit(self.channel_id);
1702    }
1703}
1704
1705impl CircHop {
1706    /// Create a new hop.
1707    pub(super) fn new(
1708        unique_id: UniqId,
1709        hop_num: HopNum,
1710        relay_format: RelayCellFormat,
1711        params: &CircParameters,
1712    ) -> Self {
1713        CircHop {
1714            unique_id,
1715            hop_num,
1716            map: Arc::new(Mutex::new(streammap::StreamMap::new())),
1717            ccontrol: CongestionControl::new(&params.ccontrol),
1718            inbound: RelayCellDecoder::new(relay_format),
1719            relay_format,
1720        }
1721    }
1722
1723    /// Start a stream. Creates an entry in the stream map with the given channels, and sends the
1724    /// `message` to the provided hop.
1725    pub(crate) fn begin_stream(
1726        &mut self,
1727        message: AnyRelayMsg,
1728        sender: StreamMpscSender<UnparsedRelayMsg>,
1729        rx: StreamMpscReceiver<AnyRelayMsg>,
1730        cmd_checker: AnyCmdChecker,
1731    ) -> Result<(SendRelayCell, StreamId)> {
1732        let flow_ctrl = self.build_send_flow_ctrl();
1733        let r =
1734            self.map
1735                .lock()
1736                .expect("lock poisoned")
1737                .add_ent(sender, rx, flow_ctrl, cmd_checker)?;
1738        let cell = AnyRelayMsgOuter::new(Some(r), message);
1739        Ok((
1740            SendRelayCell {
1741                hop: self.hop_num,
1742                early: false,
1743                cell,
1744            },
1745            r,
1746        ))
1747    }
1748
1749    /// Close the stream associated with `id` because the stream was
1750    /// dropped.
1751    ///
1752    /// If we have not already received an END cell on this stream, send one.
1753    /// If no END cell is specified, an END cell with the reason byte set to
1754    /// REASON_MISC will be sent.
1755    fn close_stream(
1756        &mut self,
1757        id: StreamId,
1758        message: CloseStreamBehavior,
1759        why: streammap::TerminateReason,
1760    ) -> Result<Option<SendRelayCell>> {
1761        let should_send_end = self.map.lock().expect("lock poisoned").terminate(id, why)?;
1762        trace!(
1763            "{}: Ending stream {}; should_send_end={:?}",
1764            self.unique_id,
1765            id,
1766            should_send_end
1767        );
1768        // TODO: I am about 80% sure that we only send an END cell if
1769        // we didn't already get an END cell.  But I should double-check!
1770        if let (ShouldSendEnd::Send, CloseStreamBehavior::SendEnd(end_message)) =
1771            (should_send_end, message)
1772        {
1773            let end_cell = AnyRelayMsgOuter::new(Some(id), end_message.into());
1774            let cell = SendRelayCell {
1775                hop: self.hop_num,
1776                early: false,
1777                cell: end_cell,
1778            };
1779
1780            return Ok(Some(cell));
1781        }
1782        Ok(None)
1783    }
1784
1785    /// Return the format that is used for relay cells sent to this hop.
1786    ///
1787    /// For the most part, this format isn't necessary to interact with a CircHop;
1788    /// it becomes relevant when we are deciding _what_ we can encode for the hop.
1789    pub(crate) fn relay_cell_format(&self) -> RelayCellFormat {
1790        self.relay_format
1791    }
1792
1793    /// Builds the (sending) flow control handler for a new stream.
1794    fn build_send_flow_ctrl(&self) -> StreamSendFlowControl {
1795        if self.ccontrol.uses_stream_sendme() {
1796            let window = sendme::StreamSendWindow::new(SEND_WINDOW_INIT);
1797            StreamSendFlowControl::new_window_based(window)
1798        } else {
1799            StreamSendFlowControl::new_xon_xoff_based()
1800        }
1801    }
1802
1803    /// Delegate to CongestionControl, for testing purposes
1804    #[cfg(test)]
1805    pub(crate) fn send_window_and_expected_tags(&self) -> (u32, Vec<SendmeTag>) {
1806        self.ccontrol.send_window_and_expected_tags()
1807    }
1808
1809    /// Return the number of open streams on this hop.
1810    ///
1811    /// WARNING: because this locks the stream map mutex,
1812    /// it should never be called from a context where that mutex is already locked.
1813    pub(super) fn n_open_streams(&self) -> usize {
1814        self.map.lock().expect("lock poisoned").n_open_streams()
1815    }
1816
1817    /// Return a reference to our CongestionControl object.
1818    pub(crate) fn ccontrol(&self) -> &CongestionControl {
1819        &self.ccontrol
1820    }
1821}