zebra_consensus/primitives/
redjubjub.rs1use std::{
4 future::Future,
5 mem,
6 pin::Pin,
7 task::{Context, Poll},
8};
9
10use futures::{future::BoxFuture, FutureExt};
11use once_cell::sync::Lazy;
12use rand::thread_rng;
13
14use tokio::sync::watch;
15use tower::{util::ServiceFn, Service};
16use tower_batch_control::{Batch, BatchControl, RequestWeight};
17use tower_fallback::Fallback;
18
19use zebra_chain::primitives::redjubjub::*;
20
21use crate::BoxError;
22
23use super::{spawn_fifo, spawn_fifo_and_convert};
24
25#[cfg(test)]
26mod tests;
27
/// The batch verifier type used by this module: `Verifier` from the `batch`
/// module that is in scope here.
type BatchVerifier = batch::Verifier;

/// The outcome of a verification: `Ok(())` on success, or an `Error`.
type VerifyResult = Result<(), Error>;

/// The sending half of the watch channel used to publish a batch result.
///
/// The channel carries an `Option<VerifyResult>`; this file always creates it
/// with `None` as the initial value.
type Sender = watch::Sender<Option<VerifyResult>>;
36
37#[derive(Clone, Debug)]
40pub struct Item(batch::Item);
41
42impl RequestWeight for Item {}
43
44impl<T: Into<batch::Item>> From<T> for Item {
45 fn from(value: T) -> Self {
46 Self(value.into())
47 }
48}
49
50impl Item {
51 fn verify_single(self) -> VerifyResult {
52 self.0.verify_single()
53 }
54}
55
56pub static VERIFIER: Lazy<
65 Fallback<
66 Batch<Verifier, Item>,
67 ServiceFn<fn(Item) -> BoxFuture<'static, Result<(), BoxError>>>,
68 >,
69> = Lazy::new(|| {
70 Fallback::new(
71 Batch::new(
72 Verifier::default(),
73 super::MAX_BATCH_SIZE,
74 None,
75 super::MAX_BATCH_LATENCY,
76 ),
77 tower::service_fn(
87 (|item: Item| Verifier::verify_single_spawning(item).boxed()) as fn(_) -> _,
88 ),
89 )
90});
91
92pub struct Verifier {
94 batch: BatchVerifier,
96
97 tx: Sender,
102}
103
104impl Default for Verifier {
105 fn default() -> Self {
106 let batch = BatchVerifier::default();
107 let (tx, _) = watch::channel(None);
108 Self { batch, tx }
109 }
110}
111
112impl Verifier {
113 fn take(&mut self) -> (BatchVerifier, Sender) {
116 let batch = mem::take(&mut self.batch);
118
119 let (tx, _) = watch::channel(None);
120 let tx = mem::replace(&mut self.tx, tx);
121
122 (batch, tx)
123 }
124
125 fn verify(batch: BatchVerifier, tx: Sender) {
128 let result = batch.verify(thread_rng());
129 let _ = tx.send(Some(result));
130 }
131
132 fn flush_blocking(&mut self) {
135 let (batch, tx) = self.take();
136
137 tokio::task::block_in_place(|| rayon::spawn_fifo(|| Self::verify(batch, tx)));
141 }
142
143 async fn flush_spawning(batch: BatchVerifier, tx: Sender) {
146 let start = std::time::Instant::now();
148 let result = spawn_fifo(move || batch.verify(thread_rng())).await;
149 let duration = start.elapsed().as_secs_f64();
150
151 let result_label = match &result {
152 Ok(Ok(())) => "success",
153 _ => "failure",
154 };
155 metrics::histogram!(
156 "zebra.consensus.batch.duration_seconds",
157 "verifier" => "redjubjub",
158 "result" => result_label
159 )
160 .record(duration);
161
162 let _ = tx.send(result.ok());
163 }
164
165 async fn verify_single_spawning(item: Item) -> Result<(), BoxError> {
167 spawn_fifo_and_convert(move || item.verify_single()).await
169 }
170}
171
172impl Service<BatchControl<Item>> for Verifier {
173 type Response = ();
174 type Error = BoxError;
175 type Future = Pin<Box<dyn Future<Output = Result<(), BoxError>> + Send + 'static>>;
176
177 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
178 Poll::Ready(Ok(()))
179 }
180
181 fn call(&mut self, req: BatchControl<Item>) -> Self::Future {
182 match req {
183 BatchControl::Item(item) => {
184 tracing::trace!("got item");
185 self.batch.queue(item.0);
186 let mut rx = self.tx.subscribe();
187
188 Box::pin(async move {
189 match rx.changed().await {
190 Ok(()) => {
191 let result = rx.borrow()
194 .ok_or("threadpool unexpectedly dropped response channel sender. Is Zebra shutting down?")?;
195
196 if result.is_ok() {
197 tracing::trace!(?result, "validated redjubjub signature");
198 metrics::counter!("signatures.redjubjub.validated").increment(1);
199 } else {
200 tracing::trace!(?result, "invalid redjubjub signature");
201 metrics::counter!("signatures.redjubjub.invalid").increment(1);
202 }
203
204 result.map_err(BoxError::from)
205 }
206 Err(_recv_error) => panic!("verifier was dropped without flushing"),
207 }
208 })
209 }
210
211 BatchControl::Flush => {
212 tracing::trace!("got redjubjub flush command");
213
214 let (batch, tx) = self.take();
215
216 Box::pin(Self::flush_spawning(batch, tx).map(Ok))
217 }
218 }
219 }
220}
221
222impl Drop for Verifier {
223 fn drop(&mut self) {
224 self.flush_blocking();
227 }
228}