The persistence behaviour and a thin dispatch to the configured adapter.
An append-only, ordered events log is the source of truth; everything else is
a cache or a derived artifact. Two adapters ship: Agentix.Persistence.ETS
(default, ephemeral) and, later, an Ecto/Postgres adapter. Both must pass the
shared Agentix.PersistenceConformance suite, so callers cannot tell them apart.
Records crossing this boundary are plain maps (and Agentix.Event structs),
never adapter-specific structs, so the core never depends on Ecto.
Record shapes
- conversation —
%{id, settings, status, fsm_state}. - fsm_state —
%{state, pending, last_seq}(a cache over the log). - summary —
%{from_seq, to_seq, content, version}(+ adapter-assigned id/inserted_at). - tool_call —
%{id, conversation_id, executor, status, args, result, ...}. - model_call —
%{turn_ref, rendered_context, model, usage, ...}(audit, off by default).
Configure with config :agentix, :persistence, Agentix.Persistence.ETS (or
{module, opts}).
Summary
Callbacks
Appends event to the conversation log, assigning the next per-conversation
seq (monotonic, 1-based). Returns the assigned seq. Implementations must keep
seq strictly increasing per conversation; concurrent appends to the same
conversation are not expected (one agent writes per conversation).
Cancels a pending expiry scheduled by schedule_expiry/3.
Deletes audit rows older than ttl_ms (relative to now) for the conversation,
returning the count removed. Used for TTL-based GC of the audit table.
Returns the conversation record (%{id, settings, status, fsm_state}) or nil.
Returns the tool-call record for tool_call_id, or nil.
Returns the summary with the greatest to_seq for the conversation, or nil.
Revival read: returns {latest_summary_or_nil, events_after_that_summary}. With
no summary, returns {nil, all_events}. This is how a revived agent rebuilds its
working set without replaying from seq 1.
Returns the conversation's audit rows ordered by turn_ref (empty when audit is off).
Returns the conversation's tool calls currently in :pending status.
Upserts the conversation record, merging attrs (:settings, :status, :fsm_state).
Upserts just the fsm_state cache (%{state, pending, last_seq}), creating the
conversation row if absent. fsm_state is a cache over the log, never the source
of truth.
Records a model_call audit row (exactly what was rendered to the model that
turn). A no-op unless the audit flag is enabled (config :agentix, :audit).
Stores a derived compaction summary (keyed by its to_seq).
Atomically resolves a tool call only if it is currently :pending, setting
its status and result. Returns {:error, :stale} if the id is unknown or
already resolved/expired — this is what makes double-clicks, resubmits, and the
kill→revive→late-answer race safe. Implementations must guarantee atomicity
against concurrent resolvers.
Schedules tool_call_id to be resolved to a tool-error after timeout_ms if it
is still pending. Owned by the adapter (not a per-agent timer) so it survives the
agent being killed. Rescheduling the same call replaces the prior timer.
Returns the conversation's events ordered by ascending seq. Options
Inserts or replaces a tool-call record (keyed by its tool_call_id). Used to
track HITL suspensions so they survive a kill. New records default to
status: :pending.
Functions
The configured persistence adapter module.
Types
@type conversation() :: %{ id: conversation_id(), settings: map(), status: atom(), fsm_state: map() }
@type conversation_id() :: String.t()
@type model_call() :: map()
@type seq() :: non_neg_integer()
@type summary() :: map()
@type tool_call() :: map()
@type tool_call_id() :: String.t()
Callbacks
@callback append_event(conversation_id(), Agentix.Event.t()) :: {:ok, seq()} | {:error, term()}
Appends event to the conversation log, assigning the next per-conversation
seq (monotonic, 1-based). Returns the assigned seq. Implementations must keep
seq strictly increasing per conversation; concurrent appends to the same
conversation are not expected (one agent writes per conversation).
@callback cancel_expiry(conversation_id(), tool_call_id()) :: :ok
Cancels a pending expiry scheduled by schedule_expiry/3.
@callback gc_model_calls(conversation_id(), non_neg_integer()) :: {:ok, non_neg_integer()}
Deletes audit rows older than ttl_ms (relative to now) for the conversation,
returning the count removed. Used for TTL-based GC of the audit table.
@callback get_conversation(conversation_id()) :: conversation() | nil
Returns the conversation record (%{id, settings, status, fsm_state}) or nil.
@callback get_tool_call(tool_call_id()) :: tool_call() | nil
Returns the tool-call record for tool_call_id, or nil.
@callback latest_summary(conversation_id()) :: summary() | nil
Returns the summary with the greatest to_seq for the conversation, or nil.
@callback load_since(conversation_id()) :: {summary() | nil, [Agentix.Event.t()]}
Revival read: returns {latest_summary_or_nil, events_after_that_summary}. With
no summary, returns {nil, all_events}. This is how a revived agent rebuilds its
working set without replaying from seq 1.
@callback model_calls(conversation_id()) :: [model_call()]
Returns the conversation's audit rows ordered by turn_ref (empty when audit is off).
@callback pending_tool_calls(conversation_id()) :: [tool_call()]
Returns the conversation's tool calls currently in :pending status.
@callback put_conversation(conversation_id(), map()) :: :ok
Upserts the conversation record, merging attrs (:settings, :status, :fsm_state).
@callback put_fsm_state(conversation_id(), map()) :: :ok
Upserts just the fsm_state cache (%{state, pending, last_seq}), creating the
conversation row if absent. fsm_state is a cache over the log, never the source
of truth.
@callback put_model_call(conversation_id(), model_call()) :: :ok
Records a model_call audit row (exactly what was rendered to the model that
turn). A no-op unless the audit flag is enabled (config :agentix, :audit).
@callback put_summary(conversation_id(), summary()) :: :ok
Stores a derived compaction summary (keyed by its to_seq).
@callback resolve_tool_call(tool_call_id(), atom(), map() | nil) :: :ok | {:error, :stale}
Atomically resolves a tool call only if it is currently :pending, setting
its status and result. Returns {:error, :stale} if the id is unknown or
already resolved/expired — this is what makes double-clicks, resubmits, and the
kill→revive→late-answer race safe. Implementations must guarantee atomicity
against concurrent resolvers.
@callback schedule_expiry(conversation_id(), tool_call_id(), pos_integer()) :: :ok
Schedules tool_call_id to be resolved to a tool-error after timeout_ms if it
is still pending. Owned by the adapter (not a per-agent timer) so it survives the
agent being killed. Rescheduling the same call replaces the prior timer.
@callback stream_events( conversation_id(), keyword() ) :: [Agentix.Event.t()]
Returns the conversation's events ordered by ascending seq. Options:
:after— exclusiveseqlower bound (default 0);:before— exclusivesequpper bound (default unbounded);:limit— keep at most this many events, the most recent within the(after, before)range (the tail), still returned in ascending order.
:before + :limit is the backward-pagination primitive: read the newest page,
then page older with before: set to the oldest seq already loaded.
@callback upsert_tool_call(conversation_id(), tool_call()) :: :ok
Inserts or replaces a tool-call record (keyed by its tool_call_id). Used to
track HITL suspensions so they survive a kill. New records default to
status: :pending.
Functions
@spec adapter() :: module()
The configured persistence adapter module.
@spec append_event(conversation_id(), Agentix.Event.t()) :: {:ok, seq()} | {:error, term()}
@spec cancel_expiry(conversation_id(), tool_call_id()) :: :ok
@spec gc_model_calls(conversation_id(), non_neg_integer()) :: {:ok, non_neg_integer()}
@spec get_conversation(conversation_id()) :: conversation() | nil
@spec get_tool_call(tool_call_id()) :: tool_call() | nil
@spec latest_summary(conversation_id()) :: summary() | nil
@spec load_since(conversation_id()) :: {summary() | nil, [Agentix.Event.t()]}
@spec model_calls(conversation_id()) :: [model_call()]
@spec pending_tool_calls(conversation_id()) :: [tool_call()]
@spec put_conversation(conversation_id(), map()) :: :ok
@spec put_fsm_state(conversation_id(), map()) :: :ok
@spec put_model_call(conversation_id(), model_call()) :: :ok
@spec put_summary(conversation_id(), summary()) :: :ok
@spec resolve_tool_call(tool_call_id(), atom(), map() | nil) :: :ok | {:error, :stale}
@spec schedule_expiry(conversation_id(), tool_call_id(), pos_integer()) :: :ok
@spec stream_events( conversation_id(), keyword() ) :: [Agentix.Event.t()]
@spec upsert_tool_call(conversation_id(), tool_call()) :: :ok