1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#![warn(
    missing_docs,
    nonstandard_style,
    rust_2021_compatibility,
    rust_2018_idioms,
    clippy::unnested_or_patterns,
    clippy::redundant_closure_for_method_calls
)]

//! rpcmux: framework for muxing TCP connections

pub mod connector;
mod error;
mod jrpc;
mod offchain;
mod onchain;
pub mod pipeline;
pub mod transformer;

pub use error::{FatalErr, HttpRrErr, JsonRpcErr};
pub use jrpc::Request as JrpcRequest;
pub use offchain::{listen_on, listen_on_addr, Offchain};
pub use onchain::{conn_pool, jrpc_client, rr_client, ConnPool};

use futures::{Sink, Stream};

/// A Sink/Stream pair in an RPC muxing pipeline
pub trait Pair<S, R = S, E = JsonRpcErr>:
    Stream<Item = Result<R, E>> + Sink<Result<S, E>, Error = FatalErr> + Send
{
}

impl<P, S, R, E> Pair<S, R, E> for P where
    P: Stream<Item = Result<R, E>> + Sink<Result<S, E>, Error = FatalErr> + Send
{
}

/// Set up one or more listeners and handle all of them in a loop
#[macro_export]
macro_rules! listen_all {
    { $(($id: ident, $port: expr, $call: expr)),+ } => {
        paste::paste! {
            $(
                let mut [<$id _clients>] = $crate::listen_on($port);
                tracing::info!("Listening on port {} for pipeline {}", $port, stringify!($call));
            )+

            loop {
                tokio::select! {
                    $(
                        [<$id _client>] = [<$id _clients>].next() => {
                            match [<$id _client>] {
                                None => {
                                    tracing::debug!("{} exited", stringify!($id));
                                    break
                                }
                                Some(c) => $call(c),
                            };
                        }
                    )+
                }
            }
        }
    }
}