1
//! Code for talking directly (over a TLS connection) to a Tor client or relay.
2
//!
3
//! Channels form the basis of the rest of the Tor protocol: they are
4
//! the only way for two Tor instances to talk.
5
//!
6
//! Channels are not useful directly for application requests: after
7
//! making a channel, it needs to get used to build circuits, and the
8
//! circuits are used to anonymize streams.  The streams are the
9
//! objects corresponding to directory requests.
10
//!
11
//! In general, you shouldn't try to manage channels on your own;
12
//! however, there is no alternative in Arti today.  (A future
13
//! channel-manager library will probably fix that.)
14
//!
15
//! To launch a channel:
16
//!
17
//!  * Create a TLS connection as an object that implements AsyncRead
18
//!    + AsyncWrite, and pass it to a [ChannelBuilder].  This will
19
//!    yield a [handshake::OutboundClientHandshake] that represents
20
//!    the state of the handshake.
21
//!  * Call [handshake::OutboundClientHandshake::connect] on the result
22
//!    to negotiate the rest of the handshake.  This will verify
23
//!    syntactic correctness of the handshake, but not its cryptographic
24
//!    integrity.
25
//!  * Call [handshake::UnverifiedChannel::check] on the result.  This
26
//!    finishes the cryptographic checks.
27
//!  * Call [handshake::VerifiedChannel::finish] on the result. This
28
//!    completes the handshake and produces an open channel and Reactor.
29
//!  * Launch an asynchronous task to call the reactor's run() method.
30
//!
31
//! Once you have a running channel, you can create circuits on it with
32
//! its [Channel::new_circ] method.  See
33
//! [crate::circuit::PendingClientCirc] for information on how to
34
//! proceed from there.
35
//!
36
//! # Design
37
//!
38
//! For now, this code splits the channel into two pieces: a "Channel"
39
//! object that can be used by circuits to write cells onto the
40
//! channel, and a "Reactor" object that runs as a task in the
41
//! background, to read channel cells and pass them to circuits as
42
//! appropriate.
43
//!
44
//! I'm not at all sure that's the best way to do that, but it's what
45
//! I could think of.
46
//!
47
//! # Limitations
48
//!
49
//! This is client-only, and only supports link protocol version 4.
50
//!
51
//! TODO: There is no channel padding.
52
//!
53
//! TODO: There is no flow control, rate limiting, queueing, or
54
//! fairness.
55

            
56
/// The size of the channel buffer for communication between `Channel` and its reactor.
57
pub const CHANNEL_BUFFER_SIZE: usize = 128;
58

            
59
mod circmap;
60
mod codec;
61
mod handshake;
62
pub mod padding;
63
pub mod params;
64
mod reactor;
65
mod unique_id;
66

            
67
pub use crate::channel::params::*;
68
use crate::channel::reactor::{BoxedChannelSink, BoxedChannelStream, Reactor};
69
pub use crate::channel::unique_id::UniqId;
70
use crate::util::err::ChannelClosed;
71
use crate::util::ts::OptTimestamp;
72
use crate::{circuit, ClockSkew};
73
use crate::{Error, Result};
74
use safelog::sensitive as sv;
75
use std::pin::Pin;
76
use std::sync::{Mutex, MutexGuard};
77
use std::time::Duration;
78
use tor_cell::chancell::msg::AnyChanMsg;
79
use tor_cell::chancell::{msg, msg::PaddingNegotiate, AnyChanCell, CircId};
80
use tor_cell::chancell::{ChanCell, ChanMsg};
81
use tor_cell::restricted_msg;
82
use tor_error::internal;
83
use tor_linkspec::{HasRelayIds, OwnedChanTarget};
84
use tor_rtcompat::SleepProvider;
85

            
86
/// Imports that are re-exported pub if feature `testing` is enabled
87
///
88
/// Putting them together in a little module like this allows us to select the
89
/// visibility for all of these things together.
90
mod testing_exports {
91
    #![allow(unreachable_pub)]
92
    pub use super::reactor::CtrlMsg;
93
    pub use crate::circuit::celltypes::CreateResponse;
94
}
95
#[cfg(feature = "testing")]
96
pub use testing_exports::*;
97
#[cfg(not(feature = "testing"))]
98
use testing_exports::*;
99

            
100
use asynchronous_codec as futures_codec;
101
use futures::channel::mpsc;
102
use futures::io::{AsyncRead, AsyncWrite};
103
use tor_async_utils::oneshot;
104

            
105
use educe::Educe;
106
use futures::{FutureExt as _, Sink, SinkExt as _};
107
use std::result::Result as StdResult;
108
use std::sync::atomic::{AtomicBool, Ordering};
109
use std::sync::Arc;
110
use std::task::{Context, Poll};
111

            
112
use tracing::trace;
113

            
114
// reexport
115
use crate::channel::unique_id::CircUniqIdContext;
116
#[cfg(test)]
117
pub(crate) use codec::CodecError;
118
pub use handshake::{OutboundClientHandshake, UnverifiedChannel, VerifiedChannel};
119

            
120
// Declares `OpenChanMsgS2C` through the `restricted_msg!` macro.  The variants
// listed below are the complete allow-list of server-to-client messages on an
// open channel; the plain `//` comments record why the other message types
// are deliberately left out.
restricted_msg! {
    /// A channel message that we allow to be sent from a server to a client on
    /// an open channel.
    ///
    /// (An Open channel here is one on which we have received a NETINFO cell.)
    ///
    /// Note that an unexpected message type will _not_ be ignored: instead, it
    /// will cause the channel to shut down.
    #[derive(Clone, Debug)]
    pub(crate) enum OpenChanMsgS2C : ChanMsg {
        Padding,
        Vpadding,
        // Not Create*, since we are not a relay.
        // Not Created, since we never send CREATE.
        CreatedFast,
        Created2,
        Relay,
        // Not RelayEarly, since we are a client.
        Destroy,
        // Not PaddingNegotiate, since we are not a relay.
        // Not Versions, Certs, AuthChallenge, Authenticate: they are for handshakes.
        // Not Authorize: it is reserved, but unused.
    }
}
144

            
145
/// A channel cell that we allow to be sent on an open channel from
146
/// a server to a client.
147
pub(crate) type OpenChanCellS2C = ChanCell<OpenChanMsgS2C>;
148

            
149
/// Type alias: A Sink and Stream that transforms a TLS connection into
150
/// a cell-based communication mechanism.
151
type CellFrame<T> =
152
    futures_codec::Framed<T, crate::channel::codec::ChannelCodec<OpenChanMsgS2C, AnyChanMsg>>;
153

            
154
/// An open client channel, ready to send and receive Tor cells.
155
///
156
/// A channel is a direct connection to a Tor relay, implemented using TLS.
157
///
158
/// This struct is a frontend that can be used to send cells (using the
159
/// `Sink<ChanCell>` impl and otherwise control the channel.  The main state is
160
/// in the Reactor object. `Channel` is cheap to clone.
161
///
162
/// (Users need a mutable reference because of the types in `Sink`, and
163
/// ultimately because `cell_tx: mpsc::Sender` doesn't work without mut.
164
///
165
/// # Channel life cycle
166
///
167
/// Channels can be created directly here through the [`ChannelBuilder`] API.
168
/// For a higher-level API (with better support for TLS, pluggable transports,
169
/// and channel reuse) see the `tor-chanmgr` crate.
170
///
171
/// After a channel is created, it will persist until it is closed in one of
172
/// four ways:
173
///    1. A remote error occurs.
174
///    2. The other side of the channel closes the channel.
175
///    3. Someone calls [`Channel::terminate`] on the channel.
176
///    4. The last reference to the `Channel` is dropped. (Note that every circuit
177
///       on a `Channel` keeps a reference to it, which will in turn keep the
178
///       channel from closing until all those circuits have gone away.)
179
///
180
/// Note that in cases 1-3, the [`Channel`] object itself will still exist: it
181
/// will just be unusable for most purposes.  Most operations on it will fail
182
/// with an error.
183
333
#[derive(Clone, Debug)]
184
pub struct Channel {
185
    /// A channel used to send control messages to the Reactor.
186
    control: mpsc::UnboundedSender<CtrlMsg>,
187
    /// A channel used to send cells to the Reactor.
188
    cell_tx: mpsc::Sender<AnyChanCell>,
189
    /// Information shared with the reactor
190
    details: Arc<ChannelDetails>,
191
}
192

            
193
/// This is information shared between the reactor and the frontend (`Channel` object).
194
///
195
/// This exists to make `Channel` cheap to clone, which is desirable because every circuit wants
196
/// an owned mutable `Channel`.
197
///
198
/// `control` can't be here because we rely on it getting dropped when the last user goes away.
199
#[derive(Debug)]
200
pub(crate) struct ChannelDetails {
201
    /// A unique identifier for this channel.
202
    unique_id: UniqId,
203
    /// Validated identity and address information for this peer.
204
    peer_id: OwnedChanTarget,
205
    /// If true, this channel is closing.
206
    closed: AtomicBool,
207
    /// A receiver that will get a Cancelled event when the reactor is finally
208
    /// dropped.
209
    #[cfg_attr(not(feature = "experimental-api"), allow(dead_code))]
210
    reactor_closed_rx: futures::future::Shared<oneshot::Receiver<void::Void>>,
211
    /// Since when the channel became unused.
212
    ///
213
    /// If calling `time_since_update` returns None,
214
    /// this channel is still in use by at least one circuit.
215
    unused_since: OptTimestamp,
216
    /// The declared clock skew on this channel, at the time when this channel was
217
    /// created.
218
    clock_skew: ClockSkew,
219
    /// The time when this channel was successfully completed
220
    opened_at: coarsetime::Instant,
221
    /// Mutable state used by the `Channel` (frontend)
222
    ///
223
    /// The reactor (hot code) ought to avoid acquiring this lock.
224
    /// (It doesn't currently have a usable reference to it.)
225
    mutable: Mutex<MutableDetails>,
226
}
227

            
228
/// Mutable details (state) used by the `Channel` (frontend)
229
14
#[derive(Debug, Default)]
230
struct MutableDetails {
231
    /// State used to control padding
232
    padding: PaddingControlState,
233
}
234

            
235
/// State used to control padding
236
///
237
/// We store this here because:
238
///
239
///  1. It must be per-channel, because it depends on channel usage.  So it can't be in
240
///     (for example) `ChannelPaddingInstructionsUpdate`.
241
///
242
///  2. It could be in the channel manager's per-channel state but (for code flow reasons
243
///     there, really) at the point at which the channel manager concludes for a pending
244
///     channel that it ought to update the usage, it has relinquished the lock on its own data
245
///     structure.
246
///     And there is actually no need for this to be global: a per-channel lock is better than
247
///     reacquiring the global one.
248
///
249
///  3. It doesn't want to be in the channel reactor since that's super hot.
250
///
251
/// See also the overview at [`tor_proto::channel::padding`](padding)
252
14
#[derive(Debug, Educe)]
253
#[educe(Default)]
254
enum PaddingControlState {
255
    /// No usage of this channel, so far, implies sending or negotiating channel padding.
256
    ///
257
    /// This means we do not send (have not sent) any `ChannelPaddingInstructionsUpdates` to the reactor,
258
    /// with the following consequences:
259
    ///
260
    ///  * We don't enable our own padding.
261
    ///  * We don't do any work to change the timeout distribution in the padding timer,
262
    ///    (which is fine since this timer is not enabled).
263
    ///  * We don't send any PADDING_NEGOTIATE cells.  The peer is supposed to come to the
264
    ///    same conclusions as us, based on channel usage: it should also not send padding.
265
    #[educe(Default)]
266
    UsageDoesNotImplyPadding {
267
        /// The last padding parameters (from reparameterize)
268
        ///
269
        /// We keep this so that we can send it if and when
270
        /// this channel starts to be used in a way that implies (possibly) sending padding.
271
        padding_params: ChannelPaddingInstructionsUpdates,
272
    },
273

            
274
    /// Some usage of this channel implies possibly sending channel padding
275
    ///
276
    /// The required padding timer, negotiation cell, etc.,
277
    /// have been communicated to the reactor via a `CtrlMsg::ConfigUpdate`.
278
    ///
279
    /// Once we have set this variant, it remains this way forever for this channel,
280
    /// (the spec speaks of channels "only used for" certain purposes not getting padding).
281
    PaddingConfigured,
282
}
283

            
284
use PaddingControlState as PCS;
285

            
286
impl Sink<AnyChanCell> for Channel {
287
    type Error = Error;
288

            
289
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
290
        let this = self.get_mut();
291
        Pin::new(&mut this.cell_tx)
292
            .poll_ready(cx)
293
            .map_err(|_| ChannelClosed.into())
294
    }
295

            
296
357
    fn start_send(self: Pin<&mut Self>, cell: AnyChanCell) -> Result<()> {
297
357
        let this = self.get_mut();
298
357
        if this.details.closed.load(Ordering::SeqCst) {
299
            return Err(ChannelClosed.into());
300
357
        }
301
357
        this.check_cell(&cell)?;
302
        {
303
            use msg::AnyChanMsg::*;
304
357
            match cell.msg() {
305
168
                Relay(_) | Padding(_) | Vpadding(_) => {} // too frequent to log.
306
189
                _ => trace!(
307
                    "{}: Sending {} for {}",
308
                    this.details.unique_id,
309
                    cell.msg().cmd(),
310
                    CircId::get_or_zero(cell.circid())
311
                ),
312
            }
313
        }
314

            
315
357
        Pin::new(&mut this.cell_tx)
316
357
            .start_send(cell)
317
357
            .map_err(|_| ChannelClosed.into())
318
357
    }
319

            
320
1806
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
321
1806
        let this = self.get_mut();
322
1806
        Pin::new(&mut this.cell_tx)
323
1806
            .poll_flush(cx)
324
1806
            .map_err(|_| ChannelClosed.into())
325
1806
    }
326

            
327
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
328
        let this = self.get_mut();
329
        Pin::new(&mut this.cell_tx)
330
            .poll_close(cx)
331
            .map_err(|_| ChannelClosed.into())
332
    }
333
}
334

            
335
/// Structure for building and launching a Tor channel.
336
42
#[derive(Default)]
337
pub struct ChannelBuilder {
338
    /// If present, a description of the the address we're trying to connect to,
339
    /// and the way in which we are trying to connect to it.
340
    ///
341
    /// TODO: at some point, check this against the addresses in the netinfo
342
    /// cell too.
343
    target: Option<tor_linkspec::ChannelMethod>,
344
}
345

            
346
impl ChannelBuilder {
347
    /// Construct a new ChannelBuilder.
348
42
    pub fn new() -> Self {
349
42
        ChannelBuilder::default()
350
42
    }
351

            
352
    /// Set the declared target method of this channel to correspond to a direct
353
    /// connection to a given socket address.
354
    #[deprecated(note = "use set_declared_method instead", since = "0.7.1")]
355
    pub fn set_declared_addr(&mut self, target: std::net::SocketAddr) {
356
        self.set_declared_method(tor_linkspec::ChannelMethod::Direct(vec![target]));
357
    }
358

            
359
    /// Set the declared target method of this channel.
360
    ///
361
    /// Note that nothing enforces the correctness of this method: it
362
    /// doesn't have to match the real method used to create the TLS
363
    /// stream.
364
42
    pub fn set_declared_method(&mut self, target: tor_linkspec::ChannelMethod) {
365
42
        self.target = Some(target);
366
42
    }
367

            
368
    /// Launch a new client handshake over a TLS stream.
369
    ///
370
    /// After calling this function, you'll need to call `connect()` on
371
    /// the result to start the handshake.  If that succeeds, you'll have
372
    /// authentication info from the relay: call `check()` on the result
373
    /// to check that.  Finally, to finish the handshake, call `finish()`
374
    /// on the result of _that_.
375
14
    pub fn launch<T, S>(self, tls: T, sleep_prov: S) -> OutboundClientHandshake<T, S>
376
14
    where
377
14
        T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
378
14
        S: SleepProvider,
379
14
    {
380
14
        handshake::OutboundClientHandshake::new(tls, self.target, sleep_prov)
381
14
    }
382
}
383

            
384
impl Channel {
385
    /// Construct a channel and reactor.
386
    ///
387
    /// Internal method, called to finalize the channel when we've
388
    /// sent our netinfo cell, received the peer's netinfo cell, and
389
    /// we're finally ready to create circuits.
390
14
    fn new<S>(
391
14
        link_protocol: u16,
392
14
        sink: BoxedChannelSink,
393
14
        stream: BoxedChannelStream,
394
14
        unique_id: UniqId,
395
14
        peer_id: OwnedChanTarget,
396
14
        clock_skew: ClockSkew,
397
14
        sleep_prov: S,
398
14
    ) -> (Self, reactor::Reactor<S>)
399
14
    where
400
14
        S: SleepProvider,
401
14
    {
402
14
        use circmap::{CircIdRange, CircMap};
403
14
        let circmap = CircMap::new(CircIdRange::High);
404
14

            
405
14
        let (control_tx, control_rx) = mpsc::unbounded();
406
14
        let (cell_tx, cell_rx) = mpsc::channel(CHANNEL_BUFFER_SIZE);
407
14
        let closed = AtomicBool::new(false);
408
14
        let unused_since = OptTimestamp::new();
409
14
        unused_since.update();
410
14

            
411
14
        let mutable = MutableDetails::default();
412
14
        let (reactor_closed_tx, reactor_closed_rx) = oneshot::channel();
413
14
        let reactor_closed_rx = reactor_closed_rx.shared();
414
14

            
415
14
        let details = ChannelDetails {
416
14
            unique_id,
417
14
            peer_id,
418
14
            closed,
419
14
            unused_since,
420
14
            clock_skew,
421
14
            reactor_closed_rx,
422
14
            opened_at: coarsetime::Instant::now(),
423
14
            mutable: Mutex::new(mutable),
424
14
        };
425
14
        let details = Arc::new(details);
426
14

            
427
14
        let channel = Channel {
428
14
            control: control_tx,
429
14
            cell_tx,
430
14
            details: Arc::clone(&details),
431
14
        };
432
14

            
433
14
        // We start disabled; the channel manager will `reconfigure` us soon after creation.
434
14
        let padding_timer = Box::pin(padding::Timer::new_disabled(sleep_prov, None));
435
14

            
436
14
        let reactor = Reactor {
437
14
            control: control_rx,
438
14
            cells: cell_rx,
439
14
            reactor_closed_tx,
440
14
            input: futures::StreamExt::fuse(stream),
441
14
            output: sink,
442
14
            circs: circmap,
443
14
            circ_unique_id_ctx: CircUniqIdContext::new(),
444
14
            link_protocol,
445
14
            details,
446
14
            padding_timer,
447
14
            special_outgoing: Default::default(),
448
14
        };
449
14

            
450
14
        (channel, reactor)
451
14
    }
452

            
453
    /// Return a process-unique identifier for this channel.
454
    pub fn unique_id(&self) -> UniqId {
455
        self.details.unique_id
456
    }
457

            
458
    /// Return an OwnedChanTarget representing the actual handshake used to
459
    /// create this channel.
460
105
    pub fn target(&self) -> &OwnedChanTarget {
461
105
        &self.details.peer_id
462
105
    }
463

            
464
    /// Return the amount of time that has passed since this channel became open.
465
    pub fn age(&self) -> Duration {
466
        self.details.opened_at.elapsed().into()
467
    }
468

            
469
    /// Return a ClockSkew declaring how much clock skew the other side of this channel
470
    /// claimed that we had when we negotiated the connection.
471
84
    pub fn clock_skew(&self) -> ClockSkew {
472
84
        self.details.clock_skew
473
84
    }
474

            
475
    /// Send a control message
476
126
    fn send_control(&self, msg: CtrlMsg) -> StdResult<(), ChannelClosed> {
477
126
        self.control
478
126
            .unbounded_send(msg)
479
126
            .map_err(|_| ChannelClosed)?;
480
126
        Ok(())
481
126
    }
482

            
483
    /// Acquire the lock on `mutable` (and handle any poison error)
484
42
    fn mutable(&self) -> MutexGuard<MutableDetails> {
485
42
        self.details
486
42
            .mutable
487
42
            .lock()
488
42
            .expect("channel details poisoned")
489
42
    }
490

            
491
    /// Specify that this channel should do activities related to channel padding
492
    ///
493
    /// Initially, the channel does nothing related to channel padding:
494
    /// it neither sends any padding, nor sends any PADDING_NEGOTIATE cells.
495
    ///
496
    /// After this function has been called, it will do both,
497
    /// according to the parameters specified through `reparameterize`.
498
    /// Note that this might include *disabling* padding
499
    /// (for example, by sending a `PADDING_NEGOTIATE`).
500
    ///
501
    /// Idempotent.
502
    ///
503
    /// There is no way to undo the effect of this call.
504
42
    pub fn engage_padding_activities(&self) {
505
42
        let mut mutable = self.mutable();
506
42

            
507
42
        match &mutable.padding {
508
            PCS::UsageDoesNotImplyPadding {
509
21
                padding_params: params,
510
21
            } => {
511
21
                // Well, apparently the channel usage *does* imply padding now,
512
21
                // so we need to (belatedly) enable the timer,
513
21
                // send the padding negotiation cell, etc.
514
21
                let mut params = params.clone();
515
21

            
516
21
                // Except, maybe the padding we would be requesting is precisely default,
517
21
                // so we wouldn't actually want to send that cell.
518
21
                if params.padding_negotiate == Some(PaddingNegotiate::start_default()) {
519
                    params.padding_negotiate = None;
520
21
                }
521

            
522
21
                match self.send_control(CtrlMsg::ConfigUpdate(Arc::new(params))) {
523
21
                    Ok(()) => {}
524
                    Err(ChannelClosed) => return,
525
                }
526

            
527
21
                mutable.padding = PCS::PaddingConfigured;
528
            }
529

            
530
21
            PCS::PaddingConfigured => {
531
21
                // OK, nothing to do
532
21
            }
533
        }
534

            
535
42
        drop(mutable); // release the lock now: lock span covers the send, ensuring ordering
536
42
    }
537

            
538
    /// Reparameterise (update parameters; reconfigure)
539
    ///
540
    /// Returns `Err` if the channel was closed earlier
541
42
    pub fn reparameterize(&self, params: Arc<ChannelPaddingInstructionsUpdates>) -> Result<()> {
542
42
        let mut mutable = self
543
42
            .details
544
42
            .mutable
545
42
            .lock()
546
42
            .map_err(|_| internal!("channel details poisoned"))?;
547

            
548
42
        match &mut mutable.padding {
549
            PCS::PaddingConfigured => {
550
                self.send_control(CtrlMsg::ConfigUpdate(params))?;
551
            }
552
42
            PCS::UsageDoesNotImplyPadding { padding_params } => {
553
42
                padding_params.combine(&params);
554
42
            }
555
        }
556

            
557
42
        drop(mutable); // release the lock now: lock span covers the send, ensuring ordering
558
42
        Ok(())
559
42
    }
560

            
561
    /// Return an error if this channel is somehow mismatched with the
562
    /// given target.
563
133
    pub fn check_match<T: HasRelayIds + ?Sized>(&self, target: &T) -> Result<()> {
564
133
        check_id_match_helper(&self.details.peer_id, target)
565
133
    }
566

            
567
    /// Return true if this channel is closed and therefore unusable.
568
168
    pub fn is_closing(&self) -> bool {
569
168
        self.details.closed.load(Ordering::SeqCst)
570
168
    }
571

            
572
    /// If the channel is not in use, return the amount of time
573
    /// it has had with no circuits.
574
    ///
575
    /// Return `None` if the channel is currently in use.
576
    pub fn duration_unused(&self) -> Option<std::time::Duration> {
577
        self.details
578
            .unused_since
579
            .time_since_update()
580
            .map(Into::into)
581
    }
582

            
583
    /// Check whether a cell type is permissible to be _sent_ on an
584
    /// open client channel.
585
357
    fn check_cell(&self, cell: &AnyChanCell) -> Result<()> {
586
357
        use msg::AnyChanMsg::*;
587
357
        let msg = cell.msg();
588
357
        match msg {
589
            Created(_) | Created2(_) | CreatedFast(_) => Err(Error::from(internal!(
590
                "Can't send {} cell on client channel",
591
                msg.cmd()
592
            ))),
593
            Certs(_) | Versions(_) | Authenticate(_) | Authorize(_) | AuthChallenge(_)
594
            | Netinfo(_) => Err(Error::from(internal!(
595
                "Can't send {} cell after handshake is done",
596
                msg.cmd()
597
            ))),
598
357
            _ => Ok(()),
599
        }
600
357
    }
601

            
602
    /// Like `futures::Sink::poll_ready`.
603
2058
    pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Result<bool> {
604
2058
        Ok(match Pin::new(&mut self.cell_tx).poll_ready(cx) {
605
2058
            Poll::Ready(Ok(_)) => true,
606
            Poll::Ready(Err(_)) => return Err(Error::CircuitClosed),
607
            Poll::Pending => false,
608
        })
609
2058
    }
610

            
611
    /// Transmit a single cell on a channel.
612
    pub async fn send_cell(&mut self, cell: AnyChanCell) -> Result<()> {
613
        self.send(cell).await?;
614

            
615
        Ok(())
616
    }
617

            
618
    /// Return a newly allocated PendingClientCirc object with
619
    /// a corresponding circuit reactor. A circuit ID is allocated, but no
620
    /// messages are sent, and no cryptography is done.
621
    ///
622
    /// To use the results of this method, call Reactor::run() in a
623
    /// new task, then use the methods of
624
    /// [crate::circuit::PendingClientCirc] to build the circuit.
625
105
    pub async fn new_circ(
626
105
        &self,
627
110
    ) -> Result<(circuit::PendingClientCirc, circuit::reactor::Reactor)> {
628
95
        if self.is_closing() {
629
            return Err(ChannelClosed.into());
630
95
        }
631
95

            
632
95
        // TODO: blocking is risky, but so is unbounded.
633
95
        let (sender, receiver) = mpsc::channel(128);
634
95
        let (createdsender, createdreceiver) = oneshot::channel::<CreateResponse>();
635
95

            
636
95
        let (tx, rx) = oneshot::channel();
637
95
        self.send_control(CtrlMsg::AllocateCircuit {
638
95
            created_sender: createdsender,
639
95
            sender,
640
95
            tx,
641
95
        })?;
642
114
        let (id, circ_unique_id) = rx.await.map_err(|_| ChannelClosed)??;
643

            
644
        trace!("{}: Allocated CircId {}", circ_unique_id, id);
645

            
646
95
        Ok(circuit::PendingClientCirc::new(
647
95
            id,
648
95
            self.clone(),
649
95
            createdreceiver,
650
95
            receiver,
651
95
            circ_unique_id,
652
95
        ))
653
95
    }
654

            
655
    /// Shut down this channel immediately, along with all circuits that
656
    /// are using it.
657
    ///
658
    /// Note that other references to this channel may exist.  If they
659
    /// do, they will stop working after you call this function.
660
    ///
661
    /// It's not necessary to call this method if you're just done
662
    /// with a channel: the channel should close on its own once nothing
663
    /// is using it any more.
664
    pub fn terminate(&self) {
665
        let _ = self.send_control(CtrlMsg::Shutdown);
666
    }
667

            
668
    /// Tell the reactor that the circuit with the given ID has gone away.
669
    pub fn close_circuit(&self, circid: CircId) -> Result<()> {
670
        self.send_control(CtrlMsg::CloseCircuit(circid))?;
671
        Ok(())
672
    }
673

            
674
    /// Return a future that resolves once this channel has closed.
    ///
    /// Calling this method does not by itself _cause_ the channel to shut down.
    ///
    /// TODO: Perhaps this should return some kind of status indication instead
    /// of just ().
    #[cfg(feature = "experimental-api")]
    pub fn wait_for_close(&self) -> impl futures::Future<Output = ()> + Send + Sync + 'static {
        let closed = self.details.reactor_closed_rx.clone();
        closed.map(|_| ())
    }
684

            
685
    /// Make a new fake reactor-less channel.  For testing only, obviously.
    ///
    /// Returns the receiver end of the control message mpsc.
    ///
    /// Intended for external callers that want to test the behaviour
    /// of layers which include the logic in the channel frontend
    /// (`Channel` object methods).
    //
    // This differs from test::fake_channel as follows:
    //  * It returns the mpsc Receiver
    //  * It does not require explicit specification of details
    #[cfg(feature = "testing")]
    pub fn new_fake() -> (Channel, mpsc::UnboundedReceiver<CtrlMsg>) {
        let (control, control_recv) = mpsc::unbounded();
        // Only the sending half of the cell queue is kept.
        let (cell_tx, _) = mpsc::channel(CHANNEL_BUFFER_SIZE);
        let fake = Channel {
            control,
            cell_tx,
            details: fake_channel_details(),
        };
        (fake, control_recv)
    }
707
}
708

            
709
/// If there is any identity in `wanted_ident` that is not present in
710
/// `my_ident`, return a ChanMismatch error.
711
///
712
/// This is a helper for [`Channel::check_match`] and
713
/// [`UnverifiedChannel::check_internal`].
714
147
fn check_id_match_helper<T, U>(my_ident: &T, wanted_ident: &U) -> Result<()>
715
147
where
716
147
    T: HasRelayIds + ?Sized,
717
147
    U: HasRelayIds + ?Sized,
718
147
{
719
256
    for desired in wanted_ident.identities() {
720
256
        let id_type = desired.id_type();
721
256
        match my_ident.identity(id_type) {
722
256
            Some(actual) if actual == desired => {}
723
            Some(actual) => {
724
                return Err(Error::ChanMismatch(format!(
725
                    "Identity {} does not match target {}",
726
                    sv(actual),
727
                    sv(desired)
728
                )));
729
            }
730
            None => {
731
                return Err(Error::ChanMismatch(format!(
732
                    "Peer does not have {} identity",
733
                    id_type
734
                )))
735
            }
736
        }
737
    }
738
147
    Ok(())
739
147
}
740

            
741
impl HasRelayIds for Channel {
742
294
    fn identity(
743
294
        &self,
744
294
        key_type: tor_linkspec::RelayIdType,
745
294
    ) -> Option<tor_linkspec::RelayIdRef<'_>> {
746
294
        self.details.peer_id.identity(key_type)
747
294
    }
748
}
749

            
750
/// Build a placeholder `ChannelDetails` (for testing only!)
///
/// The peer gets fixed, made-up Ed25519 and RSA identities, the clock skew is
/// `ClockSkew::None`, and the channel is not flagged as closed.
#[cfg(any(test, feature = "testing"))]
fn fake_channel_details() -> Arc<ChannelDetails> {
    let unique_id = UniqId::new();
    let unused_since = OptTimestamp::new();
    let peer_id = OwnedChanTarget::builder()
        .ed_identity([6_u8; 32].into())
        .rsa_identity([10_u8; 20].into())
        .build()
        .expect("Couldn't construct peer id");
    // `_tx` is dropped when this function returns, which will make the
    // receiver trigger immediately.
    let (_tx, closed_rx) = oneshot::channel();

    let details = ChannelDetails {
        unique_id,
        peer_id,
        closed: AtomicBool::new(false),
        reactor_closed_rx: closed_rx.shared(),
        unused_since,
        clock_skew: ClockSkew::None,
        opened_at: coarsetime::Instant::now(),
        mutable: Default::default(),
    };
    Arc::new(details)
}
773

            
774
#[cfg(test)]
pub(crate) mod test {
    // The bulk of the channel logic is exercised by the reactor's own
    // tests; this module only covers the handful of cases those miss.
    #![allow(clippy::unwrap_used)]
    use super::*;
    use crate::channel::codec::test::MsgBuf;
    pub(crate) use crate::channel::reactor::test::new_reactor;
    use tor_cell::chancell::msg::HandshakeType;
    use tor_cell::chancell::{msg, AnyChanCell};
    use tor_rtcompat::PreferredRuntime;

    /// Wrap `details` in a channel that has no reactor attached.
    ///
    /// Only the first halves of the control and cell queues are kept.
    /// For testing only, obviously.
    pub(crate) fn fake_channel(details: Arc<ChannelDetails>) -> Channel {
        let control = mpsc::unbounded().0;
        let cell_tx = mpsc::channel(CHANNEL_BUFFER_SIZE).0;
        Channel {
            control,
            cell_tx,
            details,
        }
    }

    /// `check_cell` must reject cells that may not be sent here, and
    /// accept an ordinary CREATE2.
    #[test]
    fn send_bad() {
        tor_rtcompat::test_with_all_runtimes!(|_rt| async move {
            // Needed for `source()` on the returned errors.
            use std::error::Error as _;
            let chan = fake_channel(fake_channel_details());

            // A CREATED2 cell is refused.
            let created2 = AnyChanCell::new(
                CircId::new(7),
                msg::Created2::new(&b"hihi"[..]).into(),
            );
            let err = chan.check_cell(&created2).unwrap_err();
            let reason = err.source().unwrap().to_string();
            assert!(reason.contains("Can't send CREATED2 cell on client channel"));

            // A CERTS cell is refused as well.
            let certs = AnyChanCell::new(None, msg::Certs::new_empty().into());
            let err = chan.check_cell(&certs).unwrap_err();
            let reason = err.source().unwrap().to_string();
            assert!(reason.contains("Can't send CERTS cell after handshake is done"));

            // A CREATE2 cell passes the check.
            let create2 = AnyChanCell::new(
                CircId::new(5),
                msg::Create2::new(HandshakeType::NTOR, &b"abc"[..]).into(),
            );
            assert!(chan.check_cell(&create2).is_ok());
            // FIXME(eta): more difficult to test that sending works now that it has to go via reactor
            // let got = output.next().await.unwrap();
            // assert!(matches!(got.msg(), ChanMsg::Create2(_)));
        });
    }

    /// A builder with a declared method can launch a handshake over an
    /// (empty) in-memory stream.
    #[test]
    fn chanbuilder() {
        let rt = PreferredRuntime::create().unwrap();
        let addr = "127.0.0.1:9001".parse().unwrap();
        let method = tor_linkspec::ChannelMethod::Direct(vec![addr]);

        let mut builder = ChannelBuilder::default();
        builder.set_declared_method(method);

        let tls = MsgBuf::new(&b""[..]);
        let _outbound = builder.launch(tls, rt);
    }

    /// `check_match` accepts a target with the fake channel's own
    /// identities and rejects targets with different ones.
    #[test]
    fn check_match() {
        let chan = fake_channel(fake_channel_details());

        // Build a target whose identities are filled with the given bytes.
        let target = |ed: u8, rsa: u8| {
            OwnedChanTarget::builder()
                .ed_identity([ed; 32].into())
                .rsa_identity([rsa; 20].into())
                .build()
                .unwrap()
        };

        // Same bytes as `fake_channel_details` uses for its peer.
        assert!(chan.check_match(&target(6, 10)).is_ok());
        // Both identities differ from the peer's.
        assert!(chan.check_match(&target(1, 3)).is_err());
        assert!(chan.check_match(&target(3, 2)).is_err());
    }

    /// Two separately created fake channels get distinct unique IDs.
    #[test]
    fn unique_id() {
        let first = fake_channel(fake_channel_details());
        let second = fake_channel(fake_channel_details());
        assert_ne!(first.unique_id(), second.unique_id());
    }

    /// After `unused_since` is updated, the channel reports an unused
    /// duration.
    #[test]
    fn duration_unused_at() {
        let shared = fake_channel_details();
        let chan = fake_channel(Arc::clone(&shared));
        shared.unused_since.update();
        assert!(chan.duration_unused().is_some());
    }
}