datastream/stream

Single-stream middle combinators and multi-stream composition combinators over Stream(a).

All combinators are lazy and order-preserving unless documented otherwise. Building a pipeline with these combinators triggers no user callbacks; callbacks fire only when a terminal from fold or sink pulls the result.

Early-exit combinators (take, take_while) stop pulling upstream the moment their result is determined, which is what makes them safe on infinite sources.

Every combinator here forwards the upstream’s close callback so resource-backed streams (source.resource) are released on every termination path. Combinators that observe a Done from the upstream do NOT call close again — the source closed itself when it returned Done. Combinators that hold multiple upstreams open at once close them in the order specified in the spec (flat_map: inner before outer; zip: right before left).

Values

pub fn append(
  first: datastream.Stream(a),
  second: datastream.Stream(a),
) -> datastream.Stream(a)

Yield all of first, then all of second. If first is infinite, second is never opened.

Honours the spec close ordering: second is opened only after first returns Done (and so has already closed itself); on early exit before first finishes, second is never opened.

Close contract: first self-closes when it returns Done; append does not call close on it again. After first is exhausted, second’s elements are yielded directly — when second itself returns Done, it self-closes in the same way. On early exit mid-second, the downstream’s close call propagates to the active second node.
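A usage sketch, assuming a hypothetical from_list source constructor (not part of this module's documented surface):

```gleam
import datastream
import datastream/stream

pub fn example() {
  // Hypothetical list-backed sources; substitute any real source.
  let first = datastream.from_list([1, 2])
  let second = datastream.from_list([3, 4])
  stream.append(first, second)
  // yields 1, 2, 3, 4; `second` is opened only once `first` is Done
}
```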

pub fn broadcast(
  over stream: datastream.Stream(a),
  into n: Int,
) -> List(datastream.Stream(a))

Split stream into n independent consumer streams that share a single underlying source.

Each consumer pulls at its own pace; any element the source produces is observed by every still-alive consumer in order. A consumer that lags behind sees the same prefix as a fast consumer, just later. This is the fan-out / pub-sub combinator: an observability tap can run in parallel with the main pipeline without forcing the source to be re-evaluated, which matters for resource-backed and otherwise non-replayable sources.

n MUST be >= 1. An n < 1 is rejected at construction time with a panic per the datastream module-level invalid-argument policy.

Buffer size: the per-consumer queue is currently unbounded. A consumer that never pulls while another pulls aggressively will accumulate the entire stream in its queue; users with that risk should compose broadcast(..., n) |> list.map(stream.take(_, k)) or its equivalent. A bounded variant with an explicit drop policy is left as a follow-up — see the Roadmap.

Cross-target: implemented via a small FFI-backed mutable reference (datastream/internal/ref). On Erlang the cell is a process-dictionary slot; on JavaScript it is a one-field mutable object. No gleam_erlang processes are spawned; no shared global state. The Ref(_) is local to each broadcast call.

Close contract: every consumer’s close is recorded; the upstream is closed exactly once, on whichever call brings the active-consumer count to zero. If the upstream returns Done first it self-closes, and the close callbacks are no-ops.
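A fan-out sketch, again assuming a hypothetical from_list source constructor; any non-replayable source behaves the same:

```gleam
import datastream
import datastream/stream

pub fn example() {
  // One underlying source, two independent consumers.
  let assert [main, tap] =
    datastream.from_list([1, 2, 3])
    |> stream.broadcast(into: 2)
  // `main` and `tap` each observe 1, 2, 3 at their own pace;
  // the underlying source is evaluated only once.
  #(main, tap)
}
```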

pub fn buffer(
  over stream: datastream.Stream(a),
  prefetch capacity: Int,
) -> datastream.Stream(a)

Eagerly pull capacity elements ahead from stream and yield them to the consumer at its pace, refilling the internal queue back to capacity after every consumer pull.

buffer is the prefetch combinator: it decouples the consumer’s pull cadence from upstream latency. For latency-bound upstreams (HTTP body bytes, cold reads from disk) the consumer can do its own per-element work in parallel with the next upstream pull instead of blocking serially on each one.

capacity MUST be >= 1. A capacity < 1 is rejected at construction time with a panic per the datastream module-level invalid-argument policy — capacity == 0 would defeat the point of buffering, and a negative capacity is programmer error.

Element type, order, and cardinality are preserved. When upstream returns Done, any already-buffered elements are still drained before the buffered stream itself emits Done. On consumer-side early termination the upstream is closed once and any unconsumed buffered elements are discarded — the upstream produced them in good faith but resource cleanup wins over delivery, matching take’s behaviour.

pub fn chunks_of(
  over stream: datastream.Stream(a),
  into size: Int,
) -> datastream.Stream(chunk.Chunk(a))

Group adjacent elements into fixed-size chunks.

size MUST be >= 1. A size < 1 is rejected at construction time with a panic per the datastream module-level invalid-argument policy.

The trailing chunk may be smaller than size when the source length is not divisible.
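For instance (from_list is a hypothetical source constructor used for illustration):

```gleam
import datastream
import datastream/stream

pub fn example() {
  datastream.from_list([1, 2, 3, 4, 5])
  |> stream.chunks_of(into: 2)
  // yields chunks of [1, 2], [3, 4], then a short trailing [5]
}
```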

pub fn concat(
  streams: List(datastream.Stream(a)),
) -> datastream.Stream(a)

Walk a List(Stream(a)) in list order, yielding every element of every stream.

Each inner stream is closed (by reaching its own Done) before the next one is opened. On early exit, the active stream is closed.

pub fn dedupe_adjacent(
  stream: datastream.Stream(a),
) -> datastream.Stream(a)

Collapse runs of ==-equal adjacent values to a single occurrence.

pub fn drop(
  from stream: datastream.Stream(a),
  up_to n: Int,
) -> datastream.Stream(a)

Discard the first n elements; n == 0 is the identity.

n MUST be >= 0. A negative n is rejected at construction time with a panic per the datastream module-level invalid-argument policy.

pub fn drop_while(
  in stream: datastream.Stream(a),
  satisfying predicate: fn(a) -> Bool,
) -> datastream.Stream(a)

Discard the longest prefix where predicate holds, then yield the rest.

pub fn filter(
  over stream: datastream.Stream(a),
  keeping predicate: fn(a) -> Bool,
) -> datastream.Stream(a)

Keep only the elements for which predicate returns True.

Relative order of the survivors is preserved.

pub fn filter_map(
  over stream: datastream.Stream(a),
  with f: fn(a) -> option.Option(b),
) -> datastream.Stream(b)

Apply f to each element and keep only the Some(x) results.
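A parse-and-keep sketch in one pass (from_list is a hypothetical source constructor):

```gleam
import datastream
import datastream/stream
import gleam/int
import gleam/option

pub fn example() {
  // Non-numeric strings produce None and are dropped.
  datastream.from_list(["1", "x", "2"])
  |> stream.filter_map(with: fn(s) {
    case int.parse(s) {
      Ok(n) -> option.Some(n)
      Error(_) -> option.None
    }
  })
  // yields 1, 2
}
```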

pub fn flat_map(
  over stream: datastream.Stream(a),
  with f: fn(a) -> datastream.Stream(b),
) -> datastream.Stream(b)

Apply f to each element and concatenate the inner streams it produces.

Pulls one inner stream at a time; the next inner stream is not constructed until the previous one is exhausted. On early exit before the outer is Done, the active inner is closed first, then the outer.

Close contract: when an inner stream returns Done, flat_map does NOT call close on it — the source is expected to have released its resources when it returned Done (the self-close convention followed by source.resource). Custom streams built with make must release resources inside next on the Done path if they hold any.
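The one-inner-at-a-time behaviour can be sketched like this (from_list is a hypothetical source constructor):

```gleam
import datastream
import datastream/stream

pub fn example() {
  // Each n expands to an inner stream [n, n]; the inner for 2 is
  // not constructed until the inner for 1 is exhausted.
  datastream.from_list([1, 2])
  |> stream.flat_map(with: fn(n) { datastream.from_list([n, n]) })
  // yields 1, 1, 2, 2
}
```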

pub fn flatten(
  streams: datastream.Stream(datastream.Stream(a)),
) -> datastream.Stream(a)

Flatten a stream of streams into a single stream.

Equivalent to flat_map(s, fn(x) { x }); the close ordering is the same.

pub fn group_adjacent(
  over stream: datastream.Stream(a),
  by key: fn(a) -> k,
) -> datastream.Stream(#(k, chunk.Chunk(a)))

Group consecutive elements that share key(element).

pub fn interrupt_when(
  in stream: datastream.Stream(a),
  signal signal: datastream.Stream(Bool),
) -> datastream.Stream(a)

Terminate stream on the first True element from signal.

interrupt_when is the external counterpart of take_while: take_while ends a stream when a pulled element fails a predicate; interrupt_when ends it when an unrelated stream produces a fire signal — typically a timer expiry, a parent-shutdown probe, or a cancel-token bridge built with from_subject.

On every consumer pull, signal is checked first:

  • signal yields True → both stream and the rest of signal are closed and the consumer sees Done immediately.
  • signal yields False → that signal element is consumed; the pull descends into stream and yields its next element. Subsequent consumer pulls see the rest of signal.
  • signal returns Done → the consumer pull descends into stream. signal is then ignored on subsequent pulls; a Done producer is never re-pulled per the Step contract.

The Stream(Bool) shape (rather than Stream(Nil)) is what makes counted / time-based signals representable in the pull model — Done cannot mean “not yet, ask again later” because Done is terminal, so a “fire after N consumer pulls” signal is built as False-stream-of-length-N followed by True.

Close ordering: on consumer-side early termination signal is closed first, then stream — right-before-left, matching zip and flat_map.
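A counted-signal sketch following the False-then-True shape described above (from_list is a hypothetical source constructor):

```gleam
import datastream
import datastream/stream

pub fn example() {
  // "Fire after 3 consumer pulls": three Falses, then True.
  let signal =
    stream.append(
      datastream.from_list([False, False, False]),
      datastream.from_list([True]),
    )
  datastream.from_list([1, 2, 3, 4, 5])
  |> stream.interrupt_when(signal: signal)
  // yields 1, 2, 3; the fourth pull sees True and Done
}
```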

pub fn intersperse(
  over stream: datastream.Stream(a),
  with separator: a,
) -> datastream.Stream(a)

Insert separator between adjacent elements.

Empty and single-element streams are unchanged.

pub fn map(
  over stream: datastream.Stream(a),
  with f: fn(a) -> b,
) -> datastream.Stream(b)

Apply f to every element. Cardinality and order are preserved.

pub fn map_accum(
  over stream: datastream.Stream(a),
  from initial: state,
  with step: fn(state, a) -> #(state, b),
) -> datastream.Stream(b)

Thread a state through the stream while emitting elements of a possibly different type.
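For example, attaching a 0-based index (from_list is a hypothetical source constructor):

```gleam
import datastream
import datastream/stream

pub fn example() {
  // The threaded state is the next index to hand out.
  datastream.from_list(["a", "b", "c"])
  |> stream.map_accum(from: 0, with: fn(i, x) { #(i + 1, #(i, x)) })
  // yields #(0, "a"), #(1, "b"), #(2, "c")
}
```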

pub fn scan(
  over stream: datastream.Stream(a),
  from initial: b,
  with step: fn(b, a) -> b,
) -> datastream.Stream(b)

Left-fold the stream, emitting one output per input.

The seed itself is NOT emitted, so output cardinality equals input cardinality. On empty input the output is empty.
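A running-sum sketch (from_list is a hypothetical source constructor):

```gleam
import datastream
import datastream/stream

pub fn example() {
  // The seed 0 itself is not emitted; one output per input.
  datastream.from_list([1, 2, 3, 4])
  |> stream.scan(from: 0, with: fn(acc, x) { acc + x })
  // yields 1, 3, 6, 10
}
```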

pub fn take(
  from stream: datastream.Stream(a),
  up_to n: Int,
) -> datastream.Stream(a)

Yield at most the first n elements; n == 0 yields the empty stream.

n MUST be >= 0. A negative n is rejected at construction time with a panic per the datastream module-level invalid-argument policy.

Stops pulling upstream the moment the nth element has been emitted, and closes the upstream on that early exit. This is what makes take safe on infinite resource-backed sources.
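A sketch of the infinite-source case, assuming a hypothetical repeat source constructor that yields its argument forever:

```gleam
import datastream
import datastream/stream

pub fn example() {
  // Only three pulls ever reach upstream; the source is closed
  // on the early exit after the third element.
  datastream.repeat(0)
  |> stream.take(up_to: 3)
  // yields 0, 0, 0
}
```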

pub fn take_while(
  in stream: datastream.Stream(a),
  satisfying predicate: fn(a) -> Bool,
) -> datastream.Stream(a)

Yield the longest prefix where predicate holds, then stop.

Stops pulling upstream and closes it as soon as predicate returns False, so take_while terminates on infinite resource-backed sources whose prefix eventually fails the predicate.

pub fn tap(
  over stream: datastream.Stream(a),
  with effect: fn(a) -> Nil,
) -> datastream.Stream(a)

Call effect once per element and re-emit the element unchanged.

pub fn unzip(
  stream: datastream.Stream(#(a, b)),
) -> #(datastream.Stream(a), datastream.Stream(b))

Counterpart of zip: split a Stream(#(a, b)) into a pair of independent component streams.

Both component streams see the same elements (left projections for the first, right projections for the second) in the same order. Each consumer pulls at its own pace, sharing the upstream via broadcast under the hood.

Cross-target: same FFI-backed shared state as broadcast, so unzip runs on both Erlang and JavaScript.

Buffer size: unbounded per consumer, inherited from broadcast. A pipeline that drives one half of the unzipped pair to completion while the other half stays unpulled will accumulate the entire stream in the lagging half’s queue.

pub fn zip(
  left: datastream.Stream(a),
  right: datastream.Stream(b),
) -> datastream.Stream(#(a, b))

Pair-wise zip two streams; halts the moment either source halts.

Both upstreams may be open at once. On halt, closes right first, then left, matching the spec.
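An unequal-length sketch (from_list is a hypothetical source constructor):

```gleam
import datastream
import datastream/stream

pub fn example() {
  // Halts when the shorter left stream is Done; the still-open
  // right side is closed first, then the left, per the spec.
  stream.zip(
    datastream.from_list([1, 2]),
    datastream.from_list(["a", "b", "c"]),
  )
  // yields #(1, "a"), #(2, "b")
}
```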

pub fn zip_with(
  left: datastream.Stream(a),
  right: datastream.Stream(b),
  with combiner: fn(a, b) -> c,
) -> datastream.Stream(c)

Combine two streams element-wise with combiner; halts the moment either source halts.

Same close ordering as zip: right then left.
