ExAthena.Session (ExAthena v0.6.0)

Copy Markdown View Source

GenServer that owns a multi-turn conversation.

A Session is the right abstraction when you want:

  • the message history to persist across multiple user turns,
  • resumable state across LiveView reconnects,
  • streaming deltas broadcast to subscribers,
  • an identifiable process pid / name you can monitor.

For one-shot agent runs, use ExAthena.Loop.run/2 directly. For truly stateless single-turn inference, ExAthena.query/2.

Usage

{:ok, pid} = ExAthena.Session.start_link(
  provider: :ollama,
  model: "llama3.1",
  tools: :all,
  cwd: "/path/to/project"
)

{:ok, result} = ExAthena.Session.send_message(pid, "read mix.exs and list deps")
IO.puts(result.text)

ExAthena.Session.stop(pid)

Each send_message appends to the session's message list, runs the agent loop to completion, and returns the final result. Subsequent messages include the full prior history, so the model has context.

Session resume

Pass :messages to start_link/1 to seed the conversation with a prior history, typically obtained from resume/2:

{:ok, msgs} = ExAthena.Session.resume(session_id, store: :ets)
{:ok, pid}  = ExAthena.Session.start_link(
  provider: :ollama,
  messages: msgs,
  session_id: session_id
)

When the configured store implements ExAthena.Sessions.SchemaStore (currently only ETS), the session also dual-writes every message to the row tables so resume/2 can read from them directly.

Session rewind

Drop messages after a saved snapshot, leaving the session alive at that point so the next send_message continues from the rewound state:

{:ok, snap} = ExAthena.Session.checkpoint(session_id, store: :ets)
# ... more turns ...
{:ok, info} = ExAthena.Session.rewind(session_id, {:snapshot, snap.id}, store: :ets)
info.messages_deleted  # number of messages dropped

Snapshots beyond the rewind anchor are deliberately kept as potential redo targets; no separate redo API exists in v1.

Summary

Functions

Write (or return) a named savepoint anchored at a specific message.

Returns a specification to start this module under a supervisor.

Clone a session row and a prefix of its messages under a new session_id.

Return the current message list (for debugging / persistence).

Resume a session by reading prior messages back from a store.

Drop all messages after a snapshot or message anchor, leaving the session alive at that point.

Send a user message; blocks until the loop terminates.

Return the stable session id assigned at start.

Start a session. Accepts the same options as ExAthena.Loop.run/2 plus

Stop the session.

Types

rewind_target()

@type rewind_target() :: {:snapshot, String.t()} | {:message, String.t()}

Functions

checkpoint(session_id, opts \\ [])

@spec checkpoint(
  String.t(),
  keyword()
) :: {:ok, map()} | {:error, term()}

Write (or return) a named savepoint anchored at a specific message.

Options

  • :store — must implement SchemaStore (:ets or a custom row-shaped store). Returns {:error, :unsupported_store} for :in_memory / :jsonl.
  • :message_id — anchor message; defaults to the most-recent message.
  • :label — optional human-readable name for the snapshot.
  • :metadata — optional map stored inside the snapshot state.

Two calls with the same session_id, anchor message_id, label, and metadata return the same snapshot row (idempotent).

Emits [:ex_athena, :session, :checkpoint] with measurements %{message_count: n} and metadata %{session_id:, message_id:, snapshot_id:, store:, idempotent:}.

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

fork(session_id, opts \\ [])

@spec fork(
  String.t(),
  keyword()
) :: {:ok, map()} | {:error, term()}

Clone a session row and a prefix of its messages under a new session_id.

Options

  • :store — must implement SchemaStore. Returns {:error, :unsupported_store} otherwise.
  • :checkpoint_id — look up the snapshot row and use its message_id as the fork point.
  • :message_id — explicit message anchor (takes effect when no checkpoint_id is given).
  • :title — title for the new session; defaults to "<source_title> (fork)".
  • :copy_snapshots — when true, snapshot rows whose anchor message was included are copied with their message_id rewritten to the new session's corresponding message id. Defaults to false.

Returns {:ok, %{session_id: new_id, parent_id: source_id, message_count: n}} or {:error, reason}.

Emits [:ex_athena, :session, :fork] with measurements %{message_count: n} and metadata %{session_id: new_id, parent_id: source_id, store:, anchor_message_id:}.

messages(server)

@spec messages(GenServer.server()) :: [map()]

Return the current message list (for debugging / persistence).

resume(session_id, opts \\ [])

@spec resume(
  String.t(),
  keyword()
) :: {:ok, term()} | {:error, term()}

Resume a session by reading prior messages back from a store.

Options

  • :store:in_memory (default), :ets, :jsonl, or a module. When the store implements SchemaStore, the row tables are queried directly; otherwise the event-log is replayed.
  • :as — shape of the returned payload:
    • :messages (default) — {:ok, [Message.t()]}, backwards compatible.
    • :state{:ok, %Loop.State{messages: ..., session_id: ...}}.
    • :map{:ok, %{session_id:, messages:, last_user:, last_assistant:}}.
  • :replay_last_user_turn — when true, drops the trailing assistant message (and any trailing non-user messages) so callers can re-feed the last user prompt. Defaults to false.

Emits [:ex_athena, :session, :resume] telemetry with measurements %{message_count: n} and metadata %{session_id:, source:, store:}.

rewind(session_id, target, opts \\ [])

@spec rewind(String.t(), rewind_target(), keyword()) ::
  {:ok, map()} | {:error, term()}

Drop all messages after a snapshot or message anchor, leaving the session alive at that point.

Options

  • :store — must implement SchemaStore (:ets or a custom module). Returns {:error, :unsupported_store} for :in_memory / :jsonl.

target is one of:

  • {:snapshot, snapshot_id} — resolve the snapshot's anchor message, then delete everything after it.
  • {:message, message_id} — use the message directly as the anchor.

Returns {:ok, %{session_id:, anchor_message_id:, messages_deleted:, message_count:}} or {:error, :unsupported_store | :not_found}.

Snapshots beyond the anchor are preserved as potential redo targets.

Emits [:ex_athena, :session, :rewind] with measurements %{messages_deleted: n, message_count: m} and metadata %{session_id:, anchor_message_id:, target:, store:} where target is the atom :snapshot or :message.

send_message(server, message, opts \\ [])

@spec send_message(GenServer.server(), String.t(), keyword()) ::
  {:ok, map()} | {:error, term()}

Send a user message; blocks until the loop terminates.

session_id(server)

@spec session_id(GenServer.server()) :: String.t()

Return the stable session id assigned at start.

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Start a session. Accepts the same options as ExAthena.Loop.run/2 plus:

  • :name — GenServer name.
  • :system_prompt — pinned system prompt used on every turn.
  • :store:in_memory (default), :ets, :jsonl, or a custom module implementing ExAthena.Sessions.Store. Per-turn events are persisted via the chosen store; resume/2 reads them back.
  • :messages — seed the conversation with a prior message history (e.g. from resume/2). Each entry is passed through Messages.from_map/1.
  • :session_id — reuse a stable id from a prior session; generated if absent.

stop(server)

@spec stop(GenServer.server()) :: :ok

Stop the session.