tor_proto/stream/
data.rs

//! Declare DataStream, a type that wraps a StreamReader and a StreamTarget so as
//! to be useful for byte-oriented communication.
3
4use crate::{Error, Result};
5use static_assertions::assert_impl_all;
6use tor_cell::relaycell::msg::EndReason;
7use tor_cell::relaycell::{RelayCellFormat, RelayCmd};
8
9use futures::io::{AsyncRead, AsyncWrite};
10use futures::task::{Context, Poll};
11use futures::Future;
12
13#[cfg(feature = "tokio")]
14use tokio_crate::io::ReadBuf;
15#[cfg(feature = "tokio")]
16use tokio_crate::io::{AsyncRead as TokioAsyncRead, AsyncWrite as TokioAsyncWrite};
17#[cfg(feature = "tokio")]
18use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
19use tor_cell::restricted_msg;
20
21use std::fmt::Debug;
22use std::io::Result as IoResult;
23use std::pin::Pin;
24#[cfg(any(feature = "stream-ctrl", feature = "experimental-api"))]
25use std::sync::Arc;
26#[cfg(feature = "stream-ctrl")]
27use std::sync::{Mutex, Weak};
28
29use educe::Educe;
30
31#[cfg(any(feature = "experimental-api", feature = "stream-ctrl"))]
32use crate::tunnel::circuit::ClientCirc;
33
34use crate::memquota::StreamAccount;
35use crate::stream::StreamReader;
36use crate::tunnel::StreamTarget;
37use tor_basic_utils::skip_fmt;
38use tor_cell::relaycell::msg::Data;
39use tor_error::internal;
40
41use super::AnyCmdChecker;
42
43/// An anonymized stream over the Tor network.
44///
45/// For most purposes, you can think of this type as an anonymized
46/// TCP stream: it can read and write data, and get closed when it's done.
47///
48/// [`DataStream`] implements [`futures::io::AsyncRead`] and
49/// [`futures::io::AsyncWrite`], so you can use it anywhere that those
50/// traits are expected.
51///
52/// # Examples
53///
54/// Connecting to an HTTP server and sending a request, using
55/// [`AsyncWriteExt::write_all`](futures::io::AsyncWriteExt::write_all):
56///
57/// ```ignore
58/// let mut stream = tor_client.connect(("icanhazip.com", 80), None).await?;
59///
60/// use futures::io::AsyncWriteExt;
61///
62/// stream
63///     .write_all(b"GET / HTTP/1.1\r\nHost: icanhazip.com\r\nConnection: close\r\n\r\n")
64///     .await?;
65///
66/// // Flushing the stream is important; see below!
67/// stream.flush().await?;
68/// ```
69///
70/// Reading the result, using [`AsyncReadExt::read_to_end`](futures::io::AsyncReadExt::read_to_end):
71///
72/// ```ignore
73/// use futures::io::AsyncReadExt;
74///
75/// let mut buf = Vec::new();
76/// stream.read_to_end(&mut buf).await?;
77///
78/// println!("{}", String::from_utf8_lossy(&buf));
79/// ```
80///
81/// # Usage with Tokio
82///
83/// If the `tokio` crate feature is enabled, this type also implements
84/// [`tokio::io::AsyncRead`](tokio_crate::io::AsyncRead) and
85/// [`tokio::io::AsyncWrite`](tokio_crate::io::AsyncWrite) for easier integration
86/// with code that expects those traits.
87///
88/// # Remember to call `flush`!
89///
90/// DataStream buffers data internally, in order to write as few cells
91/// as possible onto the network.  In order to make sure that your
92/// data has actually been sent, you need to make sure that
93/// [`AsyncWrite::poll_flush`] runs to completion: probably via
94/// [`AsyncWriteExt::flush`](futures::io::AsyncWriteExt::flush).
95///
96/// # Splitting the type
97///
98/// This type is internally composed of a [`DataReader`] and a [`DataWriter`]; the
99/// `DataStream::split` method can be used to split it into those two parts, for more
100/// convenient usage with e.g. stream combinators.
101///
102/// # How long does a stream live?
103///
104/// A `DataStream` will live until all references to it are dropped,
105/// or until it is closed explicitly.
106///
107/// If you split the stream into a `DataReader` and a `DataWriter`, it
108/// will survive until _both_ are dropped, or until it is closed
109/// explicitly.
110///
111/// A stream can also close because of a network error,
112/// or because the other side of the stream decided to close it.
113///
114// # Semver note
115//
116// Note that this type is re-exported as a part of the public API of
117// the `arti-client` crate.  Any changes to its API here in
118// `tor-proto` need to be reflected above.
119#[derive(Debug)]
120pub struct DataStream {
121    /// Underlying writer for this stream
122    w: DataWriter,
123    /// Underlying reader for this stream
124    r: DataReader,
125    /// A control object that can be used to monitor and control this stream
126    /// without needing to own it.
127    #[cfg(feature = "stream-ctrl")]
128    ctrl: std::sync::Arc<ClientDataStreamCtrl>,
129}
130assert_impl_all! { DataStream: Send, Sync }
131
/// An object used to control and monitor a data stream.
///
/// # Notes
///
/// This type is kept distinct from [`DataStream`] because it is useful to hold
/// several references to it at once, whereas a [`DataReader`] and a
/// [`DataWriter`] each need a single owner for the `AsyncRead` and
/// `AsyncWrite` APIs to work correctly.
#[cfg(feature = "stream-ctrl")]
#[derive(Debug)]
pub struct ClientDataStreamCtrl {
    /// The circuit to which this stream is attached.
    ///
    /// Note that the stream's reader and writer halves each contain a `StreamTarget`,
    /// which in turn has a strong reference to the `ClientCirc`.  So as long as any
    /// one of those is alive, this reference will be present.
    ///
    /// Only a Weak reference is kept here, so that once the stream itself is
    /// closed this object cannot keep (and thereby leak) the circuit.
    // TODO(conflux): use ClientTunnel
    circuit: Weak<ClientCirc>,

    /// Shared user-visible information about the state of this stream.
    ///
    /// TODO RPC: This will probably want to be a `postage::Watch` or something
    /// similar, if and when it stops moving around.
    // (No per-field `cfg` needed: the whole struct is gated on `stream-ctrl`.)
    status: Arc<Mutex<DataStreamStatus>>,

    /// The memory quota account that should be used for this stream's data
    ///
    /// Held only to keep the account alive.
    _memquota: StreamAccount,
}
166
167/// The write half of a [`DataStream`], implementing [`futures::io::AsyncWrite`].
168///
169/// See the [`DataStream`] docs for more information. In particular, note
170/// that this writer requires `poll_flush` to complete in order to guarantee that
171/// all data has been written.
172///
173/// # Usage with Tokio
174///
175/// If the `tokio` crate feature is enabled, this type also implements
176/// [`tokio::io::AsyncWrite`](tokio_crate::io::AsyncWrite) for easier integration
177/// with code that expects that trait.
178///
179/// # Drop and close
180///
181/// Note that dropping a `DataWriter` has no special effect on its own:
182/// if the `DataWriter` is dropped, the underlying stream will still remain open
183/// until the `DataReader` is also dropped.
184///
185/// If you want the stream to close earlier, use [`close`](futures::io::AsyncWriteExt::close)
186/// (or [`shutdown`](tokio_crate::io::AsyncWriteExt::shutdown) with `tokio`).
187///
188/// Remember that Tor does not support half-open streams:
189/// If you `close` or `shutdown` a stream,
190/// the other side will not see the stream as half-open,
191/// and so will (probably) not finish sending you any in-progress data.
192/// Do not use `close`/`shutdown` to communicate anything besides
193/// "I am done using this stream."
194///
195// # Semver note
196//
197// Note that this type is re-exported as a part of the public API of
198// the `arti-client` crate.  Any changes to its API here in
199// `tor-proto` need to be reflected above.
200#[derive(Debug)]
201pub struct DataWriter {
202    /// Internal state for this writer
203    ///
204    /// This is stored in an Option so that we can mutate it in the
205    /// AsyncWrite functions.  It might be possible to do better here,
206    /// and we should refactor if so.
207    state: Option<DataWriterState>,
208
209    /// The memory quota account that should be used for this stream's data
210    ///
211    /// Exists to keep the account alive
212    // If we liked, we could make this conditional; see DataReader.memquota
213    _memquota: StreamAccount,
214
215    /// A control object that can be used to monitor and control this stream
216    /// without needing to own it.
217    #[cfg(feature = "stream-ctrl")]
218    ctrl: std::sync::Arc<ClientDataStreamCtrl>,
219}
220
221/// The read half of a [`DataStream`], implementing [`futures::io::AsyncRead`].
222///
223/// See the [`DataStream`] docs for more information.
224///
225/// # Usage with Tokio
226///
227/// If the `tokio` crate feature is enabled, this type also implements
228/// [`tokio::io::AsyncRead`](tokio_crate::io::AsyncRead) for easier integration
229/// with code that expects that trait.
230//
231// # Semver note
232//
233// Note that this type is re-exported as a part of the public API of
234// the `arti-client` crate.  Any changes to its API here in
235// `tor-proto` need to be reflected above.
236#[derive(Debug)]
237pub struct DataReader {
238    /// Internal state for this reader.
239    ///
240    /// This is stored in an Option so that we can mutate it in
241    /// poll_read().  It might be possible to do better here, and we
242    /// should refactor if so.
243    state: Option<DataReaderState>,
244
245    /// The memory quota account that should be used for this stream's data
246    ///
247    /// Exists to keep the account alive
248    // If we liked, we could make this conditional on not(cfg(feature = "stream-ctrl"))
249    // since, ClientDataStreamCtrl contains a StreamAccount clone too.  But that seems fragile.
250    _memquota: StreamAccount,
251
252    /// A control object that can be used to monitor and control this stream
253    /// without needing to own it.
254    #[cfg(feature = "stream-ctrl")]
255    ctrl: std::sync::Arc<ClientDataStreamCtrl>,
256}
257
/// Shared status flags for tracking the status of a `DataStream`.
///
/// We expect to refactor this a bit, so it's not exposed at all.
//
// TODO RPC: Possibly instead of manipulating the fields of DataStreamStatus
// from various points in this module, we should instead construct
// DataStreamStatus as needed from information available elsewhere.  In any
// case, we should really eliminate as much duplicate state here as we can.
// (See discussions at !1198 for some challenges with this.)
#[cfg(feature = "stream-ctrl")]
#[derive(Clone, Debug, Default)]
struct DataStreamStatus {
    /// True if we've received a CONNECTED message, or if the stream was
    /// created already connected (see `DataStream::new_connected`).
    //
    // TODO: This is redundant with `connected` in DataReaderImpl and
    // `expecting_connected` in DataCmdChecker.
    received_connected: bool,
    /// True if we have decided to send an END message.
    //
    // TODO RPC: There is not an easy way to set this from this module!  Really,
    // the decision to send an "end" is made when the StreamTarget object is
    // dropped, but we don't currently have any way to see when that happens.
    // Perhaps we need a different shared StreamStatus object that the
    // StreamTarget holds?
    sent_end: bool,
    /// True if we have received an END message telling us to close the stream.
    received_end: bool,
    /// True if we have received an error.
    ///
    /// (This is not a subset or superset of received_end; some errors are END
    /// messages but some aren't; some END messages are errors but some aren't.)
    received_err: bool,
}
291
#[cfg(feature = "stream-ctrl")]
impl DataStreamStatus {
    /// Note that a CONNECTED message has arrived.
    fn record_connected(&mut self) {
        self.received_connected = true;
    }

    /// Note that the stream has seen an error of some kind.
    ///
    /// Any END marks the stream as ended.  Every error except a clean
    /// `END(DONE)` also marks the stream as errored.
    fn record_error(&mut self, e: &Error) {
        // TODO: Probably we should remember the actual error in a box or
        // something.  But that means making a redundant copy of the error
        // even if nobody will want it.  Do we care?
        let is_end = matches!(e, Error::EndReceived(_));
        let is_clean_end = matches!(e, Error::EndReceived(EndReason::DONE));

        if is_end {
            self.received_end = true;
        }
        if !is_clean_end {
            self.received_err = true;
        }
    }
}
314
// Restricted message type listing the relay messages a data stream may
// receive: DATA, END, and CONNECTED.  SENDME is intentionally absent; per the
// comment below, the reactor handles it.
restricted_msg! {
    /// An allowable incoming message on a data stream.
    enum DataStreamMsg:RelayMsg {
        // SENDME is handled by the reactor.
        Data, End, Connected,
    }
}
322
// TODO RPC: Should we also implement this trait for everything that holds a
// ClientDataStreamCtrl?
#[cfg(feature = "stream-ctrl")]
impl super::ctrl::ClientStreamCtrl for ClientDataStreamCtrl {
    // TODO(conflux): use ClientTunnel
    fn circuit(&self) -> Option<Arc<ClientCirc>> {
        // Only a weak reference is stored, so this yields `None` once no
        // strong reference to the circuit remains.
        Weak::upgrade(&self.circuit)
    }
}
332
#[cfg(feature = "stream-ctrl")]
impl ClientDataStreamCtrl {
    /// Return true if the underlying stream is connected: that is, it has
    /// received a `CONNECTED` message and has not since been closed (no END
    /// sent or received, and no error recorded).
    pub fn is_connected(&self) -> bool {
        let status = self.status.lock().expect("poisoned lock");
        let closed = status.sent_end || status.received_end || status.received_err;
        status.received_connected && !closed
    }

    // TODO RPC: Add more functions once we have the desired API more nailed
    // down.
}
345
346impl DataStream {
347    /// Wrap raw stream reader and target parts as a DataStream.
348    ///
349    /// For non-optimistic stream, function `wait_for_connection`
350    /// must be called after to make sure CONNECTED is received.
351    pub(crate) fn new(reader: StreamReader, target: StreamTarget, memquota: StreamAccount) -> Self {
352        Self::new_inner(reader, target, false, memquota)
353    }
354
355    /// Wrap raw stream reader and target parts as a connected DataStream.
356    ///
357    /// Unlike [`DataStream::new`], this creates a `DataStream` that does not expect to receive a
358    /// CONNECTED cell.
359    ///
360    /// This is used by hidden services, exit relays, and directory servers to accept streams.
361    #[cfg(feature = "hs-service")]
362    pub(crate) fn new_connected(
363        reader: StreamReader,
364        target: StreamTarget,
365        memquota: StreamAccount,
366    ) -> Self {
367        Self::new_inner(reader, target, true, memquota)
368    }
369
370    /// The shared implementation of the `new*()` functions.
371    fn new_inner(
372        reader: StreamReader,
373        target: StreamTarget,
374        connected: bool,
375        memquota: StreamAccount,
376    ) -> Self {
377        let relay_cell_format = target.relay_cell_format();
378        let out_buf_len = Data::max_body_len(relay_cell_format);
379
380        #[cfg(feature = "stream-ctrl")]
381        let status = {
382            let mut data_stream_status = DataStreamStatus::default();
383            if connected {
384                data_stream_status.record_connected();
385            }
386            Arc::new(Mutex::new(data_stream_status))
387        };
388
389        #[cfg(feature = "stream-ctrl")]
390        let ctrl = Arc::new(ClientDataStreamCtrl {
391            circuit: Arc::downgrade(target.circuit()),
392            status: status.clone(),
393            _memquota: memquota.clone(),
394        });
395        let r = DataReader {
396            state: Some(DataReaderState::Ready(DataReaderImpl {
397                s: reader,
398                pending: Vec::new(),
399                offset: 0,
400                connected,
401                #[cfg(feature = "stream-ctrl")]
402                status: status.clone(),
403            })),
404            _memquota: memquota.clone(),
405            #[cfg(feature = "stream-ctrl")]
406            ctrl: ctrl.clone(),
407        };
408        let w = DataWriter {
409            state: Some(DataWriterState::Ready(DataWriterImpl {
410                s: target,
411                buf: vec![0; out_buf_len].into_boxed_slice(),
412                n_pending: 0,
413                #[cfg(feature = "stream-ctrl")]
414                status,
415                relay_cell_format,
416            })),
417            _memquota: memquota,
418            #[cfg(feature = "stream-ctrl")]
419            ctrl: ctrl.clone(),
420        };
421        DataStream {
422            w,
423            r,
424            #[cfg(feature = "stream-ctrl")]
425            ctrl,
426        }
427    }
428
429    /// Divide this DataStream into its constituent parts.
430    pub fn split(self) -> (DataReader, DataWriter) {
431        (self.r, self.w)
432    }
433
434    /// Wait until a CONNECTED cell is received, or some other cell
435    /// is received to indicate an error.
436    ///
437    /// Does nothing if this stream is already connected.
438    pub async fn wait_for_connection(&mut self) -> Result<()> {
439        // We must put state back before returning
440        let state = self.r.state.take().expect("Missing state in DataReader");
441
442        if let DataReaderState::Ready(imp) = state {
443            let (imp, result) = if imp.connected {
444                (imp, Ok(()))
445            } else {
446                // This succeeds if the cell is CONNECTED, and fails otherwise.
447                imp.read_cell().await
448            };
449            self.r.state = Some(match result {
450                Err(_) => DataReaderState::Closed,
451                Ok(_) => DataReaderState::Ready(imp),
452            });
453            result
454        } else {
455            Err(Error::from(internal!(
456                "Expected ready state, got {:?}",
457                state
458            )))
459        }
460    }
461
462    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
463    /// interact with this stream without holding the stream itself.
464    #[cfg(feature = "stream-ctrl")]
465    pub fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
466        Some(&self.ctrl)
467    }
468}
469
470impl AsyncRead for DataStream {
471    fn poll_read(
472        mut self: Pin<&mut Self>,
473        cx: &mut Context<'_>,
474        buf: &mut [u8],
475    ) -> Poll<IoResult<usize>> {
476        AsyncRead::poll_read(Pin::new(&mut self.r), cx, buf)
477    }
478}
479
#[cfg(feature = "tokio")]
impl TokioAsyncRead for DataStream {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<IoResult<()>> {
        // Adapt the futures-io implementation to tokio's API via `Compat`.
        let mut adapter = self.compat();
        TokioAsyncRead::poll_read(Pin::new(&mut adapter), cx, buf)
    }
}
490
491impl AsyncWrite for DataStream {
492    fn poll_write(
493        mut self: Pin<&mut Self>,
494        cx: &mut Context<'_>,
495        buf: &[u8],
496    ) -> Poll<IoResult<usize>> {
497        AsyncWrite::poll_write(Pin::new(&mut self.w), cx, buf)
498    }
499    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
500        AsyncWrite::poll_flush(Pin::new(&mut self.w), cx)
501    }
502    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
503        AsyncWrite::poll_close(Pin::new(&mut self.w), cx)
504    }
505}
506
#[cfg(feature = "tokio")]
impl TokioAsyncWrite for DataStream {
    // Each method wraps `self` in a `Compat` adapter, which exposes the
    // futures-io implementation through tokio's traits.

    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
        let mut adapter = self.compat();
        TokioAsyncWrite::poll_write(Pin::new(&mut adapter), cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let mut adapter = self.compat();
        TokioAsyncWrite::poll_flush(Pin::new(&mut adapter), cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let mut adapter = self.compat();
        TokioAsyncWrite::poll_shutdown(Pin::new(&mut adapter), cx)
    }
}
521
/// Helper type: a pinned, boxed, type-erased future.  Like `BoxFuture`, but
/// the future must additionally be `Sync` (as well as `Send`).
type BoxSyncFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Sync + Send + 'a>>;
524
525/// An enumeration for the state of a DataWriter.
526///
527/// We have to use an enum here because, for as long as we're waiting
528/// for a flush operation to complete, the future returned by
529/// `flush_cell()` owns the DataWriterImpl.
530#[derive(Educe)]
531#[educe(Debug)]
532enum DataWriterState {
533    /// The writer has closed or gotten an error: nothing more to do.
534    Closed,
535    /// The writer is not currently flushing; more data can get queued
536    /// immediately.
537    Ready(DataWriterImpl),
538    /// The writer is flushing a cell.
539    Flushing(
540        #[educe(Debug(method = "skip_fmt"))] //
541        BoxSyncFuture<'static, (DataWriterImpl, Result<()>)>,
542    ),
543}
544
/// Internal: the write part of a DataStream
///
/// Outgoing bytes accumulate in `buf` (only the first `n_pending` bytes are
/// meaningful) until `flush_buf` packages them into a DATA message and sends
/// it on `s`.
#[derive(Educe)]
#[educe(Debug)]
struct DataWriterImpl {
    /// The underlying StreamTarget object.
    s: StreamTarget,

    /// Buffered data to send over the connection.
    ///
    /// This buffer is currently allocated using a number of bytes
    /// equal to the maximum that we can package at a time.
    //
    // TODO: this buffer is probably smaller than we want, but it's good
    // enough for now.  If we _do_ make it bigger, we'll have to change
    // our use of Data::split_from to handle the case where we can't fit
    // all the data.
    #[educe(Debug(method = "skip_fmt"))]
    buf: Box<[u8]>,

    /// Number of unflushed bytes in buf.
    ///
    /// Invariant: `n_pending <= buf.len()` (slicing `buf[n_pending..]` in
    /// `queue_bytes` would panic otherwise).
    n_pending: usize,

    /// Relay cell format in use; it determines how much data fits in a cell.
    relay_cell_format: RelayCellFormat,

    /// Shared user-visible information about the state of this stream.
    #[cfg(feature = "stream-ctrl")]
    status: Arc<Mutex<DataStreamStatus>>,
}
574
575impl DataWriter {
576    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
577    /// interact with this stream without holding the stream itself.
578    #[cfg(feature = "stream-ctrl")]
579    pub fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
580        Some(&self.ctrl)
581    }
582
583    /// Helper for poll_flush() and poll_close(): Performs a flush, then
584    /// closes the stream if should_close is true.
585    fn poll_flush_impl(
586        mut self: Pin<&mut Self>,
587        cx: &mut Context<'_>,
588        should_close: bool,
589    ) -> Poll<IoResult<()>> {
590        let state = self.state.take().expect("Missing state in DataWriter");
591
592        // TODO: this whole function is a bit copy-pasted.
593        let mut future: BoxSyncFuture<_> = match state {
594            DataWriterState::Ready(imp) => {
595                if imp.n_pending == 0 {
596                    // Nothing to flush!
597                    if should_close {
598                        // We need to actually continue with this function to do the closing.
599                        // Thus, make a future that does nothing and is ready immediately.
600                        Box::pin(futures::future::ready((imp, Ok(()))))
601                    } else {
602                        // There's nothing more to do; we can return.
603                        self.state = Some(DataWriterState::Ready(imp));
604                        return Poll::Ready(Ok(()));
605                    }
606                } else {
607                    // We need to flush the buffer's contents; Make a future for that.
608                    Box::pin(imp.flush_buf())
609                }
610            }
611            DataWriterState::Flushing(fut) => fut,
612            DataWriterState::Closed => {
613                self.state = Some(DataWriterState::Closed);
614                return Poll::Ready(Err(Error::NotConnected.into()));
615            }
616        };
617
618        match future.as_mut().poll(cx) {
619            Poll::Ready((_imp, Err(e))) => {
620                self.state = Some(DataWriterState::Closed);
621                Poll::Ready(Err(e.into()))
622            }
623            Poll::Ready((mut imp, Ok(()))) => {
624                if should_close {
625                    // Tell the StreamTarget to close, so that the reactor
626                    // realizes that we are done sending. (Dropping `imp.s` does not
627                    // suffice, since there may be other clones of it.  In particular,
628                    // the StreamReader has one, which it uses to keep the stream
629                    // open, among other things.)
630                    imp.s.close();
631
632                    #[cfg(feature = "stream-ctrl")]
633                    {
634                        // TODO RPC:  This is not sufficient to track every case
635                        // where we might have sent an End.  See note on the
636                        // `sent_end` field.
637                        imp.status.lock().expect("lock poisoned").sent_end = true;
638                    }
639                    self.state = Some(DataWriterState::Closed);
640                } else {
641                    self.state = Some(DataWriterState::Ready(imp));
642                }
643                Poll::Ready(Ok(()))
644            }
645            Poll::Pending => {
646                self.state = Some(DataWriterState::Flushing(future));
647                Poll::Pending
648            }
649        }
650    }
651}
652
653impl AsyncWrite for DataWriter {
654    fn poll_write(
655        mut self: Pin<&mut Self>,
656        cx: &mut Context<'_>,
657        buf: &[u8],
658    ) -> Poll<IoResult<usize>> {
659        if buf.is_empty() {
660            return Poll::Ready(Ok(0));
661        }
662
663        let state = self.state.take().expect("Missing state in DataWriter");
664
665        let mut future = match state {
666            DataWriterState::Ready(mut imp) => {
667                let n_queued = imp.queue_bytes(buf);
668                if n_queued != 0 {
669                    self.state = Some(DataWriterState::Ready(imp));
670                    return Poll::Ready(Ok(n_queued));
671                }
672                // we couldn't queue anything, so the current cell must be full.
673                Box::pin(imp.flush_buf())
674            }
675            DataWriterState::Flushing(fut) => fut,
676            DataWriterState::Closed => {
677                self.state = Some(DataWriterState::Closed);
678                return Poll::Ready(Err(Error::NotConnected.into()));
679            }
680        };
681
682        match future.as_mut().poll(cx) {
683            Poll::Ready((_imp, Err(e))) => {
684                #[cfg(feature = "stream-ctrl")]
685                {
686                    _imp.status.lock().expect("lock poisoned").record_error(&e);
687                }
688                self.state = Some(DataWriterState::Closed);
689                Poll::Ready(Err(e.into()))
690            }
691            Poll::Ready((mut imp, Ok(()))) => {
692                // Great!  We're done flushing.  Queue as much as we can of this
693                // cell.
694                let n_queued = imp.queue_bytes(buf);
695                self.state = Some(DataWriterState::Ready(imp));
696                Poll::Ready(Ok(n_queued))
697            }
698            Poll::Pending => {
699                self.state = Some(DataWriterState::Flushing(future));
700                Poll::Pending
701            }
702        }
703    }
704
705    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
706        self.poll_flush_impl(cx, false)
707    }
708
709    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
710        self.poll_flush_impl(cx, true)
711    }
712}
713
#[cfg(feature = "tokio")]
impl TokioAsyncWrite for DataWriter {
    // Each method goes through a `Compat` adapter, so tokio callers reach the
    // futures-io implementation above.

    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
        let mut adapter = self.compat_write();
        TokioAsyncWrite::poll_write(Pin::new(&mut adapter), cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let mut adapter = self.compat_write();
        TokioAsyncWrite::poll_flush(Pin::new(&mut adapter), cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let mut adapter = self.compat_write();
        TokioAsyncWrite::poll_shutdown(Pin::new(&mut adapter), cx)
    }
}
728
729impl DataWriterImpl {
730    /// Try to flush the current buffer contents as a data cell.
731    async fn flush_buf(mut self) -> (Self, Result<()>) {
732        let result = if let Some((cell, remainder)) =
733            Data::try_split_from(self.relay_cell_format, &self.buf[..self.n_pending])
734        {
735            // TODO: Eventually we may want a larger buffer; if we do,
736            // this invariant will become false.
737            assert!(remainder.is_empty());
738            self.n_pending = 0;
739            self.s.send(cell.into()).await
740        } else {
741            Ok(())
742        };
743
744        (self, result)
745    }
746
747    /// Add as many bytes as possible from `b` to our internal buffer;
748    /// return the number we were able to add.
749    fn queue_bytes(&mut self, b: &[u8]) -> usize {
750        let empty_space = &mut self.buf[self.n_pending..];
751        if empty_space.is_empty() {
752            // that is, len == 0
753            return 0;
754        }
755
756        let n_to_copy = std::cmp::min(b.len(), empty_space.len());
757        empty_space[..n_to_copy].copy_from_slice(&b[..n_to_copy]);
758        self.n_pending += n_to_copy;
759        n_to_copy
760    }
761}
762
763impl DataReader {
764    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
765    /// interact with this stream without holding the stream itself.
766    #[cfg(feature = "stream-ctrl")]
767    pub fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
768        Some(&self.ctrl)
769    }
770}
771
772/// An enumeration for the state of a DataReader.
773///
774/// We have to use an enum here because, when we're waiting for
775/// ReadingCell to complete, the future returned by `read_cell()` owns the
776/// DataCellImpl.  If we wanted to store the future and the cell at the
777/// same time, we'd need to make a self-referential structure, which isn't
778/// possible in safe Rust AIUI.
779#[derive(Educe)]
780#[educe(Debug)]
781enum DataReaderState {
782    /// In this state we have received an end cell or an error.
783    Closed,
784    /// In this state the reader is not currently fetching a cell; it
785    /// either has data or not.
786    Ready(DataReaderImpl),
787    /// The reader is currently fetching a cell: this future is the
788    /// progress it is making.
789    ReadingCell(
790        #[educe(Debug(method = "skip_fmt"))] //
791        BoxSyncFuture<'static, (DataReaderImpl, Result<()>)>,
792    ),
793}
794
/// Wrapper for the read part of a DataStream
///
/// Received data that the caller has not consumed yet lives in `pending`;
/// `offset` marks how much of it has already been handed out.
#[derive(Educe)]
#[educe(Debug)]
struct DataReaderImpl {
    /// The underlying StreamReader object.
    #[educe(Debug(method = "skip_fmt"))]
    s: StreamReader,

    /// If present, data that we received on this stream but have not
    /// been able to send to the caller yet.
    // TODO: This data structure is probably not what we want, but
    // it's good enough for now.
    #[educe(Debug(method = "skip_fmt"))]
    pending: Vec<u8>,

    /// Index into pending to show what we've already read.
    offset: usize,

    /// If true, we have received a CONNECTED cell on this stream, or the
    /// stream was created already connected (`DataStream::new_connected`
    /// passes `connected = true`), so no CONNECTED cell is expected.
    connected: bool,

    /// Shared user-visible information about the state of this stream.
    #[cfg(feature = "stream-ctrl")]
    status: Arc<Mutex<DataStreamStatus>>,
}
820
821impl AsyncRead for DataReader {
822    fn poll_read(
823        mut self: Pin<&mut Self>,
824        cx: &mut Context<'_>,
825        buf: &mut [u8],
826    ) -> Poll<IoResult<usize>> {
827        // We're pulling the state object out of the reader.  We MUST
828        // put it back before this function returns.
829        let mut state = self.state.take().expect("Missing state in DataReader");
830
831        loop {
832            let mut future = match state {
833                DataReaderState::Ready(mut imp) => {
834                    // There may be data to read already.
835                    let n_copied = imp.extract_bytes(buf);
836                    if n_copied != 0 {
837                        // We read data into the buffer.  Tell the caller.
838                        self.state = Some(DataReaderState::Ready(imp));
839                        return Poll::Ready(Ok(n_copied));
840                    }
841
842                    // No data available!  We have to launch a read.
843                    Box::pin(imp.read_cell())
844                }
845                DataReaderState::ReadingCell(fut) => fut,
846                DataReaderState::Closed => {
847                    self.state = Some(DataReaderState::Closed);
848                    return Poll::Ready(Err(Error::NotConnected.into()));
849                }
850            };
851
852            // We have a future that represents an in-progress read.
853            // See if it can make progress.
854            match future.as_mut().poll(cx) {
855                Poll::Ready((_imp, Err(e))) => {
856                    // There aren't any survivable errors in the current
857                    // design.
858                    self.state = Some(DataReaderState::Closed);
859                    #[cfg(feature = "stream-ctrl")]
860                    {
861                        _imp.status.lock().expect("lock poisoned").record_error(&e);
862                    }
863                    let result = if matches!(e, Error::EndReceived(EndReason::DONE)) {
864                        Ok(0)
865                    } else {
866                        Err(e.into())
867                    };
868                    return Poll::Ready(result);
869                }
870                Poll::Ready((imp, Ok(()))) => {
871                    // It read a cell!  Continue the loop.
872                    state = DataReaderState::Ready(imp);
873                }
874                Poll::Pending => {
875                    // The future is pending; store it and tell the
876                    // caller to get back to us later.
877                    self.state = Some(DataReaderState::ReadingCell(future));
878                    return Poll::Pending;
879                }
880            }
881        }
882    }
883}
884
#[cfg(feature = "tokio")]
impl TokioAsyncRead for DataReader {
    /// Tokio-flavoured read: delegates to the `futures::io::AsyncRead`
    /// implementation above by way of tokio-util's compatibility adapter.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<IoResult<()>> {
        // Wrap the pinned reference so that tokio's `ReadBuf` interface is
        // translated into a plain byte-slice read on our side.
        let mut adapter = self.compat();
        TokioAsyncRead::poll_read(Pin::new(&mut adapter), cx, buf)
    }
}
895
896impl DataReaderImpl {
897    /// Pull as many bytes as we can off of self.pending, and return that
898    /// number of bytes.
899    fn extract_bytes(&mut self, buf: &mut [u8]) -> usize {
900        let remainder = &self.pending[self.offset..];
901        let n_to_copy = std::cmp::min(buf.len(), remainder.len());
902        buf[..n_to_copy].copy_from_slice(&remainder[..n_to_copy]);
903        self.offset += n_to_copy;
904
905        n_to_copy
906    }
907
908    /// Return true iff there are no buffered bytes here to yield
909    fn buf_is_empty(&self) -> bool {
910        self.pending.len() == self.offset
911    }
912
913    /// Load self.pending with the contents of a new data cell.
914    ///
915    /// This function takes ownership of self so that we can avoid
916    /// self-referential lifetimes.
917    async fn read_cell(mut self) -> (Self, Result<()>) {
918        use DataStreamMsg::*;
919        let msg = match self.s.recv().await {
920            Ok(unparsed) => match unparsed.decode::<DataStreamMsg>() {
921                Ok(cell) => cell.into_msg(),
922                Err(e) => {
923                    self.s.protocol_error();
924                    return (
925                        self,
926                        Err(Error::from_bytes_err(e, "message on a data stream")),
927                    );
928                }
929            },
930            Err(e) => return (self, Err(e)),
931        };
932
933        let result = match msg {
934            Connected(_) if !self.connected => {
935                self.connected = true;
936                #[cfg(feature = "stream-ctrl")]
937                {
938                    self.status
939                        .lock()
940                        .expect("poisoned lock")
941                        .record_connected();
942                }
943                Ok(())
944            }
945            Connected(_) => {
946                self.s.protocol_error();
947                Err(Error::StreamProto(
948                    "Received a second connect cell on a data stream".to_string(),
949                ))
950            }
951            Data(d) if self.connected => {
952                self.add_data(d.into());
953                Ok(())
954            }
955            Data(_) => {
956                self.s.protocol_error();
957                Err(Error::StreamProto(
958                    "Received a data cell an unconnected stream".to_string(),
959                ))
960            }
961            End(e) => Err(Error::EndReceived(e.reason())),
962        };
963
964        (self, result)
965    }
966
967    /// Add the data from `d` to the end of our pending bytes.
968    fn add_data(&mut self, mut d: Vec<u8>) {
969        if self.buf_is_empty() {
970            // No data pending?  Just take d as the new pending.
971            self.pending = d;
972            self.offset = 0;
973        } else {
974            // TODO(nickm) This has potential to grow `pending` without bound.
975            // Fortunately, we don't currently read cells or call this
976            // `add_data` method when pending is nonempty—but if we do in the
977            // future, we'll have to be careful here.
978            self.pending.append(&mut d);
979        }
980    }
981}
982
/// A `CmdChecker` that enforces invariants for outbound data streams.
///
/// It remembers whether the initial CONNECTED message is still outstanding,
/// so that DATA before CONNECTED, or a second CONNECTED, can be rejected.
#[derive(Debug)]
pub(crate) struct DataCmdChecker {
    /// True while we are still waiting for the CONNECTED message on this stream.
    expecting_connected: bool,
}

impl Default for DataCmdChecker {
    /// A checker for a freshly opened stream: CONNECTED has not arrived yet.
    fn default() -> Self {
        DataCmdChecker {
            expecting_connected: true,
        }
    }
}
997
998impl super::CmdChecker for DataCmdChecker {
999    fn check_msg(
1000        &mut self,
1001        msg: &tor_cell::relaycell::UnparsedRelayMsg,
1002    ) -> Result<super::StreamStatus> {
1003        use super::StreamStatus::*;
1004        match msg.cmd() {
1005            RelayCmd::CONNECTED => {
1006                if !self.expecting_connected {
1007                    Err(Error::StreamProto(
1008                        "Received CONNECTED twice on a stream.".into(),
1009                    ))
1010                } else {
1011                    self.expecting_connected = false;
1012                    Ok(Open)
1013                }
1014            }
1015            RelayCmd::DATA => {
1016                if !self.expecting_connected {
1017                    Ok(Open)
1018                } else {
1019                    Err(Error::StreamProto(
1020                        "Received DATA before CONNECTED on a stream".into(),
1021                    ))
1022                }
1023            }
1024            RelayCmd::END => Ok(Closed),
1025            _ => Err(Error::StreamProto(format!(
1026                "Unexpected {} on a data stream!",
1027                msg.cmd()
1028            ))),
1029        }
1030    }
1031
1032    fn consume_checked_msg(&mut self, msg: tor_cell::relaycell::UnparsedRelayMsg) -> Result<()> {
1033        let _ = msg
1034            .decode::<DataStreamMsg>()
1035            .map_err(|err| Error::from_bytes_err(err, "cell on half-closed stream"))?;
1036        Ok(())
1037    }
1038}
1039
1040impl DataCmdChecker {
1041    /// Return a new boxed `DataCmdChecker` in a state suitable for a newly
1042    /// constructed connection.
1043    pub(crate) fn new_any() -> AnyCmdChecker {
1044        Box::<Self>::default()
1045    }
1046
1047    /// Return a new boxed `DataCmdChecker` in a state suitable for a
1048    /// connection where an initial CONNECTED cell is not expected.
1049    ///
1050    /// This is used by hidden services, exit relays, and directory servers
1051    /// to accept streams.
1052    #[cfg(feature = "hs-service")]
1053    pub(crate) fn new_connected() -> AnyCmdChecker {
1054        Box::new(Self {
1055            expecting_connected: false,
1056        })
1057    }
1058}