Skip to main content

zebra_state/service/
watch_receiver.rs

1//! Shared [`tokio::sync::watch`] channel wrappers.
2//!
3//! These wrappers help use watch channels correctly.
4
5use tokio::sync::watch;
6
7/// Efficient access to state data via a [`tokio`] [`watch::Receiver`] channel,
8/// while avoiding deadlocks.
9///
10/// Returns data from the most recent state,
11/// regardless of how many times you call its methods.
12///
13/// Cloned instances provide identical state data.
14///
15/// # Correctness
16///
17/// To avoid deadlocks, see the correctness note on [`WatchReceiver::with_watch_data`].
18///
19/// # Note
20///
21/// If a lot of blocks are committed at the same time,
22/// the watch channel will skip some block updates,
23/// even though those updates were committed to the state.
24#[derive(Clone, Debug)]
25pub struct WatchReceiver<T> {
26    /// The receiver for the current state data.
27    receiver: watch::Receiver<T>,
28}
29
30impl<T> WatchReceiver<T> {
31    /// Create a new [`WatchReceiver`] from a watch channel receiver.
32    pub fn new(receiver: watch::Receiver<T>) -> Self {
33        Self { receiver }
34    }
35}
36
37impl<T> WatchReceiver<T>
38where
39    T: Clone,
40{
41    /// Maps the current data `T` to `U` by applying a function to the watched value,
42    /// while holding the receiver lock as briefly as possible.
43    ///
44    /// This helper method is a shorter way to borrow the value from the [`watch::Receiver`] and
45    /// extract some information from it.
46    ///
47    /// Does not mark the watched data as seen.
48    ///
49    /// # Performance
50    ///
51    /// A single read lock is acquired to clone `T`, and then released after the
52    /// clone. To make this clone efficient, large or expensive `T` can be
53    /// wrapped in an [`std::sync::Arc`]. (Or individual fields can be wrapped
54    /// in an [`std::sync::Arc`].)
55    ///
56    /// # Correctness
57    ///
58    /// To prevent deadlocks:
59    ///
60    /// - `receiver.borrow()` should not be called before this method while in the same scope.
61    ///
62    /// It is important to avoid calling `borrow` more than once in the same scope, which
63    /// effectively tries to acquire two read locks to the shared data in the watch channel. If
64    /// that is done, there's a chance that the [`watch::Sender`] tries to send a value, which
65    /// starts acquiring a write-lock, and prevents further read-locks from being acquired until
66    /// the update is finished.
67    ///
68    /// What can happen in that scenario is:
69    ///
70    /// 1. The receiver manages to acquire a read-lock for the first `borrow`
71    /// 2. The sender starts acquiring the write-lock
72    /// 3. The receiver fails to acquire a read-lock for the second `borrow`
73    ///
74    /// Now both the sender and the receivers hang, because the sender won't release the lock until
75    /// it can update the value, and the receiver won't release its first read-lock until it
76    /// acquires the second read-lock and finishes what it's doing.
77    pub fn with_watch_data<U, F>(&self, f: F) -> U
78    where
79        F: FnOnce(T) -> U,
80    {
81        // Make sure that the borrow's watch channel read lock
82        // is dropped before the closure is executed.
83        //
84        // Without this change, an eager reader can repeatedly block the channel writer.
85        // This seems to happen easily in RPC & ReadStateService futures.
86        // (For example, when lightwalletd syncs from Zebra, while Zebra syncs from peers.)
87        let cloned_data = self.cloned_watch_data();
88
89        f(cloned_data)
90    }
91
92    /// Calls the provided closure with the watch data in the channel
93    /// and returns the output.
94    ///
95    /// Does not mark the watched data as seen.
96    ///
97    /// The closure provided to this method will hold a read lock,
98    /// callers are expected to ensure any closures they provide
99    /// will promptly drop the read lock.
100    pub fn borrow_mapped<U: 'static, F>(&self, f: F) -> U
101    where
102        F: FnOnce(watch::Ref<T>) -> U,
103    {
104        f(self.receiver.borrow())
105    }
106
107    /// Returns a clone of the watch data in the channel.
108    /// Cloning the watched data helps avoid deadlocks.
109    ///
110    /// Does not mark the watched data as seen.
111    ///
112    /// See `with_watch_data()` for details.
113    pub fn cloned_watch_data(&self) -> T {
114        self.receiver.borrow().clone()
115    }
116
117    /// Calls [`watch::Receiver::changed()`] and returns the result.
118    /// Returns when the inner value has been updated, even if the old and new values are equal.
119    ///
120    /// Marks the watched data as seen.
121    pub async fn changed(&mut self) -> Result<(), watch::error::RecvError> {
122        self.receiver.changed().await
123    }
124
125    /// Calls [`watch::Receiver::has_changed()`] and returns the result.
126    /// Returns `true` when the inner value has been updated, even if the old and new values are equal.
127    ///
128    /// Does not mark the watched data as seen.
129    pub fn has_changed(&self) -> Result<bool, watch::error::RecvError> {
130        self.receiver.has_changed()
131    }
132
133    /// Marks the watched data as seen.
134    pub fn mark_as_seen(&mut self) {
135        self.receiver.borrow_and_update();
136    }
137
138    /// Marks the watched data as unseen.
139    /// Calls [`watch::Receiver::mark_changed()`].
140    pub fn mark_changed(&mut self) {
141        self.receiver.mark_changed();
142    }
143}