tor_memquota/mtracker/reclaim.rs
1//! Reclamation algorithm
2//!
//! Implementation of the long-running [`task`] function
//! (which is the only export from here to the wider `mtracker` module).
5
6use super::*;
7
8mod deferred_drop;
9
10use deferred_drop::{DeferredDrop, GuardWithDeferredDrop};
11
/// Total number of participants
///
/// Used in reporting and in calculations of various edge cases:
/// `choose_victims` sums the per-participant refcounts into a value of this type,
/// and `Outcome::GoodEnough` carries it for logging.
///
/// On 64-bit systems, bigger than the refcounts, which are all `u32`
type NumParticips = usize;
17
18//========== candidate victim analysis ==========
19
/// The nominal data age of a participant
///
/// The derived `Ord` follows the variant declaration order,
/// so `TreatAsVeryOld` compares less than every `Actual(_)`.
/// The reclamation heap stores its entries wrapped in `Reverse`,
/// so the entry with the smallest `Age` is the one popped first.
/// The order of the variants is therefore significant.
#[derive(Ord, PartialOrd, Eq, PartialEq)]
enum Age {
    /// Treat this participant as having very old data
    TreatAsVeryOld,
    /// Data age value from the [`IsParticipant`]
    //
    // Presumably an earlier `CoarseInstant` compares as smaller, so that older
    // data sorts first - TODO confirm against `CoarseInstant`'s `Ord` impl.
    Actual(CoarseInstant),
}
28
/// Participant status, as a candidate victim
///
/// This is the result of `analyse_particip`.
enum PStatus {
    /// Treat participant as having data of the contained [`Age`];
    /// it goes onto the heap of reclamation candidates
    Candidate(Age),
    /// Tear this participant down right away
    TearDown,
    /// Treat participant as not having any data; don't reclaim from it
    NoData,
}
38
/// Outcome of a completed reclamation run
///
/// This is used only within `choose_victims`, and only for logging:
/// the `Display` output is interpolated into the "reclamation reached" log message.
#[derive(Debug, derive_more::Display)]
enum Outcome {
    /// We reached the low water mark
    #[display("complete")]
    TargetReached,

    /// We didn't, but we have so many participants that that's possibly expected
    ///
    /// (Can only happen on 32-bit platforms.)
    #[display("{} participants, good enough - stopping", n_particips)]
    GoodEnough {
        /// The number of participants
        n_particips: NumParticips,
    },
}
57
58/// Figure out whether a participant is a candidate victim, and obtain its data age
59fn analyse_particip(precord: &PRecord, defer_drop: &mut DeferredDrop) -> PStatus {
60 let Some(particip) = precord.particip.upgrade() else {
61 // Oh! This participant has vanished!
62 // We can't reclaim from it. It may already be reclaiming.
63 // Delete it from our data structure.
64 return PStatus::TearDown;
65 };
66
67 let got_oldest = catch_unwind(AssertUnwindSafe(|| particip.get_oldest(precord.enabled)));
68 defer_drop.push(particip);
69
70 match got_oldest {
71 Ok(Some(age)) => return PStatus::Candidate(Age::Actual(age)),
72 Ok(None) => {}
73 Err(_panicked) => {
74 // _panicked is of a useless type
75 error!("bug in memory tracker: call to get_oldest panicked!");
76 return PStatus::TearDown;
77 }
78 }
79
80 // The participant claims not to have any memory
81 // There might be some cached, let's check
82
83 let Some(max_cached) = precord
84 .refcount
85 .as_usize()
86 .checked_mul(MAX_CACHE.as_usize())
87 else {
88 // WTF! So many Participation clones that the max usage has
89 // overflowed. (This can only happen on 32-bit platforms
90 // since refcount is a u32.) Probably we should reclaim
91 // from this participant.
92 log_ratelim!(
93 "memtrack: participant with many clones claims to have no data";
94 Err::<Void, _>(internal!("{} Participation clones", *precord.refcount));
95 );
96 return PStatus::Candidate(Age::TreatAsVeryOld);
97 };
98
99 if precord.used.as_raw() > Qty(max_cached) {
100 // This participant is lying to us somehow.
101 log_ratelim!(
102 "memtrack: participant claims to have no data, but our accounting disagrees";
103 Err::<Void, _>(internal!("{} used (by {} clones)", precord.used, *precord.refcount));
104 );
105 return PStatus::Candidate(Age::TreatAsVeryOld);
106 }
107
108 // Participant plausibly does have no data
109 PStatus::NoData
110}
111
112//========== reclamation algorithm, the main pieces ==========
113
/// State while reclamation is active
///
/// Created by `Reclaiming::maybe_start` when usage exceeds the configured maximum,
/// and then used for the rest of that reclamation run.
struct Reclaiming {
    /// The heap of candidates, oldest at top of heap
    ///
    /// `BinaryHeap` is a max-heap, so entries are wrapped in `Reverse`
    /// to get the smallest `(Age, AId)` out first.
    /// There is one entry per candidate *participant*,
    /// so the same `AId` can appear more than once.
    heap: BinaryHeap<Reverse<(Age, AId)>>,

    /// Make this type uninhabited if memory tracking is compiled out
    enabled: EnabledToken,
}
122
/// A victim we have selected for reclamation
///
/// This designates a specific Participant:
/// the tuple is the id of the account it belongs to,
/// and a strong reference to the participant itself.
///
/// But, note that we always reclaim from an Account, so if we are reclaiming
/// from one `Victim`, we may be reclaiming from other `Victim`s with the same
/// `AId` and different `IsParticipant`s.  And because of inheritance, we might
/// be reclaiming from other Accounts too.
type Victim = (AId, drop_reentrancy::ProtectedArc<dyn IsParticipant>);
132
/// Marker indicating that the victim's reclaim function panicked
///
/// Produced in `notify_victims` in place of the (discarded) panic payload.
struct VictimPanicked;
135
/// Set of responses from the victims, after they have all finished reclaiming.
///
/// One entry per [`Victim`]: the victim's account id,
/// and either what it said it reclaimed, or a note that its `reclaim` panicked.
type VictimResponses = Vec<(AId, Result<Reclaimed, VictimPanicked>)>;
138
impl Reclaiming {
    /// Check to see if we should start reclaiming, and if so return a `Reclaiming`
    ///
    ///  1. Checks to see if usage is above `max`; if not, returns `None`
    ///  2. Logs that we're starting reclamation
    ///  3. Calculates the heap of data ages
    ///
    /// As part of step 3, every participant is classified with `analyse_particip`.
    /// Participants classified as `TearDown` are auto-released
    /// and removed from their account's participant table.
    ///
    /// The participant references obtained during that analysis are pushed onto
    /// the guard's `DeferredDrop`, rather than being dropped here.
    fn maybe_start(state: &mut GuardWithDeferredDrop) -> Option<Self> {
        // Split the guard, so we can use the state and the deferred drop list together
        let (state, deferred_drop) = state.deref_mut_both();

        // NOTE(review): this reads `state.global.config`, whereas the log message
        // below reads `state.config`.  Presumably these are the same config
        // (reached via a deref to `global`) - TODO confirm.
        if *state.total_used <= state.global.config.max {
            return None;
        }

        info!(
            "memory tracking: {} > {}, reclamation started (target {})",
            *state.total_used, state.config.max, state.config.low_water,
        );

        // `BinaryHeap` is a max heap, so use Rev
        let mut heap = BinaryHeap::new();

        // Build heap of participants we might want to reclaim from
        // (and, while we're at it, tear down broken participants)
        for (aid, arecord) in state.accounts.iter_mut() {
            arecord.ps.retain(|_pid, precord| {
                match analyse_particip(precord, deferred_drop) {
                    PStatus::Candidate(age) => {
                        // One heap entry per candidate participant,
                        // so an account can be in the heap several times.
                        heap.push(Reverse((age, aid)));
                        true // retain
                    }
                    PStatus::NoData => {
                        true // retain
                    }
                    PStatus::TearDown => {
                        precord.auto_release(&mut state.global);
                        false // remove
                    }
                }
            });
        }

        Some(Reclaiming {
            heap,
            enabled: state.enabled,
        })
    }

    /// If we're reclaiming, choose the next victim(s) to reclaim
    ///
    /// This is the account whose participant has the oldest data age,
    /// and all of that account's children.
    ///
    /// We might discover that we didn't want to continue reclamation after all:
    /// this function is responsible for checking our progress
    /// against the low water mark.
    ///
    /// If reclamation should stop, this function logs, and returns `None`.
    ///
    /// Returns `Err` if we are still above the low water mark, have run out of
    /// candidates, and the remaining usage isn't explained by per-participant caching:
    /// that indicates that the accounting state is corrupted.
    ///
    /// While collecting the victims, participants which have vanished
    /// are auto-released and removed from their account.
    fn choose_victims(&mut self, state: &mut State) -> Result<Option<Vec<Victim>>, ReclaimCrashed> {
        // Log that we are stopping, and why, and produce the "stop" return value
        let stop = |state: &mut State, outcome| {
            info!(
                "memory tracking reclamation reached: {} (target {}): {}",
                *state.total_used, state.config.low_water, outcome,
            );
            Ok(None)
        };

        if *state.total_used <= state.config.low_water {
            return stop(state, Outcome::TargetReached);
        }
        let Some(Reverse((_, oldest_aid))) = self.heap.pop() else {
            // All our remaining participants are NoData.
            let n_particips: usize = state
                .accounts
                .values()
                .map(|ar| {
                    ar.ps
                        .values()
                        .map(
                            |pr| *pr.refcount as NumParticips, // refcount is u32, so this is fine
                        )
                        .sum::<NumParticips>()
                })
                .sum::<NumParticips>();

            // Is the average usage per participant small enough to be just cache?
            //
            // `checked_div` gives `None` if there are no participants at all;
            // then we fall through to the error below.
            //
            // NOTE(review): `analyse_particip` accepts `used <= refcount * MAX_CACHE`
            // as plausible, whereas this requires the (rounded-down) average to be
            // strictly less than `MAX_CACHE`.  Confirm that the boundary is as intended.
            if state
                .total_used
                .as_raw()
                .as_usize()
                .checked_div(n_particips)
                .is_some_and(|total_used| total_used < usize::from(MAX_CACHE))
            {
                // On 32-bit, this could happen due to the cache, if we have
                // 2^32 / MAX_CACHE participants.
                return stop(state, Outcome::GoodEnough { n_particips });
            }

            // Oh dear.
            return Err(internal!(
                "memory accounting state corrupted: used={} n_particips={} all NoData",
                *state.total_used,
                n_particips,
            )
            .into());
        };

        // When we do partial reclamation, rather than just Collapsing:
        //
        // fudge next_oldest by something to do with number of loop iterations,
        // to avoid one-allocation-each-time ping pong between multiple caches
        //
        // (this match statement will fail to compile when we add a non-Collapsing variant)
        //
        // let next_oldest = heap.peek_lowest();
        match None {
            None | Some(Reclaimed::Collapsing) => {}
        }

        // NOTE(review): the heap isn't updated when accounts are removed, so
        // `oldest_aid` might name an account that is already gone.  Presumably
        // we then end up with an empty victim list, and the caller just
        // comes round again - TODO confirm.
        let victim_aids = state.get_aid_and_children_recursively(oldest_aid);

        let victims: Vec<Victim> = {
            let mut particips = vec![];
            for aid in victim_aids {
                let Some(arecord) = state.accounts.get_mut(aid) else {
                    // shouldn't happen but no need to panic
                    continue;
                };
                arecord.ps.retain(|_pid, precord| {
                    let Some(particip) = precord.particip.upgrade() else {
                        // tear this down!
                        precord.auto_release(&mut state.global);
                        return false;
                    };
                    particips.push((aid, particip));
                    true
                });
            }
            particips
        };

        Ok(Some(victims))
    }

    /// Notify the chosen victims and obtain their responses
    ///
    /// This is the async part, and is done with the state unlocked.
    ///
    /// Each victim's `reclaim` is run with panics caught;
    /// a panic shows up in the responses as `Err(VictimPanicked)`.
    /// There is one response for each entry in `victims`.
    // Doesn't actually need `self`, only `victims`, but we take it for form's sake
    async fn notify_victims(&mut self, victims: Vec<Victim>) -> VictimResponses {
        let enabled = self.enabled;

        futures::future::join_all(
            //
            victims.into_iter().map(|(aid, particip)| async move {
                // Presumably this converts the `ProtectedArc` into something we are
                // allowed to drop normally; see the comment about dropping, below.
                let particip = particip.promise_dropping_is_ok();
                // We run the `.reclaim()` calls within the same task (since that's what
                // `join_all` does).  So they all run on whatever executor thread is polling
                // the reclamation task.
                let reclaimed = AssertUnwindSafe(particip.reclaim(enabled))
                    .catch_unwind()
                    .await
                    .map_err(|_panicked| VictimPanicked);
                // We drop the `ProtectedArc<dyn IsParticipant>` here, which is OK
                // because we don't hold the lock.  Since drop isn't async, and
                // `join_all` doesn't spawn tasks, we drop them sequentially.
                (aid, reclaimed)
            }),
        )
        .await
    }

    /// Process the victim's responses and update `state` accordingly
    ///
    /// Whether a victim said it is collapsing, or its `reclaim` panicked,
    /// the effect is the same: the victim's whole account is removed from `state`
    /// and auto-released.
    ///
    /// There can be several responses for the same account
    /// (one for each of its participants);
    /// the second and subsequent ones find the account already gone.
    // Doesn't actually need `self`, only `state`, but we take it for form's sake
    fn handle_victim_responses(&mut self, state: &mut State, responses: VictimResponses) {
        for (aid, reclaimed) in responses {
            match reclaimed {
                Ok(Reclaimed::Collapsing) | Err(VictimPanicked) => {
                    let Some(mut arecord) = state.accounts.remove(aid) else {
                        // Account is gone, fair enough
                        continue;
                    };
                    arecord.auto_release(&mut state.global);
                    // Account is definitely gone now
                }
            }
        }
    }
}
325
//========== the reclamation task, in terms of the pieces ==========
327
/// Return value from the task, when it finishes due to the tracker being shut down
///
/// `task_loop` returns this when the tracker can no longer be upgraded from its
/// `Weak` reference, or when the wakeup channel's sender has been dropped.
struct TaskFinished;
330
331/// Reclaim memory until we reach low water, if necessary
332///
333/// Looks to see if we're above `config.max`.
334/// If so, constructs a list of victims, and starts reclaiming from them,
335/// until we reach low water.
336async fn inner_loop(
337 tracker: &Arc<MemoryQuotaTracker>,
338 _enabled: EnabledToken,
339) -> Result<(), ReclaimCrashed> {
340 let mut reclaiming;
341 let mut victims;
342 {
343 let mut state_guard = GuardWithDeferredDrop::new(tracker.lock()?.enabled_or_bug()?);
344
345 let Some(r) = Reclaiming::maybe_start(&mut state_guard) else {
346 return Ok(());
347 };
348 reclaiming = r;
349
350 // Duplicating this call to reclaiming.choose_victims means we don't
351 // release the lock between `maybe_start` and `choose_victims` (here)
352 // and between `handle_victim_responses` and `choose_victims` (bellw).
353 // (Releasing the lock would not be a bug, but it's not desirable.)
354 let Some(v) = reclaiming.choose_victims(&mut state_guard)? else {
355 return Ok(());
356 };
357 victims = v;
358 }
359
360 loop {
361 let responses = reclaiming.notify_victims(mem::take(&mut victims)).await;
362 let mut state_guard = tracker.lock()?.enabled_or_bug()?;
363 reclaiming.handle_victim_responses(&mut state_guard, responses);
364 let Some(v) = reclaiming.choose_victims(&mut state_guard)? else {
365 return Ok(());
366 };
367 victims = v;
368 }
369}
370
371/// Internal long-running task, handling reclamation - main loop
372///
373/// Handles routine logging, but not termination
374async fn task_loop(
375 tracker: &Weak<MemoryQuotaTracker>,
376 mut wakeup: mpsc::Receiver<()>,
377 enabled: EnabledToken,
378) -> Result<TaskFinished, ReclaimCrashed> {
379 loop {
380 // We don't hold a strong reference while we loop around, so we detect
381 // last drop of an actual client handle.
382 {
383 let Some(tracker) = tracker.upgrade() else {
384 return Ok(TaskFinished);
385 };
386
387 inner_loop(&tracker, enabled).await?;
388 }
389
390 let Some(()) = wakeup.next().await else {
391 // Sender dropped
392 return Ok(TaskFinished);
393 };
394 }
395}
396
397/// Internal long-running task, handling reclamation
398///
399/// This is the entrypoint used by the rest of the `tracker`.
400/// It handles logging of crashes.
401pub(super) async fn task(
402 tracker: Weak<MemoryQuotaTracker>,
403 wakeup: mpsc::Receiver<()>,
404 enabled: EnabledToken,
405) {
406 match task_loop(&tracker, wakeup, enabled).await {
407 Ok(TaskFinished) => {}
408 Err(bug) => {
409 let _: Option<()> = (|| {
410 let tracker = tracker.upgrade()?;
411 let mut state = tracker.state.as_enabled()?.lock().ok()?;
412 state.total_used.set_poisoned();
413 Some(())
414 })();
415 error_report!(bug, "memory tracker task failed");
416 }
417 }
418}