datastream/erlang/source
BEAM-only source constructors.
These bridge between Erlang’s process / timer primitives and the
cross-target Stream(a) value. The cross-target core has no
business knowing about Subject, monotonic time, or BEAM timers,
so they live here in datastream/erlang/source.
Every public function in this module is gated on
@target(erlang). On the JavaScript target the module still
compiles, so import datastream/erlang/source itself does not
fail, but calling any function fails at the call site. The
beam_only_marker constant exists solely to keep the module
non-empty on the JavaScript target.
Values
pub fn from_subject(
from subject: process.Subject(a),
while keep_running: fn() -> Bool,
) -> datastream.Stream(a)
Bridge a process Subject(a) into a Stream(a).
Each pull blocks on process.receive until either a message
arrives, which is emitted as the next element, or keep_running()
returns False between receives, at which point the stream ends.
The receive uses a short internal poll interval so keep_running
stays responsive.
The Subject’s lifecycle is owned by the caller: this stream
only reads from the subject and never closes it. Once the terminal
operation completes, the stream’s hold on the subject is released.
Limitation: the resulting stream cannot be passed to any
combinator in datastream/erlang/par or datastream/erlang/time,
nor to datastream/erlang/source.timeout. Those combinators run
the pull in a worker process, and because only the process that
owns a Subject may receive from it, the pull panics.
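A minimal sketch of building such a stream, assuming gleam_erlang 1.x. The labels from: and while: come from the signature above; until_stopped, inbox_stream and the stop subject are hypothetical names introduced only for illustration. The sketch stops the stream once a message arrives on a second subject, and it has to be pulled from the process that owns both subjects.

```gleam
import datastream
import datastream/erlang/source
import gleam/erlang/process

/// Hypothetical helper: returns True until a message is waiting on `stop`.
/// It consumes that message, so it reports False only once. It must be
/// called from the process that owns `stop`.
pub fn until_stopped(stop: process.Subject(Nil)) -> fn() -> Bool {
  fn() {
    // A zero timeout checks the mailbox without blocking.
    case process.receive(stop, within: 0) {
      Ok(Nil) -> False
      Error(Nil) -> True
    }
  }
}

/// Bridge `inbox` into a stream that ends once `stop` has been signalled.
pub fn inbox_stream(
  inbox: process.Subject(String),
  stop: process.Subject(Nil),
) -> datastream.Stream(String) {
  source.from_subject(from: inbox, while: until_stopped(stop))
}
```

Another process can end the stream by calling process.send(stop, Nil); how quickly that takes effect depends on the internal poll interval mentioned above.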
pub fn interval(every period_ms: Int) -> datastream.Stream(Nil)
Emit Nil every period_ms ms. Same first-emission timing as
ticks; differs only in element type.
pub fn ticks(every period_ms: Int) -> datastream.Stream(Int)
Emit a monotonic millisecond timestamp every period_ms ms.
The first element is emitted period_ms after the first pull.
Pair with stream.take (or any early-exit terminal) to bound
the run.
pub fn timeout(
over stream: datastream.Stream(a),
within ms: Int,
) -> datastream.Stream(Result(a, Nil))
Wrap an upstream and add a per-element deadline.
Each element of stream that arrives within ms ms surfaces as
Ok(element). If the upstream produces nothing for longer than
ms ms, exactly one Error(Nil) element is emitted and the
stream halts.
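As a small illustration of the call shape, the sketch below wraps an arbitrary upstream with a 500 ms per-element deadline. The over: and within: labels are taken from the signature above; with_deadline and the 500 ms value are hypothetical choices, and the upstream must not come from from_subject.

```gleam
import datastream
import datastream/erlang/source

/// Hypothetical wrapper: elements arriving within 500 ms surface as
/// Ok(element); a longer gap yields one Error(Nil) and the stream halts.
pub fn with_deadline(
  upstream: datastream.Stream(a),
) -> datastream.Stream(Result(a, Nil)) {
  source.timeout(over: upstream, within: 500)
}
```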
Implementation: each pull spawns an unlinked worker process that
performs the datastream.pull and forwards the step on a
dedicated subject; the parent waits with
process.receive(_, within: ms). If the deadline trips, the
abandoned worker may still complete the pull afterwards; its
result is silently discarded and any resource it had partially
acquired is left to the BEAM runtime to reclaim.
If the downstream exits early, the upstream that the next pull
would have read from is closed; an abandoned worker is not waited on.
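The worker-and-deadline pattern described above can be sketched with gleam_erlang alone. This is not the module's actual code: run_with_deadline is a hypothetical name, an arbitrary work function stands in for datastream.pull, and process.spawn_unlinked assumes gleam_erlang 1.x.

```gleam
import gleam/erlang/process

/// Run `work` in an unlinked worker and wait at most `ms` milliseconds
/// for its result.
pub fn run_with_deadline(work: fn() -> a, within ms: Int) -> Result(a, Nil) {
  // Dedicated subject, owned by the calling (parent) process.
  let reply = process.new_subject()
  // The worker only sends on the subject, which any process may do.
  let _worker = process.spawn_unlinked(fn() { process.send(reply, work()) })
  // Error(Nil) if nothing arrives before the deadline. A late worker
  // still finishes and sends, but nobody reads that reply.
  process.receive(reply, within: ms)
}
```

Because the worker is unlinked, a crash inside work does not take the parent down; the parent simply sees the deadline expire.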
Limitation: because the pull happens in a different process
than the one that built the upstream, this combinator MUST NOT
be composed with from_subject: a Subject can only be received
by its owning process, so the worker would panic. Wrap subject
streams with their own application-level timeout instead.
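One possible shape for such an application-level timeout is to receive directly in the owning process, where process.receive already takes a deadline. The sketch below is only an illustration: drain_until_quiet is a hypothetical helper, not part of this module. It collects messages until none arrives for ms milliseconds.

```gleam
import gleam/erlang/process
import gleam/list

/// Collect messages from `inbox` until none arrives for `ms` milliseconds.
/// Must run in the process that owns `inbox`.
pub fn drain_until_quiet(
  from inbox: process.Subject(a),
  within ms: Int,
  into acc: List(a),
) -> List(a) {
  case process.receive(inbox, within: ms) {
    // A message arrived in time: keep it and wait for the next one.
    Ok(message) -> drain_until_quiet(inbox, ms, [message, ..acc])
    // Deadline tripped: stop and return the messages in arrival order.
    Error(Nil) -> list.reverse(acc)
  }
}
```

Since the receive happens in the owning process, no worker is involved and the owner-only constraint is never violated.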