zebra_rpc/indexer/
methods.rs1use std::pin::Pin;
4
5use futures::Stream;
6use tokio_stream::wrappers::ReceiverStream;
7use tonic::{Response, Status};
8use tower::util::ServiceExt;
9
10use tracing::Span;
11use zebra_chain::{chain_tip::ChainTip, serialization::BytesInDisplayOrder};
12use zebra_node_services::mempool::MempoolChangeKind;
13use zebra_state::{ReadRequest, ReadResponse, ReadState};
14
15use super::{
16 indexer_server::Indexer, server::IndexerRPC, BlockAndHash, BlockHashAndHeight, Empty,
17 MempoolChangeMessage,
18};
19
20const RESPONSE_BUFFER_SIZE: usize = 4_000;
22
23#[tonic::async_trait]
24impl<ReadStateService, Tip> Indexer for IndexerRPC<ReadStateService, Tip>
25where
26 ReadStateService: ReadState,
27 Tip: ChainTip + Clone + Send + Sync + 'static,
28{
29 type ChainTipChangeStream =
30 Pin<Box<dyn Stream<Item = Result<BlockHashAndHeight, Status>> + Send>>;
31 type NonFinalizedStateChangeStream =
32 Pin<Box<dyn Stream<Item = Result<BlockAndHash, Status>> + Send>>;
33 type MempoolChangeStream =
34 Pin<Box<dyn Stream<Item = Result<MempoolChangeMessage, Status>> + Send>>;
35
36 async fn chain_tip_change(
37 &self,
38 _: tonic::Request<Empty>,
39 ) -> Result<Response<Self::ChainTipChangeStream>, Status> {
40 let span = Span::current();
41 let (response_sender, response_receiver) = tokio::sync::mpsc::channel(RESPONSE_BUFFER_SIZE);
42 let response_stream = ReceiverStream::new(response_receiver);
43 let mut chain_tip_change = self.chain_tip_change.clone();
44
45 tokio::spawn(async move {
46 while let Ok(()) = chain_tip_change.best_tip_changed().await {
48 let Some((tip_height, tip_hash)) = chain_tip_change.best_tip_height_and_hash()
49 else {
50 continue;
51 };
52
53 if let Err(error) = response_sender
54 .send(Ok(BlockHashAndHeight::new(tip_hash, tip_height)))
55 .await
56 {
57 span.in_scope(|| {
58 tracing::info!(?error, "failed to send chain tip change, dropping task");
59 });
60 return;
61 }
62 }
63
64 span.in_scope(|| {
65 tracing::warn!("chain_tip_change channel has closed");
66 });
67
68 let _ = response_sender
69 .send(Err(Status::unavailable(
70 "chain_tip_change channel has closed",
71 )))
72 .await;
73 });
74
75 Ok(Response::new(Box::pin(response_stream)))
76 }
77
78 async fn non_finalized_state_change(
79 &self,
80 _: tonic::Request<Empty>,
81 ) -> Result<Response<Self::NonFinalizedStateChangeStream>, Status> {
82 let span = Span::current();
83 let read_state = self.read_state.clone();
84 let (response_sender, response_receiver) = tokio::sync::mpsc::channel(RESPONSE_BUFFER_SIZE);
85 let response_stream = ReceiverStream::new(response_receiver);
86
87 tokio::spawn(async move {
88 let mut non_finalized_state_change = match read_state
89 .oneshot(ReadRequest::NonFinalizedBlocksListener)
90 .await
91 {
92 Ok(ReadResponse::NonFinalizedBlocksListener(listener)) => listener.unwrap(),
93 Ok(_) => unreachable!("unexpected response type from ReadStateService"),
94 Err(error) => {
95 span.in_scope(|| {
96 tracing::error!(
97 ?error,
98 "failed to subscribe to non-finalized state changes"
99 );
100 });
101
102 let _ = response_sender
103 .send(Err(Status::unavailable(
104 "failed to subscribe to non-finalized state changes",
105 )))
106 .await;
107 return;
108 }
109 };
110
111 while let Some((hash, block)) = non_finalized_state_change.recv().await {
113 if let Err(error) = response_sender
114 .send(Ok(BlockAndHash::new(hash, block)))
115 .await
116 {
117 span.in_scope(|| {
118 tracing::info!(
119 ?error,
120 "failed to send non-finalized state change, dropping task"
121 );
122 });
123 return;
124 }
125 }
126
127 span.in_scope(|| {
128 tracing::warn!("non-finalized state change channel has closed");
129 });
130
131 let _ = response_sender
132 .send(Err(Status::unavailable(
133 "non-finalized state change channel has closed",
134 )))
135 .await;
136 });
137
138 Ok(Response::new(Box::pin(response_stream)))
139 }
140
141 async fn mempool_change(
142 &self,
143 _: tonic::Request<Empty>,
144 ) -> Result<Response<Self::MempoolChangeStream>, Status> {
145 let span = Span::current();
146 let (response_sender, response_receiver) = tokio::sync::mpsc::channel(RESPONSE_BUFFER_SIZE);
147 let response_stream = ReceiverStream::new(response_receiver);
148 let mut mempool_change = self.mempool_change.subscribe();
149
150 tokio::spawn(async move {
151 while let Ok(change) = mempool_change.recv().await {
153 for tx_id in change.tx_ids() {
154 span.in_scope(|| {
155 tracing::debug!("mempool change: {:?}", change);
156 });
157
158 if let Err(error) = response_sender
159 .send(Ok(MempoolChangeMessage {
160 change_type: match change.kind() {
161 MempoolChangeKind::Added => 0,
162 MempoolChangeKind::Invalidated => 1,
163 MempoolChangeKind::Mined => 2,
164 },
165 tx_hash: tx_id.mined_id().bytes_in_display_order().to_vec(),
166 auth_digest: tx_id
167 .auth_digest()
168 .map(|d| d.bytes_in_display_order().to_vec())
169 .unwrap_or_default(),
170 }))
171 .await
172 {
173 span.in_scope(|| {
174 tracing::info!(?error, "failed to send mempool change, dropping task");
175 });
176 return;
177 }
178 }
179 }
180
181 span.in_scope(|| {
182 tracing::warn!("mempool_change channel has closed");
183 });
184
185 let _ = response_sender
186 .send(Err(Status::unavailable(
187 "mempool_change channel has closed",
188 )))
189 .await;
190 });
191
192 Ok(Response::new(Box::pin(response_stream)))
193 }
194}