Omni.Session (Omni Agent v0.3.1)

Copy Markdown View Source

A Session wraps Omni.Agent with identity, persistence, and a branching message tree.

Where an agent holds one in-memory conversation, a session adds the things you need to build a real application around it:

  • Identity — every session has an id. Grab it, hand it around, reopen the conversation later with load: id.
  • A branching tree — regenerate a turn, edit a user message, or switch between alternate replies. The full history stays in the tree; nothing is overwritten.
  • Pluggable persistence — turns commit through an Omni.Session.Store adapter. The reference adapter writes to disk; write your own for Postgres, S3, or anywhere else.

Sessions otherwise behave like agents — prompt in, stream events out. Agent events are forwarded to Session subscribers re-tagged as {:session, pid, type, data}, alongside session-specific events for the tree, title, and store.

Starting and resuming

Every session has an id. Start a new one with :new (or omit for an auto-generated id), or reopen an existing one with :load.

store = {Omni.Session.Store.FileSystem, base_path: "priv/sessions"}

# Fresh session, auto-generated id
{:ok, session} = Omni.Session.start_link(
  agent: [model: {:anthropic, "claude-sonnet-4-6"}],
  store: store,
  subscribe: true
)

:ok = Omni.Session.prompt(session, "Name three mountains.")

Grab the id for later:

id = Omni.Session.get_snapshot(session).id
Omni.Session.stop(session)

Reopen the same session in a new process, after a restart, or days later:

{:ok, session} = Omni.Session.start_link(
  load: id,
  agent: [model: {:anthropic, "claude-sonnet-4-6"}],
  store: store
)

Load restores the persisted model, system prompt, opts, title, and full message tree. Tools are supplied fresh each boot — function references aren't safely serialisable. See Load-mode resolution below for field-by-field reconciliation between persisted state and start opts.

Omitting both :new and :load is equivalent to new: :auto. Passing an explicit new: "my-id" that collides with an existing persisted session returns {:error, :already_exists}. Supplying both :new and :load raises {:error, :ambiguous_mode}.

Branching and navigation

The message tree lets a session carry multiple children at any node — alternate replies, edits, or scratch branches. Three operations cover the common UX:

# Regenerate a turn — replay the target user message to get a
# fresh assistant reply; the original reply stays as a sibling
Omni.Session.branch(session, user_node_id)

# Edit the next user message — append a new user + turn as a
# child of the target assistant
Omni.Session.branch(session, assistant_node_id, "Try it this way.")

# Switch branches — move the active path to expose a different
# branch as the live conversation
Omni.Session.navigate(session, node_id)

branch/3 also accepts nil as the target to create a new disjoint root — the atomic equivalent of navigate(session, nil) followed by a fresh prompt/3. navigate(session, nil) on its own clears the active path; the next prompt then creates a new root.

To explore the tree, use Omni.Session.get_tree/1 with the Omni.Session.Tree helpers: children/2, siblings/2, path_to/2, and Enumerable over the active path.

All three operations are idle-only — they return {:error, status} with the current status (:busy or :paused) when a turn is in flight.

navigate/2 always lands on a tip — after walking to the target it follows cursors down to a leaf so the resulting state is ready for a prompt. branch/2,3 deliberately ends the in-flight window on a non-tip node; if that turn is cancelled or errors, the tree rolls back to its pre-branch state (also extended to a tip) — as if the branch was never started.

Start options

  • :newbinary() or :auto. Start a fresh session with the given id, or an auto-generated one. Mutually exclusive with :load.
  • :loadbinary(). Load an existing session by id. Mutually exclusive with :new.
  • :agent (required) — keyword() or {module(), keyword()}. Agent start options; the optional module is a callback module that use Omni.Agent.
  • :store (required) — {module(), keyword()} — a Omni.Session.Store adapter and its config.
  • :title — initial title string. Applied on :new only; ignored on :load (persisted title wins).
  • :subscribe — if true, subscribes the caller to session events as a :controller (see subscribe/1,2 for mode semantics).
  • :subscribers — list of pids (implicit :controller) or {pid, :controller | :observer} tuples to subscribe at startup.
  • :idle_shutdown_afternon_neg_integer() (ms) or nil. When a positive integer, the session self-shuts-down when the last controller unsubscribes (or dies) and the agent becomes idle. Unset / nil (the default) keeps the session running until explicit stop. Init does not evaluate — shutdown is only evaluated on transitions (a controller leaving, the agent going idle).
  • :name, :timeout, :hibernate_after, :spawn_opt, :debug — standard GenServer options.

Load-mode resolution

When loading, the persisted state is reconciled against start opts as follows:

FieldResolution
modelPersisted first; falls back to start opt if unresolvable. {:stop, :no_model} if neither is usable.
systemStart opt wins; falls back to persisted.
optsStart opt wins; falls back to persisted.
toolsStart opt only. Never persisted (function refs).
titlePersisted only. title: start option is ignored.
messagesDerived from the persisted tree. agent: [messages: _] is silently ignored.

On :new, agent: [messages: _] is rejected with {:error, :initial_messages_not_supported} — the tree is the sole entry point for messages.

new: "explicit-id" is rejected with {:error, :already_exists} when the id is already persisted in the store. new: :auto skips the check (128-bit entropy makes collision effectively impossible).

Auto-generated ids

new: :auto (and no mode supplied) generates 22-character URL-safe base64 with 128 bits of entropy:

:crypto.strong_rand_bytes(16) |> Base.url_encode64(padding: false)

Events

Subscribers receive {:session, pid, type, data} messages. Most events are Agent events forwarded verbatim except for the tag change:

{:agent, agent_pid, type, data}    {:session, session_pid, type, data}

This includes streaming deltas, :message, :step, :turn, :pause, :retry, :cancelled, :error, :state, :status, :tool_result.

Session-specific events:

{:session, pid, :tree,  %{tree: Tree.t(), new_nodes: [node_id()]}}
{:session, pid, :title, String.t() | nil}
{:session, pid, :store, {:saved, :tree | :state}}
{:session, pid, :store, {:error, :tree | :state, reason}}

At a turn commit, the event order is:

:turn (forwarded)  :tree  :store {:saved, :tree}

Session commits the turn's messages into the tree after forwarding the Agent's :turn event, then persists. Subscribers that want the logical turn boundary listen on :turn; subscribers that want the tree-structure change listen on :tree.

When a branch/2,3 turn is cancelled or errors, Session rolls the tree back to its pre-branch state and resyncs the Agent. The order is:

:cancelled (or :error)  :tree (restored)  :store {:saved, :tree}
   :state (forwarded from the resync)

Persistence

Session writes through the store on two triggers:

  • Turn commitssave_tree with :new_node_ids, plus a :tree event and a :store {:saved, :tree} / {:error, :tree, _} event.
  • Agent :state eventssave_state only when the persistable subset (model, system, opts, title) has changed since last write. Changes to :tools or :private do not trigger a write.

All store calls are synchronous; Session never halts on store errors, only emits :store {:error, _, _}. Adapter-specific reasons (POSIX atoms, etc.) bubble up unwrapped.

Linking and crash behaviour

Session starts the Agent linked. Agent crashes propagate to the Session (no trap_exit) — an unhealthy Agent takes the Session down rather than limping on. Sessions are cheap to reopen via load:.

When the Session stops gracefully, it stops the linked Agent as part of its termination.

Agent context

Before starting the Agent, Session writes its identity into the agent's :private under the reserved :omni key:

state.private.omni == %{session_id: id, session_pid: session_pid}

This is available from init/1 onward — useful for callback modules that build session-aware tools at startup:

defmodule MyAgent do
  use Omni.Agent

  @impl Omni.Agent
  def init(state) do
    session_id = state.private.omni.session_id
    tools = [MyApp.Tools.navigator(session_id), ...]
    {:ok, %{state | tools: tools}}
  end
end

:omni is framework-owned. Any user-supplied private[:omni] is overwritten when the Session starts the Agent; other :private keys are preserved. Callback code is free to use any other key.

Pub/sub

subscribe/1,2,3 registers a pid and atomically returns an %Omni.Session.Snapshot{} capturing the current tree, title, and agent slice. Every event emitted after the subscribe call is delivered to the subscriber. Monitors clean up subscribers on death.

Subscribers have a :mode (default :controller). Controllers count toward keeping the session alive when :idle_shutdown_after is configured; observers receive events but never hold the session open. Subscriptions are idempotent per pid — re-subscribing with a different mode updates the mode in place.

Going further

For apps managing many concurrent sessions under one supervisor — with registry-backed id lookup and a live feed of session activity — see Omni.Session.Manager.

Summary

Functions

Appends a tool to the wrapped Agent's tools. Convenience over set_agent(:tools, _). Tools are not persisted.

Branches from node_id, reusing the target's user content to regenerate its turn. node_id must reference a user node.

Branches from node_id with new user content.

Cancels the current turn. See Omni.Agent.cancel/1.

Returns a specification to start this module under a supervisor.

Returns the wrapped Agent's %State{}. See Omni.Agent.get_state/1.

Returns a single field from the wrapped Agent's state. See Omni.Agent.get_state/2.

Returns an %Omni.Session.Snapshot{} of the session right now.

Returns the session's title, or nil if unset.

Returns the session's %Omni.Session.Tree{}.

Sets the active path to the node at node_id and extends down to a leaf via cursors. Pass nil to clear the path entirely.

Sends a prompt to the wrapped Agent. See Omni.Agent.prompt/3.

Removes the tool with the given name from the wrapped Agent. Silent no-op if no matching tool exists.

Resumes a paused Agent. See Omni.Agent.resume/2.

Replaces Agent configuration fields. Passthrough to Omni.Agent.set_state/2.

Replaces or transforms a single Agent field. Passthrough to Omni.Agent.set_state/3.

Sets the session title. Emits a :title event and triggers a save_state via the persistable-subset change-detection path (a same-value set is a no-op).

Starts a Session process linked to the caller.

Stops the Session gracefully. The linked Agent is stopped as part of termination.

Subscribes the caller to session events.

Subscribes the caller with opts, or subscribes a specific pid as :controller. Same semantics as subscribe/1.

Subscribes the given pid with opts. See subscribe/2 for :mode.

Unsubscribes the caller from session events.

Unsubscribes the given pid from session events.

Functions

add_tool(session, tool)

@spec add_tool(GenServer.server(), Omni.Tool.t()) ::
  :ok | {:error, :busy | :paused} | {:error, term()}

Appends a tool to the wrapped Agent's tools. Convenience over set_agent(:tools, _). Tools are not persisted.

branch(session, node_id)

@spec branch(GenServer.server(), Omni.Session.Tree.node_id()) ::
  :ok | {:error, :not_found | :busy | :paused | :not_user_node | term()}

Branches from node_id, reusing the target's user content to regenerate its turn. node_id must reference a user node.

The active path ends on the user for the in-flight window; the Agent sees messages up to and including the user's parent. On turn commit, the leading (duplicate) user message is dropped and the remainder is pushed as children of node_id.

Idle-only: returns {:error, status} with the current status (:busy or :paused) when a turn is in flight.

branch(session, node_id, content)

@spec branch(GenServer.server(), Omni.Session.Tree.node_id() | nil, term()) ::
  :ok | {:error, :not_found | :busy | :paused | :not_assistant_node | term()}

Branches from node_id with new user content.

  • When node_id is an assistant node, the new user + its turn appends as children of the assistant — "edit the next user message."
  • When node_id is nil, creates a new disjoint root with the given content — the atomic equivalent of navigate(session, nil) followed by prompt(session, content).

Idle-only: returns {:error, status} with the current status (:busy or :paused) when a turn is in flight.

cancel(session)

@spec cancel(GenServer.server()) :: :ok | {:error, :idle}

Cancels the current turn. See Omni.Agent.cancel/1.

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

get_agent(session)

@spec get_agent(GenServer.server()) :: Omni.Agent.State.t()

Returns the wrapped Agent's %State{}. See Omni.Agent.get_state/1.

get_agent(session, key)

@spec get_agent(GenServer.server(), atom()) :: term()

Returns a single field from the wrapped Agent's state. See Omni.Agent.get_state/2.

get_snapshot(session)

@spec get_snapshot(GenServer.server()) :: Omni.Session.Snapshot.t()

Returns an %Omni.Session.Snapshot{} of the session right now.

get_title(session)

@spec get_title(GenServer.server()) :: String.t() | nil

Returns the session's title, or nil if unset.

get_tree(session)

@spec get_tree(GenServer.server()) :: Omni.Session.Tree.t()

Returns the session's %Omni.Session.Tree{}.

prompt(session, content, opts \\ [])

@spec prompt(GenServer.server(), term(), keyword()) :: :ok

Sends a prompt to the wrapped Agent. See Omni.Agent.prompt/3.

remove_tool(session, tool_name)

@spec remove_tool(GenServer.server(), String.t()) ::
  :ok | {:error, :busy | :paused} | {:error, term()}

Removes the tool with the given name from the wrapped Agent. Silent no-op if no matching tool exists.

resume(session, decision)

@spec resume(GenServer.server(), term()) :: :ok | {:error, :idle | :busy}

Resumes a paused Agent. See Omni.Agent.resume/2.

set_agent(session, opts)

@spec set_agent(
  GenServer.server(),
  keyword()
) :: :ok | {:error, :busy | :paused} | {:error, term()}

Replaces Agent configuration fields. Passthrough to Omni.Agent.set_state/2.

Changes to :model, :system, or :opts trigger a save_state via the :state event path; other settable keys do not persist.

set_agent(session, field, value_or_fun)

@spec set_agent(GenServer.server(), atom(), term() | (term() -> term())) ::
  :ok | {:error, :busy | :paused} | {:error, term()}

Replaces or transforms a single Agent field. Passthrough to Omni.Agent.set_state/3.

set_title(session, title)

@spec set_title(GenServer.server(), String.t() | nil) :: :ok

Sets the session title. Emits a :title event and triggers a save_state via the persistable-subset change-detection path (a same-value set is a no-op).

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Starts a Session process linked to the caller.

See the moduledoc for the full option list. :agent and :store are required.

stop(session)

@spec stop(GenServer.server()) :: :ok

Stops the Session gracefully. The linked Agent is stopped as part of termination.

subscribe(session)

@spec subscribe(GenServer.server()) :: {:ok, Omni.Session.Snapshot.t()}

Subscribes the caller to session events.

Returns {:ok, %Omni.Session.Snapshot{}} — the snapshot captures the current tree, title, and a consistent agent slice at the instant of subscription. Subsequent events are delivered as {:session, pid, type, data}.

Accepts mode: :controller | :observer (default :controller). Controllers count toward keeping the session alive when :idle_shutdown_after is configured; observers do not. Calling subscribe/1,2 twice from the same pid is idempotent; passing a different mode on the second call updates the pid's mode in place.

subscribe(session, opts)

@spec subscribe(GenServer.server(), keyword() | pid()) ::
  {:ok, Omni.Session.Snapshot.t()}

Subscribes the caller with opts, or subscribes a specific pid as :controller. Same semantics as subscribe/1.

subscribe(session, pid, opts)

@spec subscribe(GenServer.server(), pid(), keyword()) ::
  {:ok, Omni.Session.Snapshot.t()}

Subscribes the given pid with opts. See subscribe/2 for :mode.

unsubscribe(session)

@spec unsubscribe(GenServer.server()) :: :ok

Unsubscribes the caller from session events.

unsubscribe(session, pid)

@spec unsubscribe(GenServer.server(), pid()) :: :ok

Unsubscribes the given pid from session events.