Agentix.Persistence behaviour (Agentix v0.1.0)

Copy Markdown View Source

The persistence behaviour and a thin dispatch to the configured adapter.

An append-only, ordered events log is the source of truth; everything else is a cache or a derived artifact. Two adapters ship: Agentix.Persistence.ETS (default, ephemeral) and, later, an Ecto/Postgres adapter. Both must pass the shared Agentix.PersistenceConformance suite, so callers cannot tell them apart.

Records crossing this boundary are plain maps (and Agentix.Event structs), never adapter-specific structs, so the core never depends on Ecto.

Record shapes

  • conversation%{id, settings, status, fsm_state}.
  • fsm_state%{state, pending, last_seq} (a cache over the log).
  • summary%{from_seq, to_seq, content, version} (+ adapter-assigned id/inserted_at).
  • tool_call%{id, conversation_id, executor, status, args, result, ...}.
  • model_call%{turn_ref, rendered_context, model, usage, ...} (audit, off by default).

Configure with config :agentix, :persistence, Agentix.Persistence.ETS (or {module, opts}).

Summary

Callbacks

Appends event to the conversation log, assigning the next per-conversation seq (monotonic, 1-based). Returns the assigned seq. Implementations must keep seq strictly increasing per conversation; concurrent appends to the same conversation are not expected (one agent writes per conversation).

Cancels a pending expiry scheduled by schedule_expiry/3.

Deletes audit rows older than ttl_ms (relative to now) for the conversation, returning the count removed. Used for TTL-based GC of the audit table.

Returns the conversation record (%{id, settings, status, fsm_state}) or nil.

Returns the tool-call record for tool_call_id, or nil.

Returns the summary with the greatest to_seq for the conversation, or nil.

Revival read: returns {latest_summary_or_nil, events_after_that_summary}. With no summary, returns {nil, all_events}. This is how a revived agent rebuilds its working set without replaying from seq 1.

Returns the conversation's audit rows ordered by turn_ref (empty when audit is off).

Returns the conversation's tool calls currently in :pending status.

Upserts the conversation record, merging attrs (:settings, :status, :fsm_state).

Upserts just the fsm_state cache (%{state, pending, last_seq}), creating the conversation row if absent. fsm_state is a cache over the log, never the source of truth.

Records a model_call audit row (exactly what was rendered to the model that turn). A no-op unless the audit flag is enabled (config :agentix, :audit).

Stores a derived compaction summary (keyed by its to_seq).

Atomically resolves a tool call only if it is currently :pending, setting its status and result. Returns {:error, :stale} if the id is unknown or already resolved/expired — this is what makes double-clicks, resubmits, and the kill→revive→late-answer race safe. Implementations must guarantee atomicity against concurrent resolvers.

Schedules tool_call_id to be resolved to a tool-error after timeout_ms if it is still pending. Owned by the adapter (not a per-agent timer) so it survives the agent being killed. Rescheduling the same call replaces the prior timer.

Returns the conversation's events ordered by ascending seq. Options

Inserts or replaces a tool-call record (keyed by its tool_call_id). Used to track HITL suspensions so they survive a kill. New records default to status: :pending.

Types

conversation()

@type conversation() :: %{
  id: conversation_id(),
  settings: map(),
  status: atom(),
  fsm_state: map()
}

conversation_id()

@type conversation_id() :: String.t()

model_call()

@type model_call() :: map()

seq()

@type seq() :: non_neg_integer()

summary()

@type summary() :: map()

tool_call()

@type tool_call() :: map()

tool_call_id()

@type tool_call_id() :: String.t()

Callbacks

append_event(conversation_id, t)

@callback append_event(conversation_id(), Agentix.Event.t()) ::
  {:ok, seq()} | {:error, term()}

Appends event to the conversation log, assigning the next per-conversation seq (monotonic, 1-based). Returns the assigned seq. Implementations must keep seq strictly increasing per conversation; concurrent appends to the same conversation are not expected (one agent writes per conversation).

cancel_expiry(conversation_id, tool_call_id)

@callback cancel_expiry(conversation_id(), tool_call_id()) :: :ok

Cancels a pending expiry scheduled by schedule_expiry/3.

gc_model_calls(conversation_id, non_neg_integer)

@callback gc_model_calls(conversation_id(), non_neg_integer()) :: {:ok, non_neg_integer()}

Deletes audit rows older than ttl_ms (relative to now) for the conversation, returning the count removed. Used for TTL-based GC of the audit table.

get_conversation(conversation_id)

@callback get_conversation(conversation_id()) :: conversation() | nil

Returns the conversation record (%{id, settings, status, fsm_state}) or nil.

get_tool_call(tool_call_id)

@callback get_tool_call(tool_call_id()) :: tool_call() | nil

Returns the tool-call record for tool_call_id, or nil.

latest_summary(conversation_id)

@callback latest_summary(conversation_id()) :: summary() | nil

Returns the summary with the greatest to_seq for the conversation, or nil.

load_since(conversation_id)

@callback load_since(conversation_id()) :: {summary() | nil, [Agentix.Event.t()]}

Revival read: returns {latest_summary_or_nil, events_after_that_summary}. With no summary, returns {nil, all_events}. This is how a revived agent rebuilds its working set without replaying from seq 1.

model_calls(conversation_id)

@callback model_calls(conversation_id()) :: [model_call()]

Returns the conversation's audit rows ordered by turn_ref (empty when audit is off).

pending_tool_calls(conversation_id)

@callback pending_tool_calls(conversation_id()) :: [tool_call()]

Returns the conversation's tool calls currently in :pending status.

put_conversation(conversation_id, map)

@callback put_conversation(conversation_id(), map()) :: :ok

Upserts the conversation record, merging attrs (:settings, :status, :fsm_state).

put_fsm_state(conversation_id, map)

@callback put_fsm_state(conversation_id(), map()) :: :ok

Upserts just the fsm_state cache (%{state, pending, last_seq}), creating the conversation row if absent. fsm_state is a cache over the log, never the source of truth.

put_model_call(conversation_id, model_call)

@callback put_model_call(conversation_id(), model_call()) :: :ok

Records a model_call audit row (exactly what was rendered to the model that turn). A no-op unless the audit flag is enabled (config :agentix, :audit).

put_summary(conversation_id, summary)

@callback put_summary(conversation_id(), summary()) :: :ok

Stores a derived compaction summary (keyed by its to_seq).

resolve_tool_call(tool_call_id, atom, arg3)

@callback resolve_tool_call(tool_call_id(), atom(), map() | nil) :: :ok | {:error, :stale}

Atomically resolves a tool call only if it is currently :pending, setting its status and result. Returns {:error, :stale} if the id is unknown or already resolved/expired — this is what makes double-clicks, resubmits, and the kill→revive→late-answer race safe. Implementations must guarantee atomicity against concurrent resolvers.

schedule_expiry(conversation_id, tool_call_id, pos_integer)

@callback schedule_expiry(conversation_id(), tool_call_id(), pos_integer()) :: :ok

Schedules tool_call_id to be resolved to a tool-error after timeout_ms if it is still pending. Owned by the adapter (not a per-agent timer) so it survives the agent being killed. Rescheduling the same call replaces the prior timer.

stream_events(conversation_id, keyword)

@callback stream_events(
  conversation_id(),
  keyword()
) :: [Agentix.Event.t()]

Returns the conversation's events ordered by ascending seq. Options:

  • :after — exclusive seq lower bound (default 0);
  • :before — exclusive seq upper bound (default unbounded);
  • :limit — keep at most this many events, the most recent within the (after, before) range (the tail), still returned in ascending order.

:before + :limit is the backward-pagination primitive: read the newest page, then page older with before: set to the oldest seq already loaded.

upsert_tool_call(conversation_id, tool_call)

@callback upsert_tool_call(conversation_id(), tool_call()) :: :ok

Inserts or replaces a tool-call record (keyed by its tool_call_id). Used to track HITL suspensions so they survive a kill. New records default to status: :pending.

Functions

adapter()

@spec adapter() :: module()

The configured persistence adapter module.

append_event(conversation_id, event)

@spec append_event(conversation_id(), Agentix.Event.t()) ::
  {:ok, seq()} | {:error, term()}

cancel_expiry(conversation_id, tool_call_id)

@spec cancel_expiry(conversation_id(), tool_call_id()) :: :ok

gc_model_calls(conversation_id, ttl_ms)

@spec gc_model_calls(conversation_id(), non_neg_integer()) :: {:ok, non_neg_integer()}

get_conversation(conversation_id)

@spec get_conversation(conversation_id()) :: conversation() | nil

get_tool_call(tool_call_id)

@spec get_tool_call(tool_call_id()) :: tool_call() | nil

latest_summary(conversation_id)

@spec latest_summary(conversation_id()) :: summary() | nil

load_since(conversation_id)

@spec load_since(conversation_id()) :: {summary() | nil, [Agentix.Event.t()]}

model_calls(conversation_id)

@spec model_calls(conversation_id()) :: [model_call()]

pending_tool_calls(conversation_id)

@spec pending_tool_calls(conversation_id()) :: [tool_call()]

put_conversation(conversation_id, attrs)

@spec put_conversation(conversation_id(), map()) :: :ok

put_fsm_state(conversation_id, fsm_state)

@spec put_fsm_state(conversation_id(), map()) :: :ok

put_model_call(conversation_id, model_call)

@spec put_model_call(conversation_id(), model_call()) :: :ok

put_summary(conversation_id, summary)

@spec put_summary(conversation_id(), summary()) :: :ok

resolve_tool_call(tool_call_id, status, result)

@spec resolve_tool_call(tool_call_id(), atom(), map() | nil) :: :ok | {:error, :stale}

schedule_expiry(conversation_id, tool_call_id, timeout_ms)

@spec schedule_expiry(conversation_id(), tool_call_id(), pos_integer()) :: :ok

stream_events(conversation_id, opts \\ [])

@spec stream_events(
  conversation_id(),
  keyword()
) :: [Agentix.Event.t()]

upsert_tool_call(conversation_id, tool_call)

@spec upsert_tool_call(conversation_id(), tool_call()) :: :ok