zebra_rpc/indexer/
methods.rs1use std::pin::Pin;
4
5use futures::Stream;
6use tokio_stream::wrappers::ReceiverStream;
7use tonic::{Response, Status};
8use tower::util::ServiceExt;
9
10use tracing::Span;
11use zebra_chain::{chain_tip::ChainTip, serialization::BytesInDisplayOrder};
12use zebra_node_services::mempool::MempoolChangeKind;
13use zebra_state::{ReadRequest, ReadResponse, ReadState};
14
15use super::{
16 indexer_server::Indexer, server::IndexerRPC, BlockAndHash, BlockHashAndHeight, Empty,
17 MempoolChangeMessage,
18};
19
/// Capacity of the bounded channel that buffers responses for each streaming RPC call.
///
/// Every handler in this file creates its response channel with this capacity.
/// When the buffer is full, the handler stops streaming to that client instead
/// of waiting for space.
const RESPONSE_BUFFER_SIZE: usize = 64;
22
23#[tonic::async_trait]
24impl<ReadStateService, Tip> Indexer for IndexerRPC<ReadStateService, Tip>
25where
26 ReadStateService: ReadState,
27 Tip: ChainTip + Clone + Send + Sync + 'static,
28{
29 type ChainTipChangeStream =
30 Pin<Box<dyn Stream<Item = Result<BlockHashAndHeight, Status>> + Send>>;
31 type NonFinalizedStateChangeStream =
32 Pin<Box<dyn Stream<Item = Result<BlockAndHash, Status>> + Send>>;
33 type MempoolChangeStream =
34 Pin<Box<dyn Stream<Item = Result<MempoolChangeMessage, Status>> + Send>>;
35
36 async fn chain_tip_change(
37 &self,
38 _: tonic::Request<Empty>,
39 ) -> Result<Response<Self::ChainTipChangeStream>, Status> {
40 let span = Span::current();
41 let (response_sender, response_receiver) = tokio::sync::mpsc::channel(RESPONSE_BUFFER_SIZE);
42 let response_stream = ReceiverStream::new(response_receiver);
43 let mut chain_tip_change = self.chain_tip_change.clone();
44
45 tokio::spawn(async move {
46 while let Ok(()) = chain_tip_change.best_tip_changed().await {
48 let Some((tip_height, tip_hash)) = chain_tip_change.best_tip_height_and_hash()
49 else {
50 continue;
51 };
52
53 match response_sender.try_send(Ok(BlockHashAndHeight::new(tip_hash, tip_height))) {
54 Ok(()) => {}
55 Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
56 span.in_scope(|| {
57 tracing::info!("client disconnected, dropping chain_tip_change task");
58 });
59 return;
60 }
61 Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
62 span.in_scope(|| {
63 tracing::warn!("slow consumer, dropping chain_tip_change stream");
64 });
65 return;
66 }
67 }
68 }
69
70 span.in_scope(|| {
71 tracing::warn!("chain_tip_change channel has closed");
72 });
73
74 let _ = response_sender
75 .send(Err(Status::unavailable(
76 "chain_tip_change channel has closed",
77 )))
78 .await;
79 });
80
81 Ok(Response::new(Box::pin(response_stream)))
82 }
83
84 async fn non_finalized_state_change(
85 &self,
86 _: tonic::Request<Empty>,
87 ) -> Result<Response<Self::NonFinalizedStateChangeStream>, Status> {
88 let span = Span::current();
89 let read_state = self.read_state.clone();
90 let (response_sender, response_receiver) = tokio::sync::mpsc::channel(RESPONSE_BUFFER_SIZE);
91 let response_stream = ReceiverStream::new(response_receiver);
92
93 tokio::spawn(async move {
94 let mut non_finalized_state_change = match read_state
95 .oneshot(ReadRequest::NonFinalizedBlocksListener)
96 .await
97 {
98 Ok(ReadResponse::NonFinalizedBlocksListener(listener)) => listener.unwrap(),
99 Ok(_) => unreachable!("unexpected response type from ReadStateService"),
100 Err(error) => {
101 span.in_scope(|| {
102 tracing::error!(
103 ?error,
104 "failed to subscribe to non-finalized state changes"
105 );
106 });
107
108 let _ = response_sender
109 .send(Err(Status::unavailable(
110 "failed to subscribe to non-finalized state changes",
111 )))
112 .await;
113 return;
114 }
115 };
116
117 while let Some((hash, block)) = non_finalized_state_change.recv().await {
119 match response_sender.try_send(Ok(BlockAndHash::new(hash, block))) {
120 Ok(()) => {}
121 Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
122 span.in_scope(|| {
123 tracing::info!(
124 "client disconnected, dropping non_finalized_state_change task"
125 );
126 });
127 return;
128 }
129 Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
130 span.in_scope(|| {
131 tracing::warn!(
132 "slow consumer, dropping non_finalized_state_change stream"
133 );
134 });
135 return;
136 }
137 }
138 }
139
140 span.in_scope(|| {
141 tracing::warn!("non-finalized state change channel has closed");
142 });
143
144 let _ = response_sender
145 .send(Err(Status::unavailable(
146 "non-finalized state change channel has closed",
147 )))
148 .await;
149 });
150
151 Ok(Response::new(Box::pin(response_stream)))
152 }
153
154 async fn mempool_change(
155 &self,
156 _: tonic::Request<Empty>,
157 ) -> Result<Response<Self::MempoolChangeStream>, Status> {
158 let span = Span::current();
159 let (response_sender, response_receiver) = tokio::sync::mpsc::channel(RESPONSE_BUFFER_SIZE);
160 let response_stream = ReceiverStream::new(response_receiver);
161 let mut mempool_change = self.mempool_change.subscribe();
162
163 tokio::spawn(async move {
164 while let Ok(change) = mempool_change.recv().await {
166 for tx_id in change.tx_ids() {
167 span.in_scope(|| {
168 tracing::debug!("mempool change: {:?}", change);
169 });
170
171 let msg = Ok(MempoolChangeMessage {
172 change_type: match change.kind() {
173 MempoolChangeKind::Added => 0,
174 MempoolChangeKind::Invalidated => 1,
175 MempoolChangeKind::Mined => 2,
176 },
177 tx_hash: tx_id.mined_id().bytes_in_display_order().to_vec(),
178 auth_digest: tx_id
179 .auth_digest()
180 .map(|d| d.bytes_in_display_order().to_vec())
181 .unwrap_or_default(),
182 });
183
184 match response_sender.try_send(msg) {
185 Ok(()) => {}
186 Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
187 span.in_scope(|| {
188 tracing::info!("client disconnected, dropping mempool_change task");
189 });
190 return;
191 }
192 Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
193 span.in_scope(|| {
194 tracing::warn!("slow consumer, dropping mempool_change stream");
195 });
196 return;
197 }
198 }
199 }
200 }
201
202 span.in_scope(|| {
203 tracing::warn!("mempool_change channel has closed");
204 });
205
206 let _ = response_sender
207 .send(Err(Status::unavailable(
208 "mempool_change channel has closed",
209 )))
210 .await;
211 });
212
213 Ok(Response::new(Box::pin(response_stream)))
214 }
215}