Skip to main content

zebra_consensus/primitives/
sapling.rs

1//! Async Sapling batch verifier service
2
3use core::fmt;
4use std::{
5    future::Future,
6    mem,
7    pin::Pin,
8    task::{Context, Poll},
9};
10
11use futures::{future::BoxFuture, FutureExt, TryFutureExt};
12use once_cell::sync::Lazy;
13use rand::thread_rng;
14use tokio::sync::watch;
15use tower::{util::ServiceFn, Service};
16use tower_batch_control::{Batch, BatchControl, RequestWeight};
17use tower_fallback::Fallback;
18
19use sapling_crypto::{bundle::Authorized, BatchValidator, Bundle};
20use zcash_protocol::value::ZatBalance;
21use zebra_chain::transaction::SigHash;
22
23use crate::groth16::SAPLING;
24
25#[derive(Clone)]
26pub struct Item {
27    /// The bundle containing the Sapling shielded data to verify.
28    bundle: Bundle<Authorized, ZatBalance>,
29    /// The sighash of the transaction that contains the Sapling shielded data.
30    sighash: SigHash,
31}
32
33impl Item {
34    /// Creates a new [`Item`] from a Sapling bundle and sighash.
35    pub fn new(bundle: Bundle<Authorized, ZatBalance>, sighash: SigHash) -> Self {
36        Self { bundle, sighash }
37    }
38}
39
40impl RequestWeight for Item {}
41
42/// A service that verifies Sapling shielded data in batches.
43///
44/// Handles batching incoming requests, driving batches to completion, and reporting results.
45#[derive(Default)]
46pub struct Verifier {
47    /// A batch verifier for Sapling shielded data.
48    batch: BatchValidator,
49
50    /// A channel for broadcasting the verification result of the batch.
51    ///
52    /// Each batch gets a newly created channel, so there is only ever one result sent per channel.
53    /// Tokio doesn't have a oneshot multi-consumer channel, so we use a watch channel.
54    tx: watch::Sender<Option<bool>>,
55}
56
57impl fmt::Debug for Verifier {
58    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59        f.debug_struct("Verifier")
60            .field("batch", &"..")
61            .field("tx", &self.tx)
62            .finish()
63    }
64}
65
66impl Drop for Verifier {
67    // Flush the current batch in case there are still any pending futures.
68    //
69    // Flushing the batch means we need to validate it. This function fires off the validation and
70    // returns immediately, usually before the validation finishes.
71    fn drop(&mut self) {
72        let batch = mem::take(&mut self.batch);
73        let tx = mem::take(&mut self.tx);
74
75        // The validation is CPU-intensive; do it on a dedicated thread so it does not block.
76        rayon::spawn_fifo(move || {
77            let (spend_vk, output_vk) = SAPLING.verifying_keys();
78
79            // Validate the batch and send the result through the channel.
80            let res = batch.validate(&spend_vk, &output_vk, thread_rng());
81            let _ = tx.send(Some(res));
82        });
83    }
84}
85
86impl Service<BatchControl<Item>> for Verifier {
87    type Response = ();
88    type Error = Box<dyn std::error::Error + Send + Sync>;
89    type Future = Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>>;
90
91    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
92        Poll::Ready(Ok(()))
93    }
94
95    fn call(&mut self, req: BatchControl<Item>) -> Self::Future {
96        match req {
97            BatchControl::Item(item) => {
98                let mut rx = self.tx.subscribe();
99
100                let bundle_check = self
101                    .batch
102                    .check_bundle(item.bundle, item.sighash.into())
103                    .then_some(())
104                    .ok_or("invalid Sapling bundle");
105
106                async move {
107                    bundle_check?;
108
109                    rx.changed()
110                        .await
111                        .map_err(|_| "verifier was dropped without flushing")
112                        .and_then(|_| {
113                            // We use a new channel for each batch, so we always get the correct
114                            // batch result here.
115                            rx.borrow()
116                                .ok_or("threadpool unexpectedly dropped channel sender")?
117                                .then(|| {
118                                    metrics::counter!("proofs.groth16.verified").increment(1);
119                                })
120                                .ok_or_else(|| {
121                                    metrics::counter!("proofs.groth16.invalid").increment(1);
122                                    "batch verification of Sapling shielded data failed"
123                                })
124                        })
125                        .map_err(Self::Error::from)
126                }
127                .boxed()
128            }
129
130            BatchControl::Flush => {
131                let batch = mem::take(&mut self.batch);
132                let tx = mem::take(&mut self.tx);
133
134                async move {
135                    let start = std::time::Instant::now();
136                    let spawn_result = tokio::task::spawn_blocking(move || {
137                        let (spend_vk, output_vk) = SAPLING.verifying_keys();
138                        batch.validate(&spend_vk, &output_vk, thread_rng())
139                    })
140                    .await;
141                    let duration = start.elapsed().as_secs_f64();
142
143                    let result_label = match &spawn_result {
144                        Ok(true) => "success",
145                        _ => "failure",
146                    };
147                    metrics::histogram!(
148                        "zebra.consensus.batch.duration_seconds",
149                        "verifier" => "groth16_sapling",
150                        "result" => result_label
151                    )
152                    .record(duration);
153
154                    // Extract the value before consuming spawn_result
155                    let is_valid = spawn_result.as_ref().ok().copied();
156                    let _ = tx.send(is_valid);
157                    spawn_result.map(|_| ()).map_err(Self::Error::from)
158                }
159                .boxed()
160            }
161        }
162    }
163}
164
165/// Verifies a single [`Item`].
166pub fn verify_single(
167    item: Item,
168) -> Pin<Box<dyn Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send>> {
169    async move {
170        let mut verifier = Verifier::default();
171
172        let check = verifier
173            .batch
174            .check_bundle(item.bundle, item.sighash.into())
175            .then_some(())
176            .ok_or("invalid Sapling bundle");
177        check?;
178
179        tokio::task::spawn_blocking(move || {
180            let (spend_vk, output_vk) = SAPLING.verifying_keys();
181
182            mem::take(&mut verifier.batch).validate(&spend_vk, &output_vk, thread_rng())
183        })
184        .await
185        .map_err(|_| "Sapling bundle validation thread panicked")?
186        .then_some(())
187        .ok_or("invalid proof or sig in Sapling bundle")
188    }
189    .map_err(Box::from)
190    .boxed()
191}
192
193/// Global batch verification context for Sapling shielded data.
194pub static VERIFIER: Lazy<
195    Fallback<
196        Batch<Verifier, Item>,
197        ServiceFn<
198            fn(Item) -> BoxFuture<'static, Result<(), Box<dyn std::error::Error + Send + Sync>>>,
199        >,
200    >,
201> = Lazy::new(|| {
202    Fallback::new(
203        Batch::new(
204            Verifier::default(),
205            super::MAX_BATCH_SIZE,
206            None,
207            super::MAX_BATCH_LATENCY,
208        ),
209        tower::service_fn(verify_single),
210    )
211});