tor_proto/stream/
data.rs

1//! Declare DataStream, a type that wraps RawCellStream so as to be useful
2//! for byte-oriented communication.
3
4use crate::{Error, Result};
5use static_assertions::assert_impl_all;
6use tor_cell::relaycell::msg::EndReason;
7use tor_cell::relaycell::{RelayCellFormat, RelayCmd};
8
9use futures::io::{AsyncRead, AsyncWrite};
10use futures::task::{Context, Poll};
11use futures::Future;
12
13#[cfg(feature = "tokio")]
14use tokio_crate::io::ReadBuf;
15#[cfg(feature = "tokio")]
16use tokio_crate::io::{AsyncRead as TokioAsyncRead, AsyncWrite as TokioAsyncWrite};
17#[cfg(feature = "tokio")]
18use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
19use tor_cell::restricted_msg;
20
21use std::fmt::Debug;
22use std::io::Result as IoResult;
23use std::num::NonZero;
24use std::pin::Pin;
25#[cfg(any(feature = "stream-ctrl", feature = "experimental-api"))]
26use std::sync::Arc;
27#[cfg(feature = "stream-ctrl")]
28use std::sync::{Mutex, Weak};
29
30use educe::Educe;
31
32#[cfg(any(feature = "experimental-api", feature = "stream-ctrl"))]
33use crate::tunnel::circuit::ClientCirc;
34
35use crate::memquota::StreamAccount;
36use crate::stream::StreamReader;
37use crate::tunnel::StreamTarget;
38use crate::util::token_bucket::dynamic_writer::DynamicRateLimitedWriter;
39use crate::util::token_bucket::writer::{RateLimitedWriter, RateLimitedWriterConfig};
40use tor_basic_utils::skip_fmt;
41use tor_cell::relaycell::msg::Data;
42use tor_error::internal;
43use tor_rtcompat::{CoarseTimeProvider, DynTimeProvider, SleepProvider};
44
45use super::AnyCmdChecker;
46
/// A stream of [`RateLimitedWriterConfig`] used to update a [`DynamicRateLimitedWriter`].
///
/// Dynamic updates are not implemented yet, so this is just an `Empty` stream:
/// it never yields a new configuration.
/// We use a type alias so that the field type in `DataWriter` is a little nicer to read.
// TODO(arti#534): use a proper stream
type RateConfigStream = futures::stream::Empty<RateLimitedWriterConfig>;
53
54/// An anonymized stream over the Tor network.
55///
56/// For most purposes, you can think of this type as an anonymized
57/// TCP stream: it can read and write data, and get closed when it's done.
58///
59/// [`DataStream`] implements [`futures::io::AsyncRead`] and
60/// [`futures::io::AsyncWrite`], so you can use it anywhere that those
61/// traits are expected.
62///
63/// # Examples
64///
65/// Connecting to an HTTP server and sending a request, using
66/// [`AsyncWriteExt::write_all`](futures::io::AsyncWriteExt::write_all):
67///
68/// ```ignore
69/// let mut stream = tor_client.connect(("icanhazip.com", 80), None).await?;
70///
71/// use futures::io::AsyncWriteExt;
72///
73/// stream
74///     .write_all(b"GET / HTTP/1.1\r\nHost: icanhazip.com\r\nConnection: close\r\n\r\n")
75///     .await?;
76///
77/// // Flushing the stream is important; see below!
78/// stream.flush().await?;
79/// ```
80///
81/// Reading the result, using [`AsyncReadExt::read_to_end`](futures::io::AsyncReadExt::read_to_end):
82///
83/// ```ignore
84/// use futures::io::AsyncReadExt;
85///
86/// let mut buf = Vec::new();
87/// stream.read_to_end(&mut buf).await?;
88///
89/// println!("{}", String::from_utf8_lossy(&buf));
90/// ```
91///
92/// # Usage with Tokio
93///
94/// If the `tokio` crate feature is enabled, this type also implements
95/// [`tokio::io::AsyncRead`](tokio_crate::io::AsyncRead) and
96/// [`tokio::io::AsyncWrite`](tokio_crate::io::AsyncWrite) for easier integration
97/// with code that expects those traits.
98///
99/// # Remember to call `flush`!
100///
101/// DataStream buffers data internally, in order to write as few cells
102/// as possible onto the network.  In order to make sure that your
103/// data has actually been sent, you need to make sure that
104/// [`AsyncWrite::poll_flush`] runs to completion: probably via
105/// [`AsyncWriteExt::flush`](futures::io::AsyncWriteExt::flush).
106///
107/// # Splitting the type
108///
109/// This type is internally composed of a [`DataReader`] and a [`DataWriter`]; the
110/// `DataStream::split` method can be used to split it into those two parts, for more
111/// convenient usage with e.g. stream combinators.
112///
113/// # How long does a stream live?
114///
115/// A `DataStream` will live until all references to it are dropped,
116/// or until it is closed explicitly.
117///
118/// If you split the stream into a `DataReader` and a `DataWriter`, it
119/// will survive until _both_ are dropped, or until it is closed
120/// explicitly.
121///
122/// A stream can also close because of a network error,
123/// or because the other side of the stream decided to close it.
124///
125// # Semver note
126//
127// Note that this type is re-exported as a part of the public API of
128// the `arti-client` crate.  Any changes to its API here in
129// `tor-proto` need to be reflected above.
#[derive(Debug)]
pub struct DataStream {
    /// Underlying writer for this stream.
    ///
    /// The `AsyncWrite` implementation of `DataStream` forwards to this half.
    w: DataWriter,
    /// Underlying reader for this stream.
    ///
    /// The `AsyncRead` implementation of `DataStream` forwards to this half.
    r: DataReader,
    /// A control object that can be used to monitor and control this stream
    /// without needing to own it.
    ///
    /// The reader and the inner writer each hold a clone of the same `Arc`.
    #[cfg(feature = "stream-ctrl")]
    ctrl: std::sync::Arc<ClientDataStreamCtrl>,
}
// Compile-time check: `DataStream` must remain `Send` and `Sync`.
assert_impl_all! { DataStream: Send, Sync }
142
/// An object used to control and monitor a data stream.
///
/// # Notes
///
/// This is a separate type from [`DataStream`] because it's useful to have
/// multiple references to this object, whereas a [`DataReader`] and [`DataWriter`]
/// need to have a single owner for the `AsyncRead` and `AsyncWrite` APIs to
/// work correctly.
#[cfg(feature = "stream-ctrl")]
#[derive(Debug)]
pub struct ClientDataStreamCtrl {
    /// The circuit to which this stream is attached.
    ///
    /// Note that the stream's reader and writer halves each contain a `StreamTarget`,
    /// which in turn has a strong reference to the `ClientCirc`.  So as long as any
    /// one of those is alive, this reference will be present.
    ///
    /// We make this a Weak reference so that once the stream itself is closed,
    /// we can't leak circuits.
    // TODO(conflux): use ClientTunnel
    circuit: Weak<ClientCirc>,

    /// Shared user-visible information about the state of this stream.
    ///
    /// The same `Arc` is also handed to the reader and writer implementations,
    /// which update it as the stream changes state.
    ///
    /// TODO RPC: This will probably want to be a `postage::Watch` or something
    /// similar, if and when it stops moving around.
    // NOTE(review): this `cfg` repeats the one already on the whole struct.
    #[cfg(feature = "stream-ctrl")]
    status: Arc<Mutex<DataStreamStatus>>,

    /// The memory quota account that should be used for this stream's data.
    ///
    /// Never read; it exists only to keep the account alive.
    _memquota: StreamAccount,
}
177
/// The inner writer for [`DataWriter`].
///
/// This type is responsible for taking bytes and packaging them into cells.
/// Rate limiting is implemented in [`DataWriter`] to avoid making this type more complex.
#[derive(Debug)]
struct DataWriterInner {
    /// Internal state for this writer.
    ///
    /// This is stored in an Option so that we can take it by value and mutate
    /// it in the AsyncWrite functions; every such function must store a state
    /// back before it returns.  It might be possible to do better here,
    /// and we should refactor if so.
    state: Option<DataWriterState>,

    /// The memory quota account that should be used for this stream's data.
    ///
    /// Never read; it exists only to keep the account alive.
    // If we liked, we could make this conditional; see DataReader.memquota
    _memquota: StreamAccount,

    /// A control object that can be used to monitor and control this stream
    /// without needing to own it.
    #[cfg(feature = "stream-ctrl")]
    ctrl: std::sync::Arc<ClientDataStreamCtrl>,
}
202
203/// The write half of a [`DataStream`], implementing [`futures::io::AsyncWrite`].
204///
205/// See the [`DataStream`] docs for more information. In particular, note
206/// that this writer requires `poll_flush` to complete in order to guarantee that
207/// all data has been written.
208///
209/// # Usage with Tokio
210///
211/// If the `tokio` crate feature is enabled, this type also implements
212/// [`tokio::io::AsyncWrite`](tokio_crate::io::AsyncWrite) for easier integration
213/// with code that expects that trait.
214///
215/// # Drop and close
216///
217/// Note that dropping a `DataWriter` has no special effect on its own:
218/// if the `DataWriter` is dropped, the underlying stream will still remain open
219/// until the `DataReader` is also dropped.
220///
221/// If you want the stream to close earlier, use [`close`](futures::io::AsyncWriteExt::close)
222/// (or [`shutdown`](tokio_crate::io::AsyncWriteExt::shutdown) with `tokio`).
223///
224/// Remember that Tor does not support half-open streams:
225/// If you `close` or `shutdown` a stream,
226/// the other side will not see the stream as half-open,
227/// and so will (probably) not finish sending you any in-progress data.
228/// Do not use `close`/`shutdown` to communicate anything besides
229/// "I am done using this stream."
230///
231// # Semver note
232//
233// Note that this type is re-exported as a part of the public API of
234// the `arti-client` crate.  Any changes to its API here in
235// `tor-proto` need to be reflected above.
#[derive(Debug)]
pub struct DataWriter {
    /// A wrapper around [`DataWriterInner`] that adds rate limiting.
    ///
    /// All `AsyncWrite` calls on `DataWriter` are forwarded to this wrapper.
    /// Its configuration updates come from a [`RateConfigStream`], which is
    /// currently an empty stream.
    writer: DynamicRateLimitedWriter<DataWriterInner, RateConfigStream, DynTimeProvider>,
}
241
242impl AsyncWrite for DataWriter {
243    fn poll_write(
244        mut self: Pin<&mut Self>,
245        cx: &mut Context<'_>,
246        buf: &[u8],
247    ) -> Poll<IoResult<usize>> {
248        AsyncWrite::poll_write(Pin::new(&mut self.writer), cx, buf)
249    }
250
251    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
252        AsyncWrite::poll_flush(Pin::new(&mut self.writer), cx)
253    }
254
255    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
256        AsyncWrite::poll_close(Pin::new(&mut self.writer), cx)
257    }
258}
259
/// `tokio` write support, implemented by wrapping the pinned `futures`
/// writer in a `tokio_util` compatibility adapter for each call.
#[cfg(feature = "tokio")]
impl TokioAsyncWrite for DataWriter {
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
        let mut adapter = self.compat_write();
        TokioAsyncWrite::poll_write(Pin::new(&mut adapter), cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let mut adapter = self.compat_write();
        TokioAsyncWrite::poll_flush(Pin::new(&mut adapter), cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let mut adapter = self.compat_write();
        TokioAsyncWrite::poll_shutdown(Pin::new(&mut adapter), cx)
    }
}
274
275impl DataWriter {
276    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
277    /// interact with this stream without holding the stream itself.
278    #[cfg(feature = "stream-ctrl")]
279    pub fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
280        Some(self.writer.inner().client_stream_ctrl())
281    }
282}
283
284/// The read half of a [`DataStream`], implementing [`futures::io::AsyncRead`].
285///
286/// See the [`DataStream`] docs for more information.
287///
288/// # Usage with Tokio
289///
290/// If the `tokio` crate feature is enabled, this type also implements
291/// [`tokio::io::AsyncRead`](tokio_crate::io::AsyncRead) for easier integration
292/// with code that expects that trait.
293//
294// # Semver note
295//
296// Note that this type is re-exported as a part of the public API of
297// the `arti-client` crate.  Any changes to its API here in
298// `tor-proto` need to be reflected above.
#[derive(Debug)]
pub struct DataReader {
    /// Internal state for this reader.
    ///
    /// This is stored in an Option so that we can take it by value and mutate
    /// it in poll_read(); code that takes it must store a state back before
    /// returning.  It might be possible to do better here, and we
    /// should refactor if so.
    state: Option<DataReaderState>,

    /// The memory quota account that should be used for this stream's data.
    ///
    /// Never read; it exists only to keep the account alive.
    // If we liked, we could make this conditional on not(cfg(feature = "stream-ctrl")),
    // since ClientDataStreamCtrl contains a StreamAccount clone too.  But that seems fragile.
    _memquota: StreamAccount,

    /// A control object that can be used to monitor and control this stream
    /// without needing to own it.
    #[cfg(feature = "stream-ctrl")]
    ctrl: std::sync::Arc<ClientDataStreamCtrl>,
}
320
/// Shared status flags for tracking the status of a `DataStream`.
///
/// The flags are only ever set to `true` by the code in this module; nothing
/// here resets them.
///
/// We expect to refactor this a bit, so it's not exposed at all.
//
// TODO RPC: Possibly instead of manipulating the fields of DataStreamStatus
// from various points in this module, we should instead construct
// DataStreamStatus as needed from information available elsewhere.  In any
// case, we should really eliminate as much duplicate state here as we can.
// (See discussions at !1198 for some challenges with this.)
#[cfg(feature = "stream-ctrl")]
#[derive(Clone, Debug, Default)]
struct DataStreamStatus {
    /// True if we've received a CONNECTED message.
    //
    // TODO: This is redundant with `connected` in DataReaderImpl and
    // `expecting_connected` in DataCmdChecker.
    received_connected: bool,
    /// True if we have decided to send an END message.
    //
    // TODO RPC: There is not an easy way to set this from this module!  Really,
    // the decision to send an "end" is made when the StreamTarget object is
    // dropped, but we don't currently have any way to see when that happens.
    // Perhaps we need a different shared StreamStatus object that the
    // StreamTarget holds?
    sent_end: bool,
    /// True if we have received an END message telling us to close the stream.
    received_end: bool,
    /// True if we have received an error.
    ///
    /// (This is not a subset or superset of received_end; some errors are END
    /// messages but some aren't; some END messages are errors but some aren't.)
    received_err: bool,
}
354
#[cfg(feature = "stream-ctrl")]
impl DataStreamStatus {
    /// Note that a CONNECTED message has arrived on this stream.
    fn record_connected(&mut self) {
        self.received_connected = true;
    }

    /// Fold the error `e` into the status flags.
    ///
    /// Any `EndReceived` error marks the stream as ended; every error other
    /// than an `EndReceived` with reason `DONE` also marks it as failed.
    fn record_error(&mut self, e: &Error) {
        // TODO: Probably we should remember the actual error in a box or
        // something.  But that means making a redundant copy of the error
        // even if nobody will want it.  Do we care?
        let got_end = matches!(e, Error::EndReceived(_));
        let clean_end = matches!(e, Error::EndReceived(EndReason::DONE));

        if got_end {
            self.received_end = true;
        }
        if !clean_end {
            self.received_err = true;
        }
    }
}
377
// Declares `DataStreamMsg` through the `restricted_msg!` macro from `tor_cell`.
// Only the message types listed in the body (Data, End, Connected) are part
// of the generated enum.
restricted_msg! {
    /// An allowable incoming message on a data stream.
    enum DataStreamMsg:RelayMsg {
        // SENDME is handled by the reactor.
        Data, End, Connected,
    }
}
385
// TODO RPC: Should we also implement this trait for everything that holds a
// ClientDataStreamCtrl?
#[cfg(feature = "stream-ctrl")]
impl super::ctrl::ClientStreamCtrl for ClientDataStreamCtrl {
    // TODO(conflux): use ClientTunnel
    //
    // We only hold a weak reference to the circuit, so this yields `None`
    // once the circuit has been dropped.
    fn circuit(&self) -> Option<Arc<ClientCirc>> {
        Weak::upgrade(&self.circuit)
    }
}
395
#[cfg(feature = "stream-ctrl")]
impl ClientDataStreamCtrl {
    /// Report whether the underlying stream is currently connected.
    ///
    /// A stream counts as connected when it has received a `CONNECTED`
    /// message and none of the "closed" conditions (END sent, END received,
    /// error received) has been recorded.
    ///
    /// # Panics
    ///
    /// Panics if the status lock is poisoned.
    pub fn is_connected(&self) -> bool {
        let status = self.status.lock().expect("poisoned lock");
        let closed = status.sent_end || status.received_end || status.received_err;
        status.received_connected && !closed
    }

    // TODO RPC: Add more functions once we have the desired API more nailed
    // down.
}
408
409impl DataStream {
410    /// Wrap raw stream reader and target parts as a DataStream.
411    ///
412    /// For non-optimistic stream, function `wait_for_connection`
413    /// must be called after to make sure CONNECTED is received.
414    pub(crate) fn new<P: SleepProvider + CoarseTimeProvider>(
415        time_provider: P,
416        reader: StreamReader,
417        target: StreamTarget,
418        memquota: StreamAccount,
419    ) -> Self {
420        Self::new_inner(time_provider, reader, target, false, memquota)
421    }
422
423    /// Wrap raw stream reader and target parts as a connected DataStream.
424    ///
425    /// Unlike [`DataStream::new`], this creates a `DataStream` that does not expect to receive a
426    /// CONNECTED cell.
427    ///
428    /// This is used by hidden services, exit relays, and directory servers to accept streams.
429    #[cfg(feature = "hs-service")]
430    pub(crate) fn new_connected<P: SleepProvider + CoarseTimeProvider>(
431        time_provider: P,
432        reader: StreamReader,
433        target: StreamTarget,
434        memquota: StreamAccount,
435    ) -> Self {
436        Self::new_inner(time_provider, reader, target, true, memquota)
437    }
438
439    /// The shared implementation of the `new*()` functions.
440    fn new_inner<P: SleepProvider + CoarseTimeProvider>(
441        time_provider: P,
442        reader: StreamReader,
443        target: StreamTarget,
444        connected: bool,
445        memquota: StreamAccount,
446    ) -> Self {
447        let relay_cell_format = target.relay_cell_format();
448        let out_buf_len = Data::max_body_len(relay_cell_format);
449
450        #[cfg(feature = "stream-ctrl")]
451        let status = {
452            let mut data_stream_status = DataStreamStatus::default();
453            if connected {
454                data_stream_status.record_connected();
455            }
456            Arc::new(Mutex::new(data_stream_status))
457        };
458
459        #[cfg(feature = "stream-ctrl")]
460        let ctrl = Arc::new(ClientDataStreamCtrl {
461            circuit: Arc::downgrade(target.circuit()),
462            status: status.clone(),
463            _memquota: memquota.clone(),
464        });
465        let r = DataReader {
466            state: Some(DataReaderState::Ready(DataReaderImpl {
467                s: reader,
468                pending: Vec::new(),
469                offset: 0,
470                connected,
471                #[cfg(feature = "stream-ctrl")]
472                status: status.clone(),
473            })),
474            _memquota: memquota.clone(),
475            #[cfg(feature = "stream-ctrl")]
476            ctrl: ctrl.clone(),
477        };
478        let w = DataWriterInner {
479            state: Some(DataWriterState::Ready(DataWriterImpl {
480                s: target,
481                buf: vec![0; out_buf_len].into_boxed_slice(),
482                n_pending: 0,
483                #[cfg(feature = "stream-ctrl")]
484                status,
485                relay_cell_format,
486            })),
487            _memquota: memquota,
488            #[cfg(feature = "stream-ctrl")]
489            ctrl: ctrl.clone(),
490        };
491
492        let time_provider = DynTimeProvider::new(time_provider);
493        let config = RateLimitedWriterConfig {
494            rate: u64::MAX,  // bytes per second
495            burst: u64::MAX, // bytes
496            // This number is chosen arbitrarily, but the idea is that we want to balance between
497            // throughput and latency. Assume the user tries to write a large buffer (~600 bytes).
498            // If we set this too small (for example 1), we'll be waking up frequently and writing a
499            // small number of bytes each time to the `DataWriterInner`, even if this isn't enough
500            // bytes to send a cell. If we set this too large (for example 510), we'll be waking up
501            // infrequently to write a larger number of bytes each time. So even if the
502            // `DataWriterInner` has almost a full cell's worth of data queued (for example 490) and
503            // only needs 509-490=19 more bytes before a cell can be sent, it will block until the
504            // rate limiter allows 510 more bytes.
505            //
506            // TODO(arti#2028): Is there an optimal value here?
507            wake_when_bytes_available: NonZero::new(200).expect("200 != 0"), // bytes
508        };
509
510        // TODO(arti#534): Need to be able to update this stream dynamically in response to flow
511        // control events. For now we just provide an empty stream.
512        let config_updates = futures::stream::empty();
513
514        let w = RateLimitedWriter::new(w, &config, time_provider);
515        let w = DynamicRateLimitedWriter::new(w, config_updates);
516        let w = DataWriter { writer: w };
517
518        DataStream {
519            w,
520            r,
521            #[cfg(feature = "stream-ctrl")]
522            ctrl,
523        }
524    }
525
526    /// Divide this DataStream into its constituent parts.
527    pub fn split(self) -> (DataReader, DataWriter) {
528        (self.r, self.w)
529    }
530
531    /// Wait until a CONNECTED cell is received, or some other cell
532    /// is received to indicate an error.
533    ///
534    /// Does nothing if this stream is already connected.
535    pub async fn wait_for_connection(&mut self) -> Result<()> {
536        // We must put state back before returning
537        let state = self.r.state.take().expect("Missing state in DataReader");
538
539        if let DataReaderState::Ready(imp) = state {
540            let (imp, result) = if imp.connected {
541                (imp, Ok(()))
542            } else {
543                // This succeeds if the cell is CONNECTED, and fails otherwise.
544                imp.read_cell().await
545            };
546            self.r.state = Some(match result {
547                Err(_) => DataReaderState::Closed,
548                Ok(_) => DataReaderState::Ready(imp),
549            });
550            result
551        } else {
552            Err(Error::from(internal!(
553                "Expected ready state, got {:?}",
554                state
555            )))
556        }
557    }
558
559    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
560    /// interact with this stream without holding the stream itself.
561    #[cfg(feature = "stream-ctrl")]
562    pub fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
563        Some(&self.ctrl)
564    }
565}
566
567impl AsyncRead for DataStream {
568    fn poll_read(
569        mut self: Pin<&mut Self>,
570        cx: &mut Context<'_>,
571        buf: &mut [u8],
572    ) -> Poll<IoResult<usize>> {
573        AsyncRead::poll_read(Pin::new(&mut self.r), cx, buf)
574    }
575}
576
/// `tokio` read support, implemented by wrapping the pinned `futures`
/// stream in a `tokio_util` compatibility adapter for each call.
#[cfg(feature = "tokio")]
impl TokioAsyncRead for DataStream {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<IoResult<()>> {
        let mut adapter = self.compat();
        TokioAsyncRead::poll_read(Pin::new(&mut adapter), cx, buf)
    }
}
587
588impl AsyncWrite for DataStream {
589    fn poll_write(
590        mut self: Pin<&mut Self>,
591        cx: &mut Context<'_>,
592        buf: &[u8],
593    ) -> Poll<IoResult<usize>> {
594        AsyncWrite::poll_write(Pin::new(&mut self.w), cx, buf)
595    }
596    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
597        AsyncWrite::poll_flush(Pin::new(&mut self.w), cx)
598    }
599    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
600        AsyncWrite::poll_close(Pin::new(&mut self.w), cx)
601    }
602}
603
/// `tokio` write support, implemented by wrapping the pinned `futures`
/// stream in a `tokio_util` compatibility adapter for each call.
#[cfg(feature = "tokio")]
impl TokioAsyncWrite for DataStream {
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
        let mut adapter = self.compat();
        TokioAsyncWrite::poll_write(Pin::new(&mut adapter), cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let mut adapter = self.compat();
        TokioAsyncWrite::poll_flush(Pin::new(&mut adapter), cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let mut adapter = self.compat();
        TokioAsyncWrite::poll_shutdown(Pin::new(&mut adapter), cx)
    }
}
618
/// Helper type: Like BoxFuture, but also requires that the future be Sync.
///
/// That is: a pinned, heap-allocated, type-erased future yielding `T` that is
/// both `Send` and `Sync` and lives at least as long as `'a`.
type BoxSyncFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + Sync + 'a>>;
621
/// An enumeration for the state of a DataWriter.
///
/// We have to use an enum here because, for as long as we're waiting
/// for a flush operation to complete, the future returned by
/// `flush_buf()` owns the DataWriterImpl.
#[derive(Educe)]
#[educe(Debug)]
enum DataWriterState {
    /// The writer has closed or gotten an error: nothing more to do.
    Closed,
    /// The writer is not currently flushing; more data can get queued
    /// immediately.
    Ready(DataWriterImpl),
    /// The writer is flushing a cell.
    ///
    /// When the future completes it hands back the `DataWriterImpl` together
    /// with the outcome of the flush.  The future is skipped in `Debug` output.
    Flushing(
        #[educe(Debug(method = "skip_fmt"))] //
        BoxSyncFuture<'static, (DataWriterImpl, Result<()>)>,
    ),
}
641
/// Internal: the write part of a DataStream
#[derive(Educe)]
#[educe(Debug)]
struct DataWriterImpl {
    /// The underlying StreamTarget object.
    s: StreamTarget,

    /// Buffered data to send over the connection.
    ///
    /// This buffer is currently allocated using a number of bytes
    /// equal to the maximum that we can package at a time.
    /// Its contents are skipped in `Debug` output.
    //
    // TODO: this buffer is probably smaller than we want, but it's good
    // enough for now.  If we _do_ make it bigger, we'll have to change
    // our use of Data::try_split_from to handle the case where we can't fit
    // all the data.
    #[educe(Debug(method = "skip_fmt"))]
    buf: Box<[u8]>,

    /// Number of unflushed bytes in buf.
    ///
    /// The pending data is `buf[..n_pending]`; the rest of `buf` is free space.
    n_pending: usize,

    /// Relay cell format in use
    relay_cell_format: RelayCellFormat,

    /// Shared user-visible information about the state of this stream.
    #[cfg(feature = "stream-ctrl")]
    status: Arc<Mutex<DataStreamStatus>>,
}
671
672impl DataWriterInner {
673    /// See [`DataWriter::client_stream_ctrl`].
674    #[cfg(feature = "stream-ctrl")]
675    fn client_stream_ctrl(&self) -> &Arc<ClientDataStreamCtrl> {
676        &self.ctrl
677    }
678
679    /// Helper for poll_flush() and poll_close(): Performs a flush, then
680    /// closes the stream if should_close is true.
681    fn poll_flush_impl(
682        mut self: Pin<&mut Self>,
683        cx: &mut Context<'_>,
684        should_close: bool,
685    ) -> Poll<IoResult<()>> {
686        let state = self.state.take().expect("Missing state in DataWriter");
687
688        // TODO: this whole function is a bit copy-pasted.
689        let mut future: BoxSyncFuture<_> = match state {
690            DataWriterState::Ready(imp) => {
691                if imp.n_pending == 0 {
692                    // Nothing to flush!
693                    if should_close {
694                        // We need to actually continue with this function to do the closing.
695                        // Thus, make a future that does nothing and is ready immediately.
696                        Box::pin(futures::future::ready((imp, Ok(()))))
697                    } else {
698                        // There's nothing more to do; we can return.
699                        self.state = Some(DataWriterState::Ready(imp));
700                        return Poll::Ready(Ok(()));
701                    }
702                } else {
703                    // We need to flush the buffer's contents; Make a future for that.
704                    Box::pin(imp.flush_buf())
705                }
706            }
707            DataWriterState::Flushing(fut) => fut,
708            DataWriterState::Closed => {
709                self.state = Some(DataWriterState::Closed);
710                return Poll::Ready(Err(Error::NotConnected.into()));
711            }
712        };
713
714        match future.as_mut().poll(cx) {
715            Poll::Ready((_imp, Err(e))) => {
716                self.state = Some(DataWriterState::Closed);
717                Poll::Ready(Err(e.into()))
718            }
719            Poll::Ready((mut imp, Ok(()))) => {
720                if should_close {
721                    // Tell the StreamTarget to close, so that the reactor
722                    // realizes that we are done sending. (Dropping `imp.s` does not
723                    // suffice, since there may be other clones of it.  In particular,
724                    // the StreamReader has one, which it uses to keep the stream
725                    // open, among other things.)
726                    imp.s.close();
727
728                    #[cfg(feature = "stream-ctrl")]
729                    {
730                        // TODO RPC:  This is not sufficient to track every case
731                        // where we might have sent an End.  See note on the
732                        // `sent_end` field.
733                        imp.status.lock().expect("lock poisoned").sent_end = true;
734                    }
735                    self.state = Some(DataWriterState::Closed);
736                } else {
737                    self.state = Some(DataWriterState::Ready(imp));
738                }
739                Poll::Ready(Ok(()))
740            }
741            Poll::Pending => {
742                self.state = Some(DataWriterState::Flushing(future));
743                Poll::Pending
744            }
745        }
746    }
747}
748
/// `futures` write support for the inner (non-rate-limited) writer.
///
/// Bytes are queued into the internal buffer; a flush of that buffer is
/// started only when nothing more can be queued.
impl AsyncWrite for DataWriterInner {
    /// Queue as many bytes of `buf` as currently fit, and report how many
    /// were accepted.
    ///
    /// If nothing fits, the current buffer is flushed first; this returns
    /// `Poll::Pending` until that flush completes.  An empty `buf` is
    /// accepted immediately as a zero-length write, without touching the
    /// writer state.  Returns an error if the writer is closed or the flush
    /// fails (which also closes the writer).
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<IoResult<usize>> {
        if buf.is_empty() {
            return Poll::Ready(Ok(0));
        }

        // Every path below must store a state back before returning.
        let state = self.state.take().expect("Missing state in DataWriter");

        let mut future = match state {
            DataWriterState::Ready(mut imp) => {
                let n_queued = imp.queue_bytes(buf);
                if n_queued != 0 {
                    // Fast path: some bytes fit, so no flush is needed yet.
                    self.state = Some(DataWriterState::Ready(imp));
                    return Poll::Ready(Ok(n_queued));
                }
                // we couldn't queue anything, so the current cell must be full.
                Box::pin(imp.flush_buf())
            }
            // A flush is already under way: keep driving it.
            DataWriterState::Flushing(fut) => fut,
            DataWriterState::Closed => {
                self.state = Some(DataWriterState::Closed);
                return Poll::Ready(Err(Error::NotConnected.into()));
            }
        };

        match future.as_mut().poll(cx) {
            Poll::Ready((_imp, Err(e))) => {
                // Make the failure visible through the shared stream status.
                #[cfg(feature = "stream-ctrl")]
                {
                    _imp.status.lock().expect("lock poisoned").record_error(&e);
                }
                self.state = Some(DataWriterState::Closed);
                Poll::Ready(Err(e.into()))
            }
            Poll::Ready((mut imp, Ok(()))) => {
                // Great!  We're done flushing.  Queue as much as we can of this
                // cell.
                let n_queued = imp.queue_bytes(buf);
                self.state = Some(DataWriterState::Ready(imp));
                Poll::Ready(Ok(n_queued))
            }
            Poll::Pending => {
                // Keep the in-progress future so the next poll resumes it.
                self.state = Some(DataWriterState::Flushing(future));
                Poll::Pending
            }
        }
    }

    /// Flush any buffered bytes, leaving the writer open.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        self.poll_flush_impl(cx, false)
    }

    /// Flush any buffered bytes, then close the stream.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        self.poll_flush_impl(cx, true)
    }
}
809
#[cfg(feature = "tokio")]
impl TokioAsyncWrite for DataWriterInner {
    /// Tokio-flavoured write: wraps `self` in the futures-to-tokio
    /// compatibility adapter and forwards the call.
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
        let mut adapter = self.compat_write();
        TokioAsyncWrite::poll_write(Pin::new(&mut adapter), cx, buf)
    }

    /// Tokio-flavoured flush, forwarded through the compatibility adapter.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let mut adapter = self.compat_write();
        TokioAsyncWrite::poll_flush(Pin::new(&mut adapter), cx)
    }

    /// Tokio-flavoured shutdown, forwarded through the compatibility adapter.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let mut adapter = self.compat_write();
        TokioAsyncWrite::poll_shutdown(Pin::new(&mut adapter), cx)
    }
}
824
825impl DataWriterImpl {
826    /// Try to flush the current buffer contents as a data cell.
827    async fn flush_buf(mut self) -> (Self, Result<()>) {
828        let result = if let Some((cell, remainder)) =
829            Data::try_split_from(self.relay_cell_format, &self.buf[..self.n_pending])
830        {
831            // TODO: Eventually we may want a larger buffer; if we do,
832            // this invariant will become false.
833            assert!(remainder.is_empty());
834            self.n_pending = 0;
835            self.s.send(cell.into()).await
836        } else {
837            Ok(())
838        };
839
840        (self, result)
841    }
842
843    /// Add as many bytes as possible from `b` to our internal buffer;
844    /// return the number we were able to add.
845    fn queue_bytes(&mut self, b: &[u8]) -> usize {
846        let empty_space = &mut self.buf[self.n_pending..];
847        if empty_space.is_empty() {
848            // that is, len == 0
849            return 0;
850        }
851
852        let n_to_copy = std::cmp::min(b.len(), empty_space.len());
853        empty_space[..n_to_copy].copy_from_slice(&b[..n_to_copy]);
854        self.n_pending += n_to_copy;
855        n_to_copy
856    }
857}
858
859impl DataReader {
860    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
861    /// interact with this stream without holding the stream itself.
862    #[cfg(feature = "stream-ctrl")]
863    pub fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
864        Some(&self.ctrl)
865    }
866}
867
/// An enumeration for the state of a DataReader.
///
/// We have to use an enum here because, when we're waiting for
/// ReadingCell to complete, the future returned by `read_cell()` owns the
/// DataReaderImpl.  If we wanted to store the future and the reader at the
/// same time, we'd need to make a self-referential structure, which isn't
/// possible in safe Rust AIUI.
#[derive(Educe)]
#[educe(Debug)]
enum DataReaderState {
    /// In this state we have received an end cell or an error.
    ///
    /// Reads attempted in this state fail with `Error::NotConnected`.
    Closed,
    /// In this state the reader is not currently fetching a cell; it
    /// either has data or not.
    Ready(DataReaderImpl),
    /// The reader is currently fetching a cell: this future is the
    /// progress it is making.
    ///
    /// When the future resolves it hands back the `DataReaderImpl` together
    /// with the outcome of the read.
    ReadingCell(
        #[educe(Debug(method = "skip_fmt"))] //
        BoxSyncFuture<'static, (DataReaderImpl, Result<()>)>,
    ),
}
890
/// Wrapper for the read part of a DataStream
///
/// Holds the underlying stream reader plus the bytes of the most recently
/// received data that have not yet been handed to the caller.
#[derive(Educe)]
#[educe(Debug)]
struct DataReaderImpl {
    /// The underlying StreamReader object.
    #[educe(Debug(method = "skip_fmt"))]
    s: StreamReader,

    /// If present, data that we received on this stream but have not
    /// been able to send to the caller yet.
    ///
    /// Only the bytes from `offset` onward are still unread.
    // TODO: This data structure is probably not what we want, but
    // it's good enough for now.
    #[educe(Debug(method = "skip_fmt"))]
    pending: Vec<u8>,

    /// Index into pending to show what we've already read.
    ///
    /// The buffer is considered empty when this equals `pending.len()`.
    offset: usize,

    /// If true, we have received a CONNECTED cell on this stream.
    ///
    /// `read_cell()` treats DATA received while this is false, and a second
    /// CONNECTED received while it is true, as protocol errors.
    connected: bool,

    /// Shared user-visible information about the state of this stream.
    #[cfg(feature = "stream-ctrl")]
    status: Arc<Mutex<DataStreamStatus>>,
}
916
917impl AsyncRead for DataReader {
918    fn poll_read(
919        mut self: Pin<&mut Self>,
920        cx: &mut Context<'_>,
921        buf: &mut [u8],
922    ) -> Poll<IoResult<usize>> {
923        // We're pulling the state object out of the reader.  We MUST
924        // put it back before this function returns.
925        let mut state = self.state.take().expect("Missing state in DataReader");
926
927        loop {
928            let mut future = match state {
929                DataReaderState::Ready(mut imp) => {
930                    // There may be data to read already.
931                    let n_copied = imp.extract_bytes(buf);
932                    if n_copied != 0 {
933                        // We read data into the buffer.  Tell the caller.
934                        self.state = Some(DataReaderState::Ready(imp));
935                        return Poll::Ready(Ok(n_copied));
936                    }
937
938                    // No data available!  We have to launch a read.
939                    Box::pin(imp.read_cell())
940                }
941                DataReaderState::ReadingCell(fut) => fut,
942                DataReaderState::Closed => {
943                    self.state = Some(DataReaderState::Closed);
944                    return Poll::Ready(Err(Error::NotConnected.into()));
945                }
946            };
947
948            // We have a future that represents an in-progress read.
949            // See if it can make progress.
950            match future.as_mut().poll(cx) {
951                Poll::Ready((_imp, Err(e))) => {
952                    // There aren't any survivable errors in the current
953                    // design.
954                    self.state = Some(DataReaderState::Closed);
955                    #[cfg(feature = "stream-ctrl")]
956                    {
957                        _imp.status.lock().expect("lock poisoned").record_error(&e);
958                    }
959                    let result = if matches!(e, Error::EndReceived(EndReason::DONE)) {
960                        Ok(0)
961                    } else {
962                        Err(e.into())
963                    };
964                    return Poll::Ready(result);
965                }
966                Poll::Ready((imp, Ok(()))) => {
967                    // It read a cell!  Continue the loop.
968                    state = DataReaderState::Ready(imp);
969                }
970                Poll::Pending => {
971                    // The future is pending; store it and tell the
972                    // caller to get back to us later.
973                    self.state = Some(DataReaderState::ReadingCell(future));
974                    return Poll::Pending;
975                }
976            }
977        }
978    }
979}
980
#[cfg(feature = "tokio")]
impl TokioAsyncRead for DataReader {
    /// Tokio-flavoured read: wraps `self` in the futures-to-tokio
    /// compatibility adapter and forwards the call.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<IoResult<()>> {
        let mut adapter = self.compat();
        TokioAsyncRead::poll_read(Pin::new(&mut adapter), cx, buf)
    }
}
991
992impl DataReaderImpl {
993    /// Pull as many bytes as we can off of self.pending, and return that
994    /// number of bytes.
995    fn extract_bytes(&mut self, buf: &mut [u8]) -> usize {
996        let remainder = &self.pending[self.offset..];
997        let n_to_copy = std::cmp::min(buf.len(), remainder.len());
998        buf[..n_to_copy].copy_from_slice(&remainder[..n_to_copy]);
999        self.offset += n_to_copy;
1000
1001        n_to_copy
1002    }
1003
1004    /// Return true iff there are no buffered bytes here to yield
1005    fn buf_is_empty(&self) -> bool {
1006        self.pending.len() == self.offset
1007    }
1008
1009    /// Load self.pending with the contents of a new data cell.
1010    ///
1011    /// This function takes ownership of self so that we can avoid
1012    /// self-referential lifetimes.
1013    async fn read_cell(mut self) -> (Self, Result<()>) {
1014        use DataStreamMsg::*;
1015        let msg = match self.s.recv().await {
1016            Ok(unparsed) => match unparsed.decode::<DataStreamMsg>() {
1017                Ok(cell) => cell.into_msg(),
1018                Err(e) => {
1019                    self.s.protocol_error();
1020                    return (
1021                        self,
1022                        Err(Error::from_bytes_err(e, "message on a data stream")),
1023                    );
1024                }
1025            },
1026            Err(e) => return (self, Err(e)),
1027        };
1028
1029        let result = match msg {
1030            Connected(_) if !self.connected => {
1031                self.connected = true;
1032                #[cfg(feature = "stream-ctrl")]
1033                {
1034                    self.status
1035                        .lock()
1036                        .expect("poisoned lock")
1037                        .record_connected();
1038                }
1039                Ok(())
1040            }
1041            Connected(_) => {
1042                self.s.protocol_error();
1043                Err(Error::StreamProto(
1044                    "Received a second connect cell on a data stream".to_string(),
1045                ))
1046            }
1047            Data(d) if self.connected => {
1048                self.add_data(d.into());
1049                Ok(())
1050            }
1051            Data(_) => {
1052                self.s.protocol_error();
1053                Err(Error::StreamProto(
1054                    "Received a data cell an unconnected stream".to_string(),
1055                ))
1056            }
1057            End(e) => Err(Error::EndReceived(e.reason())),
1058        };
1059
1060        (self, result)
1061    }
1062
1063    /// Add the data from `d` to the end of our pending bytes.
1064    fn add_data(&mut self, mut d: Vec<u8>) {
1065        if self.buf_is_empty() {
1066            // No data pending?  Just take d as the new pending.
1067            self.pending = d;
1068            self.offset = 0;
1069        } else {
1070            // TODO(nickm) This has potential to grow `pending` without bound.
1071            // Fortunately, we don't currently read cells or call this
1072            // `add_data` method when pending is nonempty—but if we do in the
1073            // future, we'll have to be careful here.
1074            self.pending.append(&mut d);
1075        }
1076    }
1077}
1078
/// A `CmdChecker` that enforces invariants for outbound data streams.
///
/// It accepts only CONNECTED, DATA, and END messages, and tracks whether
/// CONNECTED is still outstanding so that DATA before CONNECTED and a
/// repeated CONNECTED can both be rejected.
#[derive(Debug)]
pub(crate) struct DataCmdChecker {
    /// True if we are expecting to receive a CONNECTED message on this stream.
    ///
    /// Cleared once a CONNECTED message has been accepted.
    expecting_connected: bool,
}
1085
1086impl Default for DataCmdChecker {
1087    fn default() -> Self {
1088        Self {
1089            expecting_connected: true,
1090        }
1091    }
1092}
1093
1094impl super::CmdChecker for DataCmdChecker {
1095    fn check_msg(
1096        &mut self,
1097        msg: &tor_cell::relaycell::UnparsedRelayMsg,
1098    ) -> Result<super::StreamStatus> {
1099        use super::StreamStatus::*;
1100        match msg.cmd() {
1101            RelayCmd::CONNECTED => {
1102                if !self.expecting_connected {
1103                    Err(Error::StreamProto(
1104                        "Received CONNECTED twice on a stream.".into(),
1105                    ))
1106                } else {
1107                    self.expecting_connected = false;
1108                    Ok(Open)
1109                }
1110            }
1111            RelayCmd::DATA => {
1112                if !self.expecting_connected {
1113                    Ok(Open)
1114                } else {
1115                    Err(Error::StreamProto(
1116                        "Received DATA before CONNECTED on a stream".into(),
1117                    ))
1118                }
1119            }
1120            RelayCmd::END => Ok(Closed),
1121            _ => Err(Error::StreamProto(format!(
1122                "Unexpected {} on a data stream!",
1123                msg.cmd()
1124            ))),
1125        }
1126    }
1127
1128    fn consume_checked_msg(&mut self, msg: tor_cell::relaycell::UnparsedRelayMsg) -> Result<()> {
1129        let _ = msg
1130            .decode::<DataStreamMsg>()
1131            .map_err(|err| Error::from_bytes_err(err, "cell on half-closed stream"))?;
1132        Ok(())
1133    }
1134}
1135
1136impl DataCmdChecker {
1137    /// Return a new boxed `DataCmdChecker` in a state suitable for a newly
1138    /// constructed connection.
1139    pub(crate) fn new_any() -> AnyCmdChecker {
1140        Box::<Self>::default()
1141    }
1142
1143    /// Return a new boxed `DataCmdChecker` in a state suitable for a
1144    /// connection where an initial CONNECTED cell is not expected.
1145    ///
1146    /// This is used by hidden services, exit relays, and directory servers
1147    /// to accept streams.
1148    #[cfg(feature = "hs-service")]
1149    pub(crate) fn new_connected() -> AnyCmdChecker {
1150        Box::new(Self {
1151            expecting_connected: false,
1152        })
1153    }
1154}