tor_memquota/mtracker/
reclaim.rs

//! Reclamation algorithm
//!
//! Implementation of the long-running [`task`] function
//! (which is the only export from here to the wider `mtracker` module).
5
6use super::*;
7
8mod deferred_drop;
9
10use deferred_drop::{DeferredDrop, GuardWithDeferredDrop};
11
/// Total number of participants
///
/// Used in reporting and in calculations of various edge cases;
/// the value is obtained by summing per-participant-record refcounts.
///
/// On 64-bit systems this is wider than the refcounts themselves, which are all `u32`.
type NumParticips = usize;
17
18//========== candidate victim analysis ==========
19
/// The nominal data age of a participant
///
/// The order of the variants is significant:
/// the derived `Ord` sorts `TreatAsVeryOld` before every `Actual` value,
/// so such participants leave the candidate heap
/// (which holds `Reverse<(Age, AId)>`) ahead of any participant with a real age.
#[derive(Ord, PartialOrd, Eq, PartialEq)]
enum Age {
    /// Treat this participant as having very old data
    ///
    /// Used when a participant claims to have no data,
    /// but our own accounting makes that claim implausible.
    TreatAsVeryOld,
    /// Data age value from the [`IsParticipant`]
    Actual(CoarseInstant),
}
28
/// Participant status, as a candidate victim
///
/// This is the result of [`analyse_particip`].
enum PStatus {
    /// Participant is a candidate victim; treat it as having data of the given [`Age`]
    Candidate(Age),
    /// Tear this participant down right away
    ///
    /// (It has vanished, or its `get_oldest` call panicked.)
    TearDown,
    /// Treat participant as not having any data; don't reclaim from it
    NoData,
}
38
/// Outcome of a completed reclamation run
///
/// This is used only within `Reclaiming::choose_victims`, and only for logging:
/// its `Display` output is the final part of the "reclamation reached" log message.
#[derive(Debug, derive_more::Display)]
enum Outcome {
    /// We reached the low water mark
    #[display("complete")]
    TargetReached,

    /// We didn't, but we have so many participants that that's possibly expected
    ///
    /// (Can only happen on 32-bit platforms.)
    #[display("{} participants, good enough - stopping", n_particips)]
    GoodEnough {
        /// The number of participants
        n_particips: NumParticips,
    },
}
57
58/// Figure out whether a participant is a candidate victim, and obtain its data age
59fn analyse_particip(precord: &PRecord, defer_drop: &mut DeferredDrop) -> PStatus {
60    let Some(particip) = precord.particip.upgrade() else {
61        // Oh!  This participant has vanished!
62        // We can't reclaim from it.  It may already be reclaiming.
63        // Delete it from our data structure.
64        return PStatus::TearDown;
65    };
66
67    let got_oldest = catch_unwind(AssertUnwindSafe(|| particip.get_oldest(precord.enabled)));
68    defer_drop.push(particip);
69
70    match got_oldest {
71        Ok(Some(age)) => return PStatus::Candidate(Age::Actual(age)),
72        Ok(None) => {}
73        Err(_panicked) => {
74            // _panicked is of a useless type
75            error!("bug in memory tracker: call to get_oldest panicked!");
76            return PStatus::TearDown;
77        }
78    }
79
80    // The participant claims not to have any memory
81    // There might be some cached, let's check
82
83    let Some(max_cached) = precord
84        .refcount
85        .as_usize()
86        .checked_mul(MAX_CACHE.as_usize())
87    else {
88        // WTF!  So many Participation clones that the max usage has
89        // overflowed.  (This can only happen on 32-bit platforms
90        // since refcount is a u32.)  Probably we should reclaim
91        // from this participant.
92        log_ratelim!(
93            "memtrack: participant with many clones claims to have no data";
94            Err::<Void, _>(internal!("{} Participation clones", *precord.refcount));
95        );
96        return PStatus::Candidate(Age::TreatAsVeryOld);
97    };
98
99    if precord.used.as_raw() > Qty(max_cached) {
100        // This participant is lying to us somehow.
101        log_ratelim!(
102            "memtrack: participant claims to have no data, but our accounting disagrees";
103            Err::<Void, _>(internal!("{} used (by {} clones)", precord.used, *precord.refcount));
104        );
105        return PStatus::Candidate(Age::TreatAsVeryOld);
106    }
107
108    // Participant plausibly does have no data
109    PStatus::NoData
110}
111
112//========== reclamation algorithm, the main pieces ==========
113
/// State while reclamation is active
///
/// Created by `Reclaiming::maybe_start` and then carried through the
/// choose / notify / handle-responses cycle.
struct Reclaiming {
    /// The heap of candidates, oldest at top of heap
    ///
    /// `BinaryHeap` is a max heap, so entries are wrapped in `Reverse`
    /// to make the smallest `(Age, AId)` pop first.
    /// There is one entry per candidate participant,
    /// so the same `AId` can appear more than once.
    heap: BinaryHeap<Reverse<(Age, AId)>>,

    /// Make this type uninhabited if memory tracking is compiled out
    ///
    /// Also the token that is passed to each victim's `reclaim` call.
    enabled: EnabledToken,
}
122
/// A victim we have selected for reclamation
///
/// This designates a specific Participant:
/// the account it belongs to, and a strong reference to the participant itself.
///
/// But, note that we always reclaim from an Account, so if we are reclaiming
/// from one `Victim`, we may be reclaiming from other `Victim`s with the same
/// `AId` and different `IsParticipant`s.  And because of inheritance, we might
/// be reclaiming from other Accounts too.
type Victim = (AId, drop_reentrancy::ProtectedArc<dyn IsParticipant>);
132
/// Marker indicating that the victim's reclaim function panicked
///
/// Produced by `Reclaiming::notify_victims` in place of the (useless) panic payload;
/// `Reclaiming::handle_victim_responses` treats it the same as `Reclaimed::Collapsing`.
struct VictimPanicked;
135
/// Set of responses from the victims, after they have all finished reclaiming.
///
/// There is one entry per [`Victim`] that was notified:
/// the victim's `AId`, and either what it reported or a note that it panicked.
type VictimResponses = Vec<(AId, Result<Reclaimed, VictimPanicked>)>;
138
139impl Reclaiming {
140    /// Check to see if we should start reclaiming, and if so return a `Reclaiming`
141    ///
142    ///  1. Checks to see if usage is above `max`; if not, returns `None`
143    ///  2. Logs that we're starting reclamation
144    ///  3. Calculates the heap of data ages
145    fn maybe_start(state: &mut GuardWithDeferredDrop) -> Option<Self> {
146        let (state, deferred_drop) = state.deref_mut_both();
147
148        if *state.total_used <= state.global.config.max {
149            return None;
150        }
151
152        info!(
153            "memory tracking: {} > {}, reclamation started (target {})",
154            *state.total_used, state.config.max, state.config.low_water,
155        );
156
157        // `BinaryHeap` is a max heap, so use Rev
158        let mut heap = BinaryHeap::new();
159
160        // Build heap of participants we might want to reclaim from
161        // (and, while we're at it, tear down broken participants)
162        for (aid, arecord) in state.accounts.iter_mut() {
163            arecord.ps.retain(|_pid, precord| {
164                match analyse_particip(precord, deferred_drop) {
165                    PStatus::Candidate(age) => {
166                        heap.push(Reverse((age, aid)));
167                        true // retain
168                    }
169                    PStatus::NoData => {
170                        true // retain
171                    }
172                    PStatus::TearDown => {
173                        precord.auto_release(&mut state.global);
174                        false // remove
175                    }
176                }
177            });
178        }
179
180        Some(Reclaiming {
181            heap,
182            enabled: state.enabled,
183        })
184    }
185
186    /// If we're reclaiming, choose the next victim(s) to reclaim
187    ///
188    /// This is the account whose participant has the oldest data age,
189    /// and all of that account's children.
190    ///
191    /// We might discover that we didn't want to continue reclamation after all:
192    /// this function is responsible for checking our progress
193    /// against the low water mark.
194    ///
195    /// If reclamation should stop, this function logs, and returns `None`.
196    fn choose_victims(&mut self, state: &mut State) -> Result<Option<Vec<Victim>>, ReclaimCrashed> {
197        let stop = |state: &mut State, outcome| {
198            info!(
199                "memory tracking reclamation reached: {} (target {}): {}",
200                *state.total_used, state.config.low_water, outcome,
201            );
202            Ok(None)
203        };
204
205        if *state.total_used <= state.config.low_water {
206            return stop(state, Outcome::TargetReached);
207        }
208        let Some(Reverse((_, oldest_aid))) = self.heap.pop() else {
209            // All our remaining participants are NoData.
210            let n_particips: usize = state
211                .accounts
212                .values()
213                .map(|ar| {
214                    ar.ps
215                        .values()
216                        .map(
217                            |pr| *pr.refcount as NumParticips, // refcount is u32, so this is fine
218                        )
219                        .sum::<NumParticips>()
220                })
221                .sum::<NumParticips>();
222
223            if state
224                .total_used
225                .as_raw()
226                .as_usize()
227                .checked_div(n_particips)
228                .is_some_and(|total_used| total_used < usize::from(MAX_CACHE))
229            {
230                // On 32-bit, this could happen due to the cache, if we have
231                // 2^32 / MAX_CACHE participants.
232                return stop(state, Outcome::GoodEnough { n_particips });
233            }
234
235            // Oh dear.
236            return Err(internal!(
237                "memory accounting state corrupted: used={} n_particips={} all NoData",
238                *state.total_used,
239                n_particips,
240            )
241            .into());
242        };
243
244        // When we do partial reclamation, rather than just Collapsing:
245        //
246        // fudge next_oldest by something to do with number of loop iterations,
247        // to avoid one-allocation-each-time ping pong between multiple caches
248        //
249        // (this match statement will fail to compile when we add a non-Collapsing variant)
250        //
251        // let next_oldest = heap.peek_lowest();
252        match None {
253            None | Some(Reclaimed::Collapsing) => {}
254        }
255
256        let victim_aids = state.get_aid_and_children_recursively(oldest_aid);
257
258        let victims: Vec<Victim> = {
259            let mut particips = vec![];
260            for aid in victim_aids {
261                let Some(arecord) = state.accounts.get_mut(aid) else {
262                    // shouldn't happen but no need to panic
263                    continue;
264                };
265                arecord.ps.retain(|_pid, precord| {
266                    let Some(particip) = precord.particip.upgrade() else {
267                        // tear this down!
268                        precord.auto_release(&mut state.global);
269                        return false;
270                    };
271                    particips.push((aid, particip));
272                    true
273                });
274            }
275            particips
276        };
277
278        Ok(Some(victims))
279    }
280
281    /// Notify the chosen victims and obtain their responses
282    ///
283    /// This is the async part, and is done with the state unlocked.
284    // Doesn't actually need `self`, only `victims`, but we take it for form's sake
285    async fn notify_victims(&mut self, victims: Vec<Victim>) -> VictimResponses {
286        let enabled = self.enabled;
287
288        futures::future::join_all(
289            //
290            victims.into_iter().map(|(aid, particip)| async move {
291                let particip = particip.promise_dropping_is_ok();
292                // We run the `.reclaim()` calls within the same task (since that's what
293                // `join_all` does).  So they all run on whatever executor thread is polling
294                // the reclamation task.
295                let reclaimed = AssertUnwindSafe(particip.reclaim(enabled))
296                    .catch_unwind()
297                    .await
298                    .map_err(|_panicked| VictimPanicked);
299                // We drop the `ProtectedArc<dyn IsParticipant>` here, which is OK
300                // because we don't hold the lock.  Since drop isn't async, and
301                // `join_all` doesn't spawn tasks, we drop them sequentially.
302                (aid, reclaimed)
303            }),
304        )
305        .await
306    }
307
308    /// Process the victim's responses and update `state` accordingly
309    // Doesn't actually need `self`, only `state`, but we take it for form's sake
310    fn handle_victim_responses(&mut self, state: &mut State, responses: VictimResponses) {
311        for (aid, reclaimed) in responses {
312            match reclaimed {
313                Ok(Reclaimed::Collapsing) | Err(VictimPanicked) => {
314                    let Some(mut arecord) = state.accounts.remove(aid) else {
315                        // Account is gone, fair enough
316                        continue;
317                    };
318                    arecord.auto_release(&mut state.global);
319                    // Account is definitely gone now
320                }
321            }
322        }
323    }
324}
325
//========== the reclamation task, in terms of the pieces ==========
327
/// Return value from the task, when it finishes due to the tracker being shut down
///
/// `task_loop` returns this when the tracker can no longer be upgraded
/// from its `Weak` reference, or when the wakeup channel's sender has been dropped.
struct TaskFinished;
330
331/// Reclaim memory until we reach low water, if necessary
332///
333/// Looks to see if we're above `config.max`.
334/// If so, constructs a list of victims, and starts reclaiming from them,
335/// until we reach low water.
336async fn inner_loop(
337    tracker: &Arc<MemoryQuotaTracker>,
338    _enabled: EnabledToken,
339) -> Result<(), ReclaimCrashed> {
340    let mut reclaiming;
341    let mut victims;
342    {
343        let mut state_guard = GuardWithDeferredDrop::new(tracker.lock()?.enabled_or_bug()?);
344
345        let Some(r) = Reclaiming::maybe_start(&mut state_guard) else {
346            return Ok(());
347        };
348        reclaiming = r;
349
350        // Duplicating this call to reclaiming.choose_victims means we don't
351        // release the lock between `maybe_start` and `choose_victims` (here)
352        // and between `handle_victim_responses` and `choose_victims` (bellw).
353        // (Releasing the lock would not be a bug, but it's not desirable.)
354        let Some(v) = reclaiming.choose_victims(&mut state_guard)? else {
355            return Ok(());
356        };
357        victims = v;
358    }
359
360    loop {
361        let responses = reclaiming.notify_victims(mem::take(&mut victims)).await;
362        let mut state_guard = tracker.lock()?.enabled_or_bug()?;
363        reclaiming.handle_victim_responses(&mut state_guard, responses);
364        let Some(v) = reclaiming.choose_victims(&mut state_guard)? else {
365            return Ok(());
366        };
367        victims = v;
368    }
369}
370
371/// Internal long-running task, handling reclamation - main loop
372///
373/// Handles routine logging, but not termination
374async fn task_loop(
375    tracker: &Weak<MemoryQuotaTracker>,
376    mut wakeup: mpsc::Receiver<()>,
377    enabled: EnabledToken,
378) -> Result<TaskFinished, ReclaimCrashed> {
379    loop {
380        // We don't hold a strong reference while we loop around, so we detect
381        // last drop of an actual client handle.
382        {
383            let Some(tracker) = tracker.upgrade() else {
384                return Ok(TaskFinished);
385            };
386
387            inner_loop(&tracker, enabled).await?;
388        }
389
390        let Some(()) = wakeup.next().await else {
391            // Sender dropped
392            return Ok(TaskFinished);
393        };
394    }
395}
396
397/// Internal long-running task, handling reclamation
398///
399/// This is the entrypoint used by the rest of the `tracker`.
400/// It handles logging of crashes.
401pub(super) async fn task(
402    tracker: Weak<MemoryQuotaTracker>,
403    wakeup: mpsc::Receiver<()>,
404    enabled: EnabledToken,
405) {
406    match task_loop(&tracker, wakeup, enabled).await {
407        Ok(TaskFinished) => {}
408        Err(bug) => {
409            let _: Option<()> = (|| {
410                let tracker = tracker.upgrade()?;
411                let mut state = tracker.state.as_enabled()?.lock().ok()?;
412                state.total_used.set_poisoned();
413                Some(())
414            })();
415            error_report!(bug, "memory tracker task failed");
416        }
417    }
418}