1pub(crate) mod circhop;
4pub(super) mod create;
5pub(super) mod extender;
6
7use crate::channel::{Channel, ChannelSender};
8use crate::circuit::HopSettings;
9use crate::congestion::sendme;
10use crate::congestion::CongestionSignals;
11use crate::crypto::binding::CircuitBinding;
12use crate::crypto::cell::{
13 HopNum, InboundClientCrypt, InboundClientLayer, OutboundClientCrypt, OutboundClientLayer,
14 RelayCellBody,
15};
16use crate::crypto::handshake::fast::CreateFastClient;
17use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey};
18use crate::crypto::handshake::ntor_v3::{NtorV3Client, NtorV3PublicKey};
19use crate::crypto::handshake::{ClientHandshake, KeyGenerator};
20use crate::memquota::{CircuitAccount, SpecificAccount as _, StreamAccount};
21use crate::stream::{AnyCmdChecker, StreamStatus};
22use crate::tunnel::circuit::celltypes::{ClientCircChanMsg, CreateResponse};
23use crate::tunnel::circuit::handshake::{BoxedClientLayer, HandshakeRole};
24use crate::tunnel::circuit::path;
25use crate::tunnel::circuit::unique_id::UniqId;
26use crate::tunnel::circuit::{
27 CircuitRxReceiver, MutableState, StreamMpscReceiver, StreamMpscSender,
28};
29use crate::tunnel::handshake::RelayCryptLayerProtocol;
30use crate::tunnel::reactor::MetaCellDisposition;
31use crate::tunnel::streammap;
32use crate::tunnel::TunnelScopedCircId;
33use crate::util::err::ReactorError;
34use crate::util::sometimes_unbounded_sink::SometimesUnboundedSink;
35use crate::util::SinkExt as _;
36use crate::{ClockSkew, Error, Result};
37
38use tor_async_utils::{SinkTrySend as _, SinkTrySendError as _};
39use tor_cell::chancell::msg::{AnyChanMsg, HandshakeType, Relay};
40use tor_cell::chancell::{AnyChanCell, ChanCmd, CircId};
41use tor_cell::chancell::{BoxedCellBody, ChanMsg};
42use tor_cell::relaycell::extend::{CcRequest, CircRequestExt};
43use tor_cell::relaycell::msg::{AnyRelayMsg, End, Sendme, SendmeTag, Truncated};
44use tor_cell::relaycell::{
45 AnyRelayMsgOuter, RelayCellDecoderResult, RelayCellFormat, RelayCmd, StreamId, UnparsedRelayMsg,
46};
47use tor_error::{internal, Bug};
48use tor_linkspec::RelayIds;
49use tor_llcrypto::pk;
50use tor_memquota::mq_queue::{ChannelSpec as _, MpscSpec};
51
52use futures::{SinkExt as _, Stream};
53use oneshot_fused_workaround as oneshot;
54use safelog::sensitive as sv;
55use tracing::{debug, trace, warn};
56
57use super::{
58 CellHandlers, CircuitHandshake, CloseStreamBehavior, ReactorResultChannel, SendRelayCell,
59};
60
61use std::borrow::Borrow;
62use std::pin::Pin;
63use std::result::Result as StdResult;
64use std::sync::Arc;
65use std::task::Poll;
66use std::time::{Duration, SystemTime};
67
68use create::{Create2Wrap, CreateFastWrap, CreateHandshakeWrap};
69use extender::HandshakeAuxDataHandler;
70
71#[cfg(feature = "hs-service")]
72use {
73 crate::stream::{DataCmdChecker, IncomingStreamRequest},
74 tor_cell::relaycell::msg::Begin,
75};
76
77#[cfg(feature = "conflux")]
78use {
79 super::conflux::ConfluxMsgHandler,
80 super::conflux::{ConfluxAction, OooRelayMsg},
81 crate::tunnel::reactor::RemoveLegReason,
82 crate::tunnel::TunnelId,
83};
84
85pub(super) use circhop::{CircHop, CircHopList};
86
/// Initial value of the send window: 500.
///
/// NOTE(review): this constant is not referenced in this part of the file;
/// presumably it is counted in cells and seeds outbound flow control —
/// confirm at the use sites.
pub(super) const SEND_WINDOW_INIT: u16 = 500;
/// Initial value of the receive window: 500.
///
/// NOTE(review): presumably counted in cells; confirm at the use sites.
pub(crate) const RECV_WINDOW_INIT: u16 = 500;
/// Capacity requested for the per-stream reader queue: twice
/// [`RECV_WINDOW_INIT`].
pub(crate) const STREAM_READER_BUFFER: usize = 2 * (RECV_WINDOW_INIT as usize);
97
/// The reactor-side state of a single circuit (one leg of a tunnel).
///
/// It owns the per-hop state, the onion-crypto layers for both directions,
/// and the sink used to push cells onto the underlying channel.
/// Dropping a `Circuit` asks the channel to close the circuit
/// (see the `Drop` impl).
pub(crate) struct Circuit {
    /// The channel this circuit is attached to.
    ///
    /// Used to check the first hop's identity, to read the clock skew, and to
    /// close the circuit on drop.
    channel: Arc<Channel>,
    /// Sink used to send cells for this circuit over `channel`.
    pub(super) chan_sender: SometimesUnboundedSink<AnyChanCell, ChannelSender>,
    /// Receiving end for this circuit's incoming messages.
    ///
    /// NOTE(review): not read in this part of the file; presumably polled by
    /// the reactor — confirm there.
    pub(super) input: CircuitRxReceiver,
    /// Cryptographic state for inbound cells; decrypting a cell also tells us
    /// which hop it came from.
    crypto_in: InboundClientCrypt,
    /// Cryptographic state for outbound cells.
    crypto_out: OutboundClientCrypt,
    /// Per-hop state, one entry per hop added with `add_hop`.
    hops: CircHopList,
    /// State shared outside this struct (it sits behind an `Arc`); records
    /// each added hop and provides the circuit's path.
    mutable: Arc<MutableState>,
    /// The circuit ID placed in every cell we send on `channel`.
    channel_id: CircId,
    /// Identifier for this circuit, used in log messages and passed to each
    /// new hop.
    unique_id: TunnelScopedCircId,
    /// Conflux state, present only once this circuit has been added to a
    /// conflux tunnel with `add_to_conflux_tunnel`.
    #[cfg(feature = "conflux")]
    conflux_handler: Option<ConfluxMsgHandler>,
    /// Memory-quota account for this circuit.
    ///
    /// In this file it is only read when creating accounts for incoming
    /// streams, which is `hs-service`-only code; hence the `dead_code` allow.
    #[allow(dead_code)] memquota: CircuitAccount,
}
140
/// An instruction produced while handling a cell on a circuit.
///
/// The handlers in this file return these instead of acting directly.
/// NOTE(review): presumably the reactor executes them — confirm at the
/// caller.
#[derive(Debug)]
pub(super) enum CircuitCmd {
    /// Send the given RELAY cell.
    Send(SendRelayCell),
    /// A circuit-level SENDME arrived and needs to be processed.
    HandleSendMe {
        /// The hop the SENDME came from.
        hop: HopNum,
        /// The decoded SENDME message.
        sendme: Sendme,
    },
    /// Close the specified stream.
    ///
    /// NOTE(review): never constructed in this part of the file; field
    /// meanings below are taken from `Circuit::close_stream`'s parameters.
    CloseStream {
        /// The hop the stream is with.
        hop: HopNum,
        /// The ID of the stream to close.
        sid: StreamId,
        /// How the stream should be closed.
        behav: CloseStreamBehavior,
        /// Why the stream is being terminated.
        reason: streammap::TerminateReason,
    },
    /// Remove this circuit (leg) from its conflux set, for the given reason.
    ///
    /// NOTE(review): never constructed in this part of the file — confirm.
    #[cfg(feature = "conflux")]
    ConfluxRemove(RemoveLegReason),
    /// The conflux handshake finished; carries a cell to send.
    ///
    /// NOTE(review): never constructed in this part of the file — confirm
    /// which cell this is.
    #[cfg(feature = "conflux")]
    ConfluxHandshakeComplete(SendRelayCell),
    /// Shut the circuit down cleanly.
    ///
    /// Produced here on DESTROY and TRUNCATED, and when an incoming-stream
    /// filter or a meta-cell handler asks for the circuit to be closed.
    CleanShutdown,
    /// Queue an out-of-order conflux message for later delivery.
    #[cfg(feature = "conflux")]
    Enqueue(OooRelayMsg),
}
190
/// Build an `Err(Error::CircProto(..))` saying that `$msg` is a cell we did
/// not expect on a client circuit.
///
/// Two public forms:
///
///  * `unsupported_client_cell!(msg)` — no hop information in the message;
///  * `unsupported_client_cell!(msg, hopnum)` — appends " from hop N".
///
/// `$msg` must have a `cmd()` method whose result implements `Display`;
/// `$hopnum` must be a `HopNum`.
macro_rules! unsupported_client_cell {
    // Form without a hop: reuse the internal rule with an empty hop suffix.
    ($msg:expr) => {{
        unsupported_client_cell!(@ $msg, "")
    }};

    // Form with a hop: render the hop suffix, then defer to the internal rule.
    ($msg:expr, $hopnum:expr) => {{
        let hop: HopNum = $hopnum;
        let hop_display = format!(" from hop {}", hop.display());
        unsupported_client_cell!(@ $msg, hop_display)
    }};

    // Internal rule (selected by the leading `@`): builds the actual error.
    (@ $msg:expr, $hopnum_display:expr) => {
        Err(crate::Error::CircProto(format!(
            "Unexpected {} cell{} on client circuit",
            $msg.cmd(),
            $hopnum_display,
        )))
    };
}
216
217pub(super) use unsupported_client_cell;
218
219impl Circuit {
220 pub(super) fn new(
222 channel: Arc<Channel>,
223 channel_id: CircId,
224 unique_id: TunnelScopedCircId,
225 input: CircuitRxReceiver,
226 memquota: CircuitAccount,
227 mutable: Arc<MutableState>,
228 ) -> Self {
229 let chan_sender = SometimesUnboundedSink::new(channel.sender());
230
231 let crypto_out = OutboundClientCrypt::new();
232 Circuit {
233 channel,
234 chan_sender,
235 input,
236 crypto_in: InboundClientCrypt::new(),
237 hops: CircHopList::default(),
238 unique_id,
239 channel_id,
240 crypto_out,
241 mutable,
242 #[cfg(feature = "conflux")]
243 conflux_handler: None,
244 memquota,
245 }
246 }
247
248 pub(super) fn unique_id(&self) -> UniqId {
250 self.unique_id.unique_id()
251 }
252
253 pub(super) fn mutable(&self) -> &Arc<MutableState> {
255 &self.mutable
256 }
257
/// Make this circuit part of the conflux tunnel `tunnel_id`.
///
/// Installs `conflux_handler` and re-scopes the circuit's identifier to the
/// new tunnel, keeping its `UniqId`.
#[cfg(feature = "conflux")]
pub(super) fn add_to_conflux_tunnel(
    &mut self,
    tunnel_id: TunnelId,
    conflux_handler: ConfluxMsgHandler,
) {
    let circ_unique_id = self.unique_id.unique_id();
    self.conflux_handler = Some(conflux_handler);
    self.unique_id = TunnelScopedCircId::new(tunnel_id, circ_unique_id);
}
271
/// Send the conflux LINK `cell` to `hop` and record when it was sent.
///
/// # Errors
///
/// Returns an internal error if no `ConfluxMsgHandler` is installed.
/// Failures from sending the cell or from the handler's `note_link_sent`
/// are propagated.
#[cfg(feature = "conflux")]
pub(super) async fn begin_conflux_link(
    &mut self,
    hop: HopNum,
    cell: AnyRelayMsgOuter,
    runtime: &tor_rtcompat::DynTimeProvider,
) -> Result<()> {
    use tor_rtcompat::SleepProvider as _;

    // Refuse to send anything unless a handler is already in place.
    if self.conflux_handler.is_none() {
        let bug = internal!("tried to send LINK cell before installing a ConfluxMsgHandler?!");
        return Err(bug.into());
    }

    self.send_relay_cell(SendRelayCell {
        hop,
        early: false,
        cell,
    })
    .await?;

    // Look the handler up again: the send above needed `&mut self`.
    match self.conflux_handler.as_mut() {
        Some(handler) => {
            handler.note_link_sent(runtime.wallclock())?;
            Ok(())
        }
        None => Err(internal!("ConfluxMsgHandler disappeared?!").into()),
    }
}
305
306 pub(super) fn conflux_hs_timeout(&self) -> Option<SystemTime> {
310 cfg_if::cfg_if! {
311 if #[cfg(feature = "conflux")] {
312 self.conflux_handler.as_ref().map(|handler| handler.handshake_timeout())?
313 } else {
314 None
315 }
316 }
317 }
318
/// Test-only: append a hop that uses `DummyCrypto` layers instead of real
/// cryptography, then report success on `done`.
///
/// The hop settings are derived from `params` with only the `FLOWCTRL_CC`
/// capability advertised.
///
/// # Panics
///
/// Panics if the settings cannot be built or the hop cannot be added.
#[cfg(test)]
pub(super) fn handle_add_fake_hop(
    &mut self,
    format: RelayCellFormat,
    fwd_lasthop: bool,
    rev_lasthop: bool,
    dummy_peer_id: path::HopDetail,
    params: &crate::circuit::CircParameters,
    done: ReactorResultChannel<()>,
) {
    use crate::tunnel::circuit::test::DummyCrypto;
    use tor_protover::{named, Protocols};

    let fwd = Box::new(DummyCrypto::new(fwd_lasthop));
    let rev = Box::new(DummyCrypto::new(rev_lasthop));

    let caps: Protocols = [named::FLOWCTRL_CC].into_iter().collect();
    let settings =
        HopSettings::from_params_and_caps(params, &caps).expect("Can't construct HopSettings");

    // Fake hops have no circuit binding.
    self.add_hop(format, dummy_peer_id, fwd, rev, None, &settings)
        .expect("could not add hop to circuit");

    // The requester may have gone away; that is not an error here.
    let _ = done.send(Ok(()));
}
346
347 fn encode_relay_cell(
351 crypto_out: &mut OutboundClientCrypt,
352 relay_format: RelayCellFormat,
353 hop: HopNum,
354 early: bool,
355 msg: AnyRelayMsgOuter,
356 ) -> Result<(AnyChanMsg, SendmeTag)> {
357 let mut body: RelayCellBody = msg
358 .encode(relay_format, &mut rand::rng())
359 .map_err(|e| Error::from_cell_enc(e, "relay cell body"))?
360 .into();
361 let cmd = if early {
362 ChanCmd::RELAY_EARLY
363 } else {
364 ChanCmd::RELAY
365 };
366 let tag = crypto_out.encrypt(cmd, &mut body, hop)?;
367 let msg = Relay::from(BoxedCellBody::from(body));
368 let msg = if early {
369 AnyChanMsg::RelayEarly(msg.into())
370 } else {
371 AnyChanMsg::Relay(msg)
372 };
373
374 Ok((msg, tag))
375 }
376
377 pub(super) async fn send_relay_cell(&mut self, msg: SendRelayCell) -> Result<()> {
388 let SendRelayCell {
389 hop,
390 early,
391 cell: msg,
392 } = msg;
393
394 let is_conflux_link = msg.cmd() == RelayCmd::CONFLUX_LINK;
395 if !is_conflux_link && self.is_conflux_pending() {
396 return Err(internal!("tried to send cell on unlinked circuit").into());
399 }
400
401 trace!(circ_id = %self.unique_id, cell = ?msg, "sending relay cell");
402
403 let c_t_w = sendme::cmd_counts_towards_windows(msg.cmd());
404 let stream_id = msg.stream_id();
405 let circhop = self.hops.get_mut(hop).ok_or(Error::NoSuchHop)?;
406
407 if c_t_w {
409 if let Some(stream_id) = stream_id {
410 circhop.take_capacity_to_send(stream_id, msg.msg())?;
411 }
412 }
413
414 let relay_cmd = msg.cmd();
418
419 let (msg, tag) = Self::encode_relay_cell(
422 &mut self.crypto_out,
423 circhop.relay_format(),
424 hop,
425 early,
426 msg,
427 )?;
428 if c_t_w {
431 circhop.ccontrol_mut().note_data_sent(&tag)?;
432 }
433
434 let cell = AnyChanCell::new(Some(self.channel_id), msg);
435 Pin::new(&mut self.chan_sender).send_unbounded(cell).await?;
436
437 #[cfg(feature = "conflux")]
438 if let Some(conflux) = self.conflux_handler.as_mut() {
439 conflux.note_cell_sent(relay_cmd);
440 }
441
442 Ok(())
443 }
444
445 pub(super) fn handle_cell(
458 &mut self,
459 handlers: &mut CellHandlers,
460 leg: UniqId,
461 cell: ClientCircChanMsg,
462 ) -> Result<Vec<CircuitCmd>> {
463 trace!(circ_id = %self.unique_id, cell = ?cell, "handling cell");
464 use ClientCircChanMsg::*;
465 match cell {
466 Relay(r) => self.handle_relay_cell(handlers, leg, r),
467 Destroy(d) => {
468 let reason = d.reason();
469 debug!(
470 circ_id = %self.unique_id,
471 "Received DESTROY cell. Reason: {} [{}]",
472 reason.human_str(),
473 reason
474 );
475
476 self.handle_destroy_cell().map(|c| vec![c])
477 }
478 }
479 }
480
481 fn decode_relay_cell(
484 &mut self,
485 cell: Relay,
486 ) -> Result<(HopNum, SendmeTag, RelayCellDecoderResult)> {
487 let cmd = cell.cmd();
489 let mut body = cell.into_relay_body().into();
490
491 let (hopnum, tag) = self.crypto_in.decrypt(cmd, &mut body)?;
494
495 let decode_res = self
497 .hop_mut(hopnum)
498 .ok_or_else(|| {
499 Error::from(internal!(
500 "Trying to decode cell from nonexistent hop {:?}",
501 hopnum
502 ))
503 })?
504 .decode(body.into())?;
505
506 Ok((hopnum, tag, decode_res))
507 }
508
509 fn handle_relay_cell(
511 &mut self,
512 handlers: &mut CellHandlers,
513 leg: UniqId,
514 cell: Relay,
515 ) -> Result<Vec<CircuitCmd>> {
516 let (hopnum, tag, decode_res) = self.decode_relay_cell(cell)?;
517
518 let c_t_w = decode_res.cmds().any(sendme::cmd_counts_towards_windows);
519
520 let send_circ_sendme = if c_t_w {
523 self.hop_mut(hopnum)
524 .ok_or_else(|| Error::CircProto("Sendme from nonexistent hop".into()))?
525 .ccontrol_mut()
526 .note_data_received()?
527 } else {
528 false
529 };
530
531 let mut circ_cmds = vec![];
532 if send_circ_sendme {
534 let sendme = Sendme::from(tag);
539 let cell = AnyRelayMsgOuter::new(None, sendme.into());
540 circ_cmds.push(CircuitCmd::Send(SendRelayCell {
541 hop: hopnum,
542 early: false,
543 cell,
544 }));
545
546 self.hop_mut(hopnum)
548 .ok_or_else(|| {
549 Error::from(internal!(
550 "Trying to send SENDME to nonexistent hop {:?}",
551 hopnum
552 ))
553 })?
554 .ccontrol_mut()
555 .note_sendme_sent()?;
556 }
557
558 let (mut msgs, incomplete) = decode_res.into_parts();
559 while let Some(msg) = msgs.next() {
560 let msg_status = self.handle_relay_msg(handlers, hopnum, leg, c_t_w, msg)?;
561
562 match msg_status {
563 None => continue,
564 Some(msg @ CircuitCmd::CleanShutdown) => {
565 for m in msgs {
566 debug!(
567 "{id}: Ignoring relay msg received after triggering shutdown: {m:?}",
568 id = self.unique_id
569 );
570 }
571 if let Some(incomplete) = incomplete {
572 debug!(
573 "{id}: Ignoring partial relay msg received after triggering shutdown: {:?}",
574 incomplete,
575 id=self.unique_id,
576 );
577 }
578 circ_cmds.push(msg);
579 return Ok(circ_cmds);
580 }
581 Some(msg) => {
582 circ_cmds.push(msg);
583 }
584 }
585 }
586
587 Ok(circ_cmds)
588 }
589
590 fn handle_relay_msg(
592 &mut self,
593 handlers: &mut CellHandlers,
594 hopnum: HopNum,
595 leg: UniqId,
596 cell_counts_toward_windows: bool,
597 msg: UnparsedRelayMsg,
598 ) -> Result<Option<CircuitCmd>> {
599 let streamid = msg_streamid(&msg)?;
602
603 let Some(streamid) = streamid else {
606 return self.handle_meta_cell(handlers, hopnum, msg);
607 };
608
609 #[cfg(feature = "conflux")]
610 let msg = if let Some(conflux) = self.conflux_handler.as_mut() {
611 match conflux.action_for_msg(hopnum, cell_counts_toward_windows, streamid, msg)? {
612 ConfluxAction::Deliver(msg) => {
613 msg
620 }
621 ConfluxAction::Enqueue(msg) => {
622 return Ok(Some(CircuitCmd::Enqueue(msg)));
624 }
625 }
626 } else {
627 msg
630 };
631
632 self.handle_in_order_relay_msg(
633 handlers,
634 hopnum,
635 leg,
636 cell_counts_toward_windows,
637 streamid,
638 msg,
639 )
640 }
641
642 pub(super) fn handle_in_order_relay_msg(
644 &mut self,
645 handlers: &mut CellHandlers,
646 hopnum: HopNum,
647 leg: UniqId,
648 cell_counts_toward_windows: bool,
649 streamid: StreamId,
650 msg: UnparsedRelayMsg,
651 ) -> Result<Option<CircuitCmd>> {
652 #[cfg(feature = "conflux")]
653 if let Some(conflux) = self.conflux_handler.as_mut() {
654 conflux.inc_last_seq_delivered(&msg);
655 }
656
657 let hop = self
658 .hop_mut(hopnum)
659 .ok_or_else(|| Error::CircProto("Cell from nonexistent hop!".into()))?;
660 let res = hop.handle_msg(cell_counts_toward_windows, streamid, msg)?;
661
662 if let Some(msg) = res {
663 cfg_if::cfg_if! {
664 if #[cfg(feature = "hs-service")] {
665 return self.handle_incoming_stream_request(handlers, msg, streamid, hopnum, leg);
666 } else {
667 return Err(internal!("incoming stream not rejected, but hs-service feature is disabled?!").into());
668 }
669 }
670 }
671
672 Ok(None)
673 }
674
/// Pass a conflux control message from `hop` to the conflux handler.
///
/// # Errors
///
/// Returns `Error::CircProto` if this circuit has no conflux handler.
#[cfg(feature = "conflux")]
fn handle_conflux_msg(
    &mut self,
    hop: HopNum,
    msg: UnparsedRelayMsg,
) -> Result<Option<CircuitCmd>> {
    match self.conflux_handler.as_mut() {
        Some(handler) => Ok(handler.handle_conflux_msg(msg, hop)),
        None => Err(Error::CircProto(format!(
            "Received {} cell from hop {} on non-conflux client circuit?!",
            msg.cmd(),
            hop.display(),
        ))),
    }
}
700
/// Return the conflux handler's `last_seq_sent` value.
///
/// # Errors
///
/// Returns an internal error if this circuit has no conflux handler.
#[cfg(feature = "conflux")]
pub(super) fn last_seq_sent(&self) -> Result<u64> {
    match self.conflux_handler.as_ref() {
        Some(handler) => Ok(handler.last_seq_sent()),
        None => Err(internal!("tried to get last_seq_sent of non-conflux circ").into()),
    }
}
713
/// Return the conflux handler's `last_seq_recv` value.
///
/// # Errors
///
/// Returns an internal error if this circuit has no conflux handler.
#[cfg(feature = "conflux")]
pub(super) fn last_seq_recv(&self) -> Result<u64> {
    match self.conflux_handler.as_ref() {
        Some(handler) => Ok(handler.last_seq_recv()),
        None => Err(internal!("tried to get last_seq_recv of non-conflux circ").into()),
    }
}
726
/// Handle a message that a hop handed back as a possible incoming stream
/// request.
///
/// The request must come from the hop configured in the incoming-stream
/// handler and must pass that handler's command checker. A message that
/// closes the stream is recorded on the hop and nothing more is done.
/// Otherwise the message is decoded as a BEGIN and offered to the handler's
/// filter, which may accept it, reject it (we reply with the filter's END),
/// or ask for the circuit to be closed.
///
/// An accepted request gets a fresh stream entry on the hop and is forwarded
/// to the handler's queue. If that queue is full, we reply with an END whose
/// reason is `RESOURCELIMIT`.
///
/// # Errors
///
/// Returns `Error::CircProto` if no incoming-stream handler is installed or
/// the request comes from the wrong hop. Returns `Error::CircuitClosed` if
/// the hop is gone or the request receiver has been dropped. Other failures
/// are propagated.
#[cfg(feature = "hs-service")]
fn handle_incoming_stream_request(
    &mut self,
    handlers: &mut CellHandlers,
    msg: UnparsedRelayMsg,
    stream_id: StreamId,
    hop_num: HopNum,
    leg: UniqId,
) -> Result<Option<CircuitCmd>> {
    use super::syncview::ClientCircSyncView;
    use tor_cell::relaycell::msg::EndReason;
    use tor_error::into_internal;
    use tor_log_ratelim::log_ratelim;

    use crate::stream::{IncomingStreamRequestContext, IncomingStreamRequestDisposition};
    use crate::{circuit::CIRCUIT_BUFFER_SIZE, tunnel::reactor::StreamReqInfo};

    let handler = match handlers.incoming_stream_req_handler.as_mut() {
        Some(handler) => handler,
        None => {
            return Err(Error::CircProto(
                "Cannot handle BEGIN cells on this circuit".into(),
            ));
        }
    };

    if hop_num != handler.hop_num {
        return Err(Error::CircProto(format!(
            "Expecting incoming streams from {}, but received {} cell from unexpected hop {}",
            handler.hop_num.display(),
            msg.cmd(),
            hop_num.display()
        )));
    }

    let stream_status = handler.cmd_checker.check_msg(&msg)?;

    let hop = self.hops.get_mut(hop_num).ok_or(Error::CircuitClosed)?;

    if stream_status == StreamStatus::Closed {
        hop.ending_msg_received(stream_id)?;
        return Ok(None);
    }

    let begin = msg
        .decode::<Begin>()
        .map_err(|e| Error::from_bytes_err(e, "Invalid Begin message"))?
        .into_msg();
    let req = IncomingStreamRequest::Begin(begin);

    // Ask the filter what to do with this request.
    let disposition = {
        let ctx = IncomingStreamRequestContext { request: &req };
        let view = ClientCircSyncView::new(&self.hops);
        handler.filter.as_mut().disposition(&ctx, &view)?
    };
    match disposition {
        IncomingStreamRequestDisposition::Accept => {}
        IncomingStreamRequestDisposition::CloseCircuit => {
            return Ok(Some(CircuitCmd::CleanShutdown));
        }
        IncomingStreamRequestDisposition::RejectRequest(end) => {
            let cell = SendRelayCell {
                hop: hop_num,
                early: false,
                cell: AnyRelayMsgOuter::new(Some(stream_id), end.into()),
            };
            return Ok(Some(CircuitCmd::Send(cell)));
        }
    }

    // The hop is looked up a second time because the filter's view borrowed
    // `self.hops` in between.
    let hop = self.hops.get_mut(hop_num).ok_or(Error::CircuitClosed)?;
    let relay_cell_format = hop.relay_format();

    let memquota = StreamAccount::new(&self.memquota)?;

    let (sender, receiver) = MpscSpec::new(STREAM_READER_BUFFER).new_mq(
        self.chan_sender.as_inner().time_provider().clone(),
        memquota.as_raw_account(),
    )?;
    let (msg_tx, msg_rx) = MpscSpec::new(CIRCUIT_BUFFER_SIZE).new_mq(
        self.chan_sender.as_inner().time_provider().clone(),
        memquota.as_raw_account(),
    )?;

    hop.add_ent_with_id(sender, msg_rx, stream_id, DataCmdChecker::new_connected())?;

    let outcome = Pin::new(&mut handler.incoming_sender).try_send(StreamReqInfo {
        req,
        stream_id,
        hop_num,
        leg,
        msg_tx,
        receiver,
        memquota,
        relay_cell_format,
    });

    log_ratelim!("Delivering message to incoming stream handler"; outcome);

    let Err(e) = outcome else {
        return Ok(None);
    };

    if e.is_full() {
        // The handler's queue is full: answer the request with an END.
        let cell = SendRelayCell {
            hop: hop_num,
            early: false,
            cell: AnyRelayMsgOuter::new(
                Some(stream_id),
                End::new_with_reason(EndReason::RESOURCELIMIT).into(),
            ),
        };
        Ok(Some(CircuitCmd::Send(cell)))
    } else if e.is_disconnected() {
        debug!(
            circ_id = %self.unique_id,
            "Incoming stream request receiver dropped",
        );
        Err(Error::CircuitClosed)
    } else {
        Err(Error::from((into_internal!(
            "try_send failed unexpectedly"
        ))(e)))
    }
}
891
892 #[allow(clippy::unnecessary_wraps)]
894 fn handle_destroy_cell(&mut self) -> Result<CircuitCmd> {
895 Ok(CircuitCmd::CleanShutdown)
897 }
898
899 pub(super) async fn handle_create(
901 &mut self,
902 recv_created: oneshot::Receiver<CreateResponse>,
903 handshake: CircuitHandshake,
904 settings: HopSettings,
905 done: ReactorResultChannel<()>,
906 ) -> StdResult<(), ReactorError> {
907 let ret = match handshake {
908 CircuitHandshake::CreateFast => self.create_firsthop_fast(recv_created, settings).await,
909 CircuitHandshake::Ntor {
910 public_key,
911 ed_identity,
912 } => {
913 self.create_firsthop_ntor(recv_created, ed_identity, public_key, settings)
914 .await
915 }
916 CircuitHandshake::NtorV3 { public_key } => {
917 self.create_firsthop_ntor_v3(recv_created, public_key, settings)
918 .await
919 }
920 };
921 let _ = done.send(ret); self.chan_sender.flush().await?;
926
927 Ok(())
928 }
929
930 async fn create_impl<H, W, M>(
936 &mut self,
937 cell_protocol: RelayCryptLayerProtocol,
938 recvcreated: oneshot::Receiver<CreateResponse>,
939 wrap: &W,
940 key: &H::KeyType,
941 mut settings: HopSettings,
942 msg: &M,
943 ) -> Result<()>
944 where
945 H: ClientHandshake + HandshakeAuxDataHandler,
946 W: CreateHandshakeWrap,
947 H::KeyGen: KeyGenerator,
948 M: Borrow<H::ClientAuxData>,
949 {
950 let (state, msg) = {
955 let mut rng = rand::rng();
958 H::client1(&mut rng, key, msg)?
959 };
960 let create_cell = wrap.to_chanmsg(msg);
961 trace!(
962 circ_id = %self.unique_id,
963 create = %create_cell.cmd(),
964 "Extending to hop 1",
965 );
966 self.send_msg(create_cell).await?;
967
968 let reply = recvcreated
969 .await
970 .map_err(|_| Error::CircProto("Circuit closed while waiting".into()))?;
971
972 let relay_handshake = wrap.decode_chanmsg(reply)?;
973 let (server_msg, keygen) = H::client2(state, relay_handshake)?;
974
975 H::handle_server_aux_data(&mut settings, &server_msg)?;
976
977 let relay_cell_format = cell_protocol.relay_cell_format();
978 let BoxedClientLayer { fwd, back, binding } =
979 cell_protocol.construct_client_layers(HandshakeRole::Initiator, keygen)?;
980
981 trace!(circ_id = %self.unique_id, "Handshake complete; circuit created.");
982
983 let peer_id = self.channel.target().clone();
984
985 self.add_hop(
986 relay_cell_format,
987 path::HopDetail::Relay(peer_id),
988 fwd,
989 back,
990 binding,
991 &settings,
992 )?;
993 Ok(())
994 }
995
996 async fn create_firsthop_fast(
1003 &mut self,
1004 recvcreated: oneshot::Receiver<CreateResponse>,
1005 settings: HopSettings,
1006 ) -> Result<()> {
1007 let protocol = RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0);
1009 let wrap = CreateFastWrap;
1010 self.create_impl::<CreateFastClient, _, _>(protocol, recvcreated, &wrap, &(), settings, &())
1011 .await
1012 }
1013
1014 async fn create_firsthop_ntor(
1019 &mut self,
1020 recvcreated: oneshot::Receiver<CreateResponse>,
1021 ed_identity: pk::ed25519::Ed25519Identity,
1022 pubkey: NtorPublicKey,
1023 settings: HopSettings,
1024 ) -> Result<()> {
1025 let relay_cell_protocol = RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0);
1027
1028 let target = RelayIds::builder()
1030 .ed_identity(ed_identity)
1031 .rsa_identity(pubkey.id)
1032 .build()
1033 .expect("Unable to build RelayIds");
1034 self.channel.check_match(&target)?;
1035
1036 let wrap = Create2Wrap {
1037 handshake_type: HandshakeType::NTOR,
1038 };
1039 self.create_impl::<NtorClient, _, _>(
1040 relay_cell_protocol,
1041 recvcreated,
1042 &wrap,
1043 &pubkey,
1044 settings,
1045 &(),
1046 )
1047 .await
1048 }
1049
1050 async fn create_firsthop_ntor_v3(
1055 &mut self,
1056 recvcreated: oneshot::Receiver<CreateResponse>,
1057 pubkey: NtorV3PublicKey,
1058 settings: HopSettings,
1059 ) -> Result<()> {
1060 let target = RelayIds::builder()
1062 .ed_identity(pubkey.id)
1063 .build()
1064 .expect("Unable to build RelayIds");
1065 self.channel.check_match(&target)?;
1066
1067 let relay_cell_protocol = RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0);
1069
1070 let client_extensions = circ_extensions_from_settings(&settings)?;
1072 let wrap = Create2Wrap {
1073 handshake_type: HandshakeType::NTOR_V3,
1074 };
1075
1076 self.create_impl::<NtorV3Client, _, _>(
1077 relay_cell_protocol,
1078 recvcreated,
1079 &wrap,
1080 &pubkey,
1081 settings,
1082 &client_extensions,
1083 )
1084 .await
1085 }
1086
1087 pub(super) fn add_hop(
1091 &mut self,
1092 format: RelayCellFormat,
1093 peer_id: path::HopDetail,
1094 fwd: Box<dyn OutboundClientLayer + 'static + Send>,
1095 rev: Box<dyn InboundClientLayer + 'static + Send>,
1096 binding: Option<CircuitBinding>,
1097 settings: &HopSettings,
1098 ) -> StdResult<(), Bug> {
1099 let hop_num = self.hops.len();
1100 debug_assert_eq!(hop_num, usize::from(self.num_hops()));
1101
1102 if hop_num == usize::from(u8::MAX) {
1106 return Err(internal!(
1107 "cannot add more hops to a circuit with `u8::MAX` hops"
1108 ));
1109 }
1110
1111 let hop_num = (hop_num as u8).into();
1112
1113 let hop = CircHop::new(self.unique_id, hop_num, format, settings);
1114 self.hops.push(hop);
1115 self.crypto_in.add_layer(rev);
1116 self.crypto_out.add_layer(fwd);
1117 self.mutable.add_hop(peer_id, binding);
1118
1119 Ok(())
1120 }
1121
1122 #[allow(clippy::cognitive_complexity)]
1124 fn handle_meta_cell(
1125 &mut self,
1126 handlers: &mut CellHandlers,
1127 hopnum: HopNum,
1128 msg: UnparsedRelayMsg,
1129 ) -> Result<Option<CircuitCmd>> {
1130 if msg.cmd() == RelayCmd::SENDME {
1140 let sendme = msg
1141 .decode::<Sendme>()
1142 .map_err(|e| Error::from_bytes_err(e, "sendme message"))?
1143 .into_msg();
1144
1145 return Ok(Some(CircuitCmd::HandleSendMe {
1146 hop: hopnum,
1147 sendme,
1148 }));
1149 }
1150 if msg.cmd() == RelayCmd::TRUNCATED {
1151 let truncated = msg
1152 .decode::<Truncated>()
1153 .map_err(|e| Error::from_bytes_err(e, "truncated message"))?
1154 .into_msg();
1155 let reason = truncated.reason();
1156 debug!(
1157 circ_id = %self.unique_id,
1158 "Truncated from hop {}. Reason: {} [{}]",
1159 hopnum.display(),
1160 reason.human_str(),
1161 reason
1162 );
1163
1164 return Ok(Some(CircuitCmd::CleanShutdown));
1165 }
1166
1167 trace!(circ_id = %self.unique_id, cell = ?msg, "Received meta-cell");
1168
1169 #[cfg(feature = "conflux")]
1170 if matches!(
1171 msg.cmd(),
1172 RelayCmd::CONFLUX_LINK
1173 | RelayCmd::CONFLUX_LINKED
1174 | RelayCmd::CONFLUX_LINKED_ACK
1175 | RelayCmd::CONFLUX_SWITCH
1176 ) {
1177 return self.handle_conflux_msg(hopnum, msg);
1178 }
1179
1180 if self.is_conflux_pending() {
1181 warn!(
1182 circ_id = %self.unique_id,
1183 "received unexpected cell {msg:?} on unlinked conflux circuit",
1184 );
1185 return Err(Error::CircProto(
1186 "Received unexpected cell on unlinked circuit".into(),
1187 ));
1188 }
1189
1190 if let Some(mut handler) = handlers.meta_handler.take() {
1198 if handler.expected_hop() == hopnum {
1199 let ret = handler.handle_msg(msg, self);
1201 trace!(
1202 circ_id = %self.unique_id,
1203 result = ?ret,
1204 "meta handler completed",
1205 );
1206 match ret {
1207 #[cfg(feature = "send-control-msg")]
1208 Ok(MetaCellDisposition::Consumed) => {
1209 handlers.meta_handler = Some(handler);
1210 Ok(None)
1211 }
1212 Ok(MetaCellDisposition::ConversationFinished) => Ok(None),
1213 #[cfg(feature = "send-control-msg")]
1214 Ok(MetaCellDisposition::CloseCirc) => Ok(Some(CircuitCmd::CleanShutdown)),
1215 Err(e) => Err(e),
1216 }
1217 } else {
1218 handlers.meta_handler = Some(handler);
1221
1222 unsupported_client_cell!(msg, hopnum)
1223 }
1224 } else {
1225 unsupported_client_cell!(msg)
1228 }
1229 }
1230
1231 pub(super) fn handle_sendme(
1233 &mut self,
1234 hopnum: HopNum,
1235 msg: Sendme,
1236 signals: CongestionSignals,
1237 ) -> Result<Option<CircuitCmd>> {
1238 let hop = self
1241 .hop_mut(hopnum)
1242 .ok_or_else(|| Error::CircProto(format!("Couldn't find hop {}", hopnum.display())))?;
1243
1244 let tag = msg.into_sendme_tag().ok_or_else(||
1245 Error::CircProto("missing tag on circuit sendme".into()))?;
1248 hop.ccontrol_mut().note_sendme_received(tag, signals)?;
1250 Ok(None)
1251 }
1252
1253 async fn send_msg(&mut self, msg: AnyChanMsg) -> Result<()> {
1265 let cell = AnyChanCell::new(Some(self.channel_id), msg);
1266 Pin::new(&mut self.chan_sender).send_unbounded(cell).await?;
1268 Ok(())
1269 }
1270
1271 pub(super) fn ready_streams_iterator(&self) -> impl Stream<Item = Result<CircuitCmd>> {
1283 self.hops.ready_streams_iterator()
1284 }
1285
1286 pub(super) async fn congestion_signals(&mut self) -> CongestionSignals {
1290 futures::future::poll_fn(|cx| -> Poll<CongestionSignals> {
1291 Poll::Ready(CongestionSignals::new(
1292 self.chan_sender.poll_ready_unpin_bool(cx).unwrap_or(false),
1293 self.chan_sender.n_queued(),
1294 ))
1295 })
1296 .await
1297 }
1298
1299 pub(super) fn hop(&self, hopnum: HopNum) -> Option<&CircHop> {
1301 self.hops.hop(hopnum)
1302 }
1303
1304 pub(super) fn hop_mut(&mut self, hopnum: HopNum) -> Option<&mut CircHop> {
1306 self.hops.get_mut(hopnum)
1307 }
1308
1309 pub(super) fn begin_stream(
1311 &mut self,
1312 hop_num: HopNum,
1313 message: AnyRelayMsg,
1314 sender: StreamMpscSender<UnparsedRelayMsg>,
1315 rx: StreamMpscReceiver<AnyRelayMsg>,
1316 cmd_checker: AnyCmdChecker,
1317 ) -> StdResult<Result<(SendRelayCell, StreamId)>, Bug> {
1318 let Some(hop) = self.hop_mut(hop_num) else {
1319 return Err(internal!(
1320 "{}: Attempting to send a BEGIN cell to an unknown hop {hop_num:?}",
1321 self.unique_id,
1322 ));
1323 };
1324
1325 Ok(hop.begin_stream(message, sender, rx, cmd_checker))
1326 }
1327
1328 pub(super) async fn close_stream(
1330 &mut self,
1331 hop_num: HopNum,
1332 sid: StreamId,
1333 behav: CloseStreamBehavior,
1334 reason: streammap::TerminateReason,
1335 ) -> Result<()> {
1336 if let Some(hop) = self.hop_mut(hop_num) {
1337 let res = hop.close_stream(sid, behav, reason)?;
1338 if let Some(cell) = res {
1339 self.send_relay_cell(cell).await?;
1340 }
1341 }
1342 Ok(())
1343 }
1344
1345 pub(super) fn has_streams(&self) -> bool {
1351 self.hops.has_streams()
1352 }
1353
1354 pub(super) fn num_hops(&self) -> u8 {
1356 self.hops
1361 .len()
1362 .try_into()
1363 .expect("`hops.len()` has more than `u8::MAX` hops")
1364 }
1365
1366 pub(super) fn has_hops(&self) -> bool {
1368 !self.hops.is_empty()
1369 }
1370
1371 pub(super) fn last_hop_num(&self) -> Option<HopNum> {
1375 let num_hops = self.num_hops();
1376 if num_hops == 0 {
1377 return None;
1379 }
1380 Some(HopNum::from(num_hops - 1))
1381 }
1382
1383 pub(super) fn path(&self) -> Arc<path::Path> {
1387 self.mutable.path()
1388 }
1389
1390 pub(super) fn clock_skew(&self) -> ClockSkew {
1393 self.channel.clock_skew()
1394 }
1395
1396 pub(super) fn uses_stream_sendme(&self, hop: HopNum) -> Option<bool> {
1400 let hop = self.hop(hop)?;
1401 Some(hop.ccontrol().uses_stream_sendme())
1402 }
1403
1404 pub(super) fn is_conflux_pending(&self) -> bool {
1406 let Some(status) = self.conflux_status() else {
1407 return false;
1408 };
1409
1410 status != ConfluxStatus::Linked
1411 }
1412
1413 pub(super) fn conflux_status(&self) -> Option<ConfluxStatus> {
1417 cfg_if::cfg_if! {
1418 if #[cfg(feature = "conflux")] {
1419 self.conflux_handler
1420 .as_ref()
1421 .map(|handler| handler.status())
1422 } else {
1423 None
1424 }
1425 }
1426 }
1427
/// Return the initial RTT reported by the conflux handler.
///
/// Returns `None` if there is no handler or the handler has no value yet.
#[cfg(feature = "conflux")]
pub(super) fn init_rtt(&self) -> Option<Duration> {
    self.conflux_handler
        .as_ref()
        .and_then(|handler| handler.init_rtt())
}
1435}
1436
/// The conflux status of a circuit, as reported by its conflux handler.
///
/// `Circuit::is_conflux_pending` treats every status other than `Linked` as
/// "pending".
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub(super) enum ConfluxStatus {
    /// Not linked.
    ///
    /// NOTE(review): presumably the conflux handshake has not started yet —
    /// confirm in the conflux handler.
    Unlinked,
    /// NOTE(review): presumably the conflux handshake is in progress —
    /// confirm in the conflux handler.
    Pending,
    /// The circuit is linked; relay cells other than CONFLUX_LINK may be
    /// sent on it.
    Linked,
}
1447
1448fn msg_streamid(msg: &UnparsedRelayMsg) -> Result<Option<StreamId>> {
1452 let cmd = msg.cmd();
1453 let streamid = msg.stream_id();
1454 if !cmd.accepts_streamid_val(streamid) {
1455 return Err(Error::CircProto(format!(
1456 "Invalid stream ID {} for relay command {}",
1457 sv(StreamId::get_or_zero(streamid)),
1458 msg.cmd()
1459 )));
1460 }
1461
1462 Ok(streamid)
1463}
1464
1465impl Drop for Circuit {
1466 fn drop(&mut self) {
1467 let _ = self.channel.close_circuit(self.channel_id);
1468 }
1469}
1470
/// Build the list of circuit request extensions to send for a hop with the
/// given settings.
///
/// The list is empty unless congestion control is enabled in `params`.
/// When it is enabled:
///
///  * with the `flowctl-cc` feature, a default `CcRequest` extension is
///    added — but only in test builds; non-test builds panic instead;
///  * without the `flowctl-cc` feature, an internal error is returned.
///
/// # Panics
///
/// In non-test builds with `flowctl-cc`, panics if congestion control is
/// enabled.
// The `Result` is only ever an `Err` in some feature configurations.
#[allow(clippy::unnecessary_wraps)]
pub(super) fn circ_extensions_from_settings(params: &HopSettings) -> Result<Vec<CircRequestExt>> {
    // Only mutated when `flowctl-cc` is enabled.
    #[allow(unused_mut)]
    let mut client_extensions = Vec::new();

    if params.ccontrol.is_enabled() {
        cfg_if::cfg_if! {
            if #[cfg(feature = "flowctl-cc")] {
                // Deliberate hard stop outside of tests.
                #[cfg(not(test))]
                panic!("Congestion control is enabled on this circuit, but we don't yet support congestion control");

                // Unreachable in non-test builds because of the panic above.
                #[allow(unreachable_code)]
                client_extensions.push(CircRequestExt::CcRequest(CcRequest::default()));
            } else {
                return Err(
                    tor_error::internal!(
                        "Congestion control is enabled on this circuit, but 'flowctl-cc' feature is not enabled"
                    )
                    .into()
                );
            }
        }
    }

    Ok(client_extensions)
}