Skip to main content

zebra_rpc/indexer/
methods.rs

1//! Implements `Indexer` methods on the `IndexerRPC` type
2
3use std::pin::Pin;
4
5use futures::Stream;
6use tokio_stream::wrappers::ReceiverStream;
7use tonic::{Response, Status};
8use tower::util::ServiceExt;
9
10use tracing::Span;
11use zebra_chain::{chain_tip::ChainTip, serialization::BytesInDisplayOrder};
12use zebra_node_services::mempool::MempoolChangeKind;
13use zebra_state::{ReadRequest, ReadResponse, ReadState};
14
15use super::{
16    indexer_server::Indexer, server::IndexerRPC, BlockAndHash, BlockHashAndHeight, Empty,
17    MempoolChangeMessage,
18};
19
/// The maximum number of messages that can be queued to be streamed to a client.
///
/// Used as the capacity of the bounded `tokio::sync::mpsc` channel that each
/// streaming method in this module creates per request. Once that many
/// messages are waiting to be read, the task producing them waits inside
/// `send(..).await` until the receiving side makes room.
const RESPONSE_BUFFER_SIZE: usize = 4_000;
22
23#[tonic::async_trait]
24impl<ReadStateService, Tip> Indexer for IndexerRPC<ReadStateService, Tip>
25where
26    ReadStateService: ReadState,
27    Tip: ChainTip + Clone + Send + Sync + 'static,
28{
29    type ChainTipChangeStream =
30        Pin<Box<dyn Stream<Item = Result<BlockHashAndHeight, Status>> + Send>>;
31    type NonFinalizedStateChangeStream =
32        Pin<Box<dyn Stream<Item = Result<BlockAndHash, Status>> + Send>>;
33    type MempoolChangeStream =
34        Pin<Box<dyn Stream<Item = Result<MempoolChangeMessage, Status>> + Send>>;
35
36    async fn chain_tip_change(
37        &self,
38        _: tonic::Request<Empty>,
39    ) -> Result<Response<Self::ChainTipChangeStream>, Status> {
40        let span = Span::current();
41        let (response_sender, response_receiver) = tokio::sync::mpsc::channel(RESPONSE_BUFFER_SIZE);
42        let response_stream = ReceiverStream::new(response_receiver);
43        let mut chain_tip_change = self.chain_tip_change.clone();
44
45        tokio::spawn(async move {
46            // Notify the client of chain tip changes until the channel is closed
47            while let Ok(()) = chain_tip_change.best_tip_changed().await {
48                let Some((tip_height, tip_hash)) = chain_tip_change.best_tip_height_and_hash()
49                else {
50                    continue;
51                };
52
53                if let Err(error) = response_sender
54                    .send(Ok(BlockHashAndHeight::new(tip_hash, tip_height)))
55                    .await
56                {
57                    span.in_scope(|| {
58                        tracing::info!(?error, "failed to send chain tip change, dropping task");
59                    });
60                    return;
61                }
62            }
63
64            span.in_scope(|| {
65                tracing::warn!("chain_tip_change channel has closed");
66            });
67
68            let _ = response_sender
69                .send(Err(Status::unavailable(
70                    "chain_tip_change channel has closed",
71                )))
72                .await;
73        });
74
75        Ok(Response::new(Box::pin(response_stream)))
76    }
77
78    async fn non_finalized_state_change(
79        &self,
80        _: tonic::Request<Empty>,
81    ) -> Result<Response<Self::NonFinalizedStateChangeStream>, Status> {
82        let span = Span::current();
83        let read_state = self.read_state.clone();
84        let (response_sender, response_receiver) = tokio::sync::mpsc::channel(RESPONSE_BUFFER_SIZE);
85        let response_stream = ReceiverStream::new(response_receiver);
86
87        tokio::spawn(async move {
88            let mut non_finalized_state_change = match read_state
89                .oneshot(ReadRequest::NonFinalizedBlocksListener)
90                .await
91            {
92                Ok(ReadResponse::NonFinalizedBlocksListener(listener)) => listener.unwrap(),
93                Ok(_) => unreachable!("unexpected response type from ReadStateService"),
94                Err(error) => {
95                    span.in_scope(|| {
96                        tracing::error!(
97                            ?error,
98                            "failed to subscribe to non-finalized state changes"
99                        );
100                    });
101
102                    let _ = response_sender
103                        .send(Err(Status::unavailable(
104                            "failed to subscribe to non-finalized state changes",
105                        )))
106                        .await;
107                    return;
108                }
109            };
110
111            // Notify the client of chain tip changes until the channel is closed
112            while let Some((hash, block)) = non_finalized_state_change.recv().await {
113                if let Err(error) = response_sender
114                    .send(Ok(BlockAndHash::new(hash, block)))
115                    .await
116                {
117                    span.in_scope(|| {
118                        tracing::info!(
119                            ?error,
120                            "failed to send non-finalized state change, dropping task"
121                        );
122                    });
123                    return;
124                }
125            }
126
127            span.in_scope(|| {
128                tracing::warn!("non-finalized state change channel has closed");
129            });
130
131            let _ = response_sender
132                .send(Err(Status::unavailable(
133                    "non-finalized state change channel has closed",
134                )))
135                .await;
136        });
137
138        Ok(Response::new(Box::pin(response_stream)))
139    }
140
141    async fn mempool_change(
142        &self,
143        _: tonic::Request<Empty>,
144    ) -> Result<Response<Self::MempoolChangeStream>, Status> {
145        let span = Span::current();
146        let (response_sender, response_receiver) = tokio::sync::mpsc::channel(RESPONSE_BUFFER_SIZE);
147        let response_stream = ReceiverStream::new(response_receiver);
148        let mut mempool_change = self.mempool_change.subscribe();
149
150        tokio::spawn(async move {
151            // Notify the client of chain tip changes until the channel is closed
152            while let Ok(change) = mempool_change.recv().await {
153                for tx_id in change.tx_ids() {
154                    span.in_scope(|| {
155                        tracing::debug!("mempool change: {:?}", change);
156                    });
157
158                    if let Err(error) = response_sender
159                        .send(Ok(MempoolChangeMessage {
160                            change_type: match change.kind() {
161                                MempoolChangeKind::Added => 0,
162                                MempoolChangeKind::Invalidated => 1,
163                                MempoolChangeKind::Mined => 2,
164                            },
165                            tx_hash: tx_id.mined_id().bytes_in_display_order().to_vec(),
166                            auth_digest: tx_id
167                                .auth_digest()
168                                .map(|d| d.bytes_in_display_order().to_vec())
169                                .unwrap_or_default(),
170                        }))
171                        .await
172                    {
173                        span.in_scope(|| {
174                            tracing::info!(?error, "failed to send mempool change, dropping task");
175                        });
176                        return;
177                    }
178                }
179            }
180
181            span.in_scope(|| {
182                tracing::warn!("mempool_change channel has closed");
183            });
184
185            let _ = response_sender
186                .send(Err(Status::unavailable(
187                    "mempool_change channel has closed",
188                )))
189                .await;
190        });
191
192        Ok(Response::new(Box::pin(response_stream)))
193    }
194}