1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
use futures::future::{ready, Ready};
use tokio::task::JoinHandle;
use crate::Offchain;
mod ava;
mod dump;
mod echo;
mod eth;
mod pass;
pub use ava::ava;
pub use dump::dump;
pub use echo::echo;
pub use eth::{eth, eth_pair};
pub use pass::{pass, pass_pair};
/// Adapts a handler that returns a [`JoinHandle`] into one that returns an
/// already-completed `()` future.
///
/// Each call invokes `f` with the given [`Offchain`], drops the resulting
/// handle straight away, and returns `ready(())`. The task's result `R` is
/// therefore never observed by the caller of the returned closure.
pub fn drop_join_handle<R>(
    f: impl Fn(Offchain) -> JoinHandle<R>,
) -> impl Fn(Offchain) -> Ready<()> {
    move |offchain| {
        let handle = f(offchain);
        // The handle is deliberately discarded rather than awaited.
        drop(handle);
        ready(())
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use crate::jrpc::{no_response, Id, IdReq, Request};
    use crate::{jrpc_client, listen_on_addr, JsonRpcErr};
    use futures::{FutureExt, SinkExt, StreamExt};
    use rstest::rstest;
    use serde_json::{json, Value};
    use std::net::SocketAddr;

    // Chains client -> `pass` -> `echo` and checks what comes back, for every
    // combination of HTTP/WebSocket on the echo side and on the client side.
    #[rstest]
    #[case::http_http(false, false)]
    #[case::http_ws(false, true)]
    #[case::ws_http(true, false)]
    #[case::ws_ws(true, true)]
    #[tokio::test]
    async fn test_pass_echo(#[case] ws_echo: bool, #[case] ws_client: bool) {
        // Both listeners are requested on loopback with port 0; the ports
        // actually used are read back from the returned addresses.
        let loopback = SocketAddr::from(([127, 0, 0, 1], 0));

        // Echo side: listener plus the URI that `pass` will forward to.
        let (echo_clients, echo_addr) = listen_on_addr(loopback);
        let echo_port = echo_addr.port();
        let echo_url = if ws_echo {
            format!("ws://127.0.0.1:{}", echo_port)
        } else {
            format!("http://127.0.0.1:{}", echo_port)
        };
        let echo_uri: hyper::Uri = echo_url.try_into().unwrap();
        let call_echo = drop_join_handle(echo);

        // Pass side: every incoming connection is handed to `pass` together
        // with its own copy of the echo URI.
        let (pass_clients, pass_addr) = listen_on_addr(loopback);
        let call_pass = drop_join_handle(move |c| pass(c, echo_uri.clone()));

        // Drive both accept loops in the background for the rest of the test.
        tokio::spawn(async move {
            tokio::join!(
                echo_clients.for_each_concurrent(None, call_echo),
                pass_clients.for_each_concurrent(None, call_pass)
            )
        });

        // Connect the client to the pass listener using the requested scheme.
        let pass_port = pass_addr.port();
        let client_url = if ws_client {
            format!("ws://127.0.0.1:{}", pass_port)
        } else {
            format!("http://127.0.0.1:{}", pass_port)
        };
        let uri = client_url.parse().unwrap();
        let mut client = jrpc_client(uri, None).await.unwrap();

        // 1. An error message sent by the client must come back as a Jrpc
        //    error, with nothing else queued behind it.
        client
            .send(Err(no_response(Some(Value::Null))))
            .await
            .unwrap();
        let first = client.next().await;
        assert!(matches!(first, Some(Err(JsonRpcErr::Jrpc(_)))));
        assert!(client.next().now_or_never().is_none());

        // 2. A request with an id must be returned unchanged.
        let req = Request::with_params(Id::from(4), "test_1", Some(json!([1, 2, 3])));
        client.send(Ok(req.clone())).await.unwrap();
        let echoed = client.next().await.unwrap().unwrap();
        let resp = Request::try_from(echoed).unwrap();
        assert_eq!(req, resp);
        assert!(client.next().now_or_never().is_none());

        // 3. A notification: a non-WebSocket client is expected to receive a
        //    Jrpc error with code -32004; a WebSocket client is expected to
        //    have nothing waiting.
        let notification =
            Request::with_params(IdReq::Notification, "test_2", Some(json!("asdf")));
        client.send(Ok(notification)).await.unwrap();
        if !ws_client {
            match client.next().await.unwrap().unwrap_err() {
                JsonRpcErr::Jrpc(resp) => assert_eq!(
                    (resp.id, resp.error.code),
                    (IdReq::Notification, (-32004).into())
                ),
                _ => unreachable!("error was not Jrpc"),
            }
        }
        assert!(client.next().now_or_never().is_none());
    }
}