1mod config;
31#[cfg(test)]
32mod tests;
33
34pub use config::Config;
35use derive_new::new;
36
37use std::time::Instant;
38use std::{convert::Infallible, net::SocketAddr, sync::Arc, time::Duration};
39
40use bytes::Bytes;
41use chrono::Utc;
42use http_body_util::Full;
43use hyper::header::{CONTENT_LENGTH, CONTENT_TYPE};
44use hyper::server::conn::http1;
45use hyper::{
46 body::Incoming, http::response::Builder as ResponseBuilder, Method, Request, Response,
47 StatusCode,
48};
49use hyper_util::rt::TokioIo;
50use tokio::{
51 sync::watch,
52 task::JoinHandle,
53 time::{self, MissedTickBehavior},
54};
55use tracing::{debug, info, warn};
56
57use zebra_chain::{chain_sync_status::ChainSyncStatus, parameters::Network};
58use zebra_network::AddressBookPeers;
59
/// How often the background task recomputes the number of recently-live peers.
const PEER_METRICS_REFRESH_INTERVAL: Duration = Duration::from_millis(5_000);

/// Response body sent with `405 Method Not Allowed`.
const METHOD_NOT_ALLOWED_MSG: &str = "method not allowed";
/// Response body sent with `404 Not Found`.
const NOT_FOUND_MSG: &str = "not found";

/// Connection budget of the health server's load shedding: once this many
/// connections have been admitted since the counter was last reset, new
/// connections are dropped until [`RECENT_REQUEST_INTERVAL`] has passed.
const MAX_RECENT_REQUESTS: usize = 10_000;
/// Minimum time after a counter reset before an exhausted connection budget
/// may be reset again.
const RECENT_REQUEST_INTERVAL: Duration = Duration::from_millis(5_000);
70
71#[derive(Clone)]
72struct HealthCtx<SyncStatus>
73where
74 SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
75{
76 config: Config,
77 network: Network,
78 chain_tip_metrics_receiver: watch::Receiver<ChainTipMetrics>,
79 sync_status: SyncStatus,
80 num_live_peer_receiver: watch::Receiver<usize>,
81}
82
83#[derive(Debug, Clone, PartialEq, Eq, new)]
85pub struct ChainTipMetrics {
86 pub last_chain_tip_grow_time: Instant,
88 pub remaining_sync_blocks: Option<i64>,
90}
91
92impl ChainTipMetrics {
93 pub fn channel() -> (watch::Sender<Self>, watch::Receiver<Self>) {
95 watch::channel(Self {
96 last_chain_tip_grow_time: Instant::now(),
97 remaining_sync_blocks: None,
98 })
99 }
100}
101
102pub async fn init<AddressBook, SyncStatus>(
114 config: Config,
115 network: Network,
116 chain_tip_metrics_receiver: watch::Receiver<ChainTipMetrics>,
117 sync_status: SyncStatus,
118 address_book: AddressBook,
119) -> (JoinHandle<()>, Option<SocketAddr>)
120where
121 AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
122 SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
123{
124 let Some(listen_addr) = config.listen_addr else {
125 return (tokio::spawn(std::future::pending()), None);
126 };
127
128 info!("opening health endpoint at {listen_addr}...",);
129
130 let listener = tokio::net::TcpListener::bind(listen_addr)
131 .await
132 .unwrap_or_else(|e| panic!("Opening health endpoint listener {listen_addr:?} failed: {e:?}. Hint: Check if another zebrad is running, or change the health listen_addr in the config."));
133
134 let local = listener.local_addr().unwrap_or_else(|err| {
135 tracing::warn!(?err, "failed to read local addr from TcpListener");
136 listen_addr
137 });
138
139 info!("opened health endpoint at {}", local);
140
141 let (num_live_peer_sender, num_live_peer_receiver) = watch::channel(0);
142
143 if let Some(metrics) = num_live_peers(&address_book).await {
146 let _ = num_live_peer_sender.send(metrics);
147 }
148
149 let metrics_task = tokio::spawn(peer_metrics_refresh_task(
152 address_book.clone(),
153 num_live_peer_sender,
154 ));
155
156 let shared = Arc::new(HealthCtx {
157 config,
158 network,
159 chain_tip_metrics_receiver,
160 sync_status,
161 num_live_peer_receiver,
162 });
163
164 let server_task = tokio::spawn(run_health_server(listener, shared));
165
166 let task = tokio::spawn(async move {
169 tokio::select! {
170 _ = metrics_task => {},
171 _ = server_task => {},
172 }
173 });
174
175 (task, Some(local))
176}
177
178async fn handle_request<SyncStatus>(
179 req: Request<Incoming>,
180 ctx: Arc<HealthCtx<SyncStatus>>,
181) -> Result<Response<Full<Bytes>>, Infallible>
182where
183 SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
184{
185 if req.method() != Method::GET {
188 return Ok(simple_response(
189 StatusCode::METHOD_NOT_ALLOWED,
190 METHOD_NOT_ALLOWED_MSG,
191 ));
192 }
193
194 let path = req.uri().path();
195 let response = match path {
196 "/healthy" => healthy(&ctx).await,
197 "/ready" => ready(&ctx).await,
198 _ => simple_response(StatusCode::NOT_FOUND, NOT_FOUND_MSG),
199 };
200
201 Ok(response)
202}
203
204async fn healthy<SyncStatus>(ctx: &HealthCtx<SyncStatus>) -> Response<Full<Bytes>>
208where
209 SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
210{
211 if *ctx.num_live_peer_receiver.borrow() >= ctx.config.min_connected_peers {
212 simple_response(StatusCode::OK, "ok")
213 } else {
214 simple_response(StatusCode::SERVICE_UNAVAILABLE, "insufficient peers")
215 }
216}
217
218async fn ready<SyncStatus>(ctx: &HealthCtx<SyncStatus>) -> Response<Full<Bytes>>
222where
223 SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
224{
225 if !ctx.config.enforce_on_test_networks && ctx.network.is_a_test_network() {
226 return simple_response(StatusCode::OK, "ok");
227 }
228
229 if *ctx.num_live_peer_receiver.borrow() < ctx.config.min_connected_peers {
230 return simple_response(StatusCode::SERVICE_UNAVAILABLE, "insufficient peers");
231 }
232
233 if !ctx.sync_status.is_close_to_tip() {
236 return simple_response(StatusCode::SERVICE_UNAVAILABLE, "syncing");
237 }
238
239 let ChainTipMetrics {
240 last_chain_tip_grow_time,
241 remaining_sync_blocks,
242 } = ctx.chain_tip_metrics_receiver.borrow().clone();
243
244 let Some(remaining_sync_blocks) = remaining_sync_blocks else {
245 tracing::warn!("syncer is getting block hashes from peers, but state is empty");
246 return simple_response(StatusCode::SERVICE_UNAVAILABLE, "no tip");
247 };
248
249 let tip_age = last_chain_tip_grow_time.elapsed();
250 if tip_age > ctx.config.ready_max_tip_age {
251 return simple_response(
252 StatusCode::SERVICE_UNAVAILABLE,
253 &format!("tip_age={}s", tip_age.as_secs()),
254 );
255 }
256
257 if remaining_sync_blocks <= ctx.config.ready_max_blocks_behind {
258 simple_response(StatusCode::OK, "ok")
259 } else {
260 simple_response(
261 StatusCode::SERVICE_UNAVAILABLE,
262 &format!("lag={remaining_sync_blocks} blocks"),
263 )
264 }
265}
266
267async fn num_live_peers<A>(address_book: &A) -> Option<usize>
270where
271 A: AddressBookPeers + Clone + Send + Sync + 'static,
272{
273 let address_book = address_book.clone();
274 tokio::task::spawn_blocking(move || address_book.recently_live_peers(Utc::now()).len())
275 .await
276 .inspect_err(|err| warn!(?err, "failed to refresh peer metrics"))
277 .ok()
278}
279
280async fn peer_metrics_refresh_task<A>(address_book: A, num_live_peer_sender: watch::Sender<usize>)
283where
284 A: AddressBookPeers + Clone + Send + Sync + 'static,
285{
286 let mut interval = time::interval(PEER_METRICS_REFRESH_INTERVAL);
287 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
288
289 loop {
290 if let Some(metrics) = num_live_peers(&address_book).await {
293 if let Err(err) = num_live_peer_sender.send(metrics) {
294 tracing::warn!(?err, "failed to send to peer metrics channel");
295 break;
296 }
297 }
298
299 interval.tick().await;
300 }
301}
302
303async fn run_health_server<SyncStatus>(
304 listener: tokio::net::TcpListener,
305 shared: Arc<HealthCtx<SyncStatus>>,
306) where
307 SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
308{
309 let mut num_recent_requests: usize = 0;
310 let mut last_request_count_reset_time = Instant::now();
311
312 loop {
316 match listener.accept().await {
317 Ok((stream, _)) => {
318 if num_recent_requests < MAX_RECENT_REQUESTS {
319 num_recent_requests += 1;
320 } else if last_request_count_reset_time.elapsed() > RECENT_REQUEST_INTERVAL {
321 num_recent_requests = 0;
322 last_request_count_reset_time = Instant::now();
323 } else {
324 continue;
326 }
327
328 let io = TokioIo::new(stream);
329 let svc_ctx = shared.clone();
330 let service =
331 hyper::service::service_fn(move |req| handle_request(req, svc_ctx.clone()));
332
333 tokio::spawn(async move {
334 if let Err(err) = http1::Builder::new().serve_connection(io, service).await {
335 debug!(?err, "health server connection closed with error");
336 }
337 });
338 }
339 Err(err) => {
340 warn!(?err, "health server accept failed");
341 }
342 }
343 }
344}
345
346fn simple_response(status: StatusCode, body: &str) -> Response<Full<Bytes>> {
347 let bytes = Bytes::from(body.to_string());
348 let len = bytes.len();
349 ResponseBuilder::new()
350 .status(status)
351 .header(CONTENT_TYPE, "text/plain; charset=utf-8")
352 .header(CONTENT_LENGTH, len.to_string())
353 .body(Full::new(bytes))
354 .expect("valid response")
355}