1
//! Declare DataStream, a type that wraps RawCellStream so as to be useful
2
//! for byte-oriented communication.
3

            
4
use crate::{Error, Result};
5
use static_assertions::assert_impl_all;
6
use tor_cell::relaycell::msg::EndReason;
7
use tor_cell::relaycell::{RelayCellFormat, RelayCmd};
8

            
9
use futures::io::{AsyncRead, AsyncWrite};
10
use futures::task::{Context, Poll};
11
use futures::Future;
12

            
13
#[cfg(feature = "tokio")]
14
use tokio_crate::io::ReadBuf;
15
#[cfg(feature = "tokio")]
16
use tokio_crate::io::{AsyncRead as TokioAsyncRead, AsyncWrite as TokioAsyncWrite};
17
#[cfg(feature = "tokio")]
18
use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
19
use tor_cell::restricted_msg;
20

            
21
use std::fmt::Debug;
22
use std::io::Result as IoResult;
23
use std::pin::Pin;
24
#[cfg(any(feature = "stream-ctrl", feature = "experimental-api"))]
25
use std::sync::Arc;
26
#[cfg(feature = "stream-ctrl")]
27
use std::sync::{Mutex, Weak};
28

            
29
use educe::Educe;
30

            
31
#[cfg(any(feature = "experimental-api", feature = "stream-ctrl"))]
32
use crate::tunnel::circuit::ClientCirc;
33

            
34
use crate::memquota::StreamAccount;
35
use crate::stream::StreamReader;
36
use crate::tunnel::StreamTarget;
37
use tor_basic_utils::skip_fmt;
38
use tor_cell::relaycell::msg::Data;
39
use tor_error::internal;
40

            
41
use super::AnyCmdChecker;
42

            
43
/// An anonymized stream over the Tor network.
44
///
45
/// For most purposes, you can think of this type as an anonymized
46
/// TCP stream: it can read and write data, and get closed when it's done.
47
///
48
/// [`DataStream`] implements [`futures::io::AsyncRead`] and
49
/// [`futures::io::AsyncWrite`], so you can use it anywhere that those
50
/// traits are expected.
51
///
52
/// # Examples
53
///
54
/// Connecting to an HTTP server and sending a request, using
55
/// [`AsyncWriteExt::write_all`](futures::io::AsyncWriteExt::write_all):
56
///
57
/// ```ignore
58
/// let mut stream = tor_client.connect(("icanhazip.com", 80), None).await?;
59
///
60
/// use futures::io::AsyncWriteExt;
61
///
62
/// stream
63
///     .write_all(b"GET / HTTP/1.1\r\nHost: icanhazip.com\r\nConnection: close\r\n\r\n")
64
///     .await?;
65
///
66
/// // Flushing the stream is important; see below!
67
/// stream.flush().await?;
68
/// ```
69
///
70
/// Reading the result, using [`AsyncReadExt::read_to_end`](futures::io::AsyncReadExt::read_to_end):
71
///
72
/// ```ignore
73
/// use futures::io::AsyncReadExt;
74
///
75
/// let mut buf = Vec::new();
76
/// stream.read_to_end(&mut buf).await?;
77
///
78
/// println!("{}", String::from_utf8_lossy(&buf));
79
/// ```
80
///
81
/// # Usage with Tokio
82
///
83
/// If the `tokio` crate feature is enabled, this type also implements
84
/// [`tokio::io::AsyncRead`](tokio_crate::io::AsyncRead) and
85
/// [`tokio::io::AsyncWrite`](tokio_crate::io::AsyncWrite) for easier integration
86
/// with code that expects those traits.
87
///
88
/// # Remember to call `flush`!
89
///
90
/// DataStream buffers data internally, in order to write as few cells
91
/// as possible onto the network.  In order to make sure that your
92
/// data has actually been sent, you need to make sure that
93
/// [`AsyncWrite::poll_flush`] runs to completion: probably via
94
/// [`AsyncWriteExt::flush`](futures::io::AsyncWriteExt::flush).
95
///
96
/// # Splitting the type
97
///
98
/// This type is internally composed of a [`DataReader`] and a [`DataWriter`]; the
99
/// `DataStream::split` method can be used to split it into those two parts, for more
100
/// convenient usage with e.g. stream combinators.
101
///
102
/// # How long does a stream live?
103
///
104
/// A `DataStream` will live until all references to it are dropped,
105
/// or until it is closed explicitly.
106
///
107
/// If you split the stream into a `DataReader` and a `DataWriter`, it
108
/// will survive until _both_ are dropped, or until it is closed
109
/// explicitly.
110
///
111
/// A stream can also close because of a network error,
112
/// or because the other side of the stream decided to close it.
113
///
114
// # Semver note
115
//
116
// Note that this type is re-exported as a part of the public API of
117
// the `arti-client` crate.  Any changes to its API here in
118
// `tor-proto` need to be reflected above.
119
#[derive(Debug)]
120
pub struct DataStream {
121
    /// Underlying writer for this stream
122
    w: DataWriter,
123
    /// Underlying reader for this stream
124
    r: DataReader,
125
    /// A control object that can be used to monitor and control this stream
126
    /// without needing to own it.
127
    #[cfg(feature = "stream-ctrl")]
128
    ctrl: std::sync::Arc<ClientDataStreamCtrl>,
129
}
130
assert_impl_all! { DataStream: Send, Sync }
131

            
132
/// An object used to control and monitor a data stream.
///
/// # Notes
///
/// This is a separate type from [`DataStream`] because it's useful to have
/// multiple references to this object, whereas a [`DataReader`] and [`DataWriter`]
/// need to have a single owner for the `AsyncRead` and `AsyncWrite` APIs to
/// work correctly.
#[cfg(feature = "stream-ctrl")]
#[derive(Debug)]
pub struct ClientDataStreamCtrl {
    /// The circuit to which this stream is attached.
    ///
    /// Note that the stream's reader and writer halves each contain a `StreamTarget`,
    /// which in turn has a strong reference to the `ClientCirc`.  So as long as any
    /// one of those is alive, this reference will be present.
    ///
    /// We make this a Weak reference so that once the stream itself is closed,
    /// we can't leak circuits.
    // TODO(conflux): use ClientTunnel
    circuit: Weak<ClientCirc>,

    /// Shared user-visible information about the state of this stream.
    ///
    /// The same `Arc` is also held by the reader and writer implementations,
    /// which update it as the stream changes state.
    ///
    /// TODO RPC: This will probably want to be a `postage::Watch` or something
    /// similar, if and when it stops moving around.
    status: Arc<Mutex<DataStreamStatus>>,

    /// The memory quota account that should be used for this stream's data
    ///
    /// Exists to keep the account alive
    _memquota: StreamAccount,
}
166

            
167
/// The write half of a [`DataStream`], implementing [`futures::io::AsyncWrite`].
168
///
169
/// See the [`DataStream`] docs for more information. In particular, note
170
/// that this writer requires `poll_flush` to complete in order to guarantee that
171
/// all data has been written.
172
///
173
/// # Usage with Tokio
174
///
175
/// If the `tokio` crate feature is enabled, this type also implements
176
/// [`tokio::io::AsyncWrite`](tokio_crate::io::AsyncWrite) for easier integration
177
/// with code that expects that trait.
178
///
179
/// # Drop and close
180
///
181
/// Note that dropping a `DataWriter` has no special effect on its own:
182
/// if the `DataWriter` is dropped, the underlying stream will still remain open
183
/// until the `DataReader` is also dropped.
184
///
185
/// If you want the stream to close earlier, use [`close`](futures::io::AsyncWriteExt::close)
186
/// (or [`shutdown`](tokio_crate::io::AsyncWriteExt::shutdown) with `tokio`).
187
///
188
/// Remember that Tor does not support half-open streams:
189
/// If you `close` or `shutdown` a stream,
190
/// the other side will not see the stream as half-open,
191
/// and so will (probably) not finish sending you any in-progress data.
192
/// Do not use `close`/`shutdown` to communicate anything besides
193
/// "I am done using this stream."
194
///
195
// # Semver note
196
//
197
// Note that this type is re-exported as a part of the public API of
198
// the `arti-client` crate.  Any changes to its API here in
199
// `tor-proto` need to be reflected above.
200
#[derive(Debug)]
201
pub struct DataWriter {
202
    /// Internal state for this writer
203
    ///
204
    /// This is stored in an Option so that we can mutate it in the
205
    /// AsyncWrite functions.  It might be possible to do better here,
206
    /// and we should refactor if so.
207
    state: Option<DataWriterState>,
208

            
209
    /// The memory quota account that should be used for this stream's data
210
    ///
211
    /// Exists to keep the account alive
212
    // If we liked, we could make this conditional; see DataReader.memquota
213
    _memquota: StreamAccount,
214

            
215
    /// A control object that can be used to monitor and control this stream
216
    /// without needing to own it.
217
    #[cfg(feature = "stream-ctrl")]
218
    ctrl: std::sync::Arc<ClientDataStreamCtrl>,
219
}
220

            
221
/// The read half of a [`DataStream`], implementing [`futures::io::AsyncRead`].
222
///
223
/// See the [`DataStream`] docs for more information.
224
///
225
/// # Usage with Tokio
226
///
227
/// If the `tokio` crate feature is enabled, this type also implements
228
/// [`tokio::io::AsyncRead`](tokio_crate::io::AsyncRead) for easier integration
229
/// with code that expects that trait.
230
//
231
// # Semver note
232
//
233
// Note that this type is re-exported as a part of the public API of
234
// the `arti-client` crate.  Any changes to its API here in
235
// `tor-proto` need to be reflected above.
236
#[derive(Debug)]
237
pub struct DataReader {
238
    /// Internal state for this reader.
239
    ///
240
    /// This is stored in an Option so that we can mutate it in
241
    /// poll_read().  It might be possible to do better here, and we
242
    /// should refactor if so.
243
    state: Option<DataReaderState>,
244

            
245
    /// The memory quota account that should be used for this stream's data
246
    ///
247
    /// Exists to keep the account alive
248
    // If we liked, we could make this conditional on not(cfg(feature = "stream-ctrl"))
249
    // since, ClientDataStreamCtrl contains a StreamAccount clone too.  But that seems fragile.
250
    _memquota: StreamAccount,
251

            
252
    /// A control object that can be used to monitor and control this stream
253
    /// without needing to own it.
254
    #[cfg(feature = "stream-ctrl")]
255
    ctrl: std::sync::Arc<ClientDataStreamCtrl>,
256
}
257

            
258
/// Shared status flags for tracking the status of a `DataStream`.
///
/// We expect to refactor this a bit, so it's not exposed at all.
//
// TODO RPC: Possibly instead of manipulating the fields of DataStreamStatus
// from various points in this module, we should instead construct
// DataStreamStatus as needed from information available elsewhere.  In any
// case, we should really eliminate as much duplicate state here as we can.
// (See discussions at !1198 for some challenges with this.)
#[cfg(feature = "stream-ctrl")]
#[derive(Clone, Debug, Default)]
struct DataStreamStatus {
    /// True if we've received a CONNECTED message.
    //
    // TODO: This is redundant with `connected` in DataReaderImpl and
    // `expecting_connected` in DataCmdChecker.
    received_connected: bool,
    /// True if we have decided to send an END message.
    //
    // TODO RPC: There is not an easy way to set this from this module!  Really,
    // the decision to send an "end" is made when the StreamTarget object is
    // dropped, but we don't currently have any way to see when that happens.
    // Perhaps we need a different shared StreamStatus object that the
    // StreamTarget holds?
    sent_end: bool,
    /// True if we have received an END message telling us to close the stream.
    received_end: bool,
    /// True if we have received an error.
    ///
    /// (This is not a subset or superset of received_end; some errors are END
    /// messages but some aren't; some END messages are errors but some aren't.)
    received_err: bool,
}
291

            
292
#[cfg(feature = "stream-ctrl")]
impl DataStreamStatus {
    /// Remember that we've received a connected message.
    fn record_connected(&mut self) {
        self.received_connected = true;
    }

    /// Remember that we've received an error of some kind.
    ///
    /// An `EndReceived(DONE)` is recorded as a plain END; any other
    /// `EndReceived` is recorded as both an END and an error; every other
    /// error only sets the error flag.
    fn record_error(&mut self, e: &Error) {
        // TODO: Probably we should remember the actual error in a box or
        // something.  But that means making a redundant copy of the error
        // even if nobody will want it.  Do we care?
        match e {
            Error::EndReceived(EndReason::DONE) => self.received_end = true,
            Error::EndReceived(_) => {
                self.received_end = true;
                self.received_err = true;
            }
            _ => self.received_err = true,
        }
    }
}
314

            
315
restricted_msg! {
    /// An allowable incoming message on a data stream.
    enum DataStreamMsg:RelayMsg {
        // SENDME is handled by the reactor.
        Data, End, Connected,
    }
}
322

            
323
// TODO RPC: Should we also implement this trait for everything that holds a
// ClientDataStreamCtrl?
#[cfg(feature = "stream-ctrl")]
impl super::ctrl::ClientStreamCtrl for ClientDataStreamCtrl {
    // Returns `None` once the weak reference to the circuit can no longer
    // be upgraded.
    // TODO(conflux): use ClientTunnel
    fn circuit(&self) -> Option<Arc<ClientCirc>> {
        self.circuit.upgrade()
    }
}
332

            
333
#[cfg(feature = "stream-ctrl")]
impl ClientDataStreamCtrl {
    /// Return true if the underlying stream is connected. (That is, if it has
    /// received a `CONNECTED` message, and has not been closed.)
    ///
    /// "Closed" here means that any of the `sent_end`, `received_end` or
    /// `received_err` status flags has been set.
    ///
    /// # Panics
    ///
    /// Panics if the status lock is poisoned.
    pub fn is_connected(&self) -> bool {
        let s = self.status.lock().expect("poisoned lock");
        s.received_connected && !(s.sent_end || s.received_end || s.received_err)
    }

    // TODO RPC: Add more functions once we have the desired API more nailed
    // down.
}
345

            
346
impl DataStream {
347
    /// Wrap raw stream reader and target parts as a DataStream.
348
    ///
349
    /// For non-optimistic stream, function `wait_for_connection`
350
    /// must be called after to make sure CONNECTED is received.
351
56
    pub(crate) fn new(reader: StreamReader, target: StreamTarget, memquota: StreamAccount) -> Self {
352
56
        Self::new_inner(reader, target, false, memquota)
353
56
    }
354

            
355
    /// Wrap raw stream reader and target parts as a connected DataStream.
356
    ///
357
    /// Unlike [`DataStream::new`], this creates a `DataStream` that does not expect to receive a
358
    /// CONNECTED cell.
359
    ///
360
    /// This is used by hidden services, exit relays, and directory servers to accept streams.
361
    #[cfg(feature = "hs-service")]
362
16
    pub(crate) fn new_connected(
363
16
        reader: StreamReader,
364
16
        target: StreamTarget,
365
16
        memquota: StreamAccount,
366
16
    ) -> Self {
367
16
        Self::new_inner(reader, target, true, memquota)
368
16
    }
369

            
370
    /// The shared implementation of the `new*()` functions.
371
72
    fn new_inner(
372
72
        reader: StreamReader,
373
72
        target: StreamTarget,
374
72
        connected: bool,
375
72
        memquota: StreamAccount,
376
72
    ) -> Self {
377
72
        let relay_cell_format = target.relay_cell_format();
378
72
        let out_buf_len = Data::max_body_len(relay_cell_format);
379

            
380
        #[cfg(feature = "stream-ctrl")]
381
72
        let status = {
382
72
            let mut data_stream_status = DataStreamStatus::default();
383
72
            if connected {
384
16
                data_stream_status.record_connected();
385
56
            }
386
72
            Arc::new(Mutex::new(data_stream_status))
387
72
        };
388
72

            
389
72
        #[cfg(feature = "stream-ctrl")]
390
72
        let ctrl = Arc::new(ClientDataStreamCtrl {
391
72
            circuit: Arc::downgrade(target.circuit()),
392
72
            status: status.clone(),
393
72
            _memquota: memquota.clone(),
394
72
        });
395
72
        let r = DataReader {
396
72
            state: Some(DataReaderState::Ready(DataReaderImpl {
397
72
                s: reader,
398
72
                pending: Vec::new(),
399
72
                offset: 0,
400
72
                connected,
401
72
                #[cfg(feature = "stream-ctrl")]
402
72
                status: status.clone(),
403
72
            })),
404
72
            _memquota: memquota.clone(),
405
72
            #[cfg(feature = "stream-ctrl")]
406
72
            ctrl: ctrl.clone(),
407
72
        };
408
72
        let w = DataWriter {
409
72
            state: Some(DataWriterState::Ready(DataWriterImpl {
410
72
                s: target,
411
72
                buf: vec![0; out_buf_len].into_boxed_slice(),
412
72
                n_pending: 0,
413
72
                #[cfg(feature = "stream-ctrl")]
414
72
                status,
415
72
                relay_cell_format,
416
72
            })),
417
72
            _memquota: memquota,
418
72
            #[cfg(feature = "stream-ctrl")]
419
72
            ctrl: ctrl.clone(),
420
72
        };
421
72
        DataStream {
422
72
            w,
423
72
            r,
424
72
            #[cfg(feature = "stream-ctrl")]
425
72
            ctrl,
426
72
        }
427
72
    }
428

            
429
    /// Divide this DataStream into its constituent parts.
430
16
    pub fn split(self) -> (DataReader, DataWriter) {
431
16
        (self.r, self.w)
432
16
    }
433

            
434
    /// Wait until a CONNECTED cell is received, or some other cell
435
    /// is received to indicate an error.
436
    ///
437
    /// Does nothing if this stream is already connected.
438
72
    pub async fn wait_for_connection(&mut self) -> Result<()> {
439
48
        // We must put state back before returning
440
48
        let state = self.r.state.take().expect("Missing state in DataReader");
441

            
442
48
        if let DataReaderState::Ready(imp) = state {
443
48
            let (imp, result) = if imp.connected {
444
                (imp, Ok(()))
445
            } else {
446
                // This succeeds if the cell is CONNECTED, and fails otherwise.
447
48
                imp.read_cell().await
448
            };
449
48
            self.r.state = Some(match result {
450
                Err(_) => DataReaderState::Closed,
451
48
                Ok(_) => DataReaderState::Ready(imp),
452
            });
453
48
            result
454
        } else {
455
            Err(Error::from(internal!(
456
                "Expected ready state, got {:?}",
457
                state
458
            )))
459
        }
460
48
    }
461

            
462
    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
463
    /// interact with this stream without holding the stream itself.
464
    #[cfg(feature = "stream-ctrl")]
465
    pub fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
466
        Some(&self.ctrl)
467
    }
468
}
469

            
470
impl AsyncRead for DataStream {
471
98
    fn poll_read(
472
98
        mut self: Pin<&mut Self>,
473
98
        cx: &mut Context<'_>,
474
98
        buf: &mut [u8],
475
98
    ) -> Poll<IoResult<usize>> {
476
98
        AsyncRead::poll_read(Pin::new(&mut self.r), cx, buf)
477
98
    }
478
}
479

            
480
/// Tokio read support, provided by wrapping the `futures` implementation
/// in a compatibility adapter.
#[cfg(feature = "tokio")]
impl TokioAsyncRead for DataStream {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<IoResult<()>> {
        TokioAsyncRead::poll_read(Pin::new(&mut self.compat()), cx, buf)
    }
}
490

            
491
impl AsyncWrite for DataStream {
492
4088
    fn poll_write(
493
4088
        mut self: Pin<&mut Self>,
494
4088
        cx: &mut Context<'_>,
495
4088
        buf: &[u8],
496
4088
    ) -> Poll<IoResult<usize>> {
497
4088
        AsyncWrite::poll_write(Pin::new(&mut self.w), cx, buf)
498
4088
    }
499
16
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
500
16
        AsyncWrite::poll_flush(Pin::new(&mut self.w), cx)
501
16
    }
502
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
503
        AsyncWrite::poll_close(Pin::new(&mut self.w), cx)
504
    }
505
}
506

            
507
/// Tokio write support, provided by wrapping the `futures` implementation
/// in a compatibility adapter.
#[cfg(feature = "tokio")]
impl TokioAsyncWrite for DataStream {
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
        TokioAsyncWrite::poll_write(Pin::new(&mut self.compat()), cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        TokioAsyncWrite::poll_flush(Pin::new(&mut self.compat()), cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        TokioAsyncWrite::poll_shutdown(Pin::new(&mut self.compat()), cx)
    }
}
521

            
522
/// Helper type: Like BoxFuture, but also requires that the future be Sync.
type BoxSyncFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + Sync + 'a>>;
524

            
525
/// An enumeration for the state of a DataWriter.
526
///
527
/// We have to use an enum here because, for as long as we're waiting
528
/// for a flush operation to complete, the future returned by
529
/// `flush_cell()` owns the DataWriterImpl.
530
#[derive(Educe)]
531
#[educe(Debug)]
532
enum DataWriterState {
533
    /// The writer has closed or gotten an error: nothing more to do.
534
    Closed,
535
    /// The writer is not currently flushing; more data can get queued
536
    /// immediately.
537
    Ready(DataWriterImpl),
538
    /// The writer is flushing a cell.
539
    Flushing(
540
        #[educe(Debug(method = "skip_fmt"))] //
541
        BoxSyncFuture<'static, (DataWriterImpl, Result<()>)>,
542
    ),
543
}
544

            
545
/// Internal: the write part of a DataStream
546
#[derive(Educe)]
547
#[educe(Debug)]
548
struct DataWriterImpl {
549
    /// The underlying StreamTarget object.
550
    s: StreamTarget,
551

            
552
    /// Buffered data to send over the connection.
553
    ///
554
    /// This buffer is currently allocated using a number of bytes
555
    /// equal to the maximum that we can package at a time.
556
    //
557
    // TODO: this buffer is probably smaller than we want, but it's good
558
    // enough for now.  If we _do_ make it bigger, we'll have to change
559
    // our use of Data::split_from to handle the case where we can't fit
560
    // all the data.
561
    #[educe(Debug(method = "skip_fmt"))]
562
    buf: Box<[u8]>,
563

            
564
    /// Number of unflushed bytes in buf.
565
    n_pending: usize,
566

            
567
    /// Relay cell format in use
568
    relay_cell_format: RelayCellFormat,
569

            
570
    /// Shared user-visible information about the state of this stream.
571
    #[cfg(feature = "stream-ctrl")]
572
    status: Arc<Mutex<DataStreamStatus>>,
573
}
574

            
575
impl DataWriter {
576
    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
577
    /// interact with this stream without holding the stream itself.
578
    #[cfg(feature = "stream-ctrl")]
579
    pub fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
580
        Some(&self.ctrl)
581
    }
582

            
583
    /// Helper for poll_flush() and poll_close(): Performs a flush, then
584
    /// closes the stream if should_close is true.
585
24
    fn poll_flush_impl(
586
24
        mut self: Pin<&mut Self>,
587
24
        cx: &mut Context<'_>,
588
24
        should_close: bool,
589
24
    ) -> Poll<IoResult<()>> {
590
24
        let state = self.state.take().expect("Missing state in DataWriter");
591

            
592
        // TODO: this whole function is a bit copy-pasted.
593
24
        let mut future: BoxSyncFuture<_> = match state {
594
24
            DataWriterState::Ready(imp) => {
595
24
                if imp.n_pending == 0 {
596
                    // Nothing to flush!
597
8
                    if should_close {
598
                        // We need to actually continue with this function to do the closing.
599
                        // Thus, make a future that does nothing and is ready immediately.
600
8
                        Box::pin(futures::future::ready((imp, Ok(()))))
601
                    } else {
602
                        // There's nothing more to do; we can return.
603
                        self.state = Some(DataWriterState::Ready(imp));
604
                        return Poll::Ready(Ok(()));
605
                    }
606
                } else {
607
                    // We need to flush the buffer's contents; Make a future for that.
608
16
                    Box::pin(imp.flush_buf())
609
                }
610
            }
611
            DataWriterState::Flushing(fut) => fut,
612
            DataWriterState::Closed => {
613
                self.state = Some(DataWriterState::Closed);
614
                return Poll::Ready(Err(Error::NotConnected.into()));
615
            }
616
        };
617

            
618
24
        match future.as_mut().poll(cx) {
619
            Poll::Ready((_imp, Err(e))) => {
620
                self.state = Some(DataWriterState::Closed);
621
                Poll::Ready(Err(e.into()))
622
            }
623
24
            Poll::Ready((mut imp, Ok(()))) => {
624
24
                if should_close {
625
8
                    // Tell the StreamTarget to close, so that the reactor
626
8
                    // realizes that we are done sending. (Dropping `imp.s` does not
627
8
                    // suffice, since there may be other clones of it.  In particular,
628
8
                    // the StreamReader has one, which it uses to keep the stream
629
8
                    // open, among other things.)
630
8
                    imp.s.close();
631
8

            
632
8
                    #[cfg(feature = "stream-ctrl")]
633
8
                    {
634
8
                        // TODO RPC:  This is not sufficient to track every case
635
8
                        // where we might have sent an End.  See note on the
636
8
                        // `sent_end` field.
637
8
                        imp.status.lock().expect("lock poisoned").sent_end = true;
638
8
                    }
639
8
                    self.state = Some(DataWriterState::Closed);
640
16
                } else {
641
16
                    self.state = Some(DataWriterState::Ready(imp));
642
16
                }
643
24
                Poll::Ready(Ok(()))
644
            }
645
            Poll::Pending => {
646
                self.state = Some(DataWriterState::Flushing(future));
647
                Poll::Pending
648
            }
649
        }
650
24
    }
651
}
652

            
653
impl AsyncWrite for DataWriter {
654
4088
    fn poll_write(
655
4088
        mut self: Pin<&mut Self>,
656
4088
        cx: &mut Context<'_>,
657
4088
        buf: &[u8],
658
4088
    ) -> Poll<IoResult<usize>> {
659
4088
        if buf.is_empty() {
660
            return Poll::Ready(Ok(0));
661
4088
        }
662
4088

            
663
4088
        let state = self.state.take().expect("Missing state in DataWriter");
664

            
665
4088
        let mut future = match state {
666
4056
            DataWriterState::Ready(mut imp) => {
667
4056
                let n_queued = imp.queue_bytes(buf);
668
4056
                if n_queued != 0 {
669
1200
                    self.state = Some(DataWriterState::Ready(imp));
670
1200
                    return Poll::Ready(Ok(n_queued));
671
2856
                }
672
2856
                // we couldn't queue anything, so the current cell must be full.
673
2856
                Box::pin(imp.flush_buf())
674
            }
675
32
            DataWriterState::Flushing(fut) => fut,
676
            DataWriterState::Closed => {
677
                self.state = Some(DataWriterState::Closed);
678
                return Poll::Ready(Err(Error::NotConnected.into()));
679
            }
680
        };
681

            
682
2888
        match future.as_mut().poll(cx) {
683
            Poll::Ready((_imp, Err(e))) => {
684
                #[cfg(feature = "stream-ctrl")]
685
                {
686
                    _imp.status.lock().expect("lock poisoned").record_error(&e);
687
                }
688
                self.state = Some(DataWriterState::Closed);
689
                Poll::Ready(Err(e.into()))
690
            }
691
2856
            Poll::Ready((mut imp, Ok(()))) => {
692
2856
                // Great!  We're done flushing.  Queue as much as we can of this
693
2856
                // cell.
694
2856
                let n_queued = imp.queue_bytes(buf);
695
2856
                self.state = Some(DataWriterState::Ready(imp));
696
2856
                Poll::Ready(Ok(n_queued))
697
            }
698
            Poll::Pending => {
699
32
                self.state = Some(DataWriterState::Flushing(future));
700
32
                Poll::Pending
701
            }
702
        }
703
4088
    }
704

            
705
16
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
706
16
        self.poll_flush_impl(cx, false)
707
16
    }
708

            
709
8
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
710
8
        self.poll_flush_impl(cx, true)
711
8
    }
712
}
713

            
714
/// Tokio write support, provided by wrapping the `futures` implementation
/// in a compatibility adapter.
#[cfg(feature = "tokio")]
impl TokioAsyncWrite for DataWriter {
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
        TokioAsyncWrite::poll_write(Pin::new(&mut self.compat_write()), cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        TokioAsyncWrite::poll_flush(Pin::new(&mut self.compat_write()), cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        TokioAsyncWrite::poll_shutdown(Pin::new(&mut self.compat_write()), cx)
    }
}
728

            
729
impl DataWriterImpl {
730
    /// Try to flush the current buffer contents as a data cell.
731
4308
    async fn flush_buf(mut self) -> (Self, Result<()>) {
732
2872
        let result = if let Some((cell, remainder)) =
733
2872
            Data::try_split_from(self.relay_cell_format, &self.buf[..self.n_pending])
734
        {
735
            // TODO: Eventually we may want a larger buffer; if we do,
736
            // this invariant will become false.
737
2872
            assert!(remainder.is_empty());
738
2872
            self.n_pending = 0;
739
2872
            self.s.send(cell.into()).await
740
        } else {
741
            Ok(())
742
        };
743

            
744
2872
        (self, result)
745
2872
    }
746

            
747
    /// Add as many bytes as possible from `b` to our internal buffer;
748
    /// return the number we were able to add.
749
6912
    fn queue_bytes(&mut self, b: &[u8]) -> usize {
750
6912
        let empty_space = &mut self.buf[self.n_pending..];
751
6912
        if empty_space.is_empty() {
752
            // that is, len == 0
753
2856
            return 0;
754
4056
        }
755
4056

            
756
4056
        let n_to_copy = std::cmp::min(b.len(), empty_space.len());
757
4056
        empty_space[..n_to_copy].copy_from_slice(&b[..n_to_copy]);
758
4056
        self.n_pending += n_to_copy;
759
4056
        n_to_copy
760
6912
    }
761
}
762

            
763
impl DataReader {
764
    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
765
    /// interact with this stream without holding the stream itself.
766
    #[cfg(feature = "stream-ctrl")]
767
    pub fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
768
        Some(&self.ctrl)
769
    }
770
}
771

            
772
/// An enumeration for the state of a DataReader.
773
///
774
/// We have to use an enum here because, when we're waiting for
775
/// ReadingCell to complete, the future returned by `read_cell()` owns the
776
/// DataCellImpl.  If we wanted to store the future and the cell at the
777
/// same time, we'd need to make a self-referential structure, which isn't
778
/// possible in safe Rust AIUI.
779
#[derive(Educe)]
780
#[educe(Debug)]
781
enum DataReaderState {
782
    /// In this state we have received an end cell or an error.
783
    Closed,
784
    /// In this state the reader is not currently fetching a cell; it
785
    /// either has data or not.
786
    Ready(DataReaderImpl),
787
    /// The reader is currently fetching a cell: this future is the
788
    /// progress it is making.
789
    ReadingCell(
790
        #[educe(Debug(method = "skip_fmt"))] //
791
        BoxSyncFuture<'static, (DataReaderImpl, Result<()>)>,
792
    ),
793
}
794

            
795
/// Wrapper for the read part of a DataStream
796
#[derive(Educe)]
797
#[educe(Debug)]
798
struct DataReaderImpl {
799
    /// The underlying StreamReader object.
800
    #[educe(Debug(method = "skip_fmt"))]
801
    s: StreamReader,
802

            
803
    /// If present, data that we received on this stream but have not
804
    /// been able to send to the caller yet.
805
    // TODO: This data structure is probably not what we want, but
806
    // it's good enough for now.
807
    #[educe(Debug(method = "skip_fmt"))]
808
    pending: Vec<u8>,
809

            
810
    /// Index into pending to show what we've already read.
811
    offset: usize,
812

            
813
    /// If true, we have received a CONNECTED cell on this stream.
814
    connected: bool,
815

            
816
    /// Shared user-visible information about the state of this stream.
817
    #[cfg(feature = "stream-ctrl")]
818
    status: Arc<Mutex<DataStreamStatus>>,
819
}
820

            
821
impl AsyncRead for DataReader {
822
98
    fn poll_read(
823
98
        mut self: Pin<&mut Self>,
824
98
        cx: &mut Context<'_>,
825
98
        buf: &mut [u8],
826
98
    ) -> Poll<IoResult<usize>> {
827
98
        // We're pulling the state object out of the reader.  We MUST
828
98
        // put it back before this function returns.
829
98
        let mut state = self.state.take().expect("Missing state in DataReader");
830

            
831
        loop {
832
130
            let mut future = match state {
833
64
                DataReaderState::Ready(mut imp) => {
834
64
                    // There may be data to read already.
835
64
                    let n_copied = imp.extract_bytes(buf);
836
64
                    if n_copied != 0 {
837
                        // We read data into the buffer.  Tell the caller.
838
24
                        self.state = Some(DataReaderState::Ready(imp));
839
24
                        return Poll::Ready(Ok(n_copied));
840
40
                    }
841
40

            
842
40
                    // No data available!  We have to launch a read.
843
40
                    Box::pin(imp.read_cell())
844
                }
845
66
                DataReaderState::ReadingCell(fut) => fut,
846
                DataReaderState::Closed => {
847
                    self.state = Some(DataReaderState::Closed);
848
                    return Poll::Ready(Err(Error::NotConnected.into()));
849
                }
850
            };
851

            
852
            // We have a future that represents an in-progress read.
853
            // See if it can make progress.
854
106
            match future.as_mut().poll(cx) {
855
8
                Poll::Ready((_imp, Err(e))) => {
856
8
                    // There aren't any survivable errors in the current
857
8
                    // design.
858
8
                    self.state = Some(DataReaderState::Closed);
859
8
                    #[cfg(feature = "stream-ctrl")]
860
8
                    {
861
8
                        _imp.status.lock().expect("lock poisoned").record_error(&e);
862
8
                    }
863
8
                    let result = if matches!(e, Error::EndReceived(EndReason::DONE)) {
864
8
                        Ok(0)
865
                    } else {
866
                        Err(e.into())
867
                    };
868
8
                    return Poll::Ready(result);
869
                }
870
32
                Poll::Ready((imp, Ok(()))) => {
871
32
                    // It read a cell!  Continue the loop.
872
32
                    state = DataReaderState::Ready(imp);
873
32
                }
874
                Poll::Pending => {
875
                    // The future is pending; store it and tell the
876
                    // caller to get back to us later.
877
66
                    self.state = Some(DataReaderState::ReadingCell(future));
878
66
                    return Poll::Pending;
879
                }
880
            }
881
        }
882
98
    }
883
}
884

            
885
#[cfg(feature = "tokio")]
impl TokioAsyncRead for DataReader {
    /// Tokio-flavoured read: wrap the pinned reader with `compat()` and
    /// delegate to the wrapper's tokio `poll_read`.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<IoResult<()>> {
        let mut adapter = self.compat();
        TokioAsyncRead::poll_read(Pin::new(&mut adapter), cx, buf)
    }
}
895

            
896
impl DataReaderImpl {
897
    /// Pull as many bytes as we can off of self.pending, and return that
898
    /// number of bytes.
899
64
    fn extract_bytes(&mut self, buf: &mut [u8]) -> usize {
900
64
        let remainder = &self.pending[self.offset..];
901
64
        let n_to_copy = std::cmp::min(buf.len(), remainder.len());
902
64
        buf[..n_to_copy].copy_from_slice(&remainder[..n_to_copy]);
903
64
        self.offset += n_to_copy;
904
64

            
905
64
        n_to_copy
906
64
    }
907

            
908
    /// Return true iff there are no buffered bytes here to yield
909
24
    fn buf_is_empty(&self) -> bool {
910
24
        self.pending.len() == self.offset
911
24
    }
912

            
913
    /// Load self.pending with the contents of a new data cell.
914
    ///
915
    /// This function takes ownership of self so that we can avoid
916
    /// self-referential lifetimes.
917
132
    async fn read_cell(mut self) -> (Self, Result<()>) {
918
        use DataStreamMsg::*;
919
88
        let msg = match self.s.recv().await {
920
88
            Ok(unparsed) => match unparsed.decode::<DataStreamMsg>() {
921
88
                Ok(cell) => cell.into_msg(),
922
                Err(e) => {
923
                    self.s.protocol_error();
924
                    return (
925
                        self,
926
                        Err(Error::from_bytes_err(e, "message on a data stream")),
927
                    );
928
                }
929
            },
930
            Err(e) => return (self, Err(e)),
931
        };
932

            
933
88
        let result = match msg {
934
56
            Connected(_) if !self.connected => {
935
56
                self.connected = true;
936
56
                #[cfg(feature = "stream-ctrl")]
937
56
                {
938
56
                    self.status
939
56
                        .lock()
940
56
                        .expect("poisoned lock")
941
56
                        .record_connected();
942
56
                }
943
56
                Ok(())
944
            }
945
            Connected(_) => {
946
                self.s.protocol_error();
947
                Err(Error::StreamProto(
948
                    "Received a second connect cell on a data stream".to_string(),
949
                ))
950
            }
951
24
            Data(d) if self.connected => {
952
24
                self.add_data(d.into());
953
24
                Ok(())
954
            }
955
            Data(_) => {
956
                self.s.protocol_error();
957
                Err(Error::StreamProto(
958
                    "Received a data cell an unconnected stream".to_string(),
959
                ))
960
            }
961
8
            End(e) => Err(Error::EndReceived(e.reason())),
962
        };
963

            
964
88
        (self, result)
965
88
    }
966

            
967
    /// Add the data from `d` to the end of our pending bytes.
968
24
    fn add_data(&mut self, mut d: Vec<u8>) {
969
24
        if self.buf_is_empty() {
970
24
            // No data pending?  Just take d as the new pending.
971
24
            self.pending = d;
972
24
            self.offset = 0;
973
24
        } else {
974
            // TODO(nickm) This has potential to grow `pending` without bound.
975
            // Fortunately, we don't currently read cells or call this
976
            // `add_data` method when pending is nonempty—but if we do in the
977
            // future, we'll have to be careful here.
978
            self.pending.append(&mut d);
979
        }
980
24
    }
981
}
982

            
983
/// A `CmdChecker` for data streams.
///
/// It tracks whether a CONNECTED message is still awaited, so that a
/// repeated CONNECTED, or a DATA that arrives before CONNECTED, can be
/// rejected.
#[derive(Debug)]
pub(crate) struct DataCmdChecker {
    /// True while no CONNECTED message has been accepted on this stream yet.
    expecting_connected: bool,
}
989

            
990
impl Default for DataCmdChecker {
991
322
    fn default() -> Self {
992
322
        Self {
993
322
            expecting_connected: true,
994
322
        }
995
322
    }
996
}
997

            
998
impl super::CmdChecker for DataCmdChecker {
999
140
    fn check_msg(
140
        &mut self,
140
        msg: &tor_cell::relaycell::UnparsedRelayMsg,
140
    ) -> Result<super::StreamStatus> {
        use super::StreamStatus::*;
140
        match msg.cmd() {
            RelayCmd::CONNECTED => {
66
                if !self.expecting_connected {
4
                    Err(Error::StreamProto(
4
                        "Received CONNECTED twice on a stream.".into(),
4
                    ))
                } else {
62
                    self.expecting_connected = false;
62
                    Ok(Open)
                }
            }
            RelayCmd::DATA => {
64
                if !self.expecting_connected {
64
                    Ok(Open)
                } else {
                    Err(Error::StreamProto(
                        "Received DATA before CONNECTED on a stream".into(),
                    ))
                }
            }
8
            RelayCmd::END => Ok(Closed),
2
            _ => Err(Error::StreamProto(format!(
2
                "Unexpected {} on a data stream!",
2
                msg.cmd()
2
            ))),
        }
140
    }
44
    fn consume_checked_msg(&mut self, msg: tor_cell::relaycell::UnparsedRelayMsg) -> Result<()> {
44
        let _ = msg
44
            .decode::<DataStreamMsg>()
44
            .map_err(|err| Error::from_bytes_err(err, "cell on half-closed stream"))?;
44
        Ok(())
44
    }
}
impl DataCmdChecker {
    /// Return a new boxed `DataCmdChecker` in a state suitable for a newly
    /// constructed connection.
322
    pub(crate) fn new_any() -> AnyCmdChecker {
322
        Box::<Self>::default()
322
    }
    /// Return a new boxed `DataCmdChecker` in a state suitable for a
    /// connection where an initial CONNECTED cell is not expected.
    ///
    /// This is used by hidden services, exit relays, and directory servers
    /// to accept streams.
    #[cfg(feature = "hs-service")]
24
    pub(crate) fn new_connected() -> AnyCmdChecker {
24
        Box::new(Self {
24
            expecting_connected: false,
24
        })
24
    }
}