zebra_network/peer_set/limit.rs
1//! Counting active connections used by Zebra.
2//!
3//! These types can be used to count any kind of active resource.
4//! But they are currently used to track the number of open connections.
5
6use std::{fmt, sync::Arc};
7
8use tokio::sync::mpsc;
9
/// A lifecycle event reported by a [`Connection`][1]: it has either opened or closed.
///
/// The connection counter consumes these events to keep its tally of open
/// connections up to date.
///
/// [1]: crate::peer::Connection
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
enum ConnectionStatus {
    /// The connection should now be counted as open.
    Opened,
    /// The connection should no longer be counted as open.
    Closed,
}
20
21/// A counter for active connections.
22///
23/// Creates a [`ConnectionTracker`] to track each active connection.
24/// When these trackers are dropped, the counter gets notified.
25pub struct ActiveConnectionCounter {
26 /// The number of active connections tracked using this counter.
27 count: usize,
28
29 /// The number of connection slots that are reserved for connection attempts.
30 reserved_count: usize,
31
32 /// The limit for this type of connection, for diagnostics only.
33 /// The caller must enforce the limit by ignoring, delaying, or dropping connections.
34 limit: usize,
35
36 /// The label for this connection counter, typically its type.
37 label: Arc<str>,
38
39 /// The channel used to send opened or closed connection notifications.
40 status_notification_tx: mpsc::UnboundedSender<ConnectionStatus>,
41
42 /// The channel used to receive opened or closed connection notifications.
43 status_notification_rx: mpsc::UnboundedReceiver<ConnectionStatus>,
44
45 /// Active connection count progress transmitter.
46 #[cfg(feature = "progress-bar")]
47 connection_bar: howudoin::Tx,
48}
49
50impl fmt::Debug for ActiveConnectionCounter {
51 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
52 f.debug_struct("ActiveConnectionCounter")
53 .field("label", &self.label)
54 .field("count", &self.count)
55 .field("reserved_count", &self.reserved_count)
56 .field("limit", &self.limit)
57 .finish()
58 }
59}
60
61impl ActiveConnectionCounter {
62 /// Create and return a new active connection counter.
63 pub fn new_counter() -> Self {
64 Self::new_counter_with(usize::MAX, "Active Connections")
65 }
66
67 /// Create and return a new active connection counter with `limit` and `label`.
68 /// The caller must check and enforce limits using [`update_count()`](Self::update_count).
69 pub fn new_counter_with<S: ToString>(limit: usize, label: S) -> Self {
70 // The number of items in this channel is bounded by the connection limit.
71 let (status_notification_tx, status_notification_rx) = mpsc::unbounded_channel();
72
73 let label = label.to_string();
74
75 #[cfg(feature = "progress-bar")]
76 let connection_bar = howudoin::new_root().label(label.clone());
77
78 Self {
79 count: 0,
80 reserved_count: 0,
81 limit,
82 label: label.into(),
83 status_notification_rx,
84 status_notification_tx,
85 #[cfg(feature = "progress-bar")]
86 connection_bar,
87 }
88 }
89
90 /// Create and return a new [`ConnectionTracker`], and add 1 to this counter.
91 ///
92 /// When the returned tracker is dropped, this counter will be notified, and decreased by 1.
93 pub fn track_connection(&mut self) -> ConnectionTracker {
94 ConnectionTracker::new(self)
95 }
96
97 /// Check for closed connection notifications, and return the current connection count.
98 pub fn update_count(&mut self) -> usize {
99 let previous_connections = self.count;
100
101 // We ignore errors here:
102 // - TryRecvError::Empty means that there are no pending close notifications
103 // - TryRecvError::Closed is unreachable, because we hold a sender
104 while let Ok(status) = self.status_notification_rx.try_recv() {
105 match status {
106 ConnectionStatus::Opened => {
107 self.reserved_count -= 1;
108 self.count += 1;
109
110 debug!(
111 open_connections = ?self.count,
112 ?previous_connections,
113 limit = ?self.limit,
114 label = ?self.label,
115 "a peer connection was opened",
116 );
117 }
118 ConnectionStatus::Closed => {
119 self.count -= 1;
120
121 debug!(
122 open_connections = ?self.count,
123 ?previous_connections,
124 limit = ?self.limit,
125 label = ?self.label,
126 "a peer connection was closed",
127 );
128 }
129 }
130 }
131
132 trace!(
133 open_connections = ?self.count,
134 ?previous_connections,
135 limit = ?self.limit,
136 label = ?self.label,
137 "updated active connection count",
138 );
139
140 #[cfg(feature = "progress-bar")]
141 self.connection_bar
142 .set_pos(u64::try_from(self.count).expect("fits in u64"));
143 // .set_len(u64::try_from(self.limit).expect("fits in u64"));
144
145 self.count + self.reserved_count
146 }
147}
148
149impl Drop for ActiveConnectionCounter {
150 fn drop(&mut self) {
151 #[cfg(feature = "progress-bar")]
152 self.connection_bar.close();
153 }
154}
155
156/// A per-connection tracker.
157///
158/// [`ActiveConnectionCounter`] creates a tracker instance for each active connection.
159/// When these trackers are dropped, the counter gets notified.
160pub struct ConnectionTracker {
161 /// The channel used to send open connection status notifications on first response or
162 /// closed connection notifications on drop.
163 status_notification_tx: mpsc::UnboundedSender<ConnectionStatus>,
164
165 /// A flag indicating whether this connection tracker has sent a notification that the
166 /// connection has been opened and that another notification should not be sent on drop.
167 has_marked_open: bool,
168
169 /// The label for this connection counter, typically its type.
170 label: Arc<str>,
171}
172
173impl fmt::Debug for ConnectionTracker {
174 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
175 f.debug_tuple("ConnectionTracker")
176 .field(&self.label)
177 .finish()
178 }
179}
180
181impl ConnectionTracker {
182 /// Sends a notification to the status notification channel to count the connection as open, and
183 /// marks this [`ConnectionTracker`] as having sent that open notification to avoid double-counting.
184 pub fn mark_open(&mut self) {
185 if !self.has_marked_open {
186 let _ = self.status_notification_tx.send(ConnectionStatus::Opened);
187 self.has_marked_open = true;
188 }
189 }
190
191 /// Create and return a new active connection tracker, and add 1 to `counter`.
192 /// All connection trackers share a label with their connection counter.
193 ///
194 /// When the returned tracker is dropped, `counter` will be notified, and decreased by 1.
195 fn new(counter: &mut ActiveConnectionCounter) -> Self {
196 counter.reserved_count += 1;
197
198 debug!(
199 open_connections = ?counter.count,
200 limit = ?counter.limit,
201 label = ?counter.label,
202 "opening a new peer connection",
203 );
204
205 Self {
206 status_notification_tx: counter.status_notification_tx.clone(),
207 has_marked_open: false,
208 label: counter.label.clone(),
209 }
210 }
211}
212
213impl Drop for ConnectionTracker {
214 /// Notifies the corresponding connection counter that the connection has closed.
215 fn drop(&mut self) {
216 debug!(label = ?self.label, "closing a peer connection");
217
218 // We ignore disconnected errors, because the receiver can be dropped
219 // before some connections are dropped.
220 //
221 // # Security
222 //
223 // This channel is actually bounded by the inbound and outbound connection limit.
224 //
225 // # Correctness
226 //
227 // Unopened connections should be opened just before being closed to decrement both
228 // the count for pending connection attempts and the count for open connections in
229 // the active connection tracker.
230 self.mark_open();
231 let _ = self.status_notification_tx.send(ConnectionStatus::Closed);
232 }
233}