Skip to main content

zebra_rpc/indexer/
server.rs

1//! A tonic RPC server for Zebra's indexer API.
2
3use std::net::SocketAddr;
4
5use tokio::task::JoinHandle;
6use tonic::transport::{server::TcpIncoming, Server};
7use tower::BoxError;
8use zebra_chain::chain_tip::ChainTip;
9use zebra_node_services::mempool::MempoolTxSubscriber;
10use zebra_state::ReadState;
11
12use crate::{indexer::indexer_server::IndexerServer, server::OPENED_RPC_ENDPOINT_MSG};
13
14type ServerTask = JoinHandle<Result<(), BoxError>>;
15
16/// Indexer RPC service.
17pub struct IndexerRPC<ReadStateService, Tip>
18where
19    ReadStateService: ReadState,
20    Tip: ChainTip + Clone + Send + Sync + 'static,
21{
22    pub(super) read_state: ReadStateService,
23    pub(super) chain_tip_change: Tip,
24    pub(super) mempool_change: MempoolTxSubscriber,
25}
26
27/// Initializes the indexer RPC server
28#[tracing::instrument(skip_all)]
29pub async fn init<ReadStateService, Tip>(
30    listen_addr: SocketAddr,
31    read_state: ReadStateService,
32    chain_tip_change: Tip,
33    mempool_change: MempoolTxSubscriber,
34) -> Result<(ServerTask, SocketAddr), BoxError>
35where
36    ReadStateService: ReadState,
37    Tip: ChainTip + Clone + Send + Sync + 'static,
38{
39    let indexer_service = IndexerRPC {
40        read_state,
41        chain_tip_change,
42        mempool_change,
43    };
44
45    let reflection_service = tonic_reflection::server::Builder::configure()
46        .register_encoded_file_descriptor_set(crate::indexer::FILE_DESCRIPTOR_SET)
47        .build_v1()
48        .unwrap();
49
50    tracing::info!("Trying to open indexer RPC endpoint at {}...", listen_addr,);
51
52    let tcp_listener = tokio::net::TcpListener::bind(listen_addr).await?;
53
54    let listen_addr = tcp_listener.local_addr()?;
55    tracing::info!("{OPENED_RPC_ENDPOINT_MSG}{}", listen_addr);
56
57    let server_task: JoinHandle<Result<(), BoxError>> = tokio::spawn(async move {
58        Server::builder()
59            .add_service(reflection_service)
60            .add_service(IndexerServer::new(indexer_service))
61            .serve_with_incoming(TcpIncoming::from(tcp_listener))
62            .await?;
63
64        Ok(())
65    });
66
67    Ok((server_task, listen_addr))
68}