1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
use futures::{stream::select, SinkExt, StreamExt};
use crate::{connector::passthru_pair, Pair};
/// Shares a single [`Pair`] between two handles.
///
/// Two fresh pass-through pairs are created (via `passthru_pair`, defined
/// elsewhere in the crate); one end of each is returned to the caller and the
/// other end is wired to `pair` by a background Tokio task:
///
/// * everything sent into either returned handle is merged into one stream
///   and forwarded to `pair`'s sink;
/// * everything produced by `pair`'s stream is cloned and delivered to both
///   returned handles (this is why `R: Clone` is required).
///
/// The background task stops as soon as either forwarding direction
/// finishes (successfully or with an error); the direction that is still
/// running is dropped at that point. The outcome is only reported through
/// `tracing::debug!`.
///
/// Because delivery to the two handles goes through a single fan-out sink,
/// a handle that is not being read can hold back delivery to the other one.
///
/// # Panics
///
/// Calls `tokio::spawn`, so it must be invoked from within a Tokio runtime.
pub fn copy<S, R>(pair: impl Pair<S, R> + 'static) -> (impl Pair<S, R>, impl Pair<S, R>)
where
    S: Send + 'static,
    R: Clone + Send + 'static,
{
    // The `*_handle` ends go back to the caller; the `*_inner` ends are owned
    // by the forwarding task below.
    let (left_handle, left_inner) = passthru_pair();
    let (right_handle, right_inner) = passthru_pair();

    tokio::spawn(async move {
        let (left_tx, left_rx) = left_inner.split();
        let (right_tx, right_rx) = right_inner.split();
        let (pair_tx, pair_rx) = pair.split();

        // Handles -> pair: interleave whatever either handle sends.
        let merged = select(left_rx, right_rx);
        let send_f = merged.map(Ok).forward(pair_tx);

        // Pair -> handles: every incoming item is cloned to both handles.
        let both = right_tx.fanout(left_tx);
        let rcv_f = pair_rx.map(Ok).forward(both);

        // Whichever direction finishes first ends the task; the other
        // future is dropped when `select!` returns.
        tokio::select! {
            res = send_f => {
                tracing::debug!("copy send_f exited with {res:?}");
            }
            res = rcv_f => {
                tracing::debug!("copy rcv_f exited with {res:?}");
            }
        }
        tracing::debug!("copy thread exiting");
    });

    (left_handle, right_handle)
}
#[cfg(test)]
mod test {
    use super::*;
    use crate::JsonRpcErr;
    use futures::{
        future::{try_join, try_join3},
        FutureExt,
    };

    /// Exercises `copy` in both directions.
    ///
    /// `origin` is the far end of the pair handed to `copy`; `first` and
    /// `second` are the two handles `copy` returns. The test checks that:
    ///
    /// * an item sent from `origin` is received by both handles;
    /// * an item sent from one handle reaches `origin` and is not echoed to
    ///   the other handle;
    /// * this holds whether the send and the receive are awaited one after
    ///   the other or driven concurrently;
    /// * items sent from both handles reach `origin` in either order.
    #[tokio::test]
    async fn test_copy() {
        let (mut origin, shared) = passthru_pair::<usize, usize, JsonRpcErr>();
        let (mut first, mut second) = copy(shared);

        // origin -> both handles, send awaited before the receives.
        origin.send(Ok(0)).await.unwrap();
        assert_eq!(first.next().await.transpose().unwrap(), Some(0));
        assert_eq!(second.next().await.transpose().unwrap(), Some(0));

        // origin -> both handles, send and receives driven concurrently.
        let (_, got_first, got_second) = try_join3(
            origin.send(Ok(1)),
            first.next().map(Ok),
            second.next().map(Ok),
        )
        .await
        .unwrap();
        assert_eq!(got_first.transpose().unwrap(), Some(1));
        assert_eq!(got_second.transpose().unwrap(), Some(1));

        // first -> origin, sequential; nothing must show up on `second`.
        first.send(Ok(2)).await.unwrap();
        assert_eq!(origin.next().await.transpose().unwrap(), Some(2));
        assert!(second.next().now_or_never().is_none());

        // second -> origin, sequential; nothing must show up on `first`.
        second.send(Ok(3)).await.unwrap();
        assert_eq!(origin.next().await.transpose().unwrap(), Some(3));
        assert!(first.next().now_or_never().is_none());

        // first -> origin, concurrent send/receive.
        let (_, got) = try_join(first.send(Ok(4)), origin.next().map(Ok))
            .await
            .unwrap();
        assert_eq!(got.transpose().unwrap(), Some(4));
        assert!(second.next().now_or_never().is_none());

        // second -> origin, concurrent send/receive.
        let (_, got) = try_join(second.send(Ok(5)), origin.next().map(Ok))
            .await
            .unwrap();
        assert_eq!(got.transpose().unwrap(), Some(5));
        assert!(first.next().now_or_never().is_none());

        // Both handles send before origin reads: the two items may arrive in
        // either order, so accept 6-then-7 or 7-then-6 (they sum to 13).
        first.send(Ok(6)).await.unwrap();
        second.send(Ok(7)).await.unwrap();
        match origin.next().await.transpose().unwrap() {
            Some(x) => {
                assert!(x == 6 || x == 7);
                assert_eq!(origin.next().await.transpose().unwrap(), Some(13 - x));
            }
            None => unreachable!(),
        }

        // Nothing further is immediately available on any end.
        assert!(origin.next().now_or_never().is_none());
        assert!(first.next().now_or_never().is_none());
        assert!(second.next().now_or_never().is_none());
    }
}