Dsxir.Stream
(dsxir v0.5.0)
Lazy streaming of a single predictor call — the DSPy streamify analog.
run/4 returns an Enumerable that, when enumerated, runs predictor name on
program and yields %Dsxir.Stream.Event{} values as tokens arrive, ending
with the final %Dsxir.Prediction{} as the last element:

    program
    |> Dsxir.stream(:answer, %{question: "..."}, listen: [:answer])
    |> Enum.each(fn
      %Dsxir.Stream.Event{type: :field_delta, data: token} -> IO.write(token)
      %Dsxir.Prediction{} = prediction -> IO.inspect(prediction)
    end)

opts accepts :listen (field-level streaming) and any other predictor opt;
the :stream sink is supplied internally. :stream_timeout (default 60s)
bounds the wait between events.
A single named predictor is streamed because only Dsxir.Module.Runtime.call/4
propagates per-call opts to the predictor — the multi-node executor does not —
and per-predictor listening mirrors how DSPy attaches stream listeners.
Process model
The forward pass runs in a task under Dsxir.TaskSupervisor, replaying the
caller's Dsxir.Settings.snapshot/0 (captured eagerly at call time) so the
task resolves the same lm/adapter/metadata. The producer-to-consumer hand-off
is a mailbox: the sink sends each event to the enumerating process, and the
task's return value delivers the prediction last. The consuming process stays
responsive; a forward-pass crash surfaces as a raised exception during
enumeration rather than killing the consumer, and halting the stream early
reaps the task.
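
The hand-off described above can be approximated in plain Elixir. The following is a minimal sketch under stated assumptions, not Dsxir's actual implementation: MailboxStreamSketch, its run/3 signature, and the producer/sink convention are hypothetical names chosen for illustration, and the caller passes in an already started Task.Supervisor. It relies only on Stream.resource/3, Task.Supervisor.async_nolink/2 and Task.shutdown/2 from the standard library.

```elixir
defmodule MailboxStreamSketch do
  # Hypothetical module for illustration only; not part of Dsxir.

  # `producer` is a 1-arity function: it receives a sink, calls it once per
  # event, and returns the final value (the "prediction" in Dsxir's terms).
  def run(supervisor, producer, timeout \\ 60_000) when is_function(producer, 1) do
    Stream.resource(
      fn -> start(supervisor, producer) end,
      fn state -> next(state, timeout) end,
      &stop/1
    )
  end

  # Runs in the enumerating process, so self() is the consumer.
  defp start(supervisor, producer) do
    consumer = self()
    tag = make_ref()
    # The sink is the mailbox hand-off: each event becomes a tagged message.
    sink = fn event -> send(consumer, {tag, event}) end
    # async_nolink: a crash in the task does not take the consumer down.
    task = Task.Supervisor.async_nolink(supervisor, fn -> producer.(sink) end)
    {task, tag}
  end

  defp next(:done, _timeout), do: {:halt, :done}

  defp next({%Task{ref: ref} = task, tag}, timeout) do
    receive do
      # An event forwarded by the sink.
      {^tag, event} ->
        {[event], {task, tag}}

      # The task's return value; it is sent after all events, so it comes last.
      {^ref, result} ->
        Process.demonitor(ref, [:flush])
        {[result], :done}

      # The task died: surface it as an exception in the consumer.
      {:DOWN, ^ref, :process, _pid, reason} ->
        raise "producer crashed: #{inspect(reason)}"
    after
      # Bounds the wait between events, in the spirit of :stream_timeout.
      timeout -> raise "no event within #{timeout} ms"
    end
  end

  defp stop(:done), do: :ok

  # Reached on early halt, timeout, or crash: make sure the task is gone.
  defp stop({task, _tag}) do
    Task.shutdown(task, :brutal_kill)
    :ok
  end
end
```

With a supervisor from Task.Supervisor.start_link/0 and a producer such as fn sink -> Enum.each(["Hel", "lo"], sink); :final end, enumerating the result yields the two tokens followed by :final. Taking only the first element halts the stream, which runs the cleanup function and kills the task.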
Functions
@spec run(Dsxir.Program.t(), atom(), map(), keyword()) :: Enumerable.t()
Return a lazy Enumerable that streams predictor name on program, yielding
%Dsxir.Stream.Event{} values followed by the final %Dsxir.Prediction{}. See
the module doc for the process model and recognised opts.
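
Because the enumerable mixes events with a final prediction, a consumer often wants to fold it into the streamed text plus the prediction. The sketch below shows one way to do that with Enum.reduce/3. It is illustrative only: StreamCollectSketch and its nested Event and Prediction structs are hypothetical stand-ins so the example runs without the library. The stand-in event mirrors only the type and data fields shown in the module doc, and the fields key on the stand-in prediction is an assumption.

```elixir
defmodule StreamCollectSketch do
  # Hypothetical stand-ins for %Dsxir.Stream.Event{} and %Dsxir.Prediction{}.
  defmodule Event do
    defstruct [:type, :data]
  end

  defmodule Prediction do
    # The `fields` key is an assumption made for this sketch.
    defstruct fields: %{}
  end

  # Folds a stream of events plus a trailing prediction into
  # {concatenated_delta_text, prediction}.
  def collect(stream) do
    {chunks, prediction} =
      Enum.reduce(stream, {[], nil}, fn
        # Field deltas carry a token; accumulate them as iodata.
        %Event{type: :field_delta, data: token}, {chunks, pred} ->
          {[chunks, token], pred}

        # Any other event type is ignored in this sketch.
        %Event{}, acc ->
          acc

        # The final element is the prediction itself.
        %Prediction{} = pred, {chunks, _previous} ->
          {chunks, pred}
      end)

    {IO.iodata_to_binary(chunks), prediction}
  end
end
```

Against the real library, the same reduce would match on %Dsxir.Stream.Event{} and %Dsxir.Prediction{} instead of the stand-ins.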