tor_proto/stream/
data.rs

1//! Declare DataStream, a type that wraps RawCellStream so as to be useful
2//! for byte-oriented communication.
3
4use crate::{Error, Result};
5use static_assertions::assert_impl_all;
6use tor_cell::relaycell::msg::EndReason;
7use tor_cell::relaycell::{RelayCellFormat, RelayCmd};
8
9use futures::io::{AsyncRead, AsyncWrite};
10use futures::stream::StreamExt;
11use futures::task::{Context, Poll};
12use futures::{Future, Stream};
13use pin_project::pin_project;
14use postage::watch;
15
16#[cfg(feature = "tokio")]
17use tokio_crate::io::ReadBuf;
18#[cfg(feature = "tokio")]
19use tokio_crate::io::{AsyncRead as TokioAsyncRead, AsyncWrite as TokioAsyncWrite};
20#[cfg(feature = "tokio")]
21use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
22use tor_cell::restricted_msg;
23
24use std::fmt::Debug;
25use std::io::Result as IoResult;
26use std::num::NonZero;
27use std::pin::Pin;
28#[cfg(any(feature = "stream-ctrl", feature = "experimental-api"))]
29use std::sync::Arc;
30#[cfg(feature = "stream-ctrl")]
31use std::sync::{Mutex, Weak};
32
33use educe::Educe;
34
35#[cfg(any(feature = "experimental-api", feature = "stream-ctrl"))]
36use crate::tunnel::circuit::ClientCirc;
37
38use crate::memquota::StreamAccount;
39use crate::stream::xon_xoff::{BufferIsEmpty, XonXoffReader, XonXoffReaderCtrl};
40use crate::stream::{StreamRateLimit, StreamReceiver};
41use crate::tunnel::StreamTarget;
42use crate::util::token_bucket::dynamic_writer::DynamicRateLimitedWriter;
43use crate::util::token_bucket::writer::{RateLimitedWriter, RateLimitedWriterConfig};
44use tor_basic_utils::skip_fmt;
45use tor_cell::relaycell::msg::Data;
46use tor_error::internal;
47use tor_rtcompat::{CoarseTimeProvider, DynTimeProvider, SleepProvider};
48
49use super::AnyCmdChecker;
50
51/// A stream of [`RateLimitedWriterConfig`] used to update a [`DynamicRateLimitedWriter`].
52///
53/// Unfortunately we need to store the result of a [`StreamExt::map`] and [`StreamExt::fuse`] in
54/// [`DataWriter`], which leaves us with this ugly type.
55/// We use a type alias to make `DataWriter` a little nicer.
56type RateConfigStream = futures::stream::Map<
57    futures::stream::Fuse<watch::Receiver<StreamRateLimit>>,
58    fn(StreamRateLimit) -> RateLimitedWriterConfig,
59>;
60
61/// An anonymized stream over the Tor network.
62///
63/// For most purposes, you can think of this type as an anonymized
64/// TCP stream: it can read and write data, and get closed when it's done.
65///
66/// [`DataStream`] implements [`futures::io::AsyncRead`] and
67/// [`futures::io::AsyncWrite`], so you can use it anywhere that those
68/// traits are expected.
69///
70/// # Examples
71///
72/// Connecting to an HTTP server and sending a request, using
73/// [`AsyncWriteExt::write_all`](futures::io::AsyncWriteExt::write_all):
74///
75/// ```ignore
76/// let mut stream = tor_client.connect(("icanhazip.com", 80), None).await?;
77///
78/// use futures::io::AsyncWriteExt;
79///
80/// stream
81///     .write_all(b"GET / HTTP/1.1\r\nHost: icanhazip.com\r\nConnection: close\r\n\r\n")
82///     .await?;
83///
84/// // Flushing the stream is important; see below!
85/// stream.flush().await?;
86/// ```
87///
88/// Reading the result, using [`AsyncReadExt::read_to_end`](futures::io::AsyncReadExt::read_to_end):
89///
90/// ```ignore
91/// use futures::io::AsyncReadExt;
92///
93/// let mut buf = Vec::new();
94/// stream.read_to_end(&mut buf).await?;
95///
96/// println!("{}", String::from_utf8_lossy(&buf));
97/// ```
98///
99/// # Usage with Tokio
100///
101/// If the `tokio` crate feature is enabled, this type also implements
102/// [`tokio::io::AsyncRead`](tokio_crate::io::AsyncRead) and
103/// [`tokio::io::AsyncWrite`](tokio_crate::io::AsyncWrite) for easier integration
104/// with code that expects those traits.
105///
106/// # Remember to call `flush`!
107///
108/// DataStream buffers data internally, in order to write as few cells
109/// as possible onto the network.  In order to make sure that your
110/// data has actually been sent, you need to make sure that
111/// [`AsyncWrite::poll_flush`] runs to completion: probably via
112/// [`AsyncWriteExt::flush`](futures::io::AsyncWriteExt::flush).
113///
114/// # Splitting the type
115///
116/// This type is internally composed of a [`DataReader`] and a [`DataWriter`]; the
117/// `DataStream::split` method can be used to split it into those two parts, for more
118/// convenient usage with e.g. stream combinators.
119///
120/// # How long does a stream live?
121///
122/// A `DataStream` will live until all references to it are dropped,
123/// or until it is closed explicitly.
124///
125/// If you split the stream into a `DataReader` and a `DataWriter`, it
126/// will survive until _both_ are dropped, or until it is closed
127/// explicitly.
128///
129/// A stream can also close because of a network error,
130/// or because the other side of the stream decided to close it.
131///
132// # Semver note
133//
134// Note that this type is re-exported as a part of the public API of
135// the `arti-client` crate.  Any changes to its API here in
136// `tor-proto` need to be reflected above.
137#[derive(Debug)]
138pub struct DataStream {
139    /// Underlying writer for this stream
140    w: DataWriter,
141    /// Underlying reader for this stream
142    r: DataReader,
143    /// A control object that can be used to monitor and control this stream
144    /// without needing to own it.
145    #[cfg(feature = "stream-ctrl")]
146    ctrl: std::sync::Arc<ClientDataStreamCtrl>,
147}
148assert_impl_all! { DataStream: Send, Sync }
149
/// A handle for monitoring and controlling a data stream.
///
/// # Notes
///
/// This is kept separate from [`DataStream`] because it is useful to have many
/// references to it at once, whereas a [`DataReader`] and a [`DataWriter`] each
/// need a single owner for the `AsyncRead` and `AsyncWrite` APIs to work
/// correctly.
#[cfg(feature = "stream-ctrl")]
#[derive(Debug)]
pub struct ClientDataStreamCtrl {
    /// The circuit this stream is attached to.
    ///
    /// The stream's reader and writer halves each hold a `StreamTarget`, and a
    /// `StreamTarget` keeps a strong reference to the `ClientCirc`.  So while
    /// either half is alive, this reference can be upgraded.
    ///
    /// Holding only a `Weak` reference here means this control object cannot
    /// leak the circuit once the stream itself is closed.
    // TODO(conflux): use ClientTunnel
    circuit: Weak<ClientCirc>,

    /// User-visible status of this stream, shared with its reader and writer.
    ///
    /// TODO RPC: This will probably want to be a `postage::Watch` or something
    /// similar, if and when it stops moving around.
    status: Arc<Mutex<DataStreamStatus>>,

    /// The memory quota account for this stream's data.
    ///
    /// Held only to keep the account alive.
    _memquota: StreamAccount,
}
184
185/// The inner writer for [`DataWriter`].
186///
187/// This type is responsible for taking bytes and packaging them into cells.
188/// Rate limiting is implemented in [`DataWriter`] to avoid making this type more complex.
189#[derive(Debug)]
190struct DataWriterInner {
191    /// Internal state for this writer
192    ///
193    /// This is stored in an Option so that we can mutate it in the
194    /// AsyncWrite functions.  It might be possible to do better here,
195    /// and we should refactor if so.
196    state: Option<DataWriterState>,
197
198    /// The memory quota account that should be used for this stream's data
199    ///
200    /// Exists to keep the account alive
201    // If we liked, we could make this conditional; see DataReaderInner.memquota
202    _memquota: StreamAccount,
203
204    /// A control object that can be used to monitor and control this stream
205    /// without needing to own it.
206    #[cfg(feature = "stream-ctrl")]
207    ctrl: std::sync::Arc<ClientDataStreamCtrl>,
208}
209
210/// The write half of a [`DataStream`], implementing [`futures::io::AsyncWrite`].
211///
212/// See the [`DataStream`] docs for more information. In particular, note
213/// that this writer requires `poll_flush` to complete in order to guarantee that
214/// all data has been written.
215///
216/// # Usage with Tokio
217///
218/// If the `tokio` crate feature is enabled, this type also implements
219/// [`tokio::io::AsyncWrite`](tokio_crate::io::AsyncWrite) for easier integration
220/// with code that expects that trait.
221///
222/// # Drop and close
223///
224/// Note that dropping a `DataWriter` has no special effect on its own:
225/// if the `DataWriter` is dropped, the underlying stream will still remain open
226/// until the `DataReader` is also dropped.
227///
228/// If you want the stream to close earlier, use [`close`](futures::io::AsyncWriteExt::close)
229/// (or [`shutdown`](tokio_crate::io::AsyncWriteExt::shutdown) with `tokio`).
230///
231/// Remember that Tor does not support half-open streams:
232/// If you `close` or `shutdown` a stream,
233/// the other side will not see the stream as half-open,
234/// and so will (probably) not finish sending you any in-progress data.
235/// Do not use `close`/`shutdown` to communicate anything besides
236/// "I am done using this stream."
237///
238// # Semver note
239//
240// Note that this type is re-exported as a part of the public API of
241// the `arti-client` crate.  Any changes to its API here in
242// `tor-proto` need to be reflected above.
243#[derive(Debug)]
244pub struct DataWriter {
245    /// A wrapper around [`DataWriterInner`] that adds rate limiting.
246    writer: DynamicRateLimitedWriter<DataWriterInner, RateConfigStream, DynTimeProvider>,
247}
248
249impl DataWriter {
250    /// Create a new rate-limited [`DataWriter`] from a [`DataWriterInner`].
251    fn new(
252        inner: DataWriterInner,
253        rate_limit_updates: watch::Receiver<StreamRateLimit>,
254        time_provider: DynTimeProvider,
255    ) -> Self {
256        /// Converts a `rate` into a `RateLimitedWriterConfig`.
257        fn rate_to_config(rate: StreamRateLimit) -> RateLimitedWriterConfig {
258            let rate = rate.bytes_per_sec();
259            RateLimitedWriterConfig {
260                rate,        // bytes per second
261                burst: rate, // bytes
262                // This number is chosen arbitrarily, but the idea is that we want to balance
263                // between throughput and latency. Assume the user tries to write a large buffer
264                // (~600 bytes). If we set this too small (for example 1), we'll be waking up
265                // frequently and writing a small number of bytes each time to the
266                // `DataWriterInner`, even if this isn't enough bytes to send a cell. If we set this
267                // too large (for example 510), we'll be waking up infrequently to write a larger
268                // number of bytes each time. So even if the `DataWriterInner` has almost a full
269                // cell's worth of data queued (for example 490) and only needs 509-490=19 more
270                // bytes before a cell can be sent, it will block until the rate limiter allows 510
271                // more bytes.
272                //
273                // TODO(arti#2028): Is there an optimal value here?
274                wake_when_bytes_available: NonZero::new(200).expect("200 != 0"), // bytes
275            }
276        }
277
278        // get the current rate from the `watch::Receiver`, which we'll use as the initial rate
279        let initial_rate: StreamRateLimit = *rate_limit_updates.borrow();
280
281        // map the rate update stream to the type required by `DynamicRateLimitedWriter`
282        let rate_limit_updates = rate_limit_updates.fuse().map(rate_to_config as fn(_) -> _);
283
284        // build the rate limiter
285        let writer = RateLimitedWriter::new(inner, &rate_to_config(initial_rate), time_provider);
286        let writer = DynamicRateLimitedWriter::new(writer, rate_limit_updates);
287
288        Self { writer }
289    }
290
291    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
292    /// interact with this stream without holding the stream itself.
293    #[cfg(feature = "stream-ctrl")]
294    pub fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
295        Some(self.writer.inner().client_stream_ctrl())
296    }
297}
298
299impl AsyncWrite for DataWriter {
300    fn poll_write(
301        mut self: Pin<&mut Self>,
302        cx: &mut Context<'_>,
303        buf: &[u8],
304    ) -> Poll<IoResult<usize>> {
305        AsyncWrite::poll_write(Pin::new(&mut self.writer), cx, buf)
306    }
307
308    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
309        AsyncWrite::poll_flush(Pin::new(&mut self.writer), cx)
310    }
311
312    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
313        AsyncWrite::poll_close(Pin::new(&mut self.writer), cx)
314    }
315}
316
// Each method wraps `self` in a tokio-compatibility adapter and forwards to
// the adapter's tokio `AsyncWrite` implementation.
#[cfg(feature = "tokio")]
impl TokioAsyncWrite for DataWriter {
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
        let mut adapter = self.compat_write();
        TokioAsyncWrite::poll_write(Pin::new(&mut adapter), cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let mut adapter = self.compat_write();
        TokioAsyncWrite::poll_flush(Pin::new(&mut adapter), cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let mut adapter = self.compat_write();
        TokioAsyncWrite::poll_shutdown(Pin::new(&mut adapter), cx)
    }
}
331
332/// The read half of a [`DataStream`], implementing [`futures::io::AsyncRead`].
333///
334/// See the [`DataStream`] docs for more information.
335///
336/// # Usage with Tokio
337///
338/// If the `tokio` crate feature is enabled, this type also implements
339/// [`tokio::io::AsyncRead`](tokio_crate::io::AsyncRead) for easier integration
340/// with code that expects that trait.
341//
342// # Semver note
343//
344// Note that this type is re-exported as a part of the public API of
345// the `arti-client` crate.  Any changes to its API here in
346// `tor-proto` need to be reflected above.
347#[derive(Debug)]
348pub struct DataReader {
349    /// The [`DataReaderInner`] with a wrapper to support XON/XOFF flow control.
350    reader: XonXoffReader<DataReaderInner>,
351}
352
353impl DataReader {
354    /// Create a new [`DataReader`].
355    fn new(reader: DataReaderInner, xon_xoff_reader_ctrl: XonXoffReaderCtrl) -> Self {
356        Self {
357            reader: XonXoffReader::new(xon_xoff_reader_ctrl, reader),
358        }
359    }
360
361    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
362    /// interact with this stream without holding the stream itself.
363    #[cfg(feature = "stream-ctrl")]
364    pub fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
365        Some(self.reader.inner().client_stream_ctrl())
366    }
367}
368
369impl AsyncRead for DataReader {
370    fn poll_read(
371        mut self: Pin<&mut Self>,
372        cx: &mut Context<'_>,
373        buf: &mut [u8],
374    ) -> Poll<IoResult<usize>> {
375        AsyncRead::poll_read(Pin::new(&mut self.reader), cx, buf)
376    }
377
378    fn poll_read_vectored(
379        mut self: Pin<&mut Self>,
380        cx: &mut Context<'_>,
381        bufs: &mut [std::io::IoSliceMut<'_>],
382    ) -> Poll<IoResult<usize>> {
383        AsyncRead::poll_read_vectored(Pin::new(&mut self.reader), cx, bufs)
384    }
385}
386
// Adapts `self` to tokio's `AsyncRead` via the futures/tokio compatibility layer.
#[cfg(feature = "tokio")]
impl TokioAsyncRead for DataReader {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<IoResult<()>> {
        let mut adapter = self.compat();
        TokioAsyncRead::poll_read(Pin::new(&mut adapter), cx, buf)
    }
}
397
398/// The inner reader for [`DataReader`].
399///
400/// This type is responsible for taking stream messages and extracting the stream data from them.
401/// Flow control logic is implemented in [`DataReader`] to avoid making this type more complex.
402#[derive(Debug)]
403pub(crate) struct DataReaderInner {
404    /// Internal state for this reader.
405    ///
406    /// This is stored in an Option so that we can mutate it in
407    /// poll_read().  It might be possible to do better here, and we
408    /// should refactor if so.
409    state: Option<DataReaderState>,
410
411    /// The memory quota account that should be used for this stream's data
412    ///
413    /// Exists to keep the account alive
414    // If we liked, we could make this conditional on not(cfg(feature = "stream-ctrl"))
415    // since, ClientDataStreamCtrl contains a StreamAccount clone too.  But that seems fragile.
416    _memquota: StreamAccount,
417
418    /// A control object that can be used to monitor and control this stream
419    /// without needing to own it.
420    #[cfg(feature = "stream-ctrl")]
421    ctrl: std::sync::Arc<ClientDataStreamCtrl>,
422}
423
424impl BufferIsEmpty for DataReaderInner {
425    /// The result will become stale,
426    /// so is most accurate immediately after a [`poll_read`](AsyncRead::poll_read).
427    fn is_empty(mut self: Pin<&mut Self>) -> bool {
428        match self
429            .state
430            .as_mut()
431            .expect("forgot to put `DataReaderState` back")
432        {
433            DataReaderState::Open(imp) => {
434                // check if the partial cell in `pending` is empty,
435                // and if the message stream is empty
436                imp.pending[imp.offset..].is_empty() && imp.s.is_empty()
437            }
438            // closed, so any data should have been discarded
439            DataReaderState::Closed => true,
440        }
441    }
442}
443
/// Shared status flags for tracking the status of a `DataStream`.
///
/// We expect to refactor this a bit, so it's not exposed at all.
//
// TODO RPC: Possibly instead of manipulating the fields of DataStreamStatus
// from various points in this module, we should instead construct
// DataStreamStatus as needed from information available elsewhere.  In any
// case, we should really eliminate as much duplicate state here as we can.
// (See discussions at !1198 for some challenges with this.)
#[cfg(feature = "stream-ctrl")]
#[derive(Clone, Debug, Default)]
struct DataStreamStatus {
    /// True if we've received a CONNECTED message.
    //
    // TODO: This is redundant with `connected` in DataReaderImpl and
    // `expecting_connected` in DataCmdChecker.
    received_connected: bool,
    /// True if we have decided to send an END message.
    //
    // TODO RPC: There is not an easy way to set this from this module!  Really,
    // the decision to send an "end" is made when the StreamTarget object is
    // dropped, but we don't currently have any way to see when that happens.
    // Perhaps we need a different shared StreamStatus object that the
    // StreamTarget holds?
    sent_end: bool,
    /// True if we have received an END message telling us to close the stream.
    received_end: bool,
    /// True if we have received an error.
    ///
    /// (This is not a subset or superset of received_end; some errors are END
    /// messages but some aren't; some END messages are errors but some aren't.)
    received_err: bool,
}
477
#[cfg(feature = "stream-ctrl")]
impl DataStreamStatus {
    /// Note that a CONNECTED message has been received.
    fn record_connected(&mut self) {
        self.received_connected = true;
    }

    /// Note that an error of some kind has been received.
    ///
    /// An END with reason `DONE` counts only as an end.  Any other END counts
    /// as both an end and an error.  Every other error counts only as an error.
    /// Flags that are already set are never cleared.
    fn record_error(&mut self, e: &Error) {
        // TODO: Probably we should remember the actual error in a box or
        // something.  But that means making a redundant copy of the error
        // even if nobody will want it.  Do we care?
        let (is_end, is_err) = match e {
            Error::EndReceived(EndReason::DONE) => (true, false),
            Error::EndReceived(_) => (true, true),
            _ => (false, true),
        };
        self.received_end |= is_end;
        self.received_err |= is_err;
    }
}
500
501restricted_msg! {
502    /// An allowable incoming message on a data stream.
503    enum DataStreamMsg:RelayMsg {
504        // SENDME is handled by the reactor.
505        Data, End, Connected,
506    }
507}
508
// TODO RPC: Should we also implement this trait for everything that holds a
// ClientDataStreamCtrl?
#[cfg(feature = "stream-ctrl")]
impl super::ctrl::ClientStreamCtrl for ClientDataStreamCtrl {
    // TODO(conflux): use ClientTunnel
    fn circuit(&self) -> Option<Arc<ClientCirc>> {
        // Returns `None` once no strong references to the circuit remain.
        Weak::upgrade(&self.circuit)
    }
}
518
#[cfg(feature = "stream-ctrl")]
impl ClientDataStreamCtrl {
    /// Return true if the underlying stream is connected.  That is, it has
    /// received a `CONNECTED` message, and no END has been sent or received
    /// and no error has been recorded.
    pub fn is_connected(&self) -> bool {
        let status = self.status.lock().expect("poisoned lock");
        let closed = status.sent_end || status.received_end || status.received_err;
        status.received_connected && !closed
    }

    // TODO RPC: Add more functions once we have the desired API more nailed
    // down.
}
531
532impl DataStream {
533    /// Wrap raw stream receiver and target parts as a DataStream.
534    ///
535    /// For non-optimistic stream, function `wait_for_connection`
536    /// must be called after to make sure CONNECTED is received.
537    pub(crate) fn new<P: SleepProvider + CoarseTimeProvider>(
538        time_provider: P,
539        receiver: StreamReceiver,
540        xon_xoff_reader_ctrl: XonXoffReaderCtrl,
541        target: StreamTarget,
542        memquota: StreamAccount,
543    ) -> Self {
544        Self::new_inner(
545            time_provider,
546            receiver,
547            xon_xoff_reader_ctrl,
548            target,
549            false,
550            memquota,
551        )
552    }
553
554    /// Wrap raw stream receiver and target parts as a connected DataStream.
555    ///
556    /// Unlike [`DataStream::new`], this creates a `DataStream` that does not expect to receive a
557    /// CONNECTED cell.
558    ///
559    /// This is used by hidden services, exit relays, and directory servers to accept streams.
560    #[cfg(feature = "hs-service")]
561    pub(crate) fn new_connected<P: SleepProvider + CoarseTimeProvider>(
562        time_provider: P,
563        receiver: StreamReceiver,
564        xon_xoff_reader_ctrl: XonXoffReaderCtrl,
565        target: StreamTarget,
566        memquota: StreamAccount,
567    ) -> Self {
568        Self::new_inner(
569            time_provider,
570            receiver,
571            xon_xoff_reader_ctrl,
572            target,
573            true,
574            memquota,
575        )
576    }
577
578    /// The shared implementation of the `new*()` functions.
579    fn new_inner<P: SleepProvider + CoarseTimeProvider>(
580        time_provider: P,
581        receiver: StreamReceiver,
582        xon_xoff_reader_ctrl: XonXoffReaderCtrl,
583        target: StreamTarget,
584        connected: bool,
585        memquota: StreamAccount,
586    ) -> Self {
587        let relay_cell_format = target.relay_cell_format();
588        let out_buf_len = Data::max_body_len(relay_cell_format);
589        let rate_limit_stream = target.rate_limit_stream().clone();
590
591        #[cfg(feature = "stream-ctrl")]
592        let status = {
593            let mut data_stream_status = DataStreamStatus::default();
594            if connected {
595                data_stream_status.record_connected();
596            }
597            Arc::new(Mutex::new(data_stream_status))
598        };
599
600        #[cfg(feature = "stream-ctrl")]
601        let ctrl = Arc::new(ClientDataStreamCtrl {
602            circuit: Arc::downgrade(target.circuit()),
603            status: status.clone(),
604            _memquota: memquota.clone(),
605        });
606        let r = DataReaderInner {
607            state: Some(DataReaderState::Open(DataReaderImpl {
608                s: receiver,
609                pending: Vec::new(),
610                offset: 0,
611                connected,
612                #[cfg(feature = "stream-ctrl")]
613                status: status.clone(),
614            })),
615            _memquota: memquota.clone(),
616            #[cfg(feature = "stream-ctrl")]
617            ctrl: ctrl.clone(),
618        };
619        let w = DataWriterInner {
620            state: Some(DataWriterState::Ready(DataWriterImpl {
621                s: target,
622                buf: vec![0; out_buf_len].into_boxed_slice(),
623                n_pending: 0,
624                #[cfg(feature = "stream-ctrl")]
625                status,
626                relay_cell_format,
627            })),
628            _memquota: memquota,
629            #[cfg(feature = "stream-ctrl")]
630            ctrl: ctrl.clone(),
631        };
632
633        let time_provider = DynTimeProvider::new(time_provider);
634
635        DataStream {
636            w: DataWriter::new(w, rate_limit_stream, time_provider),
637            r: DataReader::new(r, xon_xoff_reader_ctrl),
638            #[cfg(feature = "stream-ctrl")]
639            ctrl,
640        }
641    }
642
643    /// Divide this DataStream into its constituent parts.
644    pub fn split(self) -> (DataReader, DataWriter) {
645        (self.r, self.w)
646    }
647
648    /// Wait until a CONNECTED cell is received, or some other cell
649    /// is received to indicate an error.
650    ///
651    /// Does nothing if this stream is already connected.
652    pub async fn wait_for_connection(&mut self) -> Result<()> {
653        // We must put state back before returning
654        let state = self
655            .r
656            .reader
657            .inner_mut()
658            .state
659            .take()
660            .expect("Missing state in DataReaderInner");
661
662        if let DataReaderState::Open(mut imp) = state {
663            let result = if imp.connected {
664                Ok(())
665            } else {
666                // This succeeds if the cell is CONNECTED, and fails otherwise.
667                std::future::poll_fn(|cx| Pin::new(&mut imp).read_cell(cx)).await
668            };
669            self.r.reader.inner_mut().state = Some(match result {
670                Err(_) => DataReaderState::Closed,
671                Ok(_) => DataReaderState::Open(imp),
672            });
673            result
674        } else {
675            Err(Error::from(internal!(
676                "Expected ready state, got {:?}",
677                state
678            )))
679        }
680    }
681
682    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
683    /// interact with this stream without holding the stream itself.
684    #[cfg(feature = "stream-ctrl")]
685    pub fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
686        Some(&self.ctrl)
687    }
688}
689
690impl AsyncRead for DataStream {
691    fn poll_read(
692        mut self: Pin<&mut Self>,
693        cx: &mut Context<'_>,
694        buf: &mut [u8],
695    ) -> Poll<IoResult<usize>> {
696        AsyncRead::poll_read(Pin::new(&mut self.r), cx, buf)
697    }
698}
699
// Adapts `self` to tokio's `AsyncRead` via the futures/tokio compatibility layer.
#[cfg(feature = "tokio")]
impl TokioAsyncRead for DataStream {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<IoResult<()>> {
        let mut adapter = self.compat();
        TokioAsyncRead::poll_read(Pin::new(&mut adapter), cx, buf)
    }
}
710
711impl AsyncWrite for DataStream {
712    fn poll_write(
713        mut self: Pin<&mut Self>,
714        cx: &mut Context<'_>,
715        buf: &[u8],
716    ) -> Poll<IoResult<usize>> {
717        AsyncWrite::poll_write(Pin::new(&mut self.w), cx, buf)
718    }
719    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
720        AsyncWrite::poll_flush(Pin::new(&mut self.w), cx)
721    }
722    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
723        AsyncWrite::poll_close(Pin::new(&mut self.w), cx)
724    }
725}
726
// Each method wraps `self` in a tokio-compatibility adapter and forwards to
// the adapter's tokio `AsyncWrite` implementation.
#[cfg(feature = "tokio")]
impl TokioAsyncWrite for DataStream {
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
        let mut adapter = self.compat();
        TokioAsyncWrite::poll_write(Pin::new(&mut adapter), cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let mut adapter = self.compat();
        TokioAsyncWrite::poll_flush(Pin::new(&mut adapter), cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let mut adapter = self.compat();
        TokioAsyncWrite::poll_shutdown(Pin::new(&mut adapter), cx)
    }
}
741
/// A pinned, boxed, type-erased future that is both `Send` and `Sync`.
///
/// This is like `BoxFuture`, with an additional `Sync` bound.
type BoxSyncFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Sync + Send + 'a>>;
744
745/// An enumeration for the state of a DataWriter.
746///
747/// We have to use an enum here because, for as long as we're waiting
748/// for a flush operation to complete, the future returned by
749/// `flush_cell()` owns the DataWriterImpl.
750#[derive(Educe)]
751#[educe(Debug)]
752enum DataWriterState {
753    /// The writer has closed or gotten an error: nothing more to do.
754    Closed,
755    /// The writer is not currently flushing; more data can get queued
756    /// immediately.
757    Ready(DataWriterImpl),
758    /// The writer is flushing a cell.
759    Flushing(
760        #[educe(Debug(method = "skip_fmt"))] //
761        BoxSyncFuture<'static, (DataWriterImpl, Result<()>)>,
762    ),
763}
764
765/// Internal: the write part of a DataStream
766#[derive(Educe)]
767#[educe(Debug)]
768struct DataWriterImpl {
769    /// The underlying StreamTarget object.
770    s: StreamTarget,
771
772    /// Buffered data to send over the connection.
773    ///
774    /// This buffer is currently allocated using a number of bytes
775    /// equal to the maximum that we can package at a time.
776    //
777    // TODO: this buffer is probably smaller than we want, but it's good
778    // enough for now.  If we _do_ make it bigger, we'll have to change
779    // our use of Data::split_from to handle the case where we can't fit
780    // all the data.
781    #[educe(Debug(method = "skip_fmt"))]
782    buf: Box<[u8]>,
783
784    /// Number of unflushed bytes in buf.
785    n_pending: usize,
786
787    /// Relay cell format in use
788    relay_cell_format: RelayCellFormat,
789
790    /// Shared user-visible information about the state of this stream.
791    #[cfg(feature = "stream-ctrl")]
792    status: Arc<Mutex<DataStreamStatus>>,
793}
794
795impl DataWriterInner {
796    /// See [`DataWriter::client_stream_ctrl`].
797    #[cfg(feature = "stream-ctrl")]
798    fn client_stream_ctrl(&self) -> &Arc<ClientDataStreamCtrl> {
799        &self.ctrl
800    }
801
802    /// Helper for poll_flush() and poll_close(): Performs a flush, then
803    /// closes the stream if should_close is true.
804    fn poll_flush_impl(
805        mut self: Pin<&mut Self>,
806        cx: &mut Context<'_>,
807        should_close: bool,
808    ) -> Poll<IoResult<()>> {
809        let state = self.state.take().expect("Missing state in DataWriter");
810
811        // TODO: this whole function is a bit copy-pasted.
812        let mut future: BoxSyncFuture<_> = match state {
813            DataWriterState::Ready(imp) => {
814                if imp.n_pending == 0 {
815                    // Nothing to flush!
816                    if should_close {
817                        // We need to actually continue with this function to do the closing.
818                        // Thus, make a future that does nothing and is ready immediately.
819                        Box::pin(futures::future::ready((imp, Ok(()))))
820                    } else {
821                        // There's nothing more to do; we can return.
822                        self.state = Some(DataWriterState::Ready(imp));
823                        return Poll::Ready(Ok(()));
824                    }
825                } else {
826                    // We need to flush the buffer's contents; Make a future for that.
827                    Box::pin(imp.flush_buf())
828                }
829            }
830            DataWriterState::Flushing(fut) => fut,
831            DataWriterState::Closed => {
832                self.state = Some(DataWriterState::Closed);
833                return Poll::Ready(Err(Error::NotConnected.into()));
834            }
835        };
836
837        match future.as_mut().poll(cx) {
838            Poll::Ready((_imp, Err(e))) => {
839                self.state = Some(DataWriterState::Closed);
840                Poll::Ready(Err(e.into()))
841            }
842            Poll::Ready((mut imp, Ok(()))) => {
843                if should_close {
844                    // Tell the StreamTarget to close, so that the reactor
845                    // realizes that we are done sending. (Dropping `imp.s` does not
846                    // suffice, since there may be other clones of it.  In particular,
847                    // the StreamReceiver has one, which it uses to keep the stream
848                    // open, among other things.)
849                    imp.s.close();
850
851                    #[cfg(feature = "stream-ctrl")]
852                    {
853                        // TODO RPC:  This is not sufficient to track every case
854                        // where we might have sent an End.  See note on the
855                        // `sent_end` field.
856                        imp.status.lock().expect("lock poisoned").sent_end = true;
857                    }
858                    self.state = Some(DataWriterState::Closed);
859                } else {
860                    self.state = Some(DataWriterState::Ready(imp));
861                }
862                Poll::Ready(Ok(()))
863            }
864            Poll::Pending => {
865                self.state = Some(DataWriterState::Flushing(future));
866                Poll::Pending
867            }
868        }
869    }
870}
871
impl AsyncWrite for DataWriterInner {
    /// Queue as many bytes of `buf` as fit into the current cell buffer.
    ///
    /// If nothing can be queued because the buffer is full, this first
    /// flushes the buffer as a DATA message and then queues what it can. If a
    /// flush is already in progress, it resumes that flush instead.
    ///
    /// Returns the number of bytes accepted, which may be fewer than
    /// `buf.len()`. An empty `buf` returns `Ok(0)` immediately, without
    /// examining the writer's state.
    ///
    /// Once the writer is closed, whether by `poll_close` or by an earlier
    /// send error, this returns `Error::NotConnected`.
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<IoResult<usize>> {
        if buf.is_empty() {
            return Poll::Ready(Ok(0));
        }

        // We take the state out of `self`; every return path below must put
        // a state back.
        let state = self.state.take().expect("Missing state in DataWriter");

        let mut future = match state {
            DataWriterState::Ready(mut imp) => {
                let n_queued = imp.queue_bytes(buf);
                if n_queued != 0 {
                    self.state = Some(DataWriterState::Ready(imp));
                    return Poll::Ready(Ok(n_queued));
                }
                // we couldn't queue anything, so the current cell must be full.
                Box::pin(imp.flush_buf())
            }
            DataWriterState::Flushing(fut) => fut,
            DataWriterState::Closed => {
                self.state = Some(DataWriterState::Closed);
                return Poll::Ready(Err(Error::NotConnected.into()));
            }
        };

        match future.as_mut().poll(cx) {
            Poll::Ready((_imp, Err(e))) => {
                // Sending failed: record the error (when stream-ctrl is
                // enabled) and treat the writer as closed from now on.
                #[cfg(feature = "stream-ctrl")]
                {
                    _imp.status.lock().expect("lock poisoned").record_error(&e);
                }
                self.state = Some(DataWriterState::Closed);
                Poll::Ready(Err(e.into()))
            }
            Poll::Ready((mut imp, Ok(()))) => {
                // Great!  We're done flushing.  Queue as much as we can of this
                // cell.
                let n_queued = imp.queue_bytes(buf);
                self.state = Some(DataWriterState::Ready(imp));
                Poll::Ready(Ok(n_queued))
            }
            Poll::Pending => {
                // Keep the in-progress flush so the next poll resumes it.
                self.state = Some(DataWriterState::Flushing(future));
                Poll::Pending
            }
        }
    }

    /// Flush any queued bytes without closing the stream.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        self.poll_flush_impl(cx, false)
    }

    /// Flush any queued bytes, then close the stream for writing.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        self.poll_flush_impl(cx, true)
    }
}
932
#[cfg(feature = "tokio")]
impl TokioAsyncWrite for DataWriterInner {
    // Each method wraps the pinned writer in a futures-to-tokio compat
    // adapter and forwards the call to it.

    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
        let mut adapter = self.compat_write();
        TokioAsyncWrite::poll_write(Pin::new(&mut adapter), cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let mut adapter = self.compat_write();
        TokioAsyncWrite::poll_flush(Pin::new(&mut adapter), cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let mut adapter = self.compat_write();
        TokioAsyncWrite::poll_shutdown(Pin::new(&mut adapter), cx)
    }
}
947
948impl DataWriterImpl {
949    /// Try to flush the current buffer contents as a data cell.
950    async fn flush_buf(mut self) -> (Self, Result<()>) {
951        let result = if let Some((cell, remainder)) =
952            Data::try_split_from(self.relay_cell_format, &self.buf[..self.n_pending])
953        {
954            // TODO: Eventually we may want a larger buffer; if we do,
955            // this invariant will become false.
956            assert!(remainder.is_empty());
957            self.n_pending = 0;
958            self.s.send(cell.into()).await
959        } else {
960            Ok(())
961        };
962
963        (self, result)
964    }
965
966    /// Add as many bytes as possible from `b` to our internal buffer;
967    /// return the number we were able to add.
968    fn queue_bytes(&mut self, b: &[u8]) -> usize {
969        let empty_space = &mut self.buf[self.n_pending..];
970        if empty_space.is_empty() {
971            // that is, len == 0
972            return 0;
973        }
974
975        let n_to_copy = std::cmp::min(b.len(), empty_space.len());
976        empty_space[..n_to_copy].copy_from_slice(&b[..n_to_copy]);
977        self.n_pending += n_to_copy;
978        n_to_copy
979    }
980}
981
impl DataReaderInner {
    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
    /// interact with this stream without holding the stream itself.
    ///
    /// This borrows the `ctrl` handle stored on the reader; callers that need
    /// an owned handle can clone the returned `Arc`.
    #[cfg(feature = "stream-ctrl")]
    pub(crate) fn client_stream_ctrl(&self) -> &Arc<ClientDataStreamCtrl> {
        &self.ctrl
    }
}
990
/// An enumeration for the state of a [`DataReaderInner`].
// TODO: We don't need to implement the state in this way anymore now that we've removed the saved
// future. There are a few ways we could simplify this. See:
// https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3076#note_3218210
#[derive(Educe)]
#[educe(Debug)]
// We allow this since it's expected that streams will spend most of their time in the `Open` state,
// and will be cleaned up shortly after closing.
#[allow(clippy::large_enum_variant)]
enum DataReaderState {
    /// In this state we have received an end cell or an error.
    ///
    /// Any later read attempt returns an error rather than data.
    Closed,
    /// In this state the reader is open.
    ///
    /// It holds the receiver and any buffered bytes that have not yet been
    /// returned to the caller.
    Open(DataReaderImpl),
}
1006
/// Wrapper for the read part of a [`DataStream`].
///
/// Incoming messages are pulled from `s` one at a time. Their payload bytes
/// are buffered in `pending` until the caller reads them.
#[derive(Educe)]
#[educe(Debug)]
#[pin_project]
struct DataReaderImpl {
    /// The underlying StreamReceiver object.
    #[educe(Debug(method = "skip_fmt"))]
    #[pin]
    s: StreamReceiver,

    /// If present, data that we received on this stream but have not
    /// been able to send to the caller yet.
    ///
    /// Only `pending[offset..]` is still unread.
    // TODO: This data structure is probably not what we want, but
    // it's good enough for now.
    #[educe(Debug(method = "skip_fmt"))]
    pending: Vec<u8>,

    /// Index into pending to show what we've already read.
    offset: usize,

    /// If true, we have received a CONNECTED cell on this stream.
    ///
    /// DATA messages received before this is set are rejected as a protocol
    /// violation.
    connected: bool,

    /// Shared user-visible information about the state of this stream.
    #[cfg(feature = "stream-ctrl")]
    status: Arc<Mutex<DataStreamStatus>>,
}
1034
impl AsyncRead for DataReaderInner {
    /// Read buffered stream data into `buf`, pulling new messages as needed.
    ///
    /// If bytes are already buffered, they are returned immediately.
    /// Otherwise this keeps reading messages from the stream until some data
    /// is buffered or no message is ready, in which case it returns `Pending`.
    /// A zero-length `buf` returns `Ok(0)` while the reader is open.
    ///
    /// When an END with reason `DONE` arrives, this reports EOF (`Ok(0)`);
    /// any other error is returned as an error. In both cases the reader
    /// becomes closed, and later calls return `Error::NotConnected`.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<IoResult<usize>> {
        // We're pulling the state object out of the reader.  We MUST
        // put it back before this function returns.
        let mut state = self.state.take().expect("Missing state in DataReaderInner");

        loop {
            let mut imp = match state {
                DataReaderState::Open(mut imp) => {
                    // There may be data to read already.
                    let n_copied = imp.extract_bytes(buf);
                    if n_copied != 0 || buf.is_empty() {
                        // We read data into the buffer, or the buffer was 0-len to begin with.
                        // Tell the caller.
                        self.state = Some(DataReaderState::Open(imp));
                        return Poll::Ready(Ok(n_copied));
                    }

                    // No data available!  We have to try reading.
                    imp
                }
                DataReaderState::Closed => {
                    // TODO: Why are we returning an error rather than continuing to return EOF?
                    self.state = Some(DataReaderState::Closed);
                    return Poll::Ready(Err(Error::NotConnected.into()));
                }
            };

            // See if a cell is ready.
            match Pin::new(&mut imp).read_cell(cx) {
                Poll::Ready(Err(e)) => {
                    // There aren't any survivable errors in the current
                    // design.
                    self.state = Some(DataReaderState::Closed);
                    #[cfg(feature = "stream-ctrl")]
                    {
                        imp.status.lock().expect("lock poisoned").record_error(&e);
                    }
                    // A clean END (reason DONE) is reported as EOF rather than
                    // as an error.
                    let result = if matches!(e, Error::EndReceived(EndReason::DONE)) {
                        Ok(0)
                    } else {
                        Err(e.into())
                    };
                    return Poll::Ready(result);
                }
                Poll::Ready(Ok(())) => {
                    // It read a cell!  Continue the loop.
                    // The next iteration copies any newly buffered bytes out.
                    state = DataReaderState::Open(imp);
                }
                Poll::Pending => {
                    // No cells ready, so tell the
                    // caller to get back to us later.
                    self.state = Some(DataReaderState::Open(imp));
                    return Poll::Pending;
                }
            }
        }
    }
}
1098
#[cfg(feature = "tokio")]
impl TokioAsyncRead for DataReaderInner {
    /// Forward to the futures-io `AsyncRead` implementation through a
    /// futures-to-tokio compat adapter wrapped around the pinned reader.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<IoResult<()>> {
        let mut adapter = self.compat();
        TokioAsyncRead::poll_read(Pin::new(&mut adapter), cx, buf)
    }
}
1109
1110impl DataReaderImpl {
1111    /// Pull as many bytes as we can off of self.pending, and return that
1112    /// number of bytes.
1113    fn extract_bytes(&mut self, buf: &mut [u8]) -> usize {
1114        let remainder = &self.pending[self.offset..];
1115        let n_to_copy = std::cmp::min(buf.len(), remainder.len());
1116        buf[..n_to_copy].copy_from_slice(&remainder[..n_to_copy]);
1117        self.offset += n_to_copy;
1118
1119        n_to_copy
1120    }
1121
1122    /// Return true iff there are no buffered bytes here to yield
1123    fn buf_is_empty(&self) -> bool {
1124        self.pending.len() == self.offset
1125    }
1126
1127    /// Load self.pending with the contents of a new data cell.
1128    fn read_cell(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1129        use DataStreamMsg::*;
1130        let msg = match self.as_mut().project().s.poll_next(cx) {
1131            Poll::Pending => return Poll::Pending,
1132            Poll::Ready(Some(Ok(unparsed))) => match unparsed.decode::<DataStreamMsg>() {
1133                Ok(cell) => cell.into_msg(),
1134                Err(e) => {
1135                    self.s.protocol_error();
1136                    return Poll::Ready(Err(Error::from_bytes_err(e, "message on a data stream")));
1137                }
1138            },
1139            Poll::Ready(Some(Err(e))) => return Poll::Ready(Err(e)),
1140            // TODO: This doesn't seem right to me, but seems to be the behaviour of the code before
1141            // the refactoring, so I've kept the same behaviour. I think if the cell stream is
1142            // terminated, we should be returning `None` here and not considering it as an error.
1143            // The `StreamReceiver` will have already returned an error if the cell stream was
1144            // terminated without an END message.
1145            Poll::Ready(None) => return Poll::Ready(Err(Error::NotConnected)),
1146        };
1147
1148        let result = match msg {
1149            Connected(_) if !self.connected => {
1150                self.connected = true;
1151                #[cfg(feature = "stream-ctrl")]
1152                {
1153                    self.status
1154                        .lock()
1155                        .expect("poisoned lock")
1156                        .record_connected();
1157                }
1158                Ok(())
1159            }
1160            Connected(_) => {
1161                self.s.protocol_error();
1162                Err(Error::StreamProto(
1163                    "Received a second connect cell on a data stream".to_string(),
1164                ))
1165            }
1166            Data(d) if self.connected => {
1167                self.add_data(d.into());
1168                Ok(())
1169            }
1170            Data(_) => {
1171                self.s.protocol_error();
1172                Err(Error::StreamProto(
1173                    "Received a data cell an unconnected stream".to_string(),
1174                ))
1175            }
1176            End(e) => Err(Error::EndReceived(e.reason())),
1177        };
1178
1179        Poll::Ready(result)
1180    }
1181
1182    /// Add the data from `d` to the end of our pending bytes.
1183    fn add_data(&mut self, mut d: Vec<u8>) {
1184        if self.buf_is_empty() {
1185            // No data pending?  Just take d as the new pending.
1186            self.pending = d;
1187            self.offset = 0;
1188        } else {
1189            // TODO(nickm) This has potential to grow `pending` without bound.
1190            // Fortunately, we don't currently read cells or call this
1191            // `add_data` method when pending is nonempty—but if we do in the
1192            // future, we'll have to be careful here.
1193            self.pending.append(&mut d);
1194        }
1195    }
1196}
1197
/// A `CmdChecker` that enforces invariants for outbound data streams.
#[derive(Debug)]
pub(crate) struct DataCmdChecker {
    /// True if we are expecting to receive a CONNECTED message on this stream.
    expecting_connected: bool,
}

impl Default for DataCmdChecker {
    /// A fresh checker waits for the stream's initial CONNECTED message.
    fn default() -> Self {
        DataCmdChecker {
            expecting_connected: true,
        }
    }
}
1212
1213impl super::CmdChecker for DataCmdChecker {
1214    fn check_msg(
1215        &mut self,
1216        msg: &tor_cell::relaycell::UnparsedRelayMsg,
1217    ) -> Result<super::StreamStatus> {
1218        use super::StreamStatus::*;
1219        match msg.cmd() {
1220            RelayCmd::CONNECTED => {
1221                if !self.expecting_connected {
1222                    Err(Error::StreamProto(
1223                        "Received CONNECTED twice on a stream.".into(),
1224                    ))
1225                } else {
1226                    self.expecting_connected = false;
1227                    Ok(Open)
1228                }
1229            }
1230            RelayCmd::DATA => {
1231                if !self.expecting_connected {
1232                    Ok(Open)
1233                } else {
1234                    Err(Error::StreamProto(
1235                        "Received DATA before CONNECTED on a stream".into(),
1236                    ))
1237                }
1238            }
1239            RelayCmd::END => Ok(Closed),
1240            _ => Err(Error::StreamProto(format!(
1241                "Unexpected {} on a data stream!",
1242                msg.cmd()
1243            ))),
1244        }
1245    }
1246
1247    fn consume_checked_msg(&mut self, msg: tor_cell::relaycell::UnparsedRelayMsg) -> Result<()> {
1248        let _ = msg
1249            .decode::<DataStreamMsg>()
1250            .map_err(|err| Error::from_bytes_err(err, "cell on half-closed stream"))?;
1251        Ok(())
1252    }
1253}
1254
1255impl DataCmdChecker {
1256    /// Return a new boxed `DataCmdChecker` in a state suitable for a newly
1257    /// constructed connection.
1258    pub(crate) fn new_any() -> AnyCmdChecker {
1259        Box::<Self>::default()
1260    }
1261
1262    /// Return a new boxed `DataCmdChecker` in a state suitable for a
1263    /// connection where an initial CONNECTED cell is not expected.
1264    ///
1265    /// This is used by hidden services, exit relays, and directory servers
1266    /// to accept streams.
1267    #[cfg(feature = "hs-service")]
1268    pub(crate) fn new_connected() -> AnyCmdChecker {
1269        Box::new(Self {
1270            expecting_connected: false,
1271        })
1272    }
1273}