datastream/erlang/par

BEAM-only bounded-concurrency combinators.

This module is the single place where multiple processes get involved, so callers can tell at the import site whether a pipeline runs in parallel.

Every public function in this module is gated on @target(erlang). On the JavaScript target the module still compiles, so import datastream/erlang/par itself does not fail, but calling any function fails at the call site. The beam_only_marker constant exists solely to keep the module non-empty on the JavaScript target.
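The gating pattern looks roughly like this (a sketch, not the module's actual source; the value of beam_only_marker is illustrative):

```gleam
// Compiles on both targets; the body only exists on the BEAM.
// Calling map_unordered from JavaScript-target code fails at
// the call site, not at the import.
@target(erlang)
pub fn map_unordered(
  over stream: datastream.Stream(a),
  with f: fn(a) -> b,
) -> datastream.Stream(b) {
  map_unordered_with(stream, f, default_max_workers, default_max_buffer)
}

// Keeps the module non-empty when compiled for JavaScript.
pub const beam_only_marker = Nil
```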

Concurrency knobs

max_buffer >= max_workers is required where both apply; otherwise the buffer would fill before parallelism is reached.

What happens when max_buffer is reached

function         behaviour when max_buffer is reached
map_unordered    bound collapses to max_workers (results emit on receive); new dispatch is paused
map_ordered      new dispatch is paused; pending dict + workers cap
merge            per-worker Continue ack is withheld; workers wait

Defaults

default_max_workers = 4 and default_max_buffer = 16 are the values used by the no-argument merge, map_unordered, map_ordered, each_unordered, and each_ordered. They target the common case (mixed CPU + IO work, modest cardinality). Tune via the _with variants when those assumptions do not hold.
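As an illustration of tuning through a _with variant (fetch_page is hypothetical; datastream.from_list and datastream.to_list are assumed to exist in the core module):

```gleam
import datastream
import datastream/erlang/par

pub fn crawl(urls: List(String)) -> List(String) {
  datastream.from_list(urls)
  // IO-bound stage: workers mostly wait on the network, so a
  // higher worker count and a deeper buffer both pay off.
  |> par.map_unordered_with(fetch_page, max_workers: 32, max_buffer: 64)
  |> datastream.to_list
}
```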

Notes

Limitation: incompatible with from_subject

Every combinator in this module pulls from its upstream inside a spawned worker process. Streams built from datastream/erlang/source.from_subject cannot survive that, since an Erlang Subject can only be received by its owning process — the worker would panic on the first pull. Wrap subject streams outside the parallel layer (e.g. materialise to a list first) or route them through a non-par.* pipeline.
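A sketch of the suggested workaround (datastream.to_list and datastream.from_list are assumed helpers in the core module):

```gleam
// Drain the subject-backed stream in its owning process first,
// then hand a plain list-backed stream to the parallel layer.
let items =
  source.from_subject(subject)
  |> datastream.to_list

datastream.from_list(items)
|> par.map_unordered(work)
```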

Values

pub const default_max_buffer: Int

Default max_buffer for the no-argument variants of merge, map_*, and each_*.

pub const default_max_workers: Int

Default max_workers for the no-argument variants of map_* and each_*.

pub fn each_ordered(
  over stream: datastream.Stream(a),
  with effect: fn(a) -> Nil,
) -> Nil

Side-effect-only variant of map_ordered. Uses default_max_workers and default_max_buffer.

pub fn each_ordered_with(
  over stream: datastream.Stream(a),
  with effect: fn(a) -> Nil,
  max_workers max_workers: Int,
  max_buffer max_buffer: Int,
) -> Nil

Side-effect-only variant of map_ordered_with.

pub fn each_unordered(
  over stream: datastream.Stream(a),
  with effect: fn(a) -> Nil,
) -> Nil

Side-effect-only variant of map_unordered. Uses default_max_workers and default_max_buffer.

pub fn each_unordered_with(
  over stream: datastream.Stream(a),
  with effect: fn(a) -> Nil,
  max_workers max_workers: Int,
  max_buffer max_buffer: Int,
) -> Nil

Side-effect-only variant of map_unordered_with.

pub fn map_ordered(
  over stream: datastream.Stream(a),
  with f: fn(a) -> b,
) -> datastream.Stream(b)

Run f in parallel using default_max_workers BEAM processes, emitting results in input order.

See map_ordered_with for tunable concurrency.

pub fn map_ordered_with(
  over stream: datastream.Stream(a),
  with f: fn(a) -> b,
  max_workers max_workers: Int,
  max_buffer max_buffer: Int,
) -> datastream.Stream(b)

Run f in parallel across at most max_workers BEAM processes, emitting results in input order.

max_buffer >= max_workers is REQUIRED: if max_buffer were smaller, the buffer would fill while waiting for an early result, and ordering could not be enforced without stalling. A violation panics at construction. Here in_flight = workers_busy + the size of the pending dict; dispatch is paused once in_flight reaches max_buffer.
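A sketch of the ordering guarantee (datastream.from_list/to_list are assumed; process.sleep is from gleam/erlang/process):

```gleam
datastream.from_list([30, 10, 20])
|> par.map_ordered_with(
  fn(ms) {
    // The slowest element is first in the input, so later
    // results must be held in the pending dict until it lands.
    process.sleep(ms)
    ms
  },
  max_workers: 3,
  max_buffer: 4,
)
|> datastream.to_list
// Input order is preserved: [30, 10, 20], even though 10
// finished first.
```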

pub fn map_unordered(
  over stream: datastream.Stream(a),
  with f: fn(a) -> b,
) -> datastream.Stream(b)

Run f in parallel using default_max_workers BEAM processes, emitting results in the order they finish.

See map_unordered_with for tunable concurrency.

pub fn map_unordered_with(
  over stream: datastream.Stream(a),
  with f: fn(a) -> b,
  max_workers max_workers: Int,
  max_buffer max_buffer: Int,
) -> datastream.Stream(b)

Run f in parallel across at most max_workers BEAM processes. Results are emitted in the order they finish, NOT input order.

max_workers >= 1 and max_buffer >= max_workers are required; violations panic at construction. The max_buffer >= max_workers rule is enforced uniformly across par.* so callers can swap between map_unordered_with, map_ordered_with, and merge_with without re-tuning. Note however that map_unordered emits each result the moment it arrives, so in_flight = workers_busy and max_buffer adds no extra ceiling beyond max_workers here. See default_max_workers and default_max_buffer for sensible starting values.
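By contrast, the unordered variant emits in completion order (same assumed helpers as above):

```gleam
datastream.from_list([30, 10, 20])
|> par.map_unordered_with(
  fn(ms) {
    process.sleep(ms)
    ms
  },
  max_workers: 3,
  max_buffer: 4,
)
|> datastream.to_list
// Likely [10, 20, 30]: each result is emitted the moment its
// worker finishes, so input order is not preserved.
```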

pub fn merge(
  streams streams: List(datastream.Stream(a)),
) -> datastream.Stream(a)

Interleave elements from every input stream in arrival order. Uses default_max_buffer as the in-flight ceiling.

See merge_with for tunable buffering.

pub fn merge_with(
  streams streams: List(datastream.Stream(a)),
  max_buffer max_buffer: Int,
) -> datastream.Stream(a)

Interleave elements from every input stream in arrival order.

Order from each individual source is preserved relative to that source; cross-source order follows whichever worker process happens to deliver first.

max_buffer bounds the number of in-flight elements: pulled by any worker but not yet emitted to the downstream. Implementation: at startup at most max_buffer workers are issued a Continue signal; the rest wait. Each worker pulls one element, sends it, and waits for the next Continue. The coordinator hands a Continue to the next waiting worker every time it emits an element downstream. If max_buffer < len(streams), the late-starting workers wait until earlier ones drain.

max_buffer >= 1 is required.
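A sketch of bounded merging (datastream.from_list/to_list are assumed helpers):

```gleam
let a = datastream.from_list([1, 2, 3])
let b = datastream.from_list([10, 20, 30])

par.merge_with(streams: [a, b], max_buffer: 2)
|> datastream.to_list
// Per-source order holds: 1 before 2 before 3, and 10 before
// 20 before 30. The interleaving between a and b depends on
// which worker delivers first and is not deterministic.
```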

pub fn race(
  streams streams: List(datastream.Stream(a)),
) -> datastream.Stream(a)

Wait for the first source to emit; signal the other workers to stop, then continue with the winning source until it halts.

Implementation: spawn one worker per source. Each worker checks for a stop signal, then does a single datastream.pull and reports back on a shared subject. The first RaceNext wins; the coordinator drains any loser messages already in its mailbox and closes the rest they carry, then sends a stop signal to every other worker.

Best-effort loser cleanup. A loser worker that was still inside its datastream.pull when the stop signal arrived has no way to observe it and will go on to send its RaceNext after the coordinator has stopped reading. Such a RaceNext is dropped and its rest is not closed. BEAM-internal resources held by the abandoned rest are reclaimed by the runtime; external resources (file descriptors, sockets) may leak. Pass external-resource streams to race only when this trade-off is acceptable.
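A sketch of a typical use (replica_stream is hypothetical):

```gleam
// Query two replicas; commit to whichever responds first and
// keep consuming the winner until it halts.
let fastest =
  par.race(streams: [
    replica_stream("eu-west"),
    replica_stream("us-east"),
  ])
// Loser cleanup is best-effort, so avoid passing streams here
// that hold external resources such as sockets or open files.
```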
