Skip to main content

zebra_network/protocol/external/
codec.rs

1//! A Tokio codec mapping byte streams to Bitcoin message streams.
2
3use std::{
4    cmp::min,
5    fmt,
6    io::{Cursor, Read, Write},
7};
8
9use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
10use bytes::{BufMut, BytesMut};
11use chrono::{TimeZone, Utc};
12use tokio_util::codec::{Decoder, Encoder};
13
14use zebra_chain::{
15    block::{self, Block},
16    parameters::{Magic, Network},
17    serialization::{
18        sha256d, zcash_deserialize_bytes_external_count, zcash_deserialize_external_count,
19        zcash_deserialize_string_external_count, CompactSizeMessage, FakeWriter, ReadZcashExt,
20        SerializationError as Error, ZcashDeserialize, ZcashDeserializeInto, ZcashSerialize,
21        MAX_HEADERS_PER_MESSAGE, MAX_PROTOCOL_MESSAGE_LEN,
22    },
23    transaction::Transaction,
24};
25
26use crate::constants;
27
28use super::{
29    addr::{AddrInVersion, AddrV1, AddrV2},
30    message::{
31        Message, RejectReason, VersionMessage, MAX_REJECT_MESSAGE_LENGTH, MAX_REJECT_REASON_LENGTH,
32        MAX_USER_AGENT_LENGTH,
33    },
34    types::*,
35};
36
37#[cfg(test)]
38mod tests;
39
/// Size in bytes of every message header: a 4-byte network magic, a 12-byte
/// NUL-padded command, a 4-byte little-endian body length, and a 4-byte checksum.
const HEADER_LEN: usize = 4 + 12 + 4 + 4;

/// Largest body length accepted before the handshake has completed.
///
/// A `version` body is at most about 344 bytes (up to 256 of which are the user
/// agent), and `verack` has an empty body, so 1 KB leaves headroom for future
/// protocol changes while bounding how much body data is accepted pre-handshake.
const MAX_HANDSHAKE_BODY_LEN: usize = 1024;
48
49/// A codec which produces Bitcoin messages from byte streams and vice versa.
50pub struct Codec {
51    builder: Builder,
52    state: DecodeState,
53}
54
55/// A builder for specifying [`Codec`] options.
56pub struct Builder {
57    /// The network magic to use in encoding.
58    network: Network,
59    /// The protocol version to speak when encoding/decoding.
60    version: Version,
61    /// The maximum allowable message length.
62    max_len: usize,
63    /// An optional address label, to use for reporting metrics.
64    metrics_addr_label: Option<String>,
65}
66
67impl Codec {
68    /// Return a builder for constructing a [`Codec`].
69    pub fn builder() -> Builder {
70        Builder {
71            network: Network::Mainnet,
72            version: constants::CURRENT_NETWORK_PROTOCOL_VERSION,
73            max_len: MAX_HANDSHAKE_BODY_LEN,
74            metrics_addr_label: None,
75        }
76    }
77
78    /// Reconfigure the version used by the codec, e.g., after completing a handshake.
79    pub fn reconfigure_version(&mut self, version: Version) {
80        self.builder.version = version;
81    }
82
83    /// Raise the maximum accepted body length to the full protocol limit.
84    ///
85    /// Called after a successful handshake so post-handshake messages (blocks,
86    /// transactions) can use the full `MAX_PROTOCOL_MESSAGE_LEN`.
87    pub fn reconfigure_full_body_len(&mut self) {
88        self.builder.max_len = MAX_PROTOCOL_MESSAGE_LEN;
89    }
90}
91
92impl Builder {
93    /// Finalize the builder and return a [`Codec`].
94    pub fn finish(self) -> Codec {
95        Codec {
96            builder: self,
97            state: DecodeState::Head,
98        }
99    }
100
101    /// Configure the codec for the given [`Network`].
102    pub fn for_network(mut self, network: &Network) -> Self {
103        self.network = network.clone();
104        self
105    }
106
107    /// Configure the codec for the given [`Version`].
108    #[allow(dead_code)]
109    pub fn for_version(mut self, version: Version) -> Self {
110        self.version = version;
111        self
112    }
113
114    /// Configure the codec's maximum accepted payload size, in bytes.
115    #[allow(dead_code)]
116    pub fn with_max_body_len(mut self, len: usize) -> Self {
117        self.max_len = len;
118        self
119    }
120
121    /// Configure the codec with a label corresponding to the peer address.
122    pub fn with_metrics_addr_label(mut self, metrics_addr_label: String) -> Self {
123        self.metrics_addr_label = Some(metrics_addr_label);
124        self
125    }
126}
127
128// ======== Encoding =========
129
130impl Encoder<Message> for Codec {
131    type Error = Error;
132
133    fn encode(&mut self, item: Message, dst: &mut BytesMut) -> Result<(), Self::Error> {
134        use Error::Parse;
135
136        let body_length = self.body_length(&item);
137
138        if body_length > self.builder.max_len {
139            return Err(Parse("body length exceeded maximum size"));
140        }
141
142        if let Some(addr_label) = self.builder.metrics_addr_label.clone() {
143            metrics::counter!("zcash.net.out.bytes.total",
144                              "addr" => addr_label)
145            .increment((body_length + HEADER_LEN) as u64);
146        }
147
148        use Message::*;
149        // Note: because all match arms must have
150        // the same type, and the array length is
151        // part of the type, having at least one
152        // of length 12 checks that they are all
153        // of length 12, as they must be &[u8; 12].
154        let command = match item {
155            Version { .. } => b"version\0\0\0\0\0",
156            Verack => b"verack\0\0\0\0\0\0",
157            Ping { .. } => b"ping\0\0\0\0\0\0\0\0",
158            Pong { .. } => b"pong\0\0\0\0\0\0\0\0",
159            Reject { .. } => b"reject\0\0\0\0\0\0",
160            Addr { .. } => b"addr\0\0\0\0\0\0\0\0",
161            GetAddr => b"getaddr\0\0\0\0\0",
162            Block { .. } => b"block\0\0\0\0\0\0\0",
163            GetBlocks { .. } => b"getblocks\0\0\0",
164            Headers { .. } => b"headers\0\0\0\0\0",
165            GetHeaders { .. } => b"getheaders\0\0",
166            Inv { .. } => b"inv\0\0\0\0\0\0\0\0\0",
167            GetData { .. } => b"getdata\0\0\0\0\0",
168            NotFound { .. } => b"notfound\0\0\0\0",
169            Tx { .. } => b"tx\0\0\0\0\0\0\0\0\0\0",
170            Mempool => b"mempool\0\0\0\0\0",
171            FilterLoad { .. } => b"filterload\0\0",
172            FilterAdd { .. } => b"filteradd\0\0\0",
173            FilterClear => b"filterclear\0",
174        };
175        trace!(?item, len = body_length);
176
177        dst.reserve(HEADER_LEN + body_length);
178        let start_len = dst.len();
179        {
180            let dst = &mut dst.writer();
181            dst.write_all(&self.builder.network.magic().0[..])?;
182            dst.write_all(command)?;
183            dst.write_u32::<LittleEndian>(body_length as u32)?;
184
185            // We zero the checksum at first, and compute it later
186            // after the body has been written.
187            dst.write_u32::<LittleEndian>(0)?;
188
189            self.write_body(&item, dst)?;
190        }
191        let checksum = sha256d::Checksum::from(&dst[start_len + HEADER_LEN..]);
192        dst[start_len + 20..][..4].copy_from_slice(&checksum.0);
193
194        Ok(())
195    }
196}
197
198impl Codec {
199    /// Obtain the size of the body of a given message. This will match the
200    /// number of bytes written to the writer provided to `write_body` for the
201    /// same message.
202    // # Performance TODO
203    //
204    // If this code shows up in profiles, replace with a size estimate or cached size,
205    // to avoid multiple serializations for large data structures like lists, blocks, and transactions.
206    fn body_length(&self, msg: &Message) -> usize {
207        let mut writer = FakeWriter(0);
208
209        self.write_body(msg, &mut writer)
210            .expect("writer should never fail");
211        writer.0
212    }
213
214    /// Write the body of the message into the given writer. This allows writing
215    /// the message body prior to writing the header, so that the header can
216    /// contain a checksum of the message body.
217    fn write_body<W: Write>(&self, msg: &Message, mut writer: W) -> Result<(), Error> {
218        match msg {
219            Message::Version(VersionMessage {
220                version,
221                services,
222                timestamp,
223                address_recv,
224                address_from,
225                nonce,
226                user_agent,
227                start_height,
228                relay,
229            }) => {
230                writer.write_u32::<LittleEndian>(version.0)?;
231                writer.write_u64::<LittleEndian>(services.bits())?;
232                // # Security
233                // DateTime<Utc>::timestamp has a smaller range than i64, so
234                // serialization can not error.
235                writer.write_i64::<LittleEndian>(timestamp.timestamp())?;
236
237                address_recv.zcash_serialize(&mut writer)?;
238                address_from.zcash_serialize(&mut writer)?;
239
240                writer.write_u64::<LittleEndian>(nonce.0)?;
241
242                if user_agent.len() > MAX_USER_AGENT_LENGTH {
243                    // zcashd won't accept this version message
244                    return Err(Error::Parse(
245                        "user agent too long: must be 256 bytes or less",
246                    ));
247                }
248
249                user_agent.zcash_serialize(&mut writer)?;
250                writer.write_u32::<LittleEndian>(start_height.0)?;
251                writer.write_u8(*relay as u8)?;
252            }
253            Message::Verack => { /* Empty payload -- no-op */ }
254            Message::Ping(nonce) => {
255                writer.write_u64::<LittleEndian>(nonce.0)?;
256            }
257            Message::Pong(nonce) => {
258                writer.write_u64::<LittleEndian>(nonce.0)?;
259            }
260            Message::Reject {
261                message,
262                ccode,
263                reason,
264                data,
265            } => {
266                if message.len() > MAX_REJECT_MESSAGE_LENGTH {
267                    // zcashd won't accept this reject message
268                    return Err(Error::Parse(
269                        "reject message too long: must be 12 bytes or less",
270                    ));
271                }
272
273                message.zcash_serialize(&mut writer)?;
274
275                writer.write_u8(*ccode as u8)?;
276
277                if reason.len() > MAX_REJECT_REASON_LENGTH {
278                    return Err(Error::Parse(
279                        "reject reason too long: must be 111 bytes or less",
280                    ));
281                }
282
283                reason.zcash_serialize(&mut writer)?;
284                if let Some(data) = data {
285                    writer.write_all(data)?;
286                }
287            }
288            Message::Addr(addrs) => {
289                assert!(
290                    addrs.len() <= constants::MAX_ADDRS_IN_MESSAGE,
291                    "unexpectedly large Addr message: greater than MAX_ADDRS_IN_MESSAGE addresses"
292                );
293
294                // Regardless of the way we received the address,
295                // Zebra always sends `addr` messages
296                let v1_addrs: Vec<AddrV1> = addrs.iter().map(|addr| AddrV1::from(*addr)).collect();
297                v1_addrs.zcash_serialize(&mut writer)?
298            }
299            Message::GetAddr => { /* Empty payload -- no-op */ }
300            Message::Block(block) => block.zcash_serialize(&mut writer)?,
301            Message::GetBlocks { known_blocks, stop } => {
302                writer.write_u32::<LittleEndian>(self.builder.version.0)?;
303                known_blocks.zcash_serialize(&mut writer)?;
304                stop.unwrap_or(block::Hash([0; 32]))
305                    .zcash_serialize(&mut writer)?;
306            }
307            Message::GetHeaders { known_blocks, stop } => {
308                writer.write_u32::<LittleEndian>(self.builder.version.0)?;
309                known_blocks.zcash_serialize(&mut writer)?;
310                stop.unwrap_or(block::Hash([0; 32]))
311                    .zcash_serialize(&mut writer)?;
312            }
313            Message::Headers(headers) => headers.zcash_serialize(&mut writer)?,
314            Message::Inv(hashes) => hashes.zcash_serialize(&mut writer)?,
315            Message::GetData(hashes) => hashes.zcash_serialize(&mut writer)?,
316            Message::NotFound(hashes) => hashes.zcash_serialize(&mut writer)?,
317            Message::Tx(transaction) => transaction.transaction.zcash_serialize(&mut writer)?,
318            Message::Mempool => { /* Empty payload -- no-op */ }
319            Message::FilterLoad {
320                filter,
321                hash_functions_count,
322                tweak,
323                flags,
324            } => {
325                writer.write_all(&filter.0)?;
326                writer.write_u32::<LittleEndian>(*hash_functions_count)?;
327                writer.write_u32::<LittleEndian>(tweak.0)?;
328                writer.write_u8(*flags)?;
329            }
330            Message::FilterAdd { data } => {
331                writer.write_all(data)?;
332            }
333            Message::FilterClear => { /* Empty payload -- no-op */ }
334        }
335        Ok(())
336    }
337}
338
339// ======== Decoding =========
340
341enum DecodeState {
342    Head,
343    Body {
344        body_len: usize,
345        command: [u8; 12],
346        checksum: sha256d::Checksum,
347    },
348}
349
350impl fmt::Debug for DecodeState {
351    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
352        match self {
353            DecodeState::Head => write!(f, "DecodeState::Head"),
354            DecodeState::Body {
355                body_len,
356                command,
357                checksum,
358            } => f
359                .debug_struct("DecodeState::Body")
360                .field("body_len", &body_len)
361                .field("command", &String::from_utf8_lossy(command))
362                .field("checksum", &checksum)
363                .finish(),
364        }
365    }
366}
367
368impl Decoder for Codec {
369    type Item = Message;
370    type Error = Error;
371
372    #[allow(clippy::unwrap_in_result)]
373    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
374        use Error::Parse;
375        match self.state {
376            DecodeState::Head => {
377                // First check that the src buffer contains an entire header.
378                if src.len() < HEADER_LEN {
379                    trace!(?self.state, "src buffer does not have an entire header, waiting");
380                    // Signal that decoding requires more data.
381                    return Ok(None);
382                }
383
384                // Now that we know that src contains a header, split off the header section.
385                let header = src.split_to(HEADER_LEN);
386
387                // Create a cursor over the header and parse its fields.
388                let mut header_reader = Cursor::new(&header);
389                let magic = Magic(header_reader.read_4_bytes()?);
390                let command = header_reader.read_12_bytes()?;
391                let body_len = header_reader.read_u32::<LittleEndian>()? as usize;
392                let checksum = sha256d::Checksum(header_reader.read_4_bytes()?);
393                trace!(
394                    ?self.state,
395                    ?magic,
396                    command = %String::from_utf8(
397                        command.iter()
398                            .cloned()
399                            .flat_map(std::ascii::escape_default)
400                            .collect()
401                    ).unwrap(),
402                    body_len,
403                    ?checksum,
404                    "read header from src buffer"
405                );
406
407                if magic != self.builder.network.magic() {
408                    return Err(Parse("supplied magic did not meet expectations"));
409                }
410                if body_len > self.builder.max_len {
411                    return Err(Parse("body length exceeded maximum size"));
412                }
413
414                if let Some(label) = self.builder.metrics_addr_label.clone() {
415                    metrics::counter!("zcash.net.in.bytes.total", "addr" =>  label)
416                        .increment((body_len + HEADER_LEN) as u64);
417                }
418
419                // Reserve buffer space for the expected body and the following header.
420                src.reserve(body_len + HEADER_LEN);
421
422                self.state = DecodeState::Body {
423                    body_len,
424                    command,
425                    checksum,
426                };
427
428                // Now that the state is updated, recurse to attempt body decoding.
429                self.decode(src)
430            }
431            DecodeState::Body {
432                body_len,
433                command,
434                checksum,
435            } => {
436                if src.len() < body_len {
437                    // Need to wait for the full body
438                    trace!(?self.state, len = src.len(), "src buffer does not have an entire body, waiting");
439                    return Ok(None);
440                }
441
442                // Now that we know we have the full body, split off the body,
443                // and reset the decoder state for the next message. Otherwise
444                // we will attempt to read the next header as the current body.
445                let body = src.split_to(body_len);
446                self.state = DecodeState::Head;
447
448                if checksum != sha256d::Checksum::from(&body[..]) {
449                    return Err(Parse(
450                        "supplied message checksum does not match computed checksum",
451                    ));
452                }
453
454                let mut body_reader = Cursor::new(&body);
455                match &command {
456                    b"version\0\0\0\0\0" => self.read_version(&mut body_reader),
457                    b"verack\0\0\0\0\0\0" => self.read_verack(&mut body_reader),
458                    b"ping\0\0\0\0\0\0\0\0" => self.read_ping(&mut body_reader),
459                    b"pong\0\0\0\0\0\0\0\0" => self.read_pong(&mut body_reader),
460                    b"reject\0\0\0\0\0\0" => self.read_reject(&mut body_reader),
461                    b"addr\0\0\0\0\0\0\0\0" => self.read_addr(&mut body_reader),
462                    b"addrv2\0\0\0\0\0\0" => self.read_addrv2(&mut body_reader),
463                    b"getaddr\0\0\0\0\0" => self.read_getaddr(&mut body_reader),
464                    b"block\0\0\0\0\0\0\0" => self.read_block(&mut body_reader),
465                    b"getblocks\0\0\0" => self.read_getblocks(&mut body_reader),
466                    b"headers\0\0\0\0\0" => self.read_headers(&mut body_reader),
467                    b"getheaders\0\0" => self.read_getheaders(&mut body_reader),
468                    b"inv\0\0\0\0\0\0\0\0\0" => self.read_inv(&mut body_reader),
469                    b"getdata\0\0\0\0\0" => self.read_getdata(&mut body_reader),
470                    b"notfound\0\0\0\0" => self.read_notfound(&mut body_reader),
471                    b"tx\0\0\0\0\0\0\0\0\0\0" => self.read_tx(&mut body_reader),
472                    b"mempool\0\0\0\0\0" => self.read_mempool(&mut body_reader),
473                    b"filterload\0\0" => self.read_filterload(&mut body_reader, body_len),
474                    b"filteradd\0\0\0" => self.read_filteradd(&mut body_reader, body_len),
475                    b"filterclear\0" => self.read_filterclear(&mut body_reader),
476                    _ => {
477                        let command_string = String::from_utf8_lossy(&command);
478
479                        // # Security
480                        //
481                        // Zcash connections are not authenticated, so malicious nodes can
482                        // send fake messages, with connected peers' IP addresses in the IP header.
483                        //
484                        // Since we can't verify their source, Zebra needs to ignore unexpected messages,
485                        // because closing the connection could cause a denial of service or eclipse attack.
486                        debug!(?command, %command_string, "unknown message command from peer");
487                        return Ok(None);
488                    }
489                }
490                // We need Ok(Some(msg)) to signal that we're done decoding.
491                // This is also convenient for tracing the parse result.
492                .map(|msg| {
493                    // bitcoin allows extra data at the end of most messages,
494                    // so that old nodes can still read newer message formats,
495                    // and ignore any extra fields
496                    let extra_bytes = body.len() as u64 - body_reader.position();
497                    if extra_bytes == 0 {
498                        trace!(?extra_bytes, %msg, "finished message decoding");
499                    } else {
500                        // log when there are extra bytes, so we know when we need to
501                        // upgrade message formats
502                        debug!(?extra_bytes, %msg, "extra data after decoding message");
503                    }
504                    Some(msg)
505                })
506            }
507        }
508    }
509}
510
511impl Codec {
512    /// Deserializes a version message.
513    ///
514    /// The `relay` field is optional, as defined in <https://developer.bitcoin.org/reference/p2p_networking.html#version>
515    ///
516    /// Note: zcashd only requires fields up to `address_recv`, but everything up to `relay` is required in Zebra.
517    ///       see <https://github.com/zcash/zcash/blob/11d563904933e889a11d9685c3b249f1536cfbe7/src/main.cpp#L6490-L6507>
518    fn read_version<R: Read>(&self, mut reader: R) -> Result<Message, Error> {
519        Ok(VersionMessage {
520            version: Version(reader.read_u32::<LittleEndian>()?),
521            // Use from_bits_truncate to discard unknown service bits.
522            services: PeerServices::from_bits_truncate(reader.read_u64::<LittleEndian>()?),
523            timestamp: Utc
524                .timestamp_opt(reader.read_i64::<LittleEndian>()?, 0)
525                .single()
526                .ok_or(Error::Parse(
527                    "version timestamp is out of range for DateTime",
528                ))?,
529            address_recv: AddrInVersion::zcash_deserialize(&mut reader)?,
530            address_from: AddrInVersion::zcash_deserialize(&mut reader)?,
531            nonce: Nonce(reader.read_u64::<LittleEndian>()?),
532            user_agent: {
533                let byte_count: CompactSizeMessage = (&mut reader).zcash_deserialize_into()?;
534                let byte_count: usize = byte_count.into();
535
536                // # Security
537                //
538                // Limit peer set memory usage, Zebra stores an `Arc<VersionMessage>` per
539                // connected peer.
540                //
541                // Without this check, we can use `200 peers * 2 MB message size limit = 400 MB`.
542                if byte_count > MAX_USER_AGENT_LENGTH {
543                    return Err(Error::Parse(
544                        "user agent too long: must be 256 bytes or less",
545                    ));
546                }
547
548                zcash_deserialize_string_external_count(byte_count, &mut reader)?
549            },
550            start_height: block::Height(reader.read_u32::<LittleEndian>()?),
551            relay: match reader.read_u8() {
552                Ok(val @ 0..=1) => val == 1,
553                Ok(_) => return Err(Error::Parse("non-bool value supplied in relay field")),
554                Err(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => true,
555                Err(err) => Err(err)?,
556            },
557        }
558        .into())
559    }
560
561    fn read_verack<R: Read>(&self, mut _reader: R) -> Result<Message, Error> {
562        Ok(Message::Verack)
563    }
564
565    fn read_ping<R: Read>(&self, mut reader: R) -> Result<Message, Error> {
566        Ok(Message::Ping(Nonce(reader.read_u64::<LittleEndian>()?)))
567    }
568
569    fn read_pong<R: Read>(&self, mut reader: R) -> Result<Message, Error> {
570        Ok(Message::Pong(Nonce(reader.read_u64::<LittleEndian>()?)))
571    }
572
573    fn read_reject<R: Read>(&self, mut reader: R) -> Result<Message, Error> {
574        Ok(Message::Reject {
575            message: {
576                let byte_count: CompactSizeMessage = (&mut reader).zcash_deserialize_into()?;
577                let byte_count: usize = byte_count.into();
578
579                // # Security
580                //
581                // Limit log size on disk, Zebra might print large reject messages to disk.
582                if byte_count > MAX_REJECT_MESSAGE_LENGTH {
583                    return Err(Error::Parse(
584                        "reject message too long: must be 12 bytes or less",
585                    ));
586                }
587
588                zcash_deserialize_string_external_count(byte_count, &mut reader)?
589            },
590            ccode: match reader.read_u8()? {
591                0x01 => RejectReason::Malformed,
592                0x10 => RejectReason::Invalid,
593                0x11 => RejectReason::Obsolete,
594                0x12 => RejectReason::Duplicate,
595                0x40 => RejectReason::Nonstandard,
596                0x41 => RejectReason::Dust,
597                0x42 => RejectReason::InsufficientFee,
598                0x43 => RejectReason::Checkpoint,
599                0x50 => RejectReason::Other,
600                _ => return Err(Error::Parse("invalid RejectReason value in ccode field")),
601            },
602            reason: {
603                let byte_count: CompactSizeMessage = (&mut reader).zcash_deserialize_into()?;
604                let byte_count: usize = byte_count.into();
605
606                // # Security
607                //
608                // Limit log size on disk, Zebra might print large reject messages to disk.
609                if byte_count > MAX_REJECT_REASON_LENGTH {
610                    return Err(Error::Parse(
611                        "reject reason too long: must be 111 bytes or less",
612                    ));
613                }
614
615                zcash_deserialize_string_external_count(byte_count, &mut reader)?
616            },
617            // Sometimes there's data, sometimes there isn't. There's no length
618            // field, this is just implicitly encoded by the body_len.
619            // Apparently all existing implementations only supply 32 bytes of
620            // data (hash identifying the rejected object) or none (and we model
621            // the Reject message that way), so instead of passing in the
622            // body_len separately and calculating remaining bytes, just try to
623            // read 32 bytes and ignore any failures. (The caller will log and
624            // ignore any trailing bytes.)
625            data: reader.read_32_bytes().ok(),
626        })
627    }
628
629    /// Deserialize an `addr` (v1) message into a list of `MetaAddr`s.
630    pub(super) fn read_addr<R: Read>(&self, reader: R) -> Result<Message, Error> {
631        let addrs: Vec<AddrV1> = reader.zcash_deserialize_into()?;
632
633        if addrs.len() > constants::MAX_ADDRS_IN_MESSAGE {
634            return Err(Error::Parse(
635                "more than MAX_ADDRS_IN_MESSAGE in addr message",
636            ));
637        }
638
639        // Convert the received address format to Zebra's internal `MetaAddr`.
640        let addrs = addrs.into_iter().map(Into::into).collect();
641        Ok(Message::Addr(addrs))
642    }
643
644    /// Deserialize an `addrv2` message into a list of `MetaAddr`s.
645    ///
646    /// Currently, Zebra parses received `addrv2`s, ignoring some address types.
647    /// Zebra never sends `addrv2` messages.
648    pub(super) fn read_addrv2<R: Read>(&self, reader: R) -> Result<Message, Error> {
649        let addrs: Vec<AddrV2> = reader.zcash_deserialize_into()?;
650
651        if addrs.len() > constants::MAX_ADDRS_IN_MESSAGE {
652            return Err(Error::Parse(
653                "more than MAX_ADDRS_IN_MESSAGE in addrv2 message",
654            ));
655        }
656
657        // Convert the received address format to Zebra's internal `MetaAddr`,
658        // ignoring unsupported network IDs.
659        let addrs = addrs
660            .into_iter()
661            .filter_map(|addr| addr.try_into().ok())
662            .collect();
663        Ok(Message::Addr(addrs))
664    }
665
666    fn read_getaddr<R: Read>(&self, mut _reader: R) -> Result<Message, Error> {
667        Ok(Message::GetAddr)
668    }
669
670    fn read_block<R: Read + std::marker::Send>(&self, reader: R) -> Result<Message, Error> {
671        let result = Self::deserialize_block_spawning(reader);
672        Ok(Message::Block(result?.into()))
673    }
674
675    fn read_getblocks<R: Read>(&self, mut reader: R) -> Result<Message, Error> {
676        if self.builder.version == Version(reader.read_u32::<LittleEndian>()?) {
677            let known_blocks = Vec::zcash_deserialize(&mut reader)?;
678            let stop_hash = block::Hash::zcash_deserialize(&mut reader)?;
679            let stop = if stop_hash != block::Hash([0; 32]) {
680                Some(stop_hash)
681            } else {
682                None
683            };
684            Ok(Message::GetBlocks { known_blocks, stop })
685        } else {
686            Err(Error::Parse("getblocks version did not match negotiation"))
687        }
688    }
689
690    /// Deserialize a `headers` message.
691    ///
692    /// See [Zcash block header] for the enumeration of these fields.
693    ///
694    /// [Zcash block header](https://zips.z.cash/protocol/protocol.pdf#page=84)
695    fn read_headers<R: Read>(&self, mut reader: R) -> Result<Message, Error> {
696        // CompactSizeMessage is bounded to MAX_PROTOCOL_MESSAGE_LEN on deserialization.
697        let count: CompactSizeMessage = (&mut reader).zcash_deserialize_into()?;
698        // Infallible: CompactSizeMessage wraps u32, which always fits in usize.
699        let count: usize = count.into();
700        if count > MAX_HEADERS_PER_MESSAGE {
701            return Err(Error::Parse(
702                "headers message exceeds the protocol limit of 160 entries",
703            ));
704        }
705        Ok(Message::Headers(zcash_deserialize_external_count(
706            count,
707            &mut reader,
708        )?))
709    }
710
711    fn read_getheaders<R: Read>(&self, mut reader: R) -> Result<Message, Error> {
712        if self.builder.version == Version(reader.read_u32::<LittleEndian>()?) {
713            let known_blocks = Vec::zcash_deserialize(&mut reader)?;
714            let stop_hash = block::Hash::zcash_deserialize(&mut reader)?;
715            let stop = if stop_hash != block::Hash([0; 32]) {
716                Some(stop_hash)
717            } else {
718                None
719            };
720            Ok(Message::GetHeaders { known_blocks, stop })
721        } else {
722            Err(Error::Parse("getblocks version did not match negotiation"))
723        }
724    }
725
726    fn read_inv<R: Read>(&self, reader: R) -> Result<Message, Error> {
727        Ok(Message::Inv(Vec::zcash_deserialize(reader)?))
728    }
729
730    fn read_getdata<R: Read>(&self, reader: R) -> Result<Message, Error> {
731        Ok(Message::GetData(Vec::zcash_deserialize(reader)?))
732    }
733
734    fn read_notfound<R: Read>(&self, reader: R) -> Result<Message, Error> {
735        Ok(Message::NotFound(Vec::zcash_deserialize(reader)?))
736    }
737
738    fn read_tx<R: Read + std::marker::Send>(&self, reader: R) -> Result<Message, Error> {
739        let result = Self::deserialize_transaction_spawning(reader);
740        Ok(Message::Tx(result?.into()))
741    }
742
743    fn read_mempool<R: Read>(&self, mut _reader: R) -> Result<Message, Error> {
744        Ok(Message::Mempool)
745    }
746
747    fn read_filterload<R: Read>(&self, mut reader: R, body_len: usize) -> Result<Message, Error> {
748        // The maximum length of a filter.
749        const MAX_FILTERLOAD_FILTER_LENGTH: usize = 36000;
750
751        // The data length of the fields:
752        // hash_functions_count + tweak + flags.
753        const FILTERLOAD_FIELDS_LENGTH: usize = 4 + 4 + 1;
754
755        // The maximum length of a filter message's data.
756        const MAX_FILTERLOAD_MESSAGE_LENGTH: usize =
757            MAX_FILTERLOAD_FILTER_LENGTH + FILTERLOAD_FIELDS_LENGTH;
758
759        if !(FILTERLOAD_FIELDS_LENGTH..=MAX_FILTERLOAD_MESSAGE_LENGTH).contains(&body_len) {
760            return Err(Error::Parse("Invalid filterload message body length."));
761        }
762
763        // Memory Denial of Service: we just checked the untrusted parsed length
764        let filter_length: usize = body_len - FILTERLOAD_FIELDS_LENGTH;
765        let filter_bytes = zcash_deserialize_bytes_external_count(filter_length, &mut reader)?;
766
767        Ok(Message::FilterLoad {
768            filter: Filter(filter_bytes),
769            hash_functions_count: reader.read_u32::<LittleEndian>()?,
770            tweak: Tweak(reader.read_u32::<LittleEndian>()?),
771            flags: reader.read_u8()?,
772        })
773    }
774
775    fn read_filteradd<R: Read>(&self, mut reader: R, body_len: usize) -> Result<Message, Error> {
776        const MAX_FILTERADD_LENGTH: usize = 520;
777
778        // Memory Denial of Service: limit the untrusted parsed length
779        let filter_length: usize = min(body_len, MAX_FILTERADD_LENGTH);
780        let filter_bytes = zcash_deserialize_bytes_external_count(filter_length, &mut reader)?;
781
782        Ok(Message::FilterAdd { data: filter_bytes })
783    }
784
785    fn read_filterclear<R: Read>(&self, mut _reader: R) -> Result<Message, Error> {
786        Ok(Message::FilterClear)
787    }
788
789    /// Given the reader, deserialize the transaction in the rayon thread pool.
790    #[allow(clippy::unwrap_in_result)]
791    fn deserialize_transaction_spawning<R: Read + std::marker::Send>(
792        reader: R,
793    ) -> Result<Transaction, Error> {
794        let mut result = None;
795
796        // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
797        //
798        // Since we use `block_in_place()`, other futures running on the connection task will be blocked:
799        // https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html
800        //
801        // We can't use `spawn_blocking()` because:
802        // - The `reader` has a lifetime (but we could replace it with a `Vec` of message data)
803        // - There is no way to check the blocking task's future for panics
804        tokio::task::block_in_place(|| {
805            rayon::in_place_scope_fifo(|s| {
806                s.spawn_fifo(|_s| result = Some(Transaction::zcash_deserialize(reader)))
807            })
808        });
809
810        result.expect("scope has already finished")
811    }
812
813    /// Given the reader, deserialize the block in the rayon thread pool.
814    #[allow(clippy::unwrap_in_result)]
815    fn deserialize_block_spawning<R: Read + std::marker::Send>(reader: R) -> Result<Block, Error> {
816        let mut result = None;
817
818        // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
819        //
820        // Since we use `block_in_place()`, other futures running on the connection task will be blocked:
821        // https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html
822        //
823        // We can't use `spawn_blocking()` because:
824        // - The `reader` has a lifetime (but we could replace it with a `Vec` of message data)
825        // - There is no way to check the blocking task's future for panics
826        tokio::task::block_in_place(|| {
827            rayon::in_place_scope_fifo(|s| {
828                s.spawn_fifo(|_s| result = Some(Block::zcash_deserialize(reader)))
829            })
830        });
831
832        result.expect("scope has already finished")
833    }
834}