Skip to main content

zebrad/components/
health.rs

1//! HTTP health and readiness endpoints for `zebrad`.
2//!
3//! Overview
4//!
5//! - This module exposes two small HTTP/1.1 endpoints for basic liveness/readiness checks,
6//!   suitable for Kubernetes probes and load balancers.
7//! - Endpoints are opt-in, disabled by default. Enable by setting a `listen_addr` in the
8//!   `health` config section.
9//! - Plain-text responses and small responses keep the checks fast and safe.
10//!
11//! Endpoints
12//!
13//! - `GET /healthy` — returns `200 OK` if the process is up and the node has at least
14//!   `min_connected_peers` recently live peers (default: 1). Otherwise `503 Service Unavailable`.
15//! - `GET /ready` — returns `200 OK` if the node is near the chain tip, the estimated block lag is
16//!   within `ready_max_blocks_behind`, and the latest committed block is recent. On regtest/testnet,
17//!   readiness returns `200` unless `enforce_on_test_networks` is set.
18//!
19//! Security
20//!
21//! - Endpoints are unauthenticated by design. Bind to internal network interfaces,
22//!   and restrict exposure using network policy, firewall rules, and service configuration.
23//! - The server does not access or return private data. It only summarises coarse node state.
24//!
25//! Configuration and examples
26//!
27//! - See the Zebra Book for configuration details and Kubernetes probe examples:
28//!   <https://zebra.zfnd.org/user/health.html>
29
30mod config;
31#[cfg(test)]
32mod tests;
33
34pub use config::Config;
35use derive_new::new;
36
37use std::time::Instant;
38use std::{convert::Infallible, net::SocketAddr, sync::Arc, time::Duration};
39
40use bytes::Bytes;
41use chrono::Utc;
42use http_body_util::Full;
43use hyper::header::{CONTENT_LENGTH, CONTENT_TYPE};
44use hyper::server::conn::http1;
45use hyper::{
46    body::Incoming, http::response::Builder as ResponseBuilder, Method, Request, Response,
47    StatusCode,
48};
49use hyper_util::rt::TokioIo;
50use tokio::{
51    sync::watch,
52    task::JoinHandle,
53    time::{self, MissedTickBehavior},
54};
55use tracing::{debug, info, warn};
56
57use zebra_chain::{chain_sync_status::ChainSyncStatus, parameters::Network};
58use zebra_network::AddressBookPeers;
59
60// Refresh peers on a short cadence so the cached snapshot tracks live network
61// conditions closely without hitting the address book mutex on every request.
62const PEER_METRICS_REFRESH_INTERVAL: Duration = Duration::from_secs(5);
63
64const METHOD_NOT_ALLOWED_MSG: &str = "method not allowed";
65const NOT_FOUND_MSG: &str = "not found";
66
67/// The maximum number of requests that will be handled in a given time interval before requests are dropped.
68const MAX_RECENT_REQUESTS: usize = 10_000;
69const RECENT_REQUEST_INTERVAL: Duration = Duration::from_secs(5);
70
/// Shared, cheaply-clonable state handed to every request handler.
///
/// All fields are either `Clone` or watch receivers, so handlers can read the
/// latest cached snapshots without taking locks on the request path.
#[derive(Clone)]
struct HealthCtx<SyncStatus>
where
    SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
{
    /// Health endpoint configuration (listen address and readiness thresholds).
    config: Config,
    /// The network this node runs on; readiness is relaxed on test networks.
    network: Network,
    /// Latest chain tip metrics published by the syncer.
    chain_tip_metrics_receiver: watch::Receiver<ChainTipMetrics>,
    /// Reports whether the syncer believes it is close to the network tip.
    sync_status: SyncStatus,
    /// Cached count of recently live peers, refreshed by a background task.
    num_live_peer_receiver: watch::Receiver<usize>,
}
82
/// Metrics tracking how long it's been since the chain tip last advanced,
/// and the estimated number of blocks Zebra still needs to sync.
#[derive(Debug, Clone, PartialEq, Eq, new)]
pub struct ChainTipMetrics {
    /// Last time the chain tip height increased.
    pub last_chain_tip_grow_time: Instant,
    /// Estimated distance between Zebra's chain tip and the network chain tip.
    /// `None` when no estimate is available yet (the channel is seeded with `None`).
    pub remaining_sync_blocks: Option<i64>,
}
91
92impl ChainTipMetrics {
93    /// Creates a new watch channel for reporting [`ChainTipMetrics`].
94    pub fn channel() -> (watch::Sender<Self>, watch::Receiver<Self>) {
95        watch::channel(Self {
96            last_chain_tip_grow_time: Instant::now(),
97            remaining_sync_blocks: None,
98        })
99    }
100}
101
/// Starts the health server if `listen_addr` is configured.
///
/// Returns a task handle and the bound socket address. When disabled, returns a
/// pending task and `None` for the address.
///
/// The server accepts HTTP/1.1 requests on a dedicated TCP listener and serves
/// two endpoints: `/healthy` and `/ready`.
///
/// # Panics
///
/// - If the configured `listen_addr` cannot be bound.
pub async fn init<AddressBook, SyncStatus>(
    config: Config,
    network: Network,
    chain_tip_metrics_receiver: watch::Receiver<ChainTipMetrics>,
    sync_status: SyncStatus,
    address_book: AddressBook,
) -> (JoinHandle<()>, Option<SocketAddr>)
where
    AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
    SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
{
    // The endpoints are opt-in: without a listen address there is nothing to
    // serve, so return a task that never completes and no bound address.
    let Some(listen_addr) = config.listen_addr else {
        return (tokio::spawn(std::future::pending()), None);
    };

    info!("opening health endpoint at {listen_addr}...",);

    let listener = tokio::net::TcpListener::bind(listen_addr)
            .await
            .unwrap_or_else(|e| panic!("Opening health endpoint listener {listen_addr:?} failed: {e:?}. Hint: Check if another zebrad is running, or change the health listen_addr in the config."));

    // Prefer the OS-reported address (relevant when binding to port 0); fall
    // back to the configured address if the lookup fails.
    let local = listener.local_addr().unwrap_or_else(|err| {
        tracing::warn!(?err, "failed to read local addr from TcpListener");
        listen_addr
    });

    info!("opened health endpoint at {}", local);

    let (num_live_peer_sender, num_live_peer_receiver) = watch::channel(0);

    // Seed the watch channel with the first snapshot so early requests see
    // a consistent view even before the refresher loop has ticked.
    if let Some(metrics) = num_live_peers(&address_book).await {
        let _ = num_live_peer_sender.send(metrics);
    }

    // Refresh metrics in the background using a watch channel so request
    // handlers can read the latest snapshot without taking locks.
    let metrics_task = tokio::spawn(peer_metrics_refresh_task(
        address_book.clone(),
        num_live_peer_sender,
    ));

    let shared = Arc::new(HealthCtx {
        config,
        network,
        chain_tip_metrics_receiver,
        sync_status,
        num_live_peer_receiver,
    });

    let server_task = tokio::spawn(run_health_server(listener, shared));

    // Keep both async tasks tied to a single JoinHandle so shutdown and
    // abort semantics mirror other components.
    let task = tokio::spawn(async move {
        tokio::select! {
            _ = metrics_task => {},
            _ = server_task => {},
        }
    });

    (task, Some(local))
}
177
178async fn handle_request<SyncStatus>(
179    req: Request<Incoming>,
180    ctx: Arc<HealthCtx<SyncStatus>>,
181) -> Result<Response<Full<Bytes>>, Infallible>
182where
183    SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
184{
185    // Hyper is already lightweight, but we still fence non-GET methods to keep
186    // these endpoints deterministic for probes.
187    if req.method() != Method::GET {
188        return Ok(simple_response(
189            StatusCode::METHOD_NOT_ALLOWED,
190            METHOD_NOT_ALLOWED_MSG,
191        ));
192    }
193
194    let path = req.uri().path();
195    let response = match path {
196        "/healthy" => healthy(&ctx).await,
197        "/ready" => ready(&ctx).await,
198        _ => simple_response(StatusCode::NOT_FOUND, NOT_FOUND_MSG),
199    };
200
201    Ok(response)
202}
203
204// Liveness: ensure we still have the configured minimum of recently live peers,
205// matching historical behaviour but fed from the cached snapshot to avoid
206// mutex contention.
207async fn healthy<SyncStatus>(ctx: &HealthCtx<SyncStatus>) -> Response<Full<Bytes>>
208where
209    SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
210{
211    if *ctx.num_live_peer_receiver.borrow() >= ctx.config.min_connected_peers {
212        simple_response(StatusCode::OK, "ok")
213    } else {
214        simple_response(StatusCode::SERVICE_UNAVAILABLE, "insufficient peers")
215    }
216}
217
218// Readiness: combine peer availability, sync progress, estimated lag, and tip
219// freshness to avoid the false positives called out in issue #4649 and the
220// implementation plan.
221async fn ready<SyncStatus>(ctx: &HealthCtx<SyncStatus>) -> Response<Full<Bytes>>
222where
223    SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
224{
225    if !ctx.config.enforce_on_test_networks && ctx.network.is_a_test_network() {
226        return simple_response(StatusCode::OK, "ok");
227    }
228
229    if *ctx.num_live_peer_receiver.borrow() < ctx.config.min_connected_peers {
230        return simple_response(StatusCode::SERVICE_UNAVAILABLE, "insufficient peers");
231    }
232
233    // Keep the historical sync-gate but feed it with the richer readiness
234    // checks so we respect the plan's "ensure recent block commits" item.
235    if !ctx.sync_status.is_close_to_tip() {
236        return simple_response(StatusCode::SERVICE_UNAVAILABLE, "syncing");
237    }
238
239    let ChainTipMetrics {
240        last_chain_tip_grow_time,
241        remaining_sync_blocks,
242    } = ctx.chain_tip_metrics_receiver.borrow().clone();
243
244    let Some(remaining_sync_blocks) = remaining_sync_blocks else {
245        tracing::warn!("syncer is getting block hashes from peers, but state is empty");
246        return simple_response(StatusCode::SERVICE_UNAVAILABLE, "no tip");
247    };
248
249    let tip_age = last_chain_tip_grow_time.elapsed();
250    if tip_age > ctx.config.ready_max_tip_age {
251        return simple_response(
252            StatusCode::SERVICE_UNAVAILABLE,
253            &format!("tip_age={}s", tip_age.as_secs()),
254        );
255    }
256
257    if remaining_sync_blocks <= ctx.config.ready_max_blocks_behind {
258        simple_response(StatusCode::OK, "ok")
259    } else {
260        simple_response(
261            StatusCode::SERVICE_UNAVAILABLE,
262            &format!("lag={remaining_sync_blocks} blocks"),
263        )
264    }
265}
266
267// Measure peers on a blocking thread, mirroring the previous synchronous
268// implementation but without holding the mutex on the request path.
269async fn num_live_peers<A>(address_book: &A) -> Option<usize>
270where
271    A: AddressBookPeers + Clone + Send + Sync + 'static,
272{
273    let address_book = address_book.clone();
274    tokio::task::spawn_blocking(move || address_book.recently_live_peers(Utc::now()).len())
275        .await
276        .inspect_err(|err| warn!(?err, "failed to refresh peer metrics"))
277        .ok()
278}
279
// Periodically update the cached peer metrics for all handlers that hold a
// receiver. If receivers disappear we exit quietly so shutdown can proceed.
async fn peer_metrics_refresh_task<A>(address_book: A, num_live_peer_sender: watch::Sender<usize>)
where
    A: AddressBookPeers + Clone + Send + Sync + 'static,
{
    let mut interval = time::interval(PEER_METRICS_REFRESH_INTERVAL);
    // If ticks are missed under load, delay rather than bursting to catch up:
    // one fresh snapshot is as good as several stale ones.
    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

    loop {
        // Updates are best-effort: if the snapshot fails or all receivers are
        // dropped we exit quietly, letting the caller terminate the health task.
        if let Some(metrics) = num_live_peers(&address_book).await {
            // `send` only errors when every receiver is gone, i.e. shutdown.
            if let Err(err) = num_live_peer_sender.send(metrics) {
                tracing::warn!(?err, "failed to send to peer metrics channel");
                break;
            }
        }

        // NOTE(review): a fresh tokio interval's first `tick` completes
        // immediately, so the first two refreshes happen back-to-back.
        interval.tick().await;
    }
}
302
303async fn run_health_server<SyncStatus>(
304    listener: tokio::net::TcpListener,
305    shared: Arc<HealthCtx<SyncStatus>>,
306) where
307    SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
308{
309    let mut num_recent_requests: usize = 0;
310    let mut last_request_count_reset_time = Instant::now();
311
312    // Dedicated accept loop to keep request handling small and predictable; we
313    // still spawn per-connection tasks but share the context clone.
314
315    loop {
316        match listener.accept().await {
317            Ok((stream, _)) => {
318                if num_recent_requests < MAX_RECENT_REQUESTS {
319                    num_recent_requests += 1;
320                } else if last_request_count_reset_time.elapsed() > RECENT_REQUEST_INTERVAL {
321                    num_recent_requests = 0;
322                    last_request_count_reset_time = Instant::now();
323                } else {
324                    // Drop the request if there have been too many recent requests
325                    continue;
326                }
327
328                let io = TokioIo::new(stream);
329                let svc_ctx = shared.clone();
330                let service =
331                    hyper::service::service_fn(move |req| handle_request(req, svc_ctx.clone()));
332
333                tokio::spawn(async move {
334                    if let Err(err) = http1::Builder::new().serve_connection(io, service).await {
335                        debug!(?err, "health server connection closed with error");
336                    }
337                });
338            }
339            Err(err) => {
340                warn!(?err, "health server accept failed");
341            }
342        }
343    }
344}
345
346fn simple_response(status: StatusCode, body: &str) -> Response<Full<Bytes>> {
347    let bytes = Bytes::from(body.to_string());
348    let len = bytes.len();
349    ResponseBuilder::new()
350        .status(status)
351        .header(CONTENT_TYPE, "text/plain; charset=utf-8")
352        .header(CONTENT_LENGTH, len.to_string())
353        .body(Full::new(bytes))
354        .expect("valid response")
355}