// zebra_rpc/server.rs
1//! A JSON-RPC 1.0 & 2.0 endpoint for Zebra.
2//!
3//! This endpoint is compatible with clients that incorrectly send
4//! `"jsonrpc" = 1.0` fields in JSON-RPC 1.0 requests,
5//! such as `lightwalletd`.
6//!
7//! See the full list of
8//! [Differences between JSON-RPC 1.0 and 2.0.](https://www.simple-is-better.org/rpc/#differences-between-1-0-and-2-0)
9
10use std::{fmt, panic};
11
12use cookie::Cookie;
13use jsonrpsee::server::{middleware::rpc::RpcServiceBuilder, Server, ServerHandle};
14use tokio::task::JoinHandle;
15use tracing::*;
16
17use zebra_chain::{chain_sync_status::ChainSyncStatus, chain_tip::ChainTip, parameters::Network};
18use zebra_consensus::router::service_trait::BlockVerifierService;
19use zebra_network::AddressBookPeers;
20use zebra_node_services::mempool::MempoolService;
21use zebra_state::{ReadState as ReadStateService, State as StateService};
22
23use crate::{
24    config,
25    methods::{RpcImpl, RpcServer as _},
26    server::{
27        http_request_compatibility::HttpRequestMiddlewareLayer,
28        rpc_call_compatibility::FixRpcResponseMiddleware, rpc_metrics::RpcMetricsMiddleware,
29        rpc_tracing::RpcTracingMiddleware,
30    },
31};
32
33pub mod cookie;
34pub mod error;
35pub mod http_request_compatibility;
36pub mod rpc_call_compatibility;
37pub mod rpc_metrics;
38pub mod rpc_tracing;
39
40#[cfg(test)]
41mod tests;
42
/// Zebra RPC Server
///
/// Bundles the RPC configuration with a handle that can stop the running
/// server. Cloning this struct clones the handle; all clones control the
/// same underlying server.
#[derive(Clone)]
pub struct RpcServer {
    /// The RPC config.
    config: config::rpc::Config,

    /// The configured network.
    network: Network,

    /// Zebra's application version, with build metadata.
    build_version: String,

    /// A server handle used to shut down the RPC server.
    close_handle: ServerHandle,
}
58
59impl fmt::Debug for RpcServer {
60    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61        f.debug_struct("RpcServer")
62            .field("config", &self.config)
63            .field("network", &self.network)
64            .field("build_version", &self.build_version)
65            .field(
66                "close_handle",
67                // TODO: when it stabilises, use std::any::type_name_of_val(&self.close_handle)
68                &"ServerHandle",
69            )
70            .finish()
71    }
72}
73
/// The message to log when logging the RPC server's listen address
pub const OPENED_RPC_ENDPOINT_MSG: &str = "Opened RPC endpoint at ";

/// A [`JoinHandle`] for the spawned RPC server task, which resolves when the
/// server stops.
type ServerTask = JoinHandle<Result<(), tower::BoxError>>;
78
impl RpcServer {
    /// Starts the RPC server.
    ///
    /// `rpc` is the RPC method implementation that handles incoming calls, and
    /// `conf` is the RPC section of Zebra's config.
    ///
    /// If cookie authentication is enabled in `conf`, a fresh auth cookie is
    /// generated and written to disk before the server starts listening.
    ///
    /// Returns a [`JoinHandle`] for the spawned server task; the task resolves
    /// with `Ok(())` once the server has fully stopped.
    ///
    /// # Panics
    ///
    /// - If [`Config::listen_addr`](config::rpc::Config::listen_addr) is `None`.
    /// - If the auth cookie cannot be written to disk.
    /// - If `conf.max_response_body_size` cannot be converted to the size type
    ///   expected by the server builder.
    //
    // TODO:
    // - replace VersionString with semver::Version, and update the tests to provide valid versions
    #[allow(clippy::too_many_arguments)]
    pub async fn start<
        Mempool,
        State,
        ReadState,
        Tip,
        BlockVerifierRouter,
        SyncStatus,
        AddressBook,
    >(
        rpc: RpcImpl<Mempool, State, ReadState, Tip, AddressBook, BlockVerifierRouter, SyncStatus>,
        conf: config::rpc::Config,
    ) -> Result<ServerTask, tower::BoxError>
    where
        Mempool: MempoolService,
        State: StateService,
        ReadState: ReadStateService,
        Tip: ChainTip + Clone + Send + Sync + 'static,
        AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
        BlockVerifierRouter: BlockVerifierService,
        SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
    {
        // The caller must only start the server when an endpoint is configured.
        let listen_addr = conf
            .listen_addr
            .expect("caller should make sure listen_addr is set");

        // Optionally require cookie authentication on incoming HTTP requests.
        let http_middleware_layer = if conf.enable_cookie_auth {
            let cookie = Cookie::default();
            cookie::write_to_disk(&cookie, &conf.cookie_dir)
                .expect("Zebra must be able to write the auth cookie to the disk");
            HttpRequestMiddlewareLayer::new(Some(cookie))
        } else {
            HttpRequestMiddlewareLayer::new(None)
        };

        let http_middleware = tower::ServiceBuilder::new().layer(http_middleware_layer);

        // Per-call RPC middleware: response fixups for JSON-RPC 1.0 clients,
        // metrics, and tracing.
        let rpc_middleware = RpcServiceBuilder::new()
            .rpc_logger(1024)
            .layer_fn(FixRpcResponseMiddleware::new)
            .layer_fn(RpcMetricsMiddleware::new)
            .layer_fn(RpcTracingMiddleware::new);

        let server = Server::builder()
            .http_only()
            .set_http_middleware(http_middleware)
            .set_rpc_middleware(rpc_middleware)
            .max_response_body_size(
                conf.max_response_body_size
                    .try_into()
                    .expect("should be valid"),
            )
            .build(listen_addr)
            .await?;

        info!("{OPENED_RPC_ENDPOINT_MSG}{}", server.local_addr()?);

        // Run the server until it is stopped, then resolve the spawned task.
        Ok(tokio::spawn(async move {
            server.start(rpc.into_rpc()).stopped().await;
            Ok(())
        }))
    }

    /// Shut down this RPC server, blocking the current thread.
    ///
    /// This method can be called from within a tokio executor without panicking.
    /// But it is blocking, so `shutdown()` should be used instead.
    pub fn shutdown_blocking(&self) {
        Self::shutdown_blocking_inner(self.close_handle.clone(), self.config.clone())
    }

    /// Shut down this RPC server asynchronously.
    /// Returns a task that completes when the server is shut down.
    pub fn shutdown(&self) -> JoinHandle<()> {
        let close_handle = self.close_handle.clone();
        let config = self.config.clone();
        let span = Span::current();

        // `shutdown_blocking_inner` blocks, so run it on tokio's blocking
        // thread pool, keeping the current tracing span.
        tokio::task::spawn_blocking(move || {
            span.in_scope(|| Self::shutdown_blocking_inner(close_handle, config))
        })
    }

    /// Shuts down this RPC server using its `close_handle`.
    ///
    /// See `shutdown_blocking()` for details.
    fn shutdown_blocking_inner(close_handle: ServerHandle, config: config::rpc::Config) {
        // The server is a blocking task, so it can't run inside a tokio thread.
        // See the note at wait_on_server.
        let span = Span::current();
        let wait_on_shutdown = move || {
            span.in_scope(|| {
                // Best-effort cleanup of the auth cookie: a failure is only
                // logged, because shutdown should proceed regardless.
                if config.enable_cookie_auth {
                    if let Err(err) = cookie::remove_from_disk(&config.cookie_dir) {
                        warn!(
                            ?err,
                            "unexpectedly could not remove the rpc auth cookie from the disk"
                        )
                    }
                }

                info!("Stopping RPC server");
                // `stop()` errors if the server has already stopped; that is
                // ignored here.
                let _ = close_handle.stop();
                debug!("Stopped RPC server");
            })
        };

        let span = Span::current();
        let thread_handle = std::thread::spawn(wait_on_shutdown);

        // Propagate panics from the inner std::thread to the outer tokio blocking task
        span.in_scope(|| match thread_handle.join() {
            Ok(()) => (),
            Err(panic_object) => panic::resume_unwind(panic_object),
        })
    }
}
211
impl Drop for RpcServer {
    fn drop(&mut self) {
        // Block on shutting down, propagating panics.
        // This can take around 150 seconds.
        //
        // Without this shutdown, Zebra's RPC unit tests sometimes crashed with memory errors.
        //
        // NOTE(review): every clone of `RpcServer` triggers this shutdown when
        // dropped, since `Clone` is derived — confirm that is intended.
        self.shutdown_blocking();
    }
}