zebra_rpc/indexer/
server.rs1use std::net::SocketAddr;
4
5use tokio::task::JoinHandle;
6use tonic::transport::{server::TcpIncoming, Server};
7use tower::BoxError;
8use zebra_chain::chain_tip::ChainTip;
9use zebra_node_services::mempool::MempoolTxSubscriber;
10use zebra_state::ReadState;
11
12use crate::{indexer::indexer_server::IndexerServer, server::OPENED_RPC_ENDPOINT_MSG};
13
14type ServerTask = JoinHandle<Result<(), BoxError>>;
15
16pub struct IndexerRPC<ReadStateService, Tip>
18where
19 ReadStateService: ReadState,
20 Tip: ChainTip + Clone + Send + Sync + 'static,
21{
22 pub(super) read_state: ReadStateService,
23 pub(super) chain_tip_change: Tip,
24 pub(super) mempool_change: MempoolTxSubscriber,
25}
26
27#[tracing::instrument(skip_all)]
29pub async fn init<ReadStateService, Tip>(
30 listen_addr: SocketAddr,
31 read_state: ReadStateService,
32 chain_tip_change: Tip,
33 mempool_change: MempoolTxSubscriber,
34) -> Result<(ServerTask, SocketAddr), BoxError>
35where
36 ReadStateService: ReadState,
37 Tip: ChainTip + Clone + Send + Sync + 'static,
38{
39 let indexer_service = IndexerRPC {
40 read_state,
41 chain_tip_change,
42 mempool_change,
43 };
44
45 let reflection_service = tonic_reflection::server::Builder::configure()
46 .register_encoded_file_descriptor_set(crate::indexer::FILE_DESCRIPTOR_SET)
47 .build_v1()
48 .unwrap();
49
50 tracing::info!("Trying to open indexer RPC endpoint at {}...", listen_addr,);
51
52 let tcp_listener = tokio::net::TcpListener::bind(listen_addr).await?;
53
54 let listen_addr = tcp_listener.local_addr()?;
55 tracing::info!("{OPENED_RPC_ENDPOINT_MSG}{}", listen_addr);
56
57 let server_task: JoinHandle<Result<(), BoxError>> = tokio::spawn(async move {
58 Server::builder()
59 .add_service(reflection_service)
60 .add_service(IndexerServer::new(indexer_service))
61 .serve_with_incoming(TcpIncoming::from(tcp_listener))
62 .await?;
63
64 Ok(())
65 });
66
67 Ok((server_task, listen_addr))
68}