datastream/erlang/source

BEAM-only source constructors.

These bridge between Erlang’s process / timer primitives and the cross-target Stream(a) value. The cross-target core has no business knowing about Subject, monotonic time, or BEAM timers, so they live here in datastream/erlang/source.

Every public function in this module is gated on @target(erlang). On the JavaScript target the module still compiles, so import datastream/erlang/source itself does not fail, but calling any function fails at the call site. The beam_only_marker constant exists solely to keep the module non-empty on the JavaScript target.

Values

pub fn bridge_subject_stream(
  over stream: datastream.Stream(a),
) -> datastream.Stream(a)

Materialise a from_subject-backed stream into a process-safe Stream(a) so the result can be composed with par.* / time.* / source.timeout.

The pull happens in the calling process — the same process that owns the Subject — so the owner-only receive constraint is respected. Once the upstream returns Done, the materialised List(a) is reopened as a list-backed stream, which is pure and can safely be pulled from any worker process.

Caveat: materialises the entire upstream into memory before emitting any element. Only use this on bounded subjects (a keep_running callback that eventually returns False, or a finite source). For an unbounded subject, route the parallel stage outside the subject pipeline instead — there is no process-safe way to lazily forward an Erlang Subject across a process boundary.

This is the official workaround for the from_subject × par.* / time.* incompatibility documented on from_subject, par, and source.timeout.
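As a rough sketch of the workaround described above, the following bridges a bounded subject stream before wrapping it with source.timeout. It relies only on the signatures listed in this module. The function name bridged_with_deadline and the 500 ms deadline are illustrative assumptions, and the function is expected to be called from the process that owns the subject.

```gleam
import datastream
import datastream/erlang/source
import gleam/erlang/process

// Hypothetical helper: `keep_running` must eventually return False,
// otherwise the bridge never finishes materialising the upstream.
pub fn bridged_with_deadline(
  subject: process.Subject(a),
  keep_running: fn() -> Bool,
) -> datastream.Stream(Result(a, Nil)) {
  // Reads from the subject, so it may only be pulled by the owning process.
  let subject_stream = source.from_subject(from: subject, while: keep_running)

  // Materialised into a list-backed stream that any process can pull.
  let bridged = source.bridge_subject_stream(over: subject_stream)

  // Safe now: the timeout worker pulls from the list, not from the subject.
  source.timeout(over: bridged, within: 500)
}
```

Because the whole upstream is held in memory, this shape suits short, bounded bursts of messages rather than long-lived feeds.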

pub fn from_subject(
  from subject: process.Subject(a),
  while keep_running: fn() -> Bool,
) -> datastream.Stream(a)

Bridge a process Subject(a) into a Stream(a).

Each pull blocks on process.receive until either a message arrives — emitted as the next element — or keep_running() returns False between receives. The receive uses a short internal poll interval so keep_running stays responsive.

The Subject’s lifecycle is owned by the caller: this stream only reads from the subject and never closes it. Once the terminal operation completes, the stream no longer holds the subject.

Limitation: the resulting stream cannot be passed directly to any combinator in datastream/erlang/par or datastream/erlang/time, nor to datastream/erlang/source.timeout. Those combinators run the pull in a worker process, and because only the owning process can receive on a Subject, the worker panics. Use bridge_subject_stream to materialise the subject side in the owning process before handing it off.
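One way to build a keep_running callback that eventually returns False is to tie it to the lifetime of the sending process. The sketch below assumes a hypothetical producer Pid that sends on the subject and then exits; stream_while_alive is an illustrative name, and process.is_alive comes from gleam/erlang/process.

```gleam
import datastream
import datastream/erlang/source
import gleam/erlang/process

// Hypothetical helper: keep reading while the producer process is alive.
// Must be called (and the stream pulled) in the process that owns `subject`.
pub fn stream_while_alive(
  subject: process.Subject(a),
  producer: process.Pid,
) -> datastream.Stream(a) {
  source.from_subject(from: subject, while: fn() {
    process.is_alive(producer)
  })
}
```

Since keep_running is checked between receives, messages still queued when the producer exits may go unread; an application that needs every message would likely want an explicit end-of-stream message instead.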

pub fn interval(every period_ms: Int) -> datastream.Stream(Nil)

Emit Nil every period_ms ms. Same first-emission timing as ticks; differs only in element type.

pub fn ticks(every period_ms: Int) -> datastream.Stream(Int)

Emit a monotonic millisecond timestamp every period_ms ms.

The first element is emitted period_ms after the first pull. Pair with stream.take (or any early-exit terminal) to bound the run.
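A minimal sketch of bounding ticks with take, as suggested above. The import path datastream/stream and the argument order of take (stream first, count second) are assumptions not confirmed by this page; five_ticks is an illustrative name.

```gleam
import datastream
import datastream/erlang/source
// Assumption: `take` lives in `datastream/stream` and accepts the
// stream as its first argument and the element count as its second.
import datastream/stream

// Hypothetical helper: at most five timestamps, 200 ms apart.
pub fn five_ticks() -> datastream.Stream(Int) {
  source.ticks(every: 200)
  |> stream.take(5)
}
```

With a 200 ms period and the first element arriving one period after the first pull, draining this stream should take on the order of one second.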

pub fn timeout(
  over stream: datastream.Stream(a),
  within ms: Int,
) -> datastream.Stream(Result(a, Nil))

Wrap an upstream and add a per-element deadline.

Each element of stream that arrives within ms milliseconds surfaces as Ok(element). If the upstream produces nothing for longer than ms milliseconds, exactly one Error(Nil) element is emitted and the stream halts.

Implementation: each pull spawns an unlinked worker process that performs the datastream.pull and forwards the step on a dedicated subject; the parent waits with process.receive(_, within: ms). On a deadline trip the abandoned worker may still complete the pull afterwards; its result is silently discarded and any resource it had partially acquired is left to the BEAM runtime to reclaim.

If the downstream exits early, the upstream that the next pull would have read from is closed; an abandoned worker is not waited on.

Limitation: because the pull happens in a different process from the one that built the upstream, this combinator MUST NOT be composed directly with from_subject: only the owning process can receive on a Subject, so the worker would panic. Either materialise the subject stream with bridge_subject_stream first, or give subject streams their own application-level timeout.
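The element type Result(a, Nil) can be handled with an ordinary case expression. The sketch below wraps an arbitrary process-safe upstream and labels each element; with_deadline, describe, and the 250 ms deadline are illustrative names and values, not part of the module.

```gleam
import datastream
import datastream/erlang/source

// Hypothetical helper: `upstream` must be safe to pull from a worker
// process, so it must not be a raw from_subject stream.
pub fn with_deadline(
  upstream: datastream.Stream(a),
) -> datastream.Stream(Result(a, Nil)) {
  source.timeout(over: upstream, within: 250)
}

// Hypothetical helper: turn one timeout element into a log-friendly label.
pub fn describe(element: Result(a, Nil)) -> String {
  case element {
    Ok(_) -> "arrived before the deadline"
    Error(Nil) -> "deadline tripped; no further elements follow"
  }
}
```

Because Error(Nil) is emitted at most once and is always the last element, a consumer can treat it as an end-of-stream marker rather than a recoverable error.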
