1use std::{
4 cmp::min,
5 fmt,
6 io::{Cursor, Read, Write},
7};
8
9use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
10use bytes::{BufMut, BytesMut};
11use chrono::{TimeZone, Utc};
12use tokio_util::codec::{Decoder, Encoder};
13
14use zebra_chain::{
15 block::{self, Block},
16 parameters::{Magic, Network},
17 serialization::{
18 sha256d, zcash_deserialize_bytes_external_count, zcash_deserialize_external_count,
19 zcash_deserialize_string_external_count, CompactSizeMessage, FakeWriter, ReadZcashExt,
20 SerializationError as Error, ZcashDeserialize, ZcashDeserializeInto, ZcashSerialize,
21 MAX_HEADERS_PER_MESSAGE, MAX_PROTOCOL_MESSAGE_LEN,
22 },
23 transaction::Transaction,
24};
25
26use crate::constants;
27
28use super::{
29 addr::{AddrInVersion, AddrV1, AddrV2},
30 message::{
31 Message, RejectReason, VersionMessage, MAX_REJECT_MESSAGE_LENGTH, MAX_REJECT_REASON_LENGTH,
32 MAX_USER_AGENT_LENGTH,
33 },
34 types::*,
35};
36
37#[cfg(test)]
38mod tests;
39
40const HEADER_LEN: usize = 24usize;
42
43const MAX_HANDSHAKE_BODY_LEN: usize = 1024;
48
49pub struct Codec {
51 builder: Builder,
52 state: DecodeState,
53}
54
55pub struct Builder {
57 network: Network,
59 version: Version,
61 max_len: usize,
63 metrics_addr_label: Option<String>,
65}
66
67impl Codec {
68 pub fn builder() -> Builder {
70 Builder {
71 network: Network::Mainnet,
72 version: constants::CURRENT_NETWORK_PROTOCOL_VERSION,
73 max_len: MAX_HANDSHAKE_BODY_LEN,
74 metrics_addr_label: None,
75 }
76 }
77
78 pub fn reconfigure_version(&mut self, version: Version) {
80 self.builder.version = version;
81 }
82
83 pub fn reconfigure_full_body_len(&mut self) {
88 self.builder.max_len = MAX_PROTOCOL_MESSAGE_LEN;
89 }
90}
91
92impl Builder {
93 pub fn finish(self) -> Codec {
95 Codec {
96 builder: self,
97 state: DecodeState::Head,
98 }
99 }
100
101 pub fn for_network(mut self, network: &Network) -> Self {
103 self.network = network.clone();
104 self
105 }
106
107 #[allow(dead_code)]
109 pub fn for_version(mut self, version: Version) -> Self {
110 self.version = version;
111 self
112 }
113
114 #[allow(dead_code)]
116 pub fn with_max_body_len(mut self, len: usize) -> Self {
117 self.max_len = len;
118 self
119 }
120
121 pub fn with_metrics_addr_label(mut self, metrics_addr_label: String) -> Self {
123 self.metrics_addr_label = Some(metrics_addr_label);
124 self
125 }
126}
127
128impl Encoder<Message> for Codec {
131 type Error = Error;
132
133 fn encode(&mut self, item: Message, dst: &mut BytesMut) -> Result<(), Self::Error> {
134 use Error::Parse;
135
136 let body_length = self.body_length(&item);
137
138 if body_length > self.builder.max_len {
139 return Err(Parse("body length exceeded maximum size"));
140 }
141
142 if let Some(addr_label) = self.builder.metrics_addr_label.clone() {
143 metrics::counter!("zcash.net.out.bytes.total",
144 "addr" => addr_label)
145 .increment((body_length + HEADER_LEN) as u64);
146 }
147
148 use Message::*;
149 let command = match item {
155 Version { .. } => b"version\0\0\0\0\0",
156 Verack => b"verack\0\0\0\0\0\0",
157 Ping { .. } => b"ping\0\0\0\0\0\0\0\0",
158 Pong { .. } => b"pong\0\0\0\0\0\0\0\0",
159 Reject { .. } => b"reject\0\0\0\0\0\0",
160 Addr { .. } => b"addr\0\0\0\0\0\0\0\0",
161 GetAddr => b"getaddr\0\0\0\0\0",
162 Block { .. } => b"block\0\0\0\0\0\0\0",
163 GetBlocks { .. } => b"getblocks\0\0\0",
164 Headers { .. } => b"headers\0\0\0\0\0",
165 GetHeaders { .. } => b"getheaders\0\0",
166 Inv { .. } => b"inv\0\0\0\0\0\0\0\0\0",
167 GetData { .. } => b"getdata\0\0\0\0\0",
168 NotFound { .. } => b"notfound\0\0\0\0",
169 Tx { .. } => b"tx\0\0\0\0\0\0\0\0\0\0",
170 Mempool => b"mempool\0\0\0\0\0",
171 FilterLoad { .. } => b"filterload\0\0",
172 FilterAdd { .. } => b"filteradd\0\0\0",
173 FilterClear => b"filterclear\0",
174 };
175 trace!(?item, len = body_length);
176
177 dst.reserve(HEADER_LEN + body_length);
178 let start_len = dst.len();
179 {
180 let dst = &mut dst.writer();
181 dst.write_all(&self.builder.network.magic().0[..])?;
182 dst.write_all(command)?;
183 dst.write_u32::<LittleEndian>(body_length as u32)?;
184
185 dst.write_u32::<LittleEndian>(0)?;
188
189 self.write_body(&item, dst)?;
190 }
191 let checksum = sha256d::Checksum::from(&dst[start_len + HEADER_LEN..]);
192 dst[start_len + 20..][..4].copy_from_slice(&checksum.0);
193
194 Ok(())
195 }
196}
197
198impl Codec {
199 fn body_length(&self, msg: &Message) -> usize {
207 let mut writer = FakeWriter(0);
208
209 self.write_body(msg, &mut writer)
210 .expect("writer should never fail");
211 writer.0
212 }
213
214 fn write_body<W: Write>(&self, msg: &Message, mut writer: W) -> Result<(), Error> {
218 match msg {
219 Message::Version(VersionMessage {
220 version,
221 services,
222 timestamp,
223 address_recv,
224 address_from,
225 nonce,
226 user_agent,
227 start_height,
228 relay,
229 }) => {
230 writer.write_u32::<LittleEndian>(version.0)?;
231 writer.write_u64::<LittleEndian>(services.bits())?;
232 writer.write_i64::<LittleEndian>(timestamp.timestamp())?;
236
237 address_recv.zcash_serialize(&mut writer)?;
238 address_from.zcash_serialize(&mut writer)?;
239
240 writer.write_u64::<LittleEndian>(nonce.0)?;
241
242 if user_agent.len() > MAX_USER_AGENT_LENGTH {
243 return Err(Error::Parse(
245 "user agent too long: must be 256 bytes or less",
246 ));
247 }
248
249 user_agent.zcash_serialize(&mut writer)?;
250 writer.write_u32::<LittleEndian>(start_height.0)?;
251 writer.write_u8(*relay as u8)?;
252 }
253 Message::Verack => { }
254 Message::Ping(nonce) => {
255 writer.write_u64::<LittleEndian>(nonce.0)?;
256 }
257 Message::Pong(nonce) => {
258 writer.write_u64::<LittleEndian>(nonce.0)?;
259 }
260 Message::Reject {
261 message,
262 ccode,
263 reason,
264 data,
265 } => {
266 if message.len() > MAX_REJECT_MESSAGE_LENGTH {
267 return Err(Error::Parse(
269 "reject message too long: must be 12 bytes or less",
270 ));
271 }
272
273 message.zcash_serialize(&mut writer)?;
274
275 writer.write_u8(*ccode as u8)?;
276
277 if reason.len() > MAX_REJECT_REASON_LENGTH {
278 return Err(Error::Parse(
279 "reject reason too long: must be 111 bytes or less",
280 ));
281 }
282
283 reason.zcash_serialize(&mut writer)?;
284 if let Some(data) = data {
285 writer.write_all(data)?;
286 }
287 }
288 Message::Addr(addrs) => {
289 assert!(
290 addrs.len() <= constants::MAX_ADDRS_IN_MESSAGE,
291 "unexpectedly large Addr message: greater than MAX_ADDRS_IN_MESSAGE addresses"
292 );
293
294 let v1_addrs: Vec<AddrV1> = addrs.iter().map(|addr| AddrV1::from(*addr)).collect();
297 v1_addrs.zcash_serialize(&mut writer)?
298 }
299 Message::GetAddr => { }
300 Message::Block(block) => block.zcash_serialize(&mut writer)?,
301 Message::GetBlocks { known_blocks, stop } => {
302 writer.write_u32::<LittleEndian>(self.builder.version.0)?;
303 known_blocks.zcash_serialize(&mut writer)?;
304 stop.unwrap_or(block::Hash([0; 32]))
305 .zcash_serialize(&mut writer)?;
306 }
307 Message::GetHeaders { known_blocks, stop } => {
308 writer.write_u32::<LittleEndian>(self.builder.version.0)?;
309 known_blocks.zcash_serialize(&mut writer)?;
310 stop.unwrap_or(block::Hash([0; 32]))
311 .zcash_serialize(&mut writer)?;
312 }
313 Message::Headers(headers) => headers.zcash_serialize(&mut writer)?,
314 Message::Inv(hashes) => hashes.zcash_serialize(&mut writer)?,
315 Message::GetData(hashes) => hashes.zcash_serialize(&mut writer)?,
316 Message::NotFound(hashes) => hashes.zcash_serialize(&mut writer)?,
317 Message::Tx(transaction) => transaction.transaction.zcash_serialize(&mut writer)?,
318 Message::Mempool => { }
319 Message::FilterLoad {
320 filter,
321 hash_functions_count,
322 tweak,
323 flags,
324 } => {
325 writer.write_all(&filter.0)?;
326 writer.write_u32::<LittleEndian>(*hash_functions_count)?;
327 writer.write_u32::<LittleEndian>(tweak.0)?;
328 writer.write_u8(*flags)?;
329 }
330 Message::FilterAdd { data } => {
331 writer.write_all(data)?;
332 }
333 Message::FilterClear => { }
334 }
335 Ok(())
336 }
337}
338
339enum DecodeState {
342 Head,
343 Body {
344 body_len: usize,
345 command: [u8; 12],
346 checksum: sha256d::Checksum,
347 },
348}
349
350impl fmt::Debug for DecodeState {
351 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
352 match self {
353 DecodeState::Head => write!(f, "DecodeState::Head"),
354 DecodeState::Body {
355 body_len,
356 command,
357 checksum,
358 } => f
359 .debug_struct("DecodeState::Body")
360 .field("body_len", &body_len)
361 .field("command", &String::from_utf8_lossy(command))
362 .field("checksum", &checksum)
363 .finish(),
364 }
365 }
366}
367
368impl Decoder for Codec {
369 type Item = Message;
370 type Error = Error;
371
372 #[allow(clippy::unwrap_in_result)]
373 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
374 use Error::Parse;
375 match self.state {
376 DecodeState::Head => {
377 if src.len() < HEADER_LEN {
379 trace!(?self.state, "src buffer does not have an entire header, waiting");
380 return Ok(None);
382 }
383
384 let header = src.split_to(HEADER_LEN);
386
387 let mut header_reader = Cursor::new(&header);
389 let magic = Magic(header_reader.read_4_bytes()?);
390 let command = header_reader.read_12_bytes()?;
391 let body_len = header_reader.read_u32::<LittleEndian>()? as usize;
392 let checksum = sha256d::Checksum(header_reader.read_4_bytes()?);
393 trace!(
394 ?self.state,
395 ?magic,
396 command = %String::from_utf8(
397 command.iter()
398 .cloned()
399 .flat_map(std::ascii::escape_default)
400 .collect()
401 ).unwrap(),
402 body_len,
403 ?checksum,
404 "read header from src buffer"
405 );
406
407 if magic != self.builder.network.magic() {
408 return Err(Parse("supplied magic did not meet expectations"));
409 }
410 if body_len > self.builder.max_len {
411 return Err(Parse("body length exceeded maximum size"));
412 }
413
414 if let Some(label) = self.builder.metrics_addr_label.clone() {
415 metrics::counter!("zcash.net.in.bytes.total", "addr" => label)
416 .increment((body_len + HEADER_LEN) as u64);
417 }
418
419 src.reserve(body_len + HEADER_LEN);
421
422 self.state = DecodeState::Body {
423 body_len,
424 command,
425 checksum,
426 };
427
428 self.decode(src)
430 }
431 DecodeState::Body {
432 body_len,
433 command,
434 checksum,
435 } => {
436 if src.len() < body_len {
437 trace!(?self.state, len = src.len(), "src buffer does not have an entire body, waiting");
439 return Ok(None);
440 }
441
442 let body = src.split_to(body_len);
446 self.state = DecodeState::Head;
447
448 if checksum != sha256d::Checksum::from(&body[..]) {
449 return Err(Parse(
450 "supplied message checksum does not match computed checksum",
451 ));
452 }
453
454 let mut body_reader = Cursor::new(&body);
455 match &command {
456 b"version\0\0\0\0\0" => self.read_version(&mut body_reader),
457 b"verack\0\0\0\0\0\0" => self.read_verack(&mut body_reader),
458 b"ping\0\0\0\0\0\0\0\0" => self.read_ping(&mut body_reader),
459 b"pong\0\0\0\0\0\0\0\0" => self.read_pong(&mut body_reader),
460 b"reject\0\0\0\0\0\0" => self.read_reject(&mut body_reader),
461 b"addr\0\0\0\0\0\0\0\0" => self.read_addr(&mut body_reader),
462 b"addrv2\0\0\0\0\0\0" => self.read_addrv2(&mut body_reader),
463 b"getaddr\0\0\0\0\0" => self.read_getaddr(&mut body_reader),
464 b"block\0\0\0\0\0\0\0" => self.read_block(&mut body_reader),
465 b"getblocks\0\0\0" => self.read_getblocks(&mut body_reader),
466 b"headers\0\0\0\0\0" => self.read_headers(&mut body_reader),
467 b"getheaders\0\0" => self.read_getheaders(&mut body_reader),
468 b"inv\0\0\0\0\0\0\0\0\0" => self.read_inv(&mut body_reader),
469 b"getdata\0\0\0\0\0" => self.read_getdata(&mut body_reader),
470 b"notfound\0\0\0\0" => self.read_notfound(&mut body_reader),
471 b"tx\0\0\0\0\0\0\0\0\0\0" => self.read_tx(&mut body_reader),
472 b"mempool\0\0\0\0\0" => self.read_mempool(&mut body_reader),
473 b"filterload\0\0" => self.read_filterload(&mut body_reader, body_len),
474 b"filteradd\0\0\0" => self.read_filteradd(&mut body_reader, body_len),
475 b"filterclear\0" => self.read_filterclear(&mut body_reader),
476 _ => {
477 let command_string = String::from_utf8_lossy(&command);
478
479 debug!(?command, %command_string, "unknown message command from peer");
487 return Ok(None);
488 }
489 }
490 .map(|msg| {
493 let extra_bytes = body.len() as u64 - body_reader.position();
497 if extra_bytes == 0 {
498 trace!(?extra_bytes, %msg, "finished message decoding");
499 } else {
500 debug!(?extra_bytes, %msg, "extra data after decoding message");
503 }
504 Some(msg)
505 })
506 }
507 }
508 }
509}
510
511impl Codec {
512 fn read_version<R: Read>(&self, mut reader: R) -> Result<Message, Error> {
519 Ok(VersionMessage {
520 version: Version(reader.read_u32::<LittleEndian>()?),
521 services: PeerServices::from_bits_truncate(reader.read_u64::<LittleEndian>()?),
523 timestamp: Utc
524 .timestamp_opt(reader.read_i64::<LittleEndian>()?, 0)
525 .single()
526 .ok_or(Error::Parse(
527 "version timestamp is out of range for DateTime",
528 ))?,
529 address_recv: AddrInVersion::zcash_deserialize(&mut reader)?,
530 address_from: AddrInVersion::zcash_deserialize(&mut reader)?,
531 nonce: Nonce(reader.read_u64::<LittleEndian>()?),
532 user_agent: {
533 let byte_count: CompactSizeMessage = (&mut reader).zcash_deserialize_into()?;
534 let byte_count: usize = byte_count.into();
535
536 if byte_count > MAX_USER_AGENT_LENGTH {
543 return Err(Error::Parse(
544 "user agent too long: must be 256 bytes or less",
545 ));
546 }
547
548 zcash_deserialize_string_external_count(byte_count, &mut reader)?
549 },
550 start_height: block::Height(reader.read_u32::<LittleEndian>()?),
551 relay: match reader.read_u8() {
552 Ok(val @ 0..=1) => val == 1,
553 Ok(_) => return Err(Error::Parse("non-bool value supplied in relay field")),
554 Err(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => true,
555 Err(err) => Err(err)?,
556 },
557 }
558 .into())
559 }
560
561 fn read_verack<R: Read>(&self, mut _reader: R) -> Result<Message, Error> {
562 Ok(Message::Verack)
563 }
564
565 fn read_ping<R: Read>(&self, mut reader: R) -> Result<Message, Error> {
566 Ok(Message::Ping(Nonce(reader.read_u64::<LittleEndian>()?)))
567 }
568
569 fn read_pong<R: Read>(&self, mut reader: R) -> Result<Message, Error> {
570 Ok(Message::Pong(Nonce(reader.read_u64::<LittleEndian>()?)))
571 }
572
573 fn read_reject<R: Read>(&self, mut reader: R) -> Result<Message, Error> {
574 Ok(Message::Reject {
575 message: {
576 let byte_count: CompactSizeMessage = (&mut reader).zcash_deserialize_into()?;
577 let byte_count: usize = byte_count.into();
578
579 if byte_count > MAX_REJECT_MESSAGE_LENGTH {
583 return Err(Error::Parse(
584 "reject message too long: must be 12 bytes or less",
585 ));
586 }
587
588 zcash_deserialize_string_external_count(byte_count, &mut reader)?
589 },
590 ccode: match reader.read_u8()? {
591 0x01 => RejectReason::Malformed,
592 0x10 => RejectReason::Invalid,
593 0x11 => RejectReason::Obsolete,
594 0x12 => RejectReason::Duplicate,
595 0x40 => RejectReason::Nonstandard,
596 0x41 => RejectReason::Dust,
597 0x42 => RejectReason::InsufficientFee,
598 0x43 => RejectReason::Checkpoint,
599 0x50 => RejectReason::Other,
600 _ => return Err(Error::Parse("invalid RejectReason value in ccode field")),
601 },
602 reason: {
603 let byte_count: CompactSizeMessage = (&mut reader).zcash_deserialize_into()?;
604 let byte_count: usize = byte_count.into();
605
606 if byte_count > MAX_REJECT_REASON_LENGTH {
610 return Err(Error::Parse(
611 "reject reason too long: must be 111 bytes or less",
612 ));
613 }
614
615 zcash_deserialize_string_external_count(byte_count, &mut reader)?
616 },
617 data: reader.read_32_bytes().ok(),
626 })
627 }
628
629 pub(super) fn read_addr<R: Read>(&self, reader: R) -> Result<Message, Error> {
631 let addrs: Vec<AddrV1> = reader.zcash_deserialize_into()?;
632
633 if addrs.len() > constants::MAX_ADDRS_IN_MESSAGE {
634 return Err(Error::Parse(
635 "more than MAX_ADDRS_IN_MESSAGE in addr message",
636 ));
637 }
638
639 let addrs = addrs.into_iter().map(Into::into).collect();
641 Ok(Message::Addr(addrs))
642 }
643
644 pub(super) fn read_addrv2<R: Read>(&self, reader: R) -> Result<Message, Error> {
649 let addrs: Vec<AddrV2> = reader.zcash_deserialize_into()?;
650
651 if addrs.len() > constants::MAX_ADDRS_IN_MESSAGE {
652 return Err(Error::Parse(
653 "more than MAX_ADDRS_IN_MESSAGE in addrv2 message",
654 ));
655 }
656
657 let addrs = addrs
660 .into_iter()
661 .filter_map(|addr| addr.try_into().ok())
662 .collect();
663 Ok(Message::Addr(addrs))
664 }
665
666 fn read_getaddr<R: Read>(&self, mut _reader: R) -> Result<Message, Error> {
667 Ok(Message::GetAddr)
668 }
669
670 fn read_block<R: Read + std::marker::Send>(&self, reader: R) -> Result<Message, Error> {
671 let result = Self::deserialize_block_spawning(reader);
672 Ok(Message::Block(result?.into()))
673 }
674
675 fn read_getblocks<R: Read>(&self, mut reader: R) -> Result<Message, Error> {
676 if self.builder.version == Version(reader.read_u32::<LittleEndian>()?) {
677 let known_blocks = Vec::zcash_deserialize(&mut reader)?;
678 let stop_hash = block::Hash::zcash_deserialize(&mut reader)?;
679 let stop = if stop_hash != block::Hash([0; 32]) {
680 Some(stop_hash)
681 } else {
682 None
683 };
684 Ok(Message::GetBlocks { known_blocks, stop })
685 } else {
686 Err(Error::Parse("getblocks version did not match negotiation"))
687 }
688 }
689
690 fn read_headers<R: Read>(&self, mut reader: R) -> Result<Message, Error> {
696 let count: CompactSizeMessage = (&mut reader).zcash_deserialize_into()?;
698 let count: usize = count.into();
700 if count > MAX_HEADERS_PER_MESSAGE {
701 return Err(Error::Parse(
702 "headers message exceeds the protocol limit of 160 entries",
703 ));
704 }
705 Ok(Message::Headers(zcash_deserialize_external_count(
706 count,
707 &mut reader,
708 )?))
709 }
710
711 fn read_getheaders<R: Read>(&self, mut reader: R) -> Result<Message, Error> {
712 if self.builder.version == Version(reader.read_u32::<LittleEndian>()?) {
713 let known_blocks = Vec::zcash_deserialize(&mut reader)?;
714 let stop_hash = block::Hash::zcash_deserialize(&mut reader)?;
715 let stop = if stop_hash != block::Hash([0; 32]) {
716 Some(stop_hash)
717 } else {
718 None
719 };
720 Ok(Message::GetHeaders { known_blocks, stop })
721 } else {
722 Err(Error::Parse("getblocks version did not match negotiation"))
723 }
724 }
725
726 fn read_inv<R: Read>(&self, reader: R) -> Result<Message, Error> {
727 Ok(Message::Inv(Vec::zcash_deserialize(reader)?))
728 }
729
730 fn read_getdata<R: Read>(&self, reader: R) -> Result<Message, Error> {
731 Ok(Message::GetData(Vec::zcash_deserialize(reader)?))
732 }
733
734 fn read_notfound<R: Read>(&self, reader: R) -> Result<Message, Error> {
735 Ok(Message::NotFound(Vec::zcash_deserialize(reader)?))
736 }
737
738 fn read_tx<R: Read + std::marker::Send>(&self, reader: R) -> Result<Message, Error> {
739 let result = Self::deserialize_transaction_spawning(reader);
740 Ok(Message::Tx(result?.into()))
741 }
742
743 fn read_mempool<R: Read>(&self, mut _reader: R) -> Result<Message, Error> {
744 Ok(Message::Mempool)
745 }
746
747 fn read_filterload<R: Read>(&self, mut reader: R, body_len: usize) -> Result<Message, Error> {
748 const MAX_FILTERLOAD_FILTER_LENGTH: usize = 36000;
750
751 const FILTERLOAD_FIELDS_LENGTH: usize = 4 + 4 + 1;
754
755 const MAX_FILTERLOAD_MESSAGE_LENGTH: usize =
757 MAX_FILTERLOAD_FILTER_LENGTH + FILTERLOAD_FIELDS_LENGTH;
758
759 if !(FILTERLOAD_FIELDS_LENGTH..=MAX_FILTERLOAD_MESSAGE_LENGTH).contains(&body_len) {
760 return Err(Error::Parse("Invalid filterload message body length."));
761 }
762
763 let filter_length: usize = body_len - FILTERLOAD_FIELDS_LENGTH;
765 let filter_bytes = zcash_deserialize_bytes_external_count(filter_length, &mut reader)?;
766
767 Ok(Message::FilterLoad {
768 filter: Filter(filter_bytes),
769 hash_functions_count: reader.read_u32::<LittleEndian>()?,
770 tweak: Tweak(reader.read_u32::<LittleEndian>()?),
771 flags: reader.read_u8()?,
772 })
773 }
774
775 fn read_filteradd<R: Read>(&self, mut reader: R, body_len: usize) -> Result<Message, Error> {
776 const MAX_FILTERADD_LENGTH: usize = 520;
777
778 let filter_length: usize = min(body_len, MAX_FILTERADD_LENGTH);
780 let filter_bytes = zcash_deserialize_bytes_external_count(filter_length, &mut reader)?;
781
782 Ok(Message::FilterAdd { data: filter_bytes })
783 }
784
785 fn read_filterclear<R: Read>(&self, mut _reader: R) -> Result<Message, Error> {
786 Ok(Message::FilterClear)
787 }
788
789 #[allow(clippy::unwrap_in_result)]
791 fn deserialize_transaction_spawning<R: Read + std::marker::Send>(
792 reader: R,
793 ) -> Result<Transaction, Error> {
794 let mut result = None;
795
796 tokio::task::block_in_place(|| {
805 rayon::in_place_scope_fifo(|s| {
806 s.spawn_fifo(|_s| result = Some(Transaction::zcash_deserialize(reader)))
807 })
808 });
809
810 result.expect("scope has already finished")
811 }
812
813 #[allow(clippy::unwrap_in_result)]
815 fn deserialize_block_spawning<R: Read + std::marker::Send>(reader: R) -> Result<Block, Error> {
816 let mut result = None;
817
818 tokio::task::block_in_place(|| {
827 rayon::in_place_scope_fifo(|s| {
828 s.spawn_fifo(|_s| result = Some(Block::zcash_deserialize(reader)))
829 })
830 });
831
832 result.expect("scope has already finished")
833 }
834}