Sinks — Consuming Streams

A Stream describes a sequence of values. A Sink describes how to consume them. Together they form a complete pipeline: Stream → operators → Sink.

Built-in Sinks

Most of the time you don't write a Sink explicitly — you use one of the consuming methods on Stream:

// Collect all elements into a Vec
let users: Effect<Vec<User>, DbError, Db> = all_users().collect();

// Fold into a single value
let total: Effect<u64, DbError, Db> = orders()
    .fold(0u64, |acc, order| acc + order.amount);

// Run a side-effecting action for each element
let logged: Effect<(), DbError, Db> = events()
    .for_each(|event| log_event(event));

// Drain (discard all values, run for side effects only)
let drained: Effect<(), DbError, Db> = events()
    .map(|e| emit_metric(e))
    .drain();

// Take the first N elements
let first_ten: Effect<Vec<User>, DbError, Db> = all_users()
    .take(10)
    .collect();

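Each consuming method (collect, fold, for_each, drain) turns a Stream<A, E, R> into an Effect<B, E, R>, which you can then run with run_blocking or compose further. take is different: it is an operator that returns a shorter Stream, which still needs a consumer such as collect.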
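The built-in consumers share one idea: walk the sequence once and reduce it to a single result. The sketch below illustrates that idea with std iterators standing in for Stream. The Order type and every function name are hypothetical, and nothing here is id_effect API.

```rust
// Stand-in model: a plain iterator plays the role of a Stream, and each
// function plays the role of one consuming method. All names are hypothetical.
#[derive(Debug, Clone, PartialEq)]
struct Order {
    amount: u64,
}

// Analogue of `.collect()`: keep every element in memory.
fn collect_all(orders: impl Iterator<Item = Order>) -> Vec<Order> {
    orders.collect()
}

// Analogue of `.fold(0u64, |acc, order| acc + order.amount)`.
fn total_amount(orders: impl Iterator<Item = Order>) -> u64 {
    orders.fold(0u64, |acc, order| acc + order.amount)
}

// Analogue of `.for_each(f)`: run an action per element, return nothing.
fn log_each(orders: impl Iterator<Item = Order>, log: &mut Vec<String>) {
    orders.for_each(|order| log.push(format!("order of {}", order.amount)));
}

// Analogue of `.take(n).collect()`: bound the sequence first, then consume it.
fn first_n(orders: impl Iterator<Item = Order>, n: usize) -> Vec<Order> {
    orders.take(n).collect()
}

// Small fixed data set used to exercise the functions above.
fn sample_orders() -> Vec<Order> {
    vec![Order { amount: 5 }, Order { amount: 7 }, Order { amount: 30 }]
}
```

One difference matters when reading this model: the iterator calls run immediately, whereas the Stream methods return an Effect that does nothing until it is run.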

The Sink Trait

When the built-in consumers aren't enough, implement Sink:

use std::path::PathBuf;

use id_effect::{Sink, Chunk, Effect};

struct CsvWriter {
    path: PathBuf,
    written: usize,
}

impl Sink<Record> for CsvWriter {
    type Error = IoError;
    type Env   = ();

    // Called once per chunk: write every record in the chunk as one CSV line.
    fn on_chunk(
        &mut self,
        chunk: Chunk<Record>,
    ) -> Effect<(), IoError, ()> {
        effect! {
            for record in &chunk {
                ~ self.write_csv_line(record);
            }
            ()
        }
    }

    // Called once at end of stream: flush whatever is still buffered.
    fn on_done(&mut self) -> Effect<(), IoError, ()> {
        effect! {
            ~ self.flush();
            ()
        }
    }
}

on_chunk is called for each chunk of elements. on_done is called once when the stream ends — use it to flush buffers or close handles.
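To make the two-callback lifecycle concrete, here is a minimal synchronous model of it. This is a sketch under stated assumptions, not the library's trait: Result stands in for Effect, Vec<T> stands in for Chunk<T>, and SimpleSink and BufferedCsv are hypothetical names.

```rust
// Simplified, synchronous model of the Sink lifecycle. `Result` stands in for
// `Effect` and `Vec<T>` for `Chunk<T>`. All names here are hypothetical.
trait SimpleSink<T> {
    type Error;

    // Called once per chunk, in stream order.
    fn on_chunk(&mut self, chunk: Vec<T>) -> Result<(), Self::Error>;

    // Called once after the last chunk.
    fn on_done(&mut self) -> Result<(), Self::Error>;
}

// Buffers CSV lines and only publishes them to `output` when flushed.
struct BufferedCsv {
    pending: String,
    output: String,
    written: usize,
}

impl BufferedCsv {
    fn new() -> Self {
        BufferedCsv { pending: String::new(), output: String::new(), written: 0 }
    }
}

impl SimpleSink<(u32, String)> for BufferedCsv {
    type Error = String;

    fn on_chunk(&mut self, chunk: Vec<(u32, String)>) -> Result<(), String> {
        for (id, name) in &chunk {
            // This sketch does no quoting, so it rejects fields with commas.
            if name.contains(',') {
                return Err(format!("record {id} needs quoting"));
            }
            self.pending.push_str(&format!("{id},{name}\n"));
            self.written += 1;
        }
        Ok(())
    }

    fn on_done(&mut self) -> Result<(), String> {
        // Flush: move the buffered lines to the output in one step.
        self.output.push_str(&self.pending);
        self.pending.clear();
        Ok(())
    }
}
```

In this model nothing reaches output until on_done runs, which is why a sink that buffers has to do its flushing there.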

Running a Stream into a Sink

let writer = CsvWriter::new("output.csv");

let effect: Effect<(), IoError, Db> = all_records()
    .run_into_sink(writer);

run_blocking(effect, env)?;

run_into_sink drives the stream and feeds each chunk to the sink. If the stream fails, on_done is not called, so cleanup that must always run belongs in a resource scope around the sink rather than in on_done.
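The failure behavior is easier to see in a small model of the driver loop. The sketch below is hypothetical and synchronous: it uses Result in place of Effect, a Drop guard in place of a resource scope, and it assumes that a sink error aborts the run the same way a stream error does.

```rust
// Hypothetical, synchronous model of a sink driver. Not the library's code.
trait SimpleSink<T> {
    fn on_chunk(&mut self, chunk: Vec<T>) -> Result<(), String>;
    fn on_done(&mut self) -> Result<(), String>;
}

// Feeds chunks to the sink. The first failure returns early, so `on_done`
// is skipped. (Treating sink errors like stream errors is an assumption.)
fn run_into_sink<T, S: SimpleSink<T>>(
    chunks: impl IntoIterator<Item = Result<Vec<T>, String>>,
    sink: &mut S,
) -> Result<(), String> {
    for chunk in chunks {
        sink.on_chunk(chunk?)?;
    }
    sink.on_done()
}

// Records which callbacks ran so the behavior can be observed.
#[derive(Default)]
struct Recorder {
    seen: Vec<u32>,
    done: bool,
}

impl SimpleSink<u32> for Recorder {
    fn on_chunk(&mut self, chunk: Vec<u32>) -> Result<(), String> {
        self.seen.extend(chunk);
        Ok(())
    }

    fn on_done(&mut self) -> Result<(), String> {
        self.done = true;
        Ok(())
    }
}

// Cleanup that must always happen lives in a scope guard, not in `on_done`.
struct CleanupGuard<'a>(&'a mut bool);

impl Drop for CleanupGuard<'_> {
    fn drop(&mut self) {
        *self.0 = true;
    }
}

fn run_with_cleanup(
    chunks: Vec<Result<Vec<u32>, String>>,
    sink: &mut Recorder,
    cleaned: &mut bool,
) -> Result<(), String> {
    let _guard = CleanupGuard(cleaned); // dropped on success and on failure
    run_into_sink(chunks, sink)
}
```

With a failing input the recorder keeps the chunks it saw before the error, done stays false, and the guard still sets cleaned to true.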

Sink Composition

Sinks can be composed. A ZipSink feeds the same stream to two sinks at once:

let count_sink  = CountSink::new();
let csv_sink    = CsvWriter::new("out.csv");

// Both sinks receive every element
let combined = ZipSink::new(count_sink, csv_sink);

all_records().run_into_sink(combined)

Each element is delivered to both sinks in order. If either sink fails, the whole pipeline fails.
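One way such a combinator could work is to forward every chunk to the left sink and then to the right one, propagating the first error. The sketch below models that with a simplified synchronous trait. Zip, Count, Lines, and feed are hypothetical names, and the left-then-right ordering is an assumption of this sketch rather than documented ZipSink behavior.

```rust
// Hypothetical, synchronous model of sink composition. Not the library's code.
trait SimpleSink<T> {
    fn on_chunk(&mut self, chunk: &[T]) -> Result<(), String>;
    fn on_done(&mut self) -> Result<(), String>;
}

// Forwards every chunk to `left`, then to `right`.
struct Zip<A, B> {
    left: A,
    right: B,
}

impl<T, A: SimpleSink<T>, B: SimpleSink<T>> SimpleSink<T> for Zip<A, B> {
    fn on_chunk(&mut self, chunk: &[T]) -> Result<(), String> {
        self.left.on_chunk(chunk)?; // an error here means `right` never sees the chunk
        self.right.on_chunk(chunk)
    }

    fn on_done(&mut self) -> Result<(), String> {
        self.left.on_done()?;
        self.right.on_done()
    }
}

// Counts elements of any type.
struct Count(usize);

impl<T> SimpleSink<T> for Count {
    fn on_chunk(&mut self, chunk: &[T]) -> Result<(), String> {
        self.0 += chunk.len();
        Ok(())
    }

    fn on_done(&mut self) -> Result<(), String> {
        Ok(())
    }
}

// Renders numbers as text lines and rejects zero, to show failure propagation.
struct Lines(Vec<String>);

impl SimpleSink<u32> for Lines {
    fn on_chunk(&mut self, chunk: &[u32]) -> Result<(), String> {
        for n in chunk {
            if *n == 0 {
                return Err("zero is not allowed".to_string());
            }
            self.0.push(n.to_string());
        }
        Ok(())
    }

    fn on_done(&mut self) -> Result<(), String> {
        Ok(())
    }
}

// Minimal driver: feed each chunk in order, then signal completion.
fn feed<S: SimpleSink<u32>>(sink: &mut S, chunks: &[Vec<u32>]) -> Result<(), String> {
    for chunk in chunks {
        sink.on_chunk(chunk)?;
    }
    sink.on_done()
}
```

In this model a failure in one sink does not roll back the other: if Lines rejects a chunk, Count has already counted it.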

Finite vs Infinite Streams and Sinks

A Sink doesn't know whether its stream is finite or infinite. To bound an infinite stream, apply take, take_while, or take_until before running it into a sink:

// Process at most 1 hour of events
let one_hour = Duration::from_secs(3600);

event_stream()
    .take_until(sleep(one_hour))
    .for_each(|event| process(event))
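The three bounding strategies (by count, by predicate, by time) can be tried out on a std iterator, which serves here as a stand-in for an infinite Stream. The function names are hypothetical, and the deadline check is only an approximation of take_until: it tests the clock before each element instead of racing the stream against a sleep effect.

```rust
use std::time::{Duration, Instant};

// Effectively infinite source: event ids 0, 1, 2, ... (stand-in for `event_stream()`).
fn event_ids() -> impl Iterator<Item = u64> {
    0u64..
}

// Analogue of `.take(n)`: stop after a fixed number of elements.
fn first_events(n: usize) -> Vec<u64> {
    event_ids().take(n).collect()
}

// Analogue of `.take_while(p)`: stop at the first element that fails the predicate.
fn events_below(limit: u64) -> Vec<u64> {
    event_ids().take_while(|id| *id < limit).collect()
}

// Rough analogue of `.take_until(sleep(d))`: stop once the deadline has passed.
// The clock is checked before each element is consumed.
fn count_until(deadline: Instant) -> u64 {
    event_ids().take_while(|_| Instant::now() < deadline).count() as u64
}

// Convenience wrapper: consume events for at most `window`.
fn count_for(window: Duration) -> u64 {
    count_until(Instant::now() + window)
}
```

Without one of these bounds, a consumer such as collect or count on event_ids() would never return.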

Summary

| Method | Returns | Use when |
|---|---|---|
| .collect() | Effect<Vec<A>, …> | Small result sets that fit in memory |
| .fold(init, f) | Effect<B, …> | Single aggregated value |
| .for_each(f) | Effect<(), …> | Side effects per element |
| .drain() | Effect<(), …> | Discard results, keep side effects |
| .run_into_sink(s) | Effect<(), …> | Custom consumption logic |