Module mq_queue

Queues that participate in the memory quota system

Wraps a communication channel, such as futures::channel::mpsc, tracks the memory use of the queue, and participates in the memory quota system.

Each item in the queue must know its memory cost, and provide it via HasMemoryCost.
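As a rough illustration of what "knowing its memory cost" can mean, the sketch below estimates an item's cost as its inline size plus the heap buffer it owns. The `CostSketch` trait and `total_cost` helper are hypothetical stand-ins so the snippet needs no external crates; they are not part of tor_memquota, and the real `HasMemoryCost::memory_cost` also takes an `EnabledToken` argument.

```rust
use std::mem::size_of;

/// Hypothetical stand-in for `HasMemoryCost` (assumption: std-only sketch).
trait CostSketch {
    fn cost(&self) -> usize;
}

struct Message(String);

impl CostSketch for Message {
    fn cost(&self) -> usize {
        // Inline part of the struct plus the heap buffer the String owns.
        size_of::<Self>() + self.0.capacity()
    }
}

/// Hypothetical helper: sum the cost of a batch of items.
fn total_cost<T: CostSketch>(items: &[T]) -> usize {
    items.iter().map(|item| item.cost()).sum()
}
```

How precise the estimate needs to be is a design choice; the point is only that each item reports a number the queue can add when the item is inserted and subtract when it is removed.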

New queues are created by calling the new_mq method on a ChannelSpec, for example MpscSpec or MpscUnboundedSpec.

The ends implement Stream and Sink. If the underlying channel’s sender is Clone, for example with an MPSC queue, the returned sender is also Clone.

Note that the Sender and Receiver only hold weak references to the Account. I.e., the queue is not the account holder. The caller should keep a separate copy of the account.
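The consequence of holding only a weak reference can be modelled with the standard library's `Arc` and `Weak`. This is a minimal sketch of the ownership pattern, not of tor_memquota's internals: `AccountSketch`, `EndSketch`, `charge`, and `weak_account_demo` are all hypothetical names, and what the real queue does once its account is gone is not shown here.

```rust
use std::sync::{Arc, Mutex, Weak};

/// Hypothetical stand-in for an account: just a byte counter.
struct AccountSketch {
    used: Mutex<usize>,
}

/// Hypothetical queue end that refers to the account weakly.
struct EndSketch {
    account: Weak<AccountSketch>,
}

impl EndSketch {
    /// Try to charge `bytes`; returns false if the account no longer exists.
    fn charge(&self, bytes: usize) -> bool {
        match self.account.upgrade() {
            Some(acct) => {
                *acct.used.lock().unwrap() += bytes;
                true
            }
            None => false,
        }
    }
}

/// Charge once while the caller holds the account, once after dropping it.
fn weak_account_demo() -> (bool, bool) {
    let account = Arc::new(AccountSketch { used: Mutex::new(0) });
    let end = EndSketch { account: Arc::downgrade(&account) };
    let while_alive = end.charge(100);
    drop(account); // the caller was the only strong owner
    let after_drop = end.charge(100);
    (while_alive, after_drop)
}
```

In this model the weak reference stops resolving as soon as the caller drops its strong handle, which is why the caller, not the queue, has to keep the account alive.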

§Example

use tor_memquota::{MemoryQuotaTracker, HasMemoryCost, EnabledToken};
use tor_rtcompat::{DynTimeProvider, PreferredRuntime};
use tor_memquota::mq_queue::{MpscSpec, ChannelSpec as _};

#[derive(Debug)]
struct Message(String);
impl HasMemoryCost for Message {
    fn memory_cost(&self, _: EnabledToken) -> usize { self.0.len() }
}

let runtime = PreferredRuntime::create().unwrap();
let time_prov = DynTimeProvider::new(runtime.clone());
let config = tor_memquota::Config::builder().max(1024*1024*1024).build().unwrap();
let trk = MemoryQuotaTracker::new(&runtime, config).unwrap();
let account = trk.new_account(None).unwrap();

let (tx, rx) = MpscSpec { buffer: 10 }.new_mq::<Message>(time_prov, &account).unwrap();

§Caveat

The memory use tracking is based on external observations, i.e., items inserted and removed.

How well this reflects the actual memory use of the channel depends on the channel’s implementation.

For example, if the channel uses a single contiguous buffer containing the unboxed items, and that buffer doesn’t shrink, then the tracked figure can underestimate the memory actually held. (This is significantly mitigated if the bulk of the memory use for each item is separately boxed.)
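The effect described above can be reproduced with a std-only model. The sketch assumes a hypothetical `TrackedQueue` backed by a `VecDeque`, which keeps its allocation when items are popped; it tracks cost purely from pushes and pops, the way an external observer would. None of these names come from tor_memquota.

```rust
use std::collections::VecDeque;
use std::mem::size_of;

/// Hypothetical queue that tracks cost only by observing pushes and pops.
struct TrackedQueue<T> {
    items: VecDeque<T>,
    tracked_bytes: usize,
}

impl<T> TrackedQueue<T> {
    fn new() -> Self {
        TrackedQueue { items: VecDeque::new(), tracked_bytes: 0 }
    }

    fn push(&mut self, item: T) {
        self.tracked_bytes += size_of::<T>();
        self.items.push_back(item);
    }

    fn pop(&mut self) -> Option<T> {
        let item = self.items.pop_front()?;
        self.tracked_bytes -= size_of::<T>();
        Some(item)
    }

    /// Bytes the contiguous buffer currently reserves, used or not.
    fn buffer_bytes(&self) -> usize {
        self.items.capacity() * size_of::<T>()
    }
}

/// Fill the queue with unboxed 256-byte items, drain it, and report
/// (tracked bytes, bytes still reserved by the buffer).
fn drained_queue_stats() -> (usize, usize) {
    let mut q = TrackedQueue::<[u8; 256]>::new();
    for _ in 0..1000 {
        q.push([0u8; 256]);
    }
    while q.pop().is_some() {}
    (q.tracked_bytes, q.buffer_bytes())
}
```

After draining, the tracked figure is zero while the buffer still reserves room for at least 1000 items of 256 bytes each. If the items were `Box<[u8; 256]>` instead, the retained buffer would hold only pointer-sized slots, which is the mitigation the parenthetical refers to.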

Structs§

CollapsedDueToReclaim 🔒
Marker, appears in state as Err to mean “we have collapsed”
Entry 🔒
Entry in the inner queue
MpscSpec
Specification for a (bounded) MPSC channel
MpscUnboundedSpec
Specification for an unbounded MPSC channel
Receiver
Receiver for a channel that participates in the memory quota system
ReceiverInner 🔒
Payload of Receiver, that’s within the Arc, but contains the Mutex.
ReceiverState 🔒
Mutable state of a Receiver
Sender
Sender for a channel that participates in the memory quota system

Enums§

CollapseReason
Argument to CollapseCallback: why are we collapsing?
SendError
Error returned when trying to write to a Sender

Traits§

ChannelSpec
Specification for a communication channel

Functions§

receiver_state_debug_collapse_notify 🔒
Method for educe’s Debug impl for ReceiverState.collapse_callbacks

Type Aliases§

CollapseCallback
Callback passed to Receiver::register_collapse_hook