Backpressure Policies — Controlling Flow

A stream is a pipeline: a producer emits data, operators transform it, a consumer processes it. Problems arise when the producer is faster than the consumer. Backpressure is the mechanism that handles this mismatch.

The Problem

Producer: emits 10,000 events/sec
Consumer: processes 1,000 events/sec

What happens to the 9,000 surplus events per second?

Your options are: block the producer, drop events, or buffer them. Each is correct in different contexts. id_effect makes the choice explicit via BackpressurePolicy.

BackpressurePolicy

#![allow(unused)]
fn main() {
use id_effect::BackpressurePolicy;

// Block the producer until the consumer catches up (default for bounded channels)
let block = BackpressurePolicy::Block;

// Drop the newest events when the buffer is full
let drop_latest = BackpressurePolicy::DropLatest;

// Drop the oldest events when the buffer is full (keep the freshest data)
let drop_oldest = BackpressurePolicy::DropOldest;

// Unbounded buffering — never drop, never block (use carefully)
let unbounded = BackpressurePolicy::Unbounded;
}

Applying a Policy to a Channel-Backed Stream

The most common place to specify backpressure is when bridging from a channel to a Stream:

#![allow(unused)]
fn main() {
use id_effect::{stream_from_channel_with_policy, BackpressurePolicy};
use std::sync::mpsc;

let (tx, rx) = mpsc::channel::<Event>(); // Event: your application's event type

// Drop old events; always reflect the latest state
let stream = stream_from_channel_with_policy(rx, 1024, BackpressurePolicy::DropOldest);
}

Contrast this with stream_from_channel, which uses Block by default. If you haven't yet thought about backpressure, Block is the safe choice: you won't lose data, but a slow consumer will slow the producer down.

Choosing a Policy

Scenario                                              Policy
Financial transactions — no data loss acceptable      Block
Real-time sensor readings — only the latest matters   DropOldest
Log pipeline — drop excess if overwhelmed             DropLatest
Batch import — control memory, halt on overflow       Block
Dashboard metrics — fresh data over completeness      DropOldest

Stream-Level Backpressure

Streams composed with flat_map or merge also have implicit backpressure: downstream operators signal upstream when they can accept more work. This happens automatically and doesn't require a policy setting — the Stream runtime handles it.

For explicit control over concurrency in flat_map:

#![allow(unused)]
fn main() {
let records = stream
    .flat_map_with_concurrency(4, |id| fetch_record(id));
// Only 4 fetch_record effects run concurrently;
// others wait until a slot frees — natural backpressure
}

Monitoring Drops

When using DropLatest or DropOldest, you often want to know how many events were dropped:

#![allow(unused)]
fn main() {
use std::sync::atomic::Ordering;
use std::time::Duration;

let (stream, dropped_counter) = stream_from_channel_with_policy_and_counter(
    rx,
    1024,
    BackpressurePolicy::DropOldest,
);

// Periodically report drops; swap(0, _) reads and resets the counter
// in one step, so each report covers only the interval since the last
effect! {
    loop {
        let n = dropped_counter.swap(0, Ordering::Relaxed);
        if n > 0 {
            ~ log.warn(format!("Dropped {n} events due to backpressure"));
        }
        ~ sleep(Duration::from_secs(10));
    }
}
}

Summary

Always choose a backpressure policy explicitly. The default (Block) is safe but can stall producers. DropOldest is usually right for real-time data, where only the latest value matters. DropLatest is right when the events already queued matter more than new arrivals. Unbounded is acceptable only when the producer's rate is genuinely bounded by the domain.