datastream/stream

Single-stream transformation and composition combinators over Stream(a).

All combinators are lazy and order-preserving (unless documented otherwise). Building a pipeline with these combinators triggers no user callbacks; callbacks fire only when a terminal from fold or sink pulls the result.

Early-exit combinators (take, take_while) stop pulling upstream the moment their result is determined, which is what makes them safe on infinite sources.

Every combinator here forwards the upstream’s close callback so resource-backed streams (source.resource) are released on every termination path. Combinators that observe a Done from the upstream do NOT call close again — the source closed itself when it returned Done. Combinators that hold multiple upstreams open at once close them in the order specified in the spec (flat_map: inner before outer; zip: right before left).
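
The laziness guarantee can be sketched as follows (source.from_list is a hypothetical list-backed constructor used for illustration; fold.to_list is the terminal named below):

```gleam
// Construction alone runs nothing: no pull, no user callbacks.
let pipeline =
  source.from_list([1, 2, 3, 4])
  |> stream.map(fn(x) { x * 2 })
  |> stream.filter(fn(x) { x > 4 })

// The map/filter callbacks fire only under a terminal pull:
let results = fold.to_list(pipeline)   // -> [6, 8]
```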

Types

Why a checked stream constructor refused its argument.

Returned by take_checked, drop_checked, buffer_checked, chunks_of_checked, and any future non-panicking variant of a constructor that would otherwise reject its argument with a panic. Lets the caller surface argument-validation failures through Result instead of crashing the process — useful when the numeric argument comes from CLI flags, config files, or request parameters.

function names the constructor that rejected the value ("take", "drop", "buffer", "chunks_of") so a caller routing many checked constructors through the same handler can produce a meaningful error message.

  • NegativeCount covers constructors whose contract is >= 0 (take, drop).
  • NotPositive covers constructors whose contract is >= 1 (buffer, chunks_of).
pub type StreamArgError {
  NegativeCount(function: String, given: Int)
  NotPositive(function: String, given: Int)
}

Constructors

  • NegativeCount(function: String, given: Int)
  • NotPositive(function: String, given: Int)
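
A sketch of the routing pattern the function field enables — one handler for every checked constructor (the describe helper and message wording are ours):

```gleam
import gleam/int
import datastream/stream

// The `function` field names the rejecting constructor, so one
// handler can cover take_checked, drop_checked, buffer_checked,
// and chunks_of_checked alike.
fn describe(error: stream.StreamArgError) -> String {
  case error {
    stream.NegativeCount(function: f, given: g) ->
      f <> " requires a count >= 0, got " <> int.to_string(g)
    stream.NotPositive(function: f, given: g) ->
      f <> " requires a count >= 1, got " <> int.to_string(g)
  }
}
```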

Values

pub fn append(
  first: datastream.Stream(a),
  second: datastream.Stream(a),
) -> datastream.Stream(a)

Yield all of first, then all of second. If first is infinite, second is never opened.

Honours the spec close ordering: second is opened only after first returns Done (and so has already closed itself); on early exit before first finishes, second is never opened.

Close contract: first self-closes when it returns Done; append does not call close on it again. After first is exhausted, second’s elements are yielded directly — when second itself returns Done, it self-closes in the same way. On early exit mid-second, the downstream’s close call propagates to the active second node.
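
A sketch of the infinite-first guarantee (source.repeat per the source module; expensive_second is a hypothetical stream standing in for anything costly to open):

```gleam
// `second` is opened only once `first` returns Done; here `first`
// never does, so `expensive_second` is never opened.
stream.append(source.repeat(0), expensive_second)
|> stream.take(up_to: 3)   // early exit after three elements
```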

pub fn broadcast(
  over stream: datastream.Stream(a),
  into n: Int,
) -> List(datastream.Stream(a))

Split stream into n independent consumer streams that share a single underlying source.

Each consumer pulls at its own pace; any element the source produces is observed by every still-alive consumer in order. A consumer that lags behind sees the same prefix as a fast consumer, just later. This is the fan-out / pub-sub combinator: an observability tap can run in parallel with the main pipeline without forcing the source to be re-evaluated, which matters for resource-backed and otherwise non-replayable sources.

n MUST be >= 1. An n < 1 is rejected at construction time with a panic per the datastream module-level invalid-argument policy.

Buffer size — unbounded by design. Per-consumer queues grow without limit. The worst-case memory footprint is O(max_pull_distance × n), where max_pull_distance is the difference between the fastest and slowest consumer’s pull count at any point in time, and n is the consumer count. Concretely: if consumer A pulls 1 000 000 elements while B is paused, B’s queue holds 1 000 000 elements until B catches up.

For resource-backed sources of bounded cardinality this is fine. For cardinality-unbounded sources (source.iterate, source.repeat, source.unfold of an infinite seed) it is a silent memory hazard. Two safer composition patterns:

  • Cap each consumer: broadcast(s, n) |> list.map(stream.take(_, k)) so the slowest consumer’s queue is bounded by k - 1.
  • Use broadcast_bounded(s, n, max_queue) for an explicit per-consumer queue limit. Exceeding the limit panics with a structured message rather than letting the process OOM in production.

Cross-target: implemented via a small FFI-backed mutable reference (datastream/internal/ref). On Erlang the cell is a process-dictionary slot; on JavaScript it is a one-field mutable object. No gleam_erlang processes are spawned; no shared global state. The Ref(_) is local to each broadcast call.

Close contract: every consumer’s close is recorded; the upstream is closed exactly once, on whichever call brings the active-consumer count to zero. If the upstream returns Done first it self-closes, and the close callbacks are no-ops.
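
A sketch of the observability-tap pattern with the capping advice above applied (events and the consumer wiring are hypothetical):

```gleam
// Two consumers over one non-replayable source.
let assert [main, tap] = stream.broadcast(over: events, into: 2)

// Cap the tap: after 100 elements it closes itself, so its queue
// stops growing even if `main` races far ahead.
let tap = stream.take(from: tap, up_to: 100)
```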

pub fn broadcast_bounded(
  over stream: datastream.Stream(a),
  into n: Int,
  max_queue max_queue: Int,
) -> List(datastream.Stream(a))

Like broadcast, but each per-consumer queue is capped at max_queue elements. If a fanned-out element would push any consumer’s queue beyond that bound, the next upstream pull panics with a structured message naming the limit. Use this in production wiring (HTTP fan-out, websocket multicast, Kafka producer tees, …) where a stalled slow consumer must surface as a crash instead of an OOM.

n MUST be >= 1 and max_queue MUST be >= 1. Both checks are construction-time and panic on failure, matching the datastream module-level invalid-argument policy.

Behaviour on success is identical to broadcast(s, n) until the queue bound is hit; up to that point the only overhead is a per-pull list.length check on the freshly-enqueued queues.

What “exceeded” means. The check fires after the producing pull has already enqueued the element to every still-alive consumer that wasn’t the puller. A consumer queue that was at max_queue - 1 and gets one more element is fine; one that was at max_queue and would become max_queue + 1 triggers the panic. The slowest consumer therefore has up to max_queue elements in its queue before the next overflowing pull stops the pipeline.

Pair with broadcast’s docstring above for the queue-growth formula and the unbounded baseline.
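
A sketch of the production wiring described above (events is a hypothetical source):

```gleam
// Three consumers, each queue capped at 1024 elements. A consumer
// that lags more than 1024 pulls behind the fastest one turns the
// next overflowing pull into a structured panic instead of an OOM.
let consumers =
  stream.broadcast_bounded(over: events, into: 3, max_queue: 1024)
```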

pub fn buffer(
  over stream: datastream.Stream(a),
  prefetch capacity: Int,
) -> datastream.Stream(a)

Eagerly pull capacity elements ahead from stream and yield them to the consumer at its pace, refilling the internal queue back to capacity after every consumer pull.

buffer is the prefetch combinator: it decouples the consumer’s pull cadence from upstream latency. For latency-bound upstreams (HTTP body bytes, cold reads from disk) the consumer can do its own per-element work in parallel with the next upstream pull instead of blocking serially on each one.

capacity MUST be >= 1. A capacity < 1 is rejected at construction time with a panic per the datastream module-level invalid-argument policy — capacity == 0 would defeat the point of buffering, and a negative capacity is programmer error.

Element type, order, and cardinality are preserved. When upstream returns Done, any already-buffered elements are still drained before the buffered stream itself emits Done. On consumer-side early termination the upstream is closed once and any unconsumed buffered elements are discarded — the upstream produced them in good faith but resource cleanup wins over delivery, matching take’s behaviour.
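
A sketch of the prefetch pattern for a latency-bound upstream (body_bytes and parse_record are hypothetical):

```gleam
// Keep 16 elements in flight: the per-element parse below overlaps
// with the next upstream pull instead of serialising on it.
body_bytes
|> stream.buffer(prefetch: 16)
|> stream.map(parse_record)
```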

pub fn buffer_checked(
  over stream: datastream.Stream(a),
  prefetch capacity: Int,
) -> Result(datastream.Stream(a), StreamArgError)

Like buffer, but returns the argument-validation failure as a Result instead of panicking. Use this when capacity comes from dynamic input (CLI, config, request parameters); the panicking buffer remains the right tool for trusted constants.

On success the stream behaves identically to buffer(over: stream, prefetch: capacity).

The caller is responsible for closing stream if the constructor returns Error — no upstream pull happens, but the upstream is otherwise untouched.

pub fn chunks_of(
  over stream: datastream.Stream(a),
  into size: Int,
) -> datastream.Stream(chunk.Chunk(a))

Group adjacent elements into fixed-size chunks.

size MUST be >= 1. A size < 1 is rejected at construction time with a panic per the datastream module-level invalid-argument policy.

The trailing chunk may be smaller than size when the source length is not divisible by size.
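
For example (source.from_list is a hypothetical list-backed constructor; Chunk values are shown as lists):

```gleam
// Seven elements into chunks of 3: two full chunks plus a
// trailing chunk of one element.
source.from_list([1, 2, 3, 4, 5, 6, 7])
|> stream.chunks_of(into: 3)
// -> chunks [1, 2, 3], [4, 5, 6], [7]
```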

pub fn chunks_of_checked(
  over stream: datastream.Stream(a),
  into size: Int,
) -> Result(datastream.Stream(chunk.Chunk(a)), StreamArgError)

Like chunks_of, but returns the argument-validation failure as a Result instead of panicking. Use this when size comes from dynamic input (CLI, config, request parameters); the panicking chunks_of remains the right tool for trusted constants.

On success the stream behaves identically to chunks_of(over: stream, into: size).

The caller is responsible for closing stream if the constructor returns Error.

pub fn concat(
  streams: List(datastream.Stream(a)),
) -> datastream.Stream(a)

Walk a List(Stream(a)) in list order, yielding every element of every stream.

Each inner stream is closed (by reaching its own Done) before the next one is opened. On early exit, the active stream is closed.

pub fn dedupe_adjacent(
  stream: datastream.Stream(a),
) -> datastream.Stream(a)

Collapse runs of ==-equal adjacent values to a single occurrence.

pub fn drop(
  from stream: datastream.Stream(a),
  up_to n: Int,
) -> datastream.Stream(a)

Discard the first n elements; n == 0 is the identity.

n MUST be >= 0. A negative n is rejected at construction time with a panic per the datastream module-level invalid-argument policy.

Resource handling at n == 0 — identity semantics: no upstream pull, no close. Asymmetric with take(s, 0) (which closes eagerly). Callers building drop(s, 0) for unknown n and discarding the result without consuming it are responsible for closing s themselves. See take’s “Asymmetry of take(s, 0) and drop(s, 0)” subsection below for the side-by-side table and recommended practice.

pub fn drop_checked(
  from stream: datastream.Stream(a),
  up_to n: Int,
) -> Result(datastream.Stream(a), StreamArgError)

Like drop, but returns the argument-validation failure as a Result instead of panicking. Use this when n comes from dynamic input.

On success the stream behaves identically to drop(from: stream, up_to: n).

The caller is responsible for closing stream if the constructor returns Error.

pub fn drop_while(
  in stream: datastream.Stream(a),
  satisfying predicate: fn(a) -> Bool,
) -> datastream.Stream(a)

Discard the longest prefix where predicate holds, then yield the rest.

pub fn filter(
  over stream: datastream.Stream(a),
  keeping predicate: fn(a) -> Bool,
) -> datastream.Stream(a)

Keep only the elements for which predicate returns True.

Relative order of the survivors is preserved.

pub fn filter_map(
  over stream: datastream.Stream(a),
  with f: fn(a) -> option.Option(b),
) -> datastream.Stream(b)

Apply f to each element and keep only the Some(x) results.

pub fn flat_map(
  over stream: datastream.Stream(a),
  with f: fn(a) -> datastream.Stream(b),
) -> datastream.Stream(b)

Apply f to each element and concatenate the inner streams it produces.

Pulls one inner stream at a time; the next inner stream is not constructed until the previous one is exhausted. On early exit before the outer is Done, the active inner is closed first, then the outer.

Close contract: when an inner stream returns Done, flat_map does NOT call close on it — the source is expected to have released its resources when it returned Done (the self-close convention followed by source.resource). Custom streams built with make must release resources inside next on the Done path if they hold any.
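
A sketch of the one-inner-at-a-time expansion (source.from_list is a hypothetical list-backed constructor):

```gleam
// Each element expands into its own stream; inner streams are
// concatenated, and the next one is not built until the previous
// one is exhausted.
source.from_list([1, 2, 3])
|> stream.flat_map(with: fn(n) { source.from_list([n, n * 10]) })
// -> 1, 10, 2, 20, 3, 30
```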

pub fn flatten(
  streams: datastream.Stream(datastream.Stream(a)),
) -> datastream.Stream(a)

Flatten a stream of streams into a single stream.

Equivalent to flat_map(s, fn(x) { x }); the close ordering is the same.

pub fn group_adjacent(
  over stream: datastream.Stream(a),
  by key: fn(a) -> k,
) -> datastream.Stream(#(k, chunk.Chunk(a)))

Group consecutive elements that share key(element).

pub fn interrupt_when(
  in stream: datastream.Stream(a),
  signal signal: datastream.Stream(Bool),
) -> datastream.Stream(a)

Terminate stream on the first True element from signal.

interrupt_when is the external counterpart of take_while: take_while ends a stream when a pulled element fails a predicate; interrupt_when ends it when an unrelated stream produces a fire signal — typically a timer expiry, a parent-shutdown probe, or a cancel-token bridge built with from_subject.

On every consumer pull, signal is checked first:

  • signal yields True → both stream and the rest of signal are closed and the consumer sees Done immediately.
  • signal yields False → that signal element is consumed; the pull descends into stream and yields its next element. Subsequent consumer pulls see the rest of signal.
  • signal returns Done → the consumer pull descends into stream. signal is then ignored on subsequent pulls; a Done producer is never re-pulled per the Step contract.

The Stream(Bool) shape (rather than Stream(Nil)) is what makes counted / time-based signals representable in the pull model — Done cannot mean “not yet, ask again later” because Done is terminal, so a “fire after N consumer pulls” signal is built as False-stream-of-length-N followed by True.

Close ordering: on consumer-side early termination signal is closed first, then stream — right-before-left, matching zip and flat_map.
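
A sketch of the “fire after N consumer pulls” encoding described above (source.from_list is a hypothetical list-backed constructor; source.repeat per the source module):

```gleam
// Three False elements then True: per the pull rules above, the
// first three consumer pulls consume a False and yield an element;
// the fourth sees True, closes both streams, and returns Done.
let after_three = source.from_list([False, False, False, True])

source.repeat(0)
|> stream.interrupt_when(signal: after_three)
// -> 0, 0, 0, then Done
```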

pub fn intersperse(
  over stream: datastream.Stream(a),
  with separator: a,
) -> datastream.Stream(a)

Insert separator between adjacent elements.

Empty and single-element streams are unchanged.

pub fn map(
  over stream: datastream.Stream(a),
  with f: fn(a) -> b,
) -> datastream.Stream(b)

Apply f to every element. Cardinality and order are preserved.

pub fn map_accum(
  over stream: datastream.Stream(a),
  from initial: state,
  with step: fn(state, a) -> #(state, b),
) -> datastream.Stream(b)

Thread a state through the stream while emitting elements of a possibly different type.

pub fn scan(
  over stream: datastream.Stream(a),
  from initial: b,
  with step: fn(b, a) -> b,
) -> datastream.Stream(b)

Left-fold the stream, emitting one output per input.

The seed itself is NOT emitted, so output cardinality equals input cardinality. On empty input the output is empty.
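
For example, a running sum (source.from_list is a hypothetical list-backed constructor):

```gleam
// One output per input; the seed 0 itself is not emitted.
source.from_list([1, 2, 3])
|> stream.scan(from: 0, with: fn(acc, x) { acc + x })
// -> 1, 3, 6
```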

pub fn take(
  from stream: datastream.Stream(a),
  up_to n: Int,
) -> datastream.Stream(a)

Yield at most the first n elements; n == 0 yields the empty stream.

n MUST be >= 0. A negative n is rejected at construction time with a panic per the datastream module-level invalid-argument policy.

Stops pulling upstream the moment the nth element has been emitted, and closes the upstream on that early exit. This is what makes take safe on infinite resource-backed sources.

Resource handling at n == 0 — early-exit semantics extend to the degenerate case: the first pull returns Done and closes the upstream. This is asymmetric with drop(s, 0), which is the identity (no close, no pull). For dynamic-n pipelines built from take_checked / drop_checked where n may end up 0, see the “Asymmetry of take(s, 0) and drop(s, 0)” subsection below for guidance.

Asymmetry of take(s, 0) and drop(s, 0)

Constructor   | Pull behaviour at n == 0            | Resource handling
take(s, 0)    | First pull returns Done             | Closes s eagerly
drop(s, 0)    | Identity (first pull pulls from s)  | Does not pre-close

Both choices follow each constructor’s primary semantics (take is early-exit; drop is identity), but the combination surprises callers who treat the two as duals. Practical consequences:

  • take(s, 0) discarded without consumption — closing happens on the first pull, so if no terminal ever pulls, the resource stays open until the calling process exits.
  • drop(s, 0) discarded without consumption — upstream is never touched. Callers that build drop(s, 0) and throw away the result must close s themselves to avoid leaking resource-backed sources (source.try_resource, file handles, sockets).

When n comes from config / CLI / request parameters and may be 0, prefer driving the resulting stream through a terminal (fold.to_list, sink.each, …) so the close path runs regardless of which branch the runtime took.
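
A sketch of that recommendation for a dynamic n that may be 0 (src and n are ours; fold.to_list is the terminal named above):

```gleam
// Whether n == 0 (identity, no pre-close) or n > 0, the terminal
// pull drains the stream, so src self-closes on its Done path.
src
|> stream.drop(up_to: n)
|> fold.to_list
```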

pub fn take_checked(
  from stream: datastream.Stream(a),
  up_to n: Int,
) -> Result(datastream.Stream(a), StreamArgError)

Like take, but returns the argument-validation failure as a Result instead of panicking. Use this when n comes from dynamic input (CLI, config, request parameters).

On success the stream behaves identically to take(from: stream, up_to: n).

The caller is responsible for closing stream if the constructor returns Error — no upstream pull happens, but the upstream is otherwise untouched.

Example:

  case stream.take_checked(from: src, up_to: configured_limit) {
    Ok(s) -> …
    Error(stream.NegativeCount(function: _, given: g)) -> …
  }

pub fn take_while(
  in stream: datastream.Stream(a),
  satisfying predicate: fn(a) -> Bool,
) -> datastream.Stream(a)

Yield the longest prefix where predicate holds, then stop.

Stops pulling upstream and closes it as soon as predicate returns False, so take_while terminates on infinite resource-backed sources whose prefix eventually fails the predicate.

pub fn tap(
  over stream: datastream.Stream(a),
  with effect: fn(a) -> Nil,
) -> datastream.Stream(a)

Call effect once per element and re-emit the element unchanged.

pub fn unzip(
  stream: datastream.Stream(#(a, b)),
) -> #(datastream.Stream(a), datastream.Stream(b))

Counterpart of zip: split a Stream(#(a, b)) into a pair of independent component streams.

Both component streams see the same elements (left projections for the first, right projections for the second) in the same order. Each consumer pulls at its own pace, sharing the upstream via broadcast under the hood.

Cross-target: same FFI-backed shared state as broadcast, so unzip runs on both Erlang and JavaScript.

Buffer size: unbounded per consumer, inherited from broadcast. A pipeline that drives one half of the unzipped pair to completion while the other half stays unpulled will accumulate the entire stream in the lagging half’s queue.

pub fn zip(
  left: datastream.Stream(a),
  right: datastream.Stream(b),
) -> datastream.Stream(#(a, b))

Pair-wise zip two streams; halts the moment either source halts.

Both upstreams may be open at once. On halt, closes right first, then left, matching the spec.
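
For example (source.from_list is a hypothetical list-backed constructor):

```gleam
// Halts with the shorter side; on that halt, right is closed
// first, then left.
stream.zip(source.from_list([1, 2, 3]), source.from_list(["a", "b"]))
// -> #(1, "a"), #(2, "b")
```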

pub fn zip_with(
  left: datastream.Stream(a),
  right: datastream.Stream(b),
  with combiner: fn(a, b) -> c,
) -> datastream.Stream(c)

Combine two streams element-wise with combiner; halts the moment either source halts.

Same close ordering as zip: right then left.
