SkillKit.Stream (SkillKit v0.4.0)

Lazy stream of agent events from the caller's mailbox.

When an agent is started with caller: self(), the caller process receives event structs as messages while the turn unfolds (%Event.Delta{}, %Event.ToolCallComplete{}, %Types.AssistantMessage{}, %Event.Error{}, ...). stream/2 exposes those messages as an Enumerable, so rendering, broadcasting, and filtering compose with the standard library's Stream.* and Enum.* functions:

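agent.name
|> SkillKit.Stream.stream()
# Render each event as it arrives (print/1 stands for any caller-defined renderer).
|> Stream.each(&print/1)
# Keep only the last event seen; :timeout remains if no event arrived at all.
|> Enum.reduce(:timeout, fn event, _ -> event end)
|> case do
  %AssistantMessage{} = msg -> {:ok, msg}
  %Event.Error{reason: reason} -> {:error, reason}
  :timeout -> {:error, :timeout}
  # The stream timed out after a non-terminal event such as %Event.Delta{}.
  _non_terminal -> {:error, :timeout}
end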

Functions

stream(agent_name, opts \\ [])

@spec stream(
  String.t(),
  keyword()
) :: Enumerable.t()

Returns a lazy Enumerable of events from the caller's mailbox for one turn of agent_name.

The stream emits every event that reaches the caller, including events from sub-loops, in arrival order. It ends when:

  • the root agent's %AssistantMessage{} or %Event.Error{} arrives — it is emitted as the final element; or
  • the per-receive timeout elapses with no new event — no further element is emitted.

Sub-loop terminal events flow through like any other event and do not end the stream.
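
The termination rules above can be illustrated with a minimal, self-contained sketch built on Stream.resource/3. This is not SkillKit's implementation: the module name MailboxStreamSketch and the tuple-shaped events ({:delta, agent, text}, {:done, agent, text}, {:error, agent, reason}) are hypothetical stand-ins for the real event structs, chosen only so the example runs without the library.

```elixir
defmodule MailboxStreamSketch do
  # Hypothetical event shapes (the real library sends structs instead):
  #   {:delta, agent, text}   - non-terminal event
  #   {:done, agent, text}    - terminal event for `agent`
  #   {:error, agent, reason} - terminal event for `agent`

  # Returns a lazy stream over the calling process's mailbox for `root`.
  def stream(root, timeout \\ 60_000) do
    Stream.resource(
      fn -> :running end,
      fn
        # A root terminal event was already emitted: stop now.
        :halted ->
          {:halt, :halted}

        :running ->
          receive do
            # Terminal event of the root agent: emit it as the final element.
            {kind, ^root, _payload} = event when kind in [:done, :error] ->
              {[event], :halted}

            # Any other event, including sub-loop terminal events, flows through.
            {kind, _agent, _payload} = event when kind in [:delta, :done, :error] ->
              {[event], :running}
          after
            # No event within the per-receive window: end without emitting.
            timeout -> {:halt, :running}
          end
      end,
      fn _state -> :ok end
    )
  end
end

# Simulate one turn by sending events to our own mailbox.
send(self(), {:delta, "root", "Hel"})
send(self(), {:done, "sub", "sub result"})
send(self(), {:delta, "root", "lo"})
send(self(), {:done, "root", "Hello"})
send(self(), {:delta, "root", "late"})

events = Enum.to_list(MailboxStreamSketch.stream("root", 100))
```

In this sketch the sub-loop's {:done, "sub", ...} event passes through without ending the stream, the root's {:done, "root", ...} event is the last element, and the late delta is left unread in the mailbox.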

Must be called from the process registered as :caller in SkillKit.start_agent/2, because the events are read from the calling process's own mailbox.

Options

  • :timeout — ms to wait for the next event, or :infinity (default: 60000).
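
Because the timeout applies to each wait for the next event rather than to the turn as a whole, a stream can run longer than the timeout as long as events keep arriving. The sketch below shows that behaviour with a plain receive ... after loop; TimeoutSketch, next_event/1, and the {:event, payload} message shape are hypothetical names for illustration and are not part of SkillKit.

```elixir
defmodule TimeoutSketch do
  # Wait up to `timeout` ms (or forever with :infinity) for one message.
  def next_event(timeout) do
    receive do
      {:event, payload} -> {:ok, payload}
    after
      timeout -> :timeout
    end
  end

  # Lazily yield payloads until one wait exceeds `timeout`.
  def events(timeout) do
    Stream.repeatedly(fn -> next_event(timeout) end)
    |> Stream.take_while(&(&1 != :timeout))
    |> Stream.map(fn {:ok, payload} -> payload end)
  end
end

# Three events arrive 100 ms apart, so the turn lasts about 300 ms in total.
for {payload, delay_ms} <- [{1, 100}, {2, 200}, {3, 300}] do
  Process.send_after(self(), {:event, payload}, delay_ms)
end

# Each individual wait stays under 150 ms, so no event is lost even though
# the whole sequence takes longer than the timeout.
received = Enum.to_list(TimeoutSketch.events(150))
```

Passing :infinity as the timeout is valid in a receive ... after clause and makes each wait block until a message arrives, so such a stream ends only on a terminal event.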