zebra_consensus/primitives/
sapling.rs1use core::fmt;
4use std::{
5 future::Future,
6 mem,
7 pin::Pin,
8 task::{Context, Poll},
9};
10
11use futures::{future::BoxFuture, FutureExt, TryFutureExt};
12use once_cell::sync::Lazy;
13use rand::thread_rng;
14use tokio::sync::watch;
15use tower::{util::ServiceFn, Service};
16use tower_batch_control::{Batch, BatchControl, RequestWeight};
17use tower_fallback::Fallback;
18
19use sapling_crypto::{bundle::Authorized, BatchValidator, Bundle};
20use zcash_protocol::value::ZatBalance;
21use zebra_chain::transaction::SigHash;
22
23use crate::groth16::SAPLING;
24
25#[derive(Clone)]
26pub struct Item {
27 bundle: Bundle<Authorized, ZatBalance>,
29 sighash: SigHash,
31}
32
33impl Item {
34 pub fn new(bundle: Bundle<Authorized, ZatBalance>, sighash: SigHash) -> Self {
36 Self { bundle, sighash }
37 }
38}
39
40impl RequestWeight for Item {}
41
42#[derive(Default)]
46pub struct Verifier {
47 batch: BatchValidator,
49
50 tx: watch::Sender<Option<bool>>,
55}
56
57impl fmt::Debug for Verifier {
58 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59 f.debug_struct("Verifier")
60 .field("batch", &"..")
61 .field("tx", &self.tx)
62 .finish()
63 }
64}
65
66impl Drop for Verifier {
67 fn drop(&mut self) {
72 let batch = mem::take(&mut self.batch);
73 let tx = mem::take(&mut self.tx);
74
75 rayon::spawn_fifo(move || {
77 let (spend_vk, output_vk) = SAPLING.verifying_keys();
78
79 let res = batch.validate(&spend_vk, &output_vk, thread_rng());
81 let _ = tx.send(Some(res));
82 });
83 }
84}
85
86impl Service<BatchControl<Item>> for Verifier {
87 type Response = ();
88 type Error = Box<dyn std::error::Error + Send + Sync>;
89 type Future = Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>>;
90
91 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
92 Poll::Ready(Ok(()))
93 }
94
95 fn call(&mut self, req: BatchControl<Item>) -> Self::Future {
96 match req {
97 BatchControl::Item(item) => {
98 let mut rx = self.tx.subscribe();
99
100 let bundle_check = self
101 .batch
102 .check_bundle(item.bundle, item.sighash.into())
103 .then_some(())
104 .ok_or("invalid Sapling bundle");
105
106 async move {
107 bundle_check?;
108
109 rx.changed()
110 .await
111 .map_err(|_| "verifier was dropped without flushing")
112 .and_then(|_| {
113 rx.borrow()
116 .ok_or("threadpool unexpectedly dropped channel sender")?
117 .then(|| {
118 metrics::counter!("proofs.groth16.verified").increment(1);
119 })
120 .ok_or_else(|| {
121 metrics::counter!("proofs.groth16.invalid").increment(1);
122 "batch verification of Sapling shielded data failed"
123 })
124 })
125 .map_err(Self::Error::from)
126 }
127 .boxed()
128 }
129
130 BatchControl::Flush => {
131 let batch = mem::take(&mut self.batch);
132 let tx = mem::take(&mut self.tx);
133
134 async move {
135 let start = std::time::Instant::now();
136 let spawn_result = tokio::task::spawn_blocking(move || {
137 let (spend_vk, output_vk) = SAPLING.verifying_keys();
138 batch.validate(&spend_vk, &output_vk, thread_rng())
139 })
140 .await;
141 let duration = start.elapsed().as_secs_f64();
142
143 let result_label = match &spawn_result {
144 Ok(true) => "success",
145 _ => "failure",
146 };
147 metrics::histogram!(
148 "zebra.consensus.batch.duration_seconds",
149 "verifier" => "groth16_sapling",
150 "result" => result_label
151 )
152 .record(duration);
153
154 let is_valid = spawn_result.as_ref().ok().copied();
156 let _ = tx.send(is_valid);
157 spawn_result.map(|_| ()).map_err(Self::Error::from)
158 }
159 .boxed()
160 }
161 }
162 }
163}
164
165pub fn verify_single(
167 item: Item,
168) -> Pin<Box<dyn Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send>> {
169 async move {
170 let mut verifier = Verifier::default();
171
172 let check = verifier
173 .batch
174 .check_bundle(item.bundle, item.sighash.into())
175 .then_some(())
176 .ok_or("invalid Sapling bundle");
177 check?;
178
179 tokio::task::spawn_blocking(move || {
180 let (spend_vk, output_vk) = SAPLING.verifying_keys();
181
182 mem::take(&mut verifier.batch).validate(&spend_vk, &output_vk, thread_rng())
183 })
184 .await
185 .map_err(|_| "Sapling bundle validation thread panicked")?
186 .then_some(())
187 .ok_or("invalid proof or sig in Sapling bundle")
188 }
189 .map_err(Box::from)
190 .boxed()
191}
192
193pub static VERIFIER: Lazy<
195 Fallback<
196 Batch<Verifier, Item>,
197 ServiceFn<
198 fn(Item) -> BoxFuture<'static, Result<(), Box<dyn std::error::Error + Send + Sync>>>,
199 >,
200 >,
201> = Lazy::new(|| {
202 Fallback::new(
203 Batch::new(
204 Verifier::default(),
205 super::MAX_BATCH_SIZE,
206 None,
207 super::MAX_BATCH_LATENCY,
208 ),
209 tower::service_fn(verify_single),
210 )
211});