StreamPollSet

Struct StreamPollSet 

pub struct StreamPollSet<K, P, S>
where
    S: PeekableStream + Unpin,
{
    priorities: HashMap<K, (P, InTunnelActivity)>,
    ready_streams: BTreeMap<(P, K), S>,
    pending_streams: KeyedFuturesUnordered<K, PeekableReady<S>>,
    tunnel_activity: TunnelActivity,
}

Manages a dynamic set of futures::Stream values, each with an associated key and priority.

Notable features:

  • Prioritization: each stream has an associated priority, and ready streams are iterated over in ascending priority order.
  • Efficient polling: an unready stream won’t be polled again until it’s ready or exhausted (e.g. when a corresponding futures::Sink is written to or dropped). A ready stream won’t be polled again until the ready item has been removed.

Fields

priorities: HashMap<K, (P, InTunnelActivity)>

Priority for each stream in the set, and associated InTunnelActivity token.

ready_streams: BTreeMap<(P, K), S>

Streams that have a result ready, in ascending order by priority.

pending_streams: KeyedFuturesUnordered<K, PeekableReady<S>>

Streams for which we’re still waiting for the next result.

tunnel_activity: TunnelActivity

Information about how active this particular hop has been, with respect to tracking overall tunnel activity.
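The field layout suggests how the priority ordering can fall out of the data structure itself: a BTreeMap keyed by (priority, key) iterates in ascending priority order, with ties broken by key, while a separate map from key to priority makes it possible to find a stream's entry. The following is a minimal std-only sketch of that idea. ReadySet and its methods are hypothetical names chosen for illustration, and the concrete u32/u8 types are assumptions; this is not the crate's implementation.

```rust
use std::collections::{BTreeMap, HashMap};

/// Toy model of the two-map layout: `priorities` finds a key's priority,
/// and `ready` orders entries by `(priority, key)`.
/// `ReadySet` is a hypothetical type used only for illustration.
struct ReadySet<V> {
    priorities: HashMap<u32, u8>,
    ready: BTreeMap<(u8, u32), V>,
}

impl<V> ReadySet<V> {
    fn new() -> Self {
        Self {
            priorities: HashMap::new(),
            ready: BTreeMap::new(),
        }
    }

    /// Record `value` as ready for `key` at the given priority.
    fn insert(&mut self, key: u32, priority: u8, value: V) {
        // If the key was already present, drop its old ordered entry first
        // so that the two maps stay in sync.
        if let Some(old) = self.priorities.insert(key, priority) {
            self.ready.remove(&(old, key));
        }
        self.ready.insert((priority, key), value);
    }

    /// Keys in ascending priority order (ties broken by key).
    fn keys_in_priority_order(&self) -> Vec<u32> {
        self.ready.keys().map(|(_prio, key)| *key).collect()
    }
}

/// Insert four entries out of order and report the iteration order.
fn demo_order() -> Vec<u32> {
    let mut set = ReadySet::new();
    set.insert(10, 3, "c");
    set.insert(20, 1, "a");
    set.insert(30, 2, "b");
    set.insert(40, 1, "a2");
    set.keys_in_priority_order()
}
```

Calling demo_order() yields the keys 20, 40, 30, 10: the two priority-1 entries come first (ordered by key), then priority 2, then priority 3.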

Implementations

impl<K, P, S> StreamPollSet<K, P, S>
where
    K: Ord + Hash + Clone + Send + Sync + 'static,
    S: PeekableStream + Unpin,
    P: Ord + Clone,

pub fn new() -> Self

Create a new, empty, StreamPollSet.

pub fn try_insert(
    &mut self,
    key: K,
    priority: P,
    stream: S,
) -> Result<(), KeyAlreadyInsertedError<K, P, S>>

Insert a stream, with an associated key and priority.

If the key is already in use, the parameters are returned without altering self.
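Returning the arguments on failure matters when the stream is not cheaply cloneable: the caller gets it back instead of losing it. Below is a minimal std-only sketch of that pattern over a plain HashMap. KeyTaken and the free function try_insert are hypothetical stand-ins for illustration, not the crate's KeyAlreadyInsertedError or its method.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical error type that hands the rejected arguments back.
#[derive(Debug, PartialEq)]
struct KeyTaken<K, P, S> {
    key: K,
    priority: P,
    stream: S,
}

/// Insert only if `key` is unused; otherwise return everything unchanged.
fn try_insert<K, P, S>(
    map: &mut HashMap<K, (P, S)>,
    key: K,
    priority: P,
    stream: S,
) -> Result<(), KeyTaken<K, P, S>>
where
    K: Hash + Eq,
{
    if map.contains_key(&key) {
        // Nothing in `map` has been touched at this point.
        return Err(KeyTaken { key, priority, stream });
    }
    map.insert(key, (priority, stream));
    Ok(())
}

/// Insert twice under the same key and report what happened.
fn demo_try_insert() -> (bool, Option<String>, usize) {
    let mut map = HashMap::new();
    let first = try_insert(&mut map, 1u32, 0u8, String::from("first"));
    // The second call reuses key 1, so its String comes back in the error.
    let second = try_insert(&mut map, 1u32, 5u8, String::from("second"));
    let returned = second.err().map(|e| e.stream);
    (first.is_ok(), returned, map.len())
}
```

Here demo_try_insert() returns (true, Some("second"), 1): the first insert succeeds, the second hands its stream back, and the map still holds a single entry.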

pub fn remove(&mut self, key: &K) -> Option<(K, P, S)>

Remove the entry for key, if any, returning its key, priority, and stream.

pub fn poll_ready_iter_mut<'a>(
    &'a mut self,
    cx: &mut Context<'_>,
) -> impl Iterator<Item = (&'a K, &'a P, &'a mut S)> + 'a + use<'a, K, P, S>

Polls streams that are ready to be polled, and returns an iterator over all streams for which we have a buffered Poll::Ready result, in ascending priority order.

Registers the provided Context to be woken when any internal stream that was not ready in the previous call to this method (and therefore did not appear in that call’s iterator results) becomes potentially ready, based on when the inner stream wakes the Context provided to its own poll_next.

The same restrictions apply as for Self::stream_mut. For example, do not call PeekableStream::poll_peek directly to see what item is available on the stream; use Self::peek_mut instead (or tor_async_utils::peekable_stream::UnobtrusivePeekableStream, if the stream implements it).

This method does not drain ready items. A ready Some(value) can be removed with Self::take_ready_value_and_reprioritize. A ready None (end of stream) can only be removed by removing the whole stream with Self::remove.

This API is meant to let callers find the first stream, in priority order, that is ready and that the caller is able to process now. That is, it specifically supports the use case where external factors may prevent the processing of some streams but not others.

Example:

// We need the `nocompile` since `StreamPollSet` is non-pub.
// TODO: take away the nocompile if we make this pub or implement some
// workaround to expose it to doc-tests.
use std::pin::Pin;
use std::task::{Context, Poll};
type Key = u64;
type Value = u64;
type Priority = u64;
// Placeholder stream type: for this example to type-check, it must also
// implement PeekableStream and UnobtrusivePeekableStream.
type MyStream = Box<dyn futures::Stream<Item = Value> + Unpin>;
fn can_process(_key: &Key, _val: &Value) -> bool { true }
fn process(_val: Value) {}
fn new_priority(priority: &Priority) -> Priority { *priority }

fn process_a_ready_stream(
  sps: &mut StreamPollSet<Key, Priority, MyStream>,
  cx: &mut Context<'_>,
) -> Poll<()> {
  let mut iter = sps.poll_ready_iter_mut(cx);
  while let Some((key, priority, stream)) = iter.next() {
    let Some(value) = Pin::new(stream).unobtrusive_peek() else {
      // Stream exhausted. Remove the stream. We have to drop the iterator
      // first, though, so that we can mutate.
      let key = *key;
      drop(iter);
      sps.remove(&key).unwrap();
      return Poll::Ready(());
    };
    if can_process(key, value) {
      // Copy what we need out of the iterator's borrows before dropping it.
      let key = *key;
      let priority = new_priority(priority);
      drop(iter);
      let (_old_priority, value) = sps.take_ready_value_and_reprioritize(&key, priority).unwrap();
      process(value);
      return Poll::Ready(());
    }
  }
  // No ready stream could be processed right now.
  Poll::Pending
}

pub fn take_ready_value_and_reprioritize(
    &mut self,
    key: &K,
    new_priority: P,
) -> Option<(P, S::Item)>

If the stream for key has Some(value) ready, take that value and set the stream’s priority to new_priority.

This method doesn’t register a waker with the polled stream. Use Self::poll_ready_iter_mut to ensure streams make progress.

If the key doesn’t exist, the stream isn’t ready, or the stream’s value is None (indicating the end of the stream), this function returns None without mutating anything.

Ended streams should be removed using Self::remove.
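One way to picture the reprioritization step: when entries are ordered by a (priority, key) map key, changing a stream's priority amounts to removing its entry and reinserting it under the new key. The std-only sketch below models each stream as a queue of already-buffered items and uses an increasing counter as the new priority, so that a stream which was just served moves to the back. Model, its methods, and the counter scheme are assumptions for illustration. As a simplification, the model keeps a stream in the ordered map after its value is taken, whereas the real type tracks streams without a ready result separately in pending_streams.

```rust
use std::collections::{BTreeMap, HashMap, VecDeque};

/// Toy stand-in: each "stream" is a queue of already-buffered items.
/// `Model` is a hypothetical type, not the crate's implementation.
struct Model {
    priorities: HashMap<&'static str, u64>,
    ready: BTreeMap<(u64, &'static str), VecDeque<u32>>,
}

impl Model {
    /// Take the front item for `key` and move the stream to `new_priority`.
    /// Returns `None`, leaving everything untouched, if the key is unknown
    /// or the queue has no item.
    fn take_and_reprioritize(
        &mut self,
        key: &'static str,
        new_priority: u64,
    ) -> Option<(u64, u32)> {
        let old = *self.priorities.get(key)?;
        // Check that a value exists before mutating anything.
        self.ready.get(&(old, key))?.front()?;
        let mut queue = self.ready.remove(&(old, key))?;
        let value = queue.pop_front()?;
        // Reinsert under the new (priority, key) so the ordering updates.
        self.priorities.insert(key, new_priority);
        self.ready.insert((new_priority, key), queue);
        Some((old, value))
    }

    /// Key of the first entry in priority order.
    fn first(&self) -> Option<&'static str> {
        self.ready.keys().next().map(|(_p, k)| *k)
    }
}

/// Serve four items, always from the first stream in priority order.
fn demo_round_robin() -> Vec<(&'static str, u32)> {
    let mut m = Model { priorities: HashMap::new(), ready: BTreeMap::new() };
    for (prio, key, items) in [(0, "a", vec![1, 2]), (1, "b", vec![10, 20])] {
        m.priorities.insert(key, prio);
        m.ready.insert((prio, key), VecDeque::from(items));
    }
    let mut next_priority = 2;
    let mut served = Vec::new();
    for _ in 0..4 {
        let key = m.first().unwrap();
        let (_old, value) = m.take_and_reprioritize(key, next_priority).unwrap();
        served.push((key, value));
        next_priority += 1;
    }
    served
}
```

With these inputs, demo_round_robin() alternates between the two streams: ("a", 1), ("b", 10), ("a", 2), ("b", 20).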

pub fn peek_mut<'a>(&'a mut self, key: &K) -> Option<Poll<Option<&'a mut S::Item>>>

Get a mut reference to a ready value for key, if one exists.

This method doesn’t poll the internal streams. Use Self::poll_ready_iter_mut to ensure streams make progress.

pub fn stream(&self, key: &K) -> Option<&S>

Get a reference to the stream for key.

The same restrictions apply as for Self::stream_mut (e.g. when the stream is accessed through interior mutability).

pub fn stream_mut(&mut self, key: &K) -> Option<&mut S>

Get a mut reference to the stream for key.

Polling the stream through this reference, or otherwise causing its registered Waker to be removed without waking it, will result in unspecified (but not unsound) behavior.

This is mostly intended for accessing non-Stream functionality of the stream object, though it is permitted to mutate it in a way that the stream becomes ready (potentially removing and waking its registered Waker(s)).
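To illustrate one way that polling through this reference could go wrong, consider a source that remembers only the most recently registered waker, as many stream implementations do. If a second party polls it with a different Context, the first waker is replaced without being woken, and the original owner never learns that an item arrived. The sketch below uses only std; Slot, CountingWaker, and demo_lost_wakeup are hypothetical names for illustration and do not describe how StreamPollSet or any particular stream is implemented.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Counts how many times it has been woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Hypothetical single-item source that keeps only the latest waker.
struct Slot {
    value: Option<u32>,
    waker: Option<Waker>,
}

impl Slot {
    fn poll_take(&mut self, cx: &mut Context<'_>) -> Poll<u32> {
        match self.value.take() {
            Some(v) => Poll::Ready(v),
            None => {
                // Replaces any previously registered waker without waking it.
                self.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }

    fn put(&mut self, v: u32) {
        self.value = Some(v);
        if let Some(w) = self.waker.take() {
            w.wake();
        }
    }
}

/// Returns (wake count of the owner, wake count of the second poller).
fn demo_lost_wakeup() -> (usize, usize) {
    let owner = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let intruder = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let owner_waker = Waker::from(owner.clone());
    let intruder_waker = Waker::from(intruder.clone());

    let mut slot = Slot { value: None, waker: None };
    // The owning poll set polls first and registers its waker.
    assert!(slot.poll_take(&mut Context::from_waker(&owner_waker)).is_pending());
    // A second poll through a `&mut` reference displaces that waker.
    assert!(slot.poll_take(&mut Context::from_waker(&intruder_waker)).is_pending());
    // Only the most recently registered waker is notified.
    slot.put(7);
    (owner.0.load(Ordering::SeqCst), intruder.0.load(Ordering::SeqCst))
}
```

In this model demo_lost_wakeup() returns (0, 1): the owner's waker is never invoked, which is the kind of missed wakeup the restriction above is meant to avoid.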

pub fn len(&self) -> usize

Number of streams managed by this object.

pub fn tunnel_activity(&self) -> TunnelActivity

Return a TunnelActivity for this hop.

Trait Implementations

impl<K, P, S> Drop for StreamPollSet<K, P, S>
where
    S: PeekableStream + Unpin,

fn drop(&mut self)

Executes the destructor for this type.

Auto Trait Implementations

impl<K, P, S> Freeze for StreamPollSet<K, P, S>

impl<K, P, S> !RefUnwindSafe for StreamPollSet<K, P, S>

impl<K, P, S> Send for StreamPollSet<K, P, S>
where
    S: Send,
    K: Send,
    P: Send,

impl<K, P, S> Sync for StreamPollSet<K, P, S>
where
    S: Sync,
    K: Sync + Send,
    P: Sync,

impl<K, P, S> Unpin for StreamPollSet<K, P, S>
where
    K: Unpin,
    P: Unpin,

impl<K, P, S> !UnwindSafe for StreamPollSet<K, P, S>
