Skip to main content

zebra_rpc/indexer/
methods.rs

1//! Implements `Indexer` methods on the `IndexerRPC` type
2
3use std::pin::Pin;
4
5use futures::Stream;
6use tokio_stream::wrappers::ReceiverStream;
7use tonic::{Response, Status};
8use tower::util::ServiceExt;
9
10use tracing::Span;
11use zebra_chain::{chain_tip::ChainTip, serialization::BytesInDisplayOrder};
12use zebra_node_services::mempool::MempoolChangeKind;
13use zebra_state::{ReadRequest, ReadResponse, ReadState};
14
15use super::{
16    indexer_server::Indexer, server::IndexerRPC, BlockAndHash, BlockHashAndHeight, Empty,
17    MempoolChangeMessage,
18};
19
20/// The maximum number of messages that can be queued to be streamed to a client.
21const RESPONSE_BUFFER_SIZE: usize = 64;
22
23#[tonic::async_trait]
24impl<ReadStateService, Tip> Indexer for IndexerRPC<ReadStateService, Tip>
25where
26    ReadStateService: ReadState,
27    Tip: ChainTip + Clone + Send + Sync + 'static,
28{
29    type ChainTipChangeStream =
30        Pin<Box<dyn Stream<Item = Result<BlockHashAndHeight, Status>> + Send>>;
31    type NonFinalizedStateChangeStream =
32        Pin<Box<dyn Stream<Item = Result<BlockAndHash, Status>> + Send>>;
33    type MempoolChangeStream =
34        Pin<Box<dyn Stream<Item = Result<MempoolChangeMessage, Status>> + Send>>;
35
36    async fn chain_tip_change(
37        &self,
38        _: tonic::Request<Empty>,
39    ) -> Result<Response<Self::ChainTipChangeStream>, Status> {
40        let span = Span::current();
41        let (response_sender, response_receiver) = tokio::sync::mpsc::channel(RESPONSE_BUFFER_SIZE);
42        let response_stream = ReceiverStream::new(response_receiver);
43        let mut chain_tip_change = self.chain_tip_change.clone();
44
45        tokio::spawn(async move {
46            // Notify the client of chain tip changes until the channel is closed
47            while let Ok(()) = chain_tip_change.best_tip_changed().await {
48                let Some((tip_height, tip_hash)) = chain_tip_change.best_tip_height_and_hash()
49                else {
50                    continue;
51                };
52
53                match response_sender.try_send(Ok(BlockHashAndHeight::new(tip_hash, tip_height))) {
54                    Ok(()) => {}
55                    Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
56                        span.in_scope(|| {
57                            tracing::info!("client disconnected, dropping chain_tip_change task");
58                        });
59                        return;
60                    }
61                    Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
62                        span.in_scope(|| {
63                            tracing::warn!("slow consumer, dropping chain_tip_change stream");
64                        });
65                        return;
66                    }
67                }
68            }
69
70            span.in_scope(|| {
71                tracing::warn!("chain_tip_change channel has closed");
72            });
73
74            let _ = response_sender
75                .send(Err(Status::unavailable(
76                    "chain_tip_change channel has closed",
77                )))
78                .await;
79        });
80
81        Ok(Response::new(Box::pin(response_stream)))
82    }
83
84    async fn non_finalized_state_change(
85        &self,
86        _: tonic::Request<Empty>,
87    ) -> Result<Response<Self::NonFinalizedStateChangeStream>, Status> {
88        let span = Span::current();
89        let read_state = self.read_state.clone();
90        let (response_sender, response_receiver) = tokio::sync::mpsc::channel(RESPONSE_BUFFER_SIZE);
91        let response_stream = ReceiverStream::new(response_receiver);
92
93        tokio::spawn(async move {
94            let mut non_finalized_state_change = match read_state
95                .oneshot(ReadRequest::NonFinalizedBlocksListener)
96                .await
97            {
98                Ok(ReadResponse::NonFinalizedBlocksListener(listener)) => listener.unwrap(),
99                Ok(_) => unreachable!("unexpected response type from ReadStateService"),
100                Err(error) => {
101                    span.in_scope(|| {
102                        tracing::error!(
103                            ?error,
104                            "failed to subscribe to non-finalized state changes"
105                        );
106                    });
107
108                    let _ = response_sender
109                        .send(Err(Status::unavailable(
110                            "failed to subscribe to non-finalized state changes",
111                        )))
112                        .await;
113                    return;
114                }
115            };
116
117            // Notify the client of chain tip changes until the channel is closed
118            while let Some((hash, block)) = non_finalized_state_change.recv().await {
119                match response_sender.try_send(Ok(BlockAndHash::new(hash, block))) {
120                    Ok(()) => {}
121                    Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
122                        span.in_scope(|| {
123                            tracing::info!(
124                                "client disconnected, dropping non_finalized_state_change task"
125                            );
126                        });
127                        return;
128                    }
129                    Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
130                        span.in_scope(|| {
131                            tracing::warn!(
132                                "slow consumer, dropping non_finalized_state_change stream"
133                            );
134                        });
135                        return;
136                    }
137                }
138            }
139
140            span.in_scope(|| {
141                tracing::warn!("non-finalized state change channel has closed");
142            });
143
144            let _ = response_sender
145                .send(Err(Status::unavailable(
146                    "non-finalized state change channel has closed",
147                )))
148                .await;
149        });
150
151        Ok(Response::new(Box::pin(response_stream)))
152    }
153
154    async fn mempool_change(
155        &self,
156        _: tonic::Request<Empty>,
157    ) -> Result<Response<Self::MempoolChangeStream>, Status> {
158        let span = Span::current();
159        let (response_sender, response_receiver) = tokio::sync::mpsc::channel(RESPONSE_BUFFER_SIZE);
160        let response_stream = ReceiverStream::new(response_receiver);
161        let mut mempool_change = self.mempool_change.subscribe();
162
163        tokio::spawn(async move {
164            // Notify the client of chain tip changes until the channel is closed
165            while let Ok(change) = mempool_change.recv().await {
166                for tx_id in change.tx_ids() {
167                    span.in_scope(|| {
168                        tracing::debug!("mempool change: {:?}", change);
169                    });
170
171                    let msg = Ok(MempoolChangeMessage {
172                        change_type: match change.kind() {
173                            MempoolChangeKind::Added => 0,
174                            MempoolChangeKind::Invalidated => 1,
175                            MempoolChangeKind::Mined => 2,
176                        },
177                        tx_hash: tx_id.mined_id().bytes_in_display_order().to_vec(),
178                        auth_digest: tx_id
179                            .auth_digest()
180                            .map(|d| d.bytes_in_display_order().to_vec())
181                            .unwrap_or_default(),
182                    });
183
184                    match response_sender.try_send(msg) {
185                        Ok(()) => {}
186                        Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
187                            span.in_scope(|| {
188                                tracing::info!("client disconnected, dropping mempool_change task");
189                            });
190                            return;
191                        }
192                        Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
193                            span.in_scope(|| {
194                                tracing::warn!("slow consumer, dropping mempool_change stream");
195                            });
196                            return;
197                        }
198                    }
199                }
200            }
201
202            span.in_scope(|| {
203                tracing::warn!("mempool_change channel has closed");
204            });
205
206            let _ = response_sender
207                .send(Err(Status::unavailable(
208                    "mempool_change channel has closed",
209                )))
210                .await;
211        });
212
213        Ok(Response::new(Box::pin(response_stream)))
214    }
215}