1
//! Extension trait for `Sink`.
2

            
3
use std::{
4
    marker::PhantomData,
5
    pin::Pin,
6
    task::{Context, Poll},
7
};
8

            
9
use futures::{ready, sink::Sink};
10
use pin_project::pin_project;
11

            
12
/// Extension trait for `Sink`
13
pub trait SinkExt<Item>: Sink<Item> {
14
    /// As `Sink::with`, but takes a function that returns an `Item` rather
15
    /// than `Future<Output=Item>`.
16
    fn with_fn<F, T, E>(self, func: F) -> WithFn<Self, F, T, E>
17
    // or error?
18
    where
19
        Self: Sized,
20
        F: FnMut(T) -> Result<Item, E>,
21
        E: From<Self::Error>;
22
}
23

            
24
impl<Item, S> SinkExt<Item> for S
25
where
26
    S: Sink<Item>,
27
{
28
4
    fn with_fn<F, T, E>(self, func: F) -> WithFn<Self, F, T, E>
29
4
    where
30
4
        Self: Sized,
31
4
        F: FnMut(T) -> Result<Item, E>,
32
4
        E: From<Self::Error>,
33
4
    {
34
4
        WithFn {
35
4
            sink: self,
36
4
            func,
37
4
            _phantom: PhantomData,
38
4
        }
39
4
    }
40
}
41

            
42
/// Sink returned by [`SinkExt::with_fn`].
43
12
#[pin_project]
44
pub struct WithFn<S, F, T, E> {
45
    /// The underlying sink
46
    #[pin]
47
    sink: S,
48
    /// The user-provided function.
49
    func: F,
50
    /// Phantom data to ensure type consistency.
51
    _phantom: PhantomData<fn() -> Result<T, E>>,
52
}
53

            
54
impl<S, Item, F, T, E> Sink<T> for WithFn<S, F, T, E>
55
where
56
    S: Sink<Item>,
57
    F: FnMut(T) -> Result<Item, E>,
58
    E: From<S::Error>,
59
{
60
    type Error = E;
61

            
62
4
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
63
4
        ready!(self.project().sink.poll_ready(cx))?;
64
4
        Poll::Ready(Ok(()))
65
4
    }
66

            
67
4
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
68
4
        ready!(self.project().sink.poll_flush(cx))?;
69
4
        Poll::Ready(Ok(()))
70
4
    }
71

            
72
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
73
        ready!(self.project().sink.poll_close(cx))?;
74
        Poll::Ready(Ok(()))
75
    }
76

            
77
4
    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
78
4
        let this = self.project();
79
4
        let item = (this.func)(item)?;
80
4
        this.sink.start_send(item).map_err(E::from)
81
4
    }
82
}