// zebra_network/src/peer/connection.rs
1//! Zebra's per-peer connection state machine.
2//!
3//! Maps the external Zcash/Bitcoin protocol to Zebra's internal request/response
4//! protocol.
5//!
6//! This module contains a lot of undocumented state, assumptions and invariants.
7//! And it's unclear if these assumptions match the `zcashd` implementation.
8//! It should be refactored into a cleaner set of request/response pairs (#1515).
9
10use std::{borrow::Cow, collections::HashSet, fmt, pin::Pin, sync::Arc, time::Instant};
11
12use futures::{future::Either, prelude::*};
13use rand::{seq::SliceRandom, thread_rng, Rng};
14use tokio::time::{sleep, Sleep};
15use tower::{Service, ServiceExt};
16use tracing_futures::Instrument;
17
18use zebra_chain::{
19    block::{self, Block},
20    serialization::SerializationError,
21    transaction::{UnminedTx, UnminedTxId},
22};
23
24use crate::{
25    constants::{
26        self, MAX_ADDRS_IN_MESSAGE, MAX_OVERLOAD_DROP_PROBABILITY, MIN_OVERLOAD_DROP_PROBABILITY,
27        OVERLOAD_PROTECTION_INTERVAL, PEER_ADDR_RESPONSE_LIMIT,
28    },
29    meta_addr::MetaAddr,
30    peer::{
31        connection::peer_tx::PeerTx, error::AlreadyErrored, ClientRequest, ClientRequestReceiver,
32        ConnectionInfo, ErrorSlot, InProgressClientRequest, MustUseClientResponseSender, PeerError,
33        SharedPeerError,
34    },
35    peer_set::ConnectionTracker,
36    protocol::{
37        external::{types::Nonce, InventoryHash, Message},
38        internal::{InventoryResponse, Request, Response},
39    },
40    BoxError, PeerSocketAddr, MAX_TX_INV_IN_SENT_MESSAGE,
41};
42
43use InventoryResponse::*;
44
45mod peer_tx;
46
47#[cfg(test)]
48mod tests;
49
/// The handler for the next expected peer response to an in-flight Zebra request.
///
/// Each variant holds the state needed to interpret incoming peer [`Message`]s
/// as the response to one internal [`Request`]; see `process_message` for the
/// (handler, message) match arms that drive it.
#[derive(Debug)]
pub(super) enum Handler {
    /// Indicates that the handler has finished processing the request.
    /// An error here is scoped to the request.
    Finished(Result<Response, PeerError>),
    /// Waiting for a `pong` whose nonce matches `nonce`.
    Ping {
        // The nonce sent in our `ping` message.
        nonce: Nonce,
        // When the `ping` was sent; used to compute the round-trip time
        // returned in `Response::Pong`.
        ping_sent_at: Instant,
    },
    /// Waiting for a peer `addr` message to answer a peers request.
    Peers,
    /// Waiting for an `inv` of block hashes.
    FindBlocks,
    /// Waiting for a `headers` message.
    FindHeaders,
    /// Waiting for `block` messages for each requested hash.
    BlocksByHash {
        // Hashes we asked for that have not arrived yet.
        pending_hashes: HashSet<block::Hash>,
        // Blocks received so far for this request.
        blocks: Vec<Arc<Block>>,
    },
    /// Waiting for `tx` messages for each requested transaction ID.
    TransactionsById {
        // Transaction IDs we asked for that have not arrived yet.
        pending_ids: HashSet<UnminedTxId>,
        // Transactions received so far for this request.
        transactions: Vec<UnminedTx>,
    },
    /// Waiting for an `inv` of transaction IDs to answer a mempool request.
    MempoolTransactionIds,
}
72
73impl fmt::Display for Handler {
74    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
75        f.write_str(&match self {
76            Handler::Finished(Ok(response)) => format!("Finished({response})"),
77            Handler::Finished(Err(error)) => format!("Finished({error})"),
78
79            Handler::Ping { .. } => "Ping".to_string(),
80            Handler::Peers => "Peers".to_string(),
81
82            Handler::FindBlocks => "FindBlocks".to_string(),
83            Handler::FindHeaders => "FindHeaders".to_string(),
84            Handler::BlocksByHash {
85                pending_hashes,
86                blocks,
87            } => format!(
88                "BlocksByHash {{ pending_hashes: {}, blocks: {} }}",
89                pending_hashes.len(),
90                blocks.len()
91            ),
92
93            Handler::TransactionsById {
94                pending_ids,
95                transactions,
96            } => format!(
97                "TransactionsById {{ pending_ids: {}, transactions: {} }}",
98                pending_ids.len(),
99                transactions.len()
100            ),
101            Handler::MempoolTransactionIds => "MempoolTransactionIds".to_string(),
102        })
103    }
104}
105
106impl Handler {
107    /// Returns the Zebra internal handler type as a string.
108    pub fn command(&self) -> Cow<'static, str> {
109        match self {
110            Handler::Finished(Ok(response)) => format!("Finished({})", response.command()).into(),
111            Handler::Finished(Err(error)) => format!("Finished({})", error.kind()).into(),
112
113            Handler::Ping { .. } => "Ping".into(),
114            Handler::Peers => "Peers".into(),
115
116            Handler::FindBlocks => "FindBlocks".into(),
117            Handler::FindHeaders => "FindHeaders".into(),
118
119            Handler::BlocksByHash { .. } => "BlocksByHash".into(),
120            Handler::TransactionsById { .. } => "TransactionsById".into(),
121
122            Handler::MempoolTransactionIds => "MempoolTransactionIds".into(),
123        }
124    }
125
    /// Try to handle `msg` as a response to a client request, possibly consuming
    /// it in the process.
    ///
    /// This function is where we statefully interpret Bitcoin/Zcash messages
    /// into responses to messages in the internal request/response protocol.
    /// This conversion is done by a sequence of (request, message) match arms,
    /// each of which contains the conversion logic for that pair.
    ///
    /// Taking ownership of the message means that we can pass ownership of its
    /// contents to responses without additional copies.  If the message is not
    /// interpretable as a response, we return ownership to the caller.
    ///
    /// Unexpected messages are left unprocessed, and may be rejected later.
    ///
    /// `addr` responses are limited to avoid peer set takeover. Any excess
    /// addresses are stored in `cached_addrs`.
    ///
    /// Returns `Some(msg)` if the message was not interpreted as a response to
    /// the current request, and `None` if it was consumed by this handler.
    fn process_message(
        &mut self,
        msg: Message,
        cached_addrs: &mut Vec<MetaAddr>,
        transient_addr: Option<PeerSocketAddr>,
    ) -> Option<Message> {
        let mut ignored_msg = None;
        // Move the current handler state out of `self`, leaving a cheap placeholder,
        // so the match below can take ownership of it and produce the next state.
        // TODO: can this be avoided?
        let tmp_state = std::mem::replace(self, Handler::Finished(Ok(Response::Nil)));

        debug!(handler = %tmp_state, %msg, "received peer response to Zebra request");

        // Each arm pairs the current handler with a message kind; unmatched pairs
        // fall through to the default arm, which returns the message to the caller.
        *self = match (tmp_state, msg) {
            (
                Handler::Ping {
                    nonce,
                    ping_sent_at,
                },
                Message::Pong(rsp_nonce),
            ) => {
                if nonce == rsp_nonce {
                    let duration = ping_sent_at.elapsed();
                    Handler::Finished(Ok(Response::Pong(duration)))
                } else {
                    // A pong with the wrong nonce: keep waiting for the matching pong.
                    Handler::Ping {
                        nonce,
                        ping_sent_at,
                    }
                }
            }

            (Handler::Peers, Message::Addr(new_addrs)) => {
                // Security: This method performs security-sensitive operations, see its comments
                // for details.
                let response_addrs =
                    Handler::update_addr_cache(cached_addrs, &new_addrs, PEER_ADDR_RESPONSE_LIMIT);

                debug!(
                    new_addrs = new_addrs.len(),
                    response_addrs = response_addrs.len(),
                    remaining_addrs = cached_addrs.len(),
                    PEER_ADDR_RESPONSE_LIMIT,
                    "responding to Peers request using new and cached addresses",
                );

                Handler::Finished(Ok(Response::Peers(response_addrs)))
            }

            // `zcashd` returns requested transactions in a single batch of messages.
            // Other transaction or non-transaction messages can come before or after the batch.
            // After the transaction batch, `zcashd` sends `notfound` if any transactions are missing:
            // https://github.com/zcash/zcash/blob/e7b425298f6d9a54810cb7183f00be547e4d9415/src/main.cpp#L5617
            (
                Handler::TransactionsById {
                    mut pending_ids,
                    mut transactions,
                },
                Message::Tx(transaction),
            ) => {
                // assumptions:
                //   - the transaction messages are sent in a single continuous batch
                //   - missing transactions are silently skipped
                //     (there is no `notfound` message at the end of the batch)
                if pending_ids.remove(&transaction.id) {
                    // we are in the middle of the continuous transaction messages
                    transactions.push(transaction);
                } else {
                    // We got a transaction we didn't ask for. If the peer doesn't know any of the
                    // transactions, they should have sent a `notfound` with all the hashes, rather
                    // than an unsolicited transaction.
                    //
                    // So either:
                    // 1. The peer implements the protocol badly, skipping `notfound`.
                    //    We should cancel the request, so we don't hang waiting for transactions
                    //    that will never arrive.
                    // 2. The peer sent an unsolicited transaction.
                    //    We should ignore the transaction, and wait for the actual response.
                    //
                    // We end the request, so we don't hang on bad peers (case 1). But we keep the
                    // connection open, so the inbound service can process transactions from good
                    // peers (case 2).
                    ignored_msg = Some(Message::Tx(transaction));
                }

                if ignored_msg.is_some() && transactions.is_empty() {
                    // If we didn't get anything we wanted, retry the request.
                    let missing_transaction_ids = pending_ids.into_iter().map(Into::into).collect();
                    Handler::Finished(Err(PeerError::NotFoundResponse(missing_transaction_ids)))
                } else if pending_ids.is_empty() || ignored_msg.is_some() {
                    // If we got some of what we wanted, let the internal client know.
                    let available = transactions
                        .into_iter()
                        .map(|t| InventoryResponse::Available((t, transient_addr)));
                    let missing = pending_ids.into_iter().map(InventoryResponse::Missing);

                    Handler::Finished(Ok(Response::Transactions(
                        available.chain(missing).collect(),
                    )))
                } else {
                    // Keep on waiting for more.
                    Handler::TransactionsById {
                        pending_ids,
                        transactions,
                    }
                }
            }
            // `zcashd` peers actually return this response
            (
                Handler::TransactionsById {
                    pending_ids,
                    transactions,
                },
                Message::NotFound(missing_invs),
            ) => {
                // assumptions:
                //   - the peer eventually returns a transaction or a `notfound` entry
                //     for each hash
                //   - all `notfound` entries are contained in a single message
                //   - the `notfound` message comes after the transaction messages
                //
                // If we're in sync with the peer, then the `notfound` should contain the remaining
                // hashes from the handler. If we're not in sync with the peer, we should return
                // what we got so far.
                let missing_transaction_ids: HashSet<_> = transaction_ids(&missing_invs).collect();
                if missing_transaction_ids != pending_ids {
                    trace!(?missing_invs, ?missing_transaction_ids, ?pending_ids);
                    // if these errors are noisy, we should replace them with debugs
                    debug!("unexpected notfound message from peer: all remaining transaction hashes should be listed in the notfound. Using partial received transactions as the peer response");
                }
                if missing_transaction_ids.len() != missing_invs.len() {
                    trace!(?missing_invs, ?missing_transaction_ids, ?pending_ids);
                    debug!("unexpected notfound message from peer: notfound contains duplicate hashes or non-transaction hashes. Using partial received transactions as the peer response");
                }

                if transactions.is_empty() {
                    // If we didn't get anything we wanted, retry the request.
                    let missing_transaction_ids = pending_ids.into_iter().map(Into::into).collect();
                    Handler::Finished(Err(PeerError::NotFoundResponse(missing_transaction_ids)))
                } else {
                    // If we got some of what we wanted, let the internal client know.
                    let available = transactions
                        .into_iter()
                        .map(|t| InventoryResponse::Available((t, transient_addr)));
                    let missing = pending_ids.into_iter().map(InventoryResponse::Missing);

                    Handler::Finished(Ok(Response::Transactions(
                        available.chain(missing).collect(),
                    )))
                }
            }

            // `zcashd` returns requested blocks in a single batch of messages.
            // Other blocks or non-blocks messages can come before or after the batch.
            // `zcashd` silently skips missing blocks, rather than sending a final `notfound` message.
            // https://github.com/zcash/zcash/blob/e7b425298f6d9a54810cb7183f00be547e4d9415/src/main.cpp#L5523
            (
                Handler::BlocksByHash {
                    mut pending_hashes,
                    mut blocks,
                },
                Message::Block(block),
            ) => {
                // assumptions:
                //   - the block messages are sent in a single continuous batch
                //   - missing blocks are silently skipped
                //     (there is no `notfound` message at the end of the batch)
                if pending_hashes.remove(&block.hash()) {
                    // we are in the middle of the continuous block messages
                    blocks.push(block);
                } else {
                    // We got a block we didn't ask for.
                    //
                    // So either:
                    // 1. The response is for a previously cancelled block request.
                    //    We should treat that block as an inbound gossiped block,
                    //    and wait for the actual response.
                    // 2. The peer doesn't know any of the blocks we asked for.
                    //    We should cancel the request, so we don't hang waiting for blocks that
                    //    will never arrive.
                    // 3. The peer sent an unsolicited block.
                    //    We should treat that block as an inbound gossiped block,
                    //    and wait for the actual response.
                    //
                    // We ignore the message, so we don't desynchronize with the peer. This happens
                    // when we cancel a request and send a second different request, but receive a
                    // response for the first request. If we ended the request then, we could send
                    // a third request to the peer, and end up having to end that request as well
                    // when the response for the second request arrives.
                    //
                    // Ignoring the message gives us a chance to synchronize back to the correct
                    // request. If that doesn't happen, this request times out.
                    //
                    // In case 2, if peers respond with a `notfound` message,
                    // the cascading errors don't happen. The `notfound` message cancels our request,
                    // and we know we are in sync with the peer.
                    //
                    // Zebra sends `notfound` in response to block requests, but `zcashd` doesn't.
                    // So we need this message workaround, and the related inventory workarounds.
                    ignored_msg = Some(Message::Block(block));
                }

                if pending_hashes.is_empty() {
                    // If we got everything we wanted, let the internal client know.
                    let available = blocks
                        .into_iter()
                        .map(|block| InventoryResponse::Available((block, transient_addr)));
                    Handler::Finished(Ok(Response::Blocks(available.collect())))
                } else {
                    // Keep on waiting for all the blocks we wanted, until we get them or time out.
                    Handler::BlocksByHash {
                        pending_hashes,
                        blocks,
                    }
                }
            }
            // peers are allowed to return this response, but `zcashd` never does
            (
                Handler::BlocksByHash {
                    pending_hashes,
                    blocks,
                },
                Message::NotFound(missing_invs),
            ) => {
                // assumptions:
                //   - the peer eventually returns a block or a `notfound` entry
                //     for each hash
                //   - all `notfound` entries are contained in a single message
                //   - the `notfound` message comes after the block messages
                //
                // If we're in sync with the peer, then the `notfound` should contain the remaining
                // hashes from the handler. If we're not in sync with the peer, we should return
                // what we got so far, and log an error.
                let missing_blocks: HashSet<_> = block_hashes(&missing_invs).collect();
                if missing_blocks != pending_hashes {
                    trace!(?missing_invs, ?missing_blocks, ?pending_hashes);
                    // if these errors are noisy, we should replace them with debugs
                    debug!("unexpected notfound message from peer: all remaining block hashes should be listed in the notfound. Using partial received blocks as the peer response");
                }
                if missing_blocks.len() != missing_invs.len() {
                    trace!(?missing_invs, ?missing_blocks, ?pending_hashes);
                    debug!("unexpected notfound message from peer: notfound contains duplicate hashes or non-block hashes. Using partial received blocks as the peer response");
                }

                if blocks.is_empty() {
                    // If we didn't get anything we wanted, retry the request.
                    let missing_block_hashes = pending_hashes.into_iter().map(Into::into).collect();
                    Handler::Finished(Err(PeerError::NotFoundResponse(missing_block_hashes)))
                } else {
                    // If we got some of what we wanted, let the internal client know.
                    let available = blocks
                        .into_iter()
                        .map(|block| InventoryResponse::Available((block, transient_addr)));
                    let missing = pending_hashes.into_iter().map(InventoryResponse::Missing);

                    Handler::Finished(Ok(Response::Blocks(available.chain(missing).collect())))
                }
            }

            // Only accept an `inv` as a FindBlocks response if every entry is a block hash.
            // TODO:
            // - use `any(inv)` rather than `all(inv)`?
            (Handler::FindBlocks, Message::Inv(items))
                if items
                    .iter()
                    .all(|item| matches!(item, InventoryHash::Block(_))) =>
            {
                Handler::Finished(Ok(Response::BlockHashes(
                    block_hashes(&items[..]).collect(),
                )))
            }
            (Handler::FindHeaders, Message::Headers(headers)) => {
                Handler::Finished(Ok(Response::BlockHeaders(headers)))
            }

            // Only accept an `inv` as a mempool response if every entry is a transaction ID.
            (Handler::MempoolTransactionIds, Message::Inv(items))
                if items.iter().all(|item| item.unmined_tx_id().is_some()) =>
            {
                Handler::Finished(Ok(Response::TransactionIds(
                    transaction_ids(&items).collect(),
                )))
            }

            // By default, messages are not responses.
            (state, msg) => {
                trace!(?msg, "did not interpret message as response");
                ignored_msg = Some(msg);
                state
            }
        };

        // Return ownership of any message we did not consume.
        ignored_msg
    }
433
434    /// Adds `new_addrs` to the `cached_addrs` cache, then takes and returns `response_size`
435    /// addresses from that cache.
436    ///
437    /// `cached_addrs` can be empty if the cache is empty. `new_addrs` can be empty or `None` if
438    /// there are no new addresses. `response_size` can be zero or `None` if there is no response
439    /// needed.
440    fn update_addr_cache<'new>(
441        cached_addrs: &mut Vec<MetaAddr>,
442        new_addrs: impl IntoIterator<Item = &'new MetaAddr>,
443        response_size: impl Into<Option<usize>>,
444    ) -> Vec<MetaAddr> {
445        // # Peer Set Reliability
446        //
447        // Newly received peers are added to the cache, so that we can use them if the connection
448        // doesn't respond to our getaddr requests.
449        //
450        // Add the new addresses to the end of the cache.
451        cached_addrs.extend(new_addrs);
452
453        // # Security
454        //
455        // We limit how many peer addresses we take from each peer, so that our address book
456        // and outbound connections aren't controlled by a single peer (#1869). We randomly select
457        // peers, so the remote peer can't control which addresses we choose by changing the order
458        // in the messages they send.
459        let response_size = response_size.into().unwrap_or_default();
460
461        let mut temp_cache = Vec::new();
462        std::mem::swap(cached_addrs, &mut temp_cache);
463
464        // The response is fully shuffled, remaining is partially shuffled.
465        let (response, remaining) = temp_cache.partial_shuffle(&mut thread_rng(), response_size);
466
467        // # Security
468        //
469        // The cache size is limited to avoid memory denial of service.
470        //
471        // It's ok to just partially shuffle the cache, because it doesn't actually matter which
472        // peers we drop. Having excess peers is rare, because most peers only send one large
473        // unsolicited peer message when they first connect.
474        *cached_addrs = remaining.to_vec();
475        cached_addrs.truncate(MAX_ADDRS_IN_MESSAGE);
476
477        response.to_vec()
478    }
479}
480
/// The request/response cycle state of a peer connection.
#[derive(Debug)]
#[must_use = "AwaitingResponse.tx.send() must be called before drop"]
pub(super) enum State {
    /// Awaiting a client request or a peer message.
    AwaitingRequest,
    /// Awaiting a peer message we can interpret as a response to a client request.
    AwaitingResponse {
        // Interprets incoming peer messages as the response to the pending request.
        handler: Handler,
        // Sends the response back to the internal client.
        // As the `#[must_use]` message says, a response (or error) must be sent
        // before this sender is dropped.
        tx: MustUseClientResponseSender,
        // The tracing span associated with the in-progress request.
        span: tracing::Span,
    },
    /// A failure has occurred and we are shutting down the connection.
    Failed,
}
495
496impl fmt::Display for State {
497    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
498        f.write_str(&match self {
499            State::AwaitingRequest => "AwaitingRequest".to_string(),
500            State::AwaitingResponse { handler, .. } => {
501                format!("AwaitingResponse({handler})")
502            }
503            State::Failed => "Failed".to_string(),
504        })
505    }
506}
507
508impl State {
509    /// Returns the Zebra internal state as a string.
510    pub fn command(&self) -> Cow<'static, str> {
511        match self {
512            State::AwaitingRequest => "AwaitingRequest".into(),
513            State::AwaitingResponse { handler, .. } => {
514                format!("AwaitingResponse({})", handler.command()).into()
515            }
516            State::Failed => "Failed".into(),
517        }
518    }
519}
520
/// The outcome of mapping an inbound [`Message`] to a [`Request`].
///
/// Marked `#[must_use]` so callers cannot silently drop an inbound message
/// without deciding how it is handled.
#[derive(Clone, Debug, Eq, PartialEq)]
#[must_use = "inbound messages must be handled"]
pub enum InboundMessage {
    /// The message was mapped to an inbound [`Request`].
    AsRequest(Request),

    /// The message was consumed by the mapping method.
    ///
    /// For example, it could be cached, treated as an error,
    /// or an internally handled [`Message::Ping`].
    Consumed,

    /// The message was not used by the inbound message handler.
    Unused,
}
537
538impl From<Request> for InboundMessage {
539    fn from(request: Request) -> Self {
540        InboundMessage::AsRequest(request)
541    }
542}
543
/// The channels, services, and associated state for a peer connection.
///
/// `S` is the inbound service used to answer requests from the connected peer.
/// `Tx` is the sink used to send Zcash [`Message`]s to the connected peer.
pub struct Connection<S, Tx>
where
    Tx: Sink<Message, Error = SerializationError> + Unpin,
{
    /// The metadata for the connected peer `service`.
    ///
    /// This field is used for debugging.
    pub connection_info: Arc<ConnectionInfo>,

    /// The state of this connection's current request or response.
    pub(super) state: State,

    /// A timeout for a client request. This is stored separately from
    /// State so that we can move the future out of it independently of
    /// other state handling.
    pub(super) request_timer: Option<Pin<Box<Sleep>>>,

    /// Unused peers from recent `addr` or `addrv2` messages from this peer.
    /// Also holds the initial addresses sent in `version` messages, or guessed from the remote IP.
    ///
    /// When peers send solicited or unsolicited peer advertisements, Zebra puts them in this cache.
    ///
    /// When Zebra's components request peers, some cached peers are randomly selected,
    /// consumed, and returned as a modified response. This works around `zcashd`'s address
    /// response rate-limit.
    ///
    /// The cache size is limited to avoid denial of service attacks.
    pub(super) cached_addrs: Vec<MetaAddr>,

    /// The `inbound` service, used to answer requests from this connection's peer.
    pub(super) svc: S,

    /// A channel for requests that Zebra's internal services want to send to remote peers.
    ///
    /// This channel accepts [`Request`]s, and produces [`InProgressClientRequest`]s.
    pub(super) client_rx: ClientRequestReceiver,

    /// A slot for an error shared between the Connection and the Client that uses it.
    ///
    /// `None` unless the connection or client have errored.
    pub(super) error_slot: ErrorSlot,

    /// A channel for sending Zcash messages to the connected peer.
    ///
    /// This channel accepts [`Message`]s.
    ///
    /// The corresponding peer message receiver is passed to [`Connection::run`].
    pub(super) peer_tx: PeerTx<Tx>,

    /// A connection tracker that reduces the open connection count when dropped.
    /// Used to limit the number of open connections in Zebra.
    ///
    /// This field does nothing until it is dropped.
    ///
    /// # Security
    ///
    /// If this connection tracker or `Connection`s are leaked,
    /// the number of active connections will appear higher than it actually is.
    /// If enough connections leak, Zebra will stop making new connections.
    #[allow(dead_code)]
    pub(super) connection_tracker: ConnectionTracker,

    /// The metrics label for this peer. Usually the remote IP and port.
    pub(super) metrics_label: String,

    /// The state for this peer, when the metrics were last updated.
    pub(super) last_metrics_state: Option<Cow<'static, str>>,

    /// The time of the last overload error response from the inbound
    /// service to a request from this connection,
    /// or None if this connection hasn't yet received an overload error.
    last_overload_time: Option<Instant>,
}
618
619impl<S, Tx> fmt::Debug for Connection<S, Tx>
620where
621    Tx: Sink<Message, Error = SerializationError> + Unpin,
622{
623    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
624        // skip the channels, they don't tell us anything useful
625        f.debug_struct(std::any::type_name::<Connection<S, Tx>>())
626            .field("connection_info", &self.connection_info)
627            .field("state", &self.state)
628            .field("request_timer", &self.request_timer)
629            .field("cached_addrs", &self.cached_addrs.len())
630            .field("error_slot", &self.error_slot)
631            .field("metrics_label", &self.metrics_label)
632            .field("last_metrics_state", &self.last_metrics_state)
633            .field("last_overload_time", &self.last_overload_time)
634            .finish()
635    }
636}
637
638impl<S, Tx> Connection<S, Tx>
639where
640    Tx: Sink<Message, Error = SerializationError> + Unpin,
641{
642    /// Return a new connection from its channels, services, shared state, and metadata.
643    pub(crate) fn new(
644        inbound_service: S,
645        client_rx: futures::channel::mpsc::Receiver<ClientRequest>,
646        error_slot: ErrorSlot,
647        peer_tx: Tx,
648        connection_tracker: ConnectionTracker,
649        connection_info: Arc<ConnectionInfo>,
650        initial_cached_addrs: Vec<MetaAddr>,
651    ) -> Self {
652        let metrics_label = connection_info.connected_addr.get_transient_addr_label();
653
654        Connection {
655            connection_info,
656            state: State::AwaitingRequest,
657            request_timer: None,
658            cached_addrs: initial_cached_addrs,
659            svc: inbound_service,
660            client_rx: client_rx.into(),
661            error_slot,
662            peer_tx: peer_tx.into(),
663            connection_tracker,
664            metrics_label,
665            last_metrics_state: None,
666            last_overload_time: None,
667        }
668    }
669}
670
671impl<S, Tx> Connection<S, Tx>
672where
673    S: Service<Request, Response = Response, Error = BoxError>,
674    S::Error: Into<BoxError>,
675    Tx: Sink<Message, Error = SerializationError> + Unpin,
676{
    /// Consume this `Connection` to form a spawnable future containing its event loop.
    ///
    /// `peer_rx` is a channel for receiving Zcash [`Message`]s from the connected peer.
    /// The corresponding peer message receiver is [`Connection::peer_tx`].
    ///
    /// # Panics
    ///
    /// If the event loop exits without an error recorded in the connection's error
    /// slot: every exit path must call `fail_with()` or `shutdown()` first.
    pub async fn run<Rx>(mut self, mut peer_rx: Rx)
    where
        Rx: Stream<Item = Result<Message, SerializationError>> + Unpin,
    {
        // At a high level, the event loop we want is as follows: we check for any
        // incoming messages from the remote peer, check if they should be interpreted
        // as a response to a pending client request, and if not, interpret them as a
        // request from the remote peer to our node.
        //
        // We also need to handle those client requests in the first place. The client
        // requests are received from the corresponding `peer::Client` over a bounded
        // channel (with bound 1, to minimize buffering), but there is no relationship
        // between the stream of client requests and the stream of peer messages, so we
        // cannot ignore one kind while waiting on the other. Moreover, we cannot accept
        // a second client request while the first one is still pending.
        //
        // To do this, we inspect the current request state.
        //
        // If there is no pending request, we wait on either an incoming peer message or
        // an incoming request, whichever comes first.
        //
        // If there is a pending request, we wait only on an incoming peer message, and
        // check whether it can be interpreted as a response to the pending request.
        //
        // TODO: turn this comment into a module-level comment, after splitting the module.
        loop {
            self.update_state_metrics(None);

            match self.state {
                State::AwaitingRequest => {
                    trace!("awaiting client request or peer message");
                    // # Correctness
                    //
                    // Currently, select prefers the first future if multiple futures are ready.
                    // We use this behaviour to prioritise messages on each individual peer
                    // connection in this order:
                    // - incoming messages from the remote peer, then
                    // - outgoing messages to the remote peer.
                    //
                    // This improves the performance of peer responses to Zebra requests, and new
                    // peer requests to Zebra's inbound service.
                    //
                    // `futures::StreamExt::next()` is cancel-safe:
                    // <https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety>
                    // This means that messages from the future that isn't selected stay in the stream,
                    // and they will be returned next time the future is checked.
                    //
                    // If an inbound peer message arrives at a ready peer that also has a pending
                    // request from Zebra, we want to process the peer's message first.
                    // If we process the Zebra request first:
                    // - we could misinterpret the inbound peer message as a response to the Zebra
                    //   request, or
                    // - if the peer message is a request to Zebra, and we put the peer in the
                    //   AwaitingResponse state, then we'll correctly ignore the simultaneous Zebra
                    //   request. (Zebra services make multiple requests or retry, so this is ok.)
                    //
                    // # Security
                    //
                    // If a peer sends an uninterrupted series of messages, it will delay any new
                    // requests from Zebra to that individual peer. This is behaviour we want,
                    // because:
                    // - any responses to Zebra's requests to that peer would be slow or timeout,
                    // - the peer will eventually fail a Zebra keepalive check and get disconnected,
                    // - if there are too many inbound messages overall, the inbound service will
                    //   return an overload error and the peer will be disconnected.
                    //
                    // Messages to other peers will continue to be processed concurrently. Some
                    // Zebra services might be temporarily delayed until the peer times out, if a
                    // request to that peer is sent by the service, and the service blocks until
                    // the request completes (or times out).
                    match future::select(peer_rx.next(), self.client_rx.next()).await {
                        // The peer message stream ended, so the connection is gone.
                        Either::Left((None, _)) => {
                            self.fail_with(PeerError::ConnectionClosed).await;
                        }
                        Either::Left((Some(Err(e)), _)) => self.fail_with(e).await,
                        Either::Left((Some(Ok(msg)), _)) => {
                            let unhandled_msg = self.handle_message_as_request(msg).await;

                            if let Some(unhandled_msg) = unhandled_msg {
                                debug!(
                                    %unhandled_msg,
                                    "ignoring unhandled request while awaiting a request"
                                );
                            }
                        }
                        Either::Right((None, _)) => {
                            trace!("client_rx closed, ending connection");

                            // There are no requests to be flushed,
                            // but we need to set an error and update metrics.
                            // (We don't want to log this error, because it's normal behaviour.)
                            self.shutdown_async(PeerError::ClientDropped).await;
                            break;
                        }
                        Either::Right((Some(req), _)) => {
                            let span = req.span.clone();
                            self.handle_client_request(req).instrument(span).await
                        }
                    }
                }

                // Check whether the handler is finished before waiting for a response message,
                // because the response might be `Nil` or synthetic.
                State::AwaitingResponse {
                    handler: Handler::Finished(_),
                    ref span,
                    ..
                } => {
                    // We have to get rid of the span reference so we can tamper with the state.
                    let span = span.clone();
                    trace!(
                        parent: &span,
                        "returning completed response to client request"
                    );

                    // Replace the state with a temporary value,
                    // so we can take ownership of the response sender.
                    let tmp_state = std::mem::replace(&mut self.state, State::Failed);

                    if let State::AwaitingResponse {
                        handler: Handler::Finished(response),
                        tx,
                        ..
                    } = tmp_state
                    {
                        if let Ok(response) = response.as_ref() {
                            debug!(%response, "finished receiving peer response to Zebra request");
                            // Add a metric for inbound responses to outbound requests.
                            metrics::counter!(
                                "zebra.net.in.responses",
                                "command" => response.command(),
                                "addr" => self.metrics_label.clone(),
                            )
                            .increment(1);
                        } else {
                            debug!(error = ?response, "error in peer response to Zebra request");
                        }

                        // Ignore any send error: the requesting task may have gone away.
                        let _ = tx.send(response.map_err(Into::into));
                    } else {
                        unreachable!("already checked for AwaitingResponse");
                    }

                    self.state = State::AwaitingRequest;
                }

                // We're awaiting a response to a client request,
                // so wait on either a peer message, or on a request cancellation.
                State::AwaitingResponse {
                    ref span,
                    ref mut tx,
                    ..
                } => {
                    // we have to get rid of the span reference so we can tamper with the state
                    let span = span.clone();
                    trace!(parent: &span, "awaiting response to client request");
                    let timer_ref = self
                        .request_timer
                        .as_mut()
                        .expect("timeout must be set while awaiting response");

                    // # Security
                    //
                    // select() prefers the first future if multiple futures are ready.
                    //
                    // If multiple futures are ready, we want the priority for each individual
                    // connection to be:
                    // - cancellation, then
                    // - timeout, then
                    // - peer responses.
                    //
                    // (Messages to other peers are processed concurrently.)
                    //
                    // This makes sure a peer can't block disconnection or timeouts by sending too
                    // many messages. It also avoids doing work to process messages after a
                    // connection has failed.
                    let cancel = future::select(tx.cancellation(), timer_ref);
                    match future::select(cancel, peer_rx.next())
                        .instrument(span.clone())
                        .await
                    {
                        Either::Right((None, _)) => {
                            self.fail_with(PeerError::ConnectionClosed).await
                        }
                        Either::Right((Some(Err(e)), _)) => self.fail_with(e).await,
                        Either::Right((Some(Ok(peer_msg)), _cancel)) => {
                            self.update_state_metrics(format!("Out::Rsp::{}", peer_msg.command()));

                            // Try to process the message using the handler.
                            // This extremely awkward construction avoids
                            // keeping a live reference to handler across the
                            // call to handle_message_as_request, which takes
                            // &mut self. This is a sign that we don't properly
                            // factor the state required for inbound and
                            // outbound requests.
                            let request_msg = match self.state {
                                State::AwaitingResponse {
                                    ref mut handler, ..
                                } => span.in_scope(|| handler.process_message(peer_msg, &mut self.cached_addrs, self.connection_info.connected_addr.get_transient_addr())),
                                _ => unreachable!("unexpected state after AwaitingResponse: {:?}, peer_msg: {:?}, client_receiver: {:?}",
                                                  self.state,
                                                  peer_msg,
                                                  self.client_rx,
                                ),
                            };

                            self.update_state_metrics(None);

                            // If the message was not consumed as a response,
                            // check whether it can be handled as a request.
                            let unused_msg = if let Some(request_msg) = request_msg {
                                // do NOT instrument with the request span, this is
                                // independent work
                                self.handle_message_as_request(request_msg).await
                            } else {
                                None
                            };

                            if let Some(unused_msg) = unused_msg {
                                debug!(
                                    %unused_msg,
                                    %self.state,
                                    "ignoring peer message: not a response or a request",
                                );
                            }
                        }
                        Either::Left((Either::Right(_), _peer_fut)) => {
                            trace!(parent: &span, "client request timed out");
                            let e = PeerError::ConnectionReceiveTimeout;

                            // Replace the state with a temporary value,
                            // so we can take ownership of the response sender.
                            self.state = match std::mem::replace(&mut self.state, State::Failed) {
                                // Special case: ping timeouts fail the connection.
                                State::AwaitingResponse {
                                    handler: Handler::Ping { .. },
                                    tx,
                                    ..
                                } => {
                                    // We replaced the original state, which means `fail_with` won't see it.
                                    // So we do the state request cleanup manually.
                                    let e = SharedPeerError::from(e);
                                    let _ = tx.send(Err(e.clone()));
                                    self.fail_with(e).await;
                                    State::Failed
                                }
                                // Other request timeouts fail the request.
                                State::AwaitingResponse { tx, .. } => {
                                    let _ = tx.send(Err(e.into()));
                                    State::AwaitingRequest
                                }
                                _ => unreachable!(
                                    "unexpected failed connection state while AwaitingResponse: client_receiver: {:?}",
                                    self.client_rx
                                ),
                            };
                        }
                        Either::Left((Either::Left(_), _peer_fut)) => {
                            // The client receiver was dropped, so we don't need to send on `tx` here.
                            trace!(parent: &span, "client request was cancelled");
                            self.state = State::AwaitingRequest;
                        }
                    }
                }

                // This connection has failed: stop the event loop, and complete the future.
                State::Failed => break,
            }
        }

        // TODO: close peer_rx here, after changing it from a stream to a channel

        let error = self.error_slot.try_get_error();
        assert!(
            error.is_some(),
            "closing connections must call fail_with() or shutdown() to set the error slot"
        );

        self.update_state_metrics(error.expect("checked is_some").to_string());
    }
961
962    /// Fail this connection, log the failure, and shut it down.
963    /// See [`Self::shutdown_async()`] for details.
964    ///
965    /// Use [`Self::shutdown_async()`] to avoid logging the failure,
966    /// and [`Self::shutdown()`] from non-async code.
967    async fn fail_with(&mut self, error: impl Into<SharedPeerError>) {
968        let error = error.into();
969
970        debug!(
971            %error,
972            client_receiver = ?self.client_rx,
973            "failing peer service with error"
974        );
975
976        self.shutdown_async(error).await;
977    }
978
    /// Handle an internal client request, possibly generating outgoing messages to the
    /// remote peer.
    ///
    /// Sends the network message corresponding to `req` (if any) to the peer, then
    /// moves the connection into the `AwaitingResponse` state with a handler for the
    /// expected reply and a fresh request timeout. If sending fails, the request's
    /// response sender and the whole connection are both failed with the send error.
    ///
    /// # Panics
    ///
    /// If called while the connection is in the `Failed` or `AwaitingResponse` state.
    ///
    /// NOTE: the caller should use .instrument(msg.span) to instrument the function.
    async fn handle_client_request(&mut self, req: InProgressClientRequest) {
        trace!(?req.request);
        use Request::*;
        use State::*;
        let InProgressClientRequest { request, tx, span } = req;

        // Don't send anything for requests the client has already canceled.
        if tx.is_canceled() {
            metrics::counter!("peer.canceled").increment(1);
            debug!(state = %self.state, %request, "ignoring canceled request");

            metrics::counter!(
                "zebra.net.out.requests.canceled",
                "command" => request.command(),
                "addr" => self.metrics_label.clone(),
            )
            .increment(1);
            self.update_state_metrics(format!("Out::Req::Canceled::{}", request.command()));

            return;
        }

        debug!(state = %self.state, %request, "sending request from Zebra to peer");

        // Add a metric for outbound requests.
        metrics::counter!(
            "zebra.net.out.requests",
            "command" => request.command(),
            "addr" => self.metrics_label.clone(),
        )
        .increment(1);
        self.update_state_metrics(format!("Out::Req::{}", request.command()));

        // Map the request to an outgoing peer message (if any), and build the handler
        // that will interpret the peer's eventual response.
        let new_handler = match (&self.state, request) {
            (Failed, request) => panic!(
                "failed connection cannot handle new request: {:?}, client_receiver: {:?}",
                request,
                self.client_rx
            ),
            (pending @ AwaitingResponse { .. }, request) => panic!(
                "tried to process new request: {:?} while awaiting a response: {:?}, client_receiver: {:?}",
                request,
                pending,
                self.client_rx
            ),

            // Take some cached addresses from the peer connection. This address cache helps
            // work-around a `zcashd` addr response rate-limit.
            (AwaitingRequest, Peers) if !self.cached_addrs.is_empty() => {
                // Security: This method performs security-sensitive operations, see its comments
                // for details.
                let response_addrs = Handler::update_addr_cache(&mut self.cached_addrs, None, PEER_ADDR_RESPONSE_LIMIT);

                debug!(
                    response_addrs = response_addrs.len(),
                    remaining_addrs = self.cached_addrs.len(),
                    PEER_ADDR_RESPONSE_LIMIT,
                    "responding to Peers request using some cached addresses",
                );

                // No message is sent to the peer: the response is synthesized locally,
                // so the handler is already finished.
                Ok(Handler::Finished(Ok(Response::Peers(response_addrs))))
            }
            (AwaitingRequest, Peers) => self
                .peer_tx
                .send(Message::GetAddr)
                .await
                .map(|()| Handler::Peers),

            (AwaitingRequest, Ping(nonce)) => {
                // Record the send time, so the handler can measure round-trip latency.
                let ping_sent_at = Instant::now();

                self
                    .peer_tx
                    .send(Message::Ping(nonce))
                    .await
                    .map(|()| Handler::Ping { nonce, ping_sent_at })
            }

            (AwaitingRequest, BlocksByHash(hashes)) => {
                self
                    .peer_tx
                    .send(Message::GetData(
                        hashes.iter().map(|h| (*h).into()).collect(),
                    ))
                    .await
                    .map(|()|
                         Handler::BlocksByHash {
                             blocks: Vec::with_capacity(hashes.len()),
                             pending_hashes: hashes,
                         }
                    )
            }
            (AwaitingRequest, TransactionsById(ids)) => {
                self
                    .peer_tx
                    .send(Message::GetData(
                        ids.iter().map(Into::into).collect(),
                    ))
                    .await
                    .map(|()|
                         Handler::TransactionsById {
                             transactions: Vec::with_capacity(ids.len()),
                             pending_ids: ids,
                         })
            }

            (AwaitingRequest, FindBlocks { known_blocks, stop }) => {
                self
                    .peer_tx
                    .send(Message::GetBlocks { known_blocks, stop })
                    .await
                    .map(|()|
                         Handler::FindBlocks
                    )
            }
            (AwaitingRequest, FindHeaders { known_blocks, stop }) => {
                self
                    .peer_tx
                    .send(Message::GetHeaders { known_blocks, stop })
                    .await
                    .map(|()|
                         Handler::FindHeaders
                    )
            }

            (AwaitingRequest, MempoolTransactionIds) => {
                self
                    .peer_tx
                    .send(Message::Mempool)
                    .await
                    .map(|()|
                         Handler::MempoolTransactionIds
                    )
            }

            (AwaitingRequest, PushTransaction(transaction)) => {
                self
                    .peer_tx
                    .send(Message::Tx(transaction))
                    .await
                    .map(|()|
                         Handler::Finished(Ok(Response::Nil))
                    )
            }
            (AwaitingRequest, AdvertiseTransactionIds(hashes)) => {
                let max_tx_inv_in_message: usize = MAX_TX_INV_IN_SENT_MESSAGE
                    .try_into()
                    .expect("constant fits in usize");

                // # Security
                //
                // In most cases, we try to split over-sized requests into multiple network-layer
                // messages. But we are unlikely to reach this limit with the default mempool
                // config, so a gossip like this could indicate a network amplification attack.
                //
                // This limit is particularly important here, because advertisements send the same
                // message to half our available peers.
                //
                // If there are thousands of transactions in the mempool, letting peers know the
                // exact transactions we have isn't that important, so it's ok to drop arbitrary
                // transaction hashes from our response.
                if hashes.len() > max_tx_inv_in_message {
                    debug!(inv_count = ?hashes.len(), ?MAX_TX_INV_IN_SENT_MESSAGE, "unusually large transaction ID gossip");
                }

                // Truncate over-sized advertisements, see the security note above.
                let hashes = hashes.into_iter().take(max_tx_inv_in_message).map(Into::into).collect();

                self
                    .peer_tx
                    .send(Message::Inv(hashes))
                    .await
                    .map(|()|
                         Handler::Finished(Ok(Response::Nil))
                    )
            }
            (AwaitingRequest, AdvertiseBlock(hash) | AdvertiseBlockToAll(hash)) => {
                self
                    .peer_tx
                    .send(Message::Inv(vec![hash.into()]))
                    .await
                    .map(|()|
                         Handler::Finished(Ok(Response::Nil))
                    )
            }
        };

        // Update the connection state with a new handler, or fail with an error.
        match new_handler {
            Ok(handler) => {
                self.state = AwaitingResponse { handler, span, tx };
                // Start the response timeout for the newly sent request.
                self.request_timer = Some(Box::pin(sleep(constants::REQUEST_TIMEOUT)));
            }
            Err(error) => {
                // Fail the request's response channel as well as the connection itself.
                let error = SharedPeerError::from(error);
                let _ = tx.send(Err(error.clone()));
                self.fail_with(error).await;
            }
        };
    }
1181
1182    /// Handle `msg` as a request from a peer to this Zebra instance.
1183    ///
1184    /// If the message is not handled, it is returned.
1185    // This function has its own span, because we're creating a new work
1186    // context (namely, the work of processing the inbound msg as a request)
1187    #[instrument(name = "msg_as_req", skip(self, msg), fields(msg = msg.command()))]
1188    async fn handle_message_as_request(&mut self, msg: Message) -> Option<Message> {
1189        trace!(?msg);
1190        debug!(state = %self.state, %msg, "received inbound peer message");
1191
1192        self.update_state_metrics(format!("In::Msg::{}", msg.command()));
1193
1194        use InboundMessage::*;
1195
1196        let req = match msg {
1197            Message::Ping(nonce) => {
1198                trace!(?nonce, "responding to heartbeat");
1199                if let Err(e) = self.peer_tx.send(Message::Pong(nonce)).await {
1200                    self.fail_with(e).await;
1201                }
1202                Consumed
1203            }
1204            // These messages shouldn't be sent outside of a handshake.
1205            Message::Version { .. } => {
1206                self.fail_with(PeerError::DuplicateHandshake).await;
1207                Consumed
1208            }
1209            Message::Verack => {
1210                self.fail_with(PeerError::DuplicateHandshake).await;
1211                Consumed
1212            }
1213            // These messages should already be handled as a response if they
1214            // could be a response, so if we see them here, they were either
1215            // sent unsolicited, or they were sent in response to a canceled request
1216            // that we've already forgotten about.
1217            Message::Reject { .. } => {
1218                debug!(%msg, "got reject message unsolicited or from canceled request");
1219                Unused
1220            }
1221            Message::NotFound { .. } => {
1222                debug!(%msg, "got notfound message unsolicited or from canceled request");
1223                Unused
1224            }
1225            Message::Pong(_) => {
1226                debug!(%msg, "got pong message unsolicited or from canceled request");
1227                Unused
1228            }
1229            Message::Block(_) => {
1230                debug!(%msg, "got block message unsolicited or from canceled request");
1231                Unused
1232            }
1233            Message::Headers(_) => {
1234                debug!(%msg, "got headers message unsolicited or from canceled request");
1235                Unused
1236            }
1237            // These messages should never be sent by peers.
1238            Message::FilterLoad { .. } | Message::FilterAdd { .. } | Message::FilterClear => {
1239                // # Security
1240                //
1241                // Zcash connections are not authenticated, so malicious nodes can send fake messages,
1242                // with connected peers' IP addresses in the IP header.
1243                //
1244                // Since we can't verify their source, Zebra needs to ignore unexpected messages,
1245                // because closing the connection could cause a denial of service or eclipse attack.
1246                debug!(%msg, "got BIP111 message without advertising NODE_BLOOM");
1247
1248                // Ignored, but consumed because it is technically a protocol error.
1249                Consumed
1250            }
1251
1252            // # Security
1253            //
1254            // Zebra crawls the network proactively, and that's the only way peers get into our
1255            // address book. This prevents peers from filling our address book with malicious peer
1256            // addresses.
1257            Message::Addr(ref new_addrs) => {
1258                // # Peer Set Reliability
1259                //
1260                // We keep a list of the unused peer addresses sent by each connection, to work
1261                // around `zcashd`'s `getaddr` response rate-limit.
1262                let no_response =
1263                    Handler::update_addr_cache(&mut self.cached_addrs, new_addrs, None);
1264                assert_eq!(
1265                    no_response,
1266                    Vec::new(),
1267                    "peers unexpectedly taken from cache"
1268                );
1269
1270                debug!(
1271                    new_addrs = new_addrs.len(),
1272                    cached_addrs = self.cached_addrs.len(),
1273                    "adding unsolicited addresses to cached addresses",
1274                );
1275
1276                Consumed
1277            }
1278            Message::Tx(ref transaction) => Request::PushTransaction(transaction.clone()).into(),
1279            Message::Inv(ref items) => match &items[..] {
1280                // We don't expect to be advertised multiple blocks at a time,
1281                // so we ignore any advertisements of multiple blocks.
1282                [InventoryHash::Block(hash)] => Request::AdvertiseBlock(*hash).into(),
1283
1284                // Some peers advertise invs with mixed item types.
1285                // But we're just interested in the transaction invs.
1286                //
1287                // TODO: split mixed invs into multiple requests,
1288                //       but skip runs of multiple blocks.
1289                tx_ids if tx_ids.iter().any(|item| item.unmined_tx_id().is_some()) => {
1290                    Request::AdvertiseTransactionIds(transaction_ids(items).collect()).into()
1291                }
1292
1293                // Log detailed messages for ignored inv advertisement messages.
1294                [] => {
1295                    debug!(%msg, "ignoring empty inv");
1296
1297                    // This might be a minor protocol error, or it might mean "not found".
1298                    Unused
1299                }
1300                [InventoryHash::Block(_), InventoryHash::Block(_), ..] => {
1301                    debug!(%msg, "ignoring inv with multiple blocks");
1302                    Unused
1303                }
1304                _ => {
1305                    debug!(%msg, "ignoring inv with no transactions");
1306                    Unused
1307                }
1308            },
1309            Message::GetData(ref items) => match &items[..] {
1310                // Some peers advertise invs with mixed item types.
1311                // So we suspect they might do the same with getdata.
1312                //
1313                // Since we can only handle one message at a time,
1314                // we treat it as a block request if there are any blocks,
1315                // or a transaction request if there are any transactions.
1316                //
1317                // TODO: split mixed getdata into multiple requests.
1318                b_hashes
1319                    if b_hashes
1320                        .iter()
1321                        .any(|item| matches!(item, InventoryHash::Block(_))) =>
1322                {
1323                    Request::BlocksByHash(block_hashes(items).collect()).into()
1324                }
1325                tx_ids if tx_ids.iter().any(|item| item.unmined_tx_id().is_some()) => {
1326                    Request::TransactionsById(transaction_ids(items).collect()).into()
1327                }
1328
1329                // Log detailed messages for ignored getdata request messages.
1330                [] => {
1331                    debug!(%msg, "ignoring empty getdata");
1332
1333                    // This might be a minor protocol error, or it might mean "not found".
1334                    Unused
1335                }
1336                _ => {
1337                    debug!(%msg, "ignoring getdata with no blocks or transactions");
1338                    Unused
1339                }
1340            },
1341            Message::GetAddr => Request::Peers.into(),
1342            Message::GetBlocks {
1343                ref known_blocks,
1344                stop,
1345            } => Request::FindBlocks {
1346                known_blocks: known_blocks.clone(),
1347                stop,
1348            }
1349            .into(),
1350            Message::GetHeaders {
1351                ref known_blocks,
1352                stop,
1353            } => Request::FindHeaders {
1354                known_blocks: known_blocks.clone(),
1355                stop,
1356            }
1357            .into(),
1358            Message::Mempool => Request::MempoolTransactionIds.into(),
1359        };
1360
1361        // Handle the request, and return unused messages.
1362        match req {
1363            AsRequest(req) => {
1364                self.drive_peer_request(req).await;
1365                None
1366            }
1367            Consumed => None,
1368            Unused => Some(msg),
1369        }
1370    }
1371
    /// Given a `req` originating from the peer, drive it to completion and send
    /// any appropriate messages to the remote peer. If an error occurs while
    /// processing the request (e.g., the service is shedding load), then we call
    /// fail_with to terminate the entire peer connection, shrinking the number
    /// of connected peers.
    ///
    /// Inbound service overloads and timeouts are routed to
    /// [`Self::handle_inbound_overload`], which may randomly close the connection.
    /// Other inbound service errors are logged but do not close the connection.
    async fn drive_peer_request(&mut self, req: Request) {
        trace!(?req);

        // Add a metric for inbound requests
        metrics::counter!(
            "zebra.net.in.requests",
            "command" => req.command(),
            "addr" => self.metrics_label.clone(),
        )
        .increment(1);
        self.update_state_metrics(format!("In::Req::{}", req.command()));

        // Give the inbound service time to clear its queue,
        // before sending the next inbound request.
        tokio::task::yield_now().await;

        // # Security
        //
        // Holding buffer slots for a long time can cause hangs:
        // <https://docs.rs/tower/latest/tower/buffer/struct.Buffer.html#a-note-on-choosing-a-bound>
        //
        // The inbound service must be called immediately after a buffer slot is reserved.
        //
        // The inbound service never times out in readiness, because the load shed layer is always
        // ready, and returns an error in response to the request instead.
        if self.svc.ready().await.is_err() {
            // Readiness errors mean the inbound service itself has shut down,
            // so this connection can't do any more useful work.
            self.fail_with(PeerError::ServiceShutdown).await;
            return;
        }

        // Inbound service request timeouts are handled by the timeout layer in `start::start()`.
        let rsp = match self.svc.call(req.clone()).await {
            Err(e) => {
                if e.is::<tower::load_shed::error::Overloaded>() {
                    // # Security
                    //
                    // The peer request queue must have a limited length.
                    // The buffer and load shed layers are added in `start::start()`.
                    tracing::debug!("inbound service is overloaded, may close connection");

                    let now = Instant::now();

                    self.handle_inbound_overload(req, now, PeerError::Overloaded)
                        .await;
                } else if e.is::<tower::timeout::error::Elapsed>() {
                    // # Security
                    //
                    // Peer requests must have a timeout.
                    // The timeout layer is added in `start::start()`.
                    tracing::info!(%req, "inbound service request timed out, may close connection");

                    let now = Instant::now();

                    self.handle_inbound_overload(req, now, PeerError::InboundTimeout)
                        .await;
                } else {
                    // We could send a reject to the remote peer, but that might cause
                    // them to disconnect, and we might be using them to sync blocks.
                    // For similar reasons, we don't want to fail_with() here - we
                    // only close the connection if the peer is doing something wrong.
                    info!(
                        %e,
                        connection_state = ?self.state,
                        client_receiver = ?self.client_rx,
                        "error processing peer request",
                    );
                    self.update_state_metrics(format!("In::Req::{}/Rsp::Error", req.command()));
                }

                // All error paths skip the response handling below.
                return;
            }
            Ok(rsp) => rsp,
        };

        // Add a metric for outbound responses to inbound requests
        metrics::counter!(
            "zebra.net.out.responses",
            "command" => rsp.command(),
            "addr" => self.metrics_label.clone(),
        )
        .increment(1);
        self.update_state_metrics(format!("In::Rsp::{}", rsp.command()));

        // `rsp` is cloned here so it can still be logged after the match below
        // consumes the response's data.
        //
        // TODO: split response handler into its own method
        match rsp.clone() {
            Response::Nil => { /* generic success, do nothing */ }
            Response::Peers(addrs) => {
                if let Err(e) = self.peer_tx.send(Message::Addr(addrs)).await {
                    self.fail_with(e).await;
                }
            }
            Response::Pong(duration) => {
                // The pong message itself was already sent by the ping handler;
                // here we just log the measured round-trip time.
                debug!(?duration, "responding to Ping with Pong RTT");
            }
            Response::Transactions(transactions) => {
                // Generate one tx message per transaction,
                // then a notfound message with all the missing transaction ids.
                let mut missing_ids = Vec::new();

                for transaction in transactions.into_iter() {
                    match transaction {
                        Available((transaction, _)) => {
                            if let Err(e) = self.peer_tx.send(Message::Tx(transaction)).await {
                                // Stop immediately: later sends would also fail.
                                self.fail_with(e).await;
                                return;
                            }
                        }
                        Missing(id) => missing_ids.push(id.into()),
                    }
                }

                if !missing_ids.is_empty() {
                    if let Err(e) = self.peer_tx.send(Message::NotFound(missing_ids)).await {
                        self.fail_with(e).await;
                        return;
                    }
                }
            }
            Response::Blocks(blocks) => {
                // Generate one block message per block,
                // then a notfound message with all the missing block hashes.
                let mut missing_hashes = Vec::new();

                for block in blocks.into_iter() {
                    match block {
                        Available((block, _)) => {
                            if let Err(e) = self.peer_tx.send(Message::Block(block)).await {
                                // Stop immediately: later sends would also fail.
                                self.fail_with(e).await;
                                return;
                            }
                        }
                        Missing(hash) => missing_hashes.push(hash.into()),
                    }
                }

                if !missing_hashes.is_empty() {
                    if let Err(e) = self.peer_tx.send(Message::NotFound(missing_hashes)).await {
                        self.fail_with(e).await;
                        return;
                    }
                }
            }
            Response::BlockHashes(hashes) => {
                if let Err(e) = self
                    .peer_tx
                    .send(Message::Inv(hashes.into_iter().map(Into::into).collect()))
                    .await
                {
                    self.fail_with(e).await
                }
            }
            Response::BlockHeaders(headers) => {
                if let Err(e) = self.peer_tx.send(Message::Headers(headers)).await {
                    self.fail_with(e).await
                }
            }
            Response::TransactionIds(hashes) => {
                let max_tx_inv_in_message: usize = MAX_TX_INV_IN_SENT_MESSAGE
                    .try_into()
                    .expect("constant fits in usize");

                // # Security
                //
                // In most cases, we try to split over-sized responses into multiple network-layer
                // messages. But we are unlikely to reach this limit with the default mempool
                // config, so a response like this could indicate a network amplification attack.
                //
                // If there are thousands of transactions in the mempool, letting peers know the
                // exact transactions we have isn't that important, so it's ok to drop arbitrary
                // transaction hashes from our response.
                if hashes.len() > max_tx_inv_in_message {
                    debug!(inv_count = ?hashes.len(), ?MAX_TX_INV_IN_SENT_MESSAGE, "unusually large transaction ID response");
                }

                let hashes = hashes
                    .into_iter()
                    .take(max_tx_inv_in_message)
                    .map(Into::into)
                    .collect();

                if let Err(e) = self.peer_tx.send(Message::Inv(hashes)).await {
                    self.fail_with(e).await
                }
            }
        }

        // NOTE(review): this is also reached when a `fail_with` call above did not
        // `return` (e.g. the BlockHashes/BlockHeaders/TransactionIds arms), so the
        // "sent" here is best-effort rather than guaranteed.
        debug!(state = %self.state, %req, %rsp, "sent Zebra response to peer");

        // Give the inbound service time to clear its queue,
        // before checking the connection for the next inbound or outbound request.
        tokio::task::yield_now().await;
    }
1569
1570    /// Handle inbound service overload and timeout error responses by randomly terminating some
1571    /// connections.
1572    ///
1573    /// # Security
1574    ///
1575    /// When the inbound service is overloaded with requests, Zebra needs to drop some connections,
1576    /// to reduce the load on the application. But dropping every connection that receives an
1577    /// `Overloaded` error from the inbound service could cause Zebra to drop too many peer
1578    /// connections, and stop itself downloading blocks or transactions.
1579    ///
1580    /// Malicious or misbehaving peers can also overload the inbound service, and make Zebra drop
1581    /// its connections to other peers.
1582    ///
1583    /// So instead, Zebra drops some overloaded connections at random. If a connection has recently
1584    /// overloaded the inbound service, it is more likely to be dropped. This makes it harder for a
1585    /// single peer (or multiple peers) to perform a denial of service attack.
1586    ///
1587    /// The inbound connection rate-limit also makes it hard for multiple peers to perform this
1588    /// attack, because each inbound connection can only send one inbound request before its
1589    /// probability of being disconnected increases.
1590    async fn handle_inbound_overload(&mut self, req: Request, now: Instant, error: PeerError) {
1591        let prev = self.last_overload_time.replace(now);
1592        let drop_connection_probability = overload_drop_connection_probability(now, prev);
1593
1594        if thread_rng().gen::<f32>() < drop_connection_probability {
1595            if matches!(error, PeerError::Overloaded) {
1596                metrics::counter!("pool.closed.loadshed").increment(1);
1597            } else {
1598                metrics::counter!("pool.closed.inbound.timeout").increment(1);
1599            }
1600
1601            tracing::info!(
1602                drop_connection_probability = format!("{drop_connection_probability:.3}"),
1603                remote_user_agent = ?self.connection_info.remote.user_agent,
1604                negotiated_version = ?self.connection_info.negotiated_version,
1605                peer = ?self.metrics_label,
1606                last_peer_state = ?self.last_metrics_state,
1607                // TODO: remove this detailed debug info once #6506 is fixed
1608                remote_height = ?self.connection_info.remote.start_height,
1609                cached_addrs = ?self.cached_addrs.len(),
1610                connection_state = ?self.state,
1611                "inbound service {error} error, closing connection",
1612            );
1613
1614            self.update_state_metrics(format!("In::Req::{}/Rsp::{error}::Error", req.command()));
1615            self.fail_with(error).await;
1616        } else {
1617            self.update_state_metrics(format!("In::Req::{}/Rsp::{error}::Ignored", req.command()));
1618
1619            if matches!(error, PeerError::Overloaded) {
1620                metrics::counter!("pool.ignored.loadshed").increment(1);
1621            } else {
1622                metrics::counter!("pool.ignored.inbound.timeout").increment(1);
1623            }
1624        }
1625    }
1626}
1627
1628/// Returns the probability of dropping a connection where the last overload was at `prev`,
1629/// and the current overload is `now`.
1630///
1631/// # Security
1632///
1633/// Connections that haven't seen an overload error in the past OVERLOAD_PROTECTION_INTERVAL
1634/// have a small chance of being closed (MIN_OVERLOAD_DROP_PROBABILITY).
1635///
1636/// Connections that have seen a previous overload error in that time
1637/// have a higher chance of being dropped up to MAX_OVERLOAD_DROP_PROBABILITY.
1638/// This probability increases quadratically, so peers that send lots of inbound
1639/// requests are more likely to be dropped.
1640///
1641/// ## Examples
1642///
1643/// If a connection sends multiple overloads close together, it is very likely to be
1644/// disconnected. If a connection has two overloads multiple seconds apart, it is unlikely
1645/// to be disconnected.
1646fn overload_drop_connection_probability(now: Instant, prev: Option<Instant>) -> f32 {
1647    let Some(prev) = prev else {
1648        return MIN_OVERLOAD_DROP_PROBABILITY;
1649    };
1650
1651    let protection_fraction_since_last_overload =
1652        (now - prev).as_secs_f32() / OVERLOAD_PROTECTION_INTERVAL.as_secs_f32();
1653
1654    // Quadratically increase the disconnection probability for very recent overloads.
1655    // Negative values are ignored by clamping to MIN_OVERLOAD_DROP_PROBABILITY.
1656    let overload_fraction = protection_fraction_since_last_overload.powi(2);
1657
1658    let probability_range = MAX_OVERLOAD_DROP_PROBABILITY - MIN_OVERLOAD_DROP_PROBABILITY;
1659    let raw_drop_probability =
1660        MAX_OVERLOAD_DROP_PROBABILITY - (overload_fraction * probability_range);
1661
1662    raw_drop_probability.clamp(MIN_OVERLOAD_DROP_PROBABILITY, MAX_OVERLOAD_DROP_PROBABILITY)
1663}
1664
impl<S, Tx> Connection<S, Tx>
where
    Tx: Sink<Message, Error = SerializationError> + Unpin,
{
    /// Update the connection state metrics for this connection,
    /// using `extra_state_info` as additional state information.
    ///
    /// Pass `None` to record just the current state's command name.
    fn update_state_metrics(&mut self, extra_state_info: impl Into<Option<String>>) {
        let current_metrics_state = if let Some(extra_state_info) = extra_state_info.into() {
            format!("{}::{extra_state_info}", self.state.command()).into()
        } else {
            self.state.command()
        };

        // Deduplicate: skip the gauge update if the state label hasn't changed.
        if self.last_metrics_state.as_ref() == Some(&current_metrics_state) {
            return;
        }

        // Zero out the gauge for the previous state before setting the new one,
        // so each connection only counts towards one state at a time.
        self.erase_state_metrics();

        // Set the new state
        metrics::gauge!(
            "zebra.net.connection.state",
            "command" => current_metrics_state.clone(),
            "addr" => self.metrics_label.clone(),
        )
        .increment(1.0);

        self.last_metrics_state = Some(current_metrics_state);
    }

    /// Erase the connection state metrics for this connection.
    ///
    /// Resets the gauge for the last recorded state to zero, if there was one.
    fn erase_state_metrics(&mut self) {
        if let Some(last_metrics_state) = self.last_metrics_state.take() {
            metrics::gauge!(
                "zebra.net.connection.state",
                "command" => last_metrics_state,
                "addr" => self.metrics_label.clone(),
            )
            .set(0.0);
        }
    }

    /// Marks the peer as having failed with `error`, and performs connection cleanup,
    /// including async channel closes.
    ///
    /// If the connection has errored already, re-use the original error.
    /// Otherwise, fail the connection with `error`.
    async fn shutdown_async(&mut self, error: impl Into<SharedPeerError>) {
        // Close async channels first, so other tasks can start shutting down.
        // There's nothing we can do about errors while shutting down, and some errors are expected.
        //
        // TODO: close peer_tx and peer_rx in shutdown() and Drop, after:
        // - using channels instead of streams/sinks?
        // - exposing the underlying implementation rather than using generics and closures?
        // - adding peer_rx to the connection struct (optional)
        let _ = self.peer_tx.close().await;

        // The rest of the cleanup is synchronous.
        self.shutdown(error);
    }

    /// Marks the peer as having failed with `error`, and performs connection cleanup.
    /// See [`Self::shutdown_async()`] for details.
    ///
    /// Call [`Self::shutdown_async()`] in async code, because it can shut down more channels.
    fn shutdown(&mut self, error: impl Into<SharedPeerError>) {
        let mut error = error.into();

        // Close channels first, so other tasks can start shutting down.
        self.client_rx.close();

        // Update the shared error slot
        //
        // # Correctness
        //
        // Error slots use a threaded `std::sync::Mutex`, so accessing the slot
        // can block the async task's current thread. We only perform a single
        // slot update per `Client`. We ignore subsequent error slot updates.
        let slot_result = self.error_slot.try_update_error(error.clone());

        if let Err(AlreadyErrored { original_error }) = slot_result {
            debug!(
                new_error = %error,
                %original_error,
                connection_state = ?self.state,
                "multiple errors on connection: \
                 failed connections should stop processing pending requests and responses, \
                 then close the connection"
            );

            // Report the first error to pending requests, not the later one.
            error = original_error;
        } else {
            debug!(%error,
                   connection_state = ?self.state,
                   "shutting down peer service with error");
        }

        // Prepare to flush any pending client requests.
        //
        // We've already closed the client channel, so setting State::Failed
        // will make the main loop flush any pending requests.
        //
        // However, we may have an outstanding client request in State::AwaitingResponse,
        // so we need to deal with it first.
        if let State::AwaitingResponse { tx, .. } =
            std::mem::replace(&mut self.state, State::Failed)
        {
            // # Correctness
            //
            // We know the slot has Some(error), because we just set it above,
            // and the error slot is never unset.
            //
            // Accessing the error slot locks a threaded std::sync::Mutex, which
            // can block the current async task thread. We briefly lock the mutex
            // to clone the error.
            let _ = tx.send(Err(error.clone()));
        }

        // Make the timer and metrics consistent with the Failed state.
        self.request_timer = None;
        self.update_state_metrics(None);

        // Finally, flush pending client requests.
        while let Some(InProgressClientRequest { tx, span, .. }) =
            self.client_rx.close_and_flush_next()
        {
            trace!(
                parent: &span,
                %error,
                "sending an error response to a pending request on a failed connection"
            );
            // Send failures are expected here: the requester may already be gone.
            let _ = tx.send(Err(error.clone()));
        }
    }
}
1799
impl<S, Tx> Drop for Connection<S, Tx>
where
    Tx: Sink<Message, Error = SerializationError> + Unpin,
{
    fn drop(&mut self) {
        // `Drop` is synchronous, so we can't call `shutdown_async()` here;
        // the synchronous `shutdown()` still closes the client channel and
        // flushes pending requests with an error.
        self.shutdown(PeerError::ConnectionDropped);

        // Zero out this connection's state gauge so dropped connections
        // don't leave stale metrics behind.
        self.erase_state_metrics();
    }
}
1810
1811/// Map a list of inventory hashes to the corresponding unmined transaction IDs.
1812/// Non-transaction inventory hashes are skipped.
1813///
1814/// v4 transactions use a legacy transaction ID, and
1815/// v5 transactions use a witnessed transaction ID.
1816fn transaction_ids(items: &'_ [InventoryHash]) -> impl Iterator<Item = UnminedTxId> + '_ {
1817    items.iter().filter_map(InventoryHash::unmined_tx_id)
1818}
1819
1820/// Map a list of inventory hashes to the corresponding block hashes.
1821/// Non-block inventory hashes are skipped.
1822fn block_hashes(items: &'_ [InventoryHash]) -> impl Iterator<Item = block::Hash> + '_ {
1823    items.iter().filter_map(InventoryHash::block_hash)
1824}