Sinks — Consuming Streams
A Stream describes a sequence of values. A Sink describes how to consume them. Together they form a complete pipeline: Stream → operators → Sink.
Built-in Sinks
Most of the time you don't write a Sink explicitly — you use one of the consuming methods on Stream:
#![allow(unused)] fn main() { // Collect all elements into a Vec let users: Effect<Vec<User>, DbError, Db> = all_users().collect(); // Fold into a single value let total: Effect<u64, DbError, Db> = orders() .fold(0u64, |acc, order| acc + order.amount); // Run a side-effecting action for each element let logged: Effect<(), DbError, Db> = events() .for_each(|event| log_event(event)); // Drain (discard all values, run for side effects only) let drained: Effect<(), DbError, Db> = events() .map(|e| emit_metric(e)) .drain(); // Take the first N elements let first_ten: Effect<Vec<User>, DbError, Db> = all_users() .take(10) .collect(); }
Each of these methods turns a Stream<A, E, R> into an Effect<B, E, R>, which you can then run with run_blocking or compose further.
The Sink Trait
When the built-in consumers aren't enough, implement Sink:
#![allow(unused)] fn main() { use id_effect::{Sink, Chunk, Effect}; struct CsvWriter { path: PathBuf, written: usize, } impl Sink<Record> for CsvWriter { type Error = IoError; type Env = (); fn on_chunk( &mut self, chunk: Chunk<Record>, ) -> Effect<(), IoError, ()> { effect! { for record in &chunk { ~ self.write_csv_line(record); } () } } fn on_done(&mut self) -> Effect<(), IoError, ()> { effect! { ~ self.flush(); () } } } }
on_chunk is called for each chunk of elements. on_done is called once when the stream ends — use it to flush buffers or close handles.
Running a Stream into a Sink
#![allow(unused)] fn main() { let writer = CsvWriter::new("output.csv"); let effect: Effect<(), IoError, Db> = all_records() .run_into_sink(writer); run_blocking(effect, env)?; }
run_into_sink drives the stream and feeds each chunk to the sink. If the stream fails, on_done is not called — use resource scopes around the sink when cleanup is unconditionally required.
Sink Composition
Sinks can be composed: a ZipSink feeds the same stream to two sinks simultaneously:
#![allow(unused)] fn main() { let count_sink = CountSink::new(); let csv_sink = CsvWriter::new("out.csv"); // Both sinks receive every element let combined = ZipSink::new(count_sink, csv_sink); all_records().run_into_sink(combined) }
Each element is delivered to both sinks in order. If either sink fails, the whole pipeline fails.
Finite vs Infinite Streams and Sinks
A Sink doesn't know whether its stream is finite or infinite. Combine with take, take_while, or take_until to bound an infinite stream before running it into a sink:
#![allow(unused)] fn main() { // Process at most 1 hour of events let one_hour = Duration::from_secs(3600); event_stream() .take_until(sleep(one_hour)) .for_each(|event| process(event)) }
Summary
| Method | Returns | Use when |
|---|---|---|
.collect() | Effect<Vec<A>, …> | Small result sets that fit in memory |
.fold(init, f) | Effect<B, …> | Single aggregated value |
.for_each(f) | Effect<(), …> | Side effects per element |
.drain() | Effect<(), …> | Discard results, keep side effects |
.run_into_sink(s) | Effect<(), …> | Custom consumption logic |