1pub(crate) mod circhop;
4pub(super) mod create;
5pub(super) mod extender;
6
7use crate::channel::{Channel, ChannelSender};
8#[cfg(feature = "counter-galois-onion")]
9use crate::circuit::handshake::RelayCryptLayerProtocol;
10use crate::circuit::HopSettings;
11use crate::congestion::sendme;
12use crate::congestion::CongestionSignals;
13use crate::crypto::binding::CircuitBinding;
14use crate::crypto::cell::{
15 HopNum, InboundClientCrypt, InboundClientLayer, OutboundClientCrypt, OutboundClientLayer,
16 RelayCellBody,
17};
18use crate::crypto::handshake::fast::CreateFastClient;
19use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey};
20use crate::crypto::handshake::ntor_v3::{NtorV3Client, NtorV3PublicKey};
21use crate::crypto::handshake::{ClientHandshake, KeyGenerator};
22use crate::memquota::{CircuitAccount, SpecificAccount as _, StreamAccount};
23use crate::stream::queue::{stream_queue, StreamQueueSender};
24use crate::stream::{AnyCmdChecker, DrainRateRequest, StreamRateLimit, StreamStatus};
25use crate::tunnel::circuit::celltypes::{ClientCircChanMsg, CreateResponse};
26use crate::tunnel::circuit::handshake::{BoxedClientLayer, HandshakeRole};
27use crate::tunnel::circuit::path;
28use crate::tunnel::circuit::unique_id::UniqId;
29use crate::tunnel::circuit::{CircuitRxReceiver, MutableState, StreamMpscReceiver};
30use crate::tunnel::reactor::MetaCellDisposition;
31use crate::tunnel::streammap;
32use crate::tunnel::TunnelScopedCircId;
33use crate::util::err::ReactorError;
34use crate::util::notify::NotifySender;
35use crate::util::sometimes_unbounded_sink::SometimesUnboundedSink;
36use crate::util::SinkExt as _;
37use crate::{ClockSkew, Error, Result};
38
39use tor_async_utils::{SinkTrySend as _, SinkTrySendError as _};
40use tor_cell::chancell::msg::{AnyChanMsg, HandshakeType, Relay};
41use tor_cell::chancell::{AnyChanCell, ChanCmd, CircId};
42use tor_cell::chancell::{BoxedCellBody, ChanMsg};
43use tor_cell::relaycell::extend::{CcRequest, CircRequestExt};
44use tor_cell::relaycell::msg::{AnyRelayMsg, End, Sendme, SendmeTag, Truncated};
45use tor_cell::relaycell::{
46 AnyRelayMsgOuter, RelayCellDecoderResult, RelayCellFormat, RelayCmd, StreamId, UnparsedRelayMsg,
47};
48use tor_error::{internal, Bug};
49use tor_linkspec::RelayIds;
50use tor_llcrypto::pk;
51use tor_memquota::mq_queue::{ChannelSpec as _, MpscSpec};
52
53use futures::{SinkExt as _, Stream};
54use oneshot_fused_workaround as oneshot;
55use postage::watch;
56use safelog::sensitive as sv;
57use tor_rtcompat::DynTimeProvider;
58use tracing::{debug, trace, warn};
59
60use super::{
61 CellHandlers, CircuitHandshake, CloseStreamBehavior, ReactorResultChannel, SendRelayCell,
62};
63
64use std::borrow::Borrow;
65use std::pin::Pin;
66use std::result::Result as StdResult;
67use std::sync::Arc;
68use std::task::Poll;
69use std::time::{Duration, SystemTime};
70
71use create::{Create2Wrap, CreateFastWrap, CreateHandshakeWrap};
72use extender::HandshakeAuxDataHandler;
73
74#[cfg(feature = "hs-service")]
75use {
76 crate::stream::{DataCmdChecker, IncomingStreamRequest},
77 tor_cell::relaycell::msg::Begin,
78};
79
80#[cfg(feature = "conflux")]
81use {
82 super::conflux::ConfluxMsgHandler,
83 super::conflux::{ConfluxAction, OooRelayMsg},
84 crate::tunnel::reactor::RemoveLegReason,
85 crate::tunnel::TunnelId,
86};
87
88pub(super) use circhop::{CircHop, CircHopList};
89
90pub(super) const SEND_WINDOW_INIT: u16 = 500;
92pub(crate) const RECV_WINDOW_INIT: u16 = 500;
94pub(crate) const STREAM_READER_BUFFER: usize = (2 * RECV_WINDOW_INIT) as usize;
100
101pub(crate) struct Circuit {
106 runtime: DynTimeProvider,
108 channel: Arc<Channel>,
110 pub(super) chan_sender: SometimesUnboundedSink<AnyChanCell, ChannelSender>,
115 pub(super) input: CircuitRxReceiver,
120 crypto_in: InboundClientCrypt,
124 crypto_out: OutboundClientCrypt,
126 hops: CircHopList,
128 mutable: Arc<MutableState>,
131 channel_id: CircId,
133 unique_id: TunnelScopedCircId,
135 #[cfg(feature = "conflux")]
140 conflux_handler: Option<ConfluxMsgHandler>,
141 #[allow(dead_code)] memquota: CircuitAccount,
144}
145
146#[derive(Debug)]
154pub(super) enum CircuitCmd {
155 Send(SendRelayCell),
157 HandleSendMe {
159 hop: HopNum,
161 sendme: Sendme,
163 },
164 CloseStream {
166 hop: HopNum,
168 sid: StreamId,
170 behav: CloseStreamBehavior,
172 reason: streammap::TerminateReason,
174 },
175 #[cfg(feature = "conflux")]
181 ConfluxRemove(RemoveLegReason),
182 #[cfg(feature = "conflux")]
188 ConfluxHandshakeComplete(SendRelayCell),
189 CleanShutdown,
191 #[cfg(feature = "conflux")]
193 Enqueue(OooRelayMsg),
194}
195
196macro_rules! unsupported_client_cell {
203 ($msg:expr) => {{
204 unsupported_client_cell!(@ $msg, "")
205 }};
206
207 ($msg:expr, $hopnum:expr) => {{
208 let hop: HopNum = $hopnum;
209 let hop_display = format!(" from hop {}", hop.display());
210 unsupported_client_cell!(@ $msg, hop_display)
211 }};
212
213 (@ $msg:expr, $hopnum_display:expr) => {
214 Err(crate::Error::CircProto(format!(
215 "Unexpected {} cell{} on client circuit",
216 $msg.cmd(),
217 $hopnum_display,
218 )))
219 };
220}
221
222pub(super) use unsupported_client_cell;
223
224impl Circuit {
225 pub(super) fn new(
227 runtime: DynTimeProvider,
228 channel: Arc<Channel>,
229 channel_id: CircId,
230 unique_id: TunnelScopedCircId,
231 input: CircuitRxReceiver,
232 memquota: CircuitAccount,
233 mutable: Arc<MutableState>,
234 ) -> Self {
235 let chan_sender = SometimesUnboundedSink::new(channel.sender());
236
237 let crypto_out = OutboundClientCrypt::new();
238 Circuit {
239 runtime,
240 channel,
241 chan_sender,
242 input,
243 crypto_in: InboundClientCrypt::new(),
244 hops: CircHopList::default(),
245 unique_id,
246 channel_id,
247 crypto_out,
248 mutable,
249 #[cfg(feature = "conflux")]
250 conflux_handler: None,
251 memquota,
252 }
253 }
254
255 pub(super) fn unique_id(&self) -> UniqId {
257 self.unique_id.unique_id()
258 }
259
260 pub(super) fn mutable(&self) -> &Arc<MutableState> {
262 &self.mutable
263 }
264
265 #[cfg(feature = "conflux")]
270 pub(super) fn add_to_conflux_tunnel(
271 &mut self,
272 tunnel_id: TunnelId,
273 conflux_handler: ConfluxMsgHandler,
274 ) {
275 self.unique_id = TunnelScopedCircId::new(tunnel_id, self.unique_id.unique_id());
276 self.conflux_handler = Some(conflux_handler);
277 }
278
279 #[cfg(feature = "conflux")]
284 pub(super) async fn begin_conflux_link(
285 &mut self,
286 hop: HopNum,
287 cell: AnyRelayMsgOuter,
288 runtime: &tor_rtcompat::DynTimeProvider,
289 ) -> Result<()> {
290 use tor_rtcompat::SleepProvider as _;
291
292 if self.conflux_handler.is_none() {
293 return Err(internal!(
294 "tried to send LINK cell before installing a ConfluxMsgHandler?!"
295 )
296 .into());
297 }
298
299 let cell = SendRelayCell {
300 hop,
301 early: false,
302 cell,
303 };
304 self.send_relay_cell(cell).await?;
305
306 let Some(conflux_handler) = self.conflux_handler.as_mut() else {
307 return Err(internal!("ConfluxMsgHandler disappeared?!").into());
308 };
309
310 Ok(conflux_handler.note_link_sent(runtime.wallclock())?)
311 }
312
313 pub(super) fn conflux_hs_timeout(&self) -> Option<SystemTime> {
317 cfg_if::cfg_if! {
318 if #[cfg(feature = "conflux")] {
319 self.conflux_handler.as_ref().map(|handler| handler.handshake_timeout())?
320 } else {
321 None
322 }
323 }
324 }
325
326 #[cfg(test)]
328 pub(super) fn handle_add_fake_hop(
329 &mut self,
330 format: RelayCellFormat,
331 fwd_lasthop: bool,
332 rev_lasthop: bool,
333 dummy_peer_id: path::HopDetail,
334 params: &crate::circuit::CircParameters,
338 done: ReactorResultChannel<()>,
339 ) {
340 use tor_protover::{named, Protocols};
341
342 use crate::tunnel::circuit::test::DummyCrypto;
343
344 assert!(matches!(format, RelayCellFormat::V0));
345 let _ = format; let fwd = Box::new(DummyCrypto::new(fwd_lasthop));
348 let rev = Box::new(DummyCrypto::new(rev_lasthop));
349 let binding = None;
350
351 let settings = HopSettings::from_params_and_caps(
352 crate::circuit::HopNegotiationType::Full,
354 params,
355 &[named::FLOWCTRL_CC].into_iter().collect::<Protocols>(),
356 )
357 .expect("Can't construct HopSettings");
358 self.add_hop(dummy_peer_id, fwd, rev, binding, &settings)
359 .expect("could not add hop to circuit");
360 let _ = done.send(Ok(()));
361 }
362
363 fn encode_relay_cell(
367 crypto_out: &mut OutboundClientCrypt,
368 relay_format: RelayCellFormat,
369 hop: HopNum,
370 early: bool,
371 msg: AnyRelayMsgOuter,
372 ) -> Result<(AnyChanMsg, SendmeTag)> {
373 let mut body: RelayCellBody = msg
374 .encode(relay_format, &mut rand::rng())
375 .map_err(|e| Error::from_cell_enc(e, "relay cell body"))?
376 .into();
377 let cmd = if early {
378 ChanCmd::RELAY_EARLY
379 } else {
380 ChanCmd::RELAY
381 };
382 let tag = crypto_out.encrypt(cmd, &mut body, hop)?;
383 let msg = Relay::from(BoxedCellBody::from(body));
384 let msg = if early {
385 AnyChanMsg::RelayEarly(msg.into())
386 } else {
387 AnyChanMsg::Relay(msg)
388 };
389
390 Ok((msg, tag))
391 }
392
393 pub(super) async fn send_relay_cell(&mut self, msg: SendRelayCell) -> Result<()> {
404 let SendRelayCell {
405 hop,
406 early,
407 cell: msg,
408 } = msg;
409
410 let is_conflux_link = msg.cmd() == RelayCmd::CONFLUX_LINK;
411 if !is_conflux_link && self.is_conflux_pending() {
412 return Err(internal!("tried to send cell on unlinked circuit").into());
415 }
416
417 trace!(circ_id = %self.unique_id, cell = ?msg, "sending relay cell");
418
419 let runtime = self.runtime.clone();
421 let c_t_w = sendme::cmd_counts_towards_windows(msg.cmd());
422 let stream_id = msg.stream_id();
423 let circhop = self.hops.get_mut(hop).ok_or(Error::NoSuchHop)?;
424
425 circhop.decrement_outbound_cell_limit()?;
430
431 if c_t_w {
433 if let Some(stream_id) = stream_id {
434 circhop.take_capacity_to_send(stream_id, msg.msg())?;
435 }
436 }
437
438 let relay_cmd = msg.cmd();
442
443 let (msg, tag) = Self::encode_relay_cell(
446 &mut self.crypto_out,
447 circhop.relay_format(),
448 hop,
449 early,
450 msg,
451 )?;
452 if c_t_w {
455 circhop.ccontrol_mut().note_data_sent(&runtime, &tag)?;
456 }
457
458 let cell = AnyChanCell::new(Some(self.channel_id), msg);
459 Pin::new(&mut self.chan_sender).send_unbounded(cell).await?;
460
461 #[cfg(feature = "conflux")]
462 if let Some(conflux) = self.conflux_handler.as_mut() {
463 conflux.note_cell_sent(relay_cmd);
464 }
465
466 Ok(())
467 }
468
469 pub(super) fn handle_cell(
482 &mut self,
483 handlers: &mut CellHandlers,
484 leg: UniqId,
485 cell: ClientCircChanMsg,
486 ) -> Result<Vec<CircuitCmd>> {
487 trace!(circ_id = %self.unique_id, cell = ?cell, "handling cell");
488 use ClientCircChanMsg::*;
489 match cell {
490 Relay(r) => self.handle_relay_cell(handlers, leg, r),
491 Destroy(d) => {
492 let reason = d.reason();
493 debug!(
494 circ_id = %self.unique_id,
495 "Received DESTROY cell. Reason: {} [{}]",
496 reason.human_str(),
497 reason
498 );
499
500 self.handle_destroy_cell().map(|c| vec![c])
501 }
502 }
503 }
504
505 fn decode_relay_cell(
508 &mut self,
509 cell: Relay,
510 ) -> Result<(HopNum, SendmeTag, RelayCellDecoderResult)> {
511 let cmd = cell.cmd();
513 let mut body = cell.into_relay_body().into();
514
515 let (hopnum, tag) = self.crypto_in.decrypt(cmd, &mut body)?;
518
519 let decode_res = self
521 .hop_mut(hopnum)
522 .ok_or_else(|| {
523 Error::from(internal!(
524 "Trying to decode cell from nonexistent hop {:?}",
525 hopnum
526 ))
527 })?
528 .decode(body.into())?;
529
530 Ok((hopnum, tag, decode_res))
531 }
532
533 fn handle_relay_cell(
535 &mut self,
536 handlers: &mut CellHandlers,
537 leg: UniqId,
538 cell: Relay,
539 ) -> Result<Vec<CircuitCmd>> {
540 let (hopnum, tag, decode_res) = self.decode_relay_cell(cell)?;
541
542 self.hop_mut(hopnum)
544 .ok_or_else(|| internal!("nonexistent hop {:?}", hopnum))?
545 .decrement_inbound_cell_limit()?;
546
547 let c_t_w = decode_res.cmds().any(sendme::cmd_counts_towards_windows);
548
549 let send_circ_sendme = if c_t_w {
552 self.hop_mut(hopnum)
553 .ok_or_else(|| Error::CircProto("Sendme from nonexistent hop".into()))?
554 .ccontrol_mut()
555 .note_data_received()?
556 } else {
557 false
558 };
559
560 let mut circ_cmds = vec![];
561 if send_circ_sendme {
563 let sendme = Sendme::from(tag);
568 let cell = AnyRelayMsgOuter::new(None, sendme.into());
569 circ_cmds.push(CircuitCmd::Send(SendRelayCell {
570 hop: hopnum,
571 early: false,
572 cell,
573 }));
574
575 self.hop_mut(hopnum)
577 .ok_or_else(|| {
578 Error::from(internal!(
579 "Trying to send SENDME to nonexistent hop {:?}",
580 hopnum
581 ))
582 })?
583 .ccontrol_mut()
584 .note_sendme_sent()?;
585 }
586
587 let (mut msgs, incomplete) = decode_res.into_parts();
588 while let Some(msg) = msgs.next() {
589 let msg_status = self.handle_relay_msg(handlers, hopnum, leg, c_t_w, msg)?;
590
591 match msg_status {
592 None => continue,
593 Some(msg @ CircuitCmd::CleanShutdown) => {
594 for m in msgs {
595 debug!(
596 "{id}: Ignoring relay msg received after triggering shutdown: {m:?}",
597 id = self.unique_id
598 );
599 }
600 if let Some(incomplete) = incomplete {
601 debug!(
602 "{id}: Ignoring partial relay msg received after triggering shutdown: {:?}",
603 incomplete,
604 id=self.unique_id,
605 );
606 }
607 circ_cmds.push(msg);
608 return Ok(circ_cmds);
609 }
610 Some(msg) => {
611 circ_cmds.push(msg);
612 }
613 }
614 }
615
616 Ok(circ_cmds)
617 }
618
619 fn handle_relay_msg(
621 &mut self,
622 handlers: &mut CellHandlers,
623 hopnum: HopNum,
624 leg: UniqId,
625 cell_counts_toward_windows: bool,
626 msg: UnparsedRelayMsg,
627 ) -> Result<Option<CircuitCmd>> {
628 let streamid = msg_streamid(&msg)?;
631
632 let Some(streamid) = streamid else {
635 return self.handle_meta_cell(handlers, hopnum, msg);
636 };
637
638 #[cfg(feature = "conflux")]
639 let msg = if let Some(conflux) = self.conflux_handler.as_mut() {
640 match conflux.action_for_msg(hopnum, cell_counts_toward_windows, streamid, msg)? {
641 ConfluxAction::Deliver(msg) => {
642 msg
649 }
650 ConfluxAction::Enqueue(msg) => {
651 return Ok(Some(CircuitCmd::Enqueue(msg)));
653 }
654 }
655 } else {
656 msg
659 };
660
661 self.handle_in_order_relay_msg(
662 handlers,
663 hopnum,
664 leg,
665 cell_counts_toward_windows,
666 streamid,
667 msg,
668 )
669 }
670
671 pub(super) fn handle_in_order_relay_msg(
673 &mut self,
674 handlers: &mut CellHandlers,
675 hopnum: HopNum,
676 leg: UniqId,
677 cell_counts_toward_windows: bool,
678 streamid: StreamId,
679 msg: UnparsedRelayMsg,
680 ) -> Result<Option<CircuitCmd>> {
681 #[cfg(feature = "conflux")]
682 if let Some(conflux) = self.conflux_handler.as_mut() {
683 conflux.inc_last_seq_delivered(&msg);
684 }
685
686 let hop = self
689 .hop_mut(hopnum)
690 .ok_or_else(|| Error::CircProto("Cell from nonexistent hop!".into()))?;
691 let res = hop.handle_msg(cell_counts_toward_windows, streamid, msg)?;
692
693 if let Some(msg) = res {
696 cfg_if::cfg_if! {
697 if #[cfg(feature = "hs-service")] {
698 return self.handle_incoming_stream_request(handlers, msg, streamid, hopnum, leg);
699 } else {
700 return Err(internal!("incoming stream not rejected, but hs-service feature is disabled?!").into());
701 }
702 }
703 }
704
705 if let Some(cell) = hop.maybe_send_xoff(streamid)? {
707 let cell = AnyRelayMsgOuter::new(Some(streamid), cell.into());
708 let cell = SendRelayCell {
709 hop: hopnum,
710 early: false,
711 cell,
712 };
713 return Ok(Some(CircuitCmd::Send(cell)));
714 }
715
716 Ok(None)
717 }
718
719 #[cfg(feature = "conflux")]
727 fn handle_conflux_msg(
728 &mut self,
729 hop: HopNum,
730 msg: UnparsedRelayMsg,
731 ) -> Result<Option<CircuitCmd>> {
732 let Some(conflux_handler) = self.conflux_handler.as_mut() else {
733 return Err(Error::CircProto(format!(
736 "Received {} cell from hop {} on non-conflux client circuit?!",
737 msg.cmd(),
738 hop.display(),
739 )));
740 };
741
742 Ok(conflux_handler.handle_conflux_msg(msg, hop))
743 }
744
745 #[cfg(feature = "conflux")]
749 pub(super) fn last_seq_sent(&self) -> Result<u64> {
750 let handler = self
751 .conflux_handler
752 .as_ref()
753 .ok_or_else(|| internal!("tried to get last_seq_sent of non-conflux circ"))?;
754
755 Ok(handler.last_seq_sent())
756 }
757
758 #[cfg(feature = "conflux")]
762 pub(super) fn set_last_seq_sent(&mut self, n: u64) -> Result<()> {
763 let handler = self
764 .conflux_handler
765 .as_mut()
766 .ok_or_else(|| internal!("tried to get last_seq_sent of non-conflux circ"))?;
767
768 handler.set_last_seq_sent(n);
769 Ok(())
770 }
771
772 #[cfg(feature = "conflux")]
776 pub(super) fn last_seq_recv(&self) -> Result<u64> {
777 let handler = self
778 .conflux_handler
779 .as_ref()
780 .ok_or_else(|| internal!("tried to get last_seq_recv of non-conflux circ"))?;
781
782 Ok(handler.last_seq_recv())
783 }
784
785 #[cfg(feature = "hs-service")]
789 fn handle_incoming_stream_request(
790 &mut self,
791 handlers: &mut CellHandlers,
792 msg: UnparsedRelayMsg,
793 stream_id: StreamId,
794 hop_num: HopNum,
795 leg: UniqId,
796 ) -> Result<Option<CircuitCmd>> {
797 use super::syncview::ClientCircSyncView;
798 use tor_cell::relaycell::msg::EndReason;
799 use tor_error::into_internal;
800 use tor_log_ratelim::log_ratelim;
801
802 use crate::{circuit::CIRCUIT_BUFFER_SIZE, tunnel::reactor::StreamReqInfo};
803
804 let Some(handler) = handlers.incoming_stream_req_handler.as_mut() else {
807 return Err(Error::CircProto(
808 "Cannot handle BEGIN cells on this circuit".into(),
809 ));
810 };
811
812 if hop_num != handler.hop_num {
813 return Err(Error::CircProto(format!(
814 "Expecting incoming streams from {}, but received {} cell from unexpected hop {}",
815 handler.hop_num.display(),
816 msg.cmd(),
817 hop_num.display()
818 )));
819 }
820
821 let message_closes_stream = handler.cmd_checker.check_msg(&msg)? == StreamStatus::Closed;
822
823 let hop = self.hops.get_mut(hop_num).ok_or(Error::CircuitClosed)?;
831
832 if message_closes_stream {
833 hop.ending_msg_received(stream_id)?;
834
835 return Ok(None);
836 }
837
838 let begin = msg
839 .decode::<Begin>()
840 .map_err(|e| Error::from_bytes_err(e, "Invalid Begin message"))?
841 .into_msg();
842
843 let req = IncomingStreamRequest::Begin(begin);
844
845 {
846 use crate::stream::IncomingStreamRequestDisposition::*;
847
848 let ctx = crate::stream::IncomingStreamRequestContext { request: &req };
849 let view = ClientCircSyncView::new(&self.hops);
855
856 match handler.filter.as_mut().disposition(&ctx, &view)? {
857 Accept => {}
858 CloseCircuit => return Ok(Some(CircuitCmd::CleanShutdown)),
859 RejectRequest(end) => {
860 let end_msg = AnyRelayMsgOuter::new(Some(stream_id), end.into());
861 let cell = SendRelayCell {
862 hop: hop_num,
863 early: false,
864 cell: end_msg,
865 };
866 return Ok(Some(CircuitCmd::Send(cell)));
867 }
868 }
869 }
870
871 let hop = self.hops.get_mut(hop_num).ok_or(Error::CircuitClosed)?;
874 let relay_cell_format = hop.relay_format();
875
876 let memquota = StreamAccount::new(&self.memquota)?;
877
878 let (sender, receiver) = stream_queue(
879 STREAM_READER_BUFFER,
880 &memquota,
881 self.chan_sender.as_inner().time_provider(),
882 )?;
883
884 let (msg_tx, msg_rx) = MpscSpec::new(CIRCUIT_BUFFER_SIZE).new_mq(
885 self.chan_sender.as_inner().time_provider().clone(),
886 memquota.as_raw_account(),
887 )?;
888
889 let (rate_limit_tx, rate_limit_rx) = watch::channel_with(StreamRateLimit::MAX);
890
891 let mut drain_rate_request_tx = NotifySender::new_typed();
895 let drain_rate_request_rx = drain_rate_request_tx.subscribe();
896
897 let cmd_checker = DataCmdChecker::new_connected();
898 hop.add_ent_with_id(
899 sender,
900 msg_rx,
901 rate_limit_tx,
902 drain_rate_request_tx,
903 stream_id,
904 cmd_checker,
905 )?;
906
907 let outcome = Pin::new(&mut handler.incoming_sender).try_send(StreamReqInfo {
908 req,
909 stream_id,
910 hop: (leg, hop_num).into(),
911 msg_tx,
912 receiver,
913 rate_limit_stream: rate_limit_rx,
914 drain_rate_request_stream: drain_rate_request_rx,
915 memquota,
916 relay_cell_format,
917 });
918
919 log_ratelim!("Delivering message to incoming stream handler"; outcome);
920
921 if let Err(e) = outcome {
922 if e.is_full() {
923 let end_msg = AnyRelayMsgOuter::new(
927 Some(stream_id),
928 End::new_with_reason(EndReason::RESOURCELIMIT).into(),
929 );
930
931 let cell = SendRelayCell {
932 hop: hop_num,
933 early: false,
934 cell: end_msg,
935 };
936 return Ok(Some(CircuitCmd::Send(cell)));
937 } else if e.is_disconnected() {
938 debug!(
950 circ_id = %self.unique_id,
951 "Incoming stream request receiver dropped",
952 );
953 return Err(Error::CircuitClosed);
955 } else {
956 return Err(Error::from((into_internal!(
960 "try_send failed unexpectedly"
961 ))(e)));
962 }
963 }
964
965 Ok(None)
966 }
967
968 #[allow(clippy::unnecessary_wraps)]
970 fn handle_destroy_cell(&mut self) -> Result<CircuitCmd> {
971 Ok(CircuitCmd::CleanShutdown)
973 }
974
975 pub(super) async fn handle_create(
977 &mut self,
978 recv_created: oneshot::Receiver<CreateResponse>,
979 handshake: CircuitHandshake,
980 settings: HopSettings,
981 done: ReactorResultChannel<()>,
982 ) -> StdResult<(), ReactorError> {
983 let ret = match handshake {
984 CircuitHandshake::CreateFast => self.create_firsthop_fast(recv_created, settings).await,
985 CircuitHandshake::Ntor {
986 public_key,
987 ed_identity,
988 } => {
989 self.create_firsthop_ntor(recv_created, ed_identity, public_key, settings)
990 .await
991 }
992 CircuitHandshake::NtorV3 { public_key } => {
993 self.create_firsthop_ntor_v3(recv_created, public_key, settings)
994 .await
995 }
996 };
997 let _ = done.send(ret); self.chan_sender.flush().await?;
1002
1003 Ok(())
1004 }
1005
1006 async fn create_impl<H, W, M>(
1012 &mut self,
1013 recvcreated: oneshot::Receiver<CreateResponse>,
1014 wrap: &W,
1015 key: &H::KeyType,
1016 mut settings: HopSettings,
1017 msg: &M,
1018 ) -> Result<()>
1019 where
1020 H: ClientHandshake + HandshakeAuxDataHandler,
1021 W: CreateHandshakeWrap,
1022 H::KeyGen: KeyGenerator,
1023 M: Borrow<H::ClientAuxData>,
1024 {
1025 let (state, msg) = {
1030 let mut rng = rand::rng();
1033 H::client1(&mut rng, key, msg)?
1034 };
1035 let create_cell = wrap.to_chanmsg(msg);
1036 trace!(
1037 circ_id = %self.unique_id,
1038 create = %create_cell.cmd(),
1039 "Extending to hop 1",
1040 );
1041 self.send_msg(create_cell).await?;
1042
1043 let reply = recvcreated
1044 .await
1045 .map_err(|_| Error::CircProto("Circuit closed while waiting".into()))?;
1046
1047 let relay_handshake = wrap.decode_chanmsg(reply)?;
1048 let (server_msg, keygen) = H::client2(state, relay_handshake)?;
1049
1050 H::handle_server_aux_data(&mut settings, &server_msg)?;
1051
1052 let BoxedClientLayer { fwd, back, binding } = settings
1053 .relay_crypt_protocol()
1054 .construct_client_layers(HandshakeRole::Initiator, keygen)?;
1055
1056 trace!(circ_id = %self.unique_id, "Handshake complete; circuit created.");
1057
1058 let peer_id = self.channel.target().clone();
1059
1060 self.add_hop(
1061 path::HopDetail::Relay(peer_id),
1062 fwd,
1063 back,
1064 binding,
1065 &settings,
1066 )?;
1067 Ok(())
1068 }
1069
1070 async fn create_firsthop_fast(
1077 &mut self,
1078 recvcreated: oneshot::Receiver<CreateResponse>,
1079 settings: HopSettings,
1080 ) -> Result<()> {
1081 let wrap = CreateFastWrap;
1083 self.create_impl::<CreateFastClient, _, _>(recvcreated, &wrap, &(), settings, &())
1084 .await
1085 }
1086
1087 async fn create_firsthop_ntor(
1092 &mut self,
1093 recvcreated: oneshot::Receiver<CreateResponse>,
1094 ed_identity: pk::ed25519::Ed25519Identity,
1095 pubkey: NtorPublicKey,
1096 settings: HopSettings,
1097 ) -> Result<()> {
1098 let target = RelayIds::builder()
1100 .ed_identity(ed_identity)
1101 .rsa_identity(pubkey.id)
1102 .build()
1103 .expect("Unable to build RelayIds");
1104 self.channel.check_match(&target)?;
1105
1106 let wrap = Create2Wrap {
1107 handshake_type: HandshakeType::NTOR,
1108 };
1109 self.create_impl::<NtorClient, _, _>(recvcreated, &wrap, &pubkey, settings, &())
1110 .await
1111 }
1112
1113 async fn create_firsthop_ntor_v3(
1118 &mut self,
1119 recvcreated: oneshot::Receiver<CreateResponse>,
1120 pubkey: NtorV3PublicKey,
1121 settings: HopSettings,
1122 ) -> Result<()> {
1123 let target = RelayIds::builder()
1125 .ed_identity(pubkey.id)
1126 .build()
1127 .expect("Unable to build RelayIds");
1128 self.channel.check_match(&target)?;
1129
1130 let client_extensions = circ_extensions_from_settings(&settings)?;
1132 let wrap = Create2Wrap {
1133 handshake_type: HandshakeType::NTOR_V3,
1134 };
1135
1136 self.create_impl::<NtorV3Client, _, _>(
1137 recvcreated,
1138 &wrap,
1139 &pubkey,
1140 settings,
1141 &client_extensions,
1142 )
1143 .await
1144 }
1145
1146 pub(super) fn add_hop(
1150 &mut self,
1151 peer_id: path::HopDetail,
1152 fwd: Box<dyn OutboundClientLayer + 'static + Send>,
1153 rev: Box<dyn InboundClientLayer + 'static + Send>,
1154 binding: Option<CircuitBinding>,
1155 settings: &HopSettings,
1156 ) -> StdResult<(), Bug> {
1157 let hop_num = self.hops.len();
1158 debug_assert_eq!(hop_num, usize::from(self.num_hops()));
1159
1160 if hop_num == usize::from(u8::MAX) {
1164 return Err(internal!(
1165 "cannot add more hops to a circuit with `u8::MAX` hops"
1166 ));
1167 }
1168
1169 let hop_num = (hop_num as u8).into();
1170
1171 let hop = CircHop::new(self.unique_id, hop_num, settings);
1172 self.hops.push(hop);
1173 self.crypto_in.add_layer(rev);
1174 self.crypto_out.add_layer(fwd);
1175 self.mutable.add_hop(peer_id, binding);
1176
1177 Ok(())
1178 }
1179
1180 #[allow(clippy::cognitive_complexity)]
1182 fn handle_meta_cell(
1183 &mut self,
1184 handlers: &mut CellHandlers,
1185 hopnum: HopNum,
1186 msg: UnparsedRelayMsg,
1187 ) -> Result<Option<CircuitCmd>> {
1188 if msg.cmd() == RelayCmd::SENDME {
1198 let sendme = msg
1199 .decode::<Sendme>()
1200 .map_err(|e| Error::from_bytes_err(e, "sendme message"))?
1201 .into_msg();
1202
1203 return Ok(Some(CircuitCmd::HandleSendMe {
1204 hop: hopnum,
1205 sendme,
1206 }));
1207 }
1208 if msg.cmd() == RelayCmd::TRUNCATED {
1209 let truncated = msg
1210 .decode::<Truncated>()
1211 .map_err(|e| Error::from_bytes_err(e, "truncated message"))?
1212 .into_msg();
1213 let reason = truncated.reason();
1214 debug!(
1215 circ_id = %self.unique_id,
1216 "Truncated from hop {}. Reason: {} [{}]",
1217 hopnum.display(),
1218 reason.human_str(),
1219 reason
1220 );
1221
1222 return Ok(Some(CircuitCmd::CleanShutdown));
1223 }
1224
1225 trace!(circ_id = %self.unique_id, cell = ?msg, "Received meta-cell");
1226
1227 #[cfg(feature = "conflux")]
1228 if matches!(
1229 msg.cmd(),
1230 RelayCmd::CONFLUX_LINK
1231 | RelayCmd::CONFLUX_LINKED
1232 | RelayCmd::CONFLUX_LINKED_ACK
1233 | RelayCmd::CONFLUX_SWITCH
1234 ) {
1235 return self.handle_conflux_msg(hopnum, msg);
1236 }
1237
1238 if self.is_conflux_pending() {
1239 warn!(
1240 circ_id = %self.unique_id,
1241 "received unexpected cell {msg:?} on unlinked conflux circuit",
1242 );
1243 return Err(Error::CircProto(
1244 "Received unexpected cell on unlinked circuit".into(),
1245 ));
1246 }
1247
1248 if let Some(mut handler) = handlers.meta_handler.take() {
1256 if handler.expected_hop() == (self.unique_id(), hopnum).into() {
1258 let ret = handler.handle_msg(msg, self);
1260 trace!(
1261 circ_id = %self.unique_id,
1262 result = ?ret,
1263 "meta handler completed",
1264 );
1265 match ret {
1266 #[cfg(feature = "send-control-msg")]
1267 Ok(MetaCellDisposition::Consumed) => {
1268 handlers.meta_handler = Some(handler);
1269 Ok(None)
1270 }
1271 Ok(MetaCellDisposition::ConversationFinished) => Ok(None),
1272 #[cfg(feature = "send-control-msg")]
1273 Ok(MetaCellDisposition::CloseCirc) => Ok(Some(CircuitCmd::CleanShutdown)),
1274 Err(e) => Err(e),
1275 }
1276 } else {
1277 handlers.meta_handler = Some(handler);
1280
1281 unsupported_client_cell!(msg, hopnum)
1282 }
1283 } else {
1284 unsupported_client_cell!(msg)
1287 }
1288 }
1289
1290 pub(super) fn handle_sendme(
1292 &mut self,
1293 hopnum: HopNum,
1294 msg: Sendme,
1295 signals: CongestionSignals,
1296 ) -> Result<Option<CircuitCmd>> {
1297 let runtime = self.runtime.clone();
1299
1300 let hop = self
1303 .hop_mut(hopnum)
1304 .ok_or_else(|| Error::CircProto(format!("Couldn't find hop {}", hopnum.display())))?;
1305
1306 let tag = msg.into_sendme_tag().ok_or_else(||
1307 Error::CircProto("missing tag on circuit sendme".into()))?;
1310 hop.ccontrol_mut()
1312 .note_sendme_received(&runtime, tag, signals)?;
1313 Ok(None)
1314 }
1315
1316 async fn send_msg(&mut self, msg: AnyChanMsg) -> Result<()> {
1328 let cell = AnyChanCell::new(Some(self.channel_id), msg);
1329 Pin::new(&mut self.chan_sender).send_unbounded(cell).await?;
1331 Ok(())
1332 }
1333
1334 pub(super) fn ready_streams_iterator(
1347 &self,
1348 exclude: Option<HopNum>,
1349 ) -> impl Stream<Item = Result<CircuitCmd>> {
1350 self.hops.ready_streams_iterator(exclude)
1351 }
1352
1353 pub(super) async fn congestion_signals(&mut self) -> CongestionSignals {
1357 futures::future::poll_fn(|cx| -> Poll<CongestionSignals> {
1358 Poll::Ready(CongestionSignals::new(
1359 self.chan_sender.poll_ready_unpin_bool(cx).unwrap_or(false),
1360 self.chan_sender.n_queued(),
1361 ))
1362 })
1363 .await
1364 }
1365
1366 pub(super) fn hop(&self, hopnum: HopNum) -> Option<&CircHop> {
1368 self.hops.hop(hopnum)
1369 }
1370
1371 pub(super) fn hop_mut(&mut self, hopnum: HopNum) -> Option<&mut CircHop> {
1373 self.hops.get_mut(hopnum)
1374 }
1375
1376 #[allow(clippy::too_many_arguments)]
1379 pub(super) fn begin_stream(
1380 &mut self,
1381 hop_num: HopNum,
1382 message: AnyRelayMsg,
1383 sender: StreamQueueSender,
1384 rx: StreamMpscReceiver<AnyRelayMsg>,
1385 rate_limit_notifier: watch::Sender<StreamRateLimit>,
1386 drain_rate_requester: NotifySender<DrainRateRequest>,
1387 cmd_checker: AnyCmdChecker,
1388 ) -> StdResult<Result<(SendRelayCell, StreamId)>, Bug> {
1389 let Some(hop) = self.hop_mut(hop_num) else {
1390 return Err(internal!(
1391 "{}: Attempting to send a BEGIN cell to an unknown hop {hop_num:?}",
1392 self.unique_id,
1393 ));
1394 };
1395
1396 Ok(hop.begin_stream(
1397 message,
1398 sender,
1399 rx,
1400 rate_limit_notifier,
1401 drain_rate_requester,
1402 cmd_checker,
1403 ))
1404 }
1405
1406 pub(super) async fn close_stream(
1408 &mut self,
1409 hop_num: HopNum,
1410 sid: StreamId,
1411 behav: CloseStreamBehavior,
1412 reason: streammap::TerminateReason,
1413 ) -> Result<()> {
1414 if let Some(hop) = self.hop_mut(hop_num) {
1415 let res = hop.close_stream(sid, behav, reason)?;
1416 if let Some(cell) = res {
1417 self.send_relay_cell(cell).await?;
1418 }
1419 }
1420 Ok(())
1421 }
1422
1423 pub(super) fn has_streams(&self) -> bool {
1429 self.hops.has_streams()
1430 }
1431
1432 pub(super) fn num_hops(&self) -> u8 {
1434 self.hops
1439 .len()
1440 .try_into()
1441 .expect("`hops.len()` has more than `u8::MAX` hops")
1442 }
1443
1444 pub(super) fn has_hops(&self) -> bool {
1446 !self.hops.is_empty()
1447 }
1448
1449 pub(super) fn last_hop_num(&self) -> Option<HopNum> {
1453 let num_hops = self.num_hops();
1454 if num_hops == 0 {
1455 return None;
1457 }
1458 Some(HopNum::from(num_hops - 1))
1459 }
1460
1461 pub(super) fn path(&self) -> Arc<path::Path> {
1465 self.mutable.path()
1466 }
1467
1468 pub(super) fn clock_skew(&self) -> ClockSkew {
1471 self.channel.clock_skew()
1472 }
1473
1474 pub(super) fn uses_stream_sendme(&self, hop: HopNum) -> Option<bool> {
1478 let hop = self.hop(hop)?;
1479 Some(hop.ccontrol().uses_stream_sendme())
1480 }
1481
1482 pub(super) fn is_conflux_pending(&self) -> bool {
1484 let Some(status) = self.conflux_status() else {
1485 return false;
1486 };
1487
1488 status != ConfluxStatus::Linked
1489 }
1490
1491 pub(super) fn conflux_status(&self) -> Option<ConfluxStatus> {
1495 cfg_if::cfg_if! {
1496 if #[cfg(feature = "conflux")] {
1497 self.conflux_handler
1498 .as_ref()
1499 .map(|handler| handler.status())
1500 } else {
1501 None
1502 }
1503 }
1504 }
1505
1506 #[cfg(feature = "conflux")]
1508 pub(super) fn init_rtt(&self) -> Option<Duration> {
1509 self.conflux_handler
1510 .as_ref()
1511 .map(|handler| handler.init_rtt())?
1512 }
1513}
1514
1515#[derive(Copy, Clone, Debug, PartialEq, Eq)]
1517pub(super) enum ConfluxStatus {
1518 Unlinked,
1520 Pending,
1522 Linked,
1524}
1525
1526fn msg_streamid(msg: &UnparsedRelayMsg) -> Result<Option<StreamId>> {
1530 let cmd = msg.cmd();
1531 let streamid = msg.stream_id();
1532 if !cmd.accepts_streamid_val(streamid) {
1533 return Err(Error::CircProto(format!(
1534 "Invalid stream ID {} for relay command {}",
1535 sv(StreamId::get_or_zero(streamid)),
1536 msg.cmd()
1537 )));
1538 }
1539
1540 Ok(streamid)
1541}
1542
1543impl Drop for Circuit {
1544 fn drop(&mut self) {
1545 let _ = self.channel.close_circuit(self.channel_id);
1546 }
1547}
1548
1549#[allow(clippy::unnecessary_wraps)]
1552pub(super) fn circ_extensions_from_settings(params: &HopSettings) -> Result<Vec<CircRequestExt>> {
1553 #[allow(unused_mut)]
1555 let mut client_extensions = Vec::new();
1556
1557 #[allow(unused, unused_mut)]
1558 let mut cc_extension_set = false;
1559
1560 if params.ccontrol.is_enabled() {
1561 cfg_if::cfg_if! {
1562 if #[cfg(feature = "flowctl-cc")] {
1563 #[cfg(not(test))]
1571 panic!("Congestion control is enabled on this circuit, but we don't yet support congestion control");
1572
1573 #[allow(unreachable_code)]
1574 client_extensions.push(CircRequestExt::CcRequest(CcRequest::default()));
1575 cc_extension_set = true;
1576 } else {
1577 return Err(
1578 tor_error::internal!(
1579 "Congestion control is enabled on this circuit, but 'flowctl-cc' feature is not enabled"
1580 )
1581 .into()
1582 );
1583 }
1584 }
1585 }
1586
1587 #[allow(unused_mut)]
1598 let mut required_protocol_capabilities: Vec<tor_protover::NamedSubver> = Vec::new();
1599
1600 #[cfg(feature = "counter-galois-onion")]
1601 if matches!(params.relay_crypt_protocol(), RelayCryptLayerProtocol::Cgo) {
1602 if !cc_extension_set {
1603 return Err(tor_error::internal!("Tried to negotiate CGO without CC.").into());
1604 }
1605 required_protocol_capabilities.push(tor_protover::named::RELAY_CRYPT_CGO);
1606 }
1607
1608 if !required_protocol_capabilities.is_empty() {
1609 client_extensions.push(CircRequestExt::SubprotocolRequest(
1610 required_protocol_capabilities.into_iter().collect(),
1611 ));
1612 }
1613
1614 Ok(client_extensions)
1615}