1
//! Declare DataStream, a type that wraps RawCellStream so as to be useful
2
//! for byte-oriented communication.
3

            
4
use crate::{Error, Result};
5
use static_assertions::assert_impl_all;
6
use tor_cell::relaycell::msg::EndReason;
7
use tor_cell::relaycell::{RelayCellFormat, RelayCmd};
8

            
9
use futures::io::{AsyncRead, AsyncWrite};
10
use futures::stream::StreamExt;
11
use futures::task::{Context, Poll};
12
use futures::{Future, Stream};
13
use pin_project::pin_project;
14
use postage::watch;
15

            
16
#[cfg(feature = "tokio")]
17
use tokio_crate::io::ReadBuf;
18
#[cfg(feature = "tokio")]
19
use tokio_crate::io::{AsyncRead as TokioAsyncRead, AsyncWrite as TokioAsyncWrite};
20
#[cfg(feature = "tokio")]
21
use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
22
use tor_cell::restricted_msg;
23

            
24
use std::fmt::Debug;
25
use std::io::Result as IoResult;
26
use std::num::NonZero;
27
use std::pin::Pin;
28
#[cfg(any(feature = "stream-ctrl", feature = "experimental-api"))]
29
use std::sync::Arc;
30
#[cfg(feature = "stream-ctrl")]
31
use std::sync::{Mutex, Weak};
32

            
33
use educe::Educe;
34

            
35
#[cfg(any(feature = "experimental-api", feature = "stream-ctrl"))]
36
use crate::tunnel::circuit::ClientCirc;
37

            
38
use crate::memquota::StreamAccount;
39
use crate::stream::xon_xoff::{BufferIsEmpty, XonXoffReader, XonXoffReaderCtrl};
40
use crate::stream::{StreamRateLimit, StreamReceiver};
41
use crate::tunnel::StreamTarget;
42
use crate::util::token_bucket::dynamic_writer::DynamicRateLimitedWriter;
43
use crate::util::token_bucket::writer::{RateLimitedWriter, RateLimitedWriterConfig};
44
use tor_basic_utils::skip_fmt;
45
use tor_cell::relaycell::msg::Data;
46
use tor_error::internal;
47
use tor_rtcompat::{CoarseTimeProvider, DynTimeProvider, SleepProvider};
48

            
49
use super::AnyCmdChecker;
50

            
51
/// A stream of [`RateLimitedWriterConfig`] used to update a [`DynamicRateLimitedWriter`].
///
/// Unfortunately we need to store the result of a [`StreamExt::map`] and [`StreamExt::fuse`] in
/// [`DataWriter`], which leaves us with this ugly type.
/// We use a type alias to make `DataWriter` a little nicer.
//
// NOTE(review): the plain-`fn`-pointer type here must stay in sync with the
// `rate_to_config as fn(_) -> _` cast in `DataWriter::new`; a closure would not
// have a nameable type, which is why a function pointer is used.
type RateConfigStream = futures::stream::Map<
    futures::stream::Fuse<watch::Receiver<StreamRateLimit>>,
    fn(StreamRateLimit) -> RateLimitedWriterConfig,
>;
60

            
61
/// An anonymized stream over the Tor network.
62
///
63
/// For most purposes, you can think of this type as an anonymized
64
/// TCP stream: it can read and write data, and get closed when it's done.
65
///
66
/// [`DataStream`] implements [`futures::io::AsyncRead`] and
67
/// [`futures::io::AsyncWrite`], so you can use it anywhere that those
68
/// traits are expected.
69
///
70
/// # Examples
71
///
72
/// Connecting to an HTTP server and sending a request, using
73
/// [`AsyncWriteExt::write_all`](futures::io::AsyncWriteExt::write_all):
74
///
75
/// ```ignore
76
/// let mut stream = tor_client.connect(("icanhazip.com", 80), None).await?;
77
///
78
/// use futures::io::AsyncWriteExt;
79
///
80
/// stream
81
///     .write_all(b"GET / HTTP/1.1\r\nHost: icanhazip.com\r\nConnection: close\r\n\r\n")
82
///     .await?;
83
///
84
/// // Flushing the stream is important; see below!
85
/// stream.flush().await?;
86
/// ```
87
///
88
/// Reading the result, using [`AsyncReadExt::read_to_end`](futures::io::AsyncReadExt::read_to_end):
89
///
90
/// ```ignore
91
/// use futures::io::AsyncReadExt;
92
///
93
/// let mut buf = Vec::new();
94
/// stream.read_to_end(&mut buf).await?;
95
///
96
/// println!("{}", String::from_utf8_lossy(&buf));
97
/// ```
98
///
99
/// # Usage with Tokio
100
///
101
/// If the `tokio` crate feature is enabled, this type also implements
102
/// [`tokio::io::AsyncRead`](tokio_crate::io::AsyncRead) and
103
/// [`tokio::io::AsyncWrite`](tokio_crate::io::AsyncWrite) for easier integration
104
/// with code that expects those traits.
105
///
106
/// # Remember to call `flush`!
107
///
108
/// DataStream buffers data internally, in order to write as few cells
109
/// as possible onto the network.  In order to make sure that your
110
/// data has actually been sent, you need to make sure that
111
/// [`AsyncWrite::poll_flush`] runs to completion: probably via
112
/// [`AsyncWriteExt::flush`](futures::io::AsyncWriteExt::flush).
113
///
114
/// # Splitting the type
115
///
116
/// This type is internally composed of a [`DataReader`] and a [`DataWriter`]; the
117
/// `DataStream::split` method can be used to split it into those two parts, for more
118
/// convenient usage with e.g. stream combinators.
119
///
120
/// # How long does a stream live?
121
///
122
/// A `DataStream` will live until all references to it are dropped,
123
/// or until it is closed explicitly.
124
///
125
/// If you split the stream into a `DataReader` and a `DataWriter`, it
126
/// will survive until _both_ are dropped, or until it is closed
127
/// explicitly.
128
///
129
/// A stream can also close because of a network error,
130
/// or because the other side of the stream decided to close it.
131
///
132
// # Semver note
133
//
134
// Note that this type is re-exported as a part of the public API of
135
// the `arti-client` crate.  Any changes to its API here in
136
// `tor-proto` need to be reflected above.
137
#[derive(Debug)]
pub struct DataStream {
    /// Underlying writer for this stream
    w: DataWriter,
    /// Underlying reader for this stream
    r: DataReader,
    /// A control object that can be used to monitor and control this stream
    /// without needing to own it.
    #[cfg(feature = "stream-ctrl")]
    ctrl: std::sync::Arc<ClientDataStreamCtrl>,
}
// Compile-time check that the stream may be moved and shared across threads.
assert_impl_all! { DataStream: Send, Sync }
149

            
150
/// An object used to control and monitor a data stream.
151
///
152
/// # Notes
153
///
154
/// This is a separate type from [`DataStream`] because it's useful to have
155
/// multiple references to this object, whereas a [`DataReader`] and [`DataWriter`]
156
/// need to have a single owner for the `AsyncRead` and `AsyncWrite` APIs to
157
/// work correctly.
158
#[cfg(feature = "stream-ctrl")]
#[derive(Debug)]
pub struct ClientDataStreamCtrl {
    /// The circuit to which this stream is attached.
    ///
    /// Note that the stream's reader and writer halves each contain a `StreamTarget`,
    /// which in turn has a strong reference to the `ClientCirc`.  So as long as any
    /// one of those is alive, this reference will be present.
    ///
    /// We make this a Weak reference so that once the stream itself is closed,
    /// we can't leak circuits.
    // TODO(conflux): use ClientTunnel
    circuit: Weak<ClientCirc>,

    /// Shared user-visible information about the state of this stream.
    ///
    /// TODO RPC: This will probably want to be a `postage::Watch` or something
    /// similar, if and when it stops moving around.
    // NOTE(review): this cfg is redundant — the whole struct is already gated
    // on "stream-ctrl" — but it is harmless.
    #[cfg(feature = "stream-ctrl")]
    status: Arc<Mutex<DataStreamStatus>>,

    /// The memory quota account that should be used for this stream's data
    ///
    /// Exists to keep the account alive
    _memquota: StreamAccount,
}
184

            
185
/// The inner writer for [`DataWriter`].
186
///
187
/// This type is responsible for taking bytes and packaging them into cells.
188
/// Rate limiting is implemented in [`DataWriter`] to avoid making this type more complex.
189
#[derive(Debug)]
struct DataWriterInner {
    /// Internal state for this writer
    ///
    /// This is stored in an Option so that we can mutate it in the
    /// AsyncWrite functions.  It might be possible to do better here,
    /// and we should refactor if so.
    //
    // NOTE(review): `None` should only be observable transiently, while a poll
    // function has taken the state out to operate on it — confirm.
    state: Option<DataWriterState>,

    /// The memory quota account that should be used for this stream's data
    ///
    /// Exists to keep the account alive
    // If we liked, we could make this conditional; see DataReaderInner.memquota
    _memquota: StreamAccount,

    /// A control object that can be used to monitor and control this stream
    /// without needing to own it.
    #[cfg(feature = "stream-ctrl")]
    ctrl: std::sync::Arc<ClientDataStreamCtrl>,
}
209

            
210
/// The write half of a [`DataStream`], implementing [`futures::io::AsyncWrite`].
211
///
212
/// See the [`DataStream`] docs for more information. In particular, note
213
/// that this writer requires `poll_flush` to complete in order to guarantee that
214
/// all data has been written.
215
///
216
/// # Usage with Tokio
217
///
218
/// If the `tokio` crate feature is enabled, this type also implements
219
/// [`tokio::io::AsyncWrite`](tokio_crate::io::AsyncWrite) for easier integration
220
/// with code that expects that trait.
221
///
222
/// # Drop and close
223
///
224
/// Note that dropping a `DataWriter` has no special effect on its own:
225
/// if the `DataWriter` is dropped, the underlying stream will still remain open
226
/// until the `DataReader` is also dropped.
227
///
228
/// If you want the stream to close earlier, use [`close`](futures::io::AsyncWriteExt::close)
229
/// (or [`shutdown`](tokio_crate::io::AsyncWriteExt::shutdown) with `tokio`).
230
///
231
/// Remember that Tor does not support half-open streams:
232
/// If you `close` or `shutdown` a stream,
233
/// the other side will not see the stream as half-open,
234
/// and so will (probably) not finish sending you any in-progress data.
235
/// Do not use `close`/`shutdown` to communicate anything besides
236
/// "I am done using this stream."
237
///
238
// # Semver note
239
//
240
// Note that this type is re-exported as a part of the public API of
241
// the `arti-client` crate.  Any changes to its API here in
242
// `tor-proto` need to be reflected above.
243
#[derive(Debug)]
pub struct DataWriter {
    /// A wrapper around [`DataWriterInner`] that adds rate limiting.
    //
    // Rate-limit changes arrive over the `RateConfigStream` installed by
    // `DataWriter::new`; the `DynTimeProvider` supplies the clock.
    writer: DynamicRateLimitedWriter<DataWriterInner, RateConfigStream, DynTimeProvider>,
}
248

            
249
impl DataWriter {
250
    /// Create a new rate-limited [`DataWriter`] from a [`DataWriterInner`].
251
84
    fn new(
252
84
        inner: DataWriterInner,
253
84
        rate_limit_updates: watch::Receiver<StreamRateLimit>,
254
84
        time_provider: DynTimeProvider,
255
84
    ) -> Self {
256
        /// Converts a `rate` into a `RateLimitedWriterConfig`.
257
128
        fn rate_to_config(rate: StreamRateLimit) -> RateLimitedWriterConfig {
258
128
            let rate = rate.bytes_per_sec();
259
128
            RateLimitedWriterConfig {
260
128
                rate,        // bytes per second
261
128
                burst: rate, // bytes
262
128
                // This number is chosen arbitrarily, but the idea is that we want to balance
263
128
                // between throughput and latency. Assume the user tries to write a large buffer
264
128
                // (~600 bytes). If we set this too small (for example 1), we'll be waking up
265
128
                // frequently and writing a small number of bytes each time to the
266
128
                // `DataWriterInner`, even if this isn't enough bytes to send a cell. If we set this
267
128
                // too large (for example 510), we'll be waking up infrequently to write a larger
268
128
                // number of bytes each time. So even if the `DataWriterInner` has almost a full
269
128
                // cell's worth of data queued (for example 490) and only needs 509-490=19 more
270
128
                // bytes before a cell can be sent, it will block until the rate limiter allows 510
271
128
                // more bytes.
272
128
                //
273
128
                // TODO(arti#2028): Is there an optimal value here?
274
128
                wake_when_bytes_available: NonZero::new(200).expect("200 != 0"), // bytes
275
128
            }
276
128
        }
277

            
278
        // get the current rate from the `watch::Receiver`, which we'll use as the initial rate
279
84
        let initial_rate: StreamRateLimit = *rate_limit_updates.borrow();
280
84

            
281
84
        // map the rate update stream to the type required by `DynamicRateLimitedWriter`
282
84
        let rate_limit_updates = rate_limit_updates.fuse().map(rate_to_config as fn(_) -> _);
283
84

            
284
84
        // build the rate limiter
285
84
        let writer = RateLimitedWriter::new(inner, &rate_to_config(initial_rate), time_provider);
286
84
        let writer = DynamicRateLimitedWriter::new(writer, rate_limit_updates);
287
84

            
288
84
        Self { writer }
289
84
    }
290

            
291
    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
292
    /// interact with this stream without holding the stream itself.
293
    #[cfg(feature = "stream-ctrl")]
294
    pub fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
295
        Some(self.writer.inner().client_stream_ctrl())
296
    }
297
}
298

            
299
impl AsyncWrite for DataWriter {
300
5296
    fn poll_write(
301
5296
        mut self: Pin<&mut Self>,
302
5296
        cx: &mut Context<'_>,
303
5296
        buf: &[u8],
304
5296
    ) -> Poll<IoResult<usize>> {
305
5296
        AsyncWrite::poll_write(Pin::new(&mut self.writer), cx, buf)
306
5296
    }
307

            
308
28
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
309
28
        AsyncWrite::poll_flush(Pin::new(&mut self.writer), cx)
310
28
    }
311

            
312
8
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
313
8
        AsyncWrite::poll_close(Pin::new(&mut self.writer), cx)
314
8
    }
315
}
316

            
317
#[cfg(feature = "tokio")]
impl TokioAsyncWrite for DataWriter {
    // Each method builds a temporary tokio-compat adapter around the pinned
    // writer and forwards the poll through it.
    //
    // NOTE(review): a fresh `Compat` wrapper is constructed on every poll;
    // presumably the wrapper carries no state that must persist across polls —
    // confirm against tokio-util's `compat` documentation.
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
        TokioAsyncWrite::poll_write(Pin::new(&mut self.compat_write()), cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        TokioAsyncWrite::poll_flush(Pin::new(&mut self.compat_write()), cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        TokioAsyncWrite::poll_shutdown(Pin::new(&mut self.compat_write()), cx)
    }
}
331

            
332
/// The read half of a [`DataStream`], implementing [`futures::io::AsyncRead`].
333
///
334
/// See the [`DataStream`] docs for more information.
335
///
336
/// # Usage with Tokio
337
///
338
/// If the `tokio` crate feature is enabled, this type also implements
339
/// [`tokio::io::AsyncRead`](tokio_crate::io::AsyncRead) for easier integration
340
/// with code that expects that trait.
341
//
342
// # Semver note
343
//
344
// Note that this type is re-exported as a part of the public API of
345
// the `arti-client` crate.  Any changes to its API here in
346
// `tor-proto` need to be reflected above.
347
#[derive(Debug)]
pub struct DataReader {
    /// The [`DataReaderInner`] with a wrapper to support XON/XOFF flow control.
    reader: XonXoffReader<DataReaderInner>,
}
352

            
353
impl DataReader {
354
    /// Create a new [`DataReader`].
355
84
    fn new(reader: DataReaderInner, xon_xoff_reader_ctrl: XonXoffReaderCtrl) -> Self {
356
84
        Self {
357
84
            reader: XonXoffReader::new(xon_xoff_reader_ctrl, reader),
358
84
        }
359
84
    }
360

            
361
    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
362
    /// interact with this stream without holding the stream itself.
363
    #[cfg(feature = "stream-ctrl")]
364
    pub fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
365
        Some(self.reader.inner().client_stream_ctrl())
366
    }
367
}
368

            
369
impl AsyncRead for DataReader {
370
198
    fn poll_read(
371
198
        mut self: Pin<&mut Self>,
372
198
        cx: &mut Context<'_>,
373
198
        buf: &mut [u8],
374
198
    ) -> Poll<IoResult<usize>> {
375
198
        AsyncRead::poll_read(Pin::new(&mut self.reader), cx, buf)
376
198
    }
377

            
378
    fn poll_read_vectored(
379
        mut self: Pin<&mut Self>,
380
        cx: &mut Context<'_>,
381
        bufs: &mut [std::io::IoSliceMut<'_>],
382
    ) -> Poll<IoResult<usize>> {
383
        AsyncRead::poll_read_vectored(Pin::new(&mut self.reader), cx, bufs)
384
    }
385
}
386

            
387
#[cfg(feature = "tokio")]
impl TokioAsyncRead for DataReader {
    // Bridge tokio's AsyncRead to the futures-based implementation via a
    // temporary compat adapter built on each poll.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<IoResult<()>> {
        TokioAsyncRead::poll_read(Pin::new(&mut self.compat()), cx, buf)
    }
}
397

            
398
/// The inner reader for [`DataReader`].
399
///
400
/// This type is responsible for taking stream messages and extracting the stream data from them.
401
/// Flow control logic is implemented in [`DataReader`] to avoid making this type more complex.
402
#[derive(Debug)]
pub(crate) struct DataReaderInner {
    /// Internal state for this reader.
    ///
    /// This is stored in an Option so that we can mutate it in
    /// poll_read().  It might be possible to do better here, and we
    /// should refactor if so.
    //
    // NOTE(review): `None` should only be observable transiently, while a
    // caller has taken the state out to operate on it (see
    // `DataStream::wait_for_connection`) — confirm.
    state: Option<DataReaderState>,

    /// The memory quota account that should be used for this stream's data
    ///
    /// Exists to keep the account alive
    // If we liked, we could make this conditional on not(cfg(feature = "stream-ctrl"))
    // since, ClientDataStreamCtrl contains a StreamAccount clone too.  But that seems fragile.
    _memquota: StreamAccount,

    /// A control object that can be used to monitor and control this stream
    /// without needing to own it.
    #[cfg(feature = "stream-ctrl")]
    ctrl: std::sync::Arc<ClientDataStreamCtrl>,
}
423

            
424
impl BufferIsEmpty for DataReaderInner {
425
    /// The result will become stale,
426
    /// so is most accurate immediately after a [`poll_read`](AsyncRead::poll_read).
427
    fn is_empty(mut self: Pin<&mut Self>) -> bool {
428
        match self
429
            .state
430
            .as_mut()
431
            .expect("forgot to put `DataReaderState` back")
432
        {
433
            DataReaderState::Open(imp) => {
434
                // check if the partial cell in `pending` is empty,
435
                // and if the message stream is empty
436
                imp.pending[imp.offset..].is_empty() && imp.s.is_empty()
437
            }
438
            // closed, so any data should have been discarded
439
            DataReaderState::Closed => true,
440
        }
441
    }
442
}
443

            
444
/// Shared status flags for tracking the status of a `DataStream`.
445
///
446
/// We expect to refactor this a bit, so it's not exposed at all.
447
//
448
// TODO RPC: Possibly instead of manipulating the fields of DataStreamStatus
449
// from various points in this module, we should instead construct
450
// DataStreamStatus as needed from information available elsewhere.  In any
451
// case, we should really eliminate as much duplicate state here as we can.
452
// (See discussions at !1198 for some challenges with this.)
453
#[cfg(feature = "stream-ctrl")]
#[derive(Clone, Debug, Default)]
struct DataStreamStatus {
    /// True if we've received a CONNECTED message.
    //
    // TODO: This is redundant with `connected` in DataReaderImpl and
    // `expecting_connected` in DataCmdChecker.
    received_connected: bool,
    /// True if we have decided to send an END message.
    //
    // TODO RPC: There is not an easy way to set this from this module!  Really,
    // the decision to send an "end" is made when the StreamTarget object is
    // dropped, but we don't currently have any way to see when that happens.
    // Perhaps we need a different shared StreamStatus object that the
    // StreamTarget holds?
    sent_end: bool,
    /// True if we have received an END message telling us to close the stream.
    received_end: bool,
    /// True if we have received an error.
    ///
    /// (This is not a subset or superset of received_end; some errors are END
    /// messages but some aren't; some END messages are errors but some aren't.)
    received_err: bool,
}
477

            
478
#[cfg(feature = "stream-ctrl")]
impl DataStreamStatus {
    /// Remember that we've received a connected message.
    fn record_connected(&mut self) {
        self.received_connected = true;
    }

    /// Remember that we've received an error of some kind.
    fn record_error(&mut self, e: &Error) {
        // TODO: Probably we should remember the actual error in a box or
        // something.  But that means making a redundant copy of the error
        // even if nobody will want it.  Do we care?

        // Classify the error: did the stream end, and was the ending
        // error-like?  (A clean DONE end is not an error.)
        let (saw_end, saw_err) = match e {
            Error::EndReceived(EndReason::DONE) => (true, false),
            Error::EndReceived(_) => (true, true),
            _ => (false, true),
        };
        if saw_end {
            self.received_end = true;
        }
        if saw_err {
            self.received_err = true;
        }
    }
}
500

            
501
// Messages not listed here (e.g. flow-control messages like SENDME) are
// rejected by the generated checker rather than delivered to the stream.
restricted_msg! {
    /// An allowable incoming message on a data stream.
    enum DataStreamMsg:RelayMsg {
        // SENDME is handled by the reactor.
        Data, End, Connected,
    }
}
508

            
509
// TODO RPC: Should we also implement this trait for everything that holds a
510
// ClientDataStreamCtrl?
511
#[cfg(feature = "stream-ctrl")]
impl super::ctrl::ClientStreamCtrl for ClientDataStreamCtrl {
    // TODO(conflux): use ClientTunnel
    /// Return the circuit this stream is attached to, or `None` if the last
    /// strong reference to that circuit has been dropped.
    fn circuit(&self) -> Option<Arc<ClientCirc>> {
        self.circuit.upgrade()
    }
}
518

            
519
#[cfg(feature = "stream-ctrl")]
impl ClientDataStreamCtrl {
    /// Return true if the underlying stream is connected. (That is, if it has
    /// received a `CONNECTED` message, and has not been closed.)
    pub fn is_connected(&self) -> bool {
        let status = self.status.lock().expect("poisoned lock");
        // Any of these flags means the stream is (or is becoming) closed.
        let closed = status.sent_end || status.received_end || status.received_err;
        status.received_connected && !closed
    }

    // TODO RPC: Add more functions once we have the desired API more nailed
    // down.
}
531

            
532
impl DataStream {
    /// Wrap raw stream receiver and target parts as a DataStream.
    ///
    /// For non-optimistic stream, function `wait_for_connection`
    /// must be called after to make sure CONNECTED is received.
    pub(crate) fn new<P: SleepProvider + CoarseTimeProvider>(
        time_provider: P,
        receiver: StreamReceiver,
        xon_xoff_reader_ctrl: XonXoffReaderCtrl,
        target: StreamTarget,
        memquota: StreamAccount,
    ) -> Self {
        Self::new_inner(
            time_provider,
            receiver,
            xon_xoff_reader_ctrl,
            target,
            false, // connected: we still expect a CONNECTED cell
            memquota,
        )
    }

    /// Wrap raw stream receiver and target parts as a connected DataStream.
    ///
    /// Unlike [`DataStream::new`], this creates a `DataStream` that does not expect to receive a
    /// CONNECTED cell.
    ///
    /// This is used by hidden services, exit relays, and directory servers to accept streams.
    #[cfg(feature = "hs-service")]
    pub(crate) fn new_connected<P: SleepProvider + CoarseTimeProvider>(
        time_provider: P,
        receiver: StreamReceiver,
        xon_xoff_reader_ctrl: XonXoffReaderCtrl,
        target: StreamTarget,
        memquota: StreamAccount,
    ) -> Self {
        Self::new_inner(
            time_provider,
            receiver,
            xon_xoff_reader_ctrl,
            target,
            true, // connected: no CONNECTED cell expected
            memquota,
        )
    }

    /// The shared implementation of the `new*()` functions.
    fn new_inner<P: SleepProvider + CoarseTimeProvider>(
        time_provider: P,
        receiver: StreamReceiver,
        xon_xoff_reader_ctrl: XonXoffReaderCtrl,
        target: StreamTarget,
        connected: bool,
        memquota: StreamAccount,
    ) -> Self {
        // Size the write buffer to exactly one cell's worth of data.
        let relay_cell_format = target.relay_cell_format();
        let out_buf_len = Data::max_body_len(relay_cell_format);
        let rate_limit_stream = target.rate_limit_stream().clone();

        #[cfg(feature = "stream-ctrl")]
        let status = {
            let mut data_stream_status = DataStreamStatus::default();
            if connected {
                data_stream_status.record_connected();
            }
            Arc::new(Mutex::new(data_stream_status))
        };

        // The single ctrl object is shared (via clones of the Arc) by the
        // stream, its reader half, and its writer half.
        #[cfg(feature = "stream-ctrl")]
        let ctrl = Arc::new(ClientDataStreamCtrl {
            circuit: Arc::downgrade(target.circuit()),
            status: status.clone(),
            _memquota: memquota.clone(),
        });
        let r = DataReaderInner {
            state: Some(DataReaderState::Open(DataReaderImpl {
                s: receiver,
                pending: Vec::new(),
                offset: 0,
                connected,
                #[cfg(feature = "stream-ctrl")]
                status: status.clone(),
            })),
            _memquota: memquota.clone(),
            #[cfg(feature = "stream-ctrl")]
            ctrl: ctrl.clone(),
        };
        let w = DataWriterInner {
            state: Some(DataWriterState::Ready(DataWriterImpl {
                s: target,
                buf: vec![0; out_buf_len].into_boxed_slice(),
                n_pending: 0,
                #[cfg(feature = "stream-ctrl")]
                status,
                relay_cell_format,
            })),
            _memquota: memquota,
            #[cfg(feature = "stream-ctrl")]
            ctrl: ctrl.clone(),
        };

        let time_provider = DynTimeProvider::new(time_provider);

        DataStream {
            w: DataWriter::new(w, rate_limit_stream, time_provider),
            r: DataReader::new(r, xon_xoff_reader_ctrl),
            #[cfg(feature = "stream-ctrl")]
            ctrl,
        }
    }

    /// Divide this DataStream into its constituent parts.
    pub fn split(self) -> (DataReader, DataWriter) {
        (self.r, self.w)
    }

    /// Wait until a CONNECTED cell is received, or some other cell
    /// is received to indicate an error.
    ///
    /// Does nothing if this stream is already connected.
    pub async fn wait_for_connection(&mut self) -> Result<()> {
        // We must put state back before returning
        let state = self
            .r
            .reader
            .inner_mut()
            .state
            .take()
            .expect("Missing state in DataReaderInner");

        if let DataReaderState::Open(mut imp) = state {
            let result = if imp.connected {
                Ok(())
            } else {
                // This succeeds if the cell is CONNECTED, and fails otherwise.
                std::future::poll_fn(|cx| Pin::new(&mut imp).read_cell(cx)).await
            };
            // Put the state back: Closed on error, otherwise re-open with the
            // same implementation object.
            self.r.reader.inner_mut().state = Some(match result {
                Err(_) => DataReaderState::Closed,
                Ok(_) => DataReaderState::Open(imp),
            });
            result
        } else {
            Err(Error::from(internal!(
                "Expected ready state, got {:?}",
                state
            )))
        }
    }

    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
    /// interact with this stream without holding the stream itself.
    #[cfg(feature = "stream-ctrl")]
    pub fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
        Some(&self.ctrl)
    }
}
689

            
690
impl AsyncRead for DataStream {
691
198
    fn poll_read(
692
198
        mut self: Pin<&mut Self>,
693
198
        cx: &mut Context<'_>,
694
198
        buf: &mut [u8],
695
198
    ) -> Poll<IoResult<usize>> {
696
198
        AsyncRead::poll_read(Pin::new(&mut self.r), cx, buf)
697
198
    }
698
}
699

            
700
#[cfg(feature = "tokio")]
impl TokioAsyncRead for DataStream {
    // Bridge tokio's AsyncRead to the futures-based implementation via a
    // temporary compat adapter built on each poll.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<IoResult<()>> {
        TokioAsyncRead::poll_read(Pin::new(&mut self.compat()), cx, buf)
    }
}
710

            
711
impl AsyncWrite for DataStream {
712
5296
    fn poll_write(
713
5296
        mut self: Pin<&mut Self>,
714
5296
        cx: &mut Context<'_>,
715
5296
        buf: &[u8],
716
5296
    ) -> Poll<IoResult<usize>> {
717
5296
        AsyncWrite::poll_write(Pin::new(&mut self.w), cx, buf)
718
5296
    }
719
28
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
720
28
        AsyncWrite::poll_flush(Pin::new(&mut self.w), cx)
721
28
    }
722
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
723
        AsyncWrite::poll_close(Pin::new(&mut self.w), cx)
724
    }
725
}
726

            
727
#[cfg(feature = "tokio")]
impl TokioAsyncWrite for DataStream {
    // Bridge tokio's AsyncWrite to the futures-based implementation via a
    // temporary compat adapter built on each poll.
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
        TokioAsyncWrite::poll_write(Pin::new(&mut self.compat()), cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        TokioAsyncWrite::poll_flush(Pin::new(&mut self.compat()), cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        TokioAsyncWrite::poll_shutdown(Pin::new(&mut self.compat()), cx)
    }
}
741

            
742
/// Helper type: Like BoxFuture, but also requires that the future be Sync.
//
// NOTE(review): the extra `Sync` bound lets types that store this future
// (e.g. `DataWriterState::Flushing`) remain `Sync` themselves — confirm.
type BoxSyncFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + Sync + 'a>>;
744

            
745
/// An enumeration for the state of a DataWriter.
746
///
747
/// We have to use an enum here because, for as long as we're waiting
748
/// for a flush operation to complete, the future returned by
749
/// `flush_cell()` owns the DataWriterImpl.
750
#[derive(Educe)]
#[educe(Debug)]
enum DataWriterState {
    /// The writer has closed or gotten an error: nothing more to do.
    Closed,
    /// The writer is not currently flushing; more data can get queued
    /// immediately.
    Ready(DataWriterImpl),
    /// The writer is flushing a cell.
    ///
    /// While flushing, the future owns the `DataWriterImpl`; it is returned
    /// (with the flush result) when the future completes.
    Flushing(
        #[educe(Debug(method = "skip_fmt"))] //
        BoxSyncFuture<'static, (DataWriterImpl, Result<()>)>,
    ),
}
764

            
765
/// Internal: the write part of a DataStream
#[derive(Educe)]
#[educe(Debug)]
struct DataWriterImpl {
    /// The underlying StreamTarget object.
    s: StreamTarget,

    /// Buffered data to send over the connection.
    ///
    /// This buffer is currently allocated using a number of bytes
    /// equal to the maximum that we can package at a time.
    //
    // TODO: this buffer is probably smaller than we want, but it's good
    // enough for now.  If we _do_ make it bigger, we'll have to change
    // our use of Data::split_from to handle the case where we can't fit
    // all the data.
    #[educe(Debug(method = "skip_fmt"))]
    buf: Box<[u8]>,

    /// Number of unflushed bytes in buf.
    ///
    /// (`buf[..n_pending]` is the valid, not-yet-sent region.)
    n_pending: usize,

    /// Relay cell format in use
    relay_cell_format: RelayCellFormat,

    /// Shared user-visible information about the state of this stream.
    #[cfg(feature = "stream-ctrl")]
    status: Arc<Mutex<DataStreamStatus>>,
}
794

            
795
impl DataWriterInner {
    /// See [`DataWriter::client_stream_ctrl`].
    #[cfg(feature = "stream-ctrl")]
    fn client_stream_ctrl(&self) -> &Arc<ClientDataStreamCtrl> {
        &self.ctrl
    }

    /// Helper for poll_flush() and poll_close(): Performs a flush, then
    /// closes the stream if should_close is true.
    fn poll_flush_impl(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        should_close: bool,
    ) -> Poll<IoResult<()>> {
        // We take the state out of `self.state`; every return path below
        // MUST store a replacement state before returning.
        let state = self.state.take().expect("Missing state in DataWriter");

        // TODO: this whole function is a bit copy-pasted.
        let mut future: BoxSyncFuture<_> = match state {
            DataWriterState::Ready(imp) => {
                if imp.n_pending == 0 {
                    // Nothing to flush!
                    if should_close {
                        // We need to actually continue with this function to do the closing.
                        // Thus, make a future that does nothing and is ready immediately.
                        Box::pin(futures::future::ready((imp, Ok(()))))
                    } else {
                        // There's nothing more to do; we can return.
                        self.state = Some(DataWriterState::Ready(imp));
                        return Poll::Ready(Ok(()));
                    }
                } else {
                    // We need to flush the buffer's contents; Make a future for that.
                    Box::pin(imp.flush_buf())
                }
            }
            // A flush was already in progress; keep polling that same future.
            DataWriterState::Flushing(fut) => fut,
            DataWriterState::Closed => {
                self.state = Some(DataWriterState::Closed);
                return Poll::Ready(Err(Error::NotConnected.into()));
            }
        };

        match future.as_mut().poll(cx) {
            Poll::Ready((_imp, Err(e))) => {
                // The flush failed: the writer is unusable from now on.
                self.state = Some(DataWriterState::Closed);
                Poll::Ready(Err(e.into()))
            }
            Poll::Ready((mut imp, Ok(()))) => {
                if should_close {
                    // Tell the StreamTarget to close, so that the reactor
                    // realizes that we are done sending. (Dropping `imp.s` does not
                    // suffice, since there may be other clones of it.  In particular,
                    // the StreamReceiver has one, which it uses to keep the stream
                    // open, among other things.)
                    imp.s.close();

                    #[cfg(feature = "stream-ctrl")]
                    {
                        // TODO RPC:  This is not sufficient to track every case
                        // where we might have sent an End.  See note on the
                        // `sent_end` field.
                        imp.status.lock().expect("lock poisoned").sent_end = true;
                    }
                    self.state = Some(DataWriterState::Closed);
                } else {
                    self.state = Some(DataWriterState::Ready(imp));
                }
                Poll::Ready(Ok(()))
            }
            Poll::Pending => {
                // Not done yet: stash the future so the next call resumes it.
                self.state = Some(DataWriterState::Flushing(future));
                Poll::Pending
            }
        }
    }
}
871

            
872
impl AsyncWrite for DataWriterInner {
    /// Queue as much of `buf` as fits in the internal cell buffer,
    /// flushing a full cell first when necessary.
    ///
    /// Returns the number of bytes queued (never 0 for a nonempty `buf`
    /// unless the stream is closed or errored).
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<IoResult<usize>> {
        if buf.is_empty() {
            return Poll::Ready(Ok(0));
        }

        // We take the state out of `self.state`; every return path below
        // MUST store a replacement state before returning.
        let state = self.state.take().expect("Missing state in DataWriter");

        let mut future = match state {
            DataWriterState::Ready(mut imp) => {
                let n_queued = imp.queue_bytes(buf);
                if n_queued != 0 {
                    self.state = Some(DataWriterState::Ready(imp));
                    return Poll::Ready(Ok(n_queued));
                }
                // we couldn't queue anything, so the current cell must be full.
                Box::pin(imp.flush_buf())
            }
            DataWriterState::Flushing(fut) => fut,
            DataWriterState::Closed => {
                self.state = Some(DataWriterState::Closed);
                return Poll::Ready(Err(Error::NotConnected.into()));
            }
        };

        match future.as_mut().poll(cx) {
            Poll::Ready((_imp, Err(e))) => {
                // (`_imp` is underscore-named because it is only used when
                // the "stream-ctrl" feature is enabled.)
                #[cfg(feature = "stream-ctrl")]
                {
                    _imp.status.lock().expect("lock poisoned").record_error(&e);
                }
                self.state = Some(DataWriterState::Closed);
                Poll::Ready(Err(e.into()))
            }
            Poll::Ready((mut imp, Ok(()))) => {
                // Great!  We're done flushing.  Queue as much as we can of this
                // cell.
                let n_queued = imp.queue_bytes(buf);
                self.state = Some(DataWriterState::Ready(imp));
                Poll::Ready(Ok(n_queued))
            }
            Poll::Pending => {
                self.state = Some(DataWriterState::Flushing(future));
                Poll::Pending
            }
        }
    }

    /// Flush any buffered data, without closing the stream.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        self.poll_flush_impl(cx, false)
    }

    /// Flush any buffered data, then close the stream.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        self.poll_flush_impl(cx, true)
    }
}
932

            
933
#[cfg(feature = "tokio")]
934
impl TokioAsyncWrite for DataWriterInner {
935
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
936
        TokioAsyncWrite::poll_write(Pin::new(&mut self.compat_write()), cx, buf)
937
    }
938

            
939
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
940
        TokioAsyncWrite::poll_flush(Pin::new(&mut self.compat_write()), cx)
941
    }
942

            
943
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
944
        TokioAsyncWrite::poll_shutdown(Pin::new(&mut self.compat_write()), cx)
945
    }
946
}
947

            
948
impl DataWriterImpl {
949
    /// Try to flush the current buffer contents as a data cell.
950
6108
    async fn flush_buf(mut self) -> (Self, Result<()>) {
951
4072
        let result = if let Some((cell, remainder)) =
952
4072
            Data::try_split_from(self.relay_cell_format, &self.buf[..self.n_pending])
953
        {
954
            // TODO: Eventually we may want a larger buffer; if we do,
955
            // this invariant will become false.
956
4072
            assert!(remainder.is_empty());
957
4072
            self.n_pending = 0;
958
4072
            self.s.send(cell.into()).await
959
        } else {
960
            Ok(())
961
        };
962

            
963
4072
        (self, result)
964
4072
    }
965

            
966
    /// Add as many bytes as possible from `b` to our internal buffer;
967
    /// return the number we were able to add.
968
9308
    fn queue_bytes(&mut self, b: &[u8]) -> usize {
969
9308
        let empty_space = &mut self.buf[self.n_pending..];
970
9308
        if empty_space.is_empty() {
971
            // that is, len == 0
972
4052
            return 0;
973
5256
        }
974
5256

            
975
5256
        let n_to_copy = std::cmp::min(b.len(), empty_space.len());
976
5256
        empty_space[..n_to_copy].copy_from_slice(&b[..n_to_copy]);
977
5256
        self.n_pending += n_to_copy;
978
5256
        n_to_copy
979
9308
    }
980
}
981

            
982
impl DataReaderInner {
    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
    /// interact with this stream without holding the stream itself.
    ///
    /// (This is a cheap borrow of the shared handle; callers clone the `Arc`
    /// if they need ownership.)
    #[cfg(feature = "stream-ctrl")]
    pub(crate) fn client_stream_ctrl(&self) -> &Arc<ClientDataStreamCtrl> {
        &self.ctrl
    }
}
990

            
991
/// An enumeration for the state of a [`DataReaderInner`].
// TODO: We don't need to implement the state in this way anymore now that we've removed the saved
// future. There are a few ways we could simplify this. See:
// https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3076#note_3218210
#[derive(Educe)]
#[educe(Debug)]
// We allow this since it's expected that streams will spend most of their time in the `Open` state,
// and will be cleaned up shortly after closing.
#[allow(clippy::large_enum_variant)]
enum DataReaderState {
    /// In this state we have received an end cell or an error.
    Closed,
    /// In this state the reader is open.
    Open(DataReaderImpl),
}
/// Wrapper for the read part of a [`DataStream`].
#[derive(Educe)]
#[educe(Debug)]
#[pin_project]
struct DataReaderImpl {
    /// The underlying StreamReceiver object.
    #[educe(Debug(method = "skip_fmt"))]
    #[pin]
    s: StreamReceiver,

    /// If present, data that we received on this stream but have not
    /// been able to send to the caller yet.
    // TODO: This data structure is probably not what we want, but
    // it's good enough for now.
    #[educe(Debug(method = "skip_fmt"))]
    pending: Vec<u8>,

    /// Index into pending to show what we've already read.
    ///
    /// (`pending[offset..]` is the portion not yet handed to the caller.)
    offset: usize,

    /// If true, we have received a CONNECTED cell on this stream.
    connected: bool,

    /// Shared user-visible information about the state of this stream.
    #[cfg(feature = "stream-ctrl")]
    status: Arc<Mutex<DataStreamStatus>>,
}
impl AsyncRead for DataReaderInner {
    /// Read stream data into `buf`: first drain any locally buffered bytes,
    /// and otherwise poll the underlying cell stream for a new cell.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<IoResult<usize>> {
        // We're pulling the state object out of the reader.  We MUST
        // put it back before this function returns.
        let mut state = self.state.take().expect("Missing state in DataReaderInner");

        loop {
            let mut imp = match state {
                DataReaderState::Open(mut imp) => {
                    // There may be data to read already.
                    let n_copied = imp.extract_bytes(buf);
                    if n_copied != 0 || buf.is_empty() {
                        // We read data into the buffer, or the buffer was 0-len to begin with.
                        // Tell the caller.
                        self.state = Some(DataReaderState::Open(imp));
                        return Poll::Ready(Ok(n_copied));
                    }

                    // No data available!  We have to try reading.
                    imp
                }
                DataReaderState::Closed => {
                    // TODO: Why are we returning an error rather than continuing to return EOF?
                    self.state = Some(DataReaderState::Closed);
                    return Poll::Ready(Err(Error::NotConnected.into()));
                }
            };

            // See if a cell is ready.
            match Pin::new(&mut imp).read_cell(cx) {
                Poll::Ready(Err(e)) => {
                    // There aren't any survivable errors in the current
                    // design.
                    self.state = Some(DataReaderState::Closed);
                    #[cfg(feature = "stream-ctrl")]
                    {
                        imp.status.lock().expect("lock poisoned").record_error(&e);
                    }
                    // An END cell with reason DONE is a clean EOF (read of 0
                    // bytes), not an error.
                    let result = if matches!(e, Error::EndReceived(EndReason::DONE)) {
                        Ok(0)
                    } else {
                        Err(e.into())
                    };
                    return Poll::Ready(result);
                }
                Poll::Ready(Ok(())) => {
                    // It read a cell!  Continue the loop.
                    state = DataReaderState::Open(imp);
                }
                Poll::Pending => {
                    // No cells ready, so tell the
                    // caller to get back to us later.
                    self.state = Some(DataReaderState::Open(imp));
                    return Poll::Pending;
                }
            }
        }
    }
}
#[cfg(feature = "tokio")]
impl TokioAsyncRead for DataReaderInner {
    /// Read via a futures-to-tokio compatibility adapter around this reader.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<IoResult<()>> {
        let mut adapter = self.compat();
        TokioAsyncRead::poll_read(Pin::new(&mut adapter), cx, buf)
    }
}
impl DataReaderImpl {
    /// Pull as many bytes as we can off of self.pending, and return that
    /// number of bytes.
    fn extract_bytes(&mut self, buf: &mut [u8]) -> usize {
        let remainder = &self.pending[self.offset..];
        let n_to_copy = std::cmp::min(buf.len(), remainder.len());
        buf[..n_to_copy].copy_from_slice(&remainder[..n_to_copy]);
        self.offset += n_to_copy;

        n_to_copy
    }

    /// Return true iff there are no buffered bytes here to yield
    fn buf_is_empty(&self) -> bool {
        self.pending.len() == self.offset
    }

    /// Load self.pending with the contents of a new data cell.
    ///
    /// Returns `Ok(())` when a CONNECTED or DATA cell was consumed, and an
    /// error (including `EndReceived`) otherwise.
    fn read_cell(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        use DataStreamMsg::*;
        let msg = match self.as_mut().project().s.poll_next(cx) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(Some(Ok(unparsed))) => match unparsed.decode::<DataStreamMsg>() {
                Ok(cell) => cell.into_msg(),
                Err(e) => {
                    self.s.protocol_error();
                    return Poll::Ready(Err(Error::from_bytes_err(e, "message on a data stream")));
                }
            },
            Poll::Ready(Some(Err(e))) => return Poll::Ready(Err(e)),
            // TODO: This doesn't seem right to me, but seems to be the behaviour of the code before
            // the refactoring, so I've kept the same behaviour. I think if the cell stream is
            // terminated, we should be returning `None` here and not considering it as an error.
            // The `StreamReceiver` will have already returned an error if the cell stream was
            // terminated without an END message.
            Poll::Ready(None) => return Poll::Ready(Err(Error::NotConnected)),
        };

        let result = match msg {
            Connected(_) if !self.connected => {
                self.connected = true;
                #[cfg(feature = "stream-ctrl")]
                {
                    self.status
                        .lock()
                        .expect("poisoned lock")
                        .record_connected();
                }
                Ok(())
            }
            Connected(_) => {
                self.s.protocol_error();
                Err(Error::StreamProto(
                    "Received a second connect cell on a data stream".to_string(),
                ))
            }
            Data(d) if self.connected => {
                self.add_data(d.into());
                Ok(())
            }
            Data(_) => {
                self.s.protocol_error();
                // NOTE(review): this message reads "a data cell an unconnected
                // stream" — it appears to be missing the word "on".  Left
                // unchanged here since it is a runtime string.
                Err(Error::StreamProto(
                    "Received a data cell an unconnected stream".to_string(),
                ))
            }
            End(e) => Err(Error::EndReceived(e.reason())),
        };
        Poll::Ready(result)
    }

    /// Add the data from `d` to the end of our pending bytes.
    fn add_data(&mut self, mut d: Vec<u8>) {
        if self.buf_is_empty() {
            // No data pending?  Just take d as the new pending.
            self.pending = d;
            self.offset = 0;
        } else {
            // TODO(nickm) This has potential to grow `pending` without bound.
            // Fortunately, we don't currently read cells or call this
            // `add_data` method when pending is nonempty—but if we do in the
            // future, we'll have to be careful here.
            self.pending.append(&mut d);
        }
    }
}
/// A `CmdChecker` that enforces invariants for outbound data streams.
#[derive(Debug)]
pub(crate) struct DataCmdChecker {
    /// True if we are expecting to receive a CONNECTED message on this stream.
    ///
    /// (While this is true, receiving DATA is a protocol violation; once a
    /// CONNECTED arrives it is set to false.)
    expecting_connected: bool,
}
impl Default for DataCmdChecker {
334
    fn default() -> Self {
334
        Self {
334
            expecting_connected: true,
334
        }
334
    }
}
impl super::CmdChecker for DataCmdChecker {
216
    fn check_msg(
216
        &mut self,
216
        msg: &tor_cell::relaycell::UnparsedRelayMsg,
216
    ) -> Result<super::StreamStatus> {
        use super::StreamStatus::*;
216
        match msg.cmd() {
            RelayCmd::CONNECTED => {
78
                if !self.expecting_connected {
4
                    Err(Error::StreamProto(
4
                        "Received CONNECTED twice on a stream.".into(),
4
                    ))
                } else {
74
                    self.expecting_connected = false;
74
                    Ok(Open)
                }
            }
            RelayCmd::DATA => {
116
                if !self.expecting_connected {
116
                    Ok(Open)
                } else {
                    Err(Error::StreamProto(
                        "Received DATA before CONNECTED on a stream".into(),
                    ))
                }
            }
20
            RelayCmd::END => Ok(Closed),
2
            _ => Err(Error::StreamProto(format!(
2
                "Unexpected {} on a data stream!",
2
                msg.cmd()
2
            ))),
        }
216
    }
44
    fn consume_checked_msg(&mut self, msg: tor_cell::relaycell::UnparsedRelayMsg) -> Result<()> {
44
        let _ = msg
44
            .decode::<DataStreamMsg>()
44
            .map_err(|err| Error::from_bytes_err(err, "cell on half-closed stream"))?;
44
        Ok(())
44
    }
}
impl DataCmdChecker {
    /// Return a new boxed `DataCmdChecker` in a state suitable for a newly
    /// constructed connection.
334
    pub(crate) fn new_any() -> AnyCmdChecker {
334
        Box::<Self>::default()
334
    }
    /// Return a new boxed `DataCmdChecker` in a state suitable for a
    /// connection where an initial CONNECTED cell is not expected.
    ///
    /// This is used by hidden services, exit relays, and directory servers
    /// to accept streams.
    #[cfg(feature = "hs-service")]
24
    pub(crate) fn new_connected() -> AnyCmdChecker {
24
        Box::new(Self {
24
            expecting_connected: false,
24
        })
24
    }
}