datastream/stream
Single-stream mid-pipeline and composition combinators over Stream(a).
All combinators are lazy and order-preserving (unless documented
otherwise). Building a pipeline with these combinators triggers no
user callbacks; callbacks fire only when a terminal in fold or
sink pulls the result.
Early-exit combinators (take, take_while) stop pulling upstream
the moment their result is determined, which is what makes them
safe on infinite sources.
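A minimal sketch of these guarantees, assuming `source.iterate` and `fold.to_list` from the companion modules these docs reference (exact signatures may differ):

```gleam
import datastream/fold
import datastream/source
import datastream/stream

pub fn first_three_evens() -> List(Int) {
  source.iterate(0, fn(n) { n + 1 })
  // Lazy: no user callback has fired yet.
  |> stream.filter(keeping: fn(n) { n % 2 == 0 })
  // Early exit: stops pulling upstream once 3 elements are emitted,
  // which is what makes this safe on the infinite source above.
  |> stream.take(up_to: 3)
  // Terminal: all pulls happen here, yielding [0, 2, 4].
  |> fold.to_list
}
```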
Every combinator here forwards the upstream’s close callback so
resource-backed streams (source.resource) are released on every
termination path. Combinators that observe a Done from the
upstream do NOT call close again — the source closed itself when
it returned Done. Combinators that hold multiple upstreams open
at once close them in the order specified in the spec
(flat_map: inner before outer; zip: right before left).
Types
Why a checked stream constructor refused its argument.
Returned by take_checked, drop_checked, buffer_checked,
chunks_of_checked, and any future non-panicking variant of a
constructor that would otherwise reject its argument with a
panic. Lets the caller surface argument-validation failures
through Result instead of crashing the process — useful when
the numeric argument comes from CLI flags, config files, or
request parameters.
function names the constructor that rejected the value
("take", "drop", "buffer", "chunks_of") so a caller
routing many checked constructors through the same handler can
produce a meaningful error message.
NegativeCount covers constructors whose contract is n >= 0 (take, drop). NotPositive covers constructors whose contract is n >= 1 (buffer, chunks_of).
pub type StreamArgError {
NegativeCount(function: String, given: Int)
NotPositive(function: String, given: Int)
}
Constructors
- NegativeCount(function: String, given: Int)
- NotPositive(function: String, given: Int)
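A sketch of the routing pattern described above, with one handler for every checked constructor's failure; the `function` field names the rejecting constructor ("take", "drop", "buffer", "chunks_of"):

```gleam
import datastream/stream
import gleam/int

pub fn describe(error: stream.StreamArgError) -> String {
  case error {
    stream.NegativeCount(function: f, given: n) ->
      f <> " requires an argument >= 0, got " <> int.to_string(n)
    stream.NotPositive(function: f, given: n) ->
      f <> " requires an argument >= 1, got " <> int.to_string(n)
  }
}
```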
Values
pub fn append(
first: datastream.Stream(a),
second: datastream.Stream(a),
) -> datastream.Stream(a)
Yield all of first, then all of second. If first is infinite,
second is never opened.
Honours the spec close ordering: second is opened only after
first returns Done (and so has already closed itself); on early
exit before first finishes, second is never opened.
Close contract: first self-closes when it returns Done;
append does not call close on it again. After first is
exhausted, second’s elements are yielded directly — when second
itself returns Done, it self-closes in the same way. On early
exit mid-second, the downstream’s close call propagates to the
active second node.
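A short usage sketch; `source.from_list` (a finite-source constructor) and `fold.to_list` are assumptions, not part of this module:

```gleam
import datastream/fold
import datastream/source
import datastream/stream

pub fn appended() -> List(Int) {
  stream.append(source.from_list([1, 2]), source.from_list([3, 4]))
  // The second source is opened only after the first returns Done.
  |> fold.to_list
}
```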
pub fn broadcast(
over stream: datastream.Stream(a),
into n: Int,
) -> List(datastream.Stream(a))
Split stream into n independent consumer streams that share a
single underlying source.
Each consumer pulls at its own pace; any element the source produces is observed by every still-alive consumer in order. A consumer that lags behind sees the same prefix as a fast consumer, just later. This is the fan-out / pub-sub combinator: an observability tap can run in parallel with the main pipeline without forcing the source to be re-evaluated, which matters for resource-backed and otherwise non-replayable sources.
n MUST be >= 1. An n < 1 is rejected at construction time
with a panic per the datastream module-level invalid-argument
policy.
Buffer size — unbounded by design. Per-consumer queues grow
without limit. The worst-case memory footprint is
O(max_pull_distance × n), where max_pull_distance is the
difference between the fastest and slowest consumer’s pull count
at any point in time, and n is the consumer count. Concretely:
if consumer A pulls 1 000 000 elements while B is paused, B’s
queue holds 1 000 000 elements until B catches up.
For resource-backed sources of bounded cardinality this is fine.
For cardinality-unbounded sources (source.iterate,
source.repeat, source.unfold of an infinite seed) it is a
silent memory hazard. Two safer composition patterns:
- Cap each consumer: broadcast(s, n) |> list.map(stream.take(_, k)), so the slowest consumer’s queue is bounded by k - 1.
- Use broadcast_bounded(s, n, max_queue) for an explicit per-consumer queue limit. Exceeding the limit panics with a structured message rather than letting the process OOM in production.
Cross-target: implemented via a small FFI-backed mutable
reference (datastream/internal/ref). On Erlang the cell is a
process-dictionary slot; on JavaScript it is a one-field mutable
object. No gleam_erlang processes are spawned; no shared global
state. The Ref(_) is local to each broadcast call.
Close contract: every consumer’s close is recorded; the
upstream is closed exactly once, on whichever call brings the
active-consumer count to zero. If the upstream returns Done
first it self-closes, and the close callbacks are no-ops.
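The capped fan-out pattern from the buffer-size note, sketched under the assumption that `source.iterate` exists as referenced above: each consumer takes at most 100 elements, so the slowest queue stays bounded even on an infinite source.

```gleam
import datastream/source
import datastream/stream
import gleam/list

pub fn capped_fanout() {
  source.iterate(0, fn(n) { n + 1 })
  |> stream.broadcast(into: 3)
  // Bound every consumer's queue via take's early exit.
  |> list.map(stream.take(_, up_to: 100))
}
```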
pub fn broadcast_bounded(
over stream: datastream.Stream(a),
into n: Int,
max_queue max_queue: Int,
) -> List(datastream.Stream(a))
Like broadcast, but each per-consumer queue is capped at
max_queue elements. If a fanned-out element would push any
consumer’s queue beyond that bound, the next upstream pull panics
with a structured message naming the limit. Use this in
production wiring (HTTP fan-out, websocket multicast, Kafka
producer tees, …) where a stalled slow consumer must surface as a
crash instead of an OOM.
n MUST be >= 1 and max_queue MUST be >= 1. Both checks
are construction-time and panic on failure, matching the
datastream module-level invalid-argument policy.
Behaviour on success is identical to broadcast(s, n) until the
queue bound is hit. Until then there is no overhead beyond a
per-pull list.length check on the freshly-enqueued queues.
What “exceeded” means. The check fires after the producing
pull has already enqueued the element to every still-alive
consumer that wasn’t the puller. A consumer queue that was at
max_queue - 1 and gets one more element is fine; one that was
at max_queue and would become max_queue + 1 triggers the
panic. The slowest consumer therefore has up to max_queue
elements in its queue before the next overflowing pull stops the
pipeline.
Pair with broadcast’s docstring above for the queue-growth
formula and the unbounded baseline.
pub fn buffer(
over stream: datastream.Stream(a),
prefetch capacity: Int,
) -> datastream.Stream(a)
Eagerly pull capacity elements ahead from stream and yield them
to the consumer at its pace, refilling the internal queue back to
capacity after every consumer pull.
buffer is the prefetch combinator: it decouples the consumer’s
pull cadence from upstream latency. For latency-bound upstreams
(HTTP body bytes, cold reads from disk) the consumer can do its own
per-element work in parallel with the next upstream pull instead of
blocking serially on each one.
capacity MUST be >= 1. A capacity < 1 is rejected at
construction time with a panic per the datastream module-level
invalid-argument policy — capacity == 0 would defeat the point
of buffering, and a negative capacity is programmer error.
Element type, order, and cardinality are preserved. When upstream
returns Done, any already-buffered elements are still drained
before the buffered stream itself emits Done. On consumer-side
early termination the upstream is closed once and any unconsumed
buffered elements are discarded — the upstream produced them in
good faith but resource cleanup wins over delivery, matching
take’s behaviour.
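A minimal sketch of the prefetch pattern; the `import datastream.{type Stream}` shape is an assumption about how the Stream type is exposed:

```gleam
import datastream.{type Stream}
import datastream/stream

// A 4-element prefetch window decouples the consumer's cadence
// from upstream latency; order and cardinality are unchanged.
pub fn prefetched(slow_upstream: Stream(a)) -> Stream(a) {
  stream.buffer(over: slow_upstream, prefetch: 4)
}
```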
pub fn buffer_checked(
over stream: datastream.Stream(a),
prefetch capacity: Int,
) -> Result(datastream.Stream(a), StreamArgError)
Like buffer, but returns the argument-validation failure as a
Result instead of panicking. Use this when capacity comes
from dynamic input (CLI, config, request parameters); the
panicking buffer remains the right tool for trusted constants.
On success the stream behaves identically to
buffer(over: stream, prefetch: capacity).
The caller is responsible for closing stream if the
constructor returns Error — no upstream pull happens, but the
upstream is otherwise untouched.
pub fn chunks_of(
over stream: datastream.Stream(a),
into size: Int,
) -> datastream.Stream(chunk.Chunk(a))
Group adjacent elements into fixed-size chunks.
size MUST be >= 1. A size < 1 is rejected at construction
time with a panic per the datastream module-level
invalid-argument policy.
The trailing chunk may be smaller than size when the source
length is not divisible.
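For example, five elements in chunks of two leave a short trailing chunk holding only the fifth element; `source.from_list` and `fold.to_list` are assumed companions:

```gleam
import datastream/fold
import datastream/source
import datastream/stream

pub fn chunked() {
  source.from_list([1, 2, 3, 4, 5])
  |> stream.chunks_of(into: 2)
  |> fold.to_list
}
```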
pub fn chunks_of_checked(
over stream: datastream.Stream(a),
into size: Int,
) -> Result(datastream.Stream(chunk.Chunk(a)), StreamArgError)
Like chunks_of, but returns the argument-validation failure as
a Result instead of panicking. Use this when size comes from
dynamic input (CLI, config, request parameters); the panicking
chunks_of remains the right tool for trusted constants.
On success the stream behaves identically to
chunks_of(over: stream, into: size).
The caller is responsible for closing stream if the
constructor returns Error.
pub fn concat(
streams: List(datastream.Stream(a)),
) -> datastream.Stream(a)
Walk a List(Stream(a)) in list order, yielding every element of
every stream.
Each inner stream is closed (by reaching its own Done) before the
next one is opened. On early exit, the active stream is closed.
pub fn dedupe_adjacent(
stream: datastream.Stream(a),
) -> datastream.Stream(a)
Collapse runs of ==-equal adjacent values to a single occurrence.
pub fn drop(
over stream: datastream.Stream(a),
up_to n: Int,
) -> datastream.Stream(a)
Discard the first n elements; n == 0 is the identity.
n MUST be >= 0. A negative n is rejected at construction
time with a panic per the datastream module-level
invalid-argument policy.
Resource handling at n == 0 — identity semantics: no
upstream pull, no close. Asymmetric with take(s, 0) (which
closes eagerly). Callers building drop(s, 0) for unknown n
and discarding the result without consuming it are responsible
for closing s themselves. See take’s “Asymmetry of take(s, 0)
and drop(s, 0)” subsection below for the side-by-side table
and recommended practice.
pub fn drop_checked(
over stream: datastream.Stream(a),
up_to n: Int,
) -> Result(datastream.Stream(a), StreamArgError)
Like drop, but returns the argument-validation failure as a
Result instead of panicking. Use this when n comes from
dynamic input.
On success the stream behaves identically to
drop(over: stream, up_to: n).
The caller is responsible for closing stream if the
constructor returns Error.
pub fn drop_while(
over stream: datastream.Stream(a),
satisfying predicate: fn(a) -> Bool,
) -> datastream.Stream(a)
Discard the longest prefix where predicate holds, then yield the rest.
pub fn filter(
over stream: datastream.Stream(a),
keeping predicate: fn(a) -> Bool,
) -> datastream.Stream(a)
Keep only the elements for which predicate returns True.
Relative order of the survivors is preserved.
pub fn filter_map(
over stream: datastream.Stream(a),
with f: fn(a) -> option.Option(b),
) -> datastream.Stream(b)
Apply f to each element and keep only the Some(x) results.
pub fn flat_map(
over stream: datastream.Stream(a),
with f: fn(a) -> datastream.Stream(b),
) -> datastream.Stream(b)
Apply f to each element and concatenate the inner streams it
produces.
Pulls one inner stream at a time; the next inner stream is not
constructed until the previous one is exhausted. On early exit
before the outer is Done, the active inner is closed first, then
the outer.
Close contract: when an inner stream returns Done, flat_map
does NOT call close on it — the source is expected to have
released its resources when it returned Done (the self-close
convention followed by source.resource). Custom streams built
with make must release resources inside next on the Done
path if they hold any.
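A sketch in which each element expands to a two-element inner stream, pulled one inner stream at a time; `source.from_list` and `fold.to_list` are assumptions:

```gleam
import datastream/fold
import datastream/source
import datastream/stream

pub fn expanded() -> List(Int) {
  source.from_list([1, 2, 3])
  |> stream.flat_map(with: fn(n) { source.from_list([n, n * 10]) })
  // Yields 1, 10, 2, 20, 3, 30 once a terminal pulls.
  |> fold.to_list
}
```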
pub fn flatten(
streams: datastream.Stream(datastream.Stream(a)),
) -> datastream.Stream(a)
Flatten a stream of streams into a single stream.
Equivalent to flat_map(s, fn(x) { x }); the close ordering is the
same.
pub fn group_adjacent(
over stream: datastream.Stream(a),
by key: fn(a) -> k,
) -> datastream.Stream(#(k, chunk.Chunk(a)))
Group consecutive elements that share key(element).
pub fn interrupt_when(
over stream: datastream.Stream(a),
signal signal: datastream.Stream(Bool),
) -> datastream.Stream(a)
Terminate stream on the first True element from signal.
interrupt_when is the external counterpart of take_while:
take_while ends a stream when a pulled element fails a
predicate; interrupt_when ends it when an unrelated stream
produces a fire signal — typically a timer expiry, a parent-
shutdown probe, or a cancel-token bridge built with
from_subject.
On every consumer pull, signal is checked first:
- signal yields True → both stream and the rest of signal are closed, and the consumer sees Done immediately.
- signal yields False → that signal element is consumed; the pull descends into stream and yields its next element. Subsequent consumer pulls see the rest of signal.
- signal returns Done → the consumer pull descends into stream. signal is then ignored on subsequent pulls; a Done producer is never re-pulled per the Step contract.
The Stream(Bool) shape (rather than Stream(Nil)) is what
makes counted / time-based signals representable in the pull
model — Done cannot mean “not yet, ask again later” because
Done is terminal, so a “fire after N consumer pulls” signal
is built as False-stream-of-length-N followed by True.
Close ordering: on consumer-side early termination signal
is closed first, then stream — right-before-left, matching
zip and flat_map.
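A "fire after 3 consumer pulls" signal, built exactly as described above: three Falses followed by True. `source.from_list` and the `Stream` type import are assumptions.

```gleam
import datastream.{type Stream}
import datastream/source
import datastream/stream

pub fn first_three(data: Stream(a)) -> Stream(a) {
  // Each False admits one data element; the True on the 4th
  // pull closes both streams and yields Done.
  let signal = source.from_list([False, False, False, True])
  stream.interrupt_when(over: data, signal: signal)
}
```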
pub fn intersperse(
over stream: datastream.Stream(a),
with separator: a,
) -> datastream.Stream(a)
Insert separator between adjacent elements.
Empty and single-element streams are unchanged.
pub fn map(
over stream: datastream.Stream(a),
with f: fn(a) -> b,
) -> datastream.Stream(b)
Apply f to every element. Cardinality and order are preserved.
pub fn map_accum(
over stream: datastream.Stream(a),
from initial: state,
with step: fn(state, a) -> #(state, b),
) -> datastream.Stream(b)
Thread a state through the stream while emitting elements of a possibly different type.
pub fn scan(
over stream: datastream.Stream(a),
from initial: b,
with step: fn(b, a) -> b,
) -> datastream.Stream(b)
Left-fold the stream, emitting one output per input.
The seed itself is NOT emitted, so output cardinality equals input cardinality. On empty input the output is empty.
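A running-sum sketch: three inputs produce three outputs because the seed 0 is not emitted. `source.from_list` and `fold.to_list` are assumed companions.

```gleam
import datastream/fold
import datastream/source
import datastream/stream

pub fn running_sum() -> List(Int) {
  source.from_list([1, 2, 3])
  |> stream.scan(from: 0, with: fn(acc, n) { acc + n })
  // Yields 1, 3, 6 — one output per input.
  |> fold.to_list
}
```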
pub fn take(
over stream: datastream.Stream(a),
up_to n: Int,
) -> datastream.Stream(a)
Yield at most the first n elements; n == 0 yields the empty
stream.
n MUST be >= 0. A negative n is rejected at construction
time with a panic per the datastream module-level
invalid-argument policy.
Stops pulling upstream the moment the nth element has been
emitted, and closes the upstream on that early exit. This is what
makes take safe on infinite resource-backed sources.
Resource handling at n == 0 — early-exit semantics extend to
the degenerate case: the first pull returns Done and closes
the upstream. This is asymmetric with drop(s, 0), which is the
identity (no close, no pull). For dynamic-n pipelines built from
take_checked / drop_checked where n may end up 0, see the
“Asymmetry of take(s, 0) and drop(s, 0)” subsection below for
guidance.
Asymmetry of take(s, 0) and drop(s, 0)
| Constructor | Pull behaviour at n == 0 | Resource handling |
|---|---|---|
| take(s, 0) | First pull returns Done | Closes s eagerly |
| drop(s, 0) | Identity (first pull pulls from s) | Does not pre-close |
Both choices follow each constructor’s primary semantics
(take is early-exit; drop is identity), but the combination
surprises callers who treat the two as duals. Practical
consequences:
- take(s, 0) discarded without consumption — upstream is auto-closed on the first pull, but if no terminal ever pulls, the resource stays open until the calling process exits.
- drop(s, 0) discarded without consumption — upstream is never touched. Callers that build drop(s, 0) and throw away the result must close s themselves to avoid leaking resource-backed sources (source.try_resource, file handles, sockets).
When n comes from config / CLI / request parameters and may be
0, prefer driving the resulting stream through a terminal
(fold.to_list, sink.each, …) so the close path runs
regardless of which branch the runtime took.
pub fn take_checked(
over stream: datastream.Stream(a),
up_to n: Int,
) -> Result(datastream.Stream(a), StreamArgError)
Like take, but returns the argument-validation failure as a
Result instead of panicking. Use this when n comes from
dynamic input (CLI, config, request parameters).
On success the stream behaves identically to
take(over: stream, up_to: n).
The caller is responsible for closing stream if the
constructor returns Error — no upstream pull happens, but the
upstream is otherwise untouched.
Example:

case stream.take_checked(over: src, up_to: configured_limit) {
  Ok(s) -> …
  Error(stream.NegativeCount(function: _, given: g)) -> …
}
pub fn take_while(
over stream: datastream.Stream(a),
satisfying predicate: fn(a) -> Bool,
) -> datastream.Stream(a)
Yield the longest prefix where predicate holds, then stop.
Stops pulling upstream and closes it as soon as predicate returns
False, so take_while terminates on infinite resource-backed
sources whose prefix eventually fails the predicate.
pub fn tap(
over stream: datastream.Stream(a),
with effect: fn(a) -> Nil,
) -> datastream.Stream(a)
Call effect once per element and re-emit the element unchanged.
pub fn unzip(
stream: datastream.Stream(#(a, b)),
) -> #(datastream.Stream(a), datastream.Stream(b))
Counterpart of zip: split a Stream(#(a, b)) into a pair of
independent component streams.
Both component streams see the same elements (left projections
for the first, right projections for the second) in the same
order. Each consumer pulls at its own pace, sharing the upstream
via broadcast under the hood.
Cross-target: same FFI-backed shared state as broadcast,
so unzip runs on both Erlang and JavaScript.
Buffer size: unbounded per consumer, inherited from
broadcast. A pipeline that drives one half of the unzipped pair
to completion while the other half stays unpulled will accumulate
the entire stream in the lagging half’s queue.
pub fn zip(
left: datastream.Stream(a),
right: datastream.Stream(b),
) -> datastream.Stream(#(a, b))
Pair-wise zip two streams; halts the moment either source halts.
Both upstreams may be open at once. On halt, closes right first,
then left, matching the spec.
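A sketch of the halt-on-shorter behaviour; `source.from_list` and `fold.to_list` are assumed companions:

```gleam
import datastream/fold
import datastream/source
import datastream/stream

pub fn paired() {
  stream.zip(
    source.from_list([1, 2, 3]),
    source.from_list(["a", "b"]),
  )
  // Yields #(1, "a") and #(2, "b"); halts when the right
  // side halts, closing right before left.
  |> fold.to_list
}
```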
pub fn zip_with(
left: datastream.Stream(a),
right: datastream.Stream(b),
with combiner: fn(a, b) -> c,
) -> datastream.Stream(c)
Combine two streams element-wise with combiner; halts the moment
either source halts.
Same close ordering as zip: right then left.