//! Reclamation algorithm
//!
//! Implementation of the long-running [`task`] function
//! (which is the only export from here to the wider `mtracker` module).
5

            
6
use super::*;
7

            
8
mod deferred_drop;
9

            
10
use deferred_drop::{DeferredDrop, GuardWithDeferredDrop};
11

            
12
/// Total number of participants
///
/// Used in reporting and in calculations of various edge cases.
/// On 64-bit systems, this is wider than the refcounts, which are all `u32`.
type NumParticips = usize;
17

            
18
//========== candidate victim analysis ==========
19

            
20
/// The nominal data age of a participant
21
#[derive(Ord, PartialOrd, Eq, PartialEq)]
22
enum Age {
23
    /// Treat this participant as having very old data
24
    TreatAsVeryOld,
25
    /// Data age value from the [`IsParticipant`]
26
    Actual(CoarseInstant),
27
}
28

            
29
/// Participant status, as a candidate victim
30
enum PStatus {
31
    /// Treat participant as having data of age OldestData
32
    Candidate(Age),
33
    /// Tear this participant down right away
34
    TearDown,
35
    /// Treat participant as not having any data; don't reclaim from it
36
    NoData,
37
}
38

            
39
/// Outcome of a completed reclamation run
40
///
41
/// This is used only within `choose_victim`, and only for logging
42
#[derive(Debug, derive_more::Display)]
43
enum Outcome {
44
    /// We reached the low water mark
45
    #[display("complete")]
46
    TargetReached,
47

            
48
    /// We didn't, but we have so many participants that that's possibly expected
49
    ///
50
    /// (Can only happen on 32-bit platforms.)
51
    #[display("{} participants, good enough - stopping", n_particips)]
52
    GoodEnough {
53
        /// The number of participants
54
        n_particips: NumParticips,
55
    },
56
}
57

            
58
/// Figure out whether a participant is a candidate victim, and obtain its data age
59
144
fn analyse_particip(precord: &PRecord, defer_drop: &mut DeferredDrop) -> PStatus {
60
144
    let Some(particip) = precord.particip.upgrade() else {
61
        // Oh!  This participant has vanished!
62
        // We can't reclaim from it.  It may already be reclaiming.
63
        // Delete it from our data structure.
64
4
        return PStatus::TearDown;
65
    };
66

            
67
210
    let got_oldest = catch_unwind(AssertUnwindSafe(|| particip.get_oldest(precord.enabled)));
68
140
    defer_drop.push(particip);
69

            
70
140
    match got_oldest {
71
132
        Ok(Some(age)) => return PStatus::Candidate(Age::Actual(age)),
72
8
        Ok(None) => {}
73
        Err(_panicked) => {
74
            // _panicked is of a useless type
75
            error!("memory tracker: call to get_oldest panicked!");
76
            return PStatus::TearDown;
77
        }
78
    }
79

            
80
    // The participant claims not to have any memory
81
    // There might be some cached, let's check
82

            
83
8
    let Some(max_cached) = precord
84
8
        .refcount
85
8
        .as_usize()
86
8
        .checked_mul(MAX_CACHE.as_usize())
87
    else {
88
        // WTF!  So many Participation clones that the max usage has
89
        // overflowed.  (This can only happen on 32-bit platforms
90
        // since refcount is a u32.)  Probably we should reclaim
91
        // from this participant.
92
        log_ratelim!(
93
            "memtrack: participant with many clones claims to have no data";
94
            Err::<Void, _>(internal!("{} Participation clones", *precord.refcount));
95
        );
96
        return PStatus::Candidate(Age::TreatAsVeryOld);
97
    };
98

            
99
8
    if precord.used.as_raw() > Qty(max_cached) {
100
        // This participant is lying to us somehow.
101
        log_ratelim!(
102
            "memtrack: participant claims to have no data, but our accounting disagrees";
103
            Err::<Void, _>(internal!("{} used (by {} clones)", precord.used, *precord.refcount));
104
        );
105
        return PStatus::Candidate(Age::TreatAsVeryOld);
106
8
    }
107
8

            
108
8
    // Participant plausibly does have no data
109
8
    PStatus::NoData
110
144
}
111

            
112
//========== reclamation algorithm, the main pieces ==========
113

            
114
/// State while reclamation is active
115
struct Reclaiming {
116
    /// The heap of candidates, oldest at top of heap
117
    heap: BinaryHeap<Reverse<(Age, AId)>>,
118

            
119
    /// Make this type uninhabited if memory tracking is compiled out
120
    enabled: EnabledToken,
121
}
122

            
123
/// A victim we have selected for reclamation
124
///
125
/// This designates a specific Participant.
126
///
127
/// But, note that we always reclaim from an Account, so if we are reclaiming
128
/// from one `Victim`, we may be reclaiming from other `Victim`s with the same
129
/// `AId` and different `IsParticipant`s.  And because of inheritance, we might
130
/// be reclaiming from other Accounts too.
131
type Victim = (AId, drop_reentrancy::ProtectedArc<dyn IsParticipant>);
132

            
133
/// Marker indicating that the victim's reclaim function panicked
struct VictimPanicked;
135

            
136
/// Set of responses from the victims, after they have all finished reclaiming.
137
type VictimResponses = Vec<(AId, Result<Reclaimed, VictimPanicked>)>;
138

            
139
impl Reclaiming {
140
    /// Check to see if we should start reclaiming, and if so return a `Reclaiming`
141
    ///
142
    ///  1. Checks to see if usage is above `max`; if not, returns `None`
143
    ///  2. Logs that we're starting reclamation
144
    ///  3. Calculates the heap of data ages
145
72
    fn maybe_start(state: &mut GuardWithDeferredDrop) -> Option<Self> {
146
72
        let (state, deferred_drop) = state.deref_mut_both();
147
72

            
148
72
        if *state.total_used <= state.global.config.max {
149
40
            return None;
150
32
        }
151
32

            
152
32
        info!(
153
            "memory tracking: {} > {}, reclamation started (target {})",
154
            *state.total_used, state.config.max, state.config.low_water,
155
        );
156

            
157
        // `BinaryHeap` is a max heap, so use Rev
158
32
        let mut heap = BinaryHeap::new();
159

            
160
        // Build heap of participants we might want to reclaim from
161
        // (and, while we're at it, tear down broken participants)
162
132
        for (aid, arecord) in state.accounts.iter_mut() {
163
210
            arecord.ps.retain(|_pid, precord| {
164
144
                match analyse_particip(precord, deferred_drop) {
165
132
                    PStatus::Candidate(age) => {
166
132
                        heap.push(Reverse((age, aid)));
167
132
                        true // retain
168
                    }
169
                    PStatus::NoData => {
170
8
                        true // retain
171
                    }
172
                    PStatus::TearDown => {
173
4
                        precord.auto_release(&mut state.global);
174
4
                        false // remove
175
                    }
176
                }
177
210
            });
178
132
        }
179

            
180
32
        Some(Reclaiming {
181
32
            heap,
182
32
            enabled: state.enabled,
183
32
        })
184
72
    }
185

            
186
    /// If we're reclaiming, choose the next victim(s) to reclaim
187
    ///
188
    /// This is the account whose participant has the oldest data age,
189
    /// and all of that account's children.
190
    ///
191
    /// We might discover that we didn't want to continue reclamation after all:
192
    /// this function is responsible for checking our progress
193
    /// against the low water mark.
194
    ///
195
    /// If reclamation should stop, this function logs, and returns `None`.
196
84
    fn choose_victims(&mut self, state: &mut State) -> Result<Option<Vec<Victim>>, ReclaimCrashed> {
197
100
        let stop = |state: &mut State, outcome| {
198
32
            info!(
199
                "memory tracking reclamation reached: {} (target {}): {}",
200
                *state.total_used, state.config.low_water, outcome,
201
            );
202
32
            Ok(None)
203
32
        };
204

            
205
84
        if *state.total_used <= state.config.low_water {
206
32
            return stop(state, Outcome::TargetReached);
207
52
        }
208
52
        let Some(Reverse((_, oldest_aid))) = self.heap.pop() else {
209
            // All our remaining participants are NoData.
210
            let n_particips: usize = state
211
                .accounts
212
                .values()
213
                .map(|ar| {
214
                    ar.ps
215
                        .values()
216
                        .map(
217
                            |pr| *pr.refcount as NumParticips, // refcount is u32, so this is fine
218
                        )
219
                        .sum::<NumParticips>()
220
                })
221
                .sum::<NumParticips>();
222

            
223
            if state
224
                .total_used
225
                .as_raw()
226
                .as_usize()
227
                .checked_div(n_particips)
228
                .is_some_and(|total_used| total_used < usize::from(MAX_CACHE))
229
            {
230
                // On 32-bit, this could happen due to the cache, if we have
231
                // 2^32 / MAX_CACHE participants.
232
                return stop(state, Outcome::GoodEnough { n_particips });
233
            }
234

            
235
            // Oh dear.
236
            return Err(internal!(
237
                "memory accounting state corrupted: used={} n_particips={} all NoData",
238
                *state.total_used,
239
                n_particips,
240
            )
241
            .into());
242
        };
243

            
244
        // When we do partial reclamation, rather than just Collapsing:
245
        //
246
        // fudge next_oldest by something to do with number of loop iterations,
247
        // to avoid one-allocation-each-time ping pong between multiple caches
248
        //
249
        // (this match statement will fail to compile when we add a non-Collapsing variant)
250
        //
251
        // let next_oldest = heap.peek_lowest();
252
52
        match None {
253
52
            None | Some(Reclaimed::Collapsing) => {}
254
52
        }
255
52

            
256
52
        let victim_aids = state.get_aid_and_children_recursively(oldest_aid);
257

            
258
52
        let victims: Vec<Victim> = {
259
52
            let mut particips = vec![];
260
108
            for aid in victim_aids {
261
56
                let Some(arecord) = state.accounts.get_mut(aid) else {
262
                    // shouldn't happen but no need to panic
263
                    continue;
264
                };
265
96
                arecord.ps.retain(|_pid, precord| {
266
68
                    let Some(particip) = precord.particip.upgrade() else {
267
                        // tear this down!
268
                        precord.auto_release(&mut state.global);
269
                        return false;
270
                    };
271
68
                    particips.push((aid, particip));
272
68
                    true
273
96
                });
274
56
            }
275
52
            particips
276
52
        };
277
52

            
278
52
        Ok(Some(victims))
279
84
    }
280

            
281
    /// Notify the chosen victims and obtain their responses
282
    ///
283
    /// This is the async part, and is done with the state unlocked.
284
    // Doesn't actually need `self`, only `victims`, but we take it for form's sake
285
78
    async fn notify_victims(&mut self, victims: Vec<Victim>) -> VictimResponses {
286
52
        let enabled = self.enabled;
287
52

            
288
52
        futures::future::join_all(
289
52
            //
290
68
            victims.into_iter().map(|(aid, particip)| async move {
291
68
                let particip = particip.promise_dropping_is_ok();
292
                // We run the `.reclaim()` calls within the same task (since that's what
293
                // `join_all` does).  So they all run on whatever executor thread is polling
294
                // the reclamation task.
295
68
                let reclaimed = AssertUnwindSafe(particip.reclaim(enabled))
296
68
                    .catch_unwind()
297
68
                    .await
298
68
                    .map_err(|_panicked| VictimPanicked);
299
68
                // We drop the `ProtectedArc<dyn IsParticipant>` here, which is OK
300
68
                // because we don't hold the lock.  Since drop isn't async, and
301
68
                // `join_all` doesn't spawn tasks, we drop them sequentially.
302
68
                (aid, reclaimed)
303
68
            }),
304
52
        )
305
52
        .await
306
52
    }
307

            
308
    /// Process the victim's responses and update `state` accordingly
309
    // Doesn't actually need `self`, only `state`, but we take it for form's sake
310
52
    fn handle_victim_responses(&mut self, state: &mut State, responses: VictimResponses) {
311
120
        for (aid, reclaimed) in responses {
312
68
            match reclaimed {
313
                Ok(Reclaimed::Collapsing) | Err(VictimPanicked) => {
314
68
                    let Some(mut arecord) = state.accounts.remove(aid) else {
315
                        // Account is gone, fair enough
316
12
                        continue;
317
                    };
318
56
                    arecord.auto_release(&mut state.global);
319
                    // Account is definitely gone now
320
                }
321
            }
322
        }
323
52
    }
324
}
325

            
326
//========== the reclamation task, in terms of the pieces ==========
327

            
328
/// Return value from the task, when it finishes due to the tracker being shut down
struct TaskFinished;
330

            
331
/// Reclaim memory until we reach low water, if necessary
332
///
333
/// Looks to see if we're above `config.max`.
334
/// If so, constructs a list of victims, and starts reclaiming from them,
335
/// until we reach low water.
336
72
async fn inner_loop(
337
72
    tracker: &Arc<MemoryQuotaTracker>,
338
72
    _enabled: EnabledToken,
339
108
) -> Result<(), ReclaimCrashed> {
340
    let mut reclaiming;
341
    let mut victims;
342
    {
343
72
        let mut state_guard = GuardWithDeferredDrop::new(tracker.lock()?.enabled_or_bug()?);
344

            
345
72
        let Some(r) = Reclaiming::maybe_start(&mut state_guard) else {
346
40
            return Ok(());
347
        };
348
32
        reclaiming = r;
349

            
350
        // Duplicating this call to reclaiming.choose_victims means we don't
351
        // release the lock between `maybe_start` and `choose_victims` (here)
352
        // and between `handle_victim_responses` and `choose_victims` (bellw).
353
        // (Releasing the lock would not be a bug, but it's not desirable.)
354
32
        let Some(v) = reclaiming.choose_victims(&mut state_guard)? else {
355
4
            return Ok(());
356
        };
357
28
        victims = v;
358
    }
359

            
360
    loop {
361
52
        let responses = reclaiming.notify_victims(mem::take(&mut victims)).await;
362
52
        let mut state_guard = tracker.lock()?.enabled_or_bug()?;
363
52
        reclaiming.handle_victim_responses(&mut state_guard, responses);
364
52
        let Some(v) = reclaiming.choose_victims(&mut state_guard)? else {
365
28
            return Ok(());
366
        };
367
24
        victims = v;
368
    }
369
72
}
370

            
371
/// Internal long-running task, handling reclamation - main loop
372
///
373
/// Handles routine logging, but not termination
374
44
async fn task_loop(
375
44
    tracker: &Weak<MemoryQuotaTracker>,
376
44
    mut wakeup: mpsc::Receiver<()>,
377
44
    enabled: EnabledToken,
378
66
) -> Result<TaskFinished, ReclaimCrashed> {
379
    loop {
380
        // We don't hold a strong reference while we loop around, so we detect
381
        // last drop of an actual client handle.
382
        {
383
76
            let Some(tracker) = tracker.upgrade() else {
384
4
                return Ok(TaskFinished);
385
            };
386

            
387
72
            inner_loop(&tracker, enabled).await?;
388
        }
389

            
390
72
        let Some(()) = wakeup.next().await else {
391
            // Sender dropped
392
40
            return Ok(TaskFinished);
393
        };
394
    }
395
44
}
396

            
397
/// Internal long-running task, handling reclamation
398
///
399
/// This is the entrypoint used by the rest of the `tracker`.
400
/// It handles logging of crashes.
401
44
pub(super) async fn task(
402
44
    tracker: Weak<MemoryQuotaTracker>,
403
44
    wakeup: mpsc::Receiver<()>,
404
44
    enabled: EnabledToken,
405
66
) {
406
44
    match task_loop(&tracker, wakeup, enabled).await {
407
44
        Ok(TaskFinished) => {}
408
        Err(bug) => {
409
            let _: Option<()> = (|| {
410
                let tracker = tracker.upgrade()?;
411
                let mut state = tracker.state.as_enabled()?.lock().ok()?;
412
                state.total_used.set_poisoned();
413
                Some(())
414
            })();
415
            error_report!(bug, "memory tracker task failed");
416
        }
417
    }
418
44
}