1
//! Code to handle incoming cells on a channel.
2
//!
3
//! The role of this code is to run in a separate asynchronous task,
4
//! and routes cells to the right circuits.
5
//!
6
//! TODO: I have zero confidence in the close-and-cleanup behavior here,
7
//! or in the error handling behavior.
8

            
9
use super::circmap::{CircEnt, CircMap};
10
use super::OpenChanCellS2C;
11
use crate::channel::OpenChanMsgS2C;
12
use crate::tunnel::circuit::halfcirc::HalfCirc;
13
use crate::util::err::ReactorError;
14
use crate::util::oneshot_broadcast;
15
use crate::{Error, Result};
16
use tor_async_utils::SinkPrepareExt as _;
17
use tor_cell::chancell::msg::{Destroy, DestroyReason, PaddingNegotiate};
18
use tor_cell::chancell::ChanMsg;
19
use tor_cell::chancell::{msg::AnyChanMsg, AnyChanCell, CircId};
20
use tor_memquota::mq_queue;
21
use tor_rtcompat::SleepProvider;
22

            
23
#[cfg_attr(not(target_os = "linux"), allow(unused))]
24
use tor_error::error_report;
25
#[cfg_attr(not(target_os = "linux"), allow(unused))]
26
use tor_rtcompat::StreamOps;
27

            
28
use futures::channel::mpsc;
29
use oneshot_fused_workaround as oneshot;
30

            
31
use futures::sink::SinkExt;
32
use futures::stream::Stream;
33
use futures::Sink;
34
use futures::StreamExt as _;
35
use futures::{select, select_biased};
36
use tor_error::internal;
37

            
38
use std::fmt;
39
use std::pin::Pin;
40
use std::sync::Arc;
41

            
42
use crate::channel::{
43
    codec::CodecError, kist::KistParams, padding, params::*, unique_id, ChannelDetails, CloseInfo,
44
};
45
use crate::tunnel::circuit::{celltypes::CreateResponse, CircuitRxSender};
46
use tracing::{debug, trace};
47

            
48
/// A boxed trait object that can provide `ChanCell`s.
49
pub(super) type BoxedChannelStream = Box<
50
    dyn Stream<Item = std::result::Result<OpenChanCellS2C, CodecError>> + Send + Unpin + 'static,
51
>;
52
/// A boxed trait object that can sink `ChanCell`s.
53
pub(super) type BoxedChannelSink =
54
    Box<dyn Sink<AnyChanCell, Error = CodecError> + Send + Unpin + 'static>;
55
/// A boxed trait object that can provide additional `StreamOps` on a `BoxedChannelStream`.
56
pub(super) type BoxedChannelStreamOps = Box<dyn StreamOps + Send + Unpin + 'static>;
57
/// The type of a oneshot channel used to inform reactor users of the result of an operation.
58
pub(super) type ReactorResultChannel<T> = oneshot::Sender<Result<T>>;
59

            
60
/// Convert `err` to an Error, under the assumption that it's happening on an
61
/// open channel.
62
114
fn codec_err_to_chan(err: CodecError) -> Error {
63
114
    match err {
64
        CodecError::Io(e) => crate::Error::ChanIoErr(Arc::new(e)),
65
        CodecError::EncCell(err) => Error::from_cell_enc(err, "channel cell"),
66
114
        CodecError::DecCell(err) => Error::from_cell_dec(err, "channel cell"),
67
    }
68
114
}
69

            
70
/// A message telling the channel reactor to do something.
71
#[cfg_attr(docsrs, doc(cfg(feature = "testing")))]
72
#[derive(Debug)]
73
#[allow(unreachable_pub)] // Only `pub` with feature `testing`; otherwise, visible in crate
74
#[allow(clippy::exhaustive_enums)]
75
pub enum CtrlMsg {
76
    /// Shut down the reactor.
77
    Shutdown,
78
    /// Tell the reactor that a given circuit has gone away.
79
    CloseCircuit(CircId),
80
    /// Allocate a new circuit in this channel's circuit map, generating an ID for it
81
    /// and registering senders for messages received for the circuit.
82
    AllocateCircuit {
83
        /// Channel to send the circuit's `CreateResponse` down.
84
        created_sender: oneshot::Sender<CreateResponse>,
85
        /// Channel to send other messages from this circuit down.
86
        sender: CircuitRxSender,
87
        /// Oneshot channel to send the new circuit's identifiers down.
88
        tx: ReactorResultChannel<(CircId, crate::tunnel::circuit::UniqId)>,
89
    },
90
    /// Enable/disable/reconfigure channel padding
91
    ///
92
    /// The sender of these messages is responsible for the optimisation of
93
    /// ensuring that "no-change" messages are elided.
94
    /// (This is implemented in `ChannelsParamsUpdatesBuilder`.)
95
    ///
96
    /// These updates are done via a control message to avoid adding additional branches to the
97
    /// main reactor `select!`.
98
    ConfigUpdate(Arc<ChannelPaddingInstructionsUpdates>),
99
    /// Enable/disable/reconfigure KIST.
100
    ///
101
    /// Like in the case of `ConfigUpdate`,
102
    /// the sender of these messages is responsible for the optimisation of
103
    /// ensuring that "no-change" messages are elided.
104
    KistConfigUpdate(KistParams),
105
}
106

            
107
/// Object to handle incoming cells and background tasks on a channel.
108
///
109
/// This type is returned when you finish a channel; you need to spawn a
110
/// new task that calls `run()` on it.
111
#[must_use = "If you don't call run() on a reactor, the channel won't work."]
112
pub struct Reactor<S: SleepProvider> {
113
    /// A receiver for control messages from `Channel` objects.
114
    pub(super) control: mpsc::UnboundedReceiver<CtrlMsg>,
115
    /// A oneshot sender that is used to alert other tasks when this reactor is
116
    /// finally dropped.
117
    pub(super) reactor_closed_tx: oneshot_broadcast::Sender<Result<CloseInfo>>,
118
    /// A receiver for cells to be sent on this reactor's sink.
119
    ///
120
    /// `Channel` objects have a sender that can send cells here.
121
    pub(super) cells: mq_queue::Receiver<AnyChanCell, mq_queue::MpscSpec>,
122
    /// A Stream from which we can read `ChanCell`s.
123
    ///
124
    /// This should be backed by a TLS connection if you want it to be secure.
125
    pub(super) input: futures::stream::Fuse<BoxedChannelStream>,
126
    /// A Sink to which we can write `ChanCell`s.
127
    ///
128
    /// This should also be backed by a TLS connection if you want it to be secure.
129
    pub(super) output: BoxedChannelSink,
130
    /// A handler for setting stream options on the underlying stream.
131
    #[cfg_attr(not(target_os = "linux"), allow(unused))]
132
    pub(super) streamops: BoxedChannelStreamOps,
133
    /// Timer tracking when to generate channel padding
134
    pub(super) padding_timer: Pin<Box<padding::Timer<S>>>,
135
    /// Outgoing cells introduced at the channel reactor
136
    pub(super) special_outgoing: SpecialOutgoing,
137
    /// A map from circuit ID to Sinks on which we can deliver cells.
138
    pub(super) circs: CircMap,
139
    /// A unique identifier for this channel.
140
    pub(super) unique_id: super::UniqId,
141
    /// Information shared with the frontend
142
    pub(super) details: Arc<ChannelDetails>,
143
    /// Context for allocating unique circuit log identifiers.
144
    pub(super) circ_unique_id_ctx: unique_id::CircUniqIdContext,
145
    /// What link protocol is the channel using?
146
    #[allow(dead_code)] // We don't support protocols where this would matter
147
    pub(super) link_protocol: u16,
148
}
149

            
150
/// Outgoing cells introduced at the channel reactor
151
#[derive(Default, Debug, Clone)]
152
pub(super) struct SpecialOutgoing {
153
    /// If we must send a `PaddingNegotiate`
154
    pub(super) padding_negotiate: Option<PaddingNegotiate>,
155
}
156

            
157
impl SpecialOutgoing {
158
    /// Do we have a special cell to send?
159
    ///
160
    /// Called by the reactor before looking for cells from the reactor's clients.
161
    /// The returned message *must* be sent by the caller, not dropped!
162
    #[must_use = "SpecialOutgoing::next()'s return value must be actually sent"]
163
4574
    pub(super) fn next(&mut self) -> Option<AnyChanCell> {
164
        // If this gets more cases, consider making SpecialOutgoing into a #[repr(C)]
165
        // enum, so that we can fast-path the usual case of "no special message to send".
166
4574
        if let Some(p) = self.padding_negotiate.take() {
167
            return Some(p.into());
168
4574
        }
169
4574
        None
170
4574
    }
171
}
172

            
173
/// Allows us to just say debug!("{}: Reactor did a thing", &self, ...)
174
///
175
/// There is no risk of confusion because no-one would try to print a
176
/// Reactor for some other reason.
177
impl<S: SleepProvider> fmt::Display for Reactor<S> {
178
724
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
179
724
        fmt::Debug::fmt(&self.unique_id, f)
180
724
    }
181
}
182

            
183
impl<S: SleepProvider> Reactor<S> {
184
    /// Launch the reactor, and run until the channel closes or we
185
    /// encounter an error.
186
    ///
187
    /// Once this function returns, the channel is dead, and can't be
188
    /// used again.
189
290
    pub async fn run(mut self) -> Result<()> {
190
290
        trace!(channel_id = %self, "Running reactor");
191
290
        let result: Result<()> = loop {
192
4500
            match self.run_once().await {
193
4210
                Ok(()) => (),
194
168
                Err(ReactorError::Shutdown) => break Ok(()),
195
122
                Err(ReactorError::Err(e)) => break Err(e),
196
            }
197
        };
198
290
        debug!(channel_id = %self, "Reactor stopped");
199
        // Inform any waiters that the channel has closed.
200
290
        let close_msg = result.as_ref().map_err(Clone::clone).map(|()| CloseInfo);
201
290
        self.reactor_closed_tx.send(close_msg);
202
290
        result
203
290
    }
204

            
205
    /// Helper for run(): handles only one action.
206
4804
    async fn run_once(&mut self) -> std::result::Result<(), ReactorError> {
207
4804
        select! {
208

            
209
            // See if the output sink can have cells written to it yet.
210
            // If so, see if we have to-be-transmitted cells.
211
4804
            ret = self.output.prepare_send_from(async {
212
                // This runs if we will be able to write, so try to obtain a cell:
213

            
214
4574
                if let Some(l) = self.special_outgoing.next() {
215
                    // See reasoning below.
216
                    // eprintln!("PADDING - SENDING NEOGIATION: {:?}", &l);
217
                    self.padding_timer.as_mut().note_cell_sent();
218
                    return Some(l)
219
4574
                }
220
4574

            
221
4574
                select_biased! {
222
4574
                    n = self.cells.next() => {
223
                        // Note transmission on *input* to the reactor, not ultimate
224
                        // transmission.  Ideally we would tap into the TCP stream at the far
225
                        // end of our TLS or perhaps during encoding on entry to the TLS, but
226
                        // both of those would involve quite some plumbing.  Doing it here in
227
                        // the reactor avoids additional inter-task communication, mutexes,
228
                        // etc.  (And there is no real difference between doing it here on
229
                        // input, to just below, on enquieing into the `sendable`.)
230
                        //
231
                        // Padding is sent when the output channel is idle, and the effect of
232
                        // buffering is just that we might sent it a little early because we
233
                        // measure idleness when we last put something into the output layers.
234
                        //
235
                        // We can revisit this if measurement shows it to be bad in practice.
236
                        //
237
                        // (We in any case need padding that we generate when idle to make it
238
                        // through to the output promptly, or it will be late and ineffective.)
239
4206
                        self.padding_timer.as_mut().note_cell_sent();
240
4206
                        n
241
                    },
242
4574
                    p = self.padding_timer.as_mut().next() => {
243
                        // eprintln!("PADDING - SENDING PADDING: {:?}", &p);
244
                        Some(p.into())
245
                    },
246
                }
247
4804
            }) => {
248
4278
                let (msg, sendable) = ret.map_err(codec_err_to_chan)?;
249
4206
                let msg = msg.ok_or(ReactorError::Shutdown)?;
250
4154
                sendable.send(msg).map_err(codec_err_to_chan)?;
251
            }
252

            
253
4804
            ret = self.control.next() => {
254
138
                let ctrl = match ret {
255
56
                    None | Some(CtrlMsg::Shutdown) => return Err(ReactorError::Shutdown),
256
114
                    Some(x) => x,
257
114
                };
258
114
                self.handle_control(ctrl).await?;
259
            }
260

            
261
4804
            ret = self.input.next() => {
262
356
                let item = ret
263
356
                    .ok_or(ReactorError::Shutdown)?
264
288
                    .map_err(codec_err_to_chan)?;
265
288
                crate::note_incoming_traffic();
266
288
                self.handle_cell(item).await?;
267
            }
268

            
269
        }
270
4458
        Ok(()) // Run again.
271
4804
    }
272

            
273
    /// Handle a CtrlMsg other than Shutdown.
274
114
    async fn handle_control(&mut self, msg: CtrlMsg) -> Result<()> {
275
114
        trace!(
276
            channel_id = %self,
277
            msg = ?msg,
278
            "reactor received control message"
279
        );
280

            
281
114
        match msg {
282
            CtrlMsg::Shutdown => panic!(), // was handled in reactor loop.
283
106
            CtrlMsg::CloseCircuit(id) => self.outbound_destroy_circ(id).await?,
284
            CtrlMsg::AllocateCircuit {
285
8
                created_sender,
286
8
                sender,
287
8
                tx,
288
8
            } => {
289
8
                let mut rng = rand::rng();
290
8
                let my_unique_id = self.unique_id;
291
8
                let circ_unique_id = self.circ_unique_id_ctx.next(my_unique_id);
292
8
                let ret: Result<_> = self
293
8
                    .circs
294
8
                    .add_ent(&mut rng, created_sender, sender)
295
8
                    .map(|id| (id, circ_unique_id));
296
8
                let _ = tx.send(ret); // don't care about other side going away
297
8
                self.update_disused_since();
298
8
            }
299
            CtrlMsg::ConfigUpdate(updates) => {
300
                if self.link_protocol == 4 {
301
                    // Link protocol 4 does not permit sending, or negotiating, link padding.
302
                    // We test for == 4 so that future updates to handshake.rs LINK_PROTOCOLS
303
                    // keep doing padding things.
304
                    return Ok(());
305
                }
306

            
307
                let ChannelPaddingInstructionsUpdates {
308
                    // List all the fields explicitly; that way the compiler will warn us
309
                    // if one is added and we fail to handle it here.
310
                    padding_enable,
311
                    padding_parameters,
312
                    padding_negotiate,
313
                } = &*updates;
314
                if let Some(parameters) = padding_parameters {
315
                    self.padding_timer.as_mut().reconfigure(parameters)?;
316
                }
317
                if let Some(enable) = padding_enable {
318
                    if *enable {
319
                        self.padding_timer.as_mut().enable();
320
                    } else {
321
                        self.padding_timer.as_mut().disable();
322
                    }
323
                }
324
                if let Some(padding_negotiate) = padding_negotiate {
325
                    // This replaces any previous PADDING_NEGOTIATE cell that we were
326
                    // told to send, but which we didn't manage to send yet.
327
                    // It doesn't make sense to queue them up.
328
                    self.special_outgoing.padding_negotiate = Some(padding_negotiate.clone());
329
                }
330
            }
331
            CtrlMsg::KistConfigUpdate(kist) => self.apply_kist_params(&kist),
332
        }
333
74
        Ok(())
334
114
    }
335

            
336
    /// Helper: process a cell on a channel.  Most cell types get ignored
337
    /// or rejected; a few get delivered to circuits.
338
288
    async fn handle_cell(&mut self, cell: OpenChanCellS2C) -> Result<()> {
339
288
        let (circid, msg) = cell.into_circid_and_msg();
340
        use OpenChanMsgS2C::*;
341

            
342
288
        match msg {
343
240
            Relay(_) | Padding(_) | Vpadding(_) => {} // too frequent to log.
344
48
            _ => trace!(
345
                channel_id = %self,
346
                "received {} for {}",
347
                msg.cmd(),
348
                CircId::get_or_zero(circid)
349
            ),
350
        }
351

            
352
288
        match msg {
353
            // These are allowed, and need to be handled.
354
240
            Relay(_) => self.deliver_relay(circid, msg.into()).await,
355

            
356
32
            Destroy(_) => self.deliver_destroy(circid, msg.into()).await,
357

            
358
16
            CreatedFast(_) | Created2(_) => self.deliver_created(circid, msg.into()).await,
359

            
360
            // These are always ignored.
361
            Padding(_) | Vpadding(_) => Ok(()),
362
        }
363
288
    }
364

            
365
    /// Give the RELAY cell `msg` to the appropriate circuit.
366
240
    async fn deliver_relay(&mut self, circid: Option<CircId>, msg: AnyChanMsg) -> Result<()> {
367
240
        let Some(circid) = circid else {
368
            return Err(Error::ChanProto("Relay cell without circuit ID".into()));
369
        };
370

            
371
240
        let mut ent = self
372
240
            .circs
373
240
            .get_mut(circid)
374
240
            .ok_or_else(|| Error::ChanProto("Relay cell on nonexistent circuit".into()))?;
375

            
376
224
        match &mut *ent {
377
8
            CircEnt::Open(s) => {
378
8
                // There's an open circuit; we can give it the RELAY cell.
379
8
                if s.send(msg.try_into()?).await.is_err() {
380
                    drop(ent);
381
                    // The circuit's receiver went away, so we should destroy the circuit.
382
                    self.outbound_destroy_circ(circid).await?;
383
8
                }
384
8
                Ok(())
385
            }
386
8
            CircEnt::Opening(_, _) => Err(Error::ChanProto(
387
8
                "Relay cell on pending circuit before CREATED* received".into(),
388
8
            )),
389
208
            CircEnt::DestroySent(hs) => hs.receive_cell(),
390
        }
391
240
    }
392

            
393
    /// Handle a CREATED{,_FAST,2} cell by passing it on to the appropriate
394
    /// circuit, if that circuit is waiting for one.
395
16
    async fn deliver_created(&mut self, circid: Option<CircId>, msg: AnyChanMsg) -> Result<()> {
396
16
        let Some(circid) = circid else {
397
            return Err(Error::ChanProto("'Created' cell without circuit ID".into()));
398
        };
399

            
400
16
        let target = self.circs.advance_from_opening(circid)?;
401
        let created = msg.try_into()?;
402
        // TODO(nickm) I think that this one actually means the other side
403
        // is closed. See arti#269.
404
        target.send(created).map_err(|_| {
405
            Error::from(internal!(
406
                "Circuit queue rejected created message. Is it closing?"
407
            ))
408
        })
409
16
    }
410

            
411
    /// Handle a DESTROY cell by removing the corresponding circuit
412
    /// from the map, and passing the destroy cell onward to the circuit.
413
32
    async fn deliver_destroy(&mut self, circid: Option<CircId>, msg: AnyChanMsg) -> Result<()> {
414
32
        let Some(circid) = circid else {
415
            return Err(Error::ChanProto("'Destroy' cell without circuit ID".into()));
416
        };
417

            
418
        // Remove the circuit from the map: nothing more can be done with it.
419
32
        let entry = self.circs.remove(circid);
420
32
        self.update_disused_since();
421
24
        match entry {
422
            // If the circuit is waiting for CREATED, tell it that it
423
            // won't get one.
424
8
            Some(CircEnt::Opening(oneshot, _)) => {
425
8
                trace!(channel_id = %self, "Passing destroy to pending circuit {}", circid);
426
8
                oneshot
427
8
                    .send(msg.try_into()?)
428
                    // TODO(nickm) I think that this one actually means the other side
429
                    // is closed. See arti#269.
430
8
                    .map_err(|_| {
431
                        internal!("pending circuit wasn't interested in destroy cell?").into()
432
8
                    })
433
            }
434
            // It's an open circuit: tell it that it got a DESTROY cell.
435
8
            Some(CircEnt::Open(mut sink)) => {
436
8
                trace!(channel_id = %self, "Passing destroy to open circuit {}", circid);
437
8
                sink.send(msg.try_into()?)
438
8
                    .await
439
                    // TODO(nickm) I think that this one actually means the other side
440
                    // is closed. See arti#269.
441
8
                    .map_err(|_| {
442
                        internal!("open circuit wasn't interested in destroy cell?").into()
443
8
                    })
444
            }
445
            // We've sent a destroy; we can leave this circuit removed.
446
8
            Some(CircEnt::DestroySent(_)) => Ok(()),
447
            // Got a DESTROY cell for a circuit we don't have.
448
            None => {
449
8
                trace!(channel_id = %self, "Destroy for nonexistent circuit {}", circid);
450
8
                Err(Error::ChanProto("Destroy for nonexistent circuit".into()))
451
            }
452
        }
453
32
    }
454

            
455
    /// Helper: send a cell on the outbound sink.
456
106
    async fn send_cell(&mut self, cell: AnyChanCell) -> Result<()> {
457
106
        self.output.send(cell).await.map_err(codec_err_to_chan)?;
458
66
        Ok(())
459
106
    }
460

            
461
    /// Called when a circuit goes away: sends a DESTROY cell and removes
462
    /// the circuit.
463
106
    async fn outbound_destroy_circ(&mut self, id: CircId) -> Result<()> {
464
106
        trace!(channel_id = %self, "Circuit {} is gone; sending DESTROY", id);
465
        // Remove the circuit's entry from the map: nothing more
466
        // can be done with it.
467
        // TODO: It would be great to have a tighter upper bound for
468
        // the number of relay cells we'll receive.
469
106
        self.circs.destroy_sent(id, HalfCirc::new(3000));
470
106
        self.update_disused_since();
471
106
        let destroy = Destroy::new(DestroyReason::NONE).into();
472
106
        let cell = AnyChanCell::new(Some(id), destroy);
473
106
        self.send_cell(cell).await?;
474

            
475
66
        Ok(())
476
106
    }
477

            
478
    /// Update disused timestamp with current time if this channel is no longer used
479
146
    fn update_disused_since(&self) {
480
146
        if self.circs.open_ent_count() == 0 {
481
138
            // Update disused_since if it still indicates that the channel is in use
482
138
            self.details.unused_since.update_if_none();
483
138
        } else {
484
8
            // Mark this channel as in use
485
8
            self.details.unused_since.clear();
486
8
        }
487
146
    }
488

            
489
    /// Use the new KIST parameters.
490
    #[cfg(target_os = "linux")]
491
    fn apply_kist_params(&self, params: &KistParams) {
492
        use super::kist::KistMode;
493

            
494
        let set_tcp_notsent_lowat = |v: u32| {
495
            if let Err(e) = self.streamops.set_tcp_notsent_lowat(v) {
496
                // This is bad, but not fatal: not setting the KIST options
497
                // comes with a performance penalty, but we don't have to crash.
498
                error_report!(e, "Failed to set KIST socket options");
499
            }
500
        };
501

            
502
        match params.kist_enabled() {
503
            KistMode::TcpNotSentLowat => set_tcp_notsent_lowat(params.tcp_notsent_lowat()),
504
            KistMode::Disabled => set_tcp_notsent_lowat(u32::MAX),
505
        }
506
    }
507

            
508
    #[cfg(not(target_os = "linux"))]
509
    fn apply_kist_params(&self, params: &KistParams) {
510
        use super::kist::KistMode;
511

            
512
        if params.kist_enabled() != KistMode::Disabled {
513
            tracing::warn!("KIST not currently supported on non-linux platforms");
514
        }
515
    }
516
}
517

            
518
#[cfg(test)]
519
pub(crate) mod test {
520
    #![allow(clippy::unwrap_used)]
521
    use super::*;
522
    use crate::channel::{ClosedUnexpectedly, UniqId};
523
    use crate::fake_mpsc;
524
    use crate::tunnel::circuit::CircParameters;
525
    use crate::util::fake_mq;
526
    use futures::sink::SinkExt;
527
    use futures::stream::StreamExt;
528
    use futures::task::SpawnExt;
529
    use tor_cell::chancell::msg;
530
    use tor_linkspec::OwnedChanTarget;
531
    use tor_rtcompat::{NoOpStreamOpsHandle, Runtime};
532

            
533
    type CodecResult = std::result::Result<OpenChanCellS2C, CodecError>;
534

            
535
    pub(crate) fn new_reactor<R: Runtime>(
536
        runtime: R,
537
    ) -> (
538
        Arc<crate::channel::Channel>,
539
        Reactor<R>,
540
        mpsc::Receiver<AnyChanCell>,
541
        mpsc::Sender<CodecResult>,
542
    ) {
543
        let link_protocol = 4;
544
        let (send1, recv1) = mpsc::channel(32);
545
        let (send2, recv2) = mpsc::channel(32);
546
        let unique_id = UniqId::new();
547
        let dummy_target = OwnedChanTarget::builder()
548
            .ed_identity([6; 32].into())
549
            .rsa_identity([10; 20].into())
550
            .build()
551
            .unwrap();
552
        let send1 = send1.sink_map_err(|e| {
553
            trace!("got sink error: {:?}", e);
554
            CodecError::DecCell(tor_cell::Error::ChanProto("dummy message".into()))
555
        });
556
        let stream_ops = NoOpStreamOpsHandle::default();
557
        let (chan, reactor) = crate::channel::Channel::new(
558
            link_protocol,
559
            Box::new(send1),
560
            Box::new(recv2),
561
            Box::new(stream_ops),
562
            unique_id,
563
            dummy_target,
564
            crate::ClockSkew::None,
565
            runtime,
566
            fake_mq(),
567
        )
568
        .expect("channel create failed");
569
        (chan, reactor, recv1, send2)
570
    }
571

            
572
    // Try shutdown from inside run_once..
573
    #[test]
574
    fn shutdown() {
575
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
576
            let (chan, mut reactor, _output, _input) = new_reactor(rt);
577

            
578
            chan.terminate();
579
            let r = reactor.run_once().await;
580
            assert!(matches!(r, Err(ReactorError::Shutdown)));
581
        });
582
    }
583

            
584
    // Try shutdown while reactor is running.
585
    #[test]
586
    fn shutdown2() {
587
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
588
            // TODO: Ask a rust person if this is how to do this.
589

            
590
            use futures::future::FutureExt;
591
            use futures::join;
592

            
593
            let (chan, reactor, _output, _input) = new_reactor(rt);
594
            // Let's get the reactor running...
595
            let run_reactor = reactor.run().map(|x| x.is_ok()).shared();
596

            
597
            let rr = run_reactor.clone();
598

            
599
            let exit_then_check = async {
600
                assert!(rr.peek().is_none());
601
                // ... and terminate the channel while that's happening.
602
                chan.terminate();
603
            };
604

            
605
            let (rr_s, _) = join!(run_reactor, exit_then_check);
606

            
607
            // Now let's see. The reactor should not _still_ be running.
608
            assert!(rr_s);
609
        });
610
    }
611

            
612
    #[test]
613
    fn new_circ_closed() {
614
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
615
            let (chan, mut reactor, mut output, _input) = new_reactor(rt.clone());
616
            assert!(chan.duration_unused().is_some()); // unused yet
617

            
618
            let (ret, reac) = futures::join!(chan.new_circ(), reactor.run_once());
619
            let (pending, circr) = ret.unwrap();
620
            rt.spawn(async {
621
                let _ignore = circr.run().await;
622
            })
623
            .unwrap();
624
            assert!(reac.is_ok());
625

            
626
            let id = pending.peek_circid();
627

            
628
            let ent = reactor.circs.get_mut(id);
629
            assert!(matches!(*ent.unwrap(), CircEnt::Opening(_, _)));
630
            assert!(chan.duration_unused().is_none()); // in use
631

            
632
            // Now drop the circuit; this should tell the reactor to remove
633
            // the circuit from the map.
634
            drop(pending);
635

            
636
            reactor.run_once().await.unwrap();
637
            let ent = reactor.circs.get_mut(id);
638
            assert!(matches!(*ent.unwrap(), CircEnt::DestroySent(_)));
639
            let cell = output.next().await.unwrap();
640
            assert_eq!(cell.circid(), Some(id));
641
            assert!(matches!(cell.msg(), AnyChanMsg::Destroy(_)));
642
            assert!(chan.duration_unused().is_some()); // unused again
643
        });
644
    }
645

            
646
    // Test proper delivery of a created cell that doesn't make a channel
647
    #[test]
648
    #[ignore] // See bug #244: re-enable this test once it passes reliably.
649
    fn new_circ_create_failure() {
650
        use std::time::Duration;
651
        use tor_rtcompat::SleepProvider;
652

            
653
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
654
            let (chan, mut reactor, mut output, mut input) = new_reactor(rt.clone());
655

            
656
            let (ret, reac) = futures::join!(chan.new_circ(), reactor.run_once());
657
            let (pending, circr) = ret.unwrap();
658
            rt.spawn(async {
659
                let _ignore = circr.run().await;
660
            })
661
            .unwrap();
662
            assert!(reac.is_ok());
663

            
664
            let circparams = CircParameters::default();
665

            
666
            let id = pending.peek_circid();
667

            
668
            let ent = reactor.circs.get_mut(id);
669
            assert!(matches!(*ent.unwrap(), CircEnt::Opening(_, _)));
670

            
671
            #[allow(clippy::clone_on_copy)]
672
            let rtc = rt.clone();
673
            let send_response = async {
674
                rtc.sleep(Duration::from_millis(100)).await;
675
                trace!("sending createdfast");
676
                // We'll get a bad handshake result from this createdfast cell.
677
                let created_cell =
678
                    OpenChanCellS2C::new(Some(id), msg::CreatedFast::new(*b"x").into());
679
                input.send(Ok(created_cell)).await.unwrap();
680
                reactor.run_once().await.unwrap();
681
            };
682

            
683
            let (circ, _) = futures::join!(pending.create_firsthop_fast(circparams), send_response);
684
            // Make sure statuses are as expected.
685
            assert!(matches!(circ.err().unwrap(), Error::BadCircHandshakeAuth));
686

            
687
            reactor.run_once().await.unwrap();
688

            
689
            // Make sure that the createfast cell got sent
690
            let cell_sent = output.next().await.unwrap();
691
            assert!(matches!(cell_sent.msg(), msg::AnyChanMsg::CreateFast(_)));
692

            
693
            // But the next run if the reactor will make the circuit get closed.
694
            let ent = reactor.circs.get_mut(id);
695
            assert!(matches!(*ent.unwrap(), CircEnt::DestroySent(_)));
696
        });
697
    }
698

            
699
    // Try incoming cells that shouldn't arrive on channels.
700
    #[test]
701
    fn bad_cells() {
702
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
703
            let (_chan, mut reactor, _output, mut input) = new_reactor(rt);
704

            
705
            // shouldn't get created2 cells for nonexistent circuits
706
            let created2_cell = msg::Created2::new(*b"hihi").into();
707
            input
708
                .send(Ok(OpenChanCellS2C::new(CircId::new(7), created2_cell)))
709
                .await
710
                .unwrap();
711

            
712
            let e = reactor.run_once().await.unwrap_err().unwrap_err();
713
            assert_eq!(
714
                format!("{}", e),
715
                "Channel protocol violation: Unexpected CREATED* cell not on opening circuit"
716
            );
717

            
718
            // Can't get a relay cell on a circuit we've never heard of.
719
            let relay_cell = msg::Relay::new(b"abc").into();
720
            input
721
                .send(Ok(OpenChanCellS2C::new(CircId::new(4), relay_cell)))
722
                .await
723
                .unwrap();
724
            let e = reactor.run_once().await.unwrap_err().unwrap_err();
725
            assert_eq!(
726
                format!("{}", e),
727
                "Channel protocol violation: Relay cell on nonexistent circuit"
728
            );
729

            
730
            // There used to be tests here for other types, but now that we only
731
            // accept OpenClientChanCell, we know that the codec can't even try
732
            // to give us e.g. VERSIONS or CREATE.
733
        });
734
    }
735

            
736
    #[test]
737
    fn deliver_relay() {
738
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
739
            use crate::tunnel::circuit::celltypes::ClientCircChanMsg;
740
            use oneshot_fused_workaround as oneshot;
741

            
742
            let (_chan, mut reactor, _output, mut input) = new_reactor(rt);
743

            
744
            let (_circ_stream_7, mut circ_stream_13) = {
745
                let (snd1, _rcv1) = oneshot::channel();
746
                let (snd2, rcv2) = fake_mpsc(64);
747
                reactor
748
                    .circs
749
                    .put_unchecked(CircId::new(7).unwrap(), CircEnt::Opening(snd1, snd2));
750

            
751
                let (snd3, rcv3) = fake_mpsc(64);
752
                reactor
753
                    .circs
754
                    .put_unchecked(CircId::new(13).unwrap(), CircEnt::Open(snd3));
755

            
756
                reactor.circs.put_unchecked(
757
                    CircId::new(23).unwrap(),
758
                    CircEnt::DestroySent(HalfCirc::new(25)),
759
                );
760
                (rcv2, rcv3)
761
            };
762

            
763
            // If a relay cell is sent on an open channel, the correct circuit
764
            // should get it.
765
            let relaycell: OpenChanMsgS2C = msg::Relay::new(b"do you suppose").into();
766
            input
767
                .send(Ok(OpenChanCellS2C::new(CircId::new(13), relaycell.clone())))
768
                .await
769
                .unwrap();
770
            reactor.run_once().await.unwrap();
771
            let got = circ_stream_13.next().await.unwrap();
772
            assert!(matches!(got, ClientCircChanMsg::Relay(_)));
773

            
774
            // If a relay cell is sent on an opening channel, that's an error.
775
            input
776
                .send(Ok(OpenChanCellS2C::new(CircId::new(7), relaycell.clone())))
777
                .await
778
                .unwrap();
779
            let e = reactor.run_once().await.unwrap_err().unwrap_err();
780
            assert_eq!(
781
                format!("{}", e),
782
                "Channel protocol violation: Relay cell on pending circuit before CREATED* received"
783
            );
784

            
785
            // If a relay cell is sent on a non-existent channel, that's an error.
786
            input
787
                .send(Ok(OpenChanCellS2C::new(
788
                    CircId::new(101),
789
                    relaycell.clone(),
790
                )))
791
                .await
792
                .unwrap();
793
            let e = reactor.run_once().await.unwrap_err().unwrap_err();
794
            assert_eq!(
795
                format!("{}", e),
796
                "Channel protocol violation: Relay cell on nonexistent circuit"
797
            );
798

            
799
            // It's fine to get a relay cell on a DestroySent channel: that happens
800
            // when the other side hasn't noticed the Destroy yet.
801

            
802
            // We can do this 25 more times according to our setup:
803
            for _ in 0..25 {
804
                input
805
                    .send(Ok(OpenChanCellS2C::new(CircId::new(23), relaycell.clone())))
806
                    .await
807
                    .unwrap();
808
                reactor.run_once().await.unwrap(); // should be fine.
809
            }
810

            
811
            // This one will fail.
812
            input
813
                .send(Ok(OpenChanCellS2C::new(CircId::new(23), relaycell.clone())))
814
                .await
815
                .unwrap();
816
            let e = reactor.run_once().await.unwrap_err().unwrap_err();
817
            assert_eq!(
818
                format!("{}", e),
819
                "Channel protocol violation: Too many cells received on destroyed circuit"
820
            );
821
        });
822
    }
823

            
824
    #[test]
825
    fn deliver_destroy() {
826
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
827
            use crate::tunnel::circuit::celltypes::*;
828
            use oneshot_fused_workaround as oneshot;
829

            
830
            let (_chan, mut reactor, _output, mut input) = new_reactor(rt);
831

            
832
            let (circ_oneshot_7, mut circ_stream_13) = {
833
                let (snd1, rcv1) = oneshot::channel();
834
                let (snd2, _rcv2) = fake_mpsc(64);
835
                reactor
836
                    .circs
837
                    .put_unchecked(CircId::new(7).unwrap(), CircEnt::Opening(snd1, snd2));
838

            
839
                let (snd3, rcv3) = fake_mpsc(64);
840
                reactor
841
                    .circs
842
                    .put_unchecked(CircId::new(13).unwrap(), CircEnt::Open(snd3));
843

            
844
                reactor.circs.put_unchecked(
845
                    CircId::new(23).unwrap(),
846
                    CircEnt::DestroySent(HalfCirc::new(25)),
847
                );
848
                (rcv1, rcv3)
849
            };
850

            
851
            // Destroying an opening circuit is fine.
852
            let destroycell: OpenChanMsgS2C = msg::Destroy::new(0.into()).into();
853
            input
854
                .send(Ok(OpenChanCellS2C::new(
855
                    CircId::new(7),
856
                    destroycell.clone(),
857
                )))
858
                .await
859
                .unwrap();
860
            reactor.run_once().await.unwrap();
861
            let msg = circ_oneshot_7.await;
862
            assert!(matches!(msg, Ok(CreateResponse::Destroy(_))));
863

            
864
            // Destroying an open circuit is fine.
865
            input
866
                .send(Ok(OpenChanCellS2C::new(
867
                    CircId::new(13),
868
                    destroycell.clone(),
869
                )))
870
                .await
871
                .unwrap();
872
            reactor.run_once().await.unwrap();
873
            let msg = circ_stream_13.next().await.unwrap();
874
            assert!(matches!(msg, ClientCircChanMsg::Destroy(_)));
875

            
876
            // Destroying a DestroySent circuit is fine.
877
            input
878
                .send(Ok(OpenChanCellS2C::new(
879
                    CircId::new(23),
880
                    destroycell.clone(),
881
                )))
882
                .await
883
                .unwrap();
884
            reactor.run_once().await.unwrap();
885

            
886
            // Destroying a nonexistent circuit is an error.
887
            input
888
                .send(Ok(OpenChanCellS2C::new(
889
                    CircId::new(101),
890
                    destroycell.clone(),
891
                )))
892
                .await
893
                .unwrap();
894
            let e = reactor.run_once().await.unwrap_err().unwrap_err();
895
            assert_eq!(
896
                format!("{}", e),
897
                "Channel protocol violation: Destroy for nonexistent circuit"
898
            );
899
        });
900
    }
901

            
902
    #[test]
903
    fn closing_if_reactor_dropped() {
904
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
905
            let (chan, reactor, _output, _input) = new_reactor(rt);
906

            
907
            assert!(!chan.is_closing());
908
            drop(reactor);
909
            assert!(chan.is_closing());
910

            
911
            assert!(matches!(
912
                chan.wait_for_close().await,
913
                Err(ClosedUnexpectedly::ReactorDropped),
914
            ));
915
        });
916
    }
917

            
918
    #[test]
919
    fn closing_if_reactor_shutdown() {
920
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
921
            let (chan, reactor, _output, _input) = new_reactor(rt);
922

            
923
            assert!(!chan.is_closing());
924
            chan.terminate();
925
            assert!(!chan.is_closing());
926

            
927
            let r = reactor.run().await;
928
            assert!(r.is_ok());
929
            assert!(chan.is_closing());
930

            
931
            assert!(chan.wait_for_close().await.is_ok());
932
        });
933
    }
934

            
935
    #[test]
936
    fn reactor_error_wait_for_close() {
937
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
938
            let (chan, reactor, _output, mut input) = new_reactor(rt);
939

            
940
            // force an error by sending created2 cell for nonexistent circuit
941
            let created2_cell = msg::Created2::new(*b"hihi").into();
942
            input
943
                .send(Ok(OpenChanCellS2C::new(CircId::new(7), created2_cell)))
944
                .await
945
                .unwrap();
946

            
947
            // `reactor.run()` should return an error
948
            let run_error = reactor.run().await.unwrap_err();
949

            
950
            // `chan.wait_for_close()` should return the same error
951
            let Err(ClosedUnexpectedly::ReactorError(wait_error)) = chan.wait_for_close().await
952
            else {
953
                panic!("Expected a 'ReactorError'");
954
            };
955

            
956
            // `Error` doesn't implement `PartialEq`, so best we can do is to compare the strings
957
            assert_eq!(run_error.to_string(), wait_error.to_string());
958
        });
959
    }
960
}