Skip to main content

zebra_consensus/primitives/
redjubjub.rs

//! Async RedJubjub batch verifier service.
2
3use std::{
4    future::Future,
5    mem,
6    pin::Pin,
7    task::{Context, Poll},
8};
9
10use futures::{future::BoxFuture, FutureExt};
11use once_cell::sync::Lazy;
12use rand::thread_rng;
13
14use tokio::sync::watch;
15use tower::{util::ServiceFn, Service};
16use tower_batch_control::{Batch, BatchControl, RequestWeight};
17use tower_fallback::Fallback;
18
19use zebra_chain::primitives::redjubjub::*;
20
21use crate::BoxError;
22
23use super::{spawn_fifo, spawn_fifo_and_convert};
24
25#[cfg(test)]
26mod tests;
27
28/// The type of the batch verifier.
29type BatchVerifier = batch::Verifier;
30
31/// The type of verification results.
32type VerifyResult = Result<(), Error>;
33
34/// The type of the batch sender channel.
35type Sender = watch::Sender<Option<VerifyResult>>;
36
37/// The type of the batch item.
38/// This is a newtype around a `RedJubjubItem`.
39#[derive(Clone, Debug)]
40pub struct Item(batch::Item);
41
42impl RequestWeight for Item {}
43
44impl<T: Into<batch::Item>> From<T> for Item {
45    fn from(value: T) -> Self {
46        Self(value.into())
47    }
48}
49
50impl Item {
51    fn verify_single(self) -> VerifyResult {
52        self.0.verify_single()
53    }
54}
55
56/// Global batch verification context for RedJubjub signatures.
57///
58/// This service transparently batches contemporaneous signature verifications,
59/// handling batch failures by falling back to individual verification.
60///
61/// Note that making a `Service` call requires mutable access to the service, so
62/// you should call `.clone()` on the global handle to create a local, mutable
63/// handle.
64pub static VERIFIER: Lazy<
65    Fallback<
66        Batch<Verifier, Item>,
67        ServiceFn<fn(Item) -> BoxFuture<'static, Result<(), BoxError>>>,
68    >,
69> = Lazy::new(|| {
70    Fallback::new(
71        Batch::new(
72            Verifier::default(),
73            super::MAX_BATCH_SIZE,
74            None,
75            super::MAX_BATCH_LATENCY,
76        ),
77        // We want to fallback to individual verification if batch verification fails,
78        // so we need a Service to use.
79        //
80        // Because we have to specify the type of a static, we need to be able to
81        // write the type of the closure and its return value. But both closures and
82        // async blocks have unnameable types. So instead we cast the closure to a function
83        // (which is possible because it doesn't capture any state), and use a BoxFuture
84        // to erase the result type.
85        // (We can't use BoxCloneService to erase the service type, because it is !Sync.)
86        tower::service_fn(
87            (|item: Item| Verifier::verify_single_spawning(item).boxed()) as fn(_) -> _,
88        ),
89    )
90});
91
92/// RedJubjub signature verifier service
93pub struct Verifier {
94    /// A batch verifier for RedJubjub signatures.
95    batch: BatchVerifier,
96
97    /// A channel for broadcasting the result of a batch to the futures for each batch item.
98    ///
99    /// Each batch gets a newly created channel, so there is only ever one result sent per channel.
100    /// Tokio doesn't have a oneshot multi-consumer channel, so we use a watch channel.
101    tx: Sender,
102}
103
104impl Default for Verifier {
105    fn default() -> Self {
106        let batch = BatchVerifier::default();
107        let (tx, _) = watch::channel(None);
108        Self { batch, tx }
109    }
110}
111
112impl Verifier {
113    /// Returns the batch verifier and channel sender from `self`,
114    /// replacing them with a new empty batch.
115    fn take(&mut self) -> (BatchVerifier, Sender) {
116        // Use a new verifier and channel for each batch.
117        let batch = mem::take(&mut self.batch);
118
119        let (tx, _) = watch::channel(None);
120        let tx = mem::replace(&mut self.tx, tx);
121
122        (batch, tx)
123    }
124
125    /// Synchronously process the batch, and send the result using the channel sender.
126    /// This function blocks until the batch is completed.
127    fn verify(batch: BatchVerifier, tx: Sender) {
128        let result = batch.verify(thread_rng());
129        let _ = tx.send(Some(result));
130    }
131
132    /// Flush the batch using a thread pool, and return the result via the channel.
133    /// This returns immediately, usually before the batch is completed.
134    fn flush_blocking(&mut self) {
135        let (batch, tx) = self.take();
136
137        // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
138        //
139        // We don't care about execution order here, because this method is only called on drop.
140        tokio::task::block_in_place(|| rayon::spawn_fifo(|| Self::verify(batch, tx)));
141    }
142
143    /// Flush the batch using a thread pool, and return the result via the channel.
144    /// This function returns a future that becomes ready when the batch is completed.
145    async fn flush_spawning(batch: BatchVerifier, tx: Sender) {
146        // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
147        let start = std::time::Instant::now();
148        let result = spawn_fifo(move || batch.verify(thread_rng())).await;
149        let duration = start.elapsed().as_secs_f64();
150
151        let result_label = match &result {
152            Ok(Ok(())) => "success",
153            _ => "failure",
154        };
155        metrics::histogram!(
156            "zebra.consensus.batch.duration_seconds",
157            "verifier" => "redjubjub",
158            "result" => result_label
159        )
160        .record(duration);
161
162        let _ = tx.send(result.ok());
163    }
164
165    /// Verify a single item using a thread pool, and return the result.
166    async fn verify_single_spawning(item: Item) -> Result<(), BoxError> {
167        // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
168        spawn_fifo_and_convert(move || item.verify_single()).await
169    }
170}
171
172impl Service<BatchControl<Item>> for Verifier {
173    type Response = ();
174    type Error = BoxError;
175    type Future = Pin<Box<dyn Future<Output = Result<(), BoxError>> + Send + 'static>>;
176
177    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
178        Poll::Ready(Ok(()))
179    }
180
181    fn call(&mut self, req: BatchControl<Item>) -> Self::Future {
182        match req {
183            BatchControl::Item(item) => {
184                tracing::trace!("got item");
185                self.batch.queue(item.0);
186                let mut rx = self.tx.subscribe();
187
188                Box::pin(async move {
189                    match rx.changed().await {
190                        Ok(()) => {
191                            // We use a new channel for each batch,
192                            // so we always get the correct batch result here.
193                            let result = rx.borrow()
194                                .ok_or("threadpool unexpectedly dropped response channel sender. Is Zebra shutting down?")?;
195
196                            if result.is_ok() {
197                                tracing::trace!(?result, "validated redjubjub signature");
198                                metrics::counter!("signatures.redjubjub.validated").increment(1);
199                            } else {
200                                tracing::trace!(?result, "invalid redjubjub signature");
201                                metrics::counter!("signatures.redjubjub.invalid").increment(1);
202                            }
203
204                            result.map_err(BoxError::from)
205                        }
206                        Err(_recv_error) => panic!("verifier was dropped without flushing"),
207                    }
208                })
209            }
210
211            BatchControl::Flush => {
212                tracing::trace!("got redjubjub flush command");
213
214                let (batch, tx) = self.take();
215
216                Box::pin(Self::flush_spawning(batch, tx).map(Ok))
217            }
218        }
219    }
220}
221
222impl Drop for Verifier {
223    fn drop(&mut self) {
224        // We need to flush the current batch in case there are still any pending futures.
225        // This returns immediately, usually before the batch is completed.
226        self.flush_blocking();
227    }
228}