Context for episode CRUD operations and lifecycle management.
Summary
Functions
Return the most recent still-active (:running or :blocked) episode for a
conversation, or nil. Useful for a "stop" button that needs the current
turn's episode id.
Execute the cancellation sequence for an episode (spec §4.9).
Cancel running and blocked episodes for the given actor and expectation.
Attempt to claim a stale episode for recovery via optimistic update.
Count episodes for the given actor ID(s). Accepts same filter opts as list_by_actors.
Force-fires an episode for an actor/expectation pair.
Return the started_at of the most recent schedule-triggered episode
for the given actor and expectation. Returns nil if none found.
List episodes for the given actor ID(s).
List episodes for the given actor(s) where the trigger payload or workflow input contains the given subject key/value pair.
List episodes for a conversation, ordered by start time.
List :running episodes with no recent step journal activity.
Returns the next step number for an episode's journal (current max + 1, or 1 when there are no steps yet).
Request cancellation of an episode (e.g. a UI "stop" button). Cooperative and safe mid-flight, and never touches the parent conversation.
Resolve an approval on a blocked interactive episode. Verifies the plan_hash matches the approval_requested step.
Functions
Return the most recent still-active (:running or :blocked) episode for a
conversation, or nil. Useful for a "stop" button that needs the current
turn's episode id.
Execute the cancellation sequence for an episode (spec §4.9).
- Journal :episode_failed step with error_class "canceled"
- Set status to :canceled
- Cancel pending outputs (:proposed → :canceled)
- Publish "episode.canceled" Bus event
- Emit [:cyclium, :episode, :canceled] telemetry
Attempt to claim a stale episode for recovery via optimistic update.
Sets phase to "recovering" only if the episode is still :running.
Returns {:ok, episode} if claimed, {:error, :already_claimed} if
another node got there first.
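The claim semantics above can be illustrated with a minimal in-memory sketch. It is not the module's implementation: `ClaimSketch`, the Agent-backed store, and the extra guard on `phase` are assumptions made for illustration, standing in for a conditional database update that only one node can win.

```elixir
defmodule ClaimSketch do
  # Hypothetical in-memory stand-in for the episodes table; not Cyclium code.
  def start_link(episodes), do: Agent.start_link(fn -> episodes end)

  # Set phase to "recovering" only if the episode is still :running and has
  # not been claimed yet. The check and the write happen inside one atomic
  # Agent call, which plays the role of a conditional UPDATE.
  def claim(store, id) do
    Agent.get_and_update(store, fn episodes ->
      case episodes do
        %{^id => %{status: :running, phase: phase} = ep} when phase != "recovering" ->
          claimed = %{ep | phase: "recovering"}
          {{:ok, claimed}, Map.put(episodes, id, claimed)}

        _ ->
          # Already claimed, no longer :running, or unknown id.
          {{:error, :already_claimed}, episodes}
      end
    end)
  end
end
```

With this sketch, a second call to `ClaimSketch.claim/2` for the same id returns `{:error, :already_claimed}`, mirroring the case where another node got there first.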
Count episodes for the given actor ID(s). Accepts same filter opts as list_by_actors.
Force-fires an episode for an actor/expectation pair.
Options
- :mode - :live (default) or :dry_run
- :trigger_payload - map of trigger data
- :overrides - dry run overrides map with optional keys:
  - "tool_overrides" - %{"capability.action" => mock_result}
  - "synthesis_override" - mock synthesis result
Returns {:ok, episode} or {:error, reason}.
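As an illustration of the option shape described above, a dry-run options list might look like the following. Only the option keys come from this documentation; the capability name `"inventory.lookup"` and all values are hypothetical.

```elixir
# Hypothetical values; only the option keys are taken from the docs above.
opts = [
  mode: :dry_run,
  trigger_payload: %{"order_id" => "42"},
  overrides: %{
    # Keyed by "capability.action"; the value is the mocked tool result.
    "tool_overrides" => %{"inventory.lookup" => %{"in_stock" => true}},
    # Mocked synthesis result returned instead of a real synthesis call.
    "synthesis_override" => %{"summary" => "mocked"}
  }
]
```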
Return the started_at of the most recent schedule-triggered episode
for the given actor and expectation. Returns nil if none found.
Used by schedule timers on actor init to compute correct delay after restarts (surviving missed windows).
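One plausible way a timer could use this value is sketched below. The fixed-interval schedule, the `ScheduleDelaySketch` module, and the choice to fire immediately when a window was missed are all assumptions; the actual timer logic is not described here.

```elixir
defmodule ScheduleDelaySketch do
  # Hypothetical helper: milliseconds to wait before the next scheduled run.
  # last_started_at is nil when no schedule-triggered episode exists yet.
  def next_delay_ms(nil, _interval_ms, _now), do: 0

  def next_delay_ms(%DateTime{} = last_started_at, interval_ms, %DateTime{} = now) do
    elapsed_ms = DateTime.diff(now, last_started_at, :millisecond)
    # If a window was missed while the node was down, fire right away.
    max(interval_ms - elapsed_ms, 0)
  end
end
```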
List episodes for the given actor ID(s).
Options
- :statuses - list of status atoms to filter by (default: all)
- :limit - max rows to return (default: 50)
- :offset - rows to skip (default: 0)
- :order - :asc or :desc by started_at (default: :desc)
- :exclude_archived - when true, excludes episodes with a non-nil archived_at (default: false)
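The documented defaults can be sketched as an options-normalization step. `ListOptsSketch` is a hypothetical helper, using `nil` to stand for "all statuses" is an assumption, and whether the real function rejects unknown keys is not stated here.

```elixir
defmodule ListOptsSketch do
  # Defaults as documented above; nil for :statuses means "all" in this sketch.
  @defaults [statuses: nil, limit: 50, offset: 0, order: :desc, exclude_archived: false]

  # Keyword.validate!/2 (Elixir 1.13+) fills in defaults and raises on unknown keys.
  def normalize(opts), do: opts |> Keyword.validate!(@defaults) |> Map.new()
end
```

For example, `ListOptsSketch.normalize(limit: 10)` keeps the caller's limit and fills in every other option with its default.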
List episodes for the given actor(s) where the trigger payload or workflow input contains the given subject key/value pair.
Queries two JSON paths to cover both trigger types:
- Event-triggered: trigger_ref.payload.<key>
- Workflow-triggered: trigger_ref.input.<key>
Detects the repo adapter at runtime and uses the appropriate JSON text
extraction — Postgres #>> or SQL Server JSON_VALUE.
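The two-path lookup can be mirrored in memory as follows. This is a sketch only: it assumes `trigger_ref` is a decoded JSON map with string keys, and it compares raw values rather than extracted JSON text as the database query does. `SubjectMatchSketch` is a hypothetical name.

```elixir
defmodule SubjectMatchSketch do
  # True when either documented path holds the value:
  # trigger_ref.payload.<key> (event-triggered) or
  # trigger_ref.input.<key> (workflow-triggered).
  def matches?(trigger_ref, key, value) do
    Enum.any?(["payload", "input"], fn section ->
      get_in(trigger_ref, [section, key]) == value
    end)
  end
end
```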
Options
Same as list_by_actors/2.
List episodes for a conversation, ordered by start time.
List :running episodes with no recent step journal activity.
An episode is considered stale if:
- Its most recent EpisodeStep.created_at is older than stale_after_ms, OR
- It has no steps and its started_at is older than the threshold
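The staleness rule above can be written as a small predicate. The sketch assumes the episode is given as a map with `started_at` and a precomputed `last_step_at` (nil when there are no steps); both the map shape and the `StaleSketch` module are hypothetical, and the real function evaluates this rule in a database query.

```elixir
defmodule StaleSketch do
  # last_step_at is the newest EpisodeStep.created_at, or nil when the
  # episode has no steps, in which case started_at is used instead.
  def stale?(%{started_at: started_at, last_step_at: last_step_at}, stale_after_ms, now) do
    reference = last_step_at || started_at
    DateTime.diff(now, reference, :millisecond) > stale_after_ms
  end
end
```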
Options
- :source_stack - restrict the scan to episodes created by this stack. Pre-migration rows with NULL source_stack are included (treated as "match any stack") to avoid orphaning legacy episodes. When omitted, the scan is unscoped.
- :source_env - restrict the scan to episodes created by this env, matched by strict equality (so an env-tagged node won't recover the default node's work and vice-versa). Pass nil to scope to the unset/default env (includes legacy NULL rows). When omitted, env is not filtered.
Used by Cyclium.Recovery.sweep/1 to find orphaned episodes after deploys.
@spec next_step_no(binary()) :: pos_integer()
Returns the next step number for an episode's journal (current max + 1, or 1 when there are no steps yet).
Shared by the episode runner and the output router so step numbering stays consistent across both. Steps for a single episode are journaled serially by one process, so the read-then-increment is race-free in practice.
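The numbering rule itself is simple enough to sketch over a plain list. Here the existing step numbers are passed in directly, whereas the real function takes an episode id and presumably reads the journal; `StepNoSketch` is a hypothetical module.

```elixir
defmodule StepNoSketch do
  # step_nos: step numbers already journaled for one episode (possibly empty).
  @spec next_step_no([pos_integer()]) :: pos_integer()
  def next_step_no([]), do: 1
  def next_step_no(step_nos), do: Enum.max(step_nos) + 1
end
```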
Request cancellation of an episode (e.g. a UI "stop" button). Cooperative and safe mid-flight, and never touches the parent conversation:
- :running - broadcasts to the worker (any node), which stops at its next step boundary and writes :canceled itself. Returns {:ok, :requested}.
- :blocked - no worker running, so cancel directly. Returns {:ok, :canceled}.
- terminal - {:error, {:not_active, status}}.
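The status-to-result mapping above can be summarized in a small sketch. It covers only the return values; the broadcast to the worker and the direct cancellation are omitted, and `CancelRequestSketch` is a hypothetical module.

```elixir
defmodule CancelRequestSketch do
  # Maps an episode status to the documented return value.
  # Side effects (worker broadcast, direct cancel) are left out.
  def outcome(:running), do: {:ok, :requested}
  def outcome(:blocked), do: {:ok, :canceled}
  def outcome(status), do: {:error, {:not_active, status}}
end
```

A caller such as a "stop" button handler would typically treat both `{:ok, _}` results as success and surface `{:error, {:not_active, status}}` as "nothing to cancel".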
Resolve an approval on a blocked interactive episode. Verifies the plan_hash matches the approval_requested step.