zebra_network/peer/connection.rs
1//! Zebra's per-peer connection state machine.
2//!
3//! Maps the external Zcash/Bitcoin protocol to Zebra's internal request/response
4//! protocol.
5//!
6//! This module contains a lot of undocumented state, assumptions and invariants.
7//! And it's unclear if these assumptions match the `zcashd` implementation.
8//! It should be refactored into a cleaner set of request/response pairs (#1515).
9
10use std::{borrow::Cow, collections::HashSet, fmt, pin::Pin, sync::Arc, time::Instant};
11
12use futures::{future::Either, prelude::*};
13use rand::{seq::SliceRandom, thread_rng, Rng};
14use tokio::time::{sleep, Sleep};
15use tower::{Service, ServiceExt};
16use tracing_futures::Instrument;
17
18use zebra_chain::{
19 block::{self, Block},
20 serialization::SerializationError,
21 transaction::{UnminedTx, UnminedTxId},
22};
23
24use crate::{
25 constants::{
26 self, MAX_ADDRS_IN_MESSAGE, MAX_OVERLOAD_DROP_PROBABILITY, MIN_OVERLOAD_DROP_PROBABILITY,
27 OVERLOAD_PROTECTION_INTERVAL, PEER_ADDR_RESPONSE_LIMIT,
28 },
29 meta_addr::MetaAddr,
30 peer::{
31 connection::peer_tx::PeerTx, error::AlreadyErrored, ClientRequest, ClientRequestReceiver,
32 ConnectionInfo, ErrorSlot, InProgressClientRequest, MustUseClientResponseSender, PeerError,
33 SharedPeerError,
34 },
35 peer_set::ConnectionTracker,
36 protocol::{
37 external::{types::Nonce, InventoryHash, Message},
38 internal::{InventoryResponse, Request, Response},
39 },
40 BoxError, PeerSocketAddr, MAX_TX_INV_IN_SENT_MESSAGE,
41};
42
43use InventoryResponse::*;
44
45mod peer_tx;
46
47#[cfg(test)]
48mod tests;
49
/// The per-request state machine that interprets incoming peer [`Message`]s
/// as responses to the current in-flight client request.
///
/// See [`Handler::process_message`] for the (request, message) matching logic.
#[derive(Debug)]
pub(super) enum Handler {
    /// Indicates that the handler has finished processing the request.
    /// An error here is scoped to the request.
    Finished(Result<Response, PeerError>),
    /// Awaiting a `pong` message that echoes `nonce`.
    Ping {
        /// The nonce sent in our `ping`; a `pong` must echo it to match.
        nonce: Nonce,
        /// When the `ping` was sent, used to measure the round-trip time
        /// returned in [`Response::Pong`].
        ping_sent_at: Instant,
    },
    /// Awaiting an `addr` message in response to a peers request.
    Peers,
    /// Awaiting an `inv` message containing only block hashes.
    FindBlocks,
    /// Awaiting a `headers` message.
    FindHeaders,
    /// Awaiting a batch of `block` messages for specific hashes.
    BlocksByHash {
        /// Hashes of requested blocks that haven't arrived yet.
        pending_hashes: HashSet<block::Hash>,
        /// Blocks received so far for this request.
        blocks: Vec<Arc<Block>>,
    },
    /// Awaiting a batch of `tx` messages for specific transaction IDs.
    TransactionsById {
        /// IDs of requested transactions that haven't arrived yet.
        pending_ids: HashSet<UnminedTxId>,
        /// Transactions received so far for this request.
        transactions: Vec<UnminedTx>,
    },
    /// Awaiting an `inv` message containing only mempool transaction IDs.
    MempoolTransactionIds,
}
72
73impl fmt::Display for Handler {
74 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
75 f.write_str(&match self {
76 Handler::Finished(Ok(response)) => format!("Finished({response})"),
77 Handler::Finished(Err(error)) => format!("Finished({error})"),
78
79 Handler::Ping { .. } => "Ping".to_string(),
80 Handler::Peers => "Peers".to_string(),
81
82 Handler::FindBlocks => "FindBlocks".to_string(),
83 Handler::FindHeaders => "FindHeaders".to_string(),
84 Handler::BlocksByHash {
85 pending_hashes,
86 blocks,
87 } => format!(
88 "BlocksByHash {{ pending_hashes: {}, blocks: {} }}",
89 pending_hashes.len(),
90 blocks.len()
91 ),
92
93 Handler::TransactionsById {
94 pending_ids,
95 transactions,
96 } => format!(
97 "TransactionsById {{ pending_ids: {}, transactions: {} }}",
98 pending_ids.len(),
99 transactions.len()
100 ),
101 Handler::MempoolTransactionIds => "MempoolTransactionIds".to_string(),
102 })
103 }
104}
105
impl Handler {
    /// Returns the Zebra internal handler type as a string.
    pub fn command(&self) -> Cow<'static, str> {
        match self {
            Handler::Finished(Ok(response)) => format!("Finished({})", response.command()).into(),
            Handler::Finished(Err(error)) => format!("Finished({})", error.kind()).into(),

            Handler::Ping { .. } => "Ping".into(),
            Handler::Peers => "Peers".into(),

            Handler::FindBlocks => "FindBlocks".into(),
            Handler::FindHeaders => "FindHeaders".into(),

            Handler::BlocksByHash { .. } => "BlocksByHash".into(),
            Handler::TransactionsById { .. } => "TransactionsById".into(),

            Handler::MempoolTransactionIds => "MempoolTransactionIds".into(),
        }
    }

    /// Try to handle `msg` as a response to a client request, possibly consuming
    /// it in the process.
    ///
    /// This function is where we statefully interpret Bitcoin/Zcash messages
    /// into responses to messages in the internal request/response protocol.
    /// This conversion is done by a sequence of (request, message) match arms,
    /// each of which contains the conversion logic for that pair.
    ///
    /// Taking ownership of the message means that we can pass ownership of its
    /// contents to responses without additional copies. If the message is not
    /// interpretable as a response, we return ownership to the caller.
    ///
    /// Unexpected messages are left unprocessed, and may be rejected later.
    ///
    /// `addr` responses are limited to avoid peer set takeover. Any excess
    /// addresses are stored in `cached_addrs`.
    ///
    /// Returns `Some(msg)` if `msg` was not consumed as a response, and `None`
    /// if it was consumed.
    fn process_message(
        &mut self,
        msg: Message,
        cached_addrs: &mut Vec<MetaAddr>,
        transient_addr: Option<PeerSocketAddr>,
    ) -> Option<Message> {
        let mut ignored_msg = None;
        // Move the current state out of `self` so it can be consumed by the
        // match below, leaving a harmless placeholder behind.
        // TODO: can this be avoided?
        let tmp_state = std::mem::replace(self, Handler::Finished(Ok(Response::Nil)));

        debug!(handler = %tmp_state, %msg, "received peer response to Zebra request");

        *self = match (tmp_state, msg) {
            (
                Handler::Ping {
                    nonce,
                    ping_sent_at,
                },
                Message::Pong(rsp_nonce),
            ) => {
                if nonce == rsp_nonce {
                    let duration = ping_sent_at.elapsed();
                    Handler::Finished(Ok(Response::Pong(duration)))
                } else {
                    // The nonce doesn't match our ping, so keep waiting for the
                    // matching pong.
                    Handler::Ping {
                        nonce,
                        ping_sent_at,
                    }
                }
            }

            (Handler::Peers, Message::Addr(new_addrs)) => {
                // Security: This method performs security-sensitive operations, see its comments
                // for details.
                let response_addrs =
                    Handler::update_addr_cache(cached_addrs, &new_addrs, PEER_ADDR_RESPONSE_LIMIT);

                debug!(
                    new_addrs = new_addrs.len(),
                    response_addrs = response_addrs.len(),
                    remaining_addrs = cached_addrs.len(),
                    PEER_ADDR_RESPONSE_LIMIT,
                    "responding to Peers request using new and cached addresses",
                );

                Handler::Finished(Ok(Response::Peers(response_addrs)))
            }

            // `zcashd` returns requested transactions in a single batch of messages.
            // Other transaction or non-transaction messages can come before or after the batch.
            // After the transaction batch, `zcashd` sends `notfound` if any transactions are missing:
            // https://github.com/zcash/zcash/blob/e7b425298f6d9a54810cb7183f00be547e4d9415/src/main.cpp#L5617
            (
                Handler::TransactionsById {
                    mut pending_ids,
                    mut transactions,
                },
                Message::Tx(transaction),
            ) => {
                // assumptions:
                //   - the transaction messages are sent in a single continuous batch
                //   - missing transactions are silently skipped
                //     (there is no `notfound` message at the end of the batch)
                if pending_ids.remove(&transaction.id) {
                    // we are in the middle of the continuous transaction messages
                    transactions.push(transaction);
                } else {
                    // We got a transaction we didn't ask for. If the caller doesn't know any of the
                    // transactions, they should have sent a `notfound` with all the hashes, rather
                    // than an unsolicited transaction.
                    //
                    // So either:
                    // 1. The peer implements the protocol badly, skipping `notfound`.
                    //    We should cancel the request, so we don't hang waiting for transactions
                    //    that will never arrive.
                    // 2. The peer sent an unsolicited transaction.
                    //    We should ignore the transaction, and wait for the actual response.
                    //
                    // We end the request, so we don't hang on bad peers (case 1). But we keep the
                    // connection open, so the inbound service can process transactions from good
                    // peers (case 2).
                    ignored_msg = Some(Message::Tx(transaction));
                }

                if ignored_msg.is_some() && transactions.is_empty() {
                    // If we didn't get anything we wanted, retry the request.
                    let missing_transaction_ids = pending_ids.into_iter().map(Into::into).collect();
                    Handler::Finished(Err(PeerError::NotFoundResponse(missing_transaction_ids)))
                } else if pending_ids.is_empty() || ignored_msg.is_some() {
                    // If we got some of what we wanted, let the internal client know.
                    let available = transactions
                        .into_iter()
                        .map(|t| InventoryResponse::Available((t, transient_addr)));
                    let missing = pending_ids.into_iter().map(InventoryResponse::Missing);

                    Handler::Finished(Ok(Response::Transactions(
                        available.chain(missing).collect(),
                    )))
                } else {
                    // Keep on waiting for more.
                    Handler::TransactionsById {
                        pending_ids,
                        transactions,
                    }
                }
            }
            // `zcashd` peers actually return this response
            (
                Handler::TransactionsById {
                    pending_ids,
                    transactions,
                },
                Message::NotFound(missing_invs),
            ) => {
                // assumptions:
                //   - the peer eventually returns a transaction or a `notfound` entry
                //     for each hash
                //   - all `notfound` entries are contained in a single message
                //   - the `notfound` message comes after the transaction messages
                //
                // If we're in sync with the peer, then the `notfound` should contain the remaining
                // hashes from the handler. If we're not in sync with the peer, we should return
                // what we got so far.
                let missing_transaction_ids: HashSet<_> = transaction_ids(&missing_invs).collect();
                if missing_transaction_ids != pending_ids {
                    trace!(?missing_invs, ?missing_transaction_ids, ?pending_ids);
                    // if these errors are noisy, we should replace them with debugs
                    debug!("unexpected notfound message from peer: all remaining transaction hashes should be listed in the notfound. Using partial received transactions as the peer response");
                }
                if missing_transaction_ids.len() != missing_invs.len() {
                    trace!(?missing_invs, ?missing_transaction_ids, ?pending_ids);
                    debug!("unexpected notfound message from peer: notfound contains duplicate hashes or non-transaction hashes. Using partial received transactions as the peer response");
                }

                if transactions.is_empty() {
                    // If we didn't get anything we wanted, retry the request.
                    let missing_transaction_ids = pending_ids.into_iter().map(Into::into).collect();
                    Handler::Finished(Err(PeerError::NotFoundResponse(missing_transaction_ids)))
                } else {
                    // If we got some of what we wanted, let the internal client know.
                    let available = transactions
                        .into_iter()
                        .map(|t| InventoryResponse::Available((t, transient_addr)));
                    let missing = pending_ids.into_iter().map(InventoryResponse::Missing);

                    Handler::Finished(Ok(Response::Transactions(
                        available.chain(missing).collect(),
                    )))
                }
            }

            // `zcashd` returns requested blocks in a single batch of messages.
            // Other blocks or non-blocks messages can come before or after the batch.
            // `zcashd` silently skips missing blocks, rather than sending a final `notfound` message.
            // https://github.com/zcash/zcash/blob/e7b425298f6d9a54810cb7183f00be547e4d9415/src/main.cpp#L5523
            (
                Handler::BlocksByHash {
                    mut pending_hashes,
                    mut blocks,
                },
                Message::Block(block),
            ) => {
                // assumptions:
                //   - the block messages are sent in a single continuous batch
                //   - missing blocks are silently skipped
                //     (there is no `notfound` message at the end of the batch)
                if pending_hashes.remove(&block.hash()) {
                    // we are in the middle of the continuous block messages
                    blocks.push(block);
                } else {
                    // We got a block we didn't ask for.
                    //
                    // So either:
                    // 1. The response is for a previously cancelled block request.
                    //    We should treat that block as an inbound gossiped block,
                    //    and wait for the actual response.
                    // 2. The peer doesn't know any of the blocks we asked for.
                    //    We should cancel the request, so we don't hang waiting for blocks that
                    //    will never arrive.
                    // 3. The peer sent an unsolicited block.
                    //    We should treat that block as an inbound gossiped block,
                    //    and wait for the actual response.
                    //
                    // We ignore the message, so we don't desynchronize with the peer. This happens
                    // when we cancel a request and send a second different request, but receive a
                    // response for the first request. If we ended the request then, we could send
                    // a third request to the peer, and end up having to end that request as well
                    // when the response for the second request arrives.
                    //
                    // Ignoring the message gives us a chance to synchronize back to the correct
                    // request. If that doesn't happen, this request times out.
                    //
                    // In case 2, if peers respond with a `notfound` message,
                    // the cascading errors don't happen. The `notfound` message cancels our request,
                    // and we know we are in sync with the peer.
                    //
                    // Zebra sends `notfound` in response to block requests, but `zcashd` doesn't.
                    // So we need this message workaround, and the related inventory workarounds.
                    ignored_msg = Some(Message::Block(block));
                }

                if pending_hashes.is_empty() {
                    // If we got everything we wanted, let the internal client know.
                    let available = blocks
                        .into_iter()
                        .map(|block| InventoryResponse::Available((block, transient_addr)));
                    Handler::Finished(Ok(Response::Blocks(available.collect())))
                } else {
                    // Keep on waiting for all the blocks we wanted, until we get them or time out.
                    Handler::BlocksByHash {
                        pending_hashes,
                        blocks,
                    }
                }
            }
            // peers are allowed to return this response, but `zcashd` never does
            (
                Handler::BlocksByHash {
                    pending_hashes,
                    blocks,
                },
                Message::NotFound(missing_invs),
            ) => {
                // assumptions:
                //   - the peer eventually returns a block or a `notfound` entry
                //     for each hash
                //   - all `notfound` entries are contained in a single message
                //   - the `notfound` message comes after the block messages
                //
                // If we're in sync with the peer, then the `notfound` should contain the remaining
                // hashes from the handler. If we're not in sync with the peer, we should return
                // what we got so far, and log an error.
                let missing_blocks: HashSet<_> = block_hashes(&missing_invs).collect();
                if missing_blocks != pending_hashes {
                    trace!(?missing_invs, ?missing_blocks, ?pending_hashes);
                    // if these errors are noisy, we should replace them with debugs
                    debug!("unexpected notfound message from peer: all remaining block hashes should be listed in the notfound. Using partial received blocks as the peer response");
                }
                if missing_blocks.len() != missing_invs.len() {
                    trace!(?missing_invs, ?missing_blocks, ?pending_hashes);
                    debug!("unexpected notfound message from peer: notfound contains duplicate hashes or non-block hashes. Using partial received blocks as the peer response");
                }

                if blocks.is_empty() {
                    // If we didn't get anything we wanted, retry the request.
                    let missing_block_hashes = pending_hashes.into_iter().map(Into::into).collect();
                    Handler::Finished(Err(PeerError::NotFoundResponse(missing_block_hashes)))
                } else {
                    // If we got some of what we wanted, let the internal client know.
                    let available = blocks
                        .into_iter()
                        .map(|block| InventoryResponse::Available((block, transient_addr)));
                    let missing = pending_hashes.into_iter().map(InventoryResponse::Missing);

                    Handler::Finished(Ok(Response::Blocks(available.chain(missing).collect())))
                }
            }

            // TODO:
            // - use `any(inv)` rather than `all(inv)`?
            (Handler::FindBlocks, Message::Inv(items))
                if items
                    .iter()
                    .all(|item| matches!(item, InventoryHash::Block(_))) =>
            {
                Handler::Finished(Ok(Response::BlockHashes(
                    block_hashes(&items[..]).collect(),
                )))
            }
            (Handler::FindHeaders, Message::Headers(headers)) => {
                Handler::Finished(Ok(Response::BlockHeaders(headers)))
            }

            (Handler::MempoolTransactionIds, Message::Inv(items))
                if items.iter().all(|item| item.unmined_tx_id().is_some()) =>
            {
                Handler::Finished(Ok(Response::TransactionIds(
                    transaction_ids(&items).collect(),
                )))
            }

            // By default, messages are not responses.
            (state, msg) => {
                trace!(?msg, "did not interpret message as response");
                ignored_msg = Some(msg);
                state
            }
        };

        ignored_msg
    }

    /// Adds `new_addrs` to the `cached_addrs` cache, then takes and returns `response_size`
    /// addresses from that cache.
    ///
    /// `cached_addrs` can be empty if the cache is empty. `new_addrs` can be empty or `None` if
    /// there are no new addresses. `response_size` can be zero or `None` if there is no response
    /// needed.
    fn update_addr_cache<'new>(
        cached_addrs: &mut Vec<MetaAddr>,
        new_addrs: impl IntoIterator<Item = &'new MetaAddr>,
        response_size: impl Into<Option<usize>>,
    ) -> Vec<MetaAddr> {
        // # Peer Set Reliability
        //
        // Newly received peers are added to the cache, so that we can use them if the connection
        // doesn't respond to our getaddr requests.
        //
        // Add the new addresses to the end of the cache.
        cached_addrs.extend(new_addrs);

        // # Security
        //
        // We limit how many peer addresses we take from each peer, so that our address book
        // and outbound connections aren't controlled by a single peer (#1869). We randomly select
        // peers, so the remote peer can't control which addresses we choose by changing the order
        // in the messages they send.
        //
        // A `None` response size defaults to zero, so no addresses are returned.
        let response_size = response_size.into().unwrap_or_default();

        // Take ownership of the cache contents so they can be shuffled in place.
        let mut temp_cache = Vec::new();
        std::mem::swap(cached_addrs, &mut temp_cache);

        // The response is fully shuffled, remaining is partially shuffled.
        let (response, remaining) = temp_cache.partial_shuffle(&mut thread_rng(), response_size);

        // # Security
        //
        // The cache size is limited to avoid memory denial of service.
        //
        // It's ok to just partially shuffle the cache, because it doesn't actually matter which
        // peers we drop. Having excess peers is rare, because most peers only send one large
        // unsolicited peer message when they first connect.
        *cached_addrs = remaining.to_vec();
        cached_addrs.truncate(MAX_ADDRS_IN_MESSAGE);

        response.to_vec()
    }
}
480
/// The request/response state of a peer connection.
#[derive(Debug)]
#[must_use = "AwaitingResponse.tx.send() must be called before drop"]
pub(super) enum State {
    /// Awaiting a client request or a peer message.
    AwaitingRequest,
    /// Awaiting a peer message we can interpret as a response to a client request.
    AwaitingResponse {
        /// Matches incoming peer messages against the pending client request.
        handler: Handler,
        /// The channel used to send the response back to the client.
        tx: MustUseClientResponseSender,
        /// The tracing span for the client request.
        span: tracing::Span,
    },
    /// A failure has occurred and we are shutting down the connection.
    Failed,
}
495
496impl fmt::Display for State {
497 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
498 f.write_str(&match self {
499 State::AwaitingRequest => "AwaitingRequest".to_string(),
500 State::AwaitingResponse { handler, .. } => {
501 format!("AwaitingResponse({handler})")
502 }
503 State::Failed => "Failed".to_string(),
504 })
505 }
506}
507
508impl State {
509 /// Returns the Zebra internal state as a string.
510 pub fn command(&self) -> Cow<'static, str> {
511 match self {
512 State::AwaitingRequest => "AwaitingRequest".into(),
513 State::AwaitingResponse { handler, .. } => {
514 format!("AwaitingResponse({})", handler.command()).into()
515 }
516 State::Failed => "Failed".into(),
517 }
518 }
519}
520
/// The outcome of mapping an inbound [`Message`] to a [`Request`].
#[derive(Clone, Debug, Eq, PartialEq)]
#[must_use = "inbound messages must be handled"]
pub enum InboundMessage {
    /// The message was mapped to an inbound [`Request`].
    AsRequest(Request),

    /// The message was consumed by the mapping method.
    ///
    /// For example, it could be cached, treated as an error,
    /// or an internally handled [`Message::Ping`].
    Consumed,

    /// The message was not used by the inbound message handler.
    ///
    /// In [`Connection::run`], unused messages are logged at debug level and
    /// then dropped.
    Unused,
}
537
538impl From<Request> for InboundMessage {
539 fn from(request: Request) -> Self {
540 InboundMessage::AsRequest(request)
541 }
542}
543
/// The channels, services, and associated state for a peer connection.
///
/// `S` is the inbound service used to answer requests from this peer.
/// `Tx` is the sink used to send Zcash [`Message`]s to this peer.
pub struct Connection<S, Tx>
where
    Tx: Sink<Message, Error = SerializationError> + Unpin,
{
    /// The metadata for the connected peer `service`.
    ///
    /// This field is used for debugging.
    pub connection_info: Arc<ConnectionInfo>,

    /// The state of this connection's current request or response.
    pub(super) state: State,

    /// A timeout for a client request. This is stored separately from
    /// State so that we can move the future out of it independently of
    /// other state handling.
    pub(super) request_timer: Option<Pin<Box<Sleep>>>,

    /// Unused peers from recent `addr` or `addrv2` messages from this peer.
    /// Also holds the initial addresses sent in `version` messages, or guessed from the remote IP.
    ///
    /// When peers send solicited or unsolicited peer advertisements, Zebra puts them in this cache.
    ///
    /// When Zebra's components request peers, some cached peers are randomly selected,
    /// consumed, and returned as a modified response. This works around `zcashd`'s address
    /// response rate-limit.
    ///
    /// The cache size is limited to avoid denial of service attacks.
    pub(super) cached_addrs: Vec<MetaAddr>,

    /// The `inbound` service, used to answer requests from this connection's peer.
    pub(super) svc: S,

    /// A channel for requests that Zebra's internal services want to send to remote peers.
    ///
    /// This channel accepts [`Request`]s, and produces [`InProgressClientRequest`]s.
    pub(super) client_rx: ClientRequestReceiver,

    /// A slot for an error shared between the Connection and the Client that uses it.
    ///
    /// `None` unless the connection or client have errored.
    pub(super) error_slot: ErrorSlot,

    /// A channel for sending Zcash messages to the connected peer.
    ///
    /// This channel accepts [`Message`]s.
    ///
    /// The corresponding peer message receiver is passed to [`Connection::run`].
    pub(super) peer_tx: PeerTx<Tx>,

    /// A connection tracker that reduces the open connection count when dropped.
    /// Used to limit the number of open connections in Zebra.
    ///
    /// This field does nothing until it is dropped.
    ///
    /// # Security
    ///
    /// If this connection tracker or `Connection`s are leaked,
    /// the number of active connections will appear higher than it actually is.
    /// If enough connections leak, Zebra will stop making new connections.
    #[allow(dead_code)]
    pub(super) connection_tracker: ConnectionTracker,

    /// The metrics label for this peer. Usually the remote IP and port.
    pub(super) metrics_label: String,

    /// The state for this peer, when the metrics were last updated.
    pub(super) last_metrics_state: Option<Cow<'static, str>>,

    /// The time of the last overload error response from the inbound
    /// service to a request from this connection,
    /// or None if this connection hasn't yet received an overload error.
    last_overload_time: Option<Instant>,
}
618
619impl<S, Tx> fmt::Debug for Connection<S, Tx>
620where
621 Tx: Sink<Message, Error = SerializationError> + Unpin,
622{
623 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
624 // skip the channels, they don't tell us anything useful
625 f.debug_struct(std::any::type_name::<Connection<S, Tx>>())
626 .field("connection_info", &self.connection_info)
627 .field("state", &self.state)
628 .field("request_timer", &self.request_timer)
629 .field("cached_addrs", &self.cached_addrs.len())
630 .field("error_slot", &self.error_slot)
631 .field("metrics_label", &self.metrics_label)
632 .field("last_metrics_state", &self.last_metrics_state)
633 .field("last_overload_time", &self.last_overload_time)
634 .finish()
635 }
636}
637
638impl<S, Tx> Connection<S, Tx>
639where
640 Tx: Sink<Message, Error = SerializationError> + Unpin,
641{
642 /// Return a new connection from its channels, services, shared state, and metadata.
643 pub(crate) fn new(
644 inbound_service: S,
645 client_rx: futures::channel::mpsc::Receiver<ClientRequest>,
646 error_slot: ErrorSlot,
647 peer_tx: Tx,
648 connection_tracker: ConnectionTracker,
649 connection_info: Arc<ConnectionInfo>,
650 initial_cached_addrs: Vec<MetaAddr>,
651 ) -> Self {
652 let metrics_label = connection_info.connected_addr.get_transient_addr_label();
653
654 Connection {
655 connection_info,
656 state: State::AwaitingRequest,
657 request_timer: None,
658 cached_addrs: initial_cached_addrs,
659 svc: inbound_service,
660 client_rx: client_rx.into(),
661 error_slot,
662 peer_tx: peer_tx.into(),
663 connection_tracker,
664 metrics_label,
665 last_metrics_state: None,
666 last_overload_time: None,
667 }
668 }
669}
670
671impl<S, Tx> Connection<S, Tx>
672where
673 S: Service<Request, Response = Response, Error = BoxError>,
674 S::Error: Into<BoxError>,
675 Tx: Sink<Message, Error = SerializationError> + Unpin,
676{
677 /// Consume this `Connection` to form a spawnable future containing its event loop.
678 ///
679 /// `peer_rx` is a channel for receiving Zcash [`Message`]s from the connected peer.
680 /// The corresponding peer message receiver is [`Connection::peer_tx`].
681 pub async fn run<Rx>(mut self, mut peer_rx: Rx)
682 where
683 Rx: Stream<Item = Result<Message, SerializationError>> + Unpin,
684 {
685 // At a high level, the event loop we want is as follows: we check for any
686 // incoming messages from the remote peer, check if they should be interpreted
687 // as a response to a pending client request, and if not, interpret them as a
688 // request from the remote peer to our node.
689 //
690 // We also need to handle those client requests in the first place. The client
691 // requests are received from the corresponding `peer::Client` over a bounded
692 // channel (with bound 1, to minimize buffering), but there is no relationship
693 // between the stream of client requests and the stream of peer messages, so we
694 // cannot ignore one kind while waiting on the other. Moreover, we cannot accept
695 // a second client request while the first one is still pending.
696 //
697 // To do this, we inspect the current request state.
698 //
699 // If there is no pending request, we wait on either an incoming peer message or
700 // an incoming request, whichever comes first.
701 //
702 // If there is a pending request, we wait only on an incoming peer message, and
703 // check whether it can be interpreted as a response to the pending request.
704 //
705 // TODO: turn this comment into a module-level comment, after splitting the module.
706 loop {
707 self.update_state_metrics(None);
708
709 match self.state {
710 State::AwaitingRequest => {
711 trace!("awaiting client request or peer message");
712 // # Correctness
713 //
714 // Currently, select prefers the first future if multiple futures are ready.
715 // We use this behaviour to prioritise messages on each individual peer
716 // connection in this order:
717 // - incoming messages from the remote peer, then
718 // - outgoing messages to the remote peer.
719 //
720 // This improves the performance of peer responses to Zebra requests, and new
721 // peer requests to Zebra's inbound service.
722 //
723 // `futures::StreamExt::next()` is cancel-safe:
724 // <https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety>
725 // This means that messages from the future that isn't selected stay in the stream,
726 // and they will be returned next time the future is checked.
727 //
728 // If an inbound peer message arrives at a ready peer that also has a pending
729 // request from Zebra, we want to process the peer's message first.
730 // If we process the Zebra request first:
731 // - we could misinterpret the inbound peer message as a response to the Zebra
732 // request, or
733 // - if the peer message is a request to Zebra, and we put the peer in the
734 // AwaitingResponse state, then we'll correctly ignore the simultaneous Zebra
735 // request. (Zebra services make multiple requests or retry, so this is ok.)
736 //
737 // # Security
738 //
739 // If a peer sends an uninterrupted series of messages, it will delay any new
740 // requests from Zebra to that individual peer. This is behaviour we want,
741 // because:
742 // - any responses to Zebra's requests to that peer would be slow or timeout,
743 // - the peer will eventually fail a Zebra keepalive check and get disconnected,
744 // - if there are too many inbound messages overall, the inbound service will
745 // return an overload error and the peer will be disconnected.
746 //
747 // Messages to other peers will continue to be processed concurrently. Some
748 // Zebra services might be temporarily delayed until the peer times out, if a
749 // request to that peer is sent by the service, and the service blocks until
750 // the request completes (or times out).
751 match future::select(peer_rx.next(), self.client_rx.next()).await {
752 Either::Left((None, _)) => {
753 self.fail_with(PeerError::ConnectionClosed).await;
754 }
755 Either::Left((Some(Err(e)), _)) => self.fail_with(e).await,
756 Either::Left((Some(Ok(msg)), _)) => {
757 let unhandled_msg = self.handle_message_as_request(msg).await;
758
759 if let Some(unhandled_msg) = unhandled_msg {
760 debug!(
761 %unhandled_msg,
762 "ignoring unhandled request while awaiting a request"
763 );
764 }
765 }
766 Either::Right((None, _)) => {
767 trace!("client_rx closed, ending connection");
768
769 // There are no requests to be flushed,
770 // but we need to set an error and update metrics.
771 // (We don't want to log this error, because it's normal behaviour.)
772 self.shutdown_async(PeerError::ClientDropped).await;
773 break;
774 }
775 Either::Right((Some(req), _)) => {
776 let span = req.span.clone();
777 self.handle_client_request(req).instrument(span).await
778 }
779 }
780 }
781
782 // Check whether the handler is finished before waiting for a response message,
783 // because the response might be `Nil` or synthetic.
784 State::AwaitingResponse {
785 handler: Handler::Finished(_),
786 ref span,
787 ..
788 } => {
789 // We have to get rid of the span reference so we can tamper with the state.
790 let span = span.clone();
791 trace!(
792 parent: &span,
793 "returning completed response to client request"
794 );
795
796 // Replace the state with a temporary value,
797 // so we can take ownership of the response sender.
798 let tmp_state = std::mem::replace(&mut self.state, State::Failed);
799
800 if let State::AwaitingResponse {
801 handler: Handler::Finished(response),
802 tx,
803 ..
804 } = tmp_state
805 {
806 if let Ok(response) = response.as_ref() {
807 debug!(%response, "finished receiving peer response to Zebra request");
808 // Add a metric for inbound responses to outbound requests.
809 metrics::counter!(
810 "zebra.net.in.responses",
811 "command" => response.command(),
812 "addr" => self.metrics_label.clone(),
813 )
814 .increment(1);
815 } else {
816 debug!(error = ?response, "error in peer response to Zebra request");
817 }
818
819 let _ = tx.send(response.map_err(Into::into));
820 } else {
821 unreachable!("already checked for AwaitingResponse");
822 }
823
824 self.state = State::AwaitingRequest;
825 }
826
827 // We're awaiting a response to a client request,
828 // so wait on either a peer message, or on a request cancellation.
829 State::AwaitingResponse {
830 ref span,
831 ref mut tx,
832 ..
833 } => {
834 // we have to get rid of the span reference so we can tamper with the state
835 let span = span.clone();
836 trace!(parent: &span, "awaiting response to client request");
837 let timer_ref = self
838 .request_timer
839 .as_mut()
840 .expect("timeout must be set while awaiting response");
841
842 // # Security
843 //
844 // select() prefers the first future if multiple futures are ready.
845 //
846 // If multiple futures are ready, we want the priority for each individual
847 // connection to be:
848 // - cancellation, then
849 // - timeout, then
850 // - peer responses.
851 //
852 // (Messages to other peers are processed concurrently.)
853 //
854 // This makes sure a peer can't block disconnection or timeouts by sending too
855 // many messages. It also avoids doing work to process messages after a
856 // connection has failed.
857 let cancel = future::select(tx.cancellation(), timer_ref);
858 match future::select(cancel, peer_rx.next())
859 .instrument(span.clone())
860 .await
861 {
862 Either::Right((None, _)) => {
863 self.fail_with(PeerError::ConnectionClosed).await
864 }
865 Either::Right((Some(Err(e)), _)) => self.fail_with(e).await,
866 Either::Right((Some(Ok(peer_msg)), _cancel)) => {
867 self.update_state_metrics(format!("Out::Rsp::{}", peer_msg.command()));
868
869 // Try to process the message using the handler.
870 // This extremely awkward construction avoids
871 // keeping a live reference to handler across the
872 // call to handle_message_as_request, which takes
873 // &mut self. This is a sign that we don't properly
874 // factor the state required for inbound and
875 // outbound requests.
876 let request_msg = match self.state {
877 State::AwaitingResponse {
878 ref mut handler, ..
879 } => span.in_scope(|| handler.process_message(peer_msg, &mut self.cached_addrs, self.connection_info.connected_addr.get_transient_addr())),
880 _ => unreachable!("unexpected state after AwaitingResponse: {:?}, peer_msg: {:?}, client_receiver: {:?}",
881 self.state,
882 peer_msg,
883 self.client_rx,
884 ),
885 };
886
887 self.update_state_metrics(None);
888
889 // If the message was not consumed as a response,
890 // check whether it can be handled as a request.
891 let unused_msg = if let Some(request_msg) = request_msg {
892 // do NOT instrument with the request span, this is
893 // independent work
894 self.handle_message_as_request(request_msg).await
895 } else {
896 None
897 };
898
899 if let Some(unused_msg) = unused_msg {
900 debug!(
901 %unused_msg,
902 %self.state,
903 "ignoring peer message: not a response or a request",
904 );
905 }
906 }
907 Either::Left((Either::Right(_), _peer_fut)) => {
908 trace!(parent: &span, "client request timed out");
909 let e = PeerError::ConnectionReceiveTimeout;
910
911 // Replace the state with a temporary value,
912 // so we can take ownership of the response sender.
913 self.state = match std::mem::replace(&mut self.state, State::Failed) {
914 // Special case: ping timeouts fail the connection.
915 State::AwaitingResponse {
916 handler: Handler::Ping { .. },
917 tx,
918 ..
919 } => {
920 // We replaced the original state, which means `fail_with` won't see it.
921 // So we do the state request cleanup manually.
922 let e = SharedPeerError::from(e);
923 let _ = tx.send(Err(e.clone()));
924 self.fail_with(e).await;
925 State::Failed
926 }
927 // Other request timeouts fail the request.
928 State::AwaitingResponse { tx, .. } => {
929 let _ = tx.send(Err(e.into()));
930 State::AwaitingRequest
931 }
932 _ => unreachable!(
933 "unexpected failed connection state while AwaitingResponse: client_receiver: {:?}",
934 self.client_rx
935 ),
936 };
937 }
938 Either::Left((Either::Left(_), _peer_fut)) => {
939 // The client receiver was dropped, so we don't need to send on `tx` here.
940 trace!(parent: &span, "client request was cancelled");
941 self.state = State::AwaitingRequest;
942 }
943 }
944 }
945
946 // This connection has failed: stop the event loop, and complete the future.
947 State::Failed => break,
948 }
949 }
950
951 // TODO: close peer_rx here, after changing it from a stream to a channel
952
953 let error = self.error_slot.try_get_error();
954 assert!(
955 error.is_some(),
956 "closing connections must call fail_with() or shutdown() to set the error slot"
957 );
958
959 self.update_state_metrics(error.expect("checked is_some").to_string());
960 }
961
962 /// Fail this connection, log the failure, and shut it down.
963 /// See [`Self::shutdown_async()`] for details.
964 ///
965 /// Use [`Self::shutdown_async()`] to avoid logging the failure,
966 /// and [`Self::shutdown()`] from non-async code.
967 async fn fail_with(&mut self, error: impl Into<SharedPeerError>) {
968 let error = error.into();
969
970 debug!(
971 %error,
972 client_receiver = ?self.client_rx,
973 "failing peer service with error"
974 );
975
976 self.shutdown_async(error).await;
977 }
978
    /// Handle an internal client request, possibly generating outgoing messages to the
    /// remote peer.
    ///
    /// Maps each internal [`Request`] to the corresponding network [`Message`], sends it
    /// on `peer_tx`, then moves this connection into `State::AwaitingResponse` with a
    /// [`Handler`] that knows how to collect the expected reply. Requests that need no
    /// peer response (or can be answered from the address cache) complete immediately
    /// with `Handler::Finished`. If sending to the peer fails, the whole connection is
    /// failed and the error is forwarded to the requesting client.
    ///
    /// NOTE: the caller should use .instrument(msg.span) to instrument the function.
    async fn handle_client_request(&mut self, req: InProgressClientRequest) {
        trace!(?req.request);
        use Request::*;
        use State::*;
        let InProgressClientRequest { request, tx, span } = req;

        // Skip requests whose client has already given up waiting:
        // sending them to the peer would waste bandwidth with no-one left to
        // receive the response.
        if tx.is_canceled() {
            metrics::counter!("peer.canceled").increment(1);
            debug!(state = %self.state, %request, "ignoring canceled request");

            metrics::counter!(
                "zebra.net.out.requests.canceled",
                "command" => request.command(),
                "addr" => self.metrics_label.clone(),
            )
            .increment(1);
            self.update_state_metrics(format!("Out::Req::Canceled::{}", request.command()));

            return;
        }

        debug!(state = %self.state, %request, "sending request from Zebra to peer");

        // Add a metric for outbound requests.
        metrics::counter!(
            "zebra.net.out.requests",
            "command" => request.command(),
            "addr" => self.metrics_label.clone(),
        )
        .increment(1);
        self.update_state_metrics(format!("Out::Req::{}", request.command()));

        // Each arm sends the network message for the request, and yields
        // `Ok(handler)` for the response handler, or `Err(_)` if the send failed.
        //
        // The connection event loop only calls this method in `AwaitingRequest`,
        // so the other states are unreachable and treated as internal bugs.
        let new_handler = match (&self.state, request) {
            (Failed, request) => panic!(
                "failed connection cannot handle new request: {:?}, client_receiver: {:?}",
                request,
                self.client_rx
            ),
            (pending @ AwaitingResponse { .. }, request) => panic!(
                "tried to process new request: {:?} while awaiting a response: {:?}, client_receiver: {:?}",
                request,
                pending,
                self.client_rx
            ),

            // Take some cached addresses from the peer connection. This address cache helps
            // work-around a `zcashd` addr response rate-limit.
            (AwaitingRequest, Peers) if !self.cached_addrs.is_empty() => {
                // Security: This method performs security-sensitive operations, see its comments
                // for details.
                let response_addrs = Handler::update_addr_cache(&mut self.cached_addrs, None, PEER_ADDR_RESPONSE_LIMIT);

                debug!(
                    response_addrs = response_addrs.len(),
                    remaining_addrs = self.cached_addrs.len(),
                    PEER_ADDR_RESPONSE_LIMIT,
                    "responding to Peers request using some cached addresses",
                );

                // Answered locally: no network message is sent for this request.
                Ok(Handler::Finished(Ok(Response::Peers(response_addrs))))
            }
            (AwaitingRequest, Peers) => self
                .peer_tx
                .send(Message::GetAddr)
                .await
                .map(|()| Handler::Peers),

            (AwaitingRequest, Ping(nonce)) => {
                // Record the send time, so the handler can report the round-trip time.
                let ping_sent_at = Instant::now();

                self
                    .peer_tx
                    .send(Message::Ping(nonce))
                    .await
                    .map(|()| Handler::Ping { nonce, ping_sent_at })
            }

            (AwaitingRequest, BlocksByHash(hashes)) => {
                self
                    .peer_tx
                    .send(Message::GetData(
                        hashes.iter().map(|h| (*h).into()).collect(),
                    ))
                    .await
                    .map(|()|
                        Handler::BlocksByHash {
                            blocks: Vec::with_capacity(hashes.len()),
                            pending_hashes: hashes,
                        }
                    )
            }
            (AwaitingRequest, TransactionsById(ids)) => {
                self
                    .peer_tx
                    .send(Message::GetData(
                        ids.iter().map(Into::into).collect(),
                    ))
                    .await
                    .map(|()|
                        Handler::TransactionsById {
                            transactions: Vec::with_capacity(ids.len()),
                            pending_ids: ids,
                        })
            }

            (AwaitingRequest, FindBlocks { known_blocks, stop }) => {
                self
                    .peer_tx
                    .send(Message::GetBlocks { known_blocks, stop })
                    .await
                    .map(|()|
                        Handler::FindBlocks
                    )
            }
            (AwaitingRequest, FindHeaders { known_blocks, stop }) => {
                self
                    .peer_tx
                    .send(Message::GetHeaders { known_blocks, stop })
                    .await
                    .map(|()|
                        Handler::FindHeaders
                    )
            }

            (AwaitingRequest, MempoolTransactionIds) => {
                self
                    .peer_tx
                    .send(Message::Mempool)
                    .await
                    .map(|()|
                        Handler::MempoolTransactionIds
                    )
            }

            // Pushes and advertisements are fire-and-forget: the peer sends no
            // reply, so the handler finishes immediately with `Response::Nil`.
            (AwaitingRequest, PushTransaction(transaction)) => {
                self
                    .peer_tx
                    .send(Message::Tx(transaction))
                    .await
                    .map(|()|
                        Handler::Finished(Ok(Response::Nil))
                    )
            }
            (AwaitingRequest, AdvertiseTransactionIds(hashes)) => {
                let max_tx_inv_in_message: usize = MAX_TX_INV_IN_SENT_MESSAGE
                    .try_into()
                    .expect("constant fits in usize");

                // # Security
                //
                // In most cases, we try to split over-sized requests into multiple network-layer
                // messages. But we are unlikely to reach this limit with the default mempool
                // config, so a gossip like this could indicate a network amplification attack.
                //
                // This limit is particularly important here, because advertisements send the same
                // message to half our available peers.
                //
                // If there are thousands of transactions in the mempool, letting peers know the
                // exact transactions we have isn't that important, so it's ok to drop arbitrary
                // transaction hashes from our response.
                if hashes.len() > max_tx_inv_in_message {
                    debug!(inv_count = ?hashes.len(), ?MAX_TX_INV_IN_SENT_MESSAGE, "unusually large transaction ID gossip");
                }

                let hashes = hashes.into_iter().take(max_tx_inv_in_message).map(Into::into).collect();

                self
                    .peer_tx
                    .send(Message::Inv(hashes))
                    .await
                    .map(|()|
                        Handler::Finished(Ok(Response::Nil))
                    )
            }
            (AwaitingRequest, AdvertiseBlock(hash) | AdvertiseBlockToAll(hash)) => {
                self
                    .peer_tx
                    .send(Message::Inv(vec![hash.into()]))
                    .await
                    .map(|()|
                        Handler::Finished(Ok(Response::Nil))
                    )
            }
        };

        // Update the connection state with a new handler, or fail with an error.
        match new_handler {
            Ok(handler) => {
                self.state = AwaitingResponse { handler, span, tx };
                // Start the response timeout: the event loop races this timer
                // against the peer's reply and request cancellation.
                self.request_timer = Some(Box::pin(sleep(constants::REQUEST_TIMEOUT)));
            }
            Err(error) => {
                // The send to the peer failed, so fail both the request and the connection.
                let error = SharedPeerError::from(error);
                let _ = tx.send(Err(error.clone()));
                self.fail_with(error).await;
            }
        };
    }
1181
    /// Handle `msg` as a request from a peer to this Zebra instance.
    ///
    /// Maps the network [`Message`] to an internal [`Request`] and drives it through the
    /// inbound service, or consumes the message directly (heartbeats, address gossip,
    /// protocol errors that must be ignored).
    ///
    /// If the message is not handled, it is returned.
    // This function has its own span, because we're creating a new work
    // context (namely, the work of processing the inbound msg as a request)
    #[instrument(name = "msg_as_req", skip(self, msg), fields(msg = msg.command()))]
    async fn handle_message_as_request(&mut self, msg: Message) -> Option<Message> {
        trace!(?msg);
        debug!(state = %self.state, %msg, "received inbound peer message");

        self.update_state_metrics(format!("In::Msg::{}", msg.command()));

        use InboundMessage::*;

        // Classify the message: `AsRequest` maps it to an internal request,
        // `Consumed` means it was fully handled here, `Unused` means the caller
        // should treat it as an unexpected message.
        let req = match msg {
            Message::Ping(nonce) => {
                trace!(?nonce, "responding to heartbeat");
                if let Err(e) = self.peer_tx.send(Message::Pong(nonce)).await {
                    self.fail_with(e).await;
                }
                Consumed
            }
            // These messages shouldn't be sent outside of a handshake.
            Message::Version { .. } => {
                self.fail_with(PeerError::DuplicateHandshake).await;
                Consumed
            }
            Message::Verack => {
                self.fail_with(PeerError::DuplicateHandshake).await;
                Consumed
            }
            // These messages should already be handled as a response if they
            // could be a response, so if we see them here, they were either
            // sent unsolicited, or they were sent in response to a canceled request
            // that we've already forgotten about.
            Message::Reject { .. } => {
                debug!(%msg, "got reject message unsolicited or from canceled request");
                Unused
            }
            Message::NotFound { .. } => {
                debug!(%msg, "got notfound message unsolicited or from canceled request");
                Unused
            }
            Message::Pong(_) => {
                debug!(%msg, "got pong message unsolicited or from canceled request");
                Unused
            }
            Message::Block(_) => {
                debug!(%msg, "got block message unsolicited or from canceled request");
                Unused
            }
            Message::Headers(_) => {
                debug!(%msg, "got headers message unsolicited or from canceled request");
                Unused
            }
            // These messages should never be sent by peers.
            Message::FilterLoad { .. } | Message::FilterAdd { .. } | Message::FilterClear => {
                // # Security
                //
                // Zcash connections are not authenticated, so malicious nodes can send fake messages,
                // with connected peers' IP addresses in the IP header.
                //
                // Since we can't verify their source, Zebra needs to ignore unexpected messages,
                // because closing the connection could cause a denial of service or eclipse attack.
                debug!(%msg, "got BIP111 message without advertising NODE_BLOOM");

                // Ignored, but consumed because it is technically a protocol error.
                Consumed
            }

            // # Security
            //
            // Zebra crawls the network proactively, and that's the only way peers get into our
            // address book. This prevents peers from filling our address book with malicious peer
            // addresses.
            Message::Addr(ref new_addrs) => {
                // # Peer Set Reliability
                //
                // We keep a list of the unused peer addresses sent by each connection, to work
                // around `zcashd`'s `getaddr` response rate-limit.
                let no_response =
                    Handler::update_addr_cache(&mut self.cached_addrs, new_addrs, None);
                // With no response limit, the cache update must only store
                // addresses, never return them.
                assert_eq!(
                    no_response,
                    Vec::new(),
                    "peers unexpectedly taken from cache"
                );

                debug!(
                    new_addrs = new_addrs.len(),
                    cached_addrs = self.cached_addrs.len(),
                    "adding unsolicited addresses to cached addresses",
                );

                Consumed
            }
            Message::Tx(ref transaction) => Request::PushTransaction(transaction.clone()).into(),
            Message::Inv(ref items) => match &items[..] {
                // We don't expect to be advertised multiple blocks at a time,
                // so we ignore any advertisements of multiple blocks.
                [InventoryHash::Block(hash)] => Request::AdvertiseBlock(*hash).into(),

                // Some peers advertise invs with mixed item types.
                // But we're just interested in the transaction invs.
                //
                // TODO: split mixed invs into multiple requests,
                // but skip runs of multiple blocks.
                tx_ids if tx_ids.iter().any(|item| item.unmined_tx_id().is_some()) => {
                    Request::AdvertiseTransactionIds(transaction_ids(items).collect()).into()
                }

                // Log detailed messages for ignored inv advertisement messages.
                [] => {
                    debug!(%msg, "ignoring empty inv");

                    // This might be a minor protocol error, or it might mean "not found".
                    Unused
                }
                [InventoryHash::Block(_), InventoryHash::Block(_), ..] => {
                    debug!(%msg, "ignoring inv with multiple blocks");
                    Unused
                }
                _ => {
                    debug!(%msg, "ignoring inv with no transactions");
                    Unused
                }
            },
            Message::GetData(ref items) => match &items[..] {
                // Some peers advertise invs with mixed item types.
                // So we suspect they might do the same with getdata.
                //
                // Since we can only handle one message at a time,
                // we treat it as a block request if there are any blocks,
                // or a transaction request if there are any transactions.
                //
                // TODO: split mixed getdata into multiple requests.
                b_hashes
                    if b_hashes
                        .iter()
                        .any(|item| matches!(item, InventoryHash::Block(_))) =>
                {
                    Request::BlocksByHash(block_hashes(items).collect()).into()
                }
                tx_ids if tx_ids.iter().any(|item| item.unmined_tx_id().is_some()) => {
                    Request::TransactionsById(transaction_ids(items).collect()).into()
                }

                // Log detailed messages for ignored getdata request messages.
                [] => {
                    debug!(%msg, "ignoring empty getdata");

                    // This might be a minor protocol error, or it might mean "not found".
                    Unused
                }
                _ => {
                    debug!(%msg, "ignoring getdata with no blocks or transactions");
                    Unused
                }
            },
            Message::GetAddr => Request::Peers.into(),
            Message::GetBlocks {
                ref known_blocks,
                stop,
            } => Request::FindBlocks {
                known_blocks: known_blocks.clone(),
                stop,
            }
            .into(),
            Message::GetHeaders {
                ref known_blocks,
                stop,
            } => Request::FindHeaders {
                known_blocks: known_blocks.clone(),
                stop,
            }
            .into(),
            Message::Mempool => Request::MempoolTransactionIds.into(),
        };

        // Handle the request, and return unused messages.
        match req {
            AsRequest(req) => {
                self.drive_peer_request(req).await;
                None
            }
            Consumed => None,
            Unused => Some(msg),
        }
    }
1371
    /// Given a `req` originating from the peer, drive it to completion and send
    /// any appropriate messages to the remote peer. If an error occurs while
    /// processing the request (e.g., the service is shedding load), then we call
    /// fail_with to terminate the entire peer connection, shrinking the number
    /// of connected peers.
    async fn drive_peer_request(&mut self, req: Request) {
        trace!(?req);

        // Add a metric for inbound requests
        metrics::counter!(
            "zebra.net.in.requests",
            "command" => req.command(),
            "addr" => self.metrics_label.clone(),
        )
        .increment(1);
        self.update_state_metrics(format!("In::Req::{}", req.command()));

        // Give the inbound service time to clear its queue,
        // before sending the next inbound request.
        tokio::task::yield_now().await;

        // # Security
        //
        // Holding buffer slots for a long time can cause hangs:
        // <https://docs.rs/tower/latest/tower/buffer/struct.Buffer.html#a-note-on-choosing-a-bound>
        //
        // The inbound service must be called immediately after a buffer slot is reserved.
        //
        // The inbound service never times out in readiness, because the load shed layer is always
        // ready, and returns an error in response to the request instead.
        if self.svc.ready().await.is_err() {
            self.fail_with(PeerError::ServiceShutdown).await;
            return;
        }

        // Inbound service request timeouts are handled by the timeout layer in `start::start()`.
        let rsp = match self.svc.call(req.clone()).await {
            Err(e) => {
                // Overloads and timeouts use randomised disconnection, other errors
                // are logged but do not close the connection.
                if e.is::<tower::load_shed::error::Overloaded>() {
                    // # Security
                    //
                    // The peer request queue must have a limited length.
                    // The buffer and load shed layers are added in `start::start()`.
                    tracing::debug!("inbound service is overloaded, may close connection");

                    let now = Instant::now();

                    self.handle_inbound_overload(req, now, PeerError::Overloaded)
                        .await;
                } else if e.is::<tower::timeout::error::Elapsed>() {
                    // # Security
                    //
                    // Peer requests must have a timeout.
                    // The timeout layer is added in `start::start()`.
                    tracing::info!(%req, "inbound service request timed out, may close connection");

                    let now = Instant::now();

                    self.handle_inbound_overload(req, now, PeerError::InboundTimeout)
                        .await;
                } else {
                    // We could send a reject to the remote peer, but that might cause
                    // them to disconnect, and we might be using them to sync blocks.
                    // For similar reasons, we don't want to fail_with() here - we
                    // only close the connection if the peer is doing something wrong.
                    info!(
                        %e,
                        connection_state = ?self.state,
                        client_receiver = ?self.client_rx,
                        "error processing peer request",
                    );
                    self.update_state_metrics(format!("In::Req::{}/Rsp::Error", req.command()));
                }

                return;
            }
            Ok(rsp) => rsp,
        };

        // Add a metric for outbound responses to inbound requests
        metrics::counter!(
            "zebra.net.out.responses",
            "command" => rsp.command(),
            "addr" => self.metrics_label.clone(),
        )
        .increment(1);
        self.update_state_metrics(format!("In::Rsp::{}", rsp.command()));

        // Translate the internal response into network messages for the peer.
        // TODO: split response handler into its own method
        match rsp.clone() {
            Response::Nil => { /* generic success, do nothing */ }
            Response::Peers(addrs) => {
                if let Err(e) = self.peer_tx.send(Message::Addr(addrs)).await {
                    self.fail_with(e).await;
                }
            }
            Response::Pong(duration) => {
                debug!(?duration, "responding to Ping with Pong RTT");
            }
            Response::Transactions(transactions) => {
                // Generate one tx message per transaction,
                // then a notfound message with all the missing transaction ids.
                let mut missing_ids = Vec::new();

                for transaction in transactions.into_iter() {
                    match transaction {
                        Available((transaction, _)) => {
                            if let Err(e) = self.peer_tx.send(Message::Tx(transaction)).await {
                                self.fail_with(e).await;
                                return;
                            }
                        }
                        Missing(id) => missing_ids.push(id.into()),
                    }
                }

                if !missing_ids.is_empty() {
                    if let Err(e) = self.peer_tx.send(Message::NotFound(missing_ids)).await {
                        self.fail_with(e).await;
                        return;
                    }
                }
            }
            Response::Blocks(blocks) => {
                // Generate one block message per available block,
                // then a notfound message with all the missing block hashes.
                let mut missing_hashes = Vec::new();

                for block in blocks.into_iter() {
                    match block {
                        Available((block, _)) => {
                            if let Err(e) = self.peer_tx.send(Message::Block(block)).await {
                                self.fail_with(e).await;
                                return;
                            }
                        }
                        Missing(hash) => missing_hashes.push(hash.into()),
                    }
                }

                if !missing_hashes.is_empty() {
                    if let Err(e) = self.peer_tx.send(Message::NotFound(missing_hashes)).await {
                        self.fail_with(e).await;
                        return;
                    }
                }
            }
            Response::BlockHashes(hashes) => {
                if let Err(e) = self
                    .peer_tx
                    .send(Message::Inv(hashes.into_iter().map(Into::into).collect()))
                    .await
                {
                    self.fail_with(e).await
                }
            }
            Response::BlockHeaders(headers) => {
                if let Err(e) = self.peer_tx.send(Message::Headers(headers)).await {
                    self.fail_with(e).await
                }
            }
            Response::TransactionIds(hashes) => {
                let max_tx_inv_in_message: usize = MAX_TX_INV_IN_SENT_MESSAGE
                    .try_into()
                    .expect("constant fits in usize");

                // # Security
                //
                // In most cases, we try to split over-sized responses into multiple network-layer
                // messages. But we are unlikely to reach this limit with the default mempool
                // config, so a response like this could indicate a network amplification attack.
                //
                // If there are thousands of transactions in the mempool, letting peers know the
                // exact transactions we have isn't that important, so it's ok to drop arbitrary
                // transaction hashes from our response.
                if hashes.len() > max_tx_inv_in_message {
                    debug!(inv_count = ?hashes.len(), ?MAX_TX_INV_IN_SENT_MESSAGE, "unusually large transaction ID response");
                }

                let hashes = hashes
                    .into_iter()
                    .take(max_tx_inv_in_message)
                    .map(Into::into)
                    .collect();

                if let Err(e) = self.peer_tx.send(Message::Inv(hashes)).await {
                    self.fail_with(e).await
                }
            }
        }

        debug!(state = %self.state, %req, %rsp, "sent Zebra response to peer");

        // Give the inbound service time to clear its queue,
        // before checking the connection for the next inbound or outbound request.
        tokio::task::yield_now().await;
    }
1569
1570 /// Handle inbound service overload and timeout error responses by randomly terminating some
1571 /// connections.
1572 ///
1573 /// # Security
1574 ///
1575 /// When the inbound service is overloaded with requests, Zebra needs to drop some connections,
1576 /// to reduce the load on the application. But dropping every connection that receives an
1577 /// `Overloaded` error from the inbound service could cause Zebra to drop too many peer
1578 /// connections, and stop itself downloading blocks or transactions.
1579 ///
1580 /// Malicious or misbehaving peers can also overload the inbound service, and make Zebra drop
1581 /// its connections to other peers.
1582 ///
1583 /// So instead, Zebra drops some overloaded connections at random. If a connection has recently
1584 /// overloaded the inbound service, it is more likely to be dropped. This makes it harder for a
1585 /// single peer (or multiple peers) to perform a denial of service attack.
1586 ///
1587 /// The inbound connection rate-limit also makes it hard for multiple peers to perform this
1588 /// attack, because each inbound connection can only send one inbound request before its
1589 /// probability of being disconnected increases.
1590 async fn handle_inbound_overload(&mut self, req: Request, now: Instant, error: PeerError) {
1591 let prev = self.last_overload_time.replace(now);
1592 let drop_connection_probability = overload_drop_connection_probability(now, prev);
1593
1594 if thread_rng().gen::<f32>() < drop_connection_probability {
1595 if matches!(error, PeerError::Overloaded) {
1596 metrics::counter!("pool.closed.loadshed").increment(1);
1597 } else {
1598 metrics::counter!("pool.closed.inbound.timeout").increment(1);
1599 }
1600
1601 tracing::info!(
1602 drop_connection_probability = format!("{drop_connection_probability:.3}"),
1603 remote_user_agent = ?self.connection_info.remote.user_agent,
1604 negotiated_version = ?self.connection_info.negotiated_version,
1605 peer = ?self.metrics_label,
1606 last_peer_state = ?self.last_metrics_state,
1607 // TODO: remove this detailed debug info once #6506 is fixed
1608 remote_height = ?self.connection_info.remote.start_height,
1609 cached_addrs = ?self.cached_addrs.len(),
1610 connection_state = ?self.state,
1611 "inbound service {error} error, closing connection",
1612 );
1613
1614 self.update_state_metrics(format!("In::Req::{}/Rsp::{error}::Error", req.command()));
1615 self.fail_with(error).await;
1616 } else {
1617 self.update_state_metrics(format!("In::Req::{}/Rsp::{error}::Ignored", req.command()));
1618
1619 if matches!(error, PeerError::Overloaded) {
1620 metrics::counter!("pool.ignored.loadshed").increment(1);
1621 } else {
1622 metrics::counter!("pool.ignored.inbound.timeout").increment(1);
1623 }
1624 }
1625 }
1626}
1627
1628/// Returns the probability of dropping a connection where the last overload was at `prev`,
1629/// and the current overload is `now`.
1630///
1631/// # Security
1632///
1633/// Connections that haven't seen an overload error in the past OVERLOAD_PROTECTION_INTERVAL
1634/// have a small chance of being closed (MIN_OVERLOAD_DROP_PROBABILITY).
1635///
1636/// Connections that have seen a previous overload error in that time
1637/// have a higher chance of being dropped up to MAX_OVERLOAD_DROP_PROBABILITY.
1638/// This probability increases quadratically, so peers that send lots of inbound
1639/// requests are more likely to be dropped.
1640///
1641/// ## Examples
1642///
1643/// If a connection sends multiple overloads close together, it is very likely to be
1644/// disconnected. If a connection has two overloads multiple seconds apart, it is unlikely
1645/// to be disconnected.
1646fn overload_drop_connection_probability(now: Instant, prev: Option<Instant>) -> f32 {
1647 let Some(prev) = prev else {
1648 return MIN_OVERLOAD_DROP_PROBABILITY;
1649 };
1650
1651 let protection_fraction_since_last_overload =
1652 (now - prev).as_secs_f32() / OVERLOAD_PROTECTION_INTERVAL.as_secs_f32();
1653
1654 // Quadratically increase the disconnection probability for very recent overloads.
1655 // Negative values are ignored by clamping to MIN_OVERLOAD_DROP_PROBABILITY.
1656 let overload_fraction = protection_fraction_since_last_overload.powi(2);
1657
1658 let probability_range = MAX_OVERLOAD_DROP_PROBABILITY - MIN_OVERLOAD_DROP_PROBABILITY;
1659 let raw_drop_probability =
1660 MAX_OVERLOAD_DROP_PROBABILITY - (overload_fraction * probability_range);
1661
1662 raw_drop_probability.clamp(MIN_OVERLOAD_DROP_PROBABILITY, MAX_OVERLOAD_DROP_PROBABILITY)
1663}
1664
1665impl<S, Tx> Connection<S, Tx>
1666where
1667 Tx: Sink<Message, Error = SerializationError> + Unpin,
1668{
1669 /// Update the connection state metrics for this connection,
1670 /// using `extra_state_info` as additional state information.
1671 fn update_state_metrics(&mut self, extra_state_info: impl Into<Option<String>>) {
1672 let current_metrics_state = if let Some(extra_state_info) = extra_state_info.into() {
1673 format!("{}::{extra_state_info}", self.state.command()).into()
1674 } else {
1675 self.state.command()
1676 };
1677
1678 if self.last_metrics_state.as_ref() == Some(¤t_metrics_state) {
1679 return;
1680 }
1681
1682 self.erase_state_metrics();
1683
1684 // Set the new state
1685 metrics::gauge!(
1686 "zebra.net.connection.state",
1687 "command" => current_metrics_state.clone(),
1688 "addr" => self.metrics_label.clone(),
1689 )
1690 .increment(1.0);
1691
1692 self.last_metrics_state = Some(current_metrics_state);
1693 }
1694
1695 /// Erase the connection state metrics for this connection.
1696 fn erase_state_metrics(&mut self) {
1697 if let Some(last_metrics_state) = self.last_metrics_state.take() {
1698 metrics::gauge!(
1699 "zebra.net.connection.state",
1700 "command" => last_metrics_state,
1701 "addr" => self.metrics_label.clone(),
1702 )
1703 .set(0.0);
1704 }
1705 }
1706
1707 /// Marks the peer as having failed with `error`, and performs connection cleanup,
1708 /// including async channel closes.
1709 ///
1710 /// If the connection has errored already, re-use the original error.
1711 /// Otherwise, fail the connection with `error`.
1712 async fn shutdown_async(&mut self, error: impl Into<SharedPeerError>) {
1713 // Close async channels first, so other tasks can start shutting down.
1714 // There's nothing we can do about errors while shutting down, and some errors are expected.
1715 //
1716 // TODO: close peer_tx and peer_rx in shutdown() and Drop, after:
1717 // - using channels instead of streams/sinks?
1718 // - exposing the underlying implementation rather than using generics and closures?
1719 // - adding peer_rx to the connection struct (optional)
1720 let _ = self.peer_tx.close().await;
1721
1722 self.shutdown(error);
1723 }
1724
    /// Marks the peer as having failed with `error`, and performs connection cleanup.
    /// See [`Self::shutdown_async()`] for details.
    ///
    /// If the connection has already errored, the original error is kept in the
    /// shared error slot, and `error` is only logged.
    ///
    /// Call [`Self::shutdown_async()`] in async code, because it can shut down more channels.
    fn shutdown(&mut self, error: impl Into<SharedPeerError>) {
        let mut error = error.into();

        // Close channels first, so other tasks can start shutting down.
        // No new client requests are accepted after this point.
        self.client_rx.close();

        // Update the shared error slot
        //
        // # Correctness
        //
        // Error slots use a threaded `std::sync::Mutex`, so accessing the slot
        // can block the async task's current thread. We only perform a single
        // slot update per `Client`. We ignore subsequent error slot updates.
        let slot_result = self.error_slot.try_update_error(error.clone());

        if let Err(AlreadyErrored { original_error }) = slot_result {
            debug!(
                new_error = %error,
                %original_error,
                connection_state = ?self.state,
                "multiple errors on connection: \
                 failed connections should stop processing pending requests and responses, \
                 then close the connection"
            );

            // Report the first error to callers, not the latest one: the slot
            // is never unset, so the original error is authoritative.
            error = original_error;
        } else {
            debug!(%error,
                connection_state = ?self.state,
                "shutting down peer service with error");
        }

        // Prepare to flush any pending client requests.
        //
        // We've already closed the client channel, so setting State::Failed
        // will make the main loop flush any pending requests.
        //
        // However, we may have an outstanding client request in State::AwaitingResponse,
        // so we need to deal with it first.
        if let State::AwaitingResponse { tx, .. } =
            std::mem::replace(&mut self.state, State::Failed)
        {
            // # Correctness
            //
            // We know the slot has Some(error), because we just set it above,
            // and the error slot is never unset.
            //
            // Accessing the error slot locks a threaded std::sync::Mutex, which
            // can block the current async task thread. We briefly lock the mutex
            // to clone the error.
            //
            // The send result is ignored: the requester may have gone away.
            let _ = tx.send(Err(error.clone()));
        }

        // Make the timer and metrics consistent with the Failed state.
        self.request_timer = None;
        self.update_state_metrics(None);

        // Finally, flush pending client requests.
        while let Some(InProgressClientRequest { tx, span, .. }) =
            self.client_rx.close_and_flush_next()
        {
            trace!(
                parent: &span,
                %error,
                "sending an error response to a pending request on a failed connection"
            );
            // Ignore send errors: the requester may have cancelled its request.
            let _ = tx.send(Err(error.clone()));
        }
    }
1798}
1799
impl<S, Tx> Drop for Connection<S, Tx>
where
    Tx: Sink<Message, Error = SerializationError> + Unpin,
{
    /// Performs a best-effort synchronous shutdown when the connection is dropped.
    ///
    /// `shutdown()` ignores repeated errors, so dropping an already-failed
    /// connection keeps the original error in the shared error slot.
    fn drop(&mut self) {
        self.shutdown(PeerError::ConnectionDropped);

        // Clear this connection's state metrics after shutdown has finished
        // updating them. (Presumably so metrics only track live connections —
        // see `erase_state_metrics` for details.)
        self.erase_state_metrics();
    }
}
1810
1811/// Map a list of inventory hashes to the corresponding unmined transaction IDs.
1812/// Non-transaction inventory hashes are skipped.
1813///
1814/// v4 transactions use a legacy transaction ID, and
1815/// v5 transactions use a witnessed transaction ID.
1816fn transaction_ids(items: &'_ [InventoryHash]) -> impl Iterator<Item = UnminedTxId> + '_ {
1817 items.iter().filter_map(InventoryHash::unmined_tx_id)
1818}
1819
1820/// Map a list of inventory hashes to the corresponding block hashes.
1821/// Non-block inventory hashes are skipped.
1822fn block_hashes(items: &'_ [InventoryHash]) -> impl Iterator<Item = block::Hash> + '_ {
1823 items.iter().filter_map(InventoryHash::block_hash)
1824}