zebra_network/peer_set/
set.rs

1//! Abstractions that represent "the rest of the network".
2//!
3//! # Implementation
4//!
5//! The [`PeerSet`] implementation is adapted from the one in [tower::Balance][tower-balance].
6//!
7//! As described in Tower's documentation, it:
8//!
9//! > Distributes requests across inner services using the [Power of Two Choices][p2c].
10//! >
11//! > As described in the [Finagle Guide][finagle]:
12//! >
13//! > > The algorithm randomly picks two services from the set of ready endpoints and
14//! > > selects the least loaded of the two. By repeatedly using this strategy, we can
15//! > > expect a manageable upper bound on the maximum load of any server.
16//! > >
17//! > > The maximum load variance between any two servers is bound by `ln(ln(n))` where
18//! > > `n` is the number of servers in the cluster.
19//!
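/*!
A minimal sketch of the Power of Two Choices selection quoted above, using only
the standard library. It is not the `PeerSet` or Tower implementation: the
`XorShift` generator, `p2c_pick`, and the peer names and loads are hypothetical
stand-ins chosen for illustration.

```rust
/// Tiny xorshift generator, so the sketch needs no external crates.
/// (Assumption: any uniform random source would do here.)
struct XorShift(u64);

impl XorShift {
    fn next(&mut self) -> u64 {
        let mut x = self.0;
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        self.0 = x;
        x
    }

    /// Returns a value in `0..bound`. `bound` must be non-zero.
    fn below(&mut self, bound: usize) -> usize {
        (self.next() % bound as u64) as usize
    }
}

/// Picks two distinct ready peers at random, and returns the less loaded one.
fn p2c_pick(loads: &[(&'static str, u32)], rng: &mut XorShift) -> Option<&'static str> {
    match loads.len() {
        0 => None,
        1 => Some(loads[0].0),
        len => {
            let a = rng.below(len);
            // Draw from the remaining `len - 1` slots and skip over `a`, so `b != a`.
            let mut b = rng.below(len - 1);
            if b >= a {
                b += 1;
            }
            let (first, second) = (loads[a], loads[b]);
            Some(if first.1 <= second.1 { first.0 } else { second.0 })
        }
    }
}

fn main() {
    // Hypothetical (peer, load) pairs.
    let loads = [("peer-a", 3), ("peer-b", 0), ("peer-c", 7)];
    let mut rng = XorShift(0x9E37_79B9_7F4A_7C15);
    for _ in 0..5 {
        println!("{:?}", p2c_pick(&loads, &mut rng));
    }
}
```

With three peers, the most loaded peer can never be selected, because whichever
peer it is paired with has a lower load.
*/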
20//! The Power of Two Choices should work well for many network requests, but not all of them.
21//! Some requests should only be made to a subset of connected peers.
22//! For example, a request for a particular inventory item
23//! should be made to a peer that has recently advertised that inventory hash.
24//! Other requests require broadcasts, such as transaction diffusion.
25//!
26//! Implementing this specialized routing logic inside the `PeerSet` -- so that
27//! it continues to abstract away "the rest of the network" into one endpoint --
28//! is not a problem, as the `PeerSet` can simply maintain more information on
29//! its peers and route requests appropriately. However, there is a problem with
30//! maintaining accurate backpressure information, because the `Service` trait
31//! requires that service readiness is independent of the data in the request.
32//!
33//! For this reason, in the future, this code will probably be refactored to
34//! address this backpressure mismatch. One possibility is to refactor the code
35//! so that one entity holds and maintains the peer set and metadata on the
36//! peers, and each "backpressure category" of request is assigned to different
37//! `Service` impls with specialized `poll_ready()` implementations. Another
38//! less-elegant solution (which might be useful as an intermediate step for the
39//! inventory case) is to provide a way to borrow a particular backing service,
40//! say by address.
41//!
42//! [finagle]: https://twitter.github.io/finagle/guide/Clients.html#power-of-two-choices-p2c-least-loaded
43//! [p2c]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf
44//! [tower-balance]: https://github.com/tower-rs/tower/tree/master/tower/src/balance
45//!
46//! # Behavior During Network Upgrades
47//!
48//! [ZIP-201] specifies peer behavior during network upgrades:
49//!
50//! > With scheduled network upgrades, at the activation height, nodes on each consensus branch
51//! > should disconnect from nodes on other consensus branches and only accept new incoming
52//! > connections from nodes on the same consensus branch.
53//!
54//! Zebra handles this with the help of [`MinimumPeerVersion`], which determines the minimum peer
55//! protocol version to accept based on the current best chain tip height. The minimum version is
56//! therefore automatically increased when the block height reaches a network upgrade's activation
57//! height. The helper type is then used to:
58//!
59//! - cancel handshakes to outdated peers, in `handshake::negotiate_version`
60//! - cancel requests to and disconnect from peers that have become outdated, in
61//!   [`PeerSet::push_unready`]
62//! - disconnect from peers that have just responded and became outdated, in
63//!   [`PeerSet::poll_unready`]
64//! - disconnect from idle peers that have become outdated, in
65//!   [`PeerSet::disconnect_from_outdated_peers`]
66//!
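/*!
The height-dependent minimum version described above can be sketched as follows.
This is an illustration under stated assumptions, not the `MinimumPeerVersion`
implementation: the `UPGRADES` table, its heights and version numbers, and the
helper names are all hypothetical placeholders.

```rust
use std::collections::HashMap;

/// Hypothetical (activation height, minimum protocol version) pairs, sorted by height.
/// These numbers are placeholders, not real network parameters.
const UPGRADES: [(u32, u32); 3] = [(0, 100), (1_000, 110), (2_000, 120)];

/// Returns the minimum version required at `height`: the version of the latest
/// upgrade whose activation height has been reached.
fn min_version_at(height: u32) -> u32 {
    UPGRADES
        .iter()
        .rev()
        .find(|(activation, _)| *activation <= height)
        .map(|(_, version)| *version)
        .expect("the first upgrade activates at height 0")
}

/// Drops every peer whose advertised version is below the current minimum,
/// and returns how many peers were disconnected.
fn disconnect_outdated(peers: &mut HashMap<&'static str, u32>, tip_height: u32) -> usize {
    let minimum = min_version_at(tip_height);
    let before = peers.len();
    peers.retain(|_, version| *version >= minimum);
    before - peers.len()
}

fn main() {
    let mut peers = HashMap::from([("old", 100), ("current", 110), ("new", 120)]);
    println!("dropped at 999: {}", disconnect_outdated(&mut peers, 999));
    println!("dropped at 1000: {}", disconnect_outdated(&mut peers, 1_000));
}
```

Because the minimum is derived from the tip height on every check, it rises
automatically at the activation height without any separate upgrade signal.
*/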
67//! ## Network Coalescence
68//!
69//! [ZIP-201] also specifies how Zcashd behaves [leading up to an activation
70//! height][1]. Since Zcashd limits the number of connections to at most eight
71//! peers, it will gradually migrate its connections to up-to-date peers as it
72//! approaches the activation height.
73//!
74//! The motivation for this behavior is to avoid abruptly partitioning the network, which can lead
75//! to isolated peers and increase the chance of an eclipse attack on some peers of the network.
76//!
77//! Zebra does not gradually migrate its peers as it approaches an activation height. This is
78//! because Zebra by default can connect to up to 75 peers, as can be seen in [`Config::default`].
79//! Since this is a lot larger than the 8 peers Zcashd connects to, an eclipse attack becomes a lot
80//! more costly to execute, and the probability of an abrupt network partition that isolates peers
81//! is lower.
82//!
83//! Even if a Zebra node is manually configured to connect to a smaller number
84//! of peers, the [`AddressBook`][2] is configured to hold a large number of
85//! peer addresses ([`MAX_ADDRS_IN_ADDRESS_BOOK`][3]). Since the address book
86//! prioritizes addresses it trusts (like those that it has successfully
87//! connected to before), the node should be able to recover and rejoin the
88//! network by itself, as long as the address book is populated with enough
89//! entries.
90//!
91//! [1]: https://zips.z.cash/zip-0201#network-coalescence
92//! [2]: crate::AddressBook
93//! [3]: crate::constants::MAX_ADDRS_IN_ADDRESS_BOOK
94//! [ZIP-201]: https://zips.z.cash/zip-0201
95
96use std::{
97    collections::{HashMap, HashSet},
98    convert,
99    fmt::Debug,
100    marker::PhantomData,
101    net::IpAddr,
102    pin::Pin,
103    sync::Arc,
104    task::{Context, Poll},
105    time::Instant,
106};
107
108use futures::{
109    channel::{mpsc, oneshot},
110    future::{FutureExt, TryFutureExt},
111    prelude::*,
112    stream::FuturesUnordered,
113    task::noop_waker,
114};
115use indexmap::IndexMap;
116use itertools::Itertools;
117use num_integer::div_ceil;
118use tokio::{
119    sync::{broadcast, mpsc as tokio_mpsc, watch},
120    task::JoinHandle,
121};
122use tower::{
123    discover::{Change, Discover},
124    load::Load,
125    Service,
126};
127
128use zebra_chain::{chain_tip::ChainTip, parameters::Network};
129
130use crate::{
131    address_book::AddressMetrics,
132    constants::MIN_PEER_SET_LOG_INTERVAL,
133    peer::{LoadTrackedClient, MinimumPeerVersion},
134    peer_set::{
135        stall_tracker::FindResponseStallTracker,
136        unready_service::{Error as UnreadyError, UnreadyService},
137        InventoryChange, InventoryRegistry,
138    },
139    protocol::{
140        external::InventoryHash,
141        internal::{Request, Response},
142    },
143    BoxError, Config, PeerError, PeerSocketAddr, SharedPeerError,
144};
145
146#[cfg(test)]
147mod tests;
148
149/// A signal sent by the [`PeerSet`] when it has no ready peers, and gets a request from Zebra.
150///
151/// In response to this signal, the crawler tries to open more peer connections.
152#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
153pub struct MorePeers;
154
155/// A signal sent by the [`PeerSet`] to cancel a [`Client`][1]'s current request
156/// or response.
157///
158/// When it receives this signal, the [`Client`][1] stops processing and exits.
159///
160/// [1]: crate::peer::Client
161#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
162pub struct CancelClientWork;
163
164type ResponseFuture = Pin<Box<dyn Future<Output = Result<Response, BoxError>> + Send + 'static>>;
165
166/// Classification of a `FindBlocks`/`FindHeaders` response, sent from a
167/// response-wrapping future to [`PeerSet::poll_ready`] via an mpsc channel so
168/// the stall tracker can be updated and the peer disconnected if needed.
169#[derive(Copy, Clone, Debug, PartialEq, Eq)]
170enum StallOutcome {
171    Stall,
172    Clear,
173}
174
175fn classify_find_response<E>(result: &Result<Response, E>) -> Option<StallOutcome> {
176    match result {
177        Ok(Response::BlockHashes(hashes)) if hashes.is_empty() => Some(StallOutcome::Stall),
178        Ok(Response::BlockHashes(_)) => Some(StallOutcome::Clear),
179        Ok(Response::BlockHeaders(headers)) if headers.is_empty() => Some(StallOutcome::Stall),
180        Ok(Response::BlockHeaders(_)) => Some(StallOutcome::Clear),
181        Ok(_) => None,
182        Err(_) => Some(StallOutcome::Stall),
183    }
184}
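
/*
One way the stall and clear outcomes above could be consumed is a per-peer
counter of consecutive stalls. This is only a sketch: the real
`FindResponseStallTracker` is defined in another module and may work
differently. `Outcome`, `StallCounter`, and `MAX_CONSECUTIVE_STALLS` are
hypothetical names and values.

```rust
use std::collections::HashMap;

/// Local stand-in for the stall classification.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum Outcome {
    Stall,
    Clear,
}

/// Hypothetical limit on consecutive stalled responses from one peer.
const MAX_CONSECUTIVE_STALLS: u32 = 3;

#[derive(Default)]
struct StallCounter {
    stalls: HashMap<String, u32>,
}

impl StallCounter {
    /// Records an outcome for `peer`.
    /// Returns `true` if the peer has reached the limit and should be disconnected.
    fn record(&mut self, peer: &str, outcome: Outcome) -> bool {
        match outcome {
            // A useful response resets the peer's streak.
            Outcome::Clear => {
                self.stalls.remove(peer);
                false
            }
            Outcome::Stall => {
                let count = self.stalls.entry(peer.to_owned()).or_insert(0);
                *count += 1;
                if *count >= MAX_CONSECUTIVE_STALLS {
                    // Forget the peer, the caller is about to drop it.
                    self.stalls.remove(peer);
                    true
                } else {
                    false
                }
            }
        }
    }
}

fn main() {
    let mut tracker = StallCounter::default();
    for outcome in [Outcome::Stall, Outcome::Clear, Outcome::Stall, Outcome::Stall, Outcome::Stall].iter() {
        println!("{:?} -> disconnect: {}", outcome, tracker.record("peer-a", *outcome));
    }
}
```

Keeping the counter behind a single owner, and feeding it through a channel,
avoids a lock around the map.
*/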
185
186/// A [`tower::Service`] that abstractly represents "the rest of the network".
187///
188/// # Security
189///
190/// The `Discover::Key` must be the transient remote address of each peer. This
191/// address may only be valid for the duration of a single connection. (For
192/// example, inbound connections have an ephemeral remote port, and proxy
193/// connections have an ephemeral local or proxy port.)
194///
195/// Otherwise, malicious peers could interfere with other peers' `PeerSet` state.
196pub struct PeerSet<D, C>
197where
198    D: Discover<Key = PeerSocketAddr, Service = LoadTrackedClient> + Unpin,
199    D::Error: Into<BoxError>,
200    C: ChainTip,
201{
202    // Peer Tracking: New Peers
203    //
204    /// Provides new and deleted peer [`Change`]s to the peer set,
205    /// via the [`Discover`] trait implementation.
206    discover: D,
207
208    /// A channel that asks the peer crawler task to connect to more peers.
209    demand_signal: mpsc::Sender<MorePeers>,
210
211    /// A watch channel receiver with a copy of banned IP addresses.
212    bans_receiver: watch::Receiver<Arc<IndexMap<IpAddr, std::time::Instant>>>,
213
214    /// Tracks peers returning empty `FindBlocks`/`FindHeaders` responses.
215    /// Mutated only from [`Self::poll_ready`] via [`Self::stall_event_rx`].
216    find_response_stalls: FindResponseStallTracker,
217
218    /// Receives stall/clear events from tracked routing futures in
219    /// [`Self::route_p2c`]. The channel keeps the tracker single-owner (no
220    /// `Mutex`) and confines mutation to `poll_ready`, where the peer set can
221    /// call [`Self::remove`] directly.
222    stall_event_rx: tokio_mpsc::UnboundedReceiver<(PeerSocketAddr, StallOutcome)>,
223
224    /// Producer clones handed to each tracked request's response wrapper.
225    stall_event_tx: tokio_mpsc::UnboundedSender<(PeerSocketAddr, StallOutcome)>,
226
227    // Peer Tracking: Ready Peers
228    //
229    /// Connected peers that are ready to receive requests from Zebra,
230    /// or send requests to Zebra.
231    ready_services: HashMap<D::Key, D::Service>,
232
233    // Request Routing
234    //
235    /// Stores gossiped inventory hashes from connected peers.
236    ///
237    /// Used to route inventory requests to peers that are likely to have it.
238    inventory_registry: InventoryRegistry,
239
240    /// Stores requests that should be routed to peers once they are ready.
241    queued_broadcast_all: Option<(
242        Request,
243        tokio::sync::mpsc::Sender<ResponseFuture>,
244        HashSet<D::Key>,
245    )>,
246
247    // Peer Tracking: Busy Peers
248    //
249    /// Connected peers that are handling a Zebra request,
250    /// or Zebra is handling one of their requests.
251    unready_services: FuturesUnordered<UnreadyService<D::Key, D::Service, Request>>,
252
253    /// Channels used to cancel the request that an unready service is doing.
254    cancel_handles: HashMap<D::Key, oneshot::Sender<CancelClientWork>>,
255
256    // Peer Validation
257    //
258    /// An endpoint to see the minimum peer protocol version in real time.
259    ///
260    /// The minimum version depends on the block height, and [`MinimumPeerVersion`] listens for
261    /// height changes and determines the correct minimum version.
262    minimum_peer_version: MinimumPeerVersion<C>,
263
264    /// The configured limit for inbound and outbound connections.
265    ///
266    /// The peer set panics if this size is exceeded.
267    /// If that happens, our connection limit code has a bug.
268    peerset_total_connection_limit: usize,
269
270    // Background Tasks
271    //
272    /// Channel for passing ownership of tokio JoinHandles from PeerSet's background tasks
273    ///
274    /// The join handles passed into the PeerSet are used to populate the `guards` member
275    handle_rx: tokio::sync::oneshot::Receiver<Vec<JoinHandle<Result<(), BoxError>>>>,
276
277    /// Unordered set of handles to background tasks associated with the `PeerSet`
278    ///
279    /// These guards are checked for errors as part of `poll_ready` which lets
280    /// the `PeerSet` propagate errors from background tasks back to the user
281    guards: futures::stream::FuturesUnordered<JoinHandle<Result<(), BoxError>>>,
282
283    // Metrics and Logging
284    //
285    /// Address book metrics watch channel.
286    ///
287    /// Used for logging diagnostics.
288    address_metrics: watch::Receiver<AddressMetrics>,
289
290    /// The last time we logged a message about the peer set size
291    last_peer_log: Option<Instant>,
292
293    /// The configured maximum number of peers that can be in the
294    /// peer set per IP, defaults to [`crate::constants::DEFAULT_MAX_CONNS_PER_IP`]
295    max_conns_per_ip: usize,
296
297    /// The network of this peer set.
298    network: Network,
299}
300
301impl<D, C> Drop for PeerSet<D, C>
302where
303    D: Discover<Key = PeerSocketAddr, Service = LoadTrackedClient> + Unpin,
304    D::Error: Into<BoxError>,
305    C: ChainTip,
306{
307    fn drop(&mut self) {
308        // We don't have access to the current task (if any), so we just drop everything we can.
309        let waker = noop_waker();
310        let mut cx = Context::from_waker(&waker);
311
312        self.shut_down_tasks_and_channels(&mut cx);
313    }
314}
315
316impl<D, C> PeerSet<D, C>
317where
318    D: Discover<Key = PeerSocketAddr, Service = LoadTrackedClient> + Unpin,
319    D::Error: Into<BoxError>,
320    C: ChainTip,
321{
322    #[allow(clippy::too_many_arguments)]
323    /// Construct a peerset which uses `discover` to manage peer connections.
324    ///
325    /// Arguments:
326    /// - `config`: configures the peer set connection limit;
327    /// - `discover`: handles peer connects and disconnects;
328    /// - `demand_signal`: requests more peers when all peers are busy (unready);
329    /// - `handle_rx`: receives background task handles,
330    ///   monitors them to make sure they're still running,
331    ///   and shuts down all the tasks as soon as one task exits;
332    /// - `inv_stream`: receives inventory changes from peers,
333    ///   allowing the peer set to direct inventory requests;
334    /// - `bans_receiver`: receives a map of banned IP addresses that should be dropped;
335    /// - `address_metrics`: address book metrics, logged as diagnostics when the peer set is busy;
336    /// - `minimum_peer_version`: endpoint to see the minimum peer protocol version in real time.
337    /// - `max_conns_per_ip`: configured maximum number of peers that can be in the
338    ///   peer set per IP, defaults to the config value or to
339    ///   [`crate::constants::DEFAULT_MAX_CONNS_PER_IP`].
340    pub fn new(
341        config: &Config,
342        discover: D,
343        demand_signal: mpsc::Sender<MorePeers>,
344        handle_rx: tokio::sync::oneshot::Receiver<Vec<JoinHandle<Result<(), BoxError>>>>,
345        inv_stream: broadcast::Receiver<InventoryChange>,
346        bans_receiver: watch::Receiver<Arc<IndexMap<IpAddr, std::time::Instant>>>,
347        address_metrics: watch::Receiver<AddressMetrics>,
348        minimum_peer_version: MinimumPeerVersion<C>,
349        max_conns_per_ip: Option<usize>,
350    ) -> Self {
351        let (stall_event_tx, stall_event_rx) = tokio_mpsc::unbounded_channel();
352        Self {
353            // New peers
354            discover,
355            demand_signal,
356            // Banned peers
357            bans_receiver,
358
359            // Stall tracking
360            find_response_stalls: FindResponseStallTracker::new(),
361            stall_event_rx,
362            stall_event_tx,
363
364            // Ready peers
365            ready_services: HashMap::new(),
366            // Request Routing
367            inventory_registry: InventoryRegistry::new(inv_stream),
368            queued_broadcast_all: None,
369
370            // Busy peers
371            unready_services: FuturesUnordered::new(),
372            cancel_handles: HashMap::new(),
373
374            // Peer validation
375            minimum_peer_version,
376            peerset_total_connection_limit: config.peerset_total_connection_limit(),
377
378            // Background tasks
379            handle_rx,
380            guards: futures::stream::FuturesUnordered::new(),
381
382            // Metrics
383            last_peer_log: None,
384            address_metrics,
385
386            max_conns_per_ip: max_conns_per_ip.unwrap_or(config.max_connections_per_ip),
387
388            network: config.network.clone(),
389        }
390    }
391
392    /// Check background task handles to make sure they're still running.
393    ///
394    /// Never returns `Ok`.
395    ///
396    /// If any background task exits, shuts down all other background tasks,
397    /// and returns an error. Otherwise, returns `Pending`, and registers a wakeup for
398    /// receiving the background tasks, or the background tasks exiting.
399    fn poll_background_errors(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
400        futures::ready!(self.receive_tasks_if_needed(cx))?;
401
402        // Return Pending if all background tasks are still running.
403        match futures::ready!(Pin::new(&mut self.guards).poll_next(cx)) {
404            Some(res) => {
405                info!(
406                    background_tasks = %self.guards.len(),
407                    "a peer set background task exited, shutting down other peer set tasks"
408                );
409
410                self.shut_down_tasks_and_channels(cx);
411
412                // Flatten the join result and inner result, and return any errors.
413                res.map_err(Into::into)
414                    // TODO: replace with Result::flatten when it stabilises (#70142)
415                    .and_then(convert::identity)?;
416
417                // Turn Ok() task exits into errors.
418                Poll::Ready(Err("a peer set background task exited".into()))
419            }
420
421            None => {
422                self.shut_down_tasks_and_channels(cx);
423                Poll::Ready(Err("all peer set background tasks have exited".into()))
424            }
425        }
426    }
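
    /*
    The `map_err(Into::into).and_then(convert::identity)` chain used above
    flattens a join result that wraps a task result. A standalone sketch of the
    same pattern follows. `JoinFailed` and `flatten` are hypothetical stand-ins,
    and `BoxError` is redefined locally so the example is self-contained.

```rust
use std::convert;
use std::fmt;

type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

/// Stand-in for the error returned when a task cannot be joined.
#[derive(Debug)]
struct JoinFailed;

impl fmt::Display for JoinFailed {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "join failed")
    }
}

impl std::error::Error for JoinFailed {}

/// Collapses `Result<Result<(), BoxError>, JoinFailed>` into one `Result`.
fn flatten(res: Result<Result<(), BoxError>, JoinFailed>) -> Result<(), BoxError> {
    // First unify the outer error type with the inner one,
    // then replace the outer `Ok` with the inner result.
    res.map_err(Into::into).and_then(convert::identity)
}

fn main() {
    println!("{:?}", flatten(Ok(Ok(()))));
    println!("{:?}", flatten(Ok(Err("task failed".into()))));
    println!("{:?}", flatten(Err(JoinFailed)));
}
```
    */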
427
428    /// Receive background tasks, if they've been sent on the channel, but not consumed yet.
429    ///
430    /// Returns a result representing the current task state, or `Poll::Pending` if the background
431    /// tasks should be polled again to check their state.
432    fn receive_tasks_if_needed(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
433        if self.guards.is_empty() {
434            // Return Pending if the tasks have not been sent yet.
435            let handles = futures::ready!(Pin::new(&mut self.handle_rx).poll(cx));
436
437            match handles {
438                // The tasks have been sent, but not consumed yet.
439                Ok(handles) => {
440                    // Currently, the peer set treats an empty background task set as an error.
441                    //
442                    // TODO: refactor `handle_rx` and `guards` into an enum
443                    //       for the background task state: Waiting/Running/Shutdown.
444                    assert!(
445                        !handles.is_empty(),
446                        "the peer set requires at least one background task"
447                    );
448
449                    self.guards.extend(handles);
450
451                    Poll::Ready(Ok(()))
452                }
453
454                // The sender was dropped without sending the tasks.
455                Err(_) => Poll::Ready(Err(
456                    "sender did not send peer background tasks before it was dropped".into(),
457                )),
458            }
459        } else {
460            Poll::Ready(Ok(()))
461        }
462    }
463
464    /// Shut down:
465    /// - services by dropping the service lists
466    /// - background tasks via their join handles or cancel handles
467    /// - channels by closing the channel
468    fn shut_down_tasks_and_channels(&mut self, cx: &mut Context<'_>) {
469        // Drop services and cancel their background tasks.
470        self.ready_services = HashMap::new();
471
472        for (_peer_key, handle) in self.cancel_handles.drain() {
473            let _ = handle.send(CancelClientWork);
474        }
475        self.unready_services = FuturesUnordered::new();
476
477        // Close the MorePeers channel for all senders,
478        // so we don't add more peers to a shut down peer set.
479        self.demand_signal.close_channel();
480
481        // Shut down background tasks, ignoring pending polls.
482        self.handle_rx.close();
483        let _ = self.receive_tasks_if_needed(cx);
484        for guard in self.guards.iter() {
485            guard.abort();
486        }
487    }
488
489    /// Checks for newly ready peers, disconnects from outdated peers, and polls ready peer errors.
490    fn poll_peers(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
491        // Check for newly ready peers, including newly added peers (which are added as unready).
492        // So it needs to run after `poll_discover()`. Registers a wakeup if there are any unready
493        // peers.
494        //
495        // Each connected peer should become ready within a few minutes, or timeout, close the
496        // connection, and release its connection slot.
497        //
498        // TODO: drop peers that overload us with inbound messages and never become ready (#7822)
499        let _poll_pending_or_ready: Poll<Option<()>> = self.poll_unready(cx)?;
500
501        // Cleanup
502
503        // Only checks the versions of ready peers, so it needs to run after `poll_unready()`.
504        self.disconnect_from_outdated_peers();
505
506        // Check for failures in ready peers, removing newly errored or disconnected peers.
507        // So it needs to run after `poll_unready()`.
508        self.poll_ready_peer_errors(cx).map(Ok)
509    }
510
511    /// Check busy peer services for request completion or errors.
512    ///
513    /// Move newly ready services to the ready list if they are for peers with supported protocol
514    /// versions, otherwise they are dropped. Also drop failed services.
515    ///
516    /// Never returns an error.
517    ///
518    /// Returns `Ok(Some(()))` if at least one peer became ready, `Poll::Pending` if there are
519    /// unready peers, but none became ready, and `Ok(None)` if the unready peers were empty.
520    ///
521    /// If there are any remaining unready peers, registers a wakeup for the next time one becomes
522    /// ready. If there are no unready peers, doesn't register any wakeups. (Since wakeups come
523    /// from peers, there needs to be at least one peer to register a wakeup.)
524    fn poll_unready(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<()>, BoxError>> {
525        let mut result = Poll::Pending;
526
527        // # Correctness
528        //
529        // `poll_next()` must always be called, because `self.unready_services` could have been
530        // empty before the call to `self.poll_ready()`.
531        //
532        // > When new futures are added, `poll_next` must be called in order to begin receiving
533        // > wake-ups for new futures.
534        //
535        // <https://docs.rs/futures/latest/futures/stream/futures_unordered/struct.FuturesUnordered.html>
536        //
537        // Returns Pending if we've finished processing the unready service changes,
538        // but there are still some unready services.
539        loop {
540            // No ready peers left, but there are some unready peers pending.
541            let Poll::Ready(ready_peer) = Pin::new(&mut self.unready_services).poll_next(cx) else {
542                break;
543            };
544
545            match ready_peer {
546                // No unready peers in the list.
547                None => {
548                    // If we've finished processing the unready service changes, and there are no
549                    // unready services left, it doesn't make sense to return Pending, because
550                    // their stream is terminated. But when we add more unready peers and call
551                    // `poll_next()`, its termination status will be reset, and it will receive
552                    // wakeups again.
553                    if result.is_pending() {
554                        result = Poll::Ready(Ok(None));
555                    }
556
557                    break;
558                }
559
560                // Unready -> Ready
561                Some(Ok((key, svc))) => {
562                    trace!(?key, "service became ready");
563
564                    if self.bans_receiver.borrow().contains_key(&key.ip()) {
565                        warn!(?key, "service is banned, dropping service");
566                        std::mem::drop(svc);
567                        let cancel = self.cancel_handles.remove(&key);
568                        debug_assert!(
569                            cancel.is_some(),
570                            "missing cancel handle for banned unready peer"
571                        );
572                        continue;
573                    }
574
575                    self.push_ready(true, key, svc);
576
577                    // Return Ok if at least one peer became ready.
578                    result = Poll::Ready(Ok(Some(())));
579                }
580
581                // Unready -> Canceled
582                Some(Err((key, UnreadyError::Canceled))) => {
583                    // A service can be canceled because we've connected to the same service twice.
584                    // In that case, there is a cancel handle for the peer address,
585                    // but it belongs to the service for the newer connection.
586                    trace!(
587                        ?key,
588                        duplicate_connection = self.cancel_handles.contains_key(&key),
589                        "service was canceled, dropping service"
590                    );
591                }
592                Some(Err((key, UnreadyError::CancelHandleDropped(_)))) => {
593                    // Similarly, services with dropped cancel handles can have duplicates.
594                    trace!(
595                        ?key,
596                        duplicate_connection = self.cancel_handles.contains_key(&key),
597                        "cancel handle was dropped, dropping service"
598                    );
599                }
600
601                // Unready -> Errored
602                Some(Err((key, UnreadyError::Inner(error)))) => {
603                    debug!(%error, "service failed while unready, dropping service");
604
605                    let cancel = self.cancel_handles.remove(&key);
606                    assert!(cancel.is_some(), "missing cancel handle");
607                }
608            }
609        }
610
611        result
612    }
613
614    /// Checks previously ready peer services for errors.
615    ///
616    /// The only way these peer `Client`s can become unready is when we send them a request,
617    /// because the peer set has exclusive access to send requests to each peer. (If an inbound
618    /// request is in progress, it will be handled, then our request will be sent by the connection
619    /// task.)
620    ///
621    /// Returns `Poll::Ready` if there are some ready peers, and `Poll::Pending` if there are no
622    /// ready peers. Registers a wakeup if any peer has failed due to a disconnection, hang, or protocol error.
623    ///
624    /// # Panics
625    ///
626    /// If any peers somehow became unready without being sent a request. This indicates a bug in the peer set, where requests
627    /// are sent to peers without putting them in `unready_services`.
628    fn poll_ready_peer_errors(&mut self, cx: &mut Context<'_>) -> Poll<()> {
629        let mut previous = HashMap::new();
630        std::mem::swap(&mut previous, &mut self.ready_services);
631
632        // TODO: consider only checking some peers each poll (for performance reasons),
633        //       but make sure we eventually check all of them.
634        for (key, mut svc) in previous.drain() {
635            let Poll::Ready(peer_readiness) = Pin::new(&mut svc).poll_ready(cx) else {
636                unreachable!(
637                    "unexpected unready peer: peers must be put into the unready_peers list \
638                     after sending them a request"
639                );
640            };
641
642            match peer_readiness {
643                // Still ready, add it back to the list.
644                Ok(()) => {
645                    if self.bans_receiver.borrow().contains_key(&key.ip()) {
646                        debug!(?key, "service ip is banned, dropping service");
647                        std::mem::drop(svc);
648                        continue;
649                    }
650
651                    self.push_ready(false, key, svc)
652                }
653
654                // Ready -> Errored
655                Err(error) => {
656                    debug!(%error, "service failed while ready, dropping service");
657
658                    // Ready services can just be dropped, they don't need any cleanup.
659                    std::mem::drop(svc);
660                }
661            }
662        }
663
664        if self.ready_services.is_empty() {
665            Poll::Pending
666        } else {
667            Poll::Ready(())
668        }
669    }
670
671    /// Returns the number of peer connections Zebra already has with
672    /// the provided IP address
673    ///
674    /// # Performance
675    ///
676    /// This method is `O(connected peers)`, so it should not be called from a loop
677    /// that is already iterating through the peer set.
678    fn num_peers_with_ip(&self, ip: IpAddr) -> usize {
679        self.ready_services
680            .keys()
681            .chain(self.cancel_handles.keys())
682            .filter(|addr| addr.ip() == ip)
683            .count()
684    }
685
686    /// Returns `true` if Zebra is already connected to the IP and port in `addr`.
687    fn has_peer_with_addr(&self, addr: PeerSocketAddr) -> bool {
688        self.ready_services.contains_key(&addr) || self.cancel_handles.contains_key(&addr)
689    }
690
691    /// Processes the entire list of newly inserted or removed services.
692    ///
693    /// Puts inserted services in the unready list.
694    /// Drops removed services, after cancelling any pending requests.
695    ///
696    /// If the peer connector channel is closed, returns an error.
697    ///
698    /// Otherwise, returns `Ok` if it discovered at least one peer, or `Poll::Pending` if it didn't
699    /// discover any peers. Always registers a wakeup for new peers, even when it returns `Ok`.
700    fn poll_discover(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
701        // Return pending if there are no peers in the list.
702        let mut result = Poll::Pending;
703
704        loop {
705            // If we've emptied the list, finish looping, otherwise process the new peer.
706            let Poll::Ready(discovered) = Pin::new(&mut self.discover).poll_discover(cx) else {
707                break;
708            };
709
710            // If the change channel has a permanent error, return that error.
711            let change = discovered
712                .ok_or("discovery stream closed")?
713                .map_err(Into::into)?;
714
715            // Otherwise we have successfully processed a peer.
716            result = Poll::Ready(Ok(()));
717
718            // Process each change.
719            match change {
720                Change::Remove(key) => {
721                    trace!(?key, "got Change::Remove from Discover");
722                    self.remove(&key);
723                }
724                Change::Insert(key, svc) => {
725                    // We add peers as unready, so that we:
726                    // - always do the same checks on every ready peer, and
727                    // - check for any errors that happened right after the handshake
728                    trace!(?key, "got Change::Insert from Discover");
729
730                    // # Security
731                    //
732                    // Drop the new peer if we are already connected to it.
733                    // Preferring old connections avoids connection thrashing.
734                    if self.has_peer_with_addr(key) {
735                        std::mem::drop(svc);
736                        continue;
737                    }
738
739                    // # Security
740                    //
741                    // Drop the new peer if there are already `max_conns_per_ip` peers with
742                    // the same IP address in the peer set.
743                    if self.num_peers_with_ip(key.ip()) >= self.max_conns_per_ip {
744                        std::mem::drop(svc);
745                        continue;
746                    }
747
748                    self.push_unready(key, svc);
749                }
750            }
751        }
752
753        result
754    }
755
756    /// Checks if the minimum peer version has changed, and disconnects from outdated peers.
757    fn disconnect_from_outdated_peers(&mut self) {
758        if let Some(minimum_version) = self.minimum_peer_version.changed() {
759            // It is ok to drop ready services, they don't need anything cancelled.
760            self.ready_services
761                .retain(|_address, peer| peer.remote_version() >= minimum_version);
762        }
763    }
764
765    /// Takes a ready service by key.
766    fn take_ready_service(&mut self, key: &D::Key) -> Option<D::Service> {
767        if let Some(svc) = self.ready_services.remove(key) {
768            assert!(
769                !self.cancel_handles.contains_key(key),
770                "cancel handles are only used for unready service work"
771            );
772
773            Some(svc)
774        } else {
775            None
776        }
777    }
778
779    /// Drains pending stall/clear events from tracked routing futures and
780    /// disconnects peers that have exceeded the stall threshold. The peer's
781    /// TCP connection is closed when its service is dropped; address book and
782    /// ban list are untouched, so the peer is free to reconnect.
783    fn drain_stall_events(&mut self, cx: &mut Context<'_>) {
784        while let Poll::Ready(Some((addr, outcome))) = self.stall_event_rx.poll_recv(cx) {
785            match outcome {
786                StallOutcome::Stall => {
787                    if self.find_response_stalls.record_stall(addr) {
788                        info!(
789                            ?addr,
790                            "dropping stalled peer: exceeded FindBlocks/FindHeaders stall threshold",
791                        );
792                        self.remove(&addr);
793                    }
794                }
795                StallOutcome::Clear => self.find_response_stalls.clear(addr),
796            }
797        }
798    }
799
800    /// Remove the service corresponding to `key` from the peer set.
801    ///
802    /// Drops the service, cancelling any pending request or response to that peer.
803    /// If the peer does not exist, does nothing.
804    fn remove(&mut self, key: &D::Key) {
805        self.find_response_stalls.clear(*key);
806
807        if let Some(ready_service) = self.take_ready_service(key) {
808            // A ready service has no work to cancel, so just drop it.
809            std::mem::drop(ready_service);
810        } else if let Some(handle) = self.cancel_handles.remove(key) {
811            // Cancel the work, implicitly dropping the cancel handle.
812            // The service future returns a `Canceled` error,
813            // making `poll_unready` drop the service.
814            let _ = handle.send(CancelClientWork);
815        }
816    }
817
818    /// Adds a ready service to the ready list if it's for a peer with a supported version.
819    /// If `was_unready` is true, also removes the peer's cancel handle.
820    ///
821    /// If the service is for a connection to an outdated peer, the service is dropped.
822    fn push_ready(&mut self, was_unready: bool, key: D::Key, svc: D::Service) {
823        let cancel = self.cancel_handles.remove(&key);
824        assert_eq!(
825            cancel.is_some(),
826            was_unready,
827            "missing or unexpected cancel handle"
828        );
829
830        if svc.remote_version() >= self.minimum_peer_version.current() {
831            self.ready_services.insert(key, svc);
832        } else {
833            std::mem::drop(svc);
834        }
835    }
836
837    /// Adds a busy service to the unready list if it's for a peer with a supported version,
838    /// and adds a cancel handle for the service's current request.
839    ///
840    /// If the service is for a connection to an outdated peer, the request is cancelled and the
841    /// service is dropped.
842    fn push_unready(&mut self, key: D::Key, svc: D::Service) {
843        let peer_version = svc.remote_version();
844        let (tx, rx) = oneshot::channel();
845
846        self.unready_services.push(UnreadyService {
847            key: Some(key),
848            service: Some(svc),
849            cancel: rx,
850            _req: PhantomData,
851        });
852
853        if peer_version >= self.minimum_peer_version.current() {
854            self.cancel_handles.insert(key, tx);
855        } else {
856            // Cancel any request made to the service because it is using an outdated protocol
857            // version.
858            let _ = tx.send(CancelClientWork);
859        }
860    }
861
862    /// Performs P2C on `self.ready_services` to randomly select a less-loaded ready service.
863    fn select_ready_p2c_peer(&self) -> Option<D::Key> {
864        self.select_p2c_peer_from_list(&self.ready_services.keys().copied().collect())
865    }
866
867    /// Performs P2C on `ready_service_list` to randomly select a less-loaded ready service.
868    #[allow(clippy::unwrap_in_result)]
869    fn select_p2c_peer_from_list(&self, ready_service_list: &HashSet<D::Key>) -> Option<D::Key> {
870        match ready_service_list.len() {
871            0 => None,
872            1 => Some(
873                *ready_service_list
874                    .iter()
875                    .next()
876                    .expect("just checked there is one service"),
877            ),
878            len => {
879                // Choose 2 random peers, then return the least loaded of those 2 peers.
880                let (a, b) = {
881                    let idxs = rand::seq::index::sample(&mut rand::thread_rng(), len, 2);
882                    let a = idxs.index(0);
883                    let b = idxs.index(1);
884
885                    let a = *ready_service_list
886                        .iter()
887                        .nth(a)
888                        .expect("sample returns valid indexes");
889                    let b = *ready_service_list
890                        .iter()
891                        .nth(b)
892                        .expect("sample returns valid indexes");
893
894                    (a, b)
895                };
896
897                let a_load = self.query_load(&a).expect("supplied services are ready");
898                let b_load = self.query_load(&b).expect("supplied services are ready");
899
900                let selected = if a_load <= b_load { a } else { b };
901
902                trace!(
903                    a.key = ?a,
904                    a.load = ?a_load,
905                    b.key = ?b,
906                    b.load = ?b_load,
907                    selected = ?selected,
908                    ?len,
909                    "selected service by p2c"
910                );
911
912                Some(selected)
913            }
914        }
915    }
916
917    /// Randomly chooses `max_peers` ready services, ignoring service load.
918    ///
919    /// The chosen peers are unique, but their order is not fully random.
920    fn select_random_ready_peers(&self, max_peers: usize) -> Vec<D::Key> {
921        use rand::seq::IteratorRandom;
922
923        self.ready_services
924            .keys()
925            .copied()
926            .choose_multiple(&mut rand::thread_rng(), max_peers)
927    }
928
929    /// Accesses a ready endpoint by `key` and returns its current load.
930    ///
931    /// Returns `None` if the service is not in the ready service list.
932    fn query_load(&self, key: &D::Key) -> Option<<D::Service as Load>::Metric> {
933        let svc = self.ready_services.get(key);
934        svc.map(|svc| svc.load())
935    }
936
937    /// Routes a request using P2C load-balancing.
938    fn route_p2c(&mut self, req: Request) -> <Self as tower::Service<Request>>::Future {
939        if let Some(p2c_key) = self.select_ready_p2c_peer() {
940            tracing::trace!(?p2c_key, "routing based on p2c");
941
942            let mut svc = self
943                .take_ready_service(&p2c_key)
944                .expect("selected peer must be ready");
945
946            let track_stalls = matches!(
947                &req,
948                Request::FindBlocks { .. } | Request::FindHeaders { .. }
949            );
950
951            let fut = svc.call(req);
952            self.push_unready(p2c_key, svc);
953
954            if track_stalls {
955                let stall_tx = self.stall_event_tx.clone();
956                return async move {
957                    let result = fut.await;
958                    if let Some(outcome) = classify_find_response(&result) {
959                        let _ = stall_tx.send((p2c_key, outcome));
960                    }
961                    result.map_err(Into::into)
962                }
963                .boxed();
964            }
965
966            return fut.map_err(Into::into).boxed();
967        }
968
969        async move {
970            // Let other tasks run, so a retry request might get different ready peers.
971            tokio::task::yield_now().await;
972
973            // # Correctness
974            //
975            // No peer was ready when P2C ran, so there is nothing to route this request to.
976            // Fail it with `NoReadyPeers` rather than holding it here; after the yield above,
977            // a retry might find ready peers.
978            Err(SharedPeerError::from(PeerError::NoReadyPeers))
979        }
980        .map_err(Into::into)
981        .boxed()
982    }
983
984    /// Tries to route a request to a ready peer that advertised that inventory,
985    /// falling back to a ready peer that isn't missing the inventory.
986    ///
987    /// If all ready peers are missing the inventory,
988    /// returns a synthetic [`NotFoundRegistry`](PeerError::NotFoundRegistry) error.
989    ///
990    /// Uses P2C to route requests to the least loaded peer in each list.
991    fn route_inv(
992        &mut self,
993        req: Request,
994        hash: InventoryHash,
995    ) -> <Self as tower::Service<Request>>::Future {
996        let advertising_peer_list = self
997            .inventory_registry
998            .advertising_peers(hash)
999            .filter(|&addr| self.ready_services.contains_key(addr))
1000            .copied()
1001            .collect();
1002
1003        // # Security
1004        //
1005        // Choose a random, less-loaded peer with the inventory.
1006        //
1007        // If we chose the first peer in HashMap order,
1008        // peers would be able to influence our choice by switching addresses.
1009        // But we need the choice to be random,
1010        // so that a peer can't provide all our inventory responses.
1011        let peer = self.select_p2c_peer_from_list(&advertising_peer_list);
1012
1013        if let Some(mut svc) = peer.and_then(|key| self.take_ready_service(&key)) {
1014            let peer = peer.expect("just checked peer is Some");
1015            tracing::trace!(?hash, ?peer, "routing to a peer which advertised inventory");
1016            let fut = svc.call(req);
1017            self.push_unready(peer, svc);
1018            return fut.map_err(Into::into).boxed();
1019        }
1020
1021        let missing_peer_list: HashSet<PeerSocketAddr> = self
1022            .inventory_registry
1023            .missing_peers(hash)
1024            .copied()
1025            .collect();
1026        let maybe_peer_list = self
1027            .ready_services
1028            .keys()
1029            .filter(|addr| !missing_peer_list.contains(addr))
1030            .copied()
1031            .collect();
1032
1033        // Security: choose a random, less-loaded peer that might have the inventory.
1034        let peer = self.select_p2c_peer_from_list(&maybe_peer_list);
1035
1036        if let Some(mut svc) = peer.and_then(|key| self.take_ready_service(&key)) {
1037            let peer = peer.expect("just checked peer is Some");
1038            tracing::trace!(?hash, ?peer, "routing to a peer that might have inventory");
1039            let fut = svc.call(req);
1040            self.push_unready(peer, svc);
1041            return fut.map_err(Into::into).boxed();
1042        }
1043
1044        tracing::debug!(
1045            ?hash,
1046            "all ready peers are missing inventory, failing request"
1047        );
1048
1049        async move {
1050            // Let other tasks run, so a retry request might get different ready peers.
1051            tokio::task::yield_now().await;
1052
1053            // # Security
1054            //
1055            // Avoid routing requests to peers that are missing inventory.
1056            // If we kept trying doomed requests, peers that are missing our requested inventory
1057            // could take up a large amount of our bandwidth and retry limits.
1058            Err(SharedPeerError::from(PeerError::NotFoundRegistry(vec![
1059                hash,
1060            ])))
1061        }
1062        .map_err(Into::into)
1063        .boxed()
1064    }
1065
1066    /// Routes the same request to up to `max_peers` ready peers, ignoring return values.
1067    ///
1068    /// `max_peers` must be at least one, and at most the number of ready peers.
1069    fn route_multiple(
1070        &mut self,
1071        req: Request,
1072        max_peers: usize,
1073    ) -> <Self as tower::Service<Request>>::Future {
1074        assert!(
1075            max_peers > 0,
1076            "requests must be routed to at least one peer"
1077        );
1078        assert!(
1079            max_peers <= self.ready_services.len(),
1080            "requests can only be routed to ready peers"
1081        );
1082
1083        let selected_peers = self.select_random_ready_peers(max_peers);
1084        self.send_multiple(req, selected_peers)
1085    }
1086
1087    /// Sends the same request to the provided ready peers, ignoring return values.
1088    ///
1089    /// # Security
1090    ///
1091    /// Callers should choose peers randomly, ignoring load.
1092    /// This avoids favouring malicious peers, because peers can influence their own load.
1093    ///
1094    /// The order of peers isn't completely random,
1095    /// but peer request order is not security-sensitive.
1096    fn send_multiple(
1097        &mut self,
1098        req: Request,
1099        peers: Vec<D::Key>,
1100    ) -> <Self as tower::Service<Request>>::Future {
1101        let futs = FuturesUnordered::new();
1102        for key in peers {
1103            let mut svc = self
1104                .take_ready_service(&key)
1105                .expect("selected peers are ready");
1106            futs.push(svc.call(req.clone()).map_err(|_| ()));
1107            self.push_unready(key, svc);
1108        }
1109
1110        async move {
1111            let results = futs.collect::<Vec<Result<_, _>>>().await;
1112            tracing::debug!(
1113                ok.len = results.iter().filter(|r| r.is_ok()).count(),
1114                err.len = results.iter().filter(|r| r.is_err()).count(),
1115                "sent peer request to multiple peers"
1116            );
1117            Ok(Response::Nil)
1118        }
1119        .boxed()
1120    }
1121
1122    /// Broadcasts the same request to a subset of the ready peers, ignoring return values.
1123    fn route_broadcast(&mut self, req: Request) -> <Self as tower::Service<Request>>::Future {
1124        // Broadcasts ignore the response
1125        self.route_multiple(req, self.number_of_peers_to_broadcast())
1126    }
1127
1128    /// Broadcasts the same request to all ready peers, ignoring return values.
1129    fn broadcast_all(&mut self, req: Request) -> <Self as tower::Service<Request>>::Future {
1130        let ready_peers = self.ready_services.keys().copied().collect();
1131        let send_multiple_fut = self.send_multiple(req.clone(), ready_peers);
1132        let Some(mut queued_broadcast_fut_receiver) = self.queue_broadcast_all_unready(&req) else {
1133            return send_multiple_fut;
1134        };
1135
1136        async move {
1137            let _ = send_multiple_fut.await?;
1138            while queued_broadcast_fut_receiver.recv().await.is_some() {}
1139            Ok(Response::Nil)
1140        }
1141        .boxed()
1142    }
1143
1144    /// If there are unready peers, queues a request to be broadcast to them and
1145    /// returns a channel receiver that callers can use to await the queued
1146    /// `broadcast_all()` futures. Returns `None` if there are no unready peers.
1147    fn queue_broadcast_all_unready(
1148        &mut self,
1149        req: &Request,
1150    ) -> Option<tokio::sync::mpsc::Receiver<ResponseFuture>> {
1151        if !self.cancel_handles.is_empty() {
1152            /// How many queued `broadcast_all()` futures the channel can hold before the peer set waits
1153            /// for the consumer to read one, then resumes sending the request to originally unready peers.
1154            const QUEUED_BROADCAST_FUTS_CHANNEL_SIZE: usize = 3;
1155
1156            let (sender, receiver) = tokio::sync::mpsc::channel(QUEUED_BROADCAST_FUTS_CHANNEL_SIZE);
1157            let unready_peers: HashSet<_> = self.cancel_handles.keys().cloned().collect();
1158            let queued = (req.clone(), sender, unready_peers);
1159
1160            // Drop the existing queued broadcast all request, if any.
1161            self.queued_broadcast_all = Some(queued);
1162
1163            Some(receiver)
1164        } else {
1165            None
1166        }
1167    }
1168
1169    /// Broadcasts the queued request to all ready peers that were unready when
1170    /// [`PeerSet::broadcast_all()`] was last called, ignoring return values.
1171    fn broadcast_all_queued(&mut self) {
1172        let Some((req, sender, mut remaining_peers)) = self.queued_broadcast_all.take() else {
1173            return;
1174        };
1175
1176        let bans = self.bans_receiver.borrow().clone();
1177        remaining_peers.retain(|addr| !bans.contains_key(&addr.ip()));
1178
1179        let Ok(reserved_send_slot) = sender.try_reserve() else {
1180            self.queued_broadcast_all = Some((req, sender, remaining_peers));
1181            return;
1182        };
1183
1184        let peers: Vec<_> = self
1185            .ready_services
1186            .keys()
1187            .filter(|ready_peer| remaining_peers.remove(ready_peer))
1188            .copied()
1189            .collect();
1190
1191        reserved_send_slot.send(self.send_multiple(req.clone(), peers).boxed());
1192
1193        if !remaining_peers.is_empty() {
1194            self.queued_broadcast_all = Some((req, sender, remaining_peers));
1195        }
1196    }
1197
1198    /// Returns the number of ready peers that Zebra will actually send a
1199    /// broadcast request to.
1200    pub(crate) fn number_of_peers_to_broadcast(&self) -> usize {
1201        if self.network.is_regtest() {
1202            // In regtest, we broadcast to all peers, so that we can test the
1203            // peer set with a small number of peers.
1204            self.ready_services.len()
1205        } else {
1206            // We currently send broadcast messages to a third of the ready peers.
1207            const PEER_FRACTION_TO_BROADCAST: usize = 3;
1208
1209            // Round up, so that if we have one ready peer, it gets the request.
1210            div_ceil(self.ready_services.len(), PEER_FRACTION_TO_BROADCAST)
1211        }
1212    }
1213
1214    /// Returns the list of addresses in the peer set.
1215    fn peer_set_addresses(&self) -> Vec<PeerSocketAddr> {
1216        self.ready_services
1217            .keys()
1218            .chain(self.cancel_handles.keys())
1219            .cloned()
1220            .collect()
1221    }
1222
1223    /// Logs the peer set size, and any potential connectivity issues.
1224    fn log_peer_set_size(&mut self) {
1225        let ready_services_len = self.ready_services.len();
1226        let unready_services_len = self.unready_services.len();
1227        trace!(ready_peers = ?ready_services_len, unready_peers = ?unready_services_len);
1228
1229        let now = Instant::now();
1230
1231        // These logs are designed to be human-readable in a terminal, at the
1232        // default Zebra log level. If you need to know the peer set size for
1233        // every request, use the trace-level logs, or the metrics exporter.
1234        if let Some(last_peer_log) = self.last_peer_log {
1235            // Avoid duplicate peer set logs
1236            if now.duration_since(last_peer_log) < MIN_PEER_SET_LOG_INTERVAL {
1237                return;
1238            }
1239        } else {
1240            // Suppress initial logs until the peer set has started up.
1241            // There can be multiple initial requests before the first peer is
1242            // ready.
1243            self.last_peer_log = Some(now);
1244            return;
1245        }
1246
1247        self.last_peer_log = Some(now);
1248
1249        // Log potential duplicate connections.
1250        let peers = self.peer_set_addresses();
1251
1252        // Check for duplicates by address and port: these are unexpected and represent a bug.
1253        let duplicates: Vec<PeerSocketAddr> = peers.iter().duplicates().cloned().collect();
1254
1255        let mut peer_counts = peers.iter().counts();
1256        peer_counts.retain(|peer, _count| duplicates.contains(peer));
1257
1258        if !peer_counts.is_empty() {
1259            let duplicate_connections: usize = peer_counts.values().sum();
1260
1261            warn!(
1262                ?duplicate_connections,
1263                duplicated_peers = ?peer_counts.len(),
1264                peers = ?peers.len(),
1265                "duplicate peer connections in peer set"
1266            );
1267        }
1268
1269        // Check for duplicates by address: these can happen if there are multiple nodes
1270        // behind a NAT or on a single server.
1271        let peers: Vec<IpAddr> = peers.iter().map(|addr| addr.ip()).collect();
1272        let duplicates: Vec<IpAddr> = peers.iter().duplicates().cloned().collect();
1273
1274        let mut peer_counts = peers.iter().counts();
1275        peer_counts.retain(|peer, _count| duplicates.contains(peer));
1276
1277        if !peer_counts.is_empty() {
1278            let duplicate_connections: usize = peer_counts.values().sum();
1279
1280            info!(
1281                ?duplicate_connections,
1282                duplicated_peers = ?peer_counts.len(),
1283                peers = ?peers.len(),
1284                "duplicate IP addresses in peer set"
1285            );
1286        }
1287
1288        // Only log connectivity warnings if all our peers are busy (or there are no peers).
1289        if ready_services_len > 0 {
1290            return;
1291        }
1292
1293        let address_metrics = *self.address_metrics.borrow();
1294        if unready_services_len == 0 {
1295            warn!(
1296                ?address_metrics,
1297                "network request with no peer connections. Hint: check your network connection"
1298            );
1299        } else {
1300            info!(?address_metrics, "network request with no ready peers: finding more peers, waiting for {} peers to answer requests",
1301                  unready_services_len);
1302        }
1303    }
1304
1305    /// Updates the peer set metrics.
1306    ///
1307    /// # Panics
1308    ///
1309    /// If the peer set size exceeds the connection limit.
1310    fn update_metrics(&self) {
1311        let num_ready = self.ready_services.len();
1312        let num_unready = self.unready_services.len();
1313        let num_peers = num_ready + num_unready;
1314        metrics::gauge!("pool.num_ready").set(num_ready as f64);
1315        metrics::gauge!("pool.num_unready").set(num_unready as f64);
1316        metrics::gauge!("zcash.net.peers").set(num_peers as f64);
1317
1318        // Security: make sure we haven't exceeded the connection limit
1319        if num_peers > self.peerset_total_connection_limit {
1320            let address_metrics = *self.address_metrics.borrow();
1321            panic!(
1322                "unexpectedly exceeded configured peer set connection limit: \n\
1323                 peers: {num_peers:?}, ready: {num_ready:?}, unready: {num_unready:?}, \n\
1324                 address_metrics: {address_metrics:?}",
1325            );
1326        }
1327    }
1328}
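
// Illustrative sketch, not part of Zebra: the Power of Two Choices step used by
// `select_p2c_peer_from_list` above, reduced to a dependency-free example.
// Assumptions: peers are plain `&str` names, load is a `u32`, and `XorShift` is a
// hypothetical stand-in for the `rand` crate that the real code uses.

```rust
use std::collections::HashMap;

/// Tiny xorshift generator, used only so the sketch has no dependencies.
/// (Hypothetical helper; the peer set uses the `rand` crate instead.)
struct XorShift(u64);

impl XorShift {
    /// Returns a pseudo-random index in `0..bound`. `bound` must be non-zero.
    fn next_below(&mut self, bound: usize) -> usize {
        self.0 ^= self.0 << 13;
        self.0 ^= self.0 >> 7;
        self.0 ^= self.0 << 17;
        (self.0 % bound as u64) as usize
    }
}

/// Picks two distinct candidates at random and returns the less loaded one.
fn select_p2c<'a>(loads: &HashMap<&'a str, u32>, rng: &mut XorShift) -> Option<&'a str> {
    let keys: Vec<&'a str> = loads.keys().copied().collect();

    match keys.len() {
        0 => None,
        1 => Some(keys[0]),
        len => {
            let a = rng.next_below(len);
            // Sample the second index from the remaining `len - 1` slots,
            // then step over `a`, so the two indexes are always distinct.
            let mut b = rng.next_below(len - 1);
            if b >= a {
                b += 1;
            }

            let (a, b) = (keys[a], keys[b]);
            // Ties go to `a`, like the `a_load <= b_load` comparison above.
            Some(if loads[a] <= loads[b] { a } else { b })
        }
    }
}

fn main() {
    let mut rng = XorShift(0x9E37_79B9_7F4A_7C15);
    let loads = HashMap::from([("peer-a", 5), ("peer-b", 1), ("peer-c", 9)]);

    let picked = select_p2c(&loads, &mut rng).expect("non-empty peer list");

    // The most loaded peer always loses the comparison against a distinct peer.
    assert_ne!(picked, "peer-c");
    println!("selected {picked}");
}
```

// Because the two sampled indexes are distinct, the single most loaded peer is never
// chosen when there are at least two candidates. This is the property that keeps the
// maximum load bounded.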
1329
1330impl<D, C> Service<Request> for PeerSet<D, C>
1331where
1332    D: Discover<Key = PeerSocketAddr, Service = LoadTrackedClient> + Unpin,
1333    D::Error: Into<BoxError>,
1334    C: ChainTip,
1335{
1336    type Response = Response;
1337    type Error = BoxError;
1338    type Future =
1339        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
1340
1341    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1342        // Update service and peer statuses.
1343        //
1344        // # Correctness
1345        //
1346        // All of the futures that receive a context from this method can wake the peer set buffer
1347        // task. If there are no ready peers, and no new peers, network requests will pause until:
1348        // - an unready peer becomes ready, or
1349        // - a new peer arrives.
1350
1351        // Drain stall events first, so disconnects free up slots that
1352        // `poll_discover` can fill in the same poll cycle.
1353        self.drain_stall_events(cx);
1354
1355        // Check for new peers, and register a task wakeup when the next new peers arrive. New peers
1356        // can be infrequent if our connection slots are full, or we're connected to all
1357        // available/useful peers.
1358        let _poll_pending_or_ready: Poll<()> = self.poll_discover(cx)?;
1359
1360        // These tasks don't provide new peers or newly ready peers.
1361        let _poll_pending: Poll<()> = self.poll_background_errors(cx)?;
1362        let _poll_pending_or_ready: Poll<()> = self.inventory_registry.poll_inventory(cx)?;
1363
1364        let ready_peers = self.poll_peers(cx)?;
1365
1366        // These metrics should run last, to report the most up-to-date information.
1367        self.log_peer_set_size();
1368        self.update_metrics();
1369
1370        if ready_peers.is_pending() {
1371            // # Correctness
1372            //
1373            // If the channel is full, drop the demand signal rather than waiting. If we waited
1374            // here, the crawler could deadlock sending a request to fetch more peers, because it
1375            // also empties the channel.
1376            trace!("no ready services, sending demand signal");
1377            let _ = self.demand_signal.try_send(MorePeers);
1378
1379            // # Correctness
1380            //
1381            // The current task must be scheduled for wakeup every time we return `Poll::Pending`.
1382            //
1383            // As long as there are unready or new peers, this task will run, because:
1384            // - `poll_discover` schedules this task for wakeup when new peers arrive.
1385            // - if there are unready peers, `poll_unready` or `poll_ready_peers` schedule this
1386            //   task for wakeup when peer services become ready.
1387            //
1388            // To avoid peers blocking on a full peer status/error channel:
1389            // - `poll_background_errors` schedules this task for wakeup when the peer status
1390            //   update task exits.
1391            return Poll::Pending;
1392        }
1393
1394        self.broadcast_all_queued();
1395
1396        if self.ready_services.is_empty() {
1397            self.poll_peers(cx)
1398        } else {
1399            Poll::Ready(Ok(()))
1400        }
1401    }
1402
1403    fn call(&mut self, req: Request) -> Self::Future {
1404        let fut = match req {
1405            // Only do inventory-aware routing on individual items.
1406            Request::BlocksByHash(ref hashes) if hashes.len() == 1 => {
1407                let hash = InventoryHash::from(*hashes.iter().next().unwrap());
1408                self.route_inv(req, hash)
1409            }
1410            Request::TransactionsById(ref hashes) if hashes.len() == 1 => {
1411                let hash = InventoryHash::from(*hashes.iter().next().unwrap());
1412                self.route_inv(req, hash)
1413            }
1414
1415            // Broadcast advertisements to lots of peers
1416            Request::AdvertiseTransactionIds(_, _) => self.route_broadcast(req),
1417            Request::AdvertiseBlock(_, _) => self.route_broadcast(req),
1418            Request::AdvertiseBlockToAll(_) => self.broadcast_all(req),
1419
1420            // Choose a random less-loaded peer for all other requests
1421            _ => self.route_p2c(req),
1422        };
1423        self.update_metrics();
1424
1425        fut
1426    }
1427}
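
// Illustrative sketch, not part of Zebra: the candidate fallback order that `route_inv`
// follows for single-item inventory requests. Assumptions: peers are hypothetical `u16`
// ids, and `Route` and `inventory_candidates` are made-up names. The real code then runs
// P2C over the chosen candidate list; this sketch only shows which list is chosen.

```rust
use std::collections::HashSet;

/// Which tier of the fallback produced the candidate list.
#[derive(Debug, PartialEq)]
enum Route {
    /// Ready peers that advertised the inventory.
    Advertised(Vec<u16>),
    /// Ready peers that are not known to be missing the inventory.
    MaybeHas(Vec<u16>),
    /// Every ready peer is missing the inventory, so the request fails.
    NotFound,
}

fn inventory_candidates(
    ready: &HashSet<u16>,
    advertising: &HashSet<u16>,
    missing: &HashSet<u16>,
) -> Route {
    // Tier 1: peers that advertised the item and are ready right now.
    let mut advertised: Vec<u16> = advertising.intersection(ready).copied().collect();
    if !advertised.is_empty() {
        advertised.sort_unstable(); // sorted only to make the output deterministic
        return Route::Advertised(advertised);
    }

    // Tier 2: any ready peer that has not reported the item as missing.
    let mut maybe: Vec<u16> = ready.difference(missing).copied().collect();
    if !maybe.is_empty() {
        maybe.sort_unstable();
        return Route::MaybeHas(maybe);
    }

    // Tier 3: fail fast instead of sending a doomed request.
    Route::NotFound
}

fn main() {
    let ready = HashSet::from([1, 2, 3]);
    let advertising = HashSet::from([3, 7]); // peer 7 advertised, but is not ready
    let missing = HashSet::from([1]);

    assert_eq!(
        inventory_candidates(&ready, &advertising, &missing),
        Route::Advertised(vec![3])
    );
    assert_eq!(
        inventory_candidates(&ready, &HashSet::new(), &missing),
        Route::MaybeHas(vec![2, 3])
    );
    assert_eq!(
        inventory_candidates(&ready, &HashSet::new(), &ready),
        Route::NotFound
    );
}
```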