tor_proto/channel.rs
1//! Code for talking directly (over a TLS connection) to a Tor client or relay.
2//!
3//! Channels form the basis of the rest of the Tor protocol: they are
4//! the only way for two Tor instances to talk.
5//!
6//! Channels are not useful directly for application requests: after
7//! making a channel, it needs to get used to build circuits, and the
8//! circuits are used to anonymize streams. The streams are the
9//! objects corresponding to directory requests.
10//!
11//! In general, you shouldn't try to manage channels on your own;
12//! use the `tor-chanmgr` crate instead.
13//!
14//! To launch a channel:
15//!
16//! * Create a TLS connection as an object that implements AsyncRead +
17//! AsyncWrite + StreamOps, and pass it to a [ChannelBuilder]. This will
18//! yield an [handshake::OutboundClientHandshake] that represents
19//! the state of the handshake.
20//! * Call [handshake::OutboundClientHandshake::connect] on the result
21//! to negotiate the rest of the handshake. This will verify
22//! syntactic correctness of the handshake, but not its cryptographic
23//! integrity.
24//! * Call [handshake::UnverifiedChannel::check] on the result. This
25//! finishes the cryptographic checks.
26//! * Call [handshake::VerifiedChannel::finish] on the result. This
27//! completes the handshake and produces an open channel and Reactor.
28//! * Launch an asynchronous task to call the reactor's run() method.
29//!
30//! Once you have a running channel, you can create circuits on it with
31//! its [Channel::new_circ] method. See
32//! [crate::tunnel::circuit::PendingClientCirc] for information on how to
33//! proceed from there.
34//!
35//! # Design
36//!
37//! For now, this code splits the channel into two pieces: a "Channel"
38//! object that can be used by circuits to write cells onto the
39//! channel, and a "Reactor" object that runs as a task in the
40//! background, to read channel cells and pass them to circuits as
41//! appropriate.
42//!
43//! I'm not at all sure that's the best way to do that, but it's what
44//! I could think of.
45//!
46//! # Limitations
47//!
48//! This is client-only, and only supports link protocol version 4.
49//!
50//! TODO: There is no channel padding.
51//!
52//! TODO: There is no flow control, rate limiting, queueing, or
53//! fairness.
54
/// Capacity of the cell queue that connects a `Channel` to its reactor.
pub const CHANNEL_BUFFER_SIZE: usize = 128;
57
58mod circmap;
59mod codec;
60mod handshake;
61pub mod kist;
62pub mod padding;
63pub mod params;
64mod reactor;
65mod unique_id;
66
67pub use crate::channel::params::*;
68use crate::channel::reactor::{BoxedChannelSink, BoxedChannelStream, Reactor};
69pub use crate::channel::unique_id::UniqId;
70use crate::memquota::{ChannelAccount, CircuitAccount, SpecificAccount as _};
71use crate::util::err::ChannelClosed;
72use crate::util::oneshot_broadcast;
73use crate::util::ts::AtomicOptTimestamp;
74use crate::{tunnel, tunnel::circuit, ClockSkew};
75use crate::{Error, Result};
76use reactor::BoxedChannelStreamOps;
77use safelog::sensitive as sv;
78use std::future::{Future, IntoFuture};
79use std::pin::Pin;
80use std::sync::{Mutex, MutexGuard};
81use std::time::Duration;
82use tor_cell::chancell::msg::AnyChanMsg;
83use tor_cell::chancell::{msg, msg::PaddingNegotiate, AnyChanCell, CircId};
84use tor_cell::chancell::{ChanCell, ChanMsg};
85use tor_cell::restricted_msg;
86use tor_error::internal;
87use tor_linkspec::{HasRelayIds, OwnedChanTarget};
88use tor_memquota::mq_queue::{self, ChannelSpec as _, MpscSpec};
89use tor_rtcompat::{CoarseTimeProvider, DynTimeProvider, SleepProvider, StreamOps};
90
91/// Imports that are re-exported pub if feature `testing` is enabled
92///
93/// Putting them together in a little module like this allows us to select the
94/// visibility for all of these things together.
95mod testing_exports {
96 #![allow(unreachable_pub)]
97 pub use super::reactor::CtrlMsg;
98 pub use crate::tunnel::circuit::celltypes::CreateResponse;
99}
100#[cfg(feature = "testing")]
101pub use testing_exports::*;
102#[cfg(not(feature = "testing"))]
103use testing_exports::*;
104
105use asynchronous_codec;
106use futures::channel::mpsc;
107use futures::io::{AsyncRead, AsyncWrite};
108use oneshot_fused_workaround as oneshot;
109
110use educe::Educe;
111use futures::{FutureExt as _, Sink};
112use std::result::Result as StdResult;
113use std::sync::Arc;
114use std::task::{Context, Poll};
115
116use tracing::trace;
117
118// reexport
119use crate::channel::unique_id::CircUniqIdContext;
120#[cfg(test)]
121pub(crate) use codec::CodecError;
122pub use handshake::{OutboundClientHandshake, UnverifiedChannel, VerifiedChannel};
123
124use kist::KistParams;
125
126restricted_msg! {
127 /// A channel message that we allow to be sent from a server to a client on
128 /// an open channel.
129 ///
130 /// (An Open channel here is one on which we have received a NETINFO cell.)
131 ///
132 /// Note that an unexpected message type will _not_ be ignored: instead, it
133 /// will cause the channel to shut down.
134 #[derive(Clone, Debug)]
135 pub(crate) enum OpenChanMsgS2C : ChanMsg {
136 Padding,
137 Vpadding,
138 // Not Create*, since we are not a relay.
139 // Not Created, since we never send CREATE.
140 CreatedFast,
141 Created2,
142 Relay,
143 // Not RelayEarly, since we are a client.
144 Destroy,
145 // Not PaddingNegotiate, since we are not a relay.
146 // Not Versions, Certs, AuthChallenge, Authenticate: they are for handshakes.
147 // Not Authorize: it is reserved, but unused.
148 }
149}
150
151/// A channel cell that we allot to be sent on an open channel from
152/// a server to a client.
153pub(crate) type OpenChanCellS2C = ChanCell<OpenChanMsgS2C>;
154
155/// Type alias: A Sink and Stream that transforms a TLS connection into
156/// a cell-based communication mechanism.
157type CellFrame<T> =
158 asynchronous_codec::Framed<T, crate::channel::codec::ChannelCodec<OpenChanMsgS2C, AnyChanMsg>>;
159
160/// An open client channel, ready to send and receive Tor cells.
161///
162/// A channel is a direct connection to a Tor relay, implemented using TLS.
163///
164/// This struct is a frontend that can be used to send cells
165/// and otherwise control the channel. The main state is
166/// in the Reactor object.
167///
168/// (Users need a mutable reference because of the types in `Sink`, and
169/// ultimately because `cell_tx: mpsc::Sender` doesn't work without mut.
170///
171/// # Channel life cycle
172///
173/// Channels can be created directly here through the [`ChannelBuilder`] API.
174/// For a higher-level API (with better support for TLS, pluggable transports,
175/// and channel reuse) see the `tor-chanmgr` crate.
176///
177/// After a channel is created, it will persist until it is closed in one of
178/// four ways:
179/// 1. A remote error occurs.
180/// 2. The other side of the channel closes the channel.
181/// 3. Someone calls [`Channel::terminate`] on the channel.
182/// 4. The last reference to the `Channel` is dropped. (Note that every circuit
183/// on a `Channel` keeps a reference to it, which will in turn keep the
184/// channel from closing until all those circuits have gone away.)
185///
186/// Note that in cases 1-3, the [`Channel`] object itself will still exist: it
187/// will just be unusable for most purposes. Most operations on it will fail
188/// with an error.
189#[derive(Debug)]
190pub struct Channel {
191 /// A channel used to send control messages to the Reactor.
192 control: mpsc::UnboundedSender<CtrlMsg>,
193 /// A channel used to send cells to the Reactor.
194 cell_tx: mq_queue::Sender<AnyChanCell, mq_queue::MpscSpec>,
195
196 /// A receiver that indicates whether the channel is closed.
197 ///
198 /// Awaiting will return a `CancelledError` event when the reactor is dropped.
199 /// Read to decide if operations may succeed, and is returned by `wait_for_close`.
200 reactor_closed_rx: oneshot_broadcast::Receiver<Result<CloseInfo>>,
201
202 /// A unique identifier for this channel.
203 unique_id: UniqId,
204 /// Validated identity and address information for this peer.
205 peer_id: OwnedChanTarget,
206 /// The declared clock skew on this channel, at the time when this channel was
207 /// created.
208 clock_skew: ClockSkew,
209 /// The time when this channel was successfully completed
210 opened_at: coarsetime::Instant,
211 /// Mutable state used by the `Channel.
212 mutable: Mutex<MutableDetails>,
213
214 /// Information shared with the reactor
215 details: Arc<ChannelDetails>,
216}
217
218/// This is information shared between the reactor and the frontend (`Channel` object).
219///
220/// `control` can't be here because we rely on it getting dropped when the last user goes away.
221#[derive(Debug)]
222pub(crate) struct ChannelDetails {
223 /// Since when the channel became unused.
224 ///
225 /// If calling `time_since_update` returns None,
226 /// this channel is still in use by at least one circuit.
227 ///
228 /// Set by reactor when a circuit is added or removed.
229 /// Read from `Channel::duration_unused`.
230 unused_since: AtomicOptTimestamp,
231 /// Memory quota account
232 ///
233 /// This is here partly because we need to ensure it lives as long as the channel,
234 /// as otherwise the memquota system will tear the account down.
235 #[allow(dead_code)]
236 memquota: ChannelAccount,
237}
238
239/// Mutable details (state) used by the `Channel` (frontend)
240#[derive(Debug, Default)]
241struct MutableDetails {
242 /// State used to control padding
243 padding: PaddingControlState,
244}
245
246/// State used to control padding
247///
248/// We store this here because:
249///
250/// 1. It must be per-channel, because it depends on channel usage. So it can't be in
251/// (for example) `ChannelPaddingInstructionsUpdate`.
252///
253/// 2. It could be in the channel manager's per-channel state but (for code flow reasons
254/// there, really) at the point at which the channel manager concludes for a pending
255/// channel that it ought to update the usage, it has relinquished the lock on its own data
256/// structure.
257/// And there is actually no need for this to be global: a per-channel lock is better than
258/// reacquiring the global one.
259///
260/// 3. It doesn't want to be in the channel reactor since that's super hot.
261///
262/// See also the overview at [`tor_proto::channel::padding`](padding)
263#[derive(Debug, Educe)]
264#[educe(Default)]
265enum PaddingControlState {
266 /// No usage of this channel, so far, implies sending or negotiating channel padding.
267 ///
268 /// This means we do not send (have not sent) any `ChannelPaddingInstructionsUpdates` to the reactor,
269 /// with the following consequences:
270 ///
271 /// * We don't enable our own padding.
272 /// * We don't do any work to change the timeout distribution in the padding timer,
273 /// (which is fine since this timer is not enabled).
274 /// * We don't send any PADDING_NEGOTIATE cells. The peer is supposed to come to the
275 /// same conclusions as us, based on channel usage: it should also not send padding.
276 #[educe(Default)]
277 UsageDoesNotImplyPadding {
278 /// The last padding parameters (from reparameterize)
279 ///
280 /// We keep this so that we can send it if and when
281 /// this channel starts to be used in a way that implies (possibly) sending padding.
282 padding_params: ChannelPaddingInstructionsUpdates,
283 },
284
285 /// Some usage of this channel implies possibly sending channel padding
286 ///
287 /// The required padding timer, negotiation cell, etc.,
288 /// have been communicated to the reactor via a `CtrlMsg::ConfigUpdate`.
289 ///
290 /// Once we have set this variant, it remains this way forever for this channel,
291 /// (the spec speaks of channels "only used for" certain purposes not getting padding).
292 PaddingConfigured,
293}
294
295use PaddingControlState as PCS;
296
297/// A handle to a [`Channel`]` that can be used, by circuits, to send channel cells.
298#[derive(Debug)]
299pub(crate) struct ChannelSender {
300 /// MPSC sender to send cells.
301 cell_tx: mq_queue::Sender<AnyChanCell, mq_queue::MpscSpec>,
302 /// A receiver used to check if the channel is closed.
303 reactor_closed_rx: oneshot_broadcast::Receiver<Result<CloseInfo>>,
304 /// Unique ID for this channel. For logging.
305 unique_id: UniqId,
306}
307
308impl ChannelSender {
309 /// Check whether a cell type is permissible to be _sent_ on an
310 /// open client channel.
311 fn check_cell(&self, cell: &AnyChanCell) -> Result<()> {
312 use msg::AnyChanMsg::*;
313 let msg = cell.msg();
314 match msg {
315 Created(_) | Created2(_) | CreatedFast(_) => Err(Error::from(internal!(
316 "Can't send {} cell on client channel",
317 msg.cmd()
318 ))),
319 Certs(_) | Versions(_) | Authenticate(_) | Authorize(_) | AuthChallenge(_)
320 | Netinfo(_) => Err(Error::from(internal!(
321 "Can't send {} cell after handshake is done",
322 msg.cmd()
323 ))),
324 _ => Ok(()),
325 }
326 }
327
328 /// Obtain a reference to the `ChannelSender`'s [`DynTimeProvider`]
329 ///
330 /// (This can sometimes be used to avoid having to keep
331 /// a separate clone of the time provider.)
332 pub(crate) fn time_provider(&self) -> &DynTimeProvider {
333 self.cell_tx.time_provider()
334 }
335}
336
337impl Sink<AnyChanCell> for ChannelSender {
338 type Error = Error;
339
340 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
341 let this = self.get_mut();
342 Pin::new(&mut this.cell_tx)
343 .poll_ready(cx)
344 .map_err(|_| ChannelClosed.into())
345 }
346
347 fn start_send(self: Pin<&mut Self>, cell: AnyChanCell) -> Result<()> {
348 let this = self.get_mut();
349 if this.reactor_closed_rx.is_ready() {
350 return Err(ChannelClosed.into());
351 }
352 this.check_cell(&cell)?;
353 {
354 use msg::AnyChanMsg::*;
355 match cell.msg() {
356 Relay(_) | Padding(_) | Vpadding(_) => {} // too frequent to log.
357 _ => trace!(
358 "{}: Sending {} for {}",
359 this.unique_id,
360 cell.msg().cmd(),
361 CircId::get_or_zero(cell.circid())
362 ),
363 }
364 }
365
366 Pin::new(&mut this.cell_tx)
367 .start_send(cell)
368 .map_err(|_| ChannelClosed.into())
369 }
370
371 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
372 let this = self.get_mut();
373 Pin::new(&mut this.cell_tx)
374 .poll_flush(cx)
375 .map_err(|_| ChannelClosed.into())
376 }
377
378 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
379 let this = self.get_mut();
380 Pin::new(&mut this.cell_tx)
381 .poll_close(cx)
382 .map_err(|_| ChannelClosed.into())
383 }
384}
385
386/// Structure for building and launching a Tor channel.
387#[derive(Default)]
388pub struct ChannelBuilder {
389 /// If present, a description of the address we're trying to connect to,
390 /// and the way in which we are trying to connect to it.
391 ///
392 /// TODO: at some point, check this against the addresses in the netinfo
393 /// cell too.
394 target: Option<tor_linkspec::ChannelMethod>,
395}
396
397impl ChannelBuilder {
398 /// Construct a new ChannelBuilder.
399 pub fn new() -> Self {
400 ChannelBuilder::default()
401 }
402
403 /// Set the declared target method of this channel to correspond to a direct
404 /// connection to a given socket address.
405 #[deprecated(note = "use set_declared_method instead", since = "0.7.1")]
406 pub fn set_declared_addr(&mut self, target: std::net::SocketAddr) {
407 self.set_declared_method(tor_linkspec::ChannelMethod::Direct(vec![target]));
408 }
409
410 /// Set the declared target method of this channel.
411 ///
412 /// Note that nothing enforces the correctness of this method: it
413 /// doesn't have to match the real method used to create the TLS
414 /// stream.
415 pub fn set_declared_method(&mut self, target: tor_linkspec::ChannelMethod) {
416 self.target = Some(target);
417 }
418
419 /// Launch a new client handshake over a TLS stream.
420 ///
421 /// After calling this function, you'll need to call `connect()` on
422 /// the result to start the handshake. If that succeeds, you'll have
423 /// authentication info from the relay: call `check()` on the result
424 /// to check that. Finally, to finish the handshake, call `finish()`
425 /// on the result of _that_.
426 pub fn launch<T, S>(
427 self,
428 tls: T,
429 sleep_prov: S,
430 memquota: ChannelAccount,
431 ) -> OutboundClientHandshake<T, S>
432 where
433 T: AsyncRead + AsyncWrite + StreamOps + Send + Unpin + 'static,
434 S: CoarseTimeProvider + SleepProvider,
435 {
436 handshake::OutboundClientHandshake::new(tls, self.target, sleep_prov, memquota)
437 }
438}
439
440impl Channel {
441 /// Construct a channel and reactor.
442 ///
443 /// Internal method, called to finalize the channel when we've
444 /// sent our netinfo cell, received the peer's netinfo cell, and
445 /// we're finally ready to create circuits.
446 #[allow(clippy::too_many_arguments)] // TODO consider if we want a builder
447 fn new<S>(
448 link_protocol: u16,
449 sink: BoxedChannelSink,
450 stream: BoxedChannelStream,
451 streamops: BoxedChannelStreamOps,
452 unique_id: UniqId,
453 peer_id: OwnedChanTarget,
454 clock_skew: ClockSkew,
455 sleep_prov: S,
456 memquota: ChannelAccount,
457 ) -> Result<(Arc<Self>, reactor::Reactor<S>)>
458 where
459 S: CoarseTimeProvider + SleepProvider,
460 {
461 use circmap::{CircIdRange, CircMap};
462 let circmap = CircMap::new(CircIdRange::High);
463 let dyn_time = DynTimeProvider::new(sleep_prov.clone());
464
465 let (control_tx, control_rx) = mpsc::unbounded();
466 let (cell_tx, cell_rx) = mq_queue::MpscSpec::new(CHANNEL_BUFFER_SIZE)
467 .new_mq(dyn_time.clone(), memquota.as_raw_account())?;
468 let unused_since = AtomicOptTimestamp::new();
469 unused_since.update();
470
471 let mutable = MutableDetails::default();
472 let (reactor_closed_tx, reactor_closed_rx) = oneshot_broadcast::channel();
473
474 let details = ChannelDetails {
475 unused_since,
476 memquota,
477 };
478 let details = Arc::new(details);
479
480 let channel = Arc::new(Channel {
481 control: control_tx,
482 cell_tx,
483 reactor_closed_rx,
484 unique_id,
485 peer_id,
486 clock_skew,
487 opened_at: coarsetime::Instant::now(),
488 mutable: Mutex::new(mutable),
489 details: Arc::clone(&details),
490 });
491
492 // We start disabled; the channel manager will `reconfigure` us soon after creation.
493 let padding_timer = Box::pin(padding::Timer::new_disabled(sleep_prov, None)?);
494
495 let reactor = Reactor {
496 control: control_rx,
497 cells: cell_rx,
498 reactor_closed_tx,
499 input: futures::StreamExt::fuse(stream),
500 output: sink,
501 streamops,
502 circs: circmap,
503 circ_unique_id_ctx: CircUniqIdContext::new(),
504 link_protocol,
505 unique_id,
506 details,
507 padding_timer,
508 special_outgoing: Default::default(),
509 };
510
511 Ok((channel, reactor))
512 }
513
514 /// Return a process-unique identifier for this channel.
515 pub fn unique_id(&self) -> UniqId {
516 self.unique_id
517 }
518
519 /// Return a reference to the memory tracking account for this Channel
520 pub fn mq_account(&self) -> &ChannelAccount {
521 &self.details.memquota
522 }
523
524 /// Obtain a reference to the `Channel`'s [`DynTimeProvider`]
525 ///
526 /// (This can sometimes be used to avoid having to keep
527 /// a separate clone of the time provider.)
528 pub fn time_provider(&self) -> &DynTimeProvider {
529 self.cell_tx.time_provider()
530 }
531
532 /// Return an OwnedChanTarget representing the actual handshake used to
533 /// create this channel.
534 pub fn target(&self) -> &OwnedChanTarget {
535 &self.peer_id
536 }
537
538 /// Return the amount of time that has passed since this channel became open.
539 pub fn age(&self) -> Duration {
540 self.opened_at.elapsed().into()
541 }
542
543 /// Return a ClockSkew declaring how much clock skew the other side of this channel
544 /// claimed that we had when we negotiated the connection.
545 pub fn clock_skew(&self) -> ClockSkew {
546 self.clock_skew
547 }
548
549 /// Send a control message
550 fn send_control(&self, msg: CtrlMsg) -> StdResult<(), ChannelClosed> {
551 self.control
552 .unbounded_send(msg)
553 .map_err(|_| ChannelClosed)?;
554 Ok(())
555 }
556
557 /// Acquire the lock on `mutable` (and handle any poison error)
558 fn mutable(&self) -> MutexGuard<MutableDetails> {
559 self.mutable.lock().expect("channel details poisoned")
560 }
561
562 /// Specify that this channel should do activities related to channel padding
563 ///
564 /// Initially, the channel does nothing related to channel padding:
565 /// it neither sends any padding, nor sends any PADDING_NEGOTIATE cells.
566 ///
567 /// After this function has been called, it will do both,
568 /// according to the parameters specified through `reparameterize`.
569 /// Note that this might include *disabling* padding
570 /// (for example, by sending a `PADDING_NEGOTIATE`).
571 ///
572 /// Idempotent.
573 ///
574 /// There is no way to undo the effect of this call.
575 pub fn engage_padding_activities(&self) {
576 let mut mutable = self.mutable();
577
578 match &mutable.padding {
579 PCS::UsageDoesNotImplyPadding {
580 padding_params: params,
581 } => {
582 // Well, apparently the channel usage *does* imply padding now,
583 // so we need to (belatedly) enable the timer,
584 // send the padding negotiation cell, etc.
585 let mut params = params.clone();
586
587 // Except, maybe the padding we would be requesting is precisely default,
588 // so we wouldn't actually want to send that cell.
589 if params.padding_negotiate == Some(PaddingNegotiate::start_default()) {
590 params.padding_negotiate = None;
591 }
592
593 match self.send_control(CtrlMsg::ConfigUpdate(Arc::new(params))) {
594 Ok(()) => {}
595 Err(ChannelClosed) => return,
596 }
597
598 mutable.padding = PCS::PaddingConfigured;
599 }
600
601 PCS::PaddingConfigured => {
602 // OK, nothing to do
603 }
604 }
605
606 drop(mutable); // release the lock now: lock span covers the send, ensuring ordering
607 }
608
609 /// Reparameterise (update parameters; reconfigure)
610 ///
611 /// Returns `Err` if the channel was closed earlier
612 pub fn reparameterize(&self, params: Arc<ChannelPaddingInstructionsUpdates>) -> Result<()> {
613 let mut mutable = self
614 .mutable
615 .lock()
616 .map_err(|_| internal!("channel details poisoned"))?;
617
618 match &mut mutable.padding {
619 PCS::PaddingConfigured => {
620 self.send_control(CtrlMsg::ConfigUpdate(params))?;
621 }
622 PCS::UsageDoesNotImplyPadding { padding_params } => {
623 padding_params.combine(¶ms);
624 }
625 }
626
627 drop(mutable); // release the lock now: lock span covers the send, ensuring ordering
628 Ok(())
629 }
630
631 /// Update the KIST parameters.
632 ///
633 /// Returns `Err` if the channel is closed.
634 pub fn reparameterize_kist(&self, kist_params: KistParams) -> Result<()> {
635 Ok(self.send_control(CtrlMsg::KistConfigUpdate(kist_params))?)
636 }
637
638 /// Return an error if this channel is somehow mismatched with the
639 /// given target.
640 pub fn check_match<T: HasRelayIds + ?Sized>(&self, target: &T) -> Result<()> {
641 check_id_match_helper(&self.peer_id, target)
642 }
643
644 /// Return true if this channel is closed and therefore unusable.
645 pub fn is_closing(&self) -> bool {
646 self.reactor_closed_rx.is_ready()
647 }
648
649 /// If the channel is not in use, return the amount of time
650 /// it has had with no circuits.
651 ///
652 /// Return `None` if the channel is currently in use.
653 pub fn duration_unused(&self) -> Option<std::time::Duration> {
654 self.details
655 .unused_since
656 .time_since_update()
657 .map(Into::into)
658 }
659
660 /// Return a new [`ChannelSender`] to transmit cells on this channel.
661 pub(crate) fn sender(&self) -> ChannelSender {
662 ChannelSender {
663 cell_tx: self.cell_tx.clone(),
664 reactor_closed_rx: self.reactor_closed_rx.clone(),
665 unique_id: self.unique_id,
666 }
667 }
668
669 /// Return a newly allocated PendingClientCirc object with
670 /// a corresponding circuit reactor. A circuit ID is allocated, but no
671 /// messages are sent, and no cryptography is done.
672 ///
673 /// To use the results of this method, call Reactor::run() in a
674 /// new task, then use the methods of
675 /// [crate::tunnel::circuit::PendingClientCirc] to build the circuit.
676 pub async fn new_circ(
677 self: &Arc<Self>,
678 ) -> Result<(circuit::PendingClientCirc, tunnel::reactor::Reactor)> {
679 if self.is_closing() {
680 return Err(ChannelClosed.into());
681 }
682
683 let time_prov = self.cell_tx.time_provider().clone();
684 let memquota = CircuitAccount::new(&self.details.memquota)?;
685
686 // TODO: blocking is risky, but so is unbounded.
687 let (sender, receiver) =
688 MpscSpec::new(128).new_mq(time_prov.clone(), memquota.as_raw_account())?;
689 let (createdsender, createdreceiver) = oneshot::channel::<CreateResponse>();
690
691 let (tx, rx) = oneshot::channel();
692 self.send_control(CtrlMsg::AllocateCircuit {
693 created_sender: createdsender,
694 sender,
695 tx,
696 })?;
697 let (id, circ_unique_id) = rx.await.map_err(|_| ChannelClosed)??;
698
699 trace!("{}: Allocated CircId {}", circ_unique_id, id);
700
701 Ok(circuit::PendingClientCirc::new(
702 id,
703 self.clone(),
704 createdreceiver,
705 receiver,
706 circ_unique_id,
707 time_prov,
708 memquota,
709 ))
710 }
711
712 /// Shut down this channel immediately, along with all circuits that
713 /// are using it.
714 ///
715 /// Note that other references to this channel may exist. If they
716 /// do, they will stop working after you call this function.
717 ///
718 /// It's not necessary to call this method if you're just done
719 /// with a channel: the channel should close on its own once nothing
720 /// is using it any more.
721 pub fn terminate(&self) {
722 let _ = self.send_control(CtrlMsg::Shutdown);
723 }
724
725 /// Tell the reactor that the circuit with the given ID has gone away.
726 pub fn close_circuit(&self, circid: CircId) -> Result<()> {
727 self.send_control(CtrlMsg::CloseCircuit(circid))?;
728 Ok(())
729 }
730
731 /// Return a future that will resolve once this channel has closed.
732 ///
733 /// Note that this method does not _cause_ the channel to shut down on its own.
734 pub fn wait_for_close(
735 &self,
736 ) -> impl Future<Output = StdResult<CloseInfo, ClosedUnexpectedly>> + Send + Sync + 'static
737 {
738 self.reactor_closed_rx
739 .clone()
740 .into_future()
741 .map(|recv| match recv {
742 Ok(Ok(info)) => Ok(info),
743 Ok(Err(e)) => Err(ClosedUnexpectedly::ReactorError(e)),
744 Err(oneshot_broadcast::SenderDropped) => Err(ClosedUnexpectedly::ReactorDropped),
745 })
746 }
747
/// Make a new fake reactor-less channel.  For testing only, obviously.
///
/// Returns the receiver end of the control message mpsc.
///
/// Suitable for external callers who want to test behaviour
/// of layers including the logic in the channel frontend
/// (`Channel` object methods).
//
// This differs from test::fake_channel as follows:
//  * It returns the mpsc Receiver
//  * It does not require explicit specification of details
#[cfg(feature = "testing")]
pub fn new_fake() -> (Channel, mpsc::UnboundedReceiver<CtrlMsg>) {
    let (control, control_recv) = mpsc::unbounded();
    let details = fake_channel_details();
    let unique_id = UniqId::new();

    // Fixed, made-up identities for the fake peer.
    let peer_id = OwnedChanTarget::builder()
        .ed_identity([6_u8; 32].into())
        .rsa_identity([10_u8; 20].into())
        .build()
        .expect("Couldn't construct peer id");

    // This will make rx trigger immediately.
    let (_tx, reactor_closed_rx) = oneshot_broadcast::channel();

    // Only the sending half of the fake cell queue is kept.
    let (cell_tx, _) = fake_mpsc();

    let channel = Channel {
        control,
        cell_tx,
        reactor_closed_rx,
        unique_id,
        peer_id,
        clock_skew: ClockSkew::None,
        opened_at: coarsetime::Instant::now(),
        mutable: Default::default(),
        details,
    };
    (channel, control_recv)
}
787}
788
789/// If there is any identity in `wanted_ident` that is not present in
790/// `my_ident`, return a ChanMismatch error.
791///
792/// This is a helper for [`Channel::check_match`] and
793/// [`UnverifiedChannel::check_internal`].
794fn check_id_match_helper<T, U>(my_ident: &T, wanted_ident: &U) -> Result<()>
795where
796 T: HasRelayIds + ?Sized,
797 U: HasRelayIds + ?Sized,
798{
799 for desired in wanted_ident.identities() {
800 let id_type = desired.id_type();
801 match my_ident.identity(id_type) {
802 Some(actual) if actual == desired => {}
803 Some(actual) => {
804 return Err(Error::ChanMismatch(format!(
805 "Identity {} does not match target {}",
806 sv(actual),
807 sv(desired)
808 )));
809 }
810 None => {
811 return Err(Error::ChanMismatch(format!(
812 "Peer does not have {} identity",
813 id_type
814 )))
815 }
816 }
817 }
818 Ok(())
819}
820
821impl HasRelayIds for Channel {
822 fn identity(
823 &self,
824 key_type: tor_linkspec::RelayIdType,
825 ) -> Option<tor_linkspec::RelayIdRef<'_>> {
826 self.peer_id.identity(key_type)
827 }
828}
829
/// The status of a channel which was closed successfully.
///
/// **Note:** This carries no associated data for now,
/// but may be expanded in the future.
// There is no obvious info we'd want to return to waiters, but having this
// type keeps that option open without any backwards-incompatible changes.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct CloseInfo;
839
840/// The status of a channel which closed unexpectedly.
841#[derive(Clone, Debug, thiserror::Error)]
842#[non_exhaustive]
843pub enum ClosedUnexpectedly {
844 /// The channel reactor was dropped or panicked before completing.
845 #[error("channel reactor was dropped or panicked before completing")]
846 ReactorDropped,
847 /// The channel reactor had an internal error.
848 #[error("channel reactor had an internal error")]
849 ReactorError(Error),
850}
851
/// Build a fake `ChannelDetails` (for testing only!)
///
/// It has a freshly created `unused_since` timestamp and a fake memory quota account.
#[cfg(any(test, feature = "testing"))]
fn fake_channel_details() -> Arc<ChannelDetails> {
    let details = ChannelDetails {
        unused_since: AtomicOptTimestamp::new(),
        memquota: crate::util::fake_mq(),
    };
    Arc::new(details)
}
862
/// Make a fake MPSC queue for testing, of the type we use in Channels.
///
/// The queue is created with the same buffer size as a real channel's.
#[cfg(any(test, feature = "testing"))] // Used by Channel::new_fake which is also feature=testing
pub(crate) fn fake_mpsc() -> (
    mq_queue::Sender<AnyChanCell, mq_queue::MpscSpec>,
    mq_queue::Receiver<AnyChanCell, mq_queue::MpscSpec>,
) {
    let buffer_size = CHANNEL_BUFFER_SIZE;
    crate::fake_mpsc(buffer_size)
}
871
872#[cfg(test)]
873pub(crate) mod test {
874 // Most of this module is tested via tests that also check on the
875 // reactor code; there are just a few more cases to examine here.
876 #![allow(clippy::unwrap_used)]
877 use super::*;
878 use crate::channel::codec::test::MsgBuf;
879 pub(crate) use crate::channel::reactor::test::new_reactor;
880 use crate::util::fake_mq;
881 use tor_cell::chancell::msg::HandshakeType;
882 use tor_cell::chancell::{msg, AnyChanCell};
883 use tor_rtcompat::PreferredRuntime;
884
885 /// Make a new fake reactor-less channel. For testing only, obviously.
886 pub(crate) fn fake_channel(details: Arc<ChannelDetails>) -> Channel {
887 let unique_id = UniqId::new();
888 let peer_id = OwnedChanTarget::builder()
889 .ed_identity([6_u8; 32].into())
890 .rsa_identity([10_u8; 20].into())
891 .build()
892 .expect("Couldn't construct peer id");
893 // This will make rx trigger immediately.
894 let (_tx, rx) = oneshot_broadcast::channel();
895 Channel {
896 control: mpsc::unbounded().0,
897 cell_tx: fake_mpsc().0,
898 reactor_closed_rx: rx,
899 unique_id,
900 peer_id,
901 clock_skew: ClockSkew::None,
902 opened_at: coarsetime::Instant::now(),
903 mutable: Default::default(),
904 details,
905 }
906 }
907
/// `check_cell` must reject CREATED* and handshake cells, and accept CREATE2.
#[test]
fn send_bad() {
    tor_rtcompat::test_with_all_runtimes!(|_rt| async move {
        use std::error::Error;
        let chan = fake_channel(fake_channel_details());

        // A CREATED2 cell is never sent by a client.
        let created2 = AnyChanCell::new(CircId::new(7), msg::Created2::new(&b"hihi"[..]).into());
        let outcome = chan.sender().check_cell(&created2);
        assert!(outcome.is_err());
        let complaint = outcome.unwrap_err().source().unwrap().to_string();
        assert!(complaint.contains("Can't send CREATED2 cell on client channel"));

        // A CERTS cell only belongs to the handshake.
        let certs = AnyChanCell::new(None, msg::Certs::new_empty().into());
        let outcome = chan.sender().check_cell(&certs);
        assert!(outcome.is_err());
        let complaint = outcome.unwrap_err().source().unwrap().to_string();
        assert!(complaint.contains("Can't send CERTS cell after handshake is done"));

        // A CREATE2 cell is fine.
        let create2 = AnyChanCell::new(
            CircId::new(5),
            msg::Create2::new(HandshakeType::NTOR, &b"abc"[..]).into(),
        );
        let outcome = chan.sender().check_cell(&create2);
        assert!(outcome.is_ok());
        // FIXME(eta): more difficult to test that sending works now that it has to go via reactor
        // let got = output.next().await.unwrap();
        // assert!(matches!(got.msg(), ChanMsg::Create2(_)));
    });
}
936
/// A builder with a declared method can launch a handshake over a fake TLS stream.
#[test]
fn chanbuilder() {
    let rt = PreferredRuntime::create().unwrap();

    let addr: std::net::SocketAddr = "127.0.0.1:9001".parse().unwrap();
    let mut builder = ChannelBuilder::default();
    builder.set_declared_method(tor_linkspec::ChannelMethod::Direct(vec![addr]));

    let tls = MsgBuf::new(&b""[..]);
    let _outbound = builder.launch(tls, rt, fake_mq());
}
947
/// `check_match` accepts only a target whose identities equal the channel's.
#[test]
fn check_match() {
    let chan = fake_channel(fake_channel_details());

    // Build a target whose ed25519 and RSA identities are filled with the given bytes.
    let target = |ed: u8, rsa: u8| {
        OwnedChanTarget::builder()
            .ed_identity([ed; 32].into())
            .rsa_identity([rsa; 20].into())
            .build()
            .unwrap()
    };

    let t1 = target(6, 10); // the same identities as the fake channel
    let t2 = target(1, 3);
    let t3 = target(3, 2);

    assert!(chan.check_match(&t1).is_ok());
    assert!(chan.check_match(&t2).is_err());
    assert!(chan.check_match(&t3).is_err());
}
972
/// Two separately created channels must get different unique IDs.
#[test]
fn unique_id() {
    let first = fake_channel(fake_channel_details());
    let second = fake_channel(fake_channel_details());
    assert_ne!(first.unique_id(), second.unique_id());
}
979
/// After `unused_since` has been updated, the channel reports an unused duration.
#[test]
fn duration_unused_at() {
    let details = fake_channel_details();
    let chan = fake_channel(Arc::clone(&details));

    details.unused_since.update();

    assert!(chan.duration_unused().is_some());
}
987}