Skip to main content

zebra_network/peer_set/
limit.rs

1//! Counting active connections used by Zebra.
2//!
3//! These types can be used to count any kind of active resource.
4//! But they are currently used to track the number of open connections.
5
6use std::{fmt, sync::Arc};
7
8use tokio::sync::mpsc;
9
10/// A signal sent by a [`Connection`][1] when it opens or closes.
11///
12/// Used to count the number of open connections.
13///
14/// [1]: crate::peer::Connection
15#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
16enum ConnectionStatus {
17    Opened,
18    Closed,
19}
20
21/// A counter for active connections.
22///
23/// Creates a [`ConnectionTracker`] to track each active connection.
24/// When these trackers are dropped, the counter gets notified.
25pub struct ActiveConnectionCounter {
26    /// The number of active connections tracked using this counter.
27    count: usize,
28
29    /// The number of connection slots that are reserved for connection attempts.
30    reserved_count: usize,
31
32    /// The limit for this type of connection, for diagnostics only.
33    /// The caller must enforce the limit by ignoring, delaying, or dropping connections.
34    limit: usize,
35
36    /// The label for this connection counter, typically its type.
37    label: Arc<str>,
38
39    /// The channel used to send opened or closed connection notifications.
40    status_notification_tx: mpsc::UnboundedSender<ConnectionStatus>,
41
42    /// The channel used to receive opened or closed connection notifications.
43    status_notification_rx: mpsc::UnboundedReceiver<ConnectionStatus>,
44
45    /// Active connection count progress transmitter.
46    #[cfg(feature = "progress-bar")]
47    connection_bar: howudoin::Tx,
48}
49
50impl fmt::Debug for ActiveConnectionCounter {
51    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
52        f.debug_struct("ActiveConnectionCounter")
53            .field("label", &self.label)
54            .field("count", &self.count)
55            .field("reserved_count", &self.reserved_count)
56            .field("limit", &self.limit)
57            .finish()
58    }
59}
60
61impl ActiveConnectionCounter {
62    /// Create and return a new active connection counter.
63    pub fn new_counter() -> Self {
64        Self::new_counter_with(usize::MAX, "Active Connections")
65    }
66
67    /// Create and return a new active connection counter with `limit` and `label`.
68    /// The caller must check and enforce limits using [`update_count()`](Self::update_count).
69    pub fn new_counter_with<S: ToString>(limit: usize, label: S) -> Self {
70        // The number of items in this channel is bounded by the connection limit.
71        let (status_notification_tx, status_notification_rx) = mpsc::unbounded_channel();
72
73        let label = label.to_string();
74
75        #[cfg(feature = "progress-bar")]
76        let connection_bar = howudoin::new_root().label(label.clone());
77
78        Self {
79            count: 0,
80            reserved_count: 0,
81            limit,
82            label: label.into(),
83            status_notification_rx,
84            status_notification_tx,
85            #[cfg(feature = "progress-bar")]
86            connection_bar,
87        }
88    }
89
90    /// Create and return a new [`ConnectionTracker`], and add 1 to this counter.
91    ///
92    /// When the returned tracker is dropped, this counter will be notified, and decreased by 1.
93    pub fn track_connection(&mut self) -> ConnectionTracker {
94        ConnectionTracker::new(self)
95    }
96
97    /// Check for closed connection notifications, and return the current connection count.
98    pub fn update_count(&mut self) -> usize {
99        let previous_connections = self.count;
100
101        // We ignore errors here:
102        // - TryRecvError::Empty means that there are no pending close notifications
103        // - TryRecvError::Closed is unreachable, because we hold a sender
104        while let Ok(status) = self.status_notification_rx.try_recv() {
105            match status {
106                ConnectionStatus::Opened => {
107                    self.reserved_count -= 1;
108                    self.count += 1;
109
110                    debug!(
111                        open_connections = ?self.count,
112                        ?previous_connections,
113                        limit = ?self.limit,
114                        label = ?self.label,
115                        "a peer connection was opened",
116                    );
117                }
118                ConnectionStatus::Closed => {
119                    self.count -= 1;
120
121                    debug!(
122                        open_connections = ?self.count,
123                        ?previous_connections,
124                        limit = ?self.limit,
125                        label = ?self.label,
126                        "a peer connection was closed",
127                    );
128                }
129            }
130        }
131
132        trace!(
133            open_connections = ?self.count,
134            ?previous_connections,
135            limit = ?self.limit,
136            label = ?self.label,
137            "updated active connection count",
138        );
139
140        #[cfg(feature = "progress-bar")]
141        self.connection_bar
142            .set_pos(u64::try_from(self.count).expect("fits in u64"));
143        // .set_len(u64::try_from(self.limit).expect("fits in u64"));
144
145        self.count + self.reserved_count
146    }
147}
148
149impl Drop for ActiveConnectionCounter {
150    fn drop(&mut self) {
151        #[cfg(feature = "progress-bar")]
152        self.connection_bar.close();
153    }
154}
155
156/// A per-connection tracker.
157///
158/// [`ActiveConnectionCounter`] creates a tracker instance for each active connection.
159/// When these trackers are dropped, the counter gets notified.
160pub struct ConnectionTracker {
161    /// The channel used to send open connection status notifications on first response or
162    /// closed connection notifications on drop.
163    status_notification_tx: mpsc::UnboundedSender<ConnectionStatus>,
164
165    /// A flag indicating whether this connection tracker has sent a notification that the
166    /// connection has been opened and that another notification should not be sent on drop.
167    has_marked_open: bool,
168
169    /// The label for this connection counter, typically its type.
170    label: Arc<str>,
171}
172
173impl fmt::Debug for ConnectionTracker {
174    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
175        f.debug_tuple("ConnectionTracker")
176            .field(&self.label)
177            .finish()
178    }
179}
180
181impl ConnectionTracker {
182    /// Sends a notification to the status notification channel to count the connection as open, and
183    /// marks this [`ConnectionTracker`] as having sent that open notification to avoid double-counting.
184    pub fn mark_open(&mut self) {
185        if !self.has_marked_open {
186            let _ = self.status_notification_tx.send(ConnectionStatus::Opened);
187            self.has_marked_open = true;
188        }
189    }
190
191    /// Create and return a new active connection tracker, and add 1 to `counter`.
192    /// All connection trackers share a label with their connection counter.
193    ///
194    /// When the returned tracker is dropped, `counter` will be notified, and decreased by 1.
195    fn new(counter: &mut ActiveConnectionCounter) -> Self {
196        counter.reserved_count += 1;
197
198        debug!(
199            open_connections = ?counter.count,
200            limit = ?counter.limit,
201            label = ?counter.label,
202            "opening a new peer connection",
203        );
204
205        Self {
206            status_notification_tx: counter.status_notification_tx.clone(),
207            has_marked_open: false,
208            label: counter.label.clone(),
209        }
210    }
211}
212
213impl Drop for ConnectionTracker {
214    /// Notifies the corresponding connection counter that the connection has closed.
215    fn drop(&mut self) {
216        debug!(label = ?self.label, "closing a peer connection");
217
218        // We ignore disconnected errors, because the receiver can be dropped
219        // before some connections are dropped.
220        //
221        // # Security
222        //
223        // This channel is actually bounded by the inbound and outbound connection limit.
224        //
225        // # Correctness
226        //
227        // Unopened connections should be opened just before being closed to decrement both
228        // the count for pending connection attempts and the count for open connections in
229        // the active connection tracker.
230        self.mark_open();
231        let _ = self.status_notification_tx.send(ConnectionStatus::Closed);
232    }
233}