1use std::{fmt, panic};
11
12use cookie::Cookie;
13use jsonrpsee::server::{middleware::rpc::RpcServiceBuilder, Server, ServerHandle};
14use tokio::task::JoinHandle;
15use tracing::*;
16
17use zebra_chain::{
18 block::MAX_BLOCK_BYTES, chain_sync_status::ChainSyncStatus, chain_tip::ChainTip,
19 parameters::Network,
20};
21use zebra_consensus::router::service_trait::BlockVerifierService;
22use zebra_network::AddressBookPeers;
23use zebra_node_services::mempool::MempoolService;
24use zebra_state::{ReadState as ReadStateService, State as StateService};
25
26use crate::{
27 config,
28 methods::{RpcImpl, RpcServer as _},
29 server::{
30 http_request_compatibility::HttpRequestMiddlewareLayer,
31 rpc_call_compatibility::FixRpcResponseMiddleware, rpc_metrics::RpcMetricsMiddleware,
32 rpc_tracing::RpcTracingMiddleware,
33 },
34};
35
36pub mod cookie;
37pub mod error;
38pub mod http_request_compatibility;
39pub mod rpc_call_compatibility;
40pub mod rpc_metrics;
41pub mod rpc_tracing;
42
43#[cfg(test)]
44mod tests;
45
46#[derive(Clone)]
48pub struct RpcServer {
49 config: config::rpc::Config,
51
52 network: Network,
54
55 build_version: String,
57
58 close_handle: ServerHandle,
60}
61
62impl fmt::Debug for RpcServer {
63 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64 f.debug_struct("RpcServer")
65 .field("config", &self.config)
66 .field("network", &self.network)
67 .field("build_version", &self.build_version)
68 .field(
69 "close_handle",
70 &"ServerHandle",
72 )
73 .finish()
74 }
75}
76
77pub const OPENED_RPC_ENDPOINT_MSG: &str = "Opened RPC endpoint at ";
79
80type ServerTask = JoinHandle<Result<(), tower::BoxError>>;
81
82impl RpcServer {
83 #[allow(clippy::too_many_arguments)]
98 pub async fn start<
99 Mempool,
100 State,
101 ReadState,
102 Tip,
103 BlockVerifierRouter,
104 SyncStatus,
105 AddressBook,
106 >(
107 rpc: RpcImpl<Mempool, State, ReadState, Tip, AddressBook, BlockVerifierRouter, SyncStatus>,
108 conf: config::rpc::Config,
109 ) -> Result<ServerTask, tower::BoxError>
110 where
111 Mempool: MempoolService,
112 State: StateService,
113 ReadState: ReadStateService,
114 Tip: ChainTip + Clone + Send + Sync + 'static,
115 AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
116 BlockVerifierRouter: BlockVerifierService,
117 SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
118 {
119 let listen_addr = conf
120 .listen_addr
121 .expect("caller should make sure listen_addr is set");
122
123 let max_request_body_size = (MAX_BLOCK_BYTES as usize) * 2 + 1024;
126
127 let http_middleware_layer = if conf.enable_cookie_auth {
128 let cookie = Cookie::default();
129 cookie::write_to_disk(&cookie, &conf.cookie_dir)
130 .expect("Zebra must be able to write the auth cookie to the disk");
131 HttpRequestMiddlewareLayer::new(Some(cookie), max_request_body_size)
132 } else {
133 HttpRequestMiddlewareLayer::new(None, max_request_body_size)
134 };
135
136 let http_middleware = tower::ServiceBuilder::new().layer(http_middleware_layer);
137
138 let rpc_middleware = RpcServiceBuilder::new()
139 .rpc_logger(1024)
140 .layer_fn(FixRpcResponseMiddleware::new)
141 .layer_fn(RpcMetricsMiddleware::new)
142 .layer_fn(RpcTracingMiddleware::new);
143
144 let server = Server::builder()
145 .http_only()
146 .set_http_middleware(http_middleware)
147 .set_rpc_middleware(rpc_middleware)
148 .max_response_body_size(
149 conf.max_response_body_size
150 .try_into()
151 .expect("should be valid"),
152 )
153 .build(listen_addr)
154 .await?;
155
156 info!("{OPENED_RPC_ENDPOINT_MSG}{}", server.local_addr()?);
157
158 Ok(tokio::spawn(async move {
159 server.start(rpc.into_rpc()).stopped().await;
160 Ok(())
161 }))
162 }
163
164 pub fn shutdown_blocking(&self) {
169 Self::shutdown_blocking_inner(self.close_handle.clone(), self.config.clone())
170 }
171
172 pub fn shutdown(&self) -> JoinHandle<()> {
175 let close_handle = self.close_handle.clone();
176 let config = self.config.clone();
177 let span = Span::current();
178
179 tokio::task::spawn_blocking(move || {
180 span.in_scope(|| Self::shutdown_blocking_inner(close_handle, config))
181 })
182 }
183
184 fn shutdown_blocking_inner(close_handle: ServerHandle, config: config::rpc::Config) {
188 let span = Span::current();
191 let wait_on_shutdown = move || {
192 span.in_scope(|| {
193 if config.enable_cookie_auth {
194 if let Err(err) = cookie::remove_from_disk(&config.cookie_dir) {
195 warn!(
196 ?err,
197 "unexpectedly could not remove the rpc auth cookie from the disk"
198 )
199 }
200 }
201
202 info!("Stopping RPC server");
203 let _ = close_handle.stop();
204 debug!("Stopped RPC server");
205 })
206 };
207
208 let span = Span::current();
209 let thread_handle = std::thread::spawn(wait_on_shutdown);
210
211 span.in_scope(|| match thread_handle.join() {
213 Ok(()) => (),
214 Err(panic_object) => panic::resume_unwind(panic_object),
215 })
216 }
217}
218
219impl Drop for RpcServer {
220 fn drop(&mut self) {
221 self.shutdown_blocking();
226 }
227}