1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#![warn(
missing_docs,
nonstandard_style,
rust_2021_compatibility,
rust_2018_idioms,
clippy::unnested_or_patterns,
clippy::redundant_closure_for_method_calls
)]
pub mod connector;
mod error;
mod jrpc;
mod offchain;
mod onchain;
pub mod pipeline;
pub mod transformer;
pub use error::{FatalErr, HttpRrErr, JsonRpcErr};
pub use jrpc::Request as JrpcRequest;
pub use offchain::{listen_on, listen_on_addr, Offchain};
pub use onchain::{conn_pool, jrpc_client, rr_client, ConnPool};
use futures::{Sink, Stream};
pub trait Pair<S, R = S, E = JsonRpcErr>:
Stream<Item = Result<R, E>> + Sink<Result<S, E>, Error = FatalErr> + Send
{
}
impl<P, S, R, E> Pair<S, R, E> for P where
P: Stream<Item = Result<R, E>> + Sink<Result<S, E>, Error = FatalErr> + Send
{
}
#[macro_export]
macro_rules! listen_all {
{ $(($id: ident, $port: expr, $call: expr)),+ } => {
paste::paste! {
$(
let mut [<$id _clients>] = $crate::listen_on($port);
tracing::info!("Listening on port {} for pipeline {}", $port, stringify!($call));
)+
loop {
tokio::select! {
$(
[<$id _client>] = [<$id _clients>].next() => {
match [<$id _client>] {
None => {
tracing::debug!("{} exited", stringify!($id));
break
}
Some(c) => $call(c),
};
}
)+
}
}
}
}
}