Forcola.Stream (forcola v0.2.0)


Line-by-line output from a shim-supervised process as an Enumerable.

Intended for CLIs that emit NDJSON or line-oriented progress (agent CLIs, docker events, git clone --progress). The child runs in its own process group, and the whole group is killed if the stream is halted, the run times out, or the BEAM dies.

Forcola.Stream.lines(["claude", "-p", prompt], timeout_ms: 300_000)
|> Stream.map(&:json.decode/1)
|> Enum.to_list()

Termination semantics

  • A zero exit ends the stream cleanly.
  • A non-zero exit, death by signal, timeout, or spawn failure raises Forcola.Stream.Error, but only after every line produced before the failure has been emitted. The consumers this is built for are NDJSON CLIs where mid-stream death matters: silently dropping the exit reproduces the leak this library exists to close, and a tagged final element would force every Stream.map(&decode/1) pipeline to special-case it. Stderr captured during the run is carried in the exception.
  • Halting the stream early (Enum.take/2, Stream.take_while/2, an exception downstream) kills the process group and blocks until the shim confirms the group is dead.
  • If the consuming process dies, the port closes, the shim sees stdin EOF, and the group is killed.
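As a hedged illustration of the semantics above, a caller might wrap the pipeline so that a failed run becomes an error tuple. The `EventCollector` module and its `collect_events/1` function are hypothetical names for this sketch; only `Forcola.Stream.lines/2`, `:timeout_ms`, and `Forcola.Stream.Error` come from this page. Because `Enum.to_list/1` is eager, lines decoded before the raise are discarded here; a caller that needs them would process each line as it arrives instead.

```elixir
defmodule EventCollector do
  # Hypothetical helper, not part of Forcola.
  # :json.decode/1 requires OTP 27 or later.
  def collect_events(argv) do
    events =
      argv
      |> Forcola.Stream.lines(timeout_ms: 300_000)
      |> Stream.map(&:json.decode/1)
      |> Enum.to_list()

    # Reached only when the child exited with status zero.
    {:ok, events}
  rescue
    e in Forcola.Stream.Error ->
      # Non-zero exit, signal, timeout, or spawn failure.
      {:error, Exception.message(e)}
  end
end
```

Early halting needs no extra code. In a call such as `Forcola.Stream.lines(["docker", "events"], timeout_ms: 60_000) |> Enum.take(5)`, the `Enum.take/2` halts the stream after five lines, which per the list above kills the process group and blocks until the shim confirms it is dead.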

Idle timeout

:timeout_ms bounds the whole run. :idle_timeout_ms (optional) bounds the gap between output frames instead, detecting a stalled producer without capping total runtime. The two are independent and composable: whichever bound is reached first fires, and the raised Forcola.Stream.Error marks which one (:idle_timed_out vs :timed_out).

The idle deadline resets on any child liveness signal: every STDOUT or STDERR data frame resets it, not only newline-terminated lines. A child that writes bytes without a newline, or writes only to stderr, still counts as alive. On idle expiry the same early-halt kill-and-confirm path runs (KILL the group, wait for the shim to confirm death, close the port) and Forcola.Stream.Error is raised with idle_timed_out: true after any lines already produced have been emitted. Omitting the option (the default) leaves behavior identical to a run bounded only by :timeout_ms.
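The interaction of the two bounds can be modeled with a small pure function. This is a sketch under stated assumptions, not Forcola's implementation: the `DeadlineSketch` module, `next_expiry/5`, and the use of plain integer milliseconds are all invented for illustration, and the tie-breaking choice (the total bound wins when both expire at the same instant) is arbitrary.

```elixir
defmodule DeadlineSketch do
  # Illustrative model only; names and arguments are hypothetical.
  # All times are integer milliseconds on the same clock.
  #
  # Returns {reason, remaining_ms} for whichever bound expires first.
  def next_expiry(now, started_at, last_frame_at, timeout_ms, idle_timeout_ms) do
    # Time left before the whole-run bound fires.
    total_left = started_at + timeout_ms - now

    case idle_timeout_ms do
      # Idle bound disabled: only the whole-run bound applies.
      nil ->
        {:timed_out, max(total_left, 0)}

      idle_ms ->
        # Time left before the gap since the last frame exceeds idle_ms.
        idle_left = last_frame_at + idle_ms - now

        if idle_left < total_left do
          {:idle_timed_out, max(idle_left, 0)}
        else
          {:timed_out, max(total_left, 0)}
        end
    end
  end
end
```

In this model, a STDOUT or STDERR frame arriving at time `t` corresponds to setting `last_frame_at` to `t`, which pushes back only the idle bound; `started_at` never moves, so the whole-run bound is unaffected by output.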

Functions

lines(argv, opts)

@spec lines(
  [String.t(), ...],
  keyword()
) :: Enumerable.t()

Run argv and return its stdout as a lazy stream of lines.

Takes the same options as Forcola.run/2, including :user/:group for running the child as a different user (POSIX only, fail-closed, requires a privileged shim). :timeout_ms is required and bounds the whole run, not the gap between lines.

:idle_timeout_ms (optional, milliseconds; default nil = disabled) bounds the gap between output frames: if no STDOUT or STDERR data arrives within the interval, the producer is treated as stalled, the process group is killed, and Forcola.Stream.Error is raised with idle_timed_out: true. It is independent of and composable with :timeout_ms; whichever bound fires first wins. See the module docs for the exact reset semantics.
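A possible way to combine both bounds is sketched below. The `CloneExample` module, the `clone/2` function, and its `repo_url` and `dest` arguments are hypothetical. The page names the `idle_timed_out` and `timed_out` markers but does not show the full layout of the exception, so the sketch reads them with `Map.get/2` rather than assuming struct fields.

```elixir
defmodule CloneExample do
  # Hypothetical wrapper; repo_url and dest are supplied by the caller.
  def clone(repo_url, dest) do
    ["git", "clone", "--progress", repo_url, dest]
    # Cap the whole run at 10 minutes, and any silent gap at 30 seconds.
    |> Forcola.Stream.lines(timeout_ms: 600_000, idle_timeout_ms: 30_000)
    |> Enum.each(&IO.puts/1)

    :ok
  rescue
    e in Forcola.Stream.Error ->
      cond do
        # No STDOUT or STDERR frame arrived within the idle interval.
        Map.get(e, :idle_timed_out) -> {:error, :stalled}
        # The whole-run bound was reached first.
        Map.get(e, :timed_out) -> {:error, :too_slow}
        # Non-zero exit, signal, or spawn failure.
        true -> {:error, Exception.message(e)}
      end
  end
end
```

Since git writes its progress output to stderr, this is a case where the stderr reset rule matters: the stream may yield few or no lines, yet the child still counts as alive while progress bytes keep arriving.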

Lines are emitted without their trailing newline. A partial line held across frame boundaries is emitted once its newline arrives; a final partial line with no newline is emitted before the stream terminates.
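The buffering rule above can be sketched as a small pure function. This is an illustration of the described behavior, not Forcola's code: the `LineSplitSketch` module and its `feed/2` and `flush/1` functions are hypothetical names, and the sketch splits on "\n" only.

```elixir
defmodule LineSplitSketch do
  # Illustrative only; not Forcola's implementation.

  # Feeds one frame into the buffer.
  # Returns {complete_lines, new_buffer}; lines carry no trailing newline.
  def feed(buffer, frame) do
    parts = String.split(buffer <> frame, "\n")
    # The last part is whatever follows the final newline (possibly "").
    {lines, [rest]} = Enum.split(parts, -1)
    {lines, rest}
  end

  # Called once at end of stream: a non-empty buffer is a final
  # partial line and is emitted as-is.
  def flush(""), do: []
  def flush(buffer), do: [buffer]
end
```

A frame boundary that falls in the middle of a JSON object therefore does not produce a broken line: the fragment waits in the buffer until a later frame supplies the newline, or until `flush/1` emits it at end of stream.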

See the module docs for how termination is surfaced.