Omni.Agent behaviour (Omni Agent v0.5.0)

Stateful LLM agents for Elixir — persistent, branching conversations, tool approval, steering, and multi-session management. Built on Omni.

The layers

Each layer is a standalone building block. Pick the one that matches the scope of what you're building — you can stop at any level.

Module                What it is
Omni.Session.Manager  many sessions — supervision, registry, live feed
Omni.Session          persistent conversation — branching, regen, navigation
Omni.Agent            stateful conversation — tools, callbacks, events
Omni                  stateless LLM API — stream_text, tools, structs

Quick start

Start an agent without a callback module for simple conversations:

{:ok, agent} = Omni.Agent.start_link(
  model: {:anthropic, "claude-sonnet-4-6"},
  subscribe: true
)
:ok = Omni.Agent.prompt(agent, "Hello!")

# Events arrive as process messages
receive do
  {:agent, ^agent, :text_delta, %{delta: text}} -> IO.write(text)
  {:agent, ^agent, :turn, {:stop, _response}} -> IO.puts("\nDone!")
end

Pass subscribe: true (or subscribers: [pid1, pid2]) to receive events. Other processes can attach later via subscribe/1,2.

Custom agents

Define a module with use Omni.Agent to customize behaviour through lifecycle callbacks. All callbacks are optional with sensible defaults.

init/1 receives the fully-resolved initial %State{} and returns a possibly-modified state. Use it to bake in defaults (system prompt, tools) or to customize the state based on per-invocation input passed via :private:

defmodule GreeterAgent do
  use Omni.Agent

  @impl Omni.Agent
  def init(state) do
    system = "You are a helpful assistant. The user's name is #{state.private.user}."
    {:ok, %{state | system: system}}
  end

  @impl Omni.Agent
  def handle_turn(%{stop_reason: :length}, state) do
    {:continue, "Continue where you left off.", state}
  end

  def handle_turn(_response, state), do: {:stop, state}
end

{:ok, agent} = GreeterAgent.start_link(
  model: {:anthropic, "claude-sonnet-4-6"},
  private: %{user: "Alice"}
)

For static defaults — system prompt, tools, inference opts — set them in init/1 directly rather than overriding start_link/1:

defmodule ResearchAgent do
  use Omni.Agent

  @impl Omni.Agent
  def init(state) do
    state = %{state |
      system: "You are a research assistant.",
      tools: [SearchTool.new(), FetchTool.new()]
    }
    {:ok, state}
  end
end

{:ok, agent} = ResearchAgent.start_link(
  model: {:anthropic, "claude-sonnet-4-6"}
)

Start options

Options for start_link/1 and start_link/2:

  • :model (required) — {provider_id, model_id} tuple or %Model{}
  • :system — system prompt string
  • :messages — initial %Message{} list. Must be empty or end with an :assistant message containing no %ToolUse{} blocks
  • :tools — list of %Tool{} structs
  • :private — initial private map (runtime state visible in callbacks via state.private). The :omni key is reserved for framework-injected context when running under Omni.Session — see Omni.Agent.State
  • :subscribe — if true, subscribes the caller to agent events
  • :subscribers — list of pids to subscribe to agent events
  • :tool_timeout — tool execution timeout. Either an integer in ms (applied to all tools) or a 1-arity function receiving the tool name and returning a timeout — the batch uses the maximum. Default 5_000
  • :opts — inference options passed to stream_text each step (:temperature, :max_tokens, :max_steps, etc.)
  • :name, :timeout, :hibernate_after, :spawn_opt, :debug — standard GenServer options
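
The function form of :tool_timeout can be sketched as follows. This is a minimal illustration, not the library's implementation: the module name, the tool names, and the timeout values are made up, and batch_timeout/1 only mirrors the documented rule that a batch uses the maximum timeout of its tools.

```elixir
defmodule ToolTimeouts do
  # Hypothetical per-tool timeouts in milliseconds; the tool names are made up.
  @slow_tools %{"fetch" => 30_000, "search" => 15_000}
  @default 5_000

  # 1-arity function: receives a tool name and returns a timeout in ms.
  def for_tool(name), do: Map.get(@slow_tools, name, @default)

  # Mirrors the documented rule that a batch uses the maximum timeout.
  # Falls back to the default for an empty batch.
  def batch_timeout(names) do
    names
    |> Enum.map(&for_tool/1)
    |> Enum.max(fn -> @default end)
  end
end
```

Under these assumptions the function would be passed at startup as tool_timeout: &ToolTimeouts.for_tool/1.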

Events

Subscribers receive {:agent, pid, type, data} messages. Attach one or more via subscribe/1,2 (or the :subscribe / :subscribers start options). There are two categories:

Streaming events — forwarded from each LLM response as it arrives:

{:agent, pid, :text_start,     %{index: 0}}
{:agent, pid, :text_delta,     %{index: 0, delta: "Hello"}}
{:agent, pid, :text_end,       %{index: 0, content: %Text{}}}
{:agent, pid, :thinking_start, %{index: 0}}
{:agent, pid, :thinking_delta, %{index: 0, delta: "..."}}
{:agent, pid, :thinking_end,   %{index: 0, content: %Thinking{}}}
{:agent, pid, :tool_use_start, %{index: 1, id: "call_1", name: "search"}}
{:agent, pid, :tool_use_delta, %{index: 1, delta: "{\"q\":"}}
{:agent, pid, :tool_use_end,   %{index: 1, content: %ToolUse{}}}

Agent-level events — emitted by the agent at lifecycle boundaries:

{:agent, pid, :message,     %Message{}}                        # message appended to pending
{:agent, pid, :tool_result, %ToolResult{}}                     # tool executed, result available
{:agent, pid, :step,        %Response{}}                       # step complete, per-step response
{:agent, pid, :turn,        {:continue, %Response{}}}          # turn committed, next turn follows
{:agent, pid, :turn,        {:stop, %Response{}}}              # turn committed, agent idle
{:agent, pid, :pause,       {reason, %ToolUse{}}}              # waiting for tool decision
{:agent, pid, :retry,       reason}                            # non-terminal error, agent retrying
{:agent, pid, :error,       reason}                            # terminal error, agent goes idle
{:agent, pid, :cancelled,   %Response{stop_reason: :cancelled}} # cancel/1 invoked; pending discarded
{:agent, pid, :state,       %State{}}                          # set_state mutation applied
{:agent, pid, :status,      :idle | :busy | :paused}           # status transitioned

:message fires each time a message is appended to the in-flight pending queue — the initial user message, each assistant response after streaming completes, the tool-result user message after execution, and the continuation user message after {:continue, _, _}. It arrives after all streaming deltas for that message and before the next lifecycle event.

:step fires after each LLM request-response completes. The response's messages field always carries exactly [user, assistant] — the user message that prompted the step (the initial turn prompt, a continuation prompt, or a tool-result user message) and the assistant response.

:turn fires each time a turn completes and commits that turn's pending messages to state.messages. {:continue, response} means handle_turn/2 requested another turn — a continuation user message is appended and a new turn begins immediately. {:stop, response} means the agent is idle. A single prompt/3 therefore produces one :turn {:stop, _}, optionally preceded by any number of :turn {:continue, _} events; each event's response carries only the just-committed turn's messages and usage, not the cumulative across continuations.

:cancelled fires after cancel/1 with stop_reason: :cancelled — pending messages are discarded (state.messages unchanged).

:error fires after handle_error/2 returns {:stop, state} — pending messages are discarded and the agent goes idle.

:state fires after a successful set_state/2,3 call with the full new %State{}.

:status fires on every lifecycle transition (:idle/:busy/:paused) and always precedes the derived event that caused the transition (e.g. :status :idle before :turn {:stop, _}). A simple chatbot (one step per prompt) sees :status :busy → :message(user) → :message(assistant) → :step → :status :idle → :turn {:stop, _}.
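
A subscriber that wants the full text of one prompt can fold the events above in a receive loop. The sketch below is one possible shape, using only the message formats documented here; the TurnCollector module and its return values are illustrative names, and it assumes agent is the pid carried in the events (not a registered name).

```elixir
defmodule TurnCollector do
  # Accumulates streamed text until the agent goes idle.
  # Returns {:ok, text}, {:error, reason}, :cancelled, or :timeout.
  # The timeout applies to each wait between events, not to the whole prompt.
  def collect(agent, timeout \\ 30_000), do: loop(agent, [], timeout)

  defp loop(agent, chunks, timeout) do
    receive do
      {:agent, ^agent, :text_delta, %{delta: delta}} ->
        loop(agent, [chunks, delta], timeout)

      # A continuation commits one turn and starts another; keep collecting.
      {:agent, ^agent, :turn, {:continue, _response}} ->
        loop(agent, chunks, timeout)

      {:agent, ^agent, :turn, {:stop, _response}} ->
        {:ok, IO.iodata_to_binary(chunks)}

      {:agent, ^agent, :error, reason} ->
        {:error, reason}

      {:agent, ^agent, :cancelled, _response} ->
        :cancelled

      # Ignore every other event type (:status, :message, :step, ...).
      {:agent, ^agent, _type, _data} ->
        loop(agent, chunks, timeout)
    after
      timeout -> :timeout
    end
  end
end
```

Typical use would be to call prompt/3 on a subscribed agent and then TurnCollector.collect(agent) from the subscribing process.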

Tools and the agent loop

The agent manages its own tool execution loop, separate from the stateless loop used by generate_text/stream_text. This enables per-tool approval gates and pause/resume — capabilities that the stateless loop cannot support.

When the model responds with tool use blocks, the agent processes them in two phases:

  1. Decision phase: handle_tool_use/2 is called sequentially for each tool use. Return {:execute, state} to approve, {:reject, reason, state} to send an error result, {:result, result, state} to provide a result directly, or {:pause, reason, state} to wait for external input via resume/2.

  2. Execution phase: approved tools run in parallel. Results (from execution, rejection, and direct provision) are passed to handle_tool_result/2, then sent back to the model as a user message. The agent spawns the next LLM request automatically.

If a tool has no handler and falls through handle_tool_use/2 without being intercepted (via {:result, ...} or {:pause, ...}), the agent stops the turn — handle_turn/2 fires with stop_reason: :tool_use.
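
The decision phase lends itself to a small policy function. The sketch below shows one way the body of handle_tool_use/2 might be written; the ApprovalPolicy module, the tool names, and the :authorize reason are assumptions chosen for illustration. It is kept as a plain module so it runs without the library; inside a callback module the same clauses would be named handle_tool_use.

```elixir
defmodule ApprovalPolicy do
  # Hypothetical tool names; adjust to the tools the agent actually carries.
  @auto_approved ["search", "fetch"]
  @forbidden ["delete_everything"]

  # Same return shapes as handle_tool_use/2. Matching on %{name: name}
  # works for any map or struct with a :name key.
  def decide(%{name: name}, state) when name in @auto_approved do
    {:execute, state}
  end

  def decide(%{name: name}, state) when name in @forbidden do
    {:reject, "#{name} is not permitted", state}
  end

  # Anything else waits for an external decision delivered through resume/2.
  def decide(_tool_use, state), do: {:pause, :authorize, state}
end
```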

Pub/sub and snapshot

The agent maintains a set of subscribed pids. Use subscribe/1,2 to attach (returns {:ok, %Snapshot{}} atomically so late joiners can align with in-flight state), and unsubscribe/1,2 to detach. Subscribers are cleaned up automatically when they die.

get_snapshot/1 returns a %Omni.Agent.Snapshot{} holding the committed state, the pending messages of the in-flight turn, and the current streaming assistant message (if any). Compose everything known right now as state.messages ++ pending ++ List.wrap(partial).
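
The composition rule can be written as a one-line helper. This sketch assumes the snapshot exposes its parts under the keys :state, :pending, and :partial (inferred from the expression above, not confirmed against the struct definition); SnapshotView is an illustrative name.

```elixir
defmodule SnapshotView do
  # Assumed field names: :state (with :messages), :pending, :partial.
  # List.wrap/1 turns a nil partial into [] and a message into [message].
  def messages(%{state: %{messages: committed}, pending: pending, partial: partial}) do
    committed ++ pending ++ List.wrap(partial)
  end
end
```

A late-joining subscriber could render SnapshotView.messages(snapshot) once and then apply subsequent events on top.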

Pause and resume

When handle_tool_use/2 returns {:pause, reason, state}, the agent pauses and sends {:agent, pid, :pause, {reason, %ToolUse{}}} to its subscribers. The reason is an app-defined term (e.g., :authorize, :ui_input) that tags why the agent paused. The caller inspects the tool use and resumes:

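# Assumes `alias Omni.Agent`; a bare Agent would otherwise resolve to Elixir's built-in Agent module.
Agent.resume(agent, :execute)              # execute the tool
Agent.resume(agent, {:reject, "Denied"})   # reject with error result
Agent.resume(agent, {:result, result})     # provide a result directly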

After resuming, the agent continues processing remaining tool decisions.
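
On the subscriber side, a pause can be turned into a resume decision with a small helper. The following is a sketch under assumptions: PauseHandler and the approve? predicate are hypothetical names, and the rejection message is arbitrary. Only the :pause message shape and the decision values come from the text above.

```elixir
defmodule PauseHandler do
  # Waits for the next :pause event from `agent` and maps it to a value
  # accepted by resume/2. `approve?` is a caller-supplied 2-arity predicate
  # receiving the pause reason and the tool use.
  def next_decision(agent, approve?, timeout \\ 5_000) do
    receive do
      {:agent, ^agent, :pause, {reason, tool_use}} ->
        if approve?.(reason, tool_use) do
          {:ok, :execute}
        else
          {:ok, {:reject, "Denied by operator"}}
        end
    after
      timeout -> :timeout
    end
  end
end
```

The returned decision would then be handed to resume/2, for example Omni.Agent.resume(agent, decision).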

Prompt queuing

Calling prompt/3 while the agent is busy or paused stages the content for the next turn boundary. When the current step sequence completes:

  • handle_turn/2 fires as normal (for bookkeeping, state updates)
  • The staged prompt overrides handle_turn's decision — the agent continues with the staged content regardless of whether the callback returned {:stop, state} or {:continue, ...}

This enables steering an autonomous agent mid-run:

:ok = Agent.prompt(agent, "Stop what you're doing, focus on X instead")

Calling prompt/3 again replaces the staged prompt (last-one-wins).
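
The override rule can be modelled as a pure function. This is only a model of the behaviour described above, not the agent's internal code; StagedPrompt and its return values are illustrative, and staged is nil when nothing has been staged.

```elixir
defmodule StagedPrompt do
  # With nothing staged, the callback's decision stands.
  def resolve({:stop, _state}, nil), do: :stop
  def resolve({:continue, content, _state}, nil), do: {:continue, content}

  # A staged prompt wins over whatever handle_turn/2 returned.
  def resolve(_callback_result, staged), do: {:continue, staged}

  # Last-one-wins: staging again replaces the previous content.
  def stage(_previous, content), do: content
end
```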

Autonomous agents

The difference between a chatbot (one step per prompt) and an autonomous agent (works until done) is entirely in the callbacks. A completion tool with a trivial handler serves as the signal — the agent loops until the model calls it:

defmodule ResearchAgent do
  use Omni.Agent

  @impl Omni.Agent
  def init(state) do
    state = %{state |
      system: "You are a research assistant. Use your tools to research, " <>
              "then call task_complete with your findings.",
      tools: [SearchTool.new(), FetchTool.new(), task_complete()],
      opts: Keyword.put(state.opts, :max_steps, 30)
    }
    {:ok, state}
  end

  @impl Omni.Agent
  def handle_turn(%{stop_reason: :length}, state) do
    {:continue, "Continue where you left off.", state}
  end

  def handle_turn(response, state) do
    if completion_tool_called?(response) do
      {:stop, state}
    else
      {:continue, "Continue working. Call task_complete when finished.", state}
    end
  end

  defp task_complete do
    Omni.tool(
      name: "task_complete",
      description: "Call when the task is fully complete.",
      input_schema: Omni.Schema.object(
        %{result: Omni.Schema.string(description: "Summary of what was accomplished")},
        required: [:result]
      ),
      handler: fn _input -> "OK" end
    )
  end

  defp completion_tool_called?(response) do
    Enum.any?(response.messages, fn message ->
      Enum.any?(message.content, fn
        %Omni.Content.ToolUse{name: "task_complete"} -> true
        _ -> false
      end)
    end)
  end
end

Steps, turns, and max_steps

The agent loop has two levels:

  • Step — a single LLM request-response cycle. If the model calls tools, the agent handles them and makes another request. Each request is one step.
  • Turn — starts with prompt/3, ends with {:stop, response}. A turn may contain multiple steps. handle_turn/2 fires when the model responds without executable tools. If it returns {:continue, ...}, the agent keeps working within the same turn.

Each step emits a :step event with the per-step response. Turn boundaries emit :turn events. A turn with continuation looks like this:

turn
  step 1  :step  tool_use
  step 2  :step  tool_use
  step 3  :step  handle_turn
     {:continue, "keep going"}  :turn {:continue, _}
  step 4  :step  handle_turn
     {:stop, state}  :turn {:stop, _}

:max_steps (default :infinity) caps the total number of LLM requests across the turn. Set it in :opts at startup or override per-prompt via prompt/3:

Agent.prompt(agent, "Do exhaustive research", max_steps: 50)

The step counter (state.step) is visible in all callbacks.
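
To see how :step and :turn events relate, the sketch below folds a recorded event list into step counts per committed turn. TurnTally is an illustrative name, and events are represented as {type, data} pairs (the last two elements of the {:agent, pid, type, data} messages).

```elixir
defmodule TurnTally do
  # Folds {type, data} event pairs into [{outcome, step_count}],
  # one entry per :turn event, in the order the turns were committed.
  def steps_per_turn(events) do
    {turns, _open_steps} =
      Enum.reduce(events, {[], 0}, fn
        {:step, _response}, {turns, steps} ->
          {turns, steps + 1}

        {:turn, {outcome, _response}}, {turns, steps} ->
          {[{outcome, steps} | turns], 0}

        # Streaming and other lifecycle events do not affect the tally.
        _other, acc ->
          acc
      end)

    Enum.reverse(turns)
  end
end
```

For the sequence shown above (three steps, a continuation, one more step, then stop) this yields [continue: 3, stop: 1].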

LiveView integration

Subscribe on mount and handle events in handle_info/2:

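def mount(_params, _session, socket) do
  {:ok, agent} = Omni.Agent.start_link(model: ..., subscribe: true)
  # Streams must be configured before stream_insert/3 is called.
  {:ok, socket |> assign(agent: agent) |> stream(:chunks, [])}
end

def handle_event("submit", %{"prompt" => text}, socket) do
  :ok = Omni.Agent.prompt(socket.assigns.agent, text)
  {:noreply, socket}
end

def handle_info({:agent, _pid, :text_delta, %{delta: text}}, socket) do
  # Stream items need an :id for their DOM id; a unique integer is used here.
  chunk = %{id: System.unique_integer([:positive]), text: text}
  {:noreply, stream_insert(socket, :chunks, chunk)}
end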

def handle_info({:agent, _pid, :turn, {:stop, _response}}, socket) do
  {:noreply, assign(socket, :status, :complete)}
end

def handle_info({:agent, _pid, :error, reason}, socket) do
  {:noreply, put_flash(socket, :error, "Agent error: #{inspect(reason)}")}
end

Going further

Omni.Session wraps an agent with conversation identity, a branching message tree, and pluggable persistence. The events documented above are forwarded re-tagged as {:session, pid, type, data}, with tree- and store-level events added on top. Sessions support regenerating a turn, editing a user message, and navigating between alternate replies.

Omni.Session.Manager supervises many sessions under a registry, with idle-shutdown and a live feed of session activity — drop-in for apps running multiple concurrent conversations.

Summary

Callbacks

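handle_error(error, state)
Called when an LLM request fails entirely.

handle_tool_result(result, state)
Called after each tool executes, before results are sent to the model.

handle_tool_use(tool_use, state)
Called for each tool use block during the decision phase.

handle_turn(response, state)
Called when the model completes without executable tools.

init(state)
Called when the agent starts.

terminate(reason, state)
Called when the agent process terminates.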

Functions

cancel(agent)
Cancels the current turn.

get_snapshot(agent)
Returns a %Omni.Agent.Snapshot{} of the agent right now.

get_state(agent)
get_state(agent, key)
Returns the agent's %State{} struct or a single field from it.

prompt(agent, content, opts \\ [])
Sends a prompt to the agent.

resume(agent, decision)
Resumes a paused agent with a tool decision.

set_state(agent, opts)
Replaces agent configuration fields. Idle only. Atomic.

set_state(agent, field, value_or_fun)
Replaces or transforms a single configuration field. Idle only.

start_link(opts)
Starts and links an agent process without a callback module.

start_link(module, opts)
Starts and links an agent process with the given callback module.

subscribe(agent)
Subscribes the caller to agent events.

subscribe(agent, pid)
Subscribes the given pid to agent events.

unsubscribe(agent)
Unsubscribes the caller from agent events.

unsubscribe(agent, pid)
Unsubscribes the given pid from agent events.

Callbacks

handle_error(error, state)

@callback handle_error(error :: term(), state :: Omni.Agent.State.t()) ::
  {:stop, Omni.Agent.State.t()} | {:retry, Omni.Agent.State.t()}

Called when an LLM request fails entirely.

This fires when stream_text returns {:error, reason} — a network failure, authentication error, or other request-level problem. This is distinct from handle_turn with a :length or :refusal stop reason, which means the request succeeded but the model couldn't complete normally.

Return {:stop, state} to surface the error to subscribers (the agent discards pending messages and goes idle), or {:retry, state} to retry the same step immediately.

Default: {:stop, state}.
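
Because {:retry, state} retries immediately, an unconditional retry could loop forever on a persistent failure. One way to bound it is a counter kept under state.private, sketched below. The RetryPolicy module, the :retries key, and the limit of 3 are assumptions for illustration; the function is written against any map with a :private key so it runs without the library.

```elixir
defmodule RetryPolicy do
  @max_retries 3

  # Same return shapes as handle_error/2. The :retries key under
  # state.private is a made-up counter, not something the framework sets.
  def handle_error(_error, %{private: private} = state) do
    attempts = Map.get(private, :retries, 0)

    if attempts < @max_retries do
      {:retry, %{state | private: Map.put(private, :retries, attempts + 1)}}
    else
      # Give up and clear the counter so the next prompt starts fresh.
      {:stop, %{state | private: Map.delete(private, :retries)}}
    end
  end
end
```

In this sketch the counter is only cleared when the policy gives up; a real callback module would probably also reset it in handle_turn/2 after a successful request.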

handle_tool_result(result, state)

@callback handle_tool_result(
  result :: Omni.Content.ToolResult.t(),
  state :: Omni.Agent.State.t()
) ::
  {:ok, Omni.Content.ToolResult.t(), Omni.Agent.State.t()}

Called after each tool executes, before results are sent to the model.

Invoked sequentially for each result after all approved tools have finished executing in parallel. Return {:ok, result, state} to pass the result through, or modify result before returning to alter what the model sees.

Default: {:ok, result, state}.

handle_tool_use(tool_use, state)

@callback handle_tool_use(
  tool_use :: Omni.Content.ToolUse.t(),
  state :: Omni.Agent.State.t()
) ::
  {:execute, Omni.Agent.State.t()}
  | {:reject, term(), Omni.Agent.State.t()}
  | {:result, Omni.Content.ToolResult.t(), Omni.Agent.State.t()}
  | {:pause, term(), Omni.Agent.State.t()}

Called for each tool use block during the decision phase.

When the model responds with tool use blocks, this callback is invoked sequentially for each one before any tools execute. Return values:

  • {:execute, state} — queue the tool for execution
  • {:reject, reason, state} — send an error result to the model
  • {:result, result, state} — provide a %ToolResult{} directly, skip execution
  • {:pause, reason, state} — pause the agent and send {:agent, pid, :pause, {reason, tool_use}} to subscribers; resume later with resume/2

After all decisions are collected, approved tools execute in parallel. Rejected and provided results are merged with executed results.

Default: {:execute, state}.

handle_turn(response, state)

@callback handle_turn(response :: Omni.Response.t(), state :: Omni.Agent.State.t()) ::
  {:stop, Omni.Agent.State.t()} | {:continue, term(), Omni.Agent.State.t()}

Called when the model completes without executable tools.

Fires after the model responds and there are no tools to execute — either the model returned text only, or all tool uses were handled during the decision phase. Check response.stop_reason for why the model stopped:

  • :stop — the model finished naturally
  • :tool_use — tool use blocks present but no handlers available
  • :length — output was truncated (hit max output tokens)
  • :refusal — the model declined due to content or safety policy

Return {:stop, state} to end the current turn and go idle (subscribers receive {:agent, pid, :turn, {:stop, response}}), or {:continue, content, state} to commit the current turn and start a new turn with content as the next user message (subscribers receive {:agent, pid, :turn, {:continue, response}}). The content argument accepts a string or a list of content blocks.

If a staged prompt exists (from prompt/3 while busy), it overrides this callback's decision. See the "Prompt queuing" section in the moduledoc.

Default: {:stop, state}.

init(state)

@callback init(state :: Omni.Agent.State.t()) ::
  {:ok, Omni.Agent.State.t()} | {:error, term()}

Called when the agent starts.

Receives the fully-resolved initial %State{} (start options merged, model resolved) and returns a possibly-modified state. The callback can tweak any field — inject :private, preload :messages, swap the :system prompt, add :tools.

The returned state is validated against the :messages invariant (empty or ending with an :assistant message containing no %ToolUse{} blocks). An invalid returned state causes start_link to fail with {:error, :invalid_messages}.
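
The invariant can be checked ahead of time with a small predicate, for example before preloading history in init/1. This is a sketch of the rule as stated, not the library's validator: MessagesInvariant is an illustrative name, and it assumes messages expose :role and a list under :content, with tool-use blocks being Omni.Content.ToolUse structs.

```elixir
defmodule MessagesInvariant do
  # An empty history is always valid.
  def valid?([]), do: true

  # Otherwise the last message must be an :assistant message whose content
  # list holds no tool-use blocks. Matching on __struct__ avoids a
  # compile-time dependency on the Omni.Content.ToolUse module.
  def valid?(messages) when is_list(messages) do
    case List.last(messages) do
      %{role: :assistant, content: content} when is_list(content) ->
        not Enum.any?(content, &match?(%{__struct__: Omni.Content.ToolUse}, &1))

      # Any other role, or content that is not a list, is treated as invalid.
      _other ->
        false
    end
  end
end
```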

Return {:error, reason} to refuse startup.

Default: {:ok, state} (identity).

terminate(reason, state)

@callback terminate(reason :: term(), state :: Omni.Agent.State.t()) :: term()

Called when the agent process terminates.

Use for cleaning up resources acquired in init/1. Receives the shutdown reason and the current state. Standard GenServer termination semantics apply.

Default: no-op.

Functions

cancel(agent)

@spec cancel(GenServer.server()) :: :ok | {:error, :idle}

Cancels the current turn.

Kills any running tasks, discards pending messages, and emits {:agent, pid, :cancelled, %Response{stop_reason: :cancelled}}. The agent's state.messages remains unchanged.

Returns {:error, :idle} if the agent is already idle.

get_snapshot(agent)

@spec get_snapshot(GenServer.server()) :: Omni.Agent.Snapshot.t()

Returns a %Omni.Agent.Snapshot{} of the agent right now.

The snapshot bundles the committed state, the in-flight turn's pending messages, and the currently-streaming assistant message (if any). Compose everything the agent knows as state.messages ++ pending ++ List.wrap(partial).

get_state(agent)

@spec get_state(GenServer.server()) :: Omni.Agent.State.t()

Returns the agent's %State{} struct or a single field from it.

With no key, returns the full %State{}. With a key, returns the value of that field (or nil for unknown keys).

Agent.get_state(agent)             #=> %State{model: ..., messages: [...], ...}
Agent.get_state(agent, :status)    #=> :idle
Agent.get_state(agent, :messages)  #=> [%Message{}, ...]
Agent.get_state(agent, :private)   #=> %{}

get_state(agent, key)

@spec get_state(GenServer.server(), atom()) :: term()

prompt(agent, content, opts \\ [])

@spec prompt(GenServer.server(), term(), keyword()) :: :ok

Sends a prompt to the agent.

content accepts a string (wrapped in a Text block) or a list of content blocks (for attachments or ToolResult blocks for manual tool execution). Options are merged on top of the agent's default :opts for this turn only.

Behaviour depends on agent status:

  • Idle — starts a new turn immediately.
  • Busy or paused — stages the content for the next turn boundary, overriding handle_turn's decision. See "Prompt queuing" in the moduledoc.

resume(agent, decision)

@spec resume(
  GenServer.server(),
  :execute | {:reject, term()} | {:result, Omni.Content.ToolResult.t()}
) ::
  :ok | {:error, :idle | :busy}

Resumes a paused agent with a tool decision.

Only valid when the agent is :paused (from handle_tool_use/2 returning {:pause, reason, state}). The agent continues processing remaining tool decisions after resuming.

  • :execute — queue the pending tool for execution
  • {:reject, reason} — reject with an error result sent to the model
  • {:result, result} — provide a %ToolResult{} directly

Returns {:error, status} with the current status (:idle or :busy) if the agent is not paused.

set_state(agent, opts)

@spec set_state(
  GenServer.server(),
  keyword()
) :: :ok | {:error, :busy | :paused} | {:error, term()}

Replaces agent configuration fields. Idle only. Atomic.

Accepts the following keys:

  • :model — replace the model. Resolved via Omni.get_model/2. Fails with {:error, {:model_not_found, ref}} if not found
  • :system — replace the system prompt
  • :messages — replace the committed message history. Must be empty or end with an :assistant message containing no %ToolUse{} blocks; otherwise fails with {:error, :invalid_messages}
  • :tools — replace the tool list
  • :opts — replace inference opts

All values are replaced, not merged. To merge opts, use the function form of set_state/3.

:private is not settable — callback modules own mutation via %{state | private: ...}.

Unrecognized keys return {:error, {:invalid_key, key}}. Returns {:error, status} with the current status (:busy or :paused) when the agent is not idle.

set_state(agent, field, value_or_fun)

@spec set_state(GenServer.server(), atom(), term() | (term() -> term())) ::
  :ok | {:error, :busy | :paused} | {:error, term()}

Replaces or transforms a single configuration field. Idle only.

When value_or_fun is a value, replaces the field directly. When value_or_fun is a 1-arity function, calls it with the current value and uses the return as the new value.

Agent.set_state(agent, :system, "Be concise.")
Agent.set_state(agent, :opts, fn opts -> Keyword.merge(opts, temperature: 0.7) end)
Agent.set_state(agent, :tools, fn tools -> [new_tool | tools] end)

Settable fields: :model, :system, :messages, :tools, :opts. Returns {:error, {:invalid_key, field}} for other fields. Returns {:error, :invalid_messages} if :messages fails the invariant. Returns {:error, status} with the current status (:busy or :paused) when the agent is not idle.

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Starts and links an agent process without a callback module.

All default callbacks apply (single turn per prompt, all tools auto-executed, errors stop the agent). See "Start options" in the moduledoc for accepted keys.

start_link(module, opts)

@spec start_link(
  module() | nil,
  keyword()
) :: GenServer.on_start()

Starts and links an agent process with the given callback module.

The module must use Omni.Agent. See "Start options" in the moduledoc for accepted keys.

subscribe(agent)

@spec subscribe(GenServer.server()) :: {:ok, Omni.Agent.Snapshot.t()}

Subscribes the caller to agent events.

Returns {:ok, %Snapshot{}} — the snapshot captures the agent's state, the in-flight turn's pending messages, and any currently-streaming assistant message at the instant of subscription. Every event emitted after this call is delivered to the caller, so a subscriber joining mid-stream can populate its view and keep up without gaps.

Subscribing is idempotent. Subscribers are monitored and removed automatically when they die.

subscribe(agent, pid)

@spec subscribe(GenServer.server(), pid()) :: {:ok, Omni.Agent.Snapshot.t()}

Subscribes the given pid to agent events.

Same semantics as subscribe/1, but for an arbitrary pid rather than the caller.

unsubscribe(agent)

@spec unsubscribe(GenServer.server()) :: :ok

Unsubscribes the caller from agent events.

unsubscribe(agent, pid)

@spec unsubscribe(GenServer.server(), pid()) :: :ok

Unsubscribes the given pid from agent events.