1pub(super) mod create;
4pub(super) mod extender;
5
6use crate::channel::{Channel, ChannelSender};
7use crate::congestion::sendme;
8use crate::congestion::{CongestionControl, CongestionSignals};
9use crate::crypto::binding::CircuitBinding;
10use crate::crypto::cell::{
11 HopNum, InboundClientCrypt, InboundClientLayer, OutboundClientCrypt, OutboundClientLayer,
12 RelayCellBody,
13};
14use crate::crypto::handshake::fast::CreateFastClient;
15use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey};
16use crate::crypto::handshake::ntor_v3::{NtorV3Client, NtorV3PublicKey};
17use crate::crypto::handshake::{ClientHandshake, KeyGenerator};
18use crate::memquota::{CircuitAccount, SpecificAccount as _, StreamAccount};
19use crate::stream::{AnyCmdChecker, StreamSendFlowControl, StreamStatus};
20use crate::tunnel::circuit::celltypes::{ClientCircChanMsg, CreateResponse};
21use crate::tunnel::circuit::handshake::{BoxedClientLayer, HandshakeRole};
22use crate::tunnel::circuit::path;
23use crate::tunnel::circuit::unique_id::UniqId;
24use crate::tunnel::circuit::{
25 CircParameters, CircuitRxReceiver, MutableState, StreamMpscReceiver, StreamMpscSender,
26};
27use crate::tunnel::handshake::RelayCryptLayerProtocol;
28use crate::tunnel::reactor::MetaCellDisposition;
29use crate::tunnel::streammap::{
30 self, EndSentStreamEnt, OpenStreamEnt, ShouldSendEnd, StreamEntMut,
31};
32use crate::util::err::ReactorError;
33use crate::util::sometimes_unbounded_sink::SometimesUnboundedSink;
34use crate::util::SinkExt as _;
35use crate::{ClockSkew, Error, Result};
36
37use tor_async_utils::{SinkTrySend as _, SinkTrySendError as _};
38use tor_cell::chancell::msg::{AnyChanMsg, HandshakeType, Relay};
39use tor_cell::chancell::{AnyChanCell, ChanCmd, CircId};
40use tor_cell::chancell::{BoxedCellBody, ChanMsg};
41use tor_cell::relaycell::extend::{CcRequest, CircRequestExt};
42use tor_cell::relaycell::msg::{AnyRelayMsg, End, Sendme, SendmeTag, Truncated};
43use tor_cell::relaycell::{
44 AnyRelayMsgOuter, RelayCellDecoder, RelayCellDecoderResult, RelayCellFormat, RelayCmd,
45 StreamId, UnparsedRelayMsg,
46};
47use tor_error::{internal, Bug};
48use tor_linkspec::RelayIds;
49use tor_llcrypto::pk;
50use tor_memquota::mq_queue::{ChannelSpec as _, MpscSpec};
51
52use futures::stream::FuturesUnordered;
53use futures::{SinkExt as _, Stream};
54use oneshot_fused_workaround as oneshot;
55use safelog::sensitive as sv;
56use tracing::{debug, trace, warn};
57
58#[cfg(feature = "conflux")]
59use super::conflux::ConfluxMsgHandler;
60use super::{
61 CellHandlers, CircuitHandshake, CloseStreamBehavior, LegId, ReactorResultChannel, SendRelayCell,
62};
63
64use std::borrow::Borrow;
65use std::pin::Pin;
66use std::result::Result as StdResult;
67use std::sync::{Arc, Mutex};
68use std::task::Poll;
69use std::time::{Duration, SystemTime};
70
71use create::{Create2Wrap, CreateFastWrap, CreateHandshakeWrap};
72use extender::HandshakeAuxDataHandler;
73
74#[cfg(feature = "hs-service")]
75use {
76 crate::stream::{DataCmdChecker, IncomingStreamRequest},
77 tor_cell::relaycell::msg::Begin,
78};
79
80#[cfg(feature = "conflux")]
81use {
82 super::conflux::{ConfluxAction, OooRelayMsg},
83 crate::tunnel::reactor::RemoveLegReason,
84};
85
86pub(super) const SEND_WINDOW_INIT: u16 = 500;
88pub(crate) const RECV_WINDOW_INIT: u16 = 500;
90pub(crate) const STREAM_READER_BUFFER: usize = (2 * RECV_WINDOW_INIT) as usize;
96
97pub(super) struct CircHop {
99 unique_id: UniqId,
101 hop_num: HopNum,
103 map: Arc<Mutex<streammap::StreamMap>>,
122 ccontrol: CongestionControl,
126 inbound: RelayCellDecoder,
128 relay_format: RelayCellFormat,
132}
133
134pub(crate) struct Circuit {
139 channel: Arc<Channel>,
141 pub(super) chan_sender: SometimesUnboundedSink<AnyChanCell, ChannelSender>,
146 pub(super) input: CircuitRxReceiver,
151 crypto_in: InboundClientCrypt,
155 crypto_out: OutboundClientCrypt,
157 hops: Vec<CircHop>,
159 mutable: Arc<MutableState>,
162 channel_id: CircId,
164 unique_id: UniqId,
166 #[cfg(feature = "conflux")]
171 conflux_handler: Option<ConfluxMsgHandler>,
172 #[allow(dead_code)] memquota: CircuitAccount,
175}
176
177#[derive(Debug)]
185pub(super) enum CircuitCmd {
186 Send(SendRelayCell),
188 HandleSendMe {
190 hop: HopNum,
192 sendme: Sendme,
194 },
195 CloseStream {
197 hop: HopNum,
199 sid: StreamId,
201 behav: CloseStreamBehavior,
203 reason: streammap::TerminateReason,
205 },
206 #[cfg(feature = "conflux")]
212 ConfluxRemove(RemoveLegReason),
213 #[cfg(feature = "conflux")]
219 ConfluxHandshakeComplete(SendRelayCell),
220 CleanShutdown,
222 #[cfg(feature = "conflux")]
224 Enqueue(OooRelayMsg),
225}
226
227macro_rules! unsupported_client_cell {
234 ($msg:expr) => {{
235 unsupported_client_cell!(@ $msg, "")
236 }};
237
238 ($msg:expr, $hopnum:expr) => {{
239 let hop: HopNum = $hopnum;
240 let hop_display = format!(" from hop {}", hop.display());
241 unsupported_client_cell!(@ $msg, hop_display)
242 }};
243
244 (@ $msg:expr, $hopnum_display:expr) => {
245 Err(crate::Error::CircProto(format!(
246 "Unexpected {} cell{} on client circuit",
247 $msg.cmd(),
248 $hopnum_display,
249 )))
250 };
251}
252
253pub(super) use unsupported_client_cell;
254
255impl Circuit {
256 pub(super) fn new(
258 channel: Arc<Channel>,
259 channel_id: CircId,
260 unique_id: UniqId,
261 input: CircuitRxReceiver,
262 memquota: CircuitAccount,
263 mutable: Arc<MutableState>,
264 ) -> Self {
265 let chan_sender = SometimesUnboundedSink::new(channel.sender());
266
267 let crypto_out = OutboundClientCrypt::new();
268 Circuit {
269 channel,
270 chan_sender,
271 input,
272 crypto_in: InboundClientCrypt::new(),
273 hops: vec![],
274 unique_id,
275 channel_id,
276 crypto_out,
277 mutable,
278 #[cfg(feature = "conflux")]
279 conflux_handler: None,
280 memquota,
281 }
282 }
283
284 pub(super) fn unique_id(&self) -> UniqId {
286 self.unique_id
287 }
288
289 pub(super) fn mutable(&self) -> &Arc<MutableState> {
291 &self.mutable
292 }
293
294 #[cfg(feature = "conflux")]
298 pub(super) fn install_conflux_handler(&mut self, conflux_handler: ConfluxMsgHandler) {
299 self.conflux_handler = Some(conflux_handler);
300 }
301
302 #[cfg(feature = "conflux")]
307 pub(super) async fn begin_conflux_link(
308 &mut self,
309 hop: HopNum,
310 cell: AnyRelayMsgOuter,
311 runtime: &tor_rtcompat::DynTimeProvider,
312 ) -> Result<()> {
313 use tor_rtcompat::SleepProvider as _;
314
315 if self.conflux_handler.is_none() {
316 return Err(internal!(
317 "tried to send LINK cell before installing a ConfluxMsgHandler?!"
318 )
319 .into());
320 }
321
322 let cell = SendRelayCell {
323 hop,
324 early: false,
325 cell,
326 };
327 self.send_relay_cell(cell).await?;
328
329 let Some(conflux_handler) = self.conflux_handler.as_mut() else {
330 return Err(internal!("ConfluxMsgHandler disappeared?!").into());
331 };
332
333 Ok(conflux_handler.note_link_sent(runtime.wallclock())?)
334 }
335
336 pub(super) fn conflux_hs_timeout(&self) -> Option<SystemTime> {
340 cfg_if::cfg_if! {
341 if #[cfg(feature = "conflux")] {
342 self.conflux_handler.as_ref().map(|handler| handler.handshake_timeout())?
343 } else {
344 None
345 }
346 }
347 }
348
349 #[cfg(test)]
351 pub(super) fn handle_add_fake_hop(
352 &mut self,
353 format: RelayCellFormat,
354 fwd_lasthop: bool,
355 rev_lasthop: bool,
356 params: &CircParameters,
357 done: ReactorResultChannel<()>,
358 ) {
359 use crate::tunnel::circuit::test::DummyCrypto;
360
361 let dummy_peer_id = tor_linkspec::OwnedChanTarget::builder()
362 .ed_identity([4; 32].into())
363 .rsa_identity([5; 20].into())
364 .build()
365 .expect("Could not construct fake hop");
366
367 let fwd = Box::new(DummyCrypto::new(fwd_lasthop));
368 let rev = Box::new(DummyCrypto::new(rev_lasthop));
369 let binding = None;
370 self.add_hop(
371 format,
372 path::HopDetail::Relay(dummy_peer_id),
373 fwd,
374 rev,
375 binding,
376 params,
377 )
378 .expect("could not add hop to circuit");
379 let _ = done.send(Ok(()));
380 }
381
382 fn encode_relay_cell(
386 crypto_out: &mut OutboundClientCrypt,
387 relay_format: RelayCellFormat,
388 hop: HopNum,
389 early: bool,
390 msg: AnyRelayMsgOuter,
391 ) -> Result<(AnyChanMsg, SendmeTag)> {
392 let mut body: RelayCellBody = msg
393 .encode(relay_format, &mut rand::rng())
394 .map_err(|e| Error::from_cell_enc(e, "relay cell body"))?
395 .into();
396 let cmd = if early {
397 ChanCmd::RELAY_EARLY
398 } else {
399 ChanCmd::RELAY
400 };
401 let tag = crypto_out.encrypt(cmd, &mut body, hop)?;
402 let msg = Relay::from(BoxedCellBody::from(body));
403 let msg = if early {
404 AnyChanMsg::RelayEarly(msg.into())
405 } else {
406 AnyChanMsg::Relay(msg)
407 };
408
409 Ok((msg, tag))
410 }
411
412 pub(super) async fn send_relay_cell(&mut self, msg: SendRelayCell) -> Result<()> {
419 if self.is_conflux_pending() {
420 return Err(internal!("tried to send cell on unlinked circuit").into());
422 }
423
424 let SendRelayCell {
425 hop,
426 early,
427 cell: msg,
428 } = msg;
429
430 trace!("{}: sending relay cell: {:?}", self.unique_id, msg);
431
432 let c_t_w = sendme::cmd_counts_towards_windows(msg.cmd());
433 let stream_id = msg.stream_id();
434 let hop_num = Into::<usize>::into(hop);
435 let circhop = &mut self.hops.get_mut(hop_num).ok_or(Error::NoSuchHop)?;
436
437 if c_t_w {
439 if let Some(stream_id) = stream_id {
440 let mut hop_map = circhop.map.lock().expect("lock poisoned");
441 let Some(StreamEntMut::Open(ent)) = hop_map.get_mut(stream_id) else {
442 warn!(
443 "{}: sending a relay cell for non-existent or non-open stream with ID {}!",
444 self.unique_id, stream_id
445 );
446 return Err(Error::CircProto(format!(
447 "tried to send a relay cell on non-open stream {}",
448 sv(stream_id),
449 )));
450 };
451 ent.take_capacity_to_send(msg.msg())?;
452 }
453 }
454
455 let relay_cmd = msg.cmd();
459
460 let (msg, tag) =
463 Self::encode_relay_cell(&mut self.crypto_out, circhop.relay_format, hop, early, msg)?;
464 if c_t_w {
467 circhop.ccontrol.note_data_sent(&tag)?;
468 }
469
470 let cell = AnyChanCell::new(Some(self.channel_id), msg);
471 Pin::new(&mut self.chan_sender).send_unbounded(cell).await?;
472
473 #[cfg(feature = "conflux")]
474 if let Some(conflux) = self.conflux_handler.as_mut() {
475 conflux.note_cell_sent(relay_cmd);
476 }
477
478 Ok(())
479 }
480
481 pub(super) fn handle_cell(
494 &mut self,
495 handlers: &mut CellHandlers,
496 leg: LegId,
497 cell: ClientCircChanMsg,
498 ) -> Result<Vec<CircuitCmd>> {
499 trace!("{}: handling cell: {:?}", self.unique_id, cell);
500 use ClientCircChanMsg::*;
501 match cell {
502 Relay(r) => self.handle_relay_cell(handlers, leg, r),
503 Destroy(d) => {
504 let reason = d.reason();
505 debug!(
506 "{}: Received DESTROY cell. Reason: {} [{}]",
507 self.unique_id,
508 reason.human_str(),
509 reason
510 );
511
512 self.handle_destroy_cell().map(|c| vec![c])
513 }
514 }
515 }
516
517 fn decode_relay_cell(
520 &mut self,
521 cell: Relay,
522 ) -> Result<(HopNum, SendmeTag, RelayCellDecoderResult)> {
523 let cmd = cell.cmd();
525 let mut body = cell.into_relay_body().into();
526
527 let (hopnum, tag) = self.crypto_in.decrypt(cmd, &mut body)?;
530
531 let decode_res = self
533 .hop_mut(hopnum)
534 .ok_or_else(|| {
535 Error::from(internal!(
536 "Trying to decode cell from nonexistent hop {:?}",
537 hopnum
538 ))
539 })?
540 .inbound
541 .decode(body.into())
542 .map_err(|e| Error::from_bytes_err(e, "relay cell"))?;
543
544 Ok((hopnum, tag, decode_res))
545 }
546
547 fn handle_relay_cell(
549 &mut self,
550 handlers: &mut CellHandlers,
551 leg: LegId,
552 cell: Relay,
553 ) -> Result<Vec<CircuitCmd>> {
554 let (hopnum, tag, decode_res) = self.decode_relay_cell(cell)?;
555
556 let c_t_w = decode_res.cmds().any(sendme::cmd_counts_towards_windows);
557
558 let send_circ_sendme = if c_t_w {
561 self.hop_mut(hopnum)
562 .ok_or_else(|| Error::CircProto("Sendme from nonexistent hop".into()))?
563 .ccontrol
564 .note_data_received()?
565 } else {
566 false
567 };
568
569 let mut circ_cmds = vec![];
570 if send_circ_sendme {
572 let sendme = Sendme::from(tag);
577 let cell = AnyRelayMsgOuter::new(None, sendme.into());
578 circ_cmds.push(CircuitCmd::Send(SendRelayCell {
579 hop: hopnum,
580 early: false,
581 cell,
582 }));
583
584 self.hop_mut(hopnum)
586 .ok_or_else(|| {
587 Error::from(internal!(
588 "Trying to send SENDME to nonexistent hop {:?}",
589 hopnum
590 ))
591 })?
592 .ccontrol
593 .note_sendme_sent()?;
594 }
595
596 let (mut msgs, incomplete) = decode_res.into_parts();
597 while let Some(msg) = msgs.next() {
598 let msg_status = self.handle_relay_msg(handlers, hopnum, leg, c_t_w, msg)?;
599
600 match msg_status {
601 None => continue,
602 Some(msg @ CircuitCmd::CleanShutdown) => {
603 for m in msgs {
604 debug!(
605 "{id}: Ignoring relay msg received after triggering shutdown: {m:?}",
606 id = self.unique_id
607 );
608 }
609 if let Some(incomplete) = incomplete {
610 debug!(
611 "{id}: Ignoring partial relay msg received after triggering shutdown: {:?}",
612 incomplete,
613 id=self.unique_id,
614 );
615 }
616 circ_cmds.push(msg);
617 return Ok(circ_cmds);
618 }
619 Some(msg) => {
620 circ_cmds.push(msg);
621 }
622 }
623 }
624
625 Ok(circ_cmds)
626 }
627
628 fn handle_relay_msg(
630 &mut self,
631 handlers: &mut CellHandlers,
632 hopnum: HopNum,
633 leg: LegId,
634 cell_counts_toward_windows: bool,
635 msg: UnparsedRelayMsg,
636 ) -> Result<Option<CircuitCmd>> {
637 let streamid = msg_streamid(&msg)?;
640
641 let Some(streamid) = streamid else {
644 return self.handle_meta_cell(handlers, hopnum, msg);
645 };
646
647 #[cfg(feature = "conflux")]
648 let msg = if let Some(conflux) = self.conflux_handler.as_mut() {
649 match conflux.action_for_msg(hopnum, cell_counts_toward_windows, streamid, msg)? {
650 ConfluxAction::Deliver(msg) => {
651 msg
658 }
659 ConfluxAction::Enqueue(msg) => {
660 return Ok(Some(CircuitCmd::Enqueue(msg)));
662 }
663 }
664 } else {
665 msg
668 };
669
670 self.handle_in_order_relay_msg(
671 handlers,
672 hopnum,
673 leg,
674 cell_counts_toward_windows,
675 streamid,
676 msg,
677 )
678 }
679
680 pub(super) fn handle_in_order_relay_msg(
682 &mut self,
683 handlers: &mut CellHandlers,
684 hopnum: HopNum,
685 leg: LegId,
686 cell_counts_toward_windows: bool,
687 streamid: StreamId,
688 msg: UnparsedRelayMsg,
689 ) -> Result<Option<CircuitCmd>> {
690 #[cfg(feature = "conflux")]
691 if let Some(conflux) = self.conflux_handler.as_mut() {
692 conflux.inc_last_seq_delivered(&msg);
693 }
694
695 let hop = self
696 .hop_mut(hopnum)
697 .ok_or_else(|| Error::CircProto("Cell from nonexistent hop!".into()))?;
698 let mut hop_map = hop.map.lock().expect("lock poisoned");
699 match hop_map.get_mut(streamid) {
700 Some(StreamEntMut::Open(ent)) => {
701 let message_closes_stream =
703 Self::deliver_msg_to_stream(streamid, ent, cell_counts_toward_windows, msg)?;
704
705 if message_closes_stream {
706 hop_map.ending_msg_received(streamid)?;
707 }
708 }
709 #[cfg(feature = "hs-service")]
710 Some(StreamEntMut::EndSent(_))
711 if matches!(
712 msg.cmd(),
713 RelayCmd::BEGIN | RelayCmd::BEGIN_DIR | RelayCmd::RESOLVE
714 ) =>
715 {
716 hop_map.ending_msg_received(streamid)?;
720 drop(hop_map);
721 return self.handle_incoming_stream_request(handlers, msg, streamid, hopnum, leg);
722 }
723 Some(StreamEntMut::EndSent(EndSentStreamEnt { half_stream, .. })) => {
724 match half_stream.handle_msg(msg)? {
727 StreamStatus::Open => {}
728 StreamStatus::Closed => {
729 hop_map.ending_msg_received(streamid)?;
730 }
731 }
732 }
733 #[cfg(feature = "hs-service")]
734 None if matches!(
735 msg.cmd(),
736 RelayCmd::BEGIN | RelayCmd::BEGIN_DIR | RelayCmd::RESOLVE
737 ) =>
738 {
739 drop(hop_map);
740 return self.handle_incoming_stream_request(handlers, msg, streamid, hopnum, leg);
741 }
742 _ => {
743 return Err(Error::CircProto(
745 "Cell received on nonexistent stream!?".into(),
746 ));
747 }
748 }
749 Ok(None)
750 }
751
752 #[cfg(feature = "conflux")]
760 fn handle_conflux_msg(
761 &mut self,
762 hop: HopNum,
763 msg: UnparsedRelayMsg,
764 ) -> Result<Option<CircuitCmd>> {
765 let Some(conflux_handler) = self.conflux_handler.as_mut() else {
766 return Err(Error::CircProto(format!(
771 "Received {} cell from hop {} on non-conflux client circuit?!",
772 msg.cmd(),
773 hop.display(),
774 )));
775 };
776
777 Ok(conflux_handler.handle_conflux_msg(msg, hop))
778 }
779
780 #[cfg(feature = "conflux")]
784 pub(super) fn last_seq_sent(&self) -> Result<u64> {
785 let handler = self
786 .conflux_handler
787 .as_ref()
788 .ok_or_else(|| internal!("tried to get last_seq_sent of non-conflux circ"))?;
789
790 Ok(handler.last_seq_sent())
791 }
792
793 #[cfg(feature = "conflux")]
797 pub(super) fn last_seq_recv(&self) -> Result<u64> {
798 let handler = self
799 .conflux_handler
800 .as_ref()
801 .ok_or_else(|| internal!("tried to get last_seq_recv of non-conflux circ"))?;
802
803 Ok(handler.last_seq_recv())
804 }
805
806 fn deliver_msg_to_stream(
808 streamid: StreamId,
809 ent: &mut OpenStreamEnt,
810 cell_counts_toward_windows: bool,
811 msg: UnparsedRelayMsg,
812 ) -> Result<bool> {
813 if msg.cmd() == RelayCmd::SENDME {
816 let _sendme = msg
817 .decode::<Sendme>()
818 .map_err(|e| Error::from_bytes_err(e, "Sendme message on stream"))?
819 .into_msg();
820
821 ent.put_for_incoming_sendme()?;
825 return Ok(false);
826 }
827
828 let message_closes_stream = ent.cmd_checker.check_msg(&msg)? == StreamStatus::Closed;
829
830 if let Err(e) = Pin::new(&mut ent.sink).try_send(msg) {
831 if e.is_full() {
832 return Err(Error::CircProto(format!(
835 "Stream sink would block; received too many cells on stream ID {}",
836 sv(streamid),
837 )));
838 }
839 if e.is_disconnected() && cell_counts_toward_windows {
840 ent.dropped += 1;
845 }
846 }
847
848 Ok(message_closes_stream)
849 }
850
851 #[cfg(feature = "hs-service")]
853 fn handle_incoming_stream_request(
854 &mut self,
855 handlers: &mut CellHandlers,
856 msg: UnparsedRelayMsg,
857 stream_id: StreamId,
858 hop_num: HopNum,
859 leg: LegId,
860 ) -> Result<Option<CircuitCmd>> {
861 use super::syncview::ClientCircSyncView;
862 use tor_cell::relaycell::msg::EndReason;
863 use tor_error::into_internal;
864 use tor_log_ratelim::log_ratelim;
865
866 use crate::{circuit::CIRCUIT_BUFFER_SIZE, tunnel::reactor::StreamReqInfo};
867
868 let Some(handler) = handlers.incoming_stream_req_handler.as_mut() else {
871 return Err(Error::CircProto(
872 "Cannot handle BEGIN cells on this circuit".into(),
873 ));
874 };
875
876 if hop_num != handler.hop_num {
877 return Err(Error::CircProto(format!(
878 "Expecting incoming streams from {}, but received {} cell from unexpected hop {}",
879 handler.hop_num.display(),
880 msg.cmd(),
881 hop_num.display()
882 )));
883 }
884
885 let message_closes_stream = handler.cmd_checker.check_msg(&msg)? == StreamStatus::Closed;
886
887 let hop = self
895 .hops
896 .get_mut(Into::<usize>::into(hop_num))
897 .ok_or(Error::CircuitClosed)?;
898
899 if message_closes_stream {
900 hop.map
901 .lock()
902 .expect("lock poisoned")
903 .ending_msg_received(stream_id)?;
904
905 return Ok(None);
906 }
907
908 let begin = msg
909 .decode::<Begin>()
910 .map_err(|e| Error::from_bytes_err(e, "Invalid Begin message"))?
911 .into_msg();
912
913 let req = IncomingStreamRequest::Begin(begin);
914
915 {
916 use crate::stream::IncomingStreamRequestDisposition::*;
917
918 let ctx = crate::stream::IncomingStreamRequestContext { request: &req };
919 let view = ClientCircSyncView::new(&self.hops);
925
926 match handler.filter.as_mut().disposition(&ctx, &view)? {
927 Accept => {}
928 CloseCircuit => return Ok(Some(CircuitCmd::CleanShutdown)),
929 RejectRequest(end) => {
930 let end_msg = AnyRelayMsgOuter::new(Some(stream_id), end.into());
931 let cell = SendRelayCell {
932 hop: hop_num,
933 early: false,
934 cell: end_msg,
935 };
936 return Ok(Some(CircuitCmd::Send(cell)));
937 }
938 }
939 }
940
941 let hop = self
944 .hops
945 .get_mut(Into::<usize>::into(hop_num))
946 .ok_or(Error::CircuitClosed)?;
947 let relay_cell_format = hop.relay_format;
948
949 let memquota = StreamAccount::new(&self.memquota)?;
950
951 let (sender, receiver) = MpscSpec::new(STREAM_READER_BUFFER).new_mq(
952 self.chan_sender.as_inner().time_provider().clone(),
953 memquota.as_raw_account(),
954 )?;
955 let (msg_tx, msg_rx) = MpscSpec::new(CIRCUIT_BUFFER_SIZE).new_mq(
956 self.chan_sender.as_inner().time_provider().clone(),
957 memquota.as_raw_account(),
958 )?;
959
960 let cmd_checker = DataCmdChecker::new_connected();
961 hop.map.lock().expect("lock poisoned").add_ent_with_id(
962 sender,
963 msg_rx,
964 hop.build_send_flow_ctrl(),
965 stream_id,
966 cmd_checker,
967 )?;
968
969 let outcome = Pin::new(&mut handler.incoming_sender).try_send(StreamReqInfo {
970 req,
971 stream_id,
972 hop_num,
973 leg,
974 msg_tx,
975 receiver,
976 memquota,
977 relay_cell_format,
978 });
979
980 log_ratelim!("Delivering message to incoming stream handler"; outcome);
981
982 if let Err(e) = outcome {
983 if e.is_full() {
984 let end_msg = AnyRelayMsgOuter::new(
988 Some(stream_id),
989 End::new_with_reason(EndReason::RESOURCELIMIT).into(),
990 );
991
992 let cell = SendRelayCell {
993 hop: hop_num,
994 early: false,
995 cell: end_msg,
996 };
997 return Ok(Some(CircuitCmd::Send(cell)));
998 } else if e.is_disconnected() {
999 debug!(
1011 "{}: Incoming stream request receiver dropped",
1012 self.unique_id
1013 );
1014 return Err(Error::CircuitClosed);
1016 } else {
1017 return Err(Error::from((into_internal!(
1021 "try_send failed unexpectedly"
1022 ))(e)));
1023 }
1024 }
1025
1026 Ok(None)
1027 }
1028
1029 #[allow(clippy::unnecessary_wraps)]
1031 fn handle_destroy_cell(&mut self) -> Result<CircuitCmd> {
1032 Ok(CircuitCmd::CleanShutdown)
1034 }
1035
1036 pub(super) async fn handle_create(
1038 &mut self,
1039 recv_created: oneshot::Receiver<CreateResponse>,
1040 handshake: CircuitHandshake,
1041 params: &mut CircParameters,
1042 done: ReactorResultChannel<()>,
1043 ) -> StdResult<(), ReactorError> {
1044 let ret = match handshake {
1045 CircuitHandshake::CreateFast => self.create_firsthop_fast(recv_created, params).await,
1046 CircuitHandshake::Ntor {
1047 public_key,
1048 ed_identity,
1049 } => {
1050 self.create_firsthop_ntor(recv_created, ed_identity, public_key, params)
1051 .await
1052 }
1053 CircuitHandshake::NtorV3 { public_key } => {
1054 self.create_firsthop_ntor_v3(recv_created, public_key, params)
1055 .await
1056 }
1057 };
1058 let _ = done.send(ret); self.chan_sender.flush().await?;
1063
1064 Ok(())
1065 }
1066
1067 async fn create_impl<H, W, M>(
1073 &mut self,
1074 cell_protocol: RelayCryptLayerProtocol,
1075 recvcreated: oneshot::Receiver<CreateResponse>,
1076 wrap: &W,
1077 key: &H::KeyType,
1078 params: &mut CircParameters,
1079 msg: &M,
1080 ) -> Result<()>
1081 where
1082 H: ClientHandshake + HandshakeAuxDataHandler,
1083 W: CreateHandshakeWrap,
1084 H::KeyGen: KeyGenerator,
1085 M: Borrow<H::ClientAuxData>,
1086 {
1087 let (state, msg) = {
1092 let mut rng = rand::rng();
1095 H::client1(&mut rng, key, msg)?
1096 };
1097 let create_cell = wrap.to_chanmsg(msg);
1098 trace!(
1099 "{}: Extending to hop 1 with {}",
1100 self.unique_id,
1101 create_cell.cmd()
1102 );
1103 self.send_msg(create_cell).await?;
1104
1105 let reply = recvcreated
1106 .await
1107 .map_err(|_| Error::CircProto("Circuit closed while waiting".into()))?;
1108
1109 let relay_handshake = wrap.decode_chanmsg(reply)?;
1110 let (server_msg, keygen) = H::client2(state, relay_handshake)?;
1111
1112 H::handle_server_aux_data(params, &server_msg)?;
1113
1114 let relay_cell_format = cell_protocol.relay_cell_format();
1115 let BoxedClientLayer { fwd, back, binding } =
1116 cell_protocol.construct_client_layers(HandshakeRole::Initiator, keygen)?;
1117
1118 trace!("{}: Handshake complete; circuit created.", self.unique_id);
1119
1120 let peer_id = self.channel.target().clone();
1121
1122 self.add_hop(
1123 relay_cell_format,
1124 path::HopDetail::Relay(peer_id),
1125 fwd,
1126 back,
1127 binding,
1128 params,
1129 )?;
1130 Ok(())
1131 }
1132
1133 async fn create_firsthop_fast(
1140 &mut self,
1141 recvcreated: oneshot::Receiver<CreateResponse>,
1142 params: &mut CircParameters,
1143 ) -> Result<()> {
1144 let protocol = RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0);
1146 let wrap = CreateFastWrap;
1147 self.create_impl::<CreateFastClient, _, _>(protocol, recvcreated, &wrap, &(), params, &())
1148 .await
1149 }
1150
1151 async fn create_firsthop_ntor(
1156 &mut self,
1157 recvcreated: oneshot::Receiver<CreateResponse>,
1158 ed_identity: pk::ed25519::Ed25519Identity,
1159 pubkey: NtorPublicKey,
1160 params: &mut CircParameters,
1161 ) -> Result<()> {
1162 let relay_cell_protocol = RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0);
1164
1165 let target = RelayIds::builder()
1167 .ed_identity(ed_identity)
1168 .rsa_identity(pubkey.id)
1169 .build()
1170 .expect("Unable to build RelayIds");
1171 self.channel.check_match(&target)?;
1172
1173 let wrap = Create2Wrap {
1174 handshake_type: HandshakeType::NTOR,
1175 };
1176 self.create_impl::<NtorClient, _, _>(
1177 relay_cell_protocol,
1178 recvcreated,
1179 &wrap,
1180 &pubkey,
1181 params,
1182 &(),
1183 )
1184 .await
1185 }
1186
1187 async fn create_firsthop_ntor_v3(
1192 &mut self,
1193 recvcreated: oneshot::Receiver<CreateResponse>,
1194 pubkey: NtorV3PublicKey,
1195 params: &mut CircParameters,
1196 ) -> Result<()> {
1197 let target = RelayIds::builder()
1199 .ed_identity(pubkey.id)
1200 .build()
1201 .expect("Unable to build RelayIds");
1202 self.channel.check_match(&target)?;
1203
1204 let relay_cell_protocol = RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0);
1206
1207 #[allow(unused_mut)]
1210 let mut client_extensions = Vec::new();
1211
1212 if params.ccontrol.is_enabled() {
1213 cfg_if::cfg_if! {
1214 if #[cfg(feature = "flowctl-cc")] {
1215 #[cfg(not(test))]
1223 panic!("Congestion control is enabled on this circuit, but we don't yet support congestion control");
1224
1225 #[allow(unreachable_code)]
1226 client_extensions.push(CircRequestExt::CcRequest(CcRequest::default()));
1227 } else {
1228 return Err(
1229 internal!(
1230 "Congestion control is enabled on this circuit, but 'flowctl-cc' feature is not enabled"
1231 )
1232 .into()
1233 );
1234 }
1235 }
1236 }
1237
1238 let wrap = Create2Wrap {
1239 handshake_type: HandshakeType::NTOR_V3,
1240 };
1241
1242 self.create_impl::<NtorV3Client, _, _>(
1243 relay_cell_protocol,
1244 recvcreated,
1245 &wrap,
1246 &pubkey,
1247 params,
1248 &client_extensions,
1249 )
1250 .await
1251 }
1252
1253 pub(super) fn add_hop(
1257 &mut self,
1258 format: RelayCellFormat,
1259 peer_id: path::HopDetail,
1260 fwd: Box<dyn OutboundClientLayer + 'static + Send>,
1261 rev: Box<dyn InboundClientLayer + 'static + Send>,
1262 binding: Option<CircuitBinding>,
1263 params: &CircParameters,
1264 ) -> StdResult<(), Bug> {
1265 let hop_num = self.hops.len();
1266 debug_assert_eq!(hop_num, usize::from(self.num_hops()));
1267
1268 if hop_num == usize::from(u8::MAX) {
1272 return Err(internal!(
1273 "cannot add more hops to a circuit with `u8::MAX` hops"
1274 ));
1275 }
1276
1277 let hop_num = (hop_num as u8).into();
1278
1279 let hop = CircHop::new(self.unique_id, hop_num, format, params);
1280 self.hops.push(hop);
1281 self.crypto_in.add_layer(rev);
1282 self.crypto_out.add_layer(fwd);
1283 self.mutable.add_hop(peer_id, binding);
1284
1285 Ok(())
1286 }
1287
1288 #[allow(clippy::cognitive_complexity)]
1290 fn handle_meta_cell(
1291 &mut self,
1292 handlers: &mut CellHandlers,
1293 hopnum: HopNum,
1294 msg: UnparsedRelayMsg,
1295 ) -> Result<Option<CircuitCmd>> {
1296 if msg.cmd() == RelayCmd::SENDME {
1306 let sendme = msg
1307 .decode::<Sendme>()
1308 .map_err(|e| Error::from_bytes_err(e, "sendme message"))?
1309 .into_msg();
1310
1311 return Ok(Some(CircuitCmd::HandleSendMe {
1312 hop: hopnum,
1313 sendme,
1314 }));
1315 }
1316 if msg.cmd() == RelayCmd::TRUNCATED {
1317 let truncated = msg
1318 .decode::<Truncated>()
1319 .map_err(|e| Error::from_bytes_err(e, "truncated message"))?
1320 .into_msg();
1321 let reason = truncated.reason();
1322 debug!(
1323 "{}: Truncated from hop {}. Reason: {} [{}]",
1324 self.unique_id,
1325 hopnum.display(),
1326 reason.human_str(),
1327 reason
1328 );
1329
1330 return Ok(Some(CircuitCmd::CleanShutdown));
1331 }
1332
1333 trace!("{}: Received meta-cell {:?}", self.unique_id, msg);
1334
1335 #[cfg(feature = "conflux")]
1336 if matches!(
1337 msg.cmd(),
1338 RelayCmd::CONFLUX_LINK
1339 | RelayCmd::CONFLUX_LINKED
1340 | RelayCmd::CONFLUX_LINKED_ACK
1341 | RelayCmd::CONFLUX_SWITCH
1342 ) {
1343 return self.handle_conflux_msg(hopnum, msg);
1344 }
1345
1346 if self.is_conflux_pending() {
1347 warn!(
1348 "{}: received unexpected cell {msg:?} on unlinked conflux circuit",
1349 self.unique_id,
1350 );
1351 return Err(Error::CircProto(
1352 "Received unexpected cell on unlinked circuit".into(),
1353 ));
1354 }
1355
1356 if let Some(mut handler) = handlers.meta_handler.take() {
1364 if handler.expected_hop() == hopnum {
1365 let ret = handler.handle_msg(msg, self);
1367 trace!(
1368 "{}: meta handler completed with result: {:?}",
1369 self.unique_id,
1370 ret
1371 );
1372 match ret {
1373 #[cfg(feature = "send-control-msg")]
1374 Ok(MetaCellDisposition::Consumed) => {
1375 handlers.meta_handler = Some(handler);
1376 Ok(None)
1377 }
1378 Ok(MetaCellDisposition::ConversationFinished) => Ok(None),
1379 #[cfg(feature = "send-control-msg")]
1380 Ok(MetaCellDisposition::CloseCirc) => Ok(Some(CircuitCmd::CleanShutdown)),
1381 Err(e) => Err(e),
1382 }
1383 } else {
1384 handlers.meta_handler = Some(handler);
1387
1388 unsupported_client_cell!(msg, hopnum)
1389 }
1390 } else {
1391 unsupported_client_cell!(msg)
1394 }
1395 }
1396
1397 pub(super) fn handle_sendme(
1399 &mut self,
1400 hopnum: HopNum,
1401 msg: Sendme,
1402 signals: CongestionSignals,
1403 ) -> Result<Option<CircuitCmd>> {
1404 let hop = self
1407 .hop_mut(hopnum)
1408 .ok_or_else(|| Error::CircProto(format!("Couldn't find hop {}", hopnum.display())))?;
1409
1410 let tag = msg.into_sendme_tag().ok_or_else(||
1411 Error::CircProto("missing tag on circuit sendme".into()))?;
1414 hop.ccontrol.note_sendme_received(tag, signals)?;
1416 Ok(None)
1417 }
1418
1419 async fn send_msg(&mut self, msg: AnyChanMsg) -> Result<()> {
1431 let cell = AnyChanCell::new(Some(self.channel_id), msg);
1432 Pin::new(&mut self.chan_sender).send_unbounded(cell).await?;
1434 Ok(())
1435 }
1436
1437 pub(super) fn ready_streams_iterator(&self) -> impl Stream<Item = Result<CircuitCmd>> {
1449 self.hops
1450 .iter()
1451 .enumerate()
1452 .filter_map(|(i, hop)| {
1453 if !hop.ccontrol.can_send() {
1454 return None;
1470 }
1471
1472 let hop_num = HopNum::from(i as u8);
1473 let hop_map = Arc::clone(&self.hops[i].map);
1474 Some(futures::future::poll_fn(move |cx| {
1475 let mut hop_map = hop_map.lock().expect("lock poisoned");
1482 let Some((sid, msg)) = hop_map.poll_ready_streams_iter(cx).next() else {
1483 return Poll::Pending;
1485 };
1486
1487 if msg.is_none() {
1488 return Poll::Ready(Ok(CircuitCmd::CloseStream {
1489 hop: hop_num,
1490 sid,
1491 behav: CloseStreamBehavior::default(),
1492 reason: streammap::TerminateReason::StreamTargetClosed,
1493 }));
1494 };
1495 let msg = hop_map.take_ready_msg(sid).expect("msg disappeared");
1496
1497 #[allow(unused)] let Some(StreamEntMut::Open(s)) = hop_map.get_mut(sid) else {
1499 panic!("Stream {sid} disappeared");
1500 };
1501
1502 debug_assert!(
1503 s.can_send(&msg),
1504 "Stream {sid} produced a message it can't send: {msg:?}"
1505 );
1506
1507 let cell = SendRelayCell {
1508 hop: hop_num,
1509 early: false,
1510 cell: AnyRelayMsgOuter::new(Some(sid), msg),
1511 };
1512 Poll::Ready(Ok(CircuitCmd::Send(cell)))
1513 }))
1514 })
1515 .collect::<FuturesUnordered<_>>()
1516 }
1517
1518 pub(super) async fn congestion_signals(&mut self) -> CongestionSignals {
1522 futures::future::poll_fn(|cx| -> Poll<CongestionSignals> {
1523 Poll::Ready(CongestionSignals::new(
1524 self.chan_sender.poll_ready_unpin_bool(cx).unwrap_or(false),
1525 self.chan_sender.n_queued(),
1526 ))
1527 })
1528 .await
1529 }
1530
1531 pub(super) fn hop(&self, hopnum: HopNum) -> Option<&CircHop> {
1533 self.hops.get(Into::<usize>::into(hopnum))
1534 }
1535
1536 pub(super) fn hop_mut(&mut self, hopnum: HopNum) -> Option<&mut CircHop> {
1538 self.hops.get_mut(Into::<usize>::into(hopnum))
1539 }
1540
1541 pub(super) fn begin_stream(
1543 &mut self,
1544 hop_num: HopNum,
1545 message: AnyRelayMsg,
1546 sender: StreamMpscSender<UnparsedRelayMsg>,
1547 rx: StreamMpscReceiver<AnyRelayMsg>,
1548 cmd_checker: AnyCmdChecker,
1549 ) -> StdResult<Result<(SendRelayCell, StreamId)>, Bug> {
1550 let Some(hop) = self.hop_mut(hop_num) else {
1551 return Err(internal!(
1552 "{}: Attempting to send a BEGIN cell to an unknown hop {hop_num:?}",
1553 self.unique_id,
1554 ));
1555 };
1556
1557 Ok(hop.begin_stream(message, sender, rx, cmd_checker))
1558 }
1559
1560 pub(super) async fn close_stream(
1562 &mut self,
1563 hop_num: HopNum,
1564 sid: StreamId,
1565 behav: CloseStreamBehavior,
1566 reason: streammap::TerminateReason,
1567 ) -> Result<()> {
1568 if let Some(hop) = self.hop_mut(hop_num) {
1569 let res = hop.close_stream(sid, behav, reason)?;
1570 if let Some(cell) = res {
1571 self.send_relay_cell(cell).await?;
1572 }
1573 }
1574 Ok(())
1575 }
1576
1577 pub(super) fn has_streams(&self) -> bool {
1583 self.hops
1584 .iter()
1585 .any(|hop| hop.map.lock().expect("lock poisoned").n_open_streams() > 0)
1586 }
1587
1588 pub(super) fn num_hops(&self) -> u8 {
1590 self.hops
1595 .len()
1596 .try_into()
1597 .expect("`hops.len()` has more than `u8::MAX` hops")
1598 }
1599
1600 pub(super) fn has_hops(&self) -> bool {
1602 !self.hops.is_empty()
1603 }
1604
1605 pub(super) fn last_hop_num(&self) -> Option<HopNum> {
1609 let num_hops = self.num_hops();
1610 if num_hops == 0 {
1611 return None;
1613 }
1614 Some(HopNum::from(num_hops - 1))
1615 }
1616
1617 pub(super) fn path(&self) -> Arc<path::Path> {
1621 self.mutable.path()
1622 }
1623
1624 pub(super) fn clock_skew(&self) -> ClockSkew {
1627 self.channel.clock_skew()
1628 }
1629
1630 pub(super) fn uses_stream_sendme(&self, hop: HopNum) -> Option<bool> {
1634 let hop = self.hop(hop)?;
1635 Some(hop.ccontrol.uses_stream_sendme())
1636 }
1637
1638 pub(super) fn is_conflux_pending(&self) -> bool {
1640 let Some(status) = self.conflux_status() else {
1641 return false;
1642 };
1643
1644 status != ConfluxStatus::Linked
1645 }
1646
1647 pub(super) fn conflux_status(&self) -> Option<ConfluxStatus> {
1651 cfg_if::cfg_if! {
1652 if #[cfg(feature = "conflux")] {
1653 self.conflux_handler
1654 .as_ref()
1655 .map(|handler| handler.status())
1656 } else {
1657 None
1658 }
1659 }
1660 }
1661
1662 #[cfg(feature = "conflux")]
1664 pub(super) fn init_rtt(&self) -> Option<Duration> {
1665 self.conflux_handler
1666 .as_ref()
1667 .map(|handler| handler.init_rtt())?
1668 }
1669}
1670
1671#[derive(Copy, Clone, Debug, PartialEq, Eq)]
1673pub(super) enum ConfluxStatus {
1674 Unlinked,
1676 Pending,
1678 Linked,
1680}
1681
1682fn msg_streamid(msg: &UnparsedRelayMsg) -> Result<Option<StreamId>> {
1686 let cmd = msg.cmd();
1687 let streamid = msg.stream_id();
1688 if !cmd.accepts_streamid_val(streamid) {
1689 return Err(Error::CircProto(format!(
1690 "Invalid stream ID {} for relay command {}",
1691 sv(StreamId::get_or_zero(streamid)),
1692 msg.cmd()
1693 )));
1694 }
1695
1696 Ok(streamid)
1697}
1698
1699impl Drop for Circuit {
1700 fn drop(&mut self) {
1701 let _ = self.channel.close_circuit(self.channel_id);
1702 }
1703}
1704
1705impl CircHop {
1706 pub(super) fn new(
1708 unique_id: UniqId,
1709 hop_num: HopNum,
1710 relay_format: RelayCellFormat,
1711 params: &CircParameters,
1712 ) -> Self {
1713 CircHop {
1714 unique_id,
1715 hop_num,
1716 map: Arc::new(Mutex::new(streammap::StreamMap::new())),
1717 ccontrol: CongestionControl::new(¶ms.ccontrol),
1718 inbound: RelayCellDecoder::new(relay_format),
1719 relay_format,
1720 }
1721 }
1722
1723 pub(crate) fn begin_stream(
1726 &mut self,
1727 message: AnyRelayMsg,
1728 sender: StreamMpscSender<UnparsedRelayMsg>,
1729 rx: StreamMpscReceiver<AnyRelayMsg>,
1730 cmd_checker: AnyCmdChecker,
1731 ) -> Result<(SendRelayCell, StreamId)> {
1732 let flow_ctrl = self.build_send_flow_ctrl();
1733 let r =
1734 self.map
1735 .lock()
1736 .expect("lock poisoned")
1737 .add_ent(sender, rx, flow_ctrl, cmd_checker)?;
1738 let cell = AnyRelayMsgOuter::new(Some(r), message);
1739 Ok((
1740 SendRelayCell {
1741 hop: self.hop_num,
1742 early: false,
1743 cell,
1744 },
1745 r,
1746 ))
1747 }
1748
1749 fn close_stream(
1756 &mut self,
1757 id: StreamId,
1758 message: CloseStreamBehavior,
1759 why: streammap::TerminateReason,
1760 ) -> Result<Option<SendRelayCell>> {
1761 let should_send_end = self.map.lock().expect("lock poisoned").terminate(id, why)?;
1762 trace!(
1763 "{}: Ending stream {}; should_send_end={:?}",
1764 self.unique_id,
1765 id,
1766 should_send_end
1767 );
1768 if let (ShouldSendEnd::Send, CloseStreamBehavior::SendEnd(end_message)) =
1771 (should_send_end, message)
1772 {
1773 let end_cell = AnyRelayMsgOuter::new(Some(id), end_message.into());
1774 let cell = SendRelayCell {
1775 hop: self.hop_num,
1776 early: false,
1777 cell: end_cell,
1778 };
1779
1780 return Ok(Some(cell));
1781 }
1782 Ok(None)
1783 }
1784
1785 pub(crate) fn relay_cell_format(&self) -> RelayCellFormat {
1790 self.relay_format
1791 }
1792
1793 fn build_send_flow_ctrl(&self) -> StreamSendFlowControl {
1795 if self.ccontrol.uses_stream_sendme() {
1796 let window = sendme::StreamSendWindow::new(SEND_WINDOW_INIT);
1797 StreamSendFlowControl::new_window_based(window)
1798 } else {
1799 StreamSendFlowControl::new_xon_xoff_based()
1800 }
1801 }
1802
1803 #[cfg(test)]
1805 pub(crate) fn send_window_and_expected_tags(&self) -> (u32, Vec<SendmeTag>) {
1806 self.ccontrol.send_window_and_expected_tags()
1807 }
1808
1809 pub(super) fn n_open_streams(&self) -> usize {
1814 self.map.lock().expect("lock poisoned").n_open_streams()
1815 }
1816
1817 pub(crate) fn ccontrol(&self) -> &CongestionControl {
1819 &self.ccontrol
1820 }
1821}