Summary

Zebra programmers need to carefully write async code so it doesn't deadlock or hang. This is particularly important for poll, select, Buffer, Batch, and Mutex.

Zebra executes concurrent tasks using async Rust, with the tokio executor.

At a higher level, Zebra also uses tower::Services, tower::Buffers, and our own tower-batch-control implementation.

Motivation

Like all concurrent codebases, Zebra needs to obey certain constraints to avoid hangs. Unfortunately, Rust's tooling in these areas is still developing. So Zebra developers need to manually check these constraints during design, development, reviews, and testing.

Definitions

  • hang: a Zebra component stops making progress.
  • constraint: a rule that Zebra must follow to prevent hangs.
  • CORRECTNESS comment: the documentation for a constraint in Zebra's code.
  • task: an async task can execute code independently of other tasks, using cooperative multitasking.
  • contention: slower execution because multiple tasks are waiting to acquire a lock, buffer/batch slot, or readiness.
  • missed wakeup: a task hangs because it is never scheduled for wakeup.
  • lock: exclusive access to a shared resource. Locks stop other code from running until they are released. For example, a mutex, buffer slot, or service readiness.
  • critical section: code that is executed while holding a lock.
  • deadlock: a hang that stops an async task executing code, because it is waiting for a lock, slot, or task readiness. For example: a task is waiting for a service to be ready, but the service readiness depends on that task making progress.
  • starvation or livelock: a hang that executes code, but doesn't do anything useful. For example: a loop never terminates.

Guide-level explanation

If you are designing, developing, or testing concurrent Zebra code, follow the patterns in these examples to avoid hangs.

If you are reviewing concurrent Zebra designs or code, make sure that:

  • it is clear how the design or code avoids hangs
  • the design or code follows the patterns in these examples (as much as possible)
  • the concurrency constraints and risks are documented

The Reference section contains in-depth background information about Rust async concurrency in Zebra.

Here are some examples of concurrent designs and documentation in Zebra:

Registering Wakeups Before Returning Poll::Pending

To avoid missed wakeups, futures must schedule a wakeup before they return Poll::Pending. For more details, see the Poll::Pending and Wakeups section.

Zebra's unready_service.rs uses the ready! macro to correctly handle Poll::Pending from the inner service.

You can see some similar constraints in pull request #1954.

// CORRECTNESS
//
// The current task must be scheduled for wakeup every time we return
// `Poll::Pending`.
//
// `ready!` returns `Poll::Pending` when the service is unready, and
// the inner `poll_ready` schedules this task for wakeup.
//
// `cancel.poll` also schedules this task for wakeup if it is canceled.
let res = ready!(this
    .service
    .as_mut()
    .expect("poll after ready")
    .poll_ready(cx));

Futures-Aware Mutexes

To avoid hangs or slowdowns, prefer futures-aware types, particularly for complex waiting or locking code. But in some simple cases, std::sync::Mutex is more efficient. For more details, see the Futures-Aware Types section.

Zebra's Handshake won't block other tasks on its thread, because it uses futures::lock::Mutex:

pub async fn negotiate_version(
    peer_conn: &mut Framed<TcpStream, Codec>,
    addr: &SocketAddr,
    config: Config,
    nonces: Arc<futures::lock::Mutex<HashSet<Nonce>>>,
    user_agent: String,
    our_services: PeerServices,
    relay: bool,
) -> Result<(Version, PeerServices), HandshakeError> {
    // Create a random nonce for this connection
    let local_nonce = Nonce::default();
    // # Correctness
    //
    // It is ok to wait for the lock here, because handshakes have a short
    // timeout, and the async mutex will be released when the task times
    // out.
    nonces.lock().await.insert(local_nonce);

    ...
}

Zebra's Inbound service can't use an async-aware mutex for its AddressBook, because the mutex is shared with non-async code. It only holds the mutex to clone the address book, reducing the amount of time that other tasks on its thread are blocked:

// # Correctness
//
// Briefly hold the address book threaded mutex while
// cloning the address book. Then sanitize after releasing
// the lock.
let peers = address_book.lock().unwrap().clone();
let mut peers = peers.sanitized();

Avoiding Deadlocks when Acquiring Buffer or Service Readiness

To avoid deadlocks, readiness and locks must be acquired in a consistent order. For more details, see the Acquiring Buffer Slots, Mutexes, or Readiness section.

Zebra's ChainVerifier avoids deadlocks, contention, and errors by:

  • calling poll_ready before each call
  • acquiring buffer slots for the earlier verifier first (based on blockchain order)
  • ensuring that buffers are large enough for concurrent tasks

// We acquire checkpoint readiness before block readiness, to avoid an unlikely
// hang during the checkpoint to block verifier transition. If the checkpoint and
// block verifiers are contending for the same buffer/batch, we want the checkpoint
// verifier to win, so that checkpoint verification completes, and block verification
// can start. (Buffers and batches have multiple slots, so this contention is unlikely.)
//
// The chain verifier holds one slot in each verifier, for each concurrent task.
// Therefore, any shared buffers or batches polled by these verifiers should double
// their bounds. (For example, the state service buffer.)
ready!(self
    .checkpoint
    .poll_ready(cx)
    .map_err(VerifyChainError::Checkpoint))?;
ready!(self.block.poll_ready(cx).map_err(VerifyChainError::Block))?;
Poll::Ready(Ok(()))

Critical Section Compiler Errors

To avoid deadlocks or slowdowns, critical sections should be as short as possible, and they should not depend on any other tasks. For more details, see the Acquiring Buffer Slots, Mutexes, or Readiness section.

Zebra's CandidateSet must release a std::sync::Mutex lock before awaiting a tokio::time::Sleep future. This ensures that the threaded mutex lock isn't held over the await point.

If the lock isn't dropped, compilation fails, because the mutex lock can't be sent between threads.

// # Correctness
//
// In this critical section, we hold the address mutex, blocking the
// current thread, and all async tasks scheduled on that thread.
//
// To avoid deadlocks, the critical section:
// - must not acquire any other locks
// - must not await any futures
//
// To avoid hangs, any computation in the critical section should
// be kept to a minimum.
let reconnect = {
    let mut guard = self.address_book.lock().unwrap();
    ...
    let reconnect = guard.reconnection_peers().next()?;

    let reconnect = MetaAddr::new_reconnect(&reconnect.addr, &reconnect.services);
    guard.update(reconnect);
    reconnect
};

// SECURITY: rate-limit new candidate connections
sleep.await;

Sharing Progress between Multiple Futures

To avoid starvation and deadlocks, tasks that depend on multiple futures should make progress on all of those futures. This is particularly important for tasks that depend on their own outputs. For more details, see the Unbiased Selection section.

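Zebra's peer crawler task avoids starvation and deadlocks by spawning a separate task for each handshake, and by using the select! macro to share progress between all of its ready futures.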

You can see a range of hang fixes in pull request #1950.

// CORRECTNESS
//
// To avoid hangs and starvation, the crawler must:
// - spawn a separate task for each handshake, so they can make progress
//   independently (and avoid deadlocking each other)
// - use the `select!` macro for all actions, because the `select` function
//   is biased towards the first ready future

loop {
    let crawler_action = tokio::select! {
        a = handshakes.next() => a,
        a = crawl_timer.next() => a,
        _ = demand_rx.next() => {
            if let Some(candidate) = candidates.next().await {
                // candidates.next has a short delay, and briefly holds the address
                // book lock, so it shouldn't hang
                DemandHandshake { candidate }
            } else {
                DemandCrawl
            }
        }
    };

    match crawler_action {
        DemandHandshake { candidate } => {
            // spawn each handshake into an independent task, so it can make
            // progress independently of the crawls
            let hs_join =
                tokio::spawn(dial(candidate, connector.clone()));
            handshakes.push(Box::pin(hs_join));
        }
        DemandCrawl => {
            // update has timeouts, and briefly holds the address book
            // lock, so it shouldn't hang
            candidates.update().await?;
        }
        // handle handshake responses and the crawl timer
    }
}

Prioritising Cancellation Futures

To avoid starvation, cancellation futures must take priority over other futures, if multiple futures are ready. For more details, see the Biased Selection section.

Zebra's connection.rs avoids hangs by prioritising the cancel and timer futures over the peer receiver future. Under heavy load, the peer receiver future could always be ready with a new message, starving the cancel or timer futures.

You can see a range of hang fixes in pull request #1950.

// CORRECTNESS
//
// Currently, select prefers the first future if multiple
// futures are ready.
//
// If multiple futures are ready, we want the cancellation
// to take priority, then the timeout, then peer responses.
let cancel = future::select(tx.cancellation(), timer_ref);
// `future::select` returns a future, so it must be awaited before matching
match future::select(cancel, peer_rx.next()).await {
    ...
}

Atomic Shutdown Flag

As of April 2021, Zebra implements some shutdown checks using an atomic bool.

Zebra's shutdown.rs avoids data races and missed updates by using the strongest memory ordering (SeqCst).

We plan to replace this raw atomic code with a channel (see #1678).

/// A flag to indicate if Zebra is shutting down.
///
/// Initialized to `false` at startup.
pub static IS_SHUTTING_DOWN: AtomicBool = AtomicBool::new(false);

/// Returns true if the application is shutting down.
pub fn is_shutting_down() -> bool {
    // ## Correctness:
    //
    // Since we're shutting down, and this is a one-time operation,
    // performance is not important. So we use the strongest memory
    // ordering.
    // https://doc.rust-lang.org/nomicon/atomics.html#sequentially-consistent
    IS_SHUTTING_DOWN.load(Ordering::SeqCst)
}

/// Sets the Zebra shutdown flag to `true`.
pub fn set_shutting_down() {
    IS_SHUTTING_DOWN.store(true, Ordering::SeqCst);
}

Integration Testing Async Code

Sometimes, it is difficult to unit test async code, because it has complex dependencies. For more details, see the Testing Async Code section.

zebrad's acceptance tests run short Zebra syncs on the Zcash mainnet or testnet. These acceptance tests make sure that zebrad can:

  • sync blocks using its async block download and verification pipeline
  • cancel a sync
  • reload disk state after a restart

These tests were introduced in pull request #1193.

/// Test if `zebrad` can sync some larger checkpoints on mainnet.
#[test]
fn sync_large_checkpoints_mainnet() -> Result<()> {
    let reuse_tempdir = sync_until(
        LARGE_CHECKPOINT_TEST_HEIGHT,
        Mainnet,
        STOP_AT_HEIGHT_REGEX,
        LARGE_CHECKPOINT_TIMEOUT,
        None,
    )?;

    // if stopping corrupts the rocksdb database, zebrad might hang or crash here
    // if stopping does not write the rocksdb database to disk, Zebra will
    // sync, rather than stopping immediately at the configured height
    sync_until(
        (LARGE_CHECKPOINT_TEST_HEIGHT - 1).unwrap(),
        Mainnet,
        "previous state height is greater than the stop height",
        STOP_ON_LOAD_TIMEOUT,
        Some(reuse_tempdir),
    )?;

    Ok(())
}

Instrumenting Async Functions

Sometimes, it is difficult to debug async code, because there are many tasks running concurrently. For more details, see the Monitoring Async Code section.

Zebra instruments some of its async functions using tracing. Here's an instrumentation example from Zebra's sync block downloader:

/// Queue a block for download and verification.
///
/// This method waits for the network to become ready, and returns an error
/// only if the network service fails. It returns immediately after queuing
/// the request.
#[instrument(level = "debug", skip(self), fields(%hash))]
pub async fn download_and_verify(&mut self, hash: block::Hash) -> Result<(), Report> {
    ...
}

Tracing and Metrics in Async Functions

Sometimes, it is difficult to monitor async code, because there are many tasks running concurrently. For more details, see the Monitoring Async Code section.

Zebra's client requests are monitored via:

  • trace and debug logs using the tracing crate
  • related work spans using the tracing crate
  • counters using the metrics crate

/// Handle an incoming client request, possibly generating outgoing messages to the
/// remote peer.
///
/// NOTE: the caller should use .instrument(msg.span) to instrument the function.
async fn handle_client_request(&mut self, req: InProgressClientRequest) {
    trace!(?req.request);

    let InProgressClientRequest { request, tx, span } = req;

    if tx.is_canceled() {
        metrics::counter!("peer.canceled", 1);
        tracing::debug!("ignoring canceled request");
        return;
    }
    ...
}

Reference-level explanation

The sections below contain in-depth information about concurrency in Zebra.

Most Zebra designs or code changes will only touch on one or two of these areas.

After an await, the rest of the Future might not be run

Futures can be "canceled" at any await point. Authors of futures must be aware that after an await, the code might not run. Futures might be polled to completion causing the code to work. But then many years later, the code is changed and the future might conditionally not be polled to completion which breaks things. The burden falls on the user of the future to poll to completion, and there is no way for the lib author to enforce this - they can only document this invariant.

https://github.com/rust-lang/wg-async-foundations/blob/master/src/vision/submitted_stories/status_quo/alan_builds_a_cache.md#-frequently-asked-questions

In particular, FutureExt::now_or_never:

  • drops the future, and
  • doesn't schedule the task for wakeups.

So even if the future or service passed to now_or_never is cloned, the task won't be awoken when it is ready again.
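The cancellation behaviour described in this section can be sketched with the standard library alone. In this minimal example, update_after_await, NoopWaker, and poll_once_then_drop are hypothetical names, and std::future::pending stands in for any await point that is not ready on the first poll. Polling once with a waker that does nothing and then dropping the future is, as far as we know, roughly what now_or_never does:

```rust
use std::cell::Cell;
use std::future::Future;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

/// A waker that does nothing, so the task is never scheduled again.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical future: it records completion only after its await point.
async fn update_after_await(done: Rc<Cell<bool>>) {
    // Stands in for any await point that is not ready on the first poll.
    std::future::pending::<()>().await;
    // This line only runs if the caller keeps polling after the await.
    done.set(true);
}

/// Polls the future once with a no-op waker, then drops it.
/// Returns whether the code after the await ran.
fn poll_once_then_drop() -> bool {
    let done = Rc::new(Cell::new(false));
    let mut future = Box::pin(update_after_await(Rc::clone(&done)));

    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);

    let first_poll = future.as_mut().poll(&mut cx);
    assert!(first_poll.is_pending());

    // Dropping the future cancels it at the await point.
    drop(future);
    done.get()
}
```

Calling poll_once_then_drop() returns false: the code after the await never runs, and nothing in the future's own body can detect or prevent that.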

Task starvation

Tokio tasks are scheduled cooperatively:

a task is allowed to run until it yields, indicating to the Tokio runtime’s scheduler that it cannot currently continue executing. When a task yields, the Tokio runtime switches to executing the next task.

If a task doesn't yield during a CPU-intensive operation, or a tight loop, it can starve other tasks on the same thread. This can cause hangs or timeouts.

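There are a few different ways to avoid task starvation, for example running the operation on a blocking thread (tokio::task::spawn_blocking or tokio::task::block_in_place), or yielding manually (tokio::task::yield_now).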
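As an illustration of manual yielding, here is a minimal std-only sketch. YieldNow, sum_in_chunks, and run_and_count_polls are hypothetical names, not Zebra or tokio code. In Zebra's tokio setting, the yield point would more likely be tokio::task::yield_now, and CPU-heavy work could instead be moved off the async threads with tokio::task::spawn_blocking.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a runtime's yield function.
struct YieldNow {
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            return Poll::Ready(());
        }
        self.yielded = true;
        // CORRECTNESS: schedule a wakeup before returning `Poll::Pending`.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Sums `items`, yielding to the executor after each chunk.
async fn sum_in_chunks(items: &[u64], chunk_size: usize) -> u64 {
    let mut total = 0;
    for chunk in items.chunks(chunk_size) {
        total += chunk.iter().sum::<u64>();
        YieldNow { yielded: false }.await;
    }
    total
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Minimal driver: re-polls until the future is ready, and counts the polls.
/// It ignores wakeups, which is only acceptable here because `YieldNow` is
/// always ready on its next poll. A real executor would run other tasks
/// between polls.
fn run_and_count_polls<F: Future>(future: F) -> (F::Output, usize) {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(output) = future.as_mut().poll(&mut cx) {
            return (output, polls);
        }
    }
}

fn sum_example() -> (u64, usize) {
    let items: Vec<u64> = (1..=10).collect();
    run_and_count_polls(sum_in_chunks(&items, 4))
}
```

With ten items and a chunk size of four, the future yields three times, so the driver polls it four times before it returns the sum. Each of those yields is a point where other tasks on the same thread could run.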

Poll::Pending and Wakeups

When returning Poll::Pending, poll functions must ensure that the task will be woken up when it is ready to make progress.

In most cases, the poll function calls another poll function that schedules the task for wakeup.

Any code that generates a new Poll::Pending should either have:

  • a CORRECTNESS comment explaining how the task is scheduled for wakeup, or
  • a wakeup implementation, with tests to ensure that the wakeup functions as expected.

Note: poll functions often have a qualifier, like poll_ready or poll_next.
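For code that generates its own Poll::Pending, the wakeup rule can be sketched as follows. This is a minimal std-only example under stated assumptions: ValueFuture, Shared, ThreadWaker, and block_on are hypothetical names, and a background thread stands in for whatever event makes the future ready.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::Duration;

/// State shared between the future and the thread that completes it.
#[derive(Default)]
struct Shared {
    value: Option<u32>,
    waker: Option<Waker>,
}

/// Hypothetical future that resolves once another thread provides a value.
struct ValueFuture {
    shared: Arc<Mutex<Shared>>,
}

impl Future for ValueFuture {
    type Output = u32;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        let mut shared = self.shared.lock().unwrap();
        if let Some(value) = shared.value.take() {
            return Poll::Ready(value);
        }
        // CORRECTNESS
        //
        // Store the waker before returning `Poll::Pending`, while still
        // holding the lock, so the producer can't set the value in between
        // and miss the wakeup.
        shared.waker = Some(cx.waker().clone());
        Poll::Pending
    }
}

/// Wakes the blocked executor thread.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal single-future executor: parks the thread until it is woken.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => thread::park(),
        }
    }
}

fn wakeup_example() -> u32 {
    let shared = Arc::new(Mutex::new(Shared::default()));
    let producer_state = Arc::clone(&shared);
    let producer = thread::spawn(move || {
        thread::sleep(Duration::from_millis(20));
        let mut state = producer_state.lock().unwrap();
        state.value = Some(42);
        if let Some(waker) = state.waker.take() {
            waker.wake();
        }
    });
    let value = block_on(ValueFuture { shared });
    producer.join().unwrap();
    value
}
```

If the line that stores the waker were removed, block_on would park forever after the first poll. That is the missed wakeup described in the definitions.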

Futures-Aware Types

Prefer futures-aware types in complex locking or waiting code, rather than types which will block the current thread.

For example:

  • Use futures::lock::Mutex rather than std::sync::Mutex
  • Use tokio::time::{sleep, timeout} rather than std::thread::sleep

Always qualify ambiguous names like Mutex and sleep, so that it is obvious when a call will block.

If you are unable to use futures-aware types:

  • block the thread for as short a time as possible
  • document the correctness of each blocking call
  • consider re-designing the code to use tower::Services, or other futures-aware types

In some simple cases, std::sync::Mutex is correct and more efficient, when:

  • the value behind the mutex is just data, and
  • the locking behaviour is simple.

In these cases:

wrap the Arc<Mutex<...>> in a struct that provides non-async methods for performing operations on the data within, and only lock the mutex inside these methods

For more details, see the tokio documentation.
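The wrapper pattern quoted above might look like the following sketch. SharedNonces and its methods are hypothetical names chosen for illustration, not Zebra's actual types. The point is that every lock is taken and released inside a short non-async method, so a guard can never be held across an await.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

/// Hypothetical wrapper around a set of nonces shared between tasks.
#[derive(Clone, Default)]
struct SharedNonces {
    inner: Arc<std::sync::Mutex<HashSet<u64>>>,
}

impl SharedNonces {
    /// Inserts a nonce, returning `true` if it was not already present.
    ///
    /// The threaded mutex is only held for the duration of this call.
    fn insert(&self, nonce: u64) -> bool {
        self.inner.lock().unwrap().insert(nonce)
    }

    /// Returns `true` if the nonce is in the set.
    fn contains(&self, nonce: u64) -> bool {
        self.inner.lock().unwrap().contains(&nonce)
    }
}

fn nonce_example() -> (bool, bool, bool) {
    let nonces = SharedNonces::default();
    // Clones share the same underlying set.
    let other_handle = nonces.clone();

    let first = nonces.insert(7);
    let duplicate = other_handle.insert(7);
    (first, duplicate, nonces.contains(7))
}

// The unqualified `Mutex` import is used only to show that the field above
// is deliberately written with its full path.
#[allow(dead_code)]
type ThreadedMutex<T> = Mutex<T>;
```

Async code can call these methods freely, because none of them awaits while the lock is held. The field type is written as std::sync::Mutex in full, following the advice above to qualify ambiguous names.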

Acquiring Buffer Slots, Mutexes, or Readiness

Ideally, buffer slots, mutexes, or readiness should be:

  • acquired with one lock per critical section, and
  • held for as short a time as possible.

If multiple locks are required for a critical section, acquire them in the same order any time those locks are used. If tasks acquire multiple locks in different orders, they can deadlock, each holding a lock that the other needs.

If a buffer, mutex, future, or service has complex readiness dependencies, schedule those dependencies as separate tasks using tokio::spawn. Otherwise, it might deadlock due to a dependency loop within a single executor task.

Carefully read the documentation of the channel methods you call, to check if they lock. For example, tokio::sync::watch::Receiver::borrow holds a read lock, so the borrowed data should always be cloned. Use Arc for efficient clones if needed.

Never have two active watch borrow guards in the same scope, because that can cause a deadlock. The watch::Sender may start acquiring a write lock while the first borrow guard is active but the second one isn't. That means that the first read lock was acquired, but the second never will be because starting to acquire the write lock blocks any other read locks from being acquired. At the same time, the write lock will also never finish acquiring, because it waits for all read locks to be released, and the first read lock won't be released before the second read lock is acquired.

In all of these cases:

  • make critical sections as short as possible, and
  • do not depend on other tasks or locks inside the critical section.
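The consistent lock ordering rule can be illustrated with threaded mutexes from the standard library. This is a sketch only: Accounts and transfer are hypothetical names, and the same ordering discipline applies to buffer slots and service readiness.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical pair of balances, each behind its own lock.
struct Accounts {
    first: Mutex<i64>,
    second: Mutex<i64>,
}

impl Accounts {
    /// Moves `amount` from `first` to `second` (or back, if negative).
    fn transfer(&self, amount: i64) {
        // CORRECTNESS
        //
        // Always lock `first` before `second`, regardless of the direction
        // of the transfer. If one caller locked `second` first, two threads
        // could each hold one lock while waiting for the other.
        let mut first = self.first.lock().unwrap();
        let mut second = self.second.lock().unwrap();
        *first -= amount;
        *second += amount;
        // Both guards are dropped here, ending the critical section.
    }
}

fn run_transfers() -> (i64, i64) {
    let accounts = Arc::new(Accounts {
        first: Mutex::new(100),
        second: Mutex::new(100),
    });

    // Two threads move funds in opposite directions at the same time.
    let handles: Vec<_> = vec![5_i64, -3_i64]
        .into_iter()
        .map(|amount| {
            let accounts = Arc::clone(&accounts);
            thread::spawn(move || {
                for _ in 0..1_000 {
                    accounts.transfer(amount);
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    let first = *accounts.first.lock().unwrap();
    let second = *accounts.second.lock().unwrap();
    (first, second)
}
```

Because both threads take the locks in the same order, the transfers finish without deadlocking, and the combined balance is preserved.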

Acquiring Service Readiness

Note: do not call poll_ready on multiple tasks, then match against the results. Use the ready! macro instead, to acquire service readiness in a consistent order.

Buffer and Batch

The constraints imposed by the tower::Buffer and tower::Batch implementations are:

  1. poll_ready must be called at least once for each call
  2. Once we've reserved a buffer slot, we always get Poll::Ready from a buffer, regardless of the current readiness of the buffer or its underlying service
  3. The Buffer/Batch capacity limits the number of concurrently waiting tasks. Once this limit is reached, further tasks will block, awaiting a free reservation.
  4. Some tasks can depend on other tasks before they resolve. (For example: block validation.) If there are task dependencies, the Buffer/Batch capacity must be larger than the maximum number of concurrently waiting tasks, or Zebra could deadlock (hang).

We also avoid hangs because:

  • the timeouts on network messages, block downloads, and block verification will restart verification if it hangs
  • Buffer and Batch release their reservations when the response future is returned by the buffered/batched service, even if the returned future hangs
    • in general, we should move as much work into futures as possible, unless the design requires sequential calls
  • larger Buffer/Batch bounds

Buffered Services

A service should be provided wrapped in a Buffer if:

  • it is a complex service
  • it has multiple callers, or
  • it has a single caller that calls it multiple times concurrently.

Services might also have other reasons for using a Buffer. These reasons should be documented.

Choosing Buffer Bounds

Zebra's Buffer bounds should be set to the maximum number of concurrent requests, plus 1:

it's advisable to set bound to be at least the maximum number of concurrent requests the Buffer will see https://docs.rs/tower/0.4.3/tower/buffer/struct.Buffer.html#method.new

The extra slot protects us from future changes that add an extra caller, or extra concurrency.

As a general rule, Zebra Buffers should all have at least 5 slots, because most Zebra services can be called concurrently by:

  • the sync service,
  • the inbound service, and
  • multiple concurrent zebra-client blockchain scanning tasks.

Services might also have other reasons for a larger bound. These reasons should be documented.

We should limit Buffer lengths for services whose requests or responses contain Blocks (or other large data items, such as Transaction vectors). A long Buffer full of Blocks can significantly increase memory usage.

For example, parsing a malicious 2 MB block can take up to 12 MB of RAM. So a 5 slot buffer can use 60 MB of RAM.
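The sizing rules and the memory estimate above can be written out as a small calculation. The helper names below are hypothetical. They combine the "maximum concurrent requests, plus 1" rule with the 5 slot minimum, and reproduce the 60 MB figure.

```rust
/// Minimum number of slots suggested for Zebra `Buffer`s in the text above.
const MIN_BUFFER_BOUND: usize = 5;

/// Hypothetical helper: the maximum number of concurrent requests, plus one
/// spare slot, but never less than the general minimum.
fn buffer_bound(max_concurrent_requests: usize) -> usize {
    (max_concurrent_requests + 1).max(MIN_BUFFER_BOUND)
}

/// Worst-case memory used by a full buffer, in megabytes.
fn worst_case_buffer_mb(slots: usize, worst_case_item_mb: usize) -> usize {
    slots * worst_case_item_mb
}

fn sizing_example() -> (usize, usize, usize) {
    (
        // Two concurrent callers still get the 5 slot minimum.
        buffer_bound(2),
        // Eight concurrent callers get one spare slot.
        buffer_bound(8),
        // Five slots, each holding a block that parses to 12 MB.
        worst_case_buffer_mb(5, 12),
    )
}
```

For services that carry Blocks, the second helper gives a quick upper bound to weigh against a larger buffer.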

Long Buffers can also increase request latency. Latency isn't a concern for Zebra's core use case as node software, but it might be an issue if wallets, exchanges, or block explorers want to use Zebra.

Awaiting Multiple Futures

When awaiting multiple futures, Zebra can use biased or unbiased selection.

Typically, we prefer unbiased selection, so that if multiple futures are ready, they each have a chance of completing. But if one of the futures needs to take priority (for example, cancellation), you might want to use biased selection.

Unbiased Selection

The futures::select! and tokio::select! macros select ready arguments at random by default.

To poll a tokio::select! in order, pass biased; as the first argument to the macro.

Also consider the FuturesUnordered stream for unbiased selection of a large number of futures. However, this macro and stream require mapping all arguments to the same type.

Consider mapping the returned type to a custom enum with module-specific names.

Biased Selection

The futures::select function is biased towards its first argument. If the first argument is always ready, the second argument will never be returned. (This behavior is not documented or guaranteed.) This bias can cause starvation or hangs. Consider edge cases where queues are full, or there are a lot of messages. If in doubt:

  • put shutdown or cancel oneshots first, then timers, then other futures
  • use the select! macro to ensure fairness

Select's bias can be useful to ensure that cancel oneshots and timers are always executed first. Consider the select_biased! macro and FuturesOrdered stream for guaranteed ordered selection of futures. (However, this macro and stream require mapping all arguments to the same type.)

The futures::select Either return type is complex, particularly when nested. This makes code hard to read and maintain. Map the Either to a custom enum.
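The priority order and the custom enum suggested above can be sketched without any external crates. This is not the futures crate's implementation of select. Action, poll_in_priority_order, and first_action are hypothetical names, and each future is polled at most once here, because std::future::ready must not be polled again after it completes.

```rust
use std::future::{self, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A custom enum with module-specific names, instead of nested `Either`s.
#[derive(Debug, PartialEq)]
enum Action {
    Cancel,
    Timeout,
    PeerMessage,
}

type BoxedFuture = Pin<Box<dyn Future<Output = Action>>>;

fn boxed(future: impl Future<Output = Action> + 'static) -> BoxedFuture {
    Box::pin(future)
}

/// Polls each future once, in slice order, and returns the first ready output.
///
/// Earlier futures always win, so cancellation should come first. Each future
/// that returns `Poll::Pending` has registered the waker in `cx`, so it is
/// correct to return `Poll::Pending` when none are ready.
fn poll_in_priority_order(futures: &mut [BoxedFuture], cx: &mut Context<'_>) -> Poll<Action> {
    for future in futures.iter_mut() {
        if let Poll::Ready(action) = future.as_mut().poll(cx) {
            return Poll::Ready(action);
        }
    }
    Poll::Pending
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Builds cancel, timer, and peer futures, then polls them once in that order.
/// The peer future is always ready, modelling a peer under heavy load.
fn first_action(cancel_ready: bool, timer_ready: bool) -> Poll<Action> {
    let ready_or_pending = |is_ready: bool, action: Action| {
        if is_ready {
            boxed(future::ready(action))
        } else {
            boxed(future::pending::<Action>())
        }
    };

    let mut futures = [
        ready_or_pending(cancel_ready, Action::Cancel),
        ready_or_pending(timer_ready, Action::Timeout),
        boxed(future::ready(Action::PeerMessage)),
    ];

    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    poll_in_priority_order(&mut futures, &mut cx)
}
```

With the peer future last, a peer that is always ready can't starve the cancel or timer futures. If the order were reversed, first_action would return PeerMessage every time.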

Replacing Atomics with Channels

If you're considering using atomics, prefer a safe, tested, portable abstraction, like tokio's watch or oneshot channels.

In Zebra, we try to use safe abstractions, and write obviously correct code. It takes a lot of effort to write, test, and maintain low-level code. Almost all of our performance-critical code is in cryptographic libraries. And our biggest performance gains from those libraries come from async batch cryptography.

We are gradually replacing atomics with channels in Zebra.

Atomic Risks

Some atomic sizes and atomic operations are not available on some platforms. Others come with a performance penalty on some platforms.

It's also easy to use a memory ordering that's too weak. Future code changes might require a stronger memory ordering. But it's hard to test for these kinds of memory ordering bugs.

Some memory ordering bugs can only be discovered on non-x86 platforms. And when they do occur, they can be rare. x86 processors guarantee strong orderings, even for Relaxed accesses. Since Zebra's CI all runs on x86 (as of June 2021), our tests get AcqRel orderings, even when we specify Relaxed. But ARM processors like the Apple M1 implement weaker memory orderings, including genuinely Relaxed access. For more details, see the hardware reordering section of the Rust nomicon.

But if a Zebra feature requires atomics:

  1. use an AtomicUsize with the strongest memory ordering (SeqCst)
  2. use a weaker memory ordering, with:
    • a correctness comment,
    • multithreaded tests with a concurrency permutation harness like loom, on x86 and ARM, and
    • benchmarks to prove that the low-level code is faster.

Tokio's watch channel uses SeqCst for reads and writes to its internal "version" atomic. So Zebra should do the same.

Testing Async Code

Zebra's existing acceptance and integration tests will catch most hangs and deadlocks.

Some tests are only run after merging to main. If a recently merged PR fails on main, we revert the PR, and fix the failure.

Some concurrency bugs only happen intermittently. Zebra developers should run regular full syncs to ensure that their code doesn't cause intermittent hangs. This is particularly important for code that modifies Zebra's highly concurrent crates:

  • zebrad
  • zebra-network
  • zebra-state
  • zebra-consensus
  • tower-batch-control
  • tower-fallback

Monitoring Async Code

Zebra uses the tracing and metrics crates for monitoring and diagnostics. These introspection tools are also useful during testing:

  • tracing logs individual events
    • spans track related work through the download and verification pipeline
  • metrics monitors overall progress and error rates
    • labels split counters or gauges into different categories (for example, by peer address)

Drawbacks

Implementing and reviewing these constraints creates extra work for developers. But concurrency bugs slow down every developer, and impact users. And diagnosing those bugs can take a lot of developer effort.

Unresolved questions

Can we catch these bugs using automated tests?

How can we diagnose these kinds of issues faster and more reliably?