1
//! `TotalQtyNotifier`
2
//!
3
//! This newtype ensures that we wake up the reclamation task when necessary
4

            
5
use super::*;
6

            
7
/// Wrapper for `TotalQty`
8
#[derive(Deref, Debug)]
9
pub(super) struct TotalQtyNotifier {
10
    /// Total memory usage
11
    ///
12
    /// Invariant: equal to
13
    /// ```text
14
    ///    Σ        Σ         PRecord.used
15
    ///     ARecord  PRecord
16
    /// ```
17
    #[deref]
18
    total_used: TotalQty,
19

            
20
    /// Condvar to wake up the reclamation task
21
    ///
22
    /// The reclamation task has another clone of this
23
    reclamation_task_wakeup: mpsc::Sender<()>,
24
}
25

            
26
impl TotalQtyNotifier {
27
    /// Make a new `TotalQtyNotifier`, which will notify a specified condvar
28
44
    pub(super) fn new_zero(reclamation_task_wakeup: mpsc::Sender<()>) -> Self {
29
44
        TotalQtyNotifier {
30
44
            total_used: TotalQty::ZERO,
31
44
            reclamation_task_wakeup,
32
44
        }
33
44
    }
34

            
35
    /// Record that some memory has been (or will be) allocated by a participant
36
    ///
37
    /// Signals the wakeup task if we need to.
38
23332
    pub(super) fn claim(
39
23332
        &mut self,
40
23332
        precord: &mut PRecord,
41
23332
        want: Qty,
42
23332
        config: &ConfigInner,
43
23332
    ) -> crate::Result<ClaimedQty> {
44
23332
        let got = self
45
23332
            .total_used
46
23332
            .claim(&mut precord.used, want)
47
23332
            .ok_or_else(|| internal!("integer overflow attempting to add claim {}", want))?;
48
23332
        self.maybe_wakeup(config);
49
23332
        Ok(got)
50
23332
    }
51

            
52
    /// Check to see if we need to wake up the reclamation task, and if so, do so
53
23332
    pub(super) fn maybe_wakeup(&mut self, config: &ConfigInner) {
54
23332
        if self.total_used > config.max {
55
32
            match self.reclamation_task_wakeup.try_send(()) {
56
32
                Ok(()) => {}
57
                Err(e) if e.is_full() => {}
58
                // reactor shutting down, having dropped reclamation task?
59
                Err(e) => debug!("could not notify reclamation task: {e}"),
60
            };
61
23300
        }
62
23332
    }
63

            
64
    /// Declare this poisoned, and prevent further claims
65
    pub(super) fn set_poisoned(&mut self) {
66
        self.total_used.set_poisoned();
67
    }
68

            
69
    /// Record that some memory has been (or will be) freed by a participant
70
19968
    pub(super) fn release(&mut self, precord: &mut PRecord, have: ClaimedQty) // infallible
71
19968
    {
72
19968
        // TODO if the participant's usage underflows, tell it to reclaim
73
19968
        // (and log some kind of internal error)
74
19968
        self.total_used.release(&mut precord.used, have);
75
19968
    }
76
}