datastream/erlang/par
BEAM-only bounded-concurrency combinators.
This module is the single place where multiple processes get involved, so callers can tell at the import site whether a pipeline runs in parallel.
Every public function in this module is gated on
@target(erlang). On the JavaScript target the module still
compiles, so import datastream/erlang/par itself does not fail,
but calling any function fails at the call site. The
beam_only_marker constant exists solely to keep the module
non-empty on the JavaScript target.
Concurrency knobs
- `max_workers`: degree of parallelism. The maximum number of BEAM processes that run the user function at once. Only `map_*` and `each_*` accept this knob; `merge` is fixed at one worker per input stream.
- `max_buffer`: in-flight ceiling. The maximum number of elements pulled from the upstream(s) but not yet emitted to the downstream. Workers in flight (mid-`f(x)`) count toward this ceiling.
max_buffer >= max_workers is required where both apply; otherwise
the buffer would fill before parallelism is reached.
What happens when max_buffer is reached
| function | behaviour |
|---|---|
| map_unordered | bound collapses to max_workers (results emit as soon as they are received); new dispatch is paused |
| map_ordered | new dispatch is paused once busy workers plus the pending reorder dict reach the cap |
| merge | the per-worker Continue ack is withheld; workers wait |
Defaults
default_max_workers = 4 and default_max_buffer = 16 are the
values used by the no-argument merge, map_unordered,
map_ordered, each_unordered, and each_ordered. They aim at the
common case (mixed CPU + IO work, modest cardinality). Tune via
the _with variants when:
- the workload is IO-bound (consider `max_workers` of 16-128 and a correspondingly larger `max_buffer`)
- the workload is CPU-bound (consider `max_workers` equal to `erlang:system_info(schedulers_online)`)
- the upstream is rate-limited or memory-pressured (lower `max_buffer` to throttle the pull rate)
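As a sketch of IO-bound tuning (`fetch`, `urls`, and the `datastream.from_list`/`datastream.to_list` helpers are illustrative assumptions, not part of this module):

```gleam
import datastream
import datastream/erlang/par

// IO-bound workload: many workers, and a buffer at least as large.
pub fn crawl(urls: List(String), fetch: fn(String) -> String) -> List(String) {
  datastream.from_list(urls)
  |> par.map_unordered_with(with: fetch, max_workers: 64, max_buffer: 128)
  |> datastream.to_list
}
```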
Notes
- A `f(x)` that panics leaves the pipeline blocked; this is a known limitation across all `par.*` combinators.
- `merge` workers poll for a coordinator-liveness signal every `merge_worker_liveness_check_ms` ms. If the caller of `merge`/`merge_with` drops the returned stream without reaching `close` (e.g. by losing a reference inside a long-running process), the workers stay resident: the caller is still alive, so the liveness probe says "keep going". Always drive the stream to a terminal or call `close` explicitly.
- Unbounded concurrency is intentionally not offered.
Limitation: incompatible with from_subject
Every combinator in this module pulls from its upstream inside a
spawned worker process. Streams built from
datastream/erlang/source.from_subject cannot survive that, since
an Erlang Subject can only be received by its owning process —
the worker would panic on the first pull. Wrap subject streams
outside the parallel layer (e.g. materialise to a list first) or
route them through a non-par.* pipeline.
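One possible shape of that workaround, assuming `datastream.to_list`/`datastream.from_list` materialisation helpers and an illustrative `do_work` function (the exact arguments of `source.from_subject` may differ):

```gleam
import datastream
import datastream/erlang/par
import datastream/erlang/source

pub fn process(subject, do_work: fn(a) -> b) -> List(b) {
  // Drain the subject-backed stream in the owning process first...
  let items =
    source.from_subject(subject)
    |> datastream.to_list
  // ...then hand a plain, list-backed stream to the parallel layer.
  datastream.from_list(items)
  |> par.map_unordered(with: do_work)
  |> datastream.to_list
}
```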
Values
pub const default_max_buffer: Int
Default max_buffer for the no-argument variants of merge,
map_*, and each_*.
pub const default_max_workers: Int
Default max_workers for the no-argument variants of map_* and
each_*.
pub fn each_ordered(
over stream: datastream.Stream(a),
with effect: fn(a) -> Nil,
) -> Nil
Side-effect-only variant of map_ordered. Uses
default_max_workers and default_max_buffer.
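For example, to print each element in input order while upstream work runs in parallel (the stream value itself is assumed):

```gleam
import datastream
import datastream/erlang/par
import gleam/io

pub fn log_all(lines: datastream.Stream(String)) -> Nil {
  par.each_ordered(over: lines, with: io.println)
}
```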
pub fn each_ordered_with(
over stream: datastream.Stream(a),
with effect: fn(a) -> Nil,
max_workers max_workers: Int,
max_buffer max_buffer: Int,
) -> Nil
Side-effect-only variant of map_ordered_with.
pub fn each_unordered(
over stream: datastream.Stream(a),
with effect: fn(a) -> Nil,
) -> Nil
Side-effect-only variant of map_unordered. Uses
default_max_workers and default_max_buffer.
pub fn each_unordered_with(
over stream: datastream.Stream(a),
with effect: fn(a) -> Nil,
max_workers max_workers: Int,
max_buffer max_buffer: Int,
) -> Nil
Side-effect-only variant of map_unordered_with.
pub fn map_ordered(
over stream: datastream.Stream(a),
with f: fn(a) -> b,
) -> datastream.Stream(b)
Run f in parallel using default_max_workers BEAM processes,
emitting results in input order.
See map_ordered_with for tunable concurrency.
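A minimal sketch with the defaults (`datastream.from_list`/`datastream.to_list` are assumed helpers); input order is preserved even when later elements finish first:

```gleam
import datastream
import datastream/erlang/par

pub fn squares(xs: List(Int)) -> List(Int) {
  datastream.from_list(xs)
  |> par.map_ordered(with: fn(n) { n * n })
  |> datastream.to_list
}
```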
pub fn map_ordered_with(
over stream: datastream.Stream(a),
with f: fn(a) -> b,
max_workers max_workers: Int,
max_buffer max_buffer: Int,
) -> datastream.Stream(b)
Run f in parallel across at most max_workers BEAM processes,
emitting results in input order.
max_buffer >= max_workers is REQUIRED: if max_buffer were
smaller, the buffer would fill while waiting for an early result
and ordering could not be enforced without stalling. A violation
panics at construction. In-flight = busy workers + the pending
reorder dict; dispatch is paused once in-flight reaches max_buffer.
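Illustrating the constraint (`stream` and `f` stand for any stream and function):

```gleam
// OK: the buffer covers all workers plus four reorder slots.
stream
|> par.map_ordered_with(with: f, max_workers: 4, max_buffer: 8)

// Panics at construction: max_buffer (4) < max_workers (8).
stream
|> par.map_ordered_with(with: f, max_workers: 8, max_buffer: 4)
```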
pub fn map_unordered(
over stream: datastream.Stream(a),
with f: fn(a) -> b,
) -> datastream.Stream(b)
Run f in parallel using default_max_workers BEAM processes,
emitting results in the order they finish.
See map_unordered_with for tunable concurrency.
pub fn map_unordered_with(
over stream: datastream.Stream(a),
with f: fn(a) -> b,
max_workers max_workers: Int,
max_buffer max_buffer: Int,
) -> datastream.Stream(b)
Run f in parallel across at most max_workers BEAM processes.
Results are emitted in the order they finish, NOT input order.
max_workers >= 1 and max_buffer >= max_workers are required;
violations panic at construction. The max_buffer >= max_workers
rule is enforced uniformly across par.* so callers can swap
between map_unordered_with, map_ordered_with, and merge_with
without re-tuning. Note however that map_unordered emits each
result the moment it arrives, so in_flight = workers_busy and
max_buffer adds no extra ceiling beyond max_workers here. See
default_max_workers and default_max_buffer for sensible
starting values.
pub fn merge(
streams streams: List(datastream.Stream(a)),
) -> datastream.Stream(a)
Interleave elements from every input stream in arrival order. Uses
default_max_buffer as the in-flight ceiling.
See merge_with for tunable buffering.
pub fn merge_with(
streams streams: List(datastream.Stream(a)),
max_buffer max_buffer: Int,
) -> datastream.Stream(a)
Interleave elements from every input stream in arrival order.
Order from each individual source is preserved relative to that source; cross-source order follows whichever worker process happens to deliver first.
max_buffer bounds the number of in-flight elements: pulled by
any worker but not yet emitted to the downstream. Implementation:
at startup at most max_buffer workers are issued a Continue
signal; the rest wait. Each worker pulls one element, sends it,
and waits for the next Continue. The coordinator hands a
Continue to the next waiting worker every time it emits an
element downstream. If max_buffer < len(streams), the late-
starting workers wait until earlier ones drain.
max_buffer >= 1 is required.
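A sketch merging two sources under a small ceiling (the source streams and `datastream.to_list` are assumptions):

```gleam
// At most 4 elements may be pulled but not yet emitted, across
// both workers combined; order within each source is preserved.
par.merge_with(streams: [sensor_a, sensor_b], max_buffer: 4)
|> datastream.to_list
```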
pub fn race(
streams streams: List(datastream.Stream(a)),
) -> datastream.Stream(a)
Wait for the first source to emit; signal the other workers to stop, then continue with the winning source until it halts.
Implementation: spawn one worker per source. Each worker checks
for a stop signal, then does a single datastream.pull and
reports back on a shared subject. The first RaceNext wins; the
coordinator drains any loser messages already in its mailbox and
closes the rest they carry, then sends a stop signal to every
other worker.
Best-effort loser cleanup. A loser worker that was still
inside its datastream.pull when the stop signal arrived has no
way to observe it and will go on to send its RaceNext after the
coordinator has stopped reading. Such a RaceNext is dropped and
its rest is not closed. BEAM-internal resources held by the
abandoned rest are reclaimed by the runtime; external resources
(file descriptors, sockets) may leak. Pass external-resource
streams to race only when this trade-off is acceptable.
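A sketch of hedged requests against several replicas (the replica streams are assumptions; per the caveat above, avoid this when losers hold external resources):

```gleam
// The first replica to emit wins; the rest are signalled to stop
// and, best-effort, closed.
par.race(streams: [replica_1, replica_2, replica_3])
|> datastream.to_list
```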