1use std::{fmt, panic};
11
12use cookie::Cookie;
13use jsonrpsee::server::{middleware::rpc::RpcServiceBuilder, Server, ServerHandle};
14use tokio::task::JoinHandle;
15use tracing::*;
16
17use zebra_chain::{chain_sync_status::ChainSyncStatus, chain_tip::ChainTip, parameters::Network};
18use zebra_consensus::router::service_trait::BlockVerifierService;
19use zebra_network::AddressBookPeers;
20use zebra_node_services::mempool::MempoolService;
21use zebra_state::{ReadState as ReadStateService, State as StateService};
22
23use crate::{
24 config,
25 methods::{RpcImpl, RpcServer as _},
26 server::{
27 http_request_compatibility::HttpRequestMiddlewareLayer,
28 rpc_call_compatibility::FixRpcResponseMiddleware, rpc_metrics::RpcMetricsMiddleware,
29 rpc_tracing::RpcTracingMiddleware,
30 },
31};
32
33pub mod cookie;
34pub mod error;
35pub mod http_request_compatibility;
36pub mod rpc_call_compatibility;
37pub mod rpc_metrics;
38pub mod rpc_tracing;
39
40#[cfg(test)]
41mod tests;
42
43#[derive(Clone)]
45pub struct RpcServer {
46 config: config::rpc::Config,
48
49 network: Network,
51
52 build_version: String,
54
55 close_handle: ServerHandle,
57}
58
59impl fmt::Debug for RpcServer {
60 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61 f.debug_struct("RpcServer")
62 .field("config", &self.config)
63 .field("network", &self.network)
64 .field("build_version", &self.build_version)
65 .field(
66 "close_handle",
67 &"ServerHandle",
69 )
70 .finish()
71 }
72}
73
/// Prefix of the log line emitted once the RPC endpoint has been opened.
///
/// [`RpcServer::start`] logs this string immediately followed by the local
/// address of the server, so the trailing space is significant.
pub const OPENED_RPC_ENDPOINT_MSG: &str = "Opened RPC endpoint at ";
76
77type ServerTask = JoinHandle<Result<(), tower::BoxError>>;
78
79impl RpcServer {
80 #[allow(clippy::too_many_arguments)]
95 pub async fn start<
96 Mempool,
97 State,
98 ReadState,
99 Tip,
100 BlockVerifierRouter,
101 SyncStatus,
102 AddressBook,
103 >(
104 rpc: RpcImpl<Mempool, State, ReadState, Tip, AddressBook, BlockVerifierRouter, SyncStatus>,
105 conf: config::rpc::Config,
106 ) -> Result<ServerTask, tower::BoxError>
107 where
108 Mempool: MempoolService,
109 State: StateService,
110 ReadState: ReadStateService,
111 Tip: ChainTip + Clone + Send + Sync + 'static,
112 AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
113 BlockVerifierRouter: BlockVerifierService,
114 SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
115 {
116 let listen_addr = conf
117 .listen_addr
118 .expect("caller should make sure listen_addr is set");
119
120 let http_middleware_layer = if conf.enable_cookie_auth {
121 let cookie = Cookie::default();
122 cookie::write_to_disk(&cookie, &conf.cookie_dir)
123 .expect("Zebra must be able to write the auth cookie to the disk");
124 HttpRequestMiddlewareLayer::new(Some(cookie))
125 } else {
126 HttpRequestMiddlewareLayer::new(None)
127 };
128
129 let http_middleware = tower::ServiceBuilder::new().layer(http_middleware_layer);
130
131 let rpc_middleware = RpcServiceBuilder::new()
132 .rpc_logger(1024)
133 .layer_fn(FixRpcResponseMiddleware::new)
134 .layer_fn(RpcMetricsMiddleware::new)
135 .layer_fn(RpcTracingMiddleware::new);
136
137 let server = Server::builder()
138 .http_only()
139 .set_http_middleware(http_middleware)
140 .set_rpc_middleware(rpc_middleware)
141 .max_response_body_size(
142 conf.max_response_body_size
143 .try_into()
144 .expect("should be valid"),
145 )
146 .build(listen_addr)
147 .await?;
148
149 info!("{OPENED_RPC_ENDPOINT_MSG}{}", server.local_addr()?);
150
151 Ok(tokio::spawn(async move {
152 server.start(rpc.into_rpc()).stopped().await;
153 Ok(())
154 }))
155 }
156
157 pub fn shutdown_blocking(&self) {
162 Self::shutdown_blocking_inner(self.close_handle.clone(), self.config.clone())
163 }
164
165 pub fn shutdown(&self) -> JoinHandle<()> {
168 let close_handle = self.close_handle.clone();
169 let config = self.config.clone();
170 let span = Span::current();
171
172 tokio::task::spawn_blocking(move || {
173 span.in_scope(|| Self::shutdown_blocking_inner(close_handle, config))
174 })
175 }
176
177 fn shutdown_blocking_inner(close_handle: ServerHandle, config: config::rpc::Config) {
181 let span = Span::current();
184 let wait_on_shutdown = move || {
185 span.in_scope(|| {
186 if config.enable_cookie_auth {
187 if let Err(err) = cookie::remove_from_disk(&config.cookie_dir) {
188 warn!(
189 ?err,
190 "unexpectedly could not remove the rpc auth cookie from the disk"
191 )
192 }
193 }
194
195 info!("Stopping RPC server");
196 let _ = close_handle.stop();
197 debug!("Stopped RPC server");
198 })
199 };
200
201 let span = Span::current();
202 let thread_handle = std::thread::spawn(wait_on_shutdown);
203
204 span.in_scope(|| match thread_handle.join() {
206 Ok(()) => (),
207 Err(panic_object) => panic::resume_unwind(panic_object),
208 })
209 }
210}
211
212impl Drop for RpcServer {
213 fn drop(&mut self) {
214 self.shutdown_blocking();
219 }
220}