//! Iterator extension for splitting into batches, each introduced by a batch-starting item
//!
//! See
//! [`IteratorExt::batching_split_before_loose`] and
//! [`IteratorExt::batching_split_before_with_header`].
//!
//! # **UNSTABLE**
//!
//! This whole module is UNSTABLE and not part of the semver guarantees.
//! You'll only see it if you ran rustdoc with `--document-private-items`.
// This is achieved with `#[doc(hidden)]` on the top-level module reexport
// in `lib.rs`, which is the only place all of this is actually exposed.
13

            
14
use std::iter;
15
use std::marker::PhantomData;
16

            
17
use crate::util::PeekableIterator;
18

            
19
/// Iterator for the header, transformable into a [`Batches`] yielding subsequent batches
20
///
21
/// Returned by
22
/// [`.batching_split_before_with_header()`](IteratorExt::batching_split_before_with_header).
23
///
24
/// This type is both:
25
///  * An [`Iterator`], which returns the items in the header
26
///    (before the first batch-starting item);
27
///  * Transformable using [.subsequent()](BatchesWithHeader::subsequent)
28
///    into a [`Batches`], which yields the remainder of the input,
29
///    split into batches.
30
///
31
/// `II` is the iterator item type.  `I` is the input iterator.
32
/// `F` is the predicate for testing if an item is batch-starting.
33
pub struct BatchesWithHeader<II, I, F> {
34
    /// Input
35
    input: Input<II, I, F>,
36
}
37

            
38
/// Input, shared by our public structs
///
/// Bundles the caller's iterator with the caller's predicate,
/// so that [`BatchesWithHeader`] and [`Batches`] can pass it along as one unit.
struct Input<II, I, F> {
    /// The input iterator
    unfiltered: I,
    /// Callback to test if this is batch-starting
    batch_starting: F,
    /// We're like a function that yields II
    ///
    /// (`II` appears in no other field, so it has to be mentioned here.)
    marker: PhantomData<fn() -> II>,
}
47

            
48
/// An iterator-like object yielding an iterator for each batch.
49
///
50
/// Each call to [`.next_batch()`](Batches::next_batch)
51
/// yields an iterator for one subsequent batch,
52
/// which in turn will yield the individual items.
53
///
54
/// `Batches` is not an [`Iterator`] because
55
/// its returned item type (the sub-iterator)
56
/// borrows it mutably.
57
///
58
/// `II` is the iterator item type.  `I` is the input iterator.
59
/// `F` is the predicate for testing if an item is batch-starting.
60
pub struct Batches<II, I, F> {
61
    /// Input
62
    input: Input<II, I, F>,
63
    /// Should we avoid draining the end of the previous batch
64
    no_drain: Option<NoDrainToken>,
65
    /// Should we yield even (one) batch-starting item
66
    yield_one: Option<EvenYieldOneBatchStarting>,
67
}
68

            
69
/// Token stored (or not) in the state to indicate not to drain the previous batch
///
/// (We use `Option<NoDrainToken>` rather than `bool` because
/// booleans can be very confusing, and because
/// `Option` has good ergonomics with [`.take()`](Option::take) and `?`.)
struct NoDrainToken;
75

            
76
/// Token stored (or not) in the state to indicate to yield even a batch-starting item
///
/// (We use `Option<EvenYieldOneBatchStarting>` rather than `bool` because
/// booleans can be very confusing, and because
/// `Option` has good ergonomics with [`.take()`](Option::take) and `?`.)
struct EvenYieldOneBatchStarting;
82

            
83
/// Iterator to yield the members of a batch.
84
///
85
/// This is the iterator returned by
86
/// [`.next_batch()`](Batches::next_batch).
87
///
88
/// `II` is the iterator item type.  `I` is the input iterator.
89
/// `F` is the predicate for testing if an item is batch-starting.
90
pub struct Batch<'p, II, I, F> {
91
    /// The parent, with all the actual state etc.
92
    ///
93
    /// It is less confusing to keep all the state in the parent iterator.
94
    parent: &'p mut Batches<II, I, F>,
95
}
96

            
97
impl<II, I, F> Input<II, I, F>
98
where
99
    I: Iterator<Item = II> + PeekableIterator,
100
    F: FnMut(&II) -> bool,
101
{
102
    /// Yield the next item - unless it is batch-starting.
103
7666
    fn next_non_starting(&mut self) -> Option<II> {
104
7666
        let item = self.unfiltered.peek()?;
105
6948
        if (self.batch_starting)(item) {
106
2357
            return None;
107
4591
        };
108
4591
        self.unfiltered.next()
109
7666
    }
110
}
111

            
112
impl<II, I, F> Iterator for BatchesWithHeader<II, I, F>
113
where
114
    I: Iterator<Item = II> + PeekableIterator,
115
    F: FnMut(&II) -> bool,
116
{
117
    type Item = II;
118

            
119
1109
    fn next(&mut self) -> Option<II> {
120
1109
        self.input.next_non_starting()
121
1109
    }
122
}
123

            
124
impl<II, I, F> BatchesWithHeader<II, I, F>
125
where
126
    I: Iterator<Item = II> + PeekableIterator,
127
    F: FnMut(&II) -> bool,
128
{
129
    /// Proceed from the header to the subsequent batches
130
    ///
131
    /// Any un-yielded items remaining in the header will be discarded.
132
319
    pub fn subsequent(mut self) -> Batches<II, I, F> {
133
319
        // Discard any un-yielded contents of the header
134
319
        let _ = self.by_ref().count();
135
319

            
136
319
        Batches {
137
319
            input: self.input,
138
319
            yield_one: None,
139
319
            no_drain: None,
140
319
        }
141
319
    }
142
}
143

            
144
impl<II, I, F> Iterator for Batch<'_, II, I, F>
145
where
146
    I: Iterator<Item = II> + PeekableIterator,
147
    F: FnMut(&II) -> bool,
148
{
149
    type Item = II;
150

            
151
7582
    fn next(&mut self) -> Option<II> {
152
7582
        if self.parent.yield_one.take().is_some() {
153
1025
            self.parent.input.unfiltered.next()
154
        } else {
155
6557
            self.parent.input.next_non_starting()
156
        }
157
7582
    }
158
}
159

            
160
impl<II, I: Iterator<Item = II> + PeekableIterator, F: FnMut(&II) -> bool> Batches<II, I, F> {
161
    /// Proceed to the next batch
162
    ///
163
    /// If the input is exhausted (ie, there is no next batch), returns `None`.
164
    ///
165
    /// Any un-yielded items remaining in the previous batch will be discarded.
166
    //
167
    // Batches is a LendingIterator - its returned item type borrows from the
168
    // iterator itself - so can't impl Iterator.
169
    // <https://rust-lang.github.io/generic-associated-types-initiative/design_patterns/iterable.html>
170
1386
    pub fn next_batch(&mut self) -> Option<Batch<'_, II, I, F>> {
171
1386
        // Drain to the end of the batch
172
1386
        if self.no_drain.take().is_none() {
173
1368
            let _ = Batch { parent: self }.count();
174
1368
        }
175
1386
        let _: &II = self.input.unfiltered.peek()?;
176
1025
        self.yield_one = Some(EvenYieldOneBatchStarting);
177
1025
        Some(Batch { parent: self })
178
1386
    }
179

            
180
    /// Map each batch
181
90
    pub fn map<T>(
182
90
        mut self,
183
90
        mut f: impl FnMut(Batch<'_, II, I, F>) -> T,
184
90
    ) -> impl Iterator<Item = T> {
185
116
        iter::from_fn(move || {
186
36
            let batch = self.next_batch()?;
187
24
            Some(f(batch))
188
116
        })
189
90
    }
190
}
191

            
192
/// **Extension trait providing `batching_split_before`**
193
pub trait IteratorExt: Iterator + Sized {
194
    /// Splits the input into a header followed by batches started according to a predicate
195
    ///
196
    /// The input is divided into:
197
    ///  * A header, containing no batch-starting items
198
    ///  * Zero or more subsequent batches, each with precisely one batch-starting item
199
    ///
200
    /// The returned value from `batching_split_before_with_header` is an iterator,
201
    /// which yields the elements in the header - before the first batch-starting item.
202
    ///
203
    /// After processing the header, call
204
    /// [`.subsequent()`](BatchesWithHeader::subsequent)
205
    /// which will return a [`Batches`],
206
    /// which is a meta-iterator-like-object which yields the subsequent batches.
207
    ///
208
    /// Each subsequent batch is then returned by calling
209
    /// [`.next_batch()`](Batches::next_batch)
210
    /// which yields a separate sub-iterator.
211
    ///
212
    /// A new batch is recognised for each input item for which `batch_starting` returns true.
213
    ///
214
    /// This method is named **with_header** because it separates out the header,
215
    /// using a typestate pattern, which is convenient for processing the header
216
    /// separately.
217
    ///
218
    /// (You will want to iterate the first batch by reference,
219
    /// so that the iteration doesn't consume the [`BatchesWithHeader`],
220
    /// which is what you will need to call `.subsequent()`.
221
    /// The API insists that you process the batches sequentially:
222
    /// you can only be processing one batch at a time.)
223
    ///
224
    /// # **UNSTABLE**
225
    ///
226
    /// This method is UNSTABLE and not part of the semver guarantees.
227
    /// You'll only see it if you ran rustdoc with `--document-private-items`.
228
    ///
229
    /// # Example
230
    ///
231
    /// ```
232
    /// use itertools::Itertools as _;
233
    /// use tor_netdoc::batching_split_before::IteratorExt as _;
234
    ///
235
    /// let mut batches = (1..10).peekable().batching_split_before_with_header(|v| v % 3 == 0);
236
    /// assert_eq!(batches.by_ref().collect_vec(), [ 1, 2 ]);
237
    ///
238
    /// let mut batches = batches.subsequent();
239
    /// assert_eq!(batches.next_batch().unwrap().collect_vec(), [ 3, 4, 5 ]);
240
    /// assert_eq!(batches.next_batch().unwrap().collect_vec(), [ 6, 7, 8 ]);
241
    /// assert_eq!(batches.next_batch().unwrap().collect_vec(), [ 9 ]);
242
    /// assert!(batches.next_batch().is_none());
243
    /// ```
244
325
    fn batching_split_before_with_header<F>(
245
325
        self,
246
325
        batch_starting: F,
247
325
    ) -> BatchesWithHeader<Self::Item, Self, F>
248
325
    where
249
325
        F: FnMut(&Self::Item) -> bool,
250
325
    {
251
325
        let input = Input {
252
325
            unfiltered: self,
253
325
            batch_starting,
254
325
            marker: PhantomData,
255
325
        };
256
325
        BatchesWithHeader { input }
257
325
    }
258

            
259
    /// Splits the input into batches, with new batches started according to a predicate
260
    ///
261
    /// The input is divided into batches, just before each batch-starting item.
262
    /// The batch-starting item is included as the first item of every batch,
263
    /// except the first batch if the input starts with a non-batch-starting-item.
264
    ///
265
    /// If the input iterator is empty, there are no batches.
266
    ///
267
    /// This method is named **loose** because it neither
268
    /// insists that the iterator start with a batch-starting item,
269
    /// nor returns batches which always start with a batch-starting item.
270
    /// It is up to the caller to handle a possible first batch with no batch-starting item.
271
    ///
272
    /// Each batch is returned by calling
273
    /// [`.next_batch()`](Batches::next_batch)
274
    /// which yields a separate sub-iterator.
275
    ///
276
    /// A new batch is recognised for each input item for which `batch_start` returns true.
277
    ///
278
    /// (The API insists that you process the batches sequentially:
279
    /// you can only be processing one batch at a time.)
280
    ///
281
    /// # **UNSTABLE**
282
    ///
283
    /// This method is UNSTABLE and not part of the semver guarantees.
284
    /// You'll only see it if you ran rustdoc with `--document-private-items`.
285
    ///
286
    /// # Example
287
    ///
288
    /// ```
289
    /// use itertools::Itertools as _;
290
    /// use tor_netdoc::batching_split_before::IteratorExt as _;
291
    ///
292
    /// let mut batches = (1..10).peekable().batching_split_before_loose(|v| v % 3 == 0);
293
    /// assert_eq!(batches.next_batch().unwrap().collect_vec(), [ 1, 2 ]);
294
    /// assert_eq!(batches.next_batch().unwrap().collect_vec(), [ 3, 4, 5 ]);
295
    /// assert_eq!(batches.next_batch().unwrap().collect_vec(), [ 6, 7, 8 ]);
296
    /// assert_eq!(batches.next_batch().unwrap().collect_vec(), [ 9 ]);
297
    /// assert!(batches.next_batch().is_none());
298
    /// ```
299
96
    fn batching_split_before_loose<F>(self, batch_starting: F) -> Batches<Self::Item, Self, F>
300
96
    where
301
96
        F: FnMut(&Self::Item) -> bool,
302
96
    {
303
96
        let input = Input {
304
96
            unfiltered: self,
305
96
            batch_starting,
306
96
            marker: PhantomData,
307
96
        };
308
96
        Batches {
309
96
            input,
310
96
            no_drain: Some(NoDrainToken),
311
96
            yield_one: None,
312
96
        }
313
96
    }
314
}
315
// Blanket implementation: every iterator type gets the extension methods.
impl<I> IteratorExt for I where I: Iterator {}
316

            
317
#[cfg(test)]
mod tests {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use crate::util::*;
    use itertools::chain;
    use std::fmt::Debug;
    use std::iter;

    /// Peekable iterator which logs every `next` and `peek` to stderr
    struct TrackingPeekable<I: Iterator>(Peekable<I>);
    impl<I: Iterator> Iterator for TrackingPeekable<I>
    where
        I::Item: Debug,
    {
        type Item = I::Item;
        fn next(&mut self) -> Option<I::Item> {
            let v = self.0.next();
            eprintln!("        iter yielded {v:?}");
            v
        }
    }
    impl<I: Iterator> PeekableIterator for TrackingPeekable<I>
    where
        I::Item: Debug,
    {
        fn peek(&mut self) -> Option<&I::Item> {
            let v = self.0.peek();
            eprintln!("        iter peeked {v:?}");
            v
        }
    }

    #[test]
    fn test_batching_split_before() {
        /// Check that `iter` yields exactly `exp`, and then keeps returning `None`
        fn chk_exp(mut iter: impl Iterator<Item = u32>, exp: &[u32]) {
            eprintln!("    exp {exp:?}");
            for exp in exp.iter().cloned() {
                assert_eq!(iter.next(), Some(exp));
            }
            assert_eq!(iter.next(), None);
            assert_eq!(iter.next(), None);
            assert_eq!(iter.next(), None);
        }

        // Check that `input` splits into header `iexp` and subsequent batches `sexp`,
        // via both `_with_header` and `_loose`.
        let chk_breakdown = |input: &[u32], iexp: &[u32], sexp: &[&[u32]]| {
            let chk_batches = |mut subseq: Batches<_, _, _>, sexp: &mut dyn Iterator<Item = _>| {
                loop {
                    match (subseq.next_batch(), sexp.next()) {
                        (Some(batch), Some(sexp)) => chk_exp(batch, sexp),
                        (None, None) => break,
                        (b, e) => panic!("({:?}, {e:?})", b.map(|_| ())),
                    }
                }
                assert!(subseq.next_batch().is_none());
                assert!(subseq.next_batch().is_none());
            };

            eprintln!("input {input:?}");
            let input = || TrackingPeekable(input.iter().cloned().peekable());
            let is_starting = |v: &u32| *v >= 10;

            {
                let mut header = input().batching_split_before_with_header(is_starting);

                chk_exp(&mut header, iexp);
                eprintln!("    subsequent...");
                let subseq = header.subsequent();
                let mut sexp = sexp.iter().cloned();
                chk_batches(subseq, &mut sexp);
            }

            {
                let batches = input().batching_split_before_loose(is_starting);

                // In loose mode the header, if nonempty, is simply the first batch.
                let mut sexp =
                    chain!(iter::once(iexp), sexp.iter().cloned(),).filter(|s| !s.is_empty());

                chk_batches(batches, &mut sexp);
            }
        };

        chk_breakdown(&[], &[], &[]);

        chk_breakdown(&[10], &[], &[&[10]]);

        chk_breakdown(
            &[1, 2, 30, 4, 5, 60, 7, 8],
            &[1, 2],
            &[&[30, 4, 5], &[60, 7, 8]],
        );
    }
}