Skip to main content

zebra_rpc/
server.rs

1//! A JSON-RPC 1.0 & 2.0 endpoint for Zebra.
2//!
3//! This endpoint is compatible with clients that incorrectly send
4//! `"jsonrpc" = 1.0` fields in JSON-RPC 1.0 requests,
5//! such as `lightwalletd`.
6//!
7//! See the full list of
8//! [Differences between JSON-RPC 1.0 and 2.0.](https://www.simple-is-better.org/rpc/#differences-between-1-0-and-2-0)
9
10use std::{fmt, panic};
11
12use cookie::Cookie;
13use jsonrpsee::server::{middleware::rpc::RpcServiceBuilder, Server, ServerHandle};
14use tokio::task::JoinHandle;
15use tracing::*;
16
17use zebra_chain::{
18    block::MAX_BLOCK_BYTES, chain_sync_status::ChainSyncStatus, chain_tip::ChainTip,
19    parameters::Network,
20};
21use zebra_consensus::router::service_trait::BlockVerifierService;
22use zebra_network::AddressBookPeers;
23use zebra_node_services::mempool::MempoolService;
24use zebra_state::{ReadState as ReadStateService, State as StateService};
25
26use crate::{
27    config,
28    methods::{RpcImpl, RpcServer as _},
29    server::{
30        http_request_compatibility::HttpRequestMiddlewareLayer,
31        rpc_call_compatibility::FixRpcResponseMiddleware, rpc_metrics::RpcMetricsMiddleware,
32        rpc_tracing::RpcTracingMiddleware,
33    },
34};
35
36pub mod cookie;
37pub mod error;
38pub mod http_request_compatibility;
39pub mod rpc_call_compatibility;
40pub mod rpc_metrics;
41pub mod rpc_tracing;
42
43#[cfg(test)]
44mod tests;
45
46/// Zebra RPC Server
47#[derive(Clone)]
48pub struct RpcServer {
49    /// The RPC config.
50    config: config::rpc::Config,
51
52    /// The configured network.
53    network: Network,
54
55    /// Zebra's application version, with build metadata.
56    build_version: String,
57
58    /// A server handle used to shuts down the RPC server.
59    close_handle: ServerHandle,
60}
61
62impl fmt::Debug for RpcServer {
63    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64        f.debug_struct("RpcServer")
65            .field("config", &self.config)
66            .field("network", &self.network)
67            .field("build_version", &self.build_version)
68            .field(
69                "close_handle",
70                // TODO: when it stabilises, use std::any::type_name_of_val(&self.close_handle)
71                &"ServerHandle",
72            )
73            .finish()
74    }
75}
76
77/// The message to log when logging the RPC server's listen address
78pub const OPENED_RPC_ENDPOINT_MSG: &str = "Opened RPC endpoint at ";
79
80type ServerTask = JoinHandle<Result<(), tower::BoxError>>;
81
82impl RpcServer {
83    /// Starts the RPC server.
84    ///
85    /// `build_version` and `user_agent` are version strings for the application,
86    /// which are used in RPC responses.
87    ///
88    /// Returns [`JoinHandle`]s for the RPC server and `sendrawtransaction` queue tasks,
89    /// and a [`RpcServer`] handle, which can be used to shut down the RPC server task.
90    ///
91    /// # Panics
92    ///
93    /// - If [`Config::listen_addr`](config::rpc::Config::listen_addr) is `None`.
94    //
95    // TODO:
96    // - replace VersionString with semver::Version, and update the tests to provide valid versions
97    #[allow(clippy::too_many_arguments)]
98    pub async fn start<
99        Mempool,
100        State,
101        ReadState,
102        Tip,
103        BlockVerifierRouter,
104        SyncStatus,
105        AddressBook,
106    >(
107        rpc: RpcImpl<Mempool, State, ReadState, Tip, AddressBook, BlockVerifierRouter, SyncStatus>,
108        conf: config::rpc::Config,
109    ) -> Result<ServerTask, tower::BoxError>
110    where
111        Mempool: MempoolService,
112        State: StateService,
113        ReadState: ReadStateService,
114        Tip: ChainTip + Clone + Send + Sync + 'static,
115        AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
116        BlockVerifierRouter: BlockVerifierService,
117        SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
118    {
119        let listen_addr = conf
120            .listen_addr
121            .expect("caller should make sure listen_addr is set");
122
123        // The largest RPC request is submitblock, which sends a full block
124        // as a hex string (2x MAX_BLOCK_BYTES) plus a small JSON-RPC wrapper.
125        let max_request_body_size = (MAX_BLOCK_BYTES as usize) * 2 + 1024;
126
127        let http_middleware_layer = if conf.enable_cookie_auth {
128            let cookie = Cookie::default();
129            cookie::write_to_disk(&cookie, &conf.cookie_dir)
130                .expect("Zebra must be able to write the auth cookie to the disk");
131            HttpRequestMiddlewareLayer::new(Some(cookie), max_request_body_size)
132        } else {
133            HttpRequestMiddlewareLayer::new(None, max_request_body_size)
134        };
135
136        let http_middleware = tower::ServiceBuilder::new().layer(http_middleware_layer);
137
138        let rpc_middleware = RpcServiceBuilder::new()
139            .rpc_logger(1024)
140            .layer_fn(FixRpcResponseMiddleware::new)
141            .layer_fn(RpcMetricsMiddleware::new)
142            .layer_fn(RpcTracingMiddleware::new);
143
144        let server = Server::builder()
145            .http_only()
146            .set_http_middleware(http_middleware)
147            .set_rpc_middleware(rpc_middleware)
148            .max_response_body_size(
149                conf.max_response_body_size
150                    .try_into()
151                    .expect("should be valid"),
152            )
153            .build(listen_addr)
154            .await?;
155
156        info!("{OPENED_RPC_ENDPOINT_MSG}{}", server.local_addr()?);
157
158        Ok(tokio::spawn(async move {
159            server.start(rpc.into_rpc()).stopped().await;
160            Ok(())
161        }))
162    }
163
164    /// Shut down this RPC server, blocking the current thread.
165    ///
166    /// This method can be called from within a tokio executor without panicking.
167    /// But it is blocking, so `shutdown()` should be used instead.
168    pub fn shutdown_blocking(&self) {
169        Self::shutdown_blocking_inner(self.close_handle.clone(), self.config.clone())
170    }
171
172    /// Shut down this RPC server asynchronously.
173    /// Returns a task that completes when the server is shut down.
174    pub fn shutdown(&self) -> JoinHandle<()> {
175        let close_handle = self.close_handle.clone();
176        let config = self.config.clone();
177        let span = Span::current();
178
179        tokio::task::spawn_blocking(move || {
180            span.in_scope(|| Self::shutdown_blocking_inner(close_handle, config))
181        })
182    }
183
184    /// Shuts down this RPC server using its `close_handle`.
185    ///
186    /// See `shutdown_blocking()` for details.
187    fn shutdown_blocking_inner(close_handle: ServerHandle, config: config::rpc::Config) {
188        // The server is a blocking task, so it can't run inside a tokio thread.
189        // See the note at wait_on_server.
190        let span = Span::current();
191        let wait_on_shutdown = move || {
192            span.in_scope(|| {
193                if config.enable_cookie_auth {
194                    if let Err(err) = cookie::remove_from_disk(&config.cookie_dir) {
195                        warn!(
196                            ?err,
197                            "unexpectedly could not remove the rpc auth cookie from the disk"
198                        )
199                    }
200                }
201
202                info!("Stopping RPC server");
203                let _ = close_handle.stop();
204                debug!("Stopped RPC server");
205            })
206        };
207
208        let span = Span::current();
209        let thread_handle = std::thread::spawn(wait_on_shutdown);
210
211        // Propagate panics from the inner std::thread to the outer tokio blocking task
212        span.in_scope(|| match thread_handle.join() {
213            Ok(()) => (),
214            Err(panic_object) => panic::resume_unwind(panic_object),
215        })
216    }
217}
218
219impl Drop for RpcServer {
220    fn drop(&mut self) {
221        // Block on shutting down, propagating panics.
222        // This can take around 150 seconds.
223        //
224        // Without this shutdown, Zebra's RPC unit tests sometimes crashed with memory errors.
225        self.shutdown_blocking();
226    }
227}