1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
//! A type-conversion transformer

use futures::future::ready;
use futures::{Future, FutureExt, SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use serde_json::to_string;

use crate::Pair;

/// Apply a pair of synchronous conversions to a Pair, returning another Pair
///
/// Any errors in conversion will be returned from the Stream / sent into the Sink
pub fn convert<Si: Send, Ri, Ei: Send, So, Ro, Eo>(
    pair: impl Pair<Si, Ri, Ei>,
    fr: impl Fn(Result<Ri, Ei>) -> Result<Ro, Eo> + Send,
    fs: impl Fn(Result<So, Eo>) -> Result<Si, Ei> + Send,
) -> impl Pair<So, Ro, Eo> {
    pair.map(fr).with(move |x| ready(Ok(fs(x))))
}

/// Apply a pair of future-returning conversions to a Pair, returning another Pair
///
/// Any errors in conversion will be returned from the Stream / sent into the Sink
pub fn convert_fut<Si, Ri, Ei, So, Ro, Eo, Ffr, Ffs>(
    pair: impl Pair<Si, Ri, Ei>,
    fr: impl Fn(Result<Ri, Ei>) -> Ffr + Send,
    fs: impl Fn(Result<So, Eo>) -> Ffs + Send,
) -> impl Pair<So, Ro, Eo>
where
    Ffr: Future<Output = Result<Ro, Eo>> + Send,
    Ffs: Future<Output = Result<Si, Ei>> + Send,
{
    pair.then(fr).with(move |x| fs(x).map(Ok))
}

fn to_json<T, E>(s: Result<String, E>) -> Result<T, E>
where
    T: for<'a> Deserialize<'a> + Send,
    serde_json::Error: Into<E>,
{
    serde_json::from_str(&s?).map_err(Into::into)
}

fn from_json<T, E>(v: Result<T, E>) -> Result<String, E>
where
    T: Serialize + Send,
    serde_json::Error: Into<E>,
{
    to_string(&v?).map_err(Into::into)
}

/// Parses a stream of JSON strings into any type T which implements deserialize
pub fn json<Ti, To, Ei>(pair: impl Pair<String, String, Ei>) -> impl Pair<Ti, To, Ei>
where
    To: for<'a> Deserialize<'a> + Send,
    Ti: Serialize + Send,
    Ei: Send,
    serde_json::Error: Into<Ei>,
{
    convert(pair, to_json, from_json)
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::{connector::passthru_pair, JsonRpcErr};

    use serde_json::{json, Value};

    #[tokio::test]
    async fn test_json() {
        let (mut spair, jpair) = passthru_pair::<String, String, JsonRpcErr>();
        let mut jpair = json::<Value, Value, JsonRpcErr>(jpair);

        // do send before (not concurrently with) receive -- this should be possible!
        spair.send(Ok("\"asdf\"".to_owned())).await.unwrap();
        assert_eq!(jpair.next().await.transpose().unwrap(), Some(json!("asdf")));

        // do send before (not concurrently with) receive -- this should be possible!
        jpair.send(Ok(json!([1, 2, 3]))).await.unwrap();
        assert_eq!(
            spair.next().await.transpose().unwrap(),
            Some("[1,2,3]".to_owned())
        );
    }

    #[tokio::test]
    async fn test_json_fut() {
        let (mut spair, jpair) = passthru_pair();
        let to_json_fut = move |x| async { to_json::<Value, JsonRpcErr>(x) };
        let from_json_fut = move |x| async { from_json::<Value, JsonRpcErr>(x) };
        let mut jpair = Box::pin(convert_fut(jpair, to_json_fut, from_json_fut));

        // do send before (not concurrently with) receive -- this should be possible!
        spair.send(Ok("\"asdf\"".to_owned())).await.unwrap();
        assert_eq!(jpair.next().await.transpose().unwrap(), Some(json!("asdf")));

        // do send before (not concurrently with) receive -- this should be possible!
        jpair.send(Ok(json!([1, 2, 3]))).await.unwrap();
        assert_eq!(
            spair.next().await.transpose().unwrap(),
            Some("[1,2,3]".to_owned())
        );
    }
}