1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
/*! Library-wide error types
 *
 *  Each "pipeline type" (i.e., JsonRpc, HttpRr) has a corresponding error
 *  type. The currently defined error types for pipelines are:
 *
 *  - JsonRpcErr represents an error processing a request in a pipeline that
 *    carries JSON-RPC values. This is the error type that a Pair<S, R>
 *    carries by default.
 *
 *  - HttpRrErr represents an error processing a request in a pipeline that
 *    carries HTTP Request/Response values.
 *
 *  In addition, there's an over-arching error type:
 *
 *  - FatalErr represents an unrecoverable error. Pair<S, R>'s Sink produces this
 *    error if something goes wrong when writing data to the sink, for example.
 *
 *  Because of the behavior of several combinators in StreamExt and SinkExt
 *  (notably, StreamExt::forward), it is important that we **do not implement**
 *  automatic JsonRpcErr -> FatalErr or FatalErr -> JsonRpcErr conversions (and likewise
 *  for OffchainErr and any future errors of this sort). The issue is that
 *  StreamExt::forward assumes a Stream<Result<T, E>> will be writing into
 *  a Sink<T, Error = E>, so if you accidentally supply a Sink<T, Error = E>,
 *  StreamExt::forward will eat errors rather than passing them downstream.
 *  A Sink with Error != E will cause a type error in this case.
 *
 *  Notes:
 *
 *  1. If you want to write from a Stream<Item = Result<T, E>> into a
 *  Sink<Result<T, E>> using StreamExt::forward, the right way to do it is
 *  `stream.map(Ok).forward(sink)`. This ensures that the Result value, not
 *  just the T values, go into the sink.
 *
 *  2.  JsonRpcErr and HttpRrErr can carry a FatalErr along. This is
 *  to make it possible for Stream adaptors to generate fatal errors in the
 *  pipeline. The idea is that the FatalErr gets carried along until it hits
 *  `Offchain` (or possibly other elements in a pipeline), at which point the
 *  pipeline gets killed.
 */

use ethers::signers::WalletError;
use futures::channel::mpsc;
use hyper::{http::StatusCode, Response};
use serde_json::Error as JsonParseError;
use std::sync::Arc;
use thiserror::Error;
use tokio_tungstenite::tungstenite::Error as WsError;
use url::ParseError;

use crate::{
    jrpc::{parse_error, Error as JrpcError},
    offchain::{json_string, response},
};

// implement From for something that we stuff into an Arc
macro_rules! into_arc_variant {
    ($e: ty, $t: ty, $v: ident) => {
        impl From<$t> for $e {
            fn from(other: $t) -> Self {
                Self::$v(Arc::new(other))
            }
        }
    };
}

/// Failure of an entire pipeline.
#[derive(Clone, Debug, Error)]
pub enum FatalErr {
    /// Failed to write to mpsc channel
    #[error("Writing to mpsc channel: {0}")]
    Channel(#[from] mpsc::SendError),

    /// Failed to write to a WebSocket channel
    #[error("Writing to websocket: {0}")]
    WebSocket(#[from] Arc<WsError>),

    /// Failed to write to a WebSocket channel
    #[error("Parsing URL: {0}")]
    InvalidURL(#[from] ParseError),

    /// Unsupported Onchain URI scheme
    #[error("Unsupported URI scheme: {0:?}")]
    UriScheme(Option<String>),

    /// Stream closed unexpectedly
    #[error("Stream closed unexpectedly")]
    Closed,

    /// Error creating wallet
    #[error("Creating wallet: {0:?}")]
    Wallet(#[from] Arc<WalletError>),

    /// Error from cubist config (e.g., reading secrets)
    #[error("Secret read error: {0}")]
    ReadSecretError(String),
}

into_arc_variant!(FatalErr, WsError, WebSocket);
into_arc_variant!(FatalErr, WalletError, Wallet);

/// An error message for a JSON-RPC processing pipeline.
/// This is either a wrapper around `serde_json::Value`
/// or a fatal error that should kill the pipeline.
///
/// The intent is that elements in a processing pipeline
/// should generate descriptive error messages and wrap
/// them in JsonRpcErr.
#[derive(Clone, Debug, Error)]
pub enum JsonRpcErr {
    /// JSON-RPC error response
    #[error("JSON-RPC error: {0}")]
    Jrpc(#[from] JrpcError),

    /// A fatal error that kills the pipeline
    // NOTE: do not implement From<FatalErr>! Explicit, manual conversions only.
    #[error("Fatal error: {0}")]
    Fatal(FatalErr),
}

impl From<JsonParseError> for JsonRpcErr {
    fn from(other: JsonParseError) -> Self {
        parse_error(other.to_string())
    }
}

/// An error message for an HTTP Request/Response pipeline.
/// This is either a wrapper around a Result or a fatal error
/// that should kill the pipeline.
///
/// As with JsonRpcErr, the idea is that individual elements of
/// the pipeline should generate detailed error messages and then
/// pass them along the pipeline wrapped in HttpRrErr.
#[derive(Debug, Error)]
pub enum HttpRrErr {
    /// HTTP error Response
    #[error("HTTP error response: {0:?}")]
    Http(Response<String>),

    /// A fatal error that kills the pipeline
    // NOTE: do not implement From<FatalErr>! Explicit, manual conversions only.
    #[error("Fatal error: {0}")]
    Fatal(FatalErr),
}

impl From<JsonParseError> for HttpRrErr {
    fn from(other: JsonParseError) -> Self {
        response(StatusCode::BAD_REQUEST, other.to_string()).into()
    }
}

impl From<Response<String>> for HttpRrErr {
    fn from(other: Response<String>) -> Self {
        Self::Http(other)
    }
}

impl From<JsonRpcErr> for HttpRrErr {
    fn from(other: JsonRpcErr) -> Self {
        match other {
            JsonRpcErr::Fatal(f) => Self::Fatal(f),
            JsonRpcErr::Jrpc(v) => {
                if v.id.is_notification() {
                    Self::Http(response(StatusCode::NO_CONTENT, "".to_owned()))
                } else {
                    Self::Http(response(StatusCode::OK, json_string(&v.into())))
                }
            }
        }
    }
}